New FileImporter working, tests to be written

This commit is contained in:
Keith Edmunds 2025-01-28 17:25:06 +00:00
parent 52a773176c
commit 92e1a1cac8
3 changed files with 52 additions and 64 deletions

View File

@ -83,6 +83,7 @@ class Config(object):
MAIL_USERNAME = os.environ.get("MAIL_USERNAME")
MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS") is not None
MAX_IMPORT_MATCHES = 5
MAX_IMPORT_THREADS = 3
MAX_INFO_TABS = 5
MAX_MISSING_FILES_TO_REPORT = 10
MILLISECOND_SIGFIGS = 0

View File

@ -3,6 +3,7 @@ from __future__ import annotations
from dataclasses import dataclass, field
from fuzzywuzzy import fuzz # type: ignore
import os.path
import threading
from typing import Optional, Sequence
import os
import shutil
@ -98,11 +99,10 @@ class FileImporter:
The actual import is handled by the DoTrackImport class.
"""
# Place to keep a reference to importer threads. This is an instance
# variable to allow tests to access the threads. As this is a
# singleton, a class variable or an instance variable are effectively
# the same thing.
threads: list[QThread] = []
# Place to keep a reference to importer workers. This is an instance
# variable to allow tests access. As this is a singleton, a class
# variable or an instance variable are effectively the same thing.
workers: dict[str, DoTrackImport] = {}
def __init__(
self, base_model: PlaylistModel, row_number: Optional[int] = None
@ -111,13 +111,6 @@ class FileImporter:
Initialise the FileImporter singleton instance.
"""
self.initialized: bool
if hasattr(self, "initialized") and self.initialized:
return # Prevent re-initialization of the singleton
self.initialized = True # Mark the instance as initialized
# Create ModelData
if not row_number:
row_number = base_model.rowCount()
@ -126,18 +119,9 @@ class FileImporter:
# Data structure to track files to import
self.import_files_data: list[TrackFileData] = []
# Keep track of workers
self.workers: dict[str, DoTrackImport] = {}
# Limit concurrent threads
self.max_concurrent_threads: int = 3
# Dictionary of exsting tracks indexed by track.id
self.existing_tracks = self._get_existing_tracks()
# Track whether importing is active
self.importing: bool = False
# Get signals
self.signals = MusicMusterSignals()
@ -151,9 +135,8 @@ class FileImporter:
def start(self) -> None:
"""
Build a TrackFileData object for each new file to import, add the
TrackFileData object to self.import_files_data, and trigger
importing.
Build a TrackFileData object for each new file to import, add it
to self.import_files_data, and trigger importing.
"""
new_files: list[str] = []
@ -186,9 +169,14 @@ class FileImporter:
]
)
# Remove do-not-import entries from queue
self.import_files_data[:] = [
a for a in self.import_files_data if a.import_this_file is not False
]
# Start the import if necessary
if not self.importing:
self.import_next_file()
log.debug(f"Import files prepared: {[a.source_path for a in self.import_files_data]}")
self._import_next_file()
def populate_trackfiledata(self, path: str) -> TrackFileData:
"""
@ -493,42 +481,30 @@ class FileImporter:
show_OK("File not imported", "\r\r".join(msgs))
log.debug("\r\r".join(msgs))
def import_next_file(self) -> None:
def _import_next_file(self) -> None:
"""
Import the next file sequentially.
This is called when an import completes so will be called asynchronously.
Protect with a lock.
"""
# Remove any entries that should not be imported. Modify list
# in-place rather than create a new list, which will retain any
# references to self.import_files_data.
self.import_files_data[:] = [
a for a in self.import_files_data if a.import_this_file
]
lock = threading.Lock()
# If no valid files remain, mark importing as False and exit
if not self.import_files_data:
self.importing = False
self.signals.status_message_signal.emit("All files imported", 10000)
log.debug("import_next_file: all files imported")
return
self.importing = (
True # Now safe to mark as True since at least one file is valid)
)
while (
len(FileImporter.threads) < self.max_concurrent_threads
and self.import_files_data
):
tfd = self.import_files_data.pop()
filename = os.path.basename(tfd.source_path)
log.debug(f"processing: {filename}")
log.debug(
"remaining: "
f"{[os.path.basename(a.source_path) for a in self.import_files_data]}"
)
self.signals.status_message_signal.emit(f"Importing {filename}", 10000)
self._start_import(tfd)
with lock:
while len(self.workers) < Config.MAX_IMPORT_THREADS:
try:
tfd = self.import_files_data.pop()
filename = os.path.basename(tfd.source_path)
log.debug(f"_import_next_file: {filename}")
log.debug(
f"remaining files: {[a.source_path for a in self.import_files_data]}"
)
self.signals.status_message_signal.emit(f"Importing {filename}", 10000)
self._start_import(tfd)
except IndexError:
log.debug("import_next_file: no files remaining in queue")
break
def _start_import(self, tfd: TrackFileData) -> None:
"""
@ -544,7 +520,7 @@ class FileImporter:
destination_path=tfd.destination_path,
track_id=tfd.track_id,
)
log.debug(f"{self.workers[tfd.source_path]=} created for {filename=}")
log.debug(f"{self.workers[tfd.source_path]=} created")
self.workers[tfd.source_path].import_finished.connect(self.post_import_processing)
self.workers[tfd.source_path].finished.connect(lambda: self.cleanup_thread(tfd))
@ -561,6 +537,10 @@ class FileImporter:
if tfd.source_path in self.workers:
del self.workers[tfd.source_path]
else:
log.debug(f"Couldn't find entry in self.workers: {tfd.source_path=}")
log.debug(f"After cleanup_thread: {self.workers.keys()=}")
def post_import_processing(self, source_path: str, track_id: int) -> None:
"""
@ -575,8 +555,8 @@ class FileImporter:
track_id, self.model_data.row_number
)
# Process next file
self.import_next_file()
# Process next file(s)
self._import_next_file()
class DoTrackImport(QThread):
@ -605,6 +585,9 @@ class DoTrackImport(QThread):
self.signals = MusicMusterSignals()
def __repr__(self) -> str:
return f"<DoTrackImport(id={hex(id(self))}, import_file_path={self.import_file_path}"
def run(self) -> None:
"""
Either create track objects from passed files or update exising track
@ -628,11 +611,11 @@ class DoTrackImport(QThread):
if temp_file and os.path.exists(temp_file):
os.unlink(temp_file)
with db.Session() as session:
self.signals.status_message_signal.emit(
f"Importing {os.path.basename(self.import_file_path)}", 5000
)
self.signals.status_message_signal.emit(
f"Importing {os.path.basename(self.import_file_path)}", 5000
)
with db.Session() as session:
if self.track_id == 0:
# Import new track
try:
@ -657,6 +640,9 @@ class DoTrackImport(QThread):
if hasattr(track, key):
setattr(track, key, value)
track.path = self.destination_track_path
else:
log.error(f"Unable to retrieve {self.track_id=}")
return
session.commit()
helpers.normalise_track(self.destination_track_path)

View File

@ -1151,6 +1151,7 @@ class PlaylistModel(QAbstractTableModel):
]:
if ts:
ts.update_playlist_and_row(session)
session.commit()
self.update_track_times()