musicmuster/app/models.py
2025-03-09 19:23:55 +00:00

874 lines
25 KiB
Python

# Standard library imports
from __future__ import annotations
from typing import Optional, Sequence
import datetime as dt
import os
import re
import sys
# PyQt imports
# Third party imports
from dogpile.cache import make_region
from dogpile.cache.api import NO_VALUE
from sqlalchemy import (
bindparam,
delete,
func,
select,
text,
update,
)
from sqlalchemy.exc import IntegrityError, ProgrammingError
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import joinedload, selectinload
from sqlalchemy.orm.session import Session
from sqlalchemy.engine.row import RowMapping
# App imports
from classes import ApplicationError, Filter
from config import Config
from dbmanager import DatabaseManager
import dbtables
from log import log
# Establish database connection
DATABASE_URL = os.environ.get("DATABASE_URL")
if DATABASE_URL is None:
raise ValueError("DATABASE_URL is undefined")
if "unittest" in sys.modules and "sqlite" not in DATABASE_URL:
raise ValueError("Unit tests running on non-Sqlite database")
db = DatabaseManager.get_instance(DATABASE_URL, engine_options=Config.ENGINE_OPTIONS).db
# Configure the cache region
cache_region = make_region().configure(
'dogpile.cache.memory', # Use in-memory caching for now (switch to Redis if needed)
expiration_time=600 # Cache expires after 10 minutes
)
def run_sql(session: Session, sql: str) -> Sequence[RowMapping]:
"""
Run a sql string and return results
"""
try:
return session.execute(text(sql)).mappings().all()
except ProgrammingError as e:
raise ApplicationError(e)
# Database classes
class NoteColours(dbtables.NoteColoursTable):
def __init__(
self,
session: Session,
substring: str,
colour: str,
enabled: bool = True,
is_regex: bool = False,
is_casesensitive: bool = False,
order: Optional[int] = 0,
) -> None:
self.substring = substring
self.colour = colour
self.enabled = enabled
self.is_regex = is_regex
self.is_casesensitive = is_casesensitive
self.order = order
session.add(self)
session.commit()
@classmethod
def get_all(cls, session: Session) -> Sequence["NoteColours"]:
"""
Return all records
"""
cache_key = "note_colours_all"
cached_result = cache_region.get(cache_key)
if cached_result is not NO_VALUE:
return cached_result
# Query the database
result = session.scalars(
select(cls)
.where(
cls.enabled.is_(True),
)
.order_by(cls.order)
).all()
cache_region.set(cache_key, result)
return result
@staticmethod
def get_colour(
session: Session, text: str, foreground: bool = False
) -> str:
"""
Parse text and return background (foreground if foreground==True) colour
string if matched, else None
"""
if not text:
return ""
match = False
for rec in NoteColours.get_all(session):
if rec.is_regex:
flags = re.UNICODE
if not rec.is_casesensitive:
flags |= re.IGNORECASE
p = re.compile(rec.substring, flags)
if p.match(text):
match = True
else:
if rec.is_casesensitive:
if rec.substring in text:
match = True
else:
if rec.substring.lower() in text.lower():
match = True
if match:
if foreground:
return rec.foreground or ""
else:
return rec.colour
return ""
@staticmethod
def invalidate_cache() -> None:
"""Invalidate dogpile cache"""
cache_region.delete("note_colours_all")
class Playdates(dbtables.PlaydatesTable):
def __init__(
self, session: Session, track_id: int, when: Optional[dt.datetime] = None
) -> None:
"""Record that track was played"""
if not when:
self.lastplayed = dt.datetime.now()
else:
self.lastplayed = when
self.track_id = track_id
session.add(self)
session.commit()
@staticmethod
def last_playdates(
session: Session, track_id: int, limit: int = 5
) -> Sequence["Playdates"]:
"""
Return a list of the last limit playdates for this track, sorted
latest to earliest.
"""
return session.scalars(
Playdates.select()
.where(Playdates.track_id == track_id)
.order_by(Playdates.lastplayed.desc())
.limit(limit)
).all()
@staticmethod
def last_played(session: Session, track_id: int) -> dt.datetime:
"""Return datetime track last played or None"""
last_played = session.execute(
select(Playdates.lastplayed)
.where(Playdates.track_id == track_id)
.order_by(Playdates.lastplayed.desc())
.limit(1)
).first()
if last_played:
return last_played[0]
else:
# Should never be reached as we create record with a
# last_played value
return Config.EPOCH # pragma: no cover
@staticmethod
def last_played_tracks(session: Session, limit: int = 5) -> Sequence["Playdates"]:
"""
Return a list of the last limit tracks played, sorted
earliest to latest.
"""
return session.scalars(
Playdates.select().order_by(Playdates.lastplayed.desc()).limit(limit)
).all()
@staticmethod
def played_after(session: Session, since: dt.datetime) -> Sequence["Playdates"]:
"""Return a list of Playdates objects since passed time"""
return session.scalars(
select(Playdates)
.where(Playdates.lastplayed >= since)
.order_by(Playdates.lastplayed)
).all()
class Playlists(dbtables.PlaylistsTable):
def __init__(self, session: Session, name: str, template_id: int) -> None:
"""Create playlist with passed name"""
self.name = name
self.last_used = dt.datetime.now()
session.add(self)
session.commit()
# If a template is specified, copy from it
if template_id:
PlaylistRows.copy_playlist(session, template_id, self.id)
@staticmethod
def clear_tabs(session: Session, playlist_ids: list[int]) -> None:
"""
Make all tab records NULL
"""
session.execute(
update(Playlists).where((Playlists.id.in_(playlist_ids))).values(tab=None)
)
def close(self, session: Session) -> None:
"""Mark playlist as unloaded"""
self.open = False
session.commit()
@classmethod
def get_all(cls, session: Session) -> Sequence["Playlists"]:
"""Returns a list of all playlists ordered by last use"""
return session.scalars(
select(cls)
.filter(cls.is_template.is_(False))
.order_by(cls.last_used.desc())
).all()
@classmethod
def get_all_templates(cls, session: Session) -> Sequence["Playlists"]:
"""Returns a list of all templates ordered by name"""
return session.scalars(
select(cls).where(cls.is_template.is_(True)).order_by(cls.name)
).all()
@classmethod
def get_favourite_templates(cls, session: Session) -> Sequence["Playlists"]:
"""Returns a list of favourite templates ordered by name"""
return session.scalars(
select(cls)
.where(cls.is_template.is_(True), cls.favourite.is_(True))
.order_by(cls.name)
).all()
@classmethod
def get_closed(cls, session: Session) -> Sequence["Playlists"]:
"""Returns a list of all closed playlists ordered by last use"""
return session.scalars(
select(cls)
.filter(
cls.open.is_(False),
cls.is_template.is_(False),
)
.order_by(cls.last_used.desc())
).all()
@classmethod
def get_open(cls, session: Session) -> Sequence[Optional["Playlists"]]:
"""
Return a list of loaded playlists ordered by tab.
"""
return session.scalars(
select(cls).where(cls.open.is_(True)).order_by(cls.tab)
).all()
def mark_open(self) -> None:
"""Mark playlist as loaded and used now"""
self.open = True
self.last_used = dt.datetime.now()
@staticmethod
def name_is_available(session: Session, name: str) -> bool:
"""
Return True if no playlist of this name exists else false.
"""
return (
session.execute(select(Playlists).where(Playlists.name == name)).first()
is None
)
def rename(self, session: Session, new_name: str) -> None:
"""
Rename playlist
"""
self.name = new_name
session.commit()
@staticmethod
def save_as_template(
session: Session, playlist_id: int, template_name: str
) -> None:
"""Save passed playlist as new template"""
template = Playlists(session, template_name, template_id=0)
if not template or not template.id:
return
template.is_template = True
session.commit()
PlaylistRows.copy_playlist(session, playlist_id, template.id)
class PlaylistRows(dbtables.PlaylistRowsTable):
def __init__(
self,
session: Session,
playlist_id: int,
row_number: int,
note: str = "",
track_id: Optional[int] = None,
) -> None:
"""Create PlaylistRows object"""
self.playlist_id = playlist_id
self.track_id = track_id
self.row_number = row_number
self.note = note
session.add(self)
session.commit()
def append_note(self, extra_note: str) -> None:
"""Append passed note to any existing note"""
current_note = self.note
if current_note:
self.note = current_note + "\n" + extra_note
else:
self.note = extra_note
@staticmethod
def copy_playlist(session: Session, src_id: int, dst_id: int) -> None:
"""Copy playlist entries"""
src_rows = session.scalars(
select(PlaylistRows).filter(PlaylistRows.playlist_id == src_id)
).all()
for plr in src_rows:
PlaylistRows(
session=session,
playlist_id=dst_id,
row_number=plr.row_number,
note=plr.note,
track_id=plr.track_id,
)
@classmethod
def deep_row(
cls, session: Session, playlist_id: int, row_number: int
) -> "PlaylistRows":
"""
Return a playlist row that includes full track and lastplayed data for
given playlist_id and row
"""
stmt = (
select(PlaylistRows)
.options(joinedload(cls.track))
.where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number == row_number,
)
# .options(joinedload(Tracks.playdates))
)
return session.execute(stmt).unique().scalar_one()
@staticmethod
def delete_higher_rows(session: Session, playlist_id: int, maxrow: int) -> None:
"""
Delete rows in given playlist that have a higher row number
than 'maxrow'
"""
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number > maxrow,
)
)
session.commit()
@staticmethod
def delete_row(session: Session, playlist_id: int, row_number: int) -> None:
"""
Delete passed row in given playlist.
"""
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number == row_number,
)
)
@staticmethod
def fixup_rownumbers(session: Session, playlist_id: int) -> None:
"""
Ensure the row numbers for passed playlist have no gaps
"""
plrs = session.scalars(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
).all()
for i, plr in enumerate(plrs):
plr.row_number = i
# Ensure new row numbers are available to the caller
session.commit()
@classmethod
def plrids_to_plrs(
cls, session: Session, playlist_id: int, plr_ids: list[int]
) -> Sequence["PlaylistRows"]:
"""
Take a list of PlaylistRows ids and return a list of corresponding
PlaylistRows objects
"""
plrs = session.scalars(
select(cls)
.where(cls.playlist_id == playlist_id, cls.id.in_(plr_ids))
.order_by(cls.row_number)
).all()
return plrs
@staticmethod
def get_last_used_row(session: Session, playlist_id: int) -> Optional[int]:
"""Return the last used row for playlist, or None if no rows"""
return session.execute(
select(func.max(PlaylistRows.row_number)).where(
PlaylistRows.playlist_id == playlist_id
)
).scalar_one()
@staticmethod
def get_track_plr(
session: Session, track_id: int, playlist_id: int
) -> Optional["PlaylistRows"]:
"""Return first matching PlaylistRows object or None"""
return session.scalars(
select(PlaylistRows)
.where(
PlaylistRows.track_id == track_id,
PlaylistRows.playlist_id == playlist_id,
)
.limit(1)
).first()
@classmethod
def get_played_rows(
cls, session: Session, playlist_id: int
) -> Sequence["PlaylistRows"]:
"""
For passed playlist, return a list of rows that
have been played.
"""
plrs = session.scalars(
select(cls)
.where(cls.playlist_id == playlist_id, cls.played.is_(True))
.order_by(cls.row_number)
).all()
return plrs
@classmethod
def get_playlist_rows(
cls, session: Session, playlist_id: int
) -> Sequence["PlaylistRows"]:
"""
For passed playlist, return a list of rows.
"""
stmt = (
select(cls)
.where(cls.playlist_id == playlist_id)
.options(selectinload(cls.track))
.order_by(cls.row_number)
)
plrs = session.execute(stmt).scalars().all()
return plrs
@classmethod
def get_rows_with_tracks(
cls,
session: Session,
playlist_id: int,
) -> Sequence["PlaylistRows"]:
"""
For passed playlist, return a list of rows that
contain tracks
"""
query = select(cls).where(
cls.playlist_id == playlist_id, cls.track_id.is_not(None)
)
plrs = session.scalars((query).order_by(cls.row_number)).all()
return plrs
@classmethod
def get_unplayed_rows(
cls, session: Session, playlist_id: int
) -> Sequence["PlaylistRows"]:
"""
For passed playlist, return a list of playlist rows that
have not been played.
"""
plrs = session.scalars(
select(cls)
.where(
cls.playlist_id == playlist_id,
cls.track_id.is_not(None),
cls.played.is_(False),
)
.order_by(cls.row_number)
).all()
return plrs
@classmethod
def insert_row(
cls,
session: Session,
playlist_id: int,
new_row_number: int,
note: str = "",
track_id: Optional[int] = None,
) -> "PlaylistRows":
cls.move_rows_down(session, playlist_id, new_row_number, 1)
return cls(
session,
playlist_id=playlist_id,
row_number=new_row_number,
note=note,
track_id=track_id,
)
@staticmethod
def move_rows_down(
session: Session, playlist_id: int, starting_row: int, move_by: int
) -> None:
"""
Create space to insert move_by additional rows by incremented row
number from starting_row to end of playlist
"""
log.debug(f"(move_rows_down({playlist_id=}, {starting_row=}, {move_by=}")
session.execute(
update(PlaylistRows)
.where(
(PlaylistRows.playlist_id == playlist_id),
(PlaylistRows.row_number >= starting_row),
)
.values(row_number=PlaylistRows.row_number + move_by)
)
@staticmethod
def update_plr_row_numbers(
session: Session,
playlist_id: int,
sqla_map: list[dict[str, int]],
) -> None:
"""
Take a {plrid: row_number} dictionary and update the row numbers accordingly
"""
# Update database. Ref:
# https://docs.sqlalchemy.org/en/20/tutorial/data_update.html#the-update-sql-expression-construct
stmt = (
update(PlaylistRows)
.where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.id == bindparam("playlistrow_id"),
)
.values(row_number=bindparam("row_number"))
)
session.connection().execute(stmt, sqla_map)
class Queries(dbtables.QueriesTable):
def __init__(
self,
session: Session,
name: str,
filter: dbtables.Filter,
favourite: bool = False,
) -> None:
"""Create new query"""
self.name = name
self.filter = filter
self.favourite = favourite
session.add(self)
session.commit()
@classmethod
def get_all(cls, session: Session) -> Sequence["Queries"]:
"""Returns a list of all queries ordered by name"""
return session.scalars(select(cls).order_by(cls.name)).all()
@classmethod
def get_favourites(cls, session: Session) -> Sequence["Queries"]:
"""Returns a list of favourite queries ordered by name"""
return session.scalars(
select(cls).where(cls.favourite.is_(True)).order_by(cls.name)
).all()
class Settings(dbtables.SettingsTable):
def __init__(self, session: Session, name: str) -> None:
self.name = name
session.add(self)
session.commit()
@classmethod
def get_setting(cls, session: Session, name: str) -> "Settings":
"""Get existing setting or return new setting record"""
try:
return session.execute(select(cls).where(cls.name == name)).scalar_one()
except NoResultFound:
return Settings(session, name)
class Tracks(dbtables.TracksTable):
def __init__(
self,
session: Session,
path: str,
title: str,
artist: str,
duration: int,
start_gap: int,
fade_at: int,
silence_at: int,
bitrate: int,
) -> None:
self.path = path
self.title = title
self.artist = artist
self.bitrate = bitrate
self.duration = duration
self.start_gap = start_gap
self.fade_at = fade_at
self.silence_at = silence_at
try:
session.add(self)
session.commit()
except IntegrityError as error:
session.rollback()
log.error(f"Error ({error=}) importing track ({path=})")
raise ValueError(error)
@classmethod
def get_all(cls, session: Session) -> Sequence["Tracks"]:
"""Return a list of all tracks"""
return session.scalars(select(cls)).unique().all()
@classmethod
def all_tracks_indexed_by_id(cls, session: Session) -> dict[int, Tracks]:
"""
Return a dictionary of all tracks, keyed by title
"""
result: dict[int, Tracks] = {}
for track in cls.get_all(session):
result[track.id] = track
return result
@classmethod
def exact_title_and_artist(
cls, session: Session, title: str, artist: str
) -> Sequence["Tracks"]:
"""
Search for exact but case-insensitive match of title and artist
"""
return (
session.scalars(
select(cls)
.where(cls.title.ilike(title), cls.artist.ilike(artist))
.order_by(cls.title)
)
.unique()
.all()
)
@classmethod
def get_filtered_tracks(
cls, session: Session, filter: Filter
) -> Sequence["Tracks"]:
"""
Return tracks matching filter
"""
query = select(cls)
# Path specification
if filter.path:
if filter.path_type == "contains":
query = query.where(cls.path.ilike(f"%{filter.path}%"))
elif filter.path_type == "excluding":
query = query.where(cls.path.notilike(f"%{filter.path}%"))
else:
raise ApplicationError(f"Can't process filter path ({filter=})")
# Duration specification
seconds_duration = filter.duration_number
if filter.duration_unit == Config.FILTER_DURATION_MINUTES:
seconds_duration *= 60
elif filter.duration_unit != Config.FILTER_DURATION_SECONDS:
raise ApplicationError(f"Can't process filter duration ({filter=})")
if filter.duration_type == Config.FILTER_DURATION_LONGER:
query = query.where(cls.duration >= seconds_duration)
elif filter.duration_unit == Config.FILTER_DURATION_SHORTER:
query = query.where(cls.duration <= seconds_duration)
else:
raise ApplicationError(f"Can't process filter duration type ({filter=})")
# Process comparator
if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER:
# Select tracks that have never been played
query = query.outerjoin(Playdates, cls.id == Playdates.track_id).where(
Playdates.id.is_(None)
)
else:
# Last played specification
now = dt.datetime.now()
# Set sensible default, and correct for Config.FILTER_PLAYED_COMPARATOR_ANYTIME
before = now
# If not ANYTIME, set 'before' appropriates
if filter.last_played_comparator != Config.FILTER_PLAYED_COMPARATOR_ANYTIME:
if filter.last_played_unit == Config.FILTER_PLAYED_DAYS:
before = now - dt.timedelta(days=filter.last_played_number)
elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS:
before = now - dt.timedelta(days=7 * filter.last_played_number)
elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS:
before = now - dt.timedelta(days=30 * filter.last_played_number)
elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS:
before = now - dt.timedelta(days=365 * filter.last_played_number)
subquery = (
select(
Playdates.track_id,
func.max(Playdates.lastplayed).label("max_last_played"),
)
.group_by(Playdates.track_id)
.subquery()
)
query = query.join(subquery, Tracks.id == subquery.c.track_id).where(
subquery.c.max_last_played < before
)
records = session.scalars(query).unique().all()
return records
@classmethod
def get_by_path(cls, session: Session, path: str) -> Optional["Tracks"]:
"""
Return track with passed path, or None.
"""
try:
return (
session.execute(select(Tracks).where(Tracks.path == path))
.unique()
.scalar_one()
)
except NoResultFound:
return None
@classmethod
def search_artists(cls, session: Session, text: str) -> Sequence["Tracks"]:
"""
Search case-insenstively for artists containing str
The query performs an outer join with 'joinedload' to populate the results
from the Playdates table at the same time. unique() needed; see
https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html#joined-eager-loading
"""
return (
session.scalars(
select(cls)
.options(joinedload(Tracks.playdates))
.where(cls.artist.ilike(f"%{text}%"))
.order_by(cls.title)
)
.unique()
.all()
)
@classmethod
def search_titles(cls, session: Session, text: str) -> Sequence["Tracks"]:
"""
Search case-insenstively for titles containing str
The query performs an outer join with 'joinedload' to populate the results
from the Playdates table at the same time. unique() needed; see
https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html#joined-eager-loading
"""
return (
session.scalars(
select(cls)
.options(joinedload(Tracks.playdates))
.where(cls.title.like(f"{text}%"))
.order_by(cls.title)
)
.unique()
.all()
)