From 7e6ec778d965419ed4083428d4d39b2a689f7619 Mon Sep 17 00:00:00 2001
From: Alexander Strizhakov <alex.strizhakov@gmail.com>
Date: Wed, 20 May 2020 17:45:06 +0300
Subject: [PATCH] exclude replies on blocked domains

---
 benchmarks/load_testing/activities.ex         | 365 ++++++++++--------
 benchmarks/load_testing/fetcher.ex            |  71 ++++
 benchmarks/load_testing/users.ex              |  22 +-
 .../mix/tasks/pleroma/benchmarks/tags.ex      |  38 +-
 lib/pleroma/web/activity_pub/activity_pub.ex  |  27 ++
 .../api_spec/operations/timeline_operation.ex |   7 +
 .../controllers/timeline_controller.ex        |  13 +-
 ...ients_contain_blocked_domains_function.exs |  33 ++
 .../controllers/timeline_controller_test.exs  |  68 ++++
 9 files changed, 469 insertions(+), 175 deletions(-)
 create mode 100644 priv/repo/migrations/20200520155351_add_recipients_contain_blocked_domains_function.exs

diff --git a/benchmarks/load_testing/activities.ex b/benchmarks/load_testing/activities.ex
index ff0d481a8..074ded457 100644
--- a/benchmarks/load_testing/activities.ex
+++ b/benchmarks/load_testing/activities.ex
@@ -22,8 +22,21 @@ defmodule Pleroma.LoadTesting.Activities do
   @max_concurrency 10
 
   @visibility ~w(public private direct unlisted)
-  @types ~w(simple emoji mentions hell_thread attachment tag like reblog simple_thread remote)
-  @groups ~w(user friends non_friends)
+  @types [
+    :simple,
+    :emoji,
+    :mentions,
+    :hell_thread,
+    :attachment,
+    :tag,
+    :like,
+    :reblog,
+    :simple_thread
+  ]
+  @groups [:friends_local, :friends_remote, :non_friends_local, :non_friends_local]
+  @remote_groups [:friends_remote, :non_friends_remote]
+  @friends_groups [:friends_local, :friends_remote]
+  @non_friends_groups [:non_friends_local, :non_friends_remote]
 
   @spec generate(User.t(), keyword()) :: :ok
   def generate(user, opts \\ []) do
@@ -34,33 +47,24 @@ def generate(user, opts \\ []) do
 
     opts = Keyword.merge(@defaults, opts)
 
-    friends =
-      user
-      |> Users.get_users(limit: opts[:friends_used], local: :local, friends?: true)
-      |> Enum.shuffle()
+    users = Users.prepare_users(user, opts)
 
-    non_friends =
-      user
-      |> Users.get_users(limit: opts[:non_friends_used], local: :local, friends?: false)
-      |> Enum.shuffle()
+    {:ok, _} = Agent.start_link(fn -> users[:non_friends_remote] end, name: :non_friends_remote)
 
     task_data =
       for visibility <- @visibility,
           type <- @types,
-          group <- @groups,
+          group <- [:user | @groups],
           do: {visibility, type, group}
 
     IO.puts("Starting generating #{opts[:iterations]} iterations of activities...")
 
-    friends_thread = Enum.take(friends, 5)
-    non_friends_thread = Enum.take(friends, 5)
-
     public_long_thread = fn ->
-      generate_long_thread("public", user, friends_thread, non_friends_thread, opts)
+      generate_long_thread("public", users, opts)
     end
 
     private_long_thread = fn ->
-      generate_long_thread("private", user, friends_thread, non_friends_thread, opts)
+      generate_long_thread("private", users, opts)
     end
 
     iterations = opts[:iterations]
@@ -73,10 +77,10 @@ def generate(user, opts \\ []) do
             i when i == iterations - 2 ->
               spawn(public_long_thread)
               spawn(private_long_thread)
-              generate_activities(user, friends, non_friends, Enum.shuffle(task_data), opts)
+              generate_activities(users, Enum.shuffle(task_data), opts)
 
             _ ->
-              generate_activities(user, friends, non_friends, Enum.shuffle(task_data), opts)
+              generate_activities(users, Enum.shuffle(task_data), opts)
           end
         )
       end)
@@ -127,16 +131,16 @@ def generate_tagged_activities(opts \\ []) do
     end)
   end
 
-  defp generate_long_thread(visibility, user, friends, non_friends, _opts) do
+  defp generate_long_thread(visibility, users, _opts) do
     group =
       if visibility == "public",
-        do: "friends",
-        else: "user"
+        do: :friends_local,
+        else: :user
 
     tasks = get_reply_tasks(visibility, group) |> Stream.cycle() |> Enum.take(50)
 
     {:ok, activity} =
-      CommonAPI.post(user, %{
+      CommonAPI.post(users[:user], %{
         status: "Start of #{visibility} long thread",
         visibility: visibility
       })
@@ -150,31 +154,28 @@ defp generate_long_thread(visibility, user, friends, non_friends, _opts) do
       Map.put(state, key, activity)
     end)
 
-    acc = {activity.id, ["@" <> user.nickname, "reply to long thread"]}
-    insert_replies_for_long_thread(tasks, visibility, user, friends, non_friends, acc)
+    acc = {activity.id, ["@" <> users[:user].nickname, "reply to long thread"]}
+    insert_replies_for_long_thread(tasks, visibility, users, acc)
     IO.puts("Generating #{visibility} long thread ended\n")
   end
 
-  defp insert_replies_for_long_thread(tasks, visibility, user, friends, non_friends, acc) do
+  defp insert_replies_for_long_thread(tasks, visibility, users, acc) do
     Enum.reduce(tasks, acc, fn
-      "friend", {id, data} ->
-        friend = Enum.random(friends)
-        insert_reply(friend, List.delete(data, "@" <> friend.nickname), id, visibility)
-
-      "non_friend", {id, data} ->
-        non_friend = Enum.random(non_friends)
-        insert_reply(non_friend, List.delete(data, "@" <> non_friend.nickname), id, visibility)
-
-      "user", {id, data} ->
+      :user, {id, data} ->
+        user = users[:user]
         insert_reply(user, List.delete(data, "@" <> user.nickname), id, visibility)
+
+      group, {id, data} ->
+        replier = Enum.random(users[group])
+        insert_reply(replier, List.delete(data, "@" <> replier.nickname), id, visibility)
     end)
   end
 
-  defp generate_activities(user, friends, non_friends, task_data, opts) do
+  defp generate_activities(users, task_data, opts) do
     Task.async_stream(
       task_data,
       fn {visibility, type, group} ->
-        insert_activity(type, visibility, group, user, friends, non_friends, opts)
+        insert_activity(type, visibility, group, users, opts)
       end,
       max_concurrency: @max_concurrency,
       timeout: 30_000
@@ -182,67 +183,104 @@ defp generate_activities(user, friends, non_friends, task_data, opts) do
     |> Stream.run()
   end
 
-  defp insert_activity("simple", visibility, group, user, friends, non_friends, _opts) do
-    {:ok, _activity} =
+  defp insert_local_activity(visibility, group, users, status) do
+    {:ok, _} =
       group
-      |> get_actor(user, friends, non_friends)
-      |> CommonAPI.post(%{status: "Simple status", visibility: visibility})
+      |> get_actor(users)
+      |> CommonAPI.post(%{status: status, visibility: visibility})
   end
 
-  defp insert_activity("emoji", visibility, group, user, friends, non_friends, _opts) do
-    {:ok, _activity} =
-      group
-      |> get_actor(user, friends, non_friends)
-      |> CommonAPI.post(%{
-        status: "Simple status with emoji :firefox:",
-        visibility: visibility
-      })
+  defp insert_remote_activity(visibility, group, users, status) do
+    actor = get_actor(group, users)
+    {act_data, obj_data} = prepare_activity_data(actor, visibility, users[:user])
+    {activity_data, object_data} = other_data(actor, status)
+
+    activity_data
+    |> Map.merge(act_data)
+    |> Map.put("object", Map.merge(object_data, obj_data))
+    |> Pleroma.Web.ActivityPub.ActivityPub.insert(false)
   end
 
-  defp insert_activity("mentions", visibility, group, user, friends, non_friends, _opts) do
+  defp user_mentions(users) do
     user_mentions =
-      get_random_mentions(friends, Enum.random(0..3)) ++
-        get_random_mentions(non_friends, Enum.random(0..3))
+      Enum.reduce(
+        @groups,
+        [],
+        fn group, acc ->
+          acc ++ get_random_mentions(users[group], Enum.random(0..2))
+        end
+      )
 
-    user_mentions =
-      if Enum.random([true, false]),
-        do: ["@" <> user.nickname | user_mentions],
-        else: user_mentions
-
-    {:ok, _activity} =
-      group
-      |> get_actor(user, friends, non_friends)
-      |> CommonAPI.post(%{
-        status: Enum.join(user_mentions, ", ") <> " simple status with mentions",
-        visibility: visibility
-      })
+    if Enum.random([true, false]),
+      do: ["@" <> users[:user].nickname | user_mentions],
+      else: user_mentions
   end
 
-  defp insert_activity("hell_thread", visibility, group, user, friends, non_friends, _opts) do
-    mentions =
-      with {:ok, nil} <- Cachex.get(:user_cache, "hell_thread_mentions") do
-        cached =
-          ([user | Enum.take(friends, 10)] ++ Enum.take(non_friends, 10))
-          |> Enum.map(&"@#{&1.nickname}")
-          |> Enum.join(", ")
+  defp hell_thread_mentions(users) do
+    with {:ok, nil} <- Cachex.get(:user_cache, "hell_thread_mentions") do
+      cached =
+        @groups
+        |> Enum.reduce([users[:user]], fn group, acc ->
+          acc ++ Enum.take(users[group], 5)
+        end)
+        |> Enum.map(&"@#{&1.nickname}")
+        |> Enum.join(", ")
 
-        Cachex.put(:user_cache, "hell_thread_mentions", cached)
-        cached
-      else
-        {:ok, cached} -> cached
-      end
-
-    {:ok, _activity} =
-      group
-      |> get_actor(user, friends, non_friends)
-      |> CommonAPI.post(%{
-        status: mentions <> " hell thread status",
-        visibility: visibility
-      })
+      Cachex.put(:user_cache, "hell_thread_mentions", cached)
+      cached
+    else
+      {:ok, cached} -> cached
+    end
   end
 
-  defp insert_activity("attachment", visibility, group, user, friends, non_friends, _opts) do
-    actor = get_actor(group, user, friends, non_friends)
+  defp insert_activity(:simple, visibility, group, users, _opts)
+       when group in @remote_groups do
+    insert_remote_activity(visibility, group, users, "Remote status")
+  end
+
+  defp insert_activity(:simple, visibility, group, users, _opts) do
+    insert_local_activity(visibility, group, users, "Simple status")
+  end
+
+  defp insert_activity(:emoji, visibility, group, users, _opts)
+       when group in @remote_groups do
+    insert_remote_activity(visibility, group, users, "Remote status with emoji :firefox:")
+  end
+
+  defp insert_activity(:emoji, visibility, group, users, _opts) do
+    insert_local_activity(visibility, group, users, "Simple status with emoji :firefox:")
+  end
+
+  defp insert_activity(:mentions, visibility, group, users, _opts)
+       when group in @remote_groups do
+    mentions = user_mentions(users)
+
+    status = Enum.join(mentions, ", ") <> " remote status with mentions"
+
+    insert_remote_activity(visibility, group, users, status)
+  end
+
+  defp insert_activity(:mentions, visibility, group, users, _opts) do
+    mentions = user_mentions(users)
+
+    status = Enum.join(mentions, ", ") <> " simple status with mentions"
+    insert_remote_activity(visibility, group, users, status)
+  end
+
+  defp insert_activity(:hell_thread, visibility, group, users, _)
+       when group in @remote_groups do
+    mentions = hell_thread_mentions(users)
+    insert_remote_activity(visibility, group, users, mentions <> " remote hell thread status")
+  end
+
+  defp insert_activity(:hell_thread, visibility, group, users, _opts) do
+    mentions = hell_thread_mentions(users)
+
+    insert_local_activity(visibility, group, users, mentions <> " hell thread status")
+  end
+
+  defp insert_activity(:attachment, visibility, group, users, _opts) do
+    actor = get_actor(group, users)
 
     obj_data = %{
       "actor" => actor.ap_id,
@@ -268,67 +306,54 @@ defp insert_activity("attachment", visibility, group, user, friends, non_friends
       })
   end
 
-  defp insert_activity("tag", visibility, group, user, friends, non_friends, _opts) do
-    {:ok, _activity} =
-      group
-      |> get_actor(user, friends, non_friends)
-      |> CommonAPI.post(%{status: "Status with #tag", visibility: visibility})
+  defp insert_activity(:tag, visibility, group, users, _opts) do
+    insert_local_activity(visibility, group, users, "Status with #tag")
   end
 
-  defp insert_activity("like", visibility, group, user, friends, non_friends, opts) do
-    actor = get_actor(group, user, friends, non_friends)
+  defp insert_activity(:like, visibility, group, users, opts) do
+    actor = get_actor(group, users)
 
     with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(),
          {:ok, _activity} <- CommonAPI.favorite(actor, activity_id) do
       :ok
     else
       {:error, _} ->
-        insert_activity("like", visibility, group, user, friends, non_friends, opts)
+        insert_activity(:like, visibility, group, users, opts)
 
       nil ->
         Process.sleep(15)
-        insert_activity("like", visibility, group, user, friends, non_friends, opts)
+        insert_activity(:like, visibility, group, users, opts)
     end
   end
 
-  defp insert_activity("reblog", visibility, group, user, friends, non_friends, opts) do
-    actor = get_actor(group, user, friends, non_friends)
+  defp insert_activity(:reblog, visibility, group, users, opts) do
+    actor = get_actor(group, users)
 
     with activity_id when not is_nil(activity_id) <- get_random_create_activity_id(),
-         {:ok, _activity, _object} <- CommonAPI.repeat(activity_id, actor) do
+         {:ok, _activity} <- CommonAPI.repeat(activity_id, actor) do
       :ok
     else
       {:error, _} ->
-        insert_activity("reblog", visibility, group, user, friends, non_friends, opts)
+        insert_activity(:reblog, visibility, group, users, opts)
 
       nil ->
         Process.sleep(15)
-        insert_activity("reblog", visibility, group, user, friends, non_friends, opts)
+        insert_activity(:reblog, visibility, group, users, opts)
     end
   end
 
-  defp insert_activity("simple_thread", visibility, group, user, friends, non_friends, _opts)
-       when visibility in ["public", "unlisted", "private"] do
-    actor = get_actor(group, user, friends, non_friends)
-    tasks = get_reply_tasks(visibility, group)
-
-    {:ok, activity} = CommonAPI.post(user, %{status: "Simple status", visibility: visibility})
-
-    acc = {activity.id, ["@" <> actor.nickname, "reply to status"]}
-    insert_replies(tasks, visibility, user, friends, non_friends, acc)
-  end
-
-  defp insert_activity("simple_thread", "direct", group, user, friends, non_friends, _opts) do
-    actor = get_actor(group, user, friends, non_friends)
+  defp insert_activity(:simple_thread, "direct", group, users, _opts) do
+    actor = get_actor(group, users)
     tasks = get_reply_tasks("direct", group)
 
     list =
       case group do
-        "non_friends" ->
-          Enum.take(non_friends, 3)
+        :user ->
+          group = Enum.random(@friends_groups)
+          Enum.take(users[group], 3)
 
         _ ->
-          Enum.take(friends, 3)
+          Enum.take(users[group], 3)
       end
 
     data = Enum.map(list, &("@" <> &1.nickname))
@@ -339,40 +364,30 @@ defp insert_activity("simple_thread", "direct", group, user, friends, non_friend
         visibility: "direct"
       })
 
-    acc = {activity.id, ["@" <> user.nickname | data] ++ ["reply to status"]}
-    insert_direct_replies(tasks, user, list, acc)
+    acc = {activity.id, ["@" <> users[:user].nickname | data] ++ ["reply to status"]}
+    insert_direct_replies(tasks, users[:user], list, acc)
   end
 
-  defp insert_activity("remote", _, "user", _, _, _, _), do: :ok
+  defp insert_activity(:simple_thread, visibility, group, users, _opts) do
+    actor = get_actor(group, users)
+    tasks = get_reply_tasks(visibility, group)
 
-  defp insert_activity("remote", visibility, group, user, _friends, _non_friends, opts) do
-    remote_friends =
-      Users.get_users(user, limit: opts[:friends_used], local: :external, friends?: true)
+    {:ok, activity} =
+      CommonAPI.post(users[:user], %{status: "Simple status", visibility: visibility})
 
-    remote_non_friends =
-      Users.get_users(user, limit: opts[:non_friends_used], local: :external, friends?: false)
-
-    actor = get_actor(group, user, remote_friends, remote_non_friends)
-
-    {act_data, obj_data} = prepare_activity_data(actor, visibility, user)
-    {activity_data, object_data} = other_data(actor)
-
-    activity_data
-    |> Map.merge(act_data)
-    |> Map.put("object", Map.merge(object_data, obj_data))
-    |> Pleroma.Web.ActivityPub.ActivityPub.insert(false)
+    acc = {activity.id, ["@" <> actor.nickname, "reply to status"]}
+    insert_replies(tasks, visibility, users, acc)
   end
 
-  defp get_actor("user", user, _friends, _non_friends), do: user
-  defp get_actor("friends", _user, friends, _non_friends), do: Enum.random(friends)
-  defp get_actor("non_friends", _user, _friends, non_friends), do: Enum.random(non_friends)
+  defp get_actor(:user, %{user: user}), do: user
+  defp get_actor(group, users), do: Enum.random(users[group])
 
-  defp other_data(actor) do
+  defp other_data(actor, content) do
     %{host: host} = URI.parse(actor.ap_id)
     datetime = DateTime.utc_now()
-    context_id = "http://#{host}:4000/contexts/#{UUID.generate()}"
-    activity_id = "http://#{host}:4000/activities/#{UUID.generate()}"
-    object_id = "http://#{host}:4000/objects/#{UUID.generate()}"
+    context_id = "https://#{host}/contexts/#{UUID.generate()}"
+    activity_id = "https://#{host}/activities/#{UUID.generate()}"
+    object_id = "https://#{host}/objects/#{UUID.generate()}"
 
     activity_data = %{
       "actor" => actor.ap_id,
@@ -389,7 +404,7 @@ defp other_data(actor) do
       "attributedTo" => actor.ap_id,
       "bcc" => [],
       "bto" => [],
-      "content" => "Remote post",
+      "content" => content,
       "context" => context_id,
       "conversation" => context_id,
       "emoji" => %{},
@@ -475,51 +490,65 @@ defp prepare_activity_data(_actor, "direct", mention) do
     {act_data, obj_data}
   end
 
-  defp get_reply_tasks("public", "user"), do: ~w(friend non_friend user)
-  defp get_reply_tasks("public", "friends"), do: ~w(non_friend user friend)
-  defp get_reply_tasks("public", "non_friends"), do: ~w(user friend non_friend)
+  defp get_reply_tasks("public", :user) do
+    [:friends_local, :friends_remote, :non_friends_local, :non_friends_remote, :user]
+  end
 
-  defp get_reply_tasks(visibility, "user") when visibility in ["unlisted", "private"],
-    do: ~w(friend user friend)
+  defp get_reply_tasks("public", group) when group in @friends_groups do
+    [:non_friends_local, :non_friends_remote, :user, :friends_local, :friends_remote]
+  end
 
-  defp get_reply_tasks(visibility, "friends") when visibility in ["unlisted", "private"],
-    do: ~w(user friend user)
+  defp get_reply_tasks("public", group) when group in @non_friends_groups do
+    [:user, :friends_local, :friends_remote, :non_friends_local, :non_friends_remote]
+  end
 
-  defp get_reply_tasks(visibility, "non_friends") when visibility in ["unlisted", "private"],
-    do: []
+  defp get_reply_tasks(visibility, :user) when visibility in ["unlisted", "private"] do
+    [:friends_local, :friends_remote, :user, :friends_local, :friends_remote]
+  end
 
-  defp get_reply_tasks("direct", "user"), do: ~w(friend user friend)
-  defp get_reply_tasks("direct", "friends"), do: ~w(user friend user)
-  defp get_reply_tasks("direct", "non_friends"), do: ~w(user non_friend user)
+  defp get_reply_tasks(visibility, group)
+       when visibility in ["unlisted", "private"] and group in @friends_groups do
+    [:user, :friends_remote, :friends_local, :user]
+  end
 
-  defp insert_replies(tasks, visibility, user, friends, non_friends, acc) do
+  defp get_reply_tasks(visibility, group)
+       when visibility in ["unlisted", "private"] and
+              group in @non_friends_groups,
+       do: []
+
+  defp get_reply_tasks("direct", :user), do: [:friends_local, :user, :friends_remote]
+
+  defp get_reply_tasks("direct", group) when group in @friends_groups,
+    do: [:user, group, :user]
+
+  defp get_reply_tasks("direct", group) when group in @non_friends_groups do
+    [:user, :non_friends_remote, :user, :non_friends_local]
+  end
+
+  defp insert_replies(tasks, visibility, users, acc) do
     Enum.reduce(tasks, acc, fn
-      "friend", {id, data} ->
-        friend = Enum.random(friends)
-        insert_reply(friend, data, id, visibility)
+      :user, {id, data} ->
+        insert_reply(users[:user], data, id, visibility)
 
-      "non_friend", {id, data} ->
-        non_friend = Enum.random(non_friends)
-        insert_reply(non_friend, data, id, visibility)
-
-      "user", {id, data} ->
-        insert_reply(user, data, id, visibility)
+      group, {id, data} ->
+        replier = Enum.random(users[group])
+        insert_reply(replier, data, id, visibility)
     end)
   end
 
   defp insert_direct_replies(tasks, user, list, acc) do
     Enum.reduce(tasks, acc, fn
-      group, {id, data} when group in ["friend", "non_friend"] ->
+      :user, {id, data} ->
+        {reply_id, _} = insert_reply(user, List.delete(data, "@" <> user.nickname), id, "direct")
+        {reply_id, data}
+
+      _, {id, data} ->
         actor = Enum.random(list)
 
         {reply_id, _} =
           insert_reply(actor, List.delete(data, "@" <> actor.nickname), id, "direct")
 
         {reply_id, data}
-
-      "user", {id, data} ->
-        {reply_id, _} = insert_reply(user, List.delete(data, "@" <> user.nickname), id, "direct")
-        {reply_id, data}
     end)
   end
 
diff --git a/benchmarks/load_testing/fetcher.ex b/benchmarks/load_testing/fetcher.ex
index 0de4924bc..b278faf9f 100644
--- a/benchmarks/load_testing/fetcher.ex
+++ b/benchmarks/load_testing/fetcher.ex
@@ -36,6 +36,7 @@ defp fetch_timelines(user) do
     fetch_home_timeline(user)
     fetch_direct_timeline(user)
     fetch_public_timeline(user)
+    fetch_public_timeline(user, :with_blocks)
     fetch_public_timeline(user, :local)
     fetch_public_timeline(user, :tag)
     fetch_notifications(user)
@@ -227,6 +228,76 @@ defp fetch_public_timeline(user, :only_media) do
     fetch_public_timeline(opts, "public timeline only media")
   end
 
+  # TODO: remove using `:method` after benchmarks
+  defp fetch_public_timeline(user, :with_blocks) do
+    opts = opts_for_public_timeline(user)
+
+    remote_non_friends = Agent.get(:non_friends_remote, & &1)
+
+    Benchee.run(
+      %{
+        "public timeline without blocks" => fn opts ->
+          ActivityPub.fetch_public_activities(opts)
+        end
+      },
+      inputs: %{
+        "old filtering" => Map.delete(opts, :method),
+        "with psql fun" => Map.put(opts, :method, :fun),
+        "with unnest" => Map.put(opts, :method, :unnest)
+      }
+    )
+
+    Enum.each(remote_non_friends, fn non_friend ->
+      {:ok, _} = User.block(user, non_friend)
+    end)
+
+    user = User.get_by_id(user.id)
+
+    opts = Map.put(opts, "blocking_user", user)
+
+    Benchee.run(
+      %{
+        "public timeline with user block" => fn opts ->
+          ActivityPub.fetch_public_activities(opts)
+        end
+      },
+      inputs: %{
+        "old filtering" => Map.delete(opts, :method),
+        "with psql fun" => Map.put(opts, :method, :fun),
+        "with unnest" => Map.put(opts, :method, :unnest)
+      }
+    )
+
+    domains =
+      Enum.reduce(remote_non_friends, [], fn non_friend, domains ->
+        {:ok, _user} = User.unblock(user, non_friend)
+        %{host: host} = URI.parse(non_friend.ap_id)
+        [host | domains]
+      end)
+
+    domains = Enum.uniq(domains)
+
+    Enum.each(domains, fn domain ->
+      {:ok, _} = User.block_domain(user, domain)
+    end)
+
+    user = User.get_by_id(user.id)
+    opts = Map.put(opts, "blocking_user", user)
+
+    Benchee.run(
+      %{
+        "public timeline with domain block" => fn opts ->
+          ActivityPub.fetch_public_activities(opts)
+        end
+      },
+      inputs: %{
+        "old filtering" => Map.delete(opts, :method),
+        "with psql fun" => Map.put(opts, :method, :fun),
+        "with unnest" => Map.put(opts, :method, :unnest)
+      }
+    )
+  end
+
   defp fetch_public_timeline(opts, title) when is_binary(title) do
     first_page_last = ActivityPub.fetch_public_activities(opts) |> List.last()
 
diff --git a/benchmarks/load_testing/users.ex b/benchmarks/load_testing/users.ex
index e4d0b22ff..6cf3958c1 100644
--- a/benchmarks/load_testing/users.ex
+++ b/benchmarks/load_testing/users.ex
@@ -27,7 +27,7 @@ def generate(opts \\ []) do
 
     make_friends(main_user, opts[:friends])
 
-    Repo.get(User, main_user.id)
+    User.get_by_id(main_user.id)
   end
 
   def generate_users(max) do
@@ -166,4 +166,24 @@ defp run_stream(users, main_user) do
     )
     |> Stream.run()
   end
+
+  @spec prepare_users(User.t(), keyword()) :: map()
+  def prepare_users(user, opts) do
+    friends_limit = opts[:friends_used]
+    non_friends_limit = opts[:non_friends_used]
+
+    %{
+      user: user,
+      friends_local: fetch_users(user, friends_limit, :local, true),
+      friends_remote: fetch_users(user, friends_limit, :external, true),
+      non_friends_local: fetch_users(user, non_friends_limit, :local, false),
+      non_friends_remote: fetch_users(user, non_friends_limit, :external, false)
+    }
+  end
+
+  defp fetch_users(user, limit, local, friends?) do
+    user
+    |> get_users(limit: limit, local: local, friends?: friends?)
+    |> Enum.shuffle()
+  end
 end
diff --git a/benchmarks/mix/tasks/pleroma/benchmarks/tags.ex b/benchmarks/mix/tasks/pleroma/benchmarks/tags.ex
index 657403202..1162b2e06 100644
--- a/benchmarks/mix/tasks/pleroma/benchmarks/tags.ex
+++ b/benchmarks/mix/tasks/pleroma/benchmarks/tags.ex
@@ -5,7 +5,6 @@ defmodule Mix.Tasks.Pleroma.Benchmarks.Tags do
   import Ecto.Query
 
   alias Pleroma.Repo
-  alias Pleroma.Web.MastodonAPI.TimelineController
 
   def run(_args) do
     Mix.Pleroma.start_pleroma()
@@ -37,7 +36,7 @@ def run(_args) do
     Benchee.run(
       %{
         "Hashtag fetching, any" => fn tags ->
-          TimelineController.hashtag_fetching(
+          hashtag_fetching(
             %{
               "any" => tags
             },
@@ -47,7 +46,7 @@ def run(_args) do
         end,
         # Will always return zero results because no overlapping hashtags are generated.
         "Hashtag fetching, all" => fn tags ->
-          TimelineController.hashtag_fetching(
+          hashtag_fetching(
             %{
               "all" => tags
             },
@@ -67,7 +66,7 @@ def run(_args) do
     Benchee.run(
       %{
         "Hashtag fetching" => fn tag ->
-          TimelineController.hashtag_fetching(
+          hashtag_fetching(
             %{
               "tag" => tag
             },
@@ -80,4 +79,35 @@ def run(_args) do
       time: 5
     )
   end
+
+  defp hashtag_fetching(params, user, local_only) do
+    tags =
+      [params["tag"], params["any"]]
+      |> List.flatten()
+      |> Enum.uniq()
+      |> Enum.filter(& &1)
+      |> Enum.map(&String.downcase(&1))
+
+    tag_all =
+      params
+      |> Map.get("all", [])
+      |> Enum.map(&String.downcase(&1))
+
+    tag_reject =
+      params
+      |> Map.get("none", [])
+      |> Enum.map(&String.downcase(&1))
+
+    _activities =
+      params
+      |> Map.put("type", "Create")
+      |> Map.put("local_only", local_only)
+      |> Map.put("blocking_user", user)
+      |> Map.put("muting_user", user)
+      |> Map.put("user", user)
+      |> Map.put("tag", tags)
+      |> Map.put("tag_all", tag_all)
+      |> Map.put("tag_reject", tag_reject)
+      |> Pleroma.Web.ActivityPub.ActivityPub.fetch_public_activities()
+  end
 end
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index b8a2873d8..e7958f7a8 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -932,6 +932,33 @@ defp restrict_blocked(query, %{"blocking_user" => %User{} = user} = opts) do
     query =
       if has_named_binding?(query, :object), do: query, else: Activity.with_joined_object(query)
 
+    # TODO: update after benchmarks
+    query =
+      case opts[:method] do
+        :fun ->
+          from(a in query,
+            where:
+              fragment(
+                "recipients_contain_blocked_domains(?, ?) = false",
+                a.recipients,
+                ^domain_blocks
+              )
+          )
+
+        :unnest ->
+          from(a in query,
+            where:
+              fragment(
+                "NOT ? && (SELECT ARRAY(SELECT split_part(UNNEST(?), '/', 3)))",
+                ^domain_blocks,
+                a.recipients
+              )
+          )
+
+        _ ->
+          query
+      end
+
     from(
       [activity, object: o] in query,
       where: fragment("not (? = ANY(?))", activity.actor, ^blocked_ap_ids),
diff --git a/lib/pleroma/web/api_spec/operations/timeline_operation.ex b/lib/pleroma/web/api_spec/operations/timeline_operation.ex
index 8e19bace7..375b441a1 100644
--- a/lib/pleroma/web/api_spec/operations/timeline_operation.ex
+++ b/lib/pleroma/web/api_spec/operations/timeline_operation.ex
@@ -62,6 +62,13 @@ def public_operation do
         only_media_param(),
         with_muted_param(),
         exclude_visibilities_param(),
+        # TODO: remove after benchmarks
+        Operation.parameter(
+          :method,
+          :query,
+          %Schema{type: :string},
+          "Temp parameter"
+        ),
         reply_visibility_param() | pagination_params()
       ],
       operationId: "TimelineController.public",
diff --git a/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex b/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex
index 958567510..1734df4b5 100644
--- a/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex
+++ b/lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex
@@ -109,14 +109,23 @@ def public(%{assigns: %{user: user}} = conn, params) do
     if restrict? and is_nil(user) do
       render_error(conn, :unauthorized, "authorization required for timeline view")
     else
-      activities =
+      # TODO: return back after benchmarks
+      params =
         params
         |> Map.put("type", ["Create", "Announce"])
         |> Map.put("local_only", local_only)
         |> Map.put("blocking_user", user)
         |> Map.put("muting_user", user)
         |> Map.put("reply_filtering_user", user)
-        |> ActivityPub.fetch_public_activities()
+
+      params =
+        if params["method"] do
+          Map.put(params, :method, String.to_existing_atom(params["method"]))
+        else
+          params
+        end
+
+      activities = ActivityPub.fetch_public_activities(params)
 
       conn
       |> add_link_headers(activities, %{"local" => local_only})
diff --git a/priv/repo/migrations/20200520155351_add_recipients_contain_blocked_domains_function.exs b/priv/repo/migrations/20200520155351_add_recipients_contain_blocked_domains_function.exs
new file mode 100644
index 000000000..14e873125
--- /dev/null
+++ b/priv/repo/migrations/20200520155351_add_recipients_contain_blocked_domains_function.exs
@@ -0,0 +1,33 @@
+defmodule Pleroma.Repo.Migrations.AddRecipientsContainBlockedDomainsFunction do
+  use Ecto.Migration
+  @disable_ddl_transaction true
+
+  def up do
+    statement = """
+    CREATE OR REPLACE FUNCTION recipients_contain_blocked_domains(recipients varchar[], blocked_domains varchar[]) RETURNS boolean AS $$
+    DECLARE
+      recipient_domain varchar;
+      recipient varchar;
+    BEGIN
+      FOREACH recipient IN ARRAY recipients LOOP
+        recipient_domain = split_part(recipient, '/', 3)::varchar;
+
+        IF recipient_domain = ANY(blocked_domains) THEN
+          RETURN TRUE;
+        END IF;
+      END LOOP;
+
+      RETURN FALSE;
+    END;
+    $$ LANGUAGE plpgsql;
+    """
+
+    execute(statement)
+  end
+
+  def down do
+    execute(
+      "drop function if exists recipients_contain_blocked_domains(recipients varchar[], blocked_domains varchar[])"
+    )
+  end
+end
diff --git a/test/web/mastodon_api/controllers/timeline_controller_test.exs b/test/web/mastodon_api/controllers/timeline_controller_test.exs
index 2375ac8e8..3474c0cf9 100644
--- a/test/web/mastodon_api/controllers/timeline_controller_test.exs
+++ b/test/web/mastodon_api/controllers/timeline_controller_test.exs
@@ -90,6 +90,74 @@ test "the public timeline includes only public statuses for an authenticated use
       res_conn = get(conn, "/api/v1/timelines/public")
       assert length(json_response_and_validate_schema(res_conn, 200)) == 1
     end
+
+    test "doesn't return replies if follower is posting with blocked user" do
+      %{conn: conn, user: blocker} = oauth_access(["read:statuses"])
+      [blockee, friend] = insert_list(2, :user)
+      {:ok, blocker} = User.follow(blocker, friend)
+      {:ok, _} = User.block(blocker, blockee)
+
+      conn = assign(conn, :user, blocker)
+
+      {:ok, %{id: activity_id} = activity} = CommonAPI.post(friend, %{status: "hey!"})
+
+      {:ok, reply_from_blockee} =
+        CommonAPI.post(blockee, %{status: "heya", in_reply_to_status_id: activity})
+
+      {:ok, _reply_from_friend} =
+        CommonAPI.post(friend, %{status: "status", in_reply_to_status_id: reply_from_blockee})
+
+      res_conn = get(conn, "/api/v1/timelines/public")
+      [%{"id" => ^activity_id}] = json_response_and_validate_schema(res_conn, 200)
+    end
+
+    # TODO: update after benchmarks
+    test "doesn't return replies if follow is posting with users from blocked domain" do
+      %{conn: conn, user: blocker} = oauth_access(["read:statuses"])
+      friend = insert(:user)
+      blockee = insert(:user, ap_id: "https://example.com/users/blocked")
+      {:ok, blocker} = User.follow(blocker, friend)
+      {:ok, blocker} = User.block_domain(blocker, "example.com")
+
+      conn = assign(conn, :user, blocker)
+
+      {:ok, %{id: activity_id} = activity} = CommonAPI.post(friend, %{status: "hey!"})
+
+      {:ok, reply_from_blockee} =
+        CommonAPI.post(blockee, %{status: "heya", in_reply_to_status_id: activity})
+
+      {:ok, _reply_from_friend} =
+        CommonAPI.post(friend, %{status: "status", in_reply_to_status_id: reply_from_blockee})
+
+      res_conn = get(conn, "/api/v1/timelines/public?method=fun")
+
+      activities = json_response_and_validate_schema(res_conn, 200)
+      [%{"id" => ^activity_id}] = activities
+    end
+
+    # TODO: update after benchmarks
+    test "doesn't return replies if follow is posting with users from blocked domain with unnest param" do
+      %{conn: conn, user: blocker} = oauth_access(["read:statuses"])
+      friend = insert(:user)
+      blockee = insert(:user, ap_id: "https://example.com/users/blocked")
+      {:ok, blocker} = User.follow(blocker, friend)
+      {:ok, blocker} = User.block_domain(blocker, "example.com")
+
+      conn = assign(conn, :user, blocker)
+
+      {:ok, %{id: activity_id} = activity} = CommonAPI.post(friend, %{status: "hey!"})
+
+      {:ok, reply_from_blockee} =
+        CommonAPI.post(blockee, %{status: "heya", in_reply_to_status_id: activity})
+
+      {:ok, _reply_from_friend} =
+        CommonAPI.post(friend, %{status: "status", in_reply_to_status_id: reply_from_blockee})
+
+      res_conn = get(conn, "/api/v1/timelines/public?method=unnest")
+
+      activities = json_response_and_validate_schema(res_conn, 200)
+      [%{"id" => ^activity_id}] = activities
+    end
   end
 
   defp local_and_remote_activities do