156 lines
5.3 KiB
Python
156 lines
5.3 KiB
Python
# Standard library imports
|
|
import os
|
|
import psutil
|
|
import socket
|
|
import select
|
|
from typing import Optional
|
|
|
|
# PyQt imports
|
|
|
|
# Third party imports
|
|
|
|
# App imports
|
|
from classes import ApplicationError
|
|
from config import Config
|
|
from log import log
|
|
|
|
|
|
class AudacityController:
|
|
def __init__(
|
|
self,
|
|
method: str = "pipe",
|
|
socket_host: str = "localhost",
|
|
socket_port: int = 12345,
|
|
timeout: int = Config.AUDACITY_TIMEOUT_SECONDS,
|
|
) -> None:
|
|
"""
|
|
Initialize the AudacityController.
|
|
:param method: Communication method ('pipe' or 'socket').
|
|
:param socket_host: Host for socket connection (if using sockets).
|
|
:param socket_port: Port for socket connection (if using sockets).
|
|
:param timeout: Timeout in seconds for pipe operations.
|
|
"""
|
|
|
|
self.method = method
|
|
self.path: Optional[str] = None
|
|
self.timeout = timeout
|
|
if method == "pipe":
|
|
user_uid = os.getuid() # Get the user's UID
|
|
self.pipe_to = f"/tmp/audacity_script_pipe.to.{user_uid}"
|
|
self.pipe_from = f"/tmp/audacity_script_pipe.from.{user_uid}"
|
|
elif method == "socket":
|
|
self.socket_host = socket_host
|
|
self.socket_port = socket_port
|
|
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
try:
|
|
self.sock.connect((self.socket_host, self.socket_port))
|
|
self.sock.settimeout(self.timeout)
|
|
except socket.error as e:
|
|
raise ApplicationError(f"Failed to connect to Audacity socket: {e}")
|
|
else:
|
|
raise ApplicationError("Invalid method. Use 'pipe' or 'socket'.")
|
|
|
|
self._sanity_check()
|
|
|
|
def close(self):
|
|
"""
|
|
Close the connection (for sockets).
|
|
"""
|
|
if self.method == "socket":
|
|
self.sock.close()
|
|
|
|
def export(self) -> None:
|
|
"""
|
|
Export file from Audacity
|
|
"""
|
|
|
|
self._sanity_check()
|
|
|
|
select_status = self._send_command("SelectAll")
|
|
log.debug(f"{select_status=}")
|
|
|
|
# Escape any double quotes in filename
|
|
export_cmd = f'Export2: Filename="{self.path.replace('"', '\\"')}" NumChannels=2'
|
|
export_status = self._send_command(export_cmd)
|
|
log.debug(f"{export_status=}")
|
|
self.path = ""
|
|
if not export_status.startswith("Exported"):
|
|
raise ApplicationError(f"Error writing from Audacity: {export_status=}")
|
|
|
|
def open(self, path: str) -> None:
|
|
"""
|
|
Open path in Audacity. Escape filename.
|
|
"""
|
|
|
|
self._sanity_check()
|
|
|
|
escaped_path = path.replace('"', '\\"')
|
|
cmd = f'Import2: Filename="{escaped_path}"'
|
|
status = self._send_command(cmd)
|
|
self.path = path
|
|
|
|
log.debug(f"_open_in_audacity {path=}, {status=}")
|
|
|
|
def _sanity_check(self) -> None:
|
|
"""
|
|
Check Audactity running and basic connectivity.
|
|
"""
|
|
|
|
# Check Audacity is running
|
|
if "audacity" not in [i.name() for i in psutil.process_iter()]:
|
|
log.warning("Audactity not running")
|
|
raise ApplicationError("Audacity is not running")
|
|
|
|
# Check pipes exist
|
|
if self.method == "pipe":
|
|
if not (os.path.exists(self.pipe_to) and os.path.exists(self.pipe_from)):
|
|
raise ApplicationError(
|
|
"AudacityController: Audacity pipes not found. Ensure scripting is enabled "
|
|
f"and pipes exist at {self.pipe_to} and {self.pipe_from}."
|
|
)
|
|
|
|
def _test_connectivity(self) -> None:
|
|
"""
|
|
Send test command to Audacity
|
|
"""
|
|
|
|
response = self._send_command(Config.AUDACITY_TEST_COMMAND)
|
|
if response != Config.AUDACITY_TEST_RESPONSE:
|
|
raise ApplicationError(
|
|
"Error testing Audacity connectivity\n"
|
|
f"Sent: {Config.AUDACITY_TEST_COMMAND}"
|
|
f"Received: {response}"
|
|
)
|
|
|
|
def _send_command(self, command: str) -> str:
|
|
"""
|
|
Send a command to Audacity.
|
|
:param command: Command to send (e.g., 'SelectAll').
|
|
:return: Response from Audacity.
|
|
"""
|
|
log.debug(f"_send_command({command=})")
|
|
|
|
if self.method == "pipe":
|
|
try:
|
|
with open(self.pipe_to, "w") as to_pipe:
|
|
to_pipe.write(command + "\n")
|
|
with open(self.pipe_from, "r") as from_pipe:
|
|
ready, _, _ = select.select([from_pipe], [], [], self.timeout)
|
|
if ready:
|
|
response = from_pipe.readline()
|
|
else:
|
|
raise TimeoutError(
|
|
f"Timeout waiting for response from {self.pipe_from}"
|
|
)
|
|
except Exception as e:
|
|
raise RuntimeError(f"Error communicating with Audacity via pipes: {e}")
|
|
elif self.method == "socket":
|
|
try:
|
|
self.sock.sendall((command + "\n").encode("utf-8"))
|
|
response = self.sock.recv(1024).decode("utf-8")
|
|
except socket.timeout:
|
|
raise TimeoutError("Timeout waiting for response from Audacity socket.")
|
|
except Exception as e:
|
|
raise RuntimeError(f"Error communicating with Audacity via socket: {e}")
|
|
return response.strip()
|