[#1149] Oban jobs implementation for :federator_incoming and :federator_outgoing queues.

This commit is contained in:
Ivan Tashkinov 2019-08-09 20:08:01 +03:00
parent 23d279e03e
commit b7fad8d395
13 changed files with 280 additions and 138 deletions

View file

@ -458,6 +458,13 @@
prune: {:maxage, 60 * 60 * 24 * 7},
queues: job_queues
config :pleroma, :workers,
retries: [
compile_time_default: 1,
federator_incoming: 5,
federator_outgoing: 5
]
config :pleroma, :fetch_initial_posts,
enabled: false,
pages: 5

View file

@ -168,14 +168,7 @@ def create_context(context) do
"""
def maybe_federate(%Activity{local: true} = activity) do
if Pleroma.Config.get!([:instance, :federating]) do
priority =
case activity.data["type"] do
"Delete" -> 10
"Create" -> 1
_ -> 5
end
Pleroma.Web.Federator.publish(activity, priority)
Pleroma.Web.Federator.publish(activity)
end
:ok

View file

@ -3,22 +3,15 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator do
alias Pleroma.Activity
alias Pleroma.Object.Containment
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub
alias Pleroma.Workers.Publisher, as: PublisherWorker
alias Pleroma.Workers.Receiver, as: ReceiverWorker
alias Pleroma.Workers.Subscriber, as: SubscriberWorker
require Logger
def init do
# 1 minute
Process.sleep(1000 * 60)
refresh_subscriptions()
refresh_subscriptions(schedule_in: 60)
end
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
@ -36,111 +29,50 @@ def allowed_incoming_reply_depth?(depth) do
# Client API
def incoming_doc(doc) do
PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc])
%{"op" => "incoming_doc", "body" => doc}
|> ReceiverWorker.new(worker_args(:federator_incoming))
|> Pleroma.Repo.insert()
end
def incoming_ap_doc(params) do
PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params])
%{"op" => "incoming_ap_doc", "params" => params}
|> ReceiverWorker.new(worker_args(:federator_incoming))
|> Pleroma.Repo.insert()
end
def publish(activity, priority \\ 1) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority)
def publish(%{id: "pleroma:fakeid"} = activity) do
PublisherWorker.perform_publish(activity)
end
def publish(activity) do
%{"op" => "publish", "activity_id" => activity.id}
|> PublisherWorker.new(worker_args(:federator_outgoing))
|> Pleroma.Repo.insert()
end
def verify_websub(websub) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub])
%{"op" => "verify_websub", "websub_id" => websub.id}
|> SubscriberWorker.new(worker_args(:federator_outgoing))
|> Pleroma.Repo.insert()
end
def request_subscription(sub) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub])
def request_subscription(websub) do
%{"op" => "request_subscription", "websub_id" => websub.id}
|> SubscriberWorker.new(worker_args(:federator_outgoing))
|> Pleroma.Repo.insert()
end
def refresh_subscriptions do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions])
def refresh_subscriptions(worker_args \\ []) do
%{"op" => "refresh_subscriptions"}
|> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
|> Pleroma.Repo.insert()
end
# Job Worker Callbacks
def perform(:refresh_subscriptions) do
Logger.debug("Federator running refresh subscriptions")
Websub.refresh_subscriptions()
spawn(fn ->
# 6 hours
Process.sleep(1000 * 60 * 60 * 6)
refresh_subscriptions()
end)
end
def perform(:request_subscription, websub) do
Logger.debug("Refreshing #{websub.topic}")
with {:ok, websub} <- Websub.request_subscription(websub) do
Logger.debug("Successfully refreshed #{websub.topic}")
defp worker_args(queue) do
if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
[max_attempts: max_attempts]
else
_e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end
def perform(:publish, activity) do
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
{:ok, actor} <- User.ensure_keys_present(actor) do
Publisher.publish(actor, activity)
end
end
def perform(:verify_websub, websub) do
Logger.debug(fn ->
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
end)
Websub.verify(websub)
end
def perform(:incoming_doc, doc) do
Logger.info("Got document, trying to parse")
OStatus.handle_incoming(doc)
end
def perform(:incoming_ap_doc, params) do
Logger.info("Handling incoming AP activity")
params = Utils.normalize_params(params)
# NOTE: we use the actor ID to do the containment, this is fine because an
# actor shouldn't be acting on objects outside their own AP server.
with {:ok, _user} <- ap_enabled_actor(params["actor"]),
nil <- Activity.normalize(params["id"]),
:ok <- Containment.contain_origin_from_id(params["actor"], params),
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
{:ok, activity}
else
%Activity{} ->
Logger.info("Already had #{params["id"]}")
:error
_e ->
# Just drop those for now
Logger.info("Unhandled activity")
Logger.info(Jason.encode!(params, pretty: true))
:error
end
end
def perform(type, _) do
Logger.debug(fn -> "Unknown task: #{type}" end)
{:error, "Don't know what to do with this"}
end
def ap_enabled_actor(id) do
user = User.get_cached_by_ap_id(id)
if User.ap_enabled?(user) do
{:ok, user}
else
ActivityPub.make_user_from_ap_id(id)
[]
end
end
end

View file

@ -6,6 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.User
alias Pleroma.Workers.Publisher, as: PublisherWorker
require Logger
@ -30,8 +31,15 @@ defmodule Pleroma.Web.Federator.Publisher do
"""
@spec enqueue_one(module(), Map.t()) :: :ok
def enqueue_one(module, %{} = params) do
%{module: to_string(module), params: params}
|> Pleroma.Workers.Publisher.new()
worker_args =
if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do
[max_attempts: max_attempts]
else
[]
end
%{"op" => "publish_one", "module" => to_string(module), "params" => params}
|> PublisherWorker.new(worker_args)
|> Pleroma.Repo.insert()
end

View file

@ -3,12 +3,33 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Publisher do
use Oban.Worker, queue: "federator_outgoing", max_attempts: 5
alias Pleroma.Activity
alias Pleroma.User
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "federator_outgoing",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
@impl Oban.Worker
def perform(%Oban.Job{args: %{module: module_name, params: params}}) do
def perform(%{"op" => "publish", "activity_id" => activity_id}) do
with %Activity{} = activity <- Activity.get_by_id(activity_id) do
perform_publish(activity)
else
_ -> raise "Non-existing activity: #{activity_id}"
end
end
def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do
module_name
|> String.to_atom()
|> apply(:publish_one, [params])
end
def perform_publish(%Activity{} = activity) do
with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
{:ok, actor} <- User.ensure_keys_present(actor) do
Pleroma.Web.Federator.Publisher.publish(actor, activity)
end
end
end

View file

@ -0,0 +1,61 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Receiver do
alias Pleroma.Activity
alias Pleroma.Object.Containment
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.OStatus
require Logger
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "federator_incoming",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
@impl Oban.Worker
def perform(%{"op" => "incoming_doc", "body" => doc}) do
Logger.info("Got incoming document, trying to parse")
OStatus.handle_incoming(doc)
end
def perform(%{"op" => "incoming_ap_doc", "params" => params}) do
Logger.info("Handling incoming AP activity")
params = Utils.normalize_params(params)
# NOTE: we use the actor ID to do the containment, this is fine because an
# actor shouldn't be acting on objects outside their own AP server.
with {:ok, _user} <- ap_enabled_actor(params["actor"]),
nil <- Activity.normalize(params["id"]),
:ok <- Containment.contain_origin_from_id(params["actor"], params),
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
{:ok, activity}
else
%Activity{} ->
Logger.info("Already had #{params["id"]}")
:error
_e ->
# Just drop those for now
Logger.info("Unhandled activity")
Logger.info(Jason.encode!(params, pretty: true))
:error
end
end
defp ap_enabled_actor(id) do
user = User.get_cached_by_ap_id(id)
if User.ap_enabled?(user) do
{:ok, user}
else
ActivityPub.make_user_from_ap_id(id)
end
end
end

View file

@ -0,0 +1,44 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Subscriber do
alias Pleroma.Repo
alias Pleroma.Web.Websub
alias Pleroma.Web.Websub.WebsubClientSubscription
require Logger
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "federator_outgoing",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
@impl Oban.Worker
def perform(%{"op" => "refresh_subscriptions"}) do
Websub.refresh_subscriptions()
# Schedule the next run in 6 hours
Pleroma.Web.Federator.refresh_subscriptions(schedule_in: 3600 * 6)
end
def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do
websub = Repo.get(WebsubClientSubscription, websub_id)
Logger.debug("Refreshing #{websub.topic}")
with {:ok, websub} <- Websub.request_subscription(websub) do
Logger.debug("Successfully refreshed #{websub.topic}")
else
_e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end
def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do
websub = Repo.get(WebsubClientSubscription, websub_id)
Logger.debug(fn ->
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
end)
Websub.verify(websub)
end
end

View file

@ -6,6 +6,7 @@ defmodule Pleroma.ActivityTest do
use Pleroma.DataCase
alias Pleroma.Activity
alias Pleroma.Bookmark
alias Pleroma.ObanHelpers
alias Pleroma.Object
alias Pleroma.ThreadMute
import Pleroma.Factory
@ -125,7 +126,8 @@ test "when association is not loaded" do
}
{:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"})
{:ok, remote_activity} = Pleroma.Web.Federator.incoming_ap_doc(params)
{:ok, job} = Pleroma.Web.Federator.incoming_ap_doc(params)
{:ok, remote_activity} = ObanHelpers.perform(job)
%{local_activity: local_activity, remote_activity: remote_activity, user: user}
end

View file

@ -0,0 +1,36 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ObanHelpers do
@moduledoc """
Oban test helpers.
"""
alias Pleroma.Repo
def perform(%Oban.Job{} = job) do
res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job])
Repo.delete(job)
res
end
def perform(jobs) when is_list(jobs) do
for job <- jobs, do: perform(job)
end
def member?(%{} = job_args, jobs) when is_list(jobs) do
Enum.any?(jobs, fn job ->
member?(job_args, job.args)
end)
end
def member?(%{} = test_attrs, %{} = attrs) do
Enum.all?(
test_attrs,
fn {k, _v} -> member?(test_attrs[k], attrs[k]) end
)
end
def member?(x, y), do: x == y
end

View file

@ -5,6 +5,7 @@
defmodule Pleroma.UserTest do
alias Pleroma.Activity
alias Pleroma.Builders.UserBuilder
alias Pleroma.ObanHelpers
alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.User
@ -1044,8 +1045,16 @@ test "it sends out User Delete activity", %{user: user} do
{:ok, _user} = User.delete(user)
assert [%{args: %{"params" => %{"inbox" => "http://mastodon.example.org/inbox"}}}] =
assert ObanHelpers.member?(
%{
"op" => "publish_one",
"params" => %{
"inbox" => "http://mastodon.example.org/inbox",
"id" => "pleroma:fakeid"
}
},
all_enqueued(worker: Pleroma.Workers.Publisher)
)
Pleroma.Config.put(config_path, initial_setting)
end

View file

@ -4,15 +4,19 @@
defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
use Pleroma.Web.ConnCase
use Oban.Testing, repo: Pleroma.Repo
import Pleroma.Factory
alias Pleroma.Activity
alias Pleroma.Instances
alias Pleroma.ObanHelpers
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ObjectView
alias Pleroma.Web.ActivityPub.UserView
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.CommonAPI
alias Pleroma.Workers.Receiver, as: ReceiverWorker
setup_all do
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@ -232,7 +236,8 @@ test "it inserts an incoming activity into the database", %{conn: conn} do
|> post("/inbox", data)
assert "ok" == json_response(conn, 200)
:timer.sleep(500)
ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
assert Activity.get_by_ap_id(data["id"])
end
@ -274,7 +279,7 @@ test "it inserts an incoming activity into the database", %{conn: conn, data: da
|> post("/users/#{user.nickname}/inbox", data)
assert "ok" == json_response(conn, 200)
:timer.sleep(500)
ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
assert Activity.get_by_ap_id(data["id"])
end
@ -303,7 +308,7 @@ test "it accepts messages from actors that are followed by the user", %{
|> post("/users/#{recipient.nickname}/inbox", data)
assert "ok" == json_response(conn, 200)
:timer.sleep(500)
ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
assert Activity.get_by_ap_id(data["id"])
end
@ -382,6 +387,8 @@ test "it removes all follower collections but actor's", %{conn: conn} do
|> post("/users/#{recipient.nickname}/inbox", data)
|> json_response(200)
ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
activity = Activity.get_by_ap_id(data["id"])
assert activity.id
@ -457,6 +464,7 @@ test "it inserts an incoming create activity into the database", %{conn: conn} d
|> post("/users/#{user.nickname}/outbox", data)
result = json_response(conn, 201)
assert Activity.get_by_ap_id(result["id"])
end

View file

@ -4,8 +4,10 @@
defmodule Pleroma.Web.FederatorTest do
alias Pleroma.Instances
alias Pleroma.ObanHelpers
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Federator
alias Pleroma.Workers.Publisher, as: PublisherWorker
use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo
@ -45,6 +47,7 @@ test "with relays active, it publishes to the relay", %{
} do
with_mocks([relay_mock]) do
Federator.publish(activity)
ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
end
assert_received :relay_publish
@ -58,6 +61,7 @@ test "with relays deactivated, it does not publish to the relay", %{
with_mocks([relay_mock]) do
Federator.publish(activity)
ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
end
refute_received :relay_publish
@ -97,8 +101,15 @@ test "it federates only to reachable instances via AP" do
expected_dt = NaiveDateTime.to_iso8601(dt)
assert [%{args: %{"params" => %{"inbox" => ^inbox1, "unreachable_since" => ^expected_dt}}}] =
all_enqueued(worker: Pleroma.Workers.Publisher)
ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
assert ObanHelpers.member?(
%{
"op" => "publish_one",
"params" => %{"inbox" => inbox1, "unreachable_since" => expected_dt}
},
all_enqueued(worker: PublisherWorker)
)
end
test "it federates only to reachable instances via Websub" do
@ -129,16 +140,18 @@ test "it federates only to reachable instances via Websub" do
expected_callback = sub2.callback
expected_dt = NaiveDateTime.to_iso8601(dt)
assert [
ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
assert ObanHelpers.member?(
%{
args: %{
"op" => "publish_one",
"params" => %{
"callback" => ^expected_callback,
"unreachable_since" => ^expected_dt
"callback" => expected_callback,
"unreachable_since" => expected_dt
}
}
}
] = all_enqueued(worker: Pleroma.Workers.Publisher)
},
all_enqueued(worker: PublisherWorker)
)
end
test "it federates only to reachable instances via Salmon" do
@ -172,16 +185,18 @@ test "it federates only to reachable instances via Salmon" do
expected_dt = NaiveDateTime.to_iso8601(dt)
assert [
ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
assert ObanHelpers.member?(
%{
args: %{
"op" => "publish_one",
"params" => %{
"recipient_id" => ^remote_user2_id,
"unreachable_since" => ^expected_dt
"recipient_id" => remote_user2_id,
"unreachable_since" => expected_dt
}
}
}
] = all_enqueued(worker: Pleroma.Workers.Publisher)
},
all_enqueued(worker: PublisherWorker)
)
end
end
@ -201,7 +216,8 @@ test "successfully processes incoming AP docs with correct origin" do
"to" => ["https://www.w3.org/ns/activitystreams#Public"]
}
{:ok, _activity} = Federator.incoming_ap_doc(params)
assert {:ok, job} = Federator.incoming_ap_doc(params)
assert {:ok, _activity} = ObanHelpers.perform(job)
end
test "rejects incoming AP docs with incorrect origin" do
@ -219,7 +235,8 @@ test "rejects incoming AP docs with incorrect origin" do
"to" => ["https://www.w3.org/ns/activitystreams#Public"]
}
:error = Federator.incoming_ap_doc(params)
assert {:ok, job} = Federator.incoming_ap_doc(params)
assert :error = ObanHelpers.perform(job)
end
end
end

View file

@ -4,11 +4,14 @@
defmodule Pleroma.Web.WebsubTest do
use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.ObanHelpers
alias Pleroma.Web.Router.Helpers
alias Pleroma.Web.Websub
alias Pleroma.Web.Websub.WebsubClientSubscription
alias Pleroma.Web.Websub.WebsubServerSubscription
alias Pleroma.Workers.Subscriber, as: SubscriberWorker
import Pleroma.Factory
import Tesla.Mock
@ -224,6 +227,7 @@ test "it renews subscriptions that have less than a day of time left" do
})
_refresh = Websub.refresh_subscriptions()
ObanHelpers.perform(all_enqueued(worker: SubscriberWorker))
assert still_good == Repo.get(WebsubClientSubscription, still_good.id)
refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id)