from __future__ import annotations from dataclasses import dataclass, field from fuzzywuzzy import fuzz # type: ignore import os.path from typing import Optional, Sequence import os import shutil # PyQt imports from PyQt6.QtCore import ( pyqtSignal, QThread, ) from PyQt6.QtWidgets import ( QButtonGroup, QDialog, QFileDialog, QHBoxLayout, QLabel, QPushButton, QRadioButton, QVBoxLayout, ) # Third party imports # App imports from classes import ( ApplicationError, MusicMusterSignals, singleton, Tags, ) from config import Config from helpers import ( file_is_unreadable, get_tags, show_OK, ) from log import log from models import db, Tracks from music_manager import track_sequence from playlistmodel import PlaylistModel import helpers @dataclass class ThreadData: """ Data structure to hold details of the import thread context """ base_model: PlaylistModel row_number: int @dataclass class TrackFileData: """ Data structure to hold details of file to be imported """ source_path: str tags: Tags = Tags() destination_path: str = "" import_this_file: bool = False error: str = "" file_path_to_remove: Optional[str] = None track_id: int = 0 track_match_data: list[TrackMatchData] = field(default_factory=list) @dataclass class TrackMatchData: """ Data structure to hold details of existing files that are similar to the file being imported. """ artist: str artist_match: float title: str title_match: float track_id: int @singleton class FileImporter: """ Class to manage the import of new tracks. Sanity checks are carried out before processing each track. They may replace existing tracks, be imported as new tracks, or the import may be skipped altogether. The user decides which of these in the UI managed by the PickMatch class. The actual import is handled by the DoTrackImport class. """ # Place to keep a reference to importer threads. This is an instance # variable to allow tests to access the threads. As this is a # singleton, a class variable or an instance variable are effectively # the same thing. threads: list[QThread] = [] def __init__( self, base_model: PlaylistModel, row_number: Optional[int] = None ) -> None: """ Initialise the FileImporter singleton instance. """ self.initialized: bool if hasattr(self, "initialized") and self.initialized: return # Prevent re-initialization of the singleton self.initialized = True # Mark the instance as initialized # Create ModelData if not row_number: row_number = base_model.rowCount() self.model_data = ThreadData(base_model=base_model, row_number=row_number) # Data structure to track files to import self.import_files_data: list[TrackFileData] = [] # Keep track of workers self.workers: dict[str, DoTrackImport] = {} # Limit concurrent threads self.max_concurrent_threads: int = 3 # Dictionary of exsting tracks indexed by track.id self.existing_tracks = self._get_existing_tracks() # Track whether importing is active self.importing: bool = False # Get signals self.signals = MusicMusterSignals() def _get_existing_tracks(self) -> Sequence[Tracks]: """ Return a list of all existing Tracks """ with db.Session() as session: return Tracks.get_all(session) def start(self) -> None: """ Build a TrackFileData object for each new file to import, add the TrackFileData object to self.import_files_data, and trigger importing. """ new_files: list[str] = [] if not os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE): show_OK( "File import", f"No files in {Config.REPLACE_FILES_DEFAULT_SOURCE} to import", None, ) return for infile in [ os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f) for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE) if f.endswith((".mp3", ".flac")) ]: if infile in [a.source_path for a in self.import_files_data]: log.debug(f"file_importer.start skipping {infile=}, already queued") else: new_files.append(infile) self.import_files_data.append(self.populate_trackfiledata(infile)) # Tell user which files won't be imported and why self.inform_user( [ a for a in self.import_files_data if a.source_path in new_files and a.import_this_file is False ] ) # Start the import if necessary if not self.importing: self.import_next_file() def populate_trackfiledata(self, path: str) -> TrackFileData: """ Populate TrackFileData object for path: - Validate file to be imported - Find matches and similar files - Get user choices for each import file - Validate self.import_files_data integrity - Tell the user which files won't be imported and why - Import the files, one by one. """ tfd = TrackFileData(source_path=path) if self.check_file_readable(tfd): if self.check_file_tags(tfd): self.find_similar(tfd) if len(tfd.track_match_data) > 1: self.sort_track_match_data(tfd) selection = self.get_user_choices(tfd) if self.process_selection(tfd, selection): if self.validate_file_data(tfd): tfd.import_this_file = True return tfd def check_file_readable(self, tfd: TrackFileData) -> bool: """ Check file is readable. Return True if it is. Populate error and return False if not. """ if file_is_unreadable(tfd.source_path): tfd.import_this_file = False tfd.error = f"{tfd.source_path} is unreadable" return False return True def check_file_tags(self, tfd: TrackFileData) -> bool: """ Add tags to tfd Return True if successful. Populate error and return False if not. """ try: tfd.tags = get_tags(tfd.source_path) except ApplicationError as e: tfd.import_this_file = False tfd.error = f"of tag errors ({str(e)})" return False return True def find_similar(self, tfd: TrackFileData) -> None: """ - Search title in existing tracks - if score >= Config.FUZZYMATCH_MINIMUM_LIST: - get artist score - add TrackMatchData to self.import_files_data[path].track_match_data """ title = tfd.tags.title artist = tfd.tags.artist for existing_track in self.existing_tracks: title_score = self._get_match_score(title, existing_track.title) if title_score >= Config.FUZZYMATCH_MINIMUM_LIST: artist_score = self._get_match_score(artist, existing_track.artist) tfd.track_match_data.append( TrackMatchData( artist=existing_track.artist, artist_match=artist_score, title=existing_track.title, title_match=title_score, track_id=existing_track.id, ) ) def sort_track_match_data(self, tfd: TrackFileData) -> None: """ Sort matched tracks in artist-similarity order """ tfd.track_match_data.sort(key=lambda x: x.artist_match, reverse=True) def _get_match_score(self, str1: str, str2: str) -> float: """ Return the score of how well str1 matches str2. """ ratio = fuzz.ratio(str1, str2) partial_ratio = fuzz.partial_ratio(str1, str2) token_sort_ratio = fuzz.token_sort_ratio(str1, str2) token_set_ratio = fuzz.token_set_ratio(str1, str2) # Combine scores combined_score = ( ratio * 0.25 + partial_ratio * 0.25 + token_sort_ratio * 0.25 + token_set_ratio * 0.25 ) return combined_score def get_user_choices(self, tfd: TrackFileData) -> int: """ Find out whether user wants to import this as a new track, overwrite an existing track or not import it at all. Return -1 (user cancelled) 0 (import as new) >0 (replace track id) """ # Build a list of (track title and artist, track_id, track path) choices: list[tuple[str, int, str]] = [] # First choices are always a) don't import 2) import as a new track choices.append((Config.DO_NOT_IMPORT, -1, "")) choices.append((Config.IMPORT_AS_NEW, 0, "")) # New track details new_track_description = f"{tfd.tags.title} ({tfd.tags.artist})" # Select 'import as new' as default unless the top match is good # enough default = 1 track_match_data = tfd.track_match_data if track_match_data: if ( track_match_data[0].artist_match >= Config.FUZZYMATCH_MINIMUM_SELECT_ARTIST and track_match_data[0].title_match >= Config.FUZZYMATCH_MINIMUM_SELECT_TITLE ): default = 2 for xt in track_match_data: xt_description = f"{xt.title} ({xt.artist})" if Config.FUZZYMATCH_SHOW_SCORES: xt_description += f" ({xt.title_match:.0f}%)" existing_track_path = self._get_existing_track(xt.track_id).path choices.append( ( xt_description, xt.track_id, existing_track_path, ) ) dialog = PickMatch( new_track_description=new_track_description, choices=choices, default=default, ) if dialog.exec(): return dialog.selected_track_id else: return -1 def process_selection(self, tfd: TrackFileData, selection: int) -> bool: """ Process selection from PickMatch """ if selection < 0: # User cancelled tfd.import_this_file = False tfd.error = "you asked not to import this file" return False elif selection > 0: # Import and replace track self.replace_file(tfd, track_id=selection) else: # Import as new self.import_as_new(tfd) return True def replace_file(self, tfd: TrackFileData, track_id: int) -> None: """ Set up to replace an existing file. """ if track_id < 1: raise ApplicationError(f"No track ID: replace_file({tfd=}, {track_id=})") tfd.track_id = track_id existing_track_path = self._get_existing_track(track_id).path tfd.file_path_to_remove = existing_track_path # If the existing file in the Config.IMPORT_DESTINATION # directory, replace it with the imported file name; otherwise, # use the existing file name. This so that we don't change file # names from CDs, etc. if os.path.dirname(existing_track_path) == Config.IMPORT_DESTINATION: tfd.destination_path = os.path.join( Config.IMPORT_DESTINATION, os.path.basename(tfd.source_path) ) else: tfd.destination_path = existing_track_path def _get_existing_track(self, track_id: int) -> Tracks: """ Lookup in existing track in the local cache and return it """ existing_track_records = [a for a in self.existing_tracks if a.id == track_id] if len(existing_track_records) != 1: raise ApplicationError( f"Internal error in _get_existing_track: {existing_track_records=}" ) return existing_track_records[0] def import_as_new(self, tfd: TrackFileData) -> None: """ Set up to import as a new file. """ tfd.destination_path = os.path.join( Config.IMPORT_DESTINATION, os.path.basename(tfd.source_path) ) def validate_file_data(self, tfd: TrackFileData) -> bool: """ Check the data structures for integrity Return True if all OK Populate error and return False if not. """ # Check tags if not (tfd.tags.artist and tfd.tags.title): raise ApplicationError( f"validate_file_data: {tfd.tags=}, {tfd.source_path=}" ) # Check file_path_to_remove if tfd.file_path_to_remove and not os.path.exists(tfd.file_path_to_remove): # File to remove is missing, but this isn't a major error. We # may be importing to replace a deleted file. tfd.file_path_to_remove = "" # Check destination_path if not tfd.destination_path: raise ApplicationError( f"validate_file_data: no destination path set ({tfd.source_path=})" ) # If destination path is the same as file_path_to_remove, that's # OK, otherwise if this is a new import then check check # destination path doesn't already exists if tfd.track_id == 0 and tfd.destination_path != tfd.file_path_to_remove: while os.path.exists(tfd.destination_path): msg = ( "New import requested but default destination path ({ifd.destination_path}) " "already exists. Click OK and choose where to save this track" ) show_OK(title="Desintation path exists", msg=msg, parent=None) # Get output filename pathspec = QFileDialog.getSaveFileName( None, "Save imported track", directory=Config.IMPORT_DESTINATION, ) if pathspec: if pathspec == '': # User cancelled tfd.error = "You did not select a location to save this track" return False tfd.destination_path = pathspec[0] else: tfd.error = "destination file already exists" return False # Check track_id if tfd.track_id < 0: raise ApplicationError( f"validate_file_data: track_id < 0, {tfd.source_path=}" ) return True def inform_user(self, tfds: list[TrackFileData]) -> None: """ Tell user about files that won't be imported """ msgs: list[str] = [] for tfd in tfds: msgs.append( f"{os.path.basename(tfd.source_path)} will not be imported because {tfd.error}" ) if msgs: show_OK("File not imported", "\r\r".join(msgs)) log.debug("\r\r".join(msgs)) def import_next_file(self) -> None: """ Import the next file sequentially. """ # Remove any entries that should not be imported. Modify list # in-place rather than create a new list, which will retain any # references to self.import_files_data. self.import_files_data[:] = [ a for a in self.import_files_data if a.import_this_file ] # If no valid files remain, mark importing as False and exit if not self.import_files_data: self.importing = False self.signals.status_message_signal.emit("All files imported", 10000) log.debug("import_next_file: all files imported") return self.importing = ( True # Now safe to mark as True since at least one file is valid) ) while ( len(FileImporter.threads) < self.max_concurrent_threads and self.import_files_data ): tfd = self.import_files_data.pop() filename = os.path.basename(tfd.source_path) log.debug(f"processing: {filename}") log.debug( "remaining: " f"{[os.path.basename(a.source_path) for a in self.import_files_data]}" ) self.signals.status_message_signal.emit(f"Importing {filename}", 10000) self._start_import(tfd) def _start_import(self, tfd: TrackFileData) -> None: """ Start thread to import track """ filename = os.path.basename(tfd.source_path) log.debug(f"_start_import({filename=})") self.workers[tfd.source_path] = DoTrackImport( import_file_path=tfd.source_path, tags=tfd.tags, destination_path=tfd.destination_path, track_id=tfd.track_id, ) log.debug(f"{self.workers[tfd.source_path]=} created for {filename=}") self.workers[tfd.source_path].import_finished.connect(self.post_import_processing) self.workers[tfd.source_path].finished.connect(lambda: self.cleanup_thread(tfd)) self.workers[tfd.source_path].finished.connect(self.workers[tfd.source_path].deleteLater) self.workers[tfd.source_path].start() def cleanup_thread(self, tfd: TrackFileData) -> None: """ Remove references to finished threads/workers to prevent leaks. """ log.debug(f"cleanup_thread({tfd.source_path=})") if tfd.source_path in self.workers: del self.workers[tfd.source_path] def post_import_processing(self, source_path: str, track_id: int) -> None: """ If track already in playlist, refresh it else insert it """ log.debug(f"post_import_processing({source_path=}, {track_id=})") if self.model_data: if self.model_data.base_model: self.model_data.base_model.update_or_insert( track_id, self.model_data.row_number ) # Process next file self.import_next_file() class DoTrackImport(QThread): """ Class to manage the actual import of tracks in a thread. """ import_finished = pyqtSignal(str, int) def __init__( self, import_file_path: str, tags: Tags, destination_path: str, track_id: int, ) -> None: """ Save parameters """ super().__init__() self.import_file_path = import_file_path self.tags = tags self.destination_track_path = destination_path self.track_id = track_id self.signals = MusicMusterSignals() def run(self) -> None: """ Either create track objects from passed files or update exising track objects. And add to visible playlist or update playlist if track already present. """ temp_file: Optional[str] = None # Get audio metadata in this thread rather than calling function to save interactive time self.audio_metadata = helpers.get_audio_metadata(self.import_file_path) # If destination exists, move it out of the way if os.path.exists(self.destination_track_path): temp_file = self.destination_track_path + ".TMP" shutil.move(self.destination_track_path, temp_file) # Move file to destination shutil.move(self.import_file_path, self.destination_track_path) # Clean up if temp_file and os.path.exists(temp_file): os.unlink(temp_file) with db.Session() as session: self.signals.status_message_signal.emit( f"Importing {os.path.basename(self.import_file_path)}", 5000 ) if self.track_id == 0: # Import new track try: track = Tracks( session, path=self.destination_track_path, **self.tags._asdict(), **self.audio_metadata._asdict(), ) except Exception as e: self.signals.show_warning_signal.emit( "Error importing track", str(e) ) return else: track = session.get(Tracks, self.track_id) if track: for key, value in self.tags._asdict().items(): if hasattr(track, key): setattr(track, key, value) for key, value in self.audio_metadata._asdict().items(): if hasattr(track, key): setattr(track, key, value) track.path = self.destination_track_path session.commit() helpers.normalise_track(self.destination_track_path) self.signals.status_message_signal.emit( f"{os.path.basename(self.import_file_path)} imported", 10000 ) self.import_finished.emit(self.import_file_path, track.id) class PickMatch(QDialog): """ Dialog for user to select which existing track to replace or to import to a new track """ def __init__( self, new_track_description: str, choices: list[tuple[str, int, str]], default: int, ) -> None: super().__init__() self.new_track_description = new_track_description self.default = default self.init_ui(choices) self.selected_track_id = -1 def init_ui(self, choices: list[tuple[str, int, str]]) -> None: """ Set up dialog """ self.setWindowTitle("New or replace") layout = QVBoxLayout() # Add instructions instructions = ( f"Importing {self.new_track_description}.\n" "Import as a new track or replace existing track?" ) instructions_label = QLabel(instructions) layout.addWidget(instructions_label) # Create a button group for radio buttons self.button_group = QButtonGroup() # Add radio buttons for each item for idx, (track_description, track_id, track_path) in enumerate(choices): if ( track_sequence.current and track_id and track_sequence.current.track_id == track_id ): # Don't allow current track to be replaced track_description = "(Currently playing) " + track_description radio_button = QRadioButton(track_description) radio_button.setDisabled(True) self.button_group.addButton(radio_button, -1) else: radio_button = QRadioButton(track_description) radio_button.setToolTip(track_path) self.button_group.addButton(radio_button, track_id) layout.addWidget(radio_button) # Select the second item by default (import as new) if idx == self.default: radio_button.setChecked(True) # Add OK and Cancel buttons button_layout = QHBoxLayout() ok_button = QPushButton("OK") cancel_button = QPushButton("Cancel") button_layout.addWidget(ok_button) button_layout.addWidget(cancel_button) layout.addLayout(button_layout) self.setLayout(layout) # Connect buttons to actions ok_button.clicked.connect(self.on_ok) cancel_button.clicked.connect(self.reject) def on_ok(self): # Get the ID of the selected button self.selected_track_id = self.button_group.checkedId() self.accept()