Add a worker to fetch new replies to known messages. #481

Closed
Ghost wants to merge 2 commits from (deleted):develop into develop
4 changed files with 86 additions and 1 deletions

View file

@ -568,7 +568,8 @@
mute_expire: 5,
search_indexing: 10,
nodeinfo_fetcher: 1,
database_prune: 1
database_prune: 1,
reply_refresher: 50,
],
plugins: [
Oban.Plugins.Pruner,

View file

@ -53,6 +53,12 @@ defp objects_from_collection(%{"type" => type, "first" => %{"id" => id}})
fetch_page_items(id)
end
defp objects_from_collection(%{"type" => type, "first" => %{"items" => items} = first})
when type in ["Collection", "OrderedCollection"] and is_list(items) do
maybe_next_page(first, items)
end
defp objects_from_collection(_page), do: []
defp fetch_page_items(id, items \\ []) do

View file

@ -22,6 +22,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
alias Pleroma.Web.Push
alias Pleroma.Web.Streamer
alias Pleroma.Workers.PollWorker
alias Pleroma.Workers.ReplyRefresherWorker
require Pleroma.Constants
require Logger
@ -230,6 +231,7 @@ def handle(%{data: %{"type" => "Create"}} = activity, meta) do
end)
Pleroma.Search.add_to_index(Map.put(activity, :object, object))
ReplyRefresherWorker.schedule(object.data["id"])
meta =
meta

View file

@ -0,0 +1,76 @@
defmodule Pleroma.Workers.ReplyRefresherWorker do
@moduledoc """
The worker to pull new replies to discovered posts.
"""
use Pleroma.Workers.WorkerHelper, queue: "reply_refresher"
alias Pleroma.Object
alias Pleroma.Object.Fetcher
alias Pleroma.Workers.ReplyRefresherWorker
@backoff_time {60, 600, 3600, 6 * 3600, 24 * 3600, 3 * 24 * 3600}
@impl Oban.Worker
def perform(%Job{args: %{"object_id" => object_id, "run_count" => run_count}}) do
case process_object(object_id) do
{:ok, true} -> reschedule(object_id, 0)
{:ok, false} -> reschedule(object_id, run_count + 1)
e -> e
end
end
def schedule(id) do
%{object_id: id, run_count: 0}
|> ReplyRefresherWorker.new(schedule_in: elem(@backoff_time, 0))
|> Oban.insert()
end
defp reschedule(id, run_count) when run_count < tuple_size(@backoff_time) do
%{object_id: id, run_count: run_count}
|> ReplyRefresherWorker.new(schedule_in: elem(@backoff_time, run_count))
|> Oban.insert()
{:ok, nil}
end
defp reschedule(_, _) do
{:ok, nil}
end
defp reply_to_id(id) when is_binary(id) do
id
end
defp reply_to_id(%{"id" => id}) do
id
end
defp process_reply(reply) do
id = reply_to_id(reply)
with {_, nil} <- {:find, Object.get_by_ap_id(id)},
{_, {:ok, _}} <- {:fetch, Fetcher.fetch_object_from_id(id)} do
{:ok, true}
else
{:find, _obj} -> {:ok, false}
{:fetch, err} -> {:ok, err}
end
end
defp process_object(id) do
obj = Object.get_by_ap_id(id)
with {:ok, new_obj} <- Fetcher.refetch_object(obj),
{:ok, replies} <- fetch_reply_list(new_obj.data["replies"]) do
Enum.reduce_while(Enum.map(replies, &process_reply/1), {:ok, false}, fn x, acc ->
case x do
{:ok, false} -> {:cont, acc}
{:ok, true} -> {:cont, {:ok, true}}
e -> {:halt, e}
end
end)
end
end
defp fetch_reply_list(replies) when is_list(replies) do
{:ok, replies}
end
defp fetch_reply_list(%{"type" => type} = replies)
when type in ["Collection", "OrderedCollection", "CollectionPage", "OrderedCollectionPage"] do
Akkoma.Collections.Fetcher.fetch_collection(replies)
end
end