#!/usr/bin/python3
#
import os.path
import re
#
from dbconfig import Session
#
from datetime import datetime
from typing import List, Optional
#
# from pydub import AudioSegment
from sqlalchemy.ext.associationproxy import association_proxy
# from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    delete,
    Float,
    ForeignKey,
    func,
    Integer,
    select,
    String,
    UniqueConstraint,
)
# from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import (
    backref,
    declarative_base,
    relationship,
    RelationshipProperty
)
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.exc import (
    # MultipleResultsFound,
    NoResultFound
)
#
from config import Config
from helpers import (
    fade_point,
    get_audio_segment,
    leading_silence,
    trailing_silence,
)
from log import log
#
Base = declarative_base()


# Database classes
class NoteColours(Base):
    """
    Rules that map note text to a colour.

    Each row holds a 'substring' (plain text or, when 'is_regex' is
    set, a regular expression) and the 'colour' returned by
    get_colour() when that rule matches.
    """

    __tablename__ = 'notecolours'

    id = Column(Integer, primary_key=True, autoincrement=True)
    substring = Column(String(256), index=False)
    colour = Column(String(21), index=False)
    enabled = Column(Boolean, default=True, index=True)
    is_regex = Column(Boolean, default=False, index=False)
    is_casesensitive = Column(Boolean, default=False, index=False)
    order = Column(Integer, index=True)

    def __repr__(self) -> str:
        return (
            f"<NoteColours(id={self.id}, substring={self.substring!r}, "
            f"colour={self.colour!r}, enabled={self.enabled})>"
        )

    #
    # def __init__(
    #         self, session: Session, substring: str, colour: str,
    #         enabled: bool = True, is_regex: bool = False,
    #         is_casesensitive: bool = False, order: int = 0) -> None:
    #     self.substring = substring
    #     self.colour = colour
    #     self.enabled = enabled
    #     self.is_regex = is_regex
    #     self.is_casesensitive = is_casesensitive
    #     self.order = order
    #
    #     session.add(self)
    #     session.flush()
    #
    # @classmethod
    # def get_all(cls, session: Session) -> Optional[List["NoteColours"]]:
    #     """Return all records"""
    #
    #     return session.query(cls).all()
    #
    # @classmethod
    # def get_by_id(cls, session: Session, note_id: int) -> \
    #         Optional["NoteColours"]:
    #     """Return record identified by id, or None if not found"""
    #
    #     return session.query(NoteColours).filter(
    #         NoteColours.id == note_id).first()

    @staticmethod
    def get_colour(session: Session, text: str) -> Optional[str]:
        """
        Parse text and return colour string if matched, else None

        Enabled rules are tried in ascending 'order'; the colour of the
        first matching rule is returned. Regex rules are applied with
        re.match(), i.e. anchored at the start of 'text'; plain rules
        match anywhere in 'text'. Rules with no substring, or with a
        regex that does not compile, are skipped rather than aborting
        the whole lookup.
        """

        if not text:
            return None

        # Lower-case once rather than once per case-insensitive rule
        lowered_text = text.lower()

        rules = session.execute(
            select(NoteColours)
            .filter(NoteColours.enabled.is_(True))
            .order_by(NoteColours.order)
        ).scalars().all()

        for rec in rules:
            # The column is nullable; a NULL pattern can never match
            if rec.substring is None:
                continue

            if rec.is_regex:
                flags = re.UNICODE
                if not rec.is_casesensitive:
                    flags |= re.IGNORECASE
                try:
                    matched = re.compile(rec.substring, flags).match(text)
                except re.error as exc:
                    # One bad pattern must not disable the other rules
                    log.error(
                        f"NoteColours id={rec.id}: invalid regex "
                        f"{rec.substring!r} ({exc})"
                    )
                    continue
            elif rec.is_casesensitive:
                matched = rec.substring in text
            else:
                matched = rec.substring.lower() in lowered_text

            if matched:
                return rec.colour

        return None


# class Notes(Base):
#     __tablename__ = 'notes'
#
#     id: int = Column(Integer, primary_key=True, autoincrement=True)
#     playlist_id: int = Column(Integer, ForeignKey('playlists.id'))
#     playlist: RelationshipProperty = relationship(
#         "Playlists", back_populates="notes", lazy="joined")
#     row: int = Column(Integer, nullable=False)
#     note: str = Column(String(256), index=False)
#
#     def __init__(self, session: Session, playlist_id: int,
#                  row: int, text: str) -> None:
#         """Create note"""
#
#         log.debug(f"Notes.__init__({playlist_id=}, {row=}, {text=})")
#         self.playlist_id = playlist_id
#         self.row = row
#         self.note = text
#         session.add(self)
#         session.flush()
#
#     def __repr__(self) -> str:
#         return (
#             f""
#         )
#
#     def delete_note(self, session: Session) -> None:
#         """Delete note"""
#
#         log.debug(f"delete_note({self.id=}")
#
#         session.query(Notes).filter_by(id=self.id).delete()
#         session.flush()
#
#     @staticmethod
#     def max_used_row(session: Session, playlist_id: int) -> Optional[int]:
#         """
#         Return maximum notes row for passed playlist ID or None if not notes
#         """
#
#         last_row = session.query(func.max(Notes.row)).filter_by(
#             playlist_id=playlist_id).first()
#         # if there are no rows, the above returns (None, ) which is True
#         if
#         last_row and last_row[0] is not None:
#             return last_row[0]
#         else:
#             return None
#
#     def move_row(self, session: Session, row: int, to_playlist_id: int) \
#             -> None:
#         """
#         Move note to another playlist
#         """
#
#         self.row = row
#         self.playlist_id = to_playlist_id
#         session.commit()
#
#     @classmethod
#     def get_by_id(cls, session: Session, note_id: int) -> Optional["Notes"]:
#         """Return note or None"""
#
#         try:
#             log.debug(f"Notes.get_track(track_id={note_id})")
#             note = session.query(cls).filter(cls.id == note_id).one()
#             return note
#         except NoResultFound:
#             log.error(f"get_track({note_id}): not found")
#             return None
#
#     def update(
#             self, session: Session, row: int,
#             text: Optional[str] = None) -> None:
#         """
#         Update note details. If text=None, don't change text.
#         """
#
#         log.debug(f"Notes.update_note({self.id=}, {row=}, {text=})")
#
#         self.row = row
#         if text:
#             self.note = text
#         session.flush()


class Playdates(Base):
    """
    Record of when a track was played (one row per play).
    """

    __tablename__ = 'playdates'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    lastplayed = Column(DateTime, index=True, default=None)
    track_id = Column(Integer, ForeignKey('tracks.id'))
    track = relationship("Tracks", back_populates="playdates")

    def __repr__(self) -> str:
        return (
            f"<Playdates(id={self.id}, track_id={self.track_id}, "
            f"lastplayed={self.lastplayed})>"
        )

    def __init__(self, session: Session, track_id: int) -> None:
        """
        Record that track was played

        The new row is stamped with the current local time, added to
        'session' and committed immediately.
        """

        self.lastplayed = datetime.now()
        self.track_id = track_id
        session.add(self)
        session.commit()

    @staticmethod
    def last_played(session: Session, track_id: int) -> Optional[datetime]:
        """Return datetime track last played or None"""

        # Newest first, one row only; first() gives None when the
        # track has no playdates
        return session.execute(
            select(Playdates.lastplayed)
            .where(Playdates.track_id == track_id)
            .order_by(Playdates.lastplayed.desc())
            .limit(1)
        ).scalars().first()

    @staticmethod
    def played_after(session: Session, since: datetime) -> List["Playdates"]:
        """
        Return a list of Playdates objects since passed time

        'since' is inclusive; results are ordered oldest first.
        """

        return (
            session.execute(
                select(Playdates)
                .where(Playdates.lastplayed >= since)
                .order_by(Playdates.lastplayed)
            )
            .scalars()
            .all()
        )

    #
    # @staticmethod
    # def remove_track(session: Session, track_id: int) -> None:
    #     """
    #     Remove all records of track_id
    #     """
    #
    #     session.query(Playdates).filter(
    #         Playdates.track_id == track_id).delete()
    #     session.flush()


class Playlists(Base):
    """
    Manage playlists
    """

    __tablename__ = "playlists"

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(32), nullable=False, unique=True)
    last_used = Column(DateTime, default=None, nullable=True)
    loaded: bool = Column(Boolean, default=True, nullable=False)
    # Rows are kept sorted by row number; removing a row from this
    # collection deletes it (delete-orphan)
    rows = relationship(
        "PlaylistRows",
        back_populates="playlist",
        cascade="all, delete-orphan",
        order_by="PlaylistRows.row_number"
    )

    def __repr__(self) -> str:
        return (
            f"<Playlists(id={self.id}, name={self.name!r}, "
            f"loaded={self.loaded})>"
        )

    #
    # def __init__(self, session: Session, name: str) -> None:
    #     self.name = name
    #     session.add(self)
    #     session.flush()
    #
    # def add_track(
    #         self, session: Session, track_id: int,
    #         row: Optional[int] = None) -> None:
    #     """
    #     Add track to playlist at given row.
    #     If row=None, add to end of playlist
    #     """
    #
    #     if row is None:
    #         row = self.next_free_row(session, self.id)
    #
    #     xPlaylistTracks(session, self.id, track_id, row)
    #

    def close(self, session: Session) -> None:
        """
        Mark playlist as unloaded

        Only the attribute is changed here; 'session' is not flushed
        or committed by this method.
        """

        self.loaded = False

    @classmethod
    def get_all(cls, session: Session) -> List["Playlists"]:
        """
        Returns a list of all playlists ordered by last use

        Loaded playlists sort before unloaded ones; within each group
        the most recently used comes first.
        """

        return (
            session.execute(
                select(cls)
                .order_by(cls.loaded.desc(), cls.last_used.desc())
            )
            .scalars()
            .all()
        )

    #
    # @classmethod
    # def get_closed(cls, session: Session) -> List["Playlists"]:
    #     """Returns a list of all closed playlists ordered by last use"""
    #
    #     return (
    #         session.query(cls)
    #         .filter(cls.loaded.is_(False))
    #         .order_by(cls.last_used.desc())
    #     ).all()

    @classmethod
    def get_open(cls, session: Session) -> List["Playlists"]:
        """
        Return a list of playlists marked "loaded", most recently used
        first.
        """

        return (
            session.execute(
                select(cls)
                .where(cls.loaded.is_(True))
                .order_by(cls.last_used.desc())
            )
            .scalars()
            .all()
        )

    def mark_open(self, session: Session) -> None:
        """
        Mark playlist as loaded and used now

        Only the attributes are changed here; 'session' is not flushed
        or committed by this method.
        """

        self.loaded = True
        self.last_used = datetime.now()
        # session.flush()

    #
    # @staticmethod
    # def next_free_row(session: Session, playlist_id: int) -> int:
    #     """Return next free row for this playlist"""
    #
    #     max_notes_row = Notes.max_used_row(session, playlist_id)
    #     max_tracks_row = xPlaylistTracks.max_used_row(session, playlist_id)
    #
    #     if max_notes_row is not None and max_tracks_row is not None:
    #         return max(max_notes_row, max_tracks_row) + 1
    #
    #     if max_notes_row is None and max_tracks_row is None:
    #         return 0
    #
    #     if max_notes_row is None:
    #         return max_tracks_row + 1
    #     else:
    #         return max_notes_row + 1
    #
    # def remove_all_tracks(self, session: Session) -> None:
    #     """
    #     Remove all tracks from this playlist
    #     """
    #
    #     self.tracks = {}
    #     session.flush()
    #
    # def remove_track(self, session: Session, row: int) -> None:
    #     log.debug(f"Playlist.remove_track({self.id=}, {row=})")
    #
    #     # Refresh self first (this is necessary when calling remove_track
    #     # multiple times before session.commit())
    #     session.refresh(self)
    #     # Get tracks collection for this playlist
    #     # Tracks are a dictionary of tracks keyed on row
    #     # number. Remove the relevant row.
#         del self.tracks[row]
#         # Save the new tracks collection
#         session.flush()
#
#
# class PlaylistTracks(Base):
#     __tablename__ = 'playlist_tracks'
#
#     id: int = Column(Integer, primary_key=True, autoincrement=True)
#     playlist_id: int = Column(Integer, ForeignKey('playlists.id'),
#                               primary_key=True)
#     track_id: int = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
#     row: int = Column(Integer, nullable=False)
#     tracks: RelationshipProperty = relationship("Tracks")
#     playlist: RelationshipProperty = relationship(
#         Playlists,
#         backref=backref(
#             "playlist_tracks",
#             collection_class=attribute_mapped_collection("row"),
#             lazy="joined",
#             cascade="all, delete-orphan"
#         )
#     )


class PlaylistRows(Base):
    """
    One row of a playlist: a note, optionally linked to a track
    ('track_id' is nullable).
    """

    __tablename__ = 'playlist_rows'

    id = Column(Integer, primary_key=True, autoincrement=True)
    row_number = Column(Integer, nullable=False)
    note = Column(String(2048), index=False)
    playlist_id = Column(Integer, ForeignKey('playlists.id'), nullable=False)
    playlist = relationship(Playlists, back_populates="rows")
    track_id = Column(Integer, ForeignKey('tracks.id'), nullable=True)
    track = relationship("Tracks", back_populates="playlistrows")
    played = Column(Boolean, nullable=False, index=False, default=False)

    def __repr__(self) -> str:
        return (
            f"<PlaylistRows(id={self.id}, playlist_id={self.playlist_id}, "
            f"track_id={self.track_id}, row_number={self.row_number}, "
            f"note={self.note!r})>"
        )

    # def __init__(
    #         self, session: Session, playlist_id: int, track_id: int,
    #         row: int) -> None:
    #     log.debug(f"xPlaylistTracks.__init__({playlist_id=}, {track_id=}, {row=})")
    #
    #     self.playlist_id = playlist_id
    #     self.track_id = track_id
    #     self.row = row
    #     session.add(self)
    #     session.flush()
    #

    @staticmethod
    def delete_higher_rows(session: Session, playlist_id: int, row: int) \
            -> None:
        """
        Delete rows in given playlist that have a higher row number
        than 'row'

        NOTE: the delete itself is currently disabled; the rows that
        would be removed are only logged at debug level.
        """

        # Log the rows to be deleted
        rows_to_go = session.execute(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id,
                   PlaylistRows.row_number > row)
        ).scalars().all()

        # The loop variable must not be called 'row': that would
        # rebind the 'row' parameter to a PlaylistRows object
        for doomed in rows_to_go:
            log.debug(f"Should delete: {doomed}")
            # If needed later:
            # session.delete(doomed)

    @staticmethod
    def delete_rows(session: Session, ids: List[int]) -> None:
        """
        Delete passed ids
        """

        session.execute(
            delete(PlaylistRows)
            .where(PlaylistRows.id.in_(ids))
        )
        # Delete won't take effect until commit()
        session.commit()

    @staticmethod
    def fixup_rownumbers(session: Session, playlist_id: int) -> None:
        """
        Ensure the row numbers for passed playlist have no gaps

        Rows keep their current relative order and are renumbered
        0, 1, 2, ...
        """

        plrs = session.execute(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id)
            .order_by(PlaylistRows.row_number)
        ).scalars().all()

        for new_number, plr in enumerate(plrs):
            plr.row_number = new_number

        # Ensure new row numbers are available to the caller
        session.commit()

    @staticmethod
    def get_played_rows(session: Session, playlist_id: int) -> List[int]:
        """
        For passed playlist, return a list of row numbers that
        have been played.
        """

        return session.execute(
            select(PlaylistRows.row_number)
            .where(
                PlaylistRows.playlist_id == playlist_id,
                PlaylistRows.played.is_(True)
            )
            .order_by(PlaylistRows.row_number)
        ).scalars().all()

    @staticmethod
    def get_rows_with_tracks(session: Session, playlist_id: int) -> List[int]:
        """
        For passed playlist, return a list of all row numbers that
        contain tracks
        """

        return session.execute(
            select(PlaylistRows.row_number)
            .where(
                PlaylistRows.playlist_id == playlist_id,
                PlaylistRows.track_id.is_not(None)
            )
            .order_by(PlaylistRows.row_number)
        ).scalars().all()

    @staticmethod
    def move_to_playlist(session: Session,
                         playlistrow_ids: List[int],
                         destination_playlist_id: int) -> None:
        """
        Move the list of playlistrow_ids to the end of destination_playlist

        Rows are appended in the order given and the session is
        committed afterwards.
        """

        # Find last row of destination playlist
        last_row = session.execute(
            select(func.max(PlaylistRows.row_number))
            .where(PlaylistRows.playlist_id == destination_playlist_id)
        ).scalar_one()

        # Row numbers are 0-based (see fixup_rownumbers), so an empty
        # destination starts at 0, not 1
        next_row = 0 if last_row is None else last_row + 1

        # Update the PlaylistRows entries
        for plr_id in playlistrow_ids:
            plr = session.get(PlaylistRows, plr_id)
            plr.row_number = next_row
            plr.playlist_id = destination_playlist_id
            next_row += 1

        session.commit()

    # @classmethod
    # def get_playlist_rows(cls, playlist_id: int) -> \
    #         Optional[List["PlaylistRows"]]:
    #     """
    #     Return a list of PlaylistRows for passed playlist ordered by row
    #     """
    #
    #     return session.execute(
    #         select(cls)
    #         .where(cls.playlist_id == playlist_id)
    #         .order_by(cls.row_number)
    #     ).scalars().all()
    #
    # @staticmethod
    # def max_used_row(session: Session, playlist_id: int) -> Optional[int]:
    #     """
    #     Return highest track row number used or None if there are no
    #     tracks
    #     """
    #
    #     last_row = session.query(
    #         func.max(xPlaylistTracks.row)
    #     ).filter_by(playlist_id=playlist_id).first()
    #     # if there are no rows, the above returns (None, ) which is True
    #     if last_row and last_row[0] is not None:
    #         return last_row[0]
    #     else:
    #         return None
    #
    # @staticmethod
    # def move_row(session: Session, from_row: int, from_playlist_id: int,
    #              to_row: int, to_playlist_id: int) -> None:
    #     """Move row to another playlist"""
    #
    #     session.query(xPlaylistTracks).filter(
    #         xPlaylistTracks.playlist_id == from_playlist_id,
    #         xPlaylistTracks.row == from_row).update(
    #         {'playlist_id': to_playlist_id, 'row': to_row}, False)


class Settings(Base):
    """Manage settings"""

    __tablename__ = 'settings'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(64), nullable=False, unique=True)
    f_datetime = Column(DateTime, default=None, nullable=True)
    f_int: int = Column(Integer, default=None, nullable=True)
    f_string = Column(String(128), default=None, nullable=True)

    def __repr__(self) -> str:
        # Show the first value column that is set; test against None
        # so that an integer setting of 0 is still reported
        candidates = (self.f_datetime, self.f_int, self.f_string)
        value = next((v for v in candidates if v is not None), None)
        return f"<Settings(id={self.id}, name={self.name!r}, value={value!r})>"

    @classmethod
    def get_int_settings(cls, session: Session, name: str) -> "Settings":
        """
        Get setting for an integer or return new setting record

        A newly created record has f_int=None and is added to
        'session'; it is not flushed or committed here.
        """

        int_setting: Settings

        try:
            int_setting = session.execute(
                select(cls)
                .where(cls.name == name)
            ).scalar_one()
        except NoResultFound:
            int_setting = Settings()
            int_setting.name = name
            int_setting.f_int = None
            session.add(int_setting)

        return int_setting

    def update(self, session: Session, data: dict) -> None:
        """
        Set attributes on this setting from 'data' and flush the session

        'data' maps attribute names to new values. Every key must name
        an existing attribute of this object.
        """

        for key, value in data.items():
            assert hasattr(self, key), f"Settings has no attribute {key!r}"
            setattr(self, key, value)
        session.flush()


class Tracks(Base):
    """
    An audio file known to the database, with its audio metadata.
    """

    __tablename__ = 'tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(256), index=True)
    artist = Column(String(256), index=True)
    duration = Column(Integer, index=True)
    start_gap = Column(Integer, index=False)
    fade_at = Column(Integer, index=False)
    silence_at = Column(Integer, index=False)
    path = Column(String(2048), index=False, nullable=False)
    mtime = Column(Float, index=True)
    # lastplayed = Column(DateTime, index=True, default=None)
    playlistrows = relationship("PlaylistRows", back_populates="track")
    # Playlists reached through this track's playlist rows
    playlists = association_proxy("playlistrows", "playlist")
    playdates = relationship("Playdates", back_populates="track")

    def __repr__(self) -> str:
        return (
            f"<Tracks(id={self.id}, title={self.title!r}, "
            f"artist={self.artist!r}, path={self.path!r})>"
        )

    #
    # def __init__(
    #         self,
    #         session: Session,
    #         path: str,
    #         title: Optional[str] = None,
    #         artist: Optional[str] = None,
    #         duration: int = 0,
    #         start_gap: int = 0,
    #         fade_at: Optional[int] = None,
    #         silence_at: Optional[int] = None,
    #         mtime: Optional[float] = None,
    #         lastplayed: Optional[datetime] = None,
    # ) -> None:
    #     self.path = path
    #     self.title = title
    #     self.artist = artist
    #     self.duration = duration
    #     self.start_gap = start_gap
    #     self.fade_at = fade_at
    #     self.silence_at = silence_at
    #     self.mtime = mtime
    #     self.lastplayed = lastplayed
    #
    #     session.add(self)
    #     session.flush()
    #
    # @staticmethod
    # def get_all_paths(session) -> List[str]:
    #     """Return a list of paths of all tracks"""
    #
    #     return [a[0] for a in session.query(Tracks.path).all()]
    #
    # @classmethod
    # def get_all_tracks(cls, session: Session) -> List["Tracks"]:
    #     """Return a list of all tracks"""
    #
    #     return session.query(cls).all()
    #
    # @classmethod
    # def get_or_create(cls, session: Session, path: str) -> "Tracks":
    #     """
    #     If a track with path exists, return it;
    #     else created new track and return it
    #     """
    #
    #     log.debug(f"Tracks.get_or_create({path=})")
    #
    #     try:
    #         track = session.query(cls).filter(cls.path == path).one()
    #     except NoResultFound:
    #         track = Tracks(session, path)
    #
    #     return track
    #
    # @classmethod
    # def get_by_filename(cls, session: Session, filename: str) \
    #         -> Optional["Tracks"]:
    #     """
    #     Return track if one and only one track in database has passed
    #     filename (ie, basename of path). Return None if zero or more
    #     than one track matches.
    #     """
    #
    #     log.debug(f"Tracks.get_track_from_filename({filename=})")
    #     try:
    #         track = session.query(Tracks).filter(Tracks.path.ilike(
    #             f'%{os.path.sep}{filename}')).one()
    #         return track
    #     except (NoResultFound, MultipleResultsFound):
    #         return None
    #
    # @classmethod
    # def get_by_path(cls, session: Session, path: str) -> List["Tracks"]:
    #     """
    #     Return track with passee path, or None.
    #     """
    #
    #     log.debug(f"Tracks.get_track_from_path({path=})")
    #
    #     return session.query(Tracks).filter(Tracks.path == path).first()
    #
    # @classmethod
    # def get_by_id(cls, session: Session, track_id: int) -> Optional["Tracks"]:
    #     """Return track or None"""
    #
    #     try:
    #         log.debug(f"Tracks.get_track(track_id={track_id})")
    #         track = session.query(Tracks).filter(Tracks.id == track_id).one()
    #         return track
    #     except NoResultFound:
    #         log.error(f"get_track({track_id}): not found")
    #         return None

    def rescan(self, session: Session) -> None:
        """
        Update audio metadata for passed track.

        Re-reads the file at self.path and refreshes duration,
        fade_at, silence_at, start_gap and mtime, then adds self to
        'session' and flushes.
        """

        audio: "AudioSegment" = get_audio_segment(self.path)
        self.duration = len(audio)
        # Divide by 1000, round to Config.MILLISECOND_SIGFIGS decimal
        # places, then scale back up (presumably ms -> s -> ms)
        self.fade_at = round(fade_point(audio) / 1000,
                             Config.MILLISECOND_SIGFIGS) * 1000
        self.mtime = os.path.getmtime(self.path)
        self.silence_at = round(trailing_silence(audio) / 1000,
                                Config.MILLISECOND_SIGFIGS) * 1000
        self.start_gap = leading_silence(audio)
        session.add(self)
        session.flush()

    #
    # @staticmethod
    # def remove_by_path(session: Session, path: str) -> None:
    #     """Remove track with passed path from database"""
    #
    #     log.debug(f"Tracks.remove_path({path=})")
    #
    #     try:
    #         session.query(Tracks).filter(Tracks.path == path).delete()
    #         session.flush()
    #     except IntegrityError as exception:
    #         log.error(f"Can't remove track with {path=} ({exception=})")
    #
    # @classmethod
    # def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
    #
    #     return (
    #         session.query(cls)
    #         .filter(cls.artist.ilike(f"%{text}%"))
    #         .order_by(cls.title)
    #     ).all()
    #
    # @classmethod
    # def search_titles(cls, session: Session, text: str) -> List["Tracks"]:
    #     return (
    #         session.query(cls)
    #         .filter(cls.title.ilike(f"%{text}%"))
    #         .order_by(cls.title)
    #     ).all()
    #
    # @staticmethod
    # def update_lastplayed(session: Session, track_id: int) -> None:
    #     """Update the last_played field to current datetime"""
    #
    #     rec = session.query(Tracks).get(track_id)
    #     rec.lastplayed = datetime.now()
    #     session.add(rec)
    #     session.flush()
    #
    # def update_artist(self, session: Session, artist: str) -> None:
    #     self.artist = artist
    #     session.add(self)
    #     session.flush()
    #
    # def update_title(self, session: Session, title: str) -> None:
    #     self.title = title
    #     session.add(self)
    #     session.flush()
    #
    # def update_path(self, session, newpath: str) -> None:
    #     self.path = newpath
    #     session.commit()