Compare commits

..

No commits in common. "c53f511fd3a44ac4d0c8a699fd35c8d48ae3b2da" and "3afcfd585695743bd5bd2670696f424fac5313eb" have entirely different histories.

10 changed files with 227 additions and 281 deletions

View File

@ -1,10 +1,9 @@
# Standard library imports
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import dataclass, field
from enum import auto, Enum
import functools
import threading
from typing import NamedTuple
# Third party imports
@ -35,18 +34,12 @@ def singleton(cls):
"""
Make a class a Singleton class (see
https://realpython.com/primer-on-python-decorators/#creating-singletons)
Added locking.
"""
lock = threading.Lock()
@functools.wraps(cls)
def wrapper_singleton(*args, **kwargs):
if wrapper_singleton.instance is None:
with lock:
if wrapper_singleton.instance is None: # Check still None
wrapper_singleton.instance = cls(*args, **kwargs)
if not wrapper_singleton.instance:
wrapper_singleton.instance = cls(*args, **kwargs)
return wrapper_singleton.instance
wrapper_singleton.instance = None

View File

@ -2,6 +2,7 @@
import datetime as dt
import logging
import os
from typing import Optional
# PyQt imports
@ -34,6 +35,8 @@ class Config(object):
COLOUR_UNREADABLE = "#dc3545"
COLOUR_WARNING_TIMER = "#ffc107"
DBFS_SILENCE = -50
DEBUG_FUNCTIONS: list[Optional[str]] = []
DEBUG_MODULES: list[Optional[str]] = []
DEFAULT_COLUMN_WIDTH = 200
DISPLAY_SQL = False
DO_NOT_IMPORT = "Do not import"
@ -80,7 +83,6 @@ class Config(object):
MAIL_USERNAME = os.environ.get("MAIL_USERNAME")
MAIL_USE_TLS = os.environ.get("MAIL_USE_TLS") is not None
MAX_IMPORT_MATCHES = 5
MAX_IMPORT_THREADS = 3
MAX_INFO_TABS = 5
MAX_MISSING_FILES_TO_REPORT = 10
MILLISECOND_SIGFIGS = 0

View File

@ -147,15 +147,12 @@ class TracksTable(Model):
title: Mapped[str] = mapped_column(String(256), index=True)
playlistrows: Mapped[list[PlaylistRowsTable]] = relationship(
"PlaylistRowsTable",
back_populates="track",
cascade="all, delete-orphan",
"PlaylistRowsTable", back_populates="track"
)
playlists = association_proxy("playlistrows", "playlist")
playdates: Mapped[list[PlaydatesTable]] = relationship(
"PlaydatesTable",
back_populates="track",
cascade="all, delete-orphan",
lazy="joined",
)

View File

@ -3,7 +3,6 @@ from __future__ import annotations
from dataclasses import dataclass, field
from fuzzywuzzy import fuzz # type: ignore
import os.path
import threading
from typing import Optional, Sequence
import os
import shutil
@ -11,6 +10,7 @@ import shutil
# PyQt imports
from PyQt6.QtCore import (
pyqtSignal,
QObject,
QThread,
)
from PyQt6.QtWidgets import (
@ -30,7 +30,6 @@ from PyQt6.QtWidgets import (
from classes import (
ApplicationError,
MusicMusterSignals,
singleton,
Tags,
)
from config import Config
@ -54,6 +53,7 @@ class ThreadData:
base_model: PlaylistModel
row_number: int
worker: Optional[DoTrackImport] = None
@dataclass
@ -62,10 +62,9 @@ class TrackFileData:
Data structure to hold details of file to be imported
"""
source_path: str
tags: Tags = Tags()
destination_path: str = ""
import_this_file: bool = False
import_this_file: bool = True
error: str = ""
file_path_to_remove: Optional[str] = None
track_id: int = 0
@ -86,7 +85,6 @@ class TrackMatchData:
track_id: int
@singleton
class FileImporter:
"""
Class to manage the import of new tracks. Sanity checks are carried
@ -99,16 +97,11 @@ class FileImporter:
The actual import is handled by the DoTrackImport class.
"""
# Place to keep a reference to importer workers. This is an instance
# variable to allow tests access. As this is a singleton, a class
# variable or an instance variable are effectively the same thing.
workers: dict[str, DoTrackImport] = {}
def __init__(
self, base_model: PlaylistModel, row_number: Optional[int] = None
) -> None:
"""
Initialise the FileImporter singleton instance.
Set up class
"""
# Create ModelData
@ -116,13 +109,23 @@ class FileImporter:
row_number = base_model.rowCount()
self.model_data = ThreadData(base_model=base_model, row_number=row_number)
# Populate self.import_files_data
for infile in [
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE)
if f.endswith((".mp3", ".flac"))
]:
self.import_files_data[infile] = TrackFileData()
# Place to keep a reference to importer threads
self.threads: list[QThread] = []
# Data structure to track files to import
self.import_files_data: list[TrackFileData] = []
self.import_files_data: dict[str, TrackFileData] = {}
# Dictionary of exsting tracks indexed by track.id
self.existing_tracks = self._get_existing_tracks()
# Get signals
self.signals = MusicMusterSignals()
def _get_existing_tracks(self) -> Sequence[Tracks]:
@ -133,56 +136,11 @@ class FileImporter:
with db.Session() as session:
return Tracks.get_all(session)
def start(self) -> None:
"""
Build a TrackFileData object for each new file to import, add it
to self.import_files_data, and trigger importing.
def do_import(self) -> None:
"""
Populate self.import_files_data, which is a TrackFileData object for each entry.
new_files: list[str] = []
if not os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE):
show_OK(
"File import",
f"No files in {Config.REPLACE_FILES_DEFAULT_SOURCE} to import",
None,
)
return
for infile in [
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
for f in os.listdir(Config.REPLACE_FILES_DEFAULT_SOURCE)
if f.endswith((".mp3", ".flac"))
]:
if infile in [a.source_path for a in self.import_files_data]:
log.debug(f"file_importer.start skipping {infile=}, already queued")
else:
new_files.append(infile)
self.import_files_data.append(self.populate_trackfiledata(infile))
# Tell user which files won't be imported and why
self.inform_user(
[
a
for a in self.import_files_data
if a.source_path in new_files and a.import_this_file is False
]
)
# Remove do-not-import entries from queue
self.import_files_data[:] = [
a for a in self.import_files_data if a.import_this_file is not False
]
# Start the import if necessary
log.debug(f"Import files prepared: {[a.source_path for a in self.import_files_data]}")
self._import_next_file()
def populate_trackfiledata(self, path: str) -> TrackFileData:
"""
Populate TrackFileData object for path:
- Validate file to be imported
- Validate files to be imported
- Find matches and similar files
- Get user choices for each import file
- Validate self.import_files_data integrity
@ -190,66 +148,86 @@ class FileImporter:
- Import the files, one by one.
"""
tfd = TrackFileData(source_path=path)
if not self.import_files_data:
show_OK(
"File import",
f"No files in {Config.REPLACE_FILES_DEFAULT_SOURCE} to import",
None,
)
return
if self.check_file_readable(tfd):
if self.check_file_tags(tfd):
self.find_similar(tfd)
if len(tfd.track_match_data) > 1:
self.sort_track_match_data(tfd)
selection = self.get_user_choices(tfd)
if self.process_selection(tfd, selection):
if self.validate_file_data(tfd):
tfd.import_this_file = True
for path in self.import_files_data.keys():
self.validate_file(path)
if self.import_files_data[path].import_this_file:
self.find_similar(path)
if len(self.import_files_data[path].track_match_data) > 1:
self.sort_track_match_data(path)
selection = self.get_user_choices(path)
self.process_selection(path, selection)
if self.import_files_data[path].import_this_file:
self.validate_file_data(path)
return tfd
# Tell user which files won't be imported and why
self.inform_user()
# Start the import of all other files
self.import_next_file()
def check_file_readable(self, tfd: TrackFileData) -> bool:
def validate_file(self, path: str) -> None:
"""
Check file is readable.
Return True if it is.
Populate error and return False if not.
- check all files are readable
- check all files have tags
- Mark failures not to be imported and populate error text.
On return, the following TrackFileData fields should be set:
tags: Yes
destination_path: No
import_this_file: Yes (set by default)
error: No (only set if an error is detected)
file_path_to_remove: No
track_id: No
track_match_data: No
"""
if file_is_unreadable(tfd.source_path):
tfd.import_this_file = False
tfd.error = f"{tfd.source_path} is unreadable"
return False
for path in self.import_files_data.keys():
if file_is_unreadable(path):
self.import_files_data[path].import_this_file = False
self.import_files_data[path].error = f"{path} is unreadable"
continue
return True
try:
self.import_files_data[path].tags = get_tags(path)
except ApplicationError as e:
self.import_files_data[path].import_this_file = False
self.import_files_data[path].error = f"Tag errors ({str(e)})"
continue
def check_file_tags(self, tfd: TrackFileData) -> bool:
"""
Add tags to tfd
Return True if successful.
Populate error and return False if not.
"""
try:
tfd.tags = get_tags(tfd.source_path)
except ApplicationError as e:
tfd.import_this_file = False
tfd.error = f"of tag errors ({str(e)})"
return False
return True
def find_similar(self, tfd: TrackFileData) -> None:
def find_similar(self, path: str) -> None:
"""
- Search title in existing tracks
- if score >= Config.FUZZYMATCH_MINIMUM_LIST:
- get artist score
- add TrackMatchData to self.import_files_data[path].track_match_data
On return, the following TrackFileData fields should be set:
tags: Yes
destination_path: No
import_this_file: Yes (set by default)
error: No (only set if an error is detected)
file_path_to_remove: No
track_id: No
track_match_data: YES, IN THIS FUNCTION
"""
title = tfd.tags.title
artist = tfd.tags.artist
title = self.import_files_data[path].tags.title
artist = self.import_files_data[path].tags.artist
for existing_track in self.existing_tracks:
title_score = self._get_match_score(title, existing_track.title)
if title_score >= Config.FUZZYMATCH_MINIMUM_LIST:
artist_score = self._get_match_score(artist, existing_track.artist)
tfd.track_match_data.append(
self.import_files_data[path].track_match_data.append(
TrackMatchData(
artist=existing_track.artist,
artist_match=artist_score,
@ -259,12 +237,14 @@ class FileImporter:
)
)
def sort_track_match_data(self, tfd: TrackFileData) -> None:
def sort_track_match_data(self, path: str) -> None:
"""
Sort matched tracks in artist-similarity order
"""
tfd.track_match_data.sort(key=lambda x: x.artist_match, reverse=True)
self.import_files_data[path].track_match_data.sort(
key=lambda x: x.artist_match, reverse=True
)
def _get_match_score(self, str1: str, str2: str) -> float:
"""
@ -286,7 +266,7 @@ class FileImporter:
return combined_score
def get_user_choices(self, tfd: TrackFileData) -> int:
def get_user_choices(self, path: str) -> int:
"""
Find out whether user wants to import this as a new track,
overwrite an existing track or not import it at all.
@ -302,12 +282,15 @@ class FileImporter:
choices.append((Config.IMPORT_AS_NEW, 0, ""))
# New track details
new_track_description = f"{tfd.tags.title} ({tfd.tags.artist})"
new_track_description = (
f"{self.import_files_data[path].tags.title} "
f"({self.import_files_data[path].tags.artist})"
)
# Select 'import as new' as default unless the top match is good
# enough
default = 1
track_match_data = tfd.track_match_data
track_match_data = self.import_files_data[path].track_match_data
if track_match_data:
if (
track_match_data[0].artist_match
@ -340,39 +323,48 @@ class FileImporter:
else:
return -1
def process_selection(self, tfd: TrackFileData, selection: int) -> bool:
def process_selection(self, path: str, selection: int) -> None:
"""
Process selection from PickMatch
"""
if selection < 0:
# User cancelled
tfd.import_this_file = False
tfd.error = "you asked not to import this file"
return False
self.import_files_data[path].import_this_file = False
self.import_files_data[path].error = "you asked not to import this file"
elif selection > 0:
# Import and replace track
self.replace_file(tfd, track_id=selection)
self.replace_file(path=path, track_id=selection)
else:
# Import as new
self.import_as_new(tfd)
self.import_as_new(path=path)
return True
def replace_file(self, tfd: TrackFileData, track_id: int) -> None:
def replace_file(self, path: str, track_id: int) -> None:
"""
Set up to replace an existing file.
On return, the following TrackFileData fields should be set:
tags: Yes
destination_path: YES, IN THIS FUNCTION
import_this_file: Yes (set by default)
error: No (only set if an error is detected)
file_path_to_remove: YES, IN THIS FUNCTION
track_id: YES, IN THIS FUNCTION
track_match_data: Yes
"""
if track_id < 1:
raise ApplicationError(f"No track ID: replace_file({tfd=}, {track_id=})")
ifd = self.import_files_data[path]
tfd.track_id = track_id
if track_id < 1:
raise ApplicationError(f"No track ID: replace_file({path=}, {track_id=})")
ifd.track_id = track_id
existing_track_path = self._get_existing_track(track_id).path
tfd.file_path_to_remove = existing_track_path
ifd.file_path_to_remove = existing_track_path
# If the existing file in the Config.IMPORT_DESTINATION
# directory, replace it with the imported file name; otherwise,
@ -380,11 +372,11 @@ class FileImporter:
# names from CDs, etc.
if os.path.dirname(existing_track_path) == Config.IMPORT_DESTINATION:
tfd.destination_path = os.path.join(
Config.IMPORT_DESTINATION, os.path.basename(tfd.source_path)
ifd.destination_path = os.path.join(
Config.IMPORT_DESTINATION, os.path.basename(path)
)
else:
tfd.destination_path = existing_track_path
ifd.destination_path = existing_track_path
def _get_existing_track(self, track_id: int) -> Tracks:
"""
@ -399,45 +391,58 @@ class FileImporter:
return existing_track_records[0]
def import_as_new(self, tfd: TrackFileData) -> None:
def import_as_new(self, path: str) -> None:
"""
Set up to import as a new file.
On return, the following TrackFileData fields should be set:
tags: Yes
destination_path: YES, IN THIS FUNCTION
import_this_file: Yes (set by default)
error: No (only set if an error is detected)
file_path_to_remove: No (not needed now)
track_id: Yes
track_match_data: Yes
"""
tfd.destination_path = os.path.join(
Config.IMPORT_DESTINATION, os.path.basename(tfd.source_path)
ifd = self.import_files_data[path]
ifd.destination_path = os.path.join(
Config.IMPORT_DESTINATION, os.path.basename(path)
)
def validate_file_data(self, tfd: TrackFileData) -> bool:
def validate_file_data(self, path: str) -> None:
"""
Check the data structures for integrity
Return True if all OK
Populate error and return False if not.
"""
ifd = self.import_files_data[path]
# Check import_this_file
if not ifd.import_this_file:
return
# Check tags
if not (tfd.tags.artist and tfd.tags.title):
raise ApplicationError(
f"validate_file_data: {tfd.tags=}, {tfd.source_path=}"
)
if not (ifd.tags.artist and ifd.tags.title):
raise ApplicationError(f"validate_file_data: {ifd.tags=}, {path=}")
# Check file_path_to_remove
if tfd.file_path_to_remove and not os.path.exists(tfd.file_path_to_remove):
if ifd.file_path_to_remove and not os.path.exists(ifd.file_path_to_remove):
# File to remove is missing, but this isn't a major error. We
# may be importing to replace a deleted file.
tfd.file_path_to_remove = ""
ifd.file_path_to_remove = ""
# Check destination_path
if not tfd.destination_path:
if not ifd.destination_path:
raise ApplicationError(
f"validate_file_data: no destination path set ({tfd.source_path=})"
f"validate_file_data: no destination path set ({path=})"
)
# If destination path is the same as file_path_to_remove, that's
# OK, otherwise if this is a new import then check check
# destination path doesn't already exists
if tfd.track_id == 0 and tfd.destination_path != tfd.file_path_to_remove:
while os.path.exists(tfd.destination_path):
if ifd.track_id == 0 and ifd.destination_path != ifd.file_path_to_remove:
while os.path.exists(ifd.destination_path):
msg = (
"New import requested but default destination path ({ifd.destination_path}) "
"already exists. Click OK and choose where to save this track"
@ -450,104 +455,92 @@ class FileImporter:
directory=Config.IMPORT_DESTINATION,
)
if pathspec:
if pathspec == '':
# User cancelled
tfd.error = "You did not select a location to save this track"
return False
tfd.destination_path = pathspec[0]
ifd.destination_path = pathspec[0]
else:
tfd.error = "destination file already exists"
return False
ifd.import_this_file = False
ifd.error = "destination file already exists"
return
# Check track_id
if tfd.track_id < 0:
raise ApplicationError(
f"validate_file_data: track_id < 0, {tfd.source_path=}"
)
if ifd.track_id < 0:
raise ApplicationError(f"validate_file_data: track_id < 0, {path=}")
return True
def inform_user(self, tfds: list[TrackFileData]) -> None:
def inform_user(self) -> None:
"""
Tell user about files that won't be imported
"""
msgs: list[str] = []
for tfd in tfds:
msgs.append(
f"{os.path.basename(tfd.source_path)} will not be imported because {tfd.error}"
)
for path, entry in self.import_files_data.items():
if entry.import_this_file is False:
msgs.append(
f"{os.path.basename(path)} will not be imported because {entry.error}"
)
if msgs:
show_OK("File not imported", "\r\r".join(msgs))
log.debug("\r\r".join(msgs))
def _import_next_file(self) -> None:
def import_next_file(self) -> None:
"""
Import the next file sequentially.
This is called when an import completes so will be called asynchronously.
Protect with a lock.
"""
lock = threading.Lock()
while True:
if not self.import_files_data:
self.signals.status_message_signal.emit("All files imported", 10000)
return
with lock:
while len(self.workers) < Config.MAX_IMPORT_THREADS:
try:
tfd = self.import_files_data.pop()
filename = os.path.basename(tfd.source_path)
log.debug(f"_import_next_file: {filename}")
log.debug(
f"remaining files: {[a.source_path for a in self.import_files_data]}"
)
self.signals.status_message_signal.emit(f"Importing {filename}", 10000)
self._start_import(tfd)
except IndexError:
log.debug("import_next_file: no files remaining in queue")
break
# Get details for next file to import
path, tfd = self.import_files_data.popitem()
if tfd.import_this_file:
break
def _start_import(self, tfd: TrackFileData) -> None:
"""
Start thread to import track
"""
print(f"import_next_file {path=}")
filename = os.path.basename(tfd.source_path)
log.debug(f"_start_import({filename=})")
self.workers[tfd.source_path] = DoTrackImport(
import_file_path=tfd.source_path,
# Create and start a thread for processing
worker = DoTrackImport(
import_file_path=path,
tags=tfd.tags,
destination_path=tfd.destination_path,
track_id=tfd.track_id,
)
log.debug(f"{self.workers[tfd.source_path]=} created")
thread = QThread()
self.threads.append(thread)
self.workers[tfd.source_path].import_finished.connect(self.post_import_processing)
self.workers[tfd.source_path].finished.connect(lambda: self.cleanup_thread(tfd))
self.workers[tfd.source_path].finished.connect(self.workers[tfd.source_path].deleteLater)
# Move worker to thread
worker.moveToThread(thread)
self.workers[tfd.source_path].start()
# Connect signals and slots
thread.started.connect(worker.run)
thread.started.connect(lambda: print(f"Thread starting for {path=}"))
def cleanup_thread(self, tfd: TrackFileData) -> None:
worker.import_finished.connect(self.post_import_processing)
worker.import_finished.connect(thread.quit)
worker.import_finished.connect(lambda: print(f"Worker ended for {path=}"))
# Ensure cleanup only after thread is fully stopped
thread.finished.connect(lambda: self.cleanup_thread(thread, worker))
thread.finished.connect(lambda: print(f"Thread ended for {path=}"))
# Start the thread
print(f"Calling thread.start() for {path=}")
thread.start()
def cleanup_thread(self, thread, worker):
"""
Remove references to finished threads/workers to prevent leaks.
"""
log.debug(f"cleanup_thread({tfd.source_path=})")
worker.deleteLater()
thread.deleteLater()
if thread in self.threads:
self.threads.remove(thread)
if tfd.source_path in self.workers:
del self.workers[tfd.source_path]
else:
log.debug(f"Couldn't find entry in self.workers: {tfd.source_path=}")
log.debug(f"After cleanup_thread: {self.workers.keys()=}")
def post_import_processing(self, source_path: str, track_id: int) -> None:
def post_import_processing(self, track_id: int) -> None:
"""
If track already in playlist, refresh it else insert it
"""
log.debug(f"post_import_processing({source_path=}, {track_id=})")
log.debug(f"post_import_processing({track_id=})")
if self.model_data:
if self.model_data.base_model:
@ -555,16 +548,16 @@ class FileImporter:
track_id, self.model_data.row_number
)
# Process next file(s)
self._import_next_file()
# Process next file
self.import_next_file()
class DoTrackImport(QThread):
class DoTrackImport(QObject):
"""
Class to manage the actual import of tracks in a thread.
"""
import_finished = pyqtSignal(str, int)
import_finished = pyqtSignal(int)
def __init__(
self,
@ -585,9 +578,6 @@ class DoTrackImport(QThread):
self.signals = MusicMusterSignals()
def __repr__(self) -> str:
return f"<DoTrackImport(id={hex(id(self))}, import_file_path={self.import_file_path}"
def run(self) -> None:
"""
Either create track objects from passed files or update exising track
@ -611,11 +601,11 @@ class DoTrackImport(QThread):
if temp_file and os.path.exists(temp_file):
os.unlink(temp_file)
self.signals.status_message_signal.emit(
f"Importing {os.path.basename(self.import_file_path)}", 5000
)
with db.Session() as session:
self.signals.status_message_signal.emit(
f"Importing {os.path.basename(self.import_file_path)}", 5000
)
if self.track_id == 0:
# Import new track
try:
@ -640,9 +630,6 @@ class DoTrackImport(QThread):
if hasattr(track, key):
setattr(track, key, value)
track.path = self.destination_track_path
else:
log.error(f"Unable to retrieve {self.track_id=}")
return
session.commit()
helpers.normalise_track(self.destination_track_path)
@ -650,7 +637,7 @@ class DoTrackImport(QThread):
self.signals.status_message_signal.emit(
f"{os.path.basename(self.import_file_path)} imported", 10000
)
self.import_finished.emit(self.import_file_path, track.id)
self.import_finished.emit(track.id)
class PickMatch(QDialog):

View File

@ -200,9 +200,9 @@ def get_tags(path: str) -> Tags:
try:
tag = TinyTag.get(path)
except FileNotFoundError:
raise ApplicationError(f"File not found: {path})")
raise ApplicationError(f"File not found: get_tags({path=})")
except TinyTagException:
raise ApplicationError(f"Can't read tags in {path})")
raise ApplicationError(f"Can't read tags: get_tags({path=})")
if (
tag.title is None
@ -210,7 +210,7 @@ def get_tags(path: str) -> Tags:
or tag.bitrate is None
or tag.duration is None
):
raise ApplicationError(f"Missing tags in {path})")
raise ApplicationError(f"Missing tags: get_tags({path=})")
return Tags(
title=tag.title,

36
app/log.py Normal file → Executable file
View File

@ -1,6 +1,5 @@
#!/usr/bin/env python3
# Standard library imports
from collections import defaultdict
import logging
import logging.config
import logging.handlers
@ -21,38 +20,15 @@ from config import Config
class FunctionFilter(logging.Filter):
"""Filter to allow category-based logging to stderr."""
def __init__(self, module_functions: dict[str, list[str]]):
def __init__(self, functions: set[str]):
super().__init__()
self.modules: list[str] = []
self.functions: defaultdict[str, list[str]] = defaultdict(list)
for module in module_functions.keys():
if module_functions[module]:
for function in module_functions[module]:
self.functions[module].append(function)
else:
self.modules.append(module)
self.functions = functions
def filter(self, record: logging.LogRecord) -> bool:
if not getattr(record, "levelname", None) == "DEBUG":
# Only prcess DEBUG messages
return False
module = getattr(record, "module", None)
if not module:
# No module in record
return False
# Process if this is a module we're tracking
if module in self.modules:
return True
# Process if this is a function we're tracking
if getattr(record, "funcName", None) in self.functions[module]:
return True
return False
return (
getattr(record, "funcName", None) in self.functions
and getattr(record, "levelname", None) == "DEBUG"
)
class LevelTagFilter(logging.Filter):

View File

@ -4,24 +4,18 @@ disable_existing_loggers: True
formatters:
colored:
(): colorlog.ColoredFormatter
format: "%(log_color)s[%(asctime)s] %(filename)s.%(funcName)s:%(lineno)s %(blue)s%(message)s"
format: "%(log_color)s[%(asctime)s] %(filename)s:%(lineno)s %(message)s"
datefmt: "%H:%M:%S"
syslog:
format: "[%(name)s] %(filename)s:%(lineno)s %(leveltag)s: %(message)s"
filters:
leveltag:
(): log.LevelTagFilter
(): newlogger.LevelTagFilter
category_filter:
(): log.FunctionFilter
module_functions:
# Optionally additionally log some debug calls to stderr
# log all debug calls in a module:
# module-name: []
# log debug calls for some functions in a module:
# module-name:
# - function-name-1
# - function-name-2
(): newlogger.FunctionFilter
functions: !!set
fb: null
handlers:
stderr:

View File

@ -1,7 +1,5 @@
#!/usr/bin/env python3
from log import log
from newlogger import log
# Testing
def fa():
log.debug("fa Debug message")

View File

@ -860,7 +860,7 @@ class Window(QMainWindow, Ui_MainWindow):
self.current.base_model,
self.current_row_or_end()
)
self.importer.start()
self.importer.do_import()
def insert_header(self) -> None:
"""Show dialog box to enter header text and add to playlist"""

View File

@ -1030,7 +1030,7 @@ class PlaylistModel(QAbstractTableModel):
log.debug(f"{self}: OBS scene changed to '{scene_name}'")
continue
except obswebsocket.exceptions.ConnectionFailure:
log.warning(f"{self}: OBS connection refused")
log.error(f"{self}: OBS connection refused")
return
def previous_track_ended(self) -> None:
@ -1151,7 +1151,6 @@ class PlaylistModel(QAbstractTableModel):
]:
if ts:
ts.update_playlist_and_row(session)
session.commit()
self.update_track_times()