musicmuster/app/playlistmodel.py
Keith Edmunds 617c39c0de Reworked inserting rows into model
_insert_row() handles database
insert_header() handles playlist_rows and display updates
2023-10-27 12:01:09 +01:00

482 lines
16 KiB
Python

from datetime import datetime
from enum import auto, Enum
from sqlalchemy import update
from typing import List, Optional, TYPE_CHECKING
from dbconfig import scoped_session, Session
from PyQt6.QtCore import (
QAbstractTableModel,
QModelIndex,
Qt,
QVariant,
)
from PyQt6.QtGui import (
QBrush,
QColor,
QFont,
)
from config import Config
from helpers import (
file_is_unreadable,
)
from models import PlaylistRows, Tracks
if TYPE_CHECKING:
from musicmuster import MusicMusterSignals
class Col(Enum):
    """Column positions in the playlist table, numbered from zero."""

    START_GAP = 0
    TITLE = 1
    ARTIST = 2
    DURATION = 3
    START_TIME = 4
    END_TIME = 5
    LAST_PLAYED = 6
    BITRATE = 7
    NOTE = 8
# Column in which a header row (a row with no track) shows its note
# text; see PlaylistModel.display_role()
HEADER_NOTES_COLUMN = 1
class PlaylistRowData:
    """
    Plain-Python copy of one playlist row, used by the model as its cache
    """

    def __init__(self, plr: PlaylistRows) -> None:
        """
        Copy the fields of interest from a PlaylistRows database record
        and, when the row has a track, from that track.
        """

        # Values taken from the PlaylistRows record itself
        self.plrid: int = plr.id
        self.plr_rownum: int = plr.plr_rownum
        self.note: str = plr.note

        # Placeholder values, kept as-is when the row has no track
        self.start_gap: Optional[int] = None
        self.title: str = ""
        self.artist: str = ""
        self.duration: int = 0
        self.lastplayed: datetime = Config.EPOCH
        self.bitrate = 0
        self.path = ""

        track = plr.track
        if not track:
            return

        self.start_gap = track.start_gap
        self.title = track.title
        self.artist = track.artist
        self.duration = track.duration
        self.bitrate = track.bitrate or 0
        self.path = track.path
        # A track with no playdates keeps the Config.EPOCH placeholder
        if track.playdates:
            self.lastplayed = max(
                playdate.lastplayed for playdate in track.playdates
            )

    def __repr__(self) -> str:
        fields = (
            f"plrid={self.plrid}, plr_rownum={self.plr_rownum}, "
            f"note='{self.note}', title='{self.title}', artist='{self.artist}'"
        )
        return f"<PlaylistRowData: {fields}>"
class PlaylistModel(QAbstractTableModel):
    """
    The Playlist Model

    Update strategy: update the database and then refresh the
    row-indexed cached copy (self.playlist_rows). Do not edit
    self.playlist_rows directly because keeping it and the
    database in sync is unnecessarily challenging.

    refresh_row() will populate one row of playlist_rows from the
    database

    refresh_data() will repopulate all of playlist_rows from the
    database

    NOTE(review): move_rows() rebuilds self.playlist_rows itself rather
    than refreshing it from the database.
    """
def __init__(
    self, playlist_id: int, signals: "MusicMusterSignals", *args, **kwargs
) -> None:
    """
    Create the model for one playlist and load its rows.

    Any further positional or keyword arguments are handed on to
    QAbstractTableModel.
    """
    super().__init__(*args, **kwargs)

    self.playlist_id = playlist_id
    self.signals = signals
    # Cache of row data, keyed by row number
    self.playlist_rows: dict[int, PlaylistRowData] = {}

    with Session() as db_session:
        self.refresh_data(db_session)
def __repr__(self) -> str:
    """Return a short description: playlist id and number of rows"""
    row_count = self.rowCount()
    return f"<PlaylistModel: playlist_id={self.playlist_id}, {row_count} rows>"
def background_role(self, row: int, column: int, prd: PlaylistRowData) -> QBrush:
    """
    Return the background brush for a cell.

    Whole-row colours (header row, unreadable track file) take
    precedence over the per-cell colours for the start gap and
    bitrate columns. Cells with nothing to flag get a default QBrush.
    """

    if not prd.path:
        # Header row
        colour = Config.COLOUR_NOTES_PLAYLIST
    elif file_is_unreadable(prd.path):
        colour = Config.COLOUR_UNREADABLE
    elif column == Col.START_GAP.value:
        long_start = (
            prd.start_gap and prd.start_gap >= Config.START_GAP_WARNING_THRESHOLD
        )
        if not long_start:
            return QBrush()
        colour = Config.COLOUR_LONG_START
    elif column == Col.BITRATE.value:
        if prd.bitrate < Config.BITRATE_LOW_THRESHOLD:
            colour = Config.COLOUR_BITRATE_LOW
        elif prd.bitrate and prd.bitrate < Config.BITRATE_OK_THRESHOLD:
            colour = Config.COLOUR_BITRATE_MEDIUM
        else:
            colour = Config.COLOUR_BITRATE_OK
    else:
        return QBrush()

    return QBrush(QColor(colour))
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
    """
    Standard function for view: return the number of columns.

    The count is taken from the Col enum so that the two cannot drift
    apart. This is a flat table, so a valid parent (i.e. a request for
    the children of a cell) has no columns.
    """
    if parent.isValid():
        return 0
    return len(Col)
def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole):
    """
    Return data to view

    Only the display, edit and background roles are served. Any other
    role, an invalid index, or a row outside the cached rows gives an
    empty QVariant.
    """

    if not index.isValid():
        return QVariant()

    row = index.row()
    if row < 0 or row >= len(self.playlist_rows):
        return QVariant()

    column = index.column()
    # prd for playlist row data as it's used a lot
    prd = self.playlist_rows[row]

    # Dispatch to role-specific functions
    if role == Qt.ItemDataRole.DisplayRole:
        return self.display_role(row, column, prd)
    if role == Qt.ItemDataRole.EditRole:
        return self.edit_role(row, column, prd)
    if role == Qt.ItemDataRole.BackgroundRole:
        return self.background_role(row, column, prd)

    # Every other role is a no-op
    return QVariant()
def display_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant:
    """
    Return the value to display in a cell.

    A row without a track path is a header row: its note is returned
    for HEADER_NOTES_COLUMN only, and span_cells_signal is emitted with
    (row, HEADER_NOTES_COLUMN, 1, columnCount() - 1) - presumably so
    the view spans the note across the row.
    """

    if not prd.path:
        # No track so this is a header row
        if column != HEADER_NOTES_COLUMN:
            return QVariant()
        self.signals.span_cells_signal.emit(
            row, HEADER_NOTES_COLUMN, 1, self.columnCount() - 1
        )
        return QVariant(prd.note)

    shown_by_column = {
        Col.START_GAP.value: prd.start_gap,
        Col.TITLE.value: prd.title,
        Col.ARTIST.value: prd.artist,
        Col.DURATION.value: prd.duration,
        Col.START_TIME.value: "FIXME",
        Col.END_TIME.value: "FIXME",
        Col.LAST_PLAYED.value: prd.lastplayed,
        Col.BITRATE.value: prd.bitrate,
        Col.NOTE.value: prd.note,
    }
    if column in shown_by_column:
        return QVariant(shown_by_column[column])

    return QVariant()
def edit_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant:
    """
    Return the value to load into the editor for a cell.

    Only the title, artist and note columns supply a value; any other
    column gives an empty QVariant.
    """

    editable_values = {
        Col.TITLE.value: prd.title,
        Col.ARTIST.value: prd.artist,
        Col.NOTE.value: prd.note,
    }
    if column in editable_values:
        return QVariant(editable_values[column])

    return QVariant()
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
    """
    Standard model flags

    An invalid index only accepts drops. Every cell is enabled,
    selectable and draggable; the title, artist and note columns are
    also editable.
    """

    if not index.isValid():
        return Qt.ItemFlag.ItemIsDropEnabled

    item_flags = (
        Qt.ItemFlag.ItemIsEnabled
        | Qt.ItemFlag.ItemIsSelectable
        | Qt.ItemFlag.ItemIsDragEnabled
    )
    editable_columns = (Col.TITLE.value, Col.ARTIST.value, Col.NOTE.value)
    if index.column() in editable_columns:
        item_flags = item_flags | Qt.ItemFlag.ItemIsEditable

    return item_flags
def headerData(
    self,
    section: int,
    orientation: Qt.Orientation,
    role: int = Qt.ItemDataRole.DisplayRole,
) -> QVariant:
    """
    Return header text (display role) or header font (font role).

    Horizontal headers take their text from Config; vertical headers
    show the row number, counted from zero or one according to
    Config.ROWS_FROM_ZERO. Header font is always bold.
    """

    if role == Qt.ItemDataRole.FontRole:
        bold = QFont()
        bold.setBold(True)
        return QVariant(bold)

    if role != Qt.ItemDataRole.DisplayRole:
        return QVariant()

    if orientation != Qt.Orientation.Horizontal:
        # Vertical header: row number
        first_row_label = 0 if Config.ROWS_FROM_ZERO else 1
        return QVariant(str(section + first_row_label))

    column_titles = {
        Col.START_GAP.value: Config.HEADER_START_GAP,
        Col.TITLE.value: Config.HEADER_TITLE,
        Col.ARTIST.value: Config.HEADER_ARTIST,
        Col.DURATION.value: Config.HEADER_DURATION,
        Col.START_TIME.value: Config.HEADER_START_TIME,
        Col.END_TIME.value: Config.HEADER_END_TIME,
        Col.LAST_PLAYED.value: Config.HEADER_LAST_PLAYED,
        Col.BITRATE.value: Config.HEADER_BITRATE,
        Col.NOTE.value: Config.HEADER_NOTE,
    }
    if section in column_titles:
        return QVariant(column_titles[section])

    return QVariant()
def insert_header_row(self, row_number: Optional[int], text: str) -> None:
    """
    Insert a header row (a row carrying a note but no track).

    row_number is where the new row should go; None, or a value past
    the end of the playlist, appends it. A negative row_number raises
    ValueError (from _insert_row) before the model is changed.
    """

    with Session() as session:
        plr = self._insert_row(session, row_number)
        # Update the PlaylistRows object
        plr.note = text
        # Flush changes before refreshing data
        session.flush()

        new_row = plr.plr_rownum
        # The view sees the model through self.playlist_rows, so the
        # row count only changes inside refresh_data(). Bracket that
        # call so the view is told a row has been added; dataChanged
        # alone cannot announce rows that did not exist before.
        self.beginInsertRows(QModelIndex(), new_row, new_row)
        # Repopulate self.playlist_rows
        self.refresh_data(session)
        self.endInsertRows()

        # Update the display from the new row onwards
        self.invalidate_rows(list(range(new_row, len(self.playlist_rows))))
def _insert_row(
    self, session: scoped_session, row_number: Optional[int]
) -> PlaylistRows:
    """
    Insert a row in the database.

    If row_number is None, or is at or beyond the end of the playlist,
    put the row at the end of the list.

    Otherwise move this playlist's existing rows down to make space.

    Return the new PlaylistRows object. Raise ValueError if row_number
    is negative.
    """

    row_count = len(self.playlist_rows)

    if row_number is None or row_number >= row_count:
        # We are adding to the end of the list so nothing has to move
        return PlaylistRows(session, self.playlist_id, row_count)

    if row_number < 0:
        raise ValueError(
            f"playlistmodel._insert_row, invalid row number ({row_number})"
        )

    # Move rows below new row down. Restrict the update to this
    # playlist: without that filter rows of every other playlist at or
    # after this position would be renumbered too.
    # NOTE(review): assumes the mapped column is PlaylistRows.playlist_id
    # - confirm against models.PlaylistRows.
    stmt = (
        update(PlaylistRows)
        .where(PlaylistRows.playlist_id == self.playlist_id)
        .where(PlaylistRows.plr_rownum >= row_number)
        .values({PlaylistRows.plr_rownum: PlaylistRows.plr_rownum + 1})
    )
    session.execute(stmt)

    # Insert the new row and return it
    return PlaylistRows(session, self.playlist_id, row_number)
def invalidate_row(self, modified_row: int) -> None:
    """
    Signal to view to refresh invalidated row

    Emits dataChanged for every cell of modified_row.
    """

    # Columns are numbered from zero, so the last one is
    # columnCount() - 1; columnCount() itself is out of range.
    last_column = self.columnCount() - 1
    self.dataChanged.emit(
        self.index(modified_row, 0), self.index(modified_row, last_column)
    )
def invalidate_rows(self, modified_rows: List[int]) -> None:
    """
    Signal to view to refresh each of the invalidated rows, in the
    order given
    """

    for row_number in modified_rows:
        self.invalidate_row(row_number)
def move_rows(self, from_rows: List[int], to_row: int) -> None:
    """
    Move the playlist rows given to to_row and below.

    The moved rows keep the order in which they appear in from_rows;
    all other rows keep their relative order and fill the remaining
    positions. Rows whose number changed are updated in the database
    and the view is told to refresh them.
    """

    total_rows = len(self.playlist_rows)
    new_playlist_rows: dict[int, PlaylistRowData] = {}

    # Move the from_row records from the playlist_rows dict to the
    # new_playlist_rows dict. The total number of elements in the
    # playlist doesn't change, so check that adding the moved rows
    # starting at to_row won't overshoot the end of the playlist.
    if to_row + len(from_rows) > total_rows:
        next_to_row = total_rows - len(from_rows)
    else:
        next_to_row = to_row

    for from_row in from_rows:
        new_playlist_rows[next_to_row] = self.playlist_rows.pop(from_row)
        next_to_row += 1

    # Move the remaining rows to the gaps in new_playlist_rows. Walk
    # them in row-number order: the dict's own iteration order is its
    # insertion order, which is not row order once rows have been
    # moved, and relying on it would shuffle the playlist.
    new_row = 0
    for old_row in sorted(self.playlist_rows):
        # Find next gap
        while new_row in new_playlist_rows:
            new_row += 1
        new_playlist_rows[new_row] = self.playlist_rows[old_row]
        new_row += 1

    # Make copy of rows live, stored in row-number order
    self.playlist_rows = dict(sorted(new_playlist_rows.items()))

    # Update PlaylistRows table and notify display of rows that
    # moved
    with Session() as session:
        for idx in range(len(self.playlist_rows)):
            prd = self.playlist_rows[idx]
            if prd.plr_rownum == idx:
                continue
            # Row number in this row is incorrect. Fix it in
            # database:
            plr = session.get(PlaylistRows, prd.plrid)
            if not plr:
                print(f"\nCan't find plr in playlistmodel:move_rows {idx=}")
                continue
            plr.plr_rownum = idx
            # Fix in self.playlist_rows
            prd.plr_rownum = idx
            # Update display
            self.invalidate_row(idx)
def refresh_data(self, session: scoped_session) -> None:
    """
    Empty self.playlist_rows and refill it, keyed by row number, from
    the rows PlaylistRows.deep_rows() returns for this playlist
    """

    self.playlist_rows.clear()
    self.playlist_rows.update(
        (plr.plr_rownum, PlaylistRowData(plr))
        for plr in PlaylistRows.deep_rows(session, self.playlist_id)
    )
def refresh_row(self, session: scoped_session, row_number: int) -> None:
    """
    Replace one entry of self.playlist_rows with data freshly read via
    PlaylistRows.deep_row() for this playlist and row_number
    """

    plr = PlaylistRows.deep_row(session, self.playlist_id, row_number)
    fresh_row_data = PlaylistRowData(plr)
    self.playlist_rows[plr.plr_rownum] = fresh_row_data
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
    """
    Standard function for view: return the number of cached rows.

    This is a flat table, so a valid index (i.e. a request for the
    children of a cell) has no rows.
    """
    if index.isValid():
        return 0
    return len(self.playlist_rows)
def setData(
    self, index: QModelIndex, value: QVariant, role: int = Qt.ItemDataRole.EditRole
) -> bool:
    """
    Update model with edited data

    The title and artist columns are written to the row's track and
    the note column to the playlist row itself. The cached row is then
    refreshed from the database and dataChanged is emitted.

    Return True if the edit was saved. Return False for an invalid
    index, a role other than EditRole, a column that is not editable,
    a row that is not cached, or a database record that cannot be
    found.
    """

    if not index.isValid() or role != Qt.ItemDataRole.EditRole:
        return False

    row_number = index.row()
    column = index.column()

    # Same set of columns that flags() marks as editable. Refuse
    # anything else instead of reporting success for a no-op.
    if column not in (Col.TITLE.value, Col.ARTIST.value, Col.NOTE.value):
        return False

    prd = self.playlist_rows.get(row_number)
    if prd is None:
        return False

    with Session() as session:
        plr = session.get(PlaylistRows, prd.plrid)
        if not plr:
            print(
                f"Error saving data: {row_number=}, {column=}, "
                f"{value=}, {self.playlist_id=}"
            )
            return False

        if column == Col.NOTE.value:
            plr.note = str(value)
        else:
            # Title and artist live on the track, not the playlist row
            track = session.get(Tracks, plr.track_id)
            if not track:
                print(f"Error retreiving track: {plr=}")
                return False
            if column == Col.TITLE.value:
                track.title = str(value)
            else:
                track.artist = str(value)

        # Flush changes before refreshing data
        session.flush()
        self.refresh_row(session, row_number)
        self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role])
        return True
def supportedDropActions(self) -> Qt.DropAction:
    """Return the drop actions this model supports: move and copy"""
    move_or_copy = Qt.DropAction.MoveAction | Qt.DropAction.CopyAction
    return move_or_copy