Rename to ConcurrentLimiter

This commit is contained in:
Jordan Bracco 2020-05-14 22:14:09 +02:00
parent f5ad9dcd31
commit 9f95ba8f65
8 changed files with 132 additions and 111 deletions

View file

@ -1,3 +1,3 @@
# Limiter
# Concurrent Limiter
See the docs in `lib/limiter.ex`.
See the docs in `lib/concurrent_limiter.ex`.

View file

@ -1,21 +1,23 @@
defmodule Limiter do
defmodule ConcurrentLimiter do
require Logger
@moduledoc """
# Limiter
# Concurrent Limiter
A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes.
It can be useful in cases where you don't need a worker pool but still being able to limit concurrent calls without much overhead. As it internally uses `persistent_term` to store metadata, and can fallback to ETS tables, it is however not made for a large number of limiters and cannot be used for things like a per-user rate limiter.
It supports two storage methods:
* **[atomics](https://erlang.org/doc/man/atomics.html)** recommended and default if your OTP is > 21.2.
* **[ets](https://erlang.org/doc/man/ets.html)** either with a single table per Limiter (faster) or a shared table.
* **[ets](https://erlang.org/doc/man/ets.html)** either with a single table per limiter (faster) or a shared table.
You would however always want to use atomics, ets is mostly there for backwards compatibility.
You would almost always want to use atomics, ets is mostly there for backwards compatibility.
"""
@doc """
Initializes a `Limiter`.
Initializes a `ConcurrentLimiter`.
"""
@spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing} when name: atom(),
@ -24,9 +26,9 @@ defmodule Limiter do
options: [option],
option: {:wait, non_neg_integer()} | backend,
backend: :atomics | ets_backend,
ets_backend: :ets | {:ets, atom()} | {:ets, ets_name :: atom(), ets_options :: []}
ets_backend: :ets | {:ets, atom()} | {:ets, atom(), ets_options :: []}
def new(name, max_running, max_waiting, options \\ []) do
name = atom_name(name)
name = prefix_name(name)
if defined?(name) do
{:error, :existing}
else
@ -45,7 +47,7 @@ defmodule Limiter do
option: {:wait, non_neg_integer()}
@doc "Adjust the limiter limits at runtime"
def set(name, new_max_running, new_max_waiting, options \\ []) do
name = atom_name(name)
name = prefix_name(name)
if defined?(name) do
new_wait = Keyword.get(options, :wait)
{__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name)
@ -60,7 +62,7 @@ defmodule Limiter do
@spec limit(atom(), function()) :: {:error, :overload} | any()
@doc "Limits invocation of `fun`."
def limit(name, fun) do
do_limit(atom_name(name), fun)
do_limit(prefix_name(name), fun)
end
defp do_limit(name, fun) do
@ -107,7 +109,7 @@ defmodule Limiter do
:atomics.sub_get(ref, 1, 1)
end
defp atom_name(suffix), do: Module.concat(__MODULE__, suffix)
defp prefix_name(suffix), do: Module.concat(__MODULE__, suffix)
defp defined?(name) do
{__MODULE__, _, _, _, _, _} = :persistent_term.get(name)
@ -120,7 +122,7 @@ defmodule Limiter do
if Code.ensure_loaded?(:atomics) do
:atomics
else
Logger.debug("Limiter: atomics not available, using ETS backend")
Logger.debug("ConcurrentLimiter: atomics not available, using ETS backend")
:ets
end
end
@ -134,7 +136,7 @@ defmodule Limiter do
end
defp setup_backend({:ets, name, options}) do
ets_name = atom_name(name)
ets_name = prefix_name(name)
case :ets.whereis(ets_name) do
:undefined -> :ets.new(ets_name, [:public, :named_table] ++ options)

View file

@ -1,9 +1,9 @@
defmodule Limiter.MixProject do
defmodule ConcurrentLimiter.MixProject do
use Mix.Project
def project do
[
app: :limiter,
app: :concurrent_limiter,
version: "0.1.0",
elixir: "~> 1.10",
start_permanent: Mix.env() == :prod,

View file

@ -1,16 +1,16 @@
defmodule LimiterTest do
defmodule ConcurrentLimiterTest do
use ExUnit.Case
doctest Limiter
doctest ConcurrentLimiter
test "limiter ets is atomic" do
name = "test1"
Limiter.new(name, 2, 2)
ConcurrentLimiter.new(name, 2, 2)
atomic_test(name)
end
test "limiter atomics is atomic" do
name = "test2"
Limiter.new(name, 2, 2, backend: :atomics)
ConcurrentLimiter.new(name, 2, 2, backend: :atomics)
atomic_test(name)
end
@ -18,7 +18,7 @@ defmodule LimiterTest do
self = self()
sleepy = fn sleep ->
case Limiter.limit(name, fn ->
case ConcurrentLimiter.limit(name, fn ->
send(self, :ok)
Process.sleep(sleep)
:ok

View file

@ -1,24 +1,24 @@
infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000
Limiter.new(:bench, infinite, 0)
Limiter.new(:bench_s, infinite, 0, ets: LimiterTest)
ConcurrentLimiter.new(:bench, infinite, 0)
ConcurrentLimiter.new(:bench_s, infinite, 0, ets: ConcurrentLimiterTest)
concurrent = [{:read_concurrency, true}, {:write_concurrency, true}]
Limiter.new(:bench_rw, infinite, 0)
Limiter.new(:bench_s_rw, infinite, 0, ets: LimiterTest, ets_opts: concurrent)
ConcurrentLimiter.new(:bench_rw, infinite, 0)
ConcurrentLimiter.new(:bench_s_rw, infinite, 0, ets: ConcurrentLimiterTest, ets_opts: concurrent)
single = %{
"Limiter.limit/2" => fn ->
Limiter.limit(:bench, fn -> :ok end)
"ConcurrentLimiter.limit/2" => fn ->
ConcurrentLimiter.limit(:bench, fn -> :ok end)
end,
"Limiter.limit/2 with concurrency" => fn ->
Limiter.limit(:bench_rw, fn -> :ok end)
"ConcurrentLimiter.limit/2 with concurrency" => fn ->
ConcurrentLimiter.limit(:bench_rw, fn -> :ok end)
end,
"Limiter:limit/2 with shared ets" => fn ->
Limiter.limit(:bench_s, fn -> :ok end)
"ConcurrentLimiter:limit/2 with shared ets" => fn ->
ConcurrentLimiter.limit(:bench_s, fn -> :ok end)
end,
"Limiter:limit/2 with shared ets and concurrency" => fn ->
Limiter.limit(:bench_s_rw, fn -> :ok end)
"ConcurrentLimiter:limit/2 with shared ets and concurrency" => fn ->
ConcurrentLimiter.limit(:bench_s_rw, fn -> :ok end)
end
}

View file

@ -1,52 +1,70 @@
infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000
parallel = case Integer.parse(System.get_env("PARALLEL", "")) do
{int, _} -> int
_ -> System.schedulers_online()/2
end
Limiter.new(:bench_u_0, infinite, 0, backend: {:ets, LimiterTest0, []})
Limiter.new(:bench_u_1, infinite, 0, backend: {:ets, LimiterTest1, []})
Limiter.new(:bench_u_2, infinite, 0, backend: {:ets, LimiterTest2, []})
Limiter.new(:bench_u_3, infinite, 0, backend: {:ets, LimiterTest3, []})
multi_count = case Integer.parse(System.get_env("MULTI", "")) do
{int, _} -> int
_ -> parallel
end
Limiter.new(:bench_a_0, infinite, 0, backend: :atomics)
Limiter.new(:bench_a_1, infinite, 0, backend: :atomics)
Limiter.new(:bench_a_2, infinite, 0, backend: :atomics)
Limiter.new(:bench_a_3, infinite, 0, backend: :atomics)
names = fn(prefix) ->
for i <- 1..multi_count do
Module.concat(MultiConcurrentLimiterBenchmark, "#{prefix}#{i}")
end
end
Limiter.new(:bench_s_0, infinite, 0, backend: {:ets, LimiterTest, []})
Limiter.new(:bench_s_1, infinite, 0, backend: {:ets, LimiterTest, []})
Limiter.new(:bench_s_2, infinite, 0, backend: {:ets, LimiterTest, []})
Limiter.new(:bench_s_3, infinite, 0, backend: {:ets, LimiterTest, []})
bench_unique = for name <- names.("u") do
ConcurrentLimiter.new(name, infinite, 0, backend: {:ets, name, []})
name
end
IO.inspect(bench_unique)
bench_atomics = for name <- names.("a") do
ConcurrentLimiter.new(name, infinite, 0, backend: :atomics)
name
end
bench_shared = for name <- names.("s") do
ConcurrentLimiter.new(name, infinite, 0, backend: {:ets, ConcurrentLimiterTest, []})
name
end
rw = [{:read_concurrency, true}, {:write_concurrency, true}]
Limiter.new(:bench_u_rw0, infinite, 0, backend: {:ets, LimiterTestRW0, rw})
Limiter.new(:bench_u_rw1, infinite, 0, backend: {:ets, LimiterTestRW1, rw})
Limiter.new(:bench_u_rw2, infinite, 0, backend: {:ets, LimiterTestRW2, rw})
Limiter.new(:bench_u_rw3, infinite, 0, backend: {:ets, LimiterTestRW3, rw})
bench_unique_rw = for name <- names.("u_rw") do
ConcurrentLimiter.new(name, infinite, 0, backend: {:ets, name, rw})
name
end
Limiter.new(:bench_s_rw0, infinite, 0, backend: {:ets, LimiterTestRW, rw})
Limiter.new(:bench_s_rw1, infinite, 0, backend: {:ets, LimiterTestRW, rw})
Limiter.new(:bench_s_rw2, infinite, 0, backend: {:ets, LimiterTestRW, rw})
Limiter.new(:bench_s_rw3, infinite, 0, backend: {:ets, LimiterTestRW, rw})
bench_shared_rw = for name <- names.("s_rw") do
ConcurrentLimiter.new(name, infinite, 0, backend: {:ets, ConcurrentLimiterTestRW, rw})
name
end
multiple = %{
"Limiter.limit/2 unique ets" => fn ->
limiter = Enum.random([:bench_u_0, :bench_u_1, :bench_u_2, :bench_u_3])
Limiter.limit(limiter, fn -> :ok end)
"ConcurrentLimiter.limit/2 unique ets" => fn ->
limiter = Enum.random(bench_unique)
ConcurrentLimiter.limit(limiter, fn -> :ok end)
end,
"Limiter:limit/2 shared ets" => fn ->
limiter = Enum.random([:bench_s_0, :bench_s_1, :bench_s_2, :bench_s_3])
Limiter.limit(limiter, fn -> :ok end)
"ConcurrentLimiter:limit/2 shared ets" => fn ->
limiter = Enum.random(bench_shared)
ConcurrentLimiter.limit(limiter, fn -> :ok end)
end,
"Limiter.limit/2 unique ets, concurrency" => fn ->
limiter = Enum.random([:bench_u_rw0, :bench_u_rw1, :bench_u_rw2, :bench_u_rw3])
Limiter.limit(limiter, fn -> :ok end)
"ConcurrentLimiter.limit/2 unique ets, concurrency" => fn ->
limiter = Enum.random(bench_unique_rw)
ConcurrentLimiter.limit(limiter, fn -> :ok end)
end,
"Limiter:limit/2 shared ets, concurrency" => fn ->
limiter = Enum.random([:bench_s_rw0, :bench_s_rw1, :bench_s_rw2, :bench_s_rw3])
Limiter.limit(limiter, fn -> :ok end)
"ConcurrentLimiter:limit/2 shared ets, concurrency" => fn ->
limiter = Enum.random(bench_shared_rw)
ConcurrentLimiter.limit(limiter, fn -> :ok end)
end,
"Limiter:limit/2 atomics" => fn ->
limiter = Enum.random([:bench_a_0, :bench_a_1, :bench_a_2, :bench_a_3])
Limiter.limit(limiter, fn -> :ok end)
"ConcurrentLimiter:limit/2 atomics" => fn ->
limiter = Enum.random(bench_atomics)
ConcurrentLimiter.limit(limiter, fn -> :ok end)
end
}

View file

@ -22,25 +22,25 @@ parallel: 1
inputs: none specified
Estimated total run time: 35 s
Benchmarking Limiter.limit/2 unique ets...
Benchmarking Limiter.limit/2 unique ets, concurrency...
Benchmarking Limiter:limit/2 atomics...
Benchmarking Limiter:limit/2 shared ets...
Benchmarking Limiter:limit/2 shared ets, concurrency...
Benchmarking ConcurrentLimiter.limit/2 unique ets...
Benchmarking ConcurrentLimiter.limit/2 unique ets, concurrency...
Benchmarking ConcurrentLimiter:limit/2 atomics...
Benchmarking ConcurrentLimiter:limit/2 shared ets...
Benchmarking ConcurrentLimiter:limit/2 shared ets, concurrency...
Name ips average deviation median 99th %
Limiter:limit/2 atomics 491.88 K 2.03 μs ±1506.30% 1.55 μs 3.48 μs
Limiter.limit/2 unique ets 414.63 K 2.41 μs ±1169.34% 1.97 μs 4.53 μs
Limiter:limit/2 shared ets 411.43 K 2.43 μs ±1286.95% 1.96 μs 3.66 μs
Limiter.limit/2 unique ets, concurrency 406.50 K 2.46 μs ±1006.31% 2.06 μs 4.34 μs
Limiter:limit/2 shared ets, concurrency 384.04 K 2.60 μs ±1293.25% 2.12 μs 4.37 μs
ConcurrentLimiter:limit/2 atomics 491.88 K 2.03 μs ±1506.30% 1.55 μs 3.48 μs
ConcurrentLimiter.limit/2 unique ets 414.63 K 2.41 μs ±1169.34% 1.97 μs 4.53 μs
ConcurrentLimiter:limit/2 shared ets 411.43 K 2.43 μs ±1286.95% 1.96 μs 3.66 μs
ConcurrentLimiter.limit/2 unique ets, concurrency 406.50 K 2.46 μs ±1006.31% 2.06 μs 4.34 μs
ConcurrentLimiter:limit/2 shared ets, concurrency 384.04 K 2.60 μs ±1293.25% 2.12 μs 4.37 μs
Comparison:
Limiter:limit/2 atomics 491.88 K
Limiter.limit/2 unique ets 414.63 K - 1.19x slower +0.38 μs
Limiter:limit/2 shared ets 411.43 K - 1.20x slower +0.40 μs
Limiter.limit/2 unique ets, concurrency 406.50 K - 1.21x slower +0.43 μs
Limiter:limit/2 shared ets, concurrency 384.04 K - 1.28x slower +0.57 μs
ConcurrentLimiter:limit/2 atomics 491.88 K
ConcurrentLimiter.limit/2 unique ets 414.63 K - 1.19x slower +0.38 μs
ConcurrentLimiter:limit/2 shared ets 411.43 K - 1.20x slower +0.40 μs
ConcurrentLimiter.limit/2 unique ets, concurrency 406.50 K - 1.21x slower +0.43 μs
ConcurrentLimiter:limit/2 shared ets, concurrency 384.04 K - 1.28x slower +0.57 μs
@ -65,22 +65,22 @@ parallel: 8
inputs: none specified
Estimated total run time: 35 s
Benchmarking Limiter.limit/2 unique ets...
Benchmarking Limiter.limit/2 unique ets, concurrency...
Benchmarking Limiter:limit/2 atomics...
Benchmarking Limiter:limit/2 shared ets...
Benchmarking Limiter:limit/2 shared ets, concurrency...
Benchmarking ConcurrentLimiter.limit/2 unique ets...
Benchmarking ConcurrentLimiter.limit/2 unique ets, concurrency...
Benchmarking ConcurrentLimiter:limit/2 atomics...
Benchmarking ConcurrentLimiter:limit/2 shared ets...
Benchmarking ConcurrentLimiter:limit/2 shared ets, concurrency...
Name ips average deviation median 99th %
Limiter:limit/2 atomics 307.84 K 3.25 μs ±1113.62% 2.09 μs 10.24 μs
Limiter.limit/2 unique ets, concurrency 95.56 K 10.46 μs ±391.37% 2.93 μs 163.02 μs
Limiter:limit/2 shared ets, concurrency 92.39 K 10.82 μs ±374.36% 2.92 μs 158.97 μs
Limiter.limit/2 unique ets 80.68 K 12.39 μs ±362.74% 2.85 μs 160.66 μs
Limiter:limit/2 shared ets 6.04 K 165.66 μs ±17.23% 167.48 μs 237.96 μs
ConcurrentLimiter:limit/2 atomics 307.84 K 3.25 μs ±1113.62% 2.09 μs 10.24 μs
ConcurrentLimiter.limit/2 unique ets, concurrency 95.56 K 10.46 μs ±391.37% 2.93 μs 163.02 μs
ConcurrentLimiter:limit/2 shared ets, concurrency 92.39 K 10.82 μs ±374.36% 2.92 μs 158.97 μs
ConcurrentLimiter.limit/2 unique ets 80.68 K 12.39 μs ±362.74% 2.85 μs 160.66 μs
ConcurrentLimiter:limit/2 shared ets 6.04 K 165.66 μs ±17.23% 167.48 μs 237.96 μs
Comparison:
Limiter:limit/2 atomics 307.84 K
Limiter.limit/2 unique ets, concurrency 95.56 K - 3.22x slower +7.22 μs
Limiter:limit/2 shared ets, concurrency 92.39 K - 3.33x slower +7.57 μs
Limiter.limit/2 unique ets 80.68 K - 3.82x slower +9.15 μs
Limiter:limit/2 shared ets 6.04 K - 51.00x slower +162.41 μs
ConcurrentLimiter:limit/2 atomics 307.84 K
ConcurrentLimiter.limit/2 unique ets, concurrency 95.56 K - 3.22x slower +7.22 μs
ConcurrentLimiter:limit/2 shared ets, concurrency 92.39 K - 3.33x slower +7.57 μs
ConcurrentLimiter.limit/2 unique ets 80.68 K - 3.82x slower +9.15 μs
ConcurrentLimiter:limit/2 shared ets 6.04 K - 51.00x slower +162.41 μs

View file

@ -1,19 +1,20 @@
:ets.new(:limiter_bench, [:public, :named_table])
:ets.new(:limiter_bench_concurrent, [:public, :named_table, {:read_concurrency, false}, {:write_concurrency, true}])
atomics = :atomics.new(1, [])
Benchee.run(
update_counter =
%{
"ets:update_counter" => fn ->
:ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0})
end
},
parallel: 1
)
Benchee.run(
%{
"ets:update_counter" => fn ->
end,
"ets:update_counter concurrent" => fn ->
:ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0})
end
},
parallel: System.schedulers_online()
)
end,
"atomics:add_get" => fn ->
:atomics.add_get(atomics, 1, 1)
end,
}
Benchee.run(update_counter, parallel: 1)
Benchee.run(update_counter, parallel: System.schedulers_online())