Moving some background jobs into simple tasks

- fetching activity data
- attachment prefetching
- using limiter to prevent overload
This commit is contained in:
Alexander Strizhakov 2020-09-10 10:54:57 +03:00
parent 88f6b61a5e
commit 8d218ebaf5
No known key found for this signature in database
GPG key ID: 022896A53AEF1381
11 changed files with 58 additions and 40 deletions

View file

@ -57,6 +57,7 @@ def start(_type, _args) do
setup_instrumenters() setup_instrumenters()
load_custom_modules() load_custom_modules()
Pleroma.Docs.JSON.compile() Pleroma.Docs.JSON.compile()
limiters_setup()
adapter = Application.get_env(:tesla, :adapter) adapter = Application.get_env(:tesla, :adapter)
@ -273,4 +274,9 @@ defp http_children(Tesla.Adapter.Gun, _) do
end end
defp http_children(_, _), do: [] defp http_children(_, _), do: []
def limiters_setup do
[Pleroma.Web.RichMedia.Helpers, Pleroma.Web.MediaProxy]
|> Enum.each(&ConcurrentLimiter.new(&1, 1, 0))
end
end end

View file

@ -123,7 +123,9 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
# Splice in the child object if we have one. # Splice in the child object if we have one.
activity = Maps.put_if_present(activity, :object, object) activity = Maps.put_if_present(activity, :object, object)
BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
end)
{:ok, activity} {:ok, activity}
else else

View file

@ -8,7 +8,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
alias Pleroma.HTTP alias Pleroma.HTTP
alias Pleroma.Web.MediaProxy alias Pleroma.Web.MediaProxy
alias Pleroma.Workers.BackgroundWorker
require Logger require Logger
@ -17,7 +16,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
recv_timeout: 10_000 recv_timeout: 10_000
] ]
def perform(:prefetch, url) do defp prefetch(url) do
# Fetching only proxiable resources # Fetching only proxiable resources
if MediaProxy.enabled?() and MediaProxy.url_proxiable?(url) do if MediaProxy.enabled?() and MediaProxy.url_proxiable?(url) do
# If preview proxy is enabled, it'll also hit media proxy (so we're caching both requests) # If preview proxy is enabled, it'll also hit media proxy (so we're caching both requests)
@ -25,17 +24,25 @@ def perform(:prefetch, url) do
Logger.debug("Prefetching #{inspect(url)} as #{inspect(prefetch_url)}") Logger.debug("Prefetching #{inspect(url)} as #{inspect(prefetch_url)}")
HTTP.get(prefetch_url, [], @adapter_options) if Pleroma.Config.get(:env) == :test do
fetch(prefetch_url)
else
ConcurrentLimiter.limit(MediaProxy, fn ->
Task.start(fn -> fetch(prefetch_url) end)
end)
end
end end
end end
def perform(:preload, %{"object" => %{"attachment" => attachments}} = _message) do defp fetch(url), do: HTTP.get(url, [], @adapter_options)
defp preload(%{"object" => %{"attachment" => attachments}} = _message) do
Enum.each(attachments, fn Enum.each(attachments, fn
%{"url" => url} when is_list(url) -> %{"url" => url} when is_list(url) ->
url url
|> Enum.each(fn |> Enum.each(fn
%{"href" => href} -> %{"href" => href} ->
BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href}) prefetch(href)
x -> x ->
Logger.debug("Unhandled attachment URL object #{inspect(x)}") Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@ -51,7 +58,7 @@ def filter(
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
) )
when is_list(attachments) and length(attachments) > 0 do when is_list(attachments) and length(attachments) > 0 do
BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message}) preload(message)
{:ok, message} {:ok, message}
end end

View file

@ -24,7 +24,6 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Push alias Pleroma.Web.Push
alias Pleroma.Web.Streamer alias Pleroma.Web.Streamer
alias Pleroma.Workers.BackgroundWorker
require Logger require Logger
@ -191,7 +190,9 @@ def handle(%{data: %{"type" => "Create"}} = activity, meta) do
Object.increase_replies_count(in_reply_to) Object.increase_replies_count(in_reply_to)
end end
BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) ConcurrentLimiter.limit(Pleroma.Web.RichMedia.Helpers, fn ->
Task.start(fn -> Pleroma.Web.RichMedia.Helpers.fetch_data_for_activity(activity) end)
end)
meta = meta =
meta meta

View file

@ -78,11 +78,6 @@ def fetch_data_for_activity(%Activity{data: %{"type" => "Create"}} = activity) d
def fetch_data_for_activity(_), do: %{} def fetch_data_for_activity(_), do: %{}
def perform(:fetch, %Activity{} = activity) do
fetch_data_for_activity(activity)
:ok
end
def rich_media_get(url) do def rich_media_get(url) do
headers = [{"user-agent", Pleroma.Application.user_agent() <> "; Bot"}] headers = [{"user-agent", Pleroma.Application.user_agent() <> "; Bot"}]

View file

@ -3,9 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.BackgroundWorker do defmodule Pleroma.Workers.BackgroundWorker do
alias Pleroma.Activity
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
use Pleroma.Workers.WorkerHelper, queue: "background" use Pleroma.Workers.WorkerHelper, queue: "background"
@ -32,19 +30,6 @@ def perform(%Job{args: %{"op" => op, "user_id" => user_id, "identifiers" => iden
{:ok, User.Import.perform(String.to_atom(op), user, identifiers)} {:ok, User.Import.perform(String.to_atom(op), user, identifiers)}
end end
def perform(%Job{args: %{"op" => "media_proxy_preload", "message" => message}}) do
MediaProxyWarmingPolicy.perform(:preload, message)
end
def perform(%Job{args: %{"op" => "media_proxy_prefetch", "url" => url}}) do
MediaProxyWarmingPolicy.perform(:prefetch, url)
end
def perform(%Job{args: %{"op" => "fetch_data_for_activity", "activity_id" => activity_id}}) do
activity = Activity.get_by_id(activity_id)
Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
end
def perform(%Job{ def perform(%Job{
args: %{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id} args: %{"op" => "move_following", "origin_id" => origin_id, "target_id" => target_id}
}) do }) do

View file

@ -0,0 +1,22 @@
defmodule Pleroma.Repo.Migrations.RemoveBackgroundJobs do
use Ecto.Migration
import Ecto.Query, only: [from: 2]
def up do
from(j in "oban_jobs",
where:
j.queue == ^"background" and
fragment("?->>'op'", j.args) in ^[
"fetch_data_for_activity",
"media_proxy_prefetch",
"media_proxy_preload"
] and
j.worker == ^"Pleroma.Workers.BackgroundWorker",
select: [:id]
)
|> Pleroma.Repo.delete_all()
end
def down, do: :ok
end

View file

@ -12,7 +12,7 @@ defmodule Pleroma.Config.DeprecationWarningsTest do
alias Pleroma.Config.DeprecationWarnings alias Pleroma.Config.DeprecationWarnings
test "check_old_mrf_config/0" do test "check_old_mrf_config/0" do
clear_config([:instance, :rewrite_policy], Pleroma.Web.ActivityPub.MRF.NoOpPolicy) clear_config([:instance, :rewrite_policy], [])
clear_config([:instance, :mrf_transparency], true) clear_config([:instance, :mrf_transparency], true)
clear_config([:instance, :mrf_transparency_exclusions], []) clear_config([:instance, :mrf_transparency_exclusions], [])

View file

@ -3,10 +3,10 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
use Pleroma.DataCase use ExUnit.Case
use Pleroma.Tests.Helpers
alias Pleroma.HTTP alias Pleroma.HTTP
alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
import Mock import Mock
@ -25,13 +25,13 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
setup do: clear_config([:media_proxy, :enabled], true) setup do: clear_config([:media_proxy, :enabled], true)
test "it prefetches media proxy URIs" do test "it prefetches media proxy URIs" do
Tesla.Mock.mock(fn %{method: :get, url: "http://example.com/image.jpg"} ->
{:ok, %Tesla.Env{status: 200, body: ""}}
end)
with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do
MediaProxyWarmingPolicy.filter(@message) MediaProxyWarmingPolicy.filter(@message)
ObanHelpers.perform_all()
# Performing jobs which has been just enqueued
ObanHelpers.perform_all()
assert called(HTTP.get(:_, :_, :_)) assert called(HTTP.get(:_, :_, :_))
end end
end end

View file

@ -328,7 +328,7 @@ test "fake statuses' preview card is not cached", %{conn: conn} do
end end
test "posting a status with OGP link preview", %{conn: conn} do test "posting a status with OGP link preview", %{conn: conn} do
Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end) Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
clear_config([:rich_media, :enabled], true) clear_config([:rich_media, :enabled], true)
conn = conn =
@ -1197,7 +1197,7 @@ test "on pin removes deletion job, on unpin reschedule deletion" do
end end
test "returns rich-media card", %{conn: conn, user: user} do test "returns rich-media card", %{conn: conn, user: user} do
Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end) Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
{:ok, activity} = CommonAPI.post(user, %{status: "https://example.com/ogp"}) {:ok, activity} = CommonAPI.post(user, %{status: "https://example.com/ogp"})
@ -1242,7 +1242,7 @@ test "returns rich-media card", %{conn: conn, user: user} do
end end
test "replaces missing description with an empty string", %{conn: conn, user: user} do test "replaces missing description with an empty string", %{conn: conn, user: user} do
Tesla.Mock.mock(fn env -> apply(HttpRequestMock, :request, [env]) end) Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
{:ok, activity} = CommonAPI.post(user, %{status: "https://example.com/ogp-missing-data"}) {:ok, activity} = CommonAPI.post(user, %{status: "https://example.com/ogp-missing-data"})

View file

@ -48,7 +48,7 @@ test "it displays a chat message" do
clear_config([:rich_media, :enabled], true) clear_config([:rich_media, :enabled], true)
Tesla.Mock.mock(fn Tesla.Mock.mock_global(fn
%{url: "https://example.com/ogp"} -> %{url: "https://example.com/ogp"} ->
%Tesla.Env{status: 200, body: File.read!("test/fixtures/rich_media/ogp.html")} %Tesla.Env{status: 200, body: File.read!("test/fixtures/rich_media/ogp.html")}
end) end)