WIP: queries working, tests so far good

This commit is contained in:
Keith Edmunds 2025-03-05 09:00:41 +00:00
parent 096889d6cb
commit 7fd655f96f
5 changed files with 143 additions and 37 deletions

View File

@ -83,7 +83,7 @@ class Filter:
version: int = 1
path_type: str = "contains"
path: Optional[str] = None
last_played_number: Optional[int] = None
last_played_number: int = 0
last_played_comparator: str = "before"
last_played_unit: str = "years"
duration_type: str = "longer than"

View File

@ -197,7 +197,7 @@ class TracksTable(Model):
id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
artist: Mapped[str] = mapped_column(String(256), index=True)
bitrate: Mapped[Optional[int]] = mapped_column(default=None)
bitrate: Mapped[int] = mapped_column(default=None)
duration: Mapped[int] = mapped_column(index=True)
fade_at: Mapped[int] = mapped_column(index=False)
intro: Mapped[Optional[int]] = mapped_column(default=None)

View File

@ -128,10 +128,15 @@ class NoteColours(dbtables.NoteColoursTable):
class Playdates(dbtables.PlaydatesTable):
def __init__(self, session: Session, track_id: int) -> None:
def __init__(
self, session: Session, track_id: int, when: Optional[dt.datetime] = None
) -> None:
"""Record that track was played"""
self.lastplayed = dt.datetime.now()
if not when:
self.lastplayed = dt.datetime.now()
else:
self.lastplayed = when
self.track_id = track_id
session.add(self)
session.commit()
@ -745,39 +750,39 @@ class Tracks(dbtables.TracksTable):
else:
raise ApplicationError(f"Can't process filter duration type ({filter=})")
# Last played specification
if (
filter.last_played_number
and filter.last_played_comparator != Config.FILTER_PLAYED_COMPARATOR_ANYTIME
):
# Process comparator
if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER:
# Select tracks that have never been played
query = query.outerjoin(Playdates, cls.id == Playdates.track_id).where(
Playdates.id.is_(None)
)
else:
# Last played specification
now = dt.datetime.now()
since = now - dt.timedelta(days=365 * filter.last_played_number)
if filter.duration_unit == Config.FILTER_PLAYED_DAYS:
since = now - dt.timedelta(days=filter.last_played_number)
elif filter.duration_unit == Config.FILTER_PLAYED_WEEKS:
since = now - dt.timedelta(days=7 * filter.last_played_number)
if filter.duration_unit == Config.FILTER_PLAYED_MONTHS:
since = now - dt.timedelta(days=30 * filter.last_played_number)
# Set sensible default, and correct for Config.FILTER_PLAYED_COMPARATOR_ANYTIME
before = now
# If not ANYTIME, set 'before' appropriates
if filter.last_played_comparator != Config.FILTER_PLAYED_COMPARATOR_ANYTIME:
if filter.last_played_unit == Config.FILTER_PLAYED_DAYS:
before = now - dt.timedelta(days=filter.last_played_number)
elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS:
before = now - dt.timedelta(days=7 * filter.last_played_number)
elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS:
before = now - dt.timedelta(days=30 * filter.last_played_number)
elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS:
before = now - dt.timedelta(days=365 * filter.last_played_number)
if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER:
# Select tracks that have never been played
query = query.outerjoin(Playdates, cls.id == Playdates.track_id).where(
Playdates.id.is_(None)
)
elif (
filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_BEFORE
):
subquery = (
select(
Playdates.track_id,
func.max(Playdates.lastplayed).label("max_last_played"),
)
.group_by(Playdates.track_id)
.subquery()
)
query = query.join(subquery, Tracks.id == subquery.c.track_id).where(
subquery.c.max_last_played < since
subquery = (
select(
Playdates.track_id,
func.max(Playdates.lastplayed).label("max_last_played"),
)
.group_by(Playdates.track_id)
.subquery()
)
query = query.join(subquery, Tracks.id == subquery.c.track_id).where(
subquery.c.max_last_played < before
)
records = session.scalars(query).unique().all()

View File

@ -230,11 +230,11 @@ class QuerylistModel(QAbstractTableModel):
try:
results = Tracks.get_filtered_tracks(self.session, self.filter)
for result in results:
lastplayed = None
if hasattr(result, "playdates"):
pds = result.playdates
lastplayed = max([a.lastplayed for a in pds])
else:
lastplayed = None
if pds:
lastplayed = max([a.lastplayed for a in pds])
queryrow = QueryRow(
artist=result.artist,
bitrate=result.bitrate or 0,

101
tests/test_queries.py Normal file
View File

@ -0,0 +1,101 @@
# Standard library imports
import datetime as dt
import unittest
# PyQt imports
# Third party imports
# App imports
from app.models import (
db,
Playdates,
Tracks,
)
from classes import (
Filter,
)
class MyTestCase(unittest.TestCase):
@classmethod
def setUpClass(cls):
"""Runs once before any test in this class"""
db.create_all()
with db.Session() as session:
# Create some track entries
_ = Tracks(**dict(
session=session,
artist="a",
bitrate=0,
duration=100,
fade_at=0,
path="/alpha/bravo/charlie",
silence_at=0,
start_gap=0,
title="abc"
))
track2 = Tracks(**dict(
session=session,
artist="a",
bitrate=0,
duration=100,
fade_at=0,
path="/xray/yankee/zulu",
silence_at=0,
start_gap=0,
title="xyz"
))
track2_id = track2.id
# Add playdates
# Track 2 played just over a year ago
just_over_a_year_ago = dt.datetime.now() - dt.timedelta(days=367)
_ = Playdates(session, track2_id, when=just_over_a_year_ago)
@classmethod
def tearDownClass(cls):
"""Runs once after all tests"""
db.drop_all()
def setUp(self):
"""Runs before each test"""
pass
def tearDown(self):
"""Runs after each test"""
pass
def test_search_path_1(self):
"""Search for unplayed track"""
filter = Filter(path="alpha", last_played_comparator="never")
with db.Session() as session:
results = Tracks.get_filtered_tracks(session, filter)
assert len(results) == 1
assert 'alpha' in results[0].path
def test_search_path_2(self):
"""Search for unplayed track that doesn't exist"""
filter = Filter(path="xray", last_played_comparator="never")
with db.Session() as session:
results = Tracks.get_filtered_tracks(session, filter)
assert len(results) == 0
def test_played_over_a_year_ago(self):
"""Search for tracks played over a year ago"""
filter = Filter(last_played_unit="years", last_played_number=1)
with db.Session() as session:
results = Tracks.get_filtered_tracks(session, filter)
assert len(results) == 1
assert 'zulu' in results[0].path