534 lines
15 KiB
Python
534 lines
15 KiB
Python
# Standard library imports
|
|
import datetime as dt
|
|
from typing import Any
|
|
|
|
# PyQt imports
|
|
from PyQt6.QtCore import (
|
|
pyqtSignal,
|
|
QObject,
|
|
QThread,
|
|
)
|
|
|
|
# Third party imports
|
|
from pyqtgraph import PlotWidget # type: ignore
|
|
from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore
|
|
from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore
|
|
import numpy as np
|
|
import pyqtgraph as pg # type: ignore
|
|
|
|
# App imports
|
|
from classes import ApplicationError, MusicMusterSignals, PlaylistRowDTO, singleton
|
|
from config import Config
|
|
import helpers
|
|
from log import log
|
|
from music_manager import Music
|
|
import repository
|
|
|
|
|
|
class PlaylistRow:
    """
    Object to manage playlist row and track.

    Wraps a PlaylistRowDTO and exposes its fields as properties. Also
    holds the Music player plus the timing and display state that is
    used while this row's track is playing.
    """

    def __init__(self, dto: PlaylistRowDTO) -> None:
        """
        Store the passed dto and initialise playback state.

        The dto carries the track fields (artist, title, path, ...) as
        well as the playlist row fields (note, played, row_number, ...).
        """

        self.dto = dto
        self.music = Music(name=Config.VLC_MAIN_PLAYER_NAME)
        self.signals = MusicMusterSignals()
        # Set once track_ended_signal has been emitted by
        # check_for_end_of_track() so that it is only emitted once.
        # NOTE(review): nothing in this class resets this flag, so a row
        # that is played a second time will not signal its end again -
        # confirm whether that is intended.
        self.end_of_track_signalled: bool = False
        self.end_time: dt.datetime | None = None
        # Set asynchronously by _AddFadeCurve once the curve is built
        self.fade_graph: Any | None = None
        self.fade_graph_start_updates: dt.datetime | None = None
        self.forecast_end_time: dt.datetime | None = None
        self.forecast_start_time: dt.datetime | None = None
        self.note_bg: str | None = None
        self.note_fg: str | None = None
        self.resume_marker: float = 0.0
        self.row_bg: str | None = None
        self.row_fg: str | None = None
        # start_time is None whenever the track is not playing
        self.start_time: dt.datetime | None = None

    def __repr__(self) -> str:
        return (
            f"<PlaylistRow(playlist_id={self.dto.playlist_id}, "
            f"row_number={self.dto.row_number}, "
            f"playlistrow_id={self.dto.playlistrow_id}, "
            f"note={self.dto.note}, track_id={self.dto.track_id}>"
        )

    # Expose TrackDTO fields as properties
    @property
    def artist(self):
        return self.dto.artist

    @property
    def bitrate(self):
        return self.dto.bitrate

    @property
    def duration(self):
        return self.dto.duration

    @property
    def fade_at(self):
        return self.dto.fade_at

    @property
    def intro(self):
        return self.dto.intro

    @property
    def lastplayed(self):
        return self.dto.lastplayed

    @property
    def path(self):
        return self.dto.path

    @property
    def silence_at(self):
        return self.dto.silence_at

    @property
    def start_gap(self):
        return self.dto.start_gap

    @property
    def title(self):
        return self.dto.title

    @property
    def track_id(self):
        return self.dto.track_id

    @track_id.setter
    def track_id(self, value: int) -> None:
        """
        Adding a track_id should only happen to a header row.

        Raises ApplicationError if this row already has a track.
        """

        if self.track_id:
            raise ApplicationError(
                f"Attempting to add track to row with existing track ({self=})"
            )

        # TODO: set up write access to track_id. Should only update if
        # track_id == 0. Need to update all other track fields at the
        # same time.
        print(f"set track_id attribute for {self=}, {value=}")

    # Expose PlaylistRowDTO fields as properties
    @property
    def note(self):
        return self.dto.note

    @note.setter
    def note(self, value: str) -> None:
        # TODO set up write access to db
        print(f"set note attribute for {self=}, {value=}")
        # self.dto.note = value

    @property
    def played(self):
        return self.dto.played

    @played.setter
    def played(self, value: bool = True) -> None:
        # TODO set up write access to db
        print(f"set played attribute for {self=}")
        # self.dto.played = value

    @property
    def playlist_id(self):
        return self.dto.playlist_id

    @property
    def playlistrow_id(self):
        return self.dto.playlistrow_id

    @property
    def row_number(self):
        return self.dto.row_number

    @row_number.setter
    def row_number(self, value: int) -> None:
        # TODO do we need to set up write access to db?
        self.dto.row_number = value

    def check_for_end_of_track(self) -> None:
        """
        Check whether track has ended. If so, emit track_ended_signal

        Does nothing if the track was never started, if the end has
        already been signalled, or if the player is still playing.
        """

        if self.start_time is None:
            return

        if self.end_of_track_signalled:
            return

        if self.music.is_playing():
            return

        self.start_time = None
        if self.fade_graph:
            self.fade_graph.clear()
        # Ensure that player is released
        self.music.fade(0)
        self.signals.track_ended_signal.emit()
        self.end_of_track_signalled = True

    def drop3db(self, enable: bool) -> None:
        """
        If enable is true, drop output by 3db else restore to full volume
        """

        if enable:
            self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False)
        else:
            self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False)

    def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
        """
        Fade music

        Records the current player position in resume_marker, fades the
        player over fade_seconds and emits track_ended_signal.
        """

        self.resume_marker = self.music.get_position()
        self.music.fade(fade_seconds)
        self.signals.track_ended_signal.emit()

    def is_playing(self) -> bool:
        """
        Return True if we're currently playing else False
        """

        if self.start_time is None:
            return False

        return self.music.is_playing()

    def play(self, position: float | None = None) -> None:
        """
        Play track

        Starts the player on this row's path (optionally at the passed
        position), records start and expected end times, and works out
        when the fade graph should begin updating.
        """

        now = dt.datetime.now()
        self.start_time = now

        # Initialise player
        self.music.play(self.path, start_time=now, position=position)

        self.end_time = now + dt.timedelta(milliseconds=self.duration)

        # Calculate time fade_graph should start updating
        if self.fade_at:
            update_graph_at_ms = max(
                0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
            )
            self.fade_graph_start_updates = now + dt.timedelta(
                milliseconds=update_graph_at_ms
            )

    def set_forecast_start_time(
        self, modified_rows: list[int], start: dt.datetime | None
    ) -> dt.datetime | None:
        """
        Set forecast start time for this row

        Update passed modified rows list if we changed the row.

        Return new start time (this row's forecast end time, i.e. start
        plus duration), or None if start is None.
        """

        changed = False

        if self.forecast_start_time != start:
            self.forecast_start_time = start
            changed = True

        if start is None:
            if self.forecast_end_time is not None:
                self.forecast_end_time = None
                changed = True
            new_start_time = None
        else:
            end_time = start + dt.timedelta(milliseconds=self.duration)
            new_start_time = end_time
            if self.forecast_end_time != end_time:
                self.forecast_end_time = end_time
                changed = True

        if changed and self.row_number not in modified_rows:
            modified_rows.append(self.row_number)

        return new_start_time

    def stop(self, fade_seconds: int = 0) -> None:
        """
        Stop this track playing
        """

        # fade() records resume_marker before fading, so there is no
        # need to record it separately here.
        self.fade(fade_seconds)

        # Reset fade graph
        if self.fade_graph:
            self.fade_graph.clear()

    def time_playing(self) -> int:
        """
        Return time track has been playing in milliseconds, zero if not playing
        """

        if self.start_time is None:
            return 0

        return self.music.get_playtime()

    def time_remaining_intro(self) -> int:
        """
        Return milliseconds of intro remaining. Return 0 if no intro time in track
        record or if intro has finished.
        """

        if not self.intro:
            return 0

        return max(0, self.intro - self.time_playing())

    def time_to_fade(self) -> int:
        """
        Return milliseconds until fade time. Return zero if we're not playing.

        The result is negative once the fade point has passed.
        """

        if self.start_time is None:
            return 0

        return self.fade_at - self.time_playing()

    def time_to_silence(self) -> int:
        """
        Return milliseconds until silent. Return zero if we're not playing.

        The result is negative once the silence point has passed.
        """

        if self.start_time is None:
            return 0

        return self.silence_at - self.time_playing()

    def update_fade_graph(self) -> None:
        """
        Update fade graph

        Only ticks the graph while the track is playing, a fade graph
        exists, and the time calculated in play() for starting updates
        has been reached.
        """

        if (
            not self.is_playing()
            or not self.fade_graph_start_updates
            or not self.fade_graph
        ):
            return

        now = dt.datetime.now()

        if self.fade_graph_start_updates > now:
            return

        self.fade_graph.tick(self.time_playing())
|
|
|
|
|
|
class _AddFadeCurve(QObject):
    """
    Initialising a fade curve introduces a noticeable delay so carry out in
    a thread.

    Emits `finished` when run() is done, whether or not a curve was
    attached to the PlaylistRow.
    """

    finished = pyqtSignal()

    def __init__(
        self,
        plr: PlaylistRow,
        track_path: str,
        track_fade_at: int,
        track_silence_at: int,
    ) -> None:
        """
        Store the PlaylistRow to update and the arguments for FadeCurve.
        """

        super().__init__()
        self.plr = plr
        self.track_path = track_path
        self.track_fade_at = track_fade_at
        self.track_silence_at = track_silence_at

    def run(self) -> None:
        """
        Create fade curve and add to PlaylistRow object
        """

        try:
            fc = FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
            # A constructor never yields a falsy object by default, and
            # FadeCurve.__init__ returns early when it cannot load the
            # audio, so also check that the sample array was populated
            # before attaching the curve to the row.
            if not fc or getattr(fc, "graph_array", None) is None:
                log.error(f"Failed to create FadeCurve for {self.track_path=}")
            else:
                self.plr.fade_graph = fc
        finally:
            # Always signal completion so that whoever is waiting on
            # `finished` is notified even if curve creation raised.
            self.finished.emit()
|
|
|
|
|
|
class FadeCurve:
    """
    Hold the audio samples for the fade section of a track and draw them
    on the shared GraphWidget.

    An instance is falsy if the audio could not be loaded, so callers
    may test `if fade_curve:` before using it.
    """

    # Plot widget shared by all FadeCurve instances; nothing is drawn
    # while this is None.
    GraphWidget: PlotWidget | None = None

    def __init__(
        self, track_path: str, track_fade_at: int, track_silence_at: int
    ) -> None:
        """
        Set up fade graph array

        If the audio for track_path can't be loaded an error is logged
        and the instance is left empty (graph_array is None).
        """

        # Initialise every attribute up front so that an instance whose
        # audio failed to load is still safe to call methods on.
        self.start_ms: int = 0
        self.end_ms: int = 0
        self.graph_array: np.ndarray | None = None
        self.ms_to_array_factor: float = 0.0
        self.curve: PlotDataItem | None = None
        self.region: LinearRegionItem | None = None

        audio = helpers.get_audio_segment(track_path)
        if not audio:
            log.error(f"FadeCurve: could not get audio for {track_path=}")
            return

        # Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
        # milliseconds before fade starts to silence
        self.start_ms = max(0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1)
        self.end_ms = track_silence_at
        audio_segment = audio[self.start_ms : self.end_ms]
        self.graph_array = np.array(audio_segment.get_array_of_samples())

        # Calculate the factor to map milliseconds of track to array.
        # Guard against a zero-length span, which would otherwise raise
        # ZeroDivisionError; the factor then stays at 0.0.
        span_ms = self.end_ms - self.start_ms
        if span_ms != 0:
            self.ms_to_array_factor = len(self.graph_array) / span_ms

    def __bool__(self) -> bool:
        """Return True only if the audio samples were loaded"""

        return self.graph_array is not None

    def clear(self) -> None:
        """Clear the current graph"""

        if self.GraphWidget:
            self.GraphWidget.clear()

    def plot(self) -> None:
        """Plot the sample array on the GraphWidget, if there is one"""

        if self.graph_array is None:
            log.debug("FadeCurve.plot: no graph data")
            return

        if self.GraphWidget:
            self.curve = self.GraphWidget.plot(self.graph_array)
            if self.curve:
                self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
            else:
                log.debug("_FadeCurve.plot: no curve")
        else:
            log.debug("_FadeCurve.plot: no GraphWidget")

    def tick(self, play_time: int) -> None:
        """
        Update volume fade curve

        play_time is the time the track has been playing, in
        milliseconds. Nothing happens until play_time reaches the start
        of the plotted section.
        """

        if not self.GraphWidget or self.graph_array is None:
            return

        ms_of_graph = play_time - self.start_ms
        if ms_of_graph < 0:
            return

        region = self.region
        if region is None:
            # Create the region now that we're into fade
            region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
            self.GraphWidget.addItem(region)
            self.region = region

        # Update region position
        region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
|
|
|
|
|
|
@singleton
class TrackSequence:
    """
    Maintain a list of which track (if any) is next, current and
    previous. A track can only be previous after being current, and can
    only be current after being next. If one of the tracks listed here
    moves, the row_number and/or playlist_id will change.
    """

    def __init__(self) -> None:
        """
        Set up storage for the three monitored tracks
        """

        self.next: PlaylistRow | None = None
        self.current: PlaylistRow | None = None
        self.previous: PlaylistRow | None = None

    def set_next(self, plr: PlaylistRow | None) -> None:
        """
        Set the 'next' track to be passed PlaylistRow. Clear any previous
        next track. If passed PlaylistRow is None just clear existing
        next track.
        """

        # Clear any existing fade graph
        if self.next and self.next.fade_graph:
            self.next.fade_graph.clear()

        self.next = plr
        if plr is not None:
            self.create_fade_graph()

    def move_next_to_current(self) -> None:
        """
        Make the next track the current track
        """

        self.current = self.next
        self.next = None

    def move_current_to_previous(self) -> None:
        """
        Make the current track the previous track

        Raises ApplicationError if there is no current track.
        """

        if self.current is None:
            raise ApplicationError("Tried to move non-existent track from current to previous")

        # Dereference the fade curve so it can be garbage collected
        self.current.fade_graph = None
        self.previous = self.current
        self.current = None

    def move_previous_to_next(self) -> None:
        """
        Make the previous track the next track
        """

        self.next = self.previous
        self.previous = None

    def create_fade_graph(self) -> None:
        """
        Initialise and add FadeCurve in a thread as it's slow

        Raises ApplicationError if there is no next track.
        """

        # Check before allocating the thread so that nothing is created
        # when there is no next track.
        if self.next is None:
            raise ApplicationError("hell in a handcart")

        # NOTE(review): the thread and worker are held only in these two
        # attributes, so a second call replaces the references to a
        # thread that may still be running - confirm this is safe.
        self.fadecurve_thread = QThread()
        self.worker = _AddFadeCurve(
            self.next,
            track_path=self.next.path,
            track_fade_at=self.next.fade_at,
            track_silence_at=self.next.silence_at,
        )
        self.worker.moveToThread(self.fadecurve_thread)
        self.fadecurve_thread.started.connect(self.worker.run)
        self.worker.finished.connect(self.fadecurve_thread.quit)
        self.worker.finished.connect(self.worker.deleteLater)
        self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
        self.fadecurve_thread.start()

    def update(self) -> None:
        """
        If a PlaylistRow is edited (moved, title changed, etc), the
        playlistrow_id won't change. We can retrieve the PlaylistRow
        using the playlistrow_id and update the stored PlaylistRow.

        Raises ApplicationError if a stored row can't be retrieved.
        """

        for ts in (self.next, self.current, self.previous):
            if not ts:
                continue
            playlist_row_dto = repository.get_playlist_row(ts.playlistrow_id)
            if not playlist_row_dto:
                raise ApplicationError(f"Can't retrieve PlaylistRows entry, {ts=}")
            # Refresh the stored row in place. Rebinding the loop
            # variable to a new PlaylistRow would leave self.next,
            # self.current and self.previous untouched, and replacing
            # the object would discard its player and timing state.
            ts.dto = playlist_row_dto
|