musicmuster/app/file_importer.py
2025-01-01 13:13:54 +00:00

542 lines
18 KiB
Python

# Standard library imports
from __future__ import annotations
from dataclasses import dataclass
from fuzzywuzzy import fuzz # type: ignore
import os.path
from typing import Optional
import os
import shutil
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QObject,
QThread,
)
from PyQt6.QtWidgets import (
QButtonGroup,
QDialog,
QHBoxLayout,
QLabel,
QPushButton,
QRadioButton,
QStatusBar,
QVBoxLayout,
)
# Third party imports
# App imports
from classes import (
ApplicationError,
AudioMetadata,
FileErrors,
MusicMusterSignals,
Tags,
)
from config import Config
from helpers import (
file_is_unreadable,
get_tags,
show_warning,
)
from log import log
from models import db, Tracks
from music_manager import track_sequence
from playlistmodel import PlaylistModel
import helpers
class DoTrackImport(QObject):
    """
    Worker that imports a single audio file.

    Designed to be moved to a QThread with run() connected to the thread's
    started signal. import_finished is emitted whenever run() ends, whether
    or not the import succeeded, so the hosting thread can always be stopped.
    """

    import_finished = pyqtSignal()

    def __init__(
        self,
        import_file_path: str,
        tags: Tags,
        destination_track_path: str,
        track_id: int,
        audio_metadata: AudioMetadata,
        base_model: PlaylistModel,
        row_number: Optional[int],
    ) -> None:
        """
        Save parameters

        import_file_path: file to be imported
        tags: tag values to store on the track
        destination_track_path: where the file is to be moved to
        track_id: 0 to create a new track, otherwise the id of the
            existing track to update
        audio_metadata: audio metadata values to store on the track
        base_model: playlist model the imported track is inserted into
        row_number: row to insert at; None means after the current last row
        """

        super().__init__()

        self.import_file_path = import_file_path
        self.tags = tags
        self.destination_track_path = destination_track_path
        self.track_id = track_id
        self.audio_metadata = audio_metadata
        self.base_model = base_model

        if row_number is None:
            self.next_row_number = base_model.rowCount()
        else:
            self.next_row_number = row_number

        self.signals = MusicMusterSignals()

    def run(self) -> None:
        """
        Move the file into place, then either create a new track
        (track_id == 0) or update the existing track, and insert the track
        into the playlist model.

        A file already present at the destination is first moved aside to
        "<destination>.TMP". That file is deleted once the database has been
        updated, and put back if the import fails.
        """

        file_name = os.path.basename(self.import_file_path)
        displaced_file: Optional[str] = None
        imported = False

        try:
            # If destination exists, move it out of the way
            if os.path.exists(self.destination_track_path):
                displaced_file = self.destination_track_path + ".TMP"
                shutil.move(self.destination_track_path, displaced_file)

            # Move file to destination
            shutil.move(self.import_file_path, self.destination_track_path)

            with db.Session() as session:
                self.signals.status_message_signal.emit(
                    f"Importing {file_name}", 5000
                )

                track = self._create_or_update_track(session)
                if track is None:
                    # Warning already shown; the finally clause restores
                    # the files and releases the thread
                    return

                session.commit()
                # From here on the database refers to the new file, so the
                # files must not be rolled back
                imported = True

                helpers.normalise_track(self.destination_track_path)

                self.base_model.insert_row(
                    self.next_row_number, track.id, "imported"
                )
                self.next_row_number += 1

            self.signals.status_message_signal.emit(
                f"{file_name} imported", 10000
            )
        finally:
            self._tidy_files(displaced_file, imported)
            # Always emitted: thread.quit and deleteLater are connected to it
            self.import_finished.emit()

    def _create_or_update_track(self, session) -> Optional[Tracks]:
        """
        Return the new or updated track, or None (after emitting a warning)
        if the track could not be created or the track to update was not
        found.
        """

        if self.track_id == 0:
            # Import new track
            try:
                return Tracks(
                    session,
                    path=self.destination_track_path,
                    **self.tags._asdict(),
                    **self.audio_metadata._asdict(),
                )
            except Exception as e:
                self.signals.show_warning_signal.emit(
                    "Error importing track", str(e)
                )
                return None

        # Update existing track
        track = session.get(Tracks, self.track_id)
        if track is None:
            self.signals.show_warning_signal.emit(
                "Error importing track",
                f"Track {self.track_id} not found in database",
            )
            return None

        for attributes in (self.tags._asdict(), self.audio_metadata._asdict()):
            for key, value in attributes.items():
                # Only copy values the track actually has an attribute for
                if hasattr(track, key):
                    setattr(track, key, value)
        track.path = self.destination_track_path

        return track

    def _tidy_files(self, displaced_file: Optional[str], imported: bool) -> None:
        """
        After a successful import delete the moved-aside file. After a
        failed import put the imported file back where it came from and
        restore the moved-aside file.
        """

        try:
            if imported:
                if displaced_file and os.path.exists(displaced_file):
                    os.remove(displaced_file)
                return

            # Only move the new file back if it actually left its source
            if os.path.exists(self.destination_track_path) and not os.path.exists(
                self.import_file_path
            ):
                shutil.move(self.destination_track_path, self.import_file_path)

            if displaced_file and os.path.exists(displaced_file):
                shutil.move(displaced_file, self.destination_track_path)
        except OSError as e:
            # Best effort only: report but don't mask the original problem
            log.warning(f"Import: unable to tidy up files, {displaced_file=}, {e=}")
class FileImporter:
    """
    Manage importing of files
    """

    def __init__(
        self, base_model: PlaylistModel, row_number: Optional[int] = None
    ) -> None:
        """
        Set up class

        base_model: playlist model that imported tracks are added to
        row_number: row at which to insert imported tracks; None means
            after the current last row
        """

        # Save parameters
        self.base_model = base_model
        # Test against None rather than truthiness so that row 0 is
        # honoured as an insertion point
        if row_number is None:
            self.row_number = base_model.rowCount()
        else:
            self.row_number = row_number

        # Data structure to track files to import
        self.import_files_data: list[TrackFileData] = []

        # Dictionary of existing tracks
        self.existing_tracks = self._get_existing_tracks()

        # List of track_id, title tuples
        self.track_idx_and_title = [
            (a.id, a.title) for a in self.existing_tracks.values()
        ]

        # Files to import
        self.import_files_paths = [
            os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
            for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE)
            if f.endswith((".mp3", ".flac"))
        ]

        # Files we can't import
        self.unimportable_files: list[FileErrors] = []

        # Files user doesn't want imported
        self.do_not_import: list[str] = []

        # Places to keep references to the import threads and their workers
        # while they run. Both are keyed by index into import_files_data.
        self.import_thread: dict[int, QThread] = {}
        self.import_worker: dict[int, DoTrackImport] = {}

    def do_import(self) -> None:
        """
        Scan source directory and:
            - check all files are readable
            - load readable files and tags into self.import_files_data
            - report files that can't be imported
            - check for similar existing tracks and ask the user whether
              each such file is new, a replacement, or not to be imported
            - import the files that remain
        """

        # check all files are readable
        self.check_files_are_readable()

        # load readable files and tags into self.import_files_data.
        # Build a new list of importable paths rather than removing from
        # the list being iterated, which would skip entries.
        importable_paths: list[str] = []
        for import_file in self.import_files_paths:
            try:
                tags = get_tags(import_file)
            except ApplicationError as e:
                self.unimportable_files.append(
                    FileErrors(path=import_file, error=str(e))
                )
                # No tags, so nothing more can be done with this file
                continue

            try:
                file_data = TrackFileData(import_file_path=import_file, tags=tags)
            except Exception as e:
                self.unimportable_files.append(
                    FileErrors(path=import_file, error=str(e))
                )
                continue

            self.import_files_data.append(file_data)
            importable_paths.append(import_file)

        self.import_files_paths[:] = importable_paths

        if self.unimportable_files:
            msg = "The following files could not be read and won't be imported:\n"
            msg += "".join(
                f"\n\t{unimportable_file.path} ({unimportable_file.error})"
                for unimportable_file in self.unimportable_files
            )
            show_warning(None, "Unimportable files", msg)

        # check for close matches.
        for idx in range(len(self.import_files_data)):
            self.check_match(idx=idx)

        # Keep only files the user hasn't rejected. Either marker (listed in
        # do_not_import, or a track_id of -1) is enough to drop a file.
        self.import_files_data = [
            x
            for x in self.import_files_data
            if (
                x.import_file_path not in self.do_not_import
                and x.track_id != -1
            )
        ]

        # Import all that's left.
        for idx in range(len(self.import_files_data)):
            self._import_file(idx)

    def check_match(self, idx: int) -> None:
        """
        Work on and update the idx element of self.import_files_data.

        Check for similar existing titles. If none found, set up to
        import this as a new track. If one is found, check with user
        whether this is a new track or replacement. If more than one
        is found, as for one but order the tracks in
        artist-similarity order.

        On return the element's track_id is 0 (import as new track), -1
        (don't import) or the id of the existing track to replace, and for
        the first and last of those destination_track_path has been set.
        """

        file_data = self.import_files_data[idx]

        similar_track_ids = self._find_similar_strings(
            file_data.tags.title, self.track_idx_and_title
        )
        if not similar_track_ids:
            matching_track = 0
        elif len(similar_track_ids) == 1:
            matching_track = self._pick_match(idx, similar_track_ids)
        else:
            matching_track = self._pick_match(
                idx, self.order_by_artist(idx, similar_track_ids)
            )

        if matching_track < 0:  # User cancelled
            matching_track = -1
        elif matching_track == 0:
            # New track: keep the file name, put it in the import destination
            file_data.destination_track_path = os.path.join(
                Config.IMPORT_DESTINATION,
                os.path.basename(file_data.import_file_path),
            )
        else:
            # Replacement: the new file takes over the existing track's path
            file_data.destination_track_path = self.existing_tracks[
                matching_track
            ].path

        file_data.track_id = matching_track

    def _import_file(self, idx: int) -> None:
        """
        Import the file specified at self.import_files_data[idx]
        """

        log.debug(f"_import_file({idx=}), {self.import_files_data[idx]=}")

        f = self.import_files_data[idx]

        # Import in separate thread
        thread = QThread()
        worker = DoTrackImport(
            import_file_path=f.import_file_path,
            tags=f.tags,
            destination_track_path=f.destination_track_path,
            track_id=f.track_id,
            audio_metadata=helpers.get_audio_metadata(f.import_file_path),
            base_model=self.base_model,
            row_number=self.row_number,
        )

        # Keep a reference to every thread and worker. Holding the worker
        # only in a single attribute would drop the reference to the previous
        # worker, which may still be running, each time another file is
        # started.
        self.import_thread[idx] = thread
        self.import_worker[idx] = worker
        # Most recently started worker, retained for existing users of
        # this attribute
        self.worker = worker

        worker.moveToThread(thread)
        thread.started.connect(worker.run)
        worker.import_finished.connect(thread.quit)
        worker.import_finished.connect(worker.deleteLater)
        thread.finished.connect(thread.deleteLater)
        thread.start()

    def order_by_artist(self, idx: int, track_ids_to_check: list[int]) -> list[int]:
        """
        Return the list of track_ids sorted by how well the artist at idx matches the
        track artist.
        """

        wanted_ids = set(track_ids_to_check)
        track_idx_and_artist = [
            (key, a.artist)
            for key, a in self.existing_tracks.items()
            if key in wanted_ids
        ]
        # We want to return all of the passed tracks so set minimum_score
        # to zero
        return self._find_similar_strings(
            self.import_files_data[idx].tags.artist,
            track_idx_and_artist,
            minimum_score=0.0,
        )

    def _pick_match(self, idx: int, track_ids: list[int]) -> int:
        """
        Return the track_id selected by the user, including "import as new" which will be
        track_id 0. Return -1 if user cancels.

        If user chooses not to import this track, add it to the list of
        files not to be imported and return -1.
        """

        log.debug(f"_pick_match({idx=}, {track_ids=})")

        new_track_details = (
            f"{self.import_files_data[idx].tags.title} "
            f"({self.import_files_data[idx].tags.artist})"
        )

        # Build a list of (track title and artist, track_id, track path)
        choices: list[tuple[str, int, str]] = []

        # First choice is not to import at all, second is to import as a
        # new track. PickMatch preselects the second entry.
        choices.append((Config.DO_NOT_IMPORT, -2, ""))
        choices.append((Config.IMPORT_AS_NEW, 0, ""))
        for track_id in track_ids:
            existing = self.existing_tracks[track_id]
            choices.append(
                (
                    f"{existing.title} ({existing.artist})",
                    track_id,
                    str(existing.path),
                )
            )

        dialog = PickMatch(new_track_details, choices)
        if dialog.exec() and dialog.selected_id >= 0:
            return dialog.selected_id

        # Dialog cancelled, or "do not import" chosen
        self.do_not_import.append(self.import_files_data[idx].import_file_path)
        return -1

    def check_files_are_readable(self) -> None:
        """
        Check files to be imported are readable. If not, remove them from the
        import list and add them to the file errors list.
        """

        # Collect the readable paths and swap them in afterwards: removing
        # from the list while iterating over it would skip the entry that
        # follows each removed one
        readable_paths: list[str] = []
        for path in self.import_files_paths:
            if file_is_unreadable(path):
                self.unimportable_files.append(
                    FileErrors(path=os.path.basename(path), error="File is unreadable")
                )
            else:
                readable_paths.append(path)

        self.import_files_paths[:] = readable_paths

    def import_files_are_tagged(self) -> list:
        """
        Return a (possibly empty) list of the base names of all untagged
        files in the import list. Untagged files are removed from the
        import list; tagged files are added, with their tags, to
        self.import_files_data.
        """

        untagged_files: list[str] = []
        tagged_paths: list[str] = []

        for fullpath in self.import_files_paths:
            tags = get_tags(fullpath)
            if not tags:
                untagged_files.append(os.path.basename(fullpath))
                log.warning(f"Import: no tags found, {fullpath=}")
                continue

            tagged_paths.append(fullpath)
            self.import_files_data.append(
                TrackFileData(import_file_path=fullpath, tags=tags)
            )

        # Remove untagged files from import list. (import_files_data is a
        # list, so it can't be indexed by path, and untagged files were
        # never added to it.)
        self.import_files_paths[:] = tagged_paths

        return untagged_files

    def _get_existing_tracks(self) -> dict[int, Tracks]:
        """
        Return a dictionary {track_id: Track} for all existing tracks
        """

        with db.Session() as session:
            return Tracks.all_tracks_indexed_by_id(session)

    def _find_similar_strings(
        self,
        needle: str,
        haystack: list[tuple[int, str]],
        minimum_score: float = Config.MINIMUM_FUZZYMATCH,
    ) -> list[int]:
        """
        Search for the needle in the string element of the haystack.
        Discard similarities less than minimum_score. Return a list of
        the int element of the haystack in order of decreasing score (ie,
        best match first).
        """

        # Similarity metrics, all given equal weight
        scorers = (
            fuzz.ratio,
            fuzz.partial_ratio,
            fuzz.token_sort_ratio,
            fuzz.token_set_ratio,
        )

        # Create a dictionary to store similarities
        similarities: dict[int, float] = {}

        for hayblade in haystack:
            # Combine scores
            combined_score = sum(
                scorer(needle, hayblade[1]) * 0.25 for scorer in scorers
            )

            if combined_score >= minimum_score:
                similarities[hayblade[0]] = combined_score
                log.debug(
                    f"_find_similar_strings({needle=}), {len(haystack)=}, "
                    f"{minimum_score=}, {hayblade=}, {combined_score=}"
                )

        # Sort matches by score
        sorted_matches = sorted(similarities.items(), key=lambda x: x[1], reverse=True)

        # Return list of indexes, highest score first
        return [a[0] for a in sorted_matches]
class PickMatch(QDialog):
    """
    Radio-button dialog asking whether a file being imported is a new
    track or a replacement for one of the offered existing tracks.

    After the dialog is accepted, selected_id holds the id attached to
    the chosen entry; it is -1 until then.
    """

    def __init__(
        self, new_track_details: str, items_with_ids: list[tuple[str, int, str]]
    ) -> None:
        """
        new_track_details: description of the file being imported
        items_with_ids: (label, id, path) for each choice to offer
        """

        super().__init__()

        self.selected_id = -1
        self.new_track_details = new_track_details
        self.init_ui(items_with_ids)

    def init_ui(self, items_with_ids: list[tuple[str, int, str]]) -> None:
        """
        Build the dialog: instructions, one radio button per choice, and
        OK / Cancel buttons
        """

        self.setWindowTitle("New or replace")

        outer_layout = QVBoxLayout()

        # Explain what is being asked
        prompt_label = QLabel(
            f"Importing {self.new_track_details}.\n"
            "Import as a new track or replace existing track?"
        )
        outer_layout.addWidget(prompt_label)

        # Radio buttons are grouped so that the chosen id can be read back
        self.button_group = QButtonGroup()

        for position, (label, track_id, track_path) in enumerate(items_with_ids):
            is_current_track = (
                track_sequence.current
                and track_id
                and track_sequence.current.track_id == track_id
            )

            if is_current_track:
                # The current track must not be replaced, so show it but
                # don't let it be picked
                option = QRadioButton("(Currently playing) " + label)
                option.setDisabled(True)
                self.button_group.addButton(option, -1)
            else:
                option = QRadioButton(label)
                option.setToolTip(track_path)
                self.button_group.addButton(option, track_id)

            outer_layout.addWidget(option)

            # Second entry (import as new) is the default selection
            if position == 1:
                option.setChecked(True)

        # OK and Cancel side by side underneath the choices
        ok_button = QPushButton("OK")
        cancel_button = QPushButton("Cancel")
        buttons_row = QHBoxLayout()
        buttons_row.addWidget(ok_button)
        buttons_row.addWidget(cancel_button)
        outer_layout.addLayout(buttons_row)

        self.setLayout(outer_layout)

        # Wire up the buttons
        ok_button.clicked.connect(self.on_ok)
        cancel_button.clicked.connect(self.reject)

    def on_ok(self):
        """
        Record the id of the checked radio button and accept the dialog
        """

        self.selected_id = self.button_group.checkedId()
        self.accept()
@dataclass
class TrackFileData:
    """
    Record of one file being imported and what is to be done with it
    """

    # File to be imported, and the tags read from it
    import_file_path: str
    tags: Tags

    # Where the file is to end up; empty until that has been decided
    destination_track_path: str = ""

    file_path_to_removed: Optional[str] = None

    # 0: import as a new track; -1: don't import; otherwise the id of the
    # existing track that this file replaces
    track_id: int = 0

    audio_metadata: Optional[AudioMetadata] = None

    def set_destination_track_path(self, path: str) -> None:
        """
        Record path as the destination for this file
        """

        self.destination_track_path = path