musicmuster/app/file_importer.py
2025-08-17 18:42:01 +01:00

778 lines
25 KiB
Python

from __future__ import annotations
from dataclasses import dataclass, field
from fuzzywuzzy import fuzz # type: ignore
import os.path
import threading
from typing import Optional, Sequence
import os
import shutil
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QThread,
)
from PyQt6.QtWidgets import (
QButtonGroup,
QDialog,
QFileDialog,
QHBoxLayout,
QLabel,
QPushButton,
QRadioButton,
QVBoxLayout,
)
# Third party imports
# App imports
from classes import (
ApplicationError,
MusicMusterSignals,
singleton,
Tags,
)
from config import Config
from helpers import (
audio_file_extension,
file_is_unreadable,
get_tags,
show_OK,
)
from log import log
from models import db, Tracks
from music_manager import track_sequence
from playlistmodel import PlaylistModel
import helpers
@dataclass
class ThreadData:
"""
Data structure to hold details of the import thread context
"""
base_model: PlaylistModel
row_number: int
@dataclass
class TrackFileData:
"""
Data structure to hold details of file to be imported
"""
source_path: str
tags: Tags = Tags()
destination_path: str = ""
import_this_file: bool = False
error: str = ""
file_path_to_remove: Optional[str] = None
track_id: int = 0
track_match_data: list[TrackMatchData] = field(default_factory=list)
@dataclass
class TrackMatchData:
"""
Data structure to hold details of existing files that are similar to
the file being imported.
"""
artist: str
artist_match: float
title: str
title_match: float
track_id: int
@singleton
class FileImporter:
"""
Class to manage the import of new tracks. Sanity checks are carried
out before processing each track.
They may replace existing tracks, be imported as new tracks, or the
import may be skipped altogether. The user decides which of these in
the UI managed by the PickMatch class.
The actual import is handled by the DoTrackImport class.
"""
# Place to keep a reference to importer workers. This is an instance
# variable to allow tests access. As this is a singleton, a class
# variable or an instance variable are effectively the same thing.
workers: dict[str, DoTrackImport] = {}
def __init__(self, base_model: PlaylistModel, row_number: int) -> None:
"""
Initialise the FileImporter singleton instance.
"""
log.debug(f"FileImporter.__init__({base_model=}, {row_number=})")
# Create ModelData
self.model_data = ThreadData(base_model=base_model, row_number=row_number)
# Data structure to track files to import
self.import_files_data: list[TrackFileData] = []
# Get signals
self.signals = MusicMusterSignals()
def _get_existing_tracks(self) -> Sequence[Tracks]:
"""
Return a list of all existing Tracks
"""
with db.Session() as session:
return Tracks.get_all(session)
def start(self) -> None:
"""
Build a TrackFileData object for each new file to import, add it
to self.import_files_data, and trigger importing.
"""
new_files: list[str] = []
if not os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE):
show_OK(
"File import",
f"No files in {Config.REPLACE_FILES_DEFAULT_SOURCE} to import",
None,
)
return
# Refresh list of existing tracks as they may have been updated
# by previous imports
self.existing_tracks = self._get_existing_tracks()
for infile in [
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE)
if f.endswith((".mp3", ".flac"))
]:
if infile in [a.source_path for a in self.import_files_data]:
log.debug(f"file_importer.start skipping {infile=}, already queued")
else:
new_files.append(infile)
self.import_files_data.append(self.populate_trackfiledata(infile))
# Tell user which files won't be imported and why
self.inform_user(
[
a
for a in self.import_files_data
if a.source_path in new_files and a.import_this_file is False
]
)
# Remove do-not-import entries from queue
self.import_files_data[:] = [
a for a in self.import_files_data if a.import_this_file is not False
]
# Start the import if necessary
log.debug(
f"Import files prepared: {[a.source_path for a in self.import_files_data]}"
)
self._import_next_file()
def populate_trackfiledata(self, path: str) -> TrackFileData:
"""
Populate TrackFileData object for path:
- Validate file to be imported
- Find matches and similar files
- Get user choices for each import file
- Validate self.import_files_data integrity
- Tell the user which files won't be imported and why
- Import the files, one by one.
"""
tfd = TrackFileData(source_path=path)
if self.check_file_readable(tfd):
if self.check_file_tags(tfd):
self.find_similar(tfd)
if len(tfd.track_match_data) > 1:
self.sort_track_match_data(tfd)
selection = self.get_user_choices(tfd)
if self.process_selection(tfd, selection):
if self.extension_check(tfd):
if self.validate_file_data(tfd):
tfd.import_this_file = True
return tfd
def check_file_readable(self, tfd: TrackFileData) -> bool:
"""
Check file is readable.
Return True if it is.
Populate error and return False if not.
"""
if file_is_unreadable(tfd.source_path):
tfd.import_this_file = False
tfd.error = f"{tfd.source_path} is unreadable"
return False
return True
def check_file_tags(self, tfd: TrackFileData) -> bool:
"""
Add tags to tfd
Return True if successful.
Populate error and return False if not.
"""
try:
tfd.tags = get_tags(tfd.source_path)
except ApplicationError as e:
tfd.import_this_file = False
tfd.error = f"of tag errors ({str(e)})"
return False
return True
def extension_check(self, tfd: TrackFileData) -> bool:
"""
If we are replacing an existing file, check that the correct file
extension of the replacement file matches the existing file
extension and return True if it does (or if there is no exsting
file), else False.
"""
if not tfd.file_path_to_remove:
return True
if tfd.file_path_to_remove.endswith(audio_file_extension(tfd.source_path)):
return True
tfd.error = (
f"Existing file ({tfd.file_path_to_remove}) has a different "
f"extension to replacement file ({tfd.source_path})"
)
return False
def find_similar(self, tfd: TrackFileData) -> None:
"""
- Search title in existing tracks
- if score >= Config.FUZZYMATCH_MINIMUM_LIST:
- get artist score
- add TrackMatchData to self.import_files_data[path].track_match_data
"""
title = tfd.tags.title
artist = tfd.tags.artist
for existing_track in self.existing_tracks:
title_score = self._get_match_score(title, existing_track.title)
if title_score >= Config.FUZZYMATCH_MINIMUM_LIST:
artist_score = self._get_match_score(artist, existing_track.artist)
tfd.track_match_data.append(
TrackMatchData(
artist=existing_track.artist,
artist_match=artist_score,
title=existing_track.title,
title_match=title_score,
track_id=existing_track.id,
)
)
def sort_track_match_data(self, tfd: TrackFileData) -> None:
"""
Sort matched tracks in artist-similarity order
"""
tfd.track_match_data.sort(key=lambda x: x.artist_match, reverse=True)
def _get_match_score(self, str1: str, str2: str) -> float:
"""
Return the score of how well str1 matches str2.
"""
ratio = fuzz.ratio(str1, str2)
partial_ratio = fuzz.partial_ratio(str1, str2)
token_sort_ratio = fuzz.token_sort_ratio(str1, str2)
token_set_ratio = fuzz.token_set_ratio(str1, str2)
# Combine scores
combined_score = (
ratio * 0.25
+ partial_ratio * 0.25
+ token_sort_ratio * 0.25
+ token_set_ratio * 0.25
)
return combined_score
def get_user_choices(self, tfd: TrackFileData) -> int:
"""
Find out whether user wants to import this as a new track,
overwrite an existing track or not import it at all.
Return -1 (user cancelled) 0 (import as new) >0 (replace track id)
"""
# Build a list of (track title and artist, track_id, track path)
choices: list[tuple[str, int, str]] = []
# First choices are always a) don't import 2) import as a new track
choices.append((Config.DO_NOT_IMPORT, -1, ""))
choices.append((Config.IMPORT_AS_NEW, 0, ""))
# New track details
new_track_description = f"{tfd.tags.title} ({tfd.tags.artist})"
# Select 'import as new' as default unless the top match is good
# enough
default = 1
track_match_data = tfd.track_match_data
if track_match_data:
if (
track_match_data[0].artist_match
>= Config.FUZZYMATCH_MINIMUM_SELECT_ARTIST
and track_match_data[0].title_match
>= Config.FUZZYMATCH_MINIMUM_SELECT_TITLE
):
default = 2
for xt in track_match_data:
xt_description = f"{xt.title} ({xt.artist})"
if Config.FUZZYMATCH_SHOW_SCORES:
xt_description += f" ({xt.title_match:.0f}%)"
existing_track_path = self._get_existing_track(xt.track_id).path
choices.append(
(
xt_description,
xt.track_id,
existing_track_path,
)
)
dialog = PickMatch(
new_track_description=new_track_description,
choices=choices,
default=default,
)
if dialog.exec():
return dialog.selected_track_id
else:
return -1
def process_selection(self, tfd: TrackFileData, selection: int) -> bool:
"""
Process selection from PickMatch
"""
if selection < 0:
# User cancelled
tfd.import_this_file = False
tfd.error = "you asked not to import this file"
return False
elif selection > 0:
# Import and replace track
self.replace_file(tfd, track_id=selection)
else:
# Import as new
self.import_as_new(tfd)
return True
def replace_file(self, tfd: TrackFileData, track_id: int) -> None:
"""
Set up to replace an existing file.
"""
log.debug(f"replace_file({tfd=}, {track_id=})")
if track_id < 1:
raise ApplicationError(f"No track ID: replace_file({tfd=}, {track_id=})")
tfd.track_id = track_id
existing_track_path = self._get_existing_track(track_id).path
tfd.file_path_to_remove = existing_track_path
# If the existing file in the Config.IMPORT_DESTINATION
# directory, replace it with the imported file name; otherwise,
# use the existing file name. This so that we don't change file
# names from CDs, etc.
if os.path.dirname(existing_track_path) == Config.IMPORT_DESTINATION:
tfd.destination_path = os.path.join(
Config.IMPORT_DESTINATION, os.path.basename(tfd.source_path)
)
else:
tfd.destination_path = existing_track_path
def _get_existing_track(self, track_id: int) -> Tracks:
"""
Lookup in existing track in the local cache and return it
"""
existing_track_records = [a for a in self.existing_tracks if a.id == track_id]
if len(existing_track_records) != 1:
raise ApplicationError(
f"Internal error in _get_existing_track: {existing_track_records=}"
)
return existing_track_records[0]
def import_as_new(self, tfd: TrackFileData) -> None:
"""
Set up to import as a new file.
"""
tfd.destination_path = os.path.join(
Config.IMPORT_DESTINATION, os.path.basename(tfd.source_path)
)
def validate_file_data(self, tfd: TrackFileData) -> bool:
"""
Check the data structures for integrity
Return True if all OK
Populate error and return False if not.
"""
# Check tags
if not (tfd.tags.artist and tfd.tags.title):
raise ApplicationError(
f"validate_file_data: {tfd.tags=}, {tfd.source_path=}"
)
# Check file_path_to_remove
if tfd.file_path_to_remove and not os.path.exists(tfd.file_path_to_remove):
# File to remove is missing, but this isn't a major error. We
# may be importing to replace a deleted file.
tfd.file_path_to_remove = ""
# Check destination_path
if not tfd.destination_path:
raise ApplicationError(
f"validate_file_data: no destination path set ({tfd.source_path=})"
)
# If destination path is the same as file_path_to_remove, that's
# OK, otherwise if this is a new import then check that
# destination path doesn't already exists
if tfd.track_id == 0 and tfd.destination_path != tfd.file_path_to_remove:
while os.path.exists(tfd.destination_path):
msg = (
"New import requested but default destination path"
f" ({tfd.destination_path})"
" already exists. Click OK and choose where to save this track"
)
show_OK(title="Desintation path exists", msg=msg, parent=None)
# Get output filename
pathspec = QFileDialog.getSaveFileName(
None,
"Save imported track",
directory=Config.IMPORT_DESTINATION,
)
if pathspec:
if pathspec == "":
# User cancelled
tfd.error = "You did not select a location to save this track"
return False
tfd.destination_path = pathspec[0]
else:
tfd.error = "destination file already exists"
return False
# The desintation path should not already exist in the
# database (becquse if it does, it points to a non-existent
# file). Check that because the path field in the database is
# unique and so adding a duplicate will give a db integrity
# error.
with db.Session() as session:
if Tracks.get_by_path(session, tfd.destination_path):
tfd.error = (
"Importing a new track but destination path already exists "
f"in database ({tfd.destination_path})"
)
return False
# Check track_id
if tfd.track_id < 0:
raise ApplicationError(
f"validate_file_data: track_id < 0, {tfd.source_path=}"
)
return True
def inform_user(self, tfds: list[TrackFileData]) -> None:
"""
Tell user about files that won't be imported
"""
msgs: list[str] = []
for tfd in tfds:
msgs.append(
f"{os.path.basename(tfd.source_path)} will not be imported because {tfd.error}"
)
if msgs:
show_OK("File not imported", "\r\r".join(msgs))
log.debug("\r\r".join(msgs))
def _import_next_file(self) -> None:
"""
Import the next file sequentially.
This is called when an import completes so will be called asynchronously.
Protect with a lock.
"""
lock = threading.Lock()
with lock:
while len(self.workers) < Config.MAX_IMPORT_THREADS:
try:
tfd = self.import_files_data.pop()
filename = os.path.basename(tfd.source_path)
log.debug(f"Processing {filename}")
log.debug(
f"remaining files: {[a.source_path for a in self.import_files_data]}"
)
self.signals.status_message_signal.emit(
f"Importing {filename}", 10000
)
self._start_import(tfd)
except IndexError:
log.debug("import_next_file: no files remaining in queue")
break
def _start_import(self, tfd: TrackFileData) -> None:
"""
Start thread to import track
"""
filename = os.path.basename(tfd.source_path)
log.debug(f"_start_import({filename=})")
self.workers[tfd.source_path] = DoTrackImport(
import_file_path=tfd.source_path,
tags=tfd.tags,
destination_path=tfd.destination_path,
track_id=tfd.track_id,
file_path_to_remove=tfd.file_path_to_remove,
)
log.debug(f"{self.workers[tfd.source_path]=} created")
self.workers[tfd.source_path].import_finished.connect(
self.post_import_processing
)
self.workers[tfd.source_path].finished.connect(lambda: self.cleanup_thread(tfd))
self.workers[tfd.source_path].finished.connect(
self.workers[tfd.source_path].deleteLater
)
self.workers[tfd.source_path].start()
def cleanup_thread(self, tfd: TrackFileData) -> None:
"""
Remove references to finished threads/workers to prevent leaks.
"""
log.debug(f"cleanup_thread({tfd.source_path=})")
if tfd.source_path in self.workers:
del self.workers[tfd.source_path]
else:
log.error(f"Couldn't find {tfd.source_path=} in {self.workers.keys()=}")
log.debug(f"After cleanup_thread: {self.workers.keys()=}")
def post_import_processing(self, source_path: str, track_id: int) -> None:
"""
If track already in playlist, refresh it else insert it
"""
log.debug(f"post_import_processing({source_path=}, {track_id=})")
if self.model_data:
if self.model_data.base_model:
self.model_data.base_model.update_or_insert(
track_id, self.model_data.row_number
)
# Process next file(s)
self._import_next_file()
class DoTrackImport(QThread):
"""
Class to manage the actual import of tracks in a thread.
"""
import_finished = pyqtSignal(str, int)
def __init__(
self,
import_file_path: str,
tags: Tags,
destination_path: str,
track_id: int,
file_path_to_remove: Optional[str] = None,
) -> None:
"""
Save parameters
"""
super().__init__()
self.import_file_path = import_file_path
self.tags = tags
self.destination_track_path = destination_path
self.track_id = track_id
self.file_path_to_remove = file_path_to_remove
self.signals = MusicMusterSignals()
def __repr__(self) -> str:
return f"<DoTrackImport(id={hex(id(self))}, import_file_path={self.import_file_path}"
def run(self) -> None:
"""
Either create track objects from passed files or update exising track
objects.
And add to visible playlist or update playlist if track already present.
"""
self.signals.status_message_signal.emit(
f"Importing {os.path.basename(self.import_file_path)}", 5000
)
# Get audio metadata in this thread rather than calling
# function to save interactive time
self.audio_metadata = helpers.get_audio_metadata(self.import_file_path)
# Remove old file if so requested
if self.file_path_to_remove and os.path.exists(self.file_path_to_remove):
os.unlink(self.file_path_to_remove)
# Move new file to destination
shutil.move(self.import_file_path, self.destination_track_path)
with db.Session() as session:
if self.track_id == 0:
# Import new track
try:
track = Tracks(
session,
path=self.destination_track_path,
**self.tags._asdict(),
**self.audio_metadata._asdict(),
)
except Exception as e:
self.signals.show_warning_signal.emit(
"Error importing track", str(e)
)
return
else:
track = session.get(Tracks, self.track_id)
if track:
for key, value in self.tags._asdict().items():
if hasattr(track, key):
setattr(track, key, value)
for key, value in self.audio_metadata._asdict().items():
if hasattr(track, key):
setattr(track, key, value)
track.path = self.destination_track_path
else:
log.error(f"Unable to retrieve {self.track_id=}")
return
session.commit()
helpers.normalise_track(self.destination_track_path)
self.signals.status_message_signal.emit(
f"{os.path.basename(self.import_file_path)} imported", 10000
)
self.import_finished.emit(self.import_file_path, track.id)
class PickMatch(QDialog):
"""
Dialog for user to select which existing track to replace or to
import to a new track
"""
def __init__(
self,
new_track_description: str,
choices: list[tuple[str, int, str]],
default: int,
) -> None:
super().__init__()
self.new_track_description = new_track_description
self.default = default
self.init_ui(choices)
self.selected_track_id = -1
def init_ui(self, choices: list[tuple[str, int, str]]) -> None:
"""
Set up dialog
"""
self.setWindowTitle("New or replace")
layout = QVBoxLayout()
# Add instructions
instructions = (
f"Importing {self.new_track_description}.\n"
"Import as a new track or replace existing track?"
)
instructions_label = QLabel(instructions)
layout.addWidget(instructions_label)
# Create a button group for radio buttons
self.button_group = QButtonGroup()
# Add radio buttons for each item
for idx, (track_description, track_id, track_path) in enumerate(choices):
if (
track_sequence.current
and track_id
and track_sequence.current.track_id == track_id
):
# Don't allow current track to be replaced
track_description = "(Currently playing) " + track_description
radio_button = QRadioButton(track_description)
radio_button.setDisabled(True)
self.button_group.addButton(radio_button, -1)
else:
radio_button = QRadioButton(track_description)
radio_button.setToolTip(track_path)
self.button_group.addButton(radio_button, track_id)
layout.addWidget(radio_button)
# Select the second item by default (import as new)
if idx == self.default:
radio_button.setChecked(True)
# Add OK and Cancel buttons
button_layout = QHBoxLayout()
ok_button = QPushButton("OK")
cancel_button = QPushButton("Cancel")
button_layout.addWidget(ok_button)
button_layout.addWidget(cancel_button)
layout.addLayout(button_layout)
self.setLayout(layout)
# Connect buttons to actions
ok_button.clicked.connect(self.on_ok)
cancel_button.clicked.connect(self.reject)
def on_ok(self):
# Get the ID of the selected button
self.selected_track_id = self.button_group.checkedId()
self.accept()