# Standard library imports from __future__ import annotations from dataclasses import dataclass from fuzzywuzzy import fuzz # type: ignore import os.path from typing import Optional import os import shutil # PyQt imports from PyQt6.QtCore import ( pyqtSignal, QObject, QThread, ) from PyQt6.QtWidgets import ( QButtonGroup, QDialog, QHBoxLayout, QLabel, QPushButton, QRadioButton, QStatusBar, QVBoxLayout, ) # Third party imports # App imports from classes import ( ApplicationError, AudioMetadata, FileErrors, MusicMusterSignals, Tags, ) from config import Config from helpers import ( file_is_unreadable, get_tags, show_warning, ) from log import log from models import db, Tracks from music_manager import track_sequence from playlistmodel import PlaylistModel import helpers class DoTrackImport(QObject): import_finished = pyqtSignal() def __init__( self, import_file_path: str, tags: Tags, destination_track_path: str, track_id: int, audio_metadata: AudioMetadata, source_model: PlaylistModel, row_number: Optional[int], ) -> None: """ Save parameters """ super().__init__() self.import_file_path = import_file_path self.tags = tags self.destination_track_path = destination_track_path self.track_id = track_id self.audio_metadata = audio_metadata self.source_model = source_model if row_number is None: self.next_row_number = source_model.rowCount() else: self.next_row_number = row_number self.signals = MusicMusterSignals() def run(self) -> None: """ Either create track objects from passed files or update exising track objects. And add to visible playlist or update playlist if track already present. """ temp_file: Optional[str] = None # If destination exists, move it out of the way if os.path.exists(self.destination_track_path): temp_file = self.destination_track_path + ".TMP" shutil.move(self.destination_track_path, temp_file) # Move file to destination shutil.move(self.import_file_path, self.destination_track_path) with db.Session() as session: self.signals.status_message_signal.emit( f"Importing {os.path.basename(self.import_file_path)}", 5000 ) if self.track_id == 0: # Import new track try: track = Tracks( session, path=self.destination_track_path, **self.tags._asdict(), **self.audio_metadata._asdict(), ) except Exception as e: self.signals.show_warning_signal.emit( "Error importing track", str(e) ) return else: track = session.get(Tracks, self.track_id) if track: for key, value in self.tags._asdict().items(): if hasattr(track, key): setattr(track, key, value) for key, value in self.audio_metadata._asdict().items(): if hasattr(track, key): setattr(track, key, value) track.path = self.destination_track_path session.commit() helpers.normalise_track(self.destination_track_path) self.source_model.insert_row(self.next_row_number, track.id, "imported") self.next_row_number += 1 self.signals.status_message_signal.emit( f"{os.path.basename(self.import_file_path)} imported", 10000 ) self.import_finished.emit() class FileImporter: """ Manage importing of files """ def __init__(self, active_proxy_model: PlaylistModel, row_number: int) -> None: """ Set up class """ # Save parameters self.active_proxy_model = active_proxy_model self.row_number = row_number # Data structure to track files to import self.import_files_data: list[TrackFileData] = [] # Dictionary of exsting tracks self.existing_tracks = self._get_existing_tracks() # List of track_id, title tuples self.track_idx_and_title = [ ((a.id, a.title)) for a in self.existing_tracks.values() ] # Files to import self.import_files_paths = [ os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f) for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE) if f.endswith((".mp3", ".flac")) ] # Files we can't import self.unimportable_files: list[FileErrors] = [] # Files user doesn't want imported self.do_not_import: list[str] = [] def do_import(self) -> None: """ Scan source directory and: - check all file are readable - load readable files and tags into self.import_files - check all files are tagged - check for exact match of existing file - check for duplicates and replacements - allow deselection of import for any one file - import files and either replace existing or add to pool """ # check all file are readable self.check_files_are_readable() # load readable files and tags into self.import_files for import_file in self.import_files_paths: try: tags = get_tags(import_file) except ApplicationError as e: self.unimportable_files.append( FileErrors(path=import_file, error=str(e)) ) self.import_files_paths.remove(import_file) try: self.import_files_data.append( TrackFileData(import_file_path=import_file, tags=tags) ) except Exception as e: self.unimportable_files.append( FileErrors(path=import_file, error=str(e)) ) self.import_files_paths.remove(import_file) if self.unimportable_files: msg = "The following files could not be read and won't be imported:\n" for unimportable_file in self.unimportable_files: msg += f"\n\t• {unimportable_file.path} ({unimportable_file.error})" show_warning(None, "Unimportable files", msg) # check for close matches. for idx in range(len(self.import_files_data)): self.check_match(idx=idx) self.import_files_data = [ x for x in self.import_files_data if x.import_file_path not in self.do_not_import ] # Import all that's left. for idx in range(len(self.import_files_data)): self._import_file(idx) def check_match(self, idx: int) -> None: """ Work on and update the idx element of self.import_file_data. Check for similar existing titles. If none found, set up to import this as a new track. If one is found, check with user whether this is a new track or replacement. If more than one is found, as for one but order the tracks in artist-similarity order. """ similar_track_ids = self._find_similar_strings( self.import_files_data[idx].tags.title, self.track_idx_and_title ) if len(similar_track_ids) == 0: matching_track = 0 elif len(similar_track_ids) == 1: matching_track = self._pick_match(idx, similar_track_ids) else: matching_track = self._pick_match( idx, self.order_by_artist(idx, similar_track_ids) ) if matching_track < 0: # User cancelled return if matching_track == 0: self.import_files_data[idx].destination_track_path = os.path.join( Config.IMPORT_DESTINATION, os.path.basename(self.import_files_data[idx].import_file_path), ) else: self.import_files_data[idx].destination_track_path = self.existing_tracks[ matching_track ].path self.import_files_data[idx].track_id = matching_track def _import_file(self, idx: int) -> None: """ Import the file specified at self.import_files_data[idx] """ log.debug(f"_import_file({idx=}), {self.import_files_data[idx]=}") f = self.import_files_data[idx] # Import in separate thread self.import_thread = QThread() self.worker = DoTrackImport( import_file_path=f.import_file_path, tags=f.tags, destination_track_path=f.destination_track_path, track_id=f.track_id, audio_metadata=helpers.get_audio_metadata(f.import_file_path), source_model=self.active_proxy_model, row_number=self.row_number, ) self.worker.moveToThread(self.import_thread) self.import_thread.started.connect(self.worker.run) self.worker.import_finished.connect(self.import_thread.quit) self.worker.import_finished.connect(self.worker.deleteLater) self.import_thread.finished.connect(self.import_thread.deleteLater) self.import_thread.start() def order_by_artist(self, idx: int, track_ids_to_check: list[int]) -> list[int]: """ Return the list of track_ids sorted by how well the artist at idx matches the track artist. """ track_idx_and_artist = [ ((key, a.artist)) for key, a in self.existing_tracks.items() if key in track_ids_to_check ] # We want to return all of the passed tracks so set minimum_score # to zero return self._find_similar_strings( self.import_files_data[idx].tags.artist, track_idx_and_artist, minimum_score=0.0, ) def _pick_match(self, idx: int, track_ids: list[int]) -> int: """ Return the track_id selected by the user, including "import as new" which will be track_id 0. Return -1 if user cancels. If user chooses not to import this track, remove it from the list of tracks to import and return -1. """ log.debug(f"_pick_match({idx=}, {track_ids=})") new_track_details = ( f"{self.import_files_data[idx].tags.title} " f"({self.import_files_data[idx].tags.artist})" ) # Build a list of (track title and artist, track_id, track path) choices: list[tuple[str, int, str]] = [] # First choice is always to import as a new track choices.append((Config.DO_NOT_IMPORT, -2, "")) choices.append((Config.IMPORT_AS_NEW, 0, "")) for track_id in track_ids: choices.append( ( f"{self.existing_tracks[track_id].title} " f"({self.existing_tracks[track_id].artist})", track_id, str(self.existing_tracks[track_id].path) ) ) dialog = PickMatch(new_track_details, choices) if dialog.exec() and dialog.selected_id >= 0: return dialog.selected_id else: self.do_not_import.append(self.import_files_data[idx].import_file_path) return -1 def check_files_are_readable(self) -> None: """ Check files to be imported are readable. If not, remove them from the import list and add them to the file errors list. """ for path in self.import_files_paths: if file_is_unreadable(path): self.unimportable_files.append( FileErrors(path=os.path.basename(path), error="File is unreadable") ) self.import_files_paths.remove(path) def import_files_are_tagged(self) -> list: """ Return a (possibly empty) list of all untagged files in the import directory. Add tags to file_data """ untagged_files: list[str] = [] for fullpath in self.import_files_paths: tags = get_tags(fullpath) if not tags: untagged_files.append(os.path.basename(fullpath)) # Remove from import list del self.import_files_data[fullpath] log.warning(f"Import: no tags found, {fullpath=}") else: self.import_files_data.append( TrackFileData(import_file_path=fullpath, tags=tags) ) return untagged_files def _get_existing_tracks(self): """ Return a dictionary {title: Track} for all existing tracks """ with db.Session() as session: return Tracks.all_tracks_indexed_by_id(session) def _find_similar_strings( self, needle: str, haystack: list[tuple[int, str]], minimum_score: float = Config.MINIMUM_FUZZYMATCH, ) -> list[int]: """ Search for the needle in the string element of the haystack. Discard similarities less that minimum_score. Return a list of the int element of the haystack in order of decreasing score (ie, best match first). """ # Create a dictionary to store similarities similarities: dict[int, float] = {} for hayblade in haystack: # Calculate similarity using multiple metrics ratio = fuzz.ratio(needle, hayblade[1]) partial_ratio = fuzz.partial_ratio(needle, hayblade[1]) token_sort_ratio = fuzz.token_sort_ratio(needle, hayblade[1]) token_set_ratio = fuzz.token_set_ratio(needle, hayblade[1]) # Combine scores combined_score = ( ratio * 0.25 + partial_ratio * 0.25 + token_sort_ratio * 0.25 + token_set_ratio * 0.25 ) if combined_score >= minimum_score: similarities[hayblade[0]] = combined_score log.debug( f"_find_similar_strings({needle=}), {len(haystack)=}, " f"{minimum_score=}, {hayblade=}, {combined_score=}" ) # Sort matches by score sorted_matches = sorted(similarities.items(), key=lambda x: x[1], reverse=True) # Return list of indexes, highest score first return [a[0] for a in sorted_matches] class PickMatch(QDialog): """ Dialog for user to select which existing track to replace or to import to a new track """ def __init__( self, new_track_details: str, items_with_ids: list[tuple[str, int, str]] ) -> None: super().__init__() self.new_track_details = new_track_details self.init_ui(items_with_ids) self.selected_id = -1 def init_ui(self, items_with_ids: list[tuple[str, int]]) -> None: """ Set up dialog """ self.setWindowTitle("New or replace") layout = QVBoxLayout() # Add instructions instructions = ( f"Importing {self.new_track_details}.\n" "Import as a new track or replace existing track?" ) instructions_label = QLabel(instructions) layout.addWidget(instructions_label) # Create a button group for radio buttons self.button_group = QButtonGroup() # Add radio buttons for each item for idx, (text, track_id, track_path) in enumerate(items_with_ids): if ( track_sequence.current and track_id and track_sequence.current.track_id == track_id ): # Don't allow current track to be replaced text = "(Currently playing) " + text radio_button = QRadioButton(text) radio_button.setDisabled(True) self.button_group.addButton(radio_button, -1) else: radio_button = QRadioButton(text) radio_button.setToolTip(track_path) self.button_group.addButton(radio_button, track_id) layout.addWidget(radio_button) # Select the second item by default (import as new) if idx == 1: radio_button.setChecked(True) # Add OK and Cancel buttons button_layout = QHBoxLayout() ok_button = QPushButton("OK") cancel_button = QPushButton("Cancel") button_layout.addWidget(ok_button) button_layout.addWidget(cancel_button) layout.addLayout(button_layout) self.setLayout(layout) # Connect buttons to actions ok_button.clicked.connect(self.on_ok) cancel_button.clicked.connect(self.reject) def on_ok(self): # Get the ID of the selected button self.selected_id = self.button_group.checkedId() self.accept() @dataclass class TrackFileData: """ Simple class to track details changes to a track file """ import_file_path: str tags: Tags destination_track_path: str = "" file_path_to_removed: Optional[str] = None track_id: int = 0 audio_metadata: Optional[AudioMetadata] = None def set_destination_track_path(self, path: str) -> None: """ Assigned the passed path """ self.destination_track_path = path