WIP remove sessions, use reporistory

This commit is contained in:
Keith Edmunds 2025-03-17 18:43:46 +00:00
parent e733e7025d
commit e40a4ab57a
10 changed files with 753 additions and 240 deletions

View File

@ -5,7 +5,7 @@ import datetime as dt
from enum import auto, Enum
import functools
import threading
from typing import Any, NamedTuple
from typing import NamedTuple
# Third party imports
@ -125,6 +125,15 @@ class Tags(NamedTuple):
duration: int = 0
@dataclass
class PlaylistDTO:
name: str
playlist_id: int
favourite: bool = False
is_template: bool = False
open: bool = False
@dataclass
class TrackDTO:
track_id: int
@ -141,34 +150,12 @@ class TrackDTO:
@dataclass
class PlaylistRowObj(TrackDTO):
class PlaylistRowDTO(TrackDTO):
note: str
played: bool
playlist_id: int
playlistrow_id: int
row_number: int
row_fg: str | None = None
row_bg: str | None = None
note_fg: str | None = None
note_bg: str | None = None
end_of_track_signalled: bool = False
end_time: dt.datetime | None = None
fade_graph: Any | None = None
fade_graph_start_updates: dt.datetime | None = None
resume_marker: float = 0.0
forecast_end_time: dt.datetime | None = None
forecast_start_time: dt.datetime | None = None
start_time: dt.datetime | None = None
def is_playing(self) -> bool:
"""
Return True if we're currently playing else False
"""
if self.start_time is None:
return False
return True
class TrackInfo(NamedTuple):
@ -193,8 +180,10 @@ class MusicMusterSignals(QObject):
search_songfacts_signal = pyqtSignal(str)
search_wikipedia_signal = pyqtSignal(str)
show_warning_signal = pyqtSignal(str, str)
signal_add_track_to_header = pyqtSignal(int, int)
signal_set_next_row = pyqtSignal(int)
signal_set_next_track = pyqtSignal(PlaylistRowObj)
# TODO: undestirable (and unresolvable) reference
# signal_set_next_track = pyqtSignal(PlaylistRow)
span_cells_signal = pyqtSignal(int, int, int, int, int)
status_message_signal = pyqtSignal(str, int)
track_ended_signal = pyqtSignal()

View File

@ -20,7 +20,7 @@ from pydub.utils import mediainfo
from tinytag import TinyTag, TinyTagException # type: ignore
# App imports
from classes import AudioMetadata, ApplicationError, Tags
from classes import AudioMetadata, ApplicationError, Tags, TrackDTO
from config import Config
from log import log
from models import Tracks

View File

@ -241,7 +241,9 @@ class Playlists(dbtables.PlaylistsTable):
"""
session.execute(
update(Playlists).where((Playlists.id.in_(playlist_ids))).values(tab=None)
update(Playlists)
.where(Playlists.id.in_(playlist_ids))
.values(tab=None)
)
def close(self, session: Session) -> None:

View File

@ -3,7 +3,7 @@ from __future__ import annotations
import datetime as dt
from time import sleep
from typing import Optional
from typing import Any, Optional
# Third party imports
# import line_profiler
@ -27,7 +27,7 @@ from classes import ApplicationError, MusicMusterSignals
from config import Config
import helpers
from log import log
from models import PlaylistRows
from repository import PlaylistRowDTO
from vlcmanager import VLCManager
# Define the VLC callback function type
@ -73,13 +73,13 @@ class _AddFadeCurve(QObject):
def __init__(
self,
rat: RowAndTrack,
plr: PlaylistRow,
track_path: str,
track_fade_at: int,
track_silence_at: int,
) -> None:
super().__init__()
self.rat = rat
self.plr = plr
self.track_path = track_path
self.track_fade_at = track_fade_at
self.track_silence_at = track_silence_at
@ -93,7 +93,7 @@ class _AddFadeCurve(QObject):
if not fc:
log.error(f"Failed to create FadeCurve for {self.track_path=}")
else:
self.rat.fade_graph = fc
self.plr.fade_graph = fc
self.finished.emit()
@ -271,11 +271,11 @@ class _Music:
elapsed_seconds = (now - self.start_dt).total_seconds()
return int(elapsed_seconds * 1000)
def get_position(self) -> Optional[float]:
def get_position(self) -> float:
"""Return current position"""
if not self.player:
return None
return 0.0
return self.player.get_position()
def is_playing(self) -> bool:
@ -383,84 +383,138 @@ class _Music:
self.player = None
class RowAndTrack:
class PlaylistRow:
"""
Object to manage playlist rows and tracks.
Object to manage playlist row and track.
"""
def __init__(self, playlist_row: PlaylistRows) -> None:
def __init__(self, dto: PlaylistRowDTO) -> None:
"""
Initialises data structure.
The passed PlaylistRows object will include a Tracks object if this
row has a track.
The dto object will include a Tracks object if this row has a track.
"""
# Collect playlistrow data
self.note = playlist_row.note
self.played = playlist_row.played
self.playlist_id = playlist_row.playlist_id
self.playlistrow_id = playlist_row.id
self.row_number = playlist_row.row_number
self.track_id = playlist_row.track_id
# Playlist display data
self.row_fg: Optional[str] = None
self.row_bg: Optional[str] = None
self.note_fg: Optional[str] = None
self.note_bg: Optional[str] = None
# Collect track data if there's a track
if playlist_row.track_id:
self.artist = playlist_row.track.artist
self.bitrate = playlist_row.track.bitrate
self.duration = playlist_row.track.duration
self.fade_at = playlist_row.track.fade_at
self.intro = playlist_row.track.intro
if playlist_row.track.playdates:
self.lastplayed = max(
[a.lastplayed for a in playlist_row.track.playdates]
)
else:
self.lastplayed = Config.EPOCH
self.path = playlist_row.track.path
self.silence_at = playlist_row.track.silence_at
self.start_gap = playlist_row.track.start_gap
self.title = playlist_row.track.title
else:
self.artist = ""
self.bitrate = 0
self.duration = 0
self.fade_at = 0
self.intro = None
self.lastplayed = Config.EPOCH
self.path = ""
self.silence_at = 0
self.start_gap = 0
self.title = ""
# Track playing data
self.end_of_track_signalled: bool = False
self.end_time: Optional[dt.datetime] = None
self.fade_graph: Optional[FadeCurve] = None
self.fade_graph_start_updates: Optional[dt.datetime] = None
self.resume_marker: Optional[float] = 0.0
self.forecast_end_time: Optional[dt.datetime] = None
self.forecast_start_time: Optional[dt.datetime] = None
self.start_time: Optional[dt.datetime] = None
# Other object initialisation
self.dto = dto
self.music = _Music(name=Config.VLC_MAIN_PLAYER_NAME)
self.signals = MusicMusterSignals()
self.end_of_track_signalled: bool = False
self.end_time: dt.datetime | None = None
self.fade_graph: Any | None = None
self.fade_graph_start_updates: dt.datetime | None = None
self.forecast_end_time: dt.datetime | None = None
self.forecast_start_time: dt.datetime | None = None
self.note_bg: str | None = None
self.note_fg: str | None = None
self.resume_marker: float = 0.0
self.row_bg: str | None = None
self.row_fg: str | None = None
self.start_time: dt.datetime | None = None
def __repr__(self) -> str:
return (
f"<RowAndTrack(playlist_id={self.playlist_id}, "
f"row_number={self.row_number}, "
f"playlistrow_id={self.playlistrow_id}, "
f"note={self.note}, track_id={self.track_id}>"
f"<PlaylistRow(playlist_id={self.dto.playlist_id}, "
f"row_number={self.dto.row_number}, "
f"playlistrow_id={self.dto.playlistrow_id}, "
f"note={self.dto.note}, track_id={self.dto.track_id}>"
)
# Expose TrackDTO fields as properties
@property
def artist(self):
return self.dto.artist
@property
def bitrate(self):
return self.dto.bitrate
@property
def duration(self):
return self.dto.duration
@property
def fade_at(self):
return self.dto.fade_at
@property
def intro(self):
return self.dto.intro
@property
def lastplayed(self):
return self.dto.lastplayed
@property
def path(self):
return self.dto.path
@property
def silence_at(self):
return self.dto.silence_at
@property
def start_gap(self):
return self.dto.start_gap
@property
def title(self):
return self.dto.title
@property
def track_id(self):
return self.dto.track_id
@track_id.setter
def track_id(self, value: int) -> None:
"""
Adding a track_id should only happen to a header row.
"""
if self.track_id:
raise ApplicationError("Attempting to add track to row with existing track ({self=}")
# TODO: set up write access to track_id. Should only update if
# track_id == 0. Need to update all other track fields at the
# same time.
print("set track_id attribute for {self=}, {value=}")
pass
# Expose PlaylistRowDTO fields as properties
@property
def note(self):
return self.dto.note
@note.setter
def note(self, value: str) -> None:
# TODO set up write access to db
print("set note attribute for {self=}, {value=}")
# self.dto.note = value
@property
def played(self):
return self.dto.played
@played.setter
def played(self, value: bool = True) -> None:
# TODO set up write access to db
print("set played attribute for {self=}")
# self.dto.played = value
@property
def playlist_id(self):
return self.dto.playlist_id
@property
def playlistrow_id(self):
return self.dto.playlistrow_id
@property
def row_number(self):
return self.dto.row_number
@row_number.setter
def row_number(self, value: int) -> None:
# TODO do we need to set up write access to db?
self.dto.row_number = value
def check_for_end_of_track(self) -> None:
"""
Check whether track has ended. If so, emit track_ended_signal
@ -552,7 +606,7 @@ class RowAndTrack:
changed = True
new_start_time = None
else:
end_time = start + dt.timedelta(milliseconds=self.duration)
end_time = start + dt.timedelta(milliseconds=self.duration())
new_start_time = end_time
if self.forecast_end_time != end_time:
self.forecast_end_time = end_time
@ -640,19 +694,21 @@ class RowAndTrack:
Update local playlist_id and row_number from playlistrow_id
"""
plr = session.get(PlaylistRows, self.playlistrow_id)
if not plr:
raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}")
self.playlist_id = plr.playlist_id
self.row_number = plr.row_number
# TODO: only seems to be used by track_sequence
return
# plr = session.get(PlaylistRows, self.playlistrow_id)
# if not plr:
# raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}")
# self.playlist_id = plr.playlist_id
# self.row_number = plr.row_number
class TrackSequence:
next: Optional[RowAndTrack] = None
current: Optional[RowAndTrack] = None
previous: Optional[RowAndTrack] = None
next: Optional[PlaylistRow] = None
current: Optional[PlaylistRow] = None
previous: Optional[PlaylistRow] = None
def set_next(self, rat: Optional[RowAndTrack]) -> None:
def set_next(self, rat: Optional[PlaylistRow]) -> None:
"""
Set the 'next' track to be passed rat. Clear
any previous next track. If passed rat is None

View File

@ -75,7 +75,7 @@ from file_importer import FileImporter
from helpers import ask_yes_no, file_is_unreadable, get_name
from log import log
from models import db, Playdates, PlaylistRows, Playlists, Queries, Settings, Tracks
from music_manager import RowAndTrack, track_sequence
from music_manager import PlaylistRow, track_sequence
from playlistmodel import PlaylistModel, PlaylistProxyModel
from playlists import PlaylistTab
from querylistmodel import QuerylistModel
@ -2483,7 +2483,7 @@ class Window(QMainWindow):
self.current.proxy_model.set_incremental_search(self.txtSearch.text())
def selected_or_next_track_info(self) -> Optional[RowAndTrack]:
def selected_or_next_track_info(self) -> Optional[PlaylistRow]:
"""
Return RowAndTrack info for selected track. If no selected track, return for
next track. If no next track, return None.
@ -2569,7 +2569,7 @@ class Window(QMainWindow):
else:
self.statusbar.clearMessage()
def show_track(self, playlist_track: RowAndTrack) -> None:
def show_track(self, playlist_track: PlaylistRow) -> None:
"""Scroll to show track in plt"""
# Switch to the correct tab

View File

@ -35,7 +35,6 @@ from classes import (
ApplicationError,
Col,
MusicMusterSignals,
PlaylistRowObj,
)
from config import Config
from helpers import (
@ -49,8 +48,8 @@ from helpers import (
)
from log import log
from models import db, NoteColours, Playdates, PlaylistRows, Tracks
from music_manager import RowAndTrack, track_sequence
from repository import get_playlist_rows
from music_manager import PlaylistRow, track_sequence
import repository
HEADER_NOTES_COLUMN = 1
@ -85,17 +84,19 @@ class PlaylistModel(QAbstractTableModel):
self.playlist_id = playlist_id
self.is_template = is_template
self.playlist_rows: dict[int, PlaylistRowObj] = {}
self.selected_rows: list[PlaylistRowObj] = []
self.playlist_rows: dict[int, PlaylistRow] = {}
self.selected_rows: list[PlaylistRow] = []
self.signals = MusicMusterSignals()
self.signals.signal_set_next_row.connect(self.set_next_row)
self.played_tracks_hidden = False
# Connect signals
self.signals.begin_reset_model_signal.connect(self.begin_reset_model)
self.signals.end_reset_model_signal.connect(self.end_reset_model)
self.signals.signal_add_track_to_header.connect(self.add_track_to_header)
with db.Session() as session:
# Ensure row numbers in playlist are contiguous
# TODO: remove this
PlaylistRows.fixup_rownumbers(session, playlist_id)
# Populate self.playlist_rows
self.load_data(session)
@ -143,68 +144,66 @@ class PlaylistModel(QAbstractTableModel):
return header_row
def add_track_to_header(
self, row_number: int, track_id: int, note: Optional[str] = None
self, playlist_id: int, track_id: int, note: str = ""
) -> None:
"""
Add track to existing header row
Handle signal_add_track_to_header
"""
log.debug(f"{self}: add_track_to_header({row_number=}, {track_id=}, {note=}")
log.debug(f"{self}: add_track_to_header({playlist_id=}, {track_id=}, {note=}")
# Get existing row
try:
rat = self.playlist_rows[row_number]
except KeyError:
raise ApplicationError(
f"{self}: KeyError in add_track_to_header ({row_number=}, {track_id=})"
)
if rat.path:
raise ApplicationError(
f"{self}: Header row already has track associated ({rat=}, {track_id=})"
)
with db.Session() as session:
playlistrow = session.get(PlaylistRows, rat.playlistrow_id)
if not playlistrow:
raise ApplicationError(
f"{self}: Failed to retrieve playlist row ({rat.playlistrow_id=}"
)
# Add track to PlaylistRows
playlistrow.track_id = track_id
# Add any further note (header will already have a note)
if note:
playlistrow.note += " " + note
session.commit()
if playlist_id != self.playlist_id:
return
# Update local copy
self.refresh_row(session, row_number)
# Repaint row
roles = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.FontRole,
Qt.ItemDataRole.ForegroundRole,
]
# only invalidate required roles
self.invalidate_row(row_number, roles)
if not self.selected_rows:
return
if len(self.selected_rows) > 1:
self.signals.show_warning_signal.emit(
"Add track to header", "Select one header to add track to"
)
return
selected_row = self.selected_rows[0]
if selected_row.path:
self.signals.show_warning_signal.emit(
"Add track to header", "Select header to add track to"
)
return
if selected_row.note:
selected_row.note += " " + note
selected_row.track_id = track_id
# Update local copy
self.refresh_row(selected_row.row_number)
# Repaint row
roles = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.FontRole,
Qt.ItemDataRole.ForegroundRole,
]
# only invalidate required roles
self.invalidate_row(row_number, roles)
self.signals.resize_rows_signal.emit(self.playlist_id)
def _background_role(self, row: int, column: int, rat: RowAndTrack) -> QBrush:
def _background_role(self, row: int, column: int, plr: PlaylistRow) -> QBrush:
"""Return background setting"""
# Handle entire row colouring
# Header row
if self.is_header_row(row):
# Check for specific header colouring
if rat.row_bg is None:
with db.Session() as session:
rat.row_bg = NoteColours.get_colour(session, rat.note)
if rat.row_bg:
return QBrush(QColor(rat.row_bg))
if plr.row_bg is None:
plr.row_bg = repository.get_colour(plr.note)
if plr.row_bg:
return QBrush(QColor(plr.row_bg))
else:
return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST))
# Unreadable track file
if file_is_unreadable(rat.path):
if file_is_unreadable(plr.path):
return QBrush(QColor(Config.COLOUR_UNREADABLE))
# Current track
if (
@ -223,22 +222,21 @@ class PlaylistModel(QAbstractTableModel):
# Individual cell colouring
if column == Col.START_GAP.value:
if rat.start_gap and rat.start_gap >= Config.START_GAP_WARNING_THRESHOLD:
if plr.start_gap and plr.start_gap >= Config.START_GAP_WARNING_THRESHOLD:
return QBrush(QColor(Config.COLOUR_LONG_START))
if column == Col.BITRATE.value:
if not rat.bitrate or rat.bitrate < Config.BITRATE_LOW_THRESHOLD:
if not plr.bitrate or plr.bitrate < Config.BITRATE_LOW_THRESHOLD:
return QBrush(QColor(Config.COLOUR_BITRATE_LOW))
elif rat.bitrate < Config.BITRATE_OK_THRESHOLD:
elif plr.bitrate < Config.BITRATE_OK_THRESHOLD:
return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM))
else:
return QBrush(QColor(Config.COLOUR_BITRATE_OK))
if column == Col.NOTE.value:
if rat.note:
if rat.note_bg is None:
with db.Session() as session:
rat.note_bg = NoteColours.get_colour(session, rat.note)
if rat.note_bg:
return QBrush(QColor(rat.note_bg))
if plr.note:
if plr.note_bg is None:
plr.row_bg = repository.get_colour(plr.note)
if plr.note_bg:
return QBrush(QColor(plr.note_bg))
return QBrush()
@ -406,7 +404,7 @@ class PlaylistModel(QAbstractTableModel):
self.reset_track_sequence_row_numbers()
self.update_track_times()
def _display_role(self, row: int, column: int, rat: RowAndTrack) -> str:
def _display_role(self, row: int, column: int, rat: PlaylistRow) -> str:
"""
Return text for display
"""
@ -481,7 +479,7 @@ class PlaylistModel(QAbstractTableModel):
super().endResetModel()
self.reset_track_sequence_row_numbers()
def _edit_role(self, row: int, column: int, rat: RowAndTrack) -> str | int:
def _edit_role(self, row: int, column: int, rat: PlaylistRow) -> str | int:
"""
Return value for editing
"""
@ -502,7 +500,7 @@ class PlaylistModel(QAbstractTableModel):
return ""
def _foreground_role(self, row: int, column: int, rat: RowAndTrack) -> QBrush:
def _foreground_role(self, row: int, column: int, rat: PlaylistRow) -> QBrush:
"""Return header foreground colour or QBrush() if none"""
if self.is_header_row(row):
@ -539,7 +537,7 @@ class PlaylistModel(QAbstractTableModel):
return default
def _font_role(self, row: int, column: int, rat: RowAndTrack) -> QFont:
def _font_role(self, row: int, column: int, rat: PlaylistRow) -> QFont:
"""
Return font
"""
@ -598,7 +596,7 @@ class PlaylistModel(QAbstractTableModel):
log.debug(f"{self}: get_new_row_number() return: {new_row_number=}")
return new_row_number
def get_row_info(self, row_number: int) -> RowAndTrack:
def get_row_info(self, row_number: int) -> PlaylistRow:
"""
Return info about passed row
"""
@ -681,7 +679,7 @@ class PlaylistModel(QAbstractTableModel):
return QVariant()
def header_text(self, rat: RowAndTrack) -> str:
def header_text(self, rat: PlaylistRow) -> str:
"""
Process possible section timing directives embeded in header
"""
@ -735,31 +733,33 @@ class PlaylistModel(QAbstractTableModel):
new_row_number = self._get_new_row_number(proposed_row_number)
with db.Session() as session:
super().beginInsertRows(QModelIndex(), new_row_number, new_row_number)
_ = PlaylistRows.insert_row(
session=session,
playlist_id=self.playlist_id,
new_row_number=new_row_number,
note=note,
track_id=track_id,
)
session.commit()
super().beginInsertRows(QModelIndex(), new_row_number, new_row_number)
self.refresh_data(session)
super().endInsertRows()
new_row = repository.insert_row(
playlist_id=self.playlist_id,
row_number=new_row_number,
track_id=track_id,
note=note,
)
# Insert into self.playlist_rows
for destination_row in range(len(self.playlist_rows), new_row_number, -1):
self.playlist_rows[destination_row] = self.playlist_rows[destination_row - 1]
self.playlist_rows[new_row_number] = new_row
super().endInsertRows()
self.signals.resize_rows_signal.emit(self.playlist_id)
# TODO check this what we want to do and how we want to do it
self.reset_track_sequence_row_numbers()
# only invalidate required roles
roles = [
roles_to_invalidate = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.FontRole,
Qt.ItemDataRole.ForegroundRole,
]
self.invalidate_rows(
list(range(new_row_number, len(self.playlist_rows))), roles
list(range(new_row_number, len(self.playlist_rows))), roles_to_invalidate
)
def invalidate_row(self, modified_row: int, roles: list[Qt.ItemDataRole]) -> None:
@ -804,7 +804,7 @@ class PlaylistModel(QAbstractTableModel):
return self.playlist_rows[row_number].played
def is_track_in_playlist(self, track_id: int) -> Optional[RowAndTrack]:
def is_track_in_playlist(self, track_id: int) -> Optional[PlaylistRow]:
"""
If this track_id is in the playlist, return the RowAndTrack object
else return None
@ -841,9 +841,10 @@ class PlaylistModel(QAbstractTableModel):
# new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
# new_playlist_rows[p.row_number].row_number = p.row_number
# build a new playlist_rows
new_playlist_rows: dict[int, PlaylistRowObj] = {}
for p in get_playlist_rows(self.playlist_id):
new_playlist_rows[p.row_number] = p
# shouldn't be PlaylistRow
new_playlist_rows: dict[int, PlaylistRow] = {}
for p in repository.get_playlist_rows(self.playlist_id):
new_playlist_rows[p.row_number] = PlaylistRow(p)
# Copy to self.playlist_rows
self.playlist_rows = new_playlist_rows
@ -1008,7 +1009,7 @@ class PlaylistModel(QAbstractTableModel):
self.update_track_times()
def move_track_add_note(
self, new_row_number: int, existing_rat: RowAndTrack, note: str
self, new_row_number: int, existing_rat: PlaylistRow, note: str
) -> None:
"""
Move existing_rat track to new_row_number and append note to any existing note
@ -1034,26 +1035,6 @@ class PlaylistModel(QAbstractTableModel):
self.move_rows([existing_rat.row_number], new_row_number)
self.signals.resize_rows_signal.emit(self.playlist_id)
def move_track_to_header(
self,
header_row_number: int,
existing_rat: RowAndTrack,
note: Optional[str],
) -> None:
"""
Add the existing_rat track details to the existing header at header_row_number
"""
log.debug(
f"{self}: move_track_to_header({header_row_number=}, {existing_rat=}, {note=}"
)
if existing_rat.track_id:
if note and existing_rat.note:
note += "\n" + existing_rat.note
self.add_track_to_header(header_row_number, existing_rat.track_id, note)
self.delete_rows([existing_rat.row_number])
def obs_scene_change(self, row_number: int) -> None:
"""
Check this row and any preceding headers for OBS scene change command
@ -1133,20 +1114,21 @@ class PlaylistModel(QAbstractTableModel):
refresh_row().
"""
# Note where each playlist_id is
plid_to_row: dict[int, int] = {}
# Note where each playlist_id is by mapping each playlistrow_id
# to its current row_number
plrid_to_row: dict[int, int] = {}
for oldrow in self.playlist_rows:
plrdata = self.playlist_rows[oldrow]
plid_to_row[plrdata.playlistrow_id] = plrdata.row_number
plrid_to_row[plrdata.playlistrow_id] = plrdata.row_number
# build a new playlist_rows
new_playlist_rows: dict[int, RowAndTrack] = {}
for p in PlaylistRows.get_playlist_rows(session, self.playlist_id):
if p.id not in plid_to_row:
new_playlist_rows[p.row_number] = RowAndTrack(p)
new_playlist_rows: dict[int, PlaylistRow] = {}
for p in repository.get_playlist_rows(self.playlist_id):
if p.playlistrow_id not in plrid_to_row:
new_playlist_rows[p.row_number] = PlaylistRow(p)
else:
new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
new_playlist_rows[p.row_number].row_number = p.row_number
new_playlist_row = self.playlist_rows[plrid_to_row[p.playlistrow_id]]
new_playlist_row.row_number = p.row_number
# Copy to self.playlist_rows
self.playlist_rows = new_playlist_rows
@ -1155,7 +1137,7 @@ class PlaylistModel(QAbstractTableModel):
"""Populate dict for one row from database"""
p = PlaylistRows.deep_row(session, self.playlist_id, row_number)
self.playlist_rows[row_number] = RowAndTrack(p)
self.playlist_rows[row_number] = PlaylistRow(p)
def remove_track(self, row_number: int) -> None:
"""
@ -1344,7 +1326,7 @@ class PlaylistModel(QAbstractTableModel):
return len(self.playlist_rows)
def section_subtotal_header(self, rat: RowAndTrack) -> str:
def section_subtotal_header(self, rat: PlaylistRow) -> str:
"""
Process this row as subtotal within a timed section and
return display text for this row
@ -1589,7 +1571,7 @@ class PlaylistModel(QAbstractTableModel):
self.sort_by_attribute(row_numbers, "title")
def start_of_timed_section_header(self, rat: RowAndTrack) -> str:
def start_of_timed_section_header(self, rat: PlaylistRow) -> str:
"""
Process this row as the start of a timed section and
return display text for this row
@ -1625,7 +1607,7 @@ class PlaylistModel(QAbstractTableModel):
def supportedDropActions(self) -> Qt.DropAction:
return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction
def _tooltip_role(self, row: int, column: int, rat: RowAndTrack) -> str:
def _tooltip_role(self, row: int, column: int, rat: PlaylistRow) -> str:
"""
Return tooltip. Currently only used for last_played column.
"""

View File

@ -40,7 +40,7 @@ from helpers import (
)
from log import log
from models import db, Playdates, Tracks
from music_manager import RowAndTrack
from music_manager import PlaylistRow
@dataclass
@ -268,7 +268,7 @@ class QuerylistModel(QAbstractTableModel):
bottom_right = self.index(row, self.columnCount() - 1)
self.dataChanged.emit(top_left, bottom_right, [Qt.ItemDataRole.BackgroundRole])
def _tooltip_role(self, row: int, column: int, rat: RowAndTrack) -> str | QVariant:
def _tooltip_role(self, row: int, column: int, rat: PlaylistRow) -> str | QVariant:
"""
Return tooltip. Currently only used for last_played column.
"""

View File

@ -1,15 +1,157 @@
# Standard library imports
import re
# PyQt imports
# Third party imports
from sqlalchemy import select, func
from sqlalchemy import (
func,
select,
update,
)
from sqlalchemy.orm import aliased
from classes import PlaylistRowObj
from sqlalchemy.orm.session import Session
from classes import ApplicationError, PlaylistRowDTO
# App imports
from classes import TrackDTO
from models import db, Tracks, PlaylistRows, Playdates
from classes import PlaylistDTO, TrackDTO
from app import helpers
from log import log
from models import (
db,
NoteColours,
Playdates,
PlaylistRows,
Playlists,
Tracks,
)
# Notecolour functions
def get_colour(text: str, foreground: bool = False) -> str:
"""
Parse text and return background (foreground if foreground==True)
colour string if matched, else None
"""
if not text:
return ""
match = False
with db.Session() as session:
for rec in NoteColours.get_all(session):
if rec.is_regex:
flags = re.UNICODE
if not rec.is_casesensitive:
flags |= re.IGNORECASE
p = re.compile(rec.substring, flags)
if p.match(text):
match = True
else:
if rec.is_casesensitive:
if rec.substring in text:
match = True
else:
if rec.substring.lower() in text.lower():
match = True
if match:
if foreground:
return rec.foreground or ""
else:
return rec.colour
return ""
# Track functions
def add_track_to_header(self, playlistrow_id: int, track_id: int) -> None:
"""
Add a track to this (header) row
"""
with db.Session() as session:
session.execute(
update(PlaylistRows)
.where(PlaylistRows.id == playlistrow_id)
.values(track_id=track_id)
)
session.commit()
def create_track(path: str) -> TrackDTO:
"""
Create a track db entry from a track path and return the DTO
"""
metadata = helpers.get_all_track_metadata(path)
with db.Session() as session:
try:
track = Tracks(session=session, **metadata)
track_id = track.id
session.commit()
except Exception:
raise ApplicationError("Can't create Track")
return track_by_id(track_id)
def track_by_id(track_id: int) -> TrackDTO | None:
"""
Return track with specified id
"""
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(Tracks.id == track_id)
)
with db.Session() as session:
record = session.execute(stmt).one_or_none()
if not record:
return None
dto = TrackDTO(
artist=record.artist,
bitrate=record.bitrate,
duration=record.duration,
fade_at=record.fade_at,
intro=record.intro,
lastplayed=record.lastplayed,
path=record.path,
silence_at=record.silence_at,
start_gap=record.start_gap,
title=record.title,
track_id=record.track_id,
)
return dto
def tracks_like_title(filter_str: str) -> list[TrackDTO]:
@ -29,7 +171,87 @@ def tracks_like_title(filter_str: str) -> list[TrackDTO]:
]
def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]:
# Playlist functions
def _check_row_number_sequence(
session: Session, playlist_id: int, fix: bool = False
) -> None:
"""
The row numbers for any playlist should run from 0 to (length - 1).
This function checks that that is the case.
If there are errors, 'fix' determines what action is taken.
If fix == True:
Fix the row numbers and save to database. Log at info level.
If fix == False:
Log at error level and raise ApplicationError
"""
errors = False
playlist_rows = session.scalars(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
).all()
for idx, playlist_row in enumerate(playlist_rows):
if playlist_row.row_number != idx:
errors = True
msg = f"_check_row_number_sequence({playlist_id=}, {fix=}, {playlist_row=}, {idx=}"
if fix:
log.info(msg)
playlist_row.row_number = idx
else:
log.error(msg)
raise ApplicationError(msg)
if errors:
session.commit()
def _move_rows_down(
session: Session, playlist_id: int, starting_row: int, move_by: int
) -> None:
"""
Create space to insert move_by additional rows by incremented row
number from starting_row to end of playlist
"""
log.debug(f"(_move_rows_down({playlist_id=}, {starting_row=}, {move_by=}")
session.execute(
update(PlaylistRows)
.where(
(PlaylistRows.playlist_id == playlist_id),
(PlaylistRows.row_number >= starting_row),
)
.values(row_number=PlaylistRows.row_number + move_by)
)
session.commit()
def create_playlist(name: str, template_id: int) -> PlaylistDTO:
"""
Create playlist and return DTO.
"""
with db.Session() as session:
try:
playlist = Playlists(session, name, template_id)
playlist_id = playlist.id
session.commit()
except Exception:
raise ApplicationError("Can't create Playlist")
return playlist_by_id(playlist_id)
def get_playlist_row(playlist_row_id: int) -> PlaylistRowDTO | None:
"""
Return specific row DTO
"""
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
@ -37,7 +259,94 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]:
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed")
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
PlaylistRows.id.label("playlistrow_id"),
PlaylistRows.row_number,
PlaylistRows.note,
PlaylistRows.played,
PlaylistRows.playlist_id,
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(PlaylistRows.id == playlist_row_id)
.order_by(PlaylistRows.row_number)
)
with db.Session() as session:
record = session.execute(stmt).one_or_none()
if not record:
return None
# Handle cases where track_id is None (no track associated)
if record.track_id is None:
dto = PlaylistRowDTO(
artist="",
bitrate=0,
duration=0,
fade_at=0,
intro=None,
lastplayed=None,
note=record.note,
path="",
played=record.played,
playlist_id=record.playlist_id,
playlistrow_id=record.playlistrow_id,
row_number=record.row_number,
silence_at=0,
start_gap=0,
title="",
track_id=-1,
)
else:
dto = PlaylistRowDTO(
artist=record.artist,
bitrate=record.bitrate,
duration=record.duration,
fade_at=record.fade_at,
intro=record.intro,
lastplayed=record.lastplayed,
note=record.note,
path=record.path,
played=record.played,
playlist_id=record.playlist_id,
playlistrow_id=record.playlistrow_id,
row_number=record.row_number,
silence_at=record.silence_at,
start_gap=record.start_gap,
title=record.title,
track_id=record.track_id,
)
return dto
def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]:
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
@ -75,7 +384,7 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]:
for row in results:
# Handle cases where track_id is None (no track associated)
if row.track_id is None:
dto = PlaylistRowObj(
dto = PlaylistRowDTO(
artist="",
bitrate=0,
duration=0,
@ -95,7 +404,7 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]:
# Additional fields like row_fg, row_bg, etc., use default None values
)
else:
dto = PlaylistRowObj(
dto = PlaylistRowDTO(
artist=row.artist,
bitrate=row.bitrate,
duration=row.duration,
@ -117,3 +426,62 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowObj]:
dto_list.append(dto)
return dto_list
def insert_row(playlist_id: int, row_number: int, track_id: int, note: str) -> PlaylistRowDTO:
"""
Insert a new row into playlist and return new row DTO
"""
with db.Session() as session:
# Sanity check
_check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False)
# Make space for new row
_move_rows_down(
session=session, playlist_id=playlist_id, starting_row=row_number, move_by=1
)
playlist_row = PlaylistRows.insert_row(
session=session,
playlist_id=playlist_id,
new_row_number=row_number,
note=note,
track_id=track_id,
)
session.commit()
playlist_row_id = playlist_row.id
# Sanity check
_check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False)
return get_playlist_row(playlist_row_id=playlist_row_id)
def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
"""
Return playlist with specified id
"""
stmt = select(
Playlists.id.label("playlist_id"),
Playlists.name,
Playlists.favourite,
Playlists.is_template,
Playlists.open,
).where(Playlists.id == playlist_id)
with db.Session() as session:
record = session.execute(stmt).one_or_none()
if not record:
return None
dto = PlaylistDTO(
name=record.name,
playlist_id=record.playlist_id,
favourite=record.favourite,
is_template=record.is_template,
open=record.open,
)
return dto

View File

@ -0,0 +1,40 @@
# Standard library imports
import unittest
# PyQt imports
# Third party imports
# App imports
from app.models import (
db,
)
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
pass
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
pass
def setUp(self):
"""Runs before each test"""
db.create_all()
def tearDown(self):
"""Runs after each test"""
db.drop_all()
def test_xxx(self):
"""Comment"""
pass

76
tests/test_db_updates.py Normal file
View File

@ -0,0 +1,76 @@
# Standard library imports
import unittest
# PyQt imports
# Third party imports
# App imports
from app import playlistmodel
from app import repository
from app.models import db
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
pass
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
pass
def setUp(self):
"""Runs before each test"""
db.create_all()
# Create a playlist and model
playlist_name = "my playlist"
self.playlist = repository.create_playlist(name=playlist_name, template_id=0)
assert self.playlist
self.model = playlistmodel.PlaylistModel(
self.playlist.playlist_id, is_template=False
)
assert self.model
# Create tracks
track1_path = "testdata/isa.mp3"
self.track1 = repository.create_track(track1_path)
track2_path = "testdata/mom.mp3"
self.track2 = repository.create_track(track2_path)
# Add tracks and header to playlist
repository.insert_row(
self.playlist.playlist_id,
row_number=0,
track_id=self.track1.track_id,
note="track 1",
)
repository.insert_row(
self.playlist.playlist_id,
row_number=1,
track_id=0,
note="Header row",
)
repository.insert_row(
self.playlist.playlist_id,
row_number=2,
track_id=self.track2.track_id,
note="track 2",
)
def tearDown(self):
"""Runs after each test"""
db.drop_all()
def test_xxx(self):
"""Comment"""
pass