Add a worker to fetch new replies to known messages. #481
4 changed files with 86 additions and 1 deletions
|
@ -568,7 +568,8 @@
|
|||
mute_expire: 5,
|
||||
search_indexing: 10,
|
||||
nodeinfo_fetcher: 1,
|
||||
database_prune: 1
|
||||
database_prune: 1,
|
||||
reply_refresher: 50,
|
||||
],
|
||||
plugins: [
|
||||
Oban.Plugins.Pruner,
|
||||
|
|
|
@ -53,6 +53,12 @@ defp objects_from_collection(%{"type" => type, "first" => %{"id" => id}})
|
|||
fetch_page_items(id)
|
||||
end
|
||||
|
||||
defp objects_from_collection(%{"type" => type, "first" => %{"items" => items} = first})
|
||||
when type in ["Collection", "OrderedCollection"] and is_list(items) do
|
||||
maybe_next_page(first, items)
|
||||
end
|
||||
|
||||
|
||||
defp objects_from_collection(_page), do: []
|
||||
|
||||
defp fetch_page_items(id, items \\ []) do
|
||||
|
|
|
@ -22,6 +22,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
|
|||
alias Pleroma.Web.Push
|
||||
alias Pleroma.Web.Streamer
|
||||
alias Pleroma.Workers.PollWorker
|
||||
alias Pleroma.Workers.ReplyRefresherWorker
|
||||
|
||||
require Pleroma.Constants
|
||||
require Logger
|
||||
|
@ -230,6 +231,7 @@ def handle(%{data: %{"type" => "Create"}} = activity, meta) do
|
|||
end)
|
||||
|
||||
Pleroma.Search.add_to_index(Map.put(activity, :object, object))
|
||||
ReplyRefresherWorker.schedule(object.data["id"])
|
||||
|
||||
meta =
|
||||
meta
|
||||
|
|
76
lib/pleroma/workers/reply_refresh_worker.ex
Normal file
76
lib/pleroma/workers/reply_refresh_worker.ex
Normal file
|
@ -0,0 +1,76 @@
|
|||
defmodule Pleroma.Workers.ReplyRefresherWorker do
|
||||
@moduledoc """
|
||||
The worker to pull new replies to discovered posts.
|
||||
"""
|
||||
|
||||
use Pleroma.Workers.WorkerHelper, queue: "reply_refresher"
|
||||
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.Object.Fetcher
|
||||
alias Pleroma.Workers.ReplyRefresherWorker
|
||||
|
||||
@backoff_time {60, 600, 3600, 6 * 3600, 24 * 3600, 3 * 24 * 3600}
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%Job{args: %{"object_id" => object_id, "run_count" => run_count}}) do
|
||||
case process_object(object_id) do
|
||||
{:ok, true} -> reschedule(object_id, 0)
|
||||
{:ok, false} -> reschedule(object_id, run_count + 1)
|
||||
e -> e
|
||||
end
|
||||
end
|
||||
|
||||
def schedule(id) do
|
||||
%{object_id: id, run_count: 0}
|
||||
|> ReplyRefresherWorker.new(schedule_in: elem(@backoff_time, 0))
|
||||
|> Oban.insert()
|
||||
end
|
||||
|
||||
defp reschedule(id, run_count) when run_count < tuple_size(@backoff_time) do
|
||||
%{object_id: id, run_count: run_count}
|
||||
|> ReplyRefresherWorker.new(schedule_in: elem(@backoff_time, run_count))
|
||||
|> Oban.insert()
|
||||
{:ok, nil}
|
||||
end
|
||||
|
||||
defp reschedule(_, _) do
|
||||
{:ok, nil}
|
||||
end
|
||||
|
||||
defp reply_to_id(id) when is_binary(id) do
|
||||
id
|
||||
end
|
||||
defp reply_to_id(%{"id" => id}) do
|
||||
id
|
||||
end
|
||||
defp process_reply(reply) do
|
||||
id = reply_to_id(reply)
|
||||
with {_, nil} <- {:find, Object.get_by_ap_id(id)},
|
||||
{_, {:ok, _}} <- {:fetch, Fetcher.fetch_object_from_id(id)} do
|
||||
{:ok, true}
|
||||
else
|
||||
{:find, _obj} -> {:ok, false}
|
||||
{:fetch, err} -> {:ok, err}
|
||||
end
|
||||
end
|
||||
defp process_object(id) do
|
||||
obj = Object.get_by_ap_id(id)
|
||||
with {:ok, new_obj} <- Fetcher.refetch_object(obj),
|
||||
{:ok, replies} <- fetch_reply_list(new_obj.data["replies"]) do
|
||||
Enum.reduce_while(Enum.map(replies, &process_reply/1), {:ok, false}, fn x, acc ->
|
||||
case x do
|
||||
{:ok, false} -> {:cont, acc}
|
||||
{:ok, true} -> {:cont, {:ok, true}}
|
||||
e -> {:halt, e}
|
||||
end
|
||||
end)
|
||||
end
|
||||
end
|
||||
defp fetch_reply_list(replies) when is_list(replies) do
|
||||
{:ok, replies}
|
||||
end
|
||||
defp fetch_reply_list(%{"type" => type} = replies)
|
||||
when type in ["Collection", "OrderedCollection", "CollectionPage", "OrderedCollectionPage"] do
|
||||
Akkoma.Collections.Fetcher.fetch_collection(replies)
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue