WIP: Add backfilling of posts #846
8 changed files with 112 additions and 10 deletions
|
@ -371,7 +371,8 @@
|
|||
note_replies_output_limit: 5,
|
||||
sign_object_fetches: true,
|
||||
authorized_fetch_mode: false,
|
||||
max_collection_objects: 50
|
||||
max_collection_objects: 50,
|
||||
outbox_refetch_cooldown: 5 * 60
|
||||
|
||||
config :pleroma, :streamer,
|
||||
workers: 3,
|
||||
|
|
|
@ -210,6 +210,9 @@ defp log_fetch_error(id, error) do
|
|||
Logger.error("Object rejected while fetching #{id} #{inspect(error)}")
|
||||
end
|
||||
|
||||
# if its already an activity, dont wrap
|
||||
defp prepare_activity_params(%{"object" => _} = data), do: data
|
||||
|
||||
defp prepare_activity_params(data) do
|
||||
%{
|
||||
"type" => "Create",
|
||||
|
|
|
@ -152,6 +152,8 @@ defmodule Pleroma.User do
|
|||
field(:also_known_as, {:array, ObjectValidators.ObjectID}, default: [])
|
||||
field(:inbox, :string)
|
||||
field(:shared_inbox, :string)
|
||||
field(:outbox, :string, default: nil)
|
||||
field(:last_outbox_fetch, :naive_datetime, default: nil)
|
||||
field(:last_active_at, :naive_datetime)
|
||||
field(:disclose_client, :boolean, default: true)
|
||||
field(:pinned_objects, :map, default: %{})
|
||||
|
@ -471,6 +473,7 @@ def remote_user_changeset(struct \\ %User{local: false}, params) do
|
|||
:ap_id,
|
||||
:inbox,
|
||||
:shared_inbox,
|
||||
:outbox,
|
||||
:nickname,
|
||||
:avatar,
|
||||
:ap_enabled,
|
||||
|
@ -2769,4 +2772,12 @@ def accepts_direct_messages?(%User{accepts_direct_messages_from: :everybody}, _)
|
|||
|
||||
def accepts_direct_messages?(%User{accepts_direct_messages_from: :nobody}, _),
|
||||
do: false
|
||||
|
||||
# Records that this user's outbox was just (re)fetched by stamping
# `last_outbox_fetch` with the current UTC time and writing through the cache.
#
# Returns the result of `update_and_set_cache/1`
# (`{:ok, user}` or `{:error, changeset}`).
#
# NOTE: no explicit `NaiveDateTime.truncate/2` is needed here — Ecto's
# `cast/3` discards microseconds when casting into a `:naive_datetime` field.
def outbox_refreshed(%User{} = user) do
  user
  |> cast(%{last_outbox_fetch: NaiveDateTime.utc_now()}, [:last_outbox_fetch])
  |> update_and_set_cache()
end
|
||||
end
|
||||
|
|
|
@ -1587,7 +1587,8 @@ defp object_to_user_data(data, additional) do
|
|||
actor_type = data["type"] || "Person"
|
||||
|
||||
featured_address = data["featured"]
|
||||
{:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
|
||||
{:ok, pinned_objects} = fetch_and_prepare_collection_from_ap_id(featured_address)
|
||||
outbox_address = data["outbox"]
|
||||
|
||||
# first, check that the owner is correct
|
||||
signing_key =
|
||||
|
@ -1644,6 +1645,7 @@ defp object_to_user_data(data, additional) do
|
|||
also_known_as: also_known_as,
|
||||
signing_key: signing_key,
|
||||
inbox: data["inbox"],
|
||||
outbox: outbox_address,
|
||||
shared_inbox: shared_inbox,
|
||||
pinned_objects: pinned_objects,
|
||||
nickname: nickname
|
||||
|
@ -1790,7 +1792,7 @@ def maybe_handle_clashing_nickname(data) do
|
|||
end
|
||||
end
|
||||
|
||||
def pin_data_from_featured_collection(%{
|
||||
def activity_data_from_collection(%{
|
||||
"type" => "OrderedCollection",
|
||||
"first" => first
|
||||
}) do
|
||||
|
@ -1805,7 +1807,7 @@ def pin_data_from_featured_collection(%{
|
|||
end
|
||||
end
|
||||
|
||||
def pin_data_from_featured_collection(
|
||||
def activity_data_from_collection(
|
||||
%{
|
||||
"type" => type
|
||||
} = collection
|
||||
|
@ -1821,21 +1823,23 @@ def pin_data_from_featured_collection(
|
|||
end)
|
||||
end
|
||||
|
||||
# Fallback clause: the fetched collection had an unrecognized shape.
# Log the offending object and return an empty map so callers can proceed
# with no items instead of crashing.
def activity_data_from_collection(obj) do
  Logger.error("Could not parse collection #{inspect(obj)}")
  %{}
end
|
||||
|
||||
# A `nil` address means the actor exposes no such collection; treat that as
# an empty (but successful) result rather than an error.
def fetch_and_prepare_collection_from_ap_id(nil) do
  {:ok, %{}}
end
|
||||
|
||||
# Fetches a remote ActivityPub collection (featured list or outbox) at
# `ap_id` and parses it into a map of activity data via
# `activity_data_from_collection/1`.
#
# Any fetch or decode failure is logged and degraded to `{:ok, %{}}` so a
# misbehaving remote collection never propagates an error to callers.
def fetch_and_prepare_collection_from_ap_id(ap_id) do
  Logger.info("Fetching collection #{ap_id}")

  with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
    {:ok, activity_data_from_collection(data)}
  else
    e ->
      Logger.error("Could not decode collection at fetch #{ap_id}, #{inspect(e)}")
      {:ok, %{}}
  end
end
|
||||
|
@ -1854,6 +1858,43 @@ def enqueue_pin_fetches(%{pinned_objects: pins}) do
|
|||
|
||||
def enqueue_pin_fetches(_), do: nil
|
||||
|
||||
# Whether the outbox should be fetched again: either it has never been
# fetched (`nil` timestamp) or the configured cooldown (in seconds) has
# elapsed since the last fetch.
defp need_outbox_refresh?(nil), do: true

defp need_outbox_refresh?(%NaiveDateTime{} = last_fetch) do
  cooldown = Config.get!([:activitypub, :outbox_refetch_cooldown])
  age = NaiveDateTime.diff(NaiveDateTime.utc_now(), last_fetch)
  age > cooldown
end
|
||||
|
||||
# Schedules a background refetch of a remote user's outbox, rate-limited by
# the `:outbox_refetch_cooldown` config value (seconds). Always returns `:ok`.
#
# Clauses:
#   * remote user with a known outbox URL — enqueue a RemoteFetcherWorker
#     "fetch_outbox" job (unless still within the cooldown), then stamp
#     `last_outbox_fetch` on the user;
#   * local user — nothing to do;
#   * remote user without an outbox on record — refetch the actor document so
#     the outbox address gets populated (e.g. rows predating the column).
#
# The `is_binary/1` guard keeps nil-outbox users out of the first clause:
# without it they would enqueue a fetch of `nil` instead of falling through
# to the actor-refetch clause below.
def enqueue_outbox_fetches(
      %{outbox: outbox_address, last_outbox_fetch: last_fetch, local: false} = user
    )
    when is_binary(outbox_address) do
  if need_outbox_refresh?(last_fetch) do
    Logger.debug("Refetching outbox #{outbox_address}")

    Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_outbox", %{
      "id" => outbox_address
    })

    User.outbox_refreshed(user)
  else
    age = NaiveDateTime.diff(NaiveDateTime.utc_now(), last_fetch)
    Logger.debug("Not refetching outbox (TTL not reached: #{last_fetch}, age #{age})")
  end

  :ok
end

def enqueue_outbox_fetches(%{local: true}), do: :ok

def enqueue_outbox_fetches(%{local: false} = user) do
  make_user_from_ap_id(user.ap_id)
  :ok
end
|
||||
|
||||
def make_user_from_ap_id(ap_id, additional \\ []) do
|
||||
user = User.get_cached_by_ap_id(ap_id)
|
||||
|
||||
|
|
|
@ -306,6 +306,8 @@ def show(%{assigns: %{user: for_user}} = conn, %{id: nickname_or_id} = params) d
|
|||
def statuses(%{assigns: %{user: reading_user}} = conn, params) do
|
||||
with %User{} = user <- User.get_cached_by_nickname_or_id(params.id, for: reading_user),
|
||||
:visible <- User.visible_for(user, reading_user) do
|
||||
ActivityPub.enqueue_outbox_fetches(user)
|
||||
|
||||
params =
|
||||
params
|
||||
|> Map.delete(:tagged)
|
||||
|
|
|
@ -4,12 +4,29 @@
|
|||
|
||||
defmodule Pleroma.Workers.RemoteFetcherWorker do
|
||||
alias Pleroma.Object.Fetcher
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||
|
||||
use Pleroma.Workers.WorkerHelper,
|
||||
queue: "remote_fetcher",
|
||||
unique: [period: 300, states: Oban.Job.states(), keys: [:op, :id]]
|
||||
|
||||
@impl Oban.Worker
|
||||
|
||||
# "fetch_outbox" job: pull the remote outbox collection at `address`, then
# fetch (at depth 1) every referenced object we don't already have cached.
def perform(%Job{args: %{"op" => "fetch_outbox", "id" => address}}) do
  case ActivityPub.fetch_and_prepare_collection_from_ap_id(address) do
    {:ok, outbox} ->
      outbox
      |> Enum.reject(fn {ap_id, _} -> Object.get_cached_by_ap_id(ap_id) end)
      |> Enum.each(fn {ap_id, _} -> Fetcher.fetch_object_from_id(ap_id, depth: 1) end)

      :ok

    failure ->
      {:error, failure}
  end
end
|
||||
|
||||
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
|
||||
case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
|
||||
{:ok, _object} ->
|
||||
|
|
17
priv/repo/migrations/20241027040000_users_add_outboxes.exs
Normal file
17
priv/repo/migrations/20241027040000_users_add_outboxes.exs
Normal file
|
@ -0,0 +1,17 @@
|
|||
defmodule Pleroma.Repo.Migrations.UsersAddOutboxes do
  use Ecto.Migration

  # Adds the two columns backing outbox backfilling:
  #   * outbox            — the actor's ActivityPub outbox URL (text)
  #   * last_outbox_fetch — when the outbox was last (re)fetched
  # The *_if_not_exists / *_if_exists variants keep the migration idempotent
  # if the columns were already created out-of-band.
  def up do
    alter table(:users) do
      add_if_not_exists(:outbox, :text)
      add_if_not_exists(:last_outbox_fetch, :naive_datetime)
    end
  end

  # Reverses `up/0` by dropping both columns.
  def down do
    alter table(:users) do
      remove_if_exists(:outbox, :text)
      remove_if_exists(:last_outbox_fetch, :naive_datetime)
    end
  end
end
|
10
shell.nix
Normal file
10
shell.nix
Normal file
|
@ -0,0 +1,10 @@
|
|||
# Development shell for hacking on this project; enter with `nix-shell`.
# Provides the Elixir toolchain plus the native build/runtime dependencies.
{ pkgs ? import <nixpkgs> {} }:
pkgs.mkShell {
  nativeBuildInputs = with pkgs.buildPackages; [
    elixir
    elixir-ls # Elixir language server for editor integration
    cmake
    file # libmagic
    ffmpeg
  ];
}
|
Loading…
Reference in a new issue