Compare commits
3 Commits
a95aa918b1
...
4c1ee0b1ca
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c1ee0b1ca | ||
|
|
bc7d6818aa | ||
|
|
0f8409879c |
@ -163,6 +163,13 @@ class TrackInfo(NamedTuple):
|
||||
|
||||
|
||||
# Classes for signals
|
||||
@dataclass
|
||||
class InsertRows:
|
||||
playlist_id: int
|
||||
from_row: int
|
||||
to_row: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class InsertTrack:
|
||||
playlist_id: int
|
||||
@ -188,6 +195,8 @@ class MusicMusterSignals(QObject):
|
||||
search_wikipedia_signal = pyqtSignal(str)
|
||||
show_warning_signal = pyqtSignal(str, str)
|
||||
signal_add_track_to_header = pyqtSignal(int, int)
|
||||
signal_begin_insert_rows = pyqtSignal(InsertRows)
|
||||
signal_end_insert_rows = pyqtSignal(int)
|
||||
signal_insert_track = pyqtSignal(InsertTrack)
|
||||
signal_playlist_selected_rows = pyqtSignal(int, list)
|
||||
signal_set_next_row = pyqtSignal(int)
|
||||
|
||||
@ -32,6 +32,7 @@ from classes import (
|
||||
MusicMusterSignals,
|
||||
singleton,
|
||||
Tags,
|
||||
TrackDTO,
|
||||
)
|
||||
from config import Config
|
||||
from helpers import (
|
||||
@ -40,7 +41,6 @@ from helpers import (
|
||||
show_OK,
|
||||
)
|
||||
from log import log
|
||||
from models import db, Tracks
|
||||
from playlistrow import TrackSequence
|
||||
from playlistmodel import PlaylistModel
|
||||
import helpers
|
||||
@ -122,13 +122,7 @@ class FileImporter:
|
||||
# Get signals
|
||||
self.signals = MusicMusterSignals()
|
||||
|
||||
def _get_existing_tracks(self) -> Sequence[Tracks]:
|
||||
"""
|
||||
Return a list of all existing Tracks
|
||||
"""
|
||||
|
||||
with db.Session() as session:
|
||||
return Tracks.get_all(session)
|
||||
self.existing_tracks: list[TrackDTO] = []
|
||||
|
||||
def start(self) -> None:
|
||||
"""
|
||||
@ -148,7 +142,7 @@ class FileImporter:
|
||||
|
||||
# Refresh list of existing tracks as they may have been updated
|
||||
# by previous imports
|
||||
self.existing_tracks = self._get_existing_tracks()
|
||||
self.existing_tracks = repository.get_all_tracks()
|
||||
|
||||
for infile in [
|
||||
os.path.join(Config.REPLACE_FILES_DEFAULT_SOURCE, f)
|
||||
|
||||
18
app/log.py
18
app/log.py
@ -79,9 +79,22 @@ log = logging.getLogger(Config.LOG_NAME)
|
||||
|
||||
|
||||
def handle_exception(exc_type, exc_value, exc_traceback):
|
||||
error = str(exc_value)
|
||||
"""
|
||||
Inform user of exception
|
||||
"""
|
||||
|
||||
# Navigate to the inner stack frame
|
||||
tb = exc_traceback
|
||||
while tb.tb_next:
|
||||
tb = tb.tb_next
|
||||
|
||||
fname = os.path.basename(tb.tb_frame.f_code.co_filename)
|
||||
lineno = tb.tb_lineno
|
||||
msg = f"ApplicationError: {exc_value}\nat {fname}:{lineno}"
|
||||
logmsg = f"ApplicationError: {exc_value} at {fname}:{lineno}"
|
||||
|
||||
if issubclass(exc_type, ApplicationError):
|
||||
log.error(error)
|
||||
log.error(logmsg)
|
||||
else:
|
||||
# Handle unexpected errors (log and display)
|
||||
error_msg = "".join(traceback.format_exception(exc_type, exc_value, exc_traceback))
|
||||
@ -104,7 +117,6 @@ def handle_exception(exc_type, exc_value, exc_traceback):
|
||||
)
|
||||
if QApplication.instance() is not None:
|
||||
fname = os.path.split(exc_traceback.tb_frame.f_code.co_filename)[1]
|
||||
msg = f"ApplicationError: {error}\nat {fname}:{exc_traceback.tb_lineno}"
|
||||
QMessageBox.critical(None, "Application Error", msg)
|
||||
|
||||
|
||||
|
||||
@ -438,24 +438,6 @@ class PlaylistRows(dbtables.PlaylistRowsTable):
|
||||
)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def fixup_rownumbers(session: Session, playlist_id: int) -> None:
|
||||
"""
|
||||
Ensure the row numbers for passed playlist have no gaps
|
||||
"""
|
||||
|
||||
plrs = session.scalars(
|
||||
select(PlaylistRows)
|
||||
.where(PlaylistRows.playlist_id == playlist_id)
|
||||
.order_by(PlaylistRows.row_number)
|
||||
).all()
|
||||
|
||||
for i, plr in enumerate(plrs):
|
||||
plr.row_number = i
|
||||
|
||||
# Ensure new row numbers are available to the caller
|
||||
session.commit()
|
||||
|
||||
@classmethod
|
||||
def plrids_to_plrs(
|
||||
cls, session: Session, playlist_id: int, plr_ids: list[int]
|
||||
|
||||
@ -1180,7 +1180,7 @@ class Window(QMainWindow):
|
||||
self.footer_section.widgetFadeVolume.setDefaultPadding(0)
|
||||
self.footer_section.widgetFadeVolume.setBackground(Config.FADE_CURVE_BACKGROUND)
|
||||
|
||||
self.move_source_rows: Optional[list[int]] = None
|
||||
self.move_source_rows: list[PlaylistRow] = []
|
||||
self.move_source_model: Optional[PlaylistModel] = None
|
||||
|
||||
self.disable_selection_timing = False
|
||||
@ -2030,7 +2030,7 @@ class Window(QMainWindow):
|
||||
|
||||
# Save the selected PlaylistRows items ready for a later
|
||||
# paste
|
||||
self.move_source_rows = self.current.selected_row_numbers
|
||||
self.move_source_rows = self.current.base_model.selected_rows
|
||||
self.move_source_model = self.current.base_model
|
||||
|
||||
log.debug(
|
||||
@ -2665,6 +2665,8 @@ class Window(QMainWindow):
|
||||
Update track clocks.
|
||||
"""
|
||||
|
||||
self.timer1000.stop()
|
||||
raise ApplicationError("test")
|
||||
# If track is playing, update track clocks time and colours
|
||||
if self.track_sequence.current and self.track_sequence.current.is_playing():
|
||||
# Elapsed time
|
||||
|
||||
@ -34,6 +34,7 @@ import obswebsocket # type: ignore
|
||||
from classes import (
|
||||
ApplicationError,
|
||||
Col,
|
||||
InsertRows,
|
||||
MusicMusterSignals,
|
||||
)
|
||||
from config import Config
|
||||
@ -94,16 +95,14 @@ class PlaylistModel(QAbstractTableModel):
|
||||
self.signals.begin_reset_model_signal.connect(self.begin_reset_model)
|
||||
self.signals.end_reset_model_signal.connect(self.end_reset_model)
|
||||
self.signals.signal_add_track_to_header.connect(self.add_track_to_header)
|
||||
self.signals.signal_begin_insert_rows.connect(self.begin_insert_rows)
|
||||
self.signals.signal_end_insert_rows.connect(self.end_insert_rows)
|
||||
self.signals.signal_playlist_selected_rows.connect(self.set_selected_rows)
|
||||
self.signals.signal_set_next_row.connect(self.set_next_row)
|
||||
|
||||
with db.Session() as session:
|
||||
# Ensure row numbers in playlist are contiguous
|
||||
# TODO: remove this
|
||||
PlaylistRows.fixup_rownumbers(session, playlist_id)
|
||||
|
||||
# Populate self.playlist_rows
|
||||
self.load_data()
|
||||
for dto in repository.get_playlist_rows(self.playlist_id):
|
||||
self.playlist_rows[dto.row_number] = PlaylistRow(dto)
|
||||
self.update_track_times()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
@ -389,25 +388,18 @@ class PlaylistModel(QAbstractTableModel):
|
||||
Need to delete them in contiguous groups wrapped in beginRemoveRows / endRemoveRows
|
||||
calls. To keep it simple, if inefficient, delete rows one by one.
|
||||
|
||||
TODO: delete in blocks
|
||||
|
||||
Delete from highest row back so that not yet deleted row numbers don't change.
|
||||
"""
|
||||
|
||||
with db.Session() as session:
|
||||
for row_number in sorted(row_numbers, reverse=True):
|
||||
log.debug(f"{self}: delete_rows(), {row_number=}")
|
||||
super().beginRemoveRows(QModelIndex(), row_number, row_number)
|
||||
# We need to remove data from the underlying data store,
|
||||
# which is the database, but we cache in
|
||||
# self.playlist_rows, which is what calls to data()
|
||||
# reads, so fixup that too.
|
||||
PlaylistRows.delete_row(session, self.playlist_id, row_number)
|
||||
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
|
||||
self.refresh_data(session)
|
||||
session.commit()
|
||||
super().endRemoveRows()
|
||||
for row_group in self._reversed_contiguous_row_groups(row_numbers):
|
||||
# Signal that rows will be removed
|
||||
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
|
||||
# Remove rows from data store
|
||||
repository.remove_rows(self.playlist_id, row_group)
|
||||
# Signal that data store has been updated
|
||||
super().endRemoveRows()
|
||||
|
||||
self.refresh_data()
|
||||
self.track_sequence.update()
|
||||
self.update_track_times()
|
||||
|
||||
@ -825,39 +817,6 @@ class PlaylistModel(QAbstractTableModel):
|
||||
|
||||
return None
|
||||
|
||||
def load_data(self) -> None:
|
||||
"""
|
||||
Same as refresh data, but only used when creating playslit.
|
||||
Distinguishes profile time between initial load and other
|
||||
refreshes.
|
||||
"""
|
||||
|
||||
# We used to clear self.playlist_rows each time but that's
|
||||
# expensive and slow on big playlists
|
||||
|
||||
# # Note where each playlist_id is
|
||||
# plid_to_row: dict[int, int] = {}
|
||||
# for oldrow in self.playlist_rows:
|
||||
# plrdata = self.playlist_rows[oldrow]
|
||||
# plid_to_row[plrdata.playlistrow_id] = plrdata.row_number
|
||||
|
||||
# build a new playlist_rows
|
||||
# new_playlist_rows: dict[int, RowAndTrack] = {}
|
||||
# for p in PlaylistRows.get_playlist_rows(session, self.playlist_id):
|
||||
# if p.id not in plid_to_row:
|
||||
# new_playlist_rows[p.row_number] = RowAndTrack(p)
|
||||
# else:
|
||||
# new_playlist_rows[p.row_number] = self.playlist_rows[plid_to_row[p.id]]
|
||||
# new_playlist_rows[p.row_number].row_number = p.row_number
|
||||
# build a new playlist_rows
|
||||
# shouldn't be PlaylistRow
|
||||
new_playlist_rows: dict[int, PlaylistRow] = {}
|
||||
for dto in repository.get_playlist_rows(self.playlist_id):
|
||||
new_playlist_rows[dto.row_number] = PlaylistRow(dto)
|
||||
|
||||
# Copy to self.playlist_rows
|
||||
self.playlist_rows = new_playlist_rows
|
||||
|
||||
def mark_unplayed(self, row_numbers: list[int]) -> None:
|
||||
"""
|
||||
Mark row as unplayed
|
||||
@ -874,78 +833,81 @@ class PlaylistModel(QAbstractTableModel):
|
||||
]
|
||||
self.invalidate_rows(row_numbers, roles)
|
||||
|
||||
def move_rows(self, from_rows: list[int], to_row_number: int) -> None:
|
||||
def move_rows(self, from_rows: list[PlaylistRow], to_row_number: int) -> None:
|
||||
"""
|
||||
Move the playlist rows given to to_row and below.
|
||||
"""
|
||||
|
||||
log.debug(f"{self}: move_rows({from_rows=}, {to_row_number=}")
|
||||
|
||||
# Build a {current_row_number: new_row_number} dictionary
|
||||
row_map: dict[int, int] = {}
|
||||
# Don't move current row
|
||||
if self.track_sequence.current:
|
||||
current_row = self.track_sequence.current.row_number
|
||||
if current_row in from_rows:
|
||||
log.debug(
|
||||
"move_rows: Removing {current_row=} from {from_rows=}"
|
||||
)
|
||||
from_rows.remove(self.track_sequence.current.row_number)
|
||||
|
||||
# The destination row number will need to be reduced by the
|
||||
# number of rows being move from above the destination row
|
||||
# otherwise rows below the destination row will end up above the
|
||||
# moved rows.
|
||||
adjusted_to_row = to_row_number - len(
|
||||
[a for a in from_rows if a < to_row_number]
|
||||
)
|
||||
# Row moves must be wrapped in beginMoveRows .. endMoveRows and
|
||||
# the row range must be contiguous. Process the highest rows
|
||||
# first so the lower row numbers are unchanged
|
||||
|
||||
# Put the from_row row numbers into the row_map. Ultimately the
|
||||
# total number of elements in the playlist doesn't change, so
|
||||
# check that adding the moved rows starting at to_row won't
|
||||
# overshoot the end of the playlist.
|
||||
if adjusted_to_row + len(from_rows) > len(self.playlist_rows):
|
||||
next_to_row = len(self.playlist_rows) - len(from_rows)
|
||||
else:
|
||||
next_to_row = adjusted_to_row
|
||||
row_groups = self._reversed_contiguous_row_groups([a.row_number for a in from_rows])
|
||||
|
||||
# zip iterates from_row and to_row simultaneously from the
|
||||
# respective sequences inside zip()
|
||||
for from_row, to_row in zip(
|
||||
from_rows, range(next_to_row, next_to_row + len(from_rows))
|
||||
):
|
||||
row_map[from_row] = to_row
|
||||
|
||||
# Move the remaining rows to the row_map. We want to fill it
|
||||
# before (if there are gaps) and after (likewise) the rows that
|
||||
# are moving.
|
||||
# zip iterates old_row and new_row simultaneously from the
|
||||
# respective sequences inside zip()
|
||||
for old_row, new_row in zip(
|
||||
[x for x in self.playlist_rows.keys() if x not in from_rows],
|
||||
[y for y in range(len(self.playlist_rows)) if y not in row_map.values()],
|
||||
):
|
||||
# Optimise: only add to map if there is a change
|
||||
if old_row != new_row:
|
||||
row_map[old_row] = new_row
|
||||
|
||||
# For SQLAlchemy, build a list of dictionaries that map playlistrow_id to
|
||||
# new row number:
|
||||
sqla_map: list[dict[str, int]] = []
|
||||
for oldrow, newrow in row_map.items():
|
||||
playlistrow_id = self.playlist_rows[oldrow].playlistrow_id
|
||||
sqla_map.append({"playlistrow_id": playlistrow_id, "row_number": newrow})
|
||||
|
||||
with db.Session() as session:
|
||||
PlaylistRows.update_plr_row_numbers(session, self.playlist_id, sqla_map)
|
||||
session.commit()
|
||||
# Update playlist_rows
|
||||
self.refresh_data(session)
|
||||
# Handle the moves in row_group chunks
|
||||
for row_group in row_groups:
|
||||
# Tell model we will be moving rows
|
||||
# See https://doc.qt.io/qt-6/qabstractitemmodel.html#beginMoveRows
|
||||
# for how destination is calculated
|
||||
destination = to_row_number
|
||||
if to_row_number > max(row_group):
|
||||
destination = to_row_number - max(row_group) + 1
|
||||
super().beginMoveRows(QModelIndex(),
|
||||
min(row_group),
|
||||
max(row_group),
|
||||
QModelIndex(),
|
||||
destination
|
||||
)
|
||||
# Update database
|
||||
repository.move_rows_within_playlist(self.playlist_id, row_group, to_row_number)
|
||||
# Tell model we have finished moving rows
|
||||
super().endMoveRows()
|
||||
|
||||
# Update display
|
||||
self.refresh_data()
|
||||
self.track_sequence.update()
|
||||
self.update_track_times()
|
||||
# only invalidate required roles
|
||||
roles = [
|
||||
Qt.ItemDataRole.DisplayRole,
|
||||
]
|
||||
self.invalidate_rows(list(row_map.keys()), roles)
|
||||
# TODO: do we need this?
|
||||
# # only invalidate required roles
|
||||
# roles = [
|
||||
# Qt.ItemDataRole.DisplayRole,
|
||||
# ]
|
||||
# self.invalidate_rows(list(row_map.keys()), roles)
|
||||
|
||||
def begin_insert_rows(self, insert_rows: InsertRows) -> None:
|
||||
"""
|
||||
Prepare model to insert rows
|
||||
"""
|
||||
|
||||
if insert_rows.playlist_id != self.playlist_id:
|
||||
return
|
||||
|
||||
super().beginInsertRows(QModelIndex(), insert_rows.from_row, insert_rows.to_row)
|
||||
|
||||
def end_insert_rows(self, playlist_id: int) -> None:
|
||||
"""
|
||||
End insert rows
|
||||
"""
|
||||
|
||||
if playlist_id != self.playlist_id:
|
||||
return
|
||||
|
||||
super().endInsertRows()
|
||||
|
||||
def move_rows_between_playlists(
|
||||
self,
|
||||
from_rows: list[int],
|
||||
from_rows: list[PlaylistRow],
|
||||
to_row_number: int,
|
||||
to_playlist_id: int,
|
||||
) -> None:
|
||||
@ -958,56 +920,46 @@ class PlaylistModel(QAbstractTableModel):
|
||||
f"{to_row_number=}, {to_playlist_id=}"
|
||||
)
|
||||
|
||||
# Row removal must be wrapped in beginRemoveRows ..
|
||||
# endRemoveRows and the row range must be contiguous. Process
|
||||
# the highest rows first so the lower row numbers are unchanged
|
||||
row_groups = self._reversed_contiguous_row_groups(from_rows)
|
||||
|
||||
# Prepare destination playlist for a reset
|
||||
self.signals.begin_reset_model_signal.emit(to_playlist_id)
|
||||
|
||||
with db.Session() as session:
|
||||
for row_group in row_groups:
|
||||
# Make room in destination playlist
|
||||
max_destination_row_number = PlaylistRows.get_last_used_row(
|
||||
session, to_playlist_id
|
||||
# Don't move current row
|
||||
if self.track_sequence.current:
|
||||
current_row = self.track_sequence.current.row_number
|
||||
if current_row in from_rows:
|
||||
log.debug(
|
||||
"move_rows_between_playlists: Removing {current_row=} from {from_rows=}"
|
||||
)
|
||||
if (
|
||||
max_destination_row_number
|
||||
and to_row_number <= max_destination_row_number
|
||||
):
|
||||
# Move the destination playlist rows down to make room.
|
||||
PlaylistRows.move_rows_down(
|
||||
session, to_playlist_id, to_row_number, len(row_group)
|
||||
)
|
||||
next_to_row = to_row_number
|
||||
from_rows.remove(self.track_sequence.current.row_number)
|
||||
|
||||
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
|
||||
for playlist_row in PlaylistRows.plrids_to_plrs(
|
||||
session,
|
||||
self.playlist_id,
|
||||
[self.playlist_rows[a].playlistrow_id for a in row_group],
|
||||
):
|
||||
if (
|
||||
self.track_sequence.current
|
||||
and playlist_row.id == self.track_sequence.current.playlistrow_id
|
||||
):
|
||||
# Don't move current track
|
||||
continue
|
||||
playlist_row.playlist_id = to_playlist_id
|
||||
playlist_row.row_number = next_to_row
|
||||
next_to_row += 1
|
||||
self.refresh_data(session)
|
||||
super().endRemoveRows()
|
||||
# We need to remove gaps in row numbers after tracks have
|
||||
# moved.
|
||||
PlaylistRows.fixup_rownumbers(session, self.playlist_id)
|
||||
self.refresh_data(session)
|
||||
session.commit()
|
||||
# Row removal must be wrapped in beginRemoveRows .. endRemoveRows
|
||||
# and the row range must be contiguous. Process the highest rows
|
||||
# first so the lower row numbers are unchanged
|
||||
|
||||
# Reset of model must come after session has been closed
|
||||
row_groups = self._reversed_contiguous_row_groups([a.row_number for a in from_rows])
|
||||
|
||||
# Handle the moves in row_group chunks
|
||||
|
||||
# TODO: use bool QAbstractItemModel::beginMoveRows(const
|
||||
# QModelIndex &sourceParent, int sourceFirst, int sourceLast,
|
||||
# const QModelIndex &destinationParent, int destinationChild)
|
||||
|
||||
for row_group in row_groups:
|
||||
# Prepare source model
|
||||
super().beginRemoveRows(QModelIndex(), min(row_group), max(row_group))
|
||||
# Prepare destination model
|
||||
insert_rows = InsertRows(to_playlist_id,
|
||||
to_row_number,
|
||||
to_row_number + len(row_group)
|
||||
)
|
||||
self.signals.signal_begin_insert_rows.emit(insert_rows)
|
||||
repository.move_rows_to_playlist(from_rows=row_group,
|
||||
from_playlist_id=self.playlist_id,
|
||||
to_row=to_row_number,
|
||||
to_playlist_id=to_playlist_id
|
||||
)
|
||||
self.signals.signal_end_insert_rows.emit(to_playlist_id)
|
||||
super().endRemoveRows()
|
||||
|
||||
self.refresh_data()
|
||||
self.track_sequence.update()
|
||||
self.signals.end_reset_model_signal.emit(to_playlist_id)
|
||||
self.update_track_times()
|
||||
|
||||
def move_track_add_note(
|
||||
@ -1101,16 +1053,9 @@ class PlaylistModel(QAbstractTableModel):
|
||||
]
|
||||
self.invalidate_row(self.track_sequence.previous.row_number, roles)
|
||||
|
||||
def refresh_data(self, session: Session) -> None:
|
||||
def refresh_data(self) -> None:
|
||||
"""
|
||||
Populate self.playlist_rows with playlist data
|
||||
|
||||
We used to clear self.playlist_rows each time but that's
|
||||
expensive and slow on big playlists. Instead we track where rows
|
||||
are in database versus self.playlist_rows and fixup the latter.
|
||||
This works well for news rows added and for rows moved, but
|
||||
doesn't work for changed comments so they must be handled using
|
||||
refresh_row().
|
||||
"""
|
||||
|
||||
# Note where each playlist_id is by mapping each playlistrow_id
|
||||
@ -1216,9 +1161,7 @@ class PlaylistModel(QAbstractTableModel):
|
||||
]
|
||||
self.invalidate_rows(row_numbers, roles)
|
||||
|
||||
def _reversed_contiguous_row_groups(
|
||||
self, row_numbers: list[int]
|
||||
) -> list[list[int]]:
|
||||
def _reversed_contiguous_row_groups(self, row_numbers: list[int]) -> list[list[int]]:
|
||||
"""
|
||||
Take the list of row numbers and split into groups of contiguous rows. Return as a list
|
||||
of lists with the highest row numbers first.
|
||||
@ -1233,6 +1176,7 @@ class PlaylistModel(QAbstractTableModel):
|
||||
result: list[list[int]] = []
|
||||
temp: list[int] = []
|
||||
last_value = row_numbers[0] - 1
|
||||
row_numbers.sort()
|
||||
|
||||
for idx in range(len(row_numbers)):
|
||||
if row_numbers[idx] != last_value + 1:
|
||||
|
||||
@ -5,13 +5,14 @@ import re
|
||||
|
||||
# Third party imports
|
||||
from sqlalchemy import (
|
||||
delete,
|
||||
func,
|
||||
select,
|
||||
update,
|
||||
)
|
||||
from sqlalchemy.orm import aliased
|
||||
from sqlalchemy.orm.session import Session
|
||||
from sqlalchemy.sql.elements import BinaryExpression
|
||||
from sqlalchemy.sql.elements import BinaryExpression, ColumnElement
|
||||
from classes import ApplicationError, PlaylistRowDTO
|
||||
|
||||
# App imports
|
||||
@ -113,6 +114,12 @@ def create_track(path: str) -> TrackDTO:
|
||||
return new_track
|
||||
|
||||
|
||||
def get_all_tracks() -> list[TrackDTO]:
|
||||
"""Return a list of all tracks"""
|
||||
|
||||
return _tracks_where(Tracks.id > 0)
|
||||
|
||||
|
||||
def track_by_id(track_id: int) -> TrackDTO | None:
|
||||
"""
|
||||
Return track with specified id
|
||||
@ -171,7 +178,7 @@ def track_by_id(track_id: int) -> TrackDTO | None:
|
||||
return dto
|
||||
|
||||
|
||||
def _tracks_like(where: BinaryExpression) -> list[TrackDTO]:
|
||||
def _tracks_where(where: BinaryExpression | ColumnElement[bool]) -> list[TrackDTO]:
|
||||
"""
|
||||
Return tracks selected by where
|
||||
"""
|
||||
@ -235,7 +242,7 @@ def tracks_like_artist(filter_str: str) -> list[TrackDTO]:
|
||||
Return tracks where artist is like filter
|
||||
"""
|
||||
|
||||
return _tracks_like(Tracks.artist.ilike(f"%{filter_str}%"))
|
||||
return _tracks_where(Tracks.artist.ilike(f"%{filter_str}%"))
|
||||
|
||||
|
||||
def tracks_like_title(filter_str: str) -> list[TrackDTO]:
|
||||
@ -243,54 +250,16 @@ def tracks_like_title(filter_str: str) -> list[TrackDTO]:
|
||||
Return tracks where title is like filter
|
||||
"""
|
||||
|
||||
return _tracks_like(Tracks.title.ilike(f"%{filter_str}%"))
|
||||
return _tracks_where(Tracks.title.ilike(f"%{filter_str}%"))
|
||||
|
||||
|
||||
# Playlist functions
|
||||
def _check_row_number_sequence(
|
||||
session: Session, playlist_id: int, fix: bool = False
|
||||
) -> None:
|
||||
"""
|
||||
The row numbers for any playlist should run from 0 to (length - 1).
|
||||
This function checks that that is the case.
|
||||
|
||||
If there are errors, 'fix' determines what action is taken.
|
||||
|
||||
If fix == True:
|
||||
Fix the row numbers and save to database. Log at info level.
|
||||
If fix == False:
|
||||
Log at error level and raise ApplicationError
|
||||
"""
|
||||
|
||||
errors = False
|
||||
|
||||
playlist_rows = session.scalars(
|
||||
select(PlaylistRows)
|
||||
.where(PlaylistRows.playlist_id == playlist_id)
|
||||
.order_by(PlaylistRows.row_number)
|
||||
).all()
|
||||
|
||||
for idx, playlist_row in enumerate(playlist_rows):
|
||||
if playlist_row.row_number != idx:
|
||||
errors = True
|
||||
msg = f"_check_row_number_sequence({playlist_id=}, {fix=}, {playlist_row=}, {idx=}"
|
||||
if fix:
|
||||
log.info(msg)
|
||||
playlist_row.row_number = idx
|
||||
else:
|
||||
log.error(msg)
|
||||
raise ApplicationError(msg)
|
||||
|
||||
if errors:
|
||||
session.commit()
|
||||
|
||||
|
||||
def _move_rows_down(
|
||||
def _move_rows(
|
||||
session: Session, playlist_id: int, starting_row: int, move_by: int
|
||||
) -> None:
|
||||
"""
|
||||
Create space to insert move_by additional rows by incremented row
|
||||
number from starting_row to end of playlist
|
||||
Move rows from starting_row by move_by. If move_by is +ve, move rows
|
||||
down; if -ve, move them up.
|
||||
"""
|
||||
|
||||
log.debug(f"(_move_rows_down({playlist_id=}, {starting_row=}, {move_by=}")
|
||||
@ -306,6 +275,111 @@ def _move_rows_down(
|
||||
session.commit()
|
||||
|
||||
|
||||
def move_rows_to_playlist(
|
||||
from_rows: list[int], from_playlist_id: int, to_row: int, to_playlist_id: int
|
||||
) -> None:
|
||||
"""
|
||||
Move rows between playlists.
|
||||
"""
|
||||
|
||||
with db.Session() as session:
|
||||
# Prepare desination playlist
|
||||
# Find last used row
|
||||
last_row = session.execute(
|
||||
select(func.max(PlaylistRows.row_number)).where(
|
||||
PlaylistRows.playlist_id == to_playlist_id
|
||||
)
|
||||
).scalar_one()
|
||||
if last_row is None:
|
||||
last_row = -1
|
||||
# Make room in destination
|
||||
if to_row <= last_row:
|
||||
_move_rows(session, to_playlist_id, to_row, len(from_rows))
|
||||
# Move rows
|
||||
row_offset = to_row - min(from_rows)
|
||||
stmt = (
|
||||
update(PlaylistRows)
|
||||
.where(
|
||||
PlaylistRows.playlist_id == from_playlist_id,
|
||||
PlaylistRows.row_number.in_(from_rows)
|
||||
)
|
||||
.values(
|
||||
playlist_id=to_playlist_id,
|
||||
row_number=PlaylistRows.row_number + row_offset
|
||||
)
|
||||
)
|
||||
session.execute(stmt)
|
||||
# Remove gaps in source
|
||||
_move_rows(session=session,
|
||||
playlist_id=from_playlist_id,
|
||||
starting_row=max(from_rows) + 1,
|
||||
move_by=(len(from_rows) * -1)
|
||||
)
|
||||
# Commit changes
|
||||
session.commit()
|
||||
# Sanity check
|
||||
_check_playlist_integrity(session, get_playlist_rows(from_playlist_id), fix=False)
|
||||
_check_playlist_integrity(session, get_playlist_rows(to_playlist_id), fix=False)
|
||||
|
||||
|
||||
def move_rows_within_playlist(playlist_id: int, from_rows: list[int], to_row: int) -> None:
|
||||
"""
|
||||
Move rows within a playlist.
|
||||
"""
|
||||
|
||||
log.debug(f"move_rows_within_playlist({playlist_id=}, {from_rows=}, {to_row=})")
|
||||
|
||||
playlistrows_dto = get_playlist_rows(playlist_id)
|
||||
new_order: dict[int, int | None] = dict.fromkeys(range(len(playlistrows_dto)))
|
||||
|
||||
# The destination row number will need to be reduced by the
|
||||
# number of rows being move from above the destination row
|
||||
# otherwise rows below the destination row will end up above the
|
||||
# moved rows.
|
||||
# next_row = to_row - len([a for a in from_rows if a < to_row])
|
||||
# Need to ensure the moved rows won't overrun the total number of
|
||||
# rows
|
||||
next_row = to_row
|
||||
if next_row + len(from_rows) > len(playlistrows_dto):
|
||||
next_row = len(playlistrows_dto) - len(from_rows)
|
||||
|
||||
# Populate new_order with moved rows
|
||||
# # We need to keep, where possible, the rows after to_row unmoved
|
||||
# if to_row + len(from_rows) > len(playlistrows_dto):
|
||||
# next_row = max(to_row - len(from_rows) - len([a for a in from_rows if a < to_row]) + 1, 0)
|
||||
for from_row in from_rows:
|
||||
new_order[next_row] = from_row
|
||||
next_row += 1
|
||||
|
||||
# Move remaining rows
|
||||
remaining_rows = set(new_order.keys()) - set(from_rows)
|
||||
next_row = 0
|
||||
for row in remaining_rows:
|
||||
while new_order[next_row] is not None:
|
||||
next_row += 1
|
||||
new_order[next_row] = row
|
||||
next_row += 1
|
||||
|
||||
# Sanity check
|
||||
if None in new_order:
|
||||
raise ApplicationError(f"None remains after move: {new_order=}")
|
||||
|
||||
# Update database
|
||||
# Build a list of dicts of (id: value, row_number: value}
|
||||
update_list = []
|
||||
for new_row_number, old_row_number in new_order.items():
|
||||
plrid = [a.playlistrow_id for a in playlistrows_dto if a.row_number == old_row_number][0]
|
||||
update_list.append(dict(id=plrid, row_number=new_row_number))
|
||||
|
||||
# Update rows
|
||||
with db.Session() as session:
|
||||
session.execute(update(PlaylistRows), update_list)
|
||||
session.commit()
|
||||
|
||||
# Sanity check
|
||||
_check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False)
|
||||
|
||||
|
||||
def create_playlist(name: str, template_id: int) -> PlaylistDTO:
|
||||
"""
|
||||
Create playlist and return DTO.
|
||||
@ -417,6 +491,27 @@ def get_playlist_row(playlistrow_id: int) -> PlaylistRowDTO | None:
|
||||
return dto
|
||||
|
||||
|
||||
def _check_playlist_integrity(
|
||||
session: Session, playlist_rows: list[PlaylistRowDTO], fix: bool = False
|
||||
) -> None:
|
||||
"""
|
||||
Ensure the row numbers are contiguous. Fix and log if fix==True,
|
||||
else raise ApplicationError.
|
||||
"""
|
||||
|
||||
for idx, plr in enumerate(playlist_rows):
|
||||
if plr.row_number == idx:
|
||||
continue
|
||||
|
||||
msg = f"_check_playlist_integrity: incorrect row number ({plr.playlistrow_id=}, {idx=})"
|
||||
if fix:
|
||||
log.debug(msg)
|
||||
plr.row_number = idx
|
||||
session.commit()
|
||||
else:
|
||||
raise ApplicationError(msg)
|
||||
|
||||
|
||||
def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]:
|
||||
# Alias PlaydatesTable for subquery
|
||||
LatestPlaydate = aliased(Playdates)
|
||||
@ -458,6 +553,9 @@ def get_playlist_rows(playlist_id: int) -> list[PlaylistRowDTO]:
|
||||
|
||||
with db.Session() as session:
|
||||
results = session.execute(stmt).all()
|
||||
# Sanity check
|
||||
# TODO: would be good to be confident at removing this
|
||||
_check_playlist_integrity(session=session, playlist_rows=results, fix=False)
|
||||
|
||||
dto_list = []
|
||||
for row in results:
|
||||
@ -516,10 +614,10 @@ def insert_row(
|
||||
|
||||
with db.Session() as session:
|
||||
# Sanity check
|
||||
_check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False)
|
||||
_check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False)
|
||||
|
||||
# Make space for new row
|
||||
_move_rows_down(
|
||||
_move_rows(
|
||||
session=session, playlist_id=playlist_id, starting_row=row_number, move_by=1
|
||||
)
|
||||
|
||||
@ -534,7 +632,7 @@ def insert_row(
|
||||
playlist_row_id = playlist_row.id
|
||||
|
||||
# Sanity check
|
||||
_check_row_number_sequence(session=session, playlist_id=playlist_id, fix=False)
|
||||
_check_playlist_integrity(session, get_playlist_rows(playlist_id), fix=False)
|
||||
|
||||
new_playlist_row = get_playlist_row(playlistrow_id=playlist_row_id)
|
||||
if not new_playlist_row:
|
||||
@ -543,6 +641,29 @@ def insert_row(
|
||||
return new_playlist_row
|
||||
|
||||
|
||||
def remove_rows(playlist_id: int, row_numbers: list[int]) -> None:
|
||||
"""
|
||||
Remove rows from playlist
|
||||
|
||||
Delete from highest row back so that not yet deleted row numbers don't change.
|
||||
"""
|
||||
|
||||
log.debug(f"remove_rows({playlist_id=}, {row_numbers=}")
|
||||
|
||||
with db.Session() as session:
|
||||
for row_number in sorted(row_numbers, reverse=True):
|
||||
session.execute(
|
||||
delete(PlaylistRows).where(
|
||||
PlaylistRows.playlist_id == playlist_id,
|
||||
PlaylistRows.row_number == row_number,
|
||||
)
|
||||
)
|
||||
# Fixup row number to remove gaps
|
||||
_check_playlist_integrity(session, playlist_id, fix=True)
|
||||
|
||||
session.commit()
|
||||
|
||||
|
||||
def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
|
||||
"""
|
||||
Return playlist with specified id
|
||||
@ -579,7 +700,9 @@ def get_setting(name: str) -> int | None:
|
||||
"""
|
||||
|
||||
with db.Session() as session:
|
||||
record = session.execute(select(Settings).where(Settings.name == name)).one_or_none()
|
||||
record = session.execute(
|
||||
select(Settings).where(Settings.name == name)
|
||||
).one_or_none()
|
||||
if not record:
|
||||
return None
|
||||
|
||||
@ -592,12 +715,12 @@ def set_setting(name: str, value: int) -> None:
|
||||
"""
|
||||
|
||||
with db.Session() as session:
|
||||
record = session.execute(select(Settings).where(Settings.name == name)).one_or_none()
|
||||
record = session.execute(
|
||||
select(Settings).where(Settings.name == name)
|
||||
).one_or_none()
|
||||
if not record:
|
||||
record = Settings(session=session, name=name)
|
||||
if not record:
|
||||
raise ApplicationError("Can't create Settings record")
|
||||
record.f_int = value
|
||||
session.commit()
|
||||
|
||||
|
||||
|
||||
166
kae.py
Executable file
166
kae.py
Executable file
@ -0,0 +1,166 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import sys
|
||||
import datetime as dt
|
||||
from dataclasses import dataclass
|
||||
from PyQt6.QtWidgets import (
|
||||
QApplication, QDialog, QLabel, QLineEdit, QListWidget,
|
||||
QVBoxLayout, QHBoxLayout, QPushButton, QListWidgetItem
|
||||
)
|
||||
from PyQt6.QtCore import Qt, pyqtSignal
|
||||
|
||||
@dataclass
|
||||
class TrackDTO:
|
||||
track_id: int
|
||||
artist: str
|
||||
bitrate: int
|
||||
duration: int # milliseconds
|
||||
fade_at: int
|
||||
intro: int | None
|
||||
path: str
|
||||
silence_at: int
|
||||
start_gap: int
|
||||
title: str
|
||||
lastplayed: dt.datetime | None
|
||||
|
||||
# Placeholder external function to simulate search
|
||||
def search_titles(query: str) -> list[TrackDTO]:
|
||||
now = dt.datetime.now()
|
||||
dummy_tracks = [
|
||||
TrackDTO(1, "Artist A", 320, 210000, 0, None, "", 0, 0, "Title One", now - dt.timedelta(days=2)),
|
||||
TrackDTO(2, "Artist B", 256, 185000, 0, None, "", 0, 0, "Another Title", now - dt.timedelta(days=30)),
|
||||
TrackDTO(3, "Artist C", 320, 240000, 0, None, "", 0, 0, "More Music", None),
|
||||
]
|
||||
return [t for t in dummy_tracks if query.lower() in t.title.lower()]
|
||||
|
||||
def format_duration(ms: int) -> str:
|
||||
minutes, seconds = divmod(ms // 1000, 60)
|
||||
return f"{minutes}:{seconds:02d}"
|
||||
|
||||
def friendly_last_played(lastplayed: dt.datetime | None) -> str:
|
||||
if lastplayed is None:
|
||||
return "(Never)"
|
||||
now = dt.datetime.now()
|
||||
delta = now - lastplayed
|
||||
days = delta.days
|
||||
|
||||
if days == 0:
|
||||
return "(Today)"
|
||||
elif days == 1:
|
||||
return "(Yesterday)"
|
||||
|
||||
years, days_remain = divmod(days, 365)
|
||||
months, days_final = divmod(days_remain, 30)
|
||||
|
||||
parts = []
|
||||
if years:
|
||||
parts.append(f"{years}y")
|
||||
if months:
|
||||
parts.append(f"{months}m")
|
||||
if days_final:
|
||||
parts.append(f"{days_final}d")
|
||||
formatted = " ".join(parts)
|
||||
return f"({formatted} ago)"
|
||||
|
||||
class TrackInsertDialog(QDialog):
|
||||
signal_insert_track = pyqtSignal(int, str)
|
||||
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
self.setWindowTitle("Insert Track")
|
||||
|
||||
# Title input on one line
|
||||
self.title_label = QLabel("Title:")
|
||||
self.title_edit = QLineEdit()
|
||||
self.title_edit.textChanged.connect(self.update_list)
|
||||
|
||||
title_layout = QHBoxLayout()
|
||||
title_layout.addWidget(self.title_label)
|
||||
title_layout.addWidget(self.title_edit)
|
||||
|
||||
# Track list
|
||||
self.track_list = QListWidget()
|
||||
|
||||
# Note input on one line
|
||||
self.note_label = QLabel("Note:")
|
||||
self.note_edit = QLineEdit()
|
||||
|
||||
note_layout = QHBoxLayout()
|
||||
note_layout.addWidget(self.note_label)
|
||||
note_layout.addWidget(self.note_edit)
|
||||
|
||||
# Buttons
|
||||
self.add_btn = QPushButton("Add")
|
||||
self.add_close_btn = QPushButton("Add and close")
|
||||
self.close_btn = QPushButton("Close")
|
||||
|
||||
self.add_btn.clicked.connect(self.add_clicked)
|
||||
self.add_close_btn.clicked.connect(self.add_and_close_clicked)
|
||||
self.close_btn.clicked.connect(self.close)
|
||||
|
||||
btn_layout = QHBoxLayout()
|
||||
btn_layout.addWidget(self.add_btn)
|
||||
btn_layout.addWidget(self.add_close_btn)
|
||||
btn_layout.addWidget(self.close_btn)
|
||||
|
||||
# Main layout
|
||||
layout = QVBoxLayout()
|
||||
layout.addLayout(title_layout)
|
||||
layout.addWidget(self.track_list)
|
||||
layout.addLayout(note_layout)
|
||||
layout.addLayout(btn_layout)
|
||||
|
||||
self.setLayout(layout)
|
||||
self.resize(600, 400)
|
||||
|
||||
def update_list(self, text: str):
|
||||
self.track_list.clear()
|
||||
if text.strip() == "":
|
||||
# Do not search or populate list if input is empty
|
||||
return
|
||||
|
||||
tracks = search_titles(text)
|
||||
for track in tracks:
|
||||
duration_str = format_duration(track.duration)
|
||||
last_played_str = friendly_last_played(track.lastplayed)
|
||||
item_str = f"{track.title} - {track.artist} [{duration_str}] {last_played_str}"
|
||||
item = QListWidgetItem(item_str)
|
||||
item.setData(Qt.ItemDataRole.UserRole, track.track_id)
|
||||
self.track_list.addItem(item)
|
||||
|
||||
def get_selected_track_id(self) -> int | None:
|
||||
selected_items = self.track_list.selectedItems()
|
||||
if selected_items:
|
||||
return selected_items[0].data(Qt.ItemDataRole.UserRole)
|
||||
return None
|
||||
|
||||
def add_clicked(self):
|
||||
track_id = self.get_selected_track_id()
|
||||
if track_id is not None:
|
||||
note_text = self.note_edit.text()
|
||||
self.signal_insert_track.emit(track_id, note_text)
|
||||
|
||||
self.title_edit.clear()
|
||||
self.note_edit.clear()
|
||||
self.track_list.clear()
|
||||
self.title_edit.setFocus()
|
||||
|
||||
def add_and_close_clicked(self):
|
||||
track_id = self.get_selected_track_id()
|
||||
if track_id is not None:
|
||||
note_text = self.note_edit.text()
|
||||
self.signal_insert_track.emit(track_id, note_text)
|
||||
self.accept()
|
||||
|
||||
# Test harness (for quick testing)
|
||||
if __name__ == "__main__":
|
||||
app = QApplication(sys.argv)
|
||||
|
||||
dialog = TrackInsertDialog()
|
||||
|
||||
def print_inserted(track_id, note):
|
||||
print(f"Inserted track ID: {track_id}, Note: '{note}'")
|
||||
|
||||
dialog.signal_insert_track.connect(print_inserted)
|
||||
dialog.exec()
|
||||
|
||||
@ -1,78 +0,0 @@
|
||||
# Standard library imports
|
||||
import unittest
|
||||
|
||||
# PyQt imports
|
||||
|
||||
# Third party imports
|
||||
|
||||
# App imports
|
||||
from app import playlistmodel
|
||||
from app import repository
|
||||
from app.models import db
|
||||
|
||||
|
||||
class MyTestCase(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
"""Runs once before any test in this class"""
|
||||
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
"""Runs once after all tests"""
|
||||
|
||||
pass
|
||||
|
||||
def setUp(self):
|
||||
"""Runs before each test"""
|
||||
|
||||
db.create_all()
|
||||
|
||||
# Create a playlist and model
|
||||
playlist_name = "my playlist"
|
||||
self.playlist = repository.create_playlist(name=playlist_name, template_id=0)
|
||||
assert self.playlist
|
||||
self.model = playlistmodel.PlaylistModel(
|
||||
self.playlist.playlist_id, is_template=False
|
||||
)
|
||||
assert self.model
|
||||
|
||||
# Create tracks
|
||||
track1_path = "testdata/isa.mp3"
|
||||
self.track1 = repository.create_track(track1_path)
|
||||
|
||||
track2_path = "testdata/mom.mp3"
|
||||
self.track2 = repository.create_track(track2_path)
|
||||
|
||||
# Add tracks and header to playlist
|
||||
self.row0 = repository.insert_row(
|
||||
self.playlist.playlist_id,
|
||||
row_number=0,
|
||||
track_id=self.track1.track_id,
|
||||
note="track 1",
|
||||
)
|
||||
self.row1 = repository.insert_row(
|
||||
self.playlist.playlist_id,
|
||||
row_number=1,
|
||||
track_id=0,
|
||||
note="Header row",
|
||||
)
|
||||
self.row2 = repository.insert_row(
|
||||
self.playlist.playlist_id,
|
||||
row_number=2,
|
||||
track_id=self.track2.track_id,
|
||||
note="track 2",
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
"""Runs after each test"""
|
||||
|
||||
db.drop_all()
|
||||
|
||||
def test_add_track_to_header(self):
|
||||
"""Add a track to a header row"""
|
||||
|
||||
repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id)
|
||||
result = repository.get_playlist_row(self.row1.playlistrow_id)
|
||||
assert result.track_id == self.track2.track_id
|
||||
@ -134,122 +134,6 @@ class TestMMMiscRowMove(unittest.TestCase):
|
||||
def tearDown(self):
|
||||
db.drop_all()
|
||||
|
||||
def test_move_rows_test2(self):
|
||||
# move row 3 to row 5
|
||||
self.model.move_rows([3], 5)
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
for row in range(self.model.rowCount()):
|
||||
assert row in self.model.playlist_rows
|
||||
assert self.model.playlist_rows[row].row_number == row
|
||||
if row not in [3, 4, 5]:
|
||||
assert self.model.playlist_rows[row].note == str(row)
|
||||
elif row == 3:
|
||||
assert self.model.playlist_rows[row].note == str(4)
|
||||
elif row == 4:
|
||||
assert self.model.playlist_rows[row].note == str(3)
|
||||
elif row == 5:
|
||||
assert self.model.playlist_rows[row].note == str(5)
|
||||
|
||||
def test_move_rows_test3(self):
|
||||
# move row 4 to row 3
|
||||
|
||||
self.model.move_rows([4], 3)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
for row in range(self.model.rowCount()):
|
||||
assert row in self.model.playlist_rows
|
||||
assert self.model.playlist_rows[row].row_number == row
|
||||
if row not in [3, 4]:
|
||||
assert self.model.playlist_rows[row].note == str(row)
|
||||
elif row == 3:
|
||||
assert self.model.playlist_rows[row].note == str(4)
|
||||
elif row == 4:
|
||||
assert self.model.playlist_rows[row].note == str(3)
|
||||
|
||||
def test_move_rows_test4(self):
|
||||
# move row 4 to row 2
|
||||
|
||||
self.model.move_rows([4], 2)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
for row in range(self.model.rowCount()):
|
||||
assert row in self.model.playlist_rows
|
||||
assert self.model.playlist_rows[row].row_number == row
|
||||
if row not in [2, 3, 4]:
|
||||
assert self.model.playlist_rows[row].note == str(row)
|
||||
elif row == 2:
|
||||
assert self.model.playlist_rows[row].note == str(4)
|
||||
elif row == 3:
|
||||
assert self.model.playlist_rows[row].note == str(2)
|
||||
elif row == 4:
|
||||
assert self.model.playlist_rows[row].note == str(3)
|
||||
|
||||
def test_move_rows_test5(self):
|
||||
# move rows [1, 4, 5, 10] → 8
|
||||
|
||||
self.model.move_rows([1, 4, 5, 10], 8)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in range(self.model.rowCount()):
|
||||
assert row in self.model.playlist_rows
|
||||
assert self.model.playlist_rows[row].row_number == row
|
||||
new_order.append(int(self.model.playlist_rows[row].note))
|
||||
assert new_order == [0, 2, 3, 6, 7, 1, 4, 5, 10, 8, 9]
|
||||
|
||||
def test_move_rows_test6(self):
|
||||
# move rows [3, 6] → 5
|
||||
|
||||
self.model.move_rows([3, 6], 5)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in range(self.model.rowCount()):
|
||||
assert row in self.model.playlist_rows
|
||||
assert self.model.playlist_rows[row].row_number == row
|
||||
new_order.append(int(self.model.playlist_rows[row].note))
|
||||
assert new_order == [0, 1, 2, 4, 3, 6, 5, 7, 8, 9, 10]
|
||||
|
||||
def test_move_rows_test7(self):
|
||||
# move rows [3, 5, 6] → 8
|
||||
|
||||
self.model.move_rows([3, 5, 6], 8)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in range(self.model.rowCount()):
|
||||
assert row in self.model.playlist_rows
|
||||
assert self.model.playlist_rows[row].row_number == row
|
||||
new_order.append(int(self.model.playlist_rows[row].note))
|
||||
assert new_order == [0, 1, 2, 4, 7, 3, 5, 6, 8, 9, 10]
|
||||
|
||||
def test_move_rows_test8(self):
|
||||
# move rows [7, 8, 10] → 5
|
||||
|
||||
self.model.move_rows([7, 8, 10], 5)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in range(self.model.rowCount()):
|
||||
assert row in self.model.playlist_rows
|
||||
assert self.model.playlist_rows[row].row_number == row
|
||||
new_order.append(int(self.model.playlist_rows[row].note))
|
||||
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
|
||||
|
||||
def test_move_rows_test9(self):
|
||||
# move rows [1, 2, 3] → 0
|
||||
# Replicate issue 244
|
||||
|
||||
self.model.move_rows([0, 1, 2, 3], 0)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in range(self.model.rowCount()):
|
||||
assert row in self.model.playlist_rows
|
||||
assert self.model.playlist_rows[row].row_number == row
|
||||
new_order.append(int(self.model.playlist_rows[row].note))
|
||||
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
|
||||
|
||||
def test_insert_header_row_end(self):
|
||||
# insert header row at end of playlist
|
||||
|
||||
|
||||
239
tests/test_repository.py
Normal file
239
tests/test_repository.py
Normal file
@ -0,0 +1,239 @@
|
||||
# Standard library imports
|
||||
import unittest
|
||||
|
||||
# PyQt imports
|
||||
|
||||
# Third party imports
|
||||
|
||||
# App imports
|
||||
from app import playlistmodel
|
||||
from app import repository
|
||||
from app.models import db
|
||||
from classes import PlaylistDTO
|
||||
from playlistmodel import PlaylistModel
|
||||
|
||||
|
||||
class MyTestCase(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
"""Runs once before any test in this class"""
|
||||
|
||||
cls.isa_path = "testdata/isa.mp3"
|
||||
cls.isa_title = "I'm So Afraid"
|
||||
cls.isa_artist = "Fleetwood Mac"
|
||||
cls.mom_path = "testdata/mom.mp3"
|
||||
cls.mom_title = "Man of Mystery"
|
||||
cls.mom_artist = "The Shadows"
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
"""Runs once after all tests"""
|
||||
|
||||
pass
|
||||
|
||||
def setUp(self):
|
||||
"""Runs before each test"""
|
||||
|
||||
db.create_all()
|
||||
|
||||
def create_playlist_and_model(self, playlist_name: str) -> (PlaylistDTO, PlaylistModel):
|
||||
# Create a playlist and model
|
||||
playlist = repository.create_playlist(name=playlist_name, template_id=0)
|
||||
assert playlist
|
||||
model = playlistmodel.PlaylistModel(playlist.playlist_id, is_template=False)
|
||||
assert model
|
||||
|
||||
return (playlist, model)
|
||||
|
||||
def create_playlist_model_tracks(self, playlist_name: str):
|
||||
(playlist, model) = self.create_playlist_and_model("my playlist")
|
||||
# Create tracks
|
||||
self.track1 = repository.create_track(self.isa_path)
|
||||
|
||||
self.track2 = repository.create_track(self.mom_path)
|
||||
|
||||
# Add tracks and header to playlist
|
||||
self.row0 = repository.insert_row(
|
||||
playlist.playlist_id,
|
||||
row_number=0,
|
||||
track_id=self.track1.track_id,
|
||||
note="track 1",
|
||||
)
|
||||
self.row1 = repository.insert_row(
|
||||
playlist.playlist_id,
|
||||
row_number=1,
|
||||
track_id=0,
|
||||
note="Header row",
|
||||
)
|
||||
self.row2 = repository.insert_row(
|
||||
playlist.playlist_id,
|
||||
row_number=2,
|
||||
track_id=self.track2.track_id,
|
||||
note="track 2",
|
||||
)
|
||||
|
||||
def create_rows(self, playlist_name: str, number_of_rows: int) -> (PlaylistDTO, PlaylistModel):
|
||||
(playlist, model) = self.create_playlist_and_model(playlist_name)
|
||||
for row_number in range(number_of_rows):
|
||||
repository.insert_row(
|
||||
playlist.playlist_id, row_number, None, str(row_number)
|
||||
)
|
||||
|
||||
return (playlist, model)
|
||||
|
||||
def tearDown(self):
|
||||
"""Runs after each test"""
|
||||
|
||||
db.drop_all()
|
||||
|
||||
def test_add_track_to_header(self):
|
||||
"""Add a track to a header row"""
|
||||
|
||||
self.create_playlist_model_tracks("my playlist")
|
||||
repository.add_track_to_header(self.row1.playlistrow_id, self.track2.track_id)
|
||||
result = repository.get_playlist_row(self.row1.playlistrow_id)
|
||||
assert result.track_id == self.track2.track_id
|
||||
|
||||
def test_create_track(self):
|
||||
repository.create_track(self.isa_path)
|
||||
results = repository.get_all_tracks()
|
||||
assert len(results) == 1
|
||||
assert results[0].path == self.isa_path
|
||||
|
||||
def test_get_track_by_id(self):
|
||||
dto = repository.create_track(self.isa_path)
|
||||
result = repository.track_by_id(dto.track_id)
|
||||
assert result.path == self.isa_path
|
||||
|
||||
def test_get_track_by_artist(self):
|
||||
_ = repository.create_track(self.isa_path)
|
||||
_ = repository.create_track(self.mom_path)
|
||||
result_isa = repository.tracks_like_artist(self.isa_artist)
|
||||
assert len(result_isa) == 1
|
||||
assert result_isa[0].artist == self.isa_artist
|
||||
result_mom = repository.tracks_like_artist(self.mom_artist)
|
||||
assert len(result_mom) == 1
|
||||
assert result_mom[0].artist == self.mom_artist
|
||||
|
||||
def test_get_track_by_title(self):
|
||||
_ = repository.create_track(self.isa_path)
|
||||
_ = repository.create_track(self.mom_path)
|
||||
result_isa = repository.tracks_like_title(self.isa_title)
|
||||
assert len(result_isa) == 1
|
||||
assert result_isa[0].title == self.isa_title
|
||||
result_mom = repository.tracks_like_title(self.mom_title)
|
||||
assert len(result_mom) == 1
|
||||
assert result_mom[0].title == self.mom_title
|
||||
|
||||
def test_move_rows_test1(self):
|
||||
# move row 3 to row 5
|
||||
|
||||
number_of_rows = 10
|
||||
(playlist, model) = self.create_rows("test_move_rows_test1", number_of_rows)
|
||||
|
||||
repository.move_rows_within_playlist(playlist.playlist_id, [3], 5)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in repository.get_playlist_rows(playlist.playlist_id):
|
||||
new_order.append(int(row.note))
|
||||
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9]
|
||||
|
||||
def test_move_rows_test2(self):
|
||||
# move row 4 to row 3
|
||||
|
||||
number_of_rows = 10
|
||||
(playlist, model) = self.create_rows("test_move_rows_test2", number_of_rows)
|
||||
|
||||
repository.move_rows_within_playlist(playlist.playlist_id, [4], 3)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in repository.get_playlist_rows(playlist.playlist_id):
|
||||
new_order.append(int(row.note))
|
||||
assert new_order == [0, 1, 2, 4, 3, 5, 6, 7, 8, 9]
|
||||
|
||||
def test_move_rows_test3(self):
|
||||
# move row 4 to row 2
|
||||
|
||||
number_of_rows = 10
|
||||
(playlist, model) = self.create_rows("test_move_rows_test3", number_of_rows)
|
||||
|
||||
repository.move_rows_within_playlist(playlist.playlist_id, [4], 2)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in repository.get_playlist_rows(playlist.playlist_id):
|
||||
new_order.append(int(row.note))
|
||||
assert new_order == [0, 1, 4, 2, 3, 5, 6, 7, 8, 9]
|
||||
|
||||
def test_move_rows_test4(self):
|
||||
# move rows [1, 4, 5, 10] → 8
|
||||
|
||||
number_of_rows = 11
|
||||
(playlist, model) = self.create_rows("test_move_rows_test4", number_of_rows)
|
||||
|
||||
repository.move_rows_within_playlist(playlist.playlist_id, [1, 4, 5, 10], 8)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in repository.get_playlist_rows(playlist.playlist_id):
|
||||
new_order.append(int(row.note))
|
||||
assert new_order == [0, 2, 3, 6, 7, 8, 9, 1, 4, 5, 10]
|
||||
|
||||
def test_move_rows_test5(self):
|
||||
# move rows [3, 6] → 5
|
||||
|
||||
number_of_rows = 11
|
||||
(playlist, model) = self.create_rows("test_move_rows_test5", number_of_rows)
|
||||
|
||||
repository.move_rows_within_playlist(playlist.playlist_id, [3, 6], 5)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in repository.get_playlist_rows(playlist.playlist_id):
|
||||
new_order.append(int(row.note))
|
||||
assert new_order == [0, 1, 2, 4, 5, 3, 6, 7, 8, 9, 10]
|
||||
|
||||
def test_move_rows_test6(self):
|
||||
# move rows [3, 5, 6] → 8
|
||||
|
||||
number_of_rows = 11
|
||||
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
|
||||
|
||||
repository.move_rows_within_playlist(playlist.playlist_id, [3, 5, 6], 8)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in repository.get_playlist_rows(playlist.playlist_id):
|
||||
new_order.append(int(row.note))
|
||||
assert new_order == [0, 1, 2, 4, 7, 8, 9, 10, 3, 5, 6]
|
||||
|
||||
def test_move_rows_test7(self):
|
||||
# move rows [7, 8, 10] → 5
|
||||
|
||||
number_of_rows = 11
|
||||
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
|
||||
|
||||
repository.move_rows_within_playlist(playlist.playlist_id, [7, 8, 10], 5)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in repository.get_playlist_rows(playlist.playlist_id):
|
||||
new_order.append(int(row.note))
|
||||
assert new_order == [0, 1, 2, 3, 4, 7, 8, 10, 5, 6, 9]
|
||||
|
||||
def test_move_rows_test8(self):
|
||||
# move rows [1, 2, 3] → 0
|
||||
# Replicate issue 244
|
||||
|
||||
number_of_rows = 11
|
||||
(playlist, model) = self.create_rows("test_move_rows_test6", number_of_rows)
|
||||
|
||||
repository.move_rows_within_playlist(playlist.playlist_id, [0, 1, 2, 3], 0)
|
||||
|
||||
# Check we have all rows and plr_rownums are correct
|
||||
new_order = []
|
||||
for row in repository.get_playlist_rows(playlist.playlist_id):
|
||||
new_order.append(int(row.note))
|
||||
assert new_order == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
|
||||
Loading…
Reference in New Issue
Block a user