urma/app/urma.py
2023-01-15 20:50:57 +00:00

352 lines
9.5 KiB
Python
Executable File

#! /usr/bin/env python
import datetime
import ipdb
import os
import pickle
import random
import requests
import stackprinter
import sys
from config import Config
from dbconfig import (
engine,
Session,
scoped_session,
)
from helpers import (
index_ojects_by_parameter,
send_mail,
)
from log import log
from mastodon import Mastodon
from models import (
Accounts,
Base,
Hashtags,
Posts,
PostTags,
)
from typing import List, Optional, Union
# TESTDATA = "/home/kae/git/urma/hometl.pickle"
#
# Mastodon.create_app(
# 'urma',
# api_base_url='mastodon.org.uk',
# to_file='urma_clientcred.secret'
# )
# API_BASE_URL = 'mastodon.org.uk'
# mastodon = Mastodon(client_id = 'urma_clientcred.secret',)
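# mastodon.log_in('kae@midnighthax.com', '<PASSWORD REDACTED>',
# to_file='urma_usercred.secret')
# NOTE: never keep real credentials in source, even in comments; the
# password that used to appear here should be considered compromised.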
class MastodonAPI:
    """
    Thin wrapper around a logged-in Mastodon.py client.

    Attributes set by __init__:
        mastodon: the Mastodon client built from the access token
        me: the account returned by mastodon.me()
    """

    def __init__(self, access_token: str) -> None:
        """
        Initialise access to Mastodon
        """

        self.mastodon = Mastodon(access_token=access_token)
        self.me = self.mastodon.me()

    def _collect_pages(self, data, since) -> List[dict]:
        """
        Return every item from page 'data' and the pages that follow it,
        stopping after the first page that contains an id below 'since'.

        Whole pages are kept, so the result may include items older
        than 'since' from the final page.
        """

        results: List[dict] = []
        while data:
            # Add in new data
            results.extend(data)
            # Have we reached minimum id?
            if min(item.id for item in data) < since:
                break
            # Get more data
            data = self.mastodon.fetch_next(data)

        return results

    def get_account_following(self):
        """
        Return a list of account_dicts that we are following
        """

        page1 = self.mastodon.account_following(self.me.id)
        return self.mastodon.fetch_remaining(page1)

    def get_bookmarked(self, since: int) -> List[dict]:
        """
        Return posts bookmarked since id 'since'

        NOTE(review): the cutoff compares post ids; if the server orders
        bookmarks by when they were bookmarked, an old post bookmarked
        recently ends paging early - confirm this is acceptable.
        """

        return self._collect_pages(self.mastodon.bookmarks(), since)

    def get_boosted(self, since: int) -> List[dict]:
        """
        Return posts boosted since id 'since'

        Assumes account_statuses() pages are ordered newest first, so
        the first status older than 'since' ends the search.
        """

        results = []
        data = self.mastodon.account_statuses(self.me.id)
        while data:
            for datum in data:
                # Have we reached minimum id? If so, stop paging
                # altogether rather than just leaving this page, which
                # would go on to fetch the whole account history.
                if datum.id < since:
                    return results
                # Is this our post that we boosted?
                if datum.account.id == self.me.id and datum.reblog:
                    # Add in new data
                    results.append(datum)
            # Get more data
            data = self.mastodon.fetch_next(data)

        return results

    def get_favourited(self, since: Union[int, List[dict]]) -> List[dict]:
        """
        Return posts favourited since id 'since'

        NOTE(review): 'since' is compared with post ids using '<', so
        in practice it must be an int despite the wider annotation.
        """

        return self._collect_pages(self.mastodon.favourites(), since)

    def get_hashtag_following(self):
        """
        Return a list of hashtag_dicts that we are following

        NOTE(review): upstream Mastodon.py appears to expose followed
        hashtags as followed_tags(); verify that tag_following() exists
        in the installed version.
        """

        page1 = self.mastodon.tag_following(self.me.id)
        return self.mastodon.fetch_remaining(page1)

    def unbookmark(self, post_id: int) -> None:
        """
        Remove bookmark on passed post ID
        """

        log.debug(f"unbookmark({post_id=})")
        self.mastodon.status_unbookmark(post_id)
def main() -> None:
    """
    Run one full pass: refresh followed accounts and hashtags, then
    record favourited, boosted and bookmarked posts (in that order)
    """

    api = MastodonAPI(Config.ACCESS_TOKEN)

    with Session() as db:
        # Oldest post id we are interested in
        cutoff_id = get_since_id(db)

        update_followed_accounts(db, api)
        update_followed_hashtags(db, api)

        # Favourites are handled before bookmarks because bookmark
        # processing reads the 'favourited' flag on each record.
        process_favourited_posts(db, api.get_favourited(cutoff_id))
        process_boosted_posts(db, api.get_boosted(cutoff_id))
        process_bookmarked_posts(db, api, api.get_bookmarked(cutoff_id))
def get_since_id(session: Session) -> int:
    """
    Return id to use as 'min_id' when fetching posts.

    We don't want to fetch anything older than MAX_DAYS_TO_FETCH, so a
    pseudo post id is built for that point in time and returned.

    'session' is kept in the signature for callers but is not used.
    An earlier draft also consulted Posts.max_post_id(session), but
    that code followed an unconditional return and never ran, so it
    has been removed rather than left as dead code.
    TODO: decide whether the newest stored post id should raise the
    floor (mind that post ids are stored from str(post.id)).
    """

    # Build pseudo id for MAX_DAYS_TO_FETCH time ago
    max_days_ago_dt = datetime.datetime.now() - datetime.timedelta(
        days=Config.MAX_DAYS_TO_FETCH
    )

    # From mastodon.py package, use code from internals.py:__unpack_id
    return (int(max_days_ago_dt.timestamp()) << 16) * 1000
def process_bookmarked_posts(session, mastapi, posts) -> None:
    """
    Process bookmarked posts

    Every post is stored via _process_post(); those not already
    flagged as favourited are then flagged as bookmarked.

    'mastapi' is not used yet: it is reserved for removing the
    bookmark once a post has been recorded (see TODO below).
    """

    for post in posts:
        record = _process_post(session, post)
        # Posts that are favourited and bookmarked are genuine bookmark
        # posts: ignore.
        if record.favourited:
            continue
        record.bookmarked = True
        # No 'return' here: one would stop after the first bookmark and
        # leave the remaining posts unprocessed.
        # TODO: mastapi.unbookmark(int(post.id))
def process_boosted_posts(session, posts) -> None:
    """
    Store each passed post via _process_post() and flag its record
    as boosted
    """

    for boosted_post in posts:
        _process_post(session, boosted_post).boosted = True
def process_favourited_posts(session, posts) -> None:
    """
    Store each passed post via _process_post() and flag its record
    as favourited
    """

    for favourite in posts:
        _process_post(session, favourite).favourited = True
def _process_post(session: Session, post) -> Posts:
    """
    Return the database record for the passed post.

    A record that already has an account_id is returned untouched.
    Otherwise the record is filled in: its account (created and
    populated if new), its hashtags, created_at and uri. If the post
    is a reblog, the reblogged post is processed the same way and its
    account_id is stored in boosted_by_id.
    """

    log.debug(f"{post.id=} processing")
    post_rec = Posts.get_or_create(session, str(post.id))
    already_stored = post_rec.account_id is not None
    if already_stored:
        log.debug(f"{post.id=} already in db")
        return post_rec

    # Account: fetch or create, and fill in details for a new one
    log.debug(f"{post.id=} processing {post.account.id=}")
    author_rec = Accounts.get_or_create(session, str(post.account.id))
    if author_rec.username is None:
        log.debug(f"{post.id=} populating new account {post.account.id=}")
        for field in ("username", "acct", "display_name", "bot", "url"):
            setattr(author_rec, field, getattr(post.account, field))
    post_rec.account_id = author_rec.id

    # Hashtags: fetch or create each one and link it to the post
    for tag in post.tags:
        log.debug(f"{post.id=} processing {tag.name=}")
        post_rec.hashtags.append(
            Hashtags.get_or_create(session, tag.name, tag.url)
        )

    post_rec.created_at = post.created_at
    post_rec.uri = post.uri

    # Nothing more to do unless this post is a reblog
    if not post.reblog:
        return post_rec

    log.debug(f"{post.id=} {post.reblog.id=}")
    reblogged_rec = _process_post(session, post.reblog)
    post_rec.boosted_by_id = reblogged_rec.account_id

    return post_rec
def update_followed_accounts(session: Session, mastapi: MastodonAPI) -> None:
    """
    Retrieve list of followed accounts and update accounts
    in database to match.

    Accounts are matched on account id, not username: a username is
    only unique on its own server, and account records created here
    may have no username yet, so username keys can collide and cause
    the wrong record to be flagged.
    """

    # Key both sides on the string form of the account id, which is
    # what Accounts.get_or_create() is given elsewhere in this file.
    mast_followed = {
        str(account.id): account
        for account in mastapi.get_account_following()
    }
    our_followed = {
        str(account.account_id): account
        for account in Accounts.get_followed(session)
    }

    # Add those we are missing
    for account_id in set(mast_followed) - set(our_followed):
        account = Accounts.get_or_create(session, account_id)
        if account.username is None:
            # New record: populate it the same way _process_post() does
            details = mast_followed[account_id]
            account.username = details.username
            account.acct = details.acct
            account.display_name = details.display_name
            account.bot = details.bot
            account.url = details.url
        account.followed = True

    # Remove any we no longer follow
    for account_id in set(our_followed) - set(mast_followed):
        account = Accounts.get_or_create(session, account_id)
        account.followed = False
def update_followed_hashtags(session: Session, mastapi: MastodonAPI) -> None:
    """
    Retrieve list of followed hashtags and update hashtags
    in database to match.

    Hashtags followed on Mastodon but not flagged in the database are
    flagged as followed; hashtags flagged in the database but no longer
    followed on Mastodon have the flag cleared.
    """

    mast_followed_hashtags = mastapi.get_hashtag_following()
    mast_followed_hashtags_d = index_ojects_by_parameter(
        mast_followed_hashtags, "name")

    our_followed_hashtags = Hashtags.get_followed(session)
    our_followed_hashtags_d = index_ojects_by_parameter(
        our_followed_hashtags, "name")

    mast_names = set(mast_followed_hashtags_d.keys())
    our_names = set(our_followed_hashtags_d.keys())

    # Add those we are missing
    for name in mast_names - our_names:
        hashtag = Hashtags.get_or_create(
            session, name, mast_followed_hashtags_d[name].url)
        hashtag.followed = True

    # Remove any we no longer follow
    for name in our_names - mast_names:
        # The record already exists, so the url is only passed to
        # satisfy get_or_create(); assumes Hashtags records expose
        # 'url' - TODO confirm against the model.
        hashtag = Hashtags.get_or_create(
            session, name, our_followed_hashtags_d[name].url)
        hashtag.followed = False
if __name__ == "__main__":
    # Create any missing database tables, then run the application.
    # Any unhandled exception is mailed to Config.ERRORS_TO (unless
    # URMA_ENV is "DEVELOPMENT") and always printed to the terminal.
    try:
        Base.metadata.create_all(engine)
        sys.exit(main())
    except Exception as exc:
        # Use .get(): indexing os.environ would raise KeyError when
        # URMA_ENV is unset and hide the exception being reported.
        # An unset variable is treated as non-development, so the
        # error mail is still sent.
        if os.environ.get("URMA_ENV") != "DEVELOPMENT":
            msg = stackprinter.format(exc)
            send_mail(Config.ERRORS_TO, Config.ERRORS_FROM,
                      "Exception from urma", msg)

        print("\033[1;31;47mUnhandled exception starts")
        stackprinter.show(style="darkbg")
        print("Unhandled exception ends\033[1;37;40m")