Rebase dev onto v2_id

This commit is contained in:
Keith Edmunds 2022-03-02 09:25:59 +00:00
parent 3a7b09f025
commit a91309477b
10 changed files with 797 additions and 593 deletions

View File

@ -4,7 +4,7 @@
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/app" isTestSource="false" />
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="jdk" jdkName="Poetry (musicmuster) (2)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">

View File

@ -55,7 +55,9 @@ class Config(object):
NOTE_TIME_FORMAT = "%H:%M:%S"
ROOT = os.environ.get('ROOT') or "/home/kae/music"
TESTMODE = True
TOD_TIME_FORMAT = "%H:%M:%S"
TIMER_MS = 500
TRACK_TIME_FORMAT = "%H:%M:%S"
VOLUME_VLC_DEFAULT = 75

View File

@ -4,22 +4,22 @@ import psutil
from config import Config
from datetime import datetime
from pydub import AudioSegment
from mutagen.flac import FLAC
from mutagen.mp3 import MP3
from PyQt5.QtWidgets import QMessageBox
from tinytag import TinyTag
from typing import Dict, Optional, Union
def ask_yes_no(title, question):
def ask_yes_no(title: str, question: str) -> bool:
"""Ask question; return True for yes, False for no"""
button_reply = QMessageBox.question(None, title, question)
button_reply: bool = QMessageBox.question(None, title, question)
return button_reply == QMessageBox.Yes
def fade_point(audio_segment, fade_threshold=0,
chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
def fade_point(
audio_segment: AudioSegment, fade_threshold: int = 0,
chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE):
"""
Returns the millisecond/index of the point where the volume drops below
the maximum and doesn't get louder again.
@ -30,15 +30,15 @@ def fade_point(audio_segment, fade_threshold=0,
assert chunk_size > 0 # to avoid infinite loop
segment_length = audio_segment.duration_seconds * 1000 # ms
trim_ms = segment_length - chunk_size
max_vol = audio_segment.dBFS
segment_length: int = audio_segment.duration_seconds * 1000 # ms
trim_ms: int = segment_length - chunk_size
max_vol: int = audio_segment.dBFS
if fade_threshold == 0:
fade_threshold = max_vol
while (
audio_segment[trim_ms:trim_ms + chunk_size].dBFS < fade_threshold
and trim_ms > 0): # noqa W503
audio_segment[trim_ms:trim_ms + chunk_size].dBFS < fade_threshold
and trim_ms > 0): # noqa W503
trim_ms -= chunk_size
# if there is no trailing silence, return lenght of track (it's less
@ -46,7 +46,7 @@ def fade_point(audio_segment, fade_threshold=0,
return int(trim_ms)
def get_audio_segment(path):
def get_audio_segment(path: str) -> Optional[AudioSegment]:
try:
if path.endswith('.mp3'):
return AudioSegment.from_mp3(path)
@ -56,12 +56,12 @@ def get_audio_segment(path):
return None
def get_tags(path):
def get_tags(path: str) -> Dict[str, Union[str, int]]:
"""
Return a dictionary of title, artist, duration-in-milliseconds and path.
"""
tag = TinyTag.get(path)
tag: TinyTag = TinyTag.get(path)
return dict(
title=tag.title,
@ -71,7 +71,8 @@ def get_tags(path):
)
def get_relative_date(past_date, reference_date=None):
def get_relative_date(past_date: datetime, reference_date: datetime = None) \
-> str:
"""
Return how long before reference_date past_date is as string.
@ -91,6 +92,11 @@ def get_relative_date(past_date, reference_date=None):
if past_date > reference_date:
return "get_relative_date() past_date is after relative_date"
days: int
days_str: str
weeks: int
weeks_str: str
weeks, days = divmod((reference_date.date() - past_date.date()).days, 7)
if weeks == days == 0:
# Played today, so return time instead
@ -106,8 +112,10 @@ def get_relative_date(past_date, reference_date=None):
return f"{weeks} {weeks_str}, {days} {days_str} ago"
def leading_silence(audio_segment, silence_threshold=Config.DBFS_SILENCE,
chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
def leading_silence(
audio_segment: AudioSegment,
silence_threshold: int = Config.DBFS_SILENCE,
chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE):
"""
Returns the millisecond/index that the leading silence ends.
audio_segment - the segment to find silence in
@ -117,10 +125,10 @@ def leading_silence(audio_segment, silence_threshold=Config.DBFS_SILENCE,
https://github.com/jiaaro/pydub/blob/master/pydub/silence.py
"""
trim_ms = 0 # ms
trim_ms: int = 0 # ms
assert chunk_size > 0 # to avoid infinite loop
while (
audio_segment[trim_ms:trim_ms + chunk_size].dBFS < # noqa W504
audio_segment[trim_ms:trim_ms + chunk_size].dBFS < # noqa W504
silence_threshold and trim_ms < len(audio_segment)):
trim_ms += chunk_size
@ -128,7 +136,13 @@ def leading_silence(audio_segment, silence_threshold=Config.DBFS_SILENCE,
return min(trim_ms, len(audio_segment))
def ms_to_mmss(ms, decimals=0, negative=False):
def ms_to_mmss(ms: int, decimals: int = 0, negative: bool = False) -> str:
"""Convert milliseconds to mm:ss"""
minutes: int
remainder: int
seconds: float
if not ms:
return "-"
sign = ""
@ -149,7 +163,7 @@ def ms_to_mmss(ms, decimals=0, negative=False):
return f"{sign}{minutes:.0f}:{seconds:02.{decimals}f}"
def open_in_audacity(path):
def open_in_audacity(path: str) -> Optional[bool]:
"""
Open passed file in Audacity
@ -164,19 +178,21 @@ def open_in_audacity(path):
if not path:
return False
to_pipe = '/tmp/audacity_script_pipe.to.' + str(os.getuid())
from_pipe = '/tmp/audacity_script_pipe.from.' + str(os.getuid())
EOL = '\n'
to_pipe: str = '/tmp/audacity_script_pipe.to.' + str(os.getuid())
from_pipe: str = '/tmp/audacity_script_pipe.from.' + str(os.getuid())
eol: str = '\n'
def send_command(command):
def send_command(command: str) -> None:
"""Send a single command."""
to_audacity.write(command + EOL)
to_audacity.write(command + eol)
to_audacity.flush()
def get_response():
def get_response() -> str:
"""Return the command response."""
result = ''
line = ''
result: str = ''
line: str = ''
while True:
result += line
line = from_audacity.readline()
@ -184,8 +200,9 @@ def open_in_audacity(path):
break
return result
def do_command(command):
def do_command(command: str) -> str:
"""Send one command, and return the response."""
send_command(command)
response = get_response()
return response
@ -195,12 +212,15 @@ def open_in_audacity(path):
do_command(f'Import2: Filename="{path}"')
def show_warning(title, msg):
"Display a warning to user"
def show_warning(title: str, msg: str) -> None:
"""Display a warning to user"""
QMessageBox.warning(None, title, msg, buttons=QMessageBox.Cancel)
def trailing_silence(audio_segment, silence_threshold=-50.0,
chunk_size=Config.AUDIO_SEGMENT_CHUNK_SIZE):
def trailing_silence(
audio_segment: AudioSegment, silence_threshold: int = -50,
chunk_size: int = Config.AUDIO_SEGMENT_CHUNK_SIZE):
"""Return fade point from start in milliseconds"""
return fade_point(audio_segment, silence_threshold, chunk_size)

View File

@ -9,7 +9,7 @@ from config import Config
class LevelTagFilter(logging.Filter):
"Add leveltag"
"""Add leveltag"""
def filter(self, record):
# Extract the first character of the level name
@ -32,9 +32,9 @@ syslog = logging.handlers.SysLogHandler(address='/dev/log')
syslog.setLevel(Config.LOG_LEVEL_SYSLOG)
# Filter
filter = LevelTagFilter()
syslog.addFilter(filter)
stderr.addFilter(filter)
local_filter = LevelTagFilter()
syslog.addFilter(local_filter)
stderr.addFilter(local_filter)
# create formatter and add it to the handlers
stderr_fmt = logging.Formatter('[%(asctime)s] %(leveltag)s: %(message)s',
@ -103,6 +103,6 @@ if __name__ == "__main__":
return i()
def i():
1 / 0
return 1 / 0
f()

View File

@ -6,6 +6,9 @@ import re
import sqlalchemy
from datetime import datetime
from typing import List, Optional
from pydub import AudioSegment
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy import (
@ -23,7 +26,7 @@ from sqlalchemy.orm import (
backref,
relationship,
sessionmaker,
scoped_session
scoped_session, RelationshipProperty
)
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
@ -48,7 +51,7 @@ engine = sqlalchemy.create_engine(
echo=Config.DISPLAY_SQL,
pool_pre_ping=True)
sm = sessionmaker(bind=engine)
sm: sessionmaker = sessionmaker(bind=engine)
Session = scoped_session(sm)
Base: DeclarativeMeta = declarative_base()
@ -63,16 +66,18 @@ def db_init():
class NoteColours(Base):
__tablename__ = 'notecolours'
id = Column(Integer, primary_key=True, autoincrement=True)
substring = Column(String(256), index=False)
colour = Column(String(21), index=False)
enabled = Column(Boolean, default=True, index=True)
is_regex = Column(Boolean, default=False, index=False)
is_casesensitive = Column(Boolean, default=False, index=False)
order = Column(Integer, index=True)
id: int = Column(Integer, primary_key=True, autoincrement=True)
substring: str = Column(String(256), index=False)
colour: str = Column(String(21), index=False)
enabled: bool = Column(Boolean, default=True, index=True)
is_regex: bool = Column(Boolean, default=False, index=False)
is_casesensitive: bool = Column(Boolean, default=False, index=False)
order: int = Column(Integer, index=True)
def __init__(self, session, substring, colour, enabled=True,
is_regex=False, is_casesensitive=False, order=0):
def __init__(
self, session: Session, substring: str, colour: str,
enabled: bool = True, is_regex: bool = False,
is_casesensitive: bool = False, order: int = 0) -> None:
self.substring = substring
self.colour = colour
self.enabled = enabled
@ -83,36 +88,37 @@ class NoteColours(Base):
session.add(self)
session.commit()
def __repr__(self):
def __repr__(self) -> str:
return (
f"<NoteColour(id={self.id}, substring={self.substring}, "
f"colour={self.colour}>"
)
@classmethod
def get_all(cls, session):
def get_all(cls, session: Session) -> Optional[List["NoteColours"]]:
"""Return all records"""
return session.query(cls).all()
@classmethod
def get_by_id(cls, session, note_id):
def get_by_id(cls, session: Session, note_id: int) -> \
Optional["NoteColours"]:
"""Return record identified by id, or None if not found"""
return session.query(NoteColours).filter(
return session.query(NoteColours).local_filter(
NoteColours.id == note_id).first()
@staticmethod
def get_colour(session, text):
def get_colour(session: Session, text: str) -> Optional[str]:
"""
Parse text and return colour string if matched, else None
"""
for rec in (
session.query(NoteColours)
.filter(NoteColours.enabled.is_(True))
.order_by(NoteColours.order)
.all()
session.query(NoteColours)
.local_filter(NoteColours.enabled.is_(True))
.order_by(NoteColours.order)
.all()
):
if rec.is_regex:
flags = re.UNICODE
@ -135,14 +141,16 @@ class NoteColours(Base):
class Notes(Base):
__tablename__ = 'notes'
id = Column(Integer, primary_key=True, autoincrement=True)
playlist_id = Column(Integer, ForeignKey('playlists.id'))
playlist = relationship("Playlists", back_populates="notes",
lazy="joined")
row = Column(Integer, nullable=False)
note = Column(String(256), index=False)
id: int = Column(Integer, primary_key=True, autoincrement=True)
playlist_id: int = Column(Integer, ForeignKey('playlists.id'))
playlist: RelationshipProperty = relationship(
"Playlists", back_populates="notes", lazy="joined")
row: int = Column(Integer, nullable=False)
note: str = Column(String(256), index=False)
def __init__(self, session, playlist_id, row, text):
def __init__(
self, session: Session, playlist_id: int, row: int,
text: str) -> None:
"""Create note"""
DEBUG(f"Notes.__init__({playlist_id=}, {row=}, {text=})")
@ -152,12 +160,12 @@ class Notes(Base):
session.add(self)
session.commit()
def __repr__(self):
def __repr__(self) -> str:
return (
f"<Note(id={self.id}, row={self.row}, note={self.note}>"
)
def delete_note(self, session):
def delete_note(self, session: Session) -> None:
"""Delete note"""
DEBUG(f"delete_note({self.id=}")
@ -165,30 +173,31 @@ class Notes(Base):
session.query(Notes).filter_by(id=self.id).delete()
session.commit()
def update_note(self, session, row, text=None):
def update_note(
self, session: Session, row: int,
text: Optional[str] = None) -> None:
"""
Update note details. If text=None, don't change text.
"""
DEBUG(f"Notes.update_note({self.id=}, {row=}, {text=})")
note = session.query(Notes).filter_by(id=self.id).one()
note.row = row
self.row = row
if text:
note.note = text
self.note = text
session.commit()
class Playdates(Base):
__tablename__ = 'playdates'
id = Column(Integer, primary_key=True, autoincrement=True)
lastplayed = Column(DateTime, index=True, default=None)
track_id = Column(Integer, ForeignKey('tracks.id'))
tracks = relationship("Tracks", back_populates="playdates",
lazy="joined")
id: int = Column(Integer, primary_key=True, autoincrement=True)
lastplayed: datetime = Column(DateTime, index=True, default=None)
track_id: int = Column(Integer, ForeignKey('tracks.id'))
tracks: RelationshipProperty = relationship(
"Tracks", back_populates="playdates", lazy="joined")
def __init__(self, session, track):
def __init__(self, session: Session, track: "Tracks") -> None:
"""Record that track was played"""
DEBUG(f"add_playdate(track={track})")
@ -200,11 +209,11 @@ class Playdates(Base):
session.commit()
@staticmethod
def last_played(session, track_id):
def last_played(session: Session, track_id: int) -> Optional[datetime]:
"""Return datetime track last played or None"""
last_played = session.query(Playdates.lastplayed).filter(
(Playdates.track_id == track_id)
last_played: Optional[Playdates] = session.query(
Playdates.lastplayed).local_filter((Playdates.track_id == track_id)
).order_by(Playdates.lastplayed.desc()).first()
if last_played:
return last_played[0]
@ -212,12 +221,12 @@ class Playdates(Base):
return None
@staticmethod
def remove_track(session, track_id):
def remove_track(session: Session, track_id: int) -> None:
"""
Remove all records of track_id
"""
session.query(Playdates).filter(
session.query(Playdates).local_filter(
Playdates.track_id == track_id,
).delete()
session.commit()
@ -230,32 +239,32 @@ class Playlists(Base):
__tablename__ = "playlists"
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(32), nullable=False, unique=True)
last_used = Column(DateTime, default=None, nullable=True)
loaded = Column(Boolean, default=True, nullable=False)
notes = relationship("Notes",
order_by="Notes.row",
back_populates="playlist",
lazy="joined")
id: int = Column(Integer, primary_key=True, autoincrement=True)
name: str = Column(String(32), nullable=False, unique=True)
last_used: datetime = Column(DateTime, default=None, nullable=True)
loaded: bool = Column(Boolean, default=True, nullable=False)
notes = relationship(
"Notes", order_by="Notes.row", back_populates="playlist", lazy="joined")
tracks = association_proxy('playlist_tracks', 'tracks')
row = association_proxy('playlist_tracks', 'row')
def __init__(self, session, name):
def __init__(self, session: Session, name: str) -> None:
self.name = name
session.add(self)
session.commit()
def __repr__(self):
def __repr__(self) -> str:
return f"<Playlists(id={self.id}, name={self.name}>"
def add_note(self, session, row, text):
def add_note(self, session: Session, row: int, text: str) -> Notes:
"""Add note to playlist at passed row"""
return Notes(session, self.id, row, text)
def add_track(self, session, track, row=None):
def add_track(
self, session: Session, track: "Tracks",
row: Optional[int] = None) -> None:
"""
Add track to playlist at given row.
If row=None, add to end of playlist
@ -266,7 +275,7 @@ class Playlists(Base):
PlaylistTracks(session, self.id, track.id, row)
def close(self, session):
def close(self, session: Session) -> None:
"""Record playlist as no longer loaded"""
self.loaded = False
@ -274,41 +283,41 @@ class Playlists(Base):
session.commit()
@classmethod
def get_all(cls, session):
def get_all(cls, session: Session) -> List["Playlists"]:
"""Returns a list of all playlists ordered by last use"""
return (
session.query(cls)
.order_by(cls.last_used.desc())
.order_by(cls.last_used.desc())
).all()
@classmethod
def get_by_id(cls, session, playlist_id):
return (session.query(cls).filter(cls.id == playlist_id)).one()
def get_by_id(cls, session: Session, playlist_id: int) -> "Playlists":
return (session.query(cls).local_filter(cls.id == playlist_id)).one()
@classmethod
def get_closed(cls, session):
def get_closed(cls, session: Session) -> List["Playlists"]:
"""Returns a list of all closed playlists ordered by last use"""
return (
session.query(cls)
.filter(cls.loaded.is_(False))
.order_by(cls.last_used.desc())
.local_filter(cls.loaded.is_(False))
.order_by(cls.last_used.desc())
).all()
@classmethod
def get_open(cls, session):
def get_open(cls, session: Session) -> List["Playlists"]:
"""
Return a list of playlists marked "loaded", ordered by loaded date.
"""
return (
session.query(cls)
.filter(cls.loaded.is_(True))
.order_by(cls.last_used.desc())
.local_filter(cls.loaded.is_(True))
.order_by(cls.last_used.desc())
).all()
def mark_open(self, session):
def mark_open(self, session: Session) -> None:
"""Mark playlist as loaded and used now"""
self.loaded = True
@ -316,21 +325,20 @@ class Playlists(Base):
session.add(self)
session.commit()
def remove_all_tracks(self, session):
def remove_all_tracks(self, session: Session) -> None:
"""
Remove all tracks from this playlist
"""
session.query(PlaylistTracks).filter(
session.query(PlaylistTracks).local_filter(
PlaylistTracks.playlist_id == self.id,
).delete()
session.commit()
def remove_track(self, session, row):
def remove_track(self, session: Session, row: int) -> None:
DEBUG(f"Playlist.remove_track({self.id=}, {row=})")
session.query(PlaylistTracks).filter(
session.query(PlaylistTracks).local_filter(
PlaylistTracks.playlist_id == self.id,
PlaylistTracks.row == row
).delete()
@ -340,12 +348,13 @@ class Playlists(Base):
class PlaylistTracks(Base):
__tablename__ = 'playlist_tracks'
id = Column(Integer, primary_key=True, autoincrement=True)
playlist_id = Column(Integer, ForeignKey('playlists.id'), primary_key=True)
track_id = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
row = Column(Integer, nullable=False)
tracks = relationship("Tracks")
playlist = relationship(
id: int = Column(Integer, primary_key=True, autoincrement=True)
playlist_id: int = Column(Integer, ForeignKey('playlists.id'),
primary_key=True)
track_id: int = Column(Integer, ForeignKey('tracks.id'), primary_key=True)
row: int = Column(Integer, nullable=False)
tracks: RelationshipProperty = relationship("Tracks")
playlist: RelationshipProperty = relationship(
Playlists,
backref=backref(
"playlist_tracks",
@ -354,26 +363,33 @@ class PlaylistTracks(Base):
)
)
def __init__(self, session, playlist_id, track_id, row):
def __init__(
self, session: Session, playlist_id: int, track_id: int,
row: int) -> None:
DEBUG(f"PlaylistTracks.__init__({playlist_id=}, {track_id=}, {row=})")
self.playlist_id = playlist_id,
self.track_id = track_id,
self.playlist_id = playlist_id
self.track_id = track_id
self.row = row
session.add(self)
session.commit()
@staticmethod
def move_track(session, from_playlist_id, row, to_playlist_id):
def move_track(
session: Session, from_playlist_id: int, row: int,
to_playlist_id: int) -> None:
"""
Move track between playlists. This would be more efficient with
an ORM-enabled UPDATE statement, but this works just fine.
"""
DEBUG(
"PlaylistTracks.move_tracks("
f"{from_playlist_id=}, {rows=}, {to_playlist_id=})"
f"{from_playlist_id=}, {row=}, {to_playlist_id=})"
)
max_row = session.query(func.max(PlaylistTracks.row)).filter(
new_row: int
max_row: Optional[int] = session.query(
func.max(PlaylistTracks.row)).local_filter(
PlaylistTracks.playlist_id == to_playlist_id).scalar()
if max_row is None:
# Destination playlist is empty; use row 0
@ -382,7 +398,7 @@ class PlaylistTracks(Base):
# Destination playlist has tracks; add to end
new_row = max_row + 1
try:
record = session.query(PlaylistTracks).filter(
record: PlaylistTracks = session.query(PlaylistTracks).local_filter(
PlaylistTracks.playlist_id == from_playlist_id,
PlaylistTracks.row == row).one()
except NoResultFound:
@ -397,10 +413,12 @@ class PlaylistTracks(Base):
session.commit()
@staticmethod
def next_free_row(session, playlist):
def next_free_row(session: Session, playlist: Playlists) -> int:
"""Return next free row number"""
last_row = session.query(
row: int
last_row: int = session.query(
func.max(PlaylistTracks.row)
).filter_by(playlist_id=playlist.id).first()
# if there are no rows, the above returns (None, ) which is True
@ -415,16 +433,20 @@ class PlaylistTracks(Base):
class Settings(Base):
__tablename__ = 'settings'
id = Column(Integer, primary_key=True, autoincrement=True)
name = Column(String(32), nullable=False, unique=True)
f_datetime = Column(DateTime, default=None, nullable=True)
f_int = Column(Integer, default=None, nullable=True)
f_string = Column(String(128), default=None, nullable=True)
id: int = Column(Integer, primary_key=True, autoincrement=True)
name: str = Column(String(32), nullable=False, unique=True)
f_datetime: datetime = Column(DateTime, default=None, nullable=True)
f_int: int = Column(Integer, default=None, nullable=True)
f_string: str = Column(String(128), default=None, nullable=True)
@classmethod
def get_int(cls, session, name):
def get_int_settings(cls, session: Session, name: str) -> "Settings":
"""Get setting for an integer or return new setting record"""
int_setting: Settings
try:
int_setting = session.query(cls).filter(
int_setting = session.query(cls).local_filter(
cls.name == name).one()
except NoResultFound:
int_setting = Settings()
@ -434,7 +456,7 @@ class Settings(Base):
session.commit()
return int_setting
def update(self, session, data):
def update(self, session: Session, data):
for key, value in data.items():
assert hasattr(self, key)
setattr(self, key, value)
@ -444,46 +466,48 @@ class Settings(Base):
class Tracks(Base):
__tablename__ = 'tracks'
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(256), index=True)
artist = Column(String(256), index=True)
duration = Column(Integer, index=True)
start_gap = Column(Integer, index=False)
fade_at = Column(Integer, index=False)
silence_at = Column(Integer, index=False)
path = Column(String(2048), index=False, nullable=False)
mtime = Column(Float, index=True)
lastplayed = Column(DateTime, index=True, default=None)
playlists = relationship("PlaylistTracks", back_populates="tracks",
lazy="joined")
playdates = relationship("Playdates", back_populates="tracks",
lazy="joined")
id: int = Column(Integer, primary_key=True, autoincrement=True)
title: str = Column(String(256), index=True)
artist: str = Column(String(256), index=True)
duration: int = Column(Integer, index=True)
start_gap: int = Column(Integer, index=False)
fade_at: int = Column(Integer, index=False)
silence_at: int = Column(Integer, index=False)
path: str = Column(String(2048), index=False, nullable=False)
mtime: float = Column(Float, index=True)
lastplayed: datetime = Column(DateTime, index=True, default=None)
playlists: RelationshipProperty = relationship("PlaylistTracks",
back_populates="tracks",
lazy="joined")
playdates: RelationshipProperty = relationship("Playdates",
back_populates="tracks",
lazy="joined")
def __init__(self, session, path):
def __init__(self, session: Session, path: str) -> None:
self.path = path
session.add(self)
session.commit()
def __repr__(self):
def __repr__(self) -> str:
return (
f"<Track(id={self.id}, title={self.title}, "
f"artist={self.artist}, path={self.path}>"
)
@staticmethod
def get_all_paths(session):
def get_all_paths(session) -> List[str]:
"""Return a list of paths of all tracks"""
return [a[0] for a in session.query(Tracks.path).all()]
@classmethod
def get_all_tracks(cls, session):
def get_all_tracks(cls, session: Session) -> List["Tracks"]:
"""Return a list of all tracks"""
return session.query(cls).all()
@classmethod
def get_or_create(cls, session, path):
def get_or_create(cls, session: Session, path: str) -> "Tracks":
"""
If a track with path exists, return it;
else created new track and return it
@ -492,14 +516,15 @@ class Tracks(Base):
DEBUG(f"Tracks.get_or_create({path=})")
try:
track = session.query(cls).filter(cls.path == path).one()
track = session.query(cls).local_filter(cls.path == path).one()
except NoResultFound:
track = Tracks(session, path)
return track
@classmethod
def get_from_filename(cls, session, filename):
def get_from_filename(cls, session: Session, filename: str) \
-> Optional["Tracks"]:
"""
Return track if one and only one track in database has passed
filename (ie, basename of path). Return None if zero or more
@ -508,93 +533,93 @@ class Tracks(Base):
DEBUG(f"Tracks.get_track_from_filename({filename=})")
try:
track = session.query(Tracks).filter(Tracks.path.ilike(
track = session.query(Tracks).local_filter(Tracks.path.ilike(
f'%{os.path.sep}{filename}')).one()
return track
except (NoResultFound, MultipleResultsFound):
return None
@classmethod
def get_from_path(cls, session, path):
def get_from_path(cls, session: Session, path: str) -> List["Tracks"]:
"""
Return track with passee path, or None.
"""
DEBUG(f"Tracks.get_track_from_path({path=})")
return session.query(Tracks).filter(Tracks.path == path).first()
return session.query(Tracks).local_filter(Tracks.path == path).first()
@classmethod
def get_by_id(cls, session, track_id):
def get_by_id(cls, session: Session, track_id: int) -> Optional["Tracks"]:
"""Return track or None"""
try:
DEBUG(f"Tracks.get_track(track_id={track_id})")
track = session.query(Tracks).filter(Tracks.id == track_id).one()
track = session.query(Tracks).local_filter(Tracks.id == track_id).one()
return track
except NoResultFound:
ERROR(f"get_track({track_id}): not found")
return None
def rescan(self, session):
def rescan(self, session: Session) -> None:
"""
Update audio metadata for passed track.
"""
audio = get_audio_segment(self.path)
audio: AudioSegment = get_audio_segment(self.path)
self.duration = len(audio)
self.fade_at = round(fade_point(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
Config.MILLISECOND_SIGFIGS) * 1000
self.mtime = os.path.getmtime(self.path)
self.silence_at = round(trailing_silence(audio) / 1000,
Config.MILLISECOND_SIGFIGS) * 1000
Config.MILLISECOND_SIGFIGS) * 1000
self.start_gap = leading_silence(audio)
session.add(self)
session.commit()
@staticmethod
def remove_by_path(session, path):
def remove_by_path(session: Session, path: str) -> None:
"""Remove track with passed path from database"""
DEBUG(f"Tracks.remove_path({path=})")
try:
session.query(Tracks).filter(Tracks.path == path).delete()
session.query(Tracks).local_filter(Tracks.path == path).delete()
session.commit()
except IntegrityError as exception:
ERROR(f"Can't remove track with {path=} ({exception=})")
@classmethod
def search_artists(cls, session, text):
def search_artists(cls, session: Session, text: str) -> List["Tracks"]:
return (
session.query(cls)
.filter(cls.artist.ilike(f"%{text}%"))
.order_by(cls.title)
.local_filter(cls.artist.ilike(f"%{text}%"))
.order_by(cls.title)
).all()
@classmethod
def search_titles(cls, session, text):
def search_titles(cls, session: Session, text: str) -> List["Tracks"]:
return (
session.query(cls)
.filter(cls.title.ilike(f"%{text}%"))
.order_by(cls.title)
.local_filter(cls.title.ilike(f"%{text}%"))
.order_by(cls.title)
).all()
def update_lastplayed(self, session):
def update_lastplayed(self, session: Session) -> None:
self.lastplayed = datetime.now()
session.add(self)
session.commit()
def update_artist(self, session, artist):
def update_artist(self, session: Session, artist: str) -> None:
self.artist = artist
session.add(self)
session.commit()
def update_title(self, session, title):
def update_title(self, session: Session, title: str) -> None:
self.title = title
session.add(self)
session.commit()
def update_path(self, newpath):
def update_path(self, newpath: str) -> None:
self.path = newpath

View File

@ -86,13 +86,11 @@ class Music:
p.stop()
DEBUG(f"Releasing player {p=}", True)
p.release()
# Ensure we don't reference player after release
p = None
self.fading -= 1
def get_playtime(self):
"Return elapsed play time"
"""Return elapsed play time"""
with lock:
if not self.player:
@ -101,7 +99,7 @@ class Music:
return self.player.get_time()
def get_position(self):
"Return current position"
"""Return current position"""
with lock:
DEBUG("music.get_position", True)
@ -147,13 +145,13 @@ class Music:
return self.fading > 0
def set_position(self, ms):
"Set current play time in milliseconds from start"
"""Set current play time in milliseconds from start"""
with lock:
return self.player.set_time(ms)
def set_volume(self, volume):
"Set maximum volume used for player"
"""Set maximum volume used for player"""
with lock:
if not self.player:
@ -163,7 +161,7 @@ class Music:
self.player.audio_set_volume(volume)
def stop(self):
"Immediately stop playing"
"""Immediately stop playing"""
DEBUG(f"music.stop(), {self.player=}", True)

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python
import webbrowser
import os
import psutil
import sys
import urllib.parse
@ -9,8 +8,9 @@ import urllib.parse
from datetime import datetime, timedelta
from log import DEBUG, EXCEPTION
from typing import Callable, Dict, List, Optional, Tuple
from PyQt5.QtCore import QProcess, Qt, QTimer, QUrl
from PyQt5.QtCore import QEvent, QProcess, Qt, QTimer, QUrl
from PyQt5.QtGui import QColor
from PyQt5.QtWebEngineWidgets import QWebEngineView as QWebView
from PyQt5.QtWidgets import (
@ -37,38 +37,39 @@ from ui.main_window_ui import Ui_MainWindow
class Window(QMainWindow, Ui_MainWindow):
def __init__(self, parent=None):
def __init__(self, parent=None) -> None:
super().__init__(parent)
self.setupUi(self)
self.timer = QTimer()
self.even_tick = True
self.playing = False
self.timer: QTimer = QTimer()
self.even_tick: bool = True
self.playing: bool = False
self.connect_signals_slots()
self.disable_play_next_controls()
self.music = music.Music()
self.current_track = None
self.current_track_playlist_tab = None
self.info_tabs = {}
self.next_track = None
self.next_track_playlist_tab = None
self.previous_track = None
self.previous_track_position = None
self.music: music.Music = music.Music()
self.current_track: Optional[Tracks] = None
self.current_track_playlist_tab: Optional[PlaylistTab] = None
self.info_tabs: Optional[Dict[str, QWebView]] = {}
self.next_track: Optional[Tracks] = None
self.next_track_playlist_tab: Optional[PlaylistTab] = None
self.previous_track: Optional[Tracks] = None
self.previous_track_position: Optional[int] = None
self.spnVolume.setValue(Config.VOLUME_VLC_DEFAULT)
self.set_main_window_size()
self.lblSumPlaytime = QLabel("")
self.lblSumPlaytime: QLabel = QLabel("")
self.statusbar.addPermanentWidget(self.lblSumPlaytime)
self.visible_playlist_tab = self.tabPlaylist.currentWidget
self.visible_playlist_tab: Callable[[], PlaylistTab] = \
self.tabPlaylist.currentWidget
self.load_last_playlists()
self.enable_play_next_controls()
self.check_audacity()
self.timer.start(Config.TIMER_MS)
def add_file(self):
def add_file(self) -> None:
# TODO: V2 enahancement to import tracks
dlg = QFileDialog()
dlg.setFileMode(QFileDialog.ExistingFiles)
@ -85,22 +86,23 @@ class Window(QMainWindow, Ui_MainWindow):
# also be saved to database
self.visible_playlist_tab().insert_track(session, track)
def set_main_window_size(self):
def set_main_window_size(self) -> None:
# TODO: V2 check
"""Set size of window from database"""
with Session() as session:
record = Settings.get_int(session, "mainwindow_x")
record = Settings.get_int_settings(session, "mainwindow_x")
x = record.f_int or 1
record = Settings.get_int(session, "mainwindow_y")
record = Settings.get_int_settings(session, "mainwindow_y")
y = record.f_int or 1
record = Settings.get_int(session, "mainwindow_width")
record = Settings.get_int_settings(session, "mainwindow_width")
width = record.f_int or 1599
record = Settings.get_int(session, "mainwindow_height")
record = Settings.get_int_settings(session, "mainwindow_height")
height = record.f_int or 981
self.setGeometry(x, y, width, height)
@staticmethod
def check_audacity():
def check_audacity() -> None:
"""Offer to run Audacity if not running"""
if not Config.CHECK_AUDACITY_AT_STARTUP:
@ -113,10 +115,12 @@ class Window(QMainWindow, Ui_MainWindow):
QProcess.startDetached(Config.AUDACITY_COMMAND, [])
def clear_selection(self):
""" Clear selected row"""
if self.visible_playlist_tab():
self.visible_playlist_tab().clearSelection()
def closeEvent(self, event):
def closeEvent(self, event: QEvent) -> None:
"""Don't allow window to close when a track is playing"""
if self.music.playing():
@ -129,32 +133,32 @@ class Window(QMainWindow, Ui_MainWindow):
DEBUG("closeEvent() accepted")
with Session() as session:
record = Settings.get_int(session, "mainwindow_height")
record = Settings.get_int_settings(session, "mainwindow_height")
if record.f_int != self.height():
record.update(session, {'f_int': self.height()})
record = Settings.get_int(session, "mainwindow_width")
record = Settings.get_int_settings(session, "mainwindow_width")
if record.f_int != self.width():
record.update(session, {'f_int': self.width()})
record = Settings.get_int(session, "mainwindow_x")
record = Settings.get_int_settings(session, "mainwindow_x")
if record.f_int != self.x():
record.update(session, {'f_int': self.x()})
record = Settings.get_int(session, "mainwindow_y")
record = Settings.get_int_settings(session, "mainwindow_y")
if record.f_int != self.y():
record.update(session, {'f_int': self.y()})
# Find a playlist tab (as opposed to an info tab) and
# save column widths
if self.current_track_playlist_tab:
self.current_track_playlist_tab.close(session)
self.current_track_playlist_tab.close()
elif self.next_track_playlist_tab:
self.next_track_playlist_tab.close(session)
self.next_track_playlist_tab.close()
event.accept()
def connect_signals_slots(self):
def connect_signals_slots(self) -> None:
self.actionAdd_file.triggered.connect(self.add_file)
self.actionAdd_note.triggered.connect(self.create_note)
self.action_Clear_selection.triggered.connect(self.clear_selection)
@ -189,7 +193,7 @@ class Window(QMainWindow, Ui_MainWindow):
self.timer.timeout.connect(self.tick)
def create_playlist(self):
def create_playlist(self) -> None:
"""Create new playlist"""
dlg = QInputDialog(self)
@ -202,17 +206,23 @@ class Window(QMainWindow, Ui_MainWindow):
playlist = Playlists(session, dlg.textValue())
self.create_playlist_tab(session, playlist)
def change_volume(self, volume):
def change_volume(self, volume: int) -> None:
"""Change player maximum volume"""
DEBUG(f"change_volume({volume})")
self.music.set_volume(volume)
def close_playlist_tab(self):
def close_playlist_tab(self) -> None:
"""Close active playlist tab"""
self.close_tab(self.tabPlaylist.currentIndex())
def close_tab(self, index):
def close_tab(self, index: int) -> None:
"""
Close tab unless it holds the curren or next track
"""
if hasattr(self.tabPlaylist.widget(index), 'playlist'):
if self.tabPlaylist.widget(index) == (
self.current_track_playlist_tab):
@ -230,7 +240,7 @@ class Window(QMainWindow, Ui_MainWindow):
# Close regardless of tab type
self.tabPlaylist.removeTab(index)
def create_note(self):
def create_note(self) -> None:
"""Call playlist to create note"""
try:
@ -239,49 +249,73 @@ class Window(QMainWindow, Ui_MainWindow):
# Just return if there's no visible playlist tab
return
def disable_play_next_controls(self):
def disable_play_next_controls(self) -> None:
"""
Disable "play next" keyboard controls
"""
DEBUG("disable_play_next_controls()")
self.actionPlay_next.setEnabled(False)
self.statusbar.showMessage("Play controls: Disabled", 0)
def enable_play_next_controls(self):
def enable_play_next_controls(self) -> None:
"""
Enable "play next" keyboard controls
"""
DEBUG("enable_play_next_controls()")
self.actionPlay_next.setEnabled(True)
self.statusbar.showMessage("Play controls: Enabled", 0)
def end_of_track_actions(self):
"""Clean up after track played"""
def end_of_track_actions(self) -> None:
"""
Clean up after track played
# Set self.playing to False so that tick() doesn't see
# player=None and kick off end-of-track actions
Actions required:
- Set flag to say we're not playing a track
- Reset current track
- Tell playlist_tab track has finished
- Reset current playlist_tab
- Reset clocks
- Update headers
- Enable controls
"""
# Set flag to say we're not playing a track so that tick()
# doesn't see player=None and kick off end-of-track actions
self.playing = False
# Clean up metadata
# Reset current track
if self.current_track:
self.previous_track = self.current_track
self.current_track = None
# Tell playlist_tab track has finished and
# reset current playlist_tab
if self.current_track_playlist_tab:
self.current_track_playlist_tab.play_stopped()
self.current_track_playlist_tab = None
# Clean up display
# Reset clocks
self.frame_fade.setStyleSheet("")
self.label_silent_timer.setText("00:00")
self.frame_silent.setStyleSheet("")
self.label_end_timer.setText("00:00")
# Update headers
self.update_headers()
# Enable controls
self.enable_play_next_controls()
def export_playlist_tab(self):
def export_playlist_tab(self) -> None:
"""Export the current playlist to an m3u file"""
if not self.visible_playlist_tab():
return
# Get output filename
pathspec = QFileDialog.getSaveFileName(
pathspec: Tuple[str, str] = QFileDialog.getSaveFileName(
self, 'Save Playlist',
directory=f"{self.visible_playlist_tab().name}.m3u",
filter="M3U files (*.m3u);;All files (*.*)"
@ -289,7 +323,7 @@ class Window(QMainWindow, Ui_MainWindow):
if not pathspec:
return
path = pathspec[0]
path: str = pathspec[0]
if not path.endswith(".m3u"):
path += ".m3u"
@ -307,7 +341,7 @@ class Window(QMainWindow, Ui_MainWindow):
"\n"
)
def fade(self):
def fade(self) -> None:
"""Fade currently playing track"""
DEBUG("musicmuster:fade()", True)
@ -340,18 +374,19 @@ class Window(QMainWindow, Ui_MainWindow):
for playlist in Playlists.get_open(session):
self.create_playlist_tab(session, playlist)
def create_playlist_tab(self, session, playlist):
def create_playlist_tab(self, session: Session,
playlist: Playlists) -> None:
"""
Take the passed playlist database object, create a playlist tab and
add tab to display.
"""
playlist_tab = PlaylistTab(parent=self,
session=session, playlist=playlist)
idx = self.tabPlaylist.addTab(playlist_tab, playlist.name)
playlist_tab: PlaylistTab = PlaylistTab(
parent=self, session=session, playlist=playlist)
idx: int = self.tabPlaylist.addTab(playlist_tab, playlist.name)
self.tabPlaylist.setCurrentIndex(idx)
def move_selected(self):
def move_selected(self) -> None:
"""Move selected rows to another playlist"""
with Session() as session:
@ -396,12 +431,14 @@ class Window(QMainWindow, Ui_MainWindow):
# Update source playlist
self.visible_playlist_tab().remove_rows(rows)
def open_info_tab(self, title_list):
def open_info_tabs(self) -> None:
"""
Ensure we have info tabs for each of the passed titles
Called from update_headers
Ensure we have info tabs for next and current track titles
"""
title_list: List[str, str] = [self.previous_track.title,
self.current_track.title]
for title in title_list:
if title in self.info_tabs.keys():
# We already have a tab for this track
@ -433,19 +470,24 @@ class Window(QMainWindow, Ui_MainWindow):
url = Config.INFO_TAB_URL % txt
widget.setUrl(QUrl(url))
def play_next(self):
def play_next(self) -> None:
"""
Play next track.
If there is no next track set, return.
If there's currently a track playing, fade it.
Move next track to current track.
Play (new) current.
Tell playlist to update "current track" metadata. This will also
trigger a call to
Cue up next track in playlist if there is one.
Tell database to record it as played
Update metadata and headers, and repaint
Actions required:
- If there is no next track set, return.
- If there's currently a track playing, fade it.
- Move next track to current track.
- Update record of current track playlist_tab
- If current track on different playlist_tab to last, reset
last track playlist_tab colour
- Set current track playlist_tab colour
- Play (new) current track.
- Tell database to record it as played
- Tell playlist track is now playing
- Disable play next controls
- Update headers
- Update clocks
"""
DEBUG(
@ -462,43 +504,63 @@ class Window(QMainWindow, Ui_MainWindow):
return
with Session() as session:
# Stop current track, if any
self.stop_playing()
# If there's currently a track playing, fade it.
self.stop_playing(fade=True)
# Play next track
# Move next track to current track.
self.current_track = self.next_track
self.next_track = None
# If current track on different playlist_tab to last, reset
# last track playlist_tab colour
# Set current track playlist_tab colour
if self.current_track_playlist_tab != self.next_track_playlist_tab:
self.set_tab_colour(self.current_track_playlist_tab,
QColor(Config.COLOUR_NORMAL_TAB))
# Update record of current track playlist_tab
self.current_track_playlist_tab = self.next_track_playlist_tab
self.next_track_playlist_tab = None
# Set current track playlist_tab colour
self.set_tab_colour(self.current_track_playlist_tab,
QColor(Config.COLOUR_CURRENT_TAB))
self.music.play(self.current_track.path)
# Tell playlist so it can update its display
self.current_track_playlist_tab.play_started()
# Play (new) current track
self.music.play(self.current_track.path)
# Tell database to record it as played
Playdates(session, self.current_track)
# Tell playlist track is now playing
self.current_track_playlist_tab.play_started()
# Disable play next controls
self.disable_play_next_controls()
# Update headers
self.update_headers()
# Set time clocks
# Update clocks
now = datetime.now()
self.label_start_tod.setText(now.strftime("%H:%M:%S"))
silence_at = self.current_track.silence_at
silence_time = now + timedelta(milliseconds=silence_at)
self.label_silent_tod.setText(silence_time.strftime("%H:%M:%S"))
self.label_fade_length.setText(helpers.ms_to_mmss(
silence_at - self.current_track.fade_at
))
self.label_fade_length.setText(
helpers.ms_to_mmss(silence_at - self.current_track.fade_at)
)
def search_database(self) -> None:
"""Show dialog box to select and cue track from database"""
def search_database(self):
with Session() as session:
dlg = DbDialog(self, session)
dlg.exec()
def open_playlist(self):
def open_playlist(self) -> None:
"""Select and activate existing playlist"""
with Session() as session:
playlists = Playlists.get_closed(session)
dlg = SelectPlaylistDialog(self, playlists=playlists)
@ -507,35 +569,35 @@ class Window(QMainWindow, Ui_MainWindow):
playlist = Playlists.get_by_id(session, dlg.plid)
self.create_playlist_tab(session, playlist)
def select_next_row(self):
def select_next_row(self) -> None:
"""Select next or first row in playlist"""
self.visible_playlist_tab().select_next_row()
def select_played(self):
def select_played(self) -> None:
"""Select all played tracks in playlist"""
self.visible_playlist_tab().select_played_tracks()
def select_previous_row(self):
def select_previous_row(self) -> None:
"""Select previous or first row in playlist"""
self.visible_playlist_tab().select_previous_row()
def select_unplayed(self):
def select_unplayed(self) -> None:
"""Select all unplayed tracks in playlist"""
self.visible_playlist_tab().select_unplayed_tracks()
def set_tab_colour(self, widget, colour):
def set_tab_colour(self, widget, colour) -> None:
"""
Find the tab containing the widget and set the text colour
"""
idx = self.tabPlaylist.indexOf(widget)
idx: int = self.tabPlaylist.indexOf(widget)
self.tabPlaylist.tabBar().setTabTextColor(idx, colour)
def song_info_search(self):
def song_info_search(self) -> None:
"""
Open browser tab for Wikipedia, searching for
the first that exists of:
@ -544,7 +606,7 @@ class Window(QMainWindow, Ui_MainWindow):
- current track
"""
title = self.visible_playlist_tab().get_selected_title()
title: Optional[str] = self.visible_playlist_tab().get_selected_title()
if not title:
if self.next_track:
title = self.next_track.title
@ -552,36 +614,36 @@ class Window(QMainWindow, Ui_MainWindow):
if self.current_track:
title = self.current_track.title
if title:
# Wikipedia
txt = urllib.parse.quote_plus(title)
url = Config.INFO_TAB_URL % txt
webbrowser.open(url, new=2)
def stop(self):
def stop(self) -> None:
"""Stop playing immediately"""
DEBUG("musicmuster.stop()")
self.stop_playing(fade=False)
def stop_playing(self, fade=True):
"""Stop playing current track"""
def stop_playing(self, fade=True) -> None:
"""
Stop playing current track
Actions required:
- Return if not playing
- Stop/fade track
- Reset playlist_tab colour
- Run end-of-track actions
"""
DEBUG(f"musicmuster.stop_playing({fade=})", True)
# Set tab colour
if self.current_track_playlist_tab == self.next_track_playlist_tab:
self.set_tab_colour(self.current_track_playlist_tab,
QColor(Config.COLOUR_NEXT_TAB))
else:
self.set_tab_colour(self.current_track_playlist_tab,
QColor(Config.COLOUR_NORMAL_TAB))
# Return if not playing
if not self.music.playing():
DEBUG("musicmuster.stop_playing(): not playing", True)
self.end_of_track_actions()
return
# Stop/fade track
self.previous_track_position = self.music.get_position()
if fade:
DEBUG("musicmuster.stop_playing(): fading music", True)
@ -590,35 +652,51 @@ class Window(QMainWindow, Ui_MainWindow):
DEBUG("musicmuster.stop_playing(): stopping music", True)
self.music.stop()
# Reset playlist_tab colour
if self.current_track_playlist_tab == self.next_track_playlist_tab:
self.set_tab_colour(self.current_track_playlist_tab,
QColor(Config.COLOUR_NEXT_TAB))
else:
self.set_tab_colour(self.current_track_playlist_tab,
QColor(Config.COLOUR_NORMAL_TAB))
# Run end-of-track actions
self.end_of_track_actions()
# Release player
self.music.stop()
self.update_headers()
def this_is_the_next_track(self, playlist_tab, track):
def this_is_the_next_track(self, playlist_tab: PlaylistTab,
track: Tracks) -> None:
"""
This is notification from a playlist tab that it holds the next
track to be played.
Actions required:
- Clear next track if on other tab
- Reset tab colour if on other tab
- Note next playlist tab
- Set next playlist_tab tab colour
- Note next track
- Update headers
- Populate info tabs
"""
# The next track has been selected on the playlist_tab
# playlist. However, there may already be a 'next track'
# selected on another playlist that the user is overriding,
# in which case we need to reset that playlist.
# Clear next track if on another tab
if self.next_track_playlist_tab != playlist_tab:
# We need to reset the ex-next-track playlist
if self.next_track_playlist_tab:
self.next_track_playlist_tab.clear_next()
# Reset tab colour if it NOT the current playing tab
# Reset tab colour if on other tab
if (self.next_track_playlist_tab !=
self.current_track_playlist_tab):
self.set_tab_colour(
self.next_track_playlist_tab,
QColor(Config.COLOUR_NORMAL_TAB))
# Note next playlist tab
self.next_track_playlist_tab = playlist_tab
# self.next_track_playlist_tab is now set to correct playlist
# Set the colour of the next playlist tab if it isn't the
# Set next playlist_tab tab colour if it isn't the
# currently-playing tab
if (self.next_track_playlist_tab !=
self.current_track_playlist_tab):
@ -626,13 +704,18 @@ class Window(QMainWindow, Ui_MainWindow):
self.next_track_playlist_tab,
QColor(Config.COLOUR_NEXT_TAB))
# Note next track
self.next_track = track
# Update headers
self.update_headers()
def tick(self):
# Populate 'info' tabs
self.open_info_tabs()
def tick(self) -> None:
"""
Update screen
Carry out clock tick actions.
The Time of Day clock is updated every tick (500ms).
@ -640,22 +723,27 @@ class Window(QMainWindow, Ui_MainWindow):
one-second resolution, updating every 500ms can result in some
timers updating and then, 500ms later, other timers updating. That
looks odd.
Actions required:
- Update TOD clock
- If track is playing, update track clocks time and colours
- Else: run stop_track
"""
now = datetime.now()
self.lblTOD.setText(now.strftime("%H:%M:%S"))
# Update TOD clock
self.lblTOD.setText(datetime.now().strftime(Config.TOD_TIME_FORMAT))
self.even_tick = not self.even_tick
if not self.even_tick:
return
# If track is playing, update track clocks time and colours
if self.music.player and self.music.playing():
self.playing = True
playtime = self.music.get_playtime()
time_to_fade = (self.current_track.fade_at - playtime)
time_to_silence = (self.current_track.silence_at - playtime)
time_to_end = (self.current_track.duration - playtime)
playtime: int = self.music.get_playtime()
time_to_fade: int = (self.current_track.fade_at - playtime)
time_to_silence: int = (self.current_track.silence_at - playtime)
time_to_end: int = (self.current_track.duration - playtime)
# Elapsed time
if time_to_end < 500:
@ -702,19 +790,13 @@ class Window(QMainWindow, Ui_MainWindow):
if self.playing:
self.stop_playing()
def update_headers(self):
"""
Update last / current / next track headers.
Ensure a Wikipedia tab for each title.
"""
titles = []
def update_headers(self) -> None:
"""Update last / current / next track headers"""
try:
self.hdrPreviousTrack.setText(
f"{self.previous_track.title} - {self.previous_track.artist}"
)
titles.append(self.previous_track.title)
except AttributeError:
self.hdrPreviousTrack.setText("")
@ -722,7 +804,6 @@ class Window(QMainWindow, Ui_MainWindow):
self.hdrCurrentTrack.setText(
f"{self.current_track.title} - {self.current_track.artist}"
)
titles.append(self.current_track.title)
except AttributeError:
self.hdrCurrentTrack.setText("")
@ -730,17 +811,14 @@ class Window(QMainWindow, Ui_MainWindow):
self.hdrNextTrack.setText(
f"{self.next_track.title} - {self.next_track.artist}"
)
titles.append(self.next_track.title)
except AttributeError:
self.hdrNextTrack.setText("")
self.open_info_tab(titles)
class DbDialog(QDialog):
"""Select track from database"""
def __init__(self, parent, session):
def __init__(self, parent, session): # review
super().__init__(parent)
self.session = session
self.ui = Ui_Dialog()
@ -753,22 +831,22 @@ class DbDialog(QDialog):
self.ui.radioTitle.toggled.connect(self.title_artist_toggle)
self.ui.searchString.textEdited.connect(self.chars_typed)
record = Settings.get_int(self.session, "dbdialog_width")
record = Settings.get_int_settings(self.session, "dbdialog_width")
width = record.f_int or 800
record = Settings.get_int(self.session, "dbdialog_height")
record = Settings.get_int_settings(self.session, "dbdialog_height")
height = record.f_int or 600
self.resize(width, height)
def __del__(self):
record = Settings.get_int(self.session, "dbdialog_height")
def __del__(self): # review
record = Settings.get_int_settings(self.session, "dbdialog_height")
if record.f_int != self.height():
record.update(self.session, {'f_int': self.height()})
record = Settings.get_int(self.session, "dbdialog_width")
record = Settings.get_int_settings(self.session, "dbdialog_width")
if record.f_int != self.width():
record.update(self.session, {'f_int': self.width()})
def add_selected(self):
def add_selected(self): # review
if not self.ui.matchList.selectedItems():
return
@ -776,11 +854,11 @@ class DbDialog(QDialog):
track = item.data(Qt.UserRole)
self.add_track(track)
def add_selected_and_close(self):
def add_selected_and_close(self): # review
self.add_selected()
self.close()
def title_artist_toggle(self):
def title_artist_toggle(self): # review
"""
Handle switching between searching for artists and searching for
titles
@ -789,7 +867,7 @@ class DbDialog(QDialog):
# Logic is handled already in chars_typed(), so just call that.
self.chars_typed(self.ui.searchString.text())
def chars_typed(self, s):
def chars_typed(self, s): # review
if len(s) > 0:
if self.ui.radioTitle.isChecked():
matches = Tracks.search_titles(self.session, s)
@ -806,24 +884,24 @@ class DbDialog(QDialog):
t.setData(Qt.UserRole, track)
self.ui.matchList.addItem(t)
def double_click(self, entry):
def double_click(self, entry): # review
track = entry.data(Qt.UserRole)
self.add_track(track)
# Select search text to make it easier for next search
self.select_searchtext()
def add_track(self, track):
def add_track(self, track): # review
# Add to playlist on screen
self.parent().visible_playlist_tab().insert_track(
self.session, track)
# Select search text to make it easier for next search
self.select_searchtext()
def select_searchtext(self):
def select_searchtext(self): # review
self.ui.searchString.selectAll()
self.ui.searchString.setFocus()
def selection_changed(self):
def selection_changed(self): # review
if not self.ui.matchList.selectedItems():
return
@ -833,7 +911,7 @@ class DbDialog(QDialog):
class SelectPlaylistDialog(QDialog):
def __init__(self, parent=None, playlists=None):
def __init__(self, parent=None, playlists=None): # review
super().__init__(parent)
if playlists is None:
@ -846,9 +924,9 @@ class SelectPlaylistDialog(QDialog):
self.plid = None
with Session() as session:
record = Settings.get_int(session, "select_playlist_dialog_width")
record = Settings.get_int_settings(session, "select_playlist_dialog_width")
width = record.f_int or 800
record = Settings.get_int(session, "select_playlist_dialog_height")
record = Settings.get_int_settings(session, "select_playlist_dialog_height")
height = record.f_int or 600
self.resize(width, height)
@ -858,21 +936,21 @@ class SelectPlaylistDialog(QDialog):
p.setData(Qt.UserRole, plid)
self.ui.lstPlaylists.addItem(p)
def __del__(self):
def __del__(self): # review
with Session() as session:
record = Settings.get_int(session, "select_playlist_dialog_height")
record = Settings.get_int_settings(session, "select_playlist_dialog_height")
if record.f_int != self.height():
record.update(session, {'f_int': self.height()})
record = Settings.get_int(session, "select_playlist_dialog_width")
record = Settings.get_int_settings(session, "select_playlist_dialog_width")
if record.f_int != self.width():
record.update(session, {'f_int': self.width()})
def list_doubleclick(self, entry):
def list_doubleclick(self, entry): # review
self.plid = entry.data(Qt.UserRole)
self.accept()
def open(self):
def open(self): # review
if self.ui.lstPlaylists.selectedItems():
item = self.ui.lstPlaylists.currentItem()
self.plid = item.data(Qt.UserRole)

File diff suppressed because it is too large Load Diff

View File

@ -18,6 +18,7 @@ PyQtWebEngine = "^5.15.5"
pydub = "^0.25.1"
PyQt5-sip = "^12.9.1"
mypy = "^0.931"
sqlalchemy-stubs = "^0.4"
[tool.poetry.dev-dependencies]
mypy = "^0.931"

View File

@ -340,7 +340,7 @@ def test_playlisttracks_move_track(session):
assert len(tracks1) == 1
assert len(tracks2) == 1
assert tracks1[track1_row] == track1
assert tracks2[track2_row] == track2
assert tracks2[0] == track2
def test_tracks_get_all_paths(session):