# Standard library imports from __future__ import annotations import datetime as dt from time import sleep from typing import Any, Optional # Third party imports # import line_profiler import numpy as np import pyqtgraph as pg # type: ignore from sqlalchemy.orm.session import Session import vlc # type: ignore # PyQt imports from PyQt6.QtCore import ( pyqtSignal, QObject, QThread, ) from pyqtgraph import PlotWidget from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore # App imports from classes import ApplicationError, MusicMusterSignals from config import Config import helpers from log import log from repository import PlaylistRowDTO from vlcmanager import VLCManager # Define the VLC callback function type # import ctypes # import platform # VLC logging is very noisy so comment out unless needed # VLC_LOG_CB = ctypes.CFUNCTYPE( # None, # ctypes.c_void_p, # ctypes.c_int, # ctypes.c_void_p, # ctypes.c_char_p, # ctypes.c_void_p, # ) # # Determine the correct C library for vsnprintf based on the platform # if platform.system() == "Windows": # libc = ctypes.CDLL("msvcrt") # elif platform.system() == "Linux": # libc = ctypes.CDLL("libc.so.6") # elif platform.system() == "Darwin": # macOS # libc = ctypes.CDLL("libc.dylib") # else: # raise OSError("Unsupported operating system") # # Define the vsnprintf function # libc.vsnprintf.argtypes = [ # ctypes.c_char_p, # ctypes.c_size_t, # ctypes.c_char_p, # ctypes.c_void_p, # ] # libc.vsnprintf.restype = ctypes.c_int class _AddFadeCurve(QObject): """ Initialising a fade curve introduces a noticeable delay so carry out in a thread. """ finished = pyqtSignal() def __init__( self, plr: PlaylistRow, track_path: str, track_fade_at: int, track_silence_at: int, ) -> None: super().__init__() self.plr = plr self.track_path = track_path self.track_fade_at = track_fade_at self.track_silence_at = track_silence_at def run(self) -> None: """ Create fade curve and add to PlaylistTrack object """ fc = FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at) if not fc: log.error(f"Failed to create FadeCurve for {self.track_path=}") else: self.plr.fade_graph = fc self.finished.emit() class FadeCurve: GraphWidget: Optional[PlotWidget] = None def __init__( self, track_path: str, track_fade_at: int, track_silence_at: int ) -> None: """ Set up fade graph array """ audio = helpers.get_audio_segment(track_path) if not audio: log.error(f"FadeCurve: could not get audio for {track_path=}") return None # Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE # milliseconds before fade starts to silence self.start_ms: int = max( 0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 ) self.end_ms: int = track_silence_at audio_segment = audio[self.start_ms : self.end_ms] self.graph_array = np.array(audio_segment.get_array_of_samples()) # Calculate the factor to map milliseconds of track to array self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms) self.curve: Optional[PlotDataItem] = None self.region: Optional[LinearRegionItem] = None def clear(self) -> None: """Clear the current graph""" if self.GraphWidget: self.GraphWidget.clear() def plot(self) -> None: if self.GraphWidget: self.curve = self.GraphWidget.plot(self.graph_array) if self.curve: self.curve.setPen(Config.FADE_CURVE_FOREGROUND) else: log.debug("_FadeCurve.plot: no curve") else: log.debug("_FadeCurve.plot: no GraphWidget") def tick(self, play_time: int) -> None: """Update volume fade curve""" if not self.GraphWidget: return ms_of_graph = play_time - self.start_ms if ms_of_graph < 0: return if self.region is None: # Create the region now that we're into fade self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)]) self.GraphWidget.addItem(self.region) # Update region position if self.region: self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor]) class _FadeTrack(QThread): finished = pyqtSignal() def __init__(self, player: vlc.MediaPlayer, fade_seconds: int) -> None: super().__init__() self.player = player self.fade_seconds = fade_seconds def run(self) -> None: """ Implementation of fading the player """ if not self.player: return # Reduce volume logarithmically total_steps = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND if total_steps > 0: db_reduction_per_step = Config.FADEOUT_DB / total_steps reduction_factor_per_step = pow(10, (db_reduction_per_step / 20)) volume = self.player.audio_get_volume() for i in range(1, total_steps + 1): self.player.audio_set_volume( int(volume * pow(reduction_factor_per_step, i)) ) sleep(1 / Config.FADEOUT_STEPS_PER_SECOND) self.finished.emit() # TODO can we move this into the _Music class? vlc_instance = VLCManager().vlc_instance class _Music: """ Manage the playing of music tracks """ def __init__(self, name: str) -> None: vlc_instance.set_user_agent(name, name) self.player: Optional[vlc.MediaPlayer] = None self.name = name self.max_volume: int = Config.VLC_VOLUME_DEFAULT self.start_dt: Optional[dt.datetime] = None # Set up logging # self._set_vlc_log() # VLC logging very noisy so comment out unless needed # @VLC_LOG_CB # def log_callback(data, level, ctx, fmt, args): # try: # # Create a ctypes string buffer to hold the formatted message # buf = ctypes.create_string_buffer(1024) # # Use vsnprintf to format the string with the va_list # libc.vsnprintf(buf, len(buf), fmt, args) # # Decode the formatted message # message = buf.value.decode("utf-8", errors="replace") # log.debug("VLC: " + message) # except Exception as e: # log.error(f"Error in VLC log callback: {e}") # def _set_vlc_log(self): # try: # vlc.libvlc_log_set(vlc_instance, self.log_callback, None) # log.debug("VLC logging set up successfully") # except Exception as e: # log.error(f"Failed to set up VLC logging: {e}") def fade(self, fade_seconds: int) -> None: """ Fade the currently playing track. The actual management of fading runs in its own thread so as not to hold up the UI during the fade. """ if not self.player: return if not self.player.get_position() > 0 and self.player.is_playing(): return self.fader_worker = _FadeTrack(self.player, fade_seconds=fade_seconds) self.fader_worker.finished.connect(self.player.release) self.fader_worker.start() self.start_dt = None def get_playtime(self) -> int: """ Return number of milliseconds current track has been playing or zero if not playing. The vlc function get_time() only updates 3-4 times a second; this function has much better resolution. """ if self.start_dt is None: return 0 now = dt.datetime.now() elapsed_seconds = (now - self.start_dt).total_seconds() return int(elapsed_seconds * 1000) def get_position(self) -> float: """Return current position""" if not self.player: return 0.0 return self.player.get_position() def is_playing(self) -> bool: """ Return True if we're playing """ if not self.player: return False # There is a discrete time between starting playing a track and # player.is_playing() returning True, so assume playing if less # than Config.PLAY_SETTLE microseconds have passed since # starting play. return self.start_dt is not None and ( self.player.is_playing() or (dt.datetime.now() - self.start_dt) < dt.timedelta(microseconds=Config.PLAY_SETTLE) ) def play( self, path: str, start_time: dt.datetime, position: Optional[float] = None, ) -> None: """ Start playing the track at path. Log and return if path not found. start_time ensures our version and our caller's version of the start time is the same """ log.debug(f"Music[{self.name}].play({path=}, {position=}") if helpers.file_is_unreadable(path): log.error(f"play({path}): path not readable") return None self.player = vlc.MediaPlayer(vlc_instance, path) if self.player is None: log.error(f"_Music:play: failed to create MediaPlayer ({path=})") helpers.show_warning( None, "Error creating MediaPlayer", f"Cannot play file ({path})" ) return _ = self.player.play() self.set_volume(self.max_volume) if position: self.player.set_position(position) self.start_dt = start_time def set_position(self, position: float) -> None: """ Set player position """ if self.player: self.player.set_position(position) def set_volume( self, volume: Optional[int] = None, set_default: bool = True ) -> None: """Set maximum volume used for player""" if not self.player: return if set_default and volume: self.max_volume = volume if volume is None: volume = Config.VLC_VOLUME_DEFAULT self.player.audio_set_volume(volume) # Ensure volume correct # For as-yet unknown reasons. sometimes the volume gets # reset to zero within 200mS or so of starting play. This # only happened since moving to Debian 12, which uses # Pipewire for sound (which may be irrelevant). for _ in range(3): current_volume = self.player.audio_get_volume() if current_volume < volume: self.player.audio_set_volume(volume) log.debug(f"Reset from {volume=}") sleep(0.1) def stop(self) -> None: """Immediately stop playing""" log.debug(f"Music[{self.name}].stop()") self.start_dt = None if not self.player: return if self.player.is_playing(): self.player.stop() self.player.release() self.player = None class PlaylistRow: """ Object to manage playlist row and track. """ def __init__(self, dto: PlaylistRowDTO) -> None: """ The dto object will include a Tracks object if this row has a track. """ self.dto = dto self.music = _Music(name=Config.VLC_MAIN_PLAYER_NAME) self.signals = MusicMusterSignals() self.end_of_track_signalled: bool = False self.end_time: dt.datetime | None = None self.fade_graph: Any | None = None self.fade_graph_start_updates: dt.datetime | None = None self.forecast_end_time: dt.datetime | None = None self.forecast_start_time: dt.datetime | None = None self.note_bg: str | None = None self.note_fg: str | None = None self.resume_marker: float = 0.0 self.row_bg: str | None = None self.row_fg: str | None = None self.start_time: dt.datetime | None = None def __repr__(self) -> str: return ( f"" ) # Expose TrackDTO fields as properties @property def artist(self): return self.dto.artist @property def bitrate(self): return self.dto.bitrate @property def duration(self): return self.dto.duration @property def fade_at(self): return self.dto.fade_at @property def intro(self): return self.dto.intro @property def lastplayed(self): return self.dto.lastplayed @property def path(self): return self.dto.path @property def silence_at(self): return self.dto.silence_at @property def start_gap(self): return self.dto.start_gap @property def title(self): return self.dto.title @property def track_id(self): return self.dto.track_id @track_id.setter def track_id(self, value: int) -> None: """ Adding a track_id should only happen to a header row. """ if self.track_id: raise ApplicationError("Attempting to add track to row with existing track ({self=}") # TODO: set up write access to track_id. Should only update if # track_id == 0. Need to update all other track fields at the # same time. print("set track_id attribute for {self=}, {value=}") pass # Expose PlaylistRowDTO fields as properties @property def note(self): return self.dto.note @note.setter def note(self, value: str) -> None: # TODO set up write access to db print("set note attribute for {self=}, {value=}") # self.dto.note = value @property def played(self): return self.dto.played @played.setter def played(self, value: bool = True) -> None: # TODO set up write access to db print("set played attribute for {self=}") # self.dto.played = value @property def playlist_id(self): return self.dto.playlist_id @property def playlistrow_id(self): return self.dto.playlistrow_id @property def row_number(self): return self.dto.row_number @row_number.setter def row_number(self, value: int) -> None: # TODO do we need to set up write access to db? self.dto.row_number = value def check_for_end_of_track(self) -> None: """ Check whether track has ended. If so, emit track_ended_signal """ if self.start_time is None: return if self.end_of_track_signalled: return if self.music.is_playing(): return self.start_time = None if self.fade_graph: self.fade_graph.clear() # Ensure that player is released self.music.fade(0) self.signals.track_ended_signal.emit() self.end_of_track_signalled = True def drop3db(self, enable: bool) -> None: """ If enable is true, drop output by 3db else restore to full volume """ if enable: self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False) else: self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False) def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None: """Fade music""" self.resume_marker = self.music.get_position() self.music.fade(fade_seconds) self.signals.track_ended_signal.emit() def is_playing(self) -> bool: """ Return True if we're currently playing else False """ if self.start_time is None: return False return self.music.is_playing() def play(self, position: Optional[float] = None) -> None: """Play track""" now = dt.datetime.now() self.start_time = now # Initialise player self.music.play(self.path, start_time=now, position=position) self.end_time = now + dt.timedelta(milliseconds=self.duration) # Calculate time fade_graph should start updating if self.fade_at: update_graph_at_ms = max( 0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 ) self.fade_graph_start_updates = now + dt.timedelta( milliseconds=update_graph_at_ms ) def set_forecast_start_time( self, modified_rows: list[int], start: Optional[dt.datetime] ) -> Optional[dt.datetime]: """ Set forecast start time for this row Update passed modified rows list if we changed the row. Return new start time """ changed = False if self.forecast_start_time != start: self.forecast_start_time = start changed = True if start is None: if self.forecast_end_time is not None: self.forecast_end_time = None changed = True new_start_time = None else: end_time = start + dt.timedelta(milliseconds=self.duration()) new_start_time = end_time if self.forecast_end_time != end_time: self.forecast_end_time = end_time changed = True if changed and self.row_number not in modified_rows: modified_rows.append(self.row_number) return new_start_time def stop(self, fade_seconds: int = 0) -> None: """ Stop this track playing """ self.resume_marker = self.music.get_position() self.fade(fade_seconds) # Reset fade graph if self.fade_graph: self.fade_graph.clear() def time_playing(self) -> int: """ Return time track has been playing in milliseconds, zero if not playing """ if self.start_time is None: return 0 return self.music.get_playtime() def time_remaining_intro(self) -> int: """ Return milliseconds of intro remaining. Return 0 if no intro time in track record or if intro has finished. """ if not self.intro: return 0 return max(0, self.intro - self.time_playing()) def time_to_fade(self) -> int: """ Return milliseconds until fade time. Return zero if we're not playing. """ if self.start_time is None: return 0 return self.fade_at - self.time_playing() def time_to_silence(self) -> int: """ Return milliseconds until silent. Return zero if we're not playing. """ if self.start_time is None: return 0 return self.silence_at - self.time_playing() def update_fade_graph(self) -> None: """ Update fade graph """ if ( not self.is_playing() or not self.fade_graph_start_updates or not self.fade_graph ): return now = dt.datetime.now() if self.fade_graph_start_updates > now: return self.fade_graph.tick(self.time_playing()) def update_playlist_and_row(self, session: Session) -> None: """ Update local playlist_id and row_number from playlistrow_id """ # TODO: only seems to be used by track_sequence return # plr = session.get(PlaylistRows, self.playlistrow_id) # if not plr: # raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}") # self.playlist_id = plr.playlist_id # self.row_number = plr.row_number class TrackSequence: next: Optional[PlaylistRow] = None current: Optional[PlaylistRow] = None previous: Optional[PlaylistRow] = None def set_next(self, rat: Optional[PlaylistRow]) -> None: """ Set the 'next' track to be passed rat. Clear any previous next track. If passed rat is None just clear existing next track. """ # Clear any existing fade graph if self.next and self.next.fade_graph: self.next.fade_graph.clear() if rat is None: self.next = None else: self.next = rat self.create_fade_graph() def create_fade_graph(self) -> None: """ Initialise and add FadeCurve in a thread as it's slow """ self.fadecurve_thread = QThread() if self.next is None: raise ApplicationError("hell in a handcart") self.worker = _AddFadeCurve( self.next, track_path=self.next.path, track_fade_at=self.next.fade_at, track_silence_at=self.next.silence_at, ) self.worker.moveToThread(self.fadecurve_thread) self.fadecurve_thread.started.connect(self.worker.run) self.worker.finished.connect(self.fadecurve_thread.quit) self.worker.finished.connect(self.worker.deleteLater) self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater) self.fadecurve_thread.start() track_sequence = TrackSequence()