diff --git a/app/config.py b/app/config.py index 32ba052..7d2b5d8 100644 --- a/app/config.py +++ b/app/config.py @@ -83,6 +83,7 @@ class Config(object): MAIL_USERNAME = os.environ.get("MAIL_USERNAME") MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS") is not None MAX_IMPORT_MATCHES = 5 + MAX_IMPORT_THREADS = 3 MAX_INFO_TABS = 5 MAX_MISSING_FILES_TO_REPORT = 10 MILLISECOND_SIGFIGS = 0 diff --git a/app/file_importer.py b/app/file_importer.py index 7917af0..a26c0fc 100644 --- a/app/file_importer.py +++ b/app/file_importer.py @@ -3,6 +3,7 @@ from __future__ import annotations from dataclasses import dataclass, field from fuzzywuzzy import fuzz # type: ignore import os.path +import threading from typing import Optional, Sequence import os import shutil @@ -98,11 +99,10 @@ class FileImporter: The actual import is handled by the DoTrackImport class. """ - # Place to keep a reference to importer threads. This is an instance - # variable to allow tests to access the threads. As this is a - # singleton, a class variable or an instance variable are effectively - # the same thing. - threads: list[QThread] = [] + # Place to keep a reference to importer workers. This is an instance + # variable to allow tests access. As this is a singleton, a class + # variable or an instance variable are effectively the same thing. + workers: dict[str, DoTrackImport] = {} def __init__( self, base_model: PlaylistModel, row_number: Optional[int] = None @@ -111,13 +111,6 @@ class FileImporter: Initialise the FileImporter singleton instance. """ - self.initialized: bool - - if hasattr(self, "initialized") and self.initialized: - return # Prevent re-initialization of the singleton - - self.initialized = True # Mark the instance as initialized - # Create ModelData if not row_number: row_number = base_model.rowCount() @@ -126,18 +119,9 @@ class FileImporter: # Data structure to track files to import self.import_files_data: list[TrackFileData] = [] - # Keep track of workers - self.workers: dict[str, DoTrackImport] = {} - - # Limit concurrent threads - self.max_concurrent_threads: int = 3 - # Dictionary of exsting tracks indexed by track.id self.existing_tracks = self._get_existing_tracks() - # Track whether importing is active - self.importing: bool = False - # Get signals self.signals = MusicMusterSignals() @@ -151,9 +135,8 @@ class FileImporter: def start(self) -> None: """ - Build a TrackFileData object for each new file to import, add the - TrackFileData object to self.import_files_data, and trigger - importing. + Build a TrackFileData object for each new file to import, add it + to self.import_files_data, and trigger importing. """ new_files: list[str] = [] @@ -186,9 +169,14 @@ class FileImporter: ] ) + # Remove do-not-import entries from queue + self.import_files_data[:] = [ + a for a in self.import_files_data if a.import_this_file is not False + ] + # Start the import if necessary - if not self.importing: - self.import_next_file() + log.debug(f"Import files prepared: {[a.source_path for a in self.import_files_data]}") + self._import_next_file() def populate_trackfiledata(self, path: str) -> TrackFileData: """ @@ -493,42 +481,30 @@ class FileImporter: show_OK("File not imported", "\r\r".join(msgs)) log.debug("\r\r".join(msgs)) - def import_next_file(self) -> None: + def _import_next_file(self) -> None: """ Import the next file sequentially. + + This is called when an import completes so will be called asynchronously. + Protect with a lock. """ - # Remove any entries that should not be imported. Modify list - # in-place rather than create a new list, which will retain any - # references to self.import_files_data. - self.import_files_data[:] = [ - a for a in self.import_files_data if a.import_this_file - ] + lock = threading.Lock() - # If no valid files remain, mark importing as False and exit - if not self.import_files_data: - self.importing = False - self.signals.status_message_signal.emit("All files imported", 10000) - log.debug("import_next_file: all files imported") - return - - self.importing = ( - True # Now safe to mark as True since at least one file is valid) - ) - - while ( - len(FileImporter.threads) < self.max_concurrent_threads - and self.import_files_data - ): - tfd = self.import_files_data.pop() - filename = os.path.basename(tfd.source_path) - log.debug(f"processing: {filename}") - log.debug( - "remaining: " - f"{[os.path.basename(a.source_path) for a in self.import_files_data]}" - ) - self.signals.status_message_signal.emit(f"Importing {filename}", 10000) - self._start_import(tfd) + with lock: + while len(self.workers) < Config.MAX_IMPORT_THREADS: + try: + tfd = self.import_files_data.pop() + filename = os.path.basename(tfd.source_path) + log.debug(f"_import_next_file: {filename}") + log.debug( + f"remaining files: {[a.source_path for a in self.import_files_data]}" + ) + self.signals.status_message_signal.emit(f"Importing {filename}", 10000) + self._start_import(tfd) + except IndexError: + log.debug("import_next_file: no files remaining in queue") + break def _start_import(self, tfd: TrackFileData) -> None: """ @@ -544,7 +520,7 @@ class FileImporter: destination_path=tfd.destination_path, track_id=tfd.track_id, ) - log.debug(f"{self.workers[tfd.source_path]=} created for {filename=}") + log.debug(f"{self.workers[tfd.source_path]=} created") self.workers[tfd.source_path].import_finished.connect(self.post_import_processing) self.workers[tfd.source_path].finished.connect(lambda: self.cleanup_thread(tfd)) @@ -561,6 +537,10 @@ class FileImporter: if tfd.source_path in self.workers: del self.workers[tfd.source_path] + else: + log.debug(f"Couldn't find entry in self.workers: {tfd.source_path=}") + + log.debug(f"After cleanup_thread: {self.workers.keys()=}") def post_import_processing(self, source_path: str, track_id: int) -> None: """ @@ -575,8 +555,8 @@ class FileImporter: track_id, self.model_data.row_number ) - # Process next file - self.import_next_file() + # Process next file(s) + self._import_next_file() class DoTrackImport(QThread): @@ -605,6 +585,9 @@ class DoTrackImport(QThread): self.signals = MusicMusterSignals() + def __repr__(self) -> str: + return f" None: """ Either create track objects from passed files or update exising track @@ -628,11 +611,11 @@ class DoTrackImport(QThread): if temp_file and os.path.exists(temp_file): os.unlink(temp_file) - with db.Session() as session: - self.signals.status_message_signal.emit( - f"Importing {os.path.basename(self.import_file_path)}", 5000 - ) + self.signals.status_message_signal.emit( + f"Importing {os.path.basename(self.import_file_path)}", 5000 + ) + with db.Session() as session: if self.track_id == 0: # Import new track try: @@ -657,6 +640,9 @@ class DoTrackImport(QThread): if hasattr(track, key): setattr(track, key, value) track.path = self.destination_track_path + else: + log.error(f"Unable to retrieve {self.track_id=}") + return session.commit() helpers.normalise_track(self.destination_track_path) diff --git a/app/playlistmodel.py b/app/playlistmodel.py index f96a4e4..53e174f 100644 --- a/app/playlistmodel.py +++ b/app/playlistmodel.py @@ -1151,6 +1151,7 @@ class PlaylistModel(QAbstractTableModel): ]: if ts: ts.update_playlist_and_row(session) + session.commit() self.update_track_times()