WIP: Make TrackDTO a member, not parent of, PlaylistRow

This commit is contained in:
Keith Edmunds 2025-04-11 09:23:17 +01:00
parent f9c8541b17
commit 83f5ad5303
3 changed files with 131 additions and 204 deletions

View File

@ -80,12 +80,13 @@ class TrackDTO:
@dataclass
class PlaylistRowDTO(TrackDTO):
class PlaylistRowDTO:
note: str
played: bool
playlist_id: int
playlistrow_id: int
row_number: int
track: TrackDTO | None
@dataclass

View File

@ -52,17 +52,23 @@ class PlaylistRow:
self.start_time: dt.datetime | None = None
def __repr__(self) -> str:
track_id = None
if self.dto.track:
track_id = self.dto.track.track_id
return (
f"<PlaylistRow(playlist_id={self.dto.playlist_id}, "
f"row_number={self.dto.row_number}, "
f"playlistrow_id={self.dto.playlistrow_id}, "
f"note={self.dto.note}, track_id={self.dto.track_id}>"
f"note={self.dto.note}, track_id={track_id}>"
)
# Expose TrackDTO fields as properties
@property
def artist(self):
return self.dto.artist
if self.dto.track:
return self.dto.track.artist
else:
return ""
@artist.setter
def artist(self, value: str) -> None:
@ -70,19 +76,31 @@ class PlaylistRow:
@property
def bitrate(self):
return self.dto.bitrate
if self.dto.track:
return self.dto.track.bitrate
else:
return 0
@property
def duration(self):
return self.dto.duration
if self.dto.track:
return self.dto.track.duration
else:
return 0
@property
def fade_at(self):
return self.dto.fade_at
if self.dto.track:
return self.dto.track.fade_at
else:
return 0
@property
def intro(self):
return self.dto.intro
if self.dto.track:
return self.dto.track.intro
else:
return 0
@intro.setter
def intro(self, value: int) -> None:
@ -90,23 +108,38 @@ class PlaylistRow:
@property
def lastplayed(self):
return self.dto.lastplayed
if self.dto.track:
return self.dto.track.lastplayed
else:
return None
@property
def path(self):
return self.dto.path
if self.dto.track:
return self.dto.track.path
else:
return ""
@property
def silence_at(self):
return self.dto.silence_at
if self.dto.track:
return self.dto.track.silence_at
else:
return 0
@property
def start_gap(self):
return self.dto.start_gap
if self.dto.track:
return self.dto.track.start_gap
else:
return 0
@property
def title(self):
return self.dto.title
if self.dto.track:
return self.dto.track.title
else:
return ""
@title.setter
def title(self, value: str) -> None:
@ -114,10 +147,13 @@ class PlaylistRow:
@property
def track_id(self):
return self.dto.track_id
if self.dto.track:
return self.dto.track.track_id
else:
return 0
@track_id.setter
def track_id(self, value: int) -> None:
def track_id(self, track_id: int) -> None:
"""
Adding a track_id should only happen to a header row.
"""
@ -125,6 +161,14 @@ class PlaylistRow:
if self.track_id > 0:
raise ApplicationError("Attempting to add track to row with existing track ({self=}")
repository.add_track_to_header(track_id)
# Need to update with track information
track = repository.track_by_id(track_id)
if track:
for attr, value in track.__dataclass_fields__.items():
setattr(self, attr, value)
# TODO: set up write access to track_id. Should only update if
# track_id == 0. Need to update all other track fields at the
# same time.

View File

@ -730,196 +730,6 @@ def create_playlist(name: str, template_id: int, as_template: bool = False) -> P
return new_playlist
@log_call
def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None:
"""
Return specific row DTO
"""
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
PlaylistRows.id.label("playlistrow_id"),
PlaylistRows.row_number,
PlaylistRows.note,
PlaylistRows.played,
PlaylistRows.playlist_id,
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(PlaylistRows.id == playlistrow_id)
.order_by(PlaylistRows.row_number)
)
with db.Session() as session:
record = session.execute(stmt).one_or_none()
if not record:
return None
# Handle cases where track_id is None (no track associated)
if record.track_id is None:
dto = PlaylistRowDTO(
artist="",
bitrate=0,
duration=0,
fade_at=0,
intro=None,
lastplayed=None,
note=record.note,
path="",
played=record.played,
playlist_id=record.playlist_id,
playlistrow_id=record.playlistrow_id,
row_number=record.row_number,
silence_at=0,
start_gap=0,
title="",
track_id=-1,
)
else:
dto = PlaylistRowDTO(
artist=record.artist,
bitrate=record.bitrate,
duration=record.duration,
fade_at=record.fade_at,
intro=record.intro,
lastplayed=record.lastplayed,
note=record.note,
path=record.path,
played=record.played,
playlist_id=record.playlist_id,
playlistrow_id=record.playlistrow_id,
row_number=record.row_number,
silence_at=record.silence_at,
start_gap=record.start_gap,
title=record.title,
track_id=record.track_id,
)
return dto
def get_playlist_rows(
playlist_id: int, check_playlist_itegrity: bool = True
) -> list[PlaylistRowDTO]:
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
PlaylistRows.id.label("playlistrow_id"),
PlaylistRows.row_number,
PlaylistRows.note,
PlaylistRows.played,
PlaylistRows.playlist_id,
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
)
with db.Session() as session:
results = session.execute(stmt).all()
# Sanity check
# TODO: would be good to be confident at removing this
if check_playlist_itegrity:
_check_playlist_integrity(
session=session, playlist_id=playlist_id, fix=False
)
dto_list = []
for row in results:
# Handle cases where track_id is None (no track associated)
if row.track_id is None:
dto = PlaylistRowDTO(
artist="",
bitrate=0,
duration=0,
fade_at=0,
intro=None,
lastplayed=None,
note=row.note,
path="",
played=row.played,
playlist_id=row.playlist_id,
playlistrow_id=row.playlistrow_id,
row_number=row.row_number,
silence_at=0,
start_gap=0,
title="",
track_id=-1,
# Additional fields like row_fg, row_bg, etc., use default None values
)
else:
dto = PlaylistRowDTO(
artist=row.artist,
bitrate=row.bitrate,
duration=row.duration,
fade_at=row.fade_at,
intro=row.intro,
lastplayed=row.lastplayed,
note=row.note,
path=row.path,
played=row.played,
playlist_id=row.playlist_id,
playlistrow_id=row.playlistrow_id,
row_number=row.row_number,
silence_at=row.silence_at,
start_gap=row.start_gap,
title=row.title,
track_id=row.track_id,
# Additional fields like row_fg, row_bg, etc., use default None values
)
dto_list.append(dto)
return dto_list
def copy_playlist(src_id: int, dst_id: int) -> None:
"""Copy playlist entries"""
@ -1068,6 +878,78 @@ def playlist_save_tabs(playlist_id_to_tab: dict[int, int]) -> None:
session.commit()
# Playlist Rows
@log_call
def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None:
"""
Return specific row DTO
"""
with db.Session() as session:
record = (
session.execute(select(PlaylistRows).where(PlaylistRows.id == playlistrow_id))
.scalars()
.one_or_none()
)
if not record:
return None
track = None
if record.track_id:
track = track_by_id(record.track_id)
dto = PlaylistRowDTO(
note=record.note,
played=record.played,
playlist_id=record.playlist_id,
playlistrow_id=record.playlistrow_id,
row_number=record.row_number,
track=track,
)
return dto
def get_playlist_rows(
playlist_id: int, check_playlist_itegrity: bool = True
) -> list[PlaylistRowDTO]:
with db.Session() as session:
# TODO: would be good to be confident at removing this
if check_playlist_itegrity:
_check_playlist_integrity(
session=session, playlist_id=playlist_id, fix=False
)
records = session.scalars(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
).all()
dto_list = []
for record in records:
track = None
if record.track_id:
track = track_by_id(record.track_id)
dto = PlaylistRowDTO(
note=record.note,
played=record.played,
playlist_id=record.playlist_id,
playlistrow_id=record.id,
row_number=record.row_number,
track=track,
)
dto_list.append(dto)
return dto_list
# Playdates
@log_call
def get_last_played_dates(track_id: int, limit: int = 5) -> str: