Clean up importing and track rescan

This commit is contained in:
Keith Edmunds 2022-09-12 18:23:30 +01:00
parent 11eaa803f5
commit 0194790605
7 changed files with 109 additions and 135 deletions

View File

@ -1,8 +1,15 @@
import os
import psutil
import shutil
import tempfile
from mutagen.flac import FLAC # type: ignore
from mutagen.mp3 import MP3 # type: ignore
from pydub import effects
from config import Config
from datetime import datetime
from log import log
from pydub import AudioSegment
from PyQt5.QtWidgets import QMessageBox
from tinytag import TinyTag # type: ignore
@ -184,6 +191,63 @@ def ms_to_mmss(ms: int, decimals: int = 0, negative: bool = False) -> str:
return f"{sign}{minutes:.0f}:{seconds:02.{decimals}f}"
def normalise_track(path):
"""Normalise track"""
# Check type
ftype = os.path.splitext(path)[1][1:]
if ftype not in ['mp3', 'flac']:
log.info(
f"helpers.normalise_track({path}): "
f"File type {ftype} not implemented"
)
audio = get_audio_segment(path)
if not audio:
return
# Get current file gid, uid and permissions
stats = os.stat(path)
try:
# Copy original file
fd, temp_path = tempfile.mkstemp()
shutil.copyfile(path, temp_path)
except Exception as err:
log.debug(
f"helpers.normalise_track({path}): err1: {repr(err)}"
)
return
# Overwrite original file with normalised output
normalised = effects.normalize(audio)
try:
normalised.export(path, format=os.path.splitext(path)[1][1:])
# Fix up permssions and ownership
os.chown(path, stats.st_uid, stats.st_gid)
os.chmod(path, stats.st_mode)
# Copy tags
if ftype == 'flac':
tag_handler = FLAC
elif ftype == 'mp3':
tag_handler = MP3
else:
return
src = tag_handler(temp_path)
dst = tag_handler(path)
for tag in src:
dst[tag] = src[tag]
dst.save()
except Exception as err:
log.debug(
f"helpers.normalise_track({path}): err2: {repr(err)}"
)
# Restore original file
shutil.copyfile(path, temp_path)
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
def open_in_audacity(path: str) -> bool:
"""
Open passed file in Audacity
@ -235,6 +299,29 @@ def open_in_audacity(path: str) -> bool:
return True
def set_track_metadata(session, track):
"""Set/update track metadata in database"""
t = get_tags(track.path)
audio = get_audio_segment(track.path)
track.title = t['title']
track.artist = t['artist']
track.bitrate = t['bitrate']
if not audio:
return
track.duration = len(audio)
track.start_gap = leading_silence(audio)
track.fade_at = round(fade_point(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.silence_at = round(trailing_silence(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.mtime = os.path.getmtime(track.path)
session.commit()
def show_warning(title: str, msg: str) -> None:
"""Display a warning to user"""

View File

@ -554,19 +554,6 @@ class Tracks(Base):
return session.execute(select(cls)).scalars().all()
@classmethod
def get_or_create(cls, session: Session, path: str) -> "Tracks":
"""
If a track with path exists, return it;
else created new track and return it
"""
track = cls.get_by_path(session, path)
if not track:
track = Tracks(session, path)
return track
@classmethod
def get_by_path(cls, session: Session, path: str) -> "Tracks":
"""
@ -583,22 +570,6 @@ class Tracks(Base):
except NoResultFound:
return None
def rescan(self, session: Session) -> None:
"""
Update audio metadata for passed track.
"""
audio: AudioSegment = get_audio_segment(self.path)
self.duration = len(audio)
self.fade_at = round(fade_point(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
self.mtime = os.path.getmtime(self.path)
self.silence_at = round(trailing_silence(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
self.start_gap = leading_silence(audio)
session.add(self)
session.commit()
@classmethod
def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
"""Search case-insenstively for artists containing str"""

View File

@ -42,7 +42,7 @@ from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist # type: ignore
from ui.downloadcsv_ui import Ui_DateSelect # type: ignore
from config import Config
from ui.main_window_ui import Ui_MainWindow # type: ignore
from utilities import create_track_from_file, check_db, update_bitrates
from utilities import check_db, update_bitrates
class TrackData:
@ -98,14 +98,6 @@ class Window(QMainWindow, Ui_MainWindow):
self.timer.start(Config.TIMER_MS)
self.connect_signals_slots()
#
# @staticmethod
# def print_current_database():
# with Session() as session:
# db = session.bind.engine.url.database
# print(f"{db=}")
#
def clear_selection(self) -> None:
""" Clear selected row"""
@ -492,7 +484,9 @@ class Window(QMainWindow, Ui_MainWindow):
with Session() as session:
for (fname, tags) in tracks:
track = create_track_from_file(session, fname, tags=tags)
track = Tracks(session, fname)
helpers.set_track_metadata(session, track)
helpers.normalise_track(track.path)
self.visible_playlist_tab().insert_track(session, track)
def insert_header(self) -> None:

View File

@ -43,7 +43,8 @@ from helpers import (
file_is_readable,
get_relative_date,
ms_to_mmss,
open_in_audacity
open_in_audacity,
set_track_metadata,
)
from log import log
from models import (
@ -1650,7 +1651,7 @@ class PlaylistTab(QTableWidget):
)
return
track.rescan(session)
set_track_metadata(session, track)
self._update_row(session, row, track)
def _run_subprocess(self, args):
@ -1889,6 +1890,9 @@ class PlaylistTab(QTableWidget):
item_duration = self.item(row, columns['duration'].idx)
item_duration.setText(ms_to_mmss(track.duration))
item_bitrate = self.item(row, columns['bitrate'].idx)
item_bitrate.setText(str(track.bitrate))
self.update_display(session)
def _wikipedia(self, row_number: int) -> None:

View File

@ -16,6 +16,7 @@ from helpers import (
get_tags,
leading_silence,
trailing_silence,
set_track_metadata,
)
from models import Tracks
@ -257,19 +258,14 @@ def process_track(src, dst, title, artist, bitrate):
with Session() as session:
track = Tracks.get_by_path(session, dst)
if track:
track.title = title
track.artist = artist
# Update path, but workaround MariaDB bug
track.path = new_path
track.bitrate = bitrate
try:
session.commit()
except IntegrityError:
# https://jira.mariadb.org/browse/MDEV-29345 workaround
session.rollback()
track.title = title
track.artist = artist
track.path = "DUMMY"
track.bitrate = bitrate
session.commit()
track.path = new_path
session.commit()
@ -279,11 +275,9 @@ def process_track(src, dst, title, artist, bitrate):
os.unlink(dst)
shutil.move(src, new_path)
track = Tracks.get_by_path(session, new_path)
if track:
track.rescan(session)
else:
print(f"Can't find copied track {src=}, {dst=}")
# Update track metadata
set_track_metadata(session, track)
main()

View File

@ -1,8 +1,6 @@
# #!/usr/bin/env python
#
import os
import shutil
import tempfile
from config import Config
from helpers import (
@ -10,91 +8,26 @@ from helpers import (
get_audio_segment,
get_tags,
leading_silence,
normalise_track,
set_track_metadata,
trailing_silence,
)
from log import log
from models import Tracks
from mutagen.flac import FLAC # type: ignore
from mutagen.mp3 import MP3 # type: ignore
from pydub import effects
def create_track_from_file(session, path, normalise=None, tags=None):
def create_track(session, path, normalise=None):
"""
Create track in database from passed path, or update database entry
if path already in database.
Create track in database from passed path.
Return track.
"""
if not tags:
t = get_tags(path)
else:
t = tags
track = Tracks.get_or_create(session, path)
track.title = t['title']
track.artist = t['artist']
audio = get_audio_segment(path)
track.duration = len(audio)
track.start_gap = leading_silence(audio)
track.fade_at = round(fade_point(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.silence_at = round(trailing_silence(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.mtime = os.path.getmtime(path)
track.bitrate = t['bitrate']
session.commit()
track = Tracks(session, path)
set_track_metadata(session, track)
if normalise or normalise is None and Config.NORMALISE_ON_IMPORT:
# Check type
ftype = os.path.splitext(path)[1][1:]
if ftype not in ['mp3', 'flac']:
log.info(f"File type {ftype} not implemented")
return track
# Get current file gid, uid and permissions
stats = os.stat(path)
try:
# Copy original file
fd, temp_path = tempfile.mkstemp()
shutil.copyfile(path, temp_path)
except Exception as err:
log.debug(
f"utilities.create_track_from_file({path}): err1: {repr(err)}"
)
return
# Overwrite original file with normalised output
normalised = effects.normalize(audio)
try:
normalised.export(path, format=os.path.splitext(path)[1][1:])
# Fix up permssions and ownership
os.chown(path, stats.st_uid, stats.st_gid)
os.chmod(path, stats.st_mode)
# Copy tags
if ftype == 'flac':
tag_handler = FLAC
elif ftype == 'mp3':
tag_handler = MP3
else:
return track
src = tag_handler(temp_path)
dst = tag_handler(path)
for tag in src:
dst[tag] = src[tag]
dst.save()
except Exception as err:
log.debug(
f"utilities.create_track_from_file({path}): err2: {repr(err)}"
)
# Restore original file
shutil.copyfile(path, temp_path)
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
return track
normalise_track(path)
def check_db(session):

View File

@ -366,15 +366,6 @@ def test_tracks_get_all_tracks(session):
assert track2_path in [a.path for a in result]
def test_tracks_get_or_create(session):
track1_path = "/a/b/c"
track1 = Tracks.get_or_create(session, track1_path)
assert track1.path == track1_path
track2 = Tracks.get_or_create(session, track1_path)
assert track1 is track2
def test_tracks_by_filename(session):
track1_path = "/a/b/c"