Add backfilling of posts
This commit is contained in:
parent
bee10eab5e
commit
2aefeeee61
6 changed files with 55 additions and 11 deletions
|
@ -19,7 +19,7 @@ def cast(object) when is_binary(object) do
|
||||||
|
|
||||||
def cast(%{"id" => object}), do: cast(object)
|
def cast(%{"id" => object}), do: cast(object)
|
||||||
|
|
||||||
def cast(_), do: :error
|
def cast(o), do: {:error, o}
|
||||||
|
|
||||||
def dump(data), do: {:ok, data}
|
def dump(data), do: {:ok, data}
|
||||||
|
|
||||||
|
|
|
@ -153,6 +153,7 @@ defmodule Pleroma.User do
|
||||||
field(:also_known_as, {:array, ObjectValidators.ObjectID}, default: [])
|
field(:also_known_as, {:array, ObjectValidators.ObjectID}, default: [])
|
||||||
field(:inbox, :string)
|
field(:inbox, :string)
|
||||||
field(:shared_inbox, :string)
|
field(:shared_inbox, :string)
|
||||||
|
field(:outbox, :string, default: nil)
|
||||||
field(:last_active_at, :naive_datetime)
|
field(:last_active_at, :naive_datetime)
|
||||||
field(:disclose_client, :boolean, default: true)
|
field(:disclose_client, :boolean, default: true)
|
||||||
field(:pinned_objects, :map, default: %{})
|
field(:pinned_objects, :map, default: %{})
|
||||||
|
@ -465,6 +466,7 @@ def remote_user_changeset(struct \\ %User{local: false}, params) do
|
||||||
:ap_id,
|
:ap_id,
|
||||||
:inbox,
|
:inbox,
|
||||||
:shared_inbox,
|
:shared_inbox,
|
||||||
|
:outbox,
|
||||||
:nickname,
|
:nickname,
|
||||||
:public_key,
|
:public_key,
|
||||||
:avatar,
|
:avatar,
|
||||||
|
|
|
@ -1576,7 +1576,8 @@ defp object_to_user_data(data, additional) do
|
||||||
actor_type = data["type"] || "Person"
|
actor_type = data["type"] || "Person"
|
||||||
|
|
||||||
featured_address = data["featured"]
|
featured_address = data["featured"]
|
||||||
{:ok, pinned_objects} = fetch_and_prepare_featured_from_ap_id(featured_address)
|
{:ok, pinned_objects} = fetch_and_prepare_outbox_from_ap_id(featured_address)
|
||||||
|
outbox_address = data["outbox"]
|
||||||
|
|
||||||
public_key =
|
public_key =
|
||||||
if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
|
if is_map(data["publicKey"]) && is_binary(data["publicKey"]["publicKeyPem"]) do
|
||||||
|
@ -1626,6 +1627,7 @@ defp object_to_user_data(data, additional) do
|
||||||
also_known_as: also_known_as,
|
also_known_as: also_known_as,
|
||||||
public_key: public_key,
|
public_key: public_key,
|
||||||
inbox: data["inbox"],
|
inbox: data["inbox"],
|
||||||
|
outbox: outbox_address,
|
||||||
shared_inbox: shared_inbox,
|
shared_inbox: shared_inbox,
|
||||||
pinned_objects: pinned_objects,
|
pinned_objects: pinned_objects,
|
||||||
nickname: nickname
|
nickname: nickname
|
||||||
|
@ -1772,7 +1774,7 @@ def maybe_handle_clashing_nickname(data) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def pin_data_from_featured_collection(%{
|
def activity_data_from_outbox_collection(%{
|
||||||
"type" => "OrderedCollection",
|
"type" => "OrderedCollection",
|
||||||
"first" => first
|
"first" => first
|
||||||
}) do
|
}) do
|
||||||
|
@ -1787,7 +1789,7 @@ def pin_data_from_featured_collection(%{
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def pin_data_from_featured_collection(
|
def activity_data_from_outbox_collection(
|
||||||
%{
|
%{
|
||||||
"type" => type
|
"type" => type
|
||||||
} = collection
|
} = collection
|
||||||
|
@ -1803,21 +1805,23 @@ def pin_data_from_featured_collection(
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
def pin_data_from_featured_collection(obj) do
|
def activity_data_from_outbox_collection(obj) do
|
||||||
Logger.error("Could not parse featured collection #{inspect(obj)}")
|
Logger.error("Could not parse outbox collection #{inspect(obj)}")
|
||||||
%{}
|
%{}
|
||||||
end
|
end
|
||||||
|
|
||||||
def fetch_and_prepare_featured_from_ap_id(nil) do
|
def fetch_and_prepare_outbox_from_ap_id(nil) do
|
||||||
{:ok, %{}}
|
{:ok, %{}}
|
||||||
end
|
end
|
||||||
|
|
||||||
def fetch_and_prepare_featured_from_ap_id(ap_id) do
|
def fetch_and_prepare_outbox_from_ap_id(ap_id) do
|
||||||
|
Logger.info("Fetching outbox #{ap_id}")
|
||||||
|
|
||||||
with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
|
with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id) do
|
||||||
{:ok, pin_data_from_featured_collection(data)}
|
{:ok, activity_data_from_outbox_collection(data)}
|
||||||
else
|
else
|
||||||
e ->
|
e ->
|
||||||
Logger.error("Could not decode featured collection at fetch #{ap_id}, #{inspect(e)}")
|
Logger.error("Could not decode outbox collection at fetch #{ap_id}, #{inspect(e)}")
|
||||||
{:ok, %{}}
|
{:ok, %{}}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -1836,6 +1840,19 @@ def enqueue_pin_fetches(%{pinned_objects: pins}) do
|
||||||
|
|
||||||
def enqueue_pin_fetches(_), do: nil
|
def enqueue_pin_fetches(_), do: nil
|
||||||
|
|
||||||
|
def enqueue_outbox_fetches(%{outbox: outbox_address}) do
|
||||||
|
# enqueue a task to fetch the outbox
|
||||||
|
Logger.debug("Refetching outbox #{outbox_address}")
|
||||||
|
|
||||||
|
Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_outbox", %{
|
||||||
|
"id" => outbox_address
|
||||||
|
})
|
||||||
|
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
def enqueue_outbox_fetches(_), do: :error
|
||||||
|
|
||||||
def make_user_from_ap_id(ap_id, additional \\ []) do
|
def make_user_from_ap_id(ap_id, additional \\ []) do
|
||||||
user = User.get_cached_by_ap_id(ap_id)
|
user = User.get_cached_by_ap_id(ap_id)
|
||||||
|
|
||||||
|
|
|
@ -637,7 +637,10 @@ defp handle_incoming_normalised(
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp handle_incoming_normalised(_, _), do: :error
|
defp handle_incoming_normalised(%{"object" => o}, options),
|
||||||
|
do: handle_incoming_normalised(o, options)
|
||||||
|
|
||||||
|
defp handle_incoming_normalised(o, _), do: {:error, {:unknown_object_class, o}}
|
||||||
|
|
||||||
@spec get_obj_helper(String.t(), Keyword.t()) :: {:ok, Object.t()} | nil
|
@spec get_obj_helper(String.t(), Keyword.t()) :: {:ok, Object.t()} | nil
|
||||||
def get_obj_helper(id, options \\ []) do
|
def get_obj_helper(id, options \\ []) do
|
||||||
|
|
|
@ -306,6 +306,8 @@ def show(%{assigns: %{user: for_user}} = conn, %{id: nickname_or_id} = params) d
|
||||||
def statuses(%{assigns: %{user: reading_user}} = conn, params) do
|
def statuses(%{assigns: %{user: reading_user}} = conn, params) do
|
||||||
with %User{} = user <- User.get_cached_by_nickname_or_id(params.id, for: reading_user),
|
with %User{} = user <- User.get_cached_by_nickname_or_id(params.id, for: reading_user),
|
||||||
:visible <- User.visible_for(user, reading_user) do
|
:visible <- User.visible_for(user, reading_user) do
|
||||||
|
ActivityPub.enqueue_outbox_fetches(user)
|
||||||
|
|
||||||
params =
|
params =
|
||||||
params
|
params
|
||||||
|> Map.delete(:tagged)
|
|> Map.delete(:tagged)
|
||||||
|
|
|
@ -4,12 +4,32 @@
|
||||||
|
|
||||||
defmodule Pleroma.Workers.RemoteFetcherWorker do
|
defmodule Pleroma.Workers.RemoteFetcherWorker do
|
||||||
alias Pleroma.Object.Fetcher
|
alias Pleroma.Object.Fetcher
|
||||||
|
alias Pleroma.Object
|
||||||
|
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||||
|
|
||||||
use Pleroma.Workers.WorkerHelper,
|
use Pleroma.Workers.WorkerHelper,
|
||||||
queue: "remote_fetcher",
|
queue: "remote_fetcher",
|
||||||
unique: [period: 300, states: Oban.Job.states(), keys: [:op, :id]]
|
unique: [period: 300, states: Oban.Job.states(), keys: [:op, :id]]
|
||||||
|
|
||||||
@impl Oban.Worker
|
@impl Oban.Worker
|
||||||
|
|
||||||
|
def perform(%Job{args: %{"op" => "fetch_outbox", "id" => address}}) do
|
||||||
|
with {:ok, outbox} <- ActivityPub.fetch_and_prepare_outbox_from_ap_id(address) do
|
||||||
|
Enum.each(Enum.reverse(outbox), fn {ap_id, _} ->
|
||||||
|
if is_nil(Object.get_cached_by_ap_id(ap_id)) do
|
||||||
|
enqueue("fetch_remote", %{
|
||||||
|
"id" => ap_id,
|
||||||
|
"depth" => 1
|
||||||
|
})
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
|
||||||
|
:ok
|
||||||
|
else
|
||||||
|
e -> {:error, e}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
|
def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
|
||||||
case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
|
case Fetcher.fetch_object_from_id(id, depth: args["depth"]) do
|
||||||
{:ok, _object} ->
|
{:ok, _object} ->
|
||||||
|
|
Loading…
Reference in a new issue