additionally support retry-after values
Some checks are pending
ci/woodpecker/push/build-amd64 Pipeline is pending
ci/woodpecker/push/build-arm64 Pipeline is pending
ci/woodpecker/push/docs Pipeline is pending
ci/woodpecker/push/lint Pipeline is pending
ci/woodpecker/push/test Pipeline is pending
ci/woodpecker/pr/build-amd64 Pipeline is pending
ci/woodpecker/pr/build-arm64 Pipeline is pending
ci/woodpecker/pr/docs Pipeline is pending
ci/woodpecker/pr/lint Pipeline is pending
ci/woodpecker/pr/test Pipeline is pending

This commit is contained in:
Floatingghost 2024-05-06 23:34:48 +01:00
parent 010e8c7bb2
commit bd74693db6
2 changed files with 136 additions and 44 deletions

View file

@ -5,66 +5,114 @@ defmodule Pleroma.HTTP.Backoff do
@cachex Pleroma.Config.get([:cachex, :provider], Cachex) @cachex Pleroma.Config.get([:cachex, :provider], Cachex)
@backoff_cache :http_backoff_cache @backoff_cache :http_backoff_cache
# attempt to parse a timestamp from a header
# returns nil if it can't parse the timestamp
@spec timestamp_or_nil(binary) :: DateTime.t() | nil
defp timestamp_or_nil(header) do
case DateTime.from_iso8601(header) do
{:ok, stamp, _} ->
stamp
_ ->
nil
end
end
# attempt to parse the x-ratelimit-reset header from the headers
@spec x_ratelimit_reset(headers :: list) :: DateTime.t() | nil
defp x_ratelimit_reset(headers) do
with {_header, value} <- List.keyfind(headers, "x-ratelimit-reset", 0),
true <- is_binary(value) do
timestamp_or_nil(value)
else
_ ->
nil
end
end
# attempt to parse the Retry-After header from the headers
# this can be either a timestamp _or_ a number of seconds to wait!
# we'll return a datetime if we can parse it, or nil if we can't
@spec retry_after(headers :: list) :: DateTime.t() | nil
defp retry_after(headers) do
with {_header, value} <- List.keyfind(headers, "retry-after", 0),
true <- is_binary(value) do
# first, see if it's an integer
case Integer.parse(value) do
{seconds, ""} ->
Logger.debug("Parsed Retry-After header: #{seconds} seconds")
DateTime.utc_now() |> Timex.shift(seconds: seconds)
_ ->
# if it's not an integer, try to parse it as a timestamp
timestamp_or_nil(value)
end
else
_ ->
nil
end
end
# given a set of headers, will attempt to find the next backoff timestamp
# if it can't find one, it will default to 5 minutes from now
@spec next_backoff_timestamp(%{headers: list}) :: DateTime.t()
defp next_backoff_timestamp(%{headers: headers}) when is_list(headers) do defp next_backoff_timestamp(%{headers: headers}) when is_list(headers) do
# figure out from the 429 response when we can make the next request
# mastodon uses the x-ratelimit-reset header, so we will use that!
# other servers may not, so we'll default to 5 minutes from now if we can't find it
default_5_minute_backoff = default_5_minute_backoff =
DateTime.utc_now() DateTime.utc_now()
|> Timex.shift(seconds: 5 * 60) |> Timex.shift(seconds: 5 * 60)
case Enum.find_value(headers, fn {"x-ratelimit-reset", value} -> value end) do backoff =
nil -> [&x_ratelimit_reset/1, &retry_after/1]
Logger.error( |> Enum.map(& &1.(headers))
"Rate limited, but couldn't find timestamp! Using default 5 minute backoff until #{default_5_minute_backoff}" |> Enum.find(&(&1 != nil))
)
default_5_minute_backoff if is_nil(backoff) do
Logger.debug("No backoff headers found, defaulting to 5 minutes from now")
value -> default_5_minute_backoff
with {:ok, stamp, _} <- DateTime.from_iso8601(value) do else
Logger.error("Rate limited until #{stamp}") Logger.debug("Found backoff header, will back off until: #{backoff}")
stamp backoff
else
_ ->
Logger.error(
"Rate limited, but couldn't parse timestamp! Using default 5 minute backoff until #{default_5_minute_backoff}"
)
default_5_minute_backoff
end
end end
end end
defp next_backoff_timestamp(_), do: DateTime.utc_now() |> Timex.shift(seconds: 5 * 60) defp next_backoff_timestamp(_), do: DateTime.utc_now() |> Timex.shift(seconds: 5 * 60)
# utility function to check the HTTP response for potential backoff headers
# will check if we get a 429 or 503 response, and if we do, will back off for a bit
@spec check_backoff({:ok | :error, HTTP.Env.t()}, binary()) ::
{:ok | :error, HTTP.Env.t()} | {:error, :ratelimit}
defp check_backoff({:ok, env}, host) do
case env.status do
status when status in [429, 503] ->
Logger.error("Rate limited on #{host}! Backing off...")
timestamp = next_backoff_timestamp(env)
ttl = Timex.diff(timestamp, DateTime.utc_now(), :seconds)
# we will cache the host for 5 minutes
@cachex.put(@backoff_cache, host, true, ttl: ttl)
{:error, :ratelimit}
_ ->
{:ok, env}
end
end
defp check_backoff(env, _), do: env
@doc """
this acts as a single throughput for all GET requests
we will check if the host is in the cache, and if it is, we will automatically fail the request
this ensures that we don't hammer the server with requests, and instead wait for the backoff to expire
this is a very simple implementation, and can be improved upon!
"""
@spec get(binary, list, list) :: {:ok | :error, HTTP.Env.t()} | {:error, :ratelimit}
def get(url, headers \\ [], options \\ []) do def get(url, headers \\ [], options \\ []) do
# this acts as a single throughput for all GET requests
# we will check if the host is in the cache, and if it is, we will automatically fail the request
# this ensures that we don't hammer the server with requests, and instead wait for the backoff to expire
# this is a very simple implementation, and can be improved upon!
%{host: host} = URI.parse(url) %{host: host} = URI.parse(url)
case @cachex.get(@backoff_cache, host) do case @cachex.get(@backoff_cache, host) do
{:ok, nil} -> {:ok, nil} ->
case HTTP.get(url, headers, options) do url
{:ok, env} -> |> HTTP.get(headers, options)
case env.status do |> check_backoff(host)
429 ->
Logger.error("Rate limited on #{host}! Backing off...")
timestamp = next_backoff_timestamp(env)
ttl = Timex.diff(timestamp, DateTime.utc_now(), :seconds)
# we will cache the host for 5 minutes
@cachex.put(@backoff_cache, host, true, ttl: ttl)
{:error, :ratelimit}
_ ->
{:ok, env}
end
{:error, env} ->
{:error, env}
end
_ -> _ ->
{:error, :ratelimit} {:error, :ratelimit}

View file

@ -3,6 +3,10 @@ defmodule Pleroma.HTTP.BackoffTest do
use Pleroma.DataCase, async: false use Pleroma.DataCase, async: false
alias Pleroma.HTTP.Backoff alias Pleroma.HTTP.Backoff
defp within_tolerance?(ttl, expected) do
ttl > expected - 10 and ttl < expected + 10
end
describe "get/3" do describe "get/3" do
test "should return {:ok, env} when not rate limited" do test "should return {:ok, env} when not rate limited" do
Tesla.Mock.mock_global(fn Tesla.Mock.mock_global(fn
@ -46,6 +50,46 @@ test "should parse the value of x-ratelimit-reset, if present" do
assert {:error, :ratelimit} = Backoff.get("https://ratelimited.dev/api/v1/instance") assert {:error, :ratelimit} = Backoff.get("https://ratelimited.dev/api/v1/instance")
assert {:ok, true} = Cachex.get(@backoff_cache, "ratelimited.dev") assert {:ok, true} = Cachex.get(@backoff_cache, "ratelimited.dev")
{:ok, ttl} = Cachex.ttl(@backoff_cache, "ratelimited.dev")
assert within_tolerance?(ttl, 600)
end
test "should parse the value of retry-after when it's a timestamp" do
ten_minutes_from_now =
DateTime.utc_now() |> Timex.shift(minutes: 10) |> DateTime.to_iso8601()
Tesla.Mock.mock_global(fn
%Tesla.Env{url: "https://ratelimited.dev/api/v1/instance"} ->
{:ok,
%Tesla.Env{
status: 429,
body: "Rate limited",
headers: [{"retry-after", ten_minutes_from_now}]
}}
end)
assert {:error, :ratelimit} = Backoff.get("https://ratelimited.dev/api/v1/instance")
assert {:ok, true} = Cachex.get(@backoff_cache, "ratelimited.dev")
{:ok, ttl} = Cachex.ttl(@backoff_cache, "ratelimited.dev")
assert within_tolerance?(ttl, 600)
end
test "should parse the value of retry-after when it's a number of seconds" do
Tesla.Mock.mock_global(fn
%Tesla.Env{url: "https://ratelimited.dev/api/v1/instance"} ->
{:ok,
%Tesla.Env{
status: 429,
body: "Rate limited",
headers: [{"retry-after", "600"}]
}}
end)
assert {:error, :ratelimit} = Backoff.get("https://ratelimited.dev/api/v1/instance")
assert {:ok, true} = Cachex.get(@backoff_cache, "ratelimited.dev")
# assert that the value is 10 minutes from now
{:ok, ttl} = Cachex.ttl(@backoff_cache, "ratelimited.dev")
assert within_tolerance?(ttl, 600)
end end
end end
end end