# witchie/toot/tui/app.py
#
# 607 lines
# 22 KiB
# Python

import logging
import urwid
from concurrent.futures import ThreadPoolExecutor
from toot import api, config, __version__
from .compose import StatusComposer
from .constants import PALETTE
from .entities import Status
from .overlays import ExceptionStackTrace, GotoMenu, Help, StatusSource, StatusLinks, StatusZoom
from .overlays import StatusDeleteConfirmation
from .timeline import Timeline
from .utils import parse_content_links, show_media
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Set the text encoding urwid uses to UTF-8.
urwid.set_encoding('UTF-8')
class Header(urwid.WidgetWrap):
    """Header bar: application name, `user@instance` and an optional text.

    The bar is a row of three packed text widgets styled with the
    'header' / 'header_bold' palette entries. The third one is free-form
    and is controlled through `set_text` and `clear_text`.
    """

    def __init__(self, app, user):
        """Build the header widgets.

        `app` must expose `instance` and `user` must expose `username`;
        both are shown in the header and kept as attributes.
        """
        self.app = app
        self.user = user

        # Free-form segment, empty until `set_text` is called.
        self.text = urwid.Text("")

        self.cols = urwid.Columns([
            ("pack", urwid.Text(('header_bold', 'toot'))),
            ("pack", urwid.Text(('header', ' | {}@{}'.format(user.username, app.instance)))),
            ("pack", self.text),
        ])

        widget = urwid.AttrMap(self.cols, 'header')
        widget = urwid.Padding(widget)

        # Let WidgetWrap register the wrapped widget itself instead of
        # assigning the private `_wrapped_widget` attribute by hand, so any
        # base-class initialisation is not skipped.
        super().__init__(widget)

    def clear_text(self, text=None):
        """Remove the free-form header text.

        `text` is ignored; it is accepted only for backward compatibility
        with callers that pass an argument.
        """
        self.text.set_text("")

    def set_text(self, text):
        """Show `text` in the header, separated from the account by ' | '."""
        self.text.set_text(" | " + text)
class Footer(urwid.Pile):
    """Two-line footer: a status line on top and a message line below it."""

    def __init__(self):
        """Create both (initially empty) text lines and stack them."""
        self.status = urwid.Text("")
        self.message = urwid.Text("")
        # `__init__` has no meaningful return value, so the result of the
        # parent initialiser is deliberately not returned.
        super().__init__([
            urwid.AttrMap(self.status, "footer_status"),
            urwid.AttrMap(self.message, "footer_message"),
        ])

    def set_status(self, text):
        """Replace the contents of the status line with `text`."""
        self.status.set_text(text)

    def clear_status(self, text=None):
        """Empty the status line.

        `text` is ignored; it is accepted only for backward compatibility
        with callers that pass an argument.
        """
        self.status.set_text("")

    def set_message(self, text):
        """Replace the contents of the message line with `text`."""
        self.message.set_text(text)

    def set_error_message(self, text):
        """Show `text` on the message line using the error palette entry."""
        self.message.set_text(("footer_message_error", text))

    def clear_message(self):
        """Empty the message line."""
        self.message.set_text("")
class TUI(urwid.Frame):
    """Main TUI frame."""

    @classmethod
    def create(cls, app, user):
        """Factory method, sets up TUI and an event loop."""
        tui = cls(app, user)
        loop = urwid.MainLoop(
            tui,
            palette=PALETTE,
            event_loop=urwid.AsyncioEventLoop(),
            unhandled_input=tui.unhandled_input,
        )
        tui.loop = loop

        return tui

    def __init__(self, app, user):
        """Initialise state and show the intro screen.

        Nothing is fetched here; loading of the instance data and of the
        first batch of statuses is scheduled in `run`.
        """
        self.app = app
        self.user = user
        self.config = config.load_config()

        self.loop = None  # set in `create`
        # Single worker, so background jobs run one at a time.
        self.executor = ThreadPoolExecutor(max_workers=1)
        self.timeline_generator = api.home_timeline_generator(app, user, limit=40)

        # Show intro screen while toots are being loaded
        self.body = self.build_intro()
        self.header = Header(app, user)
        self.footer = Footer()
        self.footer.set_status("Loading...")

        # Default max status length, updated on startup
        self.max_toot_chars = 500

        self.timeline = None    # main Timeline, set once the first batch arrives
        self.overlay = None     # currently open urwid.Overlay, if any
        self.exception = None   # last exception caught in a background job
        self.can_translate = False

        super().__init__(self.body, header=self.header, footer=self.footer)

    def run(self):
        """Schedule the initial loads and run the main loop until it exits."""
        self.loop.set_alarm_in(0, lambda *args: self.async_load_instance())
        self.loop.set_alarm_in(0, lambda *args: self.async_load_timeline(
            is_initial=True, timeline_name="home"))
        try:
            self.loop.run()
        finally:
            # Shut the executor down even if the loop terminates with an
            # exception.
            self.executor.shutdown(wait=False)

    def build_intro(self):
        """Return the splash screen widget shown while statuses are loading."""
        font = urwid.font.Thin6x6Font()

        # NB: Padding with width="clip" will convert the fixed BigText widget
        # to a flow widget so it can be used in a Pile.

        big_text = "Toot {}".format(__version__)
        big_text = urwid.BigText(("intro_bigtext", big_text), font)
        big_text = urwid.Padding(big_text, align="center", width="clip")

        intro = urwid.Pile([
            big_text,
            urwid.Divider(),
            urwid.Text([
                "Maintained by ",
                ("intro_smalltext", "@ihabunek"),
                " and contributors"
            ], align="center"),
            urwid.Divider(),
            urwid.Text(("intro_smalltext", "Loading toots..."), align="center"),
        ])

        return urwid.Filler(intro)

    def run_in_thread(self, fn, args=None, kwargs=None, done_callback=None, error_callback=None):
        """Runs `fn(*args, **kwargs)` asynchronously in a separate thread.

        On completion calls `done_callback` if `fn` exited cleanly, or
        `error_callback` if an exception was caught. Callback methods are
        invoked in the main thread, not the thread in which `fn` is executed.

        `args` and `kwargs` default to no arguments. When no `error_callback`
        is given, the exception is stored in `self.exception` and a hint is
        shown in the footer.

        Returns the `concurrent.futures.Future` for the submitted job.
        """
        # None instead of mutable `[]` / `{}` defaults, which would be shared
        # between calls.
        args = () if args is None else args
        kwargs = {} if kwargs is None else kwargs

        def _default_error_callback(ex):
            self.exception = ex
            self.footer.set_error_message("An exception occurred, press E to view")

        _error_callback = error_callback or _default_error_callback

        def _done(future):
            try:
                result = future.result()
                if done_callback:
                    # Use alarm to invoke callback in main thread
                    self.loop.set_alarm_in(0, lambda *args: done_callback(result))
            except Exception as ex:
                # `ex` is unbound when the except block ends, so keep a
                # separate reference for the lambda below.
                exception = ex
                logger.exception(exception)
                self.loop.set_alarm_in(0, lambda *args: _error_callback(exception))

        future = self.executor.submit(fn, *args, **kwargs)
        future.add_done_callback(_done)
        return future

    def connect_default_timeline_signals(self, timeline):
        """Connect the signals handled the same way for every timeline."""
        def _compose(*args):
            self.show_compose()

        def _delete(timeline, status):
            # Only the user's own statuses may be deleted.
            if status.is_mine:
                self.show_delete_confirmation(status)

        def _reply(timeline, status):
            self.show_compose(status)

        def _source(timeline, status):
            self.show_status_source(status)

        def _links(timeline, status):
            self.show_links(status)

        def _media(timeline, status):
            self.show_media(status)

        def _menu(timeline, status):
            self.show_context_menu(status)

        def _zoom(timeline, status_details):
            self.show_status_zoom(status_details)

        urwid.connect_signal(timeline, "compose", _compose)
        urwid.connect_signal(timeline, "delete", _delete)
        urwid.connect_signal(timeline, "favourite", self.async_toggle_favourite)
        urwid.connect_signal(timeline, "focus", self.refresh_footer)
        urwid.connect_signal(timeline, "media", _media)
        urwid.connect_signal(timeline, "menu", _menu)
        urwid.connect_signal(timeline, "reblog", self.async_toggle_reblog)
        urwid.connect_signal(timeline, "reply", _reply)
        urwid.connect_signal(timeline, "source", _source)
        urwid.connect_signal(timeline, "links", _links)
        urwid.connect_signal(timeline, "zoom", _zoom)
        urwid.connect_signal(timeline, "translate", self.async_translate)

    def build_timeline(self, name, statuses, local):
        """Create a main `Timeline` named `name` and connect its signals.

        `local` is the value stored in the config when a hashtag timeline
        (a name starting with "#") is pinned via the "save" signal.
        """
        def _close(*args):
            raise urwid.ExitMainLoop()

        def _next(*args):
            self.async_load_timeline(is_initial=False)

        def _thread(timeline, status):
            self.show_thread(status)

        def _toggle_save(timeline, status):
            # Only hashtag timelines can be pinned.
            if not timeline.name.startswith("#"):
                return
            hashtag = timeline.name[1:]
            assert isinstance(local, bool), local
            timelines = self.config.setdefault("timelines", {})
            if hashtag in timelines:
                del timelines[hashtag]
                self.footer.set_message("#{} unpinned".format(hashtag))
            else:
                timelines[hashtag] = {"local": local}
                self.footer.set_message("#{} pinned".format(hashtag))
            self.loop.set_alarm_in(5, lambda *args: self.footer.clear_message())
            config.save_config(self.config)

        timeline = Timeline(name, statuses, self.can_translate)

        self.connect_default_timeline_signals(timeline)
        urwid.connect_signal(timeline, "next", _next)
        urwid.connect_signal(timeline, "close", _close)
        urwid.connect_signal(timeline, "thread", _thread)
        urwid.connect_signal(timeline, "save", _toggle_save)

        return timeline

    def make_status(self, status_data):
        """Wrap raw status data in a `Status`, flagging the user's own posts."""
        is_mine = self.user.username == status_data["account"]["acct"]
        return Status(status_data, is_mine, self.app.instance)

    def show_thread(self, status):
        """Replace the body with a timeline of `status` and its thread."""
        def _close(*args):
            """When thread is closed, go back to the main timeline."""
            self.body = self.timeline
            self.body.refresh_status_details()
            self.refresh_footer(self.timeline)

        # This is pretty fast, so it's probably ok to block while context is
        # loaded, can be made async later if needed
        context = api.context(self.app, self.user, status.original.id)
        ancestors = [self.make_status(s) for s in context["ancestors"]]
        descendants = [self.make_status(s) for s in context["descendants"]]
        statuses = ancestors + [status] + descendants
        # Index of `status` within the combined list.
        focus = len(ancestors)

        timeline = Timeline("thread", statuses, self.can_translate, focus,
                            is_thread=True)

        self.connect_default_timeline_signals(timeline)
        urwid.connect_signal(timeline, "close", _close)

        self.body = timeline
        self.refresh_footer(timeline)

    def async_load_timeline(self, is_initial, timeline_name=None, local=None):
        """Asynchronously load a list of statuses.

        Takes the next batch from `self.timeline_generator`. With
        `is_initial` a new timeline named `timeline_name` is built from it
        and shown; otherwise the batch is appended to the current timeline.

        Returns the future of the background job.
        """

        def _load_statuses():
            self.footer.set_message("Loading statuses...")
            try:
                data = next(self.timeline_generator)
            except StopIteration:
                # Generator exhausted, nothing more to load.
                return []
            finally:
                self.footer.clear_message()

            return [self.make_status(s) for s in data]

        def _done_initial(statuses):
            """Process initial batch of statuses, construct a Timeline."""
            self.timeline = self.build_timeline(timeline_name, statuses, local)
            self.timeline.refresh_status_details()  # Draw first status
            self.refresh_footer(self.timeline)
            self.body = self.timeline

        def _done_next(statuses):
            """Process sequential batch of statuses, adds statuses to the
            existing timeline."""
            self.timeline.append_statuses(statuses)

        return self.run_in_thread(_load_statuses,
            done_callback=_done_initial if is_initial else _done_next)

    def async_load_instance(self):
        """
        Attempt to update max_toot_chars from instance data.
        Does not work on vanilla Mastodon, works on Pleroma.
        See: https://github.com/tootsuite/mastodon/issues/4915

        Also attempt to update translation flag from instance
        data. Translation is only present on Mastodon 4+ servers
        where the administrator has enabled this feature.
        See: https://github.com/mastodon/mastodon/issues/19328
        """
        def _load_instance():
            return api.get_instance(self.app.instance)

        def _done(instance):
            if "max_toot_chars" in instance:
                self.max_toot_chars = instance["max_toot_chars"]

            if "translation" in instance:
                # instance is advertising translation service
                self.can_translate = instance["translation"]["enabled"]
            elif "version" in instance:
                # fallback check:
                #   get the major version number of the server
                #   this works for Mastodon and Pleroma version strings
                #   Mastodon versions < 4 do not have translation service
                #   Revisit this logic if Pleroma implements translation
                #
                # Parse the whole first dot-separated component, so that
                # multi-digit majors ("10.0.0") are read correctly and a
                # non-numeric prefix disables translation instead of raising.
                major = str(instance["version"]).split(".")[0]
                try:
                    self.can_translate = int(major) > 3
                except ValueError:
                    self.can_translate = False

        return self.run_in_thread(_load_instance, done_callback=_done)

    def refresh_footer(self, timeline):
        """Show status details in footer."""
        status, index, count = timeline.get_focused_status_with_counts()
        self.footer.set_status([
            ("footer_status_bold", "[{}] ".format(timeline.name)),
        ] + ([status.id, " - status ", str(index + 1), " of ", str(count)]
            if status else ["no focused status"]))

    def show_status_source(self, status):
        """Open an overlay showing the source of `status`."""
        self.open_overlay(
            widget=StatusSource(status),
            title="Status source",
        )

    def show_links(self, status):
        """Open an overlay listing the links and attachments of `status`.

        Links are parsed from the status content; media attachments of the
        status and of its reblog (if any) are added, labelled with their
        description when there is one. Does nothing when there is no status
        or nothing to list.
        """
        if not status:
            # Nothing focused; the original went on to dereference `status`.
            return

        links = parse_content_links(status.data["content"])

        post_attachments = status.data["media_attachments"] or []
        reblog_attachments = (status.data["reblog"]["media_attachments"] if status.data["reblog"] else None) or []

        for a in post_attachments + reblog_attachments:
            url = a["remote_url"] or a["url"]
            links.append((url, a["description"] or url))

        if links:
            self.open_overlay(
                widget=StatusLinks(links),
                title="Status links",
                options={"height": len(links) + 2},
            )

    def show_status_zoom(self, status_details):
        """Open an overlay with a zoomed view of `status_details`."""
        self.open_overlay(
            widget=StatusZoom(status_details),
            title="Status zoom",
        )

    def show_exception(self, exception):
        """Open an overlay showing the stack trace of `exception`."""
        self.open_overlay(
            widget=ExceptionStackTrace(exception),
            title="Unhandled Exception",
        )

    def show_compose(self, in_reply_to=None):
        """Open the composer, optionally as a reply to `in_reply_to`."""
        def _close(*args):
            self.close_overlay()

        def _post(timeline, *args):
            self.post_status(*args)

        composer = StatusComposer(self.max_toot_chars, in_reply_to)
        urwid.connect_signal(composer, "close", _close)
        urwid.connect_signal(composer, "post", _post)
        self.open_overlay(composer, title="Compose status")

    def show_goto_menu(self):
        """Open the "Go to" menu, including timelines pinned in the config."""
        user_timelines = self.config.get("timelines", {})
        menu = GotoMenu(user_timelines)
        urwid.connect_signal(menu, "home_timeline",
            lambda x: self.goto_home_timeline())
        urwid.connect_signal(menu, "public_timeline",
            lambda x, local: self.goto_public_timeline(local))
        urwid.connect_signal(menu, "hashtag_timeline",
            lambda x, tag, local: self.goto_tag_timeline(tag, local=local))

        # Height grows with the number of pinned timelines.
        self.open_overlay(menu, title="Go to", options=dict(
            align="center", width=("relative", 60),
            valign="middle", height=9 + len(user_timelines),
        ))

    def show_help(self):
        """Open the help overlay."""
        self.open_overlay(Help(), title="Help")

    def goto_home_timeline(self):
        """Load the home timeline, closing the overlay once it is fetched."""
        self.timeline_generator = api.home_timeline_generator(
            self.app, self.user, limit=40)
        promise = self.async_load_timeline(is_initial=True, timeline_name="home")
        promise.add_done_callback(lambda *args: self.close_overlay())

    def goto_public_timeline(self, local):
        """Load the public timeline, closing the overlay once it is fetched."""
        self.timeline_generator = api.public_timeline_generator(
            self.app, self.user, local=local, limit=40)
        promise = self.async_load_timeline(is_initial=True, timeline_name="public")
        promise.add_done_callback(lambda *args: self.close_overlay())

    def goto_tag_timeline(self, tag, local):
        """Load the timeline of hashtag `tag`, closing the overlay when done."""
        self.timeline_generator = api.tag_timeline_generator(
            self.app, self.user, tag, local=local, limit=40)
        promise = self.async_load_timeline(
            is_initial=True, timeline_name="#{}".format(tag), local=local,
        )
        promise.add_done_callback(lambda *args: self.close_overlay())

    def show_media(self, status):
        """Pass the attachment URLs of the original status to `show_media`."""
        urls = [m["url"] for m in status.original.data["media_attachments"]]
        if urls:
            show_media(urls)

    def show_context_menu(self, status):
        """Placeholder, does nothing yet."""
        # TODO: show context menu
        pass

    def show_delete_confirmation(self, status):
        """Ask for confirmation before deleting `status`."""
        def _delete(widget):
            promise = self.async_delete_status(self.timeline, status)
            promise.add_done_callback(lambda *args: self.close_overlay())

        def _close(widget):
            self.close_overlay()

        widget = StatusDeleteConfirmation(status)
        urwid.connect_signal(widget, "close", _close)
        urwid.connect_signal(widget, "delete", _delete)
        self.open_overlay(widget, title="Delete status?", options=dict(
            align="center", width=("relative", 60),
            valign="middle", height=5,
        ))

    def post_status(self, content, warning, visibility, in_reply_to_id):
        """Post a status, put it on top of the timeline and close the overlay.

        The API call is made synchronously in the calling thread.
        """
        data = api.post_status(self.app, self.user, content,
            spoiler_text=warning,
            visibility=visibility,
            in_reply_to_id=in_reply_to_id)
        status = self.make_status(data)

        # TODO: instead of this, fetch new items from the timeline?
        self.timeline.prepend_status(status)
        self.timeline.focus_status(status)

        self.footer.set_message("Status posted {} \\o/".format(status.id))
        self.close_overlay()

    def async_toggle_favourite(self, timeline, status):
        """Favourite or unfavourite `status` in the background."""
        def _favourite():
            logger.info("Favouriting %s", status)
            api.favourite(self.app, self.user, status.id)

        def _unfavourite():
            logger.info("Unfavouriting %s", status)
            api.unfavourite(self.app, self.user, status.id)

        def _done(_result):
            # Create a new Status with flipped favourited flag
            new_data = status.data
            new_data["favourited"] = not status.favourited
            new_status = self.make_status(new_data)
            timeline.update_status(new_status)

        self.run_in_thread(
            _unfavourite if status.favourited else _favourite,
            done_callback=_done
        )

    def async_toggle_reblog(self, timeline, status):
        """Reblog or unreblog `status` in the background."""
        def _reblog():
            logger.info("Reblogging %s", status)
            api.reblog(self.app, self.user, status.id)

        def _unreblog():
            logger.info("Unreblogging %s", status)
            api.unreblog(self.app, self.user, status.id)

        def _done(_result):
            # Create a new Status with flipped reblogged flag
            new_data = status.data
            new_data["reblogged"] = not status.reblogged
            new_status = self.make_status(new_data)
            timeline.update_status(new_status)

        self.run_in_thread(
            _unreblog if status.reblogged else _reblog,
            done_callback=_done
        )

    def async_translate(self, timeline, status):
        """Translate `status`, or toggle an already fetched translation.

        Failures are reported in the footer rather than through the
        exception overlay; the footer message is cleared after 3 seconds.
        """
        def _translate():
            logger.info("Translating %s", status)
            self.footer.set_message("Translating status {}".format(status.id))

            try:
                response = api.translate(self.app, self.user, status.id)
                if response["content"]:
                    self.footer.set_message("Status translated")
                else:
                    self.footer.set_error_message("Server returned empty translation")
                    response = None
            except Exception:
                # Was a bare `except:`, which also caught SystemExit and
                # KeyboardInterrupt. Still best-effort for the user, but the
                # cause is now logged instead of silently discarded.
                logger.exception("Translating status %s failed", status.id)
                response = None
                self.footer.set_error_message("Translate server error")

            self.loop.set_alarm_in(3, lambda *args: self.footer.clear_message())
            return response

        def _done(response):
            if response is not None:
                status.translation = response["content"]
                status.translated_from = response["detected_source_language"]
                status.show_translation = True
                timeline.update_status(status)

        # If already translated, toggle showing translation
        if status.translation:
            status.show_translation = not status.show_translation
            timeline.update_status(status)
        else:
            self.run_in_thread(_translate, done_callback=_done)

    def async_delete_status(self, timeline, status):
        """Delete `status` in the background, then drop it from `timeline`.

        Returns the future of the background job.
        """
        def _delete():
            api.delete_status(self.app, self.user, status.id)

        def _done(_result):
            timeline.remove_status(status)

        return self.run_in_thread(_delete, done_callback=_done)

    # --- Overlay handling -----------------------------------------------------

    default_overlay_options = dict(
        align="center", width=("relative", 80),
        valign="middle", height=("relative", 80),
    )

    def open_overlay(self, widget, options=None, title=""):
        """Show `widget` in a titled box on top of the current body.

        `options` overrides entries of `default_overlay_options` and is
        passed to `urwid.Overlay`.
        """
        top_widget = urwid.LineBox(widget, title=title)
        bottom_widget = self.body

        _options = self.default_overlay_options.copy()
        # None instead of a mutable `{}` default shared between calls.
        if options:
            _options.update(options)

        self.overlay = urwid.Overlay(
            top_widget,
            bottom_widget,
            **_options
        )
        self.body = self.overlay

    def close_overlay(self):
        """Close the current overlay and show what was underneath it.

        Does nothing when no overlay is open. This can happen when a
        background job asks to close an overlay the user already dismissed.
        """
        if self.overlay is None:
            return

        self.body = self.overlay.bottom_w
        # If the closed overlay was stacked on another one (e.g. the
        # exception view opened over a menu), keep tracking that one so it
        # can still be closed.
        self.overlay = self.body if isinstance(self.body, urwid.Overlay) else None

    # --- Keys -----------------------------------------------------------------

    def unhandled_input(self, key):
        """Handle keys which no widget consumed.

        E shows the last stored exception, G the goto menu, H the help,
        "," reloads the timeline, Esc closes the overlay or returns to the
        home timeline, Q closes the overlay or quits.
        """
        # TODO: this should not be in unhandled input
        if key in ('e', 'E'):
            if self.exception:
                self.show_exception(self.exception)

        elif key in ('g', 'G'):
            if not self.overlay:
                self.show_goto_menu()

        elif key in ('h', 'H'):
            if not self.overlay:
                self.show_help()

        elif key == ',':
            # `self.timeline` is None until the first batch has loaded.
            if not self.overlay and self.timeline is not None:
                # NOTE(review): this always reloads from the home timeline
                # generator while keeping the current timeline's name, which
                # looks wrong for public and hashtag timelines - confirm.
                self.timeline_generator = api.home_timeline_generator(
                    self.app, self.user, limit=40)
                self.async_load_timeline(is_initial=True, timeline_name=self.timeline.name)

        elif key == 'esc':
            if self.overlay:
                self.close_overlay()
            elif self.timeline is not None and self.timeline.name != "home":
                # similar to goto_home_timeline() but without handling overlay (absent here)
                self.timeline_generator = api.home_timeline_generator(
                    self.app, self.user, limit=40)
                self.async_load_timeline(is_initial=True, timeline_name="home")

        elif key in ('q', 'Q'):
            if self.overlay:
                self.close_overlay()
            else:
                raise urwid.ExitMainLoop()