From c9b45848dd4f9415455fae7735e02d49055e9a90 Mon Sep 17 00:00:00 2001 From: Keith Edmunds Date: Wed, 5 Feb 2025 17:43:38 +0000 Subject: [PATCH] Refine and fix file_importer tests --- app/file_importer.py | 16 ++- app/helpers.py | 6 +- tests/test_file_importer.py | 259 ++++++++++++++++++++++++++++++++---- 3 files changed, 250 insertions(+), 31 deletions(-) diff --git a/app/file_importer.py b/app/file_importer.py index bca3d4e..9c4f406 100644 --- a/app/file_importer.py +++ b/app/file_importer.py @@ -443,8 +443,8 @@ class FileImporter: if tfd.track_id == 0 and tfd.destination_path != tfd.file_path_to_remove: while os.path.exists(tfd.destination_path): msg = ( - "New import requested but default destination path ({ifd.destination_path}) " - "already exists. Click OK and choose where to save this track" + f"New import requested but default destination path ({tfd.destination_path})" + " already exists. Click OK and choose where to save this track" ) show_OK(title="Desintation path exists", msg=msg, parent=None) # Get output filename @@ -462,6 +462,18 @@ class FileImporter: else: tfd.error = "destination file already exists" return False + # The desintation path should not already exist in the + # database (becquse if it does, it points to a non-existent + # file). Check that because the path field in the database is + # unique and so adding a duplicate will give a db integrity + # error. + with db.Session() as session: + if Tracks.get_by_path(session, tfd.destination_path): + tfd.error = ( + "Importing a new track but destination path already exists " + f"in database ({tfd.destination_path})" + ) + return False # Check track_id if tfd.track_id < 0: diff --git a/app/helpers.py b/app/helpers.py index eee1a3c..0747af3 100644 --- a/app/helpers.py +++ b/app/helpers.py @@ -200,9 +200,9 @@ def get_tags(path: str) -> Tags: try: tag = TinyTag.get(path) except FileNotFoundError: - raise ApplicationError(f"File not found: {path})") + raise ApplicationError(f"File not found: {path}") except TinyTagException: - raise ApplicationError(f"Can't read tags in {path})") + raise ApplicationError(f"Can't read tags in {path}") if ( tag.title is None @@ -210,7 +210,7 @@ def get_tags(path: str) -> Tags: or tag.bitrate is None or tag.duration is None ): - raise ApplicationError(f"Missing tags in {path})") + raise ApplicationError(f"Missing tags in {path}") return Tags( title=tag.title, diff --git a/tests/test_file_importer.py b/tests/test_file_importer.py index 0e2076d..feb9ee5 100644 --- a/tests/test_file_importer.py +++ b/tests/test_file_importer.py @@ -12,10 +12,10 @@ import unittest from unittest.mock import MagicMock, patch # PyQt imports -from PyQt6.QtCore import Qt -from PyQt6.QtWidgets import QApplication, QDialog, QPushButton +from PyQt6.QtWidgets import QDialog, QFileDialog # Third party imports +from mutagen.mp3 import MP3 # type: ignore import pytest from pytestqt.plugin import QtBot # type: ignore @@ -27,7 +27,7 @@ from app.models import ( Tracks, ) from config import Config -from file_importer import FileImporter, PickMatch +from file_importer import FileImporter # Custom fixture to adapt qtbot for use with unittest.TestCase @@ -37,7 +37,14 @@ def qtbot_adapter(qapp, request): request.cls.qtbot = QtBot(request) -@pytest.mark.usefixtures("qtbot_adapter") +# Fixture for tmp_path to be available in the class +@pytest.fixture(scope="class") +def class_tmp_path(request, tmp_path_factory): + """Provide a class-wide tmp_path""" + request.cls.tmp_path = tmp_path_factory.mktemp("pytest_tmp") + + +@pytest.mark.usefixtures("qtbot_adapter", "class_tmp_path") class MyTestCase(unittest.TestCase): @classmethod def setUpClass(cls): @@ -77,6 +84,16 @@ class MyTestCase(unittest.TestCase): """Runs after each test""" self.widget.close() # Close UI to prevent side effects + def wait_for_workers(self, timeout: int = 10000): + """ + Let import threads workers run to completion + """ + + def workers_empty(): + assert FileImporter.workers == {} + + self.qtbot.waitUntil(workers_empty, timeout=timeout) + def test_001_import_no_files(self): """Try importing with no files to import""" @@ -156,12 +173,7 @@ class MyTestCase(unittest.TestCase): # Ensure selected_track_id was accessed after dialog.exec() assert mock_dialog_instance.selected_track_id == 0 - # Wait until workers have run to completion - # self.qtbot.wait(3000) - def workers_empty(): - assert FileImporter.workers == {} - - self.qtbot.waitUntil(workers_empty, timeout=10000) + self.wait_for_workers() # Check track was imported with db.Session() as session: @@ -207,8 +219,7 @@ class MyTestCase(unittest.TestCase): # Ensure selected_track_id was accessed after dialog.exec() assert mock_dialog_instance.selected_track_id == 0 - # Allow time for import thread to run - self.qtbot.wait(3000) + self.wait_for_workers() # Check track was imported with db.Session() as session: @@ -244,20 +255,24 @@ class MyTestCase(unittest.TestCase): # Ensure PickMatch was instantiated correctly MockPickMatch.assert_called_once_with( new_track_description="The Lovecats (The Cure)", - choices=[("Do not import", -1, ""), - ("Import as new track", 0, ""), - ("The Lovecats (The Cure) (100%)", 2, - os.path.join(self.musicstore, os.path.basename(test_track_path)) - ), - ], + choices=[ + ("Do not import", -1, ""), + ("Import as new track", 0, ""), + ( + "The Lovecats (The Cure) (100%)", + 2, + os.path.join( + self.musicstore, os.path.basename(test_track_path) + ), + ), + ], default=2, ) # Verify exec() was called mock_dialog_instance.exec.assert_called_once() - # Allow time for import thread to run - self.qtbot.wait(3000) + self.wait_for_workers() # Check track was imported with db.Session() as session: @@ -274,9 +289,201 @@ class MyTestCase(unittest.TestCase): assert os.path.exists(track_file) assert os.listdir(self.import_source) == [] - # def test_import_replace_file - # def test_import_similar_file - # def test_import_new_file_existing_destination - # def test_import_file_and_cancel - # def test_import_file_no_tags - # def test_import_unreadable_file + def test_006_import_file_no_tags(self) -> None: + """Try to import untagged file""" + + test_track_path = "testdata/lovecats.mp3" + test_filename = os.path.basename(test_track_path) + + shutil.copy(test_track_path, self.import_source) + import_file = os.path.join(self.import_source, test_filename) + assert os.path.exists(import_file) + + # Remove tags + src = MP3(import_file) + src.delete() + src.save() + + with patch("file_importer.show_OK") as mock_show_ok: + self.widget.import_files_wrapper() + mock_show_ok.assert_called_once_with( + "File not imported", + f"{test_filename} will not be imported because of tag errors " + f"(Missing tags in {import_file})", + ) + + def test_007_import_unreadable_file(self) -> None: + """Import unreadable file""" + + test_track_path = "testdata/lovecats.mp3" + test_filename = os.path.basename(test_track_path) + + shutil.copy(test_track_path, self.import_source) + import_file = os.path.join(self.import_source, test_filename) + assert os.path.exists(import_file) + + # Make undreadable + os.chmod(import_file, 0) + + with patch("file_importer.show_OK") as mock_show_ok: + self.widget.import_files_wrapper() + mock_show_ok.assert_called_once_with( + "File not imported", + f"{test_filename} will not be imported because {import_file} is unreadable", + ) + + # clean up + os.chmod(import_file, 0o777) + os.unlink(import_file) + + def test_008_import_new_file_existing_destination(self) -> None: + """Import duplicate file""" + + test_track_path = "testdata/lovecats.mp3" + test_filename = os.path.basename(test_track_path) + new_destination = os.path.join(self.musicstore, "lc2.mp3") + + shutil.copy(test_track_path, self.import_source) + import_file = os.path.join(self.import_source, test_filename) + assert os.path.exists(import_file) + + with ( + patch("file_importer.PickMatch") as MockPickMatch, + patch.object( + QFileDialog, "getSaveFileName", return_value=(new_destination, "") + ) as mock_file_dialog, + patch("file_importer.show_OK") as mock_show_ok, + ): + mock_file_dialog.return_value = ( + new_destination, + "", + ) # Ensure mock correctly returns expected value + + # Create a mock instance of PickMatch + mock_dialog_instance = MagicMock() + MockPickMatch.return_value = mock_dialog_instance + + # Simulate the user clicking OK in the dialog + mock_dialog_instance.exec.return_value = QDialog.DialogCode.Accepted + mock_dialog_instance.selected_track_id = 0 # Simulated return value + + self.widget.import_files_wrapper() + + # Ensure PickMatch was instantiated correctly + MockPickMatch.assert_called_once_with( + new_track_description="The Lovecats (The Cure)", + choices=[ + ("Do not import", -1, ""), + ("Import as new track", 0, ""), + ( + "The Lovecats (The Cure) (100%)", + 2, + os.path.join( + self.musicstore, os.path.basename(test_track_path) + ), + ), + ], + default=2, + ) + + # Verify exec() was called + mock_dialog_instance.exec.assert_called_once() + + destination = os.path.join(self.musicstore, test_filename) + mock_show_ok.assert_called_once_with( + title="Desintation path exists", + msg=f"New import requested but default destination path ({destination}) " + "already exists. Click OK and choose where to save this track", + parent=None, + ) + + self.wait_for_workers() + + # Ensure QFileDialog was called and returned expected value + assert mock_file_dialog.called # Ensure the mock was used + result = mock_file_dialog() + assert result[0] == new_destination # Validate return value + + # Check track was imported + with db.Session() as session: + tracks = Tracks.get_all(session) + assert len(tracks) == 3 + track = tracks[2] + assert track.title == "The Lovecats" + assert track.artist == "The Cure" + assert track.id == 3 + assert track.path == new_destination + assert os.path.exists(new_destination) + assert os.listdir(self.import_source) == [] + + # Remove file so as not to interfere with later tests + session.delete(track) + tracks = Tracks.get_all(session) + assert len(tracks) == 2 + session.commit() + + os.unlink(new_destination) + assert not os.path.exists(new_destination) + + def test_009_import_similar_file(self) -> None: + """Import file with similar, but different, title""" + + test_track_path = "testdata/lovecats.mp3" + test_filename = os.path.basename(test_track_path) + + shutil.copy(test_track_path, self.import_source) + import_file = os.path.join(self.import_source, test_filename) + assert os.path.exists(import_file) + + # Change title tag + src = MP3(import_file) + src["TIT2"].text[0] += " xyz" + src.save() + + with patch("file_importer.PickMatch") as MockPickMatch: + # Create a mock instance of PickMatch + mock_dialog_instance = MagicMock() + MockPickMatch.return_value = mock_dialog_instance + + # Simulate the user clicking OK in the dialog + mock_dialog_instance.exec.return_value = QDialog.DialogCode.Accepted + mock_dialog_instance.selected_track_id = 2 # Simulated return value + + self.widget.import_files_wrapper() + + # Ensure PickMatch was instantiated correctly + MockPickMatch.assert_called_once_with( + new_track_description="The Lovecats xyz (The Cure)", + choices=[ + ("Do not import", -1, ""), + ("Import as new track", 0, ""), + ( + "The Lovecats (The Cure) (93%)", + 2, + os.path.join( + self.musicstore, os.path.basename(test_track_path) + ), + ), + ], + default=2, + ) + + # Verify exec() was called + mock_dialog_instance.exec.assert_called_once() + + self.wait_for_workers() + + # Check track was imported + with db.Session() as session: + tracks = Tracks.get_all(session) + assert len(tracks) == 2 + track = tracks[1] + assert track.title == "The Lovecats xyz" + assert track.artist == "The Cure" + assert track.id == 2 + track_file = os.path.join( + self.musicstore, os.path.basename(test_track_path) + ) + assert track.path == track_file + assert os.path.exists(track_file) + assert os.listdir(self.import_source) == []