# #!/usr/bin/env python # # import argparse import os import shutil import tempfile from config import Config from helpers import ( fade_point, get_audio_segment, get_tags, leading_silence, trailing_silence, ) # from log import log.debug, log.info # from models import Notes, Playdates, Session, Tracks from models import Tracks from mutagen.flac import FLAC from mutagen.mp3 import MP3 from pydub import effects # # # def main(): # """Main loop""" # # log.debug("Starting") # # p = argparse.ArgumentParser() # # Only allow one option to be specified # group = p.add_mutually_exclusive_group() # group.add_argument('-u', '--update', # action="store_true", dest="update", # default=False, help="Update database") # group.add_argument('-f', '--full-update', # action="store_true", dest="full_update", # default=False, help="Update database") # args = p.parse_args() # # # Run as required # if args.update: # log.debug("Updating database") # with Session() as session: # update_db(session) # elif args.full_update: # log.debug("Full update of database") # with Session() as session: # full_update_db(session) # else: # log.info("No action specified") # # log.debug("Finished") def create_track_from_file(session, path, normalise=None, tags=None): """ Create track in database from passed path, or update database entry if path already in database. Return track. """ if not tags: t = get_tags(path) else: t = tags track = Tracks.get_or_create(session, path) track.title = t['title'] track.artist = t['artist'] audio = get_audio_segment(path) track.duration = len(audio) track.start_gap = leading_silence(audio) track.fade_at = round(fade_point(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000 track.silence_at = round(trailing_silence(audio) / 1000, Config.MILLISECOND_SIGFIGS) * 1000 track.mtime = os.path.getmtime(path) session.commit() if normalise or normalise is None and Config.NORMALISE_ON_IMPORT: # Check type ftype = os.path.splitext(path)[1][1:] if ftype not in ['mp3', 'flac']: log.info(f"File type {ftype} not implemented") return track # Get current file gid, uid and permissions stats = os.stat(path) try: # Copy original file fd, temp_path = tempfile.mkstemp() shutil.copyfile(path, temp_path) except Exception as err: log.debug( f"utilities.create_track_from_file({path}): err1: {repr(err)}" ) return # Overwrite original file with normalised output normalised = effects.normalize(audio) try: normalised.export(path, format=os.path.splitext(path)[1][1:]) # Fix up permssions and ownership os.chown(path, stats.st_uid, stats.st_gid) os.chmod(path, stats.st_mode) # Copy tags if ftype == 'flac': tag_handler = FLAC elif ftype == 'mp3': tag_handler = MP3 else: return track src = tag_handler(temp_path) dst = tag_handler(path) for tag in src: dst[tag] = src[tag] dst.save() except Exception as err: log.debug( f"utilities.create_track_from_file({path}): err2: {repr(err)}" ) # Restore original file shutil.copyfile(path, temp_path) finally: if os.path.exists(temp_path): os.remove(temp_path) return track # # # def full_update_db(session): # """Rescan all entries in database""" # # def log(msg): # log.info(f"full_update_db(): {msg}") # # def check_change(track_id, title, attribute, old, new): # if new > (old * 1.1) or new < (old * 0.9): # log( # "\n" # f"track[{track_id}] ({title}) " # f"{attribute} updated from {old} to {new}" # ) # # # Start with normal update to add new tracks and remove any missing # # files # log("update_db()") # update_db(session) # # # Now update track length, silence and fade for every track in # # database # # tracks = Tracks.get_all_tracks(session) # total_tracks = len(tracks) # log(f"Processing {total_tracks} tracks") # track_count = 0 # for track in tracks: # track_count += 1 # print(f"\rTrack {track_count} of {total_tracks}", end='') # # # Sanity check # tag = get_music_info(track.path) # if not tag['title']: # log(f"track[{track.id}] {track.title=}: No tag title") # continue # if not tag['artist']: # log(f"track[{track.id}] {track.artist=}: No tag artist") # continue # # # Update title and artist # if track.title != tag['title']: # track.title = tag['title'] # if track.artist != tag['artist']: # track.artist = tag['artist'] # # # Update numbers; log if more than 10% different # duration = int(round( # tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000) # check_change(track.id, track.title, "duration", track.duration, # duration) # track.duration = duration # # audio = get_audio_segment(track.path) # # start_gap = leading_silence(audio) # check_change(track.id, track.title, "start_gap", track.start_gap, # start_gap) # track.start_gap = start_gap # # fade_at = fade_point(audio) # check_change(track.id, track.title, "fade_at", track.fade_at, # fade_at) # track.fade_at = fade_at # # silence_at = trailing_silence(audio) # check_change(track.id, track.title, "silence_at", track.silence_at, # silence_at) # track.silence_at = silence_at # session.commit() # # # def update_db(session): # """ # Repopulate database # """ # # # Search for tracks that are in the music directory but not the datebase # # Check all paths in database exist # # If issues found, write to stdout but do not try to resolve them # # db_paths = set(Tracks.get_all_paths(session)) # # os_paths_list = [] # for root, dirs, files in os.walk(Config.ROOT): # for f in files: # path = os.path.join(root, f) # ext = os.path.splitext(f)[1] # if ext in [".flac", ".mp3"]: # os_paths_list.append(path) # os_paths = set(os_paths_list) # # # Find any files in music directory that are not in database # files_not_in_db = list(os_paths - db_paths) # # # Find paths in database missing in music directory # paths_not_found = [] # missing_file_count = 0 # more_files_to_report = False # for path in list(db_paths - os_paths): # if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT: # more_files_to_report = True # break # # missing_file_count += 1 # # track = Tracks.get_by_path(session, path) # if not track: # log.error(f"update_db: {path} not found in db") # continue # # paths_not_found.append(track) # # # Output messages (so if running via cron, these will get sent to # # user) # if files_not_in_db: # print("Files in music directory but not in database") # print("--------------------------------------------") # print("\n".join(files_not_in_db)) # print("\n") # if paths_not_found: # print("Invalid paths in database") # print("-------------------------") # for t in paths_not_found: # print(f""" # Track ID: {t.id} # Path: {t.path} # Title: {t.title} # Artist: {t.artist} # """) # if more_files_to_report: # print("There were more paths than listed that were not found") # # # # Spike # # # # # Manage tracks listed in database but where path is invalid # # log.debug(f"Invalid {path=} in database") # # track = Tracks.get_by_path(session, path) # # messages.append(f"Remove from database: {path=} {track=}") # # # # # Remove references from Playdates # # Playdates.remove_track(session, track.id) # # # # # Replace playlist entries with a note # # note_txt = ( # # f"File removed: {track.title=}, {track.artist=}, " # # f"{track.path=}" # # ) # # for playlist_track in track.playlists: # # row = playlist_track.row # # # Remove playlist entry # # log.debug(f"Remove {row=} from {playlist_track.playlist_id}") # # playlist_track.playlist.remove_track(session, row) # # # Create note # # log.debug(f"Add note at {row=} to {playlist_track.playlist_id=}") # # Notes(session, playlist_track.playlist_id, row, note_txt) # # # # # Remove Track entry pointing to invalid path # # Tracks.remove_by_path(session, path) # # if __name__ == '__main__' and '__file__' in globals(): # main()