Refine and fix file_importer tests
This commit is contained in:
parent
fd0d8b15f7
commit
c9b45848dd
@ -443,8 +443,8 @@ class FileImporter:
|
||||
if tfd.track_id == 0 and tfd.destination_path != tfd.file_path_to_remove:
|
||||
while os.path.exists(tfd.destination_path):
|
||||
msg = (
|
||||
"New import requested but default destination path ({ifd.destination_path}) "
|
||||
"already exists. Click OK and choose where to save this track"
|
||||
f"New import requested but default destination path ({tfd.destination_path})"
|
||||
" already exists. Click OK and choose where to save this track"
|
||||
)
|
||||
show_OK(title="Desintation path exists", msg=msg, parent=None)
|
||||
# Get output filename
|
||||
@ -462,6 +462,18 @@ class FileImporter:
|
||||
else:
|
||||
tfd.error = "destination file already exists"
|
||||
return False
|
||||
# The desintation path should not already exist in the
|
||||
# database (becquse if it does, it points to a non-existent
|
||||
# file). Check that because the path field in the database is
|
||||
# unique and so adding a duplicate will give a db integrity
|
||||
# error.
|
||||
with db.Session() as session:
|
||||
if Tracks.get_by_path(session, tfd.destination_path):
|
||||
tfd.error = (
|
||||
"Importing a new track but destination path already exists "
|
||||
f"in database ({tfd.destination_path})"
|
||||
)
|
||||
return False
|
||||
|
||||
# Check track_id
|
||||
if tfd.track_id < 0:
|
||||
|
||||
@ -200,9 +200,9 @@ def get_tags(path: str) -> Tags:
|
||||
try:
|
||||
tag = TinyTag.get(path)
|
||||
except FileNotFoundError:
|
||||
raise ApplicationError(f"File not found: {path})")
|
||||
raise ApplicationError(f"File not found: {path}")
|
||||
except TinyTagException:
|
||||
raise ApplicationError(f"Can't read tags in {path})")
|
||||
raise ApplicationError(f"Can't read tags in {path}")
|
||||
|
||||
if (
|
||||
tag.title is None
|
||||
@ -210,7 +210,7 @@ def get_tags(path: str) -> Tags:
|
||||
or tag.bitrate is None
|
||||
or tag.duration is None
|
||||
):
|
||||
raise ApplicationError(f"Missing tags in {path})")
|
||||
raise ApplicationError(f"Missing tags in {path}")
|
||||
|
||||
return Tags(
|
||||
title=tag.title,
|
||||
|
||||
@ -12,10 +12,10 @@ import unittest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
# PyQt imports
|
||||
from PyQt6.QtCore import Qt
|
||||
from PyQt6.QtWidgets import QApplication, QDialog, QPushButton
|
||||
from PyQt6.QtWidgets import QDialog, QFileDialog
|
||||
|
||||
# Third party imports
|
||||
from mutagen.mp3 import MP3 # type: ignore
|
||||
import pytest
|
||||
from pytestqt.plugin import QtBot # type: ignore
|
||||
|
||||
@ -27,7 +27,7 @@ from app.models import (
|
||||
Tracks,
|
||||
)
|
||||
from config import Config
|
||||
from file_importer import FileImporter, PickMatch
|
||||
from file_importer import FileImporter
|
||||
|
||||
|
||||
# Custom fixture to adapt qtbot for use with unittest.TestCase
|
||||
@ -37,7 +37,14 @@ def qtbot_adapter(qapp, request):
|
||||
request.cls.qtbot = QtBot(request)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("qtbot_adapter")
|
||||
# Fixture for tmp_path to be available in the class
|
||||
@pytest.fixture(scope="class")
|
||||
def class_tmp_path(request, tmp_path_factory):
|
||||
"""Provide a class-wide tmp_path"""
|
||||
request.cls.tmp_path = tmp_path_factory.mktemp("pytest_tmp")
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("qtbot_adapter", "class_tmp_path")
|
||||
class MyTestCase(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
@ -77,6 +84,16 @@ class MyTestCase(unittest.TestCase):
|
||||
"""Runs after each test"""
|
||||
self.widget.close() # Close UI to prevent side effects
|
||||
|
||||
def wait_for_workers(self, timeout: int = 10000):
|
||||
"""
|
||||
Let import threads workers run to completion
|
||||
"""
|
||||
|
||||
def workers_empty():
|
||||
assert FileImporter.workers == {}
|
||||
|
||||
self.qtbot.waitUntil(workers_empty, timeout=timeout)
|
||||
|
||||
def test_001_import_no_files(self):
|
||||
"""Try importing with no files to import"""
|
||||
|
||||
@ -156,12 +173,7 @@ class MyTestCase(unittest.TestCase):
|
||||
# Ensure selected_track_id was accessed after dialog.exec()
|
||||
assert mock_dialog_instance.selected_track_id == 0
|
||||
|
||||
# Wait until workers have run to completion
|
||||
# self.qtbot.wait(3000)
|
||||
def workers_empty():
|
||||
assert FileImporter.workers == {}
|
||||
|
||||
self.qtbot.waitUntil(workers_empty, timeout=10000)
|
||||
self.wait_for_workers()
|
||||
|
||||
# Check track was imported
|
||||
with db.Session() as session:
|
||||
@ -207,8 +219,7 @@ class MyTestCase(unittest.TestCase):
|
||||
# Ensure selected_track_id was accessed after dialog.exec()
|
||||
assert mock_dialog_instance.selected_track_id == 0
|
||||
|
||||
# Allow time for import thread to run
|
||||
self.qtbot.wait(3000)
|
||||
self.wait_for_workers()
|
||||
|
||||
# Check track was imported
|
||||
with db.Session() as session:
|
||||
@ -244,20 +255,24 @@ class MyTestCase(unittest.TestCase):
|
||||
# Ensure PickMatch was instantiated correctly
|
||||
MockPickMatch.assert_called_once_with(
|
||||
new_track_description="The Lovecats (The Cure)",
|
||||
choices=[("Do not import", -1, ""),
|
||||
("Import as new track", 0, ""),
|
||||
("The Lovecats (The Cure) (100%)", 2,
|
||||
os.path.join(self.musicstore, os.path.basename(test_track_path))
|
||||
),
|
||||
],
|
||||
choices=[
|
||||
("Do not import", -1, ""),
|
||||
("Import as new track", 0, ""),
|
||||
(
|
||||
"The Lovecats (The Cure) (100%)",
|
||||
2,
|
||||
os.path.join(
|
||||
self.musicstore, os.path.basename(test_track_path)
|
||||
),
|
||||
),
|
||||
],
|
||||
default=2,
|
||||
)
|
||||
|
||||
# Verify exec() was called
|
||||
mock_dialog_instance.exec.assert_called_once()
|
||||
|
||||
# Allow time for import thread to run
|
||||
self.qtbot.wait(3000)
|
||||
self.wait_for_workers()
|
||||
|
||||
# Check track was imported
|
||||
with db.Session() as session:
|
||||
@ -274,9 +289,201 @@ class MyTestCase(unittest.TestCase):
|
||||
assert os.path.exists(track_file)
|
||||
assert os.listdir(self.import_source) == []
|
||||
|
||||
# def test_import_replace_file
|
||||
# def test_import_similar_file
|
||||
# def test_import_new_file_existing_destination
|
||||
# def test_import_file_and_cancel
|
||||
# def test_import_file_no_tags
|
||||
# def test_import_unreadable_file
|
||||
def test_006_import_file_no_tags(self) -> None:
|
||||
"""Try to import untagged file"""
|
||||
|
||||
test_track_path = "testdata/lovecats.mp3"
|
||||
test_filename = os.path.basename(test_track_path)
|
||||
|
||||
shutil.copy(test_track_path, self.import_source)
|
||||
import_file = os.path.join(self.import_source, test_filename)
|
||||
assert os.path.exists(import_file)
|
||||
|
||||
# Remove tags
|
||||
src = MP3(import_file)
|
||||
src.delete()
|
||||
src.save()
|
||||
|
||||
with patch("file_importer.show_OK") as mock_show_ok:
|
||||
self.widget.import_files_wrapper()
|
||||
mock_show_ok.assert_called_once_with(
|
||||
"File not imported",
|
||||
f"{test_filename} will not be imported because of tag errors "
|
||||
f"(Missing tags in {import_file})",
|
||||
)
|
||||
|
||||
def test_007_import_unreadable_file(self) -> None:
|
||||
"""Import unreadable file"""
|
||||
|
||||
test_track_path = "testdata/lovecats.mp3"
|
||||
test_filename = os.path.basename(test_track_path)
|
||||
|
||||
shutil.copy(test_track_path, self.import_source)
|
||||
import_file = os.path.join(self.import_source, test_filename)
|
||||
assert os.path.exists(import_file)
|
||||
|
||||
# Make undreadable
|
||||
os.chmod(import_file, 0)
|
||||
|
||||
with patch("file_importer.show_OK") as mock_show_ok:
|
||||
self.widget.import_files_wrapper()
|
||||
mock_show_ok.assert_called_once_with(
|
||||
"File not imported",
|
||||
f"{test_filename} will not be imported because {import_file} is unreadable",
|
||||
)
|
||||
|
||||
# clean up
|
||||
os.chmod(import_file, 0o777)
|
||||
os.unlink(import_file)
|
||||
|
||||
def test_008_import_new_file_existing_destination(self) -> None:
|
||||
"""Import duplicate file"""
|
||||
|
||||
test_track_path = "testdata/lovecats.mp3"
|
||||
test_filename = os.path.basename(test_track_path)
|
||||
new_destination = os.path.join(self.musicstore, "lc2.mp3")
|
||||
|
||||
shutil.copy(test_track_path, self.import_source)
|
||||
import_file = os.path.join(self.import_source, test_filename)
|
||||
assert os.path.exists(import_file)
|
||||
|
||||
with (
|
||||
patch("file_importer.PickMatch") as MockPickMatch,
|
||||
patch.object(
|
||||
QFileDialog, "getSaveFileName", return_value=(new_destination, "")
|
||||
) as mock_file_dialog,
|
||||
patch("file_importer.show_OK") as mock_show_ok,
|
||||
):
|
||||
mock_file_dialog.return_value = (
|
||||
new_destination,
|
||||
"",
|
||||
) # Ensure mock correctly returns expected value
|
||||
|
||||
# Create a mock instance of PickMatch
|
||||
mock_dialog_instance = MagicMock()
|
||||
MockPickMatch.return_value = mock_dialog_instance
|
||||
|
||||
# Simulate the user clicking OK in the dialog
|
||||
mock_dialog_instance.exec.return_value = QDialog.DialogCode.Accepted
|
||||
mock_dialog_instance.selected_track_id = 0 # Simulated return value
|
||||
|
||||
self.widget.import_files_wrapper()
|
||||
|
||||
# Ensure PickMatch was instantiated correctly
|
||||
MockPickMatch.assert_called_once_with(
|
||||
new_track_description="The Lovecats (The Cure)",
|
||||
choices=[
|
||||
("Do not import", -1, ""),
|
||||
("Import as new track", 0, ""),
|
||||
(
|
||||
"The Lovecats (The Cure) (100%)",
|
||||
2,
|
||||
os.path.join(
|
||||
self.musicstore, os.path.basename(test_track_path)
|
||||
),
|
||||
),
|
||||
],
|
||||
default=2,
|
||||
)
|
||||
|
||||
# Verify exec() was called
|
||||
mock_dialog_instance.exec.assert_called_once()
|
||||
|
||||
destination = os.path.join(self.musicstore, test_filename)
|
||||
mock_show_ok.assert_called_once_with(
|
||||
title="Desintation path exists",
|
||||
msg=f"New import requested but default destination path ({destination}) "
|
||||
"already exists. Click OK and choose where to save this track",
|
||||
parent=None,
|
||||
)
|
||||
|
||||
self.wait_for_workers()
|
||||
|
||||
# Ensure QFileDialog was called and returned expected value
|
||||
assert mock_file_dialog.called # Ensure the mock was used
|
||||
result = mock_file_dialog()
|
||||
assert result[0] == new_destination # Validate return value
|
||||
|
||||
# Check track was imported
|
||||
with db.Session() as session:
|
||||
tracks = Tracks.get_all(session)
|
||||
assert len(tracks) == 3
|
||||
track = tracks[2]
|
||||
assert track.title == "The Lovecats"
|
||||
assert track.artist == "The Cure"
|
||||
assert track.id == 3
|
||||
assert track.path == new_destination
|
||||
assert os.path.exists(new_destination)
|
||||
assert os.listdir(self.import_source) == []
|
||||
|
||||
# Remove file so as not to interfere with later tests
|
||||
session.delete(track)
|
||||
tracks = Tracks.get_all(session)
|
||||
assert len(tracks) == 2
|
||||
session.commit()
|
||||
|
||||
os.unlink(new_destination)
|
||||
assert not os.path.exists(new_destination)
|
||||
|
||||
def test_009_import_similar_file(self) -> None:
|
||||
"""Import file with similar, but different, title"""
|
||||
|
||||
test_track_path = "testdata/lovecats.mp3"
|
||||
test_filename = os.path.basename(test_track_path)
|
||||
|
||||
shutil.copy(test_track_path, self.import_source)
|
||||
import_file = os.path.join(self.import_source, test_filename)
|
||||
assert os.path.exists(import_file)
|
||||
|
||||
# Change title tag
|
||||
src = MP3(import_file)
|
||||
src["TIT2"].text[0] += " xyz"
|
||||
src.save()
|
||||
|
||||
with patch("file_importer.PickMatch") as MockPickMatch:
|
||||
# Create a mock instance of PickMatch
|
||||
mock_dialog_instance = MagicMock()
|
||||
MockPickMatch.return_value = mock_dialog_instance
|
||||
|
||||
# Simulate the user clicking OK in the dialog
|
||||
mock_dialog_instance.exec.return_value = QDialog.DialogCode.Accepted
|
||||
mock_dialog_instance.selected_track_id = 2 # Simulated return value
|
||||
|
||||
self.widget.import_files_wrapper()
|
||||
|
||||
# Ensure PickMatch was instantiated correctly
|
||||
MockPickMatch.assert_called_once_with(
|
||||
new_track_description="The Lovecats xyz (The Cure)",
|
||||
choices=[
|
||||
("Do not import", -1, ""),
|
||||
("Import as new track", 0, ""),
|
||||
(
|
||||
"The Lovecats (The Cure) (93%)",
|
||||
2,
|
||||
os.path.join(
|
||||
self.musicstore, os.path.basename(test_track_path)
|
||||
),
|
||||
),
|
||||
],
|
||||
default=2,
|
||||
)
|
||||
|
||||
# Verify exec() was called
|
||||
mock_dialog_instance.exec.assert_called_once()
|
||||
|
||||
self.wait_for_workers()
|
||||
|
||||
# Check track was imported
|
||||
with db.Session() as session:
|
||||
tracks = Tracks.get_all(session)
|
||||
assert len(tracks) == 2
|
||||
track = tracks[1]
|
||||
assert track.title == "The Lovecats xyz"
|
||||
assert track.artist == "The Cure"
|
||||
assert track.id == 2
|
||||
track_file = os.path.join(
|
||||
self.musicstore, os.path.basename(test_track_path)
|
||||
)
|
||||
assert track.path == track_file
|
||||
assert os.path.exists(track_file)
|
||||
assert os.listdir(self.import_source) == []
|
||||
|
||||
Loading…
Reference in New Issue
Block a user