Remove app/models.py

dbtables.py now contains table definitions, __repr__ and __init__
This commit is contained in:
Keith Edmunds 2025-04-21 22:30:16 +01:00
parent c58eb47cc1
commit 6d012d7b5a
7 changed files with 159 additions and 914 deletions

View File

@ -5,7 +5,7 @@
# there are two components separated by a colon: # there are two components separated by a colon:
# the left part is the import path to the module containing the database instance # the left part is the import path to the module containing the database instance
# the right part is the name of the database instance, typically 'db' # the right part is the name of the database instance, typically 'db'
alchemical_db = models:db alchemical_db = ds:db
# path to migration scripts # path to migration scripts
script_location = migrations script_location = migrations

View File

@ -1,6 +1,4 @@
# Standard library imports # Standard library imports
import os
import sys
# PyQt imports # PyQt imports
@ -8,7 +6,6 @@ import sys
from alchemical import Alchemical # type:ignore from alchemical import Alchemical # type:ignore
# App imports # App imports
from config import Config
class DatabaseManager: class DatabaseManager:
@ -34,10 +31,3 @@ class DatabaseManager:
return DatabaseManager.__instance return DatabaseManager.__instance
# Establish database connection
DATABASE_URL = os.environ.get("DATABASE_URL")
if DATABASE_URL is None:
raise ValueError("DATABASE_URL is undefined")
if "unittest" in sys.modules and "sqlite" not in DATABASE_URL:
raise ValueError("Unit tests running on non-Sqlite database")
db = DatabaseManager.get_instance(DATABASE_URL, engine_options=Config.ENGINE_OPTIONS).db

View File

@ -15,13 +15,13 @@ from sqlalchemy import (
String, String,
) )
from sqlalchemy.ext.associationproxy import association_proxy from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.orm import ( from sqlalchemy.orm import (
Mapped, Mapped,
mapped_column, mapped_column,
relationship, relationship,
) )
from sqlalchemy.orm.session import Session
from sqlalchemy.types import TypeDecorator, TEXT from sqlalchemy.types import TypeDecorator, TEXT
# App imports # App imports
@ -49,7 +49,7 @@ class JSONEncodedDict(TypeDecorator):
# Database classes # Database classes
class NoteColoursTable(Model): class NoteColours(Model):
__tablename__ = "notecolours" __tablename__ = "notecolours"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
@ -68,18 +68,52 @@ class NoteColoursTable(Model):
f"colour={self.colour}>" f"colour={self.colour}>"
) )
def __init__(
self,
session: Session,
substring: str,
colour: str,
enabled: bool = True,
is_regex: bool = False,
is_casesensitive: bool = False,
order: Optional[int] = 0,
) -> None:
self.substring = substring
self.colour = colour
self.enabled = enabled
self.is_regex = is_regex
self.is_casesensitive = is_casesensitive
self.order = order
class PlaydatesTable(Model): session.add(self)
session.commit()
class Playdates(Model):
__tablename__ = "playdates" __tablename__ = "playdates"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
lastplayed: Mapped[dt.datetime] = mapped_column(index=True) lastplayed: Mapped[dt.datetime] = mapped_column(index=True)
track_id: Mapped[int] = mapped_column(ForeignKey("tracks.id", ondelete="CASCADE")) track_id: Mapped[int] = mapped_column(ForeignKey("tracks.id", ondelete="CASCADE"))
track: Mapped["TracksTable"] = relationship( track: Mapped["Tracks"] = relationship(
"TracksTable", "Tracks",
back_populates="playdates", back_populates="playdates",
) )
def __init__(
self, session: Session, track_id: int, when: dt.datetime | None = None
) -> None:
"""Record that track was played"""
if not when:
self.lastplayed = dt.datetime.now()
else:
self.lastplayed = when
self.track_id = track_id
session.add(self)
session.commit()
def __repr__(self) -> str: def __repr__(self) -> str:
return ( return (
f"<Playdates(id={self.id}, track_id={self.track_id} " f"<Playdates(id={self.id}, track_id={self.track_id} "
@ -87,7 +121,7 @@ class PlaydatesTable(Model):
) )
class PlaylistsTable(Model): class Playlists(Model):
""" """
Manage playlists Manage playlists
""" """
@ -100,11 +134,11 @@ class PlaylistsTable(Model):
tab: Mapped[Optional[int]] = mapped_column(default=None) tab: Mapped[Optional[int]] = mapped_column(default=None)
open: Mapped[bool] = mapped_column(default=False) open: Mapped[bool] = mapped_column(default=False)
is_template: Mapped[bool] = mapped_column(default=False) is_template: Mapped[bool] = mapped_column(default=False)
rows: Mapped[list["PlaylistRowsTable"]] = relationship( rows: Mapped[list["PlaylistRows"]] = relationship(
"PlaylistRowsTable", "PlaylistRows",
back_populates="playlist", back_populates="playlist",
cascade="all, delete-orphan", cascade="all, delete-orphan",
order_by="PlaylistRowsTable.row_number", order_by="PlaylistRows.row_number",
) )
favourite: Mapped[bool] = mapped_column( favourite: Mapped[bool] = mapped_column(
Boolean, nullable=False, index=False, default=False Boolean, nullable=False, index=False, default=False
@ -116,8 +150,21 @@ class PlaylistsTable(Model):
f"is_templatee={self.is_template}, open={self.open}>" f"is_templatee={self.is_template}, open={self.open}>"
) )
def __init__(self, session: Session, name: str, template_id: int) -> None:
"""Create playlist with passed name"""
class PlaylistRowsTable(Model): self.name = name
self.last_used = dt.datetime.now()
session.add(self)
session.commit()
# If a template is specified, copy from it
if template_id:
PlaylistRows.copy_playlist(session, template_id, self.id)
class PlaylistRows(Model):
__tablename__ = "playlist_rows" __tablename__ = "playlist_rows"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
@ -129,12 +176,12 @@ class PlaylistRowsTable(Model):
ForeignKey("playlists.id", ondelete="CASCADE"), index=True ForeignKey("playlists.id", ondelete="CASCADE"), index=True
) )
playlist: Mapped[PlaylistsTable] = relationship(back_populates="rows") playlist: Mapped[Playlists] = relationship(back_populates="rows")
track_id: Mapped[Optional[int]] = mapped_column( track_id: Mapped[Optional[int]] = mapped_column(
ForeignKey("tracks.id", ondelete="CASCADE") ForeignKey("tracks.id", ondelete="CASCADE")
) )
track: Mapped["TracksTable"] = relationship( track: Mapped["Tracks"] = relationship(
"TracksTable", "Tracks",
back_populates="playlistrows", back_populates="playlistrows",
) )
played: Mapped[bool] = mapped_column( played: Mapped[bool] = mapped_column(
@ -148,8 +195,26 @@ class PlaylistRowsTable(Model):
f"note={self.note}, row_number={self.row_number}>" f"note={self.note}, row_number={self.row_number}>"
) )
def __init__(
self,
session: Session,
playlist_id: int,
row_number: int,
note: str = "",
track_id: Optional[int] = None,
) -> None:
"""Create PlaylistRows object"""
class QueriesTable(Model): self.playlist_id = playlist_id
self.track_id = track_id
self.row_number = row_number
self.note = note
session.add(self)
session.commit()
class Queries(Model):
__tablename__ = "queries" __tablename__ = "queries"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
@ -171,10 +236,26 @@ class QueriesTable(Model):
filter = property(_get_filter, _set_filter) filter = property(_get_filter, _set_filter)
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<QueriesTable(id={self.id}, name={self.name}, filter={self.filter})>" return f"<Queries(id={self.id}, name={self.name}, filter={self.filter})>"
def __init__(
self,
session: Session,
name: str,
filter: Filter,
favourite: bool = False,
) -> None:
"""Create new query"""
self.name = name
self.filter = filter
self.favourite = favourite
session.add(self)
session.commit()
class SettingsTable(Model): class Settings(Model):
"""Manage settings""" """Manage settings"""
__tablename__ = "settings" __tablename__ = "settings"
@ -191,8 +272,14 @@ class SettingsTable(Model):
f"f_datetime={self.f_datetime}, f_int={self.f_int}, f_string={self.f_string}>" f"f_datetime={self.f_datetime}, f_int={self.f_int}, f_string={self.f_string}>"
) )
def __init__(self, session: Session, name: str) -> None:
self.name = name
class TracksTable(Model): session.add(self)
session.commit()
class Tracks(Model):
__tablename__ = "tracks" __tablename__ = "tracks"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
@ -206,14 +293,14 @@ class TracksTable(Model):
start_gap: Mapped[int] = mapped_column(index=False) start_gap: Mapped[int] = mapped_column(index=False)
title: Mapped[str] = mapped_column(String(256), index=True) title: Mapped[str] = mapped_column(String(256), index=True)
playlistrows: Mapped[list[PlaylistRowsTable]] = relationship( playlistrows: Mapped[list[PlaylistRows]] = relationship(
"PlaylistRowsTable", "PlaylistRows",
back_populates="track", back_populates="track",
cascade="all, delete-orphan", cascade="all, delete-orphan",
) )
playlists = association_proxy("playlistrows", "playlist") playlists = association_proxy("playlistrows", "playlist")
playdates: Mapped[list[PlaydatesTable]] = relationship( playdates: Mapped[list[Playdates]] = relationship(
"PlaydatesTable", "Playdates",
back_populates="track", back_populates="track",
cascade="all, delete-orphan", cascade="all, delete-orphan",
lazy="joined", lazy="joined",
@ -224,3 +311,27 @@ class TracksTable(Model):
f"<Track(id={self.id}, title={self.title}, " f"<Track(id={self.id}, title={self.title}, "
f"artist={self.artist}, path={self.path}>" f"artist={self.artist}, path={self.path}>"
) )
def __init__(
self,
session: Session,
path: str,
title: str,
artist: str,
duration: int,
start_gap: int,
fade_at: int,
silence_at: int,
bitrate: int,
) -> None:
self.path = path
self.title = title
self.artist = artist
self.bitrate = bitrate
self.duration = duration
self.start_gap = start_gap
self.fade_at = fade_at
self.silence_at = silence_at
session.add(self)
session.commit()

View File

@ -1,6 +1,8 @@
# Standard library imports # Standard library imports
import datetime as dt import datetime as dt
import os
import re import re
import sys
# PyQt imports # PyQt imports
@ -29,9 +31,8 @@ from classes import (
TrackDTO, TrackDTO,
) )
from config import Config from config import Config
from dbmanager import db
from log import log, log_call from log import log, log_call
from models import ( from dbtables import (
NoteColours, NoteColours,
Playdates, Playdates,
PlaylistRows, PlaylistRows,
@ -40,7 +41,15 @@ from models import (
Settings, Settings,
Tracks, Tracks,
) )
from dbmanager import DatabaseManager
# Establish database connection
DATABASE_URL = os.environ.get("DATABASE_URL")
if DATABASE_URL is None:
raise ValueError("DATABASE_URL is undefined")
if "unittest" in sys.modules and "sqlite" not in DATABASE_URL:
raise ValueError("Unit tests running on non-Sqlite database")
db = DatabaseManager.get_instance(DATABASE_URL, engine_options=Config.ENGINE_OPTIONS).db
# Configure the dogpile cache region # Configure the dogpile cache region
cache_region = make_region().configure( cache_region = make_region().configure(

View File

@ -23,7 +23,6 @@ from tinytag import TinyTag, TinyTagException # type: ignore
from classes import AudioMetadata, ApplicationError, Tags, TrackDTO from classes import AudioMetadata, ApplicationError, Tags, TrackDTO
from config import Config from config import Config
from log import log from log import log
from models import Tracks
start_time_re = re.compile(r"@\d\d:\d\d") start_time_re = re.compile(r"@\d\d:\d\d")
@ -385,18 +384,6 @@ def send_mail(to_addr: str, from_addr: str, subj: str, body: str) -> None:
s.quit() s.quit()
def set_track_metadata(track: Tracks) -> None:
"""Set/update track metadata in database"""
audio_metadata = get_audio_metadata(track.path)
tags = get_tags(track.path)
for audio_key in AudioMetadata._fields:
setattr(track, audio_key, getattr(audio_metadata, audio_key))
for tag_key in Tags._fields:
setattr(track, tag_key, getattr(tags, tag_key))
def show_OK(title: str, msg: str, parent: Optional[QWidget] = None) -> None: def show_OK(title: str, msg: str, parent: Optional[QWidget] = None) -> None:
"""Display a message to user""" """Display a message to user"""

View File

@ -1,853 +0,0 @@
# Standard library imports
from __future__ import annotations
from typing import Optional, Sequence
import datetime as dt
import os
import re
import sys
# PyQt imports
# Third party imports
from dogpile.cache import make_region
from dogpile.cache.api import NO_VALUE
from sqlalchemy import (
bindparam,
delete,
func,
select,
text,
update,
)
from sqlalchemy.exc import IntegrityError, ProgrammingError
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import joinedload, selectinload
from sqlalchemy.orm.session import Session
from sqlalchemy.engine.row import RowMapping
# App imports
from classes import ApplicationError, Filter
from config import Config
from dbmanager import DatabaseManager
import dbtables
from log import log
# Configure the cache region
cache_region = make_region().configure(
'dogpile.cache.memory', # Use in-memory caching for now (switch to Redis if needed)
expiration_time=600 # Cache expires after 10 minutes
)
def run_sql(session: Session, sql: str) -> Sequence[RowMapping]:
"""
Run a sql string and return results
"""
try:
return session.execute(text(sql)).mappings().all()
except ProgrammingError as e:
raise ApplicationError(e)
# Database classes
class NoteColours(dbtables.NoteColoursTable):
def __init__(
self,
session: Session,
substring: str,
colour: str,
enabled: bool = True,
is_regex: bool = False,
is_casesensitive: bool = False,
order: Optional[int] = 0,
) -> None:
self.substring = substring
self.colour = colour
self.enabled = enabled
self.is_regex = is_regex
self.is_casesensitive = is_casesensitive
self.order = order
session.add(self)
session.commit()
@classmethod
def get_all(cls, session: Session) -> Sequence["NoteColours"]:
"""
Return all records
"""
cache_key = "note_colours_all"
cached_result = cache_region.get(cache_key)
if cached_result is not NO_VALUE:
return cached_result
# Query the database
result = session.scalars(
select(cls)
.where(
cls.enabled.is_(True),
)
.order_by(cls.order)
).all()
cache_region.set(cache_key, result)
return result
@staticmethod
def get_colour(
session: Session, text: str, foreground: bool = False
) -> str:
"""
Parse text and return background (foreground if foreground==True) colour
string if matched, else None
"""
if not text:
return ""
match = False
for rec in NoteColours.get_all(session):
if rec.is_regex:
flags = re.UNICODE
if not rec.is_casesensitive:
flags |= re.IGNORECASE
p = re.compile(rec.substring, flags)
if p.match(text):
match = True
else:
if rec.is_casesensitive:
if rec.substring in text:
match = True
else:
if rec.substring.lower() in text.lower():
match = True
if match:
if foreground:
return rec.foreground or ""
else:
return rec.colour
return ""
@staticmethod
def invalidate_cache() -> None:
"""Invalidate dogpile cache"""
cache_region.delete("note_colours_all")
class Playdates(dbtables.PlaydatesTable):
def __init__(
self, session: Session, track_id: int, when: dt.datetime | None = None
) -> None:
"""Record that track was played"""
if not when:
self.lastplayed = dt.datetime.now()
else:
self.lastplayed = when
self.track_id = track_id
session.add(self)
session.commit()
@staticmethod
def last_playdates(
session: Session, track_id: int, limit: int = 5
) -> Sequence["Playdates"]:
"""
Return a list of the last limit playdates for this track, sorted
latest to earliest.
"""
return session.scalars(
Playdates.select()
.where(Playdates.track_id == track_id)
.order_by(Playdates.lastplayed.desc())
.limit(limit)
).all()
@staticmethod
def last_played(session: Session, track_id: int) -> dt.datetime:
"""Return datetime track last played or None"""
last_played = session.execute(
select(Playdates.lastplayed)
.where(Playdates.track_id == track_id)
.order_by(Playdates.lastplayed.desc())
.limit(1)
).first()
if last_played:
return last_played[0]
else:
# Should never be reached as we create record with a
# last_played value
return Config.EPOCH # pragma: no cover
@staticmethod
def last_played_tracks(session: Session, limit: int = 5) -> Sequence["Playdates"]:
"""
Return a list of the last limit tracks played, sorted
earliest to latest.
"""
return session.scalars(
Playdates.select().order_by(Playdates.lastplayed.desc()).limit(limit)
).all()
@staticmethod
def played_after(session: Session, since: dt.datetime) -> Sequence["Playdates"]:
"""Return a list of Playdates objects since passed time"""
return session.scalars(
select(Playdates)
.where(Playdates.lastplayed >= since)
.order_by(Playdates.lastplayed)
).all()
class Playlists(dbtables.PlaylistsTable):
def __init__(self, session: Session, name: str, template_id: int) -> None:
"""Create playlist with passed name"""
self.name = name
self.last_used = dt.datetime.now()
session.add(self)
session.commit()
# If a template is specified, copy from it
if template_id:
PlaylistRows.copy_playlist(session, template_id, self.id)
@staticmethod
def clear_tabs(session: Session, playlist_ids: list[int]) -> None:
"""
Make all tab records NULL
"""
session.execute(
update(Playlists)
.where(Playlists.id.in_(playlist_ids))
.values(tab=None)
)
def close(self, session: Session) -> None:
"""Mark playlist as unloaded"""
self.open = False
session.commit()
@classmethod
def get_all(cls, session: Session) -> Sequence["Playlists"]:
"""Returns a list of all playlists ordered by last use"""
return session.scalars(
select(cls)
.filter(cls.is_template.is_(False))
.order_by(cls.last_used.desc())
).all()
@classmethod
def get_all_templates(cls, session: Session) -> Sequence["Playlists"]:
"""Returns a list of all templates ordered by name"""
return session.scalars(
select(cls).where(cls.is_template.is_(True)).order_by(cls.name)
).all()
@classmethod
def get_favourite_templates(cls, session: Session) -> Sequence["Playlists"]:
"""Returns a list of favourite templates ordered by name"""
return session.scalars(
select(cls)
.where(cls.is_template.is_(True), cls.favourite.is_(True))
.order_by(cls.name)
).all()
@classmethod
def get_closed(cls, session: Session) -> Sequence["Playlists"]:
"""Returns a list of all closed playlists ordered by last use"""
return session.scalars(
select(cls)
.filter(
cls.open.is_(False),
cls.is_template.is_(False),
)
.order_by(cls.last_used.desc())
).all()
@classmethod
def get_open(cls, session: Session) -> Sequence[Optional["Playlists"]]:
"""
Return a list of loaded playlists ordered by tab.
"""
return session.scalars(
select(cls).where(cls.open.is_(True)).order_by(cls.tab)
).all()
def mark_open(self) -> None:
"""Mark playlist as loaded and used now"""
self.open = True
self.last_used = dt.datetime.now()
@staticmethod
def name_is_available(session: Session, name: str) -> bool:
"""
Return True if no playlist of this name exists else false.
"""
return (
session.execute(select(Playlists).where(Playlists.name == name)).first()
is None
)
def rename(self, session: Session, new_name: str) -> None:
"""
Rename playlist
"""
self.name = new_name
session.commit()
@staticmethod
def save_as_template(
session: Session, playlist_id: int, template_name: str
) -> None:
"""Save passed playlist as new template"""
template = Playlists(session, template_name, template_id=0)
if not template or not template.id:
return
template.is_template = True
session.commit()
PlaylistRows.copy_playlist(session, playlist_id, template.id)
class PlaylistRows(dbtables.PlaylistRowsTable):
def __init__(
self,
session: Session,
playlist_id: int,
row_number: int,
note: str = "",
track_id: Optional[int] = None,
) -> None:
"""Create PlaylistRows object"""
self.playlist_id = playlist_id
self.track_id = track_id
self.row_number = row_number
self.note = note
session.add(self)
session.commit()
def append_note(self, extra_note: str) -> None:
"""Append passed note to any existing note"""
current_note = self.note
if current_note:
self.note = current_note + "\n" + extra_note
else:
self.note = extra_note
@staticmethod
def copy_playlist(session: Session, src_id: int, dst_id: int) -> None:
"""Copy playlist entries"""
src_rows = session.scalars(
select(PlaylistRows).filter(PlaylistRows.playlist_id == src_id)
).all()
for plr in src_rows:
PlaylistRows(
session=session,
playlist_id=dst_id,
row_number=plr.row_number,
note=plr.note,
track_id=plr.track_id,
)
@classmethod
def deep_row(
cls, session: Session, playlist_id: int, row_number: int
) -> "PlaylistRows":
"""
Return a playlist row that includes full track and lastplayed data for
given playlist_id and row
"""
# TODO: use selectinload?
stmt = (
select(PlaylistRows)
.options(joinedload(cls.track))
.where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number == row_number,
)
# .options(joinedload(Tracks.playdates))
)
return session.execute(stmt).unique().scalar_one()
@staticmethod
def delete_higher_rows(session: Session, playlist_id: int, maxrow: int) -> None:
"""
Delete rows in given playlist that have a higher row number
than 'maxrow'
"""
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number > maxrow,
)
)
session.commit()
@staticmethod
def delete_row(session: Session, playlist_id: int, row_number: int) -> None:
"""
Delete passed row in given playlist.
"""
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number == row_number,
)
)
@classmethod
def plrids_to_plrs(
cls, session: Session, playlist_id: int, plr_ids: list[int]
) -> Sequence["PlaylistRows"]:
"""
Take a list of PlaylistRows ids and return a list of corresponding
PlaylistRows objects
"""
plrs = session.scalars(
select(cls)
.where(cls.playlist_id == playlist_id, cls.id.in_(plr_ids))
.order_by(cls.row_number)
).all()
return plrs
@staticmethod
def get_last_used_row(session: Session, playlist_id: int) -> Optional[int]:
"""Return the last used row for playlist, or None if no rows"""
return session.execute(
select(func.max(PlaylistRows.row_number)).where(
PlaylistRows.playlist_id == playlist_id
)
).scalar_one()
@staticmethod
def get_track_plr(
session: Session, track_id: int, playlist_id: int
) -> Optional["PlaylistRows"]:
"""Return first matching PlaylistRows object or None"""
return session.scalars(
select(PlaylistRows)
.where(
PlaylistRows.track_id == track_id,
PlaylistRows.playlist_id == playlist_id,
)
.limit(1)
).first()
@classmethod
def get_played_rows(
cls, session: Session, playlist_id: int
) -> Sequence["PlaylistRows"]:
"""
For passed playlist, return a list of rows that
have been played.
"""
plrs = session.scalars(
select(cls)
.where(cls.playlist_id == playlist_id, cls.played.is_(True))
.order_by(cls.row_number)
).all()
return plrs
@classmethod
def get_playlist_rows(
cls, session: Session, playlist_id: int
) -> Sequence["PlaylistRows"]:
"""
For passed playlist, return a list of rows.
"""
stmt = (
select(cls)
.where(cls.playlist_id == playlist_id)
.options(selectinload(cls.track))
.order_by(cls.row_number)
)
plrs = session.execute(stmt).scalars().all()
return plrs
@classmethod
def get_rows_with_tracks(
cls,
session: Session,
playlist_id: int,
) -> Sequence["PlaylistRows"]:
"""
For passed playlist, return a list of rows that
contain tracks
"""
query = select(cls).where(
cls.playlist_id == playlist_id, cls.track_id.is_not(None)
)
plrs = session.scalars((query).order_by(cls.row_number)).all()
return plrs
@classmethod
def get_unplayed_rows(
cls, session: Session, playlist_id: int
) -> Sequence["PlaylistRows"]:
"""
For passed playlist, return a list of playlist rows that
have not been played.
"""
plrs = session.scalars(
select(cls)
.where(
cls.playlist_id == playlist_id,
cls.track_id.is_not(None),
cls.played.is_(False),
)
.order_by(cls.row_number)
).all()
return plrs
@classmethod
def insert_row(
cls,
session: Session,
playlist_id: int,
new_row_number: int,
note: str = "",
track_id: Optional[int] = None,
) -> "PlaylistRows":
cls.move_rows_down(session, playlist_id, new_row_number, 1)
return cls(
session,
playlist_id=playlist_id,
row_number=new_row_number,
note=note,
track_id=track_id,
)
@staticmethod
def move_rows_down(
session: Session, playlist_id: int, starting_row: int, move_by: int
) -> None:
"""
Create space to insert move_by additional rows by incremented row
number from starting_row to end of playlist
"""
log.debug(f"(move_rows_down({playlist_id=}, {starting_row=}, {move_by=}")
session.execute(
update(PlaylistRows)
.where(
(PlaylistRows.playlist_id == playlist_id),
(PlaylistRows.row_number >= starting_row),
)
.values(row_number=PlaylistRows.row_number + move_by)
)
@staticmethod
def update_plr_row_numbers(
session: Session,
playlist_id: int,
sqla_map: list[dict[str, int]],
) -> None:
"""
Take a {plrid: row_number} dictionary and update the row numbers accordingly
"""
# Update database. Ref:
# https://docs.sqlalchemy.org/en/20/tutorial/data_update.html#the-update-sql-expression-construct
stmt = (
update(PlaylistRows)
.where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.id == bindparam("playlistrow_id"),
)
.values(row_number=bindparam("row_number"))
)
session.connection().execute(stmt, sqla_map)
class Queries(dbtables.QueriesTable):
def __init__(
self,
session: Session,
name: str,
filter: dbtables.Filter,
favourite: bool = False,
) -> None:
"""Create new query"""
self.name = name
self.filter = filter
self.favourite = favourite
session.add(self)
session.commit()
@classmethod
def get_all(cls, session: Session) -> Sequence["Queries"]:
"""Returns a list of all queries ordered by name"""
return session.scalars(select(cls).order_by(cls.name)).all()
@classmethod
def get_favourites(cls, session: Session) -> Sequence["Queries"]:
"""Returns a list of favourite queries ordered by name"""
return session.scalars(
select(cls).where(cls.favourite.is_(True)).order_by(cls.name)
).all()
class Settings(dbtables.SettingsTable):
def __init__(self, session: Session, name: str) -> None:
self.name = name
session.add(self)
session.commit()
@classmethod
def get_setting(cls, session: Session, name: str) -> "Settings":
"""Get existing setting or return new setting record"""
try:
return session.execute(select(cls).where(cls.name == name)).scalar_one()
except NoResultFound:
return Settings(session, name)
class Tracks(dbtables.TracksTable):
def __init__(
self,
session: Session,
path: str,
title: str,
artist: str,
duration: int,
start_gap: int,
fade_at: int,
silence_at: int,
bitrate: int,
) -> None:
self.path = path
self.title = title
self.artist = artist
self.bitrate = bitrate
self.duration = duration
self.start_gap = start_gap
self.fade_at = fade_at
self.silence_at = silence_at
try:
session.add(self)
session.commit()
except IntegrityError as error:
session.rollback()
log.error(f"Error ({error=}) importing track ({path=})")
raise ValueError(error)
@classmethod
def get_all(cls, session: Session) -> Sequence["Tracks"]:
"""Return a list of all tracks"""
return session.scalars(select(cls)).unique().all()
@classmethod
def all_tracks_indexed_by_id(cls, session: Session) -> dict[int, Tracks]:
"""
Return a dictionary of all tracks, keyed by title
"""
result: dict[int, Tracks] = {}
for track in cls.get_all(session):
result[track.id] = track
return result
@classmethod
def exact_title_and_artist(
cls, session: Session, title: str, artist: str
) -> Sequence["Tracks"]:
"""
Search for exact but case-insensitive match of title and artist
"""
return (
session.scalars(
select(cls)
.where(cls.title.ilike(title), cls.artist.ilike(artist))
.order_by(cls.title)
)
.unique()
.all()
)
@classmethod
def get_filtered_tracks(
cls, session: Session, filter: Filter
) -> Sequence["Tracks"]:
"""
Return tracks matching filter
"""
# Now implemented in repostory.py
return []
query = select(cls)
# Path specification
if filter.path:
if filter.path_type == "contains":
query = query.where(cls.path.ilike(f"%{filter.path}%"))
elif filter.path_type == "excluding":
query = query.where(cls.path.notilike(f"%{filter.path}%"))
else:
raise ApplicationError(f"Can't process filter path ({filter=})")
# Duration specification
seconds_duration = filter.duration_number
if filter.duration_unit == Config.FILTER_DURATION_MINUTES:
seconds_duration *= 60
elif filter.duration_unit != Config.FILTER_DURATION_SECONDS:
raise ApplicationError(f"Can't process filter duration ({filter=})")
if filter.duration_type == Config.FILTER_DURATION_LONGER:
query = query.where(cls.duration >= seconds_duration)
elif filter.duration_unit == Config.FILTER_DURATION_SHORTER:
query = query.where(cls.duration <= seconds_duration)
else:
raise ApplicationError(f"Can't process filter duration type ({filter=})")
# Process comparator
if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER:
# Select tracks that have never been played
query = query.outerjoin(Playdates, cls.id == Playdates.track_id).where(
Playdates.id.is_(None)
)
else:
# Last played specification
now = dt.datetime.now()
# Set sensible default, and correct for Config.FILTER_PLAYED_COMPARATOR_ANYTIME
before = now
# If not ANYTIME, set 'before' appropriates
if filter.last_played_comparator != Config.FILTER_PLAYED_COMPARATOR_ANYTIME:
if filter.last_played_unit == Config.FILTER_PLAYED_DAYS:
before = now - dt.timedelta(days=filter.last_played_number)
elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS:
before = now - dt.timedelta(days=7 * filter.last_played_number)
elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS:
before = now - dt.timedelta(days=30 * filter.last_played_number)
elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS:
before = now - dt.timedelta(days=365 * filter.last_played_number)
subquery = (
select(
Playdates.track_id,
func.max(Playdates.lastplayed).label("max_last_played"),
)
.group_by(Playdates.track_id)
.subquery()
)
query = query.join(subquery, Tracks.id == subquery.c.track_id).where(
subquery.c.max_last_played < before
)
records = session.scalars(query).unique().all()
return records
@classmethod
def get_by_path(cls, session: Session, path: str) -> Optional["Tracks"]:
"""
Return track with passed path, or None.
"""
try:
return (
session.execute(select(Tracks).where(Tracks.path == path))
.unique()
.scalar_one()
)
except NoResultFound:
return None
@classmethod
def search_artists(cls, session: Session, text: str) -> Sequence["Tracks"]:
"""
Search case-insenstively for artists containing str
The query performs an outer join with 'joinedload' to populate the results
from the Playdates table at the same time. unique() needed; see
https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html#joined-eager-loading
"""
return (
session.scalars(
select(cls)
.options(joinedload(Tracks.playdates))
.where(cls.artist.ilike(f"%{text}%"))
.order_by(cls.title)
)
.unique()
.all()
)
@classmethod
def search_titles(cls, session: Session, text: str) -> Sequence["Tracks"]:
"""
Search case-insenstively for titles containing str
The query performs an outer join with 'joinedload' to populate the results
from the Playdates table at the same time. unique() needed; see
https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html#joined-eager-loading
"""
return (
session.scalars(
select(cls)
.options(joinedload(Tracks.playdates))
.where(cls.title.like(f"{text}%"))
.order_by(cls.title)
)
.unique()
.all()
)

View File

@ -2,25 +2,26 @@ from importlib import import_module
from alembic import context from alembic import context
from alchemical.alembic.env import run_migrations from alchemical.alembic.env import run_migrations
# this is the Alembic Config object, which provides # Load Alembic configuration
# access to the values within the .ini file in use.
config = context.config config = context.config
# import the application's Alchemical instance
try: try:
import_mod, db_name = config.get_main_option('alchemical_db', '').split( # Import the Alchemical database instance as specified in alembic.ini
':') import_mod, db_name = config.get_main_option('alchemical_db', '').split(':')
db = getattr(import_module(import_mod), db_name) db = getattr(import_module(import_mod), db_name)
except (ModuleNotFoundError, AttributeError): print(f"Successfully loaded Alchemical database instance: {db}")
raise ValueError(
'Could not import the Alchemical database instance. '
'Ensure that the alchemical_db setting in alembic.ini is correct.'
)
# run the migration engine # Use the metadata associated with the Alchemical instance
# The dictionary provided as second argument includes options to pass to the metadata = db.Model.metadata
# Alembic context. For details on what other options are available, see print(f"Metadata tables detected: {metadata.tables.keys()}") # Debug output
# https://alembic.sqlalchemy.org/en/latest/autogenerate.html except (ModuleNotFoundError, AttributeError) as e:
raise ValueError(
'Could not import the Alchemical database instance or access metadata. '
'Ensure that the alchemical_db setting in alembic.ini is correct and '
'that the Alchemical instance is correctly configured.'
) from e
# Run migrations with metadata
run_migrations(db, { run_migrations(db, {
'render_as_batch': True, 'render_as_batch': True,
'compare_type': True, 'compare_type': True,