Added follow and unfollow commands

This commit is contained in:
Ivan Habunek 2017-04-16 17:15:05 +02:00
parent 64d46955e2
commit a493da5c84
No known key found for this signature in database
GPG key ID: CDBD63C43A30BB95
5 changed files with 211 additions and 24 deletions

View file

@ -39,6 +39,8 @@ Running ``toot <command> -h`` shows the documentation for the given command.
``toot post`` Post a status to your timeline.
``toot search`` Search for accounts or hashtags.
``toot timeline`` Display recent items in your public timeline.
``toot follow`` Follow an account.
``toot unfollow`` Unfollow an account.
=================== ===============================================================
Authentication

View file

@ -2,8 +2,7 @@
import pytest
import requests
from toot import User, App
from toot.console import print_usage, cmd_post_status, cmd_timeline, cmd_upload, cmd_search
from toot import console, User, App
from tests.utils import MockResponse
@ -12,7 +11,7 @@ user = User('ivan@habunek.com', 'xxx')
def test_print_usagecap(capsys):
print_usage()
console.print_usage()
out, err = capsys.readouterr()
assert "toot - interact with Mastodon from the command line" in out
@ -36,7 +35,7 @@ def test_post_status_defaults(monkeypatch, capsys):
monkeypatch.setattr(requests.Request, 'prepare', mock_prepare)
monkeypatch.setattr(requests.Session, 'send', mock_send)
cmd_post_status(app, user, ['Hello world'])
console.cmd_post_status(app, user, ['Hello world'])
out, err = capsys.readouterr()
assert "Toot posted" in out
@ -63,7 +62,7 @@ def test_post_status_with_options(monkeypatch, capsys):
args = ['"Hello world"', '--visibility', 'unlisted']
cmd_post_status(app, user, args)
console.cmd_post_status(app, user, args)
out, err = capsys.readouterr()
assert "Toot posted" in out
@ -73,7 +72,7 @@ def test_post_status_invalid_visibility(monkeypatch, capsys):
args = ['Hello world', '--visibility', 'foo']
with pytest.raises(SystemExit):
cmd_post_status(app, user, args)
console.cmd_post_status(app, user, args)
out, err = capsys.readouterr()
assert "invalid visibility value: 'foo'" in err
@ -83,7 +82,7 @@ def test_post_status_invalid_media(monkeypatch, capsys):
args = ['Hello world', '--media', 'does_not_exist.jpg']
with pytest.raises(SystemExit):
cmd_post_status(app, user, args)
console.cmd_post_status(app, user, args)
out, err = capsys.readouterr()
assert "can't open 'does_not_exist.jpg'" in err
@ -107,7 +106,7 @@ def test_timeline(monkeypatch, capsys):
monkeypatch.setattr(requests, 'get', mock_get)
cmd_timeline(app, user, [])
console.cmd_timeline(app, user, [])
out, err = capsys.readouterr()
assert "The computer can't tell you the emotional story." in out
@ -133,7 +132,7 @@ def test_upload(monkeypatch, capsys):
monkeypatch.setattr(requests.Request, 'prepare', mock_prepare)
monkeypatch.setattr(requests.Session, 'send', mock_send)
cmd_upload(app, user, [__file__])
console.cmd_upload(app, user, [__file__])
out, err = capsys.readouterr()
assert "Uploading media" in out
@ -163,10 +162,104 @@ def test_search(monkeypatch, capsys):
monkeypatch.setattr(requests, 'get', mock_get)
cmd_search(app, user, ['freddy'])
console.cmd_search(app, user, ['freddy'])
out, err = capsys.readouterr()
assert "Hashtags:\n\033[32m#foo\033[0m, \033[32m#bar\033[0m, \033[32m#baz\033[0m" in out
assert "Accounts:" in out
assert "\033[32m@thequeen\033[0m Freddy Mercury" in out
assert "\033[32m@thequeen@other.instance\033[0m Mercury Freddy" in out
def test_follow(monkeypatch, capsys):
def mock_get(url, params, headers):
assert url == 'https://habunek.com/api/v1/search'
assert params == {'q': 'blixa', 'resolve': False}
assert headers == {'Authorization': 'Bearer xxx'}
return MockResponse({
'accounts': [
{'id': 123, 'acct': 'blixa@other.acc'},
{'id': 321, 'acct': 'blixa'},
]
})
def mock_prepare(request):
assert request.url == 'https://habunek.com/api/v1/accounts/321/follow'
def mock_send(*args, **kwargs):
return MockResponse()
monkeypatch.setattr(requests.Request, 'prepare', mock_prepare)
monkeypatch.setattr(requests.Session, 'send', mock_send)
monkeypatch.setattr(requests, 'get', mock_get)
console.cmd_follow(app, user, ['blixa'])
out, err = capsys.readouterr()
assert "You are now following blixa" in out
def test_follow_not_found(monkeypatch, capsys):
def mock_get(url, params, headers):
assert url == 'https://habunek.com/api/v1/search'
assert params == {'q': 'blixa', 'resolve': False}
assert headers == {'Authorization': 'Bearer xxx'}
return MockResponse({
'accounts': []
})
monkeypatch.setattr(requests, 'get', mock_get)
console.cmd_follow(app, user, ['blixa'])
out, err = capsys.readouterr()
assert "Account not found" in err
def test_unfollow(monkeypatch, capsys):
def mock_get(url, params, headers):
assert url == 'https://habunek.com/api/v1/search'
assert params == {'q': 'blixa', 'resolve': False}
assert headers == {'Authorization': 'Bearer xxx'}
return MockResponse({
'accounts': [
{'id': 123, 'acct': 'blixa@other.acc'},
{'id': 321, 'acct': 'blixa'},
]
})
def mock_prepare(request):
assert request.url == 'https://habunek.com/api/v1/accounts/321/unfollow'
def mock_send(*args, **kwargs):
return MockResponse()
monkeypatch.setattr(requests.Request, 'prepare', mock_prepare)
monkeypatch.setattr(requests.Session, 'send', mock_send)
monkeypatch.setattr(requests, 'get', mock_get)
console.cmd_unfollow(app, user, ['blixa'])
out, err = capsys.readouterr()
assert "You are no longer following blixa" in out
def test_unfollow_not_found(monkeypatch, capsys):
def mock_get(url, params, headers):
assert url == 'https://habunek.com/api/v1/search'
assert params == {'q': 'blixa', 'resolve': False}
assert headers == {'Authorization': 'Bearer xxx'}
return MockResponse({
'accounts': []
})
monkeypatch.setattr(requests, 'get', mock_get)
console.cmd_unfollow(app, user, ['blixa'])
out, err = capsys.readouterr()
assert "Account not found" in err

View file

@ -1,6 +1,7 @@
class MockResponse:
def __init__(self, response_data={}):
def __init__(self, response_data={}, ok=True):
self.ok = ok
self.response_data = response_data
def raise_for_status(self):

View file

@ -4,6 +4,7 @@ import logging
import requests
from requests import Request, Session
from future.moves.urllib.parse import quote_plus
from toot import App, User, CLIENT_NAME, CLIENT_WEBSITE
@ -12,6 +13,14 @@ SCOPES = 'read write follow'
logger = logging.getLogger('toot')
class ApiError(Exception):
pass
class NotFoundError(ApiError):
pass
def _log_request(request, prepared_request):
logger.debug(">>> \033[32m{} {}\033[0m".format(request.method, request.url))
logger.debug(">>> DATA: \033[33m{}\033[0m".format(request.data))
@ -20,8 +29,12 @@ def _log_request(request, prepared_request):
def _log_response(response):
logger.debug("<<< \033[32m{}\033[0m".format(response))
logger.debug("<<< \033[33m{}\033[0m".format(response.json()))
if response.ok:
logger.debug("<<< \033[32m{}\033[0m".format(response))
logger.debug("<<< \033[33m{}\033[0m".format(response.json()))
else:
logger.debug("<<< \033[31m{}\033[0m".format(response))
logger.debug("<<< \033[31m{}\033[0m".format(response.content))
def _get(app, user, url, params=None):
@ -48,6 +61,17 @@ def _post(app, user, url, data=None, files=None):
_log_response(response)
if not response.ok:
try:
error = response.json()['error']
except:
error = "Unknown error"
if response.status_code == 404:
raise NotFoundError(error)
raise ApiError(error)
response.raise_for_status()
return response.json()
@ -115,3 +139,15 @@ def search(app, user, query, resolve):
'q': query,
'resolve': resolve,
})
def follow(app, user, account):
url = '/api/v1/accounts/%d/follow' % account
return _post(app, user, url)
def unfollow(app, user, account):
url = '/api/v1/accounts/%d/unfollow' % account
return _post(app, user, url)

View file

@ -15,8 +15,8 @@ from itertools import chain
from argparse import ArgumentParser, FileType
from textwrap import TextWrapper
from toot import DEFAULT_INSTANCE
from toot.api import create_app, login, post_status, timeline_home, upload_media, search
from toot import api, DEFAULT_INSTANCE
from toot.api import ApiError
from toot.config import save_user, load_user, load_app, save_app, CONFIG_APP_FILE, CONFIG_USER_FILE
@ -49,7 +49,7 @@ def create_app_interactive():
print("Registering application with %s" % green(base_url))
try:
app = create_app(base_url)
app = api.create_app(base_url)
except:
raise ConsoleError("Failed authenticating application. Did you enter a valid instance?")
@ -66,7 +66,7 @@ def login_interactive(app):
print("Authenticating...")
try:
user = login(app, email, password)
user = api.login(app, email, password)
except:
raise ConsoleError("Login failed")
@ -86,6 +86,8 @@ def print_usage():
print(" toot post <msg> - toot a new post to your timeline")
print(" toot search - search for accounts or hashtags")
print(" toot timeline - shows your public timeline")
print(" toot follow - follow an account")
print(" toot unfollow - unfollow an account")
print("")
print("To get help for each command run:")
print(" toot <command> --help")
@ -140,7 +142,7 @@ def cmd_timeline(app, user, args):
args = parser.parse_args(args)
items = timeline_home(app, user)
items = api.timeline_home(app, user)
parsed_items = [parse_timeline(t) for t in items]
print("" * 31 + "" + "" * 88)
@ -174,7 +176,7 @@ def cmd_post_status(app, user, args):
else:
media_ids = None
response = post_status(app, user, args.text, media_ids=media_ids, visibility=args.visibility)
response = api.post_status(app, user, args.text, media_ids=media_ids, visibility=args.visibility)
print("Toot posted: " + green(response.get('url')))
@ -194,11 +196,11 @@ def cmd_auth(app, user, args):
print("You are not logged in")
def cmd_login():
def cmd_login(args):
parser = ArgumentParser(prog="toot login",
description="Log into a Mastodon instance",
epilog="https://github.com/ihabunek/toot")
parser.parse_args()
parser.parse_args(args)
app = create_app_interactive()
user = login_interactive(app)
@ -264,7 +266,7 @@ def cmd_search(app, user, args):
args = parser.parse_args(args)
response = search(app, user, args.query, args.resolve)
response = api.search(app, user, args.query, args.resolve)
_print_accounts(response['accounts'])
_print_hashtags(response['hashtags'])
@ -272,7 +274,52 @@ def cmd_search(app, user, args):
def do_upload(app, user, file):
print("Uploading media: {}".format(green(file.name)))
return upload_media(app, user, file)
return api.upload_media(app, user, file)
def _find_account(app, user, account_name):
"""For a given account name, returns the Account object or None if not found."""
response = api.search(app, user, account_name, False)
for account in response['accounts']:
if account['acct'] == account_name:
return account
def cmd_follow(app, user, args):
parser = ArgumentParser(prog="toot follow",
description="Follow an account",
epilog="https://github.com/ihabunek/toot")
parser.add_argument("account", help="Account name, e.g. 'Gargron' or 'polymerwitch@toot.cat'")
args = parser.parse_args(args)
account = _find_account(app, user, args.account)
if not account:
print_error("Account not found")
return
api.follow(app, user, account['id'])
print(green(u"✓ You are now following %s" % args.account))
def cmd_unfollow(app, user, args):
parser = ArgumentParser(prog="toot unfollow",
description="Unfollow an account",
epilog="https://github.com/ihabunek/toot")
parser.add_argument("account", help="Account name, e.g. 'Gargron' or 'polymerwitch@toot.cat'")
args = parser.parse_args(args)
account = _find_account(app, user, args.account)
if not account:
print_error("Account not found")
return
api.unfollow(app, user, account['id'])
print(green(u"✓ You are no longer following %s" % args.account))
def run_command(command, args):
@ -281,7 +328,7 @@ def run_command(command, args):
# Commands which can run when not logged in
if command == 'login':
return cmd_login()
return cmd_login(args)
if command == 'auth':
return cmd_auth(app, user, args)
@ -307,6 +354,12 @@ def run_command(command, args):
if command == 'search':
return cmd_search(app, user, args)
if command == 'follow':
return cmd_follow(app, user, args)
if command == 'unfollow':
return cmd_unfollow(app, user, args)
print_error("Unknown command '{}'\n".format(command))
print_usage()
@ -325,3 +378,5 @@ def main():
run_command(command, args)
except ConsoleError as e:
print_error(str(e))
except ApiError as e:
print_error(str(e))