1348 lines
37 KiB
Python
1348 lines
37 KiB
Python
# Standard library imports
|
|
import datetime as dt
|
|
import os
|
|
import re
|
|
import sys
|
|
|
|
# PyQt imports
|
|
|
|
# Third party imports
|
|
from dogpile.cache import make_region
|
|
from dogpile.cache.api import NO_VALUE
|
|
from sqlalchemy import (
|
|
delete,
|
|
func,
|
|
select,
|
|
update,
|
|
)
|
|
from sqlalchemy.orm import aliased
|
|
from sqlalchemy.orm.session import Session
|
|
from sqlalchemy.sql.elements import BinaryExpression, ColumnElement
|
|
|
|
# App imports
|
|
from classes import (
|
|
ApplicationError,
|
|
Filter,
|
|
NoteColoursDTO,
|
|
PlaydatesDTO,
|
|
PlaylistDTO,
|
|
PlaylistRowDTO,
|
|
QueryDTO,
|
|
TrackDTO,
|
|
)
|
|
from config import Config
|
|
from log import log, log_call
|
|
from dbtables import (
|
|
NoteColours,
|
|
Playdates,
|
|
PlaylistRows,
|
|
Playlists,
|
|
Queries,
|
|
Settings,
|
|
Tracks,
|
|
)
|
|
from dbmanager import DatabaseManager
|
|
|
|
# Establish database connection
# The connection string comes from the DATABASE_URL environment variable;
# importing this module fails immediately if it is not set.
DATABASE_URL = os.environ.get("DATABASE_URL")
if DATABASE_URL is None:
    raise ValueError("DATABASE_URL is undefined")
# Safety net: when the unittest module has been imported, refuse any
# connection string that does not mention sqlite.
if "unittest" in sys.modules and "sqlite" not in DATABASE_URL:
    raise ValueError("Unit tests running on non-Sqlite database")
# Module-wide database handle; every function below opens its sessions
# via db.Session().
db = DatabaseManager.get_instance(DATABASE_URL, engine_options=Config.ENGINE_OPTIONS).db

# Configure the dogpile cache region
cache_region = make_region().configure(
    "dogpile.cache.memory",  # Use in-memory caching for now (switch to Redis if needed)
    expiration_time=600,  # Cache expires after 10 minutes
)
|
|
|
|
|
|
# Helper functions
|
|
# @log_call
|
|
def _remove_substring_case_insensitive(parent_string: str, substring: str) -> str:
    """
    Remove all instances of substring from parent string, case insensitively.

    The first match is removed and the search is then restarted, repeating
    until nothing matches. Occurrences that only come into existence when
    the text either side of a removed match is joined are therefore removed
    as well.

    An empty substring removes nothing: parent_string is returned unchanged.
    """

    # An empty substring is "found" in every string and removing it never
    # changes anything, so the loop below would never terminate.
    if not substring:
        return parent_string

    # Match directly against the real string (rather than indexing it with
    # positions found in a lowercased copy) so that characters whose
    # lowercase form has a different length cannot misalign the slice.
    pattern = re.compile(re.escape(substring), re.IGNORECASE)

    result = parent_string
    while True:
        result, removed = pattern.subn("", result, count=1)
        if not removed:
            return result
|
|
|
|
|
|
# Notecolour functions
|
|
def _notecolours_all(session: Session) -> list[NoteColoursDTO]:
    """
    Return the enabled notecolour records as DTOs, sorted by their order
    column.

    The list is held in the dogpile cache region under a fixed key, so the
    database is only queried when the cache has no value for that key.
    """

    cache_key = "note_colours_all"

    cached = cache_region.get(cache_key)
    if cached is NO_VALUE:
        # Cache miss: load from the database and remember the result
        stmt = (
            select(NoteColours)
            .where(NoteColours.enabled.is_(True))
            .order_by(NoteColours.order)
        )
        cached = [
            NoteColoursDTO(
                notecolour_id=row.notecolour_id,
                substring=row.substring,
                colour=row.colour,
                enabled=row.enabled,
                foreground=row.foreground,
                is_regex=row.is_regex,
                is_casesensitive=row.is_casesensitive,
                order=row.order,
                strip_substring=row.strip_substring,
            )
            for row in session.scalars(stmt).all()
        ]
        cache_region.set(cache_key, cached)

    return cached
|
|
|
|
|
|
def _notecolors_get_notecolours_dto(text: str) -> tuple[NoteColoursDTO | None, str]:
    """
    Parse text and return the first matching colour record (or None when no
    record matches) together with the text to display.

    Records are tried in the order returned by _notecolours_all(). A regex
    record matches when its pattern matches at the start of text; a plain
    record matches when its substring occurs anywhere in text, compared
    case sensitively or not according to is_casesensitive.

    Every occurrence of the matched pattern/substring is removed from the
    returned text only when the record's strip_substring flag is set;
    otherwise text is returned unchanged. When nothing matches, the result
    is (None, text).
    """

    with db.Session() as session:
        for rec in _notecolours_all(session):
            pattern = None
            if rec.is_regex:
                flags = re.UNICODE
                if not rec.is_casesensitive:
                    flags |= re.IGNORECASE
                pattern = re.compile(rec.substring, flags)
                matched = pattern.match(text) is not None
            elif rec.is_casesensitive:
                matched = rec.substring in text
            else:
                matched = rec.substring.lower() in text.lower()

            if not matched:
                continue

            # Honour strip_substring for every kind of record, not only
            # for regex records.
            if not rec.strip_substring:
                return (rec, text)

            if pattern is not None:
                return (rec, pattern.sub("", text))
            if rec.is_casesensitive:
                return (rec, text.replace(rec.substring, ""))
            return (rec, _remove_substring_case_insensitive(text, rec.substring))

    return (None, text)
|
|
|
|
|
|
def notecolours_get_colour(text: str, foreground: bool = False) -> str:
    """
    Return the colour string of the first notecolour record matching text.

    The record's background colour is returned unless foreground is True,
    in which case its foreground colour is returned instead. An empty
    string is returned when no record matches, or when the foreground is
    requested and the matching record has none.
    """

    rec, _ = _notecolors_get_notecolours_dto(text)

    if rec is None:
        return ""
    if not foreground:
        return rec.colour
    return rec.foreground or ""
|
|
|
|
|
|
# @log_call
|
|
def notecolours_remove_colour_substring(text: str) -> str:
    """
    Return the text part of the notecolour lookup for text, i.e. text with
    the colour-identifying substring removed where the lookup strips it.
    """

    # Only the text half of the (record, text) pair is of interest here
    _, stripped_text = _notecolors_get_notecolours_dto(text)
    return stripped_text
|
|
|
|
|
|
# Track functions
|
|
# @log_call
|
|
def _tracks_where(
    query: BinaryExpression | ColumnElement[bool],
) -> list[TrackDTO]:
    """
    Return tracks selected by query

    query is used as the WHERE clause of a select on Tracks. Each DTO's
    lastplayed is the latest Playdates.lastplayed for the track, or None
    for a track with no playdates (the playdates are outer-joined).
    """

    # Alias Playdates for use in the subquery
    playdate_alias = aliased(Playdates)

    # One row per track holding its most recent playdate
    latest_play = (
        select(
            playdate_alias.track_id,
            func.max(playdate_alias.lastplayed).label("lastplayed"),
        )
        .group_by(playdate_alias.track_id)
        .subquery()
    )

    track_columns = (
        Tracks.track_id,
        Tracks.artist,
        Tracks.bitrate,
        Tracks.duration,
        Tracks.fade_at,
        Tracks.intro,
        Tracks.path,
        Tracks.silence_at,
        Tracks.start_gap,
        Tracks.title,
    )
    stmt = (
        select(*track_columns, latest_play.c.lastplayed)
        .outerjoin(latest_play, Tracks.track_id == latest_play.c.track_id)
        .where(query)
    )

    with db.Session() as session:
        rows = session.execute(stmt).all()
        return [
            TrackDTO(
                artist=row.artist,
                bitrate=row.bitrate,
                duration=row.duration,
                fade_at=row.fade_at,
                intro=row.intro,
                lastplayed=row.lastplayed,
                path=row.path,
                silence_at=row.silence_at,
                start_gap=row.start_gap,
                title=row.title,
                track_id=row.track_id,
            )
            for row in rows
        ]
|
|
|
|
|
|
# @log_call
|
|
def track_add_to_header(playlistrow_id: int, track_id: int) -> None:
    """
    Set the track of the playlist row identified by playlistrow_id (a
    header row) to track_id.
    """

    stmt = (
        update(PlaylistRows)
        .where(PlaylistRows.playlistrow_id == playlistrow_id)
        .values(track_id=track_id)
    )
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
|
|
|
|
|
|
def tracks_all() -> list[TrackDTO]:
    """Return a DTO for every track with a positive track_id"""

    every_track = Tracks.track_id > 0
    return _tracks_where(every_track)
|
|
|
|
|
|
def tracks_by_artist(filter_str: str) -> list[TrackDTO]:
    """
    Return the tracks whose artist contains filter_str, compared case
    insensitively (SQL ILIKE with filter_str wrapped in % wildcards).
    """

    artist_matches = Tracks.artist.ilike(f"%{filter_str}%")
    return _tracks_where(artist_matches)
|
|
|
|
|
|
def track_by_id(track_id: int) -> TrackDTO | None:
    """
    Return the track with the given id, or None if there isn't one.

    Raises ApplicationError if more than one track has that id.
    """

    matches = _tracks_where(Tracks.track_id == track_id)
    if len(matches) > 1:
        raise ApplicationError(f"Duplicate {track_id=}")
    return matches[0] if matches else None
|
|
|
|
|
|
def track_by_path(path: str) -> TrackDTO | None:
    """
    Return track with passed path or None

    The comparison is case insensitive but otherwise exact. Raises
    ApplicationError if more than one track has the path.
    """

    # Compare lowercased values for equality rather than using ilike():
    # LIKE would treat any "_" or "%" in the path (both common in file
    # names) as wildcards, matching unrelated tracks.
    track_list = _tracks_where(func.lower(Tracks.path) == func.lower(path))
    if not track_list:
        return None
    if len(track_list) > 1:
        raise ApplicationError(f"Duplicate {path=}")
    return track_list[0]
|
|
|
|
|
|
def tracks_by_title(filter_str: str) -> list[TrackDTO]:
    """
    Return the tracks whose title contains filter_str, compared case
    insensitively (SQL ILIKE with filter_str wrapped in % wildcards).
    """

    title_matches = Tracks.title.ilike(f"%{filter_str}%")
    return _tracks_where(title_matches)
|
|
|
|
|
|
# @log_call
|
|
def track_create(metadata: dict[str, str | int | float]) -> TrackDTO:
    """
    Create a track db entry from the passed metadata and return the DTO

    metadata must hold the keys path, title, artist, duration, start_gap,
    fade_at, silence_at and bitrate.

    Raises ApplicationError if the track can't be created (the original
    exception is attached as the cause) or can't be read back afterwards.
    """

    with db.Session() as session:
        try:
            track = Tracks(
                session=session,
                path=str(metadata["path"]),
                title=str(metadata["title"]),
                artist=str(metadata["artist"]),
                duration=int(metadata["duration"]),
                start_gap=int(metadata["start_gap"]),
                fade_at=int(metadata["fade_at"]),
                silence_at=int(metadata["silence_at"]),
                bitrate=int(metadata["bitrate"]),
            )

            track_id = track.track_id
            session.commit()
        except Exception as err:
            # Keep the underlying error (missing key, bad value, database
            # failure) attached so the real reason isn't lost.
            raise ApplicationError("Can't create Track") from err

    new_track = track_by_id(track_id)
    if not new_track:
        raise ApplicationError("Unable to create new track")

    return new_track
|
|
|
|
|
|
def track_delete(track_id: int) -> None:
    """
    Delete track

    Raises ApplicationError if there is no track with the passed id.
    """

    with db.Session() as session:
        track = session.get(Tracks, track_id)
        # session.get() returns None for an unknown id; report that the same
        # way track_update() does rather than handing None to delete().
        if track is None:
            raise ApplicationError(f"Can't retrieve Track ({track_id=})")
        session.delete(track)
        session.commit()
|
|
|
|
|
|
def tracks_filtered(filter: Filter) -> list[TrackDTO]:
    """
    Return tracks matching filter

    The filter is applied in three parts:
      - path: when filter.path is set, keep tracks whose path contains it
        (path_type "contains") or does not contain it ("excluding")
      - duration: keep tracks at least (LONGER) or at most (SHORTER)
        filter.duration_number seconds or minutes long
      - last played: keep either tracks that have never been played, or
        tracks whose most recent play is before a cutoff (now for ANYTIME,
        otherwise now minus filter.last_played_number days, weeks, months
        or years, with months taken as 30 days and years as 365)

    Raises ApplicationError for an unrecognised path type, duration unit or
    duration type.
    """

    query = select(Tracks)

    # Path specification
    if filter.path:
        if filter.path_type == "contains":
            query = query.where(Tracks.path.ilike(f"%{filter.path}%"))
        elif filter.path_type == "excluding":
            query = query.where(Tracks.path.notilike(f"%{filter.path}%"))
        else:
            raise ApplicationError(f"Can't process filter path ({filter=})")

    # Duration specification
    seconds_duration = filter.duration_number
    if filter.duration_unit == Config.FILTER_DURATION_MINUTES:
        seconds_duration *= 60
    elif filter.duration_unit != Config.FILTER_DURATION_SECONDS:
        raise ApplicationError(f"Can't process filter duration ({filter=})")

    # The comparison direction is held in duration_type (not duration_unit)
    if filter.duration_type == Config.FILTER_DURATION_LONGER:
        query = query.where(Tracks.duration >= seconds_duration)
    elif filter.duration_type == Config.FILTER_DURATION_SHORTER:
        query = query.where(Tracks.duration <= seconds_duration)
    else:
        raise ApplicationError(f"Can't process filter duration type ({filter=})")

    # Process comparator
    if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER:
        # Select tracks that have never been played
        query = query.outerjoin(Playdates, Tracks.track_id == Playdates.track_id).where(
            Playdates.playdate_id.is_(None)
        )
    else:
        # Last played specification
        now = dt.datetime.now()
        # Set sensible default, and correct for Config.FILTER_PLAYED_COMPARATOR_ANYTIME
        before = now
        # If not ANYTIME, set 'before' appropriately
        if filter.last_played_comparator != Config.FILTER_PLAYED_COMPARATOR_ANYTIME:
            if filter.last_played_unit == Config.FILTER_PLAYED_DAYS:
                before = now - dt.timedelta(days=filter.last_played_number)
            elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS:
                before = now - dt.timedelta(days=7 * filter.last_played_number)
            elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS:
                before = now - dt.timedelta(days=30 * filter.last_played_number)
            elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS:
                before = now - dt.timedelta(days=365 * filter.last_played_number)

        # Most recent playdate per track; the inner join below means only
        # tracks with at least one playdate are returned on this branch.
        subquery = (
            select(
                Playdates.track_id,
                func.max(Playdates.lastplayed).label("max_last_played"),
            )
            .group_by(Playdates.track_id)
            .subquery()
        )
        query = query.join(subquery, Tracks.track_id == subquery.c.track_id).where(
            subquery.c.max_last_played < before
        )

    results: list[TrackDTO] = []
    with db.Session() as session:
        records = session.scalars(query).unique().all()
        for record in records:
            # Report the most recent play whatever order the playdates
            # relationship loads in; None when the track has no playdates.
            last_played = max(
                (
                    playdate.lastplayed
                    for playdate in record.playdates
                    if playdate.lastplayed is not None
                ),
                default=None,
            )
            dto = TrackDTO(
                artist=record.artist,
                bitrate=record.bitrate,
                duration=record.duration,
                fade_at=record.fade_at,
                intro=record.intro,
                lastplayed=last_played,
                path=record.path,
                silence_at=record.silence_at,
                start_gap=record.start_gap,
                title=record.title,
                track_id=record.track_id,
            )
            results.append(dto)

    return results
|
|
|
|
|
|
# @log_call
|
|
def track_update(track_id: int, metadata: dict[str, str | int | float]) -> TrackDTO:
    """
    Update an existing track db entry and return the DTO

    Each key in metadata must name an existing attribute of the track; its
    value is assigned to that attribute.

    Raises ApplicationError if the track can't be retrieved, if a key is
    not an attribute of the track, or if the updated track can't be read
    back.
    """

    with db.Session() as session:
        track = session.get(Tracks, track_id)
        if not track:
            raise ApplicationError(f"Can't retrieve Track ({track_id=})")

        for key, value in metadata.items():
            # Guard clause: refuse attributes the track doesn't have
            if not hasattr(track, key):
                raise ApplicationError(f"Tried to set attribute {key} on {track}")
            setattr(track, key, value)

        session.commit()

    refreshed = track_by_id(track_id)
    if refreshed:
        return refreshed
    raise ApplicationError("Unable to retrieve updated track")
|
|
|
|
|
|
# Playlist functions
|
|
def _playlist_check_playlist(
    session: Session, playlist_id: int, fix: bool = False
) -> None:
    """
    Check that the playlist's row numbers run 0, 1, 2, ... without gaps.

    With fix=True each wrong row number is logged at debug level and
    corrected, and the session is committed if anything was changed. With
    fix=False the first wrong row number raises ApplicationError.
    """

    stmt = (
        select(PlaylistRows)
        .where(PlaylistRows.playlist_id == playlist_id)
        .order_by(PlaylistRows.row_number)
    )
    playlist_rows = session.scalars(stmt).all()

    fixed = False
    for idx, plr in enumerate(playlist_rows):
        if plr.row_number != idx:
            msg = (
                "_check_playlist_integrity: incorrect row number "
                f"({plr.playlistrow_id=}, {plr.row_number=}, {idx=})"
            )
            if not fix:
                raise ApplicationError(msg)

            log.debug(msg)
            plr.row_number = idx
            fixed = True

    # Only touch the database if something was actually renumbered
    if fixed:
        session.commit()
|
|
|
|
|
|
# @log_call
|
|
def _playlist_shift_rows(
    session: Session, playlist_id: int, starting_row: int, shift_by: int
) -> None:
    """
    Add shift_by to the row number of every row of the playlist from
    starting_row onwards: positive values move rows down, negative values
    move them up.

    The change is executed on the passed session but not committed here.
    """

    in_playlist = PlaylistRows.playlist_id == playlist_id
    at_or_after_start = PlaylistRows.row_number >= starting_row

    stmt = (
        update(PlaylistRows)
        .where(in_playlist, at_or_after_start)
        .values(row_number=PlaylistRows.row_number + shift_by)
    )
    session.execute(stmt)
|
|
|
|
|
|
# @log_call
|
|
def _playlists_where(
    query: BinaryExpression | ColumnElement[bool],
) -> list[PlaylistDTO]:
    """
    Return playlists selected by query, ordered by their tab column

    query is used as the WHERE clause of a select on Playlists.
    """

    stmt = (
        select(
            Playlists.favourite,
            Playlists.is_template,
            Playlists.playlist_id,
            Playlists.name,
            Playlists.open,
        )
        .where(query)
        .order_by(Playlists.tab)
    )

    with db.Session() as session:
        rows = session.execute(stmt).all()
        return [
            PlaylistDTO(
                favourite=row.favourite,
                is_template=row.is_template,
                playlist_id=row.playlist_id,
                name=row.name,
                open=row.open,
            )
            for row in rows
        ]
|
|
|
|
|
|
def playlists_all() -> list[PlaylistDTO]:
    """Return a DTO for every playlist with a positive playlist_id"""

    every_playlist = Playlists.playlist_id > 0
    return _playlists_where(every_playlist)
|
|
|
|
|
|
# @log_call
|
|
def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
    """
    Return the playlist with the given id, or None if there isn't one.

    Raises ApplicationError if more than one playlist has that id.
    """

    matches = _playlists_where(Playlists.playlist_id == playlist_id)
    if len(matches) > 1:
        raise ApplicationError(f"Duplicate {playlist_id=}")
    return matches[0] if matches else None
|
|
|
|
|
|
def playlist_copy(src_id: int, dst_id: int) -> None:
    """
    Copy every row of playlist src_id into playlist dst_id, keeping each
    row's row number, note and track.
    """

    source_rows = select(PlaylistRows).where(PlaylistRows.playlist_id == src_id)

    with db.Session() as session:
        for source_row in session.scalars(source_rows).all():
            # Constructing the row with the session is all that is needed
            # here; the new object itself isn't used afterwards.
            PlaylistRows(
                session=session,
                playlist_id=dst_id,
                row_number=source_row.row_number,
                note=source_row.note,
                track_id=source_row.track_id,
            )

        session.commit()
|
|
|
|
|
|
def playlists_closed() -> list[PlaylistDTO]:
    """
    Return the playlists whose open flag is False
    """

    is_closed = Playlists.open.is_(False)
    return _playlists_where(is_closed)
|
|
|
|
|
|
# @log_call
|
|
def playlist_create(
    name: str, template_id: int, as_template: bool = False
) -> PlaylistDTO:
    """
    Create playlist and return DTO.

    When template_id is non-zero the rows of that playlist are copied into
    the new one. as_template sets the new playlist's is_template flag.

    Raises ApplicationError if the playlist can't be created (the original
    exception is attached as the cause) or can't be read back afterwards.
    """

    with db.Session() as session:
        try:
            playlist = Playlists(session, name, template_id)
            playlist.is_template = as_template
            playlist_id = playlist.playlist_id
            session.commit()
        except Exception as err:
            # Keep the underlying error attached so the real reason for the
            # failure isn't lost.
            raise ApplicationError("Can't create Playlist") from err

    if template_id != 0:
        playlist_copy(template_id, playlist_id)

    new_playlist = playlist_by_id(playlist_id)
    if not new_playlist:
        raise ApplicationError("Can't retrieve new Playlist")

    return new_playlist
|
|
|
|
|
|
def playlist_delete(playlist_id: int) -> None:
    """
    Delete playlist

    Raises ApplicationError if there is no playlist with the passed id.
    """

    with db.Session() as session:
        playlist = session.get(Playlists, playlist_id)
        # session.get() returns None for an unknown id; raise a clear error
        # rather than handing None to delete().
        if playlist is None:
            raise ApplicationError(f"Can't retrieve Playlist ({playlist_id=})")
        session.delete(playlist)
        session.commit()
|
|
|
|
|
|
# @log_call
|
|
def playlist_insert_row(
    playlist_id: int, row_number: int, track_id: int | None, note: str
) -> PlaylistRowDTO:
    """
    Insert a new row at row_number in the playlist and return its DTO.

    Rows at or after row_number are moved down by one to make room. The
    playlist's row numbering is checked before and after the insert.

    Raises ApplicationError if a numbering check fails or the new row
    can't be read back.
    """

    with db.Session() as session:
        # Row numbers must be contiguous before we start
        _playlist_check_playlist(session, playlist_id, fix=False)

        # Open a gap at row_number
        _playlist_shift_rows(
            session=session,
            playlist_id=playlist_id,
            starting_row=row_number,
            shift_by=1,
        )

        inserted = PlaylistRows(
            session=session,
            playlist_id=playlist_id,
            row_number=row_number,
            note=note,
            track_id=track_id,
        )
        session.commit()
        inserted_id = inserted.playlistrow_id

        # ...and must still be contiguous afterwards
        _playlist_check_playlist(session, playlist_id, fix=False)

    row_dto = playlistrow_by_id(playlistrow_id=inserted_id)
    if row_dto:
        return row_dto
    raise ApplicationError("Can't retrieve new playlist row")
|
|
|
|
|
|
# @log_call
|
|
def playlist_mark_status(playlist_id: int, open: bool) -> None:
    """Set the playlist's open flag to the passed value"""

    stmt = (
        update(Playlists)
        .where(Playlists.playlist_id == playlist_id)
        .values(open=open)
    )
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
|
|
|
|
|
|
# @log_call
|
|
def playlist_move_rows(
    from_rows: list[int],
    from_playlist_id: int,
    to_row: int,
    to_playlist_id: int | None = None,
) -> None:
    """
    Move rows, dispatching to the within-playlist or the between-playlists
    helper.

    The move is within one playlist when to_playlist_id is None or equal
    to from_playlist_id.
    """

    same_playlist = to_playlist_id is None or to_playlist_id == from_playlist_id

    if not same_playlist:
        _playlist_move_rows_between_playlists(
            from_rows, from_playlist_id, to_row, to_playlist_id
        )
        return

    _playlist_move_rows_within_playlist(from_rows, from_playlist_id, to_row)
|
|
|
|
|
|
def _playlist_move_rows_between_playlists(
    from_rows: list[int],
    from_playlist_id: int,
    to_row: int,
    to_playlist_id: int,
) -> None:
    """
    Move rows between playlists.

    from_rows are row numbers in the source playlist; to_row is the row
    number in the destination playlist at which the first moved row is
    placed. The two numberings are independent, so to_row may equal one of
    from_rows.

    Algorithm:
        - Sanity check row numbers in both playlists
        - Make space for moved rows in the destination
        - Move the rows, giving them consecutive row numbers from to_row
        - Resequence remaining row numbers in the source
        - Sanity check row numbers in the destination
    """

    # Nothing to move
    if not from_rows:
        return

    with db.Session() as session:
        # Sanity check row numbers
        _playlist_check_playlist(session, from_playlist_id, fix=False)
        _playlist_check_playlist(session, to_playlist_id, fix=False)

        # Make space in destination playlist
        _playlist_shift_rows(session, to_playlist_id, to_row, len(from_rows))

        # Build the list of changes to make. Row number and playlist are
        # changed together so that a moved row never briefly carries its
        # new row number while still in the source playlist.
        old_row_to_id = _playlist_rows_to_id(from_playlist_id)
        update_list: list[dict[str, int]] = [
            {
                "id": old_row_to_id[from_row],
                "row_number": to_row + offset,
                "playlist_id": to_playlist_id,
            }
            for offset, from_row in enumerate(from_rows)
        ]

        # Update database
        session.execute(update(PlaylistRows), update_list)
        session.commit()

        # Resequence row numbers in source
        _playlist_check_playlist(session, from_playlist_id, fix=True)
        # Sanity check destination
        _playlist_check_playlist(session, to_playlist_id, fix=False)
|
|
|
|
|
|
def _playlist_rows_to_id(playlist_id: int) -> dict[int, int]:
    """
    Map each row number of the passed playlist to the playlistrow_id of
    the row at that position.
    """

    row_to_id: dict[int, int] = {}
    for row in playlistrows_by_playlist(playlist_id):
        row_to_id[row.row_number] = row.playlistrow_id

    return row_to_id
|
|
|
|
|
|
# @log_call
|
|
def _playlist_move_rows_within_playlist(
    from_rows: list[int],
    from_playlist_id: int,
    to_row: int,
) -> None:
    """
    Move rows within playlists.

    The rows numbered from_rows are re-inserted, in the order given,
    immediately before the row currently numbered to_row.

    Algorithm:
        - Sanity checks
        - Create a list of row numbers in the new order
        - Update the database with the new order
        - Sanity check row numbers

    If to_row is itself one of from_rows an error is logged and nothing is
    moved. Raises ApplicationError if to_row is not a row of the playlist.
    """

    # Sanity check destination not being moved
    if to_row in from_rows:
        log.error(f"ds._playlist_move_rows_within_playlist: {to_row=} in {from_rows=}")
        return

    with db.Session() as session:
        # Sanity check row numbers
        _playlist_check_playlist(session, from_playlist_id, fix=False)

        # Load the playlist rows once; both the playlist length and the
        # row number -> playlistrow_id mapping come from this list.
        current_rows = playlistrows_by_playlist(from_playlist_id)

        # Create a list showing the new order of rows in playlist
        # Start with a list of rows excluding those to be moved
        moving = set(from_rows)
        new_row_order = [a for a in range(len(current_rows)) if a not in moving]
        # Insert the moved row numbers
        try:
            idx = new_row_order.index(to_row)
        except ValueError as err:
            raise ApplicationError(
                f"Can't find {to_row=} in {new_row_order=}"
            ) from err
        new_row_order[idx:idx] = from_rows

        # Build a dictionary of {old_row_number: new_row_number} where
        # they differ
        row_changes = {old: new for new, old in enumerate(new_row_order) if old != new}

        # The rows may already be where they were asked to go (e.g. a row
        # dropped immediately below itself); don't execute an UPDATE with
        # an empty parameter list.
        if not row_changes:
            return

        # Build a list of changes to make
        old_row_to_id = {p.row_number: p.playlistrow_id for p in current_rows}
        update_list: list[dict[str, int]] = [
            {"id": old_row_to_id[old_row], "row_number": new_row}
            for old_row, new_row in row_changes.items()
        ]

        # Update database
        session.execute(update(PlaylistRows), update_list)
        session.commit()

        # Sanity check row numbers
        _playlist_check_playlist(session, from_playlist_id, fix=False)
|
|
|
|
|
|
def playlists_open() -> list[PlaylistDTO]:
    """
    Return the playlists whose open flag is True
    """

    is_open = Playlists.open.is_(True)
    return _playlists_where(is_open)
|
|
|
|
|
|
def playlist_rename(playlist_id: int, new_name: str) -> None:
    """
    Set the name of the playlist with the passed id to new_name
    """

    stmt = (
        update(Playlists)
        .where(Playlists.playlist_id == playlist_id)
        .values(name=new_name)
    )
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
|
|
|
|
|
|
def playlist_row_count(playlist_id: int) -> int:
    """
    Count the rows belonging to the passed playlist
    """

    count_stmt = (
        select(func.count())
        .select_from(PlaylistRows)
        .where(PlaylistRows.playlist_id == playlist_id)
    )
    with db.Session() as session:
        return session.scalar(count_stmt)
|
|
|
|
|
|
def playlist_save_as_template(playlist_id: int, template_name: str) -> None:
    """
    Save playlist as a template

    A new, empty template playlist called template_name is created and the
    rows of playlist_id are copied into it.
    """

    template = playlist_create(template_name, 0, as_template=True)
    playlist_copy(playlist_id, template.playlist_id)
|
|
|
|
|
|
def playlists_templates_all() -> list[PlaylistDTO]:
    """
    Return the playlists whose is_template flag is True
    """

    is_template = Playlists.is_template.is_(True)
    return _playlists_where(is_template)
|
|
|
|
|
|
def playlists_template_by_id(playlist_id: int) -> PlaylistDTO | None:
    """
    Return the template with the given playlist id, or None if no playlist
    has that id.

    Raises ApplicationError if more than one playlist has the id, or if
    the playlist found is not a template.
    """

    matches = _playlists_where(Playlists.playlist_id == playlist_id)

    if len(matches) > 1:
        raise ApplicationError(f"Duplicate {playlist_id=}")
    if not matches:
        return None

    (candidate,) = matches
    if candidate.is_template is False:
        raise ApplicationError(f"Playlist {playlist_id=} is not a template")

    return candidate
|
|
|
|
|
|
# @log_call
|
|
def playlist_update_row_numbers(
    playlist_id: int, id_to_row_number: list[dict[int, int]]
) -> None:
    """
    Apply the passed list of playlistrow updates as a single bulk update
    and commit it.

    playlist_id is not used for the update itself, only to check the
    playlist's row numbering afterwards (ApplicationError is raised if the
    row numbers are not contiguous).
    """

    bulk_update = update(PlaylistRows)

    with db.Session() as session:
        session.execute(bulk_update, id_to_row_number)
        session.commit()

        # The caller-supplied numbers must leave the playlist contiguous
        _playlist_check_playlist(session, playlist_id, fix=False)
|
|
|
|
|
|
# @log_call
|
|
def playlist_remove_comments(playlist_id: int, row_numbers: list[int]) -> None:
    """
    Set the note to an empty string on the rows of the playlist whose row
    number is in row_numbers
    """

    in_playlist = PlaylistRows.playlist_id == playlist_id
    is_selected_row = PlaylistRows.row_number.in_(row_numbers)

    stmt = update(PlaylistRows).where(in_playlist, is_selected_row).values(note="")

    with db.Session() as session:
        session.execute(stmt)
        session.commit()
|
|
|
|
|
|
# @log_call
|
|
def playlist_remove_rows(playlist_id: int, row_numbers: list[int]) -> None:
    """
    Remove rows from playlist

    Delete from highest row back so that not yet deleted row numbers don't
    change. The remaining rows are then renumbered to close any gaps and
    the changes are committed.
    """

    with db.Session() as session:
        for row_number in sorted(row_numbers, reverse=True):
            session.execute(
                delete(PlaylistRows).where(
                    PlaylistRows.playlist_id == playlist_id,
                    PlaylistRows.row_number == row_number,
                )
            )
        # Fixup row number to remove gaps
        _playlist_check_playlist(session, playlist_id, fix=True)

        # _playlist_check_playlist() only commits when it had to renumber
        # something. Removing rows from the end of a playlist leaves no gap,
        # so commit here to make sure the deletions are always saved.
        session.commit()
|
|
|
|
|
|
# @log_call
|
|
def playlist_save_tabs(playlist_id_to_tab: dict[int, int]) -> None:
    """
    Save the tab numbers of the open playlists.

    playlist_id_to_tab maps playlist_id to tab number. The tab of every
    listed playlist is first set to None and then to its new value, all in
    one commit.
    """

    listed_ids = playlist_id_to_tab.keys()

    with db.Session() as session:
        # Clear the existing tab numbers of the listed playlists
        clear_stmt = (
            update(Playlists)
            .where(Playlists.playlist_id.in_(listed_ids))
            .values(tab=None)
        )
        session.execute(clear_stmt)

        # Then assign each playlist its new tab
        for playlist_id, tab in playlist_id_to_tab.items():
            assign_stmt = (
                update(Playlists)
                .where(Playlists.playlist_id == playlist_id)
                .values(tab=tab)
            )
            session.execute(assign_stmt)

        session.commit()
|
|
|
|
|
|
# @log_call
|
|
def playlist_update_template_favourite(template_id: int, favourite: bool) -> None:
    """Set the favourite flag of the playlist with id template_id"""

    stmt = (
        update(Playlists)
        .where(Playlists.playlist_id == template_id)
        .values(favourite=favourite)
    )
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
|
|
|
|
|
|
# Playlist Rows
|
|
|
|
|
|
# @log_call
|
|
def playlistrow_by_id(playlistrow_id: int) -> PlaylistRowDTO | None:
    """
    Return the DTO for the playlist row with the passed id, or None if
    there is no such row.

    The DTO's track is the row's track DTO, or None when the row has no
    track_id.
    """

    stmt = select(PlaylistRows).where(PlaylistRows.playlistrow_id == playlistrow_id)

    with db.Session() as session:
        row = session.scalars(stmt).one_or_none()
        if not row:
            return None

        # Rows without a track (e.g. headers) carry no track DTO
        track = track_by_id(row.track_id) if row.track_id else None

        return PlaylistRowDTO(
            note=row.note,
            played=row.played,
            playlist_id=row.playlist_id,
            playlistrow_id=row.playlistrow_id,
            row_number=row.row_number,
            track=track,
        )
|
|
|
|
|
|
def playlistrows_by_playlist(
    playlist_id: int, check_playlist_itegrity: bool = True
) -> list[PlaylistRowDTO]:
    """
    Return DTOs for all rows of the passed playlist, in row number order.

    Unless check_playlist_itegrity is False, the playlist's row numbering
    is checked first and ApplicationError is raised if it is not
    contiguous. Each DTO's track is the row's track DTO, or None when the
    row has no track_id.
    """

    stmt = (
        select(PlaylistRows)
        .where(PlaylistRows.playlist_id == playlist_id)
        .order_by(PlaylistRows.row_number)
    )

    with db.Session() as session:
        # TODO: would be good to be confident at removing this
        if check_playlist_itegrity:
            _playlist_check_playlist(
                session=session, playlist_id=playlist_id, fix=False
            )

        return [
            PlaylistRowDTO(
                note=row.note,
                played=row.played,
                playlist_id=row.playlist_id,
                playlistrow_id=row.playlistrow_id,
                row_number=row.row_number,
                track=track_by_id(row.track_id) if row.track_id else None,
            )
            for row in session.scalars(stmt).all()
        ]
|
|
|
|
|
|
def playlistrow_update_note(playlistrow_id: int, note: str) -> PlaylistRowDTO:
    """
    Replace the note on a playlist row and return the row's updated DTO.

    Raises ApplicationError if the row can't be retrieved before or after
    the update.
    """

    with db.Session() as session:
        plr = session.get(PlaylistRows, playlistrow_id)
        if not plr:
            raise ApplicationError(f"Can't retrieve Playlistrow ({playlistrow_id=})")

        plr.note = note
        session.commit()

    refreshed = playlistrow_by_id(playlistrow_id)
    if refreshed:
        return refreshed
    raise ApplicationError(f"Can't retrieve new Playlistrow ({playlistrow_id=})")
|
|
|
|
|
|
def playlistrow_played(playlistrow_id: int, status: bool) -> None:
    """Set the played flag of the passed playlist row to status"""

    stmt = (
        update(PlaylistRows)
        .where(PlaylistRows.playlistrow_id == playlistrow_id)
        .values(played=status)
    )
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
|
|
|
|
|
|
# Playdates
|
|
# @log_call
|
|
def playdates_get_last(track_id: int, limit: int = 5) -> str:
    """
    Return up to 'limit' of the most recent dates on which this track was
    played, newest first, as one string.

    Each date is formatted with Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT and
    the dates are joined with "<br>".
    """

    stmt = (
        Playdates.select()
        .where(Playdates.track_id == track_id)
        .order_by(Playdates.lastplayed.desc())
        .limit(limit)
    )

    with db.Session() as session:
        recent = session.scalars(stmt).all()

        date_format = Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT
        return "<br>".join(p.lastplayed.strftime(date_format) for p in recent)
|
|
|
|
|
|
def playdates_update(track_id: int, when: dt.datetime | None = None) -> None:
    """
    Update playdates for passed track

    A new playdate is recorded for the track at 'when', or at the current
    time if 'when' is not given.
    """

    if not when:
        when = dt.datetime.now()

    with db.Session() as session:
        _ = Playdates(session, track_id, when)
        # Commit explicitly, as every other writer in this module does.
        # NOTE(review): the Playdates constructor isn't visible from here;
        # if it already commits, this is a harmless no-op.
        session.commit()
|
|
|
|
|
|
def playdates_between_dates(
    start: dt.datetime, end: dt.datetime | None = None
) -> list[PlaydatesDTO]:
    """
    Return a PlaydatesDTO for each playdate whose lastplayed lies between
    start and end inclusive (end defaults to now).

    Each DTO combines the playdate's own fields with the details of the
    track that was played.
    """

    if end is None:
        end = dt.datetime.now()

    in_range = (Playdates.lastplayed >= start, Playdates.lastplayed <= end)
    stmt = select(
        Playdates.playdate_id,
        Playdates.lastplayed,
        Playdates.track_id,
        Playdates.track,
    ).where(*in_range)

    with db.Session() as session:
        return [
            PlaydatesDTO(
                playdate_id=row.playdate_id,
                lastplayed=row.lastplayed,
                track_id=row.track_id,
                artist=row.track.artist,
                bitrate=row.track.bitrate,
                duration=row.track.duration,
                fade_at=row.track.fade_at,
                intro=row.track.intro,
                path=row.track.path,
                silence_at=row.track.silence_at,
                start_gap=row.track.start_gap,
                title=row.track.title,
            )
            for row in session.execute(stmt).all()
        ]
|
|
|
|
|
|
# Queries
|
|
# @log_call
|
|
def _queries_where(
    query: BinaryExpression | ColumnElement[bool],
) -> list[QueryDTO]:
    """
    Return the stored queries selected by query

    query is used as the WHERE clause of a select on Queries.
    """

    stmt = select(Queries).where(query)

    with db.Session() as session:
        return [
            QueryDTO(
                favourite=row.favourite,
                filter=row.filter,
                name=row.name,
                query_id=row.query_id,
            )
            for row in session.scalars(stmt).all()
        ]
|
|
|
|
|
|
def queries_all(favourites_only: bool = False) -> list[QueryDTO]:
    """
    Return a list of all queries, or of only the queries flagged as
    favourite when favourites_only is True
    """

    query = Queries.query_id > 0
    if favourites_only:
        # Narrow the selection to favourites
        query = query & Queries.favourite.is_(True)
    return _queries_where(query)
|
|
|
|
|
|
def query_by_id(query_id: int) -> QueryDTO | None:
    """
    Return the stored query with the given id, or None if there isn't one.

    Raises ApplicationError if more than one query has that id.
    """

    matches = _queries_where(Queries.query_id == query_id)
    if len(matches) > 1:
        raise ApplicationError(f"Duplicate {query_id=}")
    return matches[0] if matches else None
|
|
|
|
|
|
def query_create(name: str, filter: Filter) -> QueryDTO:
    """
    Create a query and return the DTO

    Raises ApplicationError if the query can't be created (the original
    exception is attached as the cause) or can't be read back afterwards.
    """

    with db.Session() as session:
        try:
            query = Queries(session=session, name=name, filter=filter)
            query_id = query.query_id
            session.commit()
        except Exception as err:
            # Keep the underlying error attached so the real reason for the
            # failure isn't lost.
            raise ApplicationError("Can't create Query") from err

    new_query = query_by_id(query_id)
    if not new_query:
        raise ApplicationError("Unable to create new query")

    return new_query
|
|
|
|
|
|
def query_delete(query_id: int) -> None:
    """
    Delete query

    Raises ApplicationError if there is no query with the passed id.
    """

    with db.Session() as session:
        query = session.get(Queries, query_id)
        # session.get() returns None for an unknown id; raise a clear error
        # rather than handing None to delete().
        if query is None:
            raise ApplicationError(f"Can't retrieve Query ({query_id=})")
        session.delete(query)
        session.commit()
|
|
|
|
|
|
def query_update_favourite(query_id: int, favourite: bool) -> None:
    """Set the favourite flag of the stored query with the passed id"""

    stmt = (
        update(Queries)
        .where(Queries.query_id == query_id)
        .values(favourite=favourite)
    )
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
|
|
|
|
|
|
def query_update_filter(query_id: int, filter: Filter) -> None:
    """Replace the filter of the stored query with the passed id"""

    stmt = (
        update(Queries)
        .where(Queries.query_id == query_id)
        .values(filter=filter)
    )
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
|
|
|
|
|
|
def query_update_name(query_id: int, name: str) -> None:
    """Rename the stored query with the passed id"""

    stmt = (
        update(Queries)
        .where(Queries.query_id == query_id)
        .values(name=name)
    )
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
|
|
|
|
|
|
# Misc
|
|
def setting_get(name: str) -> int | None:
    """
    Return the integer value (f_int) of the setting called name, or None
    if there is no such setting
    """

    stmt = select(Settings).where(Settings.name == name)

    with db.Session() as session:
        setting = session.scalars(stmt).one_or_none()
        return setting.f_int if setting else None
|
|
|
|
|
|
def setting_set(name: str, value: int) -> None:
    """
    Store value as the integer value (f_int) of the setting called name,
    creating the setting if it doesn't exist yet.

    Raises ApplicationError if a new Settings record can't be created.
    """

    stmt = select(Settings).where(Settings.name == name)

    with db.Session() as session:
        setting = session.scalars(stmt).one_or_none()
        if not setting:
            # First use of this setting: create its record
            setting = Settings(session=session, name=name)
            if not setting:
                raise ApplicationError("Can't create Settings record")

        setting.f_int = value
        session.commit()
|
|
|
|
|
|
def db_name_get() -> str:
    """
    Return the database name taken from the URL of the engine the session
    is bound to, or Config.DB_NOT_FOUND if the session has no bind
    """

    with db.Session() as session:
        bind = session.bind
        if not bind:
            return Config.DB_NOT_FOUND
        return bind.engine.url.database
|