#!/usr/bin/python3 import sqlalchemy from datetime import datetime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import ( Boolean, Column, DateTime, Float, ForeignKey, Integer, String, func ) from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound from sqlalchemy.orm import relationship, sessionmaker from config import Config from log import DEBUG, ERROR # Create session at the global level as per # https://docs.sqlalchemy.org/en/13/orm/session_basics.html # Set up database connection engine = sqlalchemy.create_engine(f"{Config.MYSQL_CONNECT}?charset=utf8", encoding='utf-8', echo=Config.DISPLAY_SQL, pool_pre_ping=True) Base = declarative_base() Base.metadata.create_all(engine) # Create a Session factory Session = sessionmaker(bind=engine) # Database classes class Notes(Base): __tablename__ = 'notes' id = Column(Integer, primary_key=True, autoincrement=True) playlist_id = Column(Integer, ForeignKey('playlists.id')) playlist = relationship("Playlists", back_populates="notes") row = Column(Integer, nullable=False) note = Column(String(256), index=False) def __repr__(self): return ( f"" ) @staticmethod def add_note(session, playlist_id, row, text): DEBUG(f"add_note(playlist_id={playlist_id}, row={row}, text={text})") note = Notes() note.playlist_id = playlist_id note.row = row note.note = text session.add(note) session.commit() return note @staticmethod def delete_note(session, id): DEBUG(f"delete_note(id={id}") session.query(Notes).filter(Notes.id == id).delete() session.commit() # Not currently used 1 June 2021 # @staticmethod # def get_note(session, id): # return session.query(Notes).filter(Notes.id == id).one() @classmethod def update_note(cls, session, id, row, text=None): """ Update note details. If text=None, don't change text. """ DEBUG(f"update_note(id={id}, row={row}, text={text})") note = session.query(cls).filter(cls.id == id).one() note.row = row if text: note.note = text session.commit() class Playdates(Base): __tablename__ = 'playdates' id = Column(Integer, primary_key=True, autoincrement=True) lastplayed = Column(DateTime, index=True, default=None) track_id = Column(Integer, ForeignKey('tracks.id')) tracks = relationship("Tracks", back_populates="playdates") @staticmethod def add_playdate(session, track): DEBUG(f"add_playdate(track={track})") pd = Playdates() pd.lastplayed = datetime.now() pd.track_id = track.id session.add(pd) track.update_lastplayed() session.commit() class Playlists(Base): """ Usage: pl = session.query(Playlists).filter(Playlists.id == 1).one() pl pl.tracks [<__main__.PlaylistTracks at 0x7fcd20181c18>, <__main__.PlaylistTracks at 0x7fcd20181c88>, <__main__.PlaylistTracks at 0x7fcd20181be0>, <__main__.PlaylistTracks at 0x7fcd20181c50>] [a.tracks for a in pl.tracks] [ glue.track_id = tr.id pl.tracks.append(glue) session.commit() [a.tracks for a in pl.tracks] [") def add_track(self, session, track, row=None): """ Add track to playlist at given row. If row=None, add to end of playlist """ if not row: row = PlaylistTracks.new_row(session, self.id) glue = PlaylistTracks(row=row) glue.track_id = track.id self.tracks.append(glue) session.commit() def close(self, session): "Record playlist as no longer loaded" self.loaded = False session.add(self) session.commit() @staticmethod def get_all_closed_playlists(session): "Returns a list of all playlists not currently open" return ( session.query(Playlists) .filter( (Playlists.loaded == False) | # noqa E712 (Playlists.loaded == None) ) .order_by(Playlists.last_used.desc()) ).all() @staticmethod def get_all_playlists(session): "Returns a list of all playlists" return session.query(Playlists).all() @staticmethod def get_last_used(session): """ Return a list of playlists marked "loaded", ordered by loaded date. """ return ( session.query(Playlists) .filter(Playlists.loaded == True) # noqa E712 .order_by(Playlists.last_used.desc()) ).all() def get_notes(self): return [a.note for a in self.notes] def get_tracks(self): return [a.tracks for a in self.tracks] @staticmethod def new(session, name): DEBUG(f"Playlists.new(name={name})") playlist = Playlists() playlist.name = name session.add(playlist) session.commit() return playlist @staticmethod def open(session, plid): "Record playlist as loaded and used now" p = session.query(Playlists).filter(Playlists.id == plid).one() p.loaded = True p.last_used = datetime.now() session.commit() return p class PlaylistTracks(Base): __tablename__ = 'playlisttracks' id = Column(Integer, primary_key=True, autoincrement=True) playlist_id = Column(Integer, ForeignKey('playlists.id'), primary_key=True) track_id = Column(Integer, ForeignKey('tracks.id'), primary_key=True) row = Column(Integer, nullable=False) tracks = relationship("Tracks", back_populates="playlists") playlists = relationship("Playlists", back_populates="tracks") @staticmethod def add_track(session, playlist_id, track_id, row): DEBUG( f"PlaylistTracks.add_track(playlist_id={playlist_id}, " f"track_id={track_id}, row={row})" ) plt = PlaylistTracks() plt.playlist_id = playlist_id, plt.track_id = track_id, plt.row = row session.add(plt) session.commit() @staticmethod def move_track(session, from_playlist_id, row, to_playlist_id): DEBUG( "PlaylistTracks.move_tracks(from_playlist_id=" f"{from_playlist_id}, row={row}, " f"to_playlist_id={to_playlist_id})" ) new_row = ( session.query(func.max(PlaylistTracks.row)).filter( PlaylistTracks.playlist_id == to_playlist_id).scalar() ) + 1 record = session.query(PlaylistTracks).filter( PlaylistTracks.playlist_id == from_playlist_id, PlaylistTracks.row == row ).one() record.playlist_id = to_playlist_id record.row = new_row session.commit() @staticmethod def new_row(session, playlist_id): "Return row number > largest existing row number" last_row = session.query(func.max(PlaylistTracks.row)).one()[0] return last_row + 1 @staticmethod def remove_all_tracks(session, playlist_id): """ Remove all tracks from passed playlist_id """ session.query(PlaylistTracks).filter( PlaylistTracks.playlist_id == playlist_id, ).delete() session.commit() @staticmethod def remove_track(session, playlist_id, row): DEBUG( f"PlaylistTracks.remove_track(playlist_id={playlist_id}, " f"row={row})" ) session.query(PlaylistTracks).filter( PlaylistTracks.playlist_id == playlist_id, PlaylistTracks.row == row ).delete() session.commit() @staticmethod def update_row_track(session, playlist_id, row, track_id): DEBUG( f"PlaylistTracks.update_track_row(playlist_id={playlist_id}, " f"row={row}, track_id={track_id})" ) try: plt = session.query(PlaylistTracks).filter( PlaylistTracks.playlist_id == playlist_id, PlaylistTracks.row == row ).one() except MultipleResultsFound: ERROR( f"Multiple rows matched in query: " f"PlaylistTracks.playlist_id == {playlist_id}, " f"PlaylistTracks.row == {row}" ) return except NoResultFound: ERROR( f"No rows matched in query: " f"PlaylistTracks.playlist_id == {playlist_id}, " f"PlaylistTracks.row == {row}" ) return plt.track_id = track_id session.commit() class Settings(Base): __tablename__ = 'settings' id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(32), nullable=False, unique=True) f_datetime = Column(DateTime, default=None, nullable=True) f_int = Column(Integer, default=None, nullable=True) f_string = Column(String(128), default=None, nullable=True) @classmethod def get_int(cls, session, name): try: int_setting = session.query(cls).filter( cls.name == name).one() except NoResultFound: int_setting = Settings() int_setting.name = name int_setting.f_int = None session.add(int_setting) session.commit() return int_setting def update(self, session, data): for key, value in data.items(): assert hasattr(self, key) setattr(self, key, value) session.commit() class Tracks(Base): __tablename__ = 'tracks' id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(256), index=True) artist = Column(String(256), index=True) duration = Column(Integer, index=True) start_gap = Column(Integer, index=False) fade_at = Column(Integer, index=False) silence_at = Column(Integer, index=False) path = Column(String(2048), index=False, nullable=False) mtime = Column(Float, index=True) lastplayed = Column(DateTime, index=True, default=None) playlists = relationship("PlaylistTracks", back_populates="tracks") playdates = relationship("Playdates", back_populates="tracks") def __repr__(self): return ( f"" ) @classmethod def get_or_create(cls, session, path): DEBUG(f"Tracks.get_or_create(path={path})") try: track = session.query(cls).filter(cls.path == path).one() except NoResultFound: track = Tracks() track.path = path session.add(track) return track @staticmethod def get_duration(session, id): try: return session.query( Tracks.duration).filter(Tracks.id == id).one()[0] except NoResultFound: ERROR(f"Can't find track id {id}") return None @staticmethod def get_all_paths(session): "Return a list of paths of all tracks" return [a[0] for a in session.query(Tracks.path).all()] # Not used as of 1 June 2021 # @classmethod # def get_all_tracks(cls): # "Return a list of all tracks" # return session.query(cls).all() @staticmethod def get_path(session, id): try: return session.query(Tracks.path).filter(Tracks.id == id).one()[0] except NoResultFound: ERROR(f"Can't find track id {id}") return None @staticmethod def get_track(session, id): try: DEBUG(f"Tracks.get_track(track_id={id})") track = session.query(Tracks).filter(Tracks.id == id).one() return track except NoResultFound: ERROR(f"get_track({id}): not found") return None @staticmethod def search_titles(session, text): return ( session.query(Tracks) .filter(Tracks.title.ilike(f"%{text}%")) .order_by(Tracks.title) ).all() @staticmethod def track_from_id(session, id): return session.query(Tracks).filter( Tracks.id == id).one() def update_lastplayed(self): self.lastplayed = datetime.now()