Compare commits

...

3 Commits

Author SHA1 Message Date
Keith Edmunds
4c1ee0b1ca WIP: all tests for move rows within playlist working 2025-03-22 20:54:04 +00:00
Keith Edmunds
bc7d6818aa WIP: move within playlist tests working 2025-03-22 18:53:14 +00:00
Keith Edmunds
0f8409879c Report correct line for ApplicationError 2025-03-22 09:27:55 +00:00
11 changed files with 722 additions and 445 deletions

View File

@ -163,6 +163,13 @@ class TrackInfo(NamedTuple):
# Classes for signals # Classes for signals
@dataclass
class InsertRows:
playlist_id: int
from_row: int
to_row: int
@dataclass @dataclass
class InsertTrack: class InsertTrack:
playlist_id: int playlist_id: int
@ -188,6 +195,8 @@ class MusicMusterSignals(QObject):
search_wikipedia_signal = pyqtSignal(str) search_wikipedia_signal = pyqtSignal(str)
show_warning_signal = pyqtSignal(str, str) show_warning_signal = pyqtSignal(str, str)
signal_add_track_to_header = pyqtSignal(int, int) signal_add_track_to_header = pyqtSignal(int, int)
signal_begin_insert_rows = pyqtSignal(InsertRows)
signal_end_insert_rows = pyqtSignal(int)
signal_insert_track = pyqtSignal(InsertTrack) signal_insert_track = pyqtSignal(InsertTrack)
signal_playlist_selected_rows = pyqtSignal(int, list) signal_playlist_selected_rows = pyqtSignal(int, list)
signal_set_next_row = pyqtSignal(int) signal_set_next_row = pyqtSignal(int)

View File

@ -32,6 +32,7 @@ from classes import (
MusicMusterSignals, MusicMusterSignals,
singleton, singleton,
Tags, Tags,
TrackDTO,
) )
from config import Config from config import Config
from helpers import ( from helpers import (
@ -40,7 +41,6 @@ from helpers import (
show_OK, show_OK,
) )
from log import log from log import log
from models import db, Tracks
from playlistrow import TrackSequence from playlistrow import TrackSequence
from playlistmodel import PlaylistModel from playlistmodel import PlaylistModel
import helpers import helpers
@ -122,13 +122,7 @@ class FileImporter:
# Get signals # Get signals
self.signals = MusicMusterSignals() self.signals = MusicMusterSignals()
def _get_existing_tracks(self) -> Sequence[Tracks]: self.existing_tracks: list[TrackDTO] = []
"""
Return a list of all existing Tracks
"""
with db.Session() as session:
return Tracks.get_all(session)
def start(self) -> None: def start(self) -> None:
""" """
@ -148,7 +142,7 @@ class FileImporter:
# Refresh list of existing tracks as they may have been updated # Refresh list of existing tracks as they may have been updated
# by previous imports # by previous imports
self.existing_tracks = self._get_existing_tracks() self.existing_tracks = repository.get_all_tracks()
for infile in [ for infile in [
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f) os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)

View File

@ -79,9 +79,22 @@ log = logging.getLogger(Config.LOG_NAME)
def handle_exception(exc_type, exc_value, exc_traceback): def handle_exception(exc_type, exc_value, exc_traceback):
error = str(exc_value) """
Inform user of exception
"""
# Navigate to the inner stack frame
tb = exc_traceback
while tb.tb_next:
tb = tb.tb_next
fname = os.path.basename(tb.tb_frame.f_code.co_filename)
lineno = tb.tb_lineno
msg = f"ApplicationError: {exc_value}\nat {fname}:{lineno}"
logmsg = f"ApplicationError: {exc_value} at {fname}:{lineno}"
if issubclass(exc_type, ApplicationError): if issubclass(exc_type, ApplicationError):
log.error(error) log.error(logmsg)
else: else:
# Handle unexpected errors (log and display) # Handle unexpected errors (log and display)
error_msg = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback)) error_msg = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
@ -104,7 +117,6 @@ def handle_exception(exc_type, exc_value, exc_traceback):
) )
if QApplication.instance() is not None: if QApplication.instance() is not None:
fname = os.path.split(exc_traceback.tb_frame.f_code.co_filename)[1] fname = os.path.split(exc_traceback.tb_frame.f_code.co_filename)[1]
msg = f"ApplicationError: {error}\nat {fname}:{exc_traceback.tb_lineno}"
QMessageBox.critical(None, "Application Error", msg) QMessageBox.critical(None, "Application Error", msg)

View File

@ -438,24 +438,6 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
) )
) )
@staticmethod
def fixup_rownumbers(session: Session, playlist_id: int) -> None:
"""
Ensure the row numbers for passed playlist have no gaps
"""
plrs = session.scalars(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
).all()
for i, plr in enumerate(plrs):
plr.row_number = i
# Ensure new row numbers are available to the caller
session.commit()
@classmethod @classmethod
def plrids_to_plrs( def plrids_to_plrs(
cls, session: Session, playlist_id: int, plr_ids: list[int] cls, session: Session, playlist_id: int, plr_ids: list[int]

View File

@ -1180,7 +1180,7 @@ class Window(QMainWindow):
self.footer_section.widgetFadeVolume.setDefaultPadding(0) self.footer_section.widgetFadeVolume.setDefaultPadding(0)
self.footer_section.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND) self.footer_section.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND)
self.move_source_rows: Optional[list[int]] = None self.move_source_rows: list[PlaylistRow] = []
self.move_source_model: Optional[PlaylistModel] = None self.move_source_model: Optional[PlaylistModel] = None
self.disable_selection_timing = False self.disable_selection_timing = False
@ -2030,7 +2030,7 @@ class Window(QMainWindow):
# Save the selected PlaylistRows items ready for a later # Save the selected PlaylistRows items ready for a later
# paste # paste
self.move_source_rows = self.current.selected_row_numbers self.move_source_rows = self.current.base_model.selected_rows
self.move_source_model = self.current.base_model self.move_source_model = self.current.base_model
log.debug( log.debug(
@ -2665,6 +2665,8 @@ class Window(QMainWindow):
Update track clocks. Update track clocks.
""" """
self.timer1000.stop()
raise ApplicationError("test")
# If track is playing, update track clocks time and colours # If track is playing, update track clocks time and colours
if self.track_sequence.current and self.track_sequence.current.is_playing(): if self.track_sequence.current and self.track_sequence.current.is_playing():
# Elapsed time # Elapsed time

View File

@ -34,6 +34,7 @@ import obswebsocket # type: ignore
from classes import ( from classes import (
ApplicationError, ApplicationError,
Col, Col,
InsertRows,
MusicMusterSignals, MusicMusterSignals,
) )
from config import Config from config import Config
@ -94,16 +95,14 @@ class PlaylistModel(QAbstractTableModel):
self.signals.begin_reset_model_signal.connect(self.begin_reset_model) self.signals.begin_reset_model_signal.connect(self.begin_reset_model)
self.signals.end_reset_model_signal.connect(self.end_reset_model) self.signals.end_reset_model_signal.connect(self.end_reset_model)
self.signals.signal_add_track_to_header.connect(self.add_track_to_header) self.signals.signal_add_track_to_header.connect(self.add_track_to_header)
self.signals.signal_begin_insert_rows.connect(self.begin_insert_rows)
self.signals.signal_end_insert_rows.connect(self.end_insert_rows)
self.signals.signal_playlist_selected_rows.connect(self.set_selected_rows) self.signals.signal_playlist_selected_rows.connect(self.set_selected_rows)
self.signals.signal_set_next_row.connect(self.set_next_row) self.signals.signal_set_next_row.connect(self.set_next_row)
with db.Session() as session:
# Ensure row numbers in playlist are contiguous
# TODO: remove this
PlaylistRows.fixup_rownumbers(session, playlist_id)
# Populate self.playlist_rows # Populate self.playlist_rows
self.load_data() for dto in repository.get_playlist_rows(self.playlist_id):
self.playlist_rows[dto.row_number] = PlaylistRow(dto)
self.update_track_times() self.update_track_times()
def __repr__(self) -> str: def __repr__(self) -> str:
@ -389,25 +388,18 @@ class PlaylistModel(QAbstractTableModel):
Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows
calls. To keep it simple, if inefficient, delete rows one by one. calls. To keep it simple, if inefficient, delete rows one by one.
TODO: delete in blocks
Delete from highest row back so that not yet deleted row numbers don't change. Delete from highest row back so that not yet deleted row numbers don't change.
""" """
with db.Session() as session: for row_group in self._reversed_contiguous_row_groups(row_numbers):
for row_number in sorted(row_numbers, reverse=True): # Signal that rows will be removed
log.debug(f"{self}: delete_rows(), {row_number=}") super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
super().beginRemoveRows(QModelIndex(), row_number, row_number) # Remove rows from data store
# We need to remove data from the underlying data store, repository.remove_rows(self.playlist_id, row_group)
# which is the database, but we cache in # Signal that data store has been updated
# self.playlist_rows, which is what calls to data() super().endRemoveRows()
# reads, so fixup that too.
PlaylistRows.delete_row(session, self.playlist_id, row_number)
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
self.refresh_data(session)
session.commit()
super().endRemoveRows()
self.refresh_data()
self.track_sequence.update() self.track_sequence.update()
self.update_track_times() self.update_track_times()
@ -825,39 +817,6 @@ class PlaylistModel(QAbstractTableModel):
return None return None
def load_data(self) -> None:
"""
Same as refresh data, but only used when creating playslit.
Distinguishes profile time between initial load and other
refreshes.
"""
# We used to clear self.playlist_rows each time but that's
# expensive and slow on big playlists
# # Note where each playlist_id is
# plid_to_row: dict[int, int] = {}
# for oldrow in self.playlist_rows:
# plrdata = self.playlist_rows[oldrow]
# plid_to_row[plrdata.playlistrow_id] = plrdata.row_number
# build a new playlist_rows
# new_playlist_rows: dict[int, RowAndTrack] = {}
# for p in PlaylistRows.get_playlist_rows(session, self.playlist_id):
# if p.id not in plid_to_row:
# new_playlist_rows[p.row_number] = RowAndTrack(p)
# else:
# new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
# new_playlist_rows[p.row_number].row_number = p.row_number
# build a new playlist_rows
# shouldn't be PlaylistRow
new_playlist_rows: dict[int, PlaylistRow] = {}
for dto in repository.get_playlist_rows(self.playlist_id):
new_playlist_rows[dto.row_number] = PlaylistRow(dto)
# Copy to self.playlist_rows
self.playlist_rows = new_playlist_rows
def mark_unplayed(self, row_numbers: list[int]) -> None: def mark_unplayed(self, row_numbers: list[int]) -> None:
""" """
Mark row as unplayed Mark row as unplayed
@ -874,78 +833,81 @@ class PlaylistModel(QAbstractTableModel):
] ]
self.invalidate_rows(row_numbers, roles) self.invalidate_rows(row_numbers, roles)
def move_rows(self, from_rows: list[int], to_row_number: int) -> None: def move_rows(self, from_rows: list[PlaylistRow], to_row_number: int) -> None:
""" """
Move the playlist rows given to to_row and below. Move the playlist rows given to to_row and below.
""" """
log.debug(f"{self}: move_rows({from_rows=}, {to_row_number=}") log.debug(f"{self}: move_rows({from_rows=}, {to_row_number=}")
# Build a {current_row_number: new_row_number} dictionary # Don't move current row
row_map: dict[int, int] = {} if self.track_sequence.current:
current_row = self.track_sequence.current.row_number
if current_row in from_rows:
log.debug(
"move_rows: Removing {current_row=} from {from_rows=}"
)
from_rows.remove(self.track_sequence.current.row_number)
# The destination row number will need to be reduced by the # Row moves must be wrapped in beginMoveRows .. endMoveRows and
# number of rows being move from above the destination row # the row range must be contiguous. Process the highest rows
# otherwise rows below the destination row will end up above the # first so the lower row numbers are unchanged
# moved rows.
adjusted_to_row = to_row_number - len(
[a for a in from_rows if a < to_row_number]
)
# Put the from_row row numbers into the row_map. Ultimately the row_groups = self._reversed_contiguous_row_groups([a.row_number for a in from_rows])
# total number of elements in the playlist doesn't change, so
# check that adding the moved rows starting at to_row won't
# overshoot the end of the playlist.
if adjusted_to_row + len(from_rows) > len(self.playlist_rows):
next_to_row = len(self.playlist_rows) - len(from_rows)
else:
next_to_row = adjusted_to_row
# zip iterates from_row and to_row simultaneously from the # Handle the moves in row_group chunks
# respective sequences inside zip() for row_group in row_groups:
for from_row, to_row in zip( # Tell model we will be moving rows
from_rows, range(next_to_row, next_to_row + len(from_rows)) # See https://doc.qt.io/qt-6/qabstractitemmodel.html#beginMoveRows
): # for how destination is calculated
row_map[from_row] = to_row destination = to_row_number
if to_row_number > max(row_group):
# Move the remaining rows to the row_map. We want to fill it destination = to_row_number - max(row_group) + 1
# before (if there are gaps) and after (likewise) the rows that super().beginMoveRows(QModelIndex(),
# are moving. min(row_group),
# zip iterates old_row and new_row simultaneously from the max(row_group),
# respective sequences inside zip() QModelIndex(),
for old_row, new_row in zip( destination
[x for x in self.playlist_rows.keys() if x not in from_rows], )
[y for y in range(len(self.playlist_rows)) if y not in row_map.values()], # Update database
): repository.move_rows_within_playlist(self.playlist_id, row_group, to_row_number)
# Optimise: only add to map if there is a change # Tell model we have finished moving rows
if old_row != new_row: super().endMoveRows()
row_map[old_row] = new_row
# For SQLAlchemy, build a list of dictionaries that map playlistrow_id to
# new row number:
sqla_map: list[dict[str, int]] = []
for oldrow, newrow in row_map.items():
playlistrow_id = self.playlist_rows[oldrow].playlistrow_id
sqla_map.append({"playlistrow_id": playlistrow_id, "row_number": newrow})
with db.Session() as session:
PlaylistRows.update_plr_row_numbers(session, self.playlist_id, sqla_map)
session.commit()
# Update playlist_rows
self.refresh_data(session)
# Update display # Update display
self.refresh_data()
self.track_sequence.update() self.track_sequence.update()
self.update_track_times() self.update_track_times()
# only invalidate required roles # TODO: do we need this?
roles = [ # # only invalidate required roles
Qt.ItemDataRole.DisplayRole, # roles = [
] # Qt.ItemDataRole.DisplayRole,
self.invalidate_rows(list(row_map.keys()), roles) # ]
# self.invalidate_rows(list(row_map.keys()), roles)
def begin_insert_rows(self, insert_rows: InsertRows) -> None:
"""
Prepare model to insert rows
"""
if insert_rows.playlist_id != self.playlist_id:
return
super().beginInsertRows(QModelIndex(), insert_rows.from_row, insert_rows.to_row)
def end_insert_rows(self, playlist_id: int) -> None:
"""
End insert rows
"""
if playlist_id != self.playlist_id:
return
super().endInsertRows()
def move_rows_between_playlists( def move_rows_between_playlists(
self, self,
from_rows: list[int], from_rows: list[PlaylistRow],
to_row_number: int, to_row_number: int,
to_playlist_id: int, to_playlist_id: int,
) -> None: ) -> None:
@ -958,56 +920,46 @@ class PlaylistModel(QAbstractTableModel):
f"{to_row_number=}, {to_playlist_id=}" f"{to_row_number=}, {to_playlist_id=}"
) )
# Row removal must be wrapped in beginRemoveRows .. # Don't move current row
# endRemoveRows and the row range must be contiguous. Process if self.track_sequence.current:
# the highest rows first so the lower row numbers are unchanged current_row = self.track_sequence.current.row_number
row_groups = self._reversed_contiguous_row_groups(from_rows) if current_row in from_rows:
log.debug(
# Prepare destination playlist for a reset "move_rows_between_playlists: Removing {current_row=} from {from_rows=}"
self.signals.begin_reset_model_signal.emit(to_playlist_id)
with db.Session() as session:
for row_group in row_groups:
# Make room in destination playlist
max_destination_row_number = PlaylistRows.get_last_used_row(
session, to_playlist_id
) )
if ( from_rows.remove(self.track_sequence.current.row_number)
max_destination_row_number
and to_row_number <= max_destination_row_number
):
# Move the destination playlist rows down to make room.
PlaylistRows.move_rows_down(
session, to_playlist_id, to_row_number, len(row_group)
)
next_to_row = to_row_number
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group)) # Row removal must be wrapped in beginRemoveRows .. endRemoveRows
for playlist_row in PlaylistRows.plrids_to_plrs( # and the row range must be contiguous. Process the highest rows
session, # first so the lower row numbers are unchanged
self.playlist_id,
[self.playlist_rows[a].playlistrow_id for a in row_group],
):
if (
self.track_sequence.current
and playlist_row.id == self.track_sequence.current.playlistrow_id
):
# Don't move current track
continue
playlist_row.playlist_id = to_playlist_id
playlist_row.row_number = next_to_row
next_to_row += 1
self.refresh_data(session)
super().endRemoveRows()
# We need to remove gaps in row numbers after tracks have
# moved.
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
self.refresh_data(session)
session.commit()
# Reset of model must come after session has been closed row_groups = self._reversed_contiguous_row_groups([a.row_number for a in from_rows])
# Handle the moves in row_group chunks
# TODO: use bool QAbstractItemModel::beginMoveRows(const
# QModelIndex &sourceParent, int sourceFirst, int sourceLast,
# const QModelIndex &destinationParent, int destinationChild)
for row_group in row_groups:
# Prepare source model
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
# Prepare destination model
insert_rows = InsertRows(to_playlist_id,
to_row_number,
to_row_number + len(row_group)
)
self.signals.signal_begin_insert_rows.emit(insert_rows)
repository.move_rows_to_playlist(from_rows=row_group,
from_playlist_id=self.playlist_id,
to_row=to_row_number,
to_playlist_id=to_playlist_id
)
self.signals.signal_end_insert_rows.emit(to_playlist_id)
super().endRemoveRows()
self.refresh_data()
self.track_sequence.update() self.track_sequence.update()
self.signals.end_reset_model_signal.emit(to_playlist_id)
self.update_track_times() self.update_track_times()
def move_track_add_note( def move_track_add_note(
@ -1101,16 +1053,9 @@ class PlaylistModel(QAbstractTableModel):
] ]
self.invalidate_row(self.track_sequence.previous.row_number, roles) self.invalidate_row(self.track_sequence.previous.row_number, roles)
def refresh_data(self, session: Session) -> None: def refresh_data(self) -> None:
""" """
Populate self.playlist_rows with playlist data Populate self.playlist_rows with playlist data
We used to clear self.playlist_rows each time but that's
expensive and slow on big playlists. Instead we track where rows
are in database versus self.playlist_rows and fixup the latter.
This works well for news rows added and for rows moved, but
doesn't work for changed comments so they must be handled using
refresh_row().
""" """
# Note where each playlist_id is by mapping each playlistrow_id # Note where each playlist_id is by mapping each playlistrow_id
@ -1216,9 +1161,7 @@ class PlaylistModel(QAbstractTableModel):
] ]
self.invalidate_rows(row_numbers, roles) self.invalidate_rows(row_numbers, roles)
def _reversed_contiguous_row_groups( def _reversed_contiguous_row_groups(self, row_numbers: list[int]) -> list[list[int]]:
self, row_numbers: list[int]
) -> list[list[int]]:
""" """
Take the list of row numbers and split into groups of contiguous rows. Return as a list Take the list of row numbers and split into groups of contiguous rows. Return as a list
of lists with the highest row numbers first. of lists with the highest row numbers first.
@ -1233,6 +1176,7 @@ class PlaylistModel(QAbstractTableModel):
result: list[list[int]] = [] result: list[list[int]] = []
temp: list[int] = [] temp: list[int] = []
last_value = row_numbers[0] - 1 last_value = row_numbers[0] - 1
row_numbers.sort()
for idx in range(len(row_numbers)): for idx in range(len(row_numbers)):
if row_numbers[idx] != last_value + 1: if row_numbers[idx] != last_value + 1:

View File

@ -5,13 +5,14 @@ import re
# Third party imports # Third party imports
from sqlalchemy import ( from sqlalchemy import (
delete,
func, func,
select, select,
update, update,
) )
from sqlalchemy.orm import aliased from sqlalchemy.orm import aliased
from sqlalchemy.orm.session import Session from sqlalchemy.orm.session import Session
from sqlalchemy.sql.elements import BinaryExpression from sqlalchemy.sql.elements import BinaryExpression, ColumnElement
from classes import ApplicationError, PlaylistRowDTO from classes import ApplicationError, PlaylistRowDTO
# App imports # App imports
@ -113,6 +114,12 @@ def create_track(path: str) -> TrackDTO:
return new_track return new_track
def get_all_tracks() -> list[TrackDTO]:
"""Return a list of all tracks"""
return _tracks_where(Tracks.id > 0)
def track_by_id(track_id: int) -> TrackDTO | None: def track_by_id(track_id: int) -> TrackDTO | None:
""" """
Return track with specified id Return track with specified id
@ -171,7 +178,7 @@ def track_by_id(track_id: int) -> TrackDTO | None:
return dto return dto
def _tracks_like(where: BinaryExpression) -> list[TrackDTO]: def _tracks_where(where: BinaryExpression | ColumnElement[bool]) -> list[TrackDTO]:
""" """
Return tracks selected by where Return tracks selected by where
""" """
@ -235,7 +242,7 @@ def tracks_like_artist(filter_str: str) -> list[TrackDTO]:
Return tracks where artist is like filter Return tracks where artist is like filter
""" """
return _tracks_like(Tracks.artist.ilike(f"%{filter_str}%")) return _tracks_where(Tracks.artist.ilike(f"%{filter_str}%"))
def tracks_like_title(filter_str: str) -> list[TrackDTO]: def tracks_like_title(filter_str: str) -> list[TrackDTO]:
@ -243,54 +250,16 @@ def tracks_like_title(filter_str: str) -> list[TrackDTO]:
Return tracks where title is like filter Return tracks where title is like filter
""" """
return _tracks_like(Tracks.title.ilike(f"%{filter_str}%")) return _tracks_where(Tracks.title.ilike(f"%{filter_str}%"))
# Playlist functions # Playlist functions
def _check_row_number_sequence( def _move_rows(
session: Session, playlist_id: int, fix: bool = False
) -> None:
"""
The row numbers for any playlist should run from 0 to (length - 1).
This function checks that that is the case.
If there are errors, 'fix' determines what action is taken.
If fix == True:
Fix the row numbers and save to database. Log at info level.
If fix == False:
Log at error level and raise ApplicationError
"""
errors = False
playlist_rows = session.scalars(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
).all()
for idx, playlist_row in enumerate(playlist_rows):
if playlist_row.row_number != idx:
errors = True
msg = f"_check_row_number_sequence({playlist_id=}, {fix=}, {playlist_row=}, {idx=}"
if fix:
log.info(msg)
playlist_row.row_number = idx
else:
log.error(msg)
raise ApplicationError(msg)
if errors:
session.commit()
def _move_rows_down(
session: Session, playlist_id: int, starting_row: int, move_by: int session: Session, playlist_id: int, starting_row: int, move_by: int
) -> None: ) -> None:
""" """
Create space to insert move_by additional rows by incremented row Move rows from starting_row by move_by. If move_by is +ve, move rows
number from starting_row to end of playlist down; if -ve, move them up.
""" """
log.debug(f"(_move_rows_down({playlist_id=}, {starting_row=}, {move_by=}") log.debug(f"(_move_rows_down({playlist_id=}, {starting_row=}, {move_by=}")
@ -306,6 +275,111 @@ def _move_rows_down(
session.commit() session.commit()
def move_rows_to_playlist(
from_rows: list[int], from_playlist_id: int, to_row: int, to_playlist_id: int
) -> None:
"""
Move rows between playlists.
"""
with db.Session() as session:
# Prepare desination playlist
# Find last used row
last_row = session.execute(
select(func.max(PlaylistRows.row_number)).where(
PlaylistRows.playlist_id == to_playlist_id
)
).scalar_one()
if last_row is None:
last_row = -1
# Make room in destination
if to_row <= last_row:
_move_rows(session, to_playlist_id, to_row, len(from_rows))
# Move rows
row_offset = to_row - min(from_rows)
stmt = (
update(PlaylistRows)
.where(
PlaylistRows.playlist_id == from_playlist_id,
PlaylistRows.row_number.in_(from_rows)
)
.values(
playlist_id=to_playlist_id,
row_number=PlaylistRows.row_number + row_offset
)
)
session.execute(stmt)
# Remove gaps in source
_move_rows(session=session,
playlist_id=from_playlist_id,
starting_row=max(from_rows) + 1,
move_by=(len(from_rows) * -1)
)
# Commit changes
session.commit()
# Sanity check
_check_playlist_integrity(session, get_playlist_rows(from_playlist_id), fix=False)
_check_playlist_integrity(session, get_playlist_rows(to_playlist_id), fix=False)
def move_rows_within_playlist(playlist_id: int, from_rows: list[int], to_row: int) -> None:
"""
Move rows within a playlist.
"""
log.debug(f"move_rows_within_playlist({playlist_id=}, {from_rows=}, {to_row=})")
playlistrows_dto = get_playlist_rows(playlist_id)
new_order: dict[int, int | None] = dict.fromkeys(range(len(playlistrows_dto)))
# The destination row number will need to be reduced by the
# number of rows being move from above the destination row
# otherwise rows below the destination row will end up above the
# moved rows.
# next_row = to_row - len([a for a in from_rows if a < to_row])
# Need to ensure the moved rows won't overrun the total number of
# rows
next_row = to_row
if next_row + len(from_rows) > len(playlistrows_dto):
next_row = len(playlistrows_dto) - len(from_rows)
# Populate new_order with moved rows
# # We need to keep, where possible, the rows after to_row unmoved
# if to_row + len(from_rows) > len(playlistrows_dto):
# next_row = max(to_row - len(from_rows) - len([a for a in from_rows if a < to_row]) + 1, 0)
for from_row in from_rows:
new_order[next_row] = from_row
next_row += 1
# Move remaining rows
remaining_rows = set(new_order.keys()) - set(from_rows)
next_row = 0
for row in remaining_rows:
while new_order[next_row] is not None:
next_row += 1
new_order[next_row] = row
next_row += 1
# Sanity check
if None in new_order:
raise ApplicationError(f"None remains after move: {new_order=}")
# Update database
# Build a list of dicts of (id: value, row_number: value}
update_list = []
for new_row_number, old_row_number in new_order.items():
plrid = [a.playlistrow_id for a in playlistrows_dto if a.row_number == old_row_number][0]
update_list.append(dict(id=plrid, row_number=new_row_number))
# Update rows
with db.Session() as session:
session.execute(update(PlaylistRows), update_list)
session.commit()
# Sanity check
_check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False)
def create_playlist(name: str, template_id: int) -> PlaylistDTO: def create_playlist(name: str, template_id: int) -> PlaylistDTO:
""" """
Create playlist and return DTO. Create playlist and return DTO.
@ -417,6 +491,27 @@ def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None:
return dto return dto
def _check_playlist_integrity(
session: Session, playlist_rows: list[PlaylistRowDTO], fix: bool = False
) -> None:
"""
Ensure the row numbers are contiguous. Fix and log if fix==True,
else raise ApplicationError.
"""
for idx, plr in enumerate(playlist_rows):
if plr.row_number == idx:
continue
msg = f"_check_playlist_integrity: incorrect row number ({plr.playlistrow_id=}, {idx=})"
if fix:
log.debug(msg)
plr.row_number = idx
session.commit()
else:
raise ApplicationError(msg)
def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]: def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]:
# Alias PlaydatesTable for subquery # Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates) LatestPlaydate = aliased(Playdates)
@ -458,6 +553,9 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]:
with db.Session() as session: with db.Session() as session:
results = session.execute(stmt).all() results = session.execute(stmt).all()
# Sanity check
# TODO: would be good to be confident at removing this
_check_playlist_integrity(session=session, playlist_rows=results, fix=False)
dto_list = [] dto_list = []
for row in results: for row in results:
@ -516,10 +614,10 @@ def insert_row(
with db.Session() as session: with db.Session() as session:
# Sanity check # Sanity check
_check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False) _check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False)
# Make space for new row # Make space for new row
_move_rows_down( _move_rows(
session=session, playlist_id=playlist_id, starting_row=row_number, move_by=1 session=session, playlist_id=playlist_id, starting_row=row_number, move_by=1
) )
@ -534,7 +632,7 @@ def insert_row(
playlist_row_id = playlist_row.id playlist_row_id = playlist_row.id
# Sanity check # Sanity check
_check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False) _check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False)
new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id) new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id)
if not new_playlist_row: if not new_playlist_row:
@ -543,6 +641,29 @@ def insert_row(
return new_playlist_row return new_playlist_row
def remove_rows(playlist_id: int, row_numbers: list[int]) -> None:
"""
Remove rows from playlist
Delete from highest row back so that not yet deleted row numbers don't change.
"""
log.debug(f"remove_rows({playlist_id=}, {row_numbers=}")
with db.Session() as session:
for row_number in sorted(row_numbers, reverse=True):
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number == row_number,
)
)
# Fixup row number to remove gaps
_check_playlist_integrity(session, playlist_id, fix=True)
session.commit()
def playlist_by_id(playlist_id: int) -> PlaylistDTO | None: def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
""" """
Return playlist with specified id Return playlist with specified id
@ -579,7 +700,9 @@ def get_setting(name: str) -> int | None:
""" """
with db.Session() as session: with db.Session() as session:
record = session.execute(select(Settings).where(Settings.name == name)).one_or_none() record = session.execute(
select(Settings).where(Settings.name == name)
).one_or_none()
if not record: if not record:
return None return None
@ -592,12 +715,12 @@ def set_setting(name: str, value: int) -> None:
""" """
with db.Session() as session: with db.Session() as session:
record = session.execute(select(Settings).where(Settings.name == name)).one_or_none() record = session.execute(
select(Settings).where(Settings.name == name)
).one_or_none()
if not record: if not record:
record = Settings(session=session, name=name) record = Settings(session=session, name=name)
if not record: if not record:
raise ApplicationError("Can't create Settings record") raise ApplicationError("Can't create Settings record")
record.f_int = value record.f_int = value
session.commit() session.commit()

166
kae.py Executable file
View File

@ -0,0 +1,166 @@
#!/usr/bin/env python3
import sys
import datetime as dt
from dataclasses import dataclass
from PyQt6.QtWidgets import (
QApplication, QDialog, QLabel, QLineEdit, QListWidget,
QVBoxLayout, QHBoxLayout, QPushButton, QListWidgetItem
)
from PyQt6.QtCore import Qt, pyqtSignal
@dataclass
class TrackDTO:
track_id: int
artist: str
bitrate: int
duration: int # milliseconds
fade_at: int
intro: int | None
path: str
silence_at: int
start_gap: int
title: str
lastplayed: dt.datetime | None
# Placeholder external function to simulate search
def search_titles(query: str) -> list[TrackDTO]:
now = dt.datetime.now()
dummy_tracks = [
TrackDTO(1, "Artist A", 320, 210000, 0, None, "", 0, 0, "Title One", now - dt.timedelta(days=2)),
TrackDTO(2, "Artist B", 256, 185000, 0, None, "", 0, 0, "Another Title", now - dt.timedelta(days=30)),
TrackDTO(3, "Artist C", 320, 240000, 0, None, "", 0, 0, "More Music", None),
]
return [t for t in dummy_tracks if query.lower() in t.title.lower()]
def format_duration(ms: int) -> str:
minutes, seconds = divmod(ms // 1000, 60)
return f"{minutes}:{seconds:02d}"
def friendly_last_played(lastplayed: dt.datetime | None) -> str:
if lastplayed is None:
return "(Never)"
now = dt.datetime.now()
delta = now - lastplayed
days = delta.days
if days == 0:
return "(Today)"
elif days == 1:
return "(Yesterday)"
years, days_remain = divmod(days, 365)
months, days_final = divmod(days_remain, 30)
parts = []
if years:
parts.append(f"{years}y")
if months:
parts.append(f"{months}m")
if days_final:
parts.append(f"{days_final}d")
formatted = " ".join(parts)
return f"({formatted} ago)"
class TrackInsertDialog(QDialog):
signal_insert_track = pyqtSignal(int, str)
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Insert Track")
# Title input on one line
self.title_label = QLabel("Title:")
self.title_edit = QLineEdit()
self.title_edit.textChanged.connect(self.update_list)
title_layout = QHBoxLayout()
title_layout.addWidget(self.title_label)
title_layout.addWidget(self.title_edit)
# Track list
self.track_list = QListWidget()
# Note input on one line
self.note_label = QLabel("Note:")
self.note_edit = QLineEdit()
note_layout = QHBoxLayout()
note_layout.addWidget(self.note_label)
note_layout.addWidget(self.note_edit)
# Buttons
self.add_btn = QPushButton("Add")
self.add_close_btn = QPushButton("Add and close")
self.close_btn = QPushButton("Close")
self.add_btn.clicked.connect(self.add_clicked)
self.add_close_btn.clicked.connect(self.add_and_close_clicked)
self.close_btn.clicked.connect(self.close)
btn_layout = QHBoxLayout()
btn_layout.addWidget(self.add_btn)
btn_layout.addWidget(self.add_close_btn)
btn_layout.addWidget(self.close_btn)
# Main layout
layout = QVBoxLayout()
layout.addLayout(title_layout)
layout.addWidget(self.track_list)
layout.addLayout(note_layout)
layout.addLayout(btn_layout)
self.setLayout(layout)
self.resize(600, 400)
def update_list(self, text: str):
self.track_list.clear()
if text.strip() == "":
# Do not search or populate list if input is empty
return
tracks = search_titles(text)
for track in tracks:
duration_str = format_duration(track.duration)
last_played_str = friendly_last_played(track.lastplayed)
item_str = f"{track.title} - {track.artist} [{duration_str}] {last_played_str}"
item = QListWidgetItem(item_str)
item.setData(Qt.ItemDataRole.UserRole, track.track_id)
self.track_list.addItem(item)
def get_selected_track_id(self) -> int | None:
selected_items = self.track_list.selectedItems()
if selected_items:
return selected_items[0].data(Qt.ItemDataRole.UserRole)
return None
def add_clicked(self):
track_id = self.get_selected_track_id()
if track_id is not None:
note_text = self.note_edit.text()
self.signal_insert_track.emit(track_id, note_text)
self.title_edit.clear()
self.note_edit.clear()
self.track_list.clear()
self.title_edit.setFocus()
def add_and_close_clicked(self):
track_id = self.get_selected_track_id()
if track_id is not None:
note_text = self.note_edit.text()
self.signal_insert_track.emit(track_id, note_text)
self.accept()
# Test harness (for quick testing)
if __name__ == "__main__":
app = QApplication(sys.argv)
dialog = TrackInsertDialog()
def print_inserted(track_id, note):
print(f"Inserted track ID: {track_id}, Note: '{note}'")
dialog.signal_insert_track.connect(print_inserted)
dialog.exec()

View File

@ -1,78 +0,0 @@
# Standard library imports
import unittest
# PyQt imports
# Third party imports
# App imports
from app import playlistmodel
from app import repository
from app.models import db
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
pass
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
pass
def setUp(self):
"""Runs before each test"""
db.create_all()
# Create a playlist and model
playlist_name = "my playlist"
self.playlist = repository.create_playlist(name=playlist_name, template_id=0)
assert self.playlist
self.model = playlistmodel.PlaylistModel(
self.playlist.playlist_id, is_template=False
)
assert self.model
# Create tracks
track1_path = "testdata/isa.mp3"
self.track1 = repository.create_track(track1_path)
track2_path = "testdata/mom.mp3"
self.track2 = repository.create_track(track2_path)
# Add tracks and header to playlist
self.row0 = repository.insert_row(
self.playlist.playlist_id,
row_number=0,
track_id=self.track1.track_id,
note="track 1",
)
self.row1 = repository.insert_row(
self.playlist.playlist_id,
row_number=1,
track_id=0,
note="Header row",
)
self.row2 = repository.insert_row(
self.playlist.playlist_id,
row_number=2,
track_id=self.track2.track_id,
note="track 2",
)
def tearDown(self):
"""Runs after each test"""
db.drop_all()
def test_add_track_to_header(self):
"""Add a track to a header row"""
repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id)
result = repository.get_playlist_row(self.row1.playlistrow_id)
assert result.track_id == self.track2.track_id

View File

@ -134,122 +134,6 @@ class TestMMMiscRowMove(unittest.TestCase):
def tearDown(self): def tearDown(self):
db.drop_all() db.drop_all()
def test_move_rows_test2(self):
# move row 3 to row 5
self.model.move_rows([3], 5)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [3, 4, 5]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 3:
assert self.model.playlist_rows[row].note == str(4)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
elif row == 5:
assert self.model.playlist_rows[row].note == str(5)
def test_move_rows_test3(self):
# move row 4 to row 3
self.model.move_rows([4], 3)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [3, 4]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 3:
assert self.model.playlist_rows[row].note == str(4)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
def test_move_rows_test4(self):
# move row 4 to row 2
self.model.move_rows([4], 2)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [2, 3, 4]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 2:
assert self.model.playlist_rows[row].note == str(4)
elif row == 3:
assert self.model.playlist_rows[row].note == str(2)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
def test_move_rows_test5(self):
# move rows [1, 4, 5, 10] → 8
self.model.move_rows([1, 4, 5, 10], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 2, 3, 6, 7, 1, 4, 5, 10, 8, 9]
def test_move_rows_test6(self):
# move rows [3, 6] → 5
self.model.move_rows([3, 6], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 3, 6, 5, 7, 8, 9, 10]
def test_move_rows_test7(self):
# move rows [3, 5, 6] → 8
self.model.move_rows([3, 5, 6], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 7, 3, 5, 6, 8, 9, 10]
def test_move_rows_test8(self):
# move rows [7, 8, 10] → 5
self.model.move_rows([7, 8, 10], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
def test_move_rows_test9(self):
# move rows [1, 2, 3] → 0
# Replicate issue 244
self.model.move_rows([0, 1, 2, 3], 0)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def test_insert_header_row_end(self): def test_insert_header_row_end(self):
# insert header row at end of playlist # insert header row at end of playlist

239
tests/test_repository.py Normal file
View File

@ -0,0 +1,239 @@
# Standard library imports
import unittest
# PyQt imports
# Third party imports
# App imports
from app import playlistmodel
from app import repository
from app.models import db
from classes import PlaylistDTO
from playlistmodel import PlaylistModel
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
cls.isa_path = "testdata/isa.mp3"
cls.isa_title = "I'm So Afraid"
cls.isa_artist = "Fleetwood Mac"
cls.mom_path = "testdata/mom.mp3"
cls.mom_title = "Man of Mystery"
cls.mom_artist = "The Shadows"
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
pass
def setUp(self):
"""Runs before each test"""
db.create_all()
def create_playlist_and_model(self, playlist_name: str) -> (PlaylistDTO, PlaylistModel):
# Create a playlist and model
playlist = repository.create_playlist(name=playlist_name, template_id=0)
assert playlist
model = playlistmodel.PlaylistModel(playlist.playlist_id, is_template=False)
assert model
return (playlist, model)
def create_playlist_model_tracks(self, playlist_name: str):
(playlist, model) = self.create_playlist_and_model("my playlist")
# Create tracks
self.track1 = repository.create_track(self.isa_path)
self.track2 = repository.create_track(self.mom_path)
# Add tracks and header to playlist
self.row0 = repository.insert_row(
playlist.playlist_id,
row_number=0,
track_id=self.track1.track_id,
note="track 1",
)
self.row1 = repository.insert_row(
playlist.playlist_id,
row_number=1,
track_id=0,
note="Header row",
)
self.row2 = repository.insert_row(
playlist.playlist_id,
row_number=2,
track_id=self.track2.track_id,
note="track 2",
)
def create_rows(self, playlist_name: str, number_of_rows: int) -> (PlaylistDTO, PlaylistModel):
(playlist, model) = self.create_playlist_and_model(playlist_name)
for row_number in range(number_of_rows):
repository.insert_row(
playlist.playlist_id, row_number, None, str(row_number)
)
return (playlist, model)
def tearDown(self):
"""Runs after each test"""
db.drop_all()
def test_add_track_to_header(self):
"""Add a track to a header row"""
self.create_playlist_model_tracks("my playlist")
repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id)
result = repository.get_playlist_row(self.row1.playlistrow_id)
assert result.track_id == self.track2.track_id
def test_create_track(self):
repository.create_track(self.isa_path)
results = repository.get_all_tracks()
assert len(results) == 1
assert results[0].path == self.isa_path
def test_get_track_by_id(self):
dto = repository.create_track(self.isa_path)
result = repository.track_by_id(dto.track_id)
assert result.path == self.isa_path
def test_get_track_by_artist(self):
_ = repository.create_track(self.isa_path)
_ = repository.create_track(self.mom_path)
result_isa = repository.tracks_like_artist(self.isa_artist)
assert len(result_isa) == 1
assert result_isa[0].artist == self.isa_artist
result_mom = repository.tracks_like_artist(self.mom_artist)
assert len(result_mom) == 1
assert result_mom[0].artist == self.mom_artist
def test_get_track_by_title(self):
_ = repository.create_track(self.isa_path)
_ = repository.create_track(self.mom_path)
result_isa = repository.tracks_like_title(self.isa_title)
assert len(result_isa) == 1
assert result_isa[0].title == self.isa_title
result_mom = repository.tracks_like_title(self.mom_title)
assert len(result_mom) == 1
assert result_mom[0].title == self.mom_title
def test_move_rows_test1(self):
# move row 3 to row 5
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test1", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [3], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9]
def test_move_rows_test2(self):
# move row 4 to row 3
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test2", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [4], 3)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 3, 5, 6, 7, 8, 9]
def test_move_rows_test3(self):
# move row 4 to row 2
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test3", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [4], 2)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 4, 2, 3, 5, 6, 7, 8, 9]
def test_move_rows_test4(self):
# move rows [1, 4, 5, 10] → 8
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test4", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [1, 4, 5, 10], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 2, 3, 6, 7, 8, 9, 1, 4, 5, 10]
def test_move_rows_test5(self):
# move rows [3, 6] → 5
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test5", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [3, 6], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9, 10]
def test_move_rows_test6(self):
# move rows [3, 5, 6] → 8
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [3, 5, 6], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 7, 8, 9, 10, 3, 5, 6]
def test_move_rows_test7(self):
# move rows [7, 8, 10] → 5
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [7, 8, 10], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
def test_move_rows_test8(self):
# move rows [1, 2, 3] → 0
# Replicate issue 244
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [0, 1, 2, 3], 0)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]