# #!/usr/bin/env python
#
# import argparse
# import os
# import shutil
# import tempfile
#
# import helpers
# from config import Config
# from helpers import (
#     fade_point,
#     get_audio_segment,
#     get_tags,
#     leading_silence,
#     trailing_silence,
# )
# from log import DEBUG, INFO
# from models import Notes, Playdates, Session, Tracks
# from mutagen.flac import FLAC
# from mutagen.mp3 import MP3
# from pydub import effects
#
# # Globals (I know)
# messages = []
#
#
# def main():
#     """Main loop"""
#
#     DEBUG("Starting")
#
#     p = argparse.ArgumentParser()
#     # Only allow one option to be specified
#     group = p.add_mutually_exclusive_group()
#     group.add_argument('-u', '--update',
#                        action="store_true", dest="update",
#                        default=False, help="Update database")
#     group.add_argument('-f', '--full-update',
#                        action="store_true", dest="full_update",
#                        default=False, help="Update database")
#     args = p.parse_args()
#
#     # Run as required
#     if args.update:
#         DEBUG("Updating database")
#         with Session() as session:
#             update_db(session)
#     elif args.full_update:
#         DEBUG("Full update of database")
#         with Session() as session:
#             full_update_db(session)
#     else:
#         INFO("No action specified")
#
#     DEBUG("Finished")
#
#
# def create_track_from_file(session, path, normalise=None, tags=None):
#     """
#     Create track in database from passed path, or update database entry
#     if path already in database.
#
#     Return track.
#     """
#
#     if not tags:
#         t = get_tags(path)
#     else:
#         t = tags
#
#     track = Tracks.get_or_create(session, path)
#     track.title = t['title']
#     track.artist = t['artist']
#     audio = get_audio_segment(path)
#     track.duration = len(audio)
#     track.start_gap = leading_silence(audio)
#     track.fade_at = round(fade_point(audio) / 1000,
#                           Config.MILLISECOND_SIGFIGS) * 1000
#     track.silence_at = round(trailing_silence(audio) / 1000,
#                              Config.MILLISECOND_SIGFIGS) * 1000
#     track.mtime = os.path.getmtime(path)
#     session.commit()
#
#     if normalise or normalise is None and Config.NORMALISE_ON_IMPORT:
#         # Check type
#         ftype = os.path.splitext(path)[1][1:]
#         if ftype not in ['mp3', 'flac']:
#             INFO(f"File type {ftype} not implemented")
#             return track
#
#         # Get current file gid, uid and permissions
#         stats = os.stat(path)
#         try:
#             # Copy original file
#             fd, temp_path = tempfile.mkstemp()
#             shutil.copyfile(path, temp_path)
#         except Exception as err:
#             DEBUG(f"songdb.create_track_from_file({path}): err1: {repr(err)}")
#             return
#
#         # Overwrite original file with normalised output
#         normalised = effects.normalize(audio)
#         try:
#             normalised.export(path, format=os.path.splitext(path)[1][1:])
#             # Fix up permissions and ownership
#             os.chown(path, stats.st_uid, stats.st_gid)
#             os.chmod(path, stats.st_mode)
#             # Copy tags
#             if ftype == 'flac':
#                 tag_handler = FLAC
#             elif ftype == 'mp3':
#                 tag_handler = MP3
#             else:
#                 return track
#             src = tag_handler(temp_path)
#             dst = tag_handler(path)
#             for tag in src:
#                 dst[tag] = src[tag]
#             dst.save()
#         except Exception as err:
#             DEBUG(f"songdb.create_track_from_file({path}): err2: {repr(err)}")
#             # Restore original file
#             shutil.copyfile(path, temp_path)
#         finally:
#             if os.path.exists(temp_path):
#                 os.remove(temp_path)
#
#     return track
#
#
# def full_update_db(session):
#     """Rescan all entries in database"""
#
#     def log(msg):
#         INFO(f"full_update_db(): {msg}")
#
#     def check_change(track_id, title, attribute, old, new):
#         if new > (old * 1.1) or new < (old * 0.9):
#             log(
#                 "\n"
#                 f"track[{track_id}] ({title}) "
#                 f"{attribute} updated from {old} to {new}"
#             )
#
#     # Start with normal update to add new tracks and remove any missing
#     # files
#     log("update_db()")
#     update_db(session)
#
#     # Now update track length, silence and fade for every track in
#     # database
#
#     tracks = Tracks.get_all_tracks(session)
#     total_tracks = len(tracks)
#     log(f"Processing {total_tracks} tracks")
#     track_count = 0
#     for track in tracks:
#         track_count += 1
#         print(f"\rTrack {track_count} of {total_tracks}", end='')
#
#         # Sanity check
#         tag = get_music_info(track.path)
#         if not tag['title']:
#             log(f"track[{track.id}] {track.title=}: No tag title")
#             continue
#         if not tag['artist']:
#             log(f"track[{track.id}] {track.artist=}: No tag artist")
#             continue
#
#         # Update title and artist
#         if track.title != tag['title']:
#             track.title = tag['title']
#         if track.artist != tag['artist']:
#             track.artist = tag['artist']
#
#         # Update numbers; log if more than 10% different
#         duration = int(round(
#             tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000)
#         check_change(track.id, track.title, "duration", track.duration,
#                      duration)
#         track.duration = duration
#
#         audio = get_audio_segment(track.path)
#
#         start_gap = leading_silence(audio)
#         check_change(track.id, track.title, "start_gap", track.start_gap,
#                      start_gap)
#         track.start_gap = start_gap
#
#         fade_at = fade_point(audio)
#         check_change(track.id, track.title, "fade_at", track.fade_at,
#                      fade_at)
#         track.fade_at = fade_at
#
#         silence_at = trailing_silence(audio)
#         check_change(track.id, track.title, "silence_at", track.silence_at,
#                      silence_at)
#         track.silence_at = silence_at
#         session.commit()
#
#
# def update_db(session):
#     """
#     Repopulate database
#     """
#
#     # Search for tracks that are in the music directory but not the database
#     # Check all paths in database exist
#     # If issues found, write to stdout but do not try to resolve them
#
#     db_paths = set(Tracks.get_all_paths(session))
#
#     os_paths_list = []
#     for root, dirs, files in os.walk(Config.ROOT):
#         for f in files:
#             path = os.path.join(root, f)
#             ext = os.path.splitext(f)[1]
#             if ext in [".flac", ".mp3"]:
#                 os_paths_list.append(path)
#     os_paths = set(os_paths_list)
#
#     # Find any files in music directory that are not in database
#     files_not_in_db = list(os_paths - db_paths)
#
#     # Find paths in database missing in music directory
#     paths_not_found = []
#     missing_file_count = 0
#     more_files_to_report = False
#     for path in list(db_paths - os_paths):
#         if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT:
#             more_files_to_report = True
#             break
#
#         missing_file_count += 1
#
#         track = Tracks.get_by_path(session, path)
#         if not track:
#             ERROR(f"update_db: {path} not found in db")
#             continue
#
#         paths_not_found.append(track)
#
#     # Output messages (so if running via cron, these will get sent to
#     # user)
#     if files_not_in_db:
#         print("Files in music directory but not in database")
#         print("--------------------------------------------")
#         print("\n".join(files_not_in_db))
#         print("\n")
#     if paths_not_found:
#         print("Invalid paths in database")
#         print("-------------------------")
#         for t in paths_not_found:
#             print(f"""
#                 Track ID: {t.id}
#                 Path: {t.path}
#                 Title: {t.title}
#                 Artist: {t.artist}
#                 """)
#     if more_files_to_report:
#         print("There were more paths than listed that were not found")
#
#     # # Spike
#     #
#     # # Manage tracks listed in database but where path is invalid
#     # DEBUG(f"Invalid {path=} in database", True)
#     # track = Tracks.get_by_path(session, path)
#     # messages.append(f"Remove from database: {path=} {track=}")
#     #
#     # # Remove references from Playdates
#     # Playdates.remove_track(session, track.id)
#     #
#     # # Replace playlist entries with a note
#     # note_txt = (
#     #     f"File removed: {track.title=}, {track.artist=}, "
#     #     f"{track.path=}"
#     # )
#     # for playlist_track in track.playlists:
#     #     row = playlist_track.row
#     #     # Remove playlist entry
#     #     DEBUG(f"Remove {row=} from {playlist_track.playlist_id}", True)
#     #     playlist_track.playlist.remove_track(session, row)
#     #     # Create note
#     #     DEBUG(f"Add note at {row=} to {playlist_track.playlist_id=}", True)
#     #     Notes(session, playlist_track.playlist_id, row, note_txt)
#     #
#     # # Remove Track entry pointing to invalid path
#     # Tracks.remove_by_path(session, path)
#
# if __name__ == '__main__' and '__file__' in globals():
#     main()