forked from AkkomaGang/akkoma
Compare commits
1 commit
develop
...
reprocess-
Author | SHA1 | Date | |
---|---|---|---|
d54ef82e8e |
3 changed files with 42 additions and 17 deletions
|
@ -107,18 +107,22 @@ def persist(%{"type" => type} = object, meta) when type in @object_types do
|
|||
|
||||
@impl true
|
||||
def persist(object, meta) do
|
||||
with local <- Keyword.fetch!(meta, :local),
|
||||
{recipients, _, _} <- get_recipients(object),
|
||||
{:ok, activity} <-
|
||||
Repo.insert(%Activity{
|
||||
data: object,
|
||||
local: local,
|
||||
recipients: recipients,
|
||||
actor: object["actor"]
|
||||
}),
|
||||
# TODO: add tests for expired activities, when Note type will be supported in new pipeline
|
||||
{:ok, _} <- maybe_create_activity_expiration(activity) do
|
||||
{:ok, activity, meta}
|
||||
if meta[:activity] do
|
||||
{:ok, meta[:activity], meta}
|
||||
else
|
||||
with local <- Keyword.fetch!(meta, :local),
|
||||
{recipients, _, _} <- get_recipients(object),
|
||||
{:ok, activity} <-
|
||||
Repo.insert(%Activity{
|
||||
data: object,
|
||||
local: local,
|
||||
recipients: recipients,
|
||||
actor: object["actor"]
|
||||
}),
|
||||
# TODO: add tests for expired activities, when Note type will be supported in new pipeline
|
||||
{:ok, _} <- maybe_create_activity_expiration(activity) do
|
||||
{:ok, activity, meta}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -509,12 +509,13 @@ def handle_incoming(%{"type" => type} = data, _options)
|
|||
|
||||
def handle_incoming(
|
||||
%{"type" => type} = data,
|
||||
_options
|
||||
options
|
||||
)
|
||||
when type in ~w{Update Block Follow Accept Reject} do
|
||||
|
||||
with {:ok, %User{}} <- ObjectValidator.fetch_actor(data),
|
||||
{:ok, activity, _} <-
|
||||
Pipeline.common_pipeline(data, local: false) do
|
||||
Pipeline.common_pipeline(data, Keyword.put(options, :local, false)) do
|
||||
{:ok, activity}
|
||||
end
|
||||
end
|
||||
|
|
|
@ -86,17 +86,17 @@ def perform(:incoming_ap_doc, params) do
|
|||
# NOTE: we use the actor ID to do the containment, this is fine because an
|
||||
# actor shouldn't be acting on objects outside their own AP server.
|
||||
with {_, {:ok, _user}} <- {:actor, ap_enabled_actor(actor)},
|
||||
nil <- Activity.normalize(params["id"]),
|
||||
{true, activity} <- should_process?(Activity.normalize(params["id"])),
|
||||
{_, :ok} <-
|
||||
{:correct_origin?, Containment.contain_origin_from_id(actor, params)},
|
||||
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
|
||||
{:ok, activity} <- Transmogrifier.handle_incoming(params, activity: activity) do
|
||||
{:ok, activity}
|
||||
else
|
||||
{:correct_origin?, _} ->
|
||||
Logger.debug("Origin containment failure for #{params["id"]}")
|
||||
{:error, :origin_containment_failed}
|
||||
|
||||
%Activity{} ->
|
||||
{false, nil} ->
|
||||
Logger.debug("Already had #{params["id"]}")
|
||||
{:error, :already_present}
|
||||
|
||||
|
@ -111,6 +111,7 @@ def perform(:incoming_ap_doc, params) do
|
|||
|
||||
e ->
|
||||
# Just drop those for now
|
||||
IO.inspect(e)
|
||||
Logger.debug(fn -> "Unhandled activity\n" <> Jason.encode!(params, pretty: true) end)
|
||||
{:error, e}
|
||||
end
|
||||
|
@ -125,4 +126,23 @@ def ap_enabled_actor(id) do
|
|||
ActivityPub.make_user_from_ap_id(id)
|
||||
end
|
||||
end
|
||||
|
||||
defp should_process?(%Activity{data: %{"type" => "Follow"}} = activity) do
|
||||
# Misskey can resend follows when it shouldn't
|
||||
# A follow/unfollow/follow in misskey land will die because it
|
||||
# reuses the same ID
|
||||
{true, activity}
|
||||
end
|
||||
|
||||
defp should_process?(
|
||||
%Activity{data: %{"type" => "Undo", "object" => %{"type" => "Follow"}}} = activity
|
||||
) do
|
||||
{true, activity}
|
||||
end
|
||||
|
||||
defp should_process?(%Activity{} = e) do
|
||||
{false, nil}
|
||||
end
|
||||
|
||||
defp should_process?(_), do: {true, nil}
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue