709 lines
21 KiB
Python
709 lines
21 KiB
Python
#!/usr/bin/python3
|
|
|
|
import os.path
|
|
import re
|
|
import stackprinter # type: ignore
|
|
|
|
from dbconfig import Session, scoped_session
|
|
|
|
from datetime import datetime
|
|
from typing import List, Optional
|
|
|
|
from sqlalchemy.ext.associationproxy import association_proxy
|
|
|
|
from sqlalchemy import (
|
|
Boolean,
|
|
Column,
|
|
DateTime,
|
|
delete,
|
|
Float,
|
|
ForeignKey,
|
|
func,
|
|
Integer,
|
|
select,
|
|
String,
|
|
update,
|
|
)
|
|
|
|
from sqlalchemy.orm import (
|
|
declarative_base,
|
|
relationship,
|
|
)
|
|
from sqlalchemy.orm.exc import (
|
|
NoResultFound
|
|
)
|
|
from config import Config
|
|
from helpers import (
|
|
fade_point,
|
|
get_audio_segment,
|
|
get_tags,
|
|
leading_silence,
|
|
trailing_silence,
|
|
)
|
|
from log import log
|
|
# Declarative base class that every ORM model class in this module inherits from
Base = declarative_base()
|
|
|
|
|
|
# Database classes
|
|
class Carts(Base):
    """
    ORM model for the 'carts' table: one row per (unique) cart number
    """

    __tablename__ = 'carts'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    cart_number = Column(Integer, nullable=False, unique=True)
    name = Column(String(256), index=True)
    duration = Column(Integer, index=True)
    path = Column(String(2048), index=False)
    # NB: __init__ always sets 'enabled' explicitly (defaulting to True), so
    # this column-level default only matters for rows inserted by other means
    enabled = Column(Boolean, default=False, nullable=False)

    def __repr__(self) -> str:
        """Return a debugging representation of this cart"""

        return (
            f"<Carts(id={self.id}, cart={self.cart_number}, "
            f"name={self.name}, path={self.path})>"
        )

    def __init__(self, session: scoped_session, cart_number: int,
                 name: Optional[str] = None,
                 duration: Optional[int] = None, path: Optional[str] = None,
                 enabled: bool = True) -> None:
        """
        Create new cart

        The new object is added to the passed session and committed
        immediately.
        """

        self.cart_number = cart_number
        self.name = name
        self.duration = duration
        self.path = path
        self.enabled = enabled

        session.add(self)
        session.commit()
|
|
|
|
|
|
class NoteColours(Base):
    """
    ORM model for the 'notecolours' table: rules that map note text to a
    colour string
    """

    __tablename__ = 'notecolours'

    id = Column(Integer, primary_key=True, autoincrement=True)
    substring = Column(String(256), index=False)
    colour = Column(String(21), index=False)
    enabled = Column(Boolean, default=True, index=True)
    is_regex = Column(Boolean, default=False, index=False)
    is_casesensitive = Column(Boolean, default=False, index=False)
    order = Column(Integer, index=True)

    def __repr__(self) -> str:
        """Return a debugging representation of this rule"""

        return (
            f"<NoteColour(id={self.id}, substring={self.substring}, "
            f"colour={self.colour})>"
        )

    @staticmethod
    def get_colour(session: scoped_session, text: str) -> Optional[str]:
        """
        Parse text and return colour string if matched, else None

        Enabled rules are tried in ascending 'order'; the colour of the
        first rule that matches is returned. Rules whose substring is NULL
        are skipped.
        """

        if not text:
            return None

        rules = session.execute(
            select(NoteColours)
            .filter(NoteColours.enabled.is_(True))
            .order_by(NoteColours.order)
        ).scalars().all()

        # Lower-case the text once rather than once per rule
        lowered_text = text.lower()

        for rule in rules:
            pattern = rule.substring
            if pattern is None:
                # The column is nullable; a NULL pattern cannot match
                # anything and would otherwise raise TypeError below
                continue

            if rule.is_regex:
                flags = re.UNICODE
                if not rule.is_casesensitive:
                    flags |= re.IGNORECASE
                # NB: match() only matches at the start of the text
                if re.match(pattern, text, flags):
                    return rule.colour
            elif rule.is_casesensitive:
                if pattern in text:
                    return rule.colour
            elif pattern.lower() in lowered_text:
                return rule.colour

        return None
|
|
|
|
|
|
class Playdates(Base):
    """
    ORM model for the 'playdates' table: one row per occasion on which a
    track was played
    """

    __tablename__ = 'playdates'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    lastplayed = Column(DateTime, index=True, default=None)
    track_id = Column(Integer, ForeignKey('tracks.id'))
    track: "Tracks" = relationship("Tracks", back_populates="playdates")

    def __repr__(self) -> str:
        """Return a debugging representation of this play record"""

        return (
            f"<Playdates(id={self.id}, track_id={self.track_id}, "
            f"lastplayed={self.lastplayed})>"
        )

    def __init__(self, session: scoped_session, track_id: int) -> None:
        """
        Record that track was played

        The play time is the current local time (datetime.now()); the new
        row is added to the session and committed immediately.
        """

        self.lastplayed = datetime.now()
        self.track_id = track_id
        session.add(self)
        session.commit()

    @staticmethod
    def last_played(session: scoped_session,
                    track_id: int) -> Optional[datetime]:
        """Return datetime track last played or None"""

        # Newest row first; first() yields None when the track has no rows
        return session.execute(
            select(Playdates.lastplayed)
            .where(Playdates.track_id == track_id)
            .order_by(Playdates.lastplayed.desc())
            .limit(1)
        ).scalars().first()

    @staticmethod
    def played_after(session: scoped_session,
                     since: datetime) -> List["Playdates"]:
        """
        Return a list of Playdates objects played at or after the passed
        time, oldest first
        """

        stmt = (
            select(Playdates)
            .where(Playdates.lastplayed >= since)
            .order_by(Playdates.lastplayed)
        )
        return session.execute(stmt).scalars().all()
|
|
|
|
|
|
class Playlists(Base):
    """
    Manage playlists

    One row per playlist or playlist template. 'tab' holds the tab index
    of a loaded playlist and is NULL when the playlist is not loaded.
    """

    __tablename__ = "playlists"

    id = Column(Integer, primary_key=True, autoincrement=True, nullable=False)
    name = Column(String(32), nullable=False, unique=True)
    last_used = Column(DateTime, default=None, nullable=True)
    tab = Column(Integer, default=None, nullable=True, unique=True)
    sort_column = Column(Integer, default=None, nullable=True, unique=False)
    is_template = Column(Boolean, default=False, nullable=False)
    query = Column(String(256), default=None, nullable=True, unique=False)
    deleted = Column(Boolean, default=False, nullable=False)
    rows: "PlaylistRows" = relationship(
        "PlaylistRows",
        back_populates="playlist",
        cascade="all, delete-orphan",
        order_by="PlaylistRows.row_number"
    )

    def __repr__(self) -> str:
        """Return a debugging representation of this playlist"""

        return (
            f"<Playlists(id={self.id}, name={self.name}, "
            f"is_template={self.is_template})>"
        )

    def __init__(self, session: scoped_session, name: str):
        """
        Create a new playlist

        The object is added to the session and flushed (so that its id is
        assigned) but not committed.
        """

        self.name = name
        session.add(self)
        session.flush()

    def close(self, session: scoped_session) -> None:
        """Mark playlist as unloaded"""

        closed_idx = self.tab
        if closed_idx is None:
            # Already closed: no tab position to free, nothing to renumber
            return

        self.tab = None

        # Closing this tab will mean all higher-number tabs have moved
        # down by one
        session.execute(
            update(Playlists)
            .where(Playlists.tab > closed_idx)
            .values(tab=Playlists.tab - 1)
        )

    @classmethod
    def create_playlist_from_template(cls,
                                      session: scoped_session,
                                      template: "Playlists",
                                      playlist_name: str) \
            -> Optional["Playlists"]:
        """
        Create a new playlist from template

        Return the new playlist, or None if either the new playlist or
        the template has no id.
        """

        playlist = cls(session, playlist_name)

        # Sanity / mypy checks
        if not playlist or not playlist.id or not template.id:
            return None

        PlaylistRows.copy_playlist(session, template.id, playlist.id)

        return playlist

    @classmethod
    def get_all(cls, session: scoped_session) -> List["Playlists"]:
        """
        Returns a list of all non-template playlists ordered by tab
        (descending) and then by last use (most recent first)
        """

        stmt = (
            select(cls)
            .filter(cls.is_template.is_(False))
            .order_by(cls.tab.desc(), cls.last_used.desc())
        )
        return session.execute(stmt).scalars().all()

    @classmethod
    def get_all_templates(cls, session: scoped_session) -> List["Playlists"]:
        """Returns a list of all templates ordered by name"""

        stmt = (
            select(cls)
            .filter(cls.is_template.is_(True))
            .order_by(cls.name)
        )
        return session.execute(stmt).scalars().all()

    @classmethod
    def get_closed(cls, session: scoped_session) -> List["Playlists"]:
        """
        Returns a list of all closed (no tab assigned), non-template
        playlists ordered by last use, most recent first
        """

        stmt = (
            select(cls)
            .filter(
                cls.tab.is_(None),
                cls.is_template.is_(False)
            )
            .order_by(cls.last_used.desc())
        )
        return session.execute(stmt).scalars().all()

    @classmethod
    def get_open(cls, session: scoped_session) -> List[Optional["Playlists"]]:
        """
        Return a list of loaded playlists ordered by tab order.
        """

        stmt = (
            select(cls)
            .where(cls.tab.is_not(None))
            .order_by(cls.tab)
        )
        return session.execute(stmt).scalars().all()

    def mark_open(self, session: scoped_session, tab_index: int) -> None:
        """
        Mark playlist as loaded and used now

        Only the object's attributes are changed; nothing is flushed or
        committed here.
        """

        self.tab = tab_index
        self.last_used = datetime.now()

    @staticmethod
    def move_tab(session: scoped_session, frm: int, to: int) -> None:
        """
        Move tabs: swap the tab indexes of the playlists currently at
        tabs 'frm' and 'to'

        scalar_one() raises if either tab index has no playlist. The final
        assignments are left for the caller to flush/commit.
        """

        row_frm = session.execute(
            select(Playlists)
            .filter_by(tab=frm)
        ).scalar_one()

        row_to = session.execute(
            select(Playlists)
            .filter_by(tab=to)
        ).scalar_one()

        # 'tab' is declared unique, so both tabs are cleared and committed
        # before the swapped values are assigned (presumably to avoid a
        # transient uniqueness clash - confirm against the database in use)
        row_frm.tab = None
        row_to.tab = None
        session.commit()

        row_to.tab = frm
        row_frm.tab = to

    @staticmethod
    def save_as_template(session: scoped_session,
                         playlist_id: int, template_name: str) -> None:
        """Save passed playlist as new template"""

        template = Playlists(session, template_name)
        if not template or not template.id:
            return

        template.is_template = True
        session.commit()

        PlaylistRows.copy_playlist(session, playlist_id, template.id)
|
|
|
|
|
|
class PlaylistRows(Base):
    """
    ORM model for the 'playlist_rows' table: one row of a playlist, which
    may reference a track and/or carry a note
    """

    __tablename__ = 'playlist_rows'

    id = Column(Integer, primary_key=True, autoincrement=True)
    row_number = Column(Integer, nullable=False)
    note = Column(String(2048), index=False)
    playlist_id = Column(Integer, ForeignKey('playlists.id'), nullable=False)
    playlist: Playlists = relationship(Playlists, back_populates="rows")
    # track_id is nullable: a row need not reference a track
    track_id = Column(Integer, ForeignKey('tracks.id'), nullable=True)
    track: "Tracks" = relationship("Tracks", back_populates="playlistrows")
    played = Column(Boolean, nullable=False, index=False, default=False)

    def __repr__(self) -> str:
        """Return a debugging representation of this playlist row"""

        return (
            f"<PlaylistRow(id={self.id}, playlist_id={self.playlist_id}, "
            f"track_id={self.track_id}, "
            f"note={self.note}, row_number={self.row_number})>"
        )

    def __init__(self,
                 session: scoped_session,
                 playlist_id: int,
                 track_id: Optional[int],
                 row_number: int,
                 note: Optional[str] = None
                 ) -> None:
        """
        Create PlaylistRows object

        track_id may be None for a row without a track. The object is
        added to the session and flushed, but not committed.
        """

        self.playlist_id = playlist_id
        self.track_id = track_id
        self.row_number = row_number
        self.note = note
        session.add(self)
        session.flush()

    def append_note(self, extra_note: str) -> None:
        """
        Append passed note to any existing note, separated by a newline
        """

        if self.note:
            self.note = self.note + '\n' + extra_note
        else:
            self.note = extra_note

    @staticmethod
    def copy_playlist(session: scoped_session,
                      src_id: int,
                      dst_id: int) -> None:
        """
        Copy playlist entries

        Each row of playlist src_id is duplicated into playlist dst_id
        with the same track, row number and note.
        """

        src_rows = session.execute(
            select(PlaylistRows)
            .filter(PlaylistRows.playlist_id == src_id)
        ).scalars().all()

        for src_row in src_rows:
            # The constructor adds the new row to the session
            PlaylistRows(session, dst_id, src_row.track_id,
                         src_row.row_number, src_row.note)

    @staticmethod
    def delete_plrids_not_in_list(session: scoped_session, playlist_id: int,
                                  plrids: List[int]) -> None:
        """
        Delete rows in given playlist whose id is not in the passed
        list of playlist row ids
        """

        session.execute(
            delete(PlaylistRows)
            .where(
                PlaylistRows.playlist_id == playlist_id,
                PlaylistRows.id.not_in(plrids)
            )
        )
        # Delete won't take effect until commit()
        session.commit()

    @staticmethod
    def fixup_rownumbers(session: scoped_session, playlist_id: int) -> None:
        """
        Ensure the row numbers for passed playlist have no gaps

        Rows keep their current relative order and are renumbered
        consecutively from zero.
        """

        ordered_rows = session.execute(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id)
            .order_by(PlaylistRows.row_number)
        ).scalars().all()

        for new_number, row in enumerate(ordered_rows):
            row.row_number = new_number

        # Ensure new row numbers are available to the caller
        session.commit()

    @staticmethod
    def get_track_plr(session: scoped_session, track_id: int,
                      playlist_id: int) -> Optional["PlaylistRows"]:
        """Return first matching PlaylistRows object or None"""

        return session.scalars(
            select(PlaylistRows)
            .where(
                PlaylistRows.track_id == track_id,
                PlaylistRows.playlist_id == playlist_id
            )
            .limit(1)
        ).first()

    @staticmethod
    def get_last_used_row(session: scoped_session,
                          playlist_id: int) -> Optional[int]:
        """Return the last used row for playlist, or None if no rows"""

        # max() over zero rows still yields one result row (containing
        # NULL), so scalar_one() returns None for an empty playlist
        return session.execute(
            select(func.max(PlaylistRows.row_number))
            .where(PlaylistRows.playlist_id == playlist_id)
        ).scalar_one()

    @classmethod
    def get_played_rows(cls, session: scoped_session,
                        playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return a list of playlist rows that
        have been played, ordered by row number.
        """

        stmt = (
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.played.is_(True)
            )
            .order_by(cls.row_number)
        )
        return session.execute(stmt).scalars().all()

    @classmethod
    def get_rows_with_tracks(cls, session: scoped_session,
                             playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return a list of playlist rows that
        contain tracks, ordered by row number.
        """

        stmt = (
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.track_id.is_not(None)
            )
            .order_by(cls.row_number)
        )
        return session.execute(stmt).scalars().all()

    @classmethod
    def get_unplayed_rows(cls, session: scoped_session,
                          playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return a list of playlist rows that
        contain a track and have not been played, ordered by row number.
        """

        stmt = (
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.track_id.is_not(None),
                cls.played.is_(False)
            )
            .order_by(cls.row_number)
        )
        return session.execute(stmt).scalars().all()

    @staticmethod
    def move_rows_down(session: scoped_session, playlist_id: int,
                       starting_row: int, move_by: int) -> None:
        """
        Create space to insert move_by additional rows by incrementing
        the row number of every row from starting_row to the end of the
        playlist
        """

        session.execute(
            update(PlaylistRows)
            .where(
                (PlaylistRows.playlist_id == playlist_id),
                (PlaylistRows.row_number >= starting_row)
            )
            .values(row_number=PlaylistRows.row_number + move_by)
        )

    @staticmethod
    def indexed_by_id(session: scoped_session, plr_ids: List[int]) -> dict:
        """
        Return a dictionary of playlist_rows indexed by their plr id from
        the passed plr_id list.
        """

        plrs = session.execute(
            select(PlaylistRows)
            .where(
                PlaylistRows.id.in_(plr_ids)
            )
        ).scalars().all()

        return {plr.id: plr for plr in plrs}
|
|
|
|
|
|
class Settings(Base):
    """
    Manage settings

    Each setting is a uniquely named row with nullable datetime, integer
    and string value fields.
    """

    __tablename__ = 'settings'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(64), nullable=False, unique=True)
    f_datetime = Column(DateTime, default=None, nullable=True)
    f_int: int = Column(Integer, default=None, nullable=True)
    f_string = Column(String(128), default=None, nullable=True)

    def __repr__(self) -> str:
        """Return a debugging representation showing the first set value"""

        # Pick the first field that is not None. Testing truthiness instead
        # would wrongly skip a stored 0 or empty string.
        candidates = (self.f_datetime, self.f_int, self.f_string)
        value = next((v for v in candidates if v is not None), None)
        return f"<Settings(id={self.id}, name={self.name}, {value=})>"

    def __init__(self, session: scoped_session, name: str):
        """
        Create a new, valueless setting called name

        The object is added to the session and flushed, but not committed.
        """

        self.name = name
        session.add(self)
        session.flush()

    @classmethod
    def get_int_settings(cls, session: scoped_session,
                         name: str) -> "Settings":
        """Get setting for an integer or return new setting record"""

        try:
            return session.execute(
                select(cls)
                .where(cls.name == name)
            ).scalar_one()

        except NoResultFound:
            # No such setting yet: create (and flush) an empty record
            return cls(session, name)

    def update(self, session: scoped_session, data: dict):
        """
        Set attributes of this setting from the key/value pairs in data,
        then flush the session

        Every key must name an existing attribute of this object.
        """

        for key, value in data.items():
            # NOTE(review): assert is stripped when Python runs with -O, so
            # this is a debug-time check only
            assert hasattr(self, key), f"Settings has no attribute {key!r}"
            setattr(self, key, value)
        session.flush()
|
|
|
|
|
|
class Tracks(Base):
    """
    ORM model for the 'tracks' table: one row per track, identified by a
    unique file path
    """

    __tablename__ = 'tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(256), index=True)
    artist = Column(String(256), index=True)
    duration = Column(Integer, index=True)
    start_gap = Column(Integer, index=False)
    fade_at = Column(Integer, index=False)
    silence_at = Column(Integer, index=False)
    path = Column(String(2048), index=False, nullable=False, unique=True)
    mtime = Column(Float, index=True)
    bitrate = Column(Integer, nullable=True, default=None)
    playlistrows: PlaylistRows = relationship("PlaylistRows",
                                              back_populates="track")
    playlists = association_proxy("playlistrows", "playlist")
    playdates: Playdates = relationship("Playdates", back_populates="track")

    def __repr__(self) -> str:
        """Return a debugging representation of this track"""

        return (
            f"<Track(id={self.id}, title={self.title}, "
            f"artist={self.artist}, path={self.path})>"
        )

    def __init__(
            self,
            session: scoped_session,
            path: str,
            title: Optional[str] = None,
            artist: Optional[str] = None,
            duration: int = 0,
            start_gap: int = 0,
            fade_at: Optional[int] = None,
            silence_at: Optional[int] = None,
            mtime: Optional[float] = None,
            lastplayed: Optional[datetime] = None,
    ) -> None:
        """
        Create a new track

        The object is added to the passed session and committed
        immediately.
        """

        self.path = path
        self.title = title
        self.artist = artist
        self.duration = duration
        self.start_gap = start_gap
        self.fade_at = fade_at
        self.silence_at = silence_at
        self.mtime = mtime
        # NB: this class declares no 'lastplayed' column, so this is kept
        # as a plain instance attribute only
        self.lastplayed = lastplayed

        session.add(self)
        session.commit()

    @classmethod
    def get_all(cls, session: scoped_session) -> List["Tracks"]:
        """Return a list of all tracks"""

        return session.execute(select(cls)).scalars().all()

    @classmethod
    def get_by_path(cls, session: scoped_session,
                    path: str) -> Optional["Tracks"]:
        """
        Return track with passed path, or None.
        """

        # scalar_one_or_none() returns None when nothing matches, and (like
        # scalar_one()) still raises if more than one row matches
        return session.execute(
            select(cls)
            .where(cls.path == path)
        ).scalar_one_or_none()

    @classmethod
    def search_artists(cls, session: scoped_session,
                       text: str) -> List["Tracks"]:
        """
        Search case-insensitively for artists containing text; results
        are ordered by title
        """

        stmt = (
            select(cls)
            .where(cls.artist.ilike(f"%{text}%"))
            .order_by(cls.title)
        )
        return session.execute(stmt).scalars().all()

    @classmethod
    def search_titles(cls, session: scoped_session,
                      text: str) -> List["Tracks"]:
        """
        Search case-insensitively for titles containing text; results
        are ordered by title
        """

        stmt = (
            select(cls)
            .where(cls.title.ilike(f"%{text}%"))
            .order_by(cls.title)
        )
        return session.execute(stmt).scalars().all()
|