Added .rescan to Tracks
Also added tests for rescan function
This commit is contained in:
parent
557b89ba09
commit
1c86728170
120
app/helpers.py
120
app/helpers.py
@ -1,10 +1,67 @@
|
|||||||
import os
|
import os
|
||||||
import psutil
|
import psutil
|
||||||
|
|
||||||
|
from app.config import Config
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from pydub import AudioSegment
|
||||||
|
from mutagen.flac import FLAC
|
||||||
|
from mutagen.mp3 import MP3
|
||||||
from PyQt5.QtWidgets import QMessageBox
|
from PyQt5.QtWidgets import QMessageBox
|
||||||
|
|
||||||
|
|
||||||
|
def fade_point(audio_segment, fade_threshold=0,
|
||||||
|
chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
|
||||||
|
"""
|
||||||
|
Returns the millisecond/index of the point where the volume drops below
|
||||||
|
the maximum and doesn't get louder again.
|
||||||
|
audio_segment - the sdlg_search_database_uiegment to find silence in
|
||||||
|
fade_threshold - the upper bound for how quiet is silent in dFBS
|
||||||
|
chunk_size - chunk size for interating over the segment in ms
|
||||||
|
"""
|
||||||
|
|
||||||
|
assert chunk_size > 0 # to avoid infinite loop
|
||||||
|
|
||||||
|
segment_length = audio_segment.duration_seconds * 1000 # ms
|
||||||
|
trim_ms = segment_length - chunk_size
|
||||||
|
max_vol = audio_segment.dBFS
|
||||||
|
if fade_threshold == 0:
|
||||||
|
fade_threshold = max_vol
|
||||||
|
|
||||||
|
while (
|
||||||
|
audio_segment[trim_ms:trim_ms + chunk_size].dBFS < fade_threshold
|
||||||
|
and trim_ms > 0): # noqa W503
|
||||||
|
trim_ms -= chunk_size
|
||||||
|
|
||||||
|
# if there is no trailing silence, return lenght of track (it's less
|
||||||
|
# the chunk_size, but for chunk_size = 10ms, this may be ignored)
|
||||||
|
return int(trim_ms)
|
||||||
|
|
||||||
|
|
||||||
|
def get_audio_segment(path):
|
||||||
|
try:
|
||||||
|
if path.endswith('.mp3'):
|
||||||
|
return AudioSegment.from_mp3(path)
|
||||||
|
elif path.endswith('.flac'):
|
||||||
|
return AudioSegment.from_file(path, "flac")
|
||||||
|
except AttributeError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def get_tag_data(path):
|
||||||
|
"""
|
||||||
|
Return a dictionary of title, artist, duration-in-milliseconds and path.
|
||||||
|
"""
|
||||||
|
|
||||||
|
tag = TinyTag.get(path)
|
||||||
|
|
||||||
|
return dict(
|
||||||
|
title=tag.title,
|
||||||
|
artist=tag.artist,
|
||||||
|
duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000),
|
||||||
|
path=path
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def get_relative_date(past_date, reference_date=None):
|
def get_relative_date(past_date, reference_date=None):
|
||||||
"""
|
"""
|
||||||
Return how long before reference_date past_date is as string.
|
Return how long before reference_date past_date is as string.
|
||||||
@ -40,6 +97,49 @@ def get_relative_date(past_date, reference_date=None):
|
|||||||
return f"{weeks} {weeks_str}, {days} {days_str} ago"
|
return f"{weeks} {weeks_str}, {days} {days_str} ago"
|
||||||
|
|
||||||
|
|
||||||
|
def leading_silence(audio_segment, silence_threshold=Config.DBFS_SILENCE,
|
||||||
|
chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
|
||||||
|
"""
|
||||||
|
Returns the millisecond/index that the leading silence ends.
|
||||||
|
audio_segment - the segment to find silence in
|
||||||
|
silence_threshold - the upper bound for how quiet is silent in dFBS
|
||||||
|
chunk_size - chunk size for interating over the segment in ms
|
||||||
|
|
||||||
|
https://github.com/jiaaro/pydub/blob/master/pydub/silence.py
|
||||||
|
"""
|
||||||
|
|
||||||
|
trim_ms = 0 # ms
|
||||||
|
assert chunk_size > 0 # to avoid infinite loop
|
||||||
|
while (
|
||||||
|
audio_segment[trim_ms:trim_ms + chunk_size].dBFS < # noqa W504
|
||||||
|
silence_threshold and trim_ms < len(audio_segment)):
|
||||||
|
trim_ms += chunk_size
|
||||||
|
|
||||||
|
# if there is no end it should return the length of the segment
|
||||||
|
return min(trim_ms, len(audio_segment))
|
||||||
|
|
||||||
|
|
||||||
|
def ms_to_mmss(ms, decimals=0, negative=False):
|
||||||
|
if not ms:
|
||||||
|
return "-"
|
||||||
|
sign = ""
|
||||||
|
if ms < 0:
|
||||||
|
if negative:
|
||||||
|
sign = "-"
|
||||||
|
else:
|
||||||
|
ms = 0
|
||||||
|
|
||||||
|
minutes, remainder = divmod(ms, 60 * 1000)
|
||||||
|
seconds = remainder / 1000
|
||||||
|
|
||||||
|
# if seconds >= 59.5, it will be represented as 60, which looks odd.
|
||||||
|
# So, fake it under those circumstances
|
||||||
|
if seconds >= 59.5:
|
||||||
|
seconds = 59.0
|
||||||
|
|
||||||
|
return f"{sign}{minutes:.0f}:{seconds:02.{decimals}f}"
|
||||||
|
|
||||||
|
|
||||||
def open_in_audacity(path):
|
def open_in_audacity(path):
|
||||||
"""
|
"""
|
||||||
Open passed file in Audacity
|
Open passed file in Audacity
|
||||||
@ -88,22 +188,8 @@ def show_warning(title, msg):
|
|||||||
QMessageBox.warning(None, title, msg, buttons=QMessageBox.Cancel)
|
QMessageBox.warning(None, title, msg, buttons=QMessageBox.Cancel)
|
||||||
|
|
||||||
|
|
||||||
def ms_to_mmss(ms, decimals=0, negative=False):
|
def trailing_silence(audio_segment, silence_threshold=-50.0,
|
||||||
if not ms:
|
chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
|
||||||
return "-"
|
return fade_point(audio_segment, silence_threshold, chunk_size)
|
||||||
sign = ""
|
|
||||||
if ms < 0:
|
|
||||||
if negative:
|
|
||||||
sign = "-"
|
|
||||||
else:
|
|
||||||
ms = 0
|
|
||||||
|
|
||||||
minutes, remainder = divmod(ms, 60 * 1000)
|
|
||||||
seconds = remainder / 1000
|
|
||||||
|
|
||||||
# if seconds >= 59.5, it will be represented as 60, which looks odd.
|
|
||||||
# So, fake it under those circumstances
|
|
||||||
if seconds >= 59.5:
|
|
||||||
seconds = 59.0
|
|
||||||
|
|
||||||
return f"{sign}{minutes:.0f}:{seconds:02.{decimals}f}"
|
|
||||||
|
|||||||
@ -6,6 +6,8 @@ import re
|
|||||||
import sqlalchemy
|
import sqlalchemy
|
||||||
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from mutagen.flac import FLAC
|
||||||
|
from mutagen.mp3 import MP3
|
||||||
from sqlalchemy.ext.associationproxy import association_proxy
|
from sqlalchemy.ext.associationproxy import association_proxy
|
||||||
from sqlalchemy.ext.declarative import declarative_base
|
from sqlalchemy.ext.declarative import declarative_base
|
||||||
from sqlalchemy import (
|
from sqlalchemy import (
|
||||||
@ -23,6 +25,13 @@ from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
|
|||||||
from sqlalchemy.orm import backref, relationship, sessionmaker, scoped_session
|
from sqlalchemy.orm import backref, relationship, sessionmaker, scoped_session
|
||||||
|
|
||||||
from app.config import Config
|
from app.config import Config
|
||||||
|
from app.helpers import (
|
||||||
|
fade_point,
|
||||||
|
get_audio_segment,
|
||||||
|
leading_silence,
|
||||||
|
show_warning,
|
||||||
|
trailing_silence,
|
||||||
|
)
|
||||||
from app.log import DEBUG, ERROR
|
from app.log import DEBUG, ERROR
|
||||||
|
|
||||||
# Create session at the global level as per
|
# Create session at the global level as per
|
||||||
@ -481,6 +490,22 @@ class Tracks(Base):
|
|||||||
ERROR(f"get_track({track_id}): not found")
|
ERROR(f"get_track({track_id}): not found")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def rescan(self, session):
|
||||||
|
"""
|
||||||
|
Update audio metadata for passed track.
|
||||||
|
"""
|
||||||
|
|
||||||
|
audio = get_audio_segment(self.path)
|
||||||
|
self.duration = len(audio)
|
||||||
|
self.fade_at = round(fade_point(audio) / 1000,
|
||||||
|
Config.MILLISECOND_SIGFIGS) * 1000
|
||||||
|
self.mtime = os.path.getmtime(self.path)
|
||||||
|
self.silence_at = round(trailing_silence(audio) / 1000,
|
||||||
|
Config.MILLISECOND_SIGFIGS) * 1000
|
||||||
|
self.start_gap = leading_silence(audio)
|
||||||
|
session.add(self)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def remove_by_path(session, path):
|
def remove_by_path(session, path):
|
||||||
"Remove track with passed path from database"
|
"Remove track with passed path from database"
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
import os.path
|
import os.path
|
||||||
import time
|
import random
|
||||||
|
import string
|
||||||
|
|
||||||
from app.models import (
|
from app.models import (
|
||||||
NoteColours,
|
NoteColours,
|
||||||
@ -322,6 +323,30 @@ def test_tracks_by_id(session):
|
|||||||
assert Tracks.get_by_id(session, track1.id) is track1
|
assert Tracks.get_by_id(session, track1.id) is track1
|
||||||
|
|
||||||
|
|
||||||
|
def test_tracks_rescan(session):
|
||||||
|
# Get test track
|
||||||
|
test_track_path = "./testdata/isa.mp3"
|
||||||
|
test_track_data = "./testdata/isa.py"
|
||||||
|
|
||||||
|
track = Tracks(session, test_track_path)
|
||||||
|
track.rescan(session)
|
||||||
|
|
||||||
|
# Get test data
|
||||||
|
with open(test_track_data) as f:
|
||||||
|
testdata = eval(f.read())
|
||||||
|
|
||||||
|
# Re-read the track
|
||||||
|
track_read = Tracks.get_from_path(session, test_track_path)
|
||||||
|
|
||||||
|
assert track_read.duration == testdata['duration']
|
||||||
|
assert track_read.start_gap == testdata['leading_silence']
|
||||||
|
# Silence detection can vary, so ± 1 second is OK
|
||||||
|
assert track_read.fade_at < testdata['fade_at'] + 1000
|
||||||
|
assert track_read.fade_at > testdata['fade_at'] - 1000
|
||||||
|
assert track_read.silence_at < testdata['trailing_silence'] + 1000
|
||||||
|
assert track_read.silence_at > testdata['trailing_silence'] - 1000
|
||||||
|
|
||||||
|
|
||||||
def test_tracks_remove_by_path(session):
|
def test_tracks_remove_by_path(session):
|
||||||
track1_path = "/a/b/c"
|
track1_path = "/a/b/c"
|
||||||
|
|
||||||
|
|||||||
8
testdata/isa.py
vendored
Normal file
8
testdata/isa.py
vendored
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
# Measurements for isa.{mp3,flac} (milliseconds)
|
||||||
|
|
||||||
|
{
|
||||||
|
"leading_silence": 60,
|
||||||
|
"fade_at": 236163,
|
||||||
|
"trailing_silence": 259373,
|
||||||
|
"duration": 262533,
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user