# Standard library imports
import re
from typing import Any

# PyQt imports

# Third party imports
from sqlalchemy import (
    delete,
    func,
    select,
    update,
)
from sqlalchemy.orm import aliased
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.elements import BinaryExpression, ColumnElement
from sqlalchemy.sql.selectable import Select, Subquery

# App imports
from classes import ApplicationError, PlaylistDTO, PlaylistRowDTO, TrackDTO
from config import Config
import helpers
from log import log
from models import (
    db,
    NoteColours,
    Playdates,
    PlaylistRows,
    Playlists,
    Settings,
    Tracks,
)


# Notecolour functions
def _note_colour_matches(rec: NoteColours, text: str) -> bool:
    """
    Return True if the NoteColours record rec matches text.

    A regex record matches when its pattern matches at the start of text
    (re.match semantics); any other record matches when its substring
    occurs anywhere in text. Both honour rec.is_casesensitive.
    """

    if rec.is_regex:
        flags = re.UNICODE
        if not rec.is_casesensitive:
            flags |= re.IGNORECASE
        return re.compile(rec.substring, flags).match(text) is not None

    if rec.is_casesensitive:
        return rec.substring in text

    return rec.substring.lower() in text.lower()


def get_colour(text: str, foreground: bool = False) -> str:
    """
    Return the colour string of the first NoteColours record matching text.

    Records are tried in the order NoteColours.get_all() yields them.
    Return the record's foreground colour if foreground is True ("" if
    it has none), otherwise its background colour. Return "" if text is
    empty or no record matches.
    """

    if not text:
        return ""

    with db.Session() as session:
        for rec in NoteColours.get_all(session):
            if not _note_colour_matches(rec, text):
                continue
            if foreground:
                return rec.foreground or ""
            return rec.colour

    return ""


# Track functions
def add_track_to_header(playlistrow_id: int, track_id: int) -> None:
    """
    Add a track to this (header) row by setting the row's track_id
    """

    with db.Session() as session:
        session.execute(
            update(PlaylistRows)
            .where(PlaylistRows.id == playlistrow_id)
            .values(track_id=track_id)
        )
        session.commit()


def create_track(path: str) -> TrackDTO:
    """
    Create a track db entry from a track path and return the DTO

    Raise ApplicationError if the track can't be created or can't be
    read back afterwards.
    """

    metadata = helpers.get_all_track_metadata(path)

    with db.Session() as session:
        try:
            track = Tracks(
                session=session,
                path=str(metadata["path"]),
                title=str(metadata["title"]),
                artist=str(metadata["artist"]),
                duration=int(metadata["duration"]),
                start_gap=int(metadata["start_gap"]),
                fade_at=int(metadata["fade_at"]),
                silence_at=int(metadata["silence_at"]),
                bitrate=int(metadata["bitrate"]),
            )
            session.commit()
            # Read the id after the commit (as insert_row does) so that
            # it is populated even if the constructor didn't flush.
            track_id = track.id
        except Exception as exc:
            # Chain the cause so the underlying failure isn't lost
            raise ApplicationError("Can't create Track") from exc

    new_track = track_by_id(track_id)
    if not new_track:
        raise ApplicationError("Unable to create new track")

    return new_track


def get_all_tracks() -> list[TrackDTO]:
    """Return a list of all tracks"""

    return _tracks_where(Tracks.id > 0)


def track_by_id(track_id: int) -> TrackDTO | None:
    """
    Return track with specified id, or None if there is no such track
    """

    # NOTE(review): assumes Tracks.id is unique so that at most one row
    # can match
    tracks = _tracks_where(Tracks.id == track_id)
    return tracks[0] if tracks else None


def _latest_playdate_subquery() -> Subquery:
    """
    Return a subquery giving, for each track_id in Playdates, the
    maximum lastplayed value (labelled "lastplayed")
    """

    # Alias Playdates for use in the subquery
    latest_playdate = aliased(Playdates)

    return (
        select(
            latest_playdate.track_id,
            func.max(latest_playdate.lastplayed).label("lastplayed"),
        )
        .group_by(latest_playdate.track_id)
        .subquery()
    )


def _track_dto(record: Any) -> TrackDTO:
    """
    Build a TrackDTO from a result row of the _tracks_where query
    """

    return TrackDTO(
        artist=record.artist,
        bitrate=record.bitrate,
        duration=record.duration,
        fade_at=record.fade_at,
        intro=record.intro,
        lastplayed=record.lastplayed,
        path=record.path,
        silence_at=record.silence_at,
        start_gap=record.start_gap,
        title=record.title,
        track_id=record.track_id,
    )


def _tracks_where(where: BinaryExpression | ColumnElement[bool]) -> list[TrackDTO]:
    """
    Return tracks selected by where

    Each DTO carries the track's latest playdate; lastplayed comes from
    an outer join, so it is None for a track with no Playdates rows.
    """

    latest_playdate_subq = _latest_playdate_subquery()

    stmt = (
        select(
            Tracks.id.label("track_id"),
            Tracks.artist,
            Tracks.bitrate,
            Tracks.duration,
            Tracks.fade_at,
            Tracks.intro,
            Tracks.path,
            Tracks.silence_at,
            Tracks.start_gap,
            Tracks.title,
            latest_playdate_subq.c.lastplayed,
        )
        .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
        .where(where)
    )

    with db.Session() as session:
        return [_track_dto(record) for record in session.execute(stmt).all()]


def track_with_path(path: str) -> bool:
    """
    Return True if a track with passed path exists, else False
    """

    # Only existence matters: fetch at most one id rather than loading
    # the entity, and don't raise if more than one track has this path.
    stmt = select(Tracks.id).where(Tracks.path == path).limit(1)

    with db.Session() as session:
        return session.execute(stmt).first() is not None


def tracks_like_artist(filter_str: str) -> list[TrackDTO]:
    """
    Return tracks where artist is like filter (case-insensitive,
    filter may appear anywhere in the artist)
    """

    return _tracks_where(Tracks.artist.ilike(f"%{filter_str}%"))


def tracks_like_title(filter_str: str) -> list[TrackDTO]:
    """
    Return tracks where title is like filter (case-insensitive,
    filter may appear anywhere in the title)
    """

    return _tracks_where(Tracks.title.ilike(f"%{filter_str}%"))


# Playlist functions
def _check_playlist_integrity(
    session: Session, playlist_id: int, fix: bool = False
) -> None:
    """
    Ensure the row numbers are contiguous, starting from zero.

    Fix and log if fix==True, else raise ApplicationError. Fixes are
    made on the ORM objects in session; this function does not commit,
    so the caller must.
    """

    playlist_rows = (
        session.execute(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id)
            .order_by(PlaylistRows.row_number)
        )
        .scalars()
        .all()
    )

    for idx, plr in enumerate(playlist_rows):
        if plr.row_number == idx:
            continue

        msg = (
            "_check_playlist_integrity: incorrect row number "
            f"({plr.id=}, {plr.row_number=}, {idx=})"
        )
        if fix:
            log.debug(msg)
            plr.row_number = idx
        else:
            raise ApplicationError(msg)


def _shift_rows(
    session: Session, playlist_id: int, starting_row: int, shift_by: int
) -> None:
    """
    Shift rows from starting_row by shift_by. If shift_by is +ve, shift
    rows down; if -ve, shift them up.

    Does not commit; the caller must.
    """

    log.debug(f"(_shift_rows_down({playlist_id=}, {starting_row=}, {shift_by=}")

    session.execute(
        update(PlaylistRows)
        .where(
            (PlaylistRows.playlist_id == playlist_id),
            (PlaylistRows.row_number >= starting_row),
        )
        .values(row_number=PlaylistRows.row_number + shift_by)
    )


def move_rows(
    from_rows: list[int],
    from_playlist_id: int,
    to_row: int,
    to_playlist_id: int | None = None,
) -> None:
    """
    Move rows with or between playlists.

    Algorithm:
        - Sanity check row numbers
        - Check there are no playlist rows with playlist_id == PENDING_MOVE
        - Put rows to be moved into PENDING_MOVE playlist
        - Resequence remaining row numbers
        - Make space for moved rows
        - Move the PENDING_MOVE rows back and fixup row numbers
        - Sanity check row numbers

    Raise ApplicationError if a sanity check fails or if the
    PENDING_MOVE playlist is not empty on entry.
    """

    log.debug(
        f"move_rows_to_playlist({from_rows=}, {from_playlist_id=}, {to_row=}, {to_playlist_id=})"
    )

    # If to_playlist_id isn't specified, we're moving within the one
    # playlist.
    if to_playlist_id is None:
        to_playlist_id = from_playlist_id

    with db.Session() as session:
        # Sanity check row numbers
        _check_playlist_integrity(session, from_playlist_id, fix=False)
        if from_playlist_id != to_playlist_id:
            _check_playlist_integrity(session, to_playlist_id, fix=False)

        # Check there are no playlist rows with playlist_id == PENDING_MOVE
        pending_move_rows = get_playlist_rows(Config.PLAYLIST_PENDING_MOVE)
        if pending_move_rows:
            raise ApplicationError(f"move_rows_to_playlist: {pending_move_rows=}")

        # We need playlist length if we're moving within a playlist. Get
        # that now before we remove rows.
        from_playlist_length = len(get_playlist_rows(from_playlist_id))

        # Put rows to be moved into PENDING_MOVE playlist
        session.execute(
            update(PlaylistRows)
            .where(
                PlaylistRows.playlist_id == from_playlist_id,
                PlaylistRows.row_number.in_(from_rows),
            )
            .values(playlist_id=Config.PLAYLIST_PENDING_MOVE)
        )

        # Resequence remaining row numbers
        _check_playlist_integrity(session, from_playlist_id, fix=True)
        session.commit()

        # Make space for moved rows. If moving within one playlist,
        # determining where to make the space is non-trivial. For example,
        # if the playlist has ten entries and we're moving four of them
        # to row 8, after we've moved the rows to the
        # PLAYLIST_PENDING_MOVE there will only be six entries left.
        # Clearly we can't make space at row 8...
        space_row = to_row
        if to_playlist_id == from_playlist_id:
            overflow = max(to_row + len(from_rows) - from_playlist_length, 0)
            if overflow != 0:
                space_row = (
                    to_row - overflow - len([a for a in from_rows if a > to_row])
                )

        _shift_rows(session, to_playlist_id, space_row, len(from_rows))

        # Move the PENDING_MOVE rows back and fixup row numbers.
        # PLAYLIST_PENDING_MOVE may have gaps so don't check it. The
        # rows come back ordered by their old row_number, so they keep
        # their relative order.
        pending_rows = get_playlist_rows(
            Config.PLAYLIST_PENDING_MOVE, check_playlist_itegrity=False
        )
        # One parameter set per row, setting both columns at once
        update_list: list[dict[str, int]] = [
            {
                "id": row_to_move.playlistrow_id,
                "row_number": new_row_number,
                "playlist_id": to_playlist_id,
            }
            for new_row_number, row_to_move in enumerate(
                pending_rows, start=space_row
            )
        ]

        # Skip the bulk UPDATE when there is nothing to move back
        if update_list:
            session.execute(update(PlaylistRows), update_list)
        session.commit()

        # Sanity check row numbers
        _check_playlist_integrity(session, from_playlist_id, fix=False)
        if from_playlist_id != to_playlist_id:
            _check_playlist_integrity(session, to_playlist_id, fix=False)


def update_playdates(track_id: int) -> None:
    """
    Update playdates for passed track by creating a Playdates record
    """

    with db.Session() as session:
        _ = Playdates(session, track_id)
        # Commit explicitly, as the other functions here do after
        # constructing a model object.
        # NOTE(review): a no-op if the constructor already commits.
        session.commit()


def update_row_numbers(
    playlist_id: int, id_to_row_number: list[dict[str, int]]
) -> None:
    """
    Update playlistrows rownumbers for passed playlistrow_ids

    id_to_row_number is a list of parameter dicts for a bulk UPDATE of
    PlaylistRows, one per row, keyed by column name (as move_rows
    builds them, e.g. {"id": ..., "row_number": ...}).

    playlist_id is only needed for sanity checking

    Raise ApplicationError if the playlist's row numbers are not
    contiguous after the update.
    """

    with db.Session() as session:
        # Skip the bulk UPDATE when there is nothing to update
        if id_to_row_number:
            session.execute(update(PlaylistRows), id_to_row_number)
        session.commit()

        # Sanity check
        _check_playlist_integrity(session, playlist_id, fix=False)


def create_playlist(name: str, template_id: int) -> PlaylistDTO:
    """
    Create playlist and return DTO.

    Raise ApplicationError if the playlist can't be created or can't be
    read back afterwards.
    """

    with db.Session() as session:
        try:
            playlist = Playlists(session, name, template_id)
            session.commit()
            # Read the id after the commit (as insert_row does) so that
            # it is populated even if the constructor didn't flush.
            playlist_id = playlist.id
        except Exception as exc:
            # Chain the cause so the underlying failure isn't lost
            raise ApplicationError("Can't create Playlist") from exc

    new_playlist = playlist_by_id(playlist_id)
    if not new_playlist:
        raise ApplicationError("Can't retrieve new Playlist")

    return new_playlist


def _playlist_rows_select() -> Select:
    """
    Return the SELECT shared by get_playlist_row and get_playlist_rows:
    playlist row columns outer-joined to the row's track (if any) and
    to that track's latest playdate. Callers add their own WHERE.
    """

    latest_playdate_subq = _latest_playdate_subquery()

    return (
        select(
            PlaylistRows.id.label("playlistrow_id"),
            PlaylistRows.row_number,
            PlaylistRows.note,
            PlaylistRows.played,
            PlaylistRows.playlist_id,
            Tracks.id.label("track_id"),
            Tracks.artist,
            Tracks.bitrate,
            Tracks.duration,
            Tracks.fade_at,
            Tracks.intro,
            Tracks.path,
            Tracks.silence_at,
            Tracks.start_gap,
            Tracks.title,
            latest_playdate_subq.c.lastplayed,
        )
        .outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
        .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
    )


def _playlist_row_dto(record: Any) -> PlaylistRowDTO:
    """
    Build a PlaylistRowDTO from a result row of _playlist_rows_select()

    A row with no associated track gets empty/zero track fields and
    track_id == -1. Additional DTO fields are left at their defaults.
    """

    # Handle cases where track_id is None (no track associated)
    if record.track_id is None:
        return PlaylistRowDTO(
            artist="",
            bitrate=0,
            duration=0,
            fade_at=0,
            intro=None,
            lastplayed=None,
            note=record.note,
            path="",
            played=record.played,
            playlist_id=record.playlist_id,
            playlistrow_id=record.playlistrow_id,
            row_number=record.row_number,
            silence_at=0,
            start_gap=0,
            title="",
            track_id=-1,
        )

    return PlaylistRowDTO(
        artist=record.artist,
        bitrate=record.bitrate,
        duration=record.duration,
        fade_at=record.fade_at,
        intro=record.intro,
        lastplayed=record.lastplayed,
        note=record.note,
        path=record.path,
        played=record.played,
        playlist_id=record.playlist_id,
        playlistrow_id=record.playlistrow_id,
        row_number=record.row_number,
        silence_at=record.silence_at,
        start_gap=record.start_gap,
        title=record.title,
        track_id=record.track_id,
    )


def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None:
    """
    Return specific row DTO, or None if there is no such row
    """

    stmt = _playlist_rows_select().where(PlaylistRows.id == playlistrow_id)

    with db.Session() as session:
        record = session.execute(stmt).one_or_none()
        if not record:
            return None

        return _playlist_row_dto(record)


def get_playlist_rows(
    playlist_id: int, check_playlist_itegrity: bool = True
) -> list[PlaylistRowDTO]:
    """
    Return the row DTOs of the passed playlist, ordered by row number.

    Unless check_playlist_itegrity is False, raise ApplicationError if
    the playlist's row numbers are not contiguous.
    """

    stmt = (
        _playlist_rows_select()
        .where(PlaylistRows.playlist_id == playlist_id)
        .order_by(PlaylistRows.row_number)
    )

    with db.Session() as session:
        results = session.execute(stmt).all()

        # Sanity check
        # TODO: would be good to be confident at removing this
        if check_playlist_itegrity:
            _check_playlist_integrity(
                session=session, playlist_id=playlist_id, fix=False
            )

        return [_playlist_row_dto(row) for row in results]


def insert_row(
    playlist_id: int, row_number: int, track_id: int | None, note: str
) -> PlaylistRowDTO:
    """
    Insert a new row into playlist and return new row DTO

    Raise ApplicationError if the playlist's row numbers are not
    contiguous before or after the insert, or if the new row can't be
    read back.
    """

    with db.Session() as session:
        # Sanity check
        _check_playlist_integrity(session, playlist_id, fix=False)

        # Make space for new row
        _shift_rows(
            session=session,
            playlist_id=playlist_id,
            starting_row=row_number,
            shift_by=1,
        )

        playlist_row = PlaylistRows.insert_row(
            session=session,
            playlist_id=playlist_id,
            new_row_number=row_number,
            note=note,
            track_id=track_id,
        )
        session.commit()
        playlist_row_id = playlist_row.id

        # Sanity check
        _check_playlist_integrity(session, playlist_id, fix=False)

    new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id)
    if not new_playlist_row:
        raise ApplicationError("Can't retrieve new playlist row")

    return new_playlist_row


def remove_rows(playlist_id: int, row_numbers: list[int]) -> None:
    """
    Remove rows from playlist and renumber the remaining rows so that
    there are no gaps
    """

    log.debug(f"remove_rows({playlist_id=}, {row_numbers=}")

    with db.Session() as session:
        # Deleting a row doesn't renumber the others, so all the rows
        # can go in a single statement.
        session.execute(
            delete(PlaylistRows).where(
                PlaylistRows.playlist_id == playlist_id,
                PlaylistRows.row_number.in_(row_numbers),
            )
        )

        # Fixup row number to remove gaps
        _check_playlist_integrity(session, playlist_id, fix=True)
        session.commit()


def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
    """
    Return playlist with specified id, or None if there is no such
    playlist
    """

    stmt = select(
        Playlists.id.label("playlist_id"),
        Playlists.name,
        Playlists.favourite,
        Playlists.is_template,
        Playlists.open,
    ).where(Playlists.id == playlist_id)

    with db.Session() as session:
        record = session.execute(stmt).one_or_none()
        if not record:
            return None

        return PlaylistDTO(
            name=record.name,
            playlist_id=record.playlist_id,
            favourite=record.favourite,
            is_template=record.is_template,
            open=record.open,
        )


# Misc
def get_setting(name: str) -> int | None:
    """
    Get int setting; return None if there is no setting with that name
    """

    with db.Session() as session:
        record = (
            session.execute(select(Settings).where(Settings.name == name))
            .scalars()
            .one_or_none()
        )
        if not record:
            return None

        return record.f_int


def set_setting(name: str, value: int) -> None:
    """
    Set int setting, creating the Settings record if it doesn't exist

    Raise ApplicationError if the record can't be created.
    """

    with db.Session() as session:
        record = (
            session.execute(select(Settings).where(Settings.name == name))
            .scalars()
            .one_or_none()
        )
        if not record:
            record = Settings(session=session, name=name)
        # NOTE(review): a freshly constructed Settings is presumably
        # always truthy; check kept as a safeguard.
        if not record:
            raise ApplicationError("Can't create Settings record")

        record.f_int = value
        session.commit()