# musicmuster/app/music_manager.py
# 2025-03-29 18:20:38 +00:00
#
# 751 lines
# 21 KiB
# Python

# Standard library imports
from __future__ import annotations
import datetime as dt
from time import sleep
from typing import Any, Optional
# Third party imports
# import line_profiler
import numpy as np
import pyqtgraph as pg # type: ignore
from sqlalchemy.orm.session import Session
import vlc # type: ignore
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QObject,
QThread,
)
from pyqtgraph import PlotWidget
from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore
from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore
# App imports
from classes import ApplicationError, MusicMusterSignals
from config import Config
import helpers
from log import log
from repository import PlaylistRowDTO
from vlcmanager import VLCManager
# Define the VLC callback function type
# import ctypes
# import platform
# VLC logging is very noisy so comment out unless needed
# VLC_LOG_CB = ctypes.CFUNCTYPE(
# None,
# ctypes.c_void_p,
# ctypes.c_int,
# ctypes.c_void_p,
# ctypes.c_char_p,
# ctypes.c_void_p,
# )
# # Determine the correct C library for vsnprintf based on the platform
# if platform.system() == "Windows":
# libc = ctypes.CDLL("msvcrt")
# elif platform.system() == "Linux":
# libc = ctypes.CDLL("libc.so.6")
# elif platform.system() == "Darwin": # macOS
# libc = ctypes.CDLL("libc.dylib")
# else:
# raise OSError("Unsupported operating system")
# # Define the vsnprintf function
# libc.vsnprintf.argtypes = [
# ctypes.c_char_p,
# ctypes.c_size_t,
# ctypes.c_char_p,
# ctypes.c_void_p,
# ]
# libc.vsnprintf.restype = ctypes.c_int
class _AddFadeCurve(QObject):
    """
    Worker object that builds a FadeCurve for a playlist row.

    Initialising a fade curve introduces a noticeable delay so carry out in
    a thread.
    """

    # Emitted at the end of run(), whether or not a curve was attached.
    finished = pyqtSignal()

    def __init__(
        self,
        plr: PlaylistRow,
        track_path: str,
        track_fade_at: int,
        track_silence_at: int,
    ) -> None:
        """
        Store the row to update and the arguments to pass to FadeCurve.

        plr: row whose fade_graph attribute is set on success
        track_path: path handed to FadeCurve
        track_fade_at: fade position handed to FadeCurve
        track_silence_at: silence position handed to FadeCurve
        """
        super().__init__()
        self.plr = plr
        self.track_path = track_path
        self.track_fade_at = track_fade_at
        self.track_silence_at = track_silence_at

    def run(self) -> None:
        """
        Create fade curve and add to PlaylistRow object.

        The finished signal is always emitted, even if building the curve
        raises, so that anything waiting on it is not left hanging.
        """
        try:
            fc = FadeCurve(
                self.track_path, self.track_fade_at, self.track_silence_at
            )
            # A constructor cannot report failure through its return value,
            # so a FadeCurve that could not load audio is still an object.
            # Treat it as failed if it is falsy or carries no sample data.
            if not fc or getattr(fc, "graph_array", None) is None:
                log.error(f"Failed to create FadeCurve for {self.track_path=}")
            else:
                self.plr.fade_graph = fc
        finally:
            self.finished.emit()
class FadeCurve:
    """
    Sample data for the closing section of a track, drawn on GraphWidget
    with a region that is widened as playback advances.

    An instance is falsy when no audio could be loaded for the track, so
    callers can test `if fade_curve:` before using it.
    """

    # Plot widget used for drawing. It is not assigned anywhere in this
    # class; while it is None every drawing method is a no-op.
    GraphWidget: Optional[PlotWidget] = None

    def __init__(
        self, track_path: str, track_fade_at: int, track_silence_at: int
    ) -> None:
        """
        Set up fade graph array

        track_path: file passed to helpers.get_audio_segment()
        track_fade_at: position (ms) at which the track starts to fade
        track_silence_at: position (ms) at which the graph ends

        If the audio cannot be loaded an error is logged and the instance
        is left empty (graph_array is None).
        """

        # Define every attribute up front so that an instance which fails
        # to load audio is still safe to call clear()/plot()/tick() on.
        self.start_ms: int = 0
        self.end_ms: int = 0
        self.graph_array: Optional[np.ndarray] = None
        self.ms_to_array_factor: float = 0.0
        self.curve: Optional[PlotDataItem] = None
        self.region: Optional[LinearRegionItem] = None

        audio = helpers.get_audio_segment(track_path)
        if not audio:
            log.error(f"FadeCurve: could not get audio for {track_path=}")
            return

        # Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
        # milliseconds before fade starts to silence
        self.start_ms = max(
            0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
        )
        self.end_ms = track_silence_at
        audio_segment = audio[self.start_ms : self.end_ms]
        self.graph_array = np.array(audio_segment.get_array_of_samples())

        # Calculate the factor to map milliseconds of track to array.
        # Leave it at zero for an empty or inverted span rather than
        # dividing by zero.
        span_ms = self.end_ms - self.start_ms
        if span_ms > 0:
            self.ms_to_array_factor = len(self.graph_array) / span_ms

    def __bool__(self) -> bool:
        """Return True only if sample data was loaded"""

        return self.graph_array is not None

    def clear(self) -> None:
        """Clear the current graph"""

        if self.GraphWidget:
            self.GraphWidget.clear()

    def plot(self) -> None:
        """Plot the sample data on GraphWidget, if there is anything to plot"""

        if self.graph_array is None:
            log.debug("_FadeCurve.plot: no graph data")
            return

        if self.GraphWidget:
            self.curve = self.GraphWidget.plot(self.graph_array)
            if self.curve:
                self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
            else:
                log.debug("_FadeCurve.plot: no curve")
        else:
            log.debug("_FadeCurve.plot: no GraphWidget")

    def tick(self, play_time: int) -> None:
        """
        Update volume fade curve

        play_time: milliseconds the track has been playing. Nothing happens
        until play_time reaches the start of the graphed section.
        """

        if not self.GraphWidget or self.graph_array is None:
            return

        ms_of_graph = play_time - self.start_ms
        if ms_of_graph < 0:
            return

        if self.region is None:
            # Create the region now that we're into fade
            self.region = pg.LinearRegionItem(
                [0, 0], bounds=[0, len(self.graph_array)]
            )
            self.GraphWidget.addItem(self.region)

        # Update region position
        if self.region:
            self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
class _FadeTrack(QThread):
    """
    Thread that steps a player's volume down over a number of seconds.
    """

    # Emitted once the fade loop has completed (or was skipped because
    # there were no steps to perform).
    finished = pyqtSignal()

    def __init__(self, player: vlc.MediaPlayer, fade_seconds: int) -> None:
        """
        player: the player whose volume is reduced
        fade_seconds: how long the fade lasts
        """
        super().__init__()
        self.fade_seconds = fade_seconds
        self.player = player

    def run(self) -> None:
        """
        Implementation of fading the player
        """

        # Nothing to fade; note that finished is not emitted in this case
        if not self.player:
            return

        # Reduce volume logarithmically: spread Config.FADEOUT_DB evenly
        # over all steps and convert the per-step dB change to a factor.
        step_count = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND
        if step_count > 0:
            db_per_step = Config.FADEOUT_DB / step_count
            factor_per_step = pow(10, (db_per_step / 20))
            initial_volume = self.player.audio_get_volume()

            for step in range(1, step_count + 1):
                stepped_volume = int(initial_volume * pow(factor_per_step, step))
                self.player.audio_set_volume(stepped_volume)
                sleep(1 / Config.FADEOUT_STEPS_PER_SECOND)

        self.finished.emit()
# TODO can we move this into the _Music class?
# Module-level libvlc instance taken from VLCManager at import time.
vlc_instance = VLCManager().vlc_instance
class _Music:
    """
    Manage the playing of music tracks
    """

    def __init__(self, name: str) -> None:
        """
        name: used as the VLC user agent and in log messages
        """
        vlc_instance.set_user_agent(name, name)
        self.player: Optional[vlc.MediaPlayer] = None
        self.name = name
        self.max_volume: int = Config.VLC_VOLUME_DEFAULT
        # Time playback started; None when nothing is playing
        self.start_dt: Optional[dt.datetime] = None

        # Set up logging
        # self._set_vlc_log()

    # VLC logging very noisy so comment out unless needed
    # @VLC_LOG_CB
    # def log_callback(data, level, ctx, fmt, args):
    #     try:
    #         # Create a ctypes string buffer to hold the formatted message
    #         buf = ctypes.create_string_buffer(1024)
    #         # Use vsnprintf to format the string with the va_list
    #         libc.vsnprintf(buf, len(buf), fmt, args)
    #         # Decode the formatted message
    #         message = buf.value.decode("utf-8", errors="replace")
    #         log.debug("VLC: " + message)
    #     except Exception as e:
    #         log.error(f"Error in VLC log callback: {e}")
    # def _set_vlc_log(self):
    #     try:
    #         vlc.libvlc_log_set(vlc_instance, self.log_callback, None)
    #         log.debug("VLC logging set up successfully")
    #     except Exception as e:
    #         log.error(f"Failed to set up VLC logging: {e}")

    def fade(self, fade_seconds: int) -> None:
        """
        Fade the currently playing track.

        The actual management of fading runs in its own thread so as not
        to hold up the UI during the fade. The player is released when the
        fade thread signals that it has finished.
        """

        if not self.player:
            return

        # Operator precedence: this reads as
        # `(not position > 0) and is_playing()`, i.e. return only when the
        # player reports playing but has not moved past position zero.
        # NOTE(review): check_for_end_of_track() calls fade(0) on a player
        # that is no longer playing and relies on falling through here, so
        # the condition is deliberately left as written.
        if not self.player.get_position() > 0 and self.player.is_playing():
            return

        # NOTE(review): self.player is still referenced after the fade
        # thread releases it - confirm nothing uses it afterwards.
        self.fader_worker = _FadeTrack(self.player, fade_seconds=fade_seconds)
        self.fader_worker.finished.connect(self.player.release)
        self.fader_worker.start()
        self.start_dt = None

    def get_playtime(self) -> int:
        """
        Return number of milliseconds current track has been playing or
        zero if not playing. The vlc function get_time() only updates 3-4
        times a second; this function has much better resolution.
        """

        if self.start_dt is None:
            return 0

        now = dt.datetime.now()
        elapsed_seconds = (now - self.start_dt).total_seconds()
        return int(elapsed_seconds * 1000)

    def get_position(self) -> float:
        """Return current position, or 0.0 if there is no player"""

        if not self.player:
            return 0.0
        return self.player.get_position()

    def is_playing(self) -> bool:
        """
        Return True if we're playing
        """

        if not self.player:
            return False

        # There is a finite delay between starting to play a track and
        # player.is_playing() returning True, so assume playing if less
        # than Config.PLAY_SETTLE microseconds have passed since
        # starting play.
        return self.start_dt is not None and (
            self.player.is_playing()
            or (dt.datetime.now() - self.start_dt)
            < dt.timedelta(microseconds=Config.PLAY_SETTLE)
        )

    def play(
        self,
        path: str,
        start_time: dt.datetime,
        position: Optional[float] = None,
    ) -> None:
        """
        Start playing the track at path.

        Log and return if path not found.

        start_time ensures our version and our caller's version of
        the start time is the same

        position, if given and non-zero, is passed to the player's
        set_position() once playback has been started.
        """

        log.debug(f"Music[{self.name}].play({path=}, {position=})")

        if helpers.file_is_unreadable(path):
            log.error(f"play({path}): path not readable")
            return None

        self.player = vlc.MediaPlayer(vlc_instance, path)
        if self.player is None:
            log.error(f"_Music:play: failed to create MediaPlayer ({path=})")
            helpers.show_warning(
                None, "Error creating MediaPlayer", f"Cannot play file ({path})"
            )
            return

        _ = self.player.play()
        self.set_volume(self.max_volume)

        if position:
            self.player.set_position(position)
        self.start_dt = start_time

    def set_position(self, position: float) -> None:
        """
        Set player position
        """

        if self.player:
            self.player.set_position(position)

    def set_volume(
        self, volume: Optional[int] = None, set_default: bool = True
    ) -> None:
        """
        Set the player volume.

        volume: volume to apply; Config.VLC_VOLUME_DEFAULT if None
        set_default: if True and volume is non-zero, also remember volume
        as max_volume, which play() applies to newly started tracks

        Does nothing when there is no player.
        """

        if not self.player:
            return

        if set_default and volume:
            self.max_volume = volume

        if volume is None:
            volume = Config.VLC_VOLUME_DEFAULT

        self.player.audio_set_volume(volume)

        # Ensure volume correct
        # For as-yet unknown reasons. sometimes the volume gets
        # reset to zero within 200mS or so of starting play. This
        # only happened since moving to Debian 12, which uses
        # Pipewire for sound (which may be irrelevant).
        for _ in range(3):
            current_volume = self.player.audio_get_volume()
            if current_volume < volume:
                self.player.audio_set_volume(volume)
                # Report the value that was found, not just the target
                log.debug(f"Reset from {current_volume=} to {volume=}")
            sleep(0.1)

    def stop(self) -> None:
        """Immediately stop playing and release the player"""

        log.debug(f"Music[{self.name}].stop()")

        self.start_dt = None

        if not self.player:
            return

        if self.player.is_playing():
            self.player.stop()
        self.player.release()
        self.player = None
class PlaylistRow:
    """
    Object to manage playlist row and track.

    Wraps a PlaylistRowDTO, exposing its fields as properties, and owns
    the _Music player used to play the row's track.
    """

    def __init__(self, dto: PlaylistRowDTO) -> None:
        """
        The dto object will include a Tracks object if this row has a track.
        """

        self.dto = dto
        self.music = _Music(name=Config.VLC_MAIN_PLAYER_NAME)
        self.signals = MusicMusterSignals()
        self.end_of_track_signalled: bool = False
        self.end_time: dt.datetime | None = None
        self.fade_graph: Any | None = None
        self.fade_graph_start_updates: dt.datetime | None = None
        self.forecast_end_time: dt.datetime | None = None
        self.forecast_start_time: dt.datetime | None = None
        self.note_bg: str | None = None
        self.note_fg: str | None = None
        self.resume_marker: float = 0.0
        self.row_bg: str | None = None
        self.row_fg: str | None = None
        self.start_time: dt.datetime | None = None

    def __repr__(self) -> str:
        return (
            f"<PlaylistRow(playlist_id={self.dto.playlist_id}, "
            f"row_number={self.dto.row_number}, "
            f"playlistrow_id={self.dto.playlistrow_id}, "
            f"note={self.dto.note}, track_id={self.dto.track_id}>"
        )

    # Expose TrackDTO fields as properties
    @property
    def artist(self):
        return self.dto.artist

    @property
    def bitrate(self):
        return self.dto.bitrate

    @property
    def duration(self):
        return self.dto.duration

    @property
    def fade_at(self):
        return self.dto.fade_at

    @property
    def intro(self):
        return self.dto.intro

    @property
    def lastplayed(self):
        return self.dto.lastplayed

    @property
    def path(self):
        return self.dto.path

    @property
    def silence_at(self):
        return self.dto.silence_at

    @property
    def start_gap(self):
        return self.dto.start_gap

    @property
    def title(self):
        return self.dto.title

    @property
    def track_id(self):
        return self.dto.track_id

    @track_id.setter
    def track_id(self, value: int) -> None:
        """
        Adding a track_id should only happen to a header row.

        Raises ApplicationError if the row already has a track. Otherwise
        this is currently a placeholder that only prints a message.
        """

        if self.track_id:
            raise ApplicationError(
                f"Attempting to add track to row with existing track ({self=}"
            )

        # TODO: set up write access to track_id. Should only update if
        # track_id == 0. Need to update all other track fields at the
        # same time.
        print(f"set track_id attribute for {self=}, {value=}")

    # Expose PlaylistRowDTO fields as properties
    @property
    def note(self):
        return self.dto.note

    @note.setter
    def note(self, value: str) -> None:
        # TODO set up write access to db
        # Placeholder: the value is reported but not stored
        print(f"set note attribute for {self=}, {value=}")
        # self.dto.note = value

    @property
    def played(self):
        return self.dto.played

    @played.setter
    def played(self, value: bool = True) -> None:
        # TODO set up write access to db
        # Placeholder: the value is not stored
        print(f"set played attribute for {self=}")
        # self.dto.played = value

    @property
    def playlist_id(self):
        return self.dto.playlist_id

    @property
    def playlistrow_id(self):
        return self.dto.playlistrow_id

    @property
    def row_number(self):
        return self.dto.row_number

    @row_number.setter
    def row_number(self, value: int) -> None:
        # TODO do we need to set up write access to db?
        self.dto.row_number = value

    def check_for_end_of_track(self) -> None:
        """
        Check whether track has ended. If so, emit track_ended_signal

        Does nothing if the row was never started, if the end has already
        been signalled, or if the player is still playing.
        """

        if self.start_time is None:
            return

        if self.end_of_track_signalled:
            return

        if self.music.is_playing():
            return

        self.start_time = None
        if self.fade_graph:
            self.fade_graph.clear()
        # Ensure that player is released
        self.music.fade(0)
        self.signals.track_ended_signal.emit()
        self.end_of_track_signalled = True

    def drop3db(self, enable: bool) -> None:
        """
        If enable is true, drop output by 3db else restore to full volume

        Neither case changes the player's remembered default volume.
        """

        if enable:
            self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False)
        else:
            self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False)

    def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
        """
        Fade music

        Records the current position in resume_marker, starts the fade and
        emits track_ended_signal.
        """

        self.resume_marker = self.music.get_position()
        self.music.fade(fade_seconds)
        self.signals.track_ended_signal.emit()

    def is_playing(self) -> bool:
        """
        Return True if we're currently playing else False
        """

        if self.start_time is None:
            return False

        return self.music.is_playing()

    def play(self, position: Optional[float] = None) -> None:
        """
        Play track

        position is passed through to the player. Also sets start_time,
        end_time and, if the track has a fade point, the time at which the
        fade graph should start updating.
        """

        now = dt.datetime.now()
        self.start_time = now

        # Initialise player
        self.music.play(self.path, start_time=now, position=position)

        self.end_time = now + dt.timedelta(milliseconds=self.duration)

        # Calculate time fade_graph should start updating
        if self.fade_at:
            update_graph_at_ms = max(
                0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
            )
            self.fade_graph_start_updates = now + dt.timedelta(
                milliseconds=update_graph_at_ms
            )

    def set_forecast_start_time(
        self, modified_rows: list[int], start: Optional[dt.datetime]
    ) -> Optional[dt.datetime]:
        """
        Set forecast start time for this row

        Update passed modified rows list if we changed the row.

        Return new start time: the forecast end time of this row, or None
        if start is None.
        """

        changed = False

        if self.forecast_start_time != start:
            self.forecast_start_time = start
            changed = True

        if start is None:
            if self.forecast_end_time is not None:
                self.forecast_end_time = None
                changed = True
            new_start_time = None
        else:
            # duration is a property, not a method: read it, don't call it
            end_time = start + dt.timedelta(milliseconds=self.duration)
            new_start_time = end_time
            if self.forecast_end_time != end_time:
                self.forecast_end_time = end_time
                changed = True

        if changed and self.row_number not in modified_rows:
            modified_rows.append(self.row_number)

        return new_start_time

    def stop(self, fade_seconds: int = 0) -> None:
        """
        Stop this track playing

        Implemented as a fade over fade_seconds (so track_ended_signal is
        emitted), followed by clearing the fade graph.
        """

        self.resume_marker = self.music.get_position()
        self.fade(fade_seconds)

        # Reset fade graph
        if self.fade_graph:
            self.fade_graph.clear()

    def time_playing(self) -> int:
        """
        Return time track has been playing in milliseconds, zero if not playing
        """

        if self.start_time is None:
            return 0

        return self.music.get_playtime()

    def time_remaining_intro(self) -> int:
        """
        Return milliseconds of intro remaining. Return 0 if no intro time in track
        record or if intro has finished.
        """

        if not self.intro:
            return 0

        return max(0, self.intro - self.time_playing())

    def time_to_fade(self) -> int:
        """
        Return milliseconds until fade time. Return zero if we're not playing.
        """

        if self.start_time is None:
            return 0

        return self.fade_at - self.time_playing()

    def time_to_silence(self) -> int:
        """
        Return milliseconds until silent. Return zero if we're not playing.
        """

        if self.start_time is None:
            return 0

        return self.silence_at - self.time_playing()

    def update_fade_graph(self) -> None:
        """
        Update fade graph

        Only acts while playing, once the time in fade_graph_start_updates
        has been reached, and when a fade graph exists.
        """

        if (
            not self.is_playing()
            or not self.fade_graph_start_updates
            or not self.fade_graph
        ):
            return

        now = dt.datetime.now()
        if self.fade_graph_start_updates > now:
            return

        self.fade_graph.tick(self.time_playing())

    def update_playlist_and_row(self, session: Session) -> None:
        """
        Update local playlist_id and row_number from playlistrow_id

        Currently a no-op: the implementation below is commented out.
        """

        # TODO: only seems to be used by track_sequence
        return

        # plr = session.get(PlaylistRows, self.playlistrow_id)
        # if not plr:
        #     raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}")
        # self.playlist_id = plr.playlist_id
        # self.row_number = plr.row_number
class TrackSequence:
    """
    Holds the next, current and previous playlist rows.
    """

    next: Optional[PlaylistRow] = None
    current: Optional[PlaylistRow] = None
    previous: Optional[PlaylistRow] = None

    def set_next(self, rat: Optional[PlaylistRow]) -> None:
        """
        Make rat the 'next' track, replacing whatever was there before.
        Passing None simply clears the existing next track. A fade graph
        is built whenever a new next track is set.
        """

        # Clear any existing fade graph
        outgoing = self.next
        if outgoing and outgoing.fade_graph:
            outgoing.fade_graph.clear()

        self.next = rat
        if rat is not None:
            self.create_fade_graph()

    def create_fade_graph(self) -> None:
        """
        Build the FadeCurve for the next track on a separate thread
        because doing so is slow.

        Raises ApplicationError if there is no next track.
        """

        self.fadecurve_thread = QThread()
        upcoming = self.next
        if upcoming is None:
            raise ApplicationError("hell in a handcart")

        self.worker = _AddFadeCurve(
            upcoming,
            track_path=upcoming.path,
            track_fade_at=upcoming.fade_at,
            track_silence_at=upcoming.silence_at,
        )
        self.worker.moveToThread(self.fadecurve_thread)

        # Run the worker when the thread starts; when the worker is done,
        # stop the thread and schedule both objects for deletion.
        thread = self.fadecurve_thread
        thread.started.connect(self.worker.run)
        self.worker.finished.connect(thread.quit)
        self.worker.finished.connect(self.worker.deleteLater)
        thread.finished.connect(thread.deleteLater)
        thread.start()
# Module-level TrackSequence instance, created at import time
track_sequence = TrackSequence()