musicmuster/app/playlistmodel.py
2023-12-01 17:08:13 +00:00

1549 lines
55 KiB
Python

import obsws_python as obs # type: ignore
import re
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import auto, Enum
from operator import attrgetter
from pprint import pprint
from typing import List, Optional
from PyQt6.QtCore import (
QAbstractTableModel,
QModelIndex,
QRegularExpression,
QSortFilterProxyModel,
Qt,
QTimer,
QVariant,
)
from PyQt6.QtGui import (
QBrush,
QColor,
QFont,
)
from classes import track_sequence, MusicMusterSignals, PlaylistTrack
from config import Config
from dbconfig import scoped_session, Session
from helpers import (
file_is_unreadable,
get_embedded_time,
get_relative_date,
open_in_audacity,
ms_to_mmss,
set_track_metadata,
)
from log import log
from models import NoteColours, Playdates, PlaylistRows, Tracks
HEADER_NOTES_COLUMN = 1
scene_change_re = re.compile(r"SetScene=\[([^[\]]*)\]")
class Col(Enum):
START_GAP = 0
TITLE = auto()
ARTIST = auto()
DURATION = auto()
START_TIME = auto()
END_TIME = auto()
LAST_PLAYED = auto()
BITRATE = auto()
NOTE = auto()
class PlaylistRowData:
def __init__(self, plr: PlaylistRows) -> None:
"""
Populate PlaylistRowData from database PlaylistRows record
"""
self.artist: str = ""
self.bitrate = 0
self.duration: int = 0
self.lastplayed: datetime = Config.EPOCH
self.path = ""
self.played = False
self.start_gap: Optional[int] = None
self.title: str = ""
self.plrid: int = plr.id
self.plr_rownum: int = plr.plr_rownum
self.note: str = plr.note
self.track_id = plr.track_id
if plr.track:
self.start_gap = plr.track.start_gap
self.title = plr.track.title
self.artist = plr.track.artist
self.duration = plr.track.duration
self.played = plr.played
if plr.track.playdates:
self.lastplayed = max([a.lastplayed for a in plr.track.playdates])
else:
self.lastplayed = Config.EPOCH
self.bitrate = plr.track.bitrate or 0
self.path = plr.track.path
def __repr__(self) -> str:
return (
f"<PlaylistRowData: plrid={self.plrid}, plr_rownum={self.plr_rownum}, "
f"note='{self.note}', title='{self.title}', artist='{self.artist}'>"
)
@dataclass
class StartEndTimes:
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
class PlaylistModel(QAbstractTableModel):
"""
The Playlist Model
Update strategy: update the database and then refresh the
row-indexed cached copy (self.playlist_rows). Do not edit
self.playlist_rows directly because keeping it and the
database in sync is uncessarily challenging.
refresh_row() will populate one row of playlist_rows from the
database
refresh_data() will repopulate all of playlist_rows from the
database
"""
def __init__(
self,
playlist_id: int,
*args,
**kwargs,
):
self.playlist_id = playlist_id
super().__init__(*args, **kwargs)
self.playlist_rows: dict[int, PlaylistRowData] = {}
self.start_end_times: dict[int, StartEndTimes] = {}
self.signals = MusicMusterSignals()
self.played_tracks_hidden = False
self.signals.begin_reset_model_signal.connect(self.begin_reset_model)
self.signals.end_reset_model_signal.connect(self.end_reset_model)
self.signals.row_order_changed_signal.connect(self.row_order_changed)
with Session() as session:
# Ensure row numbers in playlist are contiguous
PlaylistRows.fixup_rownumbers(session, playlist_id)
# Populate self.playlist_rows
self.refresh_data(session)
self.update_track_times()
def __repr__(self) -> str:
return (
f"<PlaylistModel: playlist_id={self.playlist_id}, {self.rowCount()} rows>"
)
def add_track_to_header(
self, row_number: int, track_id: int, note: Optional[str] = None
) -> None:
"""
Add track to existing header row
"""
# Get existing row
try:
prd = self.playlist_rows[row_number]
except KeyError:
log.error(
f"KeyError in PlaylistModel:add_track_to_header "
f"{row_number=}, {track_id=}, {len(self.playlist_rows)=}"
)
return
if prd.path:
log.error(
f"Error in PlaylistModel:add_track_to_header ({prd=}, "
"Header row already has track associated"
)
return
with Session() as session:
plr = session.get(PlaylistRows, prd.plrid)
if plr:
# Add track to PlaylistRows
plr.track_id = track_id
# Add any further note (header will already have a note)
if note:
plr.note += "\n" + note
# Reset header row spanning
self.signals.span_cells_signal.emit(
self.playlist_id, row_number, HEADER_NOTES_COLUMN, 1, 1
)
# Update local copy
self.refresh_row(session, row_number)
# Repaint row
self.invalidate_row(row_number)
self.signals.resize_rows_signal.emit(self.playlist_id)
def background_role(self, row: int, column: int, prd: PlaylistRowData) -> QBrush:
"""Return background setting"""
# Handle entire row colouring
# Header row
if self.is_header_row(row):
# Check for specific header colouring
with Session() as session:
note_colour = NoteColours.get_colour(session, prd.note)
if note_colour:
return QBrush(QColor(note_colour))
else:
return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST))
# Unreadable track file
if file_is_unreadable(prd.path):
return QBrush(QColor(Config.COLOUR_UNREADABLE))
# Current track
if prd.plrid == track_sequence.now.plr_id:
return QBrush(QColor(Config.COLOUR_CURRENT_PLAYLIST))
# Next track
if prd.plrid == track_sequence.next.plr_id:
return QBrush(QColor(Config.COLOUR_NEXT_PLAYLIST))
# Individual cell colouring
if column == Col.START_GAP.value:
if prd.start_gap and prd.start_gap >= Config.START_GAP_WARNING_THRESHOLD:
return QBrush(QColor(Config.COLOUR_LONG_START))
if column == Col.BITRATE.value:
if prd.bitrate < Config.BITRATE_LOW_THRESHOLD:
return QBrush(QColor(Config.COLOUR_BITRATE_LOW))
elif prd.bitrate and prd.bitrate < Config.BITRATE_OK_THRESHOLD:
return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM))
else:
return QBrush(QColor(Config.COLOUR_BITRATE_OK))
if column == Col.NOTE.value:
if prd.note:
with Session() as session:
note_colour = NoteColours.get_colour(session, prd.note)
if note_colour:
return QBrush(QColor(note_colour))
return QBrush()
def begin_reset_model(self, playlist_id: int) -> None:
"""
Reset model if playlist_id is ours
"""
if playlist_id != self.playlist_id:
return
super().beginResetModel()
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
"""Standard function for view"""
return 9
def current_track_started(self) -> None:
"""
Notification from musicmuster that the current track has just
started playing
Actions required:
- sanity check
- change OBS scene if needed
- update display
- update track times
- update Playdates in database
- update PlaylistRows in database
- find next track
"""
row_number = track_sequence.now.plr_rownum
# Sanity check
if not track_sequence.now.track_id:
log.error(
"playlistmodel:current_track_started called with no current track"
)
return
if row_number is None:
log.error(
"playlistmodel:current_track_started called with no row number "
f"({track_sequence.now=})"
)
return
# Check for OBS scene change
self.obs_scene_change(row_number)
with Session() as session:
# Update Playdates in database
Playdates(session, track_sequence.now.track_id)
# Mark track as played in playlist
plr = session.get(PlaylistRows, track_sequence.now.plr_id)
if plr:
plr.played = True
self.refresh_row(session, plr.plr_rownum)
# Update track times
self.start_end_times[row_number].start_time = track_sequence.now.start_time
self.start_end_times[row_number].end_time = track_sequence.now.end_time
# Update colour and times for current row
self.invalidate_row(row_number)
# Update previous row in case we're hiding played rows
if track_sequence.previous.plr_rownum:
self.invalidate_row(track_sequence.previous.plr_rownum)
# Update all other track times
self.update_track_times()
# Find next track
# Get all unplayed track rows
next_row = None
unplayed_rows = self.get_unplayed_rows()
if unplayed_rows:
try:
# Find next row after current track
next_row = min([a for a in unplayed_rows if a > row_number])
except ValueError:
# Find first unplayed track
next_row = min(unplayed_rows)
if next_row is not None:
self.set_next_row(next_row)
def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole):
"""Return data to view"""
if not index.isValid() or not (0 <= index.row() < len(self.playlist_rows)):
return QVariant()
row = index.row()
column = index.column()
# prd for playlist row data as it's used a lot
prd = self.playlist_rows[row]
# Dispatch to role-specific functions
dispatch_table = {
int(Qt.ItemDataRole.DisplayRole): self.display_role,
int(Qt.ItemDataRole.EditRole): self.edit_role,
int(Qt.ItemDataRole.FontRole): self.font_role,
int(Qt.ItemDataRole.BackgroundRole): self.background_role,
}
if role in dispatch_table:
return dispatch_table[role](row, column, prd)
# Document other roles but don't use them
if role in [
Qt.ItemDataRole.DecorationRole,
Qt.ItemDataRole.ToolTipRole,
Qt.ItemDataRole.StatusTipRole,
Qt.ItemDataRole.WhatsThisRole,
Qt.ItemDataRole.SizeHintRole,
Qt.ItemDataRole.TextAlignmentRole,
Qt.ItemDataRole.ForegroundRole,
Qt.ItemDataRole.CheckStateRole,
Qt.ItemDataRole.InitialSortOrderRole,
]:
return QVariant()
# Fall through to no-op
return QVariant()
def delete_rows(self, row_numbers: List[int]) -> None:
"""
Delete passed rows from model
Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows
calls. To keep it simple, if inefficient, delete rows one by one.
"""
with Session() as session:
for row_number in row_numbers:
super().beginRemoveRows(QModelIndex(), row_number, row_number)
PlaylistRows.delete_row(session, self.playlist_id, row_number)
super().endRemoveRows()
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
self.refresh_data(session)
self.reset_track_sequence_row_numbers()
def display_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant:
"""
Return text for display
"""
if self.is_header_row(row):
if column == HEADER_NOTES_COLUMN:
self.signals.span_cells_signal.emit(
self.playlist_id,
row,
HEADER_NOTES_COLUMN,
1,
self.columnCount() - 1,
)
header_text = self.header_text(prd)
if not header_text:
return QVariant(Config.TEXT_NO_TRACK_NO_NOTE)
else:
return QVariant(self.header_text(prd))
else:
return QVariant()
if column == Col.START_TIME.value:
if row in self.start_end_times:
start_time = self.start_end_times[row].start_time
if start_time:
return QVariant(start_time.strftime(Config.TRACK_TIME_FORMAT))
return QVariant()
if column == Col.END_TIME.value:
if row in self.start_end_times:
end_time = self.start_end_times[row].end_time
if end_time:
return QVariant(end_time.strftime(Config.TRACK_TIME_FORMAT))
return QVariant()
dispatch_table = {
Col.START_GAP.value: QVariant(prd.start_gap),
Col.TITLE.value: QVariant(prd.title),
Col.ARTIST.value: QVariant(prd.artist),
Col.DURATION.value: QVariant(ms_to_mmss(prd.duration)),
Col.LAST_PLAYED.value: QVariant(get_relative_date(prd.lastplayed)),
Col.BITRATE.value: QVariant(prd.bitrate),
Col.NOTE.value: QVariant(prd.note),
}
if column in dispatch_table:
return dispatch_table[column]
return QVariant()
def end_reset_model(self, playlist_id: int) -> None:
"""
End model reset if this is our playlist
"""
if playlist_id != self.playlist_id:
return
with Session() as session:
self.refresh_data(session)
super().endResetModel()
self.reset_track_sequence_row_numbers()
def edit_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant:
"""
Return text for editing
"""
# If this is a header row and we're being asked for the
# HEADER_NOTES_COLUMN, return the note value
if self.is_header_row(row) and column == HEADER_NOTES_COLUMN:
return QVariant(prd.note)
if column == Col.TITLE.value:
return QVariant(prd.title)
if column == Col.ARTIST.value:
return QVariant(prd.artist)
if column == Col.NOTE.value:
return QVariant(prd.note)
return QVariant()
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
"""
Standard model flags
"""
if not index.isValid():
return Qt.ItemFlag.ItemIsDropEnabled
default = (
Qt.ItemFlag.ItemIsEnabled
| Qt.ItemFlag.ItemIsSelectable
| Qt.ItemFlag.ItemIsDragEnabled
)
if index.column() in [Col.TITLE.value, Col.ARTIST.value, Col.NOTE.value]:
return default | Qt.ItemFlag.ItemIsEditable
return default
def font_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant:
"""
Return font
"""
# Notes column is never bold
if column == Col.NOTE.value:
return QVariant()
boldfont = QFont()
boldfont.setBold(not self.playlist_rows[row].played)
return QVariant(boldfont)
def get_duplicate_rows(self) -> List[int]:
"""
Return a list of duplicate rows. If track appears in rows 2, 3 and 4, return [3, 4]
(ie, ignore the first, not-yet-duplicate, track).
"""
found = []
result = []
for i in range(len(self.playlist_rows)):
track_id = self.playlist_rows[i].track_id
if track_id is None:
continue
if track_id in found:
result.append(i)
else:
found.append(track_id)
return result
def _get_new_row_number(self, proposed_row_number: Optional[int]) -> int:
"""
Sanitises proposed new row number.
If proposed_row_number given, ensure it is valid.
If not given, return row number to add to end of model.
"""
if proposed_row_number is None or proposed_row_number > len(self.playlist_rows):
# We are adding to the end of the list
new_row_number = len(self.playlist_rows)
elif proposed_row_number < 0:
# Add to start of list
new_row_number = 0
else:
new_row_number = proposed_row_number
return new_row_number
def get_row_info(self, row_number: int) -> PlaylistRowData:
"""
Return info about passed row
"""
return self.playlist_rows[row_number]
def get_row_track_path(self, row_number: int) -> str:
"""
Return path of track associated with row or empty string if no track associated
"""
return self.playlist_rows[row_number].path
def get_rows_duration(self, row_numbers: List[int]) -> int:
"""
Return the total duration of the passed rows
"""
duration = 0
for row_number in row_numbers:
duration += self.playlist_rows[row_number].duration
return duration
def get_unplayed_rows(self) -> List[int]:
"""
Return a list of unplayed row numbers
"""
return [a.plr_rownum for a in self.playlist_rows.values() if not a.played]
def headerData(
self,
section: int,
orientation: Qt.Orientation,
role: int = Qt.ItemDataRole.DisplayRole,
) -> QVariant:
"""
Return text for headers
"""
if role == Qt.ItemDataRole.DisplayRole:
if orientation == Qt.Orientation.Horizontal:
if section == Col.START_GAP.value:
return QVariant(Config.HEADER_START_GAP)
elif section == Col.TITLE.value:
return QVariant(Config.HEADER_TITLE)
elif section == Col.ARTIST.value:
return QVariant(Config.HEADER_ARTIST)
elif section == Col.DURATION.value:
return QVariant(Config.HEADER_DURATION)
elif section == Col.START_TIME.value:
return QVariant(Config.HEADER_START_TIME)
elif section == Col.END_TIME.value:
return QVariant(Config.HEADER_END_TIME)
elif section == Col.LAST_PLAYED.value:
return QVariant(Config.HEADER_LAST_PLAYED)
elif section == Col.BITRATE.value:
return QVariant(Config.HEADER_BITRATE)
elif section == Col.NOTE.value:
return QVariant(Config.HEADER_NOTE)
else:
if Config.ROWS_FROM_ZERO:
return QVariant(str(section))
else:
return QVariant(str(section + 1))
elif role == Qt.ItemDataRole.FontRole:
boldfont = QFont()
boldfont.setBold(True)
return QVariant(boldfont)
return QVariant()
def header_text(self, prd: PlaylistRowData) -> str:
"""
Process possible section timing directives embeded in header
"""
count: int = 0
unplayed_count: int = 0
duration: int = 0
if prd.note.endswith("+"):
# This header is the start of a timed section
for row_number in range(prd.plr_rownum + 1, len(self.playlist_rows)):
row_prd = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_prd.note.endswith("-"):
return (
f"{prd.note[:-1].strip()} "
f"[{count} tracks, {ms_to_mmss(duration)} unplayed]"
)
else:
continue
else:
count += 1
if not row_prd.played:
unplayed_count += 1
duration += row_prd.duration
return (
f"{prd.note[:-1].strip()} "
f"[{count} tracks, {ms_to_mmss(duration, none='none')} "
"unplayed (to end of playlist)]"
)
elif prd.note.endswith("="):
# Show subtotal
for row_number in range(prd.plr_rownum - 1, -1, -1):
row_prd = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_prd.note.endswith("-"):
# There was no start of section
return prd.note
if row_prd.note.endswith(("+", "=")):
# If we are playing this section, also
# calculate end time if all tracks are played.
end_time_str = ""
if (
track_sequence.now.plr_rownum
and track_sequence.now.end_time
and (
row_number
< track_sequence.now.plr_rownum
< prd.plr_rownum
)
):
section_end_time = track_sequence.now.end_time + timedelta(
milliseconds=duration
)
end_time_str = (
", section end time "
+ section_end_time.strftime(Config.TRACK_TIME_FORMAT)
)
stripped_note = prd.note[:-1].strip()
if stripped_note:
return (
f"{stripped_note} ["
f"{unplayed_count}/{count} track{'s' if count > 1 else ''} "
f"({ms_to_mmss(duration)}) unplayed{end_time_str}]"
)
else:
return (
f"[{unplayed_count}/{count} track{'s' if count > 1 else ''} "
f"({ms_to_mmss(duration)}) unplayed{end_time_str}]"
)
else:
continue
else:
count += 1
if not row_prd.played:
unplayed_count += 1
duration += row_prd.duration
elif prd.note == "-":
# If the hyphen is the only thing on the line, echo the note
# tha started the section without the trailing "+".
for row_number in range(prd.plr_rownum - 1, -1, -1):
row_prd = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_prd.note.endswith("-"):
# We didn't find a matching section start
break
if row_prd.note.endswith("+"):
return f"[End: {row_prd.note[:-1]}]"
return "-"
return prd.note
def hide_played_tracks(self, hide: bool) -> None:
"""
Set played tracks hidden according to 'hide'
"""
self.played_tracks_hidden = hide
for row_number in range(len(self.playlist_rows)):
if self.is_played_row(row_number):
self.invalidate_row(row_number)
def insert_row(
self,
proposed_row_number: Optional[int],
track_id: Optional[int] = None,
note: Optional[str] = None,
) -> None:
"""
Insert a row.
"""
new_row_number = self._get_new_row_number(proposed_row_number)
with Session() as session:
super().beginInsertRows(QModelIndex(), new_row_number, new_row_number)
plr = PlaylistRows.insert_row(session, self.playlist_id, new_row_number)
plr.track_id = track_id
if note:
plr.note = note
self.refresh_data(session)
super().endInsertRows()
self.reset_track_sequence_row_numbers()
self.invalidate_rows(list(range(new_row_number, len(self.playlist_rows))))
def invalidate_row(self, modified_row: int) -> None:
"""
Signal to view to refresh invlidated row
"""
self.dataChanged.emit(
self.index(modified_row, 0),
self.index(modified_row, self.columnCount() - 1),
)
def invalidate_rows(self, modified_rows: List[int]) -> None:
"""
Signal to view to refresh invlidated rows
"""
for modified_row in modified_rows:
self.invalidate_row(modified_row)
def is_header_row(self, row_number: int) -> bool:
"""
Return True if row is a header row, else False
"""
if row_number in self.playlist_rows:
return self.playlist_rows[row_number].path == ""
return False
def is_played_row(self, row_number: int) -> bool:
"""
Return True if row is an unplayed track row, else False
"""
return self.playlist_rows[row_number].played
def is_track_in_playlist(self, track_id: int) -> Optional[PlaylistRowData]:
"""
If this track_id is in the playlist, return the PlaylistRowData object
else return None
"""
for row_number in range(len(self.playlist_rows)):
if self.playlist_rows[row_number].track_id == track_id:
return self.playlist_rows[row_number]
return None
def move_rows(self, from_rows: List[int], to_row_number: int) -> None:
"""
Move the playlist rows given to to_row and below.
"""
# Build a {current_row_number: new_row_number} dictionary
row_map: dict[int, int] = {}
# Put the from_row row numbers into the row_map. Ultimately the
# total number of elements in the playlist doesn't change, so
# check that adding the moved rows starting at to_row won't
# overshoot the end of the playlist.
if to_row_number + len(from_rows) > len(self.playlist_rows):
next_to_row = len(self.playlist_rows) - len(from_rows)
else:
next_to_row = to_row_number
for from_row, to_row in zip(
from_rows, range(next_to_row, next_to_row + len(from_rows))
):
row_map[from_row] = to_row
# Move the remaining rows to the row_map. We want to fill it
# before (if there are gaps) and after (likewise) the rows that
# are moving.
# This iterates old_row and new_row simultaneously.
for old_row, new_row in zip(
[x for x in self.playlist_rows.keys() if x not in from_rows],
[y for y in range(len(self.playlist_rows)) if y not in row_map.values()],
):
# Optimise: only add to map if there is a change
if old_row != new_row:
row_map[old_row] = new_row
# Reset any header rows that we're moving
for moving_row in row_map:
if self.is_header_row(moving_row):
# Reset column span
self.signals.span_cells_signal.emit(
self.playlist_id, moving_row, HEADER_NOTES_COLUMN, 1, 1
)
# Check to see whether any rows in track_sequence have moved
if track_sequence.previous.plr_rownum in row_map:
track_sequence.previous.plr_rownum = row_map[
track_sequence.previous.plr_rownum
]
if track_sequence.now.plr_rownum in row_map:
track_sequence.now.plr_rownum = row_map[track_sequence.now.plr_rownum]
if track_sequence.next.plr_rownum in row_map:
track_sequence.next.plr_rownum = row_map[track_sequence.next.plr_rownum]
# For SQLAlchemy, build a list of dictionaries that map plrid to
# new row number:
sqla_map: List[dict[str, int]] = []
for oldrow, newrow in row_map.items():
plrid = self.playlist_rows[oldrow].plrid
sqla_map.append({"plrid": plrid, "plr_rownum": newrow})
with Session() as session:
PlaylistRows.update_plr_rownumbers(session, self.playlist_id, sqla_map)
# Update playlist_rows
self.refresh_data(session)
# Update display
self.reset_track_sequence_row_numbers()
self.invalidate_rows(list(row_map.keys()))
def mark_unplayed(self, row_numbers: List[int]) -> None:
"""
Mark row as unplayed
"""
with Session() as session:
for row_number in row_numbers:
plr = session.get(PlaylistRows, self.playlist_rows[row_number].plrid)
if not plr:
return
plr.played = False
self.refresh_row(session, row_number)
self.invalidate_rows(row_numbers)
def move_rows_between_playlists(
self, from_rows: List[int], to_row_number: int, to_playlist_id: int
) -> None:
"""
Move the playlist rows given to to_row and below of to_playlist.
"""
# Row removal must be wrapped in beginRemoveRows ..
# endRemoveRows and the row range must be contiguous. Process
# the highest rows first so the lower row numbers are unchanged
row_groups = self._reversed_contiguous_row_groups(from_rows)
next_to_row = to_row_number
# Prepare destination playlist for a reset
self.signals.begin_reset_model_signal.emit(to_playlist_id)
with Session() as session:
# Make room in destination playlist
max_destination_row_number = PlaylistRows.get_last_used_row(
session, to_playlist_id
)
if (
max_destination_row_number
and to_row_number <= max_destination_row_number
):
PlaylistRows.move_rows_down(
session, to_playlist_id, to_row_number, len(from_rows)
)
for row_group in row_groups:
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
for plr in PlaylistRows.plrids_to_plrs(
session,
self.playlist_id,
[self.playlist_rows[a].plrid for a in row_group],
):
if plr.id == track_sequence.now.plr_id:
# Don't move current track
continue
plr.playlist_id = to_playlist_id
plr.plr_rownum = next_to_row
next_to_row += 1
self.refresh_data(session)
super().endRemoveRows()
# We need to remove gaps in row numbers after tracks have
# moved.
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
self.refresh_data(session)
# Reset of model must come after session has been closed
self.reset_track_sequence_row_numbers()
self.signals.row_order_changed_signal.emit(to_playlist_id)
self.signals.end_reset_model_signal.emit(to_playlist_id)
self.update_track_times()
def move_track_add_note(
self, new_row_number: int, existing_prd: PlaylistRowData, note: str
) -> None:
"""
Move existing_prd track to new_row_number and append note to any existing note
"""
if note:
with Session() as session:
plr = session.get(PlaylistRows, existing_prd.plrid)
if plr:
if plr.note:
plr.note += "\n" + note
else:
plr.note = note
# Carry out the move outside of the session context to ensure
# database updated with any note change
self.move_rows([existing_prd.plr_rownum], new_row_number)
self.signals.resize_rows_signal.emit(self.playlist_id)
def move_track_to_header(
self, header_row_number: int, existing_prd: PlaylistRowData, note: Optional[str]
) -> None:
"""
Add the existing_prd track details to the existing header at header_row_number
"""
if existing_prd.track_id:
if note and existing_prd.note:
note += "\n" + existing_prd.note
self.add_track_to_header(header_row_number, existing_prd.track_id, note)
self.delete_rows([existing_prd.plr_rownum])
def obs_scene_change(self, row_number: int) -> None:
"""
Check this row and any preceding headers for OBS scene change command
and execute any found
"""
# Check any headers before this row
idx = row_number - 1
while self.is_header_row(idx):
idx -= 1
# Step through headers in row order and finish with this row
for chkrow in range(idx + 1, row_number + 1):
match_obj = scene_change_re.search(self.playlist_rows[chkrow].note)
if match_obj:
scene_name = match_obj.group(1)
if scene_name:
try:
cl = obs.ReqClient(
host=Config.OBS_HOST,
port=Config.OBS_PORT,
password=Config.OBS_PASSWORD,
)
except ConnectionRefusedError:
log.error("OBS connection refused")
return
try:
cl.set_current_program_scene(scene_name)
log.info(f"OBS scene changed to '{scene_name}'")
continue
except obs.error.OBSSDKError as e:
log.error(f"OBS SDK error ({e})")
return
def open_in_audacity(self, row_number: int) -> None:
"""
Open track at passed row number in Audacity
"""
path = self.playlist_rows[row_number].path
if path:
open_in_audacity(path)
def previous_track_ended(self) -> None:
"""
Notification from musicmuster that the previous track has ended.
Actions required:
- sanity check
- update display
"""
# Sanity check
if not track_sequence.previous.track_id:
log.error("playlistmodel:previous_track_ended called with no current track")
return
if track_sequence.previous.plr_rownum is None:
log.error(
"playlistmodel:previous_track_ended called with no row number "
f"({track_sequence.previous=})"
)
return
# Update display
self.invalidate_row(track_sequence.previous.plr_rownum)
def refresh_data(self, session: scoped_session):
"""Populate dicts for data calls"""
# Populate self.playlist_rows with playlist data
self.playlist_rows.clear()
for p in PlaylistRows.deep_rows(session, self.playlist_id):
self.playlist_rows[p.plr_rownum] = PlaylistRowData(p)
def refresh_row(self, session, row_number):
"""Populate dict for one row from database"""
p = PlaylistRows.deep_row(session, self.playlist_id, row_number)
self.playlist_rows[row_number] = PlaylistRowData(p)
def remove_track(self, row_number: int) -> None:
"""
Remove track from row, retaining row as a header row
"""
with Session() as session:
plr = session.get(PlaylistRows, self.playlist_rows[row_number].plrid)
if plr:
plr.track_id = None
self.refresh_row(session, row_number)
self.invalidate_row(row_number)
def rescan_track(self, row_number: int) -> None:
"""
Rescan track at passed row number
"""
track_id = self.playlist_rows[row_number].track_id
if track_id:
with Session() as session:
track = session.get(Tracks, track_id)
set_track_metadata(track)
self.refresh_row(session, row_number)
self.invalidate_row(row_number)
self.signals.resize_rows_signal.emit(self.playlist_id)
def reset_track_sequence_row_numbers(self) -> None:
"""
Signal handler for when row ordering has changed
"""
# Check the track_sequence next, now and previous plrs and
# update the row number
with Session() as session:
if track_sequence.next.plr_rownum:
next_plr = session.get(PlaylistRows, track_sequence.next.plr_id)
if next_plr:
track_sequence.next.plr_rownum = next_plr.plr_rownum
if track_sequence.now.plr_rownum:
now_plr = session.get(PlaylistRows, track_sequence.now.plr_id)
if now_plr:
track_sequence.now.plr_rownum = now_plr.plr_rownum
if track_sequence.previous.plr_rownum:
previous_plr = session.get(PlaylistRows, track_sequence.previous.plr_id)
if previous_plr:
track_sequence.previous.plr_rownum = previous_plr.plr_rownum
self.update_track_times()
def _reversed_contiguous_row_groups(
self, row_numbers: List[int]
) -> List[List[int]]:
"""
Take the list of row numbers and split into groups of contiguous rows. Return as a list
of lists with the highest row numbers first.
Example:
input [2, 3, 4, 5, 7, 9, 10, 13, 17, 20, 21]
return: [[20, 21], [17], [13], [9, 10], [7], [2, 3, 4, 5]]
"""
result: List[List[int]] = []
temp: List[int] = []
last_value = row_numbers[0] - 1
for idx in range(len(row_numbers)):
if row_numbers[idx] != last_value + 1:
result.append(temp)
temp = []
last_value = row_numbers[idx]
temp.append(last_value)
if temp:
result.append(temp)
result.reverse()
return result
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
"""Standard function for view"""
return len(self.playlist_rows)
def row_order_changed(self, playlist_id: int) -> None:
"""
Signal handler for when row ordering has changed
"""
# Only action if this is for us
if playlist_id != self.playlist_id:
return
self.reset_track_sequence_row_numbers()
def selection_is_sortable(self, row_numbers: List[int]) -> bool:
"""
Return True if the selection is sortable. That means:
- at least two rows selected
- selected rows are contiguous
- selected rows do not include any header rows
"""
# at least two rows selected
if len(row_numbers) < 2:
return False
# selected rows are contiguous
if sorted(row_numbers) != list(range(min(row_numbers), max(row_numbers) + 1)):
return False
# selected rows do not include any header rows
for row_number in row_numbers:
if self.is_header_row(row_number):
return False
return True
def set_next_row(self, row_number: Optional[int]) -> None:
"""
Set row_number as next track. If row_number is None, clear next track.
"""
next_row_was = track_sequence.next.plr_rownum
if row_number is None:
if next_row_was is None:
return
track_sequence.next = PlaylistTrack()
self.signals.next_track_changed_signal.emit()
return
# Update playing_track
with Session() as session:
track_sequence.next = PlaylistTrack()
try:
plrid = self.playlist_rows[row_number].plrid
except IndexError:
log.error(
f"playlistmodel.set_next_track({row_number=}, "
f"{self.playlist_id=}"
)
return
plr = session.get(PlaylistRows, plrid)
if plr:
# Check this isn't a header row
if self.is_header_row(row_number):
return
# Check track is readable
if file_is_unreadable(plr.track.path):
return
track_sequence.next.set_plr(session, plr)
self.signals.next_track_changed_signal.emit()
self.signals.search_wikipedia_signal.emit(
self.playlist_rows[row_number].title
)
self.invalidate_row(row_number)
if next_row_was is not None:
self.invalidate_row(next_row_was)
self.update_track_times()
def setData(
self, index: QModelIndex, value: QVariant, role: int = Qt.ItemDataRole.EditRole
) -> bool:
"""
Update model with edited data
"""
if index.isValid() and role == Qt.ItemDataRole.EditRole:
row_number = index.row()
column = index.column()
with Session() as session:
plr = session.get(PlaylistRows, self.playlist_rows[row_number].plrid)
if not plr:
print(
f"Error saving data: {row_number=}, {column=}, "
f"{value=}, {self.playlist_id=}"
)
return False
if plr.track_id:
if column == Col.TITLE.value or column == Col.ARTIST.value:
track = session.get(Tracks, plr.track_id)
if not track:
print(f"Error retreiving track: {plr=}")
return False
if column == Col.TITLE.value:
track.title = str(value)
elif column == Col.ARTIST.value:
track.artist = str(value)
else:
print(f"Error updating track: {column=}, {value=}")
return False
elif column == Col.NOTE.value:
plr.note = str(value)
else:
# This is a header row
if column == HEADER_NOTES_COLUMN:
plr.note = str(value)
# Flush changes before refreshing data
session.flush()
self.refresh_row(session, row_number)
self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role])
return True
return False
def sort_by_artist(self, row_numbers: List[int]) -> None:
"""
Sort selected rows by artist
"""
self.sort_by_attribute(row_numbers, "artist")
def sort_by_attribute(self, row_numbers: List[int], attr_name: str) -> None:
"""
Sort selected rows by passed attribute name where 'attribute' is a
key in PlaylistRowData
"""
# Create a subset of playlist_rows with the rows we are
# interested in
shortlist_rows = {k: self.playlist_rows[k] for k in row_numbers}
sorted_list = [
plr.plr_rownum
for plr in sorted(shortlist_rows.values(), key=attrgetter(attr_name))
]
self.move_rows(sorted_list, min(sorted_list))
def sort_by_duration(self, row_numbers: List[int]) -> None:
"""
Sort selected rows by duration
"""
self.sort_by_attribute(row_numbers, "duration")
def sort_by_lastplayed(self, row_numbers: List[int]) -> None:
"""
Sort selected rows by lastplayed
"""
self.sort_by_attribute(row_numbers, "lastplayed")
def sort_by_title(self, row_numbers: List[int]) -> None:
"""
Sort selected rows by title
"""
self.sort_by_attribute(row_numbers, "title")
def supportedDropActions(self) -> Qt.DropAction:
return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction
def update_track_times(self) -> None:
"""
Update track start/end times in self.playlist_rows
"""
next_start_time: Optional[datetime] = None
update_rows: List[int] = []
for row_number in range(len(self.playlist_rows)):
stend = self.start_end_times[row_number] = StartEndTimes()
prd = self.playlist_rows[row_number]
# Reset start_time if this is the current row
if row_number == track_sequence.now.plr_rownum:
stend.start_time = track_sequence.now.start_time
stend.end_time = track_sequence.now.end_time
if not next_start_time:
next_start_time = stend.end_time
continue
# Set start time for next row if we have a current track
if (
row_number == track_sequence.next.plr_rownum
and track_sequence.now.end_time
):
stend.start_time = track_sequence.now.end_time
stend.end_time = stend.start_time + timedelta(milliseconds=prd.duration)
next_start_time = stend.end_time
update_rows.append(row_number)
continue
# Don't update times for tracks that have been played
if prd.played:
continue
# If we're between the current and next row, zero out
# times
if (
track_sequence.now.plr_rownum is not None
and track_sequence.next.plr_rownum is not None
and track_sequence.now.plr_rownum
< row_number
< track_sequence.next.plr_rownum
):
update_rows.append(row_number)
continue
# Reset start time if timing in header or at current track
if self.is_header_row(row_number):
header_time = get_embedded_time(prd.note)
if header_time:
next_start_time = header_time
else:
# This is an unplayed track
# Don't schedule unplayable tracks
if file_is_unreadable(prd.path):
continue
# Set start/end if we have a start time
if next_start_time is None:
continue
if stend.start_time != next_start_time:
stend.start_time = next_start_time
update_rows.append(row_number)
next_start_time += timedelta(
milliseconds=self.playlist_rows[row_number].duration
)
if stend.end_time != next_start_time:
stend.end_time = next_start_time
update_rows.append(row_number)
# Update start/stop times of rows that have changed
for updated_row in update_rows:
self.dataChanged.emit(
self.index(updated_row, Col.START_TIME.value),
self.index(updated_row, Col.END_TIME.value),
)
class PlaylistProxyModel(QSortFilterProxyModel):
"""
For searching and filtering
"""
def __init__(
self,
data_model: PlaylistModel,
*args,
**kwargs,
):
self.data_model = data_model
super().__init__(*args, **kwargs)
self.setSourceModel(data_model)
# Search all columns
self.setFilterKeyColumn(-1)
def filterAcceptsRow(self, source_row: int, source_parent: QModelIndex) -> bool:
"""
Subclass to filter by played status
"""
if self.data_model.played_tracks_hidden:
if self.data_model.is_played_row(source_row):
# Don't hide current or next track
with Session() as session:
if track_sequence.next.plr_id:
next_plr = session.get(PlaylistRows, track_sequence.next.plr_id)
if (
next_plr
and next_plr.plr_rownum == source_row
and next_plr.playlist_id == self.data_model.playlist_id
):
return True
if track_sequence.now.plr_id:
now_plr = session.get(PlaylistRows, track_sequence.now.plr_id)
if (
now_plr
and now_plr.plr_rownum == source_row
and now_plr.playlist_id == self.data_model.playlist_id
):
return True
# Don't hide previous track until
# HIDE_AFTER_PLAYING_OFFSET milliseconds after
# current track has started
if track_sequence.previous.plr_id:
previous_plr = session.get(
PlaylistRows, track_sequence.previous.plr_id
)
if (
previous_plr
and previous_plr.plr_rownum == source_row
and previous_plr.playlist_id == self.data_model.playlist_id
):
if track_sequence.now.start_time:
if datetime.now() > (
track_sequence.now.start_time
+ timedelta(
milliseconds=Config.HIDE_AFTER_PLAYING_OFFSET
)
):
return False
else:
# Invalidate this row in
# HIDE_AFTER_PLAYING_OFFSET and a
# bit milliseconds
# so that it hides then - add 100mS
# on so that it if clause above it
# true next time through.
QTimer.singleShot(
Config.HIDE_AFTER_PLAYING_OFFSET + 100,
lambda: self.data_model.invalidate_row(
source_row
),
)
return True
else:
return True
return False
return super().filterAcceptsRow(source_row, source_parent)
def set_incremental_search(self, search_string: str) -> None:
"""
Update search pattern
"""
self.setFilterRegularExpression(
QRegularExpression(
search_string, QRegularExpression.PatternOption.CaseInsensitiveOption
)
)
# ######################################
# Forward functions not handled in proxy
# ######################################
def current_track_started(self):
return self.data_model.current_track_started()
def delete_rows(self, row_numbers: List[int]) -> None:
return self.data_model.delete_rows(row_numbers)
def get_duplicate_rows(self) -> List[int]:
return self.data_model.get_duplicate_rows()
def get_rows_duration(self, row_numbers: List[int]) -> int:
return self.data_model.get_rows_duration(row_numbers)
def get_row_info(self, row_number: int) -> PlaylistRowData:
return self.data_model.get_row_info(row_number)
def get_row_track_path(self, row_number: int) -> str:
return self.data_model.get_row_track_path(row_number)
def hide_played_tracks(self, hide: bool) -> None:
return self.data_model.hide_played_tracks(hide)
def insert_row(
self,
proposed_row_number: Optional[int],
track_id: Optional[int] = None,
note: Optional[str] = None,
) -> None:
return self.data_model.insert_row(proposed_row_number, track_id, note)
def is_header_row(self, row_number: int) -> bool:
return self.data_model.is_header_row(row_number)
def is_played_row(self, row_number: int) -> bool:
return self.data_model.is_played_row(row_number)
def is_track_in_playlist(self, track_id: int) -> Optional[PlaylistRowData]:
return self.data_model.is_track_in_playlist(track_id)
def mark_unplayed(self, row_numbers: List[int]) -> None:
return self.data_model.mark_unplayed(row_numbers)
def move_rows(self, from_rows: List[int], to_row_number: int) -> None:
return self.data_model.move_rows(from_rows, to_row_number)
def move_rows_between_playlists(
self, from_rows: List[int], to_row_number: int, to_playlist_id: int
) -> None:
return self.data_model.move_rows_between_playlists(
from_rows, to_row_number, to_playlist_id
)
def move_track_add_note(
self, new_row_number: int, existing_prd: PlaylistRowData, note: str
) -> None:
return self.data_model.move_track_add_note(new_row_number, existing_prd, note)
def move_track_to_header(
self, header_row_number: int, existing_prd: PlaylistRowData, note: Optional[str]
) -> None:
return self.data_model.move_track_to_header(
header_row_number, existing_prd, note
)
def open_in_audacity(self, row_number: int) -> None:
return self.data_model.open_in_audacity(row_number)
def previous_track_ended(self) -> None:
return self.data_model.previous_track_ended()
def remove_track(self, row_number: int) -> None:
return self.data_model.remove_track(row_number)
def rescan_track(self, row_number: int) -> None:
return self.data_model.rescan_track(row_number)
def set_next_row(self, row_number: Optional[int]) -> None:
return self.data_model.set_next_row(row_number)
def sort_by_artist(self, row_numbers: List[int]) -> None:
return self.data_model.sort_by_artist(row_numbers)
def sort_by_duration(self, row_numbers: List[int]) -> None:
return self.data_model.sort_by_duration(row_numbers)
def sort_by_lastplayed(self, row_numbers: List[int]) -> None:
return self.data_model.sort_by_lastplayed(row_numbers)
def sort_by_title(self, row_numbers: List[int]) -> None:
return self.data_model.sort_by_title(row_numbers)
def update_track_times(self) -> None:
return self.data_model.update_track_times()