Compare commits

...

18 Commits

Author SHA1 Message Date
Keith Edmunds
0ea12eb9d9 Clean up merge from dev 2025-03-30 12:05:41 +01:00
Keith Edmunds
75cc7a3f19 Merge changes from dev 2025-03-30 11:56:24 +01:00
Keith Edmunds
f64671d126 Improve function logging
Use @log_call decorator

Add 'checked' parameter to menu slots because PyQt6 will pass a
boolean 'checked' parameter even when the menu item can't be checked.

Remove superfluous logging calls.
2025-03-29 21:04:59 +00:00
Keith Edmunds
2bf1bc64e7 WIP: Unify function to move rows within/between playlists 2025-03-29 18:20:38 +00:00
Keith Edmunds
3c7fc20e5a Remove kae.py from git 2025-03-29 18:20:38 +00:00
Keith Edmunds
52d2269ece WIP: Move rows within and between playlists working 2025-03-29 18:20:38 +00:00
Keith Edmunds
3cd764c893 WIP: moving rows within playlist works 2025-03-29 18:20:38 +00:00
Keith Edmunds
65878b0b75 WIP: all tests for move rows within playlist working 2025-03-29 18:20:38 +00:00
Keith Edmunds
4e89d72a8f WIP: move within playlist tests working 2025-03-29 18:20:38 +00:00
Keith Edmunds
92ecb632b5 Report correct line for ApplicationError 2025-03-29 18:20:38 +00:00
Keith Edmunds
6296566c2c WIP: Can play tracks without errors 2025-03-29 18:20:38 +00:00
Keith Edmunds
7e5b170f5e Use @singleton decorator 2025-03-29 18:20:38 +00:00
Keith Edmunds
3db71a08ae WIP remove sessions, use reporistory 2025-03-29 18:20:38 +00:00
Keith Edmunds
7b0e2b2c6c WIP: playlists load, can't play track 2025-03-29 18:20:38 +00:00
Keith Edmunds
4265472d73 Keep track of selected rows in model 2025-03-29 18:20:38 +00:00
Keith Edmunds
c94cadf24f WIP: Use PlaylistRowDTO to isolate SQLAlchemy objects 2025-03-29 18:20:38 +00:00
Keith Edmunds
9720c11ecc Don't track kae.py in git 2025-03-29 18:20:13 +00:00
Keith Edmunds
ca4c490091 Add log_call decorator and issue 287 logging 2025-03-29 18:19:14 +00:00
19 changed files with 2534 additions and 1572 deletions

1
.gitignore vendored
View File

@ -14,3 +14,4 @@ StudioPlaylist.png
tmp/ tmp/
.coverage .coverage
profile_output* profile_output*
kae.py

View File

@ -1,7 +1,7 @@
# Standard library imports # Standard library imports
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
import datetime as dt
from enum import auto, Enum from enum import auto, Enum
import functools import functools
import threading import threading
@ -91,31 +91,6 @@ class Filter:
duration_unit: str = "minutes" duration_unit: str = "minutes"
@singleton
@dataclass
class MusicMusterSignals(QObject):
"""
Class for all MusicMuster signals. See:
- https://zetcode.com/gui/pyqt5/eventssignals/
- https://stackoverflow.com/questions/62654525/emit-a-signal-from-another-class-to-main-class
"""
begin_reset_model_signal = pyqtSignal(int)
enable_escape_signal = pyqtSignal(bool)
end_reset_model_signal = pyqtSignal(int)
next_track_changed_signal = pyqtSignal()
resize_rows_signal = pyqtSignal(int)
search_songfacts_signal = pyqtSignal(str)
search_wikipedia_signal = pyqtSignal(str)
show_warning_signal = pyqtSignal(str, str)
span_cells_signal = pyqtSignal(int, int, int, int, int)
status_message_signal = pyqtSignal(str, int)
track_ended_signal = pyqtSignal()
def __post_init__(self):
super().__init__()
class PlaylistStyle(QProxyStyle): class PlaylistStyle(QProxyStyle):
def drawPrimitive(self, element, option, painter, widget=None): def drawPrimitive(self, element, option, painter, widget=None):
""" """
@ -149,6 +124,89 @@ class Tags(NamedTuple):
duration: int = 0 duration: int = 0
@dataclass
class PlaylistDTO:
name: str
playlist_id: int
favourite: bool = False
is_template: bool = False
open: bool = False
@dataclass
class TrackDTO:
track_id: int
artist: str
bitrate: int
duration: int
fade_at: int
intro: int | None
path: str
silence_at: int
start_gap: int
title: str
lastplayed: dt.datetime | None
@dataclass
class PlaylistRowDTO(TrackDTO):
note: str
played: bool
playlist_id: int
playlistrow_id: int
row_number: int
class TrackInfo(NamedTuple): class TrackInfo(NamedTuple):
track_id: int track_id: int
row_number: int row_number: int
# Classes for signals
@dataclass
class InsertRows:
playlist_id: int
from_row: int
to_row: int
@dataclass
class InsertTrack:
playlist_id: int
track_id: int | None
note: str
@singleton
@dataclass
class MusicMusterSignals(QObject):
"""
Class for all MusicMuster signals. See:
- https://zetcode.com/gui/pyqt5/eventssignals/
- https://stackoverflow.com/questions/62654525/emit-a-signal-from-another-class-to-main-class
"""
begin_reset_model_signal = pyqtSignal(int)
enable_escape_signal = pyqtSignal(bool)
end_reset_model_signal = pyqtSignal(int)
next_track_changed_signal = pyqtSignal()
resize_rows_signal = pyqtSignal(int)
search_songfacts_signal = pyqtSignal(str)
search_wikipedia_signal = pyqtSignal(str)
show_warning_signal = pyqtSignal(str, str)
signal_add_track_to_header = pyqtSignal(int, int)
signal_begin_insert_rows = pyqtSignal(InsertRows)
signal_end_insert_rows = pyqtSignal(int)
signal_insert_track = pyqtSignal(InsertTrack)
signal_playlist_selected_rows = pyqtSignal(int, list)
signal_set_next_row = pyqtSignal(int)
# signal_set_next_track takes a PlaylistRow as an argument. We can't
# specify that here as it requires us to import PlaylistRow from
# playlistrow.py, which itself imports MusicMusterSignals
signal_set_next_track = pyqtSignal(object)
span_cells_signal = pyqtSignal(int, int, int, int, int)
status_message_signal = pyqtSignal(str, int)
track_ended_signal = pyqtSignal()
def __post_init__(self):
super().__init__()

View File

@ -112,6 +112,8 @@ class Config(object):
PLAYLIST_ICON_CURRENT = ":/icons/green-circle.png" PLAYLIST_ICON_CURRENT = ":/icons/green-circle.png"
PLAYLIST_ICON_NEXT = ":/icons/yellow-circle.png" PLAYLIST_ICON_NEXT = ":/icons/yellow-circle.png"
PLAYLIST_ICON_TEMPLATE = ":/icons/redstar.png" PLAYLIST_ICON_TEMPLATE = ":/icons/redstar.png"
PLAYLIST_PENDING_MOVE = -1
PLAYLIST_FAILED_MOVE = -2
PREVIEW_ADVANCE_MS = 5000 PREVIEW_ADVANCE_MS = 5000
PREVIEW_BACK_MS = 5000 PREVIEW_BACK_MS = 5000
PREVIEW_END_BUFFER_MS = 1000 PREVIEW_END_BUFFER_MS = 1000

View File

@ -9,12 +9,22 @@ from PyQt6.QtWidgets import (
QListWidgetItem, QListWidgetItem,
QMainWindow, QMainWindow,
) )
from PyQt6.QtWidgets import (
QDialog,
QHBoxLayout,
QLabel,
QLineEdit,
QListWidget,
QListWidgetItem,
QPushButton,
QVBoxLayout,
)
# Third party imports # Third party imports
from sqlalchemy.orm.session import Session from sqlalchemy.orm.session import Session
# App imports # App imports
from classes import MusicMusterSignals from classes import ApplicationError, InsertTrack, MusicMusterSignals
from helpers import ( from helpers import (
ask_yes_no, ask_yes_no,
get_relative_date, get_relative_date,
@ -23,209 +33,153 @@ from helpers import (
from log import log from log import log
from models import Settings, Tracks from models import Settings, Tracks
from playlistmodel import PlaylistModel from playlistmodel import PlaylistModel
import repository
from ui import dlg_TrackSelect_ui from ui import dlg_TrackSelect_ui
class TrackSelectDialog(QDialog): class TrackInsertDialog(QDialog):
"""Select track from database"""
def __init__( def __init__(
self, self,
parent: QMainWindow, parent: QMainWindow,
session: Session, playlist_id: int,
new_row_number: int,
base_model: PlaylistModel,
add_to_header: Optional[bool] = False, add_to_header: Optional[bool] = False,
*args: Qt.WindowType,
**kwargs: Qt.WindowType,
) -> None: ) -> None:
""" """
Subclassed QDialog to manage track selection Subclassed QDialog to manage track selection
""" """
super().__init__(parent, *args, **kwargs) super().__init__(parent)
self.session = session self.playlist_id = playlist_id
self.new_row_number = new_row_number
self.base_model = base_model
self.add_to_header = add_to_header self.add_to_header = add_to_header
self.ui = dlg_TrackSelect_ui.Ui_Dialog() self.setWindowTitle("Insert Track")
self.ui.setupUi(self)
self.ui.btnAdd.clicked.connect(self.add_selected)
self.ui.btnAddClose.clicked.connect(self.add_selected_and_close)
self.ui.btnClose.clicked.connect(self.close)
self.ui.matchList.itemDoubleClicked.connect(self.add_selected)
self.ui.matchList.itemSelectionChanged.connect(self.selection_changed)
self.ui.radioTitle.toggled.connect(self.title_artist_toggle)
self.ui.searchString.textEdited.connect(self.chars_typed)
self.track: Optional[Tracks] = None
self.signals = MusicMusterSignals()
record = Settings.get_setting(self.session, "dbdialog_width") # Title input on one line
width = record.f_int or 800 self.title_label = QLabel("Title:")
record = Settings.get_setting(self.session, "dbdialog_height") self.title_edit = QLineEdit()
height = record.f_int or 600 self.title_edit.textChanged.connect(self.update_list)
self.resize(width, height)
if add_to_header: title_layout = QHBoxLayout()
self.ui.lblNote.setVisible(False) title_layout.addWidget(self.title_label)
self.ui.txtNote.setVisible(False) title_layout.addWidget(self.title_edit)
def add_selected(self) -> None: # Track list
"""Handle Add button""" self.track_list = QListWidget()
self.track_list.itemDoubleClicked.connect(self.add_clicked)
self.track_list.itemSelectionChanged.connect(self.selection_changed)
track = None # Note input on one line
self.note_label = QLabel("Note:")
self.note_edit = QLineEdit()
if self.ui.matchList.selectedItems(): note_layout = QHBoxLayout()
item = self.ui.matchList.currentItem() note_layout.addWidget(self.note_label)
if item: note_layout.addWidget(self.note_edit)
track = item.data(Qt.ItemDataRole.UserRole)
note = self.ui.txtNote.text() # Track path
self.path = QLabel()
path_layout = QHBoxLayout()
path_layout.addWidget(self.path)
if not (track or note): # Buttons
self.add_btn = QPushButton("Add")
self.add_close_btn = QPushButton("Add and close")
self.close_btn = QPushButton("Close")
self.add_btn.clicked.connect(self.add_clicked)
self.add_close_btn.clicked.connect(self.add_and_close_clicked)
self.close_btn.clicked.connect(self.close)
btn_layout = QHBoxLayout()
btn_layout.addWidget(self.add_btn)
btn_layout.addWidget(self.add_close_btn)
btn_layout.addWidget(self.close_btn)
# Main layout
layout = QVBoxLayout()
layout.addLayout(title_layout)
layout.addWidget(self.track_list)
layout.addLayout(note_layout)
layout.addLayout(path_layout)
layout.addLayout(btn_layout)
self.setLayout(layout)
self.resize(800, 600)
# TODO
# record = Settings.get_setting(self.session, "dbdialog_width")
# width = record.f_int or 800
# record = Settings.get_setting(self.session, "dbdialog_height")
# height = record.f_int or 600
# self.resize(width, height)
def update_list(self, text: str) -> None:
self.track_list.clear()
if text.strip() == "":
# Do not search or populate list if input is empty
return return
track_id = None if text.startswith("a/") and len(text) > 2:
if track: self.tracks = repository.tracks_like_artist(text[2:])
track_id = track.id else:
self.tracks = repository.tracks_like_title(text)
if note and not track_id: for track in self.tracks:
self.base_model.insert_row(self.new_row_number, track_id, note) duration_str = ms_to_mmss(track.duration)
self.ui.txtNote.clear() last_played_str = get_relative_date(track.lastplayed)
self.new_row_number += 1 item_str = (
f"{track.title} - {track.artist} [{duration_str}] {last_played_str}"
)
item = QListWidgetItem(item_str)
item.setData(Qt.ItemDataRole.UserRole, track.track_id)
self.track_list.addItem(item)
def get_selected_track_id(self) -> int | None:
selected_items = self.track_list.selectedItems()
if selected_items:
return selected_items[0].data(Qt.ItemDataRole.UserRole)
return None
def add_clicked(self):
track_id = self.get_selected_track_id()
note_text = self.note_edit.text()
if track_id is None and not note_text:
return return
self.ui.txtNote.clear() insert_track_data = InsertTrack(self.playlist_id, track_id, note_text)
self.select_searchtext() self.signals.signal_insert_track.emit(insert_track_data)
if track_id is None: self.title_edit.clear()
log.error("track_id is None and should not be") self.note_edit.clear()
return self.track_list.clear()
self.title_edit.setFocus()
# Check whether track is already in playlist
move_existing = False
existing_prd = self.base_model.is_track_in_playlist(track_id)
if existing_prd is not None:
if ask_yes_no(
"Duplicate row",
"Track already in playlist. " "Move to new location?",
default_yes=True,
):
move_existing = True
if self.add_to_header: if self.add_to_header:
if move_existing and existing_prd: # "and existing_prd" for mypy's benefit
self.base_model.move_track_to_header(
self.new_row_number, existing_prd, note
)
else:
self.base_model.add_track_to_header(self.new_row_number, track_id)
# Close dialog - we can only add one track to a header
self.accept() self.accept()
else:
# Adding a new track row
if move_existing and existing_prd: # "and existing_prd" for mypy's benefit
self.base_model.move_track_add_note(
self.new_row_number, existing_prd, note
)
else:
self.base_model.insert_row(self.new_row_number, track_id, note)
self.new_row_number += 1 def add_and_close_clicked(self):
track_id = self.get_selected_track_id()
def add_selected_and_close(self) -> None: if track_id is not None:
"""Handle Add and Close button""" note_text = self.note_edit.text()
insert_track_data = InsertTrack(
self.add_selected() playlist_id=self.playlist_id, track_id=self.track_id, note=self.note
)
insert_track_data = InsertTrack(self.playlist_id, track_id, note_text)
self.signals.signal_insert_track.emit(insert_track_data)
self.accept() self.accept()
def chars_typed(self, s: str) -> None:
"""Handle text typed in search box"""
self.ui.matchList.clear()
if len(s) > 0:
if s.startswith("a/") and len(s) > 2:
matches = Tracks.search_artists(self.session, "%" + s[2:])
elif self.ui.radioTitle.isChecked():
matches = Tracks.search_titles(self.session, "%" + s)
else:
matches = Tracks.search_artists(self.session, "%" + s)
if matches:
for track in matches:
last_played = None
last_playdate = max(
track.playdates, key=lambda p: p.lastplayed, default=None
)
if last_playdate:
last_played = last_playdate.lastplayed
t = QListWidgetItem()
track_text = (
f"{track.title} - {track.artist} "
f"[{ms_to_mmss(track.duration)}] "
f"({get_relative_date(last_played)})"
)
t.setText(track_text)
t.setData(Qt.ItemDataRole.UserRole, track)
self.ui.matchList.addItem(t)
def closeEvent(self, event: Optional[QEvent]) -> None:
"""
Override close and save dialog coordinates
"""
if not event:
return
record = Settings.get_setting(self.session, "dbdialog_height")
record.f_int = self.height()
record = Settings.get_setting(self.session, "dbdialog_width")
record.f_int = self.width()
self.session.commit()
event.accept()
def keyPressEvent(self, event: QKeyEvent | None) -> None:
"""
Clear selection on ESC if there is one
"""
if event and event.key() == Qt.Key.Key_Escape:
if self.ui.matchList.selectedItems():
self.ui.matchList.clearSelection()
return
super(TrackSelectDialog, self).keyPressEvent(event)
def select_searchtext(self) -> None:
"""Select the searchbox"""
self.ui.searchString.selectAll()
self.ui.searchString.setFocus()
def selection_changed(self) -> None: def selection_changed(self) -> None:
"""Display selected track path in dialog box""" """Display selected track path in dialog box"""
if not self.ui.matchList.selectedItems(): self.path.setText("")
track_id = self.get_selected_track_id()
if track_id is None:
return return
item = self.ui.matchList.currentItem() tracklist = [t for t in self.tracks if t.track_id == track_id]
track = item.data(Qt.ItemDataRole.UserRole) if not tracklist:
last_playdate = max(track.playdates, key=lambda p: p.lastplayed, default=None) return
if last_playdate: if len(tracklist) > 1:
last_played = last_playdate.lastplayed raise ApplicationError("More than one track returned")
else: track = tracklist[0]
last_played = None
path_text = f"{track.path} ({get_relative_date(last_played)})"
self.ui.dbPath.setText(path_text) self.path.setText(track.path)
def title_artist_toggle(self) -> None:
"""
Handle switching between searching for artists and searching for
titles
"""
# Logic is handled already in chars_typed(), so just call that.
self.chars_typed(self.ui.searchString.text())

View File

@ -32,6 +32,7 @@ from classes import (
MusicMusterSignals, MusicMusterSignals,
singleton, singleton,
Tags, Tags,
TrackDTO,
) )
from config import Config from config import Config
from helpers import ( from helpers import (
@ -40,8 +41,7 @@ from helpers import (
show_OK, show_OK,
) )
from log import log from log import log
from models import db, Tracks from playlistrow import TrackSequence
from music_manager import track_sequence
from playlistmodel import PlaylistModel from playlistmodel import PlaylistModel
import helpers import helpers
@ -104,16 +104,14 @@ class FileImporter:
# variable or an instance variable are effectively the same thing. # variable or an instance variable are effectively the same thing.
workers: dict[str, DoTrackImport] = {} workers: dict[str, DoTrackImport] = {}
def __init__( def __init__(self, base_model: PlaylistModel, row_number: int) -> None:
self, base_model: PlaylistModel, row_number: Optional[int] = None
) -> None:
""" """
Initialise the FileImporter singleton instance. Initialise the FileImporter singleton instance.
""" """
log.debug(f"FileImporter.__init__({base_model=}, {row_number=})")
# Create ModelData # Create ModelData
if not row_number:
row_number = base_model.rowCount()
self.model_data = ThreadData(base_model=base_model, row_number=row_number) self.model_data = ThreadData(base_model=base_model, row_number=row_number)
# Data structure to track files to import # Data structure to track files to import
@ -122,13 +120,7 @@ class FileImporter:
# Get signals # Get signals
self.signals = MusicMusterSignals() self.signals = MusicMusterSignals()
def _get_existing_tracks(self) -> Sequence[Tracks]: self.existing_tracks: list[TrackDTO] = []
"""
Return a list of all existing Tracks
"""
with db.Session() as session:
return Tracks.get_all(session)
def start(self) -> None: def start(self) -> None:
""" """
@ -148,7 +140,7 @@ class FileImporter:
# Refresh list of existing tracks as they may have been updated # Refresh list of existing tracks as they may have been updated
# by previous imports # by previous imports
self.existing_tracks = self._get_existing_tracks() self.existing_tracks = repository.get_all_tracks()
for infile in [ for infile in [
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f) os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
@ -701,6 +693,7 @@ class PickMatch(QDialog):
self.setWindowTitle("New or replace") self.setWindowTitle("New or replace")
layout = QVBoxLayout() layout = QVBoxLayout()
track_sequence = TrackSequence()
# Add instructions # Add instructions
instructions = ( instructions = (

View File

@ -20,7 +20,7 @@ from pydub.utils import mediainfo
from tinytag import TinyTag, TinyTagException # type: ignore from tinytag import TinyTag, TinyTagException # type: ignore
# App imports # App imports
from classes import AudioMetadata, ApplicationError, Tags from classes import AudioMetadata, ApplicationError, Tags, TrackDTO
from config import Config from config import Config
from log import log from log import log
from models import Tracks from models import Tracks
@ -168,7 +168,7 @@ def get_name(prompt: str, default: str = "") -> str | None:
def get_relative_date( def get_relative_date(
past_date: Optional[dt.datetime], reference_date: Optional[dt.datetime] = None past_date: Optional[dt.datetime], now: Optional[dt.datetime] = None
) -> str: ) -> str:
""" """
Return how long before reference_date past_date is as string. Return how long before reference_date past_date is as string.
@ -182,31 +182,33 @@ def get_relative_date(
if not past_date or past_date == Config.EPOCH: if not past_date or past_date == Config.EPOCH:
return "Never" return "Never"
if not reference_date: if not now:
reference_date = dt.datetime.now() now = dt.datetime.now()
# Check parameters # Check parameters
if past_date > reference_date: if past_date > now:
return "get_relative_date() past_date is after relative_date" raise ApplicationError("get_relative_date() past_date is after relative_date")
days: int delta = now - past_date
days_str: str days = delta.days
weeks: int
weeks_str: str
weeks, days = divmod((reference_date.date() - past_date.date()).days, 7) if days == 0:
if weeks == days == 0: return "(Today)"
# Same day so return time instead elif days == 1:
return Config.LAST_PLAYED_TODAY_STRING + " " + past_date.strftime("%H:%M") return "(Yesterday)"
if weeks == 1:
weeks_str = "week" years, days_remain = divmod(days, 365)
else: months, days_final = divmod(days_remain, 30)
weeks_str = "weeks"
if days == 1: parts = []
days_str = "day" if years:
else: parts.append(f"{years}y")
days_str = "days" if months:
return f"{weeks} {weeks_str}, {days} {days_str}" parts.append(f"{months}m")
if days_final:
parts.append(f"{days_final}d")
formatted = " ".join(parts)
return f"({formatted} ago)"
def get_tags(path: str) -> Tags: def get_tags(path: str) -> Tags:
@ -264,39 +266,15 @@ def leading_silence(
return min(trim_ms, len(audio_segment)) return min(trim_ms, len(audio_segment))
def ms_to_mmss( def ms_to_mmss(ms: int | None, none: str = "-") -> str:
ms: Optional[int],
decimals: int = 0,
negative: bool = False,
none: Optional[str] = None,
) -> str:
"""Convert milliseconds to mm:ss""" """Convert milliseconds to mm:ss"""
minutes: int if ms is None:
remainder: int return none
seconds: float
if not ms: minutes, seconds = divmod(ms // 1000, 60)
if none:
return none
else:
return "-"
sign = ""
if ms < 0:
if negative:
sign = "-"
else:
ms = 0
minutes, remainder = divmod(ms, 60 * 1000) return f"{minutes}:{seconds:02d}"
seconds = remainder / 1000
# if seconds >= 59.5, it will be represented as 60, which looks odd.
# So, fake it under those circumstances
if seconds >= 59.5:
seconds = 59.0
return f"{sign}{minutes:.0f}:{seconds:02.{decimals}f}"
def normalise_track(path: str) -> None: def normalise_track(path: str) -> None:

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# Standard library imports # Standard library imports
from collections import defaultdict from collections import defaultdict
from functools import wraps
import logging import logging
import logging.config import logging.config
import logging.handlers import logging.handlers
@ -79,9 +80,22 @@ log = logging.getLogger(Config.LOG_NAME)
def handle_exception(exc_type, exc_value, exc_traceback): def handle_exception(exc_type, exc_value, exc_traceback):
error = str(exc_value) """
Inform user of exception
"""
# Navigate to the inner stack frame
tb = exc_traceback
while tb.tb_next:
tb = tb.tb_next
fname = os.path.basename(tb.tb_frame.f_code.co_filename)
lineno = tb.tb_lineno
msg = f"ApplicationError: {exc_value}\nat {fname}:{lineno}"
logmsg = f"ApplicationError: {exc_value} at {fname}:{lineno}"
if issubclass(exc_type, ApplicationError): if issubclass(exc_type, ApplicationError):
log.error(error) log.error(logmsg)
else: else:
# Handle unexpected errors (log and display) # Handle unexpected errors (log and display)
error_msg = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback)) error_msg = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
@ -104,8 +118,33 @@ def handle_exception(exc_type, exc_value, exc_traceback):
) )
if QApplication.instance() is not None: if QApplication.instance() is not None:
fname = os.path.split(exc_traceback.tb_frame.f_code.co_filename)[1] fname = os.path.split(exc_traceback.tb_frame.f_code.co_filename)[1]
msg = f"ApplicationError: {error}\nat {fname}:{exc_traceback.tb_lineno}"
QMessageBox.critical(None, "Application Error", msg) QMessageBox.critical(None, "Application Error", msg)
def truncate_large(obj, limit=5):
"""Helper to truncate large lists or other iterables."""
if isinstance(obj, (list, tuple, set)):
if len(obj) > limit:
return f"{type(obj).__name__}(len={len(obj)}, items={list(obj)[:limit]}...)"
return repr(obj)
def log_call(func):
@wraps(func)
def wrapper(*args, **kwargs):
args_repr = [truncate_large(a) for a in args]
kwargs_repr = [f"{k}={truncate_large(v)}" for k, v in kwargs.items()]
params_repr = ", ".join(args_repr + kwargs_repr)
log.debug(f"call {func.__name__}({params_repr})", stacklevel=2)
try:
result = func(*args, **kwargs)
log.debug(f"return {func.__name__}: {truncate_large(result)}", stacklevel=2)
return result
except Exception as e:
log.debug(f"exception in {func.__name__}: {e}", stacklevel=2)
raise
return wrapper
sys.excepthook = handle_exception sys.excepthook = handle_exception

View File

@ -241,7 +241,9 @@ class Playlists(dbtables.PlaylistsTable):
""" """
session.execute( session.execute(
update(Playlists).where((Playlists.id.in_(playlist_ids))).values(tab=None) update(Playlists)
.where(Playlists.id.in_(playlist_ids))
.values(tab=None)
) )
def close(self, session: Session) -> None: def close(self, session: Session) -> None:
@ -395,6 +397,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
given playlist_id and row given playlist_id and row
""" """
# TODO: use selectinload?
stmt = ( stmt = (
select(PlaylistRows) select(PlaylistRows)
.options(joinedload(cls.track)) .options(joinedload(cls.track))
@ -435,24 +438,6 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
) )
) )
@staticmethod
def fixup_rownumbers(session: Session, playlist_id: int) -> None:
"""
Ensure the row numbers for passed playlist have no gaps
"""
plrs = session.scalars(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
).all()
for i, plr in enumerate(plrs):
plr.row_number = i
# Ensure new row numbers are available to the caller
session.commit()
@classmethod @classmethod
def plrids_to_plrs( def plrids_to_plrs(
cls, session: Session, playlist_id: int, plr_ids: list[int] cls, session: Session, playlist_id: int, plr_ids: list[int]

View File

@ -3,32 +3,23 @@ from __future__ import annotations
import datetime as dt import datetime as dt
from time import sleep from time import sleep
from typing import Optional
# Third party imports # Third party imports
# import line_profiler # import line_profiler
import numpy as np
import pyqtgraph as pg # type: ignore
from sqlalchemy.orm.session import Session
import vlc # type: ignore import vlc # type: ignore
# PyQt imports # PyQt imports
from PyQt6.QtCore import ( from PyQt6.QtCore import (
pyqtSignal, pyqtSignal,
QObject,
QThread, QThread,
) )
from pyqtgraph import PlotWidget
from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore
from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore
# App imports # App imports
from classes import ApplicationError, MusicMusterSignals from classes import singleton
from config import Config from config import Config
import helpers import helpers
from log import log from log import log
from models import PlaylistRows
from vlcmanager import VLCManager
# Define the VLC callback function type # Define the VLC callback function type
# import ctypes # import ctypes
@ -63,106 +54,6 @@ from vlcmanager import VLCManager
# libc.vsnprintf.restype = ctypes.c_int # libc.vsnprintf.restype = ctypes.c_int
class _AddFadeCurve(QObject):
"""
Initialising a fade curve introduces a noticeable delay so carry out in
a thread.
"""
finished = pyqtSignal()
def __init__(
self,
rat: RowAndTrack,
track_path: str,
track_fade_at: int,
track_silence_at: int,
) -> None:
super().__init__()
self.rat = rat
self.track_path = track_path
self.track_fade_at = track_fade_at
self.track_silence_at = track_silence_at
def run(self) -> None:
"""
Create fade curve and add to PlaylistTrack object
"""
fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
if not fc:
log.error(f"Failed to create FadeCurve for {self.track_path=}")
else:
self.rat.fade_graph = fc
self.finished.emit()
class _FadeCurve:
GraphWidget: Optional[PlotWidget] = None
def __init__(
self, track_path: str, track_fade_at: int, track_silence_at: int
) -> None:
"""
Set up fade graph array
"""
audio = helpers.get_audio_segment(track_path)
if not audio:
log.error(f"FadeCurve: could not get audio for {track_path=}")
return None
# Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
# milliseconds before fade starts to silence
self.start_ms: int = max(
0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.end_ms: int = track_silence_at
audio_segment = audio[self.start_ms : self.end_ms]
self.graph_array = np.array(audio_segment.get_array_of_samples())
# Calculate the factor to map milliseconds of track to array
self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms)
self.curve: Optional[PlotDataItem] = None
self.region: Optional[LinearRegionItem] = None
def clear(self) -> None:
"""Clear the current graph"""
if self.GraphWidget:
self.GraphWidget.clear()
def plot(self) -> None:
if self.GraphWidget:
self.curve = self.GraphWidget.plot(self.graph_array)
if self.curve:
self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
else:
log.debug("_FadeCurve.plot: no curve")
else:
log.debug("_FadeCurve.plot: no GraphWidget")
def tick(self, play_time: int) -> None:
"""Update volume fade curve"""
if not self.GraphWidget:
return
ms_of_graph = play_time - self.start_ms
if ms_of_graph < 0:
return
if self.region is None:
# Create the region now that we're into fade
self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
self.GraphWidget.addItem(self.region)
# Update region position
if self.region:
self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
class _FadeTrack(QThread): class _FadeTrack(QThread):
finished = pyqtSignal() finished = pyqtSignal()
@ -196,21 +87,32 @@ class _FadeTrack(QThread):
self.finished.emit() self.finished.emit()
# TODO can we move this into the _Music class? @singleton
vlc_instance = VLCManager().vlc_instance class VLCManager:
"""
Singleton class to ensure we only ever have one vlc Instance
"""
def __init__(self) -> None:
self.vlc_instance = vlc.Instance()
def get_instance(self) -> vlc.Instance:
return self.vlc_instance
class _Music: class Music:
""" """
Manage the playing of music tracks Manage the playing of music tracks
""" """
def __init__(self, name: str) -> None: def __init__(self, name: str) -> None:
vlc_instance.set_user_agent(name, name)
self.player: Optional[vlc.MediaPlayer] = None
self.name = name self.name = name
vlc_manager = VLCManager()
self.vlc_instance = vlc_manager.get_instance()
self.vlc_instance.set_user_agent(name, name)
self.player: vlc.MediaPlayer | None = None
self.max_volume: int = Config.VLC_VOLUME_DEFAULT self.max_volume: int = Config.VLC_VOLUME_DEFAULT
self.start_dt: Optional[dt.datetime] = None self.start_dt: dt.datetime | None = None
# Set up logging # Set up logging
# self._set_vlc_log() # self._set_vlc_log()
@ -238,27 +140,6 @@ class _Music:
# except Exception as e: # except Exception as e:
# log.error(f"Failed to set up VLC logging: {e}") # log.error(f"Failed to set up VLC logging: {e}")
def adjust_by_ms(self, ms: int) -> None:
"""Move player position by ms milliseconds"""
if not self.player:
return
elapsed_ms = self.get_playtime()
position = self.get_position()
if not position:
position = 0.0
new_position = max(0.0, position + ((position * ms) / elapsed_ms))
self.set_position(new_position)
# Adjus start time so elapsed time calculations are correct
if new_position == 0:
self.start_dt = dt.datetime.now()
else:
if self.start_dt:
self.start_dt -= dt.timedelta(milliseconds=ms)
else:
self.start_dt = dt.datetime.now() - dt.timedelta(milliseconds=ms)
def fade(self, fade_seconds: int) -> None: def fade(self, fade_seconds: int) -> None:
""" """
Fade the currently playing track. Fade the currently playing track.
@ -292,11 +173,11 @@ class _Music:
elapsed_seconds = (now - self.start_dt).total_seconds() elapsed_seconds = (now - self.start_dt).total_seconds()
return int(elapsed_seconds * 1000) return int(elapsed_seconds * 1000)
def get_position(self) -> Optional[float]: def get_position(self) -> float:
"""Return current position""" """Return current position"""
if not self.player: if not self.player:
return None return 0.0
return self.player.get_position() return self.player.get_position()
def is_playing(self) -> bool: def is_playing(self) -> bool:
@ -321,7 +202,7 @@ class _Music:
self, self,
path: str, path: str,
start_time: dt.datetime, start_time: dt.datetime,
position: Optional[float] = None, position: float | None = None,
) -> None: ) -> None:
""" """
Start playing the track at path. Start playing the track at path.
@ -338,7 +219,7 @@ class _Music:
log.error(f"play({path}): path not readable") log.error(f"play({path}): path not readable")
return None return None
self.player = vlc.MediaPlayer(vlc_instance, path) self.player = vlc.MediaPlayer(self.vlc_instance, path)
if self.player is None: if self.player is None:
log.error(f"_Music:play: failed to create MediaPlayer ({path=})") log.error(f"_Music:play: failed to create MediaPlayer ({path=})")
helpers.show_warning( helpers.show_warning(
@ -353,21 +234,6 @@ class _Music:
self.player.set_position(position) self.player.set_position(position)
self.start_dt = start_time self.start_dt = start_time
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
# It has been known for the volume to need correcting more
# than once in the first 200mS.
# Update August 2024: This no longer seems to be an issue
# for _ in range(3):
# if self.player:
# volume = self.player.audio_get_volume()
# if volume < Config.VLC_VOLUME_DEFAULT:
# self.set_volume(Config.VLC_VOLUME_DEFAULT)
# log.error(f"Reset from {volume=}")
# sleep(0.1)
def set_position(self, position: float) -> None: def set_position(self, position: float) -> None:
""" """
Set player position Set player position
@ -377,7 +243,7 @@ class _Music:
self.player.set_position(position) self.player.set_position(position)
def set_volume( def set_volume(
self, volume: Optional[int] = None, set_default: bool = True self, volume: int | None = None, set_default: bool = True
) -> None: ) -> None:
"""Set maximum volume used for player""" """Set maximum volume used for player"""
@ -417,333 +283,3 @@ class _Music:
self.player.stop() self.player.stop()
self.player.release() self.player.release()
self.player = None self.player = None
class RowAndTrack:
"""
Object to manage playlist rows and tracks.
"""
def __init__(self, playlist_row: PlaylistRows) -> None:
"""
Initialises data structure.
The passed PlaylistRows object will include a Tracks object if this
row has a track.
"""
# Collect playlistrow data
self.note = playlist_row.note
self.played = playlist_row.played
self.playlist_id = playlist_row.playlist_id
self.playlistrow_id = playlist_row.id
self.row_number = playlist_row.row_number
self.track_id = playlist_row.track_id
# Playlist display data
self.row_fg: Optional[str] = None
self.row_bg: Optional[str] = None
self.note_fg: Optional[str] = None
self.note_bg: Optional[str] = None
# Collect track data if there's a track
if playlist_row.track_id:
self.artist = playlist_row.track.artist
self.bitrate = playlist_row.track.bitrate
self.duration = playlist_row.track.duration
self.fade_at = playlist_row.track.fade_at
self.intro = playlist_row.track.intro
if playlist_row.track.playdates:
self.lastplayed = max(
[a.lastplayed for a in playlist_row.track.playdates]
)
else:
self.lastplayed = Config.EPOCH
self.path = playlist_row.track.path
self.silence_at = playlist_row.track.silence_at
self.start_gap = playlist_row.track.start_gap
self.title = playlist_row.track.title
else:
self.artist = ""
self.bitrate = 0
self.duration = 0
self.fade_at = 0
self.intro = None
self.lastplayed = Config.EPOCH
self.path = ""
self.silence_at = 0
self.start_gap = 0
self.title = ""
# Track playing data
self.end_of_track_signalled: bool = False
self.end_time: Optional[dt.datetime] = None
self.fade_graph: Optional[_FadeCurve] = None
self.fade_graph_start_updates: Optional[dt.datetime] = None
self.resume_marker: Optional[float] = 0.0
self.forecast_end_time: Optional[dt.datetime] = None
self.forecast_start_time: Optional[dt.datetime] = None
self.start_time: Optional[dt.datetime] = None
# Other object initialisation
self.music = _Music(name=Config.VLC_MAIN_PLAYER_NAME)
self.signals = MusicMusterSignals()
def __repr__(self) -> str:
return (
f"<RowAndTrack(playlist_id={self.playlist_id}, "
f"row_number={self.row_number}, "
f"playlistrow_id={self.playlistrow_id}, "
f"note={self.note}, track_id={self.track_id}>"
)
def check_for_end_of_track(self) -> None:
"""
Check whether track has ended. If so, emit track_ended_signal
"""
if self.start_time is None:
return
if self.end_of_track_signalled:
return
if self.music.is_playing():
return
self.start_time = None
if self.fade_graph:
self.fade_graph.clear()
# Ensure that player is released
self.music.fade(0)
self.signals.track_ended_signal.emit()
self.end_of_track_signalled = True
def create_fade_graph(self) -> None:
"""
Initialise and add FadeCurve in a thread as it's slow
"""
self.fadecurve_thread = QThread()
self.worker = _AddFadeCurve(
self,
track_path=self.path,
track_fade_at=self.fade_at,
track_silence_at=self.silence_at,
)
self.worker.moveToThread(self.fadecurve_thread)
self.fadecurve_thread.started.connect(self.worker.run)
self.worker.finished.connect(self.fadecurve_thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
self.fadecurve_thread.start()
def drop3db(self, enable: bool) -> None:
"""
If enable is true, drop output by 3db else restore to full volume
"""
if enable:
self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False)
else:
self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False)
def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
"""Fade music"""
self.resume_marker = self.music.get_position()
self.music.fade(fade_seconds)
self.signals.track_ended_signal.emit()
def is_playing(self) -> bool:
"""
Return True if we're currently playing else False
"""
if self.start_time is None:
return False
return self.music.is_playing()
def move_back(self, ms: int = Config.PREVIEW_BACK_MS) -> None:
"""
Rewind player by ms milliseconds
"""
self.music.adjust_by_ms(ms * -1)
def move_forward(self, ms: int = Config.PREVIEW_ADVANCE_MS) -> None:
"""
Rewind player by ms milliseconds
"""
self.music.adjust_by_ms(ms)
def play(self, position: Optional[float] = None) -> None:
"""Play track"""
now = dt.datetime.now()
self.start_time = now
# Initialise player
self.music.play(self.path, start_time=now, position=position)
self.end_time = now + dt.timedelta(milliseconds=self.duration)
# Calculate time fade_graph should start updating
if self.fade_at:
update_graph_at_ms = max(
0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.fade_graph_start_updates = now + dt.timedelta(
milliseconds=update_graph_at_ms
)
def restart(self) -> None:
"""
Restart player
"""
self.music.adjust_by_ms(self.time_playing() * -1)
def set_forecast_start_time(
self, modified_rows: list[int], start: Optional[dt.datetime]
) -> Optional[dt.datetime]:
"""
Set forecast start time for this row
Update passed modified rows list if we changed the row.
Return new start time
"""
changed = False
if self.forecast_start_time != start:
self.forecast_start_time = start
changed = True
if start is None:
if self.forecast_end_time is not None:
self.forecast_end_time = None
changed = True
new_start_time = None
else:
end_time = start + dt.timedelta(milliseconds=self.duration)
new_start_time = end_time
if self.forecast_end_time != end_time:
self.forecast_end_time = end_time
changed = True
if changed and self.row_number not in modified_rows:
modified_rows.append(self.row_number)
return new_start_time
def stop(self, fade_seconds: int = 0) -> None:
"""
Stop this track playing
"""
self.resume_marker = self.music.get_position()
self.fade(fade_seconds)
# Reset fade graph
if self.fade_graph:
self.fade_graph.clear()
def time_playing(self) -> int:
"""
Return time track has been playing in milliseconds, zero if not playing
"""
if self.start_time is None:
return 0
return self.music.get_playtime()
def time_remaining_intro(self) -> int:
"""
Return milliseconds of intro remaining. Return 0 if no intro time in track
record or if intro has finished.
"""
if not self.intro:
return 0
return max(0, self.intro - self.time_playing())
def time_to_fade(self) -> int:
"""
Return milliseconds until fade time. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.fade_at - self.time_playing()
def time_to_silence(self) -> int:
"""
Return milliseconds until silent. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.silence_at - self.time_playing()
def update_fade_graph(self) -> None:
"""
Update fade graph
"""
if (
not self.is_playing()
or not self.fade_graph_start_updates
or not self.fade_graph
):
return
now = dt.datetime.now()
if self.fade_graph_start_updates > now:
return
self.fade_graph.tick(self.time_playing())
def update_playlist_and_row(self, session: Session) -> None:
"""
Update local playlist_id and row_number from playlistrow_id
"""
plr = session.get(PlaylistRows, self.playlistrow_id)
if not plr:
raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}")
self.playlist_id = plr.playlist_id
self.row_number = plr.row_number
class TrackSequence:
next: Optional[RowAndTrack] = None
current: Optional[RowAndTrack] = None
previous: Optional[RowAndTrack] = None
def set_next(self, rat: Optional[RowAndTrack]) -> None:
"""
Set the 'next' track to be passed rat. Clear
any previous next track. If passed rat is None
just clear existing next track.
"""
# Clear any existing fade graph
if self.next and self.next.fade_graph:
self.next.fade_graph.clear()
if rat is None:
self.next = None
else:
self.next = rat
self.next.create_fade_graph()
track_sequence = TrackSequence()

View File

@ -70,12 +70,12 @@ from classes import (
TrackInfo, TrackInfo,
) )
from config import Config from config import Config
from dialogs import TrackSelectDialog from dialogs import TrackInsertDialog
from file_importer import FileImporter from file_importer import FileImporter
from helpers import ask_yes_no, file_is_unreadable, get_name from helpers import ask_yes_no, file_is_unreadable, get_name
from log import log from log import log, log_call
from models import db, Playdates, PlaylistRows, Playlists, Queries, Settings, Tracks from models import db, Playdates, PlaylistRows, Playlists, Queries, Settings, Tracks
from music_manager import RowAndTrack, track_sequence from playlistrow import PlaylistRow, TrackSequence
from playlistmodel import PlaylistModel, PlaylistProxyModel from playlistmodel import PlaylistModel, PlaylistProxyModel
from playlists import PlaylistTab from playlists import PlaylistTab
from querylistmodel import QuerylistModel from querylistmodel import QuerylistModel
@ -94,12 +94,12 @@ class Current:
base_model: PlaylistModel base_model: PlaylistModel
proxy_model: PlaylistProxyModel proxy_model: PlaylistProxyModel
playlist_id: int = 0 playlist_id: int = 0
selected_rows: list[int] = [] selected_row_numbers: list[int] = []
def __repr__(self): def __repr__(self):
return ( return (
f"<Current(base_model={self.base_model}, proxy_model={self.proxy_model}, " f"<Current(base_model={self.base_model}, proxy_model={self.proxy_model}, "
f"playlist_id={self.playlist_id}, selected_rows={self.selected_rows}>" f"playlist_id={self.playlist_id}, selected_rows={self.selected_row_numbers}>"
) )
@ -478,6 +478,7 @@ class ManageQueries(ItemlistManager):
self.populate_table(query_list) self.populate_table(query_list)
@log_call
def delete_item(self, query_id: int) -> None: def delete_item(self, query_id: int) -> None:
"""delete query""" """delete query"""
@ -490,7 +491,6 @@ class ManageQueries(ItemlistManager):
"Delete query", "Delete query",
f"Delete query '{query.name}': " "Are you sure?", f"Delete query '{query.name}': " "Are you sure?",
): ):
log.debug(f"manage_queries: delete {query=}")
self.session.delete(query) self.session.delete(query)
self.session.commit() self.session.commit()
@ -583,6 +583,7 @@ class ManageTemplates(ItemlistManager):
self.populate_table(template_list) self.populate_table(template_list)
@log_call
def delete_item(self, template_id: int) -> None: def delete_item(self, template_id: int) -> None:
"""delete template""" """delete template"""
@ -606,7 +607,6 @@ class ManageTemplates(ItemlistManager):
else: else:
self.musicmuster.playlist_section.tabPlaylist.removeTab(open_idx) self.musicmuster.playlist_section.tabPlaylist.removeTab(open_idx)
log.debug(f"manage_templates: delete {template=}")
self.session.delete(template) self.session.delete(template)
self.session.commit() self.session.commit()
@ -1180,7 +1180,7 @@ class Window(QMainWindow):
self.footer_section.widgetFadeVolume.setDefaultPadding(0) self.footer_section.widgetFadeVolume.setDefaultPadding(0)
self.footer_section.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND) self.footer_section.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND)
self.move_source_rows: Optional[list[int]] = None self.move_source_rows: list[PlaylistRow] = []
self.move_source_model: Optional[PlaylistModel] = None self.move_source_model: Optional[PlaylistModel] = None
self.disable_selection_timing = False self.disable_selection_timing = False
@ -1194,6 +1194,7 @@ class Window(QMainWindow):
self.catch_return_key = False self.catch_return_key = False
self.importer: Optional[FileImporter] = None self.importer: Optional[FileImporter] = None
self.current = Current() self.current = Current()
self.track_sequence = TrackSequence()
webbrowser.register( webbrowser.register(
"browser", "browser",
@ -1217,7 +1218,7 @@ class Window(QMainWindow):
return return
# Don't allow window to close when a track is playing # Don't allow window to close when a track is playing
if track_sequence.current and track_sequence.current.is_playing(): if self.track_sequence.current and self.track_sequence.current.is_playing():
event.ignore() event.ignore()
helpers.show_warning( helpers.show_warning(
self, "Track playing", "Can't close application while track is playing" self, "Track playing", "Can't close application while track is playing"
@ -1234,7 +1235,6 @@ class Window(QMainWindow):
for playlist_id, idx in open_playlist_ids.items(): for playlist_id, idx in open_playlist_ids.items():
playlist = session.get(Playlists, playlist_id) playlist = session.get(Playlists, playlist_id)
if playlist: if playlist:
log.debug(f"Set {playlist=} tab to {idx=}")
playlist.tab = idx playlist.tab = idx
# Save window attributes # Save window attributes
@ -1255,9 +1255,6 @@ class Window(QMainWindow):
# # # # # # # # # # Internal utility functions # # # # # # # # # # # # # # # # # # # # Internal utility functions # # # # # # # # # #
def active_base_model(self) -> PlaylistModel:
return self.current.base_model
def active_tab(self) -> PlaylistTab: def active_tab(self) -> PlaylistTab:
return self.playlist_section.tabPlaylist.currentWidget() return self.playlist_section.tabPlaylist.currentWidget()
@ -1455,6 +1452,7 @@ class Window(QMainWindow):
# # # # # # # # # # Playlist management functions # # # # # # # # # # # # # # # # # # # # Playlist management functions # # # # # # # # # #
@log_call
def _create_playlist( def _create_playlist(
self, session: Session, name: str, template_id: int self, session: Session, name: str, template_id: int
) -> Playlists: ) -> Playlists:
@ -1463,10 +1461,9 @@ class Window(QMainWindow):
if template_id > 0, and return the Playlists object. if template_id > 0, and return the Playlists object.
""" """
log.debug(f" _create_playlist({name=}, {template_id=})")
return Playlists(session, name, template_id) return Playlists(session, name, template_id)
@log_call
def _open_playlist(self, playlist: Playlists, is_template: bool = False) -> int: def _open_playlist(self, playlist: Playlists, is_template: bool = False) -> int:
""" """
With passed playlist: With passed playlist:
@ -1477,8 +1474,6 @@ class Window(QMainWindow):
return: tab index return: tab index
""" """
log.debug(f" _open_playlist({playlist=}, {is_template=})")
# Create base model and proxy model # Create base model and proxy model
base_model = PlaylistModel(playlist.id, is_template) base_model = PlaylistModel(playlist.id, is_template)
proxy_model = PlaylistProxyModel() proxy_model = PlaylistProxyModel()
@ -1497,6 +1492,7 @@ class Window(QMainWindow):
return idx return idx
@log_call
def create_playlist_from_template(self, session: Session, template_id: int) -> None: def create_playlist_from_template(self, session: Session, template_id: int) -> None:
""" """
Prompt for new playlist name and create from passed template_id Prompt for new playlist name and create from passed template_id
@ -1518,7 +1514,8 @@ class Window(QMainWindow):
self._open_playlist(playlist) self._open_playlist(playlist)
session.commit() session.commit()
def delete_playlist(self) -> None: @log_call
def delete_playlist(self, checked: bool = False) -> None:
""" """
Delete current playlist Delete current playlist
""" """
@ -1537,7 +1534,7 @@ class Window(QMainWindow):
else: else:
log.error("Failed to retrieve playlist") log.error("Failed to retrieve playlist")
def open_existing_playlist(self) -> None: def open_existing_playlist(self, checked: bool = False) -> None:
"""Open existing playlist""" """Open existing playlist"""
with db.Session() as session: with db.Session() as session:
@ -1549,7 +1546,7 @@ class Window(QMainWindow):
self._open_playlist(playlist) self._open_playlist(playlist)
session.commit() session.commit()
def save_as_template(self) -> None: def save_as_template(self, checked: bool = False) -> None:
"""Save current playlist as template""" """Save current playlist as template"""
with db.Session() as session: with db.Session() as session:
@ -1625,7 +1622,7 @@ class Window(QMainWindow):
# # # # # # # # # # Manage templates and queries # # # # # # # # # # # # # # # # # # # # Manage templates and queries # # # # # # # # # #
def manage_queries_wrapper(self): def manage_queries_wrapper(self, checked: bool = False) -> None:
""" """
Simply instantiate the manage_queries class Simply instantiate the manage_queries class
""" """
@ -1633,7 +1630,7 @@ class Window(QMainWindow):
with db.Session() as session: with db.Session() as session:
_ = ManageQueries(session, self) _ = ManageQueries(session, self)
def manage_templates_wrapper(self): def manage_templates_wrapper(self, checked: bool = False) -> None:
""" """
Simply instantiate the manage_queries class Simply instantiate the manage_queries class
""" """
@ -1643,12 +1640,12 @@ class Window(QMainWindow):
# # # # # # # # # # Miscellaneous functions # # # # # # # # # # # # # # # # # # # # Miscellaneous functions # # # # # # # # # #
def select_duplicate_rows(self) -> None: def select_duplicate_rows(self, checked: bool = False) -> None:
"""Call playlist to select duplicate rows""" """Call playlist to select duplicate rows"""
self.active_tab().select_duplicate_rows() self.active_tab().select_duplicate_rows()
def about(self) -> None: def about(self, checked: bool = False) -> None:
"""Get git tag and database name""" """Get git tag and database name"""
try: try:
@ -1674,10 +1671,10 @@ class Window(QMainWindow):
Clear next track Clear next track
""" """
track_sequence.set_next(None) self.track_sequence.set_next(None)
self.update_headers() self.update_headers()
def clear_selection(self) -> None: def clear_selection(self, checked: bool = False) -> None:
"""Clear row selection""" """Clear row selection"""
# Unselect any selected rows # Unselect any selected rows
@ -1687,7 +1684,7 @@ class Window(QMainWindow):
# Clear the search bar # Clear the search bar
self.search_playlist_clear() self.search_playlist_clear()
def close_playlist_tab(self) -> bool: def close_playlist_tab(self, checked: bool = False) -> bool:
""" """
Close active playlist tab, called by menu item Close active playlist tab, called by menu item
""" """
@ -1707,8 +1704,8 @@ class Window(QMainWindow):
).playlist_id ).playlist_id
# Don't close current track playlist # Don't close current track playlist
if track_sequence.current is not None: if self.track_sequence.current is not None:
current_track_playlist_id = track_sequence.current.playlist_id current_track_playlist_id = self.track_sequence.current.playlist_id
if current_track_playlist_id: if current_track_playlist_id:
if closing_tab_playlist_id == current_track_playlist_id: if closing_tab_playlist_id == current_track_playlist_id:
helpers.show_OK( helpers.show_OK(
@ -1717,8 +1714,8 @@ class Window(QMainWindow):
return False return False
# Don't close next track playlist # Don't close next track playlist
if track_sequence.next is not None: if self.track_sequence.next is not None:
next_track_playlist_id = track_sequence.next.playlist_id next_track_playlist_id = self.track_sequence.next.playlist_id
if next_track_playlist_id: if next_track_playlist_id:
if closing_tab_playlist_id == next_track_playlist_id: if closing_tab_playlist_id == next_track_playlist_id:
helpers.show_OK( helpers.show_OK(
@ -1773,6 +1770,7 @@ class Window(QMainWindow):
self.signals.search_songfacts_signal.connect(self.open_songfacts_browser) self.signals.search_songfacts_signal.connect(self.open_songfacts_browser)
self.signals.search_wikipedia_signal.connect(self.open_wikipedia_browser) self.signals.search_wikipedia_signal.connect(self.open_wikipedia_browser)
@log_call
def current_row_or_end(self) -> int: def current_row_or_end(self) -> int:
""" """
If a row or rows are selected, return the row number of the first If a row or rows are selected, return the row number of the first
@ -1780,18 +1778,18 @@ class Window(QMainWindow):
of the playlist. of the playlist.
""" """
if self.current.selected_rows: if self.current.selected_row_numbers:
return self.current.selected_rows[0] return self.current.selected_row_numbers[0]
return self.current.base_model.rowCount() return self.current.base_model.rowCount()
def debug(self): def debug(self, checked: bool = False) -> None:
"""Invoke debugger""" """Invoke debugger"""
import ipdb # type: ignore import ipdb # type: ignore
ipdb.set_trace() ipdb.set_trace()
def download_played_tracks(self) -> None: def download_played_tracks(self, checked: bool = False) -> None:
"""Download a CSV of played tracks""" """Download a CSV of played tracks"""
dlg = DownloadCSV(self) dlg = DownloadCSV(self)
@ -1819,9 +1817,10 @@ class Window(QMainWindow):
def drop3db(self) -> None: def drop3db(self) -> None:
"""Drop music level by 3db if button checked""" """Drop music level by 3db if button checked"""
if track_sequence.current: if self.track_sequence.current:
track_sequence.current.drop3db(self.footer_section.btnDrop3db.isChecked()) self.track_sequence.current.drop3db(self.footer_section.btnDrop3db.isChecked())
@log_call
def enable_escape(self, enabled: bool) -> None: def enable_escape(self, enabled: bool) -> None:
""" """
Manage signal to enable/disable handling ESC character. Manage signal to enable/disable handling ESC character.
@ -1830,11 +1829,10 @@ class Window(QMainWindow):
so we need to disable it here while editing. so we need to disable it here while editing.
""" """
log.debug(f"enable_escape({enabled=})")
if "clear_selection" in self.menu_actions: if "clear_selection" in self.menu_actions:
self.menu_actions["clear_selection"].setEnabled(enabled) self.menu_actions["clear_selection"].setEnabled(enabled)
@log_call
def end_of_track_actions(self) -> None: def end_of_track_actions(self) -> None:
""" """
@ -1846,13 +1844,8 @@ class Window(QMainWindow):
- Enable controls - Enable controls
""" """
if track_sequence.current: if self.track_sequence.current:
# Dereference the fade curve so it can be garbage collected self.track_sequence.move_current_to_previous()
track_sequence.current.fade_graph = None
# Reset track_sequence objects
track_sequence.previous = track_sequence.current
track_sequence.current = None
# Tell playlist previous track has finished # Tell playlist previous track has finished
self.current.base_model.previous_track_ended() self.current.base_model.previous_track_ended()
@ -1874,7 +1867,7 @@ class Window(QMainWindow):
# if not self.stop_autoplay: # if not self.stop_autoplay:
# self.play_next() # self.play_next()
def export_playlist_tab(self) -> None: def export_playlist_tab(self, checked: bool = False) -> None:
"""Export the current playlist to an m3u file""" """Export the current playlist to an m3u file"""
playlist_id = self.current.playlist_id playlist_id = self.current.playlist_id
@ -1915,11 +1908,11 @@ class Window(QMainWindow):
"\n" "\n"
) )
def fade(self) -> None: def fade(self, checked: bool = False) -> None:
"""Fade currently playing track""" """Fade currently playing track"""
if track_sequence.current: if self.track_sequence.current:
track_sequence.current.fade() self.track_sequence.current.fade()
def get_tab_index_for_playlist(self, playlist_id: int) -> Optional[int]: def get_tab_index_for_playlist(self, playlist_id: int) -> Optional[int]:
""" """
@ -1951,7 +1944,7 @@ class Window(QMainWindow):
# Reset row heights # Reset row heights
self.active_tab().resize_rows() self.active_tab().resize_rows()
def import_files_wrapper(self) -> None: def import_files_wrapper(self, checked: bool = False) -> None:
""" """
Pass import files call to file_importer module Pass import files call to file_importer module
""" """
@ -1961,7 +1954,7 @@ class Window(QMainWindow):
self.importer = FileImporter(self.current.base_model, self.current_row_or_end()) self.importer = FileImporter(self.current.base_model, self.current_row_or_end())
self.importer.start() self.importer.start()
def insert_header(self) -> None: def insert_header(self, checked: bool = False) -> None:
"""Show dialog box to enter header text and add to playlist""" """Show dialog box to enter header text and add to playlist"""
# Get header text # Get header text
@ -1976,19 +1969,16 @@ class Window(QMainWindow):
note=dlg.textValue(), note=dlg.textValue(),
) )
def insert_track(self) -> None: def insert_track(self, checked: bool = False) -> None:
"""Show dialog box to select and add track from database""" """Show dialog box to select and add track from database"""
with db.Session() as session: dlg = TrackInsertDialog(
dlg = TrackSelectDialog( parent=self,
parent=self, playlist_id=self.active_tab().playlist_id
session=session, )
new_row_number=self.current_row_or_end(), dlg.exec()
base_model=self.current.base_model,
)
dlg.exec()
session.commit()
@log_call
def load_last_playlists(self) -> None: def load_last_playlists(self) -> None:
"""Load the playlists that were open when the last session closed""" """Load the playlists that were open when the last session closed"""
@ -1996,7 +1986,6 @@ class Window(QMainWindow):
with db.Session() as session: with db.Session() as session:
for playlist in Playlists.get_open(session): for playlist in Playlists.get_open(session):
if playlist: if playlist:
log.debug(f"load_last_playlists() loaded {playlist=}")
# Create tab # Create tab
playlist_ids.append(self._open_playlist(playlist)) playlist_ids.append(self._open_playlist(playlist))
@ -2012,7 +2001,7 @@ class Window(QMainWindow):
Playlists.clear_tabs(session, playlist_ids) Playlists.clear_tabs(session, playlist_ids)
session.commit() session.commit()
def lookup_row_in_songfacts(self) -> None: def lookup_row_in_songfacts(self, checked: bool = False) -> None:
""" """
Display songfacts page for title in highlighted row Display songfacts page for title in highlighted row
""" """
@ -2023,7 +2012,7 @@ class Window(QMainWindow):
self.signals.search_songfacts_signal.emit(track_info.title) self.signals.search_songfacts_signal.emit(track_info.title)
def lookup_row_in_wikipedia(self) -> None: def lookup_row_in_wikipedia(self, checked: bool = False) -> None:
""" """
Display Wikipedia page for title in highlighted row or next track Display Wikipedia page for title in highlighted row or next track
""" """
@ -2034,20 +2023,21 @@ class Window(QMainWindow):
self.signals.search_wikipedia_signal.emit(track_info.title) self.signals.search_wikipedia_signal.emit(track_info.title)
def mark_rows_for_moving(self) -> None: def mark_rows_for_moving(self, checked: bool = False) -> None:
""" """
Cut rows ready for pasting. Cut rows ready for pasting.
""" """
# Save the selected PlaylistRows items ready for a later # Save the selected PlaylistRows items ready for a later
# paste # paste
self.move_source_rows = self.current.selected_rows self.move_source_rows = self.current.base_model.selected_rows
self.move_source_model = self.current.base_model self.move_source_model = self.current.base_model
log.debug( log.debug(
f"mark_rows_for_moving(): {self.move_source_rows=} {self.move_source_model=}" f"mark_rows_for_moving(): {self.move_source_rows=} {self.move_source_model=}"
) )
@log_call
def move_playlist_rows(self, row_numbers: list[int]) -> None: def move_playlist_rows(self, row_numbers: list[int]) -> None:
""" """
Move passed playlist rows to another playlist Move passed playlist rows to another playlist
@ -2083,27 +2073,20 @@ class Window(QMainWindow):
) )
# Reset track_sequences # Reset track_sequences
with db.Session() as session: self.track_sequence.update()
for ts in [
track_sequence.next,
track_sequence.current,
track_sequence.previous,
]:
if ts:
ts.update_playlist_and_row(session)
def move_selected(self) -> None: def move_selected(self, checked: bool = False) -> None:
""" """
Move selected rows to another playlist Move selected rows to another playlist
""" """
selected_rows = self.current.selected_rows selected_rows = self.current.selected_row_numbers
if not selected_rows: if not selected_rows:
return return
self.move_playlist_rows(selected_rows) self.move_playlist_rows(selected_rows)
def move_unplayed(self) -> None: def move_unplayed(self, checked: bool = False) -> None:
""" """
Move unplayed rows to another playlist Move unplayed rows to another playlist
""" """
@ -2135,7 +2118,8 @@ class Window(QMainWindow):
webbrowser.get("browser").open_new_tab(url) webbrowser.get("browser").open_new_tab(url)
def paste_rows(self, dummy_for_profiling: int | None = None) -> None: @log_call
def paste_rows(self, checked: bool = False) -> None:
""" """
Paste earlier cut rows. Paste earlier cut rows.
""" """
@ -2150,9 +2134,9 @@ class Window(QMainWindow):
# that moved row the next track # that moved row the next track
set_next_row: Optional[int] = None set_next_row: Optional[int] = None
if ( if (
track_sequence.current self.track_sequence.current
and track_sequence.current.playlist_id == to_playlist_model.playlist_id and self.track_sequence.current.playlist_id == to_playlist_model.playlist_id
and destination_row == track_sequence.current.row_number + 1 and destination_row == self.track_sequence.current.row_number + 1
): ):
set_next_row = destination_row set_next_row = destination_row
@ -2168,7 +2152,8 @@ class Window(QMainWindow):
if set_next_row: if set_next_row:
to_playlist_model.set_next_row(set_next_row) to_playlist_model.set_next_row(set_next_row)
def play_next(self, position: Optional[float] = None) -> None: @log_call
def play_next(self, position: Optional[float] = None, checked: bool = False) -> None:
""" """
Play next track, optionally from passed position. Play next track, optionally from passed position.
@ -2188,7 +2173,7 @@ class Window(QMainWindow):
""" """
# If there is no next track set, return. # If there is no next track set, return.
if track_sequence.next is None: if self.track_sequence.next is None:
log.error("musicmuster.play_next(): no next track selected") log.error("musicmuster.play_next(): no next track selected")
return return
@ -2205,35 +2190,34 @@ class Window(QMainWindow):
log.debug("issue223: play_next: 10ms timer disabled") log.debug("issue223: play_next: 10ms timer disabled")
# If there's currently a track playing, fade it. # If there's currently a track playing, fade it.
if track_sequence.current: if self.track_sequence.current:
track_sequence.current.fade() self.track_sequence.current.fade()
# Move next track to current track. # Move next track to current track.
# end_of_track_actions() will have saved current track to # end_of_track_actions() will have saved current track to
# previous_track # previous_track
track_sequence.current = track_sequence.next self.track_sequence.move_next_to_current()
if self.track_sequence.current is None:
# Clear next track raise ApplicationError("No current track")
self.clear_next()
# Restore volume if -3dB active # Restore volume if -3dB active
if self.footer_section.btnDrop3db.isChecked(): if self.footer_section.btnDrop3db.isChecked():
self.footer_section.btnDrop3db.setChecked(False) self.footer_section.btnDrop3db.setChecked(False)
# Play (new) current track # Play (new) current track
log.debug(f"Play: {track_sequence.current.title}") log.debug(f"Play: {self.track_sequence.current.title}")
track_sequence.current.play(position) self.track_sequence.current.play(position)
# Update clocks now, don't wait for next tick # Update clocks now, don't wait for next tick
self.update_clocks() self.update_clocks()
# Show closing volume graph # Show closing volume graph
if track_sequence.current.fade_graph: if self.track_sequence.current.fade_graph:
track_sequence.current.fade_graph.GraphWidget = ( self.track_sequence.current.fade_graph.GraphWidget = (
self.footer_section.widgetFadeVolume self.footer_section.widgetFadeVolume
) )
track_sequence.current.fade_graph.clear() self.track_sequence.current.fade_graph.clear()
track_sequence.current.fade_graph.plot() self.track_sequence.current.fade_graph.plot()
# Disable play next controls # Disable play next controls
self.catch_return_key = True self.catch_return_key = True
@ -2266,10 +2250,10 @@ class Window(QMainWindow):
track_info = self.active_tab().get_selected_row_track_info() track_info = self.active_tab().get_selected_row_track_info()
if not track_info: if not track_info:
# Otherwise get track_id to next track to play # Otherwise get track_id to next track to play
if track_sequence.next: if self.track_sequence.next:
if track_sequence.next.track_id: if self.track_sequence.next.track_id:
track_info = TrackInfo( track_info = TrackInfo(
track_sequence.next.track_id, track_sequence.next.row_number self.track_sequence.next.track_id, self.track_sequence.next.row_number
) )
else: else:
return return
@ -2364,7 +2348,7 @@ class Window(QMainWindow):
if ok: if ok:
log.debug("quicklog: " + dlg.textValue()) log.debug("quicklog: " + dlg.textValue())
def rename_playlist(self) -> None: def rename_playlist(self, checked: bool = False) -> None:
""" """
Rename current playlist Rename current playlist
""" """
@ -2387,12 +2371,12 @@ class Window(QMainWindow):
Return True if it has, False if not Return True if it has, False if not
""" """
if track_sequence.current and self.catch_return_key: if self.track_sequence.current and self.catch_return_key:
# Suppress inadvertent double press # Suppress inadvertent double press
if ( if (
track_sequence.current self.track_sequence.current
and track_sequence.current.start_time and self.track_sequence.current.start_time
and track_sequence.current.start_time and self.track_sequence.current.start_time
+ dt.timedelta(milliseconds=Config.RETURN_KEY_DEBOUNCE_MS) + dt.timedelta(milliseconds=Config.RETURN_KEY_DEBOUNCE_MS)
> dt.datetime.now() > dt.datetime.now()
): ):
@ -2401,8 +2385,8 @@ class Window(QMainWindow):
# If return is pressed during first PLAY_NEXT_GUARD_MS then # If return is pressed during first PLAY_NEXT_GUARD_MS then
# default to NOT playing the next track, else default to # default to NOT playing the next track, else default to
# playing it. # playing it.
default_yes: bool = track_sequence.current.start_time is not None and ( default_yes: bool = self.track_sequence.current.start_time is not None and (
(dt.datetime.now() - track_sequence.current.start_time).total_seconds() (dt.datetime.now() - self.track_sequence.current.start_time).total_seconds()
* 1000 * 1000
> Config.PLAY_NEXT_GUARD_MS > Config.PLAY_NEXT_GUARD_MS
) )
@ -2420,7 +2404,7 @@ class Window(QMainWindow):
return False return False
def resume(self) -> None: def resume(self, checked: bool = False) -> None:
""" """
Resume playing last track. We may be playing the next track Resume playing last track. We may be playing the next track
or none; take care of both eventualities. or none; take care of both eventualities.
@ -2431,18 +2415,18 @@ class Window(QMainWindow):
- If a track is playing, make that the next track - If a track is playing, make that the next track
""" """
if not track_sequence.previous: if not self.track_sequence.previous:
return return
# Return if no saved position # Return if no saved position
resume_marker = track_sequence.previous.resume_marker resume_marker = self.track_sequence.previous.resume_marker
if not resume_marker: if not resume_marker:
log.error("No previous track position") log.error("No previous track position")
return return
# We want to use play_next() to resume, so copy the previous # We want to use play_next() to resume, so copy the previous
# track to the next track: # track to the next track:
track_sequence.set_next(track_sequence.previous) self.track_sequence.move_previous_to_next()
# Now resume playing the now-next track # Now resume playing the now-next track
self.play_next(resume_marker) self.play_next(resume_marker)
@ -2451,17 +2435,17 @@ class Window(QMainWindow):
# We need to fake the start time to reflect where we resumed the # We need to fake the start time to reflect where we resumed the
# track # track
if ( if (
track_sequence.current self.track_sequence.current
and track_sequence.current.start_time and self.track_sequence.current.start_time
and track_sequence.current.duration and self.track_sequence.current.duration
and track_sequence.current.resume_marker and self.track_sequence.current.resume_marker
): ):
elapsed_ms = ( elapsed_ms = (
track_sequence.current.duration * track_sequence.current.resume_marker self.track_sequence.current.duration * self.track_sequence.current.resume_marker
) )
track_sequence.current.start_time -= dt.timedelta(milliseconds=elapsed_ms) self.track_sequence.current.start_time -= dt.timedelta(milliseconds=elapsed_ms)
def search_playlist(self) -> None: def search_playlist(self, checked: bool = False) -> None:
"""Show text box to search playlist""" """Show text box to search playlist"""
# Disable play controls so that 'return' in search box doesn't # Disable play controls so that 'return' in search box doesn't
@ -2486,7 +2470,7 @@ class Window(QMainWindow):
self.current.proxy_model.set_incremental_search(self.txtSearch.text()) self.current.proxy_model.set_incremental_search(self.txtSearch.text())
def selected_or_next_track_info(self) -> Optional[RowAndTrack]: def selected_or_next_track_info(self) -> Optional[PlaylistRow]:
""" """
Return RowAndTrack info for selected track. If no selected track, return for Return RowAndTrack info for selected track. If no selected track, return for
next track. If no next track, return None. next track. If no next track, return None.
@ -2494,12 +2478,12 @@ class Window(QMainWindow):
row_number: Optional[int] = None row_number: Optional[int] = None
if self.current.selected_rows: if self.current.selected_row_numbers:
row_number = self.current.selected_rows[0] row_number = self.current.selected_row_numbers[0]
if row_number is None: if row_number is None:
if track_sequence.next: if self.track_sequence.next:
if track_sequence.next.track_id: if self.track_sequence.next.track_id:
row_number = track_sequence.next.row_number row_number = self.track_sequence.next.row_number
if row_number is None: if row_number is None:
return None return None
@ -2519,17 +2503,21 @@ class Window(QMainWindow):
height = Settings.get_setting(session, "mainwindow_height").f_int or 100 height = Settings.get_setting(session, "mainwindow_height").f_int or 100
self.setGeometry(x, y, width, height) self.setGeometry(x, y, width, height)
def set_selected_track_next(self) -> None: @log_call
def set_selected_track_next(self, checked: bool = False) -> None:
""" """
Set currently-selected row on visible playlist tab as next track Set currently-selected row on visible playlist tab as next track
""" """
playlist_tab = self.active_tab() self.signals.signal_set_next_row.emit(self.current.playlist_id)
if playlist_tab: self.clear_selection()
playlist_tab.set_row_as_next_track() # playlist_tab = self.active_tab()
else: # if playlist_tab:
log.error("No active tab") # playlist_tab.set_row_as_next_track()
# else:
# log.error("No active tab")
@log_call
def set_tab_colour(self, widget: PlaylistTab, colour: QColor) -> None: def set_tab_colour(self, widget: PlaylistTab, colour: QColor) -> None:
""" """
Find the tab containing the widget and set the text colour Find the tab containing the widget and set the text colour
@ -2541,8 +2529,8 @@ class Window(QMainWindow):
def show_current(self) -> None: def show_current(self) -> None:
"""Scroll to show current track""" """Scroll to show current track"""
if track_sequence.current: if self.track_sequence.current:
self.show_track(track_sequence.current) self.show_track(self.track_sequence.current)
def show_warning(self, title: str, body: str) -> None: def show_warning(self, title: str, body: str) -> None:
""" """
@ -2555,8 +2543,8 @@ class Window(QMainWindow):
def show_next(self) -> None: def show_next(self) -> None:
"""Scroll to show next track""" """Scroll to show next track"""
if track_sequence.next: if self.track_sequence.next:
self.show_track(track_sequence.next) self.show_track(self.track_sequence.next)
def show_status_message(self, message: str, timing: int) -> None: def show_status_message(self, message: str, timing: int) -> None:
""" """
@ -2570,7 +2558,7 @@ class Window(QMainWindow):
else: else:
self.statusbar.clearMessage() self.statusbar.clearMessage()
def show_track(self, playlist_track: RowAndTrack) -> None: def show_track(self, playlist_track: PlaylistRow) -> None:
"""Scroll to show track in plt""" """Scroll to show track in plt"""
# Switch to the correct tab # Switch to the correct tab
@ -2591,12 +2579,13 @@ class Window(QMainWindow):
self.active_tab().scroll_to_top(playlist_track.row_number) self.active_tab().scroll_to_top(playlist_track.row_number)
def stop(self) -> None: @log_call
def stop(self, checked: bool = False) -> None:
"""Stop playing immediately""" """Stop playing immediately"""
self.stop_autoplay = True self.stop_autoplay = True
if track_sequence.current: if self.track_sequence.current:
track_sequence.current.stop() self.track_sequence.current.stop()
def tab_change(self) -> None: def tab_change(self) -> None:
"""Called when active tab changed""" """Called when active tab changed"""
@ -2608,22 +2597,22 @@ class Window(QMainWindow):
Called every 10ms Called every 10ms
""" """
if track_sequence.current: if self.track_sequence.current:
track_sequence.current.update_fade_graph() self.track_sequence.current.update_fade_graph()
def tick_100ms(self) -> None: def tick_100ms(self) -> None:
""" """
Called every 100ms Called every 100ms
""" """
if track_sequence.current: if self.track_sequence.current:
try: try:
track_sequence.current.check_for_end_of_track() self.track_sequence.current.check_for_end_of_track()
# Update intro counter if applicable and, if updated, return # Update intro counter if applicable and, if updated, return
# because playing an intro takes precedence over timing a # because playing an intro takes precedence over timing a
# preview. # preview.
intro_ms_remaining = track_sequence.current.time_remaining_intro() intro_ms_remaining = self.track_sequence.current.time_remaining_intro()
if intro_ms_remaining > 0: if intro_ms_remaining > 0:
self.footer_section.label_intro_timer.setText( self.footer_section.label_intro_timer.setText(
f"{intro_ms_remaining / 1000:.1f}" f"{intro_ms_remaining / 1000:.1f}"
@ -2683,17 +2672,17 @@ class Window(QMainWindow):
""" """
# If track is playing, update track clocks time and colours # If track is playing, update track clocks time and colours
if track_sequence.current and track_sequence.current.is_playing(): if self.track_sequence.current and self.track_sequence.current.is_playing():
# Elapsed time # Elapsed time
self.header_section.label_elapsed_timer.setText( self.header_section.label_elapsed_timer.setText(
helpers.ms_to_mmss(track_sequence.current.time_playing()) helpers.ms_to_mmss(self.track_sequence.current.time_playing())
+ " / " + " / "
+ helpers.ms_to_mmss(track_sequence.current.duration) + helpers.ms_to_mmss(self.track_sequence.current.duration)
) )
# Time to fade # Time to fade
time_to_fade = track_sequence.current.time_to_fade() time_to_fade = self.track_sequence.current.time_to_fade()
time_to_silence = track_sequence.current.time_to_silence() time_to_silence = self.track_sequence.current.time_to_silence()
self.footer_section.label_fade_timer.setText( self.footer_section.label_fade_timer.setText(
helpers.ms_to_mmss(time_to_fade) helpers.ms_to_mmss(time_to_fade)
) )
@ -2725,6 +2714,7 @@ class Window(QMainWindow):
self.catch_return_key = False self.catch_return_key = False
self.show_status_message("Play controls: Enabled", 0) self.show_status_message("Play controls: Enabled", 0)
# Re-enable 10ms timer (see above) # Re-enable 10ms timer (see above)
log.debug(f"issue287: {self.timer10.isActive()=}")
if not self.timer10.isActive(): if not self.timer10.isActive():
self.timer10.start(10) self.timer10.start(10)
log.debug("issue223: update_clocks: 10ms timer enabled") log.debug("issue223: update_clocks: 10ms timer enabled")
@ -2742,25 +2732,25 @@ class Window(QMainWindow):
Update last / current / next track headers Update last / current / next track headers
""" """
if track_sequence.previous: if self.track_sequence.previous:
self.header_section.hdrPreviousTrack.setText( self.header_section.hdrPreviousTrack.setText(
f"{track_sequence.previous.title} - {track_sequence.previous.artist}" f"{self.track_sequence.previous.title} - {self.track_sequence.previous.artist}"
) )
else: else:
self.header_section.hdrPreviousTrack.setText("") self.header_section.hdrPreviousTrack.setText("")
if track_sequence.current: if self.track_sequence.current:
self.header_section.hdrCurrentTrack.setText( self.header_section.hdrCurrentTrack.setText(
f"{track_sequence.current.title.replace('&', '&&')} - " f"{self.track_sequence.current.title.replace('&', '&&')} - "
f"{track_sequence.current.artist.replace('&', '&&')}" f"{self.track_sequence.current.artist.replace('&', '&&')}"
) )
else: else:
self.header_section.hdrCurrentTrack.setText("") self.header_section.hdrCurrentTrack.setText("")
if track_sequence.next: if self.track_sequence.next:
self.header_section.hdrNextTrack.setText( self.header_section.hdrNextTrack.setText(
f"{track_sequence.next.title.replace('&', '&&')} - " f"{self.track_sequence.next.title.replace('&', '&&')} - "
f"{track_sequence.next.artist.replace('&', '&&')}" f"{self.track_sequence.next.artist.replace('&', '&&')}"
) )
else: else:
self.header_section.hdrNextTrack.setText("") self.header_section.hdrNextTrack.setText("")
@ -2775,25 +2765,25 @@ class Window(QMainWindow):
# Do we need to set a 'next' icon? # Do we need to set a 'next' icon?
set_next = True set_next = True
if ( if (
track_sequence.current self.track_sequence.current
and track_sequence.next and self.track_sequence.next
and track_sequence.current.playlist_id == track_sequence.next.playlist_id and self.track_sequence.current.playlist_id == self.track_sequence.next.playlist_id
): ):
set_next = False set_next = False
for idx in range(self.tabBar.count()): for idx in range(self.tabBar.count()):
widget = self.playlist_section.tabPlaylist.widget(idx) widget = self.playlist_section.tabPlaylist.widget(idx)
if ( if (
track_sequence.next self.track_sequence.next
and set_next and set_next
and widget.playlist_id == track_sequence.next.playlist_id and widget.playlist_id == self.track_sequence.next.playlist_id
): ):
self.playlist_section.tabPlaylist.setTabIcon( self.playlist_section.tabPlaylist.setTabIcon(
idx, QIcon(Config.PLAYLIST_ICON_NEXT) idx, QIcon(Config.PLAYLIST_ICON_NEXT)
) )
elif ( elif (
track_sequence.current self.track_sequence.current
and widget.playlist_id == track_sequence.current.playlist_id and widget.playlist_id == self.track_sequence.current.playlist_id
): ):
self.playlist_section.tabPlaylist.setTabIcon( self.playlist_section.tabPlaylist.setTabIcon(
idx, QIcon(Config.PLAYLIST_ICON_CURRENT) idx, QIcon(Config.PLAYLIST_ICON_CURRENT)

File diff suppressed because it is too large Load Diff

533
app/playlistrow.py Normal file
View File

@ -0,0 +1,533 @@
# Standard library imports
import datetime as dt
from typing import Any
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QObject,
QThread,
)
# Third party imports
from pyqtgraph import PlotWidget # type: ignore
from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore
from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore
import numpy as np
import pyqtgraph as pg # type: ignore
# App imports
from classes import ApplicationError, MusicMusterSignals, PlaylistRowDTO, singleton
from config import Config
import helpers
from log import log
from music_manager import Music
import repository
class PlaylistRow:
"""
Object to manage playlist row and track.
"""
def __init__(self, dto: PlaylistRowDTO) -> None:
"""
The dto object will include a Tracks object if this row has a track.
"""
self.dto = dto
self.music = Music(name=Config.VLC_MAIN_PLAYER_NAME)
self.signals = MusicMusterSignals()
self.end_of_track_signalled: bool = False
self.end_time: dt.datetime | None = None
self.fade_graph: Any | None = None
self.fade_graph_start_updates: dt.datetime | None = None
self.forecast_end_time: dt.datetime | None = None
self.forecast_start_time: dt.datetime | None = None
self.note_bg: str | None = None
self.note_fg: str | None = None
self.resume_marker: float = 0.0
self.row_bg: str | None = None
self.row_fg: str | None = None
self.start_time: dt.datetime | None = None
def __repr__(self) -> str:
return (
f"<PlaylistRow(playlist_id={self.dto.playlist_id}, "
f"row_number={self.dto.row_number}, "
f"playlistrow_id={self.dto.playlistrow_id}, "
f"note={self.dto.note}, track_id={self.dto.track_id}>"
)
# Expose TrackDTO fields as properties
@property
def artist(self):
return self.dto.artist
@property
def bitrate(self):
return self.dto.bitrate
@property
def duration(self):
return self.dto.duration
@property
def fade_at(self):
return self.dto.fade_at
@property
def intro(self):
return self.dto.intro
@property
def lastplayed(self):
return self.dto.lastplayed
@property
def path(self):
return self.dto.path
@property
def silence_at(self):
return self.dto.silence_at
@property
def start_gap(self):
return self.dto.start_gap
@property
def title(self):
return self.dto.title
@property
def track_id(self):
return self.dto.track_id
@track_id.setter
def track_id(self, value: int) -> None:
"""
Adding a track_id should only happen to a header row.
"""
if self.track_id:
raise ApplicationError("Attempting to add track to row with existing track ({self=}")
# TODO: set up write access to track_id. Should only update if
# track_id == 0. Need to update all other track fields at the
# same time.
print("set track_id attribute for {self=}, {value=}")
pass
# Expose PlaylistRowDTO fields as properties
@property
def note(self):
return self.dto.note
@note.setter
def note(self, value: str) -> None:
# TODO set up write access to db
print("set note attribute for {self=}, {value=}")
# self.dto.note = value
@property
def played(self):
return self.dto.played
@played.setter
def played(self, value: bool = True) -> None:
# TODO set up write access to db
print("set played attribute for {self=}")
# self.dto.played = value
@property
def playlist_id(self):
return self.dto.playlist_id
@property
def playlistrow_id(self):
return self.dto.playlistrow_id
@property
def row_number(self):
return self.dto.row_number
@row_number.setter
def row_number(self, value: int) -> None:
# TODO do we need to set up write access to db?
self.dto.row_number = value
def check_for_end_of_track(self) -> None:
"""
Check whether track has ended. If so, emit track_ended_signal
"""
if self.start_time is None:
return
if self.end_of_track_signalled:
return
if self.music.is_playing():
return
self.start_time = None
if self.fade_graph:
self.fade_graph.clear()
# Ensure that player is released
self.music.fade(0)
self.signals.track_ended_signal.emit()
self.end_of_track_signalled = True
def drop3db(self, enable: bool) -> None:
"""
If enable is true, drop output by 3db else restore to full volume
"""
if enable:
self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False)
else:
self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False)
def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
"""Fade music"""
self.resume_marker = self.music.get_position()
self.music.fade(fade_seconds)
self.signals.track_ended_signal.emit()
def is_playing(self) -> bool:
"""
Return True if we're currently playing else False
"""
if self.start_time is None:
return False
return self.music.is_playing()
def play(self, position: float | None = None) -> None:
"""Play track"""
now = dt.datetime.now()
self.start_time = now
# Initialise player
self.music.play(self.path, start_time=now, position=position)
self.end_time = now + dt.timedelta(milliseconds=self.duration)
# Calculate time fade_graph should start updating
if self.fade_at:
update_graph_at_ms = max(
0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.fade_graph_start_updates = now + dt.timedelta(
milliseconds=update_graph_at_ms
)
def set_forecast_start_time(
self, modified_rows: list[int], start: dt.datetime | None
) -> dt.datetime | None:
"""
Set forecast start time for this row
Update passed modified rows list if we changed the row.
Return new start time
"""
changed = False
if self.forecast_start_time != start:
self.forecast_start_time = start
changed = True
if start is None:
if self.forecast_end_time is not None:
self.forecast_end_time = None
changed = True
new_start_time = None
else:
end_time = start + dt.timedelta(milliseconds=self.duration)
new_start_time = end_time
if self.forecast_end_time != end_time:
self.forecast_end_time = end_time
changed = True
if changed and self.row_number not in modified_rows:
modified_rows.append(self.row_number)
return new_start_time
def stop(self, fade_seconds: int = 0) -> None:
"""
Stop this track playing
"""
self.resume_marker = self.music.get_position()
self.fade(fade_seconds)
# Reset fade graph
if self.fade_graph:
self.fade_graph.clear()
def time_playing(self) -> int:
"""
Return time track has been playing in milliseconds, zero if not playing
"""
if self.start_time is None:
return 0
return self.music.get_playtime()
def time_remaining_intro(self) -> int:
"""
Return milliseconds of intro remaining. Return 0 if no intro time in track
record or if intro has finished.
"""
if not self.intro:
return 0
return max(0, self.intro - self.time_playing())
def time_to_fade(self) -> int:
"""
Return milliseconds until fade time. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.fade_at - self.time_playing()
def time_to_silence(self) -> int:
"""
Return milliseconds until silent. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.silence_at - self.time_playing()
def update_fade_graph(self) -> None:
"""
Update fade graph
"""
if (
not self.is_playing()
or not self.fade_graph_start_updates
or not self.fade_graph
):
return
now = dt.datetime.now()
if self.fade_graph_start_updates > now:
return
self.fade_graph.tick(self.time_playing())
class _AddFadeCurve(QObject):
"""
Initialising a fade curve introduces a noticeable delay so carry out in
a thread.
"""
finished = pyqtSignal()
def __init__(
self,
plr: PlaylistRow,
track_path: str,
track_fade_at: int,
track_silence_at: int,
) -> None:
super().__init__()
self.plr = plr
self.track_path = track_path
self.track_fade_at = track_fade_at
self.track_silence_at = track_silence_at
def run(self) -> None:
"""
Create fade curve and add to PlaylistTrack object
"""
fc = FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
if not fc:
log.error(f"Failed to create FadeCurve for {self.track_path=}")
else:
self.plr.fade_graph = fc
self.finished.emit()
class FadeCurve:
GraphWidget: PlotWidget | None = None
def __init__(
self, track_path: str, track_fade_at: int, track_silence_at: int
) -> None:
"""
Set up fade graph array
"""
audio = helpers.get_audio_segment(track_path)
if not audio:
log.error(f"FadeCurve: could not get audio for {track_path=}")
return None
# Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
# milliseconds before fade starts to silence
self.start_ms: int = max(
0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.end_ms: int = track_silence_at
audio_segment = audio[self.start_ms : self.end_ms]
self.graph_array = np.array(audio_segment.get_array_of_samples())
# Calculate the factor to map milliseconds of track to array
self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms)
self.curve: PlotDataItem | None = None
self.region: LinearRegionItem | None = None
def clear(self) -> None:
"""Clear the current graph"""
if self.GraphWidget:
self.GraphWidget.clear()
def plot(self) -> None:
if self.GraphWidget:
self.curve = self.GraphWidget.plot(self.graph_array)
if self.curve:
self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
else:
log.debug("_FadeCurve.plot: no curve")
else:
log.debug("_FadeCurve.plot: no GraphWidget")
def tick(self, play_time: int) -> None:
"""Update volume fade curve"""
if not self.GraphWidget:
return
ms_of_graph = play_time - self.start_ms
if ms_of_graph < 0:
return
if self.region is None:
# Create the region now that we're into fade
self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
self.GraphWidget.addItem(self.region)
# Update region position
if self.region:
self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
@singleton
class TrackSequence:
"""
Maintain a list of which track (if any) is next, current and
previous. A track can only be previous after being current, and can
only be current after being next. If one of the tracks listed here
moves, the row_number and/or playlist_id will change.
"""
def __init__(self) -> None:
"""
Set up storage for the three monitored tracks
"""
self.next: PlaylistRow | None = None
self.current: PlaylistRow | None = None
self.previous: PlaylistRow | None = None
def set_next(self, plr: PlaylistRow | None) -> None:
"""
Set the 'next' track to be passed PlaylistRow. Clear any previous
next track. If passed PlaylistRow is None just clear existing
next track.
"""
# Clear any existing fade graph
if self.next and self.next.fade_graph:
self.next.fade_graph.clear()
if plr is None:
self.next = None
else:
self.next = plr
self.create_fade_graph()
def move_next_to_current(self) -> None:
"""
Make the next track the current track
"""
self.current = self.next
self.next = None
def move_current_to_previous(self) -> None:
"""
Make the current track the previous track
"""
if self.current is None:
raise ApplicationError("Tried to move non-existent track from current to previous")
# Dereference the fade curve so it can be garbage collected
self.current.fade_graph = None
self.previous = self.current
self.current = None
def move_previous_to_next(self) -> None:
"""
Make the previous track the next track
"""
self.next = self.previous
self.previous = None
def create_fade_graph(self) -> None:
"""
Initialise and add FadeCurve in a thread as it's slow
"""
self.fadecurve_thread = QThread()
if self.next is None:
raise ApplicationError("hell in a handcart")
self.worker = _AddFadeCurve(
self.next,
track_path=self.next.path,
track_fade_at=self.next.fade_at,
track_silence_at=self.next.silence_at,
)
self.worker.moveToThread(self.fadecurve_thread)
self.fadecurve_thread.started.connect(self.worker.run)
self.worker.finished.connect(self.fadecurve_thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
self.fadecurve_thread.start()
def update(self) -> None:
"""
If a PlaylistRow is edited (moved, title changed, etc), the
playlistrow_id won't change. We can retrieve the PlaylistRow
using the playlistrow_id and update the stored PlaylistRow.
"""
for ts in [self.next, self.current, self.previous]:
if not ts:
continue
playlist_row_dto = repository.get_playlist_row(ts.playlistrow_id)
if not playlist_row_dto:
raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}")
ts = PlaylistRow(playlist_row_dto)

View File

@ -37,16 +37,16 @@ from PyQt6.QtWidgets import (
from audacity_controller import AudacityController from audacity_controller import AudacityController
from classes import ApplicationError, Col, MusicMusterSignals, PlaylistStyle, TrackInfo from classes import ApplicationError, Col, MusicMusterSignals, PlaylistStyle, TrackInfo
from config import Config from config import Config
from dialogs import TrackSelectDialog from dialogs import TrackInsertDialog
from helpers import ( from helpers import (
ask_yes_no, ask_yes_no,
ms_to_mmss, ms_to_mmss,
show_OK, show_OK,
show_warning, show_warning,
) )
from log import log from log import log, log_call
from models import db, Settings from models import db, Settings
from music_manager import track_sequence from playlistrow import TrackSequence
from playlistmodel import PlaylistModel, PlaylistProxyModel from playlistmodel import PlaylistModel, PlaylistProxyModel
if TYPE_CHECKING: if TYPE_CHECKING:
@ -278,6 +278,7 @@ class PlaylistTab(QTableView):
self.musicmuster = musicmuster self.musicmuster = musicmuster
self.playlist_id = model.sourceModel().playlist_id self.playlist_id = model.sourceModel().playlist_id
self.track_sequence = TrackSequence()
# Set up widget # Set up widget
self.setItemDelegate(PlaylistDelegate(self, model.sourceModel())) self.setItemDelegate(PlaylistDelegate(self, model.sourceModel()))
@ -358,7 +359,8 @@ class PlaylistTab(QTableView):
# Deselect edited line # Deselect edited line
self.clear_selection() self.clear_selection()
def dropEvent(self, event: Optional[QDropEvent], dummy: int | None = None) -> None: @log_call
def dropEvent(self, event: Optional[QDropEvent]) -> None:
""" """
Move dropped rows Move dropped rows
""" """
@ -394,9 +396,6 @@ class PlaylistTab(QTableView):
destination_index = to_index destination_index = to_index
to_model_row = self.model().mapToSource(destination_index).row() to_model_row = self.model().mapToSource(destination_index).row()
log.debug(
f"PlaylistTab.dropEvent(): {from_rows=}, {destination_index=}, {to_model_row=}"
)
# Sanity check # Sanity check
base_model_row_count = self.get_base_model().rowCount() base_model_row_count = self.get_base_model().rowCount()
@ -408,8 +407,8 @@ class PlaylistTab(QTableView):
# that moved row the next track # that moved row the next track
set_next_row: Optional[int] = None set_next_row: Optional[int] = None
if ( if (
track_sequence.current self.track_sequence.current
and to_model_row == track_sequence.current.row_number + 1 and to_model_row == self.track_sequence.current.row_number + 1
): ):
set_next_row = to_model_row set_next_row = to_model_row
@ -456,14 +455,19 @@ class PlaylistTab(QTableView):
self, selected: QItemSelection, deselected: QItemSelection self, selected: QItemSelection, deselected: QItemSelection
) -> None: ) -> None:
""" """
Tell model which rows are selected.
Toggle drag behaviour according to whether rows are selected Toggle drag behaviour according to whether rows are selected
""" """
selected_rows = self.get_selected_rows() selected_row_numbers = self.get_selected_rows()
self.musicmuster.current.selected_rows = selected_rows
# Signal selected rows to model
self.signals.signal_playlist_selected_rows.emit(self.playlist_id, selected_row_numbers)
# Put sum of selected tracks' duration in status bar
# If no rows are selected, we have nothing to do # If no rows are selected, we have nothing to do
if len(selected_rows) == 0: if len(selected_row_numbers) == 0:
self.musicmuster.lblSumPlaytime.setText("") self.musicmuster.lblSumPlaytime.setText("")
else: else:
if not self.musicmuster.disable_selection_timing: if not self.musicmuster.disable_selection_timing:
@ -514,11 +518,9 @@ class PlaylistTab(QTableView):
return return
with db.Session() as session: with db.Session() as session:
dlg = TrackSelectDialog( dlg = TrackInsertDialog(
parent=self.musicmuster, parent=self.musicmuster,
session=session, playlist_id=self.playlist_id,
new_row_number=model_row_number,
base_model=self.get_base_model(),
add_to_header=True, add_to_header=True,
) )
dlg.exec() dlg.exec()
@ -535,12 +537,12 @@ class PlaylistTab(QTableView):
header_row = self.get_base_model().is_header_row(model_row_number) header_row = self.get_base_model().is_header_row(model_row_number)
track_row = not header_row track_row = not header_row
if track_sequence.current: if self.track_sequence.current:
this_is_current_row = model_row_number == track_sequence.current.row_number this_is_current_row = model_row_number == self.track_sequence.current.row_number
else: else:
this_is_current_row = False this_is_current_row = False
if track_sequence.next: if self.track_sequence.next:
this_is_next_row = model_row_number == track_sequence.next.row_number this_is_next_row = model_row_number == self.track_sequence.next.row_number
else: else:
this_is_next_row = False this_is_next_row = False
track_path = base_model.get_row_info(model_row_number).path track_path = base_model.get_row_info(model_row_number).path
@ -676,8 +678,6 @@ class PlaylistTab(QTableView):
Called when column width changes. Save new width to database. Called when column width changes. Save new width to database.
""" """
log.debug(f"_column_resize({column_number=}, {_old=}, {_new=}")
header = self.horizontalHeader() header = self.horizontalHeader()
if not header: if not header:
return return
@ -722,6 +722,7 @@ class PlaylistTab(QTableView):
cb.clear(mode=cb.Mode.Clipboard) cb.clear(mode=cb.Mode.Clipboard)
cb.setText(track_path, mode=cb.Mode.Clipboard) cb.setText(track_path, mode=cb.Mode.Clipboard)
@log_call
def current_track_started(self) -> None: def current_track_started(self) -> None:
""" """
Called when track starts playing Called when track starts playing
@ -757,8 +758,8 @@ class PlaylistTab(QTableView):
# Don't delete current or next tracks # Don't delete current or next tracks
selected_row_numbers = self.selected_model_row_numbers() selected_row_numbers = self.selected_model_row_numbers()
for ts in [ for ts in [
track_sequence.next, self.track_sequence.next,
track_sequence.current, self.track_sequence.current,
]: ]:
if ts: if ts:
if ( if (
@ -809,6 +810,7 @@ class PlaylistTab(QTableView):
else: else:
return TrackInfo(track_id, selected_row) return TrackInfo(track_id, selected_row)
@log_call
def get_selected_row(self) -> Optional[int]: def get_selected_row(self) -> Optional[int]:
""" """
Return selected row number. If no rows or multiple rows selected, return None Return selected row number. If no rows or multiple rows selected, return None
@ -820,6 +822,7 @@ class PlaylistTab(QTableView):
else: else:
return None return None
@log_call
def get_selected_rows(self) -> list[int]: def get_selected_rows(self) -> list[int]:
"""Return a list of model-selected row numbers sorted by row""" """Return a list of model-selected row numbers sorted by row"""
@ -832,6 +835,7 @@ class PlaylistTab(QTableView):
return sorted(list(set([self.model().mapToSource(a).row() for a in selected_indexes]))) return sorted(list(set([self.model().mapToSource(a).row() for a in selected_indexes])))
@log_call
def get_top_visible_row(self) -> int: def get_top_visible_row(self) -> int:
""" """
Get the viewport of the table view Get the viewport of the table view
@ -954,8 +958,6 @@ class PlaylistTab(QTableView):
If playlist_id is us, resize rows If playlist_id is us, resize rows
""" """
log.debug(f"resize_rows({playlist_id=}) {self.playlist_id=}")
if playlist_id and playlist_id != self.playlist_id: if playlist_id and playlist_id != self.playlist_id:
return return
@ -1002,6 +1004,7 @@ class PlaylistTab(QTableView):
# Reset selection mode # Reset selection mode
self.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection) self.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection)
@log_call
def source_model_selected_row_number(self) -> Optional[int]: def source_model_selected_row_number(self) -> Optional[int]:
""" """
Return the model row number corresponding to the selected row or None Return the model row number corresponding to the selected row or None
@ -1012,6 +1015,7 @@ class PlaylistTab(QTableView):
return None return None
return self.model().mapToSource(selected_index).row() return self.model().mapToSource(selected_index).row()
@log_call
def selected_model_row_numbers(self) -> list[int]: def selected_model_row_numbers(self) -> list[int]:
""" """
Return a list of model row numbers corresponding to the selected rows or Return a list of model row numbers corresponding to the selected rows or
@ -1054,8 +1058,6 @@ class PlaylistTab(QTableView):
def _set_column_widths(self) -> None: def _set_column_widths(self) -> None:
"""Column widths from settings""" """Column widths from settings"""
log.debug("_set_column_widths()")
header = self.horizontalHeader() header = self.horizontalHeader()
if not header: if not header:
return return
@ -1119,7 +1121,7 @@ class PlaylistTab(QTableView):
# Update musicmuster # Update musicmuster
self.musicmuster.current.playlist_id = self.playlist_id self.musicmuster.current.playlist_id = self.playlist_id
self.musicmuster.current.selected_rows = self.get_selected_rows() self.musicmuster.current.selected_row_numbers = self.get_selected_rows()
self.musicmuster.current.base_model = self.get_base_model() self.musicmuster.current.base_model = self.get_base_model()
self.musicmuster.current.proxy_model = self.model() self.musicmuster.current.proxy_model = self.model()
@ -1128,6 +1130,6 @@ class PlaylistTab(QTableView):
def _unmark_as_next(self) -> None: def _unmark_as_next(self) -> None:
"""Rescan track""" """Rescan track"""
track_sequence.set_next(None) self.track_sequence.set_next(None)
self.clear_selection() self.clear_selection()
self.signals.next_track_changed_signal.emit() self.signals.next_track_changed_signal.emit()

View File

@ -40,7 +40,7 @@ from helpers import (
) )
from log import log from log import log
from models import db, Playdates, Tracks from models import db, Playdates, Tracks
from music_manager import RowAndTrack from playlistrow import PlaylistRow
@dataclass @dataclass
@ -268,7 +268,7 @@ class QuerylistModel(QAbstractTableModel):
bottom_right = self.index(row, self.columnCount() - 1) bottom_right = self.index(row, self.columnCount() - 1)
self.dataChanged.emit(top_left, bottom_right, [Qt.ItemDataRole.BackgroundRole]) self.dataChanged.emit(top_left, bottom_right, [Qt.ItemDataRole.BackgroundRole])
def _tooltip_role(self, row: int, column: int, rat: RowAndTrack) -> str | QVariant: def _tooltip_role(self, row: int, column: int, rat: PlaylistRow) -> str | QVariant:
""" """
Return tooltip. Currently only used for last_played column. Return tooltip. Currently only used for last_played column.
""" """

748
app/repository.py Normal file
View File

@ -0,0 +1,748 @@
# Standard library imports
import re
# PyQt imports
# Third party imports
from sqlalchemy import (
delete,
func,
select,
update,
)
from sqlalchemy.orm import aliased
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.elements import BinaryExpression, ColumnElement
from classes import ApplicationError, PlaylistRowDTO
# App imports
from classes import PlaylistDTO, TrackDTO
from config import Config
import helpers
from log import log
from models import (
db,
NoteColours,
Playdates,
PlaylistRows,
Playlists,
Settings,
Tracks,
)
# Notecolour functions
def get_colour(text: str, foreground: bool = False) -> str:
"""
Parse text and return background (foreground if foreground==True)
colour string if matched, else None
"""
if not text:
return ""
match = False
with db.Session() as session:
for rec in NoteColours.get_all(session):
if rec.is_regex:
flags = re.UNICODE
if not rec.is_casesensitive:
flags |= re.IGNORECASE
p = re.compile(rec.substring, flags)
if p.match(text):
match = True
else:
if rec.is_casesensitive:
if rec.substring in text:
match = True
else:
if rec.substring.lower() in text.lower():
match = True
if match:
if foreground:
return rec.foreground or ""
else:
return rec.colour
return ""
# Track functions
def add_track_to_header(playlistrow_id: int, track_id: int) -> None:
"""
Add a track to this (header) row
"""
with db.Session() as session:
session.execute(
update(PlaylistRows)
.where(PlaylistRows.id == playlistrow_id)
.values(track_id=track_id)
)
session.commit()
def create_track(path: str) -> TrackDTO:
"""
Create a track db entry from a track path and return the DTO
"""
metadata = helpers.get_all_track_metadata(path)
with db.Session() as session:
try:
track = Tracks(
session=session,
path=str(metadata["path"]),
title=str(metadata["title"]),
artist=str(metadata["artist"]),
duration=int(metadata["duration"]),
start_gap=int(metadata["start_gap"]),
fade_at=int(metadata["fade_at"]),
silence_at=int(metadata["silence_at"]),
bitrate=int(metadata["bitrate"]),
)
track_id = track.id
session.commit()
except Exception:
raise ApplicationError("Can't create Track")
new_track = track_by_id(track_id)
if not new_track:
raise ApplicationError("Unable to create new track")
return new_track
def get_all_tracks() -> list[TrackDTO]:
"""Return a list of all tracks"""
return _tracks_where(Tracks.id > 0)
def track_by_id(track_id: int) -> TrackDTO | None:
"""
Return track with specified id
"""
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(Tracks.id == track_id)
)
with db.Session() as session:
record = session.execute(stmt).one_or_none()
if not record:
return None
dto = TrackDTO(
artist=record.artist,
bitrate=record.bitrate,
duration=record.duration,
fade_at=record.fade_at,
intro=record.intro,
lastplayed=record.lastplayed,
path=record.path,
silence_at=record.silence_at,
start_gap=record.start_gap,
title=record.title,
track_id=record.track_id,
)
return dto
def _tracks_where(where: BinaryExpression | ColumnElement[bool]) -> list[TrackDTO]:
"""
Return tracks selected by where
"""
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(where)
)
results: list[TrackDTO] = []
with db.Session() as session:
records = session.execute(stmt).all()
for record in records:
dto = TrackDTO(
artist=record.artist,
bitrate=record.bitrate,
duration=record.duration,
fade_at=record.fade_at,
intro=record.intro,
lastplayed=record.lastplayed,
path=record.path,
silence_at=record.silence_at,
start_gap=record.start_gap,
title=record.title,
track_id=record.track_id,
)
results.append(dto)
return results
def tracks_like_artist(filter_str: str) -> list[TrackDTO]:
"""
Return tracks where artist is like filter
"""
return _tracks_where(Tracks.artist.ilike(f"%{filter_str}%"))
def tracks_like_title(filter_str: str) -> list[TrackDTO]:
"""
Return tracks where title is like filter
"""
return _tracks_where(Tracks.title.ilike(f"%{filter_str}%"))
# Playlist functions
def _check_playlist_integrity(
session: Session, playlist_id: int, fix: bool = False
) -> None:
"""
Ensure the row numbers are contiguous. Fix and log if fix==True,
else raise ApplicationError.
"""
playlist_rows = (
session.execute(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
)
.scalars()
.all()
)
for idx, plr in enumerate(playlist_rows):
if plr.row_number == idx:
continue
msg = (
"_check_playlist_integrity: incorrect row number "
f"({plr.id=}, {plr.row_number=}, {idx=})"
)
if fix:
log.debug(msg)
plr.row_number = idx
else:
raise ApplicationError(msg)
def _shift_rows(
session: Session, playlist_id: int, starting_row: int, shift_by: int
) -> None:
"""
Shift rows from starting_row by shift_by. If shift_by is +ve, shift rows
down; if -ve, shift them up.
"""
log.debug(f"(_shift_rows_down({playlist_id=}, {starting_row=}, {shift_by=}")
session.execute(
update(PlaylistRows)
.where(
(PlaylistRows.playlist_id == playlist_id),
(PlaylistRows.row_number >= starting_row),
)
.values(row_number=PlaylistRows.row_number + shift_by)
)
def move_rows(
from_rows: list[int], from_playlist_id: int, to_row: int, to_playlist_id: int | None = None
) -> None:
"""
Move rows with or between playlists.
Algorithm:
- Sanity check row numbers
- Check there are no playlist rows with playlist_id == PENDING_MOVE
- Put rows to be moved into PENDING_MOVE playlist
- Resequence remaining row numbers
- Make space for moved rows
- Move the PENDING_MOVE rows back and fixup row numbers
- Sanity check row numbers
"""
log.debug(
f"move_rows_to_playlist({from_rows=}, {from_playlist_id=}, {to_row=}, {to_playlist_id=})"
)
# If to_playlist_id isn't specified, we're moving within the one
# playlist.
if to_playlist_id is None:
to_playlist_id = from_playlist_id
with db.Session() as session:
# Sanity check row numbers
_check_playlist_integrity(session, from_playlist_id, fix=False)
if from_playlist_id != to_playlist_id:
_check_playlist_integrity(session, to_playlist_id, fix=False)
# Check there are no playlist rows with playlist_id == PENDING_MOVE
pending_move_rows = get_playlist_rows(Config.PLAYLIST_PENDING_MOVE)
if pending_move_rows:
raise ApplicationError(f"move_rows_to_playlist: {pending_move_rows=}")
# We need playlist length if we're moving within a playlist. Get
# that now before we remove rows.
from_playlist_length = len(get_playlist_rows(from_playlist_id))
# Put rows to be moved into PENDING_MOVE playlist
session.execute(
update(PlaylistRows)
.where(
PlaylistRows.playlist_id == from_playlist_id,
PlaylistRows.row_number.in_(from_rows),
)
.values(playlist_id=Config.PLAYLIST_PENDING_MOVE)
)
# Resequence remaining row numbers
_check_playlist_integrity(session, from_playlist_id, fix=True)
session.commit()
# Make space for moved rows. If moving within one playlist,
# determning where to make the space is non-trivial. For example,
# if the playlist has ten entries and we're moving four of them
# to row 8, after we've moved the rows to the
# PLAYLIST_PENDING_MOVE there will only be six entries left.
# Clearly we can't make space at row 8...
space_row = to_row
if to_playlist_id == from_playlist_id:
overflow = max(to_row + len(from_rows) - from_playlist_length, 0)
if overflow != 0:
space_row = (
to_row - overflow - len([a for a in from_rows if a > to_row])
)
_shift_rows(session, to_playlist_id, space_row, len(from_rows))
# Move the PENDING_MOVE rows back and fixup row numbers
update_list: list[dict[str, int]] = []
next_row = space_row
# PLAYLIST_PENDING_MOVE may have gaps so don't check it
for row_to_move in get_playlist_rows(
Config.PLAYLIST_PENDING_MOVE, check_playlist_itegrity=False
):
update_list.append(
{"id": row_to_move.playlistrow_id, "row_number": next_row}
)
update_list.append(
{"id": row_to_move.playlistrow_id, "playlist_id": to_playlist_id}
)
next_row += 1
session.execute(update(PlaylistRows), update_list)
session.commit()
# Sanity check row numbers
_check_playlist_integrity(session, from_playlist_id, fix=False)
if from_playlist_id != to_playlist_id:
_check_playlist_integrity(session, to_playlist_id, fix=False)
def update_row_numbers(
playlist_id: int, id_to_row_number: list[dict[int, int]]
) -> None:
"""
Update playlistrows rownumbers for passed playlistrow_ids
playlist_id is only needed for sanity checking
"""
with db.Session() as session:
session.execute(update(PlaylistRows), id_to_row_number)
session.commit()
# Sanity check
_check_playlist_integrity(session, playlist_id, fix=False)
def create_playlist(name: str, template_id: int) -> PlaylistDTO:
"""
Create playlist and return DTO.
"""
with db.Session() as session:
try:
playlist = Playlists(session, name, template_id)
playlist_id = playlist.id
session.commit()
except Exception:
raise ApplicationError("Can't create Playlist")
new_playlist = playlist_by_id(playlist_id)
if not new_playlist:
raise ApplicationError("Can't retrieve new Playlist")
return new_playlist
def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None:
"""
Return specific row DTO
"""
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
PlaylistRows.id.label("playlistrow_id"),
PlaylistRows.row_number,
PlaylistRows.note,
PlaylistRows.played,
PlaylistRows.playlist_id,
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(PlaylistRows.id == playlistrow_id)
.order_by(PlaylistRows.row_number)
)
with db.Session() as session:
record = session.execute(stmt).one_or_none()
if not record:
return None
# Handle cases where track_id is None (no track associated)
if record.track_id is None:
dto = PlaylistRowDTO(
artist="",
bitrate=0,
duration=0,
fade_at=0,
intro=None,
lastplayed=None,
note=record.note,
path="",
played=record.played,
playlist_id=record.playlist_id,
playlistrow_id=record.playlistrow_id,
row_number=record.row_number,
silence_at=0,
start_gap=0,
title="",
track_id=-1,
)
else:
dto = PlaylistRowDTO(
artist=record.artist,
bitrate=record.bitrate,
duration=record.duration,
fade_at=record.fade_at,
intro=record.intro,
lastplayed=record.lastplayed,
note=record.note,
path=record.path,
played=record.played,
playlist_id=record.playlist_id,
playlistrow_id=record.playlistrow_id,
row_number=record.row_number,
silence_at=record.silence_at,
start_gap=record.start_gap,
title=record.title,
track_id=record.track_id,
)
return dto
def get_playlist_rows(
playlist_id: int, check_playlist_itegrity=True
) -> list[PlaylistRowDTO]:
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
# Subquery: latest playdate for each track
latest_playdate_subq = (
select(
LatestPlaydate.track_id,
func.max(LatestPlaydate.lastplayed).label("lastplayed"),
)
.group_by(LatestPlaydate.track_id)
.subquery()
)
stmt = (
select(
PlaylistRows.id.label("playlistrow_id"),
PlaylistRows.row_number,
PlaylistRows.note,
PlaylistRows.played,
PlaylistRows.playlist_id,
Tracks.id.label("track_id"),
Tracks.artist,
Tracks.bitrate,
Tracks.duration,
Tracks.fade_at,
Tracks.intro,
Tracks.path,
Tracks.silence_at,
Tracks.start_gap,
Tracks.title,
latest_playdate_subq.c.lastplayed,
)
.outerjoin(Tracks, PlaylistRows.track_id == Tracks.id)
.outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
)
with db.Session() as session:
results = session.execute(stmt).all()
# Sanity check
# TODO: would be good to be confident at removing this
if check_playlist_itegrity:
_check_playlist_integrity(
session=session, playlist_id=playlist_id, fix=False
)
dto_list = []
for row in results:
# Handle cases where track_id is None (no track associated)
if row.track_id is None:
dto = PlaylistRowDTO(
artist="",
bitrate=0,
duration=0,
fade_at=0,
intro=None,
lastplayed=None,
note=row.note,
path="",
played=row.played,
playlist_id=row.playlist_id,
playlistrow_id=row.playlistrow_id,
row_number=row.row_number,
silence_at=0,
start_gap=0,
title="",
track_id=-1,
# Additional fields like row_fg, row_bg, etc., use default None values
)
else:
dto = PlaylistRowDTO(
artist=row.artist,
bitrate=row.bitrate,
duration=row.duration,
fade_at=row.fade_at,
intro=row.intro,
lastplayed=row.lastplayed,
note=row.note,
path=row.path,
played=row.played,
playlist_id=row.playlist_id,
playlistrow_id=row.playlistrow_id,
row_number=row.row_number,
silence_at=row.silence_at,
start_gap=row.start_gap,
title=row.title,
track_id=row.track_id,
# Additional fields like row_fg, row_bg, etc., use default None values
)
dto_list.append(dto)
return dto_list
def insert_row(
playlist_id: int, row_number: int, track_id: int | None, note: str
) -> PlaylistRowDTO:
"""
Insert a new row into playlist and return new row DTO
"""
with db.Session() as session:
# Sanity check
_check_playlist_integrity(session, playlist_id, fix=False)
# Make space for new row
_shift_rows(
session=session,
playlist_id=playlist_id,
starting_row=row_number,
shift_by=1,
)
playlist_row = PlaylistRows.insert_row(
session=session,
playlist_id=playlist_id,
new_row_number=row_number,
note=note,
track_id=track_id,
)
session.commit()
playlist_row_id = playlist_row.id
# Sanity check
_check_playlist_integrity(session, playlist_id, fix=False)
new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id)
if not new_playlist_row:
raise ApplicationError("Can't retrieve new playlist row")
return new_playlist_row
def remove_rows(playlist_id: int, row_numbers: list[int]) -> None:
"""
Remove rows from playlist
Delete from highest row back so that not yet deleted row numbers don't change.
"""
log.debug(f"remove_rows({playlist_id=}, {row_numbers=}")
with db.Session() as session:
for row_number in sorted(row_numbers, reverse=True):
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number == row_number,
)
)
# Fixup row number to remove gaps
_check_playlist_integrity(session, playlist_id, fix=True)
session.commit()
def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
"""
Return playlist with specified id
"""
stmt = select(
Playlists.id.label("playlist_id"),
Playlists.name,
Playlists.favourite,
Playlists.is_template,
Playlists.open,
).where(Playlists.id == playlist_id)
with db.Session() as session:
record = session.execute(stmt).one_or_none()
if not record:
return None
dto = PlaylistDTO(
name=record.name,
playlist_id=record.playlist_id,
favourite=record.favourite,
is_template=record.is_template,
open=record.open,
)
return dto
# Misc
def get_setting(name: str) -> int | None:
"""
Get int setting
"""
with db.Session() as session:
record = session.execute(
select(Settings).where(Settings.name == name)
).one_or_none()
if not record:
return None
return record.f_int
def set_setting(name: str, value: int) -> None:
"""
Add int setting
"""
with db.Session() as session:
record = session.execute(
select(Settings).where(Settings.name == name)
).one_or_none()
if not record:
record = Settings(session=session, name=name)
if not record:
raise ApplicationError("Can't create Settings record")
record.f_int = value
session.commit()

View File

@ -1,29 +0,0 @@
# Standard library imports
# PyQt imports
# Third party imports
import vlc # type: ignore
# App imports
class VLCManager:
"""
Singleton class to ensure we only ever have one vlc Instance
"""
__instance = None
def __init__(self) -> None:
if VLCManager.__instance is None:
self.vlc_instance = vlc.Instance()
VLCManager.__instance = self
else:
raise Exception("Attempted to create a second VLCManager instance")
@staticmethod
def get_instance() -> vlc.Instance:
if VLCManager.__instance is None:
VLCManager()
return VLCManager.__instance

View File

@ -0,0 +1,40 @@
# Standard library imports
import unittest
# PyQt imports
# Third party imports
# App imports
from app.models import (
db,
)
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
pass
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
pass
def setUp(self):
"""Runs before each test"""
db.create_all()
def tearDown(self):
"""Runs after each test"""
db.drop_all()
def test_xxx(self):
"""Comment"""
pass

View File

@ -134,122 +134,6 @@ class TestMMMiscRowMove(unittest.TestCase):
def tearDown(self): def tearDown(self):
db.drop_all() db.drop_all()
def test_move_rows_test2(self):
# move row 3 to row 5
self.model.move_rows([3], 5)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [3, 4, 5]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 3:
assert self.model.playlist_rows[row].note == str(4)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
elif row == 5:
assert self.model.playlist_rows[row].note == str(5)
def test_move_rows_test3(self):
# move row 4 to row 3
self.model.move_rows([4], 3)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [3, 4]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 3:
assert self.model.playlist_rows[row].note == str(4)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
def test_move_rows_test4(self):
# move row 4 to row 2
self.model.move_rows([4], 2)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [2, 3, 4]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 2:
assert self.model.playlist_rows[row].note == str(4)
elif row == 3:
assert self.model.playlist_rows[row].note == str(2)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
def test_move_rows_test5(self):
# move rows [1, 4, 5, 10] → 8
self.model.move_rows([1, 4, 5, 10], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 2, 3, 6, 7, 1, 4, 5, 10, 8, 9]
def test_move_rows_test6(self):
# move rows [3, 6] → 5
self.model.move_rows([3, 6], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 3, 6, 5, 7, 8, 9, 10]
def test_move_rows_test7(self):
# move rows [3, 5, 6] → 8
self.model.move_rows([3, 5, 6], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 7, 3, 5, 6, 8, 9, 10]
def test_move_rows_test8(self):
# move rows [7, 8, 10] → 5
self.model.move_rows([7, 8, 10], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
def test_move_rows_test9(self):
# move rows [1, 2, 3] → 0
# Replicate issue 244
self.model.move_rows([0, 1, 2, 3], 0)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def test_insert_header_row_end(self): def test_insert_header_row_end(self):
# insert header row at end of playlist # insert header row at end of playlist

274
tests/test_repository.py Normal file
View File

@ -0,0 +1,274 @@
# Standard library imports
import unittest
# PyQt imports
# Third party imports
# App imports
from app import playlistmodel
from app import repository
from app.models import db
from classes import PlaylistDTO
from playlistmodel import PlaylistModel
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
cls.isa_path = "testdata/isa.mp3"
cls.isa_title = "I'm So Afraid"
cls.isa_artist = "Fleetwood Mac"
cls.mom_path = "testdata/mom.mp3"
cls.mom_title = "Man of Mystery"
cls.mom_artist = "The Shadows"
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
pass
def setUp(self):
"""Runs before each test"""
db.create_all()
def create_playlist_and_model(
self, playlist_name: str
) -> (PlaylistDTO, PlaylistModel):
# Create a playlist and model
playlist = repository.create_playlist(name=playlist_name, template_id=0)
assert playlist
model = playlistmodel.PlaylistModel(playlist.playlist_id, is_template=False)
assert model
return (playlist, model)
def create_playlist_model_tracks(self, playlist_name: str):
(playlist, model) = self.create_playlist_and_model("my playlist")
# Create tracks
self.track1 = repository.create_track(self.isa_path)
self.track2 = repository.create_track(self.mom_path)
# Add tracks and header to playlist
self.row0 = repository.insert_row(
playlist.playlist_id,
row_number=0,
track_id=self.track1.track_id,
note="track 1",
)
self.row1 = repository.insert_row(
playlist.playlist_id,
row_number=1,
track_id=0,
note="Header row",
)
self.row2 = repository.insert_row(
playlist.playlist_id,
row_number=2,
track_id=self.track2.track_id,
note="track 2",
)
def create_rows(
self, playlist_name: str, number_of_rows: int
) -> (PlaylistDTO, PlaylistModel):
(playlist, model) = self.create_playlist_and_model(playlist_name)
for row_number in range(number_of_rows):
repository.insert_row(
playlist.playlist_id, row_number, None, str(row_number)
)
return (playlist, model)
def tearDown(self):
"""Runs after each test"""
db.drop_all()
def test_add_track_to_header(self):
"""Add a track to a header row"""
self.create_playlist_model_tracks("my playlist")
repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id)
result = repository.get_playlist_row(self.row1.playlistrow_id)
assert result.track_id == self.track2.track_id
def test_create_track(self):
repository.create_track(self.isa_path)
results = repository.get_all_tracks()
assert len(results) == 1
assert results[0].path == self.isa_path
def test_get_track_by_id(self):
dto = repository.create_track(self.isa_path)
result = repository.track_by_id(dto.track_id)
assert result.path == self.isa_path
def test_get_track_by_artist(self):
_ = repository.create_track(self.isa_path)
_ = repository.create_track(self.mom_path)
result_isa = repository.tracks_like_artist(self.isa_artist)
assert len(result_isa) == 1
assert result_isa[0].artist == self.isa_artist
result_mom = repository.tracks_like_artist(self.mom_artist)
assert len(result_mom) == 1
assert result_mom[0].artist == self.mom_artist
def test_get_track_by_title(self):
_ = repository.create_track(self.isa_path)
_ = repository.create_track(self.mom_path)
result_isa = repository.tracks_like_title(self.isa_title)
assert len(result_isa) == 1
assert result_isa[0].title == self.isa_title
result_mom = repository.tracks_like_title(self.mom_title)
assert len(result_mom) == 1
assert result_mom[0].title == self.mom_title
def test_move_rows_test1(self):
# move row 3 to row 5
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test1", number_of_rows)
repository.move_rows([3], playlist.playlist_id, 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9]
def test_move_rows_test2(self):
# move row 4 to row 3
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test2", number_of_rows)
repository.move_rows([4], playlist.playlist_id, 3)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 3, 5, 6, 7, 8, 9]
def test_move_rows_test3(self):
# move row 4 to row 2
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test3", number_of_rows)
repository.move_rows([4], playlist.playlist_id, 2)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 4, 2, 3, 5, 6, 7, 8, 9]
def test_move_rows_test4(self):
# move rows [1, 4, 5, 10] → 8
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test4", number_of_rows)
repository.move_rows([1, 4, 5, 10], playlist.playlist_id, 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 2, 3, 6, 7, 8, 1, 4, 5, 10, 9]
def test_move_rows_test5(self):
# move rows [3, 6] → 5
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test5", number_of_rows)
repository.move_rows([3, 6], playlist.playlist_id, 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9, 10]
def test_move_rows_test6(self):
# move rows [3, 5, 6] → 8
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
repository.move_rows([3, 5, 6], playlist.playlist_id, 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 7, 8, 9, 10, 3, 5, 6]
def test_move_rows_test7(self):
# move rows [7, 8, 10] → 5
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test7", number_of_rows)
repository.move_rows([7, 8, 10], playlist.playlist_id, 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
def test_move_rows_test8(self):
# move rows [1, 2, 3] → 0
# Replicate issue 244
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test8", number_of_rows)
repository.move_rows([0, 1, 2, 3], playlist.playlist_id, 0)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def test_move_rows_to_playlist(self):
number_of_rows = 11
rows_to_move = [2, 4, 6]
to_row = 5
(playlist_src, model_src) = self.create_rows("src playlist", number_of_rows)
(playlist_dst, model_dst) = self.create_rows("dst playlist", number_of_rows)
repository.move_rows(
rows_to_move, playlist_src.playlist_id, to_row, playlist_dst.playlist_id
)
# Check we have all rows and plr_rownums are correct
new_order_src = []
for row in repository.get_playlist_rows(playlist_src.playlist_id):
new_order_src.append(int(row.note))
assert new_order_src == [0, 1, 3, 5, 7, 8, 9, 10]
new_order_dst = []
for row in repository.get_playlist_rows(playlist_dst.playlist_id):
new_order_dst.append(int(row.note))
assert new_order_dst == [0, 1, 2, 3, 4, 2, 4, 6, 5, 6, 7, 8, 9, 10]
def test_remove_rows(self):
pass
def test_get_playlist_by_id(self):
pass
def test_settings(self):
pass