#!/usr/bin/env python3 # Standard library imports from os.path import basename from time import sleep from typing import cast, List, Optional import argparse import datetime as dt import os import subprocess import sys import threading # PyQt imports from PyQt6.QtCore import ( pyqtSignal, QDate, QEvent, QObject, QSize, Qt, QThread, QTime, QTimer, ) from PyQt6.QtGui import ( QCloseEvent, QColor, QFont, QMouseEvent, QPalette, QResizeEvent, ) from PyQt6.QtWidgets import ( QApplication, QDialog, QFileDialog, QInputDialog, QLabel, QLineEdit, QListWidgetItem, QMainWindow, QMessageBox, QProgressBar, QPushButton, ) # Third party imports from pygame import mixer import pipeclient from sqlalchemy.orm import scoped_session import stackprinter # type: ignore # App imports from classes import ( track_sequence, FadeCurve, MusicMusterSignals, PlaylistTrack, ) from config import Config from dbtables import db from dialogs import TrackSelectDialog from log import log from models import Carts, Playdates, PlaylistRows, Playlists, Settings, Tracks from playlistmodel import PlaylistModel, PlaylistProxyModel from playlists import PlaylistTab from ui import icons_rc # noqa F401 from ui.dlg_cart_ui import Ui_DialogCartEdit # type: ignore from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist # type: ignore from ui.downloadcsv_ui import Ui_DateSelect # type: ignore from ui.main_window_ui import Ui_MainWindow # type: ignore from utilities import check_db, update_bitrates import helpers import music class CartButton(QPushButton): """Button for playing carts""" progress = pyqtSignal(int) def __init__(self, musicmuster: "Window", cart: Carts, *args, **kwargs): """Create a cart pushbutton and set it disabled""" super().__init__(*args, **kwargs) self.musicmuster = musicmuster self.cart_id = cart.id if cart.path and cart.enabled and not cart.duration: tags = helpers.get_tags(cart.path) cart.duration = tags["duration"] self.duration = cart.duration self.path = cart.path self.player = None self.is_playing = False self.setEnabled(True) self.setMinimumSize(QSize(147, 61)) font = QFont() font.setPointSize(14) self.setFont(font) self.setObjectName("cart_" + str(cart.cart_number)) self.pgb = QProgressBar(self) self.pgb.setTextVisible(False) self.pgb.setVisible(False) palette = self.pgb.palette() palette.setColor( QPalette.ColorRole.Highlight, QColor(Config.COLOUR_CART_PROGRESSBAR) ) self.pgb.setPalette(palette) self.pgb.setGeometry(0, 0, self.width(), 10) self.pgb.setMinimum(0) self.pgb.setMaximum(1) self.pgb.setValue(0) self.progress.connect(self.pgb.setValue) def __repr__(self) -> str: return ( f"" ) def event(self, event: Optional[QEvent]) -> bool: """Allow right click even when button is disabled""" if not event: return False if event.type() == QEvent.Type.MouseButtonRelease: mouse_event = cast(QMouseEvent, event) if mouse_event.button() == Qt.MouseButton.RightButton: self.musicmuster.cart_edit(self, event) return True return super().event(event) def resizeEvent(self, event: Optional[QResizeEvent]) -> None: """Resize progess bar when button size changes""" self.pgb.setGeometry(0, 0, self.width(), 10) class ImportTrack(QObject): import_finished = pyqtSignal() def __init__( self, filenames: List[str], source_model: PlaylistModel, row_number: Optional[int], ) -> None: super().__init__() self.filenames = filenames self.source_model = source_model if row_number is None: self.next_row_number = source_model.rowCount() else: self.next_row_number = row_number self.signals = MusicMusterSignals() def run(self): """ Create track objects from passed files and add to visible playlist """ with db.Session() as session: for fname in self.filenames: self.signals.status_message_signal.emit( f"Importing {basename(fname)}", 5000 ) metadata = helpers.get_file_metadata(fname) try: track = Tracks(session, **metadata) except Exception as e: self.signals.show_warning_signal.emit( "Error importing track", str(e) ) return helpers.normalise_track(track.path) # We're importing potentially multiple tracks in a loop. # If there's an error adding the track to the Tracks # table, the session will rollback, thus losing any # previous additions in this loop. So, commit now to # lock in what we've just done. session.commit() self.source_model.insert_row(self.next_row_number, track.id, "") self.next_row_number += 1 self.signals.status_message_signal.emit( f"{len(self.filenames)} tracks imported", 10000 ) self.import_finished.emit() class Window(QMainWindow, Ui_MainWindow): def __init__(self, parent=None, *args, **kwargs) -> None: super().__init__(parent) self.setupUi(self) self.timer10: QTimer = QTimer() self.timer500: QTimer = QTimer() self.timer1000: QTimer = QTimer() self.music: music.Music = music.Music() self.playing: bool = False self.set_main_window_size() self.lblSumPlaytime = QLabel("") self.statusbar.addPermanentWidget(self.lblSumPlaytime) self.txtSearch = QLineEdit() self.txtSearch.setHidden(True) self.statusbar.addWidget(self.txtSearch) self.hide_played_tracks = False mixer.init() self.widgetFadeVolume.hideAxis("bottom") self.widgetFadeVolume.hideAxis("left") self.widgetFadeVolume.setDefaultPadding(0) self.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND) FadeCurve.GraphWidget = self.widgetFadeVolume self.active_tab = lambda: self.tabPlaylist.currentWidget() self.active_proxy_model = lambda: self.tabPlaylist.currentWidget().model() self.move_source_rows: Optional[List[int]] = None self.move_source_model: Optional[PlaylistProxyModel] = None self.audacity_file_path: Optional[str] = None self.audacity_client: Optional[pipeclient.PipeClient] = None self.initialise_audacity() if Config.CARTS_HIDE: self.cartsWidget.hide() self.frame_6.hide() else: self.carts_init() self.disable_selection_timing = False self.clock_counter = 0 self.timer10.start(10) self.timer500.start(500) self.timer1000.start(1000) self.signals = MusicMusterSignals() self.connect_signals_slots() self.catch_return_key = False self.load_last_playlists() def about(self) -> None: """Get git tag and database name""" try: git_tag = str( subprocess.check_output(["git", "describe"], stderr=subprocess.STDOUT) ).strip("'b\\n") except subprocess.CalledProcessError as exc_info: git_tag = str(exc_info.output) with db.Session() as session: if session.bind: dbname = session.bind.engine.url.database QMessageBox.information( self, "About", f"MusicMuster {git_tag}\n\nDatabase: {dbname}", QMessageBox.StandardButton.Ok, ) def cart_configure(self, cart: Carts, btn: CartButton) -> None: """Configure button with cart data""" btn.setEnabled(False) btn.pgb.setVisible(False) if cart.path: if not helpers.file_is_unreadable(cart.path): colour = Config.COLOUR_CART_READY btn.path = cart.path btn.player = self.music.VLC.media_player_new(cart.path) if btn.player: btn.player.audio_set_volume(Config.VOLUME_VLC_DEFAULT) if cart.enabled: btn.setEnabled(True) btn.pgb.setVisible(True) else: colour = Config.COLOUR_CART_ERROR else: colour = Config.COLOUR_CART_UNCONFIGURED btn.setStyleSheet("background-color: " + colour + ";\n") if cart.name is not None: btn.setText(cart.name) def cart_click(self) -> None: """Handle cart click""" btn = self.sender() if not isinstance(btn, CartButton): return if not helpers.file_is_unreadable(btn.path): # Don't allow clicks while we're playing btn.setEnabled(False) if not btn.player: log.debug(f"musicmuster.cart_click(): no player assigned ({btn=})") return btn.player.play() btn.is_playing = True colour = Config.COLOUR_CART_PLAYING thread = threading.Thread(target=self.cart_progressbar, args=(btn,)) thread.start() else: colour = Config.COLOUR_CART_ERROR btn.setStyleSheet("background-color: " + colour + ";\n") btn.pgb.setMinimum(0) def cart_edit(self, btn: CartButton, event: QEvent): """Handle context menu for cart button""" with db.Session() as session: cart = session.query(Carts).get(btn.cart_id) if cart is None: log.error("cart_edit: cart not found") return dlg = CartDialog(musicmuster=self, session=session, cart=cart) if dlg.exec(): name = dlg.ui.lineEditName.text() if not name: QMessageBox.warning(self, "Error", "Name required") return path = dlg.path if not path: QMessageBox.warning(self, "Error", "Filename required") return if cart.path and not helpers.file_is_unreadable(cart.path): tags = helpers.get_tags(cart.path) cart.duration = tags["duration"] cart.enabled = dlg.ui.chkEnabled.isChecked() cart.name = name cart.path = path session.add(cart) session.commit() self.cart_configure(cart, btn) def carts_init(self) -> None: """Initialse carts data structures""" with db.Session() as session: # Number carts from 1 for humanity for cart_number in range(1, Config.CARTS_COUNT + 1): cart = session.query(Carts).get(cart_number) if cart is None: cart = Carts(session, cart_number, name=f"Cart #{cart_number}") btn = CartButton(self, cart) btn.clicked.connect(self.cart_click) # Insert button on left of cart space starting at # location zero self.horizontalLayout_Carts.insertWidget(cart.id - 1, btn) # Configure button self.cart_configure(cart, btn) def cart_progressbar(self, btn: CartButton) -> None: """Manage progress bar""" if not btn.duration: return ms = 0 btn.pgb.setMaximum(btn.duration) while ms <= btn.duration: btn.progress.emit(ms) ms += 100 sleep(0.1) def cart_tick(self) -> None: """Cart clock actions""" for i in range(self.horizontalLayout_Carts.count()): btn = self.horizontalLayout_Carts.itemAt(i).widget() if not btn: continue if btn.is_playing: if not btn.player.is_playing(): # Cart has finished playing btn.is_playing = False btn.setEnabled(True) # Setting to position 0 doesn't seem to work btn.player = self.music.VLC.media_player_new(btn.path) btn.player.audio_set_volume(Config.VOLUME_VLC_DEFAULT) colour = Config.COLOUR_CART_READY btn.setStyleSheet("background-color: " + colour + ";\n") btn.pgb.setValue(0) def clear_next(self) -> None: """ Clear next track """ track_sequence.next = PlaylistTrack() self.update_headers() def clear_selection(self) -> None: """Clear row selection""" # Unselect any selected rows if self.active_tab(): self.active_tab().clear_selection() # Clear the search bar self.search_playlist_clear() def closeEvent(self, event: Optional[QCloseEvent]) -> None: """Handle attempt to close main window""" if not event: return # Don't allow window to close when a track is playing if self.playing: event.ignore() helpers.show_warning( self, "Track playing", "Can't close application while track is playing" ) else: with db.Session() as session: settings = Settings.all_as_dict(session) record = settings["mainwindow_height"] if record.f_int != self.height(): record.update(session, {"f_int": self.height()}) record = settings["mainwindow_width"] if record.f_int != self.width(): record.update(session, {"f_int": self.width()}) record = settings["mainwindow_x"] if record.f_int != self.x(): record.update(session, {"f_int": self.x()}) record = settings["mainwindow_y"] if record.f_int != self.y(): record.update(session, {"f_int": self.y()}) # Save splitter settings splitter_sizes = self.splitter.sizes() assert len(splitter_sizes) == 2 splitter_top, splitter_bottom = splitter_sizes record = settings["splitter_top"] if record.f_int != splitter_top: record.update(session, {"f_int": splitter_top}) record = settings["splitter_bottom"] if record.f_int != splitter_bottom: record.update(session, {"f_int": splitter_bottom}) # Save tab number of open playlists for idx in range(self.tabPlaylist.count()): playlist_id = self.tabPlaylist.widget(idx).playlist_id playlist = session.get(Playlists, playlist_id) if playlist: playlist.tab = idx session.flush() # Save current tab record = settings["active_tab"] record.update(session, {"f_int": self.tabPlaylist.currentIndex()}) event.accept() def close_playlist_tab(self) -> bool: """ Close active playlist tab, called by menu item """ return self.close_tab(self.tabPlaylist.currentIndex()) def close_tab(self, tab_index: int) -> bool: """ Close playlist tab unless it holds the current or next track. Called from close_playlist_tab() or by clicking close button on tab. Return True if tab closed else False. """ # Don't close current track playlist current_track_playlist_id = track_sequence.now.playlist_id closing_tab_playlist_id = self.tabPlaylist.widget(tab_index).playlist_id if current_track_playlist_id: if closing_tab_playlist_id == current_track_playlist_id: self.show_status_message("Can't close current track playlist", 5000) return False # Record playlist as closed and update remaining playlist tabs with db.Session() as session: playlist = session.get(Playlists, closing_tab_playlist_id) if playlist: playlist.close() # Close playlist and remove tab self.tabPlaylist.widget(tab_index).close() self.tabPlaylist.removeTab(tab_index) return True def connect_signals_slots(self) -> None: self.action_About.triggered.connect(self.about) self.action_Clear_selection.triggered.connect(self.clear_selection) self.actionDebug.triggered.connect(self.debug) self.actionClosePlaylist.triggered.connect(self.close_playlist_tab) self.actionDeletePlaylist.triggered.connect(self.delete_playlist) self.actionDownload_CSV_of_played_tracks.triggered.connect( self.download_played_tracks ) self.actionExport_playlist.triggered.connect(self.export_playlist_tab) self.actionFade.triggered.connect(self.fade) self.actionImport.triggered.connect(self.import_track) self.actionInsertSectionHeader.triggered.connect(self.insert_header) self.actionInsertTrack.triggered.connect(self.insert_track) self.actionMark_for_moving.triggered.connect(self.cut_rows) self.actionMoveSelected.triggered.connect(self.move_selected) self.actionNew_from_template.triggered.connect(self.new_from_template) self.actionNewPlaylist.triggered.connect(self.create_and_show_playlist) self.actionOpenPlaylist.triggered.connect(self.open_playlist) self.actionPaste.triggered.connect(self.paste_rows) self.actionPlay_next.triggered.connect(self.play_next) self.actionRenamePlaylist.triggered.connect(self.rename_playlist) self.actionResume.triggered.connect(self.resume) self.actionSave_as_template.triggered.connect(self.save_as_template) self.actionSearch_title_in_Songfacts.triggered.connect( self.lookup_row_in_songfacts ) self.actionSearch_title_in_Wikipedia.triggered.connect( self.lookup_row_in_wikipedia ) self.actionSearch.triggered.connect(self.search_playlist) self.actionSelect_duplicate_rows.triggered.connect( lambda: self.active_tab().select_duplicate_rows() ) self.actionMoveUnplayed.triggered.connect(self.move_unplayed) self.actionSetNext.triggered.connect(self.set_selected_track_next) self.actionSkipToNext.triggered.connect(self.play_next) self.actionStop.triggered.connect(self.stop) self.btnDrop3db.clicked.connect(self.drop3db) self.btnFade.clicked.connect(self.fade) self.btnHidePlayed.clicked.connect(self.hide_played) self.btnPreview.clicked.connect(self.preview) self.btnStop.clicked.connect(self.stop) self.hdrCurrentTrack.clicked.connect(self.show_current) self.hdrNextTrack.clicked.connect(self.show_next) self.tabPlaylist.currentChanged.connect(self.tab_change) self.tabPlaylist.tabCloseRequested.connect(self.close_tab) self.tabBar = self.tabPlaylist.tabBar() self.txtSearch.textChanged.connect(self.search_playlist_text_changed) self.signals.enable_escape_signal.connect(self.enable_escape) self.signals.next_track_changed_signal.connect(self.update_headers) self.signals.status_message_signal.connect(self.show_status_message) self.signals.show_warning_signal.connect(self.show_warning) self.timer10.timeout.connect(self.tick_10ms) self.timer500.timeout.connect(self.tick_500ms) self.timer1000.timeout.connect(self.tick_1000ms) def create_playlist( self, session: scoped_session, playlist_name: Optional[str] = None ) -> Optional[Playlists]: """Create new playlist""" log.info(f"create_playlist({playlist_name=}") playlist_name = self.solicit_playlist_name(session) if not playlist_name: return None playlist = Playlists(session, playlist_name) if playlist: playlist.mark_open() return playlist else: log.error("Failed to create playlist") return None def create_and_show_playlist(self) -> None: """Create new playlist and display it""" with db.Session() as session: playlist = self.create_playlist(session) if playlist: self.create_playlist_tab(playlist) def create_playlist_tab(self, playlist: Playlists) -> int: """ Take the passed playlist database object, create a playlist tab and add tab to display. Return index number of tab. """ log.info(f"create_playlist_tab({playlist=})") playlist_tab = PlaylistTab( musicmuster=self, playlist_id=playlist.id, ) idx = self.tabPlaylist.addTab(playlist_tab, playlist.name) log.info(f"create_playlist_tab() returned: {idx=}") return idx def cut_rows(self) -> None: """ Cut rows ready for pasting. """ # Save the selected PlaylistRows items ready for a later # paste self.move_source_rows = self.active_tab().get_selected_rows() self.move_source_model = self.active_proxy_model() log.info(f"cut_rows(): {self.move_source_rows=} {self.move_source_model=}") def debug(self): """Invoke debugger""" visible_playlist_id = self.active_tab().playlist_id print(f"Active playlist id={visible_playlist_id}") import ipdb # type: ignore ipdb.set_trace() def delete_playlist(self) -> None: """ Delete current playlist """ with db.Session() as session: playlist_id = self.active_tab().playlist_id playlist = session.get(Playlists, playlist_id) if playlist: if helpers.ask_yes_no( "Delete playlist", f"Delete playlist '{playlist.name}': " "Are you sure?", ): if self.close_playlist_tab(): playlist.delete(session) else: log.error("Failed to retrieve playlist") def download_played_tracks(self) -> None: """Download a CSV of played tracks""" dlg = DownloadCSV(self) if dlg.exec(): start_dt = dlg.ui.dateTimeEdit.dateTime().toPyDateTime() # Get output filename pathspec = QFileDialog.getSaveFileName( self, "Save CSV of tracks played", directory="/tmp/playlist.csv", filter="CSV files (*.csv)", ) if not pathspec: return path = pathspec[0] if not path.endswith(".csv"): path += ".csv" with open(path, "w") as f: with db.Session() as session: for playdate in Playdates.played_after(session, start_dt): f.write(f"{playdate.track.artist},{playdate.track.title}\n") def drop3db(self) -> None: """Drop music level by 3db if button checked""" if self.btnDrop3db.isChecked(): self.music.set_volume(Config.VOLUME_VLC_DROP3db, set_default=False) else: self.music.set_volume(Config.VOLUME_VLC_DEFAULT, set_default=False) def enable_escape(self, enabled: bool) -> None: """ Manage signal to enable/disable handling ESC character. Needed because we want to use ESC when editing playlist in place, so we need to disable it here while editing. """ log.debug(f"enable_escape({enabled=})") self.action_Clear_selection.setEnabled(enabled) def export_playlist_tab(self) -> None: """Export the current playlist to an m3u file""" if not self.active_tab(): return playlist_id = self.active_tab().playlist_id with db.Session() as session: # Get output filename playlist = session.get(Playlists, playlist_id) if not playlist: return pathspec = QFileDialog.getSaveFileName( self, "Save Playlist", directory=f"{playlist.name}.m3u", filter="M3U files (*.m3u);;All files (*.*)", ) if not pathspec: return path = pathspec[0] if not path.endswith(".m3u"): path += ".m3u" # Get list of track rows for this playlist plrs = PlaylistRows.get_rows_with_tracks(session, playlist_id) with open(path, "w") as f: # Required directive on first line f.write("#EXTM3U\n") for track in [a.track for a in plrs]: if track.duration is None: track.duration = 0 f.write( "#EXTINF:" f"{int(track.duration / 1000)}," f"{track.title} - " f"{track.artist}" "\n" f"{track.path}" "\n" ) def fade(self) -> None: """Fade currently playing track""" self.stop_playing(fade=True) def get_playtime(self) -> int: """ Return number of milliseconds current track has been playing or zero if not playing. The vlc function get_time() only updates 3-4 times a second; this function has much better resolution. """ if track_sequence.now.track_id is None or track_sequence.now.start_time is None: return 0 now = dt.datetime.now() track_start = track_sequence.now.start_time elapsed_seconds = (now - track_start).total_seconds() return int(elapsed_seconds * 1000) def hide_played(self): """Toggle hide played tracks""" if self.hide_played_tracks: self.hide_played_tracks = False self.active_proxy_model().hide_played_tracks(False) self.btnHidePlayed.setText("Hide played") else: self.hide_played_tracks = True self.active_proxy_model().hide_played_tracks(True) self.btnHidePlayed.setText("Show played") # Reset row heights self.active_tab().resize_rows() def import_track(self) -> None: """Import track file""" dlg = QFileDialog() dlg.setFileMode(QFileDialog.FileMode.ExistingFiles) dlg.setViewMode(QFileDialog.ViewMode.Detail) dlg.setDirectory(Config.IMPORT_DESTINATION) dlg.setNameFilter("Music files (*.flac *.mp3)") if not dlg.exec(): return with db.Session() as session: new_tracks = [] for fname in dlg.selectedFiles(): txt = "" tags = helpers.get_tags(fname) title = tags["title"] if not title: helpers.show_warning( self, "Problem with track file", f"{fname} does not have a title tag", ) continue artist = tags["artist"] if not artist: helpers.show_warning( self, "Problem with track file", f"{fname} does not have an artist tag", ) continue count = 0 possible_matches = Tracks.search_titles(session, title) if possible_matches: txt += "Similar to new track " txt += f'"{title}" by "{artist} ({fname})":\n\n' for track in possible_matches: txt += f' "{track.title}" by {track.artist}' txt += f" ({track.path})\n\n" count += 1 if count >= Config.MAX_IMPORT_MATCHES: txt += "\nThere are more similar-looking tracks" break txt += "\n" # Check whether to proceed if there were potential matches txt += "Proceed with import?" result = QMessageBox.question( self, "Possible duplicates", txt, QMessageBox.StandardButton.Ok, QMessageBox.StandardButton.Cancel, ) if result == QMessageBox.StandardButton.Cancel: continue new_tracks.append(fname) # Import in separate thread self.import_thread = QThread() self.worker = ImportTrack( new_tracks, self.active_proxy_model(), self.active_tab().source_model_selected_row_number(), ) self.worker.moveToThread(self.import_thread) self.import_thread.started.connect(self.worker.run) self.worker.import_finished.connect(self.import_thread.quit) self.worker.import_finished.connect(self.worker.deleteLater) self.import_thread.finished.connect(self.import_thread.deleteLater) self.import_thread.start() def initialise_audacity(self) -> None: """ Initialise access to audacity """ try: self.audacity_client = pipeclient.PipeClient() log.info(f"{hex(id(self.audacity_client))=}") except RuntimeError as e: log.error(f"Unable to initialise Audacity: {str(e)}") def insert_header(self) -> None: """Show dialog box to enter header text and add to playlist""" proxy_model = self.active_proxy_model() if proxy_model is None: log.error("No proxy model") return # Get header text dlg: QInputDialog = QInputDialog(self) dlg.setInputMode(QInputDialog.InputMode.TextInput) dlg.setLabelText("Header text:") dlg.resize(500, 100) ok = dlg.exec() if ok: proxy_model.insert_row( proposed_row_number=self.active_tab().source_model_selected_row_number(), note=dlg.textValue(), ) def insert_track(self) -> None: """Show dialog box to select and add track from database""" new_row_number = ( self.active_tab().source_model_selected_row_number() or self.active_proxy_model().rowCount() ) with db.Session() as session: dlg = TrackSelectDialog( session=session, new_row_number=new_row_number, source_model=self.active_proxy_model(), ) dlg.exec() def load_last_playlists(self) -> None: """Load the playlists that were open when the last session closed""" playlist_ids = [] with db.Session() as session: for playlist in Playlists.get_open(session): if playlist: _ = self.create_playlist_tab(playlist) playlist_ids.append(playlist.id) log.debug(f"load_last_playlists() loaded {playlist=}") # Set active tab record = Settings.get_int_settings(session, "active_tab") if record.f_int is not None and record.f_int >= 0: self.tabPlaylist.setCurrentIndex(record.f_int) # Tabs may move during use. Rather than track where tabs # are, we record the tab index when we close the main # window. To avoid possible duplicate tab entries, we null # them all out now. Playlists.clear_tabs(session, playlist_ids) def lookup_row_in_songfacts(self) -> None: """ Display songfacts page for title in highlighted row """ row_number = self.active_tab().source_model_selected_row_number() if row_number is None: return track_info = self.active_proxy_model().get_row_info(row_number) if track_info is None: return self.signals.search_songfacts_signal.emit(track_info.title) def lookup_row_in_wikipedia(self) -> None: """ Display Wikipedia page for title in highlighted row """ row_number = self.active_tab().source_model_selected_row_number() if row_number is None: return track_info = self.active_proxy_model().get_row_info(row_number) if track_info is None: return self.signals.search_wikipedia_signal.emit(track_info.title) def move_playlist_rows(self, row_numbers: List[int]) -> None: """ Move passed playlist rows to another playlist """ # Identify destination playlist playlists = [] visible_tab = self.active_tab() source_playlist_id = visible_tab.playlist_id with db.Session() as session: for playlist in Playlists.get_all(session): if playlist.id == source_playlist_id: continue else: playlists.append(playlist) dlg = SelectPlaylistDialog(self, playlists=playlists, session=session) dlg.exec() if not dlg.playlist: return to_playlist_id = dlg.playlist.id # Get row number in destination playlist last_row = PlaylistRows.get_last_used_row(session, to_playlist_id) if last_row is not None: to_row = last_row + 1 else: to_row = 0 # Move rows self.active_proxy_model().move_rows_between_playlists( row_numbers, to_row, to_playlist_id ) def move_selected(self) -> None: """ Move selected rows to another playlist """ selected_rows = self.active_tab().get_selected_rows() if not selected_rows: return self.move_playlist_rows(selected_rows) def move_unplayed(self) -> None: """ Move unplayed rows to another playlist """ unplayed_rows = self.active_proxy_model().get_unplayed_rows() if not unplayed_rows: return # We can get a race condition as selected rows change while # moving so disable selected rows timing for move self.disable_selection_timing = True self.move_playlist_rows(unplayed_rows) self.disable_selection_timing = False def new_from_template(self) -> None: """Create new playlist from template""" with db.Session() as session: templates = Playlists.get_all_templates(session) dlg = SelectPlaylistDialog(self, playlists=templates, session=session) dlg.exec() template = dlg.playlist if template: playlist_name = self.solicit_playlist_name(session) if not playlist_name: log.error("Template has no name") return playlist = Playlists.create_playlist_from_template( session, template, playlist_name ) # Need to ensure that the new playlist is committed to # the database before it is opened by the model. session.commit() if playlist: log.error("Playlist failed to create") playlist.mark_open() self.create_playlist_tab(playlist) def open_playlist(self) -> None: """Open existing playlist""" with db.Session() as session: playlists = Playlists.get_closed(session) dlg = SelectPlaylistDialog(self, playlists=playlists, session=session) dlg.exec() playlist = dlg.playlist if playlist: idx = self.create_playlist_tab(playlist) playlist.mark_open() self.tabPlaylist.setCurrentIndex(idx) def paste_rows(self) -> None: """ Paste earlier cut rows. """ if self.move_source_rows is None or self.move_source_model is None: return to_playlist_model: PlaylistModel = self.active_tab().source_model selected_rows = self.active_tab().get_selected_rows() if selected_rows: destination_row = selected_rows[0] else: destination_row = self.active_proxy_model().rowCount() if ( to_playlist_model.playlist_id == self.move_source_model.source_model.playlist_id ): self.move_source_model.move_rows(self.move_source_rows, destination_row) else: self.move_source_model.move_rows_between_playlists( self.move_source_rows, destination_row, to_playlist_model.playlist_id ) self.active_tab().resize_rows() self.active_tab().clear_selection() def play_next(self, position: Optional[float] = None) -> None: """ Play next track, optionally from passed position. Actions required: - If there is no next track set, return. - If there's currently a track playing, fade it. - Move next track to current track. - Clear next track - Restore volume if -3dB active - Play (new) current track. - Ensure 100% volume - Show closing volume graph - Notify model - Note that track is now playing - Disable play next controls - Update headers """ # Check for inadvertent press of 'return' if self.catch_return_key: if not helpers.ask_yes_no( "Track playing", "Really play next track now?", default_yes=True, parent=self, ): return log.info(f"play_next({position=})") # If there is no next track set, return. if not track_sequence.next.track_id: log.error("musicmuster.play_next(): no next track selected") return if not track_sequence.next.path: log.error("musicmuster.play_next(): no path for next track") return # If there's currently a track playing, fade it. self.stop_playing(fade=True) # Move next track to current track. # stop_playing() above has called end_of_track_actions() # which will have populated self.previous_track track_sequence.now = track_sequence.next # Clear next track self.clear_next() # Restore volume if -3dB active if self.btnDrop3db.isChecked(): log.debug("Reset -3db button") self.btnDrop3db.setChecked(False) # Play (new) current track if not track_sequence.now.path: log.error("No path for next track") return self.music.play(track_sequence.now.path, position) # Ensure 100% volume # For as-yet unknown reasons. sometimes the volume gets # reset to zero within 200mS or so of starting play. This # only happened since moving to Debian 12, which uses # Pipewire for sound (which may be irrelevant). for _ in range(3): if self.music.player: volume = self.music.player.audio_get_volume() if volume < Config.VOLUME_VLC_DEFAULT: self.music.set_volume() log.debug(f"Reset from {volume=}") break sleep(0.1) # Show closing volume graph if track_sequence.now.fade_graph: track_sequence.now.fade_graph.plot() else: log.error("No fade_graph") # Note that track is playing log.debug("set track_sequence") track_sequence.now.start() self.playing = True # Disable play next controls self.catch_return_key = True self.show_status_message("Play controls: Disabled", 0) # Notify model self.active_proxy_model().current_track_started() # Update headers log.debug("update headers") self.update_headers() def preview(self) -> None: """ Preview selected or next track. We use a different mechanism to normal track playing so that the user can route the output audio differently (eg, to headphones). """ if self.btnPreview.isChecked(): # Get track path for first selected track if there is one track_path = self.active_tab().get_selected_row_track_path() if not track_path: # Otherwise get path to next track to play track_path = track_sequence.next.path if not track_path: self.btnPreview.setChecked(False) return mixer.music.load(track_path) mixer.music.play() else: mixer.music.stop() def rename_playlist(self) -> None: """ Rename current playlist """ with db.Session() as session: playlist_id = self.active_tab().playlist_id playlist = session.get(Playlists, playlist_id) if playlist: new_name = self.solicit_playlist_name(session, playlist.name) if new_name: playlist.rename(session, new_name) idx = self.tabBar.currentIndex() self.tabBar.setTabText(idx, new_name) def resume(self) -> None: """ Resume playing last track. We may be playing the next track or none; take care of both eventualities. Actions required: - Return if no saved position - Resume last track - If a track is playing, make that the next track """ log.info("resume()") # Return if no saved position if not track_sequence.previous.resume_marker: log.error("No previous track position") return # We want to use play_next() to resume, so copy the previous # track to the next track: track_sequence.next = track_sequence.previous # Now resume playing the now-next track self.play_next(track_sequence.next.resume_marker) # Adjust track info so that clocks and graph are correct. # We need to fake the start time to reflect where we resumed the # track if ( track_sequence.now.start_time and track_sequence.now.duration and track_sequence.now.resume_marker ): elapsed_ms = track_sequence.now.duration * track_sequence.now.resume_marker track_sequence.now.start_time -= dt.timedelta(milliseconds=elapsed_ms) def save_as_template(self) -> None: """Save current playlist as template""" with db.Session() as session: template_names = [a.name for a in Playlists.get_all_templates(session)] while True: # Get name for new template dlg = QInputDialog(self) dlg.setInputMode(QInputDialog.InputMode.TextInput) dlg.setLabelText("Template name:") dlg.resize(500, 100) ok = dlg.exec() if not ok: return template_name = dlg.textValue() if template_name not in template_names: break helpers.show_warning( self, "Duplicate template", "Template name already in use" ) Playlists.save_as_template( session, self.active_tab().playlist_id, template_name ) helpers.show_OK(self, "Template", "Template saved") def search_playlist(self) -> None: """Show text box to search playlist""" # Disable play controls so that 'return' in search box doesn't # play next track self.catch_return_key = True self.txtSearch.setHidden(False) self.txtSearch.setFocus() # Select any text that may already be there self.txtSearch.selectAll() def search_playlist_clear(self) -> None: """Tidy up and reset search bar""" # Clean up search bar self.txtSearch.setText("") self.txtSearch.setHidden(True) def search_playlist_text_changed(self) -> None: """ Incremental search of playlist """ self.active_proxy_model().set_incremental_search(self.txtSearch.text()) def select_next_row(self) -> None: """Select next or first row in playlist""" self.active_tab().select_next_row() def select_previous_row(self) -> None: """Select previous or first row in playlist""" self.active_tab().select_previous_row() def set_main_window_size(self) -> None: """Set size of window from database""" with db.Session() as session: settings = Settings.all_as_dict(session) record = settings["mainwindow_x"] x = record.f_int or 1 record = settings["mainwindow_y"] y = record.f_int or 1 record = settings["mainwindow_width"] width = record.f_int or 1599 record = settings["mainwindow_height"] height = record.f_int or 981 self.setGeometry(x, y, width, height) record = settings["splitter_top"] splitter_top = record.f_int or 256 record = settings["splitter_bottom"] splitter_bottom = record.f_int or 256 self.splitter.setSizes([splitter_top, splitter_bottom]) return def set_selected_track_next(self) -> None: """ Set currently-selected row on visible playlist tab as next track """ playlist_tab = self.active_tab() if playlist_tab: playlist_tab.set_row_as_next_track() else: log.error("No active tab") def set_tab_colour(self, widget: PlaylistTab, colour: QColor) -> None: """ Find the tab containing the widget and set the text colour """ idx = self.tabPlaylist.indexOf(widget) self.tabPlaylist.tabBar().setTabTextColor(idx, colour) def show_current(self) -> None: """Scroll to show current track""" self.show_track(track_sequence.now) def show_warning(self, title: str, body: str) -> None: """ Display a warning dialog """ print(f"show_warning({title=}, {body=})") QMessageBox.warning(self, title, body) def show_next(self) -> None: """Scroll to show next track""" self.show_track(track_sequence.next) def show_status_message(self, message: str, timing: int) -> None: """ Show status message in status bar for timing milliseconds """ self.statusbar.showMessage(message, timing) def show_track(self, plt: PlaylistTrack) -> None: """Scroll to show track in plt""" # Switch to the correct tab plt_playlist_id = plt.playlist_id if not plt_playlist_id: # No playlist return if plt_playlist_id != self.active_tab().playlist_id: for idx in range(self.tabPlaylist.count()): if self.tabPlaylist.widget(idx).playlist_id == plt_playlist_id: self.tabPlaylist.setCurrentIndex(idx) break display_row = ( self.active_proxy_model() .mapFromSource( self.active_proxy_model().source_model.index(plt.plr_rownum, 0) ) .row() ) self.tabPlaylist.currentWidget().scroll_to_top(display_row) def solicit_playlist_name( self, session: scoped_session, default: str = "" ) -> Optional[str]: """Get name of new playlist from user""" dlg = QInputDialog(self) dlg.setInputMode(QInputDialog.InputMode.TextInput) dlg.setLabelText("Playlist name:") while True: if default: dlg.setTextValue(default) dlg.resize(500, 100) ok = dlg.exec() if ok: proposed_name = dlg.textValue() if Playlists.name_is_available(session, proposed_name): return proposed_name else: helpers.show_warning( self, "Name in use", f"There's already a playlist called '{proposed_name}'", ) continue else: return None def stop(self) -> None: """Stop playing immediately""" self.stop_playing(fade=False) def stop_playing(self, fade: bool = True) -> None: """ Stop playing current track Actions required: - Set flag to say we're not playing a track - Return if not playing - Stop/fade track - Reset playlist_tab colour - Tell playlist_tab track has finished - Reset PlaylistTrack objects - Reset clocks - Reset fade graph - Update headers - Enable controls """ # Set flag to say we're not playing a track so that timer ticks # don't see player=None and kick off end-of-track actions if self.playing: self.playing = False else: # Return if not playing log.info("stop_playing() called but not playing") return # Stop/fade track track_sequence.now.resume_marker = self.music.get_position() if fade: self.music.fade() else: self.music.stop() # Reset fade graph if track_sequence.now.fade_graph: track_sequence.now.fade_graph.clear() # Reset track_sequence objects if track_sequence.now.track_id: track_sequence.previous = track_sequence.now track_sequence.now = PlaylistTrack() # Tell model previous track has finished self.active_proxy_model().previous_track_ended() # Reset clocks self.frame_fade.setStyleSheet("") self.frame_silent.setStyleSheet("") self.label_elapsed_timer.setText("00:00 / 00:00") self.label_fade_timer.setText("00:00") self.label_silent_timer.setText("00:00") # Update headers self.update_headers() # Enable controls self.catch_return_key = False self.show_status_message("Play controls: Enabled", 0) def tab_change(self): """Called when active tab changed""" self.active_tab().resize_rows() def tick_10ms(self) -> None: """ Called every 10ms """ # Update volume fade curve if ( track_sequence.now.fade_graph_start_updates is None or track_sequence.now.fade_graph_start_updates > datetime.now() ): return if ( track_sequence.now.track_id and track_sequence.now.fade_graph and track_sequence.now.start_time ): play_time = ( dt.datetime.now() - track_sequence.now.start_time ).total_seconds() * 1000 track_sequence.now.fade_graph.tick(play_time) def tick_500ms(self) -> None: """ Called every 500ms """ self.lblTOD.setText(dt.datetime.now().strftime(Config.TOD_TIME_FORMAT)) # Update carts # self.cart_tick() def tick_1000ms(self) -> None: """ Called every 1000ms """ # Ensure preview button is reset if preview finishes playing self.btnPreview.setChecked(mixer.music.get_busy()) # Only update play clocks once a second so that their updates # are synchronised (otherwise it looks odd) if not self.playing: return # If track is playing, update track clocks time and colours # There is a discrete time between starting playing a track and # player.is_playing() returning True, so assume playing if less # than Config.PLAY_SETTLE microseconds have passed since # starting play. if ( self.music.player and track_sequence.now.start_time and ( self.music.player.is_playing() or (dt.datetime.now() - track_sequence.now.start_time) < dt.timedelta(microseconds=Config.PLAY_SETTLE) ) ): playtime = self.get_playtime() time_to_fade = track_sequence.now.fade_at - playtime time_to_silence = track_sequence.now.silence_at - playtime # Elapsed time self.label_elapsed_timer.setText( helpers.ms_to_mmss(playtime) + " / " + helpers.ms_to_mmss(track_sequence.now.duration) ) # Time to fade self.label_fade_timer.setText(helpers.ms_to_mmss(time_to_fade)) # If silent in the next 5 seconds, put warning colour on # time to silence box and enable play controls if time_to_silence <= Config.WARNING_MS_BEFORE_SILENCE: css_silence = f"background: {Config.COLOUR_ENDING_TIMER}" if self.frame_silent.styleSheet() != css_silence: self.frame_silent.setStyleSheet(css_silence) self.catch_return_key = False self.show_status_message("Play controls: Enabled", 0) # Set warning colour on time to silence box when fade starts elif time_to_fade <= 500: css_fade = f"background: {Config.COLOUR_WARNING_TIMER}" if self.frame_silent.styleSheet() != css_fade: self.frame_silent.setStyleSheet(css_fade) # Five seconds before fade starts, set warning colour on # time to silence box and enable play controls elif time_to_fade <= Config.WARNING_MS_BEFORE_FADE: self.frame_fade.setStyleSheet( f"background: {Config.COLOUR_WARNING_TIMER}" ) self.catch_return_key = False self.show_status_message("Play controls: Enabled", 0) else: self.frame_silent.setStyleSheet("") self.frame_fade.setStyleSheet("") self.label_silent_timer.setText(helpers.ms_to_mmss(time_to_silence)) # Autoplay next track # if time_to_silence <= 1500: # self.play_next() else: if self.playing: self.stop_playing() def update_headers(self) -> None: """ Update last / current / next track headers """ if track_sequence.previous.title and track_sequence.previous.artist: self.hdrPreviousTrack.setText( f"{track_sequence.previous.title} - {track_sequence.previous.artist}" ) else: self.hdrPreviousTrack.setText("") if track_sequence.now.title and track_sequence.now.artist: self.hdrCurrentTrack.setText( f"{track_sequence.now.title.replace('&', '&&')} - " f"{track_sequence.now.artist.replace('&', '&&')}" ) else: self.hdrCurrentTrack.setText("") if track_sequence.next.title and track_sequence.next.artist: self.hdrNextTrack.setText( f"{track_sequence.next.title.replace('&', '&&')} - " f"{track_sequence.next.artist.replace('&', '&&')}" ) else: self.hdrNextTrack.setText("") class CartDialog(QDialog): """Edit cart details""" def __init__( self, musicmuster: Window, session: scoped_session, cart: Carts, *args, **kwargs ) -> None: """ Manage carts """ super().__init__(*args, **kwargs) self.musicmuster = musicmuster self.session = session self.ui = Ui_DialogCartEdit() self.ui.setupUi(self) self.path = cart.path self.ui.lblPath.setText(self.path) self.ui.lineEditName.setText(cart.name) self.ui.chkEnabled.setChecked(cart.enabled) self.setWindowTitle("Edit Cart " + str(cart.id)) self.ui.btnFile.clicked.connect(self.choose_file) def choose_file(self) -> None: """File select""" dlg = QFileDialog() dlg.setFileMode(QFileDialog.FileMode.ExistingFile) dlg.setViewMode(QFileDialog.ViewMode.Detail) dlg.setDirectory(Config.CART_DIRECTORY) dlg.setNameFilter("Music files (*.flac *.mp3)") if dlg.exec(): self.path = dlg.selectedFiles()[0] self.ui.lblPath.setText(self.path) class DownloadCSV(QDialog): def __init__(self, parent=None): super().__init__() self.ui = Ui_DateSelect() self.ui.setupUi(self) self.ui.dateTimeEdit.setDate(QDate.currentDate()) self.ui.dateTimeEdit.setTime(QTime(19, 59, 0)) self.ui.buttonBox.accepted.connect(self.accept) self.ui.buttonBox.rejected.connect(self.reject) class SelectPlaylistDialog(QDialog): def __init__(self, parent=None, playlists=None, session=None): super().__init__() if playlists is None: return self.ui = Ui_dlgSelectPlaylist() self.ui.setupUi(self) self.ui.lstPlaylists.itemDoubleClicked.connect(self.list_doubleclick) self.ui.buttonBox.accepted.connect(self.open) self.ui.buttonBox.rejected.connect(self.close) self.session = session self.playlist = None record = Settings.get_int_settings(self.session, "select_playlist_dialog_width") width = record.f_int or 800 record = Settings.get_int_settings( self.session, "select_playlist_dialog_height" ) height = record.f_int or 600 self.resize(width, height) for playlist in playlists: p = QListWidgetItem() p.setText(playlist.name) p.setData(Qt.ItemDataRole.UserRole, playlist) self.ui.lstPlaylists.addItem(p) def __del__(self): # review record = Settings.get_int_settings( self.session, "select_playlist_dialog_height" ) if record.f_int != self.height(): record.update(self.session, {"f_int": self.height()}) record = Settings.get_int_settings(self.session, "select_playlist_dialog_width") if record.f_int != self.width(): record.update(self.session, {"f_int": self.width()}) def list_doubleclick(self, entry): # review self.playlist = entry.data(Qt.ItemDataRole.UserRole) self.accept() def open(self): # review if self.ui.lstPlaylists.selectedItems(): item = self.ui.lstPlaylists.currentItem() self.playlist = item.data(Qt.ItemDataRole.UserRole) self.accept() if __name__ == "__main__": """ If command line arguments given, carry out requested function and exit. Otherwise run full application. """ p = argparse.ArgumentParser() # Only allow at most one option to be specified group = p.add_mutually_exclusive_group() group.add_argument( "-b", "--bitrates", action="store_true", dest="update_bitrates", default=False, help="Update bitrates in database", ) group.add_argument( "-c", "--check-database", action="store_true", dest="check_db", default=False, help="Check and report on database", ) args = p.parse_args() # Run as required if args.check_db: log.debug("Checking database") with db.Session() as session: check_db(session) elif args.update_bitrates: log.debug("Update bitrates") with db.Session() as session: update_bitrates(session) else: try: app = QApplication(sys.argv) # PyQt6 defaults to a grey for labels palette = app.palette() palette.setColor( QPalette.ColorRole.WindowText, QColor(Config.COLOUR_LABEL_TEXT) ) # Set colours that will be used by playlist row stripes palette.setColor( QPalette.ColorRole.Base, QColor(Config.COLOUR_EVEN_PLAYLIST) ) palette.setColor( QPalette.ColorRole.AlternateBase, QColor(Config.COLOUR_ODD_PLAYLIST) ) app.setPalette(palette) win = Window() win.show() status = app.exec() sys.exit(status) except Exception as exc: if os.environ["MM_ENV"] == "PRODUCTION": from helpers import send_mail msg = stackprinter.format(exc) send_mail( Config.ERRORS_TO, Config.ERRORS_FROM, "Exception from musicmuster", msg, ) else: print("\033[1;31;47mUnhandled exception starts") print( stackprinter.format( exc, suppressed_paths=["/pypoetry/virtualenvs/"], style="darkbg" ) ) print("Unhandled exception ends\033[1;37;40m")