537 lines
18 KiB
Python
537 lines
18 KiB
Python
# Standard library imports
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from fuzzywuzzy import fuzz # type: ignore
|
|
import os.path
|
|
from typing import Optional
|
|
import os
|
|
import shutil
|
|
|
|
# PyQt imports
|
|
from PyQt6.QtCore import (
|
|
pyqtSignal,
|
|
QObject,
|
|
QThread,
|
|
)
|
|
from PyQt6.QtWidgets import (
|
|
QButtonGroup,
|
|
QDialog,
|
|
QHBoxLayout,
|
|
QLabel,
|
|
QPushButton,
|
|
QRadioButton,
|
|
QStatusBar,
|
|
QVBoxLayout,
|
|
)
|
|
|
|
# Third party imports
|
|
|
|
# App imports
|
|
from classes import (
|
|
ApplicationError,
|
|
AudioMetadata,
|
|
FileErrors,
|
|
MusicMusterSignals,
|
|
Tags,
|
|
)
|
|
from config import Config
|
|
from helpers import (
|
|
file_is_unreadable,
|
|
get_tags,
|
|
show_warning,
|
|
)
|
|
from log import log
|
|
from models import db, Tracks
|
|
from music_manager import track_sequence
|
|
from playlistmodel import PlaylistModel
|
|
import helpers
|
|
|
|
|
|
class DoTrackImport(QObject):
    """
    Worker object that imports a single audio file.

    The file is moved to its destination path and a Tracks row is either
    created (track_id == 0) or updated (any other track_id). The track is
    then inserted into the passed playlist model.

    import_finished is emitted exactly once per call to run(), whether or
    not the import succeeded, so that whatever is waiting on it (for
    example a hosting QThread's quit slot) is always triggered.
    """

    import_finished = pyqtSignal()

    def __init__(
        self,
        import_file_path: str,
        tags: Tags,
        destination_track_path: str,
        track_id: int,
        audio_metadata: AudioMetadata,
        base_model: PlaylistModel,
        row_number: Optional[int],
    ) -> None:
        """
        Save parameters

        import_file_path:       file to be imported
        tags:                   tags to store against the track
        destination_track_path: where the imported file is to be moved to
        track_id:               0 to create a new track, otherwise the id
                                of the existing track to update
        audio_metadata:         audio metadata to store against the track
        base_model:             playlist model that receives the new row
        row_number:             row at which to insert; None means append
                                (base_model.rowCount())
        """

        super().__init__()

        self.import_file_path = import_file_path
        self.tags = tags
        self.destination_track_path = destination_track_path
        self.track_id = track_id
        self.audio_metadata = audio_metadata
        self.base_model = base_model

        if row_number is None:
            self.next_row_number = base_model.rowCount()
        else:
            self.next_row_number = row_number

        self.signals = MusicMusterSignals()

    def run(self) -> None:
        """
        Either create track objects from passed files or update existing
        track objects.

        And add to visible playlist or update playlist if track already present.

        Any failure is reported through show_warning_signal and the file
        moves are undone on a best-effort basis. import_finished is always
        emitted on the way out.

        NOTE(review): FileImporter._import_file connects this method to
        QThread.started, so base_model.insert_row() is called from the
        worker thread - confirm the model is safe to update from there.
        """

        temp_file: Optional[str] = None
        import_name = os.path.basename(self.import_file_path)

        try:
            try:
                # If destination exists, move it out of the way
                if os.path.exists(self.destination_track_path):
                    temp_file = self.destination_track_path + ".TMP"
                    shutil.move(self.destination_track_path, temp_file)
                # Move file to destination
                shutil.move(self.import_file_path, self.destination_track_path)
            except OSError as e:
                self._undo_file_moves(temp_file)
                self.signals.show_warning_signal.emit(
                    "Error importing track", str(e)
                )
                return

            with db.Session() as session:
                self.signals.status_message_signal.emit(
                    f"Importing {import_name}", 5000
                )

                try:
                    track = self._create_or_update_track(session)
                    session.commit()
                except Exception as e:
                    # Nothing was committed, so put the files back where
                    # they were before reporting the problem.
                    self._undo_file_moves(temp_file)
                    self.signals.show_warning_signal.emit(
                        "Error importing track", str(e)
                    )
                    return

                # The database now refers to the new file, so the displaced
                # original is no longer needed.
                self._remove_displaced_file(temp_file)

                helpers.normalise_track(self.destination_track_path)
                self.base_model.insert_row(
                    self.next_row_number, track.id, "imported"
                )
                self.next_row_number += 1

            self.signals.status_message_signal.emit(
                f"{import_name} imported", 10000
            )
        finally:
            # Always signal completion; otherwise a failed import would
            # leave anything waiting on this signal hanging.
            self.import_finished.emit()

    def _create_or_update_track(self, session):
        """
        Return the Tracks object for this import.

        Creates a new Tracks object when self.track_id is 0; otherwise
        fetches the existing one and copies the tags, audio metadata and
        destination path onto it. Raises LookupError if the existing track
        cannot be found.
        """

        if self.track_id == 0:
            # Import new track
            return Tracks(
                session,
                path=self.destination_track_path,
                **self.tags._asdict(),
                **self.audio_metadata._asdict(),
            )

        track = session.get(Tracks, self.track_id)
        if track is None:
            raise LookupError(f"Track id {self.track_id} not found")

        for attributes in (self.tags._asdict(), self.audio_metadata._asdict()):
            for key, value in attributes.items():
                if hasattr(track, key):
                    setattr(track, key, value)
        track.path = self.destination_track_path

        return track

    def _undo_file_moves(self, temp_file: Optional[str]) -> None:
        """
        Best-effort reversal of the file moves made by run(): return the
        imported file to its source path and restore any displaced file.
        """

        try:
            if os.path.exists(self.destination_track_path) and not os.path.exists(
                self.import_file_path
            ):
                shutil.move(self.destination_track_path, self.import_file_path)
            if temp_file and os.path.exists(temp_file):
                shutil.move(temp_file, self.destination_track_path)
        except OSError as e:
            log.warning(f"Import: unable to undo file moves, {e=}")

    def _remove_displaced_file(self, temp_file: Optional[str]) -> None:
        """
        Delete the temporary copy of a replaced file, if there is one.
        """

        if not temp_file or not os.path.exists(temp_file):
            return
        try:
            os.unlink(temp_file)
        except OSError as e:
            # Leaving the temporary file behind is untidy but harmless
            log.warning(f"Import: unable to remove {temp_file=}, {e=}")
|
|
|
|
|
|
class FileImporter:
    """
    Manage importing of files

    Candidate files are the .mp3 and .flac files found in
    Config.REPLACE_FILES_DEFAULT_SOURCE when the object is created.
    do_import() drives the whole process.
    """

    def __init__(
        self, base_model: PlaylistModel, row_number: Optional[int] = None
    ) -> None:
        """
        Set up class

        base_model: playlist model that imported tracks are added to
        row_number: row at which to insert imported tracks; None means
                    append (base_model.rowCount())
        """

        # Save parameters
        self.base_model = base_model
        # Compare with None rather than testing truthiness so that an
        # explicit row 0 is honoured (as DoTrackImport already does).
        if row_number is None:
            self.row_number = base_model.rowCount()
        else:
            self.row_number = row_number
        # Data structure to track files to import
        self.import_files_data: list[TrackFileData] = []
        # Dictionary of existing tracks, keyed by track id
        self.existing_tracks = self._get_existing_tracks()
        # List of track_id, title tuples
        self.track_idx_and_title = [
            (a.id, a.title) for a in self.existing_tracks.values()
        ]
        # Files to import
        self.import_files_paths = [
            os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
            for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE)
            if f.endswith((".mp3", ".flac"))
        ]
        # Files we can't import
        self.unimportable_files: list[FileErrors] = []
        # Files user doesn't want imported
        self.do_not_import: list[str] = []
        # Every (thread, worker) pair started by _import_file. Holding
        # them here stops Python garbage-collecting a pair that is still
        # running when the next import is started.
        self._active_imports: list[tuple[QThread, DoTrackImport]] = []

    def do_import(self) -> None:
        """
        Scan source directory and:
            - check all file are readable
            - load readable files and tags into self.import_files_data
            - check for duplicates and replacements
            - allow deselection of import for any one file
            - import files and either replace existing or add to pool
        """

        # check all file are readable
        self.check_files_are_readable()

        # load readable files and tags into self.import_files_data.
        # Paths that survive are collected in a new list: removing items
        # from the list being iterated would silently skip files.
        importable_paths: list[str] = []
        for import_file in self.import_files_paths:
            try:
                tags = get_tags(import_file)
            except ApplicationError as e:
                self.unimportable_files.append(
                    FileErrors(path=import_file, error=str(e))
                )
                # No tags for this file: do not fall through and use
                # tags left over from a previous file.
                continue

            try:
                file_data = TrackFileData(import_file_path=import_file, tags=tags)
            except Exception as e:
                self.unimportable_files.append(
                    FileErrors(path=import_file, error=str(e))
                )
                continue

            self.import_files_data.append(file_data)
            importable_paths.append(import_file)
        self.import_files_paths[:] = importable_paths

        if self.unimportable_files:
            msg = "The following files could not be read and won't be imported:\n"
            msg += "".join(
                f"\n\t• {unimportable_file.path} ({unimportable_file.error})"
                for unimportable_file in self.unimportable_files
            )
            show_warning(None, "Unimportable files", msg)

        # check for close matches.
        for idx in range(len(self.import_files_data)):
            self.check_match(idx=idx)

        # Drop anything the user declined (recorded by _pick_match)
        self.import_files_data = [
            x
            for x in self.import_files_data
            if x.import_file_path not in self.do_not_import
        ]

        # Import all that's left.
        for idx in range(len(self.import_files_data)):
            self._import_file(idx)

    def check_match(self, idx: int) -> None:
        """
        Work on and update the idx element of self.import_files_data.
        Check for similar existing titles. If none found, set up to
        import this as a new track. If one is found, check with user
        whether this is a new track or replacement. If more than one
        is found, as for one but order the tracks in
        artist-similarity order.
        """

        similar_track_ids = self._find_similar_strings(
            self.import_files_data[idx].tags.title, self.track_idx_and_title
        )
        if len(similar_track_ids) == 0:
            matching_track = 0
        elif len(similar_track_ids) == 1:
            matching_track = self._pick_match(idx, similar_track_ids)
        else:
            matching_track = self._pick_match(
                idx, self.order_by_artist(idx, similar_track_ids)
            )

        if matching_track < 0:  # User cancelled
            return

        if matching_track == 0:
            # New track: keep the file name, place it in the import
            # destination directory
            self.import_files_data[idx].destination_track_path = os.path.join(
                Config.IMPORT_DESTINATION,
                os.path.basename(self.import_files_data[idx].import_file_path),
            )
        else:
            # Replacement: reuse the path of the track being replaced
            self.import_files_data[idx].destination_track_path = self.existing_tracks[
                matching_track
            ].path

        self.import_files_data[idx].track_id = matching_track

    def _import_file(self, idx: int) -> None:
        """
        Import the file specified at self.import_files_data[idx]
        """

        log.debug(f"_import_file({idx=}), {self.import_files_data[idx]=}")

        f = self.import_files_data[idx]

        # Import in separate thread
        import_thread = QThread()
        worker = DoTrackImport(
            import_file_path=f.import_file_path,
            tags=f.tags,
            destination_track_path=f.destination_track_path,
            track_id=f.track_id,
            audio_metadata=helpers.get_audio_metadata(f.import_file_path),
            base_model=self.base_model,
            row_number=self.row_number,
        )

        # This method is called once per file, so the attributes below
        # only ever refer to the most recent import. Record each pair as
        # well so earlier ones stay referenced.
        self._active_imports.append((import_thread, worker))
        self.import_thread = import_thread
        self.worker = worker

        worker.moveToThread(import_thread)
        import_thread.started.connect(worker.run)
        worker.import_finished.connect(import_thread.quit)
        worker.import_finished.connect(worker.deleteLater)
        import_thread.finished.connect(import_thread.deleteLater)
        import_thread.start()

    def order_by_artist(self, idx: int, track_ids_to_check: list[int]) -> list[int]:
        """
        Return the list of track_ids sorted by how well the artist at idx matches the
        track artist.
        """

        wanted_ids = set(track_ids_to_check)
        track_idx_and_artist = [
            (key, a.artist)
            for key, a in self.existing_tracks.items()
            if key in wanted_ids
        ]
        # We want to return all of the passed tracks so set minimum_score
        # to zero
        return self._find_similar_strings(
            self.import_files_data[idx].tags.artist,
            track_idx_and_artist,
            minimum_score=0.0,
        )

    def _pick_match(self, idx: int, track_ids: list[int]) -> int:
        """
        Return the track_id selected by the user, including "import as new" which will be
        track_id 0. Return -1 if user cancels.

        If user chooses not to import this track, remove it from the list of tracks to
        import and return -1.
        """

        log.debug(f"_pick_match({idx=}, {track_ids=})")

        new_track_details = (
            f"{self.import_files_data[idx].tags.title} "
            f"({self.import_files_data[idx].tags.artist})"
        )

        # Build a list of (track title and artist, track_id, track path)
        choices: list[tuple[str, int, str]] = []
        # The first two choices are always "do not import" and "import as
        # a new track"; candidate tracks to replace follow.
        choices.append((Config.DO_NOT_IMPORT, -2, ""))
        choices.append((Config.IMPORT_AS_NEW, 0, ""))
        for track_id in track_ids:
            choices.append(
                (
                    f"{self.existing_tracks[track_id].title} "
                    f"({self.existing_tracks[track_id].artist})",
                    track_id,
                    str(self.existing_tracks[track_id].path),
                )
            )

        dialog = PickMatch(new_track_details, choices)
        if dialog.exec() and dialog.selected_id >= 0:
            return dialog.selected_id
        else:
            # Dialog rejected, or a negative id (eg "do not import") chosen
            self.do_not_import.append(self.import_files_data[idx].import_file_path)
            return -1

    def check_files_are_readable(self) -> None:
        """
        Check files to be imported are readable. If not, remove them from the
        import list and add them to the file errors list.
        """

        # Build the list of survivors instead of removing from the list
        # being iterated, which would skip the entry after each removal.
        readable_paths: list[str] = []
        for path in self.import_files_paths:
            if file_is_unreadable(path):
                self.unimportable_files.append(
                    FileErrors(path=os.path.basename(path), error="File is unreadable")
                )
            else:
                readable_paths.append(path)
        self.import_files_paths[:] = readable_paths

    def import_files_are_tagged(self) -> list:
        """
        Return a (possibly empty) list of the base names of all untagged
        files in the import list. Tagged files are added to
        self.import_files_data.
        """

        untagged_files: list[str] = []
        for fullpath in self.import_files_paths:
            tags = get_tags(fullpath)
            if not tags:
                untagged_files.append(os.path.basename(fullpath))
                # Remove from import list. import_files_data is a list of
                # TrackFileData, so entries are matched on their path; it
                # cannot be indexed by the path string.
                self.import_files_data = [
                    file_data
                    for file_data in self.import_files_data
                    if file_data.import_file_path != fullpath
                ]
                log.warning(f"Import: no tags found, {fullpath=}")
            else:
                self.import_files_data.append(
                    TrackFileData(import_file_path=fullpath, tags=tags)
                )

        return untagged_files

    def _get_existing_tracks(self):
        """
        Return a dictionary {track_id: Track} for all existing tracks
        """

        with db.Session() as session:
            return Tracks.all_tracks_indexed_by_id(session)

    def _find_similar_strings(
        self,
        needle: str,
        haystack: list[tuple[int, str]],
        minimum_score: float = Config.MINIMUM_FUZZYMATCH,
    ) -> list[int]:
        """
        Search for the needle in the string element of the haystack.
        Discard similarities less than minimum_score. Return a list of
        the int element of the haystack in order of decreasing score (ie,
        best match first).
        """

        # Create a dictionary to store similarities
        similarities: dict[int, float] = {}

        for hayblade in haystack:
            # Calculate similarity using multiple metrics
            ratio = fuzz.ratio(needle, hayblade[1])
            partial_ratio = fuzz.partial_ratio(needle, hayblade[1])
            token_sort_ratio = fuzz.token_sort_ratio(needle, hayblade[1])
            token_set_ratio = fuzz.token_set_ratio(needle, hayblade[1])

            # Combine scores: equal weight to each of the four metrics
            combined_score = (
                ratio * 0.25
                + partial_ratio * 0.25
                + token_sort_ratio * 0.25
                + token_set_ratio * 0.25
            )

            if combined_score >= minimum_score:
                similarities[hayblade[0]] = combined_score
                log.debug(
                    f"_find_similar_strings({needle=}), {len(haystack)=}, "
                    f"{minimum_score=}, {hayblade=}, {combined_score=}"
                )

        # Sort matches by score
        sorted_matches = sorted(similarities.items(), key=lambda x: x[1], reverse=True)

        # Return list of indexes, highest score first
        return [a[0] for a in sorted_matches]
|
|
|
|
|
|
class PickMatch(QDialog):
    """
    Modal choice between importing a file as a new track, replacing one
    of the offered existing tracks, or not importing it.

    After the dialog is accepted, selected_id holds the id attached to
    the chosen radio button. It stays at -1 if OK was never pressed.
    """

    def __init__(
        self, new_track_details: str, items_with_ids: list[tuple[str, int, str]]
    ) -> None:
        super().__init__()
        self.new_track_details = new_track_details
        self.init_ui(items_with_ids)
        self.selected_id = -1

    def init_ui(self, items_with_ids: list[tuple[str, int, str]]) -> None:
        """
        Build the widgets: an instruction label, one radio button per
        (text, track_id, track_path) entry, then OK and Cancel buttons.
        """

        self.setWindowTitle("New or replace")

        layout = QVBoxLayout()

        # Instructions
        layout.addWidget(
            QLabel(
                f"Importing {self.new_track_details}.\n"
                "Import as a new track or replace existing track?"
            )
        )

        # One radio button per choice, all in a single button group
        self.button_group = QButtonGroup()

        for position, (label, track_id, track_path) in enumerate(items_with_ids):
            current = track_sequence.current
            is_current_track = bool(
                current and track_id and current.track_id == track_id
            )

            if is_current_track:
                # Don't allow current track to be replaced: show it, but
                # disabled
                radio_button = QRadioButton("(Currently playing) " + label)
                radio_button.setDisabled(True)
                button_id = -1
            else:
                radio_button = QRadioButton(label)
                radio_button.setToolTip(track_path)
                button_id = track_id

            self.button_group.addButton(radio_button, button_id)
            layout.addWidget(radio_button)

            # The second item (import as new) is the default selection
            if position == 1:
                radio_button.setChecked(True)

        # OK and Cancel buttons, side by side
        ok_button = QPushButton("OK")
        ok_button.clicked.connect(self.on_ok)
        cancel_button = QPushButton("Cancel")
        cancel_button.clicked.connect(self.reject)

        button_layout = QHBoxLayout()
        button_layout.addWidget(ok_button)
        button_layout.addWidget(cancel_button)
        layout.addLayout(button_layout)

        self.setLayout(layout)

    def on_ok(self):
        """
        Record the id of the checked radio button and accept the dialog.
        """

        self.selected_id = self.button_group.checkedId()
        self.accept()
|
|
|
|
|
|
@dataclass
class TrackFileData:
    """
    Record of what is known about, and planned for, one file being
    imported.
    """

    # Path of the file to be imported
    import_file_path: str
    # Tags read from the file
    tags: Tags
    # Where the file will be moved to; empty until decided
    destination_track_path: str = ""
    file_path_to_removed: Optional[str] = None
    # 0 means import as a new track, otherwise id of track to replace
    track_id: int = 0
    audio_metadata: Optional[AudioMetadata] = None

    def set_destination_track_path(self, path: str) -> None:
        """
        Store path as the destination for this file
        """

        self.destination_track_path = path
|