forked from AkkomaGang/akkoma
rate limiter: Fix a race condition
When multiple requests are processed by rate limiter plug at the same time and the bucket is not yet initialized, both would try to initialize the bucket resulting in an internal server error.
This commit is contained in:
parent
df2173343a
commit
4d416343fa
3 changed files with 49 additions and 7 deletions
|
@ -7,8 +7,8 @@ def start_link(init_arg) do
|
|||
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
|
||||
end
|
||||
|
||||
def add_limiter(limiter_name, expiration) do
|
||||
{:ok, _pid} =
|
||||
def add_or_return_limiter(limiter_name, expiration) do
|
||||
result =
|
||||
DynamicSupervisor.start_child(
|
||||
__MODULE__,
|
||||
%{
|
||||
|
@ -28,6 +28,12 @@ def add_limiter(limiter_name, expiration) do
|
|||
]}
|
||||
}
|
||||
)
|
||||
|
||||
case result do
|
||||
{:ok, _pid} = result -> result
|
||||
{:error, {:already_started, pid}} -> {:ok, pid}
|
||||
_ -> result
|
||||
end
|
||||
end
|
||||
|
||||
@impl true
|
||||
|
|
|
@ -171,7 +171,7 @@ defp check_rate(action_settings) do
|
|||
{:error, value}
|
||||
|
||||
{:error, :no_cache} ->
|
||||
initialize_buckets(action_settings)
|
||||
initialize_buckets!(action_settings)
|
||||
check_rate(action_settings)
|
||||
end
|
||||
end
|
||||
|
@ -250,11 +250,16 @@ defp attach_selected_params(input, %{conn_params: conn_params, opts: plug_opts})
|
|||
|> String.replace_leading(":", "")
|
||||
end
|
||||
|
||||
defp initialize_buckets(%{name: _name, limits: nil}), do: :ok
|
||||
defp initialize_buckets!(%{name: _name, limits: nil}), do: :ok
|
||||
|
||||
defp initialize_buckets(%{name: name, limits: limits}) do
|
||||
LimiterSupervisor.add_limiter(anon_bucket_name(name), get_scale(:anon, limits))
|
||||
LimiterSupervisor.add_limiter(user_bucket_name(name), get_scale(:user, limits))
|
||||
defp initialize_buckets!(%{name: name, limits: limits}) do
|
||||
{:ok, _pid} =
|
||||
LimiterSupervisor.add_or_return_limiter(anon_bucket_name(name), get_scale(:anon, limits))
|
||||
|
||||
{:ok, _pid} =
|
||||
LimiterSupervisor.add_or_return_limiter(user_bucket_name(name), get_scale(:user, limits))
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
defp attach_identity(base, %{mode: :user, conn_info: conn_info}),
|
||||
|
|
|
@ -242,4 +242,35 @@ test "different users are counted independently" do
|
|||
refute conn_2.halted
|
||||
end
|
||||
end
|
||||
|
||||
test "doesn't crash due to a race condition when multiple requests are made at the same time and the bucket is not yet initialized" do
|
||||
limiter_name = :test_race_condition
|
||||
Pleroma.Config.put([:rate_limit, limiter_name], {1000, 5})
|
||||
Pleroma.Config.put([Pleroma.Web.Endpoint, :http, :ip], {8, 8, 8, 8})
|
||||
|
||||
opts = RateLimiter.init(name: limiter_name)
|
||||
|
||||
conn = conn(:get, "/")
|
||||
conn_2 = conn(:get, "/")
|
||||
|
||||
%Task{pid: pid1} =
|
||||
task1 =
|
||||
Task.async(fn ->
|
||||
receive do
|
||||
:process2_up ->
|
||||
RateLimiter.call(conn, opts)
|
||||
end
|
||||
end)
|
||||
|
||||
task2 =
|
||||
Task.async(fn ->
|
||||
send(pid1, :process2_up)
|
||||
RateLimiter.call(conn_2, opts)
|
||||
end)
|
||||
|
||||
Task.await(task1)
|
||||
Task.await(task2)
|
||||
|
||||
refute {:err, :not_found} == RateLimiter.inspect_bucket(conn, limiter_name, opts)
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue