from datetime import datetime from enum import auto, Enum from typing import List, Optional, TYPE_CHECKING from dbconfig import scoped_session, Session from PyQt6.QtCore import ( QAbstractTableModel, QModelIndex, Qt, QVariant, ) from PyQt6.QtGui import ( QBrush, QColor, QFont, ) from config import Config from helpers import ( file_is_unreadable, ) from models import PlaylistRows, Tracks if TYPE_CHECKING: from musicmuster import MusicMusterSignals class Col(Enum): START_GAP = 0 TITLE = auto() ARTIST = auto() DURATION = auto() START_TIME = auto() END_TIME = auto() LAST_PLAYED = auto() BITRATE = auto() NOTE = auto() HEADER_NOTES_COLUMN = 1 class PlaylistRowData: def __init__(self, plr: PlaylistRows) -> None: """ Populate PlaylistRowData from database PlaylistRows record """ self.start_gap: Optional[int] = None self.title: str = "" self.artist: str = "" self.duration: int = 0 self.lastplayed: datetime = Config.EPOCH self.bitrate = 0 self.path = "" self.plrid: int = plr.id self.plr_rownum: int = plr.plr_rownum self.note: str = plr.note if plr.track: self.start_gap = plr.track.start_gap self.title = plr.track.title self.artist = plr.track.artist self.duration = plr.track.duration if plr.track.playdates: self.lastplayed = max([a.lastplayed for a in plr.track.playdates]) else: self.lastplayed = Config.EPOCH self.bitrate = plr.track.bitrate or 0 self.path = plr.track.path def __repr__(self) -> str: return ( f"" ) class PlaylistModel(QAbstractTableModel): """ The Playlist Model Update strategy: update the database and then refresh the cached copy (self.playlist_rows). We do not try to edit playlist_rows directly. It would be too easy for a bug to get us out of sync with the database, and if that wasn't immediately apparent then debugging it would be hard. refresh_row() will populate one row of playlist_rows from the database refresh_data() will repopulate all of playlist_rows from the database """ def __init__( self, playlist_id: int, signals: "MusicMusterSignals", *args, **kwargs ): self.playlist_id = playlist_id self.signals = signals super().__init__(*args, **kwargs) self.playlist_rows: dict[int, PlaylistRowData] = {} self.refresh_data() def __repr__(self) -> str: return f"" def background_role(self, row: int, column: int, prd: PlaylistRowData) -> QBrush: """Return background setting""" # Handle entire row colouring # Header row if not prd.path: return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST)) # Unreadable track file if file_is_unreadable(prd.path): return QBrush(QColor(Config.COLOUR_UNREADABLE)) # Individual cell colouring if column == Col.START_GAP.value: if prd.start_gap and prd.start_gap >= Config.START_GAP_WARNING_THRESHOLD: return QBrush(QColor(Config.COLOUR_LONG_START)) if column == Col.BITRATE.value: if prd.bitrate < Config.BITRATE_LOW_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_LOW)) elif prd.bitrate and prd.bitrate < Config.BITRATE_OK_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM)) else: return QBrush(QColor(Config.COLOUR_BITRATE_OK)) return QBrush() def columnCount(self, parent: QModelIndex = QModelIndex()) -> int: """Standard function for view""" return 9 def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole): """Return data to view""" if not index.isValid() or not (0 <= index.row() < len(self.playlist_rows)): return QVariant() row = index.row() column = index.column() # prd for playlist row data as it's used a lot prd = self.playlist_rows[row] # Dispatch to role-specific functions if role == Qt.ItemDataRole.DisplayRole: return self.display_role(row, column, prd) elif role == Qt.ItemDataRole.DecorationRole: pass elif role == Qt.ItemDataRole.EditRole: return self.edit_role(row, column, prd) elif role == Qt.ItemDataRole.ToolTipRole: pass elif role == Qt.ItemDataRole.StatusTipRole: pass elif role == Qt.ItemDataRole.WhatsThisRole: pass elif role == Qt.ItemDataRole.SizeHintRole: pass elif role == Qt.ItemDataRole.FontRole: pass elif role == Qt.ItemDataRole.TextAlignmentRole: pass elif role == Qt.ItemDataRole.BackgroundRole: return self.background_role(row, column, prd) elif role == Qt.ItemDataRole.ForegroundRole: pass elif role == Qt.ItemDataRole.CheckStateRole: pass elif role == Qt.ItemDataRole.InitialSortOrderRole: pass # Fall through to no-op return QVariant() def display_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant: """ Return text for display """ if not prd.path: # No track so this is a header row if column == HEADER_NOTES_COLUMN: self.signals.span_cells_signal.emit( row, HEADER_NOTES_COLUMN, 1, self.columnCount() - 1 ) return QVariant(prd.note) else: return QVariant() if column == Col.START_GAP.value: return QVariant(prd.start_gap) if column == Col.TITLE.value: return QVariant(prd.title) if column == Col.ARTIST.value: return QVariant(prd.artist) if column == Col.DURATION.value: return QVariant(prd.duration) if column == Col.START_TIME.value: return QVariant("FIXME") if column == Col.END_TIME.value: return QVariant("FIXME") if column == Col.LAST_PLAYED.value: return QVariant(prd.lastplayed) if column == Col.BITRATE.value: return QVariant(prd.bitrate) if column == Col.NOTE.value: return QVariant(prd.note) return QVariant() def edit_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant: """ Return text for editing """ if column == Col.TITLE.value: return QVariant(prd.title) if column == Col.ARTIST.value: return QVariant(prd.artist) if column == Col.NOTE.value: return QVariant(prd.note) return QVariant() def flags(self, index: QModelIndex) -> Qt.ItemFlag: """ Standard model flags """ if not index.isValid(): return Qt.ItemFlag.ItemIsDropEnabled default = ( Qt.ItemFlag.ItemIsEnabled | Qt.ItemFlag.ItemIsSelectable | Qt.ItemFlag.ItemIsDragEnabled ) if index.column() in [Col.TITLE.value, Col.ARTIST.value, Col.NOTE.value]: return default | Qt.ItemFlag.ItemIsEditable return default def headerData( self, section: int, orientation: Qt.Orientation, role: int = Qt.ItemDataRole.DisplayRole, ) -> QVariant: """ Return text for headers """ if role == Qt.ItemDataRole.DisplayRole: if orientation == Qt.Orientation.Horizontal: if section == Col.START_GAP.value: return QVariant(Config.HEADER_START_GAP) elif section == Col.TITLE.value: return QVariant(Config.HEADER_TITLE) elif section == Col.ARTIST.value: return QVariant(Config.HEADER_ARTIST) elif section == Col.DURATION.value: return QVariant(Config.HEADER_DURATION) elif section == Col.START_TIME.value: return QVariant(Config.HEADER_START_TIME) elif section == Col.END_TIME.value: return QVariant(Config.HEADER_END_TIME) elif section == Col.LAST_PLAYED.value: return QVariant(Config.HEADER_LAST_PLAYED) elif section == Col.BITRATE.value: return QVariant(Config.HEADER_BITRATE) elif section == Col.NOTE.value: return QVariant(Config.HEADER_NOTE) else: if Config.ROWS_FROM_ZERO: return QVariant(str(section)) else: return QVariant(str(section + 1)) elif role == Qt.ItemDataRole.FontRole: boldfont = QFont() boldfont.setBold(True) return QVariant(boldfont) return QVariant() def insert_header_row(self, row_number: Optional[int], text: str) -> Optional[int]: """ Insert a header row. Return row number or None if insertion failed. """ with Session() as session: prd = self._insert_row(session, row_number) # Update playlist_rows prd.note = text # Get row from db and update plr = session.get(PlaylistRows, prd.plrid) if plr: plr.note = text self.refresh_row(session, plr.plr_rownum) self.invalidate_row(plr.plr_rownum) return plr.plr_rownum return None def _insert_row( self, session: scoped_session, row_number: Optional[int] ) -> PlaylistRowData: """ Make space for a row at row_number. If row_number is greater than length of list plus 1, or if row number is -1, put row at end of list. Return the new PlaylistData structure """ modified_rows: List[int] = [] if row_number is None or row_number > len(self.playlist_rows): new_row_number = len(self.playlist_rows) elif row_number < 0: raise ValueError( f"playlistmodel.insert_row, invalid row number ({row_number})" ) else: new_row_number = row_number modified_rows.append(new_row_number) # Move rows below new row down for i in reversed(range(new_row_number, len(self.playlist_rows))): self.playlist_rows[i + 1] = self.playlist_rows[i] self.playlist_rows[i + 1].plr_rownum += 1 modified_rows.append(i + 1) # If we are not adding to the end of the list, we need to clear # out the existing recored at new_row_number (which we have # already copied to its new location) if new_row_number in self.playlist_rows: del self.playlist_rows[new_row_number] *** Problem here is that we haven't yet updated the database so when we insert a new row with the PlaylistRows.__init__ call below, we'll get a duplicate. How best to keep playlist_rows in step with database? # Insert new row, possibly replace old row plr = PlaylistRows( session=session, playlist_id=self.playlist_id, row_number=new_row_number ) prd = PlaylistRowData(plr) # Add row to playlist_rows self.playlist_rows[new_row_number] = prd return prd def invalidate_row(self, modified_row: int) -> None: """ Signal to view to refresh invlidated row """ self.dataChanged.emit( self.index(modified_row, 0), self.index(modified_row, self.columnCount()) ) def invalidate_rows(self, modified_rows: List[int]) -> None: """ Signal to view to refresh invlidated rows """ for modified_row in modified_rows: self.invalidate_row(modified_row) def move_rows(self, from_rows: List[int], to_row: int) -> None: """ Move the playlist rows given to to_row and below. """ new_playlist_rows: dict[int, PlaylistRowData] = {} # Move the from_row records from the playlist_rows dict to the # new_playlist_rows dict. The total number of elements in the # playlist doesn't change, so check that adding the moved rows # starting at to_row won't overshoot the end of the playlist. if to_row + len(from_rows) > len(self.playlist_rows): next_to_row = len(self.playlist_rows) - len(from_rows) else: next_to_row = to_row for from_row in from_rows: new_playlist_rows[next_to_row] = self.playlist_rows[from_row] del self.playlist_rows[from_row] next_to_row += 1 # Move the remaining rows to the gaps in new_playlist_rows new_row = 0 for old_row in self.playlist_rows.keys(): # Find next gap while new_row in new_playlist_rows: new_row += 1 new_playlist_rows[new_row] = self.playlist_rows[old_row] new_row += 1 # Make copy of rows live self.playlist_rows = new_playlist_rows # Update PlaylistRows table and notify display of rows that # moved with Session() as session: for idx in range(len(self.playlist_rows)): if self.playlist_rows[idx].plr_rownum == idx: continue # Row number in this row is incorred. Fix it in # database: plr = session.get(PlaylistRows, self.playlist_rows[idx].plrid) if not plr: print(f"\nCan't find plr in playlistmodel:move_rows {idx=}") continue plr.plr_rownum = idx # Fix in self.playlist_rows self.playlist_rows[idx].plr_rownum = idx # Update display self.invalidate_row(idx) def refresh_data(self): """Populate dicts for data calls""" # Populate self.playlist_rows with playlist data self.playlist_rows.clear() with Session() as session: for p in PlaylistRows.deep_rows(session, self.playlist_id): self.playlist_rows[p.plr_rownum] = PlaylistRowData(p) def refresh_row(self, session, row_number): """Populate dict for one row for data calls""" p = PlaylistRows.deep_row(session, self.playlist_id, row_number) self.playlist_rows[p.plr_rownum] = PlaylistRowData(p) def rowCount(self, index: QModelIndex = QModelIndex()) -> int: """Standard function for view""" return len(self.playlist_rows) def setData( self, index: QModelIndex, value: QVariant, role: int = Qt.ItemDataRole.EditRole ) -> bool: """ Update model with edited data """ if index.isValid() and role == Qt.ItemDataRole.EditRole: row_number = index.row() column = index.column() with Session() as session: plr = session.get(PlaylistRows, self.playlist_rows[row_number].plrid) if not plr: print( f"Error saving data: {row_number=}, {column=}, " f"{value=}, {self.playlist_id=}" ) return False if column == Col.TITLE.value or column == Col.ARTIST.value: track = session.get(Tracks, plr.track_id) if not track: print(f"Error retreiving track: {plr=}") return False if column == Col.TITLE.value: track.title = str(value) elif column == Col.ARTIST.value: track.artist = str(value) else: print(f"Error updating track: {column=}, {value=}") return False elif column == Col.NOTE.value: plr.note = str(value) # Flush changes before refreshing data session.flush() self.refresh_row(session, row_number) self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role]) return True return False def supportedDropActions(self): return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction