562 lines
18 KiB
Python
562 lines
18 KiB
Python
from datetime import datetime
|
|
from enum import auto, Enum
|
|
from typing import List, Optional
|
|
|
|
from PyQt6.QtCore import (
|
|
QAbstractTableModel,
|
|
QModelIndex,
|
|
Qt,
|
|
QVariant,
|
|
)
|
|
from PyQt6.QtGui import (
|
|
QBrush,
|
|
QColor,
|
|
QFont,
|
|
)
|
|
|
|
from classes import CurrentTrack, MusicMusterSignals, NextTrack
|
|
from config import Config
|
|
from dbconfig import scoped_session, Session
|
|
from helpers import file_is_unreadable
|
|
from log import log
|
|
from models import PlaylistRows, Tracks
|
|
|
|
|
|
HEADER_NOTES_COLUMN = 1
|
|
|
|
|
|
class Col(Enum):
|
|
START_GAP = 0
|
|
TITLE = auto()
|
|
ARTIST = auto()
|
|
DURATION = auto()
|
|
START_TIME = auto()
|
|
END_TIME = auto()
|
|
LAST_PLAYED = auto()
|
|
BITRATE = auto()
|
|
NOTE = auto()
|
|
|
|
|
|
class PlaylistRowData:
|
|
def __init__(self, plr: PlaylistRows) -> None:
|
|
"""
|
|
Populate PlaylistRowData from database PlaylistRows record
|
|
"""
|
|
|
|
self.start_gap: Optional[int] = None
|
|
self.title: str = ""
|
|
self.artist: str = ""
|
|
self.duration: int = 0
|
|
self.lastplayed: datetime = Config.EPOCH
|
|
self.bitrate = 0
|
|
self.path = ""
|
|
|
|
self.plrid: int = plr.id
|
|
self.plr_rownum: int = plr.plr_rownum
|
|
self.note: str = plr.note
|
|
if plr.track:
|
|
self.start_gap = plr.track.start_gap
|
|
self.title = plr.track.title
|
|
self.artist = plr.track.artist
|
|
self.duration = plr.track.duration
|
|
if plr.track.playdates:
|
|
self.lastplayed = max([a.lastplayed for a in plr.track.playdates])
|
|
else:
|
|
self.lastplayed = Config.EPOCH
|
|
self.bitrate = plr.track.bitrate or 0
|
|
self.path = plr.track.path
|
|
|
|
def __repr__(self) -> str:
|
|
return (
|
|
f"<PlaylistRowData: plrid={self.plrid}, plr_rownum={self.plr_rownum}, "
|
|
f"note='{self.note}', title='{self.title}', artist='{self.artist}'>"
|
|
)
|
|
|
|
|
|
class PlaylistModel(QAbstractTableModel):
|
|
"""
|
|
The Playlist Model
|
|
|
|
Update strategy: update the database and then refresh the
|
|
row-indexed cached copy (self.playlist_rows). Do not edit
|
|
self.playlist_rows directly because keeping it and the
|
|
database in sync is uncessarily challenging.
|
|
|
|
refresh_row() will populate one row of playlist_rows from the
|
|
database
|
|
|
|
refresh_data() will repopulate all of playlist_rows from the
|
|
database
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
playlist_id: int,
|
|
*args,
|
|
**kwargs,
|
|
):
|
|
self.playlist_id = playlist_id
|
|
super().__init__(*args, **kwargs)
|
|
|
|
self.playlist_rows: dict[int, PlaylistRowData] = {}
|
|
self.signals = MusicMusterSignals()
|
|
|
|
self.signals.add_track_to_playlist_signal.connect(self.add_track)
|
|
self.signals.add_track_to_header_signal.connect(self.add_track_to_header)
|
|
|
|
with Session() as session:
|
|
self.refresh_data(session)
|
|
|
|
def __repr__(self) -> str:
|
|
return (
|
|
f"<PlaylistModel: playlist_id={self.playlist_id}, {self.rowCount()} rows>"
|
|
)
|
|
|
|
def add_track(
|
|
self,
|
|
playlist_id: int,
|
|
new_row_number: int,
|
|
track_id: Optional[int],
|
|
note: Optional[str],
|
|
) -> None:
|
|
"""
|
|
Add track if it's for our playlist
|
|
"""
|
|
|
|
# Ignore if it's not for us
|
|
if playlist_id != self.playlist_id:
|
|
return
|
|
|
|
# Insert track if we have one
|
|
if track_id:
|
|
self.insert_track_row(new_row_number, track_id, note)
|
|
# If we only have a note, add as a header row
|
|
elif note:
|
|
self.insert_header_row(new_row_number, note)
|
|
else:
|
|
# No track, no note, no point
|
|
return
|
|
|
|
def add_track_to_header(
|
|
self,
|
|
playlist_id: int,
|
|
row_number: int,
|
|
track_id: int,
|
|
) -> None:
|
|
"""
|
|
Add track to existing header row if it's for our playlist
|
|
"""
|
|
|
|
# Ignore if it's not for us
|
|
if playlist_id != self.playlist_id:
|
|
return
|
|
|
|
# Get existing row
|
|
try:
|
|
prd = self.playlist_rows[row_number]
|
|
except KeyError:
|
|
log.error(
|
|
f"KeyError in PlaylistModel:add_track_to_header ({playlist_id=}, "
|
|
f"{row_number=}, {track_id=}, {len(self.playlist_rows)=}"
|
|
)
|
|
return
|
|
if prd.path:
|
|
log.error(
|
|
f"Error in PlaylistModel:add_track_to_header ({prd=}, "
|
|
"Header row already has track associated"
|
|
)
|
|
return
|
|
with Session() as session:
|
|
plr = session.get(PlaylistRows, prd.plrid)
|
|
if plr:
|
|
# Add track to PlaylistRows
|
|
plr.track_id = track_id
|
|
# Reset header row spanning
|
|
self.signals.span_cells_signal.emit(
|
|
row_number, HEADER_NOTES_COLUMN, 1, 1
|
|
)
|
|
# Update local copy
|
|
self.refresh_row(session, row_number)
|
|
# Repaint row
|
|
self.invalidate_row(row_number)
|
|
|
|
def background_role(self, row: int, column: int, prd: PlaylistRowData) -> QBrush:
|
|
"""Return background setting"""
|
|
|
|
# Handle entire row colouring
|
|
# Header row
|
|
if not prd.path:
|
|
return QBrush(QColor(Config.COLOUR_NOTES_PLAYLIST))
|
|
# Unreadable track file
|
|
if file_is_unreadable(prd.path):
|
|
return QBrush(QColor(Config.COLOUR_UNREADABLE))
|
|
|
|
# Individual cell colouring
|
|
if column == Col.START_GAP.value:
|
|
if prd.start_gap and prd.start_gap >= Config.START_GAP_WARNING_THRESHOLD:
|
|
return QBrush(QColor(Config.COLOUR_LONG_START))
|
|
if column == Col.BITRATE.value:
|
|
if prd.bitrate < Config.BITRATE_LOW_THRESHOLD:
|
|
return QBrush(QColor(Config.COLOUR_BITRATE_LOW))
|
|
elif prd.bitrate and prd.bitrate < Config.BITRATE_OK_THRESHOLD:
|
|
return QBrush(QColor(Config.COLOUR_BITRATE_MEDIUM))
|
|
else:
|
|
return QBrush(QColor(Config.COLOUR_BITRATE_OK))
|
|
|
|
return QBrush()
|
|
|
|
def columnCount(self, parent: QModelIndex = QModelIndex()) -> int:
|
|
"""Standard function for view"""
|
|
|
|
return 9
|
|
|
|
def data(self, index: QModelIndex, role: int = Qt.ItemDataRole.DisplayRole):
|
|
"""Return data to view"""
|
|
|
|
if not index.isValid() or not (0 <= index.row() < len(self.playlist_rows)):
|
|
return QVariant()
|
|
|
|
row = index.row()
|
|
column = index.column()
|
|
# prd for playlist row data as it's used a lot
|
|
prd = self.playlist_rows[row]
|
|
|
|
# Dispatch to role-specific functions
|
|
if role == Qt.ItemDataRole.DisplayRole:
|
|
return self.display_role(row, column, prd)
|
|
elif role == Qt.ItemDataRole.DecorationRole:
|
|
pass
|
|
elif role == Qt.ItemDataRole.EditRole:
|
|
return self.edit_role(row, column, prd)
|
|
elif role == Qt.ItemDataRole.ToolTipRole:
|
|
pass
|
|
elif role == Qt.ItemDataRole.StatusTipRole:
|
|
pass
|
|
elif role == Qt.ItemDataRole.WhatsThisRole:
|
|
pass
|
|
elif role == Qt.ItemDataRole.SizeHintRole:
|
|
pass
|
|
elif role == Qt.ItemDataRole.FontRole:
|
|
pass
|
|
elif role == Qt.ItemDataRole.TextAlignmentRole:
|
|
pass
|
|
elif role == Qt.ItemDataRole.BackgroundRole:
|
|
return self.background_role(row, column, prd)
|
|
elif role == Qt.ItemDataRole.ForegroundRole:
|
|
pass
|
|
elif role == Qt.ItemDataRole.CheckStateRole:
|
|
pass
|
|
elif role == Qt.ItemDataRole.InitialSortOrderRole:
|
|
pass
|
|
|
|
# Fall through to no-op
|
|
return QVariant()
|
|
|
|
def display_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant:
|
|
"""
|
|
Return text for display
|
|
"""
|
|
|
|
if not prd.path:
|
|
# No track so this is a header row
|
|
if column == HEADER_NOTES_COLUMN:
|
|
self.signals.span_cells_signal.emit(
|
|
row, HEADER_NOTES_COLUMN, 1, self.columnCount() - 1
|
|
)
|
|
return QVariant(prd.note)
|
|
else:
|
|
return QVariant()
|
|
|
|
if column == Col.START_GAP.value:
|
|
return QVariant(prd.start_gap)
|
|
if column == Col.TITLE.value:
|
|
return QVariant(prd.title)
|
|
if column == Col.ARTIST.value:
|
|
return QVariant(prd.artist)
|
|
if column == Col.DURATION.value:
|
|
return QVariant(prd.duration)
|
|
if column == Col.START_TIME.value:
|
|
return QVariant("FIXME")
|
|
if column == Col.END_TIME.value:
|
|
return QVariant("FIXME")
|
|
if column == Col.LAST_PLAYED.value:
|
|
return QVariant(prd.lastplayed)
|
|
if column == Col.BITRATE.value:
|
|
return QVariant(prd.bitrate)
|
|
if column == Col.NOTE.value:
|
|
return QVariant(prd.note)
|
|
|
|
return QVariant()
|
|
|
|
def edit_role(self, row: int, column: int, prd: PlaylistRowData) -> QVariant:
|
|
"""
|
|
Return text for editing
|
|
"""
|
|
|
|
if column == Col.TITLE.value:
|
|
return QVariant(prd.title)
|
|
if column == Col.ARTIST.value:
|
|
return QVariant(prd.artist)
|
|
if column == Col.NOTE.value:
|
|
return QVariant(prd.note)
|
|
|
|
return QVariant()
|
|
|
|
def flags(self, index: QModelIndex) -> Qt.ItemFlag:
|
|
"""
|
|
Standard model flags
|
|
"""
|
|
|
|
if not index.isValid():
|
|
return Qt.ItemFlag.ItemIsDropEnabled
|
|
|
|
default = (
|
|
Qt.ItemFlag.ItemIsEnabled
|
|
| Qt.ItemFlag.ItemIsSelectable
|
|
| Qt.ItemFlag.ItemIsDragEnabled
|
|
)
|
|
if index.column() in [Col.TITLE.value, Col.ARTIST.value, Col.NOTE.value]:
|
|
return default | Qt.ItemFlag.ItemIsEditable
|
|
|
|
return default
|
|
|
|
def headerData(
|
|
self,
|
|
section: int,
|
|
orientation: Qt.Orientation,
|
|
role: int = Qt.ItemDataRole.DisplayRole,
|
|
) -> QVariant:
|
|
"""
|
|
Return text for headers
|
|
"""
|
|
|
|
if role == Qt.ItemDataRole.DisplayRole:
|
|
if orientation == Qt.Orientation.Horizontal:
|
|
if section == Col.START_GAP.value:
|
|
return QVariant(Config.HEADER_START_GAP)
|
|
elif section == Col.TITLE.value:
|
|
return QVariant(Config.HEADER_TITLE)
|
|
elif section == Col.ARTIST.value:
|
|
return QVariant(Config.HEADER_ARTIST)
|
|
elif section == Col.DURATION.value:
|
|
return QVariant(Config.HEADER_DURATION)
|
|
elif section == Col.START_TIME.value:
|
|
return QVariant(Config.HEADER_START_TIME)
|
|
elif section == Col.END_TIME.value:
|
|
return QVariant(Config.HEADER_END_TIME)
|
|
elif section == Col.LAST_PLAYED.value:
|
|
return QVariant(Config.HEADER_LAST_PLAYED)
|
|
elif section == Col.BITRATE.value:
|
|
return QVariant(Config.HEADER_BITRATE)
|
|
elif section == Col.NOTE.value:
|
|
return QVariant(Config.HEADER_NOTE)
|
|
else:
|
|
if Config.ROWS_FROM_ZERO:
|
|
return QVariant(str(section))
|
|
else:
|
|
return QVariant(str(section + 1))
|
|
|
|
elif role == Qt.ItemDataRole.FontRole:
|
|
boldfont = QFont()
|
|
boldfont.setBold(True)
|
|
return QVariant(boldfont)
|
|
|
|
return QVariant()
|
|
|
|
def is_header_row(self, row_number: int) -> bool:
|
|
"""
|
|
Return True if row is a header row, else False
|
|
"""
|
|
|
|
return self.playlist_rows[row_number].path == ""
|
|
|
|
def insert_header_row(self, row_number: Optional[int], text: str) -> None:
|
|
"""
|
|
Insert a header row.
|
|
"""
|
|
|
|
with Session() as session:
|
|
plr = self._insert_row(session, row_number)
|
|
# Update the PlaylistRows object
|
|
plr.note = text
|
|
# Repopulate self.playlist_rows
|
|
self.refresh_data(session)
|
|
# Update the display from the new row onwards
|
|
self.invalidate_rows(list(range(plr.plr_rownum, len(self.playlist_rows))))
|
|
|
|
def _insert_row(
|
|
self, session: scoped_session, row_number: Optional[int]
|
|
) -> PlaylistRows:
|
|
"""
|
|
Insert a row in the database.
|
|
|
|
If row_number is greater than length of list plus 1, or if row
|
|
number is None, put row at end of list.
|
|
|
|
Move existing rows to make space if ncessary.
|
|
|
|
Return the new PlaylistRows object.
|
|
"""
|
|
|
|
if row_number is None or row_number > len(self.playlist_rows):
|
|
# We are adding to the end of the list so we can optimise
|
|
new_row_number = len(self.playlist_rows)
|
|
return PlaylistRows(session, self.playlist_id, new_row_number)
|
|
elif row_number < 0:
|
|
raise ValueError(
|
|
f"playlistmodel._insert_row, invalid row number ({row_number})"
|
|
)
|
|
else:
|
|
new_row_number = row_number
|
|
|
|
# Insert the new row and return it
|
|
return PlaylistRows.insert_row(session, self.playlist_id, new_row_number)
|
|
|
|
def insert_track_row(
|
|
self, row_number: Optional[int], track_id: int, text: Optional[str]
|
|
) -> None:
|
|
"""
|
|
Insert a track row.
|
|
"""
|
|
|
|
with Session() as session:
|
|
plr = self._insert_row(session, row_number)
|
|
# Update the PlaylistRows object
|
|
plr.track_id = track_id
|
|
if text:
|
|
plr.note = text
|
|
# Repopulate self.playlist_rows
|
|
self.refresh_data(session)
|
|
# Update the display from the new row onwards
|
|
self.invalidate_rows(list(range(plr.plr_rownum, len(self.playlist_rows))))
|
|
|
|
def invalidate_row(self, modified_row: int) -> None:
|
|
"""
|
|
Signal to view to refresh invlidated row
|
|
"""
|
|
|
|
self.dataChanged.emit(
|
|
self.index(modified_row, 0), self.index(modified_row, self.columnCount())
|
|
)
|
|
|
|
def invalidate_rows(self, modified_rows: List[int]) -> None:
|
|
"""
|
|
Signal to view to refresh invlidated rows
|
|
"""
|
|
|
|
for modified_row in modified_rows:
|
|
self.invalidate_row(modified_row)
|
|
|
|
def move_rows(self, from_rows: List[int], to_row_number: int) -> None:
|
|
"""
|
|
Move the playlist rows given to to_row and below.
|
|
"""
|
|
|
|
# Build a {current_row_number: new_row_number} dictionary
|
|
row_map: dict[int, int] = {}
|
|
|
|
# Put the from_row row numbers into the row_map. Ultimately the
|
|
# total number of elements in the playlist doesn't change, so
|
|
# check that adding the moved rows starting at to_row won't
|
|
# overshoot the end of the playlist.
|
|
if to_row_number + len(from_rows) > len(self.playlist_rows):
|
|
next_to_row = len(self.playlist_rows) - len(from_rows)
|
|
else:
|
|
next_to_row = to_row_number
|
|
|
|
for from_row, to_row in zip(
|
|
from_rows, range(next_to_row, next_to_row + len(from_rows))
|
|
):
|
|
row_map[from_row] = to_row
|
|
# Move the remaining rows to the row_map. We want to fill it
|
|
# before (if there are gaps) and after (likewise) the rows that
|
|
# are moving.
|
|
# This iterates old_row and new_row simultaneously.
|
|
for old_row, new_row in zip(
|
|
[x for x in self.playlist_rows.keys() if x not in from_rows],
|
|
[y for y in range(len(self.playlist_rows)) if y not in row_map.values()],
|
|
):
|
|
# Optimise: only add to map if there is a change
|
|
if old_row != new_row:
|
|
row_map[old_row] = new_row
|
|
|
|
# For SQLAlchemy, build a list of dictionaries that map plrid to
|
|
# new row number:
|
|
sqla_map: List[dict[str, int]] = []
|
|
for oldrow, newrow in row_map.items():
|
|
plrid = self.playlist_rows[oldrow].plrid
|
|
sqla_map.append({"plrid": plrid, "plr_rownum": newrow})
|
|
|
|
with Session() as session:
|
|
PlaylistRows.update_plr_rownumbers(session, self.playlist_id, sqla_map)
|
|
# Update playlist_rows
|
|
self.refresh_data(session)
|
|
|
|
# Update display
|
|
self.invalidate_rows(list(row_map.keys()))
|
|
|
|
def refresh_data(self, session: scoped_session):
|
|
"""Populate dicts for data calls"""
|
|
|
|
# Populate self.playlist_rows with playlist data
|
|
self.playlist_rows.clear()
|
|
for p in PlaylistRows.deep_rows(session, self.playlist_id):
|
|
self.playlist_rows[p.plr_rownum] = PlaylistRowData(p)
|
|
|
|
def refresh_row(self, session, row_number):
|
|
"""Populate dict for one row for data calls"""
|
|
|
|
p = PlaylistRows.deep_row(session, self.playlist_id, row_number)
|
|
self.playlist_rows[p.plr_rownum] = PlaylistRowData(p)
|
|
|
|
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
|
|
"""Standard function for view"""
|
|
|
|
return len(self.playlist_rows)
|
|
|
|
def setData(
|
|
self, index: QModelIndex, value: QVariant, role: int = Qt.ItemDataRole.EditRole
|
|
) -> bool:
|
|
"""
|
|
Update model with edited data
|
|
"""
|
|
|
|
if index.isValid() and role == Qt.ItemDataRole.EditRole:
|
|
row_number = index.row()
|
|
column = index.column()
|
|
|
|
with Session() as session:
|
|
plr = session.get(PlaylistRows, self.playlist_rows[row_number].plrid)
|
|
if not plr:
|
|
print(
|
|
f"Error saving data: {row_number=}, {column=}, "
|
|
f"{value=}, {self.playlist_id=}"
|
|
)
|
|
return False
|
|
|
|
if column == Col.TITLE.value or column == Col.ARTIST.value:
|
|
track = session.get(Tracks, plr.track_id)
|
|
if not track:
|
|
print(f"Error retreiving track: {plr=}")
|
|
return False
|
|
if column == Col.TITLE.value:
|
|
track.title = str(value)
|
|
elif column == Col.ARTIST.value:
|
|
track.artist = str(value)
|
|
else:
|
|
print(f"Error updating track: {column=}, {value=}")
|
|
return False
|
|
elif column == Col.NOTE.value:
|
|
plr.note = str(value)
|
|
|
|
# Flush changes before refreshing data
|
|
session.flush()
|
|
self.refresh_row(session, row_number)
|
|
self.dataChanged.emit(index, index, [Qt.ItemDataRole.DisplayRole, role])
|
|
return True
|
|
|
|
return False
|
|
|
|
def supportedDropActions(self):
|
|
return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction
|