musicmuster/app/playlistrow.py
2025-04-19 12:25:29 +01:00

584 lines
17 KiB
Python

# Standard library imports
import datetime as dt
from typing import Any
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QObject,
QThread,
)
# Third party imports
from pyqtgraph import PlotWidget # type: ignore
from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore
from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore
import numpy as np
import pyqtgraph as pg # type: ignore
# App imports
from classes import ApplicationError, MusicMusterSignals, PlaylistRowDTO, singleton
from config import Config
from log import log
from music_manager import Music
import ds
import helpers
class PlaylistRow:
    """
    Object to manage playlist row and track.

    Wraps a PlaylistRowDTO (row data plus an optional track) and adds
    runtime playback state (start/end times, fade graph, colours). Track
    and row fields are exposed as properties; the setters defined here
    also write the change through to the data store (``ds``), except
    ``row_number`` which is deliberately local-only.
    """

    def __init__(self, dto: PlaylistRowDTO) -> None:
        """
        The dto object will include row information plus a Tracks object
        if this row has a track.
        """

        self.dto = dto
        self.music = Music(name=Config.VLC_MAIN_PLAYER_NAME)
        self.signals = MusicMusterSignals()
        self.end_of_track_signalled: bool = False
        self.end_time: dt.datetime | None = None
        self.fade_graph: Any | None = None
        self.fade_graph_start_updates: dt.datetime | None = None
        self.forecast_end_time: dt.datetime | None = None
        self.forecast_start_time: dt.datetime | None = None
        self.note_bg: str | None = None
        self.note_fg: str | None = None
        self.resume_marker: float = 0.0
        self.row_bg: str | None = None
        self.row_fg: str | None = None
        self.start_time: dt.datetime | None = None

    def __repr__(self) -> str:
        track_id = None
        if self.dto.track:
            track_id = self.dto.track.track_id
        return (
            f"<PlaylistRow(playlist_id={self.dto.playlist_id}, "
            f"row_number={self.dto.row_number}, "
            f"playlistrow_id={self.dto.playlistrow_id}, "
            f"note={self.dto.note}, track_id={track_id})>"
        )

    # Expose TrackDTO fields as properties. Getters return a neutral
    # default ("" / 0 / None) when the row has no track.

    @property
    def artist(self) -> str:
        """Track artist, or "" if this row has no track."""
        if self.dto.track:
            return self.dto.track.artist
        else:
            return ""

    @artist.setter
    def artist(self, artist: str) -> None:
        """Set the artist locally and in the data store.

        Raises ApplicationError if this row has no track.
        """
        if not self.dto.track:
            raise ApplicationError(f"No track_id when trying to set artist ({self})")

        self.dto.track.artist = artist
        ds.track_update(self.track_id, dict(artist=str(artist)))

    @property
    def bitrate(self) -> int:
        """Track bitrate, or 0 if this row has no track."""
        if self.dto.track:
            return self.dto.track.bitrate
        else:
            return 0

    @property
    def duration(self) -> int:
        """Track duration (used as milliseconds in this class), or 0."""
        if self.dto.track:
            return self.dto.track.duration
        else:
            return 0

    @property
    def fade_at(self) -> int:
        """Fade point of the track (used as milliseconds here), or 0."""
        if self.dto.track:
            return self.dto.track.fade_at
        else:
            return 0

    @property
    def intro(self) -> int:
        """Intro length of the track (used as milliseconds here), or 0."""
        if self.dto.track:
            return self.dto.track.intro
        else:
            return 0

    @intro.setter
    def intro(self, intro: int) -> None:
        """Set the intro length locally and in the data store.

        Raises ApplicationError if this row has no track.
        """
        if not self.dto.track:
            raise ApplicationError(f"No track_id when trying to set intro ({self})")

        self.dto.track.intro = intro
        # NOTE(review): the value is sent to the data store as a string
        # although the property is typed int - confirm ds.track_update
        # expects/coerces this.
        ds.track_update(self.track_id, dict(intro=str(intro)))

    @property
    def lastplayed(self) -> dt.datetime | None:
        """When the track was last played, or None if no track."""
        if self.dto.track:
            return self.dto.track.lastplayed
        else:
            return None

    @property
    def path(self) -> str:
        """Path of the track, or "" if this row has no track."""
        if self.dto.track:
            return self.dto.track.path
        else:
            return ""

    @property
    def silence_at(self) -> int:
        """Silence point of the track (used as milliseconds here), or 0."""
        if self.dto.track:
            return self.dto.track.silence_at
        else:
            return 0

    @property
    def start_gap(self) -> int:
        """Start gap of the track, or 0 if this row has no track."""
        if self.dto.track:
            return self.dto.track.start_gap
        else:
            return 0

    @property
    def title(self) -> str:
        """Track title, or "" if this row has no track."""
        if self.dto.track:
            return self.dto.track.title
        else:
            return ""

    @title.setter
    def title(self, title: str) -> None:
        """Set the title locally and in the data store.

        Raises ApplicationError if this row has no track.
        """
        if not self.dto.track:
            raise ApplicationError(f"No track_id when trying to set title ({self})")

        self.dto.track.title = title
        ds.track_update(self.track_id, dict(title=str(title)))

    @property
    def track_id(self) -> int:
        """Id of the track on this row, or 0 if there is none."""
        if self.dto.track:
            return self.dto.track.track_id
        else:
            return 0

    @track_id.setter
    def track_id(self, track_id: int) -> None:
        """
        Adding a track_id should only happen to a header row.

        Raises ApplicationError if this row already has a track.
        """

        if self.track_id > 0:
            raise ApplicationError(
                f"Attempting to add track to row with existing track ({self=})"
            )

        ds.track_add_to_header(playlistrow_id=self.playlistrow_id, track_id=track_id)

        # Need to update with track information. All track properties
        # above read from self.dto.track, so attaching the fetched track
        # there is what makes them reflect the new track.
        # NOTE(review): assumes ds.track_by_id returns the same track DTO
        # type that PlaylistRowDTO.track holds - confirm.
        track = ds.track_by_id(track_id)
        if track:
            self.dto.track = track

    # Expose PlaylistRowDTO fields as properties

    @property
    def note(self) -> str:
        """Row note text."""
        return self.dto.note

    @note.setter
    def note(self, note: str) -> None:
        """Set the note locally and in the data store."""
        self.dto.note = note
        ds.playlistrow_update_note(self.playlistrow_id, str(note))

    @property
    def played(self) -> bool:
        """Whether this row is marked as played."""
        return self.dto.played

    @played.setter
    def played(self, value: bool) -> None:
        """Set the played flag locally and in the data store."""
        # Store the value actually passed so the local copy agrees with
        # what is written to the data store (allows un-marking a row).
        self.dto.played = value
        ds.playlistrow_played(self.playlistrow_id, value)

    @property
    def playlist_id(self) -> int:
        """Id of the playlist this row belongs to."""
        return self.dto.playlist_id

    @property
    def playlistrow_id(self) -> int:
        """Id of this playlist row."""
        return self.dto.playlistrow_id

    @property
    def row_number(self) -> int:
        """Row number of this row."""
        return self.dto.row_number

    @row_number.setter
    def row_number(self, value: int) -> None:
        # This does not update the database. The only times the row
        # number changes are 1) in ds._playlist_check_playlist and
        # ds.playlist_move_rows, and in both those places ds saves
        # the change to the database.
        self.dto.row_number = value

    def drop3db(self, enable: bool) -> None:
        """
        If enable is true, drop output by 3db else restore to full volume
        """

        if enable:
            self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False)
        else:
            self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False)

    def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
        """
        Fade music.

        Records the current player position in resume_marker, starts the
        fade, emits track_ended_signal with this row's playlist id and
        flags that end of track has been signalled.
        """

        self.resume_marker = self.music.get_position()
        self.music.fade(fade_seconds)
        self.signals.track_ended_signal.emit(self.playlist_id)
        self.end_of_track_signalled = True

    def is_playing(self) -> bool:
        """
        Return True if we're currently playing else False
        """

        # A row that has never been started cannot be playing
        if self.start_time is None:
            return False

        return self.music.is_playing()

    def play(self, position: float | None = None) -> None:
        """
        Play track.

        Records the start time, starts the player (optionally at the
        passed position), and computes the expected end time and the time
        at which fade graph updates should begin.
        """

        now = dt.datetime.now()
        self.start_time = now

        # Initialise player
        self.music.play(
            path=self.path,
            start_time=now,
            playlist_id=self.playlist_id,
            position=position,
        )

        self.end_time = now + dt.timedelta(milliseconds=self.duration)

        # Calculate time fade_graph should start updating
        if self.fade_at:
            update_graph_at_ms = max(
                0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
            )
            self.fade_graph_start_updates = now + dt.timedelta(
                milliseconds=update_graph_at_ms
            )

    def set_forecast_start_time(
        self, modified_rows: list[int], start: dt.datetime | None
    ) -> dt.datetime | None:
        """
        Set forecast start time for this row

        Update passed modified rows list if we changed the row.

        Return new start time (this row's forecast end time, i.e. start
        plus duration), or None if start is None.
        """

        changed = False

        if self.forecast_start_time != start:
            self.forecast_start_time = start
            changed = True

        if start is None:
            # No start time means no end time either
            if self.forecast_end_time is not None:
                self.forecast_end_time = None
                changed = True
            new_start_time = None
        else:
            end_time = start + dt.timedelta(milliseconds=self.duration)
            new_start_time = end_time
            if self.forecast_end_time != end_time:
                self.forecast_end_time = end_time
                changed = True

        # modified_rows is mutated in place; avoid duplicate entries
        if changed and self.row_number not in modified_rows:
            modified_rows.append(self.row_number)

        return new_start_time

    def stop(self, fade_seconds: int = 0) -> None:
        """
        Stop this track playing
        """

        self.resume_marker = self.music.get_position()
        self.fade(fade_seconds)

        # Reset fade graph
        if self.fade_graph:
            self.fade_graph.clear()

    def time_playing(self) -> int:
        """
        Return time track has been playing in milliseconds, zero if not playing
        """

        if self.start_time is None:
            return 0

        return self.music.get_playtime()

    def time_remaining_intro(self) -> int:
        """
        Return milliseconds of intro remaining. Return 0 if no intro time in track
        record or if intro has finished.
        """

        if not self.intro:
            return 0

        return max(0, self.intro - self.time_playing())

    def time_to_fade(self) -> int:
        """
        Return milliseconds until fade time. Return zero if we're not playing.
        """

        if self.start_time is None:
            return 0

        return self.fade_at - self.time_playing()

    def time_to_silence(self) -> int:
        """
        Return milliseconds until silent. Return zero if we're not playing.
        """

        if self.start_time is None:
            return 0

        return self.silence_at - self.time_playing()

    def update_fade_graph(self) -> None:
        """
        Update fade graph

        Does nothing unless the track is playing, a fade graph exists and
        the time at which graph updates should start has been reached.
        """

        if (
            not self.is_playing()
            or not self.fade_graph_start_updates
            or not self.fade_graph
        ):
            return

        now = dt.datetime.now()

        if self.fade_graph_start_updates > now:
            return

        self.fade_graph.tick(self.time_playing())
class _AddFadeCurve(QObject):
    """
    Worker that builds the FadeCurve for a playlist row.

    Creating a fade curve introduces a noticeable delay, so this object
    is meant to be run in a thread. The ``finished`` signal is emitted
    once ``run`` has completed, whether or not a curve was attached.
    """

    finished = pyqtSignal()

    def __init__(
        self,
        plr: PlaylistRow,
        track_path: str,
        track_fade_at: int,
        track_silence_at: int,
    ) -> None:
        """Remember the target row and the track details needed by run()."""
        super().__init__()
        self.track_silence_at = track_silence_at
        self.track_fade_at = track_fade_at
        self.track_path = track_path
        self.plr = plr

    def run(self) -> None:
        """
        Build the fade curve and store it on the PlaylistRow as fade_graph,
        or log an error if the curve evaluates as false.
        """

        curve = FadeCurve(
            track_path=self.track_path,
            track_fade_at=self.track_fade_at,
            track_silence_at=self.track_silence_at,
        )
        if curve:
            self.plr.fade_graph = curve
        else:
            log.error(f"Failed to create FadeCurve for {self.track_path=}")

        # Signal completion in both the success and failure case
        self.finished.emit()
class FadeCurve:
    """
    Sample data for the final part of a track, plotted as a fade graph.

    The curve covers the track from shortly before the fade point up to
    the silence point. An instance evaluates as False when no sample
    data could be loaded, so callers can test ``if not curve`` to detect
    a failed initialisation.
    """

    # Plot widget shared by all curves (class attribute). It is not
    # assigned anywhere in this class; every method that draws checks
    # that it has been set.
    GraphWidget: PlotWidget | None = None

    def __init__(
        self, track_path: str, track_fade_at: int, track_silence_at: int
    ) -> None:
        """
        Set up fade graph array

        track_fade_at and track_silence_at are positions in the track in
        milliseconds. On failure (no audio, or an empty fade window) an
        error is logged and the instance is left empty (falsy).
        """

        # Give every attribute a safe default before anything can fail,
        # so a partially initialised object never raises AttributeError
        # in plot() / tick().
        self.start_ms: int = 0
        self.end_ms: int = 0
        self.graph_array = np.array([])
        self.ms_to_array_factor: float = 0.0
        self.curve: PlotDataItem | None = None
        self.region: LinearRegionItem | None = None

        audio = helpers.get_audio_segment(track_path)
        if not audio:
            log.error(f"FadeCurve: could not get audio for {track_path=}")
            return

        # Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
        # milliseconds before fade starts to silence
        self.start_ms = max(0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1)
        self.end_ms = track_silence_at

        # An empty or inverted window has nothing to plot and would make
        # the scale factor below divide by zero (or go negative).
        span_ms = self.end_ms - self.start_ms
        if span_ms <= 0:
            log.error(f"FadeCurve: empty fade window for {track_path=}")
            return

        audio_segment = audio[self.start_ms : self.end_ms]
        self.graph_array = np.array(audio_segment.get_array_of_samples())

        # Calculate the factor to map milliseconds of track to array
        self.ms_to_array_factor = len(self.graph_array) / span_ms

    def __bool__(self) -> bool:
        """Return True only if sample data was loaded for this curve."""

        return self.graph_array.size > 0

    def clear(self) -> None:
        """Clear the current graph"""

        if self.GraphWidget:
            self.GraphWidget.clear()

    def plot(self) -> None:
        """Plot the sample array on the graph widget, if there is one."""

        if self.GraphWidget:
            self.curve = self.GraphWidget.plot(self.graph_array)
            if self.curve:
                self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
            else:
                log.debug("_FadeCurve.plot: no curve")
        else:
            log.debug("_FadeCurve.plot: no GraphWidget")

    def tick(self, play_time: int) -> None:
        """
        Update volume fade curve

        play_time is the time the track has been playing in milliseconds.
        The highlighted region is grown from the start of the graph to
        the point corresponding to play_time.
        """

        if not self.GraphWidget:
            return

        # Nothing to show until playback has reached the start of the curve
        ms_of_graph = play_time - self.start_ms
        if ms_of_graph < 0:
            return

        if self.region is None:
            # Create the region now that we're into fade
            self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
            self.GraphWidget.addItem(self.region)

        # Update region position
        self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
@singleton
class TrackSequence:
    """
    Maintain a list of which track (if any) is next, current and
    previous. A track can only be previous after being current, and can
    only be current after being next. If one of the tracks listed here
    moves, the row_number and/or playlist_id will change.
    """

    def __init__(self) -> None:
        """
        Set up storage for the three monitored tracks
        """

        self.next: PlaylistRow | None = None
        self.current: PlaylistRow | None = None
        self.previous: PlaylistRow | None = None

    def set_next(self, plr: PlaylistRow | None) -> None:
        """
        Set the 'next' track to be passed PlaylistRow. Clear any previous
        next track. If passed PlaylistRow is None just clear existing
        next track.
        """

        # Clear any existing fade graph
        if self.next and self.next.fade_graph:
            self.next.fade_graph.clear()

        self.next = plr
        if plr is not None:
            self.create_fade_graph()

    def move_next_to_current(self) -> None:
        """
        Make the next track the current track
        """

        self.current = self.next
        self.next = None

    def move_current_to_previous(self) -> None:
        """
        Make the current track the previous track

        Raises ApplicationError if there is no current track.
        """

        if self.current is None:
            raise ApplicationError("Tried to move non-existent track from current to previous")

        # Dereference the fade curve so it can be garbage collected
        self.current.fade_graph = None
        self.previous = self.current
        self.current = None

    def move_previous_to_next(self) -> None:
        """
        Make the previous track the next track
        """

        self.next = self.previous
        self.previous = None

    def create_fade_graph(self) -> None:
        """
        Initialise and add FadeCurve in a thread as it's slow

        Raises ApplicationError if there is no next track.
        """

        # Validate before allocating the thread so that the error path
        # does not leave an unused QThread behind.
        if self.next is None:
            raise ApplicationError("hell in a handcart")

        # NOTE(review): only the most recent thread/worker pair is
        # referenced from self; if this is called again while an earlier
        # thread is still running, the earlier pair loses its Python
        # reference - confirm that is safe.
        self.fadecurve_thread = QThread()
        self.worker = _AddFadeCurve(
            self.next,
            track_path=self.next.path,
            track_fade_at=self.next.fade_at,
            track_silence_at=self.next.silence_at,
        )
        self.worker.moveToThread(self.fadecurve_thread)

        # Run the worker when the thread starts; when the worker is done,
        # stop the thread and schedule both objects for deletion.
        self.fadecurve_thread.started.connect(self.worker.run)
        self.worker.finished.connect(self.fadecurve_thread.quit)
        self.worker.finished.connect(self.worker.deleteLater)
        self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
        self.fadecurve_thread.start()

    def update(self) -> None:
        """
        If a PlaylistRow is edited (moved, title changed, etc), the
        playlistrow_id won't change. We can retrieve the PlaylistRow
        using the playlistrow_id and update the stored PlaylistRow.

        Raises ApplicationError if a tracked row can no longer be
        retrieved from the data store.
        """

        for ts in (self.next, self.current, self.previous):
            if not ts:
                continue

            playlist_row_dto = ds.playlistrow_by_id(ts.playlistrow_id)
            if not playlist_row_dto:
                raise ApplicationError(f"Can't retrieve PlaylistRows entry ({ts=})")

            # Refresh the stored row's data in place. Rebinding the loop
            # variable would change nothing, and replacing the whole
            # PlaylistRow would discard runtime state such as start_time
            # and fade_graph.
            ts.dto = playlist_row_dto