228 lines
6.2 KiB
Python
228 lines
6.2 KiB
Python
# Standard library imports
|
|
from __future__ import annotations
|
|
|
|
import datetime as dt
|
|
from time import sleep
|
|
|
|
|
|
# Third party imports
|
|
# import line_profiler
|
|
import vlc # type: ignore
|
|
|
|
# PyQt imports
|
|
from PyQt6.QtCore import (
|
|
pyqtSignal,
|
|
QThread,
|
|
)
|
|
|
|
# App imports
|
|
from classes import MusicMusterSignals, singleton
|
|
from config import Config
|
|
import helpers
|
|
from log import log
|
|
|
|
class _FadeTrack(QThread):
    """
    Worker thread that lowers a vlc player's volume in small steps.

    ``finished`` is emitted at the end of ``run()``, whether or not any
    fade steps were carried out. It is not emitted when ``run()`` returns
    early because there is no player.
    """

    finished = pyqtSignal()

    def __init__(self, player: vlc.MediaPlayer, fade_seconds: int) -> None:
        """
        Remember the player to fade and the duration of the fade.

        player: the vlc media player whose volume will be reduced
        fade_seconds: how long the fade should last, in seconds
        """

        super().__init__()
        self.fade_seconds = fade_seconds
        self.player = player

    def run(self) -> None:
        """
        Thread body: step the player's volume down, then emit ``finished``.

        The fade is split into
        fade_seconds * Config.FADEOUT_STEPS_PER_SECOND steps, and
        Config.FADEOUT_DB is shared equally between them so that the
        volume is reduced logarithmically.
        """

        if not self.player:
            return

        step_count = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND
        if step_count > 0:
            # dB change applied by each step, and the equivalent linear
            # multiplier (amplitude ratio = 10^(dB / 20))
            db_per_step = Config.FADEOUT_DB / step_count
            step_factor = 10 ** (db_per_step / 20)

            start_volume = self.player.audio_get_volume()

            for step in range(1, step_count + 1):
                # Always scale from the starting volume so that integer
                # truncation does not accumulate from step to step
                faded_volume = int(start_volume * step_factor**step)
                self.player.audio_set_volume(faded_volume)
                sleep(1 / Config.FADEOUT_STEPS_PER_SECOND)

        self.finished.emit()
|
|
|
|
|
|
@singleton
class VLCManager:
    """
    Holder for the application's vlc Instance.

    The class is wrapped with @singleton so that we only ever have one
    vlc Instance.
    """

    def __init__(self) -> None:
        """Create the vlc Instance that this manager hands out."""

        self.vlc_instance = vlc.Instance()

    def get_instance(self) -> vlc.Instance:
        """Return the vlc Instance created in __init__."""

        return self.vlc_instance
|
|
|
|
|
|
class Music:
|
|
"""
|
|
Manage the playing of music tracks
|
|
"""
|
|
|
|
def __init__(self, name: str) -> None:
|
|
self.name = name
|
|
vlc_manager = VLCManager()
|
|
self.vlc_instance = vlc_manager.get_instance()
|
|
self.vlc_instance.set_user_agent(name, name)
|
|
self.player: vlc.MediaPlayer | None = None
|
|
self.max_volume: int = Config.VLC_VOLUME_DEFAULT
|
|
self.start_dt: dt.datetime | None = None
|
|
self.signals = MusicMusterSignals()
|
|
|
|
def fade(self, fade_seconds: int) -> None:
|
|
"""
|
|
Fade the currently playing track.
|
|
|
|
The actual management of fading runs in its own thread so as not
|
|
to hold up the UI during the fade.
|
|
"""
|
|
|
|
if not self.player:
|
|
return
|
|
|
|
if not self.player.get_position() > 0 and self.player.is_playing():
|
|
return
|
|
|
|
self.fader_worker = _FadeTrack(self.player, fade_seconds=fade_seconds)
|
|
self.fader_worker.finished.connect(self.player.release)
|
|
self.fader_worker.start()
|
|
self.start_dt = None
|
|
|
|
def get_playtime(self) -> int:
|
|
"""
|
|
Return number of milliseconds current track has been playing or
|
|
zero if not playing. The vlc function get_time() only updates 3-4
|
|
times a second; this function has much better resolution.
|
|
"""
|
|
|
|
if self.start_dt is None:
|
|
return 0
|
|
|
|
now = dt.datetime.now()
|
|
elapsed_seconds = (now - self.start_dt).total_seconds()
|
|
return int(elapsed_seconds * 1000)
|
|
|
|
def get_position(self) -> float:
|
|
"""Return current position"""
|
|
|
|
if not self.player:
|
|
return 0.0
|
|
return self.player.get_position()
|
|
|
|
def is_playing(self) -> bool:
|
|
"""
|
|
Return True if we're playing
|
|
"""
|
|
|
|
if not self.player:
|
|
return False
|
|
|
|
# There is a discrete time between starting playing a track and
|
|
# player.is_playing() returning True, so assume playing if less
|
|
# than Config.PLAY_SETTLE microseconds have passed since
|
|
# starting play.
|
|
return self.start_dt is not None and (
|
|
self.player.is_playing()
|
|
or (dt.datetime.now() - self.start_dt)
|
|
< dt.timedelta(microseconds=Config.PLAY_SETTLE)
|
|
)
|
|
|
|
def play(
|
|
self,
|
|
path: str,
|
|
start_time: dt.datetime,
|
|
position: float | None = None,
|
|
) -> None:
|
|
"""
|
|
Start playing the track at path.
|
|
|
|
Log and return if path not found.
|
|
|
|
start_time ensures our version and our caller's version of
|
|
the start time is the same
|
|
"""
|
|
|
|
log.debug(f"Music[{self.name}].play({path=}, {position=}")
|
|
|
|
if helpers.file_is_unreadable(path):
|
|
log.error(f"play({path}): path not readable")
|
|
return None
|
|
|
|
self.player = vlc.MediaPlayer(self.vlc_instance, path)
|
|
if self.player is None:
|
|
log.error(f"_Music:play: failed to create MediaPlayer ({path=})")
|
|
helpers.show_warning(
|
|
None, "Error creating MediaPlayer", f"Cannot play file ({path})"
|
|
)
|
|
return
|
|
|
|
_ = self.player.play()
|
|
self.set_volume(self.max_volume)
|
|
|
|
if position:
|
|
self.player.set_position(position)
|
|
self.start_dt = start_time
|
|
|
|
def set_position(self, position: float) -> None:
|
|
"""
|
|
Set player position
|
|
"""
|
|
|
|
if self.player:
|
|
self.player.set_position(position)
|
|
|
|
def set_volume(
|
|
self, volume: int | None = None, set_default: bool = True
|
|
) -> None:
|
|
"""Set maximum volume used for player"""
|
|
|
|
if not self.player:
|
|
return
|
|
|
|
if set_default and volume:
|
|
self.max_volume = volume
|
|
|
|
if volume is None:
|
|
volume = Config.VLC_VOLUME_DEFAULT
|
|
|
|
self.player.audio_set_volume(volume)
|
|
# Ensure volume correct
|
|
# For as-yet unknown reasons. sometimes the volume gets
|
|
# reset to zero within 200mS or so of starting play. This
|
|
# only happened since moving to Debian 12, which uses
|
|
# Pipewire for sound (which may be irrelevant).
|
|
for _ in range(3):
|
|
current_volume = self.player.audio_get_volume()
|
|
if current_volume < volume:
|
|
self.player.audio_set_volume(volume)
|
|
log.debug(f"Reset from {volume=}")
|
|
sleep(0.1)
|
|
|
|
def stop(self) -> None:
|
|
"""Immediately stop playing"""
|
|
|
|
log.debug(f"Music[{self.name}].stop()")
|
|
|
|
self.start_dt = None
|
|
|
|
if not self.player:
|
|
return
|
|
|
|
if self.player.is_playing():
|
|
self.player.stop()
|
|
self.player.release()
|
|
self.player = None
|