Import track working

This commit is contained in:
Keith Edmunds 2022-08-15 14:16:46 +01:00
parent 3b4cf5320d
commit 73e728177e
5 changed files with 235 additions and 226 deletions

View File

@ -5,9 +5,10 @@ from config import Config
from datetime import datetime
from pydub import AudioSegment
from PyQt5.QtWidgets import QMessageBox
# from tinytag import TinyTag
from tinytag import TinyTag # type: ignore
from typing import Optional
# from typing import Dict, Optional, Union
from typing import Dict, Union
#
#
# def ask_yes_no(title: str, question: str) -> bool:
@ -74,21 +75,20 @@ def get_audio_segment(path: str) -> Optional[AudioSegment]:
return None
#
# def get_tags(path: str) -> Dict[str, Union[str, int]]:
# """
# Return a dictionary of title, artist, duration-in-milliseconds and path.
# """
#
# tag: TinyTag = TinyTag.get(path)
#
# d = dict(
# title=tag.title,
# artist=tag.artist,
# duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000),
# path=path
# )
# return d
def get_tags(path: str) -> Dict[str, Union[str, int]]:
"""
Return a dictionary of title, artist, duration-in-milliseconds and path.
"""
tag = TinyTag.get(path)
return dict(
title=tag.title,
artist=tag.artist,
duration=int(round(tag.duration, Config.MILLISECOND_SIGFIGS) * 1000),
path=path
)
def get_relative_date(past_date: datetime, reference_date: datetime = None) \

View File

@ -82,7 +82,8 @@ class NoteColours(Base):
# session.flush()
#
# @classmethod
# def get_all(cls, session: Session) -> Optional[List["NoteColours"]]:
# def get_all(cls, session: Session) ->
# Optional[List["NoteColours"]]:
# """Return all records"""
#
# return session.query(cls).all()
@ -286,7 +287,8 @@ class Playlists(Base):
# def remove_track(self, session: Session, row: int) -> None:
# log.debug(f"Playlist.remove_track({self.id=}, {row=})")
#
# # Refresh self first (this is necessary when calling remove_track
# # Refresh self first (this is necessary when calling
# remove_track
# # multiple times before session.commit())
# session.refresh(self)
# # Get tracks collection for this playlist
@ -508,7 +510,6 @@ class Tracks(Base):
silence_at = Column(Integer, index=False)
path = Column(String(2048), index=False, nullable=False)
mtime = Column(Float, index=True)
# lastplayed = Column(DateTime, index=True, default=None)
playlistrows = relationship("PlaylistRows", back_populates="track")
playlists = association_proxy("playlistrows", "playlist")
playdates = relationship("Playdates", back_populates="track")
@ -519,31 +520,31 @@ class Tracks(Base):
f"artist={self.artist}, path={self.path}>"
)
# def __init__(
# self,
# session: Session,
# path: str,
# title: Optional[str] = None,
# artist: Optional[str] = None,
# duration: int = 0,
# start_gap: int = 0,
# fade_at: Optional[int] = None,
# silence_at: Optional[int] = None,
# mtime: Optional[float] = None,
# lastplayed: Optional[datetime] = None,
# ) -> None:
# self.path = path
# self.title = title
# self.artist = artist
# self.duration = duration
# self.start_gap = start_gap
# self.fade_at = fade_at
# self.silence_at = silence_at
# self.mtime = mtime
# self.lastplayed = lastplayed
#
# session.add(self)
# session.flush()
def __init__(
self,
session: Session,
path: str,
title: Optional[str] = None,
artist: Optional[str] = None,
duration: int = 0,
start_gap: int = 0,
fade_at: Optional[int] = None,
silence_at: Optional[int] = None,
mtime: Optional[float] = None,
lastplayed: Optional[datetime] = None,
) -> None:
self.path = path
self.title = title
self.artist = artist
self.duration = duration
self.start_gap = start_gap
self.fade_at = fade_at
self.silence_at = silence_at
self.mtime = mtime
self.lastplayed = lastplayed
session.add(self)
session.commit()
#
# @staticmethod
# def get_all_paths(session) -> List[str]:
@ -556,23 +557,21 @@ class Tracks(Base):
# """Return a list of all tracks"""
#
# return session.query(cls).all()
#
# @classmethod
# def get_or_create(cls, session: Session, path: str) -> "Tracks":
# """
# If a track with path exists, return it;
# else created new track and return it
# """
#
# log.debug(f"Tracks.get_or_create({path=})")
#
# try:
# track = session.query(cls).filter(cls.path == path).one()
# except NoResultFound:
# track = Tracks(session, path)
#
# return track
#
@classmethod
def get_or_create(cls, session: Session, path: str) -> "Tracks":
"""
If a track with path exists, return it;
else created new track and return it
"""
try:
track = session.query(cls).filter(cls.path == path).one()
except NoResultFound:
track = Tracks(session, path)
return track
# @classmethod
# def get_by_filename(cls, session: Session, filename: str) \
# -> Optional["Tracks"]:
@ -591,22 +590,26 @@ class Tracks(Base):
# return None
#
# @classmethod
# def get_by_path(cls, session: Session, path: str) -> List["Tracks"]:
# def get_by_path(cls, session: Session, path: str) ->
# List["Tracks"]:
# """
# Return track with passee path, or None.
# """
#
# log.debug(f"Tracks.get_track_from_path({path=})")
#
# return session.query(Tracks).filter(Tracks.path == path).first()
# return session.query(Tracks).filter(Tracks.path ==
# path).first()
#
# @classmethod
# def get_by_id(cls, session: Session, track_id: int) -> Optional["Tracks"]:
# def get_by_id(cls, session: Session, track_id: int) ->
# Optional["Tracks"]:
# """Return track or None"""
#
# try:
# log.debug(f"Tracks.get_track(track_id={track_id})")
# track = session.query(Tracks).filter(Tracks.id == track_id).one()
# track = session.query(Tracks).filter(Tracks.id ==
# track_id).one()
# return track
# except NoResultFound:
# log.error(f"get_track({track_id}): not found")
@ -635,10 +638,12 @@ class Tracks(Base):
# log.debug(f"Tracks.remove_path({path=})")
#
# try:
# session.query(Tracks).filter(Tracks.path == path).delete()
# session.query(Tracks).filter(Tracks.path ==
# path).delete()
# session.flush()
# except IntegrityError as exception:
# log.error(f"Can't remove track with {path=} ({exception=})")
# log.error(f"Can't remove track with {path=}
# ({exception=})")
@classmethod
def search_artists(cls, session: Session, text: str) -> List["Tracks"]:

View File

@ -3,7 +3,7 @@
from log import log
# import argparse
import sys
# import threading
import threading
from datetime import datetime, timedelta
# from typing import Callable, Dict, List, Optional, Tuple
@ -20,13 +20,13 @@ from PyQt5.QtWidgets import (
QLineEdit,
QListWidgetItem,
QMainWindow,
# QMessageBox,
QMessageBox,
)
#
from dbconfig import engine, Session
import helpers
import music
#
from models import (
Base,
Playdates,
@ -42,7 +42,7 @@ from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist # type: ignore
from ui.downloadcsv_ui import Ui_DateSelect # type: ignore
from config import Config
from ui.main_window_ui import Ui_MainWindow # type: ignore
# from utilities import create_track_from_file, update_db
from utilities import create_track_from_file # , update_db
class TrackData:
@ -164,7 +164,7 @@ class Window(QMainWindow, Ui_MainWindow):
lambda: self.tabPlaylist.currentWidget().search_next())
self.actionFind_previous.triggered.connect(
lambda: self.tabPlaylist.currentWidget().search_previous())
# self.actionImport.triggered.connect(self.import_track)
self.actionImport.triggered.connect(self.import_track)
self.actionInsertSectionHeader.triggered.connect(self.insert_header)
self.actionInsertTrack.triggered.connect(self.insert_track)
self.actionMoveSelected.triggered.connect(self.move_selected)
@ -173,7 +173,6 @@ class Window(QMainWindow, Ui_MainWindow):
self.actionPlay_next.triggered.connect(self.play_next)
self.actionSearch.triggered.connect(self.search_playlist)
self.actionSelect_next_track.triggered.connect(self.select_next_row)
# self.actionSelect_played_tracks.triggered.connect(self.select_played)
self.actionSelect_previous_track.triggered.connect(
self.select_previous_row)
self.actionMoveUnplayed.triggered.connect(self.move_unplayed)
@ -419,63 +418,60 @@ class Window(QMainWindow, Ui_MainWindow):
with Session() as session:
for i in range(self.tabPlaylist.count()):
self.tabPlaylist.widget(i).update_display(session)
#
# def import_track(self) -> None:
# """Import track file"""
#
# dlg = QFileDialog()
# dlg.setFileMode(QFileDialog.ExistingFiles)
# dlg.setViewMode(QFileDialog.Detail)
# dlg.setDirectory(Config.IMPORT_DESTINATION)
# dlg.setNameFilter("Music files (*.flac *.mp3)")
#
# if dlg.exec_():
# with Session() as session:
# txt: str = ""
# new_tracks = []
# for fname in dlg.selectedFiles():
# tags = helpers.get_tags(fname)
# new_tracks.append((fname, tags))
# title = tags['title']
# artist = tags['artist']
# possible_matches = Tracks.search_titles(session, title)
# if possible_matches:
# txt += 'Similar to new track '
# txt += f'"{title}" by "{artist} ({fname})":\n\n'
# for track in possible_matches:
# txt += f' "{track.title}" by {track.artist}'
# txt += f' ({track.path})\n'
# txt += "\n"
# # Check whether to proceed if there were potential matches
# if txt:
# txt += "Proceed with import?"
# result = QMessageBox.question(self,
# "Possible duplicates",
# txt,
# QMessageBox.Ok,
# QMessageBox.Cancel
# )
# if result == QMessageBox.Cancel:
# return
#
# # Import in separate thread
# thread = threading.Thread(target=self._import_tracks,
# args=(new_tracks,))
# thread.start()
#
# def _import_tracks(self, tracks: list):
# """
# Import passed files. Don't use parent session as that may be invalid
# by the time we need it.
# """
#
# with Session() as session:
# for (fname, tags) in tracks:
# track = create_track_from_file(session, fname, tags=tags)
# # Add to playlist on screen
# # If we don't specify "repaint=False", playlist will
# # also be saved to database
# self.visible_playlist_tab().insert_track(session, track)
def import_track(self) -> None:
"""Import track file"""
dlg = QFileDialog()
dlg.setFileMode(QFileDialog.ExistingFiles)
dlg.setViewMode(QFileDialog.Detail)
dlg.setDirectory(Config.IMPORT_DESTINATION)
dlg.setNameFilter("Music files (*.flac *.mp3)")
if not dlg.exec_():
return
with Session() as session:
txt = ""
new_tracks = []
for fname in dlg.selectedFiles():
tags = helpers.get_tags(fname)
new_tracks.append((fname, tags))
title = tags['title']
artist = tags['artist']
possible_matches = Tracks.search_titles(session, title)
if possible_matches:
txt += 'Similar to new track '
txt += f'"{title}" by "{artist} ({fname})":\n\n'
for track in possible_matches:
txt += f' "{track.title}" by {track.artist}'
txt += f' ({track.path})\n\n'
txt += "\n"
# Check whether to proceed if there were potential matches
txt += "Proceed with import?"
result = QMessageBox.question(self,
"Possible duplicates",
txt,
QMessageBox.Ok,
QMessageBox.Cancel
)
if result == QMessageBox.Cancel:
return
# Import in separate thread
thread = threading.Thread(target=self._import_tracks,
args=(new_tracks,))
thread.start()
def _import_tracks(self, tracks: list):
"""
Create track objects from passed files and add to visible playlist
"""
with Session() as session:
for (fname, tags) in tracks:
track = create_track_from_file(session, fname, tags=tags)
self.visible_playlist_tab().insert_track(session, track)
def load_last_playlists(self) -> None:
"""Load the playlists that were open when the last session closed"""

View File

@ -579,9 +579,16 @@ class PlaylistTab(QTableWidget):
self._set_unreadable_row(row)
else:
# This is a section header so make empty items (row
# background won't be coloured without items present). Any
# notes should displayed starting in column 0
# This is a section header so it must have note text
if row_data.note is None:
log.debug(
f"insert_row({row_data=}) with no track_id and no note"
)
return
# Make empty items (row background won't be coloured without
# items present). Any notes should displayed starting in
# column 0
for i in range(2, len(columns) - 1):
self.setItem(row, i, QTableWidgetItem())
notes_item = QTableWidgetItem(row_data.note)

View File

@ -1,27 +1,24 @@
# #!/usr/bin/env python
#
# import argparse
# import os
# import shutil
# import tempfile
#
# import helpers
# from config import Config
# from helpers import (
# fade_point,
# get_audio_segment,
# get_tags,
# leading_silence,
# trailing_silence,
# )
import os
import shutil
import tempfile
from config import Config
from helpers import (
fade_point,
get_audio_segment,
get_tags,
leading_silence,
trailing_silence,
)
# from log import log.debug, log.info
# from models import Notes, Playdates, Session, Tracks
# from mutagen.flac import FLAC
# from mutagen.mp3 import MP3
# from pydub import effects
#
# # Globals (I know)
# messages = []
from models import Tracks
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
from pydub import effects
#
#
# def main():
@ -53,79 +50,83 @@
# log.info("No action specified")
#
# log.debug("Finished")
#
#
# def create_track_from_file(session, path, normalise=None, tags=None):
# """
# Create track in database from passed path, or update database entry
# if path already in database.
#
# Return track.
# """
#
# if not tags:
# t = get_tags(path)
# else:
# t = tags
#
# track = Tracks.get_or_create(session, path)
# track.title = t['title']
# track.artist = t['artist']
# audio = get_audio_segment(path)
# track.duration = len(audio)
# track.start_gap = leading_silence(audio)
# track.fade_at = round(fade_point(audio) / 1000,
# Config.MILLISECOND_SIGFIGS) * 1000
# track.silence_at = round(trailing_silence(audio) / 1000,
# Config.MILLISECOND_SIGFIGS) * 1000
# track.mtime = os.path.getmtime(path)
# session.commit()
#
# if normalise or normalise is None and Config.NORMALISE_ON_IMPORT:
# # Check type
# ftype = os.path.splitext(path)[1][1:]
# if ftype not in ['mp3', 'flac']:
# log.info(f"File type {ftype} not implemented")
# return track
#
# # Get current file gid, uid and permissions
# stats = os.stat(path)
# try:
# # Copy original file
# fd, temp_path = tempfile.mkstemp()
# shutil.copyfile(path, temp_path)
# except Exception as err:
# log.debug(f"songdb.create_track_from_file({path}): err1: {repr(err)}")
# return
#
# # Overwrite original file with normalised output
# normalised = effects.normalize(audio)
# try:
# normalised.export(path, format=os.path.splitext(path)[1][1:])
# # Fix up permssions and ownership
# os.chown(path, stats.st_uid, stats.st_gid)
# os.chmod(path, stats.st_mode)
# # Copy tags
# if ftype == 'flac':
# tag_handler = FLAC
# elif ftype == 'mp3':
# tag_handler = MP3
# else:
# return track
# src = tag_handler(temp_path)
# dst = tag_handler(path)
# for tag in src:
# dst[tag] = src[tag]
# dst.save()
# except Exception as err:
# log.debug(f"songdb.create_track_from_file({path}): err2: {repr(err)}")
# # Restore original file
# shutil.copyfile(path, temp_path)
# finally:
# if os.path.exists(temp_path):
# os.remove(temp_path)
#
# return track
def create_track_from_file(session, path, normalise=None, tags=None):
"""
Create track in database from passed path, or update database entry
if path already in database.
Return track.
"""
if not tags:
t = get_tags(path)
else:
t = tags
track = Tracks.get_or_create(session, path)
track.title = t['title']
track.artist = t['artist']
audio = get_audio_segment(path)
track.duration = len(audio)
track.start_gap = leading_silence(audio)
track.fade_at = round(fade_point(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.silence_at = round(trailing_silence(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
track.mtime = os.path.getmtime(path)
session.commit()
if normalise or normalise is None and Config.NORMALISE_ON_IMPORT:
# Check type
ftype = os.path.splitext(path)[1][1:]
if ftype not in ['mp3', 'flac']:
log.info(f"File type {ftype} not implemented")
return track
# Get current file gid, uid and permissions
stats = os.stat(path)
try:
# Copy original file
fd, temp_path = tempfile.mkstemp()
shutil.copyfile(path, temp_path)
except Exception as err:
log.debug(
f"utilities.create_track_from_file({path}): err1: {repr(err)}"
)
return
# Overwrite original file with normalised output
normalised = effects.normalize(audio)
try:
normalised.export(path, format=os.path.splitext(path)[1][1:])
# Fix up permssions and ownership
os.chown(path, stats.st_uid, stats.st_gid)
os.chmod(path, stats.st_mode)
# Copy tags
if ftype == 'flac':
tag_handler = FLAC
elif ftype == 'mp3':
tag_handler = MP3
else:
return track
src = tag_handler(temp_path)
dst = tag_handler(path)
for tag in src:
dst[tag] = src[tag]
dst.save()
except Exception as err:
log.debug(
f"utilities.create_track_from_file({path}): err2: {repr(err)}"
)
# Restore original file
shutil.copyfile(path, temp_path)
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
return track
#
#
# def full_update_db(session):