240 lines
5.8 KiB
Python
240 lines
5.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import re
|
|
import requests
|
|
|
|
from requests import Request, Session
|
|
from urllib.parse import urlparse, urlencode
|
|
|
|
from toot import CLIENT_NAME, CLIENT_WEBSITE
|
|
from toot.utils import domain_exists
|
|
from toot.logging import log_request, log_response
|
|
|
|
# Space-separated OAuth scopes sent when registering the app (create_app)
# and when requesting a token with the password grant (login).
SCOPES = 'read write follow'
|
|
|
|
|
|
class ApiError(Exception):
    """Base class for the errors raised by this module."""
|
|
|
|
|
|
class NotFoundError(ApiError):
    """Raised when a response comes back with HTTP status 404."""
|
|
|
|
|
|
class AuthenticationError(ApiError):
    """Raised by login() when the token request is answered with a redirect."""
|
|
|
|
|
|
def _process_response(response):
    """Log *response* and return it, raising if it is not a success.

    For a non-OK response the error message is taken from the JSON body's
    ``error_description`` key, falling back to ``error``, and finally to
    ``"Unknown error"`` when neither is available or the body cannot be read.

    Raises:
        NotFoundError: if the status code is 404.
        ApiError: for any other non-OK response.
    """
    log_response(response)

    if response.ok:
        return response

    message = "Unknown error"

    try:
        payload = response.json()
        # Prefer the more specific description when both keys are present.
        for key in ("error_description", "error"):
            if key in payload:
                message = payload[key]
                break
    except Exception:
        # Best effort only: an unreadable body keeps the default message.
        pass

    if response.status_code == 404:
        raise NotFoundError(message)

    raise ApiError(message)
|
|
|
|
|
|
def _get(app, user, url, params=None):
    """Send an authenticated GET request and return the checked response.

    Args:
        app: object whose ``base_url`` is prepended to *url*.
        user: object whose ``access_token`` is sent as a Bearer token.
        url: path appended to ``app.base_url``.
        params: optional query parameters.

    Returns:
        The response, after it has passed through _process_response (which
        raises ApiError / NotFoundError for non-OK responses).
    """
    full_url = app.base_url + url
    auth_headers = {"Authorization": "Bearer " + user.access_token}

    # This Request object is built only so the outgoing call can be logged.
    log_request(Request('GET', full_url, headers=auth_headers, params=params))

    response = requests.get(full_url, params=params, headers=auth_headers)
    return _process_response(response)
|
|
|
|
|
|
def _unauthorized_get(url, params=None):
    """Send a GET request without an Authorization header.

    Args:
        url: full URL to fetch (no base URL is prepended).
        params: optional query parameters.

    Returns:
        The response, after it has passed through _process_response (which
        raises ApiError / NotFoundError for non-OK responses).
    """
    # This Request object is built only so the outgoing call can be logged.
    loggable = Request('GET', url, headers=None, params=params)
    log_request(loggable)

    return _process_response(requests.get(url, params=params))
|
|
|
|
|
|
def _post(app, user, url, data=None, files=None):
    """Send an authenticated POST request and return the checked response.

    Args:
        app: object whose ``base_url`` is prepended to *url*.
        user: object whose ``access_token`` is sent as a Bearer token.
        url: path appended to ``app.base_url``.
        data: optional form data for the request body.
        files: optional files to upload with the request.

    Returns:
        The response, after it has passed through _process_response (which
        raises ApiError / NotFoundError for non-OK responses).
    """
    url = app.base_url + url
    headers = {"Authorization": "Bearer " + user.access_token}

    # Positional order of Request is (method, url, headers, files, data).
    request = Request('POST', url, headers, files, data)
    prepared_request = request.prepare()

    log_request(request)

    # The session owns a connection pool; the context manager closes it once
    # the request is done instead of leaving it open until garbage collection.
    # The response body is already loaded at that point because the request
    # is not streamed.
    with Session() as session:
        response = session.send(prepared_request)

    return _process_response(response)
|
|
|
|
|
|
def _account_action(app, user, account, action):
    """POST to ``/api/v1/accounts/<account>/<action>`` and return the JSON body.

    Shared implementation of follow, unfollow, mute, unmute, block and unblock.
    """
    path = '/api/v1/accounts/{}/{}'.format(account, action)
    response = _post(app, user, path)
    return response.json()
|
|
|
|
|
|
def create_app(instance):
    """Register this client as an application on *instance*.

    Args:
        instance: host name of the instance; ``https://`` is prepended.

    Returns:
        The decoded JSON body of the ``/api/v1/apps`` response.

    Raises:
        ApiError: (or NotFoundError) if the instance answers with an error.
    """
    apps_url = 'https://' + instance + '/api/v1/apps'

    registration = {
        'client_name': CLIENT_NAME,
        'redirect_uris': 'urn:ietf:wg:oauth:2.0:oob',
        'scopes': SCOPES,
        'website': CLIENT_WEBSITE,
    }

    response = requests.post(apps_url, data=registration)
    return _process_response(response).json()
|
|
|
|
|
|
def login(app, username, password):
    """Request token data using the OAuth ``password`` grant.

    Args:
        app: object providing ``base_url``, ``client_id`` and ``client_secret``.
        username: the account's login name.
        password: the account's password.

    Returns:
        The decoded JSON body of the ``/oauth/token`` response.

    Raises:
        AuthenticationError: if the server answers with a redirect.
        ApiError: (or NotFoundError) for any other non-OK response.
    """
    token_url = app.base_url + '/oauth/token'

    credentials = {
        'grant_type': 'password',
        'client_id': app.client_id,
        'client_secret': app.client_secret,
        'username': username,
        'password': password,
        'scope': SCOPES,
    }

    # Redirects are not followed so that a failed login can be detected.
    response = requests.post(token_url, data=credentials, allow_redirects=False)

    # If auth fails, it redirects to the login page
    if response.is_redirect:
        raise AuthenticationError()

    return _process_response(response).json()
|
|
|
|
|
|
def get_browser_login_url(app):
    """Returns the URL for manual log in via browser

    Args:
        app: object providing ``base_url`` and ``client_id``.

    Returns:
        The ``/oauth/authorize/`` URL on ``app.base_url`` with the
        authorization-code request parameters in its query string.
    """
    query = urlencode({
        "response_type": "code",
        "redirect_uri": "urn:ietf:wg:oauth:2.0:oob",
        # Shared constant, so the requested scopes cannot drift from the ones
        # used by create_app() and login().
        "scope": SCOPES,
        "client_id": app.client_id,
    })

    return "{}/oauth/authorize/?{}".format(app.base_url, query)
|
|
|
|
|
|
def request_access_token(app, authorization_code):
    """Exchange an authorization code for token data.

    Args:
        app: object providing ``base_url``, ``client_id`` and ``client_secret``.
        authorization_code: the code to send with the
            ``authorization_code`` grant.

    Returns:
        The decoded JSON body of the ``/oauth/token`` response.

    Raises:
        ApiError: (or NotFoundError) if the server answers with an error.
    """
    token_url = app.base_url + '/oauth/token'

    exchange = {
        'grant_type': 'authorization_code',
        'client_id': app.client_id,
        'client_secret': app.client_secret,
        'code': authorization_code,
        'redirect_uri': 'urn:ietf:wg:oauth:2.0:oob',
    }

    response = requests.post(token_url, data=exchange, allow_redirects=False)
    return _process_response(response).json()
|
|
|
|
|
|
def post_status(app, user, status, visibility='public', media_ids=None):
    """Post a new status and return the decoded JSON response.

    Args:
        app: application object passed through to _post.
        user: user object passed through to _post.
        status: text of the status.
        visibility: value sent as the ``visibility`` field.
        media_ids: optional value sent as the ``media_ids[]`` field.
    """
    form = {
        'status': status,
        'media_ids[]': media_ids,
        'visibility': visibility,
    }

    response = _post(app, user, '/api/v1/statuses', data=form)
    return response.json()
|
|
|
|
|
|
def timeline_home(app, user):
    """Fetch ``/api/v1/timelines/home`` and return the decoded JSON body."""
    response = _get(app, user, '/api/v1/timelines/home')
    return response.json()
|
|
|
|
|
|
def _get_next_path(headers):
|
|
links = headers.get('Link', '')
|
|
matches = re.match('<([^>]+)>; rel="next"', links)
|
|
if matches:
|
|
url = matches.group(1)
|
|
return urlparse(url).path
|
|
|
|
|
|
def timeline_generator(app, user):
    """Yield successive pages of the home timeline as decoded JSON.

    Starts at ``/api/v1/timelines/home`` and, after each page has been
    consumed, continues with the path that _get_next_path derives from the
    response headers. Stops when no further path is returned.
    """
    path = '/api/v1/timelines/home'

    while True:
        page = _get(app, user, path)
        yield page.json()

        path = _get_next_path(page.headers)
        if not path:
            return
|
|
|
|
|
|
def upload_media(app, user, file):
    """Upload *file* to ``/api/v1/media`` and return the decoded JSON body.

    The file is sent under the form field name ``file``.
    """
    attachment = {'file': file}
    response = _post(app, user, '/api/v1/media', files=attachment)
    return response.json()
|
|
|
|
|
|
def search(app, user, query, resolve):
    """Query ``/api/v1/search`` and return the decoded JSON body.

    Args:
        app: application object passed through to _get.
        user: user object passed through to _get.
        query: value sent as the ``q`` parameter.
        resolve: value sent as the ``resolve`` parameter.
    """
    criteria = {
        'q': query,
        'resolve': resolve,
    }

    response = _get(app, user, '/api/v1/search', params=criteria)
    return response.json()
|
|
|
|
|
|
def search_accounts(app, user, query):
    """Query ``/api/v1/accounts/search`` and return the decoded JSON body.

    *query* is sent as the ``q`` parameter.
    """
    criteria = {'q': query}
    response = _get(app, user, '/api/v1/accounts/search', params=criteria)
    return response.json()
|
|
|
|
|
|
def follow(app, user, account):
    """Run the ``follow`` account action on *account*; returns the JSON result."""
    return _account_action(app, user, account, action='follow')
|
|
|
|
|
|
def unfollow(app, user, account):
    """Run the ``unfollow`` account action on *account*; returns the JSON result."""
    return _account_action(app, user, account, action='unfollow')
|
|
|
|
|
|
def mute(app, user, account):
    """Run the ``mute`` account action on *account*; returns the JSON result."""
    return _account_action(app, user, account, action='mute')
|
|
|
|
|
|
def unmute(app, user, account):
    """Run the ``unmute`` account action on *account*; returns the JSON result."""
    return _account_action(app, user, account, action='unmute')
|
|
|
|
|
|
def block(app, user, account):
    """Run the ``block`` account action on *account*; returns the JSON result."""
    return _account_action(app, user, account, action='block')
|
|
|
|
|
|
def unblock(app, user, account):
    """Run the ``unblock`` account action on *account*; returns the JSON result."""
    return _account_action(app, user, account, action='unblock')
|
|
|
|
|
|
def verify_credentials(app, user):
    """Fetch ``/api/v1/accounts/verify_credentials`` and return the JSON body."""
    response = _get(app, user, '/api/v1/accounts/verify_credentials')
    return response.json()
|
|
|
|
|
|
def get_notifications(app, user):
    """Fetch ``/api/v1/notifications`` and return the decoded JSON body."""
    response = _get(app, user, '/api/v1/notifications')
    return response.json()
|
|
|
|
|
|
def get_instance(app, user, domain):
    """Fetch the instance information published by *domain*.

    The request is unauthenticated; *app* and *user* are accepted but not
    used by this function.

    Args:
        app: unused.
        user: unused.
        domain: host name to query.

    Returns:
        The decoded JSON body of ``http://<domain>/api/v1/instance``.

    Raises:
        ApiError: if domain_exists() rejects the domain, if the endpoint
            answers 404, or for any other non-OK response.
    """
    if not domain_exists(domain):
        raise ApiError("Domain {} not found".format(domain))

    instance_url = "http://{}/api/v1/instance".format(domain)

    try:
        response = _unauthorized_get(instance_url)
        return response.json()
    except NotFoundError:
        # A 404 here is reported with a more helpful, domain-specific message.
        message = (
            "Instance info not found at {}.\n"
            "The given domain probably does not host a Mastodon instance."
        ).format(instance_url)
        raise ApiError(message)
|