Much improved file importer
This commit is contained in:
parent
85cfebe0f7
commit
3a3b1b712d
@ -94,10 +94,10 @@ class MusicMusterSignals(QObject):
|
||||
|
||||
|
||||
class Tags(NamedTuple):
|
||||
artist: str
|
||||
title: str
|
||||
bitrate: int
|
||||
duration: int
|
||||
artist: str = ""
|
||||
title: str = ""
|
||||
bitrate: int = 0
|
||||
duration: int = 0
|
||||
|
||||
|
||||
class TrackInfo(NamedTuple):
|
||||
|
||||
@ -51,6 +51,10 @@ class Config(object):
|
||||
FADEOUT_DB = -10
|
||||
FADEOUT_SECONDS = 5
|
||||
FADEOUT_STEPS_PER_SECOND = 5
|
||||
FUZZYMATCH_MINIMUM_LIST = 60.0
|
||||
FUZZYMATCH_MINIMUM_SELECT_ARTIST = 80.0
|
||||
FUZZYMATCH_MINIMUM_SELECT_TITLE = 80.0
|
||||
FUZZYMATCH_SHOW_SCORES = True
|
||||
HEADER_ARTIST = "Artist"
|
||||
HEADER_BITRATE = "bps"
|
||||
HEADER_DURATION = "Length"
|
||||
@ -62,8 +66,8 @@ class Config(object):
|
||||
HEADER_START_TIME = "Start"
|
||||
HEADER_TITLE = "Title"
|
||||
HIDE_AFTER_PLAYING_OFFSET = 5000
|
||||
HIDE_PLAYED_MODE_TRACKS = "TRACKS"
|
||||
HIDE_PLAYED_MODE_SECTIONS = "SECTIONS"
|
||||
HIDE_PLAYED_MODE_TRACKS = "TRACKS"
|
||||
IMPORT_AS_NEW = "Import as new track"
|
||||
INFO_TAB_TITLE_LENGTH = 15
|
||||
INTRO_SECONDS_FORMAT = ".1f"
|
||||
@ -82,7 +86,6 @@ class Config(object):
|
||||
MAX_INFO_TABS = 5
|
||||
MAX_MISSING_FILES_TO_REPORT = 10
|
||||
MILLISECOND_SIGFIGS = 0
|
||||
MINIMUM_FUZZYMATCH = 60.0
|
||||
MINIMUM_ROW_HEIGHT = 30
|
||||
NO_TEMPLATE_NAME = "None"
|
||||
NOTE_TIME_FORMAT = "%H:%M"
|
||||
|
||||
@ -1,6 +1,5 @@
|
||||
# Standard library imports
|
||||
from typing import Optional
|
||||
import os
|
||||
|
||||
# PyQt imports
|
||||
from PyQt6.QtCore import QEvent, Qt
|
||||
@ -9,27 +8,22 @@ from PyQt6.QtWidgets import (
|
||||
QDialog,
|
||||
QListWidgetItem,
|
||||
QMainWindow,
|
||||
QTableWidgetItem,
|
||||
)
|
||||
|
||||
# Third party imports
|
||||
import pydymenu # type: ignore
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
# App imports
|
||||
from classes import MusicMusterSignals
|
||||
from config import Config
|
||||
from helpers import (
|
||||
ask_yes_no,
|
||||
get_relative_date,
|
||||
get_tags,
|
||||
ms_to_mmss,
|
||||
show_warning,
|
||||
)
|
||||
from log import log
|
||||
from models import db, Settings, Tracks
|
||||
from models import Settings, Tracks
|
||||
from playlistmodel import PlaylistModel
|
||||
from ui import dlg_TrackSelect_ui, dlg_replace_files_ui
|
||||
from ui import dlg_TrackSelect_ui
|
||||
|
||||
|
||||
class TrackSelectDialog(QDialog):
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
# Standard library imports
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from fuzzywuzzy import fuzz # type: ignore
|
||||
import os.path
|
||||
from typing import Optional
|
||||
@ -21,7 +21,6 @@ from PyQt6.QtWidgets import (
|
||||
QLabel,
|
||||
QPushButton,
|
||||
QRadioButton,
|
||||
QStatusBar,
|
||||
QVBoxLayout,
|
||||
)
|
||||
|
||||
@ -30,8 +29,6 @@ from PyQt6.QtWidgets import (
|
||||
# App imports
|
||||
from classes import (
|
||||
ApplicationError,
|
||||
AudioMetadata,
|
||||
FileErrors,
|
||||
MusicMusterSignals,
|
||||
Tags,
|
||||
)
|
||||
@ -49,35 +46,26 @@ import helpers
|
||||
|
||||
|
||||
class DoTrackImport(QObject):
|
||||
import_finished = pyqtSignal()
|
||||
import_finished = pyqtSignal(int, QThread)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
associated_thread: QThread,
|
||||
import_file_path: str,
|
||||
tags: Tags,
|
||||
destination_track_path: str,
|
||||
destination_path: str,
|
||||
track_id: int,
|
||||
audio_metadata: AudioMetadata,
|
||||
base_model: PlaylistModel,
|
||||
row_number: Optional[int],
|
||||
) -> None:
|
||||
"""
|
||||
Save parameters
|
||||
"""
|
||||
|
||||
super().__init__()
|
||||
|
||||
self.associated_thread = associated_thread
|
||||
self.import_file_path = import_file_path
|
||||
self.tags = tags
|
||||
self.destination_track_path = destination_track_path
|
||||
self.destination_track_path = destination_path
|
||||
self.track_id = track_id
|
||||
self.audio_metadata = audio_metadata
|
||||
self.base_model = base_model
|
||||
|
||||
if row_number is None:
|
||||
self.next_row_number = base_model.rowCount()
|
||||
else:
|
||||
self.next_row_number = row_number
|
||||
|
||||
self.signals = MusicMusterSignals()
|
||||
|
||||
@ -91,6 +79,9 @@ class DoTrackImport(QObject):
|
||||
|
||||
temp_file: Optional[str] = None
|
||||
|
||||
# Get audio metadata in this thread rather than calling function to save interactive time
|
||||
self.audio_metadata = helpers.get_audio_metadata(self.import_file_path)
|
||||
|
||||
# If destination exists, move it out of the way
|
||||
if os.path.exists(self.destination_track_path):
|
||||
temp_file = self.destination_track_path + ".TMP"
|
||||
@ -130,13 +121,11 @@ class DoTrackImport(QObject):
|
||||
session.commit()
|
||||
|
||||
helpers.normalise_track(self.destination_track_path)
|
||||
self.base_model.insert_row(self.next_row_number, track.id, "imported")
|
||||
self.next_row_number += 1
|
||||
|
||||
self.signals.status_message_signal.emit(
|
||||
f"{os.path.basename(self.import_file_path)} imported", 10000
|
||||
)
|
||||
self.import_finished.emit()
|
||||
self.signals.status_message_signal.emit(
|
||||
f"{os.path.basename(self.import_file_path)} imported", 10000
|
||||
)
|
||||
self.import_finished.emit(track.id, self.associated_thread)
|
||||
|
||||
|
||||
class FileImporter:
|
||||
@ -151,32 +140,27 @@ class FileImporter:
|
||||
Set up class
|
||||
"""
|
||||
|
||||
# Save parameters
|
||||
self.base_model = base_model
|
||||
if row_number:
|
||||
self.row_number = row_number
|
||||
else:
|
||||
self.row_number = base_model.rowCount()
|
||||
# Create ModelData
|
||||
if not row_number:
|
||||
row_number = base_model.rowCount()
|
||||
self.model_data = ModelData(base_model=base_model, row_number=row_number)
|
||||
|
||||
# Place to keep reference to importer threads and data
|
||||
self.thread_data: dict[QThread, ModelData] = {}
|
||||
|
||||
# Data structure to track files to import
|
||||
self.import_files_data: list[TrackFileData] = []
|
||||
# Dictionary of exsting tracks
|
||||
self.import_files_data: dict[str, TrackFileData] = {}
|
||||
|
||||
# Dictionary of exsting tracks indexed by track.id
|
||||
self.existing_tracks = self._get_existing_tracks()
|
||||
# List of track_id, title tuples
|
||||
self.track_idx_and_title = [
|
||||
((a.id, a.title)) for a in self.existing_tracks.values()
|
||||
]
|
||||
# Files to import
|
||||
self.import_files_paths = [
|
||||
|
||||
# Populate self.import_files_data
|
||||
for infile in [
|
||||
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
|
||||
for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE)
|
||||
if f.endswith((".mp3", ".flac"))
|
||||
]
|
||||
# Files we can't import
|
||||
self.unimportable_files: list[FileErrors] = []
|
||||
# Files user doesn't want imported
|
||||
self.do_not_import: list[str] = []
|
||||
# Place to keep reference to importer while it runs
|
||||
self.import_thread: dict[int, QThread] = {}
|
||||
]:
|
||||
self.import_files_data[infile] = TrackFileData()
|
||||
|
||||
def do_import(self) -> None:
|
||||
"""
|
||||
@ -190,206 +174,108 @@ class FileImporter:
|
||||
- import files and either replace existing or add to pool
|
||||
"""
|
||||
|
||||
# check all file are readable
|
||||
self.check_files_are_readable()
|
||||
|
||||
# load readable files and tags into self.import_files
|
||||
for import_file in self.import_files_paths:
|
||||
try:
|
||||
tags = get_tags(import_file)
|
||||
except ApplicationError as e:
|
||||
self.unimportable_files.append(
|
||||
FileErrors(path=import_file, error=str(e))
|
||||
)
|
||||
self.import_files_paths.remove(import_file)
|
||||
try:
|
||||
self.import_files_data.append(
|
||||
TrackFileData(import_file_path=import_file, tags=tags)
|
||||
)
|
||||
except Exception as e:
|
||||
self.unimportable_files.append(
|
||||
FileErrors(path=import_file, error=str(e))
|
||||
)
|
||||
self.import_files_paths.remove(import_file)
|
||||
|
||||
if self.unimportable_files:
|
||||
msg = "The following files could not be read and won't be imported:\n"
|
||||
for unimportable_file in self.unimportable_files:
|
||||
msg += f"\n\t• {unimportable_file.path} ({unimportable_file.error})"
|
||||
show_warning(None, "Unimportable files", msg)
|
||||
|
||||
# check for close matches.
|
||||
for idx in range(len(self.import_files_data)):
|
||||
self.check_match(idx=idx)
|
||||
|
||||
self.import_files_data = [
|
||||
x
|
||||
for x in self.import_files_data
|
||||
if (
|
||||
x.import_file_path not in self.do_not_import
|
||||
or x.track_id != -1
|
||||
)
|
||||
]
|
||||
|
||||
# Import all that's left.
|
||||
for idx in range(len(self.import_files_data)):
|
||||
self._import_file(idx)
|
||||
|
||||
def check_match(self, idx: int) -> None:
|
||||
"""
|
||||
Work on and update the idx element of self.import_file_data.
|
||||
Check for similar existing titles. If none found, set up to
|
||||
import this as a new track. If one is found, check with user
|
||||
whether this is a new track or replacement. If more than one
|
||||
is found, as for one but order the tracks in
|
||||
artist-similarity order.
|
||||
"""
|
||||
|
||||
similar_track_ids = self._find_similar_strings(
|
||||
self.import_files_data[idx].tags.title, self.track_idx_and_title
|
||||
)
|
||||
if len(similar_track_ids) == 0:
|
||||
matching_track = 0
|
||||
elif len(similar_track_ids) == 1:
|
||||
matching_track = self._pick_match(idx, similar_track_ids)
|
||||
else:
|
||||
matching_track = self._pick_match(
|
||||
idx, self.order_by_artist(idx, similar_track_ids)
|
||||
)
|
||||
|
||||
if matching_track < 0: # User cancelled
|
||||
matching_track = -1
|
||||
|
||||
elif matching_track == 0:
|
||||
self.import_files_data[idx].destination_track_path = os.path.join(
|
||||
Config.IMPORT_DESTINATION,
|
||||
os.path.basename(self.import_files_data[idx].import_file_path),
|
||||
)
|
||||
else:
|
||||
self.import_files_data[idx].destination_track_path = self.existing_tracks[
|
||||
matching_track
|
||||
].path
|
||||
|
||||
self.import_files_data[idx].track_id = matching_track
|
||||
|
||||
def _import_file(self, idx: int) -> None:
|
||||
"""
|
||||
Import the file specified at self.import_files_data[idx]
|
||||
"""
|
||||
|
||||
log.debug(f"_import_file({idx=}), {self.import_files_data[idx]=}")
|
||||
|
||||
f = self.import_files_data[idx]
|
||||
|
||||
# Import in separate thread
|
||||
self.import_thread[idx] = QThread()
|
||||
self.worker = DoTrackImport(
|
||||
import_file_path=f.import_file_path,
|
||||
tags=f.tags,
|
||||
destination_track_path=f.destination_track_path,
|
||||
track_id=f.track_id,
|
||||
audio_metadata=helpers.get_audio_metadata(f.import_file_path),
|
||||
base_model=self.base_model,
|
||||
row_number=self.row_number,
|
||||
)
|
||||
|
||||
self.worker.moveToThread(self.import_thread[idx])
|
||||
self.import_thread[idx].started.connect(self.worker.run)
|
||||
self.worker.import_finished.connect(self.import_thread[idx].quit)
|
||||
self.worker.import_finished.connect(self.worker.deleteLater)
|
||||
self.import_thread[idx].finished.connect(self.import_thread[idx].deleteLater)
|
||||
self.import_thread[idx].start()
|
||||
|
||||
def order_by_artist(self, idx: int, track_ids_to_check: list[int]) -> list[int]:
|
||||
"""
|
||||
Return the list of track_ids sorted by how well the artist at idx matches the
|
||||
track artist.
|
||||
"""
|
||||
|
||||
track_idx_and_artist = [
|
||||
((key, a.artist))
|
||||
for key, a in self.existing_tracks.items()
|
||||
if key in track_ids_to_check
|
||||
]
|
||||
# We want to return all of the passed tracks so set minimum_score
|
||||
# to zero
|
||||
return self._find_similar_strings(
|
||||
self.import_files_data[idx].tags.artist,
|
||||
track_idx_and_artist,
|
||||
minimum_score=0.0,
|
||||
)
|
||||
|
||||
def _pick_match(self, idx: int, track_ids: list[int]) -> int:
|
||||
"""
|
||||
Return the track_id selected by the user, including "import as new" which will be
|
||||
track_id 0. Return -1 if user cancels.
|
||||
|
||||
If user chooses not to import this track, remove it from the list of tracks to
|
||||
import and return -1.
|
||||
"""
|
||||
|
||||
log.debug(f"_pick_match({idx=}, {track_ids=})")
|
||||
|
||||
new_track_details = (
|
||||
f"{self.import_files_data[idx].tags.title} "
|
||||
f"({self.import_files_data[idx].tags.artist})"
|
||||
)
|
||||
|
||||
# Build a list of (track title and artist, track_id, track path)
|
||||
choices: list[tuple[str, int, str]] = []
|
||||
# First choice is always to import as a new track
|
||||
choices.append((Config.DO_NOT_IMPORT, -2, ""))
|
||||
choices.append((Config.IMPORT_AS_NEW, 0, ""))
|
||||
for track_id in track_ids:
|
||||
choices.append(
|
||||
(
|
||||
f"{self.existing_tracks[track_id].title} "
|
||||
f"({self.existing_tracks[track_id].artist})",
|
||||
track_id,
|
||||
str(self.existing_tracks[track_id].path),
|
||||
)
|
||||
)
|
||||
|
||||
dialog = PickMatch(new_track_details, choices)
|
||||
if dialog.exec() and dialog.selected_id >= 0:
|
||||
return dialog.selected_id
|
||||
else:
|
||||
self.do_not_import.append(self.import_files_data[idx].import_file_path)
|
||||
return -1
|
||||
|
||||
def check_files_are_readable(self) -> None:
|
||||
"""
|
||||
Check files to be imported are readable. If not, remove them from the
|
||||
import list and add them to the file errors list.
|
||||
"""
|
||||
|
||||
for path in self.import_files_paths:
|
||||
# Check all file are readable and have tags. Mark failures not to
|
||||
# be imported and populate error text.
|
||||
for path in self.import_files_data.keys():
|
||||
if file_is_unreadable(path):
|
||||
self.unimportable_files.append(
|
||||
FileErrors(path=os.path.basename(path), error="File is unreadable")
|
||||
)
|
||||
self.import_files_paths.remove(path)
|
||||
self.import_files_data[path].import_this_file = False
|
||||
self.import_files_data[path].error = f"{path} is unreadable"
|
||||
continue
|
||||
|
||||
def import_files_are_tagged(self) -> list:
|
||||
# Get tags
|
||||
try:
|
||||
self.import_files_data[path].tags = get_tags(path)
|
||||
except ApplicationError as e:
|
||||
self.import_files_data[path].import_this_file = False
|
||||
self.import_files_data[path].error = f"{path} tag errors ({str(e)})"
|
||||
continue
|
||||
|
||||
# Get track match data
|
||||
self.populate_track_match_data(path)
|
||||
# Sort with best artist match first
|
||||
self.import_files_data[path].track_match_data.sort(
|
||||
key=lambda rec: rec.artist_match, reverse=True
|
||||
)
|
||||
|
||||
# Process user choices
|
||||
self.process_user_choices(path)
|
||||
|
||||
# Tell users about files that won't be imported
|
||||
for path in [
|
||||
a
|
||||
for a in self.import_files_data.keys()
|
||||
if not self.import_files_data[a].import_this_file
|
||||
]:
|
||||
msg = (
|
||||
f"{os.path.basename(path)} will not be imported because "
|
||||
f"{self.import_files_data[path].error}"
|
||||
)
|
||||
show_warning(None, "File not imported", msg)
|
||||
|
||||
# Import files once we have data for all of them (to avoid long
|
||||
# drawn-out user interaction while we import files)
|
||||
for path in [
|
||||
a
|
||||
for a in self.import_files_data.keys()
|
||||
if self.import_files_data[a].import_this_file
|
||||
]:
|
||||
self._start_thread(path)
|
||||
|
||||
def _start_thread(self, path: str) -> None:
|
||||
"""
|
||||
Return a (possibly empty) list of all untagged files in the
|
||||
import directory. Add tags to file_data
|
||||
Import the file specified by path
|
||||
"""
|
||||
|
||||
untagged_files: list[str] = []
|
||||
for fullpath in self.import_files_paths:
|
||||
tags = get_tags(fullpath)
|
||||
if not tags:
|
||||
untagged_files.append(os.path.basename(fullpath))
|
||||
# Remove from import list
|
||||
del self.import_files_data[fullpath]
|
||||
log.warning(f"Import: no tags found, {fullpath=}")
|
||||
else:
|
||||
self.import_files_data.append(
|
||||
TrackFileData(import_file_path=fullpath, tags=tags)
|
||||
)
|
||||
log.debug(f"_start_thread({path=})")
|
||||
|
||||
return untagged_files
|
||||
# Create thread and worker
|
||||
thread = QThread()
|
||||
self.worker = DoTrackImport(
|
||||
associated_thread=thread,
|
||||
import_file_path=path,
|
||||
tags=self.import_files_data[path].tags,
|
||||
destination_path=self.import_files_data[path].destination_path,
|
||||
track_id=self.import_files_data[path].track_id
|
||||
)
|
||||
|
||||
# Associate data with the thread
|
||||
self.thread_data[thread] = self.model_data
|
||||
|
||||
# Move self.worker to thread
|
||||
self.worker.moveToThread(thread)
|
||||
|
||||
# Connect signals
|
||||
thread.started.connect(self.worker.run)
|
||||
self.worker.import_finished.connect(self._thread_finished)
|
||||
self.worker.import_finished.connect(thread.quit)
|
||||
self.worker.import_finished.connect(self.worker.deleteLater)
|
||||
thread.finished.connect(thread.deleteLater)
|
||||
|
||||
# Start thread
|
||||
thread.start()
|
||||
|
||||
def _thread_finished(self, track_id: int, thread: QThread) -> None:
|
||||
"""
|
||||
If track already in playlist, refresh it else insert it
|
||||
"""
|
||||
|
||||
model_data = self.thread_data.pop(thread, None)
|
||||
if model_data:
|
||||
if model_data.base_model:
|
||||
model_data.base_model.update_or_insert(track_id, model_data.row_number)
|
||||
|
||||
def _get_existing_track(self, track_id: int) -> Tracks:
|
||||
"""
|
||||
Lookup in existing track in the local cache and return it
|
||||
"""
|
||||
|
||||
existing_track_records = [a for a in self.existing_tracks if a.id == track_id]
|
||||
if len(existing_track_records) != 1:
|
||||
raise ApplicationError(
|
||||
f"Internal error in _get_existing_track: {existing_track_records=}"
|
||||
)
|
||||
|
||||
return existing_track_records[0]
|
||||
|
||||
def _get_existing_tracks(self):
|
||||
"""
|
||||
@ -397,51 +283,161 @@ class FileImporter:
|
||||
"""
|
||||
|
||||
with db.Session() as session:
|
||||
return Tracks.all_tracks_indexed_by_id(session)
|
||||
return Tracks.get_all(session)
|
||||
|
||||
def _find_similar_strings(
|
||||
self,
|
||||
needle: str,
|
||||
haystack: list[tuple[int, str]],
|
||||
minimum_score: float = Config.MINIMUM_FUZZYMATCH,
|
||||
) -> list[int]:
|
||||
def _get_match_score(self, str1: str, str2: str) -> float:
|
||||
"""
|
||||
Search for the needle in the string element of the haystack.
|
||||
Discard similarities less that minimum_score. Return a list of
|
||||
the int element of the haystack in order of decreasing score (ie,
|
||||
best match first).
|
||||
Return the score of how well str1 matches str2.
|
||||
"""
|
||||
|
||||
# Create a dictionary to store similarities
|
||||
similarities: dict[int, float] = {}
|
||||
ratio = fuzz.ratio(str1, str2)
|
||||
partial_ratio = fuzz.partial_ratio(str1, str2)
|
||||
token_sort_ratio = fuzz.token_sort_ratio(str1, str2)
|
||||
token_set_ratio = fuzz.token_set_ratio(str1, str2)
|
||||
|
||||
for hayblade in haystack:
|
||||
# Calculate similarity using multiple metrics
|
||||
ratio = fuzz.ratio(needle, hayblade[1])
|
||||
partial_ratio = fuzz.partial_ratio(needle, hayblade[1])
|
||||
token_sort_ratio = fuzz.token_sort_ratio(needle, hayblade[1])
|
||||
token_set_ratio = fuzz.token_set_ratio(needle, hayblade[1])
|
||||
# Combine scores
|
||||
combined_score = (
|
||||
ratio * 0.25
|
||||
+ partial_ratio * 0.25
|
||||
+ token_sort_ratio * 0.25
|
||||
+ token_set_ratio * 0.25
|
||||
)
|
||||
|
||||
# Combine scores
|
||||
combined_score = (
|
||||
ratio * 0.25
|
||||
+ partial_ratio * 0.25
|
||||
+ token_sort_ratio * 0.25
|
||||
+ token_set_ratio * 0.25
|
||||
)
|
||||
return combined_score
|
||||
|
||||
if combined_score >= minimum_score:
|
||||
similarities[hayblade[0]] = combined_score
|
||||
log.debug(
|
||||
f"_find_similar_strings({needle=}), {len(haystack)=}, "
|
||||
f"{minimum_score=}, {hayblade=}, {combined_score=}"
|
||||
def populate_track_match_data(self, path: str) -> None:
|
||||
"""
|
||||
Populate self.import_files_data[path].track_match_data
|
||||
|
||||
- Search title in existing tracks
|
||||
- if score >= Config.FUZZYMATCH_MINIMUM_LIST:
|
||||
- get artist score
|
||||
- add TrackMatchData to self.import_files_data[path].track_match_data
|
||||
"""
|
||||
|
||||
title = self.import_files_data[path].tags.title
|
||||
artist = self.import_files_data[path].tags.artist
|
||||
|
||||
for track in self.existing_tracks:
|
||||
title_score = self._get_match_score(title, track.title)
|
||||
if title_score >= Config.FUZZYMATCH_MINIMUM_LIST:
|
||||
artist_score = self._get_match_score(artist, track.artist)
|
||||
self.import_files_data[path].track_match_data.append(
|
||||
TrackMatchData(
|
||||
artist=track.artist,
|
||||
artist_match=artist_score,
|
||||
title=track.title,
|
||||
title_match=title_score,
|
||||
track_id=track.id,
|
||||
)
|
||||
)
|
||||
|
||||
# Sort matches by score
|
||||
sorted_matches = sorted(similarities.items(), key=lambda x: x[1], reverse=True)
|
||||
def process_user_choices(self, path: str) -> None:
|
||||
"""
|
||||
Find out whether user wants to import this as a new track,
|
||||
overwrite an existing track or not import it at all.
|
||||
"""
|
||||
|
||||
# Return list of indexes, highest score first
|
||||
return [a[0] for a in sorted_matches]
|
||||
# Build a list of (track title and artist, track_id, track path)
|
||||
choices: list[tuple[str, int, str]] = []
|
||||
|
||||
# First choice is always to import as a new track
|
||||
choices.append((Config.DO_NOT_IMPORT, -1, ""))
|
||||
choices.append((Config.IMPORT_AS_NEW, 0, ""))
|
||||
|
||||
# New track details
|
||||
importing_track_description = (
|
||||
f"{self.import_files_data[path].tags.title} "
|
||||
f"({self.import_files_data[path].tags.artist})"
|
||||
)
|
||||
|
||||
# Select 'import as new' as default unless the top match is good
|
||||
# enough
|
||||
default = 1 # default choice is import as new
|
||||
track_match_data = self.import_files_data[path].track_match_data
|
||||
if (
|
||||
track_match_data[0].artist_match >= Config.FUZZYMATCH_MINIMUM_SELECT_ARTIST
|
||||
and track_match_data[0].title_match
|
||||
>= Config.FUZZYMATCH_MINIMUM_SELECT_TITLE
|
||||
):
|
||||
default = 2
|
||||
|
||||
for rec in track_match_data:
|
||||
existing_track_description = (f"{rec.title} ({rec.artist})")
|
||||
if Config.FUZZYMATCH_SHOW_SCORES:
|
||||
existing_track_description += f" ({rec.title_match:.0f}%)"
|
||||
existing_track_path = self._get_existing_track(rec.track_id).path
|
||||
choices.append(
|
||||
(existing_track_description, rec.track_id, existing_track_path)
|
||||
)
|
||||
|
||||
dialog = PickMatch(
|
||||
new_track_description=importing_track_description,
|
||||
choices=choices,
|
||||
default=default,
|
||||
)
|
||||
if dialog.exec():
|
||||
if dialog.selected_track_id < 0:
|
||||
self.import_files_data[path].import_this_file = False
|
||||
self.import_files_data[path].error = "you asked not to import this file"
|
||||
elif dialog.selected_track_id > 0:
|
||||
self.replace_file(path=path, track_id=dialog.selected_track_id)
|
||||
else:
|
||||
self.import_as_new(path=path)
|
||||
else:
|
||||
# User cancelled dialog
|
||||
self.import_files_data[path].import_this_file = False
|
||||
self.import_files_data[path].error = "you cancelled the import of this file"
|
||||
|
||||
def import_as_new(self, path: str) -> None:
|
||||
"""
|
||||
Import passed path as a new file
|
||||
"""
|
||||
|
||||
log.debug(f"Import as new, {path=}")
|
||||
|
||||
tfd = self.import_files_data[path]
|
||||
|
||||
destination_path = os.path.join(Config.IMPORT_DESTINATION, os.path.basename(path))
|
||||
if os.path.exists(destination_path):
|
||||
tfd.import_this_file = False
|
||||
tfd.error = (
|
||||
f"this is a new import but destination file already exists ({destination_path})"
|
||||
)
|
||||
return
|
||||
|
||||
tfd.destination_path = destination_path
|
||||
|
||||
def replace_file(self, path: str, track_id: int) -> None:
|
||||
"""
|
||||
Replace existing track {track_id=} with passed path
|
||||
"""
|
||||
|
||||
log.debug(f"Replace {track_id=} with {path=}")
|
||||
|
||||
tfd = self.import_files_data[path]
|
||||
|
||||
existing_track_path = self._get_existing_track(track_id).path
|
||||
proposed_destination_path = os.path.join(
|
||||
os.path.dirname(existing_track_path), os.path.basename(path)
|
||||
)
|
||||
# if the destination path exists and it's not the path the
|
||||
# track_id points to, abort
|
||||
if existing_track_path != proposed_destination_path and os.path.exists(
|
||||
proposed_destination_path
|
||||
):
|
||||
tfd.import_this_file = False
|
||||
tfd.error = f"New import would overwrite existing file ({proposed_destination_path})"
|
||||
return
|
||||
tfd.file_path_to_remove = existing_track_path
|
||||
tfd.destination_path = proposed_destination_path
|
||||
tfd.track_id = track_id
|
||||
|
||||
|
||||
@dataclass
|
||||
class ModelData:
|
||||
base_model: PlaylistModel
|
||||
row_number: int
|
||||
|
||||
|
||||
class PickMatch(QDialog):
|
||||
@ -451,14 +447,18 @@ class PickMatch(QDialog):
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, new_track_details: str, items_with_ids: list[tuple[str, int, str]]
|
||||
self,
|
||||
new_track_description: str,
|
||||
choices: list[tuple[str, int, str]],
|
||||
default: int,
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self.new_track_details = new_track_details
|
||||
self.init_ui(items_with_ids)
|
||||
self.selected_id = -1
|
||||
self.new_track_description = new_track_description
|
||||
self.default = default
|
||||
self.init_ui(choices)
|
||||
self.selected_track_id = -1
|
||||
|
||||
def init_ui(self, items_with_ids: list[tuple[str, int, str]]) -> None:
|
||||
def init_ui(self, choices: list[tuple[str, int, str]]) -> None:
|
||||
"""
|
||||
Set up dialog
|
||||
"""
|
||||
@ -469,7 +469,7 @@ class PickMatch(QDialog):
|
||||
|
||||
# Add instructions
|
||||
instructions = (
|
||||
f"Importing {self.new_track_details}.\n"
|
||||
f"Importing {self.new_track_description}.\n"
|
||||
"Import as a new track or replace existing track?"
|
||||
)
|
||||
instructions_label = QLabel(instructions)
|
||||
@ -479,25 +479,25 @@ class PickMatch(QDialog):
|
||||
self.button_group = QButtonGroup()
|
||||
|
||||
# Add radio buttons for each item
|
||||
for idx, (text, track_id, track_path) in enumerate(items_with_ids):
|
||||
for idx, (track_description, track_id, track_path) in enumerate(choices):
|
||||
if (
|
||||
track_sequence.current
|
||||
and track_id
|
||||
and track_sequence.current.track_id == track_id
|
||||
):
|
||||
# Don't allow current track to be replaced
|
||||
text = "(Currently playing) " + text
|
||||
radio_button = QRadioButton(text)
|
||||
track_description = "(Currently playing) " + track_description
|
||||
radio_button = QRadioButton(track_description)
|
||||
radio_button.setDisabled(True)
|
||||
self.button_group.addButton(radio_button, -1)
|
||||
else:
|
||||
radio_button = QRadioButton(text)
|
||||
radio_button = QRadioButton(track_description)
|
||||
radio_button.setToolTip(track_path)
|
||||
self.button_group.addButton(radio_button, track_id)
|
||||
layout.addWidget(radio_button)
|
||||
|
||||
# Select the second item by default (import as new)
|
||||
if idx == 1:
|
||||
if idx == self.default:
|
||||
radio_button.setChecked(True)
|
||||
|
||||
# Add OK and Cancel buttons
|
||||
@ -516,7 +516,7 @@ class PickMatch(QDialog):
|
||||
|
||||
def on_ok(self):
|
||||
# Get the ID of the selected button
|
||||
self.selected_id = self.button_group.checkedId()
|
||||
self.selected_track_id = self.button_group.checkedId()
|
||||
self.accept()
|
||||
|
||||
|
||||
@ -526,16 +526,19 @@ class TrackFileData:
|
||||
Simple class to track details changes to a track file
|
||||
"""
|
||||
|
||||
import_file_path: str
|
||||
tags: Tags
|
||||
destination_track_path: str = ""
|
||||
file_path_to_removed: Optional[str] = None
|
||||
tags: Tags = Tags()
|
||||
destination_path: str = ""
|
||||
import_this_file: bool = True
|
||||
error: str = ""
|
||||
file_path_to_remove: Optional[str] = None
|
||||
track_id: int = 0
|
||||
audio_metadata: Optional[AudioMetadata] = None
|
||||
track_match_data: list[TrackMatchData] = field(default_factory=list)
|
||||
|
||||
def set_destination_track_path(self, path: str) -> None:
|
||||
"""
|
||||
Assigned the passed path
|
||||
"""
|
||||
|
||||
self.destination_track_path = path
|
||||
@dataclass
|
||||
class TrackMatchData:
|
||||
artist: str
|
||||
artist_match: float
|
||||
title: str
|
||||
title_match: float
|
||||
track_id: int
|
||||
|
||||
@ -204,6 +204,14 @@ def get_tags(path: str) -> Tags:
|
||||
except TinyTagException:
|
||||
raise ApplicationError(f"Can't read tags: get_tags({path=})")
|
||||
|
||||
if not tag.title:
|
||||
tag.title = ""
|
||||
if not tag.artist:
|
||||
tag.artist = ""
|
||||
if not tag.bitrate:
|
||||
tag.bitrate = 0.0
|
||||
if not tag.duration:
|
||||
tag.duration = 0.0
|
||||
return Tags(
|
||||
title=tag.title,
|
||||
artist=tag.artist,
|
||||
|
||||
@ -571,18 +571,18 @@ class Window(QMainWindow, Ui_MainWindow):
|
||||
)
|
||||
self.actionExport_playlist.triggered.connect(self.export_playlist_tab)
|
||||
self.actionFade.triggered.connect(self.fade)
|
||||
self.actionImport_files.triggered.connect(self.import_files_wrapper)
|
||||
self.actionInsertSectionHeader.triggered.connect(self.insert_header)
|
||||
self.actionInsertTrack.triggered.connect(self.insert_track)
|
||||
self.actionManage_templates.triggered.connect(self.manage_templates)
|
||||
self.actionMark_for_moving.triggered.connect(self.mark_rows_for_moving)
|
||||
self.actionMoveSelected.triggered.connect(self.move_selected)
|
||||
self.actionMoveUnplayed.triggered.connect(self.move_unplayed)
|
||||
self.actionManage_templates.triggered.connect(self.manage_templates)
|
||||
self.actionNewPlaylist.triggered.connect(self.new_playlist)
|
||||
self.actionOpenPlaylist.triggered.connect(self.open_playlist)
|
||||
self.actionPaste.triggered.connect(self.paste_rows)
|
||||
self.actionPlay_next.triggered.connect(self.play_next)
|
||||
self.actionRenamePlaylist.triggered.connect(self.rename_playlist)
|
||||
self.actionReplace_files.triggered.connect(self.import_files_wrapper)
|
||||
self.actionResume.triggered.connect(self.resume)
|
||||
self.actionSave_as_template.triggered.connect(self.save_as_template)
|
||||
self.actionSearch_title_in_Songfacts.triggered.connect(
|
||||
|
||||
@ -1582,6 +1582,21 @@ class PlaylistModel(QAbstractTableModel):
|
||||
)
|
||||
)
|
||||
|
||||
def update_or_insert(self, track_id: int, row_number: int) -> None:
|
||||
"""
|
||||
If the passed track_id exists in this playlist, update the
|
||||
row(s), otherwise insert this track at row_number.
|
||||
"""
|
||||
|
||||
track_rows = [a.row_number for a in self.playlist_rows.values() if a.track_id == track_id]
|
||||
if track_rows:
|
||||
with db.Session() as session:
|
||||
for row in track_rows:
|
||||
self.refresh_row(session, row)
|
||||
self.invalidate_rows(track_rows)
|
||||
else:
|
||||
self.insert_row(proposed_row_number=row_number, track_id=track_id)
|
||||
|
||||
def update_track_times(self) -> None:
|
||||
"""
|
||||
Update track start/end times in self.playlist_rows
|
||||
|
||||
@ -1000,7 +1000,7 @@ padding-left: 8px;</string>
|
||||
<addaction name="actionSave_as_template"/>
|
||||
<addaction name="actionManage_templates"/>
|
||||
<addaction name="separator"/>
|
||||
<addaction name="actionReplace_files"/>
|
||||
<addaction name="actionImport_files"/>
|
||||
<addaction name="separator"/>
|
||||
<addaction name="actionE_xit"/>
|
||||
</widget>
|
||||
@ -1364,7 +1364,7 @@ padding-left: 8px;</string>
|
||||
<string>Select duplicate rows...</string>
|
||||
</property>
|
||||
</action>
|
||||
<action name="actionReplace_files">
|
||||
<action name="actionImport_files">
|
||||
<property name="text">
|
||||
<string>Import files...</string>
|
||||
</property>
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
# Form implementation generated from reading ui file 'app/ui/main_window.ui'
|
||||
#
|
||||
# Created by: PyQt6 UI code generator 6.7.1
|
||||
# Created by: PyQt6 UI code generator 6.8.0
|
||||
#
|
||||
# WARNING: Any manual changes made to this file will be lost when pyuic6 is
|
||||
# run again. Do not edit this file unless you know what you are doing.
|
||||
@ -529,8 +529,8 @@ class Ui_MainWindow(object):
|
||||
self.actionSearch_title_in_Songfacts.setObjectName("actionSearch_title_in_Songfacts")
|
||||
self.actionSelect_duplicate_rows = QtGui.QAction(parent=MainWindow)
|
||||
self.actionSelect_duplicate_rows.setObjectName("actionSelect_duplicate_rows")
|
||||
self.actionReplace_files = QtGui.QAction(parent=MainWindow)
|
||||
self.actionReplace_files.setObjectName("actionReplace_files")
|
||||
self.actionImport_files = QtGui.QAction(parent=MainWindow)
|
||||
self.actionImport_files.setObjectName("actionImport_files")
|
||||
self.menuFile.addSeparator()
|
||||
self.menuFile.addAction(self.actionInsertTrack)
|
||||
self.menuFile.addAction(self.actionRemove)
|
||||
@ -557,7 +557,7 @@ class Ui_MainWindow(object):
|
||||
self.menuPlaylist.addAction(self.actionSave_as_template)
|
||||
self.menuPlaylist.addAction(self.actionManage_templates)
|
||||
self.menuPlaylist.addSeparator()
|
||||
self.menuPlaylist.addAction(self.actionReplace_files)
|
||||
self.menuPlaylist.addAction(self.actionImport_files)
|
||||
self.menuPlaylist.addSeparator()
|
||||
self.menuPlaylist.addAction(self.actionE_xit)
|
||||
self.menuSearc_h.addAction(self.actionSetNext)
|
||||
@ -676,6 +676,6 @@ class Ui_MainWindow(object):
|
||||
self.actionSearch_title_in_Songfacts.setText(_translate("MainWindow", "Search title in Songfacts"))
|
||||
self.actionSearch_title_in_Songfacts.setShortcut(_translate("MainWindow", "Ctrl+S"))
|
||||
self.actionSelect_duplicate_rows.setText(_translate("MainWindow", "Select duplicate rows..."))
|
||||
self.actionReplace_files.setText(_translate("MainWindow", "Import files..."))
|
||||
from infotabs import InfoTabs # type: ignore
|
||||
from pyqtgraph import PlotWidget # type: ignore
|
||||
self.actionImport_files.setText(_translate("MainWindow", "Import files..."))
|
||||
from infotabs import InfoTabs
|
||||
from pyqtgraph import PlotWidget
|
||||
|
||||
Loading…
Reference in New Issue
Block a user