diff --git a/app/replace_files.py b/app/replace_files.py deleted file mode 100755 index 4f545b7..0000000 --- a/app/replace_files.py +++ /dev/null @@ -1,272 +0,0 @@ -#!/usr/bin/env python -# -# Script to replace existing files in parent directory. Typical usage: -# the current directory contains a "better" version of the file than the -# parent (eg, bettet bitrate). - -# Standard library imports -import os -import shutil -import sys -from typing import List - -# PyQt imports - -# Third party imports -import pydymenu # type: ignore -from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm.session import Session - -# App imports -from helpers import ( - get_tags, - set_track_metadata, -) -from models import db, Tracks - -# ###################### SETTINGS ######################### -process_name_and_tags_matches = True -process_tag_matches = True -do_processing = True -process_no_matches = True - -source_dir = "/home/kae/music/Singles/tmp" -parent_dir = os.path.dirname(source_dir) -# ######################################################### - -name_and_tags: List[str] = [] -tags_not_name: List[str] = [] -# multiple_similar: List[str] = [] -# possibles: List[str] = [] -no_match: int = 0 - - -def main(): - global no_match - - # We only want to run this against the production database because - # we will affect files in the common pool of tracks used by all - # databases - if "musicmuster_prod" not in os.environ.get("ALCHEMICAL_DATABASE_URI"): - response = input("Not on production database - c to continue: ") - if response != "c": - sys.exit(0) - - # Sanity check - assert source_dir != parent_dir - - # Scan parent directory - with db.Session() as session: - all_tracks = Tracks.get_all(session) - parent_tracks = [a for a in all_tracks if parent_dir in a.path] - parent_fnames = [os.path.basename(a.path) for a in parent_tracks] - # Create a dictionary of parent paths with their titles and - # artists - parents = {} - for t in parent_tracks: - parents[t.path] = {"title": t.title, "artist": t.artist} - titles_to_path = {} - artists_to_path = {} - for k, v in parents.items(): - try: - titles_to_path[v["title"].lower()] = k - artists_to_path[v["artist"].lower()] = k - except AttributeError: - continue - - for new_fname in os.listdir(source_dir): - new_path = os.path.join(source_dir, new_fname) - if not os.path.isfile(new_path): - continue - new_tags = get_tags(new_path) - new_title = new_tags["title"] - if not new_title: - print(f"{new_fname} does not have a title tag") - sys.exit(1) - new_artist = new_tags["artist"] - if not new_artist: - print(f"{new_fname} does not have an artist tag") - sys.exit(1) - bitrate = new_tags["bitrate"] - - # If same filename exists in parent direcory, check tags - parent_path = os.path.join(parent_dir, new_fname) - if os.path.exists(parent_path): - parent_tags = get_tags(parent_path) - parent_title = parent_tags["title"] - parent_artist = parent_tags["artist"] - if (str(parent_title).lower() == str(new_title).lower()) and ( - str(parent_artist).lower() == str(new_artist).lower() - ): - name_and_tags.append( - f" {new_fname=}, {parent_title} → {new_title}, " - f" {parent_artist} → {new_artist}" - ) - if process_name_and_tags_matches: - process_track(new_path, parent_path, new_title, new_artist, bitrate) - continue - - # Check for matching tags although filename is different - if new_title.lower() in titles_to_path: - possible_path = titles_to_path[new_title.lower()] - if parents[possible_path]["artist"].lower() == new_artist.lower(): - # print( - # f"title={new_title}, artist={new_artist}:\n" - # f" {new_path} → {parent_path}" - # ) - tags_not_name.append( - f"title={new_title}, artist={new_artist}:\n" - f" {new_path} → {parent_path}" - ) - if process_tag_matches: - process_track( - new_path, possible_path, new_title, new_artist, bitrate - ) - continue - else: - no_match += 1 - - else: - no_match += 1 - - # Try to find a near match - - if process_no_matches: - prompt = f"file={new_fname}\n title={new_title}\n artist={new_artist}: " - # Use fzf to search - choice = pydymenu.rofi(parent_fnames, prompt=prompt) - if choice: - old_file = os.path.join(parent_dir, choice[0]) - oldtags = get_tags(old_file) - old_title = oldtags["title"] - old_artist = oldtags["artist"] - print() - print(f" File name will change {choice[0]}") - print(f" → {new_fname}") - print() - print(f" Title tag will change {old_title}") - print(f" → {new_title}") - print() - print(f" Artist tag will change {old_artist}") - print(f" → {new_artist}") - print() - data = input("Go ahead (y to accept)? ") - if data == "y": - process_track(new_path, old_file, new_title, new_artist, bitrate) - continue - if data == "q": - sys.exit(0) - else: - continue - # else: - # no_match.append(f"{new_fname}, {new_title=}, {new_artist=}") - # continue - - # if match_count > 1: - # multiple_similar.append(new_fname + "\n " + "\n ".join(matches)) - # if match_count <= 26 and process_multiple_matches: - # print(f"\n file={new_fname}\n title={new_title}\n artist={new_artist}\n") - # d = {} - # while True: - # for i, match in enumerate(matches): - # d[i] = match - # for k, v in d.items(): - # print(f"{k}: {v}") - # data = input("pick one, return to quit: ") - # if data == "": - # break - # try: - # key = int(data) - # except ValueError: - # continue - # if key in d: - # dst = d[key] - # process_track(new_path, dst, new_title, new_artist, bitrate) - # break - # else: - # continue - # continue # from break after testing for "" in data - # # One match, check tags - # sim_name = matches[0] - # p = get_tags(sim_name) - # parent_title = p['title'] - # parent_artist = p['artist'] - # if ( - # (str(parent_title).lower() != str(new_title).lower()) or - # (str(parent_artist).lower() != str(new_artist).lower()) - # ): - # possibles.append( - # f"File: {os.path.basename(sim_name)} → {new_fname}" - # f"\n {parent_title} → {new_title}\n {parent_artist} → {new_artist}" - # ) - # process_track(new_path, sim_name, new_title, new_artist, bitrate) - # continue - # tags_not_name.append(f"Rename {os.path.basename(sim_name)} → {new_fname}") - # process_track(new_path, sim_name, new_title, new_artist, bitrate) - - print(f"Name and tags match ({len(name_and_tags)}):") - # print(" \n".join(name_and_tags)) - # print() - - # print(f"Name but not tags match ({len(name_not_tags)}):") - # print(" \n".join(name_not_tags)) - # print() - - print(f"Tags but not name match ({len(tags_not_name)}):") - # print(" \n".join(tags_not_name)) - # print() - - # print(f"Multiple similar names ({len(multiple_similar)}):") - # print(" \n".join(multiple_similar)) - # print() - - # print(f"Possibles: ({len(possibles)}):") - # print(" \n".join(possibles)) - # print() - - # print(f"No match ({len(no_match)}):") - # print(" \n".join(no_match)) - # print() - - # print(f"Name and tags match ({len(name_and_tags)}):") - # print(f"Name but not tags match ({len(name_not_tags)}):") - # print(f"Tags but not name match ({len(tags_not_name)}):") - # print(f"Multiple similar names ({len(multiple_similar)}):") - # print(f"Possibles: ({len(possibles)}):") - # print(f"No match ({len(no_match)}):") - print(f"No matches: {no_match}") - - -def process_track(src, dst, title, artist, bitrate): - new_path = os.path.join(os.path.dirname(dst), os.path.basename(src)) - print(f"process_track:\n {src=}\n {dst=}\n {title=}, {artist=}\n") - - if not do_processing: - return - - with db.Session() as session: - track = Tracks.get_by_path(session, dst) - if track: - # Update path, but workaround MariaDB bug - track.path = new_path - try: - session.commit() - except IntegrityError: - # https://jira.mariadb.org/browse/MDEV-29345 workaround - session.rollback() - track.path = "DUMMY" - session.commit() - track.path = new_path - session.commit() - - print(f"os.unlink({dst}") - print(f"shutil.move({src}, {new_path}") - - os.unlink(dst) - shutil.move(src, new_path) - - # Update track metadata - set_track_metadata(track) - - -main()