from datetime import datetime from enum import auto, Enum from sqlalchemy import bindparam, update from typing import List, Optional, TYPE_CHECKING from dbconfig import scoped_session, Session from PyQt6.QtCore import ( QAbstractTableModel, QModelIndex, Qt, QVariant, ) from PyQt6.QtGui import ( QBrush, QColor, QFont, ) from config import Config from helpers import ( file_is_unreadable, ) from models import PlaylistRows, Tracks if TYPE_CHECKING: from musicmuster import MusicMusterSignals class Col(Enum): START_GAP = 0 TITLE = auto() ARTIST = auto() DURATION = auto() START_TIME = auto() END_TIME = auto() LAST_PLAYED = auto() BITRATE = auto() NOTE = auto() HEADER_NOTES_COLUMN = 1 class PlaylistRowData: def __init__(self, plr: PlaylistRows) -> None: """ Populate PlaylistRowData from database PlaylistRows record """ self.start_gap: Optional[int] = None self.title: str = "" self.artist: str = "" self.duration: int = 0 self.lastplayed: datetime = Config.EPOCH self.bitrate = 0 self.path = "" self.plrid: int = plr.id self.plr_rownum: int = plr.plr_rownum self.note: str = plr.note if plr.track: self.start_gap = plr.track.start_gap self.title = plr.track.title self.artist = plr.track.artist self.duration = plr.track.duration if plr.track.playdates: self.lastplayed = max([a.lastplayed for a in plr.track.playdates]) else: self.lastplayed = Config.EPOCH self.bitrate = plr.track.bitrate or 0 self.path = plr.track.path def __repr__(self) -> str: return ( f"" ) class PlaylistModel(QAbstractTableModel): """ The Playlist Model Update strategy: update the database and then refresh the row-indexed cached copy (self.playlist_rows). Do not edit self.playlist_rows directly because keeping it and the database in sync is uncessarily challenging. refresh_row() will populate one row of playlist_rows from the database refresh_data() will repopulate all of playlist_rows from the database """ def __init__( self, playlist_id: int, signals: "MusicMusterSignals", *args, **kwargs ): self.playlist_id = playlist_id self.signals = signals super().__init__(*args, **kwargs) self.playlist_rows: dict[int, PlaylistRowData] = {} with Session() as session: self.refresh_data(session) def __repr__(self) -> str: return ( f"" ) def background_role(self, row: int, column: int, prd: PlaylistRowData) -> QBrush: """Return background setting""" # Handle entire row colouring # Header row if not prd.path: return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST)) # Unreadable track file if file_is_unreadable(prd.path): return QBrush(QColor(Config.COLOUR_UNREADABLE)) # Individual cell colouring if column == Col.START_GAP.value: if prd.start_gap and prd.start_gap >= Config.START_GAP_WARNING_THRESHOLD: return QBrush(QColor(Config.COLOUR_LONG_START)) if column == Col.BITRATE.value: if prd.bitrate < Config.BITRATE_LOW_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_LOW)) elif prd.bitrate and prd.bitrate < Config.BITRATE_OK_THRESHOLD: return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM)) else: return QBrush(QColor(Config.COLOUR_BITRATE_OK)) return QBrush() def columnCount(self, parent: QModelIndex = QModelIndex()) -> int: """Standard function for view""" return 9 def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole): """Return data to view""" if not index.isValid() or not (0 <= index.row() < len(self.playlist_rows)): return QVariant() row = index.row() column = index.column() # prd for playlist row data as it's used a lot prd = self.playlist_rows[row] # Dispatch to role-specific functions if role == Qt.ItemDataRole.DisplayRole: return self.display_role(row, column, prd) elif role == Qt.ItemDataRole.DecorationRole: pass elif role == Qt.ItemDataRole.EditRole: return self.edit_role(row, column, prd) elif role == Qt.ItemDataRole.ToolTipRole: pass elif role == Qt.ItemDataRole.StatusTipRole: pass elif role == Qt.ItemDataRole.WhatsThisRole: pass elif role == Qt.ItemDataRole.SizeHintRole: pass elif role == Qt.ItemDataRole.FontRole: pass elif role == Qt.ItemDataRole.TextAlignmentRole: pass elif role == Qt.ItemDataRole.BackgroundRole: return self.background_role(row, column, prd) elif role == Qt.ItemDataRole.ForegroundRole: pass elif role == Qt.ItemDataRole.CheckStateRole: pass elif role == Qt.ItemDataRole.InitialSortOrderRole: pass # Fall through to no-op return QVariant() def display_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant: """ Return text for display """ if not prd.path: # No track so this is a header row if column == HEADER_NOTES_COLUMN: self.signals.span_cells_signal.emit( row, HEADER_NOTES_COLUMN, 1, self.columnCount() - 1 ) return QVariant(prd.note) else: return QVariant() if column == Col.START_GAP.value: return QVariant(prd.start_gap) if column == Col.TITLE.value: return QVariant(prd.title) if column == Col.ARTIST.value: return QVariant(prd.artist) if column == Col.DURATION.value: return QVariant(prd.duration) if column == Col.START_TIME.value: return QVariant("FIXME") if column == Col.END_TIME.value: return QVariant("FIXME") if column == Col.LAST_PLAYED.value: return QVariant(prd.lastplayed) if column == Col.BITRATE.value: return QVariant(prd.bitrate) if column == Col.NOTE.value: return QVariant(prd.note) return QVariant() def edit_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant: """ Return text for editing """ if column == Col.TITLE.value: return QVariant(prd.title) if column == Col.ARTIST.value: return QVariant(prd.artist) if column == Col.NOTE.value: return QVariant(prd.note) return QVariant() def flags(self, index: QModelIndex) -> Qt.ItemFlag: """ Standard model flags """ if not index.isValid(): return Qt.ItemFlag.ItemIsDropEnabled default = ( Qt.ItemFlag.ItemIsEnabled | Qt.ItemFlag.ItemIsSelectable | Qt.ItemFlag.ItemIsDragEnabled ) if index.column() in [Col.TITLE.value, Col.ARTIST.value, Col.NOTE.value]: return default | Qt.ItemFlag.ItemIsEditable return default def headerData( self, section: int, orientation: Qt.Orientation, role: int = Qt.ItemDataRole.DisplayRole, ) -> QVariant: """ Return text for headers """ if role == Qt.ItemDataRole.DisplayRole: if orientation == Qt.Orientation.Horizontal: if section == Col.START_GAP.value: return QVariant(Config.HEADER_START_GAP) elif section == Col.TITLE.value: return QVariant(Config.HEADER_TITLE) elif section == Col.ARTIST.value: return QVariant(Config.HEADER_ARTIST) elif section == Col.DURATION.value: return QVariant(Config.HEADER_DURATION) elif section == Col.START_TIME.value: return QVariant(Config.HEADER_START_TIME) elif section == Col.END_TIME.value: return QVariant(Config.HEADER_END_TIME) elif section == Col.LAST_PLAYED.value: return QVariant(Config.HEADER_LAST_PLAYED) elif section == Col.BITRATE.value: return QVariant(Config.HEADER_BITRATE) elif section == Col.NOTE.value: return QVariant(Config.HEADER_NOTE) else: if Config.ROWS_FROM_ZERO: return QVariant(str(section)) else: return QVariant(str(section + 1)) elif role == Qt.ItemDataRole.FontRole: boldfont = QFont() boldfont.setBold(True) return QVariant(boldfont) return QVariant() def insert_header_row(self, row_number: Optional[int], text: str) -> None: """ Insert a header row. Return row number or None if insertion failed. """ with Session() as session: plr = self._insert_row(session, row_number) # Update the PlaylistRows object plr.note = text # Repopulate self.playlist_rows self.refresh_data(session) # Update the display from the new row onwards self.invalidate_rows(list(range(plr.plr_rownum, len(self.playlist_rows)))) def _insert_row( self, session: scoped_session, row_number: Optional[int] ) -> PlaylistRows: """ Insert a row in the database. If row_number is greater than length of list plus 1, or if row number is None, put row at end of list. Move existing rows to make space if ncessary. Return the new PlaylistRows object. """ if row_number is None or row_number > len(self.playlist_rows): # We are adding to the end of the list so we can optimise new_row_number = len(self.playlist_rows) return PlaylistRows(session, self.playlist_id, new_row_number) elif row_number < 0: raise ValueError( f"playlistmodel._insert_row, invalid row number ({row_number})" ) else: new_row_number = row_number # Move rows below new row down stmt = ( update(PlaylistRows) .where(PlaylistRows.plr_rownum >= new_row_number) .values({PlaylistRows.plr_rownum: PlaylistRows.plr_rownum + 1}) ) session.execute(stmt) # Insert the new row and return it return PlaylistRows(session, self.playlist_id, new_row_number) def invalidate_row(self, modified_row: int) -> None: """ Signal to view to refresh invlidated row """ self.dataChanged.emit( self.index(modified_row, 0), self.index(modified_row, self.columnCount()) ) def invalidate_rows(self, modified_rows: List[int]) -> None: """ Signal to view to refresh invlidated rows """ for modified_row in modified_rows: self.invalidate_row(modified_row) def move_rows(self, from_rows: List[int], to_row_number: int) -> None: """ Move the playlist rows given to to_row and below. """ # Build a {current_row_number: new_row_number} dictionary row_map: dict[int, int] = {} # Put the from_row row numbers into the row_map. Ultimately the # total number of elements in the playlist doesn't change, so # check that adding the moved rows starting at to_row won't # overshoot the end of the playlist. if to_row_number + len(from_rows) > len(self.playlist_rows): next_to_row = len(self.playlist_rows) - len(from_rows) else: next_to_row = to_row_number for from_row, to_row in zip( from_rows, range(next_to_row, next_to_row + len(from_rows)) ): row_map[from_row] = to_row # Move the remaining rows to the row_map. We want to fill it # before (if there are gaps) and after (likewise) the rows that # are moving. # This iterates old_row and new_row simultaneously. for old_row, new_row in zip( [x for x in self.playlist_rows.keys() if x not in from_rows], [y for y in range(len(self.playlist_rows)) if y not in row_map.values()], ): # Optimise: only add to map if there is a change row_map[old_row] = new_row # For SQLAlchemy, build a list of dictionaries that map plrid to # new row number: sqla_map: List[dict[str, int]] = [] for oldrow, newrow in row_map.items(): plrid = self.playlist_rows[oldrow].plrid sqla_map.append({"plrid": plrid, "plr_rownum": newrow}) # Update database. Ref: # https://docs.sqlalchemy.org/en/20/core/sqlelement.html#sqlalchemy.sql.expression.case stmt = ( update(PlaylistRows) .where( PlaylistRows.playlist_id == self.playlist_id, PlaylistRows.id == bindparam("plrid"), ) .values(plr_rownum=bindparam("plr_rownum")) ) with Session() as session: session.connection().execute(stmt, sqla_map) # Update playlist_rows self.refresh_data(session) # Update display self.invalidate_rows(list(row_map.keys())) def refresh_data(self, session: scoped_session): """Populate dicts for data calls""" # Populate self.playlist_rows with playlist data self.playlist_rows.clear() for p in PlaylistRows.deep_rows(session, self.playlist_id): self.playlist_rows[p.plr_rownum] = PlaylistRowData(p) def refresh_row(self, session, row_number): """Populate dict for one row for data calls""" p = PlaylistRows.deep_row(session, self.playlist_id, row_number) self.playlist_rows[p.plr_rownum] = PlaylistRowData(p) def rowCount(self, index: QModelIndex = QModelIndex()) -> int: """Standard function for view""" return len(self.playlist_rows) def setData( self, index: QModelIndex, value: QVariant, role: int = Qt.ItemDataRole.EditRole ) -> bool: """ Update model with edited data """ if index.isValid() and role == Qt.ItemDataRole.EditRole: row_number = index.row() column = index.column() with Session() as session: plr = session.get(PlaylistRows, self.playlist_rows[row_number].plrid) if not plr: print( f"Error saving data: {row_number=}, {column=}, " f"{value=}, {self.playlist_id=}" ) return False if column == Col.TITLE.value or column == Col.ARTIST.value: track = session.get(Tracks, plr.track_id) if not track: print(f"Error retreiving track: {plr=}") return False if column == Col.TITLE.value: track.title = str(value) elif column == Col.ARTIST.value: track.artist = str(value) else: print(f"Error updating track: {column=}, {value=}") return False elif column == Col.NOTE.value: plr.note = str(value) # Flush changes before refreshing data session.flush() self.refresh_row(session, row_number) self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role]) return True return False def supportedDropActions(self): return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction