Rework Audacity import/export

This commit is contained in:
Keith Edmunds 2023-12-22 13:40:24 +00:00
parent 3922be2642
commit 37711f883f
5 changed files with 381 additions and 87 deletions

View File

@ -5,6 +5,7 @@ from typing import List, Optional
class Config(object):
AUDACITY_TIMEOUT_TENTHS = 100
AUDIO_SEGMENT_CHUNK_SIZE = 10
BITRATE_LOW_THRESHOLD = 192
BITRATE_OK_THRESHOLD = 300

View File

@ -22,75 +22,11 @@ from log import log
start_time_re = re.compile(r"@\d\d:\d\d")
class AudacityManager:
"""
Manage comms with Audacity
"""
def __init__(self) -> None:
"""
Open passed file in Audacity
Return True if apparently opened successfully, else False
"""
self.to_pipe: str = "/tmp/audacity_script_pipe.to." + str(os.getuid())
self.from_pipe: str = "/tmp/audacity_script_pipe.from." + str(os.getuid())
self.eol: str = "\n"
self.path = ""
def open_file(self, path: str) -> str:
"""Open passed file in Audacity"""
self.path = path
return self._do_command(f'Import2: Filename="{self.path}"')
def export_file(self) -> str:
"""Export current file"""
if not self.path:
return "Error: no path selected"
sa_response = self._do_command("SelectAll:")
if sa_response == "\nBatchCommand finished: OK\n":
exp_response = self._do_command(
f'Export2: Filename="{self.path}" NumChannels=2'
)
return exp_response
else:
return "SelectAll response: " + sa_response
def _send_command(self, command: str) -> None:
"""Send a single command."""
self.to_audacity.write(command + self.eol)
self.to_audacity.flush()
def _get_response(self) -> str:
"""Return the command response."""
result: str = ""
line: str = ""
while True:
result += line
line = self.from_audacity.readline()
if line == "\n" and len(result) > 0:
break
return result
def _do_command(self, command: str) -> str:
"""Send one command, and return the response."""
with open(self.to_pipe, "w") as self.to_audacity, open(
self.from_pipe, "rt"
) as self.from_audacity:
self._send_command(command)
response = self._get_response()
return response
def ask_yes_no(
title: str, question: str, default_yes: bool = False, parent: Optional[QMainWindow] = None
title: str,
question: str,
default_yes: bool = False,
parent: Optional[QMainWindow] = None,
) -> bool:
"""Ask question; return True for yes, False for no"""

View File

@ -15,6 +15,7 @@ import subprocess
import sys
import threading
import pipeclient
from pygame import mixer
from PyQt6.QtCore import (
pyqtSignal,
@ -148,7 +149,10 @@ class ImportTrack(QObject):
import_finished = pyqtSignal()
def __init__(
self, filenames: List[str], source_model: PlaylistModel, row_number: Optional[int]
self,
filenames: List[str],
source_model: PlaylistModel,
row_number: Optional[int],
) -> None:
super().__init__()
self.filenames = filenames
@ -170,7 +174,9 @@ class ImportTrack(QObject):
try:
track = Tracks(session, **metadata)
except Exception as e:
self.signals.show_warning_signal.emit("Error importing track", str(e))
self.signals.show_warning_signal.emit(
"Error importing track", str(e)
)
return
helpers.normalise_track(track.path)
# We're importing potentially multiple tracks in a loop.
@ -220,6 +226,9 @@ class Window(QMainWindow, Ui_MainWindow):
self.move_source_rows: Optional[List[int]] = None
self.move_source_model: Optional[PlaylistProxyModel] = None
self.audacity_file_path: Optional[str] = None
# Initialise Audacity access
self.audacity_client = pipeclient.PipeClient()
log.info(f"{hex(id(self.audacity_client))=}")
if Config.CARTS_HIDE:
self.cartsWidget.hide()
@ -1352,9 +1361,13 @@ class Window(QMainWindow, Ui_MainWindow):
self.tabPlaylist.setCurrentIndex(idx)
break
display_row = self.active_proxy_model().mapFromSource(
self.active_proxy_model().source_model.index(plt.plr_rownum, 0)
).row()
display_row = (
self.active_proxy_model()
.mapFromSource(
self.active_proxy_model().source_model.index(plt.plr_rownum, 0)
)
.row()
)
self.tabPlaylist.currentWidget().scroll_to_top(display_row)
def solicit_playlist_name(

293
app/pipeclient.py Executable file
View File

@ -0,0 +1,293 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from log import log
"""Automate Audacity via mod-script-pipe.
Pipe Client may be used as a command-line script to send commands to
Audacity via the mod-script-pipe interface, or loaded as a module.
Requires Python 3.
(Python 2.7 is now obsolete, so no longer supported)
======================
Command Line Interface
======================
usage: pipeclient.py [-h] [-t] [-s ] [-d]
Arguments
---------
-h,--help: optional
show short help and exit
-t, --timeout: float, optional
timeout for reply in seconds (default: 10)
-s, --show-time: bool, optional
show command execution time (default: True)
-d, --docs: optional
show this documentation and exit
Example
-------
$ python3 pipeclient.py -t 20 -s False
Launches command line interface with 20 second time-out for
returned message, and don't show the execution time.
When prompted, enter the command to send (not quoted), or 'Q' to quit.
$ Enter command or 'Q' to quit: GetInfo: Type=Tracks Format=LISP
============
Module Usage
============
Note that on a critical error (such as broken pipe), the module just exits.
If a more graceful shutdown is required, replace the sys.exit()'s with
exceptions.
Example
-------
# Import the module:
>>> import pipeclient
# Create a client instance:
>>> client = pipeclient.PipeClient()
# Send a command:
>>> client.write("Command", timer=True)
# Read the last reply:
>>> print(client.read())
See Also
--------
PipeClient.write : Write a command to _write_pipe.
PipeClient.read : Read Audacity's reply from pipe.
Copyright Steve Daulton 2018
Released under terms of the GNU General Public License version 2:
<http://www.gnu.org/licenses/old-licenses/gpl-2.0.html />
"""
import os
import sys
import threading
import time
import errno
import argparse
if sys.version_info[0] < 3:
sys.exit('PipeClient Error: Python 3.x required')
# Platform specific constants
if sys.platform == 'win32':
WRITE_NAME: str = '\\\\.\\pipe\\ToSrvPipe'
READ_NAME: str = '\\\\.\\pipe\\FromSrvPipe'
EOL: str = '\r\n\0'
else:
# Linux or Mac
PIPE_BASE: str = '/tmp/audacity_script_pipe.'
WRITE_NAME: str = PIPE_BASE + 'to.' + str(os.getuid())
READ_NAME: str = PIPE_BASE + 'from.' + str(os.getuid())
EOL: str = '\n'
class PipeClient():
"""Write / read client access to Audacity via named pipes.
Normally there should be just one instance of this class. If
more instances are created, they all share the same state.
__init__ calls _write_thread_start() and _read_thread_start() on
first instantiation.
Parameters
----------
None
Attributes
----------
reader_pipe_broken : event object
Set if pipe reader fails. Audacity may have crashed
reply_ready : event object
flag cleared when command sent and set when response received
timer : bool
When true, time the command execution (default False)
reply : string
message received when Audacity completes the command
See Also
--------
write : Write a command to _write_pipe.
read : Read Audacity's reply from pipe.
"""
reader_pipe_broken = threading.Event()
reply_ready = threading.Event()
_shared_state: dict = {}
def __new__(cls, *p, **k):
self = object.__new__(cls, *p, **k)
self.__dict__ = cls._shared_state
return self
def __init__(self):
self.timer: bool = False
self._start_time: float = 0
self._write_pipe = None
self.reply: str = ''
if not self._write_pipe:
self._write_thread_start()
self._read_thread_start()
def _write_thread_start(self) -> None:
"""Start _write_pipe thread"""
# Pipe is opened in a new thread so that we don't
# freeze if Audacity is not running.
write_thread = threading.Thread(target=self._write_pipe_open)
write_thread.daemon = True
write_thread.start()
# Allow a little time for connection to be made.
time.sleep(0.1)
if not self._write_pipe:
sys.exit('PipeClientError: Write pipe cannot be opened.')
def _write_pipe_open(self) -> None:
"""Open _write_pipe."""
self._write_pipe = open(WRITE_NAME, 'w', encoding='ascii')
def _read_thread_start(self) -> None:
"""Start read_pipe thread."""
read_thread = threading.Thread(target=self._reader)
read_thread.daemon = True
read_thread.start()
def write(self, command, timer=False) -> None:
"""Write a command to _write_pipe.
Parameters
----------
command : string
The command to send to Audacity
timer : bool, optional
If true, time the execution of the command
Example
-------
write("GetInfo: Type=Labels", timer=True):
"""
self.timer = timer
self._write_pipe.write(command + EOL)
# Check that read pipe is alive
if PipeClient.reader_pipe_broken.is_set():
sys.exit('PipeClient: Read-pipe error.')
try:
self._write_pipe.flush()
if self.timer:
self._start_time = time.time()
self.reply = ''
PipeClient.reply_ready.clear()
except IOError as err:
if err.errno == errno.EPIPE:
sys.exit('PipeClient: Write-pipe error.')
else:
raise
def _reader(self) -> None:
"""Read FIFO in worker thread."""
# Thread will wait at this read until it connects.
# Connection should occur as soon as _write_pipe has connected.
with open(READ_NAME, 'r', encoding='ascii') as read_pipe:
message = ''
pipe_ok = True
while pipe_ok:
line = read_pipe.readline()
# Stop timer as soon as we get first line of response.
stop_time = time.time()
while pipe_ok and line != '\n':
message += line
line = read_pipe.readline()
if line == '':
# No data in read_pipe indicates that the pipe
# is broken (Audacity may have crashed).
PipeClient.reader_pipe_broken.set()
pipe_ok = False
if self.timer:
xtime = (stop_time - self._start_time) * 1000
message += f'Execution time: {xtime:.2f}ms'
self.reply = message
PipeClient.reply_ready.set()
message = ''
def read(self) -> str:
"""Read Audacity's reply from pipe.
Returns
-------
string
The reply from the last command sent to Audacity, or null string
if reply not received. Null string usually indicates that Audacity
is still processing the last command.
"""
if not PipeClient.reply_ready.is_set():
return ''
return self.reply
def bool_from_string(strval) -> bool:
"""Return boolean value from string"""
if strval.lower() in ('true', 't', '1', 'yes', 'y'):
return True
if strval.lower() in ('false', 'f', '0', 'no', 'n'):
return False
raise argparse.ArgumentTypeError('Boolean value expected.')
def main() -> None:
"""Interactive command-line for PipeClient"""
parser = argparse.ArgumentParser()
parser.add_argument('-t', '--timeout', type=float, metavar='', default=10,
help="timeout for reply in seconds (default: 10")
parser.add_argument('-s', '--show-time', metavar='True/False',
nargs='?', type=bool_from_string,
const='t', default='t', dest='show',
help='show command execution time (default: True)')
parser.add_argument('-d', '--docs', action='store_true',
help='show documentation and exit')
args = parser.parse_args()
if args.docs:
print(__doc__)
sys.exit(0)
client: PipeClient = PipeClient()
while True:
reply: str = ''
message: str = input("\nEnter command or 'Q' to quit: ")
start = time.time()
if message.upper() == 'Q':
sys.exit(0)
elif message == '':
pass
else:
client.write(message, timer=args.show)
while reply == '':
time.sleep(0.1) # allow time for reply
if time.time() - start > args.timeout:
reply = 'PipeClient: Reply timed-out.'
else:
reply = client.read()
print(reply)
if __name__ == '__main__':
main()

View File

@ -1,4 +1,5 @@
import psutil
import time
from pprint import pprint
from typing import Callable, cast, List, Optional, TYPE_CHECKING
@ -35,7 +36,6 @@ from classes import MusicMusterSignals, track_sequence
from config import Config
from helpers import (
ask_yes_no,
AudacityManager,
ms_to_mmss,
show_OK,
show_warning,
@ -343,6 +343,46 @@ class PlaylistTab(QTableView):
)
dlg.exec()
def _audactity_command(self, cmd: str) -> bool:
"""
Send cmd to Audacity and monitor for response. Return True if successful
else False.
"""
log.info(f"_audacity({cmd=})")
# Notify user if audacity not running
if "audacity" not in [i.name() for i in psutil.process_iter()]:
log.warning("Audactity not running")
show_warning(self.musicmuster, "Audacity", "Audacity is not running")
return False
self.musicmuster.audacity_client.write(cmd, timer=True)
reply = ""
count = 0
while reply == "" and count < Config.AUDACITY_TIMEOUT_TENTHS:
time.sleep(0.1)
reply = self.musicmuster.audacity_client.read()
count += 1
log.debug(f"_audactity_command: {count=}, {reply=}")
status = False
timing = ""
msgs = reply.split("\n")
for msg in msgs:
if msg == "BatchCommand finished: OK":
status = True
elif msg.startswith("Execution time:"):
timing = msg
if not status:
log.error(f"_audactity_command {msgs=}")
return False
if timing:
log.info(f"_audactity_command {timing=}")
return True
def _build_context_menu(self, item: QTableWidgetItem) -> None:
"""Used to process context (right-click) menu, which is defined here"""
@ -574,16 +614,29 @@ class PlaylistTab(QTableView):
Import current Audacity track to passed row
"""
# Notify user if audacity not running
if "audacity" not in [i.name() for i in psutil.process_iter()]:
show_warning(self.musicmuster, "Audacity", "Audacity is not running")
path = self.source_model.get_row_track_path(row_number)
if not path:
log.error(f"_import_from_audacity: can't get path for {row_number=}")
return
audacity = self.audacity
if not audacity:
select_cmd = "SelectAll:"
status = self._audactity_command(select_cmd)
if not status:
log.error(f"_import_from_audacity select {status=}")
show_warning(
self.musicmuster, "Audacity", "Error selecting track in Audacity"
)
return
export_cmd = f'Export2: Filename="{path}" NumChannels=2'
status = self._audactity_command(export_cmd)
if not status:
log.error(f"_import_from_audacity export {status=}")
show_warning(
self.musicmuster, "Audacity", "Error exporting track from Audacity"
)
return
audacity.export_file()
self.musicmuster.audacity_file_path = None
self._rescan(row_number)
@ -617,18 +670,16 @@ class PlaylistTab(QTableView):
Open track in passed row in Audacity
"""
# Notify user if audacity not running
if "audacity" not in [i.name() for i in psutil.process_iter()]:
show_warning(self.musicmuster, "Audacity", "Audacity is not running")
return
path = self.source_model.get_row_track_path(row_number)
if not path:
log.error(f"_open_in_audacity: can't get path for {row_number=}")
return
self.musicmuster.audacity_file_path = path
self.audacity = AudacityManager()
self.audacity.open_file(path)
cmd = f'Import2: Filename="{path}"'
status = self._audactity_command(cmd)
log.info(f"_open_in_audacity {path=}, {status=}")
def _rescan(self, row_number: int) -> None:
"""Rescan track"""