From 4e89d72a8fc477ba2d3b37cd1618b44c1242eae4 Mon Sep 17 00:00:00 2001 From: Keith Edmunds Date: Sat, 22 Mar 2025 18:53:14 +0000 Subject: [PATCH] WIP: move within playlist tests working --- app/classes.py | 9 ++ app/file_importer.py | 12 +- app/models.py | 18 --- app/musicmuster.py | 6 +- app/playlistmodel.py | 278 ++++++++++++++---------------------- app/repository.py | 217 +++++++++++++++++++++------- kae.py | 166 +++++++++++++++++++++ tests/test_db_updates.py | 78 ---------- tests/test_playlistmodel.py | 116 --------------- tests/test_repository.py | 252 ++++++++++++++++++++++++++++++++ 10 files changed, 710 insertions(+), 442 deletions(-) create mode 100755 kae.py delete mode 100644 tests/test_db_updates.py create mode 100644 tests/test_repository.py diff --git a/app/classes.py b/app/classes.py index b48ddc1..290802a 100644 --- a/app/classes.py +++ b/app/classes.py @@ -163,6 +163,13 @@ class TrackInfo(NamedTuple): # Classes for signals +@dataclass +class InsertRows: + playlist_id: int + from_row: int + to_row: int + + @dataclass class InsertTrack: playlist_id: int @@ -188,6 +195,8 @@ class MusicMusterSignals(QObject): search_wikipedia_signal = pyqtSignal(str) show_warning_signal = pyqtSignal(str, str) signal_add_track_to_header = pyqtSignal(int, int) + signal_begin_insert_rows = pyqtSignal(InsertRows) + signal_end_insert_rows = pyqtSignal(int) signal_insert_track = pyqtSignal(InsertTrack) signal_playlist_selected_rows = pyqtSignal(int, list) signal_set_next_row = pyqtSignal(int) diff --git a/app/file_importer.py b/app/file_importer.py index 18ec7a3..7d0f2be 100644 --- a/app/file_importer.py +++ b/app/file_importer.py @@ -32,6 +32,7 @@ from classes import ( MusicMusterSignals, singleton, Tags, + TrackDTO, ) from config import Config from helpers import ( @@ -40,7 +41,6 @@ from helpers import ( show_OK, ) from log import log -from models import db, Tracks from playlistrow import TrackSequence from playlistmodel import PlaylistModel import helpers @@ -122,13 +122,7 @@ class FileImporter: # Get signals self.signals = MusicMusterSignals() - def _get_existing_tracks(self) -> Sequence[Tracks]: - """ - Return a list of all existing Tracks - """ - - with db.Session() as session: - return Tracks.get_all(session) + self.existing_tracks: list[TrackDTO] = [] def start(self) -> None: """ @@ -148,7 +142,7 @@ class FileImporter: # Refresh list of existing tracks as they may have been updated # by previous imports - self.existing_tracks = self._get_existing_tracks() + self.existing_tracks = repository.get_all_tracks() for infile in [ os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f) diff --git a/app/models.py b/app/models.py index fe62ea3..96cb156 100644 --- a/app/models.py +++ b/app/models.py @@ -438,24 +438,6 @@ class PlaylistRows(dbtables.PlaylistRowsTable): ) ) - @staticmethod - def fixup_rownumbers(session: Session, playlist_id: int) -> None: - """ - Ensure the row numbers for passed playlist have no gaps - """ - - plrs = session.scalars( - select(PlaylistRows) - .where(PlaylistRows.playlist_id == playlist_id) - .order_by(PlaylistRows.row_number) - ).all() - - for i, plr in enumerate(plrs): - plr.row_number = i - - # Ensure new row numbers are available to the caller - session.commit() - @classmethod def plrids_to_plrs( cls, session: Session, playlist_id: int, plr_ids: list[int] diff --git a/app/musicmuster.py b/app/musicmuster.py index d673563..bb23a98 100755 --- a/app/musicmuster.py +++ b/app/musicmuster.py @@ -1180,7 +1180,7 @@ class Window(QMainWindow): self.footer_section.widgetFadeVolume.setDefaultPadding(0) self.footer_section.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND) - self.move_source_rows: Optional[list[int]] = None + self.move_source_rows: list[PlaylistRow] = [] self.move_source_model: Optional[PlaylistModel] = None self.disable_selection_timing = False @@ -2032,7 +2032,7 @@ class Window(QMainWindow): # Save the selected PlaylistRows items ready for a later # paste - self.move_source_rows = self.current.selected_row_numbers + self.move_source_rows = self.current.base_model.selected_rows self.move_source_model = self.current.base_model log.debug( @@ -2667,6 +2667,8 @@ class Window(QMainWindow): Update track clocks. """ + self.timer1000.stop() + raise ApplicationError("test") # If track is playing, update track clocks time and colours if self.track_sequence.current and self.track_sequence.current.is_playing(): # Elapsed time diff --git a/app/playlistmodel.py b/app/playlistmodel.py index f351fa8..5bdd56e 100644 --- a/app/playlistmodel.py +++ b/app/playlistmodel.py @@ -34,6 +34,7 @@ import obswebsocket # type: ignore from classes import ( ApplicationError, Col, + InsertRows, MusicMusterSignals, ) from config import Config @@ -94,16 +95,14 @@ class PlaylistModel(QAbstractTableModel): self.signals.begin_reset_model_signal.connect(self.begin_reset_model) self.signals.end_reset_model_signal.connect(self.end_reset_model) self.signals.signal_add_track_to_header.connect(self.add_track_to_header) + self.signals.signal_begin_insert_rows.connect(self.begin_insert_rows) + self.signals.signal_end_insert_rows.connect(self.end_insert_rows) self.signals.signal_playlist_selected_rows.connect(self.set_selected_rows) self.signals.signal_set_next_row.connect(self.set_next_row) - with db.Session() as session: - # Ensure row numbers in playlist are contiguous - # TODO: remove this - PlaylistRows.fixup_rownumbers(session, playlist_id) - # Populate self.playlist_rows - self.load_data() + for dto in repository.get_playlist_rows(self.playlist_id): + self.playlist_rows[dto.row_number] = PlaylistRow(dto) self.update_track_times() def __repr__(self) -> str: @@ -389,25 +388,18 @@ class PlaylistModel(QAbstractTableModel): Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows calls. To keep it simple, if inefficient, delete rows one by one. - TODO: delete in blocks - Delete from highest row back so that not yet deleted row numbers don't change. """ - with db.Session() as session: - for row_number in sorted(row_numbers, reverse=True): - log.debug(f"{self}: delete_rows(), {row_number=}") - super().beginRemoveRows(QModelIndex(), row_number, row_number) - # We need to remove data from the underlying data store, - # which is the database, but we cache in - # self.playlist_rows, which is what calls to data() - # reads, so fixup that too. - PlaylistRows.delete_row(session, self.playlist_id, row_number) - PlaylistRows.fixup_rownumbers(session, self.playlist_id) - self.refresh_data(session) - session.commit() - super().endRemoveRows() + for row_group in self._reversed_contiguous_row_groups(row_numbers): + # Signal that rows will be removed + super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group)) + # Remove rows from data store + repository.remove_rows(self.playlist_id, row_group) + # Signal that data store has been updated + super().endRemoveRows() + self.refresh_data() self.track_sequence.update() self.update_track_times() @@ -825,39 +817,6 @@ class PlaylistModel(QAbstractTableModel): return None - def load_data(self) -> None: - """ - Same as refresh data, but only used when creating playslit. - Distinguishes profile time between initial load and other - refreshes. - """ - - # We used to clear self.playlist_rows each time but that's - # expensive and slow on big playlists - - # # Note where each playlist_id is - # plid_to_row: dict[int, int] = {} - # for oldrow in self.playlist_rows: - # plrdata = self.playlist_rows[oldrow] - # plid_to_row[plrdata.playlistrow_id] = plrdata.row_number - - # build a new playlist_rows - # new_playlist_rows: dict[int, RowAndTrack] = {} - # for p in PlaylistRows.get_playlist_rows(session, self.playlist_id): - # if p.id not in plid_to_row: - # new_playlist_rows[p.row_number] = RowAndTrack(p) - # else: - # new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]] - # new_playlist_rows[p.row_number].row_number = p.row_number - # build a new playlist_rows - # shouldn't be PlaylistRow - new_playlist_rows: dict[int, PlaylistRow] = {} - for dto in repository.get_playlist_rows(self.playlist_id): - new_playlist_rows[dto.row_number] = PlaylistRow(dto) - - # Copy to self.playlist_rows - self.playlist_rows = new_playlist_rows - def mark_unplayed(self, row_numbers: list[int]) -> None: """ Mark row as unplayed @@ -874,78 +833,81 @@ class PlaylistModel(QAbstractTableModel): ] self.invalidate_rows(row_numbers, roles) - def move_rows(self, from_rows: list[int], to_row_number: int) -> None: + def move_rows(self, from_rows: list[PlaylistRow], to_row_number: int) -> None: """ Move the playlist rows given to to_row and below. """ log.debug(f"{self}: move_rows({from_rows=}, {to_row_number=}") - # Build a {current_row_number: new_row_number} dictionary - row_map: dict[int, int] = {} + # Don't move current row + if self.track_sequence.current: + current_row = self.track_sequence.current.row_number + if current_row in from_rows: + log.debug( + "move_rows: Removing {current_row=} from {from_rows=}" + ) + from_rows.remove(self.track_sequence.current.row_number) - # The destination row number will need to be reduced by the - # number of rows being move from above the destination row - # otherwise rows below the destination row will end up above the - # moved rows. - adjusted_to_row = to_row_number - len( - [a for a in from_rows if a < to_row_number] - ) + # Row moves must be wrapped in beginMoveRows .. endMoveRows and + # the row range must be contiguous. Process the highest rows + # first so the lower row numbers are unchanged - # Put the from_row row numbers into the row_map. Ultimately the - # total number of elements in the playlist doesn't change, so - # check that adding the moved rows starting at to_row won't - # overshoot the end of the playlist. - if adjusted_to_row + len(from_rows) > len(self.playlist_rows): - next_to_row = len(self.playlist_rows) - len(from_rows) - else: - next_to_row = adjusted_to_row + row_groups = self._reversed_contiguous_row_groups([a.row_number for a in from_rows]) - # zip iterates from_row and to_row simultaneously from the - # respective sequences inside zip() - for from_row, to_row in zip( - from_rows, range(next_to_row, next_to_row + len(from_rows)) - ): - row_map[from_row] = to_row - - # Move the remaining rows to the row_map. We want to fill it - # before (if there are gaps) and after (likewise) the rows that - # are moving. - # zip iterates old_row and new_row simultaneously from the - # respective sequences inside zip() - for old_row, new_row in zip( - [x for x in self.playlist_rows.keys() if x not in from_rows], - [y for y in range(len(self.playlist_rows)) if y not in row_map.values()], - ): - # Optimise: only add to map if there is a change - if old_row != new_row: - row_map[old_row] = new_row - - # For SQLAlchemy, build a list of dictionaries that map playlistrow_id to - # new row number: - sqla_map: list[dict[str, int]] = [] - for oldrow, newrow in row_map.items(): - playlistrow_id = self.playlist_rows[oldrow].playlistrow_id - sqla_map.append({"playlistrow_id": playlistrow_id, "row_number": newrow}) - - with db.Session() as session: - PlaylistRows.update_plr_row_numbers(session, self.playlist_id, sqla_map) - session.commit() - # Update playlist_rows - self.refresh_data(session) + # Handle the moves in row_group chunks + for row_group in row_groups: + # Tell model we will be moving rows + # See https://doc.qt.io/qt-6/qabstractitemmodel.html#beginMoveRows + # for how destination is calculated + destination = to_row_number + if to_row_number > max(row_group): + destination = to_row_number - max(row_group) + 1 + super().beginMoveRows(QModelIndex(), + min(row_group), + max(row_group), + QModelIndex(), + destination + ) + # Update database + repository.move_rows_within_playlist(self.playlist_id, row_group, to_row_number) + # Tell model we have finished moving rows + super().endMoveRows() # Update display + self.refresh_data() self.track_sequence.update() self.update_track_times() - # only invalidate required roles - roles = [ - Qt.ItemDataRole.DisplayRole, - ] - self.invalidate_rows(list(row_map.keys()), roles) + # TODO: do we need this? + # # only invalidate required roles + # roles = [ + # Qt.ItemDataRole.DisplayRole, + # ] + # self.invalidate_rows(list(row_map.keys()), roles) + + def begin_insert_rows(self, insert_rows: InsertRows) -> None: + """ + Prepare model to insert rows + """ + + if insert_rows.playlist_id != self.playlist_id: + return + + super().beginInsertRows(QModelIndex(), insert_rows.from_row, insert_rows.to_row) + + def end_insert_rows(self, playlist_id: int) -> None: + """ + End insert rows + """ + + if playlist_id != self.playlist_id: + return + + super().endInsertRows() def move_rows_between_playlists( self, - from_rows: list[int], + from_rows: list[PlaylistRow], to_row_number: int, to_playlist_id: int, ) -> None: @@ -958,56 +920,46 @@ class PlaylistModel(QAbstractTableModel): f"{to_row_number=}, {to_playlist_id=}" ) - # Row removal must be wrapped in beginRemoveRows .. - # endRemoveRows and the row range must be contiguous. Process - # the highest rows first so the lower row numbers are unchanged - row_groups = self._reversed_contiguous_row_groups(from_rows) - - # Prepare destination playlist for a reset - self.signals.begin_reset_model_signal.emit(to_playlist_id) - - with db.Session() as session: - for row_group in row_groups: - # Make room in destination playlist - max_destination_row_number = PlaylistRows.get_last_used_row( - session, to_playlist_id + # Don't move current row + if self.track_sequence.current: + current_row = self.track_sequence.current.row_number + if current_row in from_rows: + log.debug( + "move_rows_between_playlists: Removing {current_row=} from {from_rows=}" ) - if ( - max_destination_row_number - and to_row_number <= max_destination_row_number - ): - # Move the destination playlist rows down to make room. - PlaylistRows.move_rows_down( - session, to_playlist_id, to_row_number, len(row_group) - ) - next_to_row = to_row_number + from_rows.remove(self.track_sequence.current.row_number) - super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group)) - for playlist_row in PlaylistRows.plrids_to_plrs( - session, - self.playlist_id, - [self.playlist_rows[a].playlistrow_id for a in row_group], - ): - if ( - self.track_sequence.current - and playlist_row.id == self.track_sequence.current.playlistrow_id - ): - # Don't move current track - continue - playlist_row.playlist_id = to_playlist_id - playlist_row.row_number = next_to_row - next_to_row += 1 - self.refresh_data(session) - super().endRemoveRows() - # We need to remove gaps in row numbers after tracks have - # moved. - PlaylistRows.fixup_rownumbers(session, self.playlist_id) - self.refresh_data(session) - session.commit() + # Row removal must be wrapped in beginRemoveRows .. endRemoveRows + # and the row range must be contiguous. Process the highest rows + # first so the lower row numbers are unchanged - # Reset of model must come after session has been closed + row_groups = self._reversed_contiguous_row_groups([a.row_number for a in from_rows]) + + # Handle the moves in row_group chunks + + # TODO: use bool QAbstractItemModel::beginMoveRows(const + # QModelIndex &sourceParent, int sourceFirst, int sourceLast, + # const QModelIndex &destinationParent, int destinationChild) + + for row_group in row_groups: + # Prepare source model + super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group)) + # Prepare destination model + insert_rows = InsertRows(to_playlist_id, + to_row_number, + to_row_number + len(row_group) + ) + self.signals.signal_begin_insert_rows.emit(insert_rows) + repository.move_rows_to_playlist(from_rows=row_group, + from_playlist_id=self.playlist_id, + to_row=to_row_number, + to_playlist_id=to_playlist_id + ) + self.signals.signal_end_insert_rows.emit(to_playlist_id) + super().endRemoveRows() + + self.refresh_data() self.track_sequence.update() - self.signals.end_reset_model_signal.emit(to_playlist_id) self.update_track_times() def move_track_add_note( @@ -1101,16 +1053,9 @@ class PlaylistModel(QAbstractTableModel): ] self.invalidate_row(self.track_sequence.previous.row_number, roles) - def refresh_data(self, session: Session) -> None: + def refresh_data(self) -> None: """ Populate self.playlist_rows with playlist data - - We used to clear self.playlist_rows each time but that's - expensive and slow on big playlists. Instead we track where rows - are in database versus self.playlist_rows and fixup the latter. - This works well for news rows added and for rows moved, but - doesn't work for changed comments so they must be handled using - refresh_row(). """ # Note where each playlist_id is by mapping each playlistrow_id @@ -1216,9 +1161,7 @@ class PlaylistModel(QAbstractTableModel): ] self.invalidate_rows(row_numbers, roles) - def _reversed_contiguous_row_groups( - self, row_numbers: list[int] - ) -> list[list[int]]: + def _reversed_contiguous_row_groups(self, row_numbers: list[int]) -> list[list[int]]: """ Take the list of row numbers and split into groups of contiguous rows. Return as a list of lists with the highest row numbers first. @@ -1233,6 +1176,7 @@ class PlaylistModel(QAbstractTableModel): result: list[list[int]] = [] temp: list[int] = [] last_value = row_numbers[0] - 1 + row_numbers.sort() for idx in range(len(row_numbers)): if row_numbers[idx] != last_value + 1: diff --git a/app/repository.py b/app/repository.py index 10fbe71..5330b7a 100644 --- a/app/repository.py +++ b/app/repository.py @@ -5,13 +5,14 @@ import re # Third party imports from sqlalchemy import ( + delete, func, select, update, ) from sqlalchemy.orm import aliased from sqlalchemy.orm.session import Session -from sqlalchemy.sql.elements import BinaryExpression +from sqlalchemy.sql.elements import BinaryExpression, ColumnElement from classes import ApplicationError, PlaylistRowDTO # App imports @@ -113,6 +114,12 @@ def create_track(path: str) -> TrackDTO: return new_track +def get_all_tracks() -> list[TrackDTO]: + """Return a list of all tracks""" + + return _tracks_where(Tracks.id > 0) + + def track_by_id(track_id: int) -> TrackDTO | None: """ Return track with specified id @@ -171,7 +178,7 @@ def track_by_id(track_id: int) -> TrackDTO | None: return dto -def _tracks_like(where: BinaryExpression) -> list[TrackDTO]: +def _tracks_where(where: BinaryExpression | ColumnElement[bool]) -> list[TrackDTO]: """ Return tracks selected by where """ @@ -235,7 +242,7 @@ def tracks_like_artist(filter_str: str) -> list[TrackDTO]: Return tracks where artist is like filter """ - return _tracks_like(Tracks.artist.ilike(f"%{filter_str}%")) + return _tracks_where(Tracks.artist.ilike(f"%{filter_str}%")) def tracks_like_title(filter_str: str) -> list[TrackDTO]: @@ -243,54 +250,16 @@ def tracks_like_title(filter_str: str) -> list[TrackDTO]: Return tracks where title is like filter """ - return _tracks_like(Tracks.title.ilike(f"%{filter_str}%")) + return _tracks_where(Tracks.title.ilike(f"%{filter_str}%")) # Playlist functions -def _check_row_number_sequence( - session: Session, playlist_id: int, fix: bool = False -) -> None: - """ - The row numbers for any playlist should run from 0 to (length - 1). - This function checks that that is the case. - - If there are errors, 'fix' determines what action is taken. - - If fix == True: - Fix the row numbers and save to database. Log at info level. - If fix == False: - Log at error level and raise ApplicationError - """ - - errors = False - - playlist_rows = session.scalars( - select(PlaylistRows) - .where(PlaylistRows.playlist_id == playlist_id) - .order_by(PlaylistRows.row_number) - ).all() - - for idx, playlist_row in enumerate(playlist_rows): - if playlist_row.row_number != idx: - errors = True - msg = f"_check_row_number_sequence({playlist_id=}, {fix=}, {playlist_row=}, {idx=}" - if fix: - log.info(msg) - playlist_row.row_number = idx - else: - log.error(msg) - raise ApplicationError(msg) - - if errors: - session.commit() - - -def _move_rows_down( +def _move_rows( session: Session, playlist_id: int, starting_row: int, move_by: int ) -> None: """ - Create space to insert move_by additional rows by incremented row - number from starting_row to end of playlist + Move rows from starting_row by move_by. If move_by is +ve, move rows + down; if -ve, move them up. """ log.debug(f"(_move_rows_down({playlist_id=}, {starting_row=}, {move_by=}") @@ -306,6 +275,101 @@ def _move_rows_down( session.commit() +def move_rows_to_playlist( + from_rows: list[int], from_playlist_id: int, to_row: int, to_playlist_id: int +) -> None: + """ + Move rows between playlists. + """ + + with db.Session() as session: + # Prepare desination playlist + # Find last used row + last_row = session.execute( + select(func.max(PlaylistRows.row_number)).where( + PlaylistRows.playlist_id == to_playlist_id + ) + ).scalar_one() + if last_row is None: + last_row = -1 + # Make room in destination + if to_row <= last_row: + _move_rows(session, to_playlist_id, to_row, len(from_rows)) + # Move rows + row_offset = to_row - min(from_rows) + stmt = ( + update(PlaylistRows) + .where( + PlaylistRows.playlist_id == from_playlist_id, + PlaylistRows.row_number.in_(from_rows) + ) + .values( + playlist_id=to_playlist_id, + row_number=PlaylistRows.row_number + row_offset + ) + ) + session.execute(stmt) + # Remove gaps in source + _move_rows(session=session, + playlist_id=from_playlist_id, + starting_row=max(from_rows) + 1, + move_by=(len(from_rows) * -1) + ) + # Commit changes + session.commit() + # Sanity check + _check_playlist_integrity(session, get_playlist_rows(from_playlist_id), fix=False) + _check_playlist_integrity(session, get_playlist_rows(to_playlist_id), fix=False) + + +def move_rows_within_playlist(playlist_id: int, from_rows: list[int], to_row: int) -> None: + """ + Move rows within a playlist. + """ + + log.debug(f"move_rows_within_playlist({playlist_id=}, {from_rows=}, {to_row=})") + + playlistrows_dto = get_playlist_rows(playlist_id) + new_order: dict[int, int | None] = dict.fromkeys(range(len(playlistrows_dto))) + + # Populate new_order with moved rows + next_row = to_row + # We need to keep, where possible, the rows after to_row unmoved + if to_row + len(from_rows) > len(playlistrows_dto): + next_row = max(to_row - len(from_rows) + 1, 0) + for from_row in from_rows: + new_order[next_row] = from_row + next_row += 1 + + # Move remaining rows + remaining_rows = set(new_order.keys()) - set(from_rows) + next_row = 0 + for row in remaining_rows: + while new_order[next_row] is not None: + next_row += 1 + new_order[next_row] = row + next_row += 1 + + # Sanity check + if None in new_order: + raise ApplicationError(f"None remains after move: {new_order=}") + + # Update database + # Build a list of dicts of (id: value, row_number: value} + update_list = [] + for new_row_number, old_row_number in new_order.items(): + plrid = [a.playlistrow_id for a in playlistrows_dto if a.row_number == old_row_number][0] + update_list.append(dict(id=plrid, row_number=new_row_number)) + + # Update rows + with db.Session() as session: + session.execute(update(PlaylistRows), update_list) + session.commit() + + # Sanity check + _check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False) + + def create_playlist(name: str, template_id: int) -> PlaylistDTO: """ Create playlist and return DTO. @@ -417,6 +481,27 @@ def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None: return dto +def _check_playlist_integrity( + session: Session, playlist_rows: list[PlaylistRowDTO], fix: bool = False +) -> None: + """ + Ensure the row numbers are contiguous. Fix and log if fix==True, + else raise ApplicationError. + """ + + for idx, plr in enumerate(playlist_rows): + if plr.row_number == idx: + continue + + msg = f"_check_playlist_integrity: incorrect row number ({plr.playlistrow_id=}, {idx=})" + if fix: + log.debug(msg) + plr.row_number = idx + session.commit() + else: + raise ApplicationError(msg) + + def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]: # Alias PlaydatesTable for subquery LatestPlaydate = aliased(Playdates) @@ -458,6 +543,9 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]: with db.Session() as session: results = session.execute(stmt).all() + # Sanity check + # TODO: would be good to be confident at removing this + _check_playlist_integrity(session=session, playlist_rows=results, fix=False) dto_list = [] for row in results: @@ -516,10 +604,10 @@ def insert_row( with db.Session() as session: # Sanity check - _check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False) + _check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False) # Make space for new row - _move_rows_down( + _move_rows( session=session, playlist_id=playlist_id, starting_row=row_number, move_by=1 ) @@ -534,7 +622,7 @@ def insert_row( playlist_row_id = playlist_row.id # Sanity check - _check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False) + _check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False) new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id) if not new_playlist_row: @@ -543,6 +631,29 @@ def insert_row( return new_playlist_row +def remove_rows(playlist_id: int, row_numbers: list[int]) -> None: + """ + Remove rows from playlist + + Delete from highest row back so that not yet deleted row numbers don't change. + """ + + log.debug(f"remove_rows({playlist_id=}, {row_numbers=}") + + with db.Session() as session: + for row_number in sorted(row_numbers, reverse=True): + session.execute( + delete(PlaylistRows).where( + PlaylistRows.playlist_id == playlist_id, + PlaylistRows.row_number == row_number, + ) + ) + # Fixup row number to remove gaps + _check_playlist_integrity(session, playlist_id, fix=True) + + session.commit() + + def playlist_by_id(playlist_id: int) -> PlaylistDTO | None: """ Return playlist with specified id @@ -579,7 +690,9 @@ def get_setting(name: str) -> int | None: """ with db.Session() as session: - record = session.execute(select(Settings).where(Settings.name == name)).one_or_none() + record = session.execute( + select(Settings).where(Settings.name == name) + ).one_or_none() if not record: return None @@ -592,12 +705,12 @@ def set_setting(name: str, value: int) -> None: """ with db.Session() as session: - record = session.execute(select(Settings).where(Settings.name == name)).one_or_none() + record = session.execute( + select(Settings).where(Settings.name == name) + ).one_or_none() if not record: record = Settings(session=session, name=name) if not record: raise ApplicationError("Can't create Settings record") record.f_int = value session.commit() - - diff --git a/kae.py b/kae.py new file mode 100755 index 0000000..ad1fdbe --- /dev/null +++ b/kae.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 + +import sys +import datetime as dt +from dataclasses import dataclass +from PyQt6.QtWidgets import ( + QApplication, QDialog, QLabel, QLineEdit, QListWidget, + QVBoxLayout, QHBoxLayout, QPushButton, QListWidgetItem +) +from PyQt6.QtCore import Qt, pyqtSignal + +@dataclass +class TrackDTO: + track_id: int + artist: str + bitrate: int + duration: int # milliseconds + fade_at: int + intro: int | None + path: str + silence_at: int + start_gap: int + title: str + lastplayed: dt.datetime | None + +# Placeholder external function to simulate search +def search_titles(query: str) -> list[TrackDTO]: + now = dt.datetime.now() + dummy_tracks = [ + TrackDTO(1, "Artist A", 320, 210000, 0, None, "", 0, 0, "Title One", now - dt.timedelta(days=2)), + TrackDTO(2, "Artist B", 256, 185000, 0, None, "", 0, 0, "Another Title", now - dt.timedelta(days=30)), + TrackDTO(3, "Artist C", 320, 240000, 0, None, "", 0, 0, "More Music", None), + ] + return [t for t in dummy_tracks if query.lower() in t.title.lower()] + +def format_duration(ms: int) -> str: + minutes, seconds = divmod(ms // 1000, 60) + return f"{minutes}:{seconds:02d}" + +def friendly_last_played(lastplayed: dt.datetime | None) -> str: + if lastplayed is None: + return "(Never)" + now = dt.datetime.now() + delta = now - lastplayed + days = delta.days + + if days == 0: + return "(Today)" + elif days == 1: + return "(Yesterday)" + + years, days_remain = divmod(days, 365) + months, days_final = divmod(days_remain, 30) + + parts = [] + if years: + parts.append(f"{years}y") + if months: + parts.append(f"{months}m") + if days_final: + parts.append(f"{days_final}d") + formatted = " ".join(parts) + return f"({formatted} ago)" + +class TrackInsertDialog(QDialog): + signal_insert_track = pyqtSignal(int, str) + + def __init__(self, parent=None): + super().__init__(parent) + self.setWindowTitle("Insert Track") + + # Title input on one line + self.title_label = QLabel("Title:") + self.title_edit = QLineEdit() + self.title_edit.textChanged.connect(self.update_list) + + title_layout = QHBoxLayout() + title_layout.addWidget(self.title_label) + title_layout.addWidget(self.title_edit) + + # Track list + self.track_list = QListWidget() + + # Note input on one line + self.note_label = QLabel("Note:") + self.note_edit = QLineEdit() + + note_layout = QHBoxLayout() + note_layout.addWidget(self.note_label) + note_layout.addWidget(self.note_edit) + + # Buttons + self.add_btn = QPushButton("Add") + self.add_close_btn = QPushButton("Add and close") + self.close_btn = QPushButton("Close") + + self.add_btn.clicked.connect(self.add_clicked) + self.add_close_btn.clicked.connect(self.add_and_close_clicked) + self.close_btn.clicked.connect(self.close) + + btn_layout = QHBoxLayout() + btn_layout.addWidget(self.add_btn) + btn_layout.addWidget(self.add_close_btn) + btn_layout.addWidget(self.close_btn) + + # Main layout + layout = QVBoxLayout() + layout.addLayout(title_layout) + layout.addWidget(self.track_list) + layout.addLayout(note_layout) + layout.addLayout(btn_layout) + + self.setLayout(layout) + self.resize(600, 400) + + def update_list(self, text: str): + self.track_list.clear() + if text.strip() == "": + # Do not search or populate list if input is empty + return + + tracks = search_titles(text) + for track in tracks: + duration_str = format_duration(track.duration) + last_played_str = friendly_last_played(track.lastplayed) + item_str = f"{track.title} - {track.artist} [{duration_str}] {last_played_str}" + item = QListWidgetItem(item_str) + item.setData(Qt.ItemDataRole.UserRole, track.track_id) + self.track_list.addItem(item) + + def get_selected_track_id(self) -> int | None: + selected_items = self.track_list.selectedItems() + if selected_items: + return selected_items[0].data(Qt.ItemDataRole.UserRole) + return None + + def add_clicked(self): + track_id = self.get_selected_track_id() + if track_id is not None: + note_text = self.note_edit.text() + self.signal_insert_track.emit(track_id, note_text) + + self.title_edit.clear() + self.note_edit.clear() + self.track_list.clear() + self.title_edit.setFocus() + + def add_and_close_clicked(self): + track_id = self.get_selected_track_id() + if track_id is not None: + note_text = self.note_edit.text() + self.signal_insert_track.emit(track_id, note_text) + self.accept() + +# Test harness (for quick testing) +if __name__ == "__main__": + app = QApplication(sys.argv) + + dialog = TrackInsertDialog() + + def print_inserted(track_id, note): + print(f"Inserted track ID: {track_id}, Note: '{note}'") + + dialog.signal_insert_track.connect(print_inserted) + dialog.exec() + diff --git a/tests/test_db_updates.py b/tests/test_db_updates.py deleted file mode 100644 index a78c574..0000000 --- a/tests/test_db_updates.py +++ /dev/null @@ -1,78 +0,0 @@ -# Standard library imports -import unittest - -# PyQt imports - -# Third party imports - -# App imports -from app import playlistmodel -from app import repository -from app.models import db - - -class MyTestCase(unittest.TestCase): - @classmethod - def setUpClass(cls): - """Runs once before any test in this class""" - - pass - - @classmethod - def tearDownClass(cls): - """Runs once after all tests""" - - pass - - def setUp(self): - """Runs before each test""" - - db.create_all() - - # Create a playlist and model - playlist_name = "my playlist" - self.playlist = repository.create_playlist(name=playlist_name, template_id=0) - assert self.playlist - self.model = playlistmodel.PlaylistModel( - self.playlist.playlist_id, is_template=False - ) - assert self.model - - # Create tracks - track1_path = "testdata/isa.mp3" - self.track1 = repository.create_track(track1_path) - - track2_path = "testdata/mom.mp3" - self.track2 = repository.create_track(track2_path) - - # Add tracks and header to playlist - self.row0 = repository.insert_row( - self.playlist.playlist_id, - row_number=0, - track_id=self.track1.track_id, - note="track 1", - ) - self.row1 = repository.insert_row( - self.playlist.playlist_id, - row_number=1, - track_id=0, - note="Header row", - ) - self.row2 = repository.insert_row( - self.playlist.playlist_id, - row_number=2, - track_id=self.track2.track_id, - note="track 2", - ) - - def tearDown(self): - """Runs after each test""" - - db.drop_all() - - def test_add_track_to_header(self): - """Add a track to a header row""" - - repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id) - result = repository.get_playlist_row(self.row1.playlistrow_id) - assert result.track_id == self.track2.track_id diff --git a/tests/test_playlistmodel.py b/tests/test_playlistmodel.py index 3590b1a..1d8d3d3 100644 --- a/tests/test_playlistmodel.py +++ b/tests/test_playlistmodel.py @@ -134,122 +134,6 @@ class TestMMMiscRowMove(unittest.TestCase): def tearDown(self): db.drop_all() - def test_move_rows_test2(self): - # move row 3 to row 5 - self.model.move_rows([3], 5) - # Check we have all rows and plr_rownums are correct - for row in range(self.model.rowCount()): - assert row in self.model.playlist_rows - assert self.model.playlist_rows[row].row_number == row - if row not in [3, 4, 5]: - assert self.model.playlist_rows[row].note == str(row) - elif row == 3: - assert self.model.playlist_rows[row].note == str(4) - elif row == 4: - assert self.model.playlist_rows[row].note == str(3) - elif row == 5: - assert self.model.playlist_rows[row].note == str(5) - - def test_move_rows_test3(self): - # move row 4 to row 3 - - self.model.move_rows([4], 3) - - # Check we have all rows and plr_rownums are correct - for row in range(self.model.rowCount()): - assert row in self.model.playlist_rows - assert self.model.playlist_rows[row].row_number == row - if row not in [3, 4]: - assert self.model.playlist_rows[row].note == str(row) - elif row == 3: - assert self.model.playlist_rows[row].note == str(4) - elif row == 4: - assert self.model.playlist_rows[row].note == str(3) - - def test_move_rows_test4(self): - # move row 4 to row 2 - - self.model.move_rows([4], 2) - - # Check we have all rows and plr_rownums are correct - for row in range(self.model.rowCount()): - assert row in self.model.playlist_rows - assert self.model.playlist_rows[row].row_number == row - if row not in [2, 3, 4]: - assert self.model.playlist_rows[row].note == str(row) - elif row == 2: - assert self.model.playlist_rows[row].note == str(4) - elif row == 3: - assert self.model.playlist_rows[row].note == str(2) - elif row == 4: - assert self.model.playlist_rows[row].note == str(3) - - def test_move_rows_test5(self): - # move rows [1, 4, 5, 10] → 8 - - self.model.move_rows([1, 4, 5, 10], 8) - - # Check we have all rows and plr_rownums are correct - new_order = [] - for row in range(self.model.rowCount()): - assert row in self.model.playlist_rows - assert self.model.playlist_rows[row].row_number == row - new_order.append(int(self.model.playlist_rows[row].note)) - assert new_order == [0, 2, 3, 6, 7, 1, 4, 5, 10, 8, 9] - - def test_move_rows_test6(self): - # move rows [3, 6] → 5 - - self.model.move_rows([3, 6], 5) - - # Check we have all rows and plr_rownums are correct - new_order = [] - for row in range(self.model.rowCount()): - assert row in self.model.playlist_rows - assert self.model.playlist_rows[row].row_number == row - new_order.append(int(self.model.playlist_rows[row].note)) - assert new_order == [0, 1, 2, 4, 3, 6, 5, 7, 8, 9, 10] - - def test_move_rows_test7(self): - # move rows [3, 5, 6] → 8 - - self.model.move_rows([3, 5, 6], 8) - - # Check we have all rows and plr_rownums are correct - new_order = [] - for row in range(self.model.rowCount()): - assert row in self.model.playlist_rows - assert self.model.playlist_rows[row].row_number == row - new_order.append(int(self.model.playlist_rows[row].note)) - assert new_order == [0, 1, 2, 4, 7, 3, 5, 6, 8, 9, 10] - - def test_move_rows_test8(self): - # move rows [7, 8, 10] → 5 - - self.model.move_rows([7, 8, 10], 5) - - # Check we have all rows and plr_rownums are correct - new_order = [] - for row in range(self.model.rowCount()): - assert row in self.model.playlist_rows - assert self.model.playlist_rows[row].row_number == row - new_order.append(int(self.model.playlist_rows[row].note)) - assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9] - - def test_move_rows_test9(self): - # move rows [1, 2, 3] → 0 - # Replicate issue 244 - - self.model.move_rows([0, 1, 2, 3], 0) - - # Check we have all rows and plr_rownums are correct - new_order = [] - for row in range(self.model.rowCount()): - assert row in self.model.playlist_rows - assert self.model.playlist_rows[row].row_number == row - new_order.append(int(self.model.playlist_rows[row].note)) - assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] - def test_insert_header_row_end(self): # insert header row at end of playlist diff --git a/tests/test_repository.py b/tests/test_repository.py new file mode 100644 index 0000000..9dfc3e0 --- /dev/null +++ b/tests/test_repository.py @@ -0,0 +1,252 @@ +# Standard library imports +import unittest + +# PyQt imports + +# Third party imports + +# App imports +from app import playlistmodel +from app import repository +from app.models import db +from classes import PlaylistDTO +from playlistmodel import PlaylistModel + + +class MyTestCase(unittest.TestCase): + @classmethod + def setUpClass(cls): + """Runs once before any test in this class""" + + cls.isa_path = "testdata/isa.mp3" + cls.isa_title = "I'm So Afraid" + cls.isa_artist = "Fleetwood Mac" + cls.mom_path = "testdata/mom.mp3" + cls.mom_title = "Man of Mystery" + cls.mom_artist = "The Shadows" + + @classmethod + def tearDownClass(cls): + """Runs once after all tests""" + + pass + + def setUp(self): + """Runs before each test""" + + db.create_all() + + def create_playlist_and_model(self, playlist_name: str) -> (PlaylistDTO, PlaylistModel): + # Create a playlist and model + playlist = repository.create_playlist(name=playlist_name, template_id=0) + assert playlist + model = playlistmodel.PlaylistModel(playlist.playlist_id, is_template=False) + assert model + + return (playlist, model) + + def create_playlist_model_tracks(self, playlist_name: str): + (playlist, model) = self.create_playlist_and_model("my playlist") + # Create tracks + self.track1 = repository.create_track(self.isa_path) + + self.track2 = repository.create_track(self.mom_path) + + # Add tracks and header to playlist + self.row0 = repository.insert_row( + playlist.playlist_id, + row_number=0, + track_id=self.track1.track_id, + note="track 1", + ) + self.row1 = repository.insert_row( + playlist.playlist_id, + row_number=1, + track_id=0, + note="Header row", + ) + self.row2 = repository.insert_row( + playlist.playlist_id, + row_number=2, + track_id=self.track2.track_id, + note="track 2", + ) + + def create_rows(self, playlist_name: str, number_of_rows: int) -> (PlaylistDTO, PlaylistModel): + (playlist, model) = self.create_playlist_and_model(playlist_name) + for row_number in range(number_of_rows): + repository.insert_row( + playlist.playlist_id, row_number, None, str(row_number) + ) + + return (playlist, model) + + def tearDown(self): + """Runs after each test""" + + db.drop_all() + + def test_add_track_to_header(self): + """Add a track to a header row""" + + self.create_playlist_model_tracks("my playlist") + repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id) + result = repository.get_playlist_row(self.row1.playlistrow_id) + assert result.track_id == self.track2.track_id + + def test_create_track(self): + repository.create_track(self.isa_path) + results = repository.get_all_tracks() + assert len(results) == 1 + assert results[0].path == self.isa_path + + def test_get_track_by_id(self): + dto = repository.create_track(self.isa_path) + result = repository.track_by_id(dto.track_id) + assert result.path == self.isa_path + + def test_get_track_by_artist(self): + _ = repository.create_track(self.isa_path) + _ = repository.create_track(self.mom_path) + result_isa = repository.tracks_like_artist(self.isa_artist) + assert len(result_isa) == 1 + assert result_isa[0].artist == self.isa_artist + result_mom = repository.tracks_like_artist(self.mom_artist) + assert len(result_mom) == 1 + assert result_mom[0].artist == self.mom_artist + + def test_get_track_by_title(self): + _ = repository.create_track(self.isa_path) + _ = repository.create_track(self.mom_path) + result_isa = repository.tracks_like_title(self.isa_title) + assert len(result_isa) == 1 + assert result_isa[0].title == self.isa_title + result_mom = repository.tracks_like_title(self.mom_title) + assert len(result_mom) == 1 + assert result_mom[0].title == self.mom_title + + def test_move_rows_test1(self): + # move row 3 to row 5 + + number_of_rows = 10 + (playlist, model) = self.create_rows("test_move_rows_test1", number_of_rows) + + repository.move_rows_within_playlist(playlist.playlist_id, [3], 5) + + # Check we have all rows and plr_rownums are correct + new_order = [] + for row in repository.get_playlist_rows(playlist.playlist_id): + new_order.append(int(row.note)) + assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9] + + def test_move_rows_test2(self): + # move row 4 to row 3 + + number_of_rows = 10 + (playlist, model) = self.create_rows("test_move_rows_test2", number_of_rows) + + repository.move_rows_within_playlist(playlist.playlist_id, [4], 3) + + # Check we have all rows and plr_rownums are correct + new_order = [] + for row in repository.get_playlist_rows(playlist.playlist_id): + new_order.append(int(row.note)) + assert new_order == [0, 1, 2, 4, 3, 5, 6, 7, 8, 9] + + def test_move_rows_test3(self): + # move row 4 to row 2 + + number_of_rows = 10 + (playlist, model) = self.create_rows("test_move_rows_test3", number_of_rows) + + repository.move_rows_within_playlist(playlist.playlist_id, [4], 2) + + # Check we have all rows and plr_rownums are correct + new_order = [] + for row in repository.get_playlist_rows(playlist.playlist_id): + new_order.append(int(row.note)) + assert new_order == [0, 1, 4, 2, 3, 5, 6, 7, 8, 9] + + def test_move_rows_test4(self): + # move rows [1, 4, 5, 10] → 8 + + number_of_rows = 11 + (playlist, model) = self.create_rows("test_move_rows_test4", number_of_rows) + + repository.move_rows_within_playlist(playlist.playlist_id, [1, 4, 5, 10], 8) + + # Check we have all rows and plr_rownums are correct + new_order = [] + for row in repository.get_playlist_rows(playlist.playlist_id): + new_order.append(int(row.note)) + assert new_order == [0, 2, 3, 6, 7, 1, 4, 5, 10, 8, 9] + + def test_move_rows_test5(self): + # move rows [3, 6] → 5 + + number_of_rows = 11 + (playlist, model) = self.create_rows("test_move_rows_test5", number_of_rows) + + repository.move_rows_within_playlist(playlist.playlist_id, [3, 6], 5) + + # Check we have all rows and plr_rownums are correct + new_order = [] + for row in repository.get_playlist_rows(playlist.playlist_id): + new_order.append(int(row.note)) + assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9, 10] + + def test_move_rows_test8(self): + # move rows [3, 5, 6] → 8 + + number_of_rows = 11 + (playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows) + + repository.move_rows_within_playlist(playlist.playlist_id, [3, 5, 6], 8) + + # Check we have all rows and plr_rownums are correct + new_order = [] + for row in repository.get_playlist_rows(playlist.playlist_id): + new_order.append(int(row.note)) + assert new_order == [0, 1, 2, 4, 7, 3, 5, 6, 8, 9, 10] + + + + + self.model.move_rows([3, 5, 6], 8) + + # Check we have all rows and plr_rownums are correct + new_order = [] + for row in range(self.model.rowCount()): + assert row in self.model.playlist_rows + assert self.model.playlist_rows[row].row_number == row + new_order.append(int(self.model.playlist_rows[row].note)) + assert new_order == [0, 1, 2, 4, 7, 3, 5, 6, 8, 9, 10] + + # def test_move_rows_test8(self): + # # move rows [7, 8, 10] → 5 + + # self.model.move_rows([7, 8, 10], 5) + + # # Check we have all rows and plr_rownums are correct + # new_order = [] + # for row in range(self.model.rowCount()): + # assert row in self.model.playlist_rows + # assert self.model.playlist_rows[row].row_number == row + # new_order.append(int(self.model.playlist_rows[row].note)) + # assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9] + + # def test_move_rows_test9(self): + # # move rows [1, 2, 3] → 0 + # # Replicate issue 244 + + # self.model.move_rows([0, 1, 2, 3], 0) + + # # Check we have all rows and plr_rownums are correct + # new_order = [] + # for row in range(self.model.rowCount()): + # assert row in self.model.playlist_rows + # assert self.model.playlist_rows[row].row_number == row + # new_order.append(int(self.model.playlist_rows[row].note)) + # assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + +