From 6d65e8ecb09a61b33a5ac665202cb85de3b3875c Mon Sep 17 00:00:00 2001 From: Kahanis <6c41n7bt3x16@opayq.com> Date: Tue, 21 Feb 2023 16:02:39 +0100 Subject: [PATCH] Adds a worker to pull new replies to discovered messages. --- config/config.exs | 3 +- lib/pleroma/web/activity_pub/side_effects.ex | 2 + lib/pleroma/workers/reply_refresh_worker.ex | 76 ++++++++++++++++++++ 3 files changed, 80 insertions(+), 1 deletion(-) create mode 100644 lib/pleroma/workers/reply_refresh_worker.ex diff --git a/config/config.exs b/config/config.exs index 5eaa8ce76..596bc59a7 100644 --- a/config/config.exs +++ b/config/config.exs @@ -568,7 +568,8 @@ mute_expire: 5, search_indexing: 10, nodeinfo_fetcher: 1, - database_prune: 1 + database_prune: 1, + reply_refresher: 50, ], plugins: [ Oban.Plugins.Pruner, diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index 34617a218..37c43b605 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -22,6 +22,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do alias Pleroma.Web.Push alias Pleroma.Web.Streamer alias Pleroma.Workers.PollWorker + alias Pleroma.Workers.ReplyRefresherWorker require Pleroma.Constants require Logger @@ -230,6 +231,7 @@ def handle(%{data: %{"type" => "Create"}} = activity, meta) do end) Pleroma.Search.add_to_index(Map.put(activity, :object, object)) + ReplyRefresherWorker.schedule(object.data["id"]) meta = meta diff --git a/lib/pleroma/workers/reply_refresh_worker.ex b/lib/pleroma/workers/reply_refresh_worker.ex new file mode 100644 index 000000000..5bf4bc942 --- /dev/null +++ b/lib/pleroma/workers/reply_refresh_worker.ex @@ -0,0 +1,76 @@ +defmodule Pleroma.Workers.ReplyRefresherWorker do + @moduledoc """ + The worker to pull new replies to discovered posts. + """ + + use Pleroma.Workers.WorkerHelper, queue: "reply_refresher" + + alias Pleroma.Object + alias Pleroma.Object.Fetcher + alias Pleroma.Workers.ReplyRefresherWorker + + @backoff_time {60, 600, 3600, 6 * 3600, 24 * 3600, 3 * 24 * 3600} + + @impl Oban.Worker + def perform(%Job{args: %{"object_id" => object_id, "run_count" => run_count}}) do + case process_object(object_id) do + {:ok, true} -> reschedule(object_id, 0) + {:ok, false} -> reschedule(object_id, run_count + 1) + e -> e + end + end + + def schedule(id) do + %{object_id: id, run_count: 0} + |> ReplyRefresherWorker.new(schedule_in: elem(@backoff_time, 0)) + |> Oban.insert() + end + + defp reschedule(id, run_count) when run_count < tuple_size(@backoff_time) do + %{object_id: id, run_count: run_count} + |> ReplyRefresherWorker.new(schedule_in: elem(@backoff_time, run_count)) + |> Oban.insert() + {:ok, nil} + end + + defp reschedule(_, _) do + {:ok, nil} + end + + defp reply_to_id(id) when is_binary(id) do + id + end + defp reply_to_id(%{"id" => id}) do + id + end + defp process_reply(reply) do + id = reply_to_id(reply) + with {_, nil} <- {:find, Object.get_by_ap_id(id)}, + {_, {:ok, _}} <- {:fetch, Fetcher.fetch_object_from_id(id)} do + {:ok, true} + else + {:find, _obj} -> {:ok, false} + {:fetch, err} -> {:ok, err} + end + end + defp process_object(id) do + obj = Object.get_by_ap_id(id) + with {:ok, new_obj} <- Fetcher.refetch_object(obj), + {:ok, replies} <- fetch_reply_list(new_obj.data["replies"]) do + Enum.reduce_while(Enum.map(replies, &process_reply/1), {:ok, false}, fn x, acc -> + case x do + {:ok, false} -> {:cont, acc} + {:ok, true} -> {:cont, {:ok, true}} + e -> {:halt, e} + end + end) + end + end + defp fetch_reply_list(replies) when is_list(replies) do + {:ok, replies} + end + defp fetch_reply_list(%{"type" => type} = replies) + when type in ["Collection", "OrderedCollection", "CollectionPage", "OrderedCollectionPage"] do + Akkoma.Collections.Fetcher.fetch_collection(replies) + end +end