musicmuster/app/playlistmodel.py
Keith Edmunds d6f55c5987 Rewrite of track handling
Combine the old track_manager and playlist data structures into
RowAndTrack data structure.
2024-07-29 18:52:02 +01:00

1659 lines
58 KiB
Python

# Standard library imports
# Allow forward reference to PlaylistModel
from __future__ import annotations
from operator import attrgetter
from random import shuffle
from typing import Optional
import datetime as dt
import re
# PyQt imports
from PyQt6.QtCore import (
QAbstractTableModel,
QModelIndex,
QObject,
QRegularExpression,
QSortFilterProxyModel,
Qt,
QTimer,
QVariant,
)
from PyQt6.QtGui import (
QBrush,
QColor,
QFont,
)
# Third party imports
import obswebsocket # type: ignore
# import snoop # type: ignore
# App imports
from classes import (
Col,
MusicMusterSignals,
RowAndTrack,
track_sequence,
)
from config import Config
from helpers import (
file_is_unreadable,
get_embedded_time,
get_relative_date,
ms_to_mmss,
set_track_metadata,
)
from log import log
from models import db, NoteColours, Playdates, PlaylistRows, Tracks
HEADER_NOTES_COLUMN = 1
scene_change_re = re.compile(r"SetScene=\[([^[\]]*)\]")
class PlaylistModel(QAbstractTableModel):
"""
The Playlist Model
Update strategy: update the database and then refresh the
row-indexed cached copy (self.playlist_rows). Do not edit
self.playlist_rows directly because keeping it and the
database in sync is uncessarily challenging.
refresh_row() will populate one row of playlist_rows from the
database
refresh_data() will repopulate all of playlist_rows from the
database
"""
def __init__(
self,
playlist_id: int,
*args: Optional[QObject],
**kwargs: Optional[QObject],
) -> None:
log.debug(f"PlaylistModel.__init__({playlist_id=})")
self.playlist_id = playlist_id
super().__init__(*args, **kwargs)
self.playlist_rows: dict[int, RowAndTrack] = {}
self.signals = MusicMusterSignals()
self.played_tracks_hidden = False
self.signals.begin_reset_model_signal.connect(self.begin_reset_model)
self.signals.end_reset_model_signal.connect(self.end_reset_model)
with db.Session() as session:
# Ensure row numbers in playlist are contiguous
PlaylistRows.fixup_rownumbers(session, playlist_id)
# Populate self.playlist_rows
self.refresh_data(session)
self.update_track_times()
def __repr__(self) -> str:
return (
f"<PlaylistModel: playlist_id={self.playlist_id}, {self.rowCount()} rows>"
)
def add_track_to_header(
self, row_number: int, track_id: int, note: Optional[str] = None
) -> None:
"""
Add track to existing header row
"""
log.debug(f"add_track_to_header({row_number=}, {track_id=}, {note=}")
# Get existing row
try:
rat = self.playlist_rows[row_number]
except KeyError:
log.error(
f"KeyError in PlaylistModel:add_track_to_header "
f"{row_number=}, {track_id=}, {len(self.playlist_rows)=}"
)
return
if rat.path:
log.error(
f"Error in PlaylistModel:add_track_to_header ({rat=}, "
"Header row already has track associated"
)
return
with db.Session() as session:
playlistrow = session.get(PlaylistRows, rat.playlistrow_id)
if playlistrow:
# Add track to PlaylistRows
playlistrow.track_id = track_id
# Add any further note (header will already have a note)
if note:
playlistrow.note += "\n" + note
# Update local copy
self.refresh_row(session, row_number)
# Repaint row
self.invalidate_row(row_number)
session.commit()
self.signals.resize_rows_signal.emit(self.playlist_id)
def background_role(self, row: int, column: int, rat: RowAndTrack) -> QBrush:
"""Return background setting"""
# Handle entire row colouring
# Header row
if self.is_header_row(row):
# Check for specific header colouring
with db.Session() as session:
note_colour = NoteColours.get_colour(session, rat.note)
if note_colour:
return QBrush(QColor(note_colour))
else:
return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST))
# Unreadable track file
if file_is_unreadable(rat.path):
return QBrush(QColor(Config.COLOUR_UNREADABLE))
# Current track
if track_sequence.current and track_sequence.current.track_id == rat.track_id:
return QBrush(QColor(Config.COLOUR_CURRENT_PLAYLIST))
# Next track
if track_sequence.next and track_sequence.next.track_id == rat.track_id:
return QBrush(QColor(Config.COLOUR_NEXT_PLAYLIST))
# Individual cell colouring
if column == Col.START_GAP.value:
if rat.start_gap and rat.start_gap >= Config.START_GAP_WARNING_THRESHOLD:
return QBrush(QColor(Config.COLOUR_LONG_START))
if column == Col.BITRATE.value:
if not rat.bitrate or rat.bitrate < Config.BITRATE_LOW_THRESHOLD:
return QBrush(QColor(Config.COLOUR_BITRATE_LOW))
elif rat.bitrate < Config.BITRATE_OK_THRESHOLD:
return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM))
else:
return QBrush(QColor(Config.COLOUR_BITRATE_OK))
if column == Col.NOTE.value:
if rat.note:
with db.Session() as session:
note_colour = NoteColours.get_colour(session, rat.note)
if note_colour:
return QBrush(QColor(note_colour))
return QBrush()
def begin_reset_model(self, playlist_id: int) -> None:
"""
Reset model if playlist_id is ours
"""
if playlist_id != self.playlist_id:
return
super().beginResetModel()
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
"""Standard function for view"""
return len(Col)
def current_track_started(self) -> None:
"""
Notification from musicmuster that the current track has just
started playing
Actions required:
- sanity check
- change OBS scene if needed
- update display
- update track times
- update Playdates in database
- update PlaylistRows in database
- find next track
"""
if not track_sequence.current:
return
row_number = track_sequence.current.row_number
# Check for OBS scene change
log.debug("Call OBS scene change")
self.obs_scene_change(row_number)
if not track_sequence.current.track_id:
log.error(f"current_track_started() called with {track_sequence.current.track_id=}")
return
with db.Session() as session:
# Update Playdates in database
log.debug("update playdates")
Playdates(session, track_sequence.current.track_id)
# Mark track as played in playlist
log.debug("Mark track as played")
plr = session.get(PlaylistRows, track_sequence.current.playlistrow_id)
if plr:
plr.played = True
self.refresh_row(session, plr.row_number)
else:
log.error(f"Can't retrieve plr, {track_sequence.current.playlistrow_id=}")
# Update colour and times for current row
self.invalidate_row(row_number)
# Update previous row in case we're hiding played rows
if track_sequence.previous and track_sequence.previous.row_number:
self.invalidate_row(track_sequence.previous.row_number)
# Update all other track times
self.update_track_times()
# Find next track
# Get all unplayed track rows
log.debug("Find next track")
next_row = None
unplayed_rows = self.get_unplayed_rows()
if unplayed_rows:
try:
# Find next row after current track
next_row = min(
[
a
for a in unplayed_rows
if a > row_number and not self.is_header_row(a)
]
)
except ValueError:
# Find first unplayed track
next_row = min(unplayed_rows)
if next_row is not None:
self.set_next_row(next_row)
session.commit()
def data(
self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole
) -> QVariant:
"""Return data to view"""
if not index.isValid() or not (0 <= index.row() < len(self.playlist_rows)):
return QVariant()
row = index.row()
column = index.column()
# rat for playlist row data as it's used a lot
rat = self.playlist_rows[row]
# Dispatch to role-specific functions
dispatch_table = {
int(Qt.ItemDataRole.BackgroundRole): self.background_role,
int(Qt.ItemDataRole.DisplayRole): self.display_role,
int(Qt.ItemDataRole.EditRole): self.edit_role,
int(Qt.ItemDataRole.FontRole): self.font_role,
int(Qt.ItemDataRole.ToolTipRole): self.tooltip_role,
}
if role in dispatch_table:
return QVariant(dispatch_table[role](row, column, rat))
# Document other roles but don't use them
if role in [
Qt.ItemDataRole.DecorationRole,
Qt.ItemDataRole.StatusTipRole,
Qt.ItemDataRole.WhatsThisRole,
Qt.ItemDataRole.SizeHintRole,
Qt.ItemDataRole.TextAlignmentRole,
Qt.ItemDataRole.ForegroundRole,
Qt.ItemDataRole.CheckStateRole,
Qt.ItemDataRole.InitialSortOrderRole,
]:
return QVariant()
# Fall through to no-op
return QVariant()
def delete_rows(self, row_numbers: list[int]) -> None:
"""
Delete passed rows from model
Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows
calls. To keep it simple, if inefficient, delete rows one by one.
TODO: delete in blocks
Delete from highest row back so that not yet deleted row numbers don't change.
"""
with db.Session() as session:
for row_number in sorted(row_numbers, reverse=True):
log.info(f"delete_rows(), {row_number=}")
super().beginRemoveRows(QModelIndex(), row_number, row_number)
# We need to remove data from the underlying data store,
# which is the database, but we cache in
# self.playlist_rows, which is what calls to data()
# reads, so fixup that too.
PlaylistRows.delete_row(session, self.playlist_id, row_number)
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
self.refresh_data(session)
session.commit()
super().endRemoveRows()
self.reset_track_sequence_row_numbers()
def display_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant:
"""
Return text for display
"""
# Set / reset column span
if column == HEADER_NOTES_COLUMN:
column_span = 1
if self.is_header_row(row):
column_span = self.columnCount() - 1
self.signals.span_cells_signal.emit(
self.playlist_id, row, HEADER_NOTES_COLUMN, 1, column_span
)
if self.is_header_row(row):
if column == HEADER_NOTES_COLUMN:
header_text = self.header_text(rat)
if not header_text:
return QVariant(Config.TEXT_NO_TRACK_NO_NOTE)
else:
return QVariant(self.header_text(rat))
else:
return QVariant()
if column == Col.START_TIME.value:
start_time = rat.forecast_start_time
if start_time:
return QVariant(start_time.strftime(Config.TRACK_TIME_FORMAT))
return QVariant()
if column == Col.END_TIME.value:
end_time = rat.forecast_end_time
if end_time:
return QVariant(end_time.strftime(Config.TRACK_TIME_FORMAT))
return QVariant()
if column == Col.INTRO.value:
if rat.intro:
return QVariant(f"{rat.intro / 1000:{Config.INTRO_SECONDS_FORMAT}}")
else:
return QVariant()
dispatch_table = {
Col.ARTIST.value: QVariant(rat.artist),
Col.BITRATE.value: QVariant(rat.bitrate),
Col.DURATION.value: QVariant(ms_to_mmss(rat.duration)),
Col.LAST_PLAYED.value: QVariant(get_relative_date(rat.lastplayed)),
Col.NOTE.value: QVariant(rat.note),
Col.START_GAP.value: QVariant(rat.start_gap),
Col.TITLE.value: QVariant(rat.title),
}
if column in dispatch_table:
return dispatch_table[column]
return QVariant()
def end_reset_model(self, playlist_id: int) -> None:
"""
End model reset if this is our playlist
"""
log.debug(f"end_reset_model({playlist_id=})")
if playlist_id != self.playlist_id:
log.debug(f"end_reset_model: not us ({self.playlist_id=})")
return
with db.Session() as session:
self.refresh_data(session)
super().endResetModel()
self.reset_track_sequence_row_numbers()
def edit_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant:
"""
Return text for editing
"""
# If this is a header row and we're being asked for the
# HEADER_NOTES_COLUMN, return the note value
if self.is_header_row(row) and column == HEADER_NOTES_COLUMN:
return QVariant(rat.note)
if column == Col.INTRO.value:
return QVariant(rat.intro)
if column == Col.TITLE.value:
return QVariant(rat.title)
if column == Col.ARTIST.value:
return QVariant(rat.artist)
if column == Col.NOTE.value:
return QVariant(rat.note)
return QVariant()
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
"""
Standard model flags
"""
if not index.isValid():
return Qt.ItemFlag.ItemIsDropEnabled
default = (
Qt.ItemFlag.ItemIsEnabled
| Qt.ItemFlag.ItemIsSelectable
| Qt.ItemFlag.ItemIsDragEnabled
)
if index.column() in [
Col.TITLE.value,
Col.ARTIST.value,
Col.NOTE.value,
Col.INTRO.value,
]:
return default | Qt.ItemFlag.ItemIsEditable
return default
def font_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant:
"""
Return font
"""
# Notes column is never bold
if column == Col.NOTE.value:
return QVariant()
boldfont = QFont()
boldfont.setBold(not self.playlist_rows[row].played)
return QVariant(boldfont)
def get_duplicate_rows(self) -> list[int]:
"""
Return a list of duplicate rows. If track appears in rows 2, 3 and 4, return [3, 4]
(ie, ignore the first, not-yet-duplicate, track).
"""
log.info("get_duplicate_rows() called")
found = []
result = []
for i in range(len(self.playlist_rows)):
track_id = self.playlist_rows[i].track_id
if track_id is None:
continue
if track_id in found:
result.append(i)
else:
found.append(track_id)
log.info(f"get_duplicate_rows() returned: {result=}")
return result
def _get_new_row_number(self, proposed_row_number: Optional[int]) -> int:
"""
Sanitises proposed new row number.
If proposed_row_number given, ensure it is valid.
If not given, return row number to add to end of model.
"""
log.debug(f"_get_new_row_number({proposed_row_number=})")
if proposed_row_number is None or proposed_row_number > len(self.playlist_rows):
# We are adding to the end of the list
new_row_number = len(self.playlist_rows)
elif proposed_row_number < 0:
# Add to start of list
new_row_number = 0
else:
new_row_number = proposed_row_number
log.debug(f"get_new_row_number() return: {new_row_number=}")
return new_row_number
def get_row_info(self, row_number: int) -> RowAndTrack:
"""
Return info about passed row
"""
return self.playlist_rows[row_number]
def get_row_track_id(self, row_number: int) -> Optional[int]:
"""
Return id of track associated with row or None if no track associated
"""
return self.playlist_rows[row_number].track_id
def get_row_track_path(self, row_number: int) -> str:
"""
Return path of track associated with row or empty string if no track associated
"""
return self.playlist_rows[row_number].path
def get_rows_duration(self, row_numbers: list[int]) -> int:
"""
Return the total duration of the passed rows
"""
duration = 0
for row_number in row_numbers:
duration += self.playlist_rows[row_number].duration
return duration
def get_unplayed_rows(self) -> list[int]:
"""
Return a list of unplayed row numbers
"""
result = [
a.row_number
for a in self.playlist_rows.values()
if not a.played and a.track_id is not None
]
log.debug(f"get_unplayed_rows() returned: {result=}")
return result
def headerData(
self,
section: int,
orientation: Qt.Orientation,
role: int = Qt.ItemDataRole.DisplayRole,
) -> QVariant:
"""
Return text for headers
"""
display_dispatch_table = {
Col.START_GAP.value: QVariant(Config.HEADER_START_GAP),
Col.INTRO.value: QVariant(Config.HEADER_INTRO),
Col.TITLE.value: QVariant(Config.HEADER_TITLE),
Col.ARTIST.value: QVariant(Config.HEADER_ARTIST),
Col.DURATION.value: QVariant(Config.HEADER_DURATION),
Col.START_TIME.value: QVariant(Config.HEADER_START_TIME),
Col.END_TIME.value: QVariant(Config.HEADER_END_TIME),
Col.LAST_PLAYED.value: QVariant(Config.HEADER_LAST_PLAYED),
Col.BITRATE.value: QVariant(Config.HEADER_BITRATE),
Col.NOTE.value: QVariant(Config.HEADER_NOTE),
}
if role == Qt.ItemDataRole.DisplayRole:
if orientation == Qt.Orientation.Horizontal:
return display_dispatch_table[section]
else:
if Config.ROWS_FROM_ZERO:
return QVariant(str(section))
else:
return QVariant(str(section + 1))
elif role == Qt.ItemDataRole.FontRole:
boldfont = QFont()
boldfont.setBold(True)
return QVariant(boldfont)
return QVariant()
def header_text(self, rat: RowAndTrack) -> str:
"""
Process possible section timing directives embeded in header
"""
if rat.note.endswith("+"):
return self.start_of_timed_section_header(rat)
elif rat.note.endswith("="):
return self.section_subtotal_header(rat)
elif rat.note == "-":
# If the hyphen is the only thing on the line, echo the note
# that started the section without the trailing "+".
for row_number in range(rat.row_number - 1, -1, -1):
row_rat = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_rat.note.endswith("-"):
# We didn't find a matching section start
break
if row_rat.note.endswith("+"):
return f"[End: {row_rat.note[:-1]}]"
return "-"
return rat.note
def hide_played_tracks(self, hide: bool) -> None:
"""
Set played tracks hidden according to 'hide'
"""
self.played_tracks_hidden = hide
for row_number in range(len(self.playlist_rows)):
if self.is_played_row(row_number):
self.invalidate_row(row_number)
def insert_row(
self,
proposed_row_number: Optional[int],
track_id: Optional[int] = None,
note: str = "",
) -> None:
"""
Insert a row.
"""
log.debug(f"insert_row({proposed_row_number=}, {track_id=}, {note=})")
new_row_number = self._get_new_row_number(proposed_row_number)
with db.Session() as session:
super().beginInsertRows(QModelIndex(), new_row_number, new_row_number)
_ = PlaylistRows.insert_row(
session=session,
playlist_id=self.playlist_id,
new_row_number=new_row_number,
note=note,
track_id=track_id,
)
session.commit()
self.refresh_data(session)
super().endInsertRows()
self.signals.resize_rows_signal.emit(self.playlist_id)
self.reset_track_sequence_row_numbers()
self.invalidate_rows(list(range(new_row_number, len(self.playlist_rows))))
def invalidate_row(self, modified_row: int) -> None:
"""
Signal to view to refresh invlidated row
"""
self.dataChanged.emit(
self.index(modified_row, 0),
self.index(modified_row, self.columnCount() - 1),
)
def invalidate_rows(self, modified_rows: list[int]) -> None:
"""
Signal to view to refresh invlidated rows
"""
for modified_row in modified_rows:
self.invalidate_row(modified_row)
def is_header_row(self, row_number: int) -> bool:
"""
Return True if row is a header row, else False
"""
if row_number in self.playlist_rows:
return self.playlist_rows[row_number].path == ""
return False
def is_played_row(self, row_number: int) -> bool:
"""
Return True if row is an unplayed track row, else False
"""
return self.playlist_rows[row_number].played
def is_track_in_playlist(self, track_id: int) -> Optional[RowAndTrack]:
"""
If this track_id is in the playlist, return the PlaylistRowData object
else return None
"""
for row_number in range(len(self.playlist_rows)):
if self.playlist_rows[row_number].track_id == track_id:
return self.playlist_rows[row_number]
return None
def mark_unplayed(self, row_numbers: list[int]) -> None:
"""
Mark row as unplayed
"""
with db.Session() as session:
for row_number in row_numbers:
plr = session.get(PlaylistRows, self.playlist_rows[row_number].row_number)
if not plr:
return
plr.played = False
session.commit()
self.refresh_row(session, row_number)
self.update_track_times()
self.invalidate_rows(row_numbers)
def move_rows(self, from_rows: list[int], to_row_number: int) -> None:
"""
Move the playlist rows given to to_row and below.
"""
log.debug(f"move_rows({from_rows=}, {to_row_number=}")
# Build a {current_row_number: new_row_number} dictionary
row_map: dict[int, int] = {}
# The destination row number will need to be reduced by the
# number of rows being move from above the destination row
# otherwise rows below the destination row will end up above the
# moved rows.
adjusted_to_row = to_row_number - len(
[a for a in from_rows if a < to_row_number]
)
# Put the from_row row numbers into the row_map. Ultimately the
# total number of elements in the playlist doesn't change, so
# check that adding the moved rows starting at to_row won't
# overshoot the end of the playlist.
if adjusted_to_row + len(from_rows) > len(self.playlist_rows):
next_to_row = len(self.playlist_rows) - len(from_rows)
else:
next_to_row = adjusted_to_row
# zip iterates from_row and to_row simultaneously from the
# respective sequences inside zip()
for from_row, to_row in zip(
from_rows, range(next_to_row, next_to_row + len(from_rows))
):
row_map[from_row] = to_row
# Move the remaining rows to the row_map. We want to fill it
# before (if there are gaps) and after (likewise) the rows that
# are moving.
# zip iterates old_row and new_row simultaneously from the
# respective sequences inside zip()
for old_row, new_row in zip(
[x for x in self.playlist_rows.keys() if x not in from_rows],
[y for y in range(len(self.playlist_rows)) if y not in row_map.values()],
):
# Optimise: only add to map if there is a change
if old_row != new_row:
row_map[old_row] = new_row
# Check to see whether any rows in track_sequence have moved
if track_sequence.previous and track_sequence.previous.row_number in row_map:
track_sequence.previous.row_number = row_map[
track_sequence.previous.row_number
]
if track_sequence.current and track_sequence.current.row_number in row_map:
track_sequence.current.row_number = row_map[
track_sequence.current.row_number
]
if track_sequence.next and track_sequence.next.row_number in row_map:
track_sequence.next.row_number = row_map[track_sequence.next.row_number]
# For SQLAlchemy, build a list of dictionaries that map playlistrow_id to
# new row number:
sqla_map: list[dict[str, int]] = []
for oldrow, newrow in row_map.items():
playlistrow_id = self.playlist_rows[oldrow].playlistrow_id
sqla_map.append({"playlistrow_id": playlistrow_id, "row_number": newrow})
with db.Session() as session:
PlaylistRows.update_plr_row_numbers(session, self.playlist_id, sqla_map)
session.commit()
# Update playlist_rows
self.refresh_data(session)
# Update display
self.reset_track_sequence_row_numbers()
self.update_track_times()
self.invalidate_rows(list(row_map.keys()))
def move_rows_between_playlists(
self, from_rows: list[int], to_row_number: int, to_playlist_id: int
) -> None:
"""
Move the playlist rows given to to_row and below of to_playlist.
"""
log.debug(
f"move_rows_between_playlists({from_rows=}, {to_row_number=}, {to_playlist_id=}"
)
# Row removal must be wrapped in beginRemoveRows ..
# endRemoveRows and the row range must be contiguous. Process
# the highest rows first so the lower row numbers are unchanged
row_groups = self._reversed_contiguous_row_groups(from_rows)
next_to_row = to_row_number
# Prepare destination playlist for a reset
self.signals.begin_reset_model_signal.emit(to_playlist_id)
with db.Session() as session:
# Make room in destination playlist
max_destination_row_number = PlaylistRows.get_last_used_row(
session, to_playlist_id
)
if (
max_destination_row_number
and to_row_number <= max_destination_row_number
):
# Move the destination playlist rows down to make room.
PlaylistRows.move_rows_down(
session, to_playlist_id, to_row_number, len(from_rows)
)
for row_group in row_groups:
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
for playlist_row in PlaylistRows.plrids_to_plrs(
session,
self.playlist_id,
[self.playlist_rows[a].playlistrow_id for a in row_group],
):
if (
track_sequence.current
and playlist_row.id == track_sequence.current.playlistrow_id
):
# Don't move current track
continue
playlist_row.playlist_id = to_playlist_id
playlist_row.row_number = next_to_row
next_to_row += 1
self.refresh_data(session)
super().endRemoveRows()
# We need to remove gaps in row numbers after tracks have
# moved.
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
self.refresh_data(session)
session.commit()
# Reset of model must come after session has been closed
self.reset_track_sequence_row_numbers()
self.signals.end_reset_model_signal.emit(to_playlist_id)
self.update_track_times()
def move_track_add_note(
self, new_row_number: int, existing_rat: RowAndTrack, note: str
) -> None:
"""
Move existing_rat track to new_row_number and append note to any existing note
"""
log.info(f"move_track_add_note({new_row_number=}, {existing_rat=}, {note=}")
if note:
with db.Session() as session:
plr = session.get(PlaylistRows, existing_rat.playlistrow_id)
if plr:
if plr.note:
plr.note += "\n" + note
else:
plr.note = note
session.commit()
# Carry out the move outside of the session context to ensure
# database updated with any note change
self.move_rows([existing_rat.row_number], new_row_number)
self.signals.resize_rows_signal.emit(self.playlist_id)
def move_track_to_header(
self,
header_row_number: int,
existing_rat: RowAndTrack,
note: Optional[str],
) -> None:
"""
Add the existing_rat track details to the existing header at header_row_number
"""
log.info(f"move_track_to_header({header_row_number=}, {existing_rat=}, {note=}")
if existing_rat.track_id:
if note and existing_rat.note:
note += "\n" + existing_rat.note
self.add_track_to_header(header_row_number, existing_rat.track_id, note)
self.delete_rows([existing_rat.row_number])
def obs_scene_change(self, row_number: int) -> None:
"""
Check this row and any preceding headers for OBS scene change command
and execute any found
"""
log.info(f"obs_scene_change({row_number=})")
# Check any headers before this row
idx = row_number - 1
while self.is_header_row(idx):
idx -= 1
# Step through headers in row order and finish with this row
for chkrow in range(idx + 1, row_number + 1):
match_obj = scene_change_re.search(self.playlist_rows[chkrow].note)
if match_obj:
scene_name = match_obj.group(1)
if scene_name:
ws = obswebsocket.obsws(
host=Config.OBS_HOST,
port=Config.OBS_PORT,
password=Config.OBS_PASSWORD,
)
try:
ws.connect()
ws.call(
obswebsocket.requests.SetCurrentProgramScene(
sceneName=scene_name
)
)
log.info(f"OBS scene changed to '{scene_name}'")
continue
except obswebsocket.exceptions.ConnectionFailure:
log.error("OBS connection refused")
return
def previous_track_ended(self) -> None:
"""
Notification from musicmuster that the previous track has ended.
Actions required:
- sanity check
- update display
"""
log.info("previous_track_ended()")
# Sanity check
if not track_sequence.previous:
log.error("playlistmodel:previous_track_ended called with no current track")
return
if track_sequence.previous.row_number is None:
log.error(
"playlistmodel:previous_track_ended called with no row number "
f"({track_sequence.previous=})"
)
return
# Update display
self.invalidate_row(track_sequence.previous.row_number)
def refresh_data(self, session: db.session) -> None:
"""Populate dicts for data calls"""
# Populate self.playlist_rows with playlist data
self.playlist_rows.clear()
for p in PlaylistRows.deep_rows(session, self.playlist_id):
self.playlist_rows[p.row_number] = RowAndTrack(p)
def refresh_row(self, session, row_number):
"""Populate dict for one row from database"""
p = PlaylistRows.deep_row(session, self.playlist_id, row_number)
self.playlist_rows[row_number] = RowAndTrack(p)
def remove_track(self, row_number: int) -> None:
"""
Remove track from row, retaining row as a header row
"""
log.info(f"remove_track({row_number=})")
with db.Session() as session:
plr = session.get(PlaylistRows, self.playlist_rows[row_number].playlistrow_id)
if plr:
plr.track_id = None
session.commit()
self.refresh_row(session, row_number)
self.invalidate_row(row_number)
def rescan_track(self, row_number: int) -> None:
"""
Rescan track at passed row number
"""
track_id = self.playlist_rows[row_number].track_id
if track_id:
with db.Session() as session:
track = session.get(Tracks, track_id)
set_track_metadata(track)
self.refresh_row(session, row_number)
self.update_track_times()
self.invalidate_row(row_number)
self.signals.resize_rows_signal.emit(self.playlist_id)
session.commit()
def reset_track_sequence_row_numbers(self) -> None:
"""
Signal handler for when row ordering has changed.
Example: row 4 is marked as next. Row 2 is deleted. The PlaylistRows table will
be correctly updated with change of row number, but track_sequence.next will still
contain row_number==4. This function fixes up the track_sequence row numbers by
looking up the playlistrow_id and retrieving the row number from the database.
"""
log.debug("reset_track_sequence_row_numbers()")
# Check the track_sequence.next, current and previous plrs and
# update the row number
with db.Session() as session:
for ts in [
track_sequence.next,
track_sequence.current,
track_sequence.previous,
]:
if ts and ts.playlist_id == self.playlist_id and ts.row_number:
playlist_row = session.get(PlaylistRows, ts.playlistrow_id)
if playlist_row and playlist_row.row_number != ts.row_number:
ts.row_number = playlist_row.row_number
self.update_track_times()
def _reversed_contiguous_row_groups(
self, row_numbers: list[int]
) -> list[list[int]]:
"""
Take the list of row numbers and split into groups of contiguous rows. Return as a list
of lists with the highest row numbers first.
Example:
input [2, 3, 4, 5, 7, 9, 10, 13, 17, 20, 21]
return: [[20, 21], [17], [13], [9, 10], [7], [2, 3, 4, 5]]
"""
log.debug(f"_reversed_contiguous_row_groups({row_numbers=} called")
result: list[list[int]] = []
temp: list[int] = []
last_value = row_numbers[0] - 1
for idx in range(len(row_numbers)):
if row_numbers[idx] != last_value + 1:
result.append(temp)
temp = []
last_value = row_numbers[idx]
temp.append(last_value)
if temp:
result.append(temp)
result.reverse()
log.debug(f"_reversed_contiguous_row_groups() returned: {result=}")
return result
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
"""Standard function for view"""
return len(self.playlist_rows)
def section_subtotal_header(self, rat: RowAndTrack) -> str:
"""
Process this row as subtotal within a timed section and
return display text for this row
"""
count: int = 0
unplayed_count: int = 0
duration: int = 0
# Show subtotal
for row_number in range(rat.row_number - 1, -1, -1):
row_rat = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_rat.note.endswith("-"):
# There was no start of section
return rat.note
if row_rat.note.endswith(("+", "=")):
# If we are playing this section, also
# calculate end time if all tracks are played.
end_time_str = ""
if (
track_sequence.current
and track_sequence.current.end_time
and (
row_number
< track_sequence.current.row_number
< rat.row_number
)
):
section_end_time = (
track_sequence.current.end_time
+ dt.timedelta(milliseconds=duration)
)
end_time_str = (
", section end time "
+ section_end_time.strftime(Config.TRACK_TIME_FORMAT)
)
stripped_note = rat.note[:-1].strip()
if stripped_note:
return (
f"{stripped_note} ["
f"{unplayed_count}/{count} track{'s' if count > 1 else ''} "
f"({ms_to_mmss(duration)}) unplayed{end_time_str}]"
)
else:
return (
f"[{unplayed_count}/{count} track{'s' if count > 1 else ''} "
f"({ms_to_mmss(duration)}) unplayed{end_time_str}]"
)
else:
continue
else:
count += 1
if not row_rat.played:
unplayed_count += 1
duration += row_rat.duration
# Should never get here
return f"Error calculating subtotal ({row_rat.note})"
def selection_is_sortable(self, row_numbers: list[int]) -> bool:
"""
Return True if the selection is sortable. That means:
- at least two rows selected
- selected rows are contiguous
- selected rows do not include any header rows
"""
# at least two rows selected
if len(row_numbers) < 2:
return False
# selected rows are contiguous
if sorted(row_numbers) != list(range(min(row_numbers), max(row_numbers) + 1)):
return False
# selected rows do not include any header rows
for row_number in row_numbers:
if self.is_header_row(row_number):
return False
return True
def set_next_row(self, row_number: Optional[int]) -> bool:
"""
Set row_number as next track. If row_number is None, clear next track.
Return True if successful else False.
"""
log.debug(f"set_next_row({row_number=})")
if row_number is None:
# Clear next track
if track_sequence.next is not None:
track_sequence.next = None
else:
return True
else:
# Get playlistrow_id of row
try:
rat = self.playlist_rows[row_number]
except IndexError:
log.error(
f"playlistmodel.set_track_sequence.next({row_number=}, "
f"{self.playlist_id=}"
"IndexError"
)
return False
if rat.track_id is None or rat.row_number is None:
log.error(
f"playlistmodel.set_track_sequence.next({row_number=}, "
"No track / row number "
f"{self.playlist_id=}, {rat.track_id=}, {rat.row_number=}"
)
return False
old_next_row: Optional[int] = None
if track_sequence.next:
old_next_row = track_sequence.next.row_number
track_sequence.next = rat
track_sequence.next.create_fade_graph()
self.invalidate_row(row_number)
if Config.WIKIPEDIA_ON_NEXT:
self.signals.search_wikipedia_signal.emit(
self.playlist_rows[row_number].title
)
if Config.SONGFACTS_ON_NEXT:
self.signals.search_songfacts_signal.emit(
self.playlist_rows[row_number].title
)
if old_next_row:
self.invalidate_row(old_next_row)
self.invalidate_row(row_number)
self.signals.next_track_changed_signal.emit()
self.update_track_times()
return True
def setData(
self,
index: QModelIndex,
value: str | float,
role: int = Qt.ItemDataRole.EditRole,
) -> bool:
"""
Update model with edited data
"""
if index.isValid() and role == Qt.ItemDataRole.EditRole:
row_number = index.row()
column = index.column()
with db.Session() as session:
plr = session.get(PlaylistRows, self.playlist_rows[row_number].playlistrow_id)
if not plr:
print(
f"Error saving data: {row_number=}, {column=}, "
f"{value=}, {self.playlist_id=}"
)
return False
if plr.track_id:
if column in [Col.TITLE.value, Col.ARTIST.value, Col.INTRO.value]:
track = session.get(Tracks, plr.track_id)
if not track:
print(f"Error retreiving track: {plr=}")
return False
if column == Col.TITLE.value:
track.title = str(value)
elif column == Col.ARTIST.value:
track.artist = str(value)
elif column == Col.INTRO.value:
track.intro = int(round(float(value), 1) * 1000)
else:
print(f"Error updating track: {column=}, {value=}")
return False
elif column == Col.NOTE.value:
plr.note = str(value)
else:
# This is a header row
if column == HEADER_NOTES_COLUMN:
plr.note = str(value)
# commit changes before refreshing data
session.commit()
self.refresh_row(session, row_number)
self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role])
return True
return False
def sort_by_artist(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by artist
"""
self.sort_by_attribute(row_numbers, "artist")
def sort_by_attribute(self, row_numbers: list[int], attr_name: str) -> None:
"""
Sort selected rows by passed attribute name where 'attribute' is a
key in PlaylistRowData
"""
# Create a subset of playlist_rows with the rows we are
# interested in
shortlist_rows = {k: self.playlist_rows[k] for k in row_numbers}
sorted_list = [
plr.row_number
for plr in sorted(shortlist_rows.values(), key=attrgetter(attr_name))
]
self.move_rows(sorted_list, min(sorted_list))
def sort_by_duration(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by duration
"""
self.sort_by_attribute(row_numbers, "duration")
def sort_by_lastplayed(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by lastplayed
"""
self.sort_by_attribute(row_numbers, "lastplayed")
def sort_randomly(self, row_numbers: list[int]) -> None:
"""
Sort selected rows randomly
"""
shuffle(row_numbers)
self.move_rows(row_numbers, min(row_numbers))
def sort_by_title(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by title
"""
self.sort_by_attribute(row_numbers, "title")
def start_of_timed_section_header(self, rat: RowAndTrack) -> str:
"""
Process this row as the start of a timed section and
return display text for this row
"""
count: int = 0
unplayed_count: int = 0
duration: int = 0
for row_number in range(rat.row_number + 1, len(self.playlist_rows)):
row_rat = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_rat.note.endswith("-"):
return (
f"{rat.note[:-1].strip()} "
f"[{count} tracks, {ms_to_mmss(duration)} unplayed]"
)
else:
continue
else:
count += 1
if not row_rat.played:
unplayed_count += 1
duration += row_rat.duration
return (
f"{rat.note[:-1].strip()} "
f"[{count} tracks, {ms_to_mmss(duration, none='none')} "
"unplayed (to end of playlist)]"
)
def supportedDropActions(self) -> Qt.DropAction:
return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction
def tooltip_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant:
"""
Return tooltip. Currently only used for last_played column.
"""
if column != Col.LAST_PLAYED.value:
return QVariant()
with db.Session() as session:
track_id = self.playlist_rows[row].track_id
if not track_id:
return QVariant()
playdates = Playdates.last_playdates(session, track_id)
return QVariant(
"<br>".join(
[
a.lastplayed.strftime(Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT)
for a in reversed(playdates)
]
)
)
def update_track_times(self) -> None:
"""
Update track start/end times in self.playlist_rows
"""
log.debug("update_track_times()")
next_start_time: Optional[dt.datetime] = None
update_rows: list[int] = []
row_count = len(self.playlist_rows)
current_track_row = None
next_track_row = None
if (
track_sequence.current
and track_sequence.current.playlist_id == self.playlist_id
):
current_track_row = track_sequence.current.row_number
# Update current track details now so that they are available
# when we deal with next track row which may be above current
# track row.
self.playlist_rows[current_track_row].set_forecast_start_time(
update_rows, track_sequence.current.start_time
)
if track_sequence.next and track_sequence.next.playlist_id == self.playlist_id:
next_track_row = track_sequence.next.row_number
for row_number in range(row_count):
rat = self.playlist_rows[row_number]
# Don't update times for tracks that have been played, for
# unreadable tracks or for the current track, handled above.
if (
rat.played
or row_number == current_track_row
or (rat.path and file_is_unreadable(rat.path))
):
continue
# Reset start time if timing in header
if self.is_header_row(row_number):
header_time = get_embedded_time(rat.note)
if header_time:
next_start_time = header_time
continue
# Set start time for next row if we have a current track
if (
row_number == next_track_row
and track_sequence.current
and track_sequence.current.end_time
):
next_start_time = rat.set_forecast_start_time(
update_rows, track_sequence.current.end_time
)
continue
# If we're between the current and next row, zero out
# times
if (current_track_row or row_count) < row_number < (next_track_row or 0):
rat.set_forecast_start_time(update_rows, None)
continue
# Set start/end
next_start_time = rat.set_forecast_start_time(update_rows, next_start_time)
# Update start/stop times of rows that have changed
for updated_row in update_rows:
self.dataChanged.emit(
self.index(updated_row, Col.START_TIME.value),
self.index(updated_row, Col.END_TIME.value),
)
class PlaylistProxyModel(QSortFilterProxyModel):
"""
For searching and filtering
"""
def __init__(
self,
source_model: PlaylistModel,
*args: QObject,
**kwargs: QObject,
) -> None:
self.source_model = source_model
super().__init__(*args, **kwargs)
self.setSourceModel(source_model)
# Search all columns
self.setFilterKeyColumn(-1)
def __repr__(self) -> str:
return f"<PlaylistProxyModel: source_model={self.source_model}>"
def filterAcceptsRow(self, source_row: int, source_parent: QModelIndex) -> bool:
"""
Subclass to filter by played status. Return True to show this row, False to hide it.
"""
if self.source_model.played_tracks_hidden:
if self.source_model.is_played_row(source_row):
# Don't hide current track
if (
track_sequence.current
and track_sequence.current.playlist_id
== self.source_model.playlist_id
and track_sequence.current.row_number == source_row
):
return True
# Don't hide next track
if (
track_sequence.next
and track_sequence.next.playlist_id == self.source_model.playlist_id
and track_sequence.next.row_number == source_row
):
return True
# Handle previous track
if track_sequence.previous:
if (
track_sequence.previous.playlist_id
!= self.source_model.playlist_id
or track_sequence.previous.row_number != source_row
):
# This row isn't our previous track: hide it
return False
if track_sequence.current and track_sequence.current.start_time:
# This row is our previous track. Don't hide it
# until HIDE_AFTER_PLAYING_OFFSET milliseconds
# after current track has started
if track_sequence.current.start_time and dt.datetime.now() > (
track_sequence.current.start_time
+ dt.timedelta(
milliseconds=Config.HIDE_AFTER_PLAYING_OFFSET
)
):
return False
else:
# Invalidate this row in
# HIDE_AFTER_PLAYING_OFFSET and a bit
# milliseconds so that it hides then. We add
# 100mS on so that the if clause above is
# true next time through.
QTimer.singleShot(
Config.HIDE_AFTER_PLAYING_OFFSET + 100,
lambda: self.source_model.invalidate_row(source_row),
)
return True
# Next track not playing yet so don't hide previous
else:
return True
# No previous track so hide this played track immediately
return False
return super().filterAcceptsRow(source_row, source_parent)
def set_incremental_search(self, search_string: str) -> None:
"""
Update search pattern
"""
self.setFilterRegularExpression(
QRegularExpression(
search_string, QRegularExpression.PatternOption.CaseInsensitiveOption
)
)
# ######################################
# Forward functions not handled in proxy
# ######################################
def current_track_started(self):
return self.source_model.current_track_started()
def delete_rows(self, row_numbers: list[int]) -> None:
return self.source_model.delete_rows(row_numbers)
def get_duplicate_rows(self) -> list[int]:
return self.source_model.get_duplicate_rows()
def get_rows_duration(self, row_numbers: list[int]) -> int:
return self.source_model.get_rows_duration(row_numbers)
def get_row_info(self, row_number: int) -> RowAndTrack:
return self.source_model.get_row_info(row_number)
def get_row_track_path(self, row_number: int) -> str:
return self.source_model.get_row_track_path(row_number)
def get_unplayed_rows(self) -> list[int]:
return self.source_model.get_unplayed_rows()
def hide_played_tracks(self, hide: bool) -> None:
return self.source_model.hide_played_tracks(hide)
def insert_row(
self,
proposed_row_number: Optional[int],
track_id: Optional[int] = None,
note: str = "",
) -> None:
return self.source_model.insert_row(proposed_row_number, track_id, note)
def is_header_row(self, row_number: int) -> bool:
return self.source_model.is_header_row(row_number)
def is_played_row(self, row_number: int) -> bool:
return self.source_model.is_played_row(row_number)
def is_track_in_playlist(self, track_id: int) -> Optional[RowAndTrack]:
return self.source_model.is_track_in_playlist(track_id)
def mark_unplayed(self, row_numbers: list[int]) -> None:
return self.source_model.mark_unplayed(row_numbers)
def move_rows(self, from_rows: list[int], to_row_number: int) -> None:
return self.source_model.move_rows(from_rows, to_row_number)
def move_rows_between_playlists(
self, from_rows: list[int], to_row_number: int, to_playlist_id: int
) -> None:
return self.source_model.move_rows_between_playlists(
from_rows, to_row_number, to_playlist_id
)
def move_track_add_note(
self, new_row_number: int, existing_rat: RowAndTrack, note: str
) -> None:
return self.source_model.move_track_add_note(new_row_number, existing_rat, note)
def move_track_to_header(
self,
header_row_number: int,
existing_rat: RowAndTrack,
note: Optional[str],
) -> None:
return self.source_model.move_track_to_header(
header_row_number, existing_rat, note
)
def previous_track_ended(self) -> None:
return self.source_model.previous_track_ended()
def remove_track(self, row_number: int) -> None:
return self.source_model.remove_track(row_number)
def rescan_track(self, row_number: int) -> None:
return self.source_model.rescan_track(row_number)
def set_next_row(self, row_number: Optional[int]) -> bool:
return self.source_model.set_next_row(row_number)
def sort_by_artist(self, row_numbers: list[int]) -> None:
return self.source_model.sort_by_artist(row_numbers)
def sort_by_duration(self, row_numbers: list[int]) -> None:
return self.source_model.sort_by_duration(row_numbers)
def sort_by_lastplayed(self, row_numbers: list[int]) -> None:
return self.source_model.sort_by_lastplayed(row_numbers)
def sort_randomly(self, row_numbers: list[int]) -> None:
return self.source_model.sort_randomly(row_numbers)
def sort_by_title(self, row_numbers: list[int]) -> None:
return self.source_model.sort_by_title(row_numbers)
def update_track_times(self) -> None:
return self.source_model.update_track_times()