#!/usr/bin/env python # # Script to replace existing files in parent directory. Typical usage: # the current directory contains a "better" version of the file than the # parent (eg, bettet bitrate). import os import pydymenu # type: ignore import shutil import sys from helpers import ( get_tags, set_track_metadata, ) from models import Tracks from dbconfig import Session from sqlalchemy.exc import IntegrityError from typing import List # ###################### SETTINGS ######################### process_name_and_tags_matches = True process_tag_matches = True do_processing = True process_no_matches = True source_dir = "/home/kae/music/Singles/tmp" parent_dir = os.path.dirname(source_dir) # ######################################################### name_and_tags: List[str] = [] tags_not_name: List[str] = [] # multiple_similar: List[str] = [] # possibles: List[str] = [] no_match: int = 0 def main(): global no_match # We only want to run this against the production database because # we will affect files in the common pool of tracks used by all # databases if "musicmuster_prod" not in os.environ.get("MM_DB"): response = input("Not on production database - c to continue: ") if response != "c": sys.exit(0) # Sanity check assert source_dir != parent_dir # Scan parent directory with Session() as session: all_tracks = Tracks.get_all(session) parent_tracks = [a for a in all_tracks if parent_dir in a.path] parent_fnames = [os.path.basename(a.path) for a in parent_tracks] # Create a dictionary of parent paths with their titles and # artists parents = {} for t in parent_tracks: parents[t.path] = {"title": t.title, "artist": t.artist} titles_to_path = {} artists_to_path = {} for k, v in parents.items(): try: titles_to_path[v["title"].lower()] = k artists_to_path[v["artist"].lower()] = k except AttributeError: continue for new_fname in os.listdir(source_dir): new_path = os.path.join(source_dir, new_fname) if not os.path.isfile(new_path): continue new_tags = get_tags(new_path) new_title = new_tags["title"] if not new_title: print(f"{new_fname} does not have a title tag") sys.exit(1) new_artist = new_tags["artist"] if not new_artist: print(f"{new_fname} does not have an artist tag") sys.exit(1) bitrate = new_tags["bitrate"] # If same filename exists in parent direcory, check tags parent_path = os.path.join(parent_dir, new_fname) if os.path.exists(parent_path): parent_tags = get_tags(parent_path) parent_title = parent_tags["title"] parent_artist = parent_tags["artist"] if (str(parent_title).lower() == str(new_title).lower()) and ( str(parent_artist).lower() == str(new_artist).lower() ): name_and_tags.append( f" {new_fname=}, {parent_title} → {new_title}, " f" {parent_artist} → {new_artist}" ) if process_name_and_tags_matches: process_track(new_path, parent_path, new_title, new_artist, bitrate) continue # Check for matching tags although filename is different if new_title.lower() in titles_to_path: possible_path = titles_to_path[new_title.lower()] if parents[possible_path]["artist"].lower() == new_artist.lower(): # print( # f"title={new_title}, artist={new_artist}:\n" # f" {new_path} → {parent_path}" # ) tags_not_name.append( f"title={new_title}, artist={new_artist}:\n" f" {new_path} → {parent_path}" ) if process_tag_matches: process_track( new_path, possible_path, new_title, new_artist, bitrate ) continue else: no_match += 1 else: no_match += 1 # Try to find a near match if process_no_matches: prompt = f"file={new_fname}\n title={new_title}\n artist={new_artist}: " # Use fzf to search choice = pydymenu.fzf(parent_fnames, prompt=prompt) if choice: old_file = os.path.join(parent_dir, choice[0]) oldtags = get_tags(old_file) old_title = oldtags["title"] old_artist = oldtags["artist"] print() print(f" File name will change {choice[0]}") print(f" → {new_fname}") print() print(f" Title tag will change {old_title}") print(f" → {new_title}") print() print(f" Artist tag will change {old_artist}") print(f" → {new_artist}") print() data = input("Go ahead (y to accept)? ") if data == "y": process_track(new_path, old_file, new_title, new_artist, bitrate) continue if data == "q": sys.exit(0) else: continue # else: # no_match.append(f"{new_fname}, {new_title=}, {new_artist=}") # continue # if match_count > 1: # multiple_similar.append(new_fname + "\n " + "\n ".join(matches)) # if match_count <= 26 and process_multiple_matches: # print(f"\n file={new_fname}\n title={new_title}\n artist={new_artist}\n") # d = {} # while True: # for i, match in enumerate(matches): # d[i] = match # for k, v in d.items(): # print(f"{k}: {v}") # data = input("pick one, return to quit: ") # if data == "": # break # try: # key = int(data) # except ValueError: # continue # if key in d: # dst = d[key] # process_track(new_path, dst, new_title, new_artist, bitrate) # break # else: # continue # continue # from break after testing for "" in data # # One match, check tags # sim_name = matches[0] # p = get_tags(sim_name) # parent_title = p['title'] # parent_artist = p['artist'] # if ( # (str(parent_title).lower() != str(new_title).lower()) or # (str(parent_artist).lower() != str(new_artist).lower()) # ): # possibles.append( # f"File: {os.path.basename(sim_name)} → {new_fname}" # f"\n {parent_title} → {new_title}\n {parent_artist} → {new_artist}" # ) # process_track(new_path, sim_name, new_title, new_artist, bitrate) # continue # tags_not_name.append(f"Rename {os.path.basename(sim_name)} → {new_fname}") # process_track(new_path, sim_name, new_title, new_artist, bitrate) print(f"Name and tags match ({len(name_and_tags)}):") # print(" \n".join(name_and_tags)) # print() # print(f"Name but not tags match ({len(name_not_tags)}):") # print(" \n".join(name_not_tags)) # print() print(f"Tags but not name match ({len(tags_not_name)}):") # print(" \n".join(tags_not_name)) # print() # print(f"Multiple similar names ({len(multiple_similar)}):") # print(" \n".join(multiple_similar)) # print() # print(f"Possibles: ({len(possibles)}):") # print(" \n".join(possibles)) # print() # print(f"No match ({len(no_match)}):") # print(" \n".join(no_match)) # print() # print(f"Name and tags match ({len(name_and_tags)}):") # print(f"Name but not tags match ({len(name_not_tags)}):") # print(f"Tags but not name match ({len(tags_not_name)}):") # print(f"Multiple similar names ({len(multiple_similar)}):") # print(f"Possibles: ({len(possibles)}):") # print(f"No match ({len(no_match)}):") print(f"No matches: {no_match}") def process_track(src, dst, title, artist, bitrate): new_path = os.path.join(os.path.dirname(dst), os.path.basename(src)) print(f"process_track:\n {src=}\n {dst=}\n {title=}, {artist=}\n") if not do_processing: return with Session() as session: track = Tracks.get_by_path(session, dst) if track: # Update path, but workaround MariaDB bug track.path = new_path try: session.commit() except IntegrityError: # https://jira.mariadb.org/browse/MDEV-29345 workaround session.rollback() track.path = "DUMMY" session.commit() track.path = new_path session.commit() print(f"os.unlink({dst}") print(f"shutil.move({src}, {new_path}") os.unlink(dst) shutil.move(src, new_path) # Update track metadata set_track_metadata(session, track) main()