# Standard library imports from __future__ import annotations from dataclasses import dataclass, field from fuzzywuzzy import fuzz # type: ignore import os.path from typing import Optional import os import shutil # PyQt imports from PyQt6.QtCore import ( pyqtSignal, QObject, QThread, ) from PyQt6.QtWidgets import ( QButtonGroup, QDialog, QFileDialog, QHBoxLayout, QLabel, QPushButton, QRadioButton, QVBoxLayout, ) # Third party imports # App imports from classes import ( ApplicationError, MusicMusterSignals, Tags, ) from config import Config from helpers import ( file_is_unreadable, get_tags, show_OK, ) from log import log from models import db, Tracks from music_manager import track_sequence from playlistmodel import PlaylistModel import helpers class DoTrackImport(QObject): import_finished = pyqtSignal(int, QThread) def __init__( self, associated_thread: QThread, import_file_path: str, tags: Tags, destination_path: str, track_id: int, ) -> None: """ Save parameters """ super().__init__() self.associated_thread = associated_thread self.import_file_path = import_file_path self.tags = tags self.destination_track_path = destination_path self.track_id = track_id self.signals = MusicMusterSignals() def run(self) -> None: """ Either create track objects from passed files or update exising track objects. And add to visible playlist or update playlist if track already present. """ temp_file: Optional[str] = None # Get audio metadata in this thread rather than calling function to save interactive time self.audio_metadata = helpers.get_audio_metadata(self.import_file_path) # If destination exists, move it out of the way if os.path.exists(self.destination_track_path): temp_file = self.destination_track_path + ".TMP" shutil.move(self.destination_track_path, temp_file) # Move file to destination shutil.move(self.import_file_path, self.destination_track_path) # Clean up if temp_file and os.path.exists(temp_file): os.unlink(temp_file) with db.Session() as session: self.signals.status_message_signal.emit( f"Importing {os.path.basename(self.import_file_path)}", 5000 ) if self.track_id == 0: # Import new track try: track = Tracks( session, path=self.destination_track_path, **self.tags._asdict(), **self.audio_metadata._asdict(), ) except Exception as e: self.signals.show_warning_signal.emit( "Error importing track", str(e) ) return else: track = session.get(Tracks, self.track_id) if track: for key, value in self.tags._asdict().items(): if hasattr(track, key): setattr(track, key, value) for key, value in self.audio_metadata._asdict().items(): if hasattr(track, key): setattr(track, key, value) track.path = self.destination_track_path session.commit() helpers.normalise_track(self.destination_track_path) self.signals.status_message_signal.emit( f"{os.path.basename(self.import_file_path)} imported", 10000 ) self.import_finished.emit(track.id, self.associated_thread) class FileImporter: """ Manage importing of files """ def __init__( self, base_model: PlaylistModel, row_number: Optional[int] = None ) -> None: """ Set up class """ # Create ModelData if not row_number: row_number = base_model.rowCount() self.model_data = ThreadData(base_model=base_model, row_number=row_number) # Place to keep reference to importer threads and data self.thread_data: dict[QThread, ThreadData] = {} # Data structure to track files to import self.import_files_data: dict[str, TrackFileData] = {} # Dictionary of exsting tracks indexed by track.id self.existing_tracks = self._get_existing_tracks() # Populate self.import_files_data for infile in [ os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f) for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE) if f.endswith((".mp3", ".flac")) ]: self.import_files_data[infile] = TrackFileData() def do_import(self) -> None: """ Scan source directory and: - check all file are readable - load readable files and tags into self.import_files - check all files are tagged - check for exact match of existing file - check for duplicates and replacements - allow deselection of import for any one file - import files and either replace existing or add to pool """ # Check all file are readable and have tags. Mark failures not to # be imported and populate error text. for path in self.import_files_data.keys(): if file_is_unreadable(path): self.import_files_data[path].import_this_file = False self.import_files_data[path].error = f"{path} is unreadable" continue # Get tags try: self.import_files_data[path].tags = get_tags(path) except ApplicationError as e: self.import_files_data[path].import_this_file = False self.import_files_data[path].error = f"Tag errors ({str(e)})" continue # Get track match data self.populate_track_match_data(path) # Sort with best artist match first self.import_files_data[path].track_match_data.sort( key=lambda rec: rec.artist_match, reverse=True ) # Process user choices self.process_user_choices(path) # Import files and tell users about files that won't be imported msgs: list[str] = [] for (path, entry) in self.import_files_data.items(): if entry.import_this_file: self._start_thread(path) else: msgs.append( f"{os.path.basename(path)} will not be imported because {entry.error}" ) if msgs: show_OK("File not imported", "\r\r".join(msgs)) def _start_thread(self, path: str) -> None: """ Import the file specified by path """ log.debug(f"_start_thread({path=})") # Create thread and worker thread = QThread() worker = DoTrackImport( associated_thread=thread, import_file_path=path, tags=self.import_files_data[path].tags, destination_path=self.import_files_data[path].destination_path, track_id=self.import_files_data[path].track_id, ) # Associate data with the thread self.model_data.worker = worker self.thread_data[thread] = self.model_data # Move worker to thread worker.moveToThread(thread) log.debug(f"_start_thread_worker started ({path=}, {id(thread)=}, {id(worker)=})") # Connect signals thread.started.connect(lambda: log.debug(f"Thread {thread} started")) thread.started.connect(worker.run) thread.finished.connect(lambda: log.debug(f"Thread {thread} finished")) thread.finished.connect(thread.deleteLater) worker.import_finished.connect( lambda: log.debug(f"Worker task finished for thread {thread}") ) worker.import_finished.connect(self._thread_finished) worker.import_finished.connect(thread.quit) worker.import_finished.connect(worker.deleteLater) # Start thread thread.start() def _thread_finished(self, track_id: int, thread: QThread) -> None: """ If track already in playlist, refresh it else insert it """ log.debug(f" Ending thread {thread}") model_data = self.thread_data.pop(thread, None) if model_data: if model_data.base_model: model_data.base_model.update_or_insert(track_id, model_data.row_number) def _get_existing_track(self, track_id: int) -> Tracks: """ Lookup in existing track in the local cache and return it """ existing_track_records = [a for a in self.existing_tracks if a.id == track_id] if len(existing_track_records) != 1: raise ApplicationError( f"Internal error in _get_existing_track: {existing_track_records=}" ) return existing_track_records[0] def _get_existing_tracks(self): """ Return a dictionary {title: Track} for all existing tracks """ with db.Session() as session: return Tracks.get_all(session) def _get_match_score(self, str1: str, str2: str) -> float: """ Return the score of how well str1 matches str2. """ ratio = fuzz.ratio(str1, str2) partial_ratio = fuzz.partial_ratio(str1, str2) token_sort_ratio = fuzz.token_sort_ratio(str1, str2) token_set_ratio = fuzz.token_set_ratio(str1, str2) # Combine scores combined_score = ( ratio * 0.25 + partial_ratio * 0.25 + token_sort_ratio * 0.25 + token_set_ratio * 0.25 ) return combined_score def populate_track_match_data(self, path: str) -> None: """ Populate self.import_files_data[path].track_match_data - Search title in existing tracks - if score >= Config.FUZZYMATCH_MINIMUM_LIST: - get artist score - add TrackMatchData to self.import_files_data[path].track_match_data """ title = self.import_files_data[path].tags.title artist = self.import_files_data[path].tags.artist for track in self.existing_tracks: title_score = self._get_match_score(title, track.title) if title_score >= Config.FUZZYMATCH_MINIMUM_LIST: artist_score = self._get_match_score(artist, track.artist) self.import_files_data[path].track_match_data.append( TrackMatchData( artist=track.artist, artist_match=artist_score, title=track.title, title_match=title_score, track_id=track.id, ) ) def process_user_choices(self, path: str) -> None: """ Find out whether user wants to import this as a new track, overwrite an existing track or not import it at all. """ # Build a list of (track title and artist, track_id, track path) choices: list[tuple[str, int, str]] = [] # First choice is always to import as a new track choices.append((Config.DO_NOT_IMPORT, -1, "")) choices.append((Config.IMPORT_AS_NEW, 0, "")) # New track details importing_track_description = ( f"{self.import_files_data[path].tags.title} " f"({self.import_files_data[path].tags.artist})" ) # Select 'import as new' as default unless the top match is good # enough default = 1 # default choice is import as new track_match_data = self.import_files_data[path].track_match_data try: if track_match_data: if ( track_match_data[0].artist_match >= Config.FUZZYMATCH_MINIMUM_SELECT_ARTIST and track_match_data[0].title_match >= Config.FUZZYMATCH_MINIMUM_SELECT_TITLE ): default = 2 for rec in track_match_data: existing_track_description = f"{rec.title} ({rec.artist})" if Config.FUZZYMATCH_SHOW_SCORES: existing_track_description += f" ({rec.title_match:.0f}%)" existing_track_path = self._get_existing_track(rec.track_id).path choices.append( (existing_track_description, rec.track_id, existing_track_path) ) except IndexError: import pdb pdb.set_trace() print(2) dialog = PickMatch( new_track_description=importing_track_description, choices=choices, default=default, ) if dialog.exec(): if dialog.selected_track_id < 0: self.import_files_data[path].import_this_file = False self.import_files_data[path].error = "you asked not to import this file" elif dialog.selected_track_id > 0: self.replace_file(path=path, track_id=dialog.selected_track_id) else: # Import as new, but check destination path doesn't # already exists while os.path.exists(self.import_files_data[path].destination_path): msg = ( "New import requested but default destination path ({path}) " "already exists. Click OK and choose where to save this track" ) import pdb pdb.set_trace() show_OK(None, title="Desintation path exists", msg=msg) # Get output filename pathspec = QFileDialog.getSaveFileName( None, "Save imported track", directory=Config.IMPORT_DESTINATION, ) if not pathspec: self.import_files_data[path].import_this_file = False self.import_files_data[ path ].error = "destination file already exists" return self.import_files_data[path].destination_path = pathspec[0] self.import_as_new(path=path) else: # User cancelled dialog self.import_files_data[path].import_this_file = False self.import_files_data[path].error = "you cancelled the import of this file" def import_as_new(self, path: str) -> None: """ Import passed path as a new file """ log.debug(f"Import as new, {path=}") tfd = self.import_files_data[path] destination_path = os.path.join( Config.IMPORT_DESTINATION, os.path.basename(path) ) if os.path.exists(destination_path): tfd.import_this_file = False tfd.error = f"this is a new import but destination file already exists ({destination_path})" return tfd.destination_path = destination_path def replace_file(self, path: str, track_id: int) -> None: """ Replace existing track {track_id=} with passed path """ log.debug(f"Replace {track_id=} with {path=}") tfd = self.import_files_data[path] existing_track_path = self._get_existing_track(track_id).path proposed_destination_path = os.path.join( os.path.dirname(existing_track_path), os.path.basename(path) ) # if the destination path exists and it's not the path the # track_id points to, abort if existing_track_path != proposed_destination_path and os.path.exists( proposed_destination_path ): tfd.import_this_file = False tfd.error = f"New import would overwrite existing file ({proposed_destination_path})" return tfd.file_path_to_remove = existing_track_path tfd.destination_path = proposed_destination_path tfd.track_id = track_id @dataclass class ThreadData: base_model: PlaylistModel row_number: int worker: Optional[DoTrackImport] = None class PickMatch(QDialog): """ Dialog for user to select which existing track to replace or to import to a new track """ def __init__( self, new_track_description: str, choices: list[tuple[str, int, str]], default: int, ) -> None: super().__init__() self.new_track_description = new_track_description self.default = default self.init_ui(choices) self.selected_track_id = -1 def init_ui(self, choices: list[tuple[str, int, str]]) -> None: """ Set up dialog """ self.setWindowTitle("New or replace") layout = QVBoxLayout() # Add instructions instructions = ( f"Importing {self.new_track_description}.\n" "Import as a new track or replace existing track?" ) instructions_label = QLabel(instructions) layout.addWidget(instructions_label) # Create a button group for radio buttons self.button_group = QButtonGroup() # Add radio buttons for each item for idx, (track_description, track_id, track_path) in enumerate(choices): if ( track_sequence.current and track_id and track_sequence.current.track_id == track_id ): # Don't allow current track to be replaced track_description = "(Currently playing) " + track_description radio_button = QRadioButton(track_description) radio_button.setDisabled(True) self.button_group.addButton(radio_button, -1) else: radio_button = QRadioButton(track_description) radio_button.setToolTip(track_path) self.button_group.addButton(radio_button, track_id) layout.addWidget(radio_button) # Select the second item by default (import as new) if idx == self.default: radio_button.setChecked(True) # Add OK and Cancel buttons button_layout = QHBoxLayout() ok_button = QPushButton("OK") cancel_button = QPushButton("Cancel") button_layout.addWidget(ok_button) button_layout.addWidget(cancel_button) layout.addLayout(button_layout) self.setLayout(layout) # Connect buttons to actions ok_button.clicked.connect(self.on_ok) cancel_button.clicked.connect(self.reject) def on_ok(self): # Get the ID of the selected button self.selected_track_id = self.button_group.checkedId() self.accept() @dataclass class TrackFileData: """ Simple class to track details changes to a track file """ tags: Tags = Tags() destination_path: str = "" import_this_file: bool = True error: str = "" file_path_to_remove: Optional[str] = None track_id: int = 0 track_match_data: list[TrackMatchData] = field(default_factory=list) @dataclass class TrackMatchData: artist: str artist_match: float title: str title_match: float track_id: int