musicmuster/app/models.py
2022-05-02 17:32:29 +01:00

651 lines
20 KiB
Python

#!/usr/bin/python3
import os.path
import re
from dbconfig import Session
from datetime import datetime
from typing import List, Optional
from pydub import AudioSegment
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
ForeignKey,
func,
Integer,
String,
UniqueConstraint,
)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import (
backref,
relationship,
RelationshipProperty
)
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
from config import Config
from helpers import (
fade_point,
get_audio_segment,
leading_silence,
trailing_silence,
)
from log import DEBUG, ERROR
Base: DeclarativeMeta = declarative_base()
# Database classes
class NoteColours(Base):
__tablename__ = 'notecolours'
id: int = Column(Integer, primary_key=True, autoincrement=True)
substring: str = Column(String(256), index=False)
colour: str = Column(String(21), index=False)
enabled: bool = Column(Boolean, default=True, index=True)
is_regex: bool = Column(Boolean, default=False, index=False)
is_casesensitive: bool = Column(Boolean, default=False, index=False)
order: int = Column(Integer, index=True)
def __init__(
self, session: Session, substring: str, colour: str,
enabled: bool = True, is_regex: bool = False,
is_casesensitive: bool = False, order: int = 0) -> None:
self.substring = substring
self.colour = colour
self.enabled = enabled
self.is_regex = is_regex
self.is_casesensitive = is_casesensitive
self.order = order
session.add(self)
session.flush()
def __repr__(self) -> str:
return (
f"<NoteColour(id={self.id}, substring={self.substring}, "
f"colour={self.colour}>"
)
@classmethod
def get_all(cls, session: Session) -> Optional[List["NoteColours"]]:
"""Return all records"""
return session.query(cls).all()
@classmethod
def get_by_id(cls, session: Session, note_id: int) -> \
Optional["NoteColours"]:
"""Return record identified by id, or None if not found"""
return session.query(NoteColours).filter(
NoteColours.id == note_id).first()
@staticmethod
def get_colour(session: Session, text: str) -> Optional[str]:
"""
Parse text and return colour string if matched, else None
"""
for rec in (
session.query(NoteColours)
.filter(NoteColours.enabled.is_(True))
.order_by(NoteColours.order)
.all()
):
if rec.is_regex:
flags = re.UNICODE
if not rec.is_casesensitive:
flags |= re.IGNORECASE
p = re.compile(rec.substring, flags)
if p.match(text):
return rec.colour
else:
if rec.is_casesensitive:
if rec.substring in text:
return rec.colour
else:
if rec.substring.lower() in text.lower():
return rec.colour
return None
class Notes(Base):
__tablename__ = 'notes'
id: int = Column(Integer, primary_key=True, autoincrement=True)
playlist_id: int = Column(Integer, ForeignKey('playlists.id'))
playlist: RelationshipProperty = relationship(
"Playlists", back_populates="notes", lazy="joined")
row: int = Column(Integer, nullable=False)
note: str = Column(String(256), index=False)
def __init__(self, session: Session, playlist_id: int,
row: int, text: str) -> None:
"""Create note"""
DEBUG(f"Notes.__init__({playlist_id=}, {row=}, {text=})")
self.playlist_id = playlist_id
self.row = row
self.note = text
session.add(self)
session.flush()
def __repr__(self) -> str:
return (
f"<Note(id={self.id}, row={self.row}, note={self.note}>"
)
def delete_note(self, session: Session) -> None:
"""Delete note"""
DEBUG(f"delete_note({self.id=}")
session.query(Notes).filter_by(id=self.id).delete()
session.flush()
@classmethod
def get_by_id(cls, session: Session, note_id: int) -> Optional["Notes"]:
"""Return note or None"""
try:
DEBUG(f"Notes.get_track(track_id={note_id})")
note = session.query(cls).filter(cls.id == note_id).one()
return note
except NoResultFound:
ERROR(f"get_track({note_id}): not found")
return None
def update(
self, session: Session, row: int,
text: Optional[str] = None) -> None:
"""
Update note details. If text=None, don't change text.
"""
DEBUG(f"Notes.update_note({self.id=}, {row=}, {text=})")
self.row = row
if text:
self.note = text
session.flush()
class Playdates(Base):
__tablename__ = 'playdates'
id: int = Column(Integer, primary_key=True, autoincrement=True)
lastplayed: datetime = Column(DateTime, index=True, default=None)
track_id: int = Column(Integer, ForeignKey('tracks.id'))
track: RelationshipProperty = relationship(
"Tracks", back_populates="playdates", lazy="joined")
def __init__(self, session: Session, track_id: int) -> None:
"""Record that track was played"""
DEBUG(f"add_playdate({track_id=})")
self.lastplayed = datetime.now()
self.track_id = track_id
session.add(self)
session.flush()
@staticmethod
def last_played(session: Session, track_id: int) -> Optional[datetime]:
"""Return datetime track last played or None"""
last_played: Optional[Playdates] = session.query(
Playdates.lastplayed).filter(
(Playdates.track_id == track_id)
).order_by(Playdates.lastplayed.desc()).first()
if last_played:
return last_played[0]
else:
return None
@staticmethod
def played_after(session: Session, since: datetime) -> List["Playdates"]:
"""Return a list of Playdates objects since passed time"""
return session.query(Playdates).filter(
Playdates.lastplayed >= since).all()
@staticmethod
def remove_track(session: Session, track_id: int) -> None:
"""
Remove all records of track_id
"""
session.query(Playdates).filter(
Playdates.track_id == track_id).delete()
session.flush()
class Playlists(Base):
"""
Manage playlists
"""
__tablename__ = "playlists"
id: int = Column(Integer, primary_key=True, autoincrement=True)
name: str = Column(String(32), nullable=False, unique=True)
last_used: datetime = Column(DateTime, default=None, nullable=True)
loaded: bool = Column(Boolean, default=True, nullable=False)
notes = relationship(
"Notes", order_by="Notes.row",
back_populates="playlist", lazy="joined"
)
tracks = association_proxy('playlist_tracks', 'tracks')
row = association_proxy('playlist_tracks', 'row')
def __init__(self, session: Session, name: str) -> None:
self.name = name
session.add(self)
session.flush()
def __repr__(self) -> str:
return f"<Playlists(id={self.id}, name={self.name}>"
def add_track(
self, session: Session, track_id: int,
row: Optional[int] = None) -> None:
"""
Add track to playlist at given row.
If row=None, add to end of playlist
"""
if not row:
row = PlaylistTracks.next_free_row(session, self.id)
PlaylistTracks(session, self.id, track_id, row)
def close(self, session: Session) -> None:
"""Record playlist as no longer loaded"""
self.loaded = False
session.add(self)
session.flush()
@classmethod
def get_all(cls, session: Session) -> List["Playlists"]:
"""Returns a list of all playlists ordered by last use"""
return (
session.query(cls).order_by(cls.last_used.desc())
).all()
@classmethod
def get_by_id(cls, session: Session, playlist_id: int) -> "Playlists":
return (session.query(cls).filter(cls.id == playlist_id)).one()
@classmethod
def get_closed(cls, session: Session) -> List["Playlists"]:
"""Returns a list of all closed playlists ordered by last use"""
return (
session.query(cls)
.filter(cls.loaded.is_(False))
.order_by(cls.last_used.desc())
).all()
@classmethod
def get_open(cls, session: Session) -> List["Playlists"]:
"""
Return a list of playlists marked "loaded", ordered by loaded date.
"""
return (
session.query(cls)
.filter(cls.loaded.is_(True))
.order_by(cls.last_used.desc())
).all()
def mark_open(self, session: Session) -> None:
"""Mark playlist as loaded and used now"""
self.loaded = True
self.last_used = datetime.now()
session.flush()
def move_track(
self, session: Session, rows: List[int],
to_playlist: "Playlists") -> None:
"""Move tracks to another playlist"""
for row in rows:
track = self.tracks[row]
to_playlist.add_track(session, track.id)
del self.tracks[row]
session.flush()
def remove_all_tracks(self, session: Session) -> None:
"""
Remove all tracks from this playlist
"""
self.tracks = {}
session.flush()
def remove_track(self, session: Session, row: int) -> None:
DEBUG(f"Playlist.remove_track({self.id=}, {row=})")
# Get tracks collection for this playlist
# Tracks are a dictionary of tracks keyed on row
# number. Remove the relevant row.
del self.tracks[row]
# Save the new tracks collection
session.flush()
class PlaylistTracks(Base):
__tablename__ = 'playlist_tracks'
id: int = Column(Integer, primary_key=True, autoincrement=True)
playlist_id: int = Column(Integer, ForeignKey('playlists.id'),
primary_key=True)
track_id: int = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
row: int = Column(Integer, nullable=False)
tracks: RelationshipProperty = relationship("Tracks")
playlist: RelationshipProperty = relationship(
Playlists,
backref=backref(
"playlist_tracks",
collection_class=attribute_mapped_collection("row"),
lazy="joined",
cascade="all, delete-orphan"
)
)
# Ensure row numbers are unique within each playlist
__table_args__ = (UniqueConstraint
('row', 'playlist_id', name="uniquerow"),
)
def __init__(
self, session: Session, playlist_id: int, track_id: int,
row: int) -> None:
DEBUG(f"PlaylistTracks.__init__({playlist_id=}, {track_id=}, {row=})")
self.playlist_id = playlist_id
self.track_id = track_id
self.row = row
session.add(self)
session.flush()
@staticmethod
def next_free_row(session: Session, playlist_id: int) -> int:
"""Return next free row number"""
row: int
last_row = session.query(
func.max(PlaylistTracks.row)
).filter_by(playlist_id=playlist_id).first()
# if there are no rows, the above returns (None, ) which is True
if last_row and last_row[0] is not None:
row = last_row[0] + 1
else:
row = 0
return row
@staticmethod
def move_rows(
session: Session, rows: List[int], from_playlist_id: int,
to_playlist_id: int) -> None:
"""Move rows between playlists"""
# A constraint deliberately blocks duplicate (playlist_id, row)
# entries in database; however, unallocated rows in the database
# are fine (ie, we can have rows 1, 4, 6 and no 2, 3, 5).
# Unallocated rows will be automatically removed when the
# playlist is saved.
lowest_source_row: int = min(rows)
first_destination_free_row = PlaylistTracks.next_free_row(
session, to_playlist_id)
# Calculate offset that will put the lowest row number being
# moved at the first free row in destination playlist
offset = first_destination_free_row - lowest_source_row
session.query(PlaylistTracks).filter(
PlaylistTracks.playlist_id == from_playlist_id,
PlaylistTracks.row.in_(rows)
).update({'playlist_id': to_playlist_id,
'row': PlaylistTracks.row + offset},
False
)
class Settings(Base):
__tablename__ = 'settings'
id: int = Column(Integer, primary_key=True, autoincrement=True)
name: str = Column(String(32), nullable=False, unique=True)
f_datetime: datetime = Column(DateTime, default=None, nullable=True)
f_int: int = Column(Integer, default=None, nullable=True)
f_string: str = Column(String(128), default=None, nullable=True)
@classmethod
def get_int_settings(cls, session: Session, name: str) -> "Settings":
"""Get setting for an integer or return new setting record"""
int_setting: Settings
try:
int_setting = session.query(cls).filter(
cls.name == name).one()
except NoResultFound:
int_setting = Settings()
int_setting.name = name
int_setting.f_int = None
session.add(int_setting)
session.flush()
return int_setting
def update(self, session: Session, data):
for key, value in data.items():
assert hasattr(self, key)
setattr(self, key, value)
session.flush()
class Tracks(Base):
__tablename__ = 'tracks'
id: int = Column(Integer, primary_key=True, autoincrement=True)
title: str = Column(String(256), index=True)
artist: str = Column(String(256), index=True)
duration: int = Column(Integer, index=True)
start_gap: int = Column(Integer, index=False)
fade_at: int = Column(Integer, index=False)
silence_at: int = Column(Integer, index=False)
path: str = Column(String(2048), index=False, nullable=False)
mtime: float = Column(Float, index=True)
lastplayed: datetime = Column(DateTime, index=True, default=None)
playlists: RelationshipProperty = relationship("PlaylistTracks",
back_populates="tracks",
lazy="joined")
playdates: RelationshipProperty = relationship("Playdates",
back_populates="track"
"",
lazy="joined")
def __init__(
self,
session: Session,
path: str,
title: Optional[str] = None,
artist: Optional[str] = None,
duration: int = 0,
start_gap: int = 0,
fade_at: Optional[int] = None,
silence_at: Optional[int] = None,
mtime: Optional[float] = None,
lastplayed: Optional[datetime] = None,
) -> None:
self.path = path
self.title = title
self.artist = artist
self.duration = duration
self.start_gap = start_gap
self.fade_at = fade_at
self.silence_at = silence_at
self.mtime = mtime
self.lastplayed = lastplayed
session.add(self)
session.flush()
def __repr__(self) -> str:
return (
f"<Track(id={self.id}, title={self.title}, "
f"artist={self.artist}, path={self.path}>"
)
@staticmethod
def get_all_paths(session) -> List[str]:
"""Return a list of paths of all tracks"""
return [a[0] for a in session.query(Tracks.path).all()]
@classmethod
def get_all_tracks(cls, session: Session) -> List["Tracks"]:
"""Return a list of all tracks"""
return session.query(cls).all()
@classmethod
def get_or_create(cls, session: Session, path: str) -> "Tracks":
"""
If a track with path exists, return it;
else created new track and return it
"""
DEBUG(f"Tracks.get_or_create({path=})")
try:
track = session.query(cls).filter(cls.path == path).one()
except NoResultFound:
track = Tracks(session, path)
return track
@classmethod
def get_by_filename(cls, session: Session, filename: str) \
-> Optional["Tracks"]:
"""
Return track if one and only one track in database has passed
filename (ie, basename of path). Return None if zero or more
than one track matches.
"""
DEBUG(f"Tracks.get_track_from_filename({filename=})")
try:
track = session.query(Tracks).filter(Tracks.path.ilike(
f'%{os.path.sep}{filename}')).one()
return track
except (NoResultFound, MultipleResultsFound):
return None
@classmethod
def get_by_path(cls, session: Session, path: str) -> List["Tracks"]:
"""
Return track with passee path, or None.
"""
DEBUG(f"Tracks.get_track_from_path({path=})")
return session.query(Tracks).filter(Tracks.path == path).first()
@classmethod
def get_by_id(cls, session: Session, track_id: int) -> Optional["Tracks"]:
"""Return track or None"""
try:
DEBUG(f"Tracks.get_track(track_id={track_id})")
track = session.query(Tracks).filter(Tracks.id == track_id).one()
return track
except NoResultFound:
ERROR(f"get_track({track_id}): not found")
return None
def rescan(self, session: Session) -> None:
"""
Update audio metadata for passed track.
"""
audio: AudioSegment = get_audio_segment(self.path)
self.duration = len(audio)
self.fade_at = round(fade_point(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
self.mtime = os.path.getmtime(self.path)
self.silence_at = round(trailing_silence(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
self.start_gap = leading_silence(audio)
session.add(self)
session.flush()
@staticmethod
def remove_by_path(session: Session, path: str) -> None:
"""Remove track with passed path from database"""
DEBUG(f"Tracks.remove_path({path=})")
try:
session.query(Tracks).filter(Tracks.path == path).delete()
session.flush()
except IntegrityError as exception:
ERROR(f"Can't remove track with {path=} ({exception=})")
@classmethod
def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
return (
session.query(cls)
.filter(cls.artist.ilike(f"%{text}%"))
.order_by(cls.title)
).all()
@classmethod
def search_titles(cls, session: Session, text: str) -> List["Tracks"]:
return (
session.query(cls)
.filter(cls.title.ilike(f"%{text}%"))
.order_by(cls.title)
).all()
@staticmethod
def update_lastplayed(session: Session, track_id: int) -> None:
"""Update the last_played field to current datetime"""
rec = session.query(Tracks).get(track_id)
rec.lastplayed = datetime.now()
session.add(rec)
session.flush()
def update_artist(self, session: Session, artist: str) -> None:
self.artist = artist
session.add(self)
session.flush()
def update_title(self, session: Session, title: str) -> None:
self.title = title
session.add(self)
session.flush()
def update_path(self, session, newpath: str) -> None:
self.path = newpath
session.commit()