Implement musicuster --check-database

This commit is contained in:
Keith Edmunds 2022-08-15 15:58:55 +01:00
parent 8ec0911ce4
commit 61311f67fe
3 changed files with 199 additions and 263 deletions

View File

@ -545,18 +545,12 @@ class Tracks(Base):
session.add(self) session.add(self)
session.commit() session.commit()
#
# @staticmethod @staticmethod
# def get_all_paths(session) -> List[str]: def get_all_paths(session) -> List[str]:
# """Return a list of paths of all tracks""" """Return a list of paths of all tracks"""
#
# return [a[0] for a in session.query(Tracks.path).all()] return session.execute(select(Tracks.path)).scalars().all()
#
# @classmethod
# def get_all_tracks(cls, session: Session) -> List["Tracks"]:
# """Return a list of all tracks"""
#
# return session.query(cls).all()
@classmethod @classmethod
def get_or_create(cls, session: Session, path: str) -> "Tracks": def get_or_create(cls, session: Session, path: str) -> "Tracks":
@ -572,48 +566,18 @@ class Tracks(Base):
return track return track
# @classmethod @classmethod
# def get_by_filename(cls, session: Session, filename: str) \ def get_by_path(cls, session: Session, path: str) -> "Tracks":
# -> Optional["Tracks"]: """
# """ Return track with passed path, or None.
# Return track if one and only one track in database has passed """
# filename (ie, basename of path). Return None if zero or more
# than one track matches. return (
# """ session.execute(
# select(Tracks)
# log.debug(f"Tracks.get_track_from_filename({filename=})") .where(Tracks.path == path)
# try: ).scalar_one()
# track = session.query(Tracks).filter(Tracks.path.ilike( )
# f'%{os.path.sep}{filename}')).one()
# return track
# except (NoResultFound, MultipleResultsFound):
# return None
#
# @classmethod
# def get_by_path(cls, session: Session, path: str) ->
# List["Tracks"]:
# """
# Return track with passee path, or None.
# """
#
# log.debug(f"Tracks.get_track_from_path({path=})")
#
# return session.query(Tracks).filter(Tracks.path ==
# path).first()
#
# @classmethod
# def get_by_id(cls, session: Session, track_id: int) ->
# Optional["Tracks"]:
# """Return track or None"""
#
# try:
# log.debug(f"Tracks.get_track(track_id={track_id})")
# track = session.query(Tracks).filter(Tracks.id ==
# track_id).one()
# return track
# except NoResultFound:
# log.error(f"get_track({track_id}): not found")
# return None
def rescan(self, session: Session) -> None: def rescan(self, session: Session) -> None:
""" """

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python #!/usr/bin/env python
from log import log from log import log
# import argparse import argparse
import sys import sys
import threading import threading
@ -41,7 +41,7 @@ from ui.dlg_SelectPlaylist_ui import Ui_dlgSelectPlaylist # type: ignore
from ui.downloadcsv_ui import Ui_DateSelect # type: ignore from ui.downloadcsv_ui import Ui_DateSelect # type: ignore
from config import Config from config import Config
from ui.main_window_ui import Ui_MainWindow # type: ignore from ui.main_window_ui import Ui_MainWindow # type: ignore
from utilities import create_track_from_file # , update_db from utilities import create_track_from_file, check_db
class TrackData: class TrackData:
@ -1096,30 +1096,35 @@ class SelectPlaylistDialog(QDialog):
if __name__ == "__main__": if __name__ == "__main__":
# p = argparse.ArgumentParser() """
# # Only allow at most one option to be specified If command line arguments given, carry out requested function and
# group = p.add_mutually_exclusive_group() exit. Otherwise run full application.
# group.add_argument('-u', '--update', """
# action="store_true", dest="update",
p = argparse.ArgumentParser()
# Only allow at most one option to be specified
group = p.add_mutually_exclusive_group()
group.add_argument('-c', '--check-database',
action="store_true", dest="check_db",
default=False, help="Check and report on database")
# group.add_argument('-f', '--full-update',
# action="store_true", dest="full_update",
# default=False, help="Update database") # default=False, help="Update database")
# # group.add_argument('-f', '--full-update', # group.add_argument('-i', '--import', dest="fname", help="Input
# # action="store_true", dest="full_update",
# # default=False, help="Update database")
# # group.add_argument('-i', '--import', dest="fname", help="Input
# file") # file")
# args = p.parse_args() args = p.parse_args()
#
# # Run as required # Run as required
# if args.update: if args.check_db:
# log.debug("Updating database") log.debug("Updating database")
with Session() as session:
check_db(session)
# elif args.full_update:
# log.debug("Full update of database")
# with Session() as session: # with Session() as session:
# update_db(session) # full_update_db(session)
# # elif args.full_update: else:
# # log.debug("Full update of database") # Normal run
# # with Session() as session:
# # full_update_db(session)
# else:
# # Normal run
try: try:
Base.metadata.create_all(engine) Base.metadata.create_all(engine)
app = QApplication(sys.argv) app = QApplication(sys.argv)

View File

@ -1,6 +1,5 @@
# #!/usr/bin/env python # #!/usr/bin/env python
# #
# import argparse
import os import os
import shutil import shutil
import tempfile import tempfile
@ -13,43 +12,11 @@ from helpers import (
leading_silence, leading_silence,
trailing_silence, trailing_silence,
) )
# from log import log.debug, log.info from log import log
# from models import Notes, Playdates, Session, Tracks
from models import Tracks from models import Tracks
from mutagen.flac import FLAC from mutagen.flac import FLAC # type: ignore
from mutagen.mp3 import MP3 from mutagen.mp3 import MP3 # type: ignore
from pydub import effects from pydub import effects
#
#
# def main():
# """Main loop"""
#
# log.debug("Starting")
#
# p = argparse.ArgumentParser()
# # Only allow one option to be specified
# group = p.add_mutually_exclusive_group()
# group.add_argument('-u', '--update',
# action="store_true", dest="update",
# default=False, help="Update database")
# group.add_argument('-f', '--full-update',
# action="store_true", dest="full_update",
# default=False, help="Update database")
# args = p.parse_args()
#
# # Run as required
# if args.update:
# log.debug("Updating database")
# with Session() as session:
# update_db(session)
# elif args.full_update:
# log.debug("Full update of database")
# with Session() as session:
# full_update_db(session)
# else:
# log.info("No action specified")
#
# log.debug("Finished")
def create_track_from_file(session, path, normalise=None, tags=None): def create_track_from_file(session, path, normalise=None, tags=None):
@ -127,8 +94,7 @@ def create_track_from_file(session, path, normalise=None, tags=None):
os.remove(temp_path) os.remove(temp_path)
return track return track
#
#
# def full_update_db(session): # def full_update_db(session):
# """Rescan all entries in database""" # """Rescan all entries in database"""
# #
@ -198,70 +164,74 @@ def create_track_from_file(session, path, normalise=None, tags=None):
# silence_at) # silence_at)
# track.silence_at = silence_at # track.silence_at = silence_at
# session.commit() # session.commit()
#
#
# def update_db(session): def check_db(session):
# """ """
# Repopulate database Database consistency check.
# """
# A report is generated if issues are found, but there are no automatic
# # Search for tracks that are in the music directory but not the datebase corrections made.
# # Check all paths in database exist
# # If issues found, write to stdout but do not try to resolve them Search for tracks that are in the music directory but not the datebase
# Check all paths in database exist
# db_paths = set(Tracks.get_all_paths(session)) """
#
# os_paths_list = [] db_paths = set(Tracks.get_all_paths(session))
# for root, dirs, files in os.walk(Config.ROOT):
# for f in files: os_paths_list = []
# path = os.path.join(root, f) for root, dirs, files in os.walk(Config.ROOT):
# ext = os.path.splitext(f)[1] for f in files:
# if ext in [".flac", ".mp3"]: path = os.path.join(root, f)
# os_paths_list.append(path) ext = os.path.splitext(f)[1]
# os_paths = set(os_paths_list) if ext in [".flac", ".mp3"]:
# os_paths_list.append(path)
# # Find any files in music directory that are not in database os_paths = set(os_paths_list)
# files_not_in_db = list(os_paths - db_paths)
# # Find any files in music directory that are not in database
# # Find paths in database missing in music directory files_not_in_db = list(os_paths - db_paths)
# paths_not_found = []
# missing_file_count = 0 # Find paths in database missing in music directory
# more_files_to_report = False paths_not_found = []
# for path in list(db_paths - os_paths): missing_file_count = 0
# if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT: more_files_to_report = False
# more_files_to_report = True for path in list(db_paths - os_paths):
# break if missing_file_count >= Config.MAX_MISSING_FILES_TO_REPORT:
# more_files_to_report = True
# missing_file_count += 1 break
#
# track = Tracks.get_by_path(session, path) missing_file_count += 1
# if not track:
# log.error(f"update_db: {path} not found in db") track = Tracks.get_by_path(session, path)
# continue if not track:
# # This shouldn't happen as we're looking for paths in
# paths_not_found.append(track) # database that aren't in filesystem, but just in case...
# log.error(f"update_db: {path} not found in db")
# # Output messages (so if running via cron, these will get sent to continue
# # user)
# if files_not_in_db: paths_not_found.append(track)
# print("Files in music directory but not in database")
# print("--------------------------------------------") # Output messages (so if running via cron, these will get sent to
# print("\n".join(files_not_in_db)) # user)
# print("\n") if files_not_in_db:
# if paths_not_found: print("Files in music directory but not in database")
# print("Invalid paths in database") print("--------------------------------------------")
# print("-------------------------") print("\n".join(files_not_in_db))
# for t in paths_not_found: print("\n")
# print(f""" if paths_not_found:
# Track ID: {t.id} print("Invalid paths in database")
# Path: {t.path} print("-------------------------")
# Title: {t.title} for t in paths_not_found:
# Artist: {t.artist} print(f"""
# """) Track ID: {t.id}
# if more_files_to_report: Path: {t.path}
# print("There were more paths than listed that were not found") Title: {t.title}
# Artist: {t.artist}
# """)
if more_files_to_report:
print("There were more paths than listed that were not found")
# # Spike # # Spike
# # # #
# # # Manage tracks listed in database but where path is invalid # # # Manage tracks listed in database but where path is invalid
@ -288,6 +258,3 @@ def create_track_from_file(session, path, normalise=None, tags=None):
# # # #
# # # Remove Track entry pointing to invalid path # # # Remove Track entry pointing to invalid path
# # Tracks.remove_by_path(session, path) # # Tracks.remove_by_path(session, path)
#
# if __name__ == '__main__' and '__file__' in globals():
# main()