Compare commits

..

11 Commits

Author SHA1 Message Date
Keith Edmunds
e689c9afeb Check for no title/artist tag in replace_files 2023-11-19 11:46:38 +00:00
Keith Edmunds
a0fa7e455e Change intro gap warning to 300ms 2023-11-16 22:24:37 +00:00
Keith Edmunds
8674e6d5b3 Try to fix occasional short dropouts at start of track
Move plot graph to before starting track
2023-11-13 20:55:08 +00:00
Keith Edmunds
705f3ea2f2 Fix bug with unended timed section 2023-11-08 21:10:35 +00:00
Keith Edmunds
405efee732 Fix bug that added row number to notes of imported tracks 2023-11-03 08:18:24 +00:00
Keith Edmunds
2db407edc5 Show section end time for all unplayed tracks 2023-10-30 19:27:59 +00:00
Keith Edmunds
ab8da0a312 Fix moving of timing starts and subtotals 2023-10-20 13:07:30 +01:00
Keith Edmunds
48d26d80df Fix replace_files after other updates 2023-10-19 11:20:22 +01:00
Keith Edmunds
da751ee530 Add return type in music.py 2023-10-18 08:54:15 +01:00
Keith Edmunds
282e4476a9 Clean up music.py interface 2023-10-17 22:52:30 +01:00
Keith Edmunds
ecd46b8a0a Improved fading
fade() takes an optional parameter, fade_seconds
fading is now logarithmic
2023-10-17 22:41:18 +01:00
41 changed files with 8314 additions and 4902 deletions

1
.gitignore vendored
View File

@ -10,4 +10,3 @@ StudioPlaylist.png
*.otl
*.howto
.direnv
tmp/

View File

@ -51,7 +51,7 @@ class MyTableWidget(QTableWidget):
def __init__(self, parent=None):
super().__init__(parent)
self.setItemDelegate(EscapeDelegate(self))
# self.setEditTriggers(QAbstractItemView.EditTrigger.DoubleClicked)
self.setEditTriggers(QAbstractItemView.EditTrigger.DoubleClicked)
class MainWindow(QMainWindow):

View File

@ -1,91 +0,0 @@
#!/usr/bin/env python3
from PyQt6.QtCore import Qt, QEvent, QObject, QVariant, QAbstractTableModel
from PyQt6.QtWidgets import (
QApplication,
QMainWindow,
QMessageBox,
QPlainTextEdit,
QStyledItemDelegate,
QTableView,
)
from PyQt6.QtGui import QKeyEvent
from typing import cast
class EscapeDelegate(QStyledItemDelegate):
def __init__(self, parent=None):
super().__init__(parent)
def createEditor(self, parent, option, index):
return QPlainTextEdit(parent)
def eventFilter(self, editor: QObject, event: QEvent):
"""By default, QPlainTextEdit doesn't handle enter or return"""
if event.type() == QEvent.Type.KeyPress:
key_event = cast(QKeyEvent, event)
print(key_event.key())
if key_event.key() == Qt.Key.Key_Return:
if key_event.modifiers() == (Qt.KeyboardModifier.ControlModifier):
print("save data")
self.commitData.emit(editor)
self.closeEditor.emit(editor)
return True
elif key_event.key() == Qt.Key.Key_Escape:
discard_edits = QMessageBox.question(
self.parent(), "Abandon edit", "Discard changes?"
)
if discard_edits == QMessageBox.StandardButton.Yes:
print("abandon edit")
self.closeEditor.emit(editor)
return True
return False
class MyTableWidget(QTableView):
def __init__(self, parent=None):
super().__init__(parent)
self.setItemDelegate(EscapeDelegate(self))
self.setModel(MyModel())
class MyModel(QAbstractTableModel):
def columnCount(self, index):
return 2
def rowCount(self, index):
return 2
def data(self, index, role):
if not index.isValid() or not (0 <= index.row() < 2):
return QVariant()
row = index.row()
column = index.column()
if role == Qt.ItemDataRole.DisplayRole:
return QVariant(f"Row {row}, Col {column}")
return QVariant()
def flags(self, index):
return Qt.ItemFlag.ItemIsEnabled | Qt.ItemFlag.ItemIsSelectable | Qt.ItemFlag.ItemIsEditable
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
self.table_widget = MyTableWidget(self)
self.setCentralWidget(self.table_widget)
self.table_widget.resizeColumnsToContents()
self.table_widget.resizeRowsToContents()
if __name__ == "__main__":
app = QApplication([])
window = MainWindow()
window.show()
app.exec()

View File

@ -1,223 +0,0 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional
from PyQt6.QtCore import pyqtSignal, QObject, QThread
import numpy as np
import pyqtgraph as pg # type: ignore
from config import Config
from dbconfig import scoped_session
from models import PlaylistRows
import helpers
class FadeCurve:
GraphWidget = None
def __init__(
self, track_path: str, track_fade_at: int, track_silence_at: int
) -> None:
"""
Set up fade graph array
"""
audio = helpers.get_audio_segment(track_path)
if not audio:
return None
# Start point of curve is Config.FADE_CURVE_MS_BEFORE_FADE
# milliseconds before fade starts to silence
self.start_ms = max(0, track_fade_at - Config.FADE_CURVE_MS_BEFORE_FADE - 1)
self.end_ms = track_silence_at
self.audio_segment = audio[self.start_ms : self.end_ms]
self.graph_array = np.array(self.audio_segment.get_array_of_samples())
# Calculate the factor to map milliseconds of track to array
self.ms_to_array_factor = len(self.graph_array) / (self.end_ms - self.start_ms)
self.region = None
def clear(self) -> None:
"""Clear the current graph"""
if self.GraphWidget:
self.GraphWidget.clear()
def plot(self):
self.curve = self.GraphWidget.plot(self.graph_array)
self.curve.setPen(Config.FADE_CURVE_FOREGROUND)
def tick(self, play_time) -> None:
"""Update volume fade curve"""
if not self.GraphWidget:
return
ms_of_graph = play_time - self.start_ms
if ms_of_graph < 0:
return
if self.region is None:
# Create the region now that we're into fade
self.region = pg.LinearRegionItem([0, 0], bounds=[0, len(self.graph_array)])
self.GraphWidget.addItem(self.region)
# Update region position
self.region.setRegion([0, ms_of_graph * self.ms_to_array_factor])
@helpers.singleton
@dataclass
class MusicMusterSignals(QObject):
"""
Class for all MusicMuster signals. See:
- https://zetcode.com/gui/pyqt5/eventssignals/
- https://stackoverflow.com/questions/62654525/
emit-a-signal-from-another-class-to-main-class
and Singleton class at
https://refactoring.guru/design-patterns/singleton/python/example#example-0
"""
begin_reset_model_signal = pyqtSignal(int)
enable_escape_signal = pyqtSignal(bool)
end_reset_model_signal = pyqtSignal(int)
next_track_changed_signal = pyqtSignal()
resize_rows_signal = pyqtSignal(int)
row_order_changed_signal = pyqtSignal(int)
search_songfacts_signal = pyqtSignal(str)
search_wikipedia_signal = pyqtSignal(str)
show_warning_signal = pyqtSignal(str, str)
span_cells_signal = pyqtSignal(int, int, int, int, int)
status_message_signal = pyqtSignal(str, int)
def __post_init__(self):
super().__init__()
class PlaylistTrack:
"""
Used to provide a single reference point for specific playlist tracks,
typically the previous, current and next track.
"""
def __init__(self) -> None:
"""
Only initialises data structure. Call set_plr to populate.
"""
self.artist: Optional[str] = None
self.duration: Optional[int] = None
self.end_time: Optional[datetime] = None
self.fade_at: Optional[int] = None
self.fade_graph: Optional[FadeCurve] = None
self.fade_length: Optional[int] = None
self.path: Optional[str] = None
self.playlist_id: Optional[int] = None
self.plr_id: Optional[int] = None
self.plr_rownum: Optional[int] = None
self.resume_marker: Optional[float] = None
self.silence_at: Optional[int] = None
self.start_gap: Optional[int] = None
self.start_time: Optional[datetime] = None
self.title: Optional[str] = None
self.track_id: Optional[int] = None
def __repr__(self) -> str:
return (
f"<PlaylistTrack(title={self.title}, artist={self.artist}, "
f"plr_rownum={self.plr_rownum}, playlist_id={self.playlist_id}>"
)
def set_plr(self, session: scoped_session, plr: PlaylistRows) -> None:
"""
Update with new plr information
"""
session.add(plr)
self.plr_rownum = plr.plr_rownum
if not plr.track:
return
track = plr.track
self.artist = track.artist
self.duration = track.duration
self.end_time = None
self.fade_at = track.fade_at
self.path = track.path
self.playlist_id = plr.playlist_id
self.plr_id = plr.id
self.silence_at = track.silence_at
self.start_gap = track.start_gap
self.start_time = None
self.title = track.title
self.track_id = track.id
if track.silence_at and track.fade_at:
self.fade_length = track.silence_at - track.fade_at
# Initialise and add FadeCurve in a thread as it's slow
# Import in separate thread
self.fadecurve_thread = QThread()
self.worker = AddFadeCurve(
self,
track_path=track.path,
track_fade_at=track.fade_at,
track_silence_at=track.silence_at,
)
self.worker.moveToThread(self.fadecurve_thread)
self.fadecurve_thread.started.connect(self.worker.run)
self.worker.finished.connect(self.fadecurve_thread.quit)
self.worker.finished.connect(self.worker.deleteLater)
self.fadecurve_thread.finished.connect(self.fadecurve_thread.deleteLater)
self.fadecurve_thread.start()
def start(self) -> None:
"""
Called when track starts playing
"""
self.start_time = datetime.now()
if self.duration:
self.end_time = self.start_time + timedelta(milliseconds=self.duration)
class AddFadeCurve(QObject):
"""
Initialising a fade curve introduces a noticeable delay so carry out in
a thread.
"""
finished = pyqtSignal()
def __init__(
self,
playlist_track: PlaylistTrack,
track_path: str,
track_fade_at: int,
track_silence_at: int,
):
super().__init__()
self.playlist_track = playlist_track
self.track_path = track_path
self.track_fade_at = track_fade_at
self.track_silence_at = track_silence_at
def run(self):
"""
Create fade curve and add to PlaylistTrack object
"""
self.playlist_track.fade_graph = FadeCurve(
self.track_path, self.track_fade_at, self.track_silence_at
)
self.finished.emit()
class TrackSequence:
next = PlaylistTrack()
now = PlaylistTrack()
previous = PlaylistTrack()
track_sequence = TrackSequence()

View File

@ -32,60 +32,57 @@ class Config(object):
COLOUR_ODD_PLAYLIST = "#f2f2f2"
COLOUR_UNREADABLE = "#dc3545"
COLOUR_WARNING_TIMER = "#ffc107"
COLUMN_NAME_ARTIST = "Artist"
COLUMN_NAME_AUTOPLAY = "A"
COLUMN_NAME_BITRATE = "bps"
COLUMN_NAME_END_TIME = "End"
COLUMN_NAME_LAST_PLAYED = "Last played"
COLUMN_NAME_LEADING_SILENCE = "Gap"
COLUMN_NAME_LENGTH = "Length"
COLUMN_NAME_NOTES = "Notes"
COLUMN_NAME_START_TIME = "Start"
COLUMN_NAME_TITLE = "Title"
DBFS_SILENCE = -50
DEBUG_FUNCTIONS: List[Optional[str]] = []
DEBUG_MODULES: List[Optional[str]] = ["dbconfig"]
DEBUG_MODULES: List[Optional[str]] = ['dbconfig']
DEFAULT_COLUMN_WIDTH = 200
DISPLAY_SQL = False
EPOCH = datetime.datetime(1970, 1, 1)
ERRORS_FROM = ["noreply@midnighthax.com"]
ERRORS_TO = ["kae@midnighthax.com"]
ERRORS_FROM = ['noreply@midnighthax.com']
ERRORS_TO = ['kae@midnighthax.com']
FADE_CURVE_BACKGROUND = "lightyellow"
FADE_CURVE_FOREGROUND = "blue"
FADE_CURVE_MS_BEFORE_FADE = 5000
FADEOUT_DB = -10
FADEOUT_SECONDS = 5
FADEOUT_STEPS_PER_SECOND = 5
HEADER_ARTIST = "Artist"
HEADER_BITRATE = "bps"
HEADER_DURATION = "Length"
HEADER_END_TIME = "End"
HEADER_LAST_PLAYED = "Last played"
HEADER_NOTE = "Notes"
HEADER_START_GAP = "Gap"
HEADER_START_TIME = "Start"
HEADER_TITLE = "Title"
HIDE_AFTER_PLAYING_OFFSET = 5000
INFO_TAB_TITLE_LENGTH = 15
LAST_PLAYED_TODAY_STRING = "Today"
LOG_LEVEL_STDERR = logging.ERROR
LOG_LEVEL_SYSLOG = logging.DEBUG
LOG_NAME = "musicmuster"
MAIL_PASSWORD = os.environ.get("MAIL_PASSWORD")
MAIL_PORT = int(os.environ.get("MAIL_PORT") or 25)
MAIL_SERVER = os.environ.get("MAIL_SERVER") or "woodlands.midnighthax.com"
MAIL_USERNAME = os.environ.get("MAIL_USERNAME")
MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS") is not None
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')
MAIL_PORT = int(os.environ.get('MAIL_PORT') or 25)
MAIL_SERVER = os.environ.get('MAIL_SERVER') or "woodlands.midnighthax.com"
MAIL_USERNAME = os.environ.get('MAIL_USERNAME')
MAIL_USE_TLS = os.environ.get('MAIL_USE_TLS') is not None
MAX_IMPORT_MATCHES = 5
MAX_INFO_TABS = 5
MAX_MISSING_FILES_TO_REPORT = 10
MILLISECOND_SIGFIGS = 0
MINIMUM_ROW_HEIGHT = 30
NOTE_TIME_FORMAT = "%H:%M"
NOTE_TIME_FORMAT = "%H:%M:%S"
OBS_HOST = "localhost"
OBS_PASSWORD = "auster"
OBS_PORT = 4455
PLAY_SETTLE = 500000
ROOT = os.environ.get("ROOT") or "/home/kae/music"
ROWS_FROM_ZERO = True
ROOT = os.environ.get('ROOT') or "/home/kae/music"
IMPORT_DESTINATION = os.path.join(ROOT, "Singles")
SCROLL_TOP_MARGIN = 3
START_GAP_WARNING_THRESHOLD = 300
TEXT_NO_TRACK_NO_NOTE = "[Section header]"
TOD_TIME_FORMAT = "%H:%M:%S"
TRACK_TIME_FORMAT = "%H:%M:%S"
VOLUME_VLC_DEFAULT = 75
VOLUME_VLC_DROP3db = 65
WARNING_MS_BEFORE_FADE = 5500
WARNING_MS_BEFORE_SILENCE = 5500
WEB_ZOOM_FACTOR = 1.2

View File

@ -15,6 +15,19 @@ else:
dbname = MYSQL_CONNECT.split("/")[-1]
log.debug(f"Database: {dbname}")
# MM_ENV = os.environ.get('MM_ENV', 'PRODUCTION')
# testing = False
# if MM_ENV == 'TESTING':
# dbname = os.environ.get('MM_TESTING_DBNAME', 'musicmuster_testing')
# dbuser = os.environ.get('MM_TESTING_DBUSER', 'musicmuster_testing')
# dbpw = os.environ.get('MM_TESTING_DBPW', 'musicmuster_testing')
# dbhost = os.environ.get('MM_TESTING_DBHOST', 'localhost')
# testing = True
# else:
# raise ValueError(f"Unknown MusicMuster environment: {MM_ENV=}")
#
# MYSQL_CONNECT = f"mysql+mysqldb://{dbuser}:{dbpw}@{dbhost}/{dbname}"
engine = create_engine(
MYSQL_CONNECT,
echo=Config.DISPLAY_SQL,
@ -30,9 +43,12 @@ def Session() -> Generator[scoped_session, None, None]:
file = frame.filename
function = frame.function
lineno = frame.lineno
Session = scoped_session(sessionmaker(bind=engine))
log.debug(f"Session acquired: {file}:{function}:{lineno} " f"[{hex(id(Session))}]")
Session = scoped_session(sessionmaker(bind=engine, future=True))
log.debug(f"SqlA: session acquired [{hex(id(Session))}]")
log.debug(
f"Session acquisition: {file}:{function}:{lineno} " f"[{hex(id(Session))}]"
)
yield Session
log.debug(f" Session released [{hex(id(Session))}]")
log.debug(f" SqlA: session released [{hex(id(Session))}]")
Session.commit()
Session.close()

View File

@ -1,196 +0,0 @@
from typing import Optional
from PyQt6.QtCore import QEvent, Qt
from PyQt6.QtWidgets import QDialog, QListWidgetItem
from classes import MusicMusterSignals
from dbconfig import scoped_session
from helpers import (
ask_yes_no,
get_relative_date,
ms_to_mmss,
)
from models import Settings, Tracks
from playlistmodel import PlaylistModel
from ui.dlg_TrackSelect_ui import Ui_Dialog # type: ignore
class TrackSelectDialog(QDialog):
"""Select track from database"""
def __init__(
self,
session: scoped_session,
new_row_number: int,
model: PlaylistModel,
add_to_header: Optional[bool] = False,
*args,
**kwargs,
) -> None:
"""
Subclassed QDialog to manage track selection
"""
super().__init__(*args, **kwargs)
self.session = session
self.new_row_number = new_row_number
self.model = model
self.add_to_header = add_to_header
self.ui = Ui_Dialog()
self.ui.setupUi(self)
self.ui.btnAdd.clicked.connect(self.add_selected)
self.ui.btnAddClose.clicked.connect(self.add_selected_and_close)
self.ui.btnClose.clicked.connect(self.close)
self.ui.matchList.itemDoubleClicked.connect(self.add_selected)
self.ui.matchList.itemSelectionChanged.connect(self.selection_changed)
self.ui.radioTitle.toggled.connect(self.title_artist_toggle)
self.ui.searchString.textEdited.connect(self.chars_typed)
self.track: Optional[Tracks] = None
self.signals = MusicMusterSignals()
record = Settings.get_int_settings(self.session, "dbdialog_width")
width = record.f_int or 800
record = Settings.get_int_settings(self.session, "dbdialog_height")
height = record.f_int or 600
self.resize(width, height)
def add_selected(self) -> None:
"""Handle Add button"""
track = None
if self.ui.matchList.selectedItems():
item = self.ui.matchList.currentItem()
if item:
track = item.data(Qt.ItemDataRole.UserRole)
note = self.ui.txtNote.text()
if not note and not track:
return
self.ui.txtNote.clear()
self.select_searchtext()
track_id = None
if track:
track_id = track.id
else:
return
# Check whether track is already in playlist
move_existing = False
existing_prd = self.model.is_track_in_playlist(track_id)
if existing_prd is not None:
if ask_yes_no(
"Duplicate row",
"Track already in playlist. " "Move to new location?",
default_yes=True,
):
move_existing = True
if self.add_to_header and existing_prd: # "and existing_prd" for mypy's benefit
if move_existing:
self.model.move_track_to_header(self.new_row_number, existing_prd, note)
else:
self.model.add_track_to_header(self.new_row_number, track_id)
# Close dialog - we can only add one track to a header
self.accept()
else:
if move_existing and existing_prd: # "and existing_prd" for mypy's benefit
self.model.move_track_add_note(self.new_row_number, existing_prd, note)
else:
self.model.insert_row(self.new_row_number, track_id, note)
def add_selected_and_close(self) -> None:
"""Handle Add and Close button"""
self.add_selected()
self.accept()
def chars_typed(self, s: str) -> None:
"""Handle text typed in search box"""
self.ui.matchList.clear()
if len(s) > 0:
if self.ui.radioTitle.isChecked():
matches = Tracks.search_titles(self.session, "%" + s)
else:
matches = Tracks.search_artists(self.session, "%" + s)
if matches:
for track in matches:
last_played = None
last_playdate = max(
track.playdates, key=lambda p: p.lastplayed, default=None
)
if last_playdate:
last_played = last_playdate.lastplayed
t = QListWidgetItem()
track_text = (
f"{track.title} - {track.artist} "
f"[{ms_to_mmss(track.duration)}] "
f"({get_relative_date(last_played)})"
)
t.setText(track_text)
t.setData(Qt.ItemDataRole.UserRole, track)
self.ui.matchList.addItem(t)
def closeEvent(self, event: Optional[QEvent]) -> None:
"""
Override close and save dialog coordinates
"""
if not event:
return
record = Settings.get_int_settings(self.session, "dbdialog_height")
if record.f_int != self.height():
record.update(self.session, {"f_int": self.height()})
record = Settings.get_int_settings(self.session, "dbdialog_width")
if record.f_int != self.width():
record.update(self.session, {"f_int": self.width()})
event.accept()
def keyPressEvent(self, event):
"""
Clear selection on ESC if there is one
"""
if event.key() == Qt.Key.Key_Escape:
if self.ui.matchList.selectedItems():
self.ui.matchList.clearSelection()
return
super(TrackSelectDialog, self).keyPressEvent(event)
def select_searchtext(self) -> None:
"""Select the searchbox"""
self.ui.searchString.selectAll()
self.ui.searchString.setFocus()
def selection_changed(self) -> None:
"""Display selected track path in dialog box"""
if not self.ui.matchList.selectedItems():
return
item = self.ui.matchList.currentItem()
track = item.data(Qt.ItemDataRole.UserRole)
last_playdate = max(track.playdates, key=lambda p: p.lastplayed, default=None)
if last_playdate:
last_played = last_playdate.lastplayed
else:
last_played = None
path_text = f"{track.path} ({get_relative_date(last_played)})"
self.ui.dbPath.setText(path_text)
def title_artist_toggle(self) -> None:
"""
Handle switching between searching for artists and searching for
titles
"""
# Logic is handled already in chars_typed(), so just call that.
self.chars_typed(self.ui.searchString.text())

View File

@ -1,29 +1,21 @@
from datetime import datetime
from email.message import EmailMessage
from typing import Any, Dict, Optional
import functools
import os
import psutil
import re
import shutil
import smtplib
import ssl
import tempfile
from config import Config
from datetime import datetime
from email.message import EmailMessage
from log import log
from mutagen.flac import FLAC # type: ignore
from mutagen.mp3 import MP3 # type: ignore
from pydub import AudioSegment, effects
from pydub.utils import mediainfo
from PyQt6.QtWidgets import QMainWindow, QMessageBox
from PyQt6.QtWidgets import QMainWindow, QMessageBox # type: ignore
from tinytag import TinyTag # type: ignore
from config import Config
from log import log
start_time_re = re.compile(r"@\d\d:\d\d")
# Classes are defined after global functions so that classes can use
# those functions.
from typing import Any, Dict, Optional
def ask_yes_no(title: str, question: str, default_yes: bool = False) -> bool:
@ -98,47 +90,20 @@ def get_audio_segment(path: str) -> Optional[AudioSegment]:
return None
def get_embedded_time(text: str) -> Optional[datetime]:
"""Return datetime specified as @hh:mm in text"""
def get_tags(path: str) -> Dict[str, Any]:
"""
Return a dictionary of title, artist, duration-in-milliseconds and path.
"""
try:
match = start_time_re.search(text)
except TypeError:
return None
if not match:
return None
tag = TinyTag.get(path)
try:
return datetime.strptime(match.group(0)[1:], Config.NOTE_TIME_FORMAT)
except ValueError:
return None
def get_file_metadata(filepath: str) -> dict:
"""Return track metadata"""
# Get title, artist, bitrate, duration, path
metadata: Dict[str, str | int | float] = get_tags(filepath)
metadata["mtime"] = os.path.getmtime(filepath)
# Set start_gap, fade_at and silence_at
audio = get_audio_segment(filepath)
if not audio:
audio_values = dict(start_gap=0, fade_at=0, silence_at=0)
else:
audio_values = dict(
start_gap=leading_silence(audio),
fade_at=int(
round(fade_point(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000
),
silence_at=int(
round(trailing_silence(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000
),
return dict(
title=tag.title,
artist=tag.artist,
bitrate=round(tag.bitrate),
duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000),
path=path,
)
metadata |= audio_values
return metadata
def get_relative_date(
@ -171,7 +136,7 @@ def get_relative_date(
weeks, days = divmod((reference_date.date() - past_date.date()).days, 7)
if weeks == days == 0:
# Same day so return time instead
return Config.LAST_PLAYED_TODAY_STRING + " " + past_date.strftime("%H:%M")
return past_date.strftime("%H:%M")
if weeks == 1:
weeks_str = "week"
else:
@ -183,20 +148,33 @@ def get_relative_date(
return f"{weeks} {weeks_str}, {days} {days_str} ago"
def get_tags(path: str) -> Dict[str, Any]:
"""
Return a dictionary of title, artist, duration-in-milliseconds and path.
"""
def get_file_metadata(filepath: str) -> dict:
"""Return track metadata"""
tag = TinyTag.get(path)
# Get title, artist, bitrate, duration, path
metadata: Dict[str, str | int | float] = get_tags(filepath)
return dict(
title=tag.title,
artist=tag.artist,
bitrate=round(tag.bitrate),
duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000),
path=path,
metadata['mtime'] = os.path.getmtime(filepath)
# Set start_gap, fade_at and silence_at
audio = get_audio_segment(filepath)
if not audio:
audio_values = dict(
start_gap=0,
fade_at=0,
silence_at=0
)
else:
audio_values = dict(
start_gap=leading_silence(audio),
fade_at=int(round(fade_point(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000),
silence_at=int(
round(trailing_silence(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000
)
)
metadata |= audio_values
return metadata
def leading_silence(
@ -226,12 +204,33 @@ def leading_silence(
return min(trim_ms, len(audio_segment))
def ms_to_mmss(
ms: Optional[int],
decimals: int = 0,
negative: bool = False,
none: Optional[str] = None,
) -> str:
def send_mail(to_addr, from_addr, subj, body):
# From https://docs.python.org/3/library/email.examples.html
# Create a text/plain message
msg = EmailMessage()
msg.set_content(body)
msg["Subject"] = subj
msg["From"] = from_addr
msg["To"] = to_addr
# Send the message via SMTP server.
context = ssl.create_default_context()
try:
s = smtplib.SMTP(host=Config.MAIL_SERVER, port=Config.MAIL_PORT)
if Config.MAIL_USE_TLS:
s.starttls(context=context)
if Config.MAIL_USERNAME and Config.MAIL_PASSWORD:
s.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
s.send_message(msg)
except Exception as e:
print(e)
finally:
s.quit()
def ms_to_mmss(ms: Optional[int], decimals: int = 0, negative: bool = False) -> str:
"""Convert milliseconds to mm:ss"""
minutes: int
@ -239,9 +238,6 @@ def ms_to_mmss(
seconds: float
if not ms:
if none:
return none
else:
return "-"
sign = ""
if ms < 0:
@ -364,32 +360,6 @@ def open_in_audacity(path: str) -> bool:
return True
def send_mail(to_addr, from_addr, subj, body):
# From https://docs.python.org/3/library/email.examples.html
# Create a text/plain message
msg = EmailMessage()
msg.set_content(body)
msg["Subject"] = subj
msg["From"] = from_addr
msg["To"] = to_addr
# Send the message via SMTP server.
context = ssl.create_default_context()
try:
s = smtplib.SMTP(host=Config.MAIL_SERVER, port=Config.MAIL_PORT)
if Config.MAIL_USE_TLS:
s.starttls(context=context)
if Config.MAIL_USERNAME and Config.MAIL_PASSWORD:
s.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
s.send_message(msg)
except Exception as e:
print(e)
finally:
s.quit()
def set_track_metadata(track):
"""Set/update track metadata in database"""
@ -411,22 +381,6 @@ def show_warning(parent: QMainWindow, title: str, msg: str) -> None:
QMessageBox.warning(parent, title, msg, buttons=QMessageBox.StandardButton.Cancel)
def singleton(cls):
"""
Make a class a Singleton class (see
https://realpython.com/primer-on-python-decorators/#creating-singletons)
"""
@functools.wraps(cls)
def wrapper_singleton(*args, **kwargs):
if not wrapper_singleton.instance:
wrapper_singleton.instance = cls(*args, **kwargs)
return wrapper_singleton.instance
wrapper_singleton.instance = None
return wrapper_singleton
def trailing_silence(
audio_segment: AudioSegment,
silence_threshold: int = -50,

1
app/icons_rc.py Symbolic link
View File

@ -0,0 +1 @@
ui/icons_rc.py

View File

@ -8,8 +8,6 @@ from PyQt6.QtWebEngineWidgets import QWebEngineView
from PyQt6.QtWidgets import QTabWidget
from config import Config
from classes import MusicMusterSignals
class InfoTabs(QTabWidget):
"""
@ -19,9 +17,7 @@ class InfoTabs(QTabWidget):
def __init__(self, parent=None) -> None:
super().__init__(parent)
self.signals = MusicMusterSignals()
self.signals.search_songfacts_signal.connect(self.open_in_songfacts)
self.signals.search_wikipedia_signal.connect(self.open_in_wikipedia)
# Dictionary to record when tabs were last updated (so we can
# re-use the oldest one later)
self.last_update: Dict[QWebEngineView, datetime] = {}
self.tabtitles: Dict[int, str] = {}

View File

@ -6,13 +6,11 @@ from config import Config
from dbconfig import scoped_session
from datetime import datetime
from pprint import pprint
from typing import List, Optional, Sequence
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy import (
bindparam,
Boolean,
DateTime,
delete,
@ -26,6 +24,7 @@ from sqlalchemy import (
from sqlalchemy.orm import (
DeclarativeBase,
joinedload,
lazyload,
Mapped,
mapped_column,
relationship,
@ -50,9 +49,9 @@ class Carts(Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
cart_number: Mapped[int] = mapped_column(unique=True)
name: Mapped[str] = mapped_column(String(256), index=True)
duration: Mapped[Optional[int]] = mapped_column(index=True)
path: Mapped[Optional[str]] = mapped_column(String(2048), index=False)
enabled: Mapped[Optional[bool]] = mapped_column(default=False)
duration: Mapped[int] = mapped_column(index=True)
path: Mapped[str] = mapped_column(String(2048), index=False)
enabled: Mapped[bool] = mapped_column(default=False)
def __repr__(self) -> str:
return (
@ -64,7 +63,7 @@ class Carts(Base):
self,
session: scoped_session,
cart_number: int,
name: str,
name: Optional[str] = None,
duration: Optional[int] = None,
path: Optional[str] = None,
enabled: bool = True,
@ -98,34 +97,6 @@ class NoteColours(Base):
f"colour={self.colour}>"
)
def __init__(
self,
session: scoped_session,
substring: str,
colour: str,
enabled: bool = True,
is_regex: bool = False,
is_casesensitive: bool = False,
order: Optional[int] = 0,
) -> None:
self.substring = substring
self.colour = colour
self.enabled = enabled
self.is_regex = is_regex
self.is_casesensitive = is_casesensitive
self.order = order
session.add(self)
session.flush()
@classmethod
def get_all(cls, session: scoped_session) -> Sequence["NoteColours"]:
"""
Return all records
"""
return session.scalars(select(cls)).all()
@staticmethod
def get_colour(session: scoped_session, text: str) -> Optional[str]:
"""
@ -135,11 +106,15 @@ class NoteColours(Base):
if not text:
return None
for rec in session.scalars(
for rec in (
session.execute(
select(NoteColours)
.filter(NoteColours.enabled.is_(True))
.order_by(NoteColours.order)
).all():
)
.scalars()
.all()
):
if rec.is_regex:
flags = re.UNICODE
if not rec.is_casesensitive:
@ -200,11 +175,15 @@ class Playdates(Base):
def played_after(session: scoped_session, since: datetime) -> Sequence["Playdates"]:
"""Return a list of Playdates objects since passed time"""
return session.scalars(
return (
session.execute(
select(Playdates)
.where(Playdates.lastplayed >= since)
.order_by(Playdates.lastplayed)
).all()
)
.scalars()
.all()
)
class Playlists(Base):
@ -217,8 +196,7 @@ class Playlists(Base):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(32), unique=True)
last_used: Mapped[Optional[datetime]] = mapped_column(DateTime, default=None)
tab: Mapped[Optional[int]] = mapped_column(default=None)
open: Mapped[bool] = mapped_column(default=False)
tab: Mapped[Optional[int]] = mapped_column(default=None, unique=True)
is_template: Mapped[bool] = mapped_column(default=False)
deleted: Mapped[bool] = mapped_column(default=False)
rows: Mapped[List["PlaylistRows"]] = relationship(
@ -231,7 +209,7 @@ class Playlists(Base):
def __repr__(self) -> str:
return (
f"<Playlists(id={self.id}, name={self.name}, "
f"is_templatee={self.is_template}, open={self.open}>"
f"is_templatee={self.is_template}>"
)
def __init__(self, session: scoped_session, name: str):
@ -239,10 +217,19 @@ class Playlists(Base):
session.add(self)
session.flush()
def close(self) -> None:
def close(self, session: scoped_session) -> None:
"""Mark playlist as unloaded"""
self.open = False
closed_idx = self.tab
self.tab = None
# Closing this tab will mean all higher-number tabs have moved
# down by one
session.execute(
update(Playlists)
.where(Playlists.tab > closed_idx)
.values(tab=Playlists.tab - 1)
)
@classmethod
def create_playlist_from_template(
@ -272,61 +259,78 @@ class Playlists(Base):
def get_all(cls, session: scoped_session) -> Sequence["Playlists"]:
"""Returns a list of all playlists ordered by last use"""
return session.scalars(
return (
session.execute(
select(cls)
.filter(cls.is_template.is_(False))
.order_by(cls.last_used.desc())
).all()
.order_by(cls.tab.desc(), cls.last_used.desc())
)
.scalars()
.all()
)
@classmethod
def get_all_templates(cls, session: scoped_session) -> Sequence["Playlists"]:
"""Returns a list of all templates ordered by name"""
return session.scalars(
return (
session.execute(
select(cls).filter(cls.is_template.is_(True)).order_by(cls.name)
).all()
)
.scalars()
.all()
)
@classmethod
def get_closed(cls, session: scoped_session) -> Sequence["Playlists"]:
"""Returns a list of all closed playlists ordered by last use"""
return session.scalars(
return (
session.execute(
select(cls)
.filter(
cls.open.is_(False),
cls.tab.is_(None),
cls.is_template.is_(False),
cls.deleted.is_(False),
)
.order_by(cls.last_used.desc())
).all()
)
.scalars()
.all()
)
@classmethod
def get_open(cls, session: scoped_session) -> Sequence[Optional["Playlists"]]:
"""
Return a list of loaded playlists ordered by tab.
"""
return session.scalars(
select(cls).where(cls.open.is_(True)).order_by(cls.tab)
).all()
def mark_open(self) -> None:
"""Mark playlist as loaded and used now"""
self.open = True
self.last_used = datetime.now()
@staticmethod
def name_is_available(session: scoped_session, name: str) -> bool:
"""
Return True if no playlist of this name exists else false.
Return a list of loaded playlists ordered by tab order.
"""
return (
session.execute(select(Playlists).where(Playlists.name == name)).first()
is None
session.execute(select(cls).where(cls.tab.is_not(None)).order_by(cls.tab))
.scalars()
.all()
)
def mark_open(self, session: scoped_session, tab_index: int) -> None:
"""Mark playlist as loaded and used now"""
self.tab = tab_index
self.last_used = datetime.now()
@staticmethod
def move_tab(session: scoped_session, frm: int, to: int) -> None:
"""Move tabs"""
row_frm = session.execute(select(Playlists).filter_by(tab=frm)).scalar_one()
row_to = session.execute(select(Playlists).filter_by(tab=to)).scalar_one()
row_frm.tab = None
row_to.tab = None
session.commit()
row_to.tab = frm
row_frm.tab = to
def rename(self, session: scoped_session, new_name: str) -> None:
"""
Rename playlist
@ -381,9 +385,9 @@ class PlaylistRows(Base):
self,
session: scoped_session,
playlist_id: int,
track_id: Optional[int],
row_number: int,
note: str = "",
track_id: Optional[int] = None,
) -> None:
"""Create PlaylistRows object"""
@ -407,39 +411,33 @@ class PlaylistRows(Base):
def copy_playlist(session: scoped_session, src_id: int, dst_id: int) -> None:
"""Copy playlist entries"""
src_rows = session.scalars(
src_rows = (
session.execute(
select(PlaylistRows).filter(PlaylistRows.playlist_id == src_id)
).all()
)
.scalars()
.all()
)
for plr in src_rows:
PlaylistRows(
session=session,
playlist_id=dst_id,
row_number=plr.plr_rownum,
note=plr.note,
track_id=plr.track_id,
)
PlaylistRows(session, dst_id, plr.track_id, plr.plr_rownum, plr.note)
@classmethod
def deep_row(
cls, session: scoped_session, playlist_id: int, row_number: int
) -> "PlaylistRows":
@staticmethod
def delete_higher_rows(
session: scoped_session, playlist_id: int, maxrow: int
) -> None:
"""
Return a playlist row that includes full track and lastplayed data for
given playlist_id and row
Delete rows in given playlist that have a higher row number
than 'maxrow'
"""
stmt = (
select(PlaylistRows)
.options(joinedload(cls.track))
.where(
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.plr_rownum == row_number,
PlaylistRows.plr_rownum > maxrow,
)
# .options(joinedload(Tracks.playdates))
)
return session.execute(stmt).unique().scalar_one()
session.flush()
@classmethod
def deep_rows(
@ -460,47 +458,21 @@ class PlaylistRows(Base):
return session.scalars(stmt).unique().all()
@staticmethod
def delete_higher_rows(
session: scoped_session, playlist_id: int, maxrow: int
) -> None:
"""
Delete rows in given playlist that have a higher row number
than 'maxrow'
"""
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.plr_rownum > maxrow,
)
)
session.flush()
@staticmethod
def delete_row(session: scoped_session, playlist_id: int, row_number: int) -> None:
"""
Delete passed row in given playlist.
"""
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.plr_rownum == row_number,
)
)
@staticmethod
def fixup_rownumbers(session: scoped_session, playlist_id: int) -> None:
"""
Ensure the row numbers for passed playlist have no gaps
"""
plrs = session.scalars(
plrs = (
session.execute(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.plr_rownum)
).all()
)
.scalars()
.all()
)
for i, plr in enumerate(plrs):
plr.plr_rownum = i
@ -517,11 +489,15 @@ class PlaylistRows(Base):
PlaylistRows objects
"""
plrs = session.scalars(
plrs = (
session.execute(
select(cls)
.where(cls.playlist_id == playlist_id, cls.id.in_(plr_ids))
.order_by(cls.plr_rownum)
).all()
)
.scalars()
.all()
)
return plrs
@ -559,11 +535,15 @@ class PlaylistRows(Base):
have been played.
"""
plrs = session.scalars(
plrs = (
session.execute(
select(cls)
.where(cls.playlist_id == playlist_id, cls.played.is_(True))
.order_by(cls.plr_rownum)
).all()
)
.scalars()
.all()
)
return plrs
@ -588,7 +568,7 @@ class PlaylistRows(Base):
if to_row is not None:
query = query.where(cls.plr_rownum <= to_row)
plrs = session.scalars((query).order_by(cls.plr_rownum)).all()
plrs = session.execute((query).order_by(cls.plr_rownum)).scalars().all()
return plrs
@ -601,7 +581,8 @@ class PlaylistRows(Base):
have not been played.
"""
plrs = session.scalars(
plrs = (
session.execute(
select(cls)
.where(
cls.playlist_id == playlist_id,
@ -609,17 +590,13 @@ class PlaylistRows(Base):
cls.played.is_(False),
)
.order_by(cls.plr_rownum)
).all()
)
.scalars()
.all()
)
return plrs
@classmethod
def insert_row(
cls, session: scoped_session, playlist_id: int, new_row_number: int
) -> "PlaylistRows":
cls.move_rows_down(session, playlist_id, new_row_number, 1)
return cls(session, playlist_id, new_row_number)
@staticmethod
def move_rows_down(
session: scoped_session, playlist_id: int, starting_row: int, move_by: int
@ -638,26 +615,6 @@ class PlaylistRows(Base):
.values(plr_rownum=PlaylistRows.plr_rownum + move_by)
)
@staticmethod
def update_plr_rownumbers(
session: scoped_session, playlist_id: int, sqla_map: List[dict[str, int]]
) -> None:
"""
Take a {plrid: plr_rownum} dictionary and update the row numbers accordingly
"""
# Update database. Ref:
# https://docs.sqlalchemy.org/en/20/tutorial/data_update.html#the-update-sql-expression-construct
stmt = (
update(PlaylistRows)
.where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.id == bindparam("plrid"),
)
.values(plr_rownum=bindparam("plr_rownum"))
)
session.connection().execute(stmt, sqla_map)
class Settings(Base):
"""Manage settings"""
@ -671,10 +628,8 @@ class Settings(Base):
f_string: Mapped[Optional[str]] = mapped_column(String(128), default=None)
def __repr__(self) -> str:
return (
f"<Settings(id={self.id}, name={self.name}, "
f"f_datetime={self.f_datetime}, f_int={self.f_int}, f_string={self.f_string}>"
)
value = self.f_datetime or self.f_int or self.f_string
return f"<Settings(id={self.id}, name={self.name}, {value=}>"
def __init__(self, session: scoped_session, name: str):
self.name = name
@ -689,7 +644,7 @@ class Settings(Base):
result = {}
settings = session.scalars(select(cls)).all()
settings = session.execute(select(cls)).scalars().all()
for setting in settings:
result[setting.name] = setting
@ -705,7 +660,7 @@ class Settings(Base):
except NoResultFound:
return Settings(session, name)
def update(self, session: scoped_session, data: dict) -> None:
def update(self, session: scoped_session, data: dict):
for key, value in data.items():
assert hasattr(self, key)
setattr(self, key, value)
@ -752,7 +707,7 @@ class Tracks(Base):
fade_at: int,
silence_at: int,
mtime: int,
bitrate: int,
bitrate: int
):
self.path = path
self.title = title
@ -776,7 +731,7 @@ class Tracks(Base):
def get_all(cls, session) -> List["Tracks"]:
"""Return a list of all tracks"""
return session.scalars(select(cls)).unique().all()
return session.execute(select(cls)).scalars().unique().all()
@classmethod
def get_by_path(cls, session: scoped_session, path: str) -> Optional["Tracks"]:
@ -785,11 +740,9 @@ class Tracks(Base):
"""
try:
return (
session.execute(select(Tracks).where(Tracks.path == path))
.unique()
.scalar_one()
)
return session.execute(
select(Tracks).where(Tracks.path == path)
).unique().scalar_one()
except NoResultFound:
return None
@ -804,12 +757,13 @@ class Tracks(Base):
"""
return (
session.scalars(
session.execute(
select(cls)
.options(joinedload(Tracks.playdates))
.where(cls.artist.ilike(f"%{text}%"))
.order_by(cls.title)
)
.scalars()
.unique()
.all()
)
@ -824,12 +778,13 @@ class Tracks(Base):
https://docs.sqlalchemy.org/en/20/orm/queryguide/relationships.html#joined-eager-loading
"""
return (
session.scalars(
session.execute(
select(cls)
.options(joinedload(Tracks.playdates))
.where(cls.title.like(f"{text}%"))
.order_by(cls.title)
)
.scalars()
.unique()
.all()
)

View File

@ -1,6 +1,8 @@
# import os
import threading
import vlc # type: ignore
#
from config import Config
from helpers import file_is_unreadable
from typing import Optional
@ -8,7 +10,7 @@ from time import sleep
from log import log
from PyQt6.QtCore import (
from PyQt6.QtCore import ( # type: ignore
QRunnable,
QThreadPool,
)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,6 +1,6 @@
# Form implementation generated from reading ui file 'dlg_TrackSelect.ui'
# Form implementation generated from reading ui file 'app/ui/dlg_SearchDatabase.ui'
#
# Created by: PyQt6 UI code generator 6.5.3
# Created by: PyQt6 UI code generator 6.5.2
#
# WARNING: Any manual changes made to this file will be lost when pyuic6 is
# run again. Do not edit this file unless you know what you are doing.

View File

@ -785,6 +785,11 @@ padding-left: 8px;</string>
<string>&amp;Search</string>
</property>
<addaction name="actionSearch"/>
<addaction name="actionFind_next"/>
<addaction name="actionFind_previous"/>
<addaction name="separator"/>
<addaction name="actionSelect_next_track"/>
<addaction name="actionSelect_previous_track"/>
<addaction name="separator"/>
<addaction name="actionSearch_title_in_Wikipedia"/>
<addaction name="actionSearch_title_in_Songfacts"/>

View File

@ -1,6 +1,6 @@
# Form implementation generated from reading ui file 'app/ui/main_window.ui'
#
# Created by: PyQt6 UI code generator 6.6.0
# Created by: PyQt6 UI code generator 6.5.3
#
# WARNING: Any manual changes made to this file will be lost when pyuic6 is
# run again. Do not edit this file unless you know what you are doing.
@ -492,6 +492,11 @@ class Ui_MainWindow(object):
self.menuPlaylist.addAction(self.actionMark_for_moving)
self.menuPlaylist.addAction(self.actionPaste)
self.menuSearc_h.addAction(self.actionSearch)
self.menuSearc_h.addAction(self.actionFind_next)
self.menuSearc_h.addAction(self.actionFind_previous)
self.menuSearc_h.addSeparator()
self.menuSearc_h.addAction(self.actionSelect_next_track)
self.menuSearc_h.addAction(self.actionSelect_previous_track)
self.menuSearc_h.addSeparator()
self.menuSearc_h.addAction(self.actionSearch_title_in_Wikipedia)
self.menuSearc_h.addAction(self.actionSearch_title_in_Songfacts)

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
# #!/usr/bin/env python
#
import os

View File

@ -1,190 +0,0 @@
#!/usr/bin/python3
# vim: set expandtab tabstop=4 shiftwidth=4:
# PyQt Functionality Snippet by Apocalyptech
# "Licensed" in the Public Domain under CC0 1.0 Universal (CC0 1.0)
# Public Domain Dedication. Use it however you like!
#
# https://creativecommons.org/publicdomain/zero/1.0/
# https://creativecommons.org/publicdomain/zero/1.0/legalcode
from PyQt6 import QtWidgets, QtCore
# class MyModel(QtGui.QStandardItemModel):
class MyModel(QtCore.QAbstractTableModel):
def __init__(self, parent=None):
super().__init__(parent)
def columnCount(self, parent=None):
return 5
def rowCount(self, parent=None):
return 20
# def headerData(self, column: int, orientation, role: QtCore.Qt.ItemDataRole):
# return (('Regex', 'Category')[column]
# if role == QtCore.Qt.DisplayRole and orientation == QtCore.Qt.Horizontal
# else None)
def headerData(self, column, orientation, role):
if role == QtCore.Qt.ItemDataRole.DisplayRole and orientation == QtCore.Qt.Orientation.Horizontal:
return f"{column=}"
return None
def data(self, index: QtCore.QModelIndex, role: QtCore.Qt.ItemDataRole):
if not index.isValid() or role not in {
QtCore.Qt.ItemDataRole.DisplayRole,
QtCore.Qt.ItemDataRole.EditRole,
}:
return None
# return (self._data[index.row()][index.column()] if index.row() < len(self._data) else
# "edit me" if role == QtCore.Qt.DisplayRole else "")
# def data(self, index, role):
# if not index.isValid() or role not in [QtCore.Qt.DisplayRole,
# QtCore.Qt.EditRole]:
# return None
# return (self._data[index.row()][index.column()] if index.row() < len(self._data) else
# "edit me" if role == QtCore.Qt.DisplayRole else "")
row = index.row()
column = index.column()
return f"Row {row}, Col {column}"
def flags(self, index: QtCore.QModelIndex) -> QtCore.Qt.ItemFlag:
# https://doc.qt.io/qt-5/qt.html#ItemFlag-enum
if not index.isValid():
return QtCore.Qt.ItemFlag.ItemIsEnabled
if index.row() < 20:
return (
QtCore.Qt.ItemFlag.ItemIsEnabled
| QtCore.Qt.ItemFlag.ItemIsEditable
| QtCore.Qt.ItemFlag.ItemIsSelectable
| QtCore.Qt.ItemFlag.ItemIsDragEnabled
)
return QtCore.Qt.ItemFlag.ItemIsEnabled | QtCore.Qt.ItemFlag.ItemIsEditable
# def flags(self, index):
# if not index.isValid():
# return QtCore.Qt.ItemIsDropEnabled
# if index.row() < 5:
# return (
# QtCore.Qt.ItemIsEnabled
# | QtCore.Qt.ItemIsEditable
# | QtCore.Qt.ItemIsSelectable
# | QtCore.Qt.ItemIsDragEnabled
# )
# return QtCore.Qt.ItemIsEnabled | QtCore.Qt.ItemIsEditable
# def supportedDragOptions(self):
# return QtCore.Qt.MoveAction | QtCore.Qt.CopyAction
# def supportedDropActions(self) -> bool:
# return QtCore.Qt.MoveAction | QtCore.Qt.CopyAction
def relocateRow(self, row_source, row_target) -> None:
return
row_a, row_b = max(row_source, row_target), min(row_source, row_target)
self.beginMoveRows(
QtCore.QModelIndex(), row_a, row_a, QtCore.QModelIndex(), row_b
)
self._data.insert(row_target, self._data.pop(row_source))
self.endMoveRows()
def supportedDropActions(self):
return QtCore.Qt.DropAction.MoveAction | QtCore.Qt.DropAction.CopyAction
# def relocateRow(self, src, dst):
# print("relocateRow")
# def dropMimeData(self, data, action, row, col, parent):
# """
# Always move the entire row, and don't allow column "shifting"
# """
# # return super().dropMimeData(data, action, row, 0, parent)
# print("dropMimeData")
# super().dropMimeData(data, action, row, col, parent)
class MyStyle(QtWidgets.QProxyStyle):
def drawPrimitive(self, element, option, painter, widget=None):
"""
Draw a line across the entire row rather than just the column
we're hovering over. This may not always work depending on global
style - for instance I think it won't work on OSX.
"""
if element == QtWidgets.QStyle.PrimitiveElement.PE_IndicatorItemViewItemDrop and not option.rect.isNull():
option_new = QtWidgets.QStyleOption(option)
option_new.rect.setLeft(0)
if widget:
option_new.rect.setRight(widget.width())
option = option_new
super().drawPrimitive(element, option, painter, widget)
class MyTableView(QtWidgets.QTableView):
def __init__(self, parent):
super().__init__(parent)
self.verticalHeader().hide()
self.setSelectionBehavior(QtWidgets.QAbstractItemView.SelectionBehavior.SelectRows)
self.setSelectionMode(QtWidgets.QAbstractItemView.SelectionMode.ExtendedSelection)
self.setDragDropMode(QtWidgets.QAbstractItemView.DragDropMode.InternalMove)
self.setDragDropOverwriteMode(False)
self.setAcceptDrops(True)
# self.horizontalHeader().hide()
# self.horizontalHeader().setSectionResizeMode(QtWidgets.QHeaderView.Stretch)
# self.setShowGrid(False)
# Set our custom style - this draws the drop indicator across the whole row
self.setStyle(MyStyle())
# Set our custom model - this prevents row "shifting"
# self.model = MyModel()
# self.setModel(self.model)
self.setModel(MyModel())
# for (idx, data) in enumerate(['foo', 'bar', 'baz']):
# item_1 = QtGui.QStandardItem('Item {}'.format(idx))
# item_1.setEditable(False)
# item_1.setDropEnabled(False)
# item_2 = QtGui.QStandardItem(data)
# item_2.setEditable(False)
# item_2.setDropEnabled(False)
# self.model.appendRow([item_1, item_2])
def dropEvent(self, event):
if event.source() is not self or (
event.dropAction() != QtCore.Qt.DropAction.MoveAction
and self.dragDropMode() != QtWidgets.QAbstractItemView.InternalMove
):
super().dropEvent(event)
from_rows = list(set([a.row() for a in self.selectedIndexes()]))
to_row = self.indexAt(event.position().toPoint()).row()
if (
0 <= min(from_rows) <= self.model().rowCount()
and 0 <= max(from_rows) <= self.model().rowCount()
and 0 <= to_row <= self.model().rowCount()
):
print(f"move_rows({from_rows=}, {to_row=})")
event.accept()
super().dropEvent(event)
class Testing(QtWidgets.QMainWindow):
def __init__(self):
super().__init__()
view = MyTableView(self)
view.setModel(MyModel())
self.setCentralWidget(view)
self.show()
if __name__ == "__main__":
app = QtWidgets.QApplication([])
test = Testing()
raise SystemExit(app.exec())

View File

@ -1,84 +0,0 @@
#!/usr/bin/env python3
from sqlalchemy import create_engine, String, update, bindparam, case
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
sessionmaker,
scoped_session,
)
from typing import Generator
from contextlib import contextmanager
db_url = "sqlite:////tmp/rhys.db"
class Base(DeclarativeBase):
pass
class Rhys(Base):
__tablename__ = "rhys"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
ref_number: Mapped[int] = mapped_column()
name: Mapped[str] = mapped_column(String(256), index=True)
def __init__(self, session, ref_number: int, name: str) -> None:
self.ref_number = ref_number
self.name = name
session.add(self)
session.flush()
@contextmanager
def Session() -> Generator[scoped_session, None, None]:
Session = scoped_session(sessionmaker(bind=engine))
yield Session
Session.commit()
Session.close()
engine = create_engine(db_url)
Base.metadata.create_all(engine)
inital_number_of_records = 10
def move_rows(session):
new_row = 6
with Session() as session:
# new_record = Rhys(session, new_row, f"new {new_row=}")
# Move rows
stmt = (
update(Rhys)
.where(Rhys.ref_number > new_row)
# .where(Rhys.id.in_(session.query(Rhys.id).order_by(Rhys.id.desc())))
.values({Rhys.ref_number: Rhys.ref_number + 1})
)
session.execute(stmt)
sqla_map = []
for k, v in zip(range(11), [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]):
sqla_map.append({"oldrow": k, "newrow": v})
# for a, b in sqla_map.items():
# print(f"{a} > {b}")
with Session() as session:
for a in range(inital_number_of_records):
_ = Rhys(session, a, f"record: {a}")
stmt = update(Rhys).values(
ref_number=case(
{item['oldrow']: item['newrow'] for item in sqla_map},
value=Rhys.ref_number
)
)
session.connection().execute(stmt, sqla_map)

View File

@ -1,98 +0,0 @@
#!/usr/bin/env python
# vim: set expandtab tabstop=4 shiftwidth=4:
# PyQt Functionality Snippet by Apocalyptech
# "Licensed" in the Public Domain under CC0 1.0 Universal (CC0 1.0)
# Public Domain Dedication. Use it however you like!
#
# https://creativecommons.org/publicdomain/zero/1.0/
# https://creativecommons.org/publicdomain/zero/1.0/legalcode
import sys
from PyQt5 import QtWidgets, QtGui, QtCore
class MyModel(QtGui.QStandardItemModel):
def dropMimeData(self, data, action, row, col, parent):
"""
Always move the entire row, and don't allow column "shifting"
"""
return super().dropMimeData(data, action, row, 0, parent)
class MyStyle(QtWidgets.QProxyStyle):
def drawPrimitive(self, element, option, painter, widget=None):
"""
Draw a line across the entire row rather than just the column
we're hovering over. This may not always work depending on global
style - for instance I think it won't work on OSX.
"""
if element == self.PE_IndicatorItemViewItemDrop and not option.rect.isNull():
option_new = QtWidgets.QStyleOption(option)
option_new.rect.setLeft(0)
if widget:
option_new.rect.setRight(widget.width())
option = option_new
super().drawPrimitive(element, option, painter, widget)
class MyTableView(QtWidgets.QTableView):
def __init__(self, parent):
super().__init__(parent)
self.verticalHeader().hide()
self.horizontalHeader().hide()
self.horizontalHeader().setSectionResizeMode(QtWidgets.QHeaderView.Stretch)
self.setSelectionBehavior(self.SelectRows)
self.setSelectionMode(self.SingleSelection)
self.setShowGrid(False)
self.setDragDropMode(self.InternalMove)
self.setDragDropOverwriteMode(False)
# Set our custom style - this draws the drop indicator across the whole row
self.setStyle(MyStyle())
# Set our custom model - this prevents row "shifting"
self.model = MyModel()
self.setModel(self.model)
for (idx, data) in enumerate(['foo', 'bar', 'baz']):
item_1 = QtGui.QStandardItem('Item {}'.format(idx))
item_1.setEditable(False)
item_1.setDropEnabled(False)
item_2 = QtGui.QStandardItem(data)
item_2.setEditable(False)
item_2.setDropEnabled(False)
self.model.appendRow([item_1, item_2])
class Testing(QtWidgets.QMainWindow):
def __init__(self):
super().__init__()
# Main widget
w = QtWidgets.QWidget()
l = QtWidgets.QVBoxLayout()
w.setLayout(l)
self.setCentralWidget(w)
# spacer
l.addWidget(QtWidgets.QLabel('top'), 1)
# Combo Box
l.addWidget(MyTableView(self))
# spacer
l.addWidget(QtWidgets.QLabel('bottom'), 1)
# A bit of window housekeeping
self.resize(400, 400)
self.setWindowTitle('Testing')
self.show()
if __name__ == '__main__':
app = QtWidgets.QApplication([])
test = Testing()
sys.exit(app.exec_())

BIN
archive/todo/.DS_Store vendored

Binary file not shown.

View File

@ -1 +0,0 @@
[[false, "My first todo"], [true, "My second todo"], [true, "Another todo"], [false, "as"]]

View File

@ -1,71 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>MainWindow</class>
<widget class="QMainWindow" name="MainWindow">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>275</width>
<height>314</height>
</rect>
</property>
<property name="windowTitle">
<string>Todo</string>
</property>
<widget class="QWidget" name="centralwidget">
<layout class="QVBoxLayout" name="verticalLayout">
<item>
<widget class="QListView" name="todoView">
<property name="selectionMode">
<enum>QAbstractItemView::SingleSelection</enum>
</property>
</widget>
</item>
<item>
<widget class="QWidget" name="widget" native="true">
<layout class="QHBoxLayout" name="horizontalLayout">
<item>
<widget class="QPushButton" name="deleteButton">
<property name="text">
<string>Delete</string>
</property>
</widget>
</item>
<item>
<widget class="QPushButton" name="completeButton">
<property name="text">
<string>Complete</string>
</property>
</widget>
</item>
</layout>
</widget>
</item>
<item>
<widget class="QLineEdit" name="todoEdit"/>
</item>
<item>
<widget class="QPushButton" name="addButton">
<property name="text">
<string>Add Todo</string>
</property>
</widget>
</item>
</layout>
</widget>
<widget class="QMenuBar" name="menubar">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>275</width>
<height>22</height>
</rect>
</property>
</widget>
<widget class="QStatusBar" name="statusbar"/>
</widget>
<resources/>
<connections/>
</ui>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 634 B

View File

@ -1,106 +0,0 @@
import sys
import datetime
import json
from PyQt5 import QtCore, QtGui, QtWidgets, uic
from PyQt5.QtCore import Qt
qt_creator_file = "mainwindow.ui"
Ui_MainWindow, QtBaseClass = uic.loadUiType(qt_creator_file)
tick = QtGui.QImage('tick.png')
class TodoModel(QtCore.QAbstractListModel):
def __init__(self, *args, todos=None, **kwargs):
super(TodoModel, self).__init__(*args, **kwargs)
self.todos = todos or []
def data(self, index, role):
if role == Qt.DisplayRole:
_, text = self.todos[index.row()]
return text
if role == Qt.DecorationRole:
status, _ = self.todos[index.row()]
if status:
return tick
def rowCount(self, index):
return len(self.todos)
def flags(self, index):
print(datetime.datetime.now().time().strftime("%H:%M:%S"))
return super().flags(index)
class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
def __init__(self):
QtWidgets.QMainWindow.__init__(self)
Ui_MainWindow.__init__(self)
self.setupUi(self)
self.model = TodoModel()
self.load()
self.todoView.setModel(self.model)
self.addButton.pressed.connect(self.add)
self.deleteButton.pressed.connect(self.delete)
self.completeButton.pressed.connect(self.complete)
def add(self):
"""
Add an item to our todo list, getting the text from the QLineEdit .todoEdit
and then clearing it.
"""
text = self.todoEdit.text()
if text: # Don't add empty strings.
# Access the list via the model.
self.model.todos.append((False, text))
# Trigger refresh.
self.model.layoutChanged.emit()
# Empty the input
self.todoEdit.setText("")
self.save()
def delete(self):
indexes = self.todoView.selectedIndexes()
if indexes:
# Indexes is a list of a single item in single-select mode.
index = indexes[0]
# Remove the item and refresh.
del self.model.todos[index.row()]
self.model.layoutChanged.emit()
# Clear the selection (as it is no longer valid).
self.todoView.clearSelection()
self.save()
def complete(self):
indexes = self.todoView.selectedIndexes()
if indexes:
index = indexes[0]
row = index.row()
status, text = self.model.todos[row]
self.model.todos[row] = (True, text)
# .dataChanged takes top-left and bottom right, which are equal
# for a single selection.
self.model.dataChanged.emit(index, index)
# Clear the selection (as it is no longer valid).
self.todoView.clearSelection()
self.save()
def load(self):
try:
with open('data.db', 'r') as f:
self.model.todos = json.load(f)
except Exception:
pass
def save(self):
with open('data.db', 'w') as f:
data = json.dump(self.model.todos, f)
app = QtWidgets.QApplication(sys.argv)
window = MainWindow()
window.show()
app.exec_()

View File

@ -1,49 +1,41 @@
# https://itnext.io/setting-up-transactional-tests-with-pytest-and-sqlalchemy-b2d726347629
import pytest
import helpers
# Flake8 doesn't like the sys.append within imports
# import sys
# sys.path.append("app")
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from app.models import Base, Tracks
DB_CONNECTION = "mysql+mysqldb://musicmuster_testing:musicmuster_testing@localhost/dev_musicmuster_testing"
@pytest.fixture(scope="session")
def connection():
engine = create_engine(
"mysql+mysqldb://musicmuster_testing:musicmuster_testing@"
"localhost/musicmuster_testing"
)
return engine.connect()
@pytest.fixture(scope="session")
def db_engine():
engine = create_engine(DB_CONNECTION, isolation_level="READ COMMITTED")
Base.metadata.create_all(engine)
yield engine
engine.dispose()
def setup_database(connection):
from app.models import Base # noqa E402
Base.metadata.bind = connection
Base.metadata.create_all()
# seed_database()
yield
Base.metadata.drop_all()
@pytest.fixture(scope="function")
def session(db_engine):
connection = db_engine.connect()
@pytest.fixture
def session(setup_database, connection):
transaction = connection.begin()
sm = sessionmaker(bind=connection)
session = scoped_session(sm)
# print(f"PyTest SqlA: session acquired [{hex(id(session))}]")
yield session
# print(f" PyTest SqlA: session released and cleaned up [{hex(id(session))}]")
session.remove()
yield scoped_session(
sessionmaker(autocommit=False, autoflush=False, bind=connection)
)
transaction.rollback()
connection.close()
@pytest.fixture(scope="function")
def track1(session):
track_path = "testdata/isa.mp3"
metadata = helpers.get_file_metadata(track_path)
track = Tracks(session, **metadata)
return track
@pytest.fixture(scope="function")
def track2(session):
track_path = "testdata/mom.mp3"
metadata = helpers.get_file_metadata(track_path)
track = Tracks(session, **metadata)
return track

View File

@ -1,60 +0,0 @@
"""Add 'open' field to Playlists
Revision ID: 5bb2c572e1e5
Revises: 3a53a9fb26ab
Create Date: 2023-11-18 14:19:02.643914
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '5bb2c572e1e5'
down_revision = '3a53a9fb26ab'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('carts', 'duration',
existing_type=mysql.INTEGER(display_width=11),
nullable=True)
op.alter_column('carts', 'path',
existing_type=mysql.VARCHAR(length=2048),
nullable=True)
op.alter_column('carts', 'enabled',
existing_type=mysql.TINYINT(display_width=1),
nullable=True)
op.alter_column('playlist_rows', 'note',
existing_type=mysql.VARCHAR(length=2048),
nullable=False)
op.add_column('playlists', sa.Column('open', sa.Boolean(), nullable=False))
op.alter_column('settings', 'name',
existing_type=mysql.VARCHAR(length=32),
type_=sa.String(length=64),
existing_nullable=False)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column('settings', 'name',
existing_type=sa.String(length=64),
type_=mysql.VARCHAR(length=32),
existing_nullable=False)
op.drop_column('playlists', 'open')
op.alter_column('playlist_rows', 'note',
existing_type=mysql.VARCHAR(length=2048),
nullable=True)
op.alter_column('carts', 'enabled',
existing_type=mysql.TINYINT(display_width=1),
nullable=False)
op.alter_column('carts', 'path',
existing_type=mysql.VARCHAR(length=2048),
nullable=False)
op.alter_column('carts', 'duration',
existing_type=mysql.INTEGER(display_width=11),
nullable=False)
# ### end Alembic commands ###

913
poetry.lock generated

File diff suppressed because it is too large Load Diff

BIN
prof/combined.prof Normal file

Binary file not shown.

3833
prof/combined.svg Normal file

File diff suppressed because it is too large Load Diff

After

Width:  |  Height:  |  Size: 262 KiB

Binary file not shown.

View File

@ -36,13 +36,12 @@ line-profiler = "^4.0.2"
flakehell = "^0.9.0"
[tool.poetry.group.dev.dependencies]
pudb = "^2023.1"
pudb = "^2022.1.3"
sphinx = "^7.0.1"
furo = "^2023.5.20"
black = "^23.3.0"
flakehell = "^0.9.0"
mypy = "^1.7.0"
pdbp = "^1.5.0"
mypy = "^1.6.0"
[build-system]
requires = ["poetry-core>=1.0.0"]
@ -52,11 +51,6 @@ build-backend = "poetry.core.masonry.api"
# mypy_path = "/home/kae/.cache/pypoetry/virtualenvs/musicmuster-oWgGw1IG-py3.9:/home/kae/git/musicmuster/app"
mypy_path = "/home/kae/git/musicmuster/app"
[tool.pytest.ini_options]
addopts = "--exitfirst --showlocals --capture=no"
pythonpath = [".", "app"]
filterwarnings = "ignore:'audioop' is deprecated"
[tool.vulture]
exclude = ["migrations", "app/ui", "archive"]
paths = ["app"]

2
pytest.ini Normal file
View File

@ -0,0 +1,2 @@
[pytest]
addopts = -xls

33
test.py Executable file
View File

@ -0,0 +1,33 @@
#!/usr/bin/env python
from PyQt5 import QtGui, QtWidgets
class TabBar(QtWidgets.QTabBar):
def paintEvent(self, event):
painter = QtWidgets.QStylePainter(self)
option = QtWidgets.QStyleOptionTab()
for index in range(self.count()):
self.initStyleOption(option, index)
bgcolor = QtGui.QColor(self.tabText(index))
option.palette.setColor(QtGui.QPalette.Window, bgcolor)
painter.drawControl(QtWidgets.QStyle.CE_TabBarTabShape, option)
painter.drawControl(QtWidgets.QStyle.CE_TabBarTabLabel, option)
class Window(QtWidgets.QTabWidget):
def __init__(self):
QtWidgets.QTabWidget.__init__(self)
self.setTabBar(TabBar(self))
for color in "tomato orange yellow lightgreen skyblue plum".split():
self.addTab(QtWidgets.QWidget(self), color)
if __name__ == "__main__":
import sys
app = QtWidgets.QApplication(sys.argv)
window = Window()
window.resize(420, 200)
window.show()
sys.exit(app.exec_())

View File

@ -45,7 +45,7 @@ def test_get_relative_date():
assert get_relative_date(None) == "Never"
today_at_10 = datetime.now().replace(hour=10, minute=0)
today_at_11 = datetime.now().replace(hour=11, minute=0)
assert get_relative_date(today_at_10, today_at_11) == "Today 10:00"
assert get_relative_date(today_at_10, today_at_11) == "10:00"
eight_days_ago = today_at_10 - timedelta(days=8)
assert get_relative_date(eight_days_ago, today_at_11) == "1 week, 1 day ago"
sixteen_days_ago = today_at_10 - timedelta(days=16)

View File

@ -1,9 +1,8 @@
import os.path
import helpers
from app.models import (
NoteColours,
Notes,
Playdates,
Playlists,
Tracks,
@ -13,7 +12,6 @@ from app.models import (
def test_notecolours_get_colour(session):
"""Create a colour record and retrieve all colours"""
print(">>>text_notcolours_get_colour")
note_colour = "#0bcdef"
NoteColours(session, substring="substring", colour=note_colour)
@ -26,7 +24,6 @@ def test_notecolours_get_colour(session):
def test_notecolours_get_all(session):
"""Create two colour records and retrieve them all"""
print(">>>text_notcolours_get_all")
note1_colour = "#1bcdef"
note2_colour = "#20ff00"
NoteColours(session, substring="note1", colour=note1_colour)
@ -55,92 +52,185 @@ def test_notecolours_get_colour_match(session):
assert result == note_colour
def test_playdates_add_playdate(session, track1):
def test_notes_creation(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
note_text = "note text"
note = Notes(session, playlist.id, 0, note_text)
assert note
notes = session.query(Notes).all()
assert len(notes) == 1
assert notes[0].note == note_text
def test_notes_delete(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
note_text = "note text"
note = Notes(session, playlist.id, 0, note_text)
assert note
notes = session.query(Notes).all()
assert len(notes) == 1
assert notes[0].note == note_text
note.delete_note(session)
notes = session.query(Notes).all()
assert len(notes) == 0
def test_notes_update_row_only(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
note_text = "note text"
note = Notes(session, playlist.id, 0, note_text)
new_row = 10
note.update_note(session, new_row)
notes = session.query(Notes).all()
assert len(notes) == 1
assert notes[0].row == new_row
def test_notes_update_text(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
note_text = "note text"
note = Notes(session, playlist.id, 0, note_text)
new_text = "This is new"
new_row = 0
note.update_note(session, new_row, new_text)
notes = session.query(Notes).all()
assert len(notes) == 1
assert notes[0].note == new_text
assert notes[0].row == new_row
def test_playdates_add_playdate(session):
"""Test playdate and last_played retrieval"""
playdate = Playdates(session, track1.id)
# We need a track
track_path = "/a/b/c"
track = Tracks(session, track_path)
playdate = Playdates(session, track.id)
assert playdate
last_played = Playdates.last_played(session, track1.id)
last_played = Playdates.last_played(session, track.id)
assert abs((playdate.lastplayed - last_played).total_seconds()) < 2
def test_playdates_remove_track(session):
"""Test removing a track from a playdate"""
# We need a track
track_path = "/a/b/c"
track = Tracks(session, track_path)
Playdates.remove_track(session, track.id)
last_played = Playdates.last_played(session, track.id)
assert last_played is None
def test_playlist_create(session):
playlist = Playlists(session, "my playlist")
assert playlist
# def test_playlist_add_track(session, track):
# # We need a playlist
# playlist = Playlists(session, "my playlist")
def test_playlist_add_note(session):
note_text = "my note"
# row = 17
playlist = Playlists(session, "my playlist")
# playlist.add_track(session, track.id, row)
# assert len(playlist.tracks) == 1
# playlist_track = playlist.tracks[row]
# assert playlist_track.path == track_path
assert len(playlist.notes) == 1
playlist_note = playlist.notes[0]
assert playlist_note.note == note_text
# def test_playlist_tracks(session):
# # We need a playlist
# playlist = Playlists(session, "my playlist")
def test_playlist_add_track(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
# # We need two tracks
# track1_path = "/a/b/c"
# track1_row = 17
# track1 = Tracks(session, track1_path)
# We need a track
track_path = "/a/b/c"
track = Tracks(session, track_path)
# track2_path = "/x/y/z"
# track2_row = 29
# track2 = Tracks(session, track2_path)
row = 17
# playlist.add_track(session, track1.id, track1_row)
# playlist.add_track(session, track2.id, track2_row)
playlist.add_track(session, track.id, row)
# tracks = playlist.tracks
# assert tracks[track1_row] == track1
# assert tracks[track2_row] == track2
assert len(playlist.tracks) == 1
playlist_track = playlist.tracks[row]
assert playlist_track.path == track_path
# def test_playlist_notes(session):
# # We need a playlist
# playlist = Playlists(session, "my playlist")
def test_playlist_tracks(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
# # We need two notes
# note1_text = "note1 text"
# note1_row = 11
# _ = Notes(session, playlist.id, note1_row, note1_text)
# We need two tracks
track1_path = "/a/b/c"
track1_row = 17
track1 = Tracks(session, track1_path)
# note2_text = "note2 text"
# note2_row = 19
# _ = Notes(session, playlist.id, note2_row, note2_text)
track2_path = "/x/y/z"
track2_row = 29
track2 = Tracks(session, track2_path)
# notes = playlist.notes
# assert note1_text in [n.note for n in notes]
# assert note1_row in [n.row for n in notes]
# assert note2_text in [n.note for n in notes]
# assert note2_row in [n.row for n in notes]
playlist.add_track(session, track1.id, track1_row)
playlist.add_track(session, track2.id, track2_row)
tracks = playlist.tracks
assert tracks[track1_row] == track1
assert tracks[track2_row] == track2
def test_playlist_notes(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
# We need two notes
note1_text = "note1 text"
note1_row = 11
_ = Notes(session, playlist.id, note1_row, note1_text)
note2_text = "note2 text"
note2_row = 19
_ = Notes(session, playlist.id, note2_row, note2_text)
notes = playlist.notes
assert note1_text in [n.note for n in notes]
assert note1_row in [n.row for n in notes]
assert note2_text in [n.note for n in notes]
assert note2_row in [n.row for n in notes]
def test_playlist_open_and_close(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
assert len(Playlists.get_open(session)) == 0
assert len(Playlists.get_closed(session)) == 1
playlist.mark_open()
assert len(Playlists.get_open(session)) == 1
assert len(Playlists.get_closed(session)) == 0
playlist.close()
playlist.close(session)
assert len(Playlists.get_open(session)) == 0
assert len(Playlists.get_closed(session)) == 1
playlist.mark_open(session)
assert len(Playlists.get_open(session)) == 1
assert len(Playlists.get_closed(session)) == 0
def test_playlist_get_all_and_by_id(session):
# We need two playlists
@ -153,34 +243,250 @@ def test_playlist_get_all_and_by_id(session):
assert len(all_playlists) == 2
assert p1_name in [p.name for p in all_playlists]
assert p2_name in [p.name for p in all_playlists]
assert session.get(Playlists, playlist1.id).name == p1_name
assert Playlists.get_by_id(session, playlist1.id).name == p1_name
def test_tracks_get_all_tracks(session, track1, track2):
def test_playlist_remove_tracks(session):
# Need two playlists and three tracks
p1_name = "playlist one"
playlist1 = Playlists(session, p1_name)
p2_name = "playlist two"
playlist2 = Playlists(session, p2_name)
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
track2_path = "/m/n/o"
track2 = Tracks(session, track2_path)
track3_path = "/x/y/z"
track3 = Tracks(session, track3_path)
# Add all tracks to both playlists
for p in [playlist1, playlist2]:
for t in [track1, track2, track3]:
p.add_track(session, t.id)
assert len(playlist1.tracks) == 3
assert len(playlist2.tracks) == 3
playlist1.remove_track(session, 1)
assert len(playlist1.tracks) == 2
# Check the track itself still exists
original_track = Tracks.get_by_id(session, track1.id)
assert original_track
playlist1.remove_all_tracks(session)
assert len(playlist1.tracks) == 0
assert len(playlist2.tracks) == 3
def test_playlist_get_track_playlists(session):
# Need two playlists and two tracks
p1_name = "playlist one"
playlist1 = Playlists(session, p1_name)
p2_name = "playlist two"
playlist2 = Playlists(session, p2_name)
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
track2_path = "/m/n/o"
track2 = Tracks(session, track2_path)
# Put track1 in both playlists, track2 only in playlist1
playlist1.add_track(session, track1.id)
playlist2.add_track(session, track1.id)
playlist1.add_track(session, track2.id)
playlists_track1 = track1.playlists
playlists_track2 = track2.playlists
assert p1_name in [a.playlist.name for a in playlists_track1]
assert p2_name in [a.playlist.name for a in playlists_track1]
assert p1_name in [a.playlist.name for a in playlists_track2]
assert p2_name not in [a.playlist.name for a in playlists_track2]
def test_playlist_move_track(session):
# We need two playlists
p1_name = "playlist one"
p2_name = "playlist two"
playlist1 = Playlists(session, p1_name)
playlist2 = Playlists(session, p2_name)
# Need two tracks
track1_row = 17
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
track2_row = 29
track2_path = "/m/n/o"
track2 = Tracks(session, track2_path)
result = [a.path for a in Tracks.get_all(session)]
assert track1.path in result
assert track2.path in result
# Add both to playlist1 and check
playlist1.add_track(session, track1.id, track1_row)
playlist1.add_track(session, track2.id, track2_row)
tracks = playlist1.tracks
assert tracks[track1_row] == track1
assert tracks[track2_row] == track2
# Move track2 to playlist2 and check
playlist1.move_track(session, [track2_row], playlist2)
tracks1 = playlist1.tracks
tracks2 = playlist2.tracks
assert len(tracks1) == 1
assert len(tracks2) == 1
assert tracks1[track1_row] == track1
assert tracks2[0] == track2
def test_tracks_by_path(session, track1):
def test_tracks_get_all_paths(session):
# Need two tracks
track1_path = "/a/b/c"
_ = Tracks(session, track1_path)
track2_path = "/m/n/o"
_ = Tracks(session, track2_path)
assert Tracks.get_by_path(session, track1.path) is track1
result = Tracks.get_all_paths(session)
assert track1_path in result
assert track2_path in result
def test_tracks_by_id(session, track1):
def test_tracks_get_all_tracks(session):
# Need two tracks
track1_path = "/a/b/c"
track2_path = "/m/n/o"
assert session.get(Tracks, track1.id) is track1
result = Tracks.get_all_tracks(session)
assert track1_path in [a.path for a in result]
assert track2_path in [a.path for a in result]
def test_tracks_search_artists(session, track1):
track1_artist = "Fleetwood Mac"
def test_tracks_by_filename(session):
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
assert Tracks.get_by_filename(session, os.path.basename(track1_path)) is track1
def test_tracks_by_path(session):
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
assert Tracks.get_by_path(session, track1_path) is track1
def test_tracks_by_id(session):
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
assert Tracks.get_by_id(session, track1.id) is track1
def test_tracks_rescan(session):
# Get test track
test_track_path = "./testdata/isa.mp3"
test_track_data = "./testdata/isa.py"
track = Tracks(session, test_track_path)
track.rescan(session)
# Get test data
with open(test_track_data) as f:
testdata = eval(f.read())
# Re-read the track
track_read = Tracks.get_by_path(session, test_track_path)
assert track_read.duration == testdata["duration"]
assert track_read.start_gap == testdata["leading_silence"]
# Silence detection can vary, so ± 1 second is OK
assert track_read.fade_at < testdata["fade_at"] + 1000
assert track_read.fade_at > testdata["fade_at"] - 1000
assert track_read.silence_at < testdata["trailing_silence"] + 1000
assert track_read.silence_at > testdata["trailing_silence"] - 1000
def test_tracks_remove_by_path(session):
track1_path = "/a/b/c"
assert len(Tracks.get_all_tracks(session)) == 1
Tracks.remove_by_path(session, track1_path)
assert len(Tracks.get_all_tracks(session)) == 0
def test_tracks_search_artists(session):
track1_path = "/a/b/c"
track1_artist = "Artist One"
track1 = Tracks(session, track1_path)
track1.artist = track1_artist
track2_path = "/m/n/o"
track2_artist = "Artist Two"
track2 = Tracks(session, track2_path)
track2.artist = track2_artist
session.commit()
artist_first_word = track1_artist.split()[0].lower()
assert len(Tracks.search_artists(session, artist_first_word)) == 2
assert len(Tracks.search_artists(session, track1_artist)) == 1
def test_tracks_search_titles(session, track1):
track1_title = "I'm So Afraid"
def test_tracks_search_titles(session):
track1_path = "/a/b/c"
track1_title = "Title One"
track1 = Tracks(session, track1_path)
track1.title = track1_title
track2_path = "/m/n/o"
track2_title = "Title Two"
track2 = Tracks(session, track2_path)
track2.title = track2_title
session.commit()
title_first_word = track1_title.split()[0].lower()
assert len(Tracks.search_titles(session, title_first_word)) == 2
assert len(Tracks.search_titles(session, track1_title)) == 1
def test_tracks_update_lastplayed(session):
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
assert track1.lastplayed is None
track1.update_lastplayed(session, track1.id)
assert track1.lastplayed is not None
def test_tracks_update_info(session):
path = "/a/b/c"
artist = "The Beatles"
title = "Help!"
newinfo = "abcdef"
track1 = Tracks(session, path)
track1.artist = artist
track1.title = title
test1 = Tracks.get_by_id(session, track1.id)
assert test1.artist == artist
assert test1.title == title
assert test1.path == path
track1.path = newinfo
test2 = Tracks.get_by_id(session, track1.id)
assert test2.artist == artist
assert test2.title == title
assert test2.path == newinfo
track1.artist = newinfo
test2 = Tracks.get_by_id(session, track1.id)
assert test2.artist == newinfo
assert test2.title == title
assert test2.path == newinfo
track1.title = newinfo
test3 = Tracks.get_by_id(session, track1.id)
assert test3.artist == newinfo
assert test3.title == newinfo
assert test3.path == newinfo

View File

@ -1,380 +0,0 @@
from pprint import pprint
from typing import Optional
from app.models import (
Playlists,
Tracks,
)
from PyQt6.QtCore import Qt, QModelIndex
from app.helpers import get_file_metadata
from app import playlistmodel
from dbconfig import scoped_session
test_tracks = [
"testdata/isa.mp3",
"testdata/isa_with_gap.mp3",
"testdata/loser.mp3",
"testdata/lovecats-10seconds.mp3",
"testdata/lovecats.mp3",
"testdata/mom.mp3",
"testdata/sitting.mp3",
]
def create_model_with_tracks(session: scoped_session, name: Optional[str] = None) -> "playlistmodel.PlaylistModel":
playlist = Playlists(session, name or "test playlist")
model = playlistmodel.PlaylistModel(playlist.id)
for row in range(len(test_tracks)):
track_path = test_tracks[row % len(test_tracks)]
metadata = get_file_metadata(track_path)
track = Tracks(session, **metadata)
model.insert_row(proposed_row_number=row, track_id=track.id, note=f"{row=}")
session.commit()
return model
def create_model_with_playlist_rows(
session: scoped_session, rows: int, name: Optional[str] = None
) -> "playlistmodel.PlaylistModel":
playlist = Playlists(session, name or "test playlist")
# Create a model
model = playlistmodel.PlaylistModel(playlist.id)
for row in range(rows):
model.insert_row(proposed_row_number=row, note=str(row))
session.commit()
return model
def test_11_row_playlist(monkeypatch, session):
# Create multirow playlist
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_playlist_rows(session, 11)
assert model.rowCount() == 11
assert max(model.playlist_rows.keys()) == 10
for row in range(model.rowCount()):
assert row in model.playlist_rows
assert model.playlist_rows[row].plr_rownum == row
def test_move_rows_test2(monkeypatch, session):
# move row 3 to row 5
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_playlist_rows(session, 11)
model.move_rows([3], 5)
# Check we have all rows and plr_rownums are correct
for row in range(model.rowCount()):
assert row in model.playlist_rows
assert model.playlist_rows[row].plr_rownum == row
if row not in [3, 4, 5]:
assert model.playlist_rows[row].note == str(row)
elif row == 3:
assert model.playlist_rows[row].note == str(4)
elif row == 4:
assert model.playlist_rows[row].note == str(5)
elif row == 5:
assert model.playlist_rows[row].note == str(3)
def test_move_rows_test3(monkeypatch, session):
# move row 4 to row 3
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_playlist_rows(session, 11)
model.move_rows([4], 3)
# Check we have all rows and plr_rownums are correct
for row in range(model.rowCount()):
assert row in model.playlist_rows
assert model.playlist_rows[row].plr_rownum == row
if row not in [3, 4]:
assert model.playlist_rows[row].note == str(row)
elif row == 3:
assert model.playlist_rows[row].note == str(4)
elif row == 4:
assert model.playlist_rows[row].note == str(3)
def test_move_rows_test4(monkeypatch, session):
# move row 4 to row 2
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_playlist_rows(session, 11)
model.move_rows([4], 2)
# Check we have all rows and plr_rownums are correct
for row in range(model.rowCount()):
assert row in model.playlist_rows
assert model.playlist_rows[row].plr_rownum == row
if row not in [2, 3, 4]:
assert model.playlist_rows[row].note == str(row)
elif row == 2:
assert model.playlist_rows[row].note == str(4)
elif row == 3:
assert model.playlist_rows[row].note == str(2)
elif row == 4:
assert model.playlist_rows[row].note == str(3)
def test_move_rows_test5(monkeypatch, session):
# move rows [1, 4, 5, 10] → 8
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_playlist_rows(session, 11)
model.move_rows([1, 4, 5, 10], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(model.rowCount()):
assert row in model.playlist_rows
assert model.playlist_rows[row].plr_rownum == row
new_order.append(int(model.playlist_rows[row].note))
assert new_order == [0, 2, 3, 6, 7, 8, 9, 1, 4, 5, 10]
def test_move_rows_test6(monkeypatch, session):
# move rows [3, 6] → 5
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_playlist_rows(session, 11)
model.move_rows([3, 6], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(model.rowCount()):
assert row in model.playlist_rows
assert model.playlist_rows[row].plr_rownum == row
new_order.append(int(model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9, 10]
def test_move_rows_test7(monkeypatch, session):
# move rows [3, 5, 6] → 8
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_playlist_rows(session, 11)
model.move_rows([3, 5, 6], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(model.rowCount()):
assert row in model.playlist_rows
assert model.playlist_rows[row].plr_rownum == row
new_order.append(int(model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 7, 8, 9, 10, 3, 5, 6]
def test_move_rows_test8(monkeypatch, session):
# move rows [7, 8, 10] → 5
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_playlist_rows(session, 11)
model.move_rows([7, 8, 10], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(model.rowCount()):
assert row in model.playlist_rows
assert model.playlist_rows[row].plr_rownum == row
new_order.append(int(model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
def test_insert_header_row_end(monkeypatch, session):
# insert header row at end of playlist
monkeypatch.setattr(playlistmodel, "Session", session)
note_text = "test text"
initial_row_count = 11
model = create_model_with_playlist_rows(session, initial_row_count)
model.insert_row(proposed_row_number=None, note=note_text)
assert model.rowCount() == initial_row_count + 1
prd = model.playlist_rows[model.rowCount() - 1]
# Test against edit_role because display_role for headers is
# handled differently (sets up row span)
assert (
model.edit_role(model.rowCount() - 1, playlistmodel.Col.NOTE.value, prd)
== note_text
)
def test_insert_header_row_middle(monkeypatch, session):
# insert header row in middle of playlist
monkeypatch.setattr(playlistmodel, "Session", session)
note_text = "test text"
initial_row_count = 11
insert_row = 6
model = create_model_with_playlist_rows(session, initial_row_count)
model.insert_row(proposed_row_number=insert_row, note=note_text)
assert model.rowCount() == initial_row_count + 1
prd = model.playlist_rows[insert_row]
# Test against edit_role because display_role for headers is
# handled differently (sets up row span)
assert (
model.edit_role(model.rowCount() - 1, playlistmodel.Col.NOTE.value, prd)
== note_text
)
def test_create_model_with_tracks(monkeypatch, session):
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_tracks(session)
assert len(model.playlist_rows) == len(test_tracks)
def test_timing_one_track(monkeypatch, session):
START_ROW = 0
END_ROW = 2
monkeypatch.setattr(playlistmodel, "Session", session)
model = create_model_with_tracks(session)
model.insert_row(proposed_row_number=START_ROW, note="start+")
model.insert_row(proposed_row_number=END_ROW, note="-")
prd = model.playlist_rows[START_ROW]
qv_value = model.display_role(START_ROW, playlistmodel.HEADER_NOTES_COLUMN, prd)
assert qv_value.value() == "start [1 tracks, 4:23 unplayed]"
def test_insert_track_new_playlist(monkeypatch, session):
# insert a track into a new playlist
monkeypatch.setattr(playlistmodel, "Session", session)
playlist = Playlists(session, "test playlist")
# Create a model
model = playlistmodel.PlaylistModel(playlist.id)
track_path = test_tracks[0]
metadata = get_file_metadata(track_path)
track = Tracks(session, **metadata)
model.insert_row(proposed_row_number=0, track_id=track.id)
prd = model.playlist_rows[model.rowCount() - 1]
assert (
model.edit_role(model.rowCount() - 1, playlistmodel.Col.TITLE.value, prd)
== metadata["title"]
)
def test_reverse_row_groups_one_row(monkeypatch, session):
monkeypatch.setattr(playlistmodel, "Session", session)
rows_to_move = [3]
model_src = create_model_with_playlist_rows(session, 5, name="source")
result = model_src._reversed_contiguous_row_groups(rows_to_move)
assert len(result) == 1
assert result[0] == [3]
def test_reverse_row_groups_multiple_row(monkeypatch, session):
monkeypatch.setattr(playlistmodel, "Session", session)
rows_to_move = [2, 3, 4, 5, 7, 9, 10, 13, 17, 20, 21]
model_src = create_model_with_playlist_rows(session, 5, name="source")
result = model_src._reversed_contiguous_row_groups(rows_to_move)
assert result == [[20, 21], [17], [13], [9, 10], [7], [2, 3, 4, 5]]
def test_move_one_row_between_playlists_to_end(monkeypatch, session):
monkeypatch.setattr(playlistmodel, "Session", session)
create_rowcount = 5
from_rows = [3]
to_row = create_rowcount
model_src = create_model_with_playlist_rows(session, create_rowcount, name="source")
model_dst = create_model_with_playlist_rows(session, create_rowcount, name="destination")
model_src.move_rows_between_playlists(from_rows, to_row, model_dst.playlist_id)
model_dst.refresh_data(session)
assert len(model_src.playlist_rows) == create_rowcount - len(from_rows)
assert len(model_dst.playlist_rows) == create_rowcount + len(from_rows)
assert sorted([a.plr_rownum for a in model_src.playlist_rows.values()]) == list(
range(len(model_src.playlist_rows))
)
def test_move_one_row_between_playlists_to_middle(monkeypatch, session):
monkeypatch.setattr(playlistmodel, "Session", session)
create_rowcount = 5
from_rows = [3]
to_row = 2
model_src = create_model_with_playlist_rows(session, create_rowcount, name="source")
model_dst = create_model_with_playlist_rows(session, create_rowcount, name="destination")
model_src.move_rows_between_playlists(from_rows, to_row, model_dst.playlist_id)
model_dst.refresh_data(session)
# Check the rows of the destination model
row_notes = []
for row_number in range(model_dst.rowCount()):
index = model_dst.index(row_number, playlistmodel.Col.TITLE.value, QModelIndex())
row_notes.append(model_dst.data(index, Qt.ItemDataRole.EditRole).value())
assert len(model_src.playlist_rows) == create_rowcount - len(from_rows)
assert len(model_dst.playlist_rows) == create_rowcount + len(from_rows)
assert [int(a) for a in row_notes] == [0, 1, 3, 2, 3, 4]
def test_move_multiple_rows_between_playlists_to_end(monkeypatch, session):
monkeypatch.setattr(playlistmodel, "Session", session)
create_rowcount = 5
from_rows = [1, 3, 4]
to_row = 2
model_src = create_model_with_playlist_rows(session, create_rowcount, name="source")
model_dst = create_model_with_playlist_rows(session, create_rowcount, name="destination")
model_src.move_rows_between_playlists(from_rows, to_row, model_dst.playlist_id)
model_dst.refresh_data(session)
# Check the rows of the destination model
row_notes = []
for row_number in range(model_dst.rowCount()):
index = model_dst.index(row_number, playlistmodel.Col.TITLE.value, QModelIndex())
row_notes.append(model_dst.data(index, Qt.ItemDataRole.EditRole).value())
assert len(model_src.playlist_rows) == create_rowcount - len(from_rows)
assert len(model_dst.playlist_rows) == create_rowcount + len(from_rows)
assert [int(a) for a in row_notes] == [0, 1, 3, 4, 1, 2, 3, 4]
# def test_edit_header(monkeypatch, session): # edit header row in middle of playlist
# monkeypatch.setattr(playlistmodel, "Session", session)
# note_text = "test text"
# initial_row_count = 11
# insert_row = 6
# model = create_model_with_playlist_rows(session, initial_row_count)
# model.insert_header_row(insert_row, note_text)
# assert model.rowCount() == initial_row_count + 1
# prd = model.playlist_rows[insert_row]
# # Test against edit_role because display_role for headers is
# # handled differently (sets up row span)
# assert (
# model.edit_role(model.rowCount(), playlistmodel.Col.NOTE.value, prd)
# == note_text
# )

View File

@ -1,4 +1,4 @@
from PyQt6.QtCore import Qt
from PyQt5.QtCore import Qt
from app import playlists
from app import models