Convert rich media backfill to oban task

This commit is contained in:
Floatingghost 2024-06-11 18:06:51 +01:00 committed by Norm
parent a41545a781
commit a8243d2d51
8 changed files with 47 additions and 36 deletions

View file

@ -583,6 +583,7 @@
search_indexing: 10, search_indexing: 10,
nodeinfo_fetcher: 1, nodeinfo_fetcher: 1,
database_prune: 1, database_prune: 1,
rich_media_backfill: 2,
rich_media_expiration: 2 rich_media_expiration: 2
], ],
plugins: [ plugins: [
@ -599,7 +600,8 @@
retries: [ retries: [
federator_incoming: 5, federator_incoming: 5,
federator_outgoing: 5, federator_outgoing: 5,
search_indexing: 2 search_indexing: 2,
rich_media_backfill: 3
], ],
timeout: [ timeout: [
activity_expiration: :timer.seconds(5), activity_expiration: :timer.seconds(5),
@ -621,7 +623,8 @@
mute_expire: :timer.seconds(5), mute_expire: :timer.seconds(5),
search_indexing: :timer.seconds(5), search_indexing: :timer.seconds(5),
nodeinfo_fetcher: :timer.seconds(10), nodeinfo_fetcher: :timer.seconds(10),
database_prune: :timer.minutes(10) database_prune: :timer.minutes(10),
rich_media_backfill: :timer.seconds(30)
] ]
config :pleroma, Pleroma.Formatter, config :pleroma, Pleroma.Formatter,

View file

@ -74,6 +74,7 @@ def request(method, url, body, headers, options) when is_binary(url) do
request = build_request(method, headers, options, url, body, params) request = build_request(method, headers, options, url, body, params)
client = Tesla.client([Tesla.Middleware.FollowRedirects, Tesla.Middleware.Telemetry]) client = Tesla.client([Tesla.Middleware.FollowRedirects, Tesla.Middleware.Telemetry])
Logger.debug("Outbound: #{method} #{url}")
request(client, request) request(client, request)
rescue rescue
e -> e ->

View file

@ -3,6 +3,10 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.RichMedia.Backfill do defmodule Pleroma.Web.RichMedia.Backfill do
use Pleroma.Workers.WorkerHelper,
queue: "rich_media_backfill",
unique: [period: 300, states: Oban.Job.states(), keys: [:op, :url_hash]]
alias Pleroma.Web.RichMedia.Card alias Pleroma.Web.RichMedia.Card
alias Pleroma.Web.RichMedia.Parser alias Pleroma.Web.RichMedia.Parser
alias Pleroma.Web.RichMedia.Parser.TTL alias Pleroma.Web.RichMedia.Parser.TTL
@ -10,61 +14,58 @@ defmodule Pleroma.Web.RichMedia.Backfill do
require Logger require Logger
@backfiller Pleroma.Config.get([__MODULE__, :provider], Pleroma.Web.RichMedia.Backfill.Task)
@cachex Pleroma.Config.get([:cachex, :provider], Cachex) @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
@max_attempts 3
@retry 5_000
def start(%{url: url} = args) when is_binary(url) do def start(%{url: url} = args) when is_binary(url) do
url_hash = Card.url_to_hash(url) url_hash = Card.url_to_hash(url)
args = args =
args args
|> Map.put(:attempt, 1)
|> Map.put(:url_hash, url_hash) |> Map.put(:url_hash, url_hash)
@backfiller.run(args) __MODULE__.enqueue("rich_media_backfill", args)
end end
def run(%{url: url, url_hash: url_hash, attempt: attempt} = args) def perform(%Oban.Job{args: %{"op" => "rich_media_backfill", "url" => url} = args})
when attempt <= @max_attempts do when is_binary(url) do
run(args)
end
def run(%{"url" => url, "url_hash" => url_hash} = args) do
case Parser.parse(url) do case Parser.parse(url) do
{:ok, fields} -> {:ok, fields} ->
{:ok, card} = Card.create(url, fields) {:ok, card} = Card.create(url, fields)
maybe_schedule_expiration(url, fields) maybe_schedule_expiration(url, fields)
if Map.has_key?(args, :activity_id) do if Map.has_key?(args, "activity_id") do
stream_update(args) stream_update(args)
end end
warm_cache(url_hash, card) warm_cache(url_hash, card)
:ok
{:error, {:invalid_metadata, fields}} -> {:error, {:invalid_metadata, fields}} ->
Logger.debug("Rich media incomplete or invalid metadata for #{url}: #{inspect(fields)}") Logger.debug("Rich media incomplete or invalid metadata for #{url}: #{inspect(fields)}")
negative_cache(url_hash) negative_cache(url_hash, :timer.minutes(30))
{:error, :body_too_large} -> {:error, :body_too_large} ->
Logger.error("Rich media error for #{url}: :body_too_large") Logger.error("Rich media error for #{url}: :body_too_large")
negative_cache(url_hash) negative_cache(url_hash, :timer.minutes(30))
{:error, {:content_type, type}} -> {:error, {:content_type, type}} ->
Logger.debug("Rich media error for #{url}: :content_type is #{type}") Logger.debug("Rich media error for #{url}: :content_type is #{type}")
negative_cache(url_hash) negative_cache(url_hash, :timer.minutes(30))
e -> e ->
Logger.debug("Rich media error for #{url}: #{inspect(e)}") Logger.debug("Rich media error for #{url}: #{inspect(e)}")
{:error, e}
:timer.sleep(@retry * attempt)
run(%{args | attempt: attempt + 1})
end end
end end
def run(%{url: url, url_hash: url_hash}) do def run(e) do
Logger.debug("Rich media failure for #{url}") Logger.error("Rich media failure - invalid args: #{inspect(e)}")
{:discard, :invalid}
negative_cache(url_hash, :timer.minutes(15))
end end
defp maybe_schedule_expiration(url, fields) do defp maybe_schedule_expiration(url, fields) do
@ -80,22 +81,18 @@ defp maybe_schedule_expiration(url, fields) do
end end
end end
defp stream_update(%{activity_id: activity_id}) do defp stream_update(%{"activity_id" => activity_id}) do
Logger.info("Rich media backfill: streaming update for activity #{activity_id}")
Pleroma.Activity.get_by_id(activity_id) Pleroma.Activity.get_by_id(activity_id)
|> Pleroma.Activity.normalize() |> Pleroma.Activity.normalize()
|> Pleroma.Web.ActivityPub.ActivityPub.stream_out() |> Pleroma.Web.ActivityPub.ActivityPub.stream_out()
end end
defp warm_cache(key, val), do: @cachex.put(:rich_media_cache, key, val) defp warm_cache(key, val), do: @cachex.put(:rich_media_cache, key, val)
defp negative_cache(key, ttl \\ nil), do: @cachex.put(:rich_media_cache, key, nil, ttl: ttl)
end
defmodule Pleroma.Web.RichMedia.Backfill.Task do def negative_cache(key, ttl \\ :timer.minutes(30)) do
alias Pleroma.Web.RichMedia.Backfill @cachex.put(:rich_media_cache, key, nil, ttl: ttl)
{:discard, :error}
def run(args) do
Task.Supervisor.start_child(Pleroma.TaskSupervisor, Backfill, :run, [args],
name: {:global, {:rich_media, args.url_hash}}
)
end end
end end

View file

@ -78,7 +78,8 @@ def application do
:comeonin, :comeonin,
:fast_sanitize, :fast_sanitize,
:os_mon, :os_mon,
:ssl :ssl,
:recon
], ],
included_applications: [:ex_syslogger] included_applications: [:ex_syslogger]
] ]
@ -158,9 +159,7 @@ defp deps do
{:timex, "~> 3.7"}, {:timex, "~> 3.7"},
{:ueberauth, "== 0.10.5"}, {:ueberauth, "== 0.10.5"},
{:linkify, "~> 0.5.3"}, {:linkify, "~> 0.5.3"},
{:http_signatures, {:http_signatures, "~> 0.1.2"},
git: "https://akkoma.dev/AkkomaGang/http_signatures.git",
ref: "6640ce7d24c783ac2ef56e27d00d12e8dc85f396"},
{:telemetry, "~> 1.2"}, {:telemetry, "~> 1.2"},
{:telemetry_poller, "~> 1.0"}, {:telemetry_poller, "~> 1.0"},
{:telemetry_metrics, "~> 0.6"}, {:telemetry_metrics, "~> 0.6"},

View file

@ -57,7 +57,7 @@
"hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"html_entities": {:hex, :html_entities, "0.5.2", "9e47e70598da7de2a9ff6af8758399251db6dbb7eebe2b013f2bbd2515895c3c", [:mix], [], "hexpm", "c53ba390403485615623b9531e97696f076ed415e8d8058b1dbaa28181f4fdcc"}, "html_entities": {:hex, :html_entities, "0.5.2", "9e47e70598da7de2a9ff6af8758399251db6dbb7eebe2b013f2bbd2515895c3c", [:mix], [], "hexpm", "c53ba390403485615623b9531e97696f076ed415e8d8058b1dbaa28181f4fdcc"},
"http_signatures": {:git, "https://akkoma.dev/AkkomaGang/http_signatures.git", "6640ce7d24c783ac2ef56e27d00d12e8dc85f396", [ref: "6640ce7d24c783ac2ef56e27d00d12e8dc85f396"]}, "http_signatures": {:hex, :http_signatures, "0.1.2", "ed1cc7043abcf5bb4f30d68fb7bad9d618ec1a45c4ff6c023664e78b67d9c406", [:mix], [], "hexpm", "f08aa9ac121829dae109d608d83c84b940ef2f183ae50f2dd1e9a8bc619d8be7"},
"httpoison": {:hex, :httpoison, "1.8.2", "9eb9c63ae289296a544842ef816a85d881d4a31f518a0fec089aaa744beae290", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "2bb350d26972e30c96e2ca74a1aaf8293d61d0742ff17f01e0279fef11599921"}, "httpoison": {:hex, :httpoison, "1.8.2", "9eb9c63ae289296a544842ef816a85d881d4a31f518a0fec089aaa744beae290", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "2bb350d26972e30c96e2ca74a1aaf8293d61d0742ff17f01e0279fef11599921"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"inet_cidr": {:hex, :inet_cidr, "1.0.8", "d26bb7bdbdf21ae401ead2092bf2bb4bf57fe44a62f5eaa5025280720ace8a40", [:mix], [], "hexpm", "d5b26da66603bb56c933c65214c72152f0de9a6ea53618b56d63302a68f6a90e"}, "inet_cidr": {:hex, :inet_cidr, "1.0.8", "d26bb7bdbdf21ae401ead2092bf2bb4bf57fe44a62f5eaa5025280720ace8a40", [:mix], [], "hexpm", "d5b26da66603bb56c933c65214c72152f0de9a6ea53618b56d63302a68f6a90e"},

View file

@ -36,6 +36,9 @@ test "crawls URL in activity" do
content_type: "text/markdown" content_type: "text/markdown"
}) })
# wait for oban
Pleroma.Tests.ObanHelpers.perform_all()
assert %Card{url_hash: ^url_hash, fields: _} = Card.get_by_activity(activity) assert %Card{url_hash: ^url_hash, fields: _} = Card.get_by_activity(activity)
end end
@ -50,6 +53,8 @@ test "recrawls URLs on status edits/updates" do
# Force a backfill # Force a backfill
Card.get_by_activity(activity) Card.get_by_activity(activity)
# wait for oban
Pleroma.Tests.ObanHelpers.perform_all()
assert match?( assert match?(
%Card{url_hash: ^original_url_hash, fields: _}, %Card{url_hash: ^original_url_hash, fields: _},
@ -62,6 +67,8 @@ test "recrawls URLs on status edits/updates" do
# Force a backfill # Force a backfill
Card.get_by_activity(activity) Card.get_by_activity(activity)
# wait for oban
Pleroma.Tests.ObanHelpers.perform_all()
assert match?( assert match?(
%Card{url_hash: ^updated_url_hash, fields: _}, %Card{url_hash: ^updated_url_hash, fields: _},

View file

@ -73,6 +73,8 @@ test "s3 signed url is parsed and correct ttl is set for rich media" do
end) end)
Card.get_or_backfill_by_url(url) Card.get_or_backfill_by_url(url)
# wait for oban
Pleroma.Tests.ObanHelpers.perform_all()
assert_enqueued(worker: Pleroma.Workers.RichMediaExpirationWorker, args: %{"url" => url}) assert_enqueued(worker: Pleroma.Workers.RichMediaExpirationWorker, args: %{"url" => url})

View file

@ -35,6 +35,8 @@ test "OpenGraph TTL value is honored" do
end) end)
Card.get_or_backfill_by_url(url) Card.get_or_backfill_by_url(url)
# wait for oban
Pleroma.Tests.ObanHelpers.perform_all()
assert_enqueued(worker: Pleroma.Workers.RichMediaExpirationWorker, args: %{"url" => url}) assert_enqueued(worker: Pleroma.Workers.RichMediaExpirationWorker, args: %{"url" => url})
end end