From 199abc9c0ccfed6f94de8ae6203f7a08021e56a4 Mon Sep 17 00:00:00 2001 From: Keith Edmunds Date: Fri, 11 Apr 2025 17:13:03 +0100 Subject: [PATCH] Fix showing tracks from queries --- app/classes.py | 3 +- app/models.py | 3 + app/musicmuster.py | 23 +- app/playlistrow.py | 70 ++++-- app/querylistmodel.py | 7 +- app/repository.py | 475 +++++++++++++++------------------------ tests/test_repository.py | 31 ++- 7 files changed, 284 insertions(+), 328 deletions(-) diff --git a/app/classes.py b/app/classes.py index e219c9b..2da2378 100644 --- a/app/classes.py +++ b/app/classes.py @@ -80,12 +80,13 @@ class TrackDTO: @dataclass -class PlaylistRowDTO(TrackDTO): +class PlaylistRowDTO: note: str played: bool playlist_id: int playlistrow_id: int row_number: int + track: TrackDTO | None @dataclass diff --git a/app/models.py b/app/models.py index 96cb156..fea1999 100644 --- a/app/models.py +++ b/app/models.py @@ -738,6 +738,9 @@ class Tracks(dbtables.TracksTable): Return tracks matching filter """ + # Now implemented in repostory.py + return [] + query = select(cls) # Path specification diff --git a/app/musicmuster.py b/app/musicmuster.py index a1a5545..fc0c221 100755 --- a/app/musicmuster.py +++ b/app/musicmuster.py @@ -1297,7 +1297,7 @@ class Window(QMainWindow): if handler: # Use a lambda to pass arguments to the function - action.triggered.connect(lambda _, h=handler, a=args: h(*a)) + action.triggered.connect(lambda _, h=handler, a=args: h(a)) submenu.addAction(action) break @@ -1318,7 +1318,7 @@ class Window(QMainWindow): { "text": "Show all", "handler": "create_playlist_from_template", - "args": (0), + "args": 0, }, { "separator": True, @@ -1829,15 +1829,16 @@ class Window(QMainWindow): # Required directive on first line f.write("#EXTM3U\n") for playlistrow in repository.get_playlist_rows(playlist_id): - f.write( - "#EXTINF:" - f"{int(playlistrow.duration / 1000)}," - f"{playlistrow.title} - " - f"{playlistrow.artist}" - "\n" - f"{playlistrow.path}" - "\n" - ) + if playlistrow.track: + f.write( + "#EXTINF:" + f"{int(playlistrow.track.duration / 1000)}," + f"{playlistrow.track.title} - " + f"{playlistrow.track.artist}" + "\n" + f"{playlistrow.track.path}" + "\n" + ) def fade(self, checked: bool = False) -> None: """Fade currently playing track""" diff --git a/app/playlistrow.py b/app/playlistrow.py index 7118a67..8320b0c 100644 --- a/app/playlistrow.py +++ b/app/playlistrow.py @@ -52,17 +52,23 @@ class PlaylistRow: self.start_time: dt.datetime | None = None def __repr__(self) -> str: + track_id = None + if self.dto.track: + track_id = self.dto.track.track_id return ( f"" + f"note={self.dto.note}, track_id={track_id}>" ) # Expose TrackDTO fields as properties @property def artist(self): - return self.dto.artist + if self.dto.track: + return self.dto.track.artist + else: + return "" @artist.setter def artist(self, value: str) -> None: @@ -70,19 +76,31 @@ class PlaylistRow: @property def bitrate(self): - return self.dto.bitrate + if self.dto.track: + return self.dto.track.bitrate + else: + return 0 @property def duration(self): - return self.dto.duration + if self.dto.track: + return self.dto.track.duration + else: + return 0 @property def fade_at(self): - return self.dto.fade_at + if self.dto.track: + return self.dto.track.fade_at + else: + return 0 @property def intro(self): - return self.dto.intro + if self.dto.track: + return self.dto.track.intro + else: + return 0 @intro.setter def intro(self, value: int) -> None: @@ -90,23 +108,38 @@ class PlaylistRow: @property def lastplayed(self): - return self.dto.lastplayed + if self.dto.track: + return self.dto.track.lastplayed + else: + return None @property def path(self): - return self.dto.path + if self.dto.track: + return self.dto.track.path + else: + return "" @property def silence_at(self): - return self.dto.silence_at + if self.dto.track: + return self.dto.track.silence_at + else: + return 0 @property def start_gap(self): - return self.dto.start_gap + if self.dto.track: + return self.dto.track.start_gap + else: + return 0 @property def title(self): - return self.dto.title + if self.dto.track: + return self.dto.track.title + else: + return "" @title.setter def title(self, value: str) -> None: @@ -114,10 +147,13 @@ class PlaylistRow: @property def track_id(self): - return self.dto.track_id + if self.dto.track: + return self.dto.track.track_id + else: + return 0 @track_id.setter - def track_id(self, value: int) -> None: + def track_id(self, track_id: int) -> None: """ Adding a track_id should only happen to a header row. """ @@ -125,6 +161,14 @@ class PlaylistRow: if self.track_id > 0: raise ApplicationError("Attempting to add track to row with existing track ({self=}") + repository.add_track_to_header(track_id) + + # Need to update with track information + track = repository.track_by_id(track_id) + if track: + for attr, value in track.__dataclass_fields__.items(): + setattr(self, attr, value) + # TODO: set up write access to track_id. Should only update if # track_id == 0. Need to update all other track fields at the # same time. diff --git a/app/querylistmodel.py b/app/querylistmodel.py index 974ca15..27b057d 100644 --- a/app/querylistmodel.py +++ b/app/querylistmodel.py @@ -231,16 +231,11 @@ class QuerylistModel(QAbstractTableModel): try: results = repository.get_filtered_tracks(self.filter) for result in results: - lastplayed = None - if hasattr(result, "playdates"): - pds = result.playdates - if pds: - lastplayed = max([a.lastplayed for a in pds]) queryrow = QueryRow( artist=result.artist, bitrate=result.bitrate or 0, duration=result.duration, - lastplayed=lastplayed, + lastplayed=result.lastplayed, path=result.path, title=result.title, track_id=result.track_id, diff --git a/app/repository.py b/app/repository.py index 0c9839e..0a87edf 100644 --- a/app/repository.py +++ b/app/repository.py @@ -129,12 +129,10 @@ def remove_colour_substring(text: str) -> str: # Track functions @log_call -def _tracks_where( - query: BinaryExpression | ColumnElement[bool], +def _tracks_where(query: BinaryExpression | ColumnElement[bool],) -> list[TrackDTO]: + """ filter_by_last_played: bool = False, last_played_before: dt.datetime | None = None, -) -> list[TrackDTO]: - """ Return tracks selected by query """ @@ -150,36 +148,23 @@ def _tracks_where( .group_by(LatestPlaydate.track_id) .subquery() ) - if not filter_by_last_played: - query = query.outerjoin( - latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id + stmt = ( + select( + Tracks.id.label("track_id"), + Tracks.artist, + Tracks.bitrate, + Tracks.duration, + Tracks.fade_at, + Tracks.intro, + Tracks.path, + Tracks.silence_at, + Tracks.start_gap, + Tracks.title, + latest_playdate_subq.c.lastplayed, ) - else: - # We are filtering by last played. If last_played_before is None, - # we want tracks that have never been played - if last_played_before is None: - query = query.outerjoin(Playdates, Tracks.id == Playdates.track_id).where( - Playdates.id.is_(None) - ) - else: - query = query.join( - latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id - ).where(latest_playdate_subq.c.max_last_played < last_played_before) - pass - - stmt = select( - Tracks.id.label("track_id"), - Tracks.artist, - Tracks.bitrate, - Tracks.duration, - Tracks.fade_at, - Tracks.intro, - Tracks.path, - Tracks.silence_at, - Tracks.start_gap, - Tracks.title, - latest_playdate_subq.c.lastplayed, - ).where(query) + .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) + .where(query) + ) results: list[TrackDTO] = [] @@ -252,6 +237,97 @@ def get_all_tracks() -> list[TrackDTO]: return _tracks_where(Tracks.id > 0) +def get_filtered_tracks(filter: Filter) -> list[TrackDTO]: + """ + Return tracks matching filter + """ + + query = select(Tracks) + + # Path specification + if filter.path: + if filter.path_type == "contains": + query = query.where(Tracks.path.ilike(f"%{filter.path}%")) + elif filter.path_type == "excluding": + query = query.where(Tracks.path.notilike(f"%{filter.path}%")) + else: + raise ApplicationError(f"Can't process filter path ({filter=})") + + # Duration specification + seconds_duration = filter.duration_number + if filter.duration_unit == Config.FILTER_DURATION_MINUTES: + seconds_duration *= 60 + elif filter.duration_unit != Config.FILTER_DURATION_SECONDS: + raise ApplicationError(f"Can't process filter duration ({filter=})") + + if filter.duration_type == Config.FILTER_DURATION_LONGER: + query = query.where(Tracks.duration >= seconds_duration) + elif filter.duration_unit == Config.FILTER_DURATION_SHORTER: + query = query.where(Tracks.duration <= seconds_duration) + else: + raise ApplicationError(f"Can't process filter duration type ({filter=})") + + # Process comparator + if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER: + # Select tracks that have never been played + query = query.outerjoin(Playdates, Tracks.id == Playdates.track_id).where( + Playdates.id.is_(None) + ) + else: + # Last played specification + now = dt.datetime.now() + # Set sensible default, and correct for Config.FILTER_PLAYED_COMPARATOR_ANYTIME + before = now + # If not ANYTIME, set 'before' appropriates + if filter.last_played_comparator != Config.FILTER_PLAYED_COMPARATOR_ANYTIME: + if filter.last_played_unit == Config.FILTER_PLAYED_DAYS: + before = now - dt.timedelta(days=filter.last_played_number) + elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS: + before = now - dt.timedelta(days=7 * filter.last_played_number) + elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS: + before = now - dt.timedelta(days=30 * filter.last_played_number) + elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS: + before = now - dt.timedelta(days=365 * filter.last_played_number) + + subquery = ( + select( + Playdates.track_id, + func.max(Playdates.lastplayed).label("max_last_played"), + ) + .group_by(Playdates.track_id) + .subquery() + ) + query = query.join(subquery, Tracks.id == subquery.c.track_id).where( + subquery.c.max_last_played < before + ) + + results: list[TrackDTO] = [] + with db.Session() as session: + records = session.scalars(query).unique().all() + for record in records: + if record.playdates: + last_played = record.playdates[0].lastplayed + else: + last_played = None + last_played + dto = TrackDTO( + artist=record.artist, + bitrate=record.bitrate, + duration=record.duration, + fade_at=record.fade_at, + intro=record.intro, + lastplayed=last_played, + path=record.path, + silence_at=record.silence_at, + start_gap=record.start_gap, + title=record.title, + track_id=record.id, + ) + results.append(dto) + + return results + + @log_call def add_track_to_header(playlistrow_id: int, track_id: int) -> None: """ @@ -329,66 +405,6 @@ def update_track( return updated_track -@log_call -def get_filtered_tracks(filter: Filter) -> list[TrackDTO]: - """ - Return tracks matching filter - """ - - # Create a base query - query = Tracks.id > 0 - - # Path specification - if filter.path: - if filter.path_type == "contains": - query = query.where(Tracks.path.ilike(f"%{filter.path}%")) - elif filter.path_type == "excluding": - query = query.where(Tracks.path.notilike(f"%{filter.path}%")) - else: - raise ApplicationError(f"Can't process filter path ({filter=})") - - # Duration specification - seconds_duration = filter.duration_number - if filter.duration_unit == Config.FILTER_DURATION_MINUTES: - seconds_duration *= 60 - elif filter.duration_unit != Config.FILTER_DURATION_SECONDS: - raise ApplicationError(f"Can't process filter duration ({filter=})") - - if filter.duration_type == Config.FILTER_DURATION_LONGER: - query = query.where(Tracks.duration >= seconds_duration) - elif filter.duration_unit == Config.FILTER_DURATION_SHORTER: - query = query.where(Tracks.duration <= seconds_duration) - else: - raise ApplicationError(f"Can't process filter duration type ({filter=})") - - # Process comparator - if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_ANYTIME: - return _tracks_where(query, filter_by_last_played=False) - - elif filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER: - return _tracks_where(query, filter_by_last_played=True, last_played_before=None) - else: - # Last played specification - now = dt.datetime.now() - # Set sensible default, and correct for Config.FILTER_PLAYED_COMPARATOR_ANYTIME - before = now - # If not ANYTIME, set 'before' appropriates - if filter.last_played_unit == Config.FILTER_PLAYED_DAYS: - before = now - dt.timedelta(days=filter.last_played_number) - elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS: - before = now - dt.timedelta(days=7 * filter.last_played_number) - elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS: - before = now - dt.timedelta(days=30 * filter.last_played_number) - elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS: - before = now - dt.timedelta(days=365 * filter.last_played_number) - else: - raise ApplicationError("Can't determine last played criteria") - - return _tracks_where( - query, filter_by_last_played=True, last_played_before=before - ) - - def set_track_intro(track_id: int, intro: int) -> None: """ Set track intro time @@ -730,196 +746,6 @@ def create_playlist(name: str, template_id: int, as_template: bool = False) -> P return new_playlist -@log_call -def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None: - """ - Return specific row DTO - """ - - # Alias PlaydatesTable for subquery - LatestPlaydate = aliased(Playdates) - - # Subquery: latest playdate for each track - latest_playdate_subq = ( - select( - LatestPlaydate.track_id, - func.max(LatestPlaydate.lastplayed).label("lastplayed"), - ) - .group_by(LatestPlaydate.track_id) - .subquery() - ) - - stmt = ( - select( - PlaylistRows.id.label("playlistrow_id"), - PlaylistRows.row_number, - PlaylistRows.note, - PlaylistRows.played, - PlaylistRows.playlist_id, - Tracks.id.label("track_id"), - Tracks.artist, - Tracks.bitrate, - Tracks.duration, - Tracks.fade_at, - Tracks.intro, - Tracks.path, - Tracks.silence_at, - Tracks.start_gap, - Tracks.title, - latest_playdate_subq.c.lastplayed, - ) - .outerjoin(Tracks, PlaylistRows.track_id == Tracks.id) - .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) - .where(PlaylistRows.id == playlistrow_id) - .order_by(PlaylistRows.row_number) - ) - - with db.Session() as session: - record = session.execute(stmt).one_or_none() - if not record: - return None - - # Handle cases where track_id is None (no track associated) - if record.track_id is None: - dto = PlaylistRowDTO( - artist="", - bitrate=0, - duration=0, - fade_at=0, - intro=None, - lastplayed=None, - note=record.note, - path="", - played=record.played, - playlist_id=record.playlist_id, - playlistrow_id=record.playlistrow_id, - row_number=record.row_number, - silence_at=0, - start_gap=0, - title="", - track_id=-1, - ) - else: - dto = PlaylistRowDTO( - artist=record.artist, - bitrate=record.bitrate, - duration=record.duration, - fade_at=record.fade_at, - intro=record.intro, - lastplayed=record.lastplayed, - note=record.note, - path=record.path, - played=record.played, - playlist_id=record.playlist_id, - playlistrow_id=record.playlistrow_id, - row_number=record.row_number, - silence_at=record.silence_at, - start_gap=record.start_gap, - title=record.title, - track_id=record.track_id, - ) - - return dto - - -def get_playlist_rows( - playlist_id: int, check_playlist_itegrity: bool = True -) -> list[PlaylistRowDTO]: - # Alias PlaydatesTable for subquery - LatestPlaydate = aliased(Playdates) - - # Subquery: latest playdate for each track - latest_playdate_subq = ( - select( - LatestPlaydate.track_id, - func.max(LatestPlaydate.lastplayed).label("lastplayed"), - ) - .group_by(LatestPlaydate.track_id) - .subquery() - ) - - stmt = ( - select( - PlaylistRows.id.label("playlistrow_id"), - PlaylistRows.row_number, - PlaylistRows.note, - PlaylistRows.played, - PlaylistRows.playlist_id, - Tracks.id.label("track_id"), - Tracks.artist, - Tracks.bitrate, - Tracks.duration, - Tracks.fade_at, - Tracks.intro, - Tracks.path, - Tracks.silence_at, - Tracks.start_gap, - Tracks.title, - latest_playdate_subq.c.lastplayed, - ) - .outerjoin(Tracks, PlaylistRows.track_id == Tracks.id) - .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) - .where(PlaylistRows.playlist_id == playlist_id) - .order_by(PlaylistRows.row_number) - ) - - with db.Session() as session: - results = session.execute(stmt).all() - # Sanity check - # TODO: would be good to be confident at removing this - if check_playlist_itegrity: - _check_playlist_integrity( - session=session, playlist_id=playlist_id, fix=False - ) - - dto_list = [] - for row in results: - # Handle cases where track_id is None (no track associated) - if row.track_id is None: - dto = PlaylistRowDTO( - artist="", - bitrate=0, - duration=0, - fade_at=0, - intro=None, - lastplayed=None, - note=row.note, - path="", - played=row.played, - playlist_id=row.playlist_id, - playlistrow_id=row.playlistrow_id, - row_number=row.row_number, - silence_at=0, - start_gap=0, - title="", - track_id=-1, - # Additional fields like row_fg, row_bg, etc., use default None values - ) - else: - dto = PlaylistRowDTO( - artist=row.artist, - bitrate=row.bitrate, - duration=row.duration, - fade_at=row.fade_at, - intro=row.intro, - lastplayed=row.lastplayed, - note=row.note, - path=row.path, - played=row.played, - playlist_id=row.playlist_id, - playlistrow_id=row.playlistrow_id, - row_number=row.row_number, - silence_at=row.silence_at, - start_gap=row.start_gap, - title=row.title, - track_id=row.track_id, - # Additional fields like row_fg, row_bg, etc., use default None values - ) - dto_list.append(dto) - - return dto_list - - def copy_playlist(src_id: int, dst_id: int) -> None: """Copy playlist entries""" @@ -1068,6 +894,78 @@ def playlist_save_tabs(playlist_id_to_tab: dict[int, int]) -> None: session.commit() +# Playlist Rows + +@log_call +def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None: + """ + Return specific row DTO + """ + + with db.Session() as session: + record = ( + session.execute(select(PlaylistRows).where(PlaylistRows.id == playlistrow_id)) + .scalars() + .one_or_none() + ) + if not record: + return None + + track = None + if record.track_id: + track = track_by_id(record.track_id) + + dto = PlaylistRowDTO( + note=record.note, + played=record.played, + playlist_id=record.playlist_id, + playlistrow_id=record.id, + row_number=record.row_number, + track=track, + ) + + return dto + + +def get_playlist_rows( + playlist_id: int, check_playlist_itegrity: bool = True +) -> list[PlaylistRowDTO]: + + with db.Session() as session: + + # TODO: would be good to be confident at removing this + if check_playlist_itegrity: + _check_playlist_integrity( + session=session, playlist_id=playlist_id, fix=False + ) + + records = session.scalars( + select(PlaylistRows) + .where(PlaylistRows.playlist_id == playlist_id) + .order_by(PlaylistRows.row_number) + ).all() + + dto_list = [] + for record in records: + + track = None + if record.track_id: + track = track_by_id(record.track_id) + + dto = PlaylistRowDTO( + note=record.note, + played=record.played, + playlist_id=record.playlist_id, + playlistrow_id=record.id, + row_number=record.row_number, + track=track, + ) + + dto_list.append(dto) + + return dto_list + + # Playdates @log_call def get_last_played_dates(track_id: int, limit: int = 5) -> str: @@ -1154,20 +1052,19 @@ def _queries_where( Return queries selected by query """ - stmt = select( - Queries.id.label("query_id"), Queries.name, Queries.favourite, Queries.filter - ).where(query) - results: list[QueryDTO] = [] with db.Session() as session: - records = session.execute(stmt).one_or_none() + records = session.scalars( + select(Queries) + .where(query) + ).all() for record in records: dto = QueryDTO( favourite=record.favourite, filter=record.filter, name=record.name, - query_id=record.query_id, + query_id=record.id, ) results.append(dto) diff --git a/tests/test_repository.py b/tests/test_repository.py index e94999b..4219d85 100644 --- a/tests/test_repository.py +++ b/tests/test_repository.py @@ -49,7 +49,7 @@ class MyTestCase(unittest.TestCase): return (playlist, model) def create_playlist_model_tracks(self, playlist_name: str): - (playlist, model) = self.create_playlist_and_model("my playlist") + (playlist, model) = self.create_playlist_and_model(playlist_name) # Create tracks metadata1 = get_all_track_metadata(self.isa_path) self.track1 = repository.create_track(self.isa_path, metadata1) @@ -99,7 +99,7 @@ class MyTestCase(unittest.TestCase): self.create_playlist_model_tracks("my playlist") repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id) result = repository.get_playlist_row(self.row1.playlistrow_id) - assert result.track_id == self.track2.track_id + assert result.track.track_id == self.track2.track_id def test_create_track(self): metadata = get_all_track_metadata(self.isa_path) @@ -119,18 +119,18 @@ class MyTestCase(unittest.TestCase): _ = repository.create_track(self.isa_path, metadata) metadata = get_all_track_metadata(self.mom_path) _ = repository.create_track(self.mom_path, metadata) - result_isa = repository.tracks_like_artist(self.isa_artist) + result_isa = repository.tracks_by_artist(self.isa_artist) assert len(result_isa) == 1 assert result_isa[0].artist == self.isa_artist - result_mom = repository.tracks_like_artist(self.mom_artist) + result_mom = repository.tracks_by_artist(self.mom_artist) assert len(result_mom) == 1 assert result_mom[0].artist == self.mom_artist def test_get_track_by_title(self): - metadata = get_all_track_metadata(self.isa_path) - _ = repository.create_track(self.isa_path, metadata) - metadata = get_all_track_metadata(self.mom_path) - _ = repository.create_track(self.mom_path, metadata) + metadata_isa = get_all_track_metadata(self.isa_path) + _ = repository.create_track(self.isa_path, metadata_isa) + metadata_mom = get_all_track_metadata(self.mom_path) + _ = repository.create_track(self.mom_path, metadata_mom) result_isa = repository.tracks_by_title(self.isa_title) assert len(result_isa) == 1 assert result_isa[0].title == self.isa_title @@ -138,6 +138,21 @@ class MyTestCase(unittest.TestCase): assert len(result_mom) == 1 assert result_mom[0].title == self.mom_title + def test_tracks_get_all_tracks(self): + self.create_playlist_model_tracks(playlist_name="test_track_get_all_tracks") + all_tracks = repository.get_all_tracks() + assert len(all_tracks) == 2 + + def test_tracks_by_path(self): + metadata_isa = get_all_track_metadata(self.isa_path) + _ = repository.create_track(self.isa_path, metadata_isa) + metadata_mom = get_all_track_metadata(self.mom_path) + _ = repository.create_track(self.mom_path, metadata_mom) + result_isa = repository.track_by_path(self.isa_path) + assert result_isa.title == self.isa_title + result_mom = repository.track_by_path(self.mom_path) + assert result_mom.title == self.mom_title + def test_move_rows_test1(self): # move row 3 to row 5