import os import psutil import shutil import smtplib import ssl import tempfile from email.message import EmailMessage from mutagen.flac import FLAC # type: ignore from mutagen.mp3 import MP3 # type: ignore from pydub import effects from pydub.utils import mediainfo from config import Config from datetime import datetime from log import log from pydub import AudioSegment from PyQt5.QtWidgets import QMessageBox from tinytag import TinyTag # type: ignore from typing import Optional # from typing import Dict, Optional, Union from typing import Dict, Union def ask_yes_no(title: str, question: str) -> bool: """Ask question; return True for yes, False for no""" button_reply = QMessageBox.question(None, title, question) return button_reply == QMessageBox.Yes def fade_point( audio_segment: AudioSegment, fade_threshold: float = 0.0, chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE) -> int: """ Returns the millisecond/index of the point where the volume drops below the maximum and doesn't get louder again. audio_segment - the sdlg_search_database_uiegment to find silence in fade_threshold - the upper bound for how quiet is silent in dFBS chunk_size - chunk size for interating over the segment in ms """ assert chunk_size > 0 # to avoid infinite loop segment_length: int = audio_segment.duration_seconds * 1000 # ms trim_ms = segment_length - chunk_size max_vol = audio_segment.dBFS if fade_threshold == 0: fade_threshold = max_vol while ( audio_segment[trim_ms:trim_ms + chunk_size].dBFS < fade_threshold and trim_ms > 0): # noqa W503 trim_ms -= chunk_size # if there is no trailing silence, return lenght of track (it's less # the chunk_size, but for chunk_size = 10ms, this may be ignored) return int(trim_ms) def file_is_readable(path: str) -> bool: """ Returns True if passed path is readable, else False vlc cannot read files with a colon in the path """ return os.access(path, os.R_OK) def get_audio_segment(path: str) -> Optional[AudioSegment]: try: if path.endswith('.mp3'): return AudioSegment.from_mp3(path) elif path.endswith('.flac'): return AudioSegment.from_file(path, "flac") # type: ignore except AttributeError: return None return None def get_tags(path: str) -> Dict[str, Union[str, int]]: """ Return a dictionary of title, artist, duration-in-milliseconds and path. """ tag = TinyTag.get(path) return dict( title=tag.title, artist=tag.artist, bitrate=round(tag.bitrate), duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000), path=path ) def get_relative_date(past_date: datetime, reference_date: Optional[datetime] = None) -> str: """ Return how long before reference_date past_date is as string. Params: @past_date: datetime @reference_date: datetime, defaults to current date and time @return: string """ if not past_date: return "Never" if not reference_date: reference_date = datetime.now() # Check parameters if past_date > reference_date: return "get_relative_date() past_date is after relative_date" days: int days_str: str weeks: int weeks_str: str weeks, days = divmod((reference_date.date() - past_date.date()).days, 7) if weeks == days == 0: # Same day so return time instead return past_date.strftime("%H:%M") if weeks == 1: weeks_str = "week" else: weeks_str = "weeks" if days == 1: days_str = "day" else: days_str = "days" return f"{weeks} {weeks_str}, {days} {days_str} ago" def leading_silence( audio_segment: AudioSegment, silence_threshold: int = Config.DBFS_SILENCE, chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE) -> int: """ Returns the millisecond/index that the leading silence ends. audio_segment - the segment to find silence in silence_threshold - the upper bound for how quiet is silent in dFBS chunk_size - chunk size for interating over the segment in ms https://github.com/jiaaro/pydub/blob/master/pydub/silence.py """ trim_ms: int = 0 # ms assert chunk_size > 0 # to avoid infinite loop while ( audio_segment[trim_ms:trim_ms + chunk_size].dBFS < # noqa W504 silence_threshold and trim_ms < len(audio_segment)): trim_ms += chunk_size # if there is no end it should return the length of the segment return min(trim_ms, len(audio_segment)) def send_mail(to_addr, from_addr, subj, body): # From https://docs.python.org/3/library/email.examples.html # Create a text/plain message msg = EmailMessage() msg.set_content(body) msg['Subject'] = subj msg['From'] = from_addr msg['To'] = to_addr # Send the message via SMTP server. context = ssl.create_default_context() try: s = smtplib.SMTP(host=Config.MAIL_SERVER, port=Config.MAIL_PORT) if Config.MAIL_USE_TLS: s.starttls(context=context) if Config.MAIL_USERNAME and Config.MAIL_PASSWORD: s.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD) s.send_message(msg) except Exception as e: print(e) finally: s.quit() def ms_to_mmss(ms: int, decimals: int = 0, negative: bool = False) -> str: """Convert milliseconds to mm:ss""" minutes: int remainder: int seconds: float if not ms: return "-" sign = "" if ms < 0: if negative: sign = "-" else: ms = 0 minutes, remainder = divmod(ms, 60 * 1000) seconds = remainder / 1000 # if seconds >= 59.5, it will be represented as 60, which looks odd. # So, fake it under those circumstances if seconds >= 59.5: seconds = 59.0 return f"{sign}{minutes:.0f}:{seconds:02.{decimals}f}" def normalise_track(path): """Normalise track""" # Check type ftype = os.path.splitext(path)[1][1:] if ftype not in ['mp3', 'flac']: log.info( f"helpers.normalise_track({path}): " f"File type {ftype} not implemented" ) bitrate = mediainfo(path)['bit_rate'] audio = get_audio_segment(path) if not audio: return # Get current file gid, uid and permissions stats = os.stat(path) try: # Copy original file _, temp_path = tempfile.mkstemp() shutil.copyfile(path, temp_path) except Exception as err: log.debug( f"helpers.normalise_track({path}): err1: {repr(err)}" ) return # Overwrite original file with normalised output normalised = effects.normalize(audio) try: normalised.export(path, format=os.path.splitext(path)[1][1:], bitrate=bitrate) # Fix up permssions and ownership os.chown(path, stats.st_uid, stats.st_gid) os.chmod(path, stats.st_mode) # Copy tags if ftype == 'flac': tag_handler = FLAC elif ftype == 'mp3': tag_handler = MP3 else: return src = tag_handler(temp_path) dst = tag_handler(path) for tag in src: dst[tag] = src[tag] dst.save() except Exception as err: log.debug( f"helpers.normalise_track({path}): err2: {repr(err)}" ) # Restore original file shutil.copyfile(path, temp_path) finally: if os.path.exists(temp_path): os.remove(temp_path) def open_in_audacity(path: str) -> bool: """ Open passed file in Audacity Return True if apparently opened successfully, else False """ # Return if audacity not running if "audacity" not in [i.name() for i in psutil.process_iter()]: return False # Return if path not given if not path: return False to_pipe: str = '/tmp/audacity_script_pipe.to.' + str(os.getuid()) from_pipe: str = '/tmp/audacity_script_pipe.from.' + str(os.getuid()) eol: str = '\n' def send_command(command: str) -> None: """Send a single command.""" to_audacity.write(command + eol) to_audacity.flush() def get_response() -> str: """Return the command response.""" result: str = '' line: str = '' while True: result += line line = from_audacity.readline() if line == '\n' and len(result) > 0: break return result def do_command(command: str) -> str: """Send one command, and return the response.""" send_command(command) response = get_response() return response with open(to_pipe, 'w') as to_audacity, open( from_pipe, 'rt') as from_audacity: do_command(f'Import2: Filename="{path}"') return True def set_track_metadata(session, track): """Set/update track metadata in database""" t = get_tags(track.path) audio = get_audio_segment(track.path) track.title = t['title'] track.artist = t['artist'] track.bitrate = t['bitrate'] if not audio: return track.duration = len(audio) track.start_gap = leading_silence(audio) track.fade_at = round(fade_point(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000 track.silence_at = round(trailing_silence(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000 track.mtime = os.path.getmtime(track.path) session.commit() def show_OK(title: str, msg: str) -> None: """Display a message to user""" QMessageBox.information(None, title, msg, buttons=QMessageBox.Ok) def show_warning(title: str, msg: str) -> None: """Display a warning to user""" QMessageBox.warning(None, title, msg, buttons=QMessageBox.Cancel) def trailing_silence( audio_segment: AudioSegment, silence_threshold: int = -50, chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE) -> int: """Return fade point from start in milliseconds""" return fade_point(audio_segment, silence_threshold, chunk_size)