commit
314a0aefbe
7 changed files with 139 additions and 28 deletions
BIN
tests/assets/small.webm
Normal file
BIN
tests/assets/small.webm
Normal file
Binary file not shown.
|
@ -292,7 +292,6 @@ def test_reblogged_by(mock_get, monkeypatch, capsys):
|
|||
def test_upload(mock_post, capsys):
|
||||
mock_post.return_value = MockResponse({
|
||||
'id': 123,
|
||||
'url': 'https://bigfish.software/123/456',
|
||||
'preview_url': 'https://bigfish.software/789/012',
|
||||
'url': 'https://bigfish.software/345/678',
|
||||
'type': 'image',
|
||||
|
@ -300,10 +299,10 @@ def test_upload(mock_post, capsys):
|
|||
|
||||
console.run_command(app, user, 'upload', [__file__])
|
||||
|
||||
mock_post.call_count == 1
|
||||
assert mock_post.call_count == 1
|
||||
|
||||
args, kwargs = http.post.call_args
|
||||
assert args == (app, user, '/api/v1/media')
|
||||
assert args == (app, user, '/api/v2/media')
|
||||
assert isinstance(kwargs['files']['file'], io.BufferedReader)
|
||||
|
||||
out, err = capsys.readouterr()
|
||||
|
|
|
@ -207,6 +207,39 @@ def test_post_language(app, user, run):
|
|||
assert status["language"] == "zh"
|
||||
|
||||
|
||||
def test_media_thumbnail(app, user, run):
|
||||
assets_dir = path.realpath(path.join(path.dirname(__file__), "assets"))
|
||||
|
||||
video_path = path.join(assets_dir, "small.webm")
|
||||
thumbnail_path = path.join(assets_dir, "test1.png")
|
||||
|
||||
out = run(
|
||||
"post",
|
||||
"--media", video_path,
|
||||
"--thumbnail", thumbnail_path,
|
||||
"--description", "foo",
|
||||
"some text"
|
||||
)
|
||||
|
||||
status_id = _posted_status_id(out)
|
||||
status = api.fetch_status(app, user, status_id)
|
||||
[media] = status["media_attachments"]
|
||||
|
||||
assert media["description"] == "foo"
|
||||
assert media["type"] == "video"
|
||||
assert media["url"].endswith(".mp4")
|
||||
assert media["preview_url"].endswith(".png")
|
||||
|
||||
# Video properties
|
||||
assert media["meta"]["original"]["duration"] == 5.58
|
||||
assert media["meta"]["original"]["height"] == 320
|
||||
assert media["meta"]["original"]["width"] == 560
|
||||
|
||||
# Thumbnail properties
|
||||
assert media["meta"]["small"]["height"] == 50
|
||||
assert media["meta"]["small"]["width"] == 50
|
||||
|
||||
|
||||
def test_media_attachments(app, user, run):
|
||||
assets_dir = path.realpath(path.join(path.dirname(__file__), "assets"))
|
||||
|
||||
|
|
71
toot/api.py
71
toot/api.py
|
@ -1,12 +1,14 @@
|
|||
import mimetypes
|
||||
from os import path
|
||||
import re
|
||||
import uuid
|
||||
|
||||
from typing import List
|
||||
from typing import BinaryIO, List, Optional
|
||||
from urllib.parse import urlparse, urlencode, quote
|
||||
|
||||
from toot import http, CLIENT_NAME, CLIENT_WEBSITE
|
||||
from toot.exceptions import AuthenticationError
|
||||
from toot.utils import str_bool, str_bool_nullable
|
||||
from toot import App, User, http, CLIENT_NAME, CLIENT_WEBSITE
|
||||
from toot.exceptions import AuthenticationError, ConsoleError
|
||||
from toot.utils import drop_empty_values, str_bool, str_bool_nullable
|
||||
|
||||
SCOPES = 'read write follow'
|
||||
|
||||
|
@ -85,10 +87,9 @@ def update_account(
|
|||
Update account credentials
|
||||
https://docs.joinmastodon.org/methods/accounts/#update_credentials
|
||||
"""
|
||||
files = {"avatar": avatar, "header": header}
|
||||
files = {k: v for k, v in files.items() if v is not None}
|
||||
files = drop_empty_values({"avatar": avatar, "header": header})
|
||||
|
||||
data = {
|
||||
data = drop_empty_values({
|
||||
"bot": str_bool_nullable(bot),
|
||||
"discoverable": str_bool_nullable(discoverable),
|
||||
"display_name": display_name,
|
||||
|
@ -97,8 +98,7 @@ def update_account(
|
|||
"source[language]": language,
|
||||
"source[privacy]": privacy,
|
||||
"source[sensitive]": str_bool_nullable(sensitive),
|
||||
}
|
||||
data = {k: v for k, v in data.items() if v is not None}
|
||||
})
|
||||
|
||||
return http.patch(app, user, "/api/v1/accounts/update_credentials", files=files, data=data)
|
||||
|
||||
|
@ -182,7 +182,9 @@ def post_status(
|
|||
# if the request is retried.
|
||||
headers = {"Idempotency-Key": uuid.uuid4().hex}
|
||||
|
||||
json = {
|
||||
# Strip keys for which value is None
|
||||
# Sending null values doesn't bother Mastodon, but it breaks Pleroma
|
||||
json = drop_empty_values({
|
||||
'status': status,
|
||||
'media_ids': media_ids,
|
||||
'visibility': visibility,
|
||||
|
@ -192,11 +194,7 @@ def post_status(
|
|||
'scheduled_at': scheduled_at,
|
||||
'content_type': content_type,
|
||||
'spoiler_text': spoiler_text
|
||||
}
|
||||
|
||||
# Strip keys for which value is None
|
||||
# Sending null values doesn't bother Mastodon, but it breaks Pleroma
|
||||
json = {k: v for k, v in json.items() if v is not None}
|
||||
})
|
||||
|
||||
return http.post(app, user, '/api/v1/statuses', json=json, headers=headers).json()
|
||||
|
||||
|
@ -351,11 +349,44 @@ def anon_tag_timeline_generator(instance, hashtag, local=False, limit=20):
|
|||
return _anon_timeline_generator(instance, path, params)
|
||||
|
||||
|
||||
def upload_media(app, user, file, description=None):
|
||||
return http.post(app, user, '/api/v1/media',
|
||||
data={'description': description},
|
||||
files={'file': file}
|
||||
).json()
|
||||
def get_media(app: App, user: User, id: str):
|
||||
return http.get(app, user, f"/api/v1/media/{id}").json()
|
||||
|
||||
|
||||
def upload_media(
|
||||
app: App,
|
||||
user: User,
|
||||
media: BinaryIO,
|
||||
description: Optional[str] = None,
|
||||
thumbnail: Optional[BinaryIO] = None,
|
||||
):
|
||||
data = drop_empty_values({"description": description})
|
||||
|
||||
# NB: Documentation says that "file" should provide a mime-type which we
|
||||
# don't do currently, but it works.
|
||||
files = drop_empty_values({
|
||||
"file": media,
|
||||
"thumbnail": _add_mime_type(thumbnail)
|
||||
})
|
||||
|
||||
return http.post(app, user, "/api/v2/media", data=data, files=files).json()
|
||||
|
||||
|
||||
def _add_mime_type(file):
|
||||
if file is None:
|
||||
return None
|
||||
|
||||
# TODO: mimetypes uses the file extension to guess the mime type which is
|
||||
# not always good enough (e.g. files without extension). python-magic could
|
||||
# be used instead but it requires adding it as a dependency.
|
||||
mime_type = mimetypes.guess_type(file.name)
|
||||
|
||||
if not mime_type:
|
||||
raise ConsoleError(f"Unable guess mime type of '{file.name}'. "
|
||||
"Ensure the file has the desired extension.")
|
||||
|
||||
filename = path.basename(file.name)
|
||||
return (filename, file, mime_type)
|
||||
|
||||
|
||||
def search(app, user, query, resolve=False, type=None):
|
||||
|
|
|
@ -2,6 +2,7 @@ import sys
|
|||
import platform
|
||||
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from time import sleep, time
|
||||
from toot import api, config, __version__
|
||||
from toot.auth import login_interactive, login_browser_interactive, create_app_interactive
|
||||
from toot.exceptions import ApiError, ConsoleError
|
||||
|
@ -142,19 +143,54 @@ def _get_scheduled_at(scheduled_at, scheduled_in):
|
|||
|
||||
|
||||
def _upload_media(app, user, args):
|
||||
# Match media to corresponding description and upload
|
||||
# Match media to corresponding description and thumbnail
|
||||
media = args.media or []
|
||||
descriptions = args.description or []
|
||||
thumbnails = args.thumbnail or []
|
||||
uploaded_media = []
|
||||
|
||||
for idx, file in enumerate(media):
|
||||
description = descriptions[idx].strip() if idx < len(descriptions) else None
|
||||
result = _do_upload(app, user, file, description)
|
||||
thumbnail = thumbnails[idx] if idx < len(thumbnails) else None
|
||||
result = _do_upload(app, user, file, description, thumbnail)
|
||||
uploaded_media.append(result)
|
||||
|
||||
_wait_until_all_processed(app, user, uploaded_media)
|
||||
|
||||
return [m["id"] for m in uploaded_media]
|
||||
|
||||
|
||||
def _wait_until_all_processed(app, user, uploaded_media):
|
||||
"""
|
||||
Media is uploaded asynchronously, and cannot be attached until the server
|
||||
has finished processing it. This function waits for that to happen.
|
||||
|
||||
Once media is processed, it will have the URL populated.
|
||||
"""
|
||||
if all(m["url"] for m in uploaded_media):
|
||||
return
|
||||
|
||||
# Timeout after waiting 1 minute
|
||||
start_time = time()
|
||||
timeout = 60
|
||||
|
||||
print_out("<dim>Waiting for media to finish processing...</dim>")
|
||||
for media in uploaded_media:
|
||||
_wait_until_processed(app, user, media, start_time, timeout)
|
||||
|
||||
|
||||
def _wait_until_processed(app, user, media, start_time, timeout):
|
||||
if media["url"]:
|
||||
return
|
||||
|
||||
media = api.get_media(app, user, media["id"])
|
||||
while not media["url"]:
|
||||
sleep(1)
|
||||
if time() > start_time + timeout:
|
||||
raise ConsoleError(f"Media not processed by server after {timeout} seconds. Aborting.")
|
||||
media = api.get_media(app, user, media["id"])
|
||||
|
||||
|
||||
def delete(app, user, args):
|
||||
api.delete_status(app, user, args.status_id)
|
||||
print_out("<green>✓ Status deleted</green>")
|
||||
|
@ -297,7 +333,7 @@ def activate(app, user, args):
|
|||
|
||||
|
||||
def upload(app, user, args):
|
||||
response = _do_upload(app, user, args.file, args.description)
|
||||
response = _do_upload(app, user, args.file, args.description, None)
|
||||
|
||||
msg = "Successfully uploaded media ID <yellow>{}</yellow>, type '<yellow>{}</yellow>'"
|
||||
|
||||
|
@ -312,9 +348,9 @@ def search(app, user, args):
|
|||
print_search_results(response)
|
||||
|
||||
|
||||
def _do_upload(app, user, file, description):
|
||||
def _do_upload(app, user, file, description, thumbnail):
|
||||
print_out("Uploading media: <green>{}</green>".format(file.name))
|
||||
return api.upload_media(app, user, file, description=description)
|
||||
return api.upload_media(app, user, file, description=description, thumbnail=thumbnail)
|
||||
|
||||
|
||||
def _find_account(app, user, account_name):
|
||||
|
|
|
@ -482,6 +482,12 @@ POST_COMMANDS = [
|
|||
"help": "plain-text description of the media for accessibility "
|
||||
"purposes, one per attached media"
|
||||
}),
|
||||
(["--thumbnail"], {
|
||||
"action": "append",
|
||||
"type": FileType("rb"),
|
||||
"help": "path to an image file to serve as media thumbnail, "
|
||||
"one per attached media"
|
||||
}),
|
||||
visibility_arg,
|
||||
(["-s", "--sensitive"], {
|
||||
"action": 'store_true',
|
||||
|
|
|
@ -7,6 +7,7 @@ import unicodedata
|
|||
import warnings
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
from typing import Dict
|
||||
|
||||
from toot.exceptions import ConsoleError
|
||||
|
||||
|
@ -154,3 +155,8 @@ def _use_existing_tmp_file(tmp_path) -> bool:
|
|||
return char == "o"
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def drop_empty_values(data: Dict) -> Dict:
|
||||
"""Remove keys whose values are null"""
|
||||
return {k: v for k, v in data.items() if v is not None}
|
||||
|
|
Loading…
Reference in a new issue