replace_files.py functionality now in musicmuster
This commit is contained in:
parent
5e4277646b
commit
fe8537c9c1
@ -1,272 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
#
|
||||
# Script to replace existing files in parent directory. Typical usage:
|
||||
# the current directory contains a "better" version of the file than the
|
||||
# parent (eg, bettet bitrate).
|
||||
|
||||
# Standard library imports
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
from typing import List
|
||||
|
||||
# PyQt imports
|
||||
|
||||
# Third party imports
|
||||
import pydymenu # type: ignore
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.orm.session import Session
|
||||
|
||||
# App imports
|
||||
from helpers import (
|
||||
get_tags,
|
||||
set_track_metadata,
|
||||
)
|
||||
from models import db, Tracks
|
||||
|
||||
# ###################### SETTINGS #########################
|
||||
process_name_and_tags_matches = True
|
||||
process_tag_matches = True
|
||||
do_processing = True
|
||||
process_no_matches = True
|
||||
|
||||
source_dir = "/home/kae/music/Singles/tmp"
|
||||
parent_dir = os.path.dirname(source_dir)
|
||||
# #########################################################
|
||||
|
||||
name_and_tags: List[str] = []
|
||||
tags_not_name: List[str] = []
|
||||
# multiple_similar: List[str] = []
|
||||
# possibles: List[str] = []
|
||||
no_match: int = 0
|
||||
|
||||
|
||||
def main():
|
||||
global no_match
|
||||
|
||||
# We only want to run this against the production database because
|
||||
# we will affect files in the common pool of tracks used by all
|
||||
# databases
|
||||
if "musicmuster_prod" not in os.environ.get("ALCHEMICAL_DATABASE_URI"):
|
||||
response = input("Not on production database - c to continue: ")
|
||||
if response != "c":
|
||||
sys.exit(0)
|
||||
|
||||
# Sanity check
|
||||
assert source_dir != parent_dir
|
||||
|
||||
# Scan parent directory
|
||||
with db.Session() as session:
|
||||
all_tracks = Tracks.get_all(session)
|
||||
parent_tracks = [a for a in all_tracks if parent_dir in a.path]
|
||||
parent_fnames = [os.path.basename(a.path) for a in parent_tracks]
|
||||
# Create a dictionary of parent paths with their titles and
|
||||
# artists
|
||||
parents = {}
|
||||
for t in parent_tracks:
|
||||
parents[t.path] = {"title": t.title, "artist": t.artist}
|
||||
titles_to_path = {}
|
||||
artists_to_path = {}
|
||||
for k, v in parents.items():
|
||||
try:
|
||||
titles_to_path[v["title"].lower()] = k
|
||||
artists_to_path[v["artist"].lower()] = k
|
||||
except AttributeError:
|
||||
continue
|
||||
|
||||
for new_fname in os.listdir(source_dir):
|
||||
new_path = os.path.join(source_dir, new_fname)
|
||||
if not os.path.isfile(new_path):
|
||||
continue
|
||||
new_tags = get_tags(new_path)
|
||||
new_title = new_tags["title"]
|
||||
if not new_title:
|
||||
print(f"{new_fname} does not have a title tag")
|
||||
sys.exit(1)
|
||||
new_artist = new_tags["artist"]
|
||||
if not new_artist:
|
||||
print(f"{new_fname} does not have an artist tag")
|
||||
sys.exit(1)
|
||||
bitrate = new_tags["bitrate"]
|
||||
|
||||
# If same filename exists in parent direcory, check tags
|
||||
parent_path = os.path.join(parent_dir, new_fname)
|
||||
if os.path.exists(parent_path):
|
||||
parent_tags = get_tags(parent_path)
|
||||
parent_title = parent_tags["title"]
|
||||
parent_artist = parent_tags["artist"]
|
||||
if (str(parent_title).lower() == str(new_title).lower()) and (
|
||||
str(parent_artist).lower() == str(new_artist).lower()
|
||||
):
|
||||
name_and_tags.append(
|
||||
f" {new_fname=}, {parent_title} → {new_title}, "
|
||||
f" {parent_artist} → {new_artist}"
|
||||
)
|
||||
if process_name_and_tags_matches:
|
||||
process_track(new_path, parent_path, new_title, new_artist, bitrate)
|
||||
continue
|
||||
|
||||
# Check for matching tags although filename is different
|
||||
if new_title.lower() in titles_to_path:
|
||||
possible_path = titles_to_path[new_title.lower()]
|
||||
if parents[possible_path]["artist"].lower() == new_artist.lower():
|
||||
# print(
|
||||
# f"title={new_title}, artist={new_artist}:\n"
|
||||
# f" {new_path} → {parent_path}"
|
||||
# )
|
||||
tags_not_name.append(
|
||||
f"title={new_title}, artist={new_artist}:\n"
|
||||
f" {new_path} → {parent_path}"
|
||||
)
|
||||
if process_tag_matches:
|
||||
process_track(
|
||||
new_path, possible_path, new_title, new_artist, bitrate
|
||||
)
|
||||
continue
|
||||
else:
|
||||
no_match += 1
|
||||
|
||||
else:
|
||||
no_match += 1
|
||||
|
||||
# Try to find a near match
|
||||
|
||||
if process_no_matches:
|
||||
prompt = f"file={new_fname}\n title={new_title}\n artist={new_artist}: "
|
||||
# Use fzf to search
|
||||
choice = pydymenu.rofi(parent_fnames, prompt=prompt)
|
||||
if choice:
|
||||
old_file = os.path.join(parent_dir, choice[0])
|
||||
oldtags = get_tags(old_file)
|
||||
old_title = oldtags["title"]
|
||||
old_artist = oldtags["artist"]
|
||||
print()
|
||||
print(f" File name will change {choice[0]}")
|
||||
print(f" → {new_fname}")
|
||||
print()
|
||||
print(f" Title tag will change {old_title}")
|
||||
print(f" → {new_title}")
|
||||
print()
|
||||
print(f" Artist tag will change {old_artist}")
|
||||
print(f" → {new_artist}")
|
||||
print()
|
||||
data = input("Go ahead (y to accept)? ")
|
||||
if data == "y":
|
||||
process_track(new_path, old_file, new_title, new_artist, bitrate)
|
||||
continue
|
||||
if data == "q":
|
||||
sys.exit(0)
|
||||
else:
|
||||
continue
|
||||
# else:
|
||||
# no_match.append(f"{new_fname}, {new_title=}, {new_artist=}")
|
||||
# continue
|
||||
|
||||
# if match_count > 1:
|
||||
# multiple_similar.append(new_fname + "\n " + "\n ".join(matches))
|
||||
# if match_count <= 26 and process_multiple_matches:
|
||||
# print(f"\n file={new_fname}\n title={new_title}\n artist={new_artist}\n")
|
||||
# d = {}
|
||||
# while True:
|
||||
# for i, match in enumerate(matches):
|
||||
# d[i] = match
|
||||
# for k, v in d.items():
|
||||
# print(f"{k}: {v}")
|
||||
# data = input("pick one, return to quit: ")
|
||||
# if data == "":
|
||||
# break
|
||||
# try:
|
||||
# key = int(data)
|
||||
# except ValueError:
|
||||
# continue
|
||||
# if key in d:
|
||||
# dst = d[key]
|
||||
# process_track(new_path, dst, new_title, new_artist, bitrate)
|
||||
# break
|
||||
# else:
|
||||
# continue
|
||||
# continue # from break after testing for "" in data
|
||||
# # One match, check tags
|
||||
# sim_name = matches[0]
|
||||
# p = get_tags(sim_name)
|
||||
# parent_title = p['title']
|
||||
# parent_artist = p['artist']
|
||||
# if (
|
||||
# (str(parent_title).lower() != str(new_title).lower()) or
|
||||
# (str(parent_artist).lower() != str(new_artist).lower())
|
||||
# ):
|
||||
# possibles.append(
|
||||
# f"File: {os.path.basename(sim_name)} → {new_fname}"
|
||||
# f"\n {parent_title} → {new_title}\n {parent_artist} → {new_artist}"
|
||||
# )
|
||||
# process_track(new_path, sim_name, new_title, new_artist, bitrate)
|
||||
# continue
|
||||
# tags_not_name.append(f"Rename {os.path.basename(sim_name)} → {new_fname}")
|
||||
# process_track(new_path, sim_name, new_title, new_artist, bitrate)
|
||||
|
||||
print(f"Name and tags match ({len(name_and_tags)}):")
|
||||
# print(" \n".join(name_and_tags))
|
||||
# print()
|
||||
|
||||
# print(f"Name but not tags match ({len(name_not_tags)}):")
|
||||
# print(" \n".join(name_not_tags))
|
||||
# print()
|
||||
|
||||
print(f"Tags but not name match ({len(tags_not_name)}):")
|
||||
# print(" \n".join(tags_not_name))
|
||||
# print()
|
||||
|
||||
# print(f"Multiple similar names ({len(multiple_similar)}):")
|
||||
# print(" \n".join(multiple_similar))
|
||||
# print()
|
||||
|
||||
# print(f"Possibles: ({len(possibles)}):")
|
||||
# print(" \n".join(possibles))
|
||||
# print()
|
||||
|
||||
# print(f"No match ({len(no_match)}):")
|
||||
# print(" \n".join(no_match))
|
||||
# print()
|
||||
|
||||
# print(f"Name and tags match ({len(name_and_tags)}):")
|
||||
# print(f"Name but not tags match ({len(name_not_tags)}):")
|
||||
# print(f"Tags but not name match ({len(tags_not_name)}):")
|
||||
# print(f"Multiple similar names ({len(multiple_similar)}):")
|
||||
# print(f"Possibles: ({len(possibles)}):")
|
||||
# print(f"No match ({len(no_match)}):")
|
||||
print(f"No matches: {no_match}")
|
||||
|
||||
|
||||
def process_track(src, dst, title, artist, bitrate):
|
||||
new_path = os.path.join(os.path.dirname(dst), os.path.basename(src))
|
||||
print(f"process_track:\n {src=}\n {dst=}\n {title=}, {artist=}\n")
|
||||
|
||||
if not do_processing:
|
||||
return
|
||||
|
||||
with db.Session() as session:
|
||||
track = Tracks.get_by_path(session, dst)
|
||||
if track:
|
||||
# Update path, but workaround MariaDB bug
|
||||
track.path = new_path
|
||||
try:
|
||||
session.commit()
|
||||
except IntegrityError:
|
||||
# https://jira.mariadb.org/browse/MDEV-29345 workaround
|
||||
session.rollback()
|
||||
track.path = "DUMMY"
|
||||
session.commit()
|
||||
track.path = new_path
|
||||
session.commit()
|
||||
|
||||
print(f"os.unlink({dst}")
|
||||
print(f"shutil.move({src}, {new_path}")
|
||||
|
||||
os.unlink(dst)
|
||||
shutil.move(src, new_path)
|
||||
|
||||
# Update track metadata
|
||||
set_track_metadata(track)
|
||||
|
||||
|
||||
main()
|
||||
Loading…
Reference in New Issue
Block a user