#!/usr/bin/python3
"""SQLAlchemy ORM models for carts, note colours, play dates, playlists,
playlist rows, settings and tracks, plus their query helpers."""

import os.path
import re
import stackprinter  # type: ignore

from dbconfig import Session

from datetime import datetime
from typing import List, Optional

from sqlalchemy.ext.associationproxy import association_proxy

from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    delete,
    Float,
    ForeignKey,
    func,
    Integer,
    select,
    String,
    update,
)

from sqlalchemy.orm import (
    declarative_base,
    relationship,
)
from sqlalchemy.orm.exc import (
    NoResultFound
)

from config import Config
from helpers import (
    fade_point,
    get_audio_segment,
    get_tags,
    leading_silence,
    trailing_silence,
)
from log import log

Base = declarative_base()


# Database classes
class Carts(Base):
    """A cart record: numbered entry with a name, duration, path and flag."""

    __tablename__ = 'carts'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    cart_number = Column(Integer, nullable=False, unique=True)
    name = Column(String(256), index=True)
    duration = Column(Integer, index=True)
    path = Column(String(2048), index=False)
    enabled = Column(Boolean, default=False, nullable=False)

    def __repr__(self) -> str:
        return (
            f"<Carts(id={self.id}, cart_number={self.cart_number}, "
            f"name={self.name}, path={self.path})>"
        )

    def __init__(self, session: Session, cart_number: int,
                 name: Optional[str] = None,
                 duration: Optional[int] = None,
                 path: Optional[str] = None,
                 enabled: bool = True) -> None:
        """Create new cart, add it to the session and commit"""

        self.cart_number = cart_number
        self.name = name
        self.duration = duration
        self.path = path
        self.enabled = enabled

        session.add(self)
        session.commit()


class NoteColours(Base):
    """A rule mapping a substring or regex found in text to a colour."""

    __tablename__ = 'notecolours'

    id = Column(Integer, primary_key=True, autoincrement=True)
    substring = Column(String(256), index=False)
    colour = Column(String(21), index=False)
    enabled = Column(Boolean, default=True, index=True)
    is_regex = Column(Boolean, default=False, index=False)
    is_casesensitive = Column(Boolean, default=False, index=False)
    order = Column(Integer, index=True)

    def __repr__(self) -> str:
        return (
            f"<NoteColours(id={self.id}, substring={self.substring}, "
            f"colour={self.colour})>"
        )

    @staticmethod
    def get_colour(session: Session, text: str) -> Optional[str]:
        """
        Parse text and return colour string if matched, else None

        Enabled rules are tried in ascending 'order'; the first rule that
        matches wins. Regex rules are applied with re.match (anchored at
        the start of text); plain rules are substring tests.
        """

        if not text:
            return None

        rules = session.execute(
            select(NoteColours)
            .filter(NoteColours.enabled.is_(True))
            .order_by(NoteColours.order)
        ).scalars().all()

        for rec in rules:
            # The substring column is nullable; a NULL rule cannot match
            # anything and would otherwise raise TypeError below.
            if rec.substring is None:
                continue

            if rec.is_regex:
                flags = re.UNICODE
                if not rec.is_casesensitive:
                    flags |= re.IGNORECASE
                if re.compile(rec.substring, flags).match(text):
                    return rec.colour
            elif rec.is_casesensitive:
                if rec.substring in text:
                    return rec.colour
            elif rec.substring.lower() in text.lower():
                return rec.colour

        return None


class Playdates(Base):
    """A record of when a track was played."""

    __tablename__ = 'playdates'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    lastplayed = Column(DateTime, index=True, default=None)
    track_id = Column(Integer, ForeignKey('tracks.id'))
    track = relationship("Tracks", back_populates="playdates")

    def __repr__(self) -> str:
        return (
            f"<Playdates(id={self.id}, track_id={self.track_id}, "
            f"lastplayed={self.lastplayed})>"
        )

    def __init__(self, session: Session, track_id: int) -> None:
        """Record that track was played (timestamped now) and commit"""

        self.lastplayed = datetime.now()
        self.track_id = track_id
        session.add(self)
        session.commit()

    @staticmethod
    def last_played(session: Session, track_id: int) -> Optional[datetime]:
        """Return datetime track last played or None"""

        return session.scalars(
            select(Playdates.lastplayed)
            .where(Playdates.track_id == track_id)
            .order_by(Playdates.lastplayed.desc())
            .limit(1)
        ).first()

    @staticmethod
    def played_after(session: Session, since: datetime) -> List["Playdates"]:
        """Return a list of Playdates objects since passed time"""

        return (
            session.execute(
                select(Playdates)
                .where(Playdates.lastplayed >= since)
                .order_by(Playdates.lastplayed)
            )
            .scalars()
            .all()
        )


class Playlists(Base):
    """
    Manage playlists
    """

    __tablename__ = "playlists"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32), nullable=False, unique=True)
    last_used = Column(DateTime, default=None, nullable=True)
    tab = Column(Integer, default=None, nullable=True, unique=True)
    sort_column = Column(Integer, default=None, nullable=True, unique=False)
    is_template = Column(Boolean, default=False, nullable=False)
    query = Column(String(256), default=None, nullable=True, unique=False)
    deleted = Column(Boolean, default=False, nullable=False)
    rows = relationship(
        "PlaylistRows",
        back_populates="playlist",
        cascade="all, delete-orphan",
        order_by="PlaylistRows.row_number"
    )

    def __repr__(self) -> str:
        return (
            f"<Playlists(id={self.id}, name={self.name}, "
            f"is_template={self.is_template}, tab={self.tab})>"
        )

    def __init__(self, session: Session, name: str) -> None:
        """Create a playlist called name, add it to the session and commit"""

        self.name = name
        session.add(self)
        session.commit()

    def close(self, session: Session) -> None:
        """
        Mark playlist as unloaded

        No commit is issued here; that is left to the caller.
        """

        closed_idx = self.tab
        self.tab = None

        # Already closed: no other tab has moved, and comparing against
        # NULL would match no rows anyway.
        if closed_idx is None:
            return

        # Closing this tab will mean all higher-number tabs have moved
        # down by one
        session.execute(
            update(Playlists)
            .where(Playlists.tab > closed_idx)
            .values(tab=Playlists.tab - 1)
        )

    @classmethod
    def create_playlist_from_template(cls, session: Session,
                                      template: "Playlists",
                                      playlist_name: str) \
            -> "Playlists":
        """Create a new playlist from template"""

        playlist = cls(session, playlist_name)
        PlaylistRows.copy_playlist(session, template.id, playlist.id)

        return playlist

    @classmethod
    def get_all(cls, session: Session) -> List["Playlists"]:
        """
        Returns a list of all non-template playlists ordered by tab
        (descending), then last use (descending)
        """

        return (
            session.execute(
                select(cls)
                .filter(cls.is_template.is_(False))
                .order_by(cls.tab.desc(), cls.last_used.desc())
            )
            .scalars()
            .all()
        )

    @classmethod
    def get_all_templates(cls, session: Session) -> List["Playlists"]:
        """Returns a list of all templates ordered by name"""

        return (
            session.execute(
                select(cls)
                .filter(cls.is_template.is_(True))
                .order_by(cls.name)
            )
            .scalars()
            .all()
        )

    @classmethod
    def get_closed(cls, session: Session) -> List["Playlists"]:
        """Returns a list of all closed playlists ordered by last use"""

        return (
            session.execute(
                select(cls)
                .filter(
                    cls.tab.is_(None),
                    cls.is_template.is_(False)
                )
                .order_by(cls.last_used.desc())
            )
            .scalars()
            .all()
        )

    @classmethod
    def get_open(cls, session: Session) -> List["Playlists"]:
        """
        Return a list of loaded playlists ordered by tab order.
        """

        return (
            session.execute(
                select(cls)
                .where(cls.tab.is_not(None))
                .order_by(cls.tab)
            )
            .scalars()
            .all()
        )

    def mark_open(self, session: Session, tab_index: int) -> None:
        """Mark playlist as loaded and used now"""

        self.tab = tab_index
        self.last_used = datetime.now()

    @staticmethod
    def move_tab(session: Session, frm: int, to: int) -> None:
        """
        Move tabs: swap the tab numbers of the playlists at frm and to

        Raises (via scalar_one) if either tab number does not identify
        exactly one playlist.
        """

        row_frm = session.execute(
            select(Playlists)
            .filter_by(tab=frm)
        ).scalar_one()

        row_to = session.execute(
            select(Playlists)
            .filter_by(tab=to)
        ).scalar_one()

        # Both tabs are cleared and committed before the swapped values
        # are assigned; presumably this avoids a transient clash on the
        # unique 'tab' column - TODO confirm.
        row_frm.tab = None
        row_to.tab = None
        session.commit()
        row_to.tab = frm
        row_frm.tab = to

    @staticmethod
    def save_as_template(session: Session,
                         playlist_id: int, template_name: str) -> None:
        """Save passed playlist as new template"""

        template = Playlists(session, template_name)
        template.is_template = True
        session.commit()

        PlaylistRows.copy_playlist(session, playlist_id, template.id)


class PlaylistRows(Base):
    """One row of a playlist: a row number, optional note, optional track."""

    __tablename__ = 'playlist_rows'

    id = Column(Integer, primary_key=True, autoincrement=True)
    row_number = Column(Integer, nullable=False)
    note = Column(String(2048), index=False)
    playlist_id = Column(Integer, ForeignKey('playlists.id'), nullable=False)
    playlist = relationship(Playlists, back_populates="rows")
    track_id = Column(Integer, ForeignKey('tracks.id'), nullable=True)
    track = relationship("Tracks", back_populates="playlistrows")
    played = Column(Boolean, nullable=False, index=False, default=False)

    def __repr__(self) -> str:
        return (
            f"<PlaylistRows(id={self.id}, playlist_id={self.playlist_id}, "
            f"track_id={self.track_id}, note={self.note}, "
            f"row_number={self.row_number})>"
        )

    def __init__(self,
                 session: Session,
                 playlist_id: int,
                 track_id: Optional[int],
                 row_number: int,
                 note: Optional[str] = None
                 ) -> None:
        """Create PlaylistRows object, add it to the session and flush"""

        self.playlist_id = playlist_id
        self.track_id = track_id
        self.row_number = row_number
        self.note = note
        session.add(self)
        session.flush()

    @staticmethod
    def copy_playlist(session: Session,
                      src_id: int,
                      dst_id: int) -> None:
        """Copy playlist entries from playlist src_id to playlist dst_id"""

        src_rows = session.execute(
            select(PlaylistRows)
            .filter(PlaylistRows.playlist_id == src_id)
        ).scalars().all()

        for plr in src_rows:
            # The constructor adds the new row to the session itself
            PlaylistRows(session, dst_id, plr.track_id, plr.row_number,
                         plr.note)

    @staticmethod
    def delete_plrids_not_in_list(session: Session, playlist_id: int,
                                  plrids: List[int]) -> None:
        """
        Delete rows in given playlist whose id is not in the passed
        list of playlist row ids
        """

        session.execute(
            delete(PlaylistRows)
            .where(
                PlaylistRows.playlist_id == playlist_id,
                PlaylistRows.id.not_in(plrids)
            )
        )
        # Delete won't take effect until commit()
        session.commit()

    @staticmethod
    def fixup_rownumbers(session: Session, playlist_id: int) -> None:
        """
        Ensure the row numbers for passed playlist have no gaps
        """

        plrs = session.execute(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id)
            .order_by(PlaylistRows.row_number)
        ).scalars().all()

        # Renumber from zero, preserving the existing order
        for i, plr in enumerate(plrs):
            plr.row_number = i

        # Ensure new row numbers are available to the caller
        session.commit()

    @staticmethod
    def get_track_plr(session: Session, track_id: int,
                      playlist_id: int) -> Optional["PlaylistRows"]:
        """Return first matching PlaylistRows object or None"""

        return session.scalars(
            select(PlaylistRows)
            .where(
                PlaylistRows.track_id == track_id,
                PlaylistRows.playlist_id == playlist_id
            )
            .limit(1)
        ).first()

    @staticmethod
    def get_last_used_row(session: Session,
                          playlist_id: int) -> Optional[int]:
        """Return the last used row for playlist, or None if no rows"""

        return session.execute(
            select(func.max(PlaylistRows.row_number))
            .where(PlaylistRows.playlist_id == playlist_id)
        ).scalar_one()

    @classmethod
    def get_played_rows(cls, session: Session,
                        playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return a list of PlaylistRows objects that
        have been played, ordered by row number.
        """

        plrs = session.execute(
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.played.is_(True)
            )
            .order_by(cls.row_number)
        ).scalars().all()

        return plrs

    @classmethod
    def get_rows_with_tracks(cls, session: Session,
                             playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return a list of PlaylistRows objects that
        contain tracks, ordered by row number.
        """

        plrs = session.execute(
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.track_id.is_not(None)
            )
            .order_by(cls.row_number)
        ).scalars().all()

        return plrs

    @classmethod
    def get_unplayed_rows(cls, session: Session,
                          playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return a list of PlaylistRows objects that
        contain tracks and have not been played, ordered by row number.
        """

        plrs = session.execute(
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.track_id.is_not(None),
                cls.played.is_(False)
            )
            .order_by(cls.row_number)
        ).scalars().all()

        return plrs

    @staticmethod
    def move_rows_down(session: Session, playlist_id: int,
                       starting_row: int, move_by: int) -> None:
        """
        Create space to insert move_by additional rows by incrementing
        the row number of every row from starting_row to the end of the
        playlist. No commit is issued here.
        """

        session.execute(
            update(PlaylistRows)
            .where(
                (PlaylistRows.playlist_id == playlist_id),
                (PlaylistRows.row_number >= starting_row)
            )
            .values(row_number=PlaylistRows.row_number + move_by)
        )

    @staticmethod
    def indexed_by_id(session: Session, plr_ids: List[int]) -> dict:
        """
        Return a dictionary of playlist_rows indexed by their plr id from
        the passed plr_id list.
        """

        plrs = session.execute(
            select(PlaylistRows)
            .where(
                PlaylistRows.id.in_(plr_ids)
            )
        ).scalars().all()

        return {plr.id: plr for plr in plrs}


class Settings(Base):
    """Manage settings"""

    __tablename__ = 'settings'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(64), nullable=False, unique=True)
    f_datetime = Column(DateTime, default=None, nullable=True)
    f_int: int = Column(Integer, default=None, nullable=True)
    f_string = Column(String(128), default=None, nullable=True)

    def __repr__(self) -> str:
        # Show the first field that is set; test against None so that a
        # legitimate integer value of 0 is not skipped.
        value = next(
            (field
             for field in (self.f_datetime, self.f_int, self.f_string)
             if field is not None),
            None,
        )
        return f"<Settings(id={self.id}, name={self.name}, value={value!r})>"

    @classmethod
    def get_int_settings(cls, session: Session, name: str) -> "Settings":
        """
        Get setting for an integer or return new setting record

        A newly created record has f_int set to None and is added to the
        session but not committed.
        """

        int_setting: Settings

        try:
            int_setting = session.execute(
                select(cls)
                .where(cls.name == name)
            ).scalar_one()

        except NoResultFound:
            int_setting = Settings()
            int_setting.name = name
            int_setting.f_int = None
            session.add(int_setting)

        return int_setting

    def update(self, session: Session, data: dict) -> None:
        """
        Set attributes on this record from the key/value pairs in data,
        then flush the session
        """

        for key, value in data.items():
            # NOTE: this check is stripped when Python runs with -O; it is
            # kept as an assert so the exception type seen by callers is
            # unchanged.
            assert hasattr(self, key)
            setattr(self, key, value)
        session.flush()


class Tracks(Base):
    """An audio track: tags, timing points, file path and modification time."""

    __tablename__ = 'tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(256), index=True)
    artist = Column(String(256), index=True)
    duration = Column(Integer, index=True)
    start_gap = Column(Integer, index=False)
    fade_at = Column(Integer, index=False)
    silence_at = Column(Integer, index=False)
    path = Column(String(2048), index=False, nullable=False, unique=True)
    mtime = Column(Float, index=True)
    bitrate = Column(Integer, nullable=True, default=None)
    playlistrows = relationship("PlaylistRows", back_populates="track")
    playlists = association_proxy("playlistrows", "playlist")
    playdates = relationship("Playdates", back_populates="track")

    def __repr__(self) -> str:
        return (
            f"<Tracks(id={self.id}, title={self.title}, "
            f"artist={self.artist}, path={self.path})>"
        )

    def __init__(
            self,
            session: Session,
            path: str,
            title: Optional[str] = None,
            artist: Optional[str] = None,
            duration: int = 0,
            start_gap: int = 0,
            fade_at: Optional[int] = None,
            silence_at: Optional[int] = None,
            mtime: Optional[float] = None,
            lastplayed: Optional[datetime] = None,
    ) -> None:
        """Create a track, add it to the session and commit"""

        self.path = path
        self.title = title
        self.artist = artist
        self.duration = duration
        self.start_gap = start_gap
        self.fade_at = fade_at
        self.silence_at = silence_at
        self.mtime = mtime
        # NOTE: 'lastplayed' is not a mapped column on this class, so this
        # is a plain instance attribute and is not persisted.
        self.lastplayed = lastplayed

        session.add(self)
        session.commit()

    @classmethod
    def get_all(cls, session) -> List["Tracks"]:
        """Return a list of all tracks"""

        return session.execute(select(cls)).scalars().all()

    @classmethod
    def get_by_path(cls, session: Session, path: str) -> Optional["Tracks"]:
        """
        Return track with passed path, or None.
        """

        return session.execute(
            select(Tracks)
            .where(Tracks.path == path)
        ).scalar_one_or_none()

    @classmethod
    def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
        """
        Search case-insensitively for artists containing text; results
        are ordered by title
        """

        return (
            session.execute(
                select(cls)
                .where(cls.artist.ilike(f"%{text}%"))
                .order_by(cls.title)
            )
            .scalars()
            .all()
        )

    @classmethod
    def search_titles(cls, session: Session, text: str) -> List["Tracks"]:
        """
        Search case-insensitively for titles containing text; results
        are ordered by title
        """

        return (
            session.execute(
                select(cls)
                .where(cls.title.ilike(f"%{text}%"))
                .order_by(cls.title)
            )
            .scalars()
            .all()
        )