#!/usr/bin/python3 import os.path import re import sqlalchemy from datetime import datetime from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import ( Boolean, Column, DateTime, Float, ForeignKey, Integer, String, func ) from sqlalchemy.exc import IntegrityError from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound from sqlalchemy.orm import relationship, sessionmaker, scoped_session from app.config import Config from app.log import DEBUG, ERROR # Create session at the global level as per # https://docs.sqlalchemy.org/en/13/orm/session_basics.html Base = declarative_base() Session = scoped_session(sessionmaker()) def db_init(): # Set up database connection global Session engine = sqlalchemy.create_engine( f"{Config.MYSQL_CONNECT}?charset=utf8", encoding='utf-8', echo=Config.DISPLAY_SQL, pool_pre_ping=True) Session.configure(bind=engine) Base.metadata.create_all(engine) # Create a Session factory Session = sessionmaker(bind=engine) # Database classes class NoteColours(Base): __tablename__ = 'notecolours' id = Column(Integer, primary_key=True, autoincrement=True) substring = Column(String(256), index=False) colour = Column(String(21), index=False) enabled = Column(Boolean, default=True, index=True) is_regex = Column(Boolean, default=False, index=False) is_casesensitive = Column(Boolean, default=False, index=False) order = Column(Integer, index=True) def __repr__(self): return ( f"" ) @classmethod def get_all(cls, session): """Return all records""" return session.query(cls).all() @classmethod def get_by_id(cls, session, note_id): """Return record identified by id, or None if not found""" return session.query(NoteColours).filter( NoteColours.id == note_id).first() @staticmethod def get_colour(session, text): """ Parse text and return colour string if matched, else None """ for rec in ( session.query(NoteColours) .filter(NoteColours.enabled.is_(True)) .order_by(NoteColours.order) .all() ): if rec.is_regex: flags = re.UNICODE if not rec.is_casesensitive: flags |= re.IGNORECASE p = re.compile(rec.substring, flags) if p.match(text): return rec.colour else: if rec.is_casesensitive: if rec.substring in text: return rec.colour else: if rec.substring.lower() in text.lower(): return rec.colour return None class Notes(Base): __tablename__ = 'notes' id = Column(Integer, primary_key=True, autoincrement=True) playlist_id = Column(Integer, ForeignKey('playlists.id')) playlist = relationship("Playlists", back_populates="notes") row = Column(Integer, nullable=False) note = Column(String(256), index=False) def __init__(self, session, playlist_id, row, text): """Create note""" DEBUG(f"Notes.__init__({playlist_id=}, {row=}, {text=})") self.playlist_id = playlist_id self.row = row self.note = text session.add(self) session.commit() def __repr__(self): return ( f"" ) def delete_note(self, session): """Delete note""" DEBUG(f"delete_note({self.id=}") session.query(self).filter(id == self.id).delete() session.commit() def update_note(self, session, row, text=None): """ Update note details. If text=None, don't change text. """ DEBUG(f"Notes.update_note({self.id=}, {row=}, {text=})") note = session.query(self).filter(id == self.id).one() note.row = row if text: note.note = text session.commit() class Playdates(Base): __tablename__ = 'playdates' id = Column(Integer, primary_key=True, autoincrement=True) lastplayed = Column(DateTime, index=True, default=None) track_id = Column(Integer, ForeignKey('tracks.id')) tracks = relationship("Tracks", back_populates="playdates") def __init__(self, session, track): """Record that track was played""" DEBUG(f"add_playdate(track={track})") self.lastplayed = datetime.now() self.track_id = track.id track.update_lastplayed(session) session.add(self) session.commit() @staticmethod def last_played(session, track_id): """Return datetime track last played or None""" last_played = session.query(Playdates.lastplayed).filter( (Playdates.track_id == track_id) ).order_by(Playdates.lastplayed.desc()).first() if last_played: return last_played[0] else: return None @staticmethod def remove_track(session, track_id): """ Remove all records of track_id """ session.query(Playdates).filter( Playdates.track_id == track_id, ).delete() session.commit() class Playlists(Base): """ Manage playlists """ __tablename__ = "playlists" id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(32), nullable=False, unique=True) last_used = Column(DateTime, default=None, nullable=True) loaded = Column(Boolean, default=True, nullable=False) notes = relationship("Notes", order_by="Notes.row", back_populates="playlist") tracks = relationship("PlaylistTracks", order_by="PlaylistTracks.row", back_populates="playlists") def __init__(self, session, name): self.name = name session.add(self) session.commit() def __repr__(self): return f"" def add_note(self, session, row, text): """Add note to playlist at passed row""" return Notes(session, self.id, row, text) def add_track(self, session, track, row=None): """ Add track to playlist at given row. If row=None, add to end of playlist """ if not row: last_row = session.query( func.max(PlaylistTracks.row) ).filter_by(playlist_id=self.id).first() if last_row: row = last_row[0] + 1 else: row = 0 PlaylistTracks(session, self.id, track.id, row) def close(self, session): """Record playlist as no longer loaded""" self.loaded = False session.add(self) session.commit() @classmethod def get_all(cls, session): """Returns a list of all playlists ordered by last use""" return ( session.query(cls) .order_by(cls.last_used.desc()) ).all() @classmethod def get_by_id(cls, session, playlist_id): return (session.query(cls).filter(cls.id == playlist_id)).one() @classmethod def get_closed(cls, session): """Returns a list of all closed playlists ordered by last use""" return ( session.query(cls) .filter(cls.loaded.is_(False)) .order_by(cls.last_used.desc()) ).all() @classmethod def get_open(cls, session): """ Return a list of playlists marked "loaded", ordered by loaded date. """ return ( session.query(cls) .filter(cls.loaded.is_(True)) .order_by(cls.last_used.desc()) ).all() def mark_open(self, session): """Mark playlist as loaded and used now""" self.loaded = True self.last_used = datetime.now() session.add(self) session.commit() def remove_all_tracks(self, session): """ Remove all tracks from this playlist """ session.query(PlaylistTracks).filter( PlaylistTracks.playlist_id == self.id, ).delete() session.commit() def remove_track(self, session, row): DEBUG(f"Playlist.remove_track({self.id=}, {row=})") session.query(PlaylistTracks).filter( PlaylistTracks.playlist_id == self.id, PlaylistTracks.row == row ).delete() session.commit() class PlaylistTracks(Base): __tablename__ = 'playlisttracks' id = Column(Integer, primary_key=True, autoincrement=True) playlist_id = Column(Integer, ForeignKey('playlists.id'), primary_key=True) track_id = Column(Integer, ForeignKey('tracks.id'), primary_key=True) row = Column(Integer, nullable=False) tracks = relationship("Tracks", back_populates="playlists") playlists = relationship("Playlists", back_populates="tracks") def __init__(self, session, playlist_id, track_id, row): DEBUG(f"PlaylistTracks.__init__({playlist_id=}, {track_id=}, {row=})") self.playlist_id = playlist_id, self.track_id = track_id, self.row = row session.add(self) session.commit() @staticmethod def get_track_playlists(session, track_id): """Return all PlaylistTracks objects with this track_id""" return session.query(PlaylistTracks).filter( PlaylistTracks.track_id == track_id).all() class Settings(Base): __tablename__ = 'settings' id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(32), nullable=False, unique=True) f_datetime = Column(DateTime, default=None, nullable=True) f_int = Column(Integer, default=None, nullable=True) f_string = Column(String(128), default=None, nullable=True) @classmethod def get_int(cls, session, name): try: int_setting = session.query(cls).filter( cls.name == name).one() except NoResultFound: int_setting = Settings() int_setting.name = name int_setting.f_int = None session.add(int_setting) session.commit() return int_setting def update(self, session, data): for key, value in data.items(): assert hasattr(self, key) setattr(self, key, value) session.commit() class Tracks(Base): __tablename__ = 'tracks' id = Column(Integer, primary_key=True, autoincrement=True) title = Column(String(256), index=True) artist = Column(String(256), index=True) duration = Column(Integer, index=True) start_gap = Column(Integer, index=False) fade_at = Column(Integer, index=False) silence_at = Column(Integer, index=False) path = Column(String(2048), index=False, nullable=False) mtime = Column(Float, index=True) lastplayed = Column(DateTime, index=True, default=None) playlists = relationship("PlaylistTracks", back_populates="tracks") playdates = relationship("Playdates", back_populates="tracks") def __init__(self, session, path): self.path = path session.add(self) session.commit() def __repr__(self): return ( f"" ) @staticmethod def get_all_paths(session): """Return a list of paths of all tracks""" return [a[0] for a in session.query(Tracks.path).all()] @classmethod def get_all_tracks(cls, session): """Return a list of all tracks""" return session.query(cls).all() @classmethod def get_or_create(cls, session, path): """ If a track with path exists, return it; else created new track and return it """ DEBUG(f"Tracks.get_or_create({path=})") try: track = session.query(cls).filter(cls.path == path).one() except NoResultFound: track = Tracks(session, path) return track @classmethod def get_from_filename(cls, session, filename): """ Return track if one and only one track in database has passed filename (ie, basename of path). Return None if zero or more than one track matches. """ DEBUG(f"Tracks.get_track_from_filename({filename=})") try: track = session.query(Tracks).filter(Tracks.path.ilike( f'%{os.path.sep}{filename}')).one() return track except (NoResultFound, MultipleResultsFound): return None @classmethod def get_from_id(cls, session, id): return session.query(Tracks).filter( Tracks.id == id).one() @classmethod def get_from_path(cls, session, path): """ Return track with passee path, or None. """ DEBUG(f"Tracks.get_track_from_path({path=})") return session.query(Tracks).filter(Tracks.path == path).first() @classmethod def get_by_id(cls, session, track_id): """Return track or None""" try: DEBUG(f"Tracks.get_track(track_id={track_id})") track = session.query(Tracks).filter(Tracks.id == track_id).one() return track except NoResultFound: ERROR(f"get_track({track_id}): not found") return None @staticmethod def remove_by_path(session, path): "Remove track with passed path from database" DEBUG(f"Tracks.remove_path({path=})") try: session.query(Tracks).filter(Tracks.path == path).delete() session.commit() except IntegrityError as exception: ERROR(f"Can't remove track with {path=} ({exception=})") @classmethod def search_artists(cls, session, text): return ( session.query(Tracks) .filter(Tracks.artist.ilike(f"%{text}%")) .order_by(Tracks.title) ).all() @classmethod def search_titles(cls, session, text): return ( session.query(Tracks) .filter(Tracks.title.ilike(f"%{text}%")) .order_by(Tracks.title) ).all() def update_lastplayed(self, session): self.lastplayed = datetime.now() session.add(self) session.commit() def update_artist(self, session, artist): self.artist = artist session.add(self) session.commit() def update_title(self, session, title): self.title = title session.add(self) session.commit() def update_path(self, newpath): self.path = newpath