musicmuster/app/songdb.py
2021-09-29 20:55:39 +01:00

416 lines
13 KiB
Python
Executable File

#!/usr/bin/env python
import argparse
import os
import shutil
import tempfile
from config import Config
from helpers import show_warning
from log import DEBUG, INFO
from model import Notes, Playdates, PlaylistTracks, Session, Tracks
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
from pydub import AudioSegment, effects
from tinytag import TinyTag
# Globals (I know)
messages = []
def main():
"Main loop"
DEBUG("Starting")
# Parse command line
p = argparse.ArgumentParser()
# Only allow one option to be specified
group = p.add_mutually_exclusive_group()
group.add_argument('-u', '--update',
action="store_true", dest="update",
default=False, help="Update database")
group.add_argument('-f', '--full-update',
action="store_true", dest="full_update",
default=False, help="Update database")
group.add_argument('-i', '--import', dest="fname", help="Input file")
args = p.parse_args()
# Run as required
if args.update:
DEBUG("Updating database")
with Session() as session:
update_db(session)
elif args.full_update:
DEBUG("Full update of database")
with Session() as session:
full_update_db(session)
elif args.fname:
fname = os.path.realpath(args.fname)
with Session() as session:
create_track_from_file(session, fname, interactive=True)
else:
INFO("No action specified")
DEBUG("Finished")
def create_track_from_file(session, path, interactive=False):
"""
Create track in database from passed path, or update database entry
if path already in database.
Return track.
"""
if interactive:
str = f"Importing {path}"
INFO(str)
INFO("-" * len(str))
INFO("Get track info...")
t = get_music_info(path)
title = t['title']
artist = t['artist']
if interactive:
INFO(f" Title: \"{title}\"")
INFO(f" Artist: \"{artist}\"")
# Check for duplicate
tracks = Tracks.search_titles(session, title)
if tracks:
print("Found the following possible matches:")
for track in tracks:
print(f'"{track.title}" by {track.artist}')
response = input("Continue [c] or abort [a]?")
if not response:
return
if response[0].lower() not in ['c', 'y']:
return
track = Tracks.get_or_create(session, path)
track.title = title
track.artist = artist
track.duration = int(round(
t['duration'], Config.MILLISECOND_SIGFIGS) * 1000)
if interactive:
INFO("Parse for start, fade and silence...")
audio = get_audio_segment(path)
track.start_gap = leading_silence(audio)
track.fade_at = round(fade_point(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.silence_at = round(trailing_silence(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.mtime = os.path.getmtime(path)
session.commit()
if Config.NORMALISE_ON_IMPORT:
if interactive:
INFO("Normalise...")
# Check type
ftype = os.path.splitext(path)[1][1:]
if ftype not in ['mp3', 'flac']:
INFO(f"File type {ftype} not implemented")
return track
# Get current file gid, uid and permissions
stats = os.stat(path)
try:
# Copy original file
fd, temp_path = tempfile.mkstemp()
shutil.copyfile(path, temp_path)
except Exception as err:
DEBUG(f"songdb.create_track_from_file({path}): err1: {str(err)}")
return
# Overwrite original file with normalised output
normalised = effects.normalize(audio)
try:
normalised.export(path, format=os.path.splitext(path)[1][1:])
# Fix up permssions and ownership
os.chown(path, stats.st_uid, stats.st_gid)
os.chmod(path, stats.st_mode)
# Copy tags
if ftype == 'flac':
tag_handler = FLAC
elif ftype == 'mp3':
tag_handler = MP3
else:
return track
src = tag_handler(temp_path)
dst = tag_handler(path)
for tag in src:
dst[tag] = src[tag]
dst.save()
except Exception as err:
DEBUG(f"songdb.create_track_from_file({path}): err2: {str(err)}")
# Restore original file
shutil.copyfile(path, temp_path)
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
return track
def full_update_db(session):
"Rescan all entries in database"
def log(msg):
INFO(f"full_update_db(): {msg}")
def check_change(track_id, title, attribute, old, new):
if new > (old * 1.1) or new < (old * 0.9):
log(
"\n"
f"track[{track_id}] ({title}) "
f"{attribute} updated from {old} to {new}"
)
# Start with normal update to add new tracks and remove any missing
# files
log("update_db()")
update_db(session)
# Now update track length, silence and fade for every track in
# database
tracks = Tracks.get_all_tracks(session)
total_tracks = len(tracks)
log(f"Processing {total_tracks} tracks")
track_count = 0
for track in tracks:
track_count += 1
print(f"\rTrack {track_count} of {total_tracks}", end='')
# Sanity check
tag = get_music_info(track.path)
if not tag['title']:
log(f"track[{track.id}] {track.title=}: No tag title")
continue
if not tag['artist']:
log(f"track[{track.id}] {track.artist=}: No tag artist")
continue
# Update title and artist
if track.title != tag['title']:
track.title = tag['title']
if track.artist != tag['artist']:
track.artist = tag['artist']
# Update numbers; log if more than 10% different
duration = int(round(
tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000)
check_change(track.id, track.title, "duration", track.duration,
duration)
track.duration = duration
audio = get_audio_segment(track.path)
start_gap = leading_silence(audio)
check_change(track.id, track.title, "start_gap", track.start_gap,
start_gap)
track.start_gap = start_gap
fade_at = fade_point(audio)
check_change(track.id, track.title, "fade_at", track.fade_at,
fade_at)
track.fade_at = fade_at
silence_at = trailing_silence(audio)
check_change(track.id, track.title, "silence_at", track.silence_at,
silence_at)
track.silence_at = silence_at
session.commit()
def get_audio_segment(path):
try:
if path.endswith('.mp3'):
return AudioSegment.from_mp3(path)
elif path.endswith('.flac'):
return AudioSegment.from_file(path, "flac")
except AttributeError:
return None
def get_music_info(path):
"""
Return a dictionary of title, artist, duration-in-milliseconds and path.
"""
tag = TinyTag.get(path)
return dict(
title=tag.title,
artist=tag.artist,
duration=tag.duration,
path=path
)
def leading_silence(audio_segment, silence_threshold=Config.DBFS_SILENCE,
chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
"""
Returns the millisecond/index that the leading silence ends.
audio_segment - the segment to find silence in
silence_threshold - the upper bound for how quiet is silent in dFBS
chunk_size - chunk size for interating over the segment in ms
https://github.com/jiaaro/pydub/blob/master/pydub/silence.py
"""
trim_ms = 0 # ms
assert chunk_size > 0 # to avoid infinite loop
while (
audio_segment[trim_ms:trim_ms + chunk_size].dBFS < # noqa W504
silence_threshold and trim_ms < len(audio_segment)):
trim_ms += chunk_size
# if there is no end it should return the length of the segment
return min(trim_ms, len(audio_segment))
def fade_point(audio_segment, fade_threshold=0,
chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
"""
Returns the millisecond/index of the point where the volume drops below
the maximum and doesn't get louder again.
audio_segment - the sdlg_search_database_uiegment to find silence in
fade_threshold - the upper bound for how quiet is silent in dFBS
chunk_size - chunk size for interating over the segment in ms
"""
assert chunk_size > 0 # to avoid infinite loop
segment_length = audio_segment.duration_seconds * 1000 # ms
trim_ms = segment_length - chunk_size
max_vol = audio_segment.dBFS
if fade_threshold == 0:
fade_threshold = max_vol
while (
audio_segment[trim_ms:trim_ms + chunk_size].dBFS < fade_threshold
and trim_ms > 0): # noqa W503
trim_ms -= chunk_size
# if there is no trailing silence, return lenght of track (it's less
# the chunk_size, but for chunk_size = 10ms, this may be ignored)
return int(trim_ms)
def trailing_silence(audio_segment, silence_threshold=-50.0,
chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
return fade_point(audio_segment, silence_threshold, chunk_size)
def update_db(session):
"""
Repopulate database
"""
# Search for tracks in only one of directory and database
db_paths = set(Tracks.get_all_paths(session))
os_paths_list = []
for root, dirs, files in os.walk(Config.ROOT):
for f in files:
path = os.path.join(root, f)
ext = os.path.splitext(f)[1]
if ext in [".flac", ".mp3"]:
os_paths_list.append(path)
os_paths = set(os_paths_list)
# If a track is moved, only the path will have changed.
# For any files we have found whose paths are not in the database,
# check to see whether the filename (basename) is present in the
# database:
for path in list(os_paths - db_paths):
DEBUG(f"songdb.update_db: {path=} not in database")
# is filename in database?
track = Tracks.get_track_from_filename(session, os.path.basename(path))
if not track:
messages.append(f"Track missing from database: {path}")
else:
# Check track info matches found track
t = get_music_info(path)
if t['artist'] == track.artist and t['title'] == track.title:
track.update_path(path)
else:
create_track_from_file(session, path)
# Refresh database paths
db_paths = set(Tracks.get_all_paths(session))
# Remote any tracks from database whose paths don't exist
for path in list(db_paths - os_paths):
# Manage tracks listed in database but where path is invalid
track = Tracks.get_track_from_path(session, path)
messages.append(f"Remove from database: {path=} {track=}")
# Remove references from Playdates
Playdates.remove_track(session, track.id)
# Replace playlist entries with a note
note_txt = (
f"File removed: {track.title=}, {track.artist=}, "
f"{track.path=}"
)
for pt in PlaylistTracks.get_track_playlists(session, track.id):
# Create note
Notes.add_note(session, pt.playlist_id, pt.row, note_txt)
# Remove playlist entry
PlaylistTracks.remove_track(session, pt.playlist_id, pt.row)
# Remove Track entry pointing to invalid path
Tracks.remove_path(session, path)
# Output messages (so if running via cron, these will get sent to
# user)
if messages:
print("Messages")
print("\n".join(messages))
def update_meta(session, track, artist=None, title=None):
"""
Updates both the tag info in the file and the database entry with passed
artist and tag details.
"""
DEBUG(f"songdb.update_meta({session=}, {track=}, {artist=}, {title=})")
if not artist and not title:
return
ftype = os.path.splitext(track.path)[1][1:]
if ftype == 'flac':
tag_handler = FLAC
elif ftype == 'mp3':
tag_handler = MP3
else:
INFO(f"File type {ftype} not implemented")
return
# Update tags
f = tag_handler(track.path)
try:
if artist:
f["artist"] = artist
if title:
f["title"] = title
f.save()
except TypeError:
show_warning("TAG error", "Can't update tag. Try editing in Audacity")
# Update database
with Session() as session:
if artist:
Tracks.update_artist(session, track.id, artist)
if title:
Tracks.update_title(session, track.id, title)
if __name__ == '__main__' and '__file__' in globals():
main()