432 lines
15 KiB
Python
432 lines
15 KiB
Python
# Standard library imports
|
|
from typing import Optional
|
|
import os
|
|
|
|
# PyQt imports
|
|
from PyQt6.QtCore import QEvent, Qt
|
|
from PyQt6.QtGui import QKeyEvent
|
|
from PyQt6.QtWidgets import (
|
|
QDialog,
|
|
QListWidgetItem,
|
|
QMainWindow,
|
|
QTableWidgetItem,
|
|
)
|
|
|
|
# Third party imports
|
|
import pydymenu # type: ignore
|
|
from sqlalchemy.orm.session import Session
|
|
|
|
# App imports
|
|
from classes import MusicMusterSignals, TrackFileData
|
|
from config import Config
|
|
from helpers import (
|
|
ask_yes_no,
|
|
get_relative_date,
|
|
get_tags,
|
|
ms_to_mmss,
|
|
show_warning,
|
|
)
|
|
from log import log
|
|
from models import db, Settings, Tracks
|
|
from playlistmodel import PlaylistModel
|
|
from ui import dlg_TrackSelect_ui, dlg_replace_files_ui
|
|
|
|
|
|
class ReplaceFilesDialog(QDialog):
    """
    Import files as new tracks or as replacements for existing tracks.

    On construction the dialog scans the configured source directory and,
    for every regular file found, tries to pair it with an existing track
    (same basename and tags, then same title and artist, then an
    interactive fuzzy pick). The outcome is listed in the dialog's table
    and collected in ``self.replacement_files``.
    """

    def __init__(
        self,
        session: Session,
        main_window: QMainWindow,
        *args,
        **kwargs,
    ) -> None:
        """
        Build the dialog and populate the summary table.

        Args:
            session: stored as ``self.session``. Note that the directory
                scan below opens its own ``db.Session()`` rather than
                using this one.
            main_window: used as the parent for warning message boxes.
            *args, **kwargs: passed through to ``QDialog.__init__``.

        The constructor returns early, leaving the table unpopulated, if
        the user declines to continue on a non-production database, if
        source and destination directories are the same, or if a source
        file lacks a title or artist tag.
        """

        super().__init__(*args, **kwargs)
        self.session = session
        self.main_window = main_window
        self.ui = dlg_replace_files_ui.Ui_Dialog()
        self.ui.setupUi(self)

        self.ui.lblSourceDirectory.setText(Config.REPLACE_FILES_DEFAULT_SOURCE)
        self.ui.lblDestinationDirectory.setText(
            Config.REPLACE_FILES_DEFAULT_DESTINATION
        )
        self.replacement_files: list[TrackFileData] = []

        # We only want to run this against the production database because
        # we will affect files in the common pool of tracks used by all
        # databases
        dburi = os.environ.get("DATABASE_URL")
        on_production = bool(dburi) and "musicmuster_prod" in dburi
        if not on_production and not ask_yes_no(
            "Not production database",
            "Not on production database - continue?",
            default_yes=False,
        ):
            return

        source_dir = self.ui.lblSourceDirectory.text()
        if source_dir == self.ui.lblDestinationDirectory.text():
            show_warning(
                parent=self.main_window,
                title="Error",
                msg="Cannot import into source directory",
            )
            return

        self.ui.tableWidget.setHorizontalHeaderLabels(["Path", "Title", "Artist"])

        # Work through new files
        with db.Session() as session:
            for new_file_basename in os.listdir(source_dir):
                new_file_path = os.path.join(source_dir, new_file_basename)
                if not os.path.isfile(new_file_path):
                    continue

                rf = TrackFileData(new_file_path=new_file_path)
                rf.tags = get_tags(new_file_path)

                # Read the tags with .get() so that a file with a missing
                # tag produces the warning below rather than a KeyError
                # while the warning text is being built.
                title_tag = rf.tags.get("title")
                artist_tag = rf.tags.get("artist")
                if not (title_tag and artist_tag):
                    show_warning(
                        parent=self.main_window,
                        title="Error",
                        msg=(
                            f"File {new_file_path} missing tags:\n\n"
                            f"Title={title_tag}\n"
                            f"Artist={artist_tag}\n"
                        ),
                    )
                    return

                # Try progressively looser ways of finding the track that
                # this file should replace: same filename, then same
                # title/artist, then let the user pick.
                match_track = (
                    self.check_by_basename(
                        session, new_file_path, artist_tag, title_tag
                    )
                    or self.check_by_title(
                        session, new_file_path, artist_tag, title_tag
                    )
                    or self.get_fuzzy_match(session, new_file_basename)
                )

                # Build summary
                if match_track:
                    # We will store new file in the same directory as the
                    # existing file but with the new file name
                    rf.track_path = os.path.join(
                        os.path.dirname(match_track.path), new_file_basename
                    )

                    # We will remove existing track file
                    rf.obsolete_path = match_track.path

                    rf.track_id = match_track.id

                    filename_item = QTableWidgetItem(
                        self._change_text(
                            os.path.basename(match_track.path),
                            new_file_basename,
                            "replace",
                        )
                    )
                    title_item = QTableWidgetItem(
                        self._change_text(match_track.title, title_tag, "update")
                    )
                    artist_item = QTableWidgetItem(
                        self._change_text(match_track.artist, artist_tag, "update")
                    )
                else:
                    rf.track_path = os.path.join(
                        Config.REPLACE_FILES_DEFAULT_DESTINATION, new_file_basename
                    )
                    filename_item = QTableWidgetItem(" " + new_file_basename + " (new)")
                    title_item = QTableWidgetItem(" " + title_tag)
                    artist_item = QTableWidgetItem(" " + artist_tag)

                self.replacement_files.append(rf)
                row = self.ui.tableWidget.rowCount()
                self.ui.tableWidget.insertRow(row)
                for column, cell in enumerate(
                    (filename_item, title_item, artist_item)
                ):
                    self.ui.tableWidget.setItem(row, column, cell)

        self.ui.tableWidget.resizeColumnsToContents()
        self.ui.tableWidget.resizeRowsToContents()

    @staticmethod
    def _change_text(old: str, new: str, verb: str) -> str:
        """
        Return the table cell text describing a change from old to new.

        When the values are equal the text is marked "(no change)";
        otherwise both values are shown, tagged with verb.
        """

        if old == new:
            return " " + new + " (no change)"
        return f" {old} →\n {new} ({verb})"

    def check_by_basename(
        self, session: Session, new_path: str, new_path_artist: str, new_path_title: str
    ) -> Optional[Tracks]:
        """
        Return Track that matches basename and tags

        Candidates come from Tracks.get_by_basename(); the first one whose
        file carries the same title and artist tags (compared
        case-insensitively) is returned, or None if there is none.
        """

        wanted_title = new_path_title.lower()
        wanted_artist = new_path_artist.lower()

        for candidate in Tracks.get_by_basename(session, new_path) or []:
            try:
                candidate_tags = get_tags(candidate.path)
            except FileNotFoundError:
                # The candidate's file has gone, so its tags can't be
                # verified; skip it, as check_by_title() does.
                continue

            candidate_title = candidate_tags.get("title")
            candidate_artist = candidate_tags.get("artist")
            if (
                candidate_title
                and candidate_artist
                and candidate_title.lower() == wanted_title
                and candidate_artist.lower() == wanted_artist
            ):
                return candidate

        return None

    def check_by_title(
        self, session: Session, new_path: str, new_path_artist: str, new_path_title: str
    ) -> Optional[Tracks]:
        """
        Return Track that matches title and artist

        Candidates come from Tracks.search_titles(); the first one whose
        file has the same artist tag (compared case-insensitively) is
        returned, or None if there is none. new_path is not used.
        """

        wanted_artist = new_path_artist.lower()

        for candidate in Tracks.search_titles(session, new_path_title) or []:
            try:
                candidate_tags = get_tags(candidate.path)
            except FileNotFoundError:
                # One missing file must not end the search: later
                # candidates may still match.
                continue

            candidate_artist = candidate_tags.get("artist")
            if candidate_artist and candidate_artist.lower() == wanted_artist:
                return candidate

        return None

    def get_fuzzy_match(self, session: Session, fname: str) -> Optional[Tracks]:
        """
        Return Track that matches fuzzy filename search

        All track paths are offered through pydymenu.rofi() with fname as
        the prompt; the track for the first chosen path is returned, or
        None if nothing was chosen.
        """

        all_paths = [track.path for track in Tracks.get_all(session)]
        choice = pydymenu.rofi(all_paths, prompt=fname)
        if not choice:
            return None

        return Tracks.get_by_path(session, choice[0])
|
|
|
|
|
|
class TrackSelectDialog(QDialog):
    """
    Select track from database

    Lets the user search tracks by title or artist and add the selected
    track (and/or a note) to the source playlist model, either as a new
    row or onto an existing header row.
    """

    def __init__(
        self,
        session: Session,
        new_row_number: int,
        source_model: PlaylistModel,
        add_to_header: Optional[bool] = False,
        *args,
        **kwargs,
    ) -> None:
        """
        Subclassed QDialog to manage track selection

        Args:
            session: database session used for searches and for reading
                and saving the dialog size settings.
            new_row_number: row number passed to the model when adding.
            source_model: playlist model that receives the additions.
            add_to_header: when true, the track is added to a header row
                and the note widgets are hidden.
            *args, **kwargs: passed through to ``QDialog.__init__``.
        """

        super().__init__(*args, **kwargs)
        self.session = session
        self.new_row_number = new_row_number
        self.source_model = source_model
        self.add_to_header = add_to_header
        self.ui = dlg_TrackSelect_ui.Ui_Dialog()
        self.ui.setupUi(self)
        self.ui.btnAdd.clicked.connect(self.add_selected)
        self.ui.btnAddClose.clicked.connect(self.add_selected_and_close)
        self.ui.btnClose.clicked.connect(self.close)
        self.ui.matchList.itemDoubleClicked.connect(self.add_selected)
        self.ui.matchList.itemSelectionChanged.connect(self.selection_changed)
        self.ui.radioTitle.toggled.connect(self.title_artist_toggle)
        self.ui.searchString.textEdited.connect(self.chars_typed)
        self.track: Optional[Tracks] = None
        self.signals = MusicMusterSignals()

        # Restore the size saved by closeEvent(), falling back to 800x600
        # when no value is stored
        record = Settings.get_setting(self.session, "dbdialog_width")
        width = record.f_int or 800
        record = Settings.get_setting(self.session, "dbdialog_height")
        height = record.f_int or 600
        self.resize(width, height)

        if add_to_header:
            self.ui.lblNote.setVisible(False)
            self.ui.txtNote.setVisible(False)

    @staticmethod
    def _last_played(track: Tracks):
        """
        Return the latest ``lastplayed`` value among track.playdates, or
        None if the track has no playdates.
        """

        latest = max(track.playdates, key=lambda p: p.lastplayed, default=None)
        return latest.lastplayed if latest else None

    def add_selected(self) -> None:
        """Handle Add button"""

        track = None

        if self.ui.matchList.selectedItems():
            item = self.ui.matchList.currentItem()
            if item:
                track = item.data(Qt.ItemDataRole.UserRole)

        note = self.ui.txtNote.text()

        # Nothing selected and nothing typed: nothing to add
        if not (track or note):
            return

        track_id = None
        if track:
            track_id = track.id

        # Note without a track: add a note-only row and stay open
        if note and not track_id:
            self.source_model.insert_row(self.new_row_number, track_id, note)
            self.ui.txtNote.clear()
            self.new_row_number += 1
            return

        self.ui.txtNote.clear()
        self.select_searchtext()

        if track_id is None:
            log.error("track_id is None and should not be")
            return

        # Check whether track is already in playlist
        move_existing = False
        existing_prd = self.source_model.is_track_in_playlist(track_id)
        if existing_prd is not None:
            if ask_yes_no(
                "Duplicate row",
                "Track already in playlist. Move to new location?",
                default_yes=True,
            ):
                move_existing = True

        if self.add_to_header:
            if move_existing and existing_prd:  # "and existing_prd" for mypy's benefit
                self.source_model.move_track_to_header(
                    self.new_row_number, existing_prd, note
                )
            else:
                self.source_model.add_track_to_header(self.new_row_number, track_id)
            # Close dialog - we can only add one track to a header
            self.accept()
        else:
            # Adding a new track row
            if move_existing and existing_prd:  # "and existing_prd" for mypy's benefit
                self.source_model.move_track_add_note(
                    self.new_row_number, existing_prd, note
                )
            else:
                self.source_model.insert_row(self.new_row_number, track_id, note)

        self.new_row_number += 1

    def add_selected_and_close(self) -> None:
        """Handle Add and Close button"""

        self.add_selected()
        self.accept()

    def chars_typed(self, s: str) -> None:
        """
        Handle text typed in search box

        Clears the match list and refills it with tracks matching s. A
        search string starting with "a/" searches artists regardless of
        the radio button; otherwise the radio button chooses between
        title and artist search.
        """

        self.ui.matchList.clear()
        if not s:
            return

        if s.startswith("a/") and len(s) > 2:
            matches = Tracks.search_artists(self.session, "%" + s[2:])
        elif self.ui.radioTitle.isChecked():
            matches = Tracks.search_titles(self.session, "%" + s)
        else:
            matches = Tracks.search_artists(self.session, "%" + s)

        for track in matches or []:
            last_played = self._last_played(track)
            entry = QListWidgetItem()
            entry.setText(
                f"{track.title} - {track.artist} "
                f"[{ms_to_mmss(track.duration)}] "
                f"({get_relative_date(last_played)})"
            )
            # Keep the track on the item so add_selected() and
            # selection_changed() can retrieve it
            entry.setData(Qt.ItemDataRole.UserRole, track)
            self.ui.matchList.addItem(entry)

    def closeEvent(self, event: Optional[QEvent]) -> None:
        """
        Override close and save dialog coordinates

        NOTE(review): this runs on close events only; confirm whether the
        size also needs saving when the dialog is dismissed via accept().
        """

        if not event:
            return

        record = Settings.get_setting(self.session, "dbdialog_height")
        record.f_int = self.height()

        record = Settings.get_setting(self.session, "dbdialog_width")
        record.f_int = self.width()

        self.session.commit()

        event.accept()

    def keyPressEvent(self, event: QKeyEvent) -> None:
        """
        Clear selection on ESC if there is one
        """

        if event.key() == Qt.Key.Key_Escape and self.ui.matchList.selectedItems():
            self.ui.matchList.clearSelection()
            return

        super().keyPressEvent(event)

    def select_searchtext(self) -> None:
        """Select the searchbox"""

        self.ui.searchString.selectAll()
        self.ui.searchString.setFocus()

    def selection_changed(self) -> None:
        """Display selected track path in dialog box"""

        if not self.ui.matchList.selectedItems():
            return

        item = self.ui.matchList.currentItem()
        if item is None:
            # There may be a selection without a current item; guard as
            # add_selected() does
            return

        track = item.data(Qt.ItemDataRole.UserRole)
        last_played = self._last_played(track)
        path_text = f"{track.path} ({get_relative_date(last_played)})"

        self.ui.dbPath.setText(path_text)

    def title_artist_toggle(self) -> None:
        """
        Handle switching between searching for artists and searching for
        titles
        """

        # Logic is handled already in chars_typed(), so just call that.
        self.chars_typed(self.ui.searchString.text())
|