#!/usr/bin/env python3

# Standard library imports
from os.path import basename
from typing import List, Optional
import argparse
import csv
import datetime as dt
import os
import shutil
from slugify import slugify  # type: ignore
import subprocess
import sys
import urllib.parse
import webbrowser

# PyQt imports
from PyQt6.QtCore import (
    pyqtSignal,
    QDate,
    QObject,
    Qt,
    QThread,
    QTime,
    QTimer,
)
from PyQt6.QtGui import (
    QCloseEvent,
    QColor,
    QKeySequence,
    QPalette,
    QShortcut,
)
from PyQt6.QtWidgets import (
    QApplication,
    QDialog,
    QFileDialog,
    QInputDialog,
    QLabel,
    QLineEdit,
    QListWidgetItem,
    QMainWindow,
    QMessageBox,
    QWidget,
)

# Third party imports
import pipeclient
from pygame import mixer
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.session import Session
import stackprinter  # type: ignore

# App imports
from classes import (
    MusicMusterSignals,
    TrackFileData,
    TrackInfo,
)
from config import Config
from dialogs import TrackSelectDialog, ReplaceFilesDialog
from helpers import file_is_unreadable
from log import log
from models import db, Playdates, PlaylistRows, Playlists, Settings, Tracks
from playlistmodel import PlaylistModel, PlaylistProxyModel
from playlists import PlaylistTab
from trackmanager import (
    MainTrackManager,
    track_sequence,
)
from ui import icons_rc  # noqa F401
from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist  # type: ignore
from ui.downloadcsv_ui import Ui_DateSelect  # type: ignore
from ui.main_window_ui import Ui_MainWindow  # type: ignore
from utilities import check_db, update_bitrates
import helpers


class ImportTrack(QObject):
    """
    Worker object that imports a list of track files into the database
    and inserts a playlist row for each of them. Intended to be moved to
    a QThread; emits import_finished when run() is done.
    """

    import_finished = pyqtSignal()

    def __init__(
        self,
        track_files: List[TrackFileData],
        source_model: PlaylistModel,
        row_number: Optional[int],
    ) -> None:
        """
        track_files: files to import; each must already carry tags,
            audio_metadata and track_path.
        source_model: model that receives a new row per imported track.
        row_number: row at which to insert the first track; None means
            append after the model's last row.

        Raises Exception if any entry lacks tags, audio_metadata or
        track_path.
        """

        super().__init__()
        self.track_files = track_files
        self.source_model = source_model
        if row_number is None:
            self.next_row_number = source_model.rowCount()
        else:
            self.next_row_number = row_number
        self.signals = MusicMusterSignals()

        # Sanity check
        for tf in track_files:
            if not tf.tags:
                raise Exception(f"ImportTrack: no tags for {tf.new_file_path}")
            if not tf.audio_metadata:
                raise Exception(
                    f"ImportTrack: no audio_metadata for {tf.new_file_path}"
                )
            if tf.track_path is None:
                raise Exception(f"ImportTrack: no track_path for {tf.new_file_path}")

    def run(self):
        """
        Create track objects from passed files and add to visible playlist

        import_finished is always emitted, including when the import is
        abandoned after a database error, so that the hosting thread is
        told to quit.
        """

        imported = 0
        try:
            with db.Session() as session:
                for tf in self.track_files:
                    self.signals.status_message_signal.emit(
                        f"Importing {basename(tf.new_file_path)}", 5000
                    )

                    # Sanity check
                    if not os.path.exists(tf.new_file_path):
                        log.error(f"ImportTrack: file not found: {tf.new_file_path=}")
                        continue

                    # Move the track file. Check that we're not importing a
                    # file that's already in its final destination.
                    if (
                        os.path.exists(tf.track_path)
                        and tf.track_path != tf.new_file_path
                    ):
                        os.unlink(tf.track_path)
                    shutil.move(tf.new_file_path, tf.track_path)

                    # Import track
                    try:
                        track = Tracks(
                            session,
                            path=tf.track_path,
                            **(tf.audio_metadata | tf.tags),
                        )
                    except Exception as e:
                        self.signals.show_warning_signal.emit(
                            "Error importing track", str(e)
                        )
                        return
                    helpers.normalise_track(tf.track_path)

                    # We're importing potentially multiple tracks in a loop.
                    # If there's an error adding the track to the Tracks
                    # table, the session will rollback, thus losing any
                    # previous additions in this loop. So, commit now to
                    # lock in what we've just done.
                    session.commit()
                    self.source_model.insert_row(self.next_row_number, track.id, "")
                    self.next_row_number += 1
                    imported += 1

            # Report what was actually imported, not what was requested
            self.signals.status_message_signal.emit(
                f"{imported} tracks imported", 10000
            )
        finally:
            self.import_finished.emit()


class PreviewManager:
    """
    Manage track preview player
    """

    def __init__(self) -> None:
        mixer.init()
        self.intro: Optional[int] = None
        self.path: str = ""
        self.row_number: Optional[int] = None
        self.start_time: Optional[dt.datetime] = None
        self.track_id: int = 0

    def back(self, ms: int) -> None:
        """
        Move play position back by 'ms' milliseconds
        """

        position = max(0, (self.get_playtime() - ms)) / 1000
        mixer.music.set_pos(position)
        # Back-date the start time so get_playtime() matches the new position
        self.start_time = dt.datetime.now() - dt.timedelta(seconds=position)

    def forward(self, ms: int) -> None:
        """
        Move play position forward by 'ms' milliseconds
        """

        position = (self.get_playtime() + ms) / 1000
        mixer.music.set_pos(position)
        self.start_time = dt.datetime.now() - dt.timedelta(seconds=position)

    def get_playtime(self) -> int:
        """
        Return time since track started in milliseconds, 0 if not playing
        """

        if not mixer.music.get_busy():
            return 0

        if not self.start_time:
            return 0

        return int((dt.datetime.now() - self.start_time).total_seconds() * 1000)

    def is_playing(self) -> bool:
        """Return True if the mixer reports that music is playing"""

        return mixer.music.get_busy()

    def move_to_intro_end(self) -> None:
        """
        Move play position to 'buffer' milliseconds before end of intro.

        If no intro defined, do nothing.
        """

        if self.intro is None:
            return

        position = max(0, self.intro - Config.PREVIEW_END_BUFFER_MS) / 1000
        mixer.music.set_pos(position)
        self.start_time = dt.datetime.now() - dt.timedelta(seconds=position)

    def play(self) -> None:
        """Start playing the loaded track and note the start time"""

        mixer.music.play()
        self.start_time = dt.datetime.now()

    def restart(self) -> None:
        """
        Restart player from beginning
        """

        if not mixer.music.get_busy():
            return

        mixer.music.rewind()
        self.start_time = dt.datetime.now()

    def set_intro(self, ms: int) -> None:
        """
        Set intro time
        """

        self.intro = ms

    def set_track_info(self, info: TrackInfo) -> None:
        """
        Record the track id and row number from 'info', look the track up
        in the database and load its file into the mixer.

        Raises ValueError if the track cannot be retrieved or its file is
        unreadable.
        """

        self.track_id = info.track_id
        self.row_number = info.row_number

        with db.Session() as session:
            track = session.get(Tracks, self.track_id)
            if not track:
                raise ValueError(
                    f"PreviewManager: unable to retreive track {self.track_id=}"
                )
            self.intro = track.intro
            self.path = track.path

            # Check file readable
            if file_is_unreadable(self.path):
                raise ValueError(f"PreviewManager.__init__: {track.path=} unreadable")

        mixer.music.load(self.path)

    def stop(self) -> None:
        """Stop and unload the preview track and reset all state"""

        mixer.music.stop()
        mixer.music.unload()
        self.path = ""
        self.row_number = None
        self.track_id = 0
        self.start_time = None


class Window(QMainWindow, Ui_MainWindow):
    """Application main window"""

    def __init__(
        self, parent: Optional[QWidget] = None, *args: list, **kwargs: dict
    ) -> None:
        super().__init__(parent)
        self.setupUi(self)

        self.timer10: QTimer = QTimer()
        self.timer100: QTimer = QTimer()
        self.timer500: QTimer = QTimer()
        self.timer1000: QTimer = QTimer()

        self.set_main_window_size()
        self.lblSumPlaytime = QLabel("")
        self.statusbar.addPermanentWidget(self.lblSumPlaytime)
        self.txtSearch = QLineEdit()
        self.txtSearch.setHidden(True)
        self.statusbar.addWidget(self.txtSearch)
        self.hide_played_tracks = False
        self.preview_manager = PreviewManager()

        self.widgetFadeVolume.hideAxis("bottom")
        self.widgetFadeVolume.hideAxis("left")
        self.widgetFadeVolume.setDefaultPadding(0)
        self.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND)

        # Callables returning the currently visible tab / its model
        self.active_tab = lambda: self.tabPlaylist.currentWidget()
        self.active_proxy_model = lambda: self.tabPlaylist.currentWidget().model()
        self.move_source_rows: Optional[List[int]] = None
        self.move_source_model: Optional[PlaylistProxyModel] = None

        self.audacity_file_path: Optional[str] = None
        self.audacity_client: Optional[pipeclient.PipeClient] = None
        self.initialise_audacity()

        self.disable_selection_timing = False
        self.clock_counter = 0
        self.timer10.start(10)
        self.timer100.start(100)
        self.timer500.start(500)
        self.timer1000.start(1000)
        self.signals = MusicMusterSignals()
        self.connect_signals_slots()
        self.catch_return_key = False

        if not Config.USE_INTERNAL_BROWSER:
            webbrowser.register(
                "browser",
                None,
                webbrowser.BackgroundBrowser(Config.EXTERNAL_BROWSER_PATH),
            )

        # Set up shortcut key for instant logging from keyboard
        self.action_quicklog = QShortcut(QKeySequence("Ctrl+L"), self)
        self.action_quicklog.activated.connect(self.quicklog)

        self.load_last_playlists()

    def about(self) -> None:
        """Get git tag and database name"""

        try:
            git_tag = (
                subprocess.check_output(["git", "describe"], stderr=subprocess.STDOUT)
                .decode(errors="replace")
                .strip()
            )
        except subprocess.CalledProcessError as exc_info:
            # git ran but failed: show what it said
            git_tag = exc_info.output.decode(errors="replace").strip()
        except OSError as exc_info:
            # git could not be run at all
            git_tag = f"unknown ({exc_info})"

        dbname = "unknown"
        with db.Session() as session:
            if session.bind:
                dbname = session.bind.engine.url.database

        QMessageBox.information(
            self,
            "About",
            f"MusicMuster {git_tag}\n\nDatabase: {dbname}",
            QMessageBox.StandardButton.Ok,
        )

    def clear_next(self) -> None:
        """
        Clear next track
        """

        track_sequence.next = None
        self.update_headers()

    def clear_selection(self) -> None:
        """Clear row selection"""

        # Unselect any selected rows
        if self.active_tab():
            self.active_tab().clear_selection()

        # Clear the search bar
        self.search_playlist_clear()

    def closeEvent(self, event: Optional[QCloseEvent]) -> None:
        """Handle attempt to close main window"""

        if not event:
            return

        # Don't allow window to close when a track is playing
        if track_sequence.current and track_sequence.current.is_playing():
            event.ignore()
            helpers.show_warning(
                self, "Track playing", "Can't close application while track is playing"
            )
        else:
            with db.Session() as session:
                # Save tab number of open playlists
                open_playlist_ids: dict[int, int] = {}
                for idx in range(self.tabPlaylist.count()):
                    open_playlist_ids[self.tabPlaylist.widget(idx).playlist_id] = idx
                Playlists.clear_tabs(session, list(open_playlist_ids.keys()))
                for playlist_id, idx in open_playlist_ids.items():
                    playlist = session.get(Playlists, playlist_id)
                    if playlist:
                        log.debug(f"Set {playlist=} tab to {idx=}")
                        playlist.tab = idx

                # Save window attributes
                splitter_top, splitter_bottom = self.splitter.sizes()
                attributes_to_save = dict(
                    mainwindow_height=self.height(),
                    mainwindow_width=self.width(),
                    mainwindow_x=self.x(),
                    mainwindow_y=self.y(),
                    splitter_top=splitter_top,
                    splitter_bottom=splitter_bottom,
                    active_tab=self.tabPlaylist.currentIndex(),
                )
                for name, value in attributes_to_save.items():
                    record = Settings.get_setting(session, name)
                    record.f_int = value

                session.commit()

            event.accept()

    def close_playlist_tab(self) -> bool:
        """
        Close active playlist tab, called by menu item
        """

        return self.close_tab(self.tabPlaylist.currentIndex())

    def close_tab(self, tab_index: int) -> bool:
        """
        Close playlist tab unless it holds the current or next track.

        Called from close_playlist_tab() or by clicking close button on tab.

        Return True if tab closed else False.
        """

        closing_tab_playlist_id = self.tabPlaylist.widget(tab_index).playlist_id

        # Don't close current track playlist
        if track_sequence.current is not None:
            current_track_playlist_id = track_sequence.current.playlist_id
            if current_track_playlist_id:
                if closing_tab_playlist_id == current_track_playlist_id:
                    helpers.show_OK(
                        self, "Current track", "Can't close current track playlist"
                    )
                    return False

        # Don't close next track playlist
        if track_sequence.next is not None:
            next_track_playlist_id = track_sequence.next.playlist_id
            if next_track_playlist_id:
                if closing_tab_playlist_id == next_track_playlist_id:
                    helpers.show_OK(
                        self, "Next track", "Can't close next track playlist"
                    )
                    return False

        # Record playlist as closed and update remaining playlist tabs
        with db.Session() as session:
            playlist = session.get(Playlists, closing_tab_playlist_id)
            if playlist:
                playlist.close(session)

        # Close playlist and remove tab
        self.tabPlaylist.widget(tab_index).close()
        self.tabPlaylist.removeTab(tab_index)

        return True

    def connect_signals_slots(self) -> None:
        """Connect menu actions, buttons, timers and app signals to slots"""

        self.action_About.triggered.connect(self.about)
        self.action_Clear_selection.triggered.connect(self.clear_selection)
        self.actionDebug.triggered.connect(self.debug)
        self.actionClosePlaylist.triggered.connect(self.close_playlist_tab)
        self.actionDeletePlaylist.triggered.connect(self.delete_playlist)
        self.actionDownload_CSV_of_played_tracks.triggered.connect(
            self.download_played_tracks
        )
        self.actionExport_playlist.triggered.connect(self.export_playlist_tab)
        self.actionFade.triggered.connect(self.fade)
        self.actionImport.triggered.connect(self.import_track)
        self.actionInsertSectionHeader.triggered.connect(self.insert_header)
        self.actionInsertTrack.triggered.connect(self.insert_track)
        self.actionMark_for_moving.triggered.connect(self.cut_rows)
        self.actionMoveSelected.triggered.connect(self.move_selected)
        self.actionNew_from_template.triggered.connect(self.new_from_template)
        self.actionNewPlaylist.triggered.connect(self.create_and_show_playlist)
        self.actionOpenPlaylist.triggered.connect(self.open_playlist)
        self.actionPaste.triggered.connect(self.paste_rows)
        self.actionPlay_next.triggered.connect(self.play_next)
        self.actionRenamePlaylist.triggered.connect(self.rename_playlist)
        self.actionReplace_files.triggered.connect(self.import_files)
        self.actionResume.triggered.connect(self.resume)
        self.actionSave_as_template.triggered.connect(self.save_as_template)
        self.actionSearch_title_in_Songfacts.triggered.connect(
            self.lookup_row_in_songfacts
        )
        self.actionSearch_title_in_Wikipedia.triggered.connect(
            self.lookup_row_in_wikipedia
        )
        self.actionSearch.triggered.connect(self.search_playlist)
        self.actionSelect_duplicate_rows.triggered.connect(
            lambda: self.active_tab().select_duplicate_rows()
        )
        self.actionMoveUnplayed.triggered.connect(self.move_unplayed)
        self.actionSetNext.triggered.connect(self.set_selected_track_next)
        self.actionSkipToNext.triggered.connect(self.play_next)
        self.actionStop.triggered.connect(self.stop)
        self.btnDrop3db.clicked.connect(self.drop3db)
        self.btnFade.clicked.connect(self.fade)
        self.btnHidePlayed.clicked.connect(self.hide_played)
        self.btnPreviewArm.clicked.connect(self.preview_arm)
        self.btnPreviewBack.clicked.connect(self.preview_back)
        self.btnPreview.clicked.connect(self.preview)
        self.btnPreviewEnd.clicked.connect(self.preview_end)
        self.btnPreviewFwd.clicked.connect(self.preview_fwd)
        self.btnPreviewMark.clicked.connect(self.preview_mark)
        self.btnPreviewStart.clicked.connect(self.preview_start)
        self.btnStop.clicked.connect(self.stop)
        self.hdrCurrentTrack.clicked.connect(self.show_current)
        self.hdrNextTrack.clicked.connect(self.show_next)
        self.tabPlaylist.currentChanged.connect(self.tab_change)
        self.tabPlaylist.tabCloseRequested.connect(self.close_tab)
        self.tabBar = self.tabPlaylist.tabBar()
        self.txtSearch.textChanged.connect(self.search_playlist_text_changed)

        self.signals.enable_escape_signal.connect(self.enable_escape)
        self.signals.next_track_changed_signal.connect(self.update_headers)
        self.signals.status_message_signal.connect(self.show_status_message)
        self.signals.show_warning_signal.connect(self.show_warning)
        self.signals.track_ended_signal.connect(self.end_of_track_actions)

        self.timer10.timeout.connect(self.tick_10ms)
        self.timer500.timeout.connect(self.tick_500ms)
        self.timer100.timeout.connect(self.tick_100ms)
        self.timer1000.timeout.connect(self.tick_1000ms)

        self.signals.search_songfacts_signal.connect(self.open_songfacts_browser)
        self.signals.search_wikipedia_signal.connect(self.open_wikipedia_browser)

    def create_playlist(
        self, session: Session, playlist_name: Optional[str] = None
    ) -> Optional[Playlists]:
        """
        Create new playlist

        If no playlist_name is passed, ask the user for one. Return the
        new playlist, or None if no name was supplied or creation failed.
        """

        log.info(f"create_playlist({playlist_name=})")

        # Only prompt when the caller didn't supply a name
        if not playlist_name:
            playlist_name = self.solicit_playlist_name(session)
        if not playlist_name:
            return None

        playlist = Playlists(session, playlist_name)
        if playlist:
            playlist.mark_open()
            return playlist

        log.error("Failed to create playlist")
        return None

    def create_and_show_playlist(self) -> None:
        """Create new playlist and display it"""

        with db.Session() as session:
            playlist = self.create_playlist(session)
            if playlist:
                self.create_playlist_tab(playlist)
                session.commit()

    def create_playlist_tab(self, playlist: Playlists) -> int:
        """
        Take the passed playlist database object, create a playlist tab and
        add tab to display. Return index number of tab.
        """

        log.debug(f"create_playlist_tab({playlist=})")

        playlist_tab = PlaylistTab(
            musicmuster=self,
            playlist_id=playlist.id,
        )
        idx = self.tabPlaylist.addTab(playlist_tab, playlist.name)

        log.debug(f"create_playlist_tab() returned: {idx=}")

        return idx

    def cut_rows(self) -> None:
        """
        Cut rows ready for pasting.
        """

        # Save the selected PlaylistRows items ready for a later
        # paste
        self.move_source_rows = self.active_tab().get_selected_rows()
        self.move_source_model = self.active_proxy_model()

        log.info(f"cut_rows(): {self.move_source_rows=} {self.move_source_model=}")

    def debug(self):
        """Invoke debugger"""

        visible_playlist_id = self.active_tab().playlist_id
        print(f"Active playlist id={visible_playlist_id}")

        import ipdb  # type: ignore

        ipdb.set_trace()

    def delete_playlist(self) -> None:
        """
        Delete current playlist
        """

        with db.Session() as session:
            playlist_id = self.active_tab().playlist_id
            playlist = session.get(Playlists, playlist_id)
            if playlist:
                if helpers.ask_yes_no(
                    "Delete playlist",
                    f"Delete playlist '{playlist.name}': " "Are you sure?",
                ):
                    # Only delete if the tab was actually allowed to close
                    if self.close_playlist_tab():
                        playlist.delete(session)
                        session.commit()
            else:
                log.error("Failed to retrieve playlist")

    def download_played_tracks(self) -> None:
        """Download a CSV of played tracks"""

        dlg = DownloadCSV(self)
        if dlg.exec():
            start_dt = dlg.ui.dateTimeEdit.dateTime().toPyDateTime()
            # Get output filename. getSaveFileName() returns a
            # (path, selected_filter) tuple; path is empty on cancel.
            path, _ = QFileDialog.getSaveFileName(
                self,
                "Save CSV of tracks played",
                directory="/tmp/playlist.csv",
                filter="CSV files (*.csv)",
            )
            if not path:
                return

            if not path.endswith(".csv"):
                path += ".csv"

            with open(path, "w", newline="") as f, db.Session() as session:
                # csv.writer quotes fields that contain commas or quotes
                writer = csv.writer(f, lineterminator="\n")
                for playdate in Playdates.played_after(session, start_dt):
                    writer.writerow([playdate.track.artist, playdate.track.title])

    def drop3db(self) -> None:
        """Drop music level by 3db if button checked"""

        if track_sequence.current:
            track_sequence.current.drop3db(self.btnDrop3db.isChecked())

    def enable_escape(self, enabled: bool) -> None:
        """
        Manage signal to enable/disable handling ESC character.

        Needed because we want to use ESC when editing playlist in place,
        so we need to disable it here while editing.
        """

        log.debug(f"enable_escape({enabled=})")

        self.action_Clear_selection.setEnabled(enabled)

    def end_of_track_actions(self) -> None:
        """
        Actions required:
        - Reset track_sequence objects
        - Tell model track has finished
        - Reset clocks
        - Update headers
        - Enable controls
        """

        # Reset track_sequence objects
        track_sequence.previous = track_sequence.current
        track_sequence.current = None

        # Tell model previous track has finished
        self.active_proxy_model().previous_track_ended()

        # Reset clocks
        self.frame_fade.setStyleSheet("")
        self.frame_silent.setStyleSheet("")
        self.label_elapsed_timer.setText("00:00 / 00:00")
        self.label_fade_timer.setText("00:00")
        self.label_silent_timer.setText("00:00")

        # Update headers
        self.update_headers()

        # Enable controls
        self.catch_return_key = False
        self.show_status_message("Play controls: Enabled", 0)

    def export_playlist_tab(self) -> None:
        """Export the current playlist to an m3u file"""

        if not self.active_tab():
            return

        playlist_id = self.active_tab().playlist_id

        with db.Session() as session:
            # Get output filename
            playlist = session.get(Playlists, playlist_id)
            if not playlist:
                return

            # getSaveFileName() returns a (path, selected_filter) tuple;
            # path is empty if the user cancelled.
            path, _ = QFileDialog.getSaveFileName(
                self,
                "Save Playlist",
                directory=f"{playlist.name}.m3u",
                filter="M3U files (*.m3u);;All files (*.*)",
            )
            if not path:
                return

            if not path.endswith(".m3u"):
                path += ".m3u"

            # Get list of track rows for this playlist
            plrs = PlaylistRows.get_rows_with_tracks(session, playlist_id)
            with open(path, "w") as f:
                # Required directive on first line
                f.write("#EXTM3U\n")
                for track in [a.track for a in plrs]:
                    # Treat a missing duration as zero without modifying
                    # the database object
                    duration = track.duration or 0
                    f.write(
                        "#EXTINF:"
                        f"{int(duration / 1000)},"
                        f"{track.title} - "
                        f"{track.artist}"
                        "\n"
                        f"{track.path}"
                        "\n"
                    )

    def fade(self) -> None:
        """Fade currently playing track"""

        if track_sequence.current:
            track_sequence.current.fade()

    def hide_played(self):
        """Toggle hide played tracks"""

        if self.hide_played_tracks:
            self.hide_played_tracks = False
            self.active_proxy_model().hide_played_tracks(False)
            self.btnHidePlayed.setText("Hide played")
        else:
            self.hide_played_tracks = True
            self.active_proxy_model().hide_played_tracks(True)
            self.btnHidePlayed.setText("Show played")

        # Reset row heights
        self.active_tab().resize_rows()

    def import_track(self) -> None:
        """Import track file"""

        dlg = QFileDialog()
        dlg.setFileMode(QFileDialog.FileMode.ExistingFiles)
        dlg.setViewMode(QFileDialog.ViewMode.Detail)
        dlg.setDirectory(Config.IMPORT_DESTINATION)
        dlg.setNameFilter("Music files (*.flac *.mp3)")

        if not dlg.exec():
            return

        with db.Session() as session:
            track_files: list[TrackFileData] = []
            for fpath in dlg.selectedFiles():
                tf = TrackFileData(fpath)
                tf.tags = helpers.get_tags(fpath)
                do_import = self.ok_to_import(session, fpath, tf.tags)
                if do_import:
                    tf.track_path = os.path.join(
                        Config.IMPORT_DESTINATION, os.path.basename(fpath)
                    )
                    tf.audio_metadata = helpers.get_audio_metadata(fpath)
                    track_files.append(tf)

        self.import_filenames(track_files)

    def import_filenames(self, track_files: list[TrackFileData]) -> None:
        """
        Import the list of filenames as new tracks
        """

        # Import in separate thread
        self.import_thread = QThread()
        self.worker = ImportTrack(
            track_files,
            self.active_proxy_model(),
            self.active_tab().source_model_selected_row_number(),
        )
        self.worker.moveToThread(self.import_thread)
        self.import_thread.started.connect(self.worker.run)
        self.worker.import_finished.connect(self.import_thread.quit)
        self.worker.import_finished.connect(self.worker.deleteLater)
        self.import_thread.finished.connect(self.import_thread.deleteLater)
        self.import_thread.start()

    def ok_to_import(self, session: Session, fname: str, tags: dict[str, str]) -> bool:
        """
        Check file has tags, check it's not a duplicate. Return True if this
        filename is OK to import, False if not.
        """

        title = tags.get("title")
        if not title:
            helpers.show_warning(
                self,
                "Problem with track file",
                f"{fname} does not have a title tag",
            )
            return False

        artist = tags.get("artist")
        if not artist:
            helpers.show_warning(
                self,
                "Problem with track file",
                f"{fname} does not have an artist tag",
            )
            return False

        txt = ""
        count = 0
        possible_matches = Tracks.search_titles(session, title)
        if possible_matches:
            txt += "Similar to new track "
            txt += f'"{title}" by "{artist}" ({fname}):\n\n'
            for track in possible_matches:
                txt += f'  "{track.title}" by {track.artist}'
                txt += f" ({track.path})\n\n"
                count += 1
                if count >= Config.MAX_IMPORT_MATCHES:
                    txt += "\nThere are more similar-looking tracks"
                    break
            txt += "\n"
            # Check whether to proceed if there were potential matches
            txt += "Proceed with import?"
            # The fourth argument is the set of buttons to show and the
            # fifth is the default button, so both Ok and Cancel must be
            # in the fourth for the user to be able to cancel.
            result = QMessageBox.question(
                self,
                "Possible duplicates",
                txt,
                QMessageBox.StandardButton.Ok | QMessageBox.StandardButton.Cancel,
                QMessageBox.StandardButton.Cancel,
            )
            if result == QMessageBox.StandardButton.Cancel:
                return False

        return True

    def initialise_audacity(self) -> None:
        """
        Initialise access to audacity
        """

        try:
            self.audacity_client = pipeclient.PipeClient()
            log.debug(f"{hex(id(self.audacity_client))=}")
        except RuntimeError as e:
            log.error(f"Unable to initialise Audacity: {str(e)}")

    def insert_header(self) -> None:
        """Show dialog box to enter header text and add to playlist"""

        proxy_model = self.active_proxy_model()
        if proxy_model is None:
            log.error("No proxy model")
            return

        # Get header text
        dlg: QInputDialog = QInputDialog(self)
        dlg.setInputMode(QInputDialog.InputMode.TextInput)
        dlg.setLabelText("Header text:")
        dlg.resize(500, 100)
        ok = dlg.exec()
        if ok:
            proxy_model.insert_row(
                proposed_row_number=self.active_tab().source_model_selected_row_number(),
                note=dlg.textValue(),
            )

    def insert_track(self) -> None:
        """Show dialog box to select and add track from database"""

        # Compare with None explicitly: row 0 is a valid selection
        new_row_number = self.active_tab().source_model_selected_row_number()
        if new_row_number is None:
            new_row_number = self.active_proxy_model().rowCount()

        with db.Session() as session:
            dlg = TrackSelectDialog(
                parent=self,
                session=session,
                new_row_number=new_row_number,
                source_model=self.active_proxy_model(),
            )
            dlg.exec()
            session.commit()

    def load_last_playlists(self) -> None:
        """Load the playlists that were open when the last session closed"""

        playlist_ids = []
        with db.Session() as session:
            for playlist in Playlists.get_open(session):
                if playlist:
                    _ = self.create_playlist_tab(playlist)
                    playlist_ids.append(playlist.id)
                    log.debug(f"load_last_playlists() loaded {playlist=}")

            # Set active tab
            record = Settings.get_setting(session, "active_tab")
            if record.f_int is not None and record.f_int >= 0:
                self.tabPlaylist.setCurrentIndex(record.f_int)

            # Tabs may move during use. Rather than track where tabs
            # are, we record the tab index when we close the main
            # window. To avoid possible duplicate tab entries, we null
            # them all out now.
            Playlists.clear_tabs(session, playlist_ids)
            session.commit()

    def lookup_row_in_songfacts(self) -> None:
        """
        Display songfacts page for title in highlighted row
        """

        row_number = self.active_tab().source_model_selected_row_number()
        if row_number is None:
            return

        track_info = self.active_proxy_model().get_row_info(row_number)
        if track_info is None:
            return

        self.signals.search_songfacts_signal.emit(track_info.title)

    def lookup_row_in_wikipedia(self) -> None:
        """
        Display Wikipedia page for title in highlighted row
        """

        row_number = self.active_tab().source_model_selected_row_number()
        if row_number is None:
            return

        track_info = self.active_proxy_model().get_row_info(row_number)
        if track_info is None:
            return

        self.signals.search_wikipedia_signal.emit(track_info.title)

    def move_playlist_rows(self, row_numbers: List[int]) -> None:
        """
        Move passed playlist rows to another playlist
        """

        # Identify destination playlist
        playlists = []
        visible_tab = self.active_tab()
        source_playlist_id = visible_tab.playlist_id

        with db.Session() as session:
            for playlist in Playlists.get_all(session):
                if playlist.id == source_playlist_id:
                    continue
                else:
                    playlists.append(playlist)

            dlg = SelectPlaylistDialog(self, playlists=playlists, session=session)
            dlg.exec()
            if not dlg.playlist:
                return
            to_playlist_id = dlg.playlist.id

            # Get row number in destination playlist
            last_row = PlaylistRows.get_last_used_row(session, to_playlist_id)
            if last_row is not None:
                to_row = last_row + 1
            else:
                to_row = 0

        # Move rows
        self.active_proxy_model().move_rows_between_playlists(
            row_numbers, to_row, to_playlist_id
        )

    def move_selected(self) -> None:
        """
        Move selected rows to another playlist
        """

        selected_rows = self.active_tab().get_selected_rows()
        if not selected_rows:
            return

        self.move_playlist_rows(selected_rows)

    def move_unplayed(self) -> None:
        """
        Move unplayed rows to another playlist
        """

        unplayed_rows = self.active_proxy_model().get_unplayed_rows()
        if not unplayed_rows:
            return

        # We can get a race condition as selected rows change while
        # moving so disable selected rows timing for move
        self.disable_selection_timing = True
        try:
            self.move_playlist_rows(unplayed_rows)
        finally:
            # Always restore timing, even if the move fails
            self.disable_selection_timing = False

    def new_from_template(self) -> None:
        """Create new playlist from template"""

        with db.Session() as session:
            templates = Playlists.get_all_templates(session)
            dlg = SelectPlaylistDialog(self, playlists=templates, session=session)
            dlg.exec()
            template = dlg.playlist
            if template:
                playlist_name = self.solicit_playlist_name(session)
                if not playlist_name:
                    log.error("Template has no name")
                    return
                playlist = Playlists.create_playlist_from_template(
                    session, template, playlist_name
                )
                # Need to ensure that the new playlist is committed to
                # the database before it is opened by the model.
                session.commit()
                if playlist:
                    playlist.mark_open()
                    self.create_playlist_tab(playlist)
                    # Commit after mark_open(), as open_playlist() does.
                    # NOTE(review): assumes mark_open() does not commit
                    # itself - confirm against the Playlists model.
                    session.commit()
                else:
                    log.error("Playlist failed to create")

    def open_playlist(self) -> None:
        """Open existing playlist"""

        with db.Session() as session:
            playlists = Playlists.get_closed(session)
            dlg = SelectPlaylistDialog(self, playlists=playlists, session=session)
            dlg.exec()
            playlist = dlg.playlist
            if playlist:
                idx = self.create_playlist_tab(playlist)
                playlist.mark_open()
                session.commit()

                self.tabPlaylist.setCurrentIndex(idx)

    def open_songfacts_browser(self, title: str) -> None:
        """Search Songfacts for title"""

        slug = slugify(title, replacements=([["'", ""]]))
        log.info(f"Songfacts browser tab for {title=}")
        url = f"https://www.songfacts.com/search/songs/{slug}"

        if Config.USE_INTERNAL_BROWSER:
            self.tabInfolist.open_tab(url, title)
        else:
            webbrowser.get('browser').open_new_tab(url)

    def open_wikipedia_browser(self, title: str) -> None:
        """Search Wikipedia for title"""

        query = urllib.parse.quote_plus(title)
        log.info(f"Wikipedia browser tab for {title=}")
        url = f"https://www.wikipedia.org/w/index.php?search={query}"

        if Config.USE_INTERNAL_BROWSER:
            self.tabInfolist.open_tab(url, title)
        else:
            webbrowser.get('browser').open_new_tab(url)

    def paste_rows(self) -> None:
        """
        Paste earlier cut rows.
        """

        if self.move_source_rows is None or self.move_source_model is None:
            return

        to_playlist_model: PlaylistModel = self.active_tab().source_model
        selected_rows = self.active_tab().get_selected_rows()
        if selected_rows:
            destination_row = selected_rows[0]
        else:
            destination_row = self.active_proxy_model().rowCount()

        if (
            to_playlist_model.playlist_id
            == self.move_source_model.source_model.playlist_id
        ):
            # Cut and paste within the same playlist
            self.move_source_model.move_rows(self.move_source_rows, destination_row)
        else:
            self.move_source_model.move_rows_between_playlists(
                self.move_source_rows, destination_row, to_playlist_model.playlist_id
            )
        self.active_tab().resize_rows()
        self.active_tab().clear_selection()

    def play_next(self, position: Optional[float] = None) -> None:
        """
        Play next track, optionally from passed position.

        Actions required:
        - If there is no next track set, return.
        - Check for inadvertent press of return
        - If there's currently a track playing, fade it.
        - Move next track to current track.
        - Clear next track
        - Restore volume if -3dB active
        - Play (new) current track.
        - Show closing volume graph
        - Notify model
        - Note that track is now playing
        - Disable play next controls
        - Update headers
        """

        # This method is connected directly to QAction.triggered, which
        # passes its boolean "checked" flag as the first argument. That
        # flag is not a play position, so discard it.
        if isinstance(position, bool):
            position = None

        log.debug(f"issue223: play_next({position=})")

        # If there is no next track set, return.
        if track_sequence.next is None:
            log.error("musicmuster.play_next(): no next track selected")
            return

        # Check for inadvertent press of 'return'
        if self.return_pressed_in_error():
            return

        # Issue #223 concerns a very short pause (maybe 0.1s) sometimes
        # when starting to play at track.

        # Resolution appears to be to disable timer10 for the first ten
        # seconds of playback. Re-enable in update_clocks.
        self.timer10.stop()
        log.debug("issue223: play_next: 10ms timer disabled")

        # If there's currently a track playing, fade it.
        if track_sequence.current:
            track_sequence.current.fade()

        # Move next track to current track.
        # end_of_track_actions() will have saved current track to
        # previous_track
        track_sequence.current = track_sequence.next

        # Clear next track
        self.clear_next()

        # Restore volume if -3dB active
        if self.btnDrop3db.isChecked():
            log.debug("issue223: play_next: Reset -3db button")
            self.btnDrop3db.setChecked(False)

        # Play (new) current track
        track_sequence.current.play(position)

        # Update clocks now, don't wait for next tick
        log.debug("issue223: play_next: update_clocks()")
        self.update_clocks()

        # Show closing volume graph
        if track_sequence.current.fade_graph:
            log.debug(f"issue223: play_next: set up fade_graph, {track_sequence.current.title=}")
            track_sequence.current.fade_graph.GraphWidget = self.widgetFadeVolume
            track_sequence.current.fade_graph.clear()
            track_sequence.current.fade_graph.plot()
        else:
            log.debug("issue223: play_next: No fade_graph")

        # Disable play next controls
        self.catch_return_key = True
        self.show_status_message("Play controls: Disabled", 0)

        # Notify model
        log.debug("issue223: play_next: notify model")
        self.active_proxy_model().current_track_started()

        # Update headers
        log.debug("issue223: play_next: update headers")
        self.update_headers()
        with db.Session() as session:
            last_played = Playdates.last_played_tracks(session)
            tracklist = []
            for lp in last_played:
                track = session.get(Tracks, lp.track_id)
                # Skip playdates whose track can't be retrieved
                if track:
                    tracklist.append(f"{track.title} ({track.artist})")
            # NOTE(review): the separator arrived as a raw line break
            # inside the quotes; confirm it should be "\n".
            tt = "\n".join(tracklist)

        self.hdrPreviousTrack.setToolTip(tt)

    def preview(self) -> None:
        """
        Preview selected or next track. We use a different mechanism to
        normal track playing so that the user can route the output audio
        differently (eg, to headphones).
        """

        if self.btnPreview.isChecked():
            # Get track path for first selected track if there is one
            track_info = self.active_tab().get_selected_row_track_info()
            if not track_info:
                # Otherwise get track_id to next track to play
                if track_sequence.next:
                    if track_sequence.next.path:
                        track_info = TrackInfo(
                            track_sequence.next.track_id, track_sequence.next.row_number
                        )
                else:
                    return

            # Nothing to preview (eg, next track has no path)
            if not track_info:
                return

            self.preview_manager.set_track_info(track_info)
            self.preview_manager.play()
        else:
            self.preview_manager.stop()

    def preview_arm(self):
        """Manager arm button for setting intro length"""

        self.btnPreviewMark.setEnabled(self.btnPreviewArm.isChecked())

    def preview_back(self) -> None:
        """Wind back preview file"""

        self.preview_manager.back(5000)

    def preview_end(self) -> None:
        """Advance preview file to Config.PREVIEW_END_BUFFER_MS before end of intro"""

        if self.preview_manager:
            self.preview_manager.move_to_intro_end()

    def preview_fwd(self) -> None:
        """Advance preview file"""

        self.preview_manager.forward(5000)

    def preview_mark(self) -> None:
        """Set intro time"""

        if self.preview_manager.is_playing():
            track_id = self.preview_manager.track_id
            row_number = self.preview_manager.row_number
            with db.Session() as session:
                track = session.get(Tracks, track_id)
                if track:
                    # Save intro as millisends rounded to nearest 0.1
                    # second because editor spinbox only resolves to 0.1
                    # seconds
                    intro = round(self.preview_manager.get_playtime() / 100) * 100
                    track.intro = intro
                    session.commit()
                    self.preview_manager.set_intro(intro)
                    self.active_tab().source_model.refresh_row(session, row_number)
                    self.active_tab().source_model.invalidate_row(row_number)

    def preview_start(self) -> None:
        """Restart preview"""

        self.preview_manager.restart()

    def quicklog(self) -> None:
        """
        Create log entry
        """

        log.debug("quicklog timestamp; entry follows")
        # Get log text
        dlg: QInputDialog = QInputDialog(self)
        dlg.setInputMode(QInputDialog.InputMode.TextInput)
        dlg.setLabelText("Log text:")
        dlg.resize(500, 100)
        ok = dlg.exec()
        if ok:
            log.debug("quicklog: " + dlg.textValue())

    def rename_playlist(self) -> None:
        """
        Rename current playlist
        """

        with db.Session() as session:
            playlist_id = self.active_tab().playlist_id
            playlist = session.get(Playlists, playlist_id)
            if playlist:
                new_name = self.solicit_playlist_name(session, playlist.name)
                if new_name:
                    playlist.rename(session, new_name)
                    idx = self.tabBar.currentIndex()
                    self.tabBar.setTabText(idx, new_name)
                    session.commit()

    def import_files(self) -> None:
        """
        Scan source directory and offer to replace existing files with "similar"
        files, or import the source file as a new track.
        """

        import_files: list[TrackFileData] = []

        with db.Session() as session:
            dlg = ReplaceFilesDialog(
                session=session,
                main_window=self,
            )
            status = dlg.exec()
            if status:
                for rf in dlg.replacement_files:
                    if rf.track_id:
                        # We're updating an existing track
                        # If the filename has changed, remove the
                        # existing file
                        if rf.obsolete_path is not None:
                            if os.path.exists(rf.obsolete_path):
                                os.unlink(rf.obsolete_path)
                            else:
                                log.error(
                                    f"replace_files: could not unlink {rf.obsolete_path=}"
                                )
                                continue
                        if rf.track_path:
                            if os.path.exists(rf.track_path):
                                os.unlink(rf.track_path)
                            shutil.move(rf.new_file_path, rf.track_path)
                        track = session.get(Tracks, rf.track_id)
                        if not track:
                            raise Exception(
                                f"replace_files: could not retrieve track {rf.track_id}"
                            )

                        track.artist = rf.tags["artist"]
                        track.title = rf.tags["title"]
                        if track.path != rf.track_path:
                            track.path = rf.track_path
                            try:
                                session.commit()
                            except IntegrityError:
                                # https://jira.mariadb.org/browse/MDEV-29345 workaround
                                session.rollback()
                                track.path = "DUMMY"
                                session.commit()
                                # The rollback discarded the pending tag
                                # changes, so re-apply them, then store
                                # the real path and commit it so the track
                                # isn't left pointing at "DUMMY".
                                track.artist = rf.tags["artist"]
                                track.title = rf.tags["title"]
                                track.path = rf.track_path
                                session.commit()
                        else:
                            session.commit()
                    else:
                        # We're importing a new track
                        do_import = self.ok_to_import(
                            session, os.path.basename(rf.new_file_path), rf.tags
                        )
                        if do_import:
                            rf.audio_metadata = helpers.get_audio_metadata(
                                rf.new_file_path
                            )
                            import_files.append(rf)

                self.import_filenames(import_files)
            else:
                session.rollback()

    def return_pressed_in_error(self) -> bool:
        """
        Check whether Return key has been pressed in error.

        Return True if it has, False if not
        """

        if track_sequence.current and self.catch_return_key:
            # Suppress inadvertent double press
            if (
                track_sequence.current
                and track_sequence.current.start_time
                and track_sequence.current.start_time
                + dt.timedelta(milliseconds=Config.RETURN_KEY_DEBOUNCE_MS)
                > dt.datetime.now()
            ):
                return True

            # If return is pressed during first PLAY_NEXT_GUARD_MS then
            # default to NOT playing the next track, else default to
            # playing it.
            default_yes: bool = track_sequence.current.start_time is not None and (
                (dt.datetime.now() - track_sequence.current.start_time).total_seconds()
                * 1000
                > Config.PLAY_NEXT_GUARD_MS
            )
            if default_yes:
                msg = "Hit return to play next track now"
            else:
                msg = "Press tab to select Yes and hit return to play next track"
            if not helpers.ask_yes_no(
                "Play next track",
                msg,
                default_yes=default_yes,
                parent=self,
            ):
                return True

        return False

    def resume(self) -> None:
        """
        Resume playing last track. We may be playing the next track
        or none; take care of both eventualities.

        Actions required:
        - Return if no saved position
        - Resume last track
        - If a track is playing, make that the next track
        """

        if not track_sequence.previous:
            return

        # Return if no saved position
        if not track_sequence.previous.resume_marker:
            log.error("No previous track position")
            return

        # We want to use play_next() to resume, so copy the previous
        # track to the next track:
        track_sequence.next = track_sequence.previous

        # Now resume playing the now-next track
        self.play_next(track_sequence.next.resume_marker)

        # Adjust track info so that clocks and graph are correct.
# We need to fake the start time to reflect where we resumed the # track if ( track_sequence.current and track_sequence.current.start_time and track_sequence.current.duration and track_sequence.current.resume_marker ): elapsed_ms = ( track_sequence.current.duration * track_sequence.current.resume_marker ) track_sequence.current.start_time -= dt.timedelta(milliseconds=elapsed_ms) def save_as_template(self) -> None: """Save current playlist as template""" with db.Session() as session: template_names = [a.name for a in Playlists.get_all_templates(session)] while True: # Get name for new template dlg = QInputDialog(self) dlg.setInputMode(QInputDialog.InputMode.TextInput) dlg.setLabelText("Template name:") dlg.resize(500, 100) ok = dlg.exec() if not ok: return template_name = dlg.textValue() if template_name not in template_names: break helpers.show_warning( self, "Duplicate template", "Template name already in use" ) Playlists.save_as_template( session, self.active_tab().playlist_id, template_name ) session.commit() helpers.show_OK(self, "Template", "Template saved") def search_playlist(self) -> None: """Show text box to search playlist""" # Disable play controls so that 'return' in search box doesn't # play next track self.catch_return_key = True self.txtSearch.setHidden(False) self.txtSearch.setFocus() # Select any text that may already be there self.txtSearch.selectAll() def search_playlist_clear(self) -> None: """Tidy up and reset search bar""" # Clean up search bar self.txtSearch.setText("") self.txtSearch.setHidden(True) def search_playlist_text_changed(self) -> None: """ Incremental search of playlist """ self.active_proxy_model().set_incremental_search(self.txtSearch.text()) def select_next_row(self) -> None: """Select next or first row in playlist""" self.active_tab().select_next_row() def select_previous_row(self) -> None: """Select previous or first row in playlist""" self.active_tab().select_previous_row() def set_main_window_size(self) -> None: """Set size 
of window from database""" with db.Session() as session: x = Settings.get_setting(session, "mainwindow_x").f_int or 100 y = Settings.get_setting(session, "mainwindow_y").f_int or 100 width = Settings.get_setting(session, "mainwindow_width").f_int or 100 height = Settings.get_setting(session, "mainwindow_height").f_int or 100 self.setGeometry(x, y, width, height) if Config.USE_INTERNAL_BROWSER: splitter_top = ( Settings.get_setting(session, "splitter_top").f_int or 100 ) splitter_bottom = ( Settings.get_setting(session, "splitter_bottom").f_int or 100 ) self.splitter.setSizes([splitter_top, splitter_bottom]) else: self.tabInfolist.hide() def set_selected_track_next(self) -> None: """ Set currently-selected row on visible playlist tab as next track """ playlist_tab = self.active_tab() if playlist_tab: playlist_tab.set_row_as_next_track() else: log.error("No active tab") def set_tab_colour(self, widget: PlaylistTab, colour: QColor) -> None: """ Find the tab containing the widget and set the text colour """ idx = self.tabPlaylist.indexOf(widget) self.tabPlaylist.tabBar().setTabTextColor(idx, colour) def show_current(self) -> None: """Scroll to show current track""" if track_sequence.current: self.show_track(track_sequence.current) def show_warning(self, title: str, body: str) -> None: """ Display a warning dialog """ print(f"show_warning({title=}, {body=})") QMessageBox.warning(self, title, body) def show_next(self) -> None: """Scroll to show next track""" if track_sequence.next: self.show_track(track_sequence.next) def show_status_message(self, message: str, timing: int) -> None: """ Show status message in status bar for timing milliseconds """ self.statusbar.showMessage(message, timing) def show_track(self, playlist_track: MainTrackManager) -> None: """Scroll to show track in plt""" # Switch to the correct tab playlist_id = playlist_track.playlist_id if not playlist_id: # No playlist return if playlist_id != self.active_tab().playlist_id: for idx in 
range(self.tabPlaylist.count()): if self.tabPlaylist.widget(idx).playlist_id == playlist_id: self.tabPlaylist.setCurrentIndex(idx) break display_row = ( self.active_proxy_model() .mapFromSource( self.active_proxy_model().source_model.index( playlist_track.row_number, 0 ) ) .row() ) self.tabPlaylist.currentWidget().scroll_to_top(display_row) def solicit_playlist_name( self, session: Session, default: str = "" ) -> Optional[str]: """Get name of new playlist from user""" dlg = QInputDialog(self) dlg.setInputMode(QInputDialog.InputMode.TextInput) dlg.setLabelText("Playlist name:") while True: if default: dlg.setTextValue(default) dlg.resize(500, 100) ok = dlg.exec() if ok: proposed_name = dlg.textValue() if Playlists.name_is_available(session, proposed_name): return proposed_name else: helpers.show_warning( self, "Name in use", f"There's already a playlist called '{proposed_name}'", ) continue else: return None def stop(self) -> None: """Stop playing immediately""" if track_sequence.current: track_sequence.current.stop() def tab_change(self): """Called when active tab changed""" self.active_tab().resize_rows() def tick_10ms(self) -> None: """ Called every 10ms """ if track_sequence.current: track_sequence.current.update_fade_graph() def tick_100ms(self) -> None: """ Called every 100ms """ if track_sequence.current: try: track_sequence.current.check_for_end_of_track() # Update intro counter if applicable and, if updated, return # because playing an intro takes precedence over timing a # preview. 
intro_ms_remaining = track_sequence.current.time_remaining_intro() if intro_ms_remaining > 0: self.label_intro_timer.setText(f"{intro_ms_remaining / 1000:.1f}") if intro_ms_remaining <= Config.INTRO_SECONDS_WARNING_MS: self.label_intro_timer.setStyleSheet( f"background: {Config.COLOUR_WARNING_TIMER}" ) return else: if self.label_intro_timer.styleSheet() != "": self.label_intro_timer.setStyleSheet("") self.label_intro_timer.setText("0.0") except AttributeError: # current track ended during servicing tick pass # Ensure preview button is reset if preview finishes playing # Update preview timer if self.btnPreview.isChecked(): if self.preview_manager.is_playing(): self.btnPreview.setChecked(True) minutes, seconds = divmod( self.preview_manager.get_playtime() / 1000, 60 ) self.label_intro_timer.setText(f"{int(minutes)}:{seconds:04.1f}") # if self.preview_track_manager.time_remaining_intro() <= 50: # self.label_intro_timer.setStyleSheet( # f"background: {Config.COLOUR_WARNING_TIMER}" # ) # else: # self.label_intro_timer.setStyleSheet("") else: self.btnPreview.setChecked(False) self.label_intro_timer.setText("0.0") self.label_intro_timer.setStyleSheet("") self.btnPreview.setChecked(False) def tick_1000ms(self) -> None: """ Called every 1000ms """ # Only update play clocks once a second so that their updates # are synchronised (otherwise it looks odd) self.update_clocks() def tick_500ms(self) -> None: """ Called every 500ms """ self.lblTOD.setText(dt.datetime.now().strftime(Config.TOD_TIME_FORMAT)) def update_clocks(self) -> None: """ Update track clocks. """ # If track is playing, update track clocks time and colours if track_sequence.current and track_sequence.current.is_playing(): # see play_next() and issue #223. 
# TODO: find a better way of handling this if ( track_sequence.current.time_playing() > 10000 and not self.timer10.isActive() ): self.timer10.start(10) log.debug("issue223: update_clocks: 10ms timer enabled") # Elapsed time self.label_elapsed_timer.setText( helpers.ms_to_mmss(track_sequence.current.time_playing()) + " / " + helpers.ms_to_mmss(track_sequence.current.duration) ) # Time to fade time_to_fade = track_sequence.current.time_to_fade() time_to_silence = track_sequence.current.time_to_silence() self.label_fade_timer.setText(helpers.ms_to_mmss(time_to_fade)) # If silent in the next 5 seconds, put warning colour on # time to silence box and enable play controls if time_to_silence <= Config.WARNING_MS_BEFORE_SILENCE: css_silence = f"background: {Config.COLOUR_ENDING_TIMER}" if self.frame_silent.styleSheet() != css_silence: self.frame_silent.setStyleSheet(css_silence) self.catch_return_key = False self.show_status_message("Play controls: Enabled", 0) # Set warning colour on time to silence box when fade starts elif time_to_fade <= 500: css_fade = f"background: {Config.COLOUR_WARNING_TIMER}" if self.frame_silent.styleSheet() != css_fade: self.frame_silent.setStyleSheet(css_fade) # Five seconds before fade starts, set warning colour on # time to silence box and enable play controls elif time_to_fade <= Config.WARNING_MS_BEFORE_FADE: self.frame_fade.setStyleSheet( f"background: {Config.COLOUR_WARNING_TIMER}" ) self.catch_return_key = False self.show_status_message("Play controls: Enabled", 0) else: self.frame_silent.setStyleSheet("") self.frame_fade.setStyleSheet("") self.label_silent_timer.setText(helpers.ms_to_mmss(time_to_silence)) def update_headers(self) -> None: """ Update last / current / next track headers """ if track_sequence.previous: self.hdrPreviousTrack.setText( f"{track_sequence.previous.title} - {track_sequence.previous.artist}" ) else: self.hdrPreviousTrack.setText("") if track_sequence.current: self.hdrCurrentTrack.setText( 
f"{track_sequence.current.title.replace('&', '&&')} - " f"{track_sequence.current.artist.replace('&', '&&')}" ) else: self.hdrCurrentTrack.setText("") if track_sequence.next: self.hdrNextTrack.setText( f"{track_sequence.next.title.replace('&', '&&')} - " f"{track_sequence.next.artist.replace('&', '&&')}" ) else: self.hdrNextTrack.setText("") class DownloadCSV(QDialog): def __init__(self, parent=None): super().__init__() self.ui = Ui_DateSelect() self.ui.setupUi(self) self.ui.dateTimeEdit.setDate(QDate.currentDate()) self.ui.dateTimeEdit.setTime(QTime(19, 59, 0)) self.ui.buttonBox.accepted.connect(self.accept) self.ui.buttonBox.rejected.connect(self.reject) class SelectPlaylistDialog(QDialog): def __init__(self, parent=None, playlists=None, session=None): super().__init__() if playlists is None: return self.ui = Ui_dlgSelectPlaylist() self.ui.setupUi(self) self.ui.lstPlaylists.itemDoubleClicked.connect(self.list_doubleclick) self.ui.buttonBox.accepted.connect(self.open) self.ui.buttonBox.rejected.connect(self.close) self.session = session self.playlist = None record = Settings.get_setting(self.session, "select_playlist_dialog_width") width = record.f_int or 800 record = Settings.get_setting(self.session, "select_playlist_dialog_height") height = record.f_int or 600 self.resize(width, height) for playlist in playlists: p = QListWidgetItem() p.setText(playlist.name) p.setData(Qt.ItemDataRole.UserRole, playlist) self.ui.lstPlaylists.addItem(p) def __del__(self): # review record = Settings.get_setting(self.session, "select_playlist_dialog_height") record.f_int = self.height() record = Settings.get_setting(self.session, "select_playlist_dialog_width") record.f_int = self.width() self.session.commit() def list_doubleclick(self, entry): # review self.playlist = entry.data(Qt.ItemDataRole.UserRole) self.accept() def open(self): # review if self.ui.lstPlaylists.selectedItems(): item = self.ui.lstPlaylists.currentItem() self.playlist = item.data(Qt.ItemDataRole.UserRole) 
self.accept() if __name__ == "__main__": """ If command line arguments given, carry out requested function and exit. Otherwise run full application. """ p = argparse.ArgumentParser() # Only allow at most one option to be specified group = p.add_mutually_exclusive_group() group.add_argument( "-b", "--bitrates", action="store_true", dest="update_bitrates", default=False, help="Update bitrates in database", ) group.add_argument( "-c", "--check-database", action="store_true", dest="check_db", default=False, help="Check and report on database", ) args = p.parse_args() # Run as required if args.check_db: log.debug("Checking database") with db.Session() as session: check_db(session) elif args.update_bitrates: log.debug("Update bitrates") with db.Session() as session: update_bitrates(session) else: try: app = QApplication(sys.argv) # PyQt6 defaults to a grey for labels palette = app.palette() palette.setColor( QPalette.ColorRole.WindowText, QColor(Config.COLOUR_LABEL_TEXT) ) # Set colours that will be used by playlist row stripes palette.setColor( QPalette.ColorRole.Base, QColor(Config.COLOUR_EVEN_PLAYLIST) ) palette.setColor( QPalette.ColorRole.AlternateBase, QColor(Config.COLOUR_ODD_PLAYLIST) ) app.setPalette(palette) win = Window() win.show() status = app.exec() sys.exit(status) except Exception as exc: if os.environ["MM_ENV"] == "PRODUCTION": from helpers import send_mail msg = stackprinter.format(exc) send_mail( ",".join(Config.ERRORS_TO), ",".join(Config.ERRORS_FROM), "Exception from musicmuster.py", msg, ) log.debug(msg) else: print("\033[1;31;47mUnhandled exception starts") print( stackprinter.format( exc, suppressed_paths=["/pypoetry/virtualenvs/"], style="darkbg" ) ) print("Unhandled exception ends\033[1;37;40m")