urma/app/urma.py
2023-01-21 23:37:16 +00:00

515 lines
16 KiB
Python
Executable File

#! /usr/bin/env python
import argparse
import datetime
import ipdb
import os
import pickle
import random
import requests
import stackprinter
import subprocess
import sys
from config import Config
from dbconfig import (
engine,
Session,
scoped_session,
)
from helpers import (
index_ojects_by_parameter,
send_mail,
)
from log import log
from mastodon import Mastodon
from models import (
Accounts,
Base,
Hashtags,
Posts,
PostTags,
)
from sqlalchemy import (
func,
select,
)
from typing import List, Optional, Union
# TESTDATA = "/home/kae/git/urma/hometl.pickle"
#
# Mastodon.create_app(
# 'urma',
# api_base_url='mastodon.org.uk',
# to_file='urma_clientcred.secret'
# )
# API_BASE_URL = 'mastodon.org.uk'
# mastodon = Mastodon(client_id = 'urma_clientcred.secret',)
# NOTE: never keep real credentials in source; supply them at run time.
# mastodon.log_in('kae@midnighthax.com', '<password>',
# to_file='urma_usercred.secret')
class MastodonAPI:
    """
    Small convenience wrapper around a Mastodon client object
    """

    def __init__(self, access_token: str) -> None:
        """
        Build the Mastodon client from 'access_token' and remember the
        account that me() reports for it
        """

        client = Mastodon(access_token=access_token)
        self.mastodon = client
        self.me = client.me()

    def get_account_following(self):
        """
        Request the accounts followed by self.me and return whatever
        fetch_remaining() produces from that first page
        """

        first_page = self.mastodon.account_following(self.me.id)
        everything = self.mastodon.fetch_remaining(first_page)
        return everything

    def get_bookmarked(self, since: int) -> List[dict]:
        """
        Collect bookmarked posts page by page.

        Paging stops once a page contains an id lower than 'since', or
        when there are no more pages. Pages are always added whole, so
        the final page may include posts whose id is below 'since'.
        """

        collected = []
        page = self.mastodon.bookmarks()

        while page:
            collected += page
            # Stop paging as soon as this page reaches back past 'since'
            if any(item.id < since for item in page):
                break
            page = self.mastodon.fetch_next(page)

        return collected

    def get_hashtag_following(self):
        """
        Request the hashtags followed by self.me and return whatever
        fetch_remaining() produces from that first page
        """

        first_page = self.mastodon.tag_following(self.me.id)
        everything = self.mastodon.fetch_remaining(first_page)
        return everything

    def unbookmark(self, post_id: int) -> None:
        """
        Ask the server to drop the bookmark on post 'post_id'
        """

        log.debug(f"unbookmark({post_id=})")
        # The status returned by the client is not needed
        self.mastodon.status_unbookmark(post_id)
def update_database() -> None:
    """
    Refresh the database from Mastodon.

    Connects with Config.ACCESS_TOKEN, then within a single session
    runs, in order: followed accounts, followed hashtags, favourited
    posts, bookmarked posts.
    """

    mastapi = MastodonAPI(Config.ACCESS_TOKEN)

    # Order matters: follow state is refreshed before posts are processed
    steps = (
        update_followed_accounts,
        update_followed_hashtags,
        get_and_process_favourited,
        get_and_process_bookmarked,
    )

    with Session() as session:
        for step in steps:
            step(session, mastapi)
def get_and_process_bookmarked(session, mastapi):
    """
    Walk the bookmark pages and hand each page to
    process_bookmarked_posts().

    Stops when that function returns True, when there are no more
    pages, or when the running total of fetched posts is greater than
    Config.MAX_POSTS_TO_FETCH (a total exactly equal to the limit still
    allows one further page).
    """

    client = mastapi.mastodon
    fetched_so_far = 0
    page = client.bookmarks()

    while page:
        if fetched_so_far > Config.MAX_POSTS_TO_FETCH:
            break
        fetched_so_far += len(page)

        reached_known_post = process_bookmarked_posts(
            session, page, mastapi.me.id
        )
        if reached_known_post:
            return

        page = client.fetch_next(page)
def get_and_process_favourited(session, mastapi):
    """
    Walk the favourites pages and hand each page to
    process_favourited_posts().

    Stops when that function returns True, when there are no more
    pages, or when the running total of fetched posts is greater than
    Config.MAX_POSTS_TO_FETCH (a total exactly equal to the limit still
    allows one further page).
    """

    client = mastapi.mastodon
    fetched_so_far = 0
    page = client.favourites()

    while page:
        if fetched_so_far > Config.MAX_POSTS_TO_FETCH:
            break
        fetched_so_far += len(page)

        reached_known_post = process_favourited_posts(
            session, page, mastapi.me.id
        )
        if reached_known_post:
            return

        page = client.fetch_next(page)
def get_database_name():
    """
    Open a session and return the database name taken from the URL of
    the engine it is bound to
    """

    with Session() as session:
        return session.bind.engine.url.database
def get_version_string():
    """
    Return Urma version as string.

    The version is the output of 'git describe'. If that command exits
    with an error, the first seven characters of the HEAD commit hash
    are returned instead.
    """

    try:
        described = subprocess.check_output(
            ['git', 'describe'], stderr=subprocess.STDOUT
        )
    except subprocess.CalledProcessError:
        head = subprocess.run(
            ['git', 'rev-parse', 'HEAD'], stdout=subprocess.PIPE
        )
        return head.stdout.strip()[:7].decode("utf-8")

    # Decode the bytes properly. Stripping the characters of "b'...\n'"
    # from str(bytes) also removed any leading or trailing 'b' or 'n'
    # that belonged to the version itself.
    return described.decode("utf-8").strip()
def process_bookmarked_posts(session: Session,
                             posts: List[Posts], me_id: int) -> bool:
    """
    Store each post in 'posts' and flag its record as bookmarked.

    Records already flagged favourited are skipped. The first remaining
    record that is already flagged bookmarked ends processing.

    Return True if we ended for that reason, False if every post was
    handled.
    """

    for post in posts:
        record = _process_post(session, post, me_id)

        # A post that is both favourited and bookmarked is a genuine
        # bookmark, so it is left alone.
        if not record.favourited:
            if record.bookmarked:
                return True
            record.bookmarked = True
            # TODO: mastapi.unbookmark(int(post.id))

    return False
def process_favourited_posts(session: Session,
                             posts: List[Posts], me_id: int) -> bool:
    """
    Store each favourited post in 'posts' and flag its record as
    favourited.

    Posts not marked favourited are only logged. The first record that
    is already flagged favourited ends processing.

    Return True if we ended for that reason, False if every post was
    handled.
    """

    for post in posts:
        if not post.favourited:
            log.debug(
                f"process_favourited_posts({post.id=}) not favourited"
            )
            continue

        record = _process_post(session, post, me_id)
        if record.favourited:
            return True
        record.favourited = True

    return False
def _process_post(session: Session, post: Posts, me_id) -> Posts:
    """
    Make sure the passed post is in the database and return its record.

    The posting account gets a record too, filled in the first time it
    is seen. For a boost, the boosted post is stored instead and its
    record is returned.
    """

    log.debug(f"{post.id=} processing")

    # Make sure the posting account has a record
    log.debug(f"{post.id=} processing {post.account.id=}")
    account_rec = Accounts.get_or_create(session, str(post.account.id))
    if account_rec.username is None:
        log.debug(f"{post.id=} populating new account {post.account.id=}")
        for field in ("username", "acct", "display_name", "bot", "url"):
            setattr(account_rec, field, getattr(post.account, field))

    if post.reblog:
        # Only the boosted post is of interest, not the boost itself
        log.debug(f"{post.id=} {post.reblog.id=}")
        boosted_record = _process_post(session, post.reblog, me_id)
        # Remember who boosted the post, unless it was us
        boosted_by_us = post.account.id == me_id
        boosted_record.boosting_account_id = (
            None if boosted_by_us else account_rec.id
        )
        return boosted_record

    rec = Posts.get_or_create(session, str(post.id))
    if rec.account_id is not None:
        # A record with an account set means the post is already stored
        log.debug(f"{post.id=} already in db")
        return rec

    rec.account_id = account_rec.id

    # Attach a hashtag record for every tag on the post
    for tag in post.tags:
        log.debug(f"{post.id=} processing {tag.name=}")
        rec.hashtags.append(
            Hashtags.get_or_create(session, tag.name, tag.url)
        )

    rec.created_at = post.created_at
    rec.uri = post.uri

    return rec
def _top_hashtags(session, favourited: int, followed: int):
    """
    Return (Hashtags, post_count) rows, highest count first, for
    hashtags with the given 'followed' flag, counting only posts with
    the given 'favourited' flag. At most Config.TOP_HASHTAGS_TO_REPORT
    rows are returned.
    """

    stmt = (
        select(Hashtags, func.count(Hashtags.name))
        .join(PostTags).join(Posts)
        .where(Posts.favourited == favourited,
               Hashtags.followed == followed)
        .group_by(Hashtags.name)
        .order_by(func.count(Hashtags.name).desc())
        .limit(Config.TOP_HASHTAGS_TO_REPORT)
    )
    return session.execute(stmt).all()


def _count_hashtag_posts(session, hashtag_id, favourited: int) -> int:
    """
    Return the number of posts carrying hashtag 'hashtag_id' whose
    favourited flag equals 'favourited'
    """

    stmt = (
        select(func.count(Posts.id))
        .join(PostTags).join(Hashtags)
        .where(Posts.favourited == favourited, Hashtags.id == hashtag_id)
    )
    # An aggregate without GROUP BY yields exactly one row
    return session.execute(stmt).scalar_one()


def _top_accounts(session, favourited: int, followed: int):
    """
    Return (Accounts, post_count) rows, highest count first, for
    accounts with the given 'followed' flag, counting only posts with
    the given 'favourited' flag. At most Config.TOP_POSTS_TO_REPORT
    rows are returned.
    """

    # NOTE(review): rows are grouped by username, so accounts that share
    # a username are counted together. Confirm whether grouping by
    # Accounts.id is wanted.
    stmt = (
        select(Accounts, func.count(Accounts.username))
        .join(Posts)
        .where(Posts.favourited == favourited,
               Accounts.followed == followed)
        .group_by(Accounts.username)
        .order_by(func.count(Accounts.username).desc())
        .limit(Config.TOP_POSTS_TO_REPORT)
    )
    return session.execute(stmt).all()


def _count_account_posts(session, account_id, favourited: int) -> int:
    """
    Return the number of posts from account 'account_id' whose
    favourited flag equals 'favourited'
    """

    stmt = (
        select(func.count(Posts.id))
        .join(Accounts)
        .where(Posts.favourited == favourited, Accounts.id == account_id)
    )
    # An aggregate without GROUP BY yields exactly one row
    return session.execute(stmt).scalar_one()


def report():
    """
    Print report.

    After a header (version, database, date) four sections are
    printed: unfollowed hashtags in liked posts, followed hashtags in
    posts not liked, unfollowed users in liked posts, and followed
    users in posts not liked. Each line shows the like and dislike
    counts and the resulting percentage.
    """

    print(f"Urma version: {get_version_string()}")
    print(f"Database: {get_database_name()}")
    print(f"Date: {datetime.datetime.now().strftime('%c')}")
    print()

    with Session() as session:
        # Find the most popular hashtags that we don't follow
        print("Hashtags you don't follow that feature in posts you like")
        print("--------------------------------------------------------")
        for (hashtag, like) in _top_hashtags(session, 1, 0):
            # How many times was each hashtag in a post we didn't like?
            dislike = _count_hashtag_posts(session, hashtag.id, 0)
            print(
                f"Hashtag {hashtag.name} {like=}, {dislike=} "
                f"({like * 100 / (like + dislike):.2f}% liked)"
            )

        # Find the least popular hashtags that we do follow
        print()
        print("Hashtags you follow that feature in posts you don't like")
        print("--------------------------------------------------------")
        for (hashtag, dislike) in _top_hashtags(session, 0, 1):
            # How many times was each hashtag in a post we did like?
            like = _count_hashtag_posts(session, hashtag.id, 1)
            print(
                f"Hashtag {hashtag.name} {like=}, {dislike=} "
                f"({dislike * 100 / (like + dislike):.2f}% disliked)"
            )

        # Find the most popular users that we don't follow
        print()
        print("Users you don't follow that feature in posts you like")
        print("-----------------------------------------------------")
        for (user, like) in _top_accounts(session, 1, 0):
            # How many times was each user in a post we didn't like?
            dislike = _count_account_posts(session, user.id, 0)
            print(
                f"User {user.username} {like=}, {dislike=} "
                f"({like * 100 / (like + dislike):.2f}% liked)"
            )

        # Find the most unpopular users that we do follow
        print()
        print("Users you follow that feature in posts you don't like")
        print("-----------------------------------------------------")
        for (user, dislike) in _top_accounts(session, 0, 1):
            # How many times was each user in a post we did like?
            like = _count_account_posts(session, user.id, 1)
            print(
                f"User {user.username} {like=}, {dislike=} "
                f"({dislike * 100 / (like + dislike):.2f}% disliked)"
            )
def update_followed_accounts(session: Session, mastapi: MastodonAPI) -> None:
    """
    Retrieve list of followed accounts and update accounts
    in database to match.

    Accounts are matched on the Mastodon account id (held as a string
    in our 'account_id'), the same key Accounts.get_or_create() takes.
    Usernames are not used for matching: they are not guaranteed to be
    unique, and records created here have no username yet.
    """

    mast_followed_ids = {
        str(account.id) for account in mastapi.get_account_following()
    }
    our_followed_by_id = {
        str(record.account_id): record
        for record in Accounts.get_followed(session)
    }

    # Add those we are missing
    for account_id in mast_followed_ids - our_followed_by_id.keys():
        account = Accounts.get_or_create(session, account_id)
        account.followed = True

    # Remove any we no longer follow; the record is already loaded
    for account_id in our_followed_by_id.keys() - mast_followed_ids:
        our_followed_by_id[account_id].followed = False
def update_followed_hashtags(session: Session, mastapi: MastodonAPI) -> None:
    """
    Retrieve list of followed hashtags and update hashtags
    in database to match.

    Hashtags are matched by name. Names followed on Mastodon but not
    flagged here are created if needed and flagged followed; names
    flagged here but no longer followed on Mastodon are unflagged.
    """

    # presumably index_ojects_by_parameter returns a dict keyed by the
    # named attribute; verify against helpers
    mast_followed_hashtags_d = index_ojects_by_parameter(
        mastapi.get_hashtag_following(), "name")
    our_followed_hashtags_d = index_ojects_by_parameter(
        Hashtags.get_followed(session), "name")

    mast_names = set(mast_followed_hashtags_d.keys())
    our_names = set(our_followed_hashtags_d.keys())

    # Add those we are missing
    for name in mast_names - our_names:
        hashtag = Hashtags.get_or_create(
            session, name, mast_followed_hashtags_d[name].url)
        hashtag.followed = True

    # Remove any we no longer follow. The record is already loaded, so
    # update it directly rather than going back through get_or_create()
    # (which was previously handed the name where the url belongs).
    for name in our_names - mast_names:
        our_followed_hashtags_d[name].followed = False
if __name__ == "__main__":
    # If command line arguments given, carry out requested function and
    # exit. With no arguments, update the database.
    try:
        Base.metadata.create_all(engine)

        p = argparse.ArgumentParser()
        # Only allow at most one option to be specified
        group = p.add_mutually_exclusive_group()
        group.add_argument('-u', '--update',
                           action="store_true", dest="update_database",
                           default=False, help="Update database from Mastodon")
        group.add_argument('-r', '--report',
                           action="store_true", dest="report",
                           default=False, help="Report")
        args = p.parse_args()

        # Run as required
        if args.update_database:
            log.debug("Updating database")
            update_database()
        elif args.report:
            log.debug("Report")
            report()
        else:
            # For now, default to updating database
            update_database()

    except Exception as exc:
        # Use .get(): an unset URMA_ENV must not raise KeyError here and
        # hide the exception being reported. Unset counts as "not
        # development", so the error mail is still sent.
        if os.environ.get("URMA_ENV") != "DEVELOPMENT":
            msg = stackprinter.format(exc)
            send_mail(Config.ERRORS_TO, Config.ERRORS_FROM,
                      "Exception from urma", msg)

        print("\033[1;31;47mUnhandled exception starts")
        stackprinter.show(style="darkbg")
        print("Unhandled exception ends\033[1;37;40m")