Compare commits

...

12 Commits

Author SHA1 Message Date
Keith Edmunds
bd7fb79610 Clear fade graph when clearing next track 2024-07-30 16:36:29 +01:00
Keith Edmunds
59b6b87186 Fixup typos in playlistmodel.py 2024-07-30 04:21:04 +01:00
Keith Edmunds
b15687a4c6 Clean up playlists.py 2024-07-30 04:12:35 +01:00
Keith Edmunds
076451ff89 Cleanup of playlistmodel.py 2024-07-29 21:49:17 +01:00
Keith Edmunds
d6f55c5987 Rewrite of track handling
Combine the old track_manager and playlist data structures into
RowAndTrack data structure.
2024-07-29 18:52:02 +01:00
Keith Edmunds
4a85d7ea84 Fix repr typo 2024-07-28 19:47:05 +01:00
Keith Edmunds
3c01fb63c3 Implement VLC logging 2024-07-28 19:45:55 +01:00
Keith Edmunds
b423ab0624 Log.debug production stackprinter messages 2024-07-26 18:10:53 +01:00
Keith Edmunds
051d8cf0ef Log releasing player and keep player count
Working on issue #251
2024-07-26 11:49:38 +01:00
Keith Edmunds
1513ad96d8 Fix track times bug
When update_track_times runs, it looks as track_sequence.current and
.next, but didn't check that those tracks referred to the current
playlist, which could cause a KeyError.

Fixes #252
2024-07-26 11:38:33 +01:00
Keith Edmunds
04c2c6377a Increase play debounce time 500ms → 1000ms 2024-07-26 11:20:15 +01:00
Keith Edmunds
9973f00055 Enhance debugging for failed fade graph creation 2024-07-26 11:18:29 +01:00
10 changed files with 1064 additions and 1050 deletions

View File

@ -1,15 +1,73 @@
# Standard library imports
from __future__ import annotations
import ctypes
from dataclasses import dataclass, field
import datetime as dt
from enum import auto, Enum
import platform
import threading
from time import sleep
from typing import Any, Optional, NamedTuple
# PyQt imports
from PyQt6.QtCore import pyqtSignal, QObject
# Third party imports
import numpy as np
import pyqtgraph as pg # type: ignore
import vlc # type: ignore
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QObject,
QRunnable,
QThread,
QThreadPool,
)
from pyqtgraph import PlotWidget
from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore
from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore
# App imports
import helpers
from config import Config
from log import log
from models import PlaylistRows
from helpers import (
file_is_unreadable,
get_audio_segment,
show_warning,
singleton,
)
lock = threading.Lock()
# Define the VLC callback function type
VLC_LOG_CB = ctypes.CFUNCTYPE(
None,
ctypes.c_void_p,
ctypes.c_int,
ctypes.c_void_p,
ctypes.c_char_p,
ctypes.c_void_p,
)
# Determine the correct C library for vsnprintf based on the platform
if platform.system() == "Windows":
libc = ctypes.CDLL("msvcrt")
elif platform.system() == "Linux":
libc = ctypes.CDLL("libc.so.6")
elif platform.system() == "Darwin": # macOS
libc = ctypes.CDLL("libc.dylib")
else:
raise OSError("Unsupported operating system")
# Define the vsnprintf function
libc.vsnprintf.argtypes = [
ctypes.c_char_p,
ctypes.c_size_t,
ctypes.c_char_p,
ctypes.c_void_p,
]
libc.vsnprintf.restype = ctypes.c_int
class Col(Enum):
@ -25,7 +83,381 @@ class Col(Enum):
NOTE = auto()
@helpers.singleton
class _AddFadeCurve(QObject):
"""
Initialising a fade curve introduces a noticeable delay so carry out in
a thread.
"""
finished = pyqtSignal()
def __init__(
self,
rat: RowAndTrack,
track_path: str,
track_fade_at: int,
track_silence_at: int,
) -> None:
super().__init__()
self.rat = rat
self.track_path = track_path
self.track_fade_at = track_fade_at
self.track_silence_at = track_silence_at
def run(self) -> None:
"""
Create fade curve and add to PlaylistTrack object
"""
fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
if not fc:
log.error(f"Failed to create FadeCurve for {self.track_path=}")
else:
self.rat.fade_graph = fc
self.finished.emit()
class _FadeCurve:
GraphWidget: Optional[PlotWidget] = None
def __init__(
self, track_path: str, track_fade_at: int, track_silence_at: int
) -> None:
"""
Set up fade graph array
"""
audio = get_audio_segment(track_path)
if not audio:
log.error(f"FadeCurve: could not get audio for {track_path=}")
return None
# Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
# milliseconds before fade starts to silence
self.start_ms: int = max(
0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.end_ms: int = track_silence_at
self.audio_segment = audio[self.start_ms : self.end_ms]
self.graph_array = np.array(self.audio_segment.get_array_of_samples())
# Calculate the factor to map milliseconds of track to array
self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms)
self.curve: Optional[PlotDataItem] = None
self.region: Optional[LinearRegionItem] = None
def clear(self) -> None:
"""Clear the current graph"""
if self.GraphWidget:
self.GraphWidget.clear()
def plot(self) -> None:
if self.GraphWidget:
self.curve = self.GraphWidget.plot(self.graph_array)
if self.curve:
self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
else:
log.debug("_FadeCurve.plot: no curve")
else:
log.debug("_FadeCurve.plot: no GraphWidget")
def tick(self, play_time: int) -> None:
"""Update volume fade curve"""
if not self.GraphWidget:
return
ms_of_graph = play_time - self.start_ms
if ms_of_graph < 0:
return
if self.region is None:
# Create the region now that we're into fade
log.debug("issue223: _FadeCurve: create region")
self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
self.GraphWidget.addItem(self.region)
# Update region position
if self.region:
log.debug("issue223: _FadeCurve: update region")
self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
class _FadeTrack(QRunnable):
def __init__(self, player: vlc.MediaPlayer, fade_seconds: int) -> None:
super().__init__()
self.player = player
self.fade_seconds = fade_seconds
def run(self) -> None:
"""
Implementation of fading the player
"""
if not self.player:
return
# Reduce volume logarithmically
total_steps = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND
db_reduction_per_step = Config.FADEOUT_DB / total_steps
reduction_factor_per_step = pow(10, (db_reduction_per_step / 20))
volume = self.player.audio_get_volume()
for i in range(1, total_steps + 1):
self.player.audio_set_volume(
int(volume * pow(reduction_factor_per_step, i))
)
sleep(1 / Config.FADEOUT_STEPS_PER_SECOND)
self.player.stop()
class _Music:
"""
Manage the playing of music tracks
"""
def __init__(self, name: str) -> None:
self.VLC = vlc.Instance()
self.VLC.set_user_agent(name, name)
self.player: Optional[vlc.MediaPlayer] = None
self.name = name
self.max_volume: int = Config.VLC_VOLUME_DEFAULT
self.start_dt: Optional[dt.datetime] = None
self.player_count: int = 0
# Set up logging
self._set_vlc_log()
@VLC_LOG_CB
def log_callback(data, level, ctx, fmt, args):
try:
# Create a ctypes string buffer to hold the formatted message
buf = ctypes.create_string_buffer(1024)
# Use vsnprintf to format the string with the va_list
libc.vsnprintf(buf, len(buf), fmt, args)
# Decode the formatted message
message = buf.value.decode("utf-8", errors="replace")
log.debug("VLC: " + message)
except Exception as e:
log.error(f"Error in VLC log callback: {e}")
def _set_vlc_log(self):
try:
vlc.libvlc_log_set(self.VLC, self.log_callback, None)
log.info("VLC logging set up successfully")
except Exception as e:
log.error(f"Failed to set up VLC logging: {e}")
def adjust_by_ms(self, ms: int) -> None:
"""Move player position by ms milliseconds"""
if not self.player:
return
elapsed_ms = self.get_playtime()
position = self.get_position()
if not position:
position = 0.0
new_position = max(0.0, position + ((position * ms) / elapsed_ms))
self.set_position(new_position)
# Adjus start time so elapsed time calculations are correct
if new_position == 0:
self.start_dt = dt.datetime.now()
else:
if self.start_dt:
self.start_dt -= dt.timedelta(milliseconds=ms)
else:
self.start_dt = dt.datetime.now() - dt.timedelta(milliseconds=ms)
def stop(self) -> None:
"""Immediately stop playing"""
log.debug(f"Music[{self.name}].stop()")
self.start_dt = None
if not self.player:
return
p = self.player
self.player = None
self.start_dt = None
with lock:
p.stop()
p.release()
self.player_count -= 1
log.debug(f"_Music.stop: Releasing player {p=}, {self.player_count=}")
p = None
def fade(self, fade_seconds: int) -> None:
"""
Fade the currently playing track.
The actual management of fading runs in its own thread so as not
to hold up the UI during the fade.
"""
if not self.player:
return
if not self.player.get_position() > 0 and self.player.is_playing():
return
if fade_seconds <= 0:
self.stop()
return
# Take a copy of current player to allow another track to be
# started without interfering here
with lock:
p = self.player
self.player = None
pool = QThreadPool.globalInstance()
if pool:
fader = _FadeTrack(p, fade_seconds=fade_seconds)
pool.start(fader)
self.start_dt = None
else:
log.error("_Music: failed to allocate QThreadPool")
def get_playtime(self) -> int:
"""
Return number of milliseconds current track has been playing or
zero if not playing. The vlc function get_time() only updates 3-4
times a second; this function has much better resolution.
"""
if self.start_dt is None:
return 0
now = dt.datetime.now()
elapsed_seconds = (now - self.start_dt).total_seconds()
return int(elapsed_seconds * 1000)
def get_position(self) -> Optional[float]:
"""Return current position"""
if not self.player:
return None
return self.player.get_position()
def is_playing(self) -> bool:
"""
Return True if we're playing
"""
if not self.player:
return False
# There is a discrete time between starting playing a track and
# player.is_playing() returning True, so assume playing if less
# than Config.PLAY_SETTLE microseconds have passed since
# starting play.
return self.start_dt is not None and (
self.player.is_playing()
or (dt.datetime.now() - self.start_dt)
< dt.timedelta(microseconds=Config.PLAY_SETTLE)
)
def play(
self,
path: str,
start_time: dt.datetime,
position: Optional[float] = None,
) -> None:
"""
Start playing the track at path.
Log and return if path not found.
start_time ensures our version and our caller's version of
the start time is the same
"""
log.debug(f"Music[{self.name}].play({path=}, {position=}")
if file_is_unreadable(path):
log.error(f"play({path}): path not readable")
return None
media = self.VLC.media_new_path(path)
if media is None:
log.error(f"_Music:play: failed to create media ({path=})")
show_warning(None, "Error loading file", f"Cannot play file ({path})")
return
self.player = media.player_new_from_media()
if self.player:
_ = self.player.play()
self.set_volume(self.max_volume)
self.player_count += 1
log.debug(f"_Music.play: {self.player_count=}")
if position:
self.player.set_position(position)
self.start_dt = start_time
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
# It has been known for the volume to need correcting more
# than once in the first 200mS.
for _ in range(3):
if self.player:
volume = self.player.audio_get_volume()
if volume < Config.VLC_VOLUME_DEFAULT:
self.set_volume(Config.VLC_VOLUME_DEFAULT)
log.error(f"Reset from {volume=}")
sleep(0.1)
else:
log.error("_Music:play: failed to create media player")
show_warning(None, "Media player", "Unable to create media player")
def set_position(self, position: float) -> None:
"""
Set player position
"""
if self.player:
self.player.set_position(position)
def set_volume(
self, volume: Optional[int] = None, set_default: bool = True
) -> None:
"""Set maximum volume used for player"""
if not self.player:
return
if set_default and volume:
self.max_volume = volume
if volume is None:
volume = Config.VLC_VOLUME_DEFAULT
self.player.audio_set_volume(volume)
# Ensure volume correct
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
for _ in range(3):
current_volume = self.player.audio_get_volume()
if current_volume < volume:
self.player.audio_set_volume(volume)
log.debug(f"Reset from {volume=}")
sleep(0.1)
@singleton
@dataclass
class MusicMusterSignals(QObject):
"""
@ -53,6 +485,294 @@ class MusicMusterSignals(QObject):
super().__init__()
class RowAndTrack:
"""
Object to manage playlist rows and tracks.
"""
def __init__(self, playlist_row: PlaylistRows) -> None:
"""
Initialises data structure.
The passed PlaylistRows object will include a Tracks object if this
row has a track.
"""
# Collect playlistrow data
self.note = playlist_row.note
self.played = playlist_row.played
self.playlist_id = playlist_row.playlist_id
self.playlistrow_id = playlist_row.id
self.row_number = playlist_row.row_number
self.track_id = playlist_row.track_id
# Collect track data if there's a track
if playlist_row.track_id:
self.artist = playlist_row.track.artist
self.bitrate = playlist_row.track.bitrate
self.duration = playlist_row.track.duration
self.fade_at = playlist_row.track.fade_at
self.intro = playlist_row.track.intro
if playlist_row.track.playdates:
self.lastplayed = max([a.lastplayed for a in playlist_row.track.playdates])
else:
self.lastplayed = Config.EPOCH
self.path = playlist_row.track.path
self.silence_at = playlist_row.track.silence_at
self.start_gap = playlist_row.track.start_gap
self.title = playlist_row.track.title
else:
self.artist = ""
self.bitrate = None
self.duration = 0
self.fade_at = 0
self.intro = None
self.lastplayed = Config.EPOCH
self.path = ""
self.silence_at = 0
self.start_gap = 0
self.title = ""
# Track playing data
self.end_of_track_signalled: bool = False
self.end_time: Optional[dt.datetime] = None
self.fade_graph: Optional[_FadeCurve] = None
self.fade_graph_start_updates: Optional[dt.datetime] = None
self.resume_marker: Optional[float] = 0.0
self.forecast_end_time: Optional[dt.datetime] = None
self.forecast_start_time: Optional[dt.datetime] = None
self.start_time: Optional[dt.datetime] = None
# Other object initialisation
self.signals = MusicMusterSignals()
def __repr__(self) -> str:
return (
f"<RowAndTrack(playlist_id={self.playlist_id}, "
f"row_number={self.row_number}, "
f"note={self.note}, track_id={self.track_id}>"
)
def check_for_end_of_track(self) -> None:
"""
Check whether track has ended. If so, emit track_ended_signal
"""
if self.start_time is None:
return
if self.end_of_track_signalled:
return
if not self.player.is_playing():
self.start_time = None
if self.fade_graph:
self.fade_graph.clear()
self.signal_end_of_track()
self.end_of_track_signalled = True
def create_fade_graph(self) -> None:
"""
Initialise and add FadeCurve in a thread as it's slow
"""
self.fadecurve_thread = QThread()
self.worker = _AddFadeCurve(
self,
track_path=self.path,
track_fade_at=self.fade_at,
track_silence_at=self.silence_at,
)
self.worker.moveToThread(self.fadecurve_thread)
self.fadecurve_thread.started.connect(self.worker.run)
self.worker.finished.connect(self.fadecurve_thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
self.fadecurve_thread.start()
def drop3db(self, enable: bool) -> None:
"""
If enable is true, drop output by 3db else restore to full volume
"""
if enable:
self.player.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False)
else:
self.player.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False)
def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
"""Fade music"""
self.resume_marker = self.player.get_position()
self.player.fade(fade_seconds)
self.signal_end_of_track()
def is_playing(self) -> bool:
"""
Return True if we're currently playing else False
"""
if self.start_time is None:
return False
return self.player.is_playing()
def move_back(self, ms: int = Config.PREVIEW_BACK_MS) -> None:
"""
Rewind player by ms milliseconds
"""
self.player.adjust_by_ms(ms * -1)
def move_forward(self, ms: int = Config.PREVIEW_ADVANCE_MS) -> None:
"""
Rewind player by ms milliseconds
"""
self.player.adjust_by_ms(ms)
def play(self, position: Optional[float] = None) -> None:
"""Play track"""
log.debug(f"issue223: RowAndTrack: play {self.track_id=}")
now = dt.datetime.now()
self.start_time = now
# Initialise player
self.player = _Music(name=Config.VLC_MAIN_PLAYER_NAME)
self.player.play(self.path, start_time=now, position=position)
self.end_time = now + dt.timedelta(milliseconds=self.duration)
# Calculate time fade_graph should start updating
if self.fade_at:
update_graph_at_ms = max(
0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.fade_graph_start_updates = now + dt.timedelta(
milliseconds=update_graph_at_ms
)
def restart(self) -> None:
"""
Restart player
"""
self.player.adjust_by_ms(self.time_playing() * -1)
def set_forecast_start_time(
self, modified_rows: list[int], start: Optional[dt.datetime]
) -> Optional[dt.datetime]:
"""
Set forecast start time for this row
Update passed modified rows list if we changed the row.
Return new start time
"""
changed = False
if self.forecast_start_time != start:
self.forecast_start_time = start
changed = True
if start is None:
if self.forecast_end_time is not None:
self.forecast_end_time = None
changed = True
new_start_time = None
else:
end_time = start + dt.timedelta(milliseconds=self.duration)
new_start_time = end_time
if self.forecast_end_time != end_time:
self.forecast_end_time = end_time
changed = True
if changed and self.row_number not in modified_rows:
modified_rows.append(self.row_number)
return new_start_time
def signal_end_of_track(self) -> None:
"""
Send end of track signal unless we are a preview player
"""
self.signals.track_ended_signal.emit()
def stop(self, fade_seconds: int = 0) -> None:
"""
Stop this track playing
"""
self.resume_marker = self.player.get_position()
self.fade(fade_seconds)
# Reset fade graph
if self.fade_graph:
self.fade_graph.clear()
def time_playing(self) -> int:
"""
Return time track has been playing in milliseconds, zero if not playing
"""
if self.start_time is None:
return 0
return self.player.get_playtime()
def time_remaining_intro(self) -> int:
"""
Return milliseconds of intro remaining. Return 0 if no intro time in track
record or if intro has finished.
"""
if not self.intro:
return 0
return max(0, self.intro - self.time_playing())
def time_to_fade(self) -> int:
"""
Return milliseconds until fade time. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.fade_at - self.time_playing()
def time_to_silence(self) -> int:
"""
Return milliseconds until silent. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.silence_at - self.time_playing()
def update_fade_graph(self) -> None:
"""
Update fade graph
"""
if (
not self.is_playing()
or not self.fade_graph_start_updates
or not self.fade_graph
):
return
now = dt.datetime.now()
if self.fade_graph_start_updates > now:
return
self.fade_graph.tick(self.time_playing())
@dataclass
class TrackFileData:
"""
@ -70,3 +790,29 @@ class TrackFileData:
class TrackInfo(NamedTuple):
track_id: int
row_number: int
class TrackSequence:
next: Optional[RowAndTrack] = None
current: Optional[RowAndTrack] = None
previous: Optional[RowAndTrack] = None
def set_next(self, rat: Optional[RowAndTrack]) -> None:
"""
Set the 'next' track to be passed rat. Clear
any previous next track. If passed rat is None
just clear existing next track.
"""
# Clear any existing fade graph
if self.next and self.next.fade_graph:
self.next.fade_graph.clear()
if rat is None:
self.next = None
else:
self.next = rat
self.next.create_fade_graph()
track_sequence = TrackSequence()

View File

@ -87,7 +87,7 @@ class Config(object):
PREVIEW_BACK_MS = 5000
PREVIEW_END_BUFFER_MS = 1000
REPLACE_FILES_DEFAULT_SOURCE = "/home/kae/music/Singles/tmp"
RETURN_KEY_DEBOUNCE_MS = 500
RETURN_KEY_DEBOUNCE_MS = 1000
ROOT = os.environ.get("ROOT") or "/home/kae/music"
ROWS_FROM_ZERO = True
SCROLL_TOP_MARGIN = 3

View File

@ -76,7 +76,7 @@ class PlaylistsTable(Model):
"PlaylistRowsTable",
back_populates="playlist",
cascade="all, delete-orphan",
order_by="PlaylistRowsTable.plr_rownum",
order_by="PlaylistRowsTable.row_number",
)
def __repr__(self) -> str:
@ -90,7 +90,7 @@ class PlaylistRowsTable(Model):
__tablename__ = "playlist_rows"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
plr_rownum: Mapped[int]
row_number: Mapped[int]
note: Mapped[str] = mapped_column(
String(2048), index=False, default="", nullable=False
)
@ -107,9 +107,9 @@ class PlaylistRowsTable(Model):
def __repr__(self) -> str:
return (
f"<PlaylistRow(id={self.id}, playlist_id={self.playlist_id}, "
f"<PlaylistRows(id={self.id}, playlist_id={self.playlist_id}, "
f"track_id={self.track_id}, "
f"note={self.note}, plr_rownum={self.plr_rownum}>"
f"note={self.note}, row_number={self.row_number}>"
)
@ -135,7 +135,6 @@ class TracksTable(Model):
__tablename__ = "tracks"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
title: Mapped[str] = mapped_column(String(256), index=True)
artist: Mapped[str] = mapped_column(String(256), index=True)
bitrate: Mapped[Optional[int]] = mapped_column(default=None)
duration: Mapped[int] = mapped_column(index=True)
@ -145,6 +144,8 @@ class TracksTable(Model):
path: Mapped[str] = mapped_column(String(2048), index=False, unique=True)
silence_at: Mapped[int] = mapped_column(index=False)
start_gap: Mapped[int] = mapped_column(index=False)
title: Mapped[str] = mapped_column(String(256), index=True)
playlistrows: Mapped[List[PlaylistRowsTable]] = relationship(
"PlaylistRowsTable", back_populates="track"
)

View File

@ -67,8 +67,12 @@ def log_uncaught_exceptions(type_, value, traceback):
if os.environ["MM_ENV"] == "PRODUCTION":
msg = stackprinter.format(value)
send_mail(
Config.ERRORS_TO, Config.ERRORS_FROM, "Exception from musicmuster", msg
Config.ERRORS_TO,
Config.ERRORS_FROM,
"Exception (log_uncaught_exceptions) from musicmuster",
msg,
)
log.debug(msg)
sys.excepthook = log_uncaught_exceptions

View File

@ -312,7 +312,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
self.playlist_id = playlist_id
self.track_id = track_id
self.plr_rownum = row_number
self.row_number = row_number
self.note = note
session.add(self)
session.commit()
@ -338,7 +338,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
PlaylistRows(
session=session,
playlist_id=dst_id,
row_number=plr.plr_rownum,
row_number=plr.row_number,
note=plr.note,
track_id=plr.track_id,
)
@ -357,7 +357,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
.options(joinedload(cls.track))
.where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.plr_rownum == row_number,
PlaylistRows.row_number == row_number,
)
# .options(joinedload(Tracks.playdates))
)
@ -375,7 +375,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
select(PlaylistRows)
.options(joinedload(cls.track))
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.plr_rownum)
.order_by(PlaylistRows.row_number)
# .options(joinedload(Tracks.playdates))
)
@ -391,7 +391,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.plr_rownum > maxrow,
PlaylistRows.row_number > maxrow,
)
)
session.commit()
@ -405,7 +405,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.plr_rownum == row_number,
PlaylistRows.row_number == row_number,
)
)
@ -418,11 +418,11 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
plrs = session.scalars(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.plr_rownum)
.order_by(PlaylistRows.row_number)
).all()
for i, plr in enumerate(plrs):
plr.plr_rownum = i
plr.row_number = i
# Ensure new row numbers are available to the caller
session.commit()
@ -439,7 +439,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
plrs = session.scalars(
select(cls)
.where(cls.playlist_id == playlist_id, cls.id.in_(plr_ids))
.order_by(cls.plr_rownum)
.order_by(cls.row_number)
).all()
return plrs
@ -449,7 +449,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
"""Return the last used row for playlist, or None if no rows"""
return session.execute(
select(func.max(PlaylistRows.plr_rownum)).where(
select(func.max(PlaylistRows.row_number)).where(
PlaylistRows.playlist_id == playlist_id
)
).scalar_one()
@ -481,7 +481,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
plrs = session.scalars(
select(cls)
.where(cls.playlist_id == playlist_id, cls.played.is_(True))
.order_by(cls.plr_rownum)
.order_by(cls.row_number)
).all()
return plrs
@ -500,7 +500,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
query = select(cls).where(
cls.playlist_id == playlist_id, cls.track_id.is_not(None)
)
plrs = session.scalars((query).order_by(cls.plr_rownum)).all()
plrs = session.scalars((query).order_by(cls.row_number)).all()
return plrs
@ -520,7 +520,7 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
cls.track_id.is_not(None),
cls.played.is_(False),
)
.order_by(cls.plr_rownum)
.order_by(cls.row_number)
).all()
return plrs
@ -558,17 +558,17 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
update(PlaylistRows)
.where(
(PlaylistRows.playlist_id == playlist_id),
(PlaylistRows.plr_rownum >= starting_row),
(PlaylistRows.row_number >= starting_row),
)
.values(plr_rownum=PlaylistRows.plr_rownum + move_by)
.values(row_number=PlaylistRows.row_number + move_by)
)
@staticmethod
def update_plr_rownumbers(
def update_plr_row_numbers(
session: Session, playlist_id: int, sqla_map: List[dict[str, int]]
) -> None:
"""
Take a {plrid: plr_rownum} dictionary and update the row numbers accordingly
Take a {plrid: row_number} dictionary and update the row numbers accordingly
"""
# Update database. Ref:
@ -577,9 +577,9 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
update(PlaylistRows)
.where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.id == bindparam("plrid"),
PlaylistRows.id == bindparam("playlistrow_id"),
)
.values(plr_rownum=bindparam("plr_rownum"))
.values(row_number=bindparam("row_number"))
)
session.connection().execute(stmt, sqla_map)

View File

@ -53,8 +53,10 @@ import stackprinter # type: ignore
# App imports
from classes import (
MusicMusterSignals,
RowAndTrack,
TrackFileData,
TrackInfo,
track_sequence,
)
from config import Config
from dialogs import TrackSelectDialog, ReplaceFilesDialog
@ -63,10 +65,6 @@ from log import log
from models import db, Playdates, PlaylistRows, Playlists, Settings, Tracks
from playlistmodel import PlaylistModel, PlaylistProxyModel
from playlists import PlaylistTab
from trackmanager import (
MainTrackManager,
track_sequence,
)
from ui import icons_rc # noqa F401
from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist # type: ignore
from ui.downloadcsv_ui import Ui_DateSelect # type: ignore
@ -345,7 +343,7 @@ class Window(QMainWindow, Ui_MainWindow):
Clear next track
"""
track_sequence.next = None
track_sequence.set_next(None)
self.update_headers()
def clear_selection(self) -> None:
@ -1108,7 +1106,7 @@ class Window(QMainWindow, Ui_MainWindow):
- Clear next track
- Restore volume if -3dB active
- Play (new) current track.
- Show closing volume graph
- Show fade graph
- Notify model
- Note that track is now playing
- Disable play next controls
@ -1129,8 +1127,8 @@ class Window(QMainWindow, Ui_MainWindow):
# Issue #223 concerns a very short pause (maybe 0.1s) sometimes
# when starting to play at track.
# Resolution appears to be to disable timer10 for the first ten
# seconds of playback. Re-enable in update_clocks.
# Resolution appears to be to disable timer10 for a short time.
# Length of time and re-enabling of timer10 both in update_clocks.
self.timer10.stop()
log.debug("issue223: play_next: 10ms timer disabled")
@ -1161,7 +1159,7 @@ class Window(QMainWindow, Ui_MainWindow):
# Show closing volume graph
if track_sequence.current.fade_graph:
log.debug("issue223: play_next: set up fade_graph")
log.debug(f"issue223: play_next: set up fade_graph, {track_sequence.current.title=}")
track_sequence.current.fade_graph.GraphWidget = self.widgetFadeVolume
track_sequence.current.fade_graph.clear()
track_sequence.current.fade_graph.plot()
@ -1420,7 +1418,7 @@ class Window(QMainWindow, Ui_MainWindow):
# We want to use play_next() to resume, so copy the previous
# track to the next track:
track_sequence.next = track_sequence.previous
track_sequence.set_next(track_sequence.previous)
# Now resume playing the now-next track
self.play_next(track_sequence.next.resume_marker)
@ -1569,7 +1567,7 @@ class Window(QMainWindow, Ui_MainWindow):
self.statusbar.showMessage(message, timing)
def show_track(self, playlist_track: MainTrackManager) -> None:
def show_track(self, playlist_track: RowAndTrack) -> None:
"""Scroll to show track in plt"""
# Switch to the correct tab
@ -1678,12 +1676,6 @@ class Window(QMainWindow, Ui_MainWindow):
self.preview_manager.get_playtime() / 1000, 60
)
self.label_intro_timer.setText(f"{int(minutes)}:{seconds:04.1f}")
# if self.preview_track_manager.time_remaining_intro() <= 50:
# self.label_intro_timer.setStyleSheet(
# f"background: {Config.COLOUR_WARNING_TIMER}"
# )
# else:
# self.label_intro_timer.setStyleSheet("")
else:
self.btnPreview.setChecked(False)
self.label_intro_timer.setText("0.0")
@ -1717,7 +1709,7 @@ class Window(QMainWindow, Ui_MainWindow):
# see play_next() and issue #223.
# TODO: find a better way of handling this
if (
track_sequence.current.time_playing() > 10000
track_sequence.current.time_playing() > 5000
and not self.timer10.isActive()
):
self.timer10.start(10)
@ -1915,9 +1907,10 @@ if __name__ == "__main__":
send_mail(
",".join(Config.ERRORS_TO),
",".join(Config.ERRORS_FROM),
"Exception from musicmuster",
"Exception from musicmuster.py",
msg,
)
log.debug(msg)
else:
print("\033[1;31;47mUnhandled exception starts")
print(

File diff suppressed because it is too large Load Diff

View File

@ -35,7 +35,7 @@ from PyQt6.QtWidgets import (
# Third party imports
# App imports
from classes import Col, MusicMusterSignals, TrackInfo
from classes import Col, MusicMusterSignals, TrackInfo, track_sequence
from config import Config
from dialogs import TrackSelectDialog
from helpers import (
@ -47,7 +47,6 @@ from helpers import (
from log import log
from models import db, Settings
from playlistmodel import PlaylistModel, PlaylistProxyModel
from trackmanager import track_sequence
if TYPE_CHECKING:
from musicmuster import Window
@ -208,11 +207,14 @@ class PlaylistTab(QTableView):
self.setDragDropMode(QAbstractItemView.DragDropMode.InternalMove)
self.setDragDropOverwriteMode(False)
self.setAcceptDrops(True)
# Set our custom style - this draws the drop indicator across the whole row
self.setStyle(PlaylistStyle())
# We will enable dragging when rows are selected. Disabling it
# here means we can click and drag to select rows.
self.setDragEnabled(False)
# Prepare for context menu
self.menu = QMenu()
self.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
@ -230,12 +232,14 @@ class PlaylistTab(QTableView):
# Load playlist rows
self.setModel(self.proxy_model)
self._set_column_widths()
# Stretch last column *after* setting column widths which is
# *much* faster
h_header = self.horizontalHeader()
if isinstance(h_header, QHeaderView):
h_header.sectionResized.connect(self._column_resize)
h_header.setStretchLastSection(True)
# Setting ResizeToContents causes screen flash on load
self.resize_rows()
@ -921,12 +925,6 @@ class PlaylistTab(QTableView):
Implement spanning of cells, initiated by signal
"""
# Commented out as too noisy
# log.debug(
# f"_span_cells({playlist_id=}, {row=}, "
# f"{column=}, {rowSpan=}, {columnSpan=}) {self.playlist_id=}"
# )
if playlist_id != self.playlist_id:
return
@ -951,5 +949,6 @@ class PlaylistTab(QTableView):
def _unmark_as_next(self) -> None:
"""Rescan track"""
self.source_model.set_next_row(None)
track_sequence.set_next(None)
self.clear_selection()
self.signals.next_track_changed_signal.emit()

View File

@ -1,656 +0,0 @@
# Standard library imports
from __future__ import annotations
import datetime as dt
import threading
from time import sleep
from typing import Optional
# Third party imports
import numpy as np
import pyqtgraph as pg # type: ignore
import vlc # type: ignore
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QObject,
QRunnable,
QThread,
QThreadPool,
)
from pyqtgraph import PlotWidget
from pyqtgraph.graphicsItems.PlotDataItem import PlotDataItem # type: ignore
from pyqtgraph.graphicsItems.LinearRegionItem import LinearRegionItem # type: ignore
# App imports
from classes import MusicMusterSignals
from config import Config
from log import log
from models import db, PlaylistRows, Tracks
from helpers import (
file_is_unreadable,
get_audio_segment,
show_warning,
)
lock = threading.Lock()
class _AddFadeCurve(QObject):
"""
Initialising a fade curve introduces a noticeable delay so carry out in
a thread.
"""
finished = pyqtSignal()
def __init__(
self,
track_manager: _TrackManager,
track_path: str,
track_fade_at: int,
track_silence_at: int,
) -> None:
super().__init__()
self.track_manager = track_manager
self.track_path = track_path
self.track_fade_at = track_fade_at
self.track_silence_at = track_silence_at
def run(self) -> None:
"""
Create fade curve and add to PlaylistTrack object
"""
fc = _FadeCurve(self.track_path, self.track_fade_at, self.track_silence_at)
if not fc:
log.error(f"Failed to create FadeCurve for {self.track_path=}")
else:
self.track_manager.fade_graph = fc
self.finished.emit()
class _FadeCurve:
GraphWidget: Optional[PlotWidget] = None
def __init__(
self, track_path: str, track_fade_at: int, track_silence_at: int
) -> None:
"""
Set up fade graph array
"""
audio = get_audio_segment(track_path)
if not audio:
log.error(f"FadeCurve: could not get audio for {track_path=}")
return None
# Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
# milliseconds before fade starts to silence
self.start_ms: int = max(
0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.end_ms: int = track_silence_at
self.audio_segment = audio[self.start_ms : self.end_ms]
self.graph_array = np.array(self.audio_segment.get_array_of_samples())
# Calculate the factor to map milliseconds of track to array
self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms)
self.curve: Optional[PlotDataItem] = None
self.region: Optional[LinearRegionItem] = None
def clear(self) -> None:
"""Clear the current graph"""
if self.GraphWidget:
self.GraphWidget.clear()
def plot(self) -> None:
if self.GraphWidget:
self.curve = self.GraphWidget.plot(self.graph_array)
if self.curve:
self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
def tick(self, play_time: int) -> None:
"""Update volume fade curve"""
if not self.GraphWidget:
return
ms_of_graph = play_time - self.start_ms
if ms_of_graph < 0:
return
if self.region is None:
# Create the region now that we're into fade
log.debug("issue223: _FadeCurve: create region")
self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
self.GraphWidget.addItem(self.region)
# Update region position
if self.region:
log.debug("issue223: _FadeCurve: update region")
self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
class _FadeTrack(QRunnable):
def __init__(self, player: vlc.MediaPlayer, fade_seconds: int) -> None:
super().__init__()
self.player = player
self.fade_seconds = fade_seconds
def run(self) -> None:
"""
Implementation of fading the player
"""
if not self.player:
return
# Reduce volume logarithmically
total_steps = self.fade_seconds * Config.FADEOUT_STEPS_PER_SECOND
db_reduction_per_step = Config.FADEOUT_DB / total_steps
reduction_factor_per_step = pow(10, (db_reduction_per_step / 20))
volume = self.player.audio_get_volume()
for i in range(1, total_steps + 1):
self.player.audio_set_volume(
int(volume * pow(reduction_factor_per_step, i))
)
sleep(1 / Config.FADEOUT_STEPS_PER_SECOND)
self.player.stop()
log.debug(f"Releasing player {self.player=}")
self.player.release()
class _Music:
"""
Manage the playing of music tracks
"""
def __init__(self, name: str) -> None:
self.VLC = vlc.Instance()
self.VLC.set_user_agent(name, name)
self.player: Optional[vlc.MediaPlayer] = None
self.name = name
self.max_volume: int = Config.VLC_VOLUME_DEFAULT
self.start_dt: Optional[dt.datetime] = None
def adjust_by_ms(self, ms: int) -> None:
"""Move player position by ms milliseconds"""
if not self.player:
return
elapsed_ms = self.get_playtime()
position = self.get_position()
if not position:
position = 0.0
new_position = max(0.0, position + ((position * ms) / elapsed_ms))
self.set_position(new_position)
# Adjus start time so elapsed time calculations are correct
if new_position == 0:
self.start_dt = dt.datetime.now()
else:
if self.start_dt:
self.start_dt -= dt.timedelta(milliseconds=ms)
else:
self.start_dt = dt.datetime.now() - dt.timedelta(milliseconds=ms)
def stop(self) -> None:
"""Immediately stop playing"""
log.debug(f"Music[{self.name}].stop()")
self.start_dt = None
if not self.player:
return
p = self.player
self.player = None
self.start_dt = None
with lock:
p.stop()
p.release()
p = None
def fade(self, fade_seconds: int) -> None:
"""
Fade the currently playing track.
The actual management of fading runs in its own thread so as not
to hold up the UI during the fade.
"""
if not self.player:
return
if not self.player.get_position() > 0 and self.player.is_playing():
return
if fade_seconds <= 0:
self.stop()
return
# Take a copy of current player to allow another track to be
# started without interfering here
with lock:
p = self.player
self.player = None
pool = QThreadPool.globalInstance()
if pool:
fader = _FadeTrack(p, fade_seconds=fade_seconds)
pool.start(fader)
self.start_dt = None
else:
log.error("_Music: failed to allocate QThreadPool")
def get_playtime(self) -> int:
"""
Return number of milliseconds current track has been playing or
zero if not playing. The vlc function get_time() only updates 3-4
times a second; this function has much better resolution.
"""
if self.start_dt is None:
return 0
now = dt.datetime.now()
elapsed_seconds = (now - self.start_dt).total_seconds()
return int(elapsed_seconds * 1000)
def get_position(self) -> Optional[float]:
"""Return current position"""
if not self.player:
return None
return self.player.get_position()
def is_playing(self) -> bool:
"""
Return True if we're playing
"""
if not self.player:
return False
# There is a discrete time between starting playing a track and
# player.is_playing() returning True, so assume playing if less
# than Config.PLAY_SETTLE microseconds have passed since
# starting play.
return self.start_dt is not None and (
self.player.is_playing()
or (dt.datetime.now() - self.start_dt)
< dt.timedelta(microseconds=Config.PLAY_SETTLE)
)
def play(
self,
path: str,
start_time: dt.datetime,
position: Optional[float] = None,
) -> None:
"""
Start playing the track at path.
Log and return if path not found.
start_time ensures our version and our caller's version of
the start time is the same
"""
log.debug(f"Music[{self.name}].play({path=}, {position=}")
if file_is_unreadable(path):
log.error(f"play({path}): path not readable")
return None
media = self.VLC.media_new_path(path)
if media is None:
log.error(f"_Music:play: failed to create media ({path=})")
show_warning(None, "Error loading file", f"Cannot play file ({path})")
return
self.player = media.player_new_from_media()
if self.player:
_ = self.player.play()
self.set_volume(self.max_volume)
if position:
self.player.set_position(position)
self.start_dt = start_time
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
# It has been known for the volume to need correcting more
# than once in the first 200mS.
for _ in range(3):
if self.player:
volume = self.player.audio_get_volume()
if volume < Config.VLC_VOLUME_DEFAULT:
self.set_volume(Config.VLC_VOLUME_DEFAULT)
log.error(f"Reset from {volume=}")
sleep(0.1)
else:
log.error("_Music:play: failed to create media player")
show_warning(None, "Media player", "Unable to create media player")
def set_position(self, position: float) -> None:
"""
Set player position
"""
if self.player:
self.player.set_position(position)
def set_volume(
self, volume: Optional[int] = None, set_default: bool = True
) -> None:
"""Set maximum volume used for player"""
if not self.player:
return
if set_default and volume:
self.max_volume = volume
if volume is None:
volume = Config.VLC_VOLUME_DEFAULT
self.player.audio_set_volume(volume)
# Ensure volume correct
# For as-yet unknown reasons. sometimes the volume gets
# reset to zero within 200mS or so of starting play. This
# only happened since moving to Debian 12, which uses
# Pipewire for sound (which may be irrelevant).
for _ in range(3):
current_volume = self.player.audio_get_volume()
if current_volume < volume:
self.player.audio_set_volume(volume)
log.debug(f"Reset from {volume=}")
sleep(0.1)
class _TrackManager:
"""
Object to manage active playlist tracks,
typically the previous, current and next track.
"""
def __init__(
self,
session: db.Session,
player_name: str,
track_id: int,
row_number: int,
) -> None:
"""
Initialises data structure.
Define a player.
Raise ValueError if no track in passed plr.
"""
track = session.get(Tracks, track_id)
if not track:
raise ValueError(f"_TrackPlayer: unable to retreived {track_id=}")
self.player_name = player_name
self.row_number = row_number
# Check file readable
if file_is_unreadable(track.path):
raise ValueError(f"_TrackManager.__init__: {track.path=} unreadable")
self.artist = track.artist
self.bitrate = track.bitrate
self.duration = track.duration
self.fade_at = track.fade_at
self.intro = track.intro
self.path = track.path
self.silence_at = track.silence_at
self.start_gap = track.start_gap
self.title = track.title
self.track_id = track.id
self.end_time: Optional[dt.datetime] = None
self.fade_graph: Optional[_FadeCurve] = None
self.fade_graph_start_updates: Optional[dt.datetime] = None
self.resume_marker: Optional[float]
self.start_time: Optional[dt.datetime] = None
self.end_of_track_signalled: bool = False
self.signals = MusicMusterSignals()
# Initialise player
self.player = _Music(name=player_name)
# Initialise and add FadeCurve in a thread as it's slow
self.fadecurve_thread = QThread()
self.worker = _AddFadeCurve(
self,
track_path=track.path,
track_fade_at=track.fade_at,
track_silence_at=track.silence_at,
)
self.worker.moveToThread(self.fadecurve_thread)
self.fadecurve_thread.started.connect(self.worker.run)
self.worker.finished.connect(self.fadecurve_thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
self.fadecurve_thread.start()
def check_for_end_of_track(self) -> None:
"""
Check whether track has ended. If so, emit track_ended_signal
"""
if self.start_time is None:
return
if self.end_of_track_signalled:
return
if not self.player.is_playing():
self.start_time = None
if self.fade_graph:
self.fade_graph.clear()
self.signal_end_of_track()
self.end_of_track_signalled = True
def drop3db(self, enable: bool) -> None:
"""
If enable is true, drop output by 3db else restore to full volume
"""
if enable:
self.player.set_volume(volume=Config.VLC_VOLUME_DROP3db, set_default=False)
else:
self.player.set_volume(volume=Config.VLC_VOLUME_DEFAULT, set_default=False)
def fade(self, fade_seconds: int = Config.FADEOUT_SECONDS) -> None:
"""Fade music"""
self.resume_marker = self.player.get_position()
self.player.fade(fade_seconds)
self.signal_end_of_track()
def is_playing(self) -> bool:
"""
Return True if we're currently playing else False
"""
if self.start_time is None:
return False
return self.player.is_playing()
def move_back(self, ms: int = Config.PREVIEW_BACK_MS) -> None:
"""
Rewind player by ms milliseconds
"""
self.player.adjust_by_ms(ms * -1)
def move_forward(self, ms: int = Config.PREVIEW_ADVANCE_MS) -> None:
"""
Rewind player by ms milliseconds
"""
self.player.adjust_by_ms(ms)
def play(self, position: Optional[float] = None) -> None:
"""Play track"""
log.debug(f"issue223: _TrackManager: play {self.track_id=}")
now = dt.datetime.now()
self.start_time = now
self.player.play(self.path, start_time=now, position=position)
self.end_time = now + dt.timedelta(milliseconds=self.duration)
# Calculate time fade_graph should start updating
if self.fade_at:
update_graph_at_ms = max(
0, self.fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1
)
self.fade_graph_start_updates = now + dt.timedelta(
milliseconds=update_graph_at_ms
)
def restart(self) -> None:
"""
Restart player
"""
self.player.adjust_by_ms(self.time_playing() * -1)
def signal_end_of_track(self) -> None:
"""
Send end of track signal unless we are a preview player
"""
self.signals.track_ended_signal.emit()
def stop(self, fade_seconds: int = 0) -> None:
"""
Stop this track playing
"""
self.resume_marker = self.player.get_position()
self.fade(fade_seconds)
# Reset fade graph
if self.fade_graph:
self.fade_graph.clear()
def time_playing(self) -> int:
"""
Return time track has been playing in milliseconds, zero if not playing
"""
if self.start_time is None:
return 0
return self.player.get_playtime()
def time_remaining_intro(self) -> int:
"""
Return milliseconds of intro remaining. Return 0 if no intro time in track
record or if intro has finished.
"""
if not self.intro:
return 0
return max(0, self.intro - self.time_playing())
def time_to_fade(self) -> int:
"""
Return milliseconds until fade time. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.fade_at - self.time_playing()
def time_to_silence(self) -> int:
"""
Return milliseconds until silent. Return zero if we're not playing.
"""
if self.start_time is None:
return 0
return self.silence_at - self.time_playing()
def update_fade_graph(self) -> None:
"""
Update fade graph
"""
if (
not self.is_playing()
or not self.fade_graph_start_updates
or not self.fade_graph
):
return
now = dt.datetime.now()
if self.fade_graph_start_updates > now:
return
self.fade_graph.tick(self.time_playing())
class MainTrackManager(_TrackManager):
"""
Manage playing tracks from the playlist with associated data
"""
def __init__(self, session: db.Session, plr_id: int) -> None:
"""
Set up manager for playlist tracks
"""
# Ensure we have a track
plr = session.get(PlaylistRows, plr_id)
if not plr:
raise ValueError(f"PlaylistTrack: unable to retreive plr {plr_id=}")
self.track_id: int = plr.track_id
super().__init__(
session=session,
player_name=Config.VLC_MAIN_PLAYER_NAME,
track_id=self.track_id,
row_number=plr.plr_rownum,
)
# Save non-track plr info
self.plr_id: int = plr.id
self.playlist_id: int = plr.playlist_id
def __repr__(self) -> str:
return (
f"<MainTrackManager(plr_id={self.plr_id}, playlist_id={self.playlist_id}, "
f"row_number={self.row_number}>"
)
class TrackSequence:
next: Optional[MainTrackManager] = None
current: Optional[MainTrackManager] = None
previous: Optional[MainTrackManager] = None
track_sequence = TrackSequence()

View File

@ -56,7 +56,7 @@ class TestMMMiscTracks(unittest.TestCase):
assert max(self.model.playlist_rows.keys()) == 7
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].plr_rownum == row
assert self.model.playlist_rows[row].row_number == row
def test_timing_one_track(self):
START_ROW = 0
@ -140,7 +140,7 @@ class TestMMMiscRowMove(unittest.TestCase):
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].plr_rownum == row
assert self.model.playlist_rows[row].row_number == row
if row not in [3, 4, 5]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 3:
@ -158,7 +158,7 @@ class TestMMMiscRowMove(unittest.TestCase):
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].plr_rownum == row
assert self.model.playlist_rows[row].row_number == row
if row not in [3, 4]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 3:
@ -174,7 +174,7 @@ class TestMMMiscRowMove(unittest.TestCase):
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].plr_rownum == row
assert self.model.playlist_rows[row].row_number == row
if row not in [2, 3, 4]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 2:
@ -193,7 +193,7 @@ class TestMMMiscRowMove(unittest.TestCase):
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].plr_rownum == row
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 2, 3, 6, 7, 1, 4, 5, 10, 8, 9]
@ -206,7 +206,7 @@ class TestMMMiscRowMove(unittest.TestCase):
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].plr_rownum == row
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 3, 6, 5, 7, 8, 9, 10]
@ -219,7 +219,7 @@ class TestMMMiscRowMove(unittest.TestCase):
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].plr_rownum == row
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 7, 3, 5, 6, 8, 9, 10]
@ -232,7 +232,7 @@ class TestMMMiscRowMove(unittest.TestCase):
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].plr_rownum == row
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
@ -246,7 +246,7 @@ class TestMMMiscRowMove(unittest.TestCase):
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].plr_rownum == row
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
@ -328,7 +328,7 @@ class TestMMMiscRowMove(unittest.TestCase):
assert len(model_src.playlist_rows) == self.ROWS_TO_CREATE - len(from_rows)
assert len(model_dst.playlist_rows) == self.ROWS_TO_CREATE + len(from_rows)
assert sorted([a.plr_rownum for a in model_src.playlist_rows.values()]) == list(
assert sorted([a.row_number for a in model_src.playlist_rows.values()]) == list(
range(len(model_src.playlist_rows))
)