Compare commits

..

4 Commits

Author SHA1 Message Date
Keith Edmunds
f851fdcafe First draft of rename_singles.py 2022-08-22 16:08:24 +01:00
Keith Edmunds
26358761e5 Default to no processing in replace_files.py 2022-08-22 16:07:44 +01:00
Keith Edmunds
6ce41d3314 Check replace_files is run against production db 2022-08-22 16:01:56 +01:00
Keith Edmunds
62c5fa178c Work around MariaDB bug in replace_files.py 2022-08-22 14:39:18 +01:00
3 changed files with 89 additions and 18 deletions

View File

@ -572,12 +572,15 @@ class Tracks(Base):
Return track with passed path, or None.
"""
return (
session.execute(
select(Tracks)
.where(Tracks.path == path)
).scalar_one()
)
try:
return (
session.execute(
select(Tracks)
.where(Tracks.path == path)
).scalar_one()
)
except NoResultFound:
return None
def rescan(self, session: Session) -> None:
"""

54
app/rename_singles.py Executable file
View File

@ -0,0 +1,54 @@
#!/usr/bin/env python
#
# Script to manage renaming existing files in given directory and
# propagating that change to database. Typical usage: renaming files
# from 'title.mp3' to title - artist.mp3'
#
# Actions:
#
# - record all filenames and inode numbers
# - external: rename the files
# - update records with new filenames for each inode number
# - update external database with new paths
import os
import sqlite3
PHASE = 2
# Check file of same name exists in parent directory
source_dir = '/home/kae/tmp/Singles' # os.getcwd()
db = "/home/kae/tmp/singles.sqlite"
def main():
with sqlite3.connect(db) as connection:
cursor = connection.cursor()
if PHASE == 1:
cursor.execute(
"CREATE TABLE IF NOT EXISTS mp3s "
"(inode INTEGER, oldname TEXT, newname TEXT)"
)
for fname in os.listdir(source_dir):
fullpath = os.path.join(source_dir, fname)
inode = os.stat(fullpath).st_ino
sql = f'INSERT INTO mp3s VALUES ({inode}, "{fname}", "")'
cursor.execute(sql)
if PHASE == 2:
for fname in os.listdir(source_dir):
fullpath = os.path.join(source_dir, fname)
inode = os.stat(fullpath).st_ino
sql = (
f'UPDATE mp3s SET newname = "{fname}" WHERE inode={inode}'
)
try:
cursor.execute(sql)
except sqlite3.OperationalError:
print(f"Error with {inode} -> {fname}")
cursor.close()
main()

View File

@ -16,6 +16,7 @@
import glob
import os
import shutil
import sys
from helpers import (
fade_point,
@ -27,15 +28,14 @@ from helpers import (
from models import Tracks
from dbconfig import Session
from thefuzz import process
from string import ascii_lowercase as letters
from thefuzz import process # type: ignore
from sqlalchemy.exc import IntegrityError
from typing import List
# ###################### SETTINGS #########################
process_multiple_matches = True
do_processing = True
process_no_matches = True
process_multiple_matches = False
do_processing = False
process_no_matches = False
# #########################################################
@ -61,6 +61,10 @@ print(f"{source_dir=}, {parent_dir=}")
def main():
if 'musicmuster_prod' not in os.environ.get('MM_DB'):
response = input("Not on production database - c to continue: ")
if response != "c":
sys.exit(0)
tracks = os.listdir(parent_dir)
for fname in os.listdir(source_dir):
parent_file = os.path.join(parent_dir, fname)
@ -202,7 +206,10 @@ def main():
def process_track(src, dst, title, artist):
new_path = os.path.join(os.path.dirname(dst), os.path.basename(src))
print(f"process_track:\n {src=}\n {new_path=}\n {dst=}\n {title=}, {artist=}\n")
print(
f"process_track:\n {src=}\n {new_path=}\n "
f"{dst=}\n {title=}, {artist=}\n"
)
if not do_processing:
return
@ -213,17 +220,24 @@ def process_track(src, dst, title, artist):
track.title = title
track.artist = artist
track.path = new_path
session.commit()
try:
session.commit()
except IntegrityError:
# https://jira.mariadb.org/browse/MDEV-29345 workaround
session.rollback()
track.title = title
track.artist = artist
track.path = "DUMMY"
session.commit()
track.path = new_path
session.commit()
print(f"os.unlink({dst}")
print(f"shutil.move({src}, {new_path}")
os.unlink(dst)
shutil.move(src, new_path)
try:
track = Tracks.get_by_path(session, new_path)
except:
import ipdb; ipdb.set_trace()
track = Tracks.get_by_path(session, new_path)
if track:
track.rescan(session)
else: