musicmuster/app/musicmuster.py
2023-10-21 13:49:13 +01:00

2230 lines
76 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
from log import log
from os.path import basename
import argparse
import os
import numpy as np
import pyqtgraph as pg # type: ignore
import stackprinter # type: ignore
import subprocess
import sys
import threading
from datetime import datetime, timedelta
from pygame import mixer
from time import sleep
from typing import (
Callable,
cast,
List,
Optional,
Sequence,
)
from sqlalchemy import text
from PyQt6.QtCore import (
pyqtSignal,
QDate,
QEvent,
QObject,
Qt,
QSize,
QThread,
QTime,
QTimer,
)
from PyQt6.QtGui import (
QCloseEvent,
QColor,
QFont,
QMouseEvent,
QPalette,
QResizeEvent,
)
from PyQt6.QtWidgets import (
QApplication,
QDialog,
QFileDialog,
QInputDialog,
QLabel,
QLineEdit,
QListWidgetItem,
QMainWindow,
QMessageBox,
QPushButton,
QProgressBar,
)
from dbconfig import (
engine,
Session,
scoped_session,
)
import helpers
import icons_rc # noqa F401
import music
from models import Base, Carts, Playdates, PlaylistRows, Playlists, Settings, Tracks
from config import Config
from playlists import PlaylistTab
from ui.dlg_cart_ui import Ui_DialogCartEdit # type: ignore
from ui.dlg_search_database_ui import Ui_Dialog # type: ignore
from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist # type: ignore
from ui.downloadcsv_ui import Ui_DateSelect # type: ignore
from ui.main_window_ui import Ui_MainWindow # type: ignore
from utilities import check_db, update_bitrates
class CartButton(QPushButton):
"""Button for playing carts"""
progress = pyqtSignal(int)
def __init__(self, musicmuster: "Window", cart: Carts, *args, **kwargs):
"""Create a cart pushbutton and set it disabled"""
super().__init__(*args, **kwargs)
self.musicmuster = musicmuster
self.cart_id = cart.id
if cart.path and cart.enabled and not cart.duration:
tags = helpers.get_tags(cart.path)
cart.duration = tags["duration"]
self.duration = cart.duration
self.path = cart.path
self.player = None
self.is_playing = False
self.setEnabled(True)
self.setMinimumSize(QSize(147, 61))
font = QFont()
font.setPointSize(14)
self.setFont(font)
self.setObjectName("cart_" + str(cart.cart_number))
self.pgb = QProgressBar(self)
self.pgb.setTextVisible(False)
self.pgb.setVisible(False)
palette = self.pgb.palette()
palette.setColor(
QPalette.ColorRole.Highlight, QColor(Config.COLOUR_CART_PROGRESSBAR)
)
self.pgb.setPalette(palette)
self.pgb.setGeometry(0, 0, self.width(), 10)
self.pgb.setMinimum(0)
self.pgb.setMaximum(1)
self.pgb.setValue(0)
self.progress.connect(self.pgb.setValue)
def __repr__(self) -> str:
return (
f"<CartButton(cart_id={self.cart_id} "
f"path={self.path}, is_playing={self.is_playing}>"
)
def event(self, event: Optional[QEvent]) -> bool:
"""Allow right click even when button is disabled"""
if not event:
return False
if event.type() == QEvent.Type.MouseButtonRelease:
mouse_event = cast(QMouseEvent, event)
if mouse_event.button() == Qt.MouseButton.RightButton:
self.musicmuster.cart_edit(self, event)
return True
return super().event(event)
def resizeEvent(self, event: Optional[QResizeEvent]) -> None:
"""Resize progess bar when button size changes"""
self.pgb.setGeometry(0, 0, self.width(), 10)
class FadeCurve:
GraphWidget = None
def __init__(self, track):
"""
Set up fade graph array
"""
audio = helpers.get_audio_segment(track.path)
if not audio:
return None
# Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
# milliseconds before fade starts to silence
self.start_ms = max(0, track.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1)
self.end_ms = track.silence_at
self.audio_segment = audio[self.start_ms : self.end_ms]
self.graph_array = np.array(self.audio_segment.get_array_of_samples())
# Calculate the factor to map milliseconds of track to array
self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms)
self.region = None
def clear(self) -> None:
"""Clear the current graph"""
if self.GraphWidget:
self.GraphWidget.clear()
def plot(self):
self.curve = self.GraphWidget.plot(self.graph_array)
self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
def tick(self, play_time) -> None:
"""Update volume fade curve"""
if not self.GraphWidget:
return
ms_of_graph = play_time - self.start_ms
if ms_of_graph < 0:
return
if self.region is None:
# Create the region now that we're into fade
self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
self.GraphWidget.addItem(self.region)
# Update region position
self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
class ImportTrack(QObject):
import_error = pyqtSignal(str)
importing = pyqtSignal(str)
finished = pyqtSignal(PlaylistTab)
def __init__(self, playlist: PlaylistTab, filenames: list, row: int) -> None:
super().__init__()
self.filenames = filenames
self.playlist = playlist
self.row = row
def run(self):
"""
Create track objects from passed files and add to visible playlist
"""
target_row = self.row
with Session() as session:
for fname in self.filenames:
self.importing.emit(f"Importing {basename(fname)}")
metadata = helpers.get_file_metadata(fname)
try:
track = Tracks(session, **metadata)
except Exception as e:
print(e)
return
helpers.normalise_track(track.path)
self.playlist.insert_track(session, track, target_row)
# Insert next row under this one
target_row += 1
# We're importing potentially multiple tracks in a loop.
# If there's an error adding the track to the Tracks
# table, the session will rollback, thus losing any
# previous additions in this loop. So, commit now to
# lock in what we've just done.
session.commit()
self.playlist.save_playlist(session)
self.finished.emit(self.playlist)
class MusicMusterSignals(QObject):
"""
Class for all MusicMuster signals. See:
- https://zetcode.com/gui/pyqt5/eventssignals/
- https://stackoverflow.com/questions/62654525/
emit-a-signal-from-another-class-to-main-class
"""
set_next_track_signal = pyqtSignal(int, int)
span_cells_signal = pyqtSignal(int, int, int, int)
enable_escape_signal = pyqtSignal(bool)
class PlaylistTrack:
"""
Used to provide a single reference point for specific playlist tracks,
typically the previous, current and next track.
"""
def __init__(self) -> None:
"""
Only initialises data structure. Call set_plr to populate.
Do NOT store row_number here - that changes if tracks are reordered
in playlist (add, remove, drag/drop) and we shouldn't care about row
number: that's the playlist's problem.
"""
self.artist: Optional[str] = None
self.duration: Optional[int] = None
self.end_time: Optional[datetime] = None
self.fade_at: Optional[int] = None
self.fade_curve: Optional[FadeCurve] = None
self.fade_length: Optional[int] = None
self.path: Optional[str] = None
self.playlist_id: Optional[int] = None
self.playlist_tab: Optional[PlaylistTab] = None
self.plr_id: Optional[int] = None
self.silence_at: Optional[int] = None
self.start_gap: Optional[int] = None
self.start_time: Optional[datetime] = None
self.title: Optional[str] = None
self.track_id: Optional[int] = None
def __repr__(self) -> str:
return (
f"<PlaylistTrack(title={self.title}, artist={self.artist}, "
f"playlist_id={self.playlist_id}>"
)
def set_plr(
self, session: scoped_session, plr: PlaylistRows, tab: PlaylistTab
) -> None:
"""
Update with new plr information
"""
if not plr.track:
return
self.playlist_tab = tab
session.add(plr)
track = plr.track
self.artist = track.artist
self.duration = track.duration
self.end_time = None
self.fade_at = track.fade_at
self.fade_graph = FadeCurve(track)
self.path = track.path
self.playlist_id = plr.playlist_id
self.plr_id = plr.id
self.silence_at = track.silence_at
self.start_gap = track.start_gap
self.start_time = None
self.title = track.title
self.track_id = track.id
if track.silence_at and track.fade_at:
self.fade_length = track.silence_at - track.fade_at
def start(self) -> None:
"""
Called when track starts playing
"""
self.start_time = datetime.now()
if self.duration:
self.end_time = self.start_time + timedelta(milliseconds=self.duration)
class Window(QMainWindow, Ui_MainWindow):
def __init__(self, parent=None, *args, **kwargs) -> None:
super().__init__(parent)
self.setupUi(self)
self.timer10: QTimer = QTimer()
self.timer500: QTimer = QTimer()
self.timer1000: QTimer = QTimer()
self.music: music.Music = music.Music()
self.playing: bool = False
self.current_track = PlaylistTrack()
self.next_track = PlaylistTrack()
self.previous_track = PlaylistTrack()
self.previous_track_position: Optional[float] = None
self.selected_plrs: Optional[List[PlaylistRows]] = None
self.signals = MusicMusterSignals()
self.set_main_window_size()
self.lblSumPlaytime = QLabel("")
self.statusbar.addPermanentWidget(self.lblSumPlaytime)
self.txtSearch = QLineEdit()
self.statusbar.addWidget(self.txtSearch)
self.txtSearch.setHidden(True)
self.hide_played_tracks = False
mixer.init()
self.widgetFadeVolume.hideAxis("bottom")
self.widgetFadeVolume.hideAxis("left")
self.widgetFadeVolume.setDefaultPadding(0)
self.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND)
FadeCurve.GraphWidget = self.widgetFadeVolume
self.visible_playlist_tab: Callable[
[], PlaylistTab
] = self.tabPlaylist.currentWidget
self.load_last_playlists()
if Config.CARTS_HIDE:
self.cartsWidget.hide()
self.frame_6.hide()
else:
self.carts_init()
self.enable_play_next_controls()
self.clock_counter = 0
self.timer10.start(10)
self.timer500.start(500)
self.timer1000.start(1000)
self.connect_signals_slots()
def about(self) -> None:
"""Get git tag and database name"""
try:
git_tag = str(
subprocess.check_output(["git", "describe"], stderr=subprocess.STDOUT)
).strip("'b\\n")
except subprocess.CalledProcessError as exc_info:
git_tag = str(exc_info.output)
with Session() as session:
if session.bind:
dbname = session.bind.engine.url.database
QMessageBox.information(
self,
"About",
f"MusicMuster {git_tag}\n\nDatabase: {dbname}",
QMessageBox.StandardButton.Ok,
)
def cart_configure(self, cart: Carts, btn: CartButton) -> None:
"""Configure button with cart data"""
btn.setEnabled(False)
btn.pgb.setVisible(False)
if cart.path:
if not helpers.file_is_unreadable(cart.path):
colour = Config.COLOUR_CART_READY
btn.path = cart.path
btn.player = self.music.VLC.media_player_new(cart.path)
if btn.player:
btn.player.audio_set_volume(Config.VOLUME_VLC_DEFAULT)
if cart.enabled:
btn.setEnabled(True)
btn.pgb.setVisible(True)
else:
colour = Config.COLOUR_CART_ERROR
else:
colour = Config.COLOUR_CART_UNCONFIGURED
btn.setStyleSheet("background-color: " + colour + ";\n")
if cart.name is not None:
btn.setText(cart.name)
def cart_click(self) -> None:
"""Handle cart click"""
btn = self.sender()
if not isinstance(btn, CartButton):
return
if not helpers.file_is_unreadable(btn.path):
# Don't allow clicks while we're playing
btn.setEnabled(False)
if not btn.player:
log.debug(f"musicmuster.cart_click(): no player assigned ({btn=})")
return
btn.player.play()
btn.is_playing = True
colour = Config.COLOUR_CART_PLAYING
thread = threading.Thread(target=self.cart_progressbar, args=(btn,))
thread.start()
else:
colour = Config.COLOUR_CART_ERROR
btn.setStyleSheet("background-color: " + colour + ";\n")
btn.pgb.setMinimum(0)
def cart_edit(self, btn: CartButton, event: QEvent):
"""Handle context menu for cart button"""
with Session() as session:
cart = session.query(Carts).get(btn.cart_id)
if cart is None:
log.error("cart_edit: cart not found")
return
dlg = CartDialog(musicmuster=self, session=session, cart=cart)
if dlg.exec():
name = dlg.ui.lineEditName.text()
if not name:
QMessageBox.warning(self, "Error", "Name required")
return
path = dlg.path
if not path:
QMessageBox.warning(self, "Error", "Filename required")
return
if cart.path and not helpers.file_is_unreadable(cart.path):
tags = helpers.get_tags(cart.path)
cart.duration = tags["duration"]
cart.enabled = dlg.ui.chkEnabled.isChecked()
cart.name = name
cart.path = path
session.add(cart)
session.commit()
self.cart_configure(cart, btn)
def carts_init(self) -> None:
"""Initialse carts data structures"""
with Session() as session:
# Number carts from 1 for humanity
for cart_number in range(1, Config.CARTS_COUNT + 1):
cart = session.query(Carts).get(cart_number)
if cart is None:
cart = Carts(session, cart_number, name=f"Cart #{cart_number}")
btn = CartButton(self, cart)
btn.clicked.connect(self.cart_click)
# Insert button on left of cart space starting at
# location zero
self.horizontalLayout_Carts.insertWidget(cart.id - 1, btn)
# Configure button
self.cart_configure(cart, btn)
def cart_progressbar(self, btn: CartButton) -> None:
"""Manage progress bar"""
if not btn.duration:
return
ms = 0
btn.pgb.setMaximum(btn.duration)
while ms <= btn.duration:
btn.progress.emit(ms)
ms += 100
sleep(0.1)
def cart_tick(self) -> None:
"""Cart clock actions"""
for i in range(self.horizontalLayout_Carts.count()):
btn = self.horizontalLayout_Carts.itemAt(i).widget()
if not btn:
continue
if btn.is_playing:
if not btn.player.is_playing():
# Cart has finished playing
btn.is_playing = False
btn.setEnabled(True)
# Setting to position 0 doesn't seem to work
btn.player = self.music.VLC.media_player_new(btn.path)
btn.player.audio_set_volume(Config.VOLUME_VLC_DEFAULT)
colour = Config.COLOUR_CART_READY
btn.setStyleSheet("background-color: " + colour + ";\n")
btn.pgb.setValue(0)
def clear_next(self) -> None:
"""
Clear next track
"""
self.next_track = PlaylistTrack()
self.update_headers()
def clear_selection(self) -> None:
"""Clear selected row"""
# Unselect any selected rows
if self.visible_playlist_tab():
self.visible_playlist_tab().clear_selection()
# Clear the search bar
self.search_playlist_clear()
def closeEvent(self, event: Optional[QCloseEvent]) -> None:
"""Handle attempt to close main window"""
if not event:
return
# Don't allow window to close when a track is playing
if self.playing:
event.ignore()
helpers.show_warning(
self, "Track playing", "Can't close application while track is playing"
)
else:
with Session() as session:
settings = Settings.all_as_dict(session)
record = settings["mainwindow_height"]
if record.f_int != self.height():
record.update(session, {"f_int": self.height()})
record = settings["mainwindow_width"]
if record.f_int != self.width():
record.update(session, {"f_int": self.width()})
record = settings["mainwindow_x"]
if record.f_int != self.x():
record.update(session, {"f_int": self.x()})
record = settings["mainwindow_y"]
if record.f_int != self.y():
record.update(session, {"f_int": self.y()})
# Save splitter settings
splitter_sizes = self.splitter.sizes()
assert len(splitter_sizes) == 2
splitter_top, splitter_bottom = splitter_sizes
record = settings["splitter_top"]
if record.f_int != splitter_top:
record.update(session, {"f_int": splitter_top})
record = settings["splitter_bottom"]
if record.f_int != splitter_bottom:
record.update(session, {"f_int": splitter_bottom})
# Save current tab
record = settings["active_tab"]
record.update(session, {"f_int": self.tabPlaylist.currentIndex()})
event.accept()
def close_playlist_tab(self) -> bool:
"""
Close active playlist tab, called by menu item
"""
return self.close_tab(self.tabPlaylist.currentIndex())
def close_tab(self, tab_index: int) -> bool:
"""
Close playlist tab unless it holds the current or next track.
Called from close_playlist_tab() or by clicking close button on tab.
Return True if tab closed else False.
"""
# Don't close current track playlist
if self.tabPlaylist.widget(tab_index) == (self.current_track.playlist_tab):
self.statusbar.showMessage("Can't close current track playlist", 5000)
return False
# Attempt to close next track playlist
if self.tabPlaylist.widget(tab_index) == self.next_track.playlist_tab:
self.next_track.playlist_tab.clear_next()
# Record playlist as closed and update remaining playlist tabs
with Session() as session:
playlist_id = self.tabPlaylist.widget(tab_index).playlist_id
playlist = session.get(Playlists, playlist_id)
if playlist:
playlist.close(session)
# Close playlist and remove tab
self.tabPlaylist.widget(tab_index).close()
self.tabPlaylist.removeTab(tab_index)
return True
def connect_signals_slots(self) -> None:
self.action_About.triggered.connect(self.about)
self.action_Clear_selection.triggered.connect(self.clear_selection)
self.actionDebug.triggered.connect(self.debug)
self.actionClosePlaylist.triggered.connect(self.close_playlist_tab)
self.actionDeletePlaylist.triggered.connect(self.delete_playlist)
self.actionDownload_CSV_of_played_tracks.triggered.connect(
self.download_played_tracks
)
self.actionEnable_controls.triggered.connect(self.enable_play_next_controls)
self.actionExport_playlist.triggered.connect(self.export_playlist_tab)
self.actionFade.triggered.connect(self.fade)
self.actionFind_next.triggered.connect(
lambda: self.tabPlaylist.currentWidget().search_next()
)
self.actionFind_previous.triggered.connect(
lambda: self.tabPlaylist.currentWidget().search_previous()
)
self.actionImport.triggered.connect(self.import_track)
self.actionInsertSectionHeader.triggered.connect(self.insert_header)
self.actionInsertTrack.triggered.connect(self.insert_track)
self.actionMark_for_moving.triggered.connect(self.cut_rows)
self.actionMoveSelected.triggered.connect(self.move_selected)
self.actionNew_from_template.triggered.connect(self.new_from_template)
self.actionNewPlaylist.triggered.connect(self.create_and_show_playlist)
self.actionOpenPlaylist.triggered.connect(self.open_playlist)
self.actionPaste.triggered.connect(self.paste_rows)
self.actionPlay_next.triggered.connect(self.play_next)
self.actionRenamePlaylist.triggered.connect(self.rename_playlist)
self.actionResume.triggered.connect(self.resume)
self.actionSave_as_template.triggered.connect(self.save_as_template)
self.actionSearch_title_in_Songfacts.triggered.connect(
lambda: self.tabPlaylist.currentWidget().lookup_row_in_songfacts()
)
self.actionSearch_title_in_Wikipedia.triggered.connect(
lambda: self.tabPlaylist.currentWidget().lookup_row_in_wikipedia()
)
self.actionSearch.triggered.connect(self.search_playlist)
self.actionSelect_duplicate_rows.triggered.connect(self.select_duplicate_rows)
self.actionSelect_next_track.triggered.connect(self.select_next_row)
self.actionSelect_previous_track.triggered.connect(self.select_previous_row)
self.actionMoveUnplayed.triggered.connect(self.move_unplayed)
self.actionSetNext.triggered.connect(self.set_selected_track_next)
self.actionSkipToNext.triggered.connect(self.play_next)
self.actionStop.triggered.connect(self.stop)
self.btnDrop3db.clicked.connect(self.drop3db)
self.btnFade.clicked.connect(self.fade)
self.btnHidePlayed.clicked.connect(self.hide_played)
self.btnPreview.clicked.connect(self.preview)
self.btnStop.clicked.connect(self.stop)
self.hdrCurrentTrack.clicked.connect(self.show_current)
self.hdrNextTrack.clicked.connect(self.show_next)
self.tabPlaylist.currentChanged.connect(self.tab_change)
self.tabPlaylist.tabCloseRequested.connect(self.close_tab)
self.tabBar = self.tabPlaylist.tabBar()
self.tabBar.tabMoved.connect(self.move_tab)
self.txtSearch.returnPressed.connect(self.search_playlist_return)
self.signals.enable_escape_signal.connect(self.enable_escape)
self.timer10.timeout.connect(self.tick_10ms)
self.timer500.timeout.connect(self.tick_500ms)
self.timer1000.timeout.connect(self.tick_1000ms)
def create_playlist(
self, session: scoped_session, playlist_name: Optional[str] = None
) -> Optional[Playlists]:
"""Create new playlist"""
playlist_name = self.solicit_playlist_name()
if not playlist_name:
return None
playlist = Playlists(session, playlist_name)
return playlist
def create_and_show_playlist(self) -> None:
"""Create new playlist and display it"""
with Session() as session:
playlist = self.create_playlist(session)
if playlist:
self.create_playlist_tab(session, playlist)
def create_playlist_tab(self, session: scoped_session, playlist: Playlists) -> int:
"""
Take the passed playlist database object, create a playlist tab and
add tab to display. Return index number of tab.
"""
assert playlist.id
playlist_tab = PlaylistTab(
musicmuster=self,
playlist_id=playlist.id,
signals=self.signals,
)
idx = self.tabPlaylist.addTab(playlist_tab, playlist.name)
self.tabPlaylist.setCurrentIndex(idx)
return idx
def cut_rows(self) -> None:
"""
Cut rows ready for pasting.
"""
with Session() as session:
# Save the selected PlaylistRows items ready for a later
# paste
self.selected_plrs = self.visible_playlist_tab().get_selected_playlistrows(
session
)
def debug(self):
"""Invoke debugger"""
visible_playlist_id = self.visible_playlist_tab().playlist_id
print(f"Active playlist id={visible_playlist_id}")
import ipdb # type: ignore
ipdb.set_trace()
def delete_playlist(self) -> None:
"""
Delete current playlist
"""
with Session() as session:
playlist_id = self.visible_playlist_tab().playlist_id
playlist = session.get(Playlists, playlist_id)
if playlist:
if helpers.ask_yes_no(
"Delete playlist",
f"Delete playlist '{playlist.name}': " "Are you sure?",
):
if self.close_playlist_tab():
playlist.delete(session)
def disable_play_next_controls(self) -> None:
"""
Disable "play next" keyboard controls
"""
self.actionPlay_next.setEnabled(False)
self.statusbar.showMessage("Play controls: Disabled", 0)
def download_played_tracks(self) -> None:
"""Download a CSV of played tracks"""
dlg = DownloadCSV(self)
if dlg.exec():
start_dt = dlg.ui.dateTimeEdit.dateTime().toPyDateTime()
# Get output filename
pathspec = QFileDialog.getSaveFileName(
self,
"Save CSV of tracks played",
directory="/tmp/playlist.csv",
filter="CSV files (*.csv)",
)
if not pathspec:
return
path = pathspec[0]
if not path.endswith(".csv"):
path += ".csv"
with open(path, "w") as f:
with Session() as session:
for playdate in Playdates.played_after(session, start_dt):
f.write(f"{playdate.track.artist},{playdate.track.title}\n")
def drop3db(self) -> None:
"""Drop music level by 3db if button checked"""
if self.btnDrop3db.isChecked():
self.music.set_volume(Config.VOLUME_VLC_DROP3db, set_default=False)
else:
self.music.set_volume(Config.VOLUME_VLC_DEFAULT, set_default=False)
def enable_escape(self, enabled: bool) -> None:
"""
Manage signal to enable/disable handling ESC character.
Needed because we want to use ESC when editing playlist in place,
so we need to disable it here while editing.
"""
self.action_Clear_selection.setEnabled(enabled)
def enable_play_next_controls(self) -> None:
"""
Enable "play next" keyboard controls
"""
self.actionPlay_next.setEnabled(True)
self.statusbar.showMessage("Play controls: Enabled", 0)
def end_of_track_actions(self) -> None:
"""
Clean up after track played
Actions required:
- Set flag to say we're not playing a track
- Tell playlist_tab track has finished
- Reset PlaylistTrack objects
- Reset clocks
- Reset fade graph
- Update headers
- Enable controls
"""
# Set flag to say we're not playing a track so that timer ticks
# don't see player=None and kick off end-of-track actions
self.playing = False
# Tell playlist_tab track has finished
if self.current_track.playlist_tab:
self.current_track.playlist_tab.play_ended()
# Reset fade graph
self.current_track.fade_graph.clear()
# Reset PlaylistTrack objects
if self.current_track.track_id:
self.previous_track = self.current_track
self.current_track = PlaylistTrack()
# Reset clocks
self.frame_fade.setStyleSheet("")
self.frame_silent.setStyleSheet("")
self.label_elapsed_timer.setText("00:00 / 00:00")
self.label_fade_timer.setText("00:00")
self.label_silent_timer.setText("00:00")
# Update headers
self.update_headers()
# Enable controls
self.enable_play_next_controls()
def export_playlist_tab(self) -> None:
"""Export the current playlist to an m3u file"""
if not self.visible_playlist_tab():
return
playlist_id = self.visible_playlist_tab().playlist_id
with Session() as session:
# Get output filename
playlist = session.get(Playlists, playlist_id)
if not playlist:
return
pathspec = QFileDialog.getSaveFileName(
self,
"Save Playlist",
directory=f"{playlist.name}.m3u",
filter="M3U files (*.m3u);;All files (*.*)",
)
if not pathspec:
return
path = pathspec[0]
if not path.endswith(".m3u"):
path += ".m3u"
# Get list of track rows for this playlist
plrs = PlaylistRows.get_rows_with_tracks(session, playlist_id)
with open(path, "w") as f:
# Required directive on first line
f.write("#EXTM3U\n")
for track in [a.track for a in plrs]:
if track.duration is None:
track.duration = 0
f.write(
"#EXTINF:"
f"{int(track.duration / 1000)},"
f"{track.title} - "
f"{track.artist}"
"\n"
f"{track.path}"
"\n"
)
def fade(self) -> None:
"""Fade currently playing track"""
self.stop_playing(fade=True)
def get_one_track(self, session: scoped_session) -> Optional[Tracks]:
"""Show dialog box to select one track and return it to caller"""
dlg = DbDialog(self, session, get_one_track=True)
if dlg.exec():
return dlg.track
else:
return None
def get_playtime(self) -> int:
"""
Return number of milliseconds current track has been playing or
zero if not playing. The vlc function get_time() only updates 3-4
times a second; this function has much better resolution.
"""
if self.current_track.track_id is None or self.current_track.start_time is None:
return 0
now = datetime.now()
track_start = self.current_track.start_time
elapsed_seconds = (now - track_start).total_seconds()
return int(elapsed_seconds * 1000)
def hide_played(self):
"""Toggle hide played tracks"""
if self.hide_played_tracks:
self.hide_played_tracks = False
self.btnHidePlayed.setText("Hide played")
else:
self.hide_played_tracks = True
self.btnHidePlayed.setText("Show played")
# Update displayed playlist
self.visible_playlist_tab().hide_or_show_played_tracks()
def import_track(self) -> None:
"""Import track file"""
dlg = QFileDialog()
dlg.setFileMode(QFileDialog.FileMode.ExistingFiles)
dlg.setViewMode(QFileDialog.ViewMode.Detail)
dlg.setDirectory(Config.IMPORT_DESTINATION)
dlg.setNameFilter("Music files (*.flac *.mp3)")
if not dlg.exec():
return
with Session() as session:
new_tracks = []
for fname in dlg.selectedFiles():
txt = ""
tags = helpers.get_tags(fname)
title = tags["title"]
artist = tags["artist"]
count = 0
possible_matches = Tracks.search_titles(session, title)
if possible_matches:
txt += "Similar to new track "
txt += f'"{title}" by "{artist} ({fname})":\n\n'
for track in possible_matches:
txt += f' "{track.title}" by {track.artist}'
txt += f" ({track.path})\n\n"
count += 1
if count >= Config.MAX_IMPORT_MATCHES:
txt += "\nThere are more similar-looking tracks"
break
txt += "\n"
# Check whether to proceed if there were potential matches
txt += "Proceed with import?"
result = QMessageBox.question(
self,
"Possible duplicates",
txt,
QMessageBox.StandardButton.Ok,
QMessageBox.StandardButton.Cancel,
)
if result == QMessageBox.StandardButton.Cancel:
continue
new_tracks.append(fname)
# Import in separate thread
self.import_thread = QThread()
self.worker = ImportTrack(
self.visible_playlist_tab(),
new_tracks,
self.visible_playlist_tab().get_new_row_number(),
)
self.worker.moveToThread(self.import_thread)
self.import_thread.started.connect(self.worker.run)
self.worker.finished.connect(self.import_thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.import_thread.finished.connect(self.import_thread.deleteLater)
self.worker.import_error.connect(
lambda msg: helpers.show_warning(
self, "Import error", "Error importing " + msg
)
)
self.worker.importing.connect(lambda msg: self.statusbar.showMessage(msg, 5000))
self.worker.finished.connect(self.import_complete)
self.import_thread.start()
def import_complete(self, playlist_tab: PlaylistTab):
"""
Called by thread when track import complete
"""
self.statusbar.showMessage("Imports complete")
def insert_header(self) -> None:
"""Show dialog box to enter header text and add to playlist"""
try:
playlist_tab = self.visible_playlist_tab()
except AttributeError:
# Just return if there's no visible playlist tab
return
# Get header text
dlg: QInputDialog = QInputDialog(self)
dlg.setInputMode(QInputDialog.InputMode.TextInput)
dlg.setLabelText("Header text:")
dlg.resize(500, 100)
ok = dlg.exec()
if ok:
with Session() as session:
playlist_tab.insert_header(session, dlg.textValue())
playlist_tab.save_playlist(session)
def insert_track(self) -> None:
"""Show dialog box to select and add track from database"""
with Session() as session:
dlg = DbDialog(self, session, get_one_track=False)
dlg.exec()
def load_last_playlists(self) -> None:
"""Load the playlists that were open when the last session closed"""
with Session() as session:
for playlist in Playlists.get_open(session):
if playlist:
_ = self.create_playlist_tab(session, playlist)
# Set active tab
record = Settings.get_int_settings(session, "active_tab")
if record.f_int and record.f_int >= 0:
self.tabPlaylist.setCurrentIndex(record.f_int)
def move_playlist_rows(
self, session: scoped_session, playlistrows: Sequence[PlaylistRows]
) -> None:
"""
Move passed playlist rows to another playlist
Actions required:
- exclude current/next tracks from being moved
- identify destination playlist
- update playlist for the rows in the database
- remove them from the display
- update destination playlist display if loaded
"""
# Remove current/next rows from list
plrs_to_move = [
plr
for plr in playlistrows
if plr.id not in [self.current_track.plr_id, self.next_track.plr_id]
]
rows_to_delete = [
plr.plr_rownum for plr in plrs_to_move if plr.plr_rownum is not None
]
if not rows_to_delete:
return
# Identify destination playlist
playlists = []
visible_tab = self.visible_playlist_tab()
source_playlist_id = visible_tab.playlist_id
for playlist in Playlists.get_all(session):
if playlist.id == source_playlist_id:
continue
else:
playlists.append(playlist)
dlg = SelectPlaylistDialog(self, playlists=playlists, session=session)
dlg.exec()
if not dlg.playlist:
return
destination_playlist_id = dlg.playlist.id
# Update destination playlist in the database
last_row = PlaylistRows.get_last_used_row(session, destination_playlist_id)
if last_row is not None:
next_row = last_row + 1
else:
next_row = 0
for plr in plrs_to_move:
plr.plr_rownum = next_row
next_row += 1
plr.playlist_id = destination_playlist_id
# Reset played as it's not been played on this playlist
plr.played = False
session.commit()
# Remove moved rows from display and save visible playlist
visible_tab.remove_rows(rows_to_delete)
visible_tab.save_playlist(session)
# Disable sort undo
self.sort_undo: List[int] = []
# Update destination playlist_tab if visible (if not visible, it
# will be re-populated when it is opened)
destination_playlist_tab = None
for tab in range(self.tabPlaylist.count()):
if self.tabPlaylist.widget(tab).playlist_id == dlg.playlist.id:
destination_playlist_tab = self.tabPlaylist.widget(tab)
break
if destination_playlist_tab:
destination_playlist_tab.populate_display(session, dlg.playlist.id)
def move_selected(self) -> None:
"""
Move selected rows to another playlist
"""
with Session() as session:
selected_plrs = self.visible_playlist_tab().get_selected_playlistrows(
session
)
if not selected_plrs:
return
self.move_playlist_rows(session, selected_plrs)
def move_tab(self, frm: int, to: int) -> None:
"""Handle tabs being moved"""
with Session() as session:
Playlists.move_tab(session, frm, to)
def move_unplayed(self) -> None:
"""
Move unplayed rows to another playlist
"""
playlist_id = self.visible_playlist_tab().playlist_id
with Session() as session:
unplayed_plrs = PlaylistRows.get_unplayed_rows(session, playlist_id)
if helpers.ask_yes_no(
"Move tracks", f"Move {len(unplayed_plrs)} tracks:" " Are you sure?"
):
self.move_playlist_rows(session, unplayed_plrs)
def new_from_template(self) -> None:
"""Create new playlist from template"""
with Session() as session:
templates = Playlists.get_all_templates(session)
dlg = SelectPlaylistDialog(self, playlists=templates, session=session)
dlg.exec()
template = dlg.playlist
if template:
playlist_name = self.solicit_playlist_name()
if not playlist_name:
return
playlist = Playlists.create_playlist_from_template(
session, template, playlist_name
)
if not playlist:
return
tab_index = self.create_playlist_tab(session, playlist)
playlist.mark_open(session, tab_index)
def open_playlist(self):
"""Open existing playlist"""
with Session() as session:
playlists = Playlists.get_closed(session)
dlg = SelectPlaylistDialog(self, playlists=playlists, session=session)
dlg.exec()
playlist = dlg.playlist
if playlist:
tab_index = self.create_playlist_tab(session, playlist)
playlist.mark_open(session, tab_index)
def paste_rows(self) -> None:
"""
Paste earlier cut rows.
Process:
- ensure we have some cut rows
- if not pasting at end of playlist, move later rows down
- update plrs with correct playlist and row
- if moving between playlists: renumber source playlist rows
- else: check integrity of playlist rows
"""
if not self.selected_plrs:
return
playlist_tab = self.visible_playlist_tab()
dst_playlist_id = playlist_tab.playlist_id
dst_row = self.visible_playlist_tab().get_new_row_number()
with Session() as session:
# Create space in destination playlist
PlaylistRows.move_rows_down(
session, dst_playlist_id, dst_row, len(self.selected_plrs)
)
session.commit()
# Update plrs
row = dst_row
src_playlist_id = None
for plr in self.selected_plrs:
# Update moved rows
session.add(plr)
if not src_playlist_id:
src_playlist_id = plr.playlist_id
plr.playlist_id = dst_playlist_id
plr.plr_rownum = row
row += 1
if not src_playlist_id:
return
session.flush()
# Update display
self.visible_playlist_tab().populate_display(
session, dst_playlist_id, scroll_to_top=False
)
# If source playlist is not destination playlist, fixup row
# numbers and update display
if src_playlist_id != dst_playlist_id:
PlaylistRows.fixup_rownumbers(session, src_playlist_id)
# Update source playlist_tab if visible (if not visible, it
# will be re-populated when it is opened)
source_playlist_tab = None
for tab in range(self.tabPlaylist.count()):
if self.tabPlaylist.widget(tab).playlist_id == src_playlist_id:
source_playlist_tab = self.tabPlaylist.widget(tab)
break
if source_playlist_tab:
source_playlist_tab.populate_display(
session, src_playlist_id, scroll_to_top=False
)
# Reset so rows can't be repasted
self.selected_plrs = None
def play_next(self, position: Optional[float] = None) -> None:
"""
Play next track, optionally from passed position.
Actions required:
- If there is no next track set, return.
- If there's currently a track playing, fade it.
- Move next track to current track.
- Ensure playlist tabs are the correct colour
- Restore volume if -3dB active
- Play (new) current track.
- Tell database to record it as played
- Tell playlist track is now playing
- Note that track is now playing
- Disable play next controls
- Update headers
- Update clocks
"""
# If there is no next track set, return.
if not self.next_track.track_id:
log.debug("musicmuster.play_next(): no next track selected")
return
with Session() as session:
# If there's currently a track playing, fade it.
self.stop_playing(fade=True)
# Move next track to current track.
# stop_playing() above has called end_of_track_actions()
# which will have populated self.previous_track
self.current_track = self.next_track
self.clear_next()
if not self.current_track.track_id:
log.debug("musicmuster.play_next(): no id for next track")
return
if not self.current_track.path:
log.debug("musicmuster.play_next(): no path for next track")
return
# Set current track playlist_tab colour
current_tab = self.current_track.playlist_tab
if current_tab:
self.set_tab_colour(current_tab, QColor(Config.COLOUR_CURRENT_TAB))
# Restore volume if -3dB active
if self.btnDrop3db.isChecked():
self.btnDrop3db.setChecked(False)
# Play (new) current track
self.current_track.start()
self.music.play(self.current_track.path, position)
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
for _ in range(3):
if self.music.player:
volume = self.music.player.audio_get_volume()
if volume < Config.VOLUME_VLC_DEFAULT:
self.music.set_volume()
log.error(f"Reset from {volume=}")
break
sleep(0.1)
# Show closing volume graph
self.current_track.fade_graph.plot()
# Tell database to record it as played
Playdates(session, self.current_track.track_id)
# Tell playlist track is now playing
if self.current_track.playlist_tab:
self.current_track.playlist_tab.play_started(session)
# Note that track is now playing
self.playing = True
# Disable play next controls
self.disable_play_next_controls()
# Update headers
self.update_headers()
def preview(self) -> None:
"""
Preview selected or next track. We use a different mechanism to
normal track playing so that the user can route the output audio
differently (eg, to headphones).
"""
if self.btnPreview.isChecked():
# Get track path for first selected track if there is one
track_path = self.visible_playlist_tab().get_selected_row_track_path()
if not track_path:
# Otherwise get path to next track to play
track_path = self.next_track.path
if not track_path:
self.btnPreview.setChecked(False)
return
mixer.music.load(track_path)
mixer.music.play()
else:
mixer.music.stop()
def rename_playlist(self) -> None:
"""
Rename current playlist
"""
with Session() as session:
playlist_id = self.visible_playlist_tab().playlist_id
playlist = session.get(Playlists, playlist_id)
if playlist:
new_name = self.solicit_playlist_name(playlist.name)
if new_name:
playlist.rename(session, new_name)
idx = self.tabBar.currentIndex()
self.tabBar.setTabText(idx, new_name)
def resume(self) -> None:
"""
Resume playing last track. We may be playing the next track
or none; take care of both eventualities.
Actions required:
- Return if no saved position
- Resume last track
- If a track is playing, make that the next track
"""
# Return if no saved position
if not self.previous_track_position:
return
# Note any playing track as this will become the next track
playing_track = None
if self.current_track.track_id:
playing_track = self.current_track
# Set next plr to be track to resume
if not self.previous_track.plr_id:
return
if not self.previous_track.playlist_tab:
return
# Resume last track
self.set_next_plr_id(
self.previous_track.plr_id, self.previous_track.playlist_tab
)
self.play_next(self.previous_track_position)
# Adjust track info so that clocks and graph are correct.
# Easiest way is to fake the start time.
if self.current_track.start_time and self.current_track.duration:
elapsed_ms = self.current_track.duration * self.previous_track_position
self.current_track.start_time -= timedelta(milliseconds=elapsed_ms)
# If a track was playing when we were called, get details to
# set it as the next track
if playing_track:
if not playing_track.plr_id:
return
if not playing_track.playlist_tab:
return
self.set_next_plr_id(playing_track.plr_id, playing_track.playlist_tab)
def save_as_template(self) -> None:
"""Save current playlist as template"""
with Session() as session:
template_names = [a.name for a in Playlists.get_all_templates(session)]
while True:
# Get name for new template
dlg = QInputDialog(self)
dlg.setInputMode(QInputDialog.InputMode.TextInput)
dlg.setLabelText("Template name:")
dlg.resize(500, 100)
ok = dlg.exec()
if not ok:
return
template_name = dlg.textValue()
if template_name not in template_names:
break
helpers.show_warning(
self, "Duplicate template", "Template name already in use"
)
Playlists.save_as_template(
session, self.visible_playlist_tab().playlist_id, template_name
)
helpers.show_OK(self, "Template", "Template saved")
def search_playlist(self) -> None:
"""Show text box to search playlist"""
# Disable play controls so that 'return' in search box doesn't
# play next track
self.disable_play_next_controls()
self.txtSearch.setHidden(False)
self.txtSearch.setFocus()
# Select any text that may already be there
self.txtSearch.selectAll()
def search_playlist_clear(self) -> None:
"""Tidy up and reset search bar"""
# Clear the search text
self.visible_playlist_tab().set_search("")
# Clean up search bar
self.txtSearch.setText("")
self.txtSearch.setHidden(True)
def search_playlist_return(self) -> None:
"""Initiate search when return pressed"""
self.visible_playlist_tab().set_search(self.txtSearch.text())
self.enable_play_next_controls()
def select_duplicate_rows(self) -> None:
"""
Select the last of any rows with duplicate tracks in current playlist.
This allows the selection to typically come towards the end of the playlist away
from any show specific sections.
If there a track is selected on three or more rows, only the last one is selected.
"""
visible_playlist_id = self.visible_playlist_tab().playlist_id
# Get row number of duplicate rows
sql = text(f"""
SELECT max(plr_rownum)
FROM playlist_rows
WHERE playlist_id = {visible_playlist_id}
AND track_id != 0
GROUP BY track_id
HAVING count(id) > 1
""")
with Session() as session:
row_numbers = [int(a) for a in session.execute(sql).scalars().all()]
if row_numbers:
self.visible_playlist_tab().select_rows(row_numbers)
self.statusbar.showMessage(f"{len(row_numbers)} duplicate rows selected", 10000)
def select_next_row(self) -> None:
"""Select next or first row in playlist"""
self.visible_playlist_tab().select_next_row()
def select_previous_row(self) -> None:
"""Select previous or first row in playlist"""
self.visible_playlist_tab().select_previous_row()
def set_main_window_size(self) -> None:
"""Set size of window from database"""
with Session() as session:
settings = Settings.all_as_dict(session)
record = settings["mainwindow_x"]
x = record.f_int or 1
record = settings["mainwindow_y"]
y = record.f_int or 1
record = settings["mainwindow_width"]
width = record.f_int or 1599
record = settings["mainwindow_height"]
height = record.f_int or 981
self.setGeometry(x, y, width, height)
record = settings["splitter_top"]
splitter_top = record.f_int or 256
record = settings["splitter_bottom"]
splitter_bottom = record.f_int or 256
self.splitter.setSizes([splitter_top, splitter_bottom])
return
def set_tab_colour(self, widget: PlaylistTab, colour: QColor) -> None:
"""
Find the tab containing the widget and set the text colour
"""
idx = self.tabPlaylist.indexOf(widget)
self.tabPlaylist.tabBar().setTabTextColor(idx, colour)
def show_current(self) -> None:
"""Scroll to show current track"""
if self.current_track.playlist_tab != self.visible_playlist_tab():
self.tabPlaylist.setCurrentWidget(self.current_track.playlist_tab)
self.tabPlaylist.currentWidget().scroll_current_to_top()
def show_next(self) -> None:
"""Scroll to show next track"""
if self.next_track.playlist_tab != self.visible_playlist_tab():
self.tabPlaylist.setCurrentWidget(self.next_track.playlist_tab)
self.tabPlaylist.currentWidget().scroll_next_to_top()
def solicit_playlist_name(self, default: Optional[str] = "") -> Optional[str]:
"""Get name of playlist from user"""
dlg = QInputDialog(self)
dlg.setInputMode(QInputDialog.InputMode.TextInput)
dlg.setLabelText("Playlist name:")
if default:
dlg.setTextValue(default)
dlg.resize(500, 100)
ok = dlg.exec()
if ok:
return dlg.textValue()
else:
return None
def stop(self) -> None:
"""Stop playing immediately"""
self.stop_playing(fade=False)
def stop_playing(self, fade=True) -> None:
"""
Stop playing current track
Actions required:
- Return if not playing
- Stop/fade track
- Reset playlist_tab colour
- Run end-of-track actions
"""
# Return if not playing
if not self.playing:
return
# Stop/fade track
self.previous_track_position = self.music.get_position()
if fade:
self.music.fade()
else:
self.music.stop()
# Reset playlist_tab colour
if self.current_track.playlist_tab:
if self.current_track.playlist_tab == self.next_track.playlist_tab:
self.set_tab_colour(
self.current_track.playlist_tab, QColor(Config.COLOUR_NEXT_TAB)
)
else:
self.set_tab_colour(
self.current_track.playlist_tab, QColor(Config.COLOUR_NORMAL_TAB)
)
# Run end-of-track actions
self.end_of_track_actions()
def tab_change(self):
"""Called when active tab changed"""
try:
self.tabPlaylist.currentWidget().tab_visible()
except AttributeError:
# May also be called when last tab is closed
pass
def set_selected_track_next(self) -> None:
"""
Set currently-selected row on visible playlist tab as next track
"""
playlist_tab = self.visible_playlist_tab()
selected_plr_ids = playlist_tab.get_selected_playlistrow_ids()
if len(selected_plr_ids) != 1:
log.error(f"set_next_track:_from_mm {selected_plr_ids=}")
return
self.set_next_plr_id(selected_plr_ids[0], playlist_tab)
def set_next_plr_id(
self, next_plr_id: Optional[int], playlist_tab: PlaylistTab
) -> None:
"""
Set passed plr_id as next track to play, or clear next track if None
Actions required:
- Update self.next_track PlaylistTrack structure
- Tell playlist tabs to update their 'next track' highlighting
- Update headers
- Set playlist tab colours
- Populate info tabs
"""
with Session() as session:
# Update self.next_track PlaylistTrack structure
old_next_track = self.next_track
self.next_track = PlaylistTrack()
if next_plr_id:
next_plr = session.get(PlaylistRows, next_plr_id)
if next_plr:
self.next_track.set_plr(session, next_plr, playlist_tab)
# Tell playlist tabs to update their 'next track' highlighting
# Args must both be ints, so use zero for no next track
self.signals.set_next_track_signal.emit(
old_next_track.plr_id, next_plr_id or 0
)
# Update headers
self.update_headers()
# Set playlist tab colours
self._set_next_track_playlist_tab_colours(old_next_track)
if next_plr_id:
# Populate 'info' tabs with Wikipedia info, but queue it
# because it isn't quick
if self.next_track.title:
QTimer.singleShot(
0,
lambda: self.tabInfolist.open_in_wikipedia(
self.next_track.title
),
)
def _set_next_track_playlist_tab_colours(
self, old_next_track: Optional[PlaylistTrack]
) -> None:
"""
Set playlist tab colour for next track. self.next_track needs
to be set before calling.
"""
# If the original next playlist tab isn't the same as the
# new one or the current track, it needs its colour reset.
if (
old_next_track
and old_next_track.playlist_tab
and old_next_track.playlist_tab
not in [self.next_track.playlist_tab, self.current_track.playlist_tab]
):
self.set_tab_colour(
old_next_track.playlist_tab, QColor(Config.COLOUR_NORMAL_TAB)
)
# If the new next playlist tab isn't the same as the
# old one or the current track, it needs its colour set.
if old_next_track:
old_tab = old_next_track.playlist_tab
else:
old_tab = None
if (
self.next_track
and self.next_track.playlist_tab
and self.next_track.playlist_tab
not in [old_tab, self.current_track.playlist_tab]
):
self.set_tab_colour(
self.next_track.playlist_tab, QColor(Config.COLOUR_NEXT_TAB)
)
def tick_10ms(self) -> None:
"""
Called every 10ms
"""
# Update volume fade curve
if (
self.current_track.track_id
and self.current_track.fade_graph
and self.current_track.start_time
):
play_time = (
datetime.now() - self.current_track.start_time
).total_seconds() * 1000
self.current_track.fade_graph.tick(play_time)
def tick_500ms(self) -> None:
"""
Called every 500ms
"""
self.lblTOD.setText(datetime.now().strftime(Config.TOD_TIME_FORMAT))
# Update carts
self.cart_tick()
def tick_1000ms(self) -> None:
"""
Called every 1000ms
"""
# Ensure preview button is reset if preview finishes playing
self.btnPreview.setChecked(mixer.music.get_busy())
# Only update play clocks once a second so that their updates
# are synchronised (otherwise it looks odd)
if not self.playing:
return
# If track is playing, update track clocks time and colours
# There is a discrete time between starting playing a track and
# player.is_playing() returning True, so assume playing if less
# than Config.PLAY_SETTLE microseconds have passed since
# starting play.
if (
self.music.player
and self.current_track.start_time
and (
self.music.player.is_playing()
or (datetime.now() - self.current_track.start_time)
< timedelta(microseconds=Config.PLAY_SETTLE)
)
):
playtime = self.get_playtime()
time_to_fade = self.current_track.fade_at - playtime
time_to_silence = self.current_track.silence_at - playtime
# Elapsed time
self.label_elapsed_timer.setText(
helpers.ms_to_mmss(playtime)
+ " / "
+ helpers.ms_to_mmss(self.current_track.duration)
)
# Time to fade
self.label_fade_timer.setText(helpers.ms_to_mmss(time_to_fade))
# If silent in the next 5 seconds, put warning colour on
# time to silence box and enable play controls
if time_to_silence <= 5500:
css_silence = f"background: {Config.COLOUR_ENDING_TIMER}"
if self.frame_silent.styleSheet() != css_silence:
self.frame_silent.setStyleSheet(css_silence)
self.enable_play_next_controls()
# Set warning colour on time to silence box when fade starts
elif time_to_fade <= 500:
css_fade = f"background: {Config.COLOUR_WARNING_TIMER}"
if self.frame_silent.styleSheet() != css_fade:
self.frame_silent.setStyleSheet(css_fade)
# Five seconds before fade starts, set warning colour on
# time to silence box and enable play controls
elif time_to_fade <= 5500:
self.frame_fade.setStyleSheet(
f"background: {Config.COLOUR_WARNING_TIMER}"
)
self.enable_play_next_controls()
else:
self.frame_silent.setStyleSheet("")
self.frame_fade.setStyleSheet("")
self.label_silent_timer.setText(helpers.ms_to_mmss(time_to_silence))
# Autoplay next track
# if time_to_silence <= 1500:
# self.play_next()
else:
if self.playing:
self.stop_playing()
def update_headers(self) -> None:
"""
Update last / current / next track headers
"""
if self.previous_track.title and self.previous_track.artist:
self.hdrPreviousTrack.setText(
f"{self.previous_track.title} - {self.previous_track.artist}"
)
else:
self.hdrPreviousTrack.setText("")
if self.current_track.title and self.current_track.artist:
self.hdrCurrentTrack.setText(
f"{self.current_track.title.replace('&', '&&')} - "
f"{self.current_track.artist.replace('&', '&&')}"
)
else:
self.hdrCurrentTrack.setText("")
if self.next_track.title and self.next_track.artist:
self.hdrNextTrack.setText(
f"{self.next_track.title.replace('&', '&&')} - "
f"{self.next_track.artist.replace('&', '&&')}"
)
else:
self.hdrNextTrack.setText("")
class CartDialog(QDialog):
"""Edit cart details"""
def __init__(
self, musicmuster: Window, session: scoped_session, cart: Carts, *args, **kwargs
) -> None:
"""
Manage carts
"""
super().__init__(*args, **kwargs)
self.musicmuster = musicmuster
self.session = session
self.ui = Ui_DialogCartEdit()
self.ui.setupUi(self)
self.path = cart.path
self.ui.lblPath.setText(self.path)
self.ui.lineEditName.setText(cart.name)
self.ui.chkEnabled.setChecked(cart.enabled)
self.setWindowTitle("Edit Cart " + str(cart.id))
self.ui.btnFile.clicked.connect(self.choose_file)
def choose_file(self) -> None:
"""File select"""
dlg = QFileDialog()
dlg.setFileMode(QFileDialog.FileMode.ExistingFile)
dlg.setViewMode(QFileDialog.ViewMode.Detail)
dlg.setDirectory(Config.CART_DIRECTORY)
dlg.setNameFilter("Music files (*.flac *.mp3)")
if dlg.exec():
self.path = dlg.selectedFiles()[0]
self.ui.lblPath.setText(self.path)
class DbDialog(QDialog):
"""Select track from database"""
def __init__(
self,
musicmuster: Window,
session: scoped_session,
get_one_track: bool = False,
*args,
**kwargs,
) -> None:
"""
Subclassed QDialog to manage track selection
If get_one_track is True, return after first track selection
with that track in ui.track. Otherwise, allow multiple tracks
to be added to the playlist.
"""
super().__init__(*args, **kwargs)
self.musicmuster = musicmuster
self.session = session
self.get_one_track = get_one_track
self.ui = Ui_Dialog()
self.ui.setupUi(self)
self.ui.btnAdd.clicked.connect(self.add_selected)
self.ui.btnAddClose.clicked.connect(self.add_selected_and_close)
self.ui.btnClose.clicked.connect(self.close)
self.ui.matchList.itemDoubleClicked.connect(self.double_click)
self.ui.matchList.itemSelectionChanged.connect(self.selection_changed)
self.ui.radioTitle.toggled.connect(self.title_artist_toggle)
self.ui.searchString.textEdited.connect(self.chars_typed)
self.track: Optional[Tracks] = None
if get_one_track:
self.ui.txtNote.hide()
self.ui.lblNote.hide()
record = Settings.get_int_settings(self.session, "dbdialog_width")
width = record.f_int or 800
record = Settings.get_int_settings(self.session, "dbdialog_height")
height = record.f_int or 600
self.resize(width, height)
def __del__(self) -> None:
"""Save dialog size and position"""
# FIXME:
# if record.f_int != self.height():
# ^^^^^^^^^^^^^
# RuntimeError: wrapped C/C++ object of type DbDialog has been deleted
record = Settings.get_int_settings(self.session, "dbdialog_height")
if record.f_int != self.height():
record.update(self.session, {"f_int": self.height()})
record = Settings.get_int_settings(self.session, "dbdialog_width")
if record.f_int != self.width():
record.update(self.session, {"f_int": self.width()})
def add_selected(self) -> None:
"""Handle Add button"""
track = None
if self.ui.matchList.selectedItems():
item = self.ui.matchList.currentItem()
if item:
track = item.data(Qt.ItemDataRole.UserRole)
note = self.ui.txtNote.text()
if not note and not track:
return
self.add_track(track, self.ui.txtNote.text())
def add_selected_and_close(self) -> None:
"""Handle Add and Close button"""
self.add_selected()
self.accept()
def add_track(self, track: Optional[Tracks], note: str) -> None:
"""Add passed track to playlist on screen"""
if self.get_one_track:
self.track = track
self.accept()
return
if track:
self.musicmuster.visible_playlist_tab().insert_track(
self.session, track, note
)
else:
self.musicmuster.visible_playlist_tab().insert_header(self.session, note)
self.ui.txtNote.clear()
self.select_searchtext()
def chars_typed(self, s: str) -> None:
"""Handle text typed in search box"""
self.ui.matchList.clear()
if len(s) > 0:
if self.ui.radioTitle.isChecked():
matches = Tracks.search_titles(self.session, "%" + s)
else:
matches = Tracks.search_artists(self.session, "%" + s)
if matches:
for track in matches:
last_played = None
last_playdate = max(
track.playdates, key=lambda p: p.lastplayed, default=None
)
if last_playdate:
last_played = last_playdate.lastplayed
t = QListWidgetItem()
track_text = (
f"{track.title} - {track.artist} "
f"[{helpers.ms_to_mmss(track.duration)}] "
f"({helpers.get_relative_date(last_played)})"
)
t.setText(track_text)
t.setData(Qt.ItemDataRole.UserRole, track)
self.ui.matchList.addItem(t)
def double_click(self, entry: QListWidgetItem) -> None:
"""Add items that are double-clicked"""
track = entry.data(Qt.ItemDataRole.UserRole)
note = self.ui.txtNote.text()
self.add_track(track, note)
def keyPressEvent(self, event):
"""
Clear selection on ESC if there is one
"""
if event.key() == Qt.Key.Key_Escape:
if self.ui.matchList.selectedItems():
self.ui.matchList.clearSelection()
return
super(DbDialog, self).keyPressEvent(event)
def select_searchtext(self) -> None:
"""Select the searchbox"""
self.ui.searchString.selectAll()
self.ui.searchString.setFocus()
def selection_changed(self) -> None:
"""Display selected track path in dialog box"""
if not self.ui.matchList.selectedItems():
return
item = self.ui.matchList.currentItem()
track = item.data(Qt.ItemDataRole.UserRole)
last_playdate = max(track.playdates, key=lambda p: p.lastplayed, default=None)
if last_playdate:
last_played = last_playdate.lastplayed
else:
last_played = None
path_text = f"{track.path} ({helpers.get_relative_date(last_played)})"
self.ui.dbPath.setText(path_text)
def title_artist_toggle(self) -> None:
"""
Handle switching between searching for artists and searching for
titles
"""
# Logic is handled already in chars_typed(), so just call that.
self.chars_typed(self.ui.searchString.text())
class DownloadCSV(QDialog):
def __init__(self, parent=None):
super().__init__()
self.ui = Ui_DateSelect()
self.ui.setupUi(self)
self.ui.dateTimeEdit.setDate(QDate.currentDate())
self.ui.dateTimeEdit.setTime(QTime(19, 59, 0))
self.ui.buttonBox.accepted.connect(self.accept)
self.ui.buttonBox.rejected.connect(self.reject)
class SelectPlaylistDialog(QDialog):
def __init__(self, parent=None, playlists=None, session=None):
super().__init__()
if playlists is None:
return
self.ui = Ui_dlgSelectPlaylist()
self.ui.setupUi(self)
self.ui.lstPlaylists.itemDoubleClicked.connect(self.list_doubleclick)
self.ui.buttonBox.accepted.connect(self.open)
self.ui.buttonBox.rejected.connect(self.close)
self.session = session
self.playlist = None
record = Settings.get_int_settings(self.session, "select_playlist_dialog_width")
width = record.f_int or 800
record = Settings.get_int_settings(
self.session, "select_playlist_dialog_height"
)
height = record.f_int or 600
self.resize(width, height)
for playlist in playlists:
p = QListWidgetItem()
p.setText(playlist.name)
p.setData(Qt.ItemDataRole.UserRole, playlist)
self.ui.lstPlaylists.addItem(p)
def __del__(self): # review
record = Settings.get_int_settings(
self.session, "select_playlist_dialog_height"
)
if record.f_int != self.height():
record.update(self.session, {"f_int": self.height()})
record = Settings.get_int_settings(self.session, "select_playlist_dialog_width")
if record.f_int != self.width():
record.update(self.session, {"f_int": self.width()})
def list_doubleclick(self, entry): # review
self.playlist = entry.data(Qt.ItemDataRole.UserRole)
self.accept()
def open(self): # review
if self.ui.lstPlaylists.selectedItems():
item = self.ui.lstPlaylists.currentItem()
self.playlist = item.data(Qt.ItemDataRole.UserRole)
self.accept()
if __name__ == "__main__":
"""
If command line arguments given, carry out requested function and
exit. Otherwise run full application.
"""
p = argparse.ArgumentParser()
# Only allow at most one option to be specified
group = p.add_mutually_exclusive_group()
group.add_argument(
"-b",
"--bitrates",
action="store_true",
dest="update_bitrates",
default=False,
help="Update bitrates in database",
)
group.add_argument(
"-c",
"--check-database",
action="store_true",
dest="check_db",
default=False,
help="Check and report on database",
)
args = p.parse_args()
# Run as required
if args.check_db:
log.debug("Updating database")
with Session() as session:
check_db(session)
engine.dispose()
elif args.update_bitrates:
log.debug("Update bitrates")
with Session() as session:
update_bitrates(session)
engine.dispose()
else:
try:
Base.metadata.create_all(engine)
app = QApplication(sys.argv)
# PyQt6 defaults to a grey for labels
palette = app.palette()
palette.setColor(
QPalette.ColorRole.WindowText, QColor(Config.COLOUR_LABEL_TEXT)
)
# Set colours that will be used by playlist row stripes
palette.setColor(
QPalette.ColorRole.Base, QColor(Config.COLOUR_EVEN_PLAYLIST)
)
palette.setColor(
QPalette.ColorRole.AlternateBase, QColor(Config.COLOUR_ODD_PLAYLIST)
)
app.setPalette(palette)
win = Window()
win.show()
status = app.exec()
engine.dispose()
sys.exit(status)
except Exception as exc:
if os.environ["MM_ENV"] == "PRODUCTION":
from helpers import send_mail
msg = stackprinter.format(exc)
send_mail(
Config.ERRORS_TO,
Config.ERRORS_FROM,
"Exception from musicmuster",
msg,
)
else:
print("\033[1;31;47mUnhandled exception starts")
print(
stackprinter.format(
exc, suppressed_paths=["/pypoetry/virtualenvs/"], style="darkbg"
)
)
print("Unhandled exception ends\033[1;37;40m")