Create trackmanager.py

music.py is fully absorbed into trackmanager.py and thus removed
Substantial parts of classes.py are absorbed into trackmanager.py
This commit is contained in:
Keith Edmunds 2024-06-02 10:00:31 +01:00
parent 8ea0a0dad5
commit fbcedb6c3b
3 changed files with 524 additions and 514 deletions

View File

@ -2,20 +2,13 @@
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import auto, Enum from enum import auto, Enum
from typing import Any, Optional from typing import Any, Optional
import datetime as dt
# PyQt imports # PyQt imports
from PyQt6.QtCore import pyqtSignal, QObject, QThread from PyQt6.QtCore import pyqtSignal, QObject
# Third party imports # Third party imports
import numpy as np
import pyqtgraph as pg # type: ignore
# App imports # App imports
from config import Config
from log import log
from models import db, PlaylistRows, Tracks
from music import Music
import helpers import helpers
@ -32,62 +25,6 @@ class Col(Enum):
NOTE = auto() NOTE = auto()
class FadeCurve:
GraphWidget = None
def __init__(
self, track_path: str, track_fade_at: int, track_silence_at: int
) -> None:
"""
Set up fade graph array
"""
audio = helpers.get_audio_segment(track_path)
if not audio:
log.error(f"FadeCurve: could not get audio for {track_path=}")
return None
# Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
# milliseconds before fade starts to silence
self.start_ms = max(0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1)
self.end_ms = track_silence_at
self.audio_segment = audio[self.start_ms : self.end_ms]
self.graph_array = np.array(self.audio_segment.get_array_of_samples())
# Calculate the factor to map milliseconds of track to array
self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms)
self.region = None
def clear(self) -> None:
"""Clear the current graph"""
if self.GraphWidget:
self.GraphWidget.clear()
def plot(self):
self.curve = self.GraphWidget.plot(self.graph_array)
self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
def tick(self, play_time) -> None:
"""Update volume fade curve"""
if not self.GraphWidget:
return
ms_of_graph = play_time - self.start_ms
if ms_of_graph < 0:
return
if self.region is None:
# Create the region now that we're into fade
self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
self.GraphWidget.addItem(self.region)
# Update region position
self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
@helpers.singleton @helpers.singleton
@dataclass @dataclass
class MusicMusterSignals(QObject): class MusicMusterSignals(QObject):
@ -116,151 +53,6 @@ class MusicMusterSignals(QObject):
super().__init__() super().__init__()
class _TrackPlayer:
"""
Object to manage active playlist tracks,
typically the previous, current and next track.
"""
def __init__(self, session: db.Session, player_name: str, track_id: int) -> None:
"""
Initialises data structure.
Define a player.
Raise ValueError if no track in passed plr.
"""
track = session.get(Tracks, track_id)
if not track:
raise ValueError(f"_TrackPlayer: unable to retreived {track_id=}")
self.player_name = player_name
self.artist = track.artist
self.bitrate = track.bitrate
self.duration = track.duration
self.fade_at = track.fade_at
self.intro = track.intro
self.path = track.path
self.silence_at = track.silence_at
self.start_gap = track.start_gap
self.title = track.title
self.track_id = track.id
self.end_time: Optional[dt.datetime] = None
self.fade_graph: Optional[FadeCurve] = None
self.fade_graph_start_updates: Optional[dt.datetime] = None
self.resume_marker: Optional[float]
self.start_time: Optional[dt.datetime] = None
self.player = Music(name=player_name)
# Initialise and add FadeCurve in a thread as it's slow
self.fadecurve_thread = QThread()
self.worker = AddFadeCurve(
self,
track_path=track.path,
track_fade_at=track.fade_at,
track_silence_at=track.silence_at,
)
self.worker.moveToThread(self.fadecurve_thread)
self.fadecurve_thread.started.connect(self.worker.run)
self.worker.finished.connect(self.fadecurve_thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
self.fadecurve_thread.start()
def __repr__(self) -> str:
return (
f"<_TrackPlayer(title={self.title}, artist={self.artist}, "
f"player_name={self.player_name}>"
)
def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
"""Fade music"""
self.player.fade(fade_seconds)
def play(self, position: Optional[float] = None) -> None:
"""Play track"""
now = dt.datetime.now()
self.start_time = now
self.player.play(self.path, position)
self.end_time = self.start_time + dt.timedelta(milliseconds=self.duration)
# Calculate time fade_graph should start updating
if self.fade_at:
update_graph_at_ms = max(
0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.fade_graph_start_updates = now + dt.timedelta(
milliseconds=update_graph_at_ms
)
def stop_playing(self, fade_seconds: int = 0) -> None:
"""
Stop this track playing
"""
self.resume_marker = self.player.get_position()
self.player.fade(fade_seconds)
# Reset fade graph
if self.fade_graph:
self.fade_graph.clear()
class MainTrackPlayer(_TrackPlayer):
def __init__(self, session: db.Session, track_id: int) -> None:
super().__init__(
session=session, player_name=Config.VLC_MAIN_PLAYER_NAME, track_id=track_id
)
class PreviewTrackPlayer(_TrackPlayer):
def __init__(self, session: db.Session, track_id: int) -> None:
super().__init__(
session=session,
player_name=Config.VLC_PREVIEW_PLAYER_NAME,
track_id=track_id,
)
class PlaylistTrack:
"""
Used to provide a single reference point for specific playlist tracks,
typically the previous, current and next track.
"""
def __init__(self, plrid: int) -> None:
"""
Initialise
"""
with db.Session() as session:
# Ensure we have a track
plr = session.get(PlaylistRows, plrid)
if not plr:
raise ValueError(f"PlaylistTrack: unable to retreive plr {plrid=}")
self.track_id: int = plr.track_id
# Save non-track plr info
self.row_number: int = plr.plr_rownum
self.playlist_id: int = plr.playlist_id
self.plr_id: int = plr.id
# Initialise player
self.track_player = MainTrackPlayer(session=session, track_id=self.track_id)
def __repr__(self) -> str:
return (
f"<PlaylistTrack(title={self.track_player.title}, "
f"artist={self.track_player.artist}, "
f"plr_rownum={self.row_number}, playlist_id={self.playlist_id}>"
)
@dataclass @dataclass
class TrackFileData: class TrackFileData:
""" """
@ -273,46 +65,3 @@ class TrackFileData:
obsolete_path: Optional[str] = None obsolete_path: Optional[str] = None
tags: dict[str, Any] = field(default_factory=dict) tags: dict[str, Any] = field(default_factory=dict)
audio_metadata: dict[str, str | int | float] = field(default_factory=dict) audio_metadata: dict[str, str | int | float] = field(default_factory=dict)
class AddFadeCurve(QObject):
"""
Initialising a fade curve introduces a noticeable delay so carry out in
a thread.
"""
finished = pyqtSignal()
def __init__(
self,
track_player: _TrackPlayer,
track_path: str,
track_fade_at: int,
track_silence_at: int,
):
super().__init__()
self.track_player = track_player
self.track_path = track_path
self.track_fade_at = track_fade_at
self.track_silence_at = track_silence_at
def run(self):
"""
Create fade curve and add to PlaylistTrack object
"""
fc = FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
if not fc:
log.error(f"Failed to create FadeCurve for {self.track_path=}")
else:
self.track_player.fade_graph = fc
self.finished.emit()
class TrackSequence:
next: Optional[PlaylistTrack] = None
current: Optional[PlaylistTrack] = None
previous: Optional[PlaylistTrack] = None
track_sequence = TrackSequence()

View File

@ -1,262 +0,0 @@
# Standard library imports
import datetime as dt
import threading
from time import sleep
from typing import Optional
# Third party imports
import vlc # type: ignore
# PyQt imports
from PyQt6.QtCore import (
QRunnable,
QThreadPool,
)
# App imports
from config import Config
from helpers import file_is_unreadable
from log import log
lock = threading.Lock()
class FadeTrack(QRunnable):
def __init__(self, player: vlc.MediaPlayer, fade_seconds) -> None:
super().__init__()
self.player = player
self.fade_seconds = fade_seconds
def run(self) -> None:
"""
Implementation of fading the player
"""
if not self.player:
return
# Reduce volume logarithmically
total_steps = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND
db_reduction_per_step = Config.FADEOUT_DB / total_steps
reduction_factor_per_step = pow(10, (db_reduction_per_step / 20))
volume = self.player.audio_get_volume()
for i in range(1, total_steps + 1):
self.player.audio_set_volume(
int(volume * pow(reduction_factor_per_step, i))
)
sleep(1 / Config.FADEOUT_STEPS_PER_SECOND)
self.player.stop()
log.debug(f"Releasing player {self.player=}")
self.player.release()
class Music:
"""
Manage the playing of music tracks
"""
def __init__(self, name) -> None:
self.VLC = vlc.Instance()
self.VLC.set_user_agent(name, name)
self.player = None
self.name = name
self.max_volume = Config.VLC_VOLUME_DEFAULT
self.start_dt: Optional[dt.datetime] = None
def _adjust_by_ms(self, ms: int) -> None:
"""Move player position by ms milliseconds"""
if not self.player:
return
elapsed_ms = self.get_playtime()
position = self.get_position()
if not position:
position = 0
new_position = max(0, position + ((position * ms) / elapsed_ms))
self.set_position(new_position)
# Adjus start time so elapsed time calculations are correct
if new_position == 0:
self.start_dt = dt.datetime.now()
else:
self.start_dt -= dt.timedelta(milliseconds=ms)
def fade(self, fade_seconds: int) -> None:
"""
Fade the currently playing track.
The actual management of fading runs in its own thread so as not
to hold up the UI during the fade.
"""
log.info(f"Music[{self.name}].stop()")
if not self.player:
return
if not self.player.get_position() > 0 and self.player.is_playing():
return
if fade_seconds <= 0:
self._stop()
return
# Take a copy of current player to allow another track to be
# started without interfering here
with lock:
p = self.player
self.player = None
pool = QThreadPool.globalInstance()
fader = FadeTrack(p, fade_seconds=fade_seconds)
pool.start(fader)
self.start_dt = None
def get_playtime(self) -> int:
"""
Return number of milliseconds current track has been playing or
zero if not playing. The vlc function get_time() only updates 3-4
times a second; this function has much better resolution.
"""
if self.start_dt is None:
return 0
now = dt.datetime.now()
elapsed_seconds = (now - self.start_dt).total_seconds()
return int(elapsed_seconds * 1000)
def get_position(self) -> Optional[float]:
"""Return current position"""
if not self.player:
return None
return self.player.get_position()
def is_playing(self) -> bool:
"""
Return True if we're playing
"""
if not self.player:
return False
# There is a discrete time between starting playing a track and
# player.is_playing() returning True, so assume playing if less
# than Config.PLAY_SETTLE microseconds have passed since
# starting play.
return (
self.player is not None
and self.start_dt is not None
and (
self.player.is_playing()
or (dt.datetime.now() - self.start_dt)
< dt.timedelta(microseconds=Config.PLAY_SETTLE)
)
)
def move_back(self, ms: int) -> None:
"""
Rewind player by ms milliseconds
"""
self._adjust_by_ms(ms * -1)
def move_forward(self, ms: int) -> None:
"""
Rewind player by ms milliseconds
"""
self._adjust_by_ms(ms)
def play(self, path: str, position: Optional[float]) -> None:
"""
Start playing the track at path.
Log and return if path not found.
"""
log.info(f"Music[{self.name}].play({path=}, {position=}")
if file_is_unreadable(path):
log.error(f"play({path}): path not readable")
return None
media = self.VLC.media_new_path(path)
self.player = media.player_new_from_media()
if self.player:
_ = self.player.play()
self.set_volume(self.max_volume)
if position:
self.player.set_position(position)
self.start_dt = dt.datetime.now()
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
# It has been known for the volume to need correcting more
# than once in the first 200mS.
for _ in range(3):
if self.player:
volume = self.player.audio_get_volume()
if volume < Config.VLC_VOLUME_DEFAULT:
self.set_volume(Config.VLC_VOLUME_DEFAULT)
log.error(f"Reset from {volume=}")
sleep(0.1)
def set_position(self, position: int) -> None:
"""
Set player position
"""
if self.player:
self.player.set_position(position)
def set_volume(self, volume=None, set_default=True) -> None:
"""Set maximum volume used for player"""
if not self.player:
return
if set_default:
self.max_volume = volume
if volume is None:
volume = Config.VLC_VOLUME_DEFAULT
self.player.audio_set_volume(volume)
# Ensure volume correct
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
for _ in range(3):
current_volume = self.player.audio_get_volume()
if current_volume < volume:
self.player.audio_set_volume(volume)
log.debug(f"Reset from {volume=}")
sleep(0.1)
def _stop(self) -> None:
"""Immediately stop playing"""
log.info(f"Music[{self.name}].stop()")
self.start_dt = None
if not self.player:
return
p = self.player
self.player = None
self.start_dt = None
with lock:
p.stop()
p.release()
p = None

523
app/trackmanager.py Normal file
View File

@ -0,0 +1,523 @@
# Standard library imports
from __future__ import annotations
import datetime as dt
import threading
from time import sleep
from typing import Optional
# Third party imports
import numpy as np
import pyqtgraph as pg # type: ignore
import vlc # type: ignore
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QObject,
QRunnable,
QThread,
QThreadPool,
)
# App imports
from config import Config
from log import log
from models import db, PlaylistRows, Tracks
from helpers import (
file_is_unreadable,
get_audio_segment,
)
lock = threading.Lock()
class _AddFadeCurve(QObject):
"""
Initialising a fade curve introduces a noticeable delay so carry out in
a thread.
"""
finished = pyqtSignal()
def __init__(
self,
track_player: _TrackManager,
track_path: str,
track_fade_at: int,
track_silence_at: int,
):
super().__init__()
self.track_player = track_player
self.track_path = track_path
self.track_fade_at = track_fade_at
self.track_silence_at = track_silence_at
def run(self):
"""
Create fade curve and add to PlaylistTrack object
"""
fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
if not fc:
log.error(f"Failed to create FadeCurve for {self.track_path=}")
else:
self.track_player.fade_graph = fc
self.finished.emit()
class _FadeCurve:
GraphWidget = None
def __init__(
self, track_path: str, track_fade_at: int, track_silence_at: int
) -> None:
"""
Set up fade graph array
"""
audio = get_audio_segment(track_path)
if not audio:
log.error(f"FadeCurve: could not get audio for {track_path=}")
return None
# Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
# milliseconds before fade starts to silence
self.start_ms = max(0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1)
self.end_ms = track_silence_at
self.audio_segment = audio[self.start_ms : self.end_ms]
self.graph_array = np.array(self.audio_segment.get_array_of_samples())
# Calculate the factor to map milliseconds of track to array
self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms)
self.region = None
def clear(self) -> None:
"""Clear the current graph"""
if self.GraphWidget:
self.GraphWidget.clear()
def plot(self):
self.curve = self.GraphWidget.plot(self.graph_array)
self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
def tick(self, play_time) -> None:
"""Update volume fade curve"""
if not self.GraphWidget:
return
ms_of_graph = play_time - self.start_ms
if ms_of_graph < 0:
return
if self.region is None:
# Create the region now that we're into fade
self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
self.GraphWidget.addItem(self.region)
# Update region position
self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
class _FadeTrack(QRunnable):
def __init__(self, player: vlc.MediaPlayer, fade_seconds) -> None:
super().__init__()
self.player = player
self.fade_seconds = fade_seconds
def run(self) -> None:
"""
Implementation of fading the player
"""
if not self.player:
return
# Reduce volume logarithmically
total_steps = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND
db_reduction_per_step = Config.FADEOUT_DB / total_steps
reduction_factor_per_step = pow(10, (db_reduction_per_step / 20))
volume = self.player.audio_get_volume()
for i in range(1, total_steps + 1):
self.player.audio_set_volume(
int(volume * pow(reduction_factor_per_step, i))
)
sleep(1 / Config.FADEOUT_STEPS_PER_SECOND)
self.player.stop()
log.debug(f"Releasing player {self.player=}")
self.player.release()
class _Music:
"""
Manage the playing of music tracks
"""
def __init__(self, name) -> None:
self.VLC = vlc.Instance()
self.VLC.set_user_agent(name, name)
self.player = None
self.name = name
self.max_volume = Config.VLC_VOLUME_DEFAULT
self.start_dt: Optional[dt.datetime] = None
def _adjust_by_ms(self, ms: int) -> None:
"""Move player position by ms milliseconds"""
if not self.player:
return
elapsed_ms = self.get_playtime()
position = self.get_position()
if not position:
position = 0
new_position = max(0, position + ((position * ms) / elapsed_ms))
self.set_position(new_position)
# Adjus start time so elapsed time calculations are correct
if new_position == 0:
self.start_dt = dt.datetime.now()
else:
self.start_dt -= dt.timedelta(milliseconds=ms)
def fade(self, fade_seconds: int) -> None:
"""
Fade the currently playing track.
The actual management of fading runs in its own thread so as not
to hold up the UI during the fade.
"""
log.info(f"Music[{self.name}].stop()")
if not self.player:
return
if not self.player.get_position() > 0 and self.player.is_playing():
return
if fade_seconds <= 0:
self._stop()
return
# Take a copy of current player to allow another track to be
# started without interfering here
with lock:
p = self.player
self.player = None
pool = QThreadPool.globalInstance()
fader = _FadeTrack(p, fade_seconds=fade_seconds)
pool.start(fader)
self.start_dt = None
def get_playtime(self) -> int:
"""
Return number of milliseconds current track has been playing or
zero if not playing. The vlc function get_time() only updates 3-4
times a second; this function has much better resolution.
"""
if self.start_dt is None:
return 0
now = dt.datetime.now()
elapsed_seconds = (now - self.start_dt).total_seconds()
return int(elapsed_seconds * 1000)
def get_position(self) -> Optional[float]:
"""Return current position"""
if not self.player:
return None
return self.player.get_position()
def is_playing(self) -> bool:
"""
Return True if we're playing
"""
if not self.player:
return False
# There is a discrete time between starting playing a track and
# player.is_playing() returning True, so assume playing if less
# than Config.PLAY_SETTLE microseconds have passed since
# starting play.
return (
self.start_dt is not None
and (
self.player.is_playing()
or (dt.datetime.now() - self.start_dt)
< dt.timedelta(microseconds=Config.PLAY_SETTLE)
)
)
def move_back(self, ms: int) -> None:
"""
Rewind player by ms milliseconds
"""
self._adjust_by_ms(ms * -1)
def move_forward(self, ms: int) -> None:
"""
Rewind player by ms milliseconds
"""
self._adjust_by_ms(ms)
def play(self, path: str, position: Optional[float]) -> None:
"""
Start playing the track at path.
Log and return if path not found.
"""
log.info(f"Music[{self.name}].play({path=}, {position=}")
if file_is_unreadable(path):
log.error(f"play({path}): path not readable")
return None
media = self.VLC.media_new_path(path)
self.player = media.player_new_from_media()
if self.player:
_ = self.player.play()
self.set_volume(self.max_volume)
if position:
self.player.set_position(position)
self.start_dt = dt.datetime.now()
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
# It has been known for the volume to need correcting more
# than once in the first 200mS.
for _ in range(3):
if self.player:
volume = self.player.audio_get_volume()
if volume < Config.VLC_VOLUME_DEFAULT:
self.set_volume(Config.VLC_VOLUME_DEFAULT)
log.error(f"Reset from {volume=}")
sleep(0.1)
def set_position(self, position: int) -> None:
"""
Set player position
"""
if self.player:
self.player.set_position(position)
def set_volume(self, volume=None, set_default=True) -> None:
"""Set maximum volume used for player"""
if not self.player:
return
if set_default:
self.max_volume = volume
if volume is None:
volume = Config.VLC_VOLUME_DEFAULT
self.player.audio_set_volume(volume)
# Ensure volume correct
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
for _ in range(3):
current_volume = self.player.audio_get_volume()
if current_volume < volume:
self.player.audio_set_volume(volume)
log.debug(f"Reset from {volume=}")
sleep(0.1)
def _stop(self) -> None:
"""Immediately stop playing"""
log.info(f"Music[{self.name}].stop()")
self.start_dt = None
if not self.player:
return
p = self.player
self.player = None
self.start_dt = None
with lock:
p.stop()
p.release()
p = None
class _TrackManager:
"""
Object to manage active playlist tracks,
typically the previous, current and next track.
"""
def __init__(self, session: db.Session, player_name: str, track_id: int) -> None:
"""
Initialises data structure.
Define a player.
Raise ValueError if no track in passed plr.
"""
track = session.get(Tracks, track_id)
if not track:
raise ValueError(f"_TrackPlayer: unable to retreived {track_id=}")
self.player_name = player_name
self.artist = track.artist
self.bitrate = track.bitrate
self.duration = track.duration
self.fade_at = track.fade_at
self.intro = track.intro
self.path = track.path
self.silence_at = track.silence_at
self.start_gap = track.start_gap
self.title = track.title
self.track_id = track.id
self.end_time: Optional[dt.datetime] = None
self.fade_graph: Optional[_FadeCurve] = None
self.fade_graph_start_updates: Optional[dt.datetime] = None
self.resume_marker: Optional[float]
self.start_time: Optional[dt.datetime] = None
self.player = _Music(name=player_name)
# Initialise player
self.track_player = MainTrackManager(session=session, track_id=self.track_id)
# Initialise and add FadeCurve in a thread as it's slow
self.fadecurve_thread = QThread()
self.worker = _AddFadeCurve(
self,
track_path=track.path,
track_fade_at=track.fade_at,
track_silence_at=track.silence_at,
)
self.worker.moveToThread(self.fadecurve_thread)
self.fadecurve_thread.started.connect(self.worker.run)
self.worker.finished.connect(self.fadecurve_thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
self.fadecurve_thread.start()
def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
"""Fade music"""
self.player.fade(fade_seconds)
@property
def is_playing(self) -> bool:
return self.track_player.is_playing()
def play(self, position: Optional[float] = None) -> None:
"""Play track"""
now = dt.datetime.now()
self.start_time = now
self.player.play(self.path, position)
self.end_time = self.start_time + dt.timedelta(milliseconds=self.duration)
# Calculate time fade_graph should start updating
if self.fade_at:
update_graph_at_ms = max(
0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.fade_graph_start_updates = now + dt.timedelta(
milliseconds=update_graph_at_ms
)
def stop_playing(self, fade_seconds: int = 0) -> None:
"""
Stop this track playing
"""
self.resume_marker = self.player.get_position()
self.player.fade(fade_seconds)
# Reset fade graph
if self.fade_graph:
self.fade_graph.clear()
def time_to_fade(self) -> int:
"""
Return milliseconds until fade time. Return zero if we're not playing.
"""
if not self.player.is_playing:
return 0
class MainTrackManager(_TrackManager):
"""
Manage playing tracks from the playlist with associated data
"""
def __init__(self, plr_id: int) -> None:
"""
Set up manager for playlist tracks
"""
with db.Session() as session:
# Ensure we have a track
plr = session.get(PlaylistRows, plr_id)
if not plr:
raise ValueError(f"PlaylistTrack: unable to retreive plr {plr_id=}")
self.track_id: int = plr.track_id
super().__init__(
session=session, player_name=Config.VLC_MAIN_PLAYER_NAME, track_id=self.track_id
)
# Save non-track plr info
self.plr_id: int = plr.id
self.playlist_id: int = plr.playlist_id
self.row_number: int = plr.plr_rownum
def __repr__(self) -> str:
return (
f"<MainTrackManager(plr_id={self.plr_id}, playlist_id={self.playlist_id}, "
f"row_number={self.row_number}>"
)
class PreviewTrackManager(_TrackManager):
"""
Manage previewing tracks
"""
def __init__(self, track_id: int) -> None:
super().__init__(
player_name=Config.VLC_PREVIEW_PLAYER_NAME,
track_id=track_id,
)
def __repr__(self) -> str:
return f"<PreviewTrackManager(track_id={self.track_id}>"
class TrackSequence:
next: Optional[MainTrackManager] = None
current: Optional[MainTrackManager] = None
previous: Optional[MainTrackManager] = None
track_sequence = TrackSequence()