Compare commits

..

3 Commits

Author SHA1 Message Date
Keith Edmunds
3832d9300c move_rows implemented; all tests pass 2023-10-28 11:30:37 +01:00
Keith Edmunds
afb8ddfaf5 Added archive/db_experiments.py for testing 2023-10-27 12:01:43 +01:00
Keith Edmunds
617c39c0de Reworked inserting rows into model
_insert_row() handles database
insert_header() handles playlist_rows and display updates
2023-10-27 12:01:09 +01:00
3 changed files with 205 additions and 122 deletions

View File

@ -1,5 +1,6 @@
from datetime import datetime from datetime import datetime
from enum import auto, Enum from enum import auto, Enum
from sqlalchemy import bindparam, update
from typing import List, Optional, TYPE_CHECKING from typing import List, Optional, TYPE_CHECKING
from dbconfig import scoped_session, Session from dbconfig import scoped_session, Session
@ -84,13 +85,16 @@ class PlaylistModel(QAbstractTableModel):
""" """
The Playlist Model The Playlist Model
Update strategy: update the database and then refresh the cached copy (self.playlist_rows). Update strategy: update the database and then refresh the
We do not try to edit playlist_rows directly. It would be too easy for a bug to get us row-indexed cached copy (self.playlist_rows). Do not edit
out of sync with the database, and if that wasn't immediately apparent then debugging it self.playlist_rows directly because keeping it and the
would be hard. database in sync is uncessarily challenging.
refresh_row() will populate one row of playlist_rows from the database refresh_row() will populate one row of playlist_rows from the
refresh_data() will repopulate all of playlist_rows from the database database
refresh_data() will repopulate all of playlist_rows from the
database
""" """
def __init__( def __init__(
@ -102,10 +106,13 @@ class PlaylistModel(QAbstractTableModel):
self.playlist_rows: dict[int, PlaylistRowData] = {} self.playlist_rows: dict[int, PlaylistRowData] = {}
self.refresh_data() with Session() as session:
self.refresh_data(session)
def __repr__(self) -> str: def __repr__(self) -> str:
return f"<PlaylistModel: playlist_id={self.playlist_id}, {self.rowCount() rows>" return (
f"<PlaylistModel: playlist_id={self.playlist_id}, {self.rowCount()} rows>"
)
def background_role(self, row: int, column: int, prd: PlaylistRowData) -> QBrush: def background_role(self, row: int, column: int, prd: PlaylistRowData) -> QBrush:
"""Return background setting""" """Return background setting"""
@ -290,72 +297,55 @@ class PlaylistModel(QAbstractTableModel):
return QVariant() return QVariant()
def insert_header_row(self, row_number: Optional[int], text: str) -> Optional[int]: def insert_header_row(self, row_number: Optional[int], text: str) -> None:
""" """
Insert a header row. Return row number or None if insertion failed. Insert a header row. Return row number or None if insertion failed.
""" """
with Session() as session: with Session() as session:
prd = self._insert_row(session, row_number) plr = self._insert_row(session, row_number)
# Update playlist_rows # Update the PlaylistRows object
prd.note = text
# Get row from db and update
plr = session.get(PlaylistRows, prd.plrid)
if plr:
plr.note = text plr.note = text
self.refresh_row(session, plr.plr_rownum) # Repopulate self.playlist_rows
self.invalidate_row(plr.plr_rownum) self.refresh_data(session)
return plr.plr_rownum # Update the display from the new row onwards
self.invalidate_rows(list(range(plr.plr_rownum, len(self.playlist_rows))))
return None
def _insert_row( def _insert_row(
self, session: scoped_session, row_number: Optional[int] self, session: scoped_session, row_number: Optional[int]
) -> PlaylistRowData: ) -> PlaylistRows:
""" """
Make space for a row at row_number. If row_number is greater Insert a row in the database.
than length of list plus 1, or if row number is -1, put row at
end of list.
Return the new PlaylistData structure If row_number is greater than length of list plus 1, or if row
number is None, put row at end of list.
Move existing rows to make space if ncessary.
Return the new PlaylistRows object.
""" """
modified_rows: List[int] = []
if row_number is None or row_number > len(self.playlist_rows): if row_number is None or row_number > len(self.playlist_rows):
# We are adding to the end of the list so we can optimise
new_row_number = len(self.playlist_rows) new_row_number = len(self.playlist_rows)
return PlaylistRows(session, self.playlist_id, new_row_number)
elif row_number < 0: elif row_number < 0:
raise ValueError( raise ValueError(
f"playlistmodel.insert_row, invalid row number ({row_number})" f"playlistmodel._insert_row, invalid row number ({row_number})"
) )
else: else:
new_row_number = row_number new_row_number = row_number
modified_rows.append(new_row_number)
# Move rows below new row down # Move rows below new row down
for i in reversed(range(new_row_number, len(self.playlist_rows))): stmt = (
self.playlist_rows[i + 1] = self.playlist_rows[i] update(PlaylistRows)
self.playlist_rows[i + 1].plr_rownum += 1 .where(PlaylistRows.plr_rownum >= new_row_number)
modified_rows.append(i + 1) .values({PlaylistRows.plr_rownum: PlaylistRows.plr_rownum + 1})
# If we are not adding to the end of the list, we need to clear
# out the existing recored at new_row_number (which we have
# already copied to its new location)
if new_row_number in self.playlist_rows:
del self.playlist_rows[new_row_number]
*** Problem here is that we haven't yet updated the database so when we insert a new row
with the PlaylistRows.__init__ call below, we'll get a duplicate. How best to keep
playlist_rows in step with database?
# Insert new row, possibly replace old row
plr = PlaylistRows(
session=session, playlist_id=self.playlist_id, row_number=new_row_number
) )
prd = PlaylistRowData(plr) session.execute(stmt)
# Add row to playlist_rows
self.playlist_rows[new_row_number] = prd
return prd # Insert the new row and return it
return PlaylistRows(session, self.playlist_id, new_row_number)
def invalidate_row(self, modified_row: int) -> None: def invalidate_row(self, modified_row: int) -> None:
""" """
@ -374,63 +364,70 @@ class PlaylistModel(QAbstractTableModel):
for modified_row in modified_rows: for modified_row in modified_rows:
self.invalidate_row(modified_row) self.invalidate_row(modified_row)
def move_rows(self, from_rows: List[int], to_row: int) -> None: def move_rows(self, from_rows: List[int], to_row_number: int) -> None:
""" """
Move the playlist rows given to to_row and below. Move the playlist rows given to to_row and below.
""" """
new_playlist_rows: dict[int, PlaylistRowData] = {} # Build a {current_row_number: new_row_number} dictionary
row_map: dict[int, int] = {}
# Move the from_row records from the playlist_rows dict to the # Put the from_row row numbers into the row_map. Ultimately the
# new_playlist_rows dict. The total number of elements in the # total number of elements in the playlist doesn't change, so
# playlist doesn't change, so check that adding the moved rows # check that adding the moved rows starting at to_row won't
# starting at to_row won't overshoot the end of the playlist. # overshoot the end of the playlist.
if to_row + len(from_rows) > len(self.playlist_rows): if to_row_number + len(from_rows) > len(self.playlist_rows):
next_to_row = len(self.playlist_rows) - len(from_rows) next_to_row = len(self.playlist_rows) - len(from_rows)
else: else:
next_to_row = to_row next_to_row = to_row_number
for from_row in from_rows: for from_row, to_row in zip(
new_playlist_rows[next_to_row] = self.playlist_rows[from_row] from_rows, range(next_to_row, next_to_row + len(from_rows))
del self.playlist_rows[from_row] ):
next_to_row += 1 row_map[from_row] = to_row
# Move the remaining rows to the row_map. We want to fill it
# before (if there are gaps) and after (likewise) the rows that
# are moving.
# This iterates old_row and new_row simultaneously.
for old_row, new_row in zip(
[x for x in self.playlist_rows.keys() if x not in from_rows],
[y for y in range(len(self.playlist_rows)) if y not in row_map.values()],
):
# Optimise: only add to map if there is a change
row_map[old_row] = new_row
# Move the remaining rows to the gaps in new_playlist_rows # For SQLAlchemy, build a list of dictionaries that map plrid to
new_row = 0 # new row number:
for old_row in self.playlist_rows.keys(): sqla_map: List[dict[str, int]] = []
# Find next gap for oldrow, newrow in row_map.items():
while new_row in new_playlist_rows: plrid = self.playlist_rows[oldrow].plrid
new_row += 1 sqla_map.append({"plrid": plrid, "plr_rownum": newrow})
new_playlist_rows[new_row] = self.playlist_rows[old_row]
new_row += 1
# Make copy of rows live # Update database. Ref:
self.playlist_rows = new_playlist_rows # https://docs.sqlalchemy.org/en/20/core/sqlelement.html#sqlalchemy.sql.expression.case
stmt = (
update(PlaylistRows)
.where(
PlaylistRows.playlist_id == self.playlist_id,
PlaylistRows.id == bindparam("plrid"),
)
.values(plr_rownum=bindparam("plr_rownum"))
)
# Update PlaylistRows table and notify display of rows that
# moved
with Session() as session: with Session() as session:
for idx in range(len(self.playlist_rows)): session.connection().execute(stmt, sqla_map)
if self.playlist_rows[idx].plr_rownum == idx:
continue
# Row number in this row is incorred. Fix it in
# database:
plr = session.get(PlaylistRows, self.playlist_rows[idx].plrid)
if not plr:
print(f"\nCan't find plr in playlistmodel:move_rows {idx=}")
continue
plr.plr_rownum = idx
# Fix in self.playlist_rows
self.playlist_rows[idx].plr_rownum = idx
# Update display
self.invalidate_row(idx)
def refresh_data(self): # Update playlist_rows
self.refresh_data(session)
# Update display
self.invalidate_rows(list(row_map.keys()))
def refresh_data(self, session: scoped_session):
"""Populate dicts for data calls""" """Populate dicts for data calls"""
# Populate self.playlist_rows with playlist data # Populate self.playlist_rows with playlist data
self.playlist_rows.clear() self.playlist_rows.clear()
with Session() as session:
for p in PlaylistRows.deep_rows(session, self.playlist_id): for p in PlaylistRows.deep_rows(session, self.playlist_id):
self.playlist_rows[p.plr_rownum] = PlaylistRowData(p) self.playlist_rows[p.plr_rownum] = PlaylistRowData(p)

84
archive/db_experiments.py Executable file
View File

@ -0,0 +1,84 @@
#!/usr/bin/env python3
from sqlalchemy import create_engine, String, update, bindparam, case
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
sessionmaker,
scoped_session,
)
from typing import Generator
from contextlib import contextmanager
db_url = "sqlite:////tmp/rhys.db"
class Base(DeclarativeBase):
pass
class Rhys(Base):
__tablename__ = "rhys"
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
ref_number: Mapped[int] = mapped_column()
name: Mapped[str] = mapped_column(String(256), index=True)
def __init__(self, session, ref_number: int, name: str) -> None:
self.ref_number = ref_number
self.name = name
session.add(self)
session.flush()
@contextmanager
def Session() -> Generator[scoped_session, None, None]:
Session = scoped_session(sessionmaker(bind=engine))
yield Session
Session.commit()
Session.close()
engine = create_engine(db_url)
Base.metadata.create_all(engine)
inital_number_of_records = 10
def move_rows(session):
new_row = 6
with Session() as session:
# new_record = Rhys(session, new_row, f"new {new_row=}")
# Move rows
stmt = (
update(Rhys)
.where(Rhys.ref_number > new_row)
# .where(Rhys.id.in_(session.query(Rhys.id).order_by(Rhys.id.desc())))
.values({Rhys.ref_number: Rhys.ref_number + 1})
)
session.execute(stmt)
sqla_map = []
for k, v in zip(range(11), [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]):
sqla_map.append({"oldrow": k, "newrow": v})
# for a, b in sqla_map.items():
# print(f"{a} > {b}")
with Session() as session:
for a in range(inital_number_of_records):
_ = Rhys(session, a, f"record: {a}")
stmt = update(Rhys).values(
ref_number=case(
{item['oldrow']: item['newrow'] for item in sqla_map},
value=Rhys.ref_number
)
)
session.connection().execute(stmt, sqla_map)

View File

@ -14,34 +14,13 @@ def create_model_with_playlist_rows(
for row in range(rows): for row in range(rows):
plr = model._insert_row(session, row) plr = model._insert_row(session, row)
newrow = plr.plr_rownum newrow = plr.plr_rownum
model.playlist_rows[newrow].note = str(newrow) plr.note = str(newrow)
model.playlist_rows[newrow] = playlistmodel.PlaylistRowData(plr)
session.commit() session.commit()
return model return model
def test_insert_row(monkeypatch, session):
monkeypatch.setattr(playlistmodel, "Session", session)
# Create a playlist
playlist = Playlists(session, "test playlist")
# Create a model
model = playlistmodel.PlaylistModel(playlist.id, None)
assert model.rowCount() == 0
model._insert_row(session, 0)
assert model.rowCount() == 1
def test_insert_high_row(monkeypatch, session):
monkeypatch.setattr(playlistmodel, "Session", session)
# Create a playlist
playlist = Playlists(session, "test playlist")
# Create a model
model = playlistmodel.PlaylistModel(playlist.id, None)
assert model.rowCount() == 0
model._insert_row(session, 5)
assert model.rowCount() == 1
def test_11_row_playlist(monkeypatch, session): def test_11_row_playlist(monkeypatch, session):
# Create multirow playlist # Create multirow playlist
monkeypatch.setattr(playlistmodel, "Session", session) monkeypatch.setattr(playlistmodel, "Session", session)
@ -187,12 +166,35 @@ def test_insert_header_row_end(monkeypatch, session):
monkeypatch.setattr(playlistmodel, "Session", session) monkeypatch.setattr(playlistmodel, "Session", session)
note_text = "test text" note_text = "test text"
initial_row_count = 11
model = create_model_with_playlist_rows(session, 11) model = create_model_with_playlist_rows(session, initial_row_count)
row_number = model.insert_header_row(None, note_text) model.insert_header_row(None, note_text)
session.flush() assert model.rowCount() == initial_row_count + 1
assert model.rowCount() == row_number + 1 prd = model.playlist_rows[model.rowCount() - 1]
prd = model.playlist_rows[row_number]
# Test against edit_role because display_role for headers is # Test against edit_role because display_role for headers is
# handled differently (sets up row span) # handled differently (sets up row span)
assert model.edit_role(row_number, playlistmodel.Col.NOTE.value, prd) == note_text assert (
model.edit_role(model.rowCount(), playlistmodel.Col.NOTE.value, prd)
== note_text
)
def test_insert_header_row_middle(monkeypatch, session):
# insert header row in middle of playlist
monkeypatch.setattr(playlistmodel, "Session", session)
note_text = "test text"
initial_row_count = 11
insert_row = 6
model = create_model_with_playlist_rows(session, initial_row_count)
model.insert_header_row(insert_row, note_text)
assert model.rowCount() == initial_row_count + 1
prd = model.playlist_rows[insert_row]
# Test against edit_role because display_role for headers is
# handled differently (sets up row span)
assert (
model.edit_role(model.rowCount(), playlistmodel.Col.NOTE.value, prd)
== note_text
)