musicmuster/app/playlistmodel.py
2026-01-04 13:54:14 +00:00

1837 lines
64 KiB
Python

# Standard library imports
from __future__ import annotations
from operator import attrgetter
from random import shuffle
from typing import cast, Optional
import datetime as dt
import re
# PyQt imports
from PyQt6.QtCore import (
QAbstractTableModel,
QModelIndex,
QRegularExpression,
QSortFilterProxyModel,
Qt,
QTimer,
QVariant,
)
from PyQt6.QtGui import (
QBrush,
QColor,
QFont,
)
# Third party imports
# import line_profiler
from sqlalchemy.orm.session import Session
import obswebsocket # type: ignore
# import snoop # type: ignore
# App imports
from classes import (
ApplicationError,
Col,
MusicMusterSignals,
)
from config import Config
from helpers import (
ask_yes_no,
file_is_unreadable,
get_embedded_time,
get_relative_date,
ms_to_mmss,
remove_substring_case_insensitive,
set_track_metadata,
)
from log import log, log_call
from models import db, NoteColours, Playdates, PlaylistRows, Tracks
from music_manager import RowAndTrack, track_sequence
HEADER_NOTES_COLUMN = 0
scene_change_re = re.compile(r"SetScene=\[([^[\]]*)\]")
class PlaylistModel(QAbstractTableModel):
"""
The Playlist Model
Update strategy: update the database and then refresh the
row-indexed cached copy (self.playlist_rows). Do not edit
self.playlist_rows directly because keeping it and the
database in sync is uncessarily challenging.
refresh_row() will populate one row of playlist_rows from the
database
refresh_data() will repopulate all of playlist_rows from the
database.
"""
def __init__(
self,
playlist_id: int,
is_template: bool,
) -> None:
super().__init__()
log.debug("PlaylistModel.__init__()")
self.playlist_id = playlist_id
self.is_template = is_template
self.playlist_rows: dict[int, RowAndTrack] = {}
self.signals = MusicMusterSignals()
self.played_tracks_hidden = False
self.signals.begin_reset_model_signal.connect(self.begin_reset_model)
self.signals.end_reset_model_signal.connect(self.end_reset_model)
with db.Session() as session:
# Ensure row numbers in playlist are contiguous
PlaylistRows.fixup_rownumbers(session, playlist_id)
# Populate self.playlist_rows
self.load_data(session)
self.update_track_times()
def __repr__(self) -> str:
return (
f"<PlaylistModel: playlist_id={self.playlist_id}, "
f"is_template={self.is_template}, "
f"{self.rowCount()} rows>"
)
def active_section_header(self) -> int:
"""
Return the row number of the first header that has any of the following below it:
- unplayed tracks
- the currently being played track
- the track marked as next to play
"""
header_row = 0
for row_number in range(len(self.playlist_rows)):
if self.is_header_row(row_number):
header_row = row_number
continue
if not self.is_played_row(row_number):
break
# Here means that row_number points to a played track. The
# current track will be marked as played when we start
# playing it. It's also possible that the track marked as
# next has already been played. Check for either of those.
for ts in [track_sequence.next, track_sequence.current]:
if (
ts
and ts.row_number == row_number
and ts.playlist_id == self.playlist_id
):
# We've found the current or next track, so return
# the last-found header row
return header_row
return header_row
def add_track_to_header(
self, row_number: int, track_id: int, note: Optional[str] = None
) -> None:
"""
Add track to existing header row
"""
log.debug(f"{self}: add_track_to_header({row_number=}, {track_id=}, {note=}")
# Get existing row
try:
rat = self.playlist_rows[row_number]
except KeyError:
raise ApplicationError(
f"{self}: KeyError in add_track_to_header ({row_number=}, {track_id=})"
)
if rat.path:
raise ApplicationError(
f"{self}: Header row already has track associated ({rat=}, {track_id=})"
)
with db.Session() as session:
playlistrow = session.get(PlaylistRows, rat.playlistrow_id)
if not playlistrow:
raise ApplicationError(
f"{self}: Failed to retrieve playlist row ({rat.playlistrow_id=}"
)
# Add track to PlaylistRows
playlistrow.track_id = track_id
# Add any further note (header will already have a note)
if note:
playlistrow.note += " " + note
session.commit()
# Update local copy
self.refresh_row(session, row_number)
# Repaint row
roles = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.FontRole,
Qt.ItemDataRole.ForegroundRole,
]
# only invalidate required roles
self.invalidate_row(row_number, roles)
self.signals.resize_rows_signal.emit(self.playlist_id)
def _background_role(self, row: int, column: int, rat: RowAndTrack) -> QBrush:
"""Return background setting"""
# Handle entire row colouring
# Header row
if self.is_header_row(row):
# Check for specific header colouring
if rat.row_bg is None:
with db.Session() as session:
rat.row_bg = NoteColours.get_colour(session, rat.note)
if rat.row_bg:
return QBrush(QColor(rat.row_bg))
else:
return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST))
# Unreadable track file
if file_is_unreadable(rat.path):
return QBrush(QColor(Config.COLOUR_UNREADABLE))
# Current track
if (
track_sequence.current
and track_sequence.current.playlist_id == self.playlist_id
and track_sequence.current.row_number == row
):
return QBrush(QColor(Config.COLOUR_CURRENT_PLAYLIST))
# Next track
if (
track_sequence.next
and track_sequence.next.playlist_id == self.playlist_id
and track_sequence.next.row_number == row
):
return QBrush(QColor(Config.COLOUR_NEXT_PLAYLIST))
# Individual cell colouring
if column == Col.START_GAP.value:
if rat.start_gap and rat.start_gap >= Config.START_GAP_WARNING_THRESHOLD:
return QBrush(QColor(Config.COLOUR_LONG_START))
if column == Col.BITRATE.value:
if not rat.bitrate or rat.bitrate < Config.BITRATE_LOW_THRESHOLD:
return QBrush(QColor(Config.COLOUR_BITRATE_LOW))
elif rat.bitrate < Config.BITRATE_OK_THRESHOLD:
return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM))
else:
return QBrush(QColor(Config.COLOUR_BITRATE_OK))
if column == Col.NOTE.value:
if rat.note:
if rat.note_bg is None:
with db.Session() as session:
rat.note_bg = NoteColours.get_colour(session, rat.note)
if rat.note_bg:
return QBrush(QColor(rat.note_bg))
return QBrush()
def begin_reset_model(self, playlist_id: int) -> None:
"""
Reset model if playlist_id is ours
"""
if playlist_id != self.playlist_id:
return
super().beginResetModel()
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
"""Standard function for view"""
return len(Col)
def current_track_started(self) -> None:
"""
Notification from musicmuster that the current track has just
started playing
Actions required:
- sanity check
- change OBS scene if needed
- update Playdates in database
- update PlaylistRows in database
- update display
- find next track
- update track times
"""
log.debug(f"{self}: current_track_started()")
if not track_sequence.current:
return
row_number = track_sequence.current.row_number
# Check for OBS scene change
self.obs_scene_change(row_number)
# Sanity check that we have a track_id
track_id = track_sequence.current.track_id
if not track_id:
raise ApplicationError(
f"{self}: current_track_started() called with {track_id=}"
)
with db.Session() as session:
# Update Playdates in database
log.debug(f"{self}: update playdates {track_id=}")
Playdates(session, track_id)
# Mark track as played in playlist
log.debug(f"{self}: Mark track as played")
plr = session.get(PlaylistRows, track_sequence.current.playlistrow_id)
if plr:
plr.played = True
self.refresh_row(session, plr.row_number)
else:
log.error(
f"{self}: Can't retrieve plr, "
f"{track_sequence.current.playlistrow_id=}"
)
session.commit()
# Update colour and times for current row
# only invalidate required roles
roles = [
Qt.ItemDataRole.DisplayRole
]
self.invalidate_row(row_number, roles)
# Update previous row in case we're hiding played rows
if track_sequence.previous and track_sequence.previous.row_number:
# only invalidate required roles
self.invalidate_row(track_sequence.previous.row_number, roles)
# Find next track
next_row = None
unplayed_rows = [
a
for a in self.get_unplayed_rows()
if not self.is_header_row(a)
and not file_is_unreadable(self.playlist_rows[a].path)
]
if unplayed_rows:
try:
next_row = min([a for a in unplayed_rows if a > row_number])
except ValueError:
next_row = min(unplayed_rows)
if next_row is not None:
self.set_next_row(next_row)
else:
# set_next_row() calls update_track_times(); else we call it
self.update_track_times()
def data(
self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole
) -> QVariant | QFont | QBrush | str | int:
"""Return data to view"""
if (
not index.isValid()
or not (0 <= index.row() < len(self.playlist_rows))
or role
in [
Qt.ItemDataRole.DecorationRole,
Qt.ItemDataRole.StatusTipRole,
Qt.ItemDataRole.WhatsThisRole,
Qt.ItemDataRole.SizeHintRole,
Qt.ItemDataRole.TextAlignmentRole,
Qt.ItemDataRole.CheckStateRole,
Qt.ItemDataRole.InitialSortOrderRole,
]
):
return QVariant()
row = index.row()
column = index.column()
# rat for playlist row data as it's used a lot
rat = self.playlist_rows[row]
# These are ordered in approximately the frequency with which
# they are called
if role == Qt.ItemDataRole.BackgroundRole:
return self._background_role(row, column, rat)
elif role == Qt.ItemDataRole.DisplayRole:
return self._display_role(row, column, rat)
elif role == Qt.ItemDataRole.EditRole:
return self._edit_role(row, column, rat)
elif role == Qt.ItemDataRole.FontRole:
return self._font_role(row, column, rat)
elif role == Qt.ItemDataRole.ForegroundRole:
return self._foreground_role(row, column, rat)
elif role == Qt.ItemDataRole.ToolTipRole:
return self._tooltip_role(row, column, rat)
return QVariant()
def delete_rows(self, row_numbers: list[int]) -> None:
"""
Delete passed rows from model
Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows
calls. To keep it simple, if inefficient, delete rows one by one.
TODO: delete in blocks
Delete from highest row back so that not yet deleted row numbers don't change.
"""
with db.Session() as session:
for row_number in sorted(row_numbers, reverse=True):
log.debug(f"{self}: delete_rows(), {row_number=}")
super().beginRemoveRows(QModelIndex(), row_number, row_number)
# We need to remove data from the underlying data store,
# which is the database, but we cache in
# self.playlist_rows, which is what calls to data()
# reads, so fixup that too.
PlaylistRows.delete_row(session, self.playlist_id, row_number)
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
self.refresh_data(session)
session.commit()
super().endRemoveRows()
self.reset_track_sequence_row_numbers()
self.update_track_times()
def _display_role(self, row: int, column: int, rat: RowAndTrack) -> str:
"""
Return text for display
"""
header_row = self.is_header_row(row)
# Set / reset column span
if column == HEADER_NOTES_COLUMN:
column_span = 1
if header_row:
column_span = self.columnCount() - HEADER_NOTES_COLUMN
self.signals.span_cells_signal.emit(
self.playlist_id, row, HEADER_NOTES_COLUMN, 1, column_span
)
if header_row:
if column == HEADER_NOTES_COLUMN:
header_text = self.header_text(rat)
if not header_text:
return Config.SECTION_HEADER
else:
formatted_header = self.header_text(rat)
trimmed_header = self.remove_section_timer_markers(formatted_header)
return trimmed_header
else:
return ""
if column == Col.START_TIME.value:
start_time = rat.forecast_start_time
if start_time:
return start_time.strftime(Config.TRACK_TIME_FORMAT)
return ""
if column == Col.END_TIME.value:
end_time = rat.forecast_end_time
if end_time:
return end_time.strftime(Config.TRACK_TIME_FORMAT)
return ""
if column == Col.INTRO.value:
if rat.intro:
return f"{rat.intro / 1000:{Config.INTRO_SECONDS_FORMAT}}"
else:
return ""
dispatch_table: dict[int, str] = {
Col.ARTIST.value: rat.artist,
Col.BITRATE.value: str(rat.bitrate),
Col.DURATION.value: ms_to_mmss(rat.duration),
Col.LAST_PLAYED.value: get_relative_date(rat.lastplayed),
Col.NOTE.value: rat.note,
Col.START_GAP.value: str(rat.start_gap),
Col.TITLE.value: rat.title,
}
if column in dispatch_table:
return dispatch_table[column]
return ""
def end_reset_model(self, playlist_id: int) -> None:
"""
End model reset if this is our playlist
"""
log.debug(f"{self}: end_reset_model({playlist_id=})")
if playlist_id != self.playlist_id:
log.debug(f"{self}: end_reset_model: not us ({self.playlist_id=})")
return
with db.Session() as session:
self.refresh_data(session)
super().endResetModel()
self.reset_track_sequence_row_numbers()
def _edit_role(self, row: int, column: int, rat: RowAndTrack) -> str | int:
"""
Return value for editing
"""
# If this is a header row and we're being asked for the
# HEADER_NOTES_COLUMN, return the note value
if self.is_header_row(row) and column == HEADER_NOTES_COLUMN:
return rat.note
if column == Col.INTRO.value:
return rat.intro or 0
if column == Col.TITLE.value:
return rat.title
if column == Col.ARTIST.value:
return rat.artist
if column == Col.NOTE.value:
return rat.note
return ""
def _foreground_role(self, row: int, column: int, rat: RowAndTrack) -> QBrush:
"""Return header foreground colour or QBrush() if none"""
if self.is_header_row(row):
if rat.row_fg is None:
with db.Session() as session:
rat.row_fg = NoteColours.get_colour(
session, rat.note, foreground=True
)
if rat.row_fg:
return QBrush(QColor(rat.row_fg))
return QBrush()
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
"""
Standard model flags
"""
if not index.isValid():
return Qt.ItemFlag.ItemIsDropEnabled
default = (
Qt.ItemFlag.ItemIsEnabled
| Qt.ItemFlag.ItemIsSelectable
| Qt.ItemFlag.ItemIsDragEnabled
)
if index.column() in [
Col.TITLE.value,
Col.ARTIST.value,
Col.NOTE.value,
Col.INTRO.value,
] or self.is_header_row(index.row()) and index.column() == HEADER_NOTES_COLUMN:
return default | Qt.ItemFlag.ItemIsEditable
return default
def _font_role(self, row: int, column: int, rat: RowAndTrack) -> QFont:
"""
Return font
"""
# Notes column is never bold
if column == Col.NOTE.value:
return QFont()
boldfont = QFont()
boldfont.setBold(not self.playlist_rows[row].played)
return boldfont
def get_duplicate_rows(self) -> list[int]:
"""
Return a list of duplicate rows. If track appears in rows 2, 3 and 4, return [3, 4]
(ie, ignore the first, not-yet-duplicate, track).
"""
log.debug(f"{self}: get_duplicate_rows() called")
found = []
result = []
for i in range(len(self.playlist_rows)):
track_id = self.playlist_rows[i].track_id
if track_id is None:
continue
if track_id in found:
result.append(i)
else:
found.append(track_id)
log.debug(f"{self}: get_duplicate_rows() returned: {result=}")
return result
def _get_new_row_number(self, proposed_row_number: Optional[int]) -> int:
"""
Sanitises proposed new row number.
If proposed_row_number given, ensure it is valid.
If not given, return row number to add to end of model.
"""
log.debug(f"{self}: _get_new_row_number({proposed_row_number=})")
if proposed_row_number is None or proposed_row_number > len(self.playlist_rows):
# We are adding to the end of the list
new_row_number = len(self.playlist_rows)
elif proposed_row_number < 0:
# Add to start of list
new_row_number = 0
else:
new_row_number = proposed_row_number
log.debug(f"{self}: get_new_row_number() return: {new_row_number=}")
return new_row_number
def get_row_info(self, row_number: int) -> RowAndTrack:
"""
Return info about passed row
"""
return self.playlist_rows[row_number]
def get_row_track_id(self, row_number: int) -> Optional[int]:
"""
Return id of track associated with row or None if no track associated
"""
return self.playlist_rows[row_number].track_id
def get_row_track_path(self, row_number: int) -> str:
"""
Return path of track associated with row or empty string if no track associated
"""
return self.playlist_rows[row_number].path
def get_rows_duration(self, row_numbers: list[int]) -> int:
"""
Return the total duration of the passed rows
"""
duration = 0
for row_number in row_numbers:
duration += self.playlist_rows[row_number].duration
return duration
def get_unplayed_rows(self) -> list[int]:
"""
Return a list of unplayed row numbers
"""
result = [
a.row_number
for a in self.playlist_rows.values()
if not a.played and a.track_id is not None
]
return result
def headerData(
self,
section: int,
orientation: Qt.Orientation,
role: int = Qt.ItemDataRole.DisplayRole,
) -> str | int | QFont | QVariant:
"""
Return text for headers
"""
display_dispatch_table = {
Col.START_GAP.value: Config.HEADER_START_GAP,
Col.INTRO.value: Config.HEADER_INTRO,
Col.TITLE.value: Config.HEADER_TITLE,
Col.ARTIST.value: Config.HEADER_ARTIST,
Col.DURATION.value: Config.HEADER_DURATION,
Col.START_TIME.value: Config.HEADER_START_TIME,
Col.END_TIME.value: Config.HEADER_END_TIME,
Col.LAST_PLAYED.value: Config.HEADER_LAST_PLAYED,
Col.BITRATE.value: Config.HEADER_BITRATE,
Col.NOTE.value: Config.HEADER_NOTE,
}
if role == Qt.ItemDataRole.DisplayRole:
if orientation == Qt.Orientation.Horizontal:
return display_dispatch_table[section]
else:
if Config.ROWS_FROM_ZERO:
return section
else:
return section + 1
elif role == Qt.ItemDataRole.FontRole:
boldfont = QFont()
boldfont.setBold(True)
return boldfont
return QVariant()
def header_text(self, rat: RowAndTrack) -> str:
"""
Process possible section timing directives embeded in header
"""
if rat.note.endswith(Config.SECTION_STARTS):
return self.start_of_timed_section_header(rat)
elif rat.note.endswith("="):
return self.section_subtotal_header(rat)
elif rat.note == "-":
# If the hyphen is the only thing on the line, echo the note
# that started the section without the trailing "+".
for row_number in range(rat.row_number - 1, -1, -1):
row_rat = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_rat.note.endswith("-"):
# We didn't find a matching section start
break
if row_rat.note.endswith("+"):
return f"[End: {row_rat.note[:-1]}]"
return "-"
return rat.note
def hide_played_tracks(self, hide: bool) -> None:
"""
Set played tracks hidden according to 'hide'
"""
self.played_tracks_hidden = hide
for row_number in range(len(self.playlist_rows)):
if self.is_played_row(row_number):
# only invalidate required roles
roles = [
Qt.ItemDataRole.DisplayRole,
]
self.invalidate_row(row_number, roles)
def insert_row(
self,
proposed_row_number: Optional[int],
track_id: Optional[int] = None,
note: str = "",
) -> None:
"""
Insert a row.
"""
log.debug(f"{self}: insert_row({proposed_row_number=}, {track_id=}, {note=})")
new_row_number = self._get_new_row_number(proposed_row_number)
with db.Session() as session:
super().beginInsertRows(QModelIndex(), new_row_number, new_row_number)
_ = PlaylistRows.insert_row(
session=session,
playlist_id=self.playlist_id,
new_row_number=new_row_number,
note=note,
track_id=track_id,
)
session.commit()
self.refresh_data(session)
super().endInsertRows()
self.signals.resize_rows_signal.emit(self.playlist_id)
self.reset_track_sequence_row_numbers()
# only invalidate required roles
roles = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.FontRole,
Qt.ItemDataRole.ForegroundRole,
]
self.invalidate_rows(list(range(new_row_number, len(self.playlist_rows))), roles)
def invalidate_row(self, modified_row: int, roles: list[Qt.ItemDataRole]) -> None:
"""
Signal to view to refresh invalidated row
"""
self.dataChanged.emit(
self.index(modified_row, 0),
self.index(modified_row, self.columnCount() - 1),
roles
)
def invalidate_rows(self, modified_rows: list[int], roles: list[Qt.ItemDataRole]) -> None:
"""
Signal to view to refresh invlidated rows
"""
for modified_row in modified_rows:
# only invalidate required roles
self.invalidate_row(modified_row, roles)
def is_header_row(self, row_number: int) -> bool:
"""
Return True if row is a header row, else False
"""
if row_number in self.playlist_rows:
return self.playlist_rows[row_number].path == ""
return False
def is_played_row(self, row_number: int) -> bool:
"""
Return True if row is an unplayed track row, else False
"""
return self.playlist_rows[row_number].played
def is_track_in_playlist(self, track_id: int) -> Optional[RowAndTrack]:
"""
If this track_id is in the playlist, return the RowAndTrack object
else return None
"""
for row_number in range(len(self.playlist_rows)):
if self.playlist_rows[row_number].track_id == track_id:
return self.playlist_rows[row_number]
return None
def load_data(self, session: Session) -> None:
"""
Same as refresh data, but only used when creating playslit.
Distinguishes profile time between initial load and other
refreshes.
"""
# We used to clear self.playlist_rows each time but that's
# expensive and slow on big playlists
# Note where each playlist_id is
plid_to_row: dict[int, int] = {}
for oldrow in self.playlist_rows:
plrdata = self.playlist_rows[oldrow]
plid_to_row[plrdata.playlistrow_id] = plrdata.row_number
# build a new playlist_rows
new_playlist_rows: dict[int, RowAndTrack] = {}
for p in PlaylistRows.get_playlist_rows(session, self.playlist_id):
if p.id not in plid_to_row:
new_playlist_rows[p.row_number] = RowAndTrack(p)
else:
new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
new_playlist_rows[p.row_number].row_number = p.row_number
# Copy to self.playlist_rows
self.playlist_rows = new_playlist_rows
def mark_unplayed(self, row_numbers: list[int]) -> None:
"""
Mark row as unplayed
"""
with db.Session() as session:
for row_number in row_numbers:
playlist_row = session.get(
PlaylistRows, self.playlist_rows[row_number].playlistrow_id
)
if not playlist_row:
return
playlist_row.played = False
session.commit()
self.refresh_row(session, row_number)
self.update_track_times()
# only invalidate required roles
roles = [
Qt.ItemDataRole.FontRole,
]
self.invalidate_rows(row_numbers, roles)
def move_rows(self, from_rows: list[int], to_row_number: int) -> None:
"""
Move the playlist rows given to to_row and below.
"""
log.debug(f"{self}: move_rows({from_rows=}, {to_row_number=}")
# Build a {current_row_number: new_row_number} dictionary
row_map: dict[int, int] = {}
# The destination row number will need to be reduced by the
# number of rows being move from above the destination row
# otherwise rows below the destination row will end up above the
# moved rows.
adjusted_to_row = to_row_number - len(
[a for a in from_rows if a < to_row_number]
)
# Put the from_row row numbers into the row_map. Ultimately the
# total number of elements in the playlist doesn't change, so
# check that adding the moved rows starting at to_row won't
# overshoot the end of the playlist.
if adjusted_to_row + len(from_rows) > len(self.playlist_rows):
next_to_row = len(self.playlist_rows) - len(from_rows)
else:
next_to_row = adjusted_to_row
# zip iterates from_row and to_row simultaneously from the
# respective sequences inside zip()
for from_row, to_row in zip(
from_rows, range(next_to_row, next_to_row + len(from_rows))
):
row_map[from_row] = to_row
# Move the remaining rows to the row_map. We want to fill it
# before (if there are gaps) and after (likewise) the rows that
# are moving.
# zip iterates old_row and new_row simultaneously from the
# respective sequences inside zip()
for old_row, new_row in zip(
[x for x in self.playlist_rows.keys() if x not in from_rows],
[y for y in range(len(self.playlist_rows)) if y not in row_map.values()],
):
# Optimise: only add to map if there is a change
if old_row != new_row:
row_map[old_row] = new_row
# For SQLAlchemy, build a list of dictionaries that map playlistrow_id to
# new row number:
sqla_map: list[dict[str, int]] = []
for oldrow, newrow in row_map.items():
playlistrow_id = self.playlist_rows[oldrow].playlistrow_id
sqla_map.append({"playlistrow_id": playlistrow_id, "row_number": newrow})
with db.Session() as session:
PlaylistRows.update_plr_row_numbers(session, self.playlist_id, sqla_map)
session.commit()
# Update playlist_rows
self.refresh_data(session)
# Update display
self.reset_track_sequence_row_numbers()
self.update_track_times()
# only invalidate required roles
roles = [
Qt.ItemDataRole.DisplayRole,
]
self.invalidate_rows(list(row_map.keys()), roles)
def move_rows_between_playlists(
self,
from_rows: list[int],
to_row_number: int,
to_playlist_id: int,
) -> None:
"""
Move the playlist rows given to to_row and below of to_playlist.
"""
log.debug(
f"{self}: move_rows_between_playlists({from_rows=}, "
f"{to_row_number=}, {to_playlist_id=}"
)
# Row removal must be wrapped in beginRemoveRows ..
# endRemoveRows and the row range must be contiguous. Process
# the highest rows first so the lower row numbers are unchanged
row_groups = self._reversed_contiguous_row_groups(from_rows)
# Prepare destination playlist for a reset
self.signals.begin_reset_model_signal.emit(to_playlist_id)
with db.Session() as session:
for row_group in row_groups:
# Make room in destination playlist
max_destination_row_number = PlaylistRows.get_last_used_row(
session, to_playlist_id
)
if (
max_destination_row_number
and to_row_number <= max_destination_row_number
):
# Move the destination playlist rows down to make room.
PlaylistRows.move_rows_down(
session, to_playlist_id, to_row_number, len(row_group)
)
next_to_row = to_row_number
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
for playlist_row in PlaylistRows.plrids_to_plrs(
session,
self.playlist_id,
[self.playlist_rows[a].playlistrow_id for a in row_group],
):
if (
track_sequence.current
and playlist_row.id == track_sequence.current.playlistrow_id
):
# Don't move current track
continue
playlist_row.playlist_id = to_playlist_id
playlist_row.row_number = next_to_row
next_to_row += 1
self.refresh_data(session)
super().endRemoveRows()
# We need to remove gaps in row numbers after tracks have
# moved.
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
self.refresh_data(session)
session.commit()
# Reset of model must come after session has been closed
self.reset_track_sequence_row_numbers()
self.signals.end_reset_model_signal.emit(to_playlist_id)
self.update_track_times()
def move_track_add_note(
self, new_row_number: int, existing_rat: RowAndTrack, note: str
) -> None:
"""
Move existing_rat track to new_row_number and append note to any existing note
"""
log.debug(
f"{self}: move_track_add_note({new_row_number=}, {existing_rat=}, {note=}"
)
if note:
with db.Session() as session:
playlist_row = session.get(PlaylistRows, existing_rat.playlistrow_id)
if playlist_row:
if playlist_row.note:
playlist_row.note += "\n" + note
else:
playlist_row.note = note
self.refresh_row(session, playlist_row.row_number)
session.commit()
# Carry out the move outside of the session context to ensure
# database updated with any note change
self.move_rows([existing_rat.row_number], new_row_number)
self.signals.resize_rows_signal.emit(self.playlist_id)
def move_track_to_header(
self,
header_row_number: int,
existing_rat: RowAndTrack,
note: Optional[str],
) -> None:
"""
Add the existing_rat track details to the existing header at header_row_number
"""
log.debug(
f"{self}: move_track_to_header({header_row_number=}, {existing_rat=}, {note=}"
)
if existing_rat.track_id:
if note and existing_rat.note:
note += "\n" + existing_rat.note
self.add_track_to_header(header_row_number, existing_rat.track_id, note)
self.delete_rows([existing_rat.row_number])
def obs_scene_change(self, row_number: int) -> None:
"""
Check this row and any preceding headers for OBS scene change command
and execute any found
"""
log.debug(f"{self}: obs_scene_change({row_number=})")
# Check any headers before this row
idx = row_number - 1
while self.is_header_row(idx):
idx -= 1
# Step through headers in row order and finish with this row
for chkrow in range(idx + 1, row_number + 1):
match_obj = scene_change_re.search(self.playlist_rows[chkrow].note)
if match_obj:
scene_name = match_obj.group(1)
if scene_name:
ws = obswebsocket.obsws(
host=Config.OBS_HOST,
port=Config.OBS_PORT,
password=Config.OBS_PASSWORD,
)
try:
ws.connect()
ws.call(
obswebsocket.requests.SetCurrentProgramScene(
sceneName=scene_name
)
)
log.debug(f"{self}: OBS scene changed to '{scene_name}'")
continue
except obswebsocket.exceptions.ConnectionFailure:
log.warning(f"{self}: OBS connection refused")
return
def previous_track_ended(self) -> None:
"""
Notification from musicmuster that the previous track has ended.
Actions required:
- sanity check
- update display
"""
log.debug(f"{self}: previous_track_ended()")
# Sanity check
if not track_sequence.previous:
log.error(
f"{self}: playlistmodel:previous_track_ended called with no current track"
)
return
if track_sequence.previous.row_number is None:
log.error(
f"{self}: previous_track_ended called with no row number "
f"({track_sequence.previous=})"
)
return
# Update display
# only invalidate required roles
roles = [
Qt.ItemDataRole.BackgroundRole,
]
self.invalidate_row(track_sequence.previous.row_number, roles)
def refresh_data(self, session: Session) -> None:
"""
Populate self.playlist_rows with playlist data
We used to clear self.playlist_rows each time but that's
expensive and slow on big playlists. Instead we track where rows
are in database versus self.playlist_rows and fixup the latter.
This works well for news rows added and for rows moved, but
doesn't work for changed comments so they must be handled using
refresh_row().
"""
# Note where each playlist_id is
plid_to_row: dict[int, int] = {}
for oldrow in self.playlist_rows:
plrdata = self.playlist_rows[oldrow]
plid_to_row[plrdata.playlistrow_id] = plrdata.row_number
# build a new playlist_rows
new_playlist_rows: dict[int, RowAndTrack] = {}
for p in PlaylistRows.get_playlist_rows(session, self.playlist_id):
if p.id not in plid_to_row:
new_playlist_rows[p.row_number] = RowAndTrack(p)
else:
new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
new_playlist_rows[p.row_number].row_number = p.row_number
# Copy to self.playlist_rows
self.playlist_rows = new_playlist_rows
def refresh_row(self, session, row_number):
"""Populate dict for one row from database"""
p = PlaylistRows.deep_row(session, self.playlist_id, row_number)
self.playlist_rows[row_number] = RowAndTrack(p)
def remove_track(self, row_number: int) -> None:
"""
Remove track from row, retaining row as a header row
"""
log.debug(f"{self}: remove_track({row_number=})")
with db.Session() as session:
playlist_row = session.get(
PlaylistRows, self.playlist_rows[row_number].playlistrow_id
)
if playlist_row:
playlist_row.track_id = None
session.commit()
self.refresh_row(session, row_number)
# only invalidate required roles
roles = [
Qt.ItemDataRole.DisplayRole,
]
self.invalidate_row(row_number, roles)
def rescan_track(self, row_number: int) -> None:
"""
Rescan track at passed row number
"""
track_id = self.playlist_rows[row_number].track_id
if track_id:
with db.Session() as session:
track = session.get(Tracks, track_id)
set_track_metadata(track)
self.refresh_row(session, row_number)
self.update_track_times()
roles = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
]
# only invalidate required roles
self.invalidate_row(row_number, roles)
self.signals.resize_rows_signal.emit(self.playlist_id)
session.commit()
def reset_track_sequence_row_numbers(self) -> None:
"""
Signal handler for when row ordering has changed.
Example: row 4 is marked as next. Row 2 is deleted. The PlaylistRows table will
be correctly updated with change of row number, but track_sequence.next will still
contain row_number==4. This function fixes up the track_sequence row numbers by
looking up the playlistrow_id and retrieving the row number from the database.
"""
# Check the track_sequence.next, current and previous plrs and
# update the row number
with db.Session() as session:
for ts in [
track_sequence.next,
track_sequence.current,
track_sequence.previous,
]:
if ts:
ts.update_playlist_and_row(session)
session.commit()
self.update_track_times()
def remove_comments(self, row_numbers: list[int]) -> None:
"""
Remove comments from passed rows
"""
if not row_numbers:
return
# Safety check
if not ask_yes_no(
title="Remove comments",
question=f"Remove comments from {len(row_numbers)} rows?",
):
return
with db.Session() as session:
for row_number in row_numbers:
playlist_row = session.get(
PlaylistRows, self.playlist_rows[row_number].playlistrow_id
)
if playlist_row.track_id:
playlist_row.note = ""
# We can't use refresh_data() because its
# optimisations mean it won't update comments in
# self.playlist_rows
# The "correct" approach would be to re-read from the
# database but we optimise here by simply updating
# self.playlist_rows directly.
self.playlist_rows[row_number].note = ""
session.commit()
# only invalidate required roles
roles = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.ForegroundRole,
]
self.invalidate_rows(row_numbers, roles)
def _reversed_contiguous_row_groups(
self, row_numbers: list[int]
) -> list[list[int]]:
"""
Take the list of row numbers and split into groups of contiguous rows. Return as a list
of lists with the highest row numbers first.
Example:
input [2, 3, 4, 5, 7, 9, 10, 13, 17, 20, 21]
return: [[20, 21], [17], [13], [9, 10], [7], [2, 3, 4, 5]]
"""
log.debug(f"{self}: _reversed_contiguous_row_groups({row_numbers=} called")
result: list[list[int]] = []
temp: list[int] = []
last_value = row_numbers[0] - 1
for idx in range(len(row_numbers)):
if row_numbers[idx] != last_value + 1:
result.append(temp)
temp = []
last_value = row_numbers[idx]
temp.append(last_value)
if temp:
result.append(temp)
result.reverse()
log.debug(f"{self}: _reversed_contiguous_row_groups() returned: {result=}")
return result
def remove_section_timer_markers(self, header_text: str) -> str:
"""
Remove characters used to mark section timeings from
passed header text.
Remove text using to signal header colours if colour entry
is so marked.
Return header text witout markers
"""
if header_text == "=":
return ""
while header_text.endswith(Config.SECTION_STARTS):
header_text = header_text[0:-1]
while header_text.endswith(Config.SECTION_ENDINGS):
header_text = header_text[0:-1]
# Parse passed header text and remove the first colour match string
with db.Session() as session:
for rec in NoteColours.get_all(session):
if not rec.strip_substring:
continue
if rec.is_regex:
flags = re.UNICODE
if not rec.is_casesensitive:
flags |= re.IGNORECASE
p = re.compile(rec.substring, flags)
if p.match(header_text):
header_text = re.sub(p, "", header_text)
break
else:
if rec.is_casesensitive:
if rec.substring.lower() in header_text.lower():
header_text = remove_substring_case_insensitive(
header_text, rec.substring
)
break
else:
if rec.substring in header_text:
header_text = header_text.replace(rec.substring, "")
break
return header_text
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
"""Standard function for view"""
return len(self.playlist_rows)
def section_subtotal_header(self, rat: RowAndTrack) -> str:
"""
Process this row as subtotal within a timed section and
return display text for this row
"""
count: int = 0
unplayed_count: int = 0
duration: int = 0
if rat.row_number == 0:
# Meaningless to have a subtotal on row 0
return Config.SUBTOTAL_ON_ROW_ZERO
# Show subtotal
for row_number in range(rat.row_number - 1, -1, -1):
row_rat = self.playlist_rows[row_number]
if self.is_header_row(row_number) or row_number == 0:
if row_rat.note.endswith(Config.SECTION_STARTS) or row_number == 0:
# If we are playing this section, also
# calculate end time when all tracks are played.
end_time_str = ""
if (
track_sequence.current
and track_sequence.current.end_time
and (
row_number
< track_sequence.current.row_number
< rat.row_number
)
):
section_end_time = (
track_sequence.current.end_time
+ dt.timedelta(milliseconds=duration)
)
end_time_str = (
", section end time "
+ section_end_time.strftime(Config.TRACK_TIME_FORMAT)
)
clean_header = self.remove_section_timer_markers(rat.note)
if clean_header:
return (
f"{clean_header} ["
f"{unplayed_count}/{count} track{'s' if count > 1 else ''} "
f"({ms_to_mmss(duration)}) unplayed{end_time_str}]"
)
else:
return (
f"[{unplayed_count}/{count} track{'s' if count > 1 else ''} "
f"({ms_to_mmss(duration)}) unplayed{end_time_str}]"
)
else:
continue
else:
count += 1
if not row_rat.played:
unplayed_count += 1
duration += row_rat.duration
# We should never get here
raise ApplicationError("Error in section_subtotal_header()")
def selection_is_sortable(self, row_numbers: list[int]) -> bool:
"""
Return True if the selection is sortable. That means:
- at least two rows selected
- selected rows are contiguous
- selected rows do not include any header rows
"""
# at least two rows selected
if len(row_numbers) < 2:
return False
# selected rows are contiguous
if sorted(row_numbers) != list(range(min(row_numbers), max(row_numbers) + 1)):
return False
# selected rows do not include any header rows
for row_number in row_numbers:
if self.is_header_row(row_number):
return False
return True
def set_next_row(self, row_number: Optional[int]) -> None:
"""
Set row_number as next track. If row_number is None, clear next track.
Return True if successful else False.
"""
log.debug(f"{self}: set_next_row({row_number=})")
if row_number is None:
# Clear next track
if track_sequence.next is not None:
track_sequence.set_next(None)
else:
# Get playlistrow_id of row
try:
rat = self.playlist_rows[row_number]
except IndexError:
log.error(f"{self} set_track_sequence.next({row_number=}, IndexError")
return
if rat.track_id is None or rat.row_number is None:
log.error(
f"{self} .set_track_sequence.next({row_number=}, "
f"No track / row number {rat.track_id=}, {rat.row_number=}"
)
return
old_next_row: Optional[int] = None
if track_sequence.next:
old_next_row = track_sequence.next.row_number
track_sequence.set_next(rat)
if Config.WIKIPEDIA_ON_NEXT:
self.signals.search_wikipedia_signal.emit(
self.playlist_rows[row_number].title
)
if Config.SONGFACTS_ON_NEXT:
self.signals.search_songfacts_signal.emit(
self.playlist_rows[row_number].title
)
roles = [
Qt.ItemDataRole.BackgroundRole,
]
if old_next_row is not None:
# only invalidate required roles
self.invalidate_row(old_next_row, roles)
# only invalidate required roles
self.invalidate_row(row_number, roles)
self.signals.next_track_changed_signal.emit()
self.update_track_times()
def setData(
self,
index: QModelIndex,
value: str | float,
role: int = Qt.ItemDataRole.EditRole,
) -> bool:
"""
Update model with edited data
"""
if not index.isValid() or role != Qt.ItemDataRole.EditRole:
return False
row_number = index.row()
column = index.column()
with db.Session() as session:
playlist_row = session.get(
PlaylistRows, self.playlist_rows[row_number].playlistrow_id
)
if not playlist_row:
log.error(
f"{self}: Error saving data: {row_number=}, {column=}, {value=}"
)
return False
if playlist_row.track_id:
if column in [Col.TITLE.value, Col.ARTIST.value, Col.INTRO.value]:
track = session.get(Tracks, playlist_row.track_id)
if not track:
log.error(f"{self}: Error retreiving track: {playlist_row=}")
return False
if column == Col.TITLE.value:
track.title = str(value)
elif column == Col.ARTIST.value:
track.artist = str(value)
elif column == Col.INTRO.value:
track.intro = int(round(float(value), 1) * 1000)
else:
log.error(f"{self}: Error updating track: {column=}, {value=}")
return False
elif column == Col.NOTE.value:
playlist_row.note = str(value)
else:
# This is a header row
if column == HEADER_NOTES_COLUMN:
playlist_row.note = str(value)
# commit changes before refreshing data
session.commit()
self.refresh_row(session, row_number)
self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role])
return True
def sort_by_artist(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by artist
"""
self.sort_by_attribute(row_numbers, "artist")
def sort_by_attribute(self, row_numbers: list[int], attr_name: str) -> None:
"""
Sort selected rows by passed attribute name where 'attribute' is a
key in PlaylistRowData
"""
# Create a subset of playlist_rows with the rows we are
# interested in
shortlist_rows = {k: self.playlist_rows[k] for k in row_numbers}
sorted_list = [
playlist_row.row_number
for playlist_row in sorted(
shortlist_rows.values(), key=attrgetter(attr_name)
)
]
self.move_rows(sorted_list, min(sorted_list))
def sort_by_duration(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by duration
"""
self.sort_by_attribute(row_numbers, "duration")
def sort_by_lastplayed(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by lastplayed
"""
self.sort_by_attribute(row_numbers, "lastplayed")
def sort_randomly(self, row_numbers: list[int]) -> None:
"""
Sort selected rows randomly
"""
shuffle(row_numbers)
self.move_rows(row_numbers, min(row_numbers))
def sort_by_title(self, row_numbers: list[int]) -> None:
"""
Sort selected rows by title
"""
self.sort_by_attribute(row_numbers, "title")
def start_of_timed_section_header(self, rat: RowAndTrack) -> str:
"""
Process this row as the start of a timed section and
return display text for this row
"""
count: int = 0
unplayed_count: int = 0
duration: int = 0
clean_header = self.remove_section_timer_markers(rat.note)
for row_number in range(rat.row_number + 1, len(self.playlist_rows)):
row_rat = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_rat.note.endswith(Config.SECTION_ENDINGS):
return (
f"{clean_header} "
f"[{count} tracks, {ms_to_mmss(duration)} unplayed]"
)
else:
continue
else:
count += 1
if not row_rat.played:
unplayed_count += 1
duration += row_rat.duration
return (
f"{clean_header} "
f"[{count} tracks, {ms_to_mmss(duration, none='none')} "
"unplayed (to end of playlist)]"
)
def supportedDropActions(self) -> Qt.DropAction:
return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction
def _tooltip_role(self, row: int, column: int, rat: RowAndTrack) -> str:
"""
Return tooltip. Currently only used for last_played column.
"""
if column != Col.LAST_PLAYED.value:
return ""
with db.Session() as session:
track_id = self.playlist_rows[row].track_id
if not track_id:
return ""
playdates = Playdates.last_playdates(session, track_id)
return "<br>".join(
[
a.lastplayed.strftime(Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT)
for a in playdates
]
)
def update_or_insert(self, track_id: int, row_number: int) -> None:
"""
If the passed track_id exists in this playlist, update the
row(s), otherwise insert this track at row_number.
"""
track_rows = [
a.row_number for a in self.playlist_rows.values() if a.track_id == track_id
]
if track_rows:
with db.Session() as session:
for row in track_rows:
self.refresh_row(session, row)
# only invalidate required roles
roles = [
Qt.ItemDataRole.BackgroundRole,
Qt.ItemDataRole.DisplayRole,
Qt.ItemDataRole.FontRole,
Qt.ItemDataRole.ForegroundRole,
]
self.invalidate_rows(track_rows, roles)
else:
self.insert_row(proposed_row_number=row_number, track_id=track_id)
def update_track_times(self) -> None:
"""
Update track start/end times in self.playlist_rows
"""
next_start_time: Optional[dt.datetime] = None
update_rows: list[int] = []
row_count = len(self.playlist_rows)
current_track_row = None
next_track_row = None
if (
track_sequence.current
and track_sequence.current.playlist_id == self.playlist_id
):
current_track_row = track_sequence.current.row_number
# Update current track details now so that they are available
# when we deal with next track row which may be above current
# track row.
self.playlist_rows[current_track_row].set_forecast_start_time(
update_rows, track_sequence.current.start_time
)
if track_sequence.next and track_sequence.next.playlist_id == self.playlist_id:
next_track_row = track_sequence.next.row_number
for row_number in range(row_count):
rat = self.playlist_rows[row_number]
# Don't update times for tracks that have been played, for
# unreadable tracks or for the current track, handled above.
if (
rat.played
or row_number == current_track_row
or (rat.path and file_is_unreadable(rat.path))
):
continue
# Reset start time if timing in header
if self.is_header_row(row_number):
header_time = get_embedded_time(rat.note)
if header_time:
next_start_time = header_time
continue
# Set start time for next row if we have a current track
if (
row_number == next_track_row
and track_sequence.current
and track_sequence.current.end_time
):
next_start_time = rat.set_forecast_start_time(
update_rows, track_sequence.current.end_time
)
continue
# If we're between the current and next row, zero out
# times
if (current_track_row or row_count) < row_number < (next_track_row or 0):
rat.set_forecast_start_time(update_rows, None)
continue
# Set start/end
next_start_time = rat.set_forecast_start_time(update_rows, next_start_time)
# Update start/stop times of rows that have changed
for updated_row in update_rows:
self.dataChanged.emit(
self.index(updated_row, Col.START_TIME.value),
self.index(updated_row, Col.END_TIME.value),
)
class PlaylistProxyModel(QSortFilterProxyModel):
"""
For searching and filtering
"""
def __init__(
self,
) -> None:
super().__init__()
# Search all columns
self.setFilterKeyColumn(-1)
def __repr__(self) -> str:
return f"<PlaylistProxyModel: sourceModel={self.sourceModel()}>"
def filterAcceptsRow(self, source_row: int, source_parent: QModelIndex) -> bool:
"""
Subclass to filter by played status. Return True to show this row, False to hide it.
"""
if Config.HIDE_PLAYED_MODE != Config.HIDE_PLAYED_MODE_TRACKS:
return super().filterAcceptsRow(source_row, source_parent)
if self.sourceModel().played_tracks_hidden:
if self.sourceModel().is_played_row(source_row):
# Don't hide current track
if (
track_sequence.current
and track_sequence.current.playlist_id
== self.sourceModel().playlist_id
and track_sequence.current.row_number == source_row
):
return True
# Don't hide next track
if (
track_sequence.next
and track_sequence.next.playlist_id
== self.sourceModel().playlist_id
and track_sequence.next.row_number == source_row
):
return True
# Handle previous track
if track_sequence.previous:
if (
track_sequence.previous.playlist_id
!= self.sourceModel().playlist_id
or track_sequence.previous.row_number != source_row
):
# This row isn't our previous track: hide it
return False
if track_sequence.current and track_sequence.current.start_time:
# This row is our previous track. Don't hide it
# until HIDE_AFTER_PLAYING_OFFSET milliseconds
# after current track has started
if track_sequence.current.start_time and dt.datetime.now() > (
track_sequence.current.start_time
+ dt.timedelta(
milliseconds=Config.HIDE_AFTER_PLAYING_OFFSET
)
):
return False
else:
# Invalidate this row in
# HIDE_AFTER_PLAYING_OFFSET and a bit
# milliseconds so that it hides then. We add
# 100mS on so that the if clause above is
# true next time through.
# only invalidate required roles
roles = [
Qt.ItemDataRole.DisplayRole,
]
QTimer.singleShot(
Config.HIDE_AFTER_PLAYING_OFFSET + 100,
lambda: self.sourceModel().invalidate_row(source_row, roles),
)
return True
# Next track not playing yet so don't hide previous
else:
return True
# No previous track so hide this played track immediately
return False
return super().filterAcceptsRow(source_row, source_parent)
def set_incremental_search(self, search_string: str) -> None:
"""
Update search pattern
"""
self.setFilterRegularExpression(
QRegularExpression(
search_string, QRegularExpression.PatternOption.CaseInsensitiveOption
)
)
def sourceModel(self) -> PlaylistModel:
"""
Override sourceModel to return correct type
"""
return cast(PlaylistModel, super().sourceModel())