reprocess follows on receipt

This commit is contained in:
FloatingGhost 2022-08-05 04:33:05 +01:00
parent 5fe2c61029
commit d54ef82e8e
3 changed files with 42 additions and 17 deletions

View file

@ -107,18 +107,22 @@ def persist(%{"type" => type} = object, meta) when type in @object_types do
@impl true
def persist(object, meta) do
with local <- Keyword.fetch!(meta, :local),
{recipients, _, _} <- get_recipients(object),
{:ok, activity} <-
Repo.insert(%Activity{
data: object,
local: local,
recipients: recipients,
actor: object["actor"]
}),
# TODO: add tests for expired activities, when Note type will be supported in new pipeline
{:ok, _} <- maybe_create_activity_expiration(activity) do
{:ok, activity, meta}
if meta[:activity] do
{:ok, meta[:activity], meta}
else
with local <- Keyword.fetch!(meta, :local),
{recipients, _, _} <- get_recipients(object),
{:ok, activity} <-
Repo.insert(%Activity{
data: object,
local: local,
recipients: recipients,
actor: object["actor"]
}),
# TODO: add tests for expired activities, when Note type will be supported in new pipeline
{:ok, _} <- maybe_create_activity_expiration(activity) do
{:ok, activity, meta}
end
end
end

View file

@ -509,12 +509,13 @@ def handle_incoming(%{"type" => type} = data, _options)
def handle_incoming(
%{"type" => type} = data,
_options
options
)
when type in ~w{Update Block Follow Accept Reject} do
with {:ok, %User{}} <- ObjectValidator.fetch_actor(data),
{:ok, activity, _} <-
Pipeline.common_pipeline(data, local: false) do
Pipeline.common_pipeline(data, Keyword.put(options, :local, false)) do
{:ok, activity}
end
end

View file

@ -86,17 +86,17 @@ def perform(:incoming_ap_doc, params) do
# NOTE: we use the actor ID to do the containment, this is fine because an
# actor shouldn't be acting on objects outside their own AP server.
with {_, {:ok, _user}} <- {:actor, ap_enabled_actor(actor)},
nil <- Activity.normalize(params["id"]),
{true, activity} <- should_process?(Activity.normalize(params["id"])),
{_, :ok} <-
{:correct_origin?, Containment.contain_origin_from_id(actor, params)},
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
{:ok, activity} <- Transmogrifier.handle_incoming(params, activity: activity) do
{:ok, activity}
else
{:correct_origin?, _} ->
Logger.debug("Origin containment failure for #{params["id"]}")
{:error, :origin_containment_failed}
%Activity{} ->
{false, nil} ->
Logger.debug("Already had #{params["id"]}")
{:error, :already_present}
@ -111,6 +111,7 @@ def perform(:incoming_ap_doc, params) do
e ->
# Just drop those for now
IO.inspect(e)
Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end)
{:error, e}
end
@ -125,4 +126,23 @@ def ap_enabled_actor(id) do
ActivityPub.make_user_from_ap_id(id)
end
end
defp should_process?(%Activity{data: %{"type" => "Follow"}} = activity) do
# Misskey can resend follows when it shouldn't
# A follow/unfollow/follow in misskey land will die because it
# reuses the same ID
{true, activity}
end
defp should_process?(
%Activity{data: %{"type" => "Undo", "object" => %{"type" => "Follow"}}} = activity
) do
{true, activity}
end
defp should_process?(%Activity{} = e) do
{false, nil}
end
defp should_process?(_), do: {true, nil}
end