#!/usr/bin/python3
"""SQLAlchemy models and the module-level session for the music database.

Importing this module creates the engine, a single global ``session`` that
all model helper methods use, and (after the models are declared) any
tables that do not yet exist.
"""

import sqlalchemy

from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import (
    Column,
    DateTime,
    Float,
    ForeignKey,
    Integer,
    String,
    func
)
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import relationship, sessionmaker

from config import Config
from log import DEBUG, ERROR

# Create session at the global level as per
# https://docs.sqlalchemy.org/en/13/orm/session_basics.html

# Set up database connection
engine = sqlalchemy.create_engine(f"{Config.MYSQL_CONNECT}?charset=utf8",
                                  encoding='utf-8',
                                  echo=Config.DISPLAY_SQL,
                                  pool_pre_ping=True)

Base = declarative_base()

# Create a Session
Session = sessionmaker(bind=engine)
session = Session()


# Database classes
class Notes(Base):
    """A text note stored against a row number of a playlist."""

    __tablename__ = 'notes'

    id = Column(Integer, primary_key=True, autoincrement=True)
    playlist_id = Column(Integer, ForeignKey('playlists.id'))
    playlist = relationship("Playlists", back_populates="notes")
    row = Column(Integer, nullable=False)
    note = Column(String(256), index=False)

    def __repr__(self):
        return (
            f"<Note(id={self.id}, playlist_id={self.playlist_id}, "
            f"row={self.row}, note={self.note})>"
        )

    @staticmethod
    def add_note(playlist_id, row, text):
        """Create a note, commit it and return its new id."""

        DEBUG(f"add_note(playlist_id={playlist_id}, row={row}, text={text})")
        note = Notes()
        note.playlist_id = playlist_id
        note.row = row
        note.note = text
        session.add(note)
        session.commit()

        return note.id

    @classmethod
    def update_note(cls, id, row, text):
        """Set the row and text of note ``id`` and commit.

        Raises NoResultFound (from ``Query.one()``) if no such note exists.
        """

        DEBUG(f"update_note(id={id}, row={row}, text={text})")

        note = session.query(cls).filter(cls.id == id).one()
        note.row = row
        note.note = text
        session.commit()


class Playdates(Base):
    """A record of one occasion on which a track was played."""

    __tablename__ = 'playdates'

    id = Column(Integer, primary_key=True, autoincrement=True)
    lastplayed = Column(DateTime, index=True, default=None)
    track_id = Column(Integer, ForeignKey('tracks.id'))
    tracks = relationship("Tracks", back_populates="playdates")

    @staticmethod
    def add_playdate(track):
        """Record that ``track`` was played now.

        Adds a Playdates row stamped with the current local time, calls
        ``track.update_lastplayed()`` and commits both changes together.
        """

        DEBUG(f"add_playdate(track={track})")
        pd = Playdates()
        pd.lastplayed = datetime.now()
        pd.track_id = track.id
        session.add(pd)
        track.update_lastplayed()
        session.commit()


class Playlists(Base):
    """
    A named playlist holding tracks (via PlaylistTracks) and notes.

    Usage:

    pl = session.query(Playlists).filter(Playlists.id == 1).one()

    pl.tracks
        -> list of PlaylistTracks association objects

    [a.tracks for a in pl.tracks]
        -> list of the Tracks objects on the playlist

    To add a track:

    glue = PlaylistTracks(row=row)
    glue.track_id = tr.id
    pl.tracks.append(glue)
    session.commit()
    """

    __tablename__ = 'playlists'

    id = Column(Integer, primary_key=True, autoincrement=True)
    # NOTE(review): this column definition was reconstructed; the String
    # length and constraints must be confirmed against the live schema.
    name = Column(String(32), nullable=False, unique=True)
    notes = relationship("Notes", back_populates="playlist")
    tracks = relationship("PlaylistTracks", back_populates="playlists")

    def __repr__(self):
        return f"<Playlist(id={self.id}, name={self.name})>"

    @staticmethod
    def new(name):
        """Create a playlist called ``name``, commit and return its id."""

        DEBUG(f"Playlists.new(name={name})")
        pl = Playlists()
        pl.name = name
        session.add(pl)
        session.commit()
        return pl.id

    @staticmethod
    def get_all_playlists():
        "Returns a list of all Playlists objects"

        return session.query(Playlists).all()

    @classmethod
    def get_playlist_by_id(cls, plid):
        """Returns a playlist object for playlist id.

        Raises NoResultFound (from ``Query.one()``) if there is none.
        """

        return session.query(Playlists).filter(Playlists.id == plid).one()

    def add_track(self, track, row):
        """Append ``track`` to this playlist at ``row``.

        Does not commit; the change is flushed by the next session commit.
        """

        glue = PlaylistTracks(row=row)
        glue.track_id = track.id
        self.tracks.append(glue)

    def get_notes(self):
        "Return the note text of every note on this playlist"

        return [a.note for a in self.notes]

    def get_tracks(self):
        "Return the Tracks object of every entry on this playlist"

        return [a.tracks for a in self.tracks]


class PlaylistTracks(Base):
    """Association object placing a track at a row of a playlist."""

    __tablename__ = 'playlisttracks'

    id = Column(Integer, primary_key=True, autoincrement=True)
    playlist_id = Column(Integer, ForeignKey('playlists.id'),
                         primary_key=True)
    track_id = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
    row = Column(Integer, nullable=False)
    tracks = relationship("Tracks", back_populates="playlists")
    playlists = relationship("Playlists", back_populates="tracks")

    @staticmethod
    def new_row(playlist_id):
        """Return row number > largest existing row number.

        Returns 0 when the table holds no rows at all (MAX() is then NULL,
        which previously raised TypeError on ``None + 1``).
        """

        # NOTE(review): playlist_id is accepted but not used; the maximum
        # is taken over every playlist. Confirm whether callers expect a
        # per-playlist maximum before narrowing the query.
        last_row = session.query(func.max(PlaylistTracks.row)).one()[0]
        if last_row is None:
            return 0
        return last_row + 1

    @staticmethod
    def update_track_row(playlist_id, track_id, old_row, new_row):
        """Move the entry matching playlist, track and old_row to new_row.

        Commits the change. Raises NoResultFound or MultipleResultsFound
        (from ``Query.one()``) unless exactly one entry matches.
        """

        DEBUG(
            f"update_track_row(playlist_id={playlist_id}, "
            f"track_id={track_id}, old_row={old_row}, new_row={new_row})"
        )

        plt = session.query(PlaylistTracks).filter(
            PlaylistTracks.playlist_id == playlist_id,
            PlaylistTracks.track_id == track_id,
            PlaylistTracks.row == old_row
        ).one()
        plt.row = new_row
        session.commit()

    @staticmethod
    def remove_track(playlist_id, track_id):
        """Delete every entry for ``track_id`` on ``playlist_id``.

        Does not commit; the deletion takes effect at the next session
        commit. (This method used to be defined twice in the class; only
        the later definition was ever in effect and it is the one kept.)
        """

        DEBUG(f"remove_track({playlist_id}, {track_id})")
        session.query(PlaylistTracks).filter(
            PlaylistTracks.playlist_id == playlist_id,
            PlaylistTracks.track_id == track_id
        ).delete()


class Settings(Base):
    """A named setting with optional datetime, int and string values."""

    __tablename__ = 'settings'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(32), nullable=False, unique=True)
    f_datetime = Column(DateTime, default=None, nullable=True)
    f_int = Column(Integer, default=None, nullable=True)
    f_string = Column(String(128), default=None, nullable=True)

    @classmethod
    def get_int(cls, name):
        """Return the Settings object called ``name``, creating it if absent.

        Note that the Settings row itself is returned, not the integer;
        read the value from its ``f_int`` attribute. A newly created row
        has ``f_int`` of None and is committed before being returned.
        """

        try:
            int_setting = session.query(cls).filter(
                cls.name == name).one()
        except NoResultFound:
            int_setting = Settings()
            int_setting.name = name
            int_setting.f_int = None
            session.add(int_setting)
            session.commit()
        return int_setting

    def update(self, data):
        """Set each attribute named in the ``data`` mapping and commit."""

        for key, value in data.items():
            # NOTE: this guard disappears when Python runs with -O.
            assert hasattr(self, key)
            setattr(self, key, value)
        session.commit()


class Tracks(Base):
    """A music file known to the database."""

    __tablename__ = 'tracks'

    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(256), index=True)
    artist = Column(String(256), index=True)
    duration = Column(Integer, index=True)
    start_gap = Column(Integer, index=False)
    fade_at = Column(Integer, index=False)
    silence_at = Column(Integer, index=False)
    path = Column(String(2048), index=False, nullable=False)
    mtime = Column(Float, index=True)
    lastplayed = Column(DateTime, index=True, default=None)
    playlists = relationship("PlaylistTracks", back_populates="tracks")
    playdates = relationship("Playdates", back_populates="tracks")

    def __repr__(self):
        return (
            f"<Track(id={self.id}, title={self.title}, "
            f"artist={self.artist}, path={self.path})>"
        )

    @classmethod
    def get_or_create(cls, path):
        """Return the track stored at ``path``, adding a new one if absent.

        A new track is added to the session but not committed.
        """

        DEBUG(f"Tracks.get_or_create(path={path})")
        try:
            track = session.query(cls).filter(cls.path == path).one()
        except NoResultFound:
            track = Tracks()
            track.path = path
            session.add(track)

        return track

    @staticmethod
    def get_duration(id):
        "Return the duration of track id, or None (logged) if not found"

        try:
            return session.query(
                Tracks.duration).filter(Tracks.id == id).one()[0]
        except NoResultFound:
            ERROR(f"Can't find track id {id}")
            return None

    @staticmethod
    def get_all_paths():
        "Return a list of paths of all tracks"

        return [a[0] for a in session.query(Tracks.path).all()]

    @staticmethod
    def get_path(id):
        "Return the path of track id, or None (logged) if not found"

        try:
            return session.query(Tracks.path).filter(Tracks.id == id).one()[0]
        except NoResultFound:
            ERROR(f"Can't find track id {id}")
            return None

    @staticmethod
    def get_track(id):
        "Return the Tracks object for id, or None (logged) if not found"

        try:
            DEBUG(f"Tracks.get_track(track_id={id})")
            track = session.query(Tracks).filter(Tracks.id == id).one()
            return track
        except NoResultFound:
            ERROR(f"get_track({id}): not found")
            return None

    @staticmethod
    def search_titles(text):
        """Return tracks whose title contains ``text``, ordered by title.

        Matching uses ILIKE, so it is case-insensitive.
        """

        return (
            session.query(Tracks)
            .filter(Tracks.title.ilike(f"%{text}%"))
            .order_by(Tracks.title)
        ).all()

    @staticmethod
    def track_from_id(id):
        """Return the Tracks object for id.

        Unlike get_track(), raises NoResultFound if there is no such track.
        """

        return session.query(Tracks).filter(
            Tracks.id == id).one()

    def update_lastplayed(self):
        "Set lastplayed to the current local time; does not commit"

        self.lastplayed = datetime.now()


# Create any missing tables. This has to run after the model classes above
# are declared: Base.metadata only knows about tables whose classes exist,
# so calling it straight after declarative_base() creates nothing.
Base.metadata.create_all(engine)