#!/usr/bin/env python # # Script to replace existing files in parent directory. Typical usage: # the current directory contains a "better" version of the file than the # parent (eg, bettet bitrate). import glob import os import pydymenu # type: ignore import shutil import sys from helpers import ( fade_point, get_audio_segment, get_tags, leading_silence, trailing_silence, ) from models import Tracks from dbconfig import Session from thefuzz import process # type: ignore from sqlalchemy.exc import IntegrityError from typing import List # ###################### SETTINGS ######################### process_name_and_tags_matches = True process_tag_matches = True do_processing = True process_no_matches = True source_dir = '/home/kae/music/Singles/tmp' parent_dir = os.path.dirname(source_dir) # ######################################################### def insensitive_glob(pattern): """Helper for case insensitive glob.glob()""" def either(c): return '[%s%s]' % (c.lower(), c.upper()) if c.isalpha() else c return glob.glob(''.join(map(either, pattern))) name_and_tags: List[str] = [] tags_not_name: List[str] = [] multiple_similar: List[str] = [] no_match: List[str] = [] possibles: List[str] = [] no_match: int = 0 def main(): global no_match # We only want to run this against the production database because # we will affect files in the common pool of tracks used by all # databases if 'musicmuster_prod' not in os.environ.get('MM_DB'): response = input("Not on production database - c to continue: ") if response != "c": sys.exit(0) # Sanity check assert source_dir != parent_dir # Scan parent directory with Session() as session: all_tracks = Tracks.get_all(session) parent_tracks = [a for a in all_tracks if parent_dir in a.path] parent_fnames = [os.path.basename(a.path) for a in parent_tracks] # Create a dictionary of parent paths with their titles and # artists parents = {} for t in parent_tracks: parents[t.path] = {"title": t.title, "artist": t.artist} titles_to_path = {} artists_to_path = {} for k, v in parents.items(): try: titles_to_path[v['title'].lower()] = k artists_to_path[v['artist'].lower()] = k except AttributeError: continue for new_fname in os.listdir(source_dir): new_path = os.path.join(source_dir, new_fname) new_tags = get_tags(new_path) new_title = new_tags['title'] new_artist = new_tags['artist'] bitrate = new_tags['bitrate'] # If same filename exists in parent direcory, check tags parent_path = os.path.join(parent_dir, new_fname) if os.path.exists(parent_path): parent_tags = get_tags(parent_path) parent_title = parent_tags['title'] parent_artist = parent_tags['artist'] if ( (str(parent_title).lower() == str(new_title).lower()) and (str(parent_artist).lower() == str(new_artist).lower()) ): name_and_tags.append( f" {new_fname=}, {parent_title} → {new_title}, " f" {parent_artist} → {new_artist}" ) if process_name_and_tags_matches: process_track(new_path, parent_path, new_title, new_artist, bitrate) continue # Check for matching tags although filename is different if new_title.lower() in titles_to_path: possible_path = titles_to_path[new_title.lower()] if parents[possible_path]['artist'].lower() == new_artist.lower(): # print( # f"title={new_title}, artist={new_artist}:\n" # f" {new_path} → {parent_path}" # ) tags_not_name.append( f"title={new_title}, artist={new_artist}:\n" f" {new_path} → {parent_path}" ) if process_tag_matches: process_track(new_path, possible_path, new_title, new_artist, bitrate) continue else: no_match += 1 else: no_match += 1 # Try to find a near match # stem = new_fname.split(".")[0] # matches = insensitive_glob(os.path.join(parent_dir, stem) + '*') # match_count = len(matches) # if match_count == 0: if process_no_matches: prompt = f"\n file={new_fname}\n title={new_title}\n artist={new_artist}: " # Use fzf to search choice = pydymenu.fzf(parent_fnames, prompt) if choice: old_file = os.path.join(parent_dir, choice[0]) oldtags = get_tags(old_file) old_title = oldtags['title'] old_artist = oldtags['artist'] print() print(f" File name will change {choice[0]}") print(f" → {new_fname}") print() print(f" Title tag will change {old_title}") print(f" → {new_title}") print() print(f" Artist tag will change {old_artist}") print(f" → {new_artist}") print() data = input("Go ahead (y to accept)? ") if data == "y": process_track(new_path, old_file, new_title, new_artist, bitrate) continue if data == "q": sys.exit(0) else: continue # else: # no_match.append(f"{new_fname}, {new_title=}, {new_artist=}") # continue # if match_count > 1: # multiple_similar.append(new_fname + "\n " + "\n ".join(matches)) # if match_count <= 26 and process_multiple_matches: # print(f"\n file={new_fname}\n title={new_title}\n artist={new_artist}\n") # d = {} # while True: # for i, match in enumerate(matches): # d[i] = match # for k, v in d.items(): # print(f"{k}: {v}") # data = input("pick one, return to quit: ") # if data == "": # break # try: # key = int(data) # except ValueError: # continue # if key in d: # dst = d[key] # process_track(new_path, dst, new_title, new_artist, bitrate) # break # else: # continue # continue # from break after testing for "" in data # # One match, check tags # sim_name = matches[0] # p = get_tags(sim_name) # parent_title = p['title'] # parent_artist = p['artist'] # if ( # (str(parent_title).lower() != str(new_title).lower()) or # (str(parent_artist).lower() != str(new_artist).lower()) # ): # possibles.append( # f"File: {os.path.basename(sim_name)} → {new_fname}" # f"\n {parent_title} → {new_title}\n {parent_artist} → {new_artist}" # ) # process_track(new_path, sim_name, new_title, new_artist, bitrate) # continue # tags_not_name.append(f"Rename {os.path.basename(sim_name)} → {new_fname}") # process_track(new_path, sim_name, new_title, new_artist, bitrate) print(f"Name and tags match ({len(name_and_tags)}):") # print(" \n".join(name_and_tags)) # print() # print(f"Name but not tags match ({len(name_not_tags)}):") # print(" \n".join(name_not_tags)) # print() print(f"Tags but not name match ({len(tags_not_name)}):") # print(" \n".join(tags_not_name)) # print() # print(f"Multiple similar names ({len(multiple_similar)}):") # print(" \n".join(multiple_similar)) # print() # print(f"Possibles: ({len(possibles)}):") # print(" \n".join(possibles)) # print() # print(f"No match ({len(no_match)}):") # print(" \n".join(no_match)) # print() # print(f"Name and tags match ({len(name_and_tags)}):") # print(f"Name but not tags match ({len(name_not_tags)}):") # print(f"Tags but not name match ({len(tags_not_name)}):") # print(f"Multiple similar names ({len(multiple_similar)}):") # print(f"Possibles: ({len(possibles)}):") # print(f"No match ({len(no_match)}):") print(f"No matches: {no_match}") def process_track(src, dst, title, artist, bitrate): new_path = os.path.join(os.path.dirname(dst), os.path.basename(src)) print( f"process_track:\n {src=}\n {dst=}\n {title=}, {artist=}\n" ) if not do_processing: return with Session() as session: track = Tracks.get_by_path(session, dst) if track: track.title = title track.artist = artist track.path = new_path track.bitrate = bitrate try: session.commit() except IntegrityError: # https://jira.mariadb.org/browse/MDEV-29345 workaround session.rollback() track.title = title track.artist = artist track.path = "DUMMY" track.bitrate = bitrate session.commit() track.path = new_path session.commit() print(f"os.unlink({dst}") print(f"shutil.move({src}, {new_path}") os.unlink(dst) shutil.move(src, new_path) track = Tracks.get_by_path(session, new_path) if track: track.rescan(session) else: print(f"Can't find copied track {src=}, {dst=}") main()