Merge branch '582_federation_retirement_optimization' into 'develop'

[#582] Optimized federation retirement by reducing the number of SQL calls

Closes #582

See merge request pleroma/pleroma!762
This commit is contained in:
kaniini 2019-02-03 15:47:55 +00:00
commit 8180de951e
8 changed files with 178 additions and 43 deletions

View file

@ -26,7 +26,7 @@ def changeset(struct, params \\ %{}) do
|> unique_constraint(:host) |> unique_constraint(:host)
end end
def filter_reachable([]), do: [] def filter_reachable([]), do: %{}
def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do
hosts = hosts =
@ -34,17 +34,28 @@ def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do
|> Enum.map(&(&1 && host(&1))) |> Enum.map(&(&1 && host(&1)))
|> Enum.filter(&(to_string(&1) != "")) |> Enum.filter(&(to_string(&1) != ""))
unreachable_hosts = unreachable_since_by_host =
Repo.all( Repo.all(
from(i in Instance, from(i in Instance,
where: where: i.host in ^hosts,
i.host in ^hosts and select: {i.host, i.unreachable_since}
i.unreachable_since <= ^Instances.reachability_datetime_threshold(),
select: i.host
) )
) )
|> Map.new(& &1)
Enum.filter(urls_or_hosts, &(&1 && host(&1) not in unreachable_hosts)) reachability_datetime_threshold = Instances.reachability_datetime_threshold()
for entry <- Enum.filter(urls_or_hosts, &is_binary/1) do
host = host(entry)
unreachable_since = unreachable_since_by_host[host]
if !unreachable_since ||
NaiveDateTime.compare(unreachable_since, reachability_datetime_threshold) == :gt do
{entry, unreachable_since}
end
end
|> Enum.filter(& &1)
|> Map.new(& &1)
end end
def reachable?(url_or_host) when is_binary(url_or_host) do def reachable?(url_or_host) when is_binary(url_or_host) do

View file

@ -744,7 +744,7 @@ def publish(actor, activity) do
public = is_public?(activity) public = is_public?(activity)
remote_inboxes = reachable_inboxes_metadata =
(Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers)
|> Enum.filter(fn user -> User.ap_enabled?(user) end) |> Enum.filter(fn user -> User.ap_enabled?(user) end)
|> Enum.map(fn %{info: %{source_data: data}} -> |> Enum.map(fn %{info: %{source_data: data}} ->
@ -757,17 +757,18 @@ def publish(actor, activity) do
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data) json = Jason.encode!(data)
Enum.each(remote_inboxes, fn inbox -> Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} ->
Federator.enqueue(:publish_single_ap, %{ Federator.enqueue(:publish_single_ap, %{
inbox: inbox, inbox: inbox,
json: json, json: json,
actor: actor, actor: actor,
id: activity.data["id"] id: activity.data["id"],
unreachable_since: unreachable_since
}) })
end) end)
end end
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do
Logger.info("Federating #{id} to #{inbox}") Logger.info("Federating #{id} to #{inbox}")
host = URI.parse(inbox).host host = URI.parse(inbox).host
@ -791,11 +792,13 @@ def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
{"digest", digest} {"digest", digest}
] ]
) do ) do
Instances.set_reachable(inbox) if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
do: Instances.set_reachable(inbox)
result result
else else
{_post_result, response} -> {_post_result, response} ->
Instances.set_unreachable(inbox) unless params[:unreachable_since], do: Instances.set_unreachable(inbox)
{:error, response} {:error, response}
end end
end end

View file

@ -124,8 +124,8 @@ def handle(:incoming_ap_doc, params) do
end end
end end
# Runs one queued Salmon delivery job; `params` is passed through unchanged
# to `Salmon.send_to_user/1`.
def handle(:publish_single_salmon, params), do: Salmon.send_to_user(params)
def handle(:publish_single_ap, params) do def handle(:publish_single_ap, params) do

View file

@ -162,30 +162,31 @@ def remote_users(%{data: %{"to" => to} = data}) do
|> Enum.filter(fn user -> user && !user.local end) |> Enum.filter(fn user -> user && !user.local end)
end end
@doc """
Pushes an activity to a remote account via Salmon.

Expects a map with the following keys:

  * `:recipient` - either the Salmon endpoint URL (a binary) or a map/struct that
    carries the URL under `info.salmon`; in the latter case the URL is extracted
    and the call is re-dispatched with it.
  * `:feed` - the payload handed to `poster`.
  * `:poster` - a 3-arity function invoked as `poster.(url, feed, headers)`.
  * `:unreachable_since` (optional) - reachability hint supplied by the caller:
      * key absent - `Instances.set_reachable/1` is always called on success;
      * `nil` - `Instances.set_reachable/1` is skipped on success;
      * non-nil - `Instances.set_reachable/1` is called on success, and
        `Instances.set_unreachable/1` is skipped on failure.

Returns `:ok` when `poster` answers with a 2xx status, `:error` for any other
outcome, and `:noop` when the params do not match the shapes above.
"""
def send_to_user(%{recipient: %{info: %{salmon: salmon}}} = params),
  do: send_to_user(Map.put(params, :recipient, salmon))

def send_to_user(%{recipient: url, feed: feed, poster: poster} = params) when is_binary(url) do
  with {:ok, %{status: code}} when code in 200..299 <-
         poster.(
           url,
           feed,
           [{"Content-Type", "application/magic-envelope+xml"}]
         ) do
    # Skip the call only when the caller explicitly passed `unreachable_since: nil`.
    if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
      do: Instances.set_reachable(url)

    Logger.debug(fn -> "Pushed to #{url}, code #{code}" end)
    :ok
  else
    e ->
      # A failed delivery must flag the host as unreachable (not reachable).
      # The call is skipped when the caller already supplied a non-nil
      # `unreachable_since` for this host.
      unless params[:unreachable_since], do: Instances.set_unreachable(url)
      Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end)
      :error
  end
end

def send_to_user(_), do: :noop
@supported_activities [ @supported_activities [
"Create", "Create",
@ -218,13 +219,20 @@ def publish(%{info: %{keys: keys}} = user, %{data: %{"type" => type}} = activity
remote_users = remote_users(activity) remote_users = remote_users(activity)
salmon_urls = Enum.map(remote_users, & &1.info.salmon) salmon_urls = Enum.map(remote_users, & &1.info.salmon)
reachable_salmon_urls = Instances.filter_reachable(salmon_urls) reachable_urls_metadata = Instances.filter_reachable(salmon_urls)
reachable_urls = Map.keys(reachable_urls_metadata)
remote_users remote_users
|> Enum.filter(&(&1.info.salmon in reachable_salmon_urls)) |> Enum.filter(&(&1.info.salmon in reachable_urls))
|> Enum.each(fn remote_user -> |> Enum.each(fn remote_user ->
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
Pleroma.Web.Federator.enqueue(:publish_single_salmon, {remote_user, feed, poster})
Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{
recipient: remote_user,
feed: feed,
poster: poster,
unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
})
end) end)
end end
end end

View file

@ -70,7 +70,8 @@ def publish(topic, user, %{data: %{"type" => type}} = activity)
subscriptions = Repo.all(query) subscriptions = Repo.all(query)
callbacks = Enum.map(subscriptions, & &1.callback) callbacks = Enum.map(subscriptions, & &1.callback)
reachable_callbacks = Instances.filter_reachable(callbacks) reachable_callbacks_metadata = Instances.filter_reachable(callbacks)
reachable_callbacks = Map.keys(reachable_callbacks_metadata)
subscriptions subscriptions
|> Enum.filter(&(&1.callback in reachable_callbacks)) |> Enum.filter(&(&1.callback in reachable_callbacks))
@ -79,7 +80,8 @@ def publish(topic, user, %{data: %{"type" => type}} = activity)
xml: response, xml: response,
topic: topic, topic: topic,
callback: sub.callback, callback: sub.callback,
secret: sub.secret secret: sub.secret,
unreachable_since: reachable_callbacks_metadata[sub.callback]
} }
Pleroma.Web.Federator.enqueue(:publish_single_websub, data) Pleroma.Web.Federator.enqueue(:publish_single_websub, data)
@ -268,7 +270,7 @@ def refresh_subscriptions(delta \\ 60 * 60 * 24) do
end) end)
end end
def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = params) do
signature = sign(secret || "", xml) signature = sign(secret || "", xml)
Logger.info(fn -> "Pushing #{topic} to #{callback}" end) Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
@ -281,12 +283,14 @@ def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) d
{"X-Hub-Signature", "sha1=#{signature}"} {"X-Hub-Signature", "sha1=#{signature}"}
] ]
) do ) do
Instances.set_reachable(callback) if !Map.has_key?(params, :unreachable_since) || params[:unreachable_since],
do: Instances.set_reachable(callback)
Logger.info(fn -> "Pushed to #{callback}, code #{code}" end) Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
{:ok, code} {:ok, code}
else else
{_post_result, response} ->
  # Delivery failed: flag the callback's host as unreachable (not reachable).
  # The call is skipped when the caller already supplied a non-nil
  # `unreachable_since` for this host.
  unless params[:unreachable_since], do: Instances.set_unreachable(callback)
  Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end)
  {:error, response}
end end

View file

@ -698,7 +698,57 @@ test "returned pinned statuses" do
end end
describe "publish_one/1" do describe "publish_one/1" do
test_with_mock "calls `Instances.set_reachable` on successful federation if `unreachable_since` is not specified",
               Instances,
               [:passthrough],
               [] do
  target_inbox = "http://200.site/users/nick1/inbox"

  # The params deliberately carry no `:unreachable_since` key at all.
  params = %{inbox: target_inbox, json: "{}", actor: insert(:user), id: 1}

  assert {:ok, _} = ActivityPub.publish_one(params)
  assert called(Instances.set_reachable(target_inbox))
end
test_with_mock "calls `Instances.set_reachable` on successful federation if `unreachable_since` is set",
               Instances,
               [:passthrough],
               [] do
  target_inbox = "http://200.site/users/nick1/inbox"

  # A non-nil `unreachable_since` is passed along with the job.
  params = %{
    inbox: target_inbox,
    json: "{}",
    actor: insert(:user),
    id: 1,
    unreachable_since: NaiveDateTime.utc_now()
  }

  assert {:ok, _} = ActivityPub.publish_one(params)
  assert called(Instances.set_reachable(target_inbox))
end
test_with_mock "does NOT call `Instances.set_reachable` on successful federation if `unreachable_since` is nil",
               Instances,
               [:passthrough],
               [] do
  target_inbox = "http://200.site/users/nick1/inbox"

  # The key is present but explicitly nil.
  params = %{
    inbox: target_inbox,
    json: "{}",
    actor: insert(:user),
    id: 1,
    unreachable_since: nil
  }

  assert {:ok, _} = ActivityPub.publish_one(params)
  refute called(Instances.set_reachable(target_inbox))
end
test_with_mock "calls `Instances.set_unreachable` on target inbox on non-2xx HTTP response code",
Instances, Instances,
[:passthrough], [:passthrough],
[] do [] do
@ -724,7 +774,7 @@ test "returned pinned statuses" do
assert called(Instances.set_unreachable(inbox)) assert called(Instances.set_unreachable(inbox))
end end
test_with_mock "it does NOT call `Instances.set_unreachable` if target is reachable", test_with_mock "does NOT call `Instances.set_unreachable` if target is reachable",
Instances, Instances,
[:passthrough], [:passthrough],
[] do [] do
@ -735,6 +785,25 @@ test "returned pinned statuses" do
refute called(Instances.set_unreachable(inbox)) refute called(Instances.set_unreachable(inbox))
end end
test_with_mock "does NOT call `Instances.set_unreachable` if target instance has non-nil `unreachable_since`",
               Instances,
               [:passthrough],
               [] do
  target_inbox = "http://connrefused.site/users/nick1/inbox"

  # The job already carries a non-nil `unreachable_since`.
  params = %{
    inbox: target_inbox,
    json: "{}",
    actor: insert(:user),
    id: 1,
    unreachable_since: NaiveDateTime.utc_now()
  }

  assert {:error, _} = ActivityPub.publish_one(params)
  refute called(Instances.set_unreachable(target_inbox))
end
end end
def data_uri do def data_uri do

View file

@ -95,15 +95,18 @@ test "with relays deactivated, it does not publish to the relay", %{
info: %{ap_enabled: true, source_data: %{"inbox" => inbox2}} info: %{ap_enabled: true, source_data: %{"inbox" => inbox2}}
}) })
Instances.set_unreachable( dt = NaiveDateTime.utc_now()
URI.parse(inbox2).host, Instances.set_unreachable(inbox1, dt)
Instances.reachability_datetime_threshold()
) Instances.set_consistently_unreachable(URI.parse(inbox2).host)
{:ok, _activity} = {:ok, _activity} =
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
assert called(Federator.enqueue(:publish_single_ap, %{inbox: inbox1})) assert called(
Federator.enqueue(:publish_single_ap, %{inbox: inbox1, unreachable_since: dt})
)
refute called(Federator.enqueue(:publish_single_ap, %{inbox: inbox2})) refute called(Federator.enqueue(:publish_single_ap, %{inbox: inbox2}))
end end
@ -128,11 +131,20 @@ test "with relays deactivated, it does not publish to the relay", %{
callback: "https://pleroma2.soykaf.com/cb" callback: "https://pleroma2.soykaf.com/cb"
}) })
dt = NaiveDateTime.utc_now()
Instances.set_unreachable(sub2.callback, dt)
Instances.set_consistently_unreachable(sub1.callback) Instances.set_consistently_unreachable(sub1.callback)
{:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"}) {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"})
assert called(Federator.enqueue(:publish_single_websub, %{callback: sub2.callback})) assert called(
Federator.enqueue(:publish_single_websub, %{
callback: sub2.callback,
unreachable_since: dt
})
)
refute called(Federator.enqueue(:publish_single_websub, %{callback: sub1.callback})) refute called(Federator.enqueue(:publish_single_websub, %{callback: sub1.callback}))
end end
@ -158,13 +170,22 @@ test "with relays deactivated, it does not publish to the relay", %{
info: %{salmon: "https://domain2.com/salmon"} info: %{salmon: "https://domain2.com/salmon"}
}) })
dt = NaiveDateTime.utc_now()
Instances.set_unreachable(remote_user2.ap_id, dt)
Instances.set_consistently_unreachable("domain.com") Instances.set_consistently_unreachable("domain.com")
{:ok, _activity} = {:ok, _activity} =
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
assert called(Federator.enqueue(:publish_single_salmon, {remote_user2, :_, :_})) assert called(
refute called(Federator.enqueue(:publish_single_websub, {remote_user1, :_, :_})) Federator.enqueue(:publish_single_salmon, %{
recipient: remote_user2,
unreachable_since: dt
})
)
# Refute the Salmon job (not a WebSub one): `:publish_single_websub` jobs never
# carry a `:recipient` key, so refuting that tag could never fail.
refute called(Federator.enqueue(:publish_single_salmon, %{recipient: remote_user1}))
end end
end end

View file

@ -47,7 +47,7 @@ test "returns true on non-binary input" do
end end
describe "filter_reachable/1" do describe "filter_reachable/1" do
test "keeps only reachable elements of supplied list" do setup do
host = "consistently-unreachable.name" host = "consistently-unreachable.name"
url1 = "http://eventually-unreachable.com/path" url1 = "http://eventually-unreachable.com/path"
url2 = "http://domain.com/path" url2 = "http://domain.com/path"
@ -55,7 +55,26 @@ test "keeps only reachable elements of supplied list" do
Instances.set_consistently_unreachable(host) Instances.set_consistently_unreachable(host)
Instances.set_unreachable(url1) Instances.set_unreachable(url1)
assert [url1, url2] == Instances.filter_reachable([host, url1, url2]) result = Instances.filter_reachable([host, url1, url2, nil])
%{result: result, url1: url1, url2: url2}
end
test "returns a map with keys containing 'not marked consistently unreachable' elements of supplied list",
     %{result: result, url1: url1, url2: url2} do
  assert is_map(result)

  # Compare as sorted lists so the assertion does not depend on key order.
  expected_keys = Enum.sort([url1, url2])
  actual_keys = Enum.sort(Map.keys(result))

  assert expected_keys == actual_keys
end
test "returns a map with `unreachable_since` values for keys",
     %{result: result, url1: url1, url2: url2} do
  assert is_map(result)

  # `url1` should map to a timestamp, `url2` to nil.
  url1_value = result[url1]
  url2_value = result[url2]

  assert %NaiveDateTime{} = url1_value
  assert is_nil(url2_value)
end
test "returns an empty map for empty list or list containing no hosts / url" do
  # Both an empty list and a list holding only nil must produce an empty map.
  for input <- [[], [nil]] do
    assert %{} == Instances.filter_reachable(input)
  end
end
end end