# Standard library imports from __future__ import annotations from operator import attrgetter from random import shuffle from typing import cast, Optional import datetime as dt import re # PyQt imports from PyQt6.QtCore import ( QAbstractTableModel, QModelIndex, QRegularExpression, QSortFilterProxyModel, Qt, QTimer, QVariant, ) from PyQt6.QtGui import ( QBrush, QColor, QFont, ) # Third party imports # import line_profiler import obswebsocket # type: ignore # import snoop # type: ignore # App imports from classes import ( ApplicationError, Col, InsertRows, InsertTrack, MusicMusterSignals, TrackAndPlaylist, ) from config import Config from helpers import ( ask_yes_no, file_is_unreadable, get_all_track_metadata, get_embedded_time, get_relative_date, ms_to_mmss, ) from log import log, log_call from playlistrow import PlaylistRow, TrackSequence import ds HEADER_NOTES_COLUMN = 1 scene_change_re = re.compile(r"SetScene=\[([^[\]]*)\]") class PlaylistModel(QAbstractTableModel): """ The Playlist Model Cache the database info in self.playlist_rows, a dictionary of PlaylistRow objects indexed by row_number. Update strategy: update the database and then refresh the row-indexed cached copy (self.playlist_rows). Do not edit self.playlist_rows directly because keeping it and the database in sync is uncessarily challenging. refresh_row() will populate one row of playlist_rows from the database refresh_data() will repopulate all of playlist_rows from the database. """ def __init__( self, playlist_id: int, is_template: bool, ) -> None: super().__init__() self.playlist_id = playlist_id self.is_template = is_template self.track_sequence = TrackSequence() self.playlist_rows: dict[int, PlaylistRow] = {} self.selected_rows: list[PlaylistRow] = [] self.signals = MusicMusterSignals() self.played_tracks_hidden = False # Connect signals self.signals.signal_add_track_to_header.connect(self.add_track_to_header) self.signals.signal_begin_insert_rows.connect(self.begin_insert_rows) self.signals.signal_end_insert_rows.connect(self.end_insert_rows) self.signals.signal_playlist_selected_rows.connect(self.set_selected_rows) self.signals.signal_set_next_row.connect(self.set_next_row) self.signals.signal_track_started.connect(self.track_started) # Populate self.playlist_rows for dto in ds.playlistrows_by_playlist(self.playlist_id): self.playlist_rows[dto.row_number] = PlaylistRow(dto) self.update_track_times() def __repr__(self) -> str: return ( f"" ) def active_section_header(self) -> int: """ Return the row number of the first header that has any of the following below it: - unplayed tracks - the currently being played track - the track marked as next to play """ header_row = 0 for row_number in range(len(self.playlist_rows)): if self.is_header_row(row_number): header_row = row_number continue if not self.is_played_row(row_number): break # Here means that row_number points to a played track. The # current track will be marked as played when we start # playing it. It's also possible that the track marked as # next has already been played. Check for either of those. for ts in [self.track_sequence.next, self.track_sequence.current]: if ( ts and ts.row_number == row_number and ts.playlist_id == self.playlist_id ): # We've found the current or next track, so return # the last-found header row return header_row return header_row # @log_call def add_track_to_header(self, track_and_playlist: TrackAndPlaylist) -> None: """ Handle signal_add_track_to_header """ if track_and_playlist.playlist_id != self.playlist_id: return if not self.selected_rows: raise ApplicationError("Add track to header but no row selected") if len(self.selected_rows) > 1: self.signals.show_warning_signal.emit( "Add track to header", "Select one header to add track to" ) return selected_row = self.selected_rows[0] if selected_row.path: self.signals.show_warning_signal.emit( "Add track to header", "Select header to add track to" ) return selected_row.track_id = track_and_playlist.track_id # Update local copy self.refresh_row(selected_row.row_number) # Repaint row roles_to_invalidate = [ Qt.ItemDataRole.BackgroundRole, Qt.ItemDataRole.DisplayRole, Qt.ItemDataRole.FontRole, Qt.ItemDataRole.ForegroundRole, ] # only invalidate required roles self.invalidate_row(selected_row.row_number, roles_to_invalidate) self.signals.resize_rows_signal.emit(self.playlist_id) def _background_role(self, row: int, column: int, plr: PlaylistRow) -> QBrush: """Return background setting""" # Handle entire row colouring # Header row if self.is_header_row(row): # Check for specific header colouring if plr.row_bg is None: plr.row_bg = ds.notecolours_get_colour(plr.note) if plr.row_bg: return QBrush(QColor(plr.row_bg)) else: return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST)) # Unreadable track file if file_is_unreadable(plr.path): return QBrush(QColor(Config.COLOUR_UNREADABLE)) # Current track if ( self.track_sequence.current and self.track_sequence.current.playlist_id == self.playlist_id and self.track_sequence.current.row_number == row ): return QBrush(QColor(Config.COLOUR_CURRENT_PLAYLIST)) # Next track if ( self.track_sequence.next and self.track_sequence.next.playlist_id == self.playlist_id and self.track_sequence.next.row_number == row ): return QBrush(QColor(Config.COLOUR_NEXT_PLAYLIST)) # Individual cell colouring if column == Col.START_GAP.value: if plr.start_gap and plr.start_gap >= Config.START_GAP_WARNING_THRESHOLD: return QBrush(QColor(Config.COLOUR_LONG_START)) if column == Col.BITRATE.value: if not plr.bitrate or plr.bitrate < Config.BITRATE_LOW_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_LOW)) elif plr.bitrate < Config.BITRATE_OK_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM)) else: return QBrush(QColor(Config.COLOUR_BITRATE_OK)) if column == Col.NOTE.value: if plr.note: if plr.note_bg is None: plr.row_bg = ds.notecolours_get_colour(plr.note) if plr.note_bg: return QBrush(QColor(plr.note_bg)) return QBrush() def columnCount(self, parent: QModelIndex = QModelIndex()) -> int: """Standard function for view""" return len(Col) # @log_call def track_started(self, play_track: TrackAndPlaylist) -> None: """ Notification from musicmuster that the current track has just started playing Actions required: - sanity check - change OBS scene if needed - update Playdates in database - update PlaylistRows in database - update display - find next track - update track times """ if play_track.playlist_id != self.playlist_id: # Not for us return track_id = play_track.track_id # Sanity check - 1 if not track_id: raise ApplicationError("current_track_started() called with no track_id") # Sanity check - 2 if self.track_sequence.current is None: raise ApplicationError("track_started callced with no current track") row_number = self.track_sequence.current.row_number playlist_dto = self.playlist_rows[row_number] # Sanity check - 3 if playlist_dto.track_id != track_id: raise ApplicationError("track_id mismatch between playlist_rows and signal") # Check for OBS scene change self.obs_scene_change(row_number) # Update Playdates in database ds.playdates_update(track_id) # Mark track as played in playlist playlist_dto.played = True # Update colour and times for current row roles_to_invalidate = [Qt.ItemDataRole.DisplayRole] self.invalidate_row(row_number, roles_to_invalidate) # Update previous row in case we're hiding played rows if self.track_sequence.previous and self.track_sequence.previous.row_number: # only invalidate required roles self.invalidate_row(self.track_sequence.previous.row_number, roles_to_invalidate) # Update all other track times self.update_track_times() # Find next track next_row = self.find_next_row_to_play(row_number) if next_row: self.signals.signal_set_next_track.emit(self.playlist_rows[next_row]) def find_next_row_to_play(self, from_row_number: int) -> int | None: """ Find the next row to play in this playlist. Return row number or None if there's no next track. """ next_row = None unplayed_rows = [ a for a in self.get_unplayed_rows() if not self.is_header_row(a) and not file_is_unreadable(self.playlist_rows[a].path) ] if unplayed_rows: try: next_row = min([a for a in unplayed_rows if a > from_row_number]) except ValueError: next_row = min(unplayed_rows) return next_row def data( self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole ) -> QVariant | QFont | QBrush | str | int: """Return data to view""" if ( not index.isValid() or not (0 <= index.row() < len(self.playlist_rows)) or role in [ Qt.ItemDataRole.DecorationRole, Qt.ItemDataRole.StatusTipRole, Qt.ItemDataRole.WhatsThisRole, Qt.ItemDataRole.SizeHintRole, Qt.ItemDataRole.TextAlignmentRole, Qt.ItemDataRole.CheckStateRole, Qt.ItemDataRole.InitialSortOrderRole, ] ): return QVariant() row = index.row() column = index.column() # plr for playlist row data as it's used a lot plr = self.playlist_rows[row] # These are ordered in approximately the frequency with which # they are called if role == Qt.ItemDataRole.BackgroundRole: return self._background_role(row, column, plr) elif role == Qt.ItemDataRole.DisplayRole: return self._display_role(row, column, plr) elif role == Qt.ItemDataRole.EditRole: return self._edit_role(row, column, plr) elif role == Qt.ItemDataRole.FontRole: return self._font_role(row, column, plr) elif role == Qt.ItemDataRole.ForegroundRole: return self._foreground_role(row, column, plr) elif role == Qt.ItemDataRole.ToolTipRole: return self._tooltip_role(row, column, plr) return QVariant() # @log_call def delete_rows(self, row_numbers: list[int]) -> None: """ Delete passed rows from model Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows calls. To keep it simple, if inefficient, delete rows one by one. Delete from highest row back so that not yet deleted row numbers don't change. """ for row_group in self._reversed_contiguous_row_groups(row_numbers): # Signal that rows will be removed super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group)) # Remove rows from data store ds.playlist_remove_rows(self.playlist_id, row_group) # Signal that data store has been updated super().endRemoveRows() self.refresh_data() self.track_sequence.update() self.update_track_times() def _display_role(self, row: int, column: int, plr: PlaylistRow) -> str: """ Return text for display """ header_row = self.is_header_row(row) # Set / reset column span if column == HEADER_NOTES_COLUMN: column_span = 1 if header_row: column_span = self.columnCount() - 1 self.signals.span_cells_signal.emit( self.playlist_id, row, HEADER_NOTES_COLUMN, 1, column_span ) if header_row: if column == HEADER_NOTES_COLUMN: header_text = self.header_text(plr) if not header_text: return Config.SECTION_HEADER else: formatted_header = self.header_text(plr) trimmed_header = self.remove_section_timer_markers(formatted_header) return trimmed_header else: return "" if column == Col.START_TIME.value: start_time = plr.forecast_start_time if start_time: return start_time.strftime(Config.TRACK_TIME_FORMAT) return "" if column == Col.END_TIME.value: end_time = plr.forecast_end_time if end_time: return end_time.strftime(Config.TRACK_TIME_FORMAT) return "" if column == Col.INTRO.value: if plr.intro: return f"{plr.intro / 1000:{Config.INTRO_SECONDS_FORMAT}}" else: return "" dispatch_table: dict[int, str] = { Col.ARTIST.value: plr.artist, Col.BITRATE.value: str(plr.bitrate), Col.DURATION.value: ms_to_mmss(plr.duration), Col.LAST_PLAYED.value: get_relative_date(plr.lastplayed), Col.NOTE.value: plr.note, Col.START_GAP.value: str(plr.start_gap), Col.TITLE.value: plr.title, } if column in dispatch_table: return dispatch_table[column] return "" def _edit_role(self, row: int, column: int, plr: PlaylistRow) -> str | int: """ Return value for editing """ # If this is a header row and we're being asked for the # HEADER_NOTES_COLUMN, return the note value if self.is_header_row(row) and column == HEADER_NOTES_COLUMN: return plr.note if column == Col.INTRO.value: return plr.intro or 0 if column == Col.TITLE.value: return plr.title if column == Col.ARTIST.value: return plr.artist if column == Col.NOTE.value: return plr.note return "" def _foreground_role(self, row: int, column: int, plr: PlaylistRow) -> QBrush: """Return header foreground colour or QBrush() if none""" plr.row_fg = ds.notecolours_get_colour(plr.note, foreground=True) if plr.row_fg: return QBrush(QColor(plr.row_fg)) return QBrush() def flags(self, index: QModelIndex) -> Qt.ItemFlag: """ Standard model flags """ if not index.isValid(): return Qt.ItemFlag.ItemIsDropEnabled default = ( Qt.ItemFlag.ItemIsEnabled | Qt.ItemFlag.ItemIsSelectable | Qt.ItemFlag.ItemIsDragEnabled ) if index.column() in [ Col.TITLE.value, Col.ARTIST.value, Col.NOTE.value, Col.INTRO.value, ]: return default | Qt.ItemFlag.ItemIsEditable return default def _font_role(self, row: int, column: int, plr: PlaylistRow) -> QFont: """ Return font """ # Notes column is never bold if column == Col.NOTE.value: return QFont() boldfont = QFont() boldfont.setBold(not self.playlist_rows[row].played) return boldfont # @log_call def get_duplicate_rows(self) -> list[int]: """ Return a list of duplicate rows. If track appears in rows 2, 3 and 4, return [3, 4] (ie, ignore the first, not-yet-duplicate, track). """ found = [] result = [] for i in range(len(self.playlist_rows)): track_id = self.playlist_rows[i].track_id if track_id is None: continue if track_id in found: result.append(i) else: found.append(track_id) return result # @log_call def _get_new_row_number(self) -> int: """ Get row number for new row. If any rows are selected, return the first such row number, else return row number to add to end of model. """ if not self.selected_rows: return len(self.playlist_rows) return self.selected_rows[0].row_number def get_row_info(self, row_number: int) -> PlaylistRow: """ Return info about passed row """ return self.playlist_rows[row_number] # @log_call def get_row_track_id(self, row_number: int) -> Optional[int]: """ Return id of track associated with row or None if no track associated """ return self.playlist_rows[row_number].track_id def get_row_track_path(self, row_number: int) -> str: """ Return path of track associated with row or empty string if no track associated """ return self.playlist_rows[row_number].path def get_rows_duration(self, row_numbers: list[int]) -> int: """ Return the total duration of the passed rows """ duration = 0 for row_number in row_numbers: duration += self.playlist_rows[row_number].duration return duration def get_unplayed_rows(self) -> list[int]: """ Return a list of unplayed row numbers """ result = [ a.row_number for a in self.playlist_rows.values() if not a.played and a.track_id is not None ] return result def headerData( self, section: int, orientation: Qt.Orientation, role: int = Qt.ItemDataRole.DisplayRole, ) -> str | int | QFont | QVariant: """ Return text for headers """ display_dispatch_table = { Col.START_GAP.value: Config.HEADER_START_GAP, Col.INTRO.value: Config.HEADER_INTRO, Col.TITLE.value: Config.HEADER_TITLE, Col.ARTIST.value: Config.HEADER_ARTIST, Col.DURATION.value: Config.HEADER_DURATION, Col.START_TIME.value: Config.HEADER_START_TIME, Col.END_TIME.value: Config.HEADER_END_TIME, Col.LAST_PLAYED.value: Config.HEADER_LAST_PLAYED, Col.BITRATE.value: Config.HEADER_BITRATE, Col.NOTE.value: Config.HEADER_NOTE, } if role == Qt.ItemDataRole.DisplayRole: if orientation == Qt.Orientation.Horizontal: return display_dispatch_table[section] else: if Config.ROWS_FROM_ZERO: return section else: return section + 1 elif role == Qt.ItemDataRole.FontRole: boldfont = QFont() boldfont.setBold(True) return boldfont return QVariant() def header_text(self, plr: PlaylistRow) -> str: """ Process possible section timing directives embeded in header """ if plr.note.endswith(Config.SECTION_STARTS): return self.start_of_timed_section_header(plr) elif plr.note.endswith("="): return self.section_subtotal_header(plr) elif plr.note == "-": # If the hyphen is the only thing on the line, echo the note # that started the section without the trailing "+". for row_number in range(plr.row_number - 1, -1, -1): row_rat = self.playlist_rows[row_number] if self.is_header_row(row_number): if row_rat.note.endswith("-"): # We didn't find a matching section start break if row_rat.note.endswith("+"): return f"[End: {row_rat.note[:-1]}]" return "-" return plr.note def hide_played_tracks(self, hide: bool) -> None: """ Set played tracks hidden according to 'hide' """ self.played_tracks_hidden = hide for row_number in range(len(self.playlist_rows)): if self.is_played_row(row_number): # only invalidate required roles roles = [ Qt.ItemDataRole.DisplayRole, ] self.invalidate_row(row_number, roles) # @log_call def insert_row(self, track_id: Optional[int] = None, note: str = "",) -> None: """ Insert a row. """ new_row_number = self._get_new_row_number() super().beginInsertRows(QModelIndex(), new_row_number, new_row_number) _ = ds.playlist_insert_row( playlist_id=self.playlist_id, row_number=new_row_number, track_id=track_id, note=note, ) # Need to refresh self.playlist_rows because row numbers will have # changed self.refresh_data() super().endInsertRows() self.signals.resize_rows_signal.emit(self.playlist_id) self.track_sequence.update() self.update_track_times() roles_to_invalidate = [ Qt.ItemDataRole.BackgroundRole, Qt.ItemDataRole.DisplayRole, Qt.ItemDataRole.FontRole, Qt.ItemDataRole.ForegroundRole, ] self.invalidate_rows( list(range(new_row_number, len(self.playlist_rows))), roles_to_invalidate ) # @log_call def invalidate_row(self, modified_row: int, roles: list[Qt.ItemDataRole]) -> None: """ Signal to view to refresh invalidated row """ self.dataChanged.emit( self.index(modified_row, 0), self.index(modified_row, self.columnCount() - 1), roles, ) def invalidate_rows( self, modified_rows: list[int], roles: list[Qt.ItemDataRole] ) -> None: """ Signal to view to refresh invlidated rows """ for modified_row in modified_rows: # only invalidate required roles self.invalidate_row(modified_row, roles) def is_header_row(self, row_number: int) -> bool: """ Return True if row is a header row, else False """ if row_number in self.playlist_rows: return self.playlist_rows[row_number].path == "" return False # @log_call def is_played_row(self, row_number: int) -> bool: """ Return True if row is an unplayed track row, else False """ return self.playlist_rows[row_number].played def is_track_in_playlist(self, track_id: int) -> Optional[PlaylistRow]: """ If this track_id is in the playlist, return the RowAndTrack object else return None """ for row_number in range(len(self.playlist_rows)): if self.playlist_rows[row_number].track_id == track_id: return self.playlist_rows[row_number] return None def mark_unplayed(self, row_numbers: list[int]) -> None: """ Mark row as unplayed """ for row_number in row_numbers: self.playlist_rows[row_number].played = False self.refresh_row(row_number) self.update_track_times() # only invalidate required roles roles = [ Qt.ItemDataRole.FontRole, ] self.invalidate_rows(row_numbers, roles) # @log_call def move_rows(self, from_rows: list[int], to_row_number: int) -> bool: """ Move the playlist rows in from_rows to to_row. Return True if successful else False. """ log.debug(f"move_rows({from_rows=}, {to_row_number=})") if not from_rows: log.debug("move_rows called with no from_rows") return False # Don't move current row if self.track_sequence.current: current_row = self.track_sequence.current.row_number if current_row in from_rows: log.debug( "move_rows: Removing {current_row=} from {from_rows=}" ) from_rows.remove(self.track_sequence.current.row_number) from_rows = sorted(set(from_rows)) if (min(from_rows) < 0 or max(from_rows) >= self.rowCount() or to_row_number < 0 or to_row_number > self.rowCount()): log.debug("move_rows: invalid indexes") return False if to_row_number in from_rows: return False # Destination within rows to be moved # Notify model going to change self.beginResetModel() # Update database ds.playlist_move_rows(from_rows, self.playlist_id, to_row_number) # Notify model changed self.endResetModel() # Update display self.refresh_data() self.track_sequence.update() self.update_track_times() # TODO: do we need this? # # only invalidate required roles # roles = [ # Qt.ItemDataRole.DisplayRole, # ] # self.invalidate_rows(list(row_map.keys()), roles) return True # @log_call def move_rows_between_playlists( self, from_rows: list[int], to_row_number: int, to_playlist_id: int, ) -> None: """ Move the playlist rows given to to_row and below of to_playlist. """ # Don't move current row if self.track_sequence.current: current_row = self.track_sequence.current.row_number if current_row in from_rows: log.debug( "move_rows_between_playlists: Removing {current_row=} from {from_rows=}" ) from_rows.remove(self.track_sequence.current.row_number) # Row removal must be wrapped in beginRemoveRows .. endRemoveRows # and the row range must be contiguous. Process the highest rows # first so the lower row numbers are unchanged row_groups = self._reversed_contiguous_row_groups(from_rows) # Handle the moves in row_group chunks for row_group in row_groups: # Prepare source model super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group)) # Prepare destination model insert_rows = InsertRows(to_playlist_id, to_row_number, to_row_number + len(row_group) ) self.signals.signal_begin_insert_rows.emit(insert_rows) ds.playlist_move_rows(from_rows=row_group, from_playlist_id=self.playlist_id, to_row=to_row_number, to_playlist_id=to_playlist_id) self.signals.signal_end_insert_rows.emit(to_playlist_id) super().endRemoveRows() self.refresh_data() self.track_sequence.update() self.update_track_times() def begin_insert_rows(self, insert_rows: InsertRows) -> None: """ Prepare model to insert rows """ if insert_rows.playlist_id != self.playlist_id: return super().beginInsertRows(QModelIndex(), insert_rows.from_row, insert_rows.to_row) def end_insert_rows(self, playlist_id: int) -> None: """ End insert rows """ if playlist_id != self.playlist_id: return super().endInsertRows() self.refresh_data() # @log_call def move_track_add_note( self, new_row_number: int, existing_plr: PlaylistRow, note: str ) -> None: """ Move existing_rat track to new_row_number and append note to any existing note """ if note: playlist_row = self.playlist_rows[existing_plr.row_number] if playlist_row.note: playlist_row.note += "\n" + note else: playlist_row.note = note self.refresh_row(existing_plr.row_number) self.move_rows([existing_plr.row_number], new_row_number) self.signals.resize_rows_signal.emit(self.playlist_id) # @log_call def obs_scene_change(self, row_number: int) -> None: """ Check this row and any preceding headers for OBS scene change command and execute any found """ # Check any headers before this row idx = row_number - 1 while self.is_header_row(idx): idx -= 1 # Step through headers in row order and finish with this row for chkrow in range(idx + 1, row_number + 1): match_obj = scene_change_re.search(self.playlist_rows[chkrow].note) if match_obj: scene_name = match_obj.group(1) if scene_name: ws = obswebsocket.obsws( host=Config.OBS_HOST, port=Config.OBS_PORT, password=Config.OBS_PASSWORD, ) try: ws.connect() ws.call( obswebsocket.requests.SetCurrentProgramScene( sceneName=scene_name ) ) log.debug(f"{self}: OBS scene changed to '{scene_name}'") continue except obswebsocket.exceptions.ConnectionFailure: log.warning(f"{self}: OBS connection refused") return # @log_call def previous_track_ended(self) -> None: """ Notification from musicmuster that the previous track has ended. Actions required: - sanity check - update display """ # Sanity check if not self.track_sequence.previous: log.error( f"{self}: playlistmodel:previous_track_ended called with no current track" ) return if self.track_sequence.previous.row_number is None: log.error( f"{self}: previous_track_ended called with no row number " f"({self.track_sequence.previous=})" ) return # Update display # only invalidate required roles roles = [ Qt.ItemDataRole.BackgroundRole, ] self.invalidate_row(self.track_sequence.previous.row_number, roles) def refresh_data(self) -> None: """ Populate self.playlist_rows with playlist data """ # Note where each playlist_id is by mapping each playlistrow_id # to its current row_number plrid_to_row: dict[int, int] = {} for oldrow in self.playlist_rows: plrdata = self.playlist_rows[oldrow] plrid_to_row[plrdata.playlistrow_id] = plrdata.row_number # build a new playlist_rows new_playlist_rows: dict[int, PlaylistRow] = {} for dto in ds.playlistrows_by_playlist(self.playlist_id): if dto.playlistrow_id not in plrid_to_row: new_playlist_rows[dto.row_number] = PlaylistRow(dto) else: new_playlist_row = self.playlist_rows[plrid_to_row[dto.playlistrow_id]] new_playlist_row.row_number = dto.row_number new_playlist_rows[dto.row_number] = new_playlist_row # Copy to self.playlist_rows self.playlist_rows = new_playlist_rows def refresh_row(self, row_number: int) -> None: """Populate dict for one row from database""" plrid = self.playlist_rows[row_number].playlistrow_id refreshed_row = ds.playlistrow_by_id(plrid) if not refreshed_row: raise ApplicationError(f"Failed to retrieve row {self.playlist_id=}, {row_number=}") self.playlist_rows[row_number] = PlaylistRow(refreshed_row) # @log_call def remove_track(self, row_number: int) -> None: """ Remove track from row, retaining row as a header row """ self.playlist_rows[row_number].track_id = None # only invalidate required roles roles = [ Qt.ItemDataRole.DisplayRole, ] self.invalidate_row(row_number, roles) def rescan_track(self, row_number: int) -> None: """ Rescan track at passed row number """ track = self.playlist_rows[row_number] metadata = get_all_track_metadata(track.path) _ = ds.track_update(track.track_id, metadata) roles = [ Qt.ItemDataRole.BackgroundRole, Qt.ItemDataRole.DisplayRole, ] # only invalidate required roles self.invalidate_row(row_number, roles) self.signals.resize_rows_signal.emit(self.playlist_id) # @log_call def reset_track_sequence_row_numbers(self) -> None: """ Signal handler for when row ordering has changed. Example: row 4 is marked as next. Row 2 is deleted. The PlaylistRows table will be correctly updated with change of row number, but track_sequence.next will still contain row_number==4. This function fixes up the track_sequence row numbers by looking up the playlistrow_id and retrieving the row number from the database. """ self.track_sequence.update() self.update_track_times() def remove_comments(self, row_numbers: list[int]) -> None: """ Remove comments from passed rows """ if not row_numbers: return # Safety check if not ask_yes_no( title="Remove comments", question=f"Remove comments from {len(row_numbers)} rows?", ): return ds.playlist_remove_comments(self.playlist_id, row_numbers) # only invalidate required roles roles = [ Qt.ItemDataRole.BackgroundRole, Qt.ItemDataRole.DisplayRole, Qt.ItemDataRole.ForegroundRole, ] self.invalidate_rows(row_numbers, roles) # @log_call def _reversed_contiguous_row_groups( self, row_numbers: list[int] ) -> list[list[int]]: """ Take the list of row numbers and split into groups of contiguous rows. Return as a list of lists with the highest row numbers first. Example: input [2, 3, 4, 5, 7, 9, 10, 13, 17, 20, 21] return: [[20, 21], [17], [13], [9, 10], [7], [2, 3, 4, 5]] """ result: list[list[int]] = [] temp: list[int] = [] last_value = row_numbers[0] - 1 row_numbers.sort() for idx in range(len(row_numbers)): if row_numbers[idx] != last_value + 1: result.append(temp) temp = [] last_value = row_numbers[idx] temp.append(last_value) if temp: result.append(temp) result.reverse() return result def remove_section_timer_markers(self, header_text: str) -> str: """ Remove characters used to mark section timings from passed header text. Remove text using to signal header colours if colour entry is so marked. Return header text witout markers """ if header_text == "=": return "" while header_text.endswith(Config.SECTION_STARTS): header_text = header_text[0:-1] while header_text.endswith(Config.SECTION_ENDINGS): header_text = header_text[0:-1] # Parse passed header text and remove the first colour match string return ds.notecolours_remove_colour_substring(header_text) def rowCount(self, index: QModelIndex = QModelIndex()) -> int: """Standard function for view""" return len(self.playlist_rows) def section_subtotal_header(self, plr: PlaylistRow) -> str: """ Process this row as subtotal within a timed section and return display text for this row """ count: int = 0 unplayed_count: int = 0 duration: int = 0 if plr.row_number == 0: # Meaningless to have a subtotal on row 0 return Config.SUBTOTAL_ON_ROW_ZERO # Show subtotal for row_number in range(plr.row_number - 1, -1, -1): row_rat = self.playlist_rows[row_number] if self.is_header_row(row_number) or row_number == 0: if row_rat.note.endswith(Config.SECTION_STARTS) or row_number == 0: # If we are playing this section, also # calculate end time when all tracks are played. end_time_str = "" if ( self.track_sequence.current and self.track_sequence.current.end_time and ( row_number < self.track_sequence.current.row_number < plr.row_number ) ): section_end_time = ( self.track_sequence.current.end_time + dt.timedelta(milliseconds=duration) ) end_time_str = ( ", section end time " + section_end_time.strftime(Config.TRACK_TIME_FORMAT) ) clean_header = self.remove_section_timer_markers(plr.note) if clean_header: return ( f"{clean_header} [" f"{unplayed_count}/{count} track{'s' if count > 1 else ''} " f"({ms_to_mmss(duration)}) unplayed{end_time_str}]" ) else: return ( f"[{unplayed_count}/{count} track{'s' if count > 1 else ''} " f"({ms_to_mmss(duration)}) unplayed{end_time_str}]" ) else: continue else: count += 1 if not row_rat.played: unplayed_count += 1 duration += row_rat.duration # We should never get here raise ApplicationError("Error in section_subtotal_header()") def selection_is_sortable(self, row_numbers: list[int]) -> bool: """ Return True if the selection is sortable. That means: - at least two rows selected - selected rows are contiguous - selected rows do not include any header rows """ # at least two rows selected if len(row_numbers) < 2: return False # selected rows are contiguous if sorted(row_numbers) != list(range(min(row_numbers), max(row_numbers) + 1)): return False # selected rows do not include any header rows for row_number in row_numbers: if self.is_header_row(row_number): return False return True # @log_call def set_selected_rows(self, playlist_id: int, selected_row_numbers: list[int]) -> None: """ Handle signal_playlist_selected_rows to keep track of which rows are selected in the view """ if playlist_id != self.playlist_id: return self.selected_rows = [self.playlist_rows[a] for a in selected_row_numbers] # @log_call def set_next_row(self, playlist_id: int) -> None: """ Handle signal_set_next_row """ log.debug(f"{self}: set_next_row({playlist_id=})") if playlist_id != self.playlist_id: # Not for us return if len(self.selected_rows) == 0: # No row selected so clear next track if self.track_sequence.next is not None: self.track_sequence.set_next(None) return if len(self.selected_rows) > 1: self.signals.show_warning_signal.emit( "Too many rows selected", "Select one row for next row" ) return plr = self.selected_rows[0] if plr.track_id is None: raise ApplicationError(f"set_next_row: no track_id ({plr=})") old_next_row: Optional[int] = None if self.track_sequence.next: old_next_row = self.track_sequence.next.row_number self.track_sequence.set_next(plr) roles = [ Qt.ItemDataRole.BackgroundRole, ] if old_next_row is not None: # only invalidate required roles self.invalidate_row(old_next_row, roles) # only invalidate required roles self.invalidate_row(plr.row_number, roles) self.signals.next_track_changed_signal.emit() self.update_track_times() # @log_call def setData( self, index: QModelIndex, value: str | float, role: int = Qt.ItemDataRole.EditRole, ) -> bool: """ Update model with edited data """ if not index.isValid() or role != Qt.ItemDataRole.EditRole: return False row_number = index.row() column = index.column() plr = self.playlist_rows[row_number] if column == Col.TITLE.value: plr.title = str(value) elif column == Col.ARTIST.value: plr.artist = str(value) elif column == Col.INTRO.value: plr.intro = int(round(float(value), 1) * 1000) elif column == Col.NOTE.value: plr.note = str(value) else: raise ApplicationError(f"setData called with unexpected column ({column=})") self.refresh_row(row_number) self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role]) return True def sort_by_artist(self, row_numbers: list[int]) -> None: """ Sort selected rows by artist """ self.sort_by_attribute(row_numbers, "artist") def sort_by_attribute(self, row_numbers: list[int], attr_name: str) -> None: """ Sort selected rows by passed attribute name where 'attribute' is a key in PlaylistRowData """ # Create a subset of playlist_rows with the rows we are # interested in shortlist_rows = {k: self.playlist_rows[k] for k in row_numbers} sorted_list = [ playlist_row.row_number for playlist_row in sorted( shortlist_rows.values(), key=attrgetter(attr_name) ) ] self.move_rows(sorted_list, min(sorted_list)) def sort_by_duration(self, row_numbers: list[int]) -> None: """ Sort selected rows by duration """ self.sort_by_attribute(row_numbers, "duration") def sort_by_lastplayed(self, row_numbers: list[int]) -> None: """ Sort selected rows by lastplayed """ self.sort_by_attribute(row_numbers, "lastplayed") def sort_randomly(self, row_numbers: list[int]) -> None: """ Sort selected rows randomly """ shuffle(row_numbers) self.move_rows(row_numbers, min(row_numbers)) def sort_by_title(self, row_numbers: list[int]) -> None: """ Sort selected rows by title """ self.sort_by_attribute(row_numbers, "title") def start_of_timed_section_header(self, plr: PlaylistRow) -> str: """ Process this row as the start of a timed section and return display text for this row """ count: int = 0 unplayed_count: int = 0 duration: int = 0 clean_header = self.remove_section_timer_markers(plr.note) for row_number in range(plr.row_number + 1, len(self.playlist_rows)): row_rat = self.playlist_rows[row_number] if self.is_header_row(row_number): if row_rat.note.endswith(Config.SECTION_ENDINGS): return ( f"{clean_header} " f"[{count} tracks, {ms_to_mmss(duration)} unplayed]" ) else: continue else: count += 1 if not row_rat.played: unplayed_count += 1 duration += row_rat.duration return ( f"{clean_header} " f"[{count} tracks, {ms_to_mmss(duration, none='none')} " "unplayed (to end of playlist)]" ) def supportedDropActions(self) -> Qt.DropAction: return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction def _tooltip_role(self, row: int, column: int, plr: PlaylistRow) -> str: """ Return tooltip. Currently only used for last_played column. """ if column != Col.LAST_PLAYED.value: return "" track_id = self.playlist_rows[row].track_id if not track_id: return "" return ds.playdates_get_last(track_id) # @log_call def update_or_insert(self, track_id: int, row_number: int) -> None: """ If the passed track_id exists in this playlist, update the row(s), otherwise insert this track at row_number. """ track_rows = [ a.row_number for a in self.playlist_rows.values() if a.track_id == track_id ] if track_rows: for row in track_rows: self.refresh_row(row) # only invalidate required roles roles = [ Qt.ItemDataRole.BackgroundRole, Qt.ItemDataRole.DisplayRole, Qt.ItemDataRole.FontRole, Qt.ItemDataRole.ForegroundRole, ] self.invalidate_rows(track_rows, roles) else: self.insert_row(track_id=track_id) # @log_call def update_track_times(self) -> None: """ Update track start/end times in self.playlist_rows """ next_start_time: Optional[dt.datetime] = None update_rows: list[int] = [] row_count = len(self.playlist_rows) current_track_row = None next_track_row = None if ( self.track_sequence.current and self.track_sequence.current.playlist_id == self.playlist_id ): current_track_row = self.track_sequence.current.row_number # Update current track details now so that they are available # when we deal with next track row which may be above current # track row. self.playlist_rows[current_track_row].set_forecast_start_time( update_rows, self.track_sequence.current.start_time ) if self.track_sequence.next and self.track_sequence.next.playlist_id == self.playlist_id: next_track_row = self.track_sequence.next.row_number for row_number in range(row_count): plr = self.playlist_rows[row_number] # Don't update times for tracks that have been played, for # unreadable tracks or for the current track, handled above. if ( plr.played or row_number == current_track_row or (plr.path and file_is_unreadable(plr.path)) ): continue # Reset start time if timing in header if self.is_header_row(row_number): header_time = get_embedded_time(plr.note) if header_time: next_start_time = header_time continue # Set start time for next row if we have a current track if ( row_number == next_track_row and self.track_sequence.current and self.track_sequence.current.end_time ): next_start_time = plr.set_forecast_start_time( update_rows, self.track_sequence.current.end_time ) continue # If we're between the current and next row, zero out # times if (current_track_row or row_count) < row_number < (next_track_row or 0): plr.set_forecast_start_time(update_rows, None) continue # Set start/end plr.forecast_start_time = next_start_time # Update start/stop times of rows that have changed for updated_row in update_rows: self.dataChanged.emit( self.index(updated_row, Col.START_TIME.value), self.index(updated_row, Col.END_TIME.value), ) class PlaylistProxyModel(QSortFilterProxyModel): """ For searching and filtering """ def __init__( self, ) -> None: super().__init__() # Search all columns self.setFilterKeyColumn(-1) self.track_sequence = TrackSequence() def __repr__(self) -> str: return f"" def filterAcceptsRow(self, source_row: int, source_parent: QModelIndex) -> bool: """ Subclass to filter by played status. Return True to show this row, False to hide it. """ if Config.HIDE_PLAYED_MODE != Config.HIDE_PLAYED_MODE_TRACKS: return super().filterAcceptsRow(source_row, source_parent) if self.sourceModel().played_tracks_hidden: if self.sourceModel().is_played_row(source_row): # Don't hide current track if ( self.track_sequence.current and self.track_sequence.current.playlist_id == self.sourceModel().playlist_id and self.track_sequence.current.row_number == source_row ): return True # Don't hide next track if ( self.track_sequence.next and self.track_sequence.next.playlist_id == self.sourceModel().playlist_id and self.track_sequence.next.row_number == source_row ): return True # Handle previous track if self.track_sequence.previous: if ( self.track_sequence.previous.playlist_id != self.sourceModel().playlist_id or self.track_sequence.previous.row_number != source_row ): # This row isn't our previous track: hide it return False if self.track_sequence.current and self.track_sequence.current.start_time: # This row is our previous track. Don't hide it # until HIDE_AFTER_PLAYING_OFFSET milliseconds # after current track has started if self.track_sequence.current.start_time and dt.datetime.now() > ( self.track_sequence.current.start_time + dt.timedelta( milliseconds=Config.HIDE_AFTER_PLAYING_OFFSET ) ): return False else: # Invalidate this row in # HIDE_AFTER_PLAYING_OFFSET and a bit # milliseconds so that it hides then. We add # 100mS on so that the if clause above is # true next time through. # only invalidate required roles roles = [ Qt.ItemDataRole.DisplayRole, ] QTimer.singleShot( Config.HIDE_AFTER_PLAYING_OFFSET + 100, lambda: self.sourceModel().invalidate_row( source_row, roles ), ) return True # Next track not playing yet so don't hide previous else: return True # No previous track so hide this played track immediately return False return super().filterAcceptsRow(source_row, source_parent) def set_incremental_search(self, search_string: str) -> None: """ Update search pattern """ self.setFilterRegularExpression( QRegularExpression( search_string, QRegularExpression.PatternOption.CaseInsensitiveOption ) ) def sourceModel(self) -> PlaylistModel: """ Override sourceModel to return correct type """ return cast(PlaylistModel, super().sourceModel())