#!/usr/bin/env python3

from datetime import datetime, timedelta
from os.path import basename
from time import sleep
from typing import (
    cast,
    List,
    Optional,
    Sequence,
)

import argparse
import csv
import os
import subprocess
import sys
import threading

from pygame import mixer
from PyQt6.QtCore import (
    pyqtSignal,
    QDate,
    QEvent,
    QObject,
    QSize,
    Qt,
    QThread,
    QTime,
    QTimer,
)
from PyQt6.QtGui import (
    QCloseEvent,
    QColor,
    QFont,
    QMouseEvent,
    QPalette,
    QResizeEvent,
)
from PyQt6.QtWidgets import (
    QApplication,
    QDialog,
    QFileDialog,
    QInputDialog,
    QLabel,
    QLineEdit,
    QListWidgetItem,
    QMainWindow,
    QMessageBox,
    QProgressBar,
    QPushButton,
)
from sqlalchemy import text
import stackprinter  # type: ignore

from classes import (
    track_sequence,
    FadeCurve,
    MusicMusterSignals,
    PlaylistTrack,
)
from config import Config
from dbconfig import (
    engine,
    scoped_session,
    Session,
)
from dialogs import TrackSelectDialog
from log import log
from models import Base, Carts, Playdates, PlaylistRows, Playlists, Settings, Tracks
from playlistmodel import PlaylistModel
from playlists import PlaylistTab
from ui.dlg_cart_ui import Ui_DialogCartEdit  # type: ignore
from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist  # type: ignore
from ui.downloadcsv_ui import Ui_DateSelect  # type: ignore
from ui.main_window_ui import Ui_MainWindow  # type: ignore
from utilities import check_db, update_bitrates
import helpers
import icons_rc  # noqa F401
import music


class CartButton(QPushButton):
    """Button for playing carts"""

    # Emitted with the elapsed play time (ms); drives the progress bar
    progress = pyqtSignal(int)

    def __init__(self, musicmuster: "Window", cart: Carts, *args, **kwargs):
        """
        Create a cart pushbutton.

        The button is created enabled; Window.cart_configure() decides
        whether it stays enabled once the cart data has been checked.
        """

        super().__init__(*args, **kwargs)
        self.musicmuster = musicmuster
        self.cart_id = cart.id
        # Fill in a missing duration from the file's tags
        if cart.path and cart.enabled and not cart.duration:
            tags = helpers.get_tags(cart.path)
            cart.duration = tags["duration"]

        self.duration = cart.duration
        self.path = cart.path
        self.player = None
        self.is_playing = False

        self.setEnabled(True)
        self.setMinimumSize(QSize(147, 61))
        font = QFont()
        font.setPointSize(14)
        self.setFont(font)
        self.setObjectName("cart_" + str(cart.cart_number))

        # Progress bar drawn along the top edge of the button
        self.pgb = QProgressBar(self)
        self.pgb.setTextVisible(False)
        self.pgb.setVisible(False)
        palette = self.pgb.palette()
        palette.setColor(
            QPalette.ColorRole.Highlight, QColor(Config.COLOUR_CART_PROGRESSBAR)
        )
        self.pgb.setPalette(palette)
        self.pgb.setGeometry(0, 0, self.width(), 10)
        self.pgb.setMinimum(0)
        self.pgb.setMaximum(1)
        self.pgb.setValue(0)

        self.progress.connect(self.pgb.setValue)

    def __repr__(self) -> str:
        return (
            f"<CartButton(cart_id={self.cart_id}, path={self.path}, "
            f"is_playing={self.is_playing})>"
        )

    def event(self, event: Optional[QEvent]) -> bool:
        """Allow right click even when button is disabled"""

        if not event:
            return False

        if event.type() == QEvent.Type.MouseButtonRelease:
            mouse_event = cast(QMouseEvent, event)
            if mouse_event.button() == Qt.MouseButton.RightButton:
                self.musicmuster.cart_edit(self, event)
                return True

        return super().event(event)

    def resizeEvent(self, event: Optional[QResizeEvent]) -> None:
        """Resize progress bar when button size changes"""

        self.pgb.setGeometry(0, 0, self.width(), 10)


class ImportTrack(QObject):
    """Worker object that imports track files; run() is started by a QThread"""

    import_error = pyqtSignal(str)
    importing = pyqtSignal(str)
    finished = pyqtSignal(PlaylistTab)

    def __init__(self, playlist: PlaylistTab, filenames: list, row: int) -> None:
        super().__init__()
        self.filenames = filenames
        self.playlist = playlist
        self.row = row

    def run(self):
        """
        Create track objects from passed files and add to visible playlist

        Emits `importing` before each file, `import_error` if a track
        cannot be created, and always emits `finished` so that the
        hosting thread is shut down.
        """

        target_row = self.row
        with Session() as session:
            for fname in self.filenames:
                self.importing.emit(f"Importing {basename(fname)}")
                metadata = helpers.get_file_metadata(fname)
                try:
                    track = Tracks(session, **metadata)
                except Exception as e:
                    # Report the failure and stop. "finished" must still
                    # be emitted: it is what quits the worker thread.
                    log.error(f"ImportTrack.run(): error importing {fname}: {e}")
                    self.import_error.emit(f"{basename(fname)}: {e}")
                    self.finished.emit(self.playlist)
                    return
                helpers.normalise_track(track.path)
                self.playlist.insert_track(session, track, target_row)
                # Insert next row under this one
                target_row += 1
                # We're importing potentially multiple tracks in a loop.
                # If there's an error adding the track to the Tracks
                # table, the session will rollback, thus losing any
                # previous additions in this loop. So, commit now to
                # lock in what we've just done.
                session.commit()
            self.playlist.save_playlist(session)
        self.finished.emit(self.playlist)


class Window(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None, *args, **kwargs) -> None:
        super().__init__(parent)
        self.setupUi(self)

        self.timer10: QTimer = QTimer()
        self.timer500: QTimer = QTimer()
        self.timer1000: QTimer = QTimer()

        self.music: music.Music = music.Music()
        self.playing: bool = False

        # Rows remembered by cut_rows() for a later paste_rows()
        self.selected_plrs: Optional[List[PlaylistRows]] = None

        self.set_main_window_size()
        self.lblSumPlaytime = QLabel("")
        self.statusbar.addPermanentWidget(self.lblSumPlaytime)
        self.txtSearch = QLineEdit()
        self.statusbar.addWidget(self.txtSearch)
        self.txtSearch.setHidden(True)
        self.hide_played_tracks = False
        # pygame mixer is used for previewing tracks (see preview())
        mixer.init()
        self.widgetFadeVolume.hideAxis("bottom")
        self.widgetFadeVolume.hideAxis("left")
        self.widgetFadeVolume.setDefaultPadding(0)
        self.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND)
        FadeCurve.GraphWidget = self.widgetFadeVolume

        self.active_tab = lambda: self.tabPlaylist.currentWidget()
        self.active_model = lambda: self.tabPlaylist.currentWidget().model()

        self.load_last_playlists()
        if Config.CARTS_HIDE:
            self.cartsWidget.hide()
            self.frame_6.hide()
        else:
            self.carts_init()
        self.enable_play_next_controls()
        self.clock_counter = 0
        self.timer10.start(10)
        self.timer500.start(500)
        self.timer1000.start(1000)
        self.signals = MusicMusterSignals()
        self.connect_signals_slots()

    def about(self) -> None:
        """Get git tag and database name"""

        try:
            git_tag = (
                subprocess.check_output(["git", "describe"], stderr=subprocess.STDOUT)
                .decode(errors="replace")
                .strip()
            )
        except subprocess.CalledProcessError as exc_info:
            # git ran but failed: show what it said
            git_tag = (exc_info.output or b"").decode(errors="replace").strip()
        except OSError:
            # git could not be run at all (eg, not installed)
            git_tag = "unknown"

        dbname = None
        with Session() as session:
            if session.bind:
                dbname = session.bind.engine.url.database

        QMessageBox.information(
            self,
            "About",
            f"MusicMuster {git_tag}\n\nDatabase: {dbname}",
            QMessageBox.StandardButton.Ok,
        )

    def cart_configure(self, cart: Carts, btn: CartButton) -> None:
        """Configure button with cart data"""

        btn.setEnabled(False)
        btn.pgb.setVisible(False)
        # Keep the button's copy of the duration in step with the cart
        btn.duration = cart.duration

        if cart.path:
            if not helpers.file_is_unreadable(cart.path):
                colour = Config.COLOUR_CART_READY
                btn.path = cart.path
                btn.player = self.music.VLC.media_player_new(cart.path)
                if btn.player:
                    btn.player.audio_set_volume(Config.VOLUME_VLC_DEFAULT)
                if cart.enabled:
                    btn.setEnabled(True)
                    btn.pgb.setVisible(True)
            else:
                colour = Config.COLOUR_CART_ERROR
        else:
            colour = Config.COLOUR_CART_UNCONFIGURED

        btn.setStyleSheet("background-color: " + colour + ";\n")
        if cart.name is not None:
            btn.setText(cart.name)

    def cart_click(self) -> None:
        """Handle cart click"""

        btn = self.sender()
        if not isinstance(btn, CartButton):
            return

        if helpers.file_is_unreadable(btn.path):
            colour = Config.COLOUR_CART_ERROR
        else:
            # Check for a player before disabling the button, otherwise
            # the button would be left permanently disabled
            if not btn.player:
                log.debug(f"musicmuster.cart_click(): no player assigned ({btn=})")
                return
            # Don't allow clicks while we're playing
            btn.setEnabled(False)
            # Size the progress bar here, in the GUI thread; the worker
            # thread only emits the progress signal
            if btn.duration:
                btn.pgb.setMaximum(btn.duration)
            btn.player.play()
            btn.is_playing = True
            colour = Config.COLOUR_CART_PLAYING
            thread = threading.Thread(target=self.cart_progressbar, args=(btn,))
            thread.start()

        btn.setStyleSheet("background-color: " + colour + ";\n")
        btn.pgb.setMinimum(0)

    def cart_edit(self, btn: CartButton, event: QEvent):
        """Handle context menu for cart button"""

        with Session() as session:
            cart = session.get(Carts, btn.cart_id)
            if cart is None:
                log.error("cart_edit: cart not found")
                return

            dlg = CartDialog(musicmuster=self, session=session, cart=cart)
            if not dlg.exec():
                return

            name = dlg.ui.lineEditName.text()
            if not name:
                QMessageBox.warning(self, "Error", "Name required")
                return

            path = dlg.path
            if not path:
                QMessageBox.warning(self, "Error", "Filename required")
                return

            # Take the duration from the file that was just chosen, not
            # from whatever file the cart pointed at before the edit
            if not helpers.file_is_unreadable(path):
                tags = helpers.get_tags(path)
                cart.duration = tags["duration"]

            cart.enabled = dlg.ui.chkEnabled.isChecked()
            cart.name = name
            cart.path = path

            session.add(cart)
            session.commit()
            self.cart_configure(cart, btn)

    def carts_init(self) -> None:
        """Initialise carts data structures"""

        with Session() as session:
            # Number carts from 1 for humanity
            for cart_number in range(1, Config.CARTS_COUNT + 1):
                cart = session.get(Carts, cart_number)
                if cart is None:
                    cart = Carts(session, cart_number, name=f"Cart #{cart_number}")

                btn = CartButton(self, cart)
                btn.clicked.connect(self.cart_click)
                # Insert button on left of cart space starting at
                # location zero
                self.horizontalLayout_Carts.insertWidget(cart.id - 1, btn)
                # Configure button
                self.cart_configure(cart, btn)

    def cart_progressbar(self, btn: CartButton) -> None:
        """
        Manage progress bar

        Runs in a worker thread started by cart_click(). It only emits
        btn.progress; the progress bar maximum is set by cart_click().
        """

        if not btn.duration:
            return

        ms = 0
        while ms <= btn.duration:
            btn.progress.emit(ms)
            ms += 100
            sleep(0.1)

    def cart_tick(self) -> None:
        """Cart clock actions"""

        for i in range(self.horizontalLayout_Carts.count()):
            btn = self.horizontalLayout_Carts.itemAt(i).widget()
            # Skip empty slots and anything that isn't a cart button
            if not isinstance(btn, CartButton):
                continue
            if not btn.is_playing:
                continue
            if btn.player and btn.player.is_playing():
                continue

            # Cart has finished playing
            btn.is_playing = False
            btn.setEnabled(True)
            # Setting to position 0 doesn't seem to work
            btn.player = self.music.VLC.media_player_new(btn.path)
            if btn.player:
                btn.player.audio_set_volume(Config.VOLUME_VLC_DEFAULT)
            colour = Config.COLOUR_CART_READY
            btn.setStyleSheet("background-color: " + colour + ";\n")
            btn.pgb.setValue(0)

    def clear_next(self) -> None:
        """
        Clear next track
        """

        track_sequence.next = PlaylistTrack()
        self.update_headers()

    def clear_selection(self) -> None:
        """Clear selected row"""

        # Unselect any selected rows
        if self.active_tab():
            self.active_tab().clear_selection()

        # Clear the search bar
        self.search_playlist_clear()

    def closeEvent(self, event: Optional[QCloseEvent]) -> None:
        """Handle attempt to close main window"""

        if not event:
            return

        # Don't allow window to close when a track is playing
        if self.playing:
            event.ignore()
            helpers.show_warning(
                self, "Track playing", "Can't close application while track is playing"
            )
            return

        with Session() as session:
            settings = Settings.all_as_dict(session)

            def save_if_changed(name: str, value: int) -> None:
                """Update the named setting only if its value differs"""
                record = settings[name]
                if record.f_int != value:
                    record.update(session, {"f_int": value})

            # Save window geometry
            save_if_changed("mainwindow_height", self.height())
            save_if_changed("mainwindow_width", self.width())
            save_if_changed("mainwindow_x", self.x())
            save_if_changed("mainwindow_y", self.y())

            # Save splitter settings
            splitter_sizes = self.splitter.sizes()
            assert len(splitter_sizes) == 2
            splitter_top, splitter_bottom = splitter_sizes
            save_if_changed("splitter_top", splitter_top)
            save_if_changed("splitter_bottom", splitter_bottom)

            # Save current tab
            record = settings["active_tab"]
            record.update(session, {"f_int": self.tabPlaylist.currentIndex()})

        event.accept()

    def close_playlist_tab(self) -> bool:
        """
        Close active playlist tab, called by menu item
        """

        return self.close_tab(self.tabPlaylist.currentIndex())

    def close_tab(self, tab_index: int) -> bool:
        """
        Close playlist tab unless it holds the current or next track.

        Called from close_playlist_tab() or by clicking close button on tab.

        Return True if tab closed else False.

        NOTE: currently a stub that always returns False; see TODO below.
        """

        return False
        # TODO Reimplement without using self.current_track.playlist_tab
        # # Don't close current track playlist
        # if self.tabPlaylist.widget(tab_index) == (self.current_track.playlist_tab):
        #     self.statusbar.showMessage("Can't close current track playlist", 5000)
        #     return False

        # # Attempt to close next track playlist
        # if self.tabPlaylist.widget(tab_index) == self.next_track.playlist_tab:
        #     self.next_track.playlist_tab.clear_next()

        # # Record playlist as closed and update remaining playlist tabs
        # with Session() as session:
        #     playlist_id = self.tabPlaylist.widget(tab_index).playlist_id
        #     playlist = session.get(Playlists, playlist_id)
        #     if playlist:
        #         playlist.close(session)

        # # Close playlist and remove tab
        # self.tabPlaylist.widget(tab_index).close()
        # self.tabPlaylist.removeTab(tab_index)

        # return True

    def connect_signals_slots(self) -> None:
        """Connect menu actions, buttons, tabs, signals and timers to handlers"""

        self.action_About.triggered.connect(self.about)
        self.action_Clear_selection.triggered.connect(self.clear_selection)
        self.actionDebug.triggered.connect(self.debug)
        self.actionClosePlaylist.triggered.connect(self.close_playlist_tab)
        self.actionDeletePlaylist.triggered.connect(self.delete_playlist)
        self.actionDownload_CSV_of_played_tracks.triggered.connect(
            self.download_played_tracks
        )
        self.actionEnable_controls.triggered.connect(self.enable_play_next_controls)
        self.actionExport_playlist.triggered.connect(self.export_playlist_tab)
        self.actionFade.triggered.connect(self.fade)
        self.actionFind_next.triggered.connect(
            lambda: self.tabPlaylist.currentWidget().search_next()
        )
        self.actionFind_previous.triggered.connect(
            lambda: self.tabPlaylist.currentWidget().search_previous()
        )
        self.actionImport.triggered.connect(self.import_track)
        self.actionInsertSectionHeader.triggered.connect(self.insert_header)
        self.actionInsertTrack.triggered.connect(self.insert_track)
        self.actionMark_for_moving.triggered.connect(self.cut_rows)
        self.actionMoveSelected.triggered.connect(self.move_selected)
        self.actionNew_from_template.triggered.connect(self.new_from_template)
        self.actionNewPlaylist.triggered.connect(self.create_and_show_playlist)
        self.actionOpenPlaylist.triggered.connect(self.open_playlist)
        self.actionPaste.triggered.connect(self.paste_rows)
        self.actionPlay_next.triggered.connect(self.play_next)
        self.actionRenamePlaylist.triggered.connect(self.rename_playlist)
        self.actionResume.triggered.connect(self.resume)
        self.actionSave_as_template.triggered.connect(self.save_as_template)
        self.actionSearch_title_in_Songfacts.triggered.connect(
            lambda: self.tabPlaylist.currentWidget().lookup_row_in_songfacts()
        )
        self.actionSearch_title_in_Wikipedia.triggered.connect(
            lambda: self.tabPlaylist.currentWidget().lookup_row_in_wikipedia()
        )
        self.actionSearch.triggered.connect(self.search_playlist)
        self.actionSelect_duplicate_rows.triggered.connect(self.select_duplicate_rows)
        self.actionSelect_next_track.triggered.connect(self.select_next_row)
        self.actionSelect_previous_track.triggered.connect(self.select_previous_row)
        self.actionMoveUnplayed.triggered.connect(self.move_unplayed)
        self.actionSetNext.triggered.connect(self.set_selected_track_next)
        self.actionSkipToNext.triggered.connect(self.play_next)
        self.actionStop.triggered.connect(self.stop)

        self.btnDrop3db.clicked.connect(self.drop3db)
        self.btnFade.clicked.connect(self.fade)
        self.btnHidePlayed.clicked.connect(self.hide_played)
        self.btnPreview.clicked.connect(self.preview)
        self.btnStop.clicked.connect(self.stop)
        self.hdrCurrentTrack.clicked.connect(self.show_current)
        self.hdrNextTrack.clicked.connect(self.show_next)
        self.tabPlaylist.currentChanged.connect(self.tab_change)
        self.tabPlaylist.tabCloseRequested.connect(self.close_tab)
        self.tabBar = self.tabPlaylist.tabBar()
        self.tabBar.tabMoved.connect(self.move_tab)
        self.txtSearch.returnPressed.connect(self.search_playlist_return)

        self.signals.enable_escape_signal.connect(self.enable_escape)
        self.signals.next_track_changed_signal.connect(self.update_headers)

        self.timer10.timeout.connect(self.tick_10ms)
        self.timer500.timeout.connect(self.tick_500ms)
        self.timer1000.timeout.connect(self.tick_1000ms)

    def create_playlist(
        self, session: scoped_session, playlist_name: Optional[str] = None
    ) -> Optional[Playlists]:
        """
        Create new playlist

        Use the passed playlist_name if given, otherwise ask the user
        for one. Return the new playlist, or None if no name was
        supplied.
        """

        if not playlist_name:
            playlist_name = self.solicit_playlist_name()
        if not playlist_name:
            return None

        playlist = Playlists(session, playlist_name)
        return playlist

    def create_and_show_playlist(self) -> None:
        """Create new playlist and display it"""

        with Session() as session:
            playlist = self.create_playlist(session)
            if playlist:
                self.create_playlist_tab(session, playlist)

    def create_playlist_tab(self, session: scoped_session, playlist: Playlists) -> int:
        """
        Take the passed playlist database object, create a playlist tab and
        add tab to display. Return index number of tab.
        """

        assert playlist.id

        playlist_tab = PlaylistTab(
            musicmuster=self,
            playlist_id=playlist.id,
        )
        idx = self.tabPlaylist.addTab(playlist_tab, playlist.name)
        self.tabPlaylist.setCurrentIndex(idx)

        return idx

    def cut_rows(self) -> None:
        """
        Cut rows ready for pasting.
        """

        with Session() as session:
            # Save the selected PlaylistRows items ready for a later
            # paste
            self.selected_plrs = self.active_tab().get_selected_playlistrows(session)

    def debug(self):
        """Invoke debugger"""

        visible_playlist_id = self.active_tab().playlist_id
        print(f"Active playlist id={visible_playlist_id}")
        import ipdb  # type: ignore

        ipdb.set_trace()

    def delete_playlist(self) -> None:
        """
        Delete current playlist
        """

        with Session() as session:
            playlist_id = self.active_tab().playlist_id
            playlist = session.get(Playlists, playlist_id)
            if playlist:
                if helpers.ask_yes_no(
                    "Delete playlist",
                    f"Delete playlist '{playlist.name}': " "Are you sure?",
                ):
                    # Only delete if the tab could actually be closed
                    if self.close_playlist_tab():
                        playlist.delete(session)

    def disable_play_next_controls(self) -> None:
        """
        Disable "play next" keyboard controls
        """

        self.actionPlay_next.setEnabled(False)
        self.statusbar.showMessage("Play controls: Disabled", 0)

    def download_played_tracks(self) -> None:
        """Download a CSV of played tracks"""

        dlg = DownloadCSV(self)
        if not dlg.exec():
            return

        start_dt = dlg.ui.dateTimeEdit.dateTime().toPyDateTime()
        # Get output filename. getSaveFileName() returns a
        # (filename, selected_filter) tuple; the filename is empty if
        # the user cancelled.
        path, _ = QFileDialog.getSaveFileName(
            self,
            "Save CSV of tracks played",
            directory="/tmp/playlist.csv",
            filter="CSV files (*.csv)",
        )
        if not path:
            return
        if not path.endswith(".csv"):
            path += ".csv"

        with open(path, "w", newline="") as f:
            # Use the csv module so that commas and quotes in artist or
            # title are escaped correctly
            writer = csv.writer(f, lineterminator="\n")
            with Session() as session:
                for playdate in Playdates.played_after(session, start_dt):
                    writer.writerow([playdate.track.artist, playdate.track.title])

    def drop3db(self) -> None:
        """Drop music level by 3db if button checked"""

        if self.btnDrop3db.isChecked():
            self.music.set_volume(Config.VOLUME_VLC_DROP3db, set_default=False)
        else:
            self.music.set_volume(Config.VOLUME_VLC_DEFAULT, set_default=False)

    def enable_escape(self, enabled: bool) -> None:
        """
        Manage signal to enable/disable handling ESC character.

        Needed because we want to use ESC when editing playlist in place,
        so we need to disable it here while editing.
        """

        self.action_Clear_selection.setEnabled(enabled)

    def enable_play_next_controls(self) -> None:
        """
        Enable "play next" keyboard controls
        """

        self.actionPlay_next.setEnabled(True)
        self.statusbar.showMessage("Play controls: Enabled", 0)

    def export_playlist_tab(self) -> None:
        """Export the current playlist to an m3u file"""

        if not self.active_tab():
            return

        playlist_id = self.active_tab().playlist_id

        with Session() as session:
            # Get output filename
            playlist = session.get(Playlists, playlist_id)
            if not playlist:
                return

            # getSaveFileName() returns a (filename, selected_filter)
            # tuple; the filename is empty if the user cancelled.
            path, _ = QFileDialog.getSaveFileName(
                self,
                "Save Playlist",
                directory=f"{playlist.name}.m3u",
                filter="M3U files (*.m3u);;All files (*.*)",
            )
            if not path:
                return
            if not path.endswith(".m3u"):
                path += ".m3u"

            # Get list of track rows for this playlist
            plrs = PlaylistRows.get_rows_with_tracks(session, playlist_id)
            with open(path, "w") as f:
                # Required directive on first line
                f.write("#EXTM3U\n")
                for track in (a.track for a in plrs):
                    # Treat a missing duration as zero without modifying
                    # the database object
                    duration = track.duration or 0
                    f.write(
                        "#EXTINF:"
                        f"{int(duration / 1000)},"
                        f"{track.title} - "
                        f"{track.artist}"
                        "\n"
                        f"{track.path}"
                        "\n"
                    )

    def fade(self) -> None:
        """Fade currently playing track"""

        self.stop_playing(fade=True)

    def get_playtime(self) -> int:
        """
        Return number of milliseconds current track has been playing or
        zero if not playing. The vlc function get_time() only updates 3-4
        times a second; this function has much better resolution.
        """

        if track_sequence.now.track_id is None or track_sequence.now.start_time is None:
            return 0

        now = datetime.now()
        track_start = track_sequence.now.start_time
        elapsed_seconds = (now - track_start).total_seconds()
        return int(elapsed_seconds * 1000)

    def hide_played(self):
        """Toggle hide played tracks"""

        if self.hide_played_tracks:
            self.hide_played_tracks = False
            self.btnHidePlayed.setText("Hide played")
        else:
            self.hide_played_tracks = True
            self.btnHidePlayed.setText("Show played")

        # Update displayed playlist
        self.active_tab().hide_or_show_played_tracks()

    def import_track(self) -> None:
        """Import track file"""

        dlg = QFileDialog()
        dlg.setFileMode(QFileDialog.FileMode.ExistingFiles)
        dlg.setViewMode(QFileDialog.ViewMode.Detail)
        dlg.setDirectory(Config.IMPORT_DESTINATION)
        dlg.setNameFilter("Music files (*.flac *.mp3)")

        if not dlg.exec():
            return

        with Session() as session:
            new_tracks = []
            for fname in dlg.selectedFiles():
                txt = ""
                tags = helpers.get_tags(fname)
                title = tags["title"]
                artist = tags["artist"]
                count = 0
                possible_matches = Tracks.search_titles(session, title)
                if possible_matches:
                    txt += "Similar to new track "
                    txt += f'"{title}" by "{artist} ({fname})":\n\n'
                    for track in possible_matches:
                        txt += f' "{track.title}" by {track.artist}'
                        txt += f" ({track.path})\n\n"
                        count += 1
                        if count >= Config.MAX_IMPORT_MATCHES:
                            txt += "\nThere are more similar-looking tracks"
                            break
                    txt += "\n"
                    # Check whether to proceed if there were potential matches
                    txt += "Proceed with import?"
                    # Both buttons must be OR-ed into the "buttons"
                    # argument; the following argument is the default
                    # button.
                    result = QMessageBox.question(
                        self,
                        "Possible duplicates",
                        txt,
                        QMessageBox.StandardButton.Ok
                        | QMessageBox.StandardButton.Cancel,
                        QMessageBox.StandardButton.Cancel,
                    )
                    if result == QMessageBox.StandardButton.Cancel:
                        continue
                new_tracks.append(fname)

        # Import in separate thread
        self.import_thread = QThread()
        self.worker = ImportTrack(
            self.active_tab(),
            new_tracks,
            self.active_tab().get_new_row_number(),
        )
        self.worker.moveToThread(self.import_thread)
        self.import_thread.started.connect(self.worker.run)
        self.worker.finished.connect(self.import_thread.quit)
        self.worker.finished.connect(self.worker.deleteLater)
        self.import_thread.finished.connect(self.import_thread.deleteLater)
        self.worker.import_error.connect(
            lambda msg: helpers.show_warning(
                self, "Import error", "Error importing " + msg
            )
        )
        self.worker.importing.connect(lambda msg: self.statusbar.showMessage(msg, 5000))
        self.worker.finished.connect(self.import_complete)
        self.import_thread.start()

    def import_complete(self):
        """
        Called by thread when track import complete
        """

        self.statusbar.showMessage("Imports complete")

    def insert_header(self) -> None:
        """Show dialog box to enter header text and add to playlist"""

        try:
            model = cast(PlaylistModel, self.active_tab().model())
            if model is None:
                return
        except AttributeError:
            # Just return if there's no visible playlist tab model
            return

        # Get header text
        dlg: QInputDialog = QInputDialog(self)
        dlg.setInputMode(QInputDialog.InputMode.TextInput)
        dlg.setLabelText("Header text:")
        dlg.resize(500, 100)
        ok = dlg.exec()
        if ok:
            model.insert_header_row(
                self.active_tab().get_selected_row_number(), dlg.textValue()
            )

    def insert_track(self) -> None:
        """Show dialog box to select and add track from database"""

        with Session() as session:
            dlg = TrackSelectDialog(
                session=session,
                new_row_number=self.active_tab().get_selected_row_number(),
                playlist_id=self.active_tab().playlist_id,
            )
            dlg.exec()

    def load_last_playlists(self) -> None:
        """Load the playlists that were open when the last session closed"""

        with Session() as session:
            for playlist in Playlists.get_open(session):
                if playlist:
                    _ = self.create_playlist_tab(session, playlist)
            # Set active tab. Compare against None explicitly: index 0
            # is a valid tab but is falsy.
            record = Settings.get_int_settings(session, "active_tab")
            if record.f_int is not None and record.f_int >= 0:
                self.tabPlaylist.setCurrentIndex(record.f_int)

    def _find_playlist_tab(self, playlist_id: int) -> Optional[PlaylistTab]:
        """Return the open tab showing playlist_id, or None if not open"""

        for idx in range(self.tabPlaylist.count()):
            tab = self.tabPlaylist.widget(idx)
            if tab.playlist_id == playlist_id:
                return tab
        return None

    def move_playlist_rows(
        self, session: scoped_session, playlistrows: Sequence[PlaylistRows]
    ) -> None:
        """
        Move passed playlist rows to another playlist

        Actions required:
        - exclude current/next tracks from being moved
        - identify destination playlist
        - update playlist for the rows in the database
        - remove them from the display
        - update destination playlist display if loaded
        """

        # Remove current/next rows from list
        protected_plr_ids = [track_sequence.now.plr_id, track_sequence.next.plr_id]
        plrs_to_move = [plr for plr in playlistrows if plr.id not in protected_plr_ids]

        rows_to_delete = [
            plr.plr_rownum for plr in plrs_to_move if plr.plr_rownum is not None
        ]
        if not rows_to_delete:
            return

        # Identify destination playlist (any playlist but the source)
        visible_tab = self.active_tab()
        source_playlist_id = visible_tab.playlist_id
        playlists = [
            playlist
            for playlist in Playlists.get_all(session)
            if playlist.id != source_playlist_id
        ]

        dlg = SelectPlaylistDialog(self, playlists=playlists, session=session)
        dlg.exec()
        if not dlg.playlist:
            return
        destination_playlist_id = dlg.playlist.id

        # Update destination playlist in the database
        last_row = PlaylistRows.get_last_used_row(session, destination_playlist_id)
        if last_row is not None:
            next_row = last_row + 1
        else:
            next_row = 0

        for plr in plrs_to_move:
            plr.plr_rownum = next_row
            next_row += 1
            plr.playlist_id = destination_playlist_id
            # Reset played as it's not been played on this playlist
            plr.played = False
        session.commit()

        # Remove moved rows from display and save visible playlist
        visible_tab.remove_rows(rows_to_delete)
        visible_tab.save_playlist(session)

        # Disable sort undo
        self.sort_undo: List[int] = []

        # Update destination playlist_tab if visible (if not visible, it
        # will be re-populated when it is opened)
        destination_playlist_tab = self._find_playlist_tab(dlg.playlist.id)
        if destination_playlist_tab:
            destination_playlist_tab.populate_display(session, dlg.playlist.id)

    def move_selected(self) -> None:
        """
        Move selected rows to another playlist
        """

        with Session() as session:
            selected_plrs = self.active_tab().get_selected_playlistrows(session)
            if not selected_plrs:
                return

            self.move_playlist_rows(session, selected_plrs)

    def move_tab(self, frm: int, to: int) -> None:
        """Handle tabs being moved"""

        with Session() as session:
            Playlists.move_tab(session, frm, to)

    def move_unplayed(self) -> None:
        """
        Move unplayed rows to another playlist
        """

        playlist_id = self.active_tab().playlist_id
        with Session() as session:
            unplayed_plrs = PlaylistRows.get_unplayed_rows(session, playlist_id)
            if helpers.ask_yes_no(
                "Move tracks", f"Move {len(unplayed_plrs)} tracks:" " Are you sure?"
            ):
                self.move_playlist_rows(session, unplayed_plrs)

    def new_from_template(self) -> None:
        """Create new playlist from template"""

        with Session() as session:
            templates = Playlists.get_all_templates(session)
            dlg = SelectPlaylistDialog(self, playlists=templates, session=session)
            dlg.exec()
            template = dlg.playlist
            if template:
                playlist_name = self.solicit_playlist_name()
                if not playlist_name:
                    return
                playlist = Playlists.create_playlist_from_template(
                    session, template, playlist_name
                )
                if not playlist:
                    return
                tab_index = self.create_playlist_tab(session, playlist)
                playlist.mark_open(session, tab_index)

    def open_playlist(self):
        """Open existing playlist"""

        with Session() as session:
            playlists = Playlists.get_closed(session)
            dlg = SelectPlaylistDialog(self, playlists=playlists, session=session)
            dlg.exec()
            playlist = dlg.playlist
            if playlist:
                tab_index = self.create_playlist_tab(session, playlist)
                playlist.mark_open(session, tab_index)

    def paste_rows(self) -> None:
        """
        Paste earlier cut rows.

        Process:
        - ensure we have some cut rows
        - if not pasting at end of playlist, move later rows down
        - update plrs with correct playlist and row
        - if moving between playlists: renumber source playlist rows
        - else: check integrity of playlist rows
        """

        if not self.selected_plrs:
            return

        playlist_tab = self.active_tab()
        dst_playlist_id = playlist_tab.playlist_id
        dst_row = playlist_tab.get_new_row_number()

        with Session() as session:
            # Create space in destination playlist
            PlaylistRows.move_rows_down(
                session, dst_playlist_id, dst_row, len(self.selected_plrs)
            )
            session.commit()

            # Update plrs
            row = dst_row
            src_playlist_id = None
            for plr in self.selected_plrs:
                # Update moved rows
                session.add(plr)
                # Remember where the (first) row came from
                if not src_playlist_id:
                    src_playlist_id = plr.playlist_id
                plr.playlist_id = dst_playlist_id
                plr.plr_rownum = row
                row += 1

            if not src_playlist_id:
                return

            session.flush()

            # Update display
            self.active_tab().populate_display(
                session, dst_playlist_id, scroll_to_top=False
            )

            # If source playlist is not destination playlist, fixup row
            # numbers and update display
            if src_playlist_id != dst_playlist_id:
                PlaylistRows.fixup_rownumbers(session, src_playlist_id)
                # Update source playlist_tab if visible (if not visible, it
                # will be re-populated when it is opened)
                source_playlist_tab = self._find_playlist_tab(src_playlist_id)
                if source_playlist_tab:
                    source_playlist_tab.populate_display(
                        session, src_playlist_id, scroll_to_top=False
                    )

        # Reset so rows can't be repasted
        self.selected_plrs = None

    def play_next(self, position: Optional[float] = None) -> None:
        """
        Play next track, optionally from passed position.

        Actions required:
        - If there is no next track set, return.
        - If there's currently a track playing, fade it.
        - Move next track to current track.
        - Clear next track
        - Restore volume if -3dB active
        - Play (new) current track.
        - Ensure 100% volume
        - Show closing volume graph
        - Notify model
        - Note that track is now playing
        - Disable play next controls
        - Update headers
        """

        # This method is connected to QAction.triggered, which passes a
        # "checked" boolean as the first argument: that isn't a position.
        if isinstance(position, bool):
            position = None

        # If there is no next track set, return.
        if not track_sequence.next.track_id:
            log.debug("musicmuster.play_next(): no next track selected")
            return
        if not track_sequence.next.path:
            log.debug("musicmuster.play_next(): no path for next track")
            return

        # If there's currently a track playing, fade it.
        self.stop_playing(fade=True)

        # Move next track to current track.
        # stop_playing() above has called end_of_track_actions()
        # which will have populated self.previous_track
        track_sequence.now = track_sequence.next

        # Clear next track
        self.clear_next()

        # Set current track playlist_tab colour
        # TODO Reimplement without reference to self.current_track.playlist_tab
        # current_tab = self.current_track.playlist_tab
        # if current_tab:
        #     self.set_tab_colour(current_tab, QColor(Config.COLOUR_CURRENT_TAB))

        # Restore volume if -3dB active
        if self.btnDrop3db.isChecked():
            self.btnDrop3db.setChecked(False)

        # Show closing volume graph
        if track_sequence.now.fade_graph:
            track_sequence.now.fade_graph.plot()

        # Play (new) current track
        if not track_sequence.now.path:
            return
        track_sequence.now.start()
        self.music.play(track_sequence.now.path, position)

        # Ensure 100% volume
        # For as-yet unknown reasons. sometimes the volume gets
        # reset to zero within 200mS or so of starting play. This
        # only happened since moving to Debian 12, which uses
        # Pipewire for sound (which may be irrelevant).
        for _ in range(3):
            if self.music.player:
                volume = self.music.player.audio_get_volume()
                if volume < Config.VOLUME_VLC_DEFAULT:
                    self.music.set_volume()
                    log.error(f"Reset from {volume=}")
                    break
                sleep(0.1)

        # Notify model
        self.active_model().current_track_started()

        # Note that track is now playing
        self.playing = True

        # Disable play next controls
        self.disable_play_next_controls()

        # Update headers
        self.update_headers()

    def preview(self) -> None:
        """
        Preview selected or next track. We use a different mechanism to
        normal track playing so that the user can route the output audio
        differently (eg, to headphones).
        """

        if self.btnPreview.isChecked():
            # Get track path for first selected track if there is one
            track_path = self.active_tab().get_selected_row_track_path()
            if not track_path:
                # Otherwise get path to next track to play
                track_path = track_sequence.next.path
            if not track_path:
                # Nothing to preview
                self.btnPreview.setChecked(False)
                return
            mixer.music.load(track_path)
            mixer.music.play()
        else:
            mixer.music.stop()

    def rename_playlist(self) -> None:
        """
        Rename current playlist
        """

        with Session() as session:
            playlist_id = self.active_tab().playlist_id
            playlist = session.get(Playlists, playlist_id)
            if playlist:
                new_name = self.solicit_playlist_name(playlist.name)
                if new_name:
                    playlist.rename(session, new_name)
                    idx = self.tabBar.currentIndex()
                    self.tabBar.setTabText(idx, new_name)

    def resume(self) -> None:
        """
        Resume playing last track. We may be playing the next track
        or none; take care of both eventualities.

        Actions required:
        - Return if no saved position
        - Resume last track
        - If a track is playing, make that the next track
        """

        # Return if no saved position
        if not track_sequence.previous.resume_marker:
            return

        # We want to use play_next() to resume, so copy the previous
        # track to the next track:
        track_sequence.next = track_sequence.previous

        # Now resume playing the now-next track
        self.play_next(track_sequence.next.resume_marker)

        # Adjust track info so that clocks and graph are correct.
        # We need to fake the start time to reflect where we resumed the
        # track
        if (
            track_sequence.now.start_time
            and track_sequence.now.duration
            and track_sequence.now.resume_marker
        ):
            elapsed_ms = track_sequence.now.duration * track_sequence.now.resume_marker
            track_sequence.now.start_time -= timedelta(milliseconds=elapsed_ms)

    def save_as_template(self) -> None:
        """Save current playlist as template"""

        with Session() as session:
            template_names = [a.name for a in Playlists.get_all_templates(session)]

            # Keep asking until we get an unused name or the user cancels
            while True:
                # Get name for new template
                dlg = QInputDialog(self)
                dlg.setInputMode(QInputDialog.InputMode.TextInput)
                dlg.setLabelText("Template name:")
                dlg.resize(500, 100)
                ok = dlg.exec()
                if not ok:
                    return

                template_name = dlg.textValue()
                if template_name not in template_names:
                    break
                helpers.show_warning(
                    self, "Duplicate template", "Template name already in use"
                )

            Playlists.save_as_template(
                session, self.active_tab().playlist_id, template_name
            )
            helpers.show_OK(self, "Template", "Template saved")

    def search_playlist(self) -> None:
        """Show text box to search playlist"""

        # Disable play controls so that 'return' in search box doesn't
        # play next track
        self.disable_play_next_controls()
        self.txtSearch.setHidden(False)
        self.txtSearch.setFocus()
        # Select any text that may already be there
        self.txtSearch.selectAll()

    def search_playlist_clear(self) -> None:
        """Tidy up and reset search bar"""

        # Clear the search text (there may be no playlist tab open)
        if self.active_tab():
            self.active_tab().set_search("")

        # Clean up search bar
        self.txtSearch.setText("")
        self.txtSearch.setHidden(True)

    def search_playlist_return(self) -> None:
        """Initiate search when return pressed"""

        self.active_tab().set_search(self.txtSearch.text())
        self.enable_play_next_controls()

    def select_duplicate_rows(self) -> None:
        """
        Select the last of any rows with duplicate tracks in current playlist.
        This allows the selection to typically come towards the end of the
        playlist away from any show specific sections.

        If a track appears on three or more rows, only the last one is
        selected.
        """

        visible_playlist_id = self.active_tab().playlist_id
        # Get row number of duplicate rows. The playlist id is passed as
        # a bound parameter rather than formatted into the SQL.
        sql = text(
            """
            SELECT max(plr_rownum)
            FROM playlist_rows
            WHERE playlist_id = :playlist_id
            AND track_id != 0
            GROUP BY track_id
            HAVING count(id) > 1
            """
        )

        with Session() as session:
            result = session.execute(sql, {"playlist_id": visible_playlist_id})
            row_numbers = [int(a) for a in result.scalars().all()]
            if row_numbers:
                self.active_tab().select_rows(row_numbers)
            self.statusbar.showMessage(
                f"{len(row_numbers)} duplicate rows selected", 10000
            )

    def select_next_row(self) -> None:
        """Select next or first row in playlist"""

        self.active_tab().select_next_row()

    def select_previous_row(self) -> None:
        """Select previous or first row in playlist"""

        self.active_tab().select_previous_row()

    def set_main_window_size(self) -> None:
        """Set size of window from database"""

        with Session() as session:
            settings = Settings.all_as_dict(session)

            # Fall back to defaults where nothing has been saved
            x = settings["mainwindow_x"].f_int or 1
            y = settings["mainwindow_y"].f_int or 1
            width = settings["mainwindow_width"].f_int or 1599
            height = settings["mainwindow_height"].f_int or 981
            self.setGeometry(x, y, width, height)

            splitter_top = settings["splitter_top"].f_int or 256
            splitter_bottom = settings["splitter_bottom"].f_int or 256
            self.splitter.setSizes([splitter_top, splitter_bottom])

    def set_selected_track_next(self) -> None:
        """
        Set currently-selected row on visible playlist tab as next track
        """

        playlist_tab = self.active_tab()
        if playlist_tab:
            playlist_tab.set_row_as_next_track()

    def set_tab_colour(self, widget: PlaylistTab, colour: QColor) -> None:
        """
        Find the tab containing the widget and set the text colour
        """

        idx = self.tabPlaylist.indexOf(widget)
        self.tabPlaylist.tabBar().setTabTextColor(idx, colour)

    def show_current(self) -> None:
        """Scroll to show current track"""

        return
        # TODO Reimplement
        # if
self.current_track.playlist_tab != self.active_tab(): # self.tabPlaylist.setCurrentWidget(self.current_track.playlist_tab) # self.tabPlaylist.currentWidget().scroll_current_to_top() def show_next(self) -> None: """Scroll to show next track""" return # TODO Reimplement # if self.next_track.playlist_tab != self.active_tab(): # self.tabPlaylist.setCurrentWidget(self.next_track.playlist_tab) # self.tabPlaylist.currentWidget().scroll_next_to_top() def solicit_playlist_name(self, default: Optional[str] = "") -> Optional[str]: """Get name of playlist from user""" dlg = QInputDialog(self) dlg.setInputMode(QInputDialog.InputMode.TextInput) dlg.setLabelText("Playlist name:") if default: dlg.setTextValue(default) dlg.resize(500, 100) ok = dlg.exec() if ok: return dlg.textValue() else: return None def stop(self) -> None: """Stop playing immediately""" self.stop_playing(fade=False) def stop_playing(self, fade: bool = True) -> None: """ Stop playing current track Actions required: - Set flag to say we're not playing a track - Return if not playing - Stop/fade track - Reset playlist_tab colour - Tell playlist_tab track has finished - Reset PlaylistTrack objects - Reset clocks - Reset fade graph - Update headers - Enable controls """ # Set flag to say we're not playing a track so that timer ticks # don't see player=None and kick off end-of-track actions if self.playing: self.playing = False else: # Return if not playing return # Stop/fade track track_sequence.now.resume_marker = self.music.get_position() if fade: self.music.fade() else: self.music.stop() # Reset fade graph if track_sequence.now.fade_graph: track_sequence.now.fade_graph.clear() # Reset track_sequence objects if track_sequence.now.track_id: track_sequence.previous = track_sequence.now track_sequence.now = PlaylistTrack() # Tell model previous track has finished self.active_model().previous_track_ended() # Reset clocks self.frame_fade.setStyleSheet("") self.frame_silent.setStyleSheet("") 
self.label_elapsed_timer.setText("00:00 / 00:00") self.label_fade_timer.setText("00:00") self.label_silent_timer.setText("00:00") # Update headers self.update_headers() # Enable controls self.enable_play_next_controls() def tab_change(self): """Called when active tab changed""" try: self.tabPlaylist.currentWidget().tab_visible() except AttributeError: # May also be called when last tab is closed pass def set_next_plr_id( self, next_plr_id: Optional[int], playlist_tab: PlaylistTab ) -> None: """ Set passed plr_id as next track to play, or clear next track if None Actions required: - Update playing_track - Tell playlist tabs to update their 'next track' highlighting - Update headers - Set playlist tab colours - Populate ‘info’ tabs """ return # with Session() as session: # # Update self.next_track PlaylistTrack structure # self.next_track = NextTrack() # if next_plr_id: # next_plr = session.get(PlaylistRows, next_plr_id) # if next_plr: # self.next_track.set_plr(session, next_plr) # self.signals.set_next_track_signal.emit(next_plr.playlist_id) # # Update headers # self.update_headers() # TODO: reimlement # # Set playlist tab colours # self._set_next_track_playlist_tab_colours(old_next_track) # if next_plr_id: # # Populate 'info' tabs with Wikipedia info, but queue it # # because it isn't quick # if self.next_track.title: # QTimer.singleShot( # 0, # lambda: self.tabInfolist.open_in_wikipedia( # self.next_track.title # ), # ) def _set_next_track_playlist_tab_colours( self, old_next_track: Optional[PlaylistTrack] ) -> None: """ Set playlist tab colour for next track. self.next_track needs to be set before calling. """ # If the original next playlist tab isn't the same as the # new one or the current track, it needs its colour reset. 
return # TODO Reimplement # if ( # old_next_track # and old_next_track.playlist_tab # and old_next_track.playlist_tab # not in [self.next_track.playlist_tab, self.current_track.playlist_tab] # ): # self.set_tab_colour( # old_next_track.playlist_tab, QColor(Config.COLOUR_NORMAL_TAB) # ) # # If the new next playlist tab isn't the same as the # # old one or the current track, it needs its colour set. # if old_next_track: # old_tab = old_next_track.playlist_tab # else: # old_tab = None # if ( # self.next_track # and self.next_track.playlist_tab # and self.next_track.playlist_tab # not in [old_tab, self.current_track.playlist_tab] # ): # self.set_tab_colour( # self.next_track.playlist_tab, QColor(Config.COLOUR_NEXT_TAB) # ) def tick_10ms(self) -> None: """ Called every 10ms """ # Update volume fade curve if ( track_sequence.now.track_id and track_sequence.now.fade_graph and track_sequence.now.start_time ): play_time = ( datetime.now() - track_sequence.now.start_time ).total_seconds() * 1000 track_sequence.now.fade_graph.tick(play_time) def tick_500ms(self) -> None: """ Called every 500ms """ self.lblTOD.setText(datetime.now().strftime(Config.TOD_TIME_FORMAT)) # Update carts # self.cart_tick() def tick_1000ms(self) -> None: """ Called every 1000ms """ # Ensure preview button is reset if preview finishes playing self.btnPreview.setChecked(mixer.music.get_busy()) # Only update play clocks once a second so that their updates # are synchronised (otherwise it looks odd) if not self.playing: return # If track is playing, update track clocks time and colours # There is a discrete time between starting playing a track and # player.is_playing() returning True, so assume playing if less # than Config.PLAY_SETTLE microseconds have passed since # starting play. 
if ( self.music.player and track_sequence.now.start_time and ( self.music.player.is_playing() or (datetime.now() - track_sequence.now.start_time) < timedelta(microseconds=Config.PLAY_SETTLE) ) ): playtime = self.get_playtime() time_to_fade = track_sequence.now.fade_at - playtime time_to_silence = track_sequence.now.silence_at - playtime # Elapsed time self.label_elapsed_timer.setText( helpers.ms_to_mmss(playtime) + " / " + helpers.ms_to_mmss(track_sequence.now.duration) ) # Time to fade self.label_fade_timer.setText(helpers.ms_to_mmss(time_to_fade)) # If silent in the next 5 seconds, put warning colour on # time to silence box and enable play controls if time_to_silence <= Config.WARNING_MS_BEFORE_SILENCE: css_silence = f"background: {Config.COLOUR_ENDING_TIMER}" if self.frame_silent.styleSheet() != css_silence: self.frame_silent.setStyleSheet(css_silence) self.enable_play_next_controls() # Set warning colour on time to silence box when fade starts elif time_to_fade <= 500: css_fade = f"background: {Config.COLOUR_WARNING_TIMER}" if self.frame_silent.styleSheet() != css_fade: self.frame_silent.setStyleSheet(css_fade) # Five seconds before fade starts, set warning colour on # time to silence box and enable play controls elif time_to_fade <= Config.WARNING_MS_BEFORE_FADE: self.frame_fade.setStyleSheet( f"background: {Config.COLOUR_WARNING_TIMER}" ) self.enable_play_next_controls() else: self.frame_silent.setStyleSheet("") self.frame_fade.setStyleSheet("") self.label_silent_timer.setText(helpers.ms_to_mmss(time_to_silence)) # Autoplay next track # if time_to_silence <= 1500: # self.play_next() else: if self.playing: self.stop_playing() def update_headers(self) -> None: """ Update last / current / next track headers """ if track_sequence.previous.title and track_sequence.previous.artist: self.hdrPreviousTrack.setText( f"{track_sequence.previous.title} - {track_sequence.previous.artist}" ) else: self.hdrPreviousTrack.setText("") if track_sequence.now.title and 
track_sequence.now.artist: self.hdrCurrentTrack.setText( f"{track_sequence.now.title.replace('&', '&&')} - " f"{track_sequence.now.artist.replace('&', '&&')}" ) else: self.hdrCurrentTrack.setText("") if track_sequence.next.title and track_sequence.next.artist: self.hdrNextTrack.setText( f"{track_sequence.next.title.replace('&', '&&')} - " f"{track_sequence.next.artist.replace('&', '&&')}" ) else: self.hdrNextTrack.setText("") class CartDialog(QDialog): """Edit cart details""" def __init__( self, musicmuster: Window, session: scoped_session, cart: Carts, *args, **kwargs ) -> None: """ Manage carts """ super().__init__(*args, **kwargs) self.musicmuster = musicmuster self.session = session self.ui = Ui_DialogCartEdit() self.ui.setupUi(self) self.path = cart.path self.ui.lblPath.setText(self.path) self.ui.lineEditName.setText(cart.name) self.ui.chkEnabled.setChecked(cart.enabled) self.setWindowTitle("Edit Cart " + str(cart.id)) self.ui.btnFile.clicked.connect(self.choose_file) def choose_file(self) -> None: """File select""" dlg = QFileDialog() dlg.setFileMode(QFileDialog.FileMode.ExistingFile) dlg.setViewMode(QFileDialog.ViewMode.Detail) dlg.setDirectory(Config.CART_DIRECTORY) dlg.setNameFilter("Music files (*.flac *.mp3)") if dlg.exec(): self.path = dlg.selectedFiles()[0] self.ui.lblPath.setText(self.path) class DownloadCSV(QDialog): def __init__(self, parent=None): super().__init__() self.ui = Ui_DateSelect() self.ui.setupUi(self) self.ui.dateTimeEdit.setDate(QDate.currentDate()) self.ui.dateTimeEdit.setTime(QTime(19, 59, 0)) self.ui.buttonBox.accepted.connect(self.accept) self.ui.buttonBox.rejected.connect(self.reject) class SelectPlaylistDialog(QDialog): def __init__(self, parent=None, playlists=None, session=None): super().__init__() if playlists is None: return self.ui = Ui_dlgSelectPlaylist() self.ui.setupUi(self) self.ui.lstPlaylists.itemDoubleClicked.connect(self.list_doubleclick) self.ui.buttonBox.accepted.connect(self.open) 
self.ui.buttonBox.rejected.connect(self.close) self.session = session self.playlist = None record = Settings.get_int_settings(self.session, "select_playlist_dialog_width") width = record.f_int or 800 record = Settings.get_int_settings( self.session, "select_playlist_dialog_height" ) height = record.f_int or 600 self.resize(width, height) for playlist in playlists: p = QListWidgetItem() p.setText(playlist.name) p.setData(Qt.ItemDataRole.UserRole, playlist) self.ui.lstPlaylists.addItem(p) def __del__(self): # review record = Settings.get_int_settings( self.session, "select_playlist_dialog_height" ) if record.f_int != self.height(): record.update(self.session, {"f_int": self.height()}) record = Settings.get_int_settings(self.session, "select_playlist_dialog_width") if record.f_int != self.width(): record.update(self.session, {"f_int": self.width()}) def list_doubleclick(self, entry): # review self.playlist = entry.data(Qt.ItemDataRole.UserRole) self.accept() def open(self): # review if self.ui.lstPlaylists.selectedItems(): item = self.ui.lstPlaylists.currentItem() self.playlist = item.data(Qt.ItemDataRole.UserRole) self.accept() if __name__ == "__main__": """ If command line arguments given, carry out requested function and exit. Otherwise run full application. 
""" p = argparse.ArgumentParser() # Only allow at most one option to be specified group = p.add_mutually_exclusive_group() group.add_argument( "-b", "--bitrates", action="store_true", dest="update_bitrates", default=False, help="Update bitrates in database", ) group.add_argument( "-c", "--check-database", action="store_true", dest="check_db", default=False, help="Check and report on database", ) args = p.parse_args() # Run as required if args.check_db: log.debug("Updating database") with Session() as session: check_db(session) engine.dispose() elif args.update_bitrates: log.debug("Update bitrates") with Session() as session: update_bitrates(session) engine.dispose() else: try: Base.metadata.create_all(engine) app = QApplication(sys.argv) # PyQt6 defaults to a grey for labels palette = app.palette() palette.setColor( QPalette.ColorRole.WindowText, QColor(Config.COLOUR_LABEL_TEXT) ) # Set colours that will be used by playlist row stripes palette.setColor( QPalette.ColorRole.Base, QColor(Config.COLOUR_EVEN_PLAYLIST) ) palette.setColor( QPalette.ColorRole.AlternateBase, QColor(Config.COLOUR_ODD_PLAYLIST) ) app.setPalette(palette) win = Window() win.show() status = app.exec() engine.dispose() sys.exit(status) except Exception as exc: if os.environ["MM_ENV"] == "PRODUCTION": from helpers import send_mail msg = stackprinter.format(exc) send_mail( Config.ERRORS_TO, Config.ERRORS_FROM, "Exception from musicmuster", msg, ) else: print("\033[1;31;47mUnhandled exception starts") print( stackprinter.format( exc, suppressed_paths=["/pypoetry/virtualenvs/"], style="darkbg" ) ) print("Unhandled exception ends\033[1;37;40m")