# Standard library imports from __future__ import annotations import ctypes from dataclasses import dataclass, field import datetime as dt from enum import auto, Enum import platform from time import sleep from typing import Any, Optional, NamedTuple # Third party imports import numpy as np import pyqtgraph as pg # type: ignore import vlc # type: ignore # PyQt imports from PyQt6.QtCore import ( pyqtSignal, QObject, QThread, ) from pyqtgraph import PlotWidget from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore # App imports from config import Config from log import log from models import PlaylistRows from helpers import ( file_is_unreadable, get_audio_segment, show_warning, singleton, ) from vlcmanager import VLCManager # Define the VLC callback function type # VLC logging is very noisy so comment out unless needed # VLC_LOG_CB = ctypes.CFUNCTYPE( # None, # ctypes.c_void_p, # ctypes.c_int, # ctypes.c_void_p, # ctypes.c_char_p, # ctypes.c_void_p, # ) # Determine the correct C library for vsnprintf based on the platform if platform.system() == "Windows": libc = ctypes.CDLL("msvcrt") elif platform.system() == "Linux": libc = ctypes.CDLL("libc.so.6") elif platform.system() == "Darwin": # macOS libc = ctypes.CDLL("libc.dylib") else: raise OSError("Unsupported operating system") # Define the vsnprintf function libc.vsnprintf.argtypes = [ ctypes.c_char_p, ctypes.c_size_t, ctypes.c_char_p, ctypes.c_void_p, ] libc.vsnprintf.restype = ctypes.c_int class Col(Enum): START_GAP = 0 TITLE = auto() ARTIST = auto() INTRO = auto() DURATION = auto() START_TIME = auto() END_TIME = auto() LAST_PLAYED = auto() BITRATE = auto() NOTE = auto() class _AddFadeCurve(QObject): """ Initialising a fade curve introduces a noticeable delay so carry out in a thread. """ finished = pyqtSignal() def __init__( self, rat: RowAndTrack, track_path: str, track_fade_at: int, track_silence_at: int, ) -> None: super().__init__() self.rat = rat self.track_path = track_path self.track_fade_at = track_fade_at self.track_silence_at = track_silence_at def run(self) -> None: """ Create fade curve and add to PlaylistTrack object """ fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at) if not fc: log.error(f"Failed to create FadeCurve for {self.track_path=}") else: self.rat.fade_graph = fc self.finished.emit() class _FadeCurve: GraphWidget: Optional[PlotWidget] = None def __init__( self, track_path: str, track_fade_at: int, track_silence_at: int ) -> None: """ Set up fade graph array """ audio = get_audio_segment(track_path) if not audio: log.error(f"FadeCurve: could not get audio for {track_path=}") return None # Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE # milliseconds before fade starts to silence self.start_ms: int = max( 0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 ) self.end_ms: int = track_silence_at audio_segment = audio[self.start_ms : self.end_ms] self.graph_array = np.array(audio_segment.get_array_of_samples()) # Calculate the factor to map milliseconds of track to array self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms) self.curve: Optional[PlotDataItem] = None self.region: Optional[LinearRegionItem] = None def clear(self) -> None: """Clear the current graph""" if self.GraphWidget: self.GraphWidget.clear() def plot(self) -> None: if self.GraphWidget: self.curve = self.GraphWidget.plot(self.graph_array) if self.curve: self.curve.setPen(Config.FADE_CURVE_FOREGROUND) else: log.debug("_FadeCurve.plot: no curve") else: log.debug("_FadeCurve.plot: no GraphWidget") def tick(self, play_time: int) -> None: """Update volume fade curve""" if not self.GraphWidget: return ms_of_graph = play_time - self.start_ms if ms_of_graph < 0: return if self.region is None: # Create the region now that we're into fade log.debug("issue223: _FadeCurve: create region") self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)]) self.GraphWidget.addItem(self.region) # Update region position if self.region: # Next line is very noisy # log.debug("issue223: _FadeCurve: update region") self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor]) class _FadeTrack(QThread): finished = pyqtSignal() def __init__(self, player: vlc.MediaPlayer, fade_seconds: int) -> None: super().__init__() self.player = player self.fade_seconds = fade_seconds def run(self) -> None: """ Implementation of fading the player """ if not self.player: return # Reduce volume logarithmically total_steps = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND if total_steps > 0: db_reduction_per_step = Config.FADEOUT_DB / total_steps reduction_factor_per_step = pow(10, (db_reduction_per_step / 20)) volume = self.player.audio_get_volume() for i in range(1, total_steps + 1): self.player.audio_set_volume( int(volume * pow(reduction_factor_per_step, i)) ) sleep(1 / Config.FADEOUT_STEPS_PER_SECOND) self.finished.emit() vlc_instance = VLCManager().vlc_instance class _Music: """ Manage the playing of music tracks """ def __init__(self, name: str) -> None: vlc_instance.set_user_agent(name, name) self.player: Optional[vlc.MediaPlayer] = None self.name = name self.max_volume: int = Config.VLC_VOLUME_DEFAULT self.start_dt: Optional[dt.datetime] = None # Set up logging # self._set_vlc_log() # VLC logging very noisy so comment out unless needed # @VLC_LOG_CB # def log_callback(data, level, ctx, fmt, args): # try: # # Create a ctypes string buffer to hold the formatted message # buf = ctypes.create_string_buffer(1024) # # Use vsnprintf to format the string with the va_list # libc.vsnprintf(buf, len(buf), fmt, args) # # Decode the formatted message # message = buf.value.decode("utf-8", errors="replace") # log.debug("VLC: " + message) # except Exception as e: # log.error(f"Error in VLC log callback: {e}") # def _set_vlc_log(self): # try: # vlc.libvlc_log_set(vlc_instance, self.log_callback, None) # log.debug("VLC logging set up successfully") # except Exception as e: # log.error(f"Failed to set up VLC logging: {e}") def adjust_by_ms(self, ms: int) -> None: """Move player position by ms milliseconds""" if not self.player: return elapsed_ms = self.get_playtime() position = self.get_position() if not position: position = 0.0 new_position = max(0.0, position + ((position * ms) / elapsed_ms)) self.set_position(new_position) # Adjus start time so elapsed time calculations are correct if new_position == 0: self.start_dt = dt.datetime.now() else: if self.start_dt: self.start_dt -= dt.timedelta(milliseconds=ms) else: self.start_dt = dt.datetime.now() - dt.timedelta(milliseconds=ms) def fade(self, fade_seconds: int) -> None: """ Fade the currently playing track. The actual management of fading runs in its own thread so as not to hold up the UI during the fade. """ if not self.player: return if not self.player.get_position() > 0 and self.player.is_playing(): return self.fader_worker = _FadeTrack(self.player, fade_seconds=fade_seconds) self.fader_worker.finished.connect(self.player.release) self.fader_worker.start() self.start_dt = None def get_playtime(self) -> int: """ Return number of milliseconds current track has been playing or zero if not playing. The vlc function get_time() only updates 3-4 times a second; this function has much better resolution. """ if self.start_dt is None: return 0 now = dt.datetime.now() elapsed_seconds = (now - self.start_dt).total_seconds() return int(elapsed_seconds * 1000) def get_position(self) -> Optional[float]: """Return current position""" if not self.player: return None return self.player.get_position() def is_playing(self) -> bool: """ Return True if we're playing """ if not self.player: return False # There is a discrete time between starting playing a track and # player.is_playing() returning True, so assume playing if less # than Config.PLAY_SETTLE microseconds have passed since # starting play. return self.start_dt is not None and ( self.player.is_playing() or (dt.datetime.now() - self.start_dt) < dt.timedelta(microseconds=Config.PLAY_SETTLE) ) def play( self, path: str, start_time: dt.datetime, position: Optional[float] = None, ) -> None: """ Start playing the track at path. Log and return if path not found. start_time ensures our version and our caller's version of the start time is the same """ log.debug(f"Music[{self.name}].play({path=}, {position=}") if file_is_unreadable(path): log.error(f"play({path}): path not readable") return None self.player = vlc.MediaPlayer(vlc_instance, path) if self.player is None: log.error(f"_Music:play: failed to create MediaPlayer ({path=})") show_warning(None, "Error creating MediaPlayer", f"Cannot play file ({path})") return _ = self.player.play() self.set_volume(self.max_volume) if position: self.player.set_position(position) self.start_dt = start_time # For as-yet unknown reasons. sometimes the volume gets # reset to zero within 200mS or so of starting play. This # only happened since moving to Debian 12, which uses # Pipewire for sound (which may be irrelevant). # It has been known for the volume to need correcting more # than once in the first 200mS. # Update August 2024: This no longer seems to be an issue # for _ in range(3): # if self.player: # volume = self.player.audio_get_volume() # if volume < Config.VLC_VOLUME_DEFAULT: # self.set_volume(Config.VLC_VOLUME_DEFAULT) # log.error(f"Reset from {volume=}") # sleep(0.1) def set_position(self, position: float) -> None: """ Set player position """ if self.player: self.player.set_position(position) def set_volume( self, volume: Optional[int] = None, set_default: bool = True ) -> None: """Set maximum volume used for player""" if not self.player: return if set_default and volume: self.max_volume = volume if volume is None: volume = Config.VLC_VOLUME_DEFAULT self.player.audio_set_volume(volume) # Ensure volume correct # For as-yet unknown reasons. sometimes the volume gets # reset to zero within 200mS or so of starting play. This # only happened since moving to Debian 12, which uses # Pipewire for sound (which may be irrelevant). for _ in range(3): current_volume = self.player.audio_get_volume() if current_volume < volume: self.player.audio_set_volume(volume) log.debug(f"Reset from {volume=}") sleep(0.1) def stop(self) -> None: """Immediately stop playing""" log.debug(f"Music[{self.name}].stop()") self.start_dt = None if not self.player: return if self.player.is_playing(): self.player.stop() self.player.release() self.player = None @singleton @dataclass class MusicMusterSignals(QObject): """ Class for all MusicMuster signals. See: - https://zetcode.com/gui/pyqt5/eventssignals/ - https://stackoverflow.com/questions/62654525/ emit-a-signal-from-another-class-to-main-class and Singleton class at https://refactoring.guru/design-patterns/singleton/python/example#example-0 """ begin_reset_model_signal = pyqtSignal(int) enable_escape_signal = pyqtSignal(bool) end_reset_model_signal = pyqtSignal(int) next_track_changed_signal = pyqtSignal() resize_rows_signal = pyqtSignal(int) search_songfacts_signal = pyqtSignal(str) search_wikipedia_signal = pyqtSignal(str) show_warning_signal = pyqtSignal(str, str) span_cells_signal = pyqtSignal(int, int, int, int, int) status_message_signal = pyqtSignal(str, int) track_ended_signal = pyqtSignal() def __post_init__(self): super().__init__() class RowAndTrack: """ Object to manage playlist rows and tracks. """ def __init__(self, playlist_row: PlaylistRows) -> None: """ Initialises data structure. The passed PlaylistRows object will include a Tracks object if this row has a track. """ # Collect playlistrow data self.note = playlist_row.note self.played = playlist_row.played self.playlist_id = playlist_row.playlist_id self.playlistrow_id = playlist_row.id self.row_number = playlist_row.row_number self.track_id = playlist_row.track_id # Collect track data if there's a track if playlist_row.track_id: self.artist = playlist_row.track.artist self.bitrate = playlist_row.track.bitrate self.duration = playlist_row.track.duration self.fade_at = playlist_row.track.fade_at self.intro = playlist_row.track.intro if playlist_row.track.playdates: self.lastplayed = max( [a.lastplayed for a in playlist_row.track.playdates] ) else: self.lastplayed = Config.EPOCH self.path = playlist_row.track.path self.silence_at = playlist_row.track.silence_at self.start_gap = playlist_row.track.start_gap self.title = playlist_row.track.title else: self.artist = "" self.bitrate = None self.duration = 0 self.fade_at = 0 self.intro = None self.lastplayed = Config.EPOCH self.path = "" self.silence_at = 0 self.start_gap = 0 self.title = "" # Track playing data self.end_of_track_signalled: bool = False self.end_time: Optional[dt.datetime] = None self.fade_graph: Optional[_FadeCurve] = None self.fade_graph_start_updates: Optional[dt.datetime] = None self.resume_marker: Optional[float] = 0.0 self.forecast_end_time: Optional[dt.datetime] = None self.forecast_start_time: Optional[dt.datetime] = None self.start_time: Optional[dt.datetime] = None # Other object initialisation self.music = _Music(name=Config.VLC_MAIN_PLAYER_NAME) self.signals = MusicMusterSignals() def __repr__(self) -> str: return ( f"" ) def check_for_end_of_track(self) -> None: """ Check whether track has ended. If so, emit track_ended_signal """ if self.start_time is None: return if self.end_of_track_signalled: return if self.music.is_playing(): return self.start_time = None if self.fade_graph: self.fade_graph.clear() # Ensure that player is released self.music.fade(0) self.signals.track_ended_signal.emit() self.end_of_track_signalled = True def create_fade_graph(self) -> None: """ Initialise and add FadeCurve in a thread as it's slow """ self.fadecurve_thread = QThread() self.worker = _AddFadeCurve( self, track_path=self.path, track_fade_at=self.fade_at, track_silence_at=self.silence_at, ) self.worker.moveToThread(self.fadecurve_thread) self.fadecurve_thread.started.connect(self.worker.run) self.worker.finished.connect(self.fadecurve_thread.quit) self.worker.finished.connect(self.worker.deleteLater) self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater) self.fadecurve_thread.start() def drop3db(self, enable: bool) -> None: """ If enable is true, drop output by 3db else restore to full volume """ if enable: self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False) else: self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False) def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None: """Fade music""" self.resume_marker = self.music.get_position() self.music.fade(fade_seconds) self.signals.track_ended_signal.emit() def is_playing(self) -> bool: """ Return True if we're currently playing else False """ if self.start_time is None: return False return self.music.is_playing() def move_back(self, ms: int = Config.PREVIEW_BACK_MS) -> None: """ Rewind player by ms milliseconds """ self.music.adjust_by_ms(ms * -1) def move_forward(self, ms: int = Config.PREVIEW_ADVANCE_MS) -> None: """ Rewind player by ms milliseconds """ self.music.adjust_by_ms(ms) def play(self, position: Optional[float] = None) -> None: """Play track""" log.debug(f"issue223: RowAndTrack: play {self.track_id=}") now = dt.datetime.now() self.start_time = now # Initialise player self.music.play(self.path, start_time=now, position=position) self.end_time = now + dt.timedelta(milliseconds=self.duration) # Calculate time fade_graph should start updating if self.fade_at: update_graph_at_ms = max( 0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 ) self.fade_graph_start_updates = now + dt.timedelta( milliseconds=update_graph_at_ms ) def restart(self) -> None: """ Restart player """ self.music.adjust_by_ms(self.time_playing() * -1) def set_forecast_start_time( self, modified_rows: list[int], start: Optional[dt.datetime] ) -> Optional[dt.datetime]: """ Set forecast start time for this row Update passed modified rows list if we changed the row. Return new start time """ changed = False if self.forecast_start_time != start: self.forecast_start_time = start changed = True if start is None: if self.forecast_end_time is not None: self.forecast_end_time = None changed = True new_start_time = None else: end_time = start + dt.timedelta(milliseconds=self.duration) new_start_time = end_time if self.forecast_end_time != end_time: self.forecast_end_time = end_time changed = True if changed and self.row_number not in modified_rows: modified_rows.append(self.row_number) return new_start_time def stop(self, fade_seconds: int = 0) -> None: """ Stop this track playing """ self.resume_marker = self.music.get_position() self.fade(fade_seconds) # Reset fade graph if self.fade_graph: self.fade_graph.clear() def time_playing(self) -> int: """ Return time track has been playing in milliseconds, zero if not playing """ if self.start_time is None: return 0 return self.music.get_playtime() def time_remaining_intro(self) -> int: """ Return milliseconds of intro remaining. Return 0 if no intro time in track record or if intro has finished. """ if not self.intro: return 0 return max(0, self.intro - self.time_playing()) def time_to_fade(self) -> int: """ Return milliseconds until fade time. Return zero if we're not playing. """ if self.start_time is None: return 0 return self.fade_at - self.time_playing() def time_to_silence(self) -> int: """ Return milliseconds until silent. Return zero if we're not playing. """ if self.start_time is None: return 0 return self.silence_at - self.time_playing() def update_fade_graph(self) -> None: """ Update fade graph """ if ( not self.is_playing() or not self.fade_graph_start_updates or not self.fade_graph ): return now = dt.datetime.now() if self.fade_graph_start_updates > now: return self.fade_graph.tick(self.time_playing()) @dataclass class TrackFileData: """ Simple class to track details changes to a track file """ new_file_path: str track_id: int = 0 track_path: Optional[str] = None obsolete_path: Optional[str] = None tags: dict[str, Any] = field(default_factory=dict) audio_metadata: dict[str, str | int | float] = field(default_factory=dict) class TrackInfo(NamedTuple): track_id: int row_number: int class TrackSequence: next: Optional[RowAndTrack] = None current: Optional[RowAndTrack] = None previous: Optional[RowAndTrack] = None def set_next(self, rat: Optional[RowAndTrack]) -> None: """ Set the 'next' track to be passed rat. Clear any previous next track. If passed rat is None just clear existing next track. """ # Clear any existing fade graph if self.next and self.next.fade_graph: self.next.fade_graph.clear() if rat is None: self.next = None else: self.next = rat self.next.create_fade_graph() track_sequence = TrackSequence()