Remove ETS backend, add max_retries, add options to limit/3.
This commit is contained in:
parent
9f95ba8f65
commit
fb0bcf9e8d
1 changed files with 34 additions and 70 deletions
|
@ -6,36 +6,32 @@ defmodule ConcurrentLimiter do
|
|||
|
||||
A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes.
|
||||
|
||||
It can be useful in cases where you don't need a worker pool but still being able to limit concurrent calls without much overhead. As it internally uses `persistent_term` to store metadata, and can fallback to ETS tables, it is however not made for a large number of limiters and cannot be used for things like a per-user rate limiter.
|
||||
|
||||
It supports two storage methods:
|
||||
|
||||
* **[atomics](https://erlang.org/doc/man/atomics.html)** recommended and default if your OTP is > 21.2.
|
||||
* **[ets](https://erlang.org/doc/man/ets.html)** either with a single table per limiter (faster) or a shared table.
|
||||
|
||||
You would almost always want to use atomics, ets is mostly there for backwards compatibility.
|
||||
It can be useful in cases where you don't need a worker pool but still being able to limit concurrent calls without much overhead.
|
||||
As it internally uses `persistent_term` to store metadata, and can fallback to ETS tables, it is however not made for a large number
|
||||
of limiters and cannot be used for things like a per-user rate limiter.
|
||||
"""
|
||||
|
||||
@doc """
|
||||
Initializes a `ConcurrentLimiter`.
|
||||
"""
|
||||
|
||||
@default_wait 150
|
||||
@default_max_retries 5
|
||||
|
||||
@spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing} when name: atom(),
|
||||
max_running: non_neg_integer(),
|
||||
max_waiting: non_neg_integer() | :infinity,
|
||||
options: [option],
|
||||
option: {:wait, non_neg_integer()} | backend,
|
||||
backend: :atomics | ets_backend,
|
||||
ets_backend: :ets | {:ets, atom()} | {:ets, atom(), ets_options :: []}
|
||||
option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
|
||||
def new(name, max_running, max_waiting, options \\ []) do
|
||||
name = prefix_name(name)
|
||||
if defined?(name) do
|
||||
{:error, :existing}
|
||||
else
|
||||
wait = Keyword.get(options, :wait, 150)
|
||||
backend = Keyword.get(options, :backend, default_backend())
|
||||
{:ok, backend} = setup_backend(backend)
|
||||
:persistent_term.put(name, {__MODULE__, max_running, max_waiting, backend, wait})
|
||||
wait = Keyword.get(options, :wait, @default_wait)
|
||||
max_retries = Keyword.get(options, :max_retries, @default_max_retries)
|
||||
atomics = :atomics.new(1, [signed: true])
|
||||
:persistent_term.put(name, {__MODULE__, max_running, max_waiting, atomics, wait, max_retries})
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
@ -50,8 +46,9 @@ defmodule ConcurrentLimiter do
|
|||
name = prefix_name(name)
|
||||
if defined?(name) do
|
||||
new_wait = Keyword.get(options, :wait)
|
||||
{__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name)
|
||||
new = {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, backend, new_wait || wait}
|
||||
new_max_retries = Keyword.get(options, :max_retries)
|
||||
{__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name)
|
||||
new = {__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, ref, new_wait || wait, new_max_retries || max_retries}
|
||||
:persistent_term.put(name, new)
|
||||
:ok
|
||||
else
|
||||
|
@ -59,53 +56,51 @@ defmodule ConcurrentLimiter do
|
|||
end
|
||||
end
|
||||
|
||||
@spec limit(atom(), function()) :: {:error, :overload} | any()
|
||||
@spec limit(atom(), function(), opts) :: {:error, :overload} | any() when opts: [option],
|
||||
option: {:wait, non_neg_integer()} | {:max_retries, non_neg_integer()}
|
||||
@doc "Limits invocation of `fun`."
|
||||
def limit(name, fun) do
|
||||
do_limit(prefix_name(name), fun)
|
||||
def limit(name, fun, opts \\ []) do
|
||||
do_limit(prefix_name(name), fun, opts, 0)
|
||||
end
|
||||
|
||||
defp do_limit(name, fun) do
|
||||
{__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name)
|
||||
defp do_limit(name, fun, opts, retries) do
|
||||
{__MODULE__, max_running, max_waiting, ref, wait, max_retries} = :persistent_term.get(name)
|
||||
max = max_running + max_waiting
|
||||
counter = inc(backend, name)
|
||||
counter = inc(ref, name)
|
||||
max_retries = Keyword.get(opts, :max_retries) || max_retries
|
||||
|
||||
cond do
|
||||
counter <= max_running ->
|
||||
try do
|
||||
fun.()
|
||||
after
|
||||
dec(backend, name)
|
||||
dec(ref, name)
|
||||
end
|
||||
|
||||
counter > max ->
|
||||
dec(backend, name)
|
||||
dec(ref, name)
|
||||
{:error, :overload}
|
||||
|
||||
retries + 1 > max_retries ->
|
||||
{:error, :overload}
|
||||
|
||||
counter > max_running ->
|
||||
wait(backend, name, wait, fun)
|
||||
wait(ref, name, wait, fun, opts, max_retries, retries + 1)
|
||||
end
|
||||
end
|
||||
|
||||
defp wait(backend, name, wait, fun) do
|
||||
defp wait(ref, name, wait, fun, opts, max_retries, retries) do
|
||||
wait = Keyword.get(opts, :timeout) || wait
|
||||
Process.sleep(wait)
|
||||
dec(backend, name)
|
||||
do_limit(name, fun)
|
||||
dec(ref, name)
|
||||
do_limit(name, fun, opts, retries)
|
||||
end
|
||||
|
||||
defp inc({:ets, ets}, name) do
|
||||
:ets.update_counter(ets, name, {2, 1}, {name, 0})
|
||||
end
|
||||
|
||||
defp inc({:atomics, ref}, _) do
|
||||
defp inc(ref, _) do
|
||||
:atomics.add_get(ref, 1, 1)
|
||||
end
|
||||
|
||||
defp dec({:ets, ets}, name) do
|
||||
:ets.update_counter(ets, name, {2, -1}, {name, 0})
|
||||
end
|
||||
|
||||
defp dec({:atomics, ref}, _) do
|
||||
defp dec(ref, _) do
|
||||
:atomics.sub_get(ref, 1, 1)
|
||||
end
|
||||
|
||||
|
@ -118,35 +113,4 @@ defmodule ConcurrentLimiter do
|
|||
_ -> false
|
||||
end
|
||||
|
||||
defp default_backend() do
|
||||
if Code.ensure_loaded?(:atomics) do
|
||||
:atomics
|
||||
else
|
||||
Logger.debug("ConcurrentLimiter: atomics not available, using ETS backend")
|
||||
:ets
|
||||
end
|
||||
end
|
||||
|
||||
defp setup_backend(:ets) do
|
||||
setup_backend({:ets, ETS})
|
||||
end
|
||||
|
||||
defp setup_backend({:ets, name}) do
|
||||
setup_backend({:ets, name, [{:write_concurrency, true}, {:read_concurrency, true}]})
|
||||
end
|
||||
|
||||
defp setup_backend({:ets, name, options}) do
|
||||
ets_name = prefix_name(name)
|
||||
|
||||
case :ets.whereis(ets_name) do
|
||||
:undefined -> :ets.new(ets_name, [:public, :named_table] ++ options)
|
||||
_ -> nil
|
||||
end
|
||||
{:ok, {:ets, ets_name}}
|
||||
end
|
||||
|
||||
defp setup_backend(:atomics) do
|
||||
{:ok, {:atomics, :atomics.new(1, [signed: true])}}
|
||||
end
|
||||
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue