1694 lines
56 KiB
Python
1694 lines
56 KiB
Python
# Standard library imports
|
|
from __future__ import annotations
|
|
|
|
from operator import attrgetter
|
|
from random import shuffle
|
|
from typing import cast, Optional
|
|
import datetime as dt
|
|
import re
|
|
|
|
# PyQt imports
|
|
from PyQt6.QtCore import (
|
|
QAbstractTableModel,
|
|
QModelIndex,
|
|
QRegularExpression,
|
|
QSortFilterProxyModel,
|
|
Qt,
|
|
QTimer,
|
|
QVariant,
|
|
)
|
|
from PyQt6.QtGui import (
|
|
QBrush,
|
|
QColor,
|
|
QFont,
|
|
)
|
|
|
|
# Third party imports
|
|
# import line_profiler
|
|
import obswebsocket # type: ignore
|
|
|
|
# import snoop # type: ignore
|
|
|
|
# App imports
|
|
from classes import (
|
|
ApplicationError,
|
|
Col,
|
|
InsertRows,
|
|
InsertTrack,
|
|
MusicMusterSignals,
|
|
SelectedRows,
|
|
TrackAndPlaylist,
|
|
)
|
|
from config import Config
|
|
from helpers import (
|
|
ask_yes_no,
|
|
file_is_unreadable,
|
|
get_all_track_metadata,
|
|
get_embedded_time,
|
|
get_relative_date,
|
|
ms_to_mmss,
|
|
)
|
|
from log import log, log_call
|
|
from playlistrow import PlaylistRow, TrackSequence
|
|
import ds
|
|
|
|
|
|
# Column in which header-row text is displayed; for header rows this
# cell is spanned across the remaining columns (see _display_role)
HEADER_NOTES_COLUMN = 1
|
|
# Matches an OBS scene-change directive of the form "SetScene=[name]"
# inside a row note; group 1 captures the scene name (see obs_scene_change)
scene_change_re = re.compile(r"SetScene=\[([^[\]]*)\]")
|
|
|
|
|
|
class PlaylistModel(QAbstractTableModel):
|
|
"""
|
|
The Playlist Model
|
|
|
|
Cache the database info in self.playlist_rows, a dictionary of
|
|
PlaylistRow objects indexed by row_number.
|
|
|
|
Update strategy: update the database and then refresh the
|
|
row-indexed cached copy (self.playlist_rows). Do not edit
|
|
self.playlist_rows directly because keeping it and the
|
|
database in sync is unnecessarily challenging.
|
|
|
|
refresh_row() will populate one row of playlist_rows from the
|
|
database
|
|
|
|
refresh_data() will repopulate all of playlist_rows from the
|
|
database.
|
|
"""
|
|
|
|
def __init__(
    self,
    playlist_id: int,
    is_template: bool,
) -> None:
    """
    Build the model for one playlist.

    Records the playlist identity, creates the per-model state,
    connects the application signals to this model's handlers and
    loads the row cache (self.playlist_rows) from the data store.
    """

    super().__init__()

    # Identity of the playlist represented by this model
    self.playlist_id = playlist_id
    self.is_template = is_template
    self.track_sequence = TrackSequence()

    # Per-model state
    self.playlist_rows: dict[int, PlaylistRow] = {}
    self.selected_rows: list[PlaylistRow] = []
    self.signals = MusicMusterSignals()
    self.played_tracks_hidden = False

    # Connect signals
    signals = self.signals
    connections = (
        (signals.signal_add_track_to_header, self.add_track_to_header),
        (signals.signal_begin_insert_rows, self.begin_insert_rows),
        (signals.signal_end_insert_rows, self.end_insert_rows),
        (signals.signal_insert_track, self.insert_row_signal_handler),
        (signals.signal_playlist_selected_rows, self.set_selected_rows),
        (signals.signal_set_next_row, self.set_next_row),
        (signals.signal_track_started, self.track_started),
        (signals.track_ended_signal, self.previous_track_ended),
        (
            signals.signal_next_track_changed,
            self.signal_next_track_changed_handler,
        ),
    )
    for signal, handler in connections:
        signal.connect(handler)

    # Populate self.playlist_rows, keyed by row number
    for row_dto in ds.playlistrows_by_playlist(self.playlist_id):
        self.playlist_rows[row_dto.row_number] = PlaylistRow(row_dto)
    self.update_track_times()
|
|
|
|
def __repr__(self) -> str:
|
|
return (
|
|
f"<PlaylistModel: playlist_id={self.playlist_id}, "
|
|
f"is_template={self.is_template}, "
|
|
f"{self.rowCount()} rows>"
|
|
)
|
|
|
|
def active_section_header(self) -> int:
    """
    Return the row number of the header above the first row that is any of:
        - an unplayed track
        - the track currently being played
        - the track marked as next to play

    If no such row is found, the last header seen is returned (0 when
    there is no header at all).
    """

    latest_header = 0
    # The current track is marked as played as soon as it starts, and the
    # track marked as next may already have been played, so played rows
    # must be checked against both.
    sequence_entries = (self.track_sequence.next, self.track_sequence.current)

    for row_number in range(len(self.playlist_rows)):
        if self.is_header_row(row_number):
            latest_header = row_number
        elif not self.is_played_row(row_number):
            # First unplayed track: its section is the active one
            return latest_header
        elif any(
            entry
            and entry.row_number == row_number
            and entry.playlist_id == self.playlist_id
            for entry in sequence_entries
        ):
            # Played row that is the current or next track
            return latest_header

    return latest_header
|
|
|
|
# @log_call
|
|
def add_track_to_header(self, track_and_playlist: TrackAndPlaylist) -> None:
    """
    Handle signal_add_track_to_header

    Attach the signalled track to the single selected header row of
    this playlist. Signals for other playlists are ignored. A warning
    is shown if more than one row is selected or if the selected row
    already has a track path.

    Raises ApplicationError if no row is selected.
    """

    if track_and_playlist.playlist_id != self.playlist_id:
        return

    if not self.selected_rows:
        raise ApplicationError("Add track to header but no row selected")

    warn = self.signals.show_warning_signal.emit

    if len(self.selected_rows) > 1:
        warn("Add track to header", "Select one header to add track to")
        return

    target_row = self.selected_rows[0]
    if target_row.path:
        # A row with a path is a track row, not a header
        warn("Add track to header", "Select header to add track to")
        return

    target_row.track_id = track_and_playlist.track_id
    row_number = target_row.row_number

    # Update local copy
    self.refresh_row(row_number)

    # Repaint row, invalidating only the roles that can have changed
    self.invalidate_row(
        row_number,
        [
            Qt.ItemDataRole.BackgroundRole,
            Qt.ItemDataRole.DisplayRole,
            Qt.ItemDataRole.FontRole,
            Qt.ItemDataRole.ForegroundRole,
        ],
    )

    self.signals.resize_rows_signal.emit(self.playlist_id)
|
|
|
|
def _background_role(self, row: int, column: int, plr: PlaylistRow) -> QBrush:
    """
    Return the background brush for a cell.

    Whole-row colouring takes precedence, in this order: header row,
    unreadable track file, current track, next track. Otherwise the
    START_GAP, BITRATE and NOTE columns may have their own colour.
    An empty QBrush is returned when nothing applies.
    """

    # Handle entire row colouring
    # Header row
    if self.is_header_row(row):
        # Check for specific header colouring; cache the lookup on the row
        if plr.row_bg is None:
            plr.row_bg = ds.notecolours_get_colour(plr.note)
        if plr.row_bg:
            return QBrush(QColor(plr.row_bg))
        return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST))

    # Unreadable track file
    if file_is_unreadable(plr.path):
        return QBrush(QColor(Config.COLOUR_UNREADABLE))

    # Current track
    current = self.track_sequence.current
    if (
        current
        and current.playlist_id == self.playlist_id
        and current.row_number == row
    ):
        return QBrush(QColor(Config.COLOUR_CURRENT_PLAYLIST))

    # Next track
    next_track = self.track_sequence.next
    if (
        next_track
        and next_track.playlist_id == self.playlist_id
        and next_track.row_number == row
    ):
        return QBrush(QColor(Config.COLOUR_NEXT_PLAYLIST))

    # Individual cell colouring
    if column == Col.START_GAP.value:
        if plr.start_gap and plr.start_gap >= Config.START_GAP_WARNING_THRESHOLD:
            return QBrush(QColor(Config.COLOUR_LONG_START))

    if column == Col.BITRATE.value:
        if not plr.bitrate or plr.bitrate < Config.BITRATE_LOW_THRESHOLD:
            return QBrush(QColor(Config.COLOUR_BITRATE_LOW))
        if plr.bitrate < Config.BITRATE_OK_THRESHOLD:
            return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM))
        return QBrush(QColor(Config.COLOUR_BITRATE_OK))

    if column == Col.NOTE.value and plr.note:
        if plr.note_bg is None:
            # Store the lookup in note_bg (not row_bg) so that the note
            # colour is actually used and the row colour is not clobbered
            plr.note_bg = ds.notecolours_get_colour(plr.note)
        if plr.note_bg:
            return QBrush(QColor(plr.note_bg))

    return QBrush()
|
|
|
|
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
    """Standard function for view: one column per member of Col"""

    number_of_columns = len(Col)
    return number_of_columns
|
|
|
|
# @log_call
|
|
def track_started(self, play_track: TrackAndPlaylist) -> None:
    """
    Notification from musicmuster that the current track has just
    started playing. Signals for other playlists are ignored.

    Actions required:
        - sanity check
        - change OBS scene if needed
        - update Playdates in database
        - mark the row as played
        - update display
        - update track times
        - find next track

    Raises ApplicationError if the signal carries no track_id, if there
    is no current track, or if the current row's track_id does not
    match the signalled one.
    """

    if play_track.playlist_id != self.playlist_id:
        # Not for us
        return

    track_id = play_track.track_id
    # Sanity check - 1
    if not track_id:
        raise ApplicationError("track_started() called with no track_id")

    # Sanity check - 2
    if self.track_sequence.current is None:
        raise ApplicationError("track_started called with no current track")

    row_number = self.track_sequence.current.row_number
    playlist_dto = self.playlist_rows[row_number]

    # Sanity check - 3
    if playlist_dto.track_id != track_id:
        raise ApplicationError("track_id mismatch between playlist_rows and signal")

    # Check for OBS scene change
    self.obs_scene_change(row_number)

    # Update Playdates in database
    ds.playdates_update(track_id)

    # Mark track as played in playlist
    playlist_dto.played = True

    # Update colour and times for current row
    roles_to_invalidate = [Qt.ItemDataRole.DisplayRole]
    self.invalidate_row(row_number, roles_to_invalidate)

    # Update previous row in case we're hiding played rows. Compare
    # against None: row 0 is a valid row number but is falsy.
    previous = self.track_sequence.previous
    if previous and previous.row_number is not None:
        self.invalidate_row(previous.row_number, roles_to_invalidate)

    # Update all other track times
    self.update_track_times()

    # Find next track. Row 0 is a valid result, so test for None
    # rather than truthiness.
    next_row = self.find_next_row_to_play(row_number)
    if next_row is not None:
        self.signals.signal_set_next_track.emit(self.playlist_rows[next_row])
|
|
|
|
def find_next_row_to_play(self, from_row_number: int) -> int | None:
    """
    Return the row number of the next track to play, or None if there
    is none.

    Candidates are unplayed, non-header rows whose file is readable.
    The lowest candidate after from_row_number is preferred; if there
    is none, the lowest candidate overall is returned.
    """

    candidates = [
        row_number
        for row_number in self.get_unplayed_rows()
        if not self.is_header_row(row_number)
        and not file_is_unreadable(self.playlist_rows[row_number].path)
    ]
    if not candidates:
        return None

    following = [row_number for row_number in candidates if row_number > from_row_number]
    if following:
        return min(following)

    # Nothing after from_row_number: wrap round to the first candidate
    return min(candidates)
|
|
|
|
def data(
    self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole
) -> QVariant | QFont | QBrush | str | int:
    """
    Return data to view

    An empty QVariant is returned for invalid indexes, rows outside
    the cache, and roles this model does not provide.
    """

    ignored_roles = [
        Qt.ItemDataRole.DecorationRole,
        Qt.ItemDataRole.StatusTipRole,
        Qt.ItemDataRole.WhatsThisRole,
        Qt.ItemDataRole.SizeHintRole,
        Qt.ItemDataRole.TextAlignmentRole,
        Qt.ItemDataRole.CheckStateRole,
        Qt.ItemDataRole.InitialSortOrderRole,
    ]

    if not index.isValid():
        return QVariant()
    if not (0 <= index.row() < len(self.playlist_rows)):
        return QVariant()
    if role in ignored_roles:
        return QVariant()

    row = index.row()
    column = index.column()
    # plr for playlist row data as it's used a lot
    plr = self.playlist_rows[row]

    # These are ordered in approximately the frequency with which
    # they are called
    role_handlers = (
        (Qt.ItemDataRole.BackgroundRole, self._background_role),
        (Qt.ItemDataRole.DisplayRole, self._display_role),
        (Qt.ItemDataRole.EditRole, self._edit_role),
        (Qt.ItemDataRole.FontRole, self._font_role),
        (Qt.ItemDataRole.ForegroundRole, self._foreground_role),
        (Qt.ItemDataRole.ToolTipRole, self._tooltip_role),
    )
    for handled_role, handler in role_handlers:
        if role == handled_role:
            return handler(row, column, plr)

    return QVariant()
|
|
|
|
# @log_call
|
|
def delete_rows(self, row_numbers: list[int]) -> None:
    """
    Delete passed rows from model

    Rows are removed in contiguous groups, each wrapped in
    beginRemoveRows / endRemoveRows. Groups are processed from the
    highest row numbers down so that the row numbers of groups not yet
    deleted don't change.
    """

    groups = self._reversed_contiguous_row_groups(row_numbers)
    for group in groups:
        first_row, last_row = min(group), max(group)
        # Signal that rows will be removed
        super().beginRemoveRows(QModelIndex(), first_row, last_row)
        # Remove rows from data store
        ds.playlist_remove_rows(self.playlist_id, group)
        # Signal that data store has been updated
        super().endRemoveRows()

    # Bring the cache, track sequence and forecast times back in step
    self.refresh_data()
    self.track_sequence.update()
    self.update_track_times()
|
|
|
|
def _display_role(self, row: int, column: int, plr: PlaylistRow) -> str:
    """
    Return text for display

    For the HEADER_NOTES_COLUMN a span_cells_signal is also emitted so
    that the view can set (header row) or reset (track row) the column
    span of that cell.
    """

    header_row = self.is_header_row(row)

    # Set / reset column span
    if column == HEADER_NOTES_COLUMN:
        column_span = self.columnCount() - 1 if header_row else 1
        self.signals.span_cells_signal.emit(
            self.playlist_id, row, HEADER_NOTES_COLUMN, 1, column_span
        )

    if header_row:
        if column != HEADER_NOTES_COLUMN:
            return ""
        # Compute the header text once only: it may have to scan other rows
        header_text = self.header_text(plr)
        if not header_text:
            return Config.SECTION_HEADER
        return self.remove_section_timer_markers(header_text)

    if column == Col.START_TIME.value:
        start_time = plr.forecast_start_time
        if start_time:
            return start_time.strftime(Config.TRACK_TIME_FORMAT)
        return ""

    if column == Col.END_TIME.value:
        end_time = plr.forecast_end_time
        if end_time:
            return end_time.strftime(Config.TRACK_TIME_FORMAT)
        return ""

    if column == Col.INTRO.value:
        if plr.intro:
            return f"{plr.intro / 1000:{Config.INTRO_SECONDS_FORMAT}}"
        return ""

    # Values are wrapped in callables so that only the requested
    # column's text is computed for each cell
    dispatch_table = {
        Col.ARTIST.value: lambda: plr.artist,
        Col.BITRATE.value: lambda: str(plr.bitrate),
        Col.DURATION.value: lambda: ms_to_mmss(plr.duration),
        Col.LAST_PLAYED.value: lambda: get_relative_date(plr.lastplayed),
        Col.NOTE.value: lambda: plr.note,
        Col.START_GAP.value: lambda: str(plr.start_gap),
        Col.TITLE.value: lambda: plr.title,
    }
    formatter = dispatch_table.get(column)
    if formatter is not None:
        return formatter()

    return ""
|
|
|
|
def _edit_role(self, row: int, column: int, plr: PlaylistRow) -> str | int:
    """
    Return value for editing

    Header rows edit their note in HEADER_NOTES_COLUMN. Otherwise the
    INTRO, TITLE, ARTIST and NOTE columns return the matching row
    attribute; any other column returns an empty string.
    """

    if column == HEADER_NOTES_COLUMN and self.is_header_row(row):
        return plr.note

    if column == Col.INTRO.value:
        # No intro is edited as zero
        return plr.intro or 0

    text_attributes = {
        Col.TITLE.value: "title",
        Col.ARTIST.value: "artist",
        Col.NOTE.value: "note",
    }
    attribute_name = text_attributes.get(column)
    if attribute_name is None:
        return ""

    return getattr(plr, attribute_name)
|
|
|
|
def _foreground_role(self, row: int, column: int, plr: PlaylistRow) -> QBrush:
    """
    Return the foreground brush for the row's note colour, or an empty
    QBrush if the lookup gives no colour. The looked-up colour is
    stored on the row as row_fg.
    """

    plr.row_fg = ds.notecolours_get_colour(plr.note, foreground=True)
    if not plr.row_fg:
        return QBrush()

    return QBrush(QColor(plr.row_fg))
|
|
|
|
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
    """
    Standard model flags

    Invalid indexes only accept drops. Valid cells are enabled,
    selectable and draggable; the TITLE, ARTIST, NOTE and INTRO
    columns are also editable.
    """

    if not index.isValid():
        return Qt.ItemFlag.ItemIsDropEnabled

    item_flags = (
        Qt.ItemFlag.ItemIsEnabled
        | Qt.ItemFlag.ItemIsSelectable
        | Qt.ItemFlag.ItemIsDragEnabled
    )

    editable_columns = [
        Col.TITLE.value,
        Col.ARTIST.value,
        Col.NOTE.value,
        Col.INTRO.value,
    ]
    if index.column() not in editable_columns:
        return item_flags

    return item_flags | Qt.ItemFlag.ItemIsEditable
|
|
|
|
def _font_role(self, row: int, column: int, plr: PlaylistRow) -> QFont:
    """
    Return font: bold for rows not yet played, except in the notes
    column which is never bold.
    """

    font = QFont()
    if column != Col.NOTE.value:
        font.setBold(not self.playlist_rows[row].played)

    return font
|
|
|
|
# @log_call
|
|
def get_duplicate_rows(self) -> list[int]:
    """
    Return a list of duplicate rows. If track appears in rows 2, 3 and 4, return [3, 4]
    (ie, ignore the first, not-yet-duplicate, track).

    Rows without a track (track_id is None) are ignored.
    """

    # A set gives constant-time membership tests
    seen_track_ids: set[int] = set()
    duplicates: list[int] = []

    for row_number in range(len(self.playlist_rows)):
        track_id = self.playlist_rows[row_number].track_id
        if track_id is None:
            continue
        if track_id in seen_track_ids:
            duplicates.append(row_number)
        else:
            seen_track_ids.add(track_id)

    return duplicates
|
|
|
|
# @log_call
|
|
def _get_new_row_number(self) -> int:
|
|
"""
|
|
Get row number for new row.
|
|
|
|
If any rows are selected, return the first such row number, else
|
|
return row number to add to end of model.
|
|
"""
|
|
|
|
if not self.selected_rows:
|
|
return len(self.playlist_rows)
|
|
|
|
return self.selected_rows[0].row_number
|
|
|
|
def get_row_info(self, row_number: int) -> PlaylistRow:
    """
    Return the cached PlaylistRow for the passed row number
    """

    row_info = self.playlist_rows[row_number]
    return row_info
|
|
|
|
# @log_call
|
|
def get_row_track_id(self, row_number: int) -> Optional[int]:
    """
    Return the track_id held by the cached row for row_number
    (None if no track is associated)
    """

    row = self.playlist_rows[row_number]
    return row.track_id
|
|
|
|
def get_row_track_path(self, row_number: int) -> str:
    """
    Return the path held by the cached row for row_number
    (empty string if no track is associated)
    """

    row = self.playlist_rows[row_number]
    return row.path
|
|
|
|
def get_rows_duration(self, row_numbers: list[int]) -> int:
    """
    Return the sum of the durations of the passed rows (0 if no rows
    are passed)
    """

    return sum(
        self.playlist_rows[row_number].duration for row_number in row_numbers
    )
|
|
|
|
def get_unplayed_rows(self) -> list[int]:
    """
    Return the row numbers of cached rows that have a track
    (track_id is not None) and have not been played
    """

    unplayed: list[int] = []
    for row in self.playlist_rows.values():
        if row.played or row.track_id is None:
            continue
        unplayed.append(row.row_number)

    return unplayed
|
|
|
|
def headerData(
    self,
    section: int,
    orientation: Qt.Orientation,
    role: int = Qt.ItemDataRole.DisplayRole,
) -> str | int | QFont | QVariant:
    """
    Return text for headers

    Horizontal headers show the configured column title. Vertical
    headers show the row number, counted from zero or one according
    to Config.ROWS_FROM_ZERO. Header font is bold. Other roles get an
    empty QVariant.
    """

    if role == Qt.ItemDataRole.FontRole:
        header_font = QFont()
        header_font.setBold(True)
        return header_font

    if role != Qt.ItemDataRole.DisplayRole:
        return QVariant()

    if orientation != Qt.Orientation.Horizontal:
        # Vertical header: row number
        return section if Config.ROWS_FROM_ZERO else section + 1

    column_titles = {
        Col.START_GAP.value: Config.HEADER_START_GAP,
        Col.INTRO.value: Config.HEADER_INTRO,
        Col.TITLE.value: Config.HEADER_TITLE,
        Col.ARTIST.value: Config.HEADER_ARTIST,
        Col.DURATION.value: Config.HEADER_DURATION,
        Col.START_TIME.value: Config.HEADER_START_TIME,
        Col.END_TIME.value: Config.HEADER_END_TIME,
        Col.LAST_PLAYED.value: Config.HEADER_LAST_PLAYED,
        Col.BITRATE.value: Config.HEADER_BITRATE,
        Col.NOTE.value: Config.HEADER_NOTE,
    }
    return column_titles[section]
|
|
|
|
def header_text(self, plr: PlaylistRow) -> str:
    """
    Process possible section timing directives embedded in header

    - note ending with Config.SECTION_STARTS: start of a timed section
    - note ending with "=": section subtotal
    - note that is exactly "-": end of section; echo the note of the
      header that started the section
    - anything else: the note unchanged
    """

    note = plr.note

    if note.endswith(Config.SECTION_STARTS):
        return self.start_of_timed_section_header(plr)

    if note.endswith("="):
        return self.section_subtotal_header(plr)

    if note != "-":
        return plr.note

    # The hyphen is the only thing on the line: search upwards for the
    # header that started the section and echo its note without the
    # trailing "+".
    for row_number in reversed(range(plr.row_number)):
        candidate = self.playlist_rows[row_number]
        if not self.is_header_row(row_number):
            continue
        if candidate.note.endswith("-"):
            # We didn't find a matching section start
            break
        if candidate.note.endswith("+"):
            return f"[End: {candidate.note[:-1]}]"

    return "-"
|
|
|
|
def hide_played_tracks(self, hide: bool) -> None:
    """
    Record whether played tracks are hidden (according to 'hide') and
    ask the view to refresh every played row
    """

    self.played_tracks_hidden = hide

    # only invalidate required roles
    roles = [Qt.ItemDataRole.DisplayRole]
    for row_number in range(len(self.playlist_rows)):
        if not self.is_played_row(row_number):
            continue
        self.invalidate_row(row_number, roles)
|
|
|
|
# @log_call
|
|
def insert_row_signal_handler(self, row_data: InsertTrack) -> None:
    """
    Handle the signal_insert_track signal

    Signals for other playlists are ignored. If the track is already
    in this playlist the user is asked whether to move it to the new
    location; otherwise (or if the user declines) a new row is
    inserted. Afterwards the cache, track sequence and track times are
    refreshed and the rows from the new row onwards are invalidated.
    """

    if row_data.playlist_id != self.playlist_id:
        return

    new_row_number = self._get_new_row_number()

    # Check whether track is already in playlist
    existing_plr = None
    move_existing = False
    if row_data.track_id:
        existing_plr = self.is_track_in_playlist(row_data.track_id)
        if existing_plr is not None:
            move_existing = bool(
                ask_yes_no(
                    "Duplicate row",
                    "Track already in playlist. " "Move to new location?",
                    default_yes=True,
                )
            )

    if move_existing and existing_plr:
        self.move_track_add_note(new_row_number, existing_plr, note="")
    else:
        super().beginInsertRows(QModelIndex(), new_row_number, new_row_number)
        _ = ds.playlist_insert_row(
            playlist_id=self.playlist_id,
            row_number=new_row_number,
            track_id=row_data.track_id,
            note=row_data.note,
        )
        super().endInsertRows()

    # Need to refresh self.playlist_rows because row numbers will have
    # changed
    self.refresh_data()
    self.signals.resize_rows_signal.emit(self.playlist_id)
    self.track_sequence.update()
    self.update_track_times()

    affected_rows = list(range(new_row_number, len(self.playlist_rows)))
    self.invalidate_rows(
        affected_rows,
        [
            Qt.ItemDataRole.BackgroundRole,
            Qt.ItemDataRole.DisplayRole,
            Qt.ItemDataRole.FontRole,
            Qt.ItemDataRole.ForegroundRole,
        ],
    )
|
|
|
|
# @log_call
|
|
def invalidate_row(self, modified_row: int, roles: list[Qt.ItemDataRole]) -> None:
    """
    Signal to view to refresh invalidated row: emits dataChanged for
    every column of modified_row with the passed roles
    """

    first_cell = self.index(modified_row, 0)
    last_cell = self.index(modified_row, self.columnCount() - 1)
    self.dataChanged.emit(first_cell, last_cell, roles)
|
|
|
|
def invalidate_rows(
    self, modified_rows: list[int], roles: list[Qt.ItemDataRole]
) -> None:
    """
    Signal to view to refresh invalidated rows, one invalidate_row()
    call per row with the passed roles
    """

    row_count = len(modified_rows)
    for position in range(row_count):
        # only invalidate required roles
        self.invalidate_row(modified_rows[position], roles)
|
|
|
|
def is_header_row(self, row_number: int) -> bool:
    """
    Return True if row is a header row (a cached row with an empty
    path), else False. Row numbers not in the cache are not headers.
    """

    row = self.playlist_rows.get(row_number)
    if row is None:
        return False

    return row.path == ""
|
|
|
|
# @log_call
|
|
def is_played_row(self, row_number: int) -> bool:
    """
    Return the 'played' flag of the cached row for row_number
    """

    row = self.playlist_rows[row_number]
    return row.played
|
|
|
|
def is_track_in_playlist(self, track_id: int) -> Optional[PlaylistRow]:
    """
    Return the first (lowest-numbered) cached PlaylistRow holding this
    track_id, or None if the track is not in the playlist
    """

    row_number = 0
    row_count = len(self.playlist_rows)
    while row_number < row_count:
        row = self.playlist_rows[row_number]
        if row.track_id == track_id:
            return self.playlist_rows[row_number]
        row_number += 1

    return None
|
|
|
|
def mark_unplayed(self, row_numbers: list[int]) -> None:
    """
    Mark the passed rows as unplayed, refreshing each from the
    database, then update track times and invalidate the rows' fonts
    """

    for row_number in row_numbers:
        row = self.playlist_rows[row_number]
        row.played = False
        self.refresh_row(row_number)

    self.update_track_times()

    # only invalidate required roles
    self.invalidate_rows(row_numbers, [Qt.ItemDataRole.FontRole])
|
|
|
|
# @log_call
|
|
def move_rows(self, from_rows: list[int], to_row_number: int) -> bool:
    """
    Move the playlist rows in from_rows to to_row_number. Return True
    if successful else False.

    The row of the currently playing track is never moved. The caller's
    list is not modified. False is returned when there is nothing to
    move, when any index is out of range, or when the destination is
    one of the rows being moved.
    """

    log.debug(f"move_rows({from_rows=}, {to_row_number=})")

    if not from_rows:
        log.debug("move_rows called with no from_rows")
        return False

    # Don't move current row. The current track may belong to another
    # playlist, in which case its row number means nothing here.
    current = self.track_sequence.current
    if current and current.playlist_id == self.playlist_id:
        current_row = current.row_number
        if current_row in from_rows:
            log.debug(f"move_rows: Removing {current_row=} from {from_rows=}")
            # Build a new list rather than mutating the caller's
            from_rows = [row for row in from_rows if row != current_row]

    from_rows = sorted(set(from_rows))
    if not from_rows:
        # Only the current row was passed: min()/max() below would raise
        log.debug("move_rows: no rows left to move")
        return False

    if (
        from_rows[0] < 0
        or from_rows[-1] >= self.rowCount()
        or to_row_number < 0
        or to_row_number > self.rowCount()
    ):
        log.debug("move_rows: invalid indexes")
        return False

    if to_row_number in from_rows:
        return False  # Destination within rows to be moved

    # Notify model going to change
    self.beginResetModel()
    # Update database
    ds.playlist_move_rows(from_rows, self.playlist_id, to_row_number)
    # Notify model changed
    self.endResetModel()

    # Update display
    self.refresh_data()
    self.track_sequence.update()
    self.update_track_times()

    return True
|
|
|
|
# @log_call
|
|
def move_rows_between_playlists(
    self,
    from_rows: list[int],
    to_row_number: int,
    to_playlist_id: int,
) -> None:
    """
    Move the playlist rows given to to_row_number and below of
    to_playlist_id.

    The row of the currently playing track is never moved. The caller's
    list is not modified. Nothing happens if no rows remain to move.
    """

    # Don't move current row. The current track may belong to another
    # playlist, in which case its row number means nothing here.
    current = self.track_sequence.current
    if current and current.playlist_id == self.playlist_id:
        current_row = current.row_number
        if current_row in from_rows:
            log.debug(
                f"move_rows_between_playlists: Removing {current_row=} from {from_rows=}"
            )
            from_rows = [row for row in from_rows if row != current_row]

    if not from_rows:
        log.debug("move_rows_between_playlists: no rows to move")
        return

    # Row removal must be wrapped in beginRemoveRows .. endRemoveRows
    # and the row range must be contiguous. Process the highest rows
    # first so the lower row numbers are unchanged
    row_groups = self._reversed_contiguous_row_groups(list(from_rows))

    # Handle the moves in row_group chunks
    for row_group in row_groups:
        # Prepare source model
        super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
        # Prepare destination model. begin_insert_rows() passes both
        # bounds to beginInsertRows(), where the last row is inclusive,
        # so n rows starting at to_row_number end at to_row_number + n - 1.
        insert_rows = InsertRows(
            to_playlist_id, to_row_number, to_row_number + len(row_group) - 1
        )
        self.signals.signal_begin_insert_rows.emit(insert_rows)
        ds.playlist_move_rows(
            from_rows=row_group,
            from_playlist_id=self.playlist_id,
            to_row=to_row_number,
            to_playlist_id=to_playlist_id,
        )
        self.signals.signal_end_insert_rows.emit(to_playlist_id)
        super().endRemoveRows()

    self.refresh_data()
    self.track_sequence.update()
    self.update_track_times()
|
|
|
|
def begin_insert_rows(self, insert_rows: InsertRows) -> None:
    """
    Prepare model to insert rows: calls beginInsertRows for the
    signalled row range if the signal is for this playlist
    """

    if insert_rows.playlist_id == self.playlist_id:
        first_row = insert_rows.from_row
        last_row = insert_rows.to_row
        super().beginInsertRows(QModelIndex(), first_row, last_row)
|
|
|
|
def end_insert_rows(self, playlist_id: int) -> None:
    """
    End insert rows: if the signal is for this playlist, call
    endInsertRows and reload the row cache
    """

    if playlist_id == self.playlist_id:
        super().endInsertRows()
        self.refresh_data()
|
|
|
|
# @log_call
|
|
def move_track_add_note(
    self, new_row_number: int, existing_plr: PlaylistRow, note: str
) -> None:
    """
    Move the existing_plr row to new_row_number. If a note is passed,
    first append it (on a new line) to any existing note on that row.
    """

    source_row_number = existing_plr.row_number

    if note:
        cached_row = self.playlist_rows[source_row_number]
        if cached_row.note:
            cached_row.note = f"{cached_row.note}\n{note}"
        else:
            cached_row.note = note
        self.refresh_row(source_row_number)

    self.move_rows([source_row_number], new_row_number)
    self.signals.resize_rows_signal.emit(self.playlist_id)
|
|
|
|
# @log_call
|
|
def obs_scene_change(self, row_number: int) -> None:
    """
    Check this row and any immediately preceding header rows for an
    OBS scene change command ("SetScene=[name]" in the row note) and
    execute any found.

    Rows are checked in row order, finishing with row_number itself.
    If the connection to OBS fails, a warning is logged and no further
    rows are checked.
    """

    # Walk back over the run of header rows directly above this row
    idx = row_number - 1
    while self.is_header_row(idx):
        idx -= 1

    # Step through headers in row order and finish with this row
    for chkrow in range(idx + 1, row_number + 1):
        match_obj = scene_change_re.search(self.playlist_rows[chkrow].note)
        if not match_obj:
            continue
        scene_name = match_obj.group(1)
        if not scene_name:
            continue

        ws = obswebsocket.obsws(
            host=Config.OBS_HOST,
            port=Config.OBS_PORT,
            password=Config.OBS_PASSWORD,
        )
        try:
            ws.connect()
            try:
                ws.call(
                    obswebsocket.requests.SetCurrentProgramScene(
                        sceneName=scene_name
                    )
                )
            finally:
                # Always close the connection once it has been opened so
                # that each scene change doesn't leave a connection behind
                ws.disconnect()
            log.debug(f"{self}: OBS scene changed to '{scene_name}'")
        except obswebsocket.exceptions.ConnectionFailure:
            log.warning(f"{self}: OBS connection refused")
            return
|
|
|
|
# @log_call
|
|
def previous_track_ended(self, playlist_id: int) -> None:
    """
    Notification from track_ended_signal that the previous track has ended.
    Signals for other playlists are ignored.

    Actions required:
        - sanity check (errors are logged, not raised)
        - update display of the previous track's row
    """

    if playlist_id != self.playlist_id:
        # Not for us
        return

    # Sanity check
    previous = self.track_sequence.previous
    if not previous:
        log.error(
            f"{self}: playlistmodel:previous_track_ended called with no current track"
        )
        return

    previous_row = previous.row_number
    if previous_row is None:
        log.error(
            f"{self}: previous_track_ended called with no row number "
            f"({self.track_sequence.previous=})"
        )
        return

    # Update display
    # only invalidate required roles
    self.invalidate_row(previous_row, [Qt.ItemDataRole.BackgroundRole])
|
|
|
|
def refresh_data(self) -> None:
    """
    Rebuild self.playlist_rows from the data store

    PlaylistRow objects already in the cache are reused (with their
    row_number updated); rows not yet in the cache get a new
    PlaylistRow.
    """

    # Map each cached playlistrow_id to the row number it records
    known_rows: dict[int, int] = {
        cached.playlistrow_id: cached.row_number
        for cached in self.playlist_rows.values()
    }

    # Build a new playlist_rows
    rebuilt: dict[int, PlaylistRow] = {}
    for dto in ds.playlistrows_by_playlist(self.playlist_id):
        if dto.playlistrow_id in known_rows:
            row = self.playlist_rows[known_rows[dto.playlistrow_id]]
            row.row_number = dto.row_number
        else:
            row = PlaylistRow(dto)
        rebuilt[dto.row_number] = row

    # Replace the cache in one step
    self.playlist_rows = rebuilt
|
|
|
|
def refresh_row(self, row_number: int) -> None:
    """
    Replace the cached row for row_number with a new PlaylistRow built
    from the database.

    Raises ApplicationError if the row cannot be retrieved.
    """

    playlistrow_id = self.playlist_rows[row_number].playlistrow_id
    fresh_dto = ds.playlistrow_by_id(playlistrow_id)
    if fresh_dto:
        self.playlist_rows[row_number] = PlaylistRow(fresh_dto)
        return

    raise ApplicationError(
        f"Failed to retrieve row {self.playlist_id=}, {row_number=}"
    )
|
|
|
|
# @log_call
|
|
def remove_track(self, row_number: int) -> None:
    """
    Remove track from row (set its track_id to 0), retaining row as a
    header row, and ask the view to redisplay it
    """

    row = self.playlist_rows[row_number]
    row.track_id = 0

    # only invalidate required roles
    self.invalidate_row(row_number, [Qt.ItemDataRole.DisplayRole])
|
|
|
|
def rescan_track(self, row_number: int) -> None:
    """
    Rescan track at passed row number: re-read the file's metadata,
    store it against the track, then refresh and resize the row
    """

    row = self.playlist_rows[row_number]
    fresh_metadata = get_all_track_metadata(row.path)
    _ = ds.track_update(row.track_id, fresh_metadata)

    # only invalidate required roles
    self.invalidate_row(
        row_number,
        [
            Qt.ItemDataRole.BackgroundRole,
            Qt.ItemDataRole.DisplayRole,
        ],
    )
    self.signals.resize_rows_signal.emit(self.playlist_id)
|
|
|
|
# @log_call
|
|
def reset_track_sequence_row_numbers(self) -> None:
    """
    Signal handler for when row ordering has changed.

    Example: row 4 is marked as next. Row 2 is deleted. The PlaylistRows table will
    be correctly updated with change of row number, but track_sequence.next will still
    contain row_number==4. This handler asks the track sequence to update itself and
    then recalculates the track times.
    """

    sequence = self.track_sequence
    sequence.update()

    self.update_track_times()
|
|
|
|
def remove_comments(self, row_numbers: list[int]) -> None:
    """
    Remove the comments from the passed rows after asking the user
    to confirm. Does nothing if no rows are passed or the user
    declines.
    """

    if not row_numbers:
        return

    # Safety check
    confirmed = ask_yes_no(
        title="Remove comments",
        question=f"Remove comments from {len(row_numbers)} rows?",
    )
    if not confirmed:
        return

    ds.playlist_remove_comments(self.playlist_id, row_numbers)

    # Only invalidate the roles that can change when a comment goes
    self.invalidate_rows(
        row_numbers,
        [
            Qt.ItemDataRole.BackgroundRole,
            Qt.ItemDataRole.DisplayRole,
            Qt.ItemDataRole.ForegroundRole,
        ],
    )
|
|
|
|
# @log_call
|
|
def _reversed_contiguous_row_groups(
|
|
self, row_numbers: list[int]
|
|
) -> list[list[int]]:
|
|
"""
|
|
Take the list of row numbers and split into groups of contiguous rows. Return as a list
|
|
of lists with the highest row numbers first.
|
|
|
|
Example:
|
|
input [2, 3, 4, 5, 7, 9, 10, 13, 17, 20, 21]
|
|
return: [[20, 21], [17], [13], [9, 10], [7], [2, 3, 4, 5]]
|
|
"""
|
|
|
|
result: list[list[int]] = []
|
|
temp: list[int] = []
|
|
last_value = row_numbers[0] - 1
|
|
row_numbers.sort()
|
|
|
|
for idx in range(len(row_numbers)):
|
|
if row_numbers[idx] != last_value + 1:
|
|
result.append(temp)
|
|
temp = []
|
|
last_value = row_numbers[idx]
|
|
temp.append(last_value)
|
|
if temp:
|
|
result.append(temp)
|
|
result.reverse()
|
|
|
|
return result
|
|
|
|
def remove_section_timer_markers(self, header_text: str) -> str:
    """
    Return the passed header text without its trailing section
    timing markers.

    A header consisting only of "=" returns an empty string.
    Otherwise trailing section-start markers are stripped, then
    trailing section-end markers, and finally the first colour
    match string is removed by ds.notecolours_remove_colour_substring().
    """

    if header_text == "=":
        return ""

    # Strip one trailing character at a time: first any start
    # markers, then any end markers
    for markers in (Config.SECTION_STARTS, Config.SECTION_ENDINGS):
        while header_text.endswith(markers):
            header_text = header_text[:-1]

    # Parse passed header text and remove the first colour match string
    return ds.notecolours_remove_colour_substring(header_text)
|
|
|
|
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
    """
    Standard function for view: the number of rows is the number
    of cached playlist rows. The passed index is not used.
    """

    return len(self.playlist_rows)
|
|
|
|
def section_subtotal_header(self, plr: PlaylistRow) -> str:
    """
    Return the display text for a header row that is a subtotal
    within a timed section.

    Rows above plr are scanned upwards, counting tracks and summing
    the duration of unplayed tracks, until a header ending with a
    section-start marker (or row 0) is reached. If the current track
    lies inside the section and has an end time, the forecast section
    end time is added to the text.

    Raises ApplicationError if the scan completes without finding a
    section start, which should not happen.
    """

    if plr.row_number == 0:
        # Meaningless to have a subtotal on row 0
        return Config.SUBTOTAL_ON_ROW_ZERO

    track_total = 0
    unplayed_total = 0
    unplayed_ms = 0

    # Walk upwards from the row immediately above the subtotal row
    for candidate in reversed(range(plr.row_number)):
        row = self.playlist_rows[candidate]

        # NOTE(review): row 0 always ends the scan, so a track on row 0
        # is never counted - confirm this is intended.
        if not (self.is_header_row(candidate) or candidate == 0):
            track_total += 1
            if not row.played:
                unplayed_total += 1
                unplayed_ms += row.duration
            continue

        if not (row.note.endswith(Config.SECTION_STARTS) or candidate == 0):
            # A header, but not the one that starts this section
            continue

        # Found the start of the section. If we are playing this
        # section, also calculate end time when all tracks are played.
        end_time_str = ""
        current = self.track_sequence.current
        if (
            current
            and current.end_time
            and candidate < current.row_number < plr.row_number
        ):
            section_end_time = current.end_time + dt.timedelta(
                milliseconds=unplayed_ms
            )
            end_time_str = ", section end time " + section_end_time.strftime(
                Config.TRACK_TIME_FORMAT
            )

        clean_header = self.remove_section_timer_markers(plr.note)

        # NOTE(review): a count of zero is shown as "0 track" - confirm
        # whether the plural test should be count != 1.
        plural = "s" if track_total > 1 else ""
        summary = (
            f"[{unplayed_total}/{track_total} track{plural} "
            f"({ms_to_mmss(unplayed_ms)}) unplayed{end_time_str}]"
        )
        if clean_header:
            return f"{clean_header} {summary}"
        return summary

    # We should never get here
    raise ApplicationError("Error in section_subtotal_header()")
|
|
|
|
def selection_is_sortable(self, row_numbers: list[int]) -> bool:
    """
    Report whether the selected rows can be sorted. They can when:
        - there are at least two of them
        - they form one contiguous run of rows
        - none of them is a header row
    """

    # at least two rows selected
    if len(row_numbers) < 2:
        return False

    # selected rows are contiguous
    ordered = sorted(row_numbers)
    if ordered != list(range(ordered[0], ordered[-1] + 1)):
        return False

    # selected rows do not include any header rows
    return not any(self.is_header_row(row_number) for row_number in row_numbers)
|
|
|
|
# @log_call
|
|
def set_selected_rows(self, selected_rows: SelectedRows) -> None:
    """
    Handle signal_playlist_selected_rows: remember which rows are
    selected in the view. Selections for other playlists are ignored.
    """

    if selected_rows.playlist_id == self.playlist_id:
        # Store the cached row objects rather than the row numbers
        self.selected_rows = [
            self.playlist_rows[row_number] for row_number in selected_rows.rows
        ]
|
|
|
|
# @log_call
|
|
def set_next_row(self, playlist_id: int) -> None:
    """
    Handle signal_set_next_row for this playlist.

    With no rows selected any existing next track is cleared. With
    more than one row selected a warning is shown. With exactly one
    row selected, the old and new next rows are redrawn and
    signal_set_next_track is emitted with the selected row.
    """

    if playlist_id != self.playlist_id:
        return

    selection_size = len(self.selected_rows)

    if selection_size == 0:
        # No row selected so clear next track
        if self.track_sequence.next is not None:
            self.track_sequence.set_next(None)
        return

    if selection_size > 1:
        self.signals.show_warning_signal.emit(
            "Too many rows selected", "Select one row for next row"
        )
        return

    plr = self.selected_rows[0]
    # NOTE(review): remove_track() stores track_id 0 for header rows,
    # which this None test lets through - confirm that is intended.
    if plr.track_id is None:
        raise ApplicationError(f"set_next_row: no track_id ({plr=})")

    old_next_row: Optional[int] = None
    existing_next = self.track_sequence.next
    if existing_next:
        old_next_row = existing_next.row_number

    # Only the background changes when a row becomes, or stops being,
    # the next track
    background_only = [Qt.ItemDataRole.BackgroundRole]
    if old_next_row is not None:
        self.invalidate_row(old_next_row, background_only)
    self.invalidate_row(plr.row_number, background_only)

    self.signals.signal_set_next_track.emit(plr)
|
|
|
|
def signal_next_track_changed_handler(self) -> None:
    """
    Handle a change of next track: recalculate the track times and
    redraw the row that is now the next track, if there is one.
    """

    self.update_track_times()

    upcoming = self.track_sequence.next
    if not upcoming:
        return

    # Refresh display to show new next track
    next_row_number = upcoming.row_number
    if next_row_number is not None:
        self.invalidate_row(next_row_number, [Qt.ItemDataRole.BackgroundRole])
|
|
|
|
# @log_call
|
|
def setData(
    self,
    index: QModelIndex,
    value: str | float,
    role: int = Qt.ItemDataRole.EditRole,
) -> bool:
    """
    Update model with edited data. Here we simply update the
    playlist_row in self.playlist_rows. The act of doing that will
    trigger a database update in the @setter property in the
    PlaylistRow class.

    Only the note, title, artist and intro columns may be edited. The
    intro is entered in seconds and stored in milliseconds, rounded
    to a tenth of a second.

    Return True if the edit was applied. Return False if the index is
    invalid, the role is not EditRole or the intro value is not a
    usable number. Raise ApplicationError for any other column.
    """

    if not index.isValid() or role != Qt.ItemDataRole.EditRole:
        return False

    row_number = index.row()
    column = index.column()
    plr = self.playlist_rows[row_number]

    if column == Col.NOTE.value:
        plr.note = str(value)

    elif column == Col.TITLE.value:
        plr.title = str(value)

    elif column == Col.ARTIST.value:
        plr.artist = str(value)

    elif column == Col.INTRO.value:
        try:
            # Round to the nearest millisecond rather than truncating:
            # float multiplication can land just below the intended
            # integer (32.3 * 1000 == 32299.999999999996), and int()
            # would then lose a millisecond.
            intro = round(round(float(value), 1) * 1000)
        except (TypeError, ValueError, OverflowError):
            # Not a usable number: reject the edit, as the Qt setData
            # contract expects, rather than raising into the view
            return False
        plr.intro = intro

    else:
        raise ApplicationError(f"setData called with unexpected column ({column=})")

    # Re-read the row from the database so the cache matches it
    self.refresh_row(row_number)
    self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role])

    return True
|
|
|
|
def sort_by_artist(self, row_numbers: list[int]) -> None:
    """
    Reorder the passed rows by their artist attribute
    """

    self.sort_by_attribute(row_numbers, "artist")
|
|
|
|
def sort_by_attribute(self, row_numbers: list[int], attr_name: str) -> None:
    """
    Reorder the passed rows by the value of the named attribute of
    their cached playlist rows. The sorted rows are moved so that
    they start at the lowest of the passed row numbers.
    """

    # Look up each distinct row once, in the order it was passed
    distinct_rows = dict.fromkeys(row_numbers)
    ranked_rows = sorted(
        (self.playlist_rows[row_number] for row_number in distinct_rows),
        key=attrgetter(attr_name),
    )
    new_order = [playlist_row.row_number for playlist_row in ranked_rows]

    self.move_rows(new_order, min(new_order))
|
|
|
|
def sort_by_duration(self, row_numbers: list[int]) -> None:
    """
    Reorder the passed rows by their duration attribute
    """

    self.sort_by_attribute(row_numbers, "duration")
|
|
|
|
def sort_by_lastplayed(self, row_numbers: list[int]) -> None:
    """
    Reorder the passed rows by their lastplayed attribute
    """

    self.sort_by_attribute(row_numbers, "lastplayed")
|
|
|
|
def sort_randomly(self, row_numbers: list[int]) -> None:
    """
    Put the passed rows into a random order, starting at the lowest
    of the passed row numbers.

    The passed list is not modified. Nothing happens if it is empty.
    """

    if not row_numbers:
        return

    # Shuffle a copy so that the caller's list is left as it was
    shuffled_rows = list(row_numbers)
    shuffle(shuffled_rows)
    self.move_rows(shuffled_rows, min(shuffled_rows))
|
|
|
|
def sort_by_title(self, row_numbers: list[int]) -> None:
    """
    Reorder the passed rows by their title attribute
    """

    self.sort_by_attribute(row_numbers, "title")
|
|
|
|
def start_of_timed_section_header(self, plr: PlaylistRow) -> str:
    """
    Return the display text for a header row that starts a timed
    section.

    Rows below plr are scanned, counting tracks and summing the
    duration of unplayed tracks, until a header ending with a
    section-end marker is found. If there is none, the totals run to
    the end of the playlist and the text says so.
    """

    clean_header = self.remove_section_timer_markers(plr.note)

    track_total = 0
    unplayed_ms = 0

    # NOTE(review): the number of unplayed tracks is not shown, only
    # the total track count and the unplayed duration - confirm.
    for following in range(plr.row_number + 1, len(self.playlist_rows)):
        row = self.playlist_rows[following]

        if not self.is_header_row(following):
            track_total += 1
            if not row.played:
                unplayed_ms += row.duration
            continue

        if row.note.endswith(Config.SECTION_ENDINGS):
            # Reached the end of this section
            return (
                f"{clean_header} "
                f"[{track_total} tracks, {ms_to_mmss(unplayed_ms)} unplayed]"
            )

    # No section end found below this header
    return (
        f"{clean_header} "
        f"[{track_total} tracks, {ms_to_mmss(unplayed_ms, none='none')} "
        "unplayed (to end of playlist)]"
    )
|
|
|
|
def supportedDropActions(self) -> Qt.DropAction:
    """
    Standard function for view: rows may be dropped as either a
    move or a copy.
    """

    return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction
|
|
|
|
def _tooltip_role(self, row: int, column: int, plr: PlaylistRow) -> str:
    """
    Return the tooltip for a cell. Only the last_played column of a
    row that has a track has one; every other cell gets an empty
    string. The passed plr is not used.
    """

    if column == Col.LAST_PLAYED.value:
        track_id = self.playlist_rows[row].track_id
        if track_id:
            return ds.playdates_get_last(track_id)

    return ""
|
|
|
|
# @log_call
|
|
def update_or_insert(self, track_id: int, row_number: int) -> None:
    """
    If the passed track_id is already in this playlist, refresh and
    redraw every row that holds it. Otherwise ask
    insert_row_signal_handler() to insert the track.

    NOTE(review): row_number is not used here, so the insert position
    is whatever insert_row_signal_handler() chooses - confirm.
    """

    matching_rows = [
        playlist_row.row_number
        for playlist_row in self.playlist_rows.values()
        if playlist_row.track_id == track_id
    ]

    if not matching_rows:
        self.insert_row_signal_handler(
            InsertTrack(playlist_id=self.playlist_id, track_id=track_id, note="")
        )
        return

    for matching_row in matching_rows:
        self.refresh_row(matching_row)

    # Only invalidate the roles that can change with the track
    self.invalidate_rows(
        matching_rows,
        [
            Qt.ItemDataRole.BackgroundRole,
            Qt.ItemDataRole.DisplayRole,
            Qt.ItemDataRole.FontRole,
            Qt.ItemDataRole.ForegroundRole,
        ],
    )
|
|
|
|
# @log_call
|
|
def update_track_times(self) -> None:
    """
    Update track start/end times in self.playlist_rows

    The current track keeps its actual start time and the next track
    starts when the current track ends. Unplayed rows strictly
    between the current and next tracks have their times cleared.
    Every other unplayed track starts when the one before it ends.
    A header with an embedded time resets that running start time.

    dataChanged is emitted for the start/end time cells of each row
    recorded in update_rows by set_forecast_start_time().
    """

    next_start_time: Optional[dt.datetime] = None
    update_rows: list[int] = []
    row_count = len(self.playlist_rows)

    current_track_row = None
    next_track_row = None

    current = self.track_sequence.current
    if current and current.playlist_id == self.playlist_id:
        current_track_row = current.row_number
        # Update current track details now so that they are available
        # when we deal with next track row which may be above current
        # track row.
        self.playlist_rows[current_track_row].set_forecast_start_time(
            update_rows, current.start_time
        )

    upcoming = self.track_sequence.next
    if upcoming and upcoming.playlist_id == self.playlist_id:
        next_track_row = upcoming.row_number

    for row_number in range(row_count):
        plr = self.playlist_rows[row_number]

        # Don't update times for tracks that have been played, for
        # unreadable tracks or for the current track, handled above.
        if (
            plr.played
            or row_number == current_track_row
            or (plr.path and file_is_unreadable(plr.path))
        ):
            continue

        # Reset start time if timing in header
        if self.is_header_row(row_number):
            header_time = get_embedded_time(plr.note)
            if header_time:
                next_start_time = header_time
            continue

        # Set start time for next row if we have a current track
        if (
            row_number == next_track_row
            and self.track_sequence.current
            and self.track_sequence.current.end_time
        ):
            next_start_time = plr.set_forecast_start_time(
                update_rows, self.track_sequence.current.end_time
            )
            continue

        # If we're between the current and next row, zero out
        # times. Compare against None explicitly: row 0 is a valid
        # position for the current track and must not be treated as
        # "no current track".
        if (
            current_track_row is not None
            and next_track_row is not None
            and current_track_row < row_number < next_track_row
        ):
            plr.set_forecast_start_time(update_rows, None)
            continue

        # Set start/end. Go through set_forecast_start_time() so that
        # the changed row is recorded and the running start time moves
        # on to the value it returns.
        next_start_time = plr.set_forecast_start_time(update_rows, next_start_time)

    # Update start/stop times of rows that have changed
    for updated_row in update_rows:
        self.dataChanged.emit(
            self.index(updated_row, Col.START_TIME.value),
            self.index(updated_row, Col.END_TIME.value),
        )
|
|
|
|
|
|
class PlaylistProxyModel(QSortFilterProxyModel):
    """
    Proxy over a PlaylistModel, used for searching and for hiding
    played rows
    """

    def __init__(
        self,
    ) -> None:
        super().__init__()

        # A filter key column of -1 searches all columns
        self.setFilterKeyColumn(-1)

        self.track_sequence = TrackSequence()

    def __repr__(self) -> str:
        return f"<PlaylistProxyModel: sourceModel={self.sourceModel()}>"

    def filterAcceptsRow(self, source_row: int, source_parent: QModelIndex) -> bool:
        """
        Subclass to filter by played status. Return True to show this row, False to hide it.

        Played-status filtering only applies when HIDE_PLAYED_MODE is
        HIDE_PLAYED_MODE_TRACKS, the source model has played tracks
        hidden and the row has been played. Every other row is left to
        the standard filter. Of the played rows, the current and next
        tracks always show, and the previous track shows until
        HIDE_AFTER_PLAYING_OFFSET milliseconds after the current
        track started.
        """

        model = self.sourceModel()
        subject_to_hiding = (
            Config.HIDE_PLAYED_MODE == Config.HIDE_PLAYED_MODE_TRACKS
            and model.played_tracks_hidden
            and model.is_played_row(source_row)
        )
        if not subject_to_hiding:
            return super().filterAcceptsRow(source_row, source_parent)

        playlist_id = model.playlist_id
        sequence = self.track_sequence

        # Don't hide current track or next track
        for protected in (sequence.current, sequence.next):
            if (
                protected
                and protected.playlist_id == playlist_id
                and protected.row_number == source_row
            ):
                return True

        # Handle previous track
        previous = sequence.previous
        if not previous:
            # No previous track so hide this played track immediately
            return False

        if previous.playlist_id != playlist_id or previous.row_number != source_row:
            # This row isn't our previous track: hide it
            return False

        current = sequence.current
        if not (current and current.start_time):
            # Current track has no start time yet so don't hide previous
            return True

        # This row is our previous track. Don't hide it until
        # HIDE_AFTER_PLAYING_OFFSET milliseconds after current track
        # has started
        hide_after = current.start_time + dt.timedelta(
            milliseconds=Config.HIDE_AFTER_PLAYING_OFFSET
        )
        if dt.datetime.now() > hide_after:
            return False

        # Invalidate this row in HIDE_AFTER_PLAYING_OFFSET and a bit
        # milliseconds so that it hides then. We add 100mS on so that
        # the time test above is true next time through.
        display_only = [Qt.ItemDataRole.DisplayRole]
        QTimer.singleShot(
            Config.HIDE_AFTER_PLAYING_OFFSET + 100,
            lambda: self.sourceModel().invalidate_row(source_row, display_only),
        )
        return True

    def set_incremental_search(self, search_string: str) -> None:
        """
        Use the passed string as a case-insensitive regular expression
        for the filter
        """

        pattern = QRegularExpression(
            search_string, QRegularExpression.PatternOption.CaseInsensitiveOption
        )
        self.setFilterRegularExpression(pattern)

    def sourceModel(self) -> PlaylistModel:
        """
        Override sourceModel so that the result is typed as a
        PlaylistModel
        """

        return cast(PlaylistModel, super().sourceModel())
|