musicmuster/app/ds.py
2025-04-12 11:15:54 +01:00

1247 lines
34 KiB
Python

# Standard library imports
import datetime as dt
import re
# PyQt imports
# Third party imports
from dogpile.cache import make_region
from dogpile.cache.api import NO_VALUE
from sqlalchemy import (
delete,
func,
select,
update,
)
from sqlalchemy.orm import aliased
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.elements import BinaryExpression, ColumnElement
# App imports
from classes import (
ApplicationError,
Filter,
NoteColoursDTO,
PlaydatesDTO,
PlaylistDTO,
PlaylistRowDTO,
QueryDTO,
TrackDTO,
)
from config import Config
from log import log, log_call
from models import (
db,
NoteColours,
Playdates,
PlaylistRows,
Playlists,
Queries,
Settings,
Tracks,
)
# Module-wide dogpile cache region. Uses the in-memory backend for now
# (switch to Redis if needed); cached values expire after 10 minutes.
cache_region = make_region().configure(
    "dogpile.cache.memory",
    expiration_time=600,
)
# Helper functions
@log_call
def _remove_substring_case_insensitive(parent_string: str, substring: str) -> str:
"""
Remove all instances of substring from parent string, case insensitively
"""
# Convert both strings to lowercase for case-insensitive comparison
lower_parent = parent_string.lower()
lower_substring = substring.lower()
# Initialize the result string
result = parent_string
# Continue removing the substring until it's no longer found
while lower_substring in lower_parent:
# Find the index of the substring
index = lower_parent.find(lower_substring)
# Remove the substring
result = result[:index] + result[index + len(substring) :]
# Update the lowercase versions
lower_parent = result.lower()
return result
# Notecolour functions
def _all_notecolours(session: Session) -> list[NoteColoursDTO]:
    """
    Return a DTO for every enabled notecolour record, sorted by 'order'.

    The list is served from the cache region when an entry is present;
    otherwise it is read using the passed session and stored in the cache
    before being returned.
    """

    cache_key = "note_colours_all"

    cached = cache_region.get(cache_key)
    if cached is not NO_VALUE:
        return cached

    # Cache miss: read the enabled records from the database
    stmt = (
        select(NoteColours)
        .where(NoteColours.enabled.is_(True))
        .order_by(NoteColours.order)
    )
    results: list[NoteColoursDTO] = [
        NoteColoursDTO(
            notecolour_id=row.id,
            substring=row.substring,
            colour=row.colour,
            enabled=row.enabled,
            foreground=row.foreground,
            is_regex=row.is_regex,
            is_casesensitive=row.is_casesensitive,
            order=row.order,
            strip_substring=row.strip_substring,
        )
        for row in session.scalars(stmt).all()
    ]

    cache_region.set(cache_key, results)
    return results
def _get_colour_record(text: str) -> tuple[NoteColoursDTO | None, str]:
    """
    Parse text and return the first matching colour record together with
    the text to display.

    Records are tried in the order returned by _all_notecolours(). A record
    matches when:
      - is_regex: its pattern matches at the start of text (re.match);
      - otherwise: its substring occurs anywhere in text.
    Matching ignores case unless the record's is_casesensitive is set.

    The returned text has the matched pattern/substring removed only when
    the record's strip_substring flag is set; otherwise text is returned
    unchanged. This applies to regex and plain-substring records alike.

    Returns (None, text) when no record matches.
    """

    with db.Session() as session:
        for rec in _all_notecolours(session):
            if rec.is_regex:
                flags = re.UNICODE
                if not rec.is_casesensitive:
                    flags |= re.IGNORECASE
                pattern = re.compile(rec.substring, flags)
                if not pattern.match(text):
                    continue
                if rec.strip_substring:
                    return (rec, pattern.sub("", text))
                return (rec, text)

            if rec.is_casesensitive:
                if rec.substring not in text:
                    continue
                if rec.strip_substring:
                    return (rec, text.replace(rec.substring, ""))
                return (rec, text)

            # Plain substring, case-insensitive
            if rec.substring.lower() not in text.lower():
                continue
            if rec.strip_substring:
                return (
                    rec,
                    _remove_substring_case_insensitive(text, rec.substring),
                )
            return (rec, text)

    return (None, text)
def get_colour(text: str, foreground: bool = False) -> str:
    """
    Return the colour string for the first colour record matching text.

    The background colour is returned by default; the foreground colour is
    returned if foreground is True (an empty string if the record has no
    foreground colour). An empty string is returned when nothing matches.
    """

    rec = _get_colour_record(text)[0]
    if rec is None:
        return ""

    if not foreground:
        return rec.colour

    return rec.foreground or ""
@log_call
def remove_colour_substring(text: str) -> str:
    """
    Return the text element produced by _get_colour_record(text), i.e. text
    with the colour-identifying substring removed where applicable.
    """

    _, stripped_text = _get_colour_record(text)
    return stripped_text
# Track functions
@log_call
def _tracks_where(
    query: BinaryExpression | ColumnElement[bool],
) -> list[TrackDTO]:
    """
    Return a TrackDTO for each track selected by the passed WHERE
    expression.

    Each DTO's lastplayed is the most recent Playdates.lastplayed for that
    track, or None when the track has no playdates (outer join).
    """

    # Alias Playdates for use in the subquery
    playdate_alias = aliased(Playdates)

    # One row per track holding its most recent playdate
    latest = (
        select(
            playdate_alias.track_id,
            func.max(playdate_alias.lastplayed).label("lastplayed"),
        )
        .group_by(playdate_alias.track_id)
        .subquery()
    )

    columns = (
        Tracks.id.label("track_id"),
        Tracks.artist,
        Tracks.bitrate,
        Tracks.duration,
        Tracks.fade_at,
        Tracks.intro,
        Tracks.path,
        Tracks.silence_at,
        Tracks.start_gap,
        Tracks.title,
        latest.c.lastplayed,
    )
    stmt = (
        select(*columns)
        .outerjoin(latest, Tracks.id == latest.c.track_id)
        .where(query)
    )

    with db.Session() as session:
        rows = session.execute(stmt).all()
        results: list[TrackDTO] = [
            TrackDTO(
                artist=row.artist,
                bitrate=row.bitrate,
                duration=row.duration,
                fade_at=row.fade_at,
                intro=row.intro,
                lastplayed=row.lastplayed,
                path=row.path,
                silence_at=row.silence_at,
                start_gap=row.start_gap,
                title=row.title,
                track_id=row.track_id,
            )
            for row in rows
        ]

    return results
def track_by_path(path: str) -> TrackDTO | None:
    """
    Return the track whose path equals the passed path (ignoring case), or
    None if there is no such track.

    Both sides are lower-cased and compared for equality rather than using
    ILIKE, so '%' and '_' in a file path are matched literally instead of
    being interpreted as wildcards.

    Raises ApplicationError if more than one track matches.
    """

    track_list = _tracks_where(func.lower(Tracks.path) == func.lower(path))
    if not track_list:
        return None
    if len(track_list) > 1:
        raise ApplicationError(f"Duplicate {path=}")
    return track_list[0]
def track_by_id(track_id: int) -> TrackDTO | None:
    """
    Look up a single track by id.

    Returns the TrackDTO, or None when no track has that id. Raises
    ApplicationError if more than one track is returned.
    """

    matches = _tracks_where(Tracks.id == track_id)
    if len(matches) > 1:
        raise ApplicationError(f"Duplicate {track_id=}")

    return matches[0] if matches else None
def tracks_by_artist(filter_str: str) -> list[TrackDTO]:
    """
    Return tracks whose artist contains filter_str (case-insensitive LIKE)
    """

    pattern = f"%{filter_str}%"
    return _tracks_where(Tracks.artist.ilike(pattern))
def tracks_by_title(filter_str: str) -> list[TrackDTO]:
    """
    Return tracks whose title contains filter_str (case-insensitive LIKE)
    """

    pattern = f"%{filter_str}%"
    return _tracks_where(Tracks.title.ilike(pattern))
def get_all_tracks() -> list[TrackDTO]:
    """Return every track with an id greater than zero"""

    every_track = Tracks.id > 0
    return _tracks_where(every_track)
def get_filtered_tracks(filter: Filter) -> list[TrackDTO]:
    """
    Return tracks matching filter.

    The filter is applied in three parts:
      - path: if filter.path is set, the track path must contain
        ("contains") or must not contain ("excluding") it;
      - duration: the track must be at least (LONGER) or at most (SHORTER)
        filter.duration_number seconds/minutes long;
      - last played: either the track has never been played (NEVER), or its
        most recent play is earlier than a cut-off. The cut-off is "now"
        for ANYTIME, otherwise now minus filter.last_played_number
        days/weeks/months/years (months and years are approximated as 30
        and 365 days).

    Raises ApplicationError if the path type, duration unit or duration
    type is not recognised.
    """

    query = select(Tracks)

    # Path specification
    if filter.path:
        if filter.path_type == "contains":
            query = query.where(Tracks.path.ilike(f"%{filter.path}%"))
        elif filter.path_type == "excluding":
            query = query.where(Tracks.path.notilike(f"%{filter.path}%"))
        else:
            raise ApplicationError(f"Can't process filter path ({filter=})")

    # Duration specification
    seconds_duration = filter.duration_number
    if filter.duration_unit == Config.FILTER_DURATION_MINUTES:
        seconds_duration *= 60
    elif filter.duration_unit != Config.FILTER_DURATION_SECONDS:
        raise ApplicationError(f"Can't process filter duration ({filter=})")

    # Compare duration_type (not duration_unit) in both branches
    if filter.duration_type == Config.FILTER_DURATION_LONGER:
        query = query.where(Tracks.duration >= seconds_duration)
    elif filter.duration_type == Config.FILTER_DURATION_SHORTER:
        query = query.where(Tracks.duration <= seconds_duration)
    else:
        raise ApplicationError(f"Can't process filter duration type ({filter=})")

    # Process comparator
    if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER:
        # Select tracks that have never been played
        query = query.outerjoin(Playdates, Tracks.id == Playdates.track_id).where(
            Playdates.id.is_(None)
        )
    else:
        # Last played specification
        now = dt.datetime.now()
        # Set sensible default, and correct for
        # Config.FILTER_PLAYED_COMPARATOR_ANYTIME
        before = now
        # If not ANYTIME, set 'before' appropriately. An unrecognised unit
        # leaves 'before' at 'now'.
        if filter.last_played_comparator != Config.FILTER_PLAYED_COMPARATOR_ANYTIME:
            if filter.last_played_unit == Config.FILTER_PLAYED_DAYS:
                before = now - dt.timedelta(days=filter.last_played_number)
            elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS:
                before = now - dt.timedelta(days=7 * filter.last_played_number)
            elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS:
                before = now - dt.timedelta(days=30 * filter.last_played_number)
            elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS:
                before = now - dt.timedelta(days=365 * filter.last_played_number)

        # Most recent playdate per track. The inner join means tracks with
        # no playdates are not returned by this branch.
        subquery = (
            select(
                Playdates.track_id,
                func.max(Playdates.lastplayed).label("max_last_played"),
            )
            .group_by(Playdates.track_id)
            .subquery()
        )
        query = query.join(subquery, Tracks.id == subquery.c.track_id).where(
            subquery.c.max_last_played < before
        )

    results: list[TrackDTO] = []
    with db.Session() as session:
        records = session.scalars(query).unique().all()
        for record in records:
            # Report the most recent play rather than relying on the order
            # in which the playdates relationship happens to be loaded
            last_played = max(
                (playdate.lastplayed for playdate in record.playdates),
                default=None,
            )
            dto = TrackDTO(
                artist=record.artist,
                bitrate=record.bitrate,
                duration=record.duration,
                fade_at=record.fade_at,
                intro=record.intro,
                lastplayed=last_played,
                path=record.path,
                silence_at=record.silence_at,
                start_gap=record.start_gap,
                title=record.title,
                track_id=record.id,
            )
            results.append(dto)

    return results
@log_call
def add_track_to_header(playlistrow_id: int, track_id: int) -> None:
    """
    Set the track_id of the playlist row identified by playlistrow_id,
    attaching a track to what was a header row.
    """

    stmt = (
        update(PlaylistRows)
        .where(PlaylistRows.id == playlistrow_id)
        .values(track_id=track_id)
    )
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
@log_call
def create_track(path: str, metadata: dict[str, str | int | float]) -> TrackDTO:
    """
    Create a track db entry and return its DTO.

    All column values, including the path, are taken from metadata, which
    must contain the keys "path", "title", "artist", "duration",
    "start_gap", "fade_at", "silence_at" and "bitrate". The path parameter
    is not used by this function.

    Raises ApplicationError if the track cannot be created (the underlying
    exception is attached as the cause) or cannot be read back afterwards.
    """

    with db.Session() as session:
        try:
            track = Tracks(
                session=session,
                path=str(metadata["path"]),
                title=str(metadata["title"]),
                artist=str(metadata["artist"]),
                duration=int(metadata["duration"]),
                start_gap=int(metadata["start_gap"]),
                fade_at=int(metadata["fade_at"]),
                silence_at=int(metadata["silence_at"]),
                bitrate=int(metadata["bitrate"]),
            )
            track_id = track.id
            session.commit()
        except Exception as exc:
            # Keep the original error attached so the real reason is visible
            raise ApplicationError("Can't create Track") from exc

    new_track = track_by_id(track_id)
    if not new_track:
        raise ApplicationError("Unable to create new track")

    return new_track
@log_call
def update_track(
    path: str, track_id: int, metadata: dict[str, str | int | float]
) -> TrackDTO:
    """
    Update an existing track db entry and return the DTO.

    All column values, including the path, are taken from metadata, which
    must contain the keys "path", "title", "artist", "duration",
    "start_gap", "fade_at", "silence_at" and "bitrate". The path parameter
    is not used by this function.

    Raises ApplicationError if the track does not exist or cannot be read
    back after the update.
    """

    with db.Session() as session:
        track = session.get(Tracks, track_id)
        if not track:
            raise ApplicationError(f"Can't retrieve Track ({track_id=})")

        # Assign plain scalar values (not 1-tuples) to each column
        track.path = str(metadata["path"])
        track.title = str(metadata["title"])
        track.artist = str(metadata["artist"])
        track.duration = int(metadata["duration"])
        track.start_gap = int(metadata["start_gap"])
        track.fade_at = int(metadata["fade_at"])
        track.silence_at = int(metadata["silence_at"])
        track.bitrate = int(metadata["bitrate"])

        session.commit()

    updated_track = track_by_id(track_id)
    if not updated_track:
        raise ApplicationError("Unable to retrieve updated track")

    return updated_track
def set_track_intro(track_id: int, intro: int) -> None:
    """
    Store a new intro value on the track with the given id
    """

    stmt = update(Tracks).where(Tracks.id == track_id).values(intro=intro)
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
# Playlist functions
@log_call
def _playlists_where(
    query: BinaryExpression | ColumnElement[bool],
) -> list[PlaylistDTO]:
    """
    Return a PlaylistDTO for each playlist selected by the passed WHERE
    expression.
    """

    columns = (
        Playlists.favourite,
        Playlists.is_template,
        Playlists.id.label("playlist_id"),
        Playlists.name,
        Playlists.open,
    )
    stmt = select(*columns).where(query)

    with db.Session() as session:
        rows = session.execute(stmt).all()
        results: list[PlaylistDTO] = [
            PlaylistDTO(
                favourite=row.favourite,
                is_template=row.is_template,
                playlist_id=row.playlist_id,
                name=row.name,
                open=row.open,
            )
            for row in rows
        ]

    return results
@log_call
def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
    """
    Look up a single playlist by id.

    Returns the PlaylistDTO, or None when no playlist has that id. Raises
    ApplicationError if more than one playlist is returned.
    """

    matches = _playlists_where(Playlists.id == playlist_id)
    if len(matches) > 1:
        raise ApplicationError(f"Duplicate {playlist_id=}")

    return matches[0] if matches else None
def playlists_closed() -> list[PlaylistDTO]:
    """
    Return the playlists whose 'open' flag is False
    """

    is_closed = Playlists.open.is_(False)
    return _playlists_where(is_closed)
def playlists_open() -> list[PlaylistDTO]:
    """
    Return the playlists whose 'open' flag is True
    """

    is_open = Playlists.open.is_(True)
    return _playlists_where(is_open)
def playlists_template_by_id(playlist_id: int) -> PlaylistDTO | None:
    """
    Return the template playlist with the specified id.

    Returns None if no playlist has that id or if the playlist is not a
    template. Raises ApplicationError if more than one playlist matches.
    """

    # _playlists_where takes a single expression, so combine both criteria
    playlist_list = _playlists_where(
        (Playlists.id == playlist_id) & Playlists.is_template.is_(True)
    )
    if not playlist_list:
        return None
    if len(playlist_list) > 1:
        raise ApplicationError(f"Duplicate {playlist_id=}")
    return playlist_list[0]
def playlists_templates() -> list[PlaylistDTO]:
    """
    Return the playlists that are flagged as templates
    """

    is_template = Playlists.is_template.is_(True)
    return _playlists_where(is_template)
def get_all_playlists():
    """Return every playlist with an id greater than zero"""

    every_playlist = Playlists.id > 0
    return _playlists_where(every_playlist)
def delete_playlist(playlist_id: int) -> None:
    """
    Delete the playlist with the passed id.

    Does nothing if no such playlist exists.
    """

    with db.Session() as session:
        playlist = session.get(Playlists, playlist_id)
        if playlist is None:
            # Nothing to delete; session.delete(None) would raise
            return
        session.delete(playlist)
        session.commit()
def save_as_template(playlist_id: int, template_name: str) -> None:
    """
    Save playlist as a template.

    Creates a new template playlist called template_name (not based on any
    existing template) and copies the rows of playlist_id into it.
    """

    new_template = create_playlist(template_name, 0, as_template=True)
    # PlaylistDTO carries the database id in its playlist_id field
    copy_playlist(playlist_id, new_template.playlist_id)
def playlist_rename(playlist_id: int, new_name: str) -> None:
    """
    Set the name of the playlist with the given id to new_name
    """

    stmt = update(Playlists).where(Playlists.id == playlist_id).values(name=new_name)
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
def _check_playlist_integrity(
    session: Session, playlist_id: int, fix: bool = False
) -> None:
    """
    Verify that the playlist's row numbers run 0, 1, 2, ... without gaps.

    With fix=True every out-of-sequence row is logged at debug level and
    renumbered on the passed session (this function does not commit).
    With fix=False the first out-of-sequence row raises ApplicationError.
    """

    stmt = (
        select(PlaylistRows)
        .where(PlaylistRows.playlist_id == playlist_id)
        .order_by(PlaylistRows.row_number)
    )
    playlist_rows = session.scalars(stmt).all()

    # NB: the names 'plr' and 'idx' appear in the message text below
    for idx, plr in enumerate(playlist_rows):
        if plr.row_number != idx:
            msg = (
                "_check_playlist_integrity: incorrect row number "
                f"({plr.id=}, {plr.row_number=}, {idx=})"
            )
            if not fix:
                raise ApplicationError(msg)

            log.debug(msg)
            plr.row_number = idx
@log_call
def playlist_mark_status(playlist_id: int, open: bool) -> None:
    """Set the 'open' flag of the playlist with the given id"""

    stmt = update(Playlists).where(Playlists.id == playlist_id).values(open=open)
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
@log_call
def _shift_rows(
    session: Session, playlist_id: int, starting_row: int, shift_by: int
) -> None:
    """
    Add shift_by to the row number of every row in the playlist whose row
    number is starting_row or greater. A positive shift_by moves rows down;
    a negative one moves them up.

    Executes on the passed session without committing.
    """

    in_playlist = PlaylistRows.playlist_id == playlist_id
    at_or_after_start = PlaylistRows.row_number >= starting_row

    stmt = (
        update(PlaylistRows)
        .where(in_playlist, at_or_after_start)
        .values(row_number=PlaylistRows.row_number + shift_by)
    )
    session.execute(stmt)
@log_call
def move_rows(
    from_rows: list[int],
    from_playlist_id: int,
    to_row: int,
    to_playlist_id: int | None = None,
) -> None:
    """
    Move rows within or between playlists.

    from_rows holds the row numbers (in from_playlist_id) of the rows to
    move; to_row is the row number at which they should be placed in
    to_playlist_id (the same playlist if to_playlist_id is None).

    Algorithm:
    - Sanity check row numbers
    - Check there are no playlist rows with playlist_id == PENDING_MOVE
    - Put rows to be moved into PENDING_MOVE playlist
    - Resequence remaining row numbers
    - Make space for moved rows
    - Move the PENDING_MOVE rows back and fixup row numbers
    - Sanity check row numbers

    Raises ApplicationError if a playlist's row numbers are not contiguous
    or if the PENDING_MOVE playlist is not empty at the start.
    """

    # If to_playlist_id isn't specified, we're moving within the one
    # playlist.
    if to_playlist_id is None:
        to_playlist_id = from_playlist_id

    with db.Session() as session:
        # Sanity check row numbers
        _check_playlist_integrity(session, from_playlist_id, fix=False)
        if from_playlist_id != to_playlist_id:
            _check_playlist_integrity(session, to_playlist_id, fix=False)

        # Check there are no playlist rows with playlist_id == PENDING_MOVE
        pending_move_rows = get_playlist_rows(Config.PLAYLIST_PENDING_MOVE)
        if pending_move_rows:
            raise ApplicationError(f"move_rows_to_playlist: {pending_move_rows=}")

        # We need playlist length if we're moving within a playlist. Get
        # that now before we remove rows.
        from_playlist_length = len(get_playlist_rows(from_playlist_id))

        # Put rows to be moved into PENDING_MOVE playlist
        session.execute(
            update(PlaylistRows)
            .where(
                PlaylistRows.playlist_id == from_playlist_id,
                PlaylistRows.row_number.in_(from_rows),
            )
            .values(playlist_id=Config.PLAYLIST_PENDING_MOVE)
        )

        # Resequence remaining row numbers
        _check_playlist_integrity(session, from_playlist_id, fix=True)
        # Commit so that get_playlist_rows(), which opens its own session,
        # can see the rows in the PENDING_MOVE playlist below
        session.commit()

        # Make space for moved rows. If moving within one playlist,
        # determining where to make the space is non-trivial. For example,
        # if the playlist has ten entries and we're moving four of them
        # to row 8, after we've moved the rows to the
        # PLAYLIST_PENDING_MOVE there will only be six entries left.
        # Clearly we can't make space at row 8...
        space_row = to_row
        if to_playlist_id == from_playlist_id:
            # overflow: how far the moved block would extend past the end
            # of the playlist as it was before the rows were taken out
            overflow = max(to_row + len(from_rows) - from_playlist_length, 0)
            if overflow != 0:
                space_row = (
                    to_row - overflow - len([a for a in from_rows if a > to_row])
                )

        _shift_rows(session, to_playlist_id, space_row, len(from_rows))

        # Move the PENDING_MOVE rows back and fixup row numbers
        update_list: list[dict[str, int]] = []
        next_row = space_row
        # PLAYLIST_PENDING_MOVE may have gaps so don't check it
        for row_to_move in get_playlist_rows(
            Config.PLAYLIST_PENDING_MOVE, check_playlist_itegrity=False
        ):
            # NOTE(review): each moved row contributes two parameter dicts
            # (one for row_number, one for playlist_id); they could
            # presumably be merged into one dict per row - confirm.
            update_list.append(
                {"id": row_to_move.playlistrow_id, "row_number": next_row}
            )
            update_list.append(
                {"id": row_to_move.playlistrow_id, "playlist_id": to_playlist_id}
            )
            next_row += 1

        # Bulk update keyed on "id", one parameter dictionary per change
        session.execute(update(PlaylistRows), update_list)
        session.commit()

        # Sanity check row numbers
        _check_playlist_integrity(session, from_playlist_id, fix=False)
        if from_playlist_id != to_playlist_id:
            _check_playlist_integrity(session, to_playlist_id, fix=False)
@log_call
def update_row_numbers(
    playlist_id: int, id_to_row_number: list[dict[int, int]]
) -> None:
    """
    Apply the passed list of parameter dictionaries as a bulk update of
    PlaylistRows, commit, and then verify that the playlist's row numbers
    are contiguous.

    playlist_id is only needed for the sanity check, which raises
    ApplicationError on failure.
    """

    bulk_update = update(PlaylistRows)

    with db.Session() as session:
        session.execute(bulk_update, id_to_row_number)
        session.commit()

        # Sanity check (after the commit, as in the original ordering)
        _check_playlist_integrity(session, playlist_id, fix=False)
@log_call
def create_playlist(name: str, template_id: int, as_template: bool = False) -> PlaylistDTO:
    """
    Create playlist and return DTO.

    If template_id is non-zero, the rows of that playlist are copied into
    the new playlist. as_template sets the new playlist's is_template flag.

    Raises ApplicationError if the playlist cannot be created (the
    underlying exception is attached as the cause) or cannot be read back
    afterwards.
    """

    with db.Session() as session:
        try:
            playlist = Playlists(session, name, template_id)
            playlist.is_template = as_template
            playlist_id = playlist.id
            session.commit()
        except Exception as exc:
            # Keep the original error attached so the real reason is visible
            raise ApplicationError("Can't create Playlist") from exc

    if template_id != 0:
        copy_playlist(template_id, playlist_id)

    new_playlist = playlist_by_id(playlist_id)
    if not new_playlist:
        raise ApplicationError("Can't retrieve new Playlist")

    return new_playlist
def copy_playlist(src_id: int, dst_id: int) -> None:
    """
    Create, in playlist dst_id, a copy of each row of playlist src_id
    (same row number, note and track).
    """

    stmt = select(PlaylistRows).where(PlaylistRows.playlist_id == src_id)

    with db.Session() as session:
        # Fetch every source row before creating any copies
        source_rows = session.scalars(stmt).all()
        for source_row in source_rows:
            PlaylistRows(
                session=session,
                playlist_id=dst_id,
                row_number=source_row.row_number,
                note=source_row.note,
                track_id=source_row.track_id,
            )
        session.commit()
def playlist_row_count(playlist_id: int) -> int:
    """
    Count the rows belonging to the playlist with the given id
    """

    stmt = (
        select(func.count())
        .select_from(PlaylistRows)
        .where(PlaylistRows.playlist_id == playlist_id)
    )
    with db.Session() as session:
        return session.scalar(stmt)
@log_call
def insert_row(
    playlist_id: int, row_number: int, track_id: int | None, note: str
) -> PlaylistRowDTO:
    """
    Insert a row at row_number in the playlist and return its DTO.

    Rows at or after row_number are shifted down by one to make room. The
    playlist's row numbering is checked before and after the insert.

    Raises ApplicationError if the numbering is not contiguous or if the
    new row cannot be read back.
    """

    with db.Session() as session:
        # Row numbers must be contiguous before we start
        _check_playlist_integrity(session, playlist_id, fix=False)

        # Open up a gap at row_number
        _shift_rows(
            session=session,
            playlist_id=playlist_id,
            starting_row=row_number,
            shift_by=1,
        )

        created_row = PlaylistRows(
            session=session,
            playlist_id=playlist_id,
            row_number=row_number,
            note=note,
            track_id=track_id,
        )
        session.commit()
        playlist_row_id = created_row.id

        # ...and they must still be contiguous afterwards
        _check_playlist_integrity(session, playlist_id, fix=False)

    new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id)
    if new_playlist_row:
        return new_playlist_row

    raise ApplicationError("Can't retrieve new playlist row")
@log_call
def remove_comments(playlist_id: int, row_numbers: list[int]) -> None:
    """
    Set the note to an empty string on the given rows of the playlist
    """

    in_playlist = PlaylistRows.playlist_id == playlist_id
    selected_rows = PlaylistRows.row_number.in_(row_numbers)

    stmt = update(PlaylistRows).where(in_playlist, selected_rows).values(note="")
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
@log_call
def remove_rows(playlist_id: int, row_numbers: list[int]) -> None:
    """
    Remove rows from playlist.

    The rows with the passed row numbers are deleted in a single statement
    and the remaining rows are then renumbered to close the gaps. Row
    numbers are not changed until after the delete, so all of row_numbers
    refer to the numbering in force when the function is called.
    """

    with db.Session() as session:
        if row_numbers:
            session.execute(
                delete(PlaylistRows).where(
                    PlaylistRows.playlist_id == playlist_id,
                    PlaylistRows.row_number.in_(row_numbers),
                )
            )

        # Fixup row number to remove gaps
        _check_playlist_integrity(session, playlist_id, fix=True)
        session.commit()
@log_call
def update_template_favourite(template_id: int, favourite: bool) -> None:
    """Set the 'favourite' flag of the playlist with id template_id"""

    stmt = (
        update(Playlists)
        .where(Playlists.id == template_id)
        .values(favourite=favourite)
    )
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
@log_call
def playlist_save_tabs(playlist_id_to_tab: dict[int, int]) -> None:
    """
    Store the tab number of each playlist in playlist_id_to_tab.

    The tab of every listed playlist is first reset to None, then each
    playlist is given its new tab number; everything is committed together.
    """

    listed_ids = playlist_id_to_tab.keys()
    clear_stmt = update(Playlists).where(Playlists.id.in_(listed_ids)).values(tab=None)

    with db.Session() as session:
        # Reset the tab number of every playlist we are about to set
        session.execute(clear_stmt)

        for playlist_id, tab in playlist_id_to_tab.items():
            set_stmt = (
                update(Playlists).where(Playlists.id == playlist_id).values(tab=tab)
            )
            session.execute(set_stmt)

        session.commit()
# Playlist Rows
@log_call
def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None:
    """
    Return the DTO for the playlist row with the given id, or None if
    there is no such row. The DTO's track is None when the row has no
    track_id.
    """

    stmt = select(PlaylistRows).where(PlaylistRows.id == playlistrow_id)

    with db.Session() as session:
        row = session.scalars(stmt).one_or_none()
        if not row:
            return None

        row_track = track_by_id(row.track_id) if row.track_id else None

        return PlaylistRowDTO(
            note=row.note,
            played=row.played,
            playlist_id=row.playlist_id,
            playlistrow_id=row.id,
            row_number=row.row_number,
            track=row_track,
        )
def get_playlist_rows(
    playlist_id: int, check_playlist_itegrity: bool = True
) -> list[PlaylistRowDTO]:
    """
    Return a DTO for every row of the playlist, ordered by row number.

    Unless check_playlist_itegrity is False, the playlist's row numbering
    is checked first and ApplicationError is raised if it is not
    contiguous. Each DTO's track is None when the row has no track_id.
    """

    stmt = (
        select(PlaylistRows)
        .where(PlaylistRows.playlist_id == playlist_id)
        .order_by(PlaylistRows.row_number)
    )

    with db.Session() as session:
        # TODO: would be good to be confident at removing this
        if check_playlist_itegrity:
            _check_playlist_integrity(
                session=session, playlist_id=playlist_id, fix=False
            )

        rows = session.scalars(stmt).all()

        dto_list = [
            PlaylistRowDTO(
                note=row.note,
                played=row.played,
                playlist_id=row.playlist_id,
                playlistrow_id=row.id,
                row_number=row.row_number,
                # One track lookup per row that has a track
                track=track_by_id(row.track_id) if row.track_id else None,
            )
            for row in rows
        ]

    return dto_list
# Playdates
@log_call
def get_last_played_dates(track_id: int, limit: int = 5) -> str:
    """
    Return up to 'limit' of the track's most recent play dates, newest
    first, each formatted with Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT and
    joined with "<br>".
    """

    stmt = (
        Playdates.select()
        .where(Playdates.track_id == track_id)
        .order_by(Playdates.lastplayed.desc())
        .limit(limit)
    )

    with db.Session() as session:
        playdates = session.scalars(stmt).all()

        formatted: list[str] = []
        for playdate in playdates:
            formatted.append(
                playdate.lastplayed.strftime(Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT)
            )

    return "<br>".join(formatted)
def update_playdates(track_id: int) -> None:
    """
    Record a new playdate for the passed track
    """

    with db.Session() as session:
        Playdates(session, track_id)
        # Commit explicitly, as the other record-creating functions in this
        # module do, rather than relying on the model's constructor
        session.commit()
def playdates_between_dates(
    start: dt.datetime, end: dt.datetime | None = None
) -> list[PlaydatesDTO]:
    """
    Return a list of PlaydatesDTO objects for plays between start and end
    inclusive (until now if end is None).

    Each DTO combines the playdate with the details of the track that was
    played, read through the Playdates.track relationship.
    """

    if end is None:
        end = dt.datetime.now()

    # Load Playdates entities and reach the track through the relationship;
    # a relationship attribute is not a plain column and should not be
    # listed in a column-level select().
    stmt = select(Playdates).where(
        Playdates.lastplayed >= start,
        Playdates.lastplayed <= end,
    )

    results: list[PlaydatesDTO] = []
    with db.Session() as session:
        for playdate in session.scalars(stmt).all():
            # Read the relationship while the session is still open
            track = playdate.track
            dto = PlaydatesDTO(
                playdate_id=playdate.id,
                lastplayed=playdate.lastplayed,
                track_id=playdate.track_id,
                artist=track.artist,
                bitrate=track.bitrate,
                duration=track.duration,
                fade_at=track.fade_at,
                intro=track.intro,
                path=track.path,
                silence_at=track.silence_at,
                start_gap=track.start_gap,
                title=track.title,
            )
            results.append(dto)

    return results
# Queries
@log_call
def _queries_where(
    query: BinaryExpression | ColumnElement[bool],
) -> list[QueryDTO]:
    """
    Return a QueryDTO for each Queries record selected by the passed WHERE
    expression.
    """

    stmt = select(Queries).where(query)

    with db.Session() as session:
        rows = session.scalars(stmt).all()
        results: list[QueryDTO] = [
            QueryDTO(
                favourite=row.favourite,
                filter=row.filter,
                name=row.name,
                query_id=row.id,
            )
            for row in rows
        ]

    return results
def get_all_queries(favourites_only: bool = False) -> list[QueryDTO]:
    """
    Return a list of all queries, or of only those flagged as favourite
    when favourites_only is True.
    """

    if favourites_only:
        return _queries_where(Queries.favourite.is_(True))

    return _queries_where(Queries.id > 0)
def query_by_id(query_id: int) -> QueryDTO | None:
    """
    Look up a single query by id.

    Returns the QueryDTO, or None when no query has that id. Raises
    ApplicationError if more than one query is returned.
    """

    matches = _queries_where(Queries.id == query_id)
    if len(matches) > 1:
        raise ApplicationError(f"Duplicate {query_id=}")

    return matches[0] if matches else None
def update_query_filter(query_id: int, filter: Filter) -> None:
    """Store a new filter on the query with the given id"""

    stmt = update(Queries).where(Queries.id == query_id).values(filter=filter)
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
def delete_query(query_id: int) -> None:
    """
    Delete the query with the passed id.

    Does nothing if no such query exists.
    """

    with db.Session() as session:
        query = session.get(Queries, query_id)
        if query is None:
            # Nothing to delete; session.delete(None) would raise
            return
        session.delete(query)
        session.commit()
def update_query_name(query_id: int, name: str) -> None:
    """Store a new name on the query with the given id"""

    stmt = update(Queries).where(Queries.id == query_id).values(name=name)
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
def update_query_favourite(query_id: int, favourite: bool) -> None:
    """Set the 'favourite' flag of the query with the given id"""

    stmt = update(Queries).where(Queries.id == query_id).values(favourite=favourite)
    with db.Session() as session:
        session.execute(stmt)
        session.commit()
def create_query(name: str, filter: Filter) -> QueryDTO:
    """
    Create a query and return the DTO.

    Raises ApplicationError if the query cannot be created (the underlying
    exception is attached as the cause) or cannot be read back afterwards.
    """

    with db.Session() as session:
        try:
            query = Queries(session=session, name=name, filter=filter)
            query_id = query.id
            session.commit()
        except Exception as exc:
            # Keep the original error attached so the real reason is visible
            raise ApplicationError("Can't create Query") from exc

    new_query = query_by_id(query_id)
    if not new_query:
        raise ApplicationError("Unable to create new query")

    return new_query
# Misc
def get_setting(name: str) -> int | None:
    """
    Return the f_int value of the Settings record called name, or None if
    there is no such record.
    """

    stmt = select(Settings).where(Settings.name == name)

    with db.Session() as session:
        setting = session.scalars(stmt).one_or_none()
        return setting.f_int if setting else None
def set_setting(name: str, value: int) -> None:
    """
    Set the int value of the named setting, creating the Settings record
    if it does not already exist.
    """

    with db.Session() as session:
        record = (
            session.execute(select(Settings).where(Settings.name == name))
            .scalars()
            .one_or_none()
        )
        if not record:
            # A failed construction raises rather than returning a falsy
            # value, so no further check of the new record is needed
            record = Settings(session=session, name=name)

        record.f_int = value
        session.commit()
def get_db_name() -> str:
    """
    Return the database name taken from the session's engine URL, or
    Config.DB_NOT_FOUND if the session has no bind.
    """

    with db.Session() as session:
        if not session.bind:
            return Config.DB_NOT_FOUND

        return session.bind.engine.url.database