641 lines
19 KiB
Python
641 lines
19 KiB
Python
#!/usr/bin/python3
|
|
|
|
import os.path
|
|
import re
|
|
|
|
import sqlalchemy
|
|
|
|
from datetime import datetime
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy import (
|
|
Boolean,
|
|
Column,
|
|
DateTime,
|
|
Float,
|
|
ForeignKey,
|
|
Integer,
|
|
String,
|
|
func
|
|
)
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
|
|
from sqlalchemy.orm import relationship, sessionmaker, scoped_session
|
|
|
|
from config import Config
|
|
from log import DEBUG, ERROR
|
|
|
|
# Create session at the global level as per
|
|
# https://docs.sqlalchemy.org/en/13/orm/session_basics.html
|
|
|
|
Base = declarative_base()
|
|
Session = scoped_session(sessionmaker())
|
|
|
|
|
|
def db_init():
|
|
# Set up database connection
|
|
|
|
global Session
|
|
|
|
engine = sqlalchemy.create_engine(
|
|
f"{Config.MYSQL_CONNECT}?charset=utf8",
|
|
encoding='utf-8',
|
|
echo=Config.DISPLAY_SQL,
|
|
pool_pre_ping=True)
|
|
|
|
Session.configure(bind=engine)
|
|
Base.metadata.create_all(engine)
|
|
|
|
# Create a Session factory
|
|
Session = sessionmaker(bind=engine)
|
|
|
|
|
|
# Database classes
|
|
class NoteColours(Base):
|
|
__tablename__ = 'notecolours'
|
|
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
substring = Column(String(256), index=False)
|
|
colour = Column(String(21), index=False)
|
|
enabled = Column(Boolean, default=True, index=True)
|
|
is_regex = Column(Boolean, default=False, index=False)
|
|
is_casesensitive = Column(Boolean, default=False, index=False)
|
|
order = Column(Integer, index=True)
|
|
|
|
def __repr__(self):
|
|
return (
|
|
f"<NoteColour(id={self.id}, substring={self.substring}, "
|
|
f"colour={self.colour}>"
|
|
)
|
|
|
|
@classmethod
|
|
def get_all(cls, session):
|
|
"""Return all records"""
|
|
|
|
return session.query(cls).all()
|
|
|
|
@classmethod
|
|
def get_by_id(cls, session, note_id):
|
|
"""Return record identified by id, or None if not found"""
|
|
|
|
return session.query(NoteColours).filter(
|
|
NoteColours.id == note_id).first()
|
|
|
|
@staticmethod
|
|
def get_colour(session, text):
|
|
"""
|
|
Parse text and return colour string if matched, else None
|
|
"""
|
|
|
|
for rec in (
|
|
session.query(NoteColours)
|
|
.filter(NoteColours.enabled.is_(True))
|
|
.order_by(NoteColours.order)
|
|
.all()
|
|
):
|
|
if rec.is_regex:
|
|
flags = re.UNICODE
|
|
if not rec.is_casesensitive:
|
|
flags |= re.IGNORECASE
|
|
p = re.compile(rec.substring, flags)
|
|
if p.match(text):
|
|
return rec.colour
|
|
else:
|
|
if rec.is_casesensitive:
|
|
if rec.substring in text:
|
|
return rec.colour
|
|
else:
|
|
if rec.substring.lower() in text.lower():
|
|
return rec.colour
|
|
|
|
return None
|
|
|
|
|
|
class Notes(Base):
|
|
__tablename__ = 'notes'
|
|
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
playlist_id = Column(Integer, ForeignKey('playlists.id'))
|
|
playlist = relationship("Playlists", back_populates="notes")
|
|
row = Column(Integer, nullable=False)
|
|
note = Column(String(256), index=False)
|
|
|
|
def __init__(self, session, playlist_id, row, text):
|
|
"""Create note"""
|
|
|
|
DEBUG(f"Notes.__init__({playlist_id=}, {row=}, {text=})")
|
|
self.playlist_id = playlist_id
|
|
self.row = row
|
|
self.note = text
|
|
session.add(self)
|
|
session.commit()
|
|
|
|
def __repr__(self):
|
|
return (
|
|
f"<Note(id={self.id}, row={self.row}, note={self.note}>"
|
|
)
|
|
|
|
def delete_note(self, session):
|
|
"""Delete note"""
|
|
|
|
DEBUG(f"delete_note({self.id=}")
|
|
|
|
session.query(self).filter(id == self.id).delete()
|
|
session.commit()
|
|
|
|
def update_note(self, session, row, text=None):
|
|
"""
|
|
Update note details. If text=None, don't change text.
|
|
"""
|
|
|
|
DEBUG(f"Notes.update_note({self.id=}, {row=}, {text=})")
|
|
|
|
note = session.query(self).filter(id == self.id).one()
|
|
note.row = row
|
|
if text:
|
|
note.note = text
|
|
session.commit()
|
|
|
|
|
|
class Playdates(Base):
|
|
__tablename__ = 'playdates'
|
|
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
lastplayed = Column(DateTime, index=True, default=None)
|
|
track_id = Column(Integer, ForeignKey('tracks.id'))
|
|
tracks = relationship("Tracks", back_populates="playdates")
|
|
|
|
@classmethod
|
|
def add_playdate(cls, session, track):
|
|
"""Record that track was played"""
|
|
|
|
DEBUG(f"add_playdate(track={track})")
|
|
pd = Playdates()
|
|
pd.lastplayed = datetime.now()
|
|
pd.track_id = track.id
|
|
session.add(pd)
|
|
track.update_lastplayed(session)
|
|
session.commit()
|
|
|
|
return pd
|
|
|
|
@staticmethod
|
|
def last_played(session, track_id):
|
|
"""Return datetime track last played or None"""
|
|
|
|
last_played = session.query(Playdates.lastplayed).filter(
|
|
(Playdates.track_id == track_id)
|
|
).order_by(Playdates.lastplayed.desc()).first()
|
|
if last_played:
|
|
return last_played[0]
|
|
else:
|
|
return None
|
|
|
|
@staticmethod
|
|
def remove_track(session, track_id):
|
|
"""
|
|
Remove all records of track_id
|
|
"""
|
|
|
|
session.query(Playdates).filter(
|
|
Playdates.track_id == track_id,
|
|
).delete()
|
|
session.commit()
|
|
|
|
|
|
class Playlists(Base):
|
|
"""
|
|
Usage:
|
|
|
|
pl = session.query(Playlists).filter(Playlists.id == 1).one()
|
|
|
|
pl
|
|
<Playlist(id=1, name=Default>
|
|
|
|
pl.tracks
|
|
[<__main__.PlaylistTracks at 0x7fcd20181c18>,
|
|
<__main__.PlaylistTracks at 0x7fcd20181c88>,
|
|
<__main__.PlaylistTracks at 0x7fcd20181be0>,
|
|
<__main__.PlaylistTracks at 0x7fcd20181c50>]
|
|
|
|
[a.tracks for a in pl.tracks]
|
|
[<Track(id=3992, title=Yesterday Man, artist=Various, path=/h[...]
|
|
<Track(id=2238, title=These Boots Are Made For Walkin', arti[...]
|
|
<Track(id=3837, title=Babe, artist=Various, path=/home/kae/m[...]
|
|
<Track(id=2332, title=Such Great Heights - Remastered, artis[...]]
|
|
|
|
glue = PlaylistTracks(row=5)
|
|
|
|
tr = session.query(Tracks).filter(Tracks.id == 676).one()
|
|
|
|
tr
|
|
<Track(id=676, title=Seven Nation Army, artist=White Stripes,
|
|
path=/home/kae/music/White Stripes/Elephant/01. Seven Nation Army.flac>
|
|
|
|
glue.track_id = tr.id
|
|
|
|
pl.tracks.append(glue)
|
|
|
|
session.commit()
|
|
|
|
[a.tracks for a in pl.tracks]
|
|
[<Track(id=3992, title=Yesterday Man, artist=Various, path=/h[...]
|
|
<Track(id=2238, title=These Boots Are Made For Walkin', arti[...]
|
|
<Track(id=3837, title=Babe, artist=Various, path=/home/kae/m[...]
|
|
<Track(id=2332, title=Such Great Heights - Remastered, artis[...]
|
|
<Track(id=676, title=Seven Nation Army, artist=White Stripes[...]]
|
|
"""
|
|
|
|
__tablename__ = "playlists"
|
|
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
name = Column(String(32), nullable=False, unique=True)
|
|
last_used = Column(DateTime, default=None, nullable=True)
|
|
loaded = Column(Boolean, default=True, nullable=False)
|
|
notes = relationship("Notes",
|
|
order_by="Notes.row",
|
|
back_populates="playlist")
|
|
tracks = relationship("PlaylistTracks",
|
|
order_by="PlaylistTracks.row",
|
|
back_populates="playlists")
|
|
|
|
def __init__(self, session, name):
|
|
self.name = name
|
|
session.add(playlist)
|
|
session.commit()
|
|
|
|
def __repr__(self):
|
|
return f"<Playlists(id={self.id}, name={self.name}>"
|
|
|
|
def add_note(self, session, row, text):
|
|
"""Add note to playlist at passed row"""
|
|
|
|
return Notes(session, self.id, row, text)
|
|
|
|
def add_track(self, session, track, row=None):
|
|
"""
|
|
Add track to playlist at given row.
|
|
If row=None, add to end of playlist
|
|
"""
|
|
|
|
if not row:
|
|
last_row = session.query(
|
|
func.max(PlaylistTracks.row)
|
|
).filter_by(playlist_id=self.id).first()
|
|
if last_row:
|
|
row = last_row[0] + 1
|
|
else:
|
|
row = 0
|
|
|
|
PlaylistTracks(session, self.id, track.id, row)
|
|
|
|
def close(self, session):
|
|
"""Record playlist as no longer loaded"""
|
|
|
|
self.loaded = False
|
|
session.add(self)
|
|
session.commit()
|
|
|
|
@classmethod
|
|
def get_all(cls, session):
|
|
"""Returns a list of all playlists ordered by last use"""
|
|
|
|
return (
|
|
session.query(cls)
|
|
.order_by(cls.last_used.desc())
|
|
).all()
|
|
|
|
@classmethod
|
|
def get_by_id(cls, session, playlist_id):
|
|
return (session.query(cls).filter(cls.id == playlist_id)).one()
|
|
|
|
@classmethod
|
|
def get_closed(cls, session):
|
|
"""Returns a list of all closed playlists ordered by last use"""
|
|
|
|
return (
|
|
session.query(cls)
|
|
.filter(cls.loaded.is_(False))
|
|
.order_by(cls.last_used.desc())
|
|
).all()
|
|
|
|
@classmethod
|
|
def get_open(cls, session):
|
|
"""
|
|
Return a list of playlists marked "loaded", ordered by loaded date.
|
|
"""
|
|
|
|
return (
|
|
session.query(cls)
|
|
.filter(cls.loaded.is_(True))
|
|
.order_by(cls.last_used.desc())
|
|
).all()
|
|
|
|
def mark_open(self, session):
|
|
"""Mark playlist as loaded and used now"""
|
|
|
|
self.loaded = True
|
|
self.last_used = datetime.now()
|
|
session.add(self)
|
|
session.commit()
|
|
|
|
def remove_all_tracks(self, session):
|
|
"""
|
|
Remove all tracks from this playlist
|
|
"""
|
|
|
|
session.query(PlaylistTracks).filter(
|
|
PlaylistTracks.playlist_id == self.id,
|
|
).delete()
|
|
session.commit()
|
|
|
|
def remove_track(self, session, row):
|
|
|
|
DEBUG(f"Playlist.remove_track({playlist_id=}, {row=})")
|
|
|
|
session.query(PlaylistTracks).filter(
|
|
PlaylistTracks.playlist_id == self.id,
|
|
PlaylistTracks.row == row
|
|
).delete()
|
|
session.commit()
|
|
|
|
|
|
class PlaylistTracks(Base):
|
|
__tablename__ = 'playlisttracks'
|
|
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
playlist_id = Column(Integer, ForeignKey('playlists.id'), primary_key=True)
|
|
track_id = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
|
|
row = Column(Integer, nullable=False)
|
|
tracks = relationship("Tracks", back_populates="playlists")
|
|
playlists = relationship("Playlists", back_populates="tracks")
|
|
|
|
def __init__(self, session, playlist_id, track_id, row):
|
|
DEBUG(f"PlaylistTracks.__init__({playlist_id=}, {track_id=}, {row=})")
|
|
|
|
self.playlist_id = playlist_id,
|
|
self.track_id = track_id,
|
|
self.row = row
|
|
session.add(self)
|
|
session.commit()
|
|
|
|
@staticmethod
|
|
def get_track_playlists(session, track_id):
|
|
"""Return all PlaylistTracks objects with this track_id"""
|
|
|
|
return session.query(PlaylistTracks).filter(
|
|
PlaylistTracks.track_id == track_id).all()
|
|
|
|
# @staticmethod
|
|
# def move_track(session, from_playlist_id, row, to_playlist_id):
|
|
|
|
# DEBUG(
|
|
# "PlaylistTracks.move_tracks(from_playlist_id="
|
|
# f"{from_playlist_id}, row={row}, "
|
|
# f"to_playlist_id={to_playlist_id})"
|
|
# )
|
|
# max_row = session.query(func.max(PlaylistTracks.row)).filter(
|
|
# PlaylistTracks.playlist_id == to_playlist_id).scalar()
|
|
# if max_row is None:
|
|
# # Destination playlist is empty; use row 0
|
|
# new_row = 0
|
|
# else:
|
|
# # Destination playlist has tracks; add to end
|
|
# new_row = max_row + 1
|
|
# try:
|
|
# record = session.query(PlaylistTracks).filter(
|
|
# PlaylistTracks.playlist_id == from_playlist_id,
|
|
# PlaylistTracks.row == row).one()
|
|
# except NoResultFound:
|
|
# # Issue #38?
|
|
# ERROR(
|
|
# f"No rows matched in query: "
|
|
# f"PlaylistTracks.playlist_id == {from_playlist_id}, "
|
|
# f"PlaylistTracks.row == {row}"
|
|
# )
|
|
# return
|
|
# record.playlist_id = to_playlist_id
|
|
# record.row = new_row
|
|
# session.commit()
|
|
|
|
|
|
class Settings(Base):
|
|
__tablename__ = 'settings'
|
|
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
name = Column(String(32), nullable=False, unique=True)
|
|
f_datetime = Column(DateTime, default=None, nullable=True)
|
|
f_int = Column(Integer, default=None, nullable=True)
|
|
f_string = Column(String(128), default=None, nullable=True)
|
|
|
|
@classmethod
|
|
def get_int(cls, session, name):
|
|
try:
|
|
int_setting = session.query(cls).filter(
|
|
cls.name == name).one()
|
|
except NoResultFound:
|
|
int_setting = Settings()
|
|
int_setting.name = name
|
|
int_setting.f_int = None
|
|
session.add(int_setting)
|
|
session.commit()
|
|
return int_setting
|
|
|
|
def update(self, session, data):
|
|
for key, value in data.items():
|
|
assert hasattr(self, key)
|
|
setattr(self, key, value)
|
|
session.commit()
|
|
|
|
|
|
class Tracks(Base):
|
|
__tablename__ = 'tracks'
|
|
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
title = Column(String(256), index=True)
|
|
artist = Column(String(256), index=True)
|
|
duration = Column(Integer, index=True)
|
|
start_gap = Column(Integer, index=False)
|
|
fade_at = Column(Integer, index=False)
|
|
silence_at = Column(Integer, index=False)
|
|
path = Column(String(2048), index=False, nullable=False)
|
|
mtime = Column(Float, index=True)
|
|
lastplayed = Column(DateTime, index=True, default=None)
|
|
playlists = relationship("PlaylistTracks", back_populates="tracks")
|
|
playdates = relationship("Playdates", back_populates="tracks")
|
|
|
|
def __init__(self, session, path):
|
|
self.path = path
|
|
session.add(self)
|
|
session.commit()
|
|
|
|
def __repr__(self):
|
|
return (
|
|
f"<Track(id={self.id}, title={self.title}, "
|
|
f"artist={self.artist}, path={self.path}>"
|
|
)
|
|
|
|
@staticmethod
|
|
def get_all_paths(session):
|
|
"""Return a list of paths of all tracks"""
|
|
|
|
return [a[0] for a in session.query(Tracks.path).all()]
|
|
|
|
@classmethod
|
|
def get_all_tracks(cls, session):
|
|
"Return a list of all tracks"
|
|
=======
|
|
@classmethod
|
|
def get_all_tracks(cls, session):
|
|
"""Return a list of all tracks"""
|
|
|
|
return session.query(cls).all()
|
|
|
|
@classmethod
|
|
def get_or_create(cls, session, path):
|
|
"""
|
|
If a track with path exists, return it;
|
|
else created new track and return it
|
|
"""
|
|
|
|
DEBUG(f"Tracks.get_or_create(path=})")
|
|
|
|
try:
|
|
track = session.query(cls).filter(cls.path == path).one()
|
|
except NoResultFound:
|
|
track = Tracks(session, path)
|
|
|
|
return track
|
|
|
|
# @staticmethod
|
|
# def get_duration(session, id):
|
|
# try:
|
|
# return session.query(
|
|
# Tracks.duration).filter(Tracks.id == id).one()[0]
|
|
# except NoResultFound:
|
|
# ERROR(f"Can't find track id {id}")
|
|
# return None
|
|
|
|
@classmethod
|
|
def get_from_filename(cls, session, filename):
|
|
"""
|
|
Return track if one and only one track in database has passed
|
|
filename (ie, basename of path). Return None if zero or more
|
|
than one track matches.
|
|
"""
|
|
|
|
DEBUG(f"Tracks.get_track_from_filename({filename=})")
|
|
try:
|
|
track = session.query(Tracks).filter(Tracks.path.ilike(
|
|
f'%{os.path.sep}{filename}')).one()
|
|
return track
|
|
except (NoResultFound, MultipleResultsFound):
|
|
return None
|
|
|
|
@classmethod
|
|
def get_from_id(cls, session, id):
|
|
return session.query(Tracks).filter(
|
|
Tracks.id == id).one()
|
|
|
|
@classmethod
|
|
def get_from_path(cls, session, path):
|
|
"""
|
|
Return track with passee path, or None.
|
|
"""
|
|
|
|
DEBUG(f"Tracks.get_track_from_path({path=})")
|
|
|
|
return session.query(Tracks).filter(Tracks.path == path).first()
|
|
|
|
# @staticmethod
|
|
# def get_path(session, track_id):
|
|
# "Return path of passed track_id, or None"
|
|
#
|
|
# try:
|
|
# return session.query(Tracks.path).filter(
|
|
# Tracks.id == track_id).one()[0]
|
|
# except NoResultFound:
|
|
# ERROR(f"Can't find track id {track_id}")
|
|
# return None
|
|
|
|
@classmethod
|
|
def get_by_id(cls, session, track_id):
|
|
"""Return track or None"""
|
|
|
|
try:
|
|
DEBUG(f"Tracks.get_track(track_id={track_id})")
|
|
track = session.query(Tracks).filter(Tracks.id == track_id).one()
|
|
return track
|
|
except NoResultFound:
|
|
ERROR(f"get_track({track_id}): not found")
|
|
return None
|
|
|
|
@staticmethod
|
|
def remove_by_path(session, path):
|
|
"Remove track with passed path from database"
|
|
|
|
DEBUG(f"Tracks.remove_path({path=})")
|
|
|
|
try:
|
|
session.query(Tracks).filter(Tracks.path == path).delete()
|
|
session.commit()
|
|
except IntegrityError as exception:
|
|
ERROR(f"Can't remove track with {path=} ({exception=})")
|
|
|
|
# @staticmethod
|
|
# def search(session, title=None, artist=None, duration=None):
|
|
# """
|
|
# Return any tracks matching passed criteria
|
|
# """
|
|
#
|
|
# DEBUG(
|
|
# f"Tracks.search({title=}, {artist=}), {duration=})"
|
|
# )
|
|
#
|
|
# if not title and not artist and not duration:
|
|
# return None
|
|
#
|
|
# q = session.query(Tracks).filter(False)
|
|
# if title:
|
|
# q = q.filter(Tracks.title == title)
|
|
# if artist:
|
|
# q = q.filter(Tracks.artist == artist)
|
|
# if duration:
|
|
# q = q.filter(Tracks.duration == duration)
|
|
#
|
|
# return q.all()
|
|
|
|
@classmethod
|
|
def search_artists(cls, session, text):
|
|
|
|
return (
|
|
session.query(Tracks)
|
|
.filter(Tracks.artist.ilike(f"%{text}%"))
|
|
.order_by(Tracks.title)
|
|
).all()
|
|
|
|
@classmethod
|
|
def search_titles(cls, session, text):
|
|
return (
|
|
session.query(Tracks)
|
|
.filter(Tracks.title.ilike(f"%{text}%"))
|
|
.order_by(Tracks.title)
|
|
).all()
|
|
|
|
def update_lastplayed(self, session):
|
|
self.lastplayed = datetime.now()
|
|
session.add(self)
|
|
session.commit()
|
|
|
|
def update_artist(self, session, artist):
|
|
self.artist = artist
|
|
session.add(self)
|
|
session.commit()
|
|
|
|
def update_title(self, session, title):
|
|
self.title = title
|
|
session.add(self)
|
|
session.commit()
|
|
|
|
def update_path(self, newpath):
|
|
self.path = newpath
|