#!/usr/bin/env python
#
# Script to replace existing files in parent directory. Typical usage:
# the current directory contains a "better" version of the file than the
# parent (eg, better bitrate).
#
# Actions:
#
# - check that the same filename is present in the parent directory
# - check that the artist and title tags are the same
# - append ".bak" to the version in the parent directory
# - move file to parent directory
# - normalise file
# - update duration, start_gap, fade_at, silence_at, mtime in database
#
# NOTE(review): the current implementation unlinks the old file rather
# than appending ".bak", and delegates the database refresh to
# Tracks.rescan() - confirm the list above still describes the intent.

import glob
import os
import pydymenu
import shutil
import sys

from helpers import (
    fade_point,
    get_audio_segment,
    get_tags,
    leading_silence,
    trailing_silence,
)
from models import Tracks
from dbconfig import Session
from thefuzz import process  # type: ignore
from sqlalchemy.exc import IntegrityError
from typing import List

# ###################### SETTINGS #########################
process_multiple_matches = True
do_processing = True
process_no_matches = True
# #########################################################


def insensitive_glob(pattern):
    """Return the paths matching *pattern*, ignoring letter case.

    Every alphabetic character of *pattern* is expanded to a two-character
    class (``a`` -> ``[aA]``) before being handed to ``glob.glob``. Glob
    metacharacters in *pattern* keep their meaning, so callers must escape
    any part that is meant literally.
    """

    def either(c):
        return '[%s%s]' % (c.lower(), c.upper()) if c.isalpha() else c

    return glob.glob(''.join(map(either, pattern)))


# Check file of same name exists in parent directory
source_dir = '/home/kae/music/Singles/tmp'  # os.getcwd()
parent_dir = os.path.dirname(source_dir)
assert source_dir != parent_dir

# Report buckets filled in by main()
name_and_tags: List[str] = []
name_not_tags: List[str] = []
tags_not_name: List[str] = []
multiple_similar: List[str] = []
no_match: List[str] = []
possibles: List[str] = []

print(f"{source_dir=}, {parent_dir=}")


def _same_tags(title_a, artist_a, title_b, artist_b):
    """Return True when both title and artist match, ignoring case."""

    return (
        str(title_a).lower() == str(title_b).lower()
        and str(artist_a).lower() == str(artist_b).lower()
    )


def _choose_with_fzf(tracks, fname, us_t, us_a):
    """Let the user pick the file to replace from *tracks* via fzf.

    Shows what would change and asks for confirmation. Returns the full
    path of the chosen file in ``parent_dir``, or None when nothing was
    picked or the user did not answer "y".
    """

    prompt = f"\n file={fname}\n title={us_t}\n artist={us_a}: "
    # Use fzf to search
    choice = pydymenu.fzf(tracks, prompt)
    if not choice:
        return None

    old_file = os.path.join(parent_dir, choice[0])
    oldtags = get_tags(old_file)
    old_title = oldtags['title']
    old_artist = oldtags['artist']
    print()
    print(f" File name will change {choice[0]}")
    print(f" → {fname}")
    print()
    print(f" Title tag will change {old_title}")
    print(f" → {us_t}")
    print()
    print(f" Artist tag will change {old_artist}")
    print(f" → {us_a}")
    print()
    data = input("Go ahead (y to accept)? ")
    return old_file if data == "y" else None


def _choose_from_matches(matches):
    """Ask the user to pick one of *matches* by number.

    Re-prompts on anything that is not a listed number. Returns the chosen
    path, or None when the user just presses return.
    """

    choices = dict(enumerate(matches))
    while True:
        for k, v in choices.items():
            print(f"{k}: {v}")
        data = input("pick one, return to quit: ")
        if data == "":
            return None
        try:
            key = int(data)
        except ValueError:
            continue
        if key in choices:
            return choices[key]


def main():
    """Match each file in ``source_dir`` to a file in ``parent_dir``.

    For every source file the candidate to replace is found by, in order:
    identical file name, a case-insensitive glob on the name stem, or an
    interactive choice. Each accepted pairing is passed to process_track()
    and recorded in one of the module-level report lists, which are
    printed at the end.
    """

    # Default to '' so an unset MM_DB prompts instead of raising TypeError
    if 'musicmuster_prod' not in os.environ.get('MM_DB', ''):
        response = input("Not on production database - c to continue: ")
        if response != "c":
            sys.exit(0)

    tracks = os.listdir(parent_dir)
    for fname in os.listdir(source_dir):
        parent_file = os.path.join(parent_dir, fname)
        new_file = os.path.join(source_dir, fname)
        if not os.path.isfile(new_file):
            # Sub-directories and other non-files have no tags to read
            continue

        us = get_tags(new_file)
        us_t = us['title']
        us_a = us['artist']
        bitrate = us['bitrate']

        if os.path.exists(parent_file):
            # File exists, check tags
            p = get_tags(parent_file)
            p_t = p['title']
            p_a = p['artist']
            if _same_tags(p_t, p_a, us_t, us_a):
                name_and_tags.append(new_file)
            else:
                name_not_tags.append(
                    f" {fname=}, {p_t} → {us_t}, {p_a} → {us_a}")
            process_track(new_file, parent_file, us_t, us_a, bitrate)
            continue

        # Try to find a near match. The stem is escaped so that glob
        # metacharacters in the file name ("[", "]", "*", "?") are matched
        # literally; only the trailing "*" acts as a wildcard.
        stem = fname.split(".")[0]
        matches = insensitive_glob(
            glob.escape(os.path.join(parent_dir, stem)) + '*')
        match_count = len(matches)

        if match_count == 0:
            old_file = None
            if process_no_matches:
                old_file = _choose_with_fzf(tracks, fname, us_t, us_a)
            if old_file is None:
                no_match.append(f"{fname}, {us_t=}, {us_a=}")
            else:
                process_track(new_file, old_file, us_t, us_a, bitrate)
            continue

        if match_count > 1:
            multiple_similar.append(fname + "\n " + "\n ".join(matches))
            if match_count <= 26 and process_multiple_matches:
                print(f"\n file={fname}\n title={us_t}\n artist={us_a}\n")
                dst = _choose_from_matches(matches)
                if dst is not None:
                    process_track(new_file, dst, us_t, us_a, bitrate)
            # Never fall through to the single-match handling below
            continue

        # One match, check tags
        sim_name = matches[0]
        p = get_tags(sim_name)
        p_t = p['title']
        p_a = p['artist']
        if _same_tags(p_t, p_a, us_t, us_a):
            tags_not_name.append(
                f"Rename {os.path.basename(sim_name)} → {fname}")
        else:
            possibles.append(
                f"File: {os.path.basename(sim_name)} → {fname}"
                f"\n {p_t} → {us_t}\n {p_a} → {us_a}"
            )
        process_track(new_file, sim_name, us_t, us_a, bitrate)

    print(f"Name and tags match ({len(name_and_tags)}):")
    # print(" \n".join(name_and_tags))
    # print()

    print(f"Name but not tags match ({len(name_not_tags)}):")
    print(" \n".join(name_not_tags))
    print()

    print(f"Tags but not name match ({len(tags_not_name)}):")
    # print(" \n".join(tags_not_name))
    # print()

    print(f"Multiple similar names ({len(multiple_similar)}):")
    print(" \n".join(multiple_similar))
    print()

    print(f"Possibles: ({len(possibles)}):")
    print(" \n".join(possibles))
    print()

    print(f"No match ({len(no_match)}):")
    print(" \n".join(no_match))
    print()


def process_track(src, dst, title, artist, bitrate):
    """Replace the file *dst* with *src* and update its database row.

    The new file keeps the name of *src* but lands in the directory of
    *dst*. Does nothing beyond printing the plan when ``do_processing``
    is false.

    Args:
        src: path of the replacement file.
        dst: path of the existing file being replaced.
        title: title to store on the track row.
        artist: artist to store on the track row.
        bitrate: bitrate to store on the track row.
    """

    new_path = os.path.join(os.path.dirname(dst), os.path.basename(src))
    print(
        f"process_track:\n {src=}\n {new_path=}\n "
        f"{dst=}\n {title=}, {artist=}\n"
    )

    if not do_processing:
        return

    with Session() as session:
        track = Tracks.get_by_path(session, dst)
        if track:
            track.title = title
            track.artist = artist
            track.path = new_path
            track.bitrate = bitrate
            try:
                session.commit()
            except IntegrityError:
                # https://jira.mariadb.org/browse/MDEV-29345 workaround
                # Commit with a placeholder path first, then set the real
                # path in a second commit.
                session.rollback()
                track.title = title
                track.artist = artist
                track.path = "DUMMY"
                track.bitrate = bitrate
                session.commit()
                track.path = new_path
                session.commit()

        print(f"os.unlink({dst}")
        print(f"shutil.move({src}, {new_path}")

        # Remove the old file before moving the new one into its directory
        os.unlink(dst)
        shutil.move(src, new_path)

        track = Tracks.get_by_path(session, new_path)
        if track:
            track.rescan(session)
        else:
            print(f"Can't find copied track {src=}, {dst=}")


if __name__ == "__main__":
    main()