#!/usr/bin/python3
"""
SQLAlchemy declarative models for note colours, notes, playdates,
playlists, playlist/track links, settings and tracks.

Methods that change data take the session to use as their first
argument after self/cls. They flush the session but do not commit it
(the one exception, Tracks.update_path, is flagged in that method).
"""

import os.path
import re
from datetime import datetime
from typing import Any, Dict, List, Optional

from pydub import AudioSegment
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Float,
    ForeignKey,
    func,
    Integer,
    String,
    UniqueConstraint,
)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import (
    backref,
    relationship,
    RelationshipProperty
)
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound

from dbconfig import Session
from config import Config
from helpers import (
    fade_point,
    get_audio_segment,
    leading_silence,
    trailing_silence,
)
from log import DEBUG, ERROR

Base: DeclarativeMeta = declarative_base()


# Database classes
class NoteColours(Base):
    """Rule mapping a substring or regular expression to a colour."""

    __tablename__ = 'notecolours'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    substring: str = Column(String(256), index=False)
    colour: str = Column(String(21), index=False)
    enabled: bool = Column(Boolean, default=True, index=True)
    is_regex: bool = Column(Boolean, default=False, index=False)
    is_casesensitive: bool = Column(Boolean, default=False, index=False)
    order: int = Column(Integer, index=True)

    def __init__(
            self, session: Session, substring: str, colour: str,
            enabled: bool = True, is_regex: bool = False,
            is_casesensitive: bool = False, order: int = 0) -> None:
        """Create a colour rule, add it to session and flush"""

        self.substring = substring
        self.colour = colour
        self.enabled = enabled
        self.is_regex = is_regex
        self.is_casesensitive = is_casesensitive
        self.order = order

        session.add(self)
        session.flush()

    def __repr__(self) -> str:
        return (
            f"<NoteColours(id={self.id}, substring={self.substring!r}, "
            f"colour={self.colour!r})>"
        )

    @classmethod
    def get_all(cls, session: Session) -> List["NoteColours"]:
        """Return all records (an empty list if there are none)"""

        return session.query(cls).all()

    @classmethod
    def get_by_id(cls, session: Session, note_id: int) -> \
            Optional["NoteColours"]:
        """Return record identified by id, or None if not found"""

        return session.query(NoteColours).filter(
            NoteColours.id == note_id).first()

    @staticmethod
    def get_colour(session: Session, text: str) -> Optional[str]:
        """
        Parse text and return colour string if matched, else None

        Enabled rules are tried in ascending 'order'; the first rule
        that matches wins. Regex rules use re.match, so they are
        anchored at the start of text; plain rules match anywhere.
        """

        for rec in (
                session.query(NoteColours)
                .filter(NoteColours.enabled.is_(True))
                .order_by(NoteColours.order)
                .all()
        ):
            if rec.is_regex:
                flags = re.UNICODE
                if not rec.is_casesensitive:
                    flags |= re.IGNORECASE
                p = re.compile(rec.substring, flags)
                if p.match(text):
                    return rec.colour
            else:
                if rec.is_casesensitive:
                    if rec.substring in text:
                        return rec.colour
                else:
                    if rec.substring.lower() in text.lower():
                        return rec.colour

        return None


class Notes(Base):
    """Text note attached to a row of a playlist."""

    __tablename__ = 'notes'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    playlist_id: int = Column(Integer, ForeignKey('playlists.id'))
    playlist: RelationshipProperty = relationship(
        "Playlists", back_populates="notes", lazy="joined")
    row: int = Column(Integer, nullable=False)
    note: str = Column(String(256), index=False)

    def __init__(self, session: Session, playlist_id: int,
                 row: int, text: str) -> None:
        """Create note"""

        DEBUG(f"Notes.__init__({playlist_id=}, {row=}, {text=})")
        self.playlist_id = playlist_id
        self.row = row
        self.note = text
        session.add(self)
        session.flush()

    def __repr__(self) -> str:
        return (
            f"<Notes(id={self.id}, playlist_id={self.playlist_id}, "
            f"row={self.row}, note={self.note!r})>"
        )

    def delete_note(self, session: Session) -> None:
        """Delete note"""

        DEBUG(f"delete_note({self.id=})")

        session.query(Notes).filter_by(id=self.id).delete()
        session.flush()

    @classmethod
    def get_by_id(cls, session: Session, note_id: int) -> Optional["Notes"]:
        """Return note or None"""

        DEBUG(f"Notes.get_by_id(note_id={note_id})")
        try:
            return session.query(cls).filter(cls.id == note_id).one()
        except NoResultFound:
            ERROR(f"Notes.get_by_id({note_id}): not found")
            return None

    def update(
            self, session: Session, row: int,
            text: Optional[str] = None) -> None:
        """
        Update note details. If text=None, don't change text.

        The row is always updated. An empty string is a real value and
        replaces the stored text; only None leaves it alone.
        """

        DEBUG(f"Notes.update_note({self.id=}, {row=}, {text=})")

        self.row = row
        if text is not None:
            self.note = text
        session.flush()


class Playdates(Base):
    """Record of a track having been played at a given time."""

    __tablename__ = 'playdates'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    lastplayed: datetime = Column(DateTime, index=True, default=None)
    track_id: int = Column(Integer, ForeignKey('tracks.id'))
    track: RelationshipProperty = relationship(
        "Tracks", back_populates="playdates", lazy="joined")

    def __init__(self, session: Session, track_id: int) -> None:
        """Record that track was played"""

        DEBUG(f"add_playdate({track_id=})")

        self.lastplayed = datetime.now()
        self.track_id = track_id
        session.add(self)
        session.flush()

    @staticmethod
    def last_played(session: Session, track_id: int) -> Optional[datetime]:
        """Return datetime track last played or None"""

        # Only the lastplayed column is selected, so the result is a
        # one-element row (or None when the track has no playdates).
        latest = (
            session.query(Playdates.lastplayed)
            .filter(Playdates.track_id == track_id)
            .order_by(Playdates.lastplayed.desc())
            .first()
        )
        if latest:
            return latest[0]
        return None

    @staticmethod
    def played_after(session: Session, since: datetime) -> List["Playdates"]:
        """Return a list of Playdates objects since passed time"""

        return session.query(Playdates).filter(
            Playdates.lastplayed >= since).all()

    @staticmethod
    def remove_track(session: Session, track_id: int) -> None:
        """
        Remove all records of track_id
        """

        session.query(Playdates).filter(
            Playdates.track_id == track_id).delete()
        session.flush()


class Playlists(Base):
    """
    Manage playlists
    """

    __tablename__ = "playlists"

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(32), nullable=False, unique=True)
    last_used: datetime = Column(DateTime, default=None, nullable=True)
    loaded: bool = Column(Boolean, default=True, nullable=False)
    notes = relationship(
        "Notes", order_by="Notes.row",
        back_populates="playlist", lazy="joined"
    )

    # 'playlist_tracks' is the backref declared on PlaylistTracks.playlist,
    # a collection keyed on row number.
    tracks = association_proxy('playlist_tracks', 'tracks')
    row = association_proxy('playlist_tracks', 'row')

    def __init__(self, session: Session, name: str) -> None:
        """Create playlist called name, add it to session and flush"""

        self.name = name
        session.add(self)
        session.flush()

    def __repr__(self) -> str:
        return f"<Playlists(id={self.id}, name={self.name!r})>"

    def add_track(
            self, session: Session, track_id: int,
            row: Optional[int] = None) -> None:
        """
        Add track to playlist at given row.
        If row=None, add to end of playlist
        """

        # Compare against None explicitly: row 0 is a valid row number
        # and must not be mistaken for "no row given".
        if row is None:
            row = PlaylistTracks.next_free_row(session, self.id)

        PlaylistTracks(session, self.id, track_id, row)

    def close(self, session: Session) -> None:
        """Record playlist as no longer loaded"""

        self.loaded = False
        session.add(self)
        session.flush()

    @classmethod
    def get_all(cls, session: Session) -> List["Playlists"]:
        """Returns a list of all playlists ordered by last use"""

        return (
            session.query(cls).order_by(cls.last_used.desc())
        ).all()

    @classmethod
    def get_by_id(cls, session: Session, playlist_id: int) -> "Playlists":
        """
        Return playlist with passed id.

        Uses Query.one(), so raises NoResultFound if there is no such
        playlist.
        """

        return (session.query(cls).filter(cls.id == playlist_id)).one()

    @classmethod
    def get_closed(cls, session: Session) -> List["Playlists"]:
        """Returns a list of all closed playlists ordered by last use"""

        return (
            session.query(cls)
            .filter(cls.loaded.is_(False))
            .order_by(cls.last_used.desc())
        ).all()

    @classmethod
    def get_open(cls, session: Session) -> List["Playlists"]:
        """
        Return a list of playlists marked "loaded", most recently used
        first.
        """

        return (
            session.query(cls)
            .filter(cls.loaded.is_(True))
            .order_by(cls.last_used.desc())
        ).all()

    def mark_open(self, session: Session) -> None:
        """Mark playlist as loaded and used now"""

        self.loaded = True
        self.last_used = datetime.now()
        session.flush()

    def move_track(
            self, session: Session, rows: List[int],
            to_playlist: "Playlists") -> None:
        """Move tracks to another playlist"""

        for row in rows:
            track = self.tracks[row]
            # Appended to the end of the destination playlist
            to_playlist.add_track(session, track.id)
            del self.tracks[row]

        session.flush()

    def remove_all_tracks(self, session: Session) -> None:
        """
        Remove all tracks from this playlist
        """

        self.tracks = {}
        session.flush()

    def remove_track(self, session: Session, row: int) -> None:
        """Remove the track at passed row from this playlist"""

        DEBUG(f"Playlist.remove_track({self.id=}, {row=})")

        # Get tracks collection for this playlist
        tracks_collections = self.tracks
        # Tracks are a dictionary of tracks keyed on row
        # number. Remove the relevant row.
        del tracks_collections[row]
        # Save the new tracks collection
        self.tracks = tracks_collections
        session.flush()


class PlaylistTracks(Base):
    """Association of a track with a row of a playlist."""

    __tablename__ = 'playlist_tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    playlist_id: int = Column(Integer, ForeignKey('playlists.id'),
                              primary_key=True)
    track_id: int = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
    row: int = Column(Integer, nullable=False)
    tracks: RelationshipProperty = relationship("Tracks")
    playlist: RelationshipProperty = relationship(
        Playlists,
        backref=backref(
            "playlist_tracks",
            collection_class=attribute_mapped_collection("row"),
            lazy="joined",
            cascade="all, delete-orphan"
        )
    )

    # Ensure row numbers are unique within each playlist
    __table_args__ = (
        UniqueConstraint('row', 'playlist_id', name="uniquerow"),
    )

    def __init__(
            self, session: Session, playlist_id: int, track_id: int,
            row: int) -> None:
        """Link track to playlist at row, add to session and flush"""

        DEBUG(f"PlaylistTracks.__init__({playlist_id=}, {track_id=}, {row=})")

        self.playlist_id = playlist_id
        self.track_id = track_id
        self.row = row
        session.add(self)
        session.flush()

    @staticmethod
    def next_free_row(session: Session, playlist_id: int) -> int:
        """Return next free row number"""

        row: int

        last_row = session.query(
            func.max(PlaylistTracks.row)
        ).filter_by(playlist_id=playlist_id).first()
        # if there are no rows, the above returns (None, ) which is True
        if last_row and last_row[0] is not None:
            row = last_row[0] + 1
        else:
            row = 0

        return row

    @staticmethod
    def move_rows(
            session: Session, rows: List[int], from_playlist_id: int,
            to_playlist_id: int) -> None:
        """
        Move rows between playlists

        Does nothing if rows is empty.
        """

        # Nothing to move; also avoids min() raising on an empty list
        if not rows:
            return

        # A constraint deliberately blocks duplicate (playlist_id, row)
        # entries in database; however, unallocated rows in the database
        # are fine (ie, we can have rows 1, 4, 6 and no 2, 3, 5).
        # Unallocated rows will be automatically removed when the
        # playlist is saved.

        lowest_source_row: int = min(rows)
        first_destination_free_row = PlaylistTracks.next_free_row(
            session, to_playlist_id)
        # Calculate offset that will put the lowest row number being
        # moved at the first free row in destination playlist
        offset = first_destination_free_row - lowest_source_row

        # Bulk UPDATE; objects already loaded in the session are not
        # synchronised with the new values.
        session.query(PlaylistTracks).filter(
            PlaylistTracks.playlist_id == from_playlist_id,
            PlaylistTracks.row.in_(rows)
        ).update(
            {'playlist_id': to_playlist_id,
             'row': PlaylistTracks.row + offset},
            synchronize_session=False
        )


class Settings(Base):
    """Named setting with optional datetime, integer and string values."""

    __tablename__ = 'settings'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(32), nullable=False, unique=True)
    f_datetime: datetime = Column(DateTime, default=None, nullable=True)
    f_int: int = Column(Integer, default=None, nullable=True)
    f_string: str = Column(String(128), default=None, nullable=True)

    @classmethod
    def get_int_settings(cls, session: Session, name: str) -> "Settings":
        """Get setting for an integer or return new setting record"""

        int_setting: Settings

        try:
            int_setting = session.query(cls).filter(
                cls.name == name).one()
        except NoResultFound:
            # No such setting yet: create it with no integer value
            int_setting = Settings()
            int_setting.name = name
            int_setting.f_int = None
            session.add(int_setting)
            session.flush()
        return int_setting

    def update(self, session: Session, data: Dict[str, Any]) -> None:
        """
        Set each attribute named by a key of data to the matching value,
        then flush.

        Raises AttributeError if a key does not name an existing
        attribute; keys processed before the bad one have already been
        applied to the object.
        """

        for key, value in data.items():
            # Explicit check rather than assert, which is stripped when
            # Python runs with -O.
            if not hasattr(self, key):
                raise AttributeError(f"Settings has no attribute {key!r}")
            setattr(self, key, value)
        session.flush()


class Tracks(Base):
    """Audio track: file path plus metadata and timing information."""

    __tablename__ = 'tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    title: str = Column(String(256), index=True)
    artist: str = Column(String(256), index=True)
    duration: int = Column(Integer, index=True)
    start_gap: int = Column(Integer, index=False)
    fade_at: int = Column(Integer, index=False)
    silence_at: int = Column(Integer, index=False)
    path: str = Column(String(2048), index=False, nullable=False)
    mtime: float = Column(Float, index=True)
    lastplayed: datetime = Column(DateTime, index=True, default=None)
    playlists: RelationshipProperty = relationship("PlaylistTracks",
                                                   back_populates="tracks",
                                                   lazy="joined")
    playdates: RelationshipProperty = relationship("Playdates",
                                                   back_populates="track",
                                                   lazy="joined")

    def __init__(
            self,
            session: Session,
            path: str,
            title: Optional[str] = None,
            artist: Optional[str] = None,
            duration: int = 0,
            start_gap: int = 0,
            fade_at: Optional[int] = None,
            silence_at: Optional[int] = None,
            mtime: Optional[float] = None,
            lastplayed: Optional[datetime] = None,
    ) -> None:
        """Create track for path, add it to session and flush"""

        self.path = path
        self.title = title
        self.artist = artist
        self.duration = duration
        self.start_gap = start_gap
        self.fade_at = fade_at
        self.silence_at = silence_at
        self.mtime = mtime
        self.lastplayed = lastplayed

        session.add(self)
        session.flush()

    def __repr__(self) -> str:
        return (
            f"<Tracks(id={self.id}, title={self.title!r}, "
            f"artist={self.artist!r}, path={self.path!r})>"
        )

    @staticmethod
    def get_all_paths(session) -> List[str]:
        """Return a list of paths of all tracks"""

        return [a[0] for a in session.query(Tracks.path).all()]

    @classmethod
    def get_all_tracks(cls, session: Session) -> List["Tracks"]:
        """Return a list of all tracks"""

        return session.query(cls).all()

    @classmethod
    def get_or_create(cls, session: Session, path: str) -> "Tracks":
        """
        If a track with path exists, return it;
        else create a new track and return it
        """

        DEBUG(f"Tracks.get_or_create({path=})")

        try:
            track = session.query(cls).filter(cls.path == path).one()
        except NoResultFound:
            track = Tracks(session, path)

        return track

    @classmethod
    def get_by_filename(cls, session: Session, filename: str) \
            -> Optional["Tracks"]:
        """
        Return track if one and only one track in database has passed
        filename (ie, basename of path). Return None if zero or more
        than one track matches.

        The comparison is case-insensitive.
        """

        DEBUG(f"Tracks.get_track_from_filename({filename=})")

        # '%' and '_' are wildcards in LIKE patterns, so escape them (and
        # the escape character itself) to match the filename literally.
        escape = "!"
        pattern = filename
        for char in (escape, "%", "_"):
            pattern = pattern.replace(char, escape + char)

        try:
            track = session.query(Tracks).filter(Tracks.path.ilike(
                f'%{os.path.sep}{pattern}', escape=escape)).one()
            return track
        except (NoResultFound, MultipleResultsFound):
            return None

    @classmethod
    def get_by_path(cls, session: Session, path: str) -> Optional["Tracks"]:
        """
        Return track with passed path, or None.
        """

        DEBUG(f"Tracks.get_track_from_path({path=})")

        return session.query(Tracks).filter(Tracks.path == path).first()

    @classmethod
    def get_by_id(cls, session: Session, track_id: int) -> Optional["Tracks"]:
        """Return track or None"""

        try:
            DEBUG(f"Tracks.get_track(track_id={track_id})")
            track = session.query(Tracks).filter(Tracks.id == track_id).one()
            return track
        except NoResultFound:
            ERROR(f"get_track({track_id}): not found")
            return None

    def rescan(self, session: Session) -> None:
        """
        Update audio metadata for passed track.

        Re-reads the file at self.path and refreshes duration, fade_at,
        mtime, silence_at and start_gap.
        """

        audio: AudioSegment = get_audio_segment(self.path)
        self.duration = len(audio)
        # Divide by 1000, round to Config.MILLISECOND_SIGFIGS decimal
        # places and scale back up (presumably milliseconds rounded to a
        # fraction of a second; confirm against helpers).
        self.fade_at = round(fade_point(audio) / 1000,
                             Config.MILLISECOND_SIGFIGS) * 1000
        self.mtime = os.path.getmtime(self.path)
        self.silence_at = round(trailing_silence(audio) / 1000,
                                Config.MILLISECOND_SIGFIGS) * 1000
        self.start_gap = leading_silence(audio)
        session.add(self)
        session.flush()

    @staticmethod
    def remove_by_path(session: Session, path: str) -> None:
        """
        Remove track with passed path from database

        An IntegrityError raised by the delete is logged, not re-raised.
        """

        DEBUG(f"Tracks.remove_path({path=})")

        try:
            session.query(Tracks).filter(Tracks.path == path).delete()
            session.flush()
        except IntegrityError as exception:
            ERROR(f"Can't remove track with {path=} ({exception=})")

    @classmethod
    def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
        """
        Return tracks whose artist contains text (case-insensitive),
        ordered by title
        """

        return (
            session.query(cls)
            .filter(cls.artist.ilike(f"%{text}%"))
            .order_by(cls.title)
        ).all()

    @classmethod
    def search_titles(cls, session: Session, text: str) -> List["Tracks"]:
        """
        Return tracks whose title contains text (case-insensitive),
        ordered by title
        """

        return (
            session.query(cls)
            .filter(cls.title.ilike(f"%{text}%"))
            .order_by(cls.title)
        ).all()

    @staticmethod
    def update_lastplayed(session: Session, track_id: int) -> None:
        """
        Update the lastplayed field to current datetime

        Logs an error and does nothing if there is no such track.
        """

        rec = session.query(Tracks).get(track_id)
        if rec is None:
            # Query.get() returns None for an unknown primary key
            ERROR(f"update_lastplayed({track_id}): not found")
            return

        rec.lastplayed = datetime.now()
        session.add(rec)
        session.flush()

    def update_artist(self, session: Session, artist: str) -> None:
        """Set artist and flush"""

        self.artist = artist
        session.add(self)
        session.flush()

    def update_title(self, session: Session, title: str) -> None:
        """Set title and flush"""

        self.title = title
        session.add(self)
        session.flush()

    def update_path(self, session: Session, newpath: str) -> None:
        """Set path and commit the session"""

        self.path = newpath
        # NOTE(review): this commits, whereas every other method in this
        # module only flushes. Left unchanged in case callers rely on
        # the commit - confirm whether flush() was intended.
        session.commit()