# Standard library imports import re # PyQt imports # Third party imports from sqlalchemy import ( delete, func, select, update, ) from sqlalchemy.orm import aliased from sqlalchemy.orm.session import Session from sqlalchemy.sql.elements import BinaryExpression, ColumnElement from classes import ApplicationError, PlaylistRowDTO # App imports from classes import PlaylistDTO, TrackDTO import helpers from log import log from models import ( db, NoteColours, Playdates, PlaylistRows, Playlists, Settings, Tracks, ) # Notecolour functions def get_colour(text: str, foreground: bool = False) -> str: """ Parse text and return background (foreground if foreground==True) colour string if matched, else None """ if not text: return "" match = False with db.Session() as session: for rec in NoteColours.get_all(session): if rec.is_regex: flags = re.UNICODE if not rec.is_casesensitive: flags |= re.IGNORECASE p = re.compile(rec.substring, flags) if p.match(text): match = True else: if rec.is_casesensitive: if rec.substring in text: match = True else: if rec.substring.lower() in text.lower(): match = True if match: if foreground: return rec.foreground or "" else: return rec.colour return "" # Track functions def add_track_to_header(playlistrow_id: int, track_id: int) -> None: """ Add a track to this (header) row """ with db.Session() as session: session.execute( update(PlaylistRows) .where(PlaylistRows.id == playlistrow_id) .values(track_id=track_id) ) session.commit() def create_track(path: str) -> TrackDTO: """ Create a track db entry from a track path and return the DTO """ metadata = helpers.get_all_track_metadata(path) with db.Session() as session: try: track = Tracks( session=session, path=str(metadata["path"]), title=str(metadata["title"]), artist=str(metadata["artist"]), duration=int(metadata["duration"]), start_gap=int(metadata["start_gap"]), fade_at=int(metadata["fade_at"]), silence_at=int(metadata["silence_at"]), bitrate=int(metadata["bitrate"]), ) track_id = track.id session.commit() except Exception: raise ApplicationError("Can't create Track") new_track = track_by_id(track_id) if not new_track: raise ApplicationError("Unable to create new track") return new_track def get_all_tracks() -> list[TrackDTO]: """Return a list of all tracks""" return _tracks_where(Tracks.id > 0) def track_by_id(track_id: int) -> TrackDTO | None: """ Return track with specified id """ # Alias PlaydatesTable for subquery LatestPlaydate = aliased(Playdates) # Subquery: latest playdate for each track latest_playdate_subq = ( select( LatestPlaydate.track_id, func.max(LatestPlaydate.lastplayed).label("lastplayed"), ) .group_by(LatestPlaydate.track_id) .subquery() ) stmt = ( select( Tracks.id.label("track_id"), Tracks.artist, Tracks.bitrate, Tracks.duration, Tracks.fade_at, Tracks.intro, Tracks.path, Tracks.silence_at, Tracks.start_gap, Tracks.title, latest_playdate_subq.c.lastplayed, ) .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) .where(Tracks.id == track_id) ) with db.Session() as session: record = session.execute(stmt).one_or_none() if not record: return None dto = TrackDTO( artist=record.artist, bitrate=record.bitrate, duration=record.duration, fade_at=record.fade_at, intro=record.intro, lastplayed=record.lastplayed, path=record.path, silence_at=record.silence_at, start_gap=record.start_gap, title=record.title, track_id=record.track_id, ) return dto def _tracks_where(where: BinaryExpression | ColumnElement[bool]) -> list[TrackDTO]: """ Return tracks selected by where """ # Alias PlaydatesTable for subquery LatestPlaydate = aliased(Playdates) # Subquery: latest playdate for each track latest_playdate_subq = ( select( LatestPlaydate.track_id, func.max(LatestPlaydate.lastplayed).label("lastplayed"), ) .group_by(LatestPlaydate.track_id) .subquery() ) stmt = ( select( Tracks.id.label("track_id"), Tracks.artist, Tracks.bitrate, Tracks.duration, Tracks.fade_at, Tracks.intro, Tracks.path, Tracks.silence_at, Tracks.start_gap, Tracks.title, latest_playdate_subq.c.lastplayed, ) .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) .where(where) ) results: list[TrackDTO] = [] with db.Session() as session: records = session.execute(stmt).all() for record in records: dto = TrackDTO( artist=record.artist, bitrate=record.bitrate, duration=record.duration, fade_at=record.fade_at, intro=record.intro, lastplayed=record.lastplayed, path=record.path, silence_at=record.silence_at, start_gap=record.start_gap, title=record.title, track_id=record.track_id, ) results.append(dto) return results def tracks_like_artist(filter_str: str) -> list[TrackDTO]: """ Return tracks where artist is like filter """ return _tracks_where(Tracks.artist.ilike(f"%{filter_str}%")) def tracks_like_title(filter_str: str) -> list[TrackDTO]: """ Return tracks where title is like filter """ return _tracks_where(Tracks.title.ilike(f"%{filter_str}%")) # Playlist functions def _move_rows( session: Session, playlist_id: int, starting_row: int, move_by: int ) -> None: """ Move rows from starting_row by move_by. If move_by is +ve, move rows down; if -ve, move them up. """ log.debug(f"(_move_rows_down({playlist_id=}, {starting_row=}, {move_by=}") session.execute( update(PlaylistRows) .where( (PlaylistRows.playlist_id == playlist_id), (PlaylistRows.row_number >= starting_row), ) .values(row_number=PlaylistRows.row_number + move_by) ) session.commit() def move_rows_to_playlist( from_rows: list[int], from_playlist_id: int, to_row: int, to_playlist_id: int ) -> None: """ Move rows between playlists. """ with db.Session() as session: # Prepare desination playlist # Find last used row last_row = session.execute( select(func.max(PlaylistRows.row_number)).where( PlaylistRows.playlist_id == to_playlist_id ) ).scalar_one() if last_row is None: last_row = -1 # Make room in destination if to_row <= last_row: _move_rows(session, to_playlist_id, to_row, len(from_rows)) # Move rows row_offset = to_row - min(from_rows) stmt = ( update(PlaylistRows) .where( PlaylistRows.playlist_id == from_playlist_id, PlaylistRows.row_number.in_(from_rows) ) .values( playlist_id=to_playlist_id, row_number=PlaylistRows.row_number + row_offset ) ) session.execute(stmt) # Remove gaps in source _move_rows(session=session, playlist_id=from_playlist_id, starting_row=max(from_rows) + 1, move_by=(len(from_rows) * -1) ) # Commit changes session.commit() # Sanity check _check_playlist_integrity(session, get_playlist_rows(from_playlist_id), fix=False) _check_playlist_integrity(session, get_playlist_rows(to_playlist_id), fix=False) def move_rows_within_playlist(playlist_id: int, from_rows: list[int], to_row: int) -> None: """ Move rows within a playlist. """ log.debug(f"move_rows_within_playlist({playlist_id=}, {from_rows=}, {to_row=})") playlistrows_dto = get_playlist_rows(playlist_id) new_order: dict[int, int | None] = dict.fromkeys(range(len(playlistrows_dto))) # Populate new_order with moved rows next_row = to_row # We need to keep, where possible, the rows after to_row unmoved if to_row + len(from_rows) > len(playlistrows_dto): next_row = max(to_row - len(from_rows) + 1, 0) for from_row in from_rows: new_order[next_row] = from_row next_row += 1 # Move remaining rows remaining_rows = set(new_order.keys()) - set(from_rows) next_row = 0 for row in remaining_rows: while new_order[next_row] is not None: next_row += 1 new_order[next_row] = row next_row += 1 # Sanity check if None in new_order: raise ApplicationError(f"None remains after move: {new_order=}") # Update database # Build a list of dicts of (id: value, row_number: value} update_list = [] for new_row_number, old_row_number in new_order.items(): plrid = [a.playlistrow_id for a in playlistrows_dto if a.row_number == old_row_number][0] update_list.append(dict(id=plrid, row_number=new_row_number)) # Update rows with db.Session() as session: session.execute(update(PlaylistRows), update_list) session.commit() # Sanity check _check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False) def create_playlist(name: str, template_id: int) -> PlaylistDTO: """ Create playlist and return DTO. """ with db.Session() as session: try: playlist = Playlists(session, name, template_id) playlist_id = playlist.id session.commit() except Exception: raise ApplicationError("Can't create Playlist") new_playlist = playlist_by_id(playlist_id) if not new_playlist: raise ApplicationError("Can't retrieve new Playlist") return new_playlist def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None: """ Return specific row DTO """ # Alias PlaydatesTable for subquery LatestPlaydate = aliased(Playdates) # Subquery: latest playdate for each track latest_playdate_subq = ( select( LatestPlaydate.track_id, func.max(LatestPlaydate.lastplayed).label("lastplayed"), ) .group_by(LatestPlaydate.track_id) .subquery() ) stmt = ( select( PlaylistRows.id.label("playlistrow_id"), PlaylistRows.row_number, PlaylistRows.note, PlaylistRows.played, PlaylistRows.playlist_id, Tracks.id.label("track_id"), Tracks.artist, Tracks.bitrate, Tracks.duration, Tracks.fade_at, Tracks.intro, Tracks.path, Tracks.silence_at, Tracks.start_gap, Tracks.title, latest_playdate_subq.c.lastplayed, ) .outerjoin(Tracks, PlaylistRows.track_id == Tracks.id) .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) .where(PlaylistRows.id == playlistrow_id) .order_by(PlaylistRows.row_number) ) with db.Session() as session: record = session.execute(stmt).one_or_none() if not record: return None # Handle cases where track_id is None (no track associated) if record.track_id is None: dto = PlaylistRowDTO( artist="", bitrate=0, duration=0, fade_at=0, intro=None, lastplayed=None, note=record.note, path="", played=record.played, playlist_id=record.playlist_id, playlistrow_id=record.playlistrow_id, row_number=record.row_number, silence_at=0, start_gap=0, title="", track_id=-1, ) else: dto = PlaylistRowDTO( artist=record.artist, bitrate=record.bitrate, duration=record.duration, fade_at=record.fade_at, intro=record.intro, lastplayed=record.lastplayed, note=record.note, path=record.path, played=record.played, playlist_id=record.playlist_id, playlistrow_id=record.playlistrow_id, row_number=record.row_number, silence_at=record.silence_at, start_gap=record.start_gap, title=record.title, track_id=record.track_id, ) return dto def _check_playlist_integrity( session: Session, playlist_rows: list[PlaylistRowDTO], fix: bool = False ) -> None: """ Ensure the row numbers are contiguous. Fix and log if fix==True, else raise ApplicationError. """ for idx, plr in enumerate(playlist_rows): if plr.row_number == idx: continue msg = f"_check_playlist_integrity: incorrect row number ({plr.playlistrow_id=}, {idx=})" if fix: log.debug(msg) plr.row_number = idx session.commit() else: raise ApplicationError(msg) def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]: # Alias PlaydatesTable for subquery LatestPlaydate = aliased(Playdates) # Subquery: latest playdate for each track latest_playdate_subq = ( select( LatestPlaydate.track_id, func.max(LatestPlaydate.lastplayed).label("lastplayed"), ) .group_by(LatestPlaydate.track_id) .subquery() ) stmt = ( select( PlaylistRows.id.label("playlistrow_id"), PlaylistRows.row_number, PlaylistRows.note, PlaylistRows.played, PlaylistRows.playlist_id, Tracks.id.label("track_id"), Tracks.artist, Tracks.bitrate, Tracks.duration, Tracks.fade_at, Tracks.intro, Tracks.path, Tracks.silence_at, Tracks.start_gap, Tracks.title, latest_playdate_subq.c.lastplayed, ) .outerjoin(Tracks, PlaylistRows.track_id == Tracks.id) .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) .where(PlaylistRows.playlist_id == playlist_id) .order_by(PlaylistRows.row_number) ) with db.Session() as session: results = session.execute(stmt).all() # Sanity check # TODO: would be good to be confident at removing this _check_playlist_integrity(session=session, playlist_rows=results, fix=False) dto_list = [] for row in results: # Handle cases where track_id is None (no track associated) if row.track_id is None: dto = PlaylistRowDTO( artist="", bitrate=0, duration=0, fade_at=0, intro=None, lastplayed=None, note=row.note, path="", played=row.played, playlist_id=row.playlist_id, playlistrow_id=row.playlistrow_id, row_number=row.row_number, silence_at=0, start_gap=0, title="", track_id=-1, # Additional fields like row_fg, row_bg, etc., use default None values ) else: dto = PlaylistRowDTO( artist=row.artist, bitrate=row.bitrate, duration=row.duration, fade_at=row.fade_at, intro=row.intro, lastplayed=row.lastplayed, note=row.note, path=row.path, played=row.played, playlist_id=row.playlist_id, playlistrow_id=row.playlistrow_id, row_number=row.row_number, silence_at=row.silence_at, start_gap=row.start_gap, title=row.title, track_id=row.track_id, # Additional fields like row_fg, row_bg, etc., use default None values ) dto_list.append(dto) return dto_list def insert_row( playlist_id: int, row_number: int, track_id: int | None, note: str ) -> PlaylistRowDTO: """ Insert a new row into playlist and return new row DTO """ with db.Session() as session: # Sanity check _check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False) # Make space for new row _move_rows( session=session, playlist_id=playlist_id, starting_row=row_number, move_by=1 ) playlist_row = PlaylistRows.insert_row( session=session, playlist_id=playlist_id, new_row_number=row_number, note=note, track_id=track_id, ) session.commit() playlist_row_id = playlist_row.id # Sanity check _check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False) new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id) if not new_playlist_row: raise ApplicationError("Can't retrieve new playlist row") return new_playlist_row def remove_rows(playlist_id: int, row_numbers: list[int]) -> None: """ Remove rows from playlist Delete from highest row back so that not yet deleted row numbers don't change. """ log.debug(f"remove_rows({playlist_id=}, {row_numbers=}") with db.Session() as session: for row_number in sorted(row_numbers, reverse=True): session.execute( delete(PlaylistRows).where( PlaylistRows.playlist_id == playlist_id, PlaylistRows.row_number == row_number, ) ) # Fixup row number to remove gaps _check_playlist_integrity(session, playlist_id, fix=True) session.commit() def playlist_by_id(playlist_id: int) -> PlaylistDTO | None: """ Return playlist with specified id """ stmt = select( Playlists.id.label("playlist_id"), Playlists.name, Playlists.favourite, Playlists.is_template, Playlists.open, ).where(Playlists.id == playlist_id) with db.Session() as session: record = session.execute(stmt).one_or_none() if not record: return None dto = PlaylistDTO( name=record.name, playlist_id=record.playlist_id, favourite=record.favourite, is_template=record.is_template, open=record.open, ) return dto # Misc def get_setting(name: str) -> int | None: """ Get int setting """ with db.Session() as session: record = session.execute( select(Settings).where(Settings.name == name) ).one_or_none() if not record: return None return record.f_int def set_setting(name: str, value: int) -> None: """ Add int setting """ with db.Session() as session: record = session.execute( select(Settings).where(Settings.name == name) ).one_or_none() if not record: record = Settings(session=session, name=name) if not record: raise ApplicationError("Can't create Settings record") record.f_int = value session.commit()