musicmuster/app/models.py
2022-09-12 18:23:30 +01:00

599 lines
18 KiB
Python

#!/usr/bin/python3
#
import os.path
import re
#
from dbconfig import Session
#
from datetime import datetime
from typing import List, Optional
#
# from pydub import AudioSegment
from sqlalchemy.ext.associationproxy import association_proxy
# from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy import (
Boolean,
Column,
DateTime,
delete,
Float,
ForeignKey,
func,
Integer,
select,
String,
UniqueConstraint,
)
# from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import (
backref,
declarative_base,
relationship,
RelationshipProperty
)
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.exc import (
# MultipleResultsFound,
NoResultFound
)
#
from config import Config
from helpers import (
fade_point,
get_audio_segment,
get_tags,
leading_silence,
trailing_silence,
)
from log import log
#
# Declarative base class; every ORM model class in this module subclasses it
Base = declarative_base()
# Database classes
class NoteColours(Base):
    """
    Rule that maps text to a colour string.

    A rule is either a plain substring or a regular expression
    (is_regex), optionally case sensitive (is_casesensitive). Only
    enabled rules are consulted, in ascending 'order'.
    """

    __tablename__ = 'notecolours'

    id = Column(Integer, primary_key=True, autoincrement=True)
    substring = Column(String(256), index=False)
    colour = Column(String(21), index=False)
    enabled = Column(Boolean, default=True, index=True)
    is_regex = Column(Boolean, default=False, index=False)
    is_casesensitive = Column(Boolean, default=False, index=False)
    order = Column(Integer, index=True)

    def __repr__(self) -> str:
        return (
            f"<NoteColour(id={self.id}, substring={self.substring}, "
            f"colour={self.colour}>"
        )

    @staticmethod
    def get_colour(session: Session, text: str) -> Optional[str]:
        """
        Return the colour of the first enabled rule that matches text.

        Rules are tried in ascending 'order'. A regex rule is applied
        with re.match, so it has to match from the start of text; a
        plain rule matches anywhere in text. Returns None if text is
        empty or no rule matches.
        """

        if not text:
            return None

        stmt = (
            select(NoteColours)
            .filter(NoteColours.enabled.is_(True))
            .order_by(NoteColours.order)
        )

        for rule in session.execute(stmt).scalars().all():
            if rule.is_regex:
                # Always Unicode-aware; ignore case unless the rule
                # asks for a case-sensitive match
                flags = (
                    re.UNICODE
                    if rule.is_casesensitive
                    else re.UNICODE | re.IGNORECASE
                )
                matched = re.match(rule.substring, text, flags) is not None
            elif rule.is_casesensitive:
                matched = rule.substring in text
            else:
                matched = rule.substring.lower() in text.lower()

            if matched:
                return rule.colour

        return None
class Playdates(Base):
    """One record per occasion on which a track was played."""

    __tablename__ = 'playdates'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    lastplayed = Column(DateTime, index=True, default=None)
    track_id = Column(Integer, ForeignKey('tracks.id'))
    track = relationship("Tracks", back_populates="playdates")

    def __repr__(self) -> str:
        return (
            f"<Playdates(id={self.id}, track_id={self.track_id} "
            f"lastplayed={self.lastplayed}>"
        )

    def __init__(self, session: Session, track_id: int) -> None:
        """
        Record that track_id was played at the current local time.

        The new record is added to session and the session is committed.
        """

        self.lastplayed = datetime.now()
        self.track_id = track_id

        session.add(self)
        session.commit()

    @staticmethod
    def last_played(session: Session, track_id: int) -> Optional[datetime]:
        """
        Return the most recent lastplayed value recorded for track_id,
        or None if there is no record for that track.
        """

        stmt = (
            select(Playdates.lastplayed)
            .where(Playdates.track_id == track_id)
            .order_by(Playdates.lastplayed.desc())
            .limit(1)
        )
        newest = session.execute(stmt).first()

        # first() gives a one-column row, or None when nothing matched
        return newest[0] if newest else None

    @staticmethod
    def played_after(session: Session, since: datetime) -> List["Playdates"]:
        """
        Return Playdates whose lastplayed is at or after 'since',
        oldest first.
        """

        stmt = (
            select(Playdates)
            .where(Playdates.lastplayed >= since)
            .order_by(Playdates.lastplayed)
        )
        return session.execute(stmt).scalars().all()
class Playlists(Base):
    """
    A named playlist.

    'loaded' flags whether the playlist is currently open and
    'last_used' is set by mark_open(). 'rows' holds the playlist's
    PlaylistRows ordered by row_number.
    """

    __tablename__ = "playlists"

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(32), nullable=False, unique=True)
    last_used = Column(DateTime, default=None, nullable=True)
    loaded: bool = Column(Boolean, default=True, nullable=False)
    rows = relationship(
        "PlaylistRows",
        back_populates="playlist",
        cascade="all, delete-orphan",
        order_by="PlaylistRows.row_number"
    )

    def __repr__(self) -> str:
        return f"<Playlists(id={self.id}, name={self.name}>"

    def __init__(self, session: Session, name: str) -> None:
        """Create playlist 'name', add it to session and commit."""

        self.name = name

        session.add(self)
        session.commit()

    def close(self, session: Session) -> None:
        """
        Mark playlist as unloaded.

        Only the attribute is changed; session is not used here and
        nothing is flushed or committed by this method.
        """

        self.loaded = False

    def mark_open(self, session: Session) -> None:
        """
        Mark playlist as loaded and stamp last_used with the current
        local time.

        Only the attributes are changed; session is not used here and
        nothing is flushed or committed by this method.
        """

        self.loaded = True
        self.last_used = datetime.now()

    @classmethod
    def get_all(cls, session: Session) -> List["Playlists"]:
        """
        Return every playlist, sorted by 'loaded' descending and then
        by last_used descending.
        """

        stmt = select(cls).order_by(cls.loaded.desc(), cls.last_used.desc())
        return session.execute(stmt).scalars().all()

    @classmethod
    def get_closed(cls, session: Session) -> List["Playlists"]:
        """
        Return playlists whose 'loaded' flag is False, most recently
        used first.
        """

        stmt = (
            select(cls)
            .filter(cls.loaded.is_(False))
            .order_by(cls.last_used.desc())
        )
        return session.execute(stmt).scalars().all()

    @classmethod
    def get_open(cls, session: Session) -> List[Optional["Playlists"]]:
        """
        Return playlists whose 'loaded' flag is True, most recently
        used first.
        """

        stmt = (
            select(cls)
            .where(cls.loaded.is_(True))
            .order_by(cls.last_used.desc())
        )
        return session.execute(stmt).scalars().all()
class PlaylistRows(Base):
    """
    One row of a playlist.

    A row belongs to exactly one playlist and sits at 'row_number'.
    It may reference a track (track_id is nullable) and may carry a
    free-text note. 'played' flags rows that have been played.
    """

    __tablename__ = 'playlist_rows'

    id = Column(Integer, primary_key=True, autoincrement=True)
    row_number = Column(Integer, nullable=False)
    note = Column(String(2048), index=False)
    playlist_id = Column(Integer, ForeignKey('playlists.id'), nullable=False)
    playlist = relationship(Playlists, back_populates="rows")
    track_id = Column(Integer, ForeignKey('tracks.id'), nullable=True)
    track = relationship("Tracks", back_populates="playlistrows")
    played = Column(Boolean, nullable=False, index=False, default=False)

    def __repr__(self) -> str:
        return (
            f"<PlaylistRow(id={self.id}, playlist_id={self.playlist_id}, "
            f"track_id={self.track_id}, "
            f"note={self.note} row_number={self.row_number}>"
        )

    def __init__(
            self, session: Session, playlist_id: int,
            track_id: Optional[int], row_number: int) -> None:
        """
        Create PlaylistRows object, add it to session and flush.

        track_id may be None: the column is nullable.
        """

        self.playlist_id = playlist_id
        self.track_id = track_id
        self.row_number = row_number

        session.add(self)
        session.flush()

    @staticmethod
    def delete_higher_rows(session: Session, playlist_id: int, row: int) \
            -> None:
        """
        Find rows in given playlist that have a higher row number
        than 'row'.

        NOTE: deletion is currently disabled. Each candidate row is
        only written to the debug log; nothing is removed from the
        database.
        """

        rows_to_go = session.execute(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id,
                   PlaylistRows.row_number > row)
        ).scalars().all()

        # The loop variable must not be called 'row': that would shadow
        # the integer parameter with an ORM object.
        for plr in rows_to_go:
            log.debug(f"Should delete: {plr}")
            # If needed later:
            # session.delete(plr)

    @staticmethod
    def delete_rows(session: Session, ids: List[int]) -> None:
        """
        Delete the rows whose ids are in 'ids', then commit.
        """

        session.execute(
            delete(PlaylistRows)
            .where(PlaylistRows.id.in_(ids))
        )
        # Delete won't take effect until commit()
        session.commit()

    @staticmethod
    def fixup_rownumbers(session: Session, playlist_id: int) -> None:
        """
        Ensure the row numbers for passed playlist have no gaps.

        Rows are renumbered 0, 1, 2, ... in their current row_number
        order and the session is committed.
        """

        plrs = session.execute(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id)
            .order_by(PlaylistRows.row_number)
        ).scalars().all()

        for i, plr in enumerate(plrs):
            plr.row_number = i

        # Ensure new row numbers are available to the caller
        session.commit()

    @classmethod
    def get_played_rows(cls, session: Session,
                        playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return the PlaylistRows objects that have
        been played, ordered by row number.
        """

        return session.execute(
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.played.is_(True)
            )
            .order_by(cls.row_number)
        ).scalars().all()

    @classmethod
    def get_rows_with_tracks(cls, session: Session,
                             playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return the PlaylistRows objects that
        contain tracks, ordered by row number.
        """

        return session.execute(
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.track_id.is_not(None)
            )
            .order_by(cls.row_number)
        ).scalars().all()

    @staticmethod
    def get_last_used_row(session: Session, playlist_id: int) -> Optional[int]:
        """Return the last used row for playlist, or None if no rows"""

        # MAX() always yields exactly one result row; its value is NULL
        # (None) when the playlist has no rows.
        return session.execute(
            select(func.max(PlaylistRows.row_number))
            .where(PlaylistRows.playlist_id == playlist_id)
        ).scalar_one()

    @classmethod
    def get_unplayed_rows(cls, session: Session,
                          playlist_id: int) -> List["PlaylistRows"]:
        """
        For passed playlist, return the PlaylistRows objects that
        contain a track and have not been played, ordered by row
        number.
        """

        return session.execute(
            select(cls)
            .where(
                cls.playlist_id == playlist_id,
                cls.track_id.is_not(None),
                cls.played.is_(False)
            )
            .order_by(cls.row_number)
        ).scalars().all()
class Settings(Base):
    """
    Manage settings

    Each setting is a uniquely named record with one typed value
    column per supported type (f_datetime, f_int, f_string).
    """

    __tablename__ = 'settings'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    name: str = Column(String(64), nullable=False, unique=True)
    f_datetime = Column(DateTime, default=None, nullable=True)
    f_int: int = Column(Integer, default=None, nullable=True)
    f_string = Column(String(128), default=None, nullable=True)

    def __repr__(self) -> str:
        # Show the first value column that is set. Compare against None
        # rather than relying on truthiness so that a stored 0 is
        # reported as 0 and not skipped.
        value = next(
            (
                candidate
                for candidate in (self.f_datetime, self.f_int, self.f_string)
                if candidate is not None
            ),
            None,
        )
        return f"<Settings(id={self.id}, name={self.name}, {value=}>"

    @classmethod
    def get_int_settings(cls, session: Session, name: str) -> "Settings":
        """
        Get setting for an integer or return new setting record.

        When no record called 'name' exists, a new one with f_int set
        to None is created and added to session (it is not flushed or
        committed here).
        """

        try:
            return session.execute(
                select(cls)
                .where(cls.name == name)
            ).scalar_one()
        except NoResultFound:
            int_setting = cls()
            int_setting.name = name
            int_setting.f_int = None
            session.add(int_setting)
            return int_setting

    def update(self, session: Session, data: dict) -> None:
        """
        Set attributes from 'data' and flush the session.

        'data' maps attribute names to their new values. Every key must
        already be an attribute of this object.
        """

        for key, value in data.items():
            assert hasattr(self, key), f"Settings has no attribute {key!r}"
            setattr(self, key, value)
        session.flush()
class Tracks(Base):
    """
    An audio track, identified by its unique file path.

    NOTE(review): duration, start_gap, fade_at and silence_at are
    stored as integers; their unit is not visible here (presumably
    milliseconds) - confirm.
    """

    __tablename__ = 'tracks'

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(256), index=True)
    artist = Column(String(256), index=True)
    duration = Column(Integer, index=True)
    start_gap = Column(Integer, index=False)
    fade_at = Column(Integer, index=False)
    silence_at = Column(Integer, index=False)
    path = Column(String(2048), index=False, nullable=False, unique=True)
    mtime = Column(Float, index=True)
    bitrate = Column(Integer, nullable=True, default=None)
    playlistrows = relationship("PlaylistRows", back_populates="track")
    playlists = association_proxy("playlistrows", "playlist")
    playdates = relationship("Playdates", back_populates="track")

    def __repr__(self) -> str:
        return (
            f"<Track(id={self.id}, title={self.title}, "
            f"artist={self.artist}, path={self.path}>"
        )

    def __init__(
            self,
            session: Session,
            path: str,
            title: Optional[str] = None,
            artist: Optional[str] = None,
            duration: int = 0,
            start_gap: int = 0,
            fade_at: Optional[int] = None,
            silence_at: Optional[int] = None,
            mtime: Optional[float] = None,
            lastplayed: Optional[datetime] = None,
    ) -> None:
        """Create a track, add it to session and commit."""

        self.path = path
        self.title = title
        self.artist = artist
        self.duration = duration
        self.start_gap = start_gap
        self.fade_at = fade_at
        self.silence_at = silence_at
        self.mtime = mtime
        # NOTE(review): no 'lastplayed' column is declared in the
        # visible class body, so this looks like a plain attribute that
        # is not persisted - confirm.
        self.lastplayed = lastplayed

        session.add(self)
        session.commit()

    @classmethod
    def get_all(cls, session: Session) -> List["Tracks"]:
        """Return a list of all tracks"""

        return session.execute(select(cls)).scalars().all()

    @classmethod
    def get_by_path(cls, session: Session, path: str) -> Optional["Tracks"]:
        """
        Return track with passed path, or None.
        """

        return session.execute(
            select(cls)
            .where(cls.path == path)
        ).scalar_one_or_none()

    @classmethod
    def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
        """
        Search case-insensitively for artists containing text.

        Results are ordered by title. 'text' is placed inside an ILIKE
        pattern as is, so any '%' or '_' in it acts as a wildcard.
        """

        return (
            session.execute(
                select(cls)
                .where(cls.artist.ilike(f"%{text}%"))
                .order_by(cls.title)
            )
            .scalars()
            .all()
        )

    @classmethod
    def search_titles(cls, session: Session, text: str) -> List["Tracks"]:
        """
        Search case-insensitively for titles containing text.

        Results are ordered by title. 'text' is placed inside an ILIKE
        pattern as is, so any '%' or '_' in it acts as a wildcard.
        """

        return (
            session.execute(
                select(cls)
                .where(cls.title.ilike(f"%{text}%"))
                .order_by(cls.title)
            )
            .scalars()
            .all()
        )