From e40a4ab57a0a0970d051feb983f60b3742092add Mon Sep 17 00:00:00 2001 From: Keith Edmunds Date: Mon, 17 Mar 2025 18:43:46 +0000 Subject: [PATCH] WIP remove sessions, use reporistory --- app/classes.py | 39 ++-- app/helpers.py | 2 +- app/models.py | 4 +- app/music_manager.py | 220 ++++++++++++------- app/musicmuster.py | 6 +- app/playlistmodel.py | 218 +++++++++---------- app/querylistmodel.py | 4 +- app/repository.py | 384 ++++++++++++++++++++++++++++++++- tests/template_test_harness.py | 40 ++++ tests/test_db_updates.py | 76 +++++++ 10 files changed, 753 insertions(+), 240 deletions(-) create mode 100644 tests/template_test_harness.py create mode 100644 tests/test_db_updates.py diff --git a/app/classes.py b/app/classes.py index 94417bb..43eaaca 100644 --- a/app/classes.py +++ b/app/classes.py @@ -5,7 +5,7 @@ import datetime as dt from enum import auto, Enum import functools import threading -from typing import Any, NamedTuple +from typing import NamedTuple # Third party imports @@ -125,6 +125,15 @@ class Tags(NamedTuple): duration: int = 0 +@dataclass +class PlaylistDTO: + name: str + playlist_id: int + favourite: bool = False + is_template: bool = False + open: bool = False + + @dataclass class TrackDTO: track_id: int @@ -141,34 +150,12 @@ class TrackDTO: @dataclass -class PlaylistRowObj(TrackDTO): +class PlaylistRowDTO(TrackDTO): note: str played: bool playlist_id: int playlistrow_id: int row_number: int - row_fg: str | None = None - row_bg: str | None = None - note_fg: str | None = None - note_bg: str | None = None - end_of_track_signalled: bool = False - end_time: dt.datetime | None = None - fade_graph: Any | None = None - fade_graph_start_updates: dt.datetime | None = None - resume_marker: float = 0.0 - forecast_end_time: dt.datetime | None = None - forecast_start_time: dt.datetime | None = None - start_time: dt.datetime | None = None - - def is_playing(self) -> bool: - """ - Return True if we're currently playing else False - """ - - if self.start_time is None: - return False - - return True class TrackInfo(NamedTuple): @@ -193,8 +180,10 @@ class MusicMusterSignals(QObject): search_songfacts_signal = pyqtSignal(str) search_wikipedia_signal = pyqtSignal(str) show_warning_signal = pyqtSignal(str, str) + signal_add_track_to_header = pyqtSignal(int, int) signal_set_next_row = pyqtSignal(int) - signal_set_next_track = pyqtSignal(PlaylistRowObj) + # TODO: undestirable (and unresolvable) reference + # signal_set_next_track = pyqtSignal(PlaylistRow) span_cells_signal = pyqtSignal(int, int, int, int, int) status_message_signal = pyqtSignal(str, int) track_ended_signal = pyqtSignal() diff --git a/app/helpers.py b/app/helpers.py index 2971c08..929f3b6 100644 --- a/app/helpers.py +++ b/app/helpers.py @@ -20,7 +20,7 @@ from pydub.utils import mediainfo from tinytag import TinyTag, TinyTagException # type: ignore # App imports -from classes import AudioMetadata, ApplicationError, Tags +from classes import AudioMetadata, ApplicationError, Tags, TrackDTO from config import Config from log import log from models import Tracks diff --git a/app/models.py b/app/models.py index 44cf141..fe62ea3 100644 --- a/app/models.py +++ b/app/models.py @@ -241,7 +241,9 @@ class Playlists(dbtables.PlaylistsTable): """ session.execute( - update(Playlists).where((Playlists.id.in_(playlist_ids))).values(tab=None) + update(Playlists) + .where(Playlists.id.in_(playlist_ids)) + .values(tab=None) ) def close(self, session: Session) -> None: diff --git a/app/music_manager.py b/app/music_manager.py index 89ded2d..7bf6aac 100644 --- a/app/music_manager.py +++ b/app/music_manager.py @@ -3,7 +3,7 @@ from __future__ import annotations import datetime as dt from time import sleep -from typing import Optional +from typing import Any, Optional # Third party imports # import line_profiler @@ -27,7 +27,7 @@ from classes import ApplicationError, MusicMusterSignals from config import Config import helpers from log import log -from models import PlaylistRows +from repository import PlaylistRowDTO from vlcmanager import VLCManager # Define the VLC callback function type @@ -73,13 +73,13 @@ class _AddFadeCurve(QObject): def __init__( self, - rat: RowAndTrack, + plr: PlaylistRow, track_path: str, track_fade_at: int, track_silence_at: int, ) -> None: super().__init__() - self.rat = rat + self.plr = plr self.track_path = track_path self.track_fade_at = track_fade_at self.track_silence_at = track_silence_at @@ -93,7 +93,7 @@ class _AddFadeCurve(QObject): if not fc: log.error(f"Failed to create FadeCurve for {self.track_path=}") else: - self.rat.fade_graph = fc + self.plr.fade_graph = fc self.finished.emit() @@ -271,11 +271,11 @@ class _Music: elapsed_seconds = (now - self.start_dt).total_seconds() return int(elapsed_seconds * 1000) - def get_position(self) -> Optional[float]: + def get_position(self) -> float: """Return current position""" if not self.player: - return None + return 0.0 return self.player.get_position() def is_playing(self) -> bool: @@ -383,84 +383,138 @@ class _Music: self.player = None -class RowAndTrack: +class PlaylistRow: """ - Object to manage playlist rows and tracks. + Object to manage playlist row and track. """ - def __init__(self, playlist_row: PlaylistRows) -> None: + def __init__(self, dto: PlaylistRowDTO) -> None: """ - Initialises data structure. - - The passed PlaylistRows object will include a Tracks object if this - row has a track. + The dto object will include a Tracks object if this row has a track. """ - # Collect playlistrow data - self.note = playlist_row.note - self.played = playlist_row.played - self.playlist_id = playlist_row.playlist_id - self.playlistrow_id = playlist_row.id - self.row_number = playlist_row.row_number - self.track_id = playlist_row.track_id - - # Playlist display data - self.row_fg: Optional[str] = None - self.row_bg: Optional[str] = None - self.note_fg: Optional[str] = None - self.note_bg: Optional[str] = None - - # Collect track data if there's a track - if playlist_row.track_id: - self.artist = playlist_row.track.artist - self.bitrate = playlist_row.track.bitrate - self.duration = playlist_row.track.duration - self.fade_at = playlist_row.track.fade_at - self.intro = playlist_row.track.intro - if playlist_row.track.playdates: - self.lastplayed = max( - [a.lastplayed for a in playlist_row.track.playdates] - ) - else: - self.lastplayed = Config.EPOCH - self.path = playlist_row.track.path - self.silence_at = playlist_row.track.silence_at - self.start_gap = playlist_row.track.start_gap - self.title = playlist_row.track.title - else: - self.artist = "" - self.bitrate = 0 - self.duration = 0 - self.fade_at = 0 - self.intro = None - self.lastplayed = Config.EPOCH - self.path = "" - self.silence_at = 0 - self.start_gap = 0 - self.title = "" - - # Track playing data - self.end_of_track_signalled: bool = False - self.end_time: Optional[dt.datetime] = None - self.fade_graph: Optional[FadeCurve] = None - self.fade_graph_start_updates: Optional[dt.datetime] = None - self.resume_marker: Optional[float] = 0.0 - self.forecast_end_time: Optional[dt.datetime] = None - self.forecast_start_time: Optional[dt.datetime] = None - self.start_time: Optional[dt.datetime] = None - - # Other object initialisation + self.dto = dto self.music = _Music(name=Config.VLC_MAIN_PLAYER_NAME) self.signals = MusicMusterSignals() + self.end_of_track_signalled: bool = False + self.end_time: dt.datetime | None = None + self.fade_graph: Any | None = None + self.fade_graph_start_updates: dt.datetime | None = None + self.forecast_end_time: dt.datetime | None = None + self.forecast_start_time: dt.datetime | None = None + self.note_bg: str | None = None + self.note_fg: str | None = None + self.resume_marker: float = 0.0 + self.row_bg: str | None = None + self.row_fg: str | None = None + self.start_time: dt.datetime | None = None def __repr__(self) -> str: return ( - f"" + f"" ) + # Expose TrackDTO fields as properties + @property + def artist(self): + return self.dto.artist + + @property + def bitrate(self): + return self.dto.bitrate + + @property + def duration(self): + return self.dto.duration + + @property + def fade_at(self): + return self.dto.fade_at + + @property + def intro(self): + return self.dto.intro + + @property + def lastplayed(self): + return self.dto.lastplayed + + @property + def path(self): + return self.dto.path + + @property + def silence_at(self): + return self.dto.silence_at + + @property + def start_gap(self): + return self.dto.start_gap + + @property + def title(self): + return self.dto.title + + @property + def track_id(self): + return self.dto.track_id + + @track_id.setter + def track_id(self, value: int) -> None: + """ + Adding a track_id should only happen to a header row. + """ + + if self.track_id: + raise ApplicationError("Attempting to add track to row with existing track ({self=}") + + # TODO: set up write access to track_id. Should only update if + # track_id == 0. Need to update all other track fields at the + # same time. + print("set track_id attribute for {self=}, {value=}") + pass + + # Expose PlaylistRowDTO fields as properties + @property + def note(self): + return self.dto.note + + @note.setter + def note(self, value: str) -> None: + # TODO set up write access to db + print("set note attribute for {self=}, {value=}") + # self.dto.note = value + + @property + def played(self): + return self.dto.played + + @played.setter + def played(self, value: bool = True) -> None: + # TODO set up write access to db + print("set played attribute for {self=}") + # self.dto.played = value + + @property + def playlist_id(self): + return self.dto.playlist_id + + @property + def playlistrow_id(self): + return self.dto.playlistrow_id + + @property + def row_number(self): + return self.dto.row_number + + @row_number.setter + def row_number(self, value: int) -> None: + # TODO do we need to set up write access to db? + self.dto.row_number = value + def check_for_end_of_track(self) -> None: """ Check whether track has ended. If so, emit track_ended_signal @@ -552,7 +606,7 @@ class RowAndTrack: changed = True new_start_time = None else: - end_time = start + dt.timedelta(milliseconds=self.duration) + end_time = start + dt.timedelta(milliseconds=self.duration()) new_start_time = end_time if self.forecast_end_time != end_time: self.forecast_end_time = end_time @@ -640,19 +694,21 @@ class RowAndTrack: Update local playlist_id and row_number from playlistrow_id """ - plr = session.get(PlaylistRows, self.playlistrow_id) - if not plr: - raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}") - self.playlist_id = plr.playlist_id - self.row_number = plr.row_number + # TODO: only seems to be used by track_sequence + return + # plr = session.get(PlaylistRows, self.playlistrow_id) + # if not plr: + # raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}") + # self.playlist_id = plr.playlist_id + # self.row_number = plr.row_number class TrackSequence: - next: Optional[RowAndTrack] = None - current: Optional[RowAndTrack] = None - previous: Optional[RowAndTrack] = None + next: Optional[PlaylistRow] = None + current: Optional[PlaylistRow] = None + previous: Optional[PlaylistRow] = None - def set_next(self, rat: Optional[RowAndTrack]) -> None: + def set_next(self, rat: Optional[PlaylistRow]) -> None: """ Set the 'next' track to be passed rat. Clear any previous next track. If passed rat is None diff --git a/app/musicmuster.py b/app/musicmuster.py index 402d7c4..ab51968 100755 --- a/app/musicmuster.py +++ b/app/musicmuster.py @@ -75,7 +75,7 @@ from file_importer import FileImporter from helpers import ask_yes_no, file_is_unreadable, get_name from log import log from models import db, Playdates, PlaylistRows, Playlists, Queries, Settings, Tracks -from music_manager import RowAndTrack, track_sequence +from music_manager import PlaylistRow, track_sequence from playlistmodel import PlaylistModel, PlaylistProxyModel from playlists import PlaylistTab from querylistmodel import QuerylistModel @@ -2483,7 +2483,7 @@ class Window(QMainWindow): self.current.proxy_model.set_incremental_search(self.txtSearch.text()) - def selected_or_next_track_info(self) -> Optional[RowAndTrack]: + def selected_or_next_track_info(self) -> Optional[PlaylistRow]: """ Return RowAndTrack info for selected track. If no selected track, return for next track. If no next track, return None. @@ -2569,7 +2569,7 @@ class Window(QMainWindow): else: self.statusbar.clearMessage() - def show_track(self, playlist_track: RowAndTrack) -> None: + def show_track(self, playlist_track: PlaylistRow) -> None: """Scroll to show track in plt""" # Switch to the correct tab diff --git a/app/playlistmodel.py b/app/playlistmodel.py index fad13ab..55918bb 100644 --- a/app/playlistmodel.py +++ b/app/playlistmodel.py @@ -35,7 +35,6 @@ from classes import ( ApplicationError, Col, MusicMusterSignals, - PlaylistRowObj, ) from config import Config from helpers import ( @@ -49,8 +48,8 @@ from helpers import ( ) from log import log from models import db, NoteColours, Playdates, PlaylistRows, Tracks -from music_manager import RowAndTrack, track_sequence -from repository import get_playlist_rows +from music_manager import PlaylistRow, track_sequence +import repository HEADER_NOTES_COLUMN = 1 @@ -85,17 +84,19 @@ class PlaylistModel(QAbstractTableModel): self.playlist_id = playlist_id self.is_template = is_template - self.playlist_rows: dict[int, PlaylistRowObj] = {} - self.selected_rows: list[PlaylistRowObj] = [] + self.playlist_rows: dict[int, PlaylistRow] = {} + self.selected_rows: list[PlaylistRow] = [] self.signals = MusicMusterSignals() - self.signals.signal_set_next_row.connect(self.set_next_row) self.played_tracks_hidden = False + # Connect signals self.signals.begin_reset_model_signal.connect(self.begin_reset_model) self.signals.end_reset_model_signal.connect(self.end_reset_model) + self.signals.signal_add_track_to_header.connect(self.add_track_to_header) with db.Session() as session: # Ensure row numbers in playlist are contiguous + # TODO: remove this PlaylistRows.fixup_rownumbers(session, playlist_id) # Populate self.playlist_rows self.load_data(session) @@ -143,68 +144,66 @@ class PlaylistModel(QAbstractTableModel): return header_row def add_track_to_header( - self, row_number: int, track_id: int, note: Optional[str] = None + self, playlist_id: int, track_id: int, note: str = "" ) -> None: """ - Add track to existing header row + Handle signal_add_track_to_header """ - log.debug(f"{self}: add_track_to_header({row_number=}, {track_id=}, {note=}") + log.debug(f"{self}: add_track_to_header({playlist_id=}, {track_id=}, {note=}") - # Get existing row - try: - rat = self.playlist_rows[row_number] - except KeyError: - raise ApplicationError( - f"{self}: KeyError in add_track_to_header ({row_number=}, {track_id=})" - ) - if rat.path: - raise ApplicationError( - f"{self}: Header row already has track associated ({rat=}, {track_id=})" - ) - with db.Session() as session: - playlistrow = session.get(PlaylistRows, rat.playlistrow_id) - if not playlistrow: - raise ApplicationError( - f"{self}: Failed to retrieve playlist row ({rat.playlistrow_id=}" - ) - # Add track to PlaylistRows - playlistrow.track_id = track_id - # Add any further note (header will already have a note) - if note: - playlistrow.note += " " + note - session.commit() + if playlist_id != self.playlist_id: + return - # Update local copy - self.refresh_row(session, row_number) - # Repaint row - roles = [ - Qt.ItemDataRole.BackgroundRole, - Qt.ItemDataRole.DisplayRole, - Qt.ItemDataRole.FontRole, - Qt.ItemDataRole.ForegroundRole, - ] - # only invalidate required roles - self.invalidate_row(row_number, roles) + if not self.selected_rows: + return + + if len(self.selected_rows) > 1: + self.signals.show_warning_signal.emit( + "Add track to header", "Select one header to add track to" + ) + return + + selected_row = self.selected_rows[0] + if selected_row.path: + self.signals.show_warning_signal.emit( + "Add track to header", "Select header to add track to" + ) + return + + if selected_row.note: + selected_row.note += " " + note + selected_row.track_id = track_id + + # Update local copy + self.refresh_row(selected_row.row_number) + # Repaint row + roles = [ + Qt.ItemDataRole.BackgroundRole, + Qt.ItemDataRole.DisplayRole, + Qt.ItemDataRole.FontRole, + Qt.ItemDataRole.ForegroundRole, + ] + # only invalidate required roles + self.invalidate_row(row_number, roles) self.signals.resize_rows_signal.emit(self.playlist_id) - def _background_role(self, row: int, column: int, rat: RowAndTrack) -> QBrush: + def _background_role(self, row: int, column: int, plr: PlaylistRow) -> QBrush: """Return background setting""" # Handle entire row colouring # Header row if self.is_header_row(row): # Check for specific header colouring - if rat.row_bg is None: - with db.Session() as session: - rat.row_bg = NoteColours.get_colour(session, rat.note) - if rat.row_bg: - return QBrush(QColor(rat.row_bg)) + if plr.row_bg is None: + plr.row_bg = repository.get_colour(plr.note) + if plr.row_bg: + return QBrush(QColor(plr.row_bg)) else: return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST)) # Unreadable track file - if file_is_unreadable(rat.path): + if file_is_unreadable(plr.path): return QBrush(QColor(Config.COLOUR_UNREADABLE)) # Current track if ( @@ -223,22 +222,21 @@ class PlaylistModel(QAbstractTableModel): # Individual cell colouring if column == Col.START_GAP.value: - if rat.start_gap and rat.start_gap >= Config.START_GAP_WARNING_THRESHOLD: + if plr.start_gap and plr.start_gap >= Config.START_GAP_WARNING_THRESHOLD: return QBrush(QColor(Config.COLOUR_LONG_START)) if column == Col.BITRATE.value: - if not rat.bitrate or rat.bitrate < Config.BITRATE_LOW_THRESHOLD: + if not plr.bitrate or plr.bitrate < Config.BITRATE_LOW_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_LOW)) - elif rat.bitrate < Config.BITRATE_OK_THRESHOLD: + elif plr.bitrate < Config.BITRATE_OK_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM)) else: return QBrush(QColor(Config.COLOUR_BITRATE_OK)) if column == Col.NOTE.value: - if rat.note: - if rat.note_bg is None: - with db.Session() as session: - rat.note_bg = NoteColours.get_colour(session, rat.note) - if rat.note_bg: - return QBrush(QColor(rat.note_bg)) + if plr.note: + if plr.note_bg is None: + plr.row_bg = repository.get_colour(plr.note) + if plr.note_bg: + return QBrush(QColor(plr.note_bg)) return QBrush() @@ -406,7 +404,7 @@ class PlaylistModel(QAbstractTableModel): self.reset_track_sequence_row_numbers() self.update_track_times() - def _display_role(self, row: int, column: int, rat: RowAndTrack) -> str: + def _display_role(self, row: int, column: int, rat: PlaylistRow) -> str: """ Return text for display """ @@ -481,7 +479,7 @@ class PlaylistModel(QAbstractTableModel): super().endResetModel() self.reset_track_sequence_row_numbers() - def _edit_role(self, row: int, column: int, rat: RowAndTrack) -> str | int: + def _edit_role(self, row: int, column: int, rat: PlaylistRow) -> str | int: """ Return value for editing """ @@ -502,7 +500,7 @@ class PlaylistModel(QAbstractTableModel): return "" - def _foreground_role(self, row: int, column: int, rat: RowAndTrack) -> QBrush: + def _foreground_role(self, row: int, column: int, rat: PlaylistRow) -> QBrush: """Return header foreground colour or QBrush() if none""" if self.is_header_row(row): @@ -539,7 +537,7 @@ class PlaylistModel(QAbstractTableModel): return default - def _font_role(self, row: int, column: int, rat: RowAndTrack) -> QFont: + def _font_role(self, row: int, column: int, rat: PlaylistRow) -> QFont: """ Return font """ @@ -598,7 +596,7 @@ class PlaylistModel(QAbstractTableModel): log.debug(f"{self}: get_new_row_number() return: {new_row_number=}") return new_row_number - def get_row_info(self, row_number: int) -> RowAndTrack: + def get_row_info(self, row_number: int) -> PlaylistRow: """ Return info about passed row """ @@ -681,7 +679,7 @@ class PlaylistModel(QAbstractTableModel): return QVariant() - def header_text(self, rat: RowAndTrack) -> str: + def header_text(self, rat: PlaylistRow) -> str: """ Process possible section timing directives embeded in header """ @@ -735,31 +733,33 @@ class PlaylistModel(QAbstractTableModel): new_row_number = self._get_new_row_number(proposed_row_number) - with db.Session() as session: - super().beginInsertRows(QModelIndex(), new_row_number, new_row_number) - _ = PlaylistRows.insert_row( - session=session, - playlist_id=self.playlist_id, - new_row_number=new_row_number, - note=note, - track_id=track_id, - ) - session.commit() + super().beginInsertRows(QModelIndex(), new_row_number, new_row_number) - self.refresh_data(session) - super().endInsertRows() + new_row = repository.insert_row( + playlist_id=self.playlist_id, + row_number=new_row_number, + track_id=track_id, + note=note, + ) + + # Insert into self.playlist_rows + for destination_row in range(len(self.playlist_rows), new_row_number, -1): + self.playlist_rows[destination_row] = self.playlist_rows[destination_row - 1] + self.playlist_rows[new_row_number] = new_row + + super().endInsertRows() self.signals.resize_rows_signal.emit(self.playlist_id) + # TODO check this what we want to do and how we want to do it self.reset_track_sequence_row_numbers() - # only invalidate required roles - roles = [ + roles_to_invalidate = [ Qt.ItemDataRole.BackgroundRole, Qt.ItemDataRole.DisplayRole, Qt.ItemDataRole.FontRole, Qt.ItemDataRole.ForegroundRole, ] self.invalidate_rows( - list(range(new_row_number, len(self.playlist_rows))), roles + list(range(new_row_number, len(self.playlist_rows))), roles_to_invalidate ) def invalidate_row(self, modified_row: int, roles: list[Qt.ItemDataRole]) -> None: @@ -804,7 +804,7 @@ class PlaylistModel(QAbstractTableModel): return self.playlist_rows[row_number].played - def is_track_in_playlist(self, track_id: int) -> Optional[RowAndTrack]: + def is_track_in_playlist(self, track_id: int) -> Optional[PlaylistRow]: """ If this track_id is in the playlist, return the RowAndTrack object else return None @@ -841,9 +841,10 @@ class PlaylistModel(QAbstractTableModel): # new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]] # new_playlist_rows[p.row_number].row_number = p.row_number # build a new playlist_rows - new_playlist_rows: dict[int, PlaylistRowObj] = {} - for p in get_playlist_rows(self.playlist_id): - new_playlist_rows[p.row_number] = p + # shouldn't be PlaylistRow + new_playlist_rows: dict[int, PlaylistRow] = {} + for p in repository.get_playlist_rows(self.playlist_id): + new_playlist_rows[p.row_number] = PlaylistRow(p) # Copy to self.playlist_rows self.playlist_rows = new_playlist_rows @@ -1008,7 +1009,7 @@ class PlaylistModel(QAbstractTableModel): self.update_track_times() def move_track_add_note( - self, new_row_number: int, existing_rat: RowAndTrack, note: str + self, new_row_number: int, existing_rat: PlaylistRow, note: str ) -> None: """ Move existing_rat track to new_row_number and append note to any existing note @@ -1034,26 +1035,6 @@ class PlaylistModel(QAbstractTableModel): self.move_rows([existing_rat.row_number], new_row_number) self.signals.resize_rows_signal.emit(self.playlist_id) - def move_track_to_header( - self, - header_row_number: int, - existing_rat: RowAndTrack, - note: Optional[str], - ) -> None: - """ - Add the existing_rat track details to the existing header at header_row_number - """ - - log.debug( - f"{self}: move_track_to_header({header_row_number=}, {existing_rat=}, {note=}" - ) - - if existing_rat.track_id: - if note and existing_rat.note: - note += "\n" + existing_rat.note - self.add_track_to_header(header_row_number, existing_rat.track_id, note) - self.delete_rows([existing_rat.row_number]) - def obs_scene_change(self, row_number: int) -> None: """ Check this row and any preceding headers for OBS scene change command @@ -1133,20 +1114,21 @@ class PlaylistModel(QAbstractTableModel): refresh_row(). """ - # Note where each playlist_id is - plid_to_row: dict[int, int] = {} + # Note where each playlist_id is by mapping each playlistrow_id + # to its current row_number + plrid_to_row: dict[int, int] = {} for oldrow in self.playlist_rows: plrdata = self.playlist_rows[oldrow] - plid_to_row[plrdata.playlistrow_id] = plrdata.row_number + plrid_to_row[plrdata.playlistrow_id] = plrdata.row_number # build a new playlist_rows - new_playlist_rows: dict[int, RowAndTrack] = {} - for p in PlaylistRows.get_playlist_rows(session, self.playlist_id): - if p.id not in plid_to_row: - new_playlist_rows[p.row_number] = RowAndTrack(p) + new_playlist_rows: dict[int, PlaylistRow] = {} + for p in repository.get_playlist_rows(self.playlist_id): + if p.playlistrow_id not in plrid_to_row: + new_playlist_rows[p.row_number] = PlaylistRow(p) else: - new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]] - new_playlist_rows[p.row_number].row_number = p.row_number + new_playlist_row = self.playlist_rows[plrid_to_row[p.playlistrow_id]] + new_playlist_row.row_number = p.row_number # Copy to self.playlist_rows self.playlist_rows = new_playlist_rows @@ -1155,7 +1137,7 @@ class PlaylistModel(QAbstractTableModel): """Populate dict for one row from database""" p = PlaylistRows.deep_row(session, self.playlist_id, row_number) - self.playlist_rows[row_number] = RowAndTrack(p) + self.playlist_rows[row_number] = PlaylistRow(p) def remove_track(self, row_number: int) -> None: """ @@ -1344,7 +1326,7 @@ class PlaylistModel(QAbstractTableModel): return len(self.playlist_rows) - def section_subtotal_header(self, rat: RowAndTrack) -> str: + def section_subtotal_header(self, rat: PlaylistRow) -> str: """ Process this row as subtotal within a timed section and return display text for this row @@ -1589,7 +1571,7 @@ class PlaylistModel(QAbstractTableModel): self.sort_by_attribute(row_numbers, "title") - def start_of_timed_section_header(self, rat: RowAndTrack) -> str: + def start_of_timed_section_header(self, rat: PlaylistRow) -> str: """ Process this row as the start of a timed section and return display text for this row @@ -1625,7 +1607,7 @@ class PlaylistModel(QAbstractTableModel): def supportedDropActions(self) -> Qt.DropAction: return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction - def _tooltip_role(self, row: int, column: int, rat: RowAndTrack) -> str: + def _tooltip_role(self, row: int, column: int, rat: PlaylistRow) -> str: """ Return tooltip. Currently only used for last_played column. """ diff --git a/app/querylistmodel.py b/app/querylistmodel.py index 954074e..db17f20 100644 --- a/app/querylistmodel.py +++ b/app/querylistmodel.py @@ -40,7 +40,7 @@ from helpers import ( ) from log import log from models import db, Playdates, Tracks -from music_manager import RowAndTrack +from music_manager import PlaylistRow @dataclass @@ -268,7 +268,7 @@ class QuerylistModel(QAbstractTableModel): bottom_right = self.index(row, self.columnCount() - 1) self.dataChanged.emit(top_left, bottom_right, [Qt.ItemDataRole.BackgroundRole]) - def _tooltip_role(self, row: int, column: int, rat: RowAndTrack) -> str | QVariant: + def _tooltip_role(self, row: int, column: int, rat: PlaylistRow) -> str | QVariant: """ Return tooltip. Currently only used for last_played column. """ diff --git a/app/repository.py b/app/repository.py index 98d3c99..d52bb89 100644 --- a/app/repository.py +++ b/app/repository.py @@ -1,15 +1,157 @@ # Standard library imports +import re # PyQt imports # Third party imports -from sqlalchemy import select, func +from sqlalchemy import ( + func, + select, + update, +) from sqlalchemy.orm import aliased -from classes import PlaylistRowObj +from sqlalchemy.orm.session import Session +from classes import ApplicationError, PlaylistRowDTO # App imports -from classes import TrackDTO -from models import db, Tracks, PlaylistRows, Playdates +from classes import PlaylistDTO, TrackDTO +from app import helpers +from log import log +from models import ( + db, + NoteColours, + Playdates, + PlaylistRows, + Playlists, + Tracks, +) + + +# Notecolour functions +def get_colour(text: str, foreground: bool = False) -> str: + """ + Parse text and return background (foreground if foreground==True) + colour string if matched, else None + """ + + if not text: + return "" + + match = False + + with db.Session() as session: + for rec in NoteColours.get_all(session): + if rec.is_regex: + flags = re.UNICODE + if not rec.is_casesensitive: + flags |= re.IGNORECASE + p = re.compile(rec.substring, flags) + if p.match(text): + match = True + else: + if rec.is_casesensitive: + if rec.substring in text: + match = True + else: + if rec.substring.lower() in text.lower(): + match = True + + if match: + if foreground: + return rec.foreground or "" + else: + return rec.colour + return "" + + +# Track functions +def add_track_to_header(self, playlistrow_id: int, track_id: int) -> None: + """ + Add a track to this (header) row + """ + + with db.Session() as session: + session.execute( + update(PlaylistRows) + .where(PlaylistRows.id == playlistrow_id) + .values(track_id=track_id) + ) + session.commit() + + +def create_track(path: str) -> TrackDTO: + """ + Create a track db entry from a track path and return the DTO + """ + + metadata = helpers.get_all_track_metadata(path) + with db.Session() as session: + try: + track = Tracks(session=session, **metadata) + track_id = track.id + session.commit() + except Exception: + raise ApplicationError("Can't create Track") + + return track_by_id(track_id) + + +def track_by_id(track_id: int) -> TrackDTO | None: + """ + Return track with specified id + """ + + # Alias PlaydatesTable for subquery + LatestPlaydate = aliased(Playdates) + + # Subquery: latest playdate for each track + latest_playdate_subq = ( + select( + LatestPlaydate.track_id, + func.max(LatestPlaydate.lastplayed).label("lastplayed"), + ) + .group_by(LatestPlaydate.track_id) + .subquery() + ) + + stmt = ( + select( + Tracks.id.label("track_id"), + Tracks.artist, + Tracks.bitrate, + Tracks.duration, + Tracks.fade_at, + Tracks.intro, + Tracks.path, + Tracks.silence_at, + Tracks.start_gap, + Tracks.title, + latest_playdate_subq.c.lastplayed, + ) + .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) + .where(Tracks.id == track_id) + ) + + with db.Session() as session: + record = session.execute(stmt).one_or_none() + if not record: + return None + + dto = TrackDTO( + artist=record.artist, + bitrate=record.bitrate, + duration=record.duration, + fade_at=record.fade_at, + intro=record.intro, + lastplayed=record.lastplayed, + path=record.path, + silence_at=record.silence_at, + start_gap=record.start_gap, + title=record.title, + track_id=record.track_id, + ) + + return dto def tracks_like_title(filter_str: str) -> list[TrackDTO]: @@ -29,7 +171,87 @@ def tracks_like_title(filter_str: str) -> list[TrackDTO]: ] -def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]: +# Playlist functions +def _check_row_number_sequence( + session: Session, playlist_id: int, fix: bool = False +) -> None: + """ + The row numbers for any playlist should run from 0 to (length - 1). + This function checks that that is the case. + + If there are errors, 'fix' determines what action is taken. + + If fix == True: + Fix the row numbers and save to database. Log at info level. + If fix == False: + Log at error level and raise ApplicationError + """ + + errors = False + + playlist_rows = session.scalars( + select(PlaylistRows) + .where(PlaylistRows.playlist_id == playlist_id) + .order_by(PlaylistRows.row_number) + ).all() + + for idx, playlist_row in enumerate(playlist_rows): + if playlist_row.row_number != idx: + errors = True + msg = f"_check_row_number_sequence({playlist_id=}, {fix=}, {playlist_row=}, {idx=}" + if fix: + log.info(msg) + playlist_row.row_number = idx + else: + log.error(msg) + raise ApplicationError(msg) + + if errors: + session.commit() + + +def _move_rows_down( + session: Session, playlist_id: int, starting_row: int, move_by: int +) -> None: + """ + Create space to insert move_by additional rows by incremented row + number from starting_row to end of playlist + """ + + log.debug(f"(_move_rows_down({playlist_id=}, {starting_row=}, {move_by=}") + + session.execute( + update(PlaylistRows) + .where( + (PlaylistRows.playlist_id == playlist_id), + (PlaylistRows.row_number >= starting_row), + ) + .values(row_number=PlaylistRows.row_number + move_by) + ) + session.commit() + + +def create_playlist(name: str, template_id: int) -> PlaylistDTO: + """ + Create playlist and return DTO. + """ + + with db.Session() as session: + try: + playlist = Playlists(session, name, template_id) + playlist_id = playlist.id + session.commit() + except Exception: + raise ApplicationError("Can't create Playlist") + + return playlist_by_id(playlist_id) + + +def get_playlist_row(playlist_row_id: int) -> PlaylistRowDTO | None: + """ + Return specific row DTO + """ + # Alias PlaydatesTable for subquery LatestPlaydate = aliased(Playdates) @@ -37,7 +259,94 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]: latest_playdate_subq = ( select( LatestPlaydate.track_id, - func.max(LatestPlaydate.lastplayed).label("lastplayed") + func.max(LatestPlaydate.lastplayed).label("lastplayed"), + ) + .group_by(LatestPlaydate.track_id) + .subquery() + ) + + stmt = ( + select( + PlaylistRows.id.label("playlistrow_id"), + PlaylistRows.row_number, + PlaylistRows.note, + PlaylistRows.played, + PlaylistRows.playlist_id, + Tracks.id.label("track_id"), + Tracks.artist, + Tracks.bitrate, + Tracks.duration, + Tracks.fade_at, + Tracks.intro, + Tracks.path, + Tracks.silence_at, + Tracks.start_gap, + Tracks.title, + latest_playdate_subq.c.lastplayed, + ) + .outerjoin(Tracks, PlaylistRows.track_id == Tracks.id) + .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) + .where(PlaylistRows.id == playlist_row_id) + .order_by(PlaylistRows.row_number) + ) + + with db.Session() as session: + record = session.execute(stmt).one_or_none() + if not record: + return None + + # Handle cases where track_id is None (no track associated) + if record.track_id is None: + dto = PlaylistRowDTO( + artist="", + bitrate=0, + duration=0, + fade_at=0, + intro=None, + lastplayed=None, + note=record.note, + path="", + played=record.played, + playlist_id=record.playlist_id, + playlistrow_id=record.playlistrow_id, + row_number=record.row_number, + silence_at=0, + start_gap=0, + title="", + track_id=-1, + ) + else: + dto = PlaylistRowDTO( + artist=record.artist, + bitrate=record.bitrate, + duration=record.duration, + fade_at=record.fade_at, + intro=record.intro, + lastplayed=record.lastplayed, + note=record.note, + path=record.path, + played=record.played, + playlist_id=record.playlist_id, + playlistrow_id=record.playlistrow_id, + row_number=record.row_number, + silence_at=record.silence_at, + start_gap=record.start_gap, + title=record.title, + track_id=record.track_id, + ) + + return dto + + +def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]: + # Alias PlaydatesTable for subquery + LatestPlaydate = aliased(Playdates) + + # Subquery: latest playdate for each track + latest_playdate_subq = ( + select( + LatestPlaydate.track_id, + func.max(LatestPlaydate.lastplayed).label("lastplayed"), ) .group_by(LatestPlaydate.track_id) .subquery() @@ -75,7 +384,7 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]: for row in results: # Handle cases where track_id is None (no track associated) if row.track_id is None: - dto = PlaylistRowObj( + dto = PlaylistRowDTO( artist="", bitrate=0, duration=0, @@ -95,7 +404,7 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]: # Additional fields like row_fg, row_bg, etc., use default None values ) else: - dto = PlaylistRowObj( + dto = PlaylistRowDTO( artist=row.artist, bitrate=row.bitrate, duration=row.duration, @@ -117,3 +426,62 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]: dto_list.append(dto) return dto_list + + +def insert_row(playlist_id: int, row_number: int, track_id: int, note: str) -> PlaylistRowDTO: + """ + Insert a new row into playlist and return new row DTO + """ + + with db.Session() as session: + # Sanity check + _check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False) + + # Make space for new row + _move_rows_down( + session=session, playlist_id=playlist_id, starting_row=row_number, move_by=1 + ) + + playlist_row = PlaylistRows.insert_row( + session=session, + playlist_id=playlist_id, + new_row_number=row_number, + note=note, + track_id=track_id, + ) + session.commit() + playlist_row_id = playlist_row.id + + # Sanity check + _check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False) + + return get_playlist_row(playlist_row_id=playlist_row_id) + + +def playlist_by_id(playlist_id: int) -> PlaylistDTO | None: + """ + Return playlist with specified id + """ + + stmt = select( + Playlists.id.label("playlist_id"), + Playlists.name, + Playlists.favourite, + Playlists.is_template, + Playlists.open, + ).where(Playlists.id == playlist_id) + + with db.Session() as session: + record = session.execute(stmt).one_or_none() + if not record: + return None + + dto = PlaylistDTO( + name=record.name, + playlist_id=record.playlist_id, + favourite=record.favourite, + is_template=record.is_template, + open=record.open, + ) + + return dto diff --git a/tests/template_test_harness.py b/tests/template_test_harness.py new file mode 100644 index 0000000..b01b20b --- /dev/null +++ b/tests/template_test_harness.py @@ -0,0 +1,40 @@ +# Standard library imports +import unittest + +# PyQt imports + +# Third party imports + +# App imports +from app.models import ( + db, +) + + +class MyTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + """Runs once before any test in this class""" + + pass + + @classmethod + def tearDownClass(cls): + """Runs once after all tests""" + + pass + + def setUp(self): + """Runs before each test""" + + db.create_all() + + def tearDown(self): + """Runs after each test""" + + db.drop_all() + + def test_xxx(self): + """Comment""" + + pass diff --git a/tests/test_db_updates.py b/tests/test_db_updates.py new file mode 100644 index 0000000..fb89e17 --- /dev/null +++ b/tests/test_db_updates.py @@ -0,0 +1,76 @@ +# Standard library imports +import unittest + +# PyQt imports + +# Third party imports + +# App imports +from app import playlistmodel +from app import repository +from app.models import db + + +class MyTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + """Runs once before any test in this class""" + + pass + + @classmethod + def tearDownClass(cls): + """Runs once after all tests""" + + pass + + def setUp(self): + """Runs before each test""" + + db.create_all() + + # Create a playlist and model + playlist_name = "my playlist" + self.playlist = repository.create_playlist(name=playlist_name, template_id=0) + assert self.playlist + self.model = playlistmodel.PlaylistModel( + self.playlist.playlist_id, is_template=False + ) + assert self.model + + # Create tracks + track1_path = "testdata/isa.mp3" + self.track1 = repository.create_track(track1_path) + + track2_path = "testdata/mom.mp3" + self.track2 = repository.create_track(track2_path) + + # Add tracks and header to playlist + repository.insert_row( + self.playlist.playlist_id, + row_number=0, + track_id=self.track1.track_id, + note="track 1", + ) + repository.insert_row( + self.playlist.playlist_id, + row_number=1, + track_id=0, + note="Header row", + ) + repository.insert_row( + self.playlist.playlist_id, + row_number=2, + track_id=self.track2.track_id, + note="track 2", + ) + + def tearDown(self): + """Runs after each test""" + + db.drop_all() + + def test_xxx(self): + """Comment""" + + pass