# Standard library imports
import datetime as dt
import threading
from time import sleep
from typing import Optional

# Third party imports
import vlc  # type: ignore

# PyQt imports
from PyQt6.QtCore import (
    QRunnable,
    QThreadPool,
)

# App imports
from config import Config
from helpers import file_is_unreadable
from log import log

# Serialises hand-over and teardown of the active player between
# Music.fade() and Music.stop().
lock = threading.Lock()


class FadeTrack(QRunnable):
    """
    Runnable that fades out a player, then stops and releases it.

    Instances are started on the global QThreadPool by Music.fade().
    """

    def __init__(self, player: vlc.MediaPlayer, fade_seconds) -> None:
        super().__init__()
        self.player = player
        self.fade_seconds = fade_seconds

    def run(self) -> None:
        """
        Implementation of fading the player

        The volume is stepped down Config.FADEOUT_STEPS_PER_SECOND times
        a second for fade_seconds seconds, after which the player is
        stopped and released. The player is stopped and released even if
        the fade itself fails part-way through.
        """

        if not self.player:
            return

        try:
            # int() so that a float fade_seconds cannot break range()
            total_steps = int(
                self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND
            )

            # A zero-length fade has nothing to step through (and would
            # otherwise divide by zero below); go straight to stopping.
            if total_steps > 0:
                # Reduce volume logarithmically: spread a total change of
                # Config.FADEOUT_DB decibels evenly across the steps and
                # convert the per-step dB change to an amplitude factor.
                db_reduction_per_step = Config.FADEOUT_DB / total_steps
                reduction_factor_per_step = pow(
                    10, (db_reduction_per_step / 20)
                )
                step_seconds = 1 / Config.FADEOUT_STEPS_PER_SECOND

                volume = self.player.audio_get_volume()

                for i in range(1, total_steps + 1):
                    self.player.audio_set_volume(
                        int(volume * pow(reduction_factor_per_step, i))
                    )
                    sleep(step_seconds)
        finally:
            self.player.stop()
            log.debug(f"Releasing player {self.player=}")
            self.player.release()


class Music:
    """
    Manage the playing of music tracks
    """

    def __init__(self, name) -> None:
        self.VLC = vlc.Instance()
        self.VLC.set_user_agent(name, name)
        # Player for the current track; None when nothing is loaded
        self.player: Optional[vlc.MediaPlayer] = None
        self.name = name
        self.max_volume = Config.VLC_VOLUME_DEFAULT
        # Wall-clock time at which the current track (notionally) started
        self.start_dt: Optional[dt.datetime] = None

    def _adjust_by_ms(self, ms: int) -> None:
        """
        Move player position by ms milliseconds

        Positive ms moves forwards, negative ms moves backwards. The
        shift is derived from the ratio of the current position to the
        elapsed play time. Does nothing if there is no player or no
        elapsed play time to derive that ratio from.
        """

        if not self.player:
            return

        elapsed_ms = self.get_playtime()
        # Without a start time / elapsed time we can neither scale the
        # shift (division by zero) nor adjust start_dt afterwards.
        if elapsed_ms <= 0 or self.start_dt is None:
            return

        position = self.get_position()
        if not position:
            position = 0

        # position / elapsed_ms is the position covered per millisecond
        new_position = max(0, position + ((position * ms) / elapsed_ms))
        self.player.set_position(new_position)

        # Adjust start time so elapsed time calculations are correct
        if new_position == 0:
            self.start_dt = dt.datetime.now()
        else:
            self.start_dt -= dt.timedelta(milliseconds=ms)

    def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
        """
        Fade the currently playing track.

        The actual management of fading runs in its own thread so as not
        to hold up the UI during the fade.
        """

        log.info(f"Music[{self.name}].fade()")

        if not self.player:
            return

        # NOTE(review): this returns only when the player reports that it
        # is playing but has not yet advanced past position zero. If the
        # intent was "return unless playing and past zero", the condition
        # needs rewriting - confirm before changing.
        if not (self.player.get_position() > 0) and self.player.is_playing():
            return

        # Take a copy of current player to allow another track to be
        # started without interfering here
        with lock:
            p = self.player
            self.player = None

        pool = QThreadPool.globalInstance()
        fader = FadeTrack(p, fade_seconds=fade_seconds)
        pool.start(fader)
        self.start_dt = None

    def get_playtime(self) -> int:
        """
        Return number of milliseconds current track has been playing or
        zero if not playing.

        The vlc function get_time() only updates 3-4 times a second; this
        function has much better resolution.
        """

        if self.player is None or self.start_dt is None:
            return 0

        # total_seconds() includes the days component of the timedelta
        elapsed_seconds = (dt.datetime.now() - self.start_dt).total_seconds()
        return int(elapsed_seconds * 1000)

    def get_position(self) -> Optional[float]:
        """Return current position, or None if there is no player"""

        if not self.player:
            return None
        return self.player.get_position()

    def is_playing(self) -> bool:
        """
        Return True if we're playing
        """

        if not self.player or self.start_dt is None:
            return False

        if self.player.is_playing():
            return True

        # There is a discrete time between starting playing a track and
        # player.is_playing() returning True, so assume playing if less
        # than Config.PLAY_SETTLE microseconds have passed since
        # starting play.
        settle_time = dt.timedelta(microseconds=Config.PLAY_SETTLE)
        return (dt.datetime.now() - self.start_dt) < settle_time

    def move_back(self, ms: int) -> None:
        """
        Rewind player by ms milliseconds
        """

        self._adjust_by_ms(ms * -1)

    def move_forward(self, ms: int) -> None:
        """
        Advance player by ms milliseconds
        """

        self._adjust_by_ms(ms)

    def play(self, path: str, position: Optional[float] = None) -> None:
        """
        Start playing the track at path.

        If position is given (and non-zero), playback is moved to that
        position after starting.

        Log and return if path not found.
        """

        log.info(f"Music[{self.name}].play({path=}, {position=})")

        if file_is_unreadable(path):
            log.error(f"play({path}): path not readable")
            return None

        media = self.VLC.media_new_path(path)
        self.player = media.player_new_from_media()
        if not self.player:
            return None

        _ = self.player.play()
        self.set_volume(self.max_volume)

        if position:
            self.player.set_position(position)
        self.start_dt = dt.datetime.now()

        # For as-yet unknown reasons, sometimes the volume gets
        # reset to zero within 200mS or so of starting play. This
        # only happened since moving to Debian 12, which uses
        # Pipewire for sound (which may be irrelevant).
        # It has been known for the volume to need correcting more
        # than once in the first 200mS.
        # NOTE(review): the check below is against
        # Config.VLC_VOLUME_DEFAULT rather than self.max_volume, and
        # set_volume() also updates self.max_volume - confirm intended.
        for _ in range(3):
            if self.player:
                volume = self.player.audio_get_volume()
                if volume < Config.VLC_VOLUME_DEFAULT:
                    self.set_volume(Config.VLC_VOLUME_DEFAULT)
                    log.error(f"Reset from {volume=}")
            sleep(0.1)

    def set_volume(self, volume=None, set_default=True) -> None:
        """
        Set the player volume

        volume defaults to Config.VLC_VOLUME_DEFAULT when None. When
        set_default is True the resulting volume is also remembered as
        the maximum volume used for the player. Does nothing if there is
        no player.
        """

        if not self.player:
            return

        # Resolve the default first so that max_volume is never None
        if volume is None:
            volume = Config.VLC_VOLUME_DEFAULT

        if set_default:
            self.max_volume = volume

        self.player.audio_set_volume(volume)

        # Ensure volume correct
        # For as-yet unknown reasons, sometimes the volume gets
        # reset to zero within 200mS or so of starting play. This
        # only happened since moving to Debian 12, which uses
        # Pipewire for sound (which may be irrelevant).
        for _ in range(3):
            current_volume = self.player.audio_get_volume()
            if current_volume < volume:
                self.player.audio_set_volume(volume)
                log.debug(f"Reset from volume={current_volume}")
            sleep(0.1)

    def stop(self) -> float:
        """
        Immediately stop playing

        Returns the position the player had reached, or 0.0 if there was
        no player. The player is released and must not be reused.
        """

        log.info(f"Music[{self.name}].stop()")

        self.start_dt = None

        if not self.player:
            return 0.0

        with lock:
            # Detach the player under the lock, as fade() does
            p = self.player
            self.player = None
            if p is None:
                return 0.0

            try:
                position = p.get_position()
                p.stop()
            finally:
                p.release()

        return position