Compare commits

..

33 Commits

Author SHA1 Message Date
Keith Edmunds
0f22671e38 Fix writing m3u file 2025-04-11 17:06:50 +01:00
Keith Edmunds
a160473d9e WIP folded get_filtered_tracks into repository 2025-04-11 17:04:42 +01:00
Keith Edmunds
f9c060b091 WIP: add tracks from queries OK 2025-04-11 15:59:15 +01:00
Keith Edmunds
b0385789be All test_repository tests pass 2025-04-11 13:05:14 +01:00
Keith Edmunds
e03e64542f Rolled back complexity of _tracks_where 2025-04-11 13:04:58 +01:00
Keith Edmunds
83f5ad5303 WIP: Make TrackDTO a member, not parent of, PlaylistRow 2025-04-11 09:23:17 +01:00
Keith Edmunds
f9c8541b17 WIP: moving to signals 2025-04-08 16:11:39 +01:00
Keith Edmunds
5e492f4569 WIP: sessions only in repository and mocels 2025-04-07 18:33:16 +01:00
Keith Edmunds
f3b1e05e83 WIP Remove repository.py dependency on helpers.py 2025-04-05 16:32:31 +01:00
Keith Edmunds
e71f1d072f Update environment 2025-04-05 15:52:34 +01:00
Keith Edmunds
c182a69a5d WIP: remove session from playlistmodel 2025-04-04 18:54:46 +01:00
Keith Edmunds
e39518e5ee WIP: remove sessions from file_importer 2025-04-04 18:52:31 +01:00
Keith Edmunds
4eaab98745 WIP: progressing no sessions in app files 2025-04-04 16:19:17 +01:00
Keith Edmunds
2fce0b34be WIP: clean up imports 2025-04-04 16:17:51 +01:00
Keith Edmunds
ed7ac0758c No more sessions in playlists!
Save/restore playlist column widths.
2025-03-30 13:55:17 +01:00
Keith Edmunds
098ce7198e Remove stack dump from ApplicationError dialog 2025-03-30 13:54:10 +01:00
Keith Edmunds
38b166737b WIP: add track to header
Logic works; playlistrow.py doesn't yet update database but prints a
message saying db needs to be updated.
2025-03-30 13:30:04 +01:00
Keith Edmunds
0ea12eb9d9 Clean up merge from dev 2025-03-30 12:05:41 +01:00
Keith Edmunds
75cc7a3f19 Merge changes from dev 2025-03-30 11:56:24 +01:00
Keith Edmunds
f64671d126 Improve function logging
Use @log_call decorator

Add 'checked' parameter to menu slots because PyQt6 will pass a
boolean 'checked' parameter even when the menu item can't be checked.

Remove superfluous logging calls.
2025-03-29 21:04:59 +00:00
Keith Edmunds
2bf1bc64e7 WIP: Unify function to move rows within/between playlists 2025-03-29 18:20:38 +00:00
Keith Edmunds
3c7fc20e5a Remove kae.py from git 2025-03-29 18:20:38 +00:00
Keith Edmunds
52d2269ece WIP: Move rows within and between playlists working 2025-03-29 18:20:38 +00:00
Keith Edmunds
3cd764c893 WIP: moving rows within playlist works 2025-03-29 18:20:38 +00:00
Keith Edmunds
65878b0b75 WIP: all tests for move rows within playlist working 2025-03-29 18:20:38 +00:00
Keith Edmunds
4e89d72a8f WIP: move within playlist tests working 2025-03-29 18:20:38 +00:00
Keith Edmunds
92ecb632b5 Report correct line for ApplicationError 2025-03-29 18:20:38 +00:00
Keith Edmunds
6296566c2c WIP: Can play tracks without errors 2025-03-29 18:20:38 +00:00
Keith Edmunds
7e5b170f5e Use @singleton decorator 2025-03-29 18:20:38 +00:00
Keith Edmunds
3db71a08ae WIP remove sessions, use reporistory 2025-03-29 18:20:38 +00:00
Keith Edmunds
7b0e2b2c6c WIP: playlists load, can't play track 2025-03-29 18:20:38 +00:00
Keith Edmunds
4265472d73 Keep track of selected rows in model 2025-03-29 18:20:38 +00:00
Keith Edmunds
c94cadf24f WIP: Use PlaylistRowDTO to isolate SQLAlchemy objects 2025-03-29 18:20:38 +00:00
26 changed files with 3927 additions and 2823 deletions

1
.gitattributes vendored
View File

@ -1 +0,0 @@
*.py diff=python

View File

@ -69,8 +69,7 @@ class AudacityController:
select_status = self._send_command("SelectAll")
log.debug(f"{select_status=}")
# Escape any double quotes in filename
export_cmd = f'Export2: Filename="{self.path.replace('"', '\\"')}" NumChannels=2'
export_cmd = f'Export2: Filename="{self.path}" NumChannels=2'
export_status = self._send_command(export_cmd)
log.debug(f"{export_status=}")
self.path = ""

View File

@ -1,7 +1,7 @@
# Standard library imports
from __future__ import annotations
from dataclasses import dataclass
import datetime as dt
from enum import auto, Enum
import functools
import threading
@ -46,6 +46,55 @@ def singleton(cls):
return wrapper_singleton
# DTOs
@dataclass
class PlaylistDTO:
playlist_id: int
name: str
open: bool = False
favourite: bool = False
is_template: bool = False
@dataclass
class QueryDTO:
query_id: int
name: str
favourite: bool
filter: Filter
@dataclass
class TrackDTO:
track_id: int
artist: str
bitrate: int
duration: int
fade_at: int
intro: int | None
path: str
silence_at: int
start_gap: int
title: str
lastplayed: dt.datetime | None
@dataclass
class PlaylistRowDTO:
note: str
played: bool
playlist_id: int
playlistrow_id: int
row_number: int
track: TrackDTO | None
@dataclass
class PlaydatesDTO(TrackDTO):
playdate_id: int
lastplayed: dt.datetime
class ApplicationError(Exception):
"""
Custom exception
@ -91,31 +140,6 @@ class Filter:
duration_unit: str = "minutes"
@singleton
@dataclass
class MusicMusterSignals(QObject):
"""
Class for all MusicMuster signals. See:
- https://zetcode.com/gui/pyqt5/eventssignals/
- https://stackoverflow.com/questions/62654525/emit-a-signal-from-another-class-to-main-class
"""
begin_reset_model_signal = pyqtSignal(int)
enable_escape_signal = pyqtSignal(bool)
end_reset_model_signal = pyqtSignal(int)
next_track_changed_signal = pyqtSignal()
resize_rows_signal = pyqtSignal(int)
search_songfacts_signal = pyqtSignal(str)
search_wikipedia_signal = pyqtSignal(str)
show_warning_signal = pyqtSignal(str, str)
span_cells_signal = pyqtSignal(int, int, int, int, int)
status_message_signal = pyqtSignal(str, int)
track_ended_signal = pyqtSignal()
def __post_init__(self):
super().__init__()
class PlaylistStyle(QProxyStyle):
def drawPrimitive(self, element, option, painter, widget=None):
"""
@ -152,3 +176,60 @@ class Tags(NamedTuple):
class TrackInfo(NamedTuple):
track_id: int
row_number: int
# Classes for signals
@dataclass
class InsertRows:
playlist_id: int
from_row: int
to_row: int
@dataclass
class InsertTrack:
playlist_id: int
track_id: int | None
note: str
@dataclass
class PlayTrack:
playlist_id: int
track_id: int
@singleton
@dataclass
class MusicMusterSignals(QObject):
"""
Class for all MusicMuster signals. See:
- https://zetcode.com/gui/pyqt5/eventssignals/
- https://stackoverflow.com/questions/62654525/emit-a-signal-from-another-class-to-main-class
"""
begin_reset_model_signal = pyqtSignal(int)
enable_escape_signal = pyqtSignal(bool)
end_reset_model_signal = pyqtSignal(int)
next_track_changed_signal = pyqtSignal()
resize_rows_signal = pyqtSignal(int)
search_songfacts_signal = pyqtSignal(str)
search_wikipedia_signal = pyqtSignal(str)
show_warning_signal = pyqtSignal(str, str)
signal_add_track_to_header = pyqtSignal(InsertTrack)
signal_begin_insert_rows = pyqtSignal(InsertRows)
signal_end_insert_rows = pyqtSignal(int)
signal_insert_track = pyqtSignal(InsertTrack)
signal_playlist_selected_rows = pyqtSignal(int, list)
signal_set_next_row = pyqtSignal(int)
# signal_set_next_track takes a PlaylistRow as an argument. We can't
# specify that here as it requires us to import PlaylistRow from
# playlistrow.py, which itself imports MusicMusterSignals
signal_set_next_track = pyqtSignal(object)
signal_track_started = pyqtSignal(PlayTrack)
span_cells_signal = pyqtSignal(int, int, int, int, int)
status_message_signal = pyqtSignal(str, int)
track_ended_signal = pyqtSignal()
def __post_init__(self):
super().__init__()

View File

@ -34,6 +34,7 @@ class Config(object):
COLOUR_QUERYLIST_SELECTED = "#d3ffd3"
COLOUR_UNREADABLE = "#dc3545"
COLOUR_WARNING_TIMER = "#ffc107"
DB_NOT_FOUND = "Database not found"
DBFS_SILENCE = -50
DEFAULT_COLUMN_WIDTH = 200
DISPLAY_SQL = False
@ -112,6 +113,8 @@ class Config(object):
PLAYLIST_ICON_CURRENT = ":/icons/green-circle.png"
PLAYLIST_ICON_NEXT = ":/icons/yellow-circle.png"
PLAYLIST_ICON_TEMPLATE = ":/icons/redstar.png"
PLAYLIST_PENDING_MOVE = -1
PLAYLIST_FAILED_MOVE = -2
PREVIEW_ADVANCE_MS = 5000
PREVIEW_BACK_MS = 5000
PREVIEW_END_BUFFER_MS = 1000
@ -132,14 +135,14 @@ class Config(object):
TRACK_TIME_FORMAT = "%H:%M:%S"
VLC_MAIN_PLAYER_NAME = "MusicMuster Main Player"
VLC_PREVIEW_PLAYER_NAME = "MusicMuster Preview Player"
VLC_VOLUME_DEFAULT = 100
VLC_VOLUME_DROP3db = 70
VLC_VOLUME_DEFAULT = 75
VLC_VOLUME_DROP3db = 65
WARNING_MS_BEFORE_FADE = 5500
WARNING_MS_BEFORE_SILENCE = 5500
WEB_ZOOM_FACTOR = 1.2
WIKIPEDIA_ON_NEXT = False
# These rely on earlier definitions
HIDE_PLAYED_MODE = HIDE_PLAYED_MODE_TRACKS
HIDE_PLAYED_MODE = HIDE_PLAYED_MODE_SECTIONS
IMPORT_DESTINATION = os.path.join(ROOT, "Singles")
REPLACE_FILES_DEFAULT_DESTINATION = os.path.dirname(REPLACE_FILES_DEFAULT_SOURCE)

View File

@ -2,230 +2,168 @@
from typing import Optional
# PyQt imports
from PyQt6.QtCore import QEvent, Qt
from PyQt6.QtGui import QKeyEvent
from PyQt6.QtCore import Qt
from PyQt6.QtWidgets import (
QDialog,
QHBoxLayout,
QLabel,
QLineEdit,
QListWidget,
QListWidgetItem,
QMainWindow,
QPushButton,
QVBoxLayout,
)
# Third party imports
from sqlalchemy.orm.session import Session
# App imports
from classes import MusicMusterSignals
from classes import ApplicationError, InsertTrack, MusicMusterSignals
from helpers import (
ask_yes_no,
get_relative_date,
ms_to_mmss,
)
from log import log
from models import Settings, Tracks
from playlistmodel import PlaylistModel
from ui import dlg_TrackSelect_ui
import repository
class TrackSelectDialog(QDialog):
"""Select track from database"""
class TrackInsertDialog(QDialog):
def __init__(
self,
parent: QMainWindow,
session: Session,
new_row_number: int,
base_model: PlaylistModel,
playlist_id: int,
add_to_header: Optional[bool] = False,
*args: Qt.WindowType,
**kwargs: Qt.WindowType,
) -> None:
"""
Subclassed QDialog to manage track selection
"""
super().__init__(parent, *args, **kwargs)
self.session = session
self.new_row_number = new_row_number
self.base_model = base_model
super().__init__(parent)
self.playlist_id = playlist_id
self.add_to_header = add_to_header
self.ui = dlg_TrackSelect_ui.Ui_Dialog()
self.ui.setupUi(self)
self.ui.btnAdd.clicked.connect(self.add_selected)
self.ui.btnAddClose.clicked.connect(self.add_selected_and_close)
self.ui.btnClose.clicked.connect(self.close)
self.ui.matchList.itemDoubleClicked.connect(self.add_selected)
self.ui.matchList.itemSelectionChanged.connect(self.selection_changed)
self.ui.radioTitle.toggled.connect(self.title_artist_toggle)
self.ui.searchString.textEdited.connect(self.chars_typed)
self.track: Optional[Tracks] = None
self.signals = MusicMusterSignals()
self.setWindowTitle("Insert Track")
record = Settings.get_setting(self.session, "dbdialog_width")
width = record.f_int or 800
record = Settings.get_setting(self.session, "dbdialog_height")
height = record.f_int or 600
# Title input on one line
self.title_label = QLabel("Title:")
self.title_edit = QLineEdit()
self.title_edit.textChanged.connect(self.update_list)
title_layout = QHBoxLayout()
title_layout.addWidget(self.title_label)
title_layout.addWidget(self.title_edit)
# Track list
self.track_list = QListWidget()
self.track_list.itemDoubleClicked.connect(self.add_clicked)
self.track_list.itemSelectionChanged.connect(self.selection_changed)
# Note input on one line
self.note_label = QLabel("Note:")
self.note_edit = QLineEdit()
note_layout = QHBoxLayout()
note_layout.addWidget(self.note_label)
note_layout.addWidget(self.note_edit)
# Track path
self.path = QLabel()
path_layout = QHBoxLayout()
path_layout.addWidget(self.path)
# Buttons
self.add_btn = QPushButton("Add")
self.add_close_btn = QPushButton("Add and close")
self.close_btn = QPushButton("Close")
self.add_btn.clicked.connect(self.add_clicked)
self.add_close_btn.clicked.connect(self.add_and_close_clicked)
self.close_btn.clicked.connect(self.close)
btn_layout = QHBoxLayout()
btn_layout.addWidget(self.add_btn)
btn_layout.addWidget(self.add_close_btn)
btn_layout.addWidget(self.close_btn)
# Main layout
layout = QVBoxLayout()
layout.addLayout(title_layout)
layout.addWidget(self.track_list)
layout.addLayout(note_layout)
layout.addLayout(path_layout)
layout.addLayout(btn_layout)
self.setLayout(layout)
self.resize(800, 600)
width = repository.get_setting("dbdialog_width") or 800
height = repository.get_setting("dbdialog_height") or 800
self.resize(width, height)
if add_to_header:
self.ui.lblNote.setVisible(False)
self.ui.txtNote.setVisible(False)
self.signals = MusicMusterSignals()
def add_selected(self) -> None:
"""Handle Add button"""
track = None
if self.ui.matchList.selectedItems():
item = self.ui.matchList.currentItem()
if item:
track = item.data(Qt.ItemDataRole.UserRole)
note = self.ui.txtNote.text()
if not (track or note):
def update_list(self, text: str) -> None:
self.track_list.clear()
if text.strip() == "":
# Do not search or populate list if input is empty
return
track_id = None
if track:
track_id = track.id
if text.startswith("a/") and len(text) > 2:
self.tracks = repository.tracks_by_artist(text[2:])
else:
self.tracks = repository.tracks_by_title(text)
if note and not track_id:
self.base_model.insert_row(self.new_row_number, track_id, note)
self.ui.txtNote.clear()
self.new_row_number += 1
for track in self.tracks:
duration_str = ms_to_mmss(track.duration)
last_played_str = get_relative_date(track.lastplayed)
item_str = (
f"{track.title} - {track.artist} [{duration_str}] {last_played_str}"
)
item = QListWidgetItem(item_str)
item.setData(Qt.ItemDataRole.UserRole, track.track_id)
self.track_list.addItem(item)
def get_selected_track_id(self) -> int | None:
selected_items = self.track_list.selectedItems()
if selected_items:
return selected_items[0].data(Qt.ItemDataRole.UserRole)
return None
def add_clicked(self):
track_id = self.get_selected_track_id()
note_text = self.note_edit.text()
if track_id is None and not note_text:
return
self.ui.txtNote.clear()
self.select_searchtext()
insert_track_data = InsertTrack(self.playlist_id, track_id, note_text)
if track_id is None:
log.error("track_id is None and should not be")
return
# Check whether track is already in playlist
move_existing = False
existing_prd = self.base_model.is_track_in_playlist(track_id)
if existing_prd is not None:
if ask_yes_no(
"Duplicate row",
"Track already in playlist. " "Move to new location?",
default_yes=True,
):
move_existing = True
self.title_edit.clear()
self.note_edit.clear()
self.track_list.clear()
self.title_edit.setFocus()
if self.add_to_header:
if move_existing and existing_prd: # "and existing_prd" for mypy's benefit
self.base_model.move_track_to_header(
self.new_row_number, existing_prd, note
)
else:
self.base_model.add_track_to_header(self.new_row_number, track_id)
# Close dialog - we can only add one track to a header
self.signals.signal_add_track_to_header.emit(insert_track_data)
self.accept()
else:
# Adding a new track row
if move_existing and existing_prd: # "and existing_prd" for mypy's benefit
self.base_model.move_track_add_note(
self.new_row_number, existing_prd, note
)
else:
self.base_model.insert_row(self.new_row_number, track_id, note)
self.signals.signal_insert_track.emit(insert_track_data)
self.new_row_number += 1
def add_selected_and_close(self) -> None:
"""Handle Add and Close button"""
self.add_selected()
def add_and_close_clicked(self):
self.add_clicked()
self.accept()
def chars_typed(self, s: str) -> None:
"""Handle text typed in search box"""
self.ui.matchList.clear()
if len(s) > 0:
if s.startswith("a/") and len(s) > 2:
matches = Tracks.search_artists(self.session, "%" + s[2:])
elif self.ui.radioTitle.isChecked():
matches = Tracks.search_titles(self.session, "%" + s)
else:
matches = Tracks.search_artists(self.session, "%" + s)
if matches:
for track in matches:
last_played = None
last_playdate = max(
track.playdates, key=lambda p: p.lastplayed, default=None
)
if last_playdate:
last_played = last_playdate.lastplayed
t = QListWidgetItem()
track_text = (
f"{track.title} - {track.artist} "
f"[{ms_to_mmss(track.duration)}] "
f"({get_relative_date(last_played)})"
)
t.setText(track_text)
t.setData(Qt.ItemDataRole.UserRole, track)
self.ui.matchList.addItem(t)
def closeEvent(self, event: Optional[QEvent]) -> None:
"""
Override close and save dialog coordinates
"""
if not event:
return
record = Settings.get_setting(self.session, "dbdialog_height")
record.f_int = self.height()
record = Settings.get_setting(self.session, "dbdialog_width")
record.f_int = self.width()
self.session.commit()
event.accept()
def keyPressEvent(self, event: QKeyEvent | None) -> None:
"""
Clear selection on ESC if there is one
"""
if event and event.key() == Qt.Key.Key_Escape:
if self.ui.matchList.selectedItems():
self.ui.matchList.clearSelection()
return
super(TrackSelectDialog, self).keyPressEvent(event)
def select_searchtext(self) -> None:
"""Select the searchbox"""
self.ui.searchString.selectAll()
self.ui.searchString.setFocus()
def selection_changed(self) -> None:
"""Display selected track path in dialog box"""
if not self.ui.matchList.selectedItems():
self.path.setText("")
track_id = self.get_selected_track_id()
if track_id is None:
return
item = self.ui.matchList.currentItem()
track = item.data(Qt.ItemDataRole.UserRole)
last_playdate = max(track.playdates, key=lambda p: p.lastplayed, default=None)
if last_playdate:
last_played = last_playdate.lastplayed
else:
last_played = None
path_text = f"{track.path} ({get_relative_date(last_played)})"
tracklist = [t for t in self.tracks if t.track_id == track_id]
if not tracklist:
return
if len(tracklist) > 1:
raise ApplicationError("More than one track returned")
track = tracklist[0]
self.ui.dbPath.setText(path_text)
def title_artist_toggle(self) -> None:
"""
Handle switching between searching for artists and searching for
titles
"""
# Logic is handled already in chars_typed(), so just call that.
self.chars_typed(self.ui.searchString.text())
self.path.setText(track.path)

View File

@ -4,7 +4,6 @@ from dataclasses import dataclass, field
from fuzzywuzzy import fuzz # type: ignore
import os.path
import threading
from typing import Optional, Sequence
import os
import shutil
@ -32,19 +31,21 @@ from classes import (
MusicMusterSignals,
singleton,
Tags,
TrackDTO,
)
from config import Config
from helpers import (
audio_file_extension,
file_is_unreadable,
get_all_track_metadata,
get_audio_metadata,
get_tags,
normalise_track,
show_OK,
)
from log import log
from models import db, Tracks
from music_manager import track_sequence
from playlistrow import TrackSequence
from playlistmodel import PlaylistModel
import helpers
import repository
@dataclass
@ -68,7 +69,7 @@ class TrackFileData:
destination_path: str = ""
import_this_file: bool = False
error: str = ""
file_path_to_remove: Optional[str] = None
file_path_to_remove: str | None = None
track_id: int = 0
track_match_data: list[TrackMatchData] = field(default_factory=list)
@ -121,13 +122,7 @@ class FileImporter:
# Get signals
self.signals = MusicMusterSignals()
def _get_existing_tracks(self) -> Sequence[Tracks]:
"""
Return a list of all existing Tracks
"""
with db.Session() as session:
return Tracks.get_all(session)
self.existing_tracks: list[TrackDTO] = []
def start(self) -> None:
"""
@ -147,7 +142,7 @@ class FileImporter:
# Refresh list of existing tracks as they may have been updated
# by previous imports
self.existing_tracks = self._get_existing_tracks()
self.existing_tracks = repository.get_all_tracks()
for infile in [
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
@ -201,7 +196,6 @@ class FileImporter:
self.sort_track_match_data(tfd)
selection = self.get_user_choices(tfd)
if self.process_selection(tfd, selection):
if self.extension_check(tfd):
if self.validate_file_data(tfd):
tfd.import_this_file = True
@ -237,26 +231,6 @@ class FileImporter:
return True
def extension_check(self, tfd: TrackFileData) -> bool:
"""
If we are replacing an existing file, check that the correct file
extension of the replacement file matches the existing file
extension and return True if it does (or if there is no exsting
file), else False.
"""
if not tfd.file_path_to_remove:
return True
if tfd.file_path_to_remove.endswith(audio_file_extension(tfd.source_path)):
return True
tfd.error = (
f"Existing file ({tfd.file_path_to_remove}) has a different "
f"extension to replacement file ({tfd.source_path})"
)
return False
def find_similar(self, tfd: TrackFileData) -> None:
"""
- Search title in existing tracks
@ -278,7 +252,7 @@ class FileImporter:
artist_match=artist_score,
title=existing_track.title,
title_match=title_score,
track_id=existing_track.id,
track_id=existing_track.track_id,
)
)
@ -411,12 +385,14 @@ class FileImporter:
else:
tfd.destination_path = existing_track_path
def _get_existing_track(self, track_id: int) -> Tracks:
def _get_existing_track(self, track_id: int) -> TrackDTO:
"""
Lookup in existing track in the local cache and return it
"""
existing_track_records = [a for a in self.existing_tracks if a.id == track_id]
existing_track_records = [
a for a in self.existing_tracks if a.track_id == track_id
]
if len(existing_track_records) != 1:
raise ApplicationError(
f"Internal error in _get_existing_track: {existing_track_records=}"
@ -465,8 +441,7 @@ class FileImporter:
if tfd.track_id == 0 and tfd.destination_path != tfd.file_path_to_remove:
while os.path.exists(tfd.destination_path):
msg = (
"New import requested but default destination path"
f" ({tfd.destination_path})"
f"New import requested but default destination path ({tfd.destination_path})"
" already exists. Click OK and choose where to save this track"
)
show_OK(title="Desintation path exists", msg=msg, parent=None)
@ -490,8 +465,7 @@ class FileImporter:
# file). Check that because the path field in the database is
# unique and so adding a duplicate will give a db integrity
# error.
with db.Session() as session:
if Tracks.get_by_path(session, tfd.destination_path):
if repository.track_by_path(tfd.destination_path):
tfd.error = (
"Importing a new track but destination path already exists "
f"in database ({tfd.destination_path})"
@ -618,7 +592,7 @@ class DoTrackImport(QThread):
tags: Tags,
destination_path: str,
track_id: int,
file_path_to_remove: Optional[str] = None,
file_path_to_remove: str | None = None,
) -> None:
"""
Save parameters
@ -648,9 +622,8 @@ class DoTrackImport(QThread):
f"Importing {os.path.basename(self.import_file_path)}", 5000
)
# Get audio metadata in this thread rather than calling
# function to save interactive time
self.audio_metadata = helpers.get_audio_metadata(self.import_file_path)
# Get audio metadata in this thread rather than calling function to save interactive time
self.audio_metadata = get_audio_metadata(self.import_file_path)
# Remove old file if so requested
if self.file_path_to_remove and os.path.exists(self.file_path_to_remove):
@ -659,42 +632,22 @@ class DoTrackImport(QThread):
# Move new file to destination
shutil.move(self.import_file_path, self.destination_track_path)
with db.Session() as session:
if self.track_id == 0:
# Import new track
try:
track = Tracks(
session,
path=self.destination_track_path,
**self.tags._asdict(),
**self.audio_metadata._asdict(),
)
except Exception as e:
self.signals.show_warning_signal.emit(
"Error importing track", str(e)
)
return
else:
track = session.get(Tracks, self.track_id)
if track:
for key, value in self.tags._asdict().items():
if hasattr(track, key):
setattr(track, key, value)
for key, value in self.audio_metadata._asdict().items():
if hasattr(track, key):
setattr(track, key, value)
track.path = self.destination_track_path
else:
log.error(f"Unable to retrieve {self.track_id=}")
return
session.commit()
# Normalise
normalise_track(self.destination_track_path)
helpers.normalise_track(self.destination_track_path)
# Update databse
metadata = get_all_track_metadata(self.destination_track_path)
if self.track_id == 0:
track_dto = repository.create_track(self.destination_track_path, metadata)
else:
track_dto = repository.update_track(
self.destination_track_path, self.track_id, metadata
)
self.signals.status_message_signal.emit(
f"{os.path.basename(self.import_file_path)} imported", 10000
)
self.import_finished.emit(self.import_file_path, track.id)
self.import_finished.emit(self.import_file_path, track_dto.track_id)
class PickMatch(QDialog):
@ -723,6 +676,7 @@ class PickMatch(QDialog):
self.setWindowTitle("New or replace")
layout = QVBoxLayout()
track_sequence = TrackSequence()
# Add instructions
instructions = (

View File

@ -13,7 +13,6 @@ import tempfile
from PyQt6.QtWidgets import QInputDialog, QMainWindow, QMessageBox, QWidget
# Third party imports
import filetype
from mutagen.flac import FLAC # type: ignore
from mutagen.mp3 import MP3 # type: ignore
from pydub import AudioSegment, effects
@ -21,7 +20,7 @@ from pydub.utils import mediainfo
from tinytag import TinyTag, TinyTagException # type: ignore
# App imports
from classes import AudioMetadata, ApplicationError, Tags
from classes import AudioMetadata, ApplicationError, Tags, TrackDTO
from config import Config
from log import log
from models import Tracks
@ -51,14 +50,6 @@ def ask_yes_no(
return button == QMessageBox.StandardButton.Yes
def audio_file_extension(fpath: str) -> str | None:
"""
Return the correct extension for this type of file.
"""
return filetype.guess(fpath).extension
def fade_point(
audio_segment: AudioSegment,
fade_threshold: float = 0.0,
@ -81,7 +72,7 @@ def fade_point(
fade_threshold = max_vol
while (
audio_segment[trim_ms: trim_ms + chunk_size].dBFS < fade_threshold
audio_segment[trim_ms : trim_ms + chunk_size].dBFS < fade_threshold
and trim_ms > 0
): # noqa W503
trim_ms -= chunk_size
@ -103,9 +94,6 @@ def file_is_unreadable(path: Optional[str]) -> bool:
def get_audio_segment(path: str) -> Optional[AudioSegment]:
if not path.endswith(audio_file_extension(path)):
return None
try:
if path.endswith(".mp3"):
return AudioSegment.from_mp3(path)
@ -180,7 +168,7 @@ def get_name(prompt: str, default: str = "") -> str | None:
def get_relative_date(
past_date: Optional[dt.datetime], reference_date: Optional[dt.datetime] = None
past_date: Optional[dt.datetime], now: Optional[dt.datetime] = None
) -> str:
"""
Return how long before reference_date past_date is as string.
@ -194,31 +182,33 @@ def get_relative_date(
if not past_date or past_date == Config.EPOCH:
return "Never"
if not reference_date:
reference_date = dt.datetime.now()
if not now:
now = dt.datetime.now()
# Check parameters
if past_date > reference_date:
return "get_relative_date() past_date is after relative_date"
if past_date > now:
raise ApplicationError("get_relative_date() past_date is after relative_date")
days: int
days_str: str
weeks: int
weeks_str: str
delta = now - past_date
days = delta.days
weeks, days = divmod((reference_date.date() - past_date.date()).days, 7)
if weeks == days == 0:
# Same day so return time instead
return Config.LAST_PLAYED_TODAY_STRING + " " + past_date.strftime("%H:%M")
if weeks == 1:
weeks_str = "week"
else:
weeks_str = "weeks"
if days == 1:
days_str = "day"
else:
days_str = "days"
return f"{weeks} {weeks_str}, {days} {days_str}"
if days == 0:
return "(Today)"
elif days == 1:
return "(Yesterday)"
years, days_remain = divmod(days, 365)
months, days_final = divmod(days_remain, 30)
parts = []
if years:
parts.append(f"{years}y")
if months:
parts.append(f"{months}m")
if days_final:
parts.append(f"{days_final}d")
formatted = " ".join(parts)
return f"({formatted} ago)"
def get_tags(path: str) -> Tags:
@ -276,39 +266,15 @@ def leading_silence(
return min(trim_ms, len(audio_segment))
def ms_to_mmss(
ms: Optional[int],
decimals: int = 0,
negative: bool = False,
none: Optional[str] = None,
) -> str:
def ms_to_mmss(ms: int | None, none: str = "-") -> str:
"""Convert milliseconds to mm:ss"""
minutes: int
remainder: int
seconds: float
if not ms:
if none:
if ms is None:
return none
else:
return "-"
sign = ""
if ms < 0:
if negative:
sign = "-"
else:
ms = 0
minutes, remainder = divmod(ms, 60 * 1000)
seconds = remainder / 1000
minutes, seconds = divmod(ms // 1000, 60)
# if seconds >= 59.5, it will be represented as 60, which looks odd.
# So, fake it under those circumstances
if seconds >= 59.5:
seconds = 59.0
return f"{sign}{minutes:.0f}:{seconds:02.{decimals}f}"
return f"{minutes}:{seconds:02d}"
def normalise_track(path: str) -> None:
@ -365,32 +331,6 @@ def normalise_track(path: str) -> None:
os.remove(temp_path)
def remove_substring_case_insensitive(parent_string: str, substring: str) -> str:
"""
Remove all instances of substring from parent string, case insensitively
"""
# Convert both strings to lowercase for case-insensitive comparison
lower_parent = parent_string.lower()
lower_substring = substring.lower()
# Initialize the result string
result = parent_string
# Continue removing the substring until it's no longer found
while lower_substring in lower_parent:
# Find the index of the substring
index = lower_parent.find(lower_substring)
# Remove the substring
result = result[:index] + result[index + len(substring) :]
# Update the lowercase versions
lower_parent = result.lower()
return result
def send_mail(to_addr: str, from_addr: str, subj: str, body: str) -> None:
# From https://docs.python.org/3/library/email.examples.html

View File

@ -1,56 +0,0 @@
from PyQt6.QtCore import QObject, QTimer, QElapsedTimer
import logging
import time
from config import Config
class EventLoopJitterMonitor(QObject):
def __init__(
self,
parent=None,
interval_ms: int = 20,
jitter_threshold_ms: int = 100,
log_cooldown_s: float = 1.0,
):
super().__init__(parent)
self._interval = interval_ms
self._jitter_threshold = jitter_threshold_ms
self._log_cooldown_s = log_cooldown_s
self._timer = QTimer(self)
self._timer.setInterval(self._interval)
self._timer.timeout.connect(self._on_timeout)
self._elapsed = QElapsedTimer()
self._elapsed.start()
self._last = self._elapsed.elapsed()
# child logger: e.g. "musicmuster.jitter"
self._log = logging.getLogger(f"{Config.LOG_NAME}.jitter")
self._last_log_time = 0.0
def start(self) -> None:
self._timer.start()
def _on_timeout(self) -> None:
now_ms = self._elapsed.elapsed()
delta = now_ms - self._last
self._last = now_ms
if delta > (self._interval + self._jitter_threshold):
self._log_jitter(now_ms, delta)
def _log_jitter(self, now_ms: int, gap_ms: int) -> None:
now = time.monotonic()
# simple rate limit: only one log every log_cooldown_s
if now - self._last_log_time < self._log_cooldown_s:
return
self._last_log_time = now
self._log.warning(
"Event loop gap detected: t=%d ms, gap=%d ms (interval=%d ms)",
now_ms,
gap_ms,
self._interval,
)

View File

@ -80,17 +80,30 @@ log = logging.getLogger(Config.LOG_NAME)
def handle_exception(exc_type, exc_value, exc_traceback):
error = str(exc_value)
"""
Inform user of exception
"""
# Navigate to the inner stack frame
tb = exc_traceback
while tb.tb_next:
tb = tb.tb_next
fname = os.path.basename(tb.tb_frame.f_code.co_filename)
lineno = tb.tb_lineno
msg = f"ApplicationError: {exc_value}\nat {fname}:{lineno}"
logmsg = f"ApplicationError: {exc_value} at {fname}:{lineno}"
if issubclass(exc_type, ApplicationError):
log.error(error)
log.error(logmsg)
else:
# Handle unexpected errors (log and display)
error_msg = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
print(stackprinter.format(exc_value, suppressed_paths=['/.venv'], style='darkbg'))
msg = stackprinter.format(exc_value)
log.error(msg)
stack = stackprinter.format(exc_value)
log.error(stack)
log.error(error_msg)
print("Critical error:", error_msg) # Consider logging instead of print
@ -101,11 +114,10 @@ def handle_exception(exc_type, exc_value, exc_traceback):
Config.ERRORS_TO,
Config.ERRORS_FROM,
"Exception (log_uncaught_exceptions) from musicmuster",
msg,
stack,
)
if QApplication.instance() is not None:
fname = os.path.split(exc_traceback.tb_frame.f_code.co_filename)[1]
msg = f"ApplicationError: {error}\nat {fname}:{exc_traceback.tb_lineno}"
QMessageBox.critical(None, "Application Error", msg)
@ -124,13 +136,13 @@ def log_call(func):
args_repr = [truncate_large(a) for a in args]
kwargs_repr = [f"{k}={truncate_large(v)}" for k, v in kwargs.items()]
params_repr = ", ".join(args_repr + kwargs_repr)
log.debug(f"call {func.__name__}({params_repr})")
log.debug(f"call {func.__name__}({params_repr})", stacklevel=2)
try:
result = func(*args, **kwargs)
log.debug(f"return {func.__name__}: {truncate_large(result)}")
log.debug(f"return {func.__name__}: {truncate_large(result)}", stacklevel=2)
return result
except Exception as e:
log.debug(f"exception in {func.__name__}: {e}")
log.debug(f"exception in {func.__name__}: {e}", stacklevel=2)
raise
return wrapper

View File

@ -23,8 +23,8 @@ filters:
# - function-name-1
# - function-name-2
musicmuster:
- update_clocks
- play_next
jittermonitor: []
handlers:
stderr:

View File

@ -241,7 +241,9 @@ class Playlists(dbtables.PlaylistsTable):
"""
session.execute(
update(Playlists).where((Playlists.id.in_(playlist_ids))).values(tab=None)
update(Playlists)
.where(Playlists.id.in_(playlist_ids))
.values(tab=None)
)
def close(self, session: Session) -> None:
@ -395,6 +397,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
given playlist_id and row
"""
# TODO: use selectinload?
stmt = (
select(PlaylistRows)
.options(joinedload(cls.track))
@ -435,24 +438,6 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
)
)
@staticmethod
def fixup_rownumbers(session: Session, playlist_id: int) -> None:
"""
Ensure the row numbers for passed playlist have no gaps
"""
plrs = session.scalars(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
).all()
for i, plr in enumerate(plrs):
plr.row_number = i
# Ensure new row numbers are available to the caller
session.commit()
@classmethod
def plrids_to_plrs(
cls, session: Session, playlist_id: int, plr_ids: list[int]
@ -753,6 +738,9 @@ class Tracks(dbtables.TracksTable):
Return tracks matching filter
"""
# Now implemented in repostory.py
return []
query = select(cls)
# Path specification

View File

@ -3,33 +3,23 @@ from __future__ import annotations
import datetime as dt
from time import sleep
from typing import Optional
# Third party imports
# import line_profiler
import numpy as np
import pyqtgraph as pg # type: ignore
from sqlalchemy.orm.session import Session
import vlc # type: ignore
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QObject,
QThread,
)
from pyqtgraph import PlotWidget
from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore
from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore
# App imports
from classes import ApplicationError, MusicMusterSignals
from classes import singleton
from config import Config
import helpers
from log import log
from models import PlaylistRows
from vlcmanager import VLCManager
# Define the VLC callback function type
# import ctypes
@ -64,106 +54,6 @@ from vlcmanager import VLCManager
# libc.vsnprintf.restype = ctypes.c_int
class _AddFadeCurve(QObject):
"""
Initialising a fade curve introduces a noticeable delay so carry out in
a thread.
"""
finished = pyqtSignal()
def __init__(
self,
rat: RowAndTrack,
track_path: str,
track_fade_at: int,
track_silence_at: int,
) -> None:
super().__init__()
self.rat = rat
self.track_path = track_path
self.track_fade_at = track_fade_at
self.track_silence_at = track_silence_at
def run(self) -> None:
"""
Create fade curve and add to PlaylistTrack object
"""
fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
if not fc:
log.error(f"Failed to create FadeCurve for {self.track_path=}")
else:
self.rat.fade_graph = fc
self.finished.emit()
class _FadeCurve:
GraphWidget: Optional[PlotWidget] = None
def __init__(
self, track_path: str, track_fade_at: int, track_silence_at: int
) -> None:
"""
Set up fade graph array
"""
audio = helpers.get_audio_segment(track_path)
if not audio:
log.error(f"FadeCurve: could not get audio for {track_path=}")
return None
# Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
# milliseconds before fade starts to silence
self.start_ms: int = max(
0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.end_ms: int = track_silence_at
audio_segment = audio[self.start_ms : self.end_ms]
self.graph_array = np.array(audio_segment.get_array_of_samples())
# Calculate the factor to map milliseconds of track to array
self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms)
self.curve: Optional[PlotDataItem] = None
self.region: Optional[LinearRegionItem] = None
def clear(self) -> None:
"""Clear the current graph"""
if self.GraphWidget:
self.GraphWidget.clear()
def plot(self) -> None:
if self.GraphWidget:
self.curve = self.GraphWidget.plot(self.graph_array)
if self.curve:
self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
else:
log.debug("_FadeCurve.plot: no curve")
else:
log.debug("_FadeCurve.plot: no GraphWidget")
def tick(self, play_time: int) -> None:
"""Update volume fade curve"""
if not self.GraphWidget:
return
ms_of_graph = play_time - self.start_ms
if ms_of_graph < 0:
return
if self.region is None:
# Create the region now that we're into fade
self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
self.GraphWidget.addItem(self.region)
# Update region position
if self.region:
self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
class _FadeTrack(QThread):
finished = pyqtSignal()
@ -197,21 +87,32 @@ class _FadeTrack(QThread):
self.finished.emit()
# TODO can we move this into the _Music class?
vlc_instance = VLCManager().vlc_instance
@singleton
class VLCManager:
"""
Singleton class to ensure we only ever have one vlc Instance
"""
def __init__(self) -> None:
self.vlc_instance = vlc.Instance()
def get_instance(self) -> vlc.Instance:
return self.vlc_instance
class _Music:
class Music:
"""
Manage the playing of music tracks
"""
def __init__(self, name: str) -> None:
vlc_instance.set_user_agent(name, name)
self.player: Optional[vlc.MediaPlayer] = None
self.name = name
vlc_manager = VLCManager()
self.vlc_instance = vlc_manager.get_instance()
self.vlc_instance.set_user_agent(name, name)
self.player: vlc.MediaPlayer | None = None
self.max_volume: int = Config.VLC_VOLUME_DEFAULT
self.start_dt: Optional[dt.datetime] = None
self.start_dt: dt.datetime | None = None
# Set up logging
# self._set_vlc_log()
@ -239,27 +140,6 @@ class _Music:
# except Exception as e:
# log.error(f"Failed to set up VLC logging: {e}")
def adjust_by_ms(self, ms: int) -> None:
"""Move player position by ms milliseconds"""
if not self.player:
return
elapsed_ms = self.get_playtime()
position = self.get_position()
if not position:
position = 0.0
new_position = max(0.0, position + ((position * ms) / elapsed_ms))
self.set_position(new_position)
# Adjus start time so elapsed time calculations are correct
if new_position == 0:
self.start_dt = dt.datetime.now()
else:
if self.start_dt:
self.start_dt -= dt.timedelta(milliseconds=ms)
else:
self.start_dt = dt.datetime.now() - dt.timedelta(milliseconds=ms)
def fade(self, fade_seconds: int) -> None:
"""
Fade the currently playing track.
@ -293,11 +173,11 @@ class _Music:
elapsed_seconds = (now - self.start_dt).total_seconds()
return int(elapsed_seconds * 1000)
def get_position(self) -> Optional[float]:
def get_position(self) -> float:
"""Return current position"""
if not self.player:
return None
return 0.0
return self.player.get_position()
def is_playing(self) -> bool:
@ -322,7 +202,7 @@ class _Music:
self,
path: str,
start_time: dt.datetime,
position: Optional[float] = None,
position: float | None = None,
) -> None:
"""
Start playing the track at path.
@ -339,7 +219,7 @@ class _Music:
log.error(f"play({path}): path not readable")
return None
self.player = vlc.MediaPlayer(vlc_instance, path)
self.player = vlc.MediaPlayer(self.vlc_instance, path)
if self.player is None:
log.error(f"_Music:play: failed to create MediaPlayer ({path=})")
helpers.show_warning(
@ -363,7 +243,7 @@ class _Music:
self.player.set_position(position)
def set_volume(
self, volume: Optional[int] = None, set_default: bool = True
self, volume: int | None = None, set_default: bool = True
) -> None:
"""Set maximum volume used for player"""
@ -377,6 +257,17 @@ class _Music:
volume = Config.VLC_VOLUME_DEFAULT
self.player.audio_set_volume(volume)
# Ensure volume correct
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
for _ in range(3):
current_volume = self.player.audio_get_volume()
if current_volume < volume:
self.player.audio_set_volume(volume)
log.debug(f"Reset from {volume=}")
sleep(0.1)
def stop(self) -> None:
"""Immediately stop playing"""
@ -392,333 +283,3 @@ class _Music:
self.player.stop()
self.player.release()
self.player = None
class RowAndTrack:
"""
Object to manage playlist rows and tracks.
"""
def __init__(self, playlist_row: PlaylistRows) -> None:
"""
Initialises data structure.
The passed PlaylistRows object will include a Tracks object if this
row has a track.
"""
# Collect playlistrow data
self.note = playlist_row.note
self.played = playlist_row.played
self.playlist_id = playlist_row.playlist_id
self.playlistrow_id = playlist_row.id
self.row_number = playlist_row.row_number
self.track_id = playlist_row.track_id
# Playlist display data
self.row_fg: Optional[str] = None
self.row_bg: Optional[str] = None
self.note_fg: Optional[str] = None
self.note_bg: Optional[str] = None
# Collect track data if there's a track
if playlist_row.track_id:
self.artist = playlist_row.track.artist
self.bitrate = playlist_row.track.bitrate
self.duration = playlist_row.track.duration
self.fade_at = playlist_row.track.fade_at
self.intro = playlist_row.track.intro
if playlist_row.track.playdates:
self.lastplayed = max(
[a.lastplayed for a in playlist_row.track.playdates]
)
else:
self.lastplayed = Config.EPOCH
self.path = playlist_row.track.path
self.silence_at = playlist_row.track.silence_at
self.start_gap = playlist_row.track.start_gap
self.title = playlist_row.track.title
else:
self.artist = ""
self.bitrate = 0
self.duration = 0
self.fade_at = 0
self.intro = None
self.lastplayed = Config.EPOCH
self.path = ""
self.silence_at = 0
self.start_gap = 0
self.title = ""
# Track playing data
self.end_of_track_signalled: bool = False
self.end_time: Optional[dt.datetime] = None
self.fade_graph: Optional[_FadeCurve] = None
self.fade_graph_start_updates: Optional[dt.datetime] = None
self.resume_marker: Optional[float] = 0.0
self.forecast_end_time: Optional[dt.datetime] = None
self.forecast_start_time: Optional[dt.datetime] = None
self.start_time: Optional[dt.datetime] = None
# Other object initialisation
self.music = _Music(name=Config.VLC_MAIN_PLAYER_NAME)
self.signals = MusicMusterSignals()
def __repr__(self) -> str:
return (
f"<RowAndTrack(playlist_id={self.playlist_id}, "
f"row_number={self.row_number}, "
f"playlistrow_id={self.playlistrow_id}, "
f"note={self.note}, track_id={self.track_id}>"
)
def check_for_end_of_track(self) -> None:
"""
Check whether track has ended. If so, emit track_ended_signal
"""
if self.start_time is None:
return
if self.end_of_track_signalled:
return
if self.music.is_playing():
return
self.start_time = None
if self.fade_graph:
self.fade_graph.clear()
# Ensure that player is released
self.music.fade(0)
self.signals.track_ended_signal.emit()
self.end_of_track_signalled = True
def create_fade_graph(self) -> None:
"""
Initialise and add FadeCurve in a thread as it's slow
"""
self.fadecurve_thread = QThread()
self.worker = _AddFadeCurve(
self,
track_path=self.path,
track_fade_at=self.fade_at,
track_silence_at=self.silence_at,
)
self.worker.moveToThread(self.fadecurve_thread)
self.fadecurve_thread.started.connect(self.worker.run)
self.worker.finished.connect(self.fadecurve_thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
self.fadecurve_thread.start()
def drop3db(self, enable: bool) -> None:
"""
If enable is true, drop output by 3db else restore to full volume
"""
if enable:
self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False)
else:
self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False)
def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
"""Fade music"""
self.resume_marker = self.music.get_position()
self.music.fade(fade_seconds)
self.signals.track_ended_signal.emit()
def is_playing(self) -> bool:
"""
Return True if we're currently playing else False
"""
if self.start_time is None:
return False
return self.music.is_playing()
def move_back(self, ms: int = Config.PREVIEW_BACK_MS) -> None:
"""
Rewind player by ms milliseconds
"""
self.music.adjust_by_ms(ms * -1)
def move_forward(self, ms: int = Config.PREVIEW_ADVANCE_MS) -> None:
"""
Rewind player by ms milliseconds
"""
self.music.adjust_by_ms(ms)
def play(self, position: Optional[float] = None) -> None:
"""Play track"""
now = dt.datetime.now()
self.start_time = now
# Initialise player
self.music.play(self.path, start_time=now, position=position)
self.end_time = now + dt.timedelta(milliseconds=self.duration)
# Calculate time fade_graph should start updating
if self.fade_at:
update_graph_at_ms = max(
0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.fade_graph_start_updates = now + dt.timedelta(
milliseconds=update_graph_at_ms
)
def restart(self) -> None:
"""
Restart player
"""
self.music.adjust_by_ms(self.time_playing() * -1)
def set_forecast_start_time(
self, modified_rows: list[int], start: Optional[dt.datetime]
) -> Optional[dt.datetime]:
"""
Set forecast start time for this row
Update passed modified rows list if we changed the row.
Return new start time
"""
changed = False
if self.forecast_start_time != start:
self.forecast_start_time = start
changed = True
if start is None:
if self.forecast_end_time is not None:
self.forecast_end_time = None
changed = True
new_start_time = None
else:
end_time = start + dt.timedelta(milliseconds=self.duration)
new_start_time = end_time
if self.forecast_end_time != end_time:
self.forecast_end_time = end_time
changed = True
if changed and self.row_number not in modified_rows:
modified_rows.append(self.row_number)
return new_start_time
def stop(self, fade_seconds: int = 0) -> None:
"""
Stop this track playing
"""
self.resume_marker = self.music.get_position()
self.fade(fade_seconds)
# Reset fade graph
if self.fade_graph:
self.fade_graph.clear()
def time_playing(self) -> int:
"""
Return time track has been playing in milliseconds, zero if not playing
"""
if self.start_time is None:
return 0
return self.music.get_playtime()
def time_remaining_intro(self) -> int:
"""
Return milliseconds of intro remaining. Return 0 if no intro time in track
record or if intro has finished.
"""
if not self.intro:
return 0
return max(0, self.intro - self.time_playing())
def time_to_fade(self) -> int:
"""
Return milliseconds until fade time. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.fade_at - self.time_playing()
def time_to_silence(self) -> int:
"""
Return milliseconds until silent. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.silence_at - self.time_playing()
def update_fade_graph(self) -> None:
"""
Update fade graph
"""
if (
not self.is_playing()
or not self.fade_graph_start_updates
or not self.fade_graph
):
return
now = dt.datetime.now()
if self.fade_graph_start_updates > now:
return
self.fade_graph.tick(self.time_playing())
def update_playlist_and_row(self, session: Session) -> None:
"""
Update local playlist_id and row_number from playlistrow_id
"""
plr = session.get(PlaylistRows, self.playlistrow_id)
if not plr:
raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}")
self.playlist_id = plr.playlist_id
self.row_number = plr.row_number
class TrackSequence:
next: Optional[RowAndTrack] = None
current: Optional[RowAndTrack] = None
previous: Optional[RowAndTrack] = None
def set_next(self, rat: Optional[RowAndTrack]) -> None:
"""
Set the 'next' track to be passed rat. Clear
any previous next track. If passed rat is None
just clear existing next track.
"""
# Clear any existing fade graph
if self.next and self.next.fade_graph:
self.next.fade_graph.clear()
if rat is None:
self.next = None
else:
self.next = rat
self.next.create_fade_graph()
track_sequence = TrackSequence()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

589
app/playlistrow.py Normal file
View File

@ -0,0 +1,589 @@
# Standard library imports
import datetime as dt
from typing import Any
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QObject,
QThread,
)
# Third party imports
from pyqtgraph import PlotWidget # type: ignore
from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore
from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore
import numpy as np
import pyqtgraph as pg # type: ignore
# App imports
from classes import ApplicationError, MusicMusterSignals, PlaylistRowDTO, singleton
from config import Config
import helpers
from log import log
from music_manager import Music
import repository
class PlaylistRow:
"""
Object to manage playlist row and track.
"""
def __init__(self, dto: PlaylistRowDTO) -> None:
"""
The dto object will include a Tracks object if this row has a track.
"""
self.dto = dto
self.music = Music(name=Config.VLC_MAIN_PLAYER_NAME)
self.signals = MusicMusterSignals()
self.end_of_track_signalled: bool = False
self.end_time: dt.datetime | None = None
self.fade_graph: Any | None = None
self.fade_graph_start_updates: dt.datetime | None = None
self.forecast_end_time: dt.datetime | None = None
self.forecast_start_time: dt.datetime | None = None
self.note_bg: str | None = None
self.note_fg: str | None = None
self.resume_marker: float = 0.0
self.row_bg: str | None = None
self.row_fg: str | None = None
self.start_time: dt.datetime | None = None
def __repr__(self) -> str:
track_id = None
if self.dto.track:
track_id = self.dto.track.track_id
return (
f"<PlaylistRow(playlist_id={self.dto.playlist_id}, "
f"row_number={self.dto.row_number}, "
f"playlistrow_id={self.dto.playlistrow_id}, "
f"note={self.dto.note}, track_id={track_id}>"
)
# Expose TrackDTO fields as properties
@property
def artist(self):
if self.dto.track:
return self.dto.track.artist
else:
return ""
@artist.setter
def artist(self, value: str) -> None:
print(f"set artist attribute for {self=}, {value=}")
@property
def bitrate(self):
if self.dto.track:
return self.dto.track.bitrate
else:
return 0
@property
def duration(self):
if self.dto.track:
return self.dto.track.duration
else:
return 0
@property
def fade_at(self):
if self.dto.track:
return self.dto.track.fade_at
else:
return 0
@property
def intro(self):
if self.dto.track:
return self.dto.track.intro
else:
return 0
@intro.setter
def intro(self, value: int) -> None:
print(f"set intro attribute for {self=}, {value=}")
@property
def lastplayed(self):
if self.dto.track:
return self.dto.track.lastplayed
else:
return None
@property
def path(self):
if self.dto.track:
return self.dto.track.path
else:
return ""
@property
def silence_at(self):
if self.dto.track:
return self.dto.track.silence_at
else:
return 0
@property
def start_gap(self):
if self.dto.track:
return self.dto.track.start_gap
else:
return 0
@property
def title(self):
if self.dto.track:
return self.dto.track.title
else:
return ""
@title.setter
def title(self, value: str) -> None:
print(f"set title attribute for {self=}, {value=}")
@property
def track_id(self):
if self.dto.track:
return self.dto.track.track_id
else:
return 0
@track_id.setter
def track_id(self, track_id: int) -> None:
"""
Adding a track_id should only happen to a header row.
"""
if self.track_id > 0:
raise ApplicationError("Attempting to add track to row with existing track ({self=}")
repository.add_track_to_header(track_id)
# Need to update with track information
track = repository.track_by_id(track_id)
if track:
for attr, value in track.__dataclass_fields__.items():
setattr(self, attr, value)
# TODO: set up write access to track_id. Should only update if
# track_id == 0. Need to update all other track fields at the
# same time.
print(f"set track_id attribute for {self=}, {value=}")
pass
# Expose PlaylistRowDTO fields as properties
@property
def note(self):
return self.dto.note
@note.setter
def note(self, value: str) -> None:
# TODO set up write access to db
print(f"set note attribute for {self=}, {value=}")
# self.dto.note = value
@property
def played(self):
return self.dto.played
@played.setter
def played(self, value: bool = True) -> None:
# TODO set up write access to db
print(f"set played attribute for {self=}")
# self.dto.played = value
@property
def playlist_id(self):
return self.dto.playlist_id
@property
def playlistrow_id(self):
return self.dto.playlistrow_id
@property
def row_number(self):
return self.dto.row_number
@row_number.setter
def row_number(self, value: int) -> None:
# TODO do we need to set up write access to db?
self.dto.row_number = value
def check_for_end_of_track(self) -> None:
"""
Check whether track has ended. If so, emit track_ended_signal
"""
if self.start_time is None:
return
if self.end_of_track_signalled:
return
if self.music.is_playing():
return
self.start_time = None
if self.fade_graph:
self.fade_graph.clear()
# Ensure that player is released
self.music.fade(0)
self.signals.track_ended_signal.emit()
self.end_of_track_signalled = True
def drop3db(self, enable: bool) -> None:
"""
If enable is true, drop output by 3db else restore to full volume
"""
if enable:
self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False)
else:
self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False)
def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
"""Fade music"""
self.resume_marker = self.music.get_position()
self.music.fade(fade_seconds)
self.signals.track_ended_signal.emit()
def is_playing(self) -> bool:
"""
Return True if we're currently playing else False
"""
if self.start_time is None:
return False
return self.music.is_playing()
def play(self, position: float | None = None) -> None:
"""Play track"""
now = dt.datetime.now()
self.start_time = now
# Initialise player
self.music.play(self.path, start_time=now, position=position)
self.end_time = now + dt.timedelta(milliseconds=self.duration)
# Calculate time fade_graph should start updating
if self.fade_at:
update_graph_at_ms = max(
0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.fade_graph_start_updates = now + dt.timedelta(
milliseconds=update_graph_at_ms
)
def set_forecast_start_time(
self, modified_rows: list[int], start: dt.datetime | None
) -> dt.datetime | None:
"""
Set forecast start time for this row
Update passed modified rows list if we changed the row.
Return new start time
"""
changed = False
if self.forecast_start_time != start:
self.forecast_start_time = start
changed = True
if start is None:
if self.forecast_end_time is not None:
self.forecast_end_time = None
changed = True
new_start_time = None
else:
end_time = start + dt.timedelta(milliseconds=self.duration)
new_start_time = end_time
if self.forecast_end_time != end_time:
self.forecast_end_time = end_time
changed = True
if changed and self.row_number not in modified_rows:
modified_rows.append(self.row_number)
return new_start_time
def stop(self, fade_seconds: int = 0) -> None:
"""
Stop this track playing
"""
self.resume_marker = self.music.get_position()
self.fade(fade_seconds)
# Reset fade graph
if self.fade_graph:
self.fade_graph.clear()
def time_playing(self) -> int:
"""
Return time track has been playing in milliseconds, zero if not playing
"""
if self.start_time is None:
return 0
return self.music.get_playtime()
def time_remaining_intro(self) -> int:
"""
Return milliseconds of intro remaining. Return 0 if no intro time in track
record or if intro has finished.
"""
if not self.intro:
return 0
return max(0, self.intro - self.time_playing())
def time_to_fade(self) -> int:
"""
Return milliseconds until fade time. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.fade_at - self.time_playing()
def time_to_silence(self) -> int:
"""
Return milliseconds until silent. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.silence_at - self.time_playing()
def update_fade_graph(self) -> None:
"""
Update fade graph
"""
if (
not self.is_playing()
or not self.fade_graph_start_updates
or not self.fade_graph
):
return
now = dt.datetime.now()
if self.fade_graph_start_updates > now:
return
self.fade_graph.tick(self.time_playing())
class _AddFadeCurve(QObject):
"""
Initialising a fade curve introduces a noticeable delay so carry out in
a thread.
"""
finished = pyqtSignal()
def __init__(
self,
plr: PlaylistRow,
track_path: str,
track_fade_at: int,
track_silence_at: int,
) -> None:
super().__init__()
self.plr = plr
self.track_path = track_path
self.track_fade_at = track_fade_at
self.track_silence_at = track_silence_at
def run(self) -> None:
"""
Create fade curve and add to PlaylistTrack object
"""
fc = FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
if not fc:
log.error(f"Failed to create FadeCurve for {self.track_path=}")
else:
self.plr.fade_graph = fc
self.finished.emit()
class FadeCurve:
GraphWidget: PlotWidget | None = None
def __init__(
self, track_path: str, track_fade_at: int, track_silence_at: int
) -> None:
"""
Set up fade graph array
"""
audio = helpers.get_audio_segment(track_path)
if not audio:
log.error(f"FadeCurve: could not get audio for {track_path=}")
return None
# Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
# milliseconds before fade starts to silence
self.start_ms: int = max(
0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.end_ms: int = track_silence_at
audio_segment = audio[self.start_ms : self.end_ms]
self.graph_array = np.array(audio_segment.get_array_of_samples())
# Calculate the factor to map milliseconds of track to array
self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms)
self.curve: PlotDataItem | None = None
self.region: LinearRegionItem | None = None
def clear(self) -> None:
"""Clear the current graph"""
if self.GraphWidget:
self.GraphWidget.clear()
def plot(self) -> None:
if self.GraphWidget:
self.curve = self.GraphWidget.plot(self.graph_array)
if self.curve:
self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
else:
log.debug("_FadeCurve.plot: no curve")
else:
log.debug("_FadeCurve.plot: no GraphWidget")
def tick(self, play_time: int) -> None:
"""Update volume fade curve"""
if not self.GraphWidget:
return
ms_of_graph = play_time - self.start_ms
if ms_of_graph < 0:
return
if self.region is None:
# Create the region now that we're into fade
self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
self.GraphWidget.addItem(self.region)
# Update region position
if self.region:
self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
@singleton
class TrackSequence:
"""
Maintain a list of which track (if any) is next, current and
previous. A track can only be previous after being current, and can
only be current after being next. If one of the tracks listed here
moves, the row_number and/or playlist_id will change.
"""
def __init__(self) -> None:
"""
Set up storage for the three monitored tracks
"""
self.next: PlaylistRow | None = None
self.current: PlaylistRow | None = None
self.previous: PlaylistRow | None = None
def set_next(self, plr: PlaylistRow | None) -> None:
"""
Set the 'next' track to be passed PlaylistRow. Clear any previous
next track. If passed PlaylistRow is None just clear existing
next track.
"""
# Clear any existing fade graph
if self.next and self.next.fade_graph:
self.next.fade_graph.clear()
if plr is None:
self.next = None
else:
self.next = plr
self.create_fade_graph()
def move_next_to_current(self) -> None:
"""
Make the next track the current track
"""
self.current = self.next
self.next = None
def move_current_to_previous(self) -> None:
"""
Make the current track the previous track
"""
if self.current is None:
raise ApplicationError("Tried to move non-existent track from current to previous")
# Dereference the fade curve so it can be garbage collected
self.current.fade_graph = None
self.previous = self.current
self.current = None
def move_previous_to_next(self) -> None:
"""
Make the previous track the next track
"""
self.next = self.previous
self.previous = None
def create_fade_graph(self) -> None:
"""
Initialise and add FadeCurve in a thread as it's slow
"""
self.fadecurve_thread = QThread()
if self.next is None:
raise ApplicationError("hell in a handcart")
self.worker = _AddFadeCurve(
self.next,
track_path=self.next.path,
track_fade_at=self.next.fade_at,
track_silence_at=self.next.silence_at,
)
self.worker.moveToThread(self.fadecurve_thread)
self.fadecurve_thread.started.connect(self.worker.run)
self.worker.finished.connect(self.fadecurve_thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
self.fadecurve_thread.start()
def update(self) -> None:
"""
If a PlaylistRow is edited (moved, title changed, etc), the
playlistrow_id won't change. We can retrieve the PlaylistRow
using the playlistrow_id and update the stored PlaylistRow.
"""
for ts in [self.next, self.current, self.previous]:
if not ts:
continue
playlist_row_dto = repository.get_playlist_row(ts.playlistrow_id)
if not playlist_row_dto:
raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}")
ts = PlaylistRow(playlist_row_dto)

View File

@ -34,9 +34,17 @@ from PyQt6.QtWidgets import (
# import line_profiler
# App imports
from classes import ApplicationError, Col, MusicMusterSignals, PlaylistStyle, TrackInfo
from audacity_controller import AudacityController
from classes import (
ApplicationError,
Col,
MusicMusterSignals,
PlaylistStyle,
PlayTrack,
TrackInfo
)
from config import Config
from dialogs import TrackSelectDialog
from dialogs import TrackInsertDialog
from helpers import (
ask_yes_no,
ms_to_mmss,
@ -44,9 +52,9 @@ from helpers import (
show_warning,
)
from log import log, log_call
from models import db, Settings
from music_manager import track_sequence
from playlistrow import TrackSequence
from playlistmodel import PlaylistModel, PlaylistProxyModel
import repository
if TYPE_CHECKING:
from musicmuster import Window
@ -277,6 +285,7 @@ class PlaylistTab(QTableView):
self.musicmuster = musicmuster
self.playlist_id = model.sourceModel().playlist_id
self.track_sequence = TrackSequence()
# Set up widget
self.setItemDelegate(PlaylistDelegate(self, model.sourceModel()))
@ -302,11 +311,19 @@ class PlaylistTab(QTableView):
self.signals = MusicMusterSignals()
self.signals.resize_rows_signal.connect(self.resize_rows)
self.signals.span_cells_signal.connect(self._span_cells)
self.signals.signal_track_started.connect(self.track_started)
# Selection model
self.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection)
self.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
# Set up for Audacity
try:
self.ac: Optional[AudacityController] = AudacityController()
except ApplicationError as e:
self.ac = None
show_warning(self.musicmuster, "Audacity error", str(e))
# Load model, set column widths
self.setModel(model)
self._set_column_widths()
@ -350,7 +367,8 @@ class PlaylistTab(QTableView):
# Deselect edited line
self.clear_selection()
def dropEvent(self, event: Optional[QDropEvent], dummy: int | None = None) -> None:
@log_call
def dropEvent(self, event: Optional[QDropEvent]) -> None:
"""
Move dropped rows
"""
@ -386,9 +404,6 @@ class PlaylistTab(QTableView):
destination_index = to_index
to_model_row = self.model().mapToSource(destination_index).row()
log.debug(
f"PlaylistTab.dropEvent(): {from_rows=}, {destination_index=}, {to_model_row=}"
)
# Sanity check
base_model_row_count = self.get_base_model().rowCount()
@ -400,8 +415,8 @@ class PlaylistTab(QTableView):
# that moved row the next track
set_next_row: Optional[int] = None
if (
track_sequence.current
and to_model_row == track_sequence.current.row_number + 1
self.track_sequence.current
and to_model_row == self.track_sequence.current.row_number + 1
):
set_next_row = to_model_row
@ -448,14 +463,19 @@ class PlaylistTab(QTableView):
self, selected: QItemSelection, deselected: QItemSelection
) -> None:
"""
Tell model which rows are selected.
Toggle drag behaviour according to whether rows are selected
"""
selected_rows = self.get_selected_rows()
self.musicmuster.current.selected_rows = selected_rows
selected_row_numbers = self.get_selected_rows()
# Signal selected rows to model
self.signals.signal_playlist_selected_rows.emit(self.playlist_id, selected_row_numbers)
# Put sum of selected tracks' duration in status bar
# If no rows are selected, we have nothing to do
if len(selected_rows) == 0:
if len(selected_row_numbers) == 0:
self.musicmuster.lblSumPlaytime.setText("")
else:
if not self.musicmuster.disable_selection_timing:
@ -501,20 +521,12 @@ class PlaylistTab(QTableView):
def _add_track(self) -> None:
"""Add a track to a section header making it a normal track row"""
model_row_number = self.source_model_selected_row_number()
if model_row_number is None:
return
with db.Session() as session:
dlg = TrackSelectDialog(
dlg = TrackInsertDialog(
parent=self.musicmuster,
session=session,
new_row_number=model_row_number,
base_model=self.get_base_model(),
playlist_id=self.playlist_id,
add_to_header=True,
)
dlg.exec()
session.commit()
def _build_context_menu(self, item: QTableWidgetItem) -> None:
"""Used to process context (right-click) menu, which is defined here"""
@ -527,19 +539,19 @@ class PlaylistTab(QTableView):
header_row = self.get_base_model().is_header_row(model_row_number)
track_row = not header_row
if track_sequence.current:
this_is_current_row = model_row_number == track_sequence.current.row_number
if self.track_sequence.current:
this_is_current_row = model_row_number == self.track_sequence.current.row_number
else:
this_is_current_row = False
if track_sequence.next:
this_is_next_row = model_row_number == track_sequence.next.row_number
if self.track_sequence.next:
this_is_next_row = model_row_number == self.track_sequence.next.row_number
else:
this_is_next_row = False
track_path = base_model.get_row_info(model_row_number).path
# Open/import in/from Audacity
if track_row and not this_is_current_row and self.musicmuster.ac:
if track_path == self.musicmuster.ac.path:
if track_row and not this_is_current_row:
if self.ac and track_path == self.ac.path:
# This track was opened in Audacity
self._add_context_menu(
"Update from Audacity",
@ -649,8 +661,8 @@ class PlaylistTab(QTableView):
that we have an edit open.
"""
if self.musicmuster.ac:
self.musicmuster.ac.path = None
if self.ac:
self.ac.path = None
def clear_selection(self) -> None:
"""Unselect all tracks and reset drag mode"""
@ -668,8 +680,6 @@ class PlaylistTab(QTableView):
Called when column width changes. Save new width to database.
"""
log.debug(f"_column_resize({column_number=}, {_old=}, {_new=}")
header = self.horizontalHeader()
if not header:
return
@ -677,11 +687,10 @@ class PlaylistTab(QTableView):
# Resize rows if necessary
self.resizeRowsToContents()
with db.Session() as session:
attr_name = f"playlist_col_{column_number}_width"
record = Settings.get_setting(session, attr_name)
record.f_int = self.columnWidth(column_number)
session.commit()
# Save settings
repository.set_setting(
f"playlist_col_{column_number}_width", self.columnWidth(column_number)
)
def _context_menu(self, pos):
"""Display right-click menu"""
@ -714,12 +723,18 @@ class PlaylistTab(QTableView):
cb.clear(mode=cb.Mode.Clipboard)
cb.setText(track_path, mode=cb.Mode.Clipboard)
def current_track_started(self) -> None:
@log_call
def track_started(self, play_track: PlayTrack) -> None:
"""
Called when track starts playing
"""
self.get_base_model().current_track_started()
if play_track.playlist_id != self.playlist_id:
# Not for us
return
# TODO - via signal
# self.get_base_model().current_track_started()
# Scroll to current section if hide mode is by section
if (
self.musicmuster.hide_played_tracks
@ -749,8 +764,8 @@ class PlaylistTab(QTableView):
# Don't delete current or next tracks
selected_row_numbers = self.selected_model_row_numbers()
for ts in [
track_sequence.next,
track_sequence.current,
self.track_sequence.next,
self.track_sequence.current,
]:
if ts:
if (
@ -801,6 +816,7 @@ class PlaylistTab(QTableView):
else:
return TrackInfo(track_id, selected_row)
@log_call
def get_selected_row(self) -> Optional[int]:
"""
Return selected row number. If no rows or multiple rows selected, return None
@ -812,6 +828,7 @@ class PlaylistTab(QTableView):
else:
return None
@log_call
def get_selected_rows(self) -> list[int]:
"""Return a list of model-selected row numbers sorted by row"""
@ -824,6 +841,7 @@ class PlaylistTab(QTableView):
return sorted(list(set([self.model().mapToSource(a).row() for a in selected_indexes])))
@log_call
def get_top_visible_row(self) -> int:
"""
Get the viewport of the table view
@ -852,10 +870,10 @@ class PlaylistTab(QTableView):
Import current Audacity track to passed row
"""
if not self.musicmuster.ac:
if not self.ac:
return
try:
self.musicmuster.ac.export()
self.ac.export()
self._rescan(row_number)
except ApplicationError as e:
show_warning(self.musicmuster, "Audacity error", str(e))
@ -912,16 +930,15 @@ class PlaylistTab(QTableView):
Open track in passed row in Audacity
"""
if not self.musicmuster.ac:
return
path = self.get_base_model().get_row_track_path(row_number)
if not path:
log.error(f"_open_in_audacity: can't get path for {row_number=}")
return
try:
self.musicmuster.ac.open(path)
if not self.ac:
self.ac = AudacityController()
self.ac.open(path)
except ApplicationError as e:
show_warning(self.musicmuster, "Audacity error", str(e))
@ -947,8 +964,6 @@ class PlaylistTab(QTableView):
If playlist_id is us, resize rows
"""
log.debug(f"resize_rows({playlist_id=}) {self.playlist_id=}")
if playlist_id and playlist_id != self.playlist_id:
return
@ -995,6 +1010,7 @@ class PlaylistTab(QTableView):
# Reset selection mode
self.setSelectionMode(QAbstractItemView.SelectionMode.ExtendedSelection)
@log_call
def source_model_selected_row_number(self) -> Optional[int]:
"""
Return the model row number corresponding to the selected row or None
@ -1005,6 +1021,7 @@ class PlaylistTab(QTableView):
return None
return self.model().mapToSource(selected_index).row()
@log_call
def selected_model_row_numbers(self) -> list[int]:
"""
Return a list of model row numbers corresponding to the selected rows or
@ -1047,19 +1064,16 @@ class PlaylistTab(QTableView):
def _set_column_widths(self) -> None:
"""Column widths from settings"""
log.debug("_set_column_widths()")
header = self.horizontalHeader()
if not header:
return
# Last column is set to stretch so ignore it here
with db.Session() as session:
for column_number in range(header.count() - 1):
attr_name = f"playlist_col_{column_number}_width"
record = Settings.get_setting(session, attr_name)
if record.f_int is not None:
self.setColumnWidth(column_number, record.f_int)
value = repository.get_setting(attr_name)
if value is not None:
self.setColumnWidth(column_number, value)
else:
self.setColumnWidth(column_number, Config.DEFAULT_COLUMN_WIDTH)
@ -1112,7 +1126,7 @@ class PlaylistTab(QTableView):
# Update musicmuster
self.musicmuster.current.playlist_id = self.playlist_id
self.musicmuster.current.selected_rows = self.get_selected_rows()
self.musicmuster.current.selected_row_numbers = self.get_selected_rows()
self.musicmuster.current.base_model = self.get_base_model()
self.musicmuster.current.proxy_model = self.model()
@ -1121,6 +1135,6 @@ class PlaylistTab(QTableView):
def _unmark_as_next(self) -> None:
"""Rescan track"""
track_sequence.set_next(None)
self.track_sequence.set_next(None)
self.clear_selection()
self.signals.next_track_changed_signal.emit()

View File

@ -21,7 +21,6 @@ from PyQt6.QtGui import (
)
# Third party imports
from sqlalchemy.orm.session import Session
# import snoop # type: ignore
@ -39,8 +38,9 @@ from helpers import (
show_warning,
)
from log import log
from models import db, Playdates, Tracks
from music_manager import RowAndTrack
from models import db, Playdates
from playlistrow import PlaylistRow
import repository
@dataclass
@ -64,7 +64,7 @@ class QuerylistModel(QAbstractTableModel):
"""
def __init__(self, session: Session, filter: Filter) -> None:
def __init__(self, filter: Filter) -> None:
"""
Load query
"""
@ -72,7 +72,6 @@ class QuerylistModel(QAbstractTableModel):
log.debug(f"QuerylistModel.__init__({filter=})")
super().__init__()
self.session = session
self.filter = filter
self.querylist_rows: dict[int, QueryRow] = {}
@ -136,7 +135,7 @@ class QuerylistModel(QAbstractTableModel):
row = index.row()
column = index.column()
# rat for playlist row data as it's used a lot
# plr for playlist row data as it's used a lot
qrow = self.querylist_rows[row]
# Dispatch to role-specific functions
@ -230,21 +229,16 @@ class QuerylistModel(QAbstractTableModel):
row = 0
try:
results = Tracks.get_filtered_tracks(self.session, self.filter)
results = repository.get_filtered_tracks(self.filter)
for result in results:
lastplayed = None
if hasattr(result, "playdates"):
pds = result.playdates
if pds:
lastplayed = max([a.lastplayed for a in pds])
queryrow = QueryRow(
artist=result.artist,
bitrate=result.bitrate or 0,
duration=result.duration,
lastplayed=lastplayed,
lastplayed=result.lastplayed,
path=result.path,
title=result.title,
track_id=result.id,
track_id=result.track_id,
)
self.querylist_rows[row] = queryrow
@ -268,23 +262,14 @@ class QuerylistModel(QAbstractTableModel):
bottom_right = self.index(row, self.columnCount() - 1)
self.dataChanged.emit(top_left, bottom_right, [Qt.ItemDataRole.BackgroundRole])
def _tooltip_role(self, row: int, column: int, rat: RowAndTrack) -> str | QVariant:
def _tooltip_role(self, row: int, column: int, plr: PlaylistRow) -> str | QVariant:
"""
Return tooltip. Currently only used for last_played column.
"""
if column != QueryCol.LAST_PLAYED.value:
return QVariant()
with db.Session() as session:
track_id = self.querylist_rows[row].track_id
if not track_id:
return QVariant()
playdates = Playdates.last_playdates(session, track_id)
return (
"<br>".join(
[
a.lastplayed.strftime(Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT)
for a in reversed(playdates)
]
)
)
return repository.get_last_played_dates(track_id)

1194
app/repository.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -5,7 +5,6 @@ import os
# PyQt imports
# Third party imports
from sqlalchemy.orm.session import Session
# App imports
from config import Config
@ -13,10 +12,10 @@ from helpers import (
get_tags,
)
from log import log
from models import Tracks
import repository
def check_db(session: Session) -> None:
def check_db() -> None:
"""
Database consistency check.
@ -27,7 +26,7 @@ def check_db(session: Session) -> None:
Check all paths in database exist
"""
db_paths = set([a.path for a in Tracks.get_all(session)])
db_paths = set([a.path for a in repository.get_all_tracks()])
os_paths_list = []
for root, _dirs, files in os.walk(Config.ROOT):
@ -52,7 +51,7 @@ def check_db(session: Session) -> None:
missing_file_count += 1
track = Tracks.get_by_path(session, path)
track = repository.track_by_path(path)
if not track:
# This shouldn't happen as we're looking for paths in
# database that aren't in filesystem, but just in case...
@ -74,7 +73,7 @@ def check_db(session: Session) -> None:
for t in paths_not_found:
print(
f"""
Track ID: {t.id}
Track ID: {t.track_id}
Path: {t.path}
Title: {t.title}
Artist: {t.artist}
@ -84,14 +83,15 @@ def check_db(session: Session) -> None:
print("There were more paths than listed that were not found")
def update_bitrates(session: Session) -> None:
def update_bitrates() -> None:
"""
Update bitrates on all tracks in database
"""
for track in Tracks.get_all(session):
for track in repository.get_all_tracks():
try:
t = get_tags(track.path)
# TODO this won't persist as we're updating DTO
track.bitrate = t.bitrate
except FileNotFoundError:
continue

View File

@ -1,29 +0,0 @@
# Standard library imports
# PyQt imports
# Third party imports
import vlc # type: ignore
# App imports
class VLCManager:
"""
Singleton class to ensure we only ever have one vlc Instance
"""
__instance = None
def __init__(self) -> None:
if VLCManager.__instance is None:
self.vlc_instance = vlc.Instance()
VLCManager.__instance = self
else:
raise Exception("Attempted to create a second VLCManager instance")
@staticmethod
def get_instance() -> vlc.Instance:
if VLCManager.__instance is None:
VLCManager()
return VLCManager.__instance

View File

@ -33,9 +33,6 @@ dependencies = [
"types-pyyaml>=6.0.12.20241230",
"dogpile-cache>=1.3.4",
"pdbpp>=0.10.3",
"filetype>=1.2.0",
"black>=25.1.0",
"slugify>=0.0.1",
]
[dependency-groups]
@ -66,9 +63,6 @@ python_version = 3.11
warn_unused_configs = true
disallow_incomplete_defs = true
[tool.pylsp.plugins.pycodestyle]
maxLineLength = 88
[tool.pytest.ini_options]
addopts = "--exitfirst --showlocals --capture=no"
pythonpath = [".", "app"]

View File

@ -0,0 +1,40 @@
# Standard library imports
import unittest
# PyQt imports
# Third party imports
# App imports
from app.models import (
db,
)
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
pass
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
pass
def setUp(self):
"""Runs before each test"""
db.create_all()
def tearDown(self):
"""Runs after each test"""
db.drop_all()
def test_xxx(self):
"""Comment"""
pass

View File

@ -134,122 +134,6 @@ class TestMMMiscRowMove(unittest.TestCase):
def tearDown(self):
db.drop_all()
def test_move_rows_test2(self):
# move row 3 to row 5
self.model.move_rows([3], 5)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [3, 4, 5]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 3:
assert self.model.playlist_rows[row].note == str(4)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
elif row == 5:
assert self.model.playlist_rows[row].note == str(5)
def test_move_rows_test3(self):
# move row 4 to row 3
self.model.move_rows([4], 3)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [3, 4]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 3:
assert self.model.playlist_rows[row].note == str(4)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
def test_move_rows_test4(self):
# move row 4 to row 2
self.model.move_rows([4], 2)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [2, 3, 4]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 2:
assert self.model.playlist_rows[row].note == str(4)
elif row == 3:
assert self.model.playlist_rows[row].note == str(2)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
def test_move_rows_test5(self):
# move rows [1, 4, 5, 10] → 8
self.model.move_rows([1, 4, 5, 10], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 2, 3, 6, 7, 1, 4, 5, 10, 8, 9]
def test_move_rows_test6(self):
# move rows [3, 6] → 5
self.model.move_rows([3, 6], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 3, 6, 5, 7, 8, 9, 10]
def test_move_rows_test7(self):
# move rows [3, 5, 6] → 8
self.model.move_rows([3, 5, 6], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 7, 3, 5, 6, 8, 9, 10]
def test_move_rows_test8(self):
# move rows [7, 8, 10] → 5
self.model.move_rows([7, 8, 10], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
def test_move_rows_test9(self):
# move rows [1, 2, 3] → 0
# Replicate issue 244
self.model.move_rows([0, 1, 2, 3], 0)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def test_insert_header_row_end(self):
# insert header row at end of playlist

298
tests/test_repository.py Normal file
View File

@ -0,0 +1,298 @@
# Standard library imports
import unittest
# PyQt imports
# Third party imports
# App imports
from app import playlistmodel
from app import repository
from app.models import db
from classes import PlaylistDTO
from helpers import get_all_track_metadata
from playlistmodel import PlaylistModel
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
cls.isa_path = "testdata/isa.mp3"
cls.isa_title = "I'm So Afraid"
cls.isa_artist = "Fleetwood Mac"
cls.mom_path = "testdata/mom.mp3"
cls.mom_title = "Man of Mystery"
cls.mom_artist = "The Shadows"
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
pass
def setUp(self):
"""Runs before each test"""
db.create_all()
def create_playlist_and_model(
self, playlist_name: str
) -> (PlaylistDTO, PlaylistModel):
# Create a playlist and model
playlist = repository.create_playlist(name=playlist_name, template_id=0)
assert playlist
model = playlistmodel.PlaylistModel(playlist.playlist_id, is_template=False)
assert model
return (playlist, model)
def create_playlist_model_tracks(self, playlist_name: str):
(playlist, model) = self.create_playlist_and_model(playlist_name)
# Create tracks
metadata1 = get_all_track_metadata(self.isa_path)
self.track1 = repository.create_track(self.isa_path, metadata1)
metadata2 = get_all_track_metadata(self.mom_path)
self.track2 = repository.create_track(self.mom_path, metadata2)
# Add tracks and header to playlist
self.row0 = repository.insert_row(
playlist.playlist_id,
row_number=0,
track_id=self.track1.track_id,
note="track 1",
)
self.row1 = repository.insert_row(
playlist.playlist_id,
row_number=1,
track_id=0,
note="Header row",
)
self.row2 = repository.insert_row(
playlist.playlist_id,
row_number=2,
track_id=self.track2.track_id,
note="track 2",
)
def create_rows(
self, playlist_name: str, number_of_rows: int
) -> (PlaylistDTO, PlaylistModel):
(playlist, model) = self.create_playlist_and_model(playlist_name)
for row_number in range(number_of_rows):
repository.insert_row(
playlist.playlist_id, row_number, None, str(row_number)
)
return (playlist, model)
def tearDown(self):
"""Runs after each test"""
db.drop_all()
def test_add_track_to_header(self):
"""Add a track to a header row"""
self.create_playlist_model_tracks("my playlist")
repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id)
result = repository.get_playlist_row(self.row1.playlistrow_id)
assert result.track.track_id == self.track2.track_id
def test_create_track(self):
metadata = get_all_track_metadata(self.isa_path)
repository.create_track(self.isa_path, metadata)
results = repository.get_all_tracks()
assert len(results) == 1
assert results[0].path == self.isa_path
def test_get_track_by_id(self):
metadata = get_all_track_metadata(self.isa_path)
dto = repository.create_track(self.isa_path, metadata)
result = repository.track_by_id(dto.track_id)
assert result.path == self.isa_path
def test_get_track_by_artist(self):
metadata = get_all_track_metadata(self.isa_path)
_ = repository.create_track(self.isa_path, metadata)
metadata = get_all_track_metadata(self.mom_path)
_ = repository.create_track(self.mom_path, metadata)
result_isa = repository.tracks_by_artist(self.isa_artist)
assert len(result_isa) == 1
assert result_isa[0].artist == self.isa_artist
result_mom = repository.tracks_by_artist(self.mom_artist)
assert len(result_mom) == 1
assert result_mom[0].artist == self.mom_artist
def test_get_track_by_title(self):
metadata_isa = get_all_track_metadata(self.isa_path)
_ = repository.create_track(self.isa_path, metadata_isa)
metadata_mom = get_all_track_metadata(self.mom_path)
_ = repository.create_track(self.mom_path, metadata_mom)
result_isa = repository.tracks_by_title(self.isa_title)
assert len(result_isa) == 1
assert result_isa[0].title == self.isa_title
result_mom = repository.tracks_by_title(self.mom_title)
assert len(result_mom) == 1
assert result_mom[0].title == self.mom_title
def test_tracks_get_all_tracks(self):
self.create_playlist_model_tracks(playlist_name="test_track_get_all_tracks")
all_tracks = repository.get_all_tracks()
assert len(all_tracks) == 2
def test_tracks_by_path(self):
metadata_isa = get_all_track_metadata(self.isa_path)
_ = repository.create_track(self.isa_path, metadata_isa)
metadata_mom = get_all_track_metadata(self.mom_path)
_ = repository.create_track(self.mom_path, metadata_mom)
result_isa = repository.track_by_path(self.isa_path)
assert result_isa.title == self.isa_title
result_mom = repository.track_by_path(self.mom_path)
assert result_mom.title == self.mom_title
def test_move_rows_test1(self):
# move row 3 to row 5
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test1", number_of_rows)
repository.move_rows([3], playlist.playlist_id, 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9]
def test_move_rows_test2(self):
# move row 4 to row 3
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test2", number_of_rows)
repository.move_rows([4], playlist.playlist_id, 3)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 3, 5, 6, 7, 8, 9]
def test_move_rows_test3(self):
# move row 4 to row 2
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test3", number_of_rows)
repository.move_rows([4], playlist.playlist_id, 2)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 4, 2, 3, 5, 6, 7, 8, 9]
def test_move_rows_test4(self):
# move rows [1, 4, 5, 10] → 8
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test4", number_of_rows)
repository.move_rows([1, 4, 5, 10], playlist.playlist_id, 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 2, 3, 6, 7, 8, 1, 4, 5, 10, 9]
def test_move_rows_test5(self):
# move rows [3, 6] → 5
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test5", number_of_rows)
repository.move_rows([3, 6], playlist.playlist_id, 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9, 10]
def test_move_rows_test6(self):
# move rows [3, 5, 6] → 8
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
repository.move_rows([3, 5, 6], playlist.playlist_id, 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 7, 8, 9, 10, 3, 5, 6]
def test_move_rows_test7(self):
# move rows [7, 8, 10] → 5
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test7", number_of_rows)
repository.move_rows([7, 8, 10], playlist.playlist_id, 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
def test_move_rows_test8(self):
# move rows [1, 2, 3] → 0
# Replicate issue 244
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test8", number_of_rows)
repository.move_rows([0, 1, 2, 3], playlist.playlist_id, 0)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def test_move_rows_to_playlist(self):
number_of_rows = 11
rows_to_move = [2, 4, 6]
to_row = 5
(playlist_src, model_src) = self.create_rows("src playlist", number_of_rows)
(playlist_dst, model_dst) = self.create_rows("dst playlist", number_of_rows)
repository.move_rows(
rows_to_move, playlist_src.playlist_id, to_row, playlist_dst.playlist_id
)
# Check we have all rows and plr_rownums are correct
new_order_src = []
for row in repository.get_playlist_rows(playlist_src.playlist_id):
new_order_src.append(int(row.note))
assert new_order_src == [0, 1, 3, 5, 7, 8, 9, 10]
new_order_dst = []
for row in repository.get_playlist_rows(playlist_dst.playlist_id):
new_order_dst.append(int(row.note))
assert new_order_dst == [0, 1, 2, 3, 4, 2, 4, 6, 5, 6, 7, 8, 9, 10]
def test_remove_rows(self):
pass
def test_get_playlist_by_id(self):
pass
def test_settings(self):
pass

View File

@ -132,9 +132,8 @@ class MyTestCase(unittest.TestCase):
Config.ROOT = os.path.join(os.path.dirname(__file__), "testdata")
with db.Session() as session:
utilities.check_db(session)
utilities.update_bitrates(session)
utilities.check_db()
utilities.update_bitrates()
# def test_meta_all_clear(qtbot, session):

817
uv.lock

File diff suppressed because it is too large Load Diff