musicmuster/app/utilities.py
Keith Edmunds 1888c7f00d Fix cron job
Now only reports errors but does not attempt to fix them.

Fixes #114
2022-06-05 15:18:45 +01:00

293 lines
8.9 KiB
Python
Executable File

#!/usr/bin/env python
import argparse
import os
import shutil
import tempfile
import helpers
from config import Config
from helpers import (
fade_point,
get_audio_segment,
get_tags,
leading_silence,
trailing_silence,
)
from log import DEBUG, INFO
from models import Notes, Playdates, Session, Tracks
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
from pydub import effects
# Globals (I know)
messages = []
def main():
"""Main loop"""
DEBUG("Starting")
p = argparse.ArgumentParser()
# Only allow one option to be specified
group = p.add_mutually_exclusive_group()
group.add_argument('-u', '--update',
action="store_true", dest="update",
default=False, help="Update database")
group.add_argument('-f', '--full-update',
action="store_true", dest="full_update",
default=False, help="Update database")
args = p.parse_args()
# Run as required
if args.update:
DEBUG("Updating database")
with Session() as session:
update_db(session)
elif args.full_update:
DEBUG("Full update of database")
with Session() as session:
full_update_db(session)
else:
INFO("No action specified")
DEBUG("Finished")
def create_track_from_file(session, path, normalise=None, tags=None):
"""
Create track in database from passed path, or update database entry
if path already in database.
Return track.
"""
if not tags:
t = get_tags(path)
else:
t = tags
track = Tracks.get_or_create(session, path)
track.title = t['title']
track.artist = t['artist']
audio = get_audio_segment(path)
track.duration = len(audio)
track.start_gap = leading_silence(audio)
track.fade_at = round(fade_point(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.silence_at = round(trailing_silence(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.mtime = os.path.getmtime(path)
session.commit()
if normalise or normalise is None and Config.NORMALISE_ON_IMPORT:
# Check type
ftype = os.path.splitext(path)[1][1:]
if ftype not in ['mp3', 'flac']:
INFO(f"File type {ftype} not implemented")
return track
# Get current file gid, uid and permissions
stats = os.stat(path)
try:
# Copy original file
fd, temp_path = tempfile.mkstemp()
shutil.copyfile(path, temp_path)
except Exception as err:
DEBUG(f"songdb.create_track_from_file({path}): err1: {repr(err)}")
return
# Overwrite original file with normalised output
normalised = effects.normalize(audio)
try:
normalised.export(path, format=os.path.splitext(path)[1][1:])
# Fix up permssions and ownership
os.chown(path, stats.st_uid, stats.st_gid)
os.chmod(path, stats.st_mode)
# Copy tags
if ftype == 'flac':
tag_handler = FLAC
elif ftype == 'mp3':
tag_handler = MP3
else:
return track
src = tag_handler(temp_path)
dst = tag_handler(path)
for tag in src:
dst[tag] = src[tag]
dst.save()
except Exception as err:
DEBUG(f"songdb.create_track_from_file({path}): err2: {repr(err)}")
# Restore original file
shutil.copyfile(path, temp_path)
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
return track
def full_update_db(session):
"""Rescan all entries in database"""
def log(msg):
INFO(f"full_update_db(): {msg}")
def check_change(track_id, title, attribute, old, new):
if new > (old * 1.1) or new < (old * 0.9):
log(
"\n"
f"track[{track_id}] ({title}) "
f"{attribute} updated from {old} to {new}"
)
# Start with normal update to add new tracks and remove any missing
# files
log("update_db()")
update_db(session)
# Now update track length, silence and fade for every track in
# database
tracks = Tracks.get_all_tracks(session)
total_tracks = len(tracks)
log(f"Processing {total_tracks} tracks")
track_count = 0
for track in tracks:
track_count += 1
print(f"\rTrack {track_count} of {total_tracks}", end='')
# Sanity check
tag = get_music_info(track.path)
if not tag['title']:
log(f"track[{track.id}] {track.title=}: No tag title")
continue
if not tag['artist']:
log(f"track[{track.id}] {track.artist=}: No tag artist")
continue
# Update title and artist
if track.title != tag['title']:
track.title = tag['title']
if track.artist != tag['artist']:
track.artist = tag['artist']
# Update numbers; log if more than 10% different
duration = int(round(
tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000)
check_change(track.id, track.title, "duration", track.duration,
duration)
track.duration = duration
audio = get_audio_segment(track.path)
start_gap = leading_silence(audio)
check_change(track.id, track.title, "start_gap", track.start_gap,
start_gap)
track.start_gap = start_gap
fade_at = fade_point(audio)
check_change(track.id, track.title, "fade_at", track.fade_at,
fade_at)
track.fade_at = fade_at
silence_at = trailing_silence(audio)
check_change(track.id, track.title, "silence_at", track.silence_at,
silence_at)
track.silence_at = silence_at
session.commit()
def update_db(session):
"""
Repopulate database
"""
# Search for tracks that are in the music directory but not the datebase
# Check all paths in database exist
# If issues found, write to stdout but do not try to resolve them
db_paths = set(Tracks.get_all_paths(session))
os_paths_list = []
for root, dirs, files in os.walk(Config.ROOT):
for f in files:
path = os.path.join(root, f)
ext = os.path.splitext(f)[1]
if ext in [".flac", ".mp3"]:
os_paths_list.append(path)
os_paths = set(os_paths_list)
# Find any files in music directory that are not in database
files_not_in_db = list(os_paths - db_paths)
# Find paths in database missing in music directory
paths_not_found = []
missing_file_count = 0
more_files_to_report = False
for path in list(db_paths - os_paths):
if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT:
more_files_to_report = True
break
missing_file_count += 1
track = Tracks.get_by_path(session, path)
if not track:
ERROR(f"update_db: {path} not found in db")
continue
paths_not_found.append(track)
# Output messages (so if running via cron, these will get sent to
# user)
if files_not_in_db:
print("Files in music directory but not in database")
print("--------------------------------------------")
print("\n".join(files_not_in_db))
print("\n")
if paths_not_found:
print("Invalid paths in database")
print("-------------------------")
for t in paths_not_found:
print(f"""
Track ID: {t.id}
Path: {t.path}
Title: {t.title}
Artist: {t.artist}
""")
if more_files_to_report:
print("There were more paths than listed that were not found")
# Spike
#
# # Manage tracks listed in database but where path is invalid
# DEBUG(f"Invalid {path=} in database", True)
# track = Tracks.get_by_path(session, path)
# messages.append(f"Remove from database: {path=} {track=}")
#
# # Remove references from Playdates
# Playdates.remove_track(session, track.id)
#
# # Replace playlist entries with a note
# note_txt = (
# f"File removed: {track.title=}, {track.artist=}, "
# f"{track.path=}"
# )
# for playlist_track in track.playlists:
# row = playlist_track.row
# # Remove playlist entry
# DEBUG(f"Remove {row=} from {playlist_track.playlist_id}", True)
# playlist_track.playlist.remove_track(session, row)
# # Create note
# DEBUG(f"Add note at {row=} to {playlist_track.playlist_id=}", True)
# Notes(session, playlist_track.playlist_id, row, note_txt)
#
# # Remove Track entry pointing to invalid path
# Tracks.remove_by_path(session, path)
if __name__ == '__main__' and '__file__' in globals():
main()