Switch to atomics, add shared ets, ..

This commit is contained in:
Jordan Bracco 2020-05-08 20:10:33 +02:00
parent 554524db9b
commit b541ef9674
9 changed files with 340 additions and 64 deletions

3
README.md Normal file
View file

@ -0,0 +1,3 @@
# Limiter
See the docs in `lib/limiter.ex`.

View file

@ -1,47 +1,158 @@
defmodule Limiter do
@ets __MODULE__.ETS
require Logger
def new(name, max_running, max_waiting) do
@moduledoc """
# Limiter
A concurrency limiter. Limits the number of concurrent invocations possible, without using a worker pool or different processes.
It supports two storage methods:
* **`[atomics](https://erlang.org/doc/man/atomics.html)`** recommended and default if your OTP is > 21.2.
* **`[ets](https://erlang.org/doc/man/ets.html)`** either with a single table per Limiter (faster) or a shared table (better for a large number of limiters).
You would however always want to use atomics, ets is mostly there for backwards compatibility.
"""
@doc """
Initializes a `Limiter`.
"""
@spec new(name, max_running, max_waiting, options) :: :ok | {:error, :existing}
when name: atom(),
max_running: non_neg_integer(),
max_waiting: non_neg_integer() | :infinity,
options: [option],
option: {:wait, non_neg_integer()} | backend,
backend: :atomics | ets_backend,
ets_backend: :ets | {:ets, atom()} | {:ets, ets_name :: atom(), ets_options :: []}
def new(name, max_running, max_waiting, options \\ []) do
name = atom_name(name)
:persistent_term.put(name, {max_running, max_waiting})
:ets.new(name, [:public, :named_table])
:ok
if defined?(name) do
{:error, :existing}
else
wait = Keyword.get(options, :wait, 150)
backend = Keyword.get(options, :backend, default_backend())
{:ok, backend} = setup_backend(backend)
:persistent_term.put(name, {__MODULE__, max_running, max_waiting, backend, wait})
:ok
end
end
@spec set(name, new_max_running, new_max_waiting, options) :: :ok | :error
when name: atom(),
new_max_running: non_neg_integer(),
new_max_waiting: non_neg_integer() | :infinity,
options: [option],
option: {:wait, non_neg_integer()}
@doc "Adjust the limiter limits at runtime"
def set(name, new_max_running, new_max_waiting, options \\ []) do
name = atom_name(name)
if defined?(name) do
new_wait = Keyword.get(options, :wait)
{__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name)
new =
{__MODULE__, new_max_running || max_running, new_max_waiting || max_waiting, backend,
new_wait || wait}
:persistent_term.put(name, new)
:ok
else
:error
end
end
@spec limit(atom(), function()) :: {:error, :overload} | any()
@doc "Limits invocation of `fun`."
def limit(name, fun) do
{max_running, max_waiting} = :persistent_term.get(atom_name(name))
do_limit(atom_name(name), fun)
end
defp do_limit(name, fun) do
{__MODULE__, max_running, max_waiting, backend, wait} = :persistent_term.get(name)
max = max_running + max_waiting
counter = inc(name)
counter = inc(backend, name)
cond do
counter <= max_running ->
fun.()
try do
fun.()
after
dec(backend, name)
end
counter > max ->
dec(backend, name)
{:error, :overload}
counter > max_running ->
wait(name, fun)
wait(backend, name, wait, fun)
end
after
dec(name)
end
defp wait(name, fun) do
Process.sleep(150)
dec(name)
limit(name, fun)
defp wait(backend, name, wait, fun) do
Process.sleep(wait)
dec(backend, name)
do_limit(name, fun)
end
defp inc(name) do
name = atom_name(name)
:ets.update_counter(name, name, {2, 1}, {name, 0})
defp inc({:ets, ets}, name) do
:ets.update_counter(ets, name, {2, 1}, {name, 0})
end
def dec(name) do
name = atom_name(name)
:ets.update_counter(name, name, {2, -1}, {name, 0})
defp inc({:atomics, ref}, _) do
:atomics.add_get(ref, 1, 1)
end
defp atom_name(suffix), do: Module.concat(@ets, suffix)
def dec({:ets, ets}, name) do
:ets.update_counter(ets, name, {2, -1}, {name, 0})
end
def dec({:atomics, ref}, _) do
:atomics.sub_get(ref, 1, 1)
end
defp atom_name(suffix), do: Module.concat(__MODULE__, suffix)
defp defined?(name) do
{__MODULE__, _, _, _, _, _} = :persistent_term.get(name)
true
rescue
_ -> false
end
defp default_backend() do
if Code.ensure_loaded?(:atomics) do
:atomics
else
Logger.debug("Limiter: atomics not available, using ETS backend")
:ets
end
end
defp setup_backend(:ets) do
setup_backend({:ets, ETS})
end
defp setup_backend({:ets, name}) do
setup_backend({:ets, name, [{:write_concurrency, true}, {:read_concurrency, true}]})
end
defp setup_backend({:ets, name, options}) do
ets_name = atom_name(name)
case :ets.whereis(ets_name) do
:undefined -> :ets.new(ets_name, [:public, :named_table] ++ options)
_ -> nil
end
{:ok, {:ets, ets_name}}
end
defp setup_backend(:atomics) do
{:ok, {:atomics, :atomics.new(1, signed: true)}}
end
end

View file

@ -11,19 +11,16 @@ defmodule Limiter.MixProject do
]
end
# Run "mix help compile.app" to learn about applications.
def application do
[
extra_applications: [:logger]
]
end
# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:ex_doc, "~> 0.21", only: :dev, runtime: false},
{:benchee, "~> 1.0", only: [:dev, :test]}
# {:dep_from_hexpm, "~> 0.3.0"},
# {:dep_from_git, git: "https://github.com/elixir-lang/my_dep.git", tag: "0.1.0"}
]
end
end

View file

@ -1,4 +1,9 @@
%{
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
"earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"},
"ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"},
"makeup": {:hex, :makeup, "1.0.1", "82f332e461dc6c79dbd82fbe2a9c10d48ed07146f0a478286e590c83c52010b5", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "49736fe5b66a08d8575bf5321d716bac5da20c8e6b97714fec2bcd6febcfa1f8"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
}

View file

@ -2,38 +2,20 @@ defmodule LimiterTest do
use ExUnit.Case
doctest Limiter
defp test_ets(name, max, sleep, fun) do
count = :ets.update_counter(:limiter_test, name, {2, 1}, {name, 0})
if count <= max do
fun.({:ok, count})
Process.sleep(sleep)
else
fun.(:fail)
end
after
:ets.update_counter(:limiter_test, name, {2, -1}, {name, 1})
end
test "limits with ets" do
:ets.new(:limiter_test, [:public, :named_table])
ets = "test"
test = self()
spawn_link(fn -> test_ets(ets, 2, 500, fn result -> send(test, result) end) end)
spawn_link(fn -> test_ets(ets, 2, 750, fn result -> send(test, result) end) end)
spawn_link(fn -> test_ets(ets, 2, 500, fn result -> send(test, result) end) end)
assert_receive {:ok, 1}
assert_receive {:ok, 2}
assert_receive :fail
Process.sleep(500)
spawn_link(fn -> test_ets(ets, 2, 500, fn result -> send(test, result) end) end)
assert_receive {:ok, 2}
end
test "limiter" do
test "limiter ets is atomic" do
name = "test1"
Limiter.new(name, 2, 2)
atomic_test(name)
end
test "limiter atomics is atomic" do
name = "test2"
Limiter.new(name, 2, 2, backend: :atomics)
atomic_test(name)
end
defp atomic_test(name) do
self = self()
Limiter.set(name, 2, 2)
sleepy = fn sleep ->
case Limiter.limit(name, fn ->

View file

@ -1,11 +1,28 @@
:ets.new(:limiter_bench, [:public, :named_table])
Limiter.new(:bench, 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000, 0)
infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000
Limiter.new(:bench, infinite, 0)
Limiter.new(:bench_s, infinite, 0, ets: LimiterTest)
Benchee.run(%{
"update_counter" => fn ->
:ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0})
end,
"limit" => fn ->
concurrent = [{:read_concurrency, true}, {:write_concurrency, true}]
Limiter.new(:bench_rw, infinite, 0)
Limiter.new(:bench_s_rw, infinite, 0, ets: LimiterTest, ets_opts: concurrent)
single = %{
"Limiter.limit/2" => fn ->
Limiter.limit(:bench, fn -> :ok end)
end,
"Limiter.limit/2 with concurrency" => fn ->
Limiter.limit(:bench_rw, fn -> :ok end)
end,
"Limiter:limit/2 with shared ets" => fn ->
Limiter.limit(:bench_s, fn -> :ok end)
end,
"Limiter:limit/2 with shared ets and concurrency" => fn ->
Limiter.limit(:bench_s_rw, fn -> :ok end)
end
})
}
IO.puts("\n\n\n\nsingle, sequential\n\n\n\n")
Benchee.run(single, parallel: 1)
IO.puts("\n\n\n\nsingle, parallel\n\n\n\n")
Benchee.run(single, parallel: System.schedulers_online())

View file

@ -0,0 +1,56 @@
infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000
Limiter.new(:bench_u_0, infinite, 0, backend: {:ets, LimiterTest0, []})
Limiter.new(:bench_u_1, infinite, 0, backend: {:ets, LimiterTest1, []})
Limiter.new(:bench_u_2, infinite, 0, backend: {:ets, LimiterTest2, []})
Limiter.new(:bench_u_3, infinite, 0, backend: {:ets, LimiterTest3, []})
Limiter.new(:bench_a_0, infinite, 0, backend: :atomics)
Limiter.new(:bench_a_1, infinite, 0, backend: :atomics)
Limiter.new(:bench_a_2, infinite, 0, backend: :atomics)
Limiter.new(:bench_a_3, infinite, 0, backend: :atomics)
Limiter.new(:bench_s_0, infinite, 0, backend: {:ets, LimiterTest, []})
Limiter.new(:bench_s_1, infinite, 0, backend: {:ets, LimiterTest, []})
Limiter.new(:bench_s_2, infinite, 0, backend: {:ets, LimiterTest, []})
Limiter.new(:bench_s_3, infinite, 0, backend: {:ets, LimiterTest, []})
rw = [{:read_concurrency, true}, {:write_concurrency, true}]
Limiter.new(:bench_u_rw0, infinite, 0, backend: {:ets, LimiterTestRW0, rw})
Limiter.new(:bench_u_rw1, infinite, 0, backend: {:ets, LimiterTestRW1, rw})
Limiter.new(:bench_u_rw2, infinite, 0, backend: {:ets, LimiterTestRW2, rw})
Limiter.new(:bench_u_rw3, infinite, 0, backend: {:ets, LimiterTestRW3, rw})
Limiter.new(:bench_s_rw0, infinite, 0, backend: {:ets, LimiterTestRW, rw})
Limiter.new(:bench_s_rw1, infinite, 0, backend: {:ets, LimiterTestRW, rw})
Limiter.new(:bench_s_rw2, infinite, 0, backend: {:ets, LimiterTestRW, rw})
Limiter.new(:bench_s_rw3, infinite, 0, backend: {:ets, LimiterTestRW, rw})
multiple = %{
"Limiter.limit/2 unique ets" => fn ->
limiter = Enum.random([:bench_u_0, :bench_u_1, :bench_u_2, :bench_u_3])
Limiter.limit(limiter, fn -> :ok end)
end,
"Limiter:limit/2 shared ets" => fn ->
limiter = Enum.random([:bench_s_0, :bench_s_1, :bench_s_2, :bench_s_3])
Limiter.limit(limiter, fn -> :ok end)
end,
"Limiter.limit/2 unique ets, concurrency" => fn ->
limiter = Enum.random([:bench_u_rw0, :bench_u_rw1, :bench_u_rw2, :bench_u_rw3])
Limiter.limit(limiter, fn -> :ok end)
end,
"Limiter:limit/2 shared ets, concurrency" => fn ->
limiter = Enum.random([:bench_s_rw0, :bench_s_rw1, :bench_s_rw2, :bench_s_rw3])
Limiter.limit(limiter, fn -> :ok end)
end,
"Limiter:limit/2 atomics" => fn ->
limiter = Enum.random([:bench_a_0, :bench_a_1, :bench_a_2, :bench_a_3])
Limiter.limit(limiter, fn -> :ok end)
end
}
IO.puts("\n\n\n\nmulti, sequential\n\n\n\n")
Benchee.run(multiple)
IO.puts("\n\n\n\nmulti, parallel\n\n\n\n")
Benchee.run(multiple, parallel: System.schedulers_online())

View file

@ -0,0 +1,86 @@
multi, sequential
Operating System: Linux
CPU Information: AMD EPYC 7401P 24-Core Processor
Number of Available Cores: 8
Available memory: 31.41 GB
Elixir 1.10.3
Erlang 22.3.2
Benchmark suite executing with the following configuration:
warmup: 2 s
time: 5 s
memory time: 0 ns
parallel: 1
inputs: none specified
Estimated total run time: 35 s
Benchmarking Limiter.limit/2 unique ets...
Benchmarking Limiter.limit/2 unique ets, concurrency...
Benchmarking Limiter:limit/2 atomics...
Benchmarking Limiter:limit/2 shared ets...
Benchmarking Limiter:limit/2 shared ets, concurrency...
Name ips average deviation median 99th %
Limiter:limit/2 atomics 491.88 K 2.03 μs ±1506.30% 1.55 μs 3.48 μs
Limiter.limit/2 unique ets 414.63 K 2.41 μs ±1169.34% 1.97 μs 4.53 μs
Limiter:limit/2 shared ets 411.43 K 2.43 μs ±1286.95% 1.96 μs 3.66 μs
Limiter.limit/2 unique ets, concurrency 406.50 K 2.46 μs ±1006.31% 2.06 μs 4.34 μs
Limiter:limit/2 shared ets, concurrency 384.04 K 2.60 μs ±1293.25% 2.12 μs 4.37 μs
Comparison:
Limiter:limit/2 atomics 491.88 K
Limiter.limit/2 unique ets 414.63 K - 1.19x slower +0.38 μs
Limiter:limit/2 shared ets 411.43 K - 1.20x slower +0.40 μs
Limiter.limit/2 unique ets, concurrency 406.50 K - 1.21x slower +0.43 μs
Limiter:limit/2 shared ets, concurrency 384.04 K - 1.28x slower +0.57 μs
multi, parallel
Operating System: Linux
CPU Information: AMD EPYC 7401P 24-Core Processor
Number of Available Cores: 8
Available memory: 31.41 GB
Elixir 1.10.3
Erlang 22.3.2
Benchmark suite executing with the following configuration:
warmup: 2 s
time: 5 s
memory time: 0 ns
parallel: 8
inputs: none specified
Estimated total run time: 35 s
Benchmarking Limiter.limit/2 unique ets...
Benchmarking Limiter.limit/2 unique ets, concurrency...
Benchmarking Limiter:limit/2 atomics...
Benchmarking Limiter:limit/2 shared ets...
Benchmarking Limiter:limit/2 shared ets, concurrency...
Name ips average deviation median 99th %
Limiter:limit/2 atomics 307.84 K 3.25 μs ±1113.62% 2.09 μs 10.24 μs
Limiter.limit/2 unique ets, concurrency 95.56 K 10.46 μs ±391.37% 2.93 μs 163.02 μs
Limiter:limit/2 shared ets, concurrency 92.39 K 10.82 μs ±374.36% 2.92 μs 158.97 μs
Limiter.limit/2 unique ets 80.68 K 12.39 μs ±362.74% 2.85 μs 160.66 μs
Limiter:limit/2 shared ets 6.04 K 165.66 μs ±17.23% 167.48 μs 237.96 μs
Comparison:
Limiter:limit/2 atomics 307.84 K
Limiter.limit/2 unique ets, concurrency 95.56 K - 3.22x slower +7.22 μs
Limiter:limit/2 shared ets, concurrency 92.39 K - 3.33x slower +7.57 μs
Limiter.limit/2 unique ets 80.68 K - 3.82x slower +9.15 μs
Limiter:limit/2 shared ets 6.04 K - 51.00x slower +162.41 μs

View file

@ -0,0 +1,19 @@
:ets.new(:limiter_bench, [:public, :named_table])
Benchee.run(
%{
"ets:update_counter" => fn ->
:ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0})
end
},
parallel: 1
)
Benchee.run(
%{
"ets:update_counter" => fn ->
:ets.update_counter(:limiter_bench, "bench", {2, 1}, {"bench", 0})
end
},
parallel: System.schedulers_online()
)