#!/usr/bin/python3
"""
SQLAlchemy declarative models: note colours, play dates, playlists,
playlist rows, settings and tracks.
"""

import os.path
import re
from datetime import datetime
from typing import List, Optional

# from pydub import AudioSegment
from sqlalchemy.ext.associationproxy import association_proxy
# from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    delete,
    Float,
    ForeignKey,
    func,
    Integer,
    select,
    String,
    UniqueConstraint,
)
# from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import (
    backref,
    declarative_base,
    relationship,
    RelationshipProperty
)
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.exc import (
    # MultipleResultsFound,
    NoResultFound
)

from dbconfig import Session
from config import Config
from helpers import (
    fade_point,
    get_audio_segment,
    get_tags,
    leading_silence,
    trailing_silence,
)
from log import log

Base = declarative_base()


# Database classes
class NoteColours(Base):
    """
    Rule mapping note text to a colour.

    A rule matches either as a plain substring or as a regular
    expression (is_regex), optionally case-sensitively. Enabled rules
    are tried in ascending 'order'.
    """

    __tablename__ = 'notecolours'

    id = Column(Integer, primary_key=True, autoincrement=True)
    substring = Column(String(256), index=False)
    colour = Column(String(21), index=False)
    enabled = Column(Boolean, default=True, index=True)
    is_regex = Column(Boolean, default=False, index=False)
    is_casesensitive = Column(Boolean, default=False, index=False)
    order = Column(Integer, index=True)

    def __repr__(self) -> str:
        return (
            f"<NoteColours(id={self.id}, substring={self.substring}, "
            f"colour={self.colour})>"
        )

    # def __init__(
    #         self, session: Session, substring: str, colour: str,
    #         enabled: bool = True, is_regex: bool = False,
    #         is_casesensitive: bool = False, order: int = 0) -> None:
    #     self.substring = substring
    #     self.colour = colour
    #     self.enabled = enabled
    #     self.is_regex = is_regex
    #     self.is_casesensitive = is_casesensitive
    #     self.order = order
    #
    #     session.add(self)
    #     session.flush()
    #
    # @classmethod
    # def get_all(cls, session: Session) ->
    #         Optional[List["NoteColours"]]:
    #     """Return all records"""
    #
    #     return session.query(cls).all()
    #
    # @classmethod
    # def get_by_id(cls, session: Session, note_id: int) -> \
    #         Optional["NoteColours"]:
    #     """Return record identified by id, or None if not found"""
    #
    #     return session.query(NoteColours).filter(
    #         NoteColours.id == note_id).first()

    @staticmethod
    def get_colour(session: Session, text: str) -> Optional[str]:
        """
        Parse text and return colour string if matched, else None.

        Enabled rules are tried in 'order'; the first match wins.
        Regex rules use re.match, so they are anchored at the start of
        text; plain rules match anywhere in text.
        """

        if not text:
            return None

        rules = session.execute(
            select(NoteColours)
            .filter(NoteColours.enabled.is_(True))
            .order_by(NoteColours.order)
        ).scalars().all()

        for rec in rules:
            # 'substring' is nullable; a rule without one cannot match
            if rec.substring is None:
                continue

            if rec.is_regex:
                flags = re.UNICODE
                if not rec.is_casesensitive:
                    flags |= re.IGNORECASE
                if re.compile(rec.substring, flags).match(text):
                    return rec.colour
            elif rec.is_casesensitive:
                if rec.substring in text:
                    return rec.colour
            elif rec.substring.lower() in text.lower():
                return rec.colour

        return None


class Playdates(Base):
    """Record of when a track was played"""

    __tablename__ = 'playdates'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    lastplayed = Column(DateTime, index=True, default=None)
    track_id = Column(Integer, ForeignKey('tracks.id'))
    track = relationship("Tracks", back_populates="playdates")

    def __repr__(self) -> str:
        return (
            f"<Playdates(id={self.id}, track_id={self.track_id}, "
            f"lastplayed={self.lastplayed})>"
        )

    def __init__(self, session: Session, track_id: int) -> None:
        """Record that track was played (now); adds and commits"""

        self.lastplayed = datetime.now()
        self.track_id = track_id
        session.add(self)
        session.commit()

    @staticmethod
    def last_played(session: Session, track_id: int) -> Optional[datetime]:
        """Return datetime track last played or None"""

        return session.execute(
            select(Playdates.lastplayed)
            .where(Playdates.track_id == track_id)
            .order_by(Playdates.lastplayed.desc())
            .limit(1)
        ).scalars().first()

    @staticmethod
    def played_after(session: Session, since: datetime) -> List["Playdates"]:
        """
        Return a list of Playdates objects played at or after the
        passed time, oldest first
        """

        return (
            session.execute(
                select(Playdates)
                .where(Playdates.lastplayed >= since)
                .order_by(Playdates.lastplayed)
            )
            .scalars()
            .all()
        )

    # @staticmethod
    # def remove_track(session: Session, track_id: int) -> None:
    #     """
    #     Remove all records of track_id
    #     """
    #
    #     session.query(Playdates).filter(
    #         Playdates.track_id == track_id).delete()
    #     session.flush()


class Playlists(Base):
    """
    Manage playlists
    """

    __tablename__ = "playlists"

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(32), nullable=False, unique=True)
    last_used = Column(DateTime, default=None, nullable=True)
    loaded: bool = Column(Boolean, default=True, nullable=False)
    rows = relationship(
        "PlaylistRows",
        back_populates="playlist",
        cascade="all, delete-orphan",
        order_by="PlaylistRows.row_number"
    )

    def __repr__(self) -> str:
        return f"<Playlists(id={self.id}, name={self.name})>"

    def __init__(self, session: Session, name: str) -> None:
        """Create playlist with passed name; adds and commits"""

        self.name = name
        session.add(self)
        session.commit()

    # def add_track(
    #         self, session: Session, track_id: int,
    #         row: Optional[int] = None) -> None:
    #     """
    #     Add track to playlist at given row.
    #     If row=None, add to end of playlist
    #     """
    #
    #     if row is None:
    #         row = self.next_free_row(session, self.id)
    #
    #     xPlaylistTracks(session, self.id, track_id, row)

    def close(self, session: Session) -> None:
        """Mark playlist as unloaded (session is not used here)"""

        self.loaded = False

    @classmethod
    def get_all(cls, session: Session) -> List["Playlists"]:
        """
        Returns a list of all playlists, loaded ones first, then most
        recently used first
        """

        return (
            session.execute(
                select(cls)
                .order_by(cls.loaded.desc(), cls.last_used.desc())
            )
            .scalars()
            .all()
        )

    @classmethod
    def get_closed(cls, session: Session) -> List["Playlists"]:
        """
        Returns a list of all closed playlists, most recently used first
        """

        return (
            session.execute(
                select(cls)
                .filter(cls.loaded.is_(False))
                .order_by(cls.last_used.desc())
            )
            .scalars()
            .all()
        )

    @classmethod
    def get_open(cls, session: Session) -> List["Playlists"]:
        """
        Return a list of playlists marked "loaded", most recently used
        first.
        """

        return (
            session.execute(
                select(cls)
                .where(cls.loaded.is_(True))
                .order_by(cls.last_used.desc())
            )
            .scalars()
            .all()
        )

    def mark_open(self, session: Session) -> None:
        """Mark playlist as loaded and used now"""

        self.loaded = True
        self.last_used = datetime.now()

    # def remove_track(self, session: Session, row: int) -> None:
    #     log.debug(f"Playlist.remove_track({self.id=}, {row=})")
    #
    #     # Refresh self first (this is necessary when calling
    #     # remove_track
    #     # multiple times before session.commit())
    #     session.refresh(self)
    #     # Get tracks collection for this playlist
    #     # Tracks are a dictionary of tracks keyed on row
    #     # number. Remove the relevant row.
    #     del self.tracks[row]
    #     # Save the new tracks collection
    #     session.flush()
    #


class PlaylistRows(Base):
    """One row of a playlist: an optional track and an optional note"""

    __tablename__ = 'playlist_rows'

    id = Column(Integer, primary_key=True, autoincrement=True)
    row_number = Column(Integer, nullable=False)
    note = Column(String(2048), index=False)
    playlist_id = Column(Integer, ForeignKey('playlists.id'), nullable=False)
    playlist = relationship(Playlists, back_populates="rows")
    track_id = Column(Integer, ForeignKey('tracks.id'), nullable=True)
    track = relationship("Tracks", back_populates="playlistrows")
    played = Column(Boolean, nullable=False, index=False, default=False)

    def __repr__(self) -> str:
        return (
            f"<PlaylistRows(id={self.id}, playlist_id={self.playlist_id}, "
            f"track_id={self.track_id}, note={self.note}, "
            f"row_number={self.row_number})>"
        )

    def __init__(
            self, session: Session, playlist_id: int, track_id: int,
            row_number: int) -> None:
        """Create PlaylistRows object; adds and flushes (no commit)"""

        self.playlist_id = playlist_id
        self.track_id = track_id
        self.row_number = row_number
        session.add(self)
        session.flush()

    @staticmethod
    def delete_higher_rows(session: Session, playlist_id: int, row: int) \
            -> None:
        """
        Report rows in given playlist that have a higher row number
        than 'row'.

        Deletion itself is currently disabled: the rows are only
        logged at debug level.
        """

        rows_to_go = session.execute(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id,
                   PlaylistRows.row_number > row)
        ).scalars().all()
        if not rows_to_go:
            return

        # Loop variable must not be called 'row': that would shadow the
        # 'row' parameter used in the query above
        for plr in rows_to_go:
            log.debug(f"Should delete: {plr}")
            # If needed later:
            # session.delete(plr)

    @staticmethod
    def delete_rows(session: Session, ids: List[int]) -> None:
        """
        Delete passed ids
        """

        session.execute(
            delete(PlaylistRows)
            .where(PlaylistRows.id.in_(ids))
        )
        # Delete won't take effect until commit()
        session.commit()

    @staticmethod
    def fixup_rownumbers(session: Session, playlist_id: int) -> None:
        """
        Ensure the row numbers for passed playlist have no gaps
        (rows are renumbered from zero in their current order)
        """

        plrs = session.execute(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id)
            .order_by(PlaylistRows.row_number)
        ).scalars().all()

        for i, plr in enumerate(plrs):
            plr.row_number = i

        # Ensure new row numbers are available to the caller
        session.commit()

    @classmethod
    def get_played_rows(cls, session: Session,
                        playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return a list of rows that
        have been played, in row number order.
        """

        return session.execute(
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.played.is_(True)
            )
            .order_by(cls.row_number)
        ).scalars().all()

    @classmethod
    def get_rows_with_tracks(cls, session: Session,
                             playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return a list of rows that
        contain tracks, in row number order
        """

        return session.execute(
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.track_id.is_not(None)
            )
            .order_by(cls.row_number)
        ).scalars().all()

    @staticmethod
    def get_last_used_row(session: Session, playlist_id: int) -> Optional[int]:
        """Return the last used row for playlist, or None if no rows"""

        return session.execute(
            select(func.max(PlaylistRows.row_number))
            .where(PlaylistRows.playlist_id == playlist_id)
        ).scalar_one()

    @classmethod
    def get_unplayed_rows(cls, session: Session,
                          playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return a list of track rows that
        have not been played, in row number order.
        """

        return session.execute(
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.track_id.is_not(None),
                cls.played.is_(False)
            )
            .order_by(cls.row_number)
        ).scalars().all()


class Settings(Base):
    """Manage settings"""

    __tablename__ = 'settings'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(64), nullable=False, unique=True)
    f_datetime = Column(DateTime, default=None, nullable=True)
    f_int: int = Column(Integer, default=None, nullable=True)
    f_string = Column(String(128), default=None, nullable=True)

    def __repr__(self) -> str:
        # Show the first value that is set. Compare against None rather
        # than relying on truthiness so that f_int == 0 is reported.
        value = next(
            (v for v in (self.f_datetime, self.f_int, self.f_string)
             if v is not None),
            None
        )
        return f"<Settings(id={self.id}, name={self.name}, value={value})>"

    @classmethod
    def get_int_settings(cls, session: Session, name: str) -> "Settings":
        """
        Get setting for an integer or return new setting record.

        A new record is added to the session with f_int=None; it is
        not flushed or committed here.
        """

        int_setting: Settings

        try:
            int_setting = session.execute(
                select(cls)
                .where(cls.name == name)
            ).scalar_one()
        except NoResultFound:
            int_setting = Settings()
            int_setting.name = name
            int_setting.f_int = None
            session.add(int_setting)

        return int_setting

    def update(self, session: Session, data: dict) -> None:
        """
        Set attributes from 'data' (attribute name -> new value) and
        flush the session. Every key must be an existing attribute.
        """

        for key, value in data.items():
            assert hasattr(self, key)
            setattr(self, key, value)
        session.flush()


class Tracks(Base):
    """A music track, identified by its unique file path"""

    __tablename__ = 'tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(256), index=True)
    artist = Column(String(256), index=True)
    duration = Column(Integer, index=True)
    start_gap = Column(Integer, index=False)
    fade_at = Column(Integer, index=False)
    silence_at = Column(Integer, index=False)
    path = Column(String(2048), index=False, nullable=False, unique=True)
    mtime = Column(Float, index=True)
    bitrate = Column(Integer, nullable=True, default=None)
    playlistrows = relationship("PlaylistRows", back_populates="track")
    playlists = association_proxy("playlistrows", "playlist")
    playdates = relationship("Playdates", back_populates="track")

    def __repr__(self) -> str:
        return (
            f"<Tracks(id={self.id}, title={self.title}, "
            f"artist={self.artist}, path={self.path})>"
        )

    def __init__(
            self,
            session: Session,
            path: str,
            title: Optional[str] = None,
            artist: Optional[str] = None,
            duration: int = 0,
            start_gap: int = 0,
            fade_at: Optional[int] = None,
            silence_at: Optional[int] = None,
            mtime: Optional[float] = None,
            lastplayed: Optional[datetime] = None,
    ) -> None:
        """Create track from passed values; adds and commits"""

        self.path = path
        self.title = title
        self.artist = artist
        self.duration = duration
        self.start_gap = start_gap
        self.fade_at = fade_at
        self.silence_at = silence_at
        self.mtime = mtime
        # NOTE: no 'lastplayed' column is declared on this class, so
        # this is a plain instance attribute
        self.lastplayed = lastplayed

        session.add(self)
        session.commit()

    @classmethod
    def get_all(cls, session: Session) -> List["Tracks"]:
        """Return a list of all tracks"""

        return session.execute(select(cls)).scalars().all()

    @classmethod
    def get_by_path(cls, session: Session, path: str) -> Optional["Tracks"]:
        """
        Return track with passed path, or None.
        """

        try:
            return (
                session.execute(
                    select(Tracks)
                    .where(Tracks.path == path)
                ).scalar_one()
            )
        except NoResultFound:
            return None

    @classmethod
    def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
        """
        Search case-insensitively for artists containing text;
        results are ordered by title
        """

        return (
            session.execute(
                select(cls)
                .where(cls.artist.ilike(f"%{text}%"))
                .order_by(cls.title)
            )
            .scalars()
            .all()
        )

    @classmethod
    def search_titles(cls, session: Session, text: str) -> List["Tracks"]:
        """
        Search case-insensitively for titles containing text;
        results are ordered by title
        """

        return (
            session.execute(
                select(cls)
                .where(cls.title.ilike(f"%{text}%"))
                .order_by(cls.title)
            )
            .scalars()
            .all()
        )