WIP: typing

This commit is contained in:
Keith Edmunds 2023-01-30 19:29:33 +00:00
parent dfb9326d5e
commit 73bb4b3a7f
4 changed files with 70 additions and 151 deletions

View File

@ -63,7 +63,7 @@ class Carts(Base):
def __init__(self, session: scoped_session, cart_number: int,
name: Optional[str] = None,
duration: int = None, path: str = None,
duration: Optional[int] = None, path: Optional[str] = None,
enabled: bool = True) -> None:
"""Create new cart"""
@ -132,7 +132,7 @@ class Playdates(Base):
id: int = Column(Integer, primary_key=True, autoincrement=True)
lastplayed = Column(DateTime, index=True, default=None)
track_id = Column(Integer, ForeignKey('tracks.id'))
track = relationship("Tracks", back_populates="playdates")
track: "Tracks" = relationship("Tracks", back_populates="playdates")
def __repr__(self) -> str:
return (
@ -149,7 +149,8 @@ class Playdates(Base):
session.commit()
@staticmethod
def last_played(session: scoped_session, track_id: int) -> Optional[datetime]:
def last_played(session: scoped_session,
track_id: int) -> Optional[datetime]:
"""Return datetime track last played or None"""
last_played = session.execute(
@ -165,7 +166,8 @@ class Playdates(Base):
return None
@staticmethod
def played_after(session: scoped_session, since: datetime) -> List["Playdates"]:
def played_after(session: scoped_session,
since: datetime) -> List["Playdates"]:
"""Return a list of Playdates objects since passed time"""
return (
@ -186,7 +188,7 @@ class Playlists(Base):
__tablename__ = "playlists"
id = Column(Integer, primary_key=True, autoincrement=True)
id = Column(Integer, primary_key=True, autoincrement=True, nullable=False)
name = Column(String(32), nullable=False, unique=True)
last_used = Column(DateTime, default=None, nullable=True)
tab = Column(Integer, default=None, nullable=True, unique=True)
@ -194,7 +196,7 @@ class Playlists(Base):
is_template = Column(Boolean, default=False, nullable=False)
query = Column(String(256), default=None, nullable=True, unique=False)
deleted = Column(Boolean, default=False, nullable=False)
rows = relationship(
rows: "PlaylistRows" = relationship(
"PlaylistRows",
back_populates="playlist",
cascade="all, delete-orphan",
@ -207,7 +209,7 @@ class Playlists(Base):
f"is_templatee={self.is_template}>"
)
def __init__(self, session: scoped_session, name: str) -> None:
def __init__(self, session: scoped_session, name: str):
self.name = name
session.add(self)
session.commit()
@ -345,9 +347,9 @@ class PlaylistRows(Base):
row_number = Column(Integer, nullable=False)
note = Column(String(2048), index=False)
playlist_id = Column(Integer, ForeignKey('playlists.id'), nullable=False)
playlist = relationship(Playlists, back_populates="rows")
playlist: Playlists = relationship(Playlists, back_populates="rows")
track_id = Column(Integer, ForeignKey('tracks.id'), nullable=True)
track = relationship("Tracks", back_populates="playlistrows")
track: "Tracks" = relationship("Tracks", back_populates="playlistrows")
played = Column(Boolean, nullable=False, index=False, default=False)
def __repr__(self) -> str:
@ -448,7 +450,8 @@ class PlaylistRows(Base):
).first()
@staticmethod
def get_last_used_row(session: scoped_session, playlist_id: int) -> Optional[int]:
def get_last_used_row(session: scoped_session,
playlist_id: int) -> Optional[int]:
"""Return the last used row for playlist, or None if no rows"""
return session.execute(
@ -515,8 +518,8 @@ class PlaylistRows(Base):
return plrs
@staticmethod
def move_rows_down(session: scoped_session, playlist_id: int, starting_row: int,
move_by: int) -> None:
def move_rows_down(session: scoped_session, playlist_id: int,
starting_row: int, move_by: int) -> None:
"""
Create space to insert move_by additional rows by incremented row
number from starting_row to end of playlist
@ -567,27 +570,27 @@ class Settings(Base):
value = self.f_datetime or self.f_int or self.f_string
return f"<Settings(id={self.id}, name={self.name}, {value=}>"
def __init__(self, session: scoped_session, name: str):
self.name = name
session.add(self)
session.flush()
@classmethod
def get_int_settings(cls, session: scoped_session, name: str) -> "Settings":
def get_int_settings(cls, session: scoped_session,
name: str) -> "Settings":
"""Get setting for an integer or return new setting record"""
int_setting: Settings
try:
int_setting = session.execute(
return session.execute(
select(cls)
.where(cls.name == name)
).scalar_one()
except NoResultFound:
int_setting = Settings()
int_setting.name = name
int_setting.f_int = None
session.add(int_setting)
return Settings(session, name)
return int_setting
def update(self, session: scoped_session, data: "Settings"):
def update(self, session: scoped_session, data: dict):
for key, value in data.items():
assert hasattr(self, key)
setattr(self, key, value)
@ -607,9 +610,10 @@ class Tracks(Base):
path = Column(String(2048), index=False, nullable=False, unique=True)
mtime = Column(Float, index=True)
bitrate = Column(Integer, nullable=True, default=None)
playlistrows = relationship("PlaylistRows", back_populates="track")
playlistrows: PlaylistRows = relationship("PlaylistRows",
back_populates="track")
playlists = association_proxy("playlistrows", "playlist")
playdates = relationship("Playdates", back_populates="track")
playdates: Playdates = relationship("Playdates", back_populates="track")
def __repr__(self) -> str:
return (
@ -650,7 +654,8 @@ class Tracks(Base):
return session.execute(select(cls)).scalars().all()
@classmethod
def get_by_path(cls, session: scoped_session, path: str) -> "Tracks":
def get_by_path(cls, session: scoped_session,
path: str) -> Optional["Tracks"]:
"""
Return track with passed path, or None.
"""
@ -666,7 +671,8 @@ class Tracks(Base):
return None
@classmethod
def search_artists(cls, session: scoped_session, text: str) -> List["Tracks"]:
def search_artists(cls, session: scoped_session,
text: str) -> List["Tracks"]:
"""Search case-insenstively for artists containing str"""
return (
@ -680,7 +686,8 @@ class Tracks(Base):
)
@classmethod
def search_titles(cls, session: scoped_session, text: str) -> List["Tracks"]:
def search_titles(cls, session: scoped_session,
text: str) -> List["Tracks"]:
"""Search case-insenstively for titles containing str"""
return (
session.execute(

View File

@ -27,7 +27,7 @@ from PyQt5.QtWidgets import (
QProgressBar,
)
from dbconfig import engine, Session
from dbconfig import engine, Session, scoped_session
import helpers
import music
@ -59,7 +59,8 @@ class CartButton(QPushButton):
"""Create a cart pushbutton and set it disabled"""
super().__init__(parent)
self.parent = parent
# Next line is redundant (check)
# self.parent = parent
self.cart_id = cart.id
if cart.path and cart.enabled and not cart.duration:
tags = helpers.get_tags(cart.path)
@ -126,28 +127,28 @@ class PlaylistTrack:
number: that's the playlist's problem.
"""
self.artist = None
self.duration = None
self.end_time = None
self.fade_at = None
self.fade_length = None
self.path = None
self.playlist_id = None
self.playlist_tab = None
self.plr_id = None
self.silence_at = None
self.start_gap = None
self.start_time = None
self.title = None
self.track_id = None
self.artist: Optional[str] = None
self.duration: Optional[int] = None
self.end_time: Optional[datetime] = None
self.fade_at: Optional[int] = None
self.fade_length: Optional[int] = None
self.path: Optional[str] = None
self.playlist_id: Optional[int] = None
self.playlist_tab: Optional[PlaylistTab] = None
self.plr_id: Optional[int] = None
self.silence_at: Optional[datetime] = None
self.start_gap: Optional[int] = None
self.start_time: Optional[datetime] = None
self.title: Optional[str] = None
self.track_id: Optional[int] = None
def __repr__(self) -> str:
return (
f"<PlaylistTrack(title={self.title}, artist={self.artist}, "
f"row_number={self.row_number} playlist_id={self.playlist_id}>"
f"playlist_id={self.playlist_id}>"
)
def set_plr(self, session: Session, plr: PlaylistRows,
def set_plr(self, session: scoped_session, plr: PlaylistRows,
tab: PlaylistTab) -> None:
"""
Update with new plr information
@ -492,7 +493,7 @@ class Window(QMainWindow, Ui_MainWindow):
self.timer.timeout.connect(self.tick)
def create_playlist(self,
session: Session,
session: scoped_session,
playlist_name: Optional[str] = None) -> Playlists:
"""Create new playlist"""
@ -512,7 +513,7 @@ class Window(QMainWindow, Ui_MainWindow):
if playlist:
self.create_playlist_tab(session, playlist)
def create_playlist_tab(self, session: Session,
def create_playlist_tab(self, session: scoped_session,
playlist: Playlists) -> int:
"""
Take the passed playlist database object, create a playlist tab and
@ -714,7 +715,7 @@ class Window(QMainWindow, Ui_MainWindow):
QMessageBox.Ok
)
def get_one_track(self, session: Session) -> Optional[Tracks]:
def get_one_track(self, session: scoped_session) -> Optional[Tracks]:
"""Show dialog box to select one track and return it to caller"""
dlg = DbDialog(self, session, get_one_track=True)
@ -833,10 +834,10 @@ class Window(QMainWindow, Ui_MainWindow):
_ = self.create_playlist_tab(session, playlist)
# Set active tab
record = Settings.get_int_settings(session, "active_tab")
if record and record.f_int is not None:
if record and record.f_int >= 0:
self.tabPlaylist.setCurrentIndex(record.f_int)
def move_playlist_rows(self, session: Session,
def move_playlist_rows(self, session: scoped_session,
playlistrows: List[PlaylistRows]) -> None:
"""
Move passed playlist rows to another playlist
@ -1325,7 +1326,7 @@ class Window(QMainWindow, Ui_MainWindow):
# May also be called when last tab is closed
pass
def this_is_the_next_playlist_row(self, session: Session,
def this_is_the_next_playlist_row(self, session: scoped_session,
plr: PlaylistRows,
playlist_tab: PlaylistTab) -> None:
"""
@ -1468,6 +1469,9 @@ class Window(QMainWindow, Ui_MainWindow):
# Time to end
self.label_end_timer.setText(helpers.ms_to_mmss(time_to_end))
# Autoplay next track
# if time_to_silence <= 1500:
# self.play_next()
else:
if self.playing:
self.stop_playing()
@ -1500,7 +1504,7 @@ class Window(QMainWindow, Ui_MainWindow):
class CartDialog(QDialog):
"""Edit cart details"""
def __init__(self, parent: QMainWindow, session: Session,
def __init__(self, parent: QMainWindow, session: scoped_session,
cart: Carts) -> None:
"""
Manage carts
@ -1537,7 +1541,7 @@ class CartDialog(QDialog):
class DbDialog(QDialog):
"""Select track from database"""
def __init__(self, parent: QMainWindow, session: Session,
def __init__(self, parent: QMainWindow, session: scoped_session,
get_one_track: bool = False) -> None:
"""
Subclassed QDialog to manage track selection
@ -1775,7 +1779,9 @@ if __name__ == "__main__":
app = QApplication(sys.argv)
win = Window()
win.show()
sys.exit(app.exec())
status = app.exec()
engine.dispose()
sys.exit(status)
except Exception as exc:
from helpers import send_mail

View File

@ -1205,7 +1205,7 @@ class PlaylistTab(QTableWidget):
return start + timedelta(milliseconds=duration)
def _column_resize(self, idx: int, old: int, new: int) -> None:
def _column_resize(self, idx: int, _old: int, _new: int) -> None:
"""
Called when column widths are changed.
@ -1722,7 +1722,7 @@ class PlaylistTab(QTableWidget):
continue
attr_name = f"playlist_{column_name}_col_width"
record: Settings = Settings.get_int_settings(session, attr_name)
if record and record.f_int is not None:
if record and record.f_int >= 0:
self.setColumnWidth(idx, record.f_int)
else:
self.setColumnWidth(idx, Config.DEFAULT_COLUMN_WIDTH)

94
play.py
View File

@ -1,94 +0,0 @@
#!/usr/bin/env python
from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy import Table, Column, Integer, String
from sqlalchemy import ForeignKey
from sqlalchemy import select
from sqlalchemy import insert
from sqlalchemy.orm import Session
from sqlalchemy.orm import declarative_base
from sqlalchemy.orm import relationship
Base = declarative_base()
engine = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)
class User(Base):
__tablename__ = 'user_account'
id = Column(Integer, primary_key=True)
name = Column(String(30))
fullname = Column(String)
addresses = relationship("Address", back_populates="user")
def __repr__(self):
return (
f"User(id={self.id!r}, name={self.name!r}, "
f"fullname={self.fullname!r})"
)
class Address(Base):
__tablename__ = 'address'
id = Column(Integer, primary_key=True)
email_address = Column(String, nullable=False)
user_id = Column(Integer, ForeignKey('user_account.id'))
user = relationship("User", back_populates="addresses")
def __repr__(self):
return f"Address(id={self.id!r}, email_address={self.email_address!r})"
Base.metadata.create_all(engine)
squidward = User(name="squidward", fullname="Squidward Tentacles")
krabs = User(name="ehkrabs", fullname="Eugene H. Krabs")
session = Session(engine)
session.add(squidward)
session.add(krabs)
session.commit()
u1 = User(name='pkrabs', fullname='Pearl Krabs')
a1 = Address(email_address="pearl.krabs@gmail.com")
u1.addresses.append(a1)
a2 = Address(email_address="pearl@aol.com", user=u1)
session.add(u1)
session.add(a1)
session.add(a2)
session.commit()
# with engine.connect() as conn:
# conn.execute(text("CREATE TABLE some_table (x int, y int)"))
# conn.execute(
# text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
# [{"x": 1, "y": 1}, {"x": 2, "y": 4}]
# )
# conn.commit()
#
# with engine.begin() as conn:
# conn.execute(
# text("INSERT INTO some_table (x, y) VALUES (:x, :y)"),
# [{"x": 6, "y": 8}, {"x": 9, "y": 10}]
# )
#
# # with engine.connect() as conn:
# # result = conn.execute(text("SELECT x, y FROM some_table"))
# # for row in result:
# # print(f"x: {row.x} y: {row.y}")
#
#
# stmt = text(
# "SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)
#
# with Session(engine) as session:
# result = session.execute(stmt)
# for row in result:
# print(f"x: {row.x} y: {row.y}")