# Standard library imports import re # PyQt imports # Third party imports from sqlalchemy import ( func, select, update, ) from sqlalchemy.orm import aliased from sqlalchemy.orm.session import Session from sqlalchemy.sql.elements import BinaryExpression from classes import ApplicationError, PlaylistRowDTO # App imports from classes import PlaylistDTO, TrackDTO import helpers from log import log from models import ( db, NoteColours, Playdates, PlaylistRows, Playlists, Settings, Tracks, ) # Notecolour functions def get_colour(text: str, foreground: bool = False) -> str: """ Parse text and return background (foreground if foreground==True) colour string if matched, else None """ if not text: return "" match = False with db.Session() as session: for rec in NoteColours.get_all(session): if rec.is_regex: flags = re.UNICODE if not rec.is_casesensitive: flags |= re.IGNORECASE p = re.compile(rec.substring, flags) if p.match(text): match = True else: if rec.is_casesensitive: if rec.substring in text: match = True else: if rec.substring.lower() in text.lower(): match = True if match: if foreground: return rec.foreground or "" else: return rec.colour return "" # Track functions def add_track_to_header(playlistrow_id: int, track_id: int) -> None: """ Add a track to this (header) row """ with db.Session() as session: session.execute( update(PlaylistRows) .where(PlaylistRows.id == playlistrow_id) .values(track_id=track_id) ) session.commit() def create_track(path: str) -> TrackDTO: """ Create a track db entry from a track path and return the DTO """ metadata = helpers.get_all_track_metadata(path) with db.Session() as session: try: track = Tracks( session=session, path=str(metadata["path"]), title=str(metadata["title"]), artist=str(metadata["artist"]), duration=int(metadata["duration"]), start_gap=int(metadata["start_gap"]), fade_at=int(metadata["fade_at"]), silence_at=int(metadata["silence_at"]), bitrate=int(metadata["bitrate"]), ) track_id = track.id session.commit() except Exception: raise ApplicationError("Can't create Track") new_track = track_by_id(track_id) if not new_track: raise ApplicationError("Unable to create new track") return new_track def track_by_id(track_id: int) -> TrackDTO | None: """ Return track with specified id """ # Alias PlaydatesTable for subquery LatestPlaydate = aliased(Playdates) # Subquery: latest playdate for each track latest_playdate_subq = ( select( LatestPlaydate.track_id, func.max(LatestPlaydate.lastplayed).label("lastplayed"), ) .group_by(LatestPlaydate.track_id) .subquery() ) stmt = ( select( Tracks.id.label("track_id"), Tracks.artist, Tracks.bitrate, Tracks.duration, Tracks.fade_at, Tracks.intro, Tracks.path, Tracks.silence_at, Tracks.start_gap, Tracks.title, latest_playdate_subq.c.lastplayed, ) .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) .where(Tracks.id == track_id) ) with db.Session() as session: record = session.execute(stmt).one_or_none() if not record: return None dto = TrackDTO( artist=record.artist, bitrate=record.bitrate, duration=record.duration, fade_at=record.fade_at, intro=record.intro, lastplayed=record.lastplayed, path=record.path, silence_at=record.silence_at, start_gap=record.start_gap, title=record.title, track_id=record.track_id, ) return dto def _tracks_like(where: BinaryExpression) -> list[TrackDTO]: """ Return tracks selected by where """ # Alias PlaydatesTable for subquery LatestPlaydate = aliased(Playdates) # Subquery: latest playdate for each track latest_playdate_subq = ( select( LatestPlaydate.track_id, func.max(LatestPlaydate.lastplayed).label("lastplayed"), ) .group_by(LatestPlaydate.track_id) .subquery() ) stmt = ( select( Tracks.id.label("track_id"), Tracks.artist, Tracks.bitrate, Tracks.duration, Tracks.fade_at, Tracks.intro, Tracks.path, Tracks.silence_at, Tracks.start_gap, Tracks.title, latest_playdate_subq.c.lastplayed, ) .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) .where(where) ) results: list[TrackDTO] = [] with db.Session() as session: records = session.execute(stmt).all() for record in records: dto = TrackDTO( artist=record.artist, bitrate=record.bitrate, duration=record.duration, fade_at=record.fade_at, intro=record.intro, lastplayed=record.lastplayed, path=record.path, silence_at=record.silence_at, start_gap=record.start_gap, title=record.title, track_id=record.track_id, ) results.append(dto) return results def tracks_like_artist(filter_str: str) -> list[TrackDTO]: """ Return tracks where artist is like filter """ return _tracks_like(Tracks.artist.ilike(f"%{filter_str}%")) def tracks_like_title(filter_str: str) -> list[TrackDTO]: """ Return tracks where title is like filter """ return _tracks_like(Tracks.title.ilike(f"%{filter_str}%")) # Playlist functions def _check_row_number_sequence( session: Session, playlist_id: int, fix: bool = False ) -> None: """ The row numbers for any playlist should run from 0 to (length - 1). This function checks that that is the case. If there are errors, 'fix' determines what action is taken. If fix == True: Fix the row numbers and save to database. Log at info level. If fix == False: Log at error level and raise ApplicationError """ errors = False playlist_rows = session.scalars( select(PlaylistRows) .where(PlaylistRows.playlist_id == playlist_id) .order_by(PlaylistRows.row_number) ).all() for idx, playlist_row in enumerate(playlist_rows): if playlist_row.row_number != idx: errors = True msg = f"_check_row_number_sequence({playlist_id=}, {fix=}, {playlist_row=}, {idx=}" if fix: log.info(msg) playlist_row.row_number = idx else: log.error(msg) raise ApplicationError(msg) if errors: session.commit() def _move_rows_down( session: Session, playlist_id: int, starting_row: int, move_by: int ) -> None: """ Create space to insert move_by additional rows by incremented row number from starting_row to end of playlist """ log.debug(f"(_move_rows_down({playlist_id=}, {starting_row=}, {move_by=}") session.execute( update(PlaylistRows) .where( (PlaylistRows.playlist_id == playlist_id), (PlaylistRows.row_number >= starting_row), ) .values(row_number=PlaylistRows.row_number + move_by) ) session.commit() def create_playlist(name: str, template_id: int) -> PlaylistDTO: """ Create playlist and return DTO. """ with db.Session() as session: try: playlist = Playlists(session, name, template_id) playlist_id = playlist.id session.commit() except Exception: raise ApplicationError("Can't create Playlist") new_playlist = playlist_by_id(playlist_id) if not new_playlist: raise ApplicationError("Can't retrieve new Playlist") return new_playlist def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None: """ Return specific row DTO """ # Alias PlaydatesTable for subquery LatestPlaydate = aliased(Playdates) # Subquery: latest playdate for each track latest_playdate_subq = ( select( LatestPlaydate.track_id, func.max(LatestPlaydate.lastplayed).label("lastplayed"), ) .group_by(LatestPlaydate.track_id) .subquery() ) stmt = ( select( PlaylistRows.id.label("playlistrow_id"), PlaylistRows.row_number, PlaylistRows.note, PlaylistRows.played, PlaylistRows.playlist_id, Tracks.id.label("track_id"), Tracks.artist, Tracks.bitrate, Tracks.duration, Tracks.fade_at, Tracks.intro, Tracks.path, Tracks.silence_at, Tracks.start_gap, Tracks.title, latest_playdate_subq.c.lastplayed, ) .outerjoin(Tracks, PlaylistRows.track_id == Tracks.id) .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) .where(PlaylistRows.id == playlistrow_id) .order_by(PlaylistRows.row_number) ) with db.Session() as session: record = session.execute(stmt).one_or_none() if not record: return None # Handle cases where track_id is None (no track associated) if record.track_id is None: dto = PlaylistRowDTO( artist="", bitrate=0, duration=0, fade_at=0, intro=None, lastplayed=None, note=record.note, path="", played=record.played, playlist_id=record.playlist_id, playlistrow_id=record.playlistrow_id, row_number=record.row_number, silence_at=0, start_gap=0, title="", track_id=-1, ) else: dto = PlaylistRowDTO( artist=record.artist, bitrate=record.bitrate, duration=record.duration, fade_at=record.fade_at, intro=record.intro, lastplayed=record.lastplayed, note=record.note, path=record.path, played=record.played, playlist_id=record.playlist_id, playlistrow_id=record.playlistrow_id, row_number=record.row_number, silence_at=record.silence_at, start_gap=record.start_gap, title=record.title, track_id=record.track_id, ) return dto def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]: # Alias PlaydatesTable for subquery LatestPlaydate = aliased(Playdates) # Subquery: latest playdate for each track latest_playdate_subq = ( select( LatestPlaydate.track_id, func.max(LatestPlaydate.lastplayed).label("lastplayed"), ) .group_by(LatestPlaydate.track_id) .subquery() ) stmt = ( select( PlaylistRows.id.label("playlistrow_id"), PlaylistRows.row_number, PlaylistRows.note, PlaylistRows.played, PlaylistRows.playlist_id, Tracks.id.label("track_id"), Tracks.artist, Tracks.bitrate, Tracks.duration, Tracks.fade_at, Tracks.intro, Tracks.path, Tracks.silence_at, Tracks.start_gap, Tracks.title, latest_playdate_subq.c.lastplayed, ) .outerjoin(Tracks, PlaylistRows.track_id == Tracks.id) .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id) .where(PlaylistRows.playlist_id == playlist_id) .order_by(PlaylistRows.row_number) ) with db.Session() as session: results = session.execute(stmt).all() dto_list = [] for row in results: # Handle cases where track_id is None (no track associated) if row.track_id is None: dto = PlaylistRowDTO( artist="", bitrate=0, duration=0, fade_at=0, intro=None, lastplayed=None, note=row.note, path="", played=row.played, playlist_id=row.playlist_id, playlistrow_id=row.playlistrow_id, row_number=row.row_number, silence_at=0, start_gap=0, title="", track_id=-1, # Additional fields like row_fg, row_bg, etc., use default None values ) else: dto = PlaylistRowDTO( artist=row.artist, bitrate=row.bitrate, duration=row.duration, fade_at=row.fade_at, intro=row.intro, lastplayed=row.lastplayed, note=row.note, path=row.path, played=row.played, playlist_id=row.playlist_id, playlistrow_id=row.playlistrow_id, row_number=row.row_number, silence_at=row.silence_at, start_gap=row.start_gap, title=row.title, track_id=row.track_id, # Additional fields like row_fg, row_bg, etc., use default None values ) dto_list.append(dto) return dto_list def insert_row( playlist_id: int, row_number: int, track_id: int | None, note: str ) -> PlaylistRowDTO: """ Insert a new row into playlist and return new row DTO """ with db.Session() as session: # Sanity check _check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False) # Make space for new row _move_rows_down( session=session, playlist_id=playlist_id, starting_row=row_number, move_by=1 ) playlist_row = PlaylistRows.insert_row( session=session, playlist_id=playlist_id, new_row_number=row_number, note=note, track_id=track_id, ) session.commit() playlist_row_id = playlist_row.id # Sanity check _check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False) new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id) if not new_playlist_row: raise ApplicationError("Can't retrieve new playlist row") return new_playlist_row def playlist_by_id(playlist_id: int) -> PlaylistDTO | None: """ Return playlist with specified id """ stmt = select( Playlists.id.label("playlist_id"), Playlists.name, Playlists.favourite, Playlists.is_template, Playlists.open, ).where(Playlists.id == playlist_id) with db.Session() as session: record = session.execute(stmt).one_or_none() if not record: return None dto = PlaylistDTO( name=record.name, playlist_id=record.playlist_id, favourite=record.favourite, is_template=record.is_template, open=record.open, ) return dto # Misc def get_setting(name: str) -> int | None: """ Get int setting """ with db.Session() as session: record = session.execute(select(Settings).where(Settings.name == name)).one_or_none() if not record: return None return record.f_int def set_setting(name: str, value: int) -> None: """ Add int setting """ with db.Session() as session: record = session.execute(select(Settings).where(Settings.name == name)).one_or_none() if not record: record = Settings(session=session, name=name) if not record: raise ApplicationError("Can't create Settings record") record.f_int = value session.commit()