urma/app/urma.py

312 lines
9.3 KiB
Python
Executable File

#! /usr/bin/env python
import os
import pickle
import random
import sys
from typing import List, Optional

import ipdb
import stackprinter
from mastodon import Mastodon
from PyQt5.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
)

from config import Config
from dbconfig import engine, Session, scoped_session
from helpers import (
    index_ojects_by_parameter,
    send_mail,
)
from log import log
from models import (
    Accounts,
    Attachments,
    Base,
    Hashtags,
    Posts,
    PostTags,
)
from ui.main_window_ui import Ui_MainWindow  # type: ignore
# Path to a pickled home timeline used as development data. In this file
# it is only read by the commented-out loader further down.
TESTDATA = "/home/kae/git/urma/hometl.pickle"
# Mastodon.create_app(
# 'urma',
# api_base_url='mastodon.org.uk',
# to_file='urma_clientcred.secret'
# )
# API_BASE_URL = 'mastodon.org.uk'
# mastodon = Mastodon(client_id = 'urma_clientcred.secret',)
# NOTE: never keep real login details in source; substitute them only when
# running this one-off login by hand.
# mastodon.log_in('<email>', '<password>',
#                 to_file='urma_usercred.secret')
class MastodonAPI:
    """
    Small wrapper around a Mastodon client created from an access token
    """

    def __init__(self, access_token: str) -> None:
        """
        Create the Mastodon client and store the result of its me() call
        """

        client = Mastodon(access_token=access_token)
        self.mastodon = client
        self.me = client.me()

    def _all_pages(self, first_page):
        """
        Hand the first page of a result to fetch_remaining() and return
        whatever that gives back
        """

        return self.mastodon.fetch_remaining(first_page)

    def get_account_following(self):
        """
        Return a list of account_dicts that we are following
        """

        return self._all_pages(self.mastodon.account_following(self.me.id))

    def get_hashtag_following(self):
        """
        Return a list of hashtag_dicts that we are following
        """

        # NOTE(review): confirm that the installed Mastodon.py really
        # provides tag_following() and that it takes an account id
        return self._all_pages(self.mastodon.tag_following(self.me.id))
class UnratedPosts:
    """
    Return unrated posts one at a time

    The posts are fetched once, when the object is created. Stepping past
    either end returns None and forgets the current position, so the next
    call to next() starts again at the first post and the next call to
    prev() starts again at the last one.
    """

    def __init__(self, session: "Session") -> None:
        """
        Fetch the unrated posts using the passed session
        """

        self.dataset = Posts.get_unrated_posts(session)
        # Index of the post most recently returned; None before the first
        # call and after stepping off either end of the dataset
        self.pointer: Optional[int] = None

    def next(self) -> Optional["Posts"]:
        """
        Return the post after the current one, or None when there is no
        such post
        """

        # Start at the first record if there is no current position
        if self.pointer is None:
            return self._move_to(0)
        return self._move_to(self.pointer + 1)

    def prev(self) -> Optional["Posts"]:
        """
        Return the post before the current one, or None when there is no
        such post
        """

        # Start at the last record if there is no current position
        if self.pointer is None:
            return self._move_to(len(self.dataset) - 1)
        return self._move_to(self.pointer - 1)

    def _move_to(self, index: int) -> Optional["Posts"]:
        """
        Make index the current position and return the post there; if
        index is outside the dataset, forget the position and return None
        """

        if 0 <= index < len(self.dataset):
            self.pointer = index
            return self.dataset[index]

        # We've stepped off the start or the end of the dataset
        self.pointer = None
        return None
class Window(QMainWindow, Ui_MainWindow):
    """
    Main window: steps through unrated posts and, when created, brings the
    followed flags on our accounts and hashtags into line with Mastodon
    """

    def __init__(self, parent=None) -> None:
        """
        Build the UI, connect the Next/Prev buttons, show the first
        unrated post and update followed accounts and hashtags
        """

        super().__init__(parent)
        self.setupUi(self)

        self.mastapi = MastodonAPI(Config.ACCESS_TOKEN)

        self.btnNext.clicked.connect(self.next)
        self.btnPrev.clicked.connect(self.prev)

        with Session() as session:
            self.dataset = UnratedPosts(session)
            record = self.dataset.next()
            self.display(session, record)

            self.update_followed_accounts(session)
            self.update_followed_hashtags(session)

    def display(self, session: "Session", record: Optional["Posts"]) -> None:
        """
        Display passed post

        Does nothing if record is None, which is what UnratedPosts returns
        when stepping past either end of its dataset.
        """

        if not record:
            return

        # The dataset is loaded in one session but next()/prev() display it
        # from a fresh one, so attach the record before reading from it
        if record not in session:
            session.add(record)

        self.txtUsername.setText(record.account.username)

        # NOTE(review): txtBoosted is enabled for boosts but never disabled
        # again for ordinary posts - confirm whether that is intended
        if record.reblog:
            self.txtBoosted.setEnabled(True)
            # TODO: nothing is written to txtPost for this case, so it keeps
            # the previous post's text. The code here used to be a bare
            # `self.display` expression, which has no effect.
        elif record.child_id:
            self.txtBoosted.setEnabled(True)
            self.txtPost.setText("reblog child")
        else:
            self.txtPost.setHtml(record.content)

        followed_hashtags = [a.name for a in record.hashtags if a.followed]
        unfollowed_hashtags = [
            a.name for a in record.hashtags if not a.followed]
        hashtag_text = (
            f"Followed: {', '.join(followed_hashtags)}\n\n"
            f"Unfollowed: {', '.join(unfollowed_hashtags)}\n\n"
        )
        self.txtHashtags.setText(hashtag_text)

    def next(self) -> None:
        """
        Display the next unrated post
        """

        with Session() as session:
            record = self.dataset.next()
            self.display(session, record)

    def prev(self) -> None:
        """
        Display the previous unrated post
        """

        with Session() as session:
            record = self.dataset.prev()
            self.display(session, record)

    def update_followed_accounts(self, session: "Session") -> None:
        """
        Retrieve list of followed accounts and update accounts
        in database to match
        """

        # NOTE(review): accounts are matched on username alone; confirm
        # that this cannot collide for accounts on different servers, and
        # that records created below end up with a username
        mast_followed_accounts = self.mastapi.get_account_following()
        mast_followed_accounts_d = index_ojects_by_parameter(
            mast_followed_accounts, "username")

        our_followed_accounts = Accounts.get_followed(session)
        our_followed_accounts_d = index_ojects_by_parameter(
            our_followed_accounts, "username")

        mast_usernames = set(mast_followed_accounts_d.keys())
        our_usernames = set(our_followed_accounts_d.keys())

        # Add those we are missing
        for username in mast_usernames - our_usernames:
            account = Accounts.get_or_create(
                session, str(mast_followed_accounts_d[username].id)
            )
            account.followed = True

        # Remove any we no longer follow
        for username in our_usernames - mast_usernames:
            account = Accounts.get_or_create(
                session, str(our_followed_accounts_d[username].account_id)
            )
            account.followed = False

    def update_followed_hashtags(self, session: "Session") -> None:
        """
        Retrieve list of followed hashtags and update hashtags
        in database to match
        """

        mast_followed_hashtags = self.mastapi.get_hashtag_following()
        mast_followed_hashtags_d = index_ojects_by_parameter(
            mast_followed_hashtags, "name")

        our_followed_hashtags = Hashtags.get_followed(session)
        our_followed_hashtags_d = index_ojects_by_parameter(
            our_followed_hashtags, "name")

        mast_names = set(mast_followed_hashtags_d.keys())
        our_names = set(our_followed_hashtags_d.keys())

        # Add those we are missing
        for name in mast_names - our_names:
            hashtag = Hashtags.get_or_create(
                session, name, mast_followed_hashtags_d[name].url)
            hashtag.followed = True

        # Remove any we no longer follow. These records are the ones
        # Hashtags.get_followed(session) returned, so flag them directly
        # rather than looking each one up again.
        for name in our_names - mast_names:
            our_followed_hashtags_d[name].followed = False
# class HoldingPot:
# def process_post(post):
# rec = Posts.get_or_create(session, str(post.id))
# if rec.account_id is not None:
# # We already have this post
# return
#
# # Create account record if needed
# account_rec = Accounts.get_or_create(session, str(post.account.id))
# if account_rec.username is None:
# account_rec.username = post.account.username
# account_rec.acct = post.account.acct
# account_rec.display_name = post.account.display_name
# account_rec.bot = post.account.bot
# account_rec.url = post.account.url
# rec.account_id = account_rec.id
#
# # Create hashtag records as needed
# for tag in post.tags:
# hashtag = Hashtags.get_or_create(session, tag.name, tag.url)
# rec.hashtags.append(hashtag)
#
# # Handle media
# for media in post.media_attachments:
# media_rec = Attachments.get_or_create(session,
# str(media.id), rec.id)
# if not media_rec.type:
# media_rec.type = media.type
# media_rec.url = media.url
# media_rec.preview_url = media.preview_url
# media_rec.description = media.description
#
# rec.account_id = account_rec.id
# rec.created_at = post.created_at
# rec.uri = post.uri
# rec.url = post.url
# rec.content = post.content
#
# if post.reblog:
# rec.child_id = process_post(post.reblog).id
#
# return rec
#
# # Data for development
# with open(TESTDATA, "rb") as inp:
# hometl = pickle.load(inp)
#
# with Session() as session:
# for post in hometl:
# process_post(post)
if __name__ == "__main__":
    # Create any missing database tables, then run the Qt application.
    # An unhandled exception is always printed with stackprinter and,
    # unless URMA_ENV is "DEVELOPMENT", also mailed to Config.ERRORS_TO.
    try:
        Base.metadata.create_all(engine)
        app = QApplication(sys.argv)
        win = Window()
        win.show()
        sys.exit(app.exec())
    except Exception as exc:
        # Use .get(): with URMA_ENV unset, indexing os.environ would raise
        # KeyError here and hide the exception we are trying to report
        if os.environ.get("URMA_ENV") != "DEVELOPMENT":
            msg = stackprinter.format(exc)
            send_mail(Config.ERRORS_TO, Config.ERRORS_FROM,
                      "Exception from urma", msg)

        print("\033[1;31;47mUnhandled exception starts")
        stackprinter.show(style="darkbg")
        print("Unhandled exception ends\033[1;37;40m")