# Standard library imports # Allow forward reference to PlaylistModel from __future__ import annotations from operator import attrgetter from random import shuffle from typing import Optional import datetime as dt import re # PyQt imports from PyQt6.QtCore import ( QAbstractTableModel, QModelIndex, QObject, QRegularExpression, QSortFilterProxyModel, Qt, QTimer, QVariant, ) from PyQt6.QtGui import ( QBrush, QColor, QFont, ) # Third party imports import obswebsocket # type: ignore # import snoop # type: ignore # App imports from classes import ( Col, MusicMusterSignals, RowAndTrack, track_sequence, ) from config import Config from helpers import ( file_is_unreadable, get_embedded_time, get_relative_date, ms_to_mmss, set_track_metadata, ) from log import log from models import db, NoteColours, Playdates, PlaylistRows, Tracks HEADER_NOTES_COLUMN = 1 scene_change_re = re.compile(r"SetScene=\[([^[\]]*)\]") class PlaylistModel(QAbstractTableModel): """ The Playlist Model Update strategy: update the database and then refresh the row-indexed cached copy (self.playlist_rows). Do not edit self.playlist_rows directly because keeping it and the database in sync is uncessarily challenging. 
refresh_row() will populate one row of playlist_rows from the database refresh_data() will repopulate all of playlist_rows from the database """ def __init__( self, playlist_id: int, *args: Optional[QObject], **kwargs: Optional[QObject], ) -> None: log.debug("PlaylistModel.__init__()") self.playlist_id = playlist_id super().__init__(*args, **kwargs) self.playlist_rows: dict[int, RowAndTrack] = {} self.signals = MusicMusterSignals() self.played_tracks_hidden = False self.signals.begin_reset_model_signal.connect(self.begin_reset_model) self.signals.end_reset_model_signal.connect(self.end_reset_model) with db.Session() as session: # Ensure row numbers in playlist are contiguous PlaylistRows.fixup_rownumbers(session, playlist_id) # Populate self.playlist_rows self.refresh_data(session) self.update_track_times() def __repr__(self) -> str: return ( f"" ) def add_track_to_header( self, row_number: int, track_id: int, note: Optional[str] = None ) -> None: """ Add track to existing header row """ log.debug(f"{self}: add_track_to_header({row_number=}, {track_id=}, {note=}") # Get existing row try: rat = self.playlist_rows[row_number] except KeyError: log.error( f"{self}: KeyError in add_track_to_header ({row_number=}, {track_id=})" ) return if rat.path: log.error( f"{self}: Header row already has track associated ({rat=}, {track_id=})" ) return with db.Session() as session: playlistrow = session.get(PlaylistRows, rat.playlistrow_id) if playlistrow: # Add track to PlaylistRows playlistrow.track_id = track_id # Add any further note (header will already have a note) if note: playlistrow.note += "\n" + note # Update local copy self.refresh_row(session, row_number) # Repaint row self.invalidate_row(row_number) session.commit() self.signals.resize_rows_signal.emit(self.playlist_id) def background_role(self, row: int, column: int, rat: RowAndTrack) -> QBrush: """Return background setting""" # Handle entire row colouring # Header row if self.is_header_row(row): # Check for specific 
header colouring with db.Session() as session: note_colour = NoteColours.get_colour(session, rat.note) if note_colour: return QBrush(QColor(note_colour)) else: return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST)) # Unreadable track file if file_is_unreadable(rat.path): return QBrush(QColor(Config.COLOUR_UNREADABLE)) # Current track if track_sequence.current and track_sequence.current.row_number == row: return QBrush(QColor(Config.COLOUR_CURRENT_PLAYLIST)) # Next track if track_sequence.next and track_sequence.next.row_number == row: return QBrush(QColor(Config.COLOUR_NEXT_PLAYLIST)) # Individual cell colouring if column == Col.START_GAP.value: if rat.start_gap and rat.start_gap >= Config.START_GAP_WARNING_THRESHOLD: return QBrush(QColor(Config.COLOUR_LONG_START)) if column == Col.BITRATE.value: if not rat.bitrate or rat.bitrate < Config.BITRATE_LOW_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_LOW)) elif rat.bitrate < Config.BITRATE_OK_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM)) else: return QBrush(QColor(Config.COLOUR_BITRATE_OK)) if column == Col.NOTE.value: if rat.note: with db.Session() as session: note_colour = NoteColours.get_colour(session, rat.note) if note_colour: return QBrush(QColor(note_colour)) return QBrush() def begin_reset_model(self, playlist_id: int) -> None: """ Reset model if playlist_id is ours """ if playlist_id != self.playlist_id: return super().beginResetModel() def columnCount(self, parent: QModelIndex = QModelIndex()) -> int: """Standard function for view""" return len(Col) def current_track_started(self) -> None: """ Notification from musicmuster that the current track has just started playing Actions required: - sanity check - change OBS scene if needed - update Playdates in database - update PlaylistRows in database - update display - find next track - update track times """ if not track_sequence.current: return row_number = track_sequence.current.row_number # Check for OBS scene change log.debug(f"{self}: 
Call OBS scene change") self.obs_scene_change(row_number) # Sanity check that we have a track_id if not track_sequence.current.track_id: log.error( f"{self}: current_track_started() called with {track_sequence.current.track_id=}" ) return with db.Session() as session: # Update Playdates in database log.debug(f"{self}: update playdates") Playdates(session, track_sequence.current.track_id) # Mark track as played in playlist log.debug(f"{self}: Mark track as played") plr = session.get(PlaylistRows, track_sequence.current.playlistrow_id) if plr: plr.played = True self.refresh_row(session, plr.row_number) else: log.error( f"{self}: Can't retrieve plr, {track_sequence.current.playlistrow_id=}" ) # Update colour and times for current row self.invalidate_row(row_number) # Update previous row in case we're hiding played rows if track_sequence.previous and track_sequence.previous.row_number: self.invalidate_row(track_sequence.previous.row_number) # Update all other track times self.update_track_times() # Find next track # Get all unplayed track rows log.debug(f"{self}: Find next track") next_row = None unplayed_rows = self.get_unplayed_rows() if unplayed_rows: try: # Find next row after current track next_row = min( [ a for a in unplayed_rows if a > row_number and not self.is_header_row(a) ] ) except ValueError: # Find first unplayed track next_row = min(unplayed_rows) if next_row is not None: self.set_next_row(next_row) session.commit() def data( self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole ) -> QVariant: """Return data to view""" if not index.isValid() or not (0 <= index.row() < len(self.playlist_rows)): return QVariant() row = index.row() column = index.column() # rat for playlist row data as it's used a lot rat = self.playlist_rows[row] # Dispatch to role-specific functions dispatch_table = { int(Qt.ItemDataRole.BackgroundRole): self.background_role, int(Qt.ItemDataRole.DisplayRole): self.display_role, int(Qt.ItemDataRole.EditRole): self.edit_role, 
int(Qt.ItemDataRole.FontRole): self.font_role, int(Qt.ItemDataRole.ToolTipRole): self.tooltip_role, } if role in dispatch_table: return QVariant(dispatch_table[role](row, column, rat)) # Document other roles but don't use them if role in [ Qt.ItemDataRole.DecorationRole, Qt.ItemDataRole.StatusTipRole, Qt.ItemDataRole.WhatsThisRole, Qt.ItemDataRole.SizeHintRole, Qt.ItemDataRole.TextAlignmentRole, Qt.ItemDataRole.ForegroundRole, Qt.ItemDataRole.CheckStateRole, Qt.ItemDataRole.InitialSortOrderRole, ]: return QVariant() # Fall through to no-op return QVariant() def delete_rows(self, row_numbers: list[int]) -> None: """ Delete passed rows from model Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows calls. To keep it simple, if inefficient, delete rows one by one. TODO: delete in blocks Delete from highest row back so that not yet deleted row numbers don't change. """ with db.Session() as session: for row_number in sorted(row_numbers, reverse=True): log.debug(f"{self}: delete_rows(), {row_number=}") super().beginRemoveRows(QModelIndex(), row_number, row_number) # We need to remove data from the underlying data store, # which is the database, but we cache in # self.playlist_rows, which is what calls to data() # reads, so fixup that too. 
PlaylistRows.delete_row(session, self.playlist_id, row_number) PlaylistRows.fixup_rownumbers(session, self.playlist_id) self.refresh_data(session) session.commit() super().endRemoveRows() self.reset_track_sequence_row_numbers() def display_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant: """ Return text for display """ header_row = self.is_header_row(row) # Set / reset column span if column == HEADER_NOTES_COLUMN: column_span = 1 if header_row: column_span = self.columnCount() - 1 self.signals.span_cells_signal.emit( self.playlist_id, row, HEADER_NOTES_COLUMN, 1, column_span ) if header_row: if column == HEADER_NOTES_COLUMN: header_text = self.header_text(rat) if not header_text: return QVariant(Config.TEXT_NO_TRACK_NO_NOTE) else: return QVariant(self.header_text(rat)) else: return QVariant() if column == Col.START_TIME.value: start_time = rat.forecast_start_time if start_time: return QVariant(start_time.strftime(Config.TRACK_TIME_FORMAT)) return QVariant() if column == Col.END_TIME.value: end_time = rat.forecast_end_time if end_time: return QVariant(end_time.strftime(Config.TRACK_TIME_FORMAT)) return QVariant() if column == Col.INTRO.value: if rat.intro: return QVariant(f"{rat.intro / 1000:{Config.INTRO_SECONDS_FORMAT}}") else: return QVariant() dispatch_table = { Col.ARTIST.value: QVariant(rat.artist), Col.BITRATE.value: QVariant(rat.bitrate), Col.DURATION.value: QVariant(ms_to_mmss(rat.duration)), Col.LAST_PLAYED.value: QVariant(get_relative_date(rat.lastplayed)), Col.NOTE.value: QVariant(rat.note), Col.START_GAP.value: QVariant(rat.start_gap), Col.TITLE.value: QVariant(rat.title), } if column in dispatch_table: return dispatch_table[column] return QVariant() def end_reset_model(self, playlist_id: int) -> None: """ End model reset if this is our playlist """ log.debug(f"{self}: end_reset_model({playlist_id=})") if playlist_id != self.playlist_id: log.debug(f"{self}: end_reset_model: not us ({self.playlist_id=})") return with db.Session() as 
session: self.refresh_data(session) super().endResetModel() self.reset_track_sequence_row_numbers() def edit_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant: """ Return text for editing """ # If this is a header row and we're being asked for the # HEADER_NOTES_COLUMN, return the note value if self.is_header_row(row) and column == HEADER_NOTES_COLUMN: return QVariant(rat.note) if column == Col.INTRO.value: return QVariant(rat.intro) if column == Col.TITLE.value: return QVariant(rat.title) if column == Col.ARTIST.value: return QVariant(rat.artist) if column == Col.NOTE.value: return QVariant(rat.note) return QVariant() def flags(self, index: QModelIndex) -> Qt.ItemFlag: """ Standard model flags """ if not index.isValid(): return Qt.ItemFlag.ItemIsDropEnabled default = ( Qt.ItemFlag.ItemIsEnabled | Qt.ItemFlag.ItemIsSelectable | Qt.ItemFlag.ItemIsDragEnabled ) if index.column() in [ Col.TITLE.value, Col.ARTIST.value, Col.NOTE.value, Col.INTRO.value, ]: return default | Qt.ItemFlag.ItemIsEditable return default def font_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant: """ Return font """ # Notes column is never bold if column == Col.NOTE.value: return QVariant() boldfont = QFont() boldfont.setBold(not self.playlist_rows[row].played) return QVariant(boldfont) def get_duplicate_rows(self) -> list[int]: """ Return a list of duplicate rows. If track appears in rows 2, 3 and 4, return [3, 4] (ie, ignore the first, not-yet-duplicate, track). """ log.debug(f"{self}: get_duplicate_rows() called") found = [] result = [] for i in range(len(self.playlist_rows)): track_id = self.playlist_rows[i].track_id if track_id is None: continue if track_id in found: result.append(i) else: found.append(track_id) log.debug(f"{self}: get_duplicate_rows() returned: {result=}") return result def _get_new_row_number(self, proposed_row_number: Optional[int]) -> int: """ Sanitises proposed new row number. If proposed_row_number given, ensure it is valid. 
If not given, return row number to add to end of model. """ log.debug(f"{self}: _get_new_row_number({proposed_row_number=})") if proposed_row_number is None or proposed_row_number > len(self.playlist_rows): # We are adding to the end of the list new_row_number = len(self.playlist_rows) elif proposed_row_number < 0: # Add to start of list new_row_number = 0 else: new_row_number = proposed_row_number log.debug(f"{self}: get_new_row_number() return: {new_row_number=}") return new_row_number def get_row_info(self, row_number: int) -> RowAndTrack: """ Return info about passed row """ return self.playlist_rows[row_number] def get_row_track_id(self, row_number: int) -> Optional[int]: """ Return id of track associated with row or None if no track associated """ return self.playlist_rows[row_number].track_id def get_row_track_path(self, row_number: int) -> str: """ Return path of track associated with row or empty string if no track associated """ return self.playlist_rows[row_number].path def get_rows_duration(self, row_numbers: list[int]) -> int: """ Return the total duration of the passed rows """ duration = 0 for row_number in row_numbers: duration += self.playlist_rows[row_number].duration return duration def get_unplayed_rows(self) -> list[int]: """ Return a list of unplayed row numbers """ result = [ a.row_number for a in self.playlist_rows.values() if not a.played and a.track_id is not None ] # log.debug(f"{self}: get_unplayed_rows() returned: {result=}") return result def headerData( self, section: int, orientation: Qt.Orientation, role: int = Qt.ItemDataRole.DisplayRole, ) -> QVariant: """ Return text for headers """ display_dispatch_table = { Col.START_GAP.value: QVariant(Config.HEADER_START_GAP), Col.INTRO.value: QVariant(Config.HEADER_INTRO), Col.TITLE.value: QVariant(Config.HEADER_TITLE), Col.ARTIST.value: QVariant(Config.HEADER_ARTIST), Col.DURATION.value: QVariant(Config.HEADER_DURATION), Col.START_TIME.value: QVariant(Config.HEADER_START_TIME), 
Col.END_TIME.value: QVariant(Config.HEADER_END_TIME), Col.LAST_PLAYED.value: QVariant(Config.HEADER_LAST_PLAYED), Col.BITRATE.value: QVariant(Config.HEADER_BITRATE), Col.NOTE.value: QVariant(Config.HEADER_NOTE), } if role == Qt.ItemDataRole.DisplayRole: if orientation == Qt.Orientation.Horizontal: return display_dispatch_table[section] else: if Config.ROWS_FROM_ZERO: return QVariant(str(section)) else: return QVariant(str(section + 1)) elif role == Qt.ItemDataRole.FontRole: boldfont = QFont() boldfont.setBold(True) return QVariant(boldfont) return QVariant() def header_text(self, rat: RowAndTrack) -> str: """ Process possible section timing directives embeded in header """ if rat.note.endswith("+"): return self.start_of_timed_section_header(rat) elif rat.note.endswith("="): return self.section_subtotal_header(rat) elif rat.note == "-": # If the hyphen is the only thing on the line, echo the note # that started the section without the trailing "+". for row_number in range(rat.row_number - 1, -1, -1): row_rat = self.playlist_rows[row_number] if self.is_header_row(row_number): if row_rat.note.endswith("-"): # We didn't find a matching section start break if row_rat.note.endswith("+"): return f"[End: {row_rat.note[:-1]}]" return "-" return rat.note def hide_played_tracks(self, hide: bool) -> None: """ Set played tracks hidden according to 'hide' """ self.played_tracks_hidden = hide for row_number in range(len(self.playlist_rows)): if self.is_played_row(row_number): self.invalidate_row(row_number) def insert_row( self, proposed_row_number: Optional[int], track_id: Optional[int] = None, note: str = "", ) -> None: """ Insert a row. 
""" log.debug(f"{self}: insert_row({proposed_row_number=}, {track_id=}, {note=})") new_row_number = self._get_new_row_number(proposed_row_number) with db.Session() as session: super().beginInsertRows(QModelIndex(), new_row_number, new_row_number) _ = PlaylistRows.insert_row( session=session, playlist_id=self.playlist_id, new_row_number=new_row_number, note=note, track_id=track_id, ) session.commit() self.refresh_data(session) super().endInsertRows() self.signals.resize_rows_signal.emit(self.playlist_id) self.reset_track_sequence_row_numbers() self.invalidate_rows(list(range(new_row_number, len(self.playlist_rows)))) def invalidate_row(self, modified_row: int) -> None: """ Signal to view to refresh invalidated row """ self.dataChanged.emit( self.index(modified_row, 0), self.index(modified_row, self.columnCount() - 1), ) def invalidate_rows(self, modified_rows: list[int]) -> None: """ Signal to view to refresh invlidated rows """ for modified_row in modified_rows: self.invalidate_row(modified_row) def is_header_row(self, row_number: int) -> bool: """ Return True if row is a header row, else False """ if row_number in self.playlist_rows: return self.playlist_rows[row_number].path == "" return False def is_played_row(self, row_number: int) -> bool: """ Return True if row is an unplayed track row, else False """ return self.playlist_rows[row_number].played def is_track_in_playlist(self, track_id: int) -> Optional[RowAndTrack]: """ If this track_id is in the playlist, return the RowAndTrack object else return None """ for row_number in range(len(self.playlist_rows)): if self.playlist_rows[row_number].track_id == track_id: return self.playlist_rows[row_number] return None def mark_unplayed(self, row_numbers: list[int]) -> None: """ Mark row as unplayed """ with db.Session() as session: for row_number in row_numbers: playlist_row = session.get( PlaylistRows, self.playlist_rows[row_number].playlistrow_id ) if not playlist_row: return playlist_row.played = False 
session.commit() self.refresh_row(session, row_number) self.update_track_times() self.invalidate_rows(row_numbers) def move_rows(self, from_rows: list[int], to_row_number: int) -> None: """ Move the playlist rows given to to_row and below. """ log.debug(f"{self}: move_rows({from_rows=}, {to_row_number=}") # Build a {current_row_number: new_row_number} dictionary row_map: dict[int, int] = {} # The destination row number will need to be reduced by the # number of rows being move from above the destination row # otherwise rows below the destination row will end up above the # moved rows. adjusted_to_row = to_row_number - len( [a for a in from_rows if a < to_row_number] ) # Put the from_row row numbers into the row_map. Ultimately the # total number of elements in the playlist doesn't change, so # check that adding the moved rows starting at to_row won't # overshoot the end of the playlist. if adjusted_to_row + len(from_rows) > len(self.playlist_rows): next_to_row = len(self.playlist_rows) - len(from_rows) else: next_to_row = adjusted_to_row # zip iterates from_row and to_row simultaneously from the # respective sequences inside zip() for from_row, to_row in zip( from_rows, range(next_to_row, next_to_row + len(from_rows)) ): row_map[from_row] = to_row # Move the remaining rows to the row_map. We want to fill it # before (if there are gaps) and after (likewise) the rows that # are moving. 
# zip iterates old_row and new_row simultaneously from the # respective sequences inside zip() for old_row, new_row in zip( [x for x in self.playlist_rows.keys() if x not in from_rows], [y for y in range(len(self.playlist_rows)) if y not in row_map.values()], ): # Optimise: only add to map if there is a change if old_row != new_row: row_map[old_row] = new_row # Check to see whether any rows in track_sequence have moved if track_sequence.previous and track_sequence.previous.row_number in row_map: track_sequence.previous.row_number = row_map[ track_sequence.previous.row_number ] if track_sequence.current and track_sequence.current.row_number in row_map: track_sequence.current.row_number = row_map[ track_sequence.current.row_number ] if track_sequence.next and track_sequence.next.row_number in row_map: track_sequence.next.row_number = row_map[track_sequence.next.row_number] # For SQLAlchemy, build a list of dictionaries that map playlistrow_id to # new row number: sqla_map: list[dict[str, int]] = [] for oldrow, newrow in row_map.items(): playlistrow_id = self.playlist_rows[oldrow].playlistrow_id sqla_map.append({"playlistrow_id": playlistrow_id, "row_number": newrow}) with db.Session() as session: PlaylistRows.update_plr_row_numbers(session, self.playlist_id, sqla_map) session.commit() # Update playlist_rows self.refresh_data(session) # Update display self.reset_track_sequence_row_numbers() self.update_track_times() self.invalidate_rows(list(row_map.keys())) def move_rows_between_playlists( self, from_rows: list[int], to_row_number: int, to_playlist_id: int ) -> None: """ Move the playlist rows given to to_row and below of to_playlist. """ log.debug( f"{self}: move_rows_between_playlists({from_rows=}, " f"{to_row_number=}, {to_playlist_id=}" ) # Row removal must be wrapped in beginRemoveRows .. # endRemoveRows and the row range must be contiguous. 
Process # the highest rows first so the lower row numbers are unchanged row_groups = self._reversed_contiguous_row_groups(from_rows) next_to_row = to_row_number # Prepare destination playlist for a reset self.signals.begin_reset_model_signal.emit(to_playlist_id) with db.Session() as session: # Make room in destination playlist max_destination_row_number = PlaylistRows.get_last_used_row( session, to_playlist_id ) if ( max_destination_row_number and to_row_number <= max_destination_row_number ): # Move the destination playlist rows down to make room. PlaylistRows.move_rows_down( session, to_playlist_id, to_row_number, len(from_rows) ) for row_group in row_groups: super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group)) for playlist_row in PlaylistRows.plrids_to_plrs( session, self.playlist_id, [self.playlist_rows[a].playlistrow_id for a in row_group], ): if ( track_sequence.current and playlist_row.id == track_sequence.current.playlistrow_id ): # Don't move current track continue playlist_row.playlist_id = to_playlist_id playlist_row.row_number = next_to_row next_to_row += 1 self.refresh_data(session) super().endRemoveRows() # We need to remove gaps in row numbers after tracks have # moved. 
PlaylistRows.fixup_rownumbers(session, self.playlist_id) self.refresh_data(session) session.commit() # Reset of model must come after session has been closed self.reset_track_sequence_row_numbers() self.signals.end_reset_model_signal.emit(to_playlist_id) self.update_track_times() def move_track_add_note( self, new_row_number: int, existing_rat: RowAndTrack, note: str ) -> None: """ Move existing_rat track to new_row_number and append note to any existing note """ log.debug( f"{self}: move_track_add_note({new_row_number=}, {existing_rat=}, {note=}" ) if note: with db.Session() as session: playlist_row = session.get(PlaylistRows, existing_rat.playlistrow_id) if playlist_row: if playlist_row.note: playlist_row.note += "\n" + note else: playlist_row.note = note session.commit() # Carry out the move outside of the session context to ensure # database updated with any note change self.move_rows([existing_rat.row_number], new_row_number) self.signals.resize_rows_signal.emit(self.playlist_id) def move_track_to_header( self, header_row_number: int, existing_rat: RowAndTrack, note: Optional[str], ) -> None: """ Add the existing_rat track details to the existing header at header_row_number """ log.debug( f"{self}: move_track_to_header({header_row_number=}, {existing_rat=}, {note=}" ) if existing_rat.track_id: if note and existing_rat.note: note += "\n" + existing_rat.note self.add_track_to_header(header_row_number, existing_rat.track_id, note) self.delete_rows([existing_rat.row_number]) def obs_scene_change(self, row_number: int) -> None: """ Check this row and any preceding headers for OBS scene change command and execute any found """ log.debug(f"{self}: obs_scene_change({row_number=})") # Check any headers before this row idx = row_number - 1 while self.is_header_row(idx): idx -= 1 # Step through headers in row order and finish with this row for chkrow in range(idx + 1, row_number + 1): match_obj = scene_change_re.search(self.playlist_rows[chkrow].note) if match_obj: 
scene_name = match_obj.group(1) if scene_name: ws = obswebsocket.obsws( host=Config.OBS_HOST, port=Config.OBS_PORT, password=Config.OBS_PASSWORD, ) try: ws.connect() ws.call( obswebsocket.requests.SetCurrentProgramScene( sceneName=scene_name ) ) log.debug(f"{self}: OBS scene changed to '{scene_name}'") continue except obswebsocket.exceptions.ConnectionFailure: log.error(f"{self}: OBS connection refused") return def previous_track_ended(self) -> None: """ Notification from musicmuster that the previous track has ended. Actions required: - sanity check - update display """ log.debug(f"{self}: previous_track_ended()") # Sanity check if not track_sequence.previous: log.error( f"{self}: playlistmodel:previous_track_ended called with no current track" ) return if track_sequence.previous.row_number is None: log.error( f"{self}: previous_track_ended called with no row number " f"({track_sequence.previous=})" ) return # Update display self.invalidate_row(track_sequence.previous.row_number) def refresh_data(self, session: db.session) -> None: """Populate dicts for data calls""" # Populate self.playlist_rows with playlist data self.playlist_rows.clear() for p in PlaylistRows.get_playlist_rows(session, self.playlist_id): self.playlist_rows[p.row_number] = RowAndTrack(p) def refresh_row(self, session, row_number): """Populate dict for one row from database""" p = PlaylistRows.deep_row(session, self.playlist_id, row_number) self.playlist_rows[row_number] = RowAndTrack(p) def remove_track(self, row_number: int) -> None: """ Remove track from row, retaining row as a header row """ log.debug(f"{self}: remove_track({row_number=})") with db.Session() as session: playlist_row = session.get( PlaylistRows, self.playlist_rows[row_number].playlistrow_id ) if playlist_row: playlist_row.track_id = None session.commit() self.refresh_row(session, row_number) self.invalidate_row(row_number) def rescan_track(self, row_number: int) -> None: """ Rescan track at passed row number """ track_id = 
self.playlist_rows[row_number].track_id if track_id: with db.Session() as session: track = session.get(Tracks, track_id) set_track_metadata(track) self.refresh_row(session, row_number) self.update_track_times() self.invalidate_row(row_number) self.signals.resize_rows_signal.emit(self.playlist_id) session.commit() def reset_track_sequence_row_numbers(self) -> None: """ Signal handler for when row ordering has changed. Example: row 4 is marked as next. Row 2 is deleted. The PlaylistRows table will be correctly updated with change of row number, but track_sequence.next will still contain row_number==4. This function fixes up the track_sequence row numbers by looking up the playlistrow_id and retrieving the row number from the database. """ log.debug(f"{self}: reset_track_sequence_row_numbers()") # Check the track_sequence.next, current and previous plrs and # update the row number with db.Session() as session: for ts in [ track_sequence.next, track_sequence.current, track_sequence.previous, ]: if ts and ts.playlist_id == self.playlist_id and ts.row_number: playlist_row = session.get(PlaylistRows, ts.playlistrow_id) if playlist_row and playlist_row.row_number != ts.row_number: ts.row_number = playlist_row.row_number self.update_track_times() def _reversed_contiguous_row_groups( self, row_numbers: list[int] ) -> list[list[int]]: """ Take the list of row numbers and split into groups of contiguous rows. Return as a list of lists with the highest row numbers first. 
Example: input [2, 3, 4, 5, 7, 9, 10, 13, 17, 20, 21] return: [[20, 21], [17], [13], [9, 10], [7], [2, 3, 4, 5]] """ log.debug(f"{self}: _reversed_contiguous_row_groups({row_numbers=} called") result: list[list[int]] = [] temp: list[int] = [] last_value = row_numbers[0] - 1 for idx in range(len(row_numbers)): if row_numbers[idx] != last_value + 1: result.append(temp) temp = [] last_value = row_numbers[idx] temp.append(last_value) if temp: result.append(temp) result.reverse() log.debug(f"{self}: _reversed_contiguous_row_groups() returned: {result=}") return result def rowCount(self, index: QModelIndex = QModelIndex()) -> int: """Standard function for view""" return len(self.playlist_rows) def section_subtotal_header(self, rat: RowAndTrack) -> str: """ Process this row as subtotal within a timed section and return display text for this row """ count: int = 0 unplayed_count: int = 0 duration: int = 0 # Show subtotal for row_number in range(rat.row_number - 1, -1, -1): row_rat = self.playlist_rows[row_number] if self.is_header_row(row_number): if row_rat.note.endswith("-"): # There was no start of section return rat.note if row_rat.note.endswith(("+", "=")): # If we are playing this section, also # calculate end time if all tracks are played. 
end_time_str = "" if ( track_sequence.current and track_sequence.current.end_time and ( row_number < track_sequence.current.row_number < rat.row_number ) ): section_end_time = ( track_sequence.current.end_time + dt.timedelta(milliseconds=duration) ) end_time_str = ( ", section end time " + section_end_time.strftime(Config.TRACK_TIME_FORMAT) ) stripped_note = rat.note[:-1].strip() if stripped_note: return ( f"{stripped_note} [" f"{unplayed_count}/{count} track{'s' if count > 1 else ''} " f"({ms_to_mmss(duration)}) unplayed{end_time_str}]" ) else: return ( f"[{unplayed_count}/{count} track{'s' if count > 1 else ''} " f"({ms_to_mmss(duration)}) unplayed{end_time_str}]" ) else: continue else: count += 1 if not row_rat.played: unplayed_count += 1 duration += row_rat.duration # Should never get here return f"Error calculating subtotal ({row_rat.note})" def selection_is_sortable(self, row_numbers: list[int]) -> bool: """ Return True if the selection is sortable. That means: - at least two rows selected - selected rows are contiguous - selected rows do not include any header rows """ # at least two rows selected if len(row_numbers) < 2: return False # selected rows are contiguous if sorted(row_numbers) != list(range(min(row_numbers), max(row_numbers) + 1)): return False # selected rows do not include any header rows for row_number in row_numbers: if self.is_header_row(row_number): return False return True def set_next_row(self, row_number: Optional[int]) -> None: """ Set row_number as next track. If row_number is None, clear next track. Return True if successful else False. 
""" log.debug(f"{self}: set_next_row({row_number=})") if row_number is None: # Clear next track if track_sequence.next is not None: track_sequence.set_next(None) else: # Get playlistrow_id of row try: rat = self.playlist_rows[row_number] except IndexError: log.error(f"{self} set_track_sequence.next({row_number=}, IndexError") return if rat.track_id is None or rat.row_number is None: log.error( f"{self} .set_track_sequence.next({row_number=}, " f"No track / row number {rat.track_id=}, {rat.row_number=}" ) return old_next_row: Optional[int] = None if track_sequence.next: old_next_row = track_sequence.next.row_number track_sequence.set_next(rat) if Config.WIKIPEDIA_ON_NEXT: self.signals.search_wikipedia_signal.emit( self.playlist_rows[row_number].title ) if Config.SONGFACTS_ON_NEXT: self.signals.search_songfacts_signal.emit( self.playlist_rows[row_number].title ) if old_next_row is not None: self.invalidate_row(old_next_row) self.invalidate_row(row_number) self.signals.next_track_changed_signal.emit() self.update_track_times() def setData( self, index: QModelIndex, value: str | float, role: int = Qt.ItemDataRole.EditRole, ) -> bool: """ Update model with edited data """ if not index.isValid() or role != Qt.ItemDataRole.EditRole: return False row_number = index.row() column = index.column() with db.Session() as session: playlist_row = session.get( PlaylistRows, self.playlist_rows[row_number].playlistrow_id ) if not playlist_row: log.error( f"{self}: Error saving data: {row_number=}, {column=}, {value=}" ) return False if playlist_row.track_id: if column in [Col.TITLE.value, Col.ARTIST.value, Col.INTRO.value]: track = session.get(Tracks, playlist_row.track_id) if not track: log.error(f"{self}: Error retreiving track: {playlist_row=}") return False if column == Col.TITLE.value: track.title = str(value) elif column == Col.ARTIST.value: track.artist = str(value) elif column == Col.INTRO.value: track.intro = int(round(float(value), 1) * 1000) else: log.error(f"{self}: Error 
def sort_by_artist(self, row_numbers: list[int]) -> None:
    """
    Sort selected rows by artist
    """

    self.sort_by_attribute(row_numbers, "artist")


def sort_by_attribute(self, row_numbers: list[int], attr_name: str) -> None:
    """
    Sort selected rows by passed attribute name where 'attribute' is a
    key in PlaylistRowData

    The rows are ordered by the named attribute of their cached
    self.playlist_rows entries and then moved, in that order, to the
    lowest of the selected row numbers. An empty selection is a no-op.
    """

    # Nothing selected: min() below would raise on an empty sequence
    if not row_numbers:
        return

    # Create a subset of playlist_rows with the rows we are
    # interested in (a dict, so a row number passed twice is only
    # handled once)
    shortlist_rows = {k: self.playlist_rows[k] for k in row_numbers}
    sorted_list = [
        playlist_row.row_number
        for playlist_row in sorted(
            shortlist_rows.values(), key=attrgetter(attr_name)
        )
    ]
    self.move_rows(sorted_list, min(sorted_list))


def sort_by_duration(self, row_numbers: list[int]) -> None:
    """
    Sort selected rows by duration
    """

    self.sort_by_attribute(row_numbers, "duration")


def sort_by_lastplayed(self, row_numbers: list[int]) -> None:
    """
    Sort selected rows by lastplayed
    """

    self.sort_by_attribute(row_numbers, "lastplayed")


def sort_randomly(self, row_numbers: list[int]) -> None:
    """
    Sort selected rows randomly

    The selected rows are moved, in shuffled order, to the lowest of
    the selected row numbers. The caller's list is left untouched and
    an empty selection is a no-op.
    """

    # Nothing selected: min() below would raise on an empty sequence
    if not row_numbers:
        return

    # shuffle() works in place, so shuffle a copy rather than
    # reordering the list the caller handed us
    shuffled_rows = list(row_numbers)
    shuffle(shuffled_rows)
    self.move_rows(shuffled_rows, min(shuffled_rows))


def sort_by_title(self, row_numbers: list[int]) -> None:
    """
    Sort selected rows by title
    """

    self.sort_by_attribute(row_numbers, "title")


def start_of_timed_section_header(self, rat: RowAndTrack) -> str:
    """
    Process this row as the start of a timed section and return display
    text for this row

    Rows below the passed row are scanned in order. Every non-header
    row is counted and, if it has not been played, its duration is
    added to the total. The scan stops at the first header row whose
    note ends with "-"; other header rows are skipped. If no such
    header is found the totals run to the end of the playlist and the
    returned text says so.

    The returned text is the passed row's note with its final character
    removed (presumably the marker that flags the row as a section
    start - confirm with the caller), followed by the totals.
    """

    count: int = 0
    duration: int = 0

    for row_number in range(rat.row_number + 1, len(self.playlist_rows)):
        row_rat = self.playlist_rows[row_number]

        if not self.is_header_row(row_number):
            # A track row: count it, and total its duration only if it
            # has yet to be played
            count += 1
            if not row_rat.played:
                duration += row_rat.duration
            continue

        if row_rat.note.endswith("-"):
            # End-of-section header found
            return (
                f"{rat.note[:-1].strip()} "
                f"[{count} tracks, {ms_to_mmss(duration)} unplayed]"
            )

    return (
        f"{rat.note[:-1].strip()} "
        f"[{count} tracks, {ms_to_mmss(duration, none='none')} "
        "unplayed (to end of playlist)]"
    )
def supportedDropActions(self) -> Qt.DropAction:
    """
    Return the drop actions the model supports: both move and copy
    """

    return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction


def tooltip_role(self, row: int, column: int, rat: RowAndTrack) -> QVariant:
    """
    Return tooltip. Currently only used for last_played column.

    For the last played column of a row that has a track, the tooltip
    lists the dates returned by Playdates.last_playdates(), in reverse
    of the order returned, one per line and formatted with
    Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT. Any other column, or a row
    without a track, gets an empty QVariant.

    The rat argument is not used; the track is looked up from
    self.playlist_rows[row].
    """

    if column != Col.LAST_PLAYED.value:
        return QVariant()

    # Check for a track before opening a database session
    track_id = self.playlist_rows[row].track_id
    if not track_id:
        return QVariant()

    with db.Session() as session:
        playdates = Playdates.last_playdates(session, track_id)
        # NOTE(review): the separator reached us as a quoted raw line
        # break, which is not a valid string literal. It is written as
        # "\n" here; confirm whether an HTML line break was intended.
        return QVariant(
            "\n".join(
                [
                    a.lastplayed.strftime(Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT)
                    for a in reversed(playdates)
                ]
            )
        )
".join( [ a.lastplayed.strftime(Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT) for a in reversed(playdates) ] ) ) def update_track_times(self) -> None: """ Update track start/end times in self.playlist_rows """ log.debug(f"{self}: update_track_times()") next_start_time: Optional[dt.datetime] = None update_rows: list[int] = [] row_count = len(self.playlist_rows) current_track_row = None next_track_row = None if ( track_sequence.current and track_sequence.current.playlist_id == self.playlist_id ): current_track_row = track_sequence.current.row_number # Update current track details now so that they are available # when we deal with next track row which may be above current # track row. self.playlist_rows[current_track_row].set_forecast_start_time( update_rows, track_sequence.current.start_time ) if track_sequence.next and track_sequence.next.playlist_id == self.playlist_id: next_track_row = track_sequence.next.row_number for row_number in range(row_count): rat = self.playlist_rows[row_number] # Don't update times for tracks that have been played, for # unreadable tracks or for the current track, handled above. 
class PlaylistProxyModel(QSortFilterProxyModel):
    """
    For searching and filtering

    Sits between a PlaylistModel and its view. It filters rows by the
    incremental search string (matched against all columns) and, when
    the source model has played tracks hidden, by played status. Every
    other model operation is forwarded unchanged to the source model.
    """

    def __init__(
        self,
        source_model: PlaylistModel,
        *args: QObject,
        **kwargs: QObject,
    ) -> None:
        # Keep our own reference to the source model for the forwarding
        # methods below
        self.source_model = source_model
        super().__init__(*args, **kwargs)

        self.setSourceModel(source_model)
        # Search all columns
        self.setFilterKeyColumn(-1)

    def __repr__(self) -> str:
        # NOTE(review): the f-string is empty as received; the repr text
        # looks to have been lost - confirm against the repository.
        return f""

    def filterAcceptsRow(self, source_row: int, source_parent: QModelIndex) -> bool:
        """
        Subclass to filter by played status. Return True to show this row,
        False to hide it.

        Rows are only hidden here when the source model has played
        tracks hidden and the row has been played; everything else is
        left to the base class (search string) filter. A played row
        stays visible while it is the current or next track, and while
        it is the previous track until HIDE_AFTER_PLAYING_OFFSET
        milliseconds after the current track started.
        """

        model = self.source_model

        # Not hiding played tracks, or this row hasn't been played:
        # fall back to the search filter
        if not (model.played_tracks_hidden and model.is_played_row(source_row)):
            return super().filterAcceptsRow(source_row, source_parent)

        def is_this_row(entry) -> bool:
            """True if entry is set and refers to this row of this playlist"""
            return bool(
                entry
                and entry.playlist_id == model.playlist_id
                and entry.row_number == source_row
            )

        # Don't hide current track or next track
        if is_this_row(track_sequence.current) or is_this_row(track_sequence.next):
            return True

        # No previous track so hide this played track immediately
        if not track_sequence.previous:
            return False

        # This row isn't our previous track: hide it
        if (
            track_sequence.previous.playlist_id != model.playlist_id
            or track_sequence.previous.row_number != source_row
        ):
            return False

        # This row is our previous track. If the next track is not
        # playing yet, don't hide it
        if not (track_sequence.current and track_sequence.current.start_time):
            return True

        # Don't hide it until HIDE_AFTER_PLAYING_OFFSET milliseconds
        # after current track has started
        hide_after = track_sequence.current.start_time + dt.timedelta(
            milliseconds=Config.HIDE_AFTER_PLAYING_OFFSET
        )
        if dt.datetime.now() > hide_after:
            return False

        # Invalidate this row in HIDE_AFTER_PLAYING_OFFSET and a bit
        # milliseconds so that it hides then. We add 100mS on so that
        # the time check above is true next time through.
        QTimer.singleShot(
            Config.HIDE_AFTER_PLAYING_OFFSET + 100,
            lambda: self.source_model.invalidate_row(source_row),
        )
        return True

    def set_incremental_search(self, search_string: str) -> None:
        """
        Update search pattern

        The string is used as a case-insensitive regular expression.
        """

        pattern = QRegularExpression(
            search_string, QRegularExpression.PatternOption.CaseInsensitiveOption
        )
        self.setFilterRegularExpression(pattern)

    # ######################################
    # Forward functions not handled in proxy
    # ######################################
    # Each method below simply calls the method of the same name on the
    # source model with the same arguments.

    def current_track_started(self):
        """Forward to source model"""
        return self.source_model.current_track_started()

    def delete_rows(self, row_numbers: list[int]) -> None:
        """Forward to source model"""
        return self.source_model.delete_rows(row_numbers)

    def get_duplicate_rows(self) -> list[int]:
        """Forward to source model"""
        return self.source_model.get_duplicate_rows()

    def get_rows_duration(self, row_numbers: list[int]) -> int:
        """Forward to source model"""
        return self.source_model.get_rows_duration(row_numbers)

    def get_row_info(self, row_number: int) -> RowAndTrack:
        """Forward to source model"""
        return self.source_model.get_row_info(row_number)

    def get_row_track_path(self, row_number: int) -> str:
        """Forward to source model"""
        return self.source_model.get_row_track_path(row_number)

    def get_unplayed_rows(self) -> list[int]:
        """Forward to source model"""
        return self.source_model.get_unplayed_rows()

    def hide_played_tracks(self, hide: bool) -> None:
        """Forward to source model"""
        return self.source_model.hide_played_tracks(hide)

    def insert_row(
        self,
        proposed_row_number: Optional[int],
        track_id: Optional[int] = None,
        note: str = "",
    ) -> None:
        """Forward to source model"""
        return self.source_model.insert_row(proposed_row_number, track_id, note)

    def is_header_row(self, row_number: int) -> bool:
        """Forward to source model"""
        return self.source_model.is_header_row(row_number)

    def is_played_row(self, row_number: int) -> bool:
        """Forward to source model"""
        return self.source_model.is_played_row(row_number)

    def is_track_in_playlist(self, track_id: int) -> Optional[RowAndTrack]:
        """Forward to source model"""
        return self.source_model.is_track_in_playlist(track_id)

    def mark_unplayed(self, row_numbers: list[int]) -> None:
        """Forward to source model"""
        return self.source_model.mark_unplayed(row_numbers)

    def move_rows(self, from_rows: list[int], to_row_number: int) -> None:
        """Forward to source model"""
        return self.source_model.move_rows(from_rows, to_row_number)

    def move_rows_between_playlists(
        self, from_rows: list[int], to_row_number: int, to_playlist_id: int
    ) -> None:
        """Forward to source model"""
        return self.source_model.move_rows_between_playlists(
            from_rows, to_row_number, to_playlist_id
        )

    def move_track_add_note(
        self, new_row_number: int, existing_rat: RowAndTrack, note: str
    ) -> None:
        """Forward to source model"""
        return self.source_model.move_track_add_note(new_row_number, existing_rat, note)

    def move_track_to_header(
        self,
        header_row_number: int,
        existing_rat: RowAndTrack,
        note: Optional[str],
    ) -> None:
        """Forward to source model"""
        return self.source_model.move_track_to_header(
            header_row_number, existing_rat, note
        )

    def previous_track_ended(self) -> None:
        """Forward to source model"""
        return self.source_model.previous_track_ended()

    def remove_track(self, row_number: int) -> None:
        """Forward to source model"""
        return self.source_model.remove_track(row_number)

    def rescan_track(self, row_number: int) -> None:
        """Forward to source model"""
        return self.source_model.rescan_track(row_number)

    def set_next_row(self, row_number: Optional[int]) -> None:
        """Forward to source model"""
        self.source_model.set_next_row(row_number)

    def sort_by_artist(self, row_numbers: list[int]) -> None:
        """Forward to source model"""
        return self.source_model.sort_by_artist(row_numbers)

    def sort_by_duration(self, row_numbers: list[int]) -> None:
        """Forward to source model"""
        return self.source_model.sort_by_duration(row_numbers)

    def sort_by_lastplayed(self, row_numbers: list[int]) -> None:
        """Forward to source model"""
        return self.source_model.sort_by_lastplayed(row_numbers)

    def sort_randomly(self, row_numbers: list[int]) -> None:
        """Forward to source model"""
        return self.source_model.sort_randomly(row_numbers)

    def sort_by_title(self, row_numbers: list[int]) -> None:
        """Forward to source model"""
        return self.source_model.sort_by_title(row_numbers)

    def update_track_times(self) -> None:
        """Forward to source model"""
        return self.source_model.update_track_times()