musicmuster/app/playlistmodel.py
Keith Edmunds 86c3c3fd80 Black
2025-04-22 21:56:44 +01:00

1745 lines
58 KiB
Python

# Standard library imports
from __future__ import annotations
from operator import attrgetter
from random import shuffle
from typing import cast
import datetime as dt
import re
# PyQt imports
from PyQt6.QtCore import (
QAbstractTableModel,
QModelIndex,
QRegularExpression,
QSortFilterProxyModel,
Qt,
QTimer,
QVariant,
)
from PyQt6.QtGui import (
QBrush,
QColor,
QFont,
)
# Third party imports
# import line_profiler
import obswebsocket # type: ignore
# import snoop # type: ignore
# App imports
from classes import (
ApplicationError,
Col,
InsertRows,
InsertTrack,
MusicMusterSignals,
SelectedRows,
TrackAndPlaylist,
)
from config import Config
from helpers import (
ask_yes_no,
file_is_unreadable,
get_all_track_metadata,
get_embedded_time,
get_relative_date,
ms_to_mmss,
)
from log import log, log_call
from playlistrow import PlaylistRow, TrackSequence
import ds
HEADER_NOTES_COLUMN = 1
scene_change_re = re.compile(r"SetScene=\[([^[\]]*)\]")
class PlaylistModel(QAbstractTableModel):
"""
The Playlist Model
Cache the database info in self.playlist_rows, a dictionary of
PlaylistRow objects indexed by row_number.
Update strategy: update the database and then refresh the
row-indexed cached copy (self.playlist_rows). Do not edit
self.playlist_rows directly because keeping it and the
database in sync is uncessarily challenging.
refresh_row() will populate one row of playlist_rows from the
database
refresh_data() will repopulate all of playlist_rows from the
database.
"""
def __init__(
self,
playlist_id: int,
is_template: bool,
) -> None:
super().__init__()
self.playlist_id = playlist_id
self.is_template = is_template
self.track_sequence = TrackSequence()
self.playlist_rows: dict[int, PlaylistRow] = {}
self.selected_rows: list[PlaylistRow] = []
self.signals = MusicMusterSignals()
self.played_tracks_hidden = False
# Connect signals
self.signals.signal_add_track_to_header.connect(
self.signal_add_track_to_header_handler
)
self.signals.signal_begin_insert_rows.connect(self.begin_insert_rows_handler)
self.signals.signal_end_insert_rows.connect(self.end_insert_rows_handler)
self.signals.signal_insert_track.connect(self.insert_row_signal_handler)
self.signals.signal_playlist_selected_rows.connect(
self.playlist_selected_rows_handler
)
self.signals.signal_set_next_row.connect(self.set_next_row_handler)
self.signals.signal_track_started.connect(self.track_started_handler)
self.signals.signal_track_ended.connect(self.signal_track_ended_handler)
self.signals.signal_next_track_changed.connect(self.next_track_changed_handler)
# Populate self.playlist_rows
for dto in ds.playlistrows_by_playlist(self.playlist_id):
self.playlist_rows[dto.row_number] = PlaylistRow(dto)
self.update_track_times()
def __repr__(self) -> str:
return (
f"<PlaylistModel: playlist_id={self.playlist_id}, "
f"is_template={self.is_template}, "
f"{self.rowCount()} rows>"
)
def active_section_header(self) -> int:
"""
Return the row number of the first header that has any of the following below it:
- unplayed tracks
- the currently being played track
- the track marked as next to play
"""
header_row = 0
for row_number in range(len(self.playlist_rows)):
if self.is_header_row(row_number):
header_row = row_number
continue
if not self.is_played_row(row_number):
break
# Here means that row_number points to a played track. The
# current track will be marked as played when we start
# playing it. It's also possible that the track marked as
# next has already been played. Check for either of those.
for ts in [self.track_sequence.next, self.track_sequence.current]:
if (
ts
and ts.row_number == row_number
and ts.playlist_id == self.playlist_id
):
# We've found the current or next track, so return
# the last-found header row
return header_row
return header_row
# @log_call
def signal_add_track_to_header_handler(
self, track_and_playlist: TrackAndPlaylist
) -> None:
"""
Handle signal_add_track_to_header
"""
if track_and_playlist.playlist_id != self.playlist_id:
return
if not self.selected_rows:
self.signals.show_warning_signal.emit(
"Add track to header", "Add track to header but no row selected"
)
if len(self.selected_rows) > 1:
self.signals.show_warning_signal.emit(
"Add track to header", "Select one header to add track to"
)
return
selected_row = self.selected_rows[0]
if selected_row.path:
self.signals.show_warning_signal.emit(
"Add track to header", "Select a header to add track to"
)
return
selected_row.track_id = track_and_playlist.track_id
# Update local copy
self.refresh_row(selected_row.row_number)
# Repaint row
roles_to_invalidate = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.FontRole,
Qt.ItemDataRole.ForegroundRole,
]
# only invalidate required roles
self.invalidate_row(selected_row.row_number, roles_to_invalidate)
self.signals.resize_rows_signal.emit(self.playlist_id)
def _background_role(self, row: int, column: int, plr: PlaylistRow) -> QBrush:
"""Return background setting"""
# Handle entire row colouring
# Header row
if self.is_header_row(row):
# Check for specific header colouring
if plr.row_bg is None:
plr.row_bg = ds.notecolours_get_colour(plr.note)
if plr.row_bg:
return QBrush(QColor(plr.row_bg))
else:
return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST))
# Unreadable track file
if file_is_unreadable(plr.path):
return QBrush(QColor(Config.COLOUR_UNREADABLE))
# Current track
if (
self.track_sequence.current
and self.track_sequence.current.playlist_id == self.playlist_id
and self.track_sequence.current.row_number == row
):
return QBrush(QColor(Config.COLOUR_CURRENT_PLAYLIST))
# Next track
if (
self.track_sequence.next
and self.track_sequence.next.playlist_id == self.playlist_id
and self.track_sequence.next.row_number == row
):
return QBrush(QColor(Config.COLOUR_NEXT_PLAYLIST))
# Individual cell colouring
if column == Col.START_GAP.value:
if plr.start_gap and plr.start_gap >= Config.START_GAP_WARNING_THRESHOLD:
return QBrush(QColor(Config.COLOUR_LONG_START))
if column == Col.BITRATE.value:
if not plr.bitrate or plr.bitrate < Config.BITRATE_LOW_THRESHOLD:
return QBrush(QColor(Config.COLOUR_BITRATE_LOW))
elif plr.bitrate < Config.BITRATE_OK_THRESHOLD:
return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM))
else:
return QBrush(QColor(Config.COLOUR_BITRATE_OK))
if column == Col.NOTE.value:
if plr.note:
if plr.note_bg is None:
plr.row_bg = ds.notecolours_get_colour(plr.note)
if plr.note_bg:
return QBrush(QColor(plr.note_bg))
return QBrush()
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
"""Standard function for view"""
return len(Col)
# @log_call
def track_started_handler(self) -> None:
"""
Handle signal_track_started signal.
Actions required:
- sanity check
- change OBS scene if needed
- update Playdates in database
- update PlaylistRows in database
- update display
- find next track
- update track times
"""
if self.track_sequence.current is None:
raise ApplicationError("track_started called with no current track")
if self.track_sequence.current.playlist_id != self.playlist_id:
# Not for us
return
track_id = self.track_sequence.current.track_id
if not track_id:
raise ApplicationError("track_started() called with no track_id")
row_number = self.track_sequence.current.row_number
# Check for OBS scene change
self.obs_scene_change(row_number)
# Update Playdates in database
ds.playdates_update(track_id)
# Mark track as played in playlist
self.playlist_rows[row_number].played = True
# Update colour and times for current row
roles = [Qt.ItemDataRole.DisplayRole]
self.invalidate_row(row_number, roles)
# Update previous row in case we're hiding played rows
if self.track_sequence.previous and self.track_sequence.previous.row_number:
# only invalidate required roles
self.invalidate_row(self.track_sequence.previous.row_number, roles)
# Update all other track times
self.update_track_times()
# Find next track
next_row = self.find_next_row_to_play(row_number)
if next_row:
self.signals.signal_set_next_track.emit(self.playlist_rows[next_row])
def find_next_row_to_play(self, from_row_number: int) -> int | None:
"""
Find the next row to play in this playlist. Return row number or
None if there's no next track.
"""
next_row = None
unplayed_rows = [
a
for a in self.get_unplayed_rows()
if not self.is_header_row(a)
and not file_is_unreadable(self.playlist_rows[a].path)
]
if unplayed_rows:
try:
next_row = min([a for a in unplayed_rows if a > from_row_number])
except ValueError:
next_row = min(unplayed_rows)
return next_row
def data(
self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole
) -> QVariant | QFont | QBrush | str | int:
"""Return data to view"""
if (
not index.isValid()
or not (0 <= index.row() < len(self.playlist_rows))
or role
in [
Qt.ItemDataRole.DecorationRole,
Qt.ItemDataRole.StatusTipRole,
Qt.ItemDataRole.WhatsThisRole,
Qt.ItemDataRole.SizeHintRole,
Qt.ItemDataRole.TextAlignmentRole,
Qt.ItemDataRole.CheckStateRole,
Qt.ItemDataRole.InitialSortOrderRole,
]
):
return QVariant()
row = index.row()
column = index.column()
# plr for playlist row data as it's used a lot
plr = self.playlist_rows[row]
# These are ordered in approximately the frequency with which
# they are called
if role == Qt.ItemDataRole.BackgroundRole:
return self._background_role(row, column, plr)
elif role == Qt.ItemDataRole.DisplayRole:
return self._display_role(row, column, plr)
elif role == Qt.ItemDataRole.EditRole:
return self._edit_role(row, column, plr)
elif role == Qt.ItemDataRole.FontRole:
return self._font_role(row, column, plr)
elif role == Qt.ItemDataRole.ForegroundRole:
return self._foreground_role(row, column, plr)
elif role == Qt.ItemDataRole.ToolTipRole:
return self._tooltip_role(row, column, plr)
return QVariant()
# @log_call
def delete_rows(self, row_numbers: list[int]) -> None:
"""
Delete passed rows from model
Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows
calls. To keep it simple, if inefficient, delete rows one by one.
Delete from highest row back so that not yet deleted row numbers don't change.
"""
for row_group in self._reversed_contiguous_row_groups(row_numbers):
# Signal that rows will be removed
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
# Remove rows from data store
ds.playlist_remove_rows(self.playlist_id, row_group)
# Signal that data store has been updated
super().endRemoveRows()
self.refresh_data()
self.track_sequence.update()
self.update_track_times()
def _display_role(self, row: int, column: int, plr: PlaylistRow) -> str:
"""
Return text for display
"""
header_row = self.is_header_row(row)
# Set / reset column span
if column == HEADER_NOTES_COLUMN:
column_span = 1
if header_row:
column_span = self.columnCount() - 1
self.signals.span_cells_signal.emit(
self.playlist_id, row, HEADER_NOTES_COLUMN, 1, column_span
)
if header_row:
if column == HEADER_NOTES_COLUMN:
header_text = self.header_text(plr)
if not header_text:
return Config.SECTION_HEADER
else:
formatted_header = self.header_text(plr)
trimmed_header = self.remove_section_timer_markers(formatted_header)
return trimmed_header
else:
return ""
if column == Col.START_TIME.value:
start_time = plr.forecast_start_time
if start_time:
return start_time.strftime(Config.TRACK_TIME_FORMAT)
return ""
if column == Col.END_TIME.value:
end_time = plr.forecast_end_time
if end_time:
return end_time.strftime(Config.TRACK_TIME_FORMAT)
return ""
if column == Col.INTRO.value:
if plr.intro:
return f"{plr.intro / 1000:{Config.INTRO_SECONDS_FORMAT}}"
else:
return ""
dispatch_table: dict[int, str] = {
Col.ARTIST.value: plr.artist,
Col.BITRATE.value: str(plr.bitrate),
Col.DURATION.value: ms_to_mmss(plr.duration),
Col.LAST_PLAYED.value: get_relative_date(plr.lastplayed),
Col.NOTE.value: plr.note,
Col.START_GAP.value: str(plr.start_gap),
Col.TITLE.value: plr.title,
}
if column in dispatch_table:
return dispatch_table[column]
return ""
def _edit_role(self, row: int, column: int, plr: PlaylistRow) -> str | int:
"""
Return value for editing
"""
# If this is a header row and we're being asked for the
# HEADER_NOTES_COLUMN, return the note value
if self.is_header_row(row) and column == HEADER_NOTES_COLUMN:
return plr.note
if column == Col.INTRO.value:
return plr.intro or 0
if column == Col.TITLE.value:
return plr.title
if column == Col.ARTIST.value:
return plr.artist
if column == Col.NOTE.value:
return plr.note
return ""
def _foreground_role(self, row: int, column: int, plr: PlaylistRow) -> QBrush:
"""Return header foreground colour or QBrush() if none"""
plr.row_fg = ds.notecolours_get_colour(plr.note, foreground=True)
if plr.row_fg:
return QBrush(QColor(plr.row_fg))
return QBrush()
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
"""
Standard model flags
"""
if not index.isValid():
return Qt.ItemFlag.ItemIsDropEnabled
default = (
Qt.ItemFlag.ItemIsEnabled
| Qt.ItemFlag.ItemIsSelectable
| Qt.ItemFlag.ItemIsDragEnabled
)
if index.column() in [
Col.TITLE.value,
Col.ARTIST.value,
Col.NOTE.value,
Col.INTRO.value,
]:
return default | Qt.ItemFlag.ItemIsEditable
return default
def _font_role(self, row: int, column: int, plr: PlaylistRow) -> QFont:
"""
Return font
"""
# Notes column is never bold
if column == Col.NOTE.value:
return QFont()
boldfont = QFont()
boldfont.setBold(not self.playlist_rows[row].played)
return boldfont
# @log_call
def get_duplicate_rows(self) -> list[int]:
"""
Return a list of duplicate rows. If track appears in rows 2, 3 and 4, return [3, 4]
(ie, ignore the first, not-yet-duplicate, track).
"""
found = []
result = []
for i in range(len(self.playlist_rows)):
track_id = self.playlist_rows[i].track_id
if track_id is None:
continue
if track_id in found:
result.append(i)
else:
found.append(track_id)
return result
# @log_call
def _get_new_row_number(self) -> int:
"""
Get row number for new row.
If any rows are selected, return the first such row number, else
return row number to add to end of model.
"""
if not self.selected_rows:
return len(self.playlist_rows)
return self.selected_rows[0].row_number
def get_row_info(self, row_number: int) -> PlaylistRow:
"""
Return info about passed row
"""
return self.playlist_rows[row_number]
# @log_call
def get_row_track_id(self, row_number: int) -> int | None:
"""
Return id of track associated with row or None if no track associated
"""
return self.playlist_rows[row_number].track_id
def get_row_track_path(self, row_number: int) -> str:
"""
Return path of track associated with row or empty string if no track associated
"""
return self.playlist_rows[row_number].path
def get_rows_duration(self, row_numbers: list[int]) -> int:
"""
Return the total duration of the passed rows
"""
duration = 0
for row_number in row_numbers:
duration += self.playlist_rows[row_number].duration
return duration
def get_unplayed_rows(self) -> list[int]:
"""
Return a list of unplayed row numbers
"""
result = [
a.row_number
for a in self.playlist_rows.values()
if not a.played and a.track_id is not None
]
return result
def headerData(
self,
section: int,
orientation: Qt.Orientation,
role: int = Qt.ItemDataRole.DisplayRole,
) -> str | int | QFont | QVariant:
"""
Return text for headers
"""
display_dispatch_table = {
Col.START_GAP.value: Config.HEADER_START_GAP,
Col.INTRO.value: Config.HEADER_INTRO,
Col.TITLE.value: Config.HEADER_TITLE,
Col.ARTIST.value: Config.HEADER_ARTIST,
Col.DURATION.value: Config.HEADER_DURATION,
Col.START_TIME.value: Config.HEADER_START_TIME,
Col.END_TIME.value: Config.HEADER_END_TIME,
Col.LAST_PLAYED.value: Config.HEADER_LAST_PLAYED,
Col.BITRATE.value: Config.HEADER_BITRATE,
Col.NOTE.value: Config.HEADER_NOTE,
}
if role == Qt.ItemDataRole.DisplayRole:
if orientation == Qt.Orientation.Horizontal:
return display_dispatch_table[section]
else:
if Config.ROWS_FROM_ZERO:
return section
else:
return section + 1
elif role == Qt.ItemDataRole.FontRole:
boldfont = QFont()
boldfont.setBold(True)
return boldfont
return QVariant()
def header_text(self, plr: PlaylistRow) -> str:
"""
Process possible section timing directives embeded in header
"""
if plr.note.endswith(Config.SECTION_STARTS):
return self.start_of_timed_section_header(plr)
elif plr.note.endswith("="):
return self.section_subtotal_header(plr)
elif plr.note == "-":
# If the hyphen is the only thing on the line, echo the note
# that started the section without the trailing "+".
for row_number in range(plr.row_number - 1, -1, -1):
row_rat = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_rat.note.endswith("-"):
# We didn't find a matching section start
break
if row_rat.note.endswith("+"):
return f"[End: {row_rat.note[:-1]}]"
return "-"
return plr.note
def hide_played_tracks(self, hide: bool) -> None:
"""
Set played tracks hidden according to 'hide'
"""
self.played_tracks_hidden = hide
for row_number in range(len(self.playlist_rows)):
if self.is_played_row(row_number):
# only invalidate required roles
roles = [
Qt.ItemDataRole.DisplayRole,
]
self.invalidate_row(row_number, roles)
# @log_call
def insert_row_signal_handler(self, row_data: InsertTrack) -> None:
"""
Handle the signal_insert_track signal
"""
if row_data.playlist_id != self.playlist_id:
return
new_row_number = self._get_new_row_number()
# Check whether track is already in playlist
move_existing = False
if row_data.track_id:
existing_plr = self.is_track_in_playlist(row_data.track_id)
if existing_plr is not None:
if ask_yes_no(
"Duplicate row",
"Track already in playlist. " "Move to new location?",
default_yes=True,
):
move_existing = True
if move_existing and existing_plr:
self.move_track_add_note(new_row_number, existing_plr, note="")
else:
super().beginInsertRows(QModelIndex(), new_row_number, new_row_number)
_ = ds.playlist_insert_row(
playlist_id=self.playlist_id,
row_number=new_row_number,
track_id=row_data.track_id,
note=row_data.note,
)
super().endInsertRows()
# Need to refresh self.playlist_rows because row numbers will have
# changed
self.refresh_data()
self.signals.resize_rows_signal.emit(self.playlist_id)
self.track_sequence.update()
self.update_track_times()
roles_to_invalidate = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.FontRole,
Qt.ItemDataRole.ForegroundRole,
]
self.invalidate_rows(
list(range(new_row_number, len(self.playlist_rows))), roles_to_invalidate
)
# @log_call
def invalidate_row(self, modified_row: int, roles: list[Qt.ItemDataRole]) -> None:
"""
Signal to view to refresh invalidated row
"""
self.dataChanged.emit(
self.index(modified_row, 0),
self.index(modified_row, self.columnCount() - 1),
roles,
)
def invalidate_rows(
self, modified_rows: list[int], roles: list[Qt.ItemDataRole]
) -> None:
"""
Signal to view to refresh invlidated rows
"""
for modified_row in modified_rows:
# only invalidate required roles
self.invalidate_row(modified_row, roles)
def is_header_row(self, row_number: int) -> bool:
"""
Return True if row is a header row, else False
"""
if row_number in self.playlist_rows:
return self.playlist_rows[row_number].path == ""
return False
# @log_call
def is_played_row(self, row_number: int) -> bool:
"""
Return True if row is an unplayed track row, else False
"""
return self.playlist_rows[row_number].played
def is_track_in_playlist(self, track_id: int) -> PlaylistRow | None:
"""
If this track_id is in the playlist, return the RowAndTrack object
else return None
"""
for row_number in range(len(self.playlist_rows)):
if self.playlist_rows[row_number].track_id == track_id:
return self.playlist_rows[row_number]
return None
def mark_unplayed(self, row_numbers: list[int]) -> None:
"""
Mark row as unplayed
"""
for row_number in row_numbers:
self.playlist_rows[row_number].played = False
self.refresh_row(row_number)
self.update_track_times()
# only invalidate required roles
roles = [
Qt.ItemDataRole.FontRole,
]
self.invalidate_rows(row_numbers, roles)
# @log_call
def move_rows(self, from_rows: list[int], to_row_number: int) -> bool:
"""
Move the playlist rows in from_rows to to_row. Return True if successful
else False.
"""
log.debug(f"move_rows({from_rows=}, {to_row_number=})")
if not from_rows:
log.debug("move_rows called with no from_rows")
return False
# Don't move current row
if self.track_sequence.current:
current_row = self.track_sequence.current.row_number
if current_row in from_rows:
log.debug("move_rows: Removing {current_row=} from {from_rows=}")
from_rows.remove(self.track_sequence.current.row_number)
from_rows = sorted(set(from_rows))
if (
min(from_rows) < 0
or max(from_rows) >= self.rowCount()
or to_row_number < 0
or to_row_number > self.rowCount()
):
log.debug("move_rows: invalid indexes")
return False
if to_row_number in from_rows:
return False # Destination within rows to be moved
# Notify model going to change
self.beginResetModel()
# Update database
ds.playlist_move_rows(from_rows, self.playlist_id, to_row_number)
# Notify model changed
self.endResetModel()
# Update display
self.refresh_data()
self.track_sequence.update()
self.update_track_times()
# TODO: do we need this?
# # only invalidate required roles
# roles = [
# Qt.ItemDataRole.DisplayRole,
# ]
# self.invalidate_rows(list(row_map.keys()), roles)
return True
# @log_call
def move_rows_between_playlists(
self,
from_rows: list[int],
to_row_number: int,
to_playlist_id: int,
) -> None:
"""
Move the playlist rows given to to_row and below of to_playlist.
"""
# Don't move current row
if self.track_sequence.current:
current_row = self.track_sequence.current.row_number
if current_row in from_rows:
log.debug(
"move_rows_between_playlists: Removing {current_row=} from {from_rows=}"
)
from_rows.remove(self.track_sequence.current.row_number)
# Row removal must be wrapped in beginRemoveRows .. endRemoveRows
# and the row range must be contiguous. Process the highest rows
# first so the lower row numbers are unchanged
row_groups = self._reversed_contiguous_row_groups(from_rows)
# Handle the moves in row_group chunks
for row_group in row_groups:
# Prepare source model
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
# Prepare destination model
insert_rows = InsertRows(
to_playlist_id, to_row_number, to_row_number + len(row_group)
)
self.signals.signal_begin_insert_rows.emit(insert_rows)
ds.playlist_move_rows(
from_rows=row_group,
from_playlist_id=self.playlist_id,
to_row=to_row_number,
to_playlist_id=to_playlist_id,
)
self.signals.signal_end_insert_rows.emit(to_playlist_id)
super().endRemoveRows()
self.refresh_data()
self.track_sequence.update()
self.update_track_times()
def begin_insert_rows_handler(self, insert_rows: InsertRows) -> None:
"""
Prepare model to insert rows
"""
if insert_rows.playlist_id != self.playlist_id:
return
super().beginInsertRows(QModelIndex(), insert_rows.from_row, insert_rows.to_row)
def end_insert_rows_handler(self, playlist_id: int) -> None:
"""
End insert rows
"""
if playlist_id != self.playlist_id:
return
super().endInsertRows()
self.refresh_data()
# @log_call
def move_track_add_note(
self, new_row_number: int, existing_plr: PlaylistRow, note: str
) -> None:
"""
Move existing_rat track to new_row_number and append note to any existing note
"""
if note:
playlist_row = self.playlist_rows[existing_plr.row_number]
if playlist_row.note:
playlist_row.note += "\n" + note
else:
playlist_row.note = note
self.refresh_row(existing_plr.row_number)
self.move_rows([existing_plr.row_number], new_row_number)
self.signals.resize_rows_signal.emit(self.playlist_id)
# @log_call
def obs_scene_change(self, row_number: int) -> None:
"""
Check this row and any preceding headers for OBS scene change command
and execute any found
"""
# Check any headers before this row
idx = row_number - 1
while self.is_header_row(idx):
idx -= 1
# Step through headers in row order and finish with this row
for chkrow in range(idx + 1, row_number + 1):
match_obj = scene_change_re.search(self.playlist_rows[chkrow].note)
if match_obj:
scene_name = match_obj.group(1)
if scene_name:
ws = obswebsocket.obsws(
host=Config.OBS_HOST,
port=Config.OBS_PORT,
password=Config.OBS_PASSWORD,
)
try:
ws.connect()
ws.call(
obswebsocket.requests.SetCurrentProgramScene(
sceneName=scene_name
)
)
log.debug(f"{self}: OBS scene changed to '{scene_name}'")
continue
except obswebsocket.exceptions.ConnectionFailure:
log.warning(f"{self}: OBS connection refused")
return
# @log_call
def signal_track_ended_handler(self, playlist_id: int) -> None:
"""
Notification from signal_track_ended that the previous track has ended.
Actions required:
- sanity check
- update display
"""
if playlist_id != self.playlist_id:
# Not for us
return
# Sanity check
if not self.track_sequence.previous:
log.error(
f"{self}: playlistmodel:signal_track_ended_handler called with no current track"
)
return
if self.track_sequence.previous.row_number is None:
log.error(
f"{self}: signal_track_ended_handler called with no row number "
f"({self.track_sequence.previous=})"
)
return
# Update display
# only invalidate required roles
roles = [
Qt.ItemDataRole.BackgroundRole,
]
self.invalidate_row(self.track_sequence.previous.row_number, roles)
def refresh_data(self) -> None:
"""
Populate self.playlist_rows with playlist data
"""
# Note where each playlist_id is by mapping each playlistrow_id
# to its current row_number
plrid_to_row: dict[int, int] = {}
for oldrow in self.playlist_rows:
plrdata = self.playlist_rows[oldrow]
plrid_to_row[plrdata.playlistrow_id] = plrdata.row_number
# build a new playlist_rows
new_playlist_rows: dict[int, PlaylistRow] = {}
for dto in ds.playlistrows_by_playlist(self.playlist_id):
if dto.playlistrow_id not in plrid_to_row:
new_playlist_rows[dto.row_number] = PlaylistRow(dto)
else:
new_playlist_row = self.playlist_rows[plrid_to_row[dto.playlistrow_id]]
new_playlist_row.row_number = dto.row_number
new_playlist_rows[dto.row_number] = new_playlist_row
# Copy to self.playlist_rows
self.playlist_rows = new_playlist_rows
def refresh_row(self, row_number: int) -> None:
"""Populate dict for one row from database"""
plrid = self.playlist_rows[row_number].playlistrow_id
refreshed_row = ds.playlistrow_by_id(plrid)
if not refreshed_row:
raise ApplicationError(
f"Failed to retrieve row {self.playlist_id=}, {row_number=}"
)
self.playlist_rows[row_number] = PlaylistRow(refreshed_row)
# @log_call
def remove_track(self, row_number: int) -> None:
"""
Remove track from row, retaining row as a header row
"""
self.playlist_rows[row_number].track_id = 0
# only invalidate required roles
roles = [
Qt.ItemDataRole.DisplayRole,
]
self.invalidate_row(row_number, roles)
def rescan_track(self, row_number: int) -> None:
"""
Rescan track at passed row number
"""
track = self.playlist_rows[row_number]
metadata = get_all_track_metadata(track.path)
_ = ds.track_update(track.track_id, metadata)
roles = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
]
# only invalidate required roles
self.invalidate_row(row_number, roles)
self.signals.resize_rows_signal.emit(self.playlist_id)
# @log_call
def reset_track_sequence_row_numbers(self) -> None:
"""
Signal handler for when row ordering has changed.
Example: row 4 is marked as next. Row 2 is deleted. The PlaylistRows table will
be correctly updated with change of row number, but track_sequence.next will still
contain row_number==4. This function fixes up the track_sequence row numbers by
looking up the playlistrow_id and retrieving the row number from the database.
"""
self.track_sequence.update()
self.update_track_times()
def remove_comments(self, row_numbers: list[int]) -> None:
"""
Remove comments from passed rows
"""
if not row_numbers:
return
# Safety check
if not ask_yes_no(
title="Remove comments",
question=f"Remove comments from {len(row_numbers)} rows?",
):
return
ds.playlist_remove_comments(self.playlist_id, row_numbers)
# only invalidate required roles
roles = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.ForegroundRole,
]
self.invalidate_rows(row_numbers, roles)
# @log_call
def _reversed_contiguous_row_groups(
self, row_numbers: list[int]
) -> list[list[int]]:
"""
Take the list of row numbers and split into groups of contiguous rows. Return as a list
of lists with the highest row numbers first.
Example:
input [2, 3, 4, 5, 7, 9, 10, 13, 17, 20, 21]
return: [[20, 21], [17], [13], [9, 10], [7], [2, 3, 4, 5]]
"""
result: list[list[int]] = []
temp: list[int] = []
last_value = row_numbers[0] - 1
row_numbers.sort()
for idx in range(len(row_numbers)):
if row_numbers[idx] != last_value + 1:
result.append(temp)
temp = []
last_value = row_numbers[idx]
temp.append(last_value)
if temp:
result.append(temp)
result.reverse()
return result
def remove_section_timer_markers(self, header_text: str) -> str:
"""
Remove characters used to mark section timings from
passed header text.
Remove text using to signal header colours if colour entry
is so marked.
Return header text witout markers
"""
if header_text == "=":
return ""
while header_text.endswith(Config.SECTION_STARTS):
header_text = header_text[0:-1]
while header_text.endswith(Config.SECTION_ENDINGS):
header_text = header_text[0:-1]
# Parse passed header text and remove the first colour match string
return ds.notecolours_remove_colour_substring(header_text)
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
"""Standard function for view"""
return len(self.playlist_rows)
def section_subtotal_header(self, plr: PlaylistRow) -> str:
"""
Process this row as subtotal within a timed section and
return display text for this row
"""
count: int = 0
unplayed_count: int = 0
duration: int = 0
if plr.row_number == 0:
# Meaningless to have a subtotal on row 0
return Config.SUBTOTAL_ON_ROW_ZERO
# Show subtotal
for row_number in range(plr.row_number - 1, -1, -1):
row_rat = self.playlist_rows[row_number]
if self.is_header_row(row_number) or row_number == 0:
if row_rat.note.endswith(Config.SECTION_STARTS) or row_number == 0:
# If we are playing this section, also
# calculate end time when all tracks are played.
end_time_str = ""
if (
self.track_sequence.current
and self.track_sequence.current.end_time
and (
row_number
< self.track_sequence.current.row_number
< plr.row_number
)
):
section_end_time = (
self.track_sequence.current.end_time
+ dt.timedelta(milliseconds=duration)
)
end_time_str = (
", section end time "
+ section_end_time.strftime(Config.TRACK_TIME_FORMAT)
)
clean_header = self.remove_section_timer_markers(plr.note)
if clean_header:
return (
f"{clean_header} ["
f"{unplayed_count}/{count} track{'s' if count > 1 else ''} "
f"({ms_to_mmss(duration)}) unplayed{end_time_str}]"
)
else:
return (
f"[{unplayed_count}/{count} track{'s' if count > 1 else ''} "
f"({ms_to_mmss(duration)}) unplayed{end_time_str}]"
)
else:
continue
else:
count += 1
if not row_rat.played:
unplayed_count += 1
duration += row_rat.duration
# We should never get here
raise ApplicationError("Error in section_subtotal_header()")
def selection_is_sortable(self, row_numbers: list[int]) -> bool:
"""
Return True if the selection is sortable. That means:
- at least two rows selected
- selected rows are contiguous
- selected rows do not include any header rows
"""
# at least two rows selected
if len(row_numbers) < 2:
return False
# selected rows are contiguous
if sorted(row_numbers) != list(range(min(row_numbers), max(row_numbers) + 1)):
return False
# selected rows do not include any header rows
for row_number in row_numbers:
if self.is_header_row(row_number):
return False
return True
# @log_call
def playlist_selected_rows_handler(self, selected_rows: SelectedRows) -> None:
"""
Handle signal_playlist_selected_rows to keep track of which rows
are selected in the view
"""
if selected_rows.playlist_id != self.playlist_id:
return
self.selected_rows = [self.playlist_rows[a] for a in selected_rows.rows]
# @log_call
def set_next_row_handler(self, playlist_id: int) -> None:
"""
Handle signal_set_next_row
"""
if playlist_id != self.playlist_id:
return
if len(self.selected_rows) == 0:
# No row selected so clear next track
if self.track_sequence.next is not None:
self.track_sequence.set_next(None)
return
if len(self.selected_rows) > 1:
self.signals.show_warning_signal.emit(
"Too many rows selected", "Select one row for next row"
)
return
plr = self.selected_rows[0]
if plr.track_id is None:
raise ApplicationError(f"set_next_row: no track_id ({plr=})")
old_next_row: int | None = None
if self.track_sequence.next:
old_next_row = self.track_sequence.next.row_number
roles = [
Qt.ItemDataRole.BackgroundRole,
]
if old_next_row is not None:
# only invalidate required roles
self.invalidate_row(old_next_row, roles)
# only invalidate required roles
self.invalidate_row(plr.row_number, roles)
self.signals.signal_set_next_track.emit(plr)
def next_track_changed_handler(self) -> None:
"""
Handle next track changed
"""
self.update_track_times()
# Refresh display to show new next track
if self.track_sequence.next:
next_row_number = self.track_sequence.next.row_number
if next_row_number is not None:
self.invalidate_row(next_row_number, [Qt.ItemDataRole.BackgroundRole])
# @log_call
def setData(
self,
index: QModelIndex,
value: str | float,
role: int = Qt.ItemDataRole.EditRole,
) -> bool:
"""
Update model with edited data. Here we simply update the
playlist_row in self.playlist_rows. The act of doing that will
trigger a database update in the @setter property in the
PlaylistRow class.
"""
if not index.isValid() or role != Qt.ItemDataRole.EditRole:
return False
row_number = index.row()
column = index.column()
plr = self.playlist_rows[row_number]
if column == Col.NOTE.value:
plr.note = str(value)
elif column == Col.TITLE.value:
plr.title = str(value)
elif column == Col.ARTIST.value:
plr.artist = str(value)
elif column == Col.INTRO.value:
intro = int(round(float(value), 1) * 1000)
plr.intro = intro
else:
raise ApplicationError(f"setData called with unexpected column ({column=})")
self.refresh_row(row_number)
self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role])
return True
def sort_by_artist(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by artist
"""
self.sort_by_attribute(row_numbers, "artist")
def sort_by_attribute(self, row_numbers: list[int], attr_name: str) -> None:
"""
Sort selected rows by passed attribute name where 'attribute' is a
key in PlaylistRowData
"""
# Create a subset of playlist_rows with the rows we are
# interested in
shortlist_rows = {k: self.playlist_rows[k] for k in row_numbers}
sorted_list = [
playlist_row.row_number
for playlist_row in sorted(
shortlist_rows.values(), key=attrgetter(attr_name)
)
]
self.move_rows(sorted_list, min(sorted_list))
def sort_by_duration(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by duration
"""
self.sort_by_attribute(row_numbers, "duration")
def sort_by_lastplayed(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by lastplayed
"""
self.sort_by_attribute(row_numbers, "lastplayed")
def sort_randomly(self, row_numbers: list[int]) -> None:
"""
Sort selected rows randomly
"""
shuffle(row_numbers)
self.move_rows(row_numbers, min(row_numbers))
def sort_by_title(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by title
"""
self.sort_by_attribute(row_numbers, "title")
def start_of_timed_section_header(self, plr: PlaylistRow) -> str:
"""
Process this row as the start of a timed section and
return display text for this row
"""
count: int = 0
unplayed_count: int = 0
duration: int = 0
clean_header = self.remove_section_timer_markers(plr.note)
for row_number in range(plr.row_number + 1, len(self.playlist_rows)):
row_rat = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_rat.note.endswith(Config.SECTION_ENDINGS):
return (
f"{clean_header} "
f"[{count} tracks, {ms_to_mmss(duration)} unplayed]"
)
else:
continue
else:
count += 1
if not row_rat.played:
unplayed_count += 1
duration += row_rat.duration
return (
f"{clean_header} "
f"[{count} tracks, {ms_to_mmss(duration, none='none')} "
"unplayed (to end of playlist)]"
)
def supportedDropActions(self) -> Qt.DropAction:
return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction
def _tooltip_role(self, row: int, column: int, plr: PlaylistRow) -> str:
"""
Return tooltip. Currently only used for last_played column.
"""
if column != Col.LAST_PLAYED.value:
return ""
track_id = self.playlist_rows[row].track_id
if not track_id:
return ""
return ds.playdates_get_last(track_id)
# @log_call
def update_or_insert(self, track_id: int, row_number: int) -> None:
"""
If the passed track_id exists in this playlist, update the
row(s), otherwise insert this track at row_number.
"""
track_rows = [
a.row_number for a in self.playlist_rows.values() if a.track_id == track_id
]
if track_rows:
for row in track_rows:
self.refresh_row(row)
# only invalidate required roles
roles = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.FontRole,
Qt.ItemDataRole.ForegroundRole,
]
self.invalidate_rows(track_rows, roles)
else:
self.insert_row_signal_handler(
InsertTrack(playlist_id=self.playlist_id, track_id=track_id, note="")
)
def get_end_time(self, row_number: int, start_time: dt.datetime) -> dt.datetime:
"""
Return the end time for row_number from the passed start_time and
the row duration.
"""
plr = self.playlist_rows[row_number]
end_time = start_time + dt.timedelta(milliseconds=plr.duration)
return end_time
def update_start_end_times(
self, plr: PlaylistRow, start_time: dt.datetime, end_time: dt.datetime
) -> bool:
"""
Set the the start time of the passed PlaylistRow. If we changed
it, return True else return False.
"""
changed = False
if start_time != plr.forecast_start_time:
plr.forecast_start_time = start_time
changed = True
if end_time != plr.forecast_end_time:
plr.forecast_end_time = end_time
changed = True
return changed
# @log_call
def update_track_times(self) -> None:
"""
Update track start/end times in self.playlist_rows
"""
next_start_time: dt.datetime | None = None
update_rows: list[int] = []
current_track_row_number: int | None = None
next_track_row: int | None = None
row_count = len(self.playlist_rows)
# If we have a current track, get its end time
if (
self.track_sequence.current
and self.track_sequence.current.playlist_id == self.playlist_id
):
plr = self.track_sequence.current
current_track_row_number = plr.row_number
current_track_start_time = self.track_sequence.current.start_time
if current_track_start_time is None:
raise ApplicationError(
f"Can't get start time for current track ({self.track_sequence.current=})"
)
current_track_end_time = self.get_end_time(
current_track_row_number, current_track_start_time
)
if self.update_start_end_times(
plr, current_track_start_time, current_track_end_time
):
update_rows.append(current_track_row_number)
# If we have a next track, note row number
if (
self.track_sequence.next
and self.track_sequence.next.playlist_id == self.playlist_id
):
next_track_row = self.track_sequence.next.row_number
# Step through rows and update start/end times
for row_number in range(row_count):
plr = self.playlist_rows[row_number]
# Don't update times for tracks that have been played unless
# this is the next track, for unreadable tracks or for the
# current track, handled above.
if (
(plr.played and row_number != next_track_row)
or row_number == current_track_row_number
or (plr.path and file_is_unreadable(plr.path))
):
continue
# Reset start time if timing in header; otherwise skip header
if self.is_header_row(row_number):
header_time = get_embedded_time(plr.note)
if header_time:
next_start_time = header_time
continue
# Set start time for next row if we have a current track
if current_track_row_number is not None and row_number == next_track_row:
next_start_time = self.get_end_time(row_number, current_track_end_time)
if self.update_start_end_times(
plr, current_track_end_time, next_start_time
):
update_rows.append(row_number)
# If we're between the current and next row, zero out
# times
if (
(current_track_row_number or row_count)
< row_number
< (next_track_row or 0)
):
plr.forecast_start_time = plr.forecast_end_time = None
update_rows.append(row_number)
continue
# If we don't have a start time, keep looking
if next_start_time is None:
continue
# Set start/end
plr = self.playlist_rows[row_number]
start_time = next_start_time
next_start_time = self.get_end_time(row_number, next_start_time)
if self.update_start_end_times(plr, start_time, next_start_time):
update_rows.append(row_number)
continue
# Update start/stop times of rows that have changed
for updated_row in update_rows:
self.dataChanged.emit(
self.index(updated_row, Col.START_TIME.value),
self.index(updated_row, Col.END_TIME.value),
)
class PlaylistProxyModel(QSortFilterProxyModel):
"""
For searching and filtering
"""
def __init__(
self,
) -> None:
super().__init__()
# Search all columns
self.setFilterKeyColumn(-1)
self.track_sequence = TrackSequence()
def __repr__(self) -> str:
return f"<PlaylistProxyModel: sourceModel={self.sourceModel()}>"
def filterAcceptsRow(self, source_row: int, source_parent: QModelIndex) -> bool:
"""
Subclass to filter by played status. Return True to show this row, False to hide it.
"""
if Config.HIDE_PLAYED_MODE != Config.HIDE_PLAYED_MODE_TRACKS:
return super().filterAcceptsRow(source_row, source_parent)
if self.sourceModel().played_tracks_hidden:
if self.sourceModel().is_played_row(source_row):
# Don't hide current track
if (
self.track_sequence.current
and self.track_sequence.current.playlist_id
== self.sourceModel().playlist_id
and self.track_sequence.current.row_number == source_row
):
return True
# Don't hide next track
if (
self.track_sequence.next
and self.track_sequence.next.playlist_id
== self.sourceModel().playlist_id
and self.track_sequence.next.row_number == source_row
):
return True
# Handle previous track
if self.track_sequence.previous:
if (
self.track_sequence.previous.playlist_id
!= self.sourceModel().playlist_id
or self.track_sequence.previous.row_number != source_row
):
# This row isn't our previous track: hide it
return False
if (
self.track_sequence.current
and self.track_sequence.current.start_time
):
# This row is our previous track. Don't hide it
# until HIDE_AFTER_PLAYING_OFFSET milliseconds
# after current track has started
if (
self.track_sequence.current.start_time
and dt.datetime.now()
> (
self.track_sequence.current.start_time
+ dt.timedelta(
milliseconds=Config.HIDE_AFTER_PLAYING_OFFSET
)
)
):
return False
else:
# Invalidate this row in
# HIDE_AFTER_PLAYING_OFFSET and a bit
# milliseconds so that it hides then. We add
# 100mS on so that the if clause above is
# true next time through.
# only invalidate required roles
roles = [
Qt.ItemDataRole.DisplayRole,
]
QTimer.singleShot(
Config.HIDE_AFTER_PLAYING_OFFSET + 100,
lambda: self.sourceModel().invalidate_row(
source_row, roles
),
)
return True
# Next track not playing yet so don't hide previous
else:
return True
# No previous track so hide this played track immediately
return False
return super().filterAcceptsRow(source_row, source_parent)
def set_incremental_search(self, search_string: str) -> None:
"""
Update search pattern
"""
self.setFilterRegularExpression(
QRegularExpression(
search_string, QRegularExpression.PatternOption.CaseInsensitiveOption
)
)
def sourceModel(self) -> PlaylistModel:
"""
Override sourceModel to return correct type
"""
return cast(PlaylistModel, super().sourceModel())