Compare commits
3 Commits
f57bcc37f6
...
3832d9300c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3832d9300c | ||
|
|
afb8ddfaf5 | ||
|
|
617c39c0de |
@ -1,5 +1,6 @@
|
|||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from enum import auto, Enum
|
from enum import auto, Enum
|
||||||
|
from sqlalchemy import bindparam, update
|
||||||
from typing import List, Optional, TYPE_CHECKING
|
from typing import List, Optional, TYPE_CHECKING
|
||||||
|
|
||||||
from dbconfig import scoped_session, Session
|
from dbconfig import scoped_session, Session
|
||||||
@ -84,13 +85,16 @@ class PlaylistModel(QAbstractTableModel):
|
|||||||
"""
|
"""
|
||||||
The Playlist Model
|
The Playlist Model
|
||||||
|
|
||||||
Update strategy: update the database and then refresh the cached copy (self.playlist_rows).
|
Update strategy: update the database and then refresh the
|
||||||
We do not try to edit playlist_rows directly. It would be too easy for a bug to get us
|
row-indexed cached copy (self.playlist_rows). Do not edit
|
||||||
out of sync with the database, and if that wasn't immediately apparent then debugging it
|
self.playlist_rows directly because keeping it and the
|
||||||
would be hard.
|
database in sync is uncessarily challenging.
|
||||||
|
|
||||||
refresh_row() will populate one row of playlist_rows from the database
|
refresh_row() will populate one row of playlist_rows from the
|
||||||
refresh_data() will repopulate all of playlist_rows from the database
|
database
|
||||||
|
|
||||||
|
refresh_data() will repopulate all of playlist_rows from the
|
||||||
|
database
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(
|
def __init__(
|
||||||
@ -102,10 +106,13 @@ class PlaylistModel(QAbstractTableModel):
|
|||||||
|
|
||||||
self.playlist_rows: dict[int, PlaylistRowData] = {}
|
self.playlist_rows: dict[int, PlaylistRowData] = {}
|
||||||
|
|
||||||
self.refresh_data()
|
with Session() as session:
|
||||||
|
self.refresh_data(session)
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
return f"<PlaylistModel: playlist_id={self.playlist_id}, {self.rowCount() rows>"
|
return (
|
||||||
|
f"<PlaylistModel: playlist_id={self.playlist_id}, {self.rowCount()} rows>"
|
||||||
|
)
|
||||||
|
|
||||||
def background_role(self, row: int, column: int, prd: PlaylistRowData) -> QBrush:
|
def background_role(self, row: int, column: int, prd: PlaylistRowData) -> QBrush:
|
||||||
"""Return background setting"""
|
"""Return background setting"""
|
||||||
@ -290,72 +297,55 @@ class PlaylistModel(QAbstractTableModel):
|
|||||||
|
|
||||||
return QVariant()
|
return QVariant()
|
||||||
|
|
||||||
def insert_header_row(self, row_number: Optional[int], text: str) -> Optional[int]:
|
def insert_header_row(self, row_number: Optional[int], text: str) -> None:
|
||||||
"""
|
"""
|
||||||
Insert a header row. Return row number or None if insertion failed.
|
Insert a header row. Return row number or None if insertion failed.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
with Session() as session:
|
with Session() as session:
|
||||||
prd = self._insert_row(session, row_number)
|
plr = self._insert_row(session, row_number)
|
||||||
# Update playlist_rows
|
# Update the PlaylistRows object
|
||||||
prd.note = text
|
|
||||||
# Get row from db and update
|
|
||||||
plr = session.get(PlaylistRows, prd.plrid)
|
|
||||||
if plr:
|
|
||||||
plr.note = text
|
plr.note = text
|
||||||
self.refresh_row(session, plr.plr_rownum)
|
# Repopulate self.playlist_rows
|
||||||
self.invalidate_row(plr.plr_rownum)
|
self.refresh_data(session)
|
||||||
return plr.plr_rownum
|
# Update the display from the new row onwards
|
||||||
|
self.invalidate_rows(list(range(plr.plr_rownum, len(self.playlist_rows))))
|
||||||
return None
|
|
||||||
|
|
||||||
def _insert_row(
|
def _insert_row(
|
||||||
self, session: scoped_session, row_number: Optional[int]
|
self, session: scoped_session, row_number: Optional[int]
|
||||||
) -> PlaylistRowData:
|
) -> PlaylistRows:
|
||||||
"""
|
"""
|
||||||
Make space for a row at row_number. If row_number is greater
|
Insert a row in the database.
|
||||||
than length of list plus 1, or if row number is -1, put row at
|
|
||||||
end of list.
|
|
||||||
|
|
||||||
Return the new PlaylistData structure
|
If row_number is greater than length of list plus 1, or if row
|
||||||
|
number is None, put row at end of list.
|
||||||
|
|
||||||
|
Move existing rows to make space if ncessary.
|
||||||
|
|
||||||
|
Return the new PlaylistRows object.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
modified_rows: List[int] = []
|
|
||||||
|
|
||||||
if row_number is None or row_number > len(self.playlist_rows):
|
if row_number is None or row_number > len(self.playlist_rows):
|
||||||
|
# We are adding to the end of the list so we can optimise
|
||||||
new_row_number = len(self.playlist_rows)
|
new_row_number = len(self.playlist_rows)
|
||||||
|
return PlaylistRows(session, self.playlist_id, new_row_number)
|
||||||
elif row_number < 0:
|
elif row_number < 0:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"playlistmodel.insert_row, invalid row number ({row_number})"
|
f"playlistmodel._insert_row, invalid row number ({row_number})"
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
new_row_number = row_number
|
new_row_number = row_number
|
||||||
modified_rows.append(new_row_number)
|
|
||||||
|
|
||||||
# Move rows below new row down
|
# Move rows below new row down
|
||||||
for i in reversed(range(new_row_number, len(self.playlist_rows))):
|
stmt = (
|
||||||
self.playlist_rows[i + 1] = self.playlist_rows[i]
|
update(PlaylistRows)
|
||||||
self.playlist_rows[i + 1].plr_rownum += 1
|
.where(PlaylistRows.plr_rownum >= new_row_number)
|
||||||
modified_rows.append(i + 1)
|
.values({PlaylistRows.plr_rownum: PlaylistRows.plr_rownum + 1})
|
||||||
# If we are not adding to the end of the list, we need to clear
|
|
||||||
# out the existing recored at new_row_number (which we have
|
|
||||||
# already copied to its new location)
|
|
||||||
if new_row_number in self.playlist_rows:
|
|
||||||
del self.playlist_rows[new_row_number]
|
|
||||||
|
|
||||||
*** Problem here is that we haven't yet updated the database so when we insert a new row
|
|
||||||
with the PlaylistRows.__init__ call below, we'll get a duplicate. How best to keep
|
|
||||||
playlist_rows in step with database?
|
|
||||||
|
|
||||||
# Insert new row, possibly replace old row
|
|
||||||
plr = PlaylistRows(
|
|
||||||
session=session, playlist_id=self.playlist_id, row_number=new_row_number
|
|
||||||
)
|
)
|
||||||
prd = PlaylistRowData(plr)
|
session.execute(stmt)
|
||||||
# Add row to playlist_rows
|
|
||||||
self.playlist_rows[new_row_number] = prd
|
|
||||||
|
|
||||||
return prd
|
# Insert the new row and return it
|
||||||
|
return PlaylistRows(session, self.playlist_id, new_row_number)
|
||||||
|
|
||||||
def invalidate_row(self, modified_row: int) -> None:
|
def invalidate_row(self, modified_row: int) -> None:
|
||||||
"""
|
"""
|
||||||
@ -374,63 +364,70 @@ class PlaylistModel(QAbstractTableModel):
|
|||||||
for modified_row in modified_rows:
|
for modified_row in modified_rows:
|
||||||
self.invalidate_row(modified_row)
|
self.invalidate_row(modified_row)
|
||||||
|
|
||||||
def move_rows(self, from_rows: List[int], to_row: int) -> None:
|
def move_rows(self, from_rows: List[int], to_row_number: int) -> None:
|
||||||
"""
|
"""
|
||||||
Move the playlist rows given to to_row and below.
|
Move the playlist rows given to to_row and below.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
new_playlist_rows: dict[int, PlaylistRowData] = {}
|
# Build a {current_row_number: new_row_number} dictionary
|
||||||
|
row_map: dict[int, int] = {}
|
||||||
|
|
||||||
# Move the from_row records from the playlist_rows dict to the
|
# Put the from_row row numbers into the row_map. Ultimately the
|
||||||
# new_playlist_rows dict. The total number of elements in the
|
# total number of elements in the playlist doesn't change, so
|
||||||
# playlist doesn't change, so check that adding the moved rows
|
# check that adding the moved rows starting at to_row won't
|
||||||
# starting at to_row won't overshoot the end of the playlist.
|
# overshoot the end of the playlist.
|
||||||
if to_row + len(from_rows) > len(self.playlist_rows):
|
if to_row_number + len(from_rows) > len(self.playlist_rows):
|
||||||
next_to_row = len(self.playlist_rows) - len(from_rows)
|
next_to_row = len(self.playlist_rows) - len(from_rows)
|
||||||
else:
|
else:
|
||||||
next_to_row = to_row
|
next_to_row = to_row_number
|
||||||
|
|
||||||
for from_row in from_rows:
|
for from_row, to_row in zip(
|
||||||
new_playlist_rows[next_to_row] = self.playlist_rows[from_row]
|
from_rows, range(next_to_row, next_to_row + len(from_rows))
|
||||||
del self.playlist_rows[from_row]
|
):
|
||||||
next_to_row += 1
|
row_map[from_row] = to_row
|
||||||
|
# Move the remaining rows to the row_map. We want to fill it
|
||||||
|
# before (if there are gaps) and after (likewise) the rows that
|
||||||
|
# are moving.
|
||||||
|
# This iterates old_row and new_row simultaneously.
|
||||||
|
for old_row, new_row in zip(
|
||||||
|
[x for x in self.playlist_rows.keys() if x not in from_rows],
|
||||||
|
[y for y in range(len(self.playlist_rows)) if y not in row_map.values()],
|
||||||
|
):
|
||||||
|
# Optimise: only add to map if there is a change
|
||||||
|
row_map[old_row] = new_row
|
||||||
|
|
||||||
# Move the remaining rows to the gaps in new_playlist_rows
|
# For SQLAlchemy, build a list of dictionaries that map plrid to
|
||||||
new_row = 0
|
# new row number:
|
||||||
for old_row in self.playlist_rows.keys():
|
sqla_map: List[dict[str, int]] = []
|
||||||
# Find next gap
|
for oldrow, newrow in row_map.items():
|
||||||
while new_row in new_playlist_rows:
|
plrid = self.playlist_rows[oldrow].plrid
|
||||||
new_row += 1
|
sqla_map.append({"plrid": plrid, "plr_rownum": newrow})
|
||||||
new_playlist_rows[new_row] = self.playlist_rows[old_row]
|
|
||||||
new_row += 1
|
|
||||||
|
|
||||||
# Make copy of rows live
|
# Update database. Ref:
|
||||||
self.playlist_rows = new_playlist_rows
|
# https://docs.sqlalchemy.org/en/20/core/sqlelement.html#sqlalchemy.sql.expression.case
|
||||||
|
stmt = (
|
||||||
|
update(PlaylistRows)
|
||||||
|
.where(
|
||||||
|
PlaylistRows.playlist_id == self.playlist_id,
|
||||||
|
PlaylistRows.id == bindparam("plrid"),
|
||||||
|
)
|
||||||
|
.values(plr_rownum=bindparam("plr_rownum"))
|
||||||
|
)
|
||||||
|
|
||||||
# Update PlaylistRows table and notify display of rows that
|
|
||||||
# moved
|
|
||||||
with Session() as session:
|
with Session() as session:
|
||||||
for idx in range(len(self.playlist_rows)):
|
session.connection().execute(stmt, sqla_map)
|
||||||
if self.playlist_rows[idx].plr_rownum == idx:
|
|
||||||
continue
|
|
||||||
# Row number in this row is incorred. Fix it in
|
|
||||||
# database:
|
|
||||||
plr = session.get(PlaylistRows, self.playlist_rows[idx].plrid)
|
|
||||||
if not plr:
|
|
||||||
print(f"\nCan't find plr in playlistmodel:move_rows {idx=}")
|
|
||||||
continue
|
|
||||||
plr.plr_rownum = idx
|
|
||||||
# Fix in self.playlist_rows
|
|
||||||
self.playlist_rows[idx].plr_rownum = idx
|
|
||||||
# Update display
|
|
||||||
self.invalidate_row(idx)
|
|
||||||
|
|
||||||
def refresh_data(self):
|
# Update playlist_rows
|
||||||
|
self.refresh_data(session)
|
||||||
|
|
||||||
|
# Update display
|
||||||
|
self.invalidate_rows(list(row_map.keys()))
|
||||||
|
|
||||||
|
def refresh_data(self, session: scoped_session):
|
||||||
"""Populate dicts for data calls"""
|
"""Populate dicts for data calls"""
|
||||||
|
|
||||||
# Populate self.playlist_rows with playlist data
|
# Populate self.playlist_rows with playlist data
|
||||||
self.playlist_rows.clear()
|
self.playlist_rows.clear()
|
||||||
with Session() as session:
|
|
||||||
for p in PlaylistRows.deep_rows(session, self.playlist_id):
|
for p in PlaylistRows.deep_rows(session, self.playlist_id):
|
||||||
self.playlist_rows[p.plr_rownum] = PlaylistRowData(p)
|
self.playlist_rows[p.plr_rownum] = PlaylistRowData(p)
|
||||||
|
|
||||||
|
|||||||
84
archive/db_experiments.py
Executable file
84
archive/db_experiments.py
Executable file
@ -0,0 +1,84 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
from sqlalchemy import create_engine, String, update, bindparam, case
|
||||||
|
from sqlalchemy.orm import (
|
||||||
|
DeclarativeBase,
|
||||||
|
Mapped,
|
||||||
|
mapped_column,
|
||||||
|
sessionmaker,
|
||||||
|
scoped_session,
|
||||||
|
)
|
||||||
|
from typing import Generator
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
db_url = "sqlite:////tmp/rhys.db"
|
||||||
|
|
||||||
|
|
||||||
|
class Base(DeclarativeBase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class Rhys(Base):
|
||||||
|
__tablename__ = "rhys"
|
||||||
|
|
||||||
|
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
|
||||||
|
ref_number: Mapped[int] = mapped_column()
|
||||||
|
name: Mapped[str] = mapped_column(String(256), index=True)
|
||||||
|
|
||||||
|
def __init__(self, session, ref_number: int, name: str) -> None:
|
||||||
|
self.ref_number = ref_number
|
||||||
|
self.name = name
|
||||||
|
session.add(self)
|
||||||
|
session.flush()
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def Session() -> Generator[scoped_session, None, None]:
|
||||||
|
Session = scoped_session(sessionmaker(bind=engine))
|
||||||
|
yield Session
|
||||||
|
Session.commit()
|
||||||
|
Session.close()
|
||||||
|
|
||||||
|
|
||||||
|
engine = create_engine(db_url)
|
||||||
|
Base.metadata.create_all(engine)
|
||||||
|
|
||||||
|
inital_number_of_records = 10
|
||||||
|
|
||||||
|
|
||||||
|
def move_rows(session):
|
||||||
|
new_row = 6
|
||||||
|
|
||||||
|
with Session() as session:
|
||||||
|
# new_record = Rhys(session, new_row, f"new {new_row=}")
|
||||||
|
# Move rows
|
||||||
|
|
||||||
|
stmt = (
|
||||||
|
update(Rhys)
|
||||||
|
.where(Rhys.ref_number > new_row)
|
||||||
|
# .where(Rhys.id.in_(session.query(Rhys.id).order_by(Rhys.id.desc())))
|
||||||
|
.values({Rhys.ref_number: Rhys.ref_number + 1})
|
||||||
|
)
|
||||||
|
|
||||||
|
session.execute(stmt)
|
||||||
|
|
||||||
|
|
||||||
|
sqla_map = []
|
||||||
|
for k, v in zip(range(11), [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]):
|
||||||
|
sqla_map.append({"oldrow": k, "newrow": v})
|
||||||
|
|
||||||
|
# for a, b in sqla_map.items():
|
||||||
|
# print(f"{a} > {b}")
|
||||||
|
|
||||||
|
with Session() as session:
|
||||||
|
for a in range(inital_number_of_records):
|
||||||
|
_ = Rhys(session, a, f"record: {a}")
|
||||||
|
|
||||||
|
stmt = update(Rhys).values(
|
||||||
|
ref_number=case(
|
||||||
|
{item['oldrow']: item['newrow'] for item in sqla_map},
|
||||||
|
value=Rhys.ref_number
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
session.connection().execute(stmt, sqla_map)
|
||||||
@ -14,34 +14,13 @@ def create_model_with_playlist_rows(
|
|||||||
for row in range(rows):
|
for row in range(rows):
|
||||||
plr = model._insert_row(session, row)
|
plr = model._insert_row(session, row)
|
||||||
newrow = plr.plr_rownum
|
newrow = plr.plr_rownum
|
||||||
model.playlist_rows[newrow].note = str(newrow)
|
plr.note = str(newrow)
|
||||||
|
model.playlist_rows[newrow] = playlistmodel.PlaylistRowData(plr)
|
||||||
|
|
||||||
session.commit()
|
session.commit()
|
||||||
return model
|
return model
|
||||||
|
|
||||||
|
|
||||||
def test_insert_row(monkeypatch, session):
|
|
||||||
monkeypatch.setattr(playlistmodel, "Session", session)
|
|
||||||
# Create a playlist
|
|
||||||
playlist = Playlists(session, "test playlist")
|
|
||||||
# Create a model
|
|
||||||
model = playlistmodel.PlaylistModel(playlist.id, None)
|
|
||||||
assert model.rowCount() == 0
|
|
||||||
model._insert_row(session, 0)
|
|
||||||
assert model.rowCount() == 1
|
|
||||||
|
|
||||||
|
|
||||||
def test_insert_high_row(monkeypatch, session):
|
|
||||||
monkeypatch.setattr(playlistmodel, "Session", session)
|
|
||||||
# Create a playlist
|
|
||||||
playlist = Playlists(session, "test playlist")
|
|
||||||
# Create a model
|
|
||||||
model = playlistmodel.PlaylistModel(playlist.id, None)
|
|
||||||
assert model.rowCount() == 0
|
|
||||||
model._insert_row(session, 5)
|
|
||||||
assert model.rowCount() == 1
|
|
||||||
|
|
||||||
|
|
||||||
def test_11_row_playlist(monkeypatch, session):
|
def test_11_row_playlist(monkeypatch, session):
|
||||||
# Create multirow playlist
|
# Create multirow playlist
|
||||||
monkeypatch.setattr(playlistmodel, "Session", session)
|
monkeypatch.setattr(playlistmodel, "Session", session)
|
||||||
@ -187,12 +166,35 @@ def test_insert_header_row_end(monkeypatch, session):
|
|||||||
|
|
||||||
monkeypatch.setattr(playlistmodel, "Session", session)
|
monkeypatch.setattr(playlistmodel, "Session", session)
|
||||||
note_text = "test text"
|
note_text = "test text"
|
||||||
|
initial_row_count = 11
|
||||||
|
|
||||||
model = create_model_with_playlist_rows(session, 11)
|
model = create_model_with_playlist_rows(session, initial_row_count)
|
||||||
row_number = model.insert_header_row(None, note_text)
|
model.insert_header_row(None, note_text)
|
||||||
session.flush()
|
assert model.rowCount() == initial_row_count + 1
|
||||||
assert model.rowCount() == row_number + 1
|
prd = model.playlist_rows[model.rowCount() - 1]
|
||||||
prd = model.playlist_rows[row_number]
|
|
||||||
# Test against edit_role because display_role for headers is
|
# Test against edit_role because display_role for headers is
|
||||||
# handled differently (sets up row span)
|
# handled differently (sets up row span)
|
||||||
assert model.edit_role(row_number, playlistmodel.Col.NOTE.value, prd) == note_text
|
assert (
|
||||||
|
model.edit_role(model.rowCount(), playlistmodel.Col.NOTE.value, prd)
|
||||||
|
== note_text
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_insert_header_row_middle(monkeypatch, session):
|
||||||
|
# insert header row in middle of playlist
|
||||||
|
|
||||||
|
monkeypatch.setattr(playlistmodel, "Session", session)
|
||||||
|
note_text = "test text"
|
||||||
|
initial_row_count = 11
|
||||||
|
insert_row = 6
|
||||||
|
|
||||||
|
model = create_model_with_playlist_rows(session, initial_row_count)
|
||||||
|
model.insert_header_row(insert_row, note_text)
|
||||||
|
assert model.rowCount() == initial_row_count + 1
|
||||||
|
prd = model.playlist_rows[insert_row]
|
||||||
|
# Test against edit_role because display_role for headers is
|
||||||
|
# handled differently (sets up row span)
|
||||||
|
assert (
|
||||||
|
model.edit_role(model.rowCount(), playlistmodel.Col.NOTE.value, prd)
|
||||||
|
== note_text
|
||||||
|
)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user