295 lines
9.3 KiB
Python
Executable File
295 lines
9.3 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
import argparse
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
|
|
from config import Config
|
|
from log import DEBUG, INFO
|
|
from models import Notes, Playdates, Session, Tracks
|
|
from mutagen.flac import FLAC
|
|
from mutagen.mp3 import MP3
|
|
from pydub import effects
|
|
|
|
# Globals (I know)
|
|
# Report lines accumulated by update_db(), which appends to this list and
# prints its contents once it has finished.
messages = []
|
|
|
|
|
|
def main():
    """
    Command-line entry point.

    Parse the command line and run at most one of these mutually
    exclusive actions, each inside its own Session:

        -u, --update        run update_db()
        -f, --full-update   run full_update_db()
        -i, --import FILE   run create_track_from_file() interactively
                            on the resolved path of FILE

    If no action is requested, log that and do nothing.
    """

    DEBUG("Starting")

    p = argparse.ArgumentParser()
    # Only allow one option to be specified
    group = p.add_mutually_exclusive_group()
    group.add_argument('-u', '--update',
                       action="store_true", dest="update",
                       default=False, help="Update database")
    # Help text must differ from --update, otherwise the two options are
    # indistinguishable in the --help output.
    group.add_argument('-f', '--full-update',
                       action="store_true", dest="full_update",
                       default=False,
                       help="Full update of database (rescan every track)")
    group.add_argument('-i', '--import', dest="fname", help="Input file")
    args = p.parse_args()

    # Run as required
    if args.update:
        DEBUG("Updating database")
        with Session() as session:
            update_db(session)
    elif args.full_update:
        DEBUG("Full update of database")
        with Session() as session:
            full_update_db(session)
    elif args.fname:
        # Resolve symlinks / relative components before handing the path on
        fname = os.path.realpath(args.fname)
        with Session() as session:
            create_track_from_file(session, fname, interactive=True)
    else:
        INFO("No action specified")

    DEBUG("Finished")
|
|
|
|
|
|
def create_track_from_file(session, path, interactive=False):
    """
    Create track in database from passed path, or update database entry
    if path already in database.

    Title, artist and duration are taken from get_music_info(); start
    gap, fade point and trailing silence are measured from the audio
    returned by get_audio_segment(). The track is committed before any
    normalisation is attempted.

    If Config.NORMALISE_ON_IMPORT is set and the file extension is mp3
    or flac, the file on disk is overwritten with a normalised version;
    ownership, permissions and tags are carried over from a temporary
    backup of the original. If that step fails, the original file is
    restored from the backup.

    Args:
        session: database session, passed to the model helpers and
            committed here.
        path: path of the audio file to import.
        interactive: when true, report progress with INFO() and, if
            Tracks.search_titles() returns matches for the title, ask
            on stdin whether to continue.

    Return track, or None if the user aborts the import or the backup
    copy needed for normalisation cannot be made.
    """

    if interactive:
        msg = f"Importing {path}"
        INFO(msg)
        INFO("-" * len(msg))
        INFO("Get track info...")
    t = get_music_info(path)
    title = t['title']
    artist = t['artist']
    if interactive:
        INFO(f" Title: \"{title}\"")
        INFO(f" Artist: \"{artist}\"")
    # Check for duplicate
    tracks = Tracks.search_titles(session, title)
    if interactive and tracks:
        print("Found the following possible matches:")
        for track in tracks:
            print(f'"{track.title}" by {track.artist}')
        response = input("Continue [c] or abort [a]?")
        # Empty input, or anything not starting with c/y, aborts
        if not response:
            return
        if response[0].lower() not in ['c', 'y']:
            return
    track = Tracks.get_or_create(session, path)
    track.title = title
    track.artist = artist
    track.duration = int(round(
        t['duration'], Config.MILLISECOND_SIGFIGS) * 1000)

    if interactive:
        INFO("Parse for start, fade and silence...")
    audio = get_audio_segment(path)
    track.start_gap = leading_silence(audio)
    track.fade_at = round(fade_point(audio) / 1000,
                          Config.MILLISECOND_SIGFIGS) * 1000
    track.silence_at = round(trailing_silence(audio) / 1000,
                             Config.MILLISECOND_SIGFIGS) * 1000
    track.mtime = os.path.getmtime(path)
    session.commit()

    if Config.NORMALISE_ON_IMPORT:
        if interactive:
            INFO("Normalise...")
        # Check type
        ftype = os.path.splitext(path)[1][1:]
        if ftype not in ['mp3', 'flac']:
            INFO(f"File type {ftype} not implemented")
            return track

        # Get current file gid, uid and permissions
        stats = os.stat(path)
        temp_path = None
        try:
            # Copy original file
            fd, temp_path = tempfile.mkstemp()
            # mkstemp() hands back an open descriptor; only the path is
            # needed, so close it straight away rather than leak it.
            os.close(fd)
            shutil.copyfile(path, temp_path)
        except Exception as err:
            DEBUG(f"songdb.create_track_from_file({path}): err1: {repr(err)}")
            # Don't leave a partial backup behind
            if temp_path is not None and os.path.exists(temp_path):
                os.remove(temp_path)
            return

        # Overwrite original file with normalised output
        normalised = effects.normalize(audio)
        keep_backup = False
        try:
            normalised.export(path, format=ftype)
            # Fix up permissions and ownership
            os.chown(path, stats.st_uid, stats.st_gid)
            os.chmod(path, stats.st_mode)
            # Copy tags (ftype is known to be 'mp3' or 'flac' here)
            tag_handler = FLAC if ftype == 'flac' else MP3
            src = tag_handler(temp_path)
            dst = tag_handler(path)
            for tag in src:
                dst[tag] = src[tag]
            dst.save()
        except Exception as err:
            DEBUG(f"songdb.create_track_from_file({path}): err2: {repr(err)}")
            # Restore original file: copy the backup over the (possibly
            # damaged) output, not the other way round.
            try:
                shutil.copyfile(temp_path, path)
            except Exception as restore_err:
                # The backup is now the only good copy, so keep it and
                # tell the user where it is.
                keep_backup = True
                INFO(
                    f"Could not restore {path}: {repr(restore_err)}; "
                    f"original saved as {temp_path}"
                )
        finally:
            if not keep_backup and os.path.exists(temp_path):
                os.remove(temp_path)

    return track
|
|
|
|
|
|
def full_update_db(session):
    """
    Rescan all entries in database.

    Runs update_db() first, then for every track returned by
    Tracks.get_all_tracks() re-reads title, artist and duration from
    get_music_info() and re-measures start gap, fade point and trailing
    silence from the audio. Any numeric value that moves by more than
    10% from the stored one is logged. Tracks whose tags have no title
    or no artist are logged and left unchanged.

    Each track is committed as soon as it has been processed.

    Args:
        session: database session, passed to the model helpers and
            committed here.
    """

    def log(msg):
        INFO(f"full_update_db(): {msg}")

    def check_change(track_id, title, attribute, old, new):
        # Nothing stored yet means nothing to compare against (and
        # None * 1.1 would raise TypeError).
        if old is None:
            return
        if new > (old * 1.1) or new < (old * 0.9):
            # Leading newline moves off the "\rTrack n of m" progress line
            log(
                "\n"
                f"track[{track_id}] ({title}) "
                f"{attribute} updated from {old} to {new}"
            )

    def round_ms(milliseconds):
        # Same rounding as create_track_from_file() applies to fade_at
        # and silence_at, so a rescan stores what an import would.
        return round(milliseconds / 1000, Config.MILLISECOND_SIGFIGS) * 1000

    # Start with normal update to add new tracks and remove any missing
    # files
    log("update_db()")
    update_db(session)

    # Now update track length, silence and fade for every track in
    # database

    tracks = Tracks.get_all_tracks(session)
    total_tracks = len(tracks)
    log(f"Processing {total_tracks} tracks")
    for track_count, track in enumerate(tracks, start=1):
        print(f"\rTrack {track_count} of {total_tracks}", end='')

        # Sanity check
        tag = get_music_info(track.path)
        if not tag['title']:
            log(f"track[{track.id}] {track.title=}: No tag title")
            continue
        if not tag['artist']:
            log(f"track[{track.id}] {track.artist=}: No tag artist")
            continue

        # Update title and artist
        if track.title != tag['title']:
            track.title = tag['title']
        if track.artist != tag['artist']:
            track.artist = tag['artist']

        # Update numbers; log if more than 10% different
        duration = int(round(
            tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000)
        check_change(track.id, track.title, "duration", track.duration,
                     duration)
        track.duration = duration

        audio = get_audio_segment(track.path)

        start_gap = leading_silence(audio)
        check_change(track.id, track.title, "start_gap", track.start_gap,
                     start_gap)
        track.start_gap = start_gap

        fade_at = round_ms(fade_point(audio))
        check_change(track.id, track.title, "fade_at", track.fade_at,
                     fade_at)
        track.fade_at = fade_at

        silence_at = round_ms(trailing_silence(audio))
        check_change(track.id, track.title, "silence_at", track.silence_at,
                     silence_at)
        track.silence_at = silence_at
        session.commit()
|
|
|
|
|
|
def update_db(session):
    """
    Repopulate database

    Compare the .flac/.mp3 files found under Config.ROOT with the paths
    held in the database:

    - A file whose path is not in the database but whose basename is:
      if its artist and title tags match the stored track, the stored
      path is updated (the file was moved); otherwise the file is passed
      to create_track_from_file().
    - A file whose path and basename are both unknown is only reported.
    - A database path with no file on disk is removed: its Playdates
      references are dropped, each playlist entry is replaced with a
      note, and the track row is deleted.

    Collected messages are printed at the end.

    Args:
        session: database session passed to the model helpers.
    """

    # Search for tracks in only one of directory and database

    db_paths = set(Tracks.get_all_paths(session))

    os_paths = set()
    for root, _dirs, files in os.walk(Config.ROOT):
        for f in files:
            if os.path.splitext(f)[1] in (".flac", ".mp3"):
                os_paths.add(os.path.join(root, f))

    # If a track is moved, only the path will have changed.
    # For any files we have found whose paths are not in the database,
    # check to see whether the filename (basename) is present in the
    # database:

    for path in os_paths - db_paths:
        DEBUG(f"songdb.update_db: {path=} not in database")
        # is filename in database?
        track = Tracks.get_from_filename(session, os.path.basename(path))
        if not track:
            messages.append(f"Track missing from database: {path}")
        else:
            # Check track info matches found track
            t = get_music_info(path)
            if t['artist'] == track.artist and t['title'] == track.title:
                track.update_path(path)
            else:
                create_track_from_file(session, path)

    # Refresh database paths
    db_paths = set(Tracks.get_all_paths(session))
    # Remove any tracks from database whose paths don't exist
    for path in db_paths - os_paths:
        # Manage tracks listed in database but where path is invalid
        track = Tracks.get_from_path(session, path)
        messages.append(f"Remove from database: {path=} {track=}")

        # Remove references from Playdates
        Playdates.remove_track(session, track.id)

        # Replace playlist entries with a note
        note_txt = (
            f"File removed: {track.title=}, {track.artist=}, "
            f"{track.path=}"
        )
        # Iterate the association objects themselves so that the row is
        # available; previously `pt` was never bound and this loop raised
        # NameError. Assumes each item exposes .playlist and .row, as the
        # original code implied - TODO confirm against models.
        # list() takes a snapshot because removing entries may change
        # track.playlists while we loop.
        for pt in list(track.playlists):
            playlist = pt.playlist
            # Create note
            Notes(session, playlist.id, pt.row, note_txt)
            # TODO: this needs to call playlist.add_note() now
            # Remove playlist entry
            playlist.remove_track(session, pt.row)

        # Remove Track entry pointing to invalid path
        Tracks.remove_by_path(session, path)

    # Output messages (so if running via cron, these will get sent to
    # user)
    if messages:
        print("Messages")
        print("\n".join(messages))
|
|
|
|
|
|
# Run main() only when executed as a script, and only when the module
# globals contain __file__.
if '__file__' in globals() and __name__ == '__main__':
    main()
|