# Standard library imports # Allow forward reference to PlaylistModel from __future__ import annotations from dataclasses import dataclass from operator import attrgetter from random import shuffle from typing import cast, Optional import datetime as dt # PyQt imports from PyQt6.QtCore import ( QAbstractTableModel, QModelIndex, QObject, QRegularExpression, QSortFilterProxyModel, Qt, QTimer, QVariant, ) from PyQt6.QtGui import ( QBrush, QColor, QFont, ) # Third party imports import line_profiler import obswebsocket # type: ignore # import snoop # type: ignore # App imports from classes import ( QueryCol, MusicMusterSignals, ) from config import Config from helpers import ( ask_yes_no, file_is_unreadable, get_embedded_time, get_relative_date, ms_to_mmss, remove_substring_case_insensitive, set_track_metadata, ) from log import log from models import db, NoteColours, Playdates, PlaylistRows, Tracks from music_manager import RowAndTrack, track_sequence @dataclass class QueryRow: artist: str bitrate: int duration: int lastplayed: dt.datetime path: str title: str track_id: int class QuerylistModel(QAbstractTableModel): """ The Querylist Model Used to support query lists. The underlying database is never updated. We just present tracks that match a query and allow the user to copy those to a playlist. """ def __init__( self, playlist_id: int, ) -> None: log.debug("QuerylistModel.__init__()") self.playlist_id = playlist_id super().__init__() self.querylist_rows: dict[int, QueryRow] = {} self.signals = MusicMusterSignals() self.signals.begin_reset_model_signal.connect(self.begin_reset_model) self.signals.end_reset_model_signal.connect(self.end_reset_model) with db.Session() as session: # Populate self.playlist_rows self.load_data(session) def __repr__(self) -> str: return ( f"" ) def background_role(self, row: int, column: int, qrow: QueryRow) -> QBrush: """Return background setting""" # Unreadable track file if file_is_unreadable(qrow.path): return QBrush(QColor(Config.COLOUR_UNREADABLE)) # Individual cell colouring if column == QueryCol.BITRATE.value: if not qrow.bitrate or qrow.bitrate < Config.BITRATE_LOW_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_LOW)) elif qrow.bitrate < Config.BITRATE_OK_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM)) else: return QBrush(QColor(Config.COLOUR_BITRATE_OK)) return QBrush() def begin_reset_model(self, playlist_id: int) -> None: """ Reset model if playlist_id is ours """ if playlist_id != self.playlist_id: return super().beginResetModel() def columnCount(self, parent: QModelIndex = QModelIndex()) -> int: """Standard function for view""" return len(QueryCol) def data( self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole ) -> QVariant: """Return data to view""" if not index.isValid() or not (0 <= index.row() < len(self.querylist_rows)): return QVariant() row = index.row() column = index.column() # rat for playlist row data as it's used a lot qrow = self.querylist_rows[row] # Dispatch to role-specific functions dispatch_table = { int(Qt.ItemDataRole.BackgroundRole): self.background_role, int(Qt.ItemDataRole.DisplayRole): self.display_role, int(Qt.ItemDataRole.EditRole): self.edit_role, } if role in dispatch_table: return QVariant(dispatch_table[role](row, column, qrow)) # Document other roles but don't use them if role in [ Qt.ItemDataRole.CheckStateRole, Qt.ItemDataRole.DecorationRole, Qt.ItemDataRole.FontRole, Qt.ItemDataRole.ForegroundRole, Qt.ItemDataRole.InitialSortOrderRole, Qt.ItemDataRole.SizeHintRole, Qt.ItemDataRole.StatusTipRole, Qt.ItemDataRole.TextAlignmentRole, Qt.ItemDataRole.ToolTipRole, Qt.ItemDataRole.WhatsThisRole, ]: return QVariant() # Fall through to no-op return QVariant() def display_role(self, row: int, column: int, qrow: QueryRow) -> QVariant: """ Return text for display """ dispatch_table = { QueryCol.ARTIST.value: QVariant(qrow.artist), QueryCol.BITRATE.value: QVariant(qrow.bitrate), QueryCol.DURATION.value: QVariant(ms_to_mmss(qrow.duration)), QueryCol.LAST_PLAYED.value: QVariant(get_relative_date(qrow.lastplayed)), QueryCol.TITLE.value: QVariant(qrow.title), } if column in dispatch_table: return dispatch_table[column] return QVariant() def end_reset_model(self, playlist_id: int) -> None: """ End model reset if this is our playlist """ log.debug(f"{self}: end_reset_model({playlist_id=})") if playlist_id != self.playlist_id: log.debug(f"{self}: end_reset_model: not us ({self.playlist_id=})") return with db.Session() as session: self.refresh_data(session) super().endResetModel() self.reset_track_sequence_row_numbers() def edit_role(self, row: int, column: int, qrow: QueryRow) -> QVariant: """ Return text for editing """ if column == QueryCol.TITLE.value: return QVariant(qrow.title) if column == QueryCol.ARTIST.value: return QVariant(qrow.artist) return QVariant() def flags(self, index: QModelIndex) -> Qt.ItemFlag: """ Standard model flags """ default = ( Qt.ItemFlag.ItemIsEnabled | Qt.ItemFlag.ItemIsSelectable ) if index.column() in [ QueryCol.TITLE.value, QueryCol.ARTIST.value, ]: return default | Qt.ItemFlag.ItemIsEditable return default def get_row_info(self, row_number: int) -> RowAndTrack: """ Return info about passed row """ return self.querylist_rows[row_number] def get_row_track_id(self, row_number: int) -> Optional[int]: """ Return id of track associated with row or None if no track associated """ return self.querylist_rows[row_number].track_id def get_row_track_path(self, row_number: int) -> str: """ Return path of track associated with row or empty string if no track associated """ return self.querylist_rows[row_number].path def get_rows_duration(self, row_numbers: list[int]) -> int: """ Return the total duration of the passed rows """ duration = 0 for row_number in row_numbers: duration += self.querylist_rows[row_number].duration return duration def invalidate_row(self, modified_row: int) -> None: """ Signal to view to refresh invalidated row """ self.dataChanged.emit( self.index(modified_row, 0), self.index(modified_row, self.columnCount() - 1), ) def invalidate_rows(self, modified_rows: list[int]) -> None: """ Signal to view to refresh invlidated rows """ for modified_row in modified_rows: self.invalidate_row(modified_row) def load_data( self, session: db.session, sql: str = """ SELECT tracks.*,playdates.lastplayed FROM tracks,playdates WHERE playdates.track_id=tracks.id AND tracks.path LIKE '%/Singles/p%' GROUP BY tracks.id HAVING MAX(playdates.lastplayed) < DATE_SUB(NOW(), INTERVAL 1 YEAR) ORDER BY tracks.title ; """ ) -> None: """ Load data from user-defined query. Can probably hard-code the SELECT part to ensure the required fields are returned. """ # TODO: Move the SQLAlchemy parts to models later, but for now as proof # of concept we'll keep it here. from sqlalchemy import text # Clear any exsiting rows self.querylist_rows = {} row = 0 results = session.execute(text(sql)).mappings().all() for result in results: queryrow = QueryRow( artist=result['artist'], bitrate=result['bitrate'], duration=result['duration'], lastplayed=result['lastplayed'], path=result['path'], title=result['title'], track_id=result['id'], ) self.querylist_rows[row] = queryrow row += 1 # # Note where each playlist_id is # plid_to_row: dict[int, int] = {} # for oldrow in self.playlist_rows: # plrdata = self.playlist_rows[oldrow] # plid_to_row[plrdata.playlistrow_id] = plrdata.row_number # # build a new playlist_rows # new_playlist_rows: dict[int, RowAndTrack] = {} # for p in PlaylistRows.get_playlist_rows(session, self.playlist_id): # if p.id not in plid_to_row: # new_playlist_rows[p.row_number] = RowAndTrack(p) # else: # new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]] # new_playlist_rows[p.row_number].row_number = p.row_number # # Copy to self.playlist_rows # self.playlist_rows = new_playlist_rows def move_rows( self, from_rows: list[int], to_row_number: int, dummy_for_profiling: Optional[int] = None, ) -> None: """ Move the playlist rows given to to_row and below. """ log.debug(f"{self}: move_rows({from_rows=}, {to_row_number=}") # Build a {current_row_number: new_row_number} dictionary row_map: dict[int, int] = {} # The destination row number will need to be reduced by the # number of rows being move from above the destination row # otherwise rows below the destination row will end up above the # moved rows. adjusted_to_row = to_row_number - len( [a for a in from_rows if a < to_row_number] ) # Put the from_row row numbers into the row_map. Ultimately the # total number of elements in the playlist doesn't change, so # check that adding the moved rows starting at to_row won't # overshoot the end of the playlist. if adjusted_to_row + len(from_rows) > len(self.querylist_rows): next_to_row = len(self.querylist_rows) - len(from_rows) else: next_to_row = adjusted_to_row # zip iterates from_row and to_row simultaneously from the # respective sequences inside zip() for from_row, to_row in zip( from_rows, range(next_to_row, next_to_row + len(from_rows)) ): row_map[from_row] = to_row # Move the remaining rows to the row_map. We want to fill it # before (if there are gaps) and after (likewise) the rows that # are moving. # zip iterates old_row and new_row simultaneously from the # respective sequences inside zip() for old_row, new_row in zip( [x for x in self.querylist_rows.keys() if x not in from_rows], [y for y in range(len(self.querylist_rows)) if y not in row_map.values()], ): # Optimise: only add to map if there is a change if old_row != new_row: row_map[old_row] = new_row # For SQLAlchemy, build a list of dictionaries that map playlistrow_id to # new row number: sqla_map: list[dict[str, int]] = [] for oldrow, newrow in row_map.items(): playlistrow_id = self.querylist_rows[oldrow].playlistrow_id sqla_map.append({"playlistrow_id": playlistrow_id, "row_number": newrow}) with db.Session() as session: PlaylistRows.update_plr_row_numbers(session, self.playlist_id, sqla_map) session.commit() # Update playlist_rows self.refresh_data(session) # Update display self.reset_track_sequence_row_numbers() self.invalidate_rows(list(row_map.keys())) @line_profiler.profile def move_rows_between_playlists( self, from_rows: list[int], to_row_number: int, to_playlist_id: int, dummy_for_profiling: Optional[int] = None, ) -> None: """ Move the playlist rows given to to_row and below of to_playlist. """ log.debug( f"{self}: move_rows_between_playlists({from_rows=}, " f"{to_row_number=}, {to_playlist_id=}" ) # Row removal must be wrapped in beginRemoveRows .. # endRemoveRows and the row range must be contiguous. Process # the highest rows first so the lower row numbers are unchanged row_groups = self._reversed_contiguous_row_groups(from_rows) # Prepare destination playlist for a reset self.signals.begin_reset_model_signal.emit(to_playlist_id) with db.Session() as session: for row_group in row_groups: # Make room in destination playlist max_destination_row_number = PlaylistRows.get_last_used_row( session, to_playlist_id ) if ( max_destination_row_number and to_row_number <= max_destination_row_number ): # Move the destination playlist rows down to make room. PlaylistRows.move_rows_down( session, to_playlist_id, to_row_number, len(row_group) ) next_to_row = to_row_number super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group)) for playlist_row in PlaylistRows.plrids_to_plrs( session, self.playlist_id, [self.querylist_rows[a].playlistrow_id for a in row_group], ): if ( track_sequence.current and playlist_row.id == track_sequence.current.playlistrow_id ): # Don't move current track continue playlist_row.playlist_id = to_playlist_id playlist_row.row_number = next_to_row next_to_row += 1 self.refresh_data(session) super().endRemoveRows() # We need to remove gaps in row numbers after tracks have # moved. PlaylistRows.fixup_rownumbers(session, self.playlist_id) self.refresh_data(session) session.commit() # Reset of model must come after session has been closed self.reset_track_sequence_row_numbers() self.signals.end_reset_model_signal.emit(to_playlist_id) def reset_track_sequence_row_numbers(self) -> None: """ Signal handler for when row ordering has changed. Example: row 4 is marked as next. Row 2 is deleted. The PlaylistRows table will be correctly updated with change of row number, but track_sequence.next will still contain row_number==4. This function fixes up the track_sequence row numbers by looking up the playlistrow_id and retrieving the row number from the database. """ log.debug(f"{self}: reset_track_sequence_row_numbers()") # Check the track_sequence.next, current and previous plrs and # update the row number with db.Session() as session: for ts in [ track_sequence.next, track_sequence.current, track_sequence.previous, ]: if ts: ts.update_playlist_and_row(session) def _reversed_contiguous_row_groups( self, row_numbers: list[int] ) -> list[list[int]]: """ Take the list of row numbers and split into groups of contiguous rows. Return as a list of lists with the highest row numbers first. Example: input [2, 3, 4, 5, 7, 9, 10, 13, 17, 20, 21] return: [[20, 21], [17], [13], [9, 10], [7], [2, 3, 4, 5]] """ log.debug(f"{self}: _reversed_contiguous_row_groups({row_numbers=} called") result: list[list[int]] = [] temp: list[int] = [] last_value = row_numbers[0] - 1 for idx in range(len(row_numbers)): if row_numbers[idx] != last_value + 1: result.append(temp) temp = [] last_value = row_numbers[idx] temp.append(last_value) if temp: result.append(temp) result.reverse() log.debug(f"{self}: _reversed_contiguous_row_groups() returned: {result=}") return result def rowCount(self, index: QModelIndex = QModelIndex()) -> int: """Standard function for view""" return len(self.querylist_rows) def selection_is_sortable(self, row_numbers: list[int]) -> bool: """ Return True if the selection is sortable. That means: - at least two rows selected - selected rows are contiguous """ # at least two rows selected if len(row_numbers) < 2: return False # selected rows are contiguous if sorted(row_numbers) != list(range(min(row_numbers), max(row_numbers) + 1)): return False return True def setData( self, index: QModelIndex, value: str | float, role: int = Qt.ItemDataRole.EditRole, ) -> bool: """ Update model with edited data """ if not index.isValid() or role != Qt.ItemDataRole.EditRole: return False row_number = index.row() column = index.column() with db.Session() as session: playlist_row = session.get( PlaylistRows, self.querylist_rows[row_number].playlistrow_id ) if not playlist_row: log.error( f"{self}: Error saving data: {row_number=}, {column=}, {value=}" ) return False if column in [QueryCol.TITLE.value, QueryCol.ARTIST.value]: track = session.get(Tracks, playlist_row.track_id) if not track: log.error(f"{self}: Error retreiving track: {playlist_row=}") return False if column == QueryCol.TITLE.value: track.title = str(value) elif column == QueryCol.ARTIST.value: track.artist = str(value) else: log.error(f"{self}: Error updating track: {column=}, {value=}") return False # commit changes before refreshing data session.commit() self.refresh_row(session, row_number) self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role]) return True def sort_by_artist(self, row_numbers: list[int]) -> None: """ Sort selected rows by artist """ self.sort_by_attribute(row_numbers, "artist") def sort_by_attribute(self, row_numbers: list[int], attr_name: str) -> None: """ Sort selected rows by passed attribute name where 'attribute' is a key in PlaylistRowData """ # Create a subset of playlist_rows with the rows we are # interested in shortlist_rows = {k: self.querylist_rows[k] for k in row_numbers} sorted_list = [ playlist_row.row_number for playlist_row in sorted( shortlist_rows.values(), key=attrgetter(attr_name) ) ] self.move_rows(sorted_list, min(sorted_list)) def sort_by_duration(self, row_numbers: list[int]) -> None: """ Sort selected rows by duration """ self.sort_by_attribute(row_numbers, "duration") def sort_by_lastplayed(self, row_numbers: list[int]) -> None: """ Sort selected rows by lastplayed """ self.sort_by_attribute(row_numbers, "lastplayed") def sort_randomly(self, row_numbers: list[int]) -> None: """ Sort selected rows randomly """ shuffle(row_numbers) self.move_rows(row_numbers, min(row_numbers)) def sort_by_title(self, row_numbers: list[int]) -> None: """ Sort selected rows by title """ self.sort_by_attribute(row_numbers, "title") class QuerylistProxyModel(QSortFilterProxyModel): """ For searching and filtering """ def __init__( self, ) -> None: super().__init__() # Search all columns self.setFilterKeyColumn(-1) def __repr__(self) -> str: return f"" def set_incremental_search(self, search_string: str) -> None: """ Update search pattern """ self.setFilterRegularExpression( QRegularExpression( search_string, QRegularExpression.PatternOption.CaseInsensitiveOption ) ) def sourceModel(self) -> QuerylistModel: """ Override sourceModel to return correct type """ return cast(QuerylistModel, super().sourceModel())