musicmuster/app/models.py
Keith Edmunds b7111d8a3b SQLA2: WIP
2022-07-31 21:11:34 +01:00

704 lines
22 KiB
Python

#!/usr/bin/python3
#
# import os.path
# import re
#
from dbconfig import Session
#
from datetime import datetime
from typing import List, Optional
#
# from pydub import AudioSegment
from sqlalchemy.ext.associationproxy import association_proxy
# from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
ForeignKey,
# func,
Integer,
String,
UniqueConstraint,
select,
)
# from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import (
backref,
declarative_base,
relationship,
RelationshipProperty
)
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.exc import (
# MultipleResultsFound,
NoResultFound
)
#
# from config import Config
# from helpers import (
# fade_point,
# get_audio_segment,
# leading_silence,
# trailing_silence,
# )
# from log import log.debug, log.error
#
# Declarative base class; every ORM model class in this module inherits
# from it.
Base = declarative_base()
#
#
# Database classes
class NoteColours(Base):
    """
    ORM model for the ``notecolours`` table.

    Each record pairs a ``substring`` with a ``colour`` string, plus flags
    saying whether the record is enabled, whether the substring is a
    regular expression and whether it is case sensitive.  ``order`` is an
    indexed integer, presumably the priority in which records are applied
    (TODO confirm against the code that consumes these records).
    """

    __tablename__ = 'notecolours'

    id = Column(Integer, primary_key=True, autoincrement=True)
    substring = Column(String(256), index=False)
    colour = Column(String(21), index=False)
    enabled = Column(Boolean, default=True, index=True)
    is_regex = Column(Boolean, default=False, index=False)
    is_casesensitive = Column(Boolean, default=False, index=False)
    order = Column(Integer, index=True)

    def __repr__(self) -> str:
        """Return a short, balanced debugging representation."""
        return (
            f"<NoteColour(id={self.id}, substring={self.substring}, "
            f"colour={self.colour})>"
        )
#
# def __init__(
# self, session: Session, substring: str, colour: str,
# enabled: bool = True, is_regex: bool = False,
# is_casesensitive: bool = False, order: int = 0) -> None:
# self.substring = substring
# self.colour = colour
# self.enabled = enabled
# self.is_regex = is_regex
# self.is_casesensitive = is_casesensitive
# self.order = order
#
# session.add(self)
# session.flush()
#
# @classmethod
# def get_all(cls, session: Session) -> Optional[List["NoteColours"]]:
# """Return all records"""
#
# return session.query(cls).all()
#
# @classmethod
# def get_by_id(cls, session: Session, note_id: int) -> \
# Optional["NoteColours"]:
# """Return record identified by id, or None if not found"""
#
# return session.query(NoteColours).filter(
# NoteColours.id == note_id).first()
#
# @staticmethod
# def get_colour(session: Session, text: str) -> Optional[str]:
# """
# Parse text and return colour string if matched, else None
# """
#
# for rec in (
# session.query(NoteColours)
# .filter(NoteColours.enabled.is_(True))
# .order_by(NoteColours.order)
# .all()
# ):
# if rec.is_regex:
# flags = re.UNICODE
# if not rec.is_casesensitive:
# flags |= re.IGNORECASE
# p = re.compile(rec.substring, flags)
# if p.match(text):
# return rec.colour
# else:
# if rec.is_casesensitive:
# if rec.substring in text:
# return rec.colour
# else:
# if rec.substring.lower() in text.lower():
# return rec.colour
#
# return None
#
#
#class Notes(Base):
# __tablename__ = 'notes'
#
# id: int = Column(Integer, primary_key=True, autoincrement=True)
# playlist_id: int = Column(Integer, ForeignKey('playlists.id'))
# playlist: RelationshipProperty = relationship(
# "Playlists", back_populates="notes", lazy="joined")
# row: int = Column(Integer, nullable=False)
# note: str = Column(String(256), index=False)
#
# def __init__(self, session: Session, playlist_id: int,
# row: int, text: str) -> None:
# """Create note"""
#
# log.debug(f"Notes.__init__({playlist_id=}, {row=}, {text=})")
# self.playlist_id = playlist_id
# self.row = row
# self.note = text
# session.add(self)
# session.flush()
#
# def __repr__(self) -> str:
# return (
# f"<Note(id={self.id}, row={self.row}, note={self.note}>"
# )
#
# def delete_note(self, session: Session) -> None:
# """Delete note"""
#
# log.debug(f"delete_note({self.id=}")
#
# session.query(Notes).filter_by(id=self.id).delete()
# session.flush()
#
# @staticmethod
# def max_used_row(session: Session, playlist_id: int) -> Optional[int]:
# """
# Return maximum notes row for passed playlist ID or None if no notes
# """
#
# last_row = session.query(func.max(Notes.row)).filter_by(
# playlist_id=playlist_id).first()
# # if there are no rows, the above returns (None, ) which is True
# if last_row and last_row[0] is not None:
# return last_row[0]
# else:
# return None
#
# def move_row(self, session: Session, row: int, to_playlist_id: int) \
# -> None:
# """
# Move note to another playlist
# """
#
# self.row = row
# self.playlist_id = to_playlist_id
# session.commit()
#
# @classmethod
# def get_by_id(cls, session: Session, note_id: int) -> Optional["Notes"]:
# """Return note or None"""
#
# try:
# log.debug(f"Notes.get_track(track_id={note_id})")
# note = session.query(cls).filter(cls.id == note_id).one()
# return note
# except NoResultFound:
# log.error(f"get_track({note_id}): not found")
# return None
#
# def update(
# self, session: Session, row: int,
# text: Optional[str] = None) -> None:
# """
# Update note details. If text=None, don't change text.
# """
#
# log.debug(f"Notes.update_note({self.id=}, {row=}, {text=})")
#
# self.row = row
# if text:
# self.note = text
# session.flush()
class Playdates(Base):
    """
    ORM model for the ``playdates`` table.

    Associates a track (``track_id``) with a ``lastplayed`` timestamp.
    The ``track`` relationship is the counterpart of ``Tracks.playdates``.
    """

    __tablename__ = 'playdates'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    lastplayed = Column(DateTime, index=True, default=None)
    track_id = Column(Integer, ForeignKey('tracks.id'))
    track = relationship("Tracks", back_populates="playdates")

    def __repr__(self) -> str:
        """Return a short, balanced debugging representation."""
        return (
            f"<Playdates(id={self.id}, track_id={self.track_id}, "
            f"lastplayed={self.lastplayed})>"
        )
#
# def __init__(self, session: Session, track_id: int) -> None:
# """Record that track was played"""
#
# log.debug(f"add_playdate({track_id=})")
#
# self.lastplayed = datetime.now()
# self.track_id = track_id
# session.add(self)
# session.flush()
#
# @staticmethod
# def last_played(session: Session, track_id: int) -> Optional[datetime]:
# """Return datetime track last played or None"""
#
# last_played: Optional[Playdates] = session.query(
# Playdates.lastplayed).filter(
# (Playdates.track_id == track_id)
# ).order_by(Playdates.lastplayed.desc()).first()
# if last_played:
# return last_played[0]
# else:
# return None
#
# @staticmethod
# def played_after(session: Session, since: datetime) -> List["Playdates"]:
# """Return a list of Playdates objects since passed time"""
#
# return session.query(Playdates).filter(
# Playdates.lastplayed >= since).all()
#
# @staticmethod
# def remove_track(session: Session, track_id: int) -> None:
# """
# Remove all records of track_id
# """
#
# session.query(Playdates).filter(
# Playdates.track_id == track_id).delete()
# session.flush()
class Playlists(Base):
    """
    Manage playlists

    ORM model for the ``playlists`` table.  A playlist has a unique
    ``name``, a ``last_used`` timestamp, a ``loaded`` flag and an ordered
    collection of ``rows`` (``PlaylistRows`` sorted by ``row_number``).
    Rows are owned by the playlist: the relationship cascades
    "all, delete-orphan".
    """

    __tablename__ = "playlists"

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(32), nullable=False, unique=True)
    last_used = Column(DateTime, default=None, nullable=True)
    loaded: bool = Column(Boolean, default=True, nullable=False)
    rows = relationship(
        "PlaylistRows",
        back_populates="playlist",
        cascade="all, delete-orphan",
        order_by="PlaylistRows.row_number"
    )

    def __repr__(self) -> str:
        """Return a short, balanced debugging representation."""
        return f"<Playlists(id={self.id}, name={self.name})>"

    #
    # def __init__(self, session: Session, name: str) -> None:
    # self.name = name
    # session.add(self)
    # session.flush()
    #
    # def add_track(
    # self, session: Session, track_id: int,
    # row: Optional[int] = None) -> None:
    # """
    # Add track to playlist at given row.
    # If row=None, add to end of playlist
    # """
    #
    # if row is None:
    # row = self.next_free_row(session, self.id)
    #
    # xPlaylistTracks(session, self.id, track_id, row)
    #
    # def close(self, session: Session) -> None:
    # """Record playlist as no longer loaded"""
    #
    # self.loaded = False
    # session.add(self)
    # session.flush()
    #
    # @classmethod
    # def get_all(cls, session: Session) -> List["Playlists"]:
    # """Returns a list of all playlists ordered by last use"""
    #
    # return (
    # session.query(cls).order_by(cls.last_used.desc())
    # ).all()
    #
    # @classmethod
    # def get_closed(cls, session: Session) -> List["Playlists"]:
    # """Returns a list of all closed playlists ordered by last use"""
    #
    # return (
    # session.query(cls)
    # .filter(cls.loaded.is_(False))
    # .order_by(cls.last_used.desc())
    # ).all()

    @classmethod
    def get_open(cls, session: Session) -> List["Playlists"]:
        """
        Return a list of playlists marked "loaded", ordered by
        ``last_used`` descending (most recently used first).

        The list is empty when no playlist is loaded; it never contains
        None.
        """
        return (
            session.execute(
                select(cls)
                .where(cls.loaded.is_(True))
                .order_by(cls.last_used.desc())
            )
            .scalars()
            .all()
        )

    def mark_open(self, session: Session) -> None:
        """
        Mark playlist as loaded and used now.

        Only the in-memory attributes are changed; ``session`` is accepted
        but not used by this method.
        """
        self.loaded = True
        self.last_used = datetime.now()
# session.flush()
#
# @staticmethod
# def next_free_row(session: Session, playlist_id: int) -> int:
# """Return next free row for this playlist"""
#
# max_notes_row = Notes.max_used_row(session, playlist_id)
# max_tracks_row = xPlaylistTracks.max_used_row(session, playlist_id)
#
# if max_notes_row is not None and max_tracks_row is not None:
# return max(max_notes_row, max_tracks_row) + 1
#
# if max_notes_row is None and max_tracks_row is None:
# return 0
#
# if max_notes_row is None:
# return max_tracks_row + 1
# else:
# return max_notes_row + 1
#
# def remove_all_tracks(self, session: Session) -> None:
# """
# Remove all tracks from this playlist
# """
#
# self.tracks = {}
# session.flush()
#
# def remove_track(self, session: Session, row: int) -> None:
# log.debug(f"Playlist.remove_track({self.id=}, {row=})")
#
# # Refresh self first (this is necessary when calling remove_track
# # multiple times before session.commit())
# session.refresh(self)
# # Get tracks collection for this playlist
# # Tracks are a dictionary of tracks keyed on row
# # number. Remove the relevant row.
# del self.tracks[row]
# # Save the new tracks collection
# session.flush()
#
#
# class PlaylistTracks(Base):
# __tablename__ = 'playlist_tracks'
#
# id: int = Column(Integer, primary_key=True, autoincrement=True)
# playlist_id: int = Column(Integer, ForeignKey('playlists.id'),
# primary_key=True)
# track_id: int = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
# row: int = Column(Integer, nullable=False)
# tracks: RelationshipProperty = relationship("Tracks")
# playlist: RelationshipProperty = relationship(
# Playlists,
# backref=backref(
# "playlist_tracks",
# collection_class=attribute_mapped_collection("row"),
# lazy="joined",
# cascade="all, delete-orphan"
# )
# )
class PlaylistRows(Base):
    """
    ORM model for the ``playlist_rows`` table.

    A row always belongs to a playlist (``playlist_id`` is not nullable),
    may reference a track (``track_id`` is nullable) and may carry a
    ``note`` of up to 2048 characters.
    """

    __tablename__ = 'playlist_rows'

    id = Column(Integer, primary_key=True, autoincrement=True)
    row_number = Column(Integer, nullable=False)
    note = Column(String(2048), index=False)
    playlist_id = Column(Integer, ForeignKey('playlists.id'), nullable=False)
    playlist = relationship(Playlists, back_populates="rows")
    track_id = Column(Integer, ForeignKey('tracks.id'), nullable=True)
    track = relationship("Tracks", back_populates="playlistrows")

    # Ensure row numbers are unique within each playlist
    __table_args__ = (
        UniqueConstraint('row_number', 'playlist_id', name="uniquerow"),
    )

    def __repr__(self) -> str:
        """Return a short, balanced debugging representation."""
        return (
            f"<PlaylistRow(id={self.id}, playlist_id={self.playlist_id}, "
            f"track_id={self.track_id}, "
            f"note={self.note}, row_number={self.row_number})>"
        )
# def __init__(
# self, session: Session, playlist_id: int, track_id: int,
# row: int) -> None:
# log.debug(f"xPlaylistTracks.__init__({playlist_id=}, {track_id=}, {row=})")
#
# self.playlist_id = playlist_id
# self.track_id = track_id
# self.row = row
# session.add(self)
# session.flush()
#
# @staticmethod
# def max_used_row(session: Session, playlist_id: int) -> Optional[int]:
# """
# Return highest track row number used or None if there are no
# tracks
# """
#
# last_row = session.query(
# func.max(xPlaylistTracks.row)
# ).filter_by(playlist_id=playlist_id).first()
# # if there are no rows, the above returns (None, ) which is True
# if last_row and last_row[0] is not None:
# return last_row[0]
# else:
# return None
#
# @staticmethod
# def move_row(session: Session, from_row: int, from_playlist_id: int,
# to_row: int, to_playlist_id: int) -> None:
# """Move row to another playlist"""
#
# session.query(xPlaylistTracks).filter(
# xPlaylistTracks.playlist_id == from_playlist_id,
# xPlaylistTracks.row == from_row).update(
# {'playlist_id': to_playlist_id, 'row': to_row}, False)
class Settings(Base):
    """
    Manage settings

    ORM model for the ``settings`` table.  Each record has a unique
    ``name`` and three nullable value columns, one per supported type:
    ``f_datetime``, ``f_int`` and ``f_string``.
    """

    __tablename__ = 'settings'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(64), nullable=False, unique=True)
    f_datetime = Column(DateTime, default=None, nullable=True)
    f_int: int = Column(Integer, default=None, nullable=True)
    f_string = Column(String(128), default=None, nullable=True)

    def __repr__(self) -> str:
        """
        Return a debugging representation showing the first value column
        that is set.
        """
        # Compare against None rather than relying on truthiness, so that
        # legitimate falsy values (an integer setting of 0, an empty
        # string) are displayed instead of being skipped.
        value = next(
            (
                candidate
                for candidate in (self.f_datetime, self.f_int, self.f_string)
                if candidate is not None
            ),
            None,
        )
        return f"<Settings(id={self.id}, name={self.name}, {value=})>"

    @classmethod
    def get_int_settings(cls, session: Session, name: str) -> "Settings":
        """
        Get setting for an integer or return new setting record.

        Looks up the record whose ``name`` matches.  If none exists, a new
        record with that name and ``f_int`` set to None is created and
        added to ``session`` (it is not flushed here).
        """
        int_setting: Settings
        try:
            int_setting = session.execute(
                select(cls)
                .where(cls.name == name)
            ).scalar_one()
        except NoResultFound:
            int_setting = Settings()
            int_setting.name = name
            int_setting.f_int = None
            session.add(int_setting)
        return int_setting

    def update(self, session: Session, data: dict) -> None:
        """
        Set attributes from ``data`` and flush the session.

        ``data`` maps attribute names to new values.  Keys are applied in
        iteration order.

        Raises:
            AssertionError: if a key is not an existing attribute of this
                object.  Attributes set before the offending key remain
                changed and the session is not flushed.
        """
        for key, value in data.items():
            # Raised explicitly rather than via ``assert`` so the check
            # still runs under ``python -O``; the exception type is kept
            # for backward compatibility.
            if not hasattr(self, key):
                raise AssertionError(
                    f"Settings has no attribute {key!r}"
                )
            setattr(self, key, value)
        session.flush()
class Tracks(Base):
    """
    ORM model for the ``tracks`` table.

    Holds per-track metadata (``title``, ``artist``, ``duration``,
    ``start_gap``, ``fade_at``, ``silence_at``, ``mtime``) and the
    mandatory file ``path``.  ``playlistrows`` and ``playdates`` are the
    counterparts of ``PlaylistRows.track`` and ``Playdates.track``;
    ``playlists`` proxies the ``playlist`` attribute of each related
    playlist row.
    """

    __tablename__ = 'tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(256), index=True)
    artist = Column(String(256), index=True)
    duration = Column(Integer, index=True)
    start_gap = Column(Integer, index=False)
    fade_at = Column(Integer, index=False)
    silence_at = Column(Integer, index=False)
    path = Column(String(2048), index=False, nullable=False)
    mtime = Column(Float, index=True)
    # lastplayed = Column(DateTime, index=True, default=None)
    playlistrows = relationship("PlaylistRows", back_populates="track")
    playlists = association_proxy("playlistrows", "playlist")
    playdates = relationship("Playdates", back_populates="track")

    def __repr__(self) -> str:
        """Return a short, balanced debugging representation."""
        return (
            f"<Track(id={self.id}, title={self.title}, "
            f"artist={self.artist}, path={self.path})>"
        )
#
#
# def __init__(
# self,
# session: Session,
# path: str,
# title: Optional[str] = None,
# artist: Optional[str] = None,
# duration: int = 0,
# start_gap: int = 0,
# fade_at: Optional[int] = None,
# silence_at: Optional[int] = None,
# mtime: Optional[float] = None,
# lastplayed: Optional[datetime] = None,
# ) -> None:
# self.path = path
# self.title = title
# self.artist = artist
# self.duration = duration
# self.start_gap = start_gap
# self.fade_at = fade_at
# self.silence_at = silence_at
# self.mtime = mtime
# self.lastplayed = lastplayed
#
# session.add(self)
# session.flush()
#
# @staticmethod
# def get_all_paths(session) -> List[str]:
# """Return a list of paths of all tracks"""
#
# return [a[0] for a in session.query(Tracks.path).all()]
#
# @classmethod
# def get_all_tracks(cls, session: Session) -> List["Tracks"]:
# """Return a list of all tracks"""
#
# return session.query(cls).all()
#
# @classmethod
# def get_or_create(cls, session: Session, path: str) -> "Tracks":
# """
# If a track with path exists, return it;
# else create a new track and return it
# """
#
# log.debug(f"Tracks.get_or_create({path=})")
#
# try:
# track = session.query(cls).filter(cls.path == path).one()
# except NoResultFound:
# track = Tracks(session, path)
#
# return track
#
# @classmethod
# def get_by_filename(cls, session: Session, filename: str) \
# -> Optional["Tracks"]:
# """
# Return track if one and only one track in database has passed
# filename (ie, basename of path). Return None if zero or more
# than one track matches.
# """
#
# log.debug(f"Tracks.get_track_from_filename({filename=})")
# try:
# track = session.query(Tracks).filter(Tracks.path.ilike(
# f'%{os.path.sep}{filename}')).one()
# return track
# except (NoResultFound, MultipleResultsFound):
# return None
#
# @classmethod
# def get_by_path(cls, session: Session, path: str) -> List["Tracks"]:
# """
# Return track with passed path, or None.
# """
#
# log.debug(f"Tracks.get_track_from_path({path=})")
#
# return session.query(Tracks).filter(Tracks.path == path).first()
#
# @classmethod
# def get_by_id(cls, session: Session, track_id: int) -> Optional["Tracks"]:
# """Return track or None"""
#
# try:
# log.debug(f"Tracks.get_track(track_id={track_id})")
# track = session.query(Tracks).filter(Tracks.id == track_id).one()
# return track
# except NoResultFound:
# log.error(f"get_track({track_id}): not found")
# return None
#
# def rescan(self, session: Session) -> None:
# """
# Update audio metadata for passed track.
# """
#
# audio: AudioSegment = get_audio_segment(self.path)
# self.duration = len(audio)
# self.fade_at = round(fade_point(audio) / 1000,
# Config.MILLISECOND_SIGFIGS) * 1000
# self.mtime = os.path.getmtime(self.path)
# self.silence_at = round(trailing_silence(audio) / 1000,
# Config.MILLISECOND_SIGFIGS) * 1000
# self.start_gap = leading_silence(audio)
# session.add(self)
# session.flush()
#
# @staticmethod
# def remove_by_path(session: Session, path: str) -> None:
# """Remove track with passed path from database"""
#
# log.debug(f"Tracks.remove_path({path=})")
#
# try:
# session.query(Tracks).filter(Tracks.path == path).delete()
# session.flush()
# except IntegrityError as exception:
# log.error(f"Can't remove track with {path=} ({exception=})")
#
# @classmethod
# def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
#
# return (
# session.query(cls)
# .filter(cls.artist.ilike(f"%{text}%"))
# .order_by(cls.title)
# ).all()
#
# @classmethod
# def search_titles(cls, session: Session, text: str) -> List["Tracks"]:
# return (
# session.query(cls)
# .filter(cls.title.ilike(f"%{text}%"))
# .order_by(cls.title)
# ).all()
#
# @staticmethod
# def update_lastplayed(session: Session, track_id: int) -> None:
# """Update the last_played field to current datetime"""
#
# rec = session.query(Tracks).get(track_id)
# rec.lastplayed = datetime.now()
# session.add(rec)
# session.flush()
#
# def update_artist(self, session: Session, artist: str) -> None:
# self.artist = artist
# session.add(self)
# session.flush()
#
# def update_title(self, session: Session, title: str) -> None:
# self.title = title
# session.add(self)
# session.flush()
#
# def update_path(self, session, newpath: str) -> None:
# self.path = newpath
# session.commit()