#! /usr/bin/env python

import argparse
import datetime
import ipdb
import os
import pickle
import random
import requests
import stackprinter
import subprocess
import sys

from config import Config
from dbconfig import (
    engine,
    Session,
    scoped_session,
)
from helpers import (
    index_ojects_by_parameter,
    send_mail,
)
from log import log
from mastodon import Mastodon
from models import (
    Accounts,
    Base,
    Hashtags,
    Posts,
    PostTags,
)
from sqlalchemy import (
    func,
    select,
)
from typing import List, Optional, Union

# One-off setup notes (kept for reference, not executed).
# NOTE: the account password that used to be written here in plain text
# has been redacted; never commit real credentials to source control.
#
# TESTDATA = "/home/kae/git/urma/hometl.pickle"
#
# Mastodon.create_app(
#     'urma',
#     api_base_url='mastodon.org.uk',
#     to_file='urma_clientcred.secret'
# )
# API_BASE_URL = 'mastodon.org.uk'
# mastodon = Mastodon(client_id = 'urma_clientcred.secret',)
# mastodon.log_in('kae@midnighthax.com', '<password>',
#     to_file='urma_usercred.secret')


class MastodonAPI:
    """Thin wrapper around the Mastodon.py client for the logged-in user."""

    def __init__(self, access_token: str) -> None:
        """
        Initialise access to Mastodon

        Creates the client from the passed access token and caches the
        logged-in account (as returned by Mastodon.me()) in self.me.
        """

        self.mastodon = Mastodon(access_token=access_token)
        self.me = self.mastodon.me()

    def get_account_following(self):
        """
        Return a list of account_dicts that we are following
        """

        first_page = self.mastodon.account_following(self.me.id)
        # fetch_remaining() walks the pagination and returns every page,
        # including the first one passed in.
        return self.mastodon.fetch_remaining(first_page)

    def get_bookmarked(self, since: int) -> List[dict]:
        """
        Return posts bookmarked since id 'since'

        Pages are accumulated whole, so the page that crosses 'since' is
        included in full and the result may contain some ids below 'since'.
        """

        results = []
        page = self.mastodon.bookmarks()
        while page:
            results.extend(page)
            # Stop paging once this page contains an id below the minimum
            if min(post.id for post in page) < since:
                break
            page = self.mastodon.fetch_next(page)

        return results

    def get_hashtag_following(self):
        """
        Return a list of hashtag_dicts that we are following
        """

        first_page = self.mastodon.tag_following(self.me.id)
        return self.mastodon.fetch_remaining(first_page)

    def unbookmark(self, post_id: int) -> None:
        """
        Remove bookmark on passed post ID
        """

        log.debug(f"unbookmark({post_id=})")

        # The status returned by the API is not needed
        self.mastodon.status_unbookmark(post_id)


def update_database() -> None:
    """
    Update the database from Mastodon.

    Refreshes followed accounts and hashtags, then records newly
    favourited and newly bookmarked posts, all within one session.
    """

    mastapi = MastodonAPI(Config.ACCESS_TOKEN)

    with Session() as session:
        update_followed_accounts(session, mastapi)
        update_followed_hashtags(session, mastapi)
        get_and_process_favourited(session, mastapi)
        get_and_process_bookmarked(session, mastapi)


def _process_pages(session, mastapi, page, process_page) -> None:
    """
    Feed successive pages of posts to process_page.

    Stops when process_page returns True (it reached a post that was
    already recorded), when there are no more pages, or once more than
    Config.MAX_POSTS_TO_FETCH posts have been fetched.
    """

    posts_fetched = 0
    while page and posts_fetched <= Config.MAX_POSTS_TO_FETCH:
        posts_fetched += len(page)
        if process_page(session, page, mastapi.me.id):
            return
        page = mastapi.mastodon.fetch_next(page)


def get_and_process_bookmarked(session, mastapi):
    """Get newly bookmarked posts and add to db"""

    _process_pages(
        session, mastapi, mastapi.mastodon.bookmarks(),
        process_bookmarked_posts
    )


def get_and_process_favourited(session, mastapi):
    """Get newly favourited posts and add to db"""

    _process_pages(
        session, mastapi, mastapi.mastodon.favourites(),
        process_favourited_posts
    )


def get_database_name():
    """Return database name as string"""

    with Session() as session:
        return session.bind.engine.url.database


def get_version_string():
    """
    Return Urma version as string

    Uses 'git describe'; if that fails (CalledProcessError, e.g. when
    there is nothing to describe) falls back to the first seven
    characters of the HEAD commit hash.
    """

    try:
        described = subprocess.check_output(
            ['git', 'describe'], stderr=subprocess.STDOUT
        )
        # Decode the bytes rather than str()-ing them and stripping a
        # character set: the old .strip('\'b\\n') also ate any leading or
        # trailing 'b'/'n' that belonged to the version (e.g. "main").
        return described.decode("utf-8").strip()
    except subprocess.CalledProcessError:
        head = subprocess.run(
            ['git', 'rev-parse', 'HEAD'], stdout=subprocess.PIPE
        )
        return head.stdout.strip()[:7].decode("utf-8")


def process_bookmarked_posts(session: Session, posts: List[Posts],
                             me_id: int) -> bool:
    """
    Process bookmarked posts

    Stop when we find post has already been marked bookmarked.
    Return True if that's why we stopped, else False.
    """

    for post in posts:
        record = _process_post(session, post, me_id)
        # Posts that are favourited and bookmarked are genuine bookmark
        # posts: ignore.
        if record.favourited:
            continue
        if record.bookmarked:
            return True
        record.bookmarked = True
        # TODO: mastapi.unbookmark(int(post.id))

    return False


def process_favourited_posts(session: Session, posts: List[Posts],
                             me_id: int) -> bool:
    """
    Process favourited posts.

    Stop when we find post has already been marked favourited.
    Return True if that's why we stopped, else False.
    """

    for post in posts:
        if not post.favourited:
            log.debug(
                f"process_favourited_posts({post.id=}) not favourited"
            )
            continue

        record = _process_post(session, post, me_id)
        if record.favourited:
            return True
        record.favourited = True

    return False


def _process_post(session: Session, post: Posts, me_id) -> Posts:
    """
    Add passed post to database and return its Posts record.

    For a boost (post.reblog set) the boosted post is stored instead and
    its record is returned, annotated with the boosting account unless
    the boosting account is me_id.
    """

    log.debug(f"{post.id=} processing")

    # Create account record if needed
    log.debug(f"{post.id=} processing {post.account.id=}")
    account_rec = Accounts.get_or_create(session, str(post.account.id))
    # A record without a username has not been populated yet
    if account_rec.username is None:
        log.debug(f"{post.id=} populating new account {post.account.id=}")
        account_rec.username = post.account.username
        account_rec.acct = post.account.acct
        account_rec.display_name = post.account.display_name
        account_rec.bot = post.account.bot
        account_rec.url = post.account.url

    if post.reblog:
        # We're only interested in the boosted post, not this one
        log.debug(f"{post.id=} {post.reblog.id=}")
        boosted_record = _process_post(session, post.reblog, me_id)
        # Record who boosted the post unless it was us
        if post.account.id == me_id:
            boosted_record.boosting_account_id = None
        else:
            boosted_record.boosting_account_id = account_rec.id
        return boosted_record

    rec = Posts.get_or_create(session, str(post.id))
    if rec.account_id is not None:
        # We already have this post
        log.debug(f"{post.id=} already in db")
        return rec

    rec.account_id = account_rec.id

    # Create hashtag records as needed
    for tag in post.tags:
        log.debug(f"{post.id=} processing {tag.name=}")
        hashtag = Hashtags.get_or_create(session, tag.name, tag.url)
        rec.hashtags.append(hashtag)

    rec.created_at = post.created_at
    rec.uri = post.uri

    return rec


def _percentage(part: int, total: int) -> float:
    """Return part as a percentage of total (0.0 when total is zero)."""

    return part * 100 / total if total else 0.0


def _top_hashtags(session, favourited: int, followed: int):
    """Return (Hashtags, post count) rows for the given flags, busiest first."""

    return session.execute(
        select(Hashtags, func.count(Hashtags.name))
        .join(PostTags).join(Posts)
        .where(Posts.favourited == favourited,
               Hashtags.followed == followed)
        .group_by(Hashtags.name)
        .order_by(func.count(Hashtags.name).desc())
        .limit(Config.TOP_HASHTAGS_TO_REPORT)
    ).all()


def _count_hashtag_posts(session, hashtag_id, favourited: int) -> int:
    """Count posts carrying the hashtag with the given favourited flag."""

    return session.execute(
        select(func.count(Posts.id))
        .join(PostTags).join(Hashtags)
        .where(Posts.favourited == favourited, Hashtags.id == hashtag_id)
    ).scalar_one()


def _top_accounts(session, favourited: int, followed: int):
    """Return (Accounts, post count) rows for the given flags, busiest first."""

    return session.execute(
        select(Accounts, func.count(Accounts.username))
        .join(Posts)
        .where(Posts.favourited == favourited,
               Accounts.followed == followed)
        .group_by(Accounts.username)
        .order_by(func.count(Accounts.username).desc())
        .limit(Config.TOP_POSTS_TO_REPORT)
    ).all()


def _count_account_posts(session, account_id, favourited: int) -> int:
    """Count posts by the account with the given favourited flag."""

    return session.execute(
        select(func.count(Posts.id))
        .join(Accounts)
        .where(Posts.favourited == favourited, Accounts.id == account_id)
    ).scalar_one()


def report():
    """
    Print report

    Lists, with liked/disliked counts and percentages: unfollowed
    hashtags and users that appear in favourited posts, and followed
    hashtags and users that appear in posts that were not favourited.
    """

    print(f"Urma version: {get_version_string()}")
    print(f"Database: {get_database_name()}")
    print(f"Date: {datetime.datetime.now().strftime('%c')}")
    print()

    with Session() as session:

        # Find the most popular hashtags that we don't follow
        print("Hashtags you don't follow that feature in posts you like")
        print("--------------------------------------------------------")
        # How many times was each hashtag in a post we didn't like?
        for (hashtag, like) in _top_hashtags(session, 1, 0):
            dislike = _count_hashtag_posts(session, hashtag.id, 0)
            print(
                f"Hashtag {hashtag.name} {like=}, {dislike=} "
                f"({_percentage(like, like + dislike):.2f}% liked)"
            )

        # Find the least popular hashtags that we do follow
        print()
        print("Hashtags you follow that feature in posts you don't like")
        print("--------------------------------------------------------")
        # How many times was each hashtag in a post we did like?
        for (hashtag, dislike) in _top_hashtags(session, 0, 1):
            like = _count_hashtag_posts(session, hashtag.id, 1)
            print(
                f"Hashtag {hashtag.name} {like=}, {dislike=} "
                f"({_percentage(dislike, like + dislike):.2f}% disliked)"
            )

        # Find the most popular users that we don't follow
        print()
        print("Users you don't follow that feature in posts you like")
        print("-----------------------------------------------------")
        # How many times was each user in a post we didn't like?
        for (user, like) in _top_accounts(session, 1, 0):
            dislike = _count_account_posts(session, user.id, 0)
            print(
                f"User {user.username} {like=}, {dislike=} "
                f"({_percentage(like, like + dislike):.2f}% liked)"
            )

        # Find the most unpopular users that we do follow
        print()
        print("Users you follow that feature in posts you don't like")
        print("-----------------------------------------------------")
        # How many times was each user in a post we did like?
        for (user, dislike) in _top_accounts(session, 0, 1):
            like = _count_account_posts(session, user.id, 1)
            print(
                f"User {user.username} {like=}, {dislike=} "
                f"({_percentage(dislike, like + dislike):.2f}% disliked)"
            )


def update_followed_accounts(session: Session, mastapi: MastodonAPI) -> None:
    """
    Retrieve list of followed accounts and update accounts
    in database to match
    """

    # Compare on the account ID (the key Accounts.get_or_create uses)
    # rather than on username: usernames are not unique across instances
    # and are unset on records that have only ever been created here.
    mast_followed_ids = {
        str(account.id) for account in mastapi.get_account_following()
    }
    our_followed_ids = {
        str(account.account_id)
        for account in Accounts.get_followed(session)
    }

    # Add those we are missing
    for account_id in mast_followed_ids - our_followed_ids:
        Accounts.get_or_create(session, account_id).followed = True

    # Remove any we no longer follow
    for account_id in our_followed_ids - mast_followed_ids:
        Accounts.get_or_create(session, account_id).followed = False


def update_followed_hashtags(session: Session, mastapi: MastodonAPI) -> None:
    """
    Retrieve list of followed hashtags and update hashtags
    in database to match
    """

    mast_followed_hashtags_d = index_ojects_by_parameter(
        mastapi.get_hashtag_following(), "name")
    our_followed_hashtags_d = index_ojects_by_parameter(
        Hashtags.get_followed(session), "name")

    mast_names = set(mast_followed_hashtags_d)
    our_names = set(our_followed_hashtags_d)

    # Add those we are missing
    for name in mast_names - our_names:
        hashtag = Hashtags.get_or_create(
            session, name, mast_followed_hashtags_d[name].url)
        hashtag.followed = True

    # Remove any we no longer follow
    for name in our_names - mast_names:
        # Third argument is the hashtag URL (previously .name was passed
        # here by mistake).
        hashtag = Hashtags.get_or_create(
            session, name, our_followed_hashtags_d[name].url)
        hashtag.followed = False


def main() -> None:
    """
    If command line arguments given, carry out requested function and
    exit. Otherwise run full application.

    Any exception is printed with stackprinter and, unless URMA_ENV is
    "DEVELOPMENT", also mailed to Config.ERRORS_TO.
    """

    try:
        Base.metadata.create_all(engine)
        p = argparse.ArgumentParser()
        # Only allow at most one option to be specified
        group = p.add_mutually_exclusive_group()
        group.add_argument('-u', '--update',
                           action="store_true", dest="update_database",
                           default=False, help="Update database from Mastodon")
        group.add_argument('-r', '--report',
                           action="store_true", dest="report",
                           default=False, help="Report")
        args = p.parse_args()

        # Run as required
        if args.update_database:
            log.debug("Updating database")
            update_database()
        elif args.report:
            log.debug("Report")
            report()
        else:
            # For now, default to updating database
            update_database()

    except Exception as exc:
        # Use .get(): an unset URMA_ENV must not raise KeyError here and
        # hide the exception we are trying to report.
        if os.environ.get("URMA_ENV") != "DEVELOPMENT":
            msg = stackprinter.format(exc)
            send_mail(Config.ERRORS_TO, Config.ERRORS_FROM,
                      "Exception from urma", msg)

        print("\033[1;31;47mUnhandled exception starts")
        stackprinter.show(style="darkbg")
        print("Unhandled exception ends\033[1;37;40m")


if __name__ == "__main__":
    main()