use proper workers for fetching pins instead of an ad-hoc task #788
8 changed files with 62 additions and 15 deletions
|
@ -16,7 +16,7 @@ defmodule Mix.Pleroma do
|
|||
:fast_html,
|
||||
:oban
|
||||
]
|
||||
@cachex_children ["object", "user", "scrubber", "web_resp"]
|
||||
@cachex_children ["object", "user", "scrubber", "web_resp", "http_backoff"]
|
||||
@doc "Common functions to be reused in mix tasks"
|
||||
def start_pleroma do
|
||||
Pleroma.Config.Holder.save_default()
|
||||
|
|
|
@ -17,6 +17,13 @@ def run(["http", url]) do
|
|||
|> IO.inspect()
|
||||
end
|
||||
|
||||
def run(["fetch_object", url]) do
|
||||
start_pleroma()
|
||||
|
||||
Pleroma.Object.Fetcher.fetch_object_from_id(url)
|
||||
|> IO.inspect()
|
||||
end
|
||||
|
||||
def run(["home_timeline", nickname]) do
|
||||
start_pleroma()
|
||||
user = Repo.get_by!(User, nickname: nickname)
|
||||
|
|
|
@ -114,7 +114,8 @@ def get(url, headers \\ [], options \\ []) do
|
|||
|> HTTP.get(headers, options)
|
||||
|> check_backoff(host)
|
||||
|
||||
_ ->
|
||||
e ->
|
||||
IO.inspect(e)
|
||||
{:error, :ratelimit}
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1824,17 +1824,17 @@ def fetch_and_prepare_featured_from_ap_id(ap_id) do
|
|||
end
|
||||
end
|
||||
|
||||
def pinned_fetch_task(nil), do: nil
|
||||
|
||||
def pinned_fetch_task(%{pinned_objects: pins}) do
|
||||
if Enum.all?(pins, fn {ap_id, _} ->
|
||||
Object.get_cached_by_ap_id(ap_id) ||
|
||||
match?({:ok, _object}, Fetcher.fetch_object_from_id(ap_id))
|
||||
end) do
|
||||
:ok
|
||||
else
|
||||
:error
|
||||
end
|
||||
def enqueue_pin_fetches(%{pinned_objects: pins}) do
|
||||
IO.puts("Fetching pinned objects: #{inspect(pins)}")
|
||||
# enqueue a task to fetch all pinned objects
|
||||
Enum.each(pins, fn {ap_id, _} ->
|
||||
unless %{} = Object.get_cached_by_ap_id(ap_id) do
|
||||
Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
|
||||
"id" => ap_id,
|
||||
"depth" => 1
|
||||
})
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
def make_user_from_ap_id(ap_id, additional \\ []) do
|
||||
|
@ -1844,7 +1844,7 @@ def make_user_from_ap_id(ap_id, additional \\ []) do
|
|||
Transmogrifier.upgrade_user_from_ap_id(ap_id)
|
||||
else
|
||||
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, additional) do
|
||||
{:ok, _pid} = Task.start(fn -> pinned_fetch_task(data) end)
|
||||
enqueue_pin_fetches(data)
|
||||
|
||||
user =
|
||||
if data.ap_id != ap_id do
|
||||
|
|
|
@ -1034,7 +1034,7 @@ def upgrade_user_from_ap_id(ap_id) do
|
|||
with %User{local: false} = user <- User.get_cached_by_ap_id(ap_id),
|
||||
{:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id),
|
||||
{:ok, user} <- update_user(user, data) do
|
||||
{:ok, _pid} = Task.start(fn -> ActivityPub.pinned_fetch_task(user) end)
|
||||
ActivityPub.enqueue_pin_fetches(user)
|
||||
TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
|
||||
{:ok, user}
|
||||
else
|
||||
|
|
|
@ -24,6 +24,9 @@ def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
|
|||
{:error, :allowed_depth} ->
|
||||
{:discard, :allowed_depth}
|
||||
|
||||
{:error, {:error, e}} ->
|
||||
{:discard, e}
|
||||
|
||||
{:error, :invalid_uri_scheme} ->
|
||||
{:discard, :invalid_uri_scheme}
|
||||
|
||||
|
|
|
@ -399,6 +399,31 @@ test "does NOT schedule background fetching of `replies` beyond max thread depth
|
|||
|
||||
assert all_enqueued(worker: Pleroma.Workers.RemoteFetcherWorker) == []
|
||||
end
|
||||
|
||||
test "it does not explode if it cannot process the user behind a post" do
|
||||
# this will break the nickname ascii check
|
||||
user_ap_data = "test/fixtures/users_mock/user.json"
|
||||
|> File.read!()
|
||||
|> String.replace("{{nickname}}", "あっこ")
|
||||
|> Jason.decode!()
|
||||
|> Map.delete("featured")
|
||||
|
||||
user_ap_id = user_ap_data["id"]
|
||||
|
||||
Tesla.Mock.mock_global(fn %{url: ^user_ap_id} ->
|
||||
%Tesla.Env{status: 200, body: Jason.encode!(user_ap_data),
|
||||
headers: HttpRequestMock.activitypub_object_headers()}
|
||||
end)
|
||||
|
||||
data =
|
||||
File.read!("test/fixtures/mastodon-post-activity.json")
|
||||
|> Jason.decode!()
|
||||
|> Map.put("actor", user_ap_id)
|
||||
|> Map.put("to", ["https://www.w3.org/ns/activitystreams#Public"])
|
||||
|> Map.put("cc", [])
|
||||
|
||||
assert {:error, :not_found} = Transmogrifier.handle_incoming(data)
|
||||
end
|
||||
end
|
||||
|
||||
describe "`handle_incoming/2`, Pleroma format `replies` handling" do
|
||||
|
|
|
@ -146,6 +146,17 @@ test "it accepts quote posts" do
|
|||
# It fetched the quoted post
|
||||
assert Object.normalize("https://misskey.io/notes/8vs6wxufd0")
|
||||
end
|
||||
|
||||
test "it does not explode if it cannot process the user behind a post" do
|
||||
# this will break the nickname ascii check
|
||||
user_ap_data = "test/fixtures/users_mock/user.json"
|
||||
|> File.read!()
|
||||
|> String.replace("{{nickname}}", "あっこ")
|
||||
|
||||
Tesla.Mock.mock_global(fn %{url: "https://example.com/users/あっこ"} ->
|
||||
%Tesla.Env{status: 200, body: user_ap_data}
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
||||
describe "prepare outgoing" do
|
||||
|
|
Loading…
Reference in a new issue