musicmuster/app/repository.py
2025-03-27 11:24:13 +00:00

769 lines
22 KiB
Python

# Standard library imports
import re
# PyQt imports
# Third party imports
from sqlalchemy import (
delete,
func,
select,
update,
)
from sqlalchemy.orm import aliased
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.elements import BinaryExpression, ColumnElement
from classes import ApplicationError, PlaylistRowDTO
# App imports
from classes import PlaylistDTO, TrackDTO
from config import Config
import helpers
from log import log
from models import (
db,
NoteColours,
Playdates,
PlaylistRows,
Playlists,
Settings,
Tracks,
)
# Notecolour functions
def get_colour(text: str, foreground: bool = False) -> str:
"""
Parse text and return background (foreground if foreground==True)
colour string if matched, else None
"""
if not text:
return ""
match = False
with db.Session() as session:
for rec in NoteColours.get_all(session):
if rec.is_regex:
flags = re.UNICODE
if not rec.is_casesensitive:
flags |= re.IGNORECASE
p = re.compile(rec.substring, flags)
if p.match(text):
match = True
else:
if rec.is_casesensitive:
if rec.substring in text:
match = True
else:
if rec.substring.lower() in text.lower():
match = True
if match:
if foreground:
return rec.foreground or ""
else:
return rec.colour
return ""
# Track functions
def add_track_to_header(playlistrow_id: int, track_id: int) -> None:
"""
Add a track to this (header) row
"""
with db.Session() as session:
session.execute(
update(PlaylistRows)
.where(PlaylistRows.id == playlistrow_id)
.values(track_id=track_id)
)
session.commit()
def create_track(path: str) -> TrackDTO:
"""
Create a track db entry from a track path and return the DTO
"""
metadata = helpers.get_all_track_metadata(path)
with db.Session() as session:
try:
track = Tracks(
session=session,
path=str(metadata["path"]),
title=str(metadata["title"]),
artist=str(metadata["artist"]),
duration=int(metadata["duration"]),
start_gap=int(metadata["start_gap"]),
fade_at=int(metadata["fade_at"]),
silence_at=int(metadata["silence_at"]),
bitrate=int(metadata["bitrate"]),
)
track_id = track.id
session.commit()
except Exception:
raise ApplicationError("Can't create Track")
new_track = track_by_id(track_id)
if not new_track:
raise ApplicationError("Unable to create new track")
return new_track
def get_all_tracks() -> list[TrackDTO]:
"""Return a list of all tracks"""
return _tracks_where(Tracks.id > 0)
def track_by_id(track_id: int) -> TrackDTO | None:
"""
Return track with specified id
"""
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(Tracks.id == track_id)
)
with db.Session() as session:
record = session.execute(stmt).one_or_none()
if not record:
return None
dto = TrackDTO(
artist=record.artist,
bitrate=record.bitrate,
duration=record.duration,
fade_at=record.fade_at,
intro=record.intro,
lastplayed=record.lastplayed,
path=record.path,
silence_at=record.silence_at,
start_gap=record.start_gap,
title=record.title,
track_id=record.track_id,
)
return dto
def _tracks_where(where: BinaryExpression | ColumnElement[bool]) -> list[TrackDTO]:
"""
Return tracks selected by where
"""
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(where)
)
results: list[TrackDTO] = []
with db.Session() as session:
records = session.execute(stmt).all()
for record in records:
dto = TrackDTO(
artist=record.artist,
bitrate=record.bitrate,
duration=record.duration,
fade_at=record.fade_at,
intro=record.intro,
lastplayed=record.lastplayed,
path=record.path,
silence_at=record.silence_at,
start_gap=record.start_gap,
title=record.title,
track_id=record.track_id,
)
results.append(dto)
return results
def tracks_like_artist(filter_str: str) -> list[TrackDTO]:
"""
Return tracks where artist is like filter
"""
return _tracks_where(Tracks.artist.ilike(f"%{filter_str}%"))
def tracks_like_title(filter_str: str) -> list[TrackDTO]:
"""
Return tracks where title is like filter
"""
return _tracks_where(Tracks.title.ilike(f"%{filter_str}%"))
# Playlist functions
def _check_playlist_integrity(
session: Session, playlist_id: int, fix: bool = False
) -> None:
"""
Ensure the row numbers are contiguous. Fix and log if fix==True,
else raise ApplicationError.
"""
playlist_rows = (
session.execute(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
)
.scalars()
.all()
)
for idx, plr in enumerate(playlist_rows):
if plr.row_number == idx:
continue
msg = (
"_check_playlist_integrity: incorrect row number "
f"({plr.id=}, {plr.row_number=}, {idx=})"
)
if fix:
log.debug(msg)
plr.row_number = idx
session.commit()
else:
raise ApplicationError(msg)
def _move_rows(
session: Session, playlist_id: int, starting_row: int, move_by: int
) -> None:
"""
Move rows from starting_row by move_by. If move_by is +ve, move rows
down; if -ve, move them up.
"""
log.debug(f"(_move_rows_down({playlist_id=}, {starting_row=}, {move_by=}")
session.execute(
update(PlaylistRows)
.where(
(PlaylistRows.playlist_id == playlist_id),
(PlaylistRows.row_number >= starting_row),
)
.values(row_number=PlaylistRows.row_number + move_by)
)
session.commit()
def move_rows_to_playlist(
from_rows: list[int], from_playlist_id: int, to_row: int, to_playlist_id: int
) -> None:
"""
Move rows between playlists.
"""
with db.Session() as session:
# Prepare desination playlist
# Find last used row
last_row = session.execute(
select(func.max(PlaylistRows.row_number)).where(
PlaylistRows.playlist_id == to_playlist_id
)
).scalar_one()
if last_row is None:
last_row = -1
# Make room in destination
if to_row <= last_row:
_move_rows(session, to_playlist_id, to_row, len(from_rows))
# Move rows
row_offset = to_row - min(from_rows)
stmt = (
update(PlaylistRows)
.where(
PlaylistRows.playlist_id == from_playlist_id,
PlaylistRows.row_number.in_(from_rows),
)
.values(
playlist_id=to_playlist_id,
row_number=PlaylistRows.row_number + row_offset,
)
)
session.execute(stmt)
# Remove gaps in source
_move_rows(
session=session,
playlist_id=from_playlist_id,
starting_row=max(from_rows) + 1,
move_by=(len(from_rows) * -1),
)
# Commit changes
session.commit()
# Sanity check
_check_playlist_integrity(session, from_playlist_id, fix=False)
_check_playlist_integrity(session, to_playlist_id, fix=False)
def move_rows_within_playlist(
playlist_id: int, from_rows: list[int], to_row: int
) -> None:
"""
Move rows within a playlist.
Algorithm:
- Sanity check row numbers
- Check there are no playlist rows with playlist_id == PENDING_MOVE
- Put rows to be moved into PENDING_MOVE playlist
- Resequence remaining row numbers
- Make space for moved rows
- Move the PENDING_MOVE rows back and fixup row numbers
- Sanity check row numbers
"""
log.debug(f"move_rows_within_playlist({playlist_id=}, {from_rows=}, {to_row=})")
with db.Session() as session:
# Sanity check row numbers
_check_playlist_integrity(session, playlist_id, fix=False)
# Check there are no playlist rows with playlist_id == PENDING_MOVE
pending_move_rows = get_playlist_rows(Config.PLAYLIST_PENDING_MOVE)
if pending_move_rows:
raise ApplicationError(f"move_rows_within_playlist: {pending_move_rows=}")
# Get length of playlist
playlist_length = len(get_playlist_rows(playlist_id))
# Put rows to be moved into PENDING_MOVE playlist
session.execute(
update(PlaylistRows)
.where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number.in_(from_rows),
)
.values(playlist_id=Config.PLAYLIST_PENDING_MOVE)
)
session.commit()
# Resequence remaining row numbers
_check_playlist_integrity(session, playlist_id, fix=True)
# Make space for moved rows. Determning where to make the space
# is non-trivial. For example, if the playlist has ten entries
# and we're moving four of them to row 8, after we've moved the
# rows to the PLAYLIST_PENDING_MOVE there will only be six
# entries left. Clearly we can't make space at row 8...
overflow = max(to_row + len(from_rows) - playlist_length, 0)
if overflow == 0:
space_row = to_row
else:
space_row = to_row - overflow - len([a for a in from_rows if a > to_row])
_move_rows(session, playlist_id, space_row, len(from_rows))
# Move the PENDING_MOVE rows back and fixup row numbers
update_list: list[dict[str, int]] = []
next_row = space_row
for row_to_move in get_playlist_rows(Config.PLAYLIST_PENDING_MOVE):
update_list.append({"id": row_to_move.playlistrow_id, "row_number": next_row})
update_list.append({"id": row_to_move.playlistrow_id, "playlist_id": playlist_id})
next_row += 1
session.execute(update(PlaylistRows), update_list)
session.commit()
# Sanity check row numbers
_check_playlist_integrity(session, playlist_id, fix=False)
def update_row_numbers(
playlist_id: int, id_to_row_number: list[dict[int, int]]
) -> None:
"""
Update playlistrows rownumbers for pass playlistrow_ids
playlist_id is only needed for sanity checking
"""
with db.Session() as session:
session.execute(update(PlaylistRows), id_to_row_number)
session.commit()
# Sanity check
_check_playlist_integrity(session, playlist_id, fix=False)
def create_playlist(name: str, template_id: int) -> PlaylistDTO:
"""
Create playlist and return DTO.
"""
with db.Session() as session:
try:
playlist = Playlists(session, name, template_id)
playlist_id = playlist.id
session.commit()
except Exception:
raise ApplicationError("Can't create Playlist")
new_playlist = playlist_by_id(playlist_id)
if not new_playlist:
raise ApplicationError("Can't retrieve new Playlist")
return new_playlist
def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None:
"""
Return specific row DTO
"""
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
PlaylistRows.id.label("playlistrow_id"),
PlaylistRows.row_number,
PlaylistRows.note,
PlaylistRows.played,
PlaylistRows.playlist_id,
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(PlaylistRows.id == playlistrow_id)
.order_by(PlaylistRows.row_number)
)
with db.Session() as session:
record = session.execute(stmt).one_or_none()
if not record:
return None
# Handle cases where track_id is None (no track associated)
if record.track_id is None:
dto = PlaylistRowDTO(
artist="",
bitrate=0,
duration=0,
fade_at=0,
intro=None,
lastplayed=None,
note=record.note,
path="",
played=record.played,
playlist_id=record.playlist_id,
playlistrow_id=record.playlistrow_id,
row_number=record.row_number,
silence_at=0,
start_gap=0,
title="",
track_id=-1,
)
else:
dto = PlaylistRowDTO(
artist=record.artist,
bitrate=record.bitrate,
duration=record.duration,
fade_at=record.fade_at,
intro=record.intro,
lastplayed=record.lastplayed,
note=record.note,
path=record.path,
played=record.played,
playlist_id=record.playlist_id,
playlistrow_id=record.playlistrow_id,
row_number=record.row_number,
silence_at=record.silence_at,
start_gap=record.start_gap,
title=record.title,
track_id=record.track_id,
)
return dto
def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]:
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
PlaylistRows.id.label("playlistrow_id"),
PlaylistRows.row_number,
PlaylistRows.note,
PlaylistRows.played,
PlaylistRows.playlist_id,
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
)
with db.Session() as session:
results = session.execute(stmt).all()
# Sanity check
# TODO: would be good to be confident at removing this
_check_playlist_integrity(session=session, playlist_rows=results, fix=False)
dto_list = []
for row in results:
# Handle cases where track_id is None (no track associated)
if row.track_id is None:
dto = PlaylistRowDTO(
artist="",
bitrate=0,
duration=0,
fade_at=0,
intro=None,
lastplayed=None,
note=row.note,
path="",
played=row.played,
playlist_id=row.playlist_id,
playlistrow_id=row.playlistrow_id,
row_number=row.row_number,
silence_at=0,
start_gap=0,
title="",
track_id=-1,
# Additional fields like row_fg, row_bg, etc., use default None values
)
else:
dto = PlaylistRowDTO(
artist=row.artist,
bitrate=row.bitrate,
duration=row.duration,
fade_at=row.fade_at,
intro=row.intro,
lastplayed=row.lastplayed,
note=row.note,
path=row.path,
played=row.played,
playlist_id=row.playlist_id,
playlistrow_id=row.playlistrow_id,
row_number=row.row_number,
silence_at=row.silence_at,
start_gap=row.start_gap,
title=row.title,
track_id=row.track_id,
# Additional fields like row_fg, row_bg, etc., use default None values
)
dto_list.append(dto)
return dto_list
def insert_row(
playlist_id: int, row_number: int, track_id: int | None, note: str
) -> PlaylistRowDTO:
"""
Insert a new row into playlist and return new row DTO
"""
with db.Session() as session:
# Sanity check
_check_playlist_integrity(session, playlist_id, fix=False)
# Make space for new row
_move_rows(
session=session, playlist_id=playlist_id, starting_row=row_number, move_by=1
)
playlist_row = PlaylistRows.insert_row(
session=session,
playlist_id=playlist_id,
new_row_number=row_number,
note=note,
track_id=track_id,
)
session.commit()
playlist_row_id = playlist_row.id
# Sanity check
_check_playlist_integrity(session, playlist_id, fix=False)
new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id)
if not new_playlist_row:
raise ApplicationError("Can't retrieve new playlist row")
return new_playlist_row
def remove_rows(playlist_id: int, row_numbers: list[int]) -> None:
"""
Remove rows from playlist
Delete from highest row back so that not yet deleted row numbers don't change.
"""
log.debug(f"remove_rows({playlist_id=}, {row_numbers=}")
with db.Session() as session:
for row_number in sorted(row_numbers, reverse=True):
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number == row_number,
)
)
# Fixup row number to remove gaps
_check_playlist_integrity(session, playlist_id, fix=True)
session.commit()
def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
"""
Return playlist with specified id
"""
stmt = select(
Playlists.id.label("playlist_id"),
Playlists.name,
Playlists.favourite,
Playlists.is_template,
Playlists.open,
).where(Playlists.id == playlist_id)
with db.Session() as session:
record = session.execute(stmt).one_or_none()
if not record:
return None
dto = PlaylistDTO(
name=record.name,
playlist_id=record.playlist_id,
favourite=record.favourite,
is_template=record.is_template,
open=record.open,
)
return dto
# Misc
def get_setting(name: str) -> int | None:
"""
Get int setting
"""
with db.Session() as session:
record = session.execute(
select(Settings).where(Settings.name == name)
).one_or_none()
if not record:
return None
return record.f_int
def set_setting(name: str, value: int) -> None:
"""
Add int setting
"""
with db.Session() as session:
record = session.execute(
select(Settings).where(Settings.name == name)
).one_or_none()
if not record:
record = Settings(session=session, name=name)
if not record:
raise ApplicationError("Can't create Settings record")
record.f_int = value
session.commit()