#!/usr/bin/python3
"""
SQLAlchemy declarative models: note colours, notes, playdates, playlists,
playlist rows, settings and tracks.

Much of the previous query-style API is retained below as commented-out
code for reference; only the uncommented definitions are live.
"""
#
# import os.path
# import re
#
from dbconfig import Session
#
from datetime import datetime
from typing import List, Optional
#
# from pydub import AudioSegment
# from sqlalchemy.ext.associationproxy import association_proxy
# from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Float,
    ForeignKey,
    # func,
    Integer,
    String,
    # UniqueConstraint,
    select,
)
# from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import (
    backref,
    declarative_base,
    relationship,
    RelationshipProperty
)
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.exc import (
    # MultipleResultsFound,
    NoResultFound
)
#
# from config import Config
# from helpers import (
#     fade_point,
#     get_audio_segment,
#     leading_silence,
#     trailing_silence,
# )
# from log import log.debug, log.error
#
Base = declarative_base()
#
#
# Database classes


class NoteColours(Base):
    """A substring (optionally a regex) and the colour stored with it."""

    __tablename__ = 'notecolours'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    substring: str = Column(String(256), index=False)
    colour: str = Column(String(21), index=False)
    enabled: bool = Column(Boolean, default=True, index=True)
    is_regex: bool = Column(Boolean, default=False, index=False)
    is_casesensitive: bool = Column(Boolean, default=False, index=False)
    order: int = Column(Integer, index=True)
    #
    # def __init__(
    #         self, session: Session, substring: str, colour: str,
    #         enabled: bool = True, is_regex: bool = False,
    #         is_casesensitive: bool = False, order: int = 0) -> None:
    #     self.substring = substring
    #     self.colour = colour
    #     self.enabled = enabled
    #     self.is_regex = is_regex
    #     self.is_casesensitive = is_casesensitive
    #     self.order = order
    #
    #     session.add(self)
    #     session.flush()
    #
    # def __repr__(self) -> str:
    #     return (
    #         f""
    #     )
    #
    # @classmethod
    # def get_all(cls, session: Session) -> Optional[List["NoteColours"]]:
    #     """Return all records"""
    #
    #     return session.query(cls).all()
    #
    # @classmethod
    # def get_by_id(cls, session: Session, note_id: int) -> \
    #         Optional["NoteColours"]:
    #     """Return record identified by id, or None if not found"""
    #
    #     return session.query(NoteColours).filter(
    #         NoteColours.id == note_id).first()
    #
    # @staticmethod
    # def get_colour(session: Session, text: str) -> Optional[str]:
    #     """
    #     Parse text and return colour string if matched, else None
    #     """
    #
    #     for rec in (
    #             session.query(NoteColours)
    #             .filter(NoteColours.enabled.is_(True))
    #             .order_by(NoteColours.order)
    #             .all()
    #     ):
    #         if rec.is_regex:
    #             flags = re.UNICODE
    #             if not rec.is_casesensitive:
    #                 flags |= re.IGNORECASE
    #             p = re.compile(rec.substring, flags)
    #             if p.match(text):
    #                 return rec.colour
    #         else:
    #             if rec.is_casesensitive:
    #                 if rec.substring in text:
    #                     return rec.colour
    #             else:
    #                 if rec.substring.lower() in text.lower():
    #                     return rec.colour
    #
    #     return None


class Notes(Base):
    """A text note stored against a row number of a playlist."""

    __tablename__ = 'notes'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    playlist_id: int = Column(Integer, ForeignKey('playlists.id'))
    # NOTE: this relationship used to declare back_populates="notes", but
    # the matching Playlists.notes relationship is commented out below.
    # A back_populates that names a missing property makes mapper
    # configuration fail, so it is omitted here; restore it on both sides
    # together if Playlists.notes is reinstated.
    playlist: RelationshipProperty = relationship(
        "Playlists", lazy="joined")
    row: int = Column(Integer, nullable=False)
    note: str = Column(String(256), index=False)
    #
    # def __init__(self, session: Session, playlist_id: int,
    #              row: int, text: str) -> None:
    #     """Create note"""
    #
    #     log.debug(f"Notes.__init__({playlist_id=}, {row=}, {text=})")
    #     self.playlist_id = playlist_id
    #     self.row = row
    #     self.note = text
    #     session.add(self)
    #     session.flush()
    #
    # def __repr__(self) -> str:
    #     return (
    #         f""
    #     )
    #
    # def delete_note(self, session: Session) -> None:
    #     """Delete note"""
    #
    #     log.debug(f"delete_note({self.id=}")
    #
    #     session.query(Notes).filter_by(id=self.id).delete()
    #     session.flush()
    #
    # @staticmethod
    # def max_used_row(session: Session, playlist_id: int) -> Optional[int]:
    #     """
    #     Return maximum notes row for passed playlist ID or None if not notes
    #     """
    #
    #     last_row = session.query(func.max(Notes.row)).filter_by(
    #         playlist_id=playlist_id).first()
    #     # if there are no rows, the above returns (None, ) which is True
    #     if last_row and last_row[0] is not None:
    #         return last_row[0]
    #     else:
    #         return None
    #
    # def move_row(self, session: Session, row: int, to_playlist_id: int) \
    #         -> None:
    #     """
    #     Move note to another playlist
    #     """
    #
    #     self.row = row
    #     self.playlist_id = to_playlist_id
    #     session.commit()
    #
    # @classmethod
    # def get_by_id(cls, session: Session, note_id: int) -> Optional["Notes"]:
    #     """Return note or None"""
    #
    #     try:
    #         log.debug(f"Notes.get_track(track_id={note_id})")
    #         note = session.query(cls).filter(cls.id == note_id).one()
    #         return note
    #     except NoResultFound:
    #         log.error(f"get_track({note_id}): not found")
    #         return None
    #
    # def update(
    #         self, session: Session, row: int,
    #         text: Optional[str] = None) -> None:
    #     """
    #     Update note details. If text=None, don't change text.
    #     """
    #
    #     log.debug(f"Notes.update_note({self.id=}, {row=}, {text=})")
    #
    #     self.row = row
    #     if text:
    #         self.note = text
    #     session.flush()


class Playdates(Base):
    """A timestamp (`lastplayed`) recorded against a track."""

    __tablename__ = 'playdates'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    lastplayed: datetime = Column(DateTime, index=True, default=None)
    track_id: int = Column(Integer, ForeignKey('tracks.id'))
    track: RelationshipProperty = relationship(
        "Tracks", back_populates="playdates", lazy="joined")
    #
    # def __init__(self, session: Session, track_id: int) -> None:
    #     """Record that track was played"""
    #
    #     log.debug(f"add_playdate({track_id=})")
    #
    #     self.lastplayed = datetime.now()
    #     self.track_id = track_id
    #     session.add(self)
    #     session.flush()
    #
    # @staticmethod
    # def last_played(session: Session, track_id: int) -> Optional[datetime]:
    #     """Return datetime track last played or None"""
    #
    #     last_played: Optional[Playdates] = session.query(
    #         Playdates.lastplayed).filter(
    #         (Playdates.track_id == track_id)
    #     ).order_by(Playdates.lastplayed.desc()).first()
    #     if last_played:
    #         return last_played[0]
    #     else:
    #         return None
    #
    # @staticmethod
    # def played_after(session: Session, since: datetime) -> List["Playdates"]:
    #     """Return a list of Playdates objects since passed time"""
    #
    #     return session.query(Playdates).filter(
    #         Playdates.lastplayed >= since).all()
    #
    # @staticmethod
    # def remove_track(session: Session, track_id: int) -> None:
    #     """
    #     Remove all records of track_id
    #     """
    #
    #     session.query(Playdates).filter(
    #         Playdates.track_id == track_id).delete()
    #     session.flush()


class Playlists(Base):
    """
    Manage playlists
    """

    __tablename__ = "playlists"

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(32), nullable=False, unique=True)
    last_used: datetime = Column(DateTime, default=None, nullable=True)
    loaded: bool = Column(Boolean, default=True, nullable=False)
    # notes = relationship(
    #     "Notes", order_by="Notes.row",
    #     back_populates="playlist", lazy="select"
    # )
    #
    # tracks = association_proxy('playlist_tracks', 'tracks')
    # row = association_proxy('playlist_tracks', 'row')
    #
    # def __init__(self, session: Session, name: str) -> None:
    #     self.name = name
    #     session.add(self)
    #     session.flush()

    def __repr__(self) -> str:
        """Return a debug representation showing id and name."""

        return f"<Playlists(id={self.id}, name={self.name})>"
    #
    # def add_track(
    #         self, session: Session, track_id: int,
    #         row: Optional[int] = None) -> None:
    #     """
    #     Add track to playlist at given row.
    #     If row=None, add to end of playlist
    #     """
    #
    #     if row is None:
    #         row = self.next_free_row(session, self.id)
    #
    #     xPlaylistTracks(session, self.id, track_id, row)
    #
    # def close(self, session: Session) -> None:
    #     """Record playlist as no longer loaded"""
    #
    #     self.loaded = False
    #     session.add(self)
    #     session.flush()
    #
    # @classmethod
    # def get_all(cls, session: Session) -> List["Playlists"]:
    #     """Returns a list of all playlists ordered by last use"""
    #
    #     return (
    #         session.query(cls).order_by(cls.last_used.desc())
    #     ).all()
    #
    # @classmethod
    # def get_by_id(cls, session: Session, playlist_id: int) -> "Playlists":
    #     return (session.query(cls).filter(cls.id == playlist_id)).one()
    #
    # @classmethod
    # def get_closed(cls, session: Session) -> List["Playlists"]:
    #     """Returns a list of all closed playlists ordered by last use"""
    #
    #     return (
    #         session.query(cls)
    #         .filter(cls.loaded.is_(False))
    #         .order_by(cls.last_used.desc())
    #     ).all()

    @classmethod
    def get_open(cls, session: Session) -> List["Playlists"]:
        """
        Return a list of playlists marked "loaded", most recently used
        first (ordered by last_used, descending).
        """

        stmt = (
            select(cls)
            .where(cls.loaded.is_(True))
            .order_by(cls.last_used.desc())
        )
        # Playlists carries a joined-eager collection (the
        # "playlist_tracks" backref declared on PlaylistRows), so each
        # playlist can appear on several result rows. SQLAlchemy requires
        # unique() on such results and raises InvalidRequestError without
        # it.
        return session.execute(stmt).scalars().unique().all()

    def mark_open(self, session: Session) -> None:
        """Mark playlist as loaded and used now"""

        self.loaded = True
        self.last_used = datetime.now()
        # session.flush()
    #
    # @staticmethod
    # def next_free_row(session: Session, playlist_id: int) -> int:
    #     """Return next free row for this playlist"""
    #
    #     max_notes_row = Notes.max_used_row(session, playlist_id)
    #     max_tracks_row = xPlaylistTracks.max_used_row(session, playlist_id)
    #
    #     if max_notes_row is not None and max_tracks_row is not None:
    #         return max(max_notes_row, max_tracks_row) + 1
    #
    #     if max_notes_row is None and max_tracks_row is None:
    #         return 0
    #
    #     if max_notes_row is None:
    #         return max_tracks_row + 1
    #     else:
    #         return max_notes_row + 1
    #
    # def remove_all_tracks(self, session: Session) -> None:
    #     """
    #     Remove all tracks from this playlist
    #     """
    #
    #     self.tracks = {}
    #     session.flush()
    #
    # def remove_track(self, session: Session, row: int) -> None:
    #     log.debug(f"Playlist.remove_track({self.id=}, {row=})")
    #
    #     # Refresh self first (this is necessary when calling remove_track
    #     # multiple times before session.commit())
    #     session.refresh(self)
    #     # Get tracks collection for this playlist
    #     # Tracks are a dictionary of tracks keyed on row
    #     # number. Remove the relevant row.
    #     del self.tracks[row]
    #     # Save the new tracks collection
    #     session.flush()
#
#
# class PlaylistTracks(Base):
#     __tablename__ = 'playlist_tracks'
#
#     id: int = Column(Integer, primary_key=True, autoincrement=True)
#     playlist_id: int = Column(Integer, ForeignKey('playlists.id'),
#                               primary_key=True)
#     track_id: int = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
#     row: int = Column(Integer, nullable=False)
#     tracks: RelationshipProperty = relationship("Tracks")
#     playlist: RelationshipProperty = relationship(
#         Playlists,
#         backref=backref(
#             "playlist_tracks",
#             collection_class=attribute_mapped_collection("row"),
#             lazy="joined",
#             cascade="all, delete-orphan"
#         )
#     )


class PlaylistRows(Base):
    """A row of a playlist, holding a row number, text and a track id."""

    __tablename__ = 'playlist_rows'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    playlist_id: int = Column(Integer, ForeignKey('playlists.id'),
                              primary_key=True)
    row: int = Column(Integer, nullable=False)
    text: str = Column(String(2048), index=False)
    track_id: int = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
    # Mirror of Tracks.playlists; back_populates is declared on both sides
    # so that changes made through either attribute are reflected on the
    # other.
    tracks: RelationshipProperty = relationship(
        "Tracks", back_populates="playlists")
    # The backref adds Playlists.playlist_tracks: a dict-like collection
    # keyed on each row's `row` value, eagerly loaded by join.
    playlist: RelationshipProperty = relationship(
        Playlists,
        backref=backref(
            "playlist_tracks",
            collection_class=attribute_mapped_collection("row"),
            lazy="joined",
            cascade="all, delete-orphan"
        )
    )
    #
    # # Ensure row numbers are unique within each playlist
    # __table_args__ = (UniqueConstraint
    #                   ('row', 'playlist_id', name="uniquerow"),
    #                   )
    #
    # def __init__(
    #         self, session: Session, playlist_id: int, track_id: int,
    #         row: int) -> None:
    #     log.debug(f"xPlaylistTracks.__init__({playlist_id=}, {track_id=}, {row=})")
    #
    #     self.playlist_id = playlist_id
    #     self.track_id = track_id
    #     self.row = row
    #     session.add(self)
    #     session.flush()
    #
    # @staticmethod
    # def max_used_row(session: Session, playlist_id: int) -> Optional[int]:
    #     """
    #     Return highest track row number used or None if there are no
    #     tracks
    #     """
    #
    #     last_row = session.query(
    #         func.max(xPlaylistTracks.row)
    #     ).filter_by(playlist_id=playlist_id).first()
    #     # if there are no rows, the above returns (None, ) which is True
    #     if last_row and last_row[0] is not None:
    #         return last_row[0]
    #     else:
    #         return None
    #
    # @staticmethod
    # def move_row(session: Session, from_row: int, from_playlist_id: int,
    #              to_row: int, to_playlist_id: int) -> None:
    #     """Move row to another playlist"""
    #
    #     session.query(xPlaylistTracks).filter(
    #         xPlaylistTracks.playlist_id == from_playlist_id,
    #         xPlaylistTracks.row == from_row).update(
    #         {'playlist_id': to_playlist_id, 'row': to_row}, False)


class Settings(Base):
    """Manage settings"""

    __tablename__ = 'settings'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(64), nullable=False, unique=True)
    f_datetime: datetime = Column(DateTime, default=None, nullable=True)
    f_int: int = Column(Integer, default=None, nullable=True)
    f_string: str = Column(String(128), default=None, nullable=True)

    def __repr__(self) -> str:
        """
        Return a debug representation showing id, name and the first of
        f_datetime / f_int / f_string that is not None.
        """

        # Test against None rather than truthiness so that a stored 0 or
        # empty string is still reported.
        value = next(
            (
                field
                for field in (self.f_datetime, self.f_int, self.f_string)
                if field is not None
            ),
            None,
        )
        return f"<Settings(id={self.id}, name={self.name}, {value=})>"

    @classmethod
    def get_int_settings(cls, session: Session, name: str) -> "Settings":
        """
        Get setting for an integer or return new setting record.

        If no row with the passed name exists, a new record with that name
        and f_int=None is added to the session (not flushed) and returned.
        """

        try:
            return session.execute(
                select(cls)
                .where(cls.name == name)
            ).scalar_one()
        except NoResultFound:
            int_setting = cls()
            int_setting.name = name
            int_setting.f_int = None
            session.add(int_setting)
            return int_setting

    def update(self, session: Session, data: dict) -> None:
        """
        Set each key/value pair in data as an attribute on this record,
        then flush the session.

        Every key must already be an attribute of the record.
        """

        for key, value in data.items():
            assert hasattr(self, key), f"Settings has no attribute {key!r}"
            setattr(self, key, value)
        session.flush()


class Tracks(Base):
    """A track: path, title, artist and timing columns."""

    __tablename__ = 'tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    title: str = Column(String(256), index=True)
    artist: str = Column(String(256), index=True)
    duration: int = Column(Integer, index=True)
    start_gap: int = Column(Integer, index=False)
    fade_at: int = Column(Integer, index=False)
    silence_at: int = Column(Integer, index=False)
    path: str = Column(String(2048), index=False, nullable=False)
    mtime: float = Column(Float, index=True)
    lastplayed: datetime = Column(DateTime, index=True, default=None)
    playlists: RelationshipProperty = relationship("PlaylistRows",
                                                   back_populates="tracks",
                                                   lazy="joined")
    playdates: RelationshipProperty = relationship("Playdates",
                                                   back_populates="track",
                                                   lazy="joined")
    #
    # def __init__(
    #         self,
    #         session: Session,
    #         path: str,
    #         title: Optional[str] = None,
    #         artist: Optional[str] = None,
    #         duration: int = 0,
    #         start_gap: int = 0,
    #         fade_at: Optional[int] = None,
    #         silence_at: Optional[int] = None,
    #         mtime: Optional[float] = None,
    #         lastplayed: Optional[datetime] = None,
    # ) -> None:
    #     self.path = path
    #     self.title = title
    #     self.artist = artist
    #     self.duration = duration
    #     self.start_gap = start_gap
    #     self.fade_at = fade_at
    #     self.silence_at = silence_at
    #     self.mtime = mtime
    #     self.lastplayed = lastplayed
    #
    #     session.add(self)
    #     session.flush()
    #
    # def __repr__(self) -> str:
    #     return (
    #         f""
    #     )
    #
    # @staticmethod
    # def get_all_paths(session) -> List[str]:
    #     """Return a list of paths of all tracks"""
    #
    #     return [a[0] for a in session.query(Tracks.path).all()]
    #
    # @classmethod
    # def get_all_tracks(cls, session: Session) -> List["Tracks"]:
    #     """Return a list of all tracks"""
    #
    #     return session.query(cls).all()
    #
    # @classmethod
    # def get_or_create(cls, session: Session, path: str) -> "Tracks":
    #     """
    #     If a track with path exists, return it;
    #     else created new track and return it
    #     """
    #
    #     log.debug(f"Tracks.get_or_create({path=})")
    #
    #     try:
    #         track = session.query(cls).filter(cls.path == path).one()
    #     except NoResultFound:
    #         track = Tracks(session, path)
    #
    #     return track
    #
    # @classmethod
    # def get_by_filename(cls, session: Session, filename: str) \
    #         -> Optional["Tracks"]:
    #     """
    #     Return track if one and only one track in database has passed
    #     filename (ie, basename of path). Return None if zero or more
    #     than one track matches.
    #     """
    #
    #     log.debug(f"Tracks.get_track_from_filename({filename=})")
    #     try:
    #         track = session.query(Tracks).filter(Tracks.path.ilike(
    #             f'%{os.path.sep}{filename}')).one()
    #         return track
    #     except (NoResultFound, MultipleResultsFound):
    #         return None
    #
    # @classmethod
    # def get_by_path(cls, session: Session, path: str) -> List["Tracks"]:
    #     """
    #     Return track with passee path, or None.
    #     """
    #
    #     log.debug(f"Tracks.get_track_from_path({path=})")
    #
    #     return session.query(Tracks).filter(Tracks.path == path).first()
    #
    # @classmethod
    # def get_by_id(cls, session: Session, track_id: int) -> Optional["Tracks"]:
    #     """Return track or None"""
    #
    #     try:
    #         log.debug(f"Tracks.get_track(track_id={track_id})")
    #         track = session.query(Tracks).filter(Tracks.id == track_id).one()
    #         return track
    #     except NoResultFound:
    #         log.error(f"get_track({track_id}): not found")
    #         return None
    #
    # def rescan(self, session: Session) -> None:
    #     """
    #     Update audio metadata for passed track.
    #     """
    #
    #     audio: AudioSegment = get_audio_segment(self.path)
    #     self.duration = len(audio)
    #     self.fade_at = round(fade_point(audio) / 1000,
    #                          Config.MILLISECOND_SIGFIGS) * 1000
    #     self.mtime = os.path.getmtime(self.path)
    #     self.silence_at = round(trailing_silence(audio) / 1000,
    #                             Config.MILLISECOND_SIGFIGS) * 1000
    #     self.start_gap = leading_silence(audio)
    #     session.add(self)
    #     session.flush()
    #
    # @staticmethod
    # def remove_by_path(session: Session, path: str) -> None:
    #     """Remove track with passed path from database"""
    #
    #     log.debug(f"Tracks.remove_path({path=})")
    #
    #     try:
    #         session.query(Tracks).filter(Tracks.path == path).delete()
    #         session.flush()
    #     except IntegrityError as exception:
    #         log.error(f"Can't remove track with {path=} ({exception=})")
    #
    # @classmethod
    # def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
    #
    #     return (
    #         session.query(cls)
    #         .filter(cls.artist.ilike(f"%{text}%"))
    #         .order_by(cls.title)
    #     ).all()
    #
    # @classmethod
    # def search_titles(cls, session: Session, text: str) -> List["Tracks"]:
    #     return (
    #         session.query(cls)
    #         .filter(cls.title.ilike(f"%{text}%"))
    #         .order_by(cls.title)
    #     ).all()
    #
    # @staticmethod
    # def update_lastplayed(session: Session, track_id: int) -> None:
    #     """Update the last_played field to current datetime"""
    #
    #     rec = session.query(Tracks).get(track_id)
    #     rec.lastplayed = datetime.now()
    #     session.add(rec)
    #     session.flush()
    #
    # def update_artist(self, session: Session, artist: str) -> None:
    #     self.artist = artist
    #     session.add(self)
    #     session.flush()
    #
    # def update_title(self, session: Session, title: str) -> None:
    #     self.title = title
    #     session.add(self)
    #     session.flush()
    #
    # def update_path(self, session, newpath: str) -> None:
    #     self.path = newpath
    #     session.commit()