#! /usr/bin/env python import datetime import ipdb import os import pickle import random import requests import stackprinter import sys from config import Config from dbconfig import engine, Session, scoped_session from helpers import ( format_display_name, index_ojects_by_parameter, send_mail, ) from helpers import show_OK from log import log from mastodon import Mastodon from models import ( Accounts, Attachments, Base, Hashtags, Posts, PostTags, ) from typing import List, Optional from PyQt5.QtCore import Qt from PyQt5.QtGui import ( QImage, QPixmap, ) from PyQt5.QtWidgets import ( QApplication, QLabel, QMainWindow, QPushButton, ) from ui.main_window_ui import Ui_MainWindow # type: ignore TESTDATA = "/home/kae/git/urma/hometl.pickle" # Mastodon.create_app( # 'urma', # api_base_url='mastodon.org.uk', # to_file='urma_clientcred.secret' # ) # API_BASE_URL = 'mastodon.org.uk' # mastodon = Mastodon(client_id = 'urma_clientcred.secret',) # mastodon.log_in('kae@midnighthax.com', '^ZUaiC8P6vLV49', # to_file='urma_usercred.secret') class MastodonAPI: def __init__(self, access_token: str) -> None: """ Initialise access to Mastodon """ self.mastodon = Mastodon(access_token=access_token) self.me = self.mastodon.me() def get_account_following(self): """ Return a list of account_dicts that we are following """ page1 = self.mastodon.account_following(self.me.id) return self.mastodon.fetch_remaining(page1) def get_hashtag_following(self): """ Return a list of hashtag_dicts that we are following """ page1 = self.mastodon.tag_following(self.me.id) return self.mastodon.fetch_remaining(page1) class Window(QMainWindow, Ui_MainWindow): def __init__(self, parent=None) -> None: super().__init__(parent) self.setupUi(self) self.mastapi = MastodonAPI(Config.ACCESS_TOKEN) self.update_db() self.current_post_id = None self.next_post = self.next self.btnDislike.clicked.connect(self.dislike) self.btnFirst.clicked.connect(self.first) self.btnLast.clicked.connect(self.last) self.btnLike.clicked.connect(self.like) self.btnNext.clicked.connect(self.next) self.btnPrev.clicked.connect(self.prev) self.btnUnsure.clicked.connect(self.unsure) # Show first record self.next() def display(self, session: Session, post: Posts) -> None: """ Prepare to display post """ boosted_by = None if post.boosted_post_id: boosted_by = post.account while post.boosted_post_id: post = session.get(Posts, post.boosted_post_id) self._display(session, post, boosted_by) def _display(self, session: Session, post: int, boosted_by: Optional[Accounts] = None) -> None: """ Display passed post """ if post is None: return # Boosted if boosted_by: self.txtBoosted.setText( "Boosted by: " + format_display_name(boosted_by)) self.txtBoosted.show() else: self.txtBoosted.hide() # Username self.txtUsername.setText(format_display_name(post.account)) # Debug self.lblDebug.setText(str(post.id)) # Account self.lblAcct.setText(post.account.acct) # Hashtags unfollowed_hashtags = [ '#' + a.name for a in post.hashtags if not a.followed] followed_hashtags = [ '#' + a.name for a in post.hashtags if a.followed] hashtag_text = ( '' + '
'.join(followed_hashtags) + '

' + '' + '
'.join(unfollowed_hashtags) + '
' ) self.txtHashtags.setText(hashtag_text) # Post self.txtPost.setHtml(post.content) # Image if post.media_attachments: # TODO: handle multiple images, not just [0] url_image = post.media_attachments[0].preview_url pixmap = QPixmap() pixmap.loadFromData(requests.get(url_image).content) s_pixmap = pixmap.scaled(self.lblPicture.size(), Qt.KeepAspectRatio) self.lblPicture.show() self.lblPicture.setPixmap(s_pixmap) else: self.lblPicture.hide() def dislike(self): """ Mark a post as rated negatively """ self.rate_post(rating=-1) def first(self): """ actions """ pass def last(self): """ actions """ pass def like(self): """ Mark a post as rated positively """ self.rate_post(rating=1) def next(self) -> None: """ Display next post. We work BACKWARDS through posts, starting with the most recent, so "next" is actually one older. If we are called with self.current_post_id set to None, retrieve and display newest unrated post. """ # Remember whether we're going forward or backwards through # posts self.next_post = self.next # Get post to display with Session() as session: if self.current_post_id is None: post = Posts.get_unrated_newest(session) else: post = Posts.get_unrated_before(session, self.current_post_id) # Don't process posts that are boosted as they will be # processed by the boosting post while post and post.reblogged_by_post: post = Posts.get_unrated_before(session, post.post_id) if not post: self.current_post_id = None show_OK("All done", "No more posts to process") return self.current_post_id = post.post_id self.display(session, post) def prev(self): """ Display previous post. We work BACKWARDS through posts so "previous" is actually one newer. If we are called with self.current_post_id set to None, retrieve and display oldest unrated post. """ # Remember whether we're going forward or backwards through # posts self.next_post = self.prev # Get post to display, but don't process posts that are boosted # as they will be processed by the boosting post with Session() as session: if self.current_post_id is None: post = Posts.get_unrated_oldest(session) else: post = Posts.get_unrated_after(session, self.current_post_id) # Don't process posts that are boosted as they will be # processed by the boosting post while post and post.reblogged_by_post: post = Posts.get_unrated_after(session, post.post_id) if not post: self.current_post_id = None show_OK("All done", "No more posts to process") return self.current_post_id = post.post_id self.display(session, post) def rate_post(self, rating: int) -> None: """ Add rating to current post """ with Session() as session: post = Posts.get_by_post_id(session, self.current_post_id) post.rating = rating self.next_post() def unsure(self): """ Mark a post as rated neutrally """ self.rate_post(rating=0) def update_db(self) -> None: """ Update database from Mastodon Save a copy of downloaded data for debugging """ with Session() as session: minimum_post_id = Posts.max_post_id(session) if not minimum_post_id: minimum_post_id = "1" posts_to_get = Config.MAX_POSTS_TO_FETCH reached_minimum = False hometl = [] while True: # Create a filename to save data now = datetime.datetime.now() seq = 0 while True: fname = ( "testdata/" + now.strftime("%Y-%m-%d_%H:%M:%S_") + f"{seq:02d}.pickle" ) if not os.path.isfile(fname): print(f"{fname=}") break seq += 1 print(f"{seq=}") # Fetch data if not hometl: print("Fetching first data...") hometl = self.mastapi.mastodon.timeline() else: print("Fetching next data...") hometl = self.mastapi.mastodon.fetch_next(hometl) print(f"Fetched additional {len(hometl)} posts") with open(fname, "wb") as f: pickle.dump(hometl, f) for post in hometl: if str(post.id) <= minimum_post_id: reached_minimum = True break print(f"Processing {post.id=}") self._process_post(session, post) posts_to_get -= len(hometl) print(f"{posts_to_get=}") if posts_to_get <= 0 or reached_minimum or not hometl: break def _process_post(self, session: Session, post) -> Posts: """ Add passsed post to database """ log.debug(f"{post.id=} processing") rec = Posts.get_or_create(session, str(post.id)) if rec.account_id is not None: # We already have this post log.debug(f"{post.id=} already in db") return rec # Create account record if needed log.debug(f"{post.id=} processing {post.account.id=}") account_rec = Accounts.get_or_create(session, str(post.account.id)) if account_rec.username is None: log.debug(f"{post.id=} populating new account {post.account.id=}") account_rec.username = post.account.username account_rec.acct = post.account.acct account_rec.display_name = post.account.display_name account_rec.bot = post.account.bot account_rec.url = post.account.url rec.account_id = account_rec.id # Create hashtag records as needed for tag in post.tags: log.debug(f"{post.id=} processing {tag.name=}") hashtag = Hashtags.get_or_create(session, tag.name, tag.url) rec.hashtags.append(hashtag) # Handle media if post.media_attachments: for media in post.media_attachments: log.debug(f"{post.id=} processing {media.id=}") media_rec = Attachments.get_or_create( session, str(media.id), rec.id) if not media_rec.type: log.debug(f"{post.id=} {media.id=} new record") media_rec.type = media.type media_rec.url = media.url media_rec.preview_url = media.preview_url media_rec.description = media.description else: log.debug(f"{post.id=} {media.id=} already exists") else: log.debug(f"{post.id=} No media attachments") rec.account_id = account_rec.id rec.created_at = post.created_at rec.uri = post.uri rec.url = post.url rec.content = post.content[:Config.MAX_CONTENT_LENGTH] log.debug(f"{post.id=} {post.content=}") if post.reblog: log.debug(f"{post.id=} {post.reblog.id=}") rec.boosted_post_id = self._process_post( session, post.reblog).id log.debug(f"{post.id=} {rec.boosted_post_id=}") return rec def update_followed_accounts(self, session: Session) -> None: """ Retrieve list of followed accounts and update accounts in database to match """ mast_followed_accounts = self.mastapi.get_account_following() mast_followed_accounts_d = index_ojects_by_parameter( mast_followed_accounts, "username") our_followed_accounts = Accounts.get_followed(session) our_followed_accounts_d = index_ojects_by_parameter( our_followed_accounts, "username") # Add those we are missing for username in ( set(mast_followed_accounts_d.keys()) - set(our_followed_accounts_d.keys()) ): account = Accounts.get_or_create( session, str(mast_followed_accounts_d[username].id) ) account.followed = True # Remove any we no longer follow for username in ( set(our_followed_accounts_d.keys()) - set(mast_followed_accounts_d.keys()) ): account = Accounts.get_or_create( session, str(our_followed_accounts_d[username].account_id) ) account.followed = False def update_followed_hashtags(self, session: Session) -> None: """ Retrieve list of followed hashtags and update hashtags """ mast_followed_hashtags = self.mastapi.get_hashtag_following() mast_followed_hashtags_d = index_ojects_by_parameter( mast_followed_hashtags, "name") our_followed_hashtags = Hashtags.get_followed(session) our_followed_hashtags_d = index_ojects_by_parameter( our_followed_hashtags, "name") # Add those we are missing for name in ( set(mast_followed_hashtags_d.keys()) - set(our_followed_hashtags_d.keys()) ): hashtag = Hashtags.get_or_create( session, name, mast_followed_hashtags_d[name].url) hashtag.followed = True # Remove any we no longer follow for name in ( set(our_followed_hashtags_d.keys()) - set(mast_followed_hashtags_d.keys()) ): hashtag = hashtags.get_or_create( session, name, our_followed_hashtags_d[username].name) hashtag.followed = False # class HoldingPot: # def process_post(post): if __name__ == "__main__": """ If command line arguments given, carry out requested function and exit. Otherwise run full application. """ try: Base.metadata.create_all(engine) app = QApplication(sys.argv) win = Window() win.show() sys.exit(app.exec()) except Exception as exc: if os.environ["URMA_ENV"] != "DEVELOPMENT": msg = stackprinter.format(exc) send_mail(Config.ERRORS_TO, Config.ERRORS_FROM, "Exception from urma", msg) print("\033[1;31;47mUnhandled exception starts") stackprinter.show(style="darkbg") print("Unhandled exception ends\033[1;37;40m") # # Data for development # with open(TESTDATA, "rb") as inp: # hometl = pickle.load(inp) # # with Session() as session: # for post in hometl: # process_post(post)