# Standard library imports from __future__ import annotations import datetime as dt import threading from time import sleep from typing import Optional # Third party imports import numpy as np import pyqtgraph as pg # type: ignore import vlc # type: ignore # PyQt imports from PyQt6.QtCore import ( pyqtSignal, QObject, QRunnable, QThread, QThreadPool, ) from pyqtgraph import PlotWidget from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore # App imports from classes import MusicMusterSignals from config import Config from log import log from models import db, PlaylistRows, Tracks from helpers import ( file_is_unreadable, get_audio_segment, ) lock = threading.Lock() class _AddFadeCurve(QObject): """ Initialising a fade curve introduces a noticeable delay so carry out in a thread. """ finished = pyqtSignal() def __init__( self, track_manager: _TrackManager, track_path: str, track_fade_at: int, track_silence_at: int, ) -> None: super().__init__() self.track_manager = track_manager self.track_path = track_path self.track_fade_at = track_fade_at self.track_silence_at = track_silence_at def run(self) -> None: """ Create fade curve and add to PlaylistTrack object """ fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at) if not fc: log.error(f"Failed to create FadeCurve for {self.track_path=}") else: self.track_manager.fade_graph = fc self.finished.emit() class _FadeCurve: GraphWidget: Optional[PlotWidget] = None def __init__( self, track_path: str, track_fade_at: int, track_silence_at: int ) -> None: """ Set up fade graph array """ audio = get_audio_segment(track_path) if not audio: log.error(f"FadeCurve: could not get audio for {track_path=}") return None # Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE # milliseconds before fade starts to silence self.start_ms: int = max( 0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 ) self.end_ms: int = track_silence_at self.audio_segment = audio[self.start_ms : self.end_ms] self.graph_array = np.array(self.audio_segment.get_array_of_samples()) # Calculate the factor to map milliseconds of track to array self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms) self.curve: Optional[PlotDataItem] = None self.region: Optional[LinearRegionItem] = None def clear(self) -> None: """Clear the current graph""" if self.GraphWidget: self.GraphWidget.clear() def plot(self) -> None: if self.GraphWidget: self.curve = self.GraphWidget.plot(self.graph_array) if self.curve: self.curve.setPen(Config.FADE_CURVE_FOREGROUND) def tick(self, play_time) -> None: """Update volume fade curve""" if not self.GraphWidget: return ms_of_graph = play_time - self.start_ms if ms_of_graph < 0: return if self.region is None: # Create the region now that we're into fade log.debug("issue223: _FadeCurve: create region") self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)]) self.GraphWidget.addItem(self.region) # Update region position if self.region: log.debug("issue223: _FadeCurve: update region") self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor]) class _FadeTrack(QRunnable): def __init__(self, player: vlc.MediaPlayer, fade_seconds) -> None: super().__init__() self.player: vlc.MediaPlayer = player self.fade_seconds: int = fade_seconds def run(self) -> None: """ Implementation of fading the player """ if not self.player: return # Reduce volume logarithmically total_steps = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND db_reduction_per_step = Config.FADEOUT_DB / total_steps reduction_factor_per_step = pow(10, (db_reduction_per_step / 20)) volume = self.player.audio_get_volume() for i in range(1, total_steps + 1): self.player.audio_set_volume( int(volume * pow(reduction_factor_per_step, i)) ) sleep(1 / Config.FADEOUT_STEPS_PER_SECOND) self.player.stop() log.debug(f"Releasing player {self.player=}") self.player.release() class _Music: """ Manage the playing of music tracks """ def __init__(self, name) -> None: self.VLC = vlc.Instance() self.VLC.set_user_agent(name, name) self.player: Optional[vlc.MediaPlayer] = None self.name: str = name self.max_volume: int = Config.VLC_VOLUME_DEFAULT self.start_dt: Optional[dt.datetime] = None def adjust_by_ms(self, ms: int) -> None: """Move player position by ms milliseconds""" if not self.player: return elapsed_ms = self.get_playtime() position = self.get_position() if not position: position = 0.0 new_position = max(0.0, position + ((position * ms) / elapsed_ms)) self.set_position(new_position) # Adjus start time so elapsed time calculations are correct if new_position == 0: self.start_dt = dt.datetime.now() else: if self.start_dt: self.start_dt -= dt.timedelta(milliseconds=ms) else: self.start_dt = dt.datetime.now() - dt.timedelta(milliseconds=ms) def stop(self) -> None: """Immediately stop playing""" log.debug(f"Music[{self.name}].stop()") self.start_dt = None if not self.player: return p = self.player self.player = None self.start_dt = None with lock: p.stop() p.release() p = None def fade(self, fade_seconds: int) -> None: """ Fade the currently playing track. The actual management of fading runs in its own thread so as not to hold up the UI during the fade. """ if not self.player: return if not self.player.get_position() > 0 and self.player.is_playing(): return if fade_seconds <= 0: self.stop() return # Take a copy of current player to allow another track to be # started without interfering here with lock: p = self.player self.player = None pool = QThreadPool.globalInstance() if pool: fader = _FadeTrack(p, fade_seconds=fade_seconds) pool.start(fader) self.start_dt = None else: log.error("_Music: failed to allocate QThreadPool") def get_playtime(self) -> int: """ Return number of milliseconds current track has been playing or zero if not playing. The vlc function get_time() only updates 3-4 times a second; this function has much better resolution. """ if self.start_dt is None: return 0 now = dt.datetime.now() elapsed_seconds = (now - self.start_dt).total_seconds() return int(elapsed_seconds * 1000) def get_position(self) -> Optional[float]: """Return current position""" if not self.player: return None return self.player.get_position() def is_playing(self) -> bool: """ Return True if we're playing """ if not self.player: return False # There is a discrete time between starting playing a track and # player.is_playing() returning True, so assume playing if less # than Config.PLAY_SETTLE microseconds have passed since # starting play. return self.start_dt is not None and ( self.player.is_playing() or (dt.datetime.now() - self.start_dt) < dt.timedelta(microseconds=Config.PLAY_SETTLE) ) def play( self, path: str, start_time: dt.datetime, position: Optional[float] = None, ) -> None: """ Start playing the track at path. Log and return if path not found. start_time ensures our version and our caller's version of the start time is the same """ log.debug(f"Music[{self.name}].play({path=}, {position=}") if file_is_unreadable(path): log.error(f"play({path}): path not readable") return None media = self.VLC.media_new_path(path) self.player = media.player_new_from_media() if self.player: _ = self.player.play() self.set_volume(self.max_volume) if position: self.player.set_position(position) self.start_dt = start_time # For as-yet unknown reasons. sometimes the volume gets # reset to zero within 200mS or so of starting play. This # only happened since moving to Debian 12, which uses # Pipewire for sound (which may be irrelevant). # It has been known for the volume to need correcting more # than once in the first 200mS. for _ in range(3): if self.player: volume = self.player.audio_get_volume() if volume < Config.VLC_VOLUME_DEFAULT: self.set_volume(Config.VLC_VOLUME_DEFAULT) log.error(f"Reset from {volume=}") sleep(0.1) def set_position(self, position: float) -> None: """ Set player position """ if self.player: self.player.set_position(position) def set_volume( self, volume: Optional[int] = None, set_default: bool = True ) -> None: """Set maximum volume used for player""" if not self.player: return if set_default and volume: self.max_volume = volume if volume is None: volume = Config.VLC_VOLUME_DEFAULT self.player.audio_set_volume(volume) # Ensure volume correct # For as-yet unknown reasons. sometimes the volume gets # reset to zero within 200mS or so of starting play. This # only happened since moving to Debian 12, which uses # Pipewire for sound (which may be irrelevant). for _ in range(3): current_volume = self.player.audio_get_volume() if current_volume < volume: self.player.audio_set_volume(volume) log.debug(f"Reset from {volume=}") sleep(0.1) class _TrackManager: """ Object to manage active playlist tracks, typically the previous, current and next track. """ def __init__( self, session: db.Session, player_name: str, track_id: int, row_number: int, ) -> None: """ Initialises data structure. Define a player. Raise ValueError if no track in passed plr. """ track = session.get(Tracks, track_id) if not track: raise ValueError(f"_TrackPlayer: unable to retreived {track_id=}") self.player_name = player_name self.row_number = row_number # Check file readable if file_is_unreadable(track.path): raise ValueError(f"_TrackManager.__init__: {track.path=} unreadable") self.artist = track.artist self.bitrate = track.bitrate self.duration = track.duration self.fade_at = track.fade_at self.intro = track.intro self.path = track.path self.silence_at = track.silence_at self.start_gap = track.start_gap self.title = track.title self.track_id = track.id self.end_time: Optional[dt.datetime] = None self.fade_graph: Optional[_FadeCurve] = None self.fade_graph_start_updates: Optional[dt.datetime] = None self.resume_marker: Optional[float] self.start_time: Optional[dt.datetime] = None self.end_of_track_signalled: bool = False self.signals = MusicMusterSignals() # Initialise player self.player = _Music(name=player_name) # Initialise and add FadeCurve in a thread as it's slow self.fadecurve_thread = QThread() self.worker = _AddFadeCurve( self, track_path=track.path, track_fade_at=track.fade_at, track_silence_at=track.silence_at, ) self.worker.moveToThread(self.fadecurve_thread) self.fadecurve_thread.started.connect(self.worker.run) self.worker.finished.connect(self.fadecurve_thread.quit) self.worker.finished.connect(self.worker.deleteLater) self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater) self.fadecurve_thread.start() def check_for_end_of_track(self) -> None: """ Check whether track has ended. If so, emit track_ended_signal """ if self.start_time is None: return if self.end_of_track_signalled: return if not self.player.is_playing(): self.start_time = None if self.fade_graph: self.fade_graph.clear() self.signal_end_of_track() self.end_of_track_signalled = True def drop3db(self, enable: bool) -> None: """ If enable is true, drop output by 3db else restore to full volume """ if enable: self.player.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False) else: self.player.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False) def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None: """Fade music""" self.resume_marker = self.player.get_position() self.player.fade(fade_seconds) self.signal_end_of_track() def is_playing(self) -> bool: """ Return True if we're currently playing else False """ if self.start_time is None: return False return self.player.is_playing() def move_back(self, ms: int = Config.PREVIEW_BACK_MS) -> None: """ Rewind player by ms milliseconds """ self.player.adjust_by_ms(ms * -1) def move_forward(self, ms: int = Config.PREVIEW_ADVANCE_MS) -> None: """ Rewind player by ms milliseconds """ self.player.adjust_by_ms(ms) def move_to_intro_end(self, buffer: int = Config.PREVIEW_END_BUFFER_MS) -> None: """ Move play position to 'buffer' milliseconds before end of intro. If no intro defined, do nothing. """ if self.intro is None: return new_position = max(0, self.intro - Config.PREVIEW_END_BUFFER_MS) self.player.adjust_by_ms(new_position - self.time_playing()) def play(self, position: Optional[float] = None) -> None: """Play track""" log.debug(f"issue223: _TrackManager: play {self.track_id=}") now = dt.datetime.now() self.start_time = now self.player.play(self.path, start_time=now, position=position) self.end_time = self.start_time + dt.timedelta(milliseconds=self.duration) # Calculate time fade_graph should start updating if self.fade_at: update_graph_at_ms = max( 0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 ) self.fade_graph_start_updates = now + dt.timedelta( milliseconds=update_graph_at_ms ) def restart(self) -> None: """ Restart player """ self.player.adjust_by_ms(self.time_playing() * -1) def signal_end_of_track(self) -> None: """ Send end of track signal unless we are a preview player """ def stop(self, fade_seconds: int = 0) -> None: """ Stop this track playing """ self.resume_marker = self.player.get_position() self.fade(fade_seconds) # Reset fade graph if self.fade_graph: self.fade_graph.clear() def time_playing(self) -> int: """ Return time track has been playing in milliseconds, zero if not playing """ if self.start_time is None: return 0 return self.player.get_playtime() def time_remaining_intro(self) -> int: """ Return milliseconds of intro remaining. Return 0 if no intro time in track record or if intro has finished. """ if not self.intro: return 0 return max(0, self.intro - self.time_playing()) def time_to_fade(self) -> int: """ Return milliseconds until fade time. Return zero if we're not playing. """ if self.start_time is None: return 0 return self.fade_at - self.time_playing() def time_to_silence(self) -> int: """ Return milliseconds until silent. Return zero if we're not playing. """ if self.start_time is None: return 0 return self.silence_at - self.time_playing() def update_fade_graph(self) -> None: """ Update fade graph """ if ( not self.is_playing() or not self.fade_graph_start_updates or not self.fade_graph ): return now = dt.datetime.now() if self.fade_graph_start_updates > now: return self.fade_graph.tick(self.time_playing()) class MainTrackManager(_TrackManager): """ Manage playing tracks from the playlist with associated data """ def __init__(self, session: db.Session, plr_id: int) -> None: """ Set up manager for playlist tracks """ # Ensure we have a track plr = session.get(PlaylistRows, plr_id) if not plr: raise ValueError(f"PlaylistTrack: unable to retreive plr {plr_id=}") self.track_id: int = plr.track_id super().__init__( session=session, player_name=Config.VLC_MAIN_PLAYER_NAME, track_id=self.track_id, row_number=plr.plr_rownum, ) # Save non-track plr info self.plr_id: int = plr.id self.playlist_id: int = plr.playlist_id def __repr__(self) -> str: return ( f"" ) class TrackSequence: next: Optional[MainTrackManager] = None current: Optional[MainTrackManager] = None previous: Optional[MainTrackManager] = None track_sequence = TrackSequence()