Use http methods instead of requests directly
This commit is contained in:
parent
20eaf86b56
commit
92d4dc745a
9 changed files with 237 additions and 195 deletions
|
@ -2,79 +2,76 @@
|
||||||
import pytest
|
import pytest
|
||||||
import requests
|
import requests
|
||||||
|
|
||||||
|
from requests import Request
|
||||||
|
|
||||||
from toot import App, CLIENT_NAME, CLIENT_WEBSITE
|
from toot import App, CLIENT_NAME, CLIENT_WEBSITE
|
||||||
from toot.api import create_app, login, SCOPES, AuthenticationError
|
from toot.api import create_app, login, SCOPES, AuthenticationError
|
||||||
from tests.utils import MockResponse
|
from tests.utils import MockResponse, Expectations
|
||||||
|
|
||||||
|
|
||||||
def test_create_app(monkeypatch):
|
def test_create_app(monkeypatch):
|
||||||
response = {
|
request = Request('POST', 'http://bigfish.software/api/v1/apps',
|
||||||
'client_id': 'foo',
|
data={'website': CLIENT_WEBSITE,
|
||||||
'client_secret': 'bar',
|
'client_name': CLIENT_NAME,
|
||||||
}
|
'scopes': SCOPES,
|
||||||
|
'redirect_uris': 'urn:ietf:wg:oauth:2.0:oob'})
|
||||||
|
|
||||||
def mock_post(url, data):
|
response = MockResponse({'client_id': 'foo',
|
||||||
assert url == 'https://bigfish.software/api/v1/apps'
|
'client_secret': 'bar'})
|
||||||
assert data == {
|
|
||||||
'website': CLIENT_WEBSITE,
|
|
||||||
'client_name': CLIENT_NAME,
|
|
||||||
'scopes': SCOPES,
|
|
||||||
'redirect_uris': 'urn:ietf:wg:oauth:2.0:oob'
|
|
||||||
}
|
|
||||||
return MockResponse(response)
|
|
||||||
|
|
||||||
monkeypatch.setattr(requests, 'post', mock_post)
|
e = Expectations()
|
||||||
|
e.add(request, response)
|
||||||
|
e.patch(monkeypatch)
|
||||||
|
|
||||||
assert create_app('bigfish.software') == response
|
create_app('bigfish.software')
|
||||||
|
|
||||||
|
|
||||||
def test_login(monkeypatch):
|
def test_login(monkeypatch):
|
||||||
app = App('bigfish.software', 'https://bigfish.software', 'foo', 'bar')
|
app = App('bigfish.software', 'https://bigfish.software', 'foo', 'bar')
|
||||||
|
|
||||||
response = {
|
data = {
|
||||||
|
'grant_type': 'password',
|
||||||
|
'client_id': app.client_id,
|
||||||
|
'client_secret': app.client_secret,
|
||||||
|
'username': 'user',
|
||||||
|
'password': 'pass',
|
||||||
|
'scope': SCOPES,
|
||||||
|
}
|
||||||
|
|
||||||
|
request = Request('POST', 'https://bigfish.software/oauth/token', data=data)
|
||||||
|
|
||||||
|
response = MockResponse({
|
||||||
'token_type': 'bearer',
|
'token_type': 'bearer',
|
||||||
'scope': 'read write follow',
|
'scope': 'read write follow',
|
||||||
'access_token': 'xxx',
|
'access_token': 'xxx',
|
||||||
'created_at': 1492523699
|
'created_at': 1492523699
|
||||||
}
|
})
|
||||||
|
|
||||||
def mock_post(url, data, allow_redirects):
|
e = Expectations()
|
||||||
assert not allow_redirects
|
e.add(request, response)
|
||||||
assert url == 'https://bigfish.software/oauth/token'
|
e.patch(monkeypatch)
|
||||||
assert data == {
|
|
||||||
'grant_type': 'password',
|
|
||||||
'client_id': app.client_id,
|
|
||||||
'client_secret': app.client_secret,
|
|
||||||
'username': 'user',
|
|
||||||
'password': 'pass',
|
|
||||||
'scope': SCOPES,
|
|
||||||
}
|
|
||||||
|
|
||||||
return MockResponse(response)
|
login(app, 'user', 'pass')
|
||||||
|
|
||||||
monkeypatch.setattr(requests, 'post', mock_post)
|
|
||||||
|
|
||||||
assert login(app, 'user', 'pass') == response
|
|
||||||
|
|
||||||
|
|
||||||
def test_login_failed(monkeypatch):
|
def test_login_failed(monkeypatch):
|
||||||
app = App('bigfish.software', 'https://bigfish.software', 'foo', 'bar')
|
app = App('bigfish.software', 'https://bigfish.software', 'foo', 'bar')
|
||||||
|
|
||||||
def mock_post(url, data, allow_redirects):
|
data = {
|
||||||
assert not allow_redirects
|
'grant_type': 'password',
|
||||||
assert url == 'https://bigfish.software/oauth/token'
|
'client_id': app.client_id,
|
||||||
assert data == {
|
'client_secret': app.client_secret,
|
||||||
'grant_type': 'password',
|
'username': 'user',
|
||||||
'client_id': app.client_id,
|
'password': 'pass',
|
||||||
'client_secret': app.client_secret,
|
'scope': SCOPES,
|
||||||
'username': 'user',
|
}
|
||||||
'password': 'pass',
|
|
||||||
'scope': SCOPES,
|
|
||||||
}
|
|
||||||
|
|
||||||
return MockResponse(is_redirect=True)
|
request = Request('POST', 'https://bigfish.software/oauth/token', data=data)
|
||||||
|
response = MockResponse(is_redirect=True)
|
||||||
|
|
||||||
monkeypatch.setattr(requests, 'post', mock_post)
|
e = Expectations()
|
||||||
|
e.add(request, response)
|
||||||
|
e.patch(monkeypatch)
|
||||||
|
|
||||||
with pytest.raises(AuthenticationError):
|
with pytest.raises(AuthenticationError):
|
||||||
login(app, 'user', 'pass')
|
login(app, 'user', 'pass')
|
||||||
|
|
|
@ -15,6 +15,7 @@ def test_register_app(monkeypatch):
|
||||||
assert app.client_secret == "cs"
|
assert app.client_secret == "cs"
|
||||||
|
|
||||||
monkeypatch.setattr(api, 'create_app', retval(app_data))
|
monkeypatch.setattr(api, 'create_app', retval(app_data))
|
||||||
|
monkeypatch.setattr(api, 'get_instance', retval({"title": "foo", "version": "1"}))
|
||||||
monkeypatch.setattr(config, 'save_app', assert_app)
|
monkeypatch.setattr(config, 'save_app', assert_app)
|
||||||
|
|
||||||
app = auth.register_app("foo.bar")
|
app = auth.register_app("foo.bar")
|
||||||
|
|
|
@ -3,10 +3,12 @@ import pytest
|
||||||
import requests
|
import requests
|
||||||
import re
|
import re
|
||||||
|
|
||||||
|
from requests import Request
|
||||||
|
|
||||||
from toot import console, User, App
|
from toot import console, User, App
|
||||||
from toot.exceptions import ConsoleError
|
from toot.exceptions import ConsoleError
|
||||||
|
|
||||||
from tests.utils import MockResponse
|
from tests.utils import MockResponse, Expectations
|
||||||
|
|
||||||
app = App('habunek.com', 'https://habunek.com', 'foo', 'bar')
|
app = App('habunek.com', 'https://habunek.com', 'foo', 'bar')
|
||||||
user = User('habunek.com', 'ivan@habunek.com', 'xxx')
|
user = User('habunek.com', 'ivan@habunek.com', 'xxx')
|
||||||
|
@ -34,7 +36,7 @@ def test_post_defaults(monkeypatch, capsys):
|
||||||
'media_ids[]': None,
|
'media_ids[]': None,
|
||||||
}
|
}
|
||||||
|
|
||||||
def mock_send(*args):
|
def mock_send(*args, **kwargs):
|
||||||
return MockResponse({
|
return MockResponse({
|
||||||
'url': 'http://ivan.habunek.com/'
|
'url': 'http://ivan.habunek.com/'
|
||||||
})
|
})
|
||||||
|
@ -59,7 +61,7 @@ def test_post_with_options(monkeypatch, capsys):
|
||||||
'media_ids[]': None,
|
'media_ids[]': None,
|
||||||
}
|
}
|
||||||
|
|
||||||
def mock_send(*args):
|
def mock_send(*args, **kwargs):
|
||||||
return MockResponse({
|
return MockResponse({
|
||||||
'url': 'http://ivan.habunek.com/'
|
'url': 'http://ivan.habunek.com/'
|
||||||
})
|
})
|
||||||
|
@ -96,11 +98,12 @@ def test_post_invalid_media(monkeypatch, capsys):
|
||||||
|
|
||||||
|
|
||||||
def test_timeline(monkeypatch, capsys):
|
def test_timeline(monkeypatch, capsys):
|
||||||
def mock_get(url, params, headers=None):
|
def mock_prepare(request):
|
||||||
assert url == 'https://habunek.com/api/v1/timelines/home'
|
assert request.url == 'https://habunek.com/api/v1/timelines/home'
|
||||||
assert headers == {'Authorization': 'Bearer xxx'}
|
assert request.headers == {'Authorization': 'Bearer xxx'}
|
||||||
assert params is None
|
assert request.params == {}
|
||||||
|
|
||||||
|
def mock_send(*args, **kwargs):
|
||||||
return MockResponse([{
|
return MockResponse([{
|
||||||
'account': {
|
'account': {
|
||||||
'display_name': 'Frank Zappa',
|
'display_name': 'Frank Zappa',
|
||||||
|
@ -111,7 +114,8 @@ def test_timeline(monkeypatch, capsys):
|
||||||
'reblog': None,
|
'reblog': None,
|
||||||
}])
|
}])
|
||||||
|
|
||||||
monkeypatch.setattr(requests, 'get', mock_get)
|
monkeypatch.setattr(requests.Request, 'prepare', mock_prepare)
|
||||||
|
monkeypatch.setattr(requests.Session, 'send', mock_send)
|
||||||
|
|
||||||
console.run_command(app, user, 'timeline', [])
|
console.run_command(app, user, 'timeline', [])
|
||||||
|
|
||||||
|
@ -127,7 +131,7 @@ def test_upload(monkeypatch, capsys):
|
||||||
assert request.headers == {'Authorization': 'Bearer xxx'}
|
assert request.headers == {'Authorization': 'Bearer xxx'}
|
||||||
assert request.files.get('file') is not None
|
assert request.files.get('file') is not None
|
||||||
|
|
||||||
def mock_send(*args):
|
def mock_send(*args, **kwargs):
|
||||||
return MockResponse({
|
return MockResponse({
|
||||||
'id': 123,
|
'id': 123,
|
||||||
'url': 'https://bigfish.software/123/456',
|
'url': 'https://bigfish.software/123/456',
|
||||||
|
@ -147,14 +151,15 @@ def test_upload(monkeypatch, capsys):
|
||||||
|
|
||||||
|
|
||||||
def test_search(monkeypatch, capsys):
|
def test_search(monkeypatch, capsys):
|
||||||
def mock_get(url, params, headers=None):
|
def mock_prepare(request):
|
||||||
assert url == 'https://habunek.com/api/v1/search'
|
assert request.url == 'https://habunek.com/api/v1/search'
|
||||||
assert headers == {'Authorization': 'Bearer xxx'}
|
assert request.headers == {'Authorization': 'Bearer xxx'}
|
||||||
assert params == {
|
assert request.params == {
|
||||||
'q': 'freddy',
|
'q': 'freddy',
|
||||||
'resolve': False,
|
'resolve': False,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def mock_send(*args, **kwargs):
|
||||||
return MockResponse({
|
return MockResponse({
|
||||||
'hashtags': ['foo', 'bar', 'baz'],
|
'hashtags': ['foo', 'bar', 'baz'],
|
||||||
'accounts': [{
|
'accounts': [{
|
||||||
|
@ -167,7 +172,8 @@ def test_search(monkeypatch, capsys):
|
||||||
'statuses': [],
|
'statuses': [],
|
||||||
})
|
})
|
||||||
|
|
||||||
monkeypatch.setattr(requests, 'get', mock_get)
|
monkeypatch.setattr(requests.Request, 'prepare', mock_prepare)
|
||||||
|
monkeypatch.setattr(requests.Session, 'send', mock_send)
|
||||||
|
|
||||||
console.run_command(app, user, 'search', ['freddy'])
|
console.run_command(app, user, 'search', ['freddy'])
|
||||||
|
|
||||||
|
@ -179,25 +185,20 @@ def test_search(monkeypatch, capsys):
|
||||||
|
|
||||||
|
|
||||||
def test_follow(monkeypatch, capsys):
|
def test_follow(monkeypatch, capsys):
|
||||||
def mock_get(url, params, headers):
|
req1 = Request('GET', 'https://habunek.com/api/v1/accounts/search',
|
||||||
assert url == 'https://habunek.com/api/v1/accounts/search'
|
params={'q': 'blixa'},
|
||||||
assert params == {'q': 'blixa'}
|
headers={'Authorization': 'Bearer xxx'})
|
||||||
assert headers == {'Authorization': 'Bearer xxx'}
|
res1 = MockResponse([
|
||||||
|
{'id': 123, 'acct': 'blixa@other.acc'},
|
||||||
|
{'id': 321, 'acct': 'blixa'},
|
||||||
|
])
|
||||||
|
|
||||||
return MockResponse([
|
req2 = Request('POST', 'https://habunek.com/api/v1/accounts/321/follow',
|
||||||
{'id': 123, 'acct': 'blixa@other.acc'},
|
headers={'Authorization': 'Bearer xxx'})
|
||||||
{'id': 321, 'acct': 'blixa'},
|
res2 = MockResponse()
|
||||||
])
|
|
||||||
|
|
||||||
def mock_prepare(request):
|
expectations = Expectations([req1, req2], [res1, res2])
|
||||||
assert request.url == 'https://habunek.com/api/v1/accounts/321/follow'
|
expectations.patch(monkeypatch)
|
||||||
|
|
||||||
def mock_send(*args, **kwargs):
|
|
||||||
return MockResponse()
|
|
||||||
|
|
||||||
monkeypatch.setattr(requests.Request, 'prepare', mock_prepare)
|
|
||||||
monkeypatch.setattr(requests.Session, 'send', mock_send)
|
|
||||||
monkeypatch.setattr(requests, 'get', mock_get)
|
|
||||||
|
|
||||||
console.run_command(app, user, 'follow', ['blixa'])
|
console.run_command(app, user, 'follow', ['blixa'])
|
||||||
|
|
||||||
|
@ -206,14 +207,12 @@ def test_follow(monkeypatch, capsys):
|
||||||
|
|
||||||
|
|
||||||
def test_follow_not_found(monkeypatch, capsys):
|
def test_follow_not_found(monkeypatch, capsys):
|
||||||
def mock_get(url, params, headers):
|
req = Request('GET', 'https://habunek.com/api/v1/accounts/search',
|
||||||
assert url == 'https://habunek.com/api/v1/accounts/search'
|
params={'q': 'blixa'}, headers={'Authorization': 'Bearer xxx'})
|
||||||
assert params == {'q': 'blixa'}
|
res = MockResponse()
|
||||||
assert headers == {'Authorization': 'Bearer xxx'}
|
|
||||||
|
|
||||||
return MockResponse([])
|
expectations = Expectations([req], [res])
|
||||||
|
expectations.patch(monkeypatch)
|
||||||
monkeypatch.setattr(requests, 'get', mock_get)
|
|
||||||
|
|
||||||
with pytest.raises(ConsoleError) as ex:
|
with pytest.raises(ConsoleError) as ex:
|
||||||
console.run_command(app, user, 'follow', ['blixa'])
|
console.run_command(app, user, 'follow', ['blixa'])
|
||||||
|
@ -221,25 +220,20 @@ def test_follow_not_found(monkeypatch, capsys):
|
||||||
|
|
||||||
|
|
||||||
def test_unfollow(monkeypatch, capsys):
|
def test_unfollow(monkeypatch, capsys):
|
||||||
def mock_get(url, params, headers):
|
req1 = Request('GET', 'https://habunek.com/api/v1/accounts/search',
|
||||||
assert url == 'https://habunek.com/api/v1/accounts/search'
|
params={'q': 'blixa'},
|
||||||
assert params == {'q': 'blixa'}
|
headers={'Authorization': 'Bearer xxx'})
|
||||||
assert headers == {'Authorization': 'Bearer xxx'}
|
res1 = MockResponse([
|
||||||
|
{'id': 123, 'acct': 'blixa@other.acc'},
|
||||||
|
{'id': 321, 'acct': 'blixa'},
|
||||||
|
])
|
||||||
|
|
||||||
return MockResponse([
|
req2 = Request('POST', 'https://habunek.com/api/v1/accounts/321/unfollow',
|
||||||
{'id': 123, 'acct': 'blixa@other.acc'},
|
headers={'Authorization': 'Bearer xxx'})
|
||||||
{'id': 321, 'acct': 'blixa'},
|
res2 = MockResponse()
|
||||||
])
|
|
||||||
|
|
||||||
def mock_prepare(request):
|
expectations = Expectations([req1, req2], [res1, res2])
|
||||||
assert request.url == 'https://habunek.com/api/v1/accounts/321/unfollow'
|
expectations.patch(monkeypatch)
|
||||||
|
|
||||||
def mock_send(*args, **kwargs):
|
|
||||||
return MockResponse()
|
|
||||||
|
|
||||||
monkeypatch.setattr(requests.Request, 'prepare', mock_prepare)
|
|
||||||
monkeypatch.setattr(requests.Session, 'send', mock_send)
|
|
||||||
monkeypatch.setattr(requests, 'get', mock_get)
|
|
||||||
|
|
||||||
console.run_command(app, user, 'unfollow', ['blixa'])
|
console.run_command(app, user, 'unfollow', ['blixa'])
|
||||||
|
|
||||||
|
@ -248,14 +242,12 @@ def test_unfollow(monkeypatch, capsys):
|
||||||
|
|
||||||
|
|
||||||
def test_unfollow_not_found(monkeypatch, capsys):
|
def test_unfollow_not_found(monkeypatch, capsys):
|
||||||
def mock_get(url, params, headers):
|
req = Request('GET', 'https://habunek.com/api/v1/accounts/search',
|
||||||
assert url == 'https://habunek.com/api/v1/accounts/search'
|
params={'q': 'blixa'}, headers={'Authorization': 'Bearer xxx'})
|
||||||
assert params == {'q': 'blixa'}
|
res = MockResponse([])
|
||||||
assert headers == {'Authorization': 'Bearer xxx'}
|
|
||||||
|
|
||||||
return MockResponse([])
|
expectations = Expectations([req], [res])
|
||||||
|
expectations.patch(monkeypatch)
|
||||||
monkeypatch.setattr(requests, 'get', mock_get)
|
|
||||||
|
|
||||||
with pytest.raises(ConsoleError) as ex:
|
with pytest.raises(ConsoleError) as ex:
|
||||||
console.run_command(app, user, 'unfollow', ['blixa'])
|
console.run_command(app, user, 'unfollow', ['blixa'])
|
||||||
|
@ -263,30 +255,29 @@ def test_unfollow_not_found(monkeypatch, capsys):
|
||||||
|
|
||||||
|
|
||||||
def test_whoami(monkeypatch, capsys):
|
def test_whoami(monkeypatch, capsys):
|
||||||
def mock_get(url, params, headers=None):
|
req = Request('GET', 'https://habunek.com/api/v1/accounts/verify_credentials',
|
||||||
assert url == 'https://habunek.com/api/v1/accounts/verify_credentials'
|
headers={'Authorization': 'Bearer xxx'})
|
||||||
assert headers == {'Authorization': 'Bearer xxx'}
|
|
||||||
assert params is None
|
|
||||||
|
|
||||||
return MockResponse({
|
res = MockResponse({
|
||||||
'acct': 'ihabunek',
|
'acct': 'ihabunek',
|
||||||
'avatar': 'https://files.mastodon.social/accounts/avatars/000/046/103/original/6a1304e135cac514.jpg?1491312434',
|
'avatar': 'https://files.mastodon.social/accounts/avatars/000/046/103/original/6a1304e135cac514.jpg?1491312434',
|
||||||
'avatar_static': 'https://files.mastodon.social/accounts/avatars/000/046/103/original/6a1304e135cac514.jpg?1491312434',
|
'avatar_static': 'https://files.mastodon.social/accounts/avatars/000/046/103/original/6a1304e135cac514.jpg?1491312434',
|
||||||
'created_at': '2017-04-04T13:23:09.777Z',
|
'created_at': '2017-04-04T13:23:09.777Z',
|
||||||
'display_name': 'Ivan Habunek',
|
'display_name': 'Ivan Habunek',
|
||||||
'followers_count': 5,
|
'followers_count': 5,
|
||||||
'following_count': 9,
|
'following_count': 9,
|
||||||
'header': '/headers/original/missing.png',
|
'header': '/headers/original/missing.png',
|
||||||
'header_static': '/headers/original/missing.png',
|
'header_static': '/headers/original/missing.png',
|
||||||
'id': 46103,
|
'id': 46103,
|
||||||
'locked': False,
|
'locked': False,
|
||||||
'note': 'A developer.',
|
'note': 'A developer.',
|
||||||
'statuses_count': 19,
|
'statuses_count': 19,
|
||||||
'url': 'https://mastodon.social/@ihabunek',
|
'url': 'https://mastodon.social/@ihabunek',
|
||||||
'username': 'ihabunek'
|
'username': 'ihabunek'
|
||||||
})
|
})
|
||||||
|
|
||||||
monkeypatch.setattr(requests, 'get', mock_get)
|
expectations = Expectations([req], [res])
|
||||||
|
expectations.patch(monkeypatch)
|
||||||
|
|
||||||
console.run_command(app, user, 'whoami', [])
|
console.run_command(app, user, 'whoami', [])
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,31 @@
|
||||||
|
import requests
|
||||||
|
|
||||||
|
|
||||||
|
class Expectations():
|
||||||
|
"""Helper for mocking http requests"""
|
||||||
|
def __init__(self, requests=[], responses=[]):
|
||||||
|
self.requests = requests
|
||||||
|
self.responses = responses
|
||||||
|
|
||||||
|
def mock_prepare(self, request):
|
||||||
|
expected = self.requests.pop(0)
|
||||||
|
assert request.method == expected.method
|
||||||
|
assert request.url == expected.url
|
||||||
|
assert request.data == expected.data
|
||||||
|
assert request.headers == expected.headers
|
||||||
|
assert request.params == expected.params
|
||||||
|
|
||||||
|
def mock_send(self, *args, **kwargs):
|
||||||
|
return self.responses.pop(0)
|
||||||
|
|
||||||
|
def add(self, req, res):
|
||||||
|
self.requests.append(req)
|
||||||
|
self.responses.append(res)
|
||||||
|
|
||||||
|
def patch(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(requests.Session, 'prepare_request', self.mock_prepare)
|
||||||
|
monkeypatch.setattr(requests.Session, 'send', self.mock_send)
|
||||||
|
|
||||||
|
|
||||||
class MockResponse:
|
class MockResponse:
|
||||||
def __init__(self, response_data={}, ok=True, is_redirect=False):
|
def __init__(self, response_data={}, ok=True, is_redirect=False):
|
||||||
|
|
41
toot/api.py
41
toot/api.py
|
@ -1,13 +1,11 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
import re
|
import re
|
||||||
import requests
|
|
||||||
|
|
||||||
from urllib.parse import urlparse, urlencode
|
from urllib.parse import urlparse, urlencode
|
||||||
|
|
||||||
from toot import http, CLIENT_NAME, CLIENT_WEBSITE
|
from toot import http, CLIENT_NAME, CLIENT_WEBSITE
|
||||||
from toot.exceptions import ApiError, AuthenticationError, NotFoundError
|
from toot.exceptions import AuthenticationError
|
||||||
from toot.utils import domain_exists
|
|
||||||
|
|
||||||
SCOPES = 'read write follow'
|
SCOPES = 'read write follow'
|
||||||
|
|
||||||
|
@ -18,31 +16,32 @@ def _account_action(app, user, account, action):
|
||||||
return http.post(app, user, url).json()
|
return http.post(app, user, url).json()
|
||||||
|
|
||||||
|
|
||||||
def create_app(instance):
|
def create_app(domain):
|
||||||
base_url = 'https://' + instance
|
url = 'http://{}/api/v1/apps'.format(domain)
|
||||||
url = base_url + '/api/v1/apps'
|
|
||||||
|
|
||||||
response = requests.post(url, {
|
data = {
|
||||||
'client_name': CLIENT_NAME,
|
'client_name': CLIENT_NAME,
|
||||||
'redirect_uris': 'urn:ietf:wg:oauth:2.0:oob',
|
'redirect_uris': 'urn:ietf:wg:oauth:2.0:oob',
|
||||||
'scopes': SCOPES,
|
'scopes': SCOPES,
|
||||||
'website': CLIENT_WEBSITE,
|
'website': CLIENT_WEBSITE,
|
||||||
})
|
}
|
||||||
|
|
||||||
return http.process_response(response).json()
|
return http.anon_post(url, data).json()
|
||||||
|
|
||||||
|
|
||||||
def login(app, username, password):
|
def login(app, username, password):
|
||||||
url = app.base_url + '/oauth/token'
|
url = app.base_url + '/oauth/token'
|
||||||
|
|
||||||
response = requests.post(url, {
|
data = {
|
||||||
'grant_type': 'password',
|
'grant_type': 'password',
|
||||||
'client_id': app.client_id,
|
'client_id': app.client_id,
|
||||||
'client_secret': app.client_secret,
|
'client_secret': app.client_secret,
|
||||||
'username': username,
|
'username': username,
|
||||||
'password': password,
|
'password': password,
|
||||||
'scope': SCOPES,
|
'scope': SCOPES,
|
||||||
}, allow_redirects=False)
|
}
|
||||||
|
|
||||||
|
response = http.anon_post(url, data, allow_redirects=False)
|
||||||
|
|
||||||
# If auth fails, it redirects to the login page
|
# If auth fails, it redirects to the login page
|
||||||
if response.is_redirect:
|
if response.is_redirect:
|
||||||
|
@ -64,13 +63,15 @@ def get_browser_login_url(app):
|
||||||
def request_access_token(app, authorization_code):
|
def request_access_token(app, authorization_code):
|
||||||
url = app.base_url + '/oauth/token'
|
url = app.base_url + '/oauth/token'
|
||||||
|
|
||||||
response = requests.post(url, {
|
data = {
|
||||||
'grant_type': 'authorization_code',
|
'grant_type': 'authorization_code',
|
||||||
'client_id': app.client_id,
|
'client_id': app.client_id,
|
||||||
'client_secret': app.client_secret,
|
'client_secret': app.client_secret,
|
||||||
'code': authorization_code,
|
'code': authorization_code,
|
||||||
'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
|
'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
|
||||||
}, allow_redirects=False)
|
}
|
||||||
|
|
||||||
|
response = http.anon_post(url, data, allow_redirects=False)
|
||||||
|
|
||||||
return http.process_response(response).json()
|
return http.process_response(response).json()
|
||||||
|
|
||||||
|
@ -155,16 +156,6 @@ def get_notifications(app, user):
|
||||||
return http.get(app, user, '/api/v1/notifications').json()
|
return http.get(app, user, '/api/v1/notifications').json()
|
||||||
|
|
||||||
|
|
||||||
def get_instance(app, user, domain):
|
def get_instance(domain):
|
||||||
if not domain_exists(domain):
|
|
||||||
raise ApiError("Domain {} not found".format(domain))
|
|
||||||
|
|
||||||
url = "http://{}/api/v1/instance".format(domain)
|
url = "http://{}/api/v1/instance".format(domain)
|
||||||
|
return http.anon_get(url).json()
|
||||||
try:
|
|
||||||
return http.unauthorized_get(url).json()
|
|
||||||
except NotFoundError:
|
|
||||||
raise ApiError(
|
|
||||||
"Instance info not found at {}.\n"
|
|
||||||
"The given domain probably does not host a Mastodon instance.".format(url)
|
|
||||||
)
|
|
||||||
|
|
19
toot/auth.py
19
toot/auth.py
|
@ -10,17 +10,22 @@ from toot.exceptions import ApiError, ConsoleError
|
||||||
from toot.output import print_out
|
from toot.output import print_out
|
||||||
|
|
||||||
|
|
||||||
def register_app(instance):
|
def register_app(domain):
|
||||||
print_out("Registering application with <green>{}</green>".format(instance))
|
print_out("Looking up instance info...")
|
||||||
|
instance = api.get_instance(domain)
|
||||||
|
|
||||||
|
print_out("Found instance <blue>{}</blue> running Mastodon version <yellow>{}</yellow>".format(
|
||||||
|
instance['title'], instance['version']))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
response = api.create_app(instance)
|
print_out("Registering application...")
|
||||||
except Exception:
|
response = api.create_app(domain)
|
||||||
raise ConsoleError("Registration failed. Did you enter a valid instance?")
|
except ApiError:
|
||||||
|
raise ConsoleError("Registration failed.")
|
||||||
|
|
||||||
base_url = 'https://' + instance
|
base_url = 'https://' + domain
|
||||||
|
|
||||||
app = App(instance, base_url, response['client_id'], response['client_secret'])
|
app = App(domain, base_url, response['client_id'], response['client_secret'])
|
||||||
path = config.save_app(app)
|
path = config.save_app(app)
|
||||||
print_out("Application tokens saved to: <green>{}</green>\n".format(path))
|
print_out("Application tokens saved to: <green>{}</green>\n".format(path))
|
||||||
|
|
||||||
|
|
|
@ -8,8 +8,9 @@ from textwrap import TextWrapper
|
||||||
|
|
||||||
from toot import api, config
|
from toot import api, config
|
||||||
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
|
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
|
||||||
from toot.exceptions import ConsoleError
|
from toot.exceptions import ConsoleError, NotFoundError
|
||||||
from toot.output import print_out, print_instance, print_account, print_search_results
|
from toot.output import print_out, print_instance, print_account, print_search_results
|
||||||
|
from toot.utils import assert_domain_exists
|
||||||
|
|
||||||
|
|
||||||
def _print_timeline(item):
|
def _print_timeline(item):
|
||||||
|
@ -207,5 +208,13 @@ def instance(app, user, args):
|
||||||
if not name:
|
if not name:
|
||||||
raise ConsoleError("Please specify instance name.")
|
raise ConsoleError("Please specify instance name.")
|
||||||
|
|
||||||
instance = api.get_instance(app, user, name)
|
assert_domain_exists(name)
|
||||||
print_instance(instance)
|
|
||||||
|
try:
|
||||||
|
instance = api.get_instance(name)
|
||||||
|
print_instance(instance)
|
||||||
|
except NotFoundError:
|
||||||
|
raise ConsoleError(
|
||||||
|
"Instance not found at {}.\n"
|
||||||
|
"The given domain probably does not host a Mastodon instance.".format(name)
|
||||||
|
)
|
||||||
|
|
71
toot/http.py
71
toot/http.py
|
@ -1,24 +1,37 @@
|
||||||
import requests
|
|
||||||
|
|
||||||
from toot.logging import log_request, log_response
|
|
||||||
from requests import Request, Session
|
from requests import Request, Session
|
||||||
from toot.exceptions import NotFoundError, ApiError
|
from toot.exceptions import NotFoundError, ApiError
|
||||||
|
from toot.logging import log_request, log_response
|
||||||
|
|
||||||
|
|
||||||
|
def send_request(request, allow_redirects=True):
|
||||||
|
log_request(request)
|
||||||
|
|
||||||
|
with Session() as session:
|
||||||
|
prepared = session.prepare_request(request)
|
||||||
|
response = session.send(prepared, allow_redirects=allow_redirects)
|
||||||
|
|
||||||
|
log_response(response)
|
||||||
|
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
def _get_error_message(response):
|
||||||
|
"""Attempt to extract an error message from response body"""
|
||||||
|
try:
|
||||||
|
data = response.json()
|
||||||
|
if "error_description" in data:
|
||||||
|
return data['error_description']
|
||||||
|
if "error" in data:
|
||||||
|
return data['error']
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return "Unknown error"
|
||||||
|
|
||||||
|
|
||||||
def process_response(response):
|
def process_response(response):
|
||||||
log_response(response)
|
|
||||||
|
|
||||||
if not response.ok:
|
if not response.ok:
|
||||||
error = "Unknown error"
|
error = _get_error_message(response)
|
||||||
|
|
||||||
try:
|
|
||||||
data = response.json()
|
|
||||||
if "error_description" in data:
|
|
||||||
error = data['error_description']
|
|
||||||
elif "error" in data:
|
|
||||||
error = data['error']
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if response.status_code == 404:
|
if response.status_code == 404:
|
||||||
raise NotFoundError(error)
|
raise NotFoundError(error)
|
||||||
|
@ -32,31 +45,31 @@ def get(app, user, url, params=None):
|
||||||
url = app.base_url + url
|
url = app.base_url + url
|
||||||
headers = {"Authorization": "Bearer " + user.access_token}
|
headers = {"Authorization": "Bearer " + user.access_token}
|
||||||
|
|
||||||
log_request(Request('GET', url, headers, params=params))
|
request = Request('GET', url, headers, params=params)
|
||||||
|
response = send_request(request)
|
||||||
response = requests.get(url, params, headers=headers)
|
|
||||||
|
|
||||||
return process_response(response)
|
return process_response(response)
|
||||||
|
|
||||||
|
|
||||||
def unauthorized_get(url, params=None):
|
def anon_get(url, params=None):
|
||||||
log_request(Request('GET', url, None, params=params))
|
request = Request('GET', url, None, params=params)
|
||||||
|
response = send_request(request)
|
||||||
response = requests.get(url, params)
|
|
||||||
|
|
||||||
return process_response(response)
|
return process_response(response)
|
||||||
|
|
||||||
|
|
||||||
def post(app, user, url, data=None, files=None):
|
def post(app, user, url, data=None, files=None, allow_redirects=True):
|
||||||
url = app.base_url + url
|
url = app.base_url + url
|
||||||
headers = {"Authorization": "Bearer " + user.access_token}
|
headers = {"Authorization": "Bearer " + user.access_token}
|
||||||
|
|
||||||
session = Session()
|
|
||||||
request = Request('POST', url, headers, files, data)
|
request = Request('POST', url, headers, files, data)
|
||||||
prepared_request = request.prepare()
|
response = send_request(request, allow_redirects)
|
||||||
|
|
||||||
log_request(request)
|
return process_response(response)
|
||||||
|
|
||||||
response = session.send(prepared_request)
|
|
||||||
|
def anon_post(url, data=None, files=None, allow_redirects=True):
|
||||||
|
request = Request('POST', url, {}, files, data)
|
||||||
|
response = send_request(request, allow_redirects)
|
||||||
|
|
||||||
return process_response(response)
|
return process_response(response)
|
||||||
|
|
|
@ -5,6 +5,8 @@ import socket
|
||||||
|
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
|
|
||||||
|
from toot.exceptions import ConsoleError
|
||||||
|
|
||||||
|
|
||||||
def get_text(html):
|
def get_text(html):
|
||||||
"""Converts html to text, strips all tags."""
|
"""Converts html to text, strips all tags."""
|
||||||
|
@ -50,3 +52,8 @@ def domain_exists(name):
|
||||||
return True
|
return True
|
||||||
except OSError:
|
except OSError:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def assert_domain_exists(domain):
|
||||||
|
if not domain_exists(domain):
|
||||||
|
raise ConsoleError("Domain {} not found".format(domain))
|
||||||
|
|
Loading…
Reference in a new issue