After around 1.5h of operation, we'd get messages such as: vlcpulse audio output error: PulseAudio server connection failure: Connection terminated Tracked down to not correctly releasing vlc player resources when track had finished playing. Fixed now, and much simplified the fadeout code as well.
1658 lines
58 KiB
Python
1658 lines
58 KiB
Python
# Standard library imports
|
|
# Allow forward reference to PlaylistModel
|
|
from __future__ import annotations
|
|
|
|
from operator import attrgetter
|
|
from random import shuffle
|
|
from typing import Optional
|
|
import datetime as dt
|
|
import re
|
|
|
|
# PyQt imports
|
|
from PyQt6.QtCore import (
|
|
QAbstractTableModel,
|
|
QModelIndex,
|
|
QObject,
|
|
QRegularExpression,
|
|
QSortFilterProxyModel,
|
|
Qt,
|
|
QTimer,
|
|
QVariant,
|
|
)
|
|
from PyQt6.QtGui import (
|
|
QBrush,
|
|
QColor,
|
|
QFont,
|
|
)
|
|
|
|
# Third party imports
|
|
import obswebsocket # type: ignore
|
|
|
|
# import snoop # type: ignore
|
|
|
|
# App imports
|
|
from classes import (
|
|
Col,
|
|
MusicMusterSignals,
|
|
RowAndTrack,
|
|
track_sequence,
|
|
)
|
|
from config import Config
|
|
from helpers import (
|
|
file_is_unreadable,
|
|
get_embedded_time,
|
|
get_relative_date,
|
|
ms_to_mmss,
|
|
set_track_metadata,
|
|
)
|
|
from log import log
|
|
from models import db, NoteColours, Playdates, PlaylistRows, Tracks
|
|
|
|
|
|
HEADER_NOTES_COLUMN = 1
|
|
scene_change_re = re.compile(r"SetScene=\[([^[\]]*)\]")
|
|
|
|
|
|
class PlaylistModel(QAbstractTableModel):
|
|
"""
|
|
The Playlist Model
|
|
|
|
Update strategy: update the database and then refresh the
|
|
row-indexed cached copy (self.playlist_rows). Do not edit
|
|
self.playlist_rows directly because keeping it and the
|
|
database in sync is uncessarily challenging.
|
|
|
|
refresh_row() will populate one row of playlist_rows from the
|
|
database
|
|
|
|
refresh_data() will repopulate all of playlist_rows from the
|
|
database
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
playlist_id: int,
|
|
*args: Optional[QObject],
|
|
**kwargs: Optional[QObject],
|
|
) -> None:
|
|
log.debug("PlaylistModel.__init__()")
|
|
|
|
self.playlist_id = playlist_id
|
|
super().__init__(*args, **kwargs)
|
|
|
|
self.playlist_rows: dict[int, RowAndTrack] = {}
|
|
self.signals = MusicMusterSignals()
|
|
self.played_tracks_hidden = False
|
|
|
|
self.signals.begin_reset_model_signal.connect(self.begin_reset_model)
|
|
self.signals.end_reset_model_signal.connect(self.end_reset_model)
|
|
|
|
with db.Session() as session:
|
|
# Ensure row numbers in playlist are contiguous
|
|
PlaylistRows.fixup_rownumbers(session, playlist_id)
|
|
# Populate self.playlist_rows
|
|
self.refresh_data(session)
|
|
self.update_track_times()
|
|
|
|
def __repr__(self) -> str:
|
|
return (
|
|
f"<PlaylistModel: playlist_id={self.playlist_id}, {self.rowCount()} rows>"
|
|
)
|
|
|
|
def add_track_to_header(
|
|
self, row_number: int, track_id: int, note: Optional[str] = None
|
|
) -> None:
|
|
"""
|
|
Add track to existing header row
|
|
"""
|
|
|
|
log.debug(f"{self}: add_track_to_header({row_number=}, {track_id=}, {note=}")
|
|
|
|
# Get existing row
|
|
try:
|
|
rat = self.playlist_rows[row_number]
|
|
except KeyError:
|
|
log.error(
|
|
f"{self}: KeyError in add_track_to_header ({row_number=}, {track_id=})"
|
|
)
|
|
return
|
|
if rat.path:
|
|
log.error(
|
|
f"{self}: Header row already has track associated ({rat=}, {track_id=})"
|
|
)
|
|
return
|
|
with db.Session() as session:
|
|
playlistrow = session.get(PlaylistRows, rat.playlistrow_id)
|
|
if playlistrow:
|
|
# Add track to PlaylistRows
|
|
playlistrow.track_id = track_id
|
|
# Add any further note (header will already have a note)
|
|
if note:
|
|
playlistrow.note += "\n" + note
|
|
# Update local copy
|
|
self.refresh_row(session, row_number)
|
|
# Repaint row
|
|
self.invalidate_row(row_number)
|
|
session.commit()
|
|
|
|
self.signals.resize_rows_signal.emit(self.playlist_id)
|
|
|
|
def background_role(self, row: int, column: int, rat: RowAndTrack) -> QBrush:
|
|
"""Return background setting"""
|
|
|
|
# Handle entire row colouring
|
|
# Header row
|
|
if self.is_header_row(row):
|
|
# Check for specific header colouring
|
|
with db.Session() as session:
|
|
note_colour = NoteColours.get_colour(session, rat.note)
|
|
if note_colour:
|
|
return QBrush(QColor(note_colour))
|
|
else:
|
|
return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST))
|
|
# Unreadable track file
|
|
if file_is_unreadable(rat.path):
|
|
return QBrush(QColor(Config.COLOUR_UNREADABLE))
|
|
# Current track
|
|
if track_sequence.current and track_sequence.current.row_number == row:
|
|
return QBrush(QColor(Config.COLOUR_CURRENT_PLAYLIST))
|
|
# Next track
|
|
if track_sequence.next and track_sequence.next.row_number == row:
|
|
return QBrush(QColor(Config.COLOUR_NEXT_PLAYLIST))
|
|
|
|
# Individual cell colouring
|
|
if column == Col.START_GAP.value:
|
|
if rat.start_gap and rat.start_gap >= Config.START_GAP_WARNING_THRESHOLD:
|
|
return QBrush(QColor(Config.COLOUR_LONG_START))
|
|
if column == Col.BITRATE.value:
|
|
if not rat.bitrate or rat.bitrate < Config.BITRATE_LOW_THRESHOLD:
|
|
return QBrush(QColor(Config.COLOUR_BITRATE_LOW))
|
|
elif rat.bitrate < Config.BITRATE_OK_THRESHOLD:
|
|
return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM))
|
|
else:
|
|
return QBrush(QColor(Config.COLOUR_BITRATE_OK))
|
|
if column == Col.NOTE.value:
|
|
if rat.note:
|
|
with db.Session() as session:
|
|
note_colour = NoteColours.get_colour(session, rat.note)
|
|
if note_colour:
|
|
return QBrush(QColor(note_colour))
|
|
|
|
return QBrush()
|
|
|
|
def begin_reset_model(self, playlist_id: int) -> None:
|
|
"""
|
|
Reset model if playlist_id is ours
|
|
"""
|
|
|
|
if playlist_id != self.playlist_id:
|
|
return
|
|
super().beginResetModel()
|
|
|
|
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
|
|
"""Standard function for view"""
|
|
|
|
return len(Col)
|
|
|
|
def current_track_started(self) -> None:
|
|
"""
|
|
Notification from musicmuster that the current track has just
|
|
started playing
|
|
|
|
Actions required:
|
|
- sanity check
|
|
- change OBS scene if needed
|
|
- update Playdates in database
|
|
- update PlaylistRows in database
|
|
- update display
|
|
- find next track
|
|
- update track times
|
|
"""
|
|
|
|
if not track_sequence.current:
|
|
return
|
|
|
|
row_number = track_sequence.current.row_number
|
|
|
|
# Check for OBS scene change
|
|
log.debug(f"{self}: Call OBS scene change")
|
|
self.obs_scene_change(row_number)
|
|
|
|
# Sanity check that we have a track_id
|
|
if not track_sequence.current.track_id:
|
|
log.error(
|
|
f"{self}: current_track_started() called with {track_sequence.current.track_id=}"
|
|
)
|
|
return
|
|
|
|
with db.Session() as session:
|
|
# Update Playdates in database
|
|
log.debug(f"{self}: update playdates")
|
|
Playdates(session, track_sequence.current.track_id)
|
|
|
|
# Mark track as played in playlist
|
|
log.debug(f"{self}: Mark track as played")
|
|
plr = session.get(PlaylistRows, track_sequence.current.playlistrow_id)
|
|
if plr:
|
|
plr.played = True
|
|
self.refresh_row(session, plr.row_number)
|
|
else:
|
|
log.error(
|
|
f"{self}: Can't retrieve plr, {track_sequence.current.playlistrow_id=}"
|
|
)
|
|
|
|
# Update colour and times for current row
|
|
self.invalidate_row(row_number)
|
|
|
|
# Update previous row in case we're hiding played rows
|
|
if track_sequence.previous and track_sequence.previous.row_number:
|
|
self.invalidate_row(track_sequence.previous.row_number)
|
|
|
|
# Update all other track times
|
|
self.update_track_times()
|
|
|
|
# Find next track
|
|
# Get all unplayed track rows
|
|
log.debug(f"{self}: Find next track")
|
|
next_row = None
|
|
unplayed_rows = self.get_unplayed_rows()
|
|
if unplayed_rows:
|
|
try:
|
|
# Find next row after current track
|
|
next_row = min(
|
|
[
|
|
a
|
|
for a in unplayed_rows
|
|
if a > row_number and not self.is_header_row(a)
|
|
]
|
|
)
|
|
except ValueError:
|
|
# Find first unplayed track
|
|
next_row = min(unplayed_rows)
|
|
if next_row is not None:
|
|
self.set_next_row(next_row)
|
|
|
|
session.commit()
|
|
|
|
def data(
|
|
self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole
|
|
) -> QVariant:
|
|
"""Return data to view"""
|
|
|
|
if not index.isValid() or not (0 <= index.row() < len(self.playlist_rows)):
|
|
return QVariant()
|
|
|
|
row = index.row()
|
|
column = index.column()
|
|
# rat for playlist row data as it's used a lot
|
|
rat = self.playlist_rows[row]
|
|
|
|
# Dispatch to role-specific functions
|
|
dispatch_table = {
|
|
int(Qt.ItemDataRole.BackgroundRole): self.background_role,
|
|
int(Qt.ItemDataRole.DisplayRole): self.display_role,
|
|
int(Qt.ItemDataRole.EditRole): self.edit_role,
|
|
int(Qt.ItemDataRole.FontRole): self.font_role,
|
|
int(Qt.ItemDataRole.ToolTipRole): self.tooltip_role,
|
|
}
|
|
|
|
if role in dispatch_table:
|
|
return QVariant(dispatch_table[role](row, column, rat))
|
|
|
|
# Document other roles but don't use them
|
|
if role in [
|
|
Qt.ItemDataRole.DecorationRole,
|
|
Qt.ItemDataRole.StatusTipRole,
|
|
Qt.ItemDataRole.WhatsThisRole,
|
|
Qt.ItemDataRole.SizeHintRole,
|
|
Qt.ItemDataRole.TextAlignmentRole,
|
|
Qt.ItemDataRole.ForegroundRole,
|
|
Qt.ItemDataRole.CheckStateRole,
|
|
Qt.ItemDataRole.InitialSortOrderRole,
|
|
]:
|
|
return QVariant()
|
|
|
|
# Fall through to no-op
|
|
return QVariant()
|
|
|
|
def delete_rows(self, row_numbers: list[int]) -> None:
|
|
"""
|
|
Delete passed rows from model
|
|
|
|
Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows
|
|
calls. To keep it simple, if inefficient, delete rows one by one.
|
|
|
|
TODO: delete in blocks
|
|
|
|
Delete from highest row back so that not yet deleted row numbers don't change.
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
for row_number in sorted(row_numbers, reverse=True):
|
|
log.debug(f"{self}: delete_rows(), {row_number=}")
|
|
super().beginRemoveRows(QModelIndex(), row_number, row_number)
|
|
# We need to remove data from the underlying data store,
|
|
# which is the database, but we cache in
|
|
# self.playlist_rows, which is what calls to data()
|
|
# reads, so fixup that too.
|
|
PlaylistRows.delete_row(session, self.playlist_id, row_number)
|
|
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
|
|
self.refresh_data(session)
|
|
session.commit()
|
|
super().endRemoveRows()
|
|
|
|
self.reset_track_sequence_row_numbers()
|
|
|
|
def display_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant:
|
|
"""
|
|
Return text for display
|
|
"""
|
|
|
|
header_row = self.is_header_row(row)
|
|
|
|
# Set / reset column span
|
|
if column == HEADER_NOTES_COLUMN:
|
|
column_span = 1
|
|
if header_row:
|
|
column_span = self.columnCount() - 1
|
|
self.signals.span_cells_signal.emit(
|
|
self.playlist_id, row, HEADER_NOTES_COLUMN, 1, column_span
|
|
)
|
|
|
|
if header_row:
|
|
if column == HEADER_NOTES_COLUMN:
|
|
header_text = self.header_text(rat)
|
|
if not header_text:
|
|
return QVariant(Config.TEXT_NO_TRACK_NO_NOTE)
|
|
else:
|
|
return QVariant(self.header_text(rat))
|
|
else:
|
|
return QVariant()
|
|
|
|
if column == Col.START_TIME.value:
|
|
start_time = rat.forecast_start_time
|
|
if start_time:
|
|
return QVariant(start_time.strftime(Config.TRACK_TIME_FORMAT))
|
|
return QVariant()
|
|
|
|
if column == Col.END_TIME.value:
|
|
end_time = rat.forecast_end_time
|
|
if end_time:
|
|
return QVariant(end_time.strftime(Config.TRACK_TIME_FORMAT))
|
|
return QVariant()
|
|
|
|
if column == Col.INTRO.value:
|
|
if rat.intro:
|
|
return QVariant(f"{rat.intro / 1000:{Config.INTRO_SECONDS_FORMAT}}")
|
|
else:
|
|
return QVariant()
|
|
|
|
dispatch_table = {
|
|
Col.ARTIST.value: QVariant(rat.artist),
|
|
Col.BITRATE.value: QVariant(rat.bitrate),
|
|
Col.DURATION.value: QVariant(ms_to_mmss(rat.duration)),
|
|
Col.LAST_PLAYED.value: QVariant(get_relative_date(rat.lastplayed)),
|
|
Col.NOTE.value: QVariant(rat.note),
|
|
Col.START_GAP.value: QVariant(rat.start_gap),
|
|
Col.TITLE.value: QVariant(rat.title),
|
|
}
|
|
if column in dispatch_table:
|
|
return dispatch_table[column]
|
|
|
|
return QVariant()
|
|
|
|
def end_reset_model(self, playlist_id: int) -> None:
|
|
"""
|
|
End model reset if this is our playlist
|
|
"""
|
|
|
|
log.debug(f"{self}: end_reset_model({playlist_id=})")
|
|
|
|
if playlist_id != self.playlist_id:
|
|
log.debug(f"{self}: end_reset_model: not us ({self.playlist_id=})")
|
|
return
|
|
with db.Session() as session:
|
|
self.refresh_data(session)
|
|
super().endResetModel()
|
|
self.reset_track_sequence_row_numbers()
|
|
|
|
def edit_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant:
|
|
"""
|
|
Return text for editing
|
|
"""
|
|
|
|
# If this is a header row and we're being asked for the
|
|
# HEADER_NOTES_COLUMN, return the note value
|
|
if self.is_header_row(row) and column == HEADER_NOTES_COLUMN:
|
|
return QVariant(rat.note)
|
|
|
|
if column == Col.INTRO.value:
|
|
return QVariant(rat.intro)
|
|
if column == Col.TITLE.value:
|
|
return QVariant(rat.title)
|
|
if column == Col.ARTIST.value:
|
|
return QVariant(rat.artist)
|
|
if column == Col.NOTE.value:
|
|
return QVariant(rat.note)
|
|
|
|
return QVariant()
|
|
|
|
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
|
|
"""
|
|
Standard model flags
|
|
"""
|
|
|
|
if not index.isValid():
|
|
return Qt.ItemFlag.ItemIsDropEnabled
|
|
|
|
default = (
|
|
Qt.ItemFlag.ItemIsEnabled
|
|
| Qt.ItemFlag.ItemIsSelectable
|
|
| Qt.ItemFlag.ItemIsDragEnabled
|
|
)
|
|
if index.column() in [
|
|
Col.TITLE.value,
|
|
Col.ARTIST.value,
|
|
Col.NOTE.value,
|
|
Col.INTRO.value,
|
|
]:
|
|
return default | Qt.ItemFlag.ItemIsEditable
|
|
|
|
return default
|
|
|
|
def font_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant:
|
|
"""
|
|
Return font
|
|
"""
|
|
|
|
# Notes column is never bold
|
|
if column == Col.NOTE.value:
|
|
return QVariant()
|
|
|
|
boldfont = QFont()
|
|
boldfont.setBold(not self.playlist_rows[row].played)
|
|
|
|
return QVariant(boldfont)
|
|
|
|
def get_duplicate_rows(self) -> list[int]:
|
|
"""
|
|
Return a list of duplicate rows. If track appears in rows 2, 3 and 4, return [3, 4]
|
|
(ie, ignore the first, not-yet-duplicate, track).
|
|
"""
|
|
|
|
log.debug(f"{self}: get_duplicate_rows() called")
|
|
|
|
found = []
|
|
result = []
|
|
|
|
for i in range(len(self.playlist_rows)):
|
|
track_id = self.playlist_rows[i].track_id
|
|
if track_id is None:
|
|
continue
|
|
if track_id in found:
|
|
result.append(i)
|
|
else:
|
|
found.append(track_id)
|
|
|
|
log.debug(f"{self}: get_duplicate_rows() returned: {result=}")
|
|
return result
|
|
|
|
def _get_new_row_number(self, proposed_row_number: Optional[int]) -> int:
|
|
"""
|
|
Sanitises proposed new row number.
|
|
|
|
If proposed_row_number given, ensure it is valid.
|
|
If not given, return row number to add to end of model.
|
|
"""
|
|
|
|
log.debug(f"{self}: _get_new_row_number({proposed_row_number=})")
|
|
|
|
if proposed_row_number is None or proposed_row_number > len(self.playlist_rows):
|
|
# We are adding to the end of the list
|
|
new_row_number = len(self.playlist_rows)
|
|
elif proposed_row_number < 0:
|
|
# Add to start of list
|
|
new_row_number = 0
|
|
else:
|
|
new_row_number = proposed_row_number
|
|
|
|
log.debug(f"{self}: get_new_row_number() return: {new_row_number=}")
|
|
return new_row_number
|
|
|
|
def get_row_info(self, row_number: int) -> RowAndTrack:
|
|
"""
|
|
Return info about passed row
|
|
"""
|
|
|
|
return self.playlist_rows[row_number]
|
|
|
|
def get_row_track_id(self, row_number: int) -> Optional[int]:
|
|
"""
|
|
Return id of track associated with row or None if no track associated
|
|
"""
|
|
|
|
return self.playlist_rows[row_number].track_id
|
|
|
|
def get_row_track_path(self, row_number: int) -> str:
|
|
"""
|
|
Return path of track associated with row or empty string if no track associated
|
|
"""
|
|
|
|
return self.playlist_rows[row_number].path
|
|
|
|
def get_rows_duration(self, row_numbers: list[int]) -> int:
|
|
"""
|
|
Return the total duration of the passed rows
|
|
"""
|
|
|
|
duration = 0
|
|
for row_number in row_numbers:
|
|
duration += self.playlist_rows[row_number].duration
|
|
|
|
return duration
|
|
|
|
def get_unplayed_rows(self) -> list[int]:
|
|
"""
|
|
Return a list of unplayed row numbers
|
|
"""
|
|
|
|
result = [
|
|
a.row_number
|
|
for a in self.playlist_rows.values()
|
|
if not a.played and a.track_id is not None
|
|
]
|
|
# log.debug(f"{self}: get_unplayed_rows() returned: {result=}")
|
|
return result
|
|
|
|
def headerData(
|
|
self,
|
|
section: int,
|
|
orientation: Qt.Orientation,
|
|
role: int = Qt.ItemDataRole.DisplayRole,
|
|
) -> QVariant:
|
|
"""
|
|
Return text for headers
|
|
"""
|
|
|
|
display_dispatch_table = {
|
|
Col.START_GAP.value: QVariant(Config.HEADER_START_GAP),
|
|
Col.INTRO.value: QVariant(Config.HEADER_INTRO),
|
|
Col.TITLE.value: QVariant(Config.HEADER_TITLE),
|
|
Col.ARTIST.value: QVariant(Config.HEADER_ARTIST),
|
|
Col.DURATION.value: QVariant(Config.HEADER_DURATION),
|
|
Col.START_TIME.value: QVariant(Config.HEADER_START_TIME),
|
|
Col.END_TIME.value: QVariant(Config.HEADER_END_TIME),
|
|
Col.LAST_PLAYED.value: QVariant(Config.HEADER_LAST_PLAYED),
|
|
Col.BITRATE.value: QVariant(Config.HEADER_BITRATE),
|
|
Col.NOTE.value: QVariant(Config.HEADER_NOTE),
|
|
}
|
|
|
|
if role == Qt.ItemDataRole.DisplayRole:
|
|
if orientation == Qt.Orientation.Horizontal:
|
|
return display_dispatch_table[section]
|
|
else:
|
|
if Config.ROWS_FROM_ZERO:
|
|
return QVariant(str(section))
|
|
else:
|
|
return QVariant(str(section + 1))
|
|
|
|
elif role == Qt.ItemDataRole.FontRole:
|
|
boldfont = QFont()
|
|
boldfont.setBold(True)
|
|
return QVariant(boldfont)
|
|
|
|
return QVariant()
|
|
|
|
def header_text(self, rat: RowAndTrack) -> str:
|
|
"""
|
|
Process possible section timing directives embeded in header
|
|
"""
|
|
|
|
if rat.note.endswith("+"):
|
|
return self.start_of_timed_section_header(rat)
|
|
|
|
elif rat.note.endswith("="):
|
|
return self.section_subtotal_header(rat)
|
|
|
|
elif rat.note == "-":
|
|
# If the hyphen is the only thing on the line, echo the note
|
|
# that started the section without the trailing "+".
|
|
for row_number in range(rat.row_number - 1, -1, -1):
|
|
row_rat = self.playlist_rows[row_number]
|
|
if self.is_header_row(row_number):
|
|
if row_rat.note.endswith("-"):
|
|
# We didn't find a matching section start
|
|
break
|
|
if row_rat.note.endswith("+"):
|
|
return f"[End: {row_rat.note[:-1]}]"
|
|
return "-"
|
|
|
|
return rat.note
|
|
|
|
def hide_played_tracks(self, hide: bool) -> None:
|
|
"""
|
|
Set played tracks hidden according to 'hide'
|
|
"""
|
|
|
|
self.played_tracks_hidden = hide
|
|
for row_number in range(len(self.playlist_rows)):
|
|
if self.is_played_row(row_number):
|
|
self.invalidate_row(row_number)
|
|
|
|
def insert_row(
|
|
self,
|
|
proposed_row_number: Optional[int],
|
|
track_id: Optional[int] = None,
|
|
note: str = "",
|
|
) -> None:
|
|
"""
|
|
Insert a row.
|
|
"""
|
|
|
|
log.debug(f"{self}: insert_row({proposed_row_number=}, {track_id=}, {note=})")
|
|
|
|
new_row_number = self._get_new_row_number(proposed_row_number)
|
|
|
|
with db.Session() as session:
|
|
super().beginInsertRows(QModelIndex(), new_row_number, new_row_number)
|
|
_ = PlaylistRows.insert_row(
|
|
session=session,
|
|
playlist_id=self.playlist_id,
|
|
new_row_number=new_row_number,
|
|
note=note,
|
|
track_id=track_id,
|
|
)
|
|
session.commit()
|
|
|
|
self.refresh_data(session)
|
|
super().endInsertRows()
|
|
|
|
self.signals.resize_rows_signal.emit(self.playlist_id)
|
|
self.reset_track_sequence_row_numbers()
|
|
self.invalidate_rows(list(range(new_row_number, len(self.playlist_rows))))
|
|
|
|
def invalidate_row(self, modified_row: int) -> None:
|
|
"""
|
|
Signal to view to refresh invalidated row
|
|
"""
|
|
|
|
self.dataChanged.emit(
|
|
self.index(modified_row, 0),
|
|
self.index(modified_row, self.columnCount() - 1),
|
|
)
|
|
|
|
def invalidate_rows(self, modified_rows: list[int]) -> None:
|
|
"""
|
|
Signal to view to refresh invlidated rows
|
|
"""
|
|
|
|
for modified_row in modified_rows:
|
|
self.invalidate_row(modified_row)
|
|
|
|
def is_header_row(self, row_number: int) -> bool:
|
|
"""
|
|
Return True if row is a header row, else False
|
|
"""
|
|
|
|
if row_number in self.playlist_rows:
|
|
return self.playlist_rows[row_number].path == ""
|
|
return False
|
|
|
|
def is_played_row(self, row_number: int) -> bool:
|
|
"""
|
|
Return True if row is an unplayed track row, else False
|
|
"""
|
|
|
|
return self.playlist_rows[row_number].played
|
|
|
|
def is_track_in_playlist(self, track_id: int) -> Optional[RowAndTrack]:
|
|
"""
|
|
If this track_id is in the playlist, return the RowAndTrack object
|
|
else return None
|
|
"""
|
|
|
|
for row_number in range(len(self.playlist_rows)):
|
|
if self.playlist_rows[row_number].track_id == track_id:
|
|
return self.playlist_rows[row_number]
|
|
|
|
return None
|
|
|
|
def mark_unplayed(self, row_numbers: list[int]) -> None:
|
|
"""
|
|
Mark row as unplayed
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
for row_number in row_numbers:
|
|
playlist_row = session.get(
|
|
PlaylistRows, self.playlist_rows[row_number].playlistrow_id
|
|
)
|
|
if not playlist_row:
|
|
return
|
|
playlist_row.played = False
|
|
session.commit()
|
|
self.refresh_row(session, row_number)
|
|
|
|
self.update_track_times()
|
|
self.invalidate_rows(row_numbers)
|
|
|
|
def move_rows(self, from_rows: list[int], to_row_number: int) -> None:
|
|
"""
|
|
Move the playlist rows given to to_row and below.
|
|
"""
|
|
|
|
log.debug(f"{self}: move_rows({from_rows=}, {to_row_number=}")
|
|
|
|
# Build a {current_row_number: new_row_number} dictionary
|
|
row_map: dict[int, int] = {}
|
|
|
|
# The destination row number will need to be reduced by the
|
|
# number of rows being move from above the destination row
|
|
# otherwise rows below the destination row will end up above the
|
|
# moved rows.
|
|
adjusted_to_row = to_row_number - len(
|
|
[a for a in from_rows if a < to_row_number]
|
|
)
|
|
|
|
# Put the from_row row numbers into the row_map. Ultimately the
|
|
# total number of elements in the playlist doesn't change, so
|
|
# check that adding the moved rows starting at to_row won't
|
|
# overshoot the end of the playlist.
|
|
if adjusted_to_row + len(from_rows) > len(self.playlist_rows):
|
|
next_to_row = len(self.playlist_rows) - len(from_rows)
|
|
else:
|
|
next_to_row = adjusted_to_row
|
|
|
|
# zip iterates from_row and to_row simultaneously from the
|
|
# respective sequences inside zip()
|
|
for from_row, to_row in zip(
|
|
from_rows, range(next_to_row, next_to_row + len(from_rows))
|
|
):
|
|
row_map[from_row] = to_row
|
|
|
|
# Move the remaining rows to the row_map. We want to fill it
|
|
# before (if there are gaps) and after (likewise) the rows that
|
|
# are moving.
|
|
# zip iterates old_row and new_row simultaneously from the
|
|
# respective sequences inside zip()
|
|
for old_row, new_row in zip(
|
|
[x for x in self.playlist_rows.keys() if x not in from_rows],
|
|
[y for y in range(len(self.playlist_rows)) if y not in row_map.values()],
|
|
):
|
|
# Optimise: only add to map if there is a change
|
|
if old_row != new_row:
|
|
row_map[old_row] = new_row
|
|
|
|
# Check to see whether any rows in track_sequence have moved
|
|
if track_sequence.previous and track_sequence.previous.row_number in row_map:
|
|
track_sequence.previous.row_number = row_map[
|
|
track_sequence.previous.row_number
|
|
]
|
|
if track_sequence.current and track_sequence.current.row_number in row_map:
|
|
track_sequence.current.row_number = row_map[
|
|
track_sequence.current.row_number
|
|
]
|
|
if track_sequence.next and track_sequence.next.row_number in row_map:
|
|
track_sequence.next.row_number = row_map[track_sequence.next.row_number]
|
|
|
|
# For SQLAlchemy, build a list of dictionaries that map playlistrow_id to
|
|
# new row number:
|
|
sqla_map: list[dict[str, int]] = []
|
|
for oldrow, newrow in row_map.items():
|
|
playlistrow_id = self.playlist_rows[oldrow].playlistrow_id
|
|
sqla_map.append({"playlistrow_id": playlistrow_id, "row_number": newrow})
|
|
|
|
with db.Session() as session:
|
|
PlaylistRows.update_plr_row_numbers(session, self.playlist_id, sqla_map)
|
|
session.commit()
|
|
# Update playlist_rows
|
|
self.refresh_data(session)
|
|
|
|
# Update display
|
|
self.reset_track_sequence_row_numbers()
|
|
self.update_track_times()
|
|
self.invalidate_rows(list(row_map.keys()))
|
|
|
|
def move_rows_between_playlists(
|
|
self, from_rows: list[int], to_row_number: int, to_playlist_id: int
|
|
) -> None:
|
|
"""
|
|
Move the playlist rows given to to_row and below of to_playlist.
|
|
"""
|
|
|
|
log.debug(
|
|
f"{self}: move_rows_between_playlists({from_rows=}, "
|
|
f"{to_row_number=}, {to_playlist_id=}"
|
|
)
|
|
|
|
# Row removal must be wrapped in beginRemoveRows ..
|
|
# endRemoveRows and the row range must be contiguous. Process
|
|
# the highest rows first so the lower row numbers are unchanged
|
|
row_groups = self._reversed_contiguous_row_groups(from_rows)
|
|
next_to_row = to_row_number
|
|
|
|
# Prepare destination playlist for a reset
|
|
self.signals.begin_reset_model_signal.emit(to_playlist_id)
|
|
|
|
with db.Session() as session:
|
|
# Make room in destination playlist
|
|
max_destination_row_number = PlaylistRows.get_last_used_row(
|
|
session, to_playlist_id
|
|
)
|
|
if (
|
|
max_destination_row_number
|
|
and to_row_number <= max_destination_row_number
|
|
):
|
|
# Move the destination playlist rows down to make room.
|
|
PlaylistRows.move_rows_down(
|
|
session, to_playlist_id, to_row_number, len(from_rows)
|
|
)
|
|
|
|
for row_group in row_groups:
|
|
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
|
|
for playlist_row in PlaylistRows.plrids_to_plrs(
|
|
session,
|
|
self.playlist_id,
|
|
[self.playlist_rows[a].playlistrow_id for a in row_group],
|
|
):
|
|
if (
|
|
track_sequence.current
|
|
and playlist_row.id == track_sequence.current.playlistrow_id
|
|
):
|
|
# Don't move current track
|
|
continue
|
|
playlist_row.playlist_id = to_playlist_id
|
|
playlist_row.row_number = next_to_row
|
|
next_to_row += 1
|
|
self.refresh_data(session)
|
|
super().endRemoveRows()
|
|
# We need to remove gaps in row numbers after tracks have
|
|
# moved.
|
|
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
|
|
self.refresh_data(session)
|
|
session.commit()
|
|
|
|
# Reset of model must come after session has been closed
|
|
self.reset_track_sequence_row_numbers()
|
|
self.signals.end_reset_model_signal.emit(to_playlist_id)
|
|
self.update_track_times()
|
|
|
|
def move_track_add_note(
|
|
self, new_row_number: int, existing_rat: RowAndTrack, note: str
|
|
) -> None:
|
|
"""
|
|
Move existing_rat track to new_row_number and append note to any existing note
|
|
"""
|
|
|
|
log.debug(f"{self}: move_track_add_note({new_row_number=}, {existing_rat=}, {note=}")
|
|
|
|
if note:
|
|
with db.Session() as session:
|
|
playlist_row = session.get(PlaylistRows, existing_rat.playlistrow_id)
|
|
if playlist_row:
|
|
if playlist_row.note:
|
|
playlist_row.note += "\n" + note
|
|
else:
|
|
playlist_row.note = note
|
|
session.commit()
|
|
|
|
# Carry out the move outside of the session context to ensure
|
|
# database updated with any note change
|
|
self.move_rows([existing_rat.row_number], new_row_number)
|
|
self.signals.resize_rows_signal.emit(self.playlist_id)
|
|
|
|
def move_track_to_header(
|
|
self,
|
|
header_row_number: int,
|
|
existing_rat: RowAndTrack,
|
|
note: Optional[str],
|
|
) -> None:
|
|
"""
|
|
Add the existing_rat track details to the existing header at header_row_number
|
|
"""
|
|
|
|
log.debug(f"{self}: move_track_to_header({header_row_number=}, {existing_rat=}, {note=}")
|
|
|
|
if existing_rat.track_id:
|
|
if note and existing_rat.note:
|
|
note += "\n" + existing_rat.note
|
|
self.add_track_to_header(header_row_number, existing_rat.track_id, note)
|
|
self.delete_rows([existing_rat.row_number])
|
|
|
|
def obs_scene_change(self, row_number: int) -> None:
|
|
"""
|
|
Check this row and any preceding headers for OBS scene change command
|
|
and execute any found
|
|
"""
|
|
|
|
log.debug(f"{self}: obs_scene_change({row_number=})")
|
|
|
|
# Check any headers before this row
|
|
idx = row_number - 1
|
|
while self.is_header_row(idx):
|
|
idx -= 1
|
|
# Step through headers in row order and finish with this row
|
|
for chkrow in range(idx + 1, row_number + 1):
|
|
match_obj = scene_change_re.search(self.playlist_rows[chkrow].note)
|
|
if match_obj:
|
|
scene_name = match_obj.group(1)
|
|
if scene_name:
|
|
ws = obswebsocket.obsws(
|
|
host=Config.OBS_HOST,
|
|
port=Config.OBS_PORT,
|
|
password=Config.OBS_PASSWORD,
|
|
)
|
|
try:
|
|
ws.connect()
|
|
ws.call(
|
|
obswebsocket.requests.SetCurrentProgramScene(
|
|
sceneName=scene_name
|
|
)
|
|
)
|
|
log.debug(f"{self}: OBS scene changed to '{scene_name}'")
|
|
continue
|
|
except obswebsocket.exceptions.ConnectionFailure:
|
|
log.error(f"{self}: OBS connection refused")
|
|
return
|
|
|
|
def previous_track_ended(self) -> None:
|
|
"""
|
|
Notification from musicmuster that the previous track has ended.
|
|
|
|
Actions required:
|
|
- sanity check
|
|
- update display
|
|
"""
|
|
|
|
log.debug(f"{self}: previous_track_ended()")
|
|
|
|
# Sanity check
|
|
if not track_sequence.previous:
|
|
log.error(f"{self}: playlistmodel:previous_track_ended called with no current track")
|
|
return
|
|
if track_sequence.previous.row_number is None:
|
|
log.error(
|
|
f"{self}: previous_track_ended called with no row number "
|
|
f"({track_sequence.previous=})"
|
|
)
|
|
return
|
|
|
|
# Update display
|
|
self.invalidate_row(track_sequence.previous.row_number)
|
|
|
|
def refresh_data(self, session: db.session) -> None:
|
|
"""Populate dicts for data calls"""
|
|
|
|
# Populate self.playlist_rows with playlist data
|
|
self.playlist_rows.clear()
|
|
for p in PlaylistRows.deep_rows(session, self.playlist_id):
|
|
self.playlist_rows[p.row_number] = RowAndTrack(p)
|
|
|
|
def refresh_row(self, session, row_number):
|
|
"""Populate dict for one row from database"""
|
|
|
|
p = PlaylistRows.deep_row(session, self.playlist_id, row_number)
|
|
self.playlist_rows[row_number] = RowAndTrack(p)
|
|
|
|
def remove_track(self, row_number: int) -> None:
|
|
"""
|
|
Remove track from row, retaining row as a header row
|
|
"""
|
|
|
|
log.debug(f"{self}: remove_track({row_number=})")
|
|
|
|
with db.Session() as session:
|
|
playlist_row = session.get(
|
|
PlaylistRows, self.playlist_rows[row_number].playlistrow_id
|
|
)
|
|
if playlist_row:
|
|
playlist_row.track_id = None
|
|
session.commit()
|
|
self.refresh_row(session, row_number)
|
|
self.invalidate_row(row_number)
|
|
|
|
def rescan_track(self, row_number: int) -> None:
|
|
"""
|
|
Rescan track at passed row number
|
|
"""
|
|
|
|
track_id = self.playlist_rows[row_number].track_id
|
|
if track_id:
|
|
with db.Session() as session:
|
|
track = session.get(Tracks, track_id)
|
|
set_track_metadata(track)
|
|
self.refresh_row(session, row_number)
|
|
self.update_track_times()
|
|
self.invalidate_row(row_number)
|
|
self.signals.resize_rows_signal.emit(self.playlist_id)
|
|
session.commit()
|
|
|
|
def reset_track_sequence_row_numbers(self) -> None:
|
|
"""
|
|
Signal handler for when row ordering has changed.
|
|
|
|
Example: row 4 is marked as next. Row 2 is deleted. The PlaylistRows table will
|
|
be correctly updated with change of row number, but track_sequence.next will still
|
|
contain row_number==4. This function fixes up the track_sequence row numbers by
|
|
looking up the playlistrow_id and retrieving the row number from the database.
|
|
"""
|
|
|
|
log.debug(f"{self}: reset_track_sequence_row_numbers()")
|
|
|
|
# Check the track_sequence.next, current and previous plrs and
|
|
# update the row number
|
|
with db.Session() as session:
|
|
for ts in [
|
|
track_sequence.next,
|
|
track_sequence.current,
|
|
track_sequence.previous,
|
|
]:
|
|
if ts and ts.playlist_id == self.playlist_id and ts.row_number:
|
|
playlist_row = session.get(PlaylistRows, ts.playlistrow_id)
|
|
if playlist_row and playlist_row.row_number != ts.row_number:
|
|
ts.row_number = playlist_row.row_number
|
|
|
|
self.update_track_times()
|
|
|
|
def _reversed_contiguous_row_groups(
|
|
self, row_numbers: list[int]
|
|
) -> list[list[int]]:
|
|
"""
|
|
Take the list of row numbers and split into groups of contiguous rows. Return as a list
|
|
of lists with the highest row numbers first.
|
|
|
|
Example:
|
|
input [2, 3, 4, 5, 7, 9, 10, 13, 17, 20, 21]
|
|
return: [[20, 21], [17], [13], [9, 10], [7], [2, 3, 4, 5]]
|
|
"""
|
|
|
|
log.debug(f"{self}: _reversed_contiguous_row_groups({row_numbers=} called")
|
|
|
|
result: list[list[int]] = []
|
|
temp: list[int] = []
|
|
last_value = row_numbers[0] - 1
|
|
|
|
for idx in range(len(row_numbers)):
|
|
if row_numbers[idx] != last_value + 1:
|
|
result.append(temp)
|
|
temp = []
|
|
last_value = row_numbers[idx]
|
|
temp.append(last_value)
|
|
if temp:
|
|
result.append(temp)
|
|
result.reverse()
|
|
|
|
log.debug(f"{self}: _reversed_contiguous_row_groups() returned: {result=}")
|
|
return result
|
|
|
|
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
|
|
"""Standard function for view"""
|
|
|
|
return len(self.playlist_rows)
|
|
|
|
def section_subtotal_header(self, rat: RowAndTrack) -> str:
|
|
"""
|
|
Process this row as subtotal within a timed section and
|
|
return display text for this row
|
|
"""
|
|
|
|
count: int = 0
|
|
unplayed_count: int = 0
|
|
duration: int = 0
|
|
|
|
# Show subtotal
|
|
for row_number in range(rat.row_number - 1, -1, -1):
|
|
row_rat = self.playlist_rows[row_number]
|
|
if self.is_header_row(row_number):
|
|
if row_rat.note.endswith("-"):
|
|
# There was no start of section
|
|
return rat.note
|
|
if row_rat.note.endswith(("+", "=")):
|
|
# If we are playing this section, also
|
|
# calculate end time if all tracks are played.
|
|
end_time_str = ""
|
|
if (
|
|
track_sequence.current
|
|
and track_sequence.current.end_time
|
|
and (
|
|
row_number
|
|
< track_sequence.current.row_number
|
|
< rat.row_number
|
|
)
|
|
):
|
|
section_end_time = (
|
|
track_sequence.current.end_time
|
|
+ dt.timedelta(milliseconds=duration)
|
|
)
|
|
end_time_str = (
|
|
", section end time "
|
|
+ section_end_time.strftime(Config.TRACK_TIME_FORMAT)
|
|
)
|
|
stripped_note = rat.note[:-1].strip()
|
|
if stripped_note:
|
|
return (
|
|
f"{stripped_note} ["
|
|
f"{unplayed_count}/{count} track{'s' if count > 1 else ''} "
|
|
f"({ms_to_mmss(duration)}) unplayed{end_time_str}]"
|
|
)
|
|
else:
|
|
return (
|
|
f"[{unplayed_count}/{count} track{'s' if count > 1 else ''} "
|
|
f"({ms_to_mmss(duration)}) unplayed{end_time_str}]"
|
|
)
|
|
else:
|
|
continue
|
|
else:
|
|
count += 1
|
|
if not row_rat.played:
|
|
unplayed_count += 1
|
|
duration += row_rat.duration
|
|
|
|
# Should never get here
|
|
return f"Error calculating subtotal ({row_rat.note})"
|
|
|
|
def selection_is_sortable(self, row_numbers: list[int]) -> bool:
|
|
"""
|
|
Return True if the selection is sortable. That means:
|
|
- at least two rows selected
|
|
- selected rows are contiguous
|
|
- selected rows do not include any header rows
|
|
"""
|
|
|
|
# at least two rows selected
|
|
if len(row_numbers) < 2:
|
|
return False
|
|
|
|
# selected rows are contiguous
|
|
if sorted(row_numbers) != list(range(min(row_numbers), max(row_numbers) + 1)):
|
|
return False
|
|
|
|
# selected rows do not include any header rows
|
|
for row_number in row_numbers:
|
|
if self.is_header_row(row_number):
|
|
return False
|
|
|
|
return True
|
|
|
|
def set_next_row(self, row_number: Optional[int]) -> None:
|
|
"""
|
|
Set row_number as next track. If row_number is None, clear next track.
|
|
|
|
Return True if successful else False.
|
|
"""
|
|
|
|
log.debug(f"{self}: set_next_row({row_number=})")
|
|
|
|
if row_number is None:
|
|
# Clear next track
|
|
if track_sequence.next is not None:
|
|
track_sequence.set_next(None)
|
|
else:
|
|
# Get playlistrow_id of row
|
|
try:
|
|
rat = self.playlist_rows[row_number]
|
|
except IndexError:
|
|
log.error(f"{self} set_track_sequence.next({row_number=}, IndexError")
|
|
return
|
|
if rat.track_id is None or rat.row_number is None:
|
|
log.error(
|
|
f"{self} .set_track_sequence.next({row_number=}, "
|
|
f"No track / row number {rat.track_id=}, {rat.row_number=}"
|
|
)
|
|
return
|
|
|
|
old_next_row: Optional[int] = None
|
|
if track_sequence.next:
|
|
old_next_row = track_sequence.next.row_number
|
|
|
|
track_sequence.set_next(rat)
|
|
|
|
if Config.WIKIPEDIA_ON_NEXT:
|
|
self.signals.search_wikipedia_signal.emit(
|
|
self.playlist_rows[row_number].title
|
|
)
|
|
if Config.SONGFACTS_ON_NEXT:
|
|
self.signals.search_songfacts_signal.emit(
|
|
self.playlist_rows[row_number].title
|
|
)
|
|
if old_next_row is not None:
|
|
self.invalidate_row(old_next_row)
|
|
self.invalidate_row(row_number)
|
|
|
|
self.signals.next_track_changed_signal.emit()
|
|
self.update_track_times()
|
|
|
|
def setData(
|
|
self,
|
|
index: QModelIndex,
|
|
value: str | float,
|
|
role: int = Qt.ItemDataRole.EditRole,
|
|
) -> bool:
|
|
"""
|
|
Update model with edited data
|
|
"""
|
|
|
|
if not index.isValid() or role != Qt.ItemDataRole.EditRole:
|
|
return False
|
|
|
|
row_number = index.row()
|
|
column = index.column()
|
|
|
|
with db.Session() as session:
|
|
playlist_row = session.get(
|
|
PlaylistRows, self.playlist_rows[row_number].playlistrow_id
|
|
)
|
|
if not playlist_row:
|
|
log.error(f"{self}: Error saving data: {row_number=}, {column=}, {value=}")
|
|
return False
|
|
|
|
if playlist_row.track_id:
|
|
if column in [Col.TITLE.value, Col.ARTIST.value, Col.INTRO.value]:
|
|
track = session.get(Tracks, playlist_row.track_id)
|
|
if not track:
|
|
log.error(f"{self}: Error retreiving track: {playlist_row=}")
|
|
return False
|
|
if column == Col.TITLE.value:
|
|
track.title = str(value)
|
|
elif column == Col.ARTIST.value:
|
|
track.artist = str(value)
|
|
elif column == Col.INTRO.value:
|
|
track.intro = int(round(float(value), 1) * 1000)
|
|
else:
|
|
log.error(f"{self}: Error updating track: {column=}, {value=}")
|
|
return False
|
|
elif column == Col.NOTE.value:
|
|
playlist_row.note = str(value)
|
|
|
|
else:
|
|
# This is a header row
|
|
if column == HEADER_NOTES_COLUMN:
|
|
playlist_row.note = str(value)
|
|
|
|
# commit changes before refreshing data
|
|
session.commit()
|
|
self.refresh_row(session, row_number)
|
|
self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role])
|
|
|
|
return True
|
|
|
|
def sort_by_artist(self, row_numbers: list[int]) -> None:
|
|
"""
|
|
Sort selected rows by artist
|
|
"""
|
|
|
|
self.sort_by_attribute(row_numbers, "artist")
|
|
|
|
def sort_by_attribute(self, row_numbers: list[int], attr_name: str) -> None:
|
|
"""
|
|
Sort selected rows by passed attribute name where 'attribute' is a
|
|
key in PlaylistRowData
|
|
"""
|
|
|
|
# Create a subset of playlist_rows with the rows we are
|
|
# interested in
|
|
shortlist_rows = {k: self.playlist_rows[k] for k in row_numbers}
|
|
sorted_list = [
|
|
playlist_row.row_number
|
|
for playlist_row in sorted(shortlist_rows.values(), key=attrgetter(attr_name))
|
|
]
|
|
self.move_rows(sorted_list, min(sorted_list))
|
|
|
|
def sort_by_duration(self, row_numbers: list[int]) -> None:
|
|
"""
|
|
Sort selected rows by duration
|
|
"""
|
|
|
|
self.sort_by_attribute(row_numbers, "duration")
|
|
|
|
def sort_by_lastplayed(self, row_numbers: list[int]) -> None:
|
|
"""
|
|
Sort selected rows by lastplayed
|
|
"""
|
|
|
|
self.sort_by_attribute(row_numbers, "lastplayed")
|
|
|
|
def sort_randomly(self, row_numbers: list[int]) -> None:
|
|
"""
|
|
Sort selected rows randomly
|
|
"""
|
|
|
|
shuffle(row_numbers)
|
|
self.move_rows(row_numbers, min(row_numbers))
|
|
|
|
def sort_by_title(self, row_numbers: list[int]) -> None:
|
|
"""
|
|
Sort selected rows by title
|
|
"""
|
|
|
|
self.sort_by_attribute(row_numbers, "title")
|
|
|
|
def start_of_timed_section_header(self, rat: RowAndTrack) -> str:
|
|
"""
|
|
Process this row as the start of a timed section and
|
|
return display text for this row
|
|
"""
|
|
|
|
count: int = 0
|
|
unplayed_count: int = 0
|
|
duration: int = 0
|
|
|
|
for row_number in range(rat.row_number + 1, len(self.playlist_rows)):
|
|
row_rat = self.playlist_rows[row_number]
|
|
if self.is_header_row(row_number):
|
|
if row_rat.note.endswith("-"):
|
|
return (
|
|
f"{rat.note[:-1].strip()} "
|
|
f"[{count} tracks, {ms_to_mmss(duration)} unplayed]"
|
|
)
|
|
else:
|
|
continue
|
|
else:
|
|
count += 1
|
|
if not row_rat.played:
|
|
unplayed_count += 1
|
|
duration += row_rat.duration
|
|
return (
|
|
f"{rat.note[:-1].strip()} "
|
|
f"[{count} tracks, {ms_to_mmss(duration, none='none')} "
|
|
"unplayed (to end of playlist)]"
|
|
)
|
|
|
|
def supportedDropActions(self) -> Qt.DropAction:
|
|
return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction
|
|
|
|
def tooltip_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant:
|
|
"""
|
|
Return tooltip. Currently only used for last_played column.
|
|
"""
|
|
|
|
if column != Col.LAST_PLAYED.value:
|
|
return QVariant()
|
|
with db.Session() as session:
|
|
track_id = self.playlist_rows[row].track_id
|
|
if not track_id:
|
|
return QVariant()
|
|
playdates = Playdates.last_playdates(session, track_id)
|
|
return QVariant(
|
|
"<br>".join(
|
|
[
|
|
a.lastplayed.strftime(Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT)
|
|
for a in reversed(playdates)
|
|
]
|
|
)
|
|
)
|
|
|
|
def update_track_times(self) -> None:
|
|
"""
|
|
Update track start/end times in self.playlist_rows
|
|
"""
|
|
|
|
log.debug(f"{self}: update_track_times()")
|
|
|
|
next_start_time: Optional[dt.datetime] = None
|
|
update_rows: list[int] = []
|
|
row_count = len(self.playlist_rows)
|
|
|
|
current_track_row = None
|
|
next_track_row = None
|
|
if (
|
|
track_sequence.current
|
|
and track_sequence.current.playlist_id == self.playlist_id
|
|
):
|
|
current_track_row = track_sequence.current.row_number
|
|
# Update current track details now so that they are available
|
|
# when we deal with next track row which may be above current
|
|
# track row.
|
|
self.playlist_rows[current_track_row].set_forecast_start_time(
|
|
update_rows, track_sequence.current.start_time
|
|
)
|
|
|
|
if track_sequence.next and track_sequence.next.playlist_id == self.playlist_id:
|
|
next_track_row = track_sequence.next.row_number
|
|
|
|
for row_number in range(row_count):
|
|
rat = self.playlist_rows[row_number]
|
|
|
|
# Don't update times for tracks that have been played, for
|
|
# unreadable tracks or for the current track, handled above.
|
|
if (
|
|
rat.played
|
|
or row_number == current_track_row
|
|
or (rat.path and file_is_unreadable(rat.path))
|
|
):
|
|
continue
|
|
|
|
# Reset start time if timing in header
|
|
if self.is_header_row(row_number):
|
|
header_time = get_embedded_time(rat.note)
|
|
if header_time:
|
|
next_start_time = header_time
|
|
continue
|
|
|
|
# Set start time for next row if we have a current track
|
|
if (
|
|
row_number == next_track_row
|
|
and track_sequence.current
|
|
and track_sequence.current.end_time
|
|
):
|
|
next_start_time = rat.set_forecast_start_time(
|
|
update_rows, track_sequence.current.end_time
|
|
)
|
|
continue
|
|
|
|
# If we're between the current and next row, zero out
|
|
# times
|
|
if (current_track_row or row_count) < row_number < (next_track_row or 0):
|
|
rat.set_forecast_start_time(update_rows, None)
|
|
continue
|
|
|
|
# Set start/end
|
|
next_start_time = rat.set_forecast_start_time(update_rows, next_start_time)
|
|
|
|
# Update start/stop times of rows that have changed
|
|
for updated_row in update_rows:
|
|
self.dataChanged.emit(
|
|
self.index(updated_row, Col.START_TIME.value),
|
|
self.index(updated_row, Col.END_TIME.value),
|
|
)
|
|
|
|
|
|
class PlaylistProxyModel(QSortFilterProxyModel):
|
|
"""
|
|
For searching and filtering
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
source_model: PlaylistModel,
|
|
*args: QObject,
|
|
**kwargs: QObject,
|
|
) -> None:
|
|
self.source_model = source_model
|
|
super().__init__(*args, **kwargs)
|
|
|
|
self.setSourceModel(source_model)
|
|
# Search all columns
|
|
self.setFilterKeyColumn(-1)
|
|
|
|
def __repr__(self) -> str:
|
|
return f"<PlaylistProxyModel: source_model={self.source_model}>"
|
|
|
|
def filterAcceptsRow(self, source_row: int, source_parent: QModelIndex) -> bool:
|
|
"""
|
|
Subclass to filter by played status. Return True to show this row, False to hide it.
|
|
"""
|
|
|
|
if self.source_model.played_tracks_hidden:
|
|
if self.source_model.is_played_row(source_row):
|
|
# Don't hide current track
|
|
if (
|
|
track_sequence.current
|
|
and track_sequence.current.playlist_id
|
|
== self.source_model.playlist_id
|
|
and track_sequence.current.row_number == source_row
|
|
):
|
|
return True
|
|
|
|
# Don't hide next track
|
|
if (
|
|
track_sequence.next
|
|
and track_sequence.next.playlist_id == self.source_model.playlist_id
|
|
and track_sequence.next.row_number == source_row
|
|
):
|
|
return True
|
|
|
|
# Handle previous track
|
|
if track_sequence.previous:
|
|
if (
|
|
track_sequence.previous.playlist_id
|
|
!= self.source_model.playlist_id
|
|
or track_sequence.previous.row_number != source_row
|
|
):
|
|
# This row isn't our previous track: hide it
|
|
return False
|
|
if track_sequence.current and track_sequence.current.start_time:
|
|
# This row is our previous track. Don't hide it
|
|
# until HIDE_AFTER_PLAYING_OFFSET milliseconds
|
|
# after current track has started
|
|
if track_sequence.current.start_time and dt.datetime.now() > (
|
|
track_sequence.current.start_time
|
|
+ dt.timedelta(
|
|
milliseconds=Config.HIDE_AFTER_PLAYING_OFFSET
|
|
)
|
|
):
|
|
return False
|
|
else:
|
|
# Invalidate this row in
|
|
# HIDE_AFTER_PLAYING_OFFSET and a bit
|
|
# milliseconds so that it hides then. We add
|
|
# 100mS on so that the if clause above is
|
|
# true next time through.
|
|
QTimer.singleShot(
|
|
Config.HIDE_AFTER_PLAYING_OFFSET + 100,
|
|
lambda: self.source_model.invalidate_row(source_row),
|
|
)
|
|
return True
|
|
# Next track not playing yet so don't hide previous
|
|
else:
|
|
return True
|
|
|
|
# No previous track so hide this played track immediately
|
|
return False
|
|
|
|
return super().filterAcceptsRow(source_row, source_parent)
|
|
|
|
def set_incremental_search(self, search_string: str) -> None:
|
|
"""
|
|
Update search pattern
|
|
"""
|
|
|
|
self.setFilterRegularExpression(
|
|
QRegularExpression(
|
|
search_string, QRegularExpression.PatternOption.CaseInsensitiveOption
|
|
)
|
|
)
|
|
|
|
# ######################################
|
|
# Forward functions not handled in proxy
|
|
# ######################################
|
|
|
|
def current_track_started(self):
|
|
return self.source_model.current_track_started()
|
|
|
|
def delete_rows(self, row_numbers: list[int]) -> None:
|
|
return self.source_model.delete_rows(row_numbers)
|
|
|
|
def get_duplicate_rows(self) -> list[int]:
|
|
return self.source_model.get_duplicate_rows()
|
|
|
|
def get_rows_duration(self, row_numbers: list[int]) -> int:
|
|
return self.source_model.get_rows_duration(row_numbers)
|
|
|
|
def get_row_info(self, row_number: int) -> RowAndTrack:
|
|
return self.source_model.get_row_info(row_number)
|
|
|
|
def get_row_track_path(self, row_number: int) -> str:
|
|
return self.source_model.get_row_track_path(row_number)
|
|
|
|
def get_unplayed_rows(self) -> list[int]:
|
|
return self.source_model.get_unplayed_rows()
|
|
|
|
def hide_played_tracks(self, hide: bool) -> None:
|
|
return self.source_model.hide_played_tracks(hide)
|
|
|
|
def insert_row(
|
|
self,
|
|
proposed_row_number: Optional[int],
|
|
track_id: Optional[int] = None,
|
|
note: str = "",
|
|
) -> None:
|
|
return self.source_model.insert_row(proposed_row_number, track_id, note)
|
|
|
|
def is_header_row(self, row_number: int) -> bool:
|
|
return self.source_model.is_header_row(row_number)
|
|
|
|
def is_played_row(self, row_number: int) -> bool:
|
|
return self.source_model.is_played_row(row_number)
|
|
|
|
def is_track_in_playlist(self, track_id: int) -> Optional[RowAndTrack]:
|
|
return self.source_model.is_track_in_playlist(track_id)
|
|
|
|
def mark_unplayed(self, row_numbers: list[int]) -> None:
|
|
return self.source_model.mark_unplayed(row_numbers)
|
|
|
|
def move_rows(self, from_rows: list[int], to_row_number: int) -> None:
|
|
return self.source_model.move_rows(from_rows, to_row_number)
|
|
|
|
def move_rows_between_playlists(
|
|
self, from_rows: list[int], to_row_number: int, to_playlist_id: int
|
|
) -> None:
|
|
return self.source_model.move_rows_between_playlists(
|
|
from_rows, to_row_number, to_playlist_id
|
|
)
|
|
|
|
def move_track_add_note(
|
|
self, new_row_number: int, existing_rat: RowAndTrack, note: str
|
|
) -> None:
|
|
return self.source_model.move_track_add_note(new_row_number, existing_rat, note)
|
|
|
|
def move_track_to_header(
|
|
self,
|
|
header_row_number: int,
|
|
existing_rat: RowAndTrack,
|
|
note: Optional[str],
|
|
) -> None:
|
|
return self.source_model.move_track_to_header(
|
|
header_row_number, existing_rat, note
|
|
)
|
|
|
|
def previous_track_ended(self) -> None:
|
|
return self.source_model.previous_track_ended()
|
|
|
|
def remove_track(self, row_number: int) -> None:
|
|
return self.source_model.remove_track(row_number)
|
|
|
|
def rescan_track(self, row_number: int) -> None:
|
|
return self.source_model.rescan_track(row_number)
|
|
|
|
def set_next_row(self, row_number: Optional[int]) -> None:
|
|
self.source_model.set_next_row(row_number)
|
|
|
|
def sort_by_artist(self, row_numbers: list[int]) -> None:
|
|
return self.source_model.sort_by_artist(row_numbers)
|
|
|
|
def sort_by_duration(self, row_numbers: list[int]) -> None:
|
|
return self.source_model.sort_by_duration(row_numbers)
|
|
|
|
def sort_by_lastplayed(self, row_numbers: list[int]) -> None:
|
|
return self.source_model.sort_by_lastplayed(row_numbers)
|
|
|
|
def sort_randomly(self, row_numbers: list[int]) -> None:
|
|
return self.source_model.sort_randomly(row_numbers)
|
|
|
|
def sort_by_title(self, row_numbers: list[int]) -> None:
|
|
return self.source_model.sort_by_title(row_numbers)
|
|
|
|
def update_track_times(self) -> None:
|
|
return self.source_model.update_track_times()
|