witchie/toot/commands.py

369 lines
12 KiB
Python
Raw Normal View History

2017-04-19 12:47:30 +00:00
# -*- coding: utf-8 -*-
import sys
2017-12-30 12:32:52 +00:00
from toot import api, config
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
from toot.exceptions import ConsoleError, NotFoundError
from toot.output import (print_out, print_instance, print_account, print_acct_list,
print_search_results, print_timeline, print_notifications)
from toot.utils import assert_domain_exists, editor_input, multiline_input, EOF_KEY
2017-04-19 12:47:30 +00:00
def get_timeline_generator(app, user, args):
    """Return the timeline generator selected by the parsed options.

    Checks that the given options can be combined, then delegates to the
    matching ``api`` generator function, passing ``args.count`` as ``limit``.
    Without --public, --tag or --list the home timeline generator is used.

    Raises ConsoleError if more than one of --public, --tag and --list is
    given, or if --local or --instance is used without --public or --tag.
    """
    # --tag, --list and --public are mutually exclusive
    chosen = sum(1 for option in (args.tag, args.list, args.public) if option)
    if chosen > 1:
        raise ConsoleError("Only one of --public, --tag, or --list can be used at one time.")

    if args.local and not args.public and not args.tag:
        raise ConsoleError("The --local option is only valid alongside --public or --tag.")

    if args.instance and not args.public and not args.tag:
        raise ConsoleError("The --instance option is only valid alongside --public or --tag.")

    if args.public:
        # With --instance the anonymous variant is used (no app/user passed)
        if args.instance:
            return api.anon_public_timeline_generator(args.instance, local=args.local, limit=args.count)
        return api.public_timeline_generator(app, user, local=args.local, limit=args.count)

    if args.tag:
        if args.instance:
            return api.anon_tag_timeline_generator(args.instance, args.tag, limit=args.count)
        return api.tag_timeline_generator(app, user, args.tag, local=args.local, limit=args.count)

    if args.list:
        return api.timeline_list_generator(app, user, args.list, limit=args.count)

    return api.home_timeline_generator(app, user, limit=args.count)
def timeline(app, user, args):
    """Print the selected timeline one batch at a time.

    After each batch the user is asked whether to continue, unless
    ``args.once`` is set or stdout is not a terminal, in which case only the
    first batch is printed.
    """
    batches = get_timeline_generator(app, user, args)

    while True:
        try:
            batch = next(batches)
        except StopIteration:
            print_out("That's all folks.")
            return

        print_timeline(reversed(batch) if args.reverse else batch)

        # Never prompt when a single batch was requested or output is piped
        if args.once or not sys.stdout.isatty():
            return

        answer = input("\nContinue? [Y/n] ")
        if answer.lower() == "n":
            return
2017-04-19 12:47:30 +00:00
def thread(app, user, args):
    """Print a status together with its ancestors and descendants.

    The ancestors come first, then the status given by ``args.status_id``,
    then the descendants.
    """
    status = api.single_status(app, user, args.status_id)
    context = api.context(app, user, args.status_id)

    statuses = [*context['ancestors'], status, *context['descendants']]
    print_timeline(statuses)
2017-04-19 12:47:30 +00:00
2017-04-19 12:47:30 +00:00
def post(app, user, args):
    """Upload any attached media, then post (or schedule) a status.

    The status text is ``args.text`` if given, otherwise whatever is piped to
    stdin. If there is still no text and media was uploaded, the text defaults
    to the URLs of the uploaded attachments. The text is then passed through
    the editor when ``args.editor`` is set, or read from an interactive
    multi-line prompt when it is still empty.

    Raises ConsoleError if the editor is requested while stdin is not a tty,
    if more than 4 media files are given, or if no text could be obtained.
    """
    # TODO: this might be achievable, explore options
    if args.editor and not sys.stdin.isatty():
        raise ConsoleError("Cannot run editor if not in tty.")

    if args.media and len(args.media) > 4:
        raise ConsoleError("Cannot attach more than 4 files.")

    # Read any text that might be piped to stdin
    if not args.text and not sys.stdin.isatty():
        args.text = sys.stdin.read().rstrip()

    # Match media to corresponding description and upload
    media = args.media or []
    descriptions = args.description or []
    uploaded_media = []

    for idx, file in enumerate(media):
        # Files beyond the number of given descriptions get no description
        description = descriptions[idx].strip() if idx < len(descriptions) else None
        uploaded_media.append(_do_upload(app, user, file, description))

    media_ids = [m["id"] for m in uploaded_media]

    if uploaded_media and not args.text:
        # ``text_url`` is deprecated in the Mastodon API and may be null or
        # missing; fall back to ``url`` and skip attachments exposing neither,
        # so the join never receives a non-string value.
        # NOTE(review): assumes upload results are dicts - confirm.
        media_urls = (m.get("text_url") or m.get("url") for m in uploaded_media)
        args.text = "\n".join(url for url in media_urls if url)

    if args.editor:
        args.text = editor_input(args.editor, args.text)
    elif not args.text:
        print_out("Write or paste your toot. Press <yellow>{}</yellow> to post it.".format(EOF_KEY))
        args.text = multiline_input()

    if not args.text:
        raise ConsoleError("You must specify either text or media to post.")

    response = api.post_status(
        app, user, args.text,
        visibility=args.visibility,
        media_ids=media_ids,
        sensitive=args.sensitive,
        spoiler_text=args.spoiler_text,
        in_reply_to_id=args.reply_to,
        language=args.language,
        scheduled_at=args.scheduled_at,
        content_type=args.content_type
    )

    # A response carrying "scheduled_at" is reported as scheduled, not posted
    if "scheduled_at" in response:
        print_out("Toot scheduled for: <green>{}</green>".format(response["scheduled_at"]))
    else:
        print_out("Toot posted: <green>{}</green>".format(response.get('url')))
2017-04-19 12:47:30 +00:00
2018-06-14 08:40:16 +00:00
def delete(app, user, args):
    """Delete the status given by ``args.status_id`` and confirm."""
    status_id = args.status_id
    api.delete_status(app, user, status_id)
    print_out("<green>✓ Status deleted</green>")
def favourite(app, user, args):
    """Favourite the status given by ``args.status_id`` and confirm."""
    status_id = args.status_id
    api.favourite(app, user, status_id)
    print_out("<green>✓ Status favourited</green>")
def unfavourite(app, user, args):
    """Remove the favourite from the status given by ``args.status_id``."""
    status_id = args.status_id
    api.unfavourite(app, user, status_id)
    print_out("<green>✓ Status unfavourited</green>")
def reblog(app, user, args):
    """Reblog the status given by ``args.status_id`` and confirm."""
    status_id = args.status_id
    api.reblog(app, user, status_id)
    print_out("<green>✓ Status reblogged</green>")
def unreblog(app, user, args):
    """Undo the reblog of the status given by ``args.status_id``."""
    status_id = args.status_id
    api.unreblog(app, user, status_id)
    print_out("<green>✓ Status unreblogged</green>")
def pin(app, user, args):
    """Pin the status given by ``args.status_id`` and confirm."""
    status_id = args.status_id
    api.pin(app, user, status_id)
    print_out("<green>✓ Status pinned</green>")
def unpin(app, user, args):
    """Unpin the status given by ``args.status_id`` and confirm."""
    status_id = args.status_id
    api.unpin(app, user, status_id)
    print_out("<green>✓ Status unpinned</green>")
2022-11-17 05:28:41 +00:00
def bookmark(app, user, args):
    """Bookmark the status given by ``args.status_id`` and confirm."""
    status_id = args.status_id
    api.bookmark(app, user, status_id)
    print_out("<green>✓ Status bookmarked</green>")
def unbookmark(app, user, args):
    """Remove the bookmark from the status given by ``args.status_id``."""
    status_id = args.status_id
    api.unbookmark(app, user, status_id)
    print_out("<green>✓ Status unbookmarked</green>")
2019-01-24 08:36:25 +00:00
def reblogged_by(app, user, args):
    """Print the display name and handle of each account that reblogged
    the status given by ``args.status_id``.
    """
    accounts = api.reblogged_by(app, user, args.status_id)
    for entry in accounts:
        display_name = entry['display_name']
        handle = entry['acct']
        print_out(f"{display_name}\n @{handle}")
2017-04-19 12:47:30 +00:00
def auth(app, user, args):
    """List the accounts stored in the config and mark the active one.

    Also prints the path of the config file holding the auth tokens. The
    ``app``, ``user`` and ``args`` parameters are not used here.
    """
    config_data = config.load_config()
    users = config_data["users"]

    if not users:
        print_out("You are not logged in to any accounts")
        return

    active_user = config_data["active_user"]

    print_out("Authenticated accounts:")
    for uid in users:
        # Only the active account gets a label; the others get an empty one
        label = "ACTIVE" if uid == active_user else ""
        print_out(f"* <green>{uid}</green> <yellow>{label}</yellow>")

    config_path = config.get_config_file_path()
    print_out(f"\nAuth tokens are stored in: <blue>{config_path}</blue>")
2017-04-19 12:47:30 +00:00
2018-06-15 07:39:28 +00:00
def login_cli(app, user, args):
    """Log in interactively on the command line using ``args.email``.

    An app is created for ``args.instance`` and ``args.scheme``; the ``app``
    and ``user`` passed in are not used.
    """
    created_app = create_app_interactive(instance=args.instance, scheme=args.scheme)
    login_interactive(created_app, args.email)

    print_out()
    print_out("<green>✓ Successfully logged in.</green>")
2017-04-19 12:47:30 +00:00
2018-06-15 07:39:28 +00:00
def login(app, user, args):
    """Log in interactively through the browser.

    An app is created for ``args.instance`` and ``args.scheme``; the ``app``
    and ``user`` passed in are not used.
    """
    created_app = create_app_interactive(instance=args.instance, scheme=args.scheme)
    login_browser_interactive(created_app)

    print_out()
    print_out("<green>✓ Successfully logged in.</green>")
2017-04-19 12:47:30 +00:00
def logout(app, user, args):
    """Delete the stored user named by ``args.account`` and confirm.

    The user is loaded with ``throw=True``; the ``user`` passed in is ignored.
    """
    stored_user = config.load_user(args.account, throw=True)
    config.delete_user(stored_user)

    user_id = config.user_id(stored_user)
    print_out(f"<green>✓ User {user_id} logged out</green>")
2017-04-19 12:47:30 +00:00
def activate(app, user, args):
    """Make the stored user named by ``args.account`` the active one.

    The user is loaded with ``throw=True``; the ``user`` passed in is ignored.
    """
    stored_user = config.load_user(args.account, throw=True)
    config.activate_user(stored_user)

    user_id = config.user_id(stored_user)
    print_out(f"<green>✓ User {user_id} active</green>")
2017-04-19 12:47:30 +00:00
def upload(app, user, args):
    """Upload ``args.file`` with ``args.description`` and print the result.

    Prints the media ID and type, followed by the original, preview and text
    URLs taken from the upload response.
    """
    response = _do_upload(app, user, args.file, args.description)

    print_out()
    print_out(
        f"Successfully uploaded media ID <yellow>{response['id']}</yellow>, "
        f"type '<yellow>{response['type']}</yellow>'"
    )

    # One line per URL, in the same order as the response keys listed here
    url_fields = (
        ("Original URL", 'url'),
        ("Preview URL", 'preview_url'),
        ("Text URL", 'text_url'),
    )
    for label, key in url_fields:
        print_out(f"{label}: <green>{response[key]}</green>")
2017-04-19 12:47:30 +00:00
def search(app, user, args):
    """Search for ``args.query`` and print the results.

    ``args.resolve`` is passed through to ``api.search`` unchanged.
    """
    results = api.search(app, user, args.query, args.resolve)
    print_search_results(results)
2017-04-19 12:47:30 +00:00
2021-08-26 15:54:31 +00:00
def _do_upload(app, user, file, description):
    """Announce the upload of ``file`` and return the API upload result.

    ``file`` must expose a ``name`` attribute, which is printed before the
    upload starts.
    """
    print_out(f"Uploading media: <green>{file.name}</green>")
    result = api.upload_media(app, user, file, description=description)
    return result
2017-04-19 12:47:30 +00:00
def _find_account(app, user, account_name):
    """Search for an account by name and return the matching result.

    The search is made with the name as given. A single leading "@" is then
    dropped and the name is compared, ignoring case, with the ``acct`` value
    of each search result; the first match is returned.

    Raises ConsoleError if the name is empty or no result matches.
    """
    if not account_name:
        raise ConsoleError("Empty account name given")

    candidates = api.search_accounts(app, user, account_name)

    wanted = account_name[1:] if account_name.startswith("@") else account_name
    # Normalise string matching because usernames are case insensitive
    wanted = wanted.lower()

    for candidate in candidates:
        if candidate['acct'].lower() == wanted:
            return candidate

    raise ConsoleError("Account not found")
2017-04-19 12:47:30 +00:00
def follow(app, user, args):
    """Follow the account named by ``args.account`` and confirm."""
    target = _find_account(app, user, args.account)
    api.follow(app, user, target['id'])
    print_out(f"<green>✓ You are now following {args.account}</green>")
2017-04-19 12:47:30 +00:00
def unfollow(app, user, args):
    """Unfollow the account named by ``args.account`` and confirm."""
    target = _find_account(app, user, args.account)
    api.unfollow(app, user, target['id'])
    print_out(f"<green>✓ You are no longer following {args.account}</green>")
2017-04-19 12:47:30 +00:00
def following(app, user, args):
    """Print the list returned by ``api.following`` for ``args.account``."""
    target = _find_account(app, user, args.account)
    accounts = api.following(app, user, target['id'])
    print_acct_list(accounts)
def followers(app, user, args):
    """Print the list returned by ``api.followers`` for ``args.account``."""
    target = _find_account(app, user, args.account)
    accounts = api.followers(app, user, target['id'])
    print_acct_list(accounts)
2017-04-19 12:47:30 +00:00
2017-04-26 09:49:21 +00:00
def mute(app, user, args):
    """Mute the account named by ``args.account`` and confirm."""
    target = _find_account(app, user, args.account)
    api.mute(app, user, target['id'])
    print_out(f"<green>✓ You have muted {args.account}</green>")
2017-04-19 12:47:30 +00:00
2017-04-26 09:49:21 +00:00
def unmute(app, user, args):
    """Unmute the account named by ``args.account`` and confirm."""
    target = _find_account(app, user, args.account)
    api.unmute(app, user, target['id'])
    print_out(f"<green>✓ {args.account} is no longer muted</green>")
2017-04-26 09:49:21 +00:00
def block(app, user, args):
    """Block the account named by ``args.account`` and confirm."""
    target = _find_account(app, user, args.account)
    api.block(app, user, target['id'])
    print_out(f"<green>✓ You are now blocking {args.account}</green>")
2017-04-26 09:49:21 +00:00
def unblock(app, user, args):
    """Unblock the account named by ``args.account`` and confirm."""
    target = _find_account(app, user, args.account)
    api.unblock(app, user, target['id'])
    print_out(f"<green>✓ {args.account} is no longer blocked</green>")
2017-04-19 12:47:30 +00:00
def whoami(app, user, args):
    """Print the account returned by ``api.verify_credentials``."""
    own_account = api.verify_credentials(app, user)
    print_account(own_account)
2017-04-19 12:47:30 +00:00
2017-04-19 13:29:40 +00:00
def whois(app, user, args):
    """Look up the account named by ``args.account`` and print it."""
    found = _find_account(app, user, args.account)
    print_account(found)
2017-12-29 13:26:40 +00:00
def instance(app, user, args):
    """Fetch and print details of an instance.

    The instance name is ``args.instance`` if given, otherwise the instance of
    ``app`` when an app is available.

    Raises ConsoleError if no instance name can be determined, or if the API
    lookup raises NotFoundError.
    """
    name = args.instance or (app and app.instance)
    if not name:
        raise ConsoleError("Please specify instance name.")

    assert_domain_exists(name)

    try:
        details = api.get_instance(name, args.scheme)
        print_instance(details)
    except NotFoundError:
        message = (
            f"Instance not found at {name}.\n"
            "The given domain probably does not host a Mastodon instance."
        )
        raise ConsoleError(message)
def notifications(app, user, args):
    """Print the user's notifications, or clear them with ``args.clear``.

    With ``args.mentions`` the types listed below are excluded from the
    request. With ``args.reverse`` the notifications are printed in reverse
    order.
    """
    if args.clear:
        api.clear_notifications(app, user)
        print_out("<green>Cleared notifications</green>")
        return

    # Filter everything except mentions
    # https://docs.joinmastodon.org/methods/notifications/
    if args.mentions:
        excluded_types = ["follow", "favourite", "reblog", "poll", "follow_request"]
    else:
        excluded_types = []

    items = api.get_notifications(app, user, exclude_types=excluded_types)
    if not items:
        print_out("<yellow>No notification</yellow>")
        return

    print_notifications(reversed(items) if args.reverse else items)
def tui(app, user, args):
    """Create the TUI for the given app and user and run it."""
    # Imported at call time, so the TUI module is only loaded for this command
    from .tui.app import TUI

    interface = TUI.create(app, user)
    interface.run()