205 lines
6.4 KiB
Python
Executable File
205 lines
6.4 KiB
Python
Executable File
# #!/usr/bin/env python
|
|
#
|
|
import os
|
|
import shutil
|
|
import tempfile
|
|
|
|
from config import Config
|
|
from helpers import (
|
|
fade_point,
|
|
get_audio_segment,
|
|
get_tags,
|
|
leading_silence,
|
|
trailing_silence,
|
|
)
|
|
from log import log
|
|
from models import Tracks
|
|
from mutagen.flac import FLAC # type: ignore
|
|
from mutagen.mp3 import MP3 # type: ignore
|
|
from pydub import effects
|
|
|
|
|
|
def create_track_from_file(session, path, normalise=None, tags=None):
    """
    Create track in database from passed path, or update database entry
    if path already in database.

    The track's title, artist and bitrate are taken from the tags; its
    duration, start gap, fade point and silence point are computed from
    the audio; the file's mtime is recorded. These are committed before
    any normalisation is attempted, so a normalisation failure never
    loses the database entry.

    Args:
        session: database session; passed to Tracks.get_or_create() and
            committed here.
        path: path of the audio file.
        normalise: a truthy value forces normalisation; None defers to
            Config.NORMALISE_ON_IMPORT; any other falsy value disables it.
        tags: optional mapping with 'title', 'artist' and 'bitrate' keys.
            When empty or None, tags are read with get_tags(path).

    Return track. The track is returned even when normalisation could not
    be carried out (unsupported file type, backup failure or export
    failure), because the database entry has already been committed.
    """

    t = tags if tags else get_tags(path)

    track = Tracks.get_or_create(session, path)
    track.title = t['title']
    track.artist = t['artist']
    audio = get_audio_segment(path)
    # presumably milliseconds (pydub segment length) - TODO confirm
    track.duration = len(audio)
    track.start_gap = leading_silence(audio)
    # Convert to seconds, round to the configured number of places, then
    # convert back to milliseconds
    track.fade_at = round(fade_point(audio) / 1000,
                          Config.MILLISECOND_SIGFIGS) * 1000
    track.silence_at = round(trailing_silence(audio) / 1000,
                             Config.MILLISECOND_SIGFIGS) * 1000
    # NOTE(review): mtime is recorded before any normalisation rewrites
    # the file below - confirm that is intended
    track.mtime = os.path.getmtime(path)
    track.bitrate = t['bitrate']
    session.commit()

    if normalise or (normalise is None and Config.NORMALISE_ON_IMPORT):
        # Check type
        ftype = os.path.splitext(path)[1][1:]
        if ftype not in ('mp3', 'flac'):
            log.info(f"File type {ftype} not implemented")
            return track

        _normalise_in_place(path, audio, ftype)

    return track


def _normalise_in_place(path, audio, ftype):
    """
    Overwrite the file at path with a normalised version of audio,
    keeping the original file's tags, ownership and permissions.

    ftype must be 'mp3' or 'flac'. A temporary backup of the original
    file is taken first; if anything fails after the original has been
    touched, the original is restored from that backup. Failures are
    logged and not raised.
    """

    tag_handler = {'flac': FLAC, 'mp3': MP3}[ftype]

    # Get current file gid, uid and permissions
    stats = os.stat(path)

    temp_path = None
    restore_failed = False
    try:
        try:
            # Copy original file
            fd, temp_path = tempfile.mkstemp()
            # mkstemp() hands back an open descriptor; we only need the
            # path, so close it straight away rather than leak it
            os.close(fd)
            shutil.copyfile(path, temp_path)
        except Exception as err:
            log.debug(
                f"utilities.create_track_from_file({path}): err1: {repr(err)}"
            )
            return

        # Overwrite original file with normalised output
        normalised = effects.normalize(audio)
        try:
            normalised.export(path, format=ftype)
            # Fix up permissions and ownership
            os.chown(path, stats.st_uid, stats.st_gid)
            os.chmod(path, stats.st_mode)
            # Copy tags from the backup of the original file
            src = tag_handler(temp_path)
            dst = tag_handler(path)
            for tag in src:
                dst[tag] = src[tag]
            dst.save()
        except Exception as err:
            log.debug(
                f"utilities.create_track_from_file({path}): err2: {repr(err)}"
            )
            # Restore original file from the backup (backup -> original)
            try:
                shutil.copyfile(temp_path, path)
            except Exception as restore_err:
                # Keep the backup: it is now the only good copy
                restore_failed = True
                log.error(
                    f"utilities.create_track_from_file({path}): "
                    f"restore failed, original kept at {temp_path}: "
                    f"{repr(restore_err)}"
                )
    finally:
        if temp_path and not restore_failed and os.path.exists(temp_path):
            os.remove(temp_path)
|
|
|
|
|
|
def check_db(session):
    """
    Database consistency check.

    A report is generated if issues are found, but there are no automatic
    corrections made.

    Two checks are carried out:

    - search for tracks (.flac and .mp3 files) that are below Config.ROOT
      but not in the database
    - check that all paths in the database exist below Config.ROOT

    The report is printed to stdout in sorted path order, so repeated runs
    over the same data produce the same output. At most
    Config.MAX_MISSING_FILES_TO_REPORT invalid database paths are
    examined; if there are more, a closing line says so.

    Args:
        session: database session passed to the Tracks query methods.
    """

    db_paths = {track.path for track in Tracks.get_all(session)}

    os_paths = set()
    for root, _dirs, files in os.walk(Config.ROOT):
        for f in files:
            if os.path.splitext(f)[1] in (".flac", ".mp3"):
                os_paths.add(os.path.join(root, f))

    # Find any files in music directory that are not in database
    files_not_in_db = sorted(os_paths - db_paths)

    # Find paths in database missing in music directory. Sorted so that
    # the paths chosen when the report is truncated are predictable;
    # key=str guards against a non-string (e.g. null) path in the database.
    paths_not_found = []
    more_files_to_report = False
    missing_paths = sorted(db_paths - os_paths, key=str)
    for missing_file_count, path in enumerate(missing_paths):
        if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT:
            more_files_to_report = True
            break

        track = Tracks.get_by_path(session, path)
        if not track:
            # This shouldn't happen as we're looking for paths in
            # database that aren't in filesystem, but just in case...
            log.error(f"check_db: {path} not found in db")
            continue

        paths_not_found.append(track)

    # Output messages (so if running via cron, these will get sent to
    # user)
    if files_not_in_db:
        print("Files in music directory but not in database")
        print("--------------------------------------------")
        print("\n".join(files_not_in_db))
        print("\n")
    if paths_not_found:
        print("Invalid paths in database")
        print("-------------------------")
        for t in paths_not_found:
            print(
                "\n"
                f"Track ID: {t.id}\n"
                f"Path: {t.path}\n"
                f"Title: {t.title}\n"
                f"Artist: {t.artist}\n"
            )
    if more_files_to_report:
        print("There were more paths than listed that were not found")
|
|
|
|
|
|
def update_bitrates(session):
    """
    Refresh the stored bitrate of every track in the database.

    Each track's file is re-read with get_tags() and the track's bitrate
    attribute is set from the "bitrate" entry. Tracks whose file raises
    FileNotFoundError are skipped and keep their current bitrate.

    Nothing is committed here.
    """

    for db_track in Tracks.get_all(session):
        try:
            db_track.bitrate = get_tags(db_track.path)["bitrate"]
        except FileNotFoundError:
            # File is missing: nothing to update for this track
            pass
|
|
|
|
|
|
# # Spike
|
|
# #
|
|
# # # Manage tracks listed in database but where path is invalid
|
|
# # log.debug(f"Invalid {path=} in database")
|
|
# # track = Tracks.get_by_path(session, path)
|
|
# # messages.append(f"Remove from database: {path=} {track=}")
|
|
# #
|
|
# # # Remove references from Playdates
|
|
# # Playdates.remove_track(session, track.id)
|
|
# #
|
|
# # # Replace playlist entries with a note
|
|
# # note_txt = (
|
|
# # f"File removed: {track.title=}, {track.artist=}, "
|
|
# # f"{track.path=}"
|
|
# # )
|
|
# # for playlist_track in track.playlists:
|
|
# # row = playlist_track.row
|
|
# # # Remove playlist entry
|
|
# # log.debug(f"Remove {row=} from {playlist_track.playlist_id}")
|
|
# # playlist_track.playlist.remove_track(session, row)
|
|
# # # Create note
|
|
# # log.debug(f"Add note at {row=} to {playlist_track.playlist_id=}")
|
|
# # Notes(session, playlist_track.playlist_id, row, note_txt)
|
|
# #
|
|
# # # Remove Track entry pointing to invalid path
|
|
# # Tracks.remove_by_path(session, path)
|