Pass all arguments to Tracks.__init__ on track creation Smarten up metadata collecting Reformat code Reinstate stackprinter, but with more sensible settings (mostly defaults, oddly enough)
392 lines
11 KiB
Python
392 lines
11 KiB
Python
import os
|
|
import psutil
|
|
import shutil
|
|
import smtplib
|
|
import ssl
|
|
import tempfile
|
|
|
|
from config import Config
|
|
from datetime import datetime
|
|
from email.message import EmailMessage
|
|
from log import log
|
|
from mutagen.flac import FLAC # type: ignore
|
|
from mutagen.mp3 import MP3 # type: ignore
|
|
from pydub import AudioSegment, effects
|
|
from pydub.utils import mediainfo
|
|
from PyQt6.QtWidgets import QMainWindow, QMessageBox # type: ignore
|
|
from tinytag import TinyTag # type: ignore
|
|
from typing import Any, Dict, Optional
|
|
|
|
|
|
def ask_yes_no(title: str, question: str, default_yes: bool = False) -> bool:
    """
    Show a modal Yes/No question dialog and report the user's choice.

    title - text for the dialog's window title
    question - text shown in the body of the dialog
    default_yes - when True, "Yes" is made the default button; otherwise
                  no default button is set explicitly

    Returns True if the dialog result equals the Yes button, else False.
    """

    yes_button = QMessageBox.StandardButton.Yes
    no_button = QMessageBox.StandardButton.No

    box = QMessageBox()
    box.setWindowTitle(title)
    box.setText(question)
    box.setStandardButtons(yes_button | no_button)
    box.setIcon(QMessageBox.Icon.Question)
    if default_yes:
        box.setDefaultButton(yes_button)

    # exec() blocks until the user dismisses the dialog
    return box.exec() == yes_button
|
|
|
|
|
|
def fade_point(
    audio_segment: AudioSegment,
    fade_threshold: float = 0.0,
    chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE,
) -> int:
    """
    Returns the millisecond/index of the point where the volume drops below
    the threshold and doesn't get louder again.

    The segment is scanned backwards from its end, one chunk at a time,
    until a chunk at or above the threshold is found.

    audio_segment - the segment to find the fade point in
    fade_threshold - the upper bound for how quiet counts as faded, in dBFS;
                     0 (the default) means "use the dBFS of the whole segment"
    chunk_size - chunk size for iterating over the segment in ms

    The result is never negative: a segment that is entirely below the
    threshold (or shorter than one chunk) yields 0.
    """

    assert chunk_size > 0  # to avoid infinite loop

    # duration_seconds is a float, so positions are floats until the final int()
    segment_length: float = audio_segment.duration_seconds * 1000  # ms
    trim_ms: float = segment_length - chunk_size

    if fade_threshold == 0:
        fade_threshold = audio_segment.dBFS

    # Test the position first so we never slice once we have run off the start
    while (
        trim_ms > 0
        and audio_segment[trim_ms : trim_ms + chunk_size].dBFS < fade_threshold
    ):
        trim_ms -= chunk_size

    # if there is no trailing silence, return length of track (it's less
    # the chunk_size, but for chunk_size = 10ms, this may be ignored).
    # Clamp at zero: stepping back past the start must not produce a
    # negative position.
    return max(0, int(trim_ms))
|
|
|
|
|
|
def file_is_unreadable(path: Optional[str]) -> bool:
    """
    Returns True if passed path is empty/None or is not readable, else False
    """

    if path and os.access(path, os.R_OK):
        return False

    return True
|
|
|
|
|
|
def get_audio_segment(path: str) -> Optional[AudioSegment]:
    """
    Load the audio file at path as an AudioSegment.

    Paths ending in ".mp3" or ".flac" are loaded; any other path returns
    None. None is also returned if loading raises AttributeError.
    """

    segment: Optional[AudioSegment] = None

    try:
        if path.endswith(".mp3"):
            segment = AudioSegment.from_mp3(path)
        elif path.endswith(".flac"):
            segment = AudioSegment.from_file(path, "flac")  # type: ignore
    except AttributeError:
        segment = None

    return segment
|
|
|
|
|
|
def get_tags(path: str) -> Dict[str, Any]:
    """
    Return a dictionary of title, artist, bitrate, duration-in-milliseconds
    and path, read from the file's tags.
    """

    tag = TinyTag.get(path)

    return {
        "title": tag.title,
        "artist": tag.artist,
        "bitrate": round(tag.bitrate),
        # tag.duration is rounded to the configured precision, then
        # scaled by 1000 and truncated to an int
        "duration": int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000),
        "path": path,
    }
|
|
|
|
|
|
def get_relative_date(
    past_date: Optional[datetime], reference_date: Optional[datetime] = None
) -> str:
    """
    Describe, as a string, how long before reference_date past_date was.

    Params:
    @past_date: datetime; None or Config.EPOCH gives "Never"
    @reference_date: datetime, defaults to current date and time

    @return: string; "HH:MM" when both dates fall on the same day,
             otherwise "<n> week(s), <n> day(s) ago"
    """

    if not past_date or past_date == Config.EPOCH:
        return "Never"

    reference = reference_date if reference_date else datetime.now()

    # Check parameters
    if past_date > reference:
        return "get_relative_date() past_date is after relative_date"

    elapsed_days = (reference.date() - past_date.date()).days
    weeks, days = divmod(elapsed_days, 7)

    if not (weeks or days):
        # Same day so return time instead
        return past_date.strftime("%H:%M")

    weeks_str = "week" if weeks == 1 else "weeks"
    days_str = "day" if days == 1 else "days"

    return f"{weeks} {weeks_str}, {days} {days_str} ago"
|
|
|
|
|
|
def get_file_metadata(filepath: str) -> dict:
    """
    Return track metadata for the file at filepath.

    The result holds the tag values from get_tags(), the file's mtime,
    and start_gap, fade_at and silence_at. The last three are 0 when the
    audio could not be loaded.
    """

    def to_config_precision(position_ms):
        # Round (in seconds) to the configured precision, back to whole ms
        return int(round(position_ms / 1000, Config.MILLISECOND_SIGFIGS) * 1000)

    # Get title, artist, bitrate, duration, path
    metadata: Dict[str, str | int | float] = get_tags(filepath)
    metadata["mtime"] = os.path.getmtime(filepath)

    # Set start_gap, fade_at and silence_at
    audio = get_audio_segment(filepath)
    if audio:
        metadata.update(
            start_gap=leading_silence(audio),
            fade_at=to_config_precision(fade_point(audio)),
            silence_at=to_config_precision(trailing_silence(audio)),
        )
    else:
        metadata.update(start_gap=0, fade_at=0, silence_at=0)

    return metadata
|
|
|
|
|
|
def leading_silence(
    audio_segment: AudioSegment,
    silence_threshold: int = Config.DBFS_SILENCE,
    chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE,
) -> int:
    """
    Returns the millisecond/index at which the leading silence ends.

    audio_segment - the segment to find silence in
    silence_threshold - the upper bound for how quiet is silent in dBFS
    chunk_size - chunk size for iterating over the segment in ms

    Based on https://github.com/jiaaro/pydub/blob/master/pydub/silence.py
    """

    assert chunk_size > 0  # to avoid infinite loop

    position: int = 0  # ms
    while True:
        chunk = audio_segment[position : position + chunk_size]
        if not chunk.dBFS < silence_threshold:
            # found the first chunk that is not silent
            break
        if not position < len(audio_segment):
            # ran off the end without finding any sound
            break
        position += chunk_size

    # if there is no end it should return the length of the segment
    return min(position, len(audio_segment))
|
|
|
|
|
|
def send_mail(to_addr, from_addr, subj, body):
    """
    Send a plain-text email through the SMTP server named in Config.

    to_addr - value for the "To" header
    from_addr - value for the "From" header
    subj - value for the "Subject" header
    body - message text

    STARTTLS is used when Config.MAIL_USE_TLS is set, and a login is
    performed when both Config.MAIL_USERNAME and Config.MAIL_PASSWORD are
    set. Sending is best effort: any error is printed, not raised.
    """

    # From https://docs.python.org/3/library/email.examples.html

    # Create a text/plain message
    msg = EmailMessage()
    msg.set_content(body)

    msg["Subject"] = subj
    msg["From"] = from_addr
    msg["To"] = to_addr

    # Send the message via SMTP server.
    context = ssl.create_default_context()
    try:
        # The context manager issues QUIT and closes the socket on exit.
        # Unlike an explicit quit() in a finally clause, it is only entered
        # once the connection exists, so a failed connect cannot trigger a
        # second error on an unbound variable.
        with smtplib.SMTP(host=Config.MAIL_SERVER, port=Config.MAIL_PORT) as server:
            if Config.MAIL_USE_TLS:
                server.starttls(context=context)
            if Config.MAIL_USERNAME and Config.MAIL_PASSWORD:
                server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
            server.send_message(msg)
    except Exception as e:
        print(e)
|
|
|
|
|
|
def ms_to_mmss(ms: Optional[int], decimals: int = 0, negative: bool = False) -> str:
    """
    Convert milliseconds to mm:ss

    ms - duration in milliseconds; None or 0 gives "-"
    decimals - number of decimal places to show on the seconds
    negative - if True, a negative ms is shown with a leading "-";
               if False, a negative ms is treated as zero ("0:00")

    Seconds are always zero-padded to two integer digits.
    """

    minutes: int
    remainder: int
    seconds: float

    if not ms:
        return "-"

    sign = ""
    if ms < 0:
        if negative:
            sign = "-"
            # Work on the magnitude: divmod() floors, so a negative value
            # would give a negative minute count and a wrapped remainder
            ms = -ms
        else:
            ms = 0

    minutes, remainder = divmod(ms, 60 * 1000)
    seconds = remainder / 1000

    # If the seconds would round up to 60 at the requested precision it
    # looks odd, so fake it by showing the largest value below 60 instead
    if round(seconds, decimals) >= 60:
        seconds = 60 - 10.0 ** -decimals

    # Two integer digits, plus the decimal point and decimals when shown
    width = 2 if decimals <= 0 else decimals + 3

    return f"{sign}{minutes:.0f}:{seconds:0{width}.{decimals}f}"
|
|
|
|
|
|
def normalise_track(path):
    """
    Normalise track

    Overwrites the file at path with a normalised version of its audio,
    keeping the original bitrate, ownership, permissions and tags. Only
    "mp3" and "flac" files are handled; other types are logged and left
    alone. A temporary copy of the original is taken first and is copied
    back over path if export or tag copying fails.
    """

    # Check type
    ftype = os.path.splitext(path)[1][1:]
    if ftype not in ["mp3", "flac"]:
        log.info(
            f"helpers.normalise_track({path}): " f"File type {ftype} not implemented"
        )
        return

    bitrate = mediainfo(path)["bit_rate"]
    audio = get_audio_segment(path)
    if not audio:
        return

    # Normalise before touching the filesystem, so a failure here leaves
    # no temporary file behind
    normalised = effects.normalize(audio)

    # Get current file gid, uid and permissions
    stats = os.stat(path)

    temp_path = None
    try:
        # Copy original file. mkstemp() returns an open descriptor that
        # must be closed, or it leaks.
        temp_fd, temp_path = tempfile.mkstemp()
        os.close(temp_fd)
        shutil.copyfile(path, temp_path)
    except Exception as err:
        log.debug(f"helpers.normalise_track({path}): err1: {repr(err)}")
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
        return

    tag_handler = FLAC if ftype == "flac" else MP3

    # Overwrite original file with normalised output
    try:
        normalised.export(path, format=ftype, bitrate=bitrate)
        # Fix up permissions and ownership
        os.chown(path, stats.st_uid, stats.st_gid)
        os.chmod(path, stats.st_mode)
        # Copy tags
        src = tag_handler(temp_path)
        dst = tag_handler(path)
        for tag in src:
            dst[tag] = src[tag]
        dst.save()
    except Exception as err:
        log.debug(f"helpers.normalise_track({path}): err2: {repr(err)}")
        # Restore original file: copy the backup over the damaged output
        shutil.copyfile(temp_path, path)
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)
|
|
|
|
|
|
def open_in_audacity(path: str) -> bool:
    """
    Open passed file in Audacity

    Sends an Import2 command over Audacity's per-user scripting pipes in
    /tmp and waits for the reply.

    Return True if apparently opened successfully, else False. False is
    returned when no path is given, when no process named "audacity" is
    running, or when the reply pipe closes without any response.
    """

    # Return if path not given
    if not path:
        return False

    # Return if audacity not running. Asking process_iter() for the name
    # up front avoids calling name() on a process that has since exited.
    if not any(
        proc.info["name"] == "audacity" for proc in psutil.process_iter(["name"])
    ):
        return False

    uid: str = str(os.getuid())
    to_pipe: str = "/tmp/audacity_script_pipe.to." + uid
    from_pipe: str = "/tmp/audacity_script_pipe.from." + uid
    eol: str = "\n"

    with open(to_pipe, "w") as to_audacity, open(from_pipe, "rt") as from_audacity:
        # Send the command
        # NOTE(review): a path containing a double quote would break the
        # command syntax - confirm how Audacity expects it to be escaped
        to_audacity.write(f'Import2: Filename="{path}"' + eol)
        to_audacity.flush()

        # Read the response: lines up to a blank line that follows content
        response_lines = []
        while True:
            line = from_audacity.readline()
            if not line:
                # EOF: the other end closed the pipe. Without this check
                # readline() returns "" forever and the loop never ends.
                break
            if line == "\n" and response_lines:
                break
            response_lines.append(line)

    return bool(response_lines)
|
|
|
|
|
|
def set_track_metadata(track):
    """
    Set/update track metadata in database

    Reads the metadata for track.path and stores each item on track as an
    attribute of the same name.
    """

    for attribute, value in get_file_metadata(track.path).items():
        setattr(track, attribute, value)
|
|
|
|
|
|
def show_OK(parent: QMainWindow, title: str, msg: str) -> None:
    """
    Display an information message box with a single OK button

    parent - window the message box belongs to
    title - message box title
    msg - text to display
    """

    ok_button = QMessageBox.StandardButton.Ok
    QMessageBox.information(parent, title, msg, buttons=ok_button)
|
|
|
|
|
|
def show_warning(parent: QMainWindow, title: str, msg: str) -> None:
    """
    Display a warning message box with a single Cancel button

    parent - window the message box belongs to
    title - message box title
    msg - text to display
    """

    cancel_button = QMessageBox.StandardButton.Cancel
    QMessageBox.warning(parent, title, msg, buttons=cancel_button)
|
|
|
|
|
|
def trailing_silence(
    audio_segment: AudioSegment,
    silence_threshold: int = -50,
    chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE,
) -> int:
    """
    Return the point, in milliseconds from the start, where trailing
    silence begins.

    This is fade_point() called with silence_threshold as its threshold.
    Note that fade_point() treats a threshold of 0 as "use the segment's
    own dBFS".
    """

    return fade_point(
        audio_segment,
        fade_threshold=silence_threshold,
        chunk_size=chunk_size,
    )
|