ActivityPub: One queue item per server.

This commit is contained in:
lain 2018-02-20 08:51:19 +01:00
parent 297a2c7d3f
commit 9c89916969
2 changed files with 13 additions and 4 deletions

View file

@ -2,6 +2,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.{Activity, Repo, Object, Upload, User, Notification} alias Pleroma.{Activity, Repo, Object, Upload, User, Notification}
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.WebFinger alias Pleroma.Web.WebFinger
alias Pleroma.Web.Federator
import Ecto.Query import Ecto.Query
import Pleroma.Web.ActivityPub.Utils import Pleroma.Web.ActivityPub.Utils
require Logger require Logger
@ -305,12 +306,16 @@ def publish(actor, activity) do
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Poison.encode!(data) json = Poison.encode!(data)
Enum.each remote_inboxes, fn(inbox) -> Enum.each remote_inboxes, fn(inbox) ->
Logger.info("Federating #{activity.data["id"]} to #{inbox}") Federator.enqueue(:publish_single_ap, %{inbox: inbox, json: json, actor: actor, id: activity.data["id"]})
end
end
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
Logger.info("Federating #{id} to #{inbox}")
host = URI.parse(inbox).host host = URI.parse(inbox).host
signature = Pleroma.Web.HTTPSignatures.sign(actor, %{host: host, "content-length": byte_size(json)}) signature = Pleroma.Web.HTTPSignatures.sign(actor, %{host: host, "content-length": byte_size(json)})
@httpoison.post(inbox, json, [{"Content-Type", "application/activity+json"}, {"signature", signature}]) @httpoison.post(inbox, json, [{"Content-Type", "application/activity+json"}, {"signature", signature}])
end end
end
# TODO: # TODO:
# This will create a Create activity, which we need internally at the moment. # This will create a Create activity, which we need internally at the moment.

View file

@ -66,6 +66,10 @@ def handle(:incoming_doc, doc) do
@ostatus.handle_incoming(doc) @ostatus.handle_incoming(doc)
end end
def handle(:publish_single_ap, params) do
ActivityPub.publish_one(params)
end
def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do
signature = @websub.sign(secret || "", xml) signature = @websub.sign(secret || "", xml)
Logger.debug(fn -> "Pushing #{topic} to #{callback}" end) Logger.debug(fn -> "Pushing #{topic} to #{callback}" end)