#!/usr/bin/env python import argparse import os import shutil import tempfile import helpers from config import Config from helpers import ( fade_point, get_audio_segment, get_tags, leading_silence, trailing_silence, ) from log import DEBUG, INFO from models import Notes, Playdates, Session, Tracks from mutagen.flac import FLAC from mutagen.mp3 import MP3 from pydub import effects # Globals (I know) messages = [] def main(): """Main loop""" DEBUG("Starting") p = argparse.ArgumentParser() # Only allow one option to be specified group = p.add_mutually_exclusive_group() group.add_argument('-u', '--update', action="store_true", dest="update", default=False, help="Update database") group.add_argument('-f', '--full-update', action="store_true", dest="full_update", default=False, help="Update database") args = p.parse_args() # Run as required if args.update: DEBUG("Updating database") with Session() as session: update_db(session) elif args.full_update: DEBUG("Full update of database") with Session() as session: full_update_db(session) else: INFO("No action specified") DEBUG("Finished") def create_track_from_file(session, path, normalise=None, tags=None): """ Create track in database from passed path, or update database entry if path already in database. Return track. """ if not tags: t = get_tags(path) else: t = tags track = Tracks.get_or_create(session, path) track.title = t['title'] track.artist = t['artist'] audio = get_audio_segment(path) track.duration = len(audio) track.start_gap = leading_silence(audio) track.fade_at = round(fade_point(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000 track.silence_at = round(trailing_silence(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000 track.mtime = os.path.getmtime(path) session.commit() if normalise or normalise is None and Config.NORMALISE_ON_IMPORT: # Check type ftype = os.path.splitext(path)[1][1:] if ftype not in ['mp3', 'flac']: INFO(f"File type {ftype} not implemented") return track # Get current file gid, uid and permissions stats = os.stat(path) try: # Copy original file fd, temp_path = tempfile.mkstemp() shutil.copyfile(path, temp_path) except Exception as err: DEBUG(f"songdb.create_track_from_file({path}): err1: {repr(err)}") return # Overwrite original file with normalised output normalised = effects.normalize(audio) try: normalised.export(path, format=os.path.splitext(path)[1][1:]) # Fix up permssions and ownership os.chown(path, stats.st_uid, stats.st_gid) os.chmod(path, stats.st_mode) # Copy tags if ftype == 'flac': tag_handler = FLAC elif ftype == 'mp3': tag_handler = MP3 else: return track src = tag_handler(temp_path) dst = tag_handler(path) for tag in src: dst[tag] = src[tag] dst.save() except Exception as err: DEBUG(f"songdb.create_track_from_file({path}): err2: {repr(err)}") # Restore original file shutil.copyfile(path, temp_path) finally: if os.path.exists(temp_path): os.remove(temp_path) return track def full_update_db(session): """Rescan all entries in database""" def log(msg): INFO(f"full_update_db(): {msg}") def check_change(track_id, title, attribute, old, new): if new > (old * 1.1) or new < (old * 0.9): log( "\n" f"track[{track_id}] ({title}) " f"{attribute} updated from {old} to {new}" ) # Start with normal update to add new tracks and remove any missing # files log("update_db()") update_db(session) # Now update track length, silence and fade for every track in # database tracks = Tracks.get_all_tracks(session) total_tracks = len(tracks) log(f"Processing {total_tracks} tracks") track_count = 0 for track in tracks: track_count += 1 print(f"\rTrack {track_count} of {total_tracks}", end='') # Sanity check tag = get_music_info(track.path) if not tag['title']: log(f"track[{track.id}] {track.title=}: No tag title") continue if not tag['artist']: log(f"track[{track.id}] {track.artist=}: No tag artist") continue # Update title and artist if track.title != tag['title']: track.title = tag['title'] if track.artist != tag['artist']: track.artist = tag['artist'] # Update numbers; log if more than 10% different duration = int(round( tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000) check_change(track.id, track.title, "duration", track.duration, duration) track.duration = duration audio = get_audio_segment(track.path) start_gap = leading_silence(audio) check_change(track.id, track.title, "start_gap", track.start_gap, start_gap) track.start_gap = start_gap fade_at = fade_point(audio) check_change(track.id, track.title, "fade_at", track.fade_at, fade_at) track.fade_at = fade_at silence_at = trailing_silence(audio) check_change(track.id, track.title, "silence_at", track.silence_at, silence_at) track.silence_at = silence_at session.commit() def update_db(session): """ Repopulate database """ # Search for tracks that are in the music directory but not the datebase # Check all paths in database exist # If issues found, write to stdout but do not try to resolve them db_paths = set(Tracks.get_all_paths(session)) os_paths_list = [] for root, dirs, files in os.walk(Config.ROOT): for f in files: path = os.path.join(root, f) ext = os.path.splitext(f)[1] if ext in [".flac", ".mp3"]: os_paths_list.append(path) os_paths = set(os_paths_list) # Find any files in music directory that are not in database files_not_in_db = list(os_paths - db_paths) # Find paths in database missing in music directory paths_not_found = [] missing_file_count = 0 more_files_to_report = False for path in list(db_paths - os_paths): if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT: more_files_to_report = True break missing_file_count += 1 track = Tracks.get_by_path(session, path) if not track: ERROR(f"update_db: {path} not found in db") continue paths_not_found.append(track) # Output messages (so if running via cron, these will get sent to # user) if files_not_in_db: print("Files in music directory but not in database") print("--------------------------------------------") print("\n".join(files_not_in_db)) print("\n") if paths_not_found: print("Invalid paths in database") print("-------------------------") for t in paths_not_found: print(f""" Track ID: {t.id} Path: {t.path} Title: {t.title} Artist: {t.artist} """) if more_files_to_report: print("There were more paths than listed that were not found") # Spike # # # Manage tracks listed in database but where path is invalid # DEBUG(f"Invalid {path=} in database", True) # track = Tracks.get_by_path(session, path) # messages.append(f"Remove from database: {path=} {track=}") # # # Remove references from Playdates # Playdates.remove_track(session, track.id) # # # Replace playlist entries with a note # note_txt = ( # f"File removed: {track.title=}, {track.artist=}, " # f"{track.path=}" # ) # for playlist_track in track.playlists: # row = playlist_track.row # # Remove playlist entry # DEBUG(f"Remove {row=} from {playlist_track.playlist_id}", True) # playlist_track.playlist.remove_track(session, row) # # Create note # DEBUG(f"Add note at {row=} to {playlist_track.playlist_id=}", True) # Notes(session, playlist_track.playlist_id, row, note_txt) # # # Remove Track entry pointing to invalid path # Tracks.remove_by_path(session, path) if __name__ == '__main__' and '__file__' in globals(): main()