Compare commits

..

No commits in common. "61311f67fef3bf219c790b96c793c6adf11b2808" and "92bdf216ca620d16b27f2decb6dbe8b852f2fb75" have entirely different histories.

4 changed files with 272 additions and 217 deletions

View File

@ -545,12 +545,18 @@ class Tracks(Base):
session.add(self)
session.commit()
@staticmethod
def get_all_paths(session) -> List[str]:
"""Return a list of paths of all tracks"""
return session.execute(select(Tracks.path)).scalars().all()
#
# @staticmethod
# def get_all_paths(session) -> List[str]:
# """Return a list of paths of all tracks"""
#
# return [a[0] for a in session.query(Tracks.path).all()]
#
# @classmethod
# def get_all_tracks(cls, session: Session) -> List["Tracks"]:
# """Return a list of all tracks"""
#
# return session.query(cls).all()
@classmethod
def get_or_create(cls, session: Session, path: str) -> "Tracks":
@ -566,18 +572,48 @@ class Tracks(Base):
return track
@classmethod
def get_by_path(cls, session: Session, path: str) -> "Tracks":
"""
Return track with passed path, or None.
"""
return (
session.execute(
select(Tracks)
.where(Tracks.path == path)
).scalar_one()
)
# @classmethod
# def get_by_filename(cls, session: Session, filename: str) \
# -> Optional["Tracks"]:
# """
# Return track if one and only one track in database has passed
# filename (ie, basename of path). Return None if zero or more
# than one track matches.
# """
#
# log.debug(f"Tracks.get_track_from_filename({filename=})")
# try:
# track = session.query(Tracks).filter(Tracks.path.ilike(
# f'%{os.path.sep}{filename}')).one()
# return track
# except (NoResultFound, MultipleResultsFound):
# return None
#
# @classmethod
# def get_by_path(cls, session: Session, path: str) ->
# List["Tracks"]:
# """
# Return track with passee path, or None.
# """
#
# log.debug(f"Tracks.get_track_from_path({path=})")
#
# return session.query(Tracks).filter(Tracks.path ==
# path).first()
#
# @classmethod
# def get_by_id(cls, session: Session, track_id: int) ->
# Optional["Tracks"]:
# """Return track or None"""
#
# try:
# log.debug(f"Tracks.get_track(track_id={track_id})")
# track = session.query(Tracks).filter(Tracks.id ==
# track_id).one()
# return track
# except NoResultFound:
# log.error(f"get_track({track_id}): not found")
# return None
def rescan(self, session: Session) -> None:
"""

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python
from log import log
import argparse
# import argparse
import sys
import threading
@ -41,7 +41,7 @@ from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist # type: ignore
from ui.downloadcsv_ui import Ui_DateSelect # type: ignore
from config import Config
from ui.main_window_ui import Ui_MainWindow # type: ignore
from utilities import create_track_from_file, check_db
from utilities import create_track_from_file # , update_db
class TrackData:
@ -1096,35 +1096,30 @@ class SelectPlaylistDialog(QDialog):
if __name__ == "__main__":
"""
If command line arguments given, carry out requested function and
exit. Otherwise run full application.
"""
p = argparse.ArgumentParser()
# Only allow at most one option to be specified
group = p.add_mutually_exclusive_group()
group.add_argument('-c', '--check-database',
action="store_true", dest="check_db",
default=False, help="Check and report on database")
# group.add_argument('-f', '--full-update',
# action="store_true", dest="full_update",
# p = argparse.ArgumentParser()
# # Only allow at most one option to be specified
# group = p.add_mutually_exclusive_group()
# group.add_argument('-u', '--update',
# action="store_true", dest="update",
# default=False, help="Update database")
# group.add_argument('-i', '--import', dest="fname", help="Input
# # group.add_argument('-f', '--full-update',
# # action="store_true", dest="full_update",
# # default=False, help="Update database")
# # group.add_argument('-i', '--import', dest="fname", help="Input
# file")
args = p.parse_args()
# Run as required
if args.check_db:
log.debug("Updating database")
with Session() as session:
check_db(session)
# elif args.full_update:
# log.debug("Full update of database")
# args = p.parse_args()
#
# # Run as required
# if args.update:
# log.debug("Updating database")
# with Session() as session:
# full_update_db(session)
else:
# Normal run
# update_db(session)
# # elif args.full_update:
# # log.debug("Full update of database")
# # with Session() as session:
# # full_update_db(session)
# else:
# # Normal run
try:
Base.metadata.create_all(engine)
app = QApplication(sys.argv)

View File

@ -151,8 +151,6 @@ class PlaylistTab(QTableWidget):
self.selecting_in_progress = False
# Connect signals
self.horizontalHeader().sectionResized.connect(self._column_resize)
# self.horizontalHeader().sectionClicked.connect(self._header_click)
# self.setSortingEnabled(True)
# Now load our tracks and notes
self.populate(session, self.playlist_id)
@ -625,7 +623,7 @@ class PlaylistTab(QTableWidget):
- Note start time
- Mark next-track row as current
- Mark current row as played
- Scroll to put next track as required
- Scroll to put current track as required
- Set next track
- Update display
"""
@ -642,6 +640,14 @@ class PlaylistTab(QTableWidget):
# Mark current row as played
self._set_played_row(session, current_row)
# Scroll to put current track Config.SCROLL_TOP_MARGIN from the
# top. Rows number from zero, so set (current_row -
# Config.SCROLL_TOP_MARGIN + 1) row to be top row
top_row = max(0, current_row - Config.SCROLL_TOP_MARGIN + 1)
scroll_item = self.item(top_row, 0)
self.scrollToItem(scroll_item, QAbstractItemView.PositionAtTop)
# Set next track
search_from = current_row + 1
next_row = self._find_next_track_row(session, search_from)
@ -1068,16 +1074,6 @@ class PlaylistTab(QTableWidget):
section_start_plr,
self._get_section_timing_string(section_time, no_end=True)
)
# Scroll to put next track Config.SCROLL_TOP_MARGIN from the
# top. Rows number from zero, so set (current_row -
# Config.SCROLL_TOP_MARGIN + 1) row to be top row
if next_row is not None:
top_row = max(0, next_row - Config.SCROLL_TOP_MARGIN + 1)
scroll_item = self.item(top_row, 0)
self.scrollToItem(scroll_item, QAbstractItemView.PositionAtTop)
#
# # ########## Internally called functions ##########
@ -1344,11 +1340,6 @@ class PlaylistTab(QTableWidget):
return self._meta_search(RowMeta.UNREADABLE, one=False)
# def _header_click(self, index: int) -> None:
# """Handle playlist header click"""
# print(f"_header_click({index=})")
def _info_row(self, track_id: int) -> None:
"""Display popup with info re row"""

View File

@ -1,5 +1,6 @@
# #!/usr/bin/env python
#
# import argparse
import os
import shutil
import tempfile
@ -12,11 +13,43 @@ from helpers import (
leading_silence,
trailing_silence,
)
from log import log
# from log import log.debug, log.info
# from models import Notes, Playdates, Session, Tracks
from models import Tracks
from mutagen.flac import FLAC # type: ignore
from mutagen.mp3 import MP3 # type: ignore
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
from pydub import effects
#
#
# def main():
# """Main loop"""
#
# log.debug("Starting")
#
# p = argparse.ArgumentParser()
# # Only allow one option to be specified
# group = p.add_mutually_exclusive_group()
# group.add_argument('-u', '--update',
# action="store_true", dest="update",
# default=False, help="Update database")
# group.add_argument('-f', '--full-update',
# action="store_true", dest="full_update",
# default=False, help="Update database")
# args = p.parse_args()
#
# # Run as required
# if args.update:
# log.debug("Updating database")
# with Session() as session:
# update_db(session)
# elif args.full_update:
# log.debug("Full update of database")
# with Session() as session:
# full_update_db(session)
# else:
# log.info("No action specified")
#
# log.debug("Finished")
def create_track_from_file(session, path, normalise=None, tags=None):
@ -94,144 +127,141 @@ def create_track_from_file(session, path, normalise=None, tags=None):
os.remove(temp_path)
return track
# def full_update_db(session):
# """Rescan all entries in database"""
#
# def log(msg):
# log.info(f"full_update_db(): {msg}")
#
# def check_change(track_id, title, attribute, old, new):
# if new > (old * 1.1) or new < (old * 0.9):
# log(
# "\n"
# f"track[{track_id}] ({title}) "
# f"{attribute} updated from {old} to {new}"
# )
#
# # Start with normal update to add new tracks and remove any missing
# # files
# log("update_db()")
# update_db(session)
#
# # Now update track length, silence and fade for every track in
# # database
#
# tracks = Tracks.get_all_tracks(session)
# total_tracks = len(tracks)
# log(f"Processing {total_tracks} tracks")
# track_count = 0
# for track in tracks:
# track_count += 1
# print(f"\rTrack {track_count} of {total_tracks}", end='')
#
# # Sanity check
# tag = get_music_info(track.path)
# if not tag['title']:
# log(f"track[{track.id}] {track.title=}: No tag title")
# continue
# if not tag['artist']:
# log(f"track[{track.id}] {track.artist=}: No tag artist")
# continue
#
# # Update title and artist
# if track.title != tag['title']:
# track.title = tag['title']
# if track.artist != tag['artist']:
# track.artist = tag['artist']
#
# # Update numbers; log if more than 10% different
# duration = int(round(
# tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000)
# check_change(track.id, track.title, "duration", track.duration,
# duration)
# track.duration = duration
#
# audio = get_audio_segment(track.path)
#
# start_gap = leading_silence(audio)
# check_change(track.id, track.title, "start_gap", track.start_gap,
# start_gap)
# track.start_gap = start_gap
#
# fade_at = fade_point(audio)
# check_change(track.id, track.title, "fade_at", track.fade_at,
# fade_at)
# track.fade_at = fade_at
#
# silence_at = trailing_silence(audio)
# check_change(track.id, track.title, "silence_at", track.silence_at,
# silence_at)
# track.silence_at = silence_at
# session.commit()
def check_db(session):
"""
Database consistency check.
A report is generated if issues are found, but there are no automatic
corrections made.
Search for tracks that are in the music directory but not the datebase
Check all paths in database exist
"""
db_paths = set(Tracks.get_all_paths(session))
os_paths_list = []
for root, dirs, files in os.walk(Config.ROOT):
for f in files:
path = os.path.join(root, f)
ext = os.path.splitext(f)[1]
if ext in [".flac", ".mp3"]:
os_paths_list.append(path)
os_paths = set(os_paths_list)
# Find any files in music directory that are not in database
files_not_in_db = list(os_paths - db_paths)
# Find paths in database missing in music directory
paths_not_found = []
missing_file_count = 0
more_files_to_report = False
for path in list(db_paths - os_paths):
if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT:
more_files_to_report = True
break
missing_file_count += 1
track = Tracks.get_by_path(session, path)
if not track:
# This shouldn't happen as we're looking for paths in
# database that aren't in filesystem, but just in case...
log.error(f"update_db: {path} not found in db")
continue
paths_not_found.append(track)
# Output messages (so if running via cron, these will get sent to
# user)
if files_not_in_db:
print("Files in music directory but not in database")
print("--------------------------------------------")
print("\n".join(files_not_in_db))
print("\n")
if paths_not_found:
print("Invalid paths in database")
print("-------------------------")
for t in paths_not_found:
print(f"""
Track ID: {t.id}
Path: {t.path}
Title: {t.title}
Artist: {t.artist}
""")
if more_files_to_report:
print("There were more paths than listed that were not found")
#
#
# def full_update_db(session):
# """Rescan all entries in database"""
#
# def log(msg):
# log.info(f"full_update_db(): {msg}")
#
# def check_change(track_id, title, attribute, old, new):
# if new > (old * 1.1) or new < (old * 0.9):
# log(
# "\n"
# f"track[{track_id}] ({title}) "
# f"{attribute} updated from {old} to {new}"
# )
#
# # Start with normal update to add new tracks and remove any missing
# # files
# log("update_db()")
# update_db(session)
#
# # Now update track length, silence and fade for every track in
# # database
#
# tracks = Tracks.get_all_tracks(session)
# total_tracks = len(tracks)
# log(f"Processing {total_tracks} tracks")
# track_count = 0
# for track in tracks:
# track_count += 1
# print(f"\rTrack {track_count} of {total_tracks}", end='')
#
# # Sanity check
# tag = get_music_info(track.path)
# if not tag['title']:
# log(f"track[{track.id}] {track.title=}: No tag title")
# continue
# if not tag['artist']:
# log(f"track[{track.id}] {track.artist=}: No tag artist")
# continue
#
# # Update title and artist
# if track.title != tag['title']:
# track.title = tag['title']
# if track.artist != tag['artist']:
# track.artist = tag['artist']
#
# # Update numbers; log if more than 10% different
# duration = int(round(
# tag['duration'], Config.MILLISECOND_SIGFIGS) * 1000)
# check_change(track.id, track.title, "duration", track.duration,
# duration)
# track.duration = duration
#
# audio = get_audio_segment(track.path)
#
# start_gap = leading_silence(audio)
# check_change(track.id, track.title, "start_gap", track.start_gap,
# start_gap)
# track.start_gap = start_gap
#
# fade_at = fade_point(audio)
# check_change(track.id, track.title, "fade_at", track.fade_at,
# fade_at)
# track.fade_at = fade_at
#
# silence_at = trailing_silence(audio)
# check_change(track.id, track.title, "silence_at", track.silence_at,
# silence_at)
# track.silence_at = silence_at
# session.commit()
#
#
# def update_db(session):
# """
# Repopulate database
# """
#
# # Search for tracks that are in the music directory but not the datebase
# # Check all paths in database exist
# # If issues found, write to stdout but do not try to resolve them
#
# db_paths = set(Tracks.get_all_paths(session))
#
# os_paths_list = []
# for root, dirs, files in os.walk(Config.ROOT):
# for f in files:
# path = os.path.join(root, f)
# ext = os.path.splitext(f)[1]
# if ext in [".flac", ".mp3"]:
# os_paths_list.append(path)
# os_paths = set(os_paths_list)
#
# # Find any files in music directory that are not in database
# files_not_in_db = list(os_paths - db_paths)
#
# # Find paths in database missing in music directory
# paths_not_found = []
# missing_file_count = 0
# more_files_to_report = False
# for path in list(db_paths - os_paths):
# if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT:
# more_files_to_report = True
# break
#
# missing_file_count += 1
#
# track = Tracks.get_by_path(session, path)
# if not track:
# log.error(f"update_db: {path} not found in db")
# continue
#
# paths_not_found.append(track)
#
# # Output messages (so if running via cron, these will get sent to
# # user)
# if files_not_in_db:
# print("Files in music directory but not in database")
# print("--------------------------------------------")
# print("\n".join(files_not_in_db))
# print("\n")
# if paths_not_found:
# print("Invalid paths in database")
# print("-------------------------")
# for t in paths_not_found:
# print(f"""
# Track ID: {t.id}
# Path: {t.path}
# Title: {t.title}
# Artist: {t.artist}
# """)
# if more_files_to_report:
# print("There were more paths than listed that were not found")
#
#
# # Spike
# #
# # # Manage tracks listed in database but where path is invalid
@ -258,3 +288,6 @@ def check_db(session):
# #
# # # Remove Track entry pointing to invalid path
# # Tracks.remove_by_path(session, path)
#
# if __name__ == '__main__' and '__file__' in globals():
# main()