WIP: add tracks from queries OK
This commit is contained in:
parent
b0385789be
commit
f9c060b091
@ -1297,7 +1297,7 @@ class Window(QMainWindow):
|
|||||||
|
|
||||||
if handler:
|
if handler:
|
||||||
# Use a lambda to pass arguments to the function
|
# Use a lambda to pass arguments to the function
|
||||||
action.triggered.connect(lambda _, h=handler, a=args: h(*a))
|
action.triggered.connect(lambda _, h=handler, a=args: h(a))
|
||||||
|
|
||||||
submenu.addAction(action)
|
submenu.addAction(action)
|
||||||
break
|
break
|
||||||
@ -1318,7 +1318,7 @@ class Window(QMainWindow):
|
|||||||
{
|
{
|
||||||
"text": "Show all",
|
"text": "Show all",
|
||||||
"handler": "create_playlist_from_template",
|
"handler": "create_playlist_from_template",
|
||||||
"args": (0),
|
"args": 0,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"separator": True,
|
"separator": True,
|
||||||
|
|||||||
@ -243,7 +243,7 @@ class QuerylistModel(QAbstractTableModel):
|
|||||||
lastplayed=lastplayed,
|
lastplayed=lastplayed,
|
||||||
path=result.path,
|
path=result.path,
|
||||||
title=result.title,
|
title=result.title,
|
||||||
track_id=result.track_id,
|
track_id=result.id,
|
||||||
)
|
)
|
||||||
|
|
||||||
self.querylist_rows[row] = queryrow
|
self.querylist_rows[row] = queryrow
|
||||||
|
|||||||
@ -237,6 +237,78 @@ def get_all_tracks() -> list[TrackDTO]:
|
|||||||
return _tracks_where(Tracks.id > 0)
|
return _tracks_where(Tracks.id > 0)
|
||||||
|
|
||||||
|
|
||||||
|
def get_filtered_tracks(filter: Filter) -> list["Tracks"]:
|
||||||
|
"""
|
||||||
|
Return tracks matching filter
|
||||||
|
"""
|
||||||
|
|
||||||
|
with db.Session() as session:
|
||||||
|
return Tracks.get_filtered_tracks(session, filter)
|
||||||
|
|
||||||
|
query = select(Tracks)
|
||||||
|
|
||||||
|
# Path specification
|
||||||
|
if filter.path:
|
||||||
|
if filter.path_type == "contains":
|
||||||
|
query = query.where(Tracks.path.ilike(f"%{filter.path}%"))
|
||||||
|
elif filter.path_type == "excluding":
|
||||||
|
query = query.where(Tracks.path.notilike(f"%{filter.path}%"))
|
||||||
|
else:
|
||||||
|
raise ApplicationError(f"Can't process filter path ({filter=})")
|
||||||
|
|
||||||
|
# Duration specification
|
||||||
|
seconds_duration = filter.duration_number
|
||||||
|
if filter.duration_unit == Config.FILTER_DURATION_MINUTES:
|
||||||
|
seconds_duration *= 60
|
||||||
|
elif filter.duration_unit != Config.FILTER_DURATION_SECONDS:
|
||||||
|
raise ApplicationError(f"Can't process filter duration ({filter=})")
|
||||||
|
|
||||||
|
if filter.duration_type == Config.FILTER_DURATION_LONGER:
|
||||||
|
query = query.where(Tracks.duration >= seconds_duration)
|
||||||
|
elif filter.duration_unit == Config.FILTER_DURATION_SHORTER:
|
||||||
|
query = query.where(Tracks.duration <= seconds_duration)
|
||||||
|
else:
|
||||||
|
raise ApplicationError(f"Can't process filter duration type ({filter=})")
|
||||||
|
|
||||||
|
# Process comparator
|
||||||
|
if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER:
|
||||||
|
# Select tracks that have never been played
|
||||||
|
query = query.outerjoin(Playdates, Tracks.id == Playdates.track_id).where(
|
||||||
|
Playdates.id.is_(None)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Last played specification
|
||||||
|
now = dt.datetime.now()
|
||||||
|
# Set sensible default, and correct for Config.FILTER_PLAYED_COMPARATOR_ANYTIME
|
||||||
|
before = now
|
||||||
|
# If not ANYTIME, set 'before' appropriates
|
||||||
|
if filter.last_played_comparator != Config.FILTER_PLAYED_COMPARATOR_ANYTIME:
|
||||||
|
if filter.last_played_unit == Config.FILTER_PLAYED_DAYS:
|
||||||
|
before = now - dt.timedelta(days=filter.last_played_number)
|
||||||
|
elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS:
|
||||||
|
before = now - dt.timedelta(days=7 * filter.last_played_number)
|
||||||
|
elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS:
|
||||||
|
before = now - dt.timedelta(days=30 * filter.last_played_number)
|
||||||
|
elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS:
|
||||||
|
before = now - dt.timedelta(days=365 * filter.last_played_number)
|
||||||
|
|
||||||
|
subquery = (
|
||||||
|
select(
|
||||||
|
Playdates.track_id,
|
||||||
|
func.max(Playdates.lastplayed).label("max_last_played"),
|
||||||
|
)
|
||||||
|
.group_by(Playdates.track_id)
|
||||||
|
.subquery()
|
||||||
|
)
|
||||||
|
query = query.join(subquery, Tracks.id == subquery.c.track_id).where(
|
||||||
|
subquery.c.max_last_played < before
|
||||||
|
)
|
||||||
|
|
||||||
|
with db.Session() as session:
|
||||||
|
records = session.scalars(query).unique().all()
|
||||||
|
|
||||||
|
return records
|
||||||
|
|
||||||
@log_call
|
@log_call
|
||||||
def add_track_to_header(playlistrow_id: int, track_id: int) -> None:
|
def add_track_to_header(playlistrow_id: int, track_id: int) -> None:
|
||||||
"""
|
"""
|
||||||
@ -314,68 +386,6 @@ def update_track(
|
|||||||
return updated_track
|
return updated_track
|
||||||
|
|
||||||
|
|
||||||
@log_call
|
|
||||||
def get_filtered_tracks(filter: Filter) -> list[TrackDTO]:
|
|
||||||
"""
|
|
||||||
Return tracks matching filter
|
|
||||||
"""
|
|
||||||
|
|
||||||
with db.Session() as session:
|
|
||||||
return Tracks.get_filtered_tracks(session, Filter)
|
|
||||||
# Create a base query
|
|
||||||
query = Tracks.id > 0
|
|
||||||
|
|
||||||
# Path specification
|
|
||||||
if filter.path:
|
|
||||||
if filter.path_type == "contains":
|
|
||||||
query = query.where(Tracks.path.ilike(f"%{filter.path}%"))
|
|
||||||
elif filter.path_type == "excluding":
|
|
||||||
query = query.where(Tracks.path.notilike(f"%{filter.path}%"))
|
|
||||||
else:
|
|
||||||
raise ApplicationError(f"Can't process filter path ({filter=})")
|
|
||||||
|
|
||||||
# Duration specification
|
|
||||||
seconds_duration = filter.duration_number
|
|
||||||
if filter.duration_unit == Config.FILTER_DURATION_MINUTES:
|
|
||||||
seconds_duration *= 60
|
|
||||||
elif filter.duration_unit != Config.FILTER_DURATION_SECONDS:
|
|
||||||
raise ApplicationError(f"Can't process filter duration ({filter=})")
|
|
||||||
|
|
||||||
if filter.duration_type == Config.FILTER_DURATION_LONGER:
|
|
||||||
query = query.where(Tracks.duration >= seconds_duration)
|
|
||||||
elif filter.duration_unit == Config.FILTER_DURATION_SHORTER:
|
|
||||||
query = query.where(Tracks.duration <= seconds_duration)
|
|
||||||
else:
|
|
||||||
raise ApplicationError(f"Can't process filter duration type ({filter=})")
|
|
||||||
|
|
||||||
# Process comparator
|
|
||||||
if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_ANYTIME:
|
|
||||||
return _tracks_where(query, filter_by_last_played=False)
|
|
||||||
|
|
||||||
elif filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER:
|
|
||||||
return _tracks_where(query, filter_by_last_played=True, last_played_before=None)
|
|
||||||
else:
|
|
||||||
# Last played specification
|
|
||||||
now = dt.datetime.now()
|
|
||||||
# Set sensible default, and correct for Config.FILTER_PLAYED_COMPARATOR_ANYTIME
|
|
||||||
before = now
|
|
||||||
# If not ANYTIME, set 'before' appropriates
|
|
||||||
if filter.last_played_unit == Config.FILTER_PLAYED_DAYS:
|
|
||||||
before = now - dt.timedelta(days=filter.last_played_number)
|
|
||||||
elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS:
|
|
||||||
before = now - dt.timedelta(days=7 * filter.last_played_number)
|
|
||||||
elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS:
|
|
||||||
before = now - dt.timedelta(days=30 * filter.last_played_number)
|
|
||||||
elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS:
|
|
||||||
before = now - dt.timedelta(days=365 * filter.last_played_number)
|
|
||||||
else:
|
|
||||||
raise ApplicationError("Can't determine last played criteria")
|
|
||||||
|
|
||||||
return _tracks_where(
|
|
||||||
query, filter_by_last_played=True, last_played_before=before
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def set_track_intro(track_id: int, intro: int) -> None:
|
def set_track_intro(track_id: int, intro: int) -> None:
|
||||||
"""
|
"""
|
||||||
Set track intro time
|
Set track intro time
|
||||||
@ -1023,20 +1033,19 @@ def _queries_where(
|
|||||||
Return queries selected by query
|
Return queries selected by query
|
||||||
"""
|
"""
|
||||||
|
|
||||||
stmt = select(
|
|
||||||
Queries.id.label("query_id"), Queries.name, Queries.favourite, Queries.filter
|
|
||||||
).where(query)
|
|
||||||
|
|
||||||
results: list[QueryDTO] = []
|
results: list[QueryDTO] = []
|
||||||
|
|
||||||
with db.Session() as session:
|
with db.Session() as session:
|
||||||
records = session.execute(stmt).one_or_none()
|
records = session.scalars(
|
||||||
|
select(Queries)
|
||||||
|
.where(query)
|
||||||
|
).all()
|
||||||
for record in records:
|
for record in records:
|
||||||
dto = QueryDTO(
|
dto = QueryDTO(
|
||||||
favourite=record.favourite,
|
favourite=record.favourite,
|
||||||
filter=record.filter,
|
filter=record.filter,
|
||||||
name=record.name,
|
name=record.name,
|
||||||
query_id=record.query_id,
|
query_id=record.id,
|
||||||
)
|
)
|
||||||
results.append(dto)
|
results.append(dto)
|
||||||
|
|
||||||
|
|||||||
@ -49,7 +49,7 @@ class MyTestCase(unittest.TestCase):
|
|||||||
return (playlist, model)
|
return (playlist, model)
|
||||||
|
|
||||||
def create_playlist_model_tracks(self, playlist_name: str):
|
def create_playlist_model_tracks(self, playlist_name: str):
|
||||||
(playlist, model) = self.create_playlist_and_model("my playlist")
|
(playlist, model) = self.create_playlist_and_model(playlist_name)
|
||||||
# Create tracks
|
# Create tracks
|
||||||
metadata1 = get_all_track_metadata(self.isa_path)
|
metadata1 = get_all_track_metadata(self.isa_path)
|
||||||
self.track1 = repository.create_track(self.isa_path, metadata1)
|
self.track1 = repository.create_track(self.isa_path, metadata1)
|
||||||
@ -127,10 +127,10 @@ class MyTestCase(unittest.TestCase):
|
|||||||
assert result_mom[0].artist == self.mom_artist
|
assert result_mom[0].artist == self.mom_artist
|
||||||
|
|
||||||
def test_get_track_by_title(self):
|
def test_get_track_by_title(self):
|
||||||
metadata = get_all_track_metadata(self.isa_path)
|
metadata_isa = get_all_track_metadata(self.isa_path)
|
||||||
_ = repository.create_track(self.isa_path, metadata)
|
_ = repository.create_track(self.isa_path, metadata_isa)
|
||||||
metadata = get_all_track_metadata(self.mom_path)
|
metadata_mom = get_all_track_metadata(self.mom_path)
|
||||||
_ = repository.create_track(self.mom_path, metadata)
|
_ = repository.create_track(self.mom_path, metadata_mom)
|
||||||
result_isa = repository.tracks_by_title(self.isa_title)
|
result_isa = repository.tracks_by_title(self.isa_title)
|
||||||
assert len(result_isa) == 1
|
assert len(result_isa) == 1
|
||||||
assert result_isa[0].title == self.isa_title
|
assert result_isa[0].title == self.isa_title
|
||||||
@ -138,6 +138,21 @@ class MyTestCase(unittest.TestCase):
|
|||||||
assert len(result_mom) == 1
|
assert len(result_mom) == 1
|
||||||
assert result_mom[0].title == self.mom_title
|
assert result_mom[0].title == self.mom_title
|
||||||
|
|
||||||
|
def test_tracks_get_all_tracks(self):
|
||||||
|
self.create_playlist_model_tracks(playlist_name="test_track_get_all_tracks")
|
||||||
|
all_tracks = repository.get_all_tracks()
|
||||||
|
assert len(all_tracks) == 2
|
||||||
|
|
||||||
|
def test_tracks_by_path(self):
|
||||||
|
metadata_isa = get_all_track_metadata(self.isa_path)
|
||||||
|
_ = repository.create_track(self.isa_path, metadata_isa)
|
||||||
|
metadata_mom = get_all_track_metadata(self.mom_path)
|
||||||
|
_ = repository.create_track(self.mom_path, metadata_mom)
|
||||||
|
result_isa = repository.track_by_path(self.isa_path)
|
||||||
|
assert result_isa.title == self.isa_title
|
||||||
|
result_mom = repository.track_by_path(self.mom_path)
|
||||||
|
assert result_mom.title == self.mom_title
|
||||||
|
|
||||||
def test_move_rows_test1(self):
|
def test_move_rows_test1(self):
|
||||||
# move row 3 to row 5
|
# move row 3 to row 5
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user