892 lines
26 KiB
Python
892 lines
26 KiB
Python
# Standard library imports
|
|
import re
|
|
|
|
# PyQt imports
|
|
|
|
# Third party imports
|
|
from sqlalchemy import (
|
|
delete,
|
|
func,
|
|
select,
|
|
update,
|
|
)
|
|
from sqlalchemy.orm import aliased
|
|
from sqlalchemy.orm.session import Session
|
|
from sqlalchemy.sql.elements import BinaryExpression, ColumnElement
|
|
from classes import ApplicationError, PlaylistRowDTO
|
|
|
|
# App imports
|
|
from classes import PlaylistDTO, TrackDTO
|
|
from config import Config
|
|
from log import log, log_call
|
|
from models import (
|
|
db,
|
|
NoteColours,
|
|
Playdates,
|
|
PlaylistRows,
|
|
Playlists,
|
|
Settings,
|
|
Tracks,
|
|
)
|
|
|
|
|
|
# Helper functions
|
|
|
|
def _remove_substring_case_insensitive(parent_string: str, substring: str) -> str:
|
|
"""
|
|
Remove all instances of substring from parent string, case insensitively
|
|
"""
|
|
|
|
# Convert both strings to lowercase for case-insensitive comparison
|
|
lower_parent = parent_string.lower()
|
|
lower_substring = substring.lower()
|
|
|
|
# Initialize the result string
|
|
result = parent_string
|
|
|
|
# Continue removing the substring until it's no longer found
|
|
while lower_substring in lower_parent:
|
|
# Find the index of the substring
|
|
index = lower_parent.find(lower_substring)
|
|
|
|
# Remove the substring
|
|
result = result[:index] + result[index + len(substring) :]
|
|
|
|
# Update the lowercase versions
|
|
lower_parent = result.lower()
|
|
|
|
return result
|
|
|
|
|
|
# Notecolour functions
|
|
def _get_colour_record(text: str) -> tuple[NoteColours | None, str]:
|
|
"""
|
|
Parse text and return first matching colour record or None
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
for rec in NoteColours.get_all(session):
|
|
if rec.is_regex:
|
|
flags = re.UNICODE
|
|
if not rec.is_casesensitive:
|
|
flags |= re.IGNORECASE
|
|
p = re.compile(rec.substring, flags)
|
|
if p.match(text):
|
|
if rec.strip_substring:
|
|
return_text = re.sub(p, "", text)
|
|
else:
|
|
return_text = text
|
|
return (rec, return_text)
|
|
else:
|
|
if rec.is_casesensitive:
|
|
if rec.substring in text:
|
|
return_text = text.replace(rec.substring, "")
|
|
return (rec, return_text)
|
|
else:
|
|
if rec.substring.lower() in text.lower():
|
|
return_text = _remove_substring_case_insensitive(
|
|
text, rec.substring
|
|
)
|
|
return (rec, return_text)
|
|
|
|
return (None, text)
|
|
|
|
|
|
def get_colour(text: str, foreground: bool = False) -> str:
|
|
"""
|
|
Parse text and return background (foreground if foreground==True)
|
|
colour string if matched, else None
|
|
"""
|
|
|
|
(rec, _) = _get_colour_record(text)
|
|
if rec is None:
|
|
return ""
|
|
elif foreground:
|
|
return rec.foreground or ""
|
|
else:
|
|
return rec.colour
|
|
|
|
|
|
def remove_colour_substring(text: str) -> str:
|
|
"""
|
|
Remove text that identifies the colour to be used if strip_substring is True
|
|
"""
|
|
|
|
(rec, stripped_text) = _get_colour_record(text)
|
|
|
|
return stripped_text
|
|
|
|
|
|
# Track functions
|
|
def add_track_to_header(playlistrow_id: int, track_id: int) -> None:
|
|
"""
|
|
Add a track to this (header) row
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
session.execute(
|
|
update(PlaylistRows)
|
|
.where(PlaylistRows.id == playlistrow_id)
|
|
.values(track_id=track_id)
|
|
)
|
|
session.commit()
|
|
|
|
|
|
def create_track(path: str, metadata: dict[str, str | int | float]) -> TrackDTO:
|
|
"""
|
|
Create a track db entry from a track path and return the DTO
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
try:
|
|
track = Tracks(
|
|
session=session,
|
|
path=str(metadata["path"]),
|
|
title=str(metadata["title"]),
|
|
artist=str(metadata["artist"]),
|
|
duration=int(metadata["duration"]),
|
|
start_gap=int(metadata["start_gap"]),
|
|
fade_at=int(metadata["fade_at"]),
|
|
silence_at=int(metadata["silence_at"]),
|
|
bitrate=int(metadata["bitrate"]),
|
|
)
|
|
|
|
track_id = track.id
|
|
session.commit()
|
|
except Exception:
|
|
raise ApplicationError("Can't create Track")
|
|
|
|
new_track = track_by_id(track_id)
|
|
if not new_track:
|
|
raise ApplicationError("Unable to create new track")
|
|
|
|
return new_track
|
|
|
|
|
|
def update_track(
|
|
path: str, track_id: int, metadata: dict[str, str | int | float]
|
|
) -> TrackDTO:
|
|
"""
|
|
Update an existing track db entry return the DTO
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
track = session.get(Tracks, track_id)
|
|
if not track:
|
|
raise ApplicationError(f"Can't retrieve Track ({track_id=})")
|
|
track.path = (str(metadata["path"]),)
|
|
track.title = (str(metadata["title"]),)
|
|
track.artist = (str(metadata["artist"]),)
|
|
track.duration = (int(metadata["duration"]),)
|
|
track.start_gap = (int(metadata["start_gap"]),)
|
|
track.fade_at = (int(metadata["fade_at"]),)
|
|
track.silence_at = (int(metadata["silence_at"]),)
|
|
track.bitrate = (int(metadata["bitrate"]),)
|
|
|
|
session.commit()
|
|
|
|
updated_track = track_by_id(track_id)
|
|
if not updated_track:
|
|
raise ApplicationError("Unable to retrieve updated track")
|
|
|
|
return updated_track
|
|
|
|
|
|
def get_all_tracks() -> list[TrackDTO]:
|
|
"""Return a list of all tracks"""
|
|
|
|
return _tracks_where(Tracks.id > 0)
|
|
|
|
|
|
def track_by_id(track_id: int) -> TrackDTO | None:
|
|
"""
|
|
Return track with specified id
|
|
"""
|
|
|
|
# Alias PlaydatesTable for subquery
|
|
LatestPlaydate = aliased(Playdates)
|
|
|
|
# Subquery: latest playdate for each track
|
|
latest_playdate_subq = (
|
|
select(
|
|
LatestPlaydate.track_id,
|
|
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
|
|
)
|
|
.group_by(LatestPlaydate.track_id)
|
|
.subquery()
|
|
)
|
|
|
|
stmt = (
|
|
select(
|
|
Tracks.id.label("track_id"),
|
|
Tracks.artist,
|
|
Tracks.bitrate,
|
|
Tracks.duration,
|
|
Tracks.fade_at,
|
|
Tracks.intro,
|
|
Tracks.path,
|
|
Tracks.silence_at,
|
|
Tracks.start_gap,
|
|
Tracks.title,
|
|
latest_playdate_subq.c.lastplayed,
|
|
)
|
|
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
|
|
.where(Tracks.id == track_id)
|
|
)
|
|
|
|
with db.Session() as session:
|
|
record = session.execute(stmt).one_or_none()
|
|
if not record:
|
|
return None
|
|
|
|
dto = TrackDTO(
|
|
artist=record.artist,
|
|
bitrate=record.bitrate,
|
|
duration=record.duration,
|
|
fade_at=record.fade_at,
|
|
intro=record.intro,
|
|
lastplayed=record.lastplayed,
|
|
path=record.path,
|
|
silence_at=record.silence_at,
|
|
start_gap=record.start_gap,
|
|
title=record.title,
|
|
track_id=record.track_id,
|
|
)
|
|
|
|
return dto
|
|
|
|
|
|
def _tracks_where(where: BinaryExpression | ColumnElement[bool]) -> list[TrackDTO]:
|
|
"""
|
|
Return tracks selected by where
|
|
"""
|
|
|
|
# Alias PlaydatesTable for subquery
|
|
LatestPlaydate = aliased(Playdates)
|
|
|
|
# Subquery: latest playdate for each track
|
|
latest_playdate_subq = (
|
|
select(
|
|
LatestPlaydate.track_id,
|
|
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
|
|
)
|
|
.group_by(LatestPlaydate.track_id)
|
|
.subquery()
|
|
)
|
|
|
|
stmt = (
|
|
select(
|
|
Tracks.id.label("track_id"),
|
|
Tracks.artist,
|
|
Tracks.bitrate,
|
|
Tracks.duration,
|
|
Tracks.fade_at,
|
|
Tracks.intro,
|
|
Tracks.path,
|
|
Tracks.silence_at,
|
|
Tracks.start_gap,
|
|
Tracks.title,
|
|
latest_playdate_subq.c.lastplayed,
|
|
)
|
|
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
|
|
.where(where)
|
|
)
|
|
|
|
results: list[TrackDTO] = []
|
|
|
|
with db.Session() as session:
|
|
records = session.execute(stmt).all()
|
|
for record in records:
|
|
dto = TrackDTO(
|
|
artist=record.artist,
|
|
bitrate=record.bitrate,
|
|
duration=record.duration,
|
|
fade_at=record.fade_at,
|
|
intro=record.intro,
|
|
lastplayed=record.lastplayed,
|
|
path=record.path,
|
|
silence_at=record.silence_at,
|
|
start_gap=record.start_gap,
|
|
title=record.title,
|
|
track_id=record.track_id,
|
|
)
|
|
results.append(dto)
|
|
|
|
return results
|
|
|
|
|
|
def track_with_path(path: str) -> bool:
|
|
"""
|
|
Return True if a track with passed path exists, else False
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
track = (
|
|
session.execute(select(Tracks).where(Tracks.path == path))
|
|
.scalars()
|
|
.one_or_none()
|
|
)
|
|
|
|
return track is not None
|
|
|
|
|
|
def tracks_like_artist(filter_str: str) -> list[TrackDTO]:
|
|
"""
|
|
Return tracks where artist is like filter
|
|
"""
|
|
|
|
return _tracks_where(Tracks.artist.ilike(f"%{filter_str}%"))
|
|
|
|
|
|
def tracks_like_title(filter_str: str) -> list[TrackDTO]:
|
|
"""
|
|
Return tracks where title is like filter
|
|
"""
|
|
|
|
return _tracks_where(Tracks.title.ilike(f"%{filter_str}%"))
|
|
|
|
|
|
def get_last_played_dates(track_id: int, limit: int = 5) -> str:
|
|
"""
|
|
Return the most recent 'limit' dates that this track has been played
|
|
as a text list
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
playdates = session.scalars(
|
|
Playdates.select()
|
|
.where(Playdates.track_id == track_id)
|
|
.order_by(Playdates.lastplayed.desc())
|
|
.limit(limit)
|
|
).all()
|
|
|
|
return "<br>".join(
|
|
[
|
|
a.lastplayed.strftime(Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT)
|
|
for a in playdates
|
|
]
|
|
)
|
|
|
|
|
|
# Playlist functions
|
|
def _check_playlist_integrity(
|
|
session: Session, playlist_id: int, fix: bool = False
|
|
) -> None:
|
|
"""
|
|
Ensure the row numbers are contiguous. Fix and log if fix==True,
|
|
else raise ApplicationError.
|
|
"""
|
|
|
|
playlist_rows = (
|
|
session.execute(
|
|
select(PlaylistRows)
|
|
.where(PlaylistRows.playlist_id == playlist_id)
|
|
.order_by(PlaylistRows.row_number)
|
|
)
|
|
.scalars()
|
|
.all()
|
|
)
|
|
for idx, plr in enumerate(playlist_rows):
|
|
if plr.row_number == idx:
|
|
continue
|
|
|
|
msg = (
|
|
"_check_playlist_integrity: incorrect row number "
|
|
f"({plr.id=}, {plr.row_number=}, {idx=})"
|
|
)
|
|
if fix:
|
|
log.debug(msg)
|
|
plr.row_number = idx
|
|
else:
|
|
raise ApplicationError(msg)
|
|
|
|
|
|
@log_call
|
|
def _shift_rows(
|
|
session: Session, playlist_id: int, starting_row: int, shift_by: int
|
|
) -> None:
|
|
"""
|
|
Shift rows from starting_row by shift_by. If shift_by is +ve, shift rows
|
|
down; if -ve, shift them up.
|
|
"""
|
|
|
|
session.execute(
|
|
update(PlaylistRows)
|
|
.where(
|
|
(PlaylistRows.playlist_id == playlist_id),
|
|
(PlaylistRows.row_number >= starting_row),
|
|
)
|
|
.values(row_number=PlaylistRows.row_number + shift_by)
|
|
)
|
|
|
|
|
|
@log_call
|
|
def move_rows(
|
|
from_rows: list[int],
|
|
from_playlist_id: int,
|
|
to_row: int,
|
|
to_playlist_id: int | None = None,
|
|
) -> None:
|
|
"""
|
|
Move rows with or between playlists.
|
|
|
|
Algorithm:
|
|
- Sanity check row numbers
|
|
- Check there are no playlist rows with playlist_id == PENDING_MOVE
|
|
- Put rows to be moved into PENDING_MOVE playlist
|
|
- Resequence remaining row numbers
|
|
- Make space for moved rows
|
|
- Move the PENDING_MOVE rows back and fixup row numbers
|
|
- Sanity check row numbers
|
|
"""
|
|
|
|
# If to_playlist_id isn't specified, we're moving within the one
|
|
# playlist.
|
|
if to_playlist_id is None:
|
|
to_playlist_id = from_playlist_id
|
|
|
|
with db.Session() as session:
|
|
# Sanity check row numbers
|
|
_check_playlist_integrity(session, from_playlist_id, fix=False)
|
|
if from_playlist_id != to_playlist_id:
|
|
_check_playlist_integrity(session, to_playlist_id, fix=False)
|
|
|
|
# Check there are no playlist rows with playlist_id == PENDING_MOVE
|
|
pending_move_rows = get_playlist_rows(Config.PLAYLIST_PENDING_MOVE)
|
|
if pending_move_rows:
|
|
raise ApplicationError(f"move_rows_to_playlist: {pending_move_rows=}")
|
|
|
|
# We need playlist length if we're moving within a playlist. Get
|
|
# that now before we remove rows.
|
|
from_playlist_length = len(get_playlist_rows(from_playlist_id))
|
|
# Put rows to be moved into PENDING_MOVE playlist
|
|
session.execute(
|
|
update(PlaylistRows)
|
|
.where(
|
|
PlaylistRows.playlist_id == from_playlist_id,
|
|
PlaylistRows.row_number.in_(from_rows),
|
|
)
|
|
.values(playlist_id=Config.PLAYLIST_PENDING_MOVE)
|
|
)
|
|
|
|
# Resequence remaining row numbers
|
|
_check_playlist_integrity(session, from_playlist_id, fix=True)
|
|
session.commit()
|
|
|
|
# Make space for moved rows. If moving within one playlist,
|
|
# determning where to make the space is non-trivial. For example,
|
|
# if the playlist has ten entries and we're moving four of them
|
|
# to row 8, after we've moved the rows to the
|
|
# PLAYLIST_PENDING_MOVE there will only be six entries left.
|
|
# Clearly we can't make space at row 8...
|
|
space_row = to_row
|
|
if to_playlist_id == from_playlist_id:
|
|
overflow = max(to_row + len(from_rows) - from_playlist_length, 0)
|
|
if overflow != 0:
|
|
space_row = (
|
|
to_row - overflow - len([a for a in from_rows if a > to_row])
|
|
)
|
|
|
|
_shift_rows(session, to_playlist_id, space_row, len(from_rows))
|
|
|
|
# Move the PENDING_MOVE rows back and fixup row numbers
|
|
update_list: list[dict[str, int]] = []
|
|
next_row = space_row
|
|
# PLAYLIST_PENDING_MOVE may have gaps so don't check it
|
|
for row_to_move in get_playlist_rows(
|
|
Config.PLAYLIST_PENDING_MOVE, check_playlist_itegrity=False
|
|
):
|
|
update_list.append(
|
|
{"id": row_to_move.playlistrow_id, "row_number": next_row}
|
|
)
|
|
update_list.append(
|
|
{"id": row_to_move.playlistrow_id, "playlist_id": to_playlist_id}
|
|
)
|
|
next_row += 1
|
|
session.execute(update(PlaylistRows), update_list)
|
|
session.commit()
|
|
|
|
# Sanity check row numbers
|
|
_check_playlist_integrity(session, from_playlist_id, fix=False)
|
|
if from_playlist_id != to_playlist_id:
|
|
_check_playlist_integrity(session, to_playlist_id, fix=False)
|
|
|
|
|
|
def update_playdates(track_id: int) -> None:
|
|
"""
|
|
Update playdates for passed track
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
_ = Playdates(session, track_id)
|
|
|
|
|
|
def update_row_numbers(
|
|
playlist_id: int, id_to_row_number: list[dict[int, int]]
|
|
) -> None:
|
|
"""
|
|
Update playlistrows rownumbers for passed playlistrow_ids
|
|
playlist_id is only needed for sanity checking
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
session.execute(update(PlaylistRows), id_to_row_number)
|
|
session.commit()
|
|
|
|
# Sanity check
|
|
_check_playlist_integrity(session, playlist_id, fix=False)
|
|
|
|
|
|
def create_playlist(name: str, template_id: int) -> PlaylistDTO:
|
|
"""
|
|
Create playlist and return DTO.
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
try:
|
|
playlist = Playlists(session, name, template_id)
|
|
playlist_id = playlist.id
|
|
session.commit()
|
|
except Exception:
|
|
raise ApplicationError("Can't create Playlist")
|
|
|
|
new_playlist = playlist_by_id(playlist_id)
|
|
if not new_playlist:
|
|
raise ApplicationError("Can't retrieve new Playlist")
|
|
|
|
return new_playlist
|
|
|
|
|
|
def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None:
|
|
"""
|
|
Return specific row DTO
|
|
"""
|
|
|
|
# Alias PlaydatesTable for subquery
|
|
LatestPlaydate = aliased(Playdates)
|
|
|
|
# Subquery: latest playdate for each track
|
|
latest_playdate_subq = (
|
|
select(
|
|
LatestPlaydate.track_id,
|
|
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
|
|
)
|
|
.group_by(LatestPlaydate.track_id)
|
|
.subquery()
|
|
)
|
|
|
|
stmt = (
|
|
select(
|
|
PlaylistRows.id.label("playlistrow_id"),
|
|
PlaylistRows.row_number,
|
|
PlaylistRows.note,
|
|
PlaylistRows.played,
|
|
PlaylistRows.playlist_id,
|
|
Tracks.id.label("track_id"),
|
|
Tracks.artist,
|
|
Tracks.bitrate,
|
|
Tracks.duration,
|
|
Tracks.fade_at,
|
|
Tracks.intro,
|
|
Tracks.path,
|
|
Tracks.silence_at,
|
|
Tracks.start_gap,
|
|
Tracks.title,
|
|
latest_playdate_subq.c.lastplayed,
|
|
)
|
|
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
|
|
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
|
|
.where(PlaylistRows.id == playlistrow_id)
|
|
.order_by(PlaylistRows.row_number)
|
|
)
|
|
|
|
with db.Session() as session:
|
|
record = session.execute(stmt).one_or_none()
|
|
if not record:
|
|
return None
|
|
|
|
# Handle cases where track_id is None (no track associated)
|
|
if record.track_id is None:
|
|
dto = PlaylistRowDTO(
|
|
artist="",
|
|
bitrate=0,
|
|
duration=0,
|
|
fade_at=0,
|
|
intro=None,
|
|
lastplayed=None,
|
|
note=record.note,
|
|
path="",
|
|
played=record.played,
|
|
playlist_id=record.playlist_id,
|
|
playlistrow_id=record.playlistrow_id,
|
|
row_number=record.row_number,
|
|
silence_at=0,
|
|
start_gap=0,
|
|
title="",
|
|
track_id=-1,
|
|
)
|
|
else:
|
|
dto = PlaylistRowDTO(
|
|
artist=record.artist,
|
|
bitrate=record.bitrate,
|
|
duration=record.duration,
|
|
fade_at=record.fade_at,
|
|
intro=record.intro,
|
|
lastplayed=record.lastplayed,
|
|
note=record.note,
|
|
path=record.path,
|
|
played=record.played,
|
|
playlist_id=record.playlist_id,
|
|
playlistrow_id=record.playlistrow_id,
|
|
row_number=record.row_number,
|
|
silence_at=record.silence_at,
|
|
start_gap=record.start_gap,
|
|
title=record.title,
|
|
track_id=record.track_id,
|
|
)
|
|
|
|
return dto
|
|
|
|
|
|
def get_playlist_rows(
|
|
playlist_id: int, check_playlist_itegrity: bool = True
|
|
) -> list[PlaylistRowDTO]:
|
|
# Alias PlaydatesTable for subquery
|
|
LatestPlaydate = aliased(Playdates)
|
|
|
|
# Subquery: latest playdate for each track
|
|
latest_playdate_subq = (
|
|
select(
|
|
LatestPlaydate.track_id,
|
|
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
|
|
)
|
|
.group_by(LatestPlaydate.track_id)
|
|
.subquery()
|
|
)
|
|
|
|
stmt = (
|
|
select(
|
|
PlaylistRows.id.label("playlistrow_id"),
|
|
PlaylistRows.row_number,
|
|
PlaylistRows.note,
|
|
PlaylistRows.played,
|
|
PlaylistRows.playlist_id,
|
|
Tracks.id.label("track_id"),
|
|
Tracks.artist,
|
|
Tracks.bitrate,
|
|
Tracks.duration,
|
|
Tracks.fade_at,
|
|
Tracks.intro,
|
|
Tracks.path,
|
|
Tracks.silence_at,
|
|
Tracks.start_gap,
|
|
Tracks.title,
|
|
latest_playdate_subq.c.lastplayed,
|
|
)
|
|
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
|
|
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
|
|
.where(PlaylistRows.playlist_id == playlist_id)
|
|
.order_by(PlaylistRows.row_number)
|
|
)
|
|
|
|
with db.Session() as session:
|
|
results = session.execute(stmt).all()
|
|
# Sanity check
|
|
# TODO: would be good to be confident at removing this
|
|
if check_playlist_itegrity:
|
|
_check_playlist_integrity(
|
|
session=session, playlist_id=playlist_id, fix=False
|
|
)
|
|
|
|
dto_list = []
|
|
for row in results:
|
|
# Handle cases where track_id is None (no track associated)
|
|
if row.track_id is None:
|
|
dto = PlaylistRowDTO(
|
|
artist="",
|
|
bitrate=0,
|
|
duration=0,
|
|
fade_at=0,
|
|
intro=None,
|
|
lastplayed=None,
|
|
note=row.note,
|
|
path="",
|
|
played=row.played,
|
|
playlist_id=row.playlist_id,
|
|
playlistrow_id=row.playlistrow_id,
|
|
row_number=row.row_number,
|
|
silence_at=0,
|
|
start_gap=0,
|
|
title="",
|
|
track_id=-1,
|
|
# Additional fields like row_fg, row_bg, etc., use default None values
|
|
)
|
|
else:
|
|
dto = PlaylistRowDTO(
|
|
artist=row.artist,
|
|
bitrate=row.bitrate,
|
|
duration=row.duration,
|
|
fade_at=row.fade_at,
|
|
intro=row.intro,
|
|
lastplayed=row.lastplayed,
|
|
note=row.note,
|
|
path=row.path,
|
|
played=row.played,
|
|
playlist_id=row.playlist_id,
|
|
playlistrow_id=row.playlistrow_id,
|
|
row_number=row.row_number,
|
|
silence_at=row.silence_at,
|
|
start_gap=row.start_gap,
|
|
title=row.title,
|
|
track_id=row.track_id,
|
|
# Additional fields like row_fg, row_bg, etc., use default None values
|
|
)
|
|
dto_list.append(dto)
|
|
|
|
return dto_list
|
|
|
|
|
|
def insert_row(
|
|
playlist_id: int, row_number: int, track_id: int | None, note: str
|
|
) -> PlaylistRowDTO:
|
|
"""
|
|
Insert a new row into playlist and return new row DTO
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
# Sanity check
|
|
_check_playlist_integrity(session, playlist_id, fix=False)
|
|
|
|
# Make space for new row
|
|
_shift_rows(
|
|
session=session,
|
|
playlist_id=playlist_id,
|
|
starting_row=row_number,
|
|
shift_by=1,
|
|
)
|
|
|
|
playlist_row = PlaylistRows.insert_row(
|
|
session=session,
|
|
playlist_id=playlist_id,
|
|
new_row_number=row_number,
|
|
note=note,
|
|
track_id=track_id,
|
|
)
|
|
session.commit()
|
|
playlist_row_id = playlist_row.id
|
|
|
|
# Sanity check
|
|
_check_playlist_integrity(session, playlist_id, fix=False)
|
|
|
|
new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id)
|
|
if not new_playlist_row:
|
|
raise ApplicationError("Can't retrieve new playlist row")
|
|
|
|
return new_playlist_row
|
|
|
|
|
|
@log_call
|
|
def remove_comments(playlist_id: int, row_numbers: list[int]) -> None:
|
|
"""
|
|
Remove comments from rows in playlist
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
session.execute(
|
|
update(PlaylistRows)
|
|
.where(
|
|
PlaylistRows.playlist_id == playlist_id,
|
|
PlaylistRows.row_number.in_(row_numbers),
|
|
)
|
|
.values(note="")
|
|
)
|
|
session.commit()
|
|
|
|
|
|
@log_call
|
|
def remove_rows(playlist_id: int, row_numbers: list[int]) -> None:
|
|
"""
|
|
Remove rows from playlist
|
|
|
|
Delete from highest row back so that not yet deleted row numbers don't change.
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
for row_number in sorted(row_numbers, reverse=True):
|
|
session.execute(
|
|
delete(PlaylistRows).where(
|
|
PlaylistRows.playlist_id == playlist_id,
|
|
PlaylistRows.row_number == row_number,
|
|
)
|
|
)
|
|
# Fixup row number to remove gaps
|
|
_check_playlist_integrity(session, playlist_id, fix=True)
|
|
|
|
session.commit()
|
|
|
|
|
|
def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
|
|
"""
|
|
Return playlist with specified id
|
|
"""
|
|
|
|
stmt = select(
|
|
Playlists.id.label("playlist_id"),
|
|
Playlists.name,
|
|
Playlists.favourite,
|
|
Playlists.is_template,
|
|
Playlists.open,
|
|
).where(Playlists.id == playlist_id)
|
|
|
|
with db.Session() as session:
|
|
record = session.execute(stmt).one_or_none()
|
|
if not record:
|
|
return None
|
|
|
|
dto = PlaylistDTO(
|
|
name=record.name,
|
|
playlist_id=record.playlist_id,
|
|
favourite=record.favourite,
|
|
is_template=record.is_template,
|
|
open=record.open,
|
|
)
|
|
|
|
return dto
|
|
|
|
|
|
# Misc
|
|
def get_setting(name: str) -> int | None:
|
|
"""
|
|
Get int setting
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
record = (
|
|
session.execute(select(Settings).where(Settings.name == name))
|
|
.scalars()
|
|
.one_or_none()
|
|
)
|
|
if not record:
|
|
return None
|
|
|
|
return record.f_int
|
|
|
|
|
|
def set_setting(name: str, value: int) -> None:
|
|
"""
|
|
Add int setting
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
record = (
|
|
session.execute(select(Settings).where(Settings.name == name))
|
|
.scalars()
|
|
.one_or_none()
|
|
)
|
|
if not record:
|
|
record = Settings(session=session, name=name)
|
|
if not record:
|
|
raise ApplicationError("Can't create Settings record")
|
|
record.f_int = value
|
|
session.commit()
|