From 6e258a0ee2fbdd81b795fb3bce02a0737a6114b6 Mon Sep 17 00:00:00 2001 From: Keith Edmunds Date: Sun, 22 Dec 2024 15:14:00 +0000 Subject: [PATCH] Split music_manager from classes --- app/classes.py | 746 ++----------------------------------------- app/music_manager.py | 746 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 767 insertions(+), 725 deletions(-) create mode 100644 app/music_manager.py diff --git a/app/classes.py b/app/classes.py index c9750fc..3ed90ea 100644 --- a/app/classes.py +++ b/app/classes.py @@ -2,12 +2,13 @@ from __future__ import annotations import ctypes -from dataclasses import dataclass, field +from dataclasses import dataclass import datetime as dt from enum import auto, Enum +import functools import platform from time import sleep -from typing import Any, Optional, NamedTuple +from typing import Optional, NamedTuple # Third party imports import numpy as np @@ -29,44 +30,8 @@ from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: i from config import Config from log import log from models import PlaylistRows -from helpers import ( - file_is_unreadable, - get_audio_segment, - show_warning, - singleton, -) from vlcmanager import VLCManager -# Define the VLC callback function type -# VLC logging is very noisy so comment out unless needed -# VLC_LOG_CB = ctypes.CFUNCTYPE( -# None, -# ctypes.c_void_p, -# ctypes.c_int, -# ctypes.c_void_p, -# ctypes.c_char_p, -# ctypes.c_void_p, -# ) - -# Determine the correct C library for vsnprintf based on the platform -if platform.system() == "Windows": - libc = ctypes.CDLL("msvcrt") -elif platform.system() == "Linux": - libc = ctypes.CDLL("libc.so.6") -elif platform.system() == "Darwin": # macOS - libc = ctypes.CDLL("libc.dylib") -else: - raise OSError("Unsupported operating system") - -# Define the vsnprintf function -libc.vsnprintf.argtypes = [ - ctypes.c_char_p, - ctypes.c_size_t, - ctypes.c_char_p, - ctypes.c_void_p, -] -libc.vsnprintf.restype = ctypes.c_int - class Col(Enum): START_GAP = 0 @@ -81,362 +46,25 @@ class Col(Enum): NOTE = auto() -class _AddFadeCurve(QObject): +def singleton(cls): """ - Initialising a fade curve introduces a noticeable delay so carry out in - a thread. + Make a class a Singleton class (see + https://realpython.com/primer-on-python-decorators/#creating-singletons) """ - finished = pyqtSignal() + @functools.wraps(cls) + def wrapper_singleton(*args, **kwargs): + if not wrapper_singleton.instance: + wrapper_singleton.instance = cls(*args, **kwargs) + return wrapper_singleton.instance - def __init__( - self, - rat: RowAndTrack, - track_path: str, - track_fade_at: int, - track_silence_at: int, - ) -> None: - super().__init__() - self.rat = rat - self.track_path = track_path - self.track_fade_at = track_fade_at - self.track_silence_at = track_silence_at + wrapper_singleton.instance = None + return wrapper_singleton - def run(self) -> None: - """ - Create fade curve and add to PlaylistTrack object - """ - fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at) - if not fc: - log.error(f"Failed to create FadeCurve for {self.track_path=}") - else: - self.rat.fade_graph = fc - self.finished.emit() - - -class _FadeCurve: - GraphWidget: Optional[PlotWidget] = None - - def __init__( - self, track_path: str, track_fade_at: int, track_silence_at: int - ) -> None: - """ - Set up fade graph array - """ - - audio = get_audio_segment(track_path) - if not audio: - log.error(f"FadeCurve: could not get audio for {track_path=}") - return None - - # Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE - # milliseconds before fade starts to silence - self.start_ms: int = max( - 0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 - ) - self.end_ms: int = track_silence_at - audio_segment = audio[self.start_ms : self.end_ms] - self.graph_array = np.array(audio_segment.get_array_of_samples()) - - # Calculate the factor to map milliseconds of track to array - self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms) - - self.curve: Optional[PlotDataItem] = None - self.region: Optional[LinearRegionItem] = None - - def clear(self) -> None: - """Clear the current graph""" - - if self.GraphWidget: - self.GraphWidget.clear() - - def plot(self) -> None: - if self.GraphWidget: - self.curve = self.GraphWidget.plot(self.graph_array) - if self.curve: - self.curve.setPen(Config.FADE_CURVE_FOREGROUND) - else: - log.debug("_FadeCurve.plot: no curve") - else: - log.debug("_FadeCurve.plot: no GraphWidget") - - def tick(self, play_time: int) -> None: - """Update volume fade curve""" - - if not self.GraphWidget: - return - - ms_of_graph = play_time - self.start_ms - if ms_of_graph < 0: - return - - if self.region is None: - # Create the region now that we're into fade - log.debug("issue223: _FadeCurve: create region") - self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)]) - self.GraphWidget.addItem(self.region) - - # Update region position - if self.region: - # Next line is very noisy - # log.debug("issue223: _FadeCurve: update region") - self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor]) - - -class _FadeTrack(QThread): - finished = pyqtSignal() - - def __init__(self, player: vlc.MediaPlayer, fade_seconds: int) -> None: - super().__init__() - self.player = player - self.fade_seconds = fade_seconds - - def run(self) -> None: - """ - Implementation of fading the player - """ - - if not self.player: - return - - # Reduce volume logarithmically - total_steps = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND - if total_steps > 0: - db_reduction_per_step = Config.FADEOUT_DB / total_steps - reduction_factor_per_step = pow(10, (db_reduction_per_step / 20)) - - volume = self.player.audio_get_volume() - - for i in range(1, total_steps + 1): - self.player.audio_set_volume( - int(volume * pow(reduction_factor_per_step, i)) - ) - sleep(1 / Config.FADEOUT_STEPS_PER_SECOND) - - self.finished.emit() - - -vlc_instance = VLCManager().vlc_instance - - -class _Music: - """ - Manage the playing of music tracks - """ - - def __init__(self, name: str) -> None: - vlc_instance.set_user_agent(name, name) - self.player: Optional[vlc.MediaPlayer] = None - self.name = name - self.max_volume: int = Config.VLC_VOLUME_DEFAULT - self.start_dt: Optional[dt.datetime] = None - - # Set up logging - # self._set_vlc_log() - - # VLC logging very noisy so comment out unless needed - # @VLC_LOG_CB - # def log_callback(data, level, ctx, fmt, args): - # try: - # # Create a ctypes string buffer to hold the formatted message - # buf = ctypes.create_string_buffer(1024) - - # # Use vsnprintf to format the string with the va_list - # libc.vsnprintf(buf, len(buf), fmt, args) - - # # Decode the formatted message - # message = buf.value.decode("utf-8", errors="replace") - # log.debug("VLC: " + message) - # except Exception as e: - # log.error(f"Error in VLC log callback: {e}") - - # def _set_vlc_log(self): - # try: - # vlc.libvlc_log_set(vlc_instance, self.log_callback, None) - # log.debug("VLC logging set up successfully") - # except Exception as e: - # log.error(f"Failed to set up VLC logging: {e}") - - def adjust_by_ms(self, ms: int) -> None: - """Move player position by ms milliseconds""" - - if not self.player: - return - - elapsed_ms = self.get_playtime() - position = self.get_position() - if not position: - position = 0.0 - new_position = max(0.0, position + ((position * ms) / elapsed_ms)) - self.set_position(new_position) - # Adjus start time so elapsed time calculations are correct - if new_position == 0: - self.start_dt = dt.datetime.now() - else: - if self.start_dt: - self.start_dt -= dt.timedelta(milliseconds=ms) - else: - self.start_dt = dt.datetime.now() - dt.timedelta(milliseconds=ms) - - def fade(self, fade_seconds: int) -> None: - """ - Fade the currently playing track. - - The actual management of fading runs in its own thread so as not - to hold up the UI during the fade. - """ - - if not self.player: - return - - if not self.player.get_position() > 0 and self.player.is_playing(): - return - - self.fader_worker = _FadeTrack(self.player, fade_seconds=fade_seconds) - self.fader_worker.finished.connect(self.player.release) - self.fader_worker.start() - self.start_dt = None - - def get_playtime(self) -> int: - """ - Return number of milliseconds current track has been playing or - zero if not playing. The vlc function get_time() only updates 3-4 - times a second; this function has much better resolution. - """ - - if self.start_dt is None: - return 0 - - now = dt.datetime.now() - elapsed_seconds = (now - self.start_dt).total_seconds() - return int(elapsed_seconds * 1000) - - def get_position(self) -> Optional[float]: - """Return current position""" - - if not self.player: - return None - return self.player.get_position() - - def is_playing(self) -> bool: - """ - Return True if we're playing - """ - - if not self.player: - return False - - # There is a discrete time between starting playing a track and - # player.is_playing() returning True, so assume playing if less - # than Config.PLAY_SETTLE microseconds have passed since - # starting play. - return self.start_dt is not None and ( - self.player.is_playing() - or (dt.datetime.now() - self.start_dt) - < dt.timedelta(microseconds=Config.PLAY_SETTLE) - ) - - def play( - self, - path: str, - start_time: dt.datetime, - position: Optional[float] = None, - ) -> None: - """ - Start playing the track at path. - - Log and return if path not found. - - start_time ensures our version and our caller's version of - the start time is the same - """ - - log.debug(f"Music[{self.name}].play({path=}, {position=}") - - if file_is_unreadable(path): - log.error(f"play({path}): path not readable") - return None - - self.player = vlc.MediaPlayer(vlc_instance, path) - if self.player is None: - log.error(f"_Music:play: failed to create MediaPlayer ({path=})") - show_warning( - None, "Error creating MediaPlayer", f"Cannot play file ({path})" - ) - return - - _ = self.player.play() - self.set_volume(self.max_volume) - - if position: - self.player.set_position(position) - self.start_dt = start_time - - # For as-yet unknown reasons. sometimes the volume gets - # reset to zero within 200mS or so of starting play. This - # only happened since moving to Debian 12, which uses - # Pipewire for sound (which may be irrelevant). - # It has been known for the volume to need correcting more - # than once in the first 200mS. - # Update August 2024: This no longer seems to be an issue - # for _ in range(3): - # if self.player: - # volume = self.player.audio_get_volume() - # if volume < Config.VLC_VOLUME_DEFAULT: - # self.set_volume(Config.VLC_VOLUME_DEFAULT) - # log.error(f"Reset from {volume=}") - # sleep(0.1) - - def set_position(self, position: float) -> None: - """ - Set player position - """ - - if self.player: - self.player.set_position(position) - - def set_volume( - self, volume: Optional[int] = None, set_default: bool = True - ) -> None: - """Set maximum volume used for player""" - - if not self.player: - return - - if set_default and volume: - self.max_volume = volume - - if volume is None: - volume = Config.VLC_VOLUME_DEFAULT - - self.player.audio_set_volume(volume) - # Ensure volume correct - # For as-yet unknown reasons. sometimes the volume gets - # reset to zero within 200mS or so of starting play. This - # only happened since moving to Debian 12, which uses - # Pipewire for sound (which may be irrelevant). - for _ in range(3): - current_volume = self.player.audio_get_volume() - if current_volume < volume: - self.player.audio_set_volume(volume) - log.debug(f"Reset from {volume=}") - sleep(0.1) - - def stop(self) -> None: - """Immediately stop playing""" - - log.debug(f"Music[{self.name}].stop()") - - self.start_dt = None - - if not self.player: - return - - if self.player.is_playing(): - self.player.stop() - self.player.release() - self.player = None +class FileErrors(NamedTuple): + path: str + error: str class ApplicationError(Exception): @@ -475,345 +103,13 @@ class MusicMusterSignals(QObject): super().__init__() -class RowAndTrack: - """ - Object to manage playlist rows and tracks. - """ - - def __init__(self, playlist_row: PlaylistRows) -> None: - """ - Initialises data structure. - - The passed PlaylistRows object will include a Tracks object if this - row has a track. - """ - - # Collect playlistrow data - self.note = playlist_row.note - self.played = playlist_row.played - self.playlist_id = playlist_row.playlist_id - self.playlistrow_id = playlist_row.id - self.row_number = playlist_row.row_number - self.track_id = playlist_row.track_id - - # Collect track data if there's a track - if playlist_row.track_id: - self.artist = playlist_row.track.artist - self.bitrate = playlist_row.track.bitrate - self.duration = playlist_row.track.duration - self.fade_at = playlist_row.track.fade_at - self.intro = playlist_row.track.intro - if playlist_row.track.playdates: - self.lastplayed = max( - [a.lastplayed for a in playlist_row.track.playdates] - ) - else: - self.lastplayed = Config.EPOCH - self.path = playlist_row.track.path - self.silence_at = playlist_row.track.silence_at - self.start_gap = playlist_row.track.start_gap - self.title = playlist_row.track.title - else: - self.artist = "" - self.bitrate = None - self.duration = 0 - self.fade_at = 0 - self.intro = None - self.lastplayed = Config.EPOCH - self.path = "" - self.silence_at = 0 - self.start_gap = 0 - self.title = "" - - # Track playing data - self.end_of_track_signalled: bool = False - self.end_time: Optional[dt.datetime] = None - self.fade_graph: Optional[_FadeCurve] = None - self.fade_graph_start_updates: Optional[dt.datetime] = None - self.resume_marker: Optional[float] = 0.0 - self.forecast_end_time: Optional[dt.datetime] = None - self.forecast_start_time: Optional[dt.datetime] = None - self.start_time: Optional[dt.datetime] = None - - # Other object initialisation - self.music = _Music(name=Config.VLC_MAIN_PLAYER_NAME) - self.signals = MusicMusterSignals() - - def __repr__(self) -> str: - return ( - f"" - ) - - def check_for_end_of_track(self) -> None: - """ - Check whether track has ended. If so, emit track_ended_signal - """ - - if self.start_time is None: - return - - if self.end_of_track_signalled: - return - - if self.music.is_playing(): - return - - self.start_time = None - if self.fade_graph: - self.fade_graph.clear() - # Ensure that player is released - self.music.fade(0) - self.signals.track_ended_signal.emit() - self.end_of_track_signalled = True - - def create_fade_graph(self) -> None: - """ - Initialise and add FadeCurve in a thread as it's slow - """ - - self.fadecurve_thread = QThread() - self.worker = _AddFadeCurve( - self, - track_path=self.path, - track_fade_at=self.fade_at, - track_silence_at=self.silence_at, - ) - self.worker.moveToThread(self.fadecurve_thread) - self.fadecurve_thread.started.connect(self.worker.run) - self.worker.finished.connect(self.fadecurve_thread.quit) - self.worker.finished.connect(self.worker.deleteLater) - self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater) - self.fadecurve_thread.start() - - def drop3db(self, enable: bool) -> None: - """ - If enable is true, drop output by 3db else restore to full volume - """ - - if enable: - self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False) - else: - self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False) - - def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None: - """Fade music""" - - self.resume_marker = self.music.get_position() - self.music.fade(fade_seconds) - self.signals.track_ended_signal.emit() - - def is_playing(self) -> bool: - """ - Return True if we're currently playing else False - """ - - if self.start_time is None: - return False - - return self.music.is_playing() - - def move_back(self, ms: int = Config.PREVIEW_BACK_MS) -> None: - """ - Rewind player by ms milliseconds - """ - - self.music.adjust_by_ms(ms * -1) - - def move_forward(self, ms: int = Config.PREVIEW_ADVANCE_MS) -> None: - """ - Rewind player by ms milliseconds - """ - - self.music.adjust_by_ms(ms) - - def play(self, position: Optional[float] = None) -> None: - """Play track""" - - log.debug(f"issue223: RowAndTrack: play {self.track_id=}") - now = dt.datetime.now() - self.start_time = now - - # Initialise player - self.music.play(self.path, start_time=now, position=position) - - self.end_time = now + dt.timedelta(milliseconds=self.duration) - - # Calculate time fade_graph should start updating - if self.fade_at: - update_graph_at_ms = max( - 0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 - ) - self.fade_graph_start_updates = now + dt.timedelta( - milliseconds=update_graph_at_ms - ) - - def restart(self) -> None: - """ - Restart player - """ - - self.music.adjust_by_ms(self.time_playing() * -1) - - def set_forecast_start_time( - self, modified_rows: list[int], start: Optional[dt.datetime] - ) -> Optional[dt.datetime]: - """ - Set forecast start time for this row - - Update passed modified rows list if we changed the row. - - Return new start time - """ - - changed = False - - if self.forecast_start_time != start: - self.forecast_start_time = start - changed = True - if start is None: - if self.forecast_end_time is not None: - self.forecast_end_time = None - changed = True - new_start_time = None - else: - end_time = start + dt.timedelta(milliseconds=self.duration) - new_start_time = end_time - if self.forecast_end_time != end_time: - self.forecast_end_time = end_time - changed = True - - if changed and self.row_number not in modified_rows: - modified_rows.append(self.row_number) - - return new_start_time - - def stop(self, fade_seconds: int = 0) -> None: - """ - Stop this track playing - """ - - self.resume_marker = self.music.get_position() - self.fade(fade_seconds) - - # Reset fade graph - if self.fade_graph: - self.fade_graph.clear() - - def time_playing(self) -> int: - """ - Return time track has been playing in milliseconds, zero if not playing - """ - - if self.start_time is None: - return 0 - - return self.music.get_playtime() - - def time_remaining_intro(self) -> int: - """ - Return milliseconds of intro remaining. Return 0 if no intro time in track - record or if intro has finished. - """ - - if not self.intro: - return 0 - - return max(0, self.intro - self.time_playing()) - - def time_to_fade(self) -> int: - """ - Return milliseconds until fade time. Return zero if we're not playing. - """ - - if self.start_time is None: - return 0 - - return self.fade_at - self.time_playing() - - def time_to_silence(self) -> int: - """ - Return milliseconds until silent. Return zero if we're not playing. - """ - - if self.start_time is None: - return 0 - - return self.silence_at - self.time_playing() - - def update_fade_graph(self) -> None: - """ - Update fade graph - """ - - if ( - not self.is_playing() - or not self.fade_graph_start_updates - or not self.fade_graph - ): - return - - now = dt.datetime.now() - - if self.fade_graph_start_updates > now: - return - - self.fade_graph.tick(self.time_playing()) - - def update_playlist_and_row(self, session: Session) -> None: - """ - Update local playlist_id and row_number from playlistrow_id - """ - - plr = session.get(PlaylistRows, self.playlistrow_id) - if not plr: - raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}") - self.playlist_id = plr.playlist_id - self.row_number = plr.row_number - - -@dataclass -class TrackFileData: - """ - Simple class to track details changes to a track file - """ - - new_file_path: str - track_id: int = 0 - track_path: Optional[str] = None - obsolete_path: Optional[str] = None - tags: dict[str, Any] = field(default_factory=dict) - audio_metadata: dict[str, str | int | float] = field(default_factory=dict) +class Tags(NamedTuple): + artist: str + title: str + bitrate: int + duration: int class TrackInfo(NamedTuple): track_id: int row_number: int - - -class TrackSequence: - next: Optional[RowAndTrack] = None - current: Optional[RowAndTrack] = None - previous: Optional[RowAndTrack] = None - - def set_next(self, rat: Optional[RowAndTrack]) -> None: - """ - Set the 'next' track to be passed rat. Clear - any previous next track. If passed rat is None - just clear existing next track. - """ - - # Clear any existing fade graph - if self.next and self.next.fade_graph: - self.next.fade_graph.clear() - - if rat is None: - self.next = None - else: - self.next = rat - self.next.create_fade_graph() - - -track_sequence = TrackSequence() diff --git a/app/music_manager.py b/app/music_manager.py new file mode 100644 index 0000000..a2afce4 --- /dev/null +++ b/app/music_manager.py @@ -0,0 +1,746 @@ +# Standard library imports +from __future__ import annotations + +import datetime as dt +from time import sleep +from typing import Optional + +# Third party imports +import numpy as np +import pyqtgraph as pg # type: ignore +from sqlalchemy.orm.session import Session +import vlc # type: ignore + +# PyQt imports +from PyQt6.QtCore import ( + pyqtSignal, + QObject, + QThread, +) +from pyqtgraph import PlotWidget +from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore +from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore + +# App imports +from classes import ApplicationError, MusicMusterSignals +from config import Config +import helpers +from log import log +from models import PlaylistRows +from vlcmanager import VLCManager + +# Define the VLC callback function type +# import ctypes +# import platform +# VLC logging is very noisy so comment out unless needed +# VLC_LOG_CB = ctypes.CFUNCTYPE( +# None, +# ctypes.c_void_p, +# ctypes.c_int, +# ctypes.c_void_p, +# ctypes.c_char_p, +# ctypes.c_void_p, +# ) + +# # Determine the correct C library for vsnprintf based on the platform +# if platform.system() == "Windows": +# libc = ctypes.CDLL("msvcrt") +# elif platform.system() == "Linux": +# libc = ctypes.CDLL("libc.so.6") +# elif platform.system() == "Darwin": # macOS +# libc = ctypes.CDLL("libc.dylib") +# else: +# raise OSError("Unsupported operating system") + +# # Define the vsnprintf function +# libc.vsnprintf.argtypes = [ +# ctypes.c_char_p, +# ctypes.c_size_t, +# ctypes.c_char_p, +# ctypes.c_void_p, +# ] +# libc.vsnprintf.restype = ctypes.c_int + + +class _AddFadeCurve(QObject): + """ + Initialising a fade curve introduces a noticeable delay so carry out in + a thread. + """ + + finished = pyqtSignal() + + def __init__( + self, + rat: RowAndTrack, + track_path: str, + track_fade_at: int, + track_silence_at: int, + ) -> None: + super().__init__() + self.rat = rat + self.track_path = track_path + self.track_fade_at = track_fade_at + self.track_silence_at = track_silence_at + + def run(self) -> None: + """ + Create fade curve and add to PlaylistTrack object + """ + + fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at) + if not fc: + log.error(f"Failed to create FadeCurve for {self.track_path=}") + else: + self.rat.fade_graph = fc + self.finished.emit() + + +class _FadeCurve: + GraphWidget: Optional[PlotWidget] = None + + def __init__( + self, track_path: str, track_fade_at: int, track_silence_at: int + ) -> None: + """ + Set up fade graph array + """ + + audio = helpers.get_audio_segment(track_path) + if not audio: + log.error(f"FadeCurve: could not get audio for {track_path=}") + return None + + # Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE + # milliseconds before fade starts to silence + self.start_ms: int = max( + 0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 + ) + self.end_ms: int = track_silence_at + audio_segment = audio[self.start_ms : self.end_ms] + self.graph_array = np.array(audio_segment.get_array_of_samples()) + + # Calculate the factor to map milliseconds of track to array + self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms) + + self.curve: Optional[PlotDataItem] = None + self.region: Optional[LinearRegionItem] = None + + def clear(self) -> None: + """Clear the current graph""" + + if self.GraphWidget: + self.GraphWidget.clear() + + def plot(self) -> None: + if self.GraphWidget: + self.curve = self.GraphWidget.plot(self.graph_array) + if self.curve: + self.curve.setPen(Config.FADE_CURVE_FOREGROUND) + else: + log.debug("_FadeCurve.plot: no curve") + else: + log.debug("_FadeCurve.plot: no GraphWidget") + + def tick(self, play_time: int) -> None: + """Update volume fade curve""" + + if not self.GraphWidget: + return + + ms_of_graph = play_time - self.start_ms + if ms_of_graph < 0: + return + + if self.region is None: + # Create the region now that we're into fade + log.debug("issue223: _FadeCurve: create region") + self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)]) + self.GraphWidget.addItem(self.region) + + # Update region position + if self.region: + # Next line is very noisy + # log.debug("issue223: _FadeCurve: update region") + self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor]) + + +class _FadeTrack(QThread): + finished = pyqtSignal() + + def __init__(self, player: vlc.MediaPlayer, fade_seconds: int) -> None: + super().__init__() + self.player = player + self.fade_seconds = fade_seconds + + def run(self) -> None: + """ + Implementation of fading the player + """ + + if not self.player: + return + + # Reduce volume logarithmically + total_steps = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND + if total_steps > 0: + db_reduction_per_step = Config.FADEOUT_DB / total_steps + reduction_factor_per_step = pow(10, (db_reduction_per_step / 20)) + + volume = self.player.audio_get_volume() + + for i in range(1, total_steps + 1): + self.player.audio_set_volume( + int(volume * pow(reduction_factor_per_step, i)) + ) + sleep(1 / Config.FADEOUT_STEPS_PER_SECOND) + + self.finished.emit() + + +# TODO can we move this into the _Music class? +vlc_instance = VLCManager().vlc_instance + + +class _Music: + """ + Manage the playing of music tracks + """ + + def __init__(self, name: str) -> None: + vlc_instance.set_user_agent(name, name) + self.player: Optional[vlc.MediaPlayer] = None + self.name = name + self.max_volume: int = Config.VLC_VOLUME_DEFAULT + self.start_dt: Optional[dt.datetime] = None + + # Set up logging + # self._set_vlc_log() + + # VLC logging very noisy so comment out unless needed + # @VLC_LOG_CB + # def log_callback(data, level, ctx, fmt, args): + # try: + # # Create a ctypes string buffer to hold the formatted message + # buf = ctypes.create_string_buffer(1024) + + # # Use vsnprintf to format the string with the va_list + # libc.vsnprintf(buf, len(buf), fmt, args) + + # # Decode the formatted message + # message = buf.value.decode("utf-8", errors="replace") + # log.debug("VLC: " + message) + # except Exception as e: + # log.error(f"Error in VLC log callback: {e}") + + # def _set_vlc_log(self): + # try: + # vlc.libvlc_log_set(vlc_instance, self.log_callback, None) + # log.debug("VLC logging set up successfully") + # except Exception as e: + # log.error(f"Failed to set up VLC logging: {e}") + + def adjust_by_ms(self, ms: int) -> None: + """Move player position by ms milliseconds""" + + if not self.player: + return + + elapsed_ms = self.get_playtime() + position = self.get_position() + if not position: + position = 0.0 + new_position = max(0.0, position + ((position * ms) / elapsed_ms)) + self.set_position(new_position) + # Adjus start time so elapsed time calculations are correct + if new_position == 0: + self.start_dt = dt.datetime.now() + else: + if self.start_dt: + self.start_dt -= dt.timedelta(milliseconds=ms) + else: + self.start_dt = dt.datetime.now() - dt.timedelta(milliseconds=ms) + + def fade(self, fade_seconds: int) -> None: + """ + Fade the currently playing track. + + The actual management of fading runs in its own thread so as not + to hold up the UI during the fade. + """ + + if not self.player: + return + + if not self.player.get_position() > 0 and self.player.is_playing(): + return + + self.fader_worker = _FadeTrack(self.player, fade_seconds=fade_seconds) + self.fader_worker.finished.connect(self.player.release) + self.fader_worker.start() + self.start_dt = None + + def get_playtime(self) -> int: + """ + Return number of milliseconds current track has been playing or + zero if not playing. The vlc function get_time() only updates 3-4 + times a second; this function has much better resolution. + """ + + if self.start_dt is None: + return 0 + + now = dt.datetime.now() + elapsed_seconds = (now - self.start_dt).total_seconds() + return int(elapsed_seconds * 1000) + + def get_position(self) -> Optional[float]: + """Return current position""" + + if not self.player: + return None + return self.player.get_position() + + def is_playing(self) -> bool: + """ + Return True if we're playing + """ + + if not self.player: + return False + + # There is a discrete time between starting playing a track and + # player.is_playing() returning True, so assume playing if less + # than Config.PLAY_SETTLE microseconds have passed since + # starting play. + return self.start_dt is not None and ( + self.player.is_playing() + or (dt.datetime.now() - self.start_dt) + < dt.timedelta(microseconds=Config.PLAY_SETTLE) + ) + + def play( + self, + path: str, + start_time: dt.datetime, + position: Optional[float] = None, + ) -> None: + """ + Start playing the track at path. + + Log and return if path not found. + + start_time ensures our version and our caller's version of + the start time is the same + """ + + log.debug(f"Music[{self.name}].play({path=}, {position=}") + + if helpers.file_is_unreadable(path): + log.error(f"play({path}): path not readable") + return None + + self.player = vlc.MediaPlayer(vlc_instance, path) + if self.player is None: + log.error(f"_Music:play: failed to create MediaPlayer ({path=})") + helpers.show_warning( + None, "Error creating MediaPlayer", f"Cannot play file ({path})" + ) + return + + _ = self.player.play() + self.set_volume(self.max_volume) + + if position: + self.player.set_position(position) + self.start_dt = start_time + + # For as-yet unknown reasons. sometimes the volume gets + # reset to zero within 200mS or so of starting play. This + # only happened since moving to Debian 12, which uses + # Pipewire for sound (which may be irrelevant). + # It has been known for the volume to need correcting more + # than once in the first 200mS. + # Update August 2024: This no longer seems to be an issue + # for _ in range(3): + # if self.player: + # volume = self.player.audio_get_volume() + # if volume < Config.VLC_VOLUME_DEFAULT: + # self.set_volume(Config.VLC_VOLUME_DEFAULT) + # log.error(f"Reset from {volume=}") + # sleep(0.1) + + def set_position(self, position: float) -> None: + """ + Set player position + """ + + if self.player: + self.player.set_position(position) + + def set_volume( + self, volume: Optional[int] = None, set_default: bool = True + ) -> None: + """Set maximum volume used for player""" + + if not self.player: + return + + if set_default and volume: + self.max_volume = volume + + if volume is None: + volume = Config.VLC_VOLUME_DEFAULT + + self.player.audio_set_volume(volume) + # Ensure volume correct + # For as-yet unknown reasons. sometimes the volume gets + # reset to zero within 200mS or so of starting play. This + # only happened since moving to Debian 12, which uses + # Pipewire for sound (which may be irrelevant). + for _ in range(3): + current_volume = self.player.audio_get_volume() + if current_volume < volume: + self.player.audio_set_volume(volume) + log.debug(f"Reset from {volume=}") + sleep(0.1) + + def stop(self) -> None: + """Immediately stop playing""" + + log.debug(f"Music[{self.name}].stop()") + + self.start_dt = None + + if not self.player: + return + + if self.player.is_playing(): + self.player.stop() + self.player.release() + self.player = None + + +class RowAndTrack: + """ + Object to manage playlist rows and tracks. + """ + + def __init__(self, playlist_row: PlaylistRows) -> None: + """ + Initialises data structure. + + The passed PlaylistRows object will include a Tracks object if this + row has a track. + """ + + # Collect playlistrow data + self.note = playlist_row.note + self.played = playlist_row.played + self.playlist_id = playlist_row.playlist_id + self.playlistrow_id = playlist_row.id + self.row_number = playlist_row.row_number + self.track_id = playlist_row.track_id + + # Collect track data if there's a track + if playlist_row.track_id: + self.artist = playlist_row.track.artist + self.bitrate = playlist_row.track.bitrate + self.duration = playlist_row.track.duration + self.fade_at = playlist_row.track.fade_at + self.intro = playlist_row.track.intro + if playlist_row.track.playdates: + self.lastplayed = max( + [a.lastplayed for a in playlist_row.track.playdates] + ) + else: + self.lastplayed = Config.EPOCH + self.path = playlist_row.track.path + self.silence_at = playlist_row.track.silence_at + self.start_gap = playlist_row.track.start_gap + self.title = playlist_row.track.title + else: + self.artist = "" + self.bitrate = None + self.duration = 0 + self.fade_at = 0 + self.intro = None + self.lastplayed = Config.EPOCH + self.path = "" + self.silence_at = 0 + self.start_gap = 0 + self.title = "" + + # Track playing data + self.end_of_track_signalled: bool = False + self.end_time: Optional[dt.datetime] = None + self.fade_graph: Optional[_FadeCurve] = None + self.fade_graph_start_updates: Optional[dt.datetime] = None + self.resume_marker: Optional[float] = 0.0 + self.forecast_end_time: Optional[dt.datetime] = None + self.forecast_start_time: Optional[dt.datetime] = None + self.start_time: Optional[dt.datetime] = None + + # Other object initialisation + self.music = _Music(name=Config.VLC_MAIN_PLAYER_NAME) + self.signals = MusicMusterSignals() + + def __repr__(self) -> str: + return ( + f"" + ) + + def check_for_end_of_track(self) -> None: + """ + Check whether track has ended. If so, emit track_ended_signal + """ + + if self.start_time is None: + return + + if self.end_of_track_signalled: + return + + if self.music.is_playing(): + return + + self.start_time = None + if self.fade_graph: + self.fade_graph.clear() + # Ensure that player is released + self.music.fade(0) + self.signals.track_ended_signal.emit() + self.end_of_track_signalled = True + + def create_fade_graph(self) -> None: + """ + Initialise and add FadeCurve in a thread as it's slow + """ + + self.fadecurve_thread = QThread() + self.worker = _AddFadeCurve( + self, + track_path=self.path, + track_fade_at=self.fade_at, + track_silence_at=self.silence_at, + ) + self.worker.moveToThread(self.fadecurve_thread) + self.fadecurve_thread.started.connect(self.worker.run) + self.worker.finished.connect(self.fadecurve_thread.quit) + self.worker.finished.connect(self.worker.deleteLater) + self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater) + self.fadecurve_thread.start() + + def drop3db(self, enable: bool) -> None: + """ + If enable is true, drop output by 3db else restore to full volume + """ + + if enable: + self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False) + else: + self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False) + + def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None: + """Fade music""" + + self.resume_marker = self.music.get_position() + self.music.fade(fade_seconds) + self.signals.track_ended_signal.emit() + + def is_playing(self) -> bool: + """ + Return True if we're currently playing else False + """ + + if self.start_time is None: + return False + + return self.music.is_playing() + + def move_back(self, ms: int = Config.PREVIEW_BACK_MS) -> None: + """ + Rewind player by ms milliseconds + """ + + self.music.adjust_by_ms(ms * -1) + + def move_forward(self, ms: int = Config.PREVIEW_ADVANCE_MS) -> None: + """ + Rewind player by ms milliseconds + """ + + self.music.adjust_by_ms(ms) + + def play(self, position: Optional[float] = None) -> None: + """Play track""" + + log.debug(f"issue223: RowAndTrack: play {self.track_id=}") + now = dt.datetime.now() + self.start_time = now + + # Initialise player + self.music.play(self.path, start_time=now, position=position) + + self.end_time = now + dt.timedelta(milliseconds=self.duration) + + # Calculate time fade_graph should start updating + if self.fade_at: + update_graph_at_ms = max( + 0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 + ) + self.fade_graph_start_updates = now + dt.timedelta( + milliseconds=update_graph_at_ms + ) + + def restart(self) -> None: + """ + Restart player + """ + + self.music.adjust_by_ms(self.time_playing() * -1) + + def set_forecast_start_time( + self, modified_rows: list[int], start: Optional[dt.datetime] + ) -> Optional[dt.datetime]: + """ + Set forecast start time for this row + + Update passed modified rows list if we changed the row. + + Return new start time + """ + + changed = False + + if self.forecast_start_time != start: + self.forecast_start_time = start + changed = True + if start is None: + if self.forecast_end_time is not None: + self.forecast_end_time = None + changed = True + new_start_time = None + else: + end_time = start + dt.timedelta(milliseconds=self.duration) + new_start_time = end_time + if self.forecast_end_time != end_time: + self.forecast_end_time = end_time + changed = True + + if changed and self.row_number not in modified_rows: + modified_rows.append(self.row_number) + + return new_start_time + + def stop(self, fade_seconds: int = 0) -> None: + """ + Stop this track playing + """ + + self.resume_marker = self.music.get_position() + self.fade(fade_seconds) + + # Reset fade graph + if self.fade_graph: + self.fade_graph.clear() + + def time_playing(self) -> int: + """ + Return time track has been playing in milliseconds, zero if not playing + """ + + if self.start_time is None: + return 0 + + return self.music.get_playtime() + + def time_remaining_intro(self) -> int: + """ + Return milliseconds of intro remaining. Return 0 if no intro time in track + record or if intro has finished. + """ + + if not self.intro: + return 0 + + return max(0, self.intro - self.time_playing()) + + def time_to_fade(self) -> int: + """ + Return milliseconds until fade time. Return zero if we're not playing. + """ + + if self.start_time is None: + return 0 + + return self.fade_at - self.time_playing() + + def time_to_silence(self) -> int: + """ + Return milliseconds until silent. Return zero if we're not playing. + """ + + if self.start_time is None: + return 0 + + return self.silence_at - self.time_playing() + + def update_fade_graph(self) -> None: + """ + Update fade graph + """ + + if ( + not self.is_playing() + or not self.fade_graph_start_updates + or not self.fade_graph + ): + return + + now = dt.datetime.now() + + if self.fade_graph_start_updates > now: + return + + self.fade_graph.tick(self.time_playing()) + + def update_playlist_and_row(self, session: Session) -> None: + """ + Update local playlist_id and row_number from playlistrow_id + """ + + plr = session.get(PlaylistRows, self.playlistrow_id) + if not plr: + raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}") + self.playlist_id = plr.playlist_id + self.row_number = plr.row_number + + +class TrackSequence: + next: Optional[RowAndTrack] = None + current: Optional[RowAndTrack] = None + previous: Optional[RowAndTrack] = None + + def set_next(self, rat: Optional[RowAndTrack]) -> None: + """ + Set the 'next' track to be passed rat. Clear + any previous next track. If passed rat is None + just clear existing next track. + """ + + # Clear any existing fade graph + if self.next and self.next.fade_graph: + self.next.fade_graph.clear() + + if rat is None: + self.next = None + else: + self.next = rat + self.next.create_fade_graph() + + +track_sequence = TrackSequence()