WIP: Add backfilling of posts #846
8 changed files with 112 additions and 10 deletions
@@ -371,7 +371,8 @@
   note_replies_output_limit: 5,
   sign_object_fetches: true,
   authorized_fetch_mode: false,
-  max_collection_objects: 50
+  max_collection_objects: 50,
+  outbox_refetch_cooldown: 5 * 60
 
 config :pleroma, :streamer,
   workers: 3,
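For context: the new cooldown is expressed in seconds (5 * 60, i.e. five minutes) and is read back through Pleroma's config helper in the activity_pub.ex hunk further down. A quick check, assuming the default above:

    # Reads the key added above; 5 * 60 evaluates to 300 seconds.
    Pleroma.Config.get!([:activitypub, :outbox_refetch_cooldown])
    #=> 300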
@@ -210,6 +210,9 @@ defp log_fetch_error(id, error) do
     Logger.error("Object rejected while fetching #{id} #{inspect(error)}")
   end
 
+  # If it's already an activity, don't wrap it again.
+  defp prepare_activity_params(%{"object" => _} = data), do: data
+
   defp prepare_activity_params(data) do
     %{
       "type" => "Create",
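The new clause makes the wrapping idempotent: anything already carrying a top-level "object" key is treated as a ready-made activity and passed through, while bare objects still fall through to the Create wrapper. An illustration only (prepare_activity_params/1 is private to the fetcher module, and the map below is hypothetical):

    data = %{"type" => "Create", "object" => %{"type" => "Note"}}
    ^data = prepare_activity_params(data)  # returned unchanged, no double wrap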
@@ -152,6 +152,8 @@ defmodule Pleroma.User do
     field(:also_known_as, {:array, ObjectValidators.ObjectID}, default: [])
     field(:inbox, :string)
     field(:shared_inbox, :string)
+    field(:outbox, :string, default: nil)
+    field(:last_outbox_fetch, :naive_datetime, default: nil)
     field(:last_active_at, :naive_datetime)
     field(:disclose_client, :boolean, default: true)
     field(:pinned_objects, :map, default: %{})
@@ -471,6 +473,7 @@ def remote_user_changeset(struct \\ %User{local: false}, params) do
       :ap_id,
       :inbox,
       :shared_inbox,
+      :outbox,
       :nickname,
       :avatar,
       :ap_enabled,
@@ -2769,4 +2772,12 @@ def accepts_direct_messages?(%User{accepts_direct_messages_from: :everybody}, _)
 
   def accepts_direct_messages?(%User{accepts_direct_messages_from: :nobody}, _),
     do: false
+
+  def outbox_refreshed(%User{} = user) do
+    # now = NaiveDateTime.truncate(NaiveDateTime.utc_now(), :second)
+
+    user
+    |> cast(%{last_outbox_fetch: NaiveDateTime.utc_now()}, [:last_outbox_fetch])
+    |> update_and_set_cache()
+  end
 end
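The commented-out truncate points at a real question: Ecto's :naive_datetime columns are second-precision while NaiveDateTime.utc_now/0 carries microseconds. If I recall Ecto's casting behavior correctly, cast/3 discards sub-second precision for this type anyway, which would make the explicit truncate redundant; a quick check of that assumption:

    # Assumption: casting to :naive_datetime drops microseconds.
    {:ok, ndt} = Ecto.Type.cast(:naive_datetime, ~N[2024-10-27 04:00:00.123456])
    ndt.microsecond
    #=> {0, 0}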
@@ -1587,7 +1587,8 @@ defp object_to_user_data(data, additional) do
   actor_type = data["type"] || "Person"
 
   featured_address = data["featured"]
-  {:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
+  {:ok, pinned_objects} = fetch_and_prepare_collection_from_ap_id(featured_address)
+  outbox_address = data["outbox"]
 
   # first, check that the owner is correct
   signing_key =
@@ -1644,6 +1645,7 @@ defp object_to_user_data(data, additional) do
     also_known_as: also_known_as,
     signing_key: signing_key,
     inbox: data["inbox"],
+    outbox: outbox_address,
     shared_inbox: shared_inbox,
     pinned_objects: pinned_objects,
     nickname: nickname
@@ -1790,7 +1792,7 @@ def maybe_handle_clashing_nickname(data) do
     end
   end
 
-  def pin_data_from_featured_collection(%{
+  def activity_data_from_collection(%{
         "type" => "OrderedCollection",
         "first" => first
       }) do
@@ -1805,7 +1807,7 @@ def pin_data_from_featured_collection(%{
     end
   end
 
-  def pin_data_from_featured_collection(
+  def activity_data_from_collection(
         %{
           "type" => type
         } = collection
@@ -1821,21 +1823,23 @@ def pin_data_from_featured_collection(
     end)
   end
 
-  def pin_data_from_featured_collection(obj) do
-    Logger.error("Could not parse featured collection #{inspect(obj)}")
+  def activity_data_from_collection(obj) do
+    Logger.error("Could not parse collection #{inspect(obj)}")
     %{}
   end
 
-  def fetch_and_prepare_featured_from_ap_id(nil) do
+  def fetch_and_prepare_collection_from_ap_id(nil) do
     {:ok, %{}}
   end
 
-  def fetch_and_prepare_featured_from_ap_id(ap_id) do
+  def fetch_and_prepare_collection_from_ap_id(ap_id) do
+    Logger.info("Fetching collection #{ap_id}")
+
     with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
-      {:ok, pin_data_from_featured_collection(data)}
+      {:ok, activity_data_from_collection(data)}
     else
       e ->
-        Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
+        Logger.error("Could not decode collection at fetch #{ap_id}, #{inspect(e)}")
         {:ok, %{}}
     end
   end
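After the rename, the same helper serves both the featured-collection (pins) path and the new outbox path. The worker hunk below iterates the result with fn {ap_id, _} ->, so the prepared collection is a map keyed by object AP id; roughly (shape inferred from this diff, values not shown here, and the URL is hypothetical):

    # Inferred usage; only the keys (AP ids) are consumed by the outbox worker.
    {:ok, items} =
      fetch_and_prepare_collection_from_ap_id("https://example.com/users/alice/outbox")

    Enum.each(items, fn {ap_id, _} -> IO.inspect(ap_id) end)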
@@ -1854,6 +1858,43 @@ def enqueue_pin_fetches(%{pinned_objects: pins}) do
 
   def enqueue_pin_fetches(_), do: nil
 
+  defp need_outbox_refresh?(last_fetch)
+
+  defp need_outbox_refresh?(nil), do: true
+
+  defp need_outbox_refresh?(%NaiveDateTime{} = last_fetch) do
+    NaiveDateTime.diff(NaiveDateTime.utc_now(), last_fetch) >
+      Config.get!([:activitypub, :outbox_refetch_cooldown])
+  end
+
+  def enqueue_outbox_fetches(
+        %{outbox: outbox_address, last_outbox_fetch: last_fetch, local: false} = user
+      ) do
+    if need_outbox_refresh?(last_fetch) do
+      # Enqueue a background task to fetch the outbox.
+      Logger.debug("Refetching outbox #{outbox_address}")
+
+      Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_outbox", %{
+        "id" => outbox_address
+      })
+
+      User.outbox_refreshed(user)
+    else
+      age = NaiveDateTime.diff(NaiveDateTime.utc_now(), last_fetch)
+      Logger.debug("Not refetching outbox (TTL not reached: #{last_fetch}, age #{age})")
+    end
+
+    :ok
+  end
+
+  def enqueue_outbox_fetches(%{local: true}), do: :ok
+
+  # Remote user without a known outbox yet: refresh the user record first.
+  def enqueue_outbox_fetches(%{local: false} = user) do
+    make_user_from_ap_id(user.ap_id)
+    :ok
+  end
+
   def make_user_from_ap_id(ap_id, additional \\ []) do
     user = User.get_cached_by_ap_id(ap_id)
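A quick iex sanity check of the cooldown arithmetic, assuming the 5 * 60 second default from the config hunk at the top:

    # NaiveDateTime.diff/2 defaults to seconds.
    last_fetch = NaiveDateTime.add(NaiveDateTime.utc_now(), -600)
    NaiveDateTime.diff(NaiveDateTime.utc_now(), last_fetch) > 5 * 60
    #=> true, a 600s-old fetch is past the 300s cooldown, so a refetch is due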
@@ -306,6 +306,8 @@ def show(%{assigns: %{user: for_user}} = conn, %{id: nickname_or_id} = params) do
   def statuses(%{assigns: %{user: reading_user}} = conn, params) do
     with %User{} = user <- User.get_cached_by_nickname_or_id(params.id, for: reading_user),
          :visible <- User.visible_for(user, reading_user) do
+      ActivityPub.enqueue_outbox_fetches(user)
+
       params =
         params
         |> Map.delete(:tagged)
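The review discussion at the bottom asks for this trigger to be restricted to authenticated viewers. A minimal sketch of such a gate, not part of this PR, assuming reading_user is nil for unauthenticated requests (the usual behavior of this assign):

    # Only logged-in users may trigger a (potentially expensive) outbox backfill.
    if not is_nil(reading_user) do
      ActivityPub.enqueue_outbox_fetches(user)
    end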
@@ -4,12 +4,29 @@
 
 defmodule Pleroma.Workers.RemoteFetcherWorker do
   alias Pleroma.Object.Fetcher
+  alias Pleroma.Object
+  alias Pleroma.Web.ActivityPub.ActivityPub
 
   use Pleroma.Workers.WorkerHelper,
     queue: "remote_fetcher",
     unique: [period: 300, states: Oban.Job.states(), keys: [:op, :id]]
 
   @impl Oban.Worker
+  def perform(%Job{args: %{"op" => "fetch_outbox", "id" => address}}) do
+    with {:ok, outbox} <- ActivityPub.fetch_and_prepare_collection_from_ap_id(address) do
+      Enum.each(outbox, fn {ap_id, _} ->
+        if is_nil(Object.get_cached_by_ap_id(ap_id)) do
+          Fetcher.fetch_object_from_id(ap_id, depth: 1)
+        end
+      end)
+
+      :ok
+    else
+      e -> {:error, e}
+    end
+  end
+
   def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
     case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
       {:ok, _object} ->
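Mirroring the call site in enqueue_outbox_fetches/1, a job is enqueued as below. Note the unique: [period: 300, keys: [:op, :id]] option above: repeat fetch_outbox jobs for the same id within five minutes are deduplicated by Oban (the URL here is hypothetical):

    Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_outbox", %{
      "id" => "https://example.com/users/alice/outbox"
    })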
priv/repo/migrations/20241027040000_users_add_outboxes.exs (new file, 17 lines)
@@ -0,0 +1,17 @@
+defmodule Pleroma.Repo.Migrations.UsersAddOutboxes do
+  use Ecto.Migration
+
+  def up do
+    alter table(:users) do
+      add_if_not_exists(:outbox, :text)
+      add_if_not_exists(:last_outbox_fetch, :naive_datetime)
+    end
+  end
tudbut marked this conversation as resolved (Outdated)
Oneric commented: This does not generally hold.
tudbut commented: I'm aware, but for migrating instances no outboxes will be populated at first, so this is kind of the only solution if you want previously fetched users to have the property.
tudbut commented: Any idea how to do this more accurately?
Oneric commented: Don't populate it, and wait for the next user refresh to occur or an Update activity to arrive, filling outbox with the correct value.
Oneric commented: (Also, this breaks for e.g. users named "inbox".)
+
+  def down do
+    alter table(:users) do
+      remove_if_exists(:outbox, :text)
+      remove_if_exists(:last_outbox_fetch, :naive_datetime)
+    end
+  end
+end
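Both directions use Ecto.Migration's add_if_not_exists/remove_if_exists helpers, so running up against a database where an earlier draft of this PR already created the columns should not fail.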
shell.nix (new file, 10 lines)
@@ -0,0 +1,10 @@
+{ pkgs ? import <nixpkgs> {} }:
+pkgs.mkShell {
+  nativeBuildInputs = with pkgs.buildPackages; [
+    elixir
+    elixir-ls
+    cmake
+    file # libmagic
+    ffmpeg
+  ];
+}
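A standard nix-shell development environment: elixir plus elixir-ls for language and editor tooling, cmake and file (libmagic) for native dependencies, and ffmpeg for media handling.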
imho this should be limited to, or there should at least be an option to limit this to, authenticated requests only. Processing new posts entails pulling in their whole threads, newly referenced users and their pinned (now generally: latest) posts and their threads, and so on; this can become quite costly.
Also, there's no need to enqueue fetches if the viewed profile is a local user.
Furthermore, there should probably be a timeout (e.g. 60 min) so we don't constantly spam outbox fetches for popular remote users.
we do always fetch the outbox, but activities in it are still cached; a timeout may make sense either way.
local users never get an outbox address property; it's only added to the user object when it's a remote one. so technically we're already not fetching local outboxes. making that more explicit makes sense though
This is insufficient, since:
And it also doesn't address the limitation to authenticated users only. As I mentioned, this can be costly, and I don't want unauthenticated users browsing user profiles to ever be able to create such a significant workload for the instance, for profiles actual local users might never even look at. (Letting unauthenticated viewers trigger this also exacerbates the request-spam issue.)
I've made it only fetch once per minute per user, and ignore local users. If one minute is too short, I propose making this a setting, though I'm not familiar with how to do that at this point. 10 minutes should also be fine to hard-code, but an hour feels a little much.
The timeout mechanism seems like it should work and uses mechanisms you're familiar with.
Though this much additional DB traffic to frequently update a timestamp might (untested) negatively affect overall perf. Given the cooldown period is well within typical continuous uptime, I was thinking of just using a runtime-only hint in caches; see e.g. #762 for an example of a (new) cachex set being used for timeouts.
To add the restriction to logged-in users, see e.g. lib/pleroma/web/mastodon_api/controllers/timeline_controller.ex's public/2 function.
If this is too much, we can also adjust those bits later before merge (though it might take a while).
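For reference, a runtime-only cooldown along the lines suggested above might look roughly like this. A sketch only, not from the PR: the :outbox_fetch_cooldown cache name is made up and would need its own Cachex child in the supervision tree, as in the #762 example; the ttl: option matches Cachex 3.x as used by Pleroma.

    # Remember recent fetches only in memory; no DB timestamp bookkeeping.
    defp need_outbox_refresh?(ap_id) do
      case Cachex.exists?(:outbox_fetch_cooldown, ap_id) do
        {:ok, true} -> false
        _ -> true
      end
    end

    defp mark_outbox_refreshed(ap_id) do
      # The entry expires after the cooldown, clearing the way for a refetch.
      cooldown_ms = Config.get!([:activitypub, :outbox_refetch_cooldown]) * 1_000
      Cachex.put(:outbox_fetch_cooldown, ap_id, true, ttl: cooldown_ms)
    end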