musicmuster/app/playlistmodel.py
Keith Edmunds 698fa4625a WIP V3: track start/stop times basics working
Only updates from header rows or current track. Changing
current track doesn't update correctly.
2023-11-07 23:14:26 +00:00

750 lines
25 KiB
Python

from datetime import datetime, timedelta
from enum import auto, Enum
from typing import List, Optional
from PyQt6.QtCore import (
QAbstractTableModel,
QModelIndex,
Qt,
QVariant,
)
from PyQt6.QtGui import (
QBrush,
QColor,
QFont,
)
from classes import track_sequence, MusicMusterSignals, PlaylistTrack
from config import Config
from dbconfig import scoped_session, Session
from helpers import file_is_unreadable, get_embedded_time
from log import log
from models import Playdates, PlaylistRows, Tracks
# Column in which a header row (a row with no track) shows its note text.
# display_role() asks the view to span the row's cells starting at this
# column, and add_track_to_header() resets that span to a single cell.
HEADER_NOTES_COLUMN = 1
class Col(Enum):
    """
    Playlist table columns.

    Each member's value is the zero-based column number used by the
    model when talking to the view.
    """

    START_GAP = 0
    TITLE = 1
    ARTIST = 2
    DURATION = 3
    START_TIME = 4
    END_TIME = 5
    LAST_PLAYED = 6
    BITRATE = 7
    NOTE = 8
class PlaylistRowData:
    """
    Plain-Python copy of one playlist row, built from a PlaylistRows
    database record (and its track, if it has one).

    Rows without a track keep the empty defaults (path == "", etc).
    start_time and end_time are not read from the database; they start
    as None and are filled in later by the model.
    """

    def __init__(self, plr: PlaylistRows) -> None:
        """
        Populate PlaylistRowData from database PlaylistRows record
        """

        # Values that come straight from the playlist row
        self.plrid: int = plr.id
        self.plr_rownum: int = plr.plr_rownum
        self.note: str = plr.note

        # Timing values are never taken from the database record
        self.start_time: Optional[datetime] = None
        self.end_time: Optional[datetime] = None

        # Defaults used when the row has no track
        self.start_gap: Optional[int] = None
        self.title: str = ""
        self.artist: str = ""
        self.duration: int = 0
        self.played = False
        self.lastplayed: datetime = Config.EPOCH
        self.bitrate = 0
        self.path = ""

        track = plr.track
        if not track:
            return

        # Row has a track: overwrite the defaults with track details
        self.start_gap = track.start_gap
        self.title = track.title
        self.artist = track.artist
        self.duration = track.duration
        self.played = plr.played
        playdates = track.playdates
        if playdates:
            # Most recent time the track was played
            self.lastplayed = max(playdate.lastplayed for playdate in playdates)
        self.bitrate = track.bitrate or 0
        self.path = track.path

    def __repr__(self) -> str:
        identity = f"<PlaylistRowData: plrid={self.plrid}, plr_rownum={self.plr_rownum}, "
        content = f"note='{self.note}', title='{self.title}', artist='{self.artist}'>"
        return identity + content
class PlaylistModel(QAbstractTableModel):
"""
The Playlist Model
Update strategy: update the database and then refresh the
row-indexed cached copy (self.playlist_rows). Do not edit
self.playlist_rows directly because keeping it and the
database in sync is unnecessarily challenging.
refresh_row() will populate one row of playlist_rows from the
database
refresh_data() will repopulate all of playlist_rows from the
database
"""
def __init__(
    self,
    playlist_id: int,
    *args,
    **kwargs,
):
    """
    Create the model for playlist `playlist_id`.

    Remaining positional and keyword arguments are passed unchanged
    to the QAbstractTableModel constructor.
    """

    self.playlist_id = playlist_id
    super().__init__(*args, **kwargs)

    # Row-number-indexed cache of the playlist's database rows
    self.playlist_rows: dict[int, PlaylistRowData] = {}

    # Listen for requests to add tracks
    self.signals = MusicMusterSignals()
    for signal, slot in (
        (self.signals.add_track_to_header_signal, self.add_track_to_header),
        (self.signals.add_track_to_playlist_signal, self.add_track),
    ):
        signal.connect(slot)

    # Load the cache from the database, then work out track times
    with Session() as session:
        self.refresh_data(session)
    self.update_track_times()
def __repr__(self) -> str:
    """Return a short description: playlist id and number of rows"""

    row_count = self.rowCount()
    return f"<PlaylistModel: playlist_id={self.playlist_id}, {row_count} rows>"
def add_track(
    self,
    playlist_id: int,
    new_row_number: int,
    track_id: Optional[int],
    note: Optional[str],
) -> None:
    """
    Slot for add_track_to_playlist_signal.

    If the request is for this playlist, insert a track row (when
    track_id is set) or a header row (when only a note is given).
    A request with neither track nor note is ignored.
    """

    # Requests for other playlists are not ours to handle
    if playlist_id != self.playlist_id:
        return

    if track_id:
        self.insert_track_row(new_row_number, track_id, note)
        return

    # A note without a track becomes a header row
    if note:
        self.insert_header_row(new_row_number, note)
def add_track_to_header(
    self,
    playlist_id: int,
    row_number: int,
    track_id: int,
) -> None:
    """
    Slot for add_track_to_header_signal.

    If the request is for this playlist, attach track_id to the
    existing header row at row_number, undo the header's cell
    spanning, then refresh and repaint that row. An unknown row, or a
    row that already has a track, is logged and ignored.
    """

    # Requests for other playlists are not ours to handle
    if playlist_id != self.playlist_id:
        return

    # The row must already exist in our cache
    if row_number not in self.playlist_rows:
        log.error(
            f"KeyError in PlaylistModel:add_track_to_header ({playlist_id=}, "
            f"{row_number=}, {track_id=}, {len(self.playlist_rows)=}"
        )
        return
    prd = self.playlist_rows[row_number]

    # A row with a path already has a track, so it isn't a header
    if prd.path:
        log.error(
            f"Error in PlaylistModel:add_track_to_header ({prd=}, "
            "Header row already has track associated"
        )
        return

    with Session() as session:
        plr = session.get(PlaylistRows, prd.plrid)
        if not plr:
            return

        # Attach the track in the database
        plr.track_id = track_id

        # The row is no longer a header: span one row by one column
        self.signals.span_cells_signal.emit(row_number, HEADER_NOTES_COLUMN, 1, 1)

        # Re-read the row into the cache and have the view redraw it
        self.refresh_row(session, row_number)
        self.invalidate_row(row_number)
def background_role(self, row: int, column: int, prd: PlaylistRowData) -> QBrush:
    """
    Return the background brush for a cell.

    Whole-row colours are checked first, in this order: header row,
    unreadable track file, current track, next track. Otherwise the
    start gap and bitrate columns may get their own colour. Any other
    cell gets a default QBrush.
    """

    colour = None

    if not prd.path:
        # Header row
        colour = Config.COLOUR_NOTES_PLAYLIST
    elif file_is_unreadable(prd.path):
        colour = Config.COLOUR_UNREADABLE
    elif prd.plrid == track_sequence.now.plr_id:
        # Current track
        colour = Config.COLOUR_CURRENT_PLAYLIST
    elif prd.plrid == track_sequence.next.plr_id:
        # Next track
        colour = Config.COLOUR_NEXT_PLAYLIST
    elif (
        column == Col.START_GAP.value
        and prd.start_gap
        and prd.start_gap >= Config.START_GAP_WARNING_THRESHOLD
    ):
        colour = Config.COLOUR_LONG_START
    elif column == Col.BITRATE.value:
        # The bitrate cell is always coloured: low, medium or OK
        if prd.bitrate < Config.BITRATE_LOW_THRESHOLD:
            colour = Config.COLOUR_BITRATE_LOW
        elif prd.bitrate and prd.bitrate < Config.BITRATE_OK_THRESHOLD:
            colour = Config.COLOUR_BITRATE_MEDIUM
        else:
            colour = Config.COLOUR_BITRATE_OK

    if colour is None:
        return QBrush()
    return QBrush(QColor(colour))
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
    """
    Standard function for view: return the number of columns.

    The count is taken from the Col enum so that adding or removing
    a column there cannot leave this method out of step.
    """

    return len(Col)
def current_track_started(self) -> None:
    """
    Notification from musicmuster that the current track has just
    started playing

    Actions required:
    - sanity check
    - update Playdates in database
    - update PlaylistRows in database
    - update track times
    - update display
    - find next track

    The database update (and the cache refresh that goes with it)
    must happen before the track times are set: refresh_row()
    replaces the cached row with a new object, which would discard
    any start/end time already stored on the old one.
    """

    row_number = track_sequence.now.plr_rownum

    # Sanity check
    if not track_sequence.now.track_id:
        log.error(
            "playlistmodel:current_track_started called with no current track"
        )
        return
    if row_number is None:
        log.error(
            "playlistmodel:current_track_started called with no row number "
            f"({track_sequence.now=})"
        )
        return

    with Session() as session:
        # Update Playdates and PlaylistRows in database, then refresh
        # our cached copy of the row
        Playdates(session, track_sequence.now.track_id)
        plr = session.get(PlaylistRows, track_sequence.now.plr_id)
        if plr:
            plr.played = True
            self.refresh_row(session, plr.plr_rownum)

        # Update track times on the (possibly just replaced) cached
        # row. Use one timestamp so start and end are consistent.
        started = datetime.now()
        prd = self.playlist_rows[row_number]
        prd.start_time = started
        prd.end_time = started + timedelta(milliseconds=prd.duration)
        self.update_track_times()

        # Update display now that the row's data has changed
        self.invalidate_row(row_number)

        # Find next track: the first unplayed row after the current
        # track or, if there is none, the first unplayed row.
        unplayed_rows = [
            a.plr_rownum
            for a in PlaylistRows.get_unplayed_rows(session, self.playlist_id)
        ]
        later_rows = [a for a in unplayed_rows if a > row_number]
        next_row = min(later_rows or unplayed_rows, default=None)
        if next_row is not None:
            self.set_next_row(next_row)
def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole):
    """
    Return data to view.

    Display, edit, font and background roles are passed to their
    role-specific methods; every other role, and any index outside
    the cached rows, gets an empty QVariant.
    """

    if not index.isValid():
        return QVariant()
    row = index.row()
    if row < 0 or row >= len(self.playlist_rows):
        return QVariant()
    column = index.column()

    # prd for playlist row data as it's used a lot
    prd = self.playlist_rows[row]

    # Roles we answer, paired with the method that answers them
    handlers = (
        (Qt.ItemDataRole.DisplayRole, self.display_role),
        (Qt.ItemDataRole.EditRole, self.edit_role),
        (Qt.ItemDataRole.FontRole, self.font_role),
        (Qt.ItemDataRole.BackgroundRole, self.background_role),
    )
    for handled_role, handler in handlers:
        if role == handled_role:
            return handler(row, column, prd)

    # Any other role: nothing to report
    return QVariant()
def display_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant:
    """
    Return text for display.

    For a header row (no track path) only HEADER_NOTES_COLUMN has
    content: the note. Asking for that cell also emits
    span_cells_signal so the note spans the rest of the row.
    """

    if not prd.path:
        # No track so this is a header row
        if column != HEADER_NOTES_COLUMN:
            return QVariant()
        self.signals.span_cells_signal.emit(
            row, HEADER_NOTES_COLUMN, 1, self.columnCount() - 1
        )
        return QVariant(prd.note)

    # Time columns are formatted, and blank if no time is set
    if column in (Col.START_TIME.value, Col.END_TIME.value):
        if column == Col.START_TIME.value:
            when = prd.start_time
        else:
            when = prd.end_time
        if when:
            return QVariant(when.strftime(Config.TRACK_TIME_FORMAT))
        return QVariant()

    # Remaining columns are handed to the view unformatted
    plain_values = {
        Col.START_GAP.value: prd.start_gap,
        Col.TITLE.value: prd.title,
        Col.ARTIST.value: prd.artist,
        Col.DURATION.value: prd.duration,
        Col.LAST_PLAYED.value: prd.lastplayed,
        Col.BITRATE.value: prd.bitrate,
        Col.NOTE.value: prd.note,
    }
    if column in plain_values:
        return QVariant(plain_values[column])

    return QVariant()
def edit_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant:
    """
    Return text for editing.

    Only the title, artist and note columns have an edit value; any
    other column gets an empty QVariant.
    """

    editable_text = {
        Col.TITLE.value: prd.title,
        Col.ARTIST.value: prd.artist,
        Col.NOTE.value: prd.note,
    }
    if column not in editable_text:
        return QVariant()
    return QVariant(editable_text[column])
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
    """
    Standard model flags.

    An invalid index only accepts drops. Every cell is enabled,
    selectable and draggable; title, artist and note cells are
    editable as well.
    """

    if not index.isValid():
        return Qt.ItemFlag.ItemIsDropEnabled

    item_flags = (
        Qt.ItemFlag.ItemIsEnabled
        | Qt.ItemFlag.ItemIsSelectable
        | Qt.ItemFlag.ItemIsDragEnabled
    )
    editable_columns = (Col.TITLE.value, Col.ARTIST.value, Col.NOTE.value)
    if index.column() in editable_columns:
        item_flags = item_flags | Qt.ItemFlag.ItemIsEditable

    return item_flags
def font_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant:
    """
    Return font: bold for rows not yet played, normal otherwise.

    The notes column gets an empty QVariant, so it is never bold.
    """

    if column == Col.NOTE.value:
        return QVariant()

    font = QFont()
    unplayed = not self.playlist_rows[row].played
    font.setBold(unplayed)

    return QVariant(font)
def headerData(
    self,
    section: int,
    orientation: Qt.Orientation,
    role: int = Qt.ItemDataRole.DisplayRole,
) -> QVariant:
    """
    Return text (display role) or a bold font (font role) for headers.

    Horizontal headers take their text from Config; vertical headers
    show the row number, counted from zero or one depending on
    Config.ROWS_FROM_ZERO.
    """

    if role == Qt.ItemDataRole.FontRole:
        # All headers are bold
        header_font = QFont()
        header_font.setBold(True)
        return QVariant(header_font)

    if role != Qt.ItemDataRole.DisplayRole:
        return QVariant()

    if orientation != Qt.Orientation.Horizontal:
        # Row header: the row number
        number = section if Config.ROWS_FROM_ZERO else section + 1
        return QVariant(str(number))

    # Column header: name of the Config attribute holding its text
    config_names = {
        Col.START_GAP.value: "HEADER_START_GAP",
        Col.TITLE.value: "HEADER_TITLE",
        Col.ARTIST.value: "HEADER_ARTIST",
        Col.DURATION.value: "HEADER_DURATION",
        Col.START_TIME.value: "HEADER_START_TIME",
        Col.END_TIME.value: "HEADER_END_TIME",
        Col.LAST_PLAYED.value: "HEADER_LAST_PLAYED",
        Col.BITRATE.value: "HEADER_BITRATE",
        Col.NOTE.value: "HEADER_NOTE",
    }
    config_name = config_names.get(section)
    if config_name is None:
        return QVariant()
    return QVariant(getattr(Config, config_name))
def is_header_row(self, row_number: int) -> bool:
    """
    Return True if the cached row at row_number has an empty track
    path (which is how a header row is recognised), else False
    """

    track_path = self.playlist_rows[row_number].path
    return track_path == ""
def insert_header_row(self, row_number: Optional[int], text: str) -> None:
    """
    Insert a header row (a row with a note but no track).

    row_number is where to insert; None (or a number past the end)
    appends. text becomes the row's note.

    The cache reload is wrapped in beginInsertRows()/endInsertRows()
    so that the view is told the row count has changed.
    """

    with Session() as session:
        plr = self._insert_row(session, row_number)
        # Update the PlaylistRows object
        plr.note = text
        # Take a copy of the row number actually used so we don't
        # depend on the database object later on
        new_row_number = plr.plr_rownum

        # Repopulate self.playlist_rows, telling the view that one
        # row is being added at new_row_number
        self.beginInsertRows(QModelIndex(), new_row_number, new_row_number)
        self.refresh_data(session)
        self.endInsertRows()

        # Update the display from the new row onwards
        self.invalidate_rows(list(range(new_row_number, len(self.playlist_rows))))
def _insert_row(
    self, session: scoped_session, row_number: Optional[int]
) -> PlaylistRows:
    """
    Insert a row in the database.

    If row_number is None or greater than the number of rows, the
    new row is put at the end of the list. Otherwise existing rows
    are moved, if necessary, to make space (via
    PlaylistRows.insert_row).

    Return the new PlaylistRows object. Raise ValueError if
    row_number is negative.
    """

    row_count = len(self.playlist_rows)

    # Adding at the end needs no other rows to move, so create the
    # row directly
    if row_number is None or row_number > row_count:
        return PlaylistRows(session, self.playlist_id, row_count)

    if row_number < 0:
        raise ValueError(
            f"playlistmodel._insert_row, invalid row number ({row_number})"
        )

    # Insert the new row and return it
    return PlaylistRows.insert_row(session, self.playlist_id, row_number)
def insert_track_row(
    self, row_number: Optional[int], track_id: int, text: Optional[str]
) -> None:
    """
    Insert a track row.

    row_number is where to insert; None (or a number past the end)
    appends. track_id is the track for the row and text, if given,
    becomes the row's note.

    The cache reload is wrapped in beginInsertRows()/endInsertRows()
    so that the view is told the row count has changed.
    """

    with Session() as session:
        plr = self._insert_row(session, row_number)
        # Update the PlaylistRows object
        plr.track_id = track_id
        if text:
            plr.note = text
        # Take a copy of the row number actually used so we don't
        # depend on the database object later on
        new_row_number = plr.plr_rownum

        # Repopulate self.playlist_rows, telling the view that one
        # row is being added at new_row_number
        self.beginInsertRows(QModelIndex(), new_row_number, new_row_number)
        self.refresh_data(session)
        self.endInsertRows()

        # Update the display from the new row onwards
        self.invalidate_rows(list(range(new_row_number, len(self.playlist_rows))))
def invalidate_row(self, modified_row: int) -> None:
    """
    Signal to view to refresh invalidated row.

    Emits dataChanged for every cell of modified_row, from column 0
    to the last column.
    """

    # Columns are zero-based, so the last valid one is columnCount() - 1;
    # an index built with columnCount() itself would be invalid.
    last_column = self.columnCount() - 1
    self.dataChanged.emit(
        self.index(modified_row, 0), self.index(modified_row, last_column)
    )
def invalidate_rows(self, modified_rows: List[int]) -> None:
    """
    Signal to view to refresh each of the invalidated rows, in the
    order given
    """

    for row_number in modified_rows:
        self.invalidate_row(row_number)
def move_rows(self, from_rows: List[int], to_row_number: int) -> None:
    """
    Move the playlist rows given to to_row_number and below.

    Works out a new row number for every row that has to change,
    writes those to the database, reloads the cache and asks the view
    to redraw the rows that moved.
    """

    total_rows = len(self.playlist_rows)
    moving_count = len(from_rows)

    # The number of rows in the playlist doesn't change, so if the
    # moved rows would overshoot the end of the playlist, pull the
    # first target row back so that they finish exactly at the end.
    if to_row_number + moving_count > total_rows:
        first_target = total_rows - moving_count
    else:
        first_target = to_row_number

    # {current_row_number: new_row_number}, starting with the rows
    # being moved
    row_map: dict[int, int] = dict(
        zip(from_rows, range(first_target, first_target + moving_count))
    )

    # The rows that are not moving fill, in their existing order, the
    # row numbers that the moved rows have left free (before and
    # after the moved rows).
    moving = set(from_rows)
    taken = set(row_map.values())
    staying_rows = [x for x in self.playlist_rows if x not in moving]
    free_rows = [y for y in range(total_rows) if y not in taken]
    for old_row, new_row in zip(staying_rows, free_rows):
        # Optimise: only add to map if there is a change
        if old_row != new_row:
            row_map[old_row] = new_row

    # For SQLAlchemy, build a list of dictionaries that map plrid to
    # new row number
    sqla_map: List[dict[str, int]] = [
        {"plrid": self.playlist_rows[oldrow].plrid, "plr_rownum": newrow}
        for oldrow, newrow in row_map.items()
    ]

    with Session() as session:
        PlaylistRows.update_plr_rownumbers(session, self.playlist_id, sqla_map)
        # Update playlist_rows
        self.refresh_data(session)
        # Update display
        self.invalidate_rows(list(row_map.keys()))
def previous_track_ended(self) -> None:
    """
    Notification from musicmuster that the previous track has ended.

    Actions required:
    - sanity check
    - update display
    - update track times (not yet implemented)
    """

    # Sanity check
    if not track_sequence.previous.track_id:
        # It is the *previous* track that is checked here, so say so
        log.error(
            "playlistmodel:previous_track_ended called with no previous track"
        )
        return
    if track_sequence.previous.plr_rownum is None:
        log.error(
            "playlistmodel:previous_track_ended called with no row number "
            f"({track_sequence.previous=})"
        )
        return

    # Update display
    self.invalidate_row(track_sequence.previous.plr_rownum)

    # Update track times
    # TODO
def refresh_data(self, session: scoped_session):
    """
    Rebuild the whole of self.playlist_rows from the database.

    The cache is emptied and then refilled, keyed by row number, with
    one PlaylistRowData for each row that PlaylistRows.deep_rows()
    returns for this playlist.
    """

    self.playlist_rows.clear()
    self.playlist_rows.update(
        (db_row.plr_rownum, PlaylistRowData(db_row))
        for db_row in PlaylistRows.deep_rows(session, self.playlist_id)
    )
def refresh_row(self, session, row_number):
    """
    Rebuild one entry of self.playlist_rows from the database.

    The cached entry is replaced by a new PlaylistRowData object, so
    anything held only on the old object (such as start and end
    times) is not carried over.
    """

    db_row = PlaylistRows.deep_row(session, self.playlist_id, row_number)
    fresh_row = PlaylistRowData(db_row)
    self.playlist_rows[row_number] = fresh_row
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
    """Standard function for view: return the number of cached rows"""

    cached_rows = self.playlist_rows
    return len(cached_rows)
def set_next_row(self, row_number: int) -> None:
    """
    Set row_number as next track.

    track_sequence.next is always reset first. It is then only filled
    in (and next_track_changed_signal emitted, and the row repainted)
    if the row exists, has a track and that track's file is readable.
    """

    with Session() as session:
        # Start with an empty next track
        track_sequence.next = PlaylistTrack()

        # self.playlist_rows is a dict, so an unknown row raises
        # KeyError (not IndexError)
        try:
            plrid = self.playlist_rows[row_number].plrid
        except KeyError:
            log.error(
                f"playlistmodel.set_next_row: no such row ({row_number=}, "
                f"{self.playlist_id=})"
            )
            return

        plr = session.get(PlaylistRows, plrid)
        if not plr:
            return

        # Check this isn't a header row
        if plr.track is None:
            return

        # Check track is readable
        if file_is_unreadable(plr.track.path):
            return

        track_sequence.next.set_plr(session, plr)
        self.signals.next_track_changed_signal.emit()
        self.invalidate_row(row_number)
def setData(
    self, index: QModelIndex, value: QVariant, role: int = Qt.ItemDataRole.EditRole
) -> bool:
    """
    Update model with edited data.

    Title and artist edits are written to the row's track; note edits
    are written to the playlist row itself. The cached row is then
    refreshed and dataChanged emitted for the cell.

    Return True if the edit was handled, False if the index or role
    is not one we handle or the database row or track can't be found
    (the latter two are logged).
    """

    if not index.isValid() or role != Qt.ItemDataRole.EditRole:
        return False

    row_number = index.row()
    column = index.column()

    with Session() as session:
        plr = session.get(PlaylistRows, self.playlist_rows[row_number].plrid)
        if not plr:
            log.error(
                f"Error saving data: {row_number=}, {column=}, "
                f"{value=}, {self.playlist_id=}"
            )
            return False

        if column in (Col.TITLE.value, Col.ARTIST.value):
            # Title and artist belong to the track, not to the row
            track = session.get(Tracks, plr.track_id)
            if not track:
                log.error(f"Error retreiving track: {plr=}")
                return False
            if column == Col.TITLE.value:
                track.title = str(value)
            else:
                track.artist = str(value)
        elif column == Col.NOTE.value:
            plr.note = str(value)

        # Flush changes before refreshing data
        session.flush()
        self.refresh_row(session, row_number)
        self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role])
        return True
def supportedDropActions(self) -> Qt.DropAction:
    """Standard function for view: both move and copy drops are supported"""

    return Qt.DropAction.CopyAction | Qt.DropAction.MoveAction
def update_track_times(self) -> None:
    """
    Update track start/end times in self.playlist_rows.

    Rows are visited in row-number order, carrying a running "next
    start time" that is reset by the current track (to its end time)
    and by any header row whose note contains a time. Each unplayed
    track after that starts at the running time and ends one track
    duration later.
    """

    current_row = track_sequence.now.plr_rownum
    running_time: Optional[datetime] = None

    for row_number in range(len(self.playlist_rows)):
        prd = self.playlist_rows[row_number]

        # The current track: whatever follows starts when it ends
        if row_number == current_row:
            current_end = track_sequence.now.end_time
            prd.end_time = current_end
            running_time = current_end
            continue

        # Don't update times for tracks that have been played
        if prd.played:
            continue

        # A header row may reset the running time via a time
        # embedded in its note
        if not prd.path:
            header_time = get_embedded_time(prd.note)
            if header_time:
                running_time = header_time
            continue

        # This is an unplayed track; without a start time there is
        # nothing to set
        if running_time is None:
            continue

        prd.start_time = running_time
        running_time = running_time + timedelta(milliseconds=prd.duration)
        prd.end_time = running_time