musicmuster/app/playlists.py
2023-11-08 18:34:10 +00:00

2631 lines
88 KiB
Python

import os
import re
import stackprinter # type: ignore
import subprocess
import threading
import obsws_python as obs # type: ignore
from datetime import datetime, timedelta
from typing import Any, Callable, cast, List, Optional, Tuple, TYPE_CHECKING
from PyQt6.QtCore import (
QEvent,
QModelIndex,
QObject,
Qt,
# QTimer,
)
from PyQt6.QtGui import QAction, QBrush, QColor, QFont, QDropEvent, QKeyEvent
from PyQt6.QtWidgets import (
QAbstractItemDelegate,
QAbstractItemView,
QApplication,
QHeaderView,
QMenu,
QMessageBox,
QPlainTextEdit,
QStyledItemDelegate,
QStyleOptionViewItem,
QTableView,
QTableWidgetItem,
QWidget,
QProxyStyle,
QStyle,
QStyleOption,
)
from dbconfig import Session, scoped_session
from dialogs import TrackSelectDialog
from classes import MusicMusterSignals
from config import Config
from helpers import (
ask_yes_no,
file_is_unreadable,
get_relative_date,
ms_to_mmss,
open_in_audacity,
send_mail,
set_track_metadata,
)
from log import log
from models import PlaylistRows, Settings, Tracks, NoteColours
if TYPE_CHECKING:
from musicmuster import Window
from playlistmodel import PlaylistModel
# Index of the column that carries the note text on section-header rows.
# NOTE(review): in the part of this file visible here it is referenced only
# from commented-out code - confirm it is still needed.
HEADER_NOTES_COLUMN = 2
class EscapeDelegate(QStyledItemDelegate):
    """
    Item delegate that edits cells in a QPlainTextEdit:

    - increases the height of a row when editing to make editing easier
    - closes the edit on control-return
    - checks with user before abandoning edit on Escape
    """

    def __init__(self, parent) -> None:
        """
        Initialise the delegate with the given parent and keep a
        MusicMusterSignals instance for emitting enable_escape_signal.
        """

        super().__init__(parent)
        self.signals = MusicMusterSignals()

    def createEditor(
        self,
        parent: Optional[QWidget],
        option: QStyleOptionViewItem,
        index: QModelIndex,
    ):
        """
        Intercept createEditor call and make row just a little bit taller

        Emits enable_escape_signal(False) before the editor is created.
        When the delegate's parent is a PlaylistTab, the edited row is
        grown by Config.MINIMUM_ROW_HEIGHT if the cell data is a string
        and a QPlainTextEdit is returned as the editor. For any other
        parent the default editor from QStyledItemDelegate is returned.
        """

        self.signals = MusicMusterSignals()
        self.signals.enable_escape_signal.emit(False)
        if isinstance(self.parent(), PlaylistTab):
            p = cast(PlaylistTab, self.parent())
            if isinstance(index.data(), str):
                row = index.row()
                row_height = p.rowHeight(row)
                p.setRowHeight(row, row_height + Config.MINIMUM_ROW_HEIGHT)
            # NOTE(review): every cell of a PlaylistTab gets a
            # QPlainTextEdit; setEditorData() below relies on that by
            # always calling setPlainText().
            return QPlainTextEdit(parent)
        return super().createEditor(parent, option, index)

    def destroyEditor(self, editor: Optional[QWidget], index: QModelIndex) -> None:
        """
        Intercept editor destruction

        Emits enable_escape_signal(True), mirroring the False emitted in
        createEditor(), then lets the base class destroy the editor.
        """

        self.signals.enable_escape_signal.emit(True)
        return super().destroyEditor(editor, index)

    def eventFilter(self, editor: Optional[QObject], event: Optional[QEvent]) -> bool:
        """
        By default, QPlainTextEdit doesn't handle enter or return

        Returns True (event consumed) for:
        - Control+Return, after committing the data and closing the editor
        - Escape, after asking the user whether to discard changes; the
          editor is only closed if the answer is Yes

        Returns False for every other event, including a plain Return.
        """

        if editor is None or event is None:
            return False

        if event.type() == QEvent.Type.KeyPress:
            key_event = cast(QKeyEvent, event)
            if key_event.key() == Qt.Key.Key_Return:
                # Only Control+Return (with no other modifier) ends the edit
                if key_event.modifiers() == (Qt.KeyboardModifier.ControlModifier):
                    self.commitData.emit(editor)
                    self.closeEditor.emit(editor)
                    return True
            elif key_event.key() == Qt.Key.Key_Escape:
                discard_edits = QMessageBox.question(
                    cast(QWidget, self.parent()), "Abandon edit", "Discard changes?"
                )
                if discard_edits == QMessageBox.StandardButton.Yes:
                    self.closeEditor.emit(editor)
                # Swallow Escape whatever the answer so that the editor
                # stays open when the user chose to keep editing
                return True
        return False

    def setEditorData(self, editor, index):
        """Load the model's EditRole value for index into the editor."""

        value = index.model().data(index, Qt.ItemDataRole.EditRole)
        # assumes the model returns an object exposing .value() (e.g. a
        # QVariant) - TODO confirm against the model's data()
        editor.setPlainText(value.value())

    def setModelData(self, editor, model, index):
        """Write the editor's plain text back to the model (EditRole)."""

        value = editor.toPlainText()
        model.setData(index, value, Qt.ItemDataRole.EditRole)

    def updateEditorGeometry(self, editor, option, index):
        """Make the editor fill the cell rectangle supplied in option."""

        editor.setGeometry(option.rect)
class PlaylistStyle(QProxyStyle):
    """Proxy style that widens the item-view drop indicator."""

    def drawPrimitive(self, element, option, painter, widget=None):
        """
        Stretch the drop indicator so that it is drawn across the whole
        row instead of only under the hovered column; every other
        primitive is passed through untouched.
        """

        drop_indicator = QStyle.PrimitiveElement.PE_IndicatorItemViewItemDrop
        if element == drop_indicator and not option.rect.isNull():
            # Work on a copy so the caller's option is left alone
            widened = QStyleOption(option)
            widened.rect.setLeft(0)
            if widget:
                widened.rect.setRight(widget.width())
            option = widened

        super().drawPrimitive(element, option, painter, widget)
class PlaylistTab(QTableView):
def __init__(
    self,
    musicmuster: "Window",
    playlist_id: int,
) -> None:
    """
    Create the table view for the playlist with id playlist_id.

    Installs the EscapeDelegate editor, configures selection and
    internal-move drag/drop, applies PlaylistStyle, prepares the
    context menu, connects the header and application signals, then
    attaches a PlaylistModel for the playlist and calls
    _set_column_widths().

    musicmuster is the main Window and is stored for later use.
    """

    super().__init__()

    # Save passed settings
    self.musicmuster = musicmuster
    self.playlist_id = playlist_id

    # Set up widget
    self.setItemDelegate(EscapeDelegate(self))
    self.setAlternatingRowColors(True)
    self.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection)
    self.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
    # self.setEditTriggers(QAbstractItemView.EditTrigger.DoubleClicked)
    self.setVerticalScrollMode(QAbstractItemView.ScrollMode.ScrollPerPixel)
    self.setDragDropMode(QAbstractItemView.DragDropMode.InternalMove)
    self.setDragDropOverwriteMode(False)
    self.setAcceptDrops(True)
    # Set our custom style - this draws the drop indicator across the whole row
    self.setStyle(PlaylistStyle())
    # TODO: change this later to only enable drags when multiple
    # rows selected
    self.setDragEnabled(True)

    # Prepare for context menu
    self.menu = QMenu()
    self.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
    self.customContextMenuRequested.connect(self._context_menu)

    # Connect signals
    # This dancing is to satisfy mypy
    h_header = self.horizontalHeader()
    if isinstance(h_header, QHeaderView):
        # Column width changes are handled by _column_resize()
        h_header.sectionResized.connect(self._column_resize)
        h_header.setStretchLastSection(True)
    # self.itemSelectionChanged.connect(self._select_event)
    # self.signals.set_next_track_signal.connect(self._reset_next)
    self.signals = MusicMusterSignals()
    self.signals.span_cells_signal.connect(self._span_cells)

    # Call self.eventFilter() for events
    # self.installEventFilter(self)

    # Initialise miscellaneous instance variables
    self.search_text: str = ""
    self.sort_undo: List[int] = []
    # self.edit_cell_type: Optional[int]

    # Load playlist rows
    self.setModel(PlaylistModel(playlist_id))
    self._set_column_widths()
# kae def __repr__(self) -> str:
# kae return f"<PlaylistTab(id={self.playlist_id}>"
# ########## Events other than cell editing ##########
def dropEvent(self, event):
    """
    Handle a drop on the table.

    Drops that do not originate from this view, or that are neither a
    move action nor made while the view is in InternalMove mode, are
    handed to QTableView unchanged and nothing else is done.

    For an internal move the selected rows are passed to the model's
    move_rows() together with the row under the drop position. The
    move only happens when the rows involved are within
    0..rowCount(); a drop below the last row (indexAt() gives row -1)
    is therefore not turned into a model move. The base class
    implementation is still called afterwards, as before.
    """

    if event.source() is not self or (
        event.dropAction() != Qt.DropAction.MoveAction
        # PyQt6 only exposes enum members through their scoped name
        and self.dragDropMode() != QAbstractItemView.DragDropMode.InternalMove
    ):
        # Not ours to handle. Return here so that our own selection is
        # not moved as well and the base class is not called twice.
        super().dropEvent(event)
        return

    from_rows = sorted({index.row() for index in self.selectedIndexes()})
    if not from_rows:
        # Nothing selected: there is nothing to move (min()/max() of an
        # empty list would raise ValueError)
        super().dropEvent(event)
        return

    to_row = self.indexAt(event.position().toPoint()).row()
    model = self.model()
    row_count = model.rowCount()

    # from_rows is sorted, so checking its two ends covers every row
    if (
        0 <= from_rows[0]
        and from_rows[-1] <= row_count
        and 0 <= to_row <= row_count
    ):
        model.move_rows(from_rows, to_row)
        event.accept()

    super().dropEvent(event)
# def dropEvent(self, event: Optional[QDropEvent]) -> None:
# """
# Handle drag/drop of rows
# https://stackoverflow.com/questions/26227885/drag-and-drop-rows-within-qtablewidget
# """
# if not event:
# return
# if not event.source() == self:
# return # We don't accept external drops
# top_row = self.rowAt(0)
# row_set = set([mi.row() for mi in self.selectedIndexes()])
# targetRow = self.indexAt(event.position().toPoint()).row()
# row_set.discard(targetRow)
# rows = list(sorted(row_set))
# if not rows:
# return
# if targetRow == -1:
# targetRow = self.rowCount()
# for _ in range(len(rows)):
# self.insertRow(targetRow)
# rowMapping = dict() # Src row to target row.
# for idx, row in enumerate(rows):
# if row < targetRow:
# rowMapping[row] = targetRow + idx
# else:
# rowMapping[row + len(rows)] = targetRow + idx
# colCount = self.columnCount()
# for srcRow, tgtRow in sorted(rowMapping.items()):
# if self._get_row_track_id(srcRow):
# # This is a track row
# for col in range(0, colCount):
# self.setItem(tgtRow, col, self.takeItem(srcRow, col))
# else:
# self.setItem(
# tgtRow,
# HEADER_NOTES_COLUMN,
# self.takeItem(srcRow, HEADER_NOTES_COLUMN),
# )
# self.setSpan(tgtRow, HEADER_NOTES_COLUMN, 1, len(columns) - 1)
# for row in reversed(sorted(rowMapping.keys())):
# self.removeRow(row)
# self.resizeRowsToContents()
# # Scroll to drop zone
# self.scrollToItem(
# self.item(top_row, 1), QAbstractItemView.ScrollHint.PositionAtTop
# )
# event.accept()
# # Reset drag mode to allow row selection by dragging
# self.setDragEnabled(False)
# # Disable sort undo
# self.sort_undo = []
# with Session() as session:
# self.save_playlist(session)
# self._update_start_end_times(session)
# self.hide_or_show_played_tracks()
def _add_context_menu(
    self,
    text: str,
    action: Callable,
    disabled: bool = False,
    parent_menu: Optional[QMenu] = None,
) -> Optional[QAction]:
    """
    Append an entry labelled 'text' to parent_menu (self.menu when no
    parent menu is given) and connect its triggered signal to 'action'.

    Returns the new QAction, or None if the menu did not create one.
    """

    target_menu = self.menu if parent_menu is None else parent_menu

    new_action = target_menu.addAction(text)
    if new_action:
        new_action.setDisabled(disabled)
        new_action.triggered.connect(action)
        return new_action

    return None
# def mouseReleaseEvent(self, event):
# """
# Enable dragging if rows are selected
# """
# if self.selectedIndexes():
# self.setDragEnabled(True)
# else:
# self.setDragEnabled(False)
# super().mouseReleaseEvent(event)
# ########## Cell editing ##########
# We only want to allow cell editing on tracks, artists and notes,
# although notes may be section headers.
#
# Once editing starts, we need to disable play controls so that a
# 'return' doesn't play the next track.
#
# Earlier in this file:
# self.setEditTriggers(QAbstractItemView.DoubleClicked) - triggers
# editing on double-click
#
# Call sequences:
# Start editing:
# edit()
# End editing:
# _cell_changed() (only if changes made)
# closeEditor()
# def _cell_changed(self, row: int, column: int) -> None:
# """Called when cell content has changed"""
# # Disable cell changed signal connection as note updates will
# # change cell again (metadata)
# self.cellChanged.disconnect(self._cell_changed)
# cell = self.item(row, column)
# if not cell:
# return
# new_text = cell.text().strip()
# # Update cell with strip()'d text
# cell.setText(new_text)
# track_id = self._get_row_track_id(row)
# # Determine cell type changed
# with Session() as session:
# # Get playlistrow object
# plr_id = self._get_row_plr_id(row)
# plr_item = session.get(PlaylistRows, plr_id)
# if not plr_item:
# return
# # Note any updates needed to PlaylistTrack objects
# update_current = self.musicmuster.current_track.plr_id == plr_id
# update_next = self.musicmuster.next_track.plr_id == plr_id
# if self.edit_cell_type == ROW_NOTES:
# plr_item.note = new_text
# if track_id:
# self._set_row_note_text(session, row, new_text)
# else:
# self._set_row_header_text(session, row, new_text)
# else:
# if track_id:
# track = session.get(Tracks, track_id)
# if track:
# if self.edit_cell_type == TITLE:
# track.title = new_text
# if update_current:
# self.musicmuster.current_track.title = new_text
# if update_next:
# self.musicmuster.next_track.title = new_text
# elif self.edit_cell_type == ARTIST:
# track.artist = new_text
# if update_current:
# self.musicmuster.current_track.artist = new_text
# if update_next:
# self.musicmuster.next_track.artist = new_text
# if update_next or update_current:
# self.musicmuster.update_headers()
# if update_current:
# self._set_row_colour_current(row)
# elif update_next:
# self._set_row_colour_next(row)
# self.clear_selection()
# def closeEditor(
# self, editor: QWidget, hint: QAbstractItemDelegate.EndEditHint
# ) -> None:
# """
# Override PySide2.QAbstractItemView.closeEditor to enable
# play controls and update display.
# """
# # If edit was cancelled (eg, by pressing ESC), the signal will
# # still be connected
# try:
# self.cellChanged.disconnect(self._cell_changed)
# except TypeError:
# pass
# self.edit_cell_type = None
# self.musicmuster.enable_play_next_controls()
# self.musicmuster.actionSetNext.setEnabled(True)
# self.musicmuster.action_Clear_selection.setEnabled(True)
# super(PlaylistTab, self).closeEditor(editor, hint)
# # Optimise row heights after increasing row height for editing
# self.resizeRowsToContents()
# # Update start times in case a start time in a note has been
# # edited
# with Session() as session:
# self._update_start_end_times(session)
# def edit(
# self,
# index: QModelIndex, # type: ignore # FIXME
# trigger: QAbstractItemView.EditTrigger,
# event: QEvent,
# ) -> bool:
# """
# Override PySide2.QAbstractItemView.edit to catch when editing starts
# Editing only ever starts with a double click on a cell
# """
# # 'result' will only be true on double-click
# result = super(PlaylistTab, self).edit(index, trigger, event)
# if result:
# row = index.row()
# column = index.column()
# note_column = 0
# if self._get_row_track_id(row):
# # If a track row, we only allow editing of title, artist and
# # note. Check that this column is one of those.
# if column in [TITLE, ARTIST, ROW_NOTES]:
# self.edit_cell_type = column
# else:
# # Can't edit other columns
# return False
# # Check whether we're editing a notes row for later
# if self.edit_cell_type == ROW_NOTES:
# note_column = ROW_NOTES
# else:
# # This is a section header.
# if column != HEADER_NOTES_COLUMN:
# return False
# note_column = HEADER_NOTES_COLUMN
# self.edit_cell_type = ROW_NOTES
# # Disable play controls so that keyboard input doesn't
# # disturb playing
# self.musicmuster.disable_play_next_controls()
# self.musicmuster.actionSetNext.setEnabled(False)
# self.musicmuster.action_Clear_selection.setEnabled(False)
# # If this is a note cell, we need to remove any existing section
# # timing so user can't edit that. Keep it simple: refresh text
# # from database. note_column will only be non-zero if we are
# # editing a note.
# if note_column:
# with Session() as session:
# plr_item = self._get_row_plr(session, row)
# if not plr_item:
# return False
# if note_column == ROW_NOTES:
# self._set_row_note_text(session, row, plr_item.note)
# else:
# self._set_row_header_text(session, row, plr_item.note)
# # Connect signal so we know when cell has changed.
# self.cellChanged.connect(self._cell_changed)
# return result
# # ########## Externally called functions ##########
# def clear_next(self) -> None:
# """
# Unmark next track
# """
# row_number = self._get_next_track_row_number()
# if not row_number:
# return
# self._set_row_colour_default(row_number)
# self.clear_selection()
# self.musicmuster.set_next_plr_id(None, self)
def clear_selection(self) -> None:
    """
    Unselect all rows.

    Resetting the drag mode is currently disabled (see the
    commented-out call below), so only the selection is cleared.
    """

    self.clearSelection()
    # self.setDragEnabled(False)
def get_selected_row_number(self) -> Optional[int]:
    """
    Return the row of the selection model's current index, or None
    when there is no selection model, nothing is selected or the
    current index is not valid.
    """

    selection = self.selectionModel()
    if not selection or not selection.hasSelection():
        return None

    current = selection.currentIndex()
    return current.row() if current.isValid() else None
# def get_new_row_number(self) -> int:
# """
# Return the selected row or the row count if no row selected
# (ie, new row will be appended)
# """
# if self.selectionModel().hasSelection():
# return self.currentRow()
# else:
# return self.rowCount()
def get_selected_playlistrow_ids(self) -> list:
    """
    Return the PlaylistRow id of every selected row, in the order the
    rows are reported by _get_selected_rows().
    """

    plr_ids = []
    for row_number in self._get_selected_rows():
        plr_ids.append(self._get_row_plr_id(row_number))
    return plr_ids
# def get_selected_playlistrows(self, session: scoped_session) -> List[PlaylistRows]:
# """
# Return a list of PlaylistRows of the selected rows
# """
# plr_ids = self.get_selected_playlistrow_ids()
# if not plr_ids:
# return []
# plrs = [session.get(PlaylistRows, a) for a in plr_ids]
# return [plr for plr in plrs if plr is not None]
# def get_selected_row_track_path(self) -> Optional[str]:
# """
# Return the path of the first selected row or
# None if no rows are selected or first selected row doesn't
# have a track.
# """
# first_selected_row = self._get_selected_row()
# if first_selected_row is None:
# return None
# path = self._get_row_track_path(first_selected_row)
# if not path:
# return None
# return path
# def hide_or_show_played_tracks(self) -> None:
# """
# Hide or show played tracks.
# Never hide current or next track
# """
# current_next = [
# self._get_current_track_row_number(),
# self._get_next_track_row_number(),
# ]
# for row_number in range(self.rowCount()):
# if row_number in current_next:
# continue
# if self._get_row_userdata(row_number, self.PLAYED):
# if self.musicmuster.hide_played_tracks:
# self.hideRow(row_number)
# else:
# self.showRow(row_number)
# # This causes scrolling, so ensure current track is visible
# self.scroll_current_to_top()
# def insert_header(self, session: scoped_session, note: str) -> None:
# """
# Insert section header into playlist tab.
# If a row is selected, add header above. Otherwise, add to end of
# playlist.
# We simply build a PlaylistRows object and pass it to insert_row()
# to do the heavy lifing.
# """
# row_number = self.get_new_row_number()
# TODO: check arg order plr = PlaylistRows(session, self.playlist_id, None, row_number, note)
# self.insert_row(session, plr)
# self._set_row_header_text(session, row_number, note)
# self.save_playlist(session)
# self._update_start_end_times(session)
# def insert_row(
# self,
# session: scoped_session,
# plr: PlaylistRows,
# update_track_times: bool = True,
# played=False,
# ) -> None:
# """
# Insert passed playlist row (plr) into playlist tab.
# """
# row_number = plr.plr_rownum
# bold = True
# self.insertRow(row_number)
# _ = self._set_row_plr_id(row_number, plr.id)
# if plr.track:
# self._update_row_track_info(session, row_number, plr.track)
# if played:
# bold = False
# _ = self._set_row_userdata(row_number, self.PLAYED, True)
# self._set_row_note_text(session, row_number, plr.note)
# else:
# # This is a section header so it must have note text
# if plr.note is None:
# log.debug(f"insert_row({plr=}) with no track_id and no note")
# return
# # Use one QTableWidgetItem to span all columns from column 1
# self._set_row_header_text(session, row_number, plr.note)
# self.setSpan(row_number, HEADER_NOTES_COLUMN, 1, len(columns) - 1)
# # Save (or clear) track_id
# _ = self._set_row_track_id(row_number, 0)
# # Set bold as needed
# self._set_row_bold(row_number, bold)
# def insert_track(
# self,
# session: scoped_session,
# track: Tracks,
# note: str = "",
# repaint: bool = True,
# target_row: Optional[int] = None,
# ) -> None:
# """
# Insert track into playlist tab.
# If a row is selected, add track above. Otherwise, add to end of
# playlist.
# We simply build a PlaylistRows object and pass it to insert_row()
# to do the heavy lifing.
# """
# if not track:
# log.debug(
# f"insert_track(session={hex(id(Session))}, {note=}, {repaint=}"
# " called with no track"
# )
# return
# if target_row:
# row_number = target_row
# else:
# row_number = self.get_new_row_number()
# # Check to see whether track is already in playlist
# existing_plr = PlaylistRows.get_track_plr(session, track.id, self.playlist_id)
# if existing_plr and ask_yes_no(
# "Duplicate row",
# "Track already in playlist. " "Move to new location?",
# default_yes=True,
# ):
# # Yes it is and we should reuse it
# # If we've been passed a note, we need to add that to the
# # existing track
# if note:
# existing_plr.append_note(note)
# return self._move_row(session, existing_plr, row_number)
# # Build playlist_row object
# plr = TODO: check arg order PlaylistRows(session, self.playlist_id, track.id, row_number, note)
# self.insert_row(session, plr)
# self.save_playlist(session)
# self._update_start_end_times(session)
# def lookup_row_in_songfacts(self) -> None:
# """
# If there is a selected row and it is a track row,
# look up its title in songfacts.
# If multiple rows are selected, only consider the first one.
# Otherwise return.
# """
# self._look_up_row(website="songfacts")
# def lookup_row_in_wikipedia(self) -> None:
# """
# If there is a selected row and it is a track row,
# look up its title in wikipedia.
# If multiple rows are selected, only consider the first one.
# Otherwise return.
# """
# self._look_up_row(website="wikipedia")
# def play_ended(self) -> None:
# """
# Called by musicmuster when play has ended.
# current_track points to track that's just finished
# """
# row_number = self._get_current_track_row_number()
# if row_number is None:
# return
# self._set_row_colour_default(row_number)
# self.clear_selection()
# self._set_row_last_played_time(
# row_number, self.musicmuster.current_track.start_time
# )
# with Session() as session:
# self._set_row_note_colour(session, row_number)
# def play_started(self, session: scoped_session) -> None:
# """
# Notification from musicmuster that track has started playing.
# Actions required:
# - Mark current row as played
# - Set next track
# - Display track as current
# - Update start/stop times
# - Change OBS scene if needed
# - Update hidden tracks
# """
# print("playlists_v3:play_starter()")
# return
# # current_row = self._get_current_track_row_number()
# # if current_row is None:
# # if os.environ["MM_ENV"] == "PRODUCTION":
# # send_mail(
# # Config.ERRORS_TO,
# # Config.ERRORS_FROM,
# # "playlists:play_started:current_row is None",
# # stackprinter.format(),
# # )
# # print("playlists:play_started:current_row is None")
# # # stackprinter.show(add_summary=True, style="darkbg")
# # return
# # # Mark current row as played
# # self._set_played_row(session, current_row)
# # # Set next track
# # next_row = self._find_next_track_row(session, current_row + 1)
# # if next_row:
# # self.musicmuster.set_next_plr_id(self._get_row_plr_id(next_row), self)
# # # Display row as current track
# # self._set_row_colour_current(current_row)
# # # Update start/stop times
# # self._update_start_end_times(session)
# # # Change OBS scene if needed
# # self._obs_change_scene(current_row)
# # # Update hidden tracks
# # QTimer.singleShot(
# # Config.HIDE_AFTER_PLAYING_OFFSET, self.hide_or_show_played_tracks
# # )
# def populate_display(
# self, session: scoped_session, playlist_id: int, scroll_to_top: bool = True
# ) -> None:
# """
# Populate display from the associated playlist ID
# """
# print("playlists_v3:populate_display()")
# return
# # # Sanity check row numbering before we load
# # PlaylistRows.fixup_rownumbers(session, playlist_id)
# # # Clear playlist
# # self.setRowCount(0)
# # # Get played tracks
# # played_rows = self._get_played_rows(session)
# # # Add the rows
# # playlist = session.get(Playlists, playlist_id)
# # if not playlist:
# # if os.environ["MM_ENV"] == "PRODUCTION":
# # send_mail(
# # Config.ERRORS_TO,
# # Config.ERRORS_FROM,
# # "playlists:populate_display:no playlist",
# # stackprinter.format(),
# # )
# # print("playlists:populate_display:no playlist")
# # # stackprinter.show(add_summary=True, style="darkbg")
# # return
# # for plr in PlaylistRows.deep_rows(session, playlist_id):
# # self.insert_row(
# # session,
# # plr,
# # update_track_times=False,
# # played=plr.plr_rownum in played_rows,
# # )
# # # Scroll to top
# # if scroll_to_top:
# # row0_item = self.item(0, 0)
# # if row0_item:
# # self.scrollToItem(row0_item, QAbstractItemView.ScrollHint.PositionAtTop)
# # # Queue up time calculations to take place after UI has
# # # updated
# # self._update_start_end_times(session)
# # # It's possible that the current/next tracks are in this
# # # playlist, so check and set.
# # current_row = self._get_current_track_row_number()
# # if current_row is not None:
# # self._set_row_colour_current(current_row)
# # next_row = self._get_next_track_row_number()
# # if next_row is not None:
# # self._set_row_colour_next(next_row)
# # # Needed to wrap notes column correctly - add to event queue so
# # # that it's processed after list is populated
# # QTimer.singleShot(0, self.tab_visible)
# def remove_rows(self, row_numbers: List[int]) -> None:
# """Remove passed rows from display"""
# # Remove rows from display. Do so in reverse order so that
# # row numbers remain valid.
# for row in sorted(row_numbers, reverse=True):
# self.removeRow(row)
# def save_playlist(self, session: scoped_session) -> None:
# """
# Get the PlaylistRow objects for each row in the display. Correct
# the row_number and playlist_id if necessary. Remove any row
# numbers in the database that are higher than the last row in
# the display.
# """
# # Ensure all row plrs have correct row number and playlist_id
# for row_number in range(self.rowCount()):
# plr = self._get_row_plr(session, row_number)
# if not plr:
# continue
# plr.plr_rownum = row_number
# plr.playlist_id = self.playlist_id
# # Any rows in the database for this playlist that has a row
# # number equal to or greater than the row count needs to be
# # removed.
# PlaylistRows.delete_higher_rows(session, self.playlist_id, self.rowCount() - 1)
# # Get changes into db
# session.flush()
# def scroll_current_to_top(self) -> None:
# """Scroll currently-playing row to top"""
# current_row = self._get_current_track_row_number()
# if current_row is not None:
# self._scroll_to_top(current_row)
# def scroll_next_to_top(self) -> None:
# """Scroll nextly-playing row to top"""
# next_row = self._get_next_track_row_number()
# if next_row is not None:
# self._scroll_to_top(next_row)
def set_search(self, text: str) -> None:
    """
    Remember 'text' as the current search string and, unless it is
    empty (ie, the search has been reset), jump to the first match.
    """

    self.search_text = text
    if text:
        self._search(next=True)
# def search_next(self) -> None:
# """
# Select next row containg self.search_string.
# """
# self._search(next=True)
# def search_previous(self) -> None:
# """
# Select previous row containg self.search_string.
# """
# self._search(next=False)
# def select_next_row(self) -> None:
# """
# Select next or first row. Don't select section headers.
# Wrap at last row.
# """
# selected_rows = self._get_selected_rows()
# # we will only handle zero or one selected rows
# if len(selected_rows) > 1:
# return
# # select first row if none selected
# if len(selected_rows) == 0:
# row_number = 0
# else:
# row_number = selected_rows[0] + 1
# if row_number >= self.rowCount():
# row_number = 0
# # Don't select section headers
# wrapped = False
# track_id = self._get_row_track_id(row_number)
# while not track_id:
# row_number += 1
# if row_number >= self.rowCount():
# if wrapped:
# # we're already wrapped once, so there are no
# # non-headers
# return
# row_number = 0
# wrapped = True
# track_id = self._get_row_track_id(row_number)
# self.selectRow(row_number)
# def select_previous_row(self) -> None:
# """
# Select previous or last track. Don't select section headers.
# Wrap at first row.
# """
# selected_rows = self._get_selected_rows()
# # we will only handle zero or one selected rows
# if len(selected_rows) > 1:
# return
# # select last row if none selected
# last_row = self.rowCount() - 1
# if len(selected_rows) == 0:
# row_number = last_row
# else:
# row_number = selected_rows[0] - 1
# if row_number < 0:
# row_number = last_row
# # Don't select section headers
# wrapped = False
# track_id = self._get_row_track_id(row_number)
# while not track_id:
# row_number -= 1
# if row_number < 0:
# if wrapped:
# # we're already wrapped once, so there are no
# # non-notes
# return
# row_number = last_row
# wrapped = True
# track_id = self._get_row_track_id(row_number)
# self.selectRow(row_number)
# def select_rows(self, rows: List[int]) -> None:
# """
# Select rows that are passed
# """
# # Clear any selected rows to avoid confustion
# self.clear_selection()
# # We need to be in MultiSelection mode
# self.setSelectionMode(QAbstractItemView.SelectionMode.MultiSelection)
# # Select the rows
# for row in rows:
# self.selectRow(row)
# # Reset selection mode
# self.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection)
def set_row_as_next_track(self) -> None:
    """
    Tell the model that the selected row is the next track, then
    clear the selection. Does nothing when no row is selected.
    """

    row_number = self._get_selected_row()
    if row_number is not None:
        playlist_model = cast(PlaylistModel, self.model())
        playlist_model.set_next_row(row_number)
        self.clearSelection()
# def tab_visible(self) -> None:
# """Called when tab becomes visible"""
# # Set row heights
# self.resizeRowsToContents()
# self.setColumnWidth(len(columns) - 1, 0)
# # Hide/show rows
# self.hide_or_show_played_tracks()
# # # ########## Internally called functions ##########
def _add_track(self, row_number: int) -> None:
    """
    Open the track selection dialog for the section header at
    row_number so that a track can be added to it, making it a
    normal track row.
    """

    with Session() as session:
        dialog_args = {
            "session": session,
            "new_row_number": row_number,
            "playlist_id": self.playlist_id,
            "add_to_header": True,
        }
        track_dialog = TrackSelectDialog(**dialog_args)
        track_dialog.exec()
def _build_context_menu(self, item: QModelIndex) -> None:
    """
    Used to process context (right-click) menu, which is defined here

    'item' is the model index of the cell that was right-clicked, as
    obtained from indexAt() by _context_menu(). It is invalid when the
    click was not on a cell, in which case no row-specific entries are
    added.

    self.menu is cleared and rebuilt on every call. Most entries are
    still commented out; at present only "Add a track" (for section
    header rows) and the separators are produced.
    """

    self.menu.clear()
    row_number = item.row()
    # track_id = self._get_row_track_id(row_number)
    # track_row = bool(track_id)
    header_row = False
    model = cast(PlaylistModel, self.model())
    # An invalid index has row -1; don't ask the model about a row that
    # doesn't exist
    if model and item.isValid():
        header_row = model.is_header_row(row_number)
    # current = row_number == self._get_current_track_row_number()
    # next_row = row_number == self._get_next_track_row_number()

    # # Play with mplayer
    # if track_row and not current:
    #     self._add_context_menu(
    #         "Play with mplayer", lambda: self._mplayer_play(row_number)
    #     )

    # # Paste
    # self._add_context_menu(
    #     "Paste",
    #     lambda: self.musicmuster.paste_rows(),
    #     self.musicmuster.selected_plrs is None,
    # )

    # # Open in Audacity
    # if track_row and not current:
    #     self._add_context_menu(
    #         "Open in Audacity", lambda: self._open_in_audacity(row_number)
    #     )

    # # Rescan
    # if track_row and not current:
    #     self._add_context_menu(
    #         "Rescan track", lambda: self._rescan(row_number, track_id)
    #     )

    # # ----------------------
    self.menu.addSeparator()

    # # Remove row
    # if not current and not next_row:
    #     self._add_context_menu("Delete row", self._delete_rows)

    # # Move to playlist
    # if not current and not next_row:
    #     self._add_context_menu(
    #         "Move to playlist...", self.musicmuster.move_selected
    #     )

    # # ----------------------
    # self.menu.addSeparator()

    # # Remove track from row
    # if track_row and not current and not next_row:
    #     self._add_context_menu(
    #         "Remove track from row", lambda: self._remove_track(row_number)
    #     )

    # Add track to section header (ie, make this a track row)
    if header_row:
        self._add_context_menu("Add a track", lambda: self._add_track(row_number))

    # # Mark unplayed
    # if self._get_row_userdata(row_number, self.PLAYED):
    #     self._add_context_menu("Mark unplayed", self._mark_unplayed)

    # # Unmark as next
    # if next_row:
    #     self._add_context_menu("Unmark as next track", self.clear_next)

    # # ----------------------
    self.menu.addSeparator()

    # # Sort
    # sort_menu = self.menu.addMenu("Sort")
    # self._add_context_menu(
    #     "by title", lambda: self._sort_selection(TITLE), parent_menu=sort_menu
    # )
    # self._add_context_menu(
    #     "by artist", lambda: self._sort_selection(ARTIST), parent_menu=sort_menu
    # )
    # self._add_context_menu(
    #     "by duration", lambda: self._sort_selection(DURATION), parent_menu=sort_menu
    # )
    # self._add_context_menu(
    #     "by last played",
    #     lambda: self._sort_selection(LASTPLAYED),
    #     parent_menu=sort_menu,
    # )
    # if sort_menu:
    #     sort_menu.setEnabled(self._sortable())
    # self._add_context_menu("Undo sort", self._sort_undo, not bool(self.sort_undo))

    # # Build submenu

    # # ----------------------
    # self.menu.addSeparator()

    # # Info
    # if track_row:
    #     self._add_context_menu("Info", lambda: self._info_row(track_id))

    # # Track path
    # if track_row:
    #     self._add_context_menu(
    #         "Copy track path", lambda: self._copy_path(row_number)
    #     )

    # # return super(PlaylistTab, self).eventFilter(source, event)
def _calculate_end_time(
self, start: Optional[datetime], duration: int
) -> Optional[datetime]:
"""Return datetime 'duration' ms after 'start'"""
if start is None:
return None
return start + timedelta(milliseconds=duration)
def _column_resize(self, column_number: int, _old: int, _new: int) -> None:
    """
    Slot for the horizontal header's sectionResized signal.

    Re-fits the row heights and stores the current width of
    column_number in the "playlist_col_<n>_width" setting. The old and
    new sizes supplied by the signal are not used.
    """

    if not self.horizontalHeader():
        return

    # Row heights may need to change now that the column width has
    self.resizeRowsToContents()

    setting_name = f"playlist_col_{column_number}_width"
    with Session() as session:
        setting = Settings.get_int_settings(session, setting_name)
        setting.f_int = self.columnWidth(column_number)
def _context_menu(self, pos):
    """
    Slot for customContextMenuRequested: rebuild the menu for the cell
    at 'pos' (a position within this widget) and show it there.
    """

    clicked_index = self.indexAt(pos)
    self._build_context_menu(clicked_index)

    global_pos = self.mapToGlobal(pos)
    self.menu.exec(global_pos)
def _copy_path(self, row_number: int) -> None:
    """
    If passed row_number has a track, copy the track path, quoted for
    safe use on a POSIX shell command line, to the clipboard.
    Otherwise, do nothing.

    Paths that contain only shell-safe characters are copied as they
    are; anything else is single-quoted by shlex.quote(), which also
    covers characters such as backslash, '&', ';', '$' and double
    quotes.
    """

    # Imported here so that this method doesn't depend on a change to
    # the module's import block
    import shlex

    track_path = self._get_row_track_path(row_number)
    if not track_path:
        return

    quoted_path = shlex.quote(track_path)

    cb = QApplication.clipboard()
    if cb is None:
        # No clipboard available (eg, no application instance)
        return
    cb.clear(mode=cb.Mode.Clipboard)
    cb.setText(quoted_path, mode=cb.Mode.Clipboard)
def _delete_rows(self) -> None:
    """
    Delete the selected rows after asking for confirmation.

    Actions required:
    - Delete the rows from the database
    - Remove the rows from the display
    - Save the playlist
    - Update track start/end times
    """

    with Session() as session:
        selected_plrs = self.get_selected_playlistrows(session)
        row_count = len(selected_plrs)
        if row_count == 0:
            return

        # Get confirmation
        plural = "" if row_count <= 1 else "s"
        question = f"Really delete {row_count} row{plural}?"
        if not ask_yes_no("Delete rows", question):
            return

        # Note the row numbers before the records are deleted
        doomed_rows: List[int] = [plr.plr_rownum for plr in selected_plrs]

        # Delete rows from database. Would be more efficient to
        # query then have a single delete.
        for plr in selected_plrs:
            session.delete(plr)

        # Remove from display
        self.remove_rows(doomed_rows)

        # Need to save the playlist to ensure the PlaylistRows have
        # the correct row_number
        self.save_playlist(session)

        # Reset drag mode
        # self.setDragEnabled(False)

        self._update_start_end_times(session)
# def _find_next_track_row(
# self, session: scoped_session, starting_row: Optional[int] = None
# ) -> Optional[int]:
# """
# Find next track to play. If a starting row is given, start there;
# otherwise, start from top. Skip rows already played.
# If not found, return None.
# If found, return row number.
# """
# if starting_row is None:
# starting_row = 0
# track_rows = [
# p.plr_rownum
# for p in PlaylistRows.get_rows_with_tracks(session, self.playlist_id)
# ]
# played_rows = [
# p.plr_rownum
# for p in PlaylistRows.get_played_rows(session, self.playlist_id)
# ]
# for row_number in range(starting_row, self.rowCount()):
# if row_number not in track_rows or row_number in played_rows:
# continue
# plr = self._get_row_plr(session, row_number)
# if not plr:
# continue
# if file_is_unreadable(plr.track.path):
# continue
# else:
# return row_number
# return None
def _get_current_track_row_number(self) -> Optional[int]:
"""Return current track row or None"""
current_track = self.musicmuster.current_track
if not current_track or not current_track.plr_id:
return None
return self._plrid_to_row_number(current_track.plr_id)
def _get_next_track_row_number(self) -> Optional[int]:
"""Return next track row or None"""
next_track = self.musicmuster.next_track
if not next_track or not next_track.plr_id:
return None
return self._plrid_to_row_number(next_track.plr_id)
# @staticmethod
# def _get_note_text_time(text: str) -> Optional[datetime]:
# """Return datetime specified as @hh:mm:ss in text"""
# try:
# match = start_time_re.search(text)
# except TypeError:
# return None
# if not match:
# return None
# try:
# return datetime.strptime(match.group(0)[1:], Config.NOTE_TIME_FORMAT)
# except ValueError:
# return None
def _get_played_rows(self, session: scoped_session) -> List[int]:
    """
    Return the row numbers of this playlist's played rows, skipping
    any played row that has no row number.
    """

    played_row_numbers: List[int] = []
    for plr in PlaylistRows.get_played_rows(session, self.playlist_id):
        if plr.plr_rownum is None:
            continue
        played_row_numbers.append(plr.plr_rownum)

    return played_row_numbers
def _get_row_artist(self, row_number: int) -> str:
    """Return the artist text on this row_number, or '' if there is no artist item"""

    artist_item = self.item(row_number, ARTIST)
    return artist_item.text() if artist_item else ""
def _get_row_duration(self, row_number: int) -> int:
"""Return duration associated with this row_number"""
duration_udata = self._get_row_userdata(row_number, self.ROW_DURATION)
if not duration_udata:
return 0
else:
return int(duration_udata)
def _get_row_last_played(self, row_number: int) -> Optional[datetime]:
"""Return last played datetime associated with this row_number"""
return self._get_row_userdata(row_number, self.ROW_LAST_PLAYED)
def _get_row_note(self, row_number: int) -> str:
    """
    Return the note on this row_number, or a null string if none.

    Track rows keep their note in the ROW_NOTES column; rows without a
    track keep it in HEADER_NOTES_COLUMN.
    """

    if self._get_row_track_id(row_number):
        note_column = ROW_NOTES
    else:
        note_column = HEADER_NOTES_COLUMN

    note_item = self.item(row_number, note_column)
    return note_item.text() if note_item else ""
def _get_row_path(self, row_number: int) -> str:
"""
Return path of track associated with this row_number or null string
"""
path = str(self._get_row_userdata(row_number, self.TRACK_PATH))
if not path:
return ""
return path
def _get_row_plr(
    self, session: scoped_session, row_number: int
) -> Optional[PlaylistRows]:
    """
    Look up the PlaylistRows record whose id is stored on this
    row_number and return the result of the session lookup.
    """

    plr_id = self._get_row_plr_id(row_number)
    return session.get(PlaylistRows, plr_id)
def _get_row_plr_id(self, row_number: int) -> int:
"""Return the plr_id associated with this row_number or 0"""
plr_id = self._get_row_userdata(row_number, self.PLAYLISTROW_ID)
if not plr_id:
return 0
else:
return int(plr_id)
def _get_row_title(self, row_number: int) -> Optional[str]:
"""Return title on this row_number or None if none"""
# Header rows may have note in TITLE row so check for track_id
if not self._get_row_track_id(row_number):
return None
item_title = self.item(row_number, TITLE)
if not item_title:
return None
return item_title.text()
def _get_row_track(
    self, session: scoped_session, row_number: int
) -> Optional[Tracks]:
    """
    Return the Tracks record for the track id stored on this
    row_number, or None if the row has no track id.
    """

    track_id = self._get_row_track_id(row_number)
    if not track_id:
        return None
    return session.get(Tracks, track_id)
def _get_row_track_id(self, row_number: int) -> int:
"""Return the track_id associated with this row_number or None"""
track_id = self._get_row_userdata(row_number, self.ROW_TRACK_ID)
if not track_id:
return 0
else:
return int(track_id)
def _get_row_track_path(self, row_number: int) -> str:
"""Return the track path associated with this row_number or ''"""
path = self._get_row_userdata(row_number, self.TRACK_PATH)
if not path:
return ""
else:
return str(path)
def _get_row_userdata(self, row_number: int, role: int) -> Optional[Any]:
    """
    Return the data held under 'role' on this row's USERDATA item, or
    None if the row has no USERDATA item.
    """

    holder = self.item(row_number, USERDATA)
    return holder.data(role) if holder else None
def _get_section_timing_string(
    self, total_time: int, unplayed_time: int, no_end: bool = False
) -> str:
    """
    Return a string describing a section's duration, in the form
    " <unplayed> (<total>)".

    A placeholder replaces the unplayed time when it is zero, and a
    caveat is appended when 'no_end' is set.
    """

    total_duration = ms_to_mmss(total_time)
    unplayed_duration = (
        ms_to_mmss(unplayed_time) if unplayed_time else "[No unplayed tracks]"
    )
    caveat = " (to end of playlist)" if no_end else ""

    return f" {unplayed_duration} ({total_duration}){caveat}"
def _get_selected_row(self) -> Optional[int]:
"""
Return row_number number of first selected row,
or None if none selected
"""
sm = self.selectionModel()
if sm:
if sm.hasSelection():
return sm.selectedIndexes()[0].row()
return None
def _get_selected_rows(self) -> List[int]:
"""Return a list of selected row numbers sorted by row"""
# Use a set to deduplicate result (a selected row will have all
# items in that row selected)
return sorted(list(set([a.row() for a in self.selectedIndexes()])))
def _info_row(self, track_id: int) -> None:
    """
    Display a popup with details of the track with the passed
    track_id, or a "can't find" message if there is no such track.
    """

    # Build the text inside the session, then release the session
    # before the modal dialog blocks waiting for the user.
    with Session() as session:
        track = session.get(Tracks, track_id)
        if track:
            txt = (
                f"Title: {track.title}\n"
                f"Artist: {track.artist}\n"
                f"Track ID: {track.id}\n"
                f"Track duration: {ms_to_mmss(track.duration)}\n"
                f"Track bitrate: {track.bitrate}\n"
                f"Track fade at: {ms_to_mmss(track.fade_at)}\n"
                f"Track silence at: {ms_to_mmss(track.silence_at)}"
                "\n\n"
                f"Path: {track.path}\n"
            )
        else:
            txt = f"Can't find {track_id=}"

    info: QMessageBox = QMessageBox(self)
    info.setIcon(QMessageBox.Icon.Information)
    info.setText(txt)
    info.setStandardButtons(QMessageBox.StandardButton.Ok)
    # The default button must be one the box actually has; Ok is the
    # only standard button added above.
    info.setDefaultButton(QMessageBox.StandardButton.Ok)
    info.exec()
def _look_up_row(self, website: str) -> None:
"""
If there is a selected row and it is a track row,
look up its title in the passed website
If multiple rows are selected, only consider the first one.
Otherwise return.
"""
print("playlists_v3:_look_up_row()")
return
# selected_row = self._get_selected_row()
# if not selected_row:
# return
# if not self._get_row_track_id(selected_row):
# return
# title = self._get_row_title(selected_row)
# if website == "wikipedia":
# QTimer.singleShot(
# 0, lambda: self.musicmuster.tabInfolist.open_in_wikipedia(title)
# )
# elif website == "songfacts":
# QTimer.singleShot(
# 0, lambda: self.musicmuster.tabInfolist.open_in_songfacts(title)
# )
# else:
# return
def _mark_unplayed(self) -> None:
    """
    Mark the selected rows as unplayed in this playlist, both in the
    display and in their PlaylistRows records.
    """

    with Session() as session:
        for row_number in self._get_selected_rows():
            self._set_row_userdata(row_number, self.PLAYED, False)
            self._set_row_bold(row_number, True)

            # Rows without a database record only get the display update
            plr = self._get_row_plr(session, row_number)
            if plr:
                plr.played = False

        self._update_start_end_times(session)

    self.clear_selection()
    self.hide_or_show_played_tracks()
def _move_row(
    self, session: scoped_session, plr: PlaylistRows, new_row_number: int
) -> None:
    """
    Move the passed playlist row to new_row_number by removing it from
    the display and inserting it again. Does nothing if the row has no
    row number.
    """

    old_row_number = plr.plr_rownum
    if old_row_number is None:
        return

    # Remove source row
    self.removeRow(old_row_number)

    # Fixup plr row number: removing a row above the target moves the
    # target up by one
    if old_row_number < new_row_number:
        plr.plr_rownum = new_row_number - 1
    else:
        plr.plr_rownum = new_row_number

    self.insert_row(session, plr)
    self.save_playlist(session)
    self.hide_or_show_played_tracks()

    # Recalculate track times now that the row order has changed
    self._update_start_end_times(session)
def _mplayer_play(self, row_number: int) -> None:
    """
    Play the track on this row_number with gmplayer, run in a separate
    thread. Logs an error and returns if the row has no track path.
    """

    track_path = self._get_row_track_path(row_number)
    if not track_path:
        log.error(
            f"{self.playlist_id=} playlists._mplayer_play({row_number=}): "
            "track_path not set"
        )
        return

    command = ["gmplayer", "-vc", "null", "-vo", "null", track_path]
    threading.Thread(target=self._run_subprocess, args=(command,)).start()
def _obs_change_scene(self, current_row: int) -> None:
    """
    Look for an OBS scene change command in the note of current_row
    and, failing that, in the notes of the header rows directly above
    it. On the first command found, ask OBS to switch to that scene
    and return, whether or not the switch succeeded.
    """

    check_row = current_row
    while True:
        # If we have a note and it has a scene change command,
        # execute it
        note_text = self._get_row_note(check_row)
        match_obj = scene_change_re.search(note_text) if note_text else None
        scene_name = match_obj.group(1) if match_obj else None

        if scene_name:
            try:
                cl = obs.ReqClient(
                    host=Config.OBS_HOST,
                    port=Config.OBS_PORT,
                    password=Config.OBS_PASSWORD,
                )
            except ConnectionRefusedError:
                log.error("OBS connection refused")
                return

            try:
                cl.set_current_program_scene(scene_name)
                log.info(f"OBS scene changed to '{scene_name}'")
            except obs.error.OBSSDKError as e:
                log.error(f"OBS SDK error ({e})")
            return

        # After current track row, only check header rows and stop
        # at first non-header row
        check_row -= 1
        if check_row < 0 or self._get_row_track_id(check_row):
            break
def _open_in_audacity(self, row_number: int) -> None:
    """
    Open the track on this row_number in Audacity, which must already
    be running. Logs an error and returns if the row has no track path.
    """

    track_path = self._get_row_track_path(row_number)
    if track_path:
        open_in_audacity(track_path)
        return

    log.error(
        f"{self.playlist_id=} "
        f"playlists._open_in_audactity({row_number=}): "
        "track_path not set"
    )
def _plrid_to_row_number(self, plrid: int) -> Optional[int]:
"""
Return row number of passed plrid, or None if not found
"""
for row_number in range(self.rowCount()):
if self._get_row_plr_id(row_number) == plrid:
return row_number
return None
def _remove_track(self, row_number: int) -> None:
    """
    Remove the track from this row after confirmation, turning the row
    into a section header in both the database and the display.
    """

    # Get confirmation
    confirmed = ask_yes_no(
        "Remove music", "Really remove the music track from this row?"
    )
    if not confirmed:
        return

    # Update playlist_rows record
    with Session() as session:
        plr = self._get_row_plr(session, row_number)
        if not plr:
            return

        plr.track_id = None

        # We can't have null text
        if not plr.note:
            plr.note = Config.TEXT_NO_TRACK_NO_NOTE
        session.flush()

        # Clear track text items
        for column_number in range(2, len(columns)):
            self._set_item_text(row_number, column_number, "")

        # Remove row duration
        self._set_row_duration(row_number, 0)

        # Remove row start gap
        self._set_row_start_gap(row_number, None)

        # Remove track_id from row
        self._set_row_userdata(row_number, self.ROW_TRACK_ID, 0)

        # Span the rows
        self.setSpan(row_number, HEADER_NOTES_COLUMN, 1, len(columns) - 1)

        # Set note text in correct column for section head
        self._set_row_header_text(session, row_number, plr.note)
        self.clear_selection()

        # Save playlist to ensure correct detection of new header
        # row
        self.save_playlist(session)

        # Set track start/end times after track list is populated
        self._update_start_end_times(session)
def _reorder_rows(self, source_row_numbers: List[int]) -> None:
"""
Take the list of source row numbers and put those playlist rows in that order.
Algorithm: create new rows below the source rows and copy source rows in
the correct order. When complete, delete source rows.
"""
next_row = max(source_row_numbers) + 1
for source_row_number in source_row_numbers:
self.insertRow(next_row)
for column in range(self.columnCount()):
self.setItem(next_row, column, self.takeItem(source_row_number, column))
next_row += 1
# Remove source rows
for i in reversed(sorted(source_row_numbers)):
self.removeRow(i)
def _rescan(self, row_number: int, track_id: int) -> None:
    """
    Rescan the track with the passed track_id and refresh row_number.

    - Track found and file readable: reset the row colour, refresh the
      track metadata and update the row from the track.
    - Track found but file unreadable: colour the row as unreadable.
    - Track not found: clear the row's track id, append a "not found"
      message to the row's note, log an error and colour the row as
      unreadable.

    Start/end times are then recalculated and the selection cleared.
    """
    with Session() as session:
        track = session.get(Tracks, track_id)
        if track:
            if file_is_unreadable(track.path):
                self._set_row_colour_unreadable(row_number)
            else:
                self._set_row_colour_default(row_number)
                set_track_metadata(track)
                self._update_row_track_info(session, row_number, track)
        else:
            # Clear the track id first: the row is then treated as a
            # header row by the note getter and header text setter below
            _ = self._set_row_track_id(row_number, 0)
            note_text = self._get_row_note(row_number)
            # NOTE(review): _get_row_note() is annotated as returning str
            # and returns "" when there is no note item, so this 'is None'
            # branch looks unreachable - confirm the intent.
            if note_text is None:
                note_text = ""
            else:
                note_text += f"{track_id=} not found"
            self._set_row_header_text(session, row_number, note_text)
            log.error(f"playlists._rescan({track_id=}): " "Track not found")
            self._set_row_colour_unreadable(row_number)
        self._update_start_end_times(session)
    self.clear_selection()
def _reset_next(self, old_plrid: int, new_plrid: int) -> None:
    """
    Called when set_next_track_signal signal received.

    Actions required:
    - If old_plrid points to this playlist:
        - Remove existing next track marking
    - If new_plrid points to this playlist:
        - Display row as next track
    - Update start/stop times
    """

    with Session() as session:
        # Unmark the previous next track if it is in this playlist
        previous_plr = session.get(PlaylistRows, old_plrid) if old_plrid else None
        if previous_plr and previous_plr.playlist_id == self.playlist_id:
            self._set_row_colour_default(previous_plr.plr_rownum)

        # Mark the new next track if it is in this playlist
        if new_plrid:
            upcoming_plr = session.get(PlaylistRows, new_plrid)
            if not upcoming_plr:
                log.error(f"_reset_next({new_plrid=}): plr not found")
                return
            if upcoming_plr.playlist_id == self.playlist_id:
                self._set_row_colour_next(upcoming_plr.plr_rownum)

        # Update start/stop times
        self._update_start_end_times(session)
        self.clear_selection()
def _run_subprocess(self, args):
    """
    Run the command 'args' in a subprocess and wait for it to finish.
    The exit status is discarded.
    """

    _exit_status = subprocess.call(args)
def _scroll_to_top(self, row_number: int) -> None:
    """
    Scroll so that the passed row_number sits Config.SCROLL_TOP_MARGIN
    visible rows below the top of the view.
    """

    if row_number is None:
        return

    rows_still_needed = Config.SCROLL_TOP_MARGIN
    top_row = row_number

    if row_number > Config.SCROLL_TOP_MARGIN:
        # We can't scroll to a hidden row, so walk upwards from the
        # passed row_number counting only visible rows until the margin
        # is filled or row 0 is reached.
        for candidate_row in range(row_number - 1, -1, -1):
            if not self.isRowHidden(candidate_row):
                if rows_still_needed == 0:
                    break
                top_row = candidate_row
                rows_still_needed -= 1

    self.scrollToItem(
        self.item(top_row, 0), QAbstractItemView.ScrollHint.PositionAtTop
    )
def _search(self, next: bool = True) -> None:
"""
Select next/previous row containg self.search_string. Start from
top selected row if there is one, else from top.
Wrap at last/first row.
"""
if not self.search_text:
return
selected_row = self._get_selected_row()
if next:
if selected_row is not None and selected_row < self.rowCount() - 1:
starting_row = selected_row + 1
else:
starting_row = 0
else:
if selected_row is not None and selected_row > 0:
starting_row = selected_row - 1
else:
starting_row = self.rowCount() - 1
wrapped = False
match_row = None
row_number = starting_row
needle = self.search_text.lower()
while True:
# Check for match in title, artist or notes
title = self._get_row_title(row_number)
if title and needle in title.lower():
match_row = row_number
break
artist = self._get_row_artist(row_number)
if artist and needle in artist.lower():
match_row = row_number
break
note = self._get_row_note(row_number)
if note and needle in note.lower():
match_row = row_number
break
if next:
row_number += 1
if wrapped and row_number >= starting_row:
break
if row_number >= self.rowCount():
row_number = 0
wrapped = True
else:
row_number -= 1
if wrapped and row_number <= starting_row:
break
if row_number < 0:
row_number = self.rowCount() - 1
wrapped = True
if match_row is not None:
self.selectRow(row_number)
# kae def _select_event(self) -> None:
# kae """
# kae Called when item selection changes.
# kae If multiple rows are selected, display sum of durations in status bar.
# kae """
# kae selected_rows = self._get_selected_rows()
# kae # If no rows are selected, we have nothing to do
# kae if len(selected_rows) == 0:
# kae self.musicmuster.lblSumPlaytime.setText("")
# kae return
# kae ms = 0
# kae for row_number in selected_rows:
# kae ms += self._get_row_duration(row_number)
# kae if ms > 0:
# kae self.musicmuster.lblSumPlaytime.setText(
# kae f"Selected duration: {ms_to_mmss(ms)}"
# kae )
# kae else:
# kae self.musicmuster.lblSumPlaytime.setText("")
# def _set_cell_colour(
# self, row_number: int, column: int, colour: Optional[str] = None
# ) -> None:
# """
# Set or reset a cell background colour
# """
# if colour is None:
# brush = QBrush()
# else:
# brush = QBrush(QColor(colour))
# item = self.item(row_number, column)
# if item:
# item.setBackground(brush)
def _set_column_widths(self) -> None:
    """
    Set column widths from the "playlist_col_<n>_width" settings,
    using Config.DEFAULT_COLUMN_WIDTH for any column with no saved
    width.
    """

    header = self.horizontalHeader()
    if not header:
        return

    # Set width of last column to zero as it's set to stretch
    last_column = header.count() - 1
    self.setColumnWidth(last_column, 0)

    # Set remaining column widths from settings
    with Session() as session:
        for column_number in range(last_column):
            record = Settings.get_int_settings(
                session, f"playlist_col_{column_number}_width"
            )
            if record.f_int is None:
                width = Config.DEFAULT_COLUMN_WIDTH
            else:
                width = record.f_int
            self.setColumnWidth(column_number, width)
def _set_item_text(
    self, row_number: int, column: int, text: Optional[str]
) -> QTableWidgetItem:
    """
    Set the text of the item at (row_number, column), creating the
    item if it doesn't exist, and return the item. A missing or empty
    'text' is stored as a null string.
    """

    content = text or ""

    cell = self.item(row_number, column)
    if cell:
        cell.setText(content)
        return cell

    cell = QTableWidgetItem(content)
    self.setItem(row_number, column, cell)
    return cell
def _set_played_row(self, session: scoped_session, row_number: int) -> None:
    """
    Mark this row as played: flag it in the row userdata, remove the
    bold font and, if the row has a PlaylistRows record, set that
    record as played and flush the session.
    """

    self._set_row_userdata(row_number, self.PLAYED, True)
    self._set_row_bold(row_number, False)

    plr = self._get_row_plr(session, row_number)
    if plr:
        plr.played = True
        session.flush()
def _set_row_artist(
    self, row_number: int, artist: Optional[str]
) -> QTableWidgetItem:
    """
    Set the artist text on this row (a null string if none given).

    Return QTableWidgetItem.
    """

    return self._set_item_text(row_number, ARTIST, artist or "")
def _set_row_bitrate(
    self, row_number: int, bitrate: Optional[int]
) -> QTableWidgetItem:
    """
    Set the bitrate text of this row and colour the cell according to
    the Config bitrate thresholds. Return the bitrate item.
    """

    if bitrate:
        label = str(bitrate)
        effective_bitrate = bitrate
    else:
        label = ""
        # If no bitrate, flag it as too low
        effective_bitrate = Config.BITRATE_LOW_THRESHOLD - 1

    bitrate_item = self._set_item_text(row_number, BITRATE, label)

    if effective_bitrate < Config.BITRATE_LOW_THRESHOLD:
        cell_colour = Config.COLOUR_BITRATE_LOW
    elif effective_bitrate < Config.BITRATE_OK_THRESHOLD:
        cell_colour = Config.COLOUR_BITRATE_MEDIUM
    else:
        cell_colour = Config.COLOUR_BITRATE_OK
    bitrate_item.setBackground(QBrush(QColor(cell_colour)))

    return bitrate_item
def _set_row_bold(self, row_number: int, bold: bool = True) -> None:
    """
    Give every existing item on the row, except the one in the notes
    column, a font that is bold (bold=True) or not bold.
    """

    row_font = QFont()
    row_font.setBold(bold)

    for column in range(self.columnCount()):
        # Don't make notes column bold
        cell = None if column == ROW_NOTES else self.item(row_number, column)
        if cell:
            cell.setFont(row_font)
def _set_row_colour(self, row_number: int, colour: Optional[str] = None) -> None:
    """
    Set the row background to 'colour', or reset it if colour is None.

    Column 0 and the START_GAP and BITRATE columns are left untouched.
    """

    brush = QBrush() if colour is None else QBrush(QColor(colour))
    skipped_columns = (START_GAP, BITRATE)

    for column in range(1, self.columnCount()):
        if column in skipped_columns:
            continue
        cell = self.item(row_number, column)
        if cell:
            cell.setBackground(brush)
def _set_row_colour_current(self, row_number: int) -> None:
    """Colour the row with Config.COLOUR_CURRENT_PLAYLIST (current track)"""

    self._set_row_colour(row_number, colour=Config.COLOUR_CURRENT_PLAYLIST)
def _set_row_colour_default(self, row_number: int) -> None:
"""
Set default row colour
"""
self._set_row_colour(row_number, None)
def _set_row_colour_next(self, row_number: int) -> None:
    """Colour the row with Config.COLOUR_NEXT_PLAYLIST (next track)"""

    self._set_row_colour(row_number, colour=Config.COLOUR_NEXT_PLAYLIST)
def _set_row_colour_unreadable(self, row_number: int) -> None:
    """Colour the row with Config.COLOUR_UNREADABLE (unreadable track)"""

    self._set_row_colour(row_number, colour=Config.COLOUR_UNREADABLE)
def _set_row_duration(self, row_number: int, ms: Optional[int]) -> QTableWidgetItem:
    """
    Show the duration 'ms' on this row, formatted by ms_to_mmss, and
    keep the raw value in the row userdata. Return the duration item.
    """

    formatted_duration = ms_to_mmss(ms)
    duration_item = self._set_item_text(row_number, DURATION, formatted_duration)
    self._set_row_userdata(row_number, self.ROW_DURATION, ms)

    return duration_item
def _set_row_end_time(
    self, row_number: int, time: Optional[datetime]
) -> QTableWidgetItem:
    """
    Set the row's end time text, formatted with
    Config.TRACK_TIME_FORMAT. A missing time, or one that can't be
    formatted, is shown as a null string. Return the end time item.
    """

    try:
        time_str = time.strftime(Config.TRACK_TIME_FORMAT) if time else ""
    except AttributeError:
        time_str = ""

    return self._set_item_text(row_number, END_TIME, time_str)
def _set_row_header_text(
    self, session: scoped_session, row_number: int, text: str
) -> None:
    """
    Set header text and row colour.

    The row colour is the note colour matching 'text' or, if there is
    none, Config.COLOUR_NOTES_PLAYLIST. If the row has a track this is
    reported (by mail when MM_ENV is "PRODUCTION", and on stdout) and
    nothing is changed.
    """

    # Sanity check: this should be a header row and thus not have a
    # track associated
    if self._get_row_track_id(row_number):
        # Use .get() so that an unset MM_ENV doesn't raise KeyError in
        # the middle of reporting the real problem
        if os.environ.get("MM_ENV") == "PRODUCTION":
            send_mail(
                Config.ERRORS_TO,
                Config.ERRORS_FROM,
                "playlists:_set_row_header_text() called on track row",
                stackprinter.format(),
            )
        print(
            f"playists:_set_row_header_text() called on track row ({row_number=}, {text=}"
        )
        # stackprinter.show(add_summary=True, style="darkbg")
        return

    # Set text
    _ = self._set_item_text(row_number, HEADER_NOTES_COLUMN, text)

    # Set colour
    note_colour = NoteColours.get_colour(session, text)
    if not note_colour:
        note_colour = Config.COLOUR_NOTES_PLAYLIST
    self._set_row_colour(row_number, note_colour)
def _set_row_last_played_time(
    self, row_number: int, last_played: datetime
) -> QTableWidgetItem:
    """
    Keep 'last_played' in the row userdata and show it on the row as
    the text returned by get_relative_date. Return the last played
    item.
    """

    self._set_row_userdata(row_number, self.ROW_LAST_PLAYED, last_played)

    relative_date = get_relative_date(last_played)
    return self._set_item_text(row_number, LASTPLAYED, relative_date)
def _set_row_note_colour(self, session: scoped_session, row_number: int) -> None:
    """
    Set the background colour of the row's note cell from the note
    colour matching the note text, resetting it if there is none.

    If the row has no track this is reported (by mail when MM_ENV is
    "PRODUCTION", and on stdout) and nothing is changed.
    """

    # Sanity check: this should be a track row and thus have a
    # track associated
    if not self._get_row_track_id(row_number):
        # Use .get() so that an unset MM_ENV doesn't raise KeyError in
        # the middle of reporting the real problem
        if os.environ.get("MM_ENV") == "PRODUCTION":
            send_mail(
                Config.ERRORS_TO,
                Config.ERRORS_FROM,
                "playlists:_set_row_note_colour() on header row",
                stackprinter.format(),
            )
        # stackprinter.show(add_summary=True, style="darkbg")
        print(f"playists:_set_row_note_colour() called on header row ({row_number=}")
        return

    # Set colour
    note_text = self._get_row_note(row_number)
    note_colour = NoteColours.get_colour(session, note_text)

    # Colour the cell here rather than through a separate
    # cell-colouring helper, so this method does not depend on one
    # being defined.
    if note_colour is None:
        brush = QBrush()
    else:
        brush = QBrush(QColor(note_colour))
    note_item = self.item(row_number, ROW_NOTES)
    if note_item:
        note_item.setBackground(brush)
def _set_row_note_text(
    self, session: scoped_session, row_number: int, text: str
) -> None:
    """
    Set row note text and note colour.

    If the row has no track this is reported (by mail when MM_ENV is
    "PRODUCTION", and on stdout) and nothing is changed.
    """

    # Sanity check: this should be a track row and thus have a
    # track associated
    if not self._get_row_track_id(row_number):
        # Use .get() so that an unset MM_ENV doesn't raise KeyError in
        # the middle of reporting the real problem
        if os.environ.get("MM_ENV") == "PRODUCTION":
            send_mail(
                Config.ERRORS_TO,
                Config.ERRORS_FROM,
                "playlists:_set_row_note_text() called on header row",
                stackprinter.format(),
            )
        print(
            f"playists:_set_row_note_text() called on header row ({row_number=}, {text=}"
        )
        # stackprinter.show(add_summary=True, style="darkbg")
        return

    # Set text
    _ = self._set_item_text(row_number, ROW_NOTES, text)

    # Set colour
    self._set_row_note_colour(session, row_number)
def _set_row_plr_id(self, row_number: int, plr_id: int) -> QTableWidgetItem:
    """
    Store the PlaylistRows id in this row's userdata and return the
    userdata item.
    """

    userdata_item = self._set_row_userdata(row_number, self.PLAYLISTROW_ID, plr_id)
    return userdata_item
def _set_row_start_gap(
    self, row_number: int, start_gap: Optional[int]
) -> QTableWidgetItem:
    """
    Show the start gap on this row (0 if none given) and colour the
    cell with Config.COLOUR_LONG_START when the gap is 500 or more,
    otherwise reset the cell background.

    Return QTableWidgetItem.
    """

    gap = start_gap or 0
    start_gap_item = self._set_item_text(row_number, START_GAP, str(gap))

    brush = QBrush(QColor(Config.COLOUR_LONG_START)) if gap >= 500 else QBrush()
    start_gap_item.setBackground(brush)

    return start_gap_item
def _set_row_start_time(
    self, row_number: int, time: Optional[datetime]
) -> QTableWidgetItem:
    """
    Set the row's start time text, formatted with
    Config.TRACK_TIME_FORMAT. A missing time, or one that can't be
    formatted, is shown as a null string. Return the start time item.
    """

    try:
        time_str = time.strftime(Config.TRACK_TIME_FORMAT) if time else ""
    except AttributeError:
        time_str = ""

    return self._set_item_text(row_number, START_TIME, time_str)
def _set_row_times(
self, row_number: int, start: datetime, duration: int
) -> Optional[datetime]:
"""
Set row start and end times, return end time
"""
self._set_row_start_time(row_number, start)
end_time = self._calculate_end_time(start, duration)
self._set_row_end_time(row_number, end_time)
return end_time
def _set_row_title(self, row_number: int, title: Optional[str]) -> QTableWidgetItem:
    """
    Set the title text on this row (a null string if none given) and
    return the title item.
    """

    return self._set_item_text(row_number, TITLE, title or "")
def _set_row_track_id(self, row_number: int, track_id: int) -> QTableWidgetItem:
    """
    Store the track id in this row's userdata and return the userdata
    item.
    """

    userdata_item = self._set_row_userdata(row_number, self.ROW_TRACK_ID, track_id)
    return userdata_item
def _set_row_track_path(self, row_number: int, path: str) -> QTableWidgetItem:
    """
    Store the track path in this row's userdata and return the
    userdata item.
    """

    userdata_item = self._set_row_userdata(row_number, self.TRACK_PATH, path)
    return userdata_item
def _set_row_userdata(
    self, row_number: int, role: int, value: Any
) -> QTableWidgetItem:
    """
    Store 'value' under 'role' on the row's USERDATA column item,
    creating the item if the row doesn't have one yet. Return the
    item.
    """

    holder = self.item(row_number, USERDATA)
    if not holder:
        # First userdata for this row: give the row an item to hold it
        holder = QTableWidgetItem()
        self.setItem(row_number, USERDATA, holder)

    holder.setData(role, value)
    return holder
def _sortable(self) -> bool:
"""
Return True if the selection is sortable. That means:
- at least two rows selected
- selected rows are contiguous
- selected rows do not include any header rows
"""
selectionModel = self.selectionModel()
if not selectionModel:
return False
source_rows = selectionModel.selectedRows()
if len(source_rows) < 2:
return False
sorted_source_rows = sorted([a.row() for a in source_rows])
if sorted_source_rows != list(
range(min(sorted_source_rows), max(sorted_source_rows) + 1)
):
return False
for row in sorted_source_rows:
if self._get_row_track_id(row) == 0:
return False
return True
def _sort_selection(self, sort_column: int) -> None:
    """
    Sort the selected rows by the passed column. Holding Shift
    reverses the sort; sorting by last played is additionally
    reversed. Rows with no sort value are grouped together rather
    than raising an error.

    Algorithm:
    - check the selection is sortable; return if not
    - copy (row-number, sort-field) to a list
    - sort the list by sort-field
    - move the rows into the sorted order
    - record the order needed to undo the sort in self.sort_undo
    - save the playlist and update start/end times
    """

    if not self._sortable():
        return

    selectionModel = self.selectionModel()
    if not selectionModel:
        return
    source_row_numbers = [a.row() for a in selectionModel.selectedRows()]

    # Copy (row-number, sort-field) to a list
    sorted_rows: List[tuple[int, Any]] = []
    for row in source_row_numbers:
        if sort_column == DURATION:
            sorted_rows.append((row, self._get_row_duration(row)))
        elif sort_column == LASTPLAYED:
            sorted_rows.append((row, self._get_row_last_played(row)))
        else:
            sort_item = self.item(row, sort_column)
            if sort_item:
                sorted_rows.append((row, sort_item.text()))
            else:
                sorted_rows.append((row, None))

    # Sort the list. The sort field may be None (no item, or no last
    # played value) and None can't be ordered against str/datetime, so
    # sort on (is-None, value): real values keep their usual order and
    # None values group together without ever being compared.
    reverse = QApplication.keyboardModifiers() == Qt.KeyboardModifier.ShiftModifier
    sorted_rows.sort(reverse=reverse, key=lambda entry: (entry[1] is None, entry[1]))
    if sort_column == LASTPLAYED:
        sorted_rows.reverse()

    # Reorder rows, remembering the inverse ordering for undo
    new_order = [a[0] for a in sorted_rows]
    first_row = min(new_order)
    self.sort_undo = [
        new_order.index(x) + first_row
        for x in range(first_row, max(new_order) + 1)
    ]
    self._reorder_rows(new_order)

    # Reset drag mode to allow row selection by dragging
    # self.setDragEnabled(False)

    # Save playlist
    with Session() as session:
        self.save_playlist(session)
        self._update_start_end_times(session)
def _sort_undo(self):
    """
    Undo the last sort by applying the row order saved in
    self.sort_undo, then save the inverse of that order so the undo
    can itself be undone.
    """

    undo_order = self.sort_undo
    if not undo_order:
        return

    self._reorder_rows(undo_order)

    first_row = min(undo_order)
    last_row = max(undo_order)
    self.sort_undo = [
        undo_order.index(row) + first_row for row in range(first_row, last_row + 1)
    ]

    # Reset drag mode to allow row selection by dragging
    # self.setDragEnabled(False)

    # Save playlist
    with Session() as session:
        self.save_playlist(session)
        self._update_start_end_times(session)
def _span_cells(self, row: int, column: int, rowSpan: int, columnSpan: int) -> None:
"""
Implement spanning of cells, initiated by signal
"""
# Don't set spanning if already in place because that is seen as
# a change to the view and thus it refreshes the data which
# again calls us here.
if (
self.rowSpan(row, column) == rowSpan
and self.columnSpan(row, column) == columnSpan
):
return
self.setSpan(row, column, rowSpan, columnSpan)
def _track_time_between_rows(
    self, session: scoped_session, from_plr: PlaylistRows, to_plr: PlaylistRows
) -> Tuple[int, int]:
    """
    Return a tuple of (total duration of the tracks in the rows from
    from_plr to to_plr, total duration of those tracks not yet
    played). Tracks with no duration are ignored.
    """

    plr_tracks = PlaylistRows.get_rows_with_tracks(
        session, self.playlist_id, from_plr.plr_rownum, to_plr.plr_rownum
    )

    total_time = sum(plr.track.duration for plr in plr_tracks if plr.track.duration)
    unplayed_time = sum(
        plr.track.duration
        for plr in plr_tracks
        if plr.track.duration and not plr.played
    )

    return (total_time, unplayed_time)
def _update_row_track_info(
    self, session: scoped_session, row: int, track: Tracks
) -> None:
    """
    Update the passed row with info from the passed track.

    Start and end times are cleared. The last played time is the
    latest of the track's playdates, or Config.EPOCH if it has none.
    The row is coloured as unreadable if the track file is unreadable.
    """

    if track.playdates:
        last_play = max(playdate.lastplayed for playdate in track.playdates)
    else:
        last_play = Config.EPOCH

    self._set_row_artist(row, track.artist)
    self._set_row_bitrate(row, track.bitrate)
    self._set_row_duration(row, track.duration)
    self._set_row_end_time(row, None)
    self._set_row_last_played_time(row, last_play)
    self._set_row_start_gap(row, track.start_gap)
    self._set_row_start_time(row, None)
    self._set_row_title(row, track.title)
    self._set_row_track_id(row, track.id)
    self._set_row_track_path(row, track.path)

    if file_is_unreadable(track.path):
        self._set_row_colour_unreadable(row)
def _update_section_headers(self, session: scoped_session) -> None:
    """
    Update section headers with run time of section.

    Header rows (rows with no track id) are examined in the order
    returned by PlaylistRows.plrids_to_plrs:
    - a note ending "+" starts a timed section
    - a note ending "-" ends the most recently started section; the
      section's start header gets the section timing appended
    - a note ending "=" is a subtotal, timed from the last section
      start or subtotal
    Sections still open at the end are timed to the last row of the
    playlist.
    """
    # Stack of section start rows that haven't yet been ended
    section_start_rows: List[PlaylistRows] = []
    # Row from which the next subtotal will be timed
    subtotal_from: Optional[PlaylistRows] = None
    # plr ids of all header rows, ie rows whose track id is 0
    header_rows = [
        self._get_row_plr_id(row_number)
        for row_number in range(self.rowCount())
        if self._get_row_track_id(row_number) == 0
    ]
    plrs = PlaylistRows.plrids_to_plrs(session, self.playlist_id, header_rows)
    for plr in plrs:
        # Start of timed section
        if plr.note.endswith("+"):
            section_start_rows.append(plr)
            subtotal_from = plr
            continue
        # End of timed section
        elif plr.note.endswith("-"):
            try:
                # pop() raises IndexError if no section is open
                from_plr = section_start_rows.pop()
                to_plr = plr
                total_time, unplayed_time = self._track_time_between_rows(
                    session, from_plr, to_plr
                )
                time_str = self._get_section_timing_string(
                    total_time, unplayed_time
                )
                self._set_row_header_text(
                    session, from_plr.plr_rownum, from_plr.note + time_str
                )
                # Update section end
                if to_plr.note.strip() == "-":
                    # A bare "-" is displayed as "[End <start header text>]"
                    new_text = (
                        "[End "
                        + re.sub(
                            section_header_cleanup_re,
                            "",
                            from_plr.note,
                        ).strip()
                        + "]"
                    )
                    self._set_row_header_text(session, to_plr.plr_rownum, new_text)
                subtotal_from = None
            except IndexError:
                # This ending row may have a time left from before a
                # starting row above was deleted, so replace content
                self._set_row_header_text(session, plr.plr_rownum, plr.note)
            continue
        # Subtotal
        elif plr.note.endswith("="):
            # NOTE(review): this returns from the whole method, so any
            # later headers and any still-open sections are not updated
            # when a subtotal has nothing to be timed from. 'continue'
            # may have been intended - confirm.
            if not subtotal_from:
                return
            from_plr = subtotal_from
            to_plr = plr
            total_time, unplayed_time = self._track_time_between_rows(
                session, subtotal_from, to_plr
            )
            time_str = self._get_section_timing_string(total_time, unplayed_time)
            if to_plr.note.strip() == "=":
                leader_text = "Subtotal: "
            else:
                # Use the note, minus its trailing "=", as the leader
                leader_text = to_plr.note[:-1] + " "
            new_text = leader_text + time_str
            self._set_row_header_text(session, to_plr.plr_rownum, new_text)
            # The next subtotal is timed from this one
            subtotal_from = to_plr
    # If we still have plrs in section_start_rows, there isn't an end
    # section row for them
    possible_plr = self._get_row_plr(session, self.rowCount() - 1)
    if possible_plr:
        to_plr = possible_plr
        for from_plr in section_start_rows:
            total_time, unplayed_time = self._track_time_between_rows(
                session, from_plr, to_plr
            )
            time_str = self._get_section_timing_string(
                total_time, unplayed_time, no_end=True
            )
            self._set_row_header_text(
                session, from_plr.plr_rownum, from_plr.note + time_str
            )
def _update_start_end_times(self, session: scoped_session) -> None:
    """
    Update the start and end times shown against track rows.

    Rows are walked from top to bottom while a running "next start
    time" is carried forward:

    - played rows, other than the current and next track rows, are
      left untouched;
    - header rows (track id 0) are never timed, but a time found in
      the header note replaces the running start time;
    - rows whose file is unreadable are skipped;
    - the next track row is timed from the end of the current track
      when that end time is known;
    - the current track row takes its start and end times from the
      current track;
    - rows reached before any start time is known, and rows lying
      strictly between the current and next track rows, have their
      times cleared;
    - every other row is timed from the running start time.

    Section headers are refreshed once all rows have been processed.

    NOTE(review): assumes _get_current_track_row_number() and
    _get_next_track_row_number() return None when there is no such
    row, and that _set_row_times() returns the start time for the
    following row - confirm against their definitions.
    """

    current_track_end_time = self.musicmuster.current_track.end_time
    current_track_row = self._get_current_track_row_number()
    current_track_start_time = self.musicmuster.current_track.start_time
    next_start_time = None
    next_track_row = self._get_next_track_row_number()
    played_rows = self._get_played_rows(session)

    for row_number in range(self.rowCount()):
        # Don't change start times for tracks that have been
        # played other than current/next row
        if row_number in played_rows and row_number not in (
            current_track_row,
            next_track_row,
        ):
            continue

        # Get any timing from header row (that's all we need)
        if self._get_row_track_id(row_number) == 0:
            note_time = self._get_note_text_time(self._get_row_note(row_number))
            if note_time:
                next_start_time = note_time
            continue

        # We have a track. Skip if it is unreadable
        if file_is_unreadable(self._get_row_path(row_number)):
            continue

        # Set next track start from end of current track
        if row_number == next_track_row and current_track_end_time:
            next_start_time = self._set_row_times(
                row_number,
                current_track_end_time,
                self._get_row_duration(row_number),
            )
            continue
        # Otherwise (no current track end time) the next track row is
        # handled by the general cases below

        if row_number == current_track_row:
            if not current_track_start_time:
                continue
            self._set_row_start_time(row_number, current_track_start_time)
            self._set_row_end_time(row_number, current_track_end_time)
            # Next track may be above us so only reset
            # next_start_time if it's not set
            if not next_start_time:
                next_start_time = current_track_end_time
            continue

        if not next_start_time:
            # Clear any existing times
            self._set_row_start_time(row_number, None)
            self._set_row_end_time(row_number, None)
            continue

        # If we're between the current and next row, zero out times.
        # Compare against None explicitly: row 0 is a valid row number
        # but is falsy, so a truthiness test would wrongly treat a
        # current (or next) track on the first row as "no track".
        if (
            current_track_row is not None
            and next_track_row is not None
            and current_track_row < row_number < next_track_row
        ):
            self._set_row_start_time(row_number, None)
            self._set_row_end_time(row_number, None)
        else:
            next_start_time = self._set_row_times(
                row_number, next_start_time, self._get_row_duration(row_number)
            )

    self._update_section_headers(session)