# Standard library imports
import os
import select
import socket

# PyQt imports

# Third party imports
import psutil

# App imports
from classes import ApplicationError
from config import Config
from log import log


class AudacityController:
    """
    Send scripting commands to a running Audacity process.

    Commands are delivered either through a pair of named pipes under /tmp
    (method 'pipe') or over a TCP socket (method 'socket'). The path most
    recently passed to open() is remembered in self.path and used as the
    target filename by export().
    """

    def __init__(
        self,
        method: str = "pipe",
        socket_host: str = "localhost",
        socket_port: int = 12345,
        timeout: int = Config.AUDACITY_TIMEOUT_SECONDS,
    ) -> None:
        """
        Initialize the AudacityController.

        :param method: Communication method ('pipe' or 'socket').
        :param socket_host: Host for socket connection (if using sockets).
        :param socket_port: Port for socket connection (if using sockets).
        :param timeout: Timeout in seconds for pipe and socket operations.
        :raises ApplicationError: If the method is invalid, the socket
            connection fails, or the sanity check fails.
        """
        self.method = method
        self.path: str = ""
        self.timeout = timeout

        if method == "pipe":
            # Pipe names are suffixed with the current user's UID
            user_uid = os.getuid()
            self.pipe_to = f"/tmp/audacity_script_pipe.to.{user_uid}"
            self.pipe_from = f"/tmp/audacity_script_pipe.from.{user_uid}"
        elif method == "socket":
            self.socket_host = socket_host
            self.socket_port = socket_port
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                # Set the timeout first so that connect() is bounded too
                self.sock.settimeout(self.timeout)
                self.sock.connect((self.socket_host, self.socket_port))
            except socket.error as e:
                # Don't leak the descriptor when the connection fails
                self.sock.close()
                raise ApplicationError(
                    f"Failed to connect to Audacity socket: {e}"
                ) from e
        else:
            raise ApplicationError("Invalid method. Use 'pipe' or 'socket'.")

        try:
            self._sanity_check()
        except Exception:
            # Release the socket (no-op for pipes) before propagating
            self.close()
            raise

    def close(self) -> None:
        """
        Close the connection (for sockets). Does nothing for pipes.
        """
        if self.method == "socket":
            self.sock.close()

    def export(self) -> None:
        """
        Export file from Audacity to the path last given to open().

        Selects everything, then issues Export2 with two channels.
        self.path is cleared once the export command has been sent,
        whether or not it succeeded.

        :raises ApplicationError: If the sanity check fails or the export
            response does not start with 'Exported'.
        """
        self._sanity_check()

        select_status = self._send_command("SelectAll")
        log.debug(f"{select_status=}")

        # Escape embedded double quotes, as open() does for Import2
        escaped_path = self.path.replace('"', '\\"')
        export_cmd = f'Export2: Filename="{escaped_path}" NumChannels=2'
        export_status = self._send_command(export_cmd)
        log.debug(f"{export_status=}")

        self.path = ""
        if not export_status.startswith("Exported"):
            raise ApplicationError(f"Error writing from Audacity: {export_status=}")

    def open(self, path: str) -> None:
        """
        Open path in Audacity. Escape filename.

        :param path: File to import; remembered in self.path for export().
        :raises ApplicationError: If the sanity check fails.
        """
        self._sanity_check()

        escaped_path = path.replace('"', '\\"')
        cmd = f'Import2: Filename="{escaped_path}"'
        status = self._send_command(cmd)
        self.path = path
        log.debug(f"_open_in_audacity {path=}, {status=}")

    def _sanity_check(self) -> None:
        """
        Check Audacity running and basic connectivity.

        :raises ApplicationError: If no process named 'audacity' is found,
            the pipes are missing (pipe method), or the test command does
            not return the expected response.
        """
        # Check Audacity is running. Pre-fetching "name" via attrs avoids
        # exceptions from processes that vanish or deny access mid-scan.
        audacity_running = any(
            proc.info["name"] == "audacity"
            for proc in psutil.process_iter(attrs=["name"])
        )
        if not audacity_running:
            log.warning("Audactity not running")
            raise ApplicationError("Audacity is not running")

        # Check pipes exist
        if self.method == "pipe":
            if not (os.path.exists(self.pipe_to) and os.path.exists(self.pipe_from)):
                raise ApplicationError(
                    "AudacityController: Audacity pipes not found. Ensure scripting is enabled "
                    f"and pipes exist at {self.pipe_to} and {self.pipe_from}."
                )

        # Send test command to Audacity
        response = self._send_command(Config.AUDACITY_TEST_COMMAND)
        if response != Config.AUDACITY_TEST_RESPONSE:
            raise ApplicationError(
                "Error testing Audacity connectivity\n"
                f"Sent: {Config.AUDACITY_TEST_COMMAND}\n"
                f"Received: {response}"
            )

    def _send_command(self, command: str) -> str:
        """
        Send a command to Audacity.

        For pipes, one line is read back from the response pipe; for
        sockets, a single recv of up to 1024 bytes is decoded as UTF-8.

        :param command: Command to send (e.g., 'SelectAll').
        :return: Response from Audacity, stripped of surrounding whitespace.
        :raises RuntimeError: On any pipe failure (including a pipe
            timeout) or any non-timeout socket failure.
        :raises TimeoutError: If the socket times out.
        :raises ApplicationError: If self.method is not 'pipe' or 'socket'.
        """
        log.debug(f"_send_command({command=})")

        if self.method == "pipe":
            try:
                with open(self.pipe_to, "w") as to_pipe:
                    to_pipe.write(command + "\n")
                with open(self.pipe_from, "r") as from_pipe:
                    ready, _, _ = select.select([from_pipe], [], [], self.timeout)
                    if ready:
                        response = from_pipe.readline()
                    else:
                        # Wrapped as RuntimeError by the handler below
                        raise TimeoutError(
                            f"Timeout waiting for response from {self.pipe_from}"
                        )
            except Exception as e:
                raise RuntimeError(
                    f"Error communicating with Audacity via pipes: {e}"
                ) from e
        elif self.method == "socket":
            try:
                self.sock.sendall((command + "\n").encode("utf-8"))
                response = self.sock.recv(1024).decode("utf-8")
            except socket.timeout as e:
                raise TimeoutError(
                    "Timeout waiting for response from Audacity socket."
                ) from e
            except Exception as e:
                raise RuntimeError(
                    f"Error communicating with Audacity via socket: {e}"
                ) from e
        else:
            # self.method was changed after construction; fail explicitly
            # instead of hitting an unbound 'response' below
            raise ApplicationError("Invalid method. Use 'pipe' or 'socket'.")

        return response.strip()