Compare commits

..

4 Commits

Author SHA1 Message Date
Keith Edmunds
4a4058d211 Import rewrite WIP 2025-01-13 15:29:50 +00:00
Keith Edmunds
3b71041b66 Remove profiling calls (again) 2025-01-10 20:37:49 +00:00
Keith Edmunds
d30bf49c88 Don't select unplayable track as next track 2025-01-10 20:27:26 +00:00
Keith Edmunds
3a3b1b712d Much improved file importer 2025-01-10 19:50:53 +00:00
11 changed files with 410 additions and 369 deletions

View File

@ -94,10 +94,10 @@ class MusicMusterSignals(QObject):
class Tags(NamedTuple):
artist: str
title: str
bitrate: int
duration: int
artist: str = ""
title: str = ""
bitrate: int = 0
duration: int = 0
class TrackInfo(NamedTuple):

View File

@ -51,6 +51,10 @@ class Config(object):
FADEOUT_DB = -10
FADEOUT_SECONDS = 5
FADEOUT_STEPS_PER_SECOND = 5
FUZZYMATCH_MINIMUM_LIST = 60.0
FUZZYMATCH_MINIMUM_SELECT_ARTIST = 80.0
FUZZYMATCH_MINIMUM_SELECT_TITLE = 80.0
FUZZYMATCH_SHOW_SCORES = True
HEADER_ARTIST = "Artist"
HEADER_BITRATE = "bps"
HEADER_DURATION = "Length"
@ -62,8 +66,8 @@ class Config(object):
HEADER_START_TIME = "Start"
HEADER_TITLE = "Title"
HIDE_AFTER_PLAYING_OFFSET = 5000
HIDE_PLAYED_MODE_TRACKS = "TRACKS"
HIDE_PLAYED_MODE_SECTIONS = "SECTIONS"
HIDE_PLAYED_MODE_TRACKS = "TRACKS"
IMPORT_AS_NEW = "Import as new track"
INFO_TAB_TITLE_LENGTH = 15
INTRO_SECONDS_FORMAT = ".1f"
@ -82,7 +86,6 @@ class Config(object):
MAX_INFO_TABS = 5
MAX_MISSING_FILES_TO_REPORT = 10
MILLISECOND_SIGFIGS = 0
MINIMUM_FUZZYMATCH = 60.0
MINIMUM_ROW_HEIGHT = 30
NO_TEMPLATE_NAME = "None"
NOTE_TIME_FORMAT = "%H:%M"
@ -123,5 +126,5 @@ class Config(object):
# These rely on earlier definitions
HIDE_PLAYED_MODE = HIDE_PLAYED_MODE_SECTIONS
IMPORT_DESTINATION = os.path.join(ROOT, "Singles")
IMPORT_DESTINATION = "/tmp/mm" # os.path.join(ROOT, "Singles")
REPLACE_FILES_DEFAULT_DESTINATION = os.path.dirname(REPLACE_FILES_DEFAULT_SOURCE)

View File

@ -1,6 +1,5 @@
# Standard library imports
from typing import Optional
import os
# PyQt imports
from PyQt6.QtCore import QEvent, Qt
@ -9,27 +8,22 @@ from PyQt6.QtWidgets import (
QDialog,
QListWidgetItem,
QMainWindow,
QTableWidgetItem,
)
# Third party imports
import pydymenu # type: ignore
from sqlalchemy.orm.session import Session
# App imports
from classes import MusicMusterSignals
from config import Config
from helpers import (
ask_yes_no,
get_relative_date,
get_tags,
ms_to_mmss,
show_warning,
)
from log import log
from models import db, Settings, Tracks
from models import Settings, Tracks
from playlistmodel import PlaylistModel
from ui import dlg_TrackSelect_ui, dlg_replace_files_ui
from ui import dlg_TrackSelect_ui
class TrackSelectDialog(QDialog):

View File

@ -1,7 +1,7 @@
# Standard library imports
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import dataclass, field
from fuzzywuzzy import fuzz # type: ignore
import os.path
from typing import Optional
@ -17,11 +17,11 @@ from PyQt6.QtCore import (
from PyQt6.QtWidgets import (
QButtonGroup,
QDialog,
QFileDialog,
QHBoxLayout,
QLabel,
QPushButton,
QRadioButton,
QStatusBar,
QVBoxLayout,
)
@ -30,8 +30,6 @@ from PyQt6.QtWidgets import (
# App imports
from classes import (
ApplicationError,
AudioMetadata,
FileErrors,
MusicMusterSignals,
Tags,
)
@ -39,7 +37,7 @@ from config import Config
from helpers import (
file_is_unreadable,
get_tags,
show_warning,
show_OK,
)
from log import log
from models import db, Tracks
@ -49,35 +47,26 @@ import helpers
class DoTrackImport(QObject):
import_finished = pyqtSignal()
import_finished = pyqtSignal(int, QThread)
def __init__(
self,
associated_thread: QThread,
import_file_path: str,
tags: Tags,
destination_track_path: str,
destination_path: str,
track_id: int,
audio_metadata: AudioMetadata,
base_model: PlaylistModel,
row_number: Optional[int],
) -> None:
"""
Save parameters
"""
super().__init__()
self.associated_thread = associated_thread
self.import_file_path = import_file_path
self.tags = tags
self.destination_track_path = destination_track_path
self.destination_track_path = destination_path
self.track_id = track_id
self.audio_metadata = audio_metadata
self.base_model = base_model
if row_number is None:
self.next_row_number = base_model.rowCount()
else:
self.next_row_number = row_number
self.signals = MusicMusterSignals()
@ -91,12 +80,18 @@ class DoTrackImport(QObject):
temp_file: Optional[str] = None
# Get audio metadata in this thread rather than calling function to save interactive time
self.audio_metadata = helpers.get_audio_metadata(self.import_file_path)
# If destination exists, move it out of the way
if os.path.exists(self.destination_track_path):
temp_file = self.destination_track_path + ".TMP"
shutil.move(self.destination_track_path, temp_file)
# Move file to destination
shutil.move(self.import_file_path, self.destination_track_path)
# Clean up
if temp_file and os.path.exists(temp_file):
os.unlink(temp_file)
with db.Session() as session:
self.signals.status_message_signal.emit(
@ -130,13 +125,11 @@ class DoTrackImport(QObject):
session.commit()
helpers.normalise_track(self.destination_track_path)
self.base_model.insert_row(self.next_row_number, track.id, "imported")
self.next_row_number += 1
self.signals.status_message_signal.emit(
f"{os.path.basename(self.import_file_path)} imported", 10000
)
self.import_finished.emit()
self.signals.status_message_signal.emit(
f"{os.path.basename(self.import_file_path)} imported", 10000
)
self.import_finished.emit(track.id, self.associated_thread)
class FileImporter:
@ -151,32 +144,27 @@ class FileImporter:
Set up class
"""
# Save parameters
self.base_model = base_model
if row_number:
self.row_number = row_number
else:
self.row_number = base_model.rowCount()
# Create ModelData
if not row_number:
row_number = base_model.rowCount()
self.model_data = ThreadData(base_model=base_model, row_number=row_number)
# Place to keep reference to importer threads and data
self.thread_data: dict[QThread, ThreadData] = {}
# Data structure to track files to import
self.import_files_data: list[TrackFileData] = []
# Dictionary of exsting tracks
self.import_files_data: dict[str, TrackFileData] = {}
# Dictionary of exsting tracks indexed by track.id
self.existing_tracks = self._get_existing_tracks()
# List of track_id, title tuples
self.track_idx_and_title = [
((a.id, a.title)) for a in self.existing_tracks.values()
]
# Files to import
self.import_files_paths = [
# Populate self.import_files_data
for infile in [
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE)
if f.endswith((".mp3", ".flac"))
]
# Files we can't import
self.unimportable_files: list[FileErrors] = []
# Files user doesn't want imported
self.do_not_import: list[str] = []
# Place to keep reference to importer while it runs
self.import_thread: dict[int, QThread] = {}
]:
self.import_files_data[infile] = TrackFileData()
def do_import(self) -> None:
"""
@ -190,206 +178,110 @@ class FileImporter:
- import files and either replace existing or add to pool
"""
# check all file are readable
self.check_files_are_readable()
# load readable files and tags into self.import_files
for import_file in self.import_files_paths:
try:
tags = get_tags(import_file)
except ApplicationError as e:
self.unimportable_files.append(
FileErrors(path=import_file, error=str(e))
)
self.import_files_paths.remove(import_file)
try:
self.import_files_data.append(
TrackFileData(import_file_path=import_file, tags=tags)
)
except Exception as e:
self.unimportable_files.append(
FileErrors(path=import_file, error=str(e))
)
self.import_files_paths.remove(import_file)
if self.unimportable_files:
msg = "The following files could not be read and won't be imported:\n"
for unimportable_file in self.unimportable_files:
msg += f"\n\t{unimportable_file.path} ({unimportable_file.error})"
show_warning(None, "Unimportable files", msg)
# check for close matches.
for idx in range(len(self.import_files_data)):
self.check_match(idx=idx)
self.import_files_data = [
x
for x in self.import_files_data
if (
x.import_file_path not in self.do_not_import
or x.track_id != -1
)
]
# Import all that's left.
for idx in range(len(self.import_files_data)):
self._import_file(idx)
def check_match(self, idx: int) -> None:
"""
Work on and update the idx element of self.import_file_data.
Check for similar existing titles. If none found, set up to
import this as a new track. If one is found, check with user
whether this is a new track or replacement. If more than one
is found, as for one but order the tracks in
artist-similarity order.
"""
similar_track_ids = self._find_similar_strings(
self.import_files_data[idx].tags.title, self.track_idx_and_title
)
if len(similar_track_ids) == 0:
matching_track = 0
elif len(similar_track_ids) == 1:
matching_track = self._pick_match(idx, similar_track_ids)
else:
matching_track = self._pick_match(
idx, self.order_by_artist(idx, similar_track_ids)
)
if matching_track < 0: # User cancelled
matching_track = -1
elif matching_track == 0:
self.import_files_data[idx].destination_track_path = os.path.join(
Config.IMPORT_DESTINATION,
os.path.basename(self.import_files_data[idx].import_file_path),
)
else:
self.import_files_data[idx].destination_track_path = self.existing_tracks[
matching_track
].path
self.import_files_data[idx].track_id = matching_track
def _import_file(self, idx: int) -> None:
"""
Import the file specified at self.import_files_data[idx]
"""
log.debug(f"_import_file({idx=}), {self.import_files_data[idx]=}")
f = self.import_files_data[idx]
# Import in separate thread
self.import_thread[idx] = QThread()
self.worker = DoTrackImport(
import_file_path=f.import_file_path,
tags=f.tags,
destination_track_path=f.destination_track_path,
track_id=f.track_id,
audio_metadata=helpers.get_audio_metadata(f.import_file_path),
base_model=self.base_model,
row_number=self.row_number,
)
self.worker.moveToThread(self.import_thread[idx])
self.import_thread[idx].started.connect(self.worker.run)
self.worker.import_finished.connect(self.import_thread[idx].quit)
self.worker.import_finished.connect(self.worker.deleteLater)
self.import_thread[idx].finished.connect(self.import_thread[idx].deleteLater)
self.import_thread[idx].start()
def order_by_artist(self, idx: int, track_ids_to_check: list[int]) -> list[int]:
"""
Return the list of track_ids sorted by how well the artist at idx matches the
track artist.
"""
track_idx_and_artist = [
((key, a.artist))
for key, a in self.existing_tracks.items()
if key in track_ids_to_check
]
# We want to return all of the passed tracks so set minimum_score
# to zero
return self._find_similar_strings(
self.import_files_data[idx].tags.artist,
track_idx_and_artist,
minimum_score=0.0,
)
def _pick_match(self, idx: int, track_ids: list[int]) -> int:
"""
Return the track_id selected by the user, including "import as new" which will be
track_id 0. Return -1 if user cancels.
If user chooses not to import this track, remove it from the list of tracks to
import and return -1.
"""
log.debug(f"_pick_match({idx=}, {track_ids=})")
new_track_details = (
f"{self.import_files_data[idx].tags.title} "
f"({self.import_files_data[idx].tags.artist})"
)
# Build a list of (track title and artist, track_id, track path)
choices: list[tuple[str, int, str]] = []
# First choice is always to import as a new track
choices.append((Config.DO_NOT_IMPORT, -2, ""))
choices.append((Config.IMPORT_AS_NEW, 0, ""))
for track_id in track_ids:
choices.append(
(
f"{self.existing_tracks[track_id].title} "
f"({self.existing_tracks[track_id].artist})",
track_id,
str(self.existing_tracks[track_id].path),
)
)
dialog = PickMatch(new_track_details, choices)
if dialog.exec() and dialog.selected_id >= 0:
return dialog.selected_id
else:
self.do_not_import.append(self.import_files_data[idx].import_file_path)
return -1
def check_files_are_readable(self) -> None:
"""
Check files to be imported are readable. If not, remove them from the
import list and add them to the file errors list.
"""
for path in self.import_files_paths:
# Check all file are readable and have tags. Mark failures not to
# be imported and populate error text.
for path in self.import_files_data.keys():
if file_is_unreadable(path):
self.unimportable_files.append(
FileErrors(path=os.path.basename(path), error="File is unreadable")
)
self.import_files_paths.remove(path)
self.import_files_data[path].import_this_file = False
self.import_files_data[path].error = f"{path} is unreadable"
continue
def import_files_are_tagged(self) -> list:
"""
Return a (possibly empty) list of all untagged files in the
import directory. Add tags to file_data
"""
# Get tags
try:
self.import_files_data[path].tags = get_tags(path)
except ApplicationError as e:
self.import_files_data[path].import_this_file = False
self.import_files_data[path].error = f"Tag errors ({str(e)})"
continue
untagged_files: list[str] = []
for fullpath in self.import_files_paths:
tags = get_tags(fullpath)
if not tags:
untagged_files.append(os.path.basename(fullpath))
# Remove from import list
del self.import_files_data[fullpath]
log.warning(f"Import: no tags found, {fullpath=}")
# Get track match data
self.populate_track_match_data(path)
# Sort with best artist match first
self.import_files_data[path].track_match_data.sort(
key=lambda rec: rec.artist_match, reverse=True
)
# Process user choices
self.process_user_choices(path)
# Import files and tell users about files that won't be imported
msgs: list[str] = []
for (path, entry) in self.import_files_data.items():
if entry.import_this_file:
self._start_thread(path)
else:
self.import_files_data.append(
TrackFileData(import_file_path=fullpath, tags=tags)
msgs.append(
f"{os.path.basename(path)} will not be imported because {entry.error}"
)
if msgs:
show_OK("File not imported", "\r\r".join(msgs))
return untagged_files
def _start_thread(self, path: str) -> None:
"""
Import the file specified by path
"""
log.debug(f"_start_thread({path=})")
# Create thread and worker
thread = QThread()
worker = DoTrackImport(
associated_thread=thread,
import_file_path=path,
tags=self.import_files_data[path].tags,
destination_path=self.import_files_data[path].destination_path,
track_id=self.import_files_data[path].track_id,
)
# Associate data with the thread
self.model_data.worker = worker
self.thread_data[thread] = self.model_data
# Move worker to thread
worker.moveToThread(thread)
log.debug(f"_start_thread_worker started ({path=}, {id(thread)=}, {id(worker)=})")
# Connect signals
thread.started.connect(lambda: log.debug(f"Thread {thread} started"))
thread.started.connect(worker.run)
thread.finished.connect(lambda: log.debug(f"Thread {thread} finished"))
thread.finished.connect(thread.deleteLater)
worker.import_finished.connect(
lambda: log.debug(f"Worker task finished for thread {thread}")
)
worker.import_finished.connect(self._thread_finished)
worker.import_finished.connect(thread.quit)
worker.import_finished.connect(worker.deleteLater)
# Start thread
thread.start()
def _thread_finished(self, track_id: int, thread: QThread) -> None:
"""
If track already in playlist, refresh it else insert it
"""
log.debug(f" Ending thread {thread}")
model_data = self.thread_data.pop(thread, None)
if model_data:
if model_data.base_model:
model_data.base_model.update_or_insert(track_id, model_data.row_number)
def _get_existing_track(self, track_id: int) -> Tracks:
"""
Lookup in existing track in the local cache and return it
"""
existing_track_records = [a for a in self.existing_tracks if a.id == track_id]
if len(existing_track_records) != 1:
raise ApplicationError(
f"Internal error in _get_existing_track: {existing_track_records=}"
)
return existing_track_records[0]
def _get_existing_tracks(self):
"""
@ -397,51 +289,195 @@ class FileImporter:
"""
with db.Session() as session:
return Tracks.all_tracks_indexed_by_id(session)
return Tracks.get_all(session)
def _find_similar_strings(
self,
needle: str,
haystack: list[tuple[int, str]],
minimum_score: float = Config.MINIMUM_FUZZYMATCH,
) -> list[int]:
def _get_match_score(self, str1: str, str2: str) -> float:
"""
Search for the needle in the string element of the haystack.
Discard similarities less that minimum_score. Return a list of
the int element of the haystack in order of decreasing score (ie,
best match first).
Return the score of how well str1 matches str2.
"""
# Create a dictionary to store similarities
similarities: dict[int, float] = {}
ratio = fuzz.ratio(str1, str2)
partial_ratio = fuzz.partial_ratio(str1, str2)
token_sort_ratio = fuzz.token_sort_ratio(str1, str2)
token_set_ratio = fuzz.token_set_ratio(str1, str2)
for hayblade in haystack:
# Calculate similarity using multiple metrics
ratio = fuzz.ratio(needle, hayblade[1])
partial_ratio = fuzz.partial_ratio(needle, hayblade[1])
token_sort_ratio = fuzz.token_sort_ratio(needle, hayblade[1])
token_set_ratio = fuzz.token_set_ratio(needle, hayblade[1])
# Combine scores
combined_score = (
ratio * 0.25
+ partial_ratio * 0.25
+ token_sort_ratio * 0.25
+ token_set_ratio * 0.25
)
# Combine scores
combined_score = (
ratio * 0.25
+ partial_ratio * 0.25
+ token_sort_ratio * 0.25
+ token_set_ratio * 0.25
)
return combined_score
if combined_score >= minimum_score:
similarities[hayblade[0]] = combined_score
log.debug(
f"_find_similar_strings({needle=}), {len(haystack)=}, "
f"{minimum_score=}, {hayblade=}, {combined_score=}"
def populate_track_match_data(self, path: str) -> None:
"""
Populate self.import_files_data[path].track_match_data
- Search title in existing tracks
- if score >= Config.FUZZYMATCH_MINIMUM_LIST:
- get artist score
- add TrackMatchData to self.import_files_data[path].track_match_data
"""
title = self.import_files_data[path].tags.title
artist = self.import_files_data[path].tags.artist
for track in self.existing_tracks:
title_score = self._get_match_score(title, track.title)
if title_score >= Config.FUZZYMATCH_MINIMUM_LIST:
artist_score = self._get_match_score(artist, track.artist)
self.import_files_data[path].track_match_data.append(
TrackMatchData(
artist=track.artist,
artist_match=artist_score,
title=track.title,
title_match=title_score,
track_id=track.id,
)
)
# Sort matches by score
sorted_matches = sorted(similarities.items(), key=lambda x: x[1], reverse=True)
def process_user_choices(self, path: str) -> None:
"""
Find out whether user wants to import this as a new track,
overwrite an existing track or not import it at all.
"""
# Return list of indexes, highest score first
return [a[0] for a in sorted_matches]
# Build a list of (track title and artist, track_id, track path)
choices: list[tuple[str, int, str]] = []
# First choice is always to import as a new track
choices.append((Config.DO_NOT_IMPORT, -1, ""))
choices.append((Config.IMPORT_AS_NEW, 0, ""))
# New track details
importing_track_description = (
f"{self.import_files_data[path].tags.title} "
f"({self.import_files_data[path].tags.artist})"
)
# Select 'import as new' as default unless the top match is good
# enough
default = 1 # default choice is import as new
track_match_data = self.import_files_data[path].track_match_data
try:
if track_match_data:
if (
track_match_data[0].artist_match
>= Config.FUZZYMATCH_MINIMUM_SELECT_ARTIST
and track_match_data[0].title_match
>= Config.FUZZYMATCH_MINIMUM_SELECT_TITLE
):
default = 2
for rec in track_match_data:
existing_track_description = f"{rec.title} ({rec.artist})"
if Config.FUZZYMATCH_SHOW_SCORES:
existing_track_description += f" ({rec.title_match:.0f}%)"
existing_track_path = self._get_existing_track(rec.track_id).path
choices.append(
(existing_track_description, rec.track_id, existing_track_path)
)
except IndexError:
import pdb
pdb.set_trace()
print(2)
dialog = PickMatch(
new_track_description=importing_track_description,
choices=choices,
default=default,
)
if dialog.exec():
if dialog.selected_track_id < 0:
self.import_files_data[path].import_this_file = False
self.import_files_data[path].error = "you asked not to import this file"
elif dialog.selected_track_id > 0:
self.replace_file(path=path, track_id=dialog.selected_track_id)
else:
# Import as new, but check destination path doesn't
# already exists
while os.path.exists(self.import_files_data[path].destination_path):
msg = (
"New import requested but default destination path ({path}) "
"already exists. Click OK and choose where to save this track"
)
import pdb
pdb.set_trace()
show_OK(None, title="Desintation path exists", msg=msg)
# Get output filename
pathspec = QFileDialog.getSaveFileName(
None,
"Save imported track",
directory=Config.IMPORT_DESTINATION,
)
if not pathspec:
self.import_files_data[path].import_this_file = False
self.import_files_data[
path
].error = "destination file already exists"
return
self.import_files_data[path].destination_path = pathspec[0]
self.import_as_new(path=path)
else:
# User cancelled dialog
self.import_files_data[path].import_this_file = False
self.import_files_data[path].error = "you cancelled the import of this file"
def import_as_new(self, path: str) -> None:
"""
Import passed path as a new file
"""
log.debug(f"Import as new, {path=}")
tfd = self.import_files_data[path]
destination_path = os.path.join(
Config.IMPORT_DESTINATION, os.path.basename(path)
)
if os.path.exists(destination_path):
tfd.import_this_file = False
tfd.error = f"this is a new import but destination file already exists ({destination_path})"
return
tfd.destination_path = destination_path
def replace_file(self, path: str, track_id: int) -> None:
"""
Replace existing track {track_id=} with passed path
"""
log.debug(f"Replace {track_id=} with {path=}")
tfd = self.import_files_data[path]
existing_track_path = self._get_existing_track(track_id).path
proposed_destination_path = os.path.join(
os.path.dirname(existing_track_path), os.path.basename(path)
)
# if the destination path exists and it's not the path the
# track_id points to, abort
if existing_track_path != proposed_destination_path and os.path.exists(
proposed_destination_path
):
tfd.import_this_file = False
tfd.error = f"New import would overwrite existing file ({proposed_destination_path})"
return
tfd.file_path_to_remove = existing_track_path
tfd.destination_path = proposed_destination_path
tfd.track_id = track_id
@dataclass
class ThreadData:
base_model: PlaylistModel
row_number: int
worker: Optional[DoTrackImport] = None
class PickMatch(QDialog):
@ -451,14 +487,18 @@ class PickMatch(QDialog):
"""
def __init__(
self, new_track_details: str, items_with_ids: list[tuple[str, int, str]]
self,
new_track_description: str,
choices: list[tuple[str, int, str]],
default: int,
) -> None:
super().__init__()
self.new_track_details = new_track_details
self.init_ui(items_with_ids)
self.selected_id = -1
self.new_track_description = new_track_description
self.default = default
self.init_ui(choices)
self.selected_track_id = -1
def init_ui(self, items_with_ids: list[tuple[str, int, str]]) -> None:
def init_ui(self, choices: list[tuple[str, int, str]]) -> None:
"""
Set up dialog
"""
@ -469,7 +509,7 @@ class PickMatch(QDialog):
# Add instructions
instructions = (
f"Importing {self.new_track_details}.\n"
f"Importing {self.new_track_description}.\n"
"Import as a new track or replace existing track?"
)
instructions_label = QLabel(instructions)
@ -479,25 +519,25 @@ class PickMatch(QDialog):
self.button_group = QButtonGroup()
# Add radio buttons for each item
for idx, (text, track_id, track_path) in enumerate(items_with_ids):
for idx, (track_description, track_id, track_path) in enumerate(choices):
if (
track_sequence.current
and track_id
and track_sequence.current.track_id == track_id
):
# Don't allow current track to be replaced
text = "(Currently playing) " + text
radio_button = QRadioButton(text)
track_description = "(Currently playing) " + track_description
radio_button = QRadioButton(track_description)
radio_button.setDisabled(True)
self.button_group.addButton(radio_button, -1)
else:
radio_button = QRadioButton(text)
radio_button = QRadioButton(track_description)
radio_button.setToolTip(track_path)
self.button_group.addButton(radio_button, track_id)
layout.addWidget(radio_button)
# Select the second item by default (import as new)
if idx == 1:
if idx == self.default:
radio_button.setChecked(True)
# Add OK and Cancel buttons
@ -516,7 +556,7 @@ class PickMatch(QDialog):
def on_ok(self):
# Get the ID of the selected button
self.selected_id = self.button_group.checkedId()
self.selected_track_id = self.button_group.checkedId()
self.accept()
@ -526,16 +566,19 @@ class TrackFileData:
Simple class to track details changes to a track file
"""
import_file_path: str
tags: Tags
destination_track_path: str = ""
file_path_to_removed: Optional[str] = None
tags: Tags = Tags()
destination_path: str = ""
import_this_file: bool = True
error: str = ""
file_path_to_remove: Optional[str] = None
track_id: int = 0
audio_metadata: Optional[AudioMetadata] = None
track_match_data: list[TrackMatchData] = field(default_factory=list)
def set_destination_track_path(self, path: str) -> None:
"""
Assigned the passed path
"""
self.destination_track_path = path
@dataclass
class TrackMatchData:
artist: str
artist_match: float
title: str
title_match: float
track_id: int

View File

@ -10,7 +10,7 @@ import ssl
import tempfile
# PyQt imports
from PyQt6.QtWidgets import QMainWindow, QMessageBox
from PyQt6.QtWidgets import QMainWindow, QMessageBox, QWidget
# Third party imports
from mutagen.flac import FLAC # type: ignore
@ -204,6 +204,14 @@ def get_tags(path: str) -> Tags:
except TinyTagException:
raise ApplicationError(f"Can't read tags: get_tags({path=})")
if (
tag.title is None
or tag.artist is None
or tag.bitrate is None
or tag.duration is None
):
raise ApplicationError(f"Missing tags: get_tags({path=})")
return Tags(
title=tag.title,
artist=tag.artist,
@ -392,10 +400,16 @@ def set_track_metadata(track: Tracks) -> None:
setattr(track, tag_key, getattr(tags, tag_key))
def show_OK(parent: QMainWindow, title: str, msg: str) -> None:
def show_OK(title: str, msg: str, parent: Optional[QWidget] = None) -> None:
"""Display a message to user"""
QMessageBox.information(parent, title, msg, buttons=QMessageBox.StandardButton.Ok)
dlg = QMessageBox(parent)
dlg.setIcon(QMessageBox.Icon.Information)
dlg.setWindowTitle(title)
dlg.setText(msg)
dlg.setStandardButtons(QMessageBox.StandardButton.Ok)
_ = dlg.exec()
def show_warning(parent: Optional[QMainWindow], title: str, msg: str) -> None:

View File

@ -10,7 +10,6 @@ import sys
# PyQt imports
# Third party imports
import line_profiler
from sqlalchemy import (
bindparam,
delete,
@ -246,9 +245,7 @@ class Playlists(dbtables.PlaylistsTable):
"""Returns a list of all templates ordered by name"""
return session.scalars(
select(cls)
.where(cls.is_template.is_(True))
.order_by(cls.name)
select(cls).where(cls.is_template.is_(True)).order_by(cls.name)
).all()
@classmethod
@ -577,12 +574,10 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
)
@staticmethod
@line_profiler.profile
def update_plr_row_numbers(
session: Session,
playlist_id: int,
sqla_map: List[dict[str, int]],
dummy_for_profiling: Optional[int] = None,
) -> None:
"""
Take a {plrid: row_number} dictionary and update the row numbers accordingly

View File

@ -44,7 +44,6 @@ from PyQt6.QtWidgets import (
)
# Third party imports
import line_profiler
from pygame import mixer
from sqlalchemy.orm.session import Session
import stackprinter # type: ignore
@ -571,18 +570,18 @@ class Window(QMainWindow, Ui_MainWindow):
)
self.actionExport_playlist.triggered.connect(self.export_playlist_tab)
self.actionFade.triggered.connect(self.fade)
self.actionImport_files.triggered.connect(self.import_files_wrapper)
self.actionInsertSectionHeader.triggered.connect(self.insert_header)
self.actionInsertTrack.triggered.connect(self.insert_track)
self.actionManage_templates.triggered.connect(self.manage_templates)
self.actionMark_for_moving.triggered.connect(self.mark_rows_for_moving)
self.actionMoveSelected.triggered.connect(self.move_selected)
self.actionMoveUnplayed.triggered.connect(self.move_unplayed)
self.actionManage_templates.triggered.connect(self.manage_templates)
self.actionNewPlaylist.triggered.connect(self.new_playlist)
self.actionOpenPlaylist.triggered.connect(self.open_playlist)
self.actionPaste.triggered.connect(self.paste_rows)
self.actionPlay_next.triggered.connect(self.play_next)
self.actionRenamePlaylist.triggered.connect(self.rename_playlist)
self.actionReplace_files.triggered.connect(self.import_files_wrapper)
self.actionResume.triggered.connect(self.resume)
self.actionSave_as_template.triggered.connect(self.save_as_template)
self.actionSearch_title_in_Songfacts.triggered.connect(
@ -1140,8 +1139,7 @@ class Window(QMainWindow, Ui_MainWindow):
else:
webbrowser.get("browser").open_new_tab(url)
@line_profiler.profile
def paste_rows(self, dummy_for_profiling: Optional[int] = None) -> None:
def paste_rows(self) -> None:
"""
Paste earlier cut rows.
"""

View File

@ -26,7 +26,6 @@ from PyQt6.QtGui import (
)
# Third party imports
import line_profiler
import obswebsocket # type: ignore
# import snoop # type: ignore
@ -297,22 +296,17 @@ class PlaylistModel(QAbstractTableModel):
self.update_track_times()
# Find next track
# Get all unplayed track rows
log.debug(f"{self}: Find next track")
next_row = None
unplayed_rows = self.get_unplayed_rows()
unplayed_rows = [
a
for a in self.get_unplayed_rows()
if not self.is_header_row(a)
and not file_is_unreadable(self.playlist_rows[a].path)
]
if unplayed_rows:
try:
# Find next row after current track
next_row = min(
[
a
for a in unplayed_rows
if a > row_number and not self.is_header_row(a)
]
)
next_row = min([a for a in unplayed_rows if a > row_number])
except ValueError:
# Find first unplayed track
next_row = min(unplayed_rows)
if next_row is not None:
self.set_next_row(next_row)
@ -777,9 +771,7 @@ class PlaylistModel(QAbstractTableModel):
return None
def load_data(
self, session: db.session, dummy_for_profiling: Optional[int] = None
) -> None:
def load_data(self, session: db.session) -> None:
"""
Same as refresh data, but only used when creating playslit.
Distinguishes profile time between initial load and other
@ -826,13 +818,7 @@ class PlaylistModel(QAbstractTableModel):
self.update_track_times()
self.invalidate_rows(row_numbers)
@line_profiler.profile
def move_rows(
self,
from_rows: list[int],
to_row_number: int,
dummy_for_profiling: Optional[int] = None,
) -> None:
def move_rows(self, from_rows: list[int], to_row_number: int) -> None:
"""
Move the playlist rows given to to_row and below.
"""
@ -897,13 +883,11 @@ class PlaylistModel(QAbstractTableModel):
self.update_track_times()
self.invalidate_rows(list(row_map.keys()))
@line_profiler.profile
def move_rows_between_playlists(
self,
from_rows: list[int],
to_row_number: int,
to_playlist_id: int,
dummy_for_profiling: Optional[int] = None,
) -> None:
"""
Move the playlist rows given to to_row and below of to_playlist.
@ -1076,10 +1060,7 @@ class PlaylistModel(QAbstractTableModel):
# Update display
self.invalidate_row(track_sequence.previous.row_number)
@line_profiler.profile
def refresh_data(
self, session: db.session, dummy_for_profiling: Optional[int] = None
) -> None:
def refresh_data(self, session: db.session) -> None:
"""
Populate self.playlist_rows with playlist data
@ -1582,6 +1563,23 @@ class PlaylistModel(QAbstractTableModel):
)
)
def update_or_insert(self, track_id: int, row_number: int) -> None:
"""
If the passed track_id exists in this playlist, update the
row(s), otherwise insert this track at row_number.
"""
track_rows = [
a.row_number for a in self.playlist_rows.values() if a.track_id == track_id
]
if track_rows:
with db.Session() as session:
for row in track_rows:
self.refresh_row(session, row)
self.invalidate_rows(track_rows)
else:
self.insert_row(proposed_row_number=row_number, track_id=track_id)
def update_track_times(self) -> None:
"""
Update track start/end times in self.playlist_rows

View File

@ -33,7 +33,6 @@ from PyQt6.QtWidgets import (
)
# Third party imports
import line_profiler
# App imports
from audacity_controller import AudacityController
@ -378,10 +377,7 @@ class PlaylistTab(QTableView):
# Deselect edited line
self.clear_selection()
@line_profiler.profile
def dropEvent(
self, event: Optional[QDropEvent], dummy_for_profiling: Optional[int] = None
) -> None:
def dropEvent(self, event: Optional[QDropEvent]) -> None:
"""
Move dropped rows
"""

View File

@ -1000,7 +1000,7 @@ padding-left: 8px;</string>
<addaction name="actionSave_as_template"/>
<addaction name="actionManage_templates"/>
<addaction name="separator"/>
<addaction name="actionReplace_files"/>
<addaction name="actionImport_files"/>
<addaction name="separator"/>
<addaction name="actionE_xit"/>
</widget>
@ -1364,7 +1364,7 @@ padding-left: 8px;</string>
<string>Select duplicate rows...</string>
</property>
</action>
<action name="actionReplace_files">
<action name="actionImport_files">
<property name="text">
<string>Import files...</string>
</property>

View File

@ -1,6 +1,6 @@
# Form implementation generated from reading ui file 'app/ui/main_window.ui'
#
# Created by: PyQt6 UI code generator 6.7.1
# Created by: PyQt6 UI code generator 6.8.0
#
# WARNING: Any manual changes made to this file will be lost when pyuic6 is
# run again. Do not edit this file unless you know what you are doing.
@ -529,8 +529,8 @@ class Ui_MainWindow(object):
self.actionSearch_title_in_Songfacts.setObjectName("actionSearch_title_in_Songfacts")
self.actionSelect_duplicate_rows = QtGui.QAction(parent=MainWindow)
self.actionSelect_duplicate_rows.setObjectName("actionSelect_duplicate_rows")
self.actionReplace_files = QtGui.QAction(parent=MainWindow)
self.actionReplace_files.setObjectName("actionReplace_files")
self.actionImport_files = QtGui.QAction(parent=MainWindow)
self.actionImport_files.setObjectName("actionImport_files")
self.menuFile.addSeparator()
self.menuFile.addAction(self.actionInsertTrack)
self.menuFile.addAction(self.actionRemove)
@ -557,7 +557,7 @@ class Ui_MainWindow(object):
self.menuPlaylist.addAction(self.actionSave_as_template)
self.menuPlaylist.addAction(self.actionManage_templates)
self.menuPlaylist.addSeparator()
self.menuPlaylist.addAction(self.actionReplace_files)
self.menuPlaylist.addAction(self.actionImport_files)
self.menuPlaylist.addSeparator()
self.menuPlaylist.addAction(self.actionE_xit)
self.menuSearc_h.addAction(self.actionSetNext)
@ -676,6 +676,6 @@ class Ui_MainWindow(object):
self.actionSearch_title_in_Songfacts.setText(_translate("MainWindow", "Search title in Songfacts"))
self.actionSearch_title_in_Songfacts.setShortcut(_translate("MainWindow", "Ctrl+S"))
self.actionSelect_duplicate_rows.setText(_translate("MainWindow", "Select duplicate rows..."))
self.actionReplace_files.setText(_translate("MainWindow", "Import files..."))
from infotabs import InfoTabs # type: ignore
from pyqtgraph import PlotWidget # type: ignore
self.actionImport_files.setText(_translate("MainWindow", "Import files..."))
from infotabs import InfoTabs
from pyqtgraph import PlotWidget