#!/usr/bin/python3
"""
Database models and shared SQLAlchemy session.

Declares the ORM classes NoteColours, Notes, Playdates, Playlists,
PlaylistTracks, Settings and Tracks, plus the module-level engine and
scoped Session factory they are used with.

Most methods take the session to use as their first argument and commit
before returning.
"""

import os.path
import re

import sqlalchemy

from datetime import datetime
from typing import List, Optional

from pydub import AudioSegment
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Float,
    ForeignKey,
    Integer,
    String,
    func
)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import (
    backref,
    relationship,
    sessionmaker,
    scoped_session,
    RelationshipProperty
)
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound

from config import Config
from helpers import (
    fade_point,
    get_audio_segment,
    leading_silence,
    trailing_silence,
)
from log import DEBUG, ERROR

# Create session at the global level as per
# https://docs.sqlalchemy.org/en/13/orm/session_basics.html
# and make objects persistent
# https://docs.sqlalchemy.org/en/14/orm/session_state_management.html

engine = sqlalchemy.create_engine(
    f"{Config.MYSQL_CONNECT}?charset=utf8",
    encoding='utf-8',
    echo=Config.DISPLAY_SQL,
    pool_pre_ping=True)

# Create a Session factory
Session = scoped_session(sessionmaker(bind=engine))
# sm: sessionmaker = sessionmaker(bind=engine)  # , expire_on_commit=False)
# Session = scoped_session(sm)

Base: DeclarativeMeta = declarative_base()
# NOTE(review): this runs before any model class below has been declared,
# so Base.metadata holds no tables at this point - confirm whether table
# creation is expected to happen here or elsewhere.
Base.metadata.create_all(engine)


def db_init():
    """Placeholder initialisation hook; currently does nothing."""
    return


# Database classes
class NoteColours(Base):
    """Rule mapping a substring or regex found in note text to a colour."""

    __tablename__ = 'notecolours'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    substring: str = Column(String(256), index=False)
    colour: str = Column(String(21), index=False)
    enabled: bool = Column(Boolean, default=True, index=True)
    is_regex: bool = Column(Boolean, default=False, index=False)
    is_casesensitive: bool = Column(Boolean, default=False, index=False)
    order: int = Column(Integer, index=True)

    def __init__(
            self, session: Session, substring: str, colour: str,
            enabled: bool = True, is_regex: bool = False,
            is_casesensitive: bool = False, order: int = 0) -> None:
        """Create the rule, add it to session and commit."""
        self.substring = substring
        self.colour = colour
        self.enabled = enabled
        self.is_regex = is_regex
        self.is_casesensitive = is_casesensitive
        self.order = order

        session.add(self)
        session.commit()

    def __repr__(self) -> str:
        return (
            f"<NoteColours(id={self.id}, colour={self.colour}, "
            f"substring={self.substring})>"
        )

    @classmethod
    def get_all(cls, session: Session) -> Optional[List["NoteColours"]]:
        """Return all records"""

        return session.query(cls).all()

    @classmethod
    def get_by_id(cls, session: Session, note_id: int) -> \
            Optional["NoteColours"]:
        """Return record identified by id, or None if not found"""

        return session.query(NoteColours).filter(
            NoteColours.id == note_id).first()

    @staticmethod
    def get_colour(session: Session, text: str) -> Optional[str]:
        """
        Parse text and return colour string if matched, else None

        Enabled rules are tried in ascending `order`; the first rule that
        matches wins. Regex rules are applied with re.match (anchored at
        the start of text); plain rules are substring tests.
        """

        rules = (
            session.query(NoteColours)
            .filter(NoteColours.enabled.is_(True))
            .order_by(NoteColours.order)
            .all()
        )
        for rec in rules:
            if rec.is_regex:
                flags = re.UNICODE
                if not rec.is_casesensitive:
                    flags |= re.IGNORECASE
                if re.compile(rec.substring, flags).match(text):
                    return rec.colour
            elif rec.is_casesensitive:
                if rec.substring in text:
                    return rec.colour
            elif rec.substring.lower() in text.lower():
                return rec.colour

        return None


class Notes(Base):
    """A text note attached to a row of a playlist."""

    __tablename__ = 'notes'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    playlist_id: int = Column(Integer, ForeignKey('playlists.id'))
    playlist: RelationshipProperty = relationship(
        "Playlists", back_populates="notes", lazy="joined")
    row: int = Column(Integer, nullable=False)
    note: str = Column(String(256), index=False)

    def __init__(
            self, session: Session, playlist_id: int, row: int,
            text: str) -> None:
        """Create note"""

        DEBUG(f"Notes.__init__({playlist_id=}, {row=}, {text=})")
        self.playlist_id = playlist_id
        self.row = row
        self.note = text
        session.add(self)
        session.commit()

    def __repr__(self) -> str:
        return (
            f"<Notes(id={self.id}, row={self.row}, note={self.note})>"
        )

    def delete_note(self, session: Session) -> None:
        """Delete note"""

        DEBUG(f"delete_note({self.id=})")

        session.query(Notes).filter_by(id=self.id).delete()
        session.commit()

    def update_note(
            self, session: Session, row: int,
            text: Optional[str] = None) -> None:
        """
        Update note details. If text=None, don't change text.
        """

        DEBUG(f"Notes.update_note({self.id=}, {row=}, {text=})")

        self.row = row
        # NOTE(review): truthiness test, so an empty string also leaves the
        # existing text unchanged - confirm that is intended.
        if text:
            self.note = text
        session.commit()


class Playdates(Base):
    """Record of a track having been played at a given time."""

    __tablename__ = 'playdates'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    lastplayed: datetime = Column(DateTime, index=True, default=None)
    track_id: int = Column(Integer, ForeignKey('tracks.id'))
    tracks: RelationshipProperty = relationship(
        "Tracks", back_populates="playdates", lazy="joined")

    def __init__(self, session: Session, track: "Tracks") -> None:
        """Record that track was played"""

        DEBUG(f"add_playdate(track={track})")

        self.lastplayed = datetime.now()
        self.track_id = track.id
        # Also stamps the track itself (this call commits too)
        track.update_lastplayed(session)
        session.add(self)
        session.commit()

    @staticmethod
    def last_played(session: Session, track_id: int) -> Optional[datetime]:
        """Return datetime track last played or None"""

        # Single-column query: first() yields a one-element row, or None
        # when the track has no playdates.
        newest = (
            session.query(Playdates.lastplayed)
            .filter(Playdates.track_id == track_id)
            .order_by(Playdates.lastplayed.desc())
            .first()
        )
        if newest:
            return newest[0]
        return None

    @staticmethod
    def remove_track(session: Session, track_id: int) -> None:
        """
        Remove all records of track_id
        """

        session.query(Playdates).filter(
            Playdates.track_id == track_id,
        ).delete()
        session.commit()


class Playlists(Base):
    """
    Manage playlists
    """

    __tablename__ = "playlists"

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(32), nullable=False, unique=True)
    last_used: datetime = Column(DateTime, default=None, nullable=True)
    loaded: bool = Column(Boolean, default=True, nullable=False)
    notes = relationship(
        "Notes", order_by="Notes.row",
        back_populates="playlist", lazy="joined")
    # Proxies over the "playlist_tracks" backref declared on PlaylistTracks
    tracks = association_proxy('playlist_tracks', 'tracks')
    row = association_proxy('playlist_tracks', 'row')

    def __init__(self, session: Session, name: str) -> None:
        """Create playlist called name, add it to session and commit."""
        self.name = name
        session.add(self)
        session.commit()

    def __repr__(self) -> str:
        return f"<Playlists(id={self.id}, name={self.name})>"

    def add_note(self, session: Session, row: int, text: str) -> Notes:
        """Add note to playlist at passed row"""

        return Notes(session, self.id, row, text)

    def add_track(
            self, session: Session, track: "Tracks",
            row: Optional[int] = None) -> None:
        """
        Add track to playlist at given row.
        If row=None, add to end of playlist
        """

        # Compare against None explicitly: row 0 is a valid position and
        # must not be mistaken for "no row given".
        if row is None:
            row = PlaylistTracks.next_free_row(session, self)

        PlaylistTracks(session, self.id, track.id, row)

    def close(self, session: Session) -> None:
        """Record playlist as no longer loaded"""

        self.loaded = False
        session.add(self)
        session.commit()

    @classmethod
    def get_all(cls, session: Session) -> List["Playlists"]:
        """Returns a list of all playlists ordered by last use"""

        return (
            session.query(cls)
            .order_by(cls.last_used.desc())
        ).all()

    @classmethod
    def get_by_id(cls, session: Session, playlist_id: int) -> "Playlists":
        """
        Return the playlist with the passed id.

        Uses Query.one(), so NoResultFound propagates if there is no
        such playlist.
        """
        return (session.query(cls).filter(cls.id == playlist_id)).one()

    @classmethod
    def get_closed(cls, session: Session) -> List["Playlists"]:
        """Returns a list of all closed playlists ordered by last use"""

        return (
            session.query(cls)
            .filter(cls.loaded.is_(False))
            .order_by(cls.last_used.desc())
        ).all()

    @classmethod
    def get_open(cls, session: Session) -> List["Playlists"]:
        """
        Return a list of playlists marked "loaded", ordered by last use
        (most recent first).
        """

        return (
            session.query(cls)
            .filter(cls.loaded.is_(True))
            .order_by(cls.last_used.desc())
        ).all()

    def mark_open(self, session: Session) -> None:
        """Mark playlist as loaded and used now"""

        self.loaded = True
        self.last_used = datetime.now()
        if self not in session:
            session.add(self)
        session.commit()

    def remove_all_tracks(self, session: Session) -> None:
        """
        Remove all tracks from this playlist
        """

        session.query(PlaylistTracks).filter(
            PlaylistTracks.playlist_id == self.id,
        ).delete()
        session.commit()

    def remove_track(self, session: Session, row: int) -> None:
        """Remove the track at the passed row from this playlist"""

        DEBUG(f"Playlist.remove_track({self.id=}, {row=})")

        session.query(PlaylistTracks).filter(
            PlaylistTracks.playlist_id == self.id,
            PlaylistTracks.row == row
        ).delete()
        session.commit()


class PlaylistTracks(Base):
    """Association of a track with a playlist at a given row."""

    __tablename__ = 'playlist_tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    playlist_id: int = Column(Integer, ForeignKey('playlists.id'),
                              primary_key=True)
    track_id: int = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
    row: int = Column(Integer, nullable=False)
    tracks: RelationshipProperty = relationship("Tracks")
    # The backref gives Playlists a "playlist_tracks" collection keyed by row
    playlist: RelationshipProperty = relationship(
        Playlists,
        backref=backref(
            "playlist_tracks",
            collection_class=attribute_mapped_collection("row"),
            lazy="joined"
        )
    )

    def __init__(
            self, session: Session, playlist_id: int, track_id: int,
            row: int) -> None:
        """Link track to playlist at row, add to session and commit."""
        DEBUG(f"PlaylistTracks.__init__({playlist_id=}, {track_id=}, {row=})")

        self.playlist_id = playlist_id
        self.track_id = track_id
        self.row = row
        session.add(self)
        session.commit()

    @staticmethod
    def move_track(
            session: Session, from_playlist_id: int, row: int,
            to_playlist_id: int) -> None:
        """
        Move track between playlists. This would be more efficient with
        an ORM-enabled UPDATE statement, but this works just fine.

        The track is placed after the highest row in the destination
        playlist (row 0 if the destination is empty). If no track is found
        at the source row, an error is logged and nothing is changed.
        """
        DEBUG(
            "PlaylistTracks.move_tracks("
            f"{from_playlist_id=}, {row=}, {to_playlist_id=})"
        )

        new_row: int
        max_row: Optional[int] = session.query(
            func.max(PlaylistTracks.row)).filter(
            PlaylistTracks.playlist_id == to_playlist_id).scalar()
        if max_row is None:
            # Destination playlist is empty; use row 0
            new_row = 0
        else:
            # Destination playlist has tracks; add to end
            new_row = max_row + 1

        try:
            record: PlaylistTracks = session.query(PlaylistTracks).filter(
                PlaylistTracks.playlist_id == from_playlist_id,
                PlaylistTracks.row == row).one()
        except NoResultFound:
            ERROR(
                f"No rows matched in query: "
                f"PlaylistTracks.playlist_id == {from_playlist_id}, "
                f"PlaylistTracks.row == {row}"
            )
            return

        record.playlist_id = to_playlist_id
        record.row = new_row
        session.commit()

    @staticmethod
    def next_free_row(session: Session, playlist: Playlists) -> int:
        """Return next free row number"""

        # scalar() yields the MAX value itself, which is None when the
        # playlist has no rows.
        last_row: Optional[int] = session.query(
            func.max(PlaylistTracks.row)
        ).filter(PlaylistTracks.playlist_id == playlist.id).scalar()

        if last_row is None:
            return 0
        return last_row + 1


class Settings(Base):
    """Named setting with optional datetime, integer and string values."""

    __tablename__ = 'settings'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(32), nullable=False, unique=True)
    f_datetime: datetime = Column(DateTime, default=None, nullable=True)
    f_int: int = Column(Integer, default=None, nullable=True)
    f_string: str = Column(String(128), default=None, nullable=True)

    @classmethod
    def get_int_settings(cls, session: Session, name: str) -> "Settings":
        """Get setting for an integer or return new setting record"""

        int_setting: Settings
        try:
            int_setting = session.query(cls).filter(
                cls.name == name).one()
        except NoResultFound:
            # No such setting yet: create and persist one with f_int unset
            int_setting = Settings()
            int_setting.name = name
            int_setting.f_int = None
            session.add(int_setting)
            session.commit()
        return int_setting

    def update(self, session: Session, data):
        """
        Set each key/value pair in data as an attribute and commit.

        Every key must already be an attribute of this record; this is
        checked with assert, so an unknown key raises AssertionError.
        """
        for key, value in data.items():
            assert hasattr(self, key)
            setattr(self, key, value)
        session.commit()


class Tracks(Base):
    """An audio file known to the database, with its timing metadata."""

    __tablename__ = 'tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    title: str = Column(String(256), index=True)
    artist: str = Column(String(256), index=True)
    duration: int = Column(Integer, index=True)
    start_gap: int = Column(Integer, index=False)
    fade_at: int = Column(Integer, index=False)
    silence_at: int = Column(Integer, index=False)
    path: str = Column(String(2048), index=False, nullable=False)
    mtime: float = Column(Float, index=True)
    lastplayed: datetime = Column(DateTime, index=True, default=None)
    playlists: RelationshipProperty = relationship("PlaylistTracks",
                                                   back_populates="tracks",
                                                   lazy="joined")
    playdates: RelationshipProperty = relationship("Playdates",
                                                   back_populates="tracks",
                                                   lazy="joined")

    def __init__(self, session: Session, path: str) -> None:
        """Create a track for path, add it to session and commit."""
        self.path = path
        session.add(self)
        session.commit()

    def __repr__(self) -> str:
        return (
            f"<Tracks(id={self.id}, title={self.title}, "
            f"artist={self.artist}, path={self.path})>"
        )

    @staticmethod
    def get_all_paths(session) -> List[str]:
        """Return a list of paths of all tracks"""

        return [a[0] for a in session.query(Tracks.path).all()]

    @classmethod
    def get_all_tracks(cls, session: Session) -> List["Tracks"]:
        """Return a list of all tracks"""

        return session.query(cls).all()

    @classmethod
    def get_or_create(cls, session: Session, path: str) -> "Tracks":
        """
        If a track with path exists, return it;
        else created new track and return it
        """

        DEBUG(f"Tracks.get_or_create({path=})")

        try:
            track = session.query(cls).filter(cls.path == path).one()
        except NoResultFound:
            track = Tracks(session, path)

        return track

    @classmethod
    def get_from_filename(cls, session: Session, filename: str) \
            -> Optional["Tracks"]:
        """
        Return track if one and only one track in database has passed
        filename (ie, basename of path). Return None if zero or more
        than one track matches.
        """

        DEBUG(f"Tracks.get_track_from_filename({filename=})")
        # NOTE(review): filename goes into the LIKE pattern unescaped, so
        # any '%' or '_' in it acts as a wildcard - confirm acceptable.
        try:
            track = session.query(Tracks).filter(Tracks.path.ilike(
                f'%{os.path.sep}{filename}')).one()
            return track
        except (NoResultFound, MultipleResultsFound):
            return None

    @classmethod
    def get_from_path(cls, session: Session, path: str) \
            -> Optional["Tracks"]:
        """
        Return track with passed path, or None.
        """

        DEBUG(f"Tracks.get_track_from_path({path=})")

        return session.query(Tracks).filter(Tracks.path == path).first()

    @classmethod
    def get_by_id(cls, session: Session, track_id: int) -> Optional["Tracks"]:
        """Return track or None"""

        try:
            DEBUG(f"Tracks.get_track(track_id={track_id})")
            track = session.query(Tracks).filter(Tracks.id == track_id).one()
            return track
        except NoResultFound:
            ERROR(f"get_track({track_id}): not found")
            return None

    def rescan(self, session: Session) -> None:
        """
        Update audio metadata for passed track.

        Reloads the audio from self.path and refreshes duration, fade_at,
        mtime, silence_at and start_gap, then commits.
        """

        audio: AudioSegment = get_audio_segment(self.path)
        self.duration = len(audio)
        # Divide by 1000, round to Config.MILLISECOND_SIGFIGS places, then
        # scale back up (presumably ms -> s -> ms; verify against helpers).
        self.fade_at = round(fade_point(audio) / 1000,
                             Config.MILLISECOND_SIGFIGS) * 1000
        self.mtime = os.path.getmtime(self.path)
        self.silence_at = round(trailing_silence(audio) / 1000,
                                Config.MILLISECOND_SIGFIGS) * 1000
        self.start_gap = leading_silence(audio)
        session.add(self)
        session.commit()

    @staticmethod
    def remove_by_path(session: Session, path: str) -> None:
        """
        Remove track with passed path from database

        If the delete violates an integrity constraint, the error is
        logged and the session is rolled back; nothing is raised.
        """

        DEBUG(f"Tracks.remove_path({path=})")

        try:
            session.query(Tracks).filter(Tracks.path == path).delete()
            session.commit()
        except IntegrityError as exception:
            # Discard the failed transaction so the session stays usable
            session.rollback()
            ERROR(f"Can't remove track with {path=} ({exception=})")

    @classmethod
    def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
        """Return tracks whose artist contains text, ordered by title"""

        return (
            session.query(cls)
            .filter(cls.artist.ilike(f"%{text}%"))
            .order_by(cls.title)
        ).all()

    @classmethod
    def search_titles(cls, session: Session, text: str) -> List["Tracks"]:
        """Return tracks whose title contains text, ordered by title"""

        return (
            session.query(cls)
            .filter(cls.title.ilike(f"%{text}%"))
            .order_by(cls.title)
        ).all()

    def update_lastplayed(self, session: Session) -> None:
        """Set lastplayed to the current time and commit"""

        self.lastplayed = datetime.now()
        session.add(self)
        session.commit()

    def update_artist(self, session: Session, artist: str) -> None:
        """Set artist and commit"""

        self.artist = artist
        session.add(self)
        session.commit()

    def update_title(self, session: Session, title: str) -> None:
        """Set title and commit"""

        self.title = title
        session.add(self)
        session.commit()

    def update_path(self, newpath: str) -> None:
        """Set path on this object; no session is passed, so no commit"""

        self.path = newpath