musicmuster/app/musicmuster.py
Keith Edmunds ebf62fe161 Fix #233
2024-04-27 18:47:06 +01:00

1806 lines
61 KiB
Python
Executable File

#!/usr/bin/env python3
from datetime import datetime, timedelta
from time import sleep
from typing import (
cast,
List,
Optional,
)
from os.path import basename
import argparse
import os
import subprocess
import sys
import threading
import pipeclient
from pygame import mixer
from PyQt6.QtCore import (
pyqtSignal,
QDate,
QEvent,
QObject,
QSize,
Qt,
QThread,
QTime,
QTimer,
)
from PyQt6.QtGui import (
QCloseEvent,
QColor,
QFont,
QMouseEvent,
QPalette,
QResizeEvent,
)
from PyQt6.QtWidgets import (
QApplication,
QDialog,
QFileDialog,
QInputDialog,
QLabel,
QLineEdit,
QListWidgetItem,
QMainWindow,
QMessageBox,
QProgressBar,
QPushButton,
)
import stackprinter # type: ignore
from classes import (
track_sequence,
FadeCurve,
MusicMusterSignals,
PlaylistTrack,
)
from config import Config
from dbconfig import (
engine,
scoped_session,
Session,
)
from dialogs import TrackSelectDialog
from log import log
from models import Base, Carts, Playdates, PlaylistRows, Playlists, Settings, Tracks
from playlistmodel import PlaylistModel, PlaylistProxyModel
from playlists import PlaylistTab
from ui.dlg_cart_ui import Ui_DialogCartEdit # type: ignore
from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist # type: ignore
from ui.downloadcsv_ui import Ui_DateSelect # type: ignore
from ui.main_window_ui import Ui_MainWindow # type: ignore
from utilities import check_db, update_bitrates
import helpers
from ui import icons_rc # noqa F401
import music
class CartButton(QPushButton):
"""Button for playing carts"""
progress = pyqtSignal(int)
def __init__(self, musicmuster: "Window", cart: Carts, *args, **kwargs):
"""Create a cart pushbutton and set it disabled"""
super().__init__(*args, **kwargs)
self.musicmuster = musicmuster
self.cart_id = cart.id
if cart.path and cart.enabled and not cart.duration:
tags = helpers.get_tags(cart.path)
cart.duration = tags["duration"]
self.duration = cart.duration
self.path = cart.path
self.player = None
self.is_playing = False
self.setEnabled(True)
self.setMinimumSize(QSize(147, 61))
font = QFont()
font.setPointSize(14)
self.setFont(font)
self.setObjectName("cart_" + str(cart.cart_number))
self.pgb = QProgressBar(self)
self.pgb.setTextVisible(False)
self.pgb.setVisible(False)
palette = self.pgb.palette()
palette.setColor(
QPalette.ColorRole.Highlight, QColor(Config.COLOUR_CART_PROGRESSBAR)
)
self.pgb.setPalette(palette)
self.pgb.setGeometry(0, 0, self.width(), 10)
self.pgb.setMinimum(0)
self.pgb.setMaximum(1)
self.pgb.setValue(0)
self.progress.connect(self.pgb.setValue)
def __repr__(self) -> str:
return (
f"<CartButton(cart_id={self.cart_id} "
f"path={self.path}, is_playing={self.is_playing}>"
)
def event(self, event: Optional[QEvent]) -> bool:
"""Allow right click even when button is disabled"""
if not event:
return False
if event.type() == QEvent.Type.MouseButtonRelease:
mouse_event = cast(QMouseEvent, event)
if mouse_event.button() == Qt.MouseButton.RightButton:
self.musicmuster.cart_edit(self, event)
return True
return super().event(event)
def resizeEvent(self, event: Optional[QResizeEvent]) -> None:
"""Resize progess bar when button size changes"""
self.pgb.setGeometry(0, 0, self.width(), 10)
class ImportTrack(QObject):
import_finished = pyqtSignal()
def __init__(
self,
filenames: List[str],
source_model: PlaylistModel,
row_number: Optional[int],
) -> None:
super().__init__()
self.filenames = filenames
self.source_model = source_model
if row_number is None:
self.next_row_number = source_model.rowCount()
else:
self.next_row_number = row_number
self.signals = MusicMusterSignals()
def run(self):
"""
Create track objects from passed files and add to visible playlist
"""
with Session() as session:
for fname in self.filenames:
self.signals.status_message_signal.emit(
f"Importing {basename(fname)}", 5000
)
metadata = helpers.get_file_metadata(fname)
try:
track = Tracks(session, **metadata)
except Exception as e:
self.signals.show_warning_signal.emit(
"Error importing track", str(e)
)
return
helpers.normalise_track(track.path)
# We're importing potentially multiple tracks in a loop.
# If there's an error adding the track to the Tracks
# table, the session will rollback, thus losing any
# previous additions in this loop. So, commit now to
# lock in what we've just done.
session.commit()
self.source_model.insert_row(self.next_row_number, track.id, "")
self.next_row_number += 1
self.signals.status_message_signal.emit(
f"{len(self.filenames)} tracks imported", 10000
)
self.import_finished.emit()
class Window(QMainWindow, Ui_MainWindow):
def __init__(self, parent=None, *args, **kwargs) -> None:
super().__init__(parent)
self.setupUi(self)
self.timer10: QTimer = QTimer()
self.timer500: QTimer = QTimer()
self.timer1000: QTimer = QTimer()
self.music: music.Music = music.Music()
self.playing: bool = False
self.selected_plrs: Optional[List[PlaylistRows]] = None
self.set_main_window_size()
self.lblSumPlaytime = QLabel("")
self.statusbar.addPermanentWidget(self.lblSumPlaytime)
self.txtSearch = QLineEdit()
self.statusbar.addWidget(self.txtSearch)
self.txtSearch.setHidden(True)
self.hide_played_tracks = False
mixer.init()
self.widgetFadeVolume.hideAxis("bottom")
self.widgetFadeVolume.hideAxis("left")
self.widgetFadeVolume.setDefaultPadding(0)
self.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND)
FadeCurve.GraphWidget = self.widgetFadeVolume
self.active_tab = lambda: self.tabPlaylist.currentWidget()
self.active_proxy_model = lambda: self.tabPlaylist.currentWidget().model()
self.move_source_rows: Optional[List[int]] = None
self.move_source_model: Optional[PlaylistProxyModel] = None
self.audacity_file_path: Optional[str] = None
self.audacity_client: Optional[pipeclient.PipeClient] = None
self.initialise_audacity()
if Config.CARTS_HIDE:
self.cartsWidget.hide()
self.frame_6.hide()
else:
self.carts_init()
self.disable_selection_timing = False
self.clock_counter = 0
self.timer10.start(10)
self.timer500.start(500)
self.timer1000.start(1000)
self.signals = MusicMusterSignals()
self.connect_signals_slots()
self.catch_return_key = False
self.load_last_playlists()
def about(self) -> None:
"""Get git tag and database name"""
try:
git_tag = str(
subprocess.check_output(["git", "describe"], stderr=subprocess.STDOUT)
).strip("'b\\n")
except subprocess.CalledProcessError as exc_info:
git_tag = str(exc_info.output)
with Session() as session:
if session.bind:
dbname = session.bind.engine.url.database
QMessageBox.information(
self,
"About",
f"MusicMuster {git_tag}\n\nDatabase: {dbname}",
QMessageBox.StandardButton.Ok,
)
def cart_configure(self, cart: Carts, btn: CartButton) -> None:
"""Configure button with cart data"""
btn.setEnabled(False)
btn.pgb.setVisible(False)
if cart.path:
if not helpers.file_is_unreadable(cart.path):
colour = Config.COLOUR_CART_READY
btn.path = cart.path
btn.player = self.music.VLC.media_player_new(cart.path)
if btn.player:
btn.player.audio_set_volume(Config.VOLUME_VLC_DEFAULT)
if cart.enabled:
btn.setEnabled(True)
btn.pgb.setVisible(True)
else:
colour = Config.COLOUR_CART_ERROR
else:
colour = Config.COLOUR_CART_UNCONFIGURED
btn.setStyleSheet("background-color: " + colour + ";\n")
if cart.name is not None:
btn.setText(cart.name)
def cart_click(self) -> None:
"""Handle cart click"""
btn = self.sender()
if not isinstance(btn, CartButton):
return
if not helpers.file_is_unreadable(btn.path):
# Don't allow clicks while we're playing
btn.setEnabled(False)
if not btn.player:
log.debug(f"musicmuster.cart_click(): no player assigned ({btn=})")
return
btn.player.play()
btn.is_playing = True
colour = Config.COLOUR_CART_PLAYING
thread = threading.Thread(target=self.cart_progressbar, args=(btn,))
thread.start()
else:
colour = Config.COLOUR_CART_ERROR
btn.setStyleSheet("background-color: " + colour + ";\n")
btn.pgb.setMinimum(0)
def cart_edit(self, btn: CartButton, event: QEvent):
"""Handle context menu for cart button"""
with Session() as session:
cart = session.query(Carts).get(btn.cart_id)
if cart is None:
log.error("cart_edit: cart not found")
return
dlg = CartDialog(musicmuster=self, session=session, cart=cart)
if dlg.exec():
name = dlg.ui.lineEditName.text()
if not name:
QMessageBox.warning(self, "Error", "Name required")
return
path = dlg.path
if not path:
QMessageBox.warning(self, "Error", "Filename required")
return
if cart.path and not helpers.file_is_unreadable(cart.path):
tags = helpers.get_tags(cart.path)
cart.duration = tags["duration"]
cart.enabled = dlg.ui.chkEnabled.isChecked()
cart.name = name
cart.path = path
session.add(cart)
session.commit()
self.cart_configure(cart, btn)
def carts_init(self) -> None:
"""Initialse carts data structures"""
with Session() as session:
# Number carts from 1 for humanity
for cart_number in range(1, Config.CARTS_COUNT + 1):
cart = session.query(Carts).get(cart_number)
if cart is None:
cart = Carts(session, cart_number, name=f"Cart #{cart_number}")
btn = CartButton(self, cart)
btn.clicked.connect(self.cart_click)
# Insert button on left of cart space starting at
# location zero
self.horizontalLayout_Carts.insertWidget(cart.id - 1, btn)
# Configure button
self.cart_configure(cart, btn)
def cart_progressbar(self, btn: CartButton) -> None:
"""Manage progress bar"""
if not btn.duration:
return
ms = 0
btn.pgb.setMaximum(btn.duration)
while ms <= btn.duration:
btn.progress.emit(ms)
ms += 100
sleep(0.1)
def cart_tick(self) -> None:
"""Cart clock actions"""
for i in range(self.horizontalLayout_Carts.count()):
btn = self.horizontalLayout_Carts.itemAt(i).widget()
if not btn:
continue
if btn.is_playing:
if not btn.player.is_playing():
# Cart has finished playing
btn.is_playing = False
btn.setEnabled(True)
# Setting to position 0 doesn't seem to work
btn.player = self.music.VLC.media_player_new(btn.path)
btn.player.audio_set_volume(Config.VOLUME_VLC_DEFAULT)
colour = Config.COLOUR_CART_READY
btn.setStyleSheet("background-color: " + colour + ";\n")
btn.pgb.setValue(0)
def clear_next(self) -> None:
"""
Clear next track
"""
track_sequence.next = PlaylistTrack()
self.update_headers()
def clear_selection(self) -> None:
"""Clear row selection"""
# Unselect any selected rows
if self.active_tab():
self.active_tab().clear_selection()
# Clear the search bar
self.search_playlist_clear()
def closeEvent(self, event: Optional[QCloseEvent]) -> None:
"""Handle attempt to close main window"""
if not event:
return
# Don't allow window to close when a track is playing
if self.playing:
event.ignore()
helpers.show_warning(
self, "Track playing", "Can't close application while track is playing"
)
else:
with Session() as session:
settings = Settings.all_as_dict(session)
record = settings["mainwindow_height"]
if record.f_int != self.height():
record.update(session, {"f_int": self.height()})
record = settings["mainwindow_width"]
if record.f_int != self.width():
record.update(session, {"f_int": self.width()})
record = settings["mainwindow_x"]
if record.f_int != self.x():
record.update(session, {"f_int": self.x()})
record = settings["mainwindow_y"]
if record.f_int != self.y():
record.update(session, {"f_int": self.y()})
# Save splitter settings
splitter_sizes = self.splitter.sizes()
assert len(splitter_sizes) == 2
splitter_top, splitter_bottom = splitter_sizes
record = settings["splitter_top"]
if record.f_int != splitter_top:
record.update(session, {"f_int": splitter_top})
record = settings["splitter_bottom"]
if record.f_int != splitter_bottom:
record.update(session, {"f_int": splitter_bottom})
# Save tab number of open playlists
for idx in range(self.tabPlaylist.count()):
playlist_id = self.tabPlaylist.widget(idx).playlist_id
playlist = session.get(Playlists, playlist_id)
if playlist:
playlist.tab = idx
session.flush()
# Save current tab
record = settings["active_tab"]
record.update(session, {"f_int": self.tabPlaylist.currentIndex()})
event.accept()
def close_playlist_tab(self) -> bool:
"""
Close active playlist tab, called by menu item
"""
return self.close_tab(self.tabPlaylist.currentIndex())
def close_tab(self, tab_index: int) -> bool:
"""
Close playlist tab unless it holds the current or next track.
Called from close_playlist_tab() or by clicking close button on tab.
Return True if tab closed else False.
"""
# Don't close current track playlist
current_track_playlist_id = track_sequence.now.playlist_id
closing_tab_playlist_id = self.tabPlaylist.widget(tab_index).playlist_id
if current_track_playlist_id:
if closing_tab_playlist_id == current_track_playlist_id:
self.show_status_message("Can't close current track playlist", 5000)
return False
# Record playlist as closed and update remaining playlist tabs
with Session() as session:
playlist = session.get(Playlists, closing_tab_playlist_id)
if playlist:
playlist.close()
# Close playlist and remove tab
self.tabPlaylist.widget(tab_index).close()
self.tabPlaylist.removeTab(tab_index)
return True
def connect_signals_slots(self) -> None:
self.action_About.triggered.connect(self.about)
self.action_Clear_selection.triggered.connect(self.clear_selection)
self.actionDebug.triggered.connect(self.debug)
self.actionClosePlaylist.triggered.connect(self.close_playlist_tab)
self.actionDeletePlaylist.triggered.connect(self.delete_playlist)
self.actionDownload_CSV_of_played_tracks.triggered.connect(
self.download_played_tracks
)
self.actionExport_playlist.triggered.connect(self.export_playlist_tab)
self.actionFade.triggered.connect(self.fade)
self.actionImport.triggered.connect(self.import_track)
self.actionInsertSectionHeader.triggered.connect(self.insert_header)
self.actionInsertTrack.triggered.connect(self.insert_track)
self.actionMark_for_moving.triggered.connect(self.cut_rows)
self.actionMoveSelected.triggered.connect(self.move_selected)
self.actionNew_from_template.triggered.connect(self.new_from_template)
self.actionNewPlaylist.triggered.connect(self.create_and_show_playlist)
self.actionOpenPlaylist.triggered.connect(self.open_playlist)
self.actionPaste.triggered.connect(self.paste_rows)
self.actionPlay_next.triggered.connect(self.play_next)
self.actionRenamePlaylist.triggered.connect(self.rename_playlist)
self.actionResume.triggered.connect(self.resume)
self.actionSave_as_template.triggered.connect(self.save_as_template)
self.actionSearch_title_in_Songfacts.triggered.connect(
self.lookup_row_in_songfacts
)
self.actionSearch_title_in_Wikipedia.triggered.connect(
self.lookup_row_in_wikipedia
)
self.actionSearch.triggered.connect(self.search_playlist)
self.actionSelect_duplicate_rows.triggered.connect(
lambda: self.active_tab().select_duplicate_rows()
)
self.actionMoveUnplayed.triggered.connect(self.move_unplayed)
self.actionSetNext.triggered.connect(self.set_selected_track_next)
self.actionSkipToNext.triggered.connect(self.play_next)
self.actionStop.triggered.connect(self.stop)
self.btnDrop3db.clicked.connect(self.drop3db)
self.btnFade.clicked.connect(self.fade)
self.btnHidePlayed.clicked.connect(self.hide_played)
self.btnPreview.clicked.connect(self.preview)
self.btnStop.clicked.connect(self.stop)
self.hdrCurrentTrack.clicked.connect(self.show_current)
self.hdrNextTrack.clicked.connect(self.show_next)
self.tabPlaylist.currentChanged.connect(self.tab_change)
self.tabPlaylist.tabCloseRequested.connect(self.close_tab)
self.tabBar = self.tabPlaylist.tabBar()
self.txtSearch.textChanged.connect(self.search_playlist_text_changed)
self.signals.enable_escape_signal.connect(self.enable_escape)
self.signals.next_track_changed_signal.connect(self.update_headers)
self.signals.status_message_signal.connect(self.show_status_message)
self.signals.show_warning_signal.connect(self.show_warning)
self.timer10.timeout.connect(self.tick_10ms)
self.timer500.timeout.connect(self.tick_500ms)
self.timer1000.timeout.connect(self.tick_1000ms)
def create_playlist(
self, session: scoped_session, playlist_name: Optional[str] = None
) -> Optional[Playlists]:
"""Create new playlist"""
log.info(f"create_playlist({playlist_name=}")
playlist_name = self.solicit_playlist_name(session)
if not playlist_name:
return None
playlist = Playlists(session, playlist_name)
if playlist:
playlist.mark_open()
return playlist
else:
log.error("Failed to create playlist")
return None
def create_and_show_playlist(self) -> None:
"""Create new playlist and display it"""
with Session() as session:
playlist = self.create_playlist(session)
if playlist:
self.create_playlist_tab(playlist)
def create_playlist_tab(self, playlist: Playlists) -> int:
"""
Take the passed playlist database object, create a playlist tab and
add tab to display. Return index number of tab.
"""
log.info(f"create_playlist_tab({playlist=})")
playlist_tab = PlaylistTab(
musicmuster=self,
playlist_id=playlist.id,
)
idx = self.tabPlaylist.addTab(playlist_tab, playlist.name)
log.info(f"create_playlist_tab() returned: {idx=}")
return idx
def cut_rows(self) -> None:
"""
Cut rows ready for pasting.
"""
# Save the selected PlaylistRows items ready for a later
# paste
self.move_source_rows = self.active_tab().get_selected_rows()
self.move_source_model = self.active_proxy_model()
log.info(f"cut_rows(): {self.move_source_rows=} {self.move_source_model=}")
def debug(self):
"""Invoke debugger"""
visible_playlist_id = self.active_tab().playlist_id
print(f"Active playlist id={visible_playlist_id}")
import ipdb # type: ignore
ipdb.set_trace()
def delete_playlist(self) -> None:
"""
Delete current playlist
"""
with Session() as session:
playlist_id = self.active_tab().playlist_id
playlist = session.get(Playlists, playlist_id)
if playlist:
if helpers.ask_yes_no(
"Delete playlist",
f"Delete playlist '{playlist.name}': " "Are you sure?",
):
if self.close_playlist_tab():
playlist.delete(session)
else:
log.error("Failed to retrieve playlist")
def download_played_tracks(self) -> None:
"""Download a CSV of played tracks"""
dlg = DownloadCSV(self)
if dlg.exec():
start_dt = dlg.ui.dateTimeEdit.dateTime().toPyDateTime()
# Get output filename
pathspec = QFileDialog.getSaveFileName(
self,
"Save CSV of tracks played",
directory="/tmp/playlist.csv",
filter="CSV files (*.csv)",
)
if not pathspec:
return
path = pathspec[0]
if not path.endswith(".csv"):
path += ".csv"
with open(path, "w") as f:
with Session() as session:
for playdate in Playdates.played_after(session, start_dt):
f.write(f"{playdate.track.artist},{playdate.track.title}\n")
def drop3db(self) -> None:
"""Drop music level by 3db if button checked"""
if self.btnDrop3db.isChecked():
self.music.set_volume(Config.VOLUME_VLC_DROP3db, set_default=False)
else:
self.music.set_volume(Config.VOLUME_VLC_DEFAULT, set_default=False)
def enable_escape(self, enabled: bool) -> None:
"""
Manage signal to enable/disable handling ESC character.
Needed because we want to use ESC when editing playlist in place,
so we need to disable it here while editing.
"""
log.debug(f"enable_escape({enabled=})")
self.action_Clear_selection.setEnabled(enabled)
def export_playlist_tab(self) -> None:
"""Export the current playlist to an m3u file"""
if not self.active_tab():
return
playlist_id = self.active_tab().playlist_id
with Session() as session:
# Get output filename
playlist = session.get(Playlists, playlist_id)
if not playlist:
return
pathspec = QFileDialog.getSaveFileName(
self,
"Save Playlist",
directory=f"{playlist.name}.m3u",
filter="M3U files (*.m3u);;All files (*.*)",
)
if not pathspec:
return
path = pathspec[0]
if not path.endswith(".m3u"):
path += ".m3u"
# Get list of track rows for this playlist
plrs = PlaylistRows.get_rows_with_tracks(session, playlist_id)
with open(path, "w") as f:
# Required directive on first line
f.write("#EXTM3U\n")
for track in [a.track for a in plrs]:
if track.duration is None:
track.duration = 0
f.write(
"#EXTINF:"
f"{int(track.duration / 1000)},"
f"{track.title} - "
f"{track.artist}"
"\n"
f"{track.path}"
"\n"
)
def fade(self) -> None:
"""Fade currently playing track"""
self.stop_playing(fade=True)
def get_playtime(self) -> int:
"""
Return number of milliseconds current track has been playing or
zero if not playing. The vlc function get_time() only updates 3-4
times a second; this function has much better resolution.
"""
if track_sequence.now.track_id is None or track_sequence.now.start_time is None:
return 0
now = datetime.now()
track_start = track_sequence.now.start_time
elapsed_seconds = (now - track_start).total_seconds()
return int(elapsed_seconds * 1000)
def hide_played(self):
"""Toggle hide played tracks"""
if self.hide_played_tracks:
self.hide_played_tracks = False
self.active_proxy_model().hide_played_tracks(False)
self.btnHidePlayed.setText("Hide played")
else:
self.hide_played_tracks = True
self.active_proxy_model().hide_played_tracks(True)
self.btnHidePlayed.setText("Show played")
# Reset row heights
self.active_tab().resize_rows()
def import_track(self) -> None:
"""Import track file"""
dlg = QFileDialog()
dlg.setFileMode(QFileDialog.FileMode.ExistingFiles)
dlg.setViewMode(QFileDialog.ViewMode.Detail)
dlg.setDirectory(Config.IMPORT_DESTINATION)
dlg.setNameFilter("Music files (*.flac *.mp3)")
if not dlg.exec():
return
with Session() as session:
new_tracks = []
for fname in dlg.selectedFiles():
txt = ""
tags = helpers.get_tags(fname)
title = tags["title"]
if not title:
helpers.show_warning(
self,
"Problem with track file",
f"{fname} does not have a title tag",
)
continue
artist = tags["artist"]
if not artist:
helpers.show_warning(
self,
"Problem with track file",
f"{fname} does not have an artist tag",
)
continue
count = 0
possible_matches = Tracks.search_titles(session, title)
if possible_matches:
txt += "Similar to new track "
txt += f'"{title}" by "{artist} ({fname})":\n\n'
for track in possible_matches:
txt += f' "{track.title}" by {track.artist}'
txt += f" ({track.path})\n\n"
count += 1
if count >= Config.MAX_IMPORT_MATCHES:
txt += "\nThere are more similar-looking tracks"
break
txt += "\n"
# Check whether to proceed if there were potential matches
txt += "Proceed with import?"
result = QMessageBox.question(
self,
"Possible duplicates",
txt,
QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.Cancel,
)
if result == QMessageBox.StandardButton.Cancel:
continue
new_tracks.append(fname)
# Import in separate thread
self.import_thread = QThread()
self.worker = ImportTrack(
new_tracks,
self.active_proxy_model(),
self.active_tab().source_model_selected_row_number(),
)
self.worker.moveToThread(self.import_thread)
self.import_thread.started.connect(self.worker.run)
self.worker.import_finished.connect(self.import_thread.quit)
self.worker.import_finished.connect(self.worker.deleteLater)
self.import_thread.finished.connect(self.import_thread.deleteLater)
self.import_thread.start()
def initialise_audacity(self) -> None:
"""
Initialise access to audacity
"""
try:
self.audacity_client = pipeclient.PipeClient()
log.info(f"{hex(id(self.audacity_client))=}")
except RuntimeError as e:
log.error(f"Unable to initialise Audacity: {str(e)}")
def insert_header(self) -> None:
"""Show dialog box to enter header text and add to playlist"""
proxy_model = self.active_proxy_model()
if proxy_model is None:
log.error("No proxy model")
return
# Get header text
dlg: QInputDialog = QInputDialog(self)
dlg.setInputMode(QInputDialog.InputMode.TextInput)
dlg.setLabelText("Header text:")
dlg.resize(500, 100)
ok = dlg.exec()
if ok:
proxy_model.insert_row(
proposed_row_number=self.active_tab().source_model_selected_row_number(),
note=dlg.textValue(),
)
def insert_track(self) -> None:
"""Show dialog box to select and add track from database"""
new_row_number = (
self.active_tab().source_model_selected_row_number()
or self.active_proxy_model().rowCount()
)
with Session() as session:
dlg = TrackSelectDialog(
session=session,
new_row_number=new_row_number,
source_model=self.active_proxy_model(),
)
dlg.exec()
def load_last_playlists(self) -> None:
"""Load the playlists that were open when the last session closed"""
playlist_ids = []
with Session() as session:
for playlist in Playlists.get_open(session):
if playlist:
_ = self.create_playlist_tab(playlist)
playlist_ids.append(playlist.id)
log.info(f"load_last_playlists() loaded {playlist=}")
# Set active tab
record = Settings.get_int_settings(session, "active_tab")
if record.f_int is not None and record.f_int >= 0:
self.tabPlaylist.setCurrentIndex(record.f_int)
# Tabs may move during use. Rather than track where tabs
# are, we record the tab index when we close the main
# window. To avoid possible duplicate tab entries, we null
# them all out now.
Playlists.clear_tabs(session, playlist_ids)
def lookup_row_in_songfacts(self) -> None:
"""
Display songfacts page for title in highlighted row
"""
row_number = self.active_tab().source_model_selected_row_number()
if row_number is None:
return
track_info = self.active_proxy_model().get_row_info(row_number)
if track_info is None:
return
self.signals.search_songfacts_signal.emit(track_info.title)
def lookup_row_in_wikipedia(self) -> None:
"""
Display Wikipedia page for title in highlighted row
"""
row_number = self.active_tab().source_model_selected_row_number()
if row_number is None:
return
track_info = self.active_proxy_model().get_row_info(row_number)
if track_info is None:
return
self.signals.search_wikipedia_signal.emit(track_info.title)
def move_playlist_rows(self, row_numbers: List[int]) -> None:
"""
Move passed playlist rows to another playlist
"""
# Identify destination playlist
playlists = []
visible_tab = self.active_tab()
source_playlist_id = visible_tab.playlist_id
with Session() as session:
for playlist in Playlists.get_all(session):
if playlist.id == source_playlist_id:
continue
else:
playlists.append(playlist)
dlg = SelectPlaylistDialog(self, playlists=playlists, session=session)
dlg.exec()
if not dlg.playlist:
return
to_playlist_id = dlg.playlist.id
# Get row number in destination playlist
last_row = PlaylistRows.get_last_used_row(session, to_playlist_id)
if last_row is not None:
to_row = last_row + 1
else:
to_row = 0
# Move rows
self.active_proxy_model().move_rows_between_playlists(
row_numbers, to_row, to_playlist_id
)
def move_selected(self) -> None:
"""
Move selected rows to another playlist
"""
selected_rows = self.active_tab().get_selected_rows()
if not selected_rows:
return
self.move_playlist_rows(selected_rows)
def move_unplayed(self) -> None:
"""
Move unplayed rows to another playlist
"""
unplayed_rows = self.active_proxy_model().get_unplayed_rows()
if not unplayed_rows:
return
# We can get a race condition as selected rows change while
# moving so disable selected rows timing for move
self.disable_selection_timing = True
self.move_playlist_rows(unplayed_rows)
self.disable_selection_timing = False
def new_from_template(self) -> None:
"""Create new playlist from template"""
with Session() as session:
templates = Playlists.get_all_templates(session)
dlg = SelectPlaylistDialog(self, playlists=templates, session=session)
dlg.exec()
template = dlg.playlist
if template:
playlist_name = self.solicit_playlist_name(session)
if not playlist_name:
log.error("Template has no name")
return
playlist = Playlists.create_playlist_from_template(
session, template, playlist_name
)
# Need to ensure that the new playlist is committed to
# the database before it is opened by the model.
session.commit()
if playlist:
log.error("Playlist failed to create")
playlist.mark_open()
self.create_playlist_tab(playlist)
def open_playlist(self) -> None:
"""Open existing playlist"""
with Session() as session:
playlists = Playlists.get_closed(session)
dlg = SelectPlaylistDialog(self, playlists=playlists, session=session)
dlg.exec()
playlist = dlg.playlist
if playlist:
idx = self.create_playlist_tab(playlist)
playlist.mark_open()
self.tabPlaylist.setCurrentIndex(idx)
def paste_rows(self) -> None:
"""
Paste earlier cut rows.
"""
if self.move_source_rows is None or self.move_source_model is None:
return
to_playlist_model: PlaylistModel = self.active_tab().source_model
selected_rows = self.active_tab().get_selected_rows()
if selected_rows:
destination_row = selected_rows[0]
else:
destination_row = self.active_proxy_model().rowCount()
if (
to_playlist_model.playlist_id
== self.move_source_model.source_model.playlist_id
):
self.move_source_model.move_rows(self.move_source_rows, destination_row)
else:
self.move_source_model.move_rows_between_playlists(
self.move_source_rows, destination_row, to_playlist_model.playlist_id
)
self.active_tab().resize_rows()
self.active_tab().clear_selection()
def play_next(self, position: Optional[float] = None) -> None:
"""
Play next track, optionally from passed position.
Actions required:
- If there is no next track set, return.
- If there's currently a track playing, fade it.
- Move next track to current track.
- Clear next track
- Restore volume if -3dB active
- Play (new) current track.
- Ensure 100% volume
- Show closing volume graph
- Notify model
- Note that track is now playing
- Disable play next controls
- Update headers
"""
# Check for inadvertent press of 'return'
if self.catch_return_key:
if not helpers.ask_yes_no(
"Track playing",
"Really play next track now?",
default_yes=True,
parent=self,
):
return
log.info(f"play_next({position=})")
# If there is no next track set, return.
if not track_sequence.next.track_id:
log.error("musicmuster.play_next(): no next track selected")
return
if not track_sequence.next.path:
log.error("musicmuster.play_next(): no path for next track")
return
# If there's currently a track playing, fade it.
self.stop_playing(fade=True)
# Move next track to current track.
# stop_playing() above has called end_of_track_actions()
# which will have populated self.previous_track
track_sequence.now = track_sequence.next
# Clear next track
self.clear_next()
# Restore volume if -3dB active
if self.btnDrop3db.isChecked():
log.debug("Reset -3db button")
self.btnDrop3db.setChecked(False)
# Play (new) current track
if not track_sequence.now.path:
log.error("No path for next track")
return
self.music.play(track_sequence.now.path, position)
# Ensure 100% volume
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
for _ in range(3):
if self.music.player:
volume = self.music.player.audio_get_volume()
if volume < Config.VOLUME_VLC_DEFAULT:
self.music.set_volume()
log.debug(f"Reset from {volume=}")
break
sleep(0.1)
# Show closing volume graph
if track_sequence.now.fade_graph:
track_sequence.now.fade_graph.plot()
else:
log.error("No fade_graph")
# Note that track is playing
log.debug("set track_sequence")
track_sequence.now.start()
self.playing = True
# Disable play next controls
self.catch_return_key = True
self.show_status_message("Play controls: Disabled", 0)
# Notify model
self.active_proxy_model().current_track_started()
# Update headers
log.debug("update headers")
self.update_headers()
def preview(self) -> None:
"""
Preview selected or next track. We use a different mechanism to
normal track playing so that the user can route the output audio
differently (eg, to headphones).
"""
if self.btnPreview.isChecked():
# Get track path for first selected track if there is one
track_path = self.active_tab().get_selected_row_track_path()
if not track_path:
# Otherwise get path to next track to play
track_path = track_sequence.next.path
if not track_path:
self.btnPreview.setChecked(False)
return
mixer.music.load(track_path)
mixer.music.play()
else:
mixer.music.stop()
def rename_playlist(self) -> None:
"""
Rename current playlist
"""
with Session() as session:
playlist_id = self.active_tab().playlist_id
playlist = session.get(Playlists, playlist_id)
if playlist:
new_name = self.solicit_playlist_name(session, playlist.name)
if new_name:
playlist.rename(session, new_name)
idx = self.tabBar.currentIndex()
self.tabBar.setTabText(idx, new_name)
def resume(self) -> None:
"""
Resume playing last track. We may be playing the next track
or none; take care of both eventualities.
Actions required:
- Return if no saved position
- Resume last track
- If a track is playing, make that the next track
"""
log.info("resume()")
# Return if no saved position
if not track_sequence.previous.resume_marker:
log.error("No previous track position")
return
# We want to use play_next() to resume, so copy the previous
# track to the next track:
track_sequence.next = track_sequence.previous
# Now resume playing the now-next track
self.play_next(track_sequence.next.resume_marker)
# Adjust track info so that clocks and graph are correct.
# We need to fake the start time to reflect where we resumed the
# track
if (
track_sequence.now.start_time
and track_sequence.now.duration
and track_sequence.now.resume_marker
):
elapsed_ms = track_sequence.now.duration * track_sequence.now.resume_marker
track_sequence.now.start_time -= timedelta(milliseconds=elapsed_ms)
def save_as_template(self) -> None:
"""Save current playlist as template"""
with Session() as session:
template_names = [a.name for a in Playlists.get_all_templates(session)]
while True:
# Get name for new template
dlg = QInputDialog(self)
dlg.setInputMode(QInputDialog.InputMode.TextInput)
dlg.setLabelText("Template name:")
dlg.resize(500, 100)
ok = dlg.exec()
if not ok:
return
template_name = dlg.textValue()
if template_name not in template_names:
break
helpers.show_warning(
self, "Duplicate template", "Template name already in use"
)
Playlists.save_as_template(
session, self.active_tab().playlist_id, template_name
)
helpers.show_OK(self, "Template", "Template saved")
def search_playlist(self) -> None:
"""Show text box to search playlist"""
# Disable play controls so that 'return' in search box doesn't
# play next track
self.catch_return_key = True
self.txtSearch.setHidden(False)
self.txtSearch.setFocus()
# Select any text that may already be there
self.txtSearch.selectAll()
def search_playlist_clear(self) -> None:
"""Tidy up and reset search bar"""
# Clean up search bar
self.txtSearch.setText("")
self.txtSearch.setHidden(True)
def search_playlist_text_changed(self) -> None:
"""
Incremental search of playlist
"""
self.active_proxy_model().set_incremental_search(self.txtSearch.text())
def select_next_row(self) -> None:
"""Select next or first row in playlist"""
self.active_tab().select_next_row()
def select_previous_row(self) -> None:
"""Select previous or first row in playlist"""
self.active_tab().select_previous_row()
def set_main_window_size(self) -> None:
"""Set size of window from database"""
with Session() as session:
settings = Settings.all_as_dict(session)
record = settings["mainwindow_x"]
x = record.f_int or 1
record = settings["mainwindow_y"]
y = record.f_int or 1
record = settings["mainwindow_width"]
width = record.f_int or 1599
record = settings["mainwindow_height"]
height = record.f_int or 981
self.setGeometry(x, y, width, height)
record = settings["splitter_top"]
splitter_top = record.f_int or 256
record = settings["splitter_bottom"]
splitter_bottom = record.f_int or 256
self.splitter.setSizes([splitter_top, splitter_bottom])
return
def set_selected_track_next(self) -> None:
"""
Set currently-selected row on visible playlist tab as next track
"""
playlist_tab = self.active_tab()
if playlist_tab:
playlist_tab.set_row_as_next_track()
else:
log.error("No active tab")
def set_tab_colour(self, widget: PlaylistTab, colour: QColor) -> None:
"""
Find the tab containing the widget and set the text colour
"""
idx = self.tabPlaylist.indexOf(widget)
self.tabPlaylist.tabBar().setTabTextColor(idx, colour)
def show_current(self) -> None:
"""Scroll to show current track"""
self.show_track(track_sequence.now)
def show_warning(self, title: str, body: str) -> None:
"""
Display a warning dialog
"""
print(f"show_warning({title=}, {body=})")
QMessageBox.warning(self, title, body)
def show_next(self) -> None:
"""Scroll to show next track"""
self.show_track(track_sequence.next)
def show_status_message(self, message: str, timing: int) -> None:
"""
Show status message in status bar for timing milliseconds
"""
self.statusbar.showMessage(message, timing)
def show_track(self, plt: PlaylistTrack) -> None:
"""Scroll to show track in plt"""
# Switch to the correct tab
plt_playlist_id = plt.playlist_id
if not plt_playlist_id:
# No playlist
return
if plt_playlist_id != self.active_tab().playlist_id:
for idx in range(self.tabPlaylist.count()):
if self.tabPlaylist.widget(idx).playlist_id == plt_playlist_id:
self.tabPlaylist.setCurrentIndex(idx)
break
display_row = (
self.active_proxy_model()
.mapFromSource(
self.active_proxy_model().source_model.index(plt.plr_rownum, 0)
)
.row()
)
self.tabPlaylist.currentWidget().scroll_to_top(display_row)
def solicit_playlist_name(
self, session: scoped_session, default: str = ""
) -> Optional[str]:
"""Get name of new playlist from user"""
dlg = QInputDialog(self)
dlg.setInputMode(QInputDialog.InputMode.TextInput)
dlg.setLabelText("Playlist name:")
while True:
if default:
dlg.setTextValue(default)
dlg.resize(500, 100)
ok = dlg.exec()
if ok:
proposed_name = dlg.textValue()
if Playlists.name_is_available(session, proposed_name):
return proposed_name
else:
helpers.show_warning(
self,
"Name in use",
f"There's already a playlist called '{proposed_name}'",
)
continue
else:
return None
def stop(self) -> None:
"""Stop playing immediately"""
self.stop_playing(fade=False)
def stop_playing(self, fade: bool = True) -> None:
"""
Stop playing current track
Actions required:
- Set flag to say we're not playing a track
- Return if not playing
- Stop/fade track
- Reset playlist_tab colour
- Tell playlist_tab track has finished
- Reset PlaylistTrack objects
- Reset clocks
- Reset fade graph
- Update headers
- Enable controls
"""
# Set flag to say we're not playing a track so that timer ticks
# don't see player=None and kick off end-of-track actions
if self.playing:
self.playing = False
else:
# Return if not playing
log.info("stop_playing() called but not playing")
return
# Stop/fade track
track_sequence.now.resume_marker = self.music.get_position()
if fade:
self.music.fade()
else:
self.music.stop()
# Reset fade graph
if track_sequence.now.fade_graph:
track_sequence.now.fade_graph.clear()
# Reset track_sequence objects
if track_sequence.now.track_id:
track_sequence.previous = track_sequence.now
track_sequence.now = PlaylistTrack()
# Tell model previous track has finished
self.active_proxy_model().previous_track_ended()
# Reset clocks
self.frame_fade.setStyleSheet("")
self.frame_silent.setStyleSheet("")
self.label_elapsed_timer.setText("00:00 / 00:00")
self.label_fade_timer.setText("00:00")
self.label_silent_timer.setText("00:00")
# Update headers
self.update_headers()
# Enable controls
self.catch_return_key = False
self.show_status_message("Play controls: Enabled", 0)
def tab_change(self):
"""Called when active tab changed"""
self.active_tab().resize_rows()
def tick_10ms(self) -> None:
"""
Called every 10ms
"""
# Update volume fade curve
if (
track_sequence.now.fade_graph_start_updates is None
or track_sequence.now.fade_graph_start_updates > datetime.now()
):
return
if (
track_sequence.now.track_id
and track_sequence.now.fade_graph
and track_sequence.now.start_time
):
play_time = (
datetime.now() - track_sequence.now.start_time
).total_seconds() * 1000
track_sequence.now.fade_graph.tick(play_time)
def tick_500ms(self) -> None:
"""
Called every 500ms
"""
self.lblTOD.setText(datetime.now().strftime(Config.TOD_TIME_FORMAT))
# Update carts
# self.cart_tick()
def tick_1000ms(self) -> None:
"""
Called every 1000ms
"""
# Ensure preview button is reset if preview finishes playing
self.btnPreview.setChecked(mixer.music.get_busy())
# Only update play clocks once a second so that their updates
# are synchronised (otherwise it looks odd)
if not self.playing:
return
# If track is playing, update track clocks time and colours
# There is a discrete time between starting playing a track and
# player.is_playing() returning True, so assume playing if less
# than Config.PLAY_SETTLE microseconds have passed since
# starting play.
if (
self.music.player
and track_sequence.now.start_time
and (
self.music.player.is_playing()
or (datetime.now() - track_sequence.now.start_time)
< timedelta(microseconds=Config.PLAY_SETTLE)
)
):
playtime = self.get_playtime()
time_to_fade = track_sequence.now.fade_at - playtime
time_to_silence = track_sequence.now.silence_at - playtime
# Elapsed time
self.label_elapsed_timer.setText(
helpers.ms_to_mmss(playtime)
+ " / "
+ helpers.ms_to_mmss(track_sequence.now.duration)
)
# Time to fade
self.label_fade_timer.setText(helpers.ms_to_mmss(time_to_fade))
# If silent in the next 5 seconds, put warning colour on
# time to silence box and enable play controls
if time_to_silence <= Config.WARNING_MS_BEFORE_SILENCE:
css_silence = f"background: {Config.COLOUR_ENDING_TIMER}"
if self.frame_silent.styleSheet() != css_silence:
self.frame_silent.setStyleSheet(css_silence)
self.catch_return_key = False
self.show_status_message("Play controls: Enabled", 0)
# Set warning colour on time to silence box when fade starts
elif time_to_fade <= 500:
css_fade = f"background: {Config.COLOUR_WARNING_TIMER}"
if self.frame_silent.styleSheet() != css_fade:
self.frame_silent.setStyleSheet(css_fade)
# Five seconds before fade starts, set warning colour on
# time to silence box and enable play controls
elif time_to_fade <= Config.WARNING_MS_BEFORE_FADE:
self.frame_fade.setStyleSheet(
f"background: {Config.COLOUR_WARNING_TIMER}"
)
self.catch_return_key = False
self.show_status_message("Play controls: Enabled", 0)
else:
self.frame_silent.setStyleSheet("")
self.frame_fade.setStyleSheet("")
self.label_silent_timer.setText(helpers.ms_to_mmss(time_to_silence))
# Autoplay next track
# if time_to_silence <= 1500:
# self.play_next()
else:
if self.playing:
self.stop_playing()
def update_headers(self) -> None:
"""
Update last / current / next track headers
"""
if track_sequence.previous.title and track_sequence.previous.artist:
self.hdrPreviousTrack.setText(
f"{track_sequence.previous.title} - {track_sequence.previous.artist}"
)
else:
self.hdrPreviousTrack.setText("")
if track_sequence.now.title and track_sequence.now.artist:
self.hdrCurrentTrack.setText(
f"{track_sequence.now.title.replace('&', '&&')} - "
f"{track_sequence.now.artist.replace('&', '&&')}"
)
else:
self.hdrCurrentTrack.setText("")
if track_sequence.next.title and track_sequence.next.artist:
self.hdrNextTrack.setText(
f"{track_sequence.next.title.replace('&', '&&')} - "
f"{track_sequence.next.artist.replace('&', '&&')}"
)
else:
self.hdrNextTrack.setText("")
class CartDialog(QDialog):
"""Edit cart details"""
def __init__(
self, musicmuster: Window, session: scoped_session, cart: Carts, *args, **kwargs
) -> None:
"""
Manage carts
"""
super().__init__(*args, **kwargs)
self.musicmuster = musicmuster
self.session = session
self.ui = Ui_DialogCartEdit()
self.ui.setupUi(self)
self.path = cart.path
self.ui.lblPath.setText(self.path)
self.ui.lineEditName.setText(cart.name)
self.ui.chkEnabled.setChecked(cart.enabled)
self.setWindowTitle("Edit Cart " + str(cart.id))
self.ui.btnFile.clicked.connect(self.choose_file)
def choose_file(self) -> None:
"""File select"""
dlg = QFileDialog()
dlg.setFileMode(QFileDialog.FileMode.ExistingFile)
dlg.setViewMode(QFileDialog.ViewMode.Detail)
dlg.setDirectory(Config.CART_DIRECTORY)
dlg.setNameFilter("Music files (*.flac *.mp3)")
if dlg.exec():
self.path = dlg.selectedFiles()[0]
self.ui.lblPath.setText(self.path)
class DownloadCSV(QDialog):
def __init__(self, parent=None):
super().__init__()
self.ui = Ui_DateSelect()
self.ui.setupUi(self)
self.ui.dateTimeEdit.setDate(QDate.currentDate())
self.ui.dateTimeEdit.setTime(QTime(19, 59, 0))
self.ui.buttonBox.accepted.connect(self.accept)
self.ui.buttonBox.rejected.connect(self.reject)
class SelectPlaylistDialog(QDialog):
def __init__(self, parent=None, playlists=None, session=None):
super().__init__()
if playlists is None:
return
self.ui = Ui_dlgSelectPlaylist()
self.ui.setupUi(self)
self.ui.lstPlaylists.itemDoubleClicked.connect(self.list_doubleclick)
self.ui.buttonBox.accepted.connect(self.open)
self.ui.buttonBox.rejected.connect(self.close)
self.session = session
self.playlist = None
record = Settings.get_int_settings(self.session, "select_playlist_dialog_width")
width = record.f_int or 800
record = Settings.get_int_settings(
self.session, "select_playlist_dialog_height"
)
height = record.f_int or 600
self.resize(width, height)
for playlist in playlists:
p = QListWidgetItem()
p.setText(playlist.name)
p.setData(Qt.ItemDataRole.UserRole, playlist)
self.ui.lstPlaylists.addItem(p)
def __del__(self): # review
record = Settings.get_int_settings(
self.session, "select_playlist_dialog_height"
)
if record.f_int != self.height():
record.update(self.session, {"f_int": self.height()})
record = Settings.get_int_settings(self.session, "select_playlist_dialog_width")
if record.f_int != self.width():
record.update(self.session, {"f_int": self.width()})
def list_doubleclick(self, entry): # review
self.playlist = entry.data(Qt.ItemDataRole.UserRole)
self.accept()
def open(self): # review
if self.ui.lstPlaylists.selectedItems():
item = self.ui.lstPlaylists.currentItem()
self.playlist = item.data(Qt.ItemDataRole.UserRole)
self.accept()
if __name__ == "__main__":
"""
If command line arguments given, carry out requested function and
exit. Otherwise run full application.
"""
p = argparse.ArgumentParser()
# Only allow at most one option to be specified
group = p.add_mutually_exclusive_group()
group.add_argument(
"-b",
"--bitrates",
action="store_true",
dest="update_bitrates",
default=False,
help="Update bitrates in database",
)
group.add_argument(
"-c",
"--check-database",
action="store_true",
dest="check_db",
default=False,
help="Check and report on database",
)
args = p.parse_args()
# Run as required
if args.check_db:
log.debug("Updating database")
with Session() as session:
check_db(session)
engine.dispose()
elif args.update_bitrates:
log.debug("Update bitrates")
with Session() as session:
update_bitrates(session)
engine.dispose()
else:
try:
Base.metadata.create_all(engine)
app = QApplication(sys.argv)
# PyQt6 defaults to a grey for labels
palette = app.palette()
palette.setColor(
QPalette.ColorRole.WindowText, QColor(Config.COLOUR_LABEL_TEXT)
)
# Set colours that will be used by playlist row stripes
palette.setColor(
QPalette.ColorRole.Base, QColor(Config.COLOUR_EVEN_PLAYLIST)
)
palette.setColor(
QPalette.ColorRole.AlternateBase, QColor(Config.COLOUR_ODD_PLAYLIST)
)
app.setPalette(palette)
win = Window()
win.show()
status = app.exec()
engine.dispose()
sys.exit(status)
except Exception as exc:
if os.environ["MM_ENV"] == "PRODUCTION":
from helpers import send_mail
msg = stackprinter.format(exc)
send_mail(
Config.ERRORS_TO,
Config.ERRORS_FROM,
"Exception from musicmuster",
msg,
)
else:
print("\033[1;31;47mUnhandled exception starts")
print(
stackprinter.format(
exc, suppressed_paths=["/pypoetry/virtualenvs/"], style="darkbg"
)
)
print("Unhandled exception ends\033[1;37;40m")