Compare commits

..

3 Commits

Author SHA1 Message Date
Keith Edmunds
6e4c386fe2 Manage deleting rows better
Fix incorrect updating of track_sequence row numbers.
2024-06-18 19:45:12 +01:00
Keith Edmunds
a4174e84ce Make use of get_tags more resilient 2024-06-17 16:31:23 +01:00
Keith Edmunds
fe8537c9c1 replace_files.py functionality now in musicmuster 2024-06-17 16:18:06 +01:00
4 changed files with 31 additions and 309 deletions

View File

@ -42,7 +42,6 @@ class MusicMusterSignals(QObject):
end_reset_model_signal = pyqtSignal(int)
next_track_changed_signal = pyqtSignal()
resize_rows_signal = pyqtSignal(int)
row_order_changed_signal = pyqtSignal(int)
search_songfacts_signal = pyqtSignal(str)
search_wikipedia_signal = pyqtSignal(str)
show_warning_signal = pyqtSignal(str, str)

View File

@ -84,7 +84,12 @@ class ReplaceFilesDialog(QDialog):
continue
rf = TrackFileData(new_file_path=new_file_path)
rf.tags = get_tags(new_file_path)
if not rf.tags["title"] or not rf.tags["artist"]:
if not (
"title" in rf.tags
and "artist" in rf.tags
and rf.tags["title"]
and rf.tags["artist"]
):
show_warning(
parent=self.main_window,
title="Error",
@ -177,7 +182,9 @@ class ReplaceFilesDialog(QDialog):
for cbbn in candidates_by_basename:
cbbn_tags = get_tags(cbbn.path)
if (
cbbn_tags["title"].lower() == new_path_title.lower()
"title" in cbbn_tags
and cbbn_tags["title"].lower() == new_path_title.lower()
and "artist" in cbbn_tags
and cbbn_tags["artist"].lower() == new_path_artist.lower()
):
match_track = cbbn
@ -197,10 +204,13 @@ class ReplaceFilesDialog(QDialog):
if candidates_by_title:
# Check artist tag
for cbt in candidates_by_title:
cbt_artist = get_tags(cbt.path)["artist"]
if cbt_artist.lower() == new_path_artist.lower():
match_track = cbt
break
try:
cbt_artist = get_tags(cbt.path)["artist"]
if cbt_artist.lower() == new_path_artist.lower():
match_track = cbt
break
except FileNotFoundError:
return None
return match_track

View File

@ -127,7 +127,6 @@ class PlaylistModel(QAbstractTableModel):
self.signals.begin_reset_model_signal.connect(self.begin_reset_model)
self.signals.end_reset_model_signal.connect(self.end_reset_model)
self.signals.row_order_changed_signal.connect(self.row_order_changed)
with db.Session() as session:
# Ensure row numbers in playlist are contiguous
@ -976,7 +975,6 @@ class PlaylistModel(QAbstractTableModel):
# Reset of model must come after session has been closed
self.reset_track_sequence_row_numbers()
self.signals.row_order_changed_signal.emit(to_playlist_id)
self.signals.end_reset_model_signal.emit(to_playlist_id)
self.update_track_times()
@ -1130,7 +1128,12 @@ class PlaylistModel(QAbstractTableModel):
def reset_track_sequence_row_numbers(self) -> None:
"""
Signal handler for when row ordering has changed
Signal handler for when row ordering has changed.
Example: row 4 is marked as next. Row 2 is deleted. The PlaylistRows table will
be correctly updated with change of row number, but track_sequence.next will still
contain row_number==4. This function fixes up the track_sequence row numbers by
looking up the plr_id and retrieving the row number from the database.
"""
log.debug("reset_track_sequence_row_numbers()")
@ -1138,20 +1141,15 @@ class PlaylistModel(QAbstractTableModel):
# Check the track_sequence next, current and previous plrs and
# update the row number
with db.Session() as session:
if track_sequence.next and track_sequence.next.row_number:
next_plr = session.get(PlaylistRows, track_sequence.next.row_number)
if next_plr:
track_sequence.next.row_number = next_plr.plr_rownum
if track_sequence.current and track_sequence.current.row_number:
now_plr = session.get(PlaylistRows, track_sequence.current.row_number)
if now_plr:
track_sequence.current.row_number = now_plr.plr_rownum
if track_sequence.previous and track_sequence.previous.row_number:
previous_plr = session.get(
PlaylistRows, track_sequence.previous.row_number
)
if previous_plr:
track_sequence.previous.row_number = previous_plr.plr_rownum
for ts in [track_sequence.next, track_sequence.current, track_sequence.previous]:
if ts and ts.row_number:
plr = session.get(PlaylistRows, ts.plr_id)
if plr and plr.plr_rownum != ts.row_number:
log.error(
"reset_track_sequence_row_numbers: "
f"from {ts=} to {plr.plr_rownum=}"
)
ts.row_number = plr.plr_rownum
self.update_track_times()
@ -1191,19 +1189,6 @@ class PlaylistModel(QAbstractTableModel):
return len(self.playlist_rows)
def row_order_changed(self, playlist_id: int) -> None:
"""
Signal handler for when row ordering has changed
"""
log.debug(f"row_order_changed({playlist_id=}) {self.playlist_id=}")
# Only action if this is for us
if playlist_id != self.playlist_id:
return
self.reset_track_sequence_row_numbers()
def selection_is_sortable(self, row_numbers: List[int]) -> bool:
"""
Return True if the selection is sortable. That means:

View File

@ -1,272 +0,0 @@
#!/usr/bin/env python
#
# Script to replace existing files in parent directory. Typical usage:
# the current directory contains a "better" version of the file than the
# parent (eg, bettet bitrate).
# Standard library imports
import os
import shutil
import sys
from typing import List
# PyQt imports
# Third party imports
import pydymenu # type: ignore
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.session import Session
# App imports
from helpers import (
get_tags,
set_track_metadata,
)
from models import db, Tracks
# ###################### SETTINGS #########################
process_name_and_tags_matches = True
process_tag_matches = True
do_processing = True
process_no_matches = True
source_dir = "/home/kae/music/Singles/tmp"
parent_dir = os.path.dirname(source_dir)
# #########################################################
name_and_tags: List[str] = []
tags_not_name: List[str] = []
# multiple_similar: List[str] = []
# possibles: List[str] = []
no_match: int = 0
def main():
global no_match
# We only want to run this against the production database because
# we will affect files in the common pool of tracks used by all
# databases
if "musicmuster_prod" not in os.environ.get("ALCHEMICAL_DATABASE_URI"):
response = input("Not on production database - c to continue: ")
if response != "c":
sys.exit(0)
# Sanity check
assert source_dir != parent_dir
# Scan parent directory
with db.Session() as session:
all_tracks = Tracks.get_all(session)
parent_tracks = [a for a in all_tracks if parent_dir in a.path]
parent_fnames = [os.path.basename(a.path) for a in parent_tracks]
# Create a dictionary of parent paths with their titles and
# artists
parents = {}
for t in parent_tracks:
parents[t.path] = {"title": t.title, "artist": t.artist}
titles_to_path = {}
artists_to_path = {}
for k, v in parents.items():
try:
titles_to_path[v["title"].lower()] = k
artists_to_path[v["artist"].lower()] = k
except AttributeError:
continue
for new_fname in os.listdir(source_dir):
new_path = os.path.join(source_dir, new_fname)
if not os.path.isfile(new_path):
continue
new_tags = get_tags(new_path)
new_title = new_tags["title"]
if not new_title:
print(f"{new_fname} does not have a title tag")
sys.exit(1)
new_artist = new_tags["artist"]
if not new_artist:
print(f"{new_fname} does not have an artist tag")
sys.exit(1)
bitrate = new_tags["bitrate"]
# If same filename exists in parent direcory, check tags
parent_path = os.path.join(parent_dir, new_fname)
if os.path.exists(parent_path):
parent_tags = get_tags(parent_path)
parent_title = parent_tags["title"]
parent_artist = parent_tags["artist"]
if (str(parent_title).lower() == str(new_title).lower()) and (
str(parent_artist).lower() == str(new_artist).lower()
):
name_and_tags.append(
f" {new_fname=}, {parent_title}{new_title}, "
f" {parent_artist}{new_artist}"
)
if process_name_and_tags_matches:
process_track(new_path, parent_path, new_title, new_artist, bitrate)
continue
# Check for matching tags although filename is different
if new_title.lower() in titles_to_path:
possible_path = titles_to_path[new_title.lower()]
if parents[possible_path]["artist"].lower() == new_artist.lower():
# print(
# f"title={new_title}, artist={new_artist}:\n"
# f" {new_path} → {parent_path}"
# )
tags_not_name.append(
f"title={new_title}, artist={new_artist}:\n"
f" {new_path}{parent_path}"
)
if process_tag_matches:
process_track(
new_path, possible_path, new_title, new_artist, bitrate
)
continue
else:
no_match += 1
else:
no_match += 1
# Try to find a near match
if process_no_matches:
prompt = f"file={new_fname}\n title={new_title}\n artist={new_artist}: "
# Use fzf to search
choice = pydymenu.rofi(parent_fnames, prompt=prompt)
if choice:
old_file = os.path.join(parent_dir, choice[0])
oldtags = get_tags(old_file)
old_title = oldtags["title"]
old_artist = oldtags["artist"]
print()
print(f" File name will change {choice[0]}")
print(f"{new_fname}")
print()
print(f" Title tag will change {old_title}")
print(f"{new_title}")
print()
print(f" Artist tag will change {old_artist}")
print(f"{new_artist}")
print()
data = input("Go ahead (y to accept)? ")
if data == "y":
process_track(new_path, old_file, new_title, new_artist, bitrate)
continue
if data == "q":
sys.exit(0)
else:
continue
# else:
# no_match.append(f"{new_fname}, {new_title=}, {new_artist=}")
# continue
# if match_count > 1:
# multiple_similar.append(new_fname + "\n " + "\n ".join(matches))
# if match_count <= 26 and process_multiple_matches:
# print(f"\n file={new_fname}\n title={new_title}\n artist={new_artist}\n")
# d = {}
# while True:
# for i, match in enumerate(matches):
# d[i] = match
# for k, v in d.items():
# print(f"{k}: {v}")
# data = input("pick one, return to quit: ")
# if data == "":
# break
# try:
# key = int(data)
# except ValueError:
# continue
# if key in d:
# dst = d[key]
# process_track(new_path, dst, new_title, new_artist, bitrate)
# break
# else:
# continue
# continue # from break after testing for "" in data
# # One match, check tags
# sim_name = matches[0]
# p = get_tags(sim_name)
# parent_title = p['title']
# parent_artist = p['artist']
# if (
# (str(parent_title).lower() != str(new_title).lower()) or
# (str(parent_artist).lower() != str(new_artist).lower())
# ):
# possibles.append(
# f"File: {os.path.basename(sim_name)} → {new_fname}"
# f"\n {parent_title} → {new_title}\n {parent_artist} → {new_artist}"
# )
# process_track(new_path, sim_name, new_title, new_artist, bitrate)
# continue
# tags_not_name.append(f"Rename {os.path.basename(sim_name)} → {new_fname}")
# process_track(new_path, sim_name, new_title, new_artist, bitrate)
print(f"Name and tags match ({len(name_and_tags)}):")
# print(" \n".join(name_and_tags))
# print()
# print(f"Name but not tags match ({len(name_not_tags)}):")
# print(" \n".join(name_not_tags))
# print()
print(f"Tags but not name match ({len(tags_not_name)}):")
# print(" \n".join(tags_not_name))
# print()
# print(f"Multiple similar names ({len(multiple_similar)}):")
# print(" \n".join(multiple_similar))
# print()
# print(f"Possibles: ({len(possibles)}):")
# print(" \n".join(possibles))
# print()
# print(f"No match ({len(no_match)}):")
# print(" \n".join(no_match))
# print()
# print(f"Name and tags match ({len(name_and_tags)}):")
# print(f"Name but not tags match ({len(name_not_tags)}):")
# print(f"Tags but not name match ({len(tags_not_name)}):")
# print(f"Multiple similar names ({len(multiple_similar)}):")
# print(f"Possibles: ({len(possibles)}):")
# print(f"No match ({len(no_match)}):")
print(f"No matches: {no_match}")
def process_track(src, dst, title, artist, bitrate):
new_path = os.path.join(os.path.dirname(dst), os.path.basename(src))
print(f"process_track:\n {src=}\n {dst=}\n {title=}, {artist=}\n")
if not do_processing:
return
with db.Session() as session:
track = Tracks.get_by_path(session, dst)
if track:
# Update path, but workaround MariaDB bug
track.path = new_path
try:
session.commit()
except IntegrityError:
# https://jira.mariadb.org/browse/MDEV-29345 workaround
session.rollback()
track.path = "DUMMY"
session.commit()
track.path = new_path
session.commit()
print(f"os.unlink({dst}")
print(f"shutil.move({src}, {new_path}")
os.unlink(dst)
shutil.move(src, new_path)
# Update track metadata
set_track_metadata(track)
main()