Compare commits

..

No commits in common. "ab084ccf9702eb8bd40e6596299a1bd4408dcbd4" and "ad3ec45a763c5b01379b9b81a6855dcf232fdab3" have entirely different histories.

8 changed files with 174 additions and 379 deletions

View File

@ -81,7 +81,7 @@ class Config(object):
MAX_MISSING_FILES_TO_REPORT = 10
MILLISECOND_SIGFIGS = 0
MINIMUM_ROW_HEIGHT = 30
NOTE_TIME_FORMAT = "%H:%M"
NOTE_TIME_FORMAT = "%H:%M:%S"
OBS_HOST = "localhost"
OBS_PASSWORD = "auster"
OBS_PORT = 4455

View File

@ -4,7 +4,6 @@ from typing import Any, Dict, Optional
import functools
import os
import psutil
import re
import shutil
import smtplib
import ssl
@ -20,8 +19,6 @@ from tinytag import TinyTag # type: ignore
from config import Config
from log import log
start_time_re = re.compile(r"@\d\d:\d\d")
# Classes are defined after global functions so that classes can use
# those functions.
@ -98,47 +95,20 @@ def get_audio_segment(path: str) -> Optional[AudioSegment]:
return None
def get_embedded_time(text: str) -> Optional[datetime]:
"""Return datetime specified as @hh:mm:ss in text"""
def get_tags(path: str) -> Dict[str, Any]:
"""
Return a dictionary of title, artist, duration-in-milliseconds and path.
"""
try:
match = start_time_re.search(text)
except TypeError:
return None
if not match:
return None
tag = TinyTag.get(path)
try:
return datetime.strptime(match.group(0)[1:], Config.NOTE_TIME_FORMAT)
except ValueError:
return None
def get_file_metadata(filepath: str) -> dict:
"""Return track metadata"""
# Get title, artist, bitrate, duration, path
metadata: Dict[str, str | int | float] = get_tags(filepath)
metadata["mtime"] = os.path.getmtime(filepath)
# Set start_gap, fade_at and silence_at
audio = get_audio_segment(filepath)
if not audio:
audio_values = dict(start_gap=0, fade_at=0, silence_at=0)
else:
audio_values = dict(
start_gap=leading_silence(audio),
fade_at=int(
round(fade_point(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000
),
silence_at=int(
round(trailing_silence(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000
),
return dict(
title=tag.title,
artist=tag.artist,
bitrate=round(tag.bitrate),
duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000),
path=path,
)
metadata |= audio_values
return metadata
def get_relative_date(
@ -183,20 +153,31 @@ def get_relative_date(
return f"{weeks} {weeks_str}, {days} {days_str} ago"
def get_tags(path: str) -> Dict[str, Any]:
"""
Return a dictionary of title, artist, duration-in-milliseconds and path.
"""
def get_file_metadata(filepath: str) -> dict:
"""Return track metadata"""
tag = TinyTag.get(path)
# Get title, artist, bitrate, duration, path
metadata: Dict[str, str | int | float] = get_tags(filepath)
return dict(
title=tag.title,
artist=tag.artist,
bitrate=round(tag.bitrate),
duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000),
path=path,
metadata["mtime"] = os.path.getmtime(filepath)
# Set start_gap, fade_at and silence_at
audio = get_audio_segment(filepath)
if not audio:
audio_values = dict(start_gap=0, fade_at=0, silence_at=0)
else:
audio_values = dict(
start_gap=leading_silence(audio),
fade_at=int(
round(fade_point(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000
),
silence_at=int(
round(trailing_silence(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000
),
)
metadata |= audio_values
return metadata
def leading_silence(

View File

@ -684,6 +684,51 @@ class Window(QMainWindow, Ui_MainWindow):
self.actionPlay_next.setEnabled(True)
self.statusbar.showMessage("Play controls: Enabled", 0)
def end_of_track_actions(self) -> None:
"""
Clean up after track played
Actions required:
- Set flag to say we're not playing a track
- Tell playlist_tab track has finished
- Reset PlaylistTrack objects
- Reset clocks
- Reset fade graph
- Update headers
- Enable controls
"""
# Set flag to say we're not playing a track so that timer ticks
# don't see player=None and kick off end-of-track actions
self.playing = False
# Tell playlist_tab track has finished
# TODO Reimplement as a signal
# if self.current_track.playlist_tab:
# self.current_track.playlist_tab.play_ended()
# Reset fade graph
if track_sequence.now.fade_graph:
track_sequence.now.fade_graph.clear()
# Reset PlaylistTrack objects
if track_sequence.now.track_id:
track_sequence.previous = track_sequence.now
track_sequence.now = PlaylistTrack()
# Reset clocks
self.frame_fade.setStyleSheet("")
self.frame_silent.setStyleSheet("")
self.label_elapsed_timer.setText("00:00 / 00:00")
self.label_fade_timer.setText("00:00")
self.label_silent_timer.setText("00:00")
# Update headers
self.update_headers()
# Enable controls
self.enable_play_next_controls()
def export_playlist_tab(self) -> None:
"""Export the current playlist to an m3u file"""
@ -1429,29 +1474,19 @@ class Window(QMainWindow, Ui_MainWindow):
self.stop_playing(fade=False)
def stop_playing(self, fade: bool = True) -> None:
def stop_playing(self, fade=True) -> None:
"""
Stop playing current track
Actions required:
- Set flag to say we're not playing a track
- Return if not playing
- Stop/fade track
- Reset playlist_tab colour
- Tell playlist_tab track has finished
- Reset PlaylistTrack objects
- Reset clocks
- Reset fade graph
- Update headers
- Enable controls
- Run end-of-track actions
"""
# Set flag to say we're not playing a track so that timer ticks
# don't see player=None and kick off end-of-track actions
if self.playing:
self.playing = False
else:
# Return if not playing
if not self.playing:
return
# Stop/fade track
@ -1461,30 +1496,20 @@ class Window(QMainWindow, Ui_MainWindow):
else:
self.music.stop()
# Reset fade graph
if track_sequence.now.fade_graph:
track_sequence.now.fade_graph.clear()
# Reset playlist_tab colour
# TODO Reimplement
# if self.current_track.playlist_tab:
# if self.current_track.playlist_tab == self.next_track.playlist_tab:
# self.set_tab_colour(
# self.current_track.playlist_tab, QColor(Config.COLOUR_NEXT_TAB)
# )
# else:
# self.set_tab_colour(
# self.current_track.playlist_tab, QColor(Config.COLOUR_NORMAL_TAB)
# )
# Reset track_sequence objects
if track_sequence.now.track_id:
track_sequence.previous = track_sequence.now
track_sequence.now = PlaylistTrack()
# Tell model previous track has finished
self.active_model().previous_track_ended()
# Reset clocks
self.frame_fade.setStyleSheet("")
self.frame_silent.setStyleSheet("")
self.label_elapsed_timer.setText("00:00 / 00:00")
self.label_fade_timer.setText("00:00")
self.label_silent_timer.setText("00:00")
# Update headers
self.update_headers()
# Enable controls
self.enable_play_next_controls()
# Run end-of-track actions
self.end_of_track_actions()
def tab_change(self):
"""Called when active tab changed"""

View File

@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import datetime
from enum import auto, Enum
from typing import List, Optional
@ -17,7 +17,7 @@ from PyQt6.QtGui import (
from classes import track_sequence, MusicMusterSignals, PlaylistTrack
from config import Config
from dbconfig import scoped_session, Session
from helpers import file_is_unreadable, get_embedded_time, ms_to_mmss
from helpers import file_is_unreadable
from log import log
from models import Playdates, PlaylistRows, Tracks
@ -43,16 +43,14 @@ class PlaylistRowData:
Populate PlaylistRowData from database PlaylistRows record
"""
self.start_gap: Optional[int] = None
self.title: str = ""
self.artist: str = ""
self.bitrate = 0
self.duration: int = 0
self.end_time: Optional[datetime] = None
self.lastplayed: datetime = Config.EPOCH
self.bitrate = 0
self.path = ""
self.played = False
self.start_gap: Optional[int] = None
self.start_time: Optional[datetime] = None
self.title: str = ""
self.plrid: int = plr.id
self.plr_rownum: int = plr.plr_rownum
@ -110,7 +108,6 @@ class PlaylistModel(QAbstractTableModel):
with Session() as session:
self.refresh_data(session)
self.update_track_times()
def __repr__(self) -> str:
return (
@ -235,38 +232,31 @@ class PlaylistModel(QAbstractTableModel):
- find next track
"""
row_number = track_sequence.now.plr_rownum
# Sanity check
if not track_sequence.now.track_id:
log.error(
"playlistmodel:current_track_started called with no current track"
)
return
if row_number is None:
if track_sequence.now.plr_rownum is None:
log.error(
"playlistmodel:current_track_started called with no row number "
f"({track_sequence.now=})"
)
return
# Update display
self.invalidate_row(track_sequence.now.plr_rownum)
# Update track times
# TODO
# Update Playdates in database
with Session() as session:
Playdates(session, track_sequence.now.track_id)
plr = session.get(PlaylistRows, track_sequence.now.plr_id)
if plr:
plr.played = True
self.refresh_row(session, plr.plr_rownum)
# Update track times
self.playlist_rows[row_number].start_time = datetime.now()
self.playlist_rows[row_number].end_time = datetime.now() + timedelta(
milliseconds=self.playlist_rows[row_number].duration
)
# Update colour and times for current row
self.invalidate_row(row_number)
# Update all other track times
self.update_track_times()
# Find next track
# Get all unplayed track rows
@ -279,7 +269,13 @@ class PlaylistModel(QAbstractTableModel):
if unplayed_rows:
try:
# Find next row after current track
next_row = min([a for a in unplayed_rows if a > row_number])
next_row = min(
[
a
for a in unplayed_rows
if a > track_sequence.now.plr_rownum
]
)
except ValueError:
# Find first unplayed track
next_row = min(unplayed_rows)
@ -339,7 +335,7 @@ class PlaylistModel(QAbstractTableModel):
self.signals.span_cells_signal.emit(
row, HEADER_NOTES_COLUMN, 1, self.columnCount() - 1
)
return QVariant(self.header_text(prd))
return QVariant(prd.note)
else:
return QVariant()
@ -350,17 +346,11 @@ class PlaylistModel(QAbstractTableModel):
if column == Col.ARTIST.value:
return QVariant(prd.artist)
if column == Col.DURATION.value:
return QVariant(ms_to_mmss(prd.duration))
return QVariant(prd.duration)
if column == Col.START_TIME.value:
if prd.start_time:
return QVariant(prd.start_time.strftime(Config.TRACK_TIME_FORMAT))
else:
return QVariant()
return QVariant("FIXME")
if column == Col.END_TIME.value:
if prd.end_time:
return QVariant(prd.end_time.strftime(Config.TRACK_TIME_FORMAT))
else:
return QVariant()
return QVariant("FIXME")
if column == Col.LAST_PLAYED.value:
return QVariant(prd.lastplayed)
if column == Col.BITRATE.value:
@ -459,60 +449,6 @@ class PlaylistModel(QAbstractTableModel):
return QVariant()
def header_text(self, prd: PlaylistRowData) -> str:
"""
Process possible section timing directives embeded in header
"""
count: int = 0
duration: int = 0
if prd.note.endswith("+"):
# This header is the start of a timed section
for row_number in range(prd.plr_rownum + 1, len(self.playlist_rows)):
row_prd = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_prd.note.endswith("-"):
return (
f"{prd.note[:-1].strip()} "
f"[{count} tracks, {ms_to_mmss(duration)} unplayed]"
)
else:
continue
else:
count += 1
if not row_prd.played:
duration += row_prd.duration
return (
f"{prd.note[:-1].strip()} "
f"[{count} tracks, {ms_to_mmss(duration)} unplayed (to end of playlist)]"
)
elif prd.note.endswith("="):
# Show subtotal
for row_number in range(prd.plr_rownum - 1, -1, -1):
row_prd = self.playlist_rows[row_number]
if self.is_header_row(row_number):
if row_prd.note.endswith(("+", "=")):
stripped_note = prd.note[:-1].strip()
if stripped_note:
return (
f"{stripped_note} [{count} tracks, "
f"{ms_to_mmss(duration)} unplayed]"
)
else:
return (
f"[Subtotal: {count} tracks, "
f"{ms_to_mmss(duration)} unplayed]"
)
else:
continue
else:
count += 1
if not row_prd.played:
duration += row_prd.duration
return prd.note
def is_header_row(self, row_number: int) -> bool:
"""
Return True if row is a header row, else False
@ -645,33 +581,6 @@ class PlaylistModel(QAbstractTableModel):
# Update display
self.invalidate_rows(list(row_map.keys()))
def previous_track_ended(self) -> None:
"""
Notification from musicmuster that the previous track has ended.
Actions required:
- sanity check
- update display
- update track times
"""
# Sanity check
if not track_sequence.previous.track_id:
log.error("playlistmodel:previous_track_ended called with no current track")
return
if track_sequence.previous.plr_rownum is None:
log.error(
"playlistmodel:previous_track_ended called with no row number "
f"({track_sequence.previous=})"
)
return
# Update display
self.invalidate_row(track_sequence.previous.plr_rownum)
# Update track times
# TODO
def refresh_data(self, session: scoped_session):
"""Populate dicts for data calls"""
@ -684,7 +593,7 @@ class PlaylistModel(QAbstractTableModel):
"""Populate dict for one row for data calls"""
p = PlaylistRows.deep_row(session, self.playlist_id, row_number)
self.playlist_rows[row_number] = PlaylistRowData(p)
self.playlist_rows[p.plr_rownum] = PlaylistRowData(p)
def rowCount(self, index: QModelIndex = QModelIndex()) -> int:
"""Standard function for view"""
@ -710,7 +619,7 @@ class PlaylistModel(QAbstractTableModel):
plr = session.get(PlaylistRows, plrid)
if plr:
# Check this isn't a header row
if self.is_header_row(row_number):
if plr.track is None:
return
# Check track is readable
if file_is_unreadable(plr.track.path):
@ -718,7 +627,6 @@ class PlaylistModel(QAbstractTableModel):
track_sequence.next.set_plr(session, plr)
self.signals.next_track_changed_signal.emit()
self.invalidate_row(row_number)
self.update_track_times()
def setData(
self, index: QModelIndex, value: QVariant, role: int = Qt.ItemDataRole.EditRole
@ -740,7 +648,6 @@ class PlaylistModel(QAbstractTableModel):
)
return False
if plr.track_id:
if column == Col.TITLE.value or column == Col.ARTIST.value:
track = session.get(Tracks, plr.track_id)
if not track:
@ -755,10 +662,6 @@ class PlaylistModel(QAbstractTableModel):
return False
elif column == Col.NOTE.value:
plr.note = str(value)
else:
# This is a header row
if column == HEADER_NOTES_COLUMN:
plr.note = str(value)
# Flush changes before refreshing data
session.flush()
@ -768,82 +671,5 @@ class PlaylistModel(QAbstractTableModel):
return False
def supportedDropActions(self) -> Qt.DropAction:
def supportedDropActions(self):
return Qt.DropAction.MoveAction | Qt.DropAction.CopyAction
def update_track_times(self) -> None:
"""
Update track start/end times in self.playlist_rows
"""
next_start_time: Optional[datetime] = None
update_rows: List[int] = []
for row_number in range(len(self.playlist_rows)):
prd = self.playlist_rows[row_number]
# Reset start_time if this is the current row
if row_number == track_sequence.now.plr_rownum:
# Start/end times for current track are set in current_track_started
if not next_start_time:
next_start_time = prd.end_time
continue
# Set start time for next row if we have a current track
current_end_time = track_sequence.now.end_time
if row_number == track_sequence.next.plr_rownum and current_end_time:
prd.start_time = current_end_time
prd.end_time = current_end_time + timedelta(milliseconds=prd.duration)
next_start_time = prd.end_time
update_rows.append(row_number)
continue
# Don't update times for tracks that have been played
if prd.played:
continue
# Don't schedule unplayable tracks
if file_is_unreadable(prd.path):
continue
# If we're between the current and next row, zero out
# times
if (
track_sequence.now.plr_rownum is not None
and track_sequence.next.plr_rownum is not None
and track_sequence.now.plr_rownum
< row_number
< track_sequence.next.plr_rownum
):
prd.start_time = None
prd.end_time = None
update_rows.append(row_number)
continue
# Reset start time if timing in header or at current track
if not prd.path:
# This is a header row
header_time = get_embedded_time(prd.note)
if header_time:
next_start_time = header_time
else:
# This is an unplayed track; set start/end if we have a
# start time
if next_start_time is None:
continue
if prd.start_time != next_start_time:
prd.start_time = next_start_time
update_rows.append(row_number)
next_start_time += timedelta(
milliseconds=self.playlist_rows[row_number].duration
)
if prd.end_time != next_start_time:
prd.end_time = next_start_time
update_rows.append(row_number)
# Update start/stop times of rows that have changed
for updated_row in update_rows:
self.dataChanged.emit(
self.index(updated_row, Col.START_TIME.value),
self.index(updated_row, Col.END_TIME.value),
)

View File

@ -113,7 +113,7 @@ class EscapeDelegate(QStyledItemDelegate):
return True
elif key_event.key() == Qt.Key.Key_Escape:
discard_edits = QMessageBox.question(
cast(QWidget, self.parent()), "Abandon edit", "Discard changes?"
cast(QWidget, self), "Abandon edit", "Discard changes?"
)
if discard_edits == QMessageBox.StandardButton.Yes:
self.closeEditor.emit(editor)
@ -1064,6 +1064,7 @@ class PlaylistTab(QTableView):
header_row = False
model = cast(PlaylistModel, self.model())
if model:
header_row = model.is_header_row(row_number)
# current = row_number == self._get_current_track_row_number()
# next_row = row_number == self._get_next_track_row_number()

33
test.py Executable file
View File

@ -0,0 +1,33 @@
#!/usr/bin/env python
from PyQt5 import QtGui, QtWidgets
class TabBar(QtWidgets.QTabBar):
def paintEvent(self, event):
painter = QtWidgets.QStylePainter(self)
option = QtWidgets.QStyleOptionTab()
for index in range(self.count()):
self.initStyleOption(option, index)
bgcolor = QtGui.QColor(self.tabText(index))
option.palette.setColor(QtGui.QPalette.Window, bgcolor)
painter.drawControl(QtWidgets.QStyle.CE_TabBarTabShape, option)
painter.drawControl(QtWidgets.QStyle.CE_TabBarTabLabel, option)
class Window(QtWidgets.QTabWidget):
def __init__(self):
QtWidgets.QTabWidget.__init__(self)
self.setTabBar(TabBar(self))
for color in "tomato orange yellow lightgreen skyblue plum".split():
self.addTab(QtWidgets.QWidget(self), color)
if __name__ == "__main__":
import sys
app = QtWidgets.QApplication(sys.argv)
window = Window()
window.resize(420, 200)
window.show()
sys.exit(app.exec_())

View File

@ -1,37 +1,9 @@
from app.models import (
Playlists,
Tracks,
)
from app.helpers import get_file_metadata
from app import playlistmodel
from dbconfig import scoped_session
test_tracks = [
"testdata/isa.mp3",
"testdata/isa_with_gap.mp3",
"testdata/loser.mp3",
"testdata/lovecats-10seconds.mp3",
"testdata/lovecats.mp3",
"testdata/mom.mp3",
"testdata/sitting.mp3",
]
def create_model_with_tracks(session: scoped_session) -> "playlistmodel.PlaylistModel":
playlist = Playlists(session, "test playlist")
model = playlistmodel.PlaylistModel(playlist.id)
for row in range(len(test_tracks)):
track_path = test_tracks[row % len(test_tracks)]
metadata = get_file_metadata(track_path)
track = Tracks(session, **metadata)
model.insert_track_row(row, track.id, f"{row=}")
session.commit()
return model
def create_model_with_playlist_rows(
session: scoped_session, rows: int
) -> "playlistmodel.PlaylistModel":
@ -225,46 +197,3 @@ def test_insert_header_row_middle(monkeypatch, session):
model.edit_role(model.rowCount(), playlistmodel.Col.NOTE.value, prd)
== note_text
)
def test_create_model_with_tracks(monkeypatch, session):
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_tracks(session)
assert len(model.playlist_rows) == len(test_tracks)
def test_timing_one_track(monkeypatch, session):
START_ROW = 0
END_ROW = 2
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_tracks(session)
model.insert_header_row(START_ROW, "start+")
model.insert_header_row(END_ROW, "-")
prd = model.playlist_rows[START_ROW]
qv_value = model.display_role(START_ROW, playlistmodel.HEADER_NOTES_COLUMN, prd)
assert qv_value.value() == "start [1 tracks, 4:23 unplayed]"
# def test_edit_header(monkeypatch, session): # edit header row in middle of playlist
# monkeypatch.setattr(playlistmodel, "Session", session)
# note_text = "test text"
# initial_row_count = 11
# insert_row = 6
# model = create_model_with_playlist_rows(session, initial_row_count)
# model.insert_header_row(insert_row, note_text)
# assert model.rowCount() == initial_row_count + 1
# prd = model.playlist_rows[insert_row]
# # Test against edit_role because display_role for headers is
# # handled differently (sets up row span)
# assert (
# model.edit_role(model.rowCount(), playlistmodel.Col.NOTE.value, prd)
# == note_text
# )

View File

@ -1,4 +1,4 @@
from PyQt6.QtCore import Qt
from PyQt5.QtCore import Qt
from app import playlists
from app import models