"""
Data-access functions for note colours, tracks, playlists, playlist rows,
playdates, saved queries and settings.

Each public function opens its own database session and returns plain DTOs
(or None) rather than ORM objects.
"""

# Standard library imports
import datetime as dt
import re

# PyQt imports

# Third party imports
from dogpile.cache import make_region
from dogpile.cache.api import NO_VALUE
from sqlalchemy import (
    delete,
    func,
    select,
    update,
)
from sqlalchemy.orm import aliased
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.elements import BinaryExpression, ColumnElement

# App imports
from classes import (
    ApplicationError,
    Filter,
    NoteColoursDTO,
    PlaydatesDTO,
    PlaylistDTO,
    PlaylistRowDTO,
    QueryDTO,
    TrackDTO,
)
from config import Config
from log import log, log_call
from models import (
    db,
    NoteColours,
    Playdates,
    PlaylistRows,
    Playlists,
    Queries,
    Settings,
    Tracks,
)

# Configure the dogpile cache region
cache_region = make_region().configure(
    'dogpile.cache.memory',  # Use in-memory caching for now (switch to Redis if needed)
    expiration_time=600,  # Cache expires after 10 minutes
)


# Helper functions


# @log_call
def _remove_substring_case_insensitive(parent_string: str, substring: str) -> str:
    """
    Remove all instances of substring from parent string, case insensitively.

    Removal is repeated until no match remains, so an occurrence formed by
    joining the text either side of a removed match is removed as well.
    An empty substring leaves parent_string unchanged.
    """

    # An empty substring is "found" at index 0 of every string, which would
    # otherwise make the loop below spin forever.
    if not substring:
        return parent_string

    lower_substring = substring.lower()
    result = parent_string

    index = result.lower().find(lower_substring)
    while index != -1:
        result = result[:index] + result[index + len(substring):]
        index = result.lower().find(lower_substring)

    return result


def _escape_like(value: str, escape: str = "/") -> str:
    """
    Escape LIKE wildcards ('%' and '_') and the escape character itself so
    that value is matched literally by a LIKE/ILIKE using the same escape.
    """

    return (
        value.replace(escape, escape + escape)
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


# Notecolour functions


def _notecolours_all(session: Session) -> list[NoteColoursDTO]:
    """
    Return all enabled notecolour records, ordered by NoteColours.order.

    The result is stored in cache_region under a fixed key; the database is
    only queried when the cache has no value for that key.
    """

    cache_key = "note_colours_all"
    cached_result = cache_region.get(cache_key)
    if cached_result is not NO_VALUE:
        return cached_result

    # Query the database
    records = session.scalars(
        select(NoteColours)
        .where(NoteColours.enabled.is_(True))
        .order_by(NoteColours.order)
    ).all()

    results: list[NoteColoursDTO] = [
        NoteColoursDTO(
            notecolour_id=record.id,
            substring=record.substring,
            colour=record.colour,
            enabled=record.enabled,
            foreground=record.foreground,
            is_regex=record.is_regex,
            is_casesensitive=record.is_casesensitive,
            order=record.order,
            strip_substring=record.strip_substring,
        )
        for record in records
    ]

    cache_region.set(cache_key, results)
    return results


def _notecolors_get_notecolours_dto(text: str) -> tuple[NoteColoursDTO | None, str]:
    """
    Parse text and return (first matching colour record, text).

    The returned text has the matching substring/pattern removed when the
    record's strip_substring flag is set, otherwise it is the text unchanged.
    Returns (None, text) when no record matches.
    """

    with db.Session() as session:
        for rec in _notecolours_all(session):
            if rec.is_regex:
                flags = re.UNICODE
                if not rec.is_casesensitive:
                    flags |= re.IGNORECASE
                p = re.compile(rec.substring, flags)
                # match() anchors at the start of text
                if not p.match(text):
                    continue
                stripped = p.sub("", text)
            elif rec.is_casesensitive:
                if rec.substring not in text:
                    continue
                stripped = text.replace(rec.substring, "")
            else:
                if rec.substring.lower() not in text.lower():
                    continue
                stripped = _remove_substring_case_insensitive(text, rec.substring)

            # Honour strip_substring for every kind of match, not only regex
            return (rec, stripped if rec.strip_substring else text)

    return (None, text)


def notecolours_get_colour(text: str, foreground: bool = False) -> str:
    """
    Parse text and return background (foreground if foreground==True)
    colour string if matched, else an empty string.
    """

    (rec, _) = _notecolors_get_notecolours_dto(text)
    if rec is None:
        return ""
    if foreground:
        return rec.foreground or ""
    return rec.colour


# @log_call
def notecolours_remove_colour_substring(text: str) -> str:
    """
    Remove text that identifies the colour to be used if strip_substring is True
    """

    (_, stripped_text) = _notecolors_get_notecolours_dto(text)
    return stripped_text


# Track functions


# @log_call
def _tracks_where(
    query: BinaryExpression | ColumnElement[bool],
) -> list[TrackDTO]:
    """
    Return tracks selected by query.

    Each TrackDTO carries the most recent playdate for the track in
    lastplayed (None if the track has no playdates).
    """

    # Alias Playdates for the subquery
    LatestPlaydate = aliased(Playdates)

    # Create a 'latest playdate' subquery
    latest_playdate_subq = (
        select(
            LatestPlaydate.track_id,
            func.max(LatestPlaydate.lastplayed).label("lastplayed"),
        )
        .group_by(LatestPlaydate.track_id)
        .subquery()
    )

    # Outer join so that tracks without any playdate are still returned
    stmt = (
        select(
            Tracks.id.label("track_id"),
            Tracks.artist,
            Tracks.bitrate,
            Tracks.duration,
            Tracks.fade_at,
            Tracks.intro,
            Tracks.path,
            Tracks.silence_at,
            Tracks.start_gap,
            Tracks.title,
            latest_playdate_subq.c.lastplayed,
        )
        .outerjoin(latest_playdate_subq, Tracks.id == latest_playdate_subq.c.track_id)
        .where(query)
    )

    with db.Session() as session:
        records = session.execute(stmt).all()
        return [
            TrackDTO(
                artist=record.artist,
                bitrate=record.bitrate,
                duration=record.duration,
                fade_at=record.fade_at,
                intro=record.intro,
                lastplayed=record.lastplayed,
                path=record.path,
                silence_at=record.silence_at,
                start_gap=record.start_gap,
                title=record.title,
                track_id=record.track_id,
            )
            for record in records
        ]


# @log_call
def track_add_to_header(playlistrow_id: int, track_id: int) -> None:
    """
    Add a track to this (header) row
    """

    with db.Session() as session:
        session.execute(
            update(PlaylistRows)
            .where(PlaylistRows.id == playlistrow_id)
            .values(track_id=track_id)
        )
        session.commit()


def tracks_all() -> list[TrackDTO]:
    """Return a list of all tracks"""

    return _tracks_where(Tracks.id > 0)


def tracks_by_artist(filter_str: str) -> list[TrackDTO]:
    """
    Return tracks where artist is like filter
    """

    return _tracks_where(Tracks.artist.ilike(f"%{filter_str}%"))


def track_by_id(track_id: int) -> TrackDTO | None:
    """
    Return track with specified id, or None if there is no such track.

    Raises ApplicationError if more than one track has that id.
    """

    track_list = _tracks_where(Tracks.id == track_id)
    if not track_list:
        return None
    if len(track_list) > 1:
        raise ApplicationError(f"Duplicate {track_id=}")
    return track_list[0]


def track_by_path(path: str) -> TrackDTO | None:
    """
    Return track with passed path or None.

    The comparison is case insensitive. Raises ApplicationError if more
    than one track matches.
    """

    # Escape '%' and '_' so that a path such as "my_song.mp3" is matched
    # literally rather than as a LIKE pattern.
    track_list = _tracks_where(Tracks.path.ilike(_escape_like(path), escape="/"))
    if not track_list:
        return None
    if len(track_list) > 1:
        raise ApplicationError(f"Duplicate {path=}")
    return track_list[0]


def tracks_by_title(filter_str: str) -> list[TrackDTO]:
    """
    Return tracks where title is like filter
    """

    return _tracks_where(Tracks.title.ilike(f"%{filter_str}%"))


# @log_call
def track_create(metadata: dict[str, str | int | float]) -> TrackDTO:
    """
    Create a track db entry from track metadata and return the DTO.

    metadata must provide path, title, artist, duration, start_gap,
    fade_at, silence_at and bitrate. Raises ApplicationError if the track
    cannot be created or cannot be read back.
    """

    with db.Session() as session:
        try:
            track = Tracks(
                session=session,
                path=str(metadata["path"]),
                title=str(metadata["title"]),
                artist=str(metadata["artist"]),
                duration=int(metadata["duration"]),
                start_gap=int(metadata["start_gap"]),
                fade_at=int(metadata["fade_at"]),
                silence_at=int(metadata["silence_at"]),
                bitrate=int(metadata["bitrate"]),
            )
            track_id = track.id
            session.commit()
        except Exception as exc:
            # Keep the underlying cause attached for debugging
            raise ApplicationError("Can't create Track") from exc

    new_track = track_by_id(track_id)
    if not new_track:
        raise ApplicationError("Unable to create new track")
    return new_track


def track_delete(track_id: int) -> None:
    """
    Delete track.

    NOTE(review): if no track has this id, session.get() returns None and
    session.delete() will raise.
    """

    with db.Session() as session:
        track = session.get(Tracks, track_id)
        session.delete(track)
        session.commit()


def tracks_filtered(filter: Filter) -> list[TrackDTO]:
    """
    Return tracks matching filter.

    Applies, in order, the filter's path, duration and last-played
    specifications. Raises ApplicationError for an unrecognised path type,
    duration unit or duration type.
    """

    query = select(Tracks)

    # Path specification
    if filter.path:
        if filter.path_type == "contains":
            query = query.where(Tracks.path.ilike(f"%{filter.path}%"))
        elif filter.path_type == "excluding":
            query = query.where(Tracks.path.notilike(f"%{filter.path}%"))
        else:
            raise ApplicationError(f"Can't process filter path ({filter=})")

    # Duration specification
    seconds_duration = filter.duration_number
    if filter.duration_unit == Config.FILTER_DURATION_MINUTES:
        seconds_duration *= 60
    elif filter.duration_unit != Config.FILTER_DURATION_SECONDS:
        raise ApplicationError(f"Can't process filter duration ({filter=})")

    if filter.duration_type == Config.FILTER_DURATION_LONGER:
        query = query.where(Tracks.duration >= seconds_duration)
    elif filter.duration_type == Config.FILTER_DURATION_SHORTER:
        query = query.where(Tracks.duration <= seconds_duration)
    else:
        raise ApplicationError(f"Can't process filter duration type ({filter=})")

    # Process comparator
    if filter.last_played_comparator == Config.FILTER_PLAYED_COMPARATOR_NEVER:
        # Select tracks that have never been played
        query = query.outerjoin(Playdates, Tracks.id == Playdates.track_id).where(
            Playdates.id.is_(None)
        )
    else:
        # Last played specification
        now = dt.datetime.now()
        # Set sensible default, and correct for
        # Config.FILTER_PLAYED_COMPARATOR_ANYTIME
        before = now
        # If not ANYTIME, set 'before' appropriately
        if filter.last_played_comparator != Config.FILTER_PLAYED_COMPARATOR_ANYTIME:
            if filter.last_played_unit == Config.FILTER_PLAYED_DAYS:
                before = now - dt.timedelta(days=filter.last_played_number)
            elif filter.last_played_unit == Config.FILTER_PLAYED_WEEKS:
                before = now - dt.timedelta(days=7 * filter.last_played_number)
            elif filter.last_played_unit == Config.FILTER_PLAYED_MONTHS:
                before = now - dt.timedelta(days=30 * filter.last_played_number)
            elif filter.last_played_unit == Config.FILTER_PLAYED_YEARS:
                before = now - dt.timedelta(days=365 * filter.last_played_number)

        subquery = (
            select(
                Playdates.track_id,
                func.max(Playdates.lastplayed).label("max_last_played"),
            )
            .group_by(Playdates.track_id)
            .subquery()
        )
        # Inner join: only tracks with at least one playdate are returned here
        query = query.join(subquery, Tracks.id == subquery.c.track_id).where(
            subquery.c.max_last_played < before
        )

    results: list[TrackDTO] = []
    with db.Session() as session:
        records = session.scalars(query).unique().all()
        for record in records:
            # Report the most recent playdate, as _tracks_where does
            last_played = max(
                (
                    playdate.lastplayed
                    for playdate in record.playdates
                    if playdate.lastplayed is not None
                ),
                default=None,
            )
            results.append(
                TrackDTO(
                    artist=record.artist,
                    bitrate=record.bitrate,
                    duration=record.duration,
                    fade_at=record.fade_at,
                    intro=record.intro,
                    lastplayed=last_played,
                    path=record.path,
                    silence_at=record.silence_at,
                    start_gap=record.start_gap,
                    title=record.title,
                    track_id=record.id,
                )
            )

    return results


def track_set_intro(track_id: int, intro: int) -> None:
    """
    Set track intro time
    """

    with db.Session() as session:
        session.execute(
            update(Tracks)
            .where(Tracks.id == track_id)
            .values(intro=intro)
        )
        session.commit()


# @log_call
def track_update(
    path: str, track_id: int, metadata: dict[str, str | int | float]
) -> TrackDTO:
    """
    Update an existing track db entry and return the DTO.

    All updated values, including the path, are taken from metadata; the
    path argument itself is not used. Raises ApplicationError if the track
    does not exist or cannot be read back after the update.
    """

    with db.Session() as session:
        track = session.get(Tracks, track_id)
        if not track:
            raise ApplicationError(f"Can't retrieve Track ({track_id=})")
        # No trailing commas here: each attribute takes a scalar, not a tuple
        track.path = str(metadata["path"])
        track.title = str(metadata["title"])
        track.artist = str(metadata["artist"])
        track.duration = int(metadata["duration"])
        track.start_gap = int(metadata["start_gap"])
        track.fade_at = int(metadata["fade_at"])
        track.silence_at = int(metadata["silence_at"])
        track.bitrate = int(metadata["bitrate"])

        session.commit()

    updated_track = track_by_id(track_id)
    if not updated_track:
        raise ApplicationError("Unable to retrieve updated track")
    return updated_track


# Playlist functions


def _playlist_check_playlist(
    session: Session, playlist_id: int, fix: bool = False
) -> None:
    """
    Ensure the row numbers are contiguous, starting from zero.

    Fix and log if fix==True, else raise ApplicationError. Fixes are made
    on the ORM objects in session; the caller is responsible for committing.
    """

    playlist_rows = (
        session.execute(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id)
            .order_by(PlaylistRows.row_number)
        )
        .scalars()
        .all()
    )

    for idx, plr in enumerate(playlist_rows):
        if plr.row_number == idx:
            continue

        msg = (
            "_check_playlist_integrity: incorrect row number "
            f"({plr.id=}, {plr.row_number=}, {idx=})"
        )
        if not fix:
            raise ApplicationError(msg)
        log.debug(msg)
        plr.row_number = idx


# @log_call
def _playlist_shift_rows(
    session: Session, playlist_id: int, starting_row: int, shift_by: int
) -> None:
    """
    Shift rows from starting_row by shift_by. If shift_by is +ve, shift rows
    down; if -ve, shift them up.

    The update is executed in session but not committed here.
    """

    session.execute(
        update(PlaylistRows)
        .where(
            PlaylistRows.playlist_id == playlist_id,
            PlaylistRows.row_number >= starting_row,
        )
        .values(row_number=PlaylistRows.row_number + shift_by)
    )


# @log_call
def _playlists_where(
    query: BinaryExpression | ColumnElement[bool],
) -> list[PlaylistDTO]:
    """
    Return playlists selected by query
    """

    stmt = select(
        Playlists.favourite,
        Playlists.is_template,
        Playlists.id.label("playlist_id"),
        Playlists.name,
        Playlists.open,
    ).where(query)

    with db.Session() as session:
        records = session.execute(stmt).all()
        return [
            PlaylistDTO(
                favourite=record.favourite,
                is_template=record.is_template,
                playlist_id=record.playlist_id,
                name=record.name,
                open=record.open,
            )
            for record in records
        ]


def playlists_all() -> list[PlaylistDTO]:
    """Return all playlists"""

    return _playlists_where(Playlists.id > 0)


# @log_call
def playlist_by_id(playlist_id: int) -> PlaylistDTO | None:
    """
    Return playlist with specified id, or None if there is no such playlist.

    Raises ApplicationError if more than one playlist has that id.
    """

    playlist_list = _playlists_where(Playlists.id == playlist_id)
    if not playlist_list:
        return None
    if len(playlist_list) > 1:
        raise ApplicationError(f"Duplicate {playlist_id=}")
    return playlist_list[0]


def playlist_copy(src_id: int, dst_id: int) -> None:
    """Copy playlist entries from playlist src_id to playlist dst_id"""

    with db.Session() as session:
        src_rows = session.scalars(
            select(PlaylistRows).where(PlaylistRows.playlist_id == src_id)
        ).all()

        for plr in src_rows:
            PlaylistRows(
                session=session,
                playlist_id=dst_id,
                row_number=plr.row_number,
                note=plr.note,
                track_id=plr.track_id,
            )

        session.commit()


def playlists_closed() -> list[PlaylistDTO]:
    """
    Return a list of closed playlists
    """

    return _playlists_where(Playlists.open.is_(False))


# @log_call
def playlist_create(
    name: str, template_id: int, as_template: bool = False
) -> PlaylistDTO:
    """
    Create playlist and return DTO.

    If template_id is non-zero, the rows of that playlist are copied into
    the new one. Raises ApplicationError if the playlist cannot be created
    or cannot be read back.
    """

    with db.Session() as session:
        try:
            playlist = Playlists(session, name, template_id)
            playlist.is_template = as_template
            playlist_id = playlist.id
            session.commit()
        except Exception as exc:
            # Keep the underlying cause attached for debugging
            raise ApplicationError("Can't create Playlist") from exc

    if template_id != 0:
        playlist_copy(template_id, playlist_id)

    new_playlist = playlist_by_id(playlist_id)
    if not new_playlist:
        raise ApplicationError("Can't retrieve new Playlist")
    return new_playlist


def playlist_delete(playlist_id: int) -> None:
    """
    Delete playlist.

    NOTE(review): if no playlist has this id, session.get() returns None
    and session.delete() will raise.
    """

    with db.Session() as session:
        playlist = session.get(Playlists, playlist_id)
        session.delete(playlist)
        session.commit()


# @log_call
def playlist_insert_row(
    playlist_id: int, row_number: int, track_id: int | None, note: str
) -> PlaylistRowDTO:
    """
    Insert a new row into playlist and return new row DTO.

    Existing rows at or after row_number are shifted down by one. Raises
    ApplicationError if row numbers are not contiguous before or after the
    insert, or if the new row cannot be read back.
    """

    with db.Session() as session:
        # Sanity check
        _playlist_check_playlist(session, playlist_id, fix=False)

        # Make space for new row
        _playlist_shift_rows(
            session=session,
            playlist_id=playlist_id,
            starting_row=row_number,
            shift_by=1,
        )

        playlist_row = PlaylistRows(
            session=session,
            playlist_id=playlist_id,
            row_number=row_number,
            note=note,
            track_id=track_id,
        )
        session.commit()
        playlist_row_id = playlist_row.id

        # Sanity check
        _playlist_check_playlist(session, playlist_id, fix=False)

    new_playlist_row = playlistrow_by_id(playlistrow_id=playlist_row_id)
    if not new_playlist_row:
        raise ApplicationError("Can't retrieve new playlist row")
    return new_playlist_row


# @log_call
def playlist_mark_status(playlist_id: int, open: bool) -> None:
    """Mark playlist as open or closed"""

    with db.Session() as session:
        session.execute(
            update(Playlists)
            .where(Playlists.id == playlist_id)
            .values(open=open)
        )
        session.commit()


# @log_call
def playlist_move_rows(
    from_rows: list[int],
    from_playlist_id: int,
    to_row: int,
    to_playlist_id: int | None = None,
) -> None:
    """
    Move rows within or between playlists.

    Algorithm:
        - Sanity check row numbers
        - Check there are no playlist rows with playlist_id == PENDING_MOVE
        - Put rows to be moved into PENDING_MOVE playlist
        - Resequence remaining row numbers
        - Make space for moved rows
        - Move the PENDING_MOVE rows back and fixup row numbers
        - Sanity check row numbers

    Raises ApplicationError if a sanity check fails or if rows are already
    parked in the PENDING_MOVE playlist.
    """

    # If to_playlist_id isn't specified, we're moving within the one
    # playlist.
    if to_playlist_id is None:
        to_playlist_id = from_playlist_id

    with db.Session() as session:
        # Sanity check row numbers
        _playlist_check_playlist(session, from_playlist_id, fix=False)
        if from_playlist_id != to_playlist_id:
            _playlist_check_playlist(session, to_playlist_id, fix=False)

        # Check there are no playlist rows with playlist_id == PENDING_MOVE
        pending_move_rows = playlistrows_by_playlist(Config.PLAYLIST_PENDING_MOVE)
        if pending_move_rows:
            raise ApplicationError(f"move_rows_to_playlist: {pending_move_rows=}")

        # We need playlist length if we're moving within a playlist. Get
        # that now before we remove rows. Only the count is needed, so
        # don't load every row and its track.
        from_playlist_length = playlist_row_count(from_playlist_id)

        # Put rows to be moved into PENDING_MOVE playlist
        session.execute(
            update(PlaylistRows)
            .where(
                PlaylistRows.playlist_id == from_playlist_id,
                PlaylistRows.row_number.in_(from_rows),
            )
            .values(playlist_id=Config.PLAYLIST_PENDING_MOVE)
        )

        # Resequence remaining row numbers
        _playlist_check_playlist(session, from_playlist_id, fix=True)
        session.commit()

        # Make space for moved rows. If moving within one playlist,
        # determining where to make the space is non-trivial. For example,
        # if the playlist has ten entries and we're moving four of them
        # to row 8, after we've moved the rows to the
        # PLAYLIST_PENDING_MOVE there will only be six entries left.
        # Clearly we can't make space at row 8...
        space_row = to_row
        if to_playlist_id == from_playlist_id:
            overflow = max(to_row + len(from_rows) - from_playlist_length, 0)
            if overflow != 0:
                space_row = (
                    to_row - overflow - len([a for a in from_rows if a > to_row])
                )

        _playlist_shift_rows(session, to_playlist_id, space_row, len(from_rows))

        # Move the PENDING_MOVE rows back and fixup row numbers. One dict
        # per row carries both the new row number and the new playlist.
        update_list: list[dict[str, int]] = []
        next_row = space_row
        # PLAYLIST_PENDING_MOVE may have gaps so don't check it
        for row_to_move in playlistrows_by_playlist(
            Config.PLAYLIST_PENDING_MOVE, check_playlist_itegrity=False
        ):
            update_list.append(
                {
                    "id": row_to_move.playlistrow_id,
                    "row_number": next_row,
                    "playlist_id": to_playlist_id,
                }
            )
            next_row += 1

        # A bulk update needs at least one parameter set
        if update_list:
            session.execute(update(PlaylistRows), update_list)
        session.commit()

        # Sanity check row numbers
        _playlist_check_playlist(session, from_playlist_id, fix=False)
        if from_playlist_id != to_playlist_id:
            _playlist_check_playlist(session, to_playlist_id, fix=False)


def playlists_open() -> list[PlaylistDTO]:
    """
    Return a list of open playlists
    """

    return _playlists_where(Playlists.open.is_(True))


def playlist_rename(playlist_id: int, new_name: str) -> None:
    """
    Rename playlist
    """

    with db.Session() as session:
        session.execute(
            update(Playlists)
            .where(Playlists.id == playlist_id)
            .values(name=new_name)
        )
        session.commit()


def playlist_row_count(playlist_id: int) -> int:
    """
    Return number of rows in playlist
    """

    with db.Session() as session:
        count = session.scalar(
            select(func.count())
            .select_from(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id)
        )

    # session.scalar() is typed as optional; always hand back an int
    return count or 0


def playlist_save_as_template(playlist_id: int, template_name: str) -> None:
    """
    Save playlist as a new template called template_name
    """

    new_template = playlist_create(template_name, 0, as_template=True)
    # PlaylistDTO exposes the id as playlist_id
    playlist_copy(playlist_id, new_template.playlist_id)


def playlists_templates_all() -> list[PlaylistDTO]:
    """
    Return a list of playlist templates
    """

    return _playlists_where(Playlists.is_template.is_(True))


def playlists_template_by_id(playlist_id: int) -> PlaylistDTO | None:
    """
    Return the template with the specified id, or None if there is no
    template with that id.

    Raises ApplicationError if more than one template has that id.
    """

    # _playlists_where takes a single expression, so combine the criteria
    playlist_list = _playlists_where(
        (Playlists.id == playlist_id) & Playlists.is_template.is_(True)
    )
    if not playlist_list:
        return None
    if len(playlist_list) > 1:
        raise ApplicationError(f"Duplicate {playlist_id=}")
    return playlist_list[0]


# @log_call
def playlist_update_row_numbers(
    playlist_id: int, id_to_row_number: list[dict[str, int]]
) -> None:
    """
    Update playlistrows rownumbers for passed playlistrow_ids.

    id_to_row_number is passed as the parameter list of a bulk UPDATE of
    PlaylistRows (presumably dicts of the form
    {"id": playlistrow_id, "row_number": n} - verify against callers).

    playlist_id is only needed for sanity checking; ApplicationError is
    raised if row numbers are not contiguous after the update.
    """

    with db.Session() as session:
        # A bulk update needs at least one parameter set
        if id_to_row_number:
            session.execute(update(PlaylistRows), id_to_row_number)
        session.commit()

        # Sanity check
        _playlist_check_playlist(session, playlist_id, fix=False)


# @log_call
def playlist_remove_comments(playlist_id: int, row_numbers: list[int]) -> None:
    """
    Remove comments from rows in playlist
    """

    with db.Session() as session:
        session.execute(
            update(PlaylistRows)
            .where(
                PlaylistRows.playlist_id == playlist_id,
                PlaylistRows.row_number.in_(row_numbers),
            )
            .values(note="")
        )
        session.commit()


# @log_call
def playlist_remove_rows(playlist_id: int, row_numbers: list[int]) -> None:
    """
    Remove rows from playlist and close up the resulting gaps in the
    remaining row numbers.
    """

    with db.Session() as session:
        # Row numbers are not changed until every delete is done, so all
        # rows can be removed in a single statement.
        session.execute(
            delete(PlaylistRows).where(
                PlaylistRows.playlist_id == playlist_id,
                PlaylistRows.row_number.in_(row_numbers),
            )
        )

        # Fixup row number to remove gaps
        _playlist_check_playlist(session, playlist_id, fix=True)

        session.commit()


# @log_call
def playlist_save_tabs(playlist_id_to_tab: dict[int, int]) -> None:
    """
    Save the tab numbers of the open playlists.

    playlist_id_to_tab maps playlist id to tab number.
    """

    with db.Session() as session:
        # Clear the tab numbers of the playlists being saved before
        # assigning the new ones.
        # NOTE(review): playlists not in playlist_id_to_tab keep whatever
        # tab number they already have - confirm that is intended.
        session.execute(
            update(Playlists)
            .where(Playlists.id.in_(playlist_id_to_tab.keys()))
            .values(tab=None)
        )
        for (playlist_id, tab) in playlist_id_to_tab.items():
            session.execute(
                update(Playlists)
                .where(Playlists.id == playlist_id)
                .values(tab=tab)
            )
        session.commit()


# @log_call
def playlist_update_template_favourite(template_id: int, favourite: bool) -> None:
    """Update template favourite"""

    with db.Session() as session:
        session.execute(
            update(Playlists)
            .where(Playlists.id == template_id)
            .values(favourite=favourite)
        )
        session.commit()


# Playlist Rows


def _playlistrow_to_dto(record: PlaylistRows) -> PlaylistRowDTO:
    """
    Build a PlaylistRowDTO from a PlaylistRows record, looking up the row's
    track (if it has one).
    """

    track = track_by_id(record.track_id) if record.track_id else None
    return PlaylistRowDTO(
        note=record.note,
        played=record.played,
        playlist_id=record.playlist_id,
        playlistrow_id=record.id,
        row_number=record.row_number,
        track=track,
    )


# @log_call
def playlistrow_by_id(playlistrow_id: int) -> PlaylistRowDTO | None:
    """
    Return specific row DTO, or None if there is no row with that id
    """

    with db.Session() as session:
        record = (
            session.execute(
                select(PlaylistRows).where(PlaylistRows.id == playlistrow_id)
            )
            .scalars()
            .one_or_none()
        )
        if not record:
            return None

        return _playlistrow_to_dto(record)


def playlistrows_by_playlist(
    playlist_id: int, check_playlist_itegrity: bool = True
) -> list[PlaylistRowDTO]:
    """
    Return the rows of a playlist as DTOs, ordered by row number.

    Unless check_playlist_itegrity is False, ApplicationError is raised if
    the playlist's row numbers are not contiguous.
    """

    with db.Session() as session:
        # TODO: would be good to be confident at removing this
        if check_playlist_itegrity:
            _playlist_check_playlist(
                session=session, playlist_id=playlist_id, fix=False
            )

        records = session.scalars(
            select(PlaylistRows)
            .where(PlaylistRows.playlist_id == playlist_id)
            .order_by(PlaylistRows.row_number)
        ).all()

        return [_playlistrow_to_dto(record) for record in records]


# Playdates


# @log_call
def playdates_get_last(track_id: int, limit: int = 5) -> str:
    """
    Return the most recent 'limit' dates that this track has been played
    as a text list
    """

    with db.Session() as session:
        playdates = session.scalars(
            select(Playdates)
            .where(Playdates.track_id == track_id)
            .order_by(Playdates.lastplayed.desc())
            .limit(limit)
        ).all()

        return "\n".join(
            a.lastplayed.strftime(Config.LAST_PLAYED_TOOLTIP_DATE_FORMAT)
            for a in playdates
        )


def playdates_update(track_id: int) -> None:
    """
    Update playdates for passed track
    """

    with db.Session() as session:
        _ = Playdates(session, track_id)
        # Commit explicitly, as every other create path in this module does
        session.commit()


def playdates_between_dates(
    start: dt.datetime, end: dt.datetime | None = None
) -> list[PlaydatesDTO]:
    """
    Return a list of PlaydateDTO objects from between times (until now if end
    is None). Both bounds are inclusive.
    """

    if end is None:
        end = dt.datetime.now()

    # Join to Tracks and select the track columns explicitly rather than
    # selecting the Playdates.track relationship attribute.
    stmt = (
        select(
            Playdates.id.label("playdate_id"),
            Playdates.lastplayed,
            Playdates.track_id,
            Tracks.artist,
            Tracks.bitrate,
            Tracks.duration,
            Tracks.fade_at,
            Tracks.intro,
            Tracks.path,
            Tracks.silence_at,
            Tracks.start_gap,
            Tracks.title,
        )
        .join(Tracks, Tracks.id == Playdates.track_id)
        .where(
            Playdates.lastplayed >= start,
            Playdates.lastplayed <= end,
        )
    )

    with db.Session() as session:
        records = session.execute(stmt).all()
        return [
            PlaydatesDTO(
                playdate_id=record.playdate_id,
                lastplayed=record.lastplayed,
                track_id=record.track_id,
                artist=record.artist,
                bitrate=record.bitrate,
                duration=record.duration,
                fade_at=record.fade_at,
                intro=record.intro,
                path=record.path,
                silence_at=record.silence_at,
                start_gap=record.start_gap,
                title=record.title,
            )
            for record in records
        ]


# Queries


# @log_call
def _queries_where(
    query: BinaryExpression | ColumnElement[bool],
) -> list[QueryDTO]:
    """
    Return queries selected by query
    """

    with db.Session() as session:
        records = session.scalars(select(Queries).where(query)).all()
        return [
            QueryDTO(
                favourite=record.favourite,
                filter=record.filter,
                name=record.name,
                query_id=record.id,
            )
            for record in records
        ]


def queries_all(favourites_only: bool = False) -> list[QueryDTO]:
    """
    Return a list of all queries, or only those marked as favourite if
    favourites_only is True
    """

    if favourites_only:
        return _queries_where(Queries.favourite.is_(True))
    return _queries_where(Queries.id > 0)


def query_by_id(query_id: int) -> QueryDTO | None:
    """
    Return query with specified id, or None if there is no such query.

    Raises ApplicationError if more than one query has that id.
    """

    query_list = _queries_where(Queries.id == query_id)
    if not query_list:
        return None
    if len(query_list) > 1:
        raise ApplicationError(f"Duplicate {query_id=}")
    return query_list[0]


def query_create(name: str, filter: Filter) -> QueryDTO:
    """
    Create a query and return the DTO.

    Raises ApplicationError if the query cannot be created or cannot be
    read back.
    """

    with db.Session() as session:
        try:
            query = Queries(session=session, name=name, filter=filter)
            query_id = query.id
            session.commit()
        except Exception as exc:
            # Keep the underlying cause attached for debugging
            raise ApplicationError("Can't create Query") from exc

    new_query = query_by_id(query_id)
    if not new_query:
        raise ApplicationError("Unable to create new query")
    return new_query


def query_delete(query_id: int) -> None:
    """
    Delete query.

    NOTE(review): if no query has this id, session.get() returns None and
    session.delete() will raise.
    """

    with db.Session() as session:
        query = session.get(Queries, query_id)
        session.delete(query)
        session.commit()


def query_update_favourite(query_id: int, favourite: bool) -> None:
    """Update query favourite"""

    with db.Session() as session:
        session.execute(
            update(Queries).where(Queries.id == query_id).values(favourite=favourite)
        )
        session.commit()


def query_update_filter(query_id: int, filter: Filter) -> None:
    """Update query filter"""

    with db.Session() as session:
        session.execute(
            update(Queries).where(Queries.id == query_id).values(filter=filter)
        )
        session.commit()


def query_update_name(query_id: int, name: str) -> None:
    """Update query name"""

    with db.Session() as session:
        session.execute(
            update(Queries).where(Queries.id == query_id).values(name=name)
        )
        session.commit()


# Misc


def setting_get(name: str) -> int | None:
    """
    Get int setting, or None if there is no setting with that name
    """

    with db.Session() as session:
        record = (
            session.execute(select(Settings).where(Settings.name == name))
            .scalars()
            .one_or_none()
        )
        if not record:
            return None

        return record.f_int


def setting_set(name: str, value: int) -> None:
    """
    Set int setting, creating the setting record if it does not yet exist
    """

    with db.Session() as session:
        record = (
            session.execute(select(Settings).where(Settings.name == name))
            .scalars()
            .one_or_none()
        )
        if not record:
            record = Settings(session=session, name=name)
        if not record:
            raise ApplicationError("Can't create Settings record")
        record.f_int = value
        session.commit()


def db_name_get() -> str:
    """
    Return database name, or Config.DB_NOT_FOUND if the session is not
    bound or the engine URL carries no database name
    """

    with db.Session() as session:
        if session.bind:
            dbname = session.bind.engine.url.database
            if dbname:
                return dbname
    return Config.DB_NOT_FOUND