Compare commits

..

No commits in common. "bef4507ef680eec4f7b1fd1bb074658768e77984" and "8192e79d426d0e006a106dc5bff87e8723f2f5ac" have entirely different histories.

10 changed files with 4332 additions and 4346 deletions

2
.envrc
View File

@ -2,8 +2,6 @@ layout poetry
branch=$(git branch --show-current) branch=$(git branch --show-current)
if on_git_branch master; then if on_git_branch master; then
export MM_ENV="PRODUCTION" export MM_ENV="PRODUCTION"
export MM_DB="mysql+mysqldb://musicmuster:musicmuster@localhost/musicmuster_prod"
else MYSQL_DATABASE="musicmuster_dev" else MYSQL_DATABASE="musicmuster_dev"
export MM_ENV="DEVELOPMENT" export MM_ENV="DEVELOPMENT"
export MM_DB="mysql+mysqldb://musicmuster:musicmuster@localhost/musicmuster_dev"
fi fi

View File

@ -1,6 +1,5 @@
import logging import logging
import os import os
from typing import List, Optional
class Config(object): class Config(object):
@ -32,18 +31,16 @@ class Config(object):
COLUMN_NAME_TITLE = "Title" COLUMN_NAME_TITLE = "Title"
DBFS_FADE = -12 DBFS_FADE = -12
DBFS_SILENCE = -50 DBFS_SILENCE = -50
DEBUG_FUNCTIONS: List[Optional[str]] = []
DEBUG_MODULES: List[Optional[str]] = ['dbconfig']
DEFAULT_COLUMN_WIDTH = 200 DEFAULT_COLUMN_WIDTH = 200
DEFAULT_IMPORT_DIRECTORY = "/home/kae/Nextcloud/tmp" DEFAULT_IMPORT_DIRECTORY = "/home/kae/Nextcloud/tmp"
DEFAULT_OUTPUT_DIRECTORY = "/home/kae/music/Singles" DEFAULT_OUTPUT_DIRECTORY = "/home/kae/music/Singles"
DISPLAY_SQL = True DISPLAY_SQL = False
ERRORS_TO = ['kae@midnighthax.com'] ERRORS_TO = ['kae@midnighthax.com']
FADE_STEPS = 20 FADE_STEPS = 20
FADE_TIME = 3000 FADE_TIME = 3000
INFO_TAB_TITLE_LENGTH = 15 INFO_TAB_TITLE_LENGTH = 15
INFO_TAB_URL = "https://www.wikipedia.org/w/index.php?search=%s" INFO_TAB_URL = "https://www.wikipedia.org/w/index.php?search=%s"
LOG_LEVEL_STDERR = logging.DEBUG LOG_LEVEL_STDERR = logging.INFO
LOG_LEVEL_SYSLOG = logging.DEBUG LOG_LEVEL_SYSLOG = logging.DEBUG
LOG_NAME = "musicmuster" LOG_NAME = "musicmuster"
MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD') MAIL_PASSWORD = os.environ.get('MAIL_PASSWORD')

View File

@ -1,38 +1,59 @@
import logging import inspect
import os import os
import sqlalchemy
from config import Config from config import Config
from sqlalchemy import create_engine
from contextlib import contextmanager from contextlib import contextmanager
from log import DEBUG
from sqlalchemy.orm import (sessionmaker, scoped_session) from sqlalchemy.orm import (sessionmaker, scoped_session)
from log import log
MYSQL_CONNECT = os.environ.get('MM_DB') class Counter:
if MYSQL_CONNECT is None: def __init__(self):
raise ValueError("MYSQL_CONNECT is undefined") self.count = 0
def __repr__(self):
return(f"<Counter({self.count=})>")
def inc(self):
self.count += 1
return self.count
def dec(self):
self.count -= 1
return self.count
MM_ENV = os.environ.get('MM_ENV', 'PRODUCTION')
testing = False
if MM_ENV == 'PRODUCTION':
dbname = os.environ.get('MM_PRODUCTION_DBNAME', 'musicmuster_prod')
dbuser = os.environ.get('MM_PRODUCTION_DBUSER', 'musicmuster')
dbpw = os.environ.get('MM_PRODUCTION_DBPW', 'musicmuster')
dbhost = os.environ.get('MM_PRODUCTION_DBHOST', 'localhost')
elif MM_ENV == 'TESTING':
dbname = os.environ.get('MM_TESTING_DBNAME', 'musicmuster_testing')
dbuser = os.environ.get('MM_TESTING_DBUSER', 'musicmuster_testing')
dbpw = os.environ.get('MM_TESTING_DBPW', 'musicmuster_testing')
dbhost = os.environ.get('MM_TESTING_DBHOST', 'localhost')
testing = True
elif MM_ENV == 'DEVELOPMENT':
dbname = os.environ.get('MM_DEVELOPMENT_DBNAME', 'musicmuster_dev')
dbuser = os.environ.get('MM_DEVELOPMENT_DBUSER', 'musicmuster')
dbpw = os.environ.get('MM_DEVELOPMENT_DBPW', 'musicmuster')
dbhost = os.environ.get('MM_DEVELOPMENT_DBHOST', 'localhost')
else: else:
dbname = MYSQL_CONNECT.split('/')[-1] raise ValueError(f"Unknown MusicMuster environment: {MM_ENV=}")
log.debug(f"Database: {dbname}")
# MM_ENV = os.environ.get('MM_ENV', 'PRODUCTION') DEBUG(f"Using {dbname} database")
# testing = False MYSQL_CONNECT = f"mysql+mysqldb://{dbuser}:{dbpw}@{dbhost}/{dbname}"
# if MM_ENV == 'TESTING':
# dbname = os.environ.get('MM_TESTING_DBNAME', 'musicmuster_testing')
# dbuser = os.environ.get('MM_TESTING_DBUSER', 'musicmuster_testing')
# dbpw = os.environ.get('MM_TESTING_DBPW', 'musicmuster_testing')
# dbhost = os.environ.get('MM_TESTING_DBHOST', 'localhost')
# testing = True
# else:
# raise ValueError(f"Unknown MusicMuster environment: {MM_ENV=}")
#
# MYSQL_CONNECT = f"mysql+mysqldb://{dbuser}:{dbpw}@{dbhost}/{dbname}"
engine = create_engine( engine = sqlalchemy.create_engine(
MYSQL_CONNECT, MYSQL_CONNECT,
encoding='utf-8', encoding='utf-8',
echo=Config.DISPLAY_SQL, echo=Config.DISPLAY_SQL,
pool_pre_ping=True, pool_pre_ping=True
future=True
) )
@ -42,9 +63,9 @@ def Session():
file = frame.filename file = frame.filename
function = frame.function function = frame.function
lineno = frame.lineno lineno = frame.lineno
Session = scoped_session(sessionmaker(bind=engine, future=True)) Session = scoped_session(sessionmaker(bind=engine))
log.debug(f"Session acquired, {file=}, {function=}, {lineno=}, {Session=}") DEBUG(f"Session acquired, {file=}, {function=}, {lineno=}, {Session=}")
yield Session yield Session
log.debug(" Session released") DEBUG(" Session released")
Session.commit() Session.commit()
Session.close() Session.close()

View File

@ -1,227 +1,227 @@
# import os import os
# import psutil import psutil
#
# from config import Config from config import Config
# from datetime import datetime from datetime import datetime
# from pydub import AudioSegment from pydub import AudioSegment
# from PyQt5.QtWidgets import QMessageBox from PyQt5.QtWidgets import QMessageBox
# from tinytag import TinyTag from tinytag import TinyTag
# from typing import Dict, Optional, Union from typing import Dict, Optional, Union
#
#
# def ask_yes_no(title: str, question: str) -> bool: def ask_yes_no(title: str, question: str) -> bool:
# """Ask question; return True for yes, False for no""" """Ask question; return True for yes, False for no"""
#
# button_reply: bool = QMessageBox.question(None, title, question) button_reply: bool = QMessageBox.question(None, title, question)
#
# return button_reply == QMessageBox.Yes return button_reply == QMessageBox.Yes
#
#
# def fade_point( def fade_point(
# audio_segment: AudioSegment, fade_threshold: int = 0, audio_segment: AudioSegment, fade_threshold: int = 0,
# chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE) -> int: chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE) -> int:
# """ """
# Returns the millisecond/index of the point where the volume drops below Returns the millisecond/index of the point where the volume drops below
# the maximum and doesn't get louder again. the maximum and doesn't get louder again.
# audio_segment - the sdlg_search_database_uiegment to find silence in audio_segment - the sdlg_search_database_uiegment to find silence in
# fade_threshold - the upper bound for how quiet is silent in dFBS fade_threshold - the upper bound for how quiet is silent in dFBS
# chunk_size - chunk size for interating over the segment in ms chunk_size - chunk size for interating over the segment in ms
# """ """
#
# assert chunk_size > 0 # to avoid infinite loop assert chunk_size > 0 # to avoid infinite loop
#
# segment_length: int = audio_segment.duration_seconds * 1000 # ms segment_length: int = audio_segment.duration_seconds * 1000 # ms
# trim_ms: int = segment_length - chunk_size trim_ms: int = segment_length - chunk_size
# max_vol: int = audio_segment.dBFS max_vol: int = audio_segment.dBFS
# if fade_threshold == 0: if fade_threshold == 0:
# fade_threshold = max_vol fade_threshold = max_vol
#
# while ( while (
# audio_segment[trim_ms:trim_ms + chunk_size].dBFS < fade_threshold audio_segment[trim_ms:trim_ms + chunk_size].dBFS < fade_threshold
# and trim_ms > 0): # noqa W503 and trim_ms > 0): # noqa W503
# trim_ms -= chunk_size trim_ms -= chunk_size
#
# # if there is no trailing silence, return lenght of track (it's less # if there is no trailing silence, return lenght of track (it's less
# # the chunk_size, but for chunk_size = 10ms, this may be ignored) # the chunk_size, but for chunk_size = 10ms, this may be ignored)
# return int(trim_ms) return int(trim_ms)
#
#
# def get_audio_segment(path: str) -> Optional[AudioSegment]: def get_audio_segment(path: str) -> Optional[AudioSegment]:
# try: try:
# if path.endswith('.mp3'): if path.endswith('.mp3'):
# return AudioSegment.from_mp3(path) return AudioSegment.from_mp3(path)
# elif path.endswith('.flac'): elif path.endswith('.flac'):
# return AudioSegment.from_file(path, "flac") return AudioSegment.from_file(path, "flac")
# except AttributeError: except AttributeError:
# return None return None
#
#
# def get_tags(path: str) -> Dict[str, Union[str, int]]: def get_tags(path: str) -> Dict[str, Union[str, int]]:
# """ """
# Return a dictionary of title, artist, duration-in-milliseconds and path. Return a dictionary of title, artist, duration-in-milliseconds and path.
# """ """
#
# tag: TinyTag = TinyTag.get(path) tag: TinyTag = TinyTag.get(path)
#
# d = dict( d = dict(
# title=tag.title, title=tag.title,
# artist=tag.artist, artist=tag.artist,
# duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000), duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000),
# path=path path=path
# ) )
# return d return d
#
#
# def get_relative_date(past_date: datetime, reference_date: datetime = None) \ def get_relative_date(past_date: datetime, reference_date: datetime = None) \
# -> str: -> str:
# """ """
# Return how long before reference_date past_date is as string. Return how long before reference_date past_date is as string.
#
# Params: Params:
# @past_date: datetime @past_date: datetime
# @reference_date: datetime, defaults to current date and time @reference_date: datetime, defaults to current date and time
#
# @return: string @return: string
# """ """
#
# if not past_date: if not past_date:
# return "Never" return "Never"
# if not reference_date: if not reference_date:
# reference_date = datetime.now() reference_date = datetime.now()
#
# # Check parameters # Check parameters
# if past_date > reference_date: if past_date > reference_date:
# return "get_relative_date() past_date is after relative_date" return "get_relative_date() past_date is after relative_date"
#
# days: int days: int
# days_str: str days_str: str
# weeks: int weeks: int
# weeks_str: str weeks_str: str
#
# weeks, days = divmod((reference_date.date() - past_date.date()).days, 7) weeks, days = divmod((reference_date.date() - past_date.date()).days, 7)
# if weeks == days == 0: if weeks == days == 0:
# # Played today, so return time instead # Played today, so return time instead
# return past_date.strftime("%H:%M") return past_date.strftime("%H:%M")
# if weeks == 1: if weeks == 1:
# weeks_str = "week" weeks_str = "week"
# else: else:
# weeks_str = "weeks" weeks_str = "weeks"
# if days == 1: if days == 1:
# days_str = "day" days_str = "day"
# else: else:
# days_str = "days" days_str = "days"
# return f"{weeks} {weeks_str}, {days} {days_str} ago" return f"{weeks} {weeks_str}, {days} {days_str} ago"
#
#
# def leading_silence( def leading_silence(
# audio_segment: AudioSegment, audio_segment: AudioSegment,
# silence_threshold: int = Config.DBFS_SILENCE, silence_threshold: int = Config.DBFS_SILENCE,
# chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE) -> int: chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE) -> int:
# """ """
# Returns the millisecond/index that the leading silence ends. Returns the millisecond/index that the leading silence ends.
# audio_segment - the segment to find silence in audio_segment - the segment to find silence in
# silence_threshold - the upper bound for how quiet is silent in dFBS silence_threshold - the upper bound for how quiet is silent in dFBS
# chunk_size - chunk size for interating over the segment in ms chunk_size - chunk size for interating over the segment in ms
#
# https://github.com/jiaaro/pydub/blob/master/pydub/silence.py https://github.com/jiaaro/pydub/blob/master/pydub/silence.py
# """ """
#
# trim_ms: int = 0 # ms trim_ms: int = 0 # ms
# assert chunk_size > 0 # to avoid infinite loop assert chunk_size > 0 # to avoid infinite loop
# while ( while (
# audio_segment[trim_ms:trim_ms + chunk_size].dBFS < # noqa W504 audio_segment[trim_ms:trim_ms + chunk_size].dBFS < # noqa W504
# silence_threshold and trim_ms < len(audio_segment)): silence_threshold and trim_ms < len(audio_segment)):
# trim_ms += chunk_size trim_ms += chunk_size
#
# # if there is no end it should return the length of the segment # if there is no end it should return the length of the segment
# return min(trim_ms, len(audio_segment)) return min(trim_ms, len(audio_segment))
#
#
# def ms_to_mmss(ms: int, decimals: int = 0, negative: bool = False) -> str: def ms_to_mmss(ms: int, decimals: int = 0, negative: bool = False) -> str:
# """Convert milliseconds to mm:ss""" """Convert milliseconds to mm:ss"""
#
# minutes: int minutes: int
# remainder: int remainder: int
# seconds: float seconds: float
#
# if not ms: if not ms:
# return "-" return "-"
# sign = "" sign = ""
# if ms < 0: if ms < 0:
# if negative: if negative:
# sign = "-" sign = "-"
# else: else:
# ms = 0 ms = 0
#
# minutes, remainder = divmod(ms, 60 * 1000) minutes, remainder = divmod(ms, 60 * 1000)
# seconds = remainder / 1000 seconds = remainder / 1000
#
# # if seconds >= 59.5, it will be represented as 60, which looks odd. # if seconds >= 59.5, it will be represented as 60, which looks odd.
# # So, fake it under those circumstances # So, fake it under those circumstances
# if seconds >= 59.5: if seconds >= 59.5:
# seconds = 59.0 seconds = 59.0
#
# return f"{sign}{minutes:.0f}:{seconds:02.{decimals}f}" return f"{sign}{minutes:.0f}:{seconds:02.{decimals}f}"
#
#
# def open_in_audacity(path: str) -> Optional[bool]: def open_in_audacity(path: str) -> Optional[bool]:
# """ """
# Open passed file in Audacity Open passed file in Audacity
#
# Return True if apparently opened successfully, else False Return True if apparently opened successfully, else False
# """ """
#
# # Return if audacity not running # Return if audacity not running
# if "audacity" not in [i.name() for i in psutil.process_iter()]: if "audacity" not in [i.name() for i in psutil.process_iter()]:
# return False return False
#
# # Return if path not given # Return if path not given
# if not path: if not path:
# return False return False
#
# to_pipe: str = '/tmp/audacity_script_pipe.to.' + str(os.getuid()) to_pipe: str = '/tmp/audacity_script_pipe.to.' + str(os.getuid())
# from_pipe: str = '/tmp/audacity_script_pipe.from.' + str(os.getuid()) from_pipe: str = '/tmp/audacity_script_pipe.from.' + str(os.getuid())
# eol: str = '\n' eol: str = '\n'
#
# def send_command(command: str) -> None: def send_command(command: str) -> None:
# """Send a single command.""" """Send a single command."""
# to_audacity.write(command + eol) to_audacity.write(command + eol)
# to_audacity.flush() to_audacity.flush()
#
# def get_response() -> str: def get_response() -> str:
# """Return the command response.""" """Return the command response."""
#
# result: str = '' result: str = ''
# line: str = '' line: str = ''
#
# while True: while True:
# result += line result += line
# line = from_audacity.readline() line = from_audacity.readline()
# if line == '\n' and len(result) > 0: if line == '\n' and len(result) > 0:
# break break
# return result return result
#
# def do_command(command: str) -> str: def do_command(command: str) -> str:
# """Send one command, and return the response.""" """Send one command, and return the response."""
#
# send_command(command) send_command(command)
# response = get_response() response = get_response()
# return response return response
#
# with open(to_pipe, 'w') as to_audacity, open( with open(to_pipe, 'w') as to_audacity, open(
# from_pipe, 'rt') as from_audacity: from_pipe, 'rt') as from_audacity:
# do_command(f'Import2: Filename="{path}"') do_command(f'Import2: Filename="{path}"')
#
#
# def show_warning(title: str, msg: str) -> None: def show_warning(title: str, msg: str) -> None:
# """Display a warning to user""" """Display a warning to user"""
#
# QMessageBox.warning(None, title, msg, buttons=QMessageBox.Cancel) QMessageBox.warning(None, title, msg, buttons=QMessageBox.Cancel)
#
#
# def trailing_silence( def trailing_silence(
# audio_segment: AudioSegment, silence_threshold: int = -50, audio_segment: AudioSegment, silence_threshold: int = -50,
# chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE) -> int: chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE) -> int:
# """Return fade point from start in milliseconds""" """Return fade point from start in milliseconds"""
#
# return fade_point(audio_segment, silence_threshold, chunk_size) return fade_point(audio_segment, silence_threshold, chunk_size)

View File

@ -20,19 +20,6 @@ class LevelTagFilter(logging.Filter):
return True return True
class DebugStdoutFilter(logging.Filter):
"""Filter debug messages sent to stdout"""
def filter(self, record):
if record.levelno != logging.DEBUG:
return True
if record.module in Config.DEBUG_MODULES:
return True
if record.funcName in Config.DEBUG_FUNCTIONS:
return True
return False
log = logging.getLogger(Config.LOG_NAME) log = logging.getLogger(Config.LOG_NAME)
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
@ -46,19 +33,13 @@ syslog.setLevel(Config.LOG_LEVEL_SYSLOG)
# Filter # Filter
local_filter = LevelTagFilter() local_filter = LevelTagFilter()
debug_filter = DebugStdoutFilter()
syslog.addFilter(local_filter) syslog.addFilter(local_filter)
stderr.addFilter(local_filter) stderr.addFilter(local_filter)
stderr.addFilter(debug_filter)
# create formatter and add it to the handlers # create formatter and add it to the handlers
stderr_fmt = logging.Formatter('[%(asctime)s] %(leveltag)s: %(message)s', stderr_fmt = logging.Formatter('[%(asctime)s] %(leveltag)s: %(message)s',
datefmt='%H:%M:%S') datefmt='%H:%M:%S')
syslog_fmt = logging.Formatter( syslog_fmt = logging.Formatter('[%(name)s] %(leveltag)s: %(message)s')
'[%(name)s] %(module)s.%(funcName)s - %(leveltag)s: %(message)s'
)
stderr.setFormatter(stderr_fmt) stderr.setFormatter(stderr_fmt)
syslog.setFormatter(syslog_fmt) syslog.setFormatter(syslog_fmt)
@ -76,3 +57,52 @@ def log_uncaught_exceptions(ex_cls, ex, tb):
sys.excepthook = log_uncaught_exceptions sys.excepthook = log_uncaught_exceptions
def DEBUG(msg: str, force_stderr: bool = False) -> None:
"""
Outupt a log message at level DEBUG. If force_stderr is True,
output this message to stderr regardless of default stderr level
setting.
"""
if force_stderr:
old_level = stderr.level
stderr.setLevel(logging.DEBUG)
log.debug(msg)
stderr.setLevel(old_level)
else:
log.debug(msg)
def EXCEPTION(msg: str) -> None:
log.exception(msg, exc_info=True, stack_info=True)
def ERROR(msg: str) -> None:
log.error(msg)
def INFO(msg: str) -> None:
log.info(msg)
if __name__ == "__main__":
DEBUG("hi debug")
ERROR("hi error")
INFO("hi info")
EXCEPTION("hi exception")
def f():
return g()
def g():
return h()
def h():
return i()
def i():
return 1 / 0
f()

File diff suppressed because it is too large Load Diff

View File

@ -1,181 +1,181 @@
# import os import os
# import threading import threading
# import vlc import vlc
#
# from config import Config from config import Config
# from datetime import datetime from datetime import datetime
# from time import sleep from time import sleep
#
# from log import log.debug, log.error from log import DEBUG, ERROR
#
# lock = threading.Lock() lock = threading.Lock()
#
#
# class Music: class Music:
# """ """
# Manage the playing of music tracks Manage the playing of music tracks
# """ """
#
# def __init__(self): def __init__(self):
# self.current_track_start_time = None self.current_track_start_time = None
# self.fading = 0 self.fading = 0
# self.VLC = vlc.Instance() self.VLC = vlc.Instance()
# self.player = None self.player = None
# self.track_path = None self.track_path = None
# self.max_volume = Config.VOLUME_VLC_DEFAULT self.max_volume = Config.VOLUME_VLC_DEFAULT
#
# def fade(self): def fade(self):
# """ """
# Fade the currently playing track. Fade the currently playing track.
#
# The actual management of fading runs in its own thread so as not The actual management of fading runs in its own thread so as not
# to hold up the UI during the fade. to hold up the UI during the fade.
# """ """
#
# log.debug("music.fade()", True) DEBUG("music.fade()", True)
#
# if not self.player: if not self.player:
# return return
#
# if not self.player.get_position() > 0 and self.player.is_playing(): if not self.player.get_position() > 0 and self.player.is_playing():
# return return
#
# self.fading += 1 self.fading += 1
#
# thread = threading.Thread(target=self._fade) thread = threading.Thread(target=self._fade)
# thread.start() thread.start()
#
# def _fade(self): def _fade(self):
# """ """
# Implementation of fading the current track in a separate thread. Implementation of fading the current track in a separate thread.
# """ """
#
# # Take a copy of current player to allow another track to be # Take a copy of current player to allow another track to be
# # started without interfering here # started without interfering here
#
# log.debug(f"music._fade(), {self.player=}", True) DEBUG(f"music._fade(), {self.player=}", True)
#
# with lock: with lock:
# p = self.player p = self.player
# self.player = None self.player = None
#
# log.debug("music._fade() post-lock", True) DEBUG("music._fade() post-lock", True)
#
# fade_time = Config.FADE_TIME / 1000 fade_time = Config.FADE_TIME / 1000
# steps = Config.FADE_STEPS steps = Config.FADE_STEPS
# sleep_time = fade_time / steps sleep_time = fade_time / steps
#
# # We reduce volume by one mesure first, then by two measures, # We reduce volume by one mesure first, then by two measures,
# # then three, and so on. # then three, and so on.
#
# # The sum of the arithmetic sequence 1, 2, 3, ..n is # The sum of the arithmetic sequence 1, 2, 3, ..n is
# # (n**2 + n) / 2 # (n**2 + n) / 2
# total_measures_count = (steps**2 + steps) / 2 total_measures_count = (steps**2 + steps) / 2
#
# measures_to_reduce_by = 0 measures_to_reduce_by = 0
# for i in range(1, steps + 1): for i in range(1, steps + 1):
# measures_to_reduce_by += i measures_to_reduce_by += i
# volume_factor = 1 - ( volume_factor = 1 - (
# measures_to_reduce_by / total_measures_count) measures_to_reduce_by / total_measures_count)
# p.audio_set_volume(int(self.max_volume * volume_factor)) p.audio_set_volume(int(self.max_volume * volume_factor))
# sleep(sleep_time) sleep(sleep_time)
#
# with lock: with lock:
# log.debug(f"music._fade(), stopping {p=}", True) DEBUG(f"music._fade(), stopping {p=}", True)
#
# p.stop() p.stop()
# log.debug(f"Releasing player {p=}", True) DEBUG(f"Releasing player {p=}", True)
# p.release() p.release()
#
# self.fading -= 1 self.fading -= 1
#
# def get_playtime(self): def get_playtime(self):
# """Return elapsed play time""" """Return elapsed play time"""
#
# with lock: with lock:
# if not self.player: if not self.player:
# return None return None
#
# return self.player.get_time() return self.player.get_time()
#
# def get_position(self): def get_position(self):
# """Return current position""" """Return current position"""
#
# with lock: with lock:
# log.debug("music.get_position", True) DEBUG("music.get_position", True)
#
# print(f"get_position, {self.player=}") print(f"get_position, {self.player=}")
# if not self.player: if not self.player:
# return return
# return self.player.get_position() return self.player.get_position()
#
# def play(self, path): def play(self, path):
# """ """
# Start playing the track at path. Start playing the track at path.
#
# Log and return if path not found. Log and return if path not found.
# """ """
#
# log.debug(f"music.play({path=})", True) DEBUG(f"music.play({path=})", True)
#
# if not os.access(path, os.R_OK): if not os.access(path, os.R_OK):
# log.error(f"play({path}): path not found") ERROR(f"play({path}): path not found")
# return return
#
# self.track_path = path self.track_path = path
#
# self.player = self.VLC.media_player_new(path) self.player = self.VLC.media_player_new(path)
# self.player.audio_set_volume(self.max_volume) self.player.audio_set_volume(self.max_volume)
# log.debug(f"music.play({path=}), {self.player}", True) DEBUG(f"music.play({path=}), {self.player}", True)
# self.player.play() self.player.play()
# self.current_track_start_time = datetime.now() self.current_track_start_time = datetime.now()
#
# def playing(self): def playing(self):
# """ """
# Return True if currently playing a track, else False Return True if currently playing a track, else False
#
# vlc.is_playing() returns True if track was faded out. vlc.is_playing() returns True if track was faded out.
# get_position seems more reliable. get_position seems more reliable.
# """ """
#
# with lock: with lock:
# if self.player: if self.player:
# if self.player.get_position() > 0 and self.player.is_playing(): if self.player.get_position() > 0 and self.player.is_playing():
# return True return True
#
# # We take a copy of the player when fading, so we could be # We take a copy of the player when fading, so we could be
# # playing in a fade nowFalse # playing in a fade nowFalse
# return self.fading > 0 return self.fading > 0
#
# def set_position(self, ms): def set_position(self, ms):
# """Set current play time in milliseconds from start""" """Set current play time in milliseconds from start"""
#
# with lock: with lock:
# return self.player.set_time(ms) return self.player.set_time(ms)
#
# def set_volume(self, volume): def set_volume(self, volume):
# """Set maximum volume used for player""" """Set maximum volume used for player"""
#
# with lock: with lock:
# if not self.player: if not self.player:
# return return
#
# self.max_volume = volume self.max_volume = volume
# self.player.audio_set_volume(volume) self.player.audio_set_volume(volume)
#
# def stop(self): def stop(self):
# """Immediately stop playing""" """Immediately stop playing"""
#
# log.debug(f"music.stop(), {self.player=}", True) DEBUG(f"music.stop(), {self.player=}", True)
#
# with lock: with lock:
# if not self.player: if not self.player:
# return return
#
# position = self.player.get_position() position = self.player.get_position()
# self.player.stop() self.player.stop()
# log.debug(f"music.stop(): Releasing player {self.player=}", True) DEBUG(f"music.stop(): Releasing player {self.player=}", True)
# self.player.release() self.player.release()
# # Ensure we don't reference player after release # Ensure we don't reference player after release
# self.player = None self.player = None
# return position return position

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,292 +1,292 @@
# #!/usr/bin/env python #!/usr/bin/env python
import argparse
import os
import shutil
import tempfile
import helpers
from config import Config
from helpers import (
fade_point,
get_audio_segment,
get_tags,
leading_silence,
trailing_silence,
)
from log import DEBUG, INFO
from models import Notes, Playdates, Session, Tracks
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
from pydub import effects
# Globals (I know)
messages = []
def main():
"""Main loop"""
DEBUG("Starting")
p = argparse.ArgumentParser()
# Only allow one option to be specified
group = p.add_mutually_exclusive_group()
group.add_argument('-u', '--update',
action="store_true", dest="update",
default=False, help="Update database")
group.add_argument('-f', '--full-update',
action="store_true", dest="full_update",
default=False, help="Update database")
args = p.parse_args()
# Run as required
if args.update:
DEBUG("Updating database")
with Session() as session:
update_db(session)
elif args.full_update:
DEBUG("Full update of database")
with Session() as session:
full_update_db(session)
else:
INFO("No action specified")
DEBUG("Finished")
def create_track_from_file(session, path, normalise=None, tags=None):
"""
Create track in database from passed path, or update database entry
if path already in database.
Return track.
"""
if not tags:
t = get_tags(path)
else:
t = tags
track = Tracks.get_or_create(session, path)
track.title = t['title']
track.artist = t['artist']
audio = get_audio_segment(path)
track.duration = len(audio)
track.start_gap = leading_silence(audio)
track.fade_at = round(fade_point(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.silence_at = round(trailing_silence(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.mtime = os.path.getmtime(path)
session.commit()
if normalise or normalise is None and Config.NORMALISE_ON_IMPORT:
# Check type
ftype = os.path.splitext(path)[1][1:]
if ftype not in ['mp3', 'flac']:
INFO(f"File type {ftype} not implemented")
return track
# Get current file gid, uid and permissions
stats = os.stat(path)
try:
# Copy original file
fd, temp_path = tempfile.mkstemp()
shutil.copyfile(path, temp_path)
except Exception as err:
DEBUG(f"songdb.create_track_from_file({path}): err1: {repr(err)}")
return
# Overwrite original file with normalised output
normalised = effects.normalize(audio)
try:
normalised.export(path, format=os.path.splitext(path)[1][1:])
# Fix up permssions and ownership
os.chown(path, stats.st_uid, stats.st_gid)
os.chmod(path, stats.st_mode)
# Copy tags
if ftype == 'flac':
tag_handler = FLAC
elif ftype == 'mp3':
tag_handler = MP3
else:
return track
src = tag_handler(temp_path)
dst = tag_handler(path)
for tag in src:
dst[tag] = src[tag]
dst.save()
except Exception as err:
DEBUG(f"songdb.create_track_from_file({path}): err2: {repr(err)}")
# Restore original file
shutil.copyfile(path, temp_path)
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
return track
def full_update_db(session):
"""Rescan all entries in database"""
def log(msg):
INFO(f"full_update_db(): {msg}")
def check_change(track_id, title, attribute, old, new):
if new > (old * 1.1) or new < (old * 0.9):
log(
"\n"
f"track[{track_id}] ({title}) "
f"{attribute} updated from {old} to {new}"
)
# Start with normal update to add new tracks and remove any missing
# files
log("update_db()")
update_db(session)
# Now update track length, silence and fade for every track in
# database
tracks = Tracks.get_all_tracks(session)
total_tracks = len(tracks)
log(f"Processing {total_tracks} tracks")
track_count = 0
for track in tracks:
track_count += 1
print(f"\rTrack {track_count} of {total_tracks}", end='')
# Sanity check
tag = get_music_info(track.path)
if not tag['title']:
log(f"track[{track.id}] {track.title=}: No tag title")
continue
if not tag['artist']:
log(f"track[{track.id}] {track.artist=}: No tag artist")
continue
# Update title and artist
if track.title != tag['title']:
track.title = tag['title']
if track.artist != tag['artist']:
track.artist = tag['artist']
# Update numbers; log if more than 10% different
duration = int(round(
tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000)
check_change(track.id, track.title, "duration", track.duration,
duration)
track.duration = duration
audio = get_audio_segment(track.path)
start_gap = leading_silence(audio)
check_change(track.id, track.title, "start_gap", track.start_gap,
start_gap)
track.start_gap = start_gap
fade_at = fade_point(audio)
check_change(track.id, track.title, "fade_at", track.fade_at,
fade_at)
track.fade_at = fade_at
silence_at = trailing_silence(audio)
check_change(track.id, track.title, "silence_at", track.silence_at,
silence_at)
track.silence_at = silence_at
session.commit()
def update_db(session):
"""
Repopulate database
"""
# Search for tracks that are in the music directory but not the datebase
# Check all paths in database exist
# If issues found, write to stdout but do not try to resolve them
db_paths = set(Tracks.get_all_paths(session))
os_paths_list = []
for root, dirs, files in os.walk(Config.ROOT):
for f in files:
path = os.path.join(root, f)
ext = os.path.splitext(f)[1]
if ext in [".flac", ".mp3"]:
os_paths_list.append(path)
os_paths = set(os_paths_list)
# Find any files in music directory that are not in database
files_not_in_db = list(os_paths - db_paths)
# Find paths in database missing in music directory
paths_not_found = []
missing_file_count = 0
more_files_to_report = False
for path in list(db_paths - os_paths):
if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT:
more_files_to_report = True
break
missing_file_count += 1
track = Tracks.get_by_path(session, path)
if not track:
ERROR(f"update_db: {path} not found in db")
continue
paths_not_found.append(track)
# Output messages (so if running via cron, these will get sent to
# user)
if files_not_in_db:
print("Files in music directory but not in database")
print("--------------------------------------------")
print("\n".join(files_not_in_db))
print("\n")
if paths_not_found:
print("Invalid paths in database")
print("-------------------------")
for t in paths_not_found:
print(f"""
Track ID: {t.id}
Path: {t.path}
Title: {t.title}
Artist: {t.artist}
""")
if more_files_to_report:
print("There were more paths than listed that were not found")
# Spike
# #
# import argparse # # Manage tracks listed in database but where path is invalid
# import os # DEBUG(f"Invalid {path=} in database", True)
# import shutil # track = Tracks.get_by_path(session, path)
# import tempfile # messages.append(f"Remove from database: {path=} {track=}")
# #
# import helpers # # Remove references from Playdates
# from config import Config # Playdates.remove_track(session, track.id)
# from helpers import (
# fade_point,
# get_audio_segment,
# get_tags,
# leading_silence,
# trailing_silence,
# )
# from log import log.debug, log.info
# from models import Notes, Playdates, Session, Tracks
# from mutagen.flac import FLAC
# from mutagen.mp3 import MP3
# from pydub import effects
# #
# # Globals (I know) # # Replace playlist entries with a note
# messages = [] # note_txt = (
# f"File removed: {track.title=}, {track.artist=}, "
# f"{track.path=}"
# )
# for playlist_track in track.playlists:
# row = playlist_track.row
# # Remove playlist entry
# DEBUG(f"Remove {row=} from {playlist_track.playlist_id}", True)
# playlist_track.playlist.remove_track(session, row)
# # Create note
# DEBUG(f"Add note at {row=} to {playlist_track.playlist_id=}", True)
# Notes(session, playlist_track.playlist_id, row, note_txt)
# #
# # # Remove Track entry pointing to invalid path
# def main(): # Tracks.remove_by_path(session, path)
# """Main loop"""
# if __name__ == '__main__' and '__file__' in globals():
# log.debug("Starting") main()
#
# p = argparse.ArgumentParser()
# # Only allow one option to be specified
# group = p.add_mutually_exclusive_group()
# group.add_argument('-u', '--update',
# action="store_true", dest="update",
# default=False, help="Update database")
# group.add_argument('-f', '--full-update',
# action="store_true", dest="full_update",
# default=False, help="Update database")
# args = p.parse_args()
#
# # Run as required
# if args.update:
# log.debug("Updating database")
# with Session() as session:
# update_db(session)
# elif args.full_update:
# log.debug("Full update of database")
# with Session() as session:
# full_update_db(session)
# else:
# log.info("No action specified")
#
# log.debug("Finished")
#
#
# def create_track_from_file(session, path, normalise=None, tags=None):
# """
# Create track in database from passed path, or update database entry
# if path already in database.
#
# Return track.
# """
#
# if not tags:
# t = get_tags(path)
# else:
# t = tags
#
# track = Tracks.get_or_create(session, path)
# track.title = t['title']
# track.artist = t['artist']
# audio = get_audio_segment(path)
# track.duration = len(audio)
# track.start_gap = leading_silence(audio)
# track.fade_at = round(fade_point(audio) / 1000,
# Config.MILLISECOND_SIGFIGS) * 1000
# track.silence_at = round(trailing_silence(audio) / 1000,
# Config.MILLISECOND_SIGFIGS) * 1000
# track.mtime = os.path.getmtime(path)
# session.commit()
#
# if normalise or normalise is None and Config.NORMALISE_ON_IMPORT:
# # Check type
# ftype = os.path.splitext(path)[1][1:]
# if ftype not in ['mp3', 'flac']:
# log.info(f"File type {ftype} not implemented")
# return track
#
# # Get current file gid, uid and permissions
# stats = os.stat(path)
# try:
# # Copy original file
# fd, temp_path = tempfile.mkstemp()
# shutil.copyfile(path, temp_path)
# except Exception as err:
# log.debug(f"songdb.create_track_from_file({path}): err1: {repr(err)}")
# return
#
# # Overwrite original file with normalised output
# normalised = effects.normalize(audio)
# try:
# normalised.export(path, format=os.path.splitext(path)[1][1:])
# # Fix up permssions and ownership
# os.chown(path, stats.st_uid, stats.st_gid)
# os.chmod(path, stats.st_mode)
# # Copy tags
# if ftype == 'flac':
# tag_handler = FLAC
# elif ftype == 'mp3':
# tag_handler = MP3
# else:
# return track
# src = tag_handler(temp_path)
# dst = tag_handler(path)
# for tag in src:
# dst[tag] = src[tag]
# dst.save()
# except Exception as err:
# log.debug(f"songdb.create_track_from_file({path}): err2: {repr(err)}")
# # Restore original file
# shutil.copyfile(path, temp_path)
# finally:
# if os.path.exists(temp_path):
# os.remove(temp_path)
#
# return track
#
#
# def full_update_db(session):
# """Rescan all entries in database"""
#
# def log(msg):
# log.info(f"full_update_db(): {msg}")
#
# def check_change(track_id, title, attribute, old, new):
# if new > (old * 1.1) or new < (old * 0.9):
# log(
# "\n"
# f"track[{track_id}] ({title}) "
# f"{attribute} updated from {old} to {new}"
# )
#
# # Start with normal update to add new tracks and remove any missing
# # files
# log("update_db()")
# update_db(session)
#
# # Now update track length, silence and fade for every track in
# # database
#
# tracks = Tracks.get_all_tracks(session)
# total_tracks = len(tracks)
# log(f"Processing {total_tracks} tracks")
# track_count = 0
# for track in tracks:
# track_count += 1
# print(f"\rTrack {track_count} of {total_tracks}", end='')
#
# # Sanity check
# tag = get_music_info(track.path)
# if not tag['title']:
# log(f"track[{track.id}] {track.title=}: No tag title")
# continue
# if not tag['artist']:
# log(f"track[{track.id}] {track.artist=}: No tag artist")
# continue
#
# # Update title and artist
# if track.title != tag['title']:
# track.title = tag['title']
# if track.artist != tag['artist']:
# track.artist = tag['artist']
#
# # Update numbers; log if more than 10% different
# duration = int(round(
# tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000)
# check_change(track.id, track.title, "duration", track.duration,
# duration)
# track.duration = duration
#
# audio = get_audio_segment(track.path)
#
# start_gap = leading_silence(audio)
# check_change(track.id, track.title, "start_gap", track.start_gap,
# start_gap)
# track.start_gap = start_gap
#
# fade_at = fade_point(audio)
# check_change(track.id, track.title, "fade_at", track.fade_at,
# fade_at)
# track.fade_at = fade_at
#
# silence_at = trailing_silence(audio)
# check_change(track.id, track.title, "silence_at", track.silence_at,
# silence_at)
# track.silence_at = silence_at
# session.commit()
#
#
# def update_db(session):
# """
# Repopulate database
# """
#
# # Search for tracks that are in the music directory but not the datebase
# # Check all paths in database exist
# # If issues found, write to stdout but do not try to resolve them
#
# db_paths = set(Tracks.get_all_paths(session))
#
# os_paths_list = []
# for root, dirs, files in os.walk(Config.ROOT):
# for f in files:
# path = os.path.join(root, f)
# ext = os.path.splitext(f)[1]
# if ext in [".flac", ".mp3"]:
# os_paths_list.append(path)
# os_paths = set(os_paths_list)
#
# # Find any files in music directory that are not in database
# files_not_in_db = list(os_paths - db_paths)
#
# # Find paths in database missing in music directory
# paths_not_found = []
# missing_file_count = 0
# more_files_to_report = False
# for path in list(db_paths - os_paths):
# if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT:
# more_files_to_report = True
# break
#
# missing_file_count += 1
#
# track = Tracks.get_by_path(session, path)
# if not track:
# log.error(f"update_db: {path} not found in db")
# continue
#
# paths_not_found.append(track)
#
# # Output messages (so if running via cron, these will get sent to
# # user)
# if files_not_in_db:
# print("Files in music directory but not in database")
# print("--------------------------------------------")
# print("\n".join(files_not_in_db))
# print("\n")
# if paths_not_found:
# print("Invalid paths in database")
# print("-------------------------")
# for t in paths_not_found:
# print(f"""
# Track ID: {t.id}
# Path: {t.path}
# Title: {t.title}
# Artist: {t.artist}
# """)
# if more_files_to_report:
# print("There were more paths than listed that were not found")
#
#
# # Spike
# #
# # # Manage tracks listed in database but where path is invalid
# # log.debug(f"Invalid {path=} in database", True)
# # track = Tracks.get_by_path(session, path)
# # messages.append(f"Remove from database: {path=} {track=}")
# #
# # # Remove references from Playdates
# # Playdates.remove_track(session, track.id)
# #
# # # Replace playlist entries with a note
# # note_txt = (
# # f"File removed: {track.title=}, {track.artist=}, "
# # f"{track.path=}"
# # )
# # for playlist_track in track.playlists:
# # row = playlist_track.row
# # # Remove playlist entry
# # log.debug(f"Remove {row=} from {playlist_track.playlist_id}", True)
# # playlist_track.playlist.remove_track(session, row)
# # # Create note
# # log.debug(f"Add note at {row=} to {playlist_track.playlist_id=}", True)
# # Notes(session, playlist_track.playlist_id, row, note_txt)
# #
# # # Remove Track entry pointing to invalid path
# # Tracks.remove_by_path(session, path)
#
# if __name__ == '__main__' and '__file__' in globals():
# main()