#!/usr/bin/env python

import argparse
import os
import shutil
import tempfile

from config import Config
from log import DEBUG, INFO
from model import Notes, Playdates, PlaylistTracks, Session, Tracks
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
from pydub import AudioSegment, effects
from tinytag import TinyTag


def main():
    """
    Command-line entry point.

    Parses the command line and runs either an incremental update
    (-u/--update) or a full rescan (-f/--full-update) of the database.
    If both are given, only the incremental update runs.
    """

    INFO("Starting")

    # Parse command line
    p = argparse.ArgumentParser()
    p.add_argument('-u', '--update', action="store_true", dest="update",
                   default=False, help="Update database")
    p.add_argument('-f', '--full-update', action="store_true",
                   dest="full_update", default=False,
                   help="Update database")
    args = p.parse_args()

    # Run as required
    if args.update:
        INFO("Updating database")
        with Session() as session:
            update_db(session)
    elif args.full_update:
        INFO("Full update of database")
        with Session() as session:
            full_update_db(session)
    else:
        INFO("No action specified")

    INFO("Finished")


def _normalise_file(path, audio, ftype):
    """
    Overwrite the file at path with a normalised copy of audio, keeping
    the original ownership, permissions and tags.

    path - file to overwrite
    audio - audio segment already loaded from path
    ftype - 'mp3' or 'flac'; used as export format and to pick the
        tag handler

    The original file is first copied to a temporary file. If anything
    fails after the original has been overwritten, the original content
    is copied back from that temporary file. Failures are logged via
    DEBUG rather than raised, except for errors from os.stat(),
    effects.normalize() or the restore copy itself.
    """

    # Get current file gid, uid and permissions
    stats = os.stat(path)

    temp_path = None
    try:
        try:
            # Copy original file. mkstemp() returns an open descriptor
            # that we don't need, so close it straight away.
            fd, temp_path = tempfile.mkstemp()
            os.close(fd)
            shutil.copyfile(path, temp_path)
        except Exception as err:
            DEBUG(f"songdb.create_track_from_file({path}): err1: {str(err)}")
            return

        # Overwrite original file with normalised output
        normalised = effects.normalize(audio)
        try:
            normalised.export(path, format=ftype)
            # Fix up permissions and ownership
            os.chown(path, stats.st_uid, stats.st_gid)
            os.chmod(path, stats.st_mode)
            # Copy tags
            tag_handler = FLAC if ftype == 'flac' else MP3
            src = tag_handler(temp_path)
            dst = tag_handler(path)
            for tag in src:
                dst[tag] = src[tag]
            dst.save()
        except Exception as err:
            DEBUG(f"songdb.create_track_from_file({path}): err2: {str(err)}")
            # Restore original file from the backup copy
            shutil.copyfile(temp_path, path)
    finally:
        # Always remove the backup copy, whatever happened above
        if temp_path is not None and os.path.exists(temp_path):
            os.remove(temp_path)


def create_track_from_file(session, path):
    """
    Create track in database from passed path, or update database entry
    if path already in database.

    Title, artist and duration come from the file's tags; start gap,
    fade point and trailing silence are measured from the audio. The
    database changes are committed before any normalisation is tried.

    If Config.NORMALISE_ON_IMPORT is set and the file is an mp3 or flac,
    the file is then overwritten with a normalised copy. A failure
    during normalisation is logged and does not affect the returned
    track.

    Return track.
    """

    track = Tracks.get_or_create(session, path)
    info = get_music_info(path)
    track.title = info['title']
    track.artist = info['artist']
    # Tag duration is in seconds; store milliseconds
    track.duration = int(round(
        info['duration'], Config.MILLISECOND_SIGFIGS) * 1000)

    audio = get_audio_segment(path)
    track.start_gap = leading_silence(audio)
    track.fade_at = round(fade_point(audio) / 1000,
                          Config.MILLISECOND_SIGFIGS) * 1000
    track.silence_at = round(trailing_silence(audio) / 1000,
                             Config.MILLISECOND_SIGFIGS) * 1000
    track.mtime = os.path.getmtime(path)
    session.commit()

    if Config.NORMALISE_ON_IMPORT:
        # Check type
        ftype = os.path.splitext(path)[1][1:]
        if ftype not in ('mp3', 'flac'):
            INFO(f"File type {ftype} not implemented")
        else:
            _normalise_file(path, audio, ftype)

    return track


def full_update_db(session):
    """
    Rescan all entries in database.

    Runs update_db() first, then re-reads tags and re-measures duration,
    start gap, fade point and trailing silence for every track returned
    by Tracks.get_all_tracks(). Tracks with no tag title, with an artist
    that differs from the tag, or whose audio cannot be loaded are
    logged and skipped. All changes are committed once at the end.
    """

    def log(msg):
        INFO(f"full_update_db(): {msg}")

    def check_change(track_id, title, attribute, old, new):
        # Log when there is no previous value or the new value differs
        # from the old one by more than 10%
        if old is None or new > (old * 1.1) or new < (old * 0.9):
            log(
                "\n"
                f"track[{track_id}] ({title}) "
                f"{attribute} updated from {old} to {new}"
            )

    # Start with normal update to add new tracks and remove any missing
    # files
    log("update_db()")
    update_db(session)

    # Now update track length, silence and fade for every track in
    # database
    tracks = Tracks.get_all_tracks(session)
    total_tracks = len(tracks)
    log(f"Processing {total_tracks} tracks")

    for track_count, track in enumerate(tracks, start=1):
        print(f"\rTrack {track_count} of {total_tracks}", end='')

        # Sanity check
        tag = get_music_info(track.path)
        if not tag['title']:
            log(f"track[{track.id}] {track.title=}: No tag title")
            continue
        if track.artist and track.artist != tag['artist']:
            log(
                f"track[{track.id}] artist mismatch: "
                f"{track.artist=} {tag['artist']=}"
            )
            continue

        # Update title
        if track.title != tag['title']:
            track.title = tag['title']

        # Update numbers; log if more than 10% different
        duration = int(round(
            tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000)
        check_change(track.id, track.title, "duration", track.duration,
                     duration)
        track.duration = duration

        audio = get_audio_segment(track.path)
        if audio is None:
            # Nothing to measure; leave the stored timings untouched
            log(f"track[{track.id}] {track.path=}: unable to load audio")
            continue

        start_gap = leading_silence(audio)
        check_change(track.id, track.title, "start_gap", track.start_gap,
                     start_gap)
        track.start_gap = start_gap

        fade_at = fade_point(audio)
        check_change(track.id, track.title, "fade_at", track.fade_at,
                     fade_at)
        track.fade_at = fade_at

        silence_at = trailing_silence(audio)
        check_change(track.id, track.title, "silence_at", track.silence_at,
                     silence_at)
        track.silence_at = silence_at

    session.commit()


def get_audio_segment(path):
    """
    Return an AudioSegment loaded from path.

    Only paths ending in '.mp3' or '.flac' are handled. Returns None for
    any other path, or if loading raises AttributeError.
    """

    try:
        if path.endswith('.mp3'):
            return AudioSegment.from_mp3(path)
        elif path.endswith('.flac'):
            return AudioSegment.from_file(path, "flac")
    except AttributeError:
        return None
    return None


def get_music_info(path):
    """
    Return a dictionary of title, artist, duration and path.

    Values are taken from the file's tags as read by TinyTag. Callers in
    this file multiply duration by 1000 to get milliseconds, i.e. they
    treat it as seconds.
    """

    tag = TinyTag.get(path)

    return dict(
        title=tag.title,
        artist=tag.artist,
        duration=tag.duration,
        path=path
    )


def leading_silence(audio_segment, silence_threshold=Config.DBFS_SILENCE,
                    chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
    """
    Returns the millisecond/index that the leading silence ends.

    audio_segment - the segment to find silence in
    silence_threshold - the upper bound for how quiet is silent in dBFS
    chunk_size - chunk size for iterating over the segment in ms

    https://github.com/jiaaro/pydub/blob/master/pydub/silence.py
    """

    trim_ms = 0  # ms
    assert chunk_size > 0  # to avoid infinite loop
    while (
            audio_segment[trim_ms:trim_ms + chunk_size].dBFS <  # noqa W504
            silence_threshold and trim_ms < len(audio_segment)):
        trim_ms += chunk_size

    # if there is no end it should return the length of the segment
    return min(trim_ms, len(audio_segment))


def fade_point(audio_segment, fade_threshold=0,
               chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
    """
    Returns the millisecond/index of the point where the volume drops
    below the threshold and doesn't get louder again.

    audio_segment - the segment to find the fade point in
    fade_threshold - the upper bound for how quiet counts as faded, in
        dBFS; 0 (the default) means use the dBFS of the whole segment
    chunk_size - chunk size for iterating over the segment in ms

    The segment is scanned backwards from its end one chunk at a time.
    The result is never negative: if the scan reaches the start of the
    segment, 0 is returned.
    """

    assert chunk_size > 0  # to avoid infinite loop

    segment_length = audio_segment.duration_seconds * 1000  # ms
    trim_ms = segment_length - chunk_size
    if fade_threshold == 0:
        fade_threshold = audio_segment.dBFS

    while (
        audio_segment[trim_ms:trim_ms + chunk_size].dBFS < fade_threshold
        and trim_ms > 0):  # noqa W503
        trim_ms -= chunk_size

    # if there is no trailing silence, return length of track (it's less
    # the chunk_size, but for chunk_size = 10ms, this may be ignored).
    # Clamp at zero: the last step of the scan (or a segment shorter
    # than one chunk) can leave trim_ms below the start of the segment.
    return max(0, int(trim_ms))


def trailing_silence(audio_segment, silence_threshold=-50.0,
                     chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
    """
    Returns the millisecond/index at which trailing silence starts.

    This is fade_point() with silence_threshold (in dBFS) used as the
    fade threshold.
    """

    return fade_point(audio_segment, silence_threshold, chunk_size)


def update_db(session):
    """
    Repopulate database.

    Walks Config.ROOT for .flac and .mp3 files and compares them with
    the paths held in the database:

    - files not in the database are added, unless a track with the same
      filename, artist and title exists, in which case that track's path
      is updated (the file is assumed to have been moved);
    - tracks whose path is no longer on disk are removed, their
      Playdates references are removed and their playlist entries are
      replaced by a note.
    """

    # Search for tracks in only one of directory and database

    db_paths = set(Tracks.get_all_paths(session))

    os_paths = set()
    for root, _dirs, files in os.walk(Config.ROOT):
        for f in files:
            if os.path.splitext(f)[1] in (".flac", ".mp3"):
                os_paths.add(os.path.join(root, f))

    # If a track is moved, only the path will have changed.
    # For any files we have found whose paths are not in the database,
    # check to see whether the filename (basename) is present in the
    # database:

    for path in os_paths - db_paths:
        DEBUG(f"songdb.update_db: {path=} not in database")
        # is filename in database?
        track = Tracks.get_track_from_filename(session,
                                               os.path.basename(path))
        if not track:
            INFO(f"songdb.update_db: Adding to database: {path}")
            create_track_from_file(session, path)
        else:
            # Check track info matches found track
            t = get_music_info(path)
            if t['artist'] == track.artist and t['title'] == track.title:
                track.update_path(path)
            else:
                create_track_from_file(session, path)

    # Refresh database paths
    db_paths = set(Tracks.get_all_paths(session))

    # Remove any tracks from database whose paths don't exist
    for path in db_paths - os_paths:
        # Manage tracks listed in database but where path is invalid
        track = Tracks.get_track_from_path(session, path)
        INFO(f"songdb.update_db(): remove from database: {path=} {track=}")

        # Remove references from Playdates
        Playdates.remove_track(session, track.id)

        # Replace playlist entries with a note
        note_txt = (
            f"File removed: {track.title=}, {track.artist=}, "
            f"{track.path=}"
        )
        for pt in PlaylistTracks.get_track_playlists(session, track.id):
            # Create note
            Notes.add_note(session, pt.playlist_id, pt.row, note_txt)
            # Remove playlist entry
            PlaylistTracks.remove_track(session, pt.playlist_id, pt.row)

        # Remove Track entry pointing to invalid path
        Tracks.remove_path(session, path)


if __name__ == '__main__' and '__file__' in globals():
    main()