Compare commits
3 Commits
1749f0a0b8
...
e733e7025d
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e733e7025d | ||
|
|
b520178e3a | ||
|
|
9e07e73167 |
103
app/classes.py
103
app/classes.py
@ -1,11 +1,11 @@
|
||||
# Standard library imports
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
import datetime as dt
|
||||
from enum import auto, Enum
|
||||
import functools
|
||||
import threading
|
||||
from typing import NamedTuple
|
||||
from typing import Any, NamedTuple
|
||||
|
||||
# Third party imports
|
||||
|
||||
@ -21,6 +21,7 @@ from PyQt6.QtWidgets import (
|
||||
)
|
||||
|
||||
# App imports
|
||||
# from music_manager import FadeCurve
|
||||
|
||||
|
||||
# Define singleton first as it's needed below
|
||||
@ -91,31 +92,6 @@ class Filter:
|
||||
duration_unit: str = "minutes"
|
||||
|
||||
|
||||
@singleton
|
||||
@dataclass
|
||||
class MusicMusterSignals(QObject):
|
||||
"""
|
||||
Class for all MusicMuster signals. See:
|
||||
- https://zetcode.com/gui/pyqt5/eventssignals/
|
||||
- https://stackoverflow.com/questions/62654525/emit-a-signal-from-another-class-to-main-class
|
||||
"""
|
||||
|
||||
begin_reset_model_signal = pyqtSignal(int)
|
||||
enable_escape_signal = pyqtSignal(bool)
|
||||
end_reset_model_signal = pyqtSignal(int)
|
||||
next_track_changed_signal = pyqtSignal()
|
||||
resize_rows_signal = pyqtSignal(int)
|
||||
search_songfacts_signal = pyqtSignal(str)
|
||||
search_wikipedia_signal = pyqtSignal(str)
|
||||
show_warning_signal = pyqtSignal(str, str)
|
||||
span_cells_signal = pyqtSignal(int, int, int, int, int)
|
||||
status_message_signal = pyqtSignal(str, int)
|
||||
track_ended_signal = pyqtSignal()
|
||||
|
||||
def __post_init__(self):
|
||||
super().__init__()
|
||||
|
||||
|
||||
class PlaylistStyle(QProxyStyle):
|
||||
def drawPrimitive(self, element, option, painter, widget=None):
|
||||
"""
|
||||
@ -149,6 +125,79 @@ class Tags(NamedTuple):
|
||||
duration: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class TrackDTO:
|
||||
track_id: int
|
||||
artist: str
|
||||
bitrate: int
|
||||
duration: int
|
||||
fade_at: int
|
||||
intro: int | None
|
||||
path: str
|
||||
silence_at: int
|
||||
start_gap: int
|
||||
title: str
|
||||
lastplayed: dt.datetime | None
|
||||
|
||||
|
||||
@dataclass
|
||||
class PlaylistRowObj(TrackDTO):
|
||||
note: str
|
||||
played: bool
|
||||
playlist_id: int
|
||||
playlistrow_id: int
|
||||
row_number: int
|
||||
row_fg: str | None = None
|
||||
row_bg: str | None = None
|
||||
note_fg: str | None = None
|
||||
note_bg: str | None = None
|
||||
end_of_track_signalled: bool = False
|
||||
end_time: dt.datetime | None = None
|
||||
fade_graph: Any | None = None
|
||||
fade_graph_start_updates: dt.datetime | None = None
|
||||
resume_marker: float = 0.0
|
||||
forecast_end_time: dt.datetime | None = None
|
||||
forecast_start_time: dt.datetime | None = None
|
||||
start_time: dt.datetime | None = None
|
||||
|
||||
def is_playing(self) -> bool:
|
||||
"""
|
||||
Return True if we're currently playing else False
|
||||
"""
|
||||
|
||||
if self.start_time is None:
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class TrackInfo(NamedTuple):
|
||||
track_id: int
|
||||
row_number: int
|
||||
|
||||
|
||||
@singleton
|
||||
@dataclass
|
||||
class MusicMusterSignals(QObject):
|
||||
"""
|
||||
Class for all MusicMuster signals. See:
|
||||
- https://zetcode.com/gui/pyqt5/eventssignals/
|
||||
- https://stackoverflow.com/questions/62654525/emit-a-signal-from-another-class-to-main-class
|
||||
"""
|
||||
|
||||
begin_reset_model_signal = pyqtSignal(int)
|
||||
enable_escape_signal = pyqtSignal(bool)
|
||||
end_reset_model_signal = pyqtSignal(int)
|
||||
next_track_changed_signal = pyqtSignal()
|
||||
resize_rows_signal = pyqtSignal(int)
|
||||
search_songfacts_signal = pyqtSignal(str)
|
||||
search_wikipedia_signal = pyqtSignal(str)
|
||||
show_warning_signal = pyqtSignal(str, str)
|
||||
signal_set_next_row = pyqtSignal(int)
|
||||
signal_set_next_track = pyqtSignal(PlaylistRowObj)
|
||||
span_cells_signal = pyqtSignal(int, int, int, int, int)
|
||||
status_message_signal = pyqtSignal(str, int)
|
||||
track_ended_signal = pyqtSignal()
|
||||
|
||||
def __post_init__(self):
|
||||
super().__init__()
|
||||
|
||||
@ -395,6 +395,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
|
||||
given playlist_id and row
|
||||
"""
|
||||
|
||||
# TODO: use selectinload?
|
||||
stmt = (
|
||||
select(PlaylistRows)
|
||||
.options(joinedload(cls.track))
|
||||
|
||||
@ -89,7 +89,7 @@ class _AddFadeCurve(QObject):
|
||||
Create fade curve and add to PlaylistTrack object
|
||||
"""
|
||||
|
||||
fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
|
||||
fc = FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
|
||||
if not fc:
|
||||
log.error(f"Failed to create FadeCurve for {self.track_path=}")
|
||||
else:
|
||||
@ -97,7 +97,7 @@ class _AddFadeCurve(QObject):
|
||||
self.finished.emit()
|
||||
|
||||
|
||||
class _FadeCurve:
|
||||
class FadeCurve:
|
||||
GraphWidget: Optional[PlotWidget] = None
|
||||
|
||||
def __init__(
|
||||
@ -238,27 +238,6 @@ class _Music:
|
||||
# except Exception as e:
|
||||
# log.error(f"Failed to set up VLC logging: {e}")
|
||||
|
||||
def adjust_by_ms(self, ms: int) -> None:
|
||||
"""Move player position by ms milliseconds"""
|
||||
|
||||
if not self.player:
|
||||
return
|
||||
|
||||
elapsed_ms = self.get_playtime()
|
||||
position = self.get_position()
|
||||
if not position:
|
||||
position = 0.0
|
||||
new_position = max(0.0, position + ((position * ms) / elapsed_ms))
|
||||
self.set_position(new_position)
|
||||
# Adjus start time so elapsed time calculations are correct
|
||||
if new_position == 0:
|
||||
self.start_dt = dt.datetime.now()
|
||||
else:
|
||||
if self.start_dt:
|
||||
self.start_dt -= dt.timedelta(milliseconds=ms)
|
||||
else:
|
||||
self.start_dt = dt.datetime.now() - dt.timedelta(milliseconds=ms)
|
||||
|
||||
def fade(self, fade_seconds: int) -> None:
|
||||
"""
|
||||
Fade the currently playing track.
|
||||
@ -353,21 +332,6 @@ class _Music:
|
||||
self.player.set_position(position)
|
||||
self.start_dt = start_time
|
||||
|
||||
# For as-yet unknown reasons. sometimes the volume gets
|
||||
# reset to zero within 200mS or so of starting play. This
|
||||
# only happened since moving to Debian 12, which uses
|
||||
# Pipewire for sound (which may be irrelevant).
|
||||
# It has been known for the volume to need correcting more
|
||||
# than once in the first 200mS.
|
||||
# Update August 2024: This no longer seems to be an issue
|
||||
# for _ in range(3):
|
||||
# if self.player:
|
||||
# volume = self.player.audio_get_volume()
|
||||
# if volume < Config.VLC_VOLUME_DEFAULT:
|
||||
# self.set_volume(Config.VLC_VOLUME_DEFAULT)
|
||||
# log.error(f"Reset from {volume=}")
|
||||
# sleep(0.1)
|
||||
|
||||
def set_position(self, position: float) -> None:
|
||||
"""
|
||||
Set player position
|
||||
@ -478,7 +442,7 @@ class RowAndTrack:
|
||||
# Track playing data
|
||||
self.end_of_track_signalled: bool = False
|
||||
self.end_time: Optional[dt.datetime] = None
|
||||
self.fade_graph: Optional[_FadeCurve] = None
|
||||
self.fade_graph: Optional[FadeCurve] = None
|
||||
self.fade_graph_start_updates: Optional[dt.datetime] = None
|
||||
self.resume_marker: Optional[float] = 0.0
|
||||
self.forecast_end_time: Optional[dt.datetime] = None
|
||||
@ -519,25 +483,6 @@ class RowAndTrack:
|
||||
self.signals.track_ended_signal.emit()
|
||||
self.end_of_track_signalled = True
|
||||
|
||||
def create_fade_graph(self) -> None:
|
||||
"""
|
||||
Initialise and add FadeCurve in a thread as it's slow
|
||||
"""
|
||||
|
||||
self.fadecurve_thread = QThread()
|
||||
self.worker = _AddFadeCurve(
|
||||
self,
|
||||
track_path=self.path,
|
||||
track_fade_at=self.fade_at,
|
||||
track_silence_at=self.silence_at,
|
||||
)
|
||||
self.worker.moveToThread(self.fadecurve_thread)
|
||||
self.fadecurve_thread.started.connect(self.worker.run)
|
||||
self.worker.finished.connect(self.fadecurve_thread.quit)
|
||||
self.worker.finished.connect(self.worker.deleteLater)
|
||||
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
|
||||
self.fadecurve_thread.start()
|
||||
|
||||
def drop3db(self, enable: bool) -> None:
|
||||
"""
|
||||
If enable is true, drop output by 3db else restore to full volume
|
||||
@ -565,20 +510,6 @@ class RowAndTrack:
|
||||
|
||||
return self.music.is_playing()
|
||||
|
||||
def move_back(self, ms: int = Config.PREVIEW_BACK_MS) -> None:
|
||||
"""
|
||||
Rewind player by ms milliseconds
|
||||
"""
|
||||
|
||||
self.music.adjust_by_ms(ms * -1)
|
||||
|
||||
def move_forward(self, ms: int = Config.PREVIEW_ADVANCE_MS) -> None:
|
||||
"""
|
||||
Rewind player by ms milliseconds
|
||||
"""
|
||||
|
||||
self.music.adjust_by_ms(ms)
|
||||
|
||||
def play(self, position: Optional[float] = None) -> None:
|
||||
"""Play track"""
|
||||
|
||||
@ -599,13 +530,6 @@ class RowAndTrack:
|
||||
milliseconds=update_graph_at_ms
|
||||
)
|
||||
|
||||
def restart(self) -> None:
|
||||
"""
|
||||
Restart player
|
||||
"""
|
||||
|
||||
self.music.adjust_by_ms(self.time_playing() * -1)
|
||||
|
||||
def set_forecast_start_time(
|
||||
self, modified_rows: list[int], start: Optional[dt.datetime]
|
||||
) -> Optional[dt.datetime]:
|
||||
@ -743,7 +667,28 @@ class TrackSequence:
|
||||
self.next = None
|
||||
else:
|
||||
self.next = rat
|
||||
self.next.create_fade_graph()
|
||||
self.create_fade_graph()
|
||||
|
||||
def create_fade_graph(self) -> None:
|
||||
"""
|
||||
Initialise and add FadeCurve in a thread as it's slow
|
||||
"""
|
||||
|
||||
self.fadecurve_thread = QThread()
|
||||
if self.next is None:
|
||||
raise ApplicationError("hell in a handcart")
|
||||
self.worker = _AddFadeCurve(
|
||||
self.next,
|
||||
track_path=self.next.path,
|
||||
track_fade_at=self.next.fade_at,
|
||||
track_silence_at=self.next.silence_at,
|
||||
)
|
||||
self.worker.moveToThread(self.fadecurve_thread)
|
||||
self.fadecurve_thread.started.connect(self.worker.run)
|
||||
self.worker.finished.connect(self.fadecurve_thread.quit)
|
||||
self.worker.finished.connect(self.worker.deleteLater)
|
||||
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
|
||||
self.fadecurve_thread.start()
|
||||
|
||||
|
||||
track_sequence = TrackSequence()
|
||||
|
||||
@ -1255,9 +1255,6 @@ class Window(QMainWindow):
|
||||
|
||||
# # # # # # # # # # Internal utility functions # # # # # # # # # #
|
||||
|
||||
def active_base_model(self) -> PlaylistModel:
|
||||
return self.current.base_model
|
||||
|
||||
def active_tab(self) -> PlaylistTab:
|
||||
return self.playlist_section.tabPlaylist.currentWidget()
|
||||
|
||||
@ -2524,11 +2521,13 @@ class Window(QMainWindow):
|
||||
Set currently-selected row on visible playlist tab as next track
|
||||
"""
|
||||
|
||||
playlist_tab = self.active_tab()
|
||||
if playlist_tab:
|
||||
playlist_tab.set_row_as_next_track()
|
||||
else:
|
||||
log.error("No active tab")
|
||||
self.signals.signal_set_next_row.emit(self.current.playlist_id)
|
||||
self.clear_selection()
|
||||
# playlist_tab = self.active_tab()
|
||||
# if playlist_tab:
|
||||
# playlist_tab.set_row_as_next_track()
|
||||
# else:
|
||||
# log.error("No active tab")
|
||||
|
||||
def set_tab_colour(self, widget: PlaylistTab, colour: QColor) -> None:
|
||||
"""
|
||||
|
||||
@ -35,6 +35,7 @@ from classes import (
|
||||
ApplicationError,
|
||||
Col,
|
||||
MusicMusterSignals,
|
||||
PlaylistRowObj,
|
||||
)
|
||||
from config import Config
|
||||
from helpers import (
|
||||
@ -49,6 +50,7 @@ from helpers import (
|
||||
from log import log
|
||||
from models import db, NoteColours, Playdates, PlaylistRows, Tracks
|
||||
from music_manager import RowAndTrack, track_sequence
|
||||
from repository import get_playlist_rows
|
||||
|
||||
|
||||
HEADER_NOTES_COLUMN = 1
|
||||
@ -83,8 +85,10 @@ class PlaylistModel(QAbstractTableModel):
|
||||
self.playlist_id = playlist_id
|
||||
self.is_template = is_template
|
||||
|
||||
self.playlist_rows: dict[int, RowAndTrack] = {}
|
||||
self.playlist_rows: dict[int, PlaylistRowObj] = {}
|
||||
self.selected_rows: list[PlaylistRowObj] = []
|
||||
self.signals = MusicMusterSignals()
|
||||
self.signals.signal_set_next_row.connect(self.set_next_row)
|
||||
self.played_tracks_hidden = False
|
||||
|
||||
self.signals.begin_reset_model_signal.connect(self.begin_reset_model)
|
||||
@ -303,9 +307,7 @@ class PlaylistModel(QAbstractTableModel):
|
||||
|
||||
# Update colour and times for current row
|
||||
# only invalidate required roles
|
||||
roles = [
|
||||
Qt.ItemDataRole.DisplayRole
|
||||
]
|
||||
roles = [Qt.ItemDataRole.DisplayRole]
|
||||
self.invalidate_row(row_number, roles)
|
||||
|
||||
# Update previous row in case we're hiding played rows
|
||||
@ -756,7 +758,9 @@ class PlaylistModel(QAbstractTableModel):
|
||||
Qt.ItemDataRole.FontRole,
|
||||
Qt.ItemDataRole.ForegroundRole,
|
||||
]
|
||||
self.invalidate_rows(list(range(new_row_number, len(self.playlist_rows))), roles)
|
||||
self.invalidate_rows(
|
||||
list(range(new_row_number, len(self.playlist_rows))), roles
|
||||
)
|
||||
|
||||
def invalidate_row(self, modified_row: int, roles: list[Qt.ItemDataRole]) -> None:
|
||||
"""
|
||||
@ -768,10 +772,12 @@ class PlaylistModel(QAbstractTableModel):
|
||||
self.dataChanged.emit(
|
||||
self.index(modified_row, 0),
|
||||
self.index(modified_row, self.columnCount() - 1),
|
||||
roles
|
||||
roles,
|
||||
)
|
||||
|
||||
def invalidate_rows(self, modified_rows: list[int], roles: list[Qt.ItemDataRole]) -> None:
|
||||
def invalidate_rows(
|
||||
self, modified_rows: list[int], roles: list[Qt.ItemDataRole]
|
||||
) -> None:
|
||||
"""
|
||||
Signal to view to refresh invlidated rows
|
||||
"""
|
||||
@ -820,20 +826,24 @@ class PlaylistModel(QAbstractTableModel):
|
||||
# We used to clear self.playlist_rows each time but that's
|
||||
# expensive and slow on big playlists
|
||||
|
||||
# Note where each playlist_id is
|
||||
plid_to_row: dict[int, int] = {}
|
||||
for oldrow in self.playlist_rows:
|
||||
plrdata = self.playlist_rows[oldrow]
|
||||
plid_to_row[plrdata.playlistrow_id] = plrdata.row_number
|
||||
# # Note where each playlist_id is
|
||||
# plid_to_row: dict[int, int] = {}
|
||||
# for oldrow in self.playlist_rows:
|
||||
# plrdata = self.playlist_rows[oldrow]
|
||||
# plid_to_row[plrdata.playlistrow_id] = plrdata.row_number
|
||||
|
||||
# build a new playlist_rows
|
||||
new_playlist_rows: dict[int, RowAndTrack] = {}
|
||||
for p in PlaylistRows.get_playlist_rows(session, self.playlist_id):
|
||||
if p.id not in plid_to_row:
|
||||
new_playlist_rows[p.row_number] = RowAndTrack(p)
|
||||
else:
|
||||
new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
|
||||
new_playlist_rows[p.row_number].row_number = p.row_number
|
||||
# new_playlist_rows: dict[int, RowAndTrack] = {}
|
||||
# for p in PlaylistRows.get_playlist_rows(session, self.playlist_id):
|
||||
# if p.id not in plid_to_row:
|
||||
# new_playlist_rows[p.row_number] = RowAndTrack(p)
|
||||
# else:
|
||||
# new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
|
||||
# new_playlist_rows[p.row_number].row_number = p.row_number
|
||||
# build a new playlist_rows
|
||||
new_playlist_rows: dict[int, PlaylistRowObj] = {}
|
||||
for p in get_playlist_rows(self.playlist_id):
|
||||
new_playlist_rows[p.row_number] = p
|
||||
|
||||
# Copy to self.playlist_rows
|
||||
self.playlist_rows = new_playlist_rows
|
||||
@ -1419,55 +1429,53 @@ class PlaylistModel(QAbstractTableModel):
|
||||
|
||||
return True
|
||||
|
||||
def set_next_row(self, row_number: Optional[int]) -> None:
|
||||
def set_selected_rows(self, selected_rows: list[int]) -> None:
|
||||
"""
|
||||
Set row_number as next track. If row_number is None, clear next track.
|
||||
|
||||
Return True if successful else False.
|
||||
Keep track of which rows are selected in the view
|
||||
"""
|
||||
|
||||
log.debug(f"{self}: set_next_row({row_number=})")
|
||||
self.selected_rows = [self.playlist_rows[a] for a in selected_rows]
|
||||
|
||||
if row_number is None:
|
||||
# Clear next track
|
||||
def set_next_row(self, playlist_id: int) -> None:
|
||||
"""
|
||||
Handle signal_set_next_row
|
||||
"""
|
||||
|
||||
log.debug(f"{self}: set_next_row({playlist_id=})")
|
||||
if playlist_id != self.playlist_id:
|
||||
# Not for us
|
||||
return
|
||||
|
||||
if len(self.selected_rows) == 0:
|
||||
# No row selected so clear next track
|
||||
if track_sequence.next is not None:
|
||||
track_sequence.set_next(None)
|
||||
else:
|
||||
# Get playlistrow_id of row
|
||||
try:
|
||||
rat = self.playlist_rows[row_number]
|
||||
except IndexError:
|
||||
log.error(f"{self} set_track_sequence.next({row_number=}, IndexError")
|
||||
return
|
||||
if rat.track_id is None or rat.row_number is None:
|
||||
log.error(
|
||||
f"{self} .set_track_sequence.next({row_number=}, "
|
||||
f"No track / row number {rat.track_id=}, {rat.row_number=}"
|
||||
)
|
||||
return
|
||||
return
|
||||
|
||||
old_next_row: Optional[int] = None
|
||||
if track_sequence.next:
|
||||
old_next_row = track_sequence.next.row_number
|
||||
if len(self.selected_rows) > 1:
|
||||
self.signals.show_warning_signal.emit(
|
||||
"Too many rows selected", "Select one row for next row"
|
||||
)
|
||||
return
|
||||
|
||||
track_sequence.set_next(rat)
|
||||
rat = self.selected_rows[0]
|
||||
if rat.track_id is None:
|
||||
raise ApplicationError(f"set_next_row: no track_id ({rat=})")
|
||||
|
||||
if Config.WIKIPEDIA_ON_NEXT:
|
||||
self.signals.search_wikipedia_signal.emit(
|
||||
self.playlist_rows[row_number].title
|
||||
)
|
||||
if Config.SONGFACTS_ON_NEXT:
|
||||
self.signals.search_songfacts_signal.emit(
|
||||
self.playlist_rows[row_number].title
|
||||
)
|
||||
roles = [
|
||||
Qt.ItemDataRole.BackgroundRole,
|
||||
]
|
||||
if old_next_row is not None:
|
||||
# only invalidate required roles
|
||||
self.invalidate_row(old_next_row, roles)
|
||||
old_next_row: Optional[int] = None
|
||||
if track_sequence.next:
|
||||
old_next_row = track_sequence.next.row_number
|
||||
|
||||
track_sequence.set_next(rat)
|
||||
|
||||
roles = [
|
||||
Qt.ItemDataRole.BackgroundRole,
|
||||
]
|
||||
if old_next_row is not None:
|
||||
# only invalidate required roles
|
||||
self.invalidate_row(row_number, roles)
|
||||
self.invalidate_row(old_next_row, roles)
|
||||
# only invalidate required roles
|
||||
self.invalidate_row(rat.row_number, roles)
|
||||
|
||||
self.signals.next_track_changed_signal.emit()
|
||||
self.update_track_times()
|
||||
@ -1725,7 +1733,7 @@ class PlaylistModel(QAbstractTableModel):
|
||||
continue
|
||||
|
||||
# Set start/end
|
||||
next_start_time = rat.set_forecast_start_time(update_rows, next_start_time)
|
||||
rat.forecast_start_time = next_start_time
|
||||
|
||||
# Update start/stop times of rows that have changed
|
||||
for updated_row in update_rows:
|
||||
@ -1811,7 +1819,9 @@ class PlaylistProxyModel(QSortFilterProxyModel):
|
||||
]
|
||||
QTimer.singleShot(
|
||||
Config.HIDE_AFTER_PLAYING_OFFSET + 100,
|
||||
lambda: self.sourceModel().invalidate_row(source_row, roles),
|
||||
lambda: self.sourceModel().invalidate_row(
|
||||
source_row, roles
|
||||
),
|
||||
)
|
||||
return True
|
||||
# Next track not playing yet so don't hide previous
|
||||
|
||||
@ -456,11 +456,14 @@ class PlaylistTab(QTableView):
|
||||
self, selected: QItemSelection, deselected: QItemSelection
|
||||
) -> None:
|
||||
"""
|
||||
Tell model which rows are selected.
|
||||
|
||||
Toggle drag behaviour according to whether rows are selected
|
||||
"""
|
||||
|
||||
selected_rows = self.get_selected_rows()
|
||||
self.musicmuster.current.selected_rows = selected_rows
|
||||
self.get_base_model().set_selected_rows(selected_rows)
|
||||
|
||||
# If no rows are selected, we have nothing to do
|
||||
if len(selected_rows) == 0:
|
||||
|
||||
119
app/repository.py
Normal file
119
app/repository.py
Normal file
@ -0,0 +1,119 @@
|
||||
# Standard library imports
|
||||
|
||||
# PyQt imports
|
||||
|
||||
# Third party imports
|
||||
from sqlalchemy import select, func
|
||||
from sqlalchemy.orm import aliased
|
||||
from classes import PlaylistRowObj
|
||||
|
||||
# App imports
|
||||
from classes import TrackDTO
|
||||
from models import db, Tracks, PlaylistRows, Playdates
|
||||
|
||||
|
||||
def tracks_like_title(filter_str: str) -> list[TrackDTO]:
|
||||
"""
|
||||
Return tracks where title is like filter
|
||||
"""
|
||||
|
||||
# TODO: add in playdates as per Tracks.search_titles
|
||||
with db.Session() as session:
|
||||
stmt = select(Tracks).where(Tracks.title.ilike(f"%{filter_str}%"))
|
||||
results = (
|
||||
session.execute(stmt).scalars().unique().all()
|
||||
) # `scalars()` extracts ORM objects
|
||||
return [
|
||||
TrackDTO(**{k: v for k, v in vars(t).items() if not k.startswith("_")})
|
||||
for t in results
|
||||
]
|
||||
|
||||
|
||||
def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]:
|
||||
# Alias PlaydatesTable for subquery
|
||||
LatestPlaydate = aliased(Playdates)
|
||||
|
||||
# Subquery: latest playdate for each track
|
||||
latest_playdate_subq = (
|
||||
select(
|
||||
LatestPlaydate.track_id,
|
||||
func.max(LatestPlaydate.lastplayed).label("lastplayed")
|
||||
)
|
||||
.group_by(LatestPlaydate.track_id)
|
||||
.subquery()
|
||||
)
|
||||
|
||||
stmt = (
|
||||
select(
|
||||
PlaylistRows.id.label("playlistrow_id"),
|
||||
PlaylistRows.row_number,
|
||||
PlaylistRows.note,
|
||||
PlaylistRows.played,
|
||||
PlaylistRows.playlist_id,
|
||||
Tracks.id.label("track_id"),
|
||||
Tracks.artist,
|
||||
Tracks.bitrate,
|
||||
Tracks.duration,
|
||||
Tracks.fade_at,
|
||||
Tracks.intro,
|
||||
Tracks.path,
|
||||
Tracks.silence_at,
|
||||
Tracks.start_gap,
|
||||
Tracks.title,
|
||||
latest_playdate_subq.c.lastplayed,
|
||||
)
|
||||
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
|
||||
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
|
||||
.where(PlaylistRows.playlist_id == playlist_id)
|
||||
.order_by(PlaylistRows.row_number)
|
||||
)
|
||||
|
||||
with db.Session() as session:
|
||||
results = session.execute(stmt).all()
|
||||
|
||||
dto_list = []
|
||||
for row in results:
|
||||
# Handle cases where track_id is None (no track associated)
|
||||
if row.track_id is None:
|
||||
dto = PlaylistRowObj(
|
||||
artist="",
|
||||
bitrate=0,
|
||||
duration=0,
|
||||
fade_at=0,
|
||||
intro=None,
|
||||
lastplayed=None,
|
||||
note=row.note,
|
||||
path="",
|
||||
played=row.played,
|
||||
playlist_id=row.playlist_id,
|
||||
playlistrow_id=row.playlistrow_id,
|
||||
row_number=row.row_number,
|
||||
silence_at=0,
|
||||
start_gap=0,
|
||||
title="",
|
||||
track_id=-1,
|
||||
# Additional fields like row_fg, row_bg, etc., use default None values
|
||||
)
|
||||
else:
|
||||
dto = PlaylistRowObj(
|
||||
artist=row.artist,
|
||||
bitrate=row.bitrate,
|
||||
duration=row.duration,
|
||||
fade_at=row.fade_at,
|
||||
intro=row.intro,
|
||||
lastplayed=row.lastplayed,
|
||||
note=row.note,
|
||||
path=row.path,
|
||||
played=row.played,
|
||||
playlist_id=row.playlist_id,
|
||||
playlistrow_id=row.playlistrow_id,
|
||||
row_number=row.row_number,
|
||||
silence_at=row.silence_at,
|
||||
start_gap=row.start_gap,
|
||||
title=row.title,
|
||||
track_id=row.track_id,
|
||||
# Additional fields like row_fg, row_bg, etc., use default None values
|
||||
)
|
||||
dto_list.append(dto)
|
||||
|
||||
return dto_list
|
||||
Loading…
Reference in New Issue
Block a user