musicmuster/app/playlists.py
Keith Edmunds c0ae9eba9f Don't scroll display after drop
With no code, the display scrolls back to where the source rows came from.
With the code we had, we ensured dropped rows were visible, but the display
would still scroll.
Now freeze the display as it is when rows are dropped.
2023-05-01 18:04:03 +01:00

2320 lines
78 KiB
Python

import os
import re
import stackprinter # type: ignore
import subprocess
import threading
import obsws_python as obs # type: ignore
from collections import namedtuple
from datetime import datetime, timedelta
from typing import Callable, cast, List, Optional, TYPE_CHECKING, Union
from PyQt6.QtCore import (
QEvent,
QModelIndex,
QObject,
QSize,
Qt,
QTimer,
)
from PyQt6.QtGui import (
QAction,
QBrush,
QColor,
QFont,
QDropEvent,
QKeyEvent
)
from PyQt6.QtWidgets import (
QAbstractItemDelegate,
QAbstractItemView,
QApplication,
QLineEdit,
QMainWindow,
QMenu,
QMessageBox,
QPlainTextEdit,
QStyledItemDelegate,
QStyleOptionViewItem,
QTableWidget,
QTableWidgetItem,
QWidget
)
from config import Config
from dbconfig import Session, scoped_session
from helpers import (
ask_yes_no,
file_is_unreadable,
get_relative_date,
ms_to_mmss,
open_in_audacity,
send_mail,
set_track_metadata,
)
from log import log
from models import (
Playdates,
Playlists,
PlaylistRows,
Settings,
Tracks,
NoteColours
)
if TYPE_CHECKING:
from musicmuster import Window, MusicMusterSignals
# Regular expressions applied to note text
scene_change_re = re.compile(r"SetScene=\[([^[\]]*)\]")
section_header_cleanup_re = re.compile(r"(@\d\d:\d\d:\d\d.*)?(\+)?")
start_time_re = re.compile(r"@\d\d:\d\d:\d\d")

# Column that holds the note text of a section header row
HEADER_NOTES_COLUMN = 2

# Columns: each entry records the column's position in the table and
# the heading shown for it. Entries are listed in display order.
Column = namedtuple("Column", ['idx', 'heading'])
columns = {
    "userdata": Column(idx=0, heading=Config.COLUMN_NAME_AUTOPLAY),
    "start_gap": Column(idx=1, heading=Config.COLUMN_NAME_LEADING_SILENCE),
    "title": Column(idx=2, heading=Config.COLUMN_NAME_TITLE),
    "artist": Column(idx=3, heading=Config.COLUMN_NAME_ARTIST),
    "duration": Column(idx=4, heading=Config.COLUMN_NAME_LENGTH),
    "start_time": Column(idx=5, heading=Config.COLUMN_NAME_START_TIME),
    "end_time": Column(idx=6, heading=Config.COLUMN_NAME_END_TIME),
    "lastplayed": Column(idx=7, heading=Config.COLUMN_NAME_LAST_PLAYED),
    "bitrate": Column(idx=8, heading=Config.COLUMN_NAME_BITRATE),
    "row_notes": Column(idx=9, heading=Config.COLUMN_NAME_NOTES),
}

# Shorthand names for the column indexes
USERDATA = columns["userdata"].idx
START_GAP = columns["start_gap"].idx
TITLE = columns["title"].idx
ARTIST = columns["artist"].idx
DURATION = columns["duration"].idx
START_TIME = columns["start_time"].idx
END_TIME = columns["end_time"].idx
LASTPLAYED = columns["lastplayed"].idx
BITRATE = columns["bitrate"].idx
ROW_NOTES = columns["row_notes"].idx
class EscapeDelegate(QStyledItemDelegate):
    """
    - increases the height of a row when editing to make editing easier
    - closes the edit on control-return
    - checks with user before abandoning edit on Escape

    NOTE(review): leading indentation was lost in the copy of the file this
    was reviewed from; the nesting of the ``return`` statements below is
    reconstructed and should be confirmed against the repository.
    """

    def __init__(self, parent) -> None:
        # The parent is read back via self.parent() in createEditor() and
        # eventFilter().
        super().__init__(parent)

    def createEditor(self, parent: QWidget, option: QStyleOptionViewItem,
                     index: QModelIndex):
        """
        Intercept createEditor call and make row just a little bit taller

        When the delegate's parent is a PlaylistTab, a QPlainTextEdit is
        returned as the editor; for any other parent the default editor
        from QStyledItemDelegate is used.
        """

        if isinstance(self.parent(), PlaylistTab):
            p = cast(PlaylistTab, self.parent())
            if isinstance(index.data(), str):
                # Grow the row by Config.MINIMUM_ROW_HEIGHT for the
                # duration of the edit.
                row = index.row()
                row_height = p.rowHeight(row)
                p.setRowHeight(row, row_height + Config.MINIMUM_ROW_HEIGHT)
            return QPlainTextEdit(parent)
        return super().createEditor(parent, option, index)

    def eventFilter(self, editor: QObject, event: QEvent) -> bool:
        """By default, QPlainTextEdit doesn't handle enter or return"""

        if event.type() == QEvent.Type.KeyPress:
            key_event = cast(QKeyEvent, event)
            if key_event.key() == Qt.Key.Key_Return:
                # Control-Return commits the edit and closes the editor;
                # Return with any other modifier state is not handled here.
                if key_event.modifiers() == (
                    Qt.KeyboardModifier.ControlModifier
                ):
                    self.commitData.emit(editor)
                    self.closeEditor.emit(editor)
                    return True
            elif key_event.key() == Qt.Key.Key_Escape:
                # Ask before throwing the edit away; the key press is
                # reported as handled whichever answer is given.
                discard_edits = QMessageBox.question(
                    self.parent(), "Abandon edit", "Discard changes?")
                if discard_edits == QMessageBox.StandardButton.Yes:
                    self.closeEditor.emit(editor)
                return True
        return False
class PlaylistTab(QTableWidget):
    # Table widget that displays the rows of one playlist (see
    # populate_display()).

    # Qt.ItemDataRole.UserRoles
    # Roles under which per-row user data is stored and read back with
    # _get_row_userdata() / _set_row_userdata().
    # presumably the id of the track on the row - TODO confirm
    ROW_TRACK_ID = Qt.ItemDataRole.UserRole
    # Read by _get_row_duration()
    ROW_DURATION = Qt.ItemDataRole.UserRole + 1
    # Read by _get_row_plr_id()
    PLAYLISTROW_ID = Qt.ItemDataRole.UserRole + 2
    # Read by _get_row_path()
    TRACK_PATH = Qt.ItemDataRole.UserRole + 3
    # Set to True by insert_row() for rows that have been played
    PLAYED = Qt.ItemDataRole.UserRole + 4
def __init__(self, musicmuster: "Window",
             session: scoped_session,
             playlist_id: int, signals: "MusicMusterSignals") -> None:
    """
    Build the table widget for one playlist and load its rows.

    musicmuster: the main window; used for current/next track state and
        for enabling/disabling play controls
    session: database session used for the initial populate_display()
    playlist_id: id of the playlist shown in this tab
    signals: application signals; set_next_track_signal is connected to
        self._reset_next
    """

    super().__init__()
    self.musicmuster: Window = musicmuster
    self.playlist_id = playlist_id
    self.signals = signals

    # Set up widget
    self.menu = QMenu()
    self.setItemDelegate(EscapeDelegate(self))
    self.setAlternatingRowColors(True)
    self.setSelectionMode(
        QAbstractItemView.SelectionMode.ExtendedSelection)
    self.setSelectionBehavior(
        QAbstractItemView.SelectionBehavior.SelectRows)
    self.setEditTriggers(QAbstractItemView.EditTrigger.DoubleClicked)
    self.setVerticalScrollMode(QAbstractItemView.ScrollMode.ScrollPerPixel)
    self.setRowCount(0)
    self.setColumnCount(len(columns))
    self.v_header = self.verticalHeader()
    self.v_header.setMinimumSectionSize(Config.MINIMUM_ROW_HEIGHT)
    self.horizontalHeader().setStretchLastSection(True)

    # Header row
    for idx in range(len(columns)):
        item = QTableWidgetItem()
        self.setHorizontalHeaderItem(idx, item)
    self.horizontalHeader().setMinimumSectionSize(0)
    # Set column headings sorted by idx
    self.setHorizontalHeaderLabels(
        [a.heading for a in sorted(columns.values(),
                                   key=lambda item: item.idx)]
    )
    self.horizontalHeader().sectionResized.connect(
        self.resizeRowsToContents)

    # Drag and drop setup
    self.setAcceptDrops(True)
    self.viewport().setAcceptDrops(True)
    self.setDragDropOverwriteMode(False)
    self.setDropIndicatorShown(True)
    self.setDragDropMode(QAbstractItemView.DragDropMode.InternalMove)
    # Dragging starts disabled; mouseReleaseEvent() enables it once rows
    # are selected
    self.setDragEnabled(False)

    # This property defines how the widget shows a context menu
    self.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
    # This signal is emitted when the widget's contextMenuPolicy is
    # Qt::CustomContextMenu, and the user has requested a context
    # menu on the widget.
    self.customContextMenuRequested.connect(self._context_menu)

    # Call self.eventFilter() for events
    self.viewport().installEventFilter(self)

    self.search_text: str = ""
    # Column type being edited, or None when no edit is in progress.
    # Must be given a value here: a bare annotation does not create the
    # attribute, so reading it before the first edit would raise
    # AttributeError.
    self.edit_cell_type: Optional[int] = None
    self.selecting_in_progress = False

    # Connect signals
    self.horizontalHeader().sectionResized.connect(self._column_resize)
    self.itemSelectionChanged.connect(self._select_event)
    self.signals.set_next_track_signal.connect(self._reset_next)

    # Load playlist rows
    self.populate_display(session, self.playlist_id)
def __repr__(self) -> str:
return f"<PlaylistTab(id={self.playlist_id}>"
# ########## Events other than cell editing ##########
def dropEvent(self, event: QDropEvent) -> None:
    """
    Handle drag/drop of rows

    https://stackoverflow.com/questions/26227885/drag-and-drop-rows-within-qtablewidget

    The selected rows are moved to sit above the row under the drop
    position (or to the end if dropped below the last row), the playlist
    is saved and start/end times are recalculated.

    NOTE(review): leading indentation was lost in the copy of the file this
    was reviewed from; nesting is reconstructed - confirm against the
    repository.
    """

    if not event.source() == self:
        return  # We don't accept external drops

    # Remember which row is at the top of the viewport so that it can be
    # put back there once the rows have been moved
    top_row = self.rowAt(0)
    row_set = set([mi.row() for mi in self.selectedIndexes()])
    targetRow = self.indexAt(event.position().toPoint()).row()
    # Dropping a row onto itself is not a move
    row_set.discard(targetRow)
    rows = list(sorted(row_set))
    if not rows:
        return
    # indexAt() gave no valid row: append to the end
    if targetRow == -1:
        targetRow = self.rowCount()
    # Make room for the moved rows
    for _ in range(len(rows)):
        self.insertRow(targetRow)
    rowMapping = dict()  # Src row to target row.
    for idx, row in enumerate(rows):
        if row < targetRow:
            rowMapping[row] = targetRow + idx
        else:
            # Source rows at or below the target were pushed down by the
            # rows just inserted
            rowMapping[row + len(rows)] = targetRow + idx
    colCount = self.columnCount()
    for srcRow, tgtRow in sorted(rowMapping.items()):
        if self._get_row_track_id(srcRow):
            # This is a track row
            for col in range(0, colCount):
                self.setItem(tgtRow, col, self.takeItem(srcRow, col))
        else:
            # Section header: only the notes item is moved, and the span
            # is re-applied on the new row
            self.setItem(tgtRow, HEADER_NOTES_COLUMN,
                         self.takeItem(srcRow, HEADER_NOTES_COLUMN))
            self.setSpan(tgtRow, HEADER_NOTES_COLUMN, 1, len(columns) - 1)
    # Remove the now-empty source rows, highest first so that the
    # remaining row numbers stay valid
    for row in reversed(sorted(rowMapping.keys())):
        self.removeRow(row)
    self.resizeRowsToContents()

    # Put the row that was at the top of the viewport back at the top
    self.scrollToItem(self.item(top_row, 1),
                      QAbstractItemView.ScrollHint.PositionAtTop)
    event.accept()

    # Reset drag mode to allow row selection by dragging
    self.setDragEnabled(False)

    with Session() as session:
        self.save_playlist(session)
        self._update_start_end_times(session)
    self.hide_or_show_played_tracks()
def _add_context_menu(self, text: str, action: Callable,
disabled: bool = False) -> QAction:
"""
Add item to self.menu
"""
menu_item = self.menu.addAction(text)
menu_item.setDisabled(disabled)
menu_item.triggered.connect(action)
return menu_item
def mouseReleaseEvent(self, event):
    """
    Allow dragging only while at least one item is selected, then hand
    the event on to the base class.
    """

    self.setDragEnabled(bool(self.selectedItems()))
    super().mouseReleaseEvent(event)
# ########## Cell editing ##########
# We only want to allow cell editing on tracks, artists and notes,
# although notes may be section headers.
#
# Once editing starts, we need to disable play controls so that a
# 'return' doesn't play the next track.
#
# Earlier in this file:
# self.setEditTriggers(QAbstractItemView.DoubleClicked) - triggers
# editing on double-click
#
# Call sequences:
# Start editing:
# edit()
# End editing:
# _cell_changed() (only if changes made)
# closeEditor()
def _cell_changed(self, row: int, column: int) -> None:
    """
    Called when cell content has changed

    Writes the edited text (stripped of surrounding whitespace) to the
    playlist row's note or to the track's title/artist, depending on
    self.edit_cell_type, and refreshes the headers and row colour if the
    row is the current or next track.

    NOTE(review): leading indentation was lost in the copy of the file this
    was reviewed from; the nesting of the final header/colour updates is
    reconstructed - confirm against the repository.
    """

    # Disable cell changed signal connection as note updates will
    # change cell again (metadata)
    self.cellChanged.disconnect(self._cell_changed)

    cell = self.item(row, column)
    if not cell:
        return
    new_text = cell.text().strip()
    # Update cell with strip()'d text
    cell.setText(new_text)

    # A falsy track_id means this row is a section header
    track_id = self._get_row_track_id(row)

    # Determine cell type changed
    with Session() as session:
        # Get playlistrow object
        plr_id = self._get_row_plr_id(row)
        plr_item = session.get(PlaylistRows, plr_id)
        if not plr_item:
            return

        # Note any updates needed to PlaylistTrack objects
        update_current = self.musicmuster.current_track.plr_id == plr_id
        update_next = self.musicmuster.next_track.plr_id == plr_id

        if self.edit_cell_type == ROW_NOTES:
            plr_item.note = new_text
            if track_id:
                self._set_row_note_text(session, row, new_text)
            else:
                self._set_row_header_text(session, row, new_text)
        else:
            if track_id:
                track = session.get(Tracks, track_id)
                if track:
                    if self.edit_cell_type == TITLE:
                        track.title = new_text
                        if update_current:
                            self.musicmuster.current_track.title = new_text
                        if update_next:
                            self.musicmuster.next_track.title = new_text
                    elif self.edit_cell_type == ARTIST:
                        track.artist = new_text
                        if update_current:
                            self.musicmuster.current_track.artist = \
                                new_text
                        if update_next:
                            self.musicmuster.next_track.artist = new_text

                    if update_next or update_current:
                        self.musicmuster.update_headers()

                if update_current:
                    self._set_row_colour_current(row)
                elif update_next:
                    self._set_row_colour_next(row)

    self.clear_selection()
def closeEditor(self,
                editor: QWidget,
                hint: QAbstractItemDelegate.EndEditHint) -> None:
    """
    Override QAbstractItemView.closeEditor so that, when an edit ends,
    the play controls are re-enabled and the display is tidied up.
    """

    # _cell_changed() disconnects itself when it runs; if the edit was
    # abandoned (eg, with ESC) it never ran and the connection is still
    # there. Disconnecting a signal that is not connected raises
    # TypeError, which is ignored.
    try:
        self.cellChanged.disconnect(self._cell_changed)
    except TypeError:
        pass

    self.edit_cell_type = None

    main_window = self.musicmuster
    main_window.enable_play_next_controls()
    main_window.actionSetNext.setEnabled(True)
    main_window.action_Clear_selection.setEnabled(True)

    super().closeEditor(editor, hint)

    # The row was made taller for editing: shrink it back to fit
    self.resizeRowsToContents()

    # A start time embedded in a note may have been edited, so
    # recalculate the times
    with Session() as session:
        self._update_start_end_times(session)
def edit(self, index: QModelIndex,  # type: ignore # FIXME
         trigger: QAbstractItemView.EditTrigger,
         event: QEvent) -> bool:
    """
    Override QAbstractItemView.edit to catch when editing starts

    Editing only ever starts with a double click on a cell

    Returns the base class result, or False when the cell is not one
    that may be edited.

    NOTE(review): leading indentation was lost in the copy of the file this
    was reviewed from; nesting is reconstructed - confirm against the
    repository.
    """

    # 'result' will only be true on double-click
    result = super(PlaylistTab, self).edit(index, trigger, event)
    if result:
        row = index.row()
        column = index.column()
        # Stays 0 unless a note (track note or section header) is
        # being edited
        note_column = 0
        if self._get_row_track_id(row):
            # If a track row, we only allow editing of title, artist and
            # note. Check that this column is one of those.
            if column in [TITLE, ARTIST, ROW_NOTES]:
                self.edit_cell_type = column
            else:
                # Can't edit other columns
                return False

            # Check whether we're editing a notes row for later
            if self.edit_cell_type == ROW_NOTES:
                note_column = ROW_NOTES
        else:
            # This is a section header.
            if column != HEADER_NOTES_COLUMN:
                return False
            note_column = HEADER_NOTES_COLUMN
            # Header text is stored as the row's note
            self.edit_cell_type = ROW_NOTES

        # Disable play controls so that keyboard input doesn't
        # disturb playing
        self.musicmuster.disable_play_next_controls()
        self.musicmuster.actionSetNext.setEnabled(False)
        self.musicmuster.action_Clear_selection.setEnabled(False)

        # If this is a note cell, we need to remove any existing section
        # timing so user can't edit that. Keep it simple: refresh text
        # from database. note_column will only be non-zero if we are
        # editing a note.
        if note_column:
            with Session() as session:
                plr_item = self._get_row_plr(session, row)
                if not plr_item:
                    return False
                if note_column == ROW_NOTES:
                    self._set_row_note_text(session, row, plr_item.note)
                else:
                    self._set_row_header_text(session, row, plr_item.note)

        # Connect signal so we know when cell has changed.
        self.cellChanged.connect(self._cell_changed)

    return result
# # ########## Externally called functions ##########
def clear_next(self) -> None:
"""
Unmark next track
"""
row_number = self._get_next_track_row_number()
if not row_number:
return
self._set_row_colour_default(row_number)
self.clear_selection()
self.musicmuster.set_next_plr_id(None, self)
def clear_selection(self) -> None:
"""Unselect all tracks and reset drag mode"""
self.clearSelection()
self.setDragEnabled(False)
def get_new_row_number(self) -> int:
"""
Return the selected row or the row count if no row selected
(ie, new row will be appended)
"""
if self.selectionModel().hasSelection():
return self.currentRow()
else:
return self.rowCount()
def get_selected_playlistrow_ids(self) -> list:
"""
Return a list of PlaylistRow ids of the selected rows
"""
return [self._get_row_plr_id(a) for a in self._get_selected_rows()]
def get_selected_playlistrows(
self, session: scoped_session) -> List[PlaylistRows]:
"""
Return a list of PlaylistRows of the selected rows
"""
plr_ids = self.get_selected_playlistrow_ids()
if not plr_ids:
return []
plrs = [session.get(PlaylistRows, a) for a in plr_ids]
return [plr for plr in plrs if plr is not None]
def hide_or_show_played_tracks(self) -> None:
"""
Hide or show played tracks.
Never hide current or next track
"""
current_next = [self._get_current_track_row_number(),
self._get_next_track_row_number()]
for row_number in range(self.rowCount()):
if row_number in current_next:
continue
if self._get_row_userdata(row_number, self.PLAYED):
if self.musicmuster.hide_played_tracks:
self.hideRow(row_number)
else:
self.showRow(row_number)
# This causes scrolling, so ensure current track is visible
self.scroll_current_to_top()
def insert_header(self, session: scoped_session, note: str) -> None:
    """
    Add a section header carrying 'note' to this playlist tab.

    The header goes above the selected row, or at the end of the
    playlist when nothing is selected. A PlaylistRows object without a
    track is created and handed to insert_row(), which does the real
    work; the playlist is then saved and the times recalculated.
    """

    target_row = self.get_new_row_number()
    header_plr = PlaylistRows(
        session, self.playlist_id, None, target_row, note)

    self.insert_row(session, header_plr)
    self._set_row_header_text(session, target_row, note)

    self.save_playlist(session)
    self._update_start_end_times(session)
def insert_row(self, session: scoped_session, plr: PlaylistRows,
update_track_times: bool = True, played=False) -> None:
"""
Insert passed playlist row (plr) into playlist tab.
"""
row_number = plr.plr_rownum
bold = True
self.insertRow(row_number)
_ = self._set_row_plr_id(row_number, plr.id)
if plr.track:
self._update_row_track_info(session, row_number, plr.track)
if played:
bold = False
_ = self._set_row_userdata(row_number, self.PLAYED, True)
if plr.note is None:
plr.note = ""
self._set_row_note_text(session, row_number, plr.note)
else:
# This is a section header so it must have note text
if plr.note is None:
log.debug(
f"insert_row({plr=}) with no track_id and no note"
)
return
# Use one QTableWidgetItem to span all columns from column 1
self._set_row_header_text(session, row_number, plr.note)
self.setSpan(row_number, HEADER_NOTES_COLUMN, 1, len(columns) - 1)
# Save (or clear) track_id
_ = self._set_row_track_id(row_number, 0)
# Set bold as needed
self._set_row_bold(row_number, bold)
def insert_track(self, session: scoped_session, track: Tracks,
                 note: str = "", repaint: bool = True) -> None:
    """
    Add 'track' to this playlist tab, above the selected row or at the
    end of the playlist when nothing is selected.

    If the track is already in the playlist the user is asked whether
    to move the existing row instead; if so, any 'note' passed is
    appended to that row's note. Otherwise a new PlaylistRows object is
    created and handed to insert_row(), the playlist is saved and the
    times are recalculated.
    """

    if not track:
        log.debug(
            f"insert_track(session={hex(id(Session))}, {note=}, {repaint=}"
            " called with no track"
        )
        return

    row_number = self.get_new_row_number()

    # Is this track already somewhere in the playlist?
    existing_plr = PlaylistRows.get_track_plr(session, track.id,
                                              self.playlist_id)
    move_existing = False
    if existing_plr:
        move_existing = ask_yes_no("Duplicate row",
                                   "Track already in playlist. "
                                   "Move to new location?",
                                   default_yes=True)
    if move_existing:
        # Reuse the existing row, keeping any note we were given
        if note:
            existing_plr.append_note(note)
        return self._move_row(session, existing_plr, row_number)

    # Create a fresh playlist row for the track
    new_plr = PlaylistRows(session, self.playlist_id, track.id,
                           row_number, note)
    self.insert_row(session, new_plr)
    self.save_playlist(session)
    self._update_start_end_times(session)
def lookup_row_in_songfacts(self) -> None:
"""
If there is a selected row and it is a track row,
look up its title in songfacts.
If multiple rows are selected, only consider the first one.
Otherwise return.
"""
self._look_up_row(website="songfacts")
def lookup_row_in_wikipedia(self) -> None:
"""
If there is a selected row and it is a track row,
look up its title in wikipedia.
If multiple rows are selected, only consider the first one.
Otherwise return.
"""
self._look_up_row(website="wikipedia")
def play_ended(self) -> None:
"""
Called by musicmuster when play has ended.
current_track points to track that's just finished
"""
row_number = self._get_current_track_row_number()
if row_number is None:
return
self._set_row_colour_default(row_number)
self.clear_selection()
self._set_row_last_played_time(
row_number, self.musicmuster.current_track.start_time)
with Session() as session:
self._set_row_note_colour(session, row_number)
def play_started(self, session: scoped_session) -> None:
    """
    Notification from musicmuster that track has started playing.

    Actions required:
    - Mark current row as played
    - Set next track
    - Display track as current
    - Update start/stop times
    - Change OBS scene if needed
    - Update hidden tracks

    NOTE(review): leading indentation was lost in the copy of the file this
    was reviewed from; whether the print/stackprinter lines sit inside
    the PRODUCTION check is reconstructed - confirm against the
    repository.
    """

    current_row = self._get_current_track_row_number()
    if current_row is None:
        # The current track is not in this tab: report it and give up.
        # Note that os.environ["MM_ENV"] raises KeyError if MM_ENV is
        # not set.
        if os.environ["MM_ENV"] == "PRODUCTION":
            send_mail(Config.ERRORS_TO,
                      Config.ERRORS_FROM,
                      "playlists:play_started:current_row is None",
                      stackprinter.format()
                      )
        print("playlists:play_started:current_row is None")
        stackprinter.show(add_summary=True, style="darkbg")
        return

    # Mark current row as played
    self._set_played_row(session, current_row)

    # Set next track
    # The search starts on the row after the current one, so a row
    # number found here is never 0 and the truth test is safe.
    next_row = self._find_next_track_row(session, current_row + 1)
    if next_row:
        self.musicmuster.set_next_plr_id(self._get_row_plr_id(next_row),
                                         self)

    # Display row as current track
    self._set_row_colour_current(current_row)

    # Update start/stop times
    self._update_start_end_times(session)

    # Change OBS scene if needed
    self._obs_change_scene(current_row)

    # Update hidden tracks
    # Deferred by Config.HIDE_AFTER_PLAYING_OFFSET rather than done
    # immediately
    QTimer.singleShot(Config.HIDE_AFTER_PLAYING_OFFSET,
                      self.hide_or_show_played_tracks)
def populate_display(self, session: scoped_session, playlist_id: int,
                     scroll_to_top: bool = True) -> None:
    """
    Populate display from the associated playlist ID

    All existing rows are discarded and the table is rebuilt from the
    playlist's rows. If scroll_to_top is True the view is scrolled to
    the first row afterwards.

    NOTE(review): the rows are loaded using the playlist_id argument,
    while the played rows are looked up using self.playlist_id (see
    _get_played_rows()); callers presumably pass the same id - confirm.

    NOTE(review): leading indentation was lost in the copy of the file this
    was reviewed from; whether the print/stackprinter lines sit inside
    the PRODUCTION check is reconstructed - confirm against the
    repository.
    """

    # Sanity check row numbering before we load
    PlaylistRows.fixup_rownumbers(session, playlist_id)

    # Clear playlist
    self.setRowCount(0)

    # Get played tracks
    played_rows = self._get_played_rows(session)

    # Add the rows
    playlist = session.get(Playlists, playlist_id)
    if not playlist:
        # No such playlist: report it and give up. Note that
        # os.environ["MM_ENV"] raises KeyError if MM_ENV is not set.
        if os.environ["MM_ENV"] == "PRODUCTION":
            send_mail(Config.ERRORS_TO,
                      Config.ERRORS_FROM,
                      "playlists:populate_display:no playlist",
                      stackprinter.format()
                      )
        print("playlists:populate_display:no playlist")
        stackprinter.show(add_summary=True, style="darkbg")
        return

    for plr in playlist.rows:
        self.insert_row(session, plr, update_track_times=False,
                        played=plr.plr_rownum in played_rows)

    # Scroll to top
    if scroll_to_top:
        row0_item = self.item(0, 0)
        if row0_item:
            self.scrollToItem(row0_item,
                              QAbstractItemView.ScrollHint.PositionAtTop)

    # Set widths
    self._set_column_widths(session)

    self.save_playlist(session)
    # Queue up time calculations to take place after UI has
    # updated
    # NOTE(review): the call below is made directly rather than through
    # a timer; any queueing would have to happen inside
    # _update_start_end_times() - confirm.
    self._update_start_end_times(session)
    # It's possible that the current/next tracks are in this
    # playlist, so check and set.
    current_row = self._get_current_track_row_number()
    if current_row is not None:
        self._set_row_colour_current(current_row)
    next_row = self._get_next_track_row_number()
    if next_row is not None:
        self._set_row_colour_next(next_row)

    # Needed to wrap notes column correctly - add to event queue so
    # that it's processed after list is populated
    QTimer.singleShot(0, self.tab_visible)
def remove_rows(self, row_numbers: List[int]) -> None:
"""Remove passed rows from display"""
# Remove rows from display. Do so in reverse order so that
# row numbers remain valid.
for row in sorted(row_numbers, reverse=True):
self.removeRow(row)
def save_playlist(self, session: scoped_session) -> None:
    """
    Bring the database into line with the display.

    Every displayed row's PlaylistRows object is given the row number
    it has in the display and this tab's playlist_id. Database rows for
    this playlist numbered beyond the last displayed row are deleted,
    and the session is flushed.
    """

    row_total = self.rowCount()

    # Renumber (and re-home) the PlaylistRows behind each displayed row
    for row_number in range(row_total):
        plr = self._get_row_plr(session, row_number)
        if plr:
            plr.plr_rownum = row_number
            plr.playlist_id = self.playlist_id

    # Anything in the database numbered above the last displayed row
    # no longer exists in the display
    last_row = row_total - 1
    PlaylistRows.delete_higher_rows(session, self.playlist_id, last_row)

    # Get changes into db
    session.flush()
def scroll_current_to_top(self) -> None:
"""Scroll currently-playing row to top"""
current_row = self._get_current_track_row_number()
if current_row is not None:
self._scroll_to_top(current_row)
def scroll_next_to_top(self) -> None:
"""Scroll nextly-playing row to top"""
next_row = self._get_next_track_row_number()
if next_row is not None:
self._scroll_to_top(next_row)
def set_search(self, text: str) -> None:
"""Set search text and find first match"""
self.search_text = text
if not text:
# Search string has been reset
return
self._search(next=True)
def search_next(self) -> None:
"""
Select next row containg self.search_string.
"""
self._search(next=True)
def search_previous(self) -> None:
"""
Select previous row containg self.search_string.
"""
self._search(next=False)
def select_next_row(self) -> None:
"""
Select next or first row. Don't select section headers.
Wrap at last row.
"""
selected_rows = self._get_selected_rows()
# we will only handle zero or one selected rows
if len(selected_rows) > 1:
return
# select first row if none selected
if len(selected_rows) == 0:
row_number = 0
else:
row_number = selected_rows[0] + 1
if row_number >= self.rowCount():
row_number = 0
# Don't select section headers
wrapped = False
track_id = self._get_row_track_id(row_number)
while not track_id:
row_number += 1
if row_number >= self.rowCount():
if wrapped:
# we're already wrapped once, so there are no
# non-headers
return
row_number = 0
wrapped = True
track_id = self._get_row_track_id(row_number)
self.selectRow(row_number)
def select_previous_row(self) -> None:
"""
Select previous or last track. Don't select section headers.
Wrap at first row.
"""
selected_rows = self._get_selected_rows()
# we will only handle zero or one selected rows
if len(selected_rows) > 1:
return
# select last row if none selected
last_row = self.rowCount() - 1
if len(selected_rows) == 0:
row_number = last_row
else:
row_number = selected_rows[0] - 1
if row_number < 0:
row_number = last_row
# Don't select section headers
wrapped = False
track_id = self._get_row_track_id(row_number)
while not track_id:
row_number -= 1
if row_number < 0:
if wrapped:
# we're already wrapped once, so there are no
# non-notes
return
row_number = last_row
wrapped = True
track_id = self._get_row_track_id(row_number)
self.selectRow(row_number)
def tab_visible(self) -> None:
    """
    Called when tab becomes visible: fit the row heights, reset the
    width of the last column and apply hiding of played tracks.
    """

    # Set row heights
    self.resizeRowsToContents()

    last_column = len(columns) - 1
    self.setColumnWidth(last_column, 0)

    # Hide/show rows
    self.hide_or_show_played_tracks()
# # ########## Internally called functions ##########
def _add_track(self, row_number: int) -> None:
    """
    Turn the section header on row_number into a normal track row by
    attaching a track obtained from musicmuster.get_one_track().
    """

    with Session() as session:
        plr = self._get_row_plr(session, row_number)
        # No playlist row, or a row that already has a track: leave it
        if not plr or plr.track_id is not None:
            return

        track = self.musicmuster.get_one_track(session)
        if not track:
            return
        plr.track_id = track.id

        # A track row is not spanned across the columns
        self.setSpan(row_number, HEADER_NOTES_COLUMN, 1, 1)

        # Make the row look like any other track row
        self._update_row_track_info(session, row_number, track)
        self._set_row_bold(row_number)
        self._set_row_colour_default(row_number)
        self._set_row_note_text(session, row_number, plr.note)

        self.clear_selection()
        self.save_playlist(session)

        # Update times once display updated
        self._update_start_end_times(session)
def _build_context_menu(self, item: QTableWidgetItem) -> None:
"""Used to process context (right-click) menu, which is defined here"""
self.menu.clear()
row_number = item.row()
track_id = self._get_row_track_id(row_number)
track_row = bool(track_id)
header_row = not track_row
current = row_number == self._get_current_track_row_number()
next_row = row_number == self._get_next_track_row_number()
# Play with mplayer
if track_row and not current:
self._add_context_menu("Play with mplayer",
lambda: self._mplayer_play(row_number))
# Paste
self._add_context_menu("Paste",
lambda: self.musicmuster.paste_rows(),
self.musicmuster.selected_plrs is None)
# Open in Audacity
if track_row and not current:
self._add_context_menu("Open in Audacity",
lambda: self._open_in_audacity(row_number)
)
# Rescan
if track_row and not current:
self._add_context_menu(
"Rescan track", lambda: self._rescan(row_number, track_id))
# ----------------------
self.menu.addSeparator()
# Remove row
if not current and not next_row:
self._add_context_menu('Delete row', self._delete_rows)
# Move to playlist
if not current and not next_row:
self._add_context_menu('Move to playlist...',
self.musicmuster.move_selected)
# ----------------------
self.menu.addSeparator()
# Remove track from row
if track_row and not current and not next_row:
self._add_context_menu('Remove track from row',
lambda: self._remove_track(row_number))
# Add track to section header (ie, make this a track row)
if header_row:
self._add_context_menu('Add a track',
lambda: self._add_track(row_number))
# Mark unplayed
if self._get_row_userdata(row_number, self.PLAYED):
self._add_context_menu("Mark unplayed", self._mark_unplayed)
# Unmark as next
if next_row:
self._add_context_menu("Unmark as next track",
self.clear_next)
# ----------------------
self.menu.addSeparator()
# Info
if track_row:
self._add_context_menu('Info',
lambda: self._info_row(track_id))
# Track path
if track_row:
self._add_context_menu("Copy track path",
lambda: self._copy_path(row_number))
# return super(PlaylistTab, self).eventFilter(source, event)
def _calculate_end_time(self, start: Optional[datetime],
duration: int) -> Optional[datetime]:
"""Return datetime 'duration' ms after 'start'"""
if start is None:
return None
return start + timedelta(milliseconds=duration)
def _column_resize(self, idx: int, _old: int, _new: int) -> None:
    """
    Called when a column width changes. The arguments supplied by the
    signal are not used: the width of every column except the last is
    compared with the stored setting, which is updated if it differs.
    """

    # The last column stretches, so its width is never stored
    stretch_column = len(columns) - 1

    with Session() as session:
        for column_name, column in columns.items():
            if column.idx == stretch_column:
                continue
            width = self.columnWidth(column.idx)
            record = Settings.get_int_settings(
                session, f"playlist_{column_name}_col_width")
            if record.f_int != width:
                record.update(session, {'f_int': width})
def _context_menu(self, pos):
"""Display right-click menu"""
item = self.itemAt(pos)
self._build_context_menu(item)
self.menu.exec(self.mapToGlobal(pos))
def _copy_path(self, row_number: int) -> None:
"""
If passed row_number has a track, copy the track path, single-quoted,
to the clipboard. Otherwise, return None.
"""
track_path = self._get_row_track_path(row_number)
if not track_path:
return
replacements = [
("'", "\\'"),
(" ", "\\ "),
("(", "\\("),
(")", "\\)"),
]
for old, new in replacements:
track_path = track_path.replace(old, new)
cb = QApplication.clipboard()
cb.clear(mode=cb.Mode.Clipboard)
cb.setText(track_path, mode=cb.Mode.Clipboard)
def _delete_rows(self) -> None:
    """
    Delete the selected rows, after asking for confirmation.

    The rows are deleted from the database and removed from the
    display; the playlist is then saved and the track start/end times
    recalculated.
    """

    with Session() as session:
        plrs = self.get_selected_playlistrows(session)
        if not plrs:
            return

        # Get confirmation
        row_count = len(plrs)
        suffix = 's' if row_count > 1 else ''
        confirmed = ask_yes_no("Delete rows",
                               f"Really delete {row_count} row{suffix}?")
        if not confirmed:
            return

        doomed_rows = [plr.plr_rownum for plr in plrs]

        # Delete rows from database. Would be more efficient to
        # query then have a single delete.
        for plr in plrs:
            session.delete(plr)

        # Remove from display
        self.remove_rows(doomed_rows)

        # Saving renumbers the remaining PlaylistRows to match the
        # display
        self.save_playlist(session)

        # Reset drag mode
        self.setDragEnabled(False)

        self._update_start_end_times(session)
def _find_next_track_row(self, session: scoped_session,
                         starting_row: Optional[int] = None) \
        -> Optional[int]:
    """
    Return the row number of the next track to play, or None if there
    isn't one.

    The search runs from starting_row (or from the top when it is None)
    to the end of the display. Rows without a track, rows already
    played and rows whose file is unreadable are passed over.
    """

    first_row = 0 if starting_row is None else starting_row

    track_rows = [
        p.plr_rownum for p in PlaylistRows.get_rows_with_tracks(
            session, self.playlist_id)
    ]
    played_rows = [
        p.plr_rownum for p in PlaylistRows.get_played_rows(
            session, self.playlist_id)
    ]

    for row_number in range(first_row, self.rowCount()):
        unplayed_track = (row_number in track_rows
                          and row_number not in played_rows)
        if not unplayed_track:
            continue
        plr = self._get_row_plr(session, row_number)
        if plr and not file_is_unreadable(plr.track.path):
            return row_number

    return None
def _get_current_track_row_number(self) -> Optional[int]:
"""Return current track row or None"""
current_track = self.musicmuster.current_track
if not current_track or not current_track.plr_id:
return None
return self._plrid_to_row_number(current_track.plr_id)
def _get_next_track_row_number(self) -> Optional[int]:
"""Return next track row or None"""
next_track = self.musicmuster.next_track
if not next_track or not next_track.plr_id:
return None
return self._plrid_to_row_number(next_track.plr_id)
@staticmethod
def _get_note_text_time(text: str) -> Optional[datetime]:
    """
    Return the first time written as @hh:mm:ss in 'text', parsed with
    Config.NOTE_TIME_FORMAT. None is returned when 'text' is not
    searchable, holds no such time, or the time does not parse.
    """

    try:
        found = start_time_re.search(text)
    except TypeError:
        # text was not a string (eg, None)
        found = None
    if not found:
        return None

    # Drop the leading '@' before parsing
    time_text = found.group(0)[1:]
    try:
        return datetime.strptime(time_text, Config.NOTE_TIME_FORMAT)
    except ValueError:
        return None
def _get_played_rows(self, session: scoped_session) -> List[int]:
    """
    Return the row numbers of this playlist's played rows, leaving
    out any record that has no row number
    """

    played: List[int] = []
    for plr in PlaylistRows.get_played_rows(session, self.playlist_id):
        if plr.plr_rownum is not None:
            played.append(plr.plr_rownum)
    return played
def _get_row_artist(self, row_number: int) -> str:
    """Return artist text on this row_number or null string if no item"""

    artist_item = self.item(row_number, ARTIST)
    return artist_item.text() if artist_item else ""
def _get_row_duration(self, row_number: int) -> int:
"""Return duration associated with this row_number"""
duration_udata = self._get_row_userdata(row_number, self.ROW_DURATION)
if not duration_udata:
return 0
else:
return int(duration_udata)
def _get_row_note(self, row_number: int) -> str:
    """
    Return note text on this row_number or null string if none.

    Rows with a track keep the note in the ROW_NOTES column; rows
    without one keep it in HEADER_NOTES_COLUMN.
    """

    has_track = bool(self._get_row_track_id(row_number))
    note_column = ROW_NOTES if has_track else HEADER_NOTES_COLUMN
    note_item = self.item(row_number, note_column)
    return note_item.text() if note_item else ""
def _get_row_path(self, row_number: int) -> str:
"""
Return path of track associated with this row_number or null string
"""
path = str(self._get_row_userdata(row_number, self.TRACK_PATH))
if not path:
return ""
return path
def _get_row_plr(self, session: scoped_session,
                 row_number: int) -> Optional[PlaylistRows]:
    """
    Look up the PlaylistRows object whose id is stored on this
    row_number and return the result of session.get() for it
    """

    plr_id = self._get_row_plr_id(row_number)
    return session.get(PlaylistRows, plr_id)
def _get_row_plr_id(self, row_number: int) -> int:
"""Return the plr_id associated with this row_number or 0"""
plr_id = self._get_row_userdata(row_number, self.PLAYLISTROW_ID)
if not plr_id:
return 0
else:
return int(plr_id)
def _get_row_title(self, row_number: int) -> Optional[str]:
    """
    Return title text on this row_number, or None if the row has no
    track or no title item
    """

    # Header rows may hold note text in the TITLE column, so only
    # rows with a track_id are treated as having a title
    if self._get_row_track_id(row_number):
        title_item = self.item(row_number, TITLE)
        if title_item:
            return title_item.text()
    return None
def _get_row_track(self, session: scoped_session,
                   row_number: int) -> Optional[Tracks]:
    """
    Return the Tracks object for the track_id stored on this
    row_number, or None if the row has no track_id
    """

    track_id = self._get_row_track_id(row_number)
    return session.get(Tracks, track_id) if track_id else None
def _get_row_track_id(self, row_number: int) -> int:
"""Return the track_id associated with this row_number or None"""
track_id = self._get_row_userdata(row_number, self.ROW_TRACK_ID)
if not track_id:
return 0
else:
return int(track_id)
def _get_row_track_path(self, row_number: int) -> str:
"""Return the track path associated with this row_number or '' """
path = self._get_row_userdata(row_number, self.TRACK_PATH)
if not path:
return ""
else:
return str(path)
def _get_row_userdata(self, row_number: int,
                      role: int) -> Optional[Union[str, int]]:
    """
    Return the data stored under role on this row's USERDATA item,
    or None if the row has no such item
    """

    holder = self.item(row_number, USERDATA)
    return holder.data(role) if holder else None
def _get_section_timing_string(self, ms: int,
                               no_end: bool = False) -> str:
    """
    Return a bracketed string describing a section duration of ms.

    When no_end is set, the string also says that the time runs to
    the end of the playlist.
    """

    suffix = " (to end of playlist)" if no_end else ""
    return f" [{ms_to_mmss(ms)}{suffix}]"
def _get_selected_row(self) -> Optional[int]:
"""
Return row_number number of first selected row,
or None if none selected
"""
if not self.selectionModel().hasSelection():
return None
else:
return self.selectionModel().selectedRows()[0].row()
def _get_selected_rows(self) -> List[int]:
"""Return a list of selected row numbers sorted by row"""
# Use a set to deduplicate result (a selected row will have all
# items in that row selected)
return sorted(
[row_number for row_number in
set([a.row() for a in self.selectedItems()])]
)
def _info_row(self, track_id: int) -> None:
    """
    Display popup with info about the track with the passed track_id.

    The popup lists title, artist, id, duration, bitrate, fade and
    silence points and path. If the track cannot be found, the popup
    says so instead.
    """

    # Build the text while the session is open; the dialog itself
    # needs no database access
    with Session() as session:
        track = session.get(Tracks, track_id)
        if track:
            txt = (
                f"Title: {track.title}\n"
                f"Artist: {track.artist}\n"
                f"Track ID: {track.id}\n"
                f"Track duration: {ms_to_mmss(track.duration)}\n"
                f"Track bitrate: {track.bitrate}\n"
                f"Track fade at: {ms_to_mmss(track.fade_at)}\n"
                f"Track silence at: {ms_to_mmss(track.silence_at)}"
                "\n\n"
                f"Path: {track.path}\n"
            )
        else:
            txt = f"Can't find {track_id=}"

    info: QMessageBox = QMessageBox(self)
    info.setIcon(QMessageBox.Icon.Information)
    info.setText(txt)
    info.setStandardButtons(QMessageBox.StandardButton.Ok)
    # Ok is the only button on this dialog, so it is the only button
    # that can be the default
    info.setDefaultButton(QMessageBox.StandardButton.Ok)
    info.exec()
def _look_up_row(self, website: str) -> None:
"""
If there is a selected row and it is a track row,
look up its title in the passed website
If multiple rows are selected, only consider the first one.
Otherwise return.
"""
selected_row = self._get_selected_row()
if not selected_row:
return
if not self._get_row_track_id(selected_row):
return
title = self._get_row_title(selected_row)
if website == "wikipedia":
QTimer.singleShot(
0,
lambda: self.musicmuster.tabInfolist.open_in_wikipedia(title)
)
elif website == "songfacts":
QTimer.singleShot(
0,
lambda: self.musicmuster.tabInfolist.open_in_songfacts(title)
)
else:
return
def _mark_unplayed(self) -> None:
    """
    Mark every selected row as unplayed in this playlist.

    Each row has its PLAYED userdata cleared and is shown bold; where
    the row has a PlaylistRows record, that is marked unplayed too.
    Start/end times are then recalculated, the selection cleared and
    played-track visibility refreshed.
    """

    with Session() as session:
        for selected in self._get_selected_rows():
            self._set_row_userdata(selected, self.PLAYED, False)
            self._set_row_bold(selected, True)

            plr = self._get_row_plr(session, selected)
            if plr:
                plr.played = False

        self._update_start_end_times(session)
        self.clear_selection()
        self.hide_or_show_played_tracks()
def _move_row(self, session: scoped_session, plr: PlaylistRows,
              new_row_number: int) -> None:
    """
    Move the row belonging to plr so that it sits at new_row_number.

    The existing table row is removed and the plr re-inserted at its
    new position, after which the playlist is saved, played-track
    visibility refreshed and start/end times recalculated. Does
    nothing if plr has no row number.
    """

    old_row_number = plr.plr_rownum
    if old_row_number is None:
        return

    # Take the row out of the table
    self.removeRow(old_row_number)

    # A row that came from above the destination lands one row
    # higher than asked, the source row having just been removed
    plr.plr_rownum = (
        new_row_number - 1 if old_row_number < new_row_number
        else new_row_number
    )

    self.insert_row(session, plr)
    self.save_playlist(session)
    self.hide_or_show_played_tracks()
    self._update_start_end_times(session)
def _mplayer_play(self, row_number: int) -> None:
    """
    Play the track on row_number with gmplayer, run from a separate
    thread. Log an error if the row has no track path.
    """

    track_path = self._get_row_track_path(row_number)
    if track_path:
        command = ['gmplayer', '-vc', 'null', '-vo', 'null', track_path]
        player_thread = threading.Thread(
            target=self._run_subprocess, args=(command,))
        player_thread.start()
        return

    log.error(
        f"{self.playlist_id=} playlists._mplayer_play({row_number=}): "
        "track_path not set"
    )
def _obs_change_scene(self, current_row: int) -> None:
    """
    Change OBS scene if a note asks for it.

    The note on current_row is examined first, then the notes of the
    rows directly above it for as long as those rows have no track.
    The first note holding a SetScene=[name] command with a non-empty
    name triggers one attempt to switch OBS to that scene, and the
    search ends there whether or not the attempt worked. Failures are
    logged.
    """

    row = current_row
    while True:
        note_text = self._get_row_note(row)
        found = scene_change_re.search(note_text) if note_text else None
        scene_name = found.group(1) if found else None

        if scene_name:
            try:
                client = obs.ReqClient(host=Config.OBS_HOST,
                                       port=Config.OBS_PORT,
                                       password=Config.OBS_PASSWORD)
            except ConnectionRefusedError:
                log.error(f"OBS connection refused")
                return

            try:
                client.set_current_program_scene(scene_name)
                log.info(f"OBS scene changed to '{scene_name}'")
            except obs.error.OBSSDKError as e:
                log.error(f"OBS SDK error ({e})")
            return

        # Above the starting row only header rows are considered:
        # stop at the top of the table or at the first track row
        row -= 1
        if row < 0 or self._get_row_track_id(row):
            break
def _open_in_audacity(self, row_number: int) -> None:
    """
    Open the track on row_number in Audacity, which must already be
    running. Log an error if the row has no track path.
    """

    track_path = self._get_row_track_path(row_number)
    if track_path:
        open_in_audacity(track_path)
        return

    log.error(
        f"{self.playlist_id=} "
        f"playlists._open_in_audactity({row_number=}): "
        "track_path not set"
    )
def _plrid_to_row_number(self, plrid: int) -> Optional[int]:
"""
Return row number of passed plrid, or None if not found
"""
for row_number in range(self.rowCount()):
if self._get_row_plr_id(row_number) == plrid:
return row_number
return None
def _remove_track(self, row_number: int) -> None:
    """
    Remove track from row, making it a section header.

    The user is asked to confirm first. Nothing happens if they
    decline or if the row has no PlaylistRows record. Otherwise the
    record loses its track_id (and gains a placeholder note if it had
    none), the row's track columns and track userdata are cleared,
    the row is re-displayed as a header, and the playlist is saved
    and re-timed.
    """
    # Get confirmation
    if not ask_yes_no("Remove music",
                      "Really remove the music track from this row?"):
        return
    # Update playlist_rows record
    with Session() as session:
        plr = self._get_row_plr(session, row_number)
        if not plr:
            return
        plr.track_id = None
        # We can't have null text
        if not plr.note:
            plr.note = Config.TEXT_NO_TRACK_NO_NOTE
        session.flush()
        # Clear track text items (every column from index 2 onwards)
        for i in range(2, len(columns)):
            _ = self._set_item_text(row_number, i, "")
        # Remove row duration
        self._set_row_duration(row_number, 0)
        # Remove row start gap
        self._set_row_start_gap(row_number, None)
        # Remove track_id from row. This has to happen before the
        # header text is set below, which refuses to act on a row
        # that still has a track_id
        _ = self._set_row_userdata(row_number, self.ROW_TRACK_ID, 0)
        # Span the rows
        self.setSpan(row_number, HEADER_NOTES_COLUMN, 1, len(columns) - 1)
        # Set note text in correct column for section head
        self._set_row_header_text(session, row_number, plr.note)
        self.clear_selection()
        # Save playlist to ensure correct detection of new header
        # row
        self.save_playlist(session)
        # Set track start/end times after track list is populated
        self._update_start_end_times(session)
def _rescan(self, row_number: int, track_id: int) -> None:
    """
    Rescan the track with track_id shown on row_number.

    If the track exists and its file is readable, its metadata is
    re-read and the row refreshed; if the file is unreadable the row
    is only coloured as unreadable. If the track cannot be found, the
    row loses its track_id, a "not found" message is appended to its
    text, the problem is logged and the row coloured as unreadable.

    Start/end times are recalculated and the selection cleared in
    every case.
    """

    with Session() as session:
        track = session.get(Tracks, track_id)
        if track:
            if file_is_unreadable(track.path):
                self._set_row_colour_unreadable(row_number)
            else:
                self._set_row_colour_default(row_number)
                set_track_metadata(session, track)
                self._update_row_track_info(session, row_number, track)
        else:
            # Clear the track_id first: the header text setter below
            # refuses to act on a row that still has one
            _ = self._set_row_track_id(row_number, 0)
            # _get_row_note() returns a string (possibly empty), so
            # the message can always be appended
            note_text = self._get_row_note(row_number) or ""
            note_text += f"{track_id=} not found"
            self._set_row_header_text(session, row_number, note_text)
            log.error(
                f"playlists._rescan({track_id=}): "
                "Track not found"
            )
            self._set_row_colour_unreadable(row_number)

        self._update_start_end_times(session)
        self.clear_selection()
def _run_subprocess(self, args):
    """
    Run the command given by args with subprocess.call(), discarding
    its return value
    """

    subprocess.call(args)
def _scroll_to_top(self, row_number: int) -> None:
    """
    Scroll so that row_number sits Config.SCROLL_TOP_MARGIN visible
    rows below the top of the view.

    Rows numbered at or below the margin are scrolled to directly.
    Does nothing if row_number is None.
    """

    if row_number is None:
        return

    rows_still_wanted = Config.SCROLL_TOP_MARGIN
    top_row = row_number

    if row_number > Config.SCROLL_TOP_MARGIN:
        # Hidden rows can't be scrolled to, so walk upwards from the
        # row above row_number counting only visible rows, until the
        # margin is filled or row 0 has been passed
        for candidate in reversed(range(row_number)):
            if self.isRowHidden(candidate):
                continue
            if rows_still_wanted == 0:
                break
            top_row = candidate
            rows_still_wanted -= 1

    self.scrollToItem(self.item(top_row, 0),
                      QAbstractItemView.ScrollHint.PositionAtTop)
def _search(self, next: bool = True) -> None:
"""
Select next/previous row containg self.search_string. Start from
top selected row if there is one, else from top.
Wrap at last/first row.
"""
if not self.search_text:
return
selected_row = self._get_selected_row()
if next:
if selected_row is not None and selected_row < self.rowCount() - 1:
starting_row = selected_row + 1
else:
starting_row = 0
else:
if selected_row is not None and selected_row > 0:
starting_row = selected_row - 1
else:
starting_row = self.rowCount() - 1
wrapped = False
match_row = None
row_number = starting_row
needle = self.search_text.lower()
while True:
# Check for match in title, artist or notes
title = self._get_row_title(row_number)
if title and needle in title.lower():
match_row = row_number
break
artist = self._get_row_artist(row_number)
if artist and needle in artist.lower():
match_row = row_number
break
note = self._get_row_note(row_number)
if note and needle in note.lower():
match_row = row_number
break
if next:
row_number += 1
if wrapped and row_number >= starting_row:
break
if row_number >= self.rowCount():
row_number = 0
wrapped = True
else:
row_number -= 1
if wrapped and row_number <= starting_row:
break
if row_number < 0:
row_number = self.rowCount() - 1
wrapped = True
if match_row is not None:
self.selectRow(row_number)
def _select_event(self) -> None:
"""
Called when item selection changes.
If multiple rows are selected, display sum of durations in status bar.
"""
# If we are in the process of selecting multiple tracks, no-op here
if self.selecting_in_progress:
return
selected_rows = self._get_selected_rows()
# If no rows are selected, we have nothing to do
if len(selected_rows) == 0:
self.musicmuster.lblSumPlaytime.setText("")
return
ms = 0
for row_number in selected_rows:
ms += self._get_row_duration(row_number)
if ms > 0:
self.musicmuster.lblSumPlaytime.setText(
f"Selected duration: {ms_to_mmss(ms)}")
else:
self.musicmuster.lblSumPlaytime.setText("")
def _set_cell_colour(self, row_number: int, column: int,
                     colour: Optional[str] = None) -> None:
    """
    Set the background of one cell to colour, or reset it to a
    default QBrush when colour is None. Cells without an item are
    left alone.
    """

    brush = QBrush() if colour is None else QBrush(QColor(colour))
    cell = self.item(row_number, column)
    if cell:
        cell.setBackground(brush)
def _set_column_widths(self, session: scoped_session) -> None:
    """
    Set column widths from settings.

    Each column's width comes from the integer setting named
    playlist_<column name>_col_width; Config.DEFAULT_COLUMN_WIDTH is
    used when that setting is missing, unset or negative. The last
    column is given width zero as it's set to stretch.
    """

    last_idx = len(columns) - 1
    for column_name, data in columns.items():
        idx = data.idx
        if idx == last_idx:
            # Set width of last column to zero as it's set to stretch
            self.setColumnWidth(idx, 0)
            continue

        attr_name = f"playlist_{column_name}_col_width"
        record: Settings = Settings.get_int_settings(session, attr_name)
        # Guard against a setting whose f_int is None: None can't be
        # compared with 0
        if record and record.f_int is not None and record.f_int >= 0:
            self.setColumnWidth(idx, record.f_int)
        else:
            self.setColumnWidth(idx, Config.DEFAULT_COLUMN_WIDTH)
def _set_item_text(self, row_number: int, column: int,
                   text: Optional[str]) -> QTableWidgetItem:
    """
    Put text (null string if text is empty or None) in the cell at
    row_number/column, creating the item if the cell has none, and
    return the item
    """

    cell_text = text if text else ""

    cell = self.item(row_number, column)
    if cell:
        cell.setText(cell_text)
        return cell

    cell = QTableWidgetItem(cell_text)
    self.setItem(row_number, column, cell)
    return cell
def _reset_next(self, old_plrid: int, new_plrid: int) -> None:
    """
    Called when set_next_track_signal signal received.

    - If old_plrid is a row of this playlist, that row goes back to
      the default colour.
    - If new_plrid is a row of this playlist, that row gets the next
      track colour. If new_plrid is given but no such row exists, an
      error is logged and nothing further is done.
    - Start/end times are then recalculated and the selection cleared.
    """

    with Session() as session:
        # Unmark the previous next track
        old_plr = session.get(PlaylistRows, old_plrid) if old_plrid else None
        if old_plr and old_plr.playlist_id == self.playlist_id:
            self._set_row_colour_default(old_plr.plr_rownum)

        # Mark the new next track
        if new_plrid:
            new_plr = session.get(PlaylistRows, new_plrid)
            if not new_plr:
                log.error(f"_reset_next({new_plrid=}): plr not found")
                return
            if new_plr.playlist_id == self.playlist_id:
                self._set_row_colour_next(new_plr.plr_rownum)

        self._update_start_end_times(session)
        self.clear_selection()
def _set_played_row(self, session: scoped_session,
                    row_number: int) -> None:
    """
    Mark this row as played: set its PLAYED userdata, show it
    non-bold and, if it has a PlaylistRows record, flag that as
    played and flush the session
    """

    self._set_row_userdata(row_number, self.PLAYED, True)
    self._set_row_bold(row_number, False)

    plr = self._get_row_plr(session, row_number)
    if plr:
        plr.played = True
        session.flush()
def _set_row_artist(self, row_number: int,
                    artist: Optional[str]) -> QTableWidgetItem:
    """
    Show artist (null string if empty or None) in the ARTIST column
    of row_number and return the QTableWidgetItem
    """

    return self._set_item_text(row_number, ARTIST, artist or "")
def _set_row_bitrate(self, row_number: int,
                     bitrate: Optional[int]) -> QTableWidgetItem:
    """
    Show bitrate in the BITRATE column of row_number, colour the cell
    according to the Config bitrate thresholds and return the item.

    A missing or zero bitrate is shown as blank and coloured as low.
    """

    if bitrate:
        shown_text = str(bitrate)
        rated_as = bitrate
    else:
        shown_text = ""
        # No bitrate: rate it just under the low threshold
        rated_as = Config.BITRATE_LOW_THRESHOLD - 1

    cell = self._set_item_text(row_number, BITRATE, shown_text)

    if rated_as < Config.BITRATE_LOW_THRESHOLD:
        cell_colour = Config.COLOUR_BITRATE_LOW
    elif rated_as < Config.BITRATE_OK_THRESHOLD:
        cell_colour = Config.COLOUR_BITRATE_MEDIUM
    else:
        cell_colour = Config.COLOUR_BITRATE_OK

    cell.setBackground(QBrush(QColor(cell_colour)))
    return cell
def _set_row_bold(self, row_number: int, bold: bool = True) -> None:
    """
    Give every existing item on row_number, other than the one in the
    ROW_NOTES column, a font that is bold (bold=True) or not bold
    """

    font = QFont()
    font.setBold(bold)

    for column in range(self.columnCount()):
        if column != ROW_NOTES:
            cell = self.item(row_number, column)
            if cell:
                cell.setFont(font)
def _set_row_colour(self, row_number: int,
                    colour: Optional[str] = None) -> None:
    """
    Set the background of row_number to colour, or reset it to a
    default QBrush when colour is None.

    Column 0 and the START_GAP and BITRATE columns are not touched.
    """

    brush = QBrush() if colour is None else QBrush(QColor(colour))
    untouched_columns = (START_GAP, BITRATE)

    for column in range(1, self.columnCount()):
        if column in untouched_columns:
            continue
        cell = self.item(row_number, column)
        if cell:
            cell.setBackground(brush)
def _set_row_colour_current(self, row_number: int) -> None:
    """Colour row_number with Config.COLOUR_CURRENT_PLAYLIST"""

    current_colour = Config.COLOUR_CURRENT_PLAYLIST
    self._set_row_colour(row_number, current_colour)
def _set_row_colour_default(self, row_number: int) -> None:
"""
Set default row colour
"""
self._set_row_colour(row_number, None)
def _set_row_colour_next(self, row_number: int) -> None:
    """Colour row_number with Config.COLOUR_NEXT_PLAYLIST"""

    next_colour = Config.COLOUR_NEXT_PLAYLIST
    self._set_row_colour(row_number, next_colour)
def _set_row_colour_unreadable(self, row_number: int) -> None:
    """Colour row_number with Config.COLOUR_UNREADABLE"""

    unreadable_colour = Config.COLOUR_UNREADABLE
    self._set_row_colour(row_number, unreadable_colour)
def _set_row_duration(self, row_number: int,
                      ms: Optional[int]) -> QTableWidgetItem:
    """
    Show ms, formatted by ms_to_mmss(), in the DURATION column of
    row_number and store the raw value in the row's userdata.
    Return the DURATION item.
    """

    shown_text = ms_to_mmss(ms)
    cell = self._set_item_text(row_number, DURATION, shown_text)
    self._set_row_userdata(row_number, self.ROW_DURATION, ms)
    return cell
def _set_row_end_time(self, row_number: int,
                      time: Optional[datetime]) -> QTableWidgetItem:
    """
    Show time, formatted with Config.TRACK_TIME_FORMAT, in the
    END_TIME column of row_number and return the item. The cell is
    blanked if time is empty or cannot be formatted.
    """

    time_str = ""
    if time:
        try:
            time_str = time.strftime(Config.TRACK_TIME_FORMAT)
        except AttributeError:
            # Leave the cell blank
            pass

    return self._set_item_text(row_number, END_TIME, time_str)
def _set_row_header_text(self, session: scoped_session,
                         row_number: int, text: str) -> None:
    """
    Set header text and row colour.

    The text goes in HEADER_NOTES_COLUMN and the row is given the
    colour NoteColours finds for the text, or
    Config.COLOUR_NOTES_PLAYLIST if it finds none.

    If row_number holds a track, nothing is set: the mistake is
    reported (by mail when MM_ENV is PRODUCTION, and always on stdout
    with a stack trace) and the call returns.
    """

    # Sanity check: this should be a header row and thus not have a
    # track associated
    if self._get_row_track_id(row_number):
        # Use .get() so that an unset MM_ENV can't raise KeyError and
        # mask the report below
        if os.environ.get("MM_ENV") == "PRODUCTION":
            send_mail(
                Config.ERRORS_TO,
                Config.ERRORS_FROM,
                "playlists:_set_row_header_text() called on track row",
                stackprinter.format()
            )
        print("playists:_set_row_header_text() called on track row")
        stackprinter.show(add_summary=True, style="darkbg")
        return

    # Set text
    _ = self._set_item_text(row_number, HEADER_NOTES_COLUMN, text)

    # Set colour
    note_colour = NoteColours.get_colour(session, text)
    if not note_colour:
        note_colour = Config.COLOUR_NOTES_PLAYLIST
    self._set_row_colour(row_number, note_colour)
def _set_row_last_played_time(
        self, row_number: int,
        last_played: Optional[datetime]) -> QTableWidgetItem:
    """
    Show last_played, as rendered by get_relative_date(), in the
    LASTPLAYED column of row_number and return the item
    """

    shown_text = get_relative_date(last_played)
    return self._set_item_text(row_number, LASTPLAYED, shown_text)
def _set_row_note_colour(self, session: scoped_session,
                         row_number: int) -> None:
    """
    Set row note colour.

    The ROW_NOTES cell of row_number is given the colour NoteColours
    finds for the row's note text.

    If row_number holds no track, nothing is set: the mistake is
    reported (by mail when MM_ENV is PRODUCTION, and always on stdout
    with a stack trace) and the call returns.
    """

    # Sanity check: this should be a track row and thus have a
    # track associated
    if not self._get_row_track_id(row_number):
        # Use .get() so that an unset MM_ENV can't raise KeyError and
        # mask the report below
        if os.environ.get("MM_ENV") == "PRODUCTION":
            send_mail(Config.ERRORS_TO,
                      Config.ERRORS_FROM,
                      "playlists:_set_row_note_colour() on header row",
                      stackprinter.format()
                      )
        print("playists:_set_row_note_colour() called on header row")
        stackprinter.show(add_summary=True, style="darkbg")
        return

    # Set colour
    note_text = self._get_row_note(row_number)
    note_colour = NoteColours.get_colour(session, note_text)
    self._set_cell_colour(row_number, ROW_NOTES, note_colour)
def _set_row_note_text(self, session: scoped_session,
                       row_number: int, text: str) -> None:
    """
    Set row note text and note colour.

    The text goes in the ROW_NOTES column of row_number, after which
    the note cell is coloured to suit it.

    If row_number holds no track, nothing is set: the mistake is
    reported (by mail when MM_ENV is PRODUCTION, and always on stdout
    with a stack trace) and the call returns.
    """

    # Sanity check: this should be a track row and thus have a
    # track associated
    if not self._get_row_track_id(row_number):
        # Use .get() so that an unset MM_ENV can't raise KeyError and
        # mask the report below
        if os.environ.get("MM_ENV") == "PRODUCTION":
            send_mail(
                Config.ERRORS_TO,
                Config.ERRORS_FROM,
                "playlists:_set_row_note_text() called on header row",
                stackprinter.format()
            )
        print("playists:_set_row_note_text() called on header row")
        stackprinter.show(add_summary=True, style="darkbg")
        return

    # Set text
    _ = self._set_item_text(row_number, ROW_NOTES, text)

    # Set colour
    self._set_row_note_colour(session, row_number)
def _set_row_plr_id(self, row_number: int,
                    plr_id: int) -> QTableWidgetItem:
    """
    Store plr_id as the PlaylistRows id in this row's userdata and
    return the userdata item
    """

    role = self.PLAYLISTROW_ID
    return self._set_row_userdata(row_number, role, plr_id)
def _set_row_start_gap(self, row_number: int,
                       start_gap: Optional[int]) -> QTableWidgetItem:
    """
    Show start_gap (0 if empty or None) in the START_GAP column of
    row_number and return the QTableWidgetItem.

    The cell is given the Config.COLOUR_LONG_START background when
    the gap is 500 or more, and a default background otherwise.
    """

    gap = start_gap or 0
    cell = self._set_item_text(row_number, START_GAP, str(gap))

    is_long_gap = gap >= 500
    cell.setBackground(
        QBrush(QColor(Config.COLOUR_LONG_START)) if is_long_gap
        else QBrush()
    )
    return cell
def _set_row_start_time(self, row_number: int,
                        time: Optional[datetime]) -> QTableWidgetItem:
    """
    Show time, formatted with Config.TRACK_TIME_FORMAT, in the
    START_TIME column of row_number and return the item. The cell is
    blanked if time is empty or cannot be formatted.
    """

    time_str = ""
    if time:
        try:
            time_str = time.strftime(Config.TRACK_TIME_FORMAT)
        except AttributeError:
            # Leave the cell blank
            pass

    return self._set_item_text(row_number, START_TIME, time_str)
def _set_row_times(self, row_number: int, start: datetime,
duration: int) -> Optional[datetime]:
"""
Set row start and end times, return end time
"""
self._set_row_start_time(row_number, start)
end_time = self._calculate_end_time(start, duration)
self._set_row_end_time(row_number, end_time)
return end_time
def _set_row_title(self, row_number: int,
                   title: Optional[str]) -> QTableWidgetItem:
    """
    Show title (null string if empty or None) in the TITLE column of
    row_number and return the QTableWidgetItem
    """

    return self._set_item_text(row_number, TITLE, title or "")
def _set_row_track_id(self, row_number: int,
                      track_id: int) -> QTableWidgetItem:
    """
    Store track_id in this row's userdata and return the userdata
    item
    """

    role = self.ROW_TRACK_ID
    return self._set_row_userdata(row_number, role, track_id)
def _set_row_track_path(self, row_number: int,
                        path: str) -> QTableWidgetItem:
    """
    Store the track path in this row's userdata and return the
    userdata item
    """

    role = self.TRACK_PATH
    return self._set_row_userdata(row_number, role, path)
def _set_row_userdata(self, row_number: int, role: int,
                      value: Optional[Union[str, int]]) \
        -> QTableWidgetItem:
    """
    Store value under role on the USERDATA item of row_number,
    creating the item if the row has none, and return the item
    """

    holder = self.item(row_number, USERDATA)
    if not holder:
        holder = QTableWidgetItem()
        self.setItem(row_number, USERDATA, holder)

    holder.setData(role, value)
    return holder
def _track_time_between_rows(self, session: scoped_session,
                             from_plr: PlaylistRows,
                             to_plr: PlaylistRows) -> int:
    """
    Return the summed duration of the tracks that
    PlaylistRows.get_rows_with_tracks() reports for this playlist
    between the rows of from_plr and to_plr.

    Tracks with no duration are left out of the sum.
    """

    rows_in_range = PlaylistRows.get_rows_with_tracks(
        session, self.playlist_id, from_plr.plr_rownum, to_plr.plr_rownum)

    return sum(
        row.track.duration for row in rows_in_range if row.track.duration
    )
def _update_row_track_info(self, session: scoped_session, row: int,
                           track: Tracks) -> None:
    """
    Refresh the passed row from the passed track.

    Artist, bitrate, duration, last played time, start gap, title,
    track id and track path are taken from the track; the row's start
    and end times are blanked. The row is coloured as unreadable if
    the track's file is unreadable.
    """

    last_played = Playdates.last_played(session, track.id)

    # Displayed columns
    self._set_row_artist(row, track.artist)
    self._set_row_bitrate(row, track.bitrate)
    self._set_row_duration(row, track.duration)
    self._set_row_end_time(row, None)
    self._set_row_last_played_time(row, last_played)
    self._set_row_start_gap(row, track.start_gap)
    self._set_row_start_time(row, None)
    self._set_row_title(row, track.title)

    # Row userdata
    self._set_row_track_id(row, track.id)
    self._set_row_track_path(row, track.path)

    if file_is_unreadable(track.path):
        self._set_row_colour_unreadable(row)
def _update_section_headers(self, session: scoped_session) -> None:
    """
    Update section headers with run time of section.

    A header whose note ends with "+" starts a section and one whose
    note ends with "-" ends the most recently started section that is
    still open. The section start header is rewritten with the total
    track time of the section appended. A section end header whose
    note is just "-" is rewritten as "[End <start note>]".

    Sections that are still open once all headers have been seen are
    timed to the last row of the playlist and marked as such.
    """
    # Stack of section start rows that have not yet been closed
    section_start_rows: List[PlaylistRows] = []
    # plr ids of all rows that have no track, i.e. header rows
    header_rows = [self._get_row_plr_id(row_number) for row_number in
                   range(self.rowCount())
                   if self._get_row_track_id(row_number) == 0]
    plrs = PlaylistRows.get_from_id_list(session, self.playlist_id,
                                         header_rows)
    for plr in plrs:
        if plr.note.endswith("+"):
            section_start_rows.append(plr)
            continue
        elif plr.note.endswith("-"):
            try:
                # pop() raises IndexError if there is no open section
                from_plr = section_start_rows.pop()
                to_plr = plr
                total_time = self._track_time_between_rows(
                    session, from_plr, to_plr)
                time_str = self._get_section_timing_string(total_time)
                self._set_row_header_text(session, from_plr.plr_rownum,
                                          from_plr.note + time_str)
                # Update section end
                if to_plr.note.strip() == "-":
                    new_text = (
                        "[End " + re.sub(
                            section_header_cleanup_re, '', from_plr.note,
                        ).strip() + "]"
                    )
                    self._set_row_header_text(session, to_plr.plr_rownum,
                                              new_text)
            except IndexError:
                # This ending row may have a time left from before a
                # starting row above was deleted, so replace content
                self._set_row_header_text(session, plr.plr_rownum,
                                          plr.note)
            continue
    # If we still have plrs in section_start_rows, there isn't an end
    # section row for them
    possible_plr = self._get_row_plr(session, self.rowCount() - 1)
    if possible_plr:
        # Time the open sections to the last row of the table
        to_plr = possible_plr
        for from_plr in section_start_rows:
            total_time = self._track_time_between_rows(session,
                                                       from_plr, to_plr)
            time_str = self._get_section_timing_string(total_time,
                                                       no_end=True)
            self._set_row_header_text(session, from_plr.plr_rownum,
                                      from_plr.note + time_str)
def _update_start_end_times(self, session: scoped_session) -> None:
    """
    Update track start and end times.

    Rows are visited from the top, carrying forward the time at which
    the next row is expected to start:

    - played rows, other than the current and next track rows, are
      left untouched;
    - a header row whose note holds an @hh:mm:ss time resets the
      carried time to that time;
    - rows with unreadable track files are skipped;
    - the next track row starts when the current track ends;
    - the current track row shows the current track's own times;
    - rows between the current and next track rows, and rows reached
      before any time is known, have their times blanked;
    - every other row starts at the carried time, and its end time is
      carried forward.

    Section headers are updated at the end.
    """

    current_track_end_time = self.musicmuster.current_track.end_time
    current_track_row = self._get_current_track_row_number()
    current_track_start_time = (
        self.musicmuster.current_track.start_time)
    next_start_time = None
    next_track_row = self._get_next_track_row_number()
    played_rows = self._get_played_rows(session)

    for row_number in range(self.rowCount()):

        # Don't change start times for tracks that have been
        # played other than current/next row
        if row_number in played_rows and row_number not in [
                current_track_row, next_track_row]:
            continue

        # Get any timing from header row (that's all we need)
        if self._get_row_track_id(row_number) == 0:
            note_time = self._get_note_text_time(
                self._get_row_note(row_number))
            if note_time:
                next_start_time = note_time
            continue

        # We have a track. Skip if it is unreadable
        if file_is_unreadable(self._get_row_path(row_number)):
            continue

        # Set next track start from end of current track
        if row_number == next_track_row:
            if current_track_end_time:
                next_start_time = self._set_row_times(
                    row_number, current_track_end_time,
                    self._get_row_duration(row_number))
                continue
            # Else set track times below

        if row_number == current_track_row:
            if not current_track_start_time:
                continue
            self._set_row_start_time(row_number,
                                     current_track_start_time)
            self._set_row_end_time(row_number, current_track_end_time)
            # Next track may be above us so only reset
            # next_start_time if it's not set
            if not next_start_time:
                next_start_time = current_track_end_time
            continue

        if not next_start_time:
            # Clear any existing times
            self._set_row_start_time(row_number, None)
            self._set_row_end_time(row_number, None)
            continue

        # If we're between the current and next row, zero out
        # times. Compare the row numbers with None rather than
        # testing truthiness: row 0 is a valid row but is falsy
        if (current_track_row is not None
                and next_track_row is not None
                and current_track_row < row_number < next_track_row):
            self._set_row_start_time(row_number, None)
            self._set_row_end_time(row_number, None)
        else:
            next_start_time = self._set_row_times(
                row_number, next_start_time,
                self._get_row_duration(row_number))

    self._update_section_headers(session)