musicmuster/app/replace_files.py
2022-08-22 19:27:47 +01:00

240 lines
7.8 KiB
Python
Executable File

#!/usr/bin/env python
#
# Script to replace existing files in parent directory. Typical usage:
# the current directory contains a "better" version of the file than the
# parent (eg, better bitrate).
#
# Actions:
#
# - check that the same filename is present in the parent directory
# - check that the artist and title tags are the same
# - delete the version in the parent directory (no ".bak" copy is kept)
# - move file to parent directory
# - normalise file
# - update duration, start_gap, fade_at, silence_at, mtime in database
import glob
import os
import pydymenu
import shutil
import sys
from helpers import (
fade_point,
get_audio_segment,
get_tags,
leading_silence,
trailing_silence,
)
from models import Tracks
from dbconfig import Session
from thefuzz import process # type: ignore
from sqlalchemy.exc import IntegrityError
from typing import List
# ###################### SETTINGS #########################
# When a source file has several similarly named candidates in the parent
# directory (and no more than 26 of them), prompt the user to pick one.
process_multiple_matches = True
# When False, process_track() only prints what it would do and returns
# without touching the database or the filesystem.
do_processing = True
# When a source file has no similarly named candidate in the parent
# directory, offer an fzf search of the parent directory to pick one.
process_no_matches = True
# #########################################################
def insensitive_glob(pattern):
    """
    Glob for pattern, matching alphabetic characters in either case.

    Every alphabetic character of pattern is replaced by a character class
    holding its lower- and upper-case forms (for example "a" becomes
    "[aA]"). All other characters, glob wildcards included, are passed
    through unchanged. Returns whatever glob.glob() returns for the
    rewritten pattern.
    """
    pieces = []
    for char in pattern:
        if char.isalpha():
            pieces.append(f"[{char.lower()}{char.upper()}]")
        else:
            pieces.append(char)
    return glob.glob("".join(pieces))
# source_dir holds the replacement files; the files to be replaced live in
# its parent directory.
source_dir = '/home/kae/music/Singles/tmp'  # os.getcwd()
parent_dir = os.path.split(source_dir)[0]
assert parent_dir != source_dir

# Report buckets filled in by main() and printed as a summary at the end.
name_and_tags: List[str] = list()
name_not_tags: List[str] = list()
tags_not_name: List[str] = list()
multiple_similar: List[str] = list()
no_match: List[str] = list()
possibles: List[str] = list()

print(f"{source_dir=}, {parent_dir=}")
def _tags_differ(title_a, artist_a, title_b, artist_b):
    """Return True when the titles or the artists differ, ignoring case."""
    return (
        str(title_a).lower() != str(title_b).lower()
        or str(artist_a).lower() != str(artist_b).lower()
    )


def _offer_fzf_replacement(fname, new_file, tracks, us_t, us_a):
    """
    Let the user pick, via fzf, which parent file new_file should replace.

    Shows the resulting file name / title / artist changes and asks for
    confirmation. Returns True if the track was processed, False if the
    user picked nothing or did not confirm.
    """
    prompt = f"\n file={fname}\n title={us_t}\n artist={us_a}: "
    choice = pydymenu.fzf(tracks, prompt)
    if not choice:
        return False

    old_file = os.path.join(parent_dir, choice[0])
    oldtags = get_tags(old_file)
    old_title = oldtags['title']
    old_artist = oldtags['artist']
    print()
    print(f" File name will change {choice[0]}")
    print(f"{fname}")
    print()
    print(f" Title tag will change {old_title}")
    print(f"{us_t}")
    print()
    print(f" Artist tag will change {old_artist}")
    print(f"{us_a}")
    print()
    if input("Go ahead (y to accept)? ") != "y":
        return False

    process_track(new_file, old_file, us_t, us_a)
    return True


def _pick_from_matches(fname, new_file, matches, us_t, us_a):
    """
    Ask the user which of several similarly named files to replace.

    The numbered list is re-shown until the user enters a valid index (the
    chosen file is then processed) or an empty line (nothing is processed).
    """
    print(f"\n file={fname}\n title={us_t}\n artist={us_a}\n")
    while True:
        for index, match in enumerate(matches):
            print(f"{index}: {match}")
        data = input("pick one, return to quit: ")
        if data == "":
            return
        try:
            key = int(data)
        except ValueError:
            continue
        if 0 <= key < len(matches):
            process_track(new_file, matches[key], us_t, us_a)
            return


def _print_summary():
    """Print the per-category report collected while processing."""
    # Only the counts are shown for the two "tags match" categories.
    print(f"Name and tags match ({len(name_and_tags)}):")
    print(f"Name but not tags match ({len(name_not_tags)}):")
    print(" \n".join(name_not_tags))
    print()
    print(f"Tags but not name match ({len(tags_not_name)}):")
    print(f"Multiple similar names ({len(multiple_similar)}):")
    print(" \n".join(multiple_similar))
    print()
    print(f"Possibles: ({len(possibles)}):")
    print(" \n".join(possibles))
    print()
    print(f"No match ({len(no_match)}):")
    print(" \n".join(no_match))
    print()


def main():
    """
    Match every file in source_dir against parent_dir and replace matches.

    For each source file, in order of preference:

    - a file of the same name in parent_dir is replaced directly;
    - otherwise files in parent_dir whose names start with the same stem
      (case-insensitively) are looked up: a single candidate is replaced,
      several candidates are offered to the user to choose from (when
      process_multiple_matches is set and there are at most 26);
    - with no candidate at all the user may pick a file with fzf (when
      process_no_matches is set).

    Each file is recorded in one of the module-level report lists, and a
    summary is printed at the end. Exits via sys.exit(0) if the MM_DB
    environment variable does not name the production database and the
    user does not enter "c" to continue.
    """
    # Default to '' so that an unset MM_DB is treated as "not production"
    # instead of raising TypeError on the "in" test.
    if 'musicmuster_prod' not in os.environ.get('MM_DB', ''):
        response = input("Not on production database - c to continue: ")
        if response != "c":
            sys.exit(0)

    tracks = os.listdir(parent_dir)
    for fname in os.listdir(source_dir):
        parent_file = os.path.join(parent_dir, fname)
        new_file = os.path.join(source_dir, fname)
        us = get_tags(new_file)
        us_t = us['title']
        us_a = us['artist']

        if os.path.exists(parent_file):
            # Same file name exists in the parent: record whether the tags
            # agree, then replace it either way.
            p = get_tags(parent_file)
            p_t = p['title']
            p_a = p['artist']
            if _tags_differ(p_t, p_a, us_t, us_a):
                name_not_tags.append(
                    f" {fname=}, {p_t}{us_t}, {p_a}{us_a}")
            else:
                name_and_tags.append(new_file)
            process_track(new_file, parent_file, us_t, us_a)
            continue

        # Try to find a near match. Strip only the extension (names such as
        # "Mr. Blue Sky.mp3" contain other dots) and escape glob
        # metacharacters so that "[", "]", "*" and "?" in a file name are
        # matched literally.
        stem = os.path.splitext(fname)[0]
        matches = insensitive_glob(
            glob.escape(os.path.join(parent_dir, stem)) + '*')

        if not matches:
            handled = process_no_matches and _offer_fzf_replacement(
                fname, new_file, tracks, us_t, us_a)
            if not handled:
                no_match.append(f"{fname}, {us_t=}, {us_a=}")
            continue

        if len(matches) > 1:
            multiple_similar.append(fname + "\n " + "\n ".join(matches))
            if len(matches) <= 26 and process_multiple_matches:
                _pick_from_matches(fname, new_file, matches, us_t, us_a)
            # Never fall through to automatic processing when more than one
            # candidate exists.
            continue

        # Exactly one similarly named file: record whether the tags agree,
        # then replace it either way.
        sim_name = matches[0]
        p = get_tags(sim_name)
        p_t = p['title']
        p_a = p['artist']
        if _tags_differ(p_t, p_a, us_t, us_a):
            possibles.append(
                f"File: {os.path.basename(sim_name)}{fname}"
                f"\n {p_t}{us_t}\n {p_a}{us_a}"
            )
        else:
            tags_not_name.append(
                f"Rename {os.path.basename(sim_name)}{fname}")
        process_track(new_file, sim_name, us_t, us_a)

    _print_summary()
def process_track(src, dst, title, artist):
    """
    Replace the file at dst with the file at src and update the database.

    The replacement keeps src's file name but lives in dst's directory.
    Always prints what is about to happen; does nothing further unless the
    module-level do_processing flag is set.

    Args:
        src: path of the replacement file.
        dst: path of the existing file to be replaced.
        title: title to store on the database record for dst.
        artist: artist to store on the database record for dst.
    """
    new_path = os.path.join(os.path.dirname(dst), os.path.basename(src))
    print(
        f"process_track:\n {src=}\n {new_path=}\n "
        f"{dst=}\n {title=}, {artist=}\n"
    )

    if do_processing:
        with Session() as db:
            existing = Tracks.get_by_path(db, dst)
            if existing:
                existing.title, existing.artist, existing.path = (
                    title, artist, new_path)
                try:
                    db.commit()
                except IntegrityError:
                    # https://jira.mariadb.org/browse/MDEV-29345 workaround:
                    # roll back, then set the path in two committed steps
                    # via a placeholder value.
                    db.rollback()
                    existing.title, existing.artist = title, artist
                    for staged_path in ("DUMMY", new_path):
                        existing.path = staged_path
                        db.commit()

            # Swap the files on disk: remove the old one, move the new one
            # into its directory.
            print(f"os.unlink({dst}")
            print(f"shutil.move({src}, {new_path}")
            os.unlink(dst)
            shutil.move(src, new_path)

            # Look the record up again under its new path and rescan it.
            replaced = Tracks.get_by_path(db, new_path)
            if not replaced:
                print(f"Can't find copied track {src=}, {dst=}")
            else:
                replaced.rescan(db)
# Run only when executed as a script, so that merely importing this module
# does not start deleting and moving files.
if __name__ == "__main__":
    main()