From 73e728177e63ed299dfa8cb93f5285085f01dd70 Mon Sep 17 00:00:00 2001 From: Keith Edmunds Date: Mon, 15 Aug 2022 14:16:46 +0100 Subject: [PATCH] Import track working --- app/helpers.py | 32 ++++---- app/models.py | 107 +++++++++++++------------- app/musicmuster.py | 124 +++++++++++++++--------------- app/playlists.py | 13 +++- app/utilities.py | 185 +++++++++++++++++++++++---------------------- 5 files changed, 235 insertions(+), 226 deletions(-) diff --git a/app/helpers.py b/app/helpers.py index e4b480d..6da20db 100644 --- a/app/helpers.py +++ b/app/helpers.py @@ -5,9 +5,10 @@ from config import Config from datetime import datetime from pydub import AudioSegment from PyQt5.QtWidgets import QMessageBox -# from tinytag import TinyTag +from tinytag import TinyTag # type: ignore from typing import Optional # from typing import Dict, Optional, Union +from typing import Dict, Union # # # def ask_yes_no(title: str, question: str) -> bool: @@ -74,21 +75,20 @@ def get_audio_segment(path: str) -> Optional[AudioSegment]: return None -# -# def get_tags(path: str) -> Dict[str, Union[str, int]]: -# """ -# Return a dictionary of title, artist, duration-in-milliseconds and path. -# """ -# -# tag: TinyTag = TinyTag.get(path) -# -# d = dict( -# title=tag.title, -# artist=tag.artist, -# duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000), -# path=path -# ) -# return d + +def get_tags(path: str) -> Dict[str, Union[str, int]]: + """ + Return a dictionary of title, artist, duration-in-milliseconds and path. + """ + + tag = TinyTag.get(path) + + return dict( + title=tag.title, + artist=tag.artist, + duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000), + path=path + ) def get_relative_date(past_date: datetime, reference_date: datetime = None) \ diff --git a/app/models.py b/app/models.py index 3ba066f..ea58c5c 100644 --- a/app/models.py +++ b/app/models.py @@ -82,7 +82,8 @@ class NoteColours(Base): # session.flush() # # @classmethod - # def get_all(cls, session: Session) -> Optional[List["NoteColours"]]: + # def get_all(cls, session: Session) -> + # Optional[List["NoteColours"]]: # """Return all records""" # # return session.query(cls).all() @@ -286,7 +287,8 @@ class Playlists(Base): # def remove_track(self, session: Session, row: int) -> None: # log.debug(f"Playlist.remove_track({self.id=}, {row=})") # - # # Refresh self first (this is necessary when calling remove_track + # # Refresh self first (this is necessary when calling + # remove_track # # multiple times before session.commit()) # session.refresh(self) # # Get tracks collection for this playlist @@ -508,7 +510,6 @@ class Tracks(Base): silence_at = Column(Integer, index=False) path = Column(String(2048), index=False, nullable=False) mtime = Column(Float, index=True) - # lastplayed = Column(DateTime, index=True, default=None) playlistrows = relationship("PlaylistRows", back_populates="track") playlists = association_proxy("playlistrows", "playlist") playdates = relationship("Playdates", back_populates="track") @@ -519,31 +520,31 @@ class Tracks(Base): f"artist={self.artist}, path={self.path}>" ) - # def __init__( - # self, - # session: Session, - # path: str, - # title: Optional[str] = None, - # artist: Optional[str] = None, - # duration: int = 0, - # start_gap: int = 0, - # fade_at: Optional[int] = None, - # silence_at: Optional[int] = None, - # mtime: Optional[float] = None, - # lastplayed: Optional[datetime] = None, - # ) -> None: - # self.path = path - # self.title = title - # self.artist = artist - # self.duration = duration - # self.start_gap = start_gap - # self.fade_at = fade_at - # self.silence_at = silence_at - # self.mtime = mtime - # self.lastplayed = lastplayed - # - # session.add(self) - # session.flush() + def __init__( + self, + session: Session, + path: str, + title: Optional[str] = None, + artist: Optional[str] = None, + duration: int = 0, + start_gap: int = 0, + fade_at: Optional[int] = None, + silence_at: Optional[int] = None, + mtime: Optional[float] = None, + lastplayed: Optional[datetime] = None, + ) -> None: + self.path = path + self.title = title + self.artist = artist + self.duration = duration + self.start_gap = start_gap + self.fade_at = fade_at + self.silence_at = silence_at + self.mtime = mtime + self.lastplayed = lastplayed + + session.add(self) + session.commit() # # @staticmethod # def get_all_paths(session) -> List[str]: @@ -556,23 +557,21 @@ class Tracks(Base): # """Return a list of all tracks""" # # return session.query(cls).all() - # - # @classmethod - # def get_or_create(cls, session: Session, path: str) -> "Tracks": - # """ - # If a track with path exists, return it; - # else created new track and return it - # """ - # - # log.debug(f"Tracks.get_or_create({path=})") - # - # try: - # track = session.query(cls).filter(cls.path == path).one() - # except NoResultFound: - # track = Tracks(session, path) - # - # return track - # + + @classmethod + def get_or_create(cls, session: Session, path: str) -> "Tracks": + """ + If a track with path exists, return it; + else created new track and return it + """ + + try: + track = session.query(cls).filter(cls.path == path).one() + except NoResultFound: + track = Tracks(session, path) + + return track + # @classmethod # def get_by_filename(cls, session: Session, filename: str) \ # -> Optional["Tracks"]: @@ -591,22 +590,26 @@ class Tracks(Base): # return None # # @classmethod - # def get_by_path(cls, session: Session, path: str) -> List["Tracks"]: + # def get_by_path(cls, session: Session, path: str) -> + # List["Tracks"]: # """ # Return track with passee path, or None. # """ # # log.debug(f"Tracks.get_track_from_path({path=})") # - # return session.query(Tracks).filter(Tracks.path == path).first() + # return session.query(Tracks).filter(Tracks.path == + # path).first() # # @classmethod - # def get_by_id(cls, session: Session, track_id: int) -> Optional["Tracks"]: + # def get_by_id(cls, session: Session, track_id: int) -> + # Optional["Tracks"]: # """Return track or None""" # # try: # log.debug(f"Tracks.get_track(track_id={track_id})") - # track = session.query(Tracks).filter(Tracks.id == track_id).one() + # track = session.query(Tracks).filter(Tracks.id == + # track_id).one() # return track # except NoResultFound: # log.error(f"get_track({track_id}): not found") @@ -635,10 +638,12 @@ class Tracks(Base): # log.debug(f"Tracks.remove_path({path=})") # # try: - # session.query(Tracks).filter(Tracks.path == path).delete() + # session.query(Tracks).filter(Tracks.path == + # path).delete() # session.flush() # except IntegrityError as exception: - # log.error(f"Can't remove track with {path=} ({exception=})") + # log.error(f"Can't remove track with {path=} + # ({exception=})") @classmethod def search_artists(cls, session: Session, text: str) -> List["Tracks"]: diff --git a/app/musicmuster.py b/app/musicmuster.py index fa825bc..4dc9d03 100755 --- a/app/musicmuster.py +++ b/app/musicmuster.py @@ -3,7 +3,7 @@ from log import log # import argparse import sys -# import threading +import threading from datetime import datetime, timedelta # from typing import Callable, Dict, List, Optional, Tuple @@ -20,13 +20,13 @@ from PyQt5.QtWidgets import ( QLineEdit, QListWidgetItem, QMainWindow, - # QMessageBox, + QMessageBox, ) -# + from dbconfig import engine, Session import helpers import music -# + from models import ( Base, Playdates, @@ -42,7 +42,7 @@ from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist # type: ignore from ui.downloadcsv_ui import Ui_DateSelect # type: ignore from config import Config from ui.main_window_ui import Ui_MainWindow # type: ignore -# from utilities import create_track_from_file, update_db +from utilities import create_track_from_file # , update_db class TrackData: @@ -164,7 +164,7 @@ class Window(QMainWindow, Ui_MainWindow): lambda: self.tabPlaylist.currentWidget().search_next()) self.actionFind_previous.triggered.connect( lambda: self.tabPlaylist.currentWidget().search_previous()) -# self.actionImport.triggered.connect(self.import_track) + self.actionImport.triggered.connect(self.import_track) self.actionInsertSectionHeader.triggered.connect(self.insert_header) self.actionInsertTrack.triggered.connect(self.insert_track) self.actionMoveSelected.triggered.connect(self.move_selected) @@ -173,7 +173,6 @@ class Window(QMainWindow, Ui_MainWindow): self.actionPlay_next.triggered.connect(self.play_next) self.actionSearch.triggered.connect(self.search_playlist) self.actionSelect_next_track.triggered.connect(self.select_next_row) -# self.actionSelect_played_tracks.triggered.connect(self.select_played) self.actionSelect_previous_track.triggered.connect( self.select_previous_row) self.actionMoveUnplayed.triggered.connect(self.move_unplayed) @@ -419,63 +418,60 @@ class Window(QMainWindow, Ui_MainWindow): with Session() as session: for i in range(self.tabPlaylist.count()): self.tabPlaylist.widget(i).update_display(session) -# -# def import_track(self) -> None: -# """Import track file""" -# -# dlg = QFileDialog() -# dlg.setFileMode(QFileDialog.ExistingFiles) -# dlg.setViewMode(QFileDialog.Detail) -# dlg.setDirectory(Config.IMPORT_DESTINATION) -# dlg.setNameFilter("Music files (*.flac *.mp3)") -# -# if dlg.exec_(): -# with Session() as session: -# txt: str = "" -# new_tracks = [] -# for fname in dlg.selectedFiles(): -# tags = helpers.get_tags(fname) -# new_tracks.append((fname, tags)) -# title = tags['title'] -# artist = tags['artist'] -# possible_matches = Tracks.search_titles(session, title) -# if possible_matches: -# txt += 'Similar to new track ' -# txt += f'"{title}" by "{artist} ({fname})":\n\n' -# for track in possible_matches: -# txt += f' "{track.title}" by {track.artist}' -# txt += f' ({track.path})\n' -# txt += "\n" -# # Check whether to proceed if there were potential matches -# if txt: -# txt += "Proceed with import?" -# result = QMessageBox.question(self, -# "Possible duplicates", -# txt, -# QMessageBox.Ok, -# QMessageBox.Cancel -# ) -# if result == QMessageBox.Cancel: -# return -# -# # Import in separate thread -# thread = threading.Thread(target=self._import_tracks, -# args=(new_tracks,)) -# thread.start() -# -# def _import_tracks(self, tracks: list): -# """ -# Import passed files. Don't use parent session as that may be invalid -# by the time we need it. -# """ -# -# with Session() as session: -# for (fname, tags) in tracks: -# track = create_track_from_file(session, fname, tags=tags) -# # Add to playlist on screen -# # If we don't specify "repaint=False", playlist will -# # also be saved to database -# self.visible_playlist_tab().insert_track(session, track) + + def import_track(self) -> None: + """Import track file""" + + dlg = QFileDialog() + dlg.setFileMode(QFileDialog.ExistingFiles) + dlg.setViewMode(QFileDialog.Detail) + dlg.setDirectory(Config.IMPORT_DESTINATION) + dlg.setNameFilter("Music files (*.flac *.mp3)") + + if not dlg.exec_(): + return + + with Session() as session: + txt = "" + new_tracks = [] + for fname in dlg.selectedFiles(): + tags = helpers.get_tags(fname) + new_tracks.append((fname, tags)) + title = tags['title'] + artist = tags['artist'] + possible_matches = Tracks.search_titles(session, title) + if possible_matches: + txt += 'Similar to new track ' + txt += f'"{title}" by "{artist} ({fname})":\n\n' + for track in possible_matches: + txt += f' "{track.title}" by {track.artist}' + txt += f' ({track.path})\n\n' + txt += "\n" + # Check whether to proceed if there were potential matches + txt += "Proceed with import?" + result = QMessageBox.question(self, + "Possible duplicates", + txt, + QMessageBox.Ok, + QMessageBox.Cancel + ) + if result == QMessageBox.Cancel: + return + + # Import in separate thread + thread = threading.Thread(target=self._import_tracks, + args=(new_tracks,)) + thread.start() + + def _import_tracks(self, tracks: list): + """ + Create track objects from passed files and add to visible playlist + """ + + with Session() as session: + for (fname, tags) in tracks: + track = create_track_from_file(session, fname, tags=tags) + self.visible_playlist_tab().insert_track(session, track) def load_last_playlists(self) -> None: """Load the playlists that were open when the last session closed""" diff --git a/app/playlists.py b/app/playlists.py index 1a1c12f..440273f 100644 --- a/app/playlists.py +++ b/app/playlists.py @@ -579,9 +579,16 @@ class PlaylistTab(QTableWidget): self._set_unreadable_row(row) else: - # This is a section header so make empty items (row - # background won't be coloured without items present). Any - # notes should displayed starting in column 0 + # This is a section header so it must have note text + if row_data.note is None: + log.debug( + f"insert_row({row_data=}) with no track_id and no note" + ) + return + + # Make empty items (row background won't be coloured without + # items present). Any notes should displayed starting in + # column 0 for i in range(2, len(columns) - 1): self.setItem(row, i, QTableWidgetItem()) notes_item = QTableWidgetItem(row_data.note) diff --git a/app/utilities.py b/app/utilities.py index 7674862..fc37771 100755 --- a/app/utilities.py +++ b/app/utilities.py @@ -1,27 +1,24 @@ # #!/usr/bin/env python # # import argparse -# import os -# import shutil -# import tempfile -# -# import helpers -# from config import Config -# from helpers import ( -# fade_point, -# get_audio_segment, -# get_tags, -# leading_silence, -# trailing_silence, -# ) +import os +import shutil +import tempfile + +from config import Config +from helpers import ( + fade_point, + get_audio_segment, + get_tags, + leading_silence, + trailing_silence, +) # from log import log.debug, log.info # from models import Notes, Playdates, Session, Tracks -# from mutagen.flac import FLAC -# from mutagen.mp3 import MP3 -# from pydub import effects -# -# # Globals (I know) -# messages = [] +from models import Tracks +from mutagen.flac import FLAC +from mutagen.mp3 import MP3 +from pydub import effects # # # def main(): @@ -53,79 +50,83 @@ # log.info("No action specified") # # log.debug("Finished") -# -# -# def create_track_from_file(session, path, normalise=None, tags=None): -# """ -# Create track in database from passed path, or update database entry -# if path already in database. -# -# Return track. -# """ -# -# if not tags: -# t = get_tags(path) -# else: -# t = tags -# -# track = Tracks.get_or_create(session, path) -# track.title = t['title'] -# track.artist = t['artist'] -# audio = get_audio_segment(path) -# track.duration = len(audio) -# track.start_gap = leading_silence(audio) -# track.fade_at = round(fade_point(audio) / 1000, -# Config.MILLISECOND_SIGFIGS) * 1000 -# track.silence_at = round(trailing_silence(audio) / 1000, -# Config.MILLISECOND_SIGFIGS) * 1000 -# track.mtime = os.path.getmtime(path) -# session.commit() -# -# if normalise or normalise is None and Config.NORMALISE_ON_IMPORT: -# # Check type -# ftype = os.path.splitext(path)[1][1:] -# if ftype not in ['mp3', 'flac']: -# log.info(f"File type {ftype} not implemented") -# return track -# -# # Get current file gid, uid and permissions -# stats = os.stat(path) -# try: -# # Copy original file -# fd, temp_path = tempfile.mkstemp() -# shutil.copyfile(path, temp_path) -# except Exception as err: -# log.debug(f"songdb.create_track_from_file({path}): err1: {repr(err)}") -# return -# -# # Overwrite original file with normalised output -# normalised = effects.normalize(audio) -# try: -# normalised.export(path, format=os.path.splitext(path)[1][1:]) -# # Fix up permssions and ownership -# os.chown(path, stats.st_uid, stats.st_gid) -# os.chmod(path, stats.st_mode) -# # Copy tags -# if ftype == 'flac': -# tag_handler = FLAC -# elif ftype == 'mp3': -# tag_handler = MP3 -# else: -# return track -# src = tag_handler(temp_path) -# dst = tag_handler(path) -# for tag in src: -# dst[tag] = src[tag] -# dst.save() -# except Exception as err: -# log.debug(f"songdb.create_track_from_file({path}): err2: {repr(err)}") -# # Restore original file -# shutil.copyfile(path, temp_path) -# finally: -# if os.path.exists(temp_path): -# os.remove(temp_path) -# -# return track + + +def create_track_from_file(session, path, normalise=None, tags=None): + """ + Create track in database from passed path, or update database entry + if path already in database. + + Return track. + """ + + if not tags: + t = get_tags(path) + else: + t = tags + + track = Tracks.get_or_create(session, path) + track.title = t['title'] + track.artist = t['artist'] + audio = get_audio_segment(path) + track.duration = len(audio) + track.start_gap = leading_silence(audio) + track.fade_at = round(fade_point(audio) / 1000, + Config.MILLISECOND_SIGFIGS) * 1000 + track.silence_at = round(trailing_silence(audio) / 1000, + Config.MILLISECOND_SIGFIGS) * 1000 + track.mtime = os.path.getmtime(path) + session.commit() + + if normalise or normalise is None and Config.NORMALISE_ON_IMPORT: + # Check type + ftype = os.path.splitext(path)[1][1:] + if ftype not in ['mp3', 'flac']: + log.info(f"File type {ftype} not implemented") + return track + + # Get current file gid, uid and permissions + stats = os.stat(path) + try: + # Copy original file + fd, temp_path = tempfile.mkstemp() + shutil.copyfile(path, temp_path) + except Exception as err: + log.debug( + f"utilities.create_track_from_file({path}): err1: {repr(err)}" + ) + return + + # Overwrite original file with normalised output + normalised = effects.normalize(audio) + try: + normalised.export(path, format=os.path.splitext(path)[1][1:]) + # Fix up permssions and ownership + os.chown(path, stats.st_uid, stats.st_gid) + os.chmod(path, stats.st_mode) + # Copy tags + if ftype == 'flac': + tag_handler = FLAC + elif ftype == 'mp3': + tag_handler = MP3 + else: + return track + src = tag_handler(temp_path) + dst = tag_handler(path) + for tag in src: + dst[tag] = src[tag] + dst.save() + except Exception as err: + log.debug( + f"utilities.create_track_from_file({path}): err2: {repr(err)}" + ) + # Restore original file + shutil.copyfile(path, temp_path) + finally: + if os.path.exists(temp_path): + os.remove(temp_path) + + return track # # # def full_update_db(session):