Pass all arguments to Tracks.__init__ on track creation Smarten up metadata collecting Reformat code Reinstate stackprinter, but with more sensible settings (mostly defaults, oddly enough)
791 lines
23 KiB
Python
791 lines
23 KiB
Python
#!/usr/bin/python3
|
|
|
|
import re
|
|
|
|
from config import Config
|
|
from dbconfig import scoped_session
|
|
|
|
from datetime import datetime
|
|
from typing import List, Optional, Sequence
|
|
|
|
from sqlalchemy.ext.associationproxy import association_proxy
|
|
|
|
from sqlalchemy import (
|
|
Boolean,
|
|
DateTime,
|
|
delete,
|
|
ForeignKey,
|
|
func,
|
|
select,
|
|
String,
|
|
update,
|
|
)
|
|
|
|
from sqlalchemy.orm import (
|
|
DeclarativeBase,
|
|
joinedload,
|
|
lazyload,
|
|
Mapped,
|
|
mapped_column,
|
|
relationship,
|
|
)
|
|
from sqlalchemy.orm.exc import (
|
|
NoResultFound,
|
|
)
|
|
from sqlalchemy.exc import (
|
|
IntegrityError,
|
|
)
|
|
from log import log
|
|
|
|
|
|
class Base(DeclarativeBase):
    """Declarative base class that every ORM model in this module derives from."""
|
|
|
|
|
|
# Database classes
|
|
class Carts(Base):
    """ORM model for one row of the ``carts`` table."""

    __tablename__ = "carts"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    cart_number: Mapped[int] = mapped_column(unique=True)
    name: Mapped[str] = mapped_column(String(256), index=True)
    duration: Mapped[int] = mapped_column(index=True)
    path: Mapped[str] = mapped_column(String(2048), index=False)
    enabled: Mapped[bool] = mapped_column(default=False)

    def __init__(
        self,
        session: scoped_session,
        cart_number: int,
        name: Optional[str] = None,
        duration: Optional[int] = None,
        path: Optional[str] = None,
        enabled: bool = True,
    ) -> None:
        """
        Create a new cart, add it to the passed session and commit.

        NOTE(review): name, duration and path default to None here although
        the corresponding columns are annotated as non-Optional.
        """

        self.cart_number, self.name = cart_number, name
        self.duration, self.path = duration, path
        self.enabled = enabled

        # Persist immediately: the constructor owns the add + commit
        session.add(self)
        session.commit()

    def __repr__(self) -> str:
        """Return a short debugging representation of this cart."""

        head = f"<Carts(id={self.id}, cart={self.cart_number}, "
        tail = f"name={self.name}, path={self.path}>"
        return head + tail
|
|
|
|
|
|
class NoteColours(Base):
    """ORM model for the ``notecolours`` table: text patterns and their colour."""

    __tablename__ = "notecolours"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    substring: Mapped[str] = mapped_column(String(256), index=False)
    colour: Mapped[str] = mapped_column(String(21), index=False)
    enabled: Mapped[bool] = mapped_column(default=True, index=True)
    is_regex: Mapped[bool] = mapped_column(default=False, index=False)
    is_casesensitive: Mapped[bool] = mapped_column(default=False, index=False)
    order: Mapped[Optional[int]] = mapped_column(index=True)

    def __repr__(self) -> str:
        """Return a short debugging representation of this record."""

        return (
            f"<NoteColour(id={self.id}, substring={self.substring}, "
            + f"colour={self.colour}>"
        )

    @staticmethod
    def get_colour(session: scoped_session, text: str) -> Optional[str]:
        """
        Return the colour of the first enabled record (in ``order`` order)
        that matches text, or None if text is empty or nothing matches.

        Regex records are tested with re.match (anchored at the start of
        text); other records are tested as plain substrings. Matching
        ignores case unless the record is flagged case sensitive.
        """

        if not text:
            return None

        stmt = (
            select(NoteColours)
            .filter(NoteColours.enabled.is_(True))
            .order_by(NoteColours.order)
        )

        for candidate in session.execute(stmt).scalars().all():
            if candidate.is_regex:
                flags = re.UNICODE
                if not candidate.is_casesensitive:
                    flags |= re.IGNORECASE
                matched = bool(re.compile(candidate.substring, flags).match(text))
            elif candidate.is_casesensitive:
                matched = candidate.substring in text
            else:
                matched = candidate.substring.lower() in text.lower()

            if matched:
                return candidate.colour

        return None
|
|
|
|
|
|
class Playdates(Base):
    """ORM model for the ``playdates`` table: one row per time a track was played."""

    __tablename__ = "playdates"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    lastplayed: Mapped[datetime] = mapped_column(index=True)
    track_id: Mapped[int] = mapped_column(ForeignKey("tracks.id"))
    track: Mapped["Tracks"] = relationship("Tracks", back_populates="playdates")

    def __init__(self, session: scoped_session, track_id: int) -> None:
        """Record that the track was played now, then add and commit."""

        self.track_id = track_id
        self.lastplayed = datetime.now()
        session.add(self)
        session.commit()

    def __repr__(self) -> str:
        """Return a short debugging representation of this play record."""

        head = f"<Playdates(id={self.id}, track_id={self.track_id} "
        return head + f"lastplayed={self.lastplayed}>"

    @staticmethod
    def last_played(session: scoped_session, track_id: int) -> datetime:
        """
        Return the most recent datetime the track was played, or
        Config.EPOCH if there is no play record for it.
        """

        newest_first = (
            select(Playdates.lastplayed)
            .where(Playdates.track_id == track_id)
            .order_by(Playdates.lastplayed.desc())
            .limit(1)
        )
        row = session.execute(newest_first).first()

        return row[0] if row else Config.EPOCH

    @staticmethod
    def played_after(session: scoped_session, since: datetime) -> Sequence["Playdates"]:
        """Return Playdates objects with lastplayed >= since, oldest first"""

        stmt = (
            select(Playdates)
            .where(Playdates.lastplayed >= since)
            .order_by(Playdates.lastplayed)
        )

        return session.execute(stmt).scalars().all()
|
|
)
|
|
|
|
|
|
class Playlists(Base):
    """
    Manage playlists
    """

    __tablename__ = "playlists"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(32), unique=True)
    last_used: Mapped[Optional[datetime]] = mapped_column(DateTime, default=None)
    # tab is None while the playlist is closed; unique among open playlists
    tab: Mapped[Optional[int]] = mapped_column(default=None, unique=True)
    is_template: Mapped[bool] = mapped_column(default=False)
    deleted: Mapped[bool] = mapped_column(default=False)
    rows: Mapped[List["PlaylistRows"]] = relationship(
        "PlaylistRows",
        back_populates="playlist",
        cascade="all, delete-orphan",
        order_by="PlaylistRows.plr_rownum",
    )

    def __repr__(self) -> str:
        """Return a short debugging representation of this playlist."""

        return (
            f"<Playlists(id={self.id}, name={self.name}, "
            f"is_template={self.is_template}>"
        )

    def __init__(self, session: scoped_session, name: str):
        """Create a playlist called name, add it to session and flush."""

        self.name = name
        session.add(self)
        session.flush()

    def close(self, session: scoped_session) -> None:
        """Mark playlist as unloaded"""

        closed_idx = self.tab
        self.tab = None

        # Not open: there is no tab to remove and nothing to renumber
        if closed_idx is None:
            return

        # Write the cleared tab before renumbering so the freed tab number
        # is available regardless of the session's autoflush setting
        session.flush()

        # Closing this tab will mean all higher-number tabs have moved
        # down by one
        session.execute(
            update(Playlists)
            .where(Playlists.tab > closed_idx)
            .values(tab=Playlists.tab - 1)
        )

    @classmethod
    def create_playlist_from_template(
        cls, session: scoped_session, template: "Playlists", playlist_name: str
    ) -> Optional["Playlists"]:
        """
        Create a new playlist from template

        Return the new playlist, or None if either playlist has no id.
        """

        playlist = cls(session, playlist_name)

        # Sanity / mypy checks
        if not playlist or not playlist.id or not template.id:
            return None

        PlaylistRows.copy_playlist(session, template.id, playlist.id)

        return playlist

    def delete(self, session: scoped_session) -> None:
        """
        Mark as deleted
        """

        self.deleted = True
        session.flush()

    @classmethod
    def get_all(cls, session: scoped_session) -> Sequence["Playlists"]:
        """
        Returns a list of all non-template playlists ordered by tab
        (descending) and then by last use (descending)
        """

        return (
            session.execute(
                select(cls)
                .filter(cls.is_template.is_(False))
                .order_by(cls.tab.desc(), cls.last_used.desc())
            )
            .scalars()
            .all()
        )

    @classmethod
    def get_all_templates(cls, session: scoped_session) -> Sequence["Playlists"]:
        """Returns a list of all templates ordered by name"""

        return (
            session.execute(
                select(cls).filter(cls.is_template.is_(True)).order_by(cls.name)
            )
            .scalars()
            .all()
        )

    @classmethod
    def get_closed(cls, session: scoped_session) -> Sequence["Playlists"]:
        """
        Returns a list of all closed playlists (no tab, not a template,
        not deleted) ordered by last use, most recent first
        """

        return (
            session.execute(
                select(cls)
                .filter(
                    cls.tab.is_(None),
                    cls.is_template.is_(False),
                    cls.deleted.is_(False),
                )
                .order_by(cls.last_used.desc())
            )
            .scalars()
            .all()
        )

    @classmethod
    def get_open(cls, session: scoped_session) -> Sequence[Optional["Playlists"]]:
        """
        Return a list of loaded playlists ordered by tab order.
        """

        return (
            session.execute(select(cls).where(cls.tab.is_not(None)).order_by(cls.tab))
            .scalars()
            .all()
        )

    def mark_open(self, session: scoped_session, tab_index: int) -> None:
        """Mark playlist as loaded and used now"""

        # session is not used here; nothing is flushed or committed
        self.tab = tab_index
        self.last_used = datetime.now()

    @staticmethod
    def move_tab(session: scoped_session, frm: int, to: int) -> None:
        """
        Move tabs

        Swap the tab numbers of the playlists currently at tabs frm and to.
        scalar_one() raises if either tab does not match exactly one
        playlist.
        """

        row_frm = session.execute(select(Playlists).filter_by(tab=frm)).scalar_one()

        row_to = session.execute(select(Playlists).filter_by(tab=to)).scalar_one()

        # Clear both tabs and commit before assigning the swapped values;
        # presumably this avoids a transient clash on the unique tab column
        row_frm.tab = None
        row_to.tab = None
        session.commit()
        row_to.tab = frm
        row_frm.tab = to

    def rename(self, session: scoped_session, new_name: str) -> None:
        """
        Rename playlist
        """

        self.name = new_name
        session.flush()

    @staticmethod
    def save_as_template(
        session: scoped_session, playlist_id: int, template_name: str
    ) -> None:
        """Save passed playlist as new template"""

        template = Playlists(session, template_name)
        if not template or not template.id:
            return

        template.is_template = True
        session.commit()

        PlaylistRows.copy_playlist(session, playlist_id, template.id)
|
|
|
|
|
|
class PlaylistRows(Base):
    """ORM model for the ``playlist_rows`` table: one row of one playlist."""

    __tablename__ = "playlist_rows"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    plr_rownum: Mapped[int]
    note: Mapped[str] = mapped_column(
        String(2048), index=False, default="", nullable=False
    )
    playlist_id: Mapped[int] = mapped_column(ForeignKey("playlists.id"))
    playlist: Mapped[Playlists] = relationship(back_populates="rows")
    track_id: Mapped[Optional[int]] = mapped_column(ForeignKey("tracks.id"))
    # track_id is nullable, so a row may have no track at all
    track: Mapped[Optional["Tracks"]] = relationship(
        "Tracks",
        back_populates="playlistrows",
    )
    played: Mapped[bool] = mapped_column(
        Boolean, nullable=False, index=False, default=False
    )

    def __repr__(self) -> str:
        """Return a short debugging representation of this row."""

        return (
            f"<PlaylistRow(id={self.id}, playlist_id={self.playlist_id}, "
            f"track_id={self.track_id}, "
            f"note={self.note}, plr_rownum={self.plr_rownum}>"
        )

    def __init__(
        self,
        session: scoped_session,
        playlist_id: int,
        track_id: Optional[int],
        row_number: int,
        note: str = "",
    ) -> None:
        """Create PlaylistRows object, add it to session and flush"""

        self.playlist_id = playlist_id
        self.track_id = track_id
        self.plr_rownum = row_number
        self.note = note
        session.add(self)
        session.flush()

    def append_note(self, extra_note: str) -> None:
        """
        Append passed note to any existing note

        An existing non-empty note is kept and extra_note is added on a
        new line; otherwise extra_note becomes the note.
        """

        current_note = self.note
        if current_note:
            self.note = current_note + "\n" + extra_note
        else:
            self.note = extra_note

    @staticmethod
    def copy_playlist(session: scoped_session, src_id: int, dst_id: int) -> None:
        """
        Copy playlist entries

        Each row of playlist src_id is recreated in playlist dst_id with
        the same track, row number and note. The played flag is not
        copied, so new rows get the column default.
        """

        src_rows = (
            session.execute(
                select(PlaylistRows).filter(PlaylistRows.playlist_id == src_id)
            )
            .scalars()
            .all()
        )

        for plr in src_rows:
            # The constructor adds the new row to the session and flushes
            PlaylistRows(session, dst_id, plr.track_id, plr.plr_rownum, plr.note)

    @staticmethod
    def delete_higher_rows(
        session: scoped_session, playlist_id: int, maxrow: int
    ) -> None:
        """
        Delete rows in given playlist that have a higher row number
        than 'maxrow'
        """

        session.execute(
            delete(PlaylistRows).where(
                PlaylistRows.playlist_id == playlist_id,
                PlaylistRows.plr_rownum > maxrow,
            )
        )
        session.flush()

    @classmethod
    def deep_rows(
        cls, session: scoped_session, playlist_id: int
    ) -> Sequence["PlaylistRows"]:
        """
        Return the rows of the given playlist, ordered by row number, with
        each row's track eagerly loaded in the same query (joinedload).
        """

        stmt = (
            select(PlaylistRows)
            .options(joinedload(cls.track))
            .where(PlaylistRows.playlist_id == playlist_id)
            .order_by(PlaylistRows.plr_rownum)
        )

        # unique() de-duplicates the entities produced by the joined load
        return session.scalars(stmt).unique().all()

    @staticmethod
    def fixup_rownumbers(session: scoped_session, playlist_id: int) -> None:
        """
        Ensure the row numbers for passed playlist have no gaps

        Rows are renumbered 0, 1, 2, ... in their existing order.
        """

        plrs = (
            session.execute(
                select(PlaylistRows)
                .where(PlaylistRows.playlist_id == playlist_id)
                .order_by(PlaylistRows.plr_rownum)
            )
            .scalars()
            .all()
        )

        for i, plr in enumerate(plrs):
            plr.plr_rownum = i

        # Ensure new row numbers are available to the caller
        session.commit()

    @classmethod
    def plrids_to_plrs(
        cls, session: scoped_session, playlist_id: int, plr_ids: List[int]
    ) -> Sequence["PlaylistRows"]:
        """
        Take a list of PlaylistRows ids and return a list of corresponding
        PlaylistRows objects from the given playlist, ordered by row number
        """

        plrs = (
            session.execute(
                select(cls)
                .where(cls.playlist_id == playlist_id, cls.id.in_(plr_ids))
                .order_by(cls.plr_rownum)
            )
            .scalars()
            .all()
        )

        return plrs

    @staticmethod
    def get_last_used_row(session: scoped_session, playlist_id: int) -> Optional[int]:
        """Return the last used row for playlist, or None if no rows"""

        # max() over no rows yields NULL, which comes back as None
        return session.execute(
            select(func.max(PlaylistRows.plr_rownum)).where(
                PlaylistRows.playlist_id == playlist_id
            )
        ).scalar_one()

    @staticmethod
    def get_track_plr(
        session: scoped_session, track_id: int, playlist_id: int
    ) -> Optional["PlaylistRows"]:
        """Return first matching PlaylistRows object or None"""

        return session.scalars(
            select(PlaylistRows)
            .where(
                PlaylistRows.track_id == track_id,
                PlaylistRows.playlist_id == playlist_id,
            )
            .limit(1)
        ).first()

    @classmethod
    def get_played_rows(
        cls, session: scoped_session, playlist_id: int
    ) -> Sequence["PlaylistRows"]:
        """
        For passed playlist, return a list of rows that
        have been played, ordered by row number.
        """

        plrs = (
            session.execute(
                select(cls)
                .where(cls.playlist_id == playlist_id, cls.played.is_(True))
                .order_by(cls.plr_rownum)
            )
            .scalars()
            .all()
        )

        return plrs

    @classmethod
    def get_rows_with_tracks(
        cls,
        session: scoped_session,
        playlist_id: int,
        from_row: Optional[int] = None,
        to_row: Optional[int] = None,
    ) -> Sequence["PlaylistRows"]:
        """
        For passed playlist, return a list of rows that
        contain tracks, ordered by row number.

        from_row and to_row, when given, are inclusive row-number bounds.
        """

        query = select(cls).where(
            cls.playlist_id == playlist_id, cls.track_id.is_not(None)
        )
        if from_row is not None:
            query = query.where(cls.plr_rownum >= from_row)
        if to_row is not None:
            query = query.where(cls.plr_rownum <= to_row)

        plrs = session.execute((query).order_by(cls.plr_rownum)).scalars().all()

        return plrs

    @classmethod
    def get_unplayed_rows(
        cls, session: scoped_session, playlist_id: int
    ) -> Sequence["PlaylistRows"]:
        """
        For passed playlist, return a list of playlist rows that
        contain a track and have not been played, ordered by row number.
        """

        plrs = (
            session.execute(
                select(cls)
                .where(
                    cls.playlist_id == playlist_id,
                    cls.track_id.is_not(None),
                    cls.played.is_(False),
                )
                .order_by(cls.plr_rownum)
            )
            .scalars()
            .all()
        )

        return plrs

    @staticmethod
    def move_rows_down(
        session: scoped_session, playlist_id: int, starting_row: int, move_by: int
    ) -> None:
        """
        Create space to insert move_by additional rows by incrementing the
        row number of every row from starting_row to the end of the playlist
        """

        session.execute(
            update(PlaylistRows)
            .where(
                (PlaylistRows.playlist_id == playlist_id),
                (PlaylistRows.plr_rownum >= starting_row),
            )
            .values(plr_rownum=PlaylistRows.plr_rownum + move_by)
        )
|
|
)
|
|
|
|
|
|
class Settings(Base):
    """Manage settings"""

    __tablename__ = "settings"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(64), unique=True)
    f_datetime: Mapped[Optional[datetime]] = mapped_column(default=None)
    f_int: Mapped[Optional[int]] = mapped_column(default=None)
    f_string: Mapped[Optional[str]] = mapped_column(String(128), default=None)

    def __repr__(self) -> str:
        """Return a debugging representation showing the first set value."""

        # Choose the first field that is not None. An "or" chain would skip
        # legitimate falsy values such as f_int == 0.
        value = next(
            (
                field
                for field in (self.f_datetime, self.f_int, self.f_string)
                if field is not None
            ),
            None,
        )
        return f"<Settings(id={self.id}, name={self.name}, {value=}>"

    def __init__(self, session: scoped_session, name: str):
        """Create a setting called name, add it to session and flush."""

        self.name = name
        session.add(self)
        session.flush()

    @classmethod
    def all_as_dict(cls, session: scoped_session) -> dict:
        """
        Return all settings in a dictionary keyed by name
        """

        settings = session.execute(select(cls)).scalars().all()

        return {setting.name: setting for setting in settings}

    @classmethod
    def get_int_settings(cls, session: scoped_session, name: str) -> "Settings":
        """Get setting for an integer or return new setting record"""

        try:
            return session.execute(select(cls).where(cls.name == name)).scalar_one()

        except NoResultFound:
            # No such setting yet: create it (the constructor adds + flushes)
            return Settings(session, name)

    def update(self, session: scoped_session, data: dict):
        """
        Set each key of data as an attribute on this setting, then flush.

        Every key must already be an attribute of this object.
        """

        for key, value in data.items():
            # NOTE(review): assert is stripped under "python -O", so this
            # check disappears in optimised runs
            assert hasattr(self, key)
            setattr(self, key, value)
        session.flush()
|
|
|
|
|
|
class Tracks(Base):
    """ORM model for the ``tracks`` table."""

    __tablename__ = "tracks"

    id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(256), index=True)
    artist: Mapped[str] = mapped_column(String(256), index=True)
    bitrate: Mapped[Optional[int]] = mapped_column(default=None)
    duration: Mapped[int] = mapped_column(index=True)
    fade_at: Mapped[int] = mapped_column(index=False)
    mtime: Mapped[float] = mapped_column(index=True)
    path: Mapped[str] = mapped_column(String(2048), index=False, unique=True)
    silence_at: Mapped[int] = mapped_column(index=False)
    start_gap: Mapped[int] = mapped_column(index=False)
    playlistrows: Mapped[List[PlaylistRows]] = relationship(
        "PlaylistRows", back_populates="track"
    )
    playlists = association_proxy("playlistrows", "playlist")
    playdates: Mapped[List[Playdates]] = relationship(
        "Playdates",
        back_populates="track",
        lazy="joined",
    )

    def __repr__(self) -> str:
        """Return a short debugging representation of this track."""

        return (
            f"<Track(id={self.id}, title={self.title}, "
            f"artist={self.artist}, path={self.path}>"
        )

    def __init__(
        self,
        session: scoped_session,
        path: str,
        title: str,
        artist: str,
        duration: int,
        start_gap: int,
        fade_at: int,
        silence_at: int,
        mtime: float,
        bitrate: int,
    ):
        """
        Create a track, add it to session and commit.

        Raises ValueError (chained to the IntegrityError) if the commit
        violates a database constraint; the session is rolled back and the
        error is logged first.
        """

        self.path = path
        self.title = title
        self.artist = artist
        self.bitrate = bitrate
        self.duration = duration
        self.start_gap = start_gap
        self.fade_at = fade_at
        self.silence_at = silence_at
        self.mtime = mtime

        try:
            session.add(self)
            session.commit()
        except IntegrityError as error:
            session.rollback()
            log.error(f"Error ({error=}) importing track ({path=})")
            # Same exception type as before, now with a message and the
            # original cause attached
            raise ValueError(f"Unable to import track ({path=})") from error

    @classmethod
    def get_all(cls, session) -> List["Tracks"]:
        """Return a list of all tracks"""

        return session.execute(select(cls)).scalars().all()

    @classmethod
    def get_by_path(cls, session: scoped_session, path: str) -> Optional["Tracks"]:
        """
        Return track with passed path, or None.
        """

        try:
            return session.execute(
                select(Tracks).where(Tracks.path == path)
            ).scalar_one()
        except NoResultFound:
            return None

    @classmethod
    def search_artists(cls, session: scoped_session, text: str) -> Sequence["Tracks"]:
        """
        Search case-insensitively for artists containing text; results are
        ordered by title.

        The query performs an outer join with 'joinedload' to populate the results
        from the Playdates table at the same time. unique() needed; see
        https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html#joined-eager-loading
        """

        return (
            session.execute(
                select(cls)
                .options(joinedload(Tracks.playdates))
                .where(cls.artist.ilike(f"%{text}%"))
                .order_by(cls.title)
            )
            .scalars()
            .unique()
            .all()
        )

    @classmethod
    def search_titles(cls, session: scoped_session, text: str) -> Sequence["Tracks"]:
        """
        Search for titles that start with text (SQL LIKE 'text%'); results
        are ordered by title.

        NOTE(review): unlike search_artists this uses like() rather than
        ilike() and has no leading wildcard, so case sensitivity is
        whatever the database's LIKE provides - confirm this is intended.

        The query performs an outer join with 'joinedload' to populate the results
        from the Playdates table at the same time. unique() needed; see
        https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html#joined-eager-loading
        """

        return (
            session.execute(
                select(cls)
                .options(joinedload(Tracks.playdates))
                .where(cls.title.like(f"{text}%"))
                .order_by(cls.title)
            )
            .scalars()
            .unique()
            .all()
        )
|