Compare commits

..

No commits in common. "86a1678f41dbc048034074f2ae3a8781969fe60c" and "36b3b8c323e6a5e2224bd580ff1ead519ffd2a11" have entirely different histories.

9 changed files with 426 additions and 365 deletions

View File

@ -15,6 +15,19 @@ else:
dbname = MYSQL_CONNECT.split("/")[-1] dbname = MYSQL_CONNECT.split("/")[-1]
log.debug(f"Database: {dbname}") log.debug(f"Database: {dbname}")
# MM_ENV = os.environ.get('MM_ENV', 'PRODUCTION')
# testing = False
# if MM_ENV == 'TESTING':
# dbname = os.environ.get('MM_TESTING_DBNAME', 'musicmuster_testing')
# dbuser = os.environ.get('MM_TESTING_DBUSER', 'musicmuster_testing')
# dbpw = os.environ.get('MM_TESTING_DBPW', 'musicmuster_testing')
# dbhost = os.environ.get('MM_TESTING_DBHOST', 'localhost')
# testing = True
# else:
# raise ValueError(f"Unknown MusicMuster environment: {MM_ENV=}")
#
# MYSQL_CONNECT = f"mysql+mysqldb://{dbuser}:{dbpw}@{dbhost}/{dbname}"
engine = create_engine( engine = create_engine(
MYSQL_CONNECT, MYSQL_CONNECT,
echo=Config.DISPLAY_SQL, echo=Config.DISPLAY_SQL,
@ -30,7 +43,7 @@ def Session() -> Generator[scoped_session, None, None]:
file = frame.filename file = frame.filename
function = frame.function function = frame.function
lineno = frame.lineno lineno = frame.lineno
Session = scoped_session(sessionmaker(bind=engine)) Session = scoped_session(sessionmaker(bind=engine, future=True))
log.debug(f"SqlA: session acquired [{hex(id(Session))}]") log.debug(f"SqlA: session acquired [{hex(id(Session))}]")
log.debug( log.debug(
f"Session acquisition: {file}:{function}:{lineno} " f"[{hex(id(Session))}]" f"Session acquisition: {file}:{function}:{lineno} " f"[{hex(id(Session))}]"

View File

@ -97,34 +97,6 @@ class NoteColours(Base):
f"colour={self.colour}>" f"colour={self.colour}>"
) )
def __init__(
self,
session: scoped_session,
substring: str,
colour: str,
enabled: bool = True,
is_regex: bool = False,
is_casesensitive: bool = False,
order: Optional[int] = 0,
) -> None:
self.substring = substring
self.colour = colour
self.enabled = enabled
self.is_regex = is_regex
self.is_casesensitive = is_casesensitive
self.order = order
session.add(self)
session.flush()
@classmethod
def get_all(cls, session: scoped_session) -> Sequence["NoteColours"]:
"""
Return all records
"""
return session.execute(select(cls)).scalars().all()
@staticmethod @staticmethod
def get_colour(session: scoped_session, text: str) -> Optional[str]: def get_colour(session: scoped_session, text: str) -> Optional[str]:
""" """
@ -413,9 +385,9 @@ class PlaylistRows(Base):
self, self,
session: scoped_session, session: scoped_session,
playlist_id: int, playlist_id: int,
track_id: Optional[int],
row_number: int, row_number: int,
note: str = "", note: str = "",
track_id: Optional[int] = None,
) -> None: ) -> None:
"""Create PlaylistRows object""" """Create PlaylistRows object"""
@ -448,13 +420,7 @@ class PlaylistRows(Base):
) )
for plr in src_rows: for plr in src_rows:
PlaylistRows( PlaylistRows(session, dst_id, plr.track_id, plr.plr_rownum, plr.note)
session=session,
playlist_id=dst_id,
row_number=plr.plr_rownum,
note=plr.note,
track_id=plr.track_id,
)
@staticmethod @staticmethod
def delete_higher_rows( def delete_higher_rows(
@ -487,7 +453,7 @@ class PlaylistRows(Base):
.options(joinedload(cls.track)) .options(joinedload(cls.track))
.where( .where(
PlaylistRows.playlist_id == playlist_id, PlaylistRows.playlist_id == playlist_id,
PlaylistRows.plr_rownum == row_number, PlaylistRows.plr_rownum == row_number
) )
# .options(joinedload(Tracks.playdates)) # .options(joinedload(Tracks.playdates))
) )
@ -765,7 +731,7 @@ class Tracks(Base):
fade_at: int, fade_at: int,
silence_at: int, silence_at: int,
mtime: int, mtime: int,
bitrate: int, bitrate: int
): ):
self.path = path self.path = path
self.title = title self.title = title
@ -798,11 +764,9 @@ class Tracks(Base):
""" """
try: try:
return ( return session.execute(
session.execute(select(Tracks).where(Tracks.path == path)) select(Tracks).where(Tracks.path == path)
.unique() ).unique().scalar_one()
.scalar_one()
)
except NoResultFound: except NoResultFound:
return None return None

View File

@ -1,8 +1,6 @@
from datetime import datetime from datetime import datetime
from enum import auto, Enum from enum import auto, Enum
from typing import List, Optional, TYPE_CHECKING from typing import Optional, TYPE_CHECKING
from dbconfig import scoped_session, Session
from PyQt6.QtCore import ( from PyQt6.QtCore import (
QAbstractTableModel, QAbstractTableModel,
@ -19,6 +17,8 @@ from PyQt6.QtGui import (
from config import Config from config import Config
from dbconfig import Session
from helpers import ( from helpers import (
file_is_unreadable, file_is_unreadable,
) )
@ -282,110 +282,6 @@ class PlaylistModel(QAbstractTableModel):
return QVariant() return QVariant()
def insert_row(self, session: scoped_session, row_number: int) -> int:
"""
Make space for a row at row_number. If row_number is greater
than length of list plus 1, or if row number is -1, put row at
end of list.
Return new row number that has an empty, valid entry in self.playlist_rows.
"""
if row_number > len(self.playlist_rows) or row_number == -1:
new_row_number = len(self.playlist_rows) + 1
elif row_number < 0:
raise ValueError(
f"playlistmodel.insert_row, invalid row number ({row_number})"
)
else:
new_row_number = row_number
# Move rows below new row down
modified_rows: List[int] = []
for i in reversed(range(new_row_number, len(self.playlist_rows))):
self.playlist_rows[i + 1] = self.playlist_rows[i]
self.playlist_rows[i + 1].plr_rownum += 1
modified_rows.append(i + 1)
# Replace old row
self.playlist_rows[new_row_number] = PlaylistRowData(
PlaylistRows(
session=session, playlist_id=self.playlist_id, row_number=new_row_number
)
)
# Refresh rows
self.invalidate_rows(modified_rows)
return new_row_number
def invalidate_rows(self, modified_rows: List[int]) -> None:
"""
Signal to view to refresh invlidated rows
"""
for modified_row in modified_rows:
self.dataChanged.emit(
self.index(modified_row, 0),
self.index(modified_row, self.columnCount()),
)
def move_rows(self, from_rows: List[int], to_row: int) -> None:
"""
Move the playlist rows given to to_row and below.
"""
# New thinking:
# Move relocated rows to correct place in mirror array
# Copy souurce to mirror around them
# Move mirror to original
# Fixup plr rownumbers and update db and display
# Signal rows have changed
# Prep
# modified_rows: List[int] = []
# moving_rows: dict[int, PlaylistRowData] = {}
new_playlist_rows: dict[int, PlaylistRowData] = {}
# Move the from_row records from the playlist_rows dict to the
# new_playlist_rows dict
next_to_row = to_row
for from_row in from_rows:
new_playlist_rows[next_to_row] = self.playlist_rows[from_row]
del self.playlist_rows[from_row]
next_to_row += 1
# Move the remaining rows to the gaps in new_playlist_rows
new_row = 0
for old_row in self.playlist_rows.keys():
# Find next gap
while new_row in new_playlist_rows:
new_row += 1
new_playlist_rows[new_row] = self.playlist_rows[old_row]
new_row += 1
# Make copy of rows live
self.playlist_rows = new_playlist_rows
# Update PlaylistRows table and notify display of rows that
# moved
with Session() as session:
for idx in range(len(self.playlist_rows)):
if self.playlist_rows[idx].plr_rownum == idx:
continue
# Row number in this row is incorred. Fix it in
# database:
plr = session.get(PlaylistRows, self.playlist_rows[idx].plrid)
if not plr:
print(f"\nCan't find plr in playlistmodel:move_rows {idx=}")
continue
plr.plr_rownum = idx
# Fix in self.playlist_rows
self.playlist_rows[idx].plr_rownum = idx
# Update display
self.invalidate_rows([idx])
print(f"Fixup {idx=}")
def refresh_data(self): def refresh_data(self):
"""Populate dicts for data calls""" """Populate dicts for data calls"""
@ -398,7 +294,6 @@ class PlaylistModel(QAbstractTableModel):
"""Populate dict for one row for data calls""" """Populate dict for one row for data calls"""
p = PlaylistRows.deep_row(session, self.playlist_id, row_number) p = PlaylistRows.deep_row(session, self.playlist_id, row_number)
self.playlist_rows.clear()
self.playlist_rows[p.plr_rownum] = PlaylistRowData(p) self.playlist_rows[p.plr_rownum] = PlaylistRowData(p)
def rowCount(self, index: QModelIndex = QModelIndex()) -> int: def rowCount(self, index: QModelIndex = QModelIndex()) -> int:

View File

@ -227,7 +227,7 @@ class PlaylistTab(QTableView):
and 0 <= max(from_rows) <= self.model().rowCount() and 0 <= max(from_rows) <= self.model().rowCount()
and 0 <= to_row <= self.model().rowCount() and 0 <= to_row <= self.model().rowCount()
): ):
self.model().move_rows(from_rows, to_row) print(f"move_rows({from_rows=}, {to_row=})")
event.accept() event.accept()
super().dropEvent(event) super().dropEvent(event)
@ -605,7 +605,7 @@ class PlaylistTab(QTableView):
# """ # """
# row_number = self.get_new_row_number() # row_number = self.get_new_row_number()
# TODO: check arg order plr = PlaylistRows(session, self.playlist_id, None, row_number, note) # plr = PlaylistRows(session, self.playlist_id, None, row_number, note)
# self.insert_row(session, plr) # self.insert_row(session, plr)
# self._set_row_header_text(session, row_number, note) # self._set_row_header_text(session, row_number, note)
# self.save_playlist(session) # self.save_playlist(session)
@ -694,7 +694,7 @@ class PlaylistTab(QTableView):
# return self._move_row(session, existing_plr, row_number) # return self._move_row(session, existing_plr, row_number)
# # Build playlist_row object # # Build playlist_row object
# plr = TODO: check arg order PlaylistRows(session, self.playlist_id, track.id, row_number, note) # plr = PlaylistRows(session, self.playlist_id, track.id, row_number, note)
# self.insert_row(session, plr) # self.insert_row(session, plr)
# self.save_playlist(session) # self.save_playlist(session)
# self._update_start_end_times(session) # self._update_start_end_times(session)

View File

@ -1,50 +1,41 @@
# https://itnext.io/setting-up-transactional-tests-with-pytest-and-sqlalchemy-b2d726347629 # https://itnext.io/setting-up-transactional-tests-with-pytest-and-sqlalchemy-b2d726347629
import pytest import pytest
import helpers
# Flake8 doesn't like the sys.append within imports
# import sys
# sys.path.append("app")
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.orm import scoped_session, sessionmaker
from app.models import Base, Tracks
DB_CONNECTION = \ @pytest.fixture(scope="session")
"mysql+mysqldb://musicmuster_testing:musicmuster_testing@localhost/dev_musicmuster_testing" def connection():
engine = create_engine(
"mysql+mysqldb://musicmuster_testing:musicmuster_testing@"
"localhost/musicmuster_testing"
)
return engine.connect()
@pytest.fixture(scope="session") @pytest.fixture(scope="session")
def db_engine(): def setup_database(connection):
engine = create_engine(DB_CONNECTION, isolation_level="READ COMMITTED") from app.models import Base # noqa E402
Base.metadata.create_all(engine)
yield engine Base.metadata.bind = connection
engine.dispose() Base.metadata.create_all()
# seed_database()
yield
Base.metadata.drop_all()
@pytest.fixture(scope="function") @pytest.fixture
def Session(db_engine): def session(setup_database, connection):
connection = db_engine.connect()
transaction = connection.begin() transaction = connection.begin()
sm = sessionmaker(bind=connection) yield scoped_session(
session = scoped_session(sm) sessionmaker(autocommit=False, autoflush=False, bind=connection)
# print(f"PyTest SqlA: session acquired [{hex(id(session))}]") )
yield session
# print(f" PyTest SqlA: session released and cleaned up [{hex(id(session))}]")
session.remove()
transaction.rollback() transaction.rollback()
connection.close()
@pytest.fixture(scope="function")
def track1(session):
track_path = "testdata/isa.mp3"
metadata = helpers.get_file_metadata(track_path)
track = Tracks(session, **metadata)
return track
@pytest.fixture(scope="function")
def track2(session):
track_path = "testdata/mom.mp3"
metadata = helpers.get_file_metadata(track_path)
track = Tracks(session, **metadata)
return track

View File

@ -52,11 +52,6 @@ build-backend = "poetry.core.masonry.api"
# mypy_path = "/home/kae/.cache/pypoetry/virtualenvs/musicmuster-oWgGw1IG-py3.9:/home/kae/git/musicmuster/app" # mypy_path = "/home/kae/.cache/pypoetry/virtualenvs/musicmuster-oWgGw1IG-py3.9:/home/kae/git/musicmuster/app"
mypy_path = "/home/kae/git/musicmuster/app" mypy_path = "/home/kae/git/musicmuster/app"
[tool.pytest.ini_options]
addopts = "--exitfirst --showlocals --capture=no"
pythonpath = [".", "app"]
filterwarnings = "ignore:'audioop' is deprecated"
[tool.vulture] [tool.vulture]
exclude = ["migrations", "app/ui", "archive"] exclude = ["migrations", "app/ui", "archive"]
paths = ["app"] paths = ["app"]

2
pytest.ini Normal file
View File

@ -0,0 +1,2 @@
[pytest]
addopts = -xls

View File

@ -1,9 +1,8 @@
import os.path import os.path
import helpers
from app.models import ( from app.models import (
NoteColours, NoteColours,
Notes,
Playdates, Playdates,
Playlists, Playlists,
Tracks, Tracks,
@ -13,7 +12,6 @@ from app.models import (
def test_notecolours_get_colour(session): def test_notecolours_get_colour(session):
"""Create a colour record and retrieve all colours""" """Create a colour record and retrieve all colours"""
print(">>>text_notcolours_get_colour")
note_colour = "#0bcdef" note_colour = "#0bcdef"
NoteColours(session, substring="substring", colour=note_colour) NoteColours(session, substring="substring", colour=note_colour)
@ -26,7 +24,6 @@ def test_notecolours_get_colour(session):
def test_notecolours_get_all(session): def test_notecolours_get_all(session):
"""Create two colour records and retrieve them all""" """Create two colour records and retrieve them all"""
print(">>>text_notcolours_get_all")
note1_colour = "#1bcdef" note1_colour = "#1bcdef"
note2_colour = "#20ff00" note2_colour = "#20ff00"
NoteColours(session, substring="note1", colour=note1_colour) NoteColours(session, substring="note1", colour=note1_colour)
@ -55,84 +52,172 @@ def test_notecolours_get_colour_match(session):
assert result == note_colour assert result == note_colour
def test_playdates_add_playdate(session, track1): def test_notes_creation(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
note_text = "note text"
note = Notes(session, playlist.id, 0, note_text)
assert note
notes = session.query(Notes).all()
assert len(notes) == 1
assert notes[0].note == note_text
def test_notes_delete(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
note_text = "note text"
note = Notes(session, playlist.id, 0, note_text)
assert note
notes = session.query(Notes).all()
assert len(notes) == 1
assert notes[0].note == note_text
note.delete_note(session)
notes = session.query(Notes).all()
assert len(notes) == 0
def test_notes_update_row_only(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
note_text = "note text"
note = Notes(session, playlist.id, 0, note_text)
new_row = 10
note.update_note(session, new_row)
notes = session.query(Notes).all()
assert len(notes) == 1
assert notes[0].row == new_row
def test_notes_update_text(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
note_text = "note text"
note = Notes(session, playlist.id, 0, note_text)
new_text = "This is new"
new_row = 0
note.update_note(session, new_row, new_text)
notes = session.query(Notes).all()
assert len(notes) == 1
assert notes[0].note == new_text
assert notes[0].row == new_row
def test_playdates_add_playdate(session):
"""Test playdate and last_played retrieval""" """Test playdate and last_played retrieval"""
playdate = Playdates(session, track1.id) # We need a track
track_path = "/a/b/c"
track = Tracks(session, track_path)
playdate = Playdates(session, track.id)
assert playdate assert playdate
last_played = Playdates.last_played(session, track1.id) last_played = Playdates.last_played(session, track.id)
assert abs((playdate.lastplayed - last_played).total_seconds()) < 2 assert abs((playdate.lastplayed - last_played).total_seconds()) < 2
def test_playdates_remove_track(session):
"""Test removing a track from a playdate"""
# We need a track
track_path = "/a/b/c"
track = Tracks(session, track_path)
Playdates.remove_track(session, track.id)
last_played = Playdates.last_played(session, track.id)
assert last_played is None
def test_playlist_create(session): def test_playlist_create(session):
playlist = Playlists(session, "my playlist") playlist = Playlists(session, "my playlist")
assert playlist assert playlist
# def test_playlist_add_track(session, track): def test_playlist_add_note(session):
# # We need a playlist note_text = "my note"
# playlist = Playlists(session, "my playlist")
# row = 17 playlist = Playlists(session, "my playlist")
# playlist.add_track(session, track.id, row) assert len(playlist.notes) == 1
playlist_note = playlist.notes[0]
# assert len(playlist.tracks) == 1 assert playlist_note.note == note_text
# playlist_track = playlist.tracks[row]
# assert playlist_track.path == track_path
# def test_playlist_tracks(session): def test_playlist_add_track(session):
# # We need a playlist # We need a playlist
# playlist = Playlists(session, "my playlist") playlist = Playlists(session, "my playlist")
# # We need two tracks # We need a track
# track1_path = "/a/b/c" track_path = "/a/b/c"
# track1_row = 17 track = Tracks(session, track_path)
# track1 = Tracks(session, track1_path)
# track2_path = "/x/y/z" row = 17
# track2_row = 29
# track2 = Tracks(session, track2_path)
# playlist.add_track(session, track1.id, track1_row) playlist.add_track(session, track.id, row)
# playlist.add_track(session, track2.id, track2_row)
# tracks = playlist.tracks assert len(playlist.tracks) == 1
# assert tracks[track1_row] == track1 playlist_track = playlist.tracks[row]
# assert tracks[track2_row] == track2 assert playlist_track.path == track_path
# def test_playlist_notes(session): def test_playlist_tracks(session):
# # We need a playlist # We need a playlist
# playlist = Playlists(session, "my playlist") playlist = Playlists(session, "my playlist")
# # We need two notes # We need two tracks
# note1_text = "note1 text" track1_path = "/a/b/c"
# note1_row = 11 track1_row = 17
# _ = Notes(session, playlist.id, note1_row, note1_text) track1 = Tracks(session, track1_path)
# note2_text = "note2 text" track2_path = "/x/y/z"
# note2_row = 19 track2_row = 29
# _ = Notes(session, playlist.id, note2_row, note2_text) track2 = Tracks(session, track2_path)
# notes = playlist.notes playlist.add_track(session, track1.id, track1_row)
# assert note1_text in [n.note for n in notes] playlist.add_track(session, track2.id, track2_row)
# assert note1_row in [n.row for n in notes]
# assert note2_text in [n.note for n in notes] tracks = playlist.tracks
# assert note2_row in [n.row for n in notes] assert tracks[track1_row] == track1
assert tracks[track2_row] == track2
def test_playlist_notes(session):
# We need a playlist
playlist = Playlists(session, "my playlist")
# We need two notes
note1_text = "note1 text"
note1_row = 11
_ = Notes(session, playlist.id, note1_row, note1_text)
note2_text = "note2 text"
note2_row = 19
_ = Notes(session, playlist.id, note2_row, note2_text)
notes = playlist.notes
assert note1_text in [n.note for n in notes]
assert note1_row in [n.row for n in notes]
assert note2_text in [n.note for n in notes]
assert note2_row in [n.row for n in notes]
def test_playlist_open_and_close(session): def test_playlist_open_and_close(session):
# We need a playlist # We need a playlist
playlist = Playlists(session, "my playlist") playlist = Playlists(session, "my playlist")
assert len(Playlists.get_open(session)) == 0
assert len(Playlists.get_closed(session)) == 1
playlist.mark_open(session, tab_index=0)
assert len(Playlists.get_open(session)) == 1 assert len(Playlists.get_open(session)) == 1
assert len(Playlists.get_closed(session)) == 0 assert len(Playlists.get_closed(session)) == 0
@ -141,6 +226,11 @@ def test_playlist_open_and_close(session):
assert len(Playlists.get_open(session)) == 0 assert len(Playlists.get_open(session)) == 0
assert len(Playlists.get_closed(session)) == 1 assert len(Playlists.get_closed(session)) == 1
playlist.mark_open(session)
assert len(Playlists.get_open(session)) == 1
assert len(Playlists.get_closed(session)) == 0
def test_playlist_get_all_and_by_id(session): def test_playlist_get_all_and_by_id(session):
# We need two playlists # We need two playlists
@ -153,34 +243,250 @@ def test_playlist_get_all_and_by_id(session):
assert len(all_playlists) == 2 assert len(all_playlists) == 2
assert p1_name in [p.name for p in all_playlists] assert p1_name in [p.name for p in all_playlists]
assert p2_name in [p.name for p in all_playlists] assert p2_name in [p.name for p in all_playlists]
assert session.get(Playlists, playlist1.id).name == p1_name assert Playlists.get_by_id(session, playlist1.id).name == p1_name
def test_tracks_get_all_tracks(session, track1, track2): def test_playlist_remove_tracks(session):
# Need two playlists and three tracks
p1_name = "playlist one"
playlist1 = Playlists(session, p1_name)
p2_name = "playlist two"
playlist2 = Playlists(session, p2_name)
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
track2_path = "/m/n/o"
track2 = Tracks(session, track2_path)
track3_path = "/x/y/z"
track3 = Tracks(session, track3_path)
# Add all tracks to both playlists
for p in [playlist1, playlist2]:
for t in [track1, track2, track3]:
p.add_track(session, t.id)
assert len(playlist1.tracks) == 3
assert len(playlist2.tracks) == 3
playlist1.remove_track(session, 1)
assert len(playlist1.tracks) == 2
# Check the track itself still exists
original_track = Tracks.get_by_id(session, track1.id)
assert original_track
playlist1.remove_all_tracks(session)
assert len(playlist1.tracks) == 0
assert len(playlist2.tracks) == 3
def test_playlist_get_track_playlists(session):
# Need two playlists and two tracks
p1_name = "playlist one"
playlist1 = Playlists(session, p1_name)
p2_name = "playlist two"
playlist2 = Playlists(session, p2_name)
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
track2_path = "/m/n/o"
track2 = Tracks(session, track2_path)
# Put track1 in both playlists, track2 only in playlist1
playlist1.add_track(session, track1.id)
playlist2.add_track(session, track1.id)
playlist1.add_track(session, track2.id)
playlists_track1 = track1.playlists
playlists_track2 = track2.playlists
assert p1_name in [a.playlist.name for a in playlists_track1]
assert p2_name in [a.playlist.name for a in playlists_track1]
assert p1_name in [a.playlist.name for a in playlists_track2]
assert p2_name not in [a.playlist.name for a in playlists_track2]
def test_playlist_move_track(session):
# We need two playlists
p1_name = "playlist one"
p2_name = "playlist two"
playlist1 = Playlists(session, p1_name)
playlist2 = Playlists(session, p2_name)
# Need two tracks # Need two tracks
track1_row = 17
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
track2_row = 29
track2_path = "/m/n/o"
track2 = Tracks(session, track2_path)
result = [a.path for a in Tracks.get_all(session)] # Add both to playlist1 and check
assert track1.path in result playlist1.add_track(session, track1.id, track1_row)
assert track2.path in result playlist1.add_track(session, track2.id, track2_row)
tracks = playlist1.tracks
assert tracks[track1_row] == track1
assert tracks[track2_row] == track2
# Move track2 to playlist2 and check
playlist1.move_track(session, [track2_row], playlist2)
tracks1 = playlist1.tracks
tracks2 = playlist2.tracks
assert len(tracks1) == 1
assert len(tracks2) == 1
assert tracks1[track1_row] == track1
assert tracks2[0] == track2
def test_tracks_by_path(session, track1): def test_tracks_get_all_paths(session):
# Need two tracks
track1_path = "/a/b/c"
_ = Tracks(session, track1_path)
track2_path = "/m/n/o"
_ = Tracks(session, track2_path)
assert Tracks.get_by_path(session, track1.path) is track1 result = Tracks.get_all_paths(session)
assert track1_path in result
assert track2_path in result
def test_tracks_by_id(session, track1): def test_tracks_get_all_tracks(session):
# Need two tracks
track1_path = "/a/b/c"
track2_path = "/m/n/o"
assert session.get(Tracks, track1.id) is track1 result = Tracks.get_all_tracks(session)
assert track1_path in [a.path for a in result]
assert track2_path in [a.path for a in result]
def test_tracks_search_artists(session, track1): def test_tracks_by_filename(session):
track1_artist = "Fleetwood Mac" track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
assert Tracks.get_by_filename(session, os.path.basename(track1_path)) is track1
def test_tracks_by_path(session):
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
assert Tracks.get_by_path(session, track1_path) is track1
def test_tracks_by_id(session):
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
assert Tracks.get_by_id(session, track1.id) is track1
def test_tracks_rescan(session):
# Get test track
test_track_path = "./testdata/isa.mp3"
test_track_data = "./testdata/isa.py"
track = Tracks(session, test_track_path)
track.rescan(session)
# Get test data
with open(test_track_data) as f:
testdata = eval(f.read())
# Re-read the track
track_read = Tracks.get_by_path(session, test_track_path)
assert track_read.duration == testdata["duration"]
assert track_read.start_gap == testdata["leading_silence"]
# Silence detection can vary, so ± 1 second is OK
assert track_read.fade_at < testdata["fade_at"] + 1000
assert track_read.fade_at > testdata["fade_at"] - 1000
assert track_read.silence_at < testdata["trailing_silence"] + 1000
assert track_read.silence_at > testdata["trailing_silence"] - 1000
def test_tracks_remove_by_path(session):
track1_path = "/a/b/c"
assert len(Tracks.get_all_tracks(session)) == 1
Tracks.remove_by_path(session, track1_path)
assert len(Tracks.get_all_tracks(session)) == 0
def test_tracks_search_artists(session):
track1_path = "/a/b/c"
track1_artist = "Artist One"
track1 = Tracks(session, track1_path)
track1.artist = track1_artist
track2_path = "/m/n/o"
track2_artist = "Artist Two"
track2 = Tracks(session, track2_path)
track2.artist = track2_artist
session.commit()
artist_first_word = track1_artist.split()[0].lower()
assert len(Tracks.search_artists(session, artist_first_word)) == 2
assert len(Tracks.search_artists(session, track1_artist)) == 1 assert len(Tracks.search_artists(session, track1_artist)) == 1
def test_tracks_search_titles(session, track1): def test_tracks_search_titles(session):
track1_title = "I'm So Afraid" track1_path = "/a/b/c"
track1_title = "Title One"
track1 = Tracks(session, track1_path)
track1.title = track1_title
track2_path = "/m/n/o"
track2_title = "Title Two"
track2 = Tracks(session, track2_path)
track2.title = track2_title
session.commit()
title_first_word = track1_title.split()[0].lower()
assert len(Tracks.search_titles(session, title_first_word)) == 2
assert len(Tracks.search_titles(session, track1_title)) == 1 assert len(Tracks.search_titles(session, track1_title)) == 1
def test_tracks_update_lastplayed(session):
track1_path = "/a/b/c"
track1 = Tracks(session, track1_path)
assert track1.lastplayed is None
track1.update_lastplayed(session, track1.id)
assert track1.lastplayed is not None
def test_tracks_update_info(session):
path = "/a/b/c"
artist = "The Beatles"
title = "Help!"
newinfo = "abcdef"
track1 = Tracks(session, path)
track1.artist = artist
track1.title = title
test1 = Tracks.get_by_id(session, track1.id)
assert test1.artist == artist
assert test1.title == title
assert test1.path == path
track1.path = newinfo
test2 = Tracks.get_by_id(session, track1.id)
assert test2.artist == artist
assert test2.title == title
assert test2.path == newinfo
track1.artist = newinfo
test2 = Tracks.get_by_id(session, track1.id)
assert test2.artist == newinfo
assert test2.title == title
assert test2.path == newinfo
track1.title = newinfo
test3 = Tracks.get_by_id(session, track1.id)
assert test3.artist == newinfo
assert test3.title == newinfo
assert test3.path == newinfo

View File

@ -1,105 +0,0 @@
from app.models import (
Playlists,
)
from app import playlistmodel
from dbconfig import scoped_session
def create_model_with_playlist_rows(
session: scoped_session, rows: int
) -> "playlistmodel.PlaylistModel":
playlist = Playlists(session, "test playlist")
# Create a model
model = playlistmodel.PlaylistModel(playlist.id, None)
for row in range(rows):
newrow = model.insert_row(session, row)
model.playlist_rows[newrow].note = str(newrow)
session.commit()
return model
def test_insert_row(monkeypatch, Session):
monkeypatch.setattr(playlistmodel, "Session", Session)
# Create a playlist
with Session() as session:
playlist = Playlists(Session, "test playlist")
# Create a model
model = playlistmodel.PlaylistModel(playlist.id, None)
assert model.rowCount() == 0
model.insert_row(session, 0)
assert model.rowCount() == 1
def test_insert_high_row(monkeypatch, Session):
monkeypatch.setattr(playlistmodel, "Session", Session)
# Create a playlist
with Session() as session:
playlist = Playlists(Session, "test playlist")
# Create a model
model = playlistmodel.PlaylistModel(playlist.id, None)
assert model.rowCount() == 0
model.insert_row(session, 5)
assert model.rowCount() == 1
def test_11_row_playlist(monkeypatch, Session):
# Create multirow playlist
monkeypatch.setattr(playlistmodel, "Session", Session)
with Session() as session:
model = create_model_with_playlist_rows(session, 11)
assert model.rowCount() == 11
assert max(model.playlist_rows.keys()) == 10
for row in range(model.rowCount()):
assert row in model.playlist_rows
assert model.playlist_rows[row].plr_rownum == row
def test_move_rows_test2(monkeypatch, Session):
# move row 3 to row 5
monkeypatch.setattr(playlistmodel, "Session", Session)
with Session() as session:
model = create_model_with_playlist_rows(session, 11)
model.move_rows([3], 5)
# Check we have all rows and plr_rownums are correct
for row in range(model.rowCount()):
assert row in model.playlist_rows
assert model.playlist_rows[row].plr_rownum == row
if row not in [3, 4, 5]:
assert model.playlist_rows[row].note == str(row)
elif row == 3:
assert model.playlist_rows[row].note == str(4)
elif row == 4:
assert model.playlist_rows[row].note == str(5)
elif row == 5:
assert model.playlist_rows[row].note == str(3)
# def test_move_rows_test3(Session):
# # move row 4 to row 3
# pass
# def test_move_rows_test4(Session):
# # move row 4 to row 2
# pass
# def test_move_rows_test5(Session):
# # move rows [1, 4, 5, 10] → 8
# pass
# def test_move_rows_test6(Session):
# # move rows [3, 6] → 5
# pass
# def test_move_rows_test7(Session):
# # move rows [3, 5, 6] → 8
# pass
# def test_move_rows_test8(Session):
# # move rows [7, 8, 10] → 5
# pass