diff --git a/app/file_importer.py b/app/file_importer.py index 7ddd643..7917af0 100644 --- a/app/file_importer.py +++ b/app/file_importer.py @@ -10,7 +10,6 @@ import shutil # PyQt imports from PyQt6.QtCore import ( pyqtSignal, - QObject, QThread, ) from PyQt6.QtWidgets import ( @@ -30,6 +29,7 @@ from PyQt6.QtWidgets import ( from classes import ( ApplicationError, MusicMusterSignals, + singleton, Tags, ) from config import Config @@ -53,7 +53,6 @@ class ThreadData: base_model: PlaylistModel row_number: int - worker: Optional[DoTrackImport] = None @dataclass @@ -62,9 +61,10 @@ class TrackFileData: Data structure to hold details of file to be imported """ + source_path: str tags: Tags = Tags() destination_path: str = "" - import_this_file: bool = True + import_this_file: bool = False error: str = "" file_path_to_remove: Optional[str] = None track_id: int = 0 @@ -85,6 +85,7 @@ class TrackMatchData: track_id: int +@singleton class FileImporter: """ Class to manage the import of new tracks. Sanity checks are carried @@ -97,35 +98,47 @@ class FileImporter: The actual import is handled by the DoTrackImport class. """ + # Place to keep a reference to importer threads. This is an instance + # variable to allow tests to access the threads. As this is a + # singleton, a class variable or an instance variable are effectively + # the same thing. + threads: list[QThread] = [] + def __init__( self, base_model: PlaylistModel, row_number: Optional[int] = None ) -> None: """ - Set up class + Initialise the FileImporter singleton instance. """ + self.initialized: bool + + if hasattr(self, "initialized") and self.initialized: + return # Prevent re-initialization of the singleton + + self.initialized = True # Mark the instance as initialized + # Create ModelData if not row_number: row_number = base_model.rowCount() self.model_data = ThreadData(base_model=base_model, row_number=row_number) - # Populate self.import_files_data - for infile in [ - os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f) - for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE) - if f.endswith((".mp3", ".flac")) - ]: - self.import_files_data[infile] = TrackFileData() - - # Place to keep a reference to importer threads - self.threads: list[QThread] = [] - # Data structure to track files to import - self.import_files_data: dict[str, TrackFileData] = {} + self.import_files_data: list[TrackFileData] = [] + + # Keep track of workers + self.workers: dict[str, DoTrackImport] = {} + + # Limit concurrent threads + self.max_concurrent_threads: int = 3 # Dictionary of exsting tracks indexed by track.id self.existing_tracks = self._get_existing_tracks() + # Track whether importing is active + self.importing: bool = False + + # Get signals self.signals = MusicMusterSignals() def _get_existing_tracks(self) -> Sequence[Tracks]: @@ -136,19 +149,16 @@ class FileImporter: with db.Session() as session: return Tracks.get_all(session) - def do_import(self) -> None: + def start(self) -> None: """ - Populate self.import_files_data, which is a TrackFileData object for each entry. - - - Validate files to be imported - - Find matches and similar files - - Get user choices for each import file - - Validate self.import_files_data integrity - - Tell the user which files won't be imported and why - - Import the files, one by one. + Build a TrackFileData object for each new file to import, add the + TrackFileData object to self.import_files_data, and trigger + importing. """ - if not self.import_files_data: + new_files: list[str] = [] + + if not os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE): show_OK( "File import", f"No files in {Config.REPLACE_FILES_DEFAULT_SOURCE} to import", @@ -156,78 +166,102 @@ class FileImporter: ) return - for path in self.import_files_data.keys(): - self.validate_file(path) - if self.import_files_data[path].import_this_file: - self.find_similar(path) - if len(self.import_files_data[path].track_match_data) > 1: - self.sort_track_match_data(path) - selection = self.get_user_choices(path) - self.process_selection(path, selection) - if self.import_files_data[path].import_this_file: - self.validate_file_data(path) + for infile in [ + os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f) + for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE) + if f.endswith((".mp3", ".flac")) + ]: + if infile in [a.source_path for a in self.import_files_data]: + log.debug(f"file_importer.start skipping {infile=}, already queued") + else: + new_files.append(infile) + self.import_files_data.append(self.populate_trackfiledata(infile)) # Tell user which files won't be imported and why - self.inform_user() - # Start the import of all other files - self.import_next_file() + self.inform_user( + [ + a + for a in self.import_files_data + if a.source_path in new_files and a.import_this_file is False + ] + ) - def validate_file(self, path: str) -> None: + # Start the import if necessary + if not self.importing: + self.import_next_file() + + def populate_trackfiledata(self, path: str) -> TrackFileData: """ - - check all files are readable - - check all files have tags - - Mark failures not to be imported and populate error text. + Populate TrackFileData object for path: - On return, the following TrackFileData fields should be set: - - tags: Yes - destination_path: No - import_this_file: Yes (set by default) - error: No (only set if an error is detected) - file_path_to_remove: No - track_id: No - track_match_data: No + - Validate file to be imported + - Find matches and similar files + - Get user choices for each import file + - Validate self.import_files_data integrity + - Tell the user which files won't be imported and why + - Import the files, one by one. """ - for path in self.import_files_data.keys(): - if file_is_unreadable(path): - self.import_files_data[path].import_this_file = False - self.import_files_data[path].error = f"{path} is unreadable" - continue + tfd = TrackFileData(source_path=path) - try: - self.import_files_data[path].tags = get_tags(path) - except ApplicationError as e: - self.import_files_data[path].import_this_file = False - self.import_files_data[path].error = f"Tag errors ({str(e)})" - continue + if self.check_file_readable(tfd): + if self.check_file_tags(tfd): + self.find_similar(tfd) + if len(tfd.track_match_data) > 1: + self.sort_track_match_data(tfd) + selection = self.get_user_choices(tfd) + if self.process_selection(tfd, selection): + if self.validate_file_data(tfd): + tfd.import_this_file = True - def find_similar(self, path: str) -> None: + return tfd + + def check_file_readable(self, tfd: TrackFileData) -> bool: + """ + Check file is readable. + Return True if it is. + Populate error and return False if not. + """ + + if file_is_unreadable(tfd.source_path): + tfd.import_this_file = False + tfd.error = f"{tfd.source_path} is unreadable" + return False + + return True + + def check_file_tags(self, tfd: TrackFileData) -> bool: + """ + Add tags to tfd + Return True if successful. + Populate error and return False if not. + """ + + try: + tfd.tags = get_tags(tfd.source_path) + except ApplicationError as e: + tfd.import_this_file = False + tfd.error = f"of tag errors ({str(e)})" + return False + + return True + + def find_similar(self, tfd: TrackFileData) -> None: """ - Search title in existing tracks - if score >= Config.FUZZYMATCH_MINIMUM_LIST: - get artist score - add TrackMatchData to self.import_files_data[path].track_match_data - - On return, the following TrackFileData fields should be set: - - tags: Yes - destination_path: No - import_this_file: Yes (set by default) - error: No (only set if an error is detected) - file_path_to_remove: No - track_id: No - track_match_data: YES, IN THIS FUNCTION """ - title = self.import_files_data[path].tags.title - artist = self.import_files_data[path].tags.artist + title = tfd.tags.title + artist = tfd.tags.artist for existing_track in self.existing_tracks: title_score = self._get_match_score(title, existing_track.title) if title_score >= Config.FUZZYMATCH_MINIMUM_LIST: artist_score = self._get_match_score(artist, existing_track.artist) - self.import_files_data[path].track_match_data.append( + tfd.track_match_data.append( TrackMatchData( artist=existing_track.artist, artist_match=artist_score, @@ -237,14 +271,12 @@ class FileImporter: ) ) - def sort_track_match_data(self, path: str) -> None: + def sort_track_match_data(self, tfd: TrackFileData) -> None: """ Sort matched tracks in artist-similarity order """ - self.import_files_data[path].track_match_data.sort( - key=lambda x: x.artist_match, reverse=True - ) + tfd.track_match_data.sort(key=lambda x: x.artist_match, reverse=True) def _get_match_score(self, str1: str, str2: str) -> float: """ @@ -266,7 +298,7 @@ class FileImporter: return combined_score - def get_user_choices(self, path: str) -> int: + def get_user_choices(self, tfd: TrackFileData) -> int: """ Find out whether user wants to import this as a new track, overwrite an existing track or not import it at all. @@ -282,15 +314,12 @@ class FileImporter: choices.append((Config.IMPORT_AS_NEW, 0, "")) # New track details - new_track_description = ( - f"{self.import_files_data[path].tags.title} " - f"({self.import_files_data[path].tags.artist})" - ) + new_track_description = f"{tfd.tags.title} ({tfd.tags.artist})" # Select 'import as new' as default unless the top match is good # enough default = 1 - track_match_data = self.import_files_data[path].track_match_data + track_match_data = tfd.track_match_data if track_match_data: if ( track_match_data[0].artist_match @@ -323,48 +352,39 @@ class FileImporter: else: return -1 - def process_selection(self, path: str, selection: int) -> None: + def process_selection(self, tfd: TrackFileData, selection: int) -> bool: """ Process selection from PickMatch """ if selection < 0: # User cancelled - self.import_files_data[path].import_this_file = False - self.import_files_data[path].error = "you asked not to import this file" + tfd.import_this_file = False + tfd.error = "you asked not to import this file" + return False elif selection > 0: # Import and replace track - self.replace_file(path=path, track_id=selection) + self.replace_file(tfd, track_id=selection) else: # Import as new - self.import_as_new(path=path) + self.import_as_new(tfd) - def replace_file(self, path: str, track_id: int) -> None: + return True + + def replace_file(self, tfd: TrackFileData, track_id: int) -> None: """ Set up to replace an existing file. - - On return, the following TrackFileData fields should be set: - - tags: Yes - destination_path: YES, IN THIS FUNCTION - import_this_file: Yes (set by default) - error: No (only set if an error is detected) - file_path_to_remove: YES, IN THIS FUNCTION - track_id: YES, IN THIS FUNCTION - track_match_data: Yes """ - ifd = self.import_files_data[path] - if track_id < 1: - raise ApplicationError(f"No track ID: replace_file({path=}, {track_id=})") + raise ApplicationError(f"No track ID: replace_file({tfd=}, {track_id=})") - ifd.track_id = track_id + tfd.track_id = track_id existing_track_path = self._get_existing_track(track_id).path - ifd.file_path_to_remove = existing_track_path + tfd.file_path_to_remove = existing_track_path # If the existing file in the Config.IMPORT_DESTINATION # directory, replace it with the imported file name; otherwise, @@ -372,11 +392,11 @@ class FileImporter: # names from CDs, etc. if os.path.dirname(existing_track_path) == Config.IMPORT_DESTINATION: - ifd.destination_path = os.path.join( - Config.IMPORT_DESTINATION, os.path.basename(path) + tfd.destination_path = os.path.join( + Config.IMPORT_DESTINATION, os.path.basename(tfd.source_path) ) else: - ifd.destination_path = existing_track_path + tfd.destination_path = existing_track_path def _get_existing_track(self, track_id: int) -> Tracks: """ @@ -391,58 +411,45 @@ class FileImporter: return existing_track_records[0] - def import_as_new(self, path: str) -> None: + def import_as_new(self, tfd: TrackFileData) -> None: """ Set up to import as a new file. - - On return, the following TrackFileData fields should be set: - - tags: Yes - destination_path: YES, IN THIS FUNCTION - import_this_file: Yes (set by default) - error: No (only set if an error is detected) - file_path_to_remove: No (not needed now) - track_id: Yes - track_match_data: Yes """ - ifd = self.import_files_data[path] - ifd.destination_path = os.path.join( - Config.IMPORT_DESTINATION, os.path.basename(path) + tfd.destination_path = os.path.join( + Config.IMPORT_DESTINATION, os.path.basename(tfd.source_path) ) - def validate_file_data(self, path: str) -> None: + def validate_file_data(self, tfd: TrackFileData) -> bool: """ Check the data structures for integrity + Return True if all OK + Populate error and return False if not. """ - ifd = self.import_files_data[path] - - # Check import_this_file - if not ifd.import_this_file: - return - # Check tags - if not (ifd.tags.artist and ifd.tags.title): - raise ApplicationError(f"validate_file_data: {ifd.tags=}, {path=}") + if not (tfd.tags.artist and tfd.tags.title): + raise ApplicationError( + f"validate_file_data: {tfd.tags=}, {tfd.source_path=}" + ) # Check file_path_to_remove - if ifd.file_path_to_remove and not os.path.exists(ifd.file_path_to_remove): + if tfd.file_path_to_remove and not os.path.exists(tfd.file_path_to_remove): # File to remove is missing, but this isn't a major error. We # may be importing to replace a deleted file. - ifd.file_path_to_remove = "" + tfd.file_path_to_remove = "" # Check destination_path - if not ifd.destination_path: + if not tfd.destination_path: raise ApplicationError( - f"validate_file_data: no destination path set ({path=})" + f"validate_file_data: no destination path set ({tfd.source_path=})" ) # If destination path is the same as file_path_to_remove, that's # OK, otherwise if this is a new import then check check # destination path doesn't already exists - if ifd.track_id == 0 and ifd.destination_path != ifd.file_path_to_remove: - while os.path.exists(ifd.destination_path): + if tfd.track_id == 0 and tfd.destination_path != tfd.file_path_to_remove: + while os.path.exists(tfd.destination_path): msg = ( "New import requested but default destination path ({ifd.destination_path}) " "already exists. Click OK and choose where to save this track" @@ -455,92 +462,112 @@ class FileImporter: directory=Config.IMPORT_DESTINATION, ) if pathspec: - ifd.destination_path = pathspec[0] + if pathspec == '': + # User cancelled + tfd.error = "You did not select a location to save this track" + return False + tfd.destination_path = pathspec[0] else: - ifd.import_this_file = False - ifd.error = "destination file already exists" - return + tfd.error = "destination file already exists" + return False # Check track_id - if ifd.track_id < 0: - raise ApplicationError(f"validate_file_data: track_id < 0, {path=}") + if tfd.track_id < 0: + raise ApplicationError( + f"validate_file_data: track_id < 0, {tfd.source_path=}" + ) - def inform_user(self) -> None: + return True + + def inform_user(self, tfds: list[TrackFileData]) -> None: """ Tell user about files that won't be imported """ msgs: list[str] = [] - for path, entry in self.import_files_data.items(): - if entry.import_this_file is False: - msgs.append( - f"{os.path.basename(path)} will not be imported because {entry.error}" - ) + for tfd in tfds: + msgs.append( + f"{os.path.basename(tfd.source_path)} will not be imported because {tfd.error}" + ) if msgs: show_OK("File not imported", "\r\r".join(msgs)) + log.debug("\r\r".join(msgs)) def import_next_file(self) -> None: """ Import the next file sequentially. """ - while True: - if not self.import_files_data: - self.signals.status_message_signal.emit("All files imported", 10000) - return + # Remove any entries that should not be imported. Modify list + # in-place rather than create a new list, which will retain any + # references to self.import_files_data. + self.import_files_data[:] = [ + a for a in self.import_files_data if a.import_this_file + ] - # Get details for next file to import - path, tfd = self.import_files_data.popitem() - if tfd.import_this_file: - break + # If no valid files remain, mark importing as False and exit + if not self.import_files_data: + self.importing = False + self.signals.status_message_signal.emit("All files imported", 10000) + log.debug("import_next_file: all files imported") + return - print(f"import_next_file {path=}") + self.importing = ( + True # Now safe to mark as True since at least one file is valid) + ) - # Create and start a thread for processing - worker = DoTrackImport( - import_file_path=path, + while ( + len(FileImporter.threads) < self.max_concurrent_threads + and self.import_files_data + ): + tfd = self.import_files_data.pop() + filename = os.path.basename(tfd.source_path) + log.debug(f"processing: {filename}") + log.debug( + "remaining: " + f"{[os.path.basename(a.source_path) for a in self.import_files_data]}" + ) + self.signals.status_message_signal.emit(f"Importing {filename}", 10000) + self._start_import(tfd) + + def _start_import(self, tfd: TrackFileData) -> None: + """ + Start thread to import track + """ + + filename = os.path.basename(tfd.source_path) + log.debug(f"_start_import({filename=})") + + self.workers[tfd.source_path] = DoTrackImport( + import_file_path=tfd.source_path, tags=tfd.tags, destination_path=tfd.destination_path, track_id=tfd.track_id, ) - thread = QThread() - self.threads.append(thread) + log.debug(f"{self.workers[tfd.source_path]=} created for {filename=}") - # Move worker to thread - worker.moveToThread(thread) + self.workers[tfd.source_path].import_finished.connect(self.post_import_processing) + self.workers[tfd.source_path].finished.connect(lambda: self.cleanup_thread(tfd)) + self.workers[tfd.source_path].finished.connect(self.workers[tfd.source_path].deleteLater) - # Connect signals and slots - thread.started.connect(worker.run) - thread.started.connect(lambda: print(f"Thread starting for {path=}")) + self.workers[tfd.source_path].start() - worker.import_finished.connect(self.post_import_processing) - worker.import_finished.connect(thread.quit) - worker.import_finished.connect(lambda: print(f"Worker ended for {path=}")) - - # Ensure cleanup only after thread is fully stopped - thread.finished.connect(lambda: self.cleanup_thread(thread, worker)) - thread.finished.connect(lambda: print(f"Thread ended for {path=}")) - - # Start the thread - print(f"Calling thread.start() for {path=}") - thread.start() - - def cleanup_thread(self, thread, worker): + def cleanup_thread(self, tfd: TrackFileData) -> None: """ Remove references to finished threads/workers to prevent leaks. """ - worker.deleteLater() - thread.deleteLater() - if thread in self.threads: - self.threads.remove(thread) + log.debug(f"cleanup_thread({tfd.source_path=})") - def post_import_processing(self, track_id: int) -> None: + if tfd.source_path in self.workers: + del self.workers[tfd.source_path] + + def post_import_processing(self, source_path: str, track_id: int) -> None: """ If track already in playlist, refresh it else insert it """ - log.debug(f"post_import_processing({track_id=})") + log.debug(f"post_import_processing({source_path=}, {track_id=})") if self.model_data: if self.model_data.base_model: @@ -552,12 +579,12 @@ class FileImporter: self.import_next_file() -class DoTrackImport(QObject): +class DoTrackImport(QThread): """ Class to manage the actual import of tracks in a thread. """ - import_finished = pyqtSignal(int) + import_finished = pyqtSignal(str, int) def __init__( self, @@ -637,7 +664,7 @@ class DoTrackImport(QObject): self.signals.status_message_signal.emit( f"{os.path.basename(self.import_file_path)} imported", 10000 ) - self.import_finished.emit(track.id) + self.import_finished.emit(self.import_file_path, track.id) class PickMatch(QDialog): diff --git a/app/helpers.py b/app/helpers.py index b0a37dc..eee1a3c 100644 --- a/app/helpers.py +++ b/app/helpers.py @@ -200,9 +200,9 @@ def get_tags(path: str) -> Tags: try: tag = TinyTag.get(path) except FileNotFoundError: - raise ApplicationError(f"File not found: get_tags({path=})") + raise ApplicationError(f"File not found: {path})") except TinyTagException: - raise ApplicationError(f"Can't read tags: get_tags({path=})") + raise ApplicationError(f"Can't read tags in {path})") if ( tag.title is None @@ -210,7 +210,7 @@ def get_tags(path: str) -> Tags: or tag.bitrate is None or tag.duration is None ): - raise ApplicationError(f"Missing tags: get_tags({path=})") + raise ApplicationError(f"Missing tags in {path})") return Tags( title=tag.title, diff --git a/app/log.py b/app/log.py index 591294b..a49259d 100755 --- a/app/log.py +++ b/app/log.py @@ -20,15 +20,38 @@ from config import Config class FunctionFilter(logging.Filter): """Filter to allow category-based logging to stderr.""" - def __init__(self, functions: set[str]): + def __init__(self, module_functions: dict[str, list[str]]): super().__init__() - self.functions = functions + + self.modules: list[str] = [] + self.functions: defaultdict[str, list[str]] = defaultdict(list) + + for module in module_functions.keys(): + if module_functions[module]: + for function in module_functions[module]: + self.functions[module].append(function) + else: + self.modules.append(module) def filter(self, record: logging.LogRecord) -> bool: - return ( - getattr(record, "funcName", None) in self.functions - and getattr(record, "levelname", None) == "DEBUG" - ) + if not getattr(record, "levelname", None) == "DEBUG": + # Only prcess DEBUG messages + return False + + module = getattr(record, "module", None) + if not module: + # No module in record + return False + + # Process if this is a module we're tracking + if module in self.modules: + return True + + # Process if this is a function we're tracking + if getattr(record, "funcName", None) in self.functions[module]: + return True + + return False class LevelTagFilter(logging.Filter): diff --git a/app/musicmuster.py b/app/musicmuster.py index 83c3594..1e000bc 100755 --- a/app/musicmuster.py +++ b/app/musicmuster.py @@ -860,7 +860,7 @@ class Window(QMainWindow, Ui_MainWindow): self.current.base_model, self.current_row_or_end() ) - self.importer.do_import() + self.importer.start() def insert_header(self) -> None: """Show dialog box to enter header text and add to playlist"""