#!/usr/bin/env python import argparse import os import shutil import tempfile from app.config import Config from app.helpers import show_warning from app.log import DEBUG, INFO from app.models import Notes, Playdates, Session, Tracks from mutagen.flac import FLAC from mutagen.mp3 import MP3 from pydub import AudioSegment, effects # Globals (I know) messages = [] def main(): """Main loop""" DEBUG("Starting") p = argparse.ArgumentParser() # Only allow one option to be specified group = p.add_mutually_exclusive_group() group.add_argument('-u', '--update', action="store_true", dest="update", default=False, help="Update database") group.add_argument('-f', '--full-update', action="store_true", dest="full_update", default=False, help="Update database") group.add_argument('-i', '--import', dest="fname", help="Input file") args = p.parse_args() # Run as required if args.update: DEBUG("Updating database") with Session() as session: update_db(session) elif args.full_update: DEBUG("Full update of database") with Session() as session: full_update_db(session) elif args.fname: fname = os.path.realpath(args.fname) with Session() as session: create_track_from_file(session, fname, interactive=True) else: INFO("No action specified") DEBUG("Finished") def create_track_from_file(session, path, interactive=False): """ Create track in database from passed path, or update database entry if path already in database. Return track. """ if interactive: msg = f"Importing {path}" INFO(msg) INFO("-" * len(msg)) INFO("Get track info...") t = get_music_info(path) title = t['title'] artist = t['artist'] if interactive: INFO(f" Title: \"{title}\"") INFO(f" Artist: \"{artist}\"") # Check for duplicate tracks = Tracks.search_titles(session, title) if interactive and tracks: print("Found the following possible matches:") for track in tracks: print(f'"{track.title}" by {track.artist}') response = input("Continue [c] or abort [a]?") if not response: return if response[0].lower() not in ['c', 'y']: return track = Tracks.get_or_create(session, path) track.title = title track.artist = artist track.duration = int(round( t['duration'], Config.MILLISECOND_SIGFIGS) * 1000) if interactive: INFO("Parse for start, fade and silence...") audio = get_audio_segment(path) track.start_gap = leading_silence(audio) track.fade_at = round(fade_point(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000 track.silence_at = round(trailing_silence(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000 track.mtime = os.path.getmtime(path) session.commit() if Config.NORMALISE_ON_IMPORT: if interactive: INFO("Normalise...") # Check type ftype = os.path.splitext(path)[1][1:] if ftype not in ['mp3', 'flac']: INFO(f"File type {ftype} not implemented") return track # Get current file gid, uid and permissions stats = os.stat(path) try: # Copy original file fd, temp_path = tempfile.mkstemp() shutil.copyfile(path, temp_path) except Exception as err: DEBUG(f"songdb.create_track_from_file({path}): err1: {repr(err)}") return # Overwrite original file with normalised output normalised = effects.normalize(audio) try: normalised.export(path, format=os.path.splitext(path)[1][1:]) # Fix up permssions and ownership os.chown(path, stats.st_uid, stats.st_gid) os.chmod(path, stats.st_mode) # Copy tags if ftype == 'flac': tag_handler = FLAC elif ftype == 'mp3': tag_handler = MP3 else: return track src = tag_handler(temp_path) dst = tag_handler(path) for tag in src: dst[tag] = src[tag] dst.save() except Exception as err: DEBUG(f"songdb.create_track_from_file({path}): err2: {repr(err)}") # Restore original file shutil.copyfile(path, temp_path) finally: if os.path.exists(temp_path): os.remove(temp_path) return track def full_update_db(session): """Rescan all entries in database""" def log(msg): INFO(f"full_update_db(): {msg}") def check_change(track_id, title, attribute, old, new): if new > (old * 1.1) or new < (old * 0.9): log( "\n" f"track[{track_id}] ({title}) " f"{attribute} updated from {old} to {new}" ) # Start with normal update to add new tracks and remove any missing # files log("update_db()") update_db(session) # Now update track length, silence and fade for every track in # database tracks = Tracks.get_all_tracks(session) total_tracks = len(tracks) log(f"Processing {total_tracks} tracks") track_count = 0 for track in tracks: track_count += 1 print(f"\rTrack {track_count} of {total_tracks}", end='') # Sanity check tag = get_music_info(track.path) if not tag['title']: log(f"track[{track.id}] {track.title=}: No tag title") continue if not tag['artist']: log(f"track[{track.id}] {track.artist=}: No tag artist") continue # Update title and artist if track.title != tag['title']: track.title = tag['title'] if track.artist != tag['artist']: track.artist = tag['artist'] # Update numbers; log if more than 10% different duration = int(round( tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000) check_change(track.id, track.title, "duration", track.duration, duration) track.duration = duration audio = get_audio_segment(track.path) start_gap = leading_silence(audio) check_change(track.id, track.title, "start_gap", track.start_gap, start_gap) track.start_gap = start_gap fade_at = fade_point(audio) check_change(track.id, track.title, "fade_at", track.fade_at, fade_at) track.fade_at = fade_at silence_at = trailing_silence(audio) check_change(track.id, track.title, "silence_at", track.silence_at, silence_at) track.silence_at = silence_at session.commit() def update_db(session): """ Repopulate database """ # Search for tracks in only one of directory and database db_paths = set(Tracks.get_all_paths(session)) os_paths_list = [] for root, dirs, files in os.walk(Config.ROOT): for f in files: path = os.path.join(root, f) ext = os.path.splitext(f)[1] if ext in [".flac", ".mp3"]: os_paths_list.append(path) os_paths = set(os_paths_list) # If a track is moved, only the path will have changed. # For any files we have found whose paths are not in the database, # check to see whether the filename (basename) is present in the # database: for path in list(os_paths - db_paths): DEBUG(f"songdb.update_db: {path=} not in database") # is filename in database? track = Tracks.get_from_filename(session, os.path.basename(path)) if not track: messages.append(f"Track missing from database: {path}") else: # Check track info matches found track t = get_music_info(path) if t['artist'] == track.artist and t['title'] == track.title: track.update_path(path) else: create_track_from_file(session, path) # Refresh database paths db_paths = set(Tracks.get_all_paths(session)) # Remote any tracks from database whose paths don't exist for path in list(db_paths - os_paths): # Manage tracks listed in database but where path is invalid track = Tracks.get_from_path(session, path) messages.append(f"Remove from database: {path=} {track=}") # Remove references from Playdates Playdates.remove_track(session, track.id) # Replace playlist entries with a note note_txt = ( f"File removed: {track.title=}, {track.artist=}, " f"{track.path=}" ) for playlist in [a.playlist for a in track.playlists]: # Create note Notes(session, playlist.id, pt.row, note_txt) # TODO: this needs to call playlist.add_note() now # Remove playlist entry playlist.remove_track(session, pt.row) # Remove Track entry pointing to invalid path Tracks.remove_by_path(session, path) # Output messages (so if running via cron, these will get sent to # user) if messages: print("Messages") print("\n".join(messages)) if __name__ == '__main__' and '__file__' in globals(): main()