545 lines
18 KiB
Python
545 lines
18 KiB
Python
# Standard library imports
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from fuzzywuzzy import fuzz # type: ignore
|
|
import os.path
|
|
from typing import Optional
|
|
import os
|
|
import shutil
|
|
|
|
# PyQt imports
|
|
from PyQt6.QtCore import (
|
|
pyqtSignal,
|
|
QObject,
|
|
QThread,
|
|
)
|
|
from PyQt6.QtWidgets import (
|
|
QButtonGroup,
|
|
QDialog,
|
|
QHBoxLayout,
|
|
QLabel,
|
|
QPushButton,
|
|
QRadioButton,
|
|
QVBoxLayout,
|
|
)
|
|
|
|
# Third party imports
|
|
|
|
# App imports
|
|
from classes import (
|
|
ApplicationError,
|
|
MusicMusterSignals,
|
|
Tags,
|
|
)
|
|
from config import Config
|
|
from helpers import (
|
|
file_is_unreadable,
|
|
get_tags,
|
|
show_warning,
|
|
)
|
|
from log import log
|
|
from models import db, Tracks
|
|
from music_manager import track_sequence
|
|
from playlistmodel import PlaylistModel
|
|
import helpers
|
|
|
|
|
|
class DoTrackImport(QObject):
|
|
import_finished = pyqtSignal(int, QThread)
|
|
|
|
def __init__(
|
|
self,
|
|
associated_thread: QThread,
|
|
import_file_path: str,
|
|
tags: Tags,
|
|
destination_path: str,
|
|
track_id: int,
|
|
) -> None:
|
|
"""
|
|
Save parameters
|
|
"""
|
|
|
|
super().__init__()
|
|
self.associated_thread = associated_thread
|
|
self.import_file_path = import_file_path
|
|
self.tags = tags
|
|
self.destination_track_path = destination_path
|
|
self.track_id = track_id
|
|
|
|
self.signals = MusicMusterSignals()
|
|
|
|
def run(self) -> None:
|
|
"""
|
|
Either create track objects from passed files or update exising track
|
|
objects.
|
|
|
|
And add to visible playlist or update playlist if track already present.
|
|
"""
|
|
|
|
temp_file: Optional[str] = None
|
|
|
|
# Get audio metadata in this thread rather than calling function to save interactive time
|
|
self.audio_metadata = helpers.get_audio_metadata(self.import_file_path)
|
|
|
|
# If destination exists, move it out of the way
|
|
if os.path.exists(self.destination_track_path):
|
|
temp_file = self.destination_track_path + ".TMP"
|
|
shutil.move(self.destination_track_path, temp_file)
|
|
# Move file to destination
|
|
shutil.move(self.import_file_path, self.destination_track_path)
|
|
|
|
with db.Session() as session:
|
|
self.signals.status_message_signal.emit(
|
|
f"Importing {os.path.basename(self.import_file_path)}", 5000
|
|
)
|
|
|
|
if self.track_id == 0:
|
|
# Import new track
|
|
try:
|
|
track = Tracks(
|
|
session,
|
|
path=self.destination_track_path,
|
|
**self.tags._asdict(),
|
|
**self.audio_metadata._asdict(),
|
|
)
|
|
except Exception as e:
|
|
self.signals.show_warning_signal.emit(
|
|
"Error importing track", str(e)
|
|
)
|
|
return
|
|
else:
|
|
track = session.get(Tracks, self.track_id)
|
|
if track:
|
|
for key, value in self.tags._asdict().items():
|
|
if hasattr(track, key):
|
|
setattr(track, key, value)
|
|
for key, value in self.audio_metadata._asdict().items():
|
|
if hasattr(track, key):
|
|
setattr(track, key, value)
|
|
track.path = self.destination_track_path
|
|
session.commit()
|
|
|
|
helpers.normalise_track(self.destination_track_path)
|
|
|
|
self.signals.status_message_signal.emit(
|
|
f"{os.path.basename(self.import_file_path)} imported", 10000
|
|
)
|
|
self.import_finished.emit(track.id, self.associated_thread)
|
|
|
|
|
|
class FileImporter:
|
|
"""
|
|
Manage importing of files
|
|
"""
|
|
|
|
def __init__(
|
|
self, base_model: PlaylistModel, row_number: Optional[int] = None
|
|
) -> None:
|
|
"""
|
|
Set up class
|
|
"""
|
|
|
|
# Create ModelData
|
|
if not row_number:
|
|
row_number = base_model.rowCount()
|
|
self.model_data = ModelData(base_model=base_model, row_number=row_number)
|
|
|
|
# Place to keep reference to importer threads and data
|
|
self.thread_data: dict[QThread, ModelData] = {}
|
|
|
|
# Data structure to track files to import
|
|
self.import_files_data: dict[str, TrackFileData] = {}
|
|
|
|
# Dictionary of exsting tracks indexed by track.id
|
|
self.existing_tracks = self._get_existing_tracks()
|
|
|
|
# Populate self.import_files_data
|
|
for infile in [
|
|
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
|
|
for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE)
|
|
if f.endswith((".mp3", ".flac"))
|
|
]:
|
|
self.import_files_data[infile] = TrackFileData()
|
|
|
|
def do_import(self) -> None:
|
|
"""
|
|
Scan source directory and:
|
|
- check all file are readable
|
|
- load readable files and tags into self.import_files
|
|
- check all files are tagged
|
|
- check for exact match of existing file
|
|
- check for duplicates and replacements
|
|
- allow deselection of import for any one file
|
|
- import files and either replace existing or add to pool
|
|
"""
|
|
|
|
# Check all file are readable and have tags. Mark failures not to
|
|
# be imported and populate error text.
|
|
for path in self.import_files_data.keys():
|
|
if file_is_unreadable(path):
|
|
self.import_files_data[path].import_this_file = False
|
|
self.import_files_data[path].error = f"{path} is unreadable"
|
|
continue
|
|
|
|
# Get tags
|
|
try:
|
|
self.import_files_data[path].tags = get_tags(path)
|
|
except ApplicationError as e:
|
|
self.import_files_data[path].import_this_file = False
|
|
self.import_files_data[path].error = f"{path} tag errors ({str(e)})"
|
|
continue
|
|
|
|
# Get track match data
|
|
self.populate_track_match_data(path)
|
|
# Sort with best artist match first
|
|
self.import_files_data[path].track_match_data.sort(
|
|
key=lambda rec: rec.artist_match, reverse=True
|
|
)
|
|
|
|
# Process user choices
|
|
self.process_user_choices(path)
|
|
|
|
# Tell users about files that won't be imported
|
|
for path in [
|
|
a
|
|
for a in self.import_files_data.keys()
|
|
if not self.import_files_data[a].import_this_file
|
|
]:
|
|
msg = (
|
|
f"{os.path.basename(path)} will not be imported because "
|
|
f"{self.import_files_data[path].error}"
|
|
)
|
|
show_warning(None, "File not imported", msg)
|
|
|
|
# Import files once we have data for all of them (to avoid long
|
|
# drawn-out user interaction while we import files)
|
|
for path in [
|
|
a
|
|
for a in self.import_files_data.keys()
|
|
if self.import_files_data[a].import_this_file
|
|
]:
|
|
self._start_thread(path)
|
|
|
|
def _start_thread(self, path: str) -> None:
|
|
"""
|
|
Import the file specified by path
|
|
"""
|
|
|
|
log.debug(f"_start_thread({path=})")
|
|
|
|
# Create thread and worker
|
|
thread = QThread()
|
|
self.worker = DoTrackImport(
|
|
associated_thread=thread,
|
|
import_file_path=path,
|
|
tags=self.import_files_data[path].tags,
|
|
destination_path=self.import_files_data[path].destination_path,
|
|
track_id=self.import_files_data[path].track_id
|
|
)
|
|
|
|
# Associate data with the thread
|
|
self.thread_data[thread] = self.model_data
|
|
|
|
# Move self.worker to thread
|
|
self.worker.moveToThread(thread)
|
|
|
|
# Connect signals
|
|
thread.started.connect(self.worker.run)
|
|
self.worker.import_finished.connect(self._thread_finished)
|
|
self.worker.import_finished.connect(thread.quit)
|
|
self.worker.import_finished.connect(self.worker.deleteLater)
|
|
thread.finished.connect(thread.deleteLater)
|
|
|
|
# Start thread
|
|
thread.start()
|
|
|
|
def _thread_finished(self, track_id: int, thread: QThread) -> None:
|
|
"""
|
|
If track already in playlist, refresh it else insert it
|
|
"""
|
|
|
|
model_data = self.thread_data.pop(thread, None)
|
|
if model_data:
|
|
if model_data.base_model:
|
|
model_data.base_model.update_or_insert(track_id, model_data.row_number)
|
|
|
|
def _get_existing_track(self, track_id: int) -> Tracks:
|
|
"""
|
|
Lookup in existing track in the local cache and return it
|
|
"""
|
|
|
|
existing_track_records = [a for a in self.existing_tracks if a.id == track_id]
|
|
if len(existing_track_records) != 1:
|
|
raise ApplicationError(
|
|
f"Internal error in _get_existing_track: {existing_track_records=}"
|
|
)
|
|
|
|
return existing_track_records[0]
|
|
|
|
def _get_existing_tracks(self):
|
|
"""
|
|
Return a dictionary {title: Track} for all existing tracks
|
|
"""
|
|
|
|
with db.Session() as session:
|
|
return Tracks.get_all(session)
|
|
|
|
def _get_match_score(self, str1: str, str2: str) -> float:
|
|
"""
|
|
Return the score of how well str1 matches str2.
|
|
"""
|
|
|
|
ratio = fuzz.ratio(str1, str2)
|
|
partial_ratio = fuzz.partial_ratio(str1, str2)
|
|
token_sort_ratio = fuzz.token_sort_ratio(str1, str2)
|
|
token_set_ratio = fuzz.token_set_ratio(str1, str2)
|
|
|
|
# Combine scores
|
|
combined_score = (
|
|
ratio * 0.25
|
|
+ partial_ratio * 0.25
|
|
+ token_sort_ratio * 0.25
|
|
+ token_set_ratio * 0.25
|
|
)
|
|
|
|
return combined_score
|
|
|
|
def populate_track_match_data(self, path: str) -> None:
|
|
"""
|
|
Populate self.import_files_data[path].track_match_data
|
|
|
|
- Search title in existing tracks
|
|
- if score >= Config.FUZZYMATCH_MINIMUM_LIST:
|
|
- get artist score
|
|
- add TrackMatchData to self.import_files_data[path].track_match_data
|
|
"""
|
|
|
|
title = self.import_files_data[path].tags.title
|
|
artist = self.import_files_data[path].tags.artist
|
|
|
|
for track in self.existing_tracks:
|
|
title_score = self._get_match_score(title, track.title)
|
|
if title_score >= Config.FUZZYMATCH_MINIMUM_LIST:
|
|
artist_score = self._get_match_score(artist, track.artist)
|
|
self.import_files_data[path].track_match_data.append(
|
|
TrackMatchData(
|
|
artist=track.artist,
|
|
artist_match=artist_score,
|
|
title=track.title,
|
|
title_match=title_score,
|
|
track_id=track.id,
|
|
)
|
|
)
|
|
|
|
def process_user_choices(self, path: str) -> None:
|
|
"""
|
|
Find out whether user wants to import this as a new track,
|
|
overwrite an existing track or not import it at all.
|
|
"""
|
|
|
|
# Build a list of (track title and artist, track_id, track path)
|
|
choices: list[tuple[str, int, str]] = []
|
|
|
|
# First choice is always to import as a new track
|
|
choices.append((Config.DO_NOT_IMPORT, -1, ""))
|
|
choices.append((Config.IMPORT_AS_NEW, 0, ""))
|
|
|
|
# New track details
|
|
importing_track_description = (
|
|
f"{self.import_files_data[path].tags.title} "
|
|
f"({self.import_files_data[path].tags.artist})"
|
|
)
|
|
|
|
# Select 'import as new' as default unless the top match is good
|
|
# enough
|
|
default = 1 # default choice is import as new
|
|
track_match_data = self.import_files_data[path].track_match_data
|
|
if (
|
|
track_match_data[0].artist_match >= Config.FUZZYMATCH_MINIMUM_SELECT_ARTIST
|
|
and track_match_data[0].title_match
|
|
>= Config.FUZZYMATCH_MINIMUM_SELECT_TITLE
|
|
):
|
|
default = 2
|
|
|
|
for rec in track_match_data:
|
|
existing_track_description = (f"{rec.title} ({rec.artist})")
|
|
if Config.FUZZYMATCH_SHOW_SCORES:
|
|
existing_track_description += f" ({rec.title_match:.0f}%)"
|
|
existing_track_path = self._get_existing_track(rec.track_id).path
|
|
choices.append(
|
|
(existing_track_description, rec.track_id, existing_track_path)
|
|
)
|
|
|
|
dialog = PickMatch(
|
|
new_track_description=importing_track_description,
|
|
choices=choices,
|
|
default=default,
|
|
)
|
|
if dialog.exec():
|
|
if dialog.selected_track_id < 0:
|
|
self.import_files_data[path].import_this_file = False
|
|
self.import_files_data[path].error = "you asked not to import this file"
|
|
elif dialog.selected_track_id > 0:
|
|
self.replace_file(path=path, track_id=dialog.selected_track_id)
|
|
else:
|
|
self.import_as_new(path=path)
|
|
else:
|
|
# User cancelled dialog
|
|
self.import_files_data[path].import_this_file = False
|
|
self.import_files_data[path].error = "you cancelled the import of this file"
|
|
|
|
def import_as_new(self, path: str) -> None:
|
|
"""
|
|
Import passed path as a new file
|
|
"""
|
|
|
|
log.debug(f"Import as new, {path=}")
|
|
|
|
tfd = self.import_files_data[path]
|
|
|
|
destination_path = os.path.join(Config.IMPORT_DESTINATION, os.path.basename(path))
|
|
if os.path.exists(destination_path):
|
|
tfd.import_this_file = False
|
|
tfd.error = (
|
|
f"this is a new import but destination file already exists ({destination_path})"
|
|
)
|
|
return
|
|
|
|
tfd.destination_path = destination_path
|
|
|
|
def replace_file(self, path: str, track_id: int) -> None:
|
|
"""
|
|
Replace existing track {track_id=} with passed path
|
|
"""
|
|
|
|
log.debug(f"Replace {track_id=} with {path=}")
|
|
|
|
tfd = self.import_files_data[path]
|
|
|
|
existing_track_path = self._get_existing_track(track_id).path
|
|
proposed_destination_path = os.path.join(
|
|
os.path.dirname(existing_track_path), os.path.basename(path)
|
|
)
|
|
# if the destination path exists and it's not the path the
|
|
# track_id points to, abort
|
|
if existing_track_path != proposed_destination_path and os.path.exists(
|
|
proposed_destination_path
|
|
):
|
|
tfd.import_this_file = False
|
|
tfd.error = f"New import would overwrite existing file ({proposed_destination_path})"
|
|
return
|
|
tfd.file_path_to_remove = existing_track_path
|
|
tfd.destination_path = proposed_destination_path
|
|
tfd.track_id = track_id
|
|
|
|
|
|
@dataclass
|
|
class ModelData:
|
|
base_model: PlaylistModel
|
|
row_number: int
|
|
|
|
|
|
class PickMatch(QDialog):
|
|
"""
|
|
Dialog for user to select which existing track to replace or to
|
|
import to a new track
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
new_track_description: str,
|
|
choices: list[tuple[str, int, str]],
|
|
default: int,
|
|
) -> None:
|
|
super().__init__()
|
|
self.new_track_description = new_track_description
|
|
self.default = default
|
|
self.init_ui(choices)
|
|
self.selected_track_id = -1
|
|
|
|
def init_ui(self, choices: list[tuple[str, int, str]]) -> None:
|
|
"""
|
|
Set up dialog
|
|
"""
|
|
|
|
self.setWindowTitle("New or replace")
|
|
|
|
layout = QVBoxLayout()
|
|
|
|
# Add instructions
|
|
instructions = (
|
|
f"Importing {self.new_track_description}.\n"
|
|
"Import as a new track or replace existing track?"
|
|
)
|
|
instructions_label = QLabel(instructions)
|
|
layout.addWidget(instructions_label)
|
|
|
|
# Create a button group for radio buttons
|
|
self.button_group = QButtonGroup()
|
|
|
|
# Add radio buttons for each item
|
|
for idx, (track_description, track_id, track_path) in enumerate(choices):
|
|
if (
|
|
track_sequence.current
|
|
and track_id
|
|
and track_sequence.current.track_id == track_id
|
|
):
|
|
# Don't allow current track to be replaced
|
|
track_description = "(Currently playing) " + track_description
|
|
radio_button = QRadioButton(track_description)
|
|
radio_button.setDisabled(True)
|
|
self.button_group.addButton(radio_button, -1)
|
|
else:
|
|
radio_button = QRadioButton(track_description)
|
|
radio_button.setToolTip(track_path)
|
|
self.button_group.addButton(radio_button, track_id)
|
|
layout.addWidget(radio_button)
|
|
|
|
# Select the second item by default (import as new)
|
|
if idx == self.default:
|
|
radio_button.setChecked(True)
|
|
|
|
# Add OK and Cancel buttons
|
|
button_layout = QHBoxLayout()
|
|
ok_button = QPushButton("OK")
|
|
cancel_button = QPushButton("Cancel")
|
|
button_layout.addWidget(ok_button)
|
|
button_layout.addWidget(cancel_button)
|
|
layout.addLayout(button_layout)
|
|
|
|
self.setLayout(layout)
|
|
|
|
# Connect buttons to actions
|
|
ok_button.clicked.connect(self.on_ok)
|
|
cancel_button.clicked.connect(self.reject)
|
|
|
|
def on_ok(self):
|
|
# Get the ID of the selected button
|
|
self.selected_track_id = self.button_group.checkedId()
|
|
self.accept()
|
|
|
|
|
|
@dataclass
|
|
class TrackFileData:
|
|
"""
|
|
Simple class to track details changes to a track file
|
|
"""
|
|
|
|
tags: Tags = Tags()
|
|
destination_path: str = ""
|
|
import_this_file: bool = True
|
|
error: str = ""
|
|
file_path_to_remove: Optional[str] = None
|
|
track_id: int = 0
|
|
track_match_data: list[TrackMatchData] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class TrackMatchData:
|
|
artist: str
|
|
artist_match: float
|
|
title: str
|
|
title_match: float
|
|
track_id: int
|