From 86e4d23acb640efea8cbc879ddbeadfa0e04f9c8 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sat, 25 Jan 2020 10:47:30 +0300 Subject: [PATCH] [#1505] Background fetching of incoming activities' `replies` collections. --- config/config.exs | 1 + .../web/activity_pub/transmogrifier.ex | 15 +++++- lib/pleroma/workers/remote_fetcher_worker.ex | 20 ++++++++ test/support/oban_helpers.ex | 4 ++ test/web/activity_pub/transmogrifier_test.exs | 48 +++++++++++++++++++ 5 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 lib/pleroma/workers/remote_fetcher_worker.ex diff --git a/config/config.exs b/config/config.exs index 60642c467..5f72df8a0 100644 --- a/config/config.exs +++ b/config/config.exs @@ -501,6 +501,7 @@ transmogrifier: 20, scheduled_activities: 10, background: 5, + remote_fetcher: 2, attachments_cleanup: 5 ] diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 9e712ab75..d129334c2 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -424,7 +424,13 @@ def handle_incoming( ]) } - ActivityPub.create(params) + with {:ok, created_activity} <- ActivityPub.create(params) do + for reply_id <- replies(object) do + Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{"id" => reply_id}) + end + + {:ok, created_activity} + end else %Activity{} = activity -> {:ok, activity} _e -> :error @@ -946,6 +952,13 @@ defp set_replies(obj, replies_uris) do Map.merge(obj, %{"replies" => replies_collection}) end + def replies(%{"replies" => replies = %{}}) do + replies = with %{} <- replies["first"], do: replies["first"], else: (_ -> replies) + replies["items"] || [] + end + + def replies(_), do: [] + # Prepares the object of an outgoing create activity. def prepare_object(object) do object diff --git a/lib/pleroma/workers/remote_fetcher_worker.ex b/lib/pleroma/workers/remote_fetcher_worker.ex new file mode 100644 index 000000000..60eafe2c1 --- /dev/null +++ b/lib/pleroma/workers/remote_fetcher_worker.ex @@ -0,0 +1,20 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.RemoteFetcherWorker do + alias Pleroma.Object.Fetcher + + use Pleroma.Workers.WorkerHelper, queue: "remote_fetcher" + + @impl Oban.Worker + def perform( + %{ + "op" => "fetch_remote", + "id" => id + }, + _job + ) do + Fetcher.fetch_object_from_id!(id) + end +end diff --git a/test/support/oban_helpers.ex b/test/support/oban_helpers.ex index 72792c064..0e3b654df 100644 --- a/test/support/oban_helpers.ex +++ b/test/support/oban_helpers.ex @@ -9,6 +9,10 @@ defmodule Pleroma.Tests.ObanHelpers do alias Pleroma.Repo + def wipe_all do + Repo.delete_all(Oban.Job) + end + def perform_all do Oban.Job |> Repo.all() diff --git a/test/web/activity_pub/transmogrifier_test.exs b/test/web/activity_pub/transmogrifier_test.exs index 418b8a1ca..0fefb60da 100644 --- a/test/web/activity_pub/transmogrifier_test.exs +++ b/test/web/activity_pub/transmogrifier_test.exs @@ -3,7 +3,9 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do + use Oban.Testing, repo: Pleroma.Repo use Pleroma.DataCase + alias Pleroma.Activity alias Pleroma.Object alias Pleroma.Object.Fetcher @@ -1329,6 +1331,52 @@ test "it accepts Move activities" do end end + describe "handle_incoming:`replies` handling" do + setup do + data = + File.read!("test/fixtures/mastodon-post-activity.json") + |> Poison.decode!() + + items = ["https://shitposter.club/notice/2827873", "https://shitposter.club/notice/7387606"] + collection = %{"items" => items} + %{data: data, items: items, collection: collection} + end + + test "it schedules background fetching of wrapped `replies` collection items", %{ + data: data, + items: items, + collection: collection + } do + replies = %{"first" => collection} + + object = Map.put(data["object"], "replies", replies) + data = Map.put(data, "object", object) + {:ok, _activity} = Transmogrifier.handle_incoming(data) + + for id <- items do + job_args = %{"op" => "fetch_remote", "id" => id} + assert_enqueued(worker: Pleroma.Workers.RemoteFetcherWorker, args: job_args) + end + end + + test "it schedules background fetching of unwrapped `replies` collection items", %{ + data: data, + items: items, + collection: collection + } do + replies = collection + + object = Map.put(data["object"], "replies", replies) + data = Map.put(data, "object", object) + {:ok, _activity} = Transmogrifier.handle_incoming(data) + + for id <- items do + job_args = %{"op" => "fetch_remote", "id" => id} + assert_enqueued(worker: Pleroma.Workers.RemoteFetcherWorker, args: job_args) + end + end + end + describe "prepare outgoing" do test "it inlines private announced objects" do user = insert(:user)