WIP: Use PlaylistRowDTO to isolate SQLAlchemy objects

This commit is contained in:
Keith Edmunds 2025-03-14 13:21:46 +00:00
parent 1749f0a0b8
commit 9e07e73167
5 changed files with 180 additions and 21 deletions

View File

@ -1,7 +1,7 @@
# Standard library imports
from __future__ import annotations
from dataclasses import dataclass
import datetime as dt
from enum import auto, Enum
import functools
import threading
@ -149,6 +149,42 @@ class Tags(NamedTuple):
duration: int = 0
@dataclass
class TrackDTO:
track_id: int
artist: str
bitrate: int
duration: int
fade_at: int
intro: int | None
path: str
silence_at: int
start_gap: int
title: str
lastplayed: dt.datetime | None
@dataclass
class PlaylistRowDTO(TrackDTO):
note: str
played: bool
playlist_id: int
playlistrow_id: int
row_number: int
row_fg: str | None = None
row_bg: str | None = None
note_fg: str | None = None
note_bg: str | None = None
end_of_track_signalled: bool = False
end_time: dt.datetime | None = None
# fade_graph: FadeCurve | None = None
fade_graph_start_updates: dt.datetime | None = None
resume_marker: float = 0.0
forecast_end_time: dt.datetime | None = None
forecast_start_time: dt.datetime | None = None
start_time: dt.datetime | None = None
class TrackInfo(NamedTuple):
track_id: int
row_number: int

View File

@ -89,7 +89,7 @@ class _AddFadeCurve(QObject):
Create fade curve and add to PlaylistTrack object
"""
fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
fc = FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
if not fc:
log.error(f"Failed to create FadeCurve for {self.track_path=}")
else:
@ -97,7 +97,7 @@ class _AddFadeCurve(QObject):
self.finished.emit()
class _FadeCurve:
class FadeCurve:
GraphWidget: Optional[PlotWidget] = None
def __init__(
@ -478,7 +478,7 @@ class RowAndTrack:
# Track playing data
self.end_of_track_signalled: bool = False
self.end_time: Optional[dt.datetime] = None
self.fade_graph: Optional[_FadeCurve] = None
self.fade_graph: Optional[FadeCurve] = None
self.fade_graph_start_updates: Optional[dt.datetime] = None
self.resume_marker: Optional[float] = 0.0
self.forecast_end_time: Optional[dt.datetime] = None

View File

@ -1255,9 +1255,6 @@ class Window(QMainWindow):
# # # # # # # # # # Internal utility functions # # # # # # # # # #
def active_base_model(self) -> PlaylistModel:
return self.current.base_model
def active_tab(self) -> PlaylistTab:
return self.playlist_section.tabPlaylist.currentWidget()

View File

@ -35,6 +35,7 @@ from classes import (
ApplicationError,
Col,
MusicMusterSignals,
PlaylistRowDTO,
)
from config import Config
from helpers import (
@ -49,6 +50,7 @@ from helpers import (
from log import log
from models import db, NoteColours, Playdates, PlaylistRows, Tracks
from music_manager import RowAndTrack, track_sequence
from repository import get_playlist_rows
HEADER_NOTES_COLUMN = 1
@ -83,7 +85,8 @@ class PlaylistModel(QAbstractTableModel):
self.playlist_id = playlist_id
self.is_template = is_template
self.playlist_rows: dict[int, RowAndTrack] = {}
self.playlist_rows: dict[int, PlaylistRowDTO] = {}
self.selected_rows: list[PlaylistRowDTO] = []
self.signals = MusicMusterSignals()
self.played_tracks_hidden = False
@ -820,20 +823,24 @@ class PlaylistModel(QAbstractTableModel):
# We used to clear self.playlist_rows each time but that's
# expensive and slow on big playlists
# Note where each playlist_id is
plid_to_row: dict[int, int] = {}
for oldrow in self.playlist_rows:
plrdata = self.playlist_rows[oldrow]
plid_to_row[plrdata.playlistrow_id] = plrdata.row_number
# # Note where each playlist_id is
# plid_to_row: dict[int, int] = {}
# for oldrow in self.playlist_rows:
# plrdata = self.playlist_rows[oldrow]
# plid_to_row[plrdata.playlistrow_id] = plrdata.row_number
# build a new playlist_rows
new_playlist_rows: dict[int, RowAndTrack] = {}
for p in PlaylistRows.get_playlist_rows(session, self.playlist_id):
if p.id not in plid_to_row:
new_playlist_rows[p.row_number] = RowAndTrack(p)
else:
new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
new_playlist_rows[p.row_number].row_number = p.row_number
# new_playlist_rows: dict[int, RowAndTrack] = {}
# for p in PlaylistRows.get_playlist_rows(session, self.playlist_id):
# if p.id not in plid_to_row:
# new_playlist_rows[p.row_number] = RowAndTrack(p)
# else:
# new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
# new_playlist_rows[p.row_number].row_number = p.row_number
# build a new playlist_rows
new_playlist_rows: dict[int, PlaylistRowDTO] = {}
for p in get_playlist_rows(self.playlist_id):
new_playlist_rows[p.row_number] = p
# Copy to self.playlist_rows
self.playlist_rows = new_playlist_rows
@ -1725,7 +1732,7 @@ class PlaylistModel(QAbstractTableModel):
continue
# Set start/end
next_start_time = rat.set_forecast_start_time(update_rows, next_start_time)
rat.forecast_start_time = next_start_time
# Update start/stop times of rows that have changed
for updated_row in update_rows:

119
app/repository.py Normal file
View File

@ -0,0 +1,119 @@
# Standard library imports
# PyQt imports
# Third party imports
from sqlalchemy import select, func
from sqlalchemy.orm import aliased
from classes import PlaylistRowDTO
# App imports
from classes import TrackDTO
from models import db, Tracks, PlaylistRows, Playdates
def tracks_like_title(filter_str: str) -> list[TrackDTO]:
"""
Return tracks where title is like filter
"""
# TODO: add in playdates as per Tracks.search_titles
with db.Session() as session:
stmt = select(Tracks).where(Tracks.title.ilike(f"%{filter_str}%"))
results = (
session.execute(stmt).scalars().unique().all()
) # `scalars()` extracts ORM objects
return [
TrackDTO(**{k: v for k, v in vars(t).items() if not k.startswith("_")})
for t in results
]
def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]:
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed")
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
PlaylistRows.id.label("playlistrow_id"),
PlaylistRows.row_number,
PlaylistRows.note,
PlaylistRows.played,
PlaylistRows.playlist_id,
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
)
with db.Session() as session:
results = session.execute(stmt).all()
dto_list = []
for row in results:
# Handle cases where track_id is None (no track associated)
if row.track_id is None:
dto = PlaylistRowDTO(
artist="",
bitrate=0,
duration=0,
fade_at=0,
intro=None,
lastplayed=None,
note=row.note,
path="",
played=row.played,
playlist_id=row.playlist_id,
playlistrow_id=row.playlistrow_id,
row_number=row.row_number,
silence_at=0,
start_gap=0,
title="",
track_id=-1,
# Additional fields like row_fg, row_bg, etc., use default None values
)
else:
dto = PlaylistRowDTO(
artist=row.artist,
bitrate=row.bitrate,
duration=row.duration,
fade_at=row.fade_at,
intro=row.intro,
lastplayed=row.lastplayed,
note=row.note,
path=row.path,
played=row.played,
playlist_id=row.playlist_id,
playlistrow_id=row.playlistrow_id,
row_number=row.row_number,
silence_at=row.silence_at,
start_gap=row.start_gap,
title=row.title,
track_id=row.track_id,
# Additional fields like row_fg, row_bg, etc., use default None values
)
dto_list.append(dto)
return dto_list