From b40b4bc4e5b49ac2b35746cee7b1db92428d3ee1 Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sun, 3 Feb 2019 12:41:27 +0300 Subject: [PATCH] [#582] Optimized federation retirement by reducing the number of SQL calls (calling `Instances.set_reachable/1` only if instance had `unreachable_since`, calling `Instances.set_unreachable/1` only if instance had nil `unreachable_since`). --- lib/pleroma/instances/instance.ex | 25 ++++++++++++----- lib/pleroma/web/activity_pub/activity_pub.ex | 13 ++++----- lib/pleroma/web/federator/federator.ex | 4 +-- lib/pleroma/web/salmon/salmon.ex | 28 ++++++++++++-------- lib/pleroma/web/websub/websub.ex | 12 +++++---- test/web/federator_test.exs | 4 +-- test/web/instances/instances_test.exs | 23 ++++++++++++++-- 7 files changed, 74 insertions(+), 35 deletions(-) diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index a87590d8b..4a4ca26dd 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -26,7 +26,7 @@ def changeset(struct, params \\ %{}) do |> unique_constraint(:host) end - def filter_reachable([]), do: [] + def filter_reachable([]), do: %{} def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do hosts = @@ -34,17 +34,28 @@ def filter_reachable(urls_or_hosts) when is_list(urls_or_hosts) do |> Enum.map(&(&1 && host(&1))) |> Enum.filter(&(to_string(&1) != "")) - unreachable_hosts = + unreachable_since_by_host = Repo.all( from(i in Instance, - where: - i.host in ^hosts and - i.unreachable_since <= ^Instances.reachability_datetime_threshold(), - select: i.host + where: i.host in ^hosts, + select: {i.host, i.unreachable_since} ) ) + |> Map.new(& &1) - Enum.filter(urls_or_hosts, &(&1 && host(&1) not in unreachable_hosts)) + reachability_datetime_threshold = Instances.reachability_datetime_threshold() + + for entry <- Enum.filter(urls_or_hosts, &is_binary/1) do + host = host(entry) + unreachable_since = unreachable_since_by_host[host] + + if !unreachable_since || + NaiveDateTime.compare(unreachable_since, reachability_datetime_threshold) == :gt do + {entry, unreachable_since} + end + end + |> Enum.filter(& &1) + |> Map.new(& &1) end def reachable?(url_or_host) when is_binary(url_or_host) do diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 06e8c3f1c..5f6c8e7d3 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -744,7 +744,7 @@ def publish(actor, activity) do public = is_public?(activity) - remote_inboxes = + reachable_inboxes_metadata = (Pleroma.Web.Salmon.remote_users(activity) ++ remote_followers) |> Enum.filter(fn user -> User.ap_enabled?(user) end) |> Enum.map(fn %{info: %{source_data: data}} -> @@ -757,17 +757,18 @@ def publish(actor, activity) do {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) json = Jason.encode!(data) - Enum.each(remote_inboxes, fn inbox -> + Enum.each(reachable_inboxes_metadata, fn {inbox, unreachable_since} -> Federator.enqueue(:publish_single_ap, %{ inbox: inbox, json: json, actor: actor, - id: activity.data["id"] + id: activity.data["id"], + unreachable_since: unreachable_since }) end) end - def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do + def publish_one(%{inbox: inbox, json: json, actor: actor, id: id} = params) do Logger.info("Federating #{id} to #{inbox}") host = URI.parse(inbox).host @@ -791,11 +792,11 @@ def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do {"digest", digest} ] ) do - Instances.set_reachable(inbox) + if params[:unreachable_since], do: Instances.set_reachable(inbox) result else {_post_result, response} -> - Instances.set_unreachable(inbox) + unless params[:unreachable_since], do: Instances.set_unreachable(inbox) {:error, response} end end diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 46f7a4973..bb7676cf0 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -124,8 +124,8 @@ def handle(:incoming_ap_doc, params) do end end - def handle(:publish_single_salmon, {user_or_url, feed, poster}) do - Salmon.send_to_user(user_or_url, feed, poster) + def handle(:publish_single_salmon, params) do + Salmon.send_to_user(params) end def handle(:publish_single_ap, params) do diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex index 07ca42a5f..4d519ece4 100644 --- a/lib/pleroma/web/salmon/salmon.ex +++ b/lib/pleroma/web/salmon/salmon.ex @@ -162,30 +162,29 @@ def remote_users(%{data: %{"to" => to} = data}) do |> Enum.filter(fn user -> user && !user.local end) end - # push an activity to remote accounts - # - def send_to_user(%{info: %{salmon: salmon}}, feed, poster), - do: send_to_user(salmon, feed, poster) + @doc "Pushes an activity to remote account." + def send_to_user(%{recipient: %{info: %{salmon: salmon}}} = params), + do: send_to_user(Map.put(params, :recipient, salmon)) - def send_to_user(url, feed, poster) when is_binary(url) do + def send_to_user(%{recipient: url, feed: feed, poster: poster} = params) when is_binary(url) do with {:ok, %{status: code}} when code in 200..299 <- poster.( url, feed, [{"Content-Type", "application/magic-envelope+xml"}] ) do - Instances.set_reachable(url) + if params[:unreachable_since], do: Instances.set_reachable(url) Logger.debug(fn -> "Pushed to #{url}, code #{code}" end) :ok else e -> - Instances.set_unreachable(url) + unless params[:unreachable_since], do: Instances.set_reachable(url) Logger.debug(fn -> "Pushing Salmon to #{url} failed, #{inspect(e)}" end) :error end end - def send_to_user(_, _, _), do: :noop + def send_to_user(_), do: :noop @supported_activities [ "Create", @@ -218,13 +217,20 @@ def publish(%{info: %{keys: keys}} = user, %{data: %{"type" => type}} = activity remote_users = remote_users(activity) salmon_urls = Enum.map(remote_users, & &1.info.salmon) - reachable_salmon_urls = Instances.filter_reachable(salmon_urls) + reachable_urls_metadata = Instances.filter_reachable(salmon_urls) + reachable_urls = Map.keys(reachable_urls_metadata) remote_users - |> Enum.filter(&(&1.info.salmon in reachable_salmon_urls)) + |> Enum.filter(&(&1.info.salmon in reachable_urls)) |> Enum.each(fn remote_user -> Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) - Pleroma.Web.Federator.enqueue(:publish_single_salmon, {remote_user, feed, poster}) + + Pleroma.Web.Federator.enqueue(:publish_single_salmon, %{ + recipient: remote_user, + feed: feed, + poster: poster, + unreachable_since: reachable_urls_metadata[remote_user.info.salmon] + }) end) end end diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index 8f7d53b03..cf51dce76 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -70,7 +70,8 @@ def publish(topic, user, %{data: %{"type" => type}} = activity) subscriptions = Repo.all(query) callbacks = Enum.map(subscriptions, & &1.callback) - reachable_callbacks = Instances.filter_reachable(callbacks) + reachable_callbacks_metadata = Instances.filter_reachable(callbacks) + reachable_callbacks = Map.keys(reachable_callbacks_metadata) subscriptions |> Enum.filter(&(&1.callback in reachable_callbacks)) @@ -79,7 +80,8 @@ def publish(topic, user, %{data: %{"type" => type}} = activity) xml: response, topic: topic, callback: sub.callback, - secret: sub.secret + secret: sub.secret, + unreachable_since: reachable_callbacks_metadata[sub.callback] } Pleroma.Web.Federator.enqueue(:publish_single_websub, data) @@ -268,7 +270,7 @@ def refresh_subscriptions(delta \\ 60 * 60 * 24) do end) end - def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do + def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret} = params) do signature = sign(secret || "", xml) Logger.info(fn -> "Pushing #{topic} to #{callback}" end) @@ -281,12 +283,12 @@ def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) d {"X-Hub-Signature", "sha1=#{signature}"} ] ) do - Instances.set_reachable(callback) + if params[:unreachable_since], do: Instances.set_reachable(callback) Logger.info(fn -> "Pushed to #{callback}, code #{code}" end) {:ok, code} else {_post_result, response} -> - Instances.set_unreachable(callback) + unless params[:unreachable_since], do: Instances.set_reachable(callback) Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(response)}" end) {:error, response} end diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs index c6d10ef78..7bb249d74 100644 --- a/test/web/federator_test.exs +++ b/test/web/federator_test.exs @@ -163,8 +163,8 @@ test "with relays deactivated, it does not publish to the relay", %{ {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) - assert called(Federator.enqueue(:publish_single_salmon, {remote_user2, :_, :_})) - refute called(Federator.enqueue(:publish_single_websub, {remote_user1, :_, :_})) + assert called(Federator.enqueue(:publish_single_salmon, %{recipient: remote_user2})) + refute called(Federator.enqueue(:publish_single_websub, %{recipient: remote_user1})) end end diff --git a/test/web/instances/instances_test.exs b/test/web/instances/instances_test.exs index adb8560a7..2530c09fe 100644 --- a/test/web/instances/instances_test.exs +++ b/test/web/instances/instances_test.exs @@ -47,7 +47,7 @@ test "returns true on non-binary input" do end describe "filter_reachable/1" do - test "keeps only reachable elements of supplied list" do + setup do host = "consistently-unreachable.name" url1 = "http://eventually-unreachable.com/path" url2 = "http://domain.com/path" @@ -55,7 +55,26 @@ test "keeps only reachable elements of supplied list" do Instances.set_consistently_unreachable(host) Instances.set_unreachable(url1) - assert [url1, url2] == Instances.filter_reachable([host, url1, url2]) + result = Instances.filter_reachable([host, url1, url2, nil]) + %{result: result, url1: url1, url2: url2} + end + + test "returns a map with keys containing 'not marked consistently unreachable' elements of supplied list", + %{result: result, url1: url1, url2: url2} do + assert is_map(result) + assert Enum.sort([url1, url2]) == result |> Map.keys() |> Enum.sort() + end + + test "returns a map with `unreachable_since` values for keys", + %{result: result, url1: url1, url2: url2} do + assert is_map(result) + assert %NaiveDateTime{} = result[url1] + assert is_nil(result[url2]) + end + + test "returns an empty map for empty list or list containing no hosts / url" do + assert %{} == Instances.filter_reachable([]) + assert %{} == Instances.filter_reachable([nil]) end end