musicmuster/app/playlists.py
2023-11-15 23:54:06 +00:00

969 lines
31 KiB
Python

import os
import re
import stackprinter # type: ignore
import subprocess
import threading
import obsws_python as obs # type: ignore
from datetime import datetime, timedelta
from typing import Any, Callable, cast, List, Optional, Tuple, TYPE_CHECKING
from PyQt6.QtCore import (
QEvent,
QModelIndex,
QObject,
QItemSelection,
Qt,
# QTimer,
)
from PyQt6.QtGui import QAction, QBrush, QColor, QFont, QDropEvent, QKeyEvent
from PyQt6.QtWidgets import (
QAbstractItemDelegate,
QAbstractItemView,
QApplication,
QHeaderView,
QMenu,
QMessageBox,
QPlainTextEdit,
QStyledItemDelegate,
QStyleOptionViewItem,
QTableView,
QTableWidgetItem,
QWidget,
QProxyStyle,
QStyle,
QStyleOption,
)
from dbconfig import Session, scoped_session
from dialogs import TrackSelectDialog
from classes import MusicMusterSignals, track_sequence
from config import Config
from helpers import (
ask_yes_no,
file_is_unreadable,
get_relative_date,
ms_to_mmss,
open_in_audacity,
send_mail,
set_track_metadata,
)
from log import log
from models import PlaylistRows, Settings, Tracks, NoteColours
if TYPE_CHECKING:
from musicmuster import Window
from playlistmodel import PlaylistModel
# HEADER_NOTES_COLUMN = 2
class EscapeDelegate(QStyledItemDelegate):
"""
- increases the height of a row when editing to make editing easier
- closes the edit on control-return
- checks with user before abandoning edit on Escape
"""
def __init__(self, parent) -> None:
super().__init__(parent)
self.signals = MusicMusterSignals()
def createEditor(
self,
parent: Optional[QWidget],
option: QStyleOptionViewItem,
index: QModelIndex,
):
"""
Intercept createEditor call and make row just a little bit taller
"""
self.signals = MusicMusterSignals()
self.signals.enable_escape_signal.emit(False)
if isinstance(self.parent(), PlaylistTab):
p = cast(PlaylistTab, self.parent())
if isinstance(index.data(), str):
row = index.row()
row_height = p.rowHeight(row)
p.setRowHeight(row, row_height + Config.MINIMUM_ROW_HEIGHT)
return QPlainTextEdit(parent)
return super().createEditor(parent, option, index)
def destroyEditor(self, editor: Optional[QWidget], index: QModelIndex) -> None:
"""
Intercept editor destroyment
"""
self.signals.enable_escape_signal.emit(True)
return super().destroyEditor(editor, index)
def eventFilter(self, editor: Optional[QObject], event: Optional[QEvent]) -> bool:
"""By default, QPlainTextEdit doesn't handle enter or return"""
if editor is None or event is None:
return False
if event.type() == QEvent.Type.KeyPress:
key_event = cast(QKeyEvent, event)
if key_event.key() == Qt.Key.Key_Return:
if key_event.modifiers() == (Qt.KeyboardModifier.ControlModifier):
self.commitData.emit(editor)
self.closeEditor.emit(editor)
return True
elif key_event.key() == Qt.Key.Key_Escape:
discard_edits = QMessageBox.question(
cast(QWidget, self.parent()), "Abandon edit", "Discard changes?"
)
if discard_edits == QMessageBox.StandardButton.Yes:
self.closeEditor.emit(editor)
return True
return False
def setEditorData(self, editor, index):
value = index.model().data(index, Qt.ItemDataRole.EditRole)
editor.setPlainText(value.value())
def setModelData(self, editor, model, index):
value = editor.toPlainText()
model.setData(index, value, Qt.ItemDataRole.EditRole)
def updateEditorGeometry(self, editor, option, index):
editor.setGeometry(option.rect)
class PlaylistStyle(QProxyStyle):
def drawPrimitive(self, element, option, painter, widget=None):
"""
Draw a line across the entire row rather than just the column
we're hovering over.
"""
if (
element == QStyle.PrimitiveElement.PE_IndicatorItemViewItemDrop
and not option.rect.isNull()
):
option_new = QStyleOption(option)
option_new.rect.setLeft(0)
if widget:
option_new.rect.setRight(widget.width())
option = option_new
super().drawPrimitive(element, option, painter, widget)
class PlaylistTab(QTableView):
def __init__(
self,
musicmuster: "Window",
playlist_id: int,
) -> None:
super().__init__()
# Save passed settings
self.musicmuster = musicmuster
self.playlist_id = playlist_id
# Set up widget
self.setItemDelegate(EscapeDelegate(self))
self.setAlternatingRowColors(True)
self.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection)
self.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
# self.setEditTriggers(QAbstractItemView.EditTrigger.DoubleClicked)
self.setVerticalScrollMode(QAbstractItemView.ScrollMode.ScrollPerPixel)
self.setDragDropMode(QAbstractItemView.DragDropMode.InternalMove)
self.setDragDropOverwriteMode(False)
self.setAcceptDrops(True)
# Set our custom style - this draws the drop indicator across the whole row
self.setStyle(PlaylistStyle())
# We will enable dragging when rows are selected. Disabling it
# here means we can click and drag to select rows.
self.setDragEnabled(False)
# Prepare for context menu
self.menu = QMenu()
self.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
self.customContextMenuRequested.connect(self._context_menu)
# Connect signals
# This dancing is to satisfy mypy
h_header = self.horizontalHeader()
if isinstance(h_header, QHeaderView):
h_header.sectionResized.connect(self._column_resize)
h_header.setStretchLastSection(True)
# self.signals.set_next_track_signal.connect(self._reset_next)
self.signals = MusicMusterSignals()
self.signals.span_cells_signal.connect(self._span_cells)
# Call self.eventFilter() for events
# self.installEventFilter(self)
# Initialise miscellaneous instance variables
self.search_text: str = ""
self.sort_undo: List[int] = []
# self.edit_cell_type: Optional[int]
# Load playlist rows
self.setModel(PlaylistModel(playlist_id))
self._set_column_widths()
# ########## Events other than cell editing ##########
def dropEvent(self, event):
if event.source() is not self or (
event.dropAction() != Qt.DropAction.MoveAction
and self.dragDropMode() != QAbstractItemView.InternalMove
):
super().dropEvent(event)
from_rows = list(set([a.row() for a in self.selectedIndexes()]))
to_row = self.indexAt(event.position().toPoint()).row()
if (
0 <= min(from_rows) <= self.model().rowCount()
and 0 <= max(from_rows) <= self.model().rowCount()
and 0 <= to_row <= self.model().rowCount()
):
self.model().move_rows(from_rows, to_row)
# Reset drag mode to allow row selection by dragging
self.setDragEnabled(False)
# Deselect rows
self.clear_selection()
event.accept()
def _add_context_menu(
self,
text: str,
action: Callable,
disabled: bool = False,
parent_menu: Optional[QMenu] = None,
) -> Optional[QAction]:
"""
Add item to self.menu
"""
if parent_menu is None:
parent_menu = self.menu
menu_item = parent_menu.addAction(text)
if not menu_item:
return None
menu_item.setDisabled(disabled)
menu_item.triggered.connect(action)
return menu_item
def mouseReleaseEvent(self, event):
"""
Enable dragging if rows are selected
"""
if self.selectedIndexes():
self.setDragEnabled(True)
else:
self.setDragEnabled(False)
self.reset()
super().mouseReleaseEvent(event)
# # ########## Externally called functions ##########
def clear_selection(self) -> None:
"""Unselect all tracks and reset drag mode"""
self.clearSelection()
self.setDragEnabled(False)
def get_selected_row_number(self) -> Optional[int]:
"""
Return the selected row number or None if none selected.
"""
sm = self.selectionModel()
if sm and sm.hasSelection():
index = sm.currentIndex()
if index.isValid():
return index.row()
return None
def get_selected_playlistrow_ids(self) -> list:
"""
Return a list of PlaylistRow ids of the selected rows
"""
return [self._get_row_plr_id(a) for a in self._get_selected_rows()]
# def lookup_row_in_songfacts(self) -> None:
# """
# If there is a selected row and it is a track row,
# look up its title in songfacts.
# If multiple rows are selected, only consider the first one.
# Otherwise return.
# """
# self._look_up_row(website="songfacts")
# def lookup_row_in_wikipedia(self) -> None:
# """
# If there is a selected row and it is a track row,
# look up its title in wikipedia.
# If multiple rows are selected, only consider the first one.
# Otherwise return.
# """
# self._look_up_row(website="wikipedia")
# def scroll_current_to_top(self) -> None:
# """Scroll currently-playing row to top"""
# current_row = self._get_current_track_row_number()
# if current_row is not None:
# self._scroll_to_top(current_row)
# def scroll_next_to_top(self) -> None:
# """Scroll nextly-playing row to top"""
# next_row = self._get_next_track_row_number()
# if next_row is not None:
# self._scroll_to_top(next_row)
def set_search(self, text: str) -> None:
"""Set search text and find first match"""
self.search_text = text
if not text:
# Search string has been reset
return
self._search(next=True)
# def search_next(self) -> None:
# """
# Select next row containg self.search_string.
# """
# self._search(next=True)
# def search_previous(self) -> None:
# """
# Select previous row containg self.search_string.
# """
# self._search(next=False)
# def select_next_row(self) -> None:
# """
# Select next or first row. Don't select section headers.
# Wrap at last row.
# """
# selected_rows = self._get_selected_rows()
# # we will only handle zero or one selected rows
# if len(selected_rows) > 1:
# return
# # select first row if none selected
# if len(selected_rows) == 0:
# row_number = 0
# else:
# row_number = selected_rows[0] + 1
# if row_number >= self.rowCount():
# row_number = 0
# # Don't select section headers
# wrapped = False
# track_id = self._get_row_track_id(row_number)
# while not track_id:
# row_number += 1
# if row_number >= self.rowCount():
# if wrapped:
# # we're already wrapped once, so there are no
# # non-headers
# return
# row_number = 0
# wrapped = True
# track_id = self._get_row_track_id(row_number)
# self.selectRow(row_number)
# def select_previous_row(self) -> None:
# """
# Select previous or last track. Don't select section headers.
# Wrap at first row.
# """
# selected_rows = self._get_selected_rows()
# # we will only handle zero or one selected rows
# if len(selected_rows) > 1:
# return
# # select last row if none selected
# last_row = self.rowCount() - 1
# if len(selected_rows) == 0:
# row_number = last_row
# else:
# row_number = selected_rows[0] - 1
# if row_number < 0:
# row_number = last_row
# # Don't select section headers
# wrapped = False
# track_id = self._get_row_track_id(row_number)
# while not track_id:
# row_number -= 1
# if row_number < 0:
# if wrapped:
# # we're already wrapped once, so there are no
# # non-notes
# return
# row_number = last_row
# wrapped = True
# track_id = self._get_row_track_id(row_number)
# self.selectRow(row_number)
def set_row_as_next_track(self) -> None:
"""
Set selected row as next track
"""
selected_row = self._get_selected_row()
if selected_row is None:
return
model = cast(PlaylistModel, self.model())
model.set_next_row(selected_row)
self.clearSelection()
# # # ########## Internally called functions ##########
def _add_track(self, row_number: int) -> None:
"""Add a track to a section header making it a normal track row"""
with Session() as session:
dlg = TrackSelectDialog(
session=session,
new_row_number=row_number,
playlist_id=self.playlist_id,
add_to_header=True,
)
dlg.exec()
def _build_context_menu(self, item: QTableWidgetItem) -> None:
"""Used to process context (right-click) menu, which is defined here"""
self.menu.clear()
model = cast(PlaylistModel, self.model())
if not model:
return
row_number = item.row()
header_row = model.is_header_row(row_number)
track_row = not header_row
current_row = row_number == track_sequence.now.plr_rownum
next_row = row_number == track_sequence.next.plr_rownum
# Open in Audacity
if track_row and not current_row:
self._add_context_menu(
"Open in Audacity", lambda: model.open_in_audacity(row_number)
)
# Rescan
if track_row and not current_row:
self._add_context_menu("Rescan track", lambda: self._rescan(row_number))
# ----------------------
self.menu.addSeparator()
# Delete row
if not current_row and not next_row:
self._add_context_menu("Delete row", lambda: self._delete_rows())
# Remove track from row
if track_row and not current_row and not next_row:
self._add_context_menu(
"Remove track from row", lambda: model.remove_track(row_number)
)
# Add track to section header (ie, make this a track row)
# TODO
if header_row:
self._add_context_menu("Add a track", lambda: print("Add a track"))
# # ----------------------
self.menu.addSeparator()
# Mark unplayed
if track_row and model.is_unplayed_row(row_number):
self._add_context_menu(
"Mark unplayed", lambda: model.mark_unplayed(self._get_selected_rows())
)
# Unmark as next
if next_row:
self._add_context_menu(
"Unmark as next track", lambda: model.set_next_row(None)
)
# ----------------------
self.menu.addSeparator()
# Sort
sort_menu = self.menu.addMenu("Sort")
self._add_context_menu(
"by title",
lambda: model.sort_by_title(self._get_selected_rows()),
parent_menu=sort_menu,
)
self._add_context_menu(
"by artist",
lambda: model.sort_by_artist(self._get_selected_rows()),
parent_menu=sort_menu,
)
self._add_context_menu(
"by duration",
lambda: model.sort_by_duration(self._get_selected_rows()),
parent_menu=sort_menu,
)
self._add_context_menu(
"by last played",
lambda: model.sort_by_lastplayed(self._get_selected_rows()),
parent_menu=sort_menu,
)
# Info TODO
if track_row:
self._add_context_menu("Info", lambda: print("Track info"))
# Track path TODO
if track_row:
self._add_context_menu("Copy track path", lambda: print("Track path"))
def _calculate_end_time(
self, start: Optional[datetime], duration: int
) -> Optional[datetime]:
"""Return datetime 'duration' ms after 'start'"""
if start is None:
return None
return start + timedelta(milliseconds=duration)
def _column_resize(self, column_number: int, _old: int, _new: int) -> None:
"""
Called when column width changes. Save new width to database.
"""
header = self.horizontalHeader()
if not header:
return
# Resize rows if necessary
self.resizeRowsToContents()
with Session() as session:
attr_name = f"playlist_col_{column_number}_width"
record = Settings.get_int_settings(session, attr_name)
record.f_int = self.columnWidth(column_number)
def _context_menu(self, pos):
"""Display right-click menu"""
item = self.indexAt(pos)
self._build_context_menu(item)
self.menu.exec(self.mapToGlobal(pos))
def _copy_path(self, row_number: int) -> None:
"""
If passed row_number has a track, copy the track path, single-quoted,
to the clipboard. Otherwise, return None.
"""
track_path = self._get_row_track_path(row_number)
if not track_path:
return
replacements = [
("'", "\\'"),
(" ", "\\ "),
("(", "\\("),
(")", "\\)"),
]
for old, new in replacements:
track_path = track_path.replace(old, new)
cb = QApplication.clipboard()
cb.clear(mode=cb.Mode.Clipboard)
cb.setText(track_path, mode=cb.Mode.Clipboard)
def _delete_rows(self) -> None:
"""
Delete mutliple rows
Actions required:
- Confirm deletion should go ahead
- Pass to model to do the deed
"""
rows_to_delete = self._get_selected_rows()
row_count = len(rows_to_delete)
if row_count < 1:
return
# Get confirmation
plural = "s" if row_count > 1 else ""
if not ask_yes_no("Delete rows", f"Really delete {row_count} row{plural}?"):
return
model = cast(PlaylistModel, self.model())
model.delete_rows(self._get_selected_rows())
def _get_selected_row(self) -> Optional[int]:
"""
Return row_number number of first selected row,
or None if none selected
"""
sm = self.selectionModel()
if sm:
if sm.hasSelection():
return sm.selectedIndexes()[0].row()
return None
def _get_selected_rows(self) -> List[int]:
"""Return a list of selected row numbers sorted by row"""
# Use a set to deduplicate result (a selected row will have all
# items in that row selected)
return sorted(list(set([a.row() for a in self.selectedIndexes()])))
def _info_row(self, track_id: int) -> None:
"""Display popup with info re row"""
with Session() as session:
track = session.get(Tracks, track_id)
if track:
txt = (
f"Title: {track.title}\n"
f"Artist: {track.artist}\n"
f"Track ID: {track.id}\n"
f"Track duration: {ms_to_mmss(track.duration)}\n"
f"Track bitrate: {track.bitrate}\n"
f"Track fade at: {ms_to_mmss(track.fade_at)}\n"
f"Track silence at: {ms_to_mmss(track.silence_at)}"
"\n\n"
f"Path: {track.path}\n"
)
else:
txt = f"Can't find {track_id=}"
info: QMessageBox = QMessageBox(self)
info.setIcon(QMessageBox.Icon.Information)
info.setText(txt)
info.setStandardButtons(QMessageBox.StandardButton.Ok)
info.setDefaultButton(QMessageBox.StandardButton.Cancel)
info.exec()
def _look_up_row(self, website: str) -> None:
"""
If there is a selected row and it is a track row,
look up its title in the passed website
If multiple rows are selected, only consider the first one.
Otherwise return.
"""
print("playlists_v3:_look_up_row()")
return
# selected_row = self._get_selected_row()
# if not selected_row:
# return
# if not self._get_row_track_id(selected_row):
# return
# title = self._get_row_title(selected_row)
# if website == "wikipedia":
# QTimer.singleShot(
# 0, lambda: self.musicmuster.tabInfolist.open_in_wikipedia(title)
# )
# elif website == "songfacts":
# QTimer.singleShot(
# 0, lambda: self.musicmuster.tabInfolist.open_in_songfacts(title)
# )
# else:
# return
def _obs_change_scene(self, current_row: int) -> None:
"""
Try to change OBS scene to the name passed
"""
check_row = current_row
while True:
# If we have a note and it has a scene change command,
# execute it
note_text = self._get_row_note(check_row)
if note_text:
match_obj = scene_change_re.search(note_text)
if match_obj:
scene_name = match_obj.group(1)
if scene_name:
try:
cl = obs.ReqClient(
host=Config.OBS_HOST,
port=Config.OBS_PORT,
password=Config.OBS_PASSWORD,
)
except ConnectionRefusedError:
log.error("OBS connection refused")
return
try:
cl.set_current_program_scene(scene_name)
log.info(f"OBS scene changed to '{scene_name}'")
return
except obs.error.OBSSDKError as e:
log.error(f"OBS SDK error ({e})")
return
# After current track row, only check header rows and stop
# at first non-header row
check_row -= 1
if check_row < 0:
break
if self._get_row_track_id(check_row):
break
def _open_in_audacity(self, row_number: int) -> None:
"""Open track in Audacity. Audacity must be already running"""
track_path = self._get_row_track_path(row_number)
if not track_path:
log.error(
f"{self.playlist_id=} "
f"playlists._open_in_audactity({row_number=}): "
"track_path not set"
)
return
open_in_audacity(track_path)
def _rescan(self, row_number: int) -> None:
"""Rescan track"""
model = cast(PlaylistModel, self.model())
model.rescan_track(row_number)
self.clear_selection()
def _reset_next(self, old_plrid: int, new_plrid: int) -> None:
"""
Called when set_next_track_signal signal received.
Actions required:
- If old_plrid points to this playlist:
- Remove existing next track
- If new_plrid points to this playlist:
- Set track as next
- Display row as next track
- Update start/stop times
"""
with Session() as session:
# Get plrs
old_plr = new_plr = None
if old_plrid:
old_plr = session.get(PlaylistRows, old_plrid)
# Unmark next track
if old_plr and old_plr.playlist_id == self.playlist_id:
self._set_row_colour_default(old_plr.plr_rownum)
# Mark next track
if new_plrid:
new_plr = session.get(PlaylistRows, new_plrid)
if not new_plr:
log.error(f"_reset_next({new_plrid=}): plr not found")
return
if new_plr.playlist_id == self.playlist_id:
self._set_row_colour_next(new_plr.plr_rownum)
# Update start/stop times
self._update_start_end_times(session)
self.clear_selection()
def _run_subprocess(self, args):
"""Run args in subprocess"""
subprocess.call(args)
def _scroll_to_top(self, row_number: int) -> None:
"""
Scroll to put passed row_number Config.SCROLL_TOP_MARGIN from the
top.
"""
if row_number is None:
return
padding_required = Config.SCROLL_TOP_MARGIN
top_row = row_number
if row_number > Config.SCROLL_TOP_MARGIN:
# We can't scroll to a hidden row. Calculate target_row as
# the one that is ideal to be at the top. Then count upwards
# from passed row_number until we either reach the target,
# pass it or reach row_number 0.
for i in range(row_number - 1, -1, -1):
if self.isRowHidden(i):
continue
if padding_required == 0:
break
top_row = i
padding_required -= 1
scroll_item = self.item(top_row, 0)
self.scrollToItem(scroll_item, QAbstractItemView.ScrollHint.PositionAtTop)
def _search(self, next: bool = True) -> None:
"""
Select next/previous row containg self.search_string. Start from
top selected row if there is one, else from top.
Wrap at last/first row.
"""
if not self.search_text:
return
selected_row = self._get_selected_row()
if next:
if selected_row is not None and selected_row < self.rowCount() - 1:
starting_row = selected_row + 1
else:
starting_row = 0
else:
if selected_row is not None and selected_row > 0:
starting_row = selected_row - 1
else:
starting_row = self.rowCount() - 1
wrapped = False
match_row = None
row_number = starting_row
needle = self.search_text.lower()
while True:
# Check for match in title, artist or notes
title = self._get_row_title(row_number)
if title and needle in title.lower():
match_row = row_number
break
artist = self._get_row_artist(row_number)
if artist and needle in artist.lower():
match_row = row_number
break
note = self._get_row_note(row_number)
if note and needle in note.lower():
match_row = row_number
break
if next:
row_number += 1
if wrapped and row_number >= starting_row:
break
if row_number >= self.rowCount():
row_number = 0
wrapped = True
else:
row_number -= 1
if wrapped and row_number <= starting_row:
break
if row_number < 0:
row_number = self.rowCount() - 1
wrapped = True
if match_row is not None:
self.selectRow(row_number)
def selectionChanged(self, selected: QItemSelection, deselected: QItemSelection) -> None:
"""
Toggle drag behaviour according to whether rows are selected
"""
selected_rows = self._get_selected_rows()
# If no rows are selected, we have nothing to do
if len(selected_rows) == 0:
self.musicmuster.lblSumPlaytime.setText("")
else:
model = cast(PlaylistModel, self.model())
selected_duration = model.get_rows_duration(self._get_selected_rows())
if selected_duration > 0:
self.musicmuster.lblSumPlaytime.setText(
f"Selected duration: {ms_to_mmss(selected_duration)}"
)
else:
self.musicmuster.lblSumPlaytime.setText("")
super().selectionChanged(selected, deselected)
def _set_column_widths(self) -> None:
"""Column widths from settings"""
header = self.horizontalHeader()
if not header:
return
# Set width of last column to zero as it's set to stretch
self.setColumnWidth(header.count() - 1, 0)
# Set remaining column widths from settings
with Session() as session:
for column_number in range(header.count() - 1):
attr_name = f"playlist_col_{column_number}_width"
record = Settings.get_int_settings(session, attr_name)
if record.f_int is not None:
self.setColumnWidth(column_number, record.f_int)
else:
self.setColumnWidth(column_number, Config.DEFAULT_COLUMN_WIDTH)
def _set_row_note_colour(self, session: scoped_session, row_number: int) -> None:
"""
Set row note colour
"""
# Sanity check: this should be a track row and thus have a
# track associated
if not self._get_row_track_id(row_number):
if os.environ["MM_ENV"] == "PRODUCTION":
send_mail(
Config.ERRORS_TO,
Config.ERRORS_FROM,
"playlists:_set_row_note_colour() on header row",
stackprinter.format(),
)
# stackprinter.show(add_summary=True, style="darkbg")
print(f"playists:_set_row_note_colour() called on track row ({row_number=}")
return
# Set colour
note_text = self._get_row_note(row_number)
note_colour = NoteColours.get_colour(session, note_text)
self._set_cell_colour(row_number, ROW_NOTES, note_colour)
def _span_cells(self, row: int, column: int, rowSpan: int, columnSpan: int) -> None:
"""
Implement spanning of cells, initiated by signal
"""
# Don't set spanning if already in place because that is seen as
# a change to the view and thus it refreshes the data which
# again calls us here.
if (
self.rowSpan(row, column) == rowSpan
and self.columnSpan(row, column) == columnSpan
):
return
self.setSpan(row, column, rowSpan, columnSpan)