After around 1.5 hours of operation, we would get messages such as "vlcpulse audio output error: PulseAudio server connection failure: Connection terminated". This was tracked down to VLC player resources not being released correctly when a track had finished playing. That is now fixed, and the fade-out code has been much simplified as well.
1928 lines
66 KiB
Python
Executable File
1928 lines
66 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# Standard library imports
|
|
from os.path import basename
|
|
from typing import List, Optional
|
|
import argparse
|
|
import datetime as dt
|
|
import os
|
|
import shutil
|
|
from slugify import slugify # type: ignore
|
|
import subprocess
|
|
import sys
|
|
import urllib.parse
|
|
import webbrowser
|
|
|
|
# PyQt imports
|
|
from PyQt6.QtCore import (
|
|
pyqtSignal,
|
|
QDate,
|
|
QObject,
|
|
Qt,
|
|
QThread,
|
|
QTime,
|
|
QTimer,
|
|
)
|
|
from PyQt6.QtGui import (
|
|
QCloseEvent,
|
|
QColor,
|
|
QKeySequence,
|
|
QPalette,
|
|
QShortcut,
|
|
)
|
|
from PyQt6.QtWidgets import (
|
|
QApplication,
|
|
QDialog,
|
|
QFileDialog,
|
|
QInputDialog,
|
|
QLabel,
|
|
QLineEdit,
|
|
QListWidgetItem,
|
|
QMainWindow,
|
|
QMessageBox,
|
|
QWidget,
|
|
)
|
|
|
|
# Third party imports
|
|
import pipeclient
|
|
from pygame import mixer
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.orm.session import Session
|
|
import stackprinter # type: ignore
|
|
|
|
# App imports
|
|
from classes import (
|
|
MusicMusterSignals,
|
|
RowAndTrack,
|
|
TrackFileData,
|
|
TrackInfo,
|
|
track_sequence,
|
|
)
|
|
from config import Config
|
|
from dialogs import TrackSelectDialog, ReplaceFilesDialog
|
|
from helpers import file_is_unreadable
|
|
from log import log
|
|
from models import db, Playdates, PlaylistRows, Playlists, Settings, Tracks
|
|
from playlistmodel import PlaylistModel, PlaylistProxyModel
|
|
from playlists import PlaylistTab
|
|
from ui import icons_rc # noqa F401
|
|
from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist # type: ignore
|
|
from ui.downloadcsv_ui import Ui_DateSelect # type: ignore
|
|
from ui.main_window_ui import Ui_MainWindow # type: ignore
|
|
from utilities import check_db, update_bitrates
|
|
import helpers
|
|
|
|
|
|
class ImportTrack(QObject):
    """
    Worker that imports track files into the database and inserts a row
    for each into a playlist model.

    run() does the work and always emits import_finished when it is done,
    whether or not the import succeeded.
    """

    import_finished = pyqtSignal()

    def __init__(
        self,
        track_files: List[TrackFileData],
        source_model: PlaylistModel,
        row_number: Optional[int],
    ) -> None:
        """
        track_files: files to import; each must already carry tags,
            audio_metadata and track_path
        source_model: model that receives a new row per imported track
        row_number: row at which to insert the first track, or None to
            append after the last row

        Raises Exception if any track file is missing required data.
        """

        super().__init__()
        self.track_files = track_files
        self.source_model = source_model
        if row_number is None:
            self.next_row_number = source_model.rowCount()
        else:
            self.next_row_number = row_number
        self.signals = MusicMusterSignals()

        # Sanity check
        for tf in track_files:
            if not tf.tags:
                raise Exception(f"ImportTrack: no tags for {tf.new_file_path}")
            if not tf.audio_metadata:
                raise Exception(
                    f"ImportTrack: no audio_metadata for {tf.new_file_path}"
                )
            if tf.track_path is None:
                raise Exception(f"ImportTrack: no track_path for {tf.new_file_path}")

    def run(self):
        """
        Create track objects from passed files and add to visible playlist

        Files that no longer exist are logged and skipped. A failure to
        create a track record shows a warning and abandons the rest of the
        import.
        """

        imported = 0
        try:
            with db.Session() as session:
                for tf in self.track_files:
                    self.signals.status_message_signal.emit(
                        f"Importing {basename(tf.new_file_path)}", 5000
                    )

                    # Sanity check
                    if not os.path.exists(tf.new_file_path):
                        log.error(f"ImportTrack: file not found: {tf.new_file_path=}")
                        continue

                    # Move the track file, replacing any file already at
                    # the destination. Nothing to do if the file is
                    # already in its final destination.
                    if tf.track_path != tf.new_file_path:
                        if os.path.exists(tf.track_path):
                            os.unlink(tf.track_path)
                        shutil.move(tf.new_file_path, tf.track_path)

                    # Import track
                    try:
                        track = Tracks(
                            session, path=tf.track_path, **tf.audio_metadata | tf.tags
                        )
                    except Exception as e:
                        self.signals.show_warning_signal.emit(
                            "Error importing track", str(e)
                        )
                        # The 'finally' below still emits import_finished
                        return
                    helpers.normalise_track(tf.track_path)
                    # We're importing potentially multiple tracks in a loop.
                    # If there's an error adding the track to the Tracks
                    # table, the session will rollback, thus losing any
                    # previous additions in this loop. So, commit now to
                    # lock in what we've just done.
                    session.commit()
                    self.source_model.insert_row(self.next_row_number, track.id, "")
                    self.next_row_number += 1
                    imported += 1

            # Report what was actually imported, not what was requested
            self.signals.status_message_signal.emit(
                f"{imported} tracks imported", 10000
            )
        finally:
            # Always signal completion: listeners use this to stop the
            # worker thread, so skipping it on error would leave the
            # thread running for ever.
            self.import_finished.emit()
|
|
|
|
|
|
class PreviewManager:
    """
    Manage track preview player

    Playback is done through pygame's mixer.music. The time playback
    notionally started is held in start_time and is back-dated whenever
    the play position is moved, so that get_playtime() reflects the
    position within the track.
    """

    def __init__(self) -> None:
        mixer.init()
        # Intro length in milliseconds, if known
        self.intro: Optional[int] = None
        self.path: str = ""
        self.row_number: Optional[int] = None
        self.start_time: Optional[dt.datetime] = None
        self.track_id: int = 0

    def _seek(self, position: float) -> None:
        """
        Move play position to 'position' seconds and adjust start_time to
        match.
        """

        mixer.music.set_pos(position)
        self.start_time = dt.datetime.now() - dt.timedelta(seconds=position)

    def back(self, ms: int) -> None:
        """
        Move play position back by 'ms' milliseconds
        """

        self._seek(max(0, (self.get_playtime() - ms)) / 1000)

    def forward(self, ms: int) -> None:
        """
        Move play position forward by 'ms' milliseconds
        """

        self._seek((self.get_playtime() + ms) / 1000)

    def get_playtime(self) -> int:
        """
        Return time since track started in milliseconds, 0 if not playing
        """

        if not mixer.music.get_busy():
            return 0

        if not self.start_time:
            return 0

        return int((dt.datetime.now() - self.start_time).total_seconds() * 1000)

    def is_playing(self) -> bool:
        """Return True if the preview player is playing"""

        return mixer.music.get_busy()

    def move_to_intro_end(self) -> None:
        """
        Move play position to Config.PREVIEW_END_BUFFER_MS milliseconds
        before end of intro.

        If no intro defined, do nothing.
        """

        if self.intro is None:
            return

        self._seek(max(0, self.intro - Config.PREVIEW_END_BUFFER_MS) / 1000)

    def play(self) -> None:
        """Start playing the loaded track"""

        mixer.music.play()
        self.start_time = dt.datetime.now()

    def restart(self) -> None:
        """
        Restart player from beginning
        """

        if not mixer.music.get_busy():
            return

        mixer.music.rewind()
        self.start_time = dt.datetime.now()

    def set_intro(self, ms: int) -> None:
        """
        Set intro time
        """

        self.intro = ms

    def set_track_info(self, info: TrackInfo) -> None:
        """
        Record which track is to be previewed and load its file into the
        player.

        Raises ValueError if the track is not in the database or its file
        is unreadable.
        """

        self.track_id = info.track_id
        self.row_number = info.row_number

        with db.Session() as session:
            track = session.get(Tracks, self.track_id)
            if not track:
                raise ValueError(
                    f"PreviewManager: unable to retrieve track {self.track_id=}"
                )
            self.intro = track.intro
            self.path = track.path

            # Check file readable
            if file_is_unreadable(self.path):
                raise ValueError(
                    f"PreviewManager.set_track_info: {track.path=} unreadable"
                )

        mixer.music.load(self.path)

    def stop(self) -> None:
        """Stop playback, unload the file and forget the current track"""

        mixer.music.stop()
        mixer.music.unload()
        self.path = ""
        self.row_number = None
        self.track_id = 0
        self.start_time = None
|
|
|
|
|
|
class Window(QMainWindow, Ui_MainWindow):
|
|
def __init__(
    self, parent: Optional[QWidget] = None, *args: list, **kwargs: dict
) -> None:
    """
    Build the main window: create timers and status bar widgets, wire up
    signals and reopen the playlists that were open last time.
    """

    super().__init__(parent)
    self.setupUi(self)

    # One timer per tick interval (name gives the interval in ms)
    self.timer10: QTimer = QTimer()
    self.timer100: QTimer = QTimer()
    self.timer500: QTimer = QTimer()
    self.timer1000: QTimer = QTimer()

    self.set_main_window_size()

    # Status bar: permanent playtime label plus a hidden search box
    self.lblSumPlaytime = QLabel("")
    self.statusbar.addPermanentWidget(self.lblSumPlaytime)
    self.txtSearch = QLineEdit()
    self.txtSearch.setHidden(True)
    self.statusbar.addWidget(self.txtSearch)

    self.hide_played_tracks = False
    self.preview_manager = PreviewManager()

    # Fade graph: no axes, no padding
    self.widgetFadeVolume.hideAxis("bottom")
    self.widgetFadeVolume.hideAxis("left")
    self.widgetFadeVolume.setDefaultPadding(0)
    self.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND)

    # Shorthand accessors for whichever tab is currently showing
    self.active_tab = lambda: self.tabPlaylist.currentWidget()
    self.active_proxy_model = lambda: self.tabPlaylist.currentWidget().model()

    # Source of a pending cut/paste
    self.move_source_rows: Optional[List[int]] = None
    self.move_source_model: Optional[PlaylistProxyModel] = None

    # Audacity
    self.audacity_file_path: Optional[str] = None
    self.audacity_client: Optional[pipeclient.PipeClient] = None
    self.initialise_audacity()

    self.disable_selection_timing = False
    self.clock_counter = 0

    # Start the timers running
    self.timer10.start(10)
    self.timer100.start(100)
    self.timer500.start(500)
    self.timer1000.start(1000)

    self.signals = MusicMusterSignals()
    self.connect_signals_slots()
    self.catch_return_key = False

    if not Config.USE_INTERNAL_BROWSER:
        # Make the configured external browser available as "browser"
        external_browser = webbrowser.BackgroundBrowser(Config.EXTERNAL_BROWSER_PATH)
        webbrowser.register("browser", None, external_browser)

    # Set up shortcut key for instant logging from keyboard
    self.action_quicklog = QShortcut(QKeySequence("Ctrl+L"), self)
    self.action_quicklog.activated.connect(self.quicklog)

    self.load_last_playlists()
    self.stop_autoplay = False
|
|
|
|
def about(self) -> None:
    """
    Show an "About" box giving the git tag and database name.

    If git cannot be run, or reports an error, the error text is shown
    in place of the tag.
    """

    try:
        # Decode the output properly: stripping characters from
        # str(bytes) also removes any trailing 'b' or 'n' that belongs
        # to the tag or commit hash.
        git_tag = (
            subprocess.check_output(["git", "describe"], stderr=subprocess.STDOUT)
            .decode(errors="replace")
            .strip()
        )
    except subprocess.CalledProcessError as exc_info:
        git_tag = (exc_info.output or b"").decode(errors="replace").strip()
    except OSError as exc_info:
        # git is not installed or not executable
        git_tag = f"unknown ({exc_info})"

    # Default in case the session has no bind
    dbname = "unknown"
    with db.Session() as session:
        if session.bind:
            dbname = session.bind.engine.url.database

    QMessageBox.information(
        self,
        "About",
        f"MusicMuster {git_tag}\n\nDatabase: {dbname}",
        QMessageBox.StandardButton.Ok,
    )
|
|
|
|
def clear_next(self) -> None:
    """Unset the next track and refresh the headers"""

    track_sequence.set_next(None)
    self.update_headers()
|
|
|
|
def clear_selection(self) -> None:
    """Deselect all rows in the active tab and clear the search bar"""

    # There may be no tab open
    if self.active_tab():
        self.active_tab().clear_selection()

    self.search_playlist_clear()
|
|
|
|
def closeEvent(self, event: Optional[QCloseEvent]) -> None:
    """
    Handle attempt to close main window.

    Closing is refused while a track is playing. Otherwise the tab
    position of each open playlist and the window layout are saved
    before the event is accepted.
    """

    if not event:
        return

    # Don't allow window to close when a track is playing
    if track_sequence.current and track_sequence.current.is_playing():
        event.ignore()
        helpers.show_warning(
            self, "Track playing", "Can't close application while track is playing"
        )
        return

    with db.Session() as session:
        # Save tab number of open playlists
        open_playlist_ids: dict[int, int] = {
            self.tabPlaylist.widget(tab_number).playlist_id: tab_number
            for tab_number in range(self.tabPlaylist.count())
        }
        Playlists.clear_tabs(session, list(open_playlist_ids))
        for playlist_id, idx in open_playlist_ids.items():
            playlist = session.get(Playlists, playlist_id)
            if playlist:
                log.debug(f"Set {playlist=} tab to {idx=}")
                playlist.tab = idx

        # Save window attributes
        splitter_top, splitter_bottom = self.splitter.sizes()
        attributes_to_save = {
            "mainwindow_height": self.height(),
            "mainwindow_width": self.width(),
            "mainwindow_x": self.x(),
            "mainwindow_y": self.y(),
            "splitter_top": splitter_top,
            "splitter_bottom": splitter_bottom,
            "active_tab": self.tabPlaylist.currentIndex(),
        }
        for name, value in attributes_to_save.items():
            Settings.get_setting(session, name).f_int = value

        session.commit()

    event.accept()
|
|
|
|
def close_playlist_tab(self) -> bool:
    """
    Menu handler: close whichever playlist tab is currently active.

    Returns whatever close_tab() returns.
    """

    current_index = self.tabPlaylist.currentIndex()
    return self.close_tab(current_index)
|
|
|
|
def close_tab(self, tab_index: int) -> bool:
    """
    Close playlist tab unless it holds the current or next track.
    Called from close_playlist_tab() or by clicking close button on tab.

    Return True if tab closed else False (including when there is no tab
    at tab_index).
    """

    closing_tab = self.tabPlaylist.widget(tab_index)
    if closing_tab is None:
        # No tab at that index, e.g. currentIndex() is -1 when no tabs
        # are open
        return False
    closing_tab_playlist_id = closing_tab.playlist_id

    # Don't close the playlist holding the current or the next track
    protected = (
        ("current", "Current track", "Can't close current track playlist"),
        ("next", "Next track", "Can't close next track playlist"),
    )
    for attribute, title, message in protected:
        track = getattr(track_sequence, attribute)
        if track is None:
            continue
        track_playlist_id = track.playlist_id
        if track_playlist_id and closing_tab_playlist_id == track_playlist_id:
            helpers.show_OK(self, title, message)
            return False

    # Record playlist as closed and update remaining playlist tabs
    with db.Session() as session:
        playlist = session.get(Playlists, closing_tab_playlist_id)
        if playlist:
            playlist.close(session)

    # Close playlist and remove tab
    closing_tab.close()
    self.tabPlaylist.removeTab(tab_index)

    return True
|
|
|
|
def connect_signals_slots(self) -> None:
    """Connect menu actions, buttons, application signals and timers"""

    # Menu actions
    # NOTE(review): 'triggered' carries a 'checked' flag and play_next()
    # accepts an optional 'position' argument - confirm that play_next()
    # copes with being passed the flag.
    action_slots = (
        (self.action_About, self.about),
        (self.action_Clear_selection, self.clear_selection),
        (self.actionDebug, self.debug),
        (self.actionClosePlaylist, self.close_playlist_tab),
        (self.actionDeletePlaylist, self.delete_playlist),
        (self.actionDownload_CSV_of_played_tracks, self.download_played_tracks),
        (self.actionExport_playlist, self.export_playlist_tab),
        (self.actionFade, self.fade),
        (self.actionImport, self.import_track),
        (self.actionInsertSectionHeader, self.insert_header),
        (self.actionInsertTrack, self.insert_track),
        (self.actionMark_for_moving, self.cut_rows),
        (self.actionMoveSelected, self.move_selected),
        (self.actionNew_from_template, self.new_from_template),
        (self.actionNewPlaylist, self.create_and_show_playlist),
        (self.actionOpenPlaylist, self.open_playlist),
        (self.actionPaste, self.paste_rows),
        (self.actionPlay_next, self.play_next),
        (self.actionRenamePlaylist, self.rename_playlist),
        (self.actionReplace_files, self.import_files),
        (self.actionResume, self.resume),
        (self.actionSave_as_template, self.save_as_template),
        (self.actionSearch_title_in_Songfacts, self.lookup_row_in_songfacts),
        (self.actionSearch_title_in_Wikipedia, self.lookup_row_in_wikipedia),
        (self.actionSearch, self.search_playlist),
        (
            self.actionSelect_duplicate_rows,
            lambda: self.active_tab().select_duplicate_rows(),
        ),
        (self.actionMoveUnplayed, self.move_unplayed),
        (self.actionSetNext, self.set_selected_track_next),
        (self.actionSkipToNext, self.play_next),
        (self.actionStop, self.stop),
    )
    for action, slot in action_slots:
        action.triggered.connect(slot)

    # Buttons and clickable headers
    clicked_slots = (
        (self.btnDrop3db, self.drop3db),
        (self.btnFade, self.fade),
        (self.btnHidePlayed, self.hide_played),
        (self.btnPreviewArm, self.preview_arm),
        (self.btnPreviewBack, self.preview_back),
        (self.btnPreview, self.preview),
        (self.btnPreviewEnd, self.preview_end),
        (self.btnPreviewFwd, self.preview_fwd),
        (self.btnPreviewMark, self.preview_mark),
        (self.btnPreviewStart, self.preview_start),
        (self.btnStop, self.stop),
        (self.hdrCurrentTrack, self.show_current),
        (self.hdrNextTrack, self.show_next),
    )
    for widget, slot in clicked_slots:
        widget.clicked.connect(slot)

    # Tabs and search box
    self.tabPlaylist.currentChanged.connect(self.tab_change)
    self.tabPlaylist.tabCloseRequested.connect(self.close_tab)
    self.tabBar = self.tabPlaylist.tabBar()
    self.txtSearch.textChanged.connect(self.search_playlist_text_changed)

    # Application-wide signals
    self.signals.enable_escape_signal.connect(self.enable_escape)
    self.signals.next_track_changed_signal.connect(self.update_headers)
    self.signals.status_message_signal.connect(self.show_status_message)
    self.signals.show_warning_signal.connect(self.show_warning)
    self.signals.track_ended_signal.connect(self.end_of_track_actions)

    # Timers
    timer_slots = (
        (self.timer10, self.tick_10ms),
        (self.timer500, self.tick_500ms),
        (self.timer100, self.tick_100ms),
        (self.timer1000, self.tick_1000ms),
    )
    for timer, slot in timer_slots:
        timer.timeout.connect(slot)

    # Web lookups
    self.signals.search_songfacts_signal.connect(self.open_songfacts_browser)
    self.signals.search_wikipedia_signal.connect(self.open_wikipedia_browser)
|
|
|
|
def create_playlist(
    self, session: Session, playlist_name: Optional[str] = None
) -> Optional[Playlists]:
    """
    Create new playlist and mark it open.

    If playlist_name is not supplied, the user is asked for one. Return
    the new playlist, or None if no name was given or the playlist could
    not be created. The caller is responsible for committing the session.
    """

    log.debug(f"create_playlist({playlist_name=})")

    # Only prompt when the caller didn't tell us the name
    if not playlist_name:
        playlist_name = self.solicit_playlist_name(session)
    if not playlist_name:
        return None

    playlist = Playlists(session, playlist_name)
    if not playlist:
        log.error("Failed to create playlist")
        return None

    playlist.mark_open()
    return playlist
|
|
|
|
def create_and_show_playlist(self) -> None:
    """Create a new playlist and open a tab for it"""

    with db.Session() as session:
        new_playlist = self.create_playlist(session)
        if not new_playlist:
            return

        self.create_playlist_tab(new_playlist)
        session.commit()
|
|
|
|
def create_playlist_tab(self, playlist: Playlists) -> int:
    """
    Create a PlaylistTab for the passed playlist database object and add
    it to the tab widget, labelled with the playlist name.

    Return the index of the new tab.
    """

    log.debug(f"create_playlist_tab({playlist=})")

    idx = self.tabPlaylist.addTab(
        PlaylistTab(musicmuster=self, playlist_id=playlist.id),
        playlist.name,
    )

    log.debug(f"create_playlist_tab() returned: {idx=}")
    return idx
|
|
|
|
def cut_rows(self) -> None:
    """
    Remember the selected rows, and the model they belong to, so that
    paste_rows() can move them later.
    """

    active_tab = self.active_tab()
    self.move_source_rows = active_tab.get_selected_rows()
    self.move_source_model = self.active_proxy_model()

    log.debug(f"cut_rows(): {self.move_source_rows=} {self.move_source_model=}")
|
|
|
|
def debug(self):
    """Print the active playlist id, then drop into the ipdb debugger"""

    # Imported here so that ipdb is only needed when debugging
    import ipdb  # type: ignore

    visible_playlist_id = self.active_tab().playlist_id
    print(f"Active playlist id={visible_playlist_id}")

    ipdb.set_trace()
|
|
|
|
def delete_playlist(self) -> None:
    """
    Delete the playlist shown in the active tab, after asking the user
    to confirm. Nothing is deleted if the tab can't be closed.
    """

    with db.Session() as session:
        playlist = session.get(Playlists, self.active_tab().playlist_id)
        if not playlist:
            log.error("Failed to retrieve playlist")
            return

        confirmed = helpers.ask_yes_no(
            "Delete playlist",
            f"Delete playlist '{playlist.name}': Are you sure?",
        )
        # The tab must close successfully before the playlist is removed
        if confirmed and self.close_playlist_tab():
            playlist.delete(session)
            session.commit()
|
|
|
|
def download_played_tracks(self) -> None:
    """
    Download a CSV of played tracks.

    Asks for a start date/time and an output file, then writes one
    "artist,title" line for each track played after that time. Does
    nothing if either dialog is cancelled.
    """

    import csv

    dlg = DownloadCSV(self)
    if not dlg.exec():
        return

    start_dt = dlg.ui.dateTimeEdit.dateTime().toPyDateTime()

    # Get output filename. getSaveFileName() returns a (path, filter)
    # tuple; path is empty if the user cancelled.
    path, _ = QFileDialog.getSaveFileName(
        self,
        "Save CSV of tracks played",
        directory="/tmp/playlist.csv",
        filter="CSV files (*.csv)",
    )
    if not path:
        return

    if not path.endswith(".csv"):
        path += ".csv"

    # newline="" as required by the csv module; it then quotes any
    # artist or title that contains a comma or quote character
    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f, lineterminator="\n")
        with db.Session() as session:
            for playdate in Playdates.played_after(session, start_dt):
                writer.writerow((playdate.track.artist, playdate.track.title))
|
|
|
|
def drop3db(self) -> None:
    """Pass the state of the 'drop 3dB' button to the current track"""

    current_track = track_sequence.current
    if not current_track:
        return

    current_track.drop3db(self.btnDrop3db.isChecked())
|
|
|
|
def enable_escape(self, enabled: bool) -> None:
    """
    Slot for the signal that turns handling of the ESC key on or off.

    ESC is also used while editing a playlist in place, so the
    'clear selection' action has to be disabled for the duration of
    the edit.
    """

    log.debug(f"enable_escape({enabled=})")

    clear_selection_action = self.action_Clear_selection
    clear_selection_action.setEnabled(enabled)
|
|
|
|
def end_of_track_actions(self) -> None:
    """
    Tidy up after a track has ended.

    In order:
        - current track becomes previous track
        - the active model is told the previous track has ended
        - fade and silence clocks are reset
        - headers are updated
        - play controls are re-enabled
        - the next track is played unless autoplay has been stopped
    """

    # Current track becomes previous track
    track_sequence.previous = track_sequence.current
    track_sequence.current = None

    self.active_proxy_model().previous_track_ended()

    # Clocks: remove any highlighting, then zero the displays
    for frame in (self.frame_fade, self.frame_silent):
        frame.setStyleSheet("")
    for timer_label in (self.label_fade_timer, self.label_silent_timer):
        timer_label.setText("00:00")

    self.update_headers()

    # Controls
    self.catch_return_key = False
    self.show_status_message("Play controls: Enabled", 0)

    # Autoplay
    if self.stop_autoplay:
        return
    self.play_next()
|
|
|
|
def export_playlist_tab(self) -> None:
    """
    Export the current playlist to an m3u file.

    Does nothing if there is no active tab, the playlist can't be found
    or the user cancels the file dialog.
    """

    if not self.active_tab():
        return

    playlist_id = self.active_tab().playlist_id

    with db.Session() as session:
        playlist = session.get(Playlists, playlist_id)
        if not playlist:
            return

        # Get output filename. getSaveFileName() returns a (path, filter)
        # tuple; path is empty if the user cancelled.
        path, _ = QFileDialog.getSaveFileName(
            self,
            "Save Playlist",
            directory=f"{playlist.name}.m3u",
            filter="M3U files (*.m3u);;All files (*.*)",
        )
        if not path:
            return
        if not path.endswith(".m3u"):
            path += ".m3u"

        # Get list of track rows for this playlist
        plrs = PlaylistRows.get_rows_with_tracks(session, playlist_id)
        with open(path, "w") as f:
            # Required directive on first line
            f.write("#EXTM3U\n")
            for track in (plr.track for plr in plrs):
                # Treat an unknown duration as zero without altering
                # the database object. Duration is divided by 1000 for
                # the EXTINF line.
                duration = int((track.duration or 0) / 1000)
                f.write(
                    f"#EXTINF:{duration},{track.title} - {track.artist}\n"
                    f"{track.path}\n"
                )
|
|
|
|
def fade(self) -> None:
    """Fade the current track, if there is one"""

    current_track = track_sequence.current
    if not current_track:
        return

    current_track.fade()
|
|
|
|
def hide_played(self):
    """
    Toggle whether played tracks are hidden, and relabel the button to
    show what the next click will do.
    """

    self.hide_played_tracks = not self.hide_played_tracks
    self.active_proxy_model().hide_played_tracks(self.hide_played_tracks)
    self.btnHidePlayed.setText(
        "Show played" if self.hide_played_tracks else "Hide played"
    )

    # Reset row heights
    self.active_tab().resize_rows()
|
|
|
|
def import_track(self) -> None:
    """
    Ask the user for one or more music files, check each is OK to import
    and hand the accepted ones to import_filenames().
    """

    file_dialog = QFileDialog()
    file_dialog.setFileMode(QFileDialog.FileMode.ExistingFiles)
    file_dialog.setViewMode(QFileDialog.ViewMode.Detail)
    file_dialog.setDirectory(Config.IMPORT_DESTINATION)
    file_dialog.setNameFilter("Music files (*.flac *.mp3)")

    if not file_dialog.exec():
        return

    track_files: list[TrackFileData] = []
    with db.Session() as session:
        for fpath in file_dialog.selectedFiles():
            tf = TrackFileData(fpath)
            tf.tags = helpers.get_tags(fpath)
            if not self.ok_to_import(session, fpath, tf.tags):
                continue

            # Imported file keeps its name, under the import destination
            tf.track_path = os.path.join(
                Config.IMPORT_DESTINATION, os.path.basename(fpath)
            )
            tf.audio_metadata = helpers.get_audio_metadata(fpath)
            track_files.append(tf)

    self.import_filenames(track_files)
|
|
|
|
def import_filenames(self, track_files: list[TrackFileData]) -> None:
    """
    Import the passed track files as new tracks, using an ImportTrack
    worker running in its own QThread.
    """

    thread = QThread()
    self.import_thread = thread

    # New rows go into the active playlist at the selected row
    worker = ImportTrack(
        track_files,
        self.active_proxy_model(),
        self.active_tab().source_model_selected_row_number(),
    )
    self.worker = worker
    worker.moveToThread(thread)

    # Run the worker when the thread starts; when the worker finishes,
    # stop the thread and schedule both objects for deletion
    thread.started.connect(worker.run)
    worker.import_finished.connect(thread.quit)
    worker.import_finished.connect(worker.deleteLater)
    thread.finished.connect(thread.deleteLater)

    thread.start()
|
|
|
|
def ok_to_import(self, session: Session, fname: str, tags: dict[str, str]) -> bool:
    """
    Check file has title and artist tags, and check it's not a duplicate.

    If tracks with similar titles are already in the database, list up to
    Config.MAX_IMPORT_MATCHES of them and ask the user whether to proceed.

    Return True if this filename is OK to import, False if not.
    """

    title = tags["title"]
    if not title:
        helpers.show_warning(
            self,
            "Problem with track file",
            f"{fname} does not have a title tag",
        )
        return False

    artist = tags["artist"]
    if not artist:
        helpers.show_warning(
            self,
            "Problem with track file",
            f"{fname} does not have an artist tag",
        )
        return False

    possible_matches = Tracks.search_titles(session, title)
    if not possible_matches:
        return True

    parts = [f'Similar to new track "{title}" by "{artist}" ({fname}):\n\n']
    for shown, track in enumerate(possible_matches):
        if shown >= Config.MAX_IMPORT_MATCHES:
            # Only say there are more when there really is another one
            parts.append("\nThere are more similar-looking tracks")
            break
        parts.append(f' "{track.title}" by {track.artist} ({track.path})\n\n')
    parts.append("\nProceed with import?")

    # Offer both buttons; 'buttons' and 'defaultButton' are separate
    # arguments, so passing Ok alone as 'buttons' would leave the user
    # with no Cancel button.
    result = QMessageBox.question(
        self,
        "Possible duplicates",
        "".join(parts),
        QMessageBox.StandardButton.Ok | QMessageBox.StandardButton.Cancel,
        QMessageBox.StandardButton.Cancel,
    )

    return result == QMessageBox.StandardButton.Ok
|
|
|
|
def initialise_audacity(self) -> None:
    """
    Create the pipe client used to talk to Audacity. A RuntimeError from
    the client is logged and otherwise ignored.
    """

    try:
        self.audacity_client = pipeclient.PipeClient()
    except RuntimeError as e:
        log.error(f"Unable to initialise Audacity: {e}")
        return

    log.debug(f"{hex(id(self.audacity_client))=}")
|
|
|
|
def insert_header(self) -> None:
    """
    Ask for header text and, unless the dialog is cancelled, insert it
    as a note row at the selected row of the active playlist.
    """

    proxy_model = self.active_proxy_model()
    if proxy_model is None:
        log.error("No proxy model")
        return

    # Get header text
    dlg: QInputDialog = QInputDialog(self)
    dlg.setInputMode(QInputDialog.InputMode.TextInput)
    dlg.setLabelText("Header text:")
    dlg.resize(500, 100)
    if not dlg.exec():
        return

    proxy_model.insert_row(
        proposed_row_number=self.active_tab().source_model_selected_row_number(),
        note=dlg.textValue(),
    )
|
|
|
|
def insert_track(self) -> None:
    """
    Show dialog box to select and add track from database.

    The track is offered for insertion at the selected row or, if no row
    is selected, after the last row.
    """

    # Test against None explicitly: row 0 is a valid selection but is
    # falsy, so 'selected or rowCount()' would send it to the end.
    new_row_number = self.active_tab().source_model_selected_row_number()
    if new_row_number is None:
        new_row_number = self.active_proxy_model().rowCount()

    with db.Session() as session:
        dlg = TrackSelectDialog(
            parent=self,
            session=session,
            new_row_number=new_row_number,
            source_model=self.active_proxy_model(),
        )
        dlg.exec()
        session.commit()
|
|
|
|
def load_last_playlists(self) -> None:
    """
    Reopen each playlist that is recorded as open, then reselect the tab
    that was active when the last session closed.
    """

    with db.Session() as session:
        playlist_ids = []
        for playlist in Playlists.get_open(session):
            if not playlist:
                continue
            self.create_playlist_tab(playlist)
            playlist_ids.append(playlist.id)
            log.debug(f"load_last_playlists() loaded {playlist=}")

        # Set active tab
        active_tab_setting = Settings.get_setting(session, "active_tab")
        saved_index = active_tab_setting.f_int
        if saved_index is not None and saved_index >= 0:
            self.tabPlaylist.setCurrentIndex(saved_index)

        # Tabs may move during use. Rather than track where tabs
        # are, we record the tab index when we close the main
        # window. To avoid possible duplicate tab entries, we null
        # them all out now.
        Playlists.clear_tabs(session, playlist_ids)
        session.commit()
|
|
|
|
def lookup_row_in_songfacts(self) -> None:
    """
    Request a Songfacts search for the title in the selected row. Does
    nothing if no row is selected or the row has no information.
    """

    selected_row = self.active_tab().source_model_selected_row_number()
    if selected_row is None:
        return

    row_info = self.active_proxy_model().get_row_info(selected_row)
    if row_info is not None:
        self.signals.search_songfacts_signal.emit(row_info.title)
|
|
|
|
def lookup_row_in_wikipedia(self) -> None:
    """
    Request a Wikipedia search for the title in the selected row. Does
    nothing if no row is selected or the row has no information.
    """

    selected_row = self.active_tab().source_model_selected_row_number()
    if selected_row is None:
        return

    row_info = self.active_proxy_model().get_row_info(selected_row)
    if row_info is not None:
        self.signals.search_wikipedia_signal.emit(row_info.title)
|
|
|
|
def move_playlist_rows(self, row_numbers: List[int]) -> None:
    """
    Ask the user for a destination playlist and move the passed rows of
    the active playlist to the end of it.
    """

    source_playlist_id = self.active_tab().playlist_id

    with db.Session() as session:
        # Any playlist other than the one we're moving from will do
        playlists = [
            candidate
            for candidate in Playlists.get_all(session)
            if candidate.id != source_playlist_id
        ]

        dlg = SelectPlaylistDialog(self, playlists=playlists, session=session)
        dlg.exec()
        if not dlg.playlist:
            return
        to_playlist_id = dlg.playlist.id

        # Rows go after the last used row of the destination
        last_row = PlaylistRows.get_last_used_row(session, to_playlist_id)
        to_row = 0 if last_row is None else last_row + 1

        # Move rows
        self.active_proxy_model().move_rows_between_playlists(
            row_numbers, to_row, to_playlist_id
        )
|
|
|
|
def move_selected(self) -> None:
    """Move the selected rows, if any, to another playlist"""

    rows_to_move = self.active_tab().get_selected_rows()
    if rows_to_move:
        self.move_playlist_rows(rows_to_move)
|
|
|
|
def move_unplayed(self) -> None:
    """
    Move unplayed rows to another playlist
    """

    unplayed_rows = self.active_proxy_model().get_unplayed_rows()
    if not unplayed_rows:
        return

    # We can get a race condition as selected rows change while
    # moving so disable selected rows timing for move. Restore it even
    # if the move fails, otherwise it would stay disabled.
    self.disable_selection_timing = True
    try:
        self.move_playlist_rows(unplayed_rows)
    finally:
        self.disable_selection_timing = False
|
|
|
|
def new_from_template(self) -> None:
    """
    Create new playlist from template.

    Asks the user to choose a template and to name the new playlist,
    then creates the playlist and opens it in a new tab. Does nothing
    if no template is chosen or no name is given.
    """

    with db.Session() as session:
        templates = Playlists.get_all_templates(session)
        dlg = SelectPlaylistDialog(self, playlists=templates, session=session)
        dlg.exec()
        template = dlg.playlist
        if not template:
            return

        playlist_name = self.solicit_playlist_name(session)
        if not playlist_name:
            log.error("No name supplied for new playlist")
            return

        playlist = Playlists.create_playlist_from_template(
            session, template, playlist_name
        )

        # Mark the playlist open before committing so that the change
        # is saved along with the playlist itself.
        if playlist:
            playlist.mark_open()

        # Need to ensure that the new playlist is committed to
        # the database before it is opened by the model.
        session.commit()

        if playlist:
            self.create_playlist_tab(playlist)
        else:
            log.error("Playlist failed to create")
|
|
|
|
def open_playlist(self) -> None:
    """
    Let the user pick one of the closed playlists, open it in a new tab
    and make that tab current.
    """

    with db.Session() as session:
        closed_playlists = Playlists.get_closed(session)
        dlg = SelectPlaylistDialog(
            self, playlists=closed_playlists, session=session
        )
        dlg.exec()

        chosen = dlg.playlist
        if not chosen:
            return

        new_tab_index = self.create_playlist_tab(chosen)
        chosen.mark_open()
        session.commit()

        self.tabPlaylist.setCurrentIndex(new_tab_index)
|
|
|
|
def open_songfacts_browser(self, title: str) -> None:
    """
    Search Songfacts for title.

    The title is slugified (apostrophes are removed first) and appended
    to the Songfacts song-search URL. The URL opens in the internal
    info tab when Config.USE_INTERNAL_BROWSER is set, otherwise in a
    new tab of the browser returned by webbrowser.get('browser').
    """

    search_term = slugify(title, replacements=([["'", ""]]))
    log.info(f"Songfacts browser tab for {title=}")
    url = f"https://www.songfacts.com/search/songs/{search_term}"

    if not Config.USE_INTERNAL_BROWSER:
        webbrowser.get('browser').open_new_tab(url)
        return

    self.tabInfolist.open_tab(url, title)
|
|
|
|
def open_wikipedia_browser(self, title: str) -> None:
    """
    Search Wikipedia for title.

    The title is URL-encoded with urllib.parse.quote_plus and used as
    the search term of a Wikipedia search URL. The URL opens in the
    internal info tab when Config.USE_INTERNAL_BROWSER is set,
    otherwise in a new tab of the browser returned by
    webbrowser.get('browser').
    """

    # Named 'query' rather than 'str' so the builtin is not shadowed
    query = urllib.parse.quote_plus(title)
    log.info(f"Wikipedia browser tab for {title=}")
    url = f"https://www.wikipedia.org/w/index.php?search={query}"

    if Config.USE_INTERNAL_BROWSER:
        self.tabInfolist.open_tab(url, title)
    else:
        webbrowser.get('browser').open_new_tab(url)
|
|
|
|
def paste_rows(self) -> None:
    """
    Paste earlier cut rows.

    Rows are inserted at the first selected row of the active tab, or
    at the end of the active proxy model when nothing is selected. The
    move is done within one playlist or between playlists depending on
    whether the source and destination playlist ids match. Nothing
    happens when there are no cut rows or no source model recorded.
    """

    if self.move_source_rows is None or self.move_source_model is None:
        return

    target_model: PlaylistModel = self.active_tab().source_model
    selection = self.active_tab().get_selected_rows()
    destination_row = (
        selection[0] if selection else self.active_proxy_model().rowCount()
    )

    same_playlist = (
        target_model.playlist_id == self.move_source_model.source_model.playlist_id
    )
    if same_playlist:
        self.move_source_model.move_rows(self.move_source_rows, destination_row)
    else:
        self.move_source_model.move_rows_between_playlists(
            self.move_source_rows, destination_row, target_model.playlist_id
        )

    # Tidy the destination view after the move
    self.active_tab().resize_rows()
    self.active_tab().clear_selection()
|
|
|
|
def play_next(self, position: Optional[float] = None) -> None:
    """
    Play next track, optionally from passed position.

    Actions required:
        - If there is no next track set, return.
        - Check for inadvertent press of return
        - If there's currently a track playing, fade it.
        - Move next track to current track.
        - Clear next track
        - Restore volume if -3dB active
        - Play (new) current track.
        - Show fade graph
        - Notify model
        - Note that track is now playing
        - Disable play next controls
        - Update headers
        - Set the previous-track header tooltip to the last played tracks
    """

    log.debug(f"issue223: play_next({position=})")

    # If there is no next track set, return.
    if track_sequence.next is None:
        log.error("musicmuster.play_next(): no next track selected")
        return

    # Check for inadvertent press of 'return'
    if self.return_pressed_in_error():
        return

    # Issue #223 concerns a very short pause (maybe 0.1s) sometimes
    # when starting to play a track.

    # Resolution appears to be to disable timer10 for a short time.
    # Length of time and re-enabling of timer10 both in update_clocks.

    self.timer10.stop()
    log.debug("issue223: play_next: 10ms timer disabled")

    # If there's currently a track playing, fade it.
    if track_sequence.current:
        track_sequence.current.fade()

    # Move next track to current track.
    # end_of_track_actions() will have saved current track to
    # previous_track
    track_sequence.current = track_sequence.next

    # Clear next track
    self.clear_next()

    # Restore volume if -3dB active
    if self.btnDrop3db.isChecked():
        log.debug("issue223: play_next: Reset -3db button")
        self.btnDrop3db.setChecked(False)

    # Play (new) current track
    log.info(f"Play: {track_sequence.current.title}")
    track_sequence.current.play(position)

    # Update clocks now, don't wait for next tick
    log.debug("issue223: play_next: update_clocks()")
    self.update_clocks()

    # Show closing volume graph
    if track_sequence.current.fade_graph:
        log.debug(f"issue223: play_next: set up fade_graph, {track_sequence.current.title=}")
        track_sequence.current.fade_graph.GraphWidget = self.widgetFadeVolume
        track_sequence.current.fade_graph.clear()
        track_sequence.current.fade_graph.plot()
    else:
        log.debug("issue223: play_next: No fade_graph")

    # Disable play next controls
    self.catch_return_key = True
    self.show_status_message("Play controls: Disabled", 0)

    # Notify model
    log.debug("issue223: play_next: notify model")
    self.active_proxy_model().current_track_started()

    # Update headers
    log.debug("issue223: play_next: update headers")
    self.update_headers()

    # Build the "recently played" tooltip for the previous-track header
    with db.Session() as session:
        tracklist = []
        for lp in Playdates.last_played_tracks(session):
            track = session.get(Tracks, lp.track_id)
            if track is None:
                # session.get() yields None for a missing row; skip the
                # entry rather than fail after the track has started.
                continue
            tracklist.append(f"{track.title} ({track.artist})")
        tt = "<br>".join(tracklist)

    self.hdrPreviousTrack.setToolTip(tt)
|
|
|
|
def preview(self) -> None:
    """
    Preview selected or next track. We use a different mechanism to
    normal track playing so that the user can route the output audio
    differently (eg, to headphones).

    When the preview button is checked, the first selected track is
    previewed; if there is no selection, the next track is previewed
    provided it has a path. If no track can be determined nothing is
    played. When the button is unchecked the preview is stopped.
    """

    if not self.btnPreview.isChecked():
        self.preview_manager.stop()
        return

    # Get track info for first selected track if there is one
    track_info = self.active_tab().get_selected_row_track_info()
    if not track_info:
        # Otherwise fall back to the next track to play
        next_track = track_sequence.next
        if next_track and next_track.path:
            track_info = TrackInfo(next_track.track_id, next_track.row_number)

    if not track_info:
        # Nothing selected and no usable next track: never hand an
        # empty track_info to the preview manager.
        return

    self.preview_manager.set_track_info(track_info)
    self.preview_manager.play()
|
|
|
|
def preview_arm(self):
    """
    Manage the arm button for setting intro length.

    The mark button is enabled exactly when the arm button is checked.
    """

    armed = self.btnPreviewArm.isChecked()
    self.btnPreviewMark.setEnabled(armed)
|
|
|
|
def preview_back(self) -> None:
    """Wind the preview back by asking the preview manager for back(5000)"""

    manager = self.preview_manager
    manager.back(5000)
|
|
|
|
def preview_end(self) -> None:
    """
    Advance preview file to Config.PREVIEW_END_BUFFER_MS before end of intro.

    Does nothing when there is no preview manager.
    """

    if not self.preview_manager:
        return

    self.preview_manager.move_to_intro_end()
|
|
|
|
def preview_fwd(self) -> None:
    """Advance the preview by asking the preview manager for forward(5000)"""

    manager = self.preview_manager
    manager.forward(5000)
|
|
|
|
def preview_mark(self) -> None:
    """
    Set intro time from the current preview position.

    Only acts while the preview is playing and the previewed track can
    be fetched from the database. The intro is stored on the track,
    committed, passed to the preview manager, and the affected row of
    the active tab's source model is refreshed and invalidated.
    """

    if not self.preview_manager.is_playing():
        return

    track_id = self.preview_manager.track_id
    row_number = self.preview_manager.row_number

    with db.Session() as session:
        track = session.get(Tracks, track_id)
        if not track:
            return

        # Save intro as milliseconds rounded to the nearest 0.1
        # second because the editor spinbox only resolves to 0.1
        # seconds
        intro = round(self.preview_manager.get_playtime() / 100) * 100
        track.intro = intro
        session.commit()
        self.preview_manager.set_intro(intro)

        # NOTE(review): nesting reconstructed from a flattened source;
        # both model calls are kept inside the session block.
        self.active_tab().source_model.refresh_row(session, row_number)
        self.active_tab().source_model.invalidate_row(row_number)
|
|
|
|
def preview_start(self) -> None:
    """Restart the preview via the preview manager"""

    manager = self.preview_manager
    manager.restart()
|
|
|
|
def quicklog(self) -> None:
    """
    Create log entry.

    A debug line is logged immediately (before the user types, so the
    log timestamp marks when the dialog was opened); the text entered
    is logged as a second debug line if the dialog is accepted.
    """

    log.debug("quicklog timestamp; entry follows")

    # Ask the user for the log text
    prompt: QInputDialog = QInputDialog(self)
    prompt.setInputMode(QInputDialog.InputMode.TextInput)
    prompt.setLabelText("Log text:")
    prompt.resize(500, 100)

    if prompt.exec():
        log.debug("quicklog: " + prompt.textValue())
|
|
|
|
def rename_playlist(self) -> None:
    """
    Rename current playlist.

    The user is asked for a new name (pre-filled with the current one).
    On acceptance the playlist is renamed, the current tab's text is
    updated and the change is committed. Nothing happens if the
    playlist cannot be fetched or no name is supplied.
    """

    with db.Session() as session:
        playlist = session.get(Playlists, self.active_tab().playlist_id)
        if not playlist:
            return

        new_name = self.solicit_playlist_name(session, playlist.name)
        if not new_name:
            return

        playlist.rename(session, new_name)
        self.tabBar.setTabText(self.tabBar.currentIndex(), new_name)
        session.commit()
|
|
|
|
def import_files(self) -> None:
    """
    Scan source directory and offer to replace existing files with "similar"
    files, or import the source file as a new track.

    For each entry returned by ReplaceFilesDialog:
        - with a track_id: the files on disk are replaced and the
          track's artist, title and path are updated and committed;
        - without a track_id: the user is asked (ok_to_import) whether
          to import it; accepted entries get their audio metadata read
          and are handed to import_filenames() at the end.

    If the dialog is cancelled the session is rolled back.

    Raises:
        Exception: if a track_id from the dialog cannot be retrieved.
    """

    import_files: list[TrackFileData] = []

    with db.Session() as session:
        dlg = ReplaceFilesDialog(
            session=session,
            main_window=self,
        )
        if not dlg.exec():
            session.rollback()
            session.close()
            return

        for rf in dlg.replacement_files:
            if not rf.track_id:
                # We're importing a new track
                do_import = self.ok_to_import(
                    session, os.path.basename(rf.new_file_path), rf.tags
                )
                if do_import:
                    rf.audio_metadata = helpers.get_audio_metadata(
                        rf.new_file_path
                    )
                    import_files.append(rf)
                continue

            # We're updating an existing track.
            # If the filename has changed, remove the existing file
            if rf.obsolete_path is not None:
                if os.path.exists(rf.obsolete_path):
                    os.unlink(rf.obsolete_path)
                else:
                    log.error(
                        f"replace_files: could not unlink {rf.obsolete_path=}"
                    )
                    continue
            if rf.track_path:
                if os.path.exists(rf.track_path):
                    os.unlink(rf.track_path)
                shutil.move(rf.new_file_path, rf.track_path)

            track = session.get(Tracks, rf.track_id)
            if not track:
                raise Exception(
                    f"replace_files: could not retrieve track {rf.track_id}"
                )

            track.artist = rf.tags["artist"]
            track.title = rf.tags["title"]
            if track.path != rf.track_path:
                track.path = rf.track_path
                try:
                    session.commit()
                except IntegrityError:
                    # https://jira.mariadb.org/browse/MDEV-29345 workaround:
                    # park the path on a dummy value, then set the real one.
                    session.rollback()
                    track.path = "DUMMY"
                    session.commit()
                    # The rollback discarded the pending tag changes
                    # too, so reapply them with the real path.
                    track.artist = rf.tags["artist"]
                    track.title = rf.tags["title"]
                    track.path = rf.track_path

            # Persist whatever is still pending for this track: tag-only
            # updates and the final path after the workaround. This is a
            # no-op when the commit above already succeeded.
            # NOTE(review): nesting of the original commit/else was lost
            # in the flattened source; a single unconditional commit
            # covers every reading of it.
            session.commit()

        self.import_filenames(import_files)
        session.close()
|
|
|
|
def return_pressed_in_error(self) -> bool:
    """
    Check whether Return key has been pressed in error.

    Only checked while a track is current and catch_return_key is set.
    A press within Config.RETURN_KEY_DEBOUNCE_MS of the track start is
    treated as an error outright; otherwise the user is asked to
    confirm, with "yes" as the default only once the track has been
    playing for more than Config.PLAY_NEXT_GUARD_MS.

    Return True if it has, False if not
    """

    current = track_sequence.current
    if not (current and self.catch_return_key):
        return False

    started = current.start_time

    # Suppress inadvertent double press
    debounce = dt.timedelta(milliseconds=Config.RETURN_KEY_DEBOUNCE_MS)
    if started and started + debounce > dt.datetime.now():
        return True

    # If return is pressed during first PLAY_NEXT_GUARD_MS then
    # default to NOT playing the next track, else default to
    # playing it.
    default_yes: bool = started is not None and (
        (dt.datetime.now() - started).total_seconds() * 1000
        > Config.PLAY_NEXT_GUARD_MS
    )
    msg = (
        "Hit return to play next track now"
        if default_yes
        else "Press tab to select Yes and hit return to play next track"
    )

    confirmed = helpers.ask_yes_no(
        "Play next track",
        msg,
        default_yes=default_yes,
        parent=self,
    )
    if not confirmed:
        return True

    return False
|
|
|
|
def resume(self) -> None:
    """
    Resume playing last track. We may be playing the next track
    or none; take care of both eventualities.

    Actions required:
        - Return if there is no previous track
        - Return (logging an error) if no saved position
        - Make the previous track the next track and play it from
          its resume marker via play_next()
        - Back-date the start time so clocks and graph are correct
    """

    previous = track_sequence.previous
    if not previous:
        return

    # Return if no saved position
    if not previous.resume_marker:
        log.error("No previous track position")
        return

    # We want to use play_next() to resume, so copy the previous
    # track to the next track:
    track_sequence.set_next(previous)

    # Now resume playing the now-next track
    self.play_next(track_sequence.next.resume_marker)

    # Adjust track info so that clocks and graph are correct.
    # We need to fake the start time to reflect where we resumed the
    # track
    current = track_sequence.current
    if not (
        current
        and current.start_time
        and current.duration
        and current.resume_marker
    ):
        return

    elapsed_ms = current.duration * current.resume_marker
    current.start_time -= dt.timedelta(milliseconds=elapsed_ms)
|
|
|
|
def save_as_template(self) -> None:
    """
    Save current playlist as template.

    The user is prompted for a template name until they supply one not
    already used by an existing template, or cancel (in which case
    nothing is saved).
    """

    with db.Session() as session:
        names_in_use = {t.name for t in Playlists.get_all_templates(session)}

        template_name = None
        while template_name is None:
            # Get name for new template
            prompt = QInputDialog(self)
            prompt.setInputMode(QInputDialog.InputMode.TextInput)
            prompt.setLabelText("Template name:")
            prompt.resize(500, 100)
            if not prompt.exec():
                return

            candidate = prompt.textValue()
            if candidate in names_in_use:
                helpers.show_warning(
                    self, "Duplicate template", "Template name already in use"
                )
            else:
                template_name = candidate

        Playlists.save_as_template(
            session, self.active_tab().playlist_id, template_name
        )
        session.commit()
        helpers.show_OK(self, "Template", "Template saved")
|
|
|
|
def search_playlist(self) -> None:
    """
    Show text box to search playlist.

    The search box is un-hidden, focused, and any text already in it is
    selected.
    """

    # Disable play controls so that 'return' in search box doesn't
    # play next track
    self.catch_return_key = True

    search_box = self.txtSearch
    search_box.setHidden(False)
    search_box.setFocus()
    # Select any text that may already be there
    search_box.selectAll()
|
|
|
|
def search_playlist_clear(self) -> None:
    """Tidy up and reset search bar: empty its text, then hide it"""

    search_box = self.txtSearch
    search_box.setText("")
    search_box.setHidden(True)
|
|
|
|
def search_playlist_text_changed(self) -> None:
    """
    Incremental search of playlist.

    Passes the search box's current text to the active proxy model.
    """

    proxy_model = self.active_proxy_model()
    proxy_model.set_incremental_search(self.txtSearch.text())
|
|
|
|
def select_next_row(self) -> None:
    """Select next or first row in playlist (delegates to the active tab)"""

    tab = self.active_tab()
    tab.select_next_row()
|
|
|
|
def select_previous_row(self) -> None:
    """Select previous or first row in playlist (delegates to the active tab)"""

    tab = self.active_tab()
    tab.select_previous_row()
|
|
|
|
def set_main_window_size(self) -> None:
    """
    Set size of window from database.

    Position and size come from the "mainwindow_*" settings. When
    Config.USE_INTERNAL_BROWSER is set the splitter sizes are restored
    from the "splitter_*" settings, otherwise the info tab is hidden.
    Any setting whose stored integer is empty or zero falls back to 100.
    """

    with db.Session() as session:

        def stored_int(name: str) -> int:
            # Stored integer for the named setting, defaulting to 100
            return Settings.get_setting(session, name).f_int or 100

        geometry = [
            stored_int(key)
            for key in (
                "mainwindow_x",
                "mainwindow_y",
                "mainwindow_width",
                "mainwindow_height",
            )
        ]
        self.setGeometry(*geometry)

        if not Config.USE_INTERNAL_BROWSER:
            self.tabInfolist.hide()
            return

        splitter_sizes = [stored_int("splitter_top"), stored_int("splitter_bottom")]
        self.splitter.setSizes(splitter_sizes)
|
|
|
|
def set_selected_track_next(self) -> None:
    """
    Set currently-selected row on visible playlist tab as next track.

    Logs an error instead when there is no active tab.
    """

    playlist_tab = self.active_tab()
    if not playlist_tab:
        log.error("No active tab")
        return

    playlist_tab.set_row_as_next_track()
|
|
|
|
def set_tab_colour(self, widget: PlaylistTab, colour: QColor) -> None:
    """
    Find the tab containing the widget and set the text colour.

    The tab is located with tabPlaylist.indexOf(widget).
    """

    tab_index = self.tabPlaylist.indexOf(widget)
    tab_bar = self.tabPlaylist.tabBar()
    tab_bar.setTabTextColor(tab_index, colour)
|
|
|
|
def show_current(self) -> None:
    """Scroll to show current track; does nothing if there isn't one"""

    current = track_sequence.current
    if not current:
        return

    self.show_track(current)
|
|
|
|
def show_warning(self, title: str, body: str) -> None:
    """
    Display a warning dialog.

    The call is also echoed to stdout before the dialog is shown.
    """

    trace = f"show_warning({title=}, {body=})"
    print(trace)
    QMessageBox.warning(self, title, body)
|
|
|
|
def show_next(self) -> None:
    """Scroll to show next track; does nothing if there isn't one"""

    upcoming = track_sequence.next
    if not upcoming:
        return

    self.show_track(upcoming)
|
|
|
|
def show_status_message(self, message: str, timing: int) -> None:
    """
    Show status message in status bar for timing milliseconds.

    Both arguments are passed straight to the status bar's
    showMessage().
    """

    status_bar = self.statusbar
    status_bar.showMessage(message, timing)
|
|
|
|
def show_track(self, playlist_track: RowAndTrack) -> None:
    """
    Scroll to show track in its playlist tab.

    If the track belongs to a playlist other than the active tab's, the
    first tab with a matching playlist_id is made current. The track's
    source row is then mapped through the active proxy model and the
    current tab is scrolled to it. Does nothing when the track has no
    playlist_id.
    """

    wanted_playlist_id = playlist_track.playlist_id
    if not wanted_playlist_id:
        # No playlist
        return

    # Switch to the correct tab
    if wanted_playlist_id != self.active_tab().playlist_id:
        for tab_index in range(self.tabPlaylist.count()):
            if self.tabPlaylist.widget(tab_index).playlist_id == wanted_playlist_id:
                self.tabPlaylist.setCurrentIndex(tab_index)
                break

    # Translate the source-model row to the row the proxy displays
    source_index = self.active_proxy_model().source_model.index(
        playlist_track.row_number, 0
    )
    display_row = self.active_proxy_model().mapFromSource(source_index).row()
    self.tabPlaylist.currentWidget().scroll_to_top(display_row)
|
|
|
|
def solicit_playlist_name(
    self, session: Session, default: str = ""
) -> Optional[str]:
    """
    Get name of new playlist from user.

    The prompt repeats (re-seeded with default, if given) until the
    user enters a name that Playlists.name_is_available() accepts, in
    which case that name is returned, or cancels, in which case None
    is returned.
    """

    prompt = QInputDialog(self)
    prompt.setInputMode(QInputDialog.InputMode.TextInput)
    prompt.setLabelText("Playlist name:")

    while True:
        if default:
            prompt.setTextValue(default)
        prompt.resize(500, 100)

        if not prompt.exec():
            return None

        proposed_name = prompt.textValue()
        if Playlists.name_is_available(session, proposed_name):
            return proposed_name

        helpers.show_warning(
            self,
            "Name in use",
            f"There's already a playlist called '{proposed_name}'",
        )
|
|
|
|
def stop(self) -> None:
    """
    Stop playing immediately.

    Sets stop_autoplay and stops the current track if there is one.
    """

    self.stop_autoplay = True

    playing = track_sequence.current
    if playing:
        playing.stop()
|
|
|
|
def tab_change(self):
    """Called when active tab changed: resize the rows of the now-active tab"""

    tab = self.active_tab()
    tab.resize_rows()
|
|
|
|
def tick_10ms(self) -> None:
    """
    Called every 10ms.

    Updates the current track's fade graph, if there is a current track.
    """

    playing = track_sequence.current
    if playing:
        playing.update_fade_graph()
|
|
|
|
def tick_100ms(self) -> None:
    """
    Called every 100ms.

    Checks the current track for end-of-track, maintains the intro
    countdown label and, when no intro is counting down, maintains the
    preview timer and resets the preview button once the preview has
    stopped playing.
    """

    # track_sequence.current is deliberately re-read on every access:
    # the end-of-track check may clear it, which is what the
    # AttributeError handler below caters for.
    if track_sequence.current:
        try:
            track_sequence.current.check_for_end_of_track()

            # Update intro counter if applicable and, if updated, return
            # because playing an intro takes precedence over timing a
            # preview.
            intro_ms_remaining = track_sequence.current.time_remaining_intro()
            if intro_ms_remaining > 0:
                self.label_intro_timer.setText(f"{intro_ms_remaining / 1000:.1f}")
                if intro_ms_remaining <= Config.INTRO_SECONDS_WARNING_MS:
                    self.label_intro_timer.setStyleSheet(
                        f"background: {Config.COLOUR_WARNING_TIMER}"
                    )
                return

            # Intro finished: clear the warning style once.
            # NOTE(review): nesting reconstructed from a flattened
            # source; the text reset is taken to belong with the style
            # reset - confirm.
            if self.label_intro_timer.styleSheet() != "":
                self.label_intro_timer.setStyleSheet("")
                self.label_intro_timer.setText("0.0")
        except AttributeError:
            # current track ended during servicing tick
            pass

    # Ensure preview button is reset if preview finishes playing
    # Update preview timer
    if not self.btnPreview.isChecked():
        return

    if self.preview_manager.is_playing():
        self.btnPreview.setChecked(True)
        playtime_seconds = self.preview_manager.get_playtime() / 1000
        minutes, seconds = divmod(playtime_seconds, 60)
        self.label_intro_timer.setText(f"{int(minutes)}:{seconds:04.1f}")
    else:
        self.btnPreview.setChecked(False)
        self.label_intro_timer.setText("0.0")
        self.label_intro_timer.setStyleSheet("")
        self.btnPreview.setChecked(False)
|
|
|
|
def tick_1000ms(self) -> None:
    """
    Called every 1000ms.

    The play clocks are only refreshed from here, once a second, so
    that their updates are synchronised (otherwise it looks odd).
    """

    self.update_clocks()
|
|
|
|
def tick_500ms(self) -> None:
    """
    Called every 500ms.

    Refreshes the time-of-day label, formatted with
    Config.TOD_TIME_FORMAT.
    """

    time_of_day = dt.datetime.now().strftime(Config.TOD_TIME_FORMAT)
    self.lblTOD.setText(time_of_day)
|
|
|
|
def update_clocks(self) -> None:
    """
    Update track clocks.

    Only acts while a current track is playing. Refreshes the elapsed,
    fade and silence timers, colours the fade / silence frames as the
    track approaches its fade and its end, and re-enables the play
    controls at those points. Also restarts the 10ms timer that
    play_next() stops (issue #223).
    """

    # Nothing to do unless a track is actually playing
    if not (track_sequence.current and track_sequence.current.is_playing()):
        return

    # see play_next() and issue #223.
    # TODO: find a better way of handling this
    if track_sequence.current.time_playing() > 5000 and not self.timer10.isActive():
        self.timer10.start(10)
        log.debug("issue223: update_clocks: 10ms timer enabled")

    # Elapsed time
    elapsed_text = helpers.ms_to_mmss(track_sequence.current.time_playing())
    duration_text = helpers.ms_to_mmss(track_sequence.current.duration)
    self.label_elapsed_timer.setText(elapsed_text + " / " + duration_text)

    # Time to fade
    time_to_fade = track_sequence.current.time_to_fade()
    time_to_silence = track_sequence.current.time_to_silence()
    self.label_fade_timer.setText(helpers.ms_to_mmss(time_to_fade))

    if time_to_silence <= Config.WARNING_MS_BEFORE_SILENCE:
        # Close to silence: put the ending colour on the time to
        # silence box and enable play controls.
        # NOTE(review): nesting reconstructed from a flattened source;
        # the control re-enable is taken to happen once, together with
        # the style change - confirm.
        css_silence = f"background: {Config.COLOUR_ENDING_TIMER}"
        if self.frame_silent.styleSheet() != css_silence:
            self.frame_silent.setStyleSheet(css_silence)
            self.catch_return_key = False
            self.show_status_message("Play controls: Enabled", 0)

    elif time_to_fade <= 500:
        # Set warning colour on time to silence box when fade starts
        css_fade = f"background: {Config.COLOUR_WARNING_TIMER}"
        if self.frame_silent.styleSheet() != css_fade:
            self.frame_silent.setStyleSheet(css_fade)

    elif time_to_fade <= Config.WARNING_MS_BEFORE_FADE:
        # Shortly before fade starts, set warning colour on the fade
        # box and enable play controls
        self.frame_fade.setStyleSheet(
            f"background: {Config.COLOUR_WARNING_TIMER}"
        )
        self.catch_return_key = False
        self.show_status_message("Play controls: Enabled", 0)

    else:
        self.frame_silent.setStyleSheet("")
        self.frame_fade.setStyleSheet("")

    self.label_silent_timer.setText(helpers.ms_to_mmss(time_to_silence))
|
|
|
|
def update_headers(self) -> None:
    """
    Update last / current / next track headers.

    Each header shows "title - artist" for its track, or is blanked
    when there is no such track. Ampersands are doubled in the current
    and next headers only.
    """

    previous = track_sequence.previous
    previous_text = f"{previous.title} - {previous.artist}" if previous else ""
    self.hdrPreviousTrack.setText(previous_text)

    current = track_sequence.current
    if current:
        current_text = (
            f"{current.title.replace('&', '&&')} - "
            f"{current.artist.replace('&', '&&')}"
        )
    else:
        current_text = ""
    self.hdrCurrentTrack.setText(current_text)

    upcoming = track_sequence.next
    if upcoming:
        next_text = (
            f"{upcoming.title.replace('&', '&&')} - "
            f"{upcoming.artist.replace('&', '&&')}"
        )
    else:
        next_text = ""
    self.hdrNextTrack.setText(next_text)
|
|
|
|
|
|
class DownloadCSV(QDialog):
    """
    Date/time selection dialog built from the Ui_DateSelect form.

    The editor is pre-set to today's date at 19:59:00.
    """

    def __init__(self, parent=None):
        """
        Build the form and wire the button box to accept / reject.

        NOTE(review): parent is accepted but not passed on to QDialog.
        """

        super().__init__()

        form = Ui_DateSelect()
        form.setupUi(self)

        # Default to today at 19:59:00
        form.dateTimeEdit.setDate(QDate.currentDate())
        form.dateTimeEdit.setTime(QTime(19, 59, 0))

        form.buttonBox.accepted.connect(self.accept)
        form.buttonBox.rejected.connect(self.reject)

        self.ui = form
|
|
|
|
|
|
class SelectPlaylistDialog(QDialog):
    """
    Dialog listing playlists for the user to choose one.

    After the dialog closes, the chosen playlist (or None) is available
    as the `playlist` attribute. The dialog size is restored from, and
    saved to, the "select_playlist_dialog_width" / "_height" settings
    using the session supplied by the caller.
    """

    def __init__(self, parent=None, playlists=None, session=None):
        """
        Build the dialog.

        parent: accepted but not passed on to QDialog.
        playlists: iterable of playlists to list; if None the dialog
            is left without a form.
        session: database session used to read and save the dialog
            size; if None the default size of 800x600 is used and
            nothing is saved.
        """

        super().__init__()

        # Set before any early return so that callers can always read
        # dlg.playlist and done() can always consult self.session.
        self.session = session
        self.playlist = None
        self.ui = None

        if playlists is None:
            return

        self.ui = Ui_dlgSelectPlaylist()
        self.ui.setupUi(self)
        self.ui.lstPlaylists.itemDoubleClicked.connect(self.list_doubleclick)
        self.ui.buttonBox.accepted.connect(self.open)
        self.ui.buttonBox.rejected.connect(self.close)

        width, height = 800, 600
        if self.session is not None:
            record = Settings.get_setting(
                self.session, "select_playlist_dialog_width"
            )
            width = record.f_int or 800
            record = Settings.get_setting(
                self.session, "select_playlist_dialog_height"
            )
            height = record.f_int or 600
        self.resize(width, height)

        for playlist in playlists:
            p = QListWidgetItem()
            p.setText(playlist.name)
            p.setData(Qt.ItemDataRole.UserRole, playlist)
            self.ui.lstPlaylists.addItem(p)

    def _save_size(self) -> None:
        """Store the current dialog size in the settings and commit"""

        if self.session is None or self.ui is None:
            return

        record = Settings.get_setting(self.session, "select_playlist_dialog_height")
        record.f_int = self.height()

        record = Settings.get_setting(self.session, "select_playlist_dialog_width")
        record.f_int = self.width()

        self.session.commit()

    def done(self, result):
        """
        Save the dialog size, then close with the given result.

        Saving here rather than in __del__ means it happens when the
        dialog closes, while the widget and the caller's session are
        still in use, instead of whenever the object is finalised.
        """

        try:
            self._save_size()
        finally:
            # Always close the dialog, even if saving the size failed
            super().done(result)

    def list_doubleclick(self, entry):  # review
        """Choose the double-clicked list entry and accept the dialog"""

        self.playlist = entry.data(Qt.ItemDataRole.UserRole)
        self.accept()

    def open(self):  # review
        """
        Choose the selected list entry and accept the dialog.

        Connected to the button box's accepted signal. Note that this
        name overrides QDialog.open().
        """

        # NOTE(review): nesting reconstructed from a flattened source;
        # the dialog is taken to be accepted only when an entry is
        # selected - confirm.
        selected_items = self.ui.lstPlaylists.selectedItems()
        if selected_items:
            self.playlist = selected_items[0].data(Qt.ItemDataRole.UserRole)
            self.accept()
|
|
|
|
|
|
if __name__ == "__main__":
    # If command line arguments given, carry out requested function and
    # exit. Otherwise run full application.

    p = argparse.ArgumentParser()
    # Only allow at most one option to be specified
    group = p.add_mutually_exclusive_group()
    group.add_argument(
        "-b",
        "--bitrates",
        action="store_true",
        dest="update_bitrates",
        default=False,
        help="Update bitrates in database",
    )
    group.add_argument(
        "-c",
        "--check-database",
        action="store_true",
        dest="check_db",
        default=False,
        help="Check and report on database",
    )
    args = p.parse_args()

    # Run as required
    if args.check_db:
        log.debug("Checking database")
        with db.Session() as session:
            check_db(session)
    elif args.update_bitrates:
        log.debug("Update bitrates")
        with db.Session() as session:
            update_bitrates(session)
    else:
        try:
            app = QApplication(sys.argv)
            # PyQt6 defaults to a grey for labels
            palette = app.palette()
            palette.setColor(
                QPalette.ColorRole.WindowText, QColor(Config.COLOUR_LABEL_TEXT)
            )
            # Set colours that will be used by playlist row stripes
            palette.setColor(
                QPalette.ColorRole.Base, QColor(Config.COLOUR_EVEN_PLAYLIST)
            )
            palette.setColor(
                QPalette.ColorRole.AlternateBase, QColor(Config.COLOUR_ODD_PLAYLIST)
            )
            app.setPalette(palette)
            win = Window()
            win.show()
            status = app.exec()
            sys.exit(status)
        except Exception as exc:
            # Use .get(): an unset MM_ENV must not raise KeyError here
            # and hide the exception we are trying to report.
            if os.environ.get("MM_ENV") == "PRODUCTION":
                from helpers import send_mail

                msg = stackprinter.format(exc)
                send_mail(
                    ",".join(Config.ERRORS_TO),
                    ",".join(Config.ERRORS_FROM),
                    "Exception from musicmuster.py",
                    msg,
                )
                log.debug(msg)
            else:
                print("\033[1;31;47mUnhandled exception starts")
                print(
                    stackprinter.format(
                        exc, suppressed_paths=["/pypoetry/virtualenvs/"], style="darkbg"
                    )
                )
                print("Unhandled exception ends\033[1;37;40m")
|