Compare commits

..

No commits in common. "4a4058d211e59e0ced24f3a7fc6d1aa779b7ff71" and "85cfebe0f71af3609372cccb55551ccd2b7c3bac" have entirely different histories.

11 changed files with 364 additions and 405 deletions

View File

@ -94,10 +94,10 @@ class MusicMusterSignals(QObject):
class Tags(NamedTuple): class Tags(NamedTuple):
artist: str = "" artist: str
title: str = "" title: str
bitrate: int = 0 bitrate: int
duration: int = 0 duration: int
class TrackInfo(NamedTuple): class TrackInfo(NamedTuple):

View File

@ -51,10 +51,6 @@ class Config(object):
FADEOUT_DB = -10 FADEOUT_DB = -10
FADEOUT_SECONDS = 5 FADEOUT_SECONDS = 5
FADEOUT_STEPS_PER_SECOND = 5 FADEOUT_STEPS_PER_SECOND = 5
FUZZYMATCH_MINIMUM_LIST = 60.0
FUZZYMATCH_MINIMUM_SELECT_ARTIST = 80.0
FUZZYMATCH_MINIMUM_SELECT_TITLE = 80.0
FUZZYMATCH_SHOW_SCORES = True
HEADER_ARTIST = "Artist" HEADER_ARTIST = "Artist"
HEADER_BITRATE = "bps" HEADER_BITRATE = "bps"
HEADER_DURATION = "Length" HEADER_DURATION = "Length"
@ -66,8 +62,8 @@ class Config(object):
HEADER_START_TIME = "Start" HEADER_START_TIME = "Start"
HEADER_TITLE = "Title" HEADER_TITLE = "Title"
HIDE_AFTER_PLAYING_OFFSET = 5000 HIDE_AFTER_PLAYING_OFFSET = 5000
HIDE_PLAYED_MODE_SECTIONS = "SECTIONS"
HIDE_PLAYED_MODE_TRACKS = "TRACKS" HIDE_PLAYED_MODE_TRACKS = "TRACKS"
HIDE_PLAYED_MODE_SECTIONS = "SECTIONS"
IMPORT_AS_NEW = "Import as new track" IMPORT_AS_NEW = "Import as new track"
INFO_TAB_TITLE_LENGTH = 15 INFO_TAB_TITLE_LENGTH = 15
INTRO_SECONDS_FORMAT = ".1f" INTRO_SECONDS_FORMAT = ".1f"
@ -86,6 +82,7 @@ class Config(object):
MAX_INFO_TABS = 5 MAX_INFO_TABS = 5
MAX_MISSING_FILES_TO_REPORT = 10 MAX_MISSING_FILES_TO_REPORT = 10
MILLISECOND_SIGFIGS = 0 MILLISECOND_SIGFIGS = 0
MINIMUM_FUZZYMATCH = 60.0
MINIMUM_ROW_HEIGHT = 30 MINIMUM_ROW_HEIGHT = 30
NO_TEMPLATE_NAME = "None" NO_TEMPLATE_NAME = "None"
NOTE_TIME_FORMAT = "%H:%M" NOTE_TIME_FORMAT = "%H:%M"
@ -126,5 +123,5 @@ class Config(object):
# These rely on earlier definitions # These rely on earlier definitions
HIDE_PLAYED_MODE = HIDE_PLAYED_MODE_SECTIONS HIDE_PLAYED_MODE = HIDE_PLAYED_MODE_SECTIONS
IMPORT_DESTINATION = "/tmp/mm" # os.path.join(ROOT, "Singles") IMPORT_DESTINATION = os.path.join(ROOT, "Singles")
REPLACE_FILES_DEFAULT_DESTINATION = os.path.dirname(REPLACE_FILES_DEFAULT_SOURCE) REPLACE_FILES_DEFAULT_DESTINATION = os.path.dirname(REPLACE_FILES_DEFAULT_SOURCE)

View File

@ -1,5 +1,6 @@
# Standard library imports # Standard library imports
from typing import Optional from typing import Optional
import os
# PyQt imports # PyQt imports
from PyQt6.QtCore import QEvent, Qt from PyQt6.QtCore import QEvent, Qt
@ -8,22 +9,27 @@ from PyQt6.QtWidgets import (
QDialog, QDialog,
QListWidgetItem, QListWidgetItem,
QMainWindow, QMainWindow,
QTableWidgetItem,
) )
# Third party imports # Third party imports
import pydymenu # type: ignore
from sqlalchemy.orm.session import Session from sqlalchemy.orm.session import Session
# App imports # App imports
from classes import MusicMusterSignals from classes import MusicMusterSignals
from config import Config
from helpers import ( from helpers import (
ask_yes_no, ask_yes_no,
get_relative_date, get_relative_date,
get_tags,
ms_to_mmss, ms_to_mmss,
show_warning,
) )
from log import log from log import log
from models import Settings, Tracks from models import db, Settings, Tracks
from playlistmodel import PlaylistModel from playlistmodel import PlaylistModel
from ui import dlg_TrackSelect_ui from ui import dlg_TrackSelect_ui, dlg_replace_files_ui
class TrackSelectDialog(QDialog): class TrackSelectDialog(QDialog):

View File

@ -1,7 +1,7 @@
# Standard library imports # Standard library imports
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass
from fuzzywuzzy import fuzz # type: ignore from fuzzywuzzy import fuzz # type: ignore
import os.path import os.path
from typing import Optional from typing import Optional
@ -17,11 +17,11 @@ from PyQt6.QtCore import (
from PyQt6.QtWidgets import ( from PyQt6.QtWidgets import (
QButtonGroup, QButtonGroup,
QDialog, QDialog,
QFileDialog,
QHBoxLayout, QHBoxLayout,
QLabel, QLabel,
QPushButton, QPushButton,
QRadioButton, QRadioButton,
QStatusBar,
QVBoxLayout, QVBoxLayout,
) )
@ -30,6 +30,8 @@ from PyQt6.QtWidgets import (
# App imports # App imports
from classes import ( from classes import (
ApplicationError, ApplicationError,
AudioMetadata,
FileErrors,
MusicMusterSignals, MusicMusterSignals,
Tags, Tags,
) )
@ -37,7 +39,7 @@ from config import Config
from helpers import ( from helpers import (
file_is_unreadable, file_is_unreadable,
get_tags, get_tags,
show_OK, show_warning,
) )
from log import log from log import log
from models import db, Tracks from models import db, Tracks
@ -47,26 +49,35 @@ import helpers
class DoTrackImport(QObject): class DoTrackImport(QObject):
import_finished = pyqtSignal(int, QThread) import_finished = pyqtSignal()
def __init__( def __init__(
self, self,
associated_thread: QThread,
import_file_path: str, import_file_path: str,
tags: Tags, tags: Tags,
destination_path: str, destination_track_path: str,
track_id: int, track_id: int,
audio_metadata: AudioMetadata,
base_model: PlaylistModel,
row_number: Optional[int],
) -> None: ) -> None:
""" """
Save parameters Save parameters
""" """
super().__init__() super().__init__()
self.associated_thread = associated_thread
self.import_file_path = import_file_path self.import_file_path = import_file_path
self.tags = tags self.tags = tags
self.destination_track_path = destination_path self.destination_track_path = destination_track_path
self.track_id = track_id self.track_id = track_id
self.audio_metadata = audio_metadata
self.base_model = base_model
if row_number is None:
self.next_row_number = base_model.rowCount()
else:
self.next_row_number = row_number
self.signals = MusicMusterSignals() self.signals = MusicMusterSignals()
@ -80,18 +91,12 @@ class DoTrackImport(QObject):
temp_file: Optional[str] = None temp_file: Optional[str] = None
# Get audio metadata in this thread rather than calling function to save interactive time
self.audio_metadata = helpers.get_audio_metadata(self.import_file_path)
# If destination exists, move it out of the way # If destination exists, move it out of the way
if os.path.exists(self.destination_track_path): if os.path.exists(self.destination_track_path):
temp_file = self.destination_track_path + ".TMP" temp_file = self.destination_track_path + ".TMP"
shutil.move(self.destination_track_path, temp_file) shutil.move(self.destination_track_path, temp_file)
# Move file to destination # Move file to destination
shutil.move(self.import_file_path, self.destination_track_path) shutil.move(self.import_file_path, self.destination_track_path)
# Clean up
if temp_file and os.path.exists(temp_file):
os.unlink(temp_file)
with db.Session() as session: with db.Session() as session:
self.signals.status_message_signal.emit( self.signals.status_message_signal.emit(
@ -125,11 +130,13 @@ class DoTrackImport(QObject):
session.commit() session.commit()
helpers.normalise_track(self.destination_track_path) helpers.normalise_track(self.destination_track_path)
self.base_model.insert_row(self.next_row_number, track.id, "imported")
self.next_row_number += 1
self.signals.status_message_signal.emit( self.signals.status_message_signal.emit(
f"{os.path.basename(self.import_file_path)} imported", 10000 f"{os.path.basename(self.import_file_path)} imported", 10000
) )
self.import_finished.emit(track.id, self.associated_thread) self.import_finished.emit()
class FileImporter: class FileImporter:
@ -144,27 +151,32 @@ class FileImporter:
Set up class Set up class
""" """
# Create ModelData # Save parameters
if not row_number: self.base_model = base_model
row_number = base_model.rowCount() if row_number:
self.model_data = ThreadData(base_model=base_model, row_number=row_number) self.row_number = row_number
else:
# Place to keep reference to importer threads and data self.row_number = base_model.rowCount()
self.thread_data: dict[QThread, ThreadData] = {}
# Data structure to track files to import # Data structure to track files to import
self.import_files_data: dict[str, TrackFileData] = {} self.import_files_data: list[TrackFileData] = []
# Dictionary of exsting tracks
# Dictionary of exsting tracks indexed by track.id
self.existing_tracks = self._get_existing_tracks() self.existing_tracks = self._get_existing_tracks()
# List of track_id, title tuples
# Populate self.import_files_data self.track_idx_and_title = [
for infile in [ ((a.id, a.title)) for a in self.existing_tracks.values()
]
# Files to import
self.import_files_paths = [
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f) os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE) for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE)
if f.endswith((".mp3", ".flac")) if f.endswith((".mp3", ".flac"))
]: ]
self.import_files_data[infile] = TrackFileData() # Files we can't import
self.unimportable_files: list[FileErrors] = []
# Files user doesn't want imported
self.do_not_import: list[str] = []
# Place to keep reference to importer while it runs
self.import_thread: dict[int, QThread] = {}
def do_import(self) -> None: def do_import(self) -> None:
""" """
@ -178,110 +190,206 @@ class FileImporter:
- import files and either replace existing or add to pool - import files and either replace existing or add to pool
""" """
# Check all file are readable and have tags. Mark failures not to # check all file are readable
# be imported and populate error text. self.check_files_are_readable()
for path in self.import_files_data.keys():
if file_is_unreadable(path):
self.import_files_data[path].import_this_file = False
self.import_files_data[path].error = f"{path} is unreadable"
continue
# Get tags # load readable files and tags into self.import_files
for import_file in self.import_files_paths:
try: try:
self.import_files_data[path].tags = get_tags(path) tags = get_tags(import_file)
except ApplicationError as e: except ApplicationError as e:
self.import_files_data[path].import_this_file = False self.unimportable_files.append(
self.import_files_data[path].error = f"Tag errors ({str(e)})" FileErrors(path=import_file, error=str(e))
continue
# Get track match data
self.populate_track_match_data(path)
# Sort with best artist match first
self.import_files_data[path].track_match_data.sort(
key=lambda rec: rec.artist_match, reverse=True
) )
self.import_files_paths.remove(import_file)
try:
self.import_files_data.append(
TrackFileData(import_file_path=import_file, tags=tags)
)
except Exception as e:
self.unimportable_files.append(
FileErrors(path=import_file, error=str(e))
)
self.import_files_paths.remove(import_file)
# Process user choices if self.unimportable_files:
self.process_user_choices(path) msg = "The following files could not be read and won't be imported:\n"
for unimportable_file in self.unimportable_files:
msg += f"\n\t{unimportable_file.path} ({unimportable_file.error})"
show_warning(None, "Unimportable files", msg)
# Import files and tell users about files that won't be imported # check for close matches.
msgs: list[str] = [] for idx in range(len(self.import_files_data)):
for (path, entry) in self.import_files_data.items(): self.check_match(idx=idx)
if entry.import_this_file:
self._start_thread(path) self.import_files_data = [
x
for x in self.import_files_data
if (
x.import_file_path not in self.do_not_import
or x.track_id != -1
)
]
# Import all that's left.
for idx in range(len(self.import_files_data)):
self._import_file(idx)
def check_match(self, idx: int) -> None:
"""
Work on and update the idx element of self.import_file_data.
Check for similar existing titles. If none found, set up to
import this as a new track. If one is found, check with user
whether this is a new track or replacement. If more than one
is found, as for one but order the tracks in
artist-similarity order.
"""
similar_track_ids = self._find_similar_strings(
self.import_files_data[idx].tags.title, self.track_idx_and_title
)
if len(similar_track_ids) == 0:
matching_track = 0
elif len(similar_track_ids) == 1:
matching_track = self._pick_match(idx, similar_track_ids)
else: else:
msgs.append( matching_track = self._pick_match(
f"{os.path.basename(path)} will not be imported because {entry.error}" idx, self.order_by_artist(idx, similar_track_ids)
)
if msgs:
show_OK("File not imported", "\r\r".join(msgs))
def _start_thread(self, path: str) -> None:
"""
Import the file specified by path
"""
log.debug(f"_start_thread({path=})")
# Create thread and worker
thread = QThread()
worker = DoTrackImport(
associated_thread=thread,
import_file_path=path,
tags=self.import_files_data[path].tags,
destination_path=self.import_files_data[path].destination_path,
track_id=self.import_files_data[path].track_id,
) )
# Associate data with the thread if matching_track < 0: # User cancelled
self.model_data.worker = worker matching_track = -1
self.thread_data[thread] = self.model_data
# Move worker to thread elif matching_track == 0:
worker.moveToThread(thread) self.import_files_data[idx].destination_track_path = os.path.join(
log.debug(f"_start_thread_worker started ({path=}, {id(thread)=}, {id(worker)=})") Config.IMPORT_DESTINATION,
os.path.basename(self.import_files_data[idx].import_file_path),
# Connect signals
thread.started.connect(lambda: log.debug(f"Thread {thread} started"))
thread.started.connect(worker.run)
thread.finished.connect(lambda: log.debug(f"Thread {thread} finished"))
thread.finished.connect(thread.deleteLater)
worker.import_finished.connect(
lambda: log.debug(f"Worker task finished for thread {thread}")
) )
worker.import_finished.connect(self._thread_finished) else:
worker.import_finished.connect(thread.quit) self.import_files_data[idx].destination_track_path = self.existing_tracks[
worker.import_finished.connect(worker.deleteLater) matching_track
].path
# Start thread self.import_files_data[idx].track_id = matching_track
thread.start()
def _thread_finished(self, track_id: int, thread: QThread) -> None: def _import_file(self, idx: int) -> None:
""" """
If track already in playlist, refresh it else insert it Import the file specified at self.import_files_data[idx]
""" """
log.debug(f" Ending thread {thread}") log.debug(f"_import_file({idx=}), {self.import_files_data[idx]=}")
model_data = self.thread_data.pop(thread, None) f = self.import_files_data[idx]
if model_data:
if model_data.base_model:
model_data.base_model.update_or_insert(track_id, model_data.row_number)
def _get_existing_track(self, track_id: int) -> Tracks: # Import in separate thread
""" self.import_thread[idx] = QThread()
Lookup in existing track in the local cache and return it self.worker = DoTrackImport(
""" import_file_path=f.import_file_path,
tags=f.tags,
existing_track_records = [a for a in self.existing_tracks if a.id == track_id] destination_track_path=f.destination_track_path,
if len(existing_track_records) != 1: track_id=f.track_id,
raise ApplicationError( audio_metadata=helpers.get_audio_metadata(f.import_file_path),
f"Internal error in _get_existing_track: {existing_track_records=}" base_model=self.base_model,
row_number=self.row_number,
) )
return existing_track_records[0] self.worker.moveToThread(self.import_thread[idx])
self.import_thread[idx].started.connect(self.worker.run)
self.worker.import_finished.connect(self.import_thread[idx].quit)
self.worker.import_finished.connect(self.worker.deleteLater)
self.import_thread[idx].finished.connect(self.import_thread[idx].deleteLater)
self.import_thread[idx].start()
def order_by_artist(self, idx: int, track_ids_to_check: list[int]) -> list[int]:
"""
Return the list of track_ids sorted by how well the artist at idx matches the
track artist.
"""
track_idx_and_artist = [
((key, a.artist))
for key, a in self.existing_tracks.items()
if key in track_ids_to_check
]
# We want to return all of the passed tracks so set minimum_score
# to zero
return self._find_similar_strings(
self.import_files_data[idx].tags.artist,
track_idx_and_artist,
minimum_score=0.0,
)
def _pick_match(self, idx: int, track_ids: list[int]) -> int:
"""
Return the track_id selected by the user, including "import as new" which will be
track_id 0. Return -1 if user cancels.
If user chooses not to import this track, remove it from the list of tracks to
import and return -1.
"""
log.debug(f"_pick_match({idx=}, {track_ids=})")
new_track_details = (
f"{self.import_files_data[idx].tags.title} "
f"({self.import_files_data[idx].tags.artist})"
)
# Build a list of (track title and artist, track_id, track path)
choices: list[tuple[str, int, str]] = []
# First choice is always to import as a new track
choices.append((Config.DO_NOT_IMPORT, -2, ""))
choices.append((Config.IMPORT_AS_NEW, 0, ""))
for track_id in track_ids:
choices.append(
(
f"{self.existing_tracks[track_id].title} "
f"({self.existing_tracks[track_id].artist})",
track_id,
str(self.existing_tracks[track_id].path),
)
)
dialog = PickMatch(new_track_details, choices)
if dialog.exec() and dialog.selected_id >= 0:
return dialog.selected_id
else:
self.do_not_import.append(self.import_files_data[idx].import_file_path)
return -1
def check_files_are_readable(self) -> None:
"""
Check files to be imported are readable. If not, remove them from the
import list and add them to the file errors list.
"""
for path in self.import_files_paths:
if file_is_unreadable(path):
self.unimportable_files.append(
FileErrors(path=os.path.basename(path), error="File is unreadable")
)
self.import_files_paths.remove(path)
def import_files_are_tagged(self) -> list:
"""
Return a (possibly empty) list of all untagged files in the
import directory. Add tags to file_data
"""
untagged_files: list[str] = []
for fullpath in self.import_files_paths:
tags = get_tags(fullpath)
if not tags:
untagged_files.append(os.path.basename(fullpath))
# Remove from import list
del self.import_files_data[fullpath]
log.warning(f"Import: no tags found, {fullpath=}")
else:
self.import_files_data.append(
TrackFileData(import_file_path=fullpath, tags=tags)
)
return untagged_files
def _get_existing_tracks(self): def _get_existing_tracks(self):
""" """
@ -289,17 +397,30 @@ class FileImporter:
""" """
with db.Session() as session: with db.Session() as session:
return Tracks.get_all(session) return Tracks.all_tracks_indexed_by_id(session)
def _get_match_score(self, str1: str, str2: str) -> float: def _find_similar_strings(
self,
needle: str,
haystack: list[tuple[int, str]],
minimum_score: float = Config.MINIMUM_FUZZYMATCH,
) -> list[int]:
""" """
Return the score of how well str1 matches str2. Search for the needle in the string element of the haystack.
Discard similarities less that minimum_score. Return a list of
the int element of the haystack in order of decreasing score (ie,
best match first).
""" """
ratio = fuzz.ratio(str1, str2) # Create a dictionary to store similarities
partial_ratio = fuzz.partial_ratio(str1, str2) similarities: dict[int, float] = {}
token_sort_ratio = fuzz.token_sort_ratio(str1, str2)
token_set_ratio = fuzz.token_set_ratio(str1, str2) for hayblade in haystack:
# Calculate similarity using multiple metrics
ratio = fuzz.ratio(needle, hayblade[1])
partial_ratio = fuzz.partial_ratio(needle, hayblade[1])
token_sort_ratio = fuzz.token_sort_ratio(needle, hayblade[1])
token_set_ratio = fuzz.token_set_ratio(needle, hayblade[1])
# Combine scores # Combine scores
combined_score = ( combined_score = (
@ -309,175 +430,18 @@ class FileImporter:
+ token_set_ratio * 0.25 + token_set_ratio * 0.25
) )
return combined_score if combined_score >= minimum_score:
similarities[hayblade[0]] = combined_score
def populate_track_match_data(self, path: str) -> None: log.debug(
""" f"_find_similar_strings({needle=}), {len(haystack)=}, "
Populate self.import_files_data[path].track_match_data f"{minimum_score=}, {hayblade=}, {combined_score=}"
- Search title in existing tracks
- if score >= Config.FUZZYMATCH_MINIMUM_LIST:
- get artist score
- add TrackMatchData to self.import_files_data[path].track_match_data
"""
title = self.import_files_data[path].tags.title
artist = self.import_files_data[path].tags.artist
for track in self.existing_tracks:
title_score = self._get_match_score(title, track.title)
if title_score >= Config.FUZZYMATCH_MINIMUM_LIST:
artist_score = self._get_match_score(artist, track.artist)
self.import_files_data[path].track_match_data.append(
TrackMatchData(
artist=track.artist,
artist_match=artist_score,
title=track.title,
title_match=title_score,
track_id=track.id,
)
) )
def process_user_choices(self, path: str) -> None: # Sort matches by score
""" sorted_matches = sorted(similarities.items(), key=lambda x: x[1], reverse=True)
Find out whether user wants to import this as a new track,
overwrite an existing track or not import it at all.
"""
# Build a list of (track title and artist, track_id, track path) # Return list of indexes, highest score first
choices: list[tuple[str, int, str]] = [] return [a[0] for a in sorted_matches]
# First choice is always to import as a new track
choices.append((Config.DO_NOT_IMPORT, -1, ""))
choices.append((Config.IMPORT_AS_NEW, 0, ""))
# New track details
importing_track_description = (
f"{self.import_files_data[path].tags.title} "
f"({self.import_files_data[path].tags.artist})"
)
# Select 'import as new' as default unless the top match is good
# enough
default = 1 # default choice is import as new
track_match_data = self.import_files_data[path].track_match_data
try:
if track_match_data:
if (
track_match_data[0].artist_match
>= Config.FUZZYMATCH_MINIMUM_SELECT_ARTIST
and track_match_data[0].title_match
>= Config.FUZZYMATCH_MINIMUM_SELECT_TITLE
):
default = 2
for rec in track_match_data:
existing_track_description = f"{rec.title} ({rec.artist})"
if Config.FUZZYMATCH_SHOW_SCORES:
existing_track_description += f" ({rec.title_match:.0f}%)"
existing_track_path = self._get_existing_track(rec.track_id).path
choices.append(
(existing_track_description, rec.track_id, existing_track_path)
)
except IndexError:
import pdb
pdb.set_trace()
print(2)
dialog = PickMatch(
new_track_description=importing_track_description,
choices=choices,
default=default,
)
if dialog.exec():
if dialog.selected_track_id < 0:
self.import_files_data[path].import_this_file = False
self.import_files_data[path].error = "you asked not to import this file"
elif dialog.selected_track_id > 0:
self.replace_file(path=path, track_id=dialog.selected_track_id)
else:
# Import as new, but check destination path doesn't
# already exists
while os.path.exists(self.import_files_data[path].destination_path):
msg = (
"New import requested but default destination path ({path}) "
"already exists. Click OK and choose where to save this track"
)
import pdb
pdb.set_trace()
show_OK(None, title="Desintation path exists", msg=msg)
# Get output filename
pathspec = QFileDialog.getSaveFileName(
None,
"Save imported track",
directory=Config.IMPORT_DESTINATION,
)
if not pathspec:
self.import_files_data[path].import_this_file = False
self.import_files_data[
path
].error = "destination file already exists"
return
self.import_files_data[path].destination_path = pathspec[0]
self.import_as_new(path=path)
else:
# User cancelled dialog
self.import_files_data[path].import_this_file = False
self.import_files_data[path].error = "you cancelled the import of this file"
def import_as_new(self, path: str) -> None:
"""
Import passed path as a new file
"""
log.debug(f"Import as new, {path=}")
tfd = self.import_files_data[path]
destination_path = os.path.join(
Config.IMPORT_DESTINATION, os.path.basename(path)
)
if os.path.exists(destination_path):
tfd.import_this_file = False
tfd.error = f"this is a new import but destination file already exists ({destination_path})"
return
tfd.destination_path = destination_path
def replace_file(self, path: str, track_id: int) -> None:
"""
Replace existing track {track_id=} with passed path
"""
log.debug(f"Replace {track_id=} with {path=}")
tfd = self.import_files_data[path]
existing_track_path = self._get_existing_track(track_id).path
proposed_destination_path = os.path.join(
os.path.dirname(existing_track_path), os.path.basename(path)
)
# if the destination path exists and it's not the path the
# track_id points to, abort
if existing_track_path != proposed_destination_path and os.path.exists(
proposed_destination_path
):
tfd.import_this_file = False
tfd.error = f"New import would overwrite existing file ({proposed_destination_path})"
return
tfd.file_path_to_remove = existing_track_path
tfd.destination_path = proposed_destination_path
tfd.track_id = track_id
@dataclass
class ThreadData:
base_model: PlaylistModel
row_number: int
worker: Optional[DoTrackImport] = None
class PickMatch(QDialog): class PickMatch(QDialog):
@ -487,18 +451,14 @@ class PickMatch(QDialog):
""" """
def __init__( def __init__(
self, self, new_track_details: str, items_with_ids: list[tuple[str, int, str]]
new_track_description: str,
choices: list[tuple[str, int, str]],
default: int,
) -> None: ) -> None:
super().__init__() super().__init__()
self.new_track_description = new_track_description self.new_track_details = new_track_details
self.default = default self.init_ui(items_with_ids)
self.init_ui(choices) self.selected_id = -1
self.selected_track_id = -1
def init_ui(self, choices: list[tuple[str, int, str]]) -> None: def init_ui(self, items_with_ids: list[tuple[str, int, str]]) -> None:
""" """
Set up dialog Set up dialog
""" """
@ -509,7 +469,7 @@ class PickMatch(QDialog):
# Add instructions # Add instructions
instructions = ( instructions = (
f"Importing {self.new_track_description}.\n" f"Importing {self.new_track_details}.\n"
"Import as a new track or replace existing track?" "Import as a new track or replace existing track?"
) )
instructions_label = QLabel(instructions) instructions_label = QLabel(instructions)
@ -519,25 +479,25 @@ class PickMatch(QDialog):
self.button_group = QButtonGroup() self.button_group = QButtonGroup()
# Add radio buttons for each item # Add radio buttons for each item
for idx, (track_description, track_id, track_path) in enumerate(choices): for idx, (text, track_id, track_path) in enumerate(items_with_ids):
if ( if (
track_sequence.current track_sequence.current
and track_id and track_id
and track_sequence.current.track_id == track_id and track_sequence.current.track_id == track_id
): ):
# Don't allow current track to be replaced # Don't allow current track to be replaced
track_description = "(Currently playing) " + track_description text = "(Currently playing) " + text
radio_button = QRadioButton(track_description) radio_button = QRadioButton(text)
radio_button.setDisabled(True) radio_button.setDisabled(True)
self.button_group.addButton(radio_button, -1) self.button_group.addButton(radio_button, -1)
else: else:
radio_button = QRadioButton(track_description) radio_button = QRadioButton(text)
radio_button.setToolTip(track_path) radio_button.setToolTip(track_path)
self.button_group.addButton(radio_button, track_id) self.button_group.addButton(radio_button, track_id)
layout.addWidget(radio_button) layout.addWidget(radio_button)
# Select the second item by default (import as new) # Select the second item by default (import as new)
if idx == self.default: if idx == 1:
radio_button.setChecked(True) radio_button.setChecked(True)
# Add OK and Cancel buttons # Add OK and Cancel buttons
@ -556,7 +516,7 @@ class PickMatch(QDialog):
def on_ok(self): def on_ok(self):
# Get the ID of the selected button # Get the ID of the selected button
self.selected_track_id = self.button_group.checkedId() self.selected_id = self.button_group.checkedId()
self.accept() self.accept()
@ -566,19 +526,16 @@ class TrackFileData:
Simple class to track details changes to a track file Simple class to track details changes to a track file
""" """
tags: Tags = Tags() import_file_path: str
destination_path: str = "" tags: Tags
import_this_file: bool = True destination_track_path: str = ""
error: str = "" file_path_to_removed: Optional[str] = None
file_path_to_remove: Optional[str] = None
track_id: int = 0 track_id: int = 0
track_match_data: list[TrackMatchData] = field(default_factory=list) audio_metadata: Optional[AudioMetadata] = None
def set_destination_track_path(self, path: str) -> None:
"""
Assigned the passed path
"""
@dataclass self.destination_track_path = path
class TrackMatchData:
artist: str
artist_match: float
title: str
title_match: float
track_id: int

View File

@ -10,7 +10,7 @@ import ssl
import tempfile import tempfile
# PyQt imports # PyQt imports
from PyQt6.QtWidgets import QMainWindow, QMessageBox, QWidget from PyQt6.QtWidgets import QMainWindow, QMessageBox
# Third party imports # Third party imports
from mutagen.flac import FLAC # type: ignore from mutagen.flac import FLAC # type: ignore
@ -204,14 +204,6 @@ def get_tags(path: str) -> Tags:
except TinyTagException: except TinyTagException:
raise ApplicationError(f"Can't read tags: get_tags({path=})") raise ApplicationError(f"Can't read tags: get_tags({path=})")
if (
tag.title is None
or tag.artist is None
or tag.bitrate is None
or tag.duration is None
):
raise ApplicationError(f"Missing tags: get_tags({path=})")
return Tags( return Tags(
title=tag.title, title=tag.title,
artist=tag.artist, artist=tag.artist,
@ -400,16 +392,10 @@ def set_track_metadata(track: Tracks) -> None:
setattr(track, tag_key, getattr(tags, tag_key)) setattr(track, tag_key, getattr(tags, tag_key))
def show_OK(title: str, msg: str, parent: Optional[QWidget] = None) -> None: def show_OK(parent: QMainWindow, title: str, msg: str) -> None:
"""Display a message to user""" """Display a message to user"""
dlg = QMessageBox(parent) QMessageBox.information(parent, title, msg, buttons=QMessageBox.StandardButton.Ok)
dlg.setIcon(QMessageBox.Icon.Information)
dlg.setWindowTitle(title)
dlg.setText(msg)
dlg.setStandardButtons(QMessageBox.StandardButton.Ok)
_ = dlg.exec()
def show_warning(parent: Optional[QMainWindow], title: str, msg: str) -> None: def show_warning(parent: Optional[QMainWindow], title: str, msg: str) -> None:

View File

@ -10,6 +10,7 @@ import sys
# PyQt imports # PyQt imports
# Third party imports # Third party imports
import line_profiler
from sqlalchemy import ( from sqlalchemy import (
bindparam, bindparam,
delete, delete,
@ -245,7 +246,9 @@ class Playlists(dbtables.PlaylistsTable):
"""Returns a list of all templates ordered by name""" """Returns a list of all templates ordered by name"""
return session.scalars( return session.scalars(
select(cls).where(cls.is_template.is_(True)).order_by(cls.name) select(cls)
.where(cls.is_template.is_(True))
.order_by(cls.name)
).all() ).all()
@classmethod @classmethod
@ -574,10 +577,12 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
) )
@staticmethod @staticmethod
@line_profiler.profile
def update_plr_row_numbers( def update_plr_row_numbers(
session: Session, session: Session,
playlist_id: int, playlist_id: int,
sqla_map: List[dict[str, int]], sqla_map: List[dict[str, int]],
dummy_for_profiling: Optional[int] = None,
) -> None: ) -> None:
""" """
Take a {plrid: row_number} dictionary and update the row numbers accordingly Take a {plrid: row_number} dictionary and update the row numbers accordingly

View File

@ -44,6 +44,7 @@ from PyQt6.QtWidgets import (
) )
# Third party imports # Third party imports
import line_profiler
from pygame import mixer from pygame import mixer
from sqlalchemy.orm.session import Session from sqlalchemy.orm.session import Session
import stackprinter # type: ignore import stackprinter # type: ignore
@ -570,18 +571,18 @@ class Window(QMainWindow, Ui_MainWindow):
) )
self.actionExport_playlist.triggered.connect(self.export_playlist_tab) self.actionExport_playlist.triggered.connect(self.export_playlist_tab)
self.actionFade.triggered.connect(self.fade) self.actionFade.triggered.connect(self.fade)
self.actionImport_files.triggered.connect(self.import_files_wrapper)
self.actionInsertSectionHeader.triggered.connect(self.insert_header) self.actionInsertSectionHeader.triggered.connect(self.insert_header)
self.actionInsertTrack.triggered.connect(self.insert_track) self.actionInsertTrack.triggered.connect(self.insert_track)
self.actionManage_templates.triggered.connect(self.manage_templates)
self.actionMark_for_moving.triggered.connect(self.mark_rows_for_moving) self.actionMark_for_moving.triggered.connect(self.mark_rows_for_moving)
self.actionMoveSelected.triggered.connect(self.move_selected) self.actionMoveSelected.triggered.connect(self.move_selected)
self.actionMoveUnplayed.triggered.connect(self.move_unplayed) self.actionMoveUnplayed.triggered.connect(self.move_unplayed)
self.actionManage_templates.triggered.connect(self.manage_templates)
self.actionNewPlaylist.triggered.connect(self.new_playlist) self.actionNewPlaylist.triggered.connect(self.new_playlist)
self.actionOpenPlaylist.triggered.connect(self.open_playlist) self.actionOpenPlaylist.triggered.connect(self.open_playlist)
self.actionPaste.triggered.connect(self.paste_rows) self.actionPaste.triggered.connect(self.paste_rows)
self.actionPlay_next.triggered.connect(self.play_next) self.actionPlay_next.triggered.connect(self.play_next)
self.actionRenamePlaylist.triggered.connect(self.rename_playlist) self.actionRenamePlaylist.triggered.connect(self.rename_playlist)
self.actionReplace_files.triggered.connect(self.import_files_wrapper)
self.actionResume.triggered.connect(self.resume) self.actionResume.triggered.connect(self.resume)
self.actionSave_as_template.triggered.connect(self.save_as_template) self.actionSave_as_template.triggered.connect(self.save_as_template)
self.actionSearch_title_in_Songfacts.triggered.connect( self.actionSearch_title_in_Songfacts.triggered.connect(
@ -1139,7 +1140,8 @@ class Window(QMainWindow, Ui_MainWindow):
else: else:
webbrowser.get("browser").open_new_tab(url) webbrowser.get("browser").open_new_tab(url)
def paste_rows(self) -> None: @line_profiler.profile
def paste_rows(self, dummy_for_profiling: Optional[int] = None) -> None:
""" """
Paste earlier cut rows. Paste earlier cut rows.
""" """

View File

@ -26,6 +26,7 @@ from PyQt6.QtGui import (
) )
# Third party imports # Third party imports
import line_profiler
import obswebsocket # type: ignore import obswebsocket # type: ignore
# import snoop # type: ignore # import snoop # type: ignore
@ -296,17 +297,22 @@ class PlaylistModel(QAbstractTableModel):
self.update_track_times() self.update_track_times()
# Find next track # Find next track
# Get all unplayed track rows
log.debug(f"{self}: Find next track")
next_row = None next_row = None
unplayed_rows = [ unplayed_rows = self.get_unplayed_rows()
a
for a in self.get_unplayed_rows()
if not self.is_header_row(a)
and not file_is_unreadable(self.playlist_rows[a].path)
]
if unplayed_rows: if unplayed_rows:
try: try:
next_row = min([a for a in unplayed_rows if a > row_number]) # Find next row after current track
next_row = min(
[
a
for a in unplayed_rows
if a > row_number and not self.is_header_row(a)
]
)
except ValueError: except ValueError:
# Find first unplayed track
next_row = min(unplayed_rows) next_row = min(unplayed_rows)
if next_row is not None: if next_row is not None:
self.set_next_row(next_row) self.set_next_row(next_row)
@ -771,7 +777,9 @@ class PlaylistModel(QAbstractTableModel):
return None return None
def load_data(self, session: db.session) -> None: def load_data(
self, session: db.session, dummy_for_profiling: Optional[int] = None
) -> None:
""" """
Same as refresh data, but only used when creating playslit. Same as refresh data, but only used when creating playslit.
Distinguishes profile time between initial load and other Distinguishes profile time between initial load and other
@ -818,7 +826,13 @@ class PlaylistModel(QAbstractTableModel):
self.update_track_times() self.update_track_times()
self.invalidate_rows(row_numbers) self.invalidate_rows(row_numbers)
def move_rows(self, from_rows: list[int], to_row_number: int) -> None: @line_profiler.profile
def move_rows(
self,
from_rows: list[int],
to_row_number: int,
dummy_for_profiling: Optional[int] = None,
) -> None:
""" """
Move the playlist rows given to to_row and below. Move the playlist rows given to to_row and below.
""" """
@ -883,11 +897,13 @@ class PlaylistModel(QAbstractTableModel):
self.update_track_times() self.update_track_times()
self.invalidate_rows(list(row_map.keys())) self.invalidate_rows(list(row_map.keys()))
@line_profiler.profile
def move_rows_between_playlists( def move_rows_between_playlists(
self, self,
from_rows: list[int], from_rows: list[int],
to_row_number: int, to_row_number: int,
to_playlist_id: int, to_playlist_id: int,
dummy_for_profiling: Optional[int] = None,
) -> None: ) -> None:
""" """
Move the playlist rows given to to_row and below of to_playlist. Move the playlist rows given to to_row and below of to_playlist.
@ -1060,7 +1076,10 @@ class PlaylistModel(QAbstractTableModel):
# Update display # Update display
self.invalidate_row(track_sequence.previous.row_number) self.invalidate_row(track_sequence.previous.row_number)
def refresh_data(self, session: db.session) -> None: @line_profiler.profile
def refresh_data(
self, session: db.session, dummy_for_profiling: Optional[int] = None
) -> None:
""" """
Populate self.playlist_rows with playlist data Populate self.playlist_rows with playlist data
@ -1563,23 +1582,6 @@ class PlaylistModel(QAbstractTableModel):
) )
) )
def update_or_insert(self, track_id: int, row_number: int) -> None:
"""
If the passed track_id exists in this playlist, update the
row(s), otherwise insert this track at row_number.
"""
track_rows = [
a.row_number for a in self.playlist_rows.values() if a.track_id == track_id
]
if track_rows:
with db.Session() as session:
for row in track_rows:
self.refresh_row(session, row)
self.invalidate_rows(track_rows)
else:
self.insert_row(proposed_row_number=row_number, track_id=track_id)
def update_track_times(self) -> None: def update_track_times(self) -> None:
""" """
Update track start/end times in self.playlist_rows Update track start/end times in self.playlist_rows

View File

@ -33,6 +33,7 @@ from PyQt6.QtWidgets import (
) )
# Third party imports # Third party imports
import line_profiler
# App imports # App imports
from audacity_controller import AudacityController from audacity_controller import AudacityController
@ -377,7 +378,10 @@ class PlaylistTab(QTableView):
# Deselect edited line # Deselect edited line
self.clear_selection() self.clear_selection()
def dropEvent(self, event: Optional[QDropEvent]) -> None: @line_profiler.profile
def dropEvent(
self, event: Optional[QDropEvent], dummy_for_profiling: Optional[int] = None
) -> None:
""" """
Move dropped rows Move dropped rows
""" """

View File

@ -1000,7 +1000,7 @@ padding-left: 8px;</string>
<addaction name="actionSave_as_template"/> <addaction name="actionSave_as_template"/>
<addaction name="actionManage_templates"/> <addaction name="actionManage_templates"/>
<addaction name="separator"/> <addaction name="separator"/>
<addaction name="actionImport_files"/> <addaction name="actionReplace_files"/>
<addaction name="separator"/> <addaction name="separator"/>
<addaction name="actionE_xit"/> <addaction name="actionE_xit"/>
</widget> </widget>
@ -1364,7 +1364,7 @@ padding-left: 8px;</string>
<string>Select duplicate rows...</string> <string>Select duplicate rows...</string>
</property> </property>
</action> </action>
<action name="actionImport_files"> <action name="actionReplace_files">
<property name="text"> <property name="text">
<string>Import files...</string> <string>Import files...</string>
</property> </property>

View File

@ -1,6 +1,6 @@
# Form implementation generated from reading ui file 'app/ui/main_window.ui' # Form implementation generated from reading ui file 'app/ui/main_window.ui'
# #
# Created by: PyQt6 UI code generator 6.8.0 # Created by: PyQt6 UI code generator 6.7.1
# #
# WARNING: Any manual changes made to this file will be lost when pyuic6 is # WARNING: Any manual changes made to this file will be lost when pyuic6 is
# run again. Do not edit this file unless you know what you are doing. # run again. Do not edit this file unless you know what you are doing.
@ -529,8 +529,8 @@ class Ui_MainWindow(object):
self.actionSearch_title_in_Songfacts.setObjectName("actionSearch_title_in_Songfacts") self.actionSearch_title_in_Songfacts.setObjectName("actionSearch_title_in_Songfacts")
self.actionSelect_duplicate_rows = QtGui.QAction(parent=MainWindow) self.actionSelect_duplicate_rows = QtGui.QAction(parent=MainWindow)
self.actionSelect_duplicate_rows.setObjectName("actionSelect_duplicate_rows") self.actionSelect_duplicate_rows.setObjectName("actionSelect_duplicate_rows")
self.actionImport_files = QtGui.QAction(parent=MainWindow) self.actionReplace_files = QtGui.QAction(parent=MainWindow)
self.actionImport_files.setObjectName("actionImport_files") self.actionReplace_files.setObjectName("actionReplace_files")
self.menuFile.addSeparator() self.menuFile.addSeparator()
self.menuFile.addAction(self.actionInsertTrack) self.menuFile.addAction(self.actionInsertTrack)
self.menuFile.addAction(self.actionRemove) self.menuFile.addAction(self.actionRemove)
@ -557,7 +557,7 @@ class Ui_MainWindow(object):
self.menuPlaylist.addAction(self.actionSave_as_template) self.menuPlaylist.addAction(self.actionSave_as_template)
self.menuPlaylist.addAction(self.actionManage_templates) self.menuPlaylist.addAction(self.actionManage_templates)
self.menuPlaylist.addSeparator() self.menuPlaylist.addSeparator()
self.menuPlaylist.addAction(self.actionImport_files) self.menuPlaylist.addAction(self.actionReplace_files)
self.menuPlaylist.addSeparator() self.menuPlaylist.addSeparator()
self.menuPlaylist.addAction(self.actionE_xit) self.menuPlaylist.addAction(self.actionE_xit)
self.menuSearc_h.addAction(self.actionSetNext) self.menuSearc_h.addAction(self.actionSetNext)
@ -676,6 +676,6 @@ class Ui_MainWindow(object):
self.actionSearch_title_in_Songfacts.setText(_translate("MainWindow", "Search title in Songfacts")) self.actionSearch_title_in_Songfacts.setText(_translate("MainWindow", "Search title in Songfacts"))
self.actionSearch_title_in_Songfacts.setShortcut(_translate("MainWindow", "Ctrl+S")) self.actionSearch_title_in_Songfacts.setShortcut(_translate("MainWindow", "Ctrl+S"))
self.actionSelect_duplicate_rows.setText(_translate("MainWindow", "Select duplicate rows...")) self.actionSelect_duplicate_rows.setText(_translate("MainWindow", "Select duplicate rows..."))
self.actionImport_files.setText(_translate("MainWindow", "Import files...")) self.actionReplace_files.setText(_translate("MainWindow", "Import files..."))
from infotabs import InfoTabs from infotabs import InfoTabs # type: ignore
from pyqtgraph import PlotWidget from pyqtgraph import PlotWidget # type: ignore