# Standard library imports from __future__ import annotations import datetime as dt from time import sleep from typing import Optional # Third party imports import numpy as np import pyqtgraph as pg # type: ignore from sqlalchemy.orm.session import Session import vlc # type: ignore # PyQt imports from PyQt6.QtCore import ( pyqtSignal, QObject, QThread, ) from pyqtgraph import PlotWidget from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore # App imports from classes import ApplicationError, MusicMusterSignals from config import Config import helpers from log import log from models import PlaylistRows from vlcmanager import VLCManager # Define the VLC callback function type # import ctypes # import platform # VLC logging is very noisy so comment out unless needed # VLC_LOG_CB = ctypes.CFUNCTYPE( # None, # ctypes.c_void_p, # ctypes.c_int, # ctypes.c_void_p, # ctypes.c_char_p, # ctypes.c_void_p, # ) # # Determine the correct C library for vsnprintf based on the platform # if platform.system() == "Windows": # libc = ctypes.CDLL("msvcrt") # elif platform.system() == "Linux": # libc = ctypes.CDLL("libc.so.6") # elif platform.system() == "Darwin": # macOS # libc = ctypes.CDLL("libc.dylib") # else: # raise OSError("Unsupported operating system") # # Define the vsnprintf function # libc.vsnprintf.argtypes = [ # ctypes.c_char_p, # ctypes.c_size_t, # ctypes.c_char_p, # ctypes.c_void_p, # ] # libc.vsnprintf.restype = ctypes.c_int class _AddFadeCurve(QObject): """ Initialising a fade curve introduces a noticeable delay so carry out in a thread. """ finished = pyqtSignal() def __init__( self, rat: RowAndTrack, track_path: str, track_fade_at: int, track_silence_at: int, ) -> None: super().__init__() self.rat = rat self.track_path = track_path self.track_fade_at = track_fade_at self.track_silence_at = track_silence_at def run(self) -> None: """ Create fade curve and add to PlaylistTrack object """ fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at) if not fc: log.error(f"Failed to create FadeCurve for {self.track_path=}") else: self.rat.fade_graph = fc self.finished.emit() class _FadeCurve: GraphWidget: Optional[PlotWidget] = None def __init__( self, track_path: str, track_fade_at: int, track_silence_at: int ) -> None: """ Set up fade graph array """ audio = helpers.get_audio_segment(track_path) if not audio: log.error(f"FadeCurve: could not get audio for {track_path=}") return None # Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE # milliseconds before fade starts to silence self.start_ms: int = max( 0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 ) self.end_ms: int = track_silence_at audio_segment = audio[self.start_ms : self.end_ms] self.graph_array = np.array(audio_segment.get_array_of_samples()) # Calculate the factor to map milliseconds of track to array self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms) self.curve: Optional[PlotDataItem] = None self.region: Optional[LinearRegionItem] = None def clear(self) -> None: """Clear the current graph""" if self.GraphWidget: self.GraphWidget.clear() def plot(self) -> None: if self.GraphWidget: self.curve = self.GraphWidget.plot(self.graph_array) if self.curve: self.curve.setPen(Config.FADE_CURVE_FOREGROUND) else: log.debug("_FadeCurve.plot: no curve") else: log.debug("_FadeCurve.plot: no GraphWidget") def tick(self, play_time: int) -> None: """Update volume fade curve""" if not self.GraphWidget: return ms_of_graph = play_time - self.start_ms if ms_of_graph < 0: return if self.region is None: # Create the region now that we're into fade self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)]) self.GraphWidget.addItem(self.region) # Update region position if self.region: self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor]) class _FadeTrack(QThread): finished = pyqtSignal() def __init__(self, player: vlc.MediaPlayer, fade_seconds: int) -> None: super().__init__() self.player = player self.fade_seconds = fade_seconds def run(self) -> None: """ Implementation of fading the player """ if not self.player: return # Reduce volume logarithmically total_steps = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND if total_steps > 0: db_reduction_per_step = Config.FADEOUT_DB / total_steps reduction_factor_per_step = pow(10, (db_reduction_per_step / 20)) volume = self.player.audio_get_volume() for i in range(1, total_steps + 1): self.player.audio_set_volume( int(volume * pow(reduction_factor_per_step, i)) ) sleep(1 / Config.FADEOUT_STEPS_PER_SECOND) self.finished.emit() # TODO can we move this into the _Music class? vlc_instance = VLCManager().vlc_instance class _Music: """ Manage the playing of music tracks """ def __init__(self, name: str) -> None: vlc_instance.set_user_agent(name, name) self.player: Optional[vlc.MediaPlayer] = None self.name = name self.max_volume: int = Config.VLC_VOLUME_DEFAULT self.start_dt: Optional[dt.datetime] = None # Set up logging # self._set_vlc_log() # VLC logging very noisy so comment out unless needed # @VLC_LOG_CB # def log_callback(data, level, ctx, fmt, args): # try: # # Create a ctypes string buffer to hold the formatted message # buf = ctypes.create_string_buffer(1024) # # Use vsnprintf to format the string with the va_list # libc.vsnprintf(buf, len(buf), fmt, args) # # Decode the formatted message # message = buf.value.decode("utf-8", errors="replace") # log.debug("VLC: " + message) # except Exception as e: # log.error(f"Error in VLC log callback: {e}") # def _set_vlc_log(self): # try: # vlc.libvlc_log_set(vlc_instance, self.log_callback, None) # log.debug("VLC logging set up successfully") # except Exception as e: # log.error(f"Failed to set up VLC logging: {e}") def adjust_by_ms(self, ms: int) -> None: """Move player position by ms milliseconds""" if not self.player: return elapsed_ms = self.get_playtime() position = self.get_position() if not position: position = 0.0 new_position = max(0.0, position + ((position * ms) / elapsed_ms)) self.set_position(new_position) # Adjus start time so elapsed time calculations are correct if new_position == 0: self.start_dt = dt.datetime.now() else: if self.start_dt: self.start_dt -= dt.timedelta(milliseconds=ms) else: self.start_dt = dt.datetime.now() - dt.timedelta(milliseconds=ms) def fade(self, fade_seconds: int) -> None: """ Fade the currently playing track. The actual management of fading runs in its own thread so as not to hold up the UI during the fade. """ if not self.player: return if not self.player.get_position() > 0 and self.player.is_playing(): return self.fader_worker = _FadeTrack(self.player, fade_seconds=fade_seconds) self.fader_worker.finished.connect(self.player.release) self.fader_worker.start() self.start_dt = None def get_playtime(self) -> int: """ Return number of milliseconds current track has been playing or zero if not playing. The vlc function get_time() only updates 3-4 times a second; this function has much better resolution. """ if self.start_dt is None: return 0 now = dt.datetime.now() elapsed_seconds = (now - self.start_dt).total_seconds() return int(elapsed_seconds * 1000) def get_position(self) -> Optional[float]: """Return current position""" if not self.player: return None return self.player.get_position() def is_playing(self) -> bool: """ Return True if we're playing """ if not self.player: return False # There is a discrete time between starting playing a track and # player.is_playing() returning True, so assume playing if less # than Config.PLAY_SETTLE microseconds have passed since # starting play. return self.start_dt is not None and ( self.player.is_playing() or (dt.datetime.now() - self.start_dt) < dt.timedelta(microseconds=Config.PLAY_SETTLE) ) def play( self, path: str, start_time: dt.datetime, position: Optional[float] = None, ) -> None: """ Start playing the track at path. Log and return if path not found. start_time ensures our version and our caller's version of the start time is the same """ log.debug(f"Music[{self.name}].play({path=}, {position=}") if helpers.file_is_unreadable(path): log.error(f"play({path}): path not readable") return None self.player = vlc.MediaPlayer(vlc_instance, path) if self.player is None: log.error(f"_Music:play: failed to create MediaPlayer ({path=})") helpers.show_warning( None, "Error creating MediaPlayer", f"Cannot play file ({path})" ) return _ = self.player.play() self.set_volume(self.max_volume) if position: self.player.set_position(position) self.start_dt = start_time # For as-yet unknown reasons. sometimes the volume gets # reset to zero within 200mS or so of starting play. This # only happened since moving to Debian 12, which uses # Pipewire for sound (which may be irrelevant). # It has been known for the volume to need correcting more # than once in the first 200mS. # Update August 2024: This no longer seems to be an issue # for _ in range(3): # if self.player: # volume = self.player.audio_get_volume() # if volume < Config.VLC_VOLUME_DEFAULT: # self.set_volume(Config.VLC_VOLUME_DEFAULT) # log.error(f"Reset from {volume=}") # sleep(0.1) def set_position(self, position: float) -> None: """ Set player position """ if self.player: self.player.set_position(position) def set_volume( self, volume: Optional[int] = None, set_default: bool = True ) -> None: """Set maximum volume used for player""" if not self.player: return if set_default and volume: self.max_volume = volume if volume is None: volume = Config.VLC_VOLUME_DEFAULT self.player.audio_set_volume(volume) # Ensure volume correct # For as-yet unknown reasons. sometimes the volume gets # reset to zero within 200mS or so of starting play. This # only happened since moving to Debian 12, which uses # Pipewire for sound (which may be irrelevant). for _ in range(3): current_volume = self.player.audio_get_volume() if current_volume < volume: self.player.audio_set_volume(volume) log.debug(f"Reset from {volume=}") sleep(0.1) def stop(self) -> None: """Immediately stop playing""" log.debug(f"Music[{self.name}].stop()") self.start_dt = None if not self.player: return if self.player.is_playing(): self.player.stop() self.player.release() self.player = None class RowAndTrack: """ Object to manage playlist rows and tracks. """ def __init__(self, playlist_row: PlaylistRows) -> None: """ Initialises data structure. The passed PlaylistRows object will include a Tracks object if this row has a track. """ # Collect playlistrow data self.note = playlist_row.note self.played = playlist_row.played self.playlist_id = playlist_row.playlist_id self.playlistrow_id = playlist_row.id self.row_number = playlist_row.row_number self.track_id = playlist_row.track_id # Collect track data if there's a track if playlist_row.track_id: self.artist = playlist_row.track.artist self.bitrate = playlist_row.track.bitrate self.duration = playlist_row.track.duration self.fade_at = playlist_row.track.fade_at self.intro = playlist_row.track.intro if playlist_row.track.playdates: self.lastplayed = max( [a.lastplayed for a in playlist_row.track.playdates] ) else: self.lastplayed = Config.EPOCH self.path = playlist_row.track.path self.silence_at = playlist_row.track.silence_at self.start_gap = playlist_row.track.start_gap self.title = playlist_row.track.title else: self.artist = "" self.bitrate = None self.duration = 0 self.fade_at = 0 self.intro = None self.lastplayed = Config.EPOCH self.path = "" self.silence_at = 0 self.start_gap = 0 self.title = "" # Track playing data self.end_of_track_signalled: bool = False self.end_time: Optional[dt.datetime] = None self.fade_graph: Optional[_FadeCurve] = None self.fade_graph_start_updates: Optional[dt.datetime] = None self.resume_marker: Optional[float] = 0.0 self.forecast_end_time: Optional[dt.datetime] = None self.forecast_start_time: Optional[dt.datetime] = None self.start_time: Optional[dt.datetime] = None # Other object initialisation self.music = _Music(name=Config.VLC_MAIN_PLAYER_NAME) self.signals = MusicMusterSignals() def __repr__(self) -> str: return ( f"" ) def check_for_end_of_track(self) -> None: """ Check whether track has ended. If so, emit track_ended_signal """ if self.start_time is None: return if self.end_of_track_signalled: return if self.music.is_playing(): return self.start_time = None if self.fade_graph: self.fade_graph.clear() # Ensure that player is released self.music.fade(0) self.signals.track_ended_signal.emit() self.end_of_track_signalled = True def create_fade_graph(self) -> None: """ Initialise and add FadeCurve in a thread as it's slow """ self.fadecurve_thread = QThread() self.worker = _AddFadeCurve( self, track_path=self.path, track_fade_at=self.fade_at, track_silence_at=self.silence_at, ) self.worker.moveToThread(self.fadecurve_thread) self.fadecurve_thread.started.connect(self.worker.run) self.worker.finished.connect(self.fadecurve_thread.quit) self.worker.finished.connect(self.worker.deleteLater) self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater) self.fadecurve_thread.start() def drop3db(self, enable: bool) -> None: """ If enable is true, drop output by 3db else restore to full volume """ if enable: self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False) else: self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False) def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None: """Fade music""" self.resume_marker = self.music.get_position() self.music.fade(fade_seconds) self.signals.track_ended_signal.emit() def is_playing(self) -> bool: """ Return True if we're currently playing else False """ if self.start_time is None: return False return self.music.is_playing() def move_back(self, ms: int = Config.PREVIEW_BACK_MS) -> None: """ Rewind player by ms milliseconds """ self.music.adjust_by_ms(ms * -1) def move_forward(self, ms: int = Config.PREVIEW_ADVANCE_MS) -> None: """ Rewind player by ms milliseconds """ self.music.adjust_by_ms(ms) def play(self, position: Optional[float] = None) -> None: """Play track""" now = dt.datetime.now() self.start_time = now # Initialise player self.music.play(self.path, start_time=now, position=position) self.end_time = now + dt.timedelta(milliseconds=self.duration) # Calculate time fade_graph should start updating if self.fade_at: update_graph_at_ms = max( 0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 ) self.fade_graph_start_updates = now + dt.timedelta( milliseconds=update_graph_at_ms ) def restart(self) -> None: """ Restart player """ self.music.adjust_by_ms(self.time_playing() * -1) def set_forecast_start_time( self, modified_rows: list[int], start: Optional[dt.datetime] ) -> Optional[dt.datetime]: """ Set forecast start time for this row Update passed modified rows list if we changed the row. Return new start time """ changed = False if self.forecast_start_time != start: self.forecast_start_time = start changed = True if start is None: if self.forecast_end_time is not None: self.forecast_end_time = None changed = True new_start_time = None else: end_time = start + dt.timedelta(milliseconds=self.duration) new_start_time = end_time if self.forecast_end_time != end_time: self.forecast_end_time = end_time changed = True if changed and self.row_number not in modified_rows: modified_rows.append(self.row_number) return new_start_time def stop(self, fade_seconds: int = 0) -> None: """ Stop this track playing """ self.resume_marker = self.music.get_position() self.fade(fade_seconds) # Reset fade graph if self.fade_graph: self.fade_graph.clear() def time_playing(self) -> int: """ Return time track has been playing in milliseconds, zero if not playing """ if self.start_time is None: return 0 return self.music.get_playtime() def time_remaining_intro(self) -> int: """ Return milliseconds of intro remaining. Return 0 if no intro time in track record or if intro has finished. """ if not self.intro: return 0 return max(0, self.intro - self.time_playing()) def time_to_fade(self) -> int: """ Return milliseconds until fade time. Return zero if we're not playing. """ if self.start_time is None: return 0 return self.fade_at - self.time_playing() def time_to_silence(self) -> int: """ Return milliseconds until silent. Return zero if we're not playing. """ if self.start_time is None: return 0 return self.silence_at - self.time_playing() def update_fade_graph(self) -> None: """ Update fade graph """ if ( not self.is_playing() or not self.fade_graph_start_updates or not self.fade_graph ): return now = dt.datetime.now() if self.fade_graph_start_updates > now: return self.fade_graph.tick(self.time_playing()) def update_playlist_and_row(self, session: Session) -> None: """ Update local playlist_id and row_number from playlistrow_id """ plr = session.get(PlaylistRows, self.playlistrow_id) if not plr: raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}") self.playlist_id = plr.playlist_id self.row_number = plr.row_number class TrackSequence: next: Optional[RowAndTrack] = None current: Optional[RowAndTrack] = None previous: Optional[RowAndTrack] = None def set_next(self, rat: Optional[RowAndTrack]) -> None: """ Set the 'next' track to be passed rat. Clear any previous next track. If passed rat is None just clear existing next track. """ # Clear any existing fade graph if self.next and self.next.fade_graph: self.next.fade_graph.clear() if rat is None: self.next = None else: self.next = rat self.next.create_fade_graph() track_sequence = TrackSequence()