[#1505] Restricted max thread distance for fetching replies on incoming federation (in addition to reply-to depth restriction).

This commit is contained in:
Ivan Tashkinov 2020-02-15 20:41:38 +03:00
parent b95dd5e217
commit 269d592181
8 changed files with 120 additions and 31 deletions

View file

@ -659,7 +659,7 @@
label: "Fed. incoming replies max depth", label: "Fed. incoming replies max depth",
type: :integer, type: :integer,
description: description:
"Max. depth of reply-to activities fetching on incoming federation, to prevent out-of-memory situations while" <> "Max. depth of reply-to and reply activities fetching on incoming federation, to prevent out-of-memory situations while" <>
" fetching very long threads. If set to `nil`, threads of any depth will be fetched. Lower this value if you experience out-of-memory crashes.", " fetching very long threads. If set to `nil`, threads of any depth will be fetched. Lower this value if you experience out-of-memory crashes.",
suggestions: [ suggestions: [
100 100

View file

@ -41,7 +41,7 @@ On the top right you will also see a wrench icon. This opens your personal setti
This is where the interesting stuff happens! This is where the interesting stuff happens!
Depending on the timeline you will see different statuses, but each status has a standard structure: Depending on the timeline you will see different statuses, but each status has a standard structure:
- Profile pic, name and link to profile. An optional left-arrow if it's a reply to another status (hovering will reveal the replied-to status). Clicking on the profile pic will uncollapse the user's profile. - Profile pic, name and link to profile. An optional left-arrow if it's a reply to another status (hovering will reveal the reply-to status). Clicking on the profile pic will uncollapse the user's profile.
- A `+` button on the right allows you to Expand/Collapse an entire discussion thread. It also updates in realtime! - A `+` button on the right allows you to Expand/Collapse an entire discussion thread. It also updates in realtime!
- An arrow icon allows you to open the status on the instance where it's originating from. - An arrow icon allows you to open the status on the instance where it's originating from.
- The text of the status, including mentions and attachements. If you click on a mention, it will automatically open the profile page of that person. - The text of the status, including mentions and attachements. If you click on a mention, it will automatically open the profile page of that person.

View file

@ -10,6 +10,7 @@ defmodule Pleroma.Object.Fetcher do
alias Pleroma.Signature alias Pleroma.Signature
alias Pleroma.Web.ActivityPub.InternalFetchActor alias Pleroma.Web.ActivityPub.InternalFetchActor
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.Federator
require Logger require Logger
require Pleroma.Constants require Pleroma.Constants
@ -59,20 +60,23 @@ def refetch_object(%Object{data: %{"id" => id}} = object) do
end end
end end
# TODO: # Note: will create a Create activity, which we need internally at the moment.
# This will create a Create activity, which we need internally at the moment.
def fetch_object_from_id(id, options \\ []) do def fetch_object_from_id(id, options \\ []) do
with {:fetch_object, nil} <- {:fetch_object, Object.get_cached_by_ap_id(id)}, with {_, nil} <- {:fetch_object, Object.get_cached_by_ap_id(id)},
{:fetch, {:ok, data}} <- {:fetch, fetch_and_contain_remote_object_from_id(id)}, {_, true} <- {:allowed_depth, Federator.allowed_thread_distance?(options[:depth])},
{:normalize, nil} <- {:normalize, Object.normalize(data, false)}, {_, {:ok, data}} <- {:fetch, fetch_and_contain_remote_object_from_id(id)},
{_, nil} <- {:normalize, Object.normalize(data, false)},
params <- prepare_activity_params(data), params <- prepare_activity_params(data),
{:containment, :ok} <- {:containment, Containment.contain_origin(id, params)}, {_, :ok} <- {:containment, Containment.contain_origin(id, params)},
{:transmogrifier, {:ok, activity}} <- {_, {:ok, activity}} <-
{:transmogrifier, Transmogrifier.handle_incoming(params, options)}, {:transmogrifier, Transmogrifier.handle_incoming(params, options)},
{:object, _data, %Object{} = object} <- {_, _data, %Object{} = object} <-
{:object, data, Object.normalize(activity, false)} do {:object, data, Object.normalize(activity, false)} do
{:ok, object} {:ok, object}
else else
{:allowed_depth, false} ->
{:error, "Max thread distance exceeded."}
{:containment, _} -> {:containment, _} ->
{:error, "Object containment failed."} {:error, "Object containment failed."}

View file

@ -156,8 +156,9 @@ def fix_in_reply_to(%{"inReplyTo" => in_reply_to} = object, options)
when not is_nil(in_reply_to) do when not is_nil(in_reply_to) do
in_reply_to_id = prepare_in_reply_to(in_reply_to) in_reply_to_id = prepare_in_reply_to(in_reply_to)
object = Map.put(object, "inReplyToAtomUri", in_reply_to_id) object = Map.put(object, "inReplyToAtomUri", in_reply_to_id)
depth = (options[:depth] || 0) + 1
if Federator.allowed_incoming_reply_depth?(options[:depth]) do if Federator.allowed_thread_distance?(depth) do
with {:ok, replied_object} <- get_obj_helper(in_reply_to_id, options), with {:ok, replied_object} <- get_obj_helper(in_reply_to_id, options),
%Activity{} = _ <- Activity.get_create_by_object_ap_id(replied_object.data["id"]) do %Activity{} = _ <- Activity.get_create_by_object_ap_id(replied_object.data["id"]) do
object object
@ -312,7 +313,7 @@ def fix_type(object, options \\ [])
def fix_type(%{"inReplyTo" => reply_id, "name" => _} = object, options) def fix_type(%{"inReplyTo" => reply_id, "name" => _} = object, options)
when is_binary(reply_id) do when is_binary(reply_id) do
with true <- Federator.allowed_incoming_reply_depth?(options[:depth]), with true <- Federator.allowed_thread_distance?(options[:depth]),
{:ok, %{data: %{"type" => "Question"} = _} = _} <- get_obj_helper(reply_id, options) do {:ok, %{data: %{"type" => "Question"} = _} = _} <- get_obj_helper(reply_id, options) do
Map.put(object, "type", "Answer") Map.put(object, "type", "Answer")
else else
@ -406,7 +407,6 @@ def handle_incoming(
with nil <- Activity.get_create_by_object_ap_id(object["id"]), with nil <- Activity.get_create_by_object_ap_id(object["id"]),
{:ok, %User{} = user} <- User.get_or_fetch_by_ap_id(data["actor"]) do {:ok, %User{} = user} <- User.get_or_fetch_by_ap_id(data["actor"]) do
options = Keyword.put(options, :depth, (options[:depth] || 0) + 1)
object = fix_object(object, options) object = fix_object(object, options)
params = %{ params = %{
@ -425,8 +425,15 @@ def handle_incoming(
} }
with {:ok, created_activity} <- ActivityPub.create(params) do with {:ok, created_activity} <- ActivityPub.create(params) do
reply_depth = (options[:depth] || 0) + 1
if Federator.allowed_thread_distance?(reply_depth) do
for reply_id <- replies(object) do for reply_id <- replies(object) do
Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{"id" => reply_id}) Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
"id" => reply_id,
"depth" => reply_depth
})
end
end end
{:ok, created_activity} {:ok, created_activity}
@ -448,7 +455,8 @@ def handle_incoming(
|> fix_addressing |> fix_addressing
with {:ok, %User{} = user} <- User.get_or_fetch_by_ap_id(data["actor"]) do with {:ok, %User{} = user} <- User.get_or_fetch_by_ap_id(data["actor"]) do
options = Keyword.put(options, :depth, (options[:depth] || 0) + 1) reply_depth = (options[:depth] || 0) + 1
options = Keyword.put(options, :depth, reply_depth)
object = fix_object(object, options) object = fix_object(object, options)
params = %{ params = %{

View file

@ -15,13 +15,19 @@ defmodule Pleroma.Web.Federator do
require Logger require Logger
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" @doc """
Returns `true` if the distance to target object does not exceed max configured value.
Serves to prevent fetching of very long threads, especially useful on smaller instances.
Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161).
Applies to fetching of both ancestor (reply-to) and child (reply) objects.
"""
# credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength # credo:disable-for-previous-line Credo.Check.Readability.MaxLineLength
def allowed_incoming_reply_depth?(depth) do def allowed_thread_distance?(distance) do
max_replies_depth = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth]) max_distance = Pleroma.Config.get([:instance, :federation_incoming_replies_max_depth])
if max_replies_depth do if max_distance && max_distance >= 0 do
(depth || 1) <= max_replies_depth # Default depth is 0 (an object has zero distance from itself in its thread)
(distance || 0) <= max_distance
else else
true true
end end

View file

@ -12,9 +12,9 @@ def perform(
%{ %{
"op" => "fetch_remote", "op" => "fetch_remote",
"id" => id "id" => id
}, } = args,
_job _job
) do ) do
{:ok, _object} = Fetcher.fetch_object_from_id(id) {:ok, _object} = Fetcher.fetch_object_from_id(id, depth: args["depth"])
end end
end end

View file

@ -26,6 +26,31 @@ defmodule Pleroma.Object.FetcherTest do
:ok :ok
end end
describe "max thread distance restriction" do
@ap_id "http://mastodon.example.org/@admin/99541947525187367"
clear_config([:instance, :federation_incoming_replies_max_depth])
test "it returns thread depth exceeded error if thread depth is exceeded" do
Pleroma.Config.put([:instance, :federation_incoming_replies_max_depth], 0)
assert {:error, "Max thread distance exceeded."} =
Fetcher.fetch_object_from_id(@ap_id, depth: 1)
end
test "it fetches object if max thread depth is restricted to 0 and depth is not specified" do
Pleroma.Config.put([:instance, :federation_incoming_replies_max_depth], 0)
assert {:ok, _} = Fetcher.fetch_object_from_id(@ap_id)
end
test "it fetches object if requested depth does not exceed max thread depth" do
Pleroma.Config.put([:instance, :federation_incoming_replies_max_depth], 10)
assert {:ok, _} = Fetcher.fetch_object_from_id(@ap_id, depth: 10)
end
end
describe "actor origin containment" do describe "actor origin containment" do
test "it rejects objects with a bogus origin" do test "it rejects objects with a bogus origin" do
{:error, _} = Fetcher.fetch_object_from_id("https://info.pleroma.site/activity.json") {:error, _} = Fetcher.fetch_object_from_id("https://info.pleroma.site/activity.json")

View file

@ -42,7 +42,7 @@ test "it ignores an incoming notice if we already have it" do
end end
@tag capture_log: true @tag capture_log: true
test "it fetches replied-to activities if we don't have them" do test "it fetches reply-to activities if we don't have them" do
data = data =
File.read!("test/fixtures/mastodon-post-activity.json") File.read!("test/fixtures/mastodon-post-activity.json")
|> Poison.decode!() |> Poison.decode!()
@ -63,7 +63,7 @@ test "it fetches replied-to activities if we don't have them" do
assert returned_object.data["inReplyToAtomUri"] == "https://shitposter.club/notice/2827873" assert returned_object.data["inReplyToAtomUri"] == "https://shitposter.club/notice/2827873"
end end
test "it does not fetch replied-to activities beyond max_replies_depth" do test "it does not fetch reply-to activities beyond max replies depth limit" do
data = data =
File.read!("test/fixtures/mastodon-post-activity.json") File.read!("test/fixtures/mastodon-post-activity.json")
|> Poison.decode!() |> Poison.decode!()
@ -75,7 +75,7 @@ test "it does not fetch replied-to activities beyond max_replies_depth" do
data = Map.put(data, "object", object) data = Map.put(data, "object", object)
with_mock Pleroma.Web.Federator, with_mock Pleroma.Web.Federator,
allowed_incoming_reply_depth?: fn _ -> false end do allowed_thread_distance?: fn _ -> false end do
{:ok, returned_activity} = Transmogrifier.handle_incoming(data) {:ok, returned_activity} = Transmogrifier.handle_incoming(data)
returned_object = Object.normalize(returned_activity, false) returned_object = Object.normalize(returned_activity, false)
@ -1350,12 +1350,14 @@ test "it accepts Move activities" do
end end
end end
describe "handle_incoming/2: `replies` handling:" do describe "`handle_incoming/2`, Mastodon format `replies` handling" do
clear_config([:activitypub, :note_replies_output_limit]) do clear_config([:activitypub, :note_replies_output_limit]) do
Pleroma.Config.put([:activitypub, :note_replies_output_limit], 5) Pleroma.Config.put([:activitypub, :note_replies_output_limit], 5)
end end
test "with Mastodon-formatted `replies` collection, it schedules background fetching of items" do clear_config([:instance, :federation_incoming_replies_max_depth])
setup do
data = data =
"test/fixtures/mastodon-post-activity.json" "test/fixtures/mastodon-post-activity.json"
|> File.read!() |> File.read!()
@ -1364,15 +1366,41 @@ test "with Mastodon-formatted `replies` collection, it schedules background fetc
items = get_in(data, ["object", "replies", "first", "items"]) items = get_in(data, ["object", "replies", "first", "items"])
assert length(items) > 0 assert length(items) > 0
%{data: data, items: items}
end
test "schedules background fetching of `replies` items if max thread depth limit allows", %{
data: data,
items: items
} do
Pleroma.Config.put([:instance, :federation_incoming_replies_max_depth], 10)
{:ok, _activity} = Transmogrifier.handle_incoming(data) {:ok, _activity} = Transmogrifier.handle_incoming(data)
for id <- items do for id <- items do
job_args = %{"op" => "fetch_remote", "id" => id} job_args = %{"op" => "fetch_remote", "id" => id, "depth" => 1}
assert_enqueued(worker: Pleroma.Workers.RemoteFetcherWorker, args: job_args) assert_enqueued(worker: Pleroma.Workers.RemoteFetcherWorker, args: job_args)
end end
end end
test "with Pleroma-formatted `replies` collection, it schedules background fetching of items" do test "does NOT schedule background fetching of `replies` beyond max thread depth limit allows",
%{data: data} do
Pleroma.Config.put([:instance, :federation_incoming_replies_max_depth], 0)
{:ok, _activity} = Transmogrifier.handle_incoming(data)
assert all_enqueued(worker: Pleroma.Workers.RemoteFetcherWorker) == []
end
end
describe "`handle_incoming/2`, Pleroma format `replies` handling" do
clear_config([:activitypub, :note_replies_output_limit]) do
Pleroma.Config.put([:activitypub, :note_replies_output_limit], 5)
end
clear_config([:instance, :federation_incoming_replies_max_depth])
setup do
user = insert(:user) user = insert(:user)
{:ok, activity} = CommonAPI.post(user, %{"status" => "post1"}) {:ok, activity} = CommonAPI.post(user, %{"status" => "post1"})
@ -1390,13 +1418,31 @@ test "with Pleroma-formatted `replies` collection, it schedules background fetch
Repo.delete(activity.object) Repo.delete(activity.object)
Repo.delete(activity) Repo.delete(activity)
%{federation_output: federation_output, replies_uris: replies_uris}
end
test "schedules background fetching of `replies` items if max thread depth limit allows", %{
federation_output: federation_output,
replies_uris: replies_uris
} do
Pleroma.Config.put([:instance, :federation_incoming_replies_max_depth], 1)
{:ok, _activity} = Transmogrifier.handle_incoming(federation_output) {:ok, _activity} = Transmogrifier.handle_incoming(federation_output)
for id <- replies_uris do for id <- replies_uris do
job_args = %{"op" => "fetch_remote", "id" => id} job_args = %{"op" => "fetch_remote", "id" => id, "depth" => 1}
assert_enqueued(worker: Pleroma.Workers.RemoteFetcherWorker, args: job_args) assert_enqueued(worker: Pleroma.Workers.RemoteFetcherWorker, args: job_args)
end end
end end
test "does NOT schedule background fetching of `replies` beyond max thread depth limit allows",
%{federation_output: federation_output} do
Pleroma.Config.put([:instance, :federation_incoming_replies_max_depth], 0)
{:ok, _activity} = Transmogrifier.handle_incoming(federation_output)
assert all_enqueued(worker: Pleroma.Workers.RemoteFetcherWorker) == []
end
end end
describe "prepare outgoing" do describe "prepare outgoing" do