forked from AkkomaGang/akkoma
Compare commits
1 commit
develop
...
reprocess-
Author | SHA1 | Date | |
---|---|---|---|
d54ef82e8e |
3 changed files with 42 additions and 17 deletions
|
@ -107,18 +107,22 @@ def persist(%{"type" => type} = object, meta) when type in @object_types do
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def persist(object, meta) do
|
def persist(object, meta) do
|
||||||
with local <- Keyword.fetch!(meta, :local),
|
if meta[:activity] do
|
||||||
{recipients, _, _} <- get_recipients(object),
|
{:ok, meta[:activity], meta}
|
||||||
{:ok, activity} <-
|
else
|
||||||
Repo.insert(%Activity{
|
with local <- Keyword.fetch!(meta, :local),
|
||||||
data: object,
|
{recipients, _, _} <- get_recipients(object),
|
||||||
local: local,
|
{:ok, activity} <-
|
||||||
recipients: recipients,
|
Repo.insert(%Activity{
|
||||||
actor: object["actor"]
|
data: object,
|
||||||
}),
|
local: local,
|
||||||
# TODO: add tests for expired activities, when Note type will be supported in new pipeline
|
recipients: recipients,
|
||||||
{:ok, _} <- maybe_create_activity_expiration(activity) do
|
actor: object["actor"]
|
||||||
{:ok, activity, meta}
|
}),
|
||||||
|
# TODO: add tests for expired activities, when Note type will be supported in new pipeline
|
||||||
|
{:ok, _} <- maybe_create_activity_expiration(activity) do
|
||||||
|
{:ok, activity, meta}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -509,12 +509,13 @@ def handle_incoming(%{"type" => type} = data, _options)
|
||||||
|
|
||||||
def handle_incoming(
|
def handle_incoming(
|
||||||
%{"type" => type} = data,
|
%{"type" => type} = data,
|
||||||
_options
|
options
|
||||||
)
|
)
|
||||||
when type in ~w{Update Block Follow Accept Reject} do
|
when type in ~w{Update Block Follow Accept Reject} do
|
||||||
|
|
||||||
with {:ok, %User{}} <- ObjectValidator.fetch_actor(data),
|
with {:ok, %User{}} <- ObjectValidator.fetch_actor(data),
|
||||||
{:ok, activity, _} <-
|
{:ok, activity, _} <-
|
||||||
Pipeline.common_pipeline(data, local: false) do
|
Pipeline.common_pipeline(data, Keyword.put(options, :local, false)) do
|
||||||
{:ok, activity}
|
{:ok, activity}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -86,17 +86,17 @@ def perform(:incoming_ap_doc, params) do
|
||||||
# NOTE: we use the actor ID to do the containment, this is fine because an
|
# NOTE: we use the actor ID to do the containment, this is fine because an
|
||||||
# actor shouldn't be acting on objects outside their own AP server.
|
# actor shouldn't be acting on objects outside their own AP server.
|
||||||
with {_, {:ok, _user}} <- {:actor, ap_enabled_actor(actor)},
|
with {_, {:ok, _user}} <- {:actor, ap_enabled_actor(actor)},
|
||||||
nil <- Activity.normalize(params["id"]),
|
{true, activity} <- should_process?(Activity.normalize(params["id"])),
|
||||||
{_, :ok} <-
|
{_, :ok} <-
|
||||||
{:correct_origin?, Containment.contain_origin_from_id(actor, params)},
|
{:correct_origin?, Containment.contain_origin_from_id(actor, params)},
|
||||||
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
|
{:ok, activity} <- Transmogrifier.handle_incoming(params, activity: activity) do
|
||||||
{:ok, activity}
|
{:ok, activity}
|
||||||
else
|
else
|
||||||
{:correct_origin?, _} ->
|
{:correct_origin?, _} ->
|
||||||
Logger.debug("Origin containment failure for #{params["id"]}")
|
Logger.debug("Origin containment failure for #{params["id"]}")
|
||||||
{:error, :origin_containment_failed}
|
{:error, :origin_containment_failed}
|
||||||
|
|
||||||
%Activity{} ->
|
{false, nil} ->
|
||||||
Logger.debug("Already had #{params["id"]}")
|
Logger.debug("Already had #{params["id"]}")
|
||||||
{:error, :already_present}
|
{:error, :already_present}
|
||||||
|
|
||||||
|
@ -111,6 +111,7 @@ def perform(:incoming_ap_doc, params) do
|
||||||
|
|
||||||
e ->
|
e ->
|
||||||
# Just drop those for now
|
# Just drop those for now
|
||||||
|
IO.inspect(e)
|
||||||
Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end)
|
Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end)
|
||||||
{:error, e}
|
{:error, e}
|
||||||
end
|
end
|
||||||
|
@ -125,4 +126,23 @@ def ap_enabled_actor(id) do
|
||||||
ActivityPub.make_user_from_ap_id(id)
|
ActivityPub.make_user_from_ap_id(id)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp should_process?(%Activity{data: %{"type" => "Follow"}} = activity) do
|
||||||
|
# Misskey can resend follows when it shouldn't
|
||||||
|
# A follow/unfollow/follow in misskey land will die because it
|
||||||
|
# reuses the same ID
|
||||||
|
{true, activity}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp should_process?(
|
||||||
|
%Activity{data: %{"type" => "Undo", "object" => %{"type" => "Follow"}}} = activity
|
||||||
|
) do
|
||||||
|
{true, activity}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp should_process?(%Activity{} = e) do
|
||||||
|
{false, nil}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp should_process?(_), do: {true, nil}
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue