377 lines
10 KiB
Python
377 lines
10 KiB
Python
import os
|
|
import psutil
|
|
import shutil
|
|
import smtplib
|
|
import ssl
|
|
import tempfile
|
|
|
|
from config import Config
|
|
from datetime import datetime
|
|
from email.message import EmailMessage
|
|
from log import log
|
|
from mutagen.flac import FLAC # type: ignore
|
|
from mutagen.mp3 import MP3 # type: ignore
|
|
from pydub import AudioSegment, effects
|
|
from pydub.utils import mediainfo
|
|
from PyQt6.QtWidgets import QMainWindow, QMessageBox # type: ignore
|
|
from tinytag import TinyTag # type: ignore
|
|
from typing import Any, Dict, Optional
|
|
|
|
|
|
def ask_yes_no(title: str, question: str, default_yes: bool = False) -> bool:
    """
    Put a Yes/No question to the user in a message box.

    title - window title of the message box
    question - text shown in the message box
    default_yes - when True, Yes is made the default button

    Returns True if the box was closed with Yes, otherwise False.
    """

    yes_button = QMessageBox.StandardButton.Yes
    no_button = QMessageBox.StandardButton.No

    box = QMessageBox()
    box.setIcon(QMessageBox.Icon.Question)
    box.setWindowTitle(title)
    box.setText(question)
    box.setStandardButtons(yes_button | no_button)
    if default_yes:
        box.setDefaultButton(yes_button)

    return box.exec() == yes_button
|
|
|
|
|
|
def fade_point(
    audio_segment: AudioSegment,
    fade_threshold: float = 0.0,
    chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE,
) -> int:
    """
    Return the position, in milliseconds from the start of the segment,
    after which the volume stays below fade_threshold until the end.

    The segment is scanned backwards from its end in chunk_size steps.
    Scanning stops at the first chunk whose loudness is not below
    fade_threshold (or when the start of the segment is reached), and
    the start position of that chunk is returned.

    audio_segment - the segment to find the fade in
    fade_threshold - loudness in dBFS below which a chunk counts as
        faded; 0 (the default) means "use audio_segment.dBFS", the
        loudness of the whole segment
    chunk_size - chunk size for iterating over the segment in ms

    The result is never negative: 0 is returned when the whole segment
    is below the threshold or the segment is shorter than chunk_size.
    """

    assert chunk_size > 0  # to avoid infinite loop

    segment_length: float = audio_segment.duration_seconds * 1000  # ms
    trim_ms = segment_length - chunk_size
    if fade_threshold == 0:
        fade_threshold = audio_segment.dBFS

    # Test the position first so the segment is never sliced with a
    # negative start index (which would be taken relative to the end).
    while (
        trim_ms > 0
        and audio_segment[trim_ms : trim_ms + chunk_size].dBFS < fade_threshold
    ):
        trim_ms -= chunk_size

    # If there is no trailing fade, this is the length of the track less
    # one chunk_size (for chunk_size = 10ms, this may be ignored).
    # Clamp because the last step can overshoot the start of the segment.
    return max(0, int(trim_ms))
|
|
|
|
|
|
def file_is_unreadable(path: Optional[str]) -> bool:
    """
    Returns True if the passed path is empty/None or cannot be read by
    this process, else False
    """

    readable = bool(path) and os.access(path, os.R_OK)
    return not readable
|
|
|
|
|
|
def get_audio_segment(path: str) -> Optional[AudioSegment]:
    """
    Load the file at path as a pydub AudioSegment.

    Only paths ending in ".mp3" or ".flac" are loaded. Returns None for
    any other path, or if an AttributeError is raised while loading.
    """

    segment: Optional[AudioSegment] = None
    try:
        if path.endswith(".mp3"):
            segment = AudioSegment.from_mp3(path)
        elif path.endswith(".flac"):
            segment = AudioSegment.from_file(path, "flac")  # type: ignore
    except AttributeError:
        segment = None

    return segment
|
|
|
|
|
|
def get_tags(path: str) -> Dict[str, Any]:
    """
    Read the tags of the file at path with TinyTag.

    Returns a dictionary with the keys title, artist, bitrate (rounded),
    duration (in milliseconds) and path.
    """

    metadata = TinyTag.get(path)

    return {
        "title": metadata.title,
        "artist": metadata.artist,
        "bitrate": round(metadata.bitrate),
        # TinyTag reports seconds; round first, then scale to ms
        "duration": int(
            round(metadata.duration, Config.MILLISECOND_SIGFIGS) * 1000
        ),
        "path": path,
    }
|
|
|
|
|
|
def get_relative_date(
    past_date: Optional[datetime], reference_date: Optional[datetime] = None
) -> str:
    """
    Describe how long before reference_date past_date was.

    Params:
    @past_date: datetime, or None
    @reference_date: datetime, defaults to current date and time

    @return: string, one of
        "Never" if past_date is not given;
        an error text if past_date is later than reference_date;
        the time of past_date as "HH:MM" if both fall on the same day;
        otherwise "<n> week(s), <n> day(s) ago"
    """

    if not past_date:
        return "Never"

    now = reference_date or datetime.now()

    # Check parameters
    if past_date > now:
        return "get_relative_date() past_date is after relative_date"

    elapsed_days = (now.date() - past_date.date()).days
    if elapsed_days == 0:
        # Same day so return time instead
        return past_date.strftime("%H:%M")

    weeks, days = divmod(elapsed_days, 7)
    week_word = "week" if weeks == 1 else "weeks"
    day_word = "day" if days == 1 else "days"

    return f"{weeks} {week_word}, {days} {day_word} ago"
|
|
|
|
|
|
def leading_silence(
    audio_segment: AudioSegment,
    silence_threshold: int = Config.DBFS_SILENCE,
    chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE,
) -> int:
    """
    Return the position in milliseconds at which the leading silence of
    the segment ends.

    audio_segment - the segment to find silence in
    silence_threshold - the upper bound for how quiet is silent, in dBFS
    chunk_size - chunk size for iterating over the segment, in ms

    Based on https://github.com/jiaaro/pydub/blob/master/pydub/silence.py
    """

    assert chunk_size > 0  # to avoid infinite loop

    total_ms = len(audio_segment)
    position = 0  # ms

    while True:
        window = audio_segment[position : position + chunk_size]
        still_silent = window.dBFS < silence_threshold
        if not (still_silent and position < total_ms):
            break
        position += chunk_size

    # An entirely silent segment yields the length of the segment
    return min(position, total_ms)
|
|
|
|
|
|
def send_mail(to_addr, from_addr, subj, body):
    """
    Send a plain-text email through the SMTP server named in Config.

    to_addr - value for the "To" header
    from_addr - value for the "From" header
    subj - value for the "Subject" header
    body - message text

    Config.MAIL_SERVER / MAIL_PORT select the server. STARTTLS is used
    when Config.MAIL_USE_TLS is set, and a login is attempted when both
    Config.MAIL_USERNAME and Config.MAIL_PASSWORD are set.

    Sending is best effort: any exception is printed, not raised.
    """

    # From https://docs.python.org/3/library/email.examples.html

    # Create a text/plain message
    msg = EmailMessage()
    msg.set_content(body)

    msg["Subject"] = subj
    msg["From"] = from_addr
    msg["To"] = to_addr

    # Send the message via SMTP server.
    context = ssl.create_default_context()
    try:
        # The context manager issues QUIT and closes the connection on
        # exit. Unlike an explicit quit() in a finally clause it is only
        # reached once the connection object exists, so a failure to
        # connect is reported as itself rather than as UnboundLocalError.
        with smtplib.SMTP(host=Config.MAIL_SERVER, port=Config.MAIL_PORT) as server:
            if Config.MAIL_USE_TLS:
                server.starttls(context=context)
            if Config.MAIL_USERNAME and Config.MAIL_PASSWORD:
                server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
            server.send_message(msg)
    except Exception as e:
        print(e)
|
|
|
|
|
|
def ms_to_mmss(ms: Optional[int], decimals: int = 0, negative: bool = False) -> str:
    """
    Convert milliseconds to a "m:ss" string.

    ms - duration in milliseconds; None or 0 gives "-"
    decimals - number of decimal places shown on the seconds
    negative - when True a negative ms is shown with a leading "-";
        when False a negative ms is shown as zero ("0:00")

    The seconds are always zero-padded to two integer digits, and are
    never shown as 60: a value that would round up to 60 at the
    requested precision is shown as the largest value below 60 instead.
    """

    if not ms:
        return "-"

    sign = ""
    if ms < 0:
        if negative:
            sign = "-"
            # Format the magnitude; divmod() on a negative number floors
            # towards minus infinity and would give e.g. "--2:59".
            ms = -ms
        else:
            ms = 0

    minutes, remainder = divmod(ms, 60 * 1000)
    seconds: float = remainder / 1000

    # If the seconds would be displayed as 60, which looks odd, fake it
    # by showing the largest displayable value below 60.
    if round(seconds, decimals) >= 60:
        seconds = 60 - 10 ** -decimals

    # The field width counts the decimal point and the decimals too
    width = 2 if decimals == 0 else decimals + 3

    return f"{sign}{minutes:.0f}:{seconds:0{width}.{decimals}f}"
|
|
|
|
|
|
def normalise_track(path):
    """
    Normalise the volume of the mp3 or flac file at path, in place.

    The original file is first copied to a temporary file. The
    normalised audio is then exported over the original at the original
    bit rate, the original owner, group and mode are re-applied, and the
    tags are copied across from the temporary copy. If any of that
    fails, the original file is restored from the temporary copy.

    Nothing is done (beyond logging) for other file types or when the
    audio cannot be loaded. Failures are logged at debug level, not
    raised, unless restoring the original itself fails.
    """

    # Check type
    ftype = os.path.splitext(path)[1][1:]
    if ftype not in ("mp3", "flac"):
        log.info(f"helpers.normalise_track({path}): File type {ftype} not implemented")
        return

    bitrate = mediainfo(path)["bit_rate"]
    audio = get_audio_segment(path)
    if not audio:
        return

    # Get current file gid, uid and permissions
    stats = os.stat(path)

    # Copy original file
    temp_path = None
    try:
        fd, temp_path = tempfile.mkstemp()
        # mkstemp() returns an open descriptor; only the path is needed
        os.close(fd)
        shutil.copyfile(path, temp_path)
    except Exception as err:
        log.debug(f"helpers.normalise_track({path}): err1: {repr(err)}")
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)
        return

    # ftype is known to be "mp3" or "flac" here
    tag_handler = FLAC if ftype == "flac" else MP3

    # Overwrite original file with normalised output
    normalised = effects.normalize(audio)
    restore_failed = False
    try:
        normalised.export(path, format=ftype, bitrate=bitrate)
        # Fix up permissions and ownership
        os.chown(path, stats.st_uid, stats.st_gid)
        os.chmod(path, stats.st_mode)
        # Copy tags
        src = tag_handler(temp_path)
        dst = tag_handler(path)
        for tag in src:
            dst[tag] = src[tag]
        dst.save()
    except Exception as err:
        log.debug(f"helpers.normalise_track({path}): err2: {repr(err)}")
        # Restore original file from the temporary copy
        try:
            shutil.copyfile(temp_path, path)
        except OSError:
            # The temporary copy is now the only good copy: keep it
            restore_failed = True
            raise
    finally:
        if not restore_failed and os.path.exists(temp_path):
            os.remove(temp_path)
|
|
|
|
|
|
def open_in_audacity(path: str) -> bool:
    """
    Open passed file in Audacity

    An "Import2" command is written to Audacity's per-user scripting
    pipe (/tmp/audacity_script_pipe.to.<uid>) and the reply is read
    from /tmp/audacity_script_pipe.from.<uid>. The reply is read up to
    its terminating blank line but its content is not inspected.

    Return True if apparently opened successfully, else False. False is
    returned when no path is given, when no process named "audacity" is
    running, when the pipes cannot be opened or used, or when the reply
    pipe is closed before anything has been received.
    """

    # Return if path not given
    if not path:
        return False

    # Return if audacity not running. Asking process_iter() for the name
    # avoids an exception for processes that exit during the scan.
    running = any(
        proc.info["name"] == "audacity" for proc in psutil.process_iter(["name"])
    )
    if not running:
        return False

    uid = str(os.getuid())
    to_pipe: str = "/tmp/audacity_script_pipe.to." + uid
    from_pipe: str = "/tmp/audacity_script_pipe.from." + uid
    eol: str = "\n"

    response: str = ""
    try:
        with open(to_pipe, "w") as to_audacity, open(from_pipe, "rt") as from_audacity:
            # Send the command
            to_audacity.write(f'Import2: Filename="{path}"' + eol)
            to_audacity.flush()

            # Read the reply: it ends with a blank line
            while True:
                line = from_audacity.readline()
                if not line:
                    # EOF: the other end closed the pipe, stop waiting
                    break
                if line == "\n" and response:
                    break
                response += line
    except OSError:
        # Pipes missing or unusable
        return False

    return bool(response)
|
|
|
|
|
|
def set_track_metadata(session, track):
    """
    Set/update track metadata in database

    title, artist and bitrate are taken from the file's tags. If the
    audio can be loaded, duration, start_gap, fade_at, silence_at and
    mtime are set as well and the session is committed. If the audio
    cannot be loaded the function returns without committing.
    """

    tags = get_tags(track.path)
    segment = get_audio_segment(track.path)

    for field in ("title", "artist", "bitrate"):
        setattr(track, field, tags[field])

    if not segment:
        return

    sigfigs = Config.MILLISECOND_SIGFIGS

    track.duration = len(segment)
    track.start_gap = leading_silence(segment)
    # Positions are rounded while expressed in seconds, then scaled to ms
    track.fade_at = round(fade_point(segment) / 1000, sigfigs) * 1000
    track.silence_at = round(trailing_silence(segment) / 1000, sigfigs) * 1000
    track.mtime = os.path.getmtime(track.path)

    session.commit()
|
|
|
|
|
|
def show_OK(parent: QMainWindow, title: str, msg: str) -> None:
    """
    Show an information message box with a single OK button.

    parent - window the message box belongs to
    title - title of the message box
    msg - text to display
    """

    ok_button = QMessageBox.StandardButton.Ok
    QMessageBox.information(parent, title, msg, buttons=ok_button)
|
|
|
|
|
|
def show_warning(parent: QMainWindow, title: str, msg: str) -> None:
    """
    Show a warning message box with a single Cancel button.

    parent - window the message box belongs to
    title - title of the message box
    msg - text to display
    """

    cancel_button = QMessageBox.StandardButton.Cancel
    QMessageBox.warning(parent, title, msg, buttons=cancel_button)
|
|
|
|
|
|
def trailing_silence(
    audio_segment: AudioSegment,
    silence_threshold: int = -50,
    chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE,
) -> int:
    """
    Return the position, in milliseconds from the start of the segment,
    at which the trailing silence begins.

    This is fade_point() called with silence_threshold (in dBFS) as the
    fade threshold.
    """

    silence_start = fade_point(
        audio_segment, fade_threshold=silence_threshold, chunk_size=chunk_size
    )
    return silence_start
|