Compare commits
7 Commits
b86b7f7f33
...
60c085ad12
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
60c085ad12 | ||
|
|
bf438c3d99 | ||
|
|
33fdc40f66 | ||
|
|
0082f76b56 | ||
|
|
83a817234d | ||
|
|
540846223b | ||
|
|
6985170378 |
125
app/helpers.py
125
app/helpers.py
@ -3,7 +3,6 @@ from email.message import EmailMessage
|
|||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional
|
||||||
import functools
|
import functools
|
||||||
import os
|
import os
|
||||||
import psutil
|
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import smtplib
|
import smtplib
|
||||||
@ -22,14 +21,80 @@ from log import log
|
|||||||
|
|
||||||
start_time_re = re.compile(r"@\d\d:\d\d")
|
start_time_re = re.compile(r"@\d\d:\d\d")
|
||||||
|
|
||||||
# Classes are defined after global functions so that classes can use
|
|
||||||
# those functions.
|
class AudacityManager:
|
||||||
|
"""
|
||||||
|
Manage comms with Audacity
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
"""
|
||||||
|
Open passed file in Audacity
|
||||||
|
|
||||||
|
Return True if apparently opened successfully, else False
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.to_pipe: str = "/tmp/audacity_script_pipe.to." + str(os.getuid())
|
||||||
|
self.from_pipe: str = "/tmp/audacity_script_pipe.from." + str(os.getuid())
|
||||||
|
self.eol: str = "\n"
|
||||||
|
self.path = ""
|
||||||
|
|
||||||
|
def open_file(self, path: str) -> str:
|
||||||
|
"""Open passed file in Audacity"""
|
||||||
|
|
||||||
|
self.path = path
|
||||||
|
return self._do_command(f'Import2: Filename="{self.path}"')
|
||||||
|
|
||||||
|
def export_file(self) -> str:
|
||||||
|
"""Export current file"""
|
||||||
|
|
||||||
|
if not self.path:
|
||||||
|
return "Error: no path selected"
|
||||||
|
|
||||||
|
sa_response = self._do_command("SelectAll:")
|
||||||
|
if sa_response == "\nBatchCommand finished: OK\n":
|
||||||
|
exp_response = self._do_command(
|
||||||
|
f'Export2: Filename="{self.path}" NumChannels=2'
|
||||||
|
)
|
||||||
|
return exp_response
|
||||||
|
else:
|
||||||
|
return "SelectAll response: " + sa_response
|
||||||
|
|
||||||
|
def _send_command(self, command: str) -> None:
|
||||||
|
"""Send a single command."""
|
||||||
|
self.to_audacity.write(command + self.eol)
|
||||||
|
self.to_audacity.flush()
|
||||||
|
|
||||||
|
def _get_response(self) -> str:
|
||||||
|
"""Return the command response."""
|
||||||
|
|
||||||
|
result: str = ""
|
||||||
|
line: str = ""
|
||||||
|
|
||||||
|
while True:
|
||||||
|
result += line
|
||||||
|
line = self.from_audacity.readline()
|
||||||
|
if line == "\n" and len(result) > 0:
|
||||||
|
break
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _do_command(self, command: str) -> str:
|
||||||
|
"""Send one command, and return the response."""
|
||||||
|
|
||||||
|
with open(self.to_pipe, "w") as self.to_audacity, open(
|
||||||
|
self.from_pipe, "rt"
|
||||||
|
) as self.from_audacity:
|
||||||
|
self._send_command(command)
|
||||||
|
response = self._get_response()
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
def ask_yes_no(title: str, question: str, default_yes: bool = False) -> bool:
|
def ask_yes_no(
|
||||||
|
title: str, question: str, default_yes: bool = False, parent: Optional[QMainWindow] = None
|
||||||
|
) -> bool:
|
||||||
"""Ask question; return True for yes, False for no"""
|
"""Ask question; return True for yes, False for no"""
|
||||||
|
|
||||||
dlg = QMessageBox()
|
dlg = QMessageBox(parent)
|
||||||
dlg.setWindowTitle(title)
|
dlg.setWindowTitle(title)
|
||||||
dlg.setText(question)
|
dlg.setText(question)
|
||||||
dlg.setStandardButtons(
|
dlg.setStandardButtons(
|
||||||
@ -314,56 +379,6 @@ def normalise_track(path):
|
|||||||
os.remove(temp_path)
|
os.remove(temp_path)
|
||||||
|
|
||||||
|
|
||||||
def open_in_audacity(path: str) -> bool:
|
|
||||||
"""
|
|
||||||
Open passed file in Audacity
|
|
||||||
|
|
||||||
Return True if apparently opened successfully, else False
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Return if audacity not running
|
|
||||||
if "audacity" not in [i.name() for i in psutil.process_iter()]:
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Return if path not given
|
|
||||||
if not path:
|
|
||||||
return False
|
|
||||||
|
|
||||||
to_pipe: str = "/tmp/audacity_script_pipe.to." + str(os.getuid())
|
|
||||||
from_pipe: str = "/tmp/audacity_script_pipe.from." + str(os.getuid())
|
|
||||||
eol: str = "\n"
|
|
||||||
|
|
||||||
def send_command(command: str) -> None:
|
|
||||||
"""Send a single command."""
|
|
||||||
to_audacity.write(command + eol)
|
|
||||||
to_audacity.flush()
|
|
||||||
|
|
||||||
def get_response() -> str:
|
|
||||||
"""Return the command response."""
|
|
||||||
|
|
||||||
result: str = ""
|
|
||||||
line: str = ""
|
|
||||||
|
|
||||||
while True:
|
|
||||||
result += line
|
|
||||||
line = from_audacity.readline()
|
|
||||||
if line == "\n" and len(result) > 0:
|
|
||||||
break
|
|
||||||
return result
|
|
||||||
|
|
||||||
def do_command(command: str) -> str:
|
|
||||||
"""Send one command, and return the response."""
|
|
||||||
|
|
||||||
send_command(command)
|
|
||||||
response = get_response()
|
|
||||||
return response
|
|
||||||
|
|
||||||
with open(to_pipe, "w") as to_audacity, open(from_pipe, "rt") as from_audacity:
|
|
||||||
do_command(f'Import2: Filename="{path}"')
|
|
||||||
|
|
||||||
return True
|
|
||||||
|
|
||||||
|
|
||||||
def send_mail(to_addr, from_addr, subj, body):
|
def send_mail(to_addr, from_addr, subj, body):
|
||||||
# From https://docs.python.org/3/library/email.examples.html
|
# From https://docs.python.org/3/library/email.examples.html
|
||||||
|
|
||||||
|
|||||||
@ -29,7 +29,6 @@ from helpers import (
|
|||||||
file_is_unreadable,
|
file_is_unreadable,
|
||||||
get_embedded_time,
|
get_embedded_time,
|
||||||
get_relative_date,
|
get_relative_date,
|
||||||
open_in_audacity,
|
|
||||||
ms_to_mmss,
|
ms_to_mmss,
|
||||||
set_track_metadata,
|
set_track_metadata,
|
||||||
)
|
)
|
||||||
@ -984,15 +983,6 @@ class PlaylistModel(QAbstractTableModel):
|
|||||||
log.error(f"OBS SDK error ({e})")
|
log.error(f"OBS SDK error ({e})")
|
||||||
return
|
return
|
||||||
|
|
||||||
def open_in_audacity(self, row_number: int) -> None:
|
|
||||||
"""
|
|
||||||
Open track at passed row number in Audacity
|
|
||||||
"""
|
|
||||||
|
|
||||||
path = self.playlist_rows[row_number].path
|
|
||||||
if path:
|
|
||||||
open_in_audacity(path)
|
|
||||||
|
|
||||||
def previous_track_ended(self) -> None:
|
def previous_track_ended(self) -> None:
|
||||||
"""
|
"""
|
||||||
Notification from musicmuster that the previous track has ended.
|
Notification from musicmuster that the previous track has ended.
|
||||||
@ -1527,9 +1517,6 @@ class PlaylistProxyModel(QSortFilterProxyModel):
|
|||||||
header_row_number, existing_prd, note
|
header_row_number, existing_prd, note
|
||||||
)
|
)
|
||||||
|
|
||||||
def open_in_audacity(self, row_number: int) -> None:
|
|
||||||
return self.data_model.open_in_audacity(row_number)
|
|
||||||
|
|
||||||
def previous_track_ended(self) -> None:
|
def previous_track_ended(self) -> None:
|
||||||
return self.data_model.previous_track_ended()
|
return self.data_model.previous_track_ended()
|
||||||
|
|
||||||
|
|||||||
@ -1,5 +1,6 @@
|
|||||||
|
import psutil
|
||||||
from pprint import pprint
|
from pprint import pprint
|
||||||
from typing import Callable, cast, List, Optional, overload, TYPE_CHECKING
|
from typing import Callable, cast, List, Optional, TYPE_CHECKING
|
||||||
|
|
||||||
from PyQt6.QtCore import (
|
from PyQt6.QtCore import (
|
||||||
QEvent,
|
QEvent,
|
||||||
@ -7,6 +8,7 @@ from PyQt6.QtCore import (
|
|||||||
QObject,
|
QObject,
|
||||||
QItemSelection,
|
QItemSelection,
|
||||||
Qt,
|
Qt,
|
||||||
|
QTimer,
|
||||||
)
|
)
|
||||||
from PyQt6.QtGui import QAction, QKeyEvent
|
from PyQt6.QtGui import QAction, QKeyEvent
|
||||||
from PyQt6.QtWidgets import (
|
from PyQt6.QtWidgets import (
|
||||||
@ -33,6 +35,7 @@ from classes import MusicMusterSignals, track_sequence
|
|||||||
from config import Config
|
from config import Config
|
||||||
from helpers import (
|
from helpers import (
|
||||||
ask_yes_no,
|
ask_yes_no,
|
||||||
|
AudacityManager,
|
||||||
ms_to_mmss,
|
ms_to_mmss,
|
||||||
show_OK,
|
show_OK,
|
||||||
show_warning,
|
show_warning,
|
||||||
@ -202,6 +205,7 @@ class PlaylistTab(QTableView):
|
|||||||
# Load playlist rows
|
# Load playlist rows
|
||||||
self.setModel(self.proxy_model)
|
self.setModel(self.proxy_model)
|
||||||
self._set_column_widths()
|
self._set_column_widths()
|
||||||
|
QTimer.singleShot(0, lambda: self.resizeRowsToContents())
|
||||||
|
|
||||||
# ########## Overrident class functions ##########
|
# ########## Overrident class functions ##########
|
||||||
|
|
||||||
@ -351,7 +355,7 @@ class PlaylistTab(QTableView):
|
|||||||
# Open in Audacity
|
# Open in Audacity
|
||||||
if track_row and not current_row:
|
if track_row and not current_row:
|
||||||
self._add_context_menu(
|
self._add_context_menu(
|
||||||
"Open in Audacity", lambda: model.open_in_audacity(model_row_number)
|
"Open in Audacity", lambda: self.open_in_audacity(model_row_number)
|
||||||
)
|
)
|
||||||
|
|
||||||
# Rescan
|
# Rescan
|
||||||
@ -522,11 +526,20 @@ class PlaylistTab(QTableView):
|
|||||||
return self.data_model.get_row_track_path(model_row_number)
|
return self.data_model.get_row_track_path(model_row_number)
|
||||||
|
|
||||||
def get_selected_rows(self) -> List[int]:
|
def get_selected_rows(self) -> List[int]:
|
||||||
"""Return a list of selected row numbers sorted by row"""
|
"""Return a list of model-selected row numbers sorted by row"""
|
||||||
|
|
||||||
# Use a set to deduplicate result (a selected row will have all
|
# Use a set to deduplicate result (a selected row will have all
|
||||||
# items in that row selected)
|
# items in that row selected)
|
||||||
return sorted(list(set([a.row() for a in self.selectedIndexes()])))
|
return sorted(
|
||||||
|
list(
|
||||||
|
set(
|
||||||
|
[
|
||||||
|
self.proxy_model.mapToSource(a).row()
|
||||||
|
for a in self.selectedIndexes()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
def _info_row(self, row_number: int) -> None:
|
def _info_row(self, row_number: int) -> None:
|
||||||
"""Display popup with info re row"""
|
"""Display popup with info re row"""
|
||||||
@ -553,6 +566,30 @@ class PlaylistTab(QTableView):
|
|||||||
self.data_model.mark_unplayed(row_numbers)
|
self.data_model.mark_unplayed(row_numbers)
|
||||||
self.clear_selection()
|
self.clear_selection()
|
||||||
|
|
||||||
|
def open_in_audacity(self, row_number: int) -> None:
|
||||||
|
"""
|
||||||
|
Open track in passed row in Audacity
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Notify user if audacity not running
|
||||||
|
if "audacity" not in [i.name() for i in psutil.process_iter()]:
|
||||||
|
show_warning(self.musicmuster, "Audacity", "Audacity is not running")
|
||||||
|
return
|
||||||
|
|
||||||
|
path = self.data_model.get_row_track_path(row_number)
|
||||||
|
if not path:
|
||||||
|
return
|
||||||
|
|
||||||
|
audacity = AudacityManager()
|
||||||
|
audacity.open_file(path)
|
||||||
|
if ask_yes_no(
|
||||||
|
"Export file",
|
||||||
|
"Click yes to export file, no to ignore",
|
||||||
|
parent=self.musicmuster,
|
||||||
|
):
|
||||||
|
audacity.export_file()
|
||||||
|
self._rescan(row_number)
|
||||||
|
|
||||||
def _rescan(self, row_number: int) -> None:
|
def _rescan(self, row_number: int) -> None:
|
||||||
"""Rescan track"""
|
"""Rescan track"""
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user