import os import re import stackprinter # type: ignore import subprocess import threading import obsws_python as obs # type: ignore from datetime import datetime, timedelta from typing import Any, Callable, cast, List, Optional, Tuple, TYPE_CHECKING from PyQt6.QtCore import ( QEvent, QModelIndex, QObject, Qt, # QTimer, ) from PyQt6.QtGui import QAction, QBrush, QColor, QFont, QDropEvent, QKeyEvent from PyQt6.QtWidgets import ( QAbstractItemDelegate, QAbstractItemView, QApplication, QHeaderView, QMenu, QMessageBox, QPlainTextEdit, QStyledItemDelegate, QStyleOptionViewItem, QTableView, QTableWidgetItem, QWidget, QProxyStyle, QStyle, QStyleOption, ) from dbconfig import Session, scoped_session from dialogs import TrackSelectDialog from classes import MusicMusterSignals from config import Config from helpers import ( ask_yes_no, file_is_unreadable, get_relative_date, ms_to_mmss, open_in_audacity, send_mail, set_track_metadata, ) from log import log from models import PlaylistRows, Settings, Tracks, NoteColours if TYPE_CHECKING: from musicmuster import Window from playlistmodel import PlaylistModel HEADER_NOTES_COLUMN = 2 class EscapeDelegate(QStyledItemDelegate): """ - increases the height of a row when editing to make editing easier - closes the edit on control-return - checks with user before abandoning edit on Escape """ def __init__(self, parent) -> None: super().__init__(parent) self.signals = MusicMusterSignals() def createEditor( self, parent: Optional[QWidget], option: QStyleOptionViewItem, index: QModelIndex, ): """ Intercept createEditor call and make row just a little bit taller """ self.signals = MusicMusterSignals() self.signals.enable_escape_signal.emit(False) if isinstance(self.parent(), PlaylistTab): p = cast(PlaylistTab, self.parent()) if isinstance(index.data(), str): row = index.row() row_height = p.rowHeight(row) p.setRowHeight(row, row_height + Config.MINIMUM_ROW_HEIGHT) return QPlainTextEdit(parent) return super().createEditor(parent, option, index) def destroyEditor(self, editor: Optional[QWidget], index: QModelIndex) -> None: """ Intercept editor destroyment """ self.signals.enable_escape_signal.emit(True) return super().destroyEditor(editor, index) def eventFilter(self, editor: Optional[QObject], event: Optional[QEvent]) -> bool: """By default, QPlainTextEdit doesn't handle enter or return""" if editor is None or event is None: return False if event.type() == QEvent.Type.KeyPress: key_event = cast(QKeyEvent, event) if key_event.key() == Qt.Key.Key_Return: if key_event.modifiers() == (Qt.KeyboardModifier.ControlModifier): self.commitData.emit(editor) self.closeEditor.emit(editor) return True elif key_event.key() == Qt.Key.Key_Escape: discard_edits = QMessageBox.question( cast(QWidget, self), "Abandon edit", "Discard changes?" ) if discard_edits == QMessageBox.StandardButton.Yes: self.closeEditor.emit(editor) return True return False def setEditorData(self, editor, index): value = index.model().data(index, Qt.ItemDataRole.EditRole) editor.setPlainText(value.value()) def setModelData(self, editor, model, index): value = editor.toPlainText() model.setData(index, value, Qt.ItemDataRole.EditRole) def updateEditorGeometry(self, editor, option, index): editor.setGeometry(option.rect) class PlaylistStyle(QProxyStyle): def drawPrimitive(self, element, option, painter, widget=None): """ Draw a line across the entire row rather than just the column we're hovering over. """ if ( element == QStyle.PrimitiveElement.PE_IndicatorItemViewItemDrop and not option.rect.isNull() ): option_new = QStyleOption(option) option_new.rect.setLeft(0) if widget: option_new.rect.setRight(widget.width()) option = option_new super().drawPrimitive(element, option, painter, widget) class PlaylistTab(QTableView): def __init__( self, musicmuster: "Window", playlist_id: int, ) -> None: super().__init__() # Save passed settings self.musicmuster = musicmuster self.playlist_id = playlist_id # Set up widget self.setItemDelegate(EscapeDelegate(self)) self.setAlternatingRowColors(True) self.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection) self.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows) # self.setEditTriggers(QAbstractItemView.EditTrigger.DoubleClicked) self.setVerticalScrollMode(QAbstractItemView.ScrollMode.ScrollPerPixel) self.setDragDropMode(QAbstractItemView.DragDropMode.InternalMove) self.setDragDropOverwriteMode(False) self.setAcceptDrops(True) # Set our custom style - this draws the drop indicator across the whole row self.setStyle(PlaylistStyle()) # TODO: change this later to only enable drags when multiple # rows selected self.setDragEnabled(True) # Prepare for context menu self.menu = QMenu() self.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu) self.customContextMenuRequested.connect(self._context_menu) # Connect signals # This dancing is to satisfy mypy h_header = self.horizontalHeader() if isinstance(h_header, QHeaderView): h_header.sectionResized.connect(self._column_resize) h_header.setStretchLastSection(True) # self.itemSelectionChanged.connect(self._select_event) # self.signals.set_next_track_signal.connect(self._reset_next) self.signals = MusicMusterSignals() self.signals.span_cells_signal.connect(self._span_cells) # Call self.eventFilter() for events # self.installEventFilter(self) # Initialise miscellaneous instance variables self.search_text: str = "" self.sort_undo: List[int] = [] # self.edit_cell_type: Optional[int] # Load playlist rows self.setModel(PlaylistModel(playlist_id)) self._set_column_widths() # kae def __repr__(self) -> str: # kae return f"" # ########## Events other than cell editing ########## def dropEvent(self, event): if event.source() is not self or ( event.dropAction() != Qt.DropAction.MoveAction and self.dragDropMode() != QAbstractItemView.InternalMove ): super().dropEvent(event) from_rows = list(set([a.row() for a in self.selectedIndexes()])) to_row = self.indexAt(event.position().toPoint()).row() if ( 0 <= min(from_rows) <= self.model().rowCount() and 0 <= max(from_rows) <= self.model().rowCount() and 0 <= to_row <= self.model().rowCount() ): self.model().move_rows(from_rows, to_row) event.accept() super().dropEvent(event) # def dropEvent(self, event: Optional[QDropEvent]) -> None: # """ # Handle drag/drop of rows # https://stackoverflow.com/questions/26227885/drag-and-drop-rows-within-qtablewidget # """ # if not event: # return # if not event.source() == self: # return # We don't accept external drops # top_row = self.rowAt(0) # row_set = set([mi.row() for mi in self.selectedIndexes()]) # targetRow = self.indexAt(event.position().toPoint()).row() # row_set.discard(targetRow) # rows = list(sorted(row_set)) # if not rows: # return # if targetRow == -1: # targetRow = self.rowCount() # for _ in range(len(rows)): # self.insertRow(targetRow) # rowMapping = dict() # Src row to target row. # for idx, row in enumerate(rows): # if row < targetRow: # rowMapping[row] = targetRow + idx # else: # rowMapping[row + len(rows)] = targetRow + idx # colCount = self.columnCount() # for srcRow, tgtRow in sorted(rowMapping.items()): # if self._get_row_track_id(srcRow): # # This is a track row # for col in range(0, colCount): # self.setItem(tgtRow, col, self.takeItem(srcRow, col)) # else: # self.setItem( # tgtRow, # HEADER_NOTES_COLUMN, # self.takeItem(srcRow, HEADER_NOTES_COLUMN), # ) # self.setSpan(tgtRow, HEADER_NOTES_COLUMN, 1, len(columns) - 1) # for row in reversed(sorted(rowMapping.keys())): # self.removeRow(row) # self.resizeRowsToContents() # # Scroll to drop zone # self.scrollToItem( # self.item(top_row, 1), QAbstractItemView.ScrollHint.PositionAtTop # ) # event.accept() # # Reset drag mode to allow row selection by dragging # self.setDragEnabled(False) # # Disable sort undo # self.sort_undo = [] # with Session() as session: # self.save_playlist(session) # self._update_start_end_times(session) # self.hide_or_show_played_tracks() def _add_context_menu( self, text: str, action: Callable, disabled: bool = False, parent_menu: Optional[QMenu] = None, ) -> Optional[QAction]: """ Add item to self.menu """ if parent_menu is None: parent_menu = self.menu menu_item = parent_menu.addAction(text) if not menu_item: return None menu_item.setDisabled(disabled) menu_item.triggered.connect(action) return menu_item # def mouseReleaseEvent(self, event): # """ # Enable dragging if rows are selected # """ # if self.selectedIndexes(): # self.setDragEnabled(True) # else: # self.setDragEnabled(False) # super().mouseReleaseEvent(event) # ########## Cell editing ########## # We only want to allow cell editing on tracks, artists and notes, # although notes may be section headers. # # Once editing starts, we need to disable play controls so that a # 'return' doesn't play the next track. # # Earlier in this file: # self.setEditTriggers(QAbstractItemView.DoubleClicked) - triggers # editing on double-click # # Call sequences: # Start editing: # edit() # End editing: # _cell_changed() (only if changes made) # closeEditor() # def _cell_changed(self, row: int, column: int) -> None: # """Called when cell content has changed""" # # Disable cell changed signal connection as note updates will # # change cell again (metadata) # self.cellChanged.disconnect(self._cell_changed) # cell = self.item(row, column) # if not cell: # return # new_text = cell.text().strip() # # Update cell with strip()'d text # cell.setText(new_text) # track_id = self._get_row_track_id(row) # # Determine cell type changed # with Session() as session: # # Get playlistrow object # plr_id = self._get_row_plr_id(row) # plr_item = session.get(PlaylistRows, plr_id) # if not plr_item: # return # # Note any updates needed to PlaylistTrack objects # update_current = self.musicmuster.current_track.plr_id == plr_id # update_next = self.musicmuster.next_track.plr_id == plr_id # if self.edit_cell_type == ROW_NOTES: # plr_item.note = new_text # if track_id: # self._set_row_note_text(session, row, new_text) # else: # self._set_row_header_text(session, row, new_text) # else: # if track_id: # track = session.get(Tracks, track_id) # if track: # if self.edit_cell_type == TITLE: # track.title = new_text # if update_current: # self.musicmuster.current_track.title = new_text # if update_next: # self.musicmuster.next_track.title = new_text # elif self.edit_cell_type == ARTIST: # track.artist = new_text # if update_current: # self.musicmuster.current_track.artist = new_text # if update_next: # self.musicmuster.next_track.artist = new_text # if update_next or update_current: # self.musicmuster.update_headers() # if update_current: # self._set_row_colour_current(row) # elif update_next: # self._set_row_colour_next(row) # self.clear_selection() # def closeEditor( # self, editor: QWidget, hint: QAbstractItemDelegate.EndEditHint # ) -> None: # """ # Override PySide2.QAbstractItemView.closeEditor to enable # play controls and update display. # """ # # If edit was cancelled (eg, by pressing ESC), the signal will # # still be connected # try: # self.cellChanged.disconnect(self._cell_changed) # except TypeError: # pass # self.edit_cell_type = None # self.musicmuster.enable_play_next_controls() # self.musicmuster.actionSetNext.setEnabled(True) # self.musicmuster.action_Clear_selection.setEnabled(True) # super(PlaylistTab, self).closeEditor(editor, hint) # # Optimise row heights after increasing row height for editing # self.resizeRowsToContents() # # Update start times in case a start time in a note has been # # edited # with Session() as session: # self._update_start_end_times(session) # def edit( # self, # index: QModelIndex, # type: ignore # FIXME # trigger: QAbstractItemView.EditTrigger, # event: QEvent, # ) -> bool: # """ # Override PySide2.QAbstractItemView.edit to catch when editing starts # Editing only ever starts with a double click on a cell # """ # # 'result' will only be true on double-click # result = super(PlaylistTab, self).edit(index, trigger, event) # if result: # row = index.row() # column = index.column() # note_column = 0 # if self._get_row_track_id(row): # # If a track row, we only allow editing of title, artist and # # note. Check that this column is one of those. # if column in [TITLE, ARTIST, ROW_NOTES]: # self.edit_cell_type = column # else: # # Can't edit other columns # return False # # Check whether we're editing a notes row for later # if self.edit_cell_type == ROW_NOTES: # note_column = ROW_NOTES # else: # # This is a section header. # if column != HEADER_NOTES_COLUMN: # return False # note_column = HEADER_NOTES_COLUMN # self.edit_cell_type = ROW_NOTES # # Disable play controls so that keyboard input doesn't # # disturb playing # self.musicmuster.disable_play_next_controls() # self.musicmuster.actionSetNext.setEnabled(False) # self.musicmuster.action_Clear_selection.setEnabled(False) # # If this is a note cell, we need to remove any existing section # # timing so user can't edit that. Keep it simple: refresh text # # from database. note_column will only be non-zero if we are # # editing a note. # if note_column: # with Session() as session: # plr_item = self._get_row_plr(session, row) # if not plr_item: # return False # if note_column == ROW_NOTES: # self._set_row_note_text(session, row, plr_item.note) # else: # self._set_row_header_text(session, row, plr_item.note) # # Connect signal so we know when cell has changed. # self.cellChanged.connect(self._cell_changed) # return result # # ########## Externally called functions ########## # def clear_next(self) -> None: # """ # Unmark next track # """ # row_number = self._get_next_track_row_number() # if not row_number: # return # self._set_row_colour_default(row_number) # self.clear_selection() # self.musicmuster.set_next_plr_id(None, self) def clear_selection(self) -> None: """Unselect all tracks and reset drag mode""" self.clearSelection() # self.setDragEnabled(False) def get_selected_row_number(self) -> Optional[int]: """ Return the selected row number or None if none selected. """ sm = self.selectionModel() if sm and sm.hasSelection(): index = sm.currentIndex() if index.isValid(): return index.row() return None # def get_new_row_number(self) -> int: # """ # Return the selected row or the row count if no row selected # (ie, new row will be appended) # """ # if self.selectionModel().hasSelection(): # return self.currentRow() # else: # return self.rowCount() def get_selected_playlistrow_ids(self) -> list: """ Return a list of PlaylistRow ids of the selected rows """ return [self._get_row_plr_id(a) for a in self._get_selected_rows()] # def get_selected_playlistrows(self, session: scoped_session) -> List[PlaylistRows]: # """ # Return a list of PlaylistRows of the selected rows # """ # plr_ids = self.get_selected_playlistrow_ids() # if not plr_ids: # return [] # plrs = [session.get(PlaylistRows, a) for a in plr_ids] # return [plr for plr in plrs if plr is not None] # def get_selected_row_track_path(self) -> Optional[str]: # """ # Return the path of the first selected row or # None if no rows are selected or first selected row doesn't # have a track. # """ # first_selected_row = self._get_selected_row() # if first_selected_row is None: # return None # path = self._get_row_track_path(first_selected_row) # if not path: # return None # return path # def hide_or_show_played_tracks(self) -> None: # """ # Hide or show played tracks. # Never hide current or next track # """ # current_next = [ # self._get_current_track_row_number(), # self._get_next_track_row_number(), # ] # for row_number in range(self.rowCount()): # if row_number in current_next: # continue # if self._get_row_userdata(row_number, self.PLAYED): # if self.musicmuster.hide_played_tracks: # self.hideRow(row_number) # else: # self.showRow(row_number) # # This causes scrolling, so ensure current track is visible # self.scroll_current_to_top() # def insert_header(self, session: scoped_session, note: str) -> None: # """ # Insert section header into playlist tab. # If a row is selected, add header above. Otherwise, add to end of # playlist. # We simply build a PlaylistRows object and pass it to insert_row() # to do the heavy lifing. # """ # row_number = self.get_new_row_number() # TODO: check arg order plr = PlaylistRows(session, self.playlist_id, None, row_number, note) # self.insert_row(session, plr) # self._set_row_header_text(session, row_number, note) # self.save_playlist(session) # self._update_start_end_times(session) # def insert_row( # self, # session: scoped_session, # plr: PlaylistRows, # update_track_times: bool = True, # played=False, # ) -> None: # """ # Insert passed playlist row (plr) into playlist tab. # """ # row_number = plr.plr_rownum # bold = True # self.insertRow(row_number) # _ = self._set_row_plr_id(row_number, plr.id) # if plr.track: # self._update_row_track_info(session, row_number, plr.track) # if played: # bold = False # _ = self._set_row_userdata(row_number, self.PLAYED, True) # self._set_row_note_text(session, row_number, plr.note) # else: # # This is a section header so it must have note text # if plr.note is None: # log.debug(f"insert_row({plr=}) with no track_id and no note") # return # # Use one QTableWidgetItem to span all columns from column 1 # self._set_row_header_text(session, row_number, plr.note) # self.setSpan(row_number, HEADER_NOTES_COLUMN, 1, len(columns) - 1) # # Save (or clear) track_id # _ = self._set_row_track_id(row_number, 0) # # Set bold as needed # self._set_row_bold(row_number, bold) # def insert_track( # self, # session: scoped_session, # track: Tracks, # note: str = "", # repaint: bool = True, # target_row: Optional[int] = None, # ) -> None: # """ # Insert track into playlist tab. # If a row is selected, add track above. Otherwise, add to end of # playlist. # We simply build a PlaylistRows object and pass it to insert_row() # to do the heavy lifing. # """ # if not track: # log.debug( # f"insert_track(session={hex(id(Session))}, {note=}, {repaint=}" # " called with no track" # ) # return # if target_row: # row_number = target_row # else: # row_number = self.get_new_row_number() # # Check to see whether track is already in playlist # existing_plr = PlaylistRows.get_track_plr(session, track.id, self.playlist_id) # if existing_plr and ask_yes_no( # "Duplicate row", # "Track already in playlist. " "Move to new location?", # default_yes=True, # ): # # Yes it is and we should reuse it # # If we've been passed a note, we need to add that to the # # existing track # if note: # existing_plr.append_note(note) # return self._move_row(session, existing_plr, row_number) # # Build playlist_row object # plr = TODO: check arg order PlaylistRows(session, self.playlist_id, track.id, row_number, note) # self.insert_row(session, plr) # self.save_playlist(session) # self._update_start_end_times(session) # def lookup_row_in_songfacts(self) -> None: # """ # If there is a selected row and it is a track row, # look up its title in songfacts. # If multiple rows are selected, only consider the first one. # Otherwise return. # """ # self._look_up_row(website="songfacts") # def lookup_row_in_wikipedia(self) -> None: # """ # If there is a selected row and it is a track row, # look up its title in wikipedia. # If multiple rows are selected, only consider the first one. # Otherwise return. # """ # self._look_up_row(website="wikipedia") # def play_ended(self) -> None: # """ # Called by musicmuster when play has ended. # current_track points to track that's just finished # """ # row_number = self._get_current_track_row_number() # if row_number is None: # return # self._set_row_colour_default(row_number) # self.clear_selection() # self._set_row_last_played_time( # row_number, self.musicmuster.current_track.start_time # ) # with Session() as session: # self._set_row_note_colour(session, row_number) # def play_started(self, session: scoped_session) -> None: # """ # Notification from musicmuster that track has started playing. # Actions required: # - Mark current row as played # - Set next track # - Display track as current # - Update start/stop times # - Change OBS scene if needed # - Update hidden tracks # """ # print("playlists_v3:play_starter()") # return # # current_row = self._get_current_track_row_number() # # if current_row is None: # # if os.environ["MM_ENV"] == "PRODUCTION": # # send_mail( # # Config.ERRORS_TO, # # Config.ERRORS_FROM, # # "playlists:play_started:current_row is None", # # stackprinter.format(), # # ) # # print("playlists:play_started:current_row is None") # # # stackprinter.show(add_summary=True, style="darkbg") # # return # # # Mark current row as played # # self._set_played_row(session, current_row) # # # Set next track # # next_row = self._find_next_track_row(session, current_row + 1) # # if next_row: # # self.musicmuster.set_next_plr_id(self._get_row_plr_id(next_row), self) # # # Display row as current track # # self._set_row_colour_current(current_row) # # # Update start/stop times # # self._update_start_end_times(session) # # # Change OBS scene if needed # # self._obs_change_scene(current_row) # # # Update hidden tracks # # QTimer.singleShot( # # Config.HIDE_AFTER_PLAYING_OFFSET, self.hide_or_show_played_tracks # # ) # def populate_display( # self, session: scoped_session, playlist_id: int, scroll_to_top: bool = True # ) -> None: # """ # Populate display from the associated playlist ID # """ # print("playlists_v3:populate_display()") # return # # # Sanity check row numbering before we load # # PlaylistRows.fixup_rownumbers(session, playlist_id) # # # Clear playlist # # self.setRowCount(0) # # # Get played tracks # # played_rows = self._get_played_rows(session) # # # Add the rows # # playlist = session.get(Playlists, playlist_id) # # if not playlist: # # if os.environ["MM_ENV"] == "PRODUCTION": # # send_mail( # # Config.ERRORS_TO, # # Config.ERRORS_FROM, # # "playlists:populate_display:no playlist", # # stackprinter.format(), # # ) # # print("playlists:populate_display:no playlist") # # # stackprinter.show(add_summary=True, style="darkbg") # # return # # for plr in PlaylistRows.deep_rows(session, playlist_id): # # self.insert_row( # # session, # # plr, # # update_track_times=False, # # played=plr.plr_rownum in played_rows, # # ) # # # Scroll to top # # if scroll_to_top: # # row0_item = self.item(0, 0) # # if row0_item: # # self.scrollToItem(row0_item, QAbstractItemView.ScrollHint.PositionAtTop) # # # Queue up time calculations to take place after UI has # # # updated # # self._update_start_end_times(session) # # # It's possible that the current/next tracks are in this # # # playlist, so check and set. # # current_row = self._get_current_track_row_number() # # if current_row is not None: # # self._set_row_colour_current(current_row) # # next_row = self._get_next_track_row_number() # # if next_row is not None: # # self._set_row_colour_next(next_row) # # # Needed to wrap notes column correctly - add to event queue so # # # that it's processed after list is populated # # QTimer.singleShot(0, self.tab_visible) # def remove_rows(self, row_numbers: List[int]) -> None: # """Remove passed rows from display""" # # Remove rows from display. Do so in reverse order so that # # row numbers remain valid. # for row in sorted(row_numbers, reverse=True): # self.removeRow(row) # def save_playlist(self, session: scoped_session) -> None: # """ # Get the PlaylistRow objects for each row in the display. Correct # the row_number and playlist_id if necessary. Remove any row # numbers in the database that are higher than the last row in # the display. # """ # # Ensure all row plrs have correct row number and playlist_id # for row_number in range(self.rowCount()): # plr = self._get_row_plr(session, row_number) # if not plr: # continue # plr.plr_rownum = row_number # plr.playlist_id = self.playlist_id # # Any rows in the database for this playlist that has a row # # number equal to or greater than the row count needs to be # # removed. # PlaylistRows.delete_higher_rows(session, self.playlist_id, self.rowCount() - 1) # # Get changes into db # session.flush() # def scroll_current_to_top(self) -> None: # """Scroll currently-playing row to top""" # current_row = self._get_current_track_row_number() # if current_row is not None: # self._scroll_to_top(current_row) # def scroll_next_to_top(self) -> None: # """Scroll nextly-playing row to top""" # next_row = self._get_next_track_row_number() # if next_row is not None: # self._scroll_to_top(next_row) def set_search(self, text: str) -> None: """Set search text and find first match""" self.search_text = text if not text: # Search string has been reset return self._search(next=True) # def search_next(self) -> None: # """ # Select next row containg self.search_string. # """ # self._search(next=True) # def search_previous(self) -> None: # """ # Select previous row containg self.search_string. # """ # self._search(next=False) # def select_next_row(self) -> None: # """ # Select next or first row. Don't select section headers. # Wrap at last row. # """ # selected_rows = self._get_selected_rows() # # we will only handle zero or one selected rows # if len(selected_rows) > 1: # return # # select first row if none selected # if len(selected_rows) == 0: # row_number = 0 # else: # row_number = selected_rows[0] + 1 # if row_number >= self.rowCount(): # row_number = 0 # # Don't select section headers # wrapped = False # track_id = self._get_row_track_id(row_number) # while not track_id: # row_number += 1 # if row_number >= self.rowCount(): # if wrapped: # # we're already wrapped once, so there are no # # non-headers # return # row_number = 0 # wrapped = True # track_id = self._get_row_track_id(row_number) # self.selectRow(row_number) # def select_previous_row(self) -> None: # """ # Select previous or last track. Don't select section headers. # Wrap at first row. # """ # selected_rows = self._get_selected_rows() # # we will only handle zero or one selected rows # if len(selected_rows) > 1: # return # # select last row if none selected # last_row = self.rowCount() - 1 # if len(selected_rows) == 0: # row_number = last_row # else: # row_number = selected_rows[0] - 1 # if row_number < 0: # row_number = last_row # # Don't select section headers # wrapped = False # track_id = self._get_row_track_id(row_number) # while not track_id: # row_number -= 1 # if row_number < 0: # if wrapped: # # we're already wrapped once, so there are no # # non-notes # return # row_number = last_row # wrapped = True # track_id = self._get_row_track_id(row_number) # self.selectRow(row_number) # def select_rows(self, rows: List[int]) -> None: # """ # Select rows that are passed # """ # # Clear any selected rows to avoid confustion # self.clear_selection() # # We need to be in MultiSelection mode # self.setSelectionMode(QAbstractItemView.SelectionMode.MultiSelection) # # Select the rows # for row in rows: # self.selectRow(row) # # Reset selection mode # self.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection) def set_row_as_next_track(self) -> None: """ Set selected row as next track """ selected_row = self._get_selected_row() if selected_row is None: return model = cast(PlaylistModel, self.model()) model.set_next_row(selected_row) self.clearSelection() # def tab_visible(self) -> None: # """Called when tab becomes visible""" # # Set row heights # self.resizeRowsToContents() # self.setColumnWidth(len(columns) - 1, 0) # # Hide/show rows # self.hide_or_show_played_tracks() # # # ########## Internally called functions ########## def _add_track(self, row_number: int) -> None: """Add a track to a section header making it a normal track row""" with Session() as session: dlg = TrackSelectDialog( session=session, new_row_number=row_number, playlist_id=self.playlist_id, add_to_header=True, ) dlg.exec() def _build_context_menu(self, item: QTableWidgetItem) -> None: """Used to process context (right-click) menu, which is defined here""" self.menu.clear() row_number = item.row() # track_id = self._get_row_track_id(row_number) # track_row = bool(track_id) header_row = False model = cast(PlaylistModel, self.model()) if model: header_row = model.is_header_row(row_number) # current = row_number == self._get_current_track_row_number() # next_row = row_number == self._get_next_track_row_number() # # Play with mplayer # if track_row and not current: # self._add_context_menu( # "Play with mplayer", lambda: self._mplayer_play(row_number) # ) # # Paste # self._add_context_menu( # "Paste", # lambda: self.musicmuster.paste_rows(), # self.musicmuster.selected_plrs is None, # ) # # Open in Audacity # if track_row and not current: # self._add_context_menu( # "Open in Audacity", lambda: self._open_in_audacity(row_number) # ) # # Rescan # if track_row and not current: # self._add_context_menu( # "Rescan track", lambda: self._rescan(row_number, track_id) # ) # # ---------------------- self.menu.addSeparator() # # Remove row # if not current and not next_row: # self._add_context_menu("Delete row", self._delete_rows) # # Move to playlist # if not current and not next_row: # self._add_context_menu( # "Move to playlist...", self.musicmuster.move_selected # ) # # ---------------------- # self.menu.addSeparator() # # Remove track from row # if track_row and not current and not next_row: # self._add_context_menu( # "Remove track from row", lambda: self._remove_track(row_number) # ) # Add track to section header (ie, make this a track row) if header_row: self._add_context_menu("Add a track", lambda: self._add_track(row_number)) # # Mark unplayed # if self._get_row_userdata(row_number, self.PLAYED): # self._add_context_menu("Mark unplayed", self._mark_unplayed) # # Unmark as next # if next_row: # self._add_context_menu("Unmark as next track", self.clear_next) # # ---------------------- self.menu.addSeparator() # # Sort # sort_menu = self.menu.addMenu("Sort") # self._add_context_menu( # "by title", lambda: self._sort_selection(TITLE), parent_menu=sort_menu # ) # self._add_context_menu( # "by artist", lambda: self._sort_selection(ARTIST), parent_menu=sort_menu # ) # self._add_context_menu( # "by duration", lambda: self._sort_selection(DURATION), parent_menu=sort_menu # ) # self._add_context_menu( # "by last played", # lambda: self._sort_selection(LASTPLAYED), # parent_menu=sort_menu, # ) # if sort_menu: # sort_menu.setEnabled(self._sortable()) # self._add_context_menu("Undo sort", self._sort_undo, not bool(self.sort_undo)) # # Build submenu # # ---------------------- # self.menu.addSeparator() # # Info # if track_row: # self._add_context_menu("Info", lambda: self._info_row(track_id)) # # Track path # if track_row: # self._add_context_menu( # "Copy track path", lambda: self._copy_path(row_number) # ) # # return super(PlaylistTab, self).eventFilter(source, event) def _calculate_end_time( self, start: Optional[datetime], duration: int ) -> Optional[datetime]: """Return datetime 'duration' ms after 'start'""" if start is None: return None return start + timedelta(milliseconds=duration) def _column_resize(self, column_number: int, _old: int, _new: int) -> None: """ Called when column width changes. Save new width to database. """ header = self.horizontalHeader() if not header: return # Resize rows if necessary self.resizeRowsToContents() with Session() as session: attr_name = f"playlist_col_{column_number}_width" record = Settings.get_int_settings(session, attr_name) record.f_int = self.columnWidth(column_number) def _context_menu(self, pos): """Display right-click menu""" item = self.indexAt(pos) self._build_context_menu(item) self.menu.exec(self.mapToGlobal(pos)) def _copy_path(self, row_number: int) -> None: """ If passed row_number has a track, copy the track path, single-quoted, to the clipboard. Otherwise, return None. """ track_path = self._get_row_track_path(row_number) if not track_path: return replacements = [ ("'", "\\'"), (" ", "\\ "), ("(", "\\("), (")", "\\)"), ] for old, new in replacements: track_path = track_path.replace(old, new) cb = QApplication.clipboard() cb.clear(mode=cb.Mode.Clipboard) cb.setText(track_path, mode=cb.Mode.Clipboard) def _delete_rows(self) -> None: """ Delete mutliple rows Actions required: - Remove the rows from the display - Save the playlist - Update track start/end times """ rows_to_delete: List[int] = [] with Session() as session: plrs = self.get_selected_playlistrows(session) row_count = len(plrs) if not row_count: return # Get confirmation plural = "s" if row_count > 1 else "" if not ask_yes_no("Delete rows", f"Really delete {row_count} row{plural}?"): return rows_to_delete = [plr.plr_rownum for plr in plrs] # Delete rows from database. Would be more efficient to # query then have a single delete. for plr in plrs: session.delete(plr) # Remove from display self.remove_rows(rows_to_delete) # Need to save the playlist to ensure the PlaylistRows have # the correct row_number self.save_playlist(session) # Reset drag mode # self.setDragEnabled(False) self._update_start_end_times(session) # def _find_next_track_row( # self, session: scoped_session, starting_row: Optional[int] = None # ) -> Optional[int]: # """ # Find next track to play. If a starting row is given, start there; # otherwise, start from top. Skip rows already played. # If not found, return None. # If found, return row number. # """ # if starting_row is None: # starting_row = 0 # track_rows = [ # p.plr_rownum # for p in PlaylistRows.get_rows_with_tracks(session, self.playlist_id) # ] # played_rows = [ # p.plr_rownum # for p in PlaylistRows.get_played_rows(session, self.playlist_id) # ] # for row_number in range(starting_row, self.rowCount()): # if row_number not in track_rows or row_number in played_rows: # continue # plr = selWIP V3: play track workingf._get_row_plr(session, row_number) # if not plr: # continue # if file_is_unreadable(plr.track.path): # continue # else: # return row_number # return None def _get_current_track_row_number(self) -> Optional[int]: """Return current track row or None""" current_track = self.musicmuster.current_track if not current_track or not current_track.plr_id: return None return self._plrid_to_row_number(current_track.plr_id) def _get_next_track_row_number(self) -> Optional[int]: """Return next track row or None""" next_track = self.musicmuster.next_track if not next_track or not next_track.plr_id: return None return self._plrid_to_row_number(next_track.plr_id) # @staticmethod # def _get_note_text_time(text: str) -> Optional[datetime]: # """Return datetime specified as @hh:mm:ss in text""" # try: # match = start_time_re.search(text) # except TypeError: # return None # if not match: # return None # try: # return datetime.strptime(match.group(0)[1:], Config.NOTE_TIME_FORMAT) # except ValueError: # return None def _get_played_rows(self, session: scoped_session) -> List[int]: """ Return a list of row numbers that have been played """ return [ p.plr_rownum for p in PlaylistRows.get_played_rows(session, self.playlist_id) if p.plr_rownum is not None ] def _get_row_artist(self, row_number: int) -> str: """Return artist on this row_number or None if none""" item_artist = self.item(row_number, ARTIST) if not item_artist: return "" return item_artist.text() def _get_row_duration(self, row_number: int) -> int: """Return duration associated with this row_number""" duration_udata = self._get_row_userdata(row_number, self.ROW_DURATION) if not duration_udata: return 0 else: return int(duration_udata) def _get_row_last_played(self, row_number: int) -> Optional[datetime]: """Return last played datetime associated with this row_number""" return self._get_row_userdata(row_number, self.ROW_LAST_PLAYED) def _get_row_note(self, row_number: int) -> str: """Return note on this row_number or null string if none""" track_id = self._get_row_track_id(row_number) if track_id: item_note = self.item(row_number, ROW_NOTES) else: item_note = self.item(row_number, HEADER_NOTES_COLUMN) if not item_note: return "" return item_note.text() def _get_row_path(self, row_number: int) -> str: """ Return path of track associated with this row_number or null string """ path = str(self._get_row_userdata(row_number, self.TRACK_PATH)) if not path: return "" return path def _get_row_plr( self, session: scoped_session, row_number: int ) -> Optional[PlaylistRows]: """ Return PlaylistRows object for this row_number """ return session.get(PlaylistRows, self._get_row_plr_id(row_number)) def _get_row_plr_id(self, row_number: int) -> int: """Return the plr_id associated with this row_number or 0""" plr_id = self._get_row_userdata(row_number, self.PLAYLISTROW_ID) if not plr_id: return 0 else: return int(plr_id) def _get_row_title(self, row_number: int) -> Optional[str]: """Return title on this row_number or None if none""" # Header rows may have note in TITLE row so check for track_id if not self._get_row_track_id(row_number): return None item_title = self.item(row_number, TITLE) if not item_title: return None return item_title.text() def _get_row_track( self, session: scoped_session, row_number: int ) -> Optional[Tracks]: """Return the track associated with this row_number or None""" track_id = self._get_row_track_id(row_number) if track_id: return session.get(Tracks, track_id) else: return None def _get_row_track_id(self, row_number: int) -> int: """Return the track_id associated with this row_number or None""" track_id = self._get_row_userdata(row_number, self.ROW_TRACK_ID) if not track_id: return 0 else: return int(track_id) def _get_row_track_path(self, row_number: int) -> str: """Return the track path associated with this row_number or ''""" path = self._get_row_userdata(row_number, self.TRACK_PATH) if not path: return "" else: return str(path) def _get_row_userdata(self, row_number: int, role: int) -> Optional[Any]: """ Return the specified userdata, if any. """ userdata_item = self.item(row_number, USERDATA) if not userdata_item: return None return userdata_item.data(role) def _get_section_timing_string( self, total_time: int, unplayed_time: int, no_end: bool = False ) -> str: """Return string describing section duration""" total_duration = ms_to_mmss(total_time) if unplayed_time: unplayed_duration = ms_to_mmss(unplayed_time) else: unplayed_duration = "[No unplayed tracks]" caveat = "" if no_end: caveat = " (to end of playlist)" return f" {unplayed_duration} ({total_duration}){caveat}" def _get_selected_row(self) -> Optional[int]: """ Return row_number number of first selected row, or None if none selected """ sm = self.selectionModel() if sm: if sm.hasSelection(): return sm.selectedIndexes()[0].row() return None def _get_selected_rows(self) -> List[int]: """Return a list of selected row numbers sorted by row""" # Use a set to deduplicate result (a selected row will have all # items in that row selected) return sorted(list(set([a.row() for a in self.selectedIndexes()]))) def _info_row(self, track_id: int) -> None: """Display popup with info re row""" with Session() as session: track = session.get(Tracks, track_id) if track: txt = ( f"Title: {track.title}\n" f"Artist: {track.artist}\n" f"Track ID: {track.id}\n" f"Track duration: {ms_to_mmss(track.duration)}\n" f"Track bitrate: {track.bitrate}\n" f"Track fade at: {ms_to_mmss(track.fade_at)}\n" f"Track silence at: {ms_to_mmss(track.silence_at)}" "\n\n" f"Path: {track.path}\n" ) else: txt = f"Can't find {track_id=}" info: QMessageBox = QMessageBox(self) info.setIcon(QMessageBox.Icon.Information) info.setText(txt) info.setStandardButtons(QMessageBox.StandardButton.Ok) info.setDefaultButton(QMessageBox.StandardButton.Cancel) info.exec() def _look_up_row(self, website: str) -> None: """ If there is a selected row and it is a track row, look up its title in the passed website If multiple rows are selected, only consider the first one. Otherwise return. """ print("playlists_v3:_look_up_row()") return # selected_row = self._get_selected_row() # if not selected_row: # return # if not self._get_row_track_id(selected_row): # return # title = self._get_row_title(selected_row) # if website == "wikipedia": # QTimer.singleShot( # 0, lambda: self.musicmuster.tabInfolist.open_in_wikipedia(title) # ) # elif website == "songfacts": # QTimer.singleShot( # 0, lambda: self.musicmuster.tabInfolist.open_in_songfacts(title) # ) # else: # return def _mark_unplayed(self) -> None: """ Mark selected rows as unplayed in this playlist """ with Session() as session: for row_number in self._get_selected_rows(): _ = self._set_row_userdata(row_number, self.PLAYED, False) self._set_row_bold(row_number, True) plr = self._get_row_plr(session, row_number) if not plr: continue plr.played = False self._update_start_end_times(session) self.clear_selection() self.hide_or_show_played_tracks() def _move_row( self, session: scoped_session, plr: PlaylistRows, new_row_number: int ) -> None: """Move playlist row to new_row_number using parent copy/paste""" if plr.plr_rownum is None: return # Remove source row self.removeRow(plr.plr_rownum) # Fixup plr row number if plr.plr_rownum < new_row_number: plr.plr_rownum = new_row_number - 1 else: plr.plr_rownum = new_row_number self.insert_row(session, plr) self.save_playlist(session) self.hide_or_show_played_tracks() # Queue up time calculations to take place after UI has # updated self._update_start_end_times(session) def _mplayer_play(self, row_number: int) -> None: """Play track with mplayer""" track_path = self._get_row_track_path(row_number) if not track_path: log.error( f"{self.playlist_id=} playlists._mplayer_play({row_number=}): " "track_path not set" ) return cmd_list = ["gmplayer", "-vc", "null", "-vo", "null", track_path] thread = threading.Thread(target=self._run_subprocess, args=(cmd_list,)) thread.start() def _obs_change_scene(self, current_row: int) -> None: """ Try to change OBS scene to the name passed """ check_row = current_row while True: # If we have a note and it has a scene change command, # execute it note_text = self._get_row_note(check_row) if note_text: match_obj = scene_change_re.search(note_text) if match_obj: scene_name = match_obj.group(1) if scene_name: try: cl = obs.ReqClient( host=Config.OBS_HOST, port=Config.OBS_PORT, password=Config.OBS_PASSWORD, ) except ConnectionRefusedError: log.error("OBS connection refused") return try: cl.set_current_program_scene(scene_name) log.info(f"OBS scene changed to '{scene_name}'") return except obs.error.OBSSDKError as e: log.error(f"OBS SDK error ({e})") return # After current track row, only check header rows and stop # at first non-header row check_row -= 1 if check_row < 0: break if self._get_row_track_id(check_row): break def _open_in_audacity(self, row_number: int) -> None: """Open track in Audacity. Audacity must be already running""" track_path = self._get_row_track_path(row_number) if not track_path: log.error( f"{self.playlist_id=} " f"playlists._open_in_audactity({row_number=}): " "track_path not set" ) return open_in_audacity(track_path) def _plrid_to_row_number(self, plrid: int) -> Optional[int]: """ Return row number of passed plrid, or None if not found """ for row_number in range(self.rowCount()): if self._get_row_plr_id(row_number) == plrid: return row_number return None def _remove_track(self, row_number: int) -> None: """Remove track from row, making it a section header""" # Get confirmation if not ask_yes_no( "Remove music", "Really remove the music track from this row?" ): return # Update playlist_rows record with Session() as session: plr = self._get_row_plr(session, row_number) if not plr: return plr.track_id = None # We can't have null text if not plr.note: plr.note = Config.TEXT_NO_TRACK_NO_NOTE session.flush() # Clear track text items for i in range(2, len(columns)): _ = self._set_item_text(row_number, i, "") # Remove row duration self._set_row_duration(row_number, 0) # Remove row start gap self._set_row_start_gap(row_number, None) # Remote track_id from row _ = self._set_row_userdata(row_number, self.ROW_TRACK_ID, 0) # Span the rows self.setSpan(row_number, HEADER_NOTES_COLUMN, 1, len(columns) - 1) # Set note text in correct column for section head self._set_row_header_text(session, row_number, plr.note) self.clear_selection() # Save playlist to ensure correct detection of new header # row self.save_playlist(session) # Set track start/end times after track list is populated self._update_start_end_times(session) def _reorder_rows(self, source_row_numbers: List[int]) -> None: """ Take the list of source row numbers and put those playlist rows in that order. Algorithm: create new rows below the source rows and copy source rows in the correct order. When complete, delete source rows. """ next_row = max(source_row_numbers) + 1 for source_row_number in source_row_numbers: self.insertRow(next_row) for column in range(self.columnCount()): self.setItem(next_row, column, self.takeItem(source_row_number, column)) next_row += 1 # Remove source rows for i in reversed(sorted(source_row_numbers)): self.removeRow(i) def _rescan(self, row_number: int, track_id: int) -> None: """Rescan track""" with Session() as session: track = session.get(Tracks, track_id) if track: if file_is_unreadable(track.path): self._set_row_colour_unreadable(row_number) else: self._set_row_colour_default(row_number) set_track_metadata(track) self._update_row_track_info(session, row_number, track) else: _ = self._set_row_track_id(row_number, 0) note_text = self._get_row_note(row_number) if note_text is None: note_text = "" else: note_text += f"{track_id=} not found" self._set_row_header_text(session, row_number, note_text) log.error(f"playlists._rescan({track_id=}): " "Track not found") self._set_row_colour_unreadable(row_number) self._update_start_end_times(session) self.clear_selection() def _reset_next(self, old_plrid: int, new_plrid: int) -> None: """ Called when set_next_track_signal signal received. Actions required: - If old_plrid points to this playlist: - Remove existing next track - If new_plrid points to this playlist: - Set track as next - Display row as next track - Update start/stop times """ with Session() as session: # Get plrs old_plr = new_plr = None if old_plrid: old_plr = session.get(PlaylistRows, old_plrid) # Unmark next track if old_plr and old_plr.playlist_id == self.playlist_id: self._set_row_colour_default(old_plr.plr_rownum) # Mark next track if new_plrid: new_plr = session.get(PlaylistRows, new_plrid) if not new_plr: log.error(f"_reset_next({new_plrid=}): plr not found") return if new_plr.playlist_id == self.playlist_id: self._set_row_colour_next(new_plr.plr_rownum) # Update start/stop times self._update_start_end_times(session) self.clear_selection() def _run_subprocess(self, args): """Run args in subprocess""" subprocess.call(args) def _scroll_to_top(self, row_number: int) -> None: """ Scroll to put passed row_number Config.SCROLL_TOP_MARGIN from the top. """ if row_number is None: return padding_required = Config.SCROLL_TOP_MARGIN top_row = row_number if row_number > Config.SCROLL_TOP_MARGIN: # We can't scroll to a hidden row. Calculate target_row as # the one that is ideal to be at the top. Then count upwards # from passed row_number until we either reach the target, # pass it or reach row_number 0. for i in range(row_number - 1, -1, -1): if self.isRowHidden(i): continue if padding_required == 0: break top_row = i padding_required -= 1 scroll_item = self.item(top_row, 0) self.scrollToItem(scroll_item, QAbstractItemView.ScrollHint.PositionAtTop) def _search(self, next: bool = True) -> None: """ Select next/previous row containg self.search_string. Start from top selected row if there is one, else from top. Wrap at last/first row. """ if not self.search_text: return selected_row = self._get_selected_row() if next: if selected_row is not None and selected_row < self.rowCount() - 1: starting_row = selected_row + 1 else: starting_row = 0 else: if selected_row is not None and selected_row > 0: starting_row = selected_row - 1 else: starting_row = self.rowCount() - 1 wrapped = False match_row = None row_number = starting_row needle = self.search_text.lower() while True: # Check for match in title, artist or notes title = self._get_row_title(row_number) if title and needle in title.lower(): match_row = row_number break artist = self._get_row_artist(row_number) if artist and needle in artist.lower(): match_row = row_number break note = self._get_row_note(row_number) if note and needle in note.lower(): match_row = row_number break if next: row_number += 1 if wrapped and row_number >= starting_row: break if row_number >= self.rowCount(): row_number = 0 wrapped = True else: row_number -= 1 if wrapped and row_number <= starting_row: break if row_number < 0: row_number = self.rowCount() - 1 wrapped = True if match_row is not None: self.selectRow(row_number) # kae def _select_event(self) -> None: # kae """ # kae Called when item selection changes. # kae If multiple rows are selected, display sum of durations in status bar. # kae """ # kae selected_rows = self._get_selected_rows() # kae # If no rows are selected, we have nothing to do # kae if len(selected_rows) == 0: # kae self.musicmuster.lblSumPlaytime.setText("") # kae return # kae ms = 0 # kae for row_number in selected_rows: # kae ms += self._get_row_duration(row_number) # kae if ms > 0: # kae self.musicmuster.lblSumPlaytime.setText( # kae f"Selected duration: {ms_to_mmss(ms)}" # kae ) # kae else: # kae self.musicmuster.lblSumPlaytime.setText("") # def _set_cell_colour( # self, row_number: int, column: int, colour: Optional[str] = None # ) -> None: # """ # Set or reset a cell background colour # """ # if colour is None: # brush = QBrush() # else: # brush = QBrush(QColor(colour)) # item = self.item(row_number, column) # if item: # item.setBackground(brush) def _set_column_widths(self) -> None: """Column widths from settings""" header = self.horizontalHeader() if not header: return # Set width of last column to zero as it's set to stretch self.setColumnWidth(header.count() - 1, 0) # Set remaining column widths from settings with Session() as session: for column_number in range(header.count() - 1): attr_name = f"playlist_col_{column_number}_width" record = Settings.get_int_settings(session, attr_name) if record.f_int is not None: self.setColumnWidth(column_number, record.f_int) else: self.setColumnWidth(column_number, Config.DEFAULT_COLUMN_WIDTH) def _set_item_text( self, row_number: int, column: int, text: Optional[str] ) -> QTableWidgetItem: """ Set text for item if it exists, else create it, and return item """ if not text: text = "" item = self.item(row_number, column) if not item: item = QTableWidgetItem(text) self.setItem(row_number, column, item) else: item.setText(text) return item def _set_played_row(self, session: scoped_session, row_number: int) -> None: """Mark this row as played""" _ = self._set_row_userdata(row_number, self.PLAYED, True) self._set_row_bold(row_number, False) plr = self._get_row_plr(session, row_number) if not plr: return plr.played = True session.flush() def _set_row_artist( self, row_number: int, artist: Optional[str] ) -> QTableWidgetItem: """ Set row artist. Return QTableWidgetItem. """ if not artist: artist = "" return self._set_item_text(row_number, ARTIST, artist) def _set_row_bitrate( self, row_number: int, bitrate: Optional[int] ) -> QTableWidgetItem: """Set bitrate of this row.""" if not bitrate: bitrate_str = "" # If no bitrate, flag it as too low bitrate = Config.BITRATE_LOW_THRESHOLD - 1 else: bitrate_str = str(bitrate) bitrate_item = self._set_item_text(row_number, BITRATE, bitrate_str) if bitrate < Config.BITRATE_LOW_THRESHOLD: cell_colour = Config.COLOUR_BITRATE_LOW elif bitrate < Config.BITRATE_OK_THRESHOLD: cell_colour = Config.COLOUR_BITRATE_MEDIUM else: cell_colour = Config.COLOUR_BITRATE_OK brush = QBrush(QColor(cell_colour)) bitrate_item.setBackground(brush) return bitrate_item def _set_row_bold(self, row_number: int, bold: bool = True) -> None: """ Make row bold (bold=True) or not bold. Don't make notes column bold. """ boldfont = QFont() boldfont.setBold(bold) for column in range(self.columnCount()): if column == ROW_NOTES: continue item = self.item(row_number, column) if item: item.setFont(boldfont) def _set_row_colour(self, row_number: int, colour: Optional[str] = None) -> None: """ Set or reset row background colour """ if colour is None: brush = QBrush() else: brush = QBrush(QColor(colour)) for column in range(1, self.columnCount()): if column in [START_GAP, BITRATE]: continue item = self.item(row_number, column) if item: item.setBackground(brush) def _set_row_colour_current(self, row_number: int) -> None: """ Set current track row colour """ self._set_row_colour(row_number, Config.COLOUR_CURRENT_PLAYLIST) def _set_row_colour_default(self, row_number: int) -> None: """ Set default row colour """ self._set_row_colour(row_number, None) def _set_row_colour_next(self, row_number: int) -> None: """ Set next track row colour """ self._set_row_colour(row_number, Config.COLOUR_NEXT_PLAYLIST) def _set_row_colour_unreadable(self, row_number: int) -> None: """ Set unreadable row colour """ self._set_row_colour(row_number, Config.COLOUR_UNREADABLE) def _set_row_duration(self, row_number: int, ms: Optional[int]) -> QTableWidgetItem: """Set duration of this row. Also set in row metadata""" duration_item = self._set_item_text(row_number, DURATION, ms_to_mmss(ms)) self._set_row_userdata(row_number, self.ROW_DURATION, ms) return duration_item def _set_row_end_time( self, row_number: int, time: Optional[datetime] ) -> QTableWidgetItem: """Set row end time""" if not time: time_str = "" else: try: time_str = time.strftime(Config.TRACK_TIME_FORMAT) except AttributeError: time_str = "" return self._set_item_text(row_number, END_TIME, time_str) def _set_row_header_text( self, session: scoped_session, row_number: int, text: str ) -> None: """ Set header text and row colour """ # Sanity check: this should be a header row and thus not have a # track associate if self._get_row_track_id(row_number): if os.environ["MM_ENV"] == "PRODUCTION": send_mail( Config.ERRORS_TO, Config.ERRORS_FROM, "playlists:_set_row_header_text() called on track row", stackprinter.format(), ) print( f"playists:_set_row_header_text() called on track row ({row_number=}, {text=}" ) # stackprinter.show(add_summary=True, style="darkbg") return # Set text _ = self._set_item_text(row_number, HEADER_NOTES_COLUMN, text) # Set colour note_colour = NoteColours.get_colour(session, text) if not note_colour: note_colour = Config.COLOUR_NOTES_PLAYLIST self._set_row_colour(row_number, note_colour) def _set_row_last_played_time( self, row_number: int, last_played: datetime ) -> QTableWidgetItem: """Set row last played time. Also set in row metadata""" self._set_row_userdata(row_number, self.ROW_LAST_PLAYED, last_played) return self._set_item_text( row_number, LASTPLAYED, get_relative_date(last_played) ) def _set_row_note_colour(self, session: scoped_session, row_number: int) -> None: """ Set row note colour """ # Sanity check: this should be a track row and thus have a # track associated if not self._get_row_track_id(row_number): if os.environ["MM_ENV"] == "PRODUCTION": send_mail( Config.ERRORS_TO, Config.ERRORS_FROM, "playlists:_set_row_note_colour() on header row", stackprinter.format(), ) # stackprinter.show(add_summary=True, style="darkbg") print(f"playists:_set_row_note_colour() called on track row ({row_number=}") return # Set colour note_text = self._get_row_note(row_number) note_colour = NoteColours.get_colour(session, note_text) self._set_cell_colour(row_number, ROW_NOTES, note_colour) def _set_row_note_text( self, session: scoped_session, row_number: int, text: str ) -> None: """ Set row note text and note colour """ # Sanity check: this should be a track row and thus have a # track associated if not self._get_row_track_id(row_number): if os.environ["MM_ENV"] == "PRODUCTION": send_mail( Config.ERRORS_TO, Config.ERRORS_FROM, "playlists:_set_row_note_text() called on header row", stackprinter.format(), ) print( f"playists:_set_row_note_text() called on header row ({row_number=}, {text=}" ) # stackprinter.show(add_summary=True, style="darkbg") return # Set text _ = self._set_item_text(row_number, ROW_NOTES, text) # Set colour self._set_row_note_colour(session, row_number) def _set_row_plr_id(self, row_number: int, plr_id: int) -> QTableWidgetItem: """ Set PlaylistRows id """ return self._set_row_userdata(row_number, self.PLAYLISTROW_ID, plr_id) def _set_row_start_gap( self, row_number: int, start_gap: Optional[int] ) -> QTableWidgetItem: """ Set start gap on row, set backgroud colour. Return QTableWidgetItem. """ if not start_gap: start_gap = 0 start_gap_item = self._set_item_text(row_number, START_GAP, str(start_gap)) if start_gap >= 500: brush = QBrush(QColor(Config.COLOUR_LONG_START)) else: brush = QBrush() start_gap_item.setBackground(brush) return start_gap_item def _set_row_start_time( self, row_number: int, time: Optional[datetime] ) -> QTableWidgetItem: """Set row start time""" if not time: time_str = "" else: try: time_str = time.strftime(Config.TRACK_TIME_FORMAT) except AttributeError: time_str = "" return self._set_item_text(row_number, START_TIME, time_str) def _set_row_times( self, row_number: int, start: datetime, duration: int ) -> Optional[datetime]: """ Set row start and end times, return end time """ self._set_row_start_time(row_number, start) end_time = self._calculate_end_time(start, duration) self._set_row_end_time(row_number, end_time) return end_time def _set_row_title(self, row_number: int, title: Optional[str]) -> QTableWidgetItem: """ Set row title. """ if not title: title = "" return self._set_item_text(row_number, TITLE, title) def _set_row_track_id(self, row_number: int, track_id: int) -> QTableWidgetItem: """ Set track id """ return self._set_row_userdata(row_number, self.ROW_TRACK_ID, track_id) def _set_row_track_path(self, row_number: int, path: str) -> QTableWidgetItem: """ Set track path """ return self._set_row_userdata(row_number, self.TRACK_PATH, path) def _set_row_userdata( self, row_number: int, role: int, value: Any ) -> QTableWidgetItem: """ Set passed userdata in USERDATA column """ item = self.item(row_number, USERDATA) if not item: item = QTableWidgetItem() self.setItem(row_number, USERDATA, item) item.setData(role, value) return item def _sortable(self) -> bool: """ Return True if the selection is sortable. That means: - at least two rows selected - selected rows are contiguous - selected rows do not include any header rows """ selectionModel = self.selectionModel() if not selectionModel: return False source_rows = selectionModel.selectedRows() if len(source_rows) < 2: return False sorted_source_rows = sorted([a.row() for a in source_rows]) if sorted_source_rows != list( range(min(sorted_source_rows), max(sorted_source_rows) + 1) ): return False for row in sorted_source_rows: if self._get_row_track_id(row) == 0: return False return True def _sort_selection(self, sort_column: int) -> None: """ Algorithm: - check row selection is contiguous; return if not - copy (row-number, sort-field) to a list - sort the list by sort-field - create a new row after the selection - iterate the list and move items to new row - create another new row and repeat until all rows moved - delete old rows """ if not self._sortable(): return # Check selection is contiguous selectionModel = self.selectionModel() if not selectionModel: return source_row_numbers = [a.row() for a in selectionModel.selectedRows()] # Copy (row-number, sort-field) to a list sorted_rows: List[tuple[int, Any]] = [] for row in source_row_numbers: if sort_column == DURATION: sorted_rows.append((row, self._get_row_duration(row))) elif sort_column == LASTPLAYED: sorted_rows.append((row, self._get_row_last_played(row))) else: sort_item = self.item(row, sort_column) if sort_item: sorted_rows.append((row, sort_item.text())) else: sorted_rows.append((row, None)) # Sort the list reverse = QApplication.keyboardModifiers() == Qt.KeyboardModifier.ShiftModifier sorted_rows.sort(reverse=reverse, key=lambda row: row[1]) if sort_column == LASTPLAYED: sorted_rows.reverse() # Reorder rows new_order = [a[0] for a in sorted_rows] self.sort_undo = [ new_order.index(x) + min(new_order) for x in range(min(new_order), max(new_order) + 1) ] self._reorder_rows(new_order) # Reset drag mode to allow row selection by dragging # self.setDragEnabled(False) # Save playlist with Session() as session: self.save_playlist(session) self._update_start_end_times(session) def _sort_undo(self): """Undo last sort""" if not self.sort_undo: return new_order = self.sort_undo self._reorder_rows(new_order) self.sort_undo = [ new_order.index(x) + min(new_order) for x in range(min(new_order), max(new_order) + 1) ] # Reset drag mode to allow row selection by dragging # self.setDragEnabled(False) # Save playlist with Session() as session: self.save_playlist(session) self._update_start_end_times(session) def _span_cells(self, row: int, column: int, rowSpan: int, columnSpan: int) -> None: """ Implement spanning of cells, initiated by signal """ # Don't set spanning if already in place because that is seen as # a change to the view and thus it refreshes the data which # again calls us here. if ( self.rowSpan(row, column) == rowSpan and self.columnSpan(row, column) == columnSpan ): return self.setSpan(row, column, rowSpan, columnSpan) def _track_time_between_rows( self, session: scoped_session, from_plr: PlaylistRows, to_plr: PlaylistRows ) -> Tuple[int, int]: """ Returns the (total duration of all tracks in rows between from_row and to_row inclusive, total unplayed time in those rows) """ plr_tracks = PlaylistRows.get_rows_with_tracks( session, self.playlist_id, from_plr.plr_rownum, to_plr.plr_rownum ) total_time = 0 total_time = sum([a.track.duration for a in plr_tracks if a.track.duration]) unplayed_time = 0 unplayed_time = sum( [a.track.duration for a in plr_tracks if a.track.duration and not a.played] ) return (total_time, unplayed_time) def _update_row_track_info( self, session: scoped_session, row: int, track: Tracks ) -> None: """ Update the passed row with info from the passed track. """ _ = self._set_row_artist(row, track.artist) _ = self._set_row_bitrate(row, track.bitrate) _ = self._set_row_duration(row, track.duration) _ = self._set_row_end_time(row, None) if track.playdates: last_play = max([a.lastplayed for a in track.playdates]) else: last_play = Config.EPOCH _ = self._set_row_last_played_time(row, last_play) _ = self._set_row_start_gap(row, track.start_gap) _ = self._set_row_start_time(row, None) _ = self._set_row_title(row, track.title) _ = self._set_row_track_id(row, track.id) _ = self._set_row_track_path(row, track.path) if file_is_unreadable(track.path): self._set_row_colour_unreadable(row) def _update_section_headers(self, session: scoped_session) -> None: """ Update section headers with run time of section """ section_start_rows: List[PlaylistRows] = [] subtotal_from: Optional[PlaylistRows] = None header_rows = [ self._get_row_plr_id(row_number) for row_number in range(self.rowCount()) if self._get_row_track_id(row_number) == 0 ] plrs = PlaylistRows.plrids_to_plrs(session, self.playlist_id, header_rows) for plr in plrs: # Start of timed section if plr.note.endswith("+"): section_start_rows.append(plr) subtotal_from = plr continue # End of timed section elif plr.note.endswith("-"): try: from_plr = section_start_rows.pop() to_plr = plr total_time, unplayed_time = self._track_time_between_rows( session, from_plr, to_plr ) time_str = self._get_section_timing_string( total_time, unplayed_time ) self._set_row_header_text( session, from_plr.plr_rownum, from_plr.note + time_str ) # Update section end if to_plr.note.strip() == "-": new_text = ( "[End " + re.sub( section_header_cleanup_re, "", from_plr.note, ).strip() + "]" ) self._set_row_header_text(session, to_plr.plr_rownum, new_text) subtotal_from = None except IndexError: # This ending row may have a time left from before a # starting row above was deleted, so replace content self._set_row_header_text(session, plr.plr_rownum, plr.note) continue # Subtotal elif plr.note.endswith("="): if not subtotal_from: return from_plr = subtotal_from to_plr = plr total_time, unplayed_time = self._track_time_between_rows( session, subtotal_from, to_plr ) time_str = self._get_section_timing_string(total_time, unplayed_time) if to_plr.note.strip() == "=": leader_text = "Subtotal: " else: leader_text = to_plr.note[:-1] + " " new_text = leader_text + time_str self._set_row_header_text(session, to_plr.plr_rownum, new_text) subtotal_from = to_plr # If we still have plrs in section_start_rows, there isn't an end # section row for them possible_plr = self._get_row_plr(session, self.rowCount() - 1) if possible_plr: to_plr = possible_plr for from_plr in section_start_rows: total_time, unplayed_time = self._track_time_between_rows( session, from_plr, to_plr ) time_str = self._get_section_timing_string( total_time, unplayed_time, no_end=True ) self._set_row_header_text( session, from_plr.plr_rownum, from_plr.note + time_str ) def _update_start_end_times(self, session: scoped_session) -> None: """Update track start and end times""" current_track_end_time = self.musicmuster.current_track.end_time current_track_row = self._get_current_track_row_number() current_track_start_time = self.musicmuster.current_track.start_time next_start_time = None next_track_row = self._get_next_track_row_number() played_rows = self._get_played_rows(session) for row_number in range(self.rowCount()): # Don't change start times for tracks that have been # played other than current/next row if row_number in played_rows and row_number not in [ current_track_row, next_track_row, ]: continue # Get any timing from header row (that's all we need) if self._get_row_track_id(row_number) == 0: note_time = self._get_note_text_time(self._get_row_note(row_number)) if note_time: next_start_time = note_time continue # We have a track. Skip if it is unreadable if file_is_unreadable(self._get_row_path(row_number)): continue # Set next track start from end of current track if row_number == next_track_row: if current_track_end_time: next_start_time = self._set_row_times( row_number, current_track_end_time, self._get_row_duration(row_number), ) continue # Else set track times below if row_number == current_track_row: if not current_track_start_time: continue self._set_row_start_time(row_number, current_track_start_time) self._set_row_end_time(row_number, current_track_end_time) # Next track may be above us so only reset # next_start_time if it's not set if not next_start_time: next_start_time = current_track_end_time continue if not next_start_time: # Clear any existing times self._set_row_start_time(row_number, None) self._set_row_end_time(row_number, None) continue # If we're between the current and next row, zero out # times if ( current_track_row and next_track_row and current_track_row < row_number < next_track_row ): self._set_row_start_time(row_number, None) self._set_row_end_time(row_number, None) else: next_start_time = self._set_row_times( row_number, next_start_time, self._get_row_duration(row_number) ) self._update_section_headers(session)