WIP: Add backfilling of posts #846

Draft
tudbut wants to merge 19 commits from tudbut/akkoma:develop into develop
8 changed files with 112 additions and 10 deletions

View file

@ -371,7 +371,8 @@
note_replies_output_limit: 5, note_replies_output_limit: 5,
sign_object_fetches: true, sign_object_fetches: true,
authorized_fetch_mode: false, authorized_fetch_mode: false,
max_collection_objects: 50 max_collection_objects: 50,
outbox_refetch_cooldown: 5 * 60
config :pleroma, :streamer, config :pleroma, :streamer,
workers: 3, workers: 3,

View file

@ -210,6 +210,9 @@ defp log_fetch_error(id, error) do
Logger.error("Object rejected while fetching #{id} #{inspect(error)}") Logger.error("Object rejected while fetching #{id} #{inspect(error)}")
end end
# If the payload already carries an "object" key it is a full activity;
# pass it through unchanged instead of wrapping it in a synthetic Create.
defp prepare_activity_params(%{"object" => _} = data), do: data
defp prepare_activity_params(data) do defp prepare_activity_params(data) do
%{ %{
"type" => "Create", "type" => "Create",

View file

@ -152,6 +152,8 @@ defmodule Pleroma.User do
field(:also_known_as, {:array, ObjectValidators.ObjectID}, default: []) field(:also_known_as, {:array, ObjectValidators.ObjectID}, default: [])
field(:inbox, :string) field(:inbox, :string)
field(:shared_inbox, :string) field(:shared_inbox, :string)
field(:outbox, :string, default: nil)
field(:last_outbox_fetch, :naive_datetime, default: nil)
field(:last_active_at, :naive_datetime) field(:last_active_at, :naive_datetime)
field(:disclose_client, :boolean, default: true) field(:disclose_client, :boolean, default: true)
field(:pinned_objects, :map, default: %{}) field(:pinned_objects, :map, default: %{})
@ -471,6 +473,7 @@ def remote_user_changeset(struct \\ %User{local: false}, params) do
:ap_id, :ap_id,
:inbox, :inbox,
:shared_inbox, :shared_inbox,
:outbox,
:nickname, :nickname,
:avatar, :avatar,
:ap_enabled, :ap_enabled,
@ -2769,4 +2772,12 @@ def accepts_direct_messages?(%User{accepts_direct_messages_from: :everybody}, _)
def accepts_direct_messages?(%User{accepts_direct_messages_from: :nobody}, _), def accepts_direct_messages?(%User{accepts_direct_messages_from: :nobody}, _),
do: false do: false
@doc """
Stamps `last_outbox_fetch` on `user` with the current UTC time and
refreshes the user cache, marking the outbox as freshly fetched.
"""
def outbox_refreshed(%User{} = user) do
  # Truncate to whole seconds to match the :naive_datetime column precision.
  now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)

  user
  |> cast(%{last_outbox_fetch: now}, [:last_outbox_fetch])
  |> update_and_set_cache()
end
end end

View file

@ -1587,7 +1587,8 @@ defp object_to_user_data(data, additional) do
actor_type = data["type"] || "Person" actor_type = data["type"] || "Person"
featured_address = data["featured"] featured_address = data["featured"]
{:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address) {:ok, pinned_objects} = fetch_and_prepare_collection_from_ap_id(featured_address)
outbox_address = data["outbox"]
# first, check that the owner is correct # first, check that the owner is correct
signing_key = signing_key =
@ -1644,6 +1645,7 @@ defp object_to_user_data(data, additional) do
also_known_as: also_known_as, also_known_as: also_known_as,
signing_key: signing_key, signing_key: signing_key,
inbox: data["inbox"], inbox: data["inbox"],
outbox: outbox_address,
shared_inbox: shared_inbox, shared_inbox: shared_inbox,
pinned_objects: pinned_objects, pinned_objects: pinned_objects,
nickname: nickname nickname: nickname
@ -1790,7 +1792,7 @@ def maybe_handle_clashing_nickname(data) do
end end
end end
def pin_data_from_featured_collection(%{ def activity_data_from_collection(%{
"type" => "OrderedCollection", "type" => "OrderedCollection",
"first" => first "first" => first
}) do }) do
@ -1805,7 +1807,7 @@ def pin_data_from_featured_collection(%{
end end
end end
def pin_data_from_featured_collection( def activity_data_from_collection(
%{ %{
"type" => type "type" => type
} = collection } = collection
@ -1821,21 +1823,23 @@ def pin_data_from_featured_collection(
end) end)
end end
def pin_data_from_featured_collection(obj) do def activity_data_from_collection(obj) do
Logger.error("Could not parse featured collection #{inspect(obj)}") Logger.error("Could not parse collection #{inspect(obj)}")
%{} %{}
end end
def fetch_and_prepare_featured_from_ap_id(nil) do def fetch_and_prepare_collection_from_ap_id(nil) do
{:ok, %{}} {:ok, %{}}
end end
def fetch_and_prepare_featured_from_ap_id(ap_id) do def fetch_and_prepare_collection_from_ap_id(ap_id) do
Logger.info("Fetching collection #{ap_id}")
with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
{:ok, pin_data_from_featured_collection(data)} {:ok, activity_data_from_collection(data)}
else else
e -> e ->
Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}") Logger.error("Could not decode collection at fetch #{ap_id}, #{inspect(e)}")
{:ok, %{}} {:ok, %{}}
end end
end end
@ -1854,6 +1858,43 @@ def enqueue_pin_fetches(%{pinned_objects: pins}) do
def enqueue_pin_fetches(_), do: nil def enqueue_pin_fetches(_), do: nil
# True when the outbox was never fetched, or when more seconds than the
# configured :outbox_refetch_cooldown have passed since the last fetch.
defp need_outbox_refresh?(last_fetch)

defp need_outbox_refresh?(nil), do: true

defp need_outbox_refresh?(%NaiveDateTime{} = last_fetch) do
  cooldown = Config.get!([:activitypub, :outbox_refetch_cooldown])
  age = NaiveDateTime.diff(NaiveDateTime.utc_now(), last_fetch)
  age > cooldown
end
@doc """
Schedules a background refetch of a remote user's outbox.

Local users are skipped, and refetches are throttled via the configured
`:outbox_refetch_cooldown`. When a remote user record has no outbox
address stored yet (e.g. fetched before the field existed), the user
object itself is refetched so the address gets populated.

Always returns `:ok`.
"""
def enqueue_outbox_fetches(
      %{outbox: outbox_address, last_outbox_fetch: last_fetch, local: false} = user
    )
    # Guard keeps nil/absent outbox addresses from being enqueued; those
    # fall through to the user-refresh clause below instead.
    when is_binary(outbox_address) do
  if need_outbox_refresh?(last_fetch) do
    Logger.debug("Refetching outbox #{outbox_address}")

    Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_outbox", %{
      "id" => outbox_address
    })

    User.outbox_refreshed(user)
  else
    age = NaiveDateTime.diff(NaiveDateTime.utc_now(), last_fetch)
    Logger.debug("Not refetching outbox (TTL not reached: #{last_fetch}, age #{age})")
  end

  :ok
end

def enqueue_outbox_fetches(%{local: true}), do: :ok

# No outbox address known yet: refetch the user object so the outbox
# field gets filled in for subsequent calls.
def enqueue_outbox_fetches(%{local: false} = user) do
  make_user_from_ap_id(user.ap_id)
  :ok
end
def make_user_from_ap_id(ap_id, additional \\ []) do def make_user_from_ap_id(ap_id, additional \\ []) do
user = User.get_cached_by_ap_id(ap_id) user = User.get_cached_by_ap_id(ap_id)

View file

@ -306,6 +306,8 @@ def show(%{assigns: %{user: for_user}} = conn, %{id: nickname_or_id} = params) d
def statuses(%{assigns: %{user: reading_user}} = conn, params) do def statuses(%{assigns: %{user: reading_user}} = conn, params) do
with %User{} = user <- User.get_cached_by_nickname_or_id(params.id, for: reading_user), with %User{} = user <- User.get_cached_by_nickname_or_id(params.id, for: reading_user),
:visible <- User.visible_for(user, reading_user) do :visible <- User.visible_for(user, reading_user) do
ActivityPub.enqueue_outbox_fetches(user)

imho this should be limited to, or there should at least be an option to limit this to, authenticated requests only. Processing new posts entails pulling in their whole threads, newly referenced users and their pinned (now generally latest) posts and their threads, and so on, which can become quite costly

Also, there is no need to enqueue fetches if the viewed profile is a local user

Furthermore, there prob should be a timeout (e.g. 60 min) so we don't constantly spam outbox fetches for popular remote users

imho this should be limited to, or there should at least be an option to limit this to, authenticated requests only. Processing new posts entails pulling in their whole threads, newly referenced users and their ~~pinned~~ *(now generally latest)* posts and their threads, and so on, which can become quite costly. Also, there is no need to enqueue fetches if the viewed profile is a local user. Furthermore, there prob should be a timeout *(e.g. 60 min)* so we don't constantly spam outbox fetches for popular remote users

we do always fetch the outbox, but activities in it are still cached. a timeout may make sense either way.

local users never get an outbox address property, its only added to the user object when its a remote one. so technically we're already not fetching local outboxes. making that more explicit makes sense tho

we do always fetch the outbox, but activities in it are still cached. a timeout may make sense either way. local users never get an outbox address property, its only added to the user object when its a remote one. so technically we're already not fetching local outboxes. making that more explicit makes sense tho

we do always fetch the outbox, but activities in it are still cached.

This is insufficient, since:

  • it still spams remote servers with many requests (due to the default uniqueness job constraint: 1 request per second and per remote user); this is not nice on remote servers and might also end up creating federation problems for us if we get timed out due to it
  • it creates unnecessary Oban job churn (yes, this can matter for perf and also increases PostgreSQL bloat from deleted and not-yet-vacuumed rows; more on that in a future PR once I get to finish what I have on disk)

And also doesn’t address the limitation to authenticated users only. As I mentioned this can be costly and I don’t want unauthenticated users browsing user profiles to ever be able to create such a significant workload for the instance for profiles actual local users might never even look at. (Letting unauthenticated viewers trigger this also exacerbates the request spam issue)

> we do always fetch the outbox, but activities in it are still cached. This is insufficient, since: - it still spams remote servers with many requests *(due to the default uniqueness job constraint: 1 request per second and per remote user)*; this is not nice on remote servers and might also end up creating federation problems for us if we get timed out due to it - it creates unnecessary Oban job churn *(yes, this can matter for perf and also increases PostgreSQL bloat from deleted and not-yet-vacuumed rows; more on that in a future PR once I get to finish what I have on disk)* And it also doesn’t address the limitation to authenticated users only. As I mentioned, this can be costly and I don’t want unauthenticated users browsing user profiles to ever be able to create such a significant workload for the instance for profiles actual local users might never even look at. *(Letting unauthenticated viewers trigger this also exacerbates the request spam issue)*

I've made it only fetch once per minute per user, and ignore local users. If one minute is too small, I propose making this a setting, though I'm not familiar with how to do that at this point. 10 mins should also be fine to hard-code, but an hour feels a little much.

I've made it only fetch once per minute per user, and ignore local users. If one minute is too small, I propose making this a setting, though I'm not familiar with how to do that at this point. 10 mins should also be fine to hard-code, but an hour feels a little much.

The timeout mechanism seems like it should work and uses mechanisms you’re familiar with.
Though this much additional DB traffic to frequently update the timestamp might (untested) negatively affect overall perf. Given the cooldown period is well within typical continuous uptime, I was thinking of just using a runtime-only hint in caches; see e.g. #762 for an example of a (new) cachex set being used for timeouts

To add the restriction for logged-in users, see e.g. lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex’s public/2 function

If this is too much, we can also adjust those bits later before merge (though it might take a while)

The timeout mechanism seems like it should work and uses mechanisms you’re familiar with. Though this much additional DB traffic to frequently update the timestamp might *(untested)* negatively affect overall perf. Given the cooldown period is well within typical continuous uptime, I was thinking of just using a runtime-only hint in caches; see e.g. #762 for an example of a (new) `cachex` set being used for timeouts. To add the restriction for logged-in users, see e.g. `lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex`’s `public/2` function. If this is too much, we can also adjust those bits later before merge *(though it might take a while)*
params = params =
params params
|> Map.delete(:tagged) |> Map.delete(:tagged)

View file

@ -4,12 +4,29 @@
defmodule Pleroma.Workers.RemoteFetcherWorker do defmodule Pleroma.Workers.RemoteFetcherWorker do
alias Pleroma.Object.Fetcher alias Pleroma.Object.Fetcher
alias Pleroma.Object
alias Pleroma.Web.ActivityPub.ActivityPub
use Pleroma.Workers.WorkerHelper, use Pleroma.Workers.WorkerHelper,
queue: "remote_fetcher", queue: "remote_fetcher",
unique: [period: 300, states: Oban.Job.states(), keys: [:op, :id]] unique: [period: 300, states: Oban.Job.states(), keys: [:op, :id]]
@impl Oban.Worker @impl Oban.Worker
# Fetches a remote outbox collection and pulls in any objects from it
# that are not already cached locally (depth-limited to avoid deep
# recursive thread fetches).
def perform(%Job{args: %{"op" => "fetch_outbox", "id" => address}}) do
  with {:ok, outbox} <- ActivityPub.fetch_and_prepare_collection_from_ap_id(address) do
    Enum.each(outbox, fn {ap_id, _} ->
      # Only fetch objects we don't already have.
      if is_nil(Object.get_cached_by_ap_id(ap_id)) do
        Fetcher.fetch_object_from_id(ap_id, depth: 1)
      end
    end)

    :ok
  else
    # Pass existing error tuples through instead of double-wrapping them.
    {:error, _} = e -> e
    e -> {:error, e}
  end
end
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
{:ok, _object} -> {:ok, _object} ->

View file

@ -0,0 +1,17 @@
defmodule Pleroma.Repo.Migrations.UsersAddOutboxes do
use Ecto.Migration
# Adds outbox-tracking columns to users:
#   outbox            — the remote actor's outbox collection URL
#   last_outbox_fetch — timestamp of the most recent outbox refetch
def up do
alter table(:users) do
add_if_not_exists(:outbox, :text)
add_if_not_exists(:last_outbox_fetch, :naive_datetime)
end
end
tudbut marked this conversation as resolved Outdated

This does not generally hold

This does not generally hold

im aware. but for migrating instances, no outboxes will be populated at first so this is kinda the only solution if you want previously fetched users to have the property

im aware. but for migrating instances, no outboxes will be populated at first so this is kinda the only solution if you want previously fetched users to have the property

any idea how to do this more accurately?

any idea how to do this more accurately?

don’t populate it and wait for the next user refresh to occur or Update activity to arrive filling outbox with the correct value

don’t populate it and wait for the next user refresh to occur or Update activity to arrive filling outbox with the correct value

(also this breaks for e.g. users named "inbox")

(also this breaks for e.g. users named "inbox")
# Rolls back the outbox-tracking columns added in up/0.
def down do
alter table(:users) do
remove_if_exists(:outbox, :text)
remove_if_exists(:last_outbox_fetch, :naive_datetime)
end
end
end

10
shell.nix Normal file
View file

@ -0,0 +1,10 @@
# Development shell with the native toolchain needed to build the project.
{ pkgs ? import <nixpkgs> {} }:
pkgs.mkShell {
nativeBuildInputs = with pkgs.buildPackages; [
elixir
elixir-ls # Elixir language server for editor integration
cmake
file # libmagic
ffmpeg
];
}