diff --git a/alembic.ini b/alembic.ini index c52f478..b604d8c 100644 --- a/alembic.ini +++ b/alembic.ini @@ -5,7 +5,7 @@ # there are two components separated by a colon: # the left part is the import path to the module containing the database instance # the right part is the name of the database instance, typically 'db' -alchemical_db = models:db +alchemical_db = ds:db # path to migration scripts script_location = migrations diff --git a/app/dbmanager.py b/app/dbmanager.py index dc255a2..822ec16 100644 --- a/app/dbmanager.py +++ b/app/dbmanager.py @@ -1,6 +1,4 @@ # Standard library imports -import os -import sys # PyQt imports @@ -8,7 +6,6 @@ import sys from alchemical import Alchemical # type:ignore # App imports -from config import Config class DatabaseManager: @@ -34,10 +31,3 @@ class DatabaseManager: return DatabaseManager.__instance -# Establish database connection -DATABASE_URL = os.environ.get("DATABASE_URL") -if DATABASE_URL is None: - raise ValueError("DATABASE_URL is undefined") -if "unittest" in sys.modules and "sqlite" not in DATABASE_URL: - raise ValueError("Unit tests running on non-Sqlite database") -db = DatabaseManager.get_instance(DATABASE_URL, engine_options=Config.ENGINE_OPTIONS).db diff --git a/app/dbtables.py b/app/dbtables.py index 6c6cbe8..8567efe 100644 --- a/app/dbtables.py +++ b/app/dbtables.py @@ -15,13 +15,13 @@ from sqlalchemy import ( String, ) from sqlalchemy.ext.associationproxy import association_proxy -from sqlalchemy.ext.hybrid import hybrid_property from sqlalchemy.engine.interfaces import Dialect from sqlalchemy.orm import ( Mapped, mapped_column, relationship, ) +from sqlalchemy.orm.session import Session from sqlalchemy.types import TypeDecorator, TEXT # App imports @@ -49,7 +49,7 @@ class JSONEncodedDict(TypeDecorator): # Database classes -class NoteColoursTable(Model): +class NoteColours(Model): __tablename__ = "notecolours" id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) @@ -68,18 +68,52 @@ class NoteColoursTable(Model): f"colour={self.colour}>" ) + def __init__( + self, + session: Session, + substring: str, + colour: str, + enabled: bool = True, + is_regex: bool = False, + is_casesensitive: bool = False, + order: Optional[int] = 0, + ) -> None: + self.substring = substring + self.colour = colour + self.enabled = enabled + self.is_regex = is_regex + self.is_casesensitive = is_casesensitive + self.order = order -class PlaydatesTable(Model): + session.add(self) + session.commit() + + +class Playdates(Model): __tablename__ = "playdates" id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) lastplayed: Mapped[dt.datetime] = mapped_column(index=True) track_id: Mapped[int] = mapped_column(ForeignKey("tracks.id", ondelete="CASCADE")) - track: Mapped["TracksTable"] = relationship( - "TracksTable", + track: Mapped["Tracks"] = relationship( + "Tracks", back_populates="playdates", ) + def __init__( + self, session: Session, track_id: int, when: dt.datetime | None = None + ) -> None: + """Record that track was played""" + + if not when: + self.lastplayed = dt.datetime.now() + else: + self.lastplayed = when + self.track_id = track_id + + session.add(self) + session.commit() + def __repr__(self) -> str: return ( f"" ) + def __init__(self, session: Session, name: str, template_id: int) -> None: + """Create playlist with passed name""" -class PlaylistRowsTable(Model): + self.name = name + self.last_used = dt.datetime.now() + + session.add(self) + session.commit() + + # If a template is specified, copy from it + if template_id: + PlaylistRows.copy_playlist(session, template_id, self.id) + + +class PlaylistRows(Model): __tablename__ = "playlist_rows" id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) @@ -129,12 +176,12 @@ class PlaylistRowsTable(Model): ForeignKey("playlists.id", ondelete="CASCADE"), index=True ) - playlist: Mapped[PlaylistsTable] = relationship(back_populates="rows") + playlist: Mapped[Playlists] = relationship(back_populates="rows") track_id: Mapped[Optional[int]] = mapped_column( ForeignKey("tracks.id", ondelete="CASCADE") ) - track: Mapped["TracksTable"] = relationship( - "TracksTable", + track: Mapped["Tracks"] = relationship( + "Tracks", back_populates="playlistrows", ) played: Mapped[bool] = mapped_column( @@ -148,8 +195,26 @@ class PlaylistRowsTable(Model): f"note={self.note}, row_number={self.row_number}>" ) + def __init__( + self, + session: Session, + playlist_id: int, + row_number: int, + note: str = "", + track_id: Optional[int] = None, + ) -> None: + """Create PlaylistRows object""" -class QueriesTable(Model): + self.playlist_id = playlist_id + self.track_id = track_id + self.row_number = row_number + self.note = note + + session.add(self) + session.commit() + + +class Queries(Model): __tablename__ = "queries" id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) @@ -171,10 +236,26 @@ class QueriesTable(Model): filter = property(_get_filter, _set_filter) def __repr__(self) -> str: - return f"" + return f"" + + def __init__( + self, + session: Session, + name: str, + filter: Filter, + favourite: bool = False, + ) -> None: + """Create new query""" + + self.name = name + self.filter = filter + self.favourite = favourite + + session.add(self) + session.commit() -class SettingsTable(Model): +class Settings(Model): """Manage settings""" __tablename__ = "settings" @@ -191,8 +272,14 @@ class SettingsTable(Model): f"f_datetime={self.f_datetime}, f_int={self.f_int}, f_string={self.f_string}>" ) + def __init__(self, session: Session, name: str) -> None: + self.name = name -class TracksTable(Model): + session.add(self) + session.commit() + + +class Tracks(Model): __tablename__ = "tracks" id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) @@ -206,14 +293,14 @@ class TracksTable(Model): start_gap: Mapped[int] = mapped_column(index=False) title: Mapped[str] = mapped_column(String(256), index=True) - playlistrows: Mapped[list[PlaylistRowsTable]] = relationship( - "PlaylistRowsTable", + playlistrows: Mapped[list[PlaylistRows]] = relationship( + "PlaylistRows", back_populates="track", cascade="all, delete-orphan", ) playlists = association_proxy("playlistrows", "playlist") - playdates: Mapped[list[PlaydatesTable]] = relationship( - "PlaydatesTable", + playdates: Mapped[list[Playdates]] = relationship( + "Playdates", back_populates="track", cascade="all, delete-orphan", lazy="joined", @@ -224,3 +311,27 @@ class TracksTable(Model): f"" ) + + def __init__( + self, + session: Session, + path: str, + title: str, + artist: str, + duration: int, + start_gap: int, + fade_at: int, + silence_at: int, + bitrate: int, + ) -> None: + self.path = path + self.title = title + self.artist = artist + self.bitrate = bitrate + self.duration = duration + self.start_gap = start_gap + self.fade_at = fade_at + self.silence_at = silence_at + + session.add(self) + session.commit() diff --git a/app/ds.py b/app/ds.py index 0b92fe1..5f2b117 100644 --- a/app/ds.py +++ b/app/ds.py @@ -1,6 +1,8 @@ # Standard library imports import datetime as dt +import os import re +import sys # PyQt imports @@ -29,9 +31,8 @@ from classes import ( TrackDTO, ) from config import Config -from dbmanager import db from log import log, log_call -from models import ( +from dbtables import ( NoteColours, Playdates, PlaylistRows, @@ -40,7 +41,15 @@ from models import ( Settings, Tracks, ) +from dbmanager import DatabaseManager +# Establish database connection +DATABASE_URL = os.environ.get("DATABASE_URL") +if DATABASE_URL is None: + raise ValueError("DATABASE_URL is undefined") +if "unittest" in sys.modules and "sqlite" not in DATABASE_URL: + raise ValueError("Unit tests running on non-Sqlite database") +db = DatabaseManager.get_instance(DATABASE_URL, engine_options=Config.ENGINE_OPTIONS).db # Configure the dogpile cache region cache_region = make_region().configure( diff --git a/app/helpers.py b/app/helpers.py index 40b2d9c..22957d8 100644 --- a/app/helpers.py +++ b/app/helpers.py @@ -23,7 +23,6 @@ from tinytag import TinyTag, TinyTagException # type: ignore from classes import AudioMetadata, ApplicationError, Tags, TrackDTO from config import Config from log import log -from models import Tracks start_time_re = re.compile(r"@\d\d:\d\d") @@ -385,18 +384,6 @@ def send_mail(to_addr: str, from_addr: str, subj: str, body: str) -> None: s.quit() -def set_track_metadata(track: Tracks) -> None: - """Set/update track metadata in database""" - - audio_metadata = get_audio_metadata(track.path) - tags = get_tags(track.path) - - for audio_key in AudioMetadata._fields: - setattr(track, audio_key, getattr(audio_metadata, audio_key)) - for tag_key in Tags._fields: - setattr(track, tag_key, getattr(tags, tag_key)) - - def show_OK(title: str, msg: str, parent: Optional[QWidget] = None) -> None: """Display a message to user""" diff --git a/app/models.py b/app/models.py deleted file mode 100644 index 55769d1..0000000 --- a/app/models.py +++ /dev/null @@ -1,853 +0,0 @@ -# Standard library imports -from __future__ import annotations - -from typing import Optional, Sequence -import datetime as dt -import os -import re -import sys - -# PyQt imports - -# Third party imports -from dogpile.cache import make_region -from dogpile.cache.api import NO_VALUE -from sqlalchemy import ( - bindparam, - delete, - func, - select, - text, - update, -) -from sqlalchemy.exc import IntegrityError, ProgrammingError -from sqlalchemy.orm.exc import NoResultFound -from sqlalchemy.orm import joinedload, selectinload -from sqlalchemy.orm.session import Session -from sqlalchemy.engine.row import RowMapping - -# App imports -from classes import ApplicationError, Filter -from config import Config -from dbmanager import DatabaseManager -import dbtables -from log import log - - -# Configure the cache region -cache_region = make_region().configure( - 'dogpile.cache.memory', # Use in-memory caching for now (switch to Redis if needed) - expiration_time=600 # Cache expires after 10 minutes -) - - -def run_sql(session: Session, sql: str) -> Sequence[RowMapping]: - """ - Run a sql string and return results - """ - - try: - return session.execute(text(sql)).mappings().all() - except ProgrammingError as e: - raise ApplicationError(e) - - -# Database classes -class NoteColours(dbtables.NoteColoursTable): - - def __init__( - self, - session: Session, - substring: str, - colour: str, - enabled: bool = True, - is_regex: bool = False, - is_casesensitive: bool = False, - order: Optional[int] = 0, - ) -> None: - self.substring = substring - self.colour = colour - self.enabled = enabled - self.is_regex = is_regex - self.is_casesensitive = is_casesensitive - self.order = order - - session.add(self) - session.commit() - - @classmethod - def get_all(cls, session: Session) -> Sequence["NoteColours"]: - """ - Return all records - """ - - cache_key = "note_colours_all" - cached_result = cache_region.get(cache_key) - - if cached_result is not NO_VALUE: - return cached_result - - # Query the database - result = session.scalars( - select(cls) - .where( - cls.enabled.is_(True), - ) - .order_by(cls.order) - ).all() - cache_region.set(cache_key, result) - - return result - - @staticmethod - def get_colour( - session: Session, text: str, foreground: bool = False - ) -> str: - """ - Parse text and return background (foreground if foreground==True) colour - string if matched, else None - - """ - - if not text: - return "" - - match = False - for rec in NoteColours.get_all(session): - if rec.is_regex: - flags = re.UNICODE - if not rec.is_casesensitive: - flags |= re.IGNORECASE - p = re.compile(rec.substring, flags) - if p.match(text): - match = True - else: - if rec.is_casesensitive: - if rec.substring in text: - match = True - else: - if rec.substring.lower() in text.lower(): - match = True - - if match: - if foreground: - return rec.foreground or "" - else: - return rec.colour - return "" - - @staticmethod - def invalidate_cache() -> None: - """Invalidate dogpile cache""" - - cache_region.delete("note_colours_all") - - -class Playdates(dbtables.PlaydatesTable): - def __init__( - self, session: Session, track_id: int, when: dt.datetime | None = None - ) -> None: - """Record that track was played""" - - if not when: - self.lastplayed = dt.datetime.now() - else: - self.lastplayed = when - self.track_id = track_id - session.add(self) - session.commit() - - @staticmethod - def last_playdates( - session: Session, track_id: int, limit: int = 5 - ) -> Sequence["Playdates"]: - """ - Return a list of the last limit playdates for this track, sorted - latest to earliest. - """ - - return session.scalars( - Playdates.select() - .where(Playdates.track_id == track_id) - .order_by(Playdates.lastplayed.desc()) - .limit(limit) - ).all() - - @staticmethod - def last_played(session: Session, track_id: int) -> dt.datetime: - """Return datetime track last played or None""" - - last_played = session.execute( - select(Playdates.lastplayed) - .where(Playdates.track_id == track_id) - .order_by(Playdates.lastplayed.desc()) - .limit(1) - ).first() - - if last_played: - return last_played[0] - else: - # Should never be reached as we create record with a - # last_played value - return Config.EPOCH # pragma: no cover - - @staticmethod - def last_played_tracks(session: Session, limit: int = 5) -> Sequence["Playdates"]: - """ - Return a list of the last limit tracks played, sorted - earliest to latest. - """ - - return session.scalars( - Playdates.select().order_by(Playdates.lastplayed.desc()).limit(limit) - ).all() - - @staticmethod - def played_after(session: Session, since: dt.datetime) -> Sequence["Playdates"]: - """Return a list of Playdates objects since passed time""" - - return session.scalars( - select(Playdates) - .where(Playdates.lastplayed >= since) - .order_by(Playdates.lastplayed) - ).all() - - -class Playlists(dbtables.PlaylistsTable): - def __init__(self, session: Session, name: str, template_id: int) -> None: - """Create playlist with passed name""" - - self.name = name - self.last_used = dt.datetime.now() - session.add(self) - session.commit() - - # If a template is specified, copy from it - if template_id: - PlaylistRows.copy_playlist(session, template_id, self.id) - - @staticmethod - def clear_tabs(session: Session, playlist_ids: list[int]) -> None: - """ - Make all tab records NULL - """ - - session.execute( - update(Playlists) - .where(Playlists.id.in_(playlist_ids)) - .values(tab=None) - ) - - def close(self, session: Session) -> None: - """Mark playlist as unloaded""" - - self.open = False - session.commit() - - @classmethod - def get_all(cls, session: Session) -> Sequence["Playlists"]: - """Returns a list of all playlists ordered by last use""" - - return session.scalars( - select(cls) - .filter(cls.is_template.is_(False)) - .order_by(cls.last_used.desc()) - ).all() - - @classmethod - def get_all_templates(cls, session: Session) -> Sequence["Playlists"]: - """Returns a list of all templates ordered by name""" - - return session.scalars( - select(cls).where(cls.is_template.is_(True)).order_by(cls.name) - ).all() - - @classmethod - def get_favourite_templates(cls, session: Session) -> Sequence["Playlists"]: - """Returns a list of favourite templates ordered by name""" - - return session.scalars( - select(cls) - .where(cls.is_template.is_(True), cls.favourite.is_(True)) - .order_by(cls.name) - ).all() - - @classmethod - def get_closed(cls, session: Session) -> Sequence["Playlists"]: - """Returns a list of all closed playlists ordered by last use""" - - return session.scalars( - select(cls) - .filter( - cls.open.is_(False), - cls.is_template.is_(False), - ) - .order_by(cls.last_used.desc()) - ).all() - - @classmethod - def get_open(cls, session: Session) -> Sequence[Optional["Playlists"]]: - """ - Return a list of loaded playlists ordered by tab. - """ - - return session.scalars( - select(cls).where(cls.open.is_(True)).order_by(cls.tab) - ).all() - - def mark_open(self) -> None: - """Mark playlist as loaded and used now""" - - self.open = True - self.last_used = dt.datetime.now() - - @staticmethod - def name_is_available(session: Session, name: str) -> bool: - """ - Return True if no playlist of this name exists else false. - """ - - return ( - session.execute(select(Playlists).where(Playlists.name == name)).first() - is None - ) - - def rename(self, session: Session, new_name: str) -> None: - """ - Rename playlist - """ - - self.name = new_name - session.commit() - - @staticmethod - def save_as_template( - session: Session, playlist_id: int, template_name: str - ) -> None: - """Save passed playlist as new template""" - - template = Playlists(session, template_name, template_id=0) - if not template or not template.id: - return - - template.is_template = True - session.commit() - - PlaylistRows.copy_playlist(session, playlist_id, template.id) - - -class PlaylistRows(dbtables.PlaylistRowsTable): - def __init__( - self, - session: Session, - playlist_id: int, - row_number: int, - note: str = "", - track_id: Optional[int] = None, - ) -> None: - """Create PlaylistRows object""" - - self.playlist_id = playlist_id - self.track_id = track_id - self.row_number = row_number - self.note = note - session.add(self) - session.commit() - - def append_note(self, extra_note: str) -> None: - """Append passed note to any existing note""" - - current_note = self.note - if current_note: - self.note = current_note + "\n" + extra_note - else: - self.note = extra_note - - @staticmethod - def copy_playlist(session: Session, src_id: int, dst_id: int) -> None: - """Copy playlist entries""" - - src_rows = session.scalars( - select(PlaylistRows).filter(PlaylistRows.playlist_id == src_id) - ).all() - - for plr in src_rows: - PlaylistRows( - session=session, - playlist_id=dst_id, - row_number=plr.row_number, - note=plr.note, - track_id=plr.track_id, - ) - - @classmethod - def deep_row( - cls, session: Session, playlist_id: int, row_number: int - ) -> "PlaylistRows": - """ - Return a playlist row that includes full track and lastplayed data for - given playlist_id and row - """ - - # TODO: use selectinload? - stmt = ( - select(PlaylistRows) - .options(joinedload(cls.track)) - .where( - PlaylistRows.playlist_id == playlist_id, - PlaylistRows.row_number == row_number, - ) - # .options(joinedload(Tracks.playdates)) - ) - - return session.execute(stmt).unique().scalar_one() - - @staticmethod - def delete_higher_rows(session: Session, playlist_id: int, maxrow: int) -> None: - """ - Delete rows in given playlist that have a higher row number - than 'maxrow' - """ - - session.execute( - delete(PlaylistRows).where( - PlaylistRows.playlist_id == playlist_id, - PlaylistRows.row_number > maxrow, - ) - ) - session.commit() - - @staticmethod - def delete_row(session: Session, playlist_id: int, row_number: int) -> None: - """ - Delete passed row in given playlist. - """ - - session.execute( - delete(PlaylistRows).where( - PlaylistRows.playlist_id == playlist_id, - PlaylistRows.row_number == row_number, - ) - ) - - @classmethod - def plrids_to_plrs( - cls, session: Session, playlist_id: int, plr_ids: list[int] - ) -> Sequence["PlaylistRows"]: - """ - Take a list of PlaylistRows ids and return a list of corresponding - PlaylistRows objects - """ - - plrs = session.scalars( - select(cls) - .where(cls.playlist_id == playlist_id, cls.id.in_(plr_ids)) - .order_by(cls.row_number) - ).all() - - return plrs - - @staticmethod - def get_last_used_row(session: Session, playlist_id: int) -> Optional[int]: - """Return the last used row for playlist, or None if no rows""" - - return session.execute( - select(func.max(PlaylistRows.row_number)).where( - PlaylistRows.playlist_id == playlist_id - ) - ).scalar_one() - - @staticmethod - def get_track_plr( - session: Session, track_id: int, playlist_id: int - ) -> Optional["PlaylistRows"]: - """Return first matching PlaylistRows object or None""" - - return session.scalars( - select(PlaylistRows) - .where( - PlaylistRows.track_id == track_id, - PlaylistRows.playlist_id == playlist_id, - ) - .limit(1) - ).first() - - @classmethod - def get_played_rows( - cls, session: Session, playlist_id: int - ) -> Sequence["PlaylistRows"]: - """ - For passed playlist, return a list of rows that - have been played. - """ - - plrs = session.scalars( - select(cls) - .where(cls.playlist_id == playlist_id, cls.played.is_(True)) - .order_by(cls.row_number) - ).all() - - return plrs - - @classmethod - def get_playlist_rows( - cls, session: Session, playlist_id: int - ) -> Sequence["PlaylistRows"]: - """ - For passed playlist, return a list of rows. - """ - - stmt = ( - select(cls) - .where(cls.playlist_id == playlist_id) - .options(selectinload(cls.track)) - .order_by(cls.row_number) - ) - plrs = session.execute(stmt).scalars().all() - - return plrs - - @classmethod - def get_rows_with_tracks( - cls, - session: Session, - playlist_id: int, - ) -> Sequence["PlaylistRows"]: - """ - For passed playlist, return a list of rows that - contain tracks - """ - - query = select(cls).where( - cls.playlist_id == playlist_id, cls.track_id.is_not(None) - ) - plrs = session.scalars((query).order_by(cls.row_number)).all() - - return plrs - - @classmethod - def get_unplayed_rows( - cls, session: Session, playlist_id: int - ) -> Sequence["PlaylistRows"]: - """ - For passed playlist, return a list of playlist rows that - have not been played. - """ - - plrs = session.scalars( - select(cls) - .where( - cls.playlist_id == playlist_id, - cls.track_id.is_not(None), - cls.played.is_(False), - ) - .order_by(cls.row_number) - ).all() - - return plrs - - @classmethod - def insert_row( - cls, - session: Session, - playlist_id: int, - new_row_number: int, - note: str = "", - track_id: Optional[int] = None, - ) -> "PlaylistRows": - cls.move_rows_down(session, playlist_id, new_row_number, 1) - return cls( - session, - playlist_id=playlist_id, - row_number=new_row_number, - note=note, - track_id=track_id, - ) - - @staticmethod - def move_rows_down( - session: Session, playlist_id: int, starting_row: int, move_by: int - ) -> None: - """ - Create space to insert move_by additional rows by incremented row - number from starting_row to end of playlist - """ - - log.debug(f"(move_rows_down({playlist_id=}, {starting_row=}, {move_by=}") - - session.execute( - update(PlaylistRows) - .where( - (PlaylistRows.playlist_id == playlist_id), - (PlaylistRows.row_number >= starting_row), - ) - .values(row_number=PlaylistRows.row_number + move_by) - ) - - @staticmethod - def update_plr_row_numbers( - session: Session, - playlist_id: int, - sqla_map: list[dict[str, int]], - ) -> None: - """ - Take a {plrid: row_number} dictionary and update the row numbers accordingly - """ - - # Update database. Ref: - # https://docs.sqlalchemy.org/en/20/tutorial/data_update.html#the-update-sql-expression-construct - stmt = ( - update(PlaylistRows) - .where( - PlaylistRows.playlist_id == playlist_id, - PlaylistRows.id == bindparam("playlistrow_id"), - ) - .values(row_number=bindparam("row_number")) - ) - session.connection().execute(stmt, sqla_map) - - -class Queries(dbtables.QueriesTable): - def __init__( - self, - session: Session, - name: str, - filter: dbtables.Filter, - favourite: bool = False, - ) -> None: - """Create new query""" - - self.name = name - self.filter = filter - self.favourite = favourite - session.add(self) - session.commit() - - @classmethod - def get_all(cls, session: Session) -> Sequence["Queries"]: - """Returns a list of all queries ordered by name""" - - return session.scalars(select(cls).order_by(cls.name)).all() - - @classmethod - def get_favourites(cls, session: Session) -> Sequence["Queries"]: - """Returns a list of favourite queries ordered by name""" - - return session.scalars( - select(cls).where(cls.favourite.is_(True)).order_by(cls.name) - ).all() - - -class Settings(dbtables.SettingsTable): - def __init__(self, session: Session, name: str) -> None: - self.name = name - session.add(self) - session.commit() - - @classmethod - def get_setting(cls, session: Session, name: str) -> "Settings": - """Get existing setting or return new setting record""" - - try: - return session.execute(select(cls).where(cls.name == name)).scalar_one() - - except NoResultFound: - return Settings(session, name) - - -class Tracks(dbtables.TracksTable): - def __init__( - self, - session: Session, - path: str, - title: str, - artist: str, - duration: int, - start_gap: int, - fade_at: int, - silence_at: int, - bitrate: int, - ) -> None: - self.path = path - self.title = title - self.artist = artist - self.bitrate = bitrate - self.duration = duration - self.start_gap = start_gap - self.fade_at = fade_at - self.silence_at = silence_at - - try: - session.add(self) - session.commit() - except IntegrityError as error: - session.rollback() - log.error(f"Error ({error=}) importing track ({path=})") - raise ValueError(error) - - @classmethod - def get_all(cls, session: Session) -> Sequence["Tracks"]: - """Return a list of all tracks""" - - return session.scalars(select(cls)).unique().all() - - @classmethod - def all_tracks_indexed_by_id(cls, session: Session) -> dict[int, Tracks]: - """ - Return a dictionary of all tracks, keyed by title - """ - - result: dict[int, Tracks] = {} - - for track in cls.get_all(session): - result[track.id] = track - - return result - - @classmethod - def exact_title_and_artist( - cls, session: Session, title: str, artist: str - ) -> Sequence["Tracks"]: - """ - Search for exact but case-insensitive match of title and artist - """ - - return ( - session.scalars( - select(cls) - .where(cls.title.ilike(title), cls.artist.ilike(artist)) - .order_by(cls.title) - ) - .unique() - .all() - ) - - @classmethod - def get_filtered_tracks( - cls, session: Session, filter: Filter - ) -> Sequence["Tracks"]: - """ - Return tracks matching filter - """ - - # Now implemented in repostory.py - return [] - - query = select(cls) - - # Path specification - if filter.path: - if filter.path_type == "contains": - query = query.where(cls.path.ilike(f"%{filter.path}%")) - elif filter.path_type == "excluding": - query = query.where(cls.path.notilike(f"%{filter.path}%")) - else: - raise ApplicationError(f"Can't process filter path ({filter=})") - - # Duration specification - seconds_duration = filter.duration_number - if filter.duration_unit == Config.FILTER_DURATION_MINUTES: - seconds_duration *= 60 - elif filter.duration_unit != Config.FILTER_DURATION_SECONDS: - raise ApplicationError(f"Can't process filter duration ({filter=})") - - if filter.duration_type == Config.FILTER_DURATION_LONGER: - query = query.where(cls.duration >= seconds_duration) - elif filter.duration_unit == Config.FILTER_DURATION_SHORTER: - query = query.where(cls.duration <= seconds_duration) - else: - raise ApplicationError(f"Can't process filter duration type ({filter=})") - - # Process comparator - if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER: - # Select tracks that have never been played - query = query.outerjoin(Playdates, cls.id == Playdates.track_id).where( - Playdates.id.is_(None) - ) - else: - # Last played specification - now = dt.datetime.now() - # Set sensible default, and correct for Config.FILTER_PLAYED_COMPARATOR_ANYTIME - before = now - # If not ANYTIME, set 'before' appropriates - if filter.last_played_comparator != Config.FILTER_PLAYED_COMPARATOR_ANYTIME: - if filter.last_played_unit == Config.FILTER_PLAYED_DAYS: - before = now - dt.timedelta(days=filter.last_played_number) - elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS: - before = now - dt.timedelta(days=7 * filter.last_played_number) - elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS: - before = now - dt.timedelta(days=30 * filter.last_played_number) - elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS: - before = now - dt.timedelta(days=365 * filter.last_played_number) - - subquery = ( - select( - Playdates.track_id, - func.max(Playdates.lastplayed).label("max_last_played"), - ) - .group_by(Playdates.track_id) - .subquery() - ) - query = query.join(subquery, Tracks.id == subquery.c.track_id).where( - subquery.c.max_last_played < before - ) - - records = session.scalars(query).unique().all() - - return records - - @classmethod - def get_by_path(cls, session: Session, path: str) -> Optional["Tracks"]: - """ - Return track with passed path, or None. - """ - - try: - return ( - session.execute(select(Tracks).where(Tracks.path == path)) - .unique() - .scalar_one() - ) - except NoResultFound: - return None - - @classmethod - def search_artists(cls, session: Session, text: str) -> Sequence["Tracks"]: - """ - Search case-insenstively for artists containing str - - The query performs an outer join with 'joinedload' to populate the results - from the Playdates table at the same time. unique() needed; see - https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html#joined-eager-loading - """ - - return ( - session.scalars( - select(cls) - .options(joinedload(Tracks.playdates)) - .where(cls.artist.ilike(f"%{text}%")) - .order_by(cls.title) - ) - .unique() - .all() - ) - - @classmethod - def search_titles(cls, session: Session, text: str) -> Sequence["Tracks"]: - """ - Search case-insenstively for titles containing str - - The query performs an outer join with 'joinedload' to populate the results - from the Playdates table at the same time. unique() needed; see - https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html#joined-eager-loading - """ - return ( - session.scalars( - select(cls) - .options(joinedload(Tracks.playdates)) - .where(cls.title.like(f"{text}%")) - .order_by(cls.title) - ) - .unique() - .all() - ) diff --git a/migrations/env.py b/migrations/env.py index 027fd36..c6420ae 100644 --- a/migrations/env.py +++ b/migrations/env.py @@ -2,25 +2,26 @@ from importlib import import_module from alembic import context from alchemical.alembic.env import run_migrations -# this is the Alembic Config object, which provides -# access to the values within the .ini file in use. +# Load Alembic configuration config = context.config -# import the application's Alchemical instance try: - import_mod, db_name = config.get_main_option('alchemical_db', '').split( - ':') + # Import the Alchemical database instance as specified in alembic.ini + import_mod, db_name = config.get_main_option('alchemical_db', '').split(':') db = getattr(import_module(import_mod), db_name) -except (ModuleNotFoundError, AttributeError): - raise ValueError( - 'Could not import the Alchemical database instance. ' - 'Ensure that the alchemical_db setting in alembic.ini is correct.' - ) + print(f"Successfully loaded Alchemical database instance: {db}") -# run the migration engine -# The dictionary provided as second argument includes options to pass to the -# Alembic context. For details on what other options are available, see -# https://alembic.sqlalchemy.org/en/latest/autogenerate.html + # Use the metadata associated with the Alchemical instance + metadata = db.Model.metadata + print(f"Metadata tables detected: {metadata.tables.keys()}") # Debug output +except (ModuleNotFoundError, AttributeError) as e: + raise ValueError( + 'Could not import the Alchemical database instance or access metadata. ' + 'Ensure that the alchemical_db setting in alembic.ini is correct and ' + 'that the Alchemical instance is correctly configured.' + ) from e + +# Run migrations with metadata run_migrations(db, { 'render_as_batch': True, 'compare_type': True,