# Standard library imports import datetime as dt from typing import Any # PyQt imports from PyQt6.QtCore import ( pyqtSignal, QObject, QThread, ) # Third party imports from pyqtgraph import PlotWidget # type: ignore from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore import numpy as np import pyqtgraph as pg # type: ignore # App imports from classes import ApplicationError, MusicMusterSignals, PlaylistRowDTO, singleton from config import Config from log import log from music_manager import Music import ds import helpers class PlaylistRow: """ Object to manage playlist row and track. """ def __init__(self, dto: PlaylistRowDTO) -> None: """ The dto object will include row information plus a Tracks object if this row has a track. """ self.dto = dto self.music = Music(name=Config.VLC_MAIN_PLAYER_NAME) self.signals = MusicMusterSignals() self.end_of_track_signalled: bool = False self.end_time: dt.datetime | None = None self.fade_graph: Any | None = None self.fade_graph_start_updates: dt.datetime | None = None self.forecast_end_time: dt.datetime | None = None self.forecast_start_time: dt.datetime | None = None self.note_bg: str | None = None self.note_fg: str | None = None self.resume_marker: float = 0.0 self.row_bg: str | None = None self.row_fg: str | None = None self.start_time: dt.datetime | None = None def __repr__(self) -> str: track_id = None if self.dto.track: track_id = self.dto.track.track_id return ( f"" ) # Expose TrackDTO fields as properties @property def artist(self) -> str: if self.dto.track: return self.dto.track.artist else: return "" @artist.setter def artist(self, artist: str) -> None: if not self.dto.track: raise ApplicationError(f"No track_id when trying to set artist ({self})") self.dto.track.artist = artist ds.track_update(self.track_id, dict(artist=str(artist))) @property def bitrate(self) -> int: if self.dto.track: return self.dto.track.bitrate else: return 0 @property def duration(self) -> int: if self.dto.track: return self.dto.track.duration else: return 0 @property def fade_at(self) -> int: if self.dto.track: return self.dto.track.fade_at else: return 0 @property def intro(self) -> int: if self.dto.track: return self.dto.track.intro else: return 0 @intro.setter def intro(self, intro: int) -> None: if not self.dto.track: raise ApplicationError(f"No track_id when trying to set intro ({self})") self.dto.track.intro = intro ds.track_update(self.track_id, dict(intro=str(intro))) @property def lastplayed(self) -> dt.datetime | None: if self.dto.track: return self.dto.track.lastplayed else: return None @property def path(self) -> str: if self.dto.track: return self.dto.track.path else: return "" @property def silence_at(self) -> int: if self.dto.track: return self.dto.track.silence_at else: return 0 @property def start_gap(self) -> int: if self.dto.track: return self.dto.track.start_gap else: return 0 @property def title(self) -> str: if self.dto.track: return self.dto.track.title else: return "" @title.setter def title(self, title: str) -> None: if not self.dto.track: raise ApplicationError(f"No track_id when trying to set title ({self})") self.dto.track.title = title ds.track_update(self.track_id, dict(title=str(title))) @property def track_id(self) -> int: if self.dto.track: return self.dto.track.track_id else: return 0 @track_id.setter def track_id(self, track_id: int) -> None: """ Adding a track_id should only happen to a header row. """ if self.track_id > 0: raise ApplicationError("Attempting to add track to row with existing track ({self=}") ds.track_add_to_header(playlistrow_id=self.playlistrow_id, track_id=track_id) # Need to update with track information track = ds.track_by_id(track_id) if track: for attr, value in track.__dataclass_fields__.items(): setattr(self, attr, value) # Expose PlaylistRowDTO fields as properties @property def note(self) -> str: return self.dto.note @note.setter def note(self, note: str) -> None: self.dto.note = note ds.playlistrow_update_note(self.playlistrow_id, str(note)) @property def played(self) -> bool: return self.dto.played @played.setter def played(self, value: bool) -> None: self.dto.played = True ds.playlistrow_played(self.playlistrow_id, value) @property def playlist_id(self) -> int: return self.dto.playlist_id @property def playlistrow_id(self) -> int: return self.dto.playlistrow_id @property def row_number(self) -> int: return self.dto.row_number @row_number.setter def row_number(self, value: int) -> None: # This does not update the database. The only times the row # number changes are 1) in ds._playlist_check_playlist and # ds.playlist_move_rows, and in both those places ds saves # the change to the database. self.dto.row_number = value def check_for_end_of_track(self) -> None: """ Check whether track has ended. If so, emit track_ended_signal """ if self.start_time is None: return if self.end_of_track_signalled: return if self.music.is_playing(): return self.start_time = None if self.fade_graph: self.fade_graph.clear() # Ensure that player is released self.music.fade(0) self.signals.track_ended_signal.emit(self.playlist_id) self.end_of_track_signalled = True def drop3db(self, enable: bool) -> None: """ If enable is true, drop output by 3db else restore to full volume """ if enable: self.music.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False) else: self.music.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False) def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None: """Fade music""" self.resume_marker = self.music.get_position() self.music.fade(fade_seconds) self.signals.track_ended_signal.emit(self.playlist_id) self.end_of_track_signalled = True def is_playing(self) -> bool: """ Return True if we're currently playing else False """ if self.start_time is None: return False return self.music.is_playing() def play(self, position: float | None = None) -> None: """Play track""" now = dt.datetime.now() self.start_time = now # Initialise player self.music.play(self.path, start_time=now, position=position) self.end_time = now + dt.timedelta(milliseconds=self.duration) # Calculate time fade_graph should start updating if self.fade_at: update_graph_at_ms = max( 0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 ) self.fade_graph_start_updates = now + dt.timedelta( milliseconds=update_graph_at_ms ) def set_forecast_start_time( self, modified_rows: list[int], start: dt.datetime | None ) -> dt.datetime | None: """ Set forecast start time for this row Update passed modified rows list if we changed the row. Return new start time """ changed = False if self.forecast_start_time != start: self.forecast_start_time = start changed = True if start is None: if self.forecast_end_time is not None: self.forecast_end_time = None changed = True new_start_time = None else: end_time = start + dt.timedelta(milliseconds=self.duration) new_start_time = end_time if self.forecast_end_time != end_time: self.forecast_end_time = end_time changed = True if changed and self.row_number not in modified_rows: modified_rows.append(self.row_number) return new_start_time def stop(self, fade_seconds: int = 0) -> None: """ Stop this track playing """ self.resume_marker = self.music.get_position() self.fade(fade_seconds) # Reset fade graph if self.fade_graph: self.fade_graph.clear() def time_playing(self) -> int: """ Return time track has been playing in milliseconds, zero if not playing """ if self.start_time is None: return 0 return self.music.get_playtime() def time_remaining_intro(self) -> int: """ Return milliseconds of intro remaining. Return 0 if no intro time in track record or if intro has finished. """ if not self.intro: return 0 return max(0, self.intro - self.time_playing()) def time_to_fade(self) -> int: """ Return milliseconds until fade time. Return zero if we're not playing. """ if self.start_time is None: return 0 return self.fade_at - self.time_playing() def time_to_silence(self) -> int: """ Return milliseconds until silent. Return zero if we're not playing. """ if self.start_time is None: return 0 return self.silence_at - self.time_playing() def update_fade_graph(self) -> None: """ Update fade graph """ if ( not self.is_playing() or not self.fade_graph_start_updates or not self.fade_graph ): return now = dt.datetime.now() if self.fade_graph_start_updates > now: return self.fade_graph.tick(self.time_playing()) class _AddFadeCurve(QObject): """ Initialising a fade curve introduces a noticeable delay so carry out in a thread. """ finished = pyqtSignal() def __init__( self, plr: PlaylistRow, track_path: str, track_fade_at: int, track_silence_at: int, ) -> None: super().__init__() self.plr = plr self.track_path = track_path self.track_fade_at = track_fade_at self.track_silence_at = track_silence_at def run(self) -> None: """ Create fade curve and add to PlaylistTrack object """ fc = FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at) if not fc: log.error(f"Failed to create FadeCurve for {self.track_path=}") else: self.plr.fade_graph = fc self.finished.emit() class FadeCurve: GraphWidget: PlotWidget | None = None def __init__( self, track_path: str, track_fade_at: int, track_silence_at: int ) -> None: """ Set up fade graph array """ audio = helpers.get_audio_segment(track_path) if not audio: log.error(f"FadeCurve: could not get audio for {track_path=}") return None # Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE # milliseconds before fade starts to silence self.start_ms: int = max( 0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1 ) self.end_ms: int = track_silence_at audio_segment = audio[self.start_ms : self.end_ms] self.graph_array = np.array(audio_segment.get_array_of_samples()) # Calculate the factor to map milliseconds of track to array self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms) self.curve: PlotDataItem | None = None self.region: LinearRegionItem | None = None def clear(self) -> None: """Clear the current graph""" if self.GraphWidget: self.GraphWidget.clear() def plot(self) -> None: if self.GraphWidget: self.curve = self.GraphWidget.plot(self.graph_array) if self.curve: self.curve.setPen(Config.FADE_CURVE_FOREGROUND) else: log.debug("_FadeCurve.plot: no curve") else: log.debug("_FadeCurve.plot: no GraphWidget") def tick(self, play_time: int) -> None: """Update volume fade curve""" if not self.GraphWidget: return ms_of_graph = play_time - self.start_ms if ms_of_graph < 0: return if self.region is None: # Create the region now that we're into fade self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)]) self.GraphWidget.addItem(self.region) # Update region position if self.region: self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor]) @singleton class TrackSequence: """ Maintain a list of which track (if any) is next, current and previous. A track can only be previous after being current, and can only be current after being next. If one of the tracks listed here moves, the row_number and/or playlist_id will change. """ def __init__(self) -> None: """ Set up storage for the three monitored tracks """ self.next: PlaylistRow | None = None self.current: PlaylistRow | None = None self.previous: PlaylistRow | None = None def set_next(self, plr: PlaylistRow | None) -> None: """ Set the 'next' track to be passed PlaylistRow. Clear any previous next track. If passed PlaylistRow is None just clear existing next track. """ # Clear any existing fade graph if self.next and self.next.fade_graph: self.next.fade_graph.clear() if plr is None: self.next = None else: self.next = plr self.create_fade_graph() def move_next_to_current(self) -> None: """ Make the next track the current track """ self.current = self.next self.next = None def move_current_to_previous(self) -> None: """ Make the current track the previous track """ if self.current is None: raise ApplicationError("Tried to move non-existent track from current to previous") # Dereference the fade curve so it can be garbage collected self.current.fade_graph = None self.previous = self.current self.current = None def move_previous_to_next(self) -> None: """ Make the previous track the next track """ self.next = self.previous self.previous = None def create_fade_graph(self) -> None: """ Initialise and add FadeCurve in a thread as it's slow """ self.fadecurve_thread = QThread() if self.next is None: raise ApplicationError("hell in a handcart") self.worker = _AddFadeCurve( self.next, track_path=self.next.path, track_fade_at=self.next.fade_at, track_silence_at=self.next.silence_at, ) self.worker.moveToThread(self.fadecurve_thread) self.fadecurve_thread.started.connect(self.worker.run) self.worker.finished.connect(self.fadecurve_thread.quit) self.worker.finished.connect(self.worker.deleteLater) self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater) self.fadecurve_thread.start() def update(self) -> None: """ If a PlaylistRow is edited (moved, title changed, etc), the playlistrow_id won't change. We can retrieve the PlaylistRow using the playlistrow_id and update the stored PlaylistRow. """ for ts in [self.next, self.current, self.previous]: if not ts: continue playlist_row_dto = ds.playlistrow_by_id(ts.playlistrow_id) if not playlist_row_dto: raise ApplicationError(f"(Can't retrieve PlaylistRows entry, {self=}") ts = PlaylistRow(playlist_row_dto)