Compare commits

...

3 Commits

Author SHA1 Message Date
Keith Edmunds
4c1ee0b1ca WIP: all tests for move rows within playlist working 2025-03-22 20:54:04 +00:00
Keith Edmunds
bc7d6818aa WIP: move within playlist tests working 2025-03-22 18:53:14 +00:00
Keith Edmunds
0f8409879c Report correct line for ApplicationError 2025-03-22 09:27:55 +00:00
11 changed files with 722 additions and 445 deletions

View File

@ -163,6 +163,13 @@ class TrackInfo(NamedTuple):
# Classes for signals
@dataclass
class InsertRows:
playlist_id: int
from_row: int
to_row: int
@dataclass
class InsertTrack:
playlist_id: int
@ -188,6 +195,8 @@ class MusicMusterSignals(QObject):
search_wikipedia_signal = pyqtSignal(str)
show_warning_signal = pyqtSignal(str, str)
signal_add_track_to_header = pyqtSignal(int, int)
signal_begin_insert_rows = pyqtSignal(InsertRows)
signal_end_insert_rows = pyqtSignal(int)
signal_insert_track = pyqtSignal(InsertTrack)
signal_playlist_selected_rows = pyqtSignal(int, list)
signal_set_next_row = pyqtSignal(int)

View File

@ -32,6 +32,7 @@ from classes import (
MusicMusterSignals,
singleton,
Tags,
TrackDTO,
)
from config import Config
from helpers import (
@ -40,7 +41,6 @@ from helpers import (
show_OK,
)
from log import log
from models import db, Tracks
from playlistrow import TrackSequence
from playlistmodel import PlaylistModel
import helpers
@ -122,13 +122,7 @@ class FileImporter:
# Get signals
self.signals = MusicMusterSignals()
def _get_existing_tracks(self) -> Sequence[Tracks]:
"""
Return a list of all existing Tracks
"""
with db.Session() as session:
return Tracks.get_all(session)
self.existing_tracks: list[TrackDTO] = []
def start(self) -> None:
"""
@ -148,7 +142,7 @@ class FileImporter:
# Refresh list of existing tracks as they may have been updated
# by previous imports
self.existing_tracks = self._get_existing_tracks()
self.existing_tracks = repository.get_all_tracks()
for infile in [
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)

View File

@ -79,9 +79,22 @@ log = logging.getLogger(Config.LOG_NAME)
def handle_exception(exc_type, exc_value, exc_traceback):
error = str(exc_value)
"""
Inform user of exception
"""
# Navigate to the inner stack frame
tb = exc_traceback
while tb.tb_next:
tb = tb.tb_next
fname = os.path.basename(tb.tb_frame.f_code.co_filename)
lineno = tb.tb_lineno
msg = f"ApplicationError: {exc_value}\nat {fname}:{lineno}"
logmsg = f"ApplicationError: {exc_value} at {fname}:{lineno}"
if issubclass(exc_type, ApplicationError):
log.error(error)
log.error(logmsg)
else:
# Handle unexpected errors (log and display)
error_msg = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
@ -104,7 +117,6 @@ def handle_exception(exc_type, exc_value, exc_traceback):
)
if QApplication.instance() is not None:
fname = os.path.split(exc_traceback.tb_frame.f_code.co_filename)[1]
msg = f"ApplicationError: {error}\nat {fname}:{exc_traceback.tb_lineno}"
QMessageBox.critical(None, "Application Error", msg)

View File

@ -438,24 +438,6 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
)
)
@staticmethod
def fixup_rownumbers(session: Session, playlist_id: int) -> None:
"""
Ensure the row numbers for passed playlist have no gaps
"""
plrs = session.scalars(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
).all()
for i, plr in enumerate(plrs):
plr.row_number = i
# Ensure new row numbers are available to the caller
session.commit()
@classmethod
def plrids_to_plrs(
cls, session: Session, playlist_id: int, plr_ids: list[int]

View File

@ -1180,7 +1180,7 @@ class Window(QMainWindow):
self.footer_section.widgetFadeVolume.setDefaultPadding(0)
self.footer_section.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND)
self.move_source_rows: Optional[list[int]] = None
self.move_source_rows: list[PlaylistRow] = []
self.move_source_model: Optional[PlaylistModel] = None
self.disable_selection_timing = False
@ -2030,7 +2030,7 @@ class Window(QMainWindow):
# Save the selected PlaylistRows items ready for a later
# paste
self.move_source_rows = self.current.selected_row_numbers
self.move_source_rows = self.current.base_model.selected_rows
self.move_source_model = self.current.base_model
log.debug(
@ -2665,6 +2665,8 @@ class Window(QMainWindow):
Update track clocks.
"""
self.timer1000.stop()
raise ApplicationError("test")
# If track is playing, update track clocks time and colours
if self.track_sequence.current and self.track_sequence.current.is_playing():
# Elapsed time

View File

@ -34,6 +34,7 @@ import obswebsocket # type: ignore
from classes import (
ApplicationError,
Col,
InsertRows,
MusicMusterSignals,
)
from config import Config
@ -94,16 +95,14 @@ class PlaylistModel(QAbstractTableModel):
self.signals.begin_reset_model_signal.connect(self.begin_reset_model)
self.signals.end_reset_model_signal.connect(self.end_reset_model)
self.signals.signal_add_track_to_header.connect(self.add_track_to_header)
self.signals.signal_begin_insert_rows.connect(self.begin_insert_rows)
self.signals.signal_end_insert_rows.connect(self.end_insert_rows)
self.signals.signal_playlist_selected_rows.connect(self.set_selected_rows)
self.signals.signal_set_next_row.connect(self.set_next_row)
with db.Session() as session:
# Ensure row numbers in playlist are contiguous
# TODO: remove this
PlaylistRows.fixup_rownumbers(session, playlist_id)
# Populate self.playlist_rows
self.load_data()
for dto in repository.get_playlist_rows(self.playlist_id):
self.playlist_rows[dto.row_number] = PlaylistRow(dto)
self.update_track_times()
def __repr__(self) -> str:
@ -389,25 +388,18 @@ class PlaylistModel(QAbstractTableModel):
Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows
calls. To keep it simple, if inefficient, delete rows one by one.
TODO: delete in blocks
Delete from highest row back so that not yet deleted row numbers don't change.
"""
with db.Session() as session:
for row_number in sorted(row_numbers, reverse=True):
log.debug(f"{self}: delete_rows(), {row_number=}")
super().beginRemoveRows(QModelIndex(), row_number, row_number)
# We need to remove data from the underlying data store,
# which is the database, but we cache in
# self.playlist_rows, which is what calls to data()
# reads, so fixup that too.
PlaylistRows.delete_row(session, self.playlist_id, row_number)
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
self.refresh_data(session)
session.commit()
super().endRemoveRows()
for row_group in self._reversed_contiguous_row_groups(row_numbers):
# Signal that rows will be removed
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
# Remove rows from data store
repository.remove_rows(self.playlist_id, row_group)
# Signal that data store has been updated
super().endRemoveRows()
self.refresh_data()
self.track_sequence.update()
self.update_track_times()
@ -825,39 +817,6 @@ class PlaylistModel(QAbstractTableModel):
return None
def load_data(self) -> None:
"""
Same as refresh data, but only used when creating playslit.
Distinguishes profile time between initial load and other
refreshes.
"""
# We used to clear self.playlist_rows each time but that's
# expensive and slow on big playlists
# # Note where each playlist_id is
# plid_to_row: dict[int, int] = {}
# for oldrow in self.playlist_rows:
# plrdata = self.playlist_rows[oldrow]
# plid_to_row[plrdata.playlistrow_id] = plrdata.row_number
# build a new playlist_rows
# new_playlist_rows: dict[int, RowAndTrack] = {}
# for p in PlaylistRows.get_playlist_rows(session, self.playlist_id):
# if p.id not in plid_to_row:
# new_playlist_rows[p.row_number] = RowAndTrack(p)
# else:
# new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
# new_playlist_rows[p.row_number].row_number = p.row_number
# build a new playlist_rows
# shouldn't be PlaylistRow
new_playlist_rows: dict[int, PlaylistRow] = {}
for dto in repository.get_playlist_rows(self.playlist_id):
new_playlist_rows[dto.row_number] = PlaylistRow(dto)
# Copy to self.playlist_rows
self.playlist_rows = new_playlist_rows
def mark_unplayed(self, row_numbers: list[int]) -> None:
"""
Mark row as unplayed
@ -874,78 +833,81 @@ class PlaylistModel(QAbstractTableModel):
]
self.invalidate_rows(row_numbers, roles)
def move_rows(self, from_rows: list[int], to_row_number: int) -> None:
def move_rows(self, from_rows: list[PlaylistRow], to_row_number: int) -> None:
"""
Move the playlist rows given to to_row and below.
"""
log.debug(f"{self}: move_rows({from_rows=}, {to_row_number=}")
# Build a {current_row_number: new_row_number} dictionary
row_map: dict[int, int] = {}
# Don't move current row
if self.track_sequence.current:
current_row = self.track_sequence.current.row_number
if current_row in from_rows:
log.debug(
"move_rows: Removing {current_row=} from {from_rows=}"
)
from_rows.remove(self.track_sequence.current.row_number)
# The destination row number will need to be reduced by the
# number of rows being move from above the destination row
# otherwise rows below the destination row will end up above the
# moved rows.
adjusted_to_row = to_row_number - len(
[a for a in from_rows if a < to_row_number]
)
# Row moves must be wrapped in beginMoveRows .. endMoveRows and
# the row range must be contiguous. Process the highest rows
# first so the lower row numbers are unchanged
# Put the from_row row numbers into the row_map. Ultimately the
# total number of elements in the playlist doesn't change, so
# check that adding the moved rows starting at to_row won't
# overshoot the end of the playlist.
if adjusted_to_row + len(from_rows) > len(self.playlist_rows):
next_to_row = len(self.playlist_rows) - len(from_rows)
else:
next_to_row = adjusted_to_row
row_groups = self._reversed_contiguous_row_groups([a.row_number for a in from_rows])
# zip iterates from_row and to_row simultaneously from the
# respective sequences inside zip()
for from_row, to_row in zip(
from_rows, range(next_to_row, next_to_row + len(from_rows))
):
row_map[from_row] = to_row
# Move the remaining rows to the row_map. We want to fill it
# before (if there are gaps) and after (likewise) the rows that
# are moving.
# zip iterates old_row and new_row simultaneously from the
# respective sequences inside zip()
for old_row, new_row in zip(
[x for x in self.playlist_rows.keys() if x not in from_rows],
[y for y in range(len(self.playlist_rows)) if y not in row_map.values()],
):
# Optimise: only add to map if there is a change
if old_row != new_row:
row_map[old_row] = new_row
# For SQLAlchemy, build a list of dictionaries that map playlistrow_id to
# new row number:
sqla_map: list[dict[str, int]] = []
for oldrow, newrow in row_map.items():
playlistrow_id = self.playlist_rows[oldrow].playlistrow_id
sqla_map.append({"playlistrow_id": playlistrow_id, "row_number": newrow})
with db.Session() as session:
PlaylistRows.update_plr_row_numbers(session, self.playlist_id, sqla_map)
session.commit()
# Update playlist_rows
self.refresh_data(session)
# Handle the moves in row_group chunks
for row_group in row_groups:
# Tell model we will be moving rows
# See https://doc.qt.io/qt-6/qabstractitemmodel.html#beginMoveRows
# for how destination is calculated
destination = to_row_number
if to_row_number > max(row_group):
destination = to_row_number - max(row_group) + 1
super().beginMoveRows(QModelIndex(),
min(row_group),
max(row_group),
QModelIndex(),
destination
)
# Update database
repository.move_rows_within_playlist(self.playlist_id, row_group, to_row_number)
# Tell model we have finished moving rows
super().endMoveRows()
# Update display
self.refresh_data()
self.track_sequence.update()
self.update_track_times()
# only invalidate required roles
roles = [
Qt.ItemDataRole.DisplayRole,
]
self.invalidate_rows(list(row_map.keys()), roles)
# TODO: do we need this?
# # only invalidate required roles
# roles = [
# Qt.ItemDataRole.DisplayRole,
# ]
# self.invalidate_rows(list(row_map.keys()), roles)
def begin_insert_rows(self, insert_rows: InsertRows) -> None:
"""
Prepare model to insert rows
"""
if insert_rows.playlist_id != self.playlist_id:
return
super().beginInsertRows(QModelIndex(), insert_rows.from_row, insert_rows.to_row)
def end_insert_rows(self, playlist_id: int) -> None:
"""
End insert rows
"""
if playlist_id != self.playlist_id:
return
super().endInsertRows()
def move_rows_between_playlists(
self,
from_rows: list[int],
from_rows: list[PlaylistRow],
to_row_number: int,
to_playlist_id: int,
) -> None:
@ -958,56 +920,46 @@ class PlaylistModel(QAbstractTableModel):
f"{to_row_number=}, {to_playlist_id=}"
)
# Row removal must be wrapped in beginRemoveRows ..
# endRemoveRows and the row range must be contiguous. Process
# the highest rows first so the lower row numbers are unchanged
row_groups = self._reversed_contiguous_row_groups(from_rows)
# Prepare destination playlist for a reset
self.signals.begin_reset_model_signal.emit(to_playlist_id)
with db.Session() as session:
for row_group in row_groups:
# Make room in destination playlist
max_destination_row_number = PlaylistRows.get_last_used_row(
session, to_playlist_id
# Don't move current row
if self.track_sequence.current:
current_row = self.track_sequence.current.row_number
if current_row in from_rows:
log.debug(
"move_rows_between_playlists: Removing {current_row=} from {from_rows=}"
)
if (
max_destination_row_number
and to_row_number <= max_destination_row_number
):
# Move the destination playlist rows down to make room.
PlaylistRows.move_rows_down(
session, to_playlist_id, to_row_number, len(row_group)
)
next_to_row = to_row_number
from_rows.remove(self.track_sequence.current.row_number)
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
for playlist_row in PlaylistRows.plrids_to_plrs(
session,
self.playlist_id,
[self.playlist_rows[a].playlistrow_id for a in row_group],
):
if (
self.track_sequence.current
and playlist_row.id == self.track_sequence.current.playlistrow_id
):
# Don't move current track
continue
playlist_row.playlist_id = to_playlist_id
playlist_row.row_number = next_to_row
next_to_row += 1
self.refresh_data(session)
super().endRemoveRows()
# We need to remove gaps in row numbers after tracks have
# moved.
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
self.refresh_data(session)
session.commit()
# Row removal must be wrapped in beginRemoveRows .. endRemoveRows
# and the row range must be contiguous. Process the highest rows
# first so the lower row numbers are unchanged
# Reset of model must come after session has been closed
row_groups = self._reversed_contiguous_row_groups([a.row_number for a in from_rows])
# Handle the moves in row_group chunks
# TODO: use bool QAbstractItemModel::beginMoveRows(const
# QModelIndex &sourceParent, int sourceFirst, int sourceLast,
# const QModelIndex &destinationParent, int destinationChild)
for row_group in row_groups:
# Prepare source model
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
# Prepare destination model
insert_rows = InsertRows(to_playlist_id,
to_row_number,
to_row_number + len(row_group)
)
self.signals.signal_begin_insert_rows.emit(insert_rows)
repository.move_rows_to_playlist(from_rows=row_group,
from_playlist_id=self.playlist_id,
to_row=to_row_number,
to_playlist_id=to_playlist_id
)
self.signals.signal_end_insert_rows.emit(to_playlist_id)
super().endRemoveRows()
self.refresh_data()
self.track_sequence.update()
self.signals.end_reset_model_signal.emit(to_playlist_id)
self.update_track_times()
def move_track_add_note(
@ -1101,16 +1053,9 @@ class PlaylistModel(QAbstractTableModel):
]
self.invalidate_row(self.track_sequence.previous.row_number, roles)
def refresh_data(self, session: Session) -> None:
def refresh_data(self) -> None:
"""
Populate self.playlist_rows with playlist data
We used to clear self.playlist_rows each time but that's
expensive and slow on big playlists. Instead we track where rows
are in database versus self.playlist_rows and fixup the latter.
This works well for news rows added and for rows moved, but
doesn't work for changed comments so they must be handled using
refresh_row().
"""
# Note where each playlist_id is by mapping each playlistrow_id
@ -1216,9 +1161,7 @@ class PlaylistModel(QAbstractTableModel):
]
self.invalidate_rows(row_numbers, roles)
def _reversed_contiguous_row_groups(
self, row_numbers: list[int]
) -> list[list[int]]:
def _reversed_contiguous_row_groups(self, row_numbers: list[int]) -> list[list[int]]:
"""
Take the list of row numbers and split into groups of contiguous rows. Return as a list
of lists with the highest row numbers first.
@ -1233,6 +1176,7 @@ class PlaylistModel(QAbstractTableModel):
result: list[list[int]] = []
temp: list[int] = []
last_value = row_numbers[0] - 1
row_numbers.sort()
for idx in range(len(row_numbers)):
if row_numbers[idx] != last_value + 1:

View File

@ -5,13 +5,14 @@ import re
# Third party imports
from sqlalchemy import (
delete,
func,
select,
update,
)
from sqlalchemy.orm import aliased
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.elements import BinaryExpression
from sqlalchemy.sql.elements import BinaryExpression, ColumnElement
from classes import ApplicationError, PlaylistRowDTO
# App imports
@ -113,6 +114,12 @@ def create_track(path: str) -> TrackDTO:
return new_track
def get_all_tracks() -> list[TrackDTO]:
"""Return a list of all tracks"""
return _tracks_where(Tracks.id > 0)
def track_by_id(track_id: int) -> TrackDTO | None:
"""
Return track with specified id
@ -171,7 +178,7 @@ def track_by_id(track_id: int) -> TrackDTO | None:
return dto
def _tracks_like(where: BinaryExpression) -> list[TrackDTO]:
def _tracks_where(where: BinaryExpression | ColumnElement[bool]) -> list[TrackDTO]:
"""
Return tracks selected by where
"""
@ -235,7 +242,7 @@ def tracks_like_artist(filter_str: str) -> list[TrackDTO]:
Return tracks where artist is like filter
"""
return _tracks_like(Tracks.artist.ilike(f"%{filter_str}%"))
return _tracks_where(Tracks.artist.ilike(f"%{filter_str}%"))
def tracks_like_title(filter_str: str) -> list[TrackDTO]:
@ -243,54 +250,16 @@ def tracks_like_title(filter_str: str) -> list[TrackDTO]:
Return tracks where title is like filter
"""
return _tracks_like(Tracks.title.ilike(f"%{filter_str}%"))
return _tracks_where(Tracks.title.ilike(f"%{filter_str}%"))
# Playlist functions
def _check_row_number_sequence(
session: Session, playlist_id: int, fix: bool = False
) -> None:
"""
The row numbers for any playlist should run from 0 to (length - 1).
This function checks that that is the case.
If there are errors, 'fix' determines what action is taken.
If fix == True:
Fix the row numbers and save to database. Log at info level.
If fix == False:
Log at error level and raise ApplicationError
"""
errors = False
playlist_rows = session.scalars(
select(PlaylistRows)
.where(PlaylistRows.playlist_id == playlist_id)
.order_by(PlaylistRows.row_number)
).all()
for idx, playlist_row in enumerate(playlist_rows):
if playlist_row.row_number != idx:
errors = True
msg = f"_check_row_number_sequence({playlist_id=}, {fix=}, {playlist_row=}, {idx=}"
if fix:
log.info(msg)
playlist_row.row_number = idx
else:
log.error(msg)
raise ApplicationError(msg)
if errors:
session.commit()
def _move_rows_down(
def _move_rows(
session: Session, playlist_id: int, starting_row: int, move_by: int
) -> None:
"""
Create space to insert move_by additional rows by incremented row
number from starting_row to end of playlist
Move rows from starting_row by move_by. If move_by is +ve, move rows
down; if -ve, move them up.
"""
log.debug(f"(_move_rows_down({playlist_id=}, {starting_row=}, {move_by=}")
@ -306,6 +275,111 @@ def _move_rows_down(
session.commit()
def move_rows_to_playlist(
from_rows: list[int], from_playlist_id: int, to_row: int, to_playlist_id: int
) -> None:
"""
Move rows between playlists.
"""
with db.Session() as session:
# Prepare desination playlist
# Find last used row
last_row = session.execute(
select(func.max(PlaylistRows.row_number)).where(
PlaylistRows.playlist_id == to_playlist_id
)
).scalar_one()
if last_row is None:
last_row = -1
# Make room in destination
if to_row <= last_row:
_move_rows(session, to_playlist_id, to_row, len(from_rows))
# Move rows
row_offset = to_row - min(from_rows)
stmt = (
update(PlaylistRows)
.where(
PlaylistRows.playlist_id == from_playlist_id,
PlaylistRows.row_number.in_(from_rows)
)
.values(
playlist_id=to_playlist_id,
row_number=PlaylistRows.row_number + row_offset
)
)
session.execute(stmt)
# Remove gaps in source
_move_rows(session=session,
playlist_id=from_playlist_id,
starting_row=max(from_rows) + 1,
move_by=(len(from_rows) * -1)
)
# Commit changes
session.commit()
# Sanity check
_check_playlist_integrity(session, get_playlist_rows(from_playlist_id), fix=False)
_check_playlist_integrity(session, get_playlist_rows(to_playlist_id), fix=False)
def move_rows_within_playlist(playlist_id: int, from_rows: list[int], to_row: int) -> None:
"""
Move rows within a playlist.
"""
log.debug(f"move_rows_within_playlist({playlist_id=}, {from_rows=}, {to_row=})")
playlistrows_dto = get_playlist_rows(playlist_id)
new_order: dict[int, int | None] = dict.fromkeys(range(len(playlistrows_dto)))
# The destination row number will need to be reduced by the
# number of rows being move from above the destination row
# otherwise rows below the destination row will end up above the
# moved rows.
# next_row = to_row - len([a for a in from_rows if a < to_row])
# Need to ensure the moved rows won't overrun the total number of
# rows
next_row = to_row
if next_row + len(from_rows) > len(playlistrows_dto):
next_row = len(playlistrows_dto) - len(from_rows)
# Populate new_order with moved rows
# # We need to keep, where possible, the rows after to_row unmoved
# if to_row + len(from_rows) > len(playlistrows_dto):
# next_row = max(to_row - len(from_rows) - len([a for a in from_rows if a < to_row]) + 1, 0)
for from_row in from_rows:
new_order[next_row] = from_row
next_row += 1
# Move remaining rows
remaining_rows = set(new_order.keys()) - set(from_rows)
next_row = 0
for row in remaining_rows:
while new_order[next_row] is not None:
next_row += 1
new_order[next_row] = row
next_row += 1
# Sanity check
if None in new_order:
raise ApplicationError(f"None remains after move: {new_order=}")
# Update database
# Build a list of dicts of (id: value, row_number: value}
update_list = []
for new_row_number, old_row_number in new_order.items():
plrid = [a.playlistrow_id for a in playlistrows_dto if a.row_number == old_row_number][0]
update_list.append(dict(id=plrid, row_number=new_row_number))
# Update rows
with db.Session() as session:
session.execute(update(PlaylistRows), update_list)
session.commit()
# Sanity check
_check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False)
def create_playlist(name: str, template_id: int) -> PlaylistDTO:
"""
Create playlist and return DTO.
@ -417,6 +491,27 @@ def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None:
return dto
def _check_playlist_integrity(
session: Session, playlist_rows: list[PlaylistRowDTO], fix: bool = False
) -> None:
"""
Ensure the row numbers are contiguous. Fix and log if fix==True,
else raise ApplicationError.
"""
for idx, plr in enumerate(playlist_rows):
if plr.row_number == idx:
continue
msg = f"_check_playlist_integrity: incorrect row number ({plr.playlistrow_id=}, {idx=})"
if fix:
log.debug(msg)
plr.row_number = idx
session.commit()
else:
raise ApplicationError(msg)
def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]:
# Alias PlaydatesTable for subquery
LatestPlaydate = aliased(Playdates)
@ -458,6 +553,9 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]:
with db.Session() as session:
results = session.execute(stmt).all()
# Sanity check
# TODO: would be good to be confident at removing this
_check_playlist_integrity(session=session, playlist_rows=results, fix=False)
dto_list = []
for row in results:
@ -516,10 +614,10 @@ def insert_row(
with db.Session() as session:
# Sanity check
_check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False)
_check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False)
# Make space for new row
_move_rows_down(
_move_rows(
session=session, playlist_id=playlist_id, starting_row=row_number, move_by=1
)
@ -534,7 +632,7 @@ def insert_row(
playlist_row_id = playlist_row.id
# Sanity check
_check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False)
_check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False)
new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id)
if not new_playlist_row:
@ -543,6 +641,29 @@ def insert_row(
return new_playlist_row
def remove_rows(playlist_id: int, row_numbers: list[int]) -> None:
"""
Remove rows from playlist
Delete from highest row back so that not yet deleted row numbers don't change.
"""
log.debug(f"remove_rows({playlist_id=}, {row_numbers=}")
with db.Session() as session:
for row_number in sorted(row_numbers, reverse=True):
session.execute(
delete(PlaylistRows).where(
PlaylistRows.playlist_id == playlist_id,
PlaylistRows.row_number == row_number,
)
)
# Fixup row number to remove gaps
_check_playlist_integrity(session, playlist_id, fix=True)
session.commit()
def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
"""
Return playlist with specified id
@ -579,7 +700,9 @@ def get_setting(name: str) -> int | None:
"""
with db.Session() as session:
record = session.execute(select(Settings).where(Settings.name == name)).one_or_none()
record = session.execute(
select(Settings).where(Settings.name == name)
).one_or_none()
if not record:
return None
@ -592,12 +715,12 @@ def set_setting(name: str, value: int) -> None:
"""
with db.Session() as session:
record = session.execute(select(Settings).where(Settings.name == name)).one_or_none()
record = session.execute(
select(Settings).where(Settings.name == name)
).one_or_none()
if not record:
record = Settings(session=session, name=name)
if not record:
raise ApplicationError("Can't create Settings record")
record.f_int = value
session.commit()

166
kae.py Executable file
View File

@ -0,0 +1,166 @@
#!/usr/bin/env python3
import sys
import datetime as dt
from dataclasses import dataclass
from PyQt6.QtWidgets import (
QApplication, QDialog, QLabel, QLineEdit, QListWidget,
QVBoxLayout, QHBoxLayout, QPushButton, QListWidgetItem
)
from PyQt6.QtCore import Qt, pyqtSignal
@dataclass
class TrackDTO:
track_id: int
artist: str
bitrate: int
duration: int # milliseconds
fade_at: int
intro: int | None
path: str
silence_at: int
start_gap: int
title: str
lastplayed: dt.datetime | None
# Placeholder external function to simulate search
def search_titles(query: str) -> list[TrackDTO]:
now = dt.datetime.now()
dummy_tracks = [
TrackDTO(1, "Artist A", 320, 210000, 0, None, "", 0, 0, "Title One", now - dt.timedelta(days=2)),
TrackDTO(2, "Artist B", 256, 185000, 0, None, "", 0, 0, "Another Title", now - dt.timedelta(days=30)),
TrackDTO(3, "Artist C", 320, 240000, 0, None, "", 0, 0, "More Music", None),
]
return [t for t in dummy_tracks if query.lower() in t.title.lower()]
def format_duration(ms: int) -> str:
minutes, seconds = divmod(ms // 1000, 60)
return f"{minutes}:{seconds:02d}"
def friendly_last_played(lastplayed: dt.datetime | None) -> str:
if lastplayed is None:
return "(Never)"
now = dt.datetime.now()
delta = now - lastplayed
days = delta.days
if days == 0:
return "(Today)"
elif days == 1:
return "(Yesterday)"
years, days_remain = divmod(days, 365)
months, days_final = divmod(days_remain, 30)
parts = []
if years:
parts.append(f"{years}y")
if months:
parts.append(f"{months}m")
if days_final:
parts.append(f"{days_final}d")
formatted = " ".join(parts)
return f"({formatted} ago)"
class TrackInsertDialog(QDialog):
signal_insert_track = pyqtSignal(int, str)
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("Insert Track")
# Title input on one line
self.title_label = QLabel("Title:")
self.title_edit = QLineEdit()
self.title_edit.textChanged.connect(self.update_list)
title_layout = QHBoxLayout()
title_layout.addWidget(self.title_label)
title_layout.addWidget(self.title_edit)
# Track list
self.track_list = QListWidget()
# Note input on one line
self.note_label = QLabel("Note:")
self.note_edit = QLineEdit()
note_layout = QHBoxLayout()
note_layout.addWidget(self.note_label)
note_layout.addWidget(self.note_edit)
# Buttons
self.add_btn = QPushButton("Add")
self.add_close_btn = QPushButton("Add and close")
self.close_btn = QPushButton("Close")
self.add_btn.clicked.connect(self.add_clicked)
self.add_close_btn.clicked.connect(self.add_and_close_clicked)
self.close_btn.clicked.connect(self.close)
btn_layout = QHBoxLayout()
btn_layout.addWidget(self.add_btn)
btn_layout.addWidget(self.add_close_btn)
btn_layout.addWidget(self.close_btn)
# Main layout
layout = QVBoxLayout()
layout.addLayout(title_layout)
layout.addWidget(self.track_list)
layout.addLayout(note_layout)
layout.addLayout(btn_layout)
self.setLayout(layout)
self.resize(600, 400)
def update_list(self, text: str):
self.track_list.clear()
if text.strip() == "":
# Do not search or populate list if input is empty
return
tracks = search_titles(text)
for track in tracks:
duration_str = format_duration(track.duration)
last_played_str = friendly_last_played(track.lastplayed)
item_str = f"{track.title} - {track.artist} [{duration_str}] {last_played_str}"
item = QListWidgetItem(item_str)
item.setData(Qt.ItemDataRole.UserRole, track.track_id)
self.track_list.addItem(item)
def get_selected_track_id(self) -> int | None:
selected_items = self.track_list.selectedItems()
if selected_items:
return selected_items[0].data(Qt.ItemDataRole.UserRole)
return None
def add_clicked(self):
track_id = self.get_selected_track_id()
if track_id is not None:
note_text = self.note_edit.text()
self.signal_insert_track.emit(track_id, note_text)
self.title_edit.clear()
self.note_edit.clear()
self.track_list.clear()
self.title_edit.setFocus()
def add_and_close_clicked(self):
track_id = self.get_selected_track_id()
if track_id is not None:
note_text = self.note_edit.text()
self.signal_insert_track.emit(track_id, note_text)
self.accept()
# Test harness (for quick testing)
if __name__ == "__main__":
app = QApplication(sys.argv)
dialog = TrackInsertDialog()
def print_inserted(track_id, note):
print(f"Inserted track ID: {track_id}, Note: '{note}'")
dialog.signal_insert_track.connect(print_inserted)
dialog.exec()

View File

@ -1,78 +0,0 @@
# Standard library imports
import unittest
# PyQt imports
# Third party imports
# App imports
from app import playlistmodel
from app import repository
from app.models import db
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
pass
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
pass
def setUp(self):
"""Runs before each test"""
db.create_all()
# Create a playlist and model
playlist_name = "my playlist"
self.playlist = repository.create_playlist(name=playlist_name, template_id=0)
assert self.playlist
self.model = playlistmodel.PlaylistModel(
self.playlist.playlist_id, is_template=False
)
assert self.model
# Create tracks
track1_path = "testdata/isa.mp3"
self.track1 = repository.create_track(track1_path)
track2_path = "testdata/mom.mp3"
self.track2 = repository.create_track(track2_path)
# Add tracks and header to playlist
self.row0 = repository.insert_row(
self.playlist.playlist_id,
row_number=0,
track_id=self.track1.track_id,
note="track 1",
)
self.row1 = repository.insert_row(
self.playlist.playlist_id,
row_number=1,
track_id=0,
note="Header row",
)
self.row2 = repository.insert_row(
self.playlist.playlist_id,
row_number=2,
track_id=self.track2.track_id,
note="track 2",
)
def tearDown(self):
"""Runs after each test"""
db.drop_all()
def test_add_track_to_header(self):
"""Add a track to a header row"""
repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id)
result = repository.get_playlist_row(self.row1.playlistrow_id)
assert result.track_id == self.track2.track_id

View File

@ -134,122 +134,6 @@ class TestMMMiscRowMove(unittest.TestCase):
def tearDown(self):
db.drop_all()
def test_move_rows_test2(self):
# move row 3 to row 5
self.model.move_rows([3], 5)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [3, 4, 5]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 3:
assert self.model.playlist_rows[row].note == str(4)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
elif row == 5:
assert self.model.playlist_rows[row].note == str(5)
def test_move_rows_test3(self):
# move row 4 to row 3
self.model.move_rows([4], 3)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [3, 4]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 3:
assert self.model.playlist_rows[row].note == str(4)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
def test_move_rows_test4(self):
# move row 4 to row 2
self.model.move_rows([4], 2)
# Check we have all rows and plr_rownums are correct
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
if row not in [2, 3, 4]:
assert self.model.playlist_rows[row].note == str(row)
elif row == 2:
assert self.model.playlist_rows[row].note == str(4)
elif row == 3:
assert self.model.playlist_rows[row].note == str(2)
elif row == 4:
assert self.model.playlist_rows[row].note == str(3)
def test_move_rows_test5(self):
# move rows [1, 4, 5, 10] → 8
self.model.move_rows([1, 4, 5, 10], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 2, 3, 6, 7, 1, 4, 5, 10, 8, 9]
def test_move_rows_test6(self):
# move rows [3, 6] → 5
self.model.move_rows([3, 6], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 3, 6, 5, 7, 8, 9, 10]
def test_move_rows_test7(self):
# move rows [3, 5, 6] → 8
self.model.move_rows([3, 5, 6], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 4, 7, 3, 5, 6, 8, 9, 10]
def test_move_rows_test8(self):
# move rows [7, 8, 10] → 5
self.model.move_rows([7, 8, 10], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
def test_move_rows_test9(self):
# move rows [1, 2, 3] → 0
# Replicate issue 244
self.model.move_rows([0, 1, 2, 3], 0)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in range(self.model.rowCount()):
assert row in self.model.playlist_rows
assert self.model.playlist_rows[row].row_number == row
new_order.append(int(self.model.playlist_rows[row].note))
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
def test_insert_header_row_end(self):
# insert header row at end of playlist

239
tests/test_repository.py Normal file
View File

@ -0,0 +1,239 @@
# Standard library imports
import unittest
# PyQt imports
# Third party imports
# App imports
from app import playlistmodel
from app import repository
from app.models import db
from classes import PlaylistDTO
from playlistmodel import PlaylistModel
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
cls.isa_path = "testdata/isa.mp3"
cls.isa_title = "I'm So Afraid"
cls.isa_artist = "Fleetwood Mac"
cls.mom_path = "testdata/mom.mp3"
cls.mom_title = "Man of Mystery"
cls.mom_artist = "The Shadows"
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
pass
def setUp(self):
"""Runs before each test"""
db.create_all()
def create_playlist_and_model(self, playlist_name: str) -> (PlaylistDTO, PlaylistModel):
# Create a playlist and model
playlist = repository.create_playlist(name=playlist_name, template_id=0)
assert playlist
model = playlistmodel.PlaylistModel(playlist.playlist_id, is_template=False)
assert model
return (playlist, model)
def create_playlist_model_tracks(self, playlist_name: str):
(playlist, model) = self.create_playlist_and_model("my playlist")
# Create tracks
self.track1 = repository.create_track(self.isa_path)
self.track2 = repository.create_track(self.mom_path)
# Add tracks and header to playlist
self.row0 = repository.insert_row(
playlist.playlist_id,
row_number=0,
track_id=self.track1.track_id,
note="track 1",
)
self.row1 = repository.insert_row(
playlist.playlist_id,
row_number=1,
track_id=0,
note="Header row",
)
self.row2 = repository.insert_row(
playlist.playlist_id,
row_number=2,
track_id=self.track2.track_id,
note="track 2",
)
def create_rows(self, playlist_name: str, number_of_rows: int) -> (PlaylistDTO, PlaylistModel):
(playlist, model) = self.create_playlist_and_model(playlist_name)
for row_number in range(number_of_rows):
repository.insert_row(
playlist.playlist_id, row_number, None, str(row_number)
)
return (playlist, model)
def tearDown(self):
"""Runs after each test"""
db.drop_all()
def test_add_track_to_header(self):
"""Add a track to a header row"""
self.create_playlist_model_tracks("my playlist")
repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id)
result = repository.get_playlist_row(self.row1.playlistrow_id)
assert result.track_id == self.track2.track_id
def test_create_track(self):
repository.create_track(self.isa_path)
results = repository.get_all_tracks()
assert len(results) == 1
assert results[0].path == self.isa_path
def test_get_track_by_id(self):
dto = repository.create_track(self.isa_path)
result = repository.track_by_id(dto.track_id)
assert result.path == self.isa_path
def test_get_track_by_artist(self):
_ = repository.create_track(self.isa_path)
_ = repository.create_track(self.mom_path)
result_isa = repository.tracks_like_artist(self.isa_artist)
assert len(result_isa) == 1
assert result_isa[0].artist == self.isa_artist
result_mom = repository.tracks_like_artist(self.mom_artist)
assert len(result_mom) == 1
assert result_mom[0].artist == self.mom_artist
def test_get_track_by_title(self):
_ = repository.create_track(self.isa_path)
_ = repository.create_track(self.mom_path)
result_isa = repository.tracks_like_title(self.isa_title)
assert len(result_isa) == 1
assert result_isa[0].title == self.isa_title
result_mom = repository.tracks_like_title(self.mom_title)
assert len(result_mom) == 1
assert result_mom[0].title == self.mom_title
def test_move_rows_test1(self):
# move row 3 to row 5
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test1", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [3], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9]
def test_move_rows_test2(self):
# move row 4 to row 3
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test2", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [4], 3)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 3, 5, 6, 7, 8, 9]
def test_move_rows_test3(self):
# move row 4 to row 2
number_of_rows = 10
(playlist, model) = self.create_rows("test_move_rows_test3", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [4], 2)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 4, 2, 3, 5, 6, 7, 8, 9]
def test_move_rows_test4(self):
# move rows [1, 4, 5, 10] → 8
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test4", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [1, 4, 5, 10], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 2, 3, 6, 7, 8, 9, 1, 4, 5, 10]
def test_move_rows_test5(self):
# move rows [3, 6] → 5
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test5", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [3, 6], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9, 10]
def test_move_rows_test6(self):
# move rows [3, 5, 6] → 8
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [3, 5, 6], 8)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 4, 7, 8, 9, 10, 3, 5, 6]
def test_move_rows_test7(self):
# move rows [7, 8, 10] → 5
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [7, 8, 10], 5)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
def test_move_rows_test8(self):
# move rows [1, 2, 3] → 0
# Replicate issue 244
number_of_rows = 11
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
repository.move_rows_within_playlist(playlist.playlist_id, [0, 1, 2, 3], 0)
# Check we have all rows and plr_rownums are correct
new_order = []
for row in repository.get_playlist_rows(playlist.playlist_id):
new_order.append(int(row.note))
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]