Sentinel processes - ensure counter is always decremented
This commit is contained in:
parent
3aa46650e2
commit
12490aa78a
4 changed files with 73 additions and 38 deletions
|
@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
- Decrement counter when max retries has been reached.
|
- Decrement counter when max retries has been reached.
|
||||||
|
- Ensure counter is always decremented in case of process being killed (by using "sentinel" processes that monitors).
|
||||||
- Fixes behaviour of `max_waiting = 0` with `max_size = 1`.
|
- Fixes behaviour of `max_waiting = 0` with `max_size = 1`.
|
||||||
|
|
||||||
## [0.1.0] - 2020-05-16
|
## [0.1.0] - 2020-05-16
|
||||||
|
|
|
@ -97,6 +97,7 @@ defmodule ConcurrentLimiter do
|
||||||
max = max_running + max_waiting
|
max = max_running + max_waiting
|
||||||
counter = inc(ref, name)
|
counter = inc(ref, name)
|
||||||
max_retries = Keyword.get(opts, :max_retries) || max_retries
|
max_retries = Keyword.get(opts, :max_retries) || max_retries
|
||||||
|
sentinel = Keyword.get(opts, :sentinel) || true
|
||||||
:telemetry.execute([:concurrent_limiter, :limit], %{counter: counter}, %{limiter: name})
|
:telemetry.execute([:concurrent_limiter, :limit], %{counter: counter}, %{limiter: name})
|
||||||
|
|
||||||
cond do
|
cond do
|
||||||
|
@ -105,20 +106,13 @@ defmodule ConcurrentLimiter do
|
||||||
limiter: name
|
limiter: name
|
||||||
})
|
})
|
||||||
|
|
||||||
Process.flag(:trap_exit, true)
|
mon = sentinel_start(sentinel, ref, name)
|
||||||
|
|
||||||
try do
|
try do
|
||||||
fun.()
|
fun.()
|
||||||
after
|
after
|
||||||
dec(ref, name)
|
dec(ref, name)
|
||||||
Process.flag(:trap_exit, false)
|
sentinel_stop(mon)
|
||||||
|
|
||||||
receive do
|
|
||||||
{:EXIT, _, reason} ->
|
|
||||||
Process.exit(self(), reason)
|
|
||||||
after
|
|
||||||
0 -> :noop
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
counter > max ->
|
counter > max ->
|
||||||
|
@ -127,13 +121,24 @@ defmodule ConcurrentLimiter do
|
||||||
scope: "max"
|
scope: "max"
|
||||||
})
|
})
|
||||||
|
|
||||||
max_waiting == 0 ->
|
|
||||||
:telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{limiter: name, scope: "max"})
|
|
||||||
dec(ref, name)
|
dec(ref, name)
|
||||||
{:error, :overload}
|
{:error, :overload}
|
||||||
|
|
||||||
counter > max ->
|
max_waiting == 0 ->
|
||||||
:telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{limiter: name, scope: "max"})
|
:telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{
|
||||||
|
limiter: name,
|
||||||
|
scope: "max"
|
||||||
|
})
|
||||||
|
|
||||||
|
dec(ref, name)
|
||||||
|
{:error, :overload}
|
||||||
|
|
||||||
|
counter > max ->
|
||||||
|
:telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{
|
||||||
|
limiter: name,
|
||||||
|
scope: "max"
|
||||||
|
})
|
||||||
|
|
||||||
dec(ref, name)
|
dec(ref, name)
|
||||||
{:error, :overload}
|
{:error, :overload}
|
||||||
|
|
||||||
|
@ -152,17 +157,15 @@ defmodule ConcurrentLimiter do
|
||||||
retries: retries + 1
|
retries: retries + 1
|
||||||
})
|
})
|
||||||
|
|
||||||
wait(ref, name, fun, wait, opts, retries + 1)
|
mon = sentinel_start(sentinel, ref, name)
|
||||||
|
wait = Keyword.get(opts, :timeout) || wait
|
||||||
|
Process.sleep(wait)
|
||||||
|
dec(ref, name)
|
||||||
|
sentinel_stop(mon)
|
||||||
|
do_limit(name, fun, opts, retries + 1)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp wait(ref, name, fun, wait, opts, retries) do
|
|
||||||
wait = Keyword.get(opts, :timeout) || wait
|
|
||||||
Process.sleep(wait)
|
|
||||||
dec(ref, name)
|
|
||||||
do_limit(name, fun, opts, retries)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp inc(ref, _) do
|
defp inc(ref, _) do
|
||||||
:atomics.add_get(ref, 1, 1)
|
:atomics.add_get(ref, 1, 1)
|
||||||
end
|
end
|
||||||
|
@ -179,4 +182,27 @@ defmodule ConcurrentLimiter do
|
||||||
rescue
|
rescue
|
||||||
_ -> false
|
_ -> false
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp sentinel_start(true, ref, name) do
|
||||||
|
self = self()
|
||||||
|
|
||||||
|
spawn(fn ->
|
||||||
|
sentinel_run(ref, name, self, Process.monitor(self))
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp sentinel_start(_, _, _), do: nil
|
||||||
|
|
||||||
|
defp sentinel_stop(pid) when is_pid(pid) do
|
||||||
|
Process.exit(pid, :normal)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp sentinel_stop(_), do: nil
|
||||||
|
|
||||||
|
defp sentinel_run(ref, name, pid, mon) do
|
||||||
|
receive do
|
||||||
|
{:DOWN, ^mon, _, ^pid, reason} ->
|
||||||
|
dec(ref, name)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -9,14 +9,33 @@ defmodule ConcurrentLimiterTest do
|
||||||
test "limited to one" do
|
test "limited to one" do
|
||||||
name = "l1"
|
name = "l1"
|
||||||
ConcurrentLimiter.new(name, 1, 0, max_retries: 0)
|
ConcurrentLimiter.new(name, 1, 0, max_retries: 0)
|
||||||
endless = fn() -> :timer.sleep(10000) end
|
endless = fn -> :timer.sleep(10_000) end
|
||||||
spawn(fn() -> ConcurrentLimiter.limit(name, endless) end)
|
spawn(fn -> ConcurrentLimiter.limit(name, endless) end)
|
||||||
:timer.sleep(5)
|
:timer.sleep(5)
|
||||||
{:error, :overload} = ConcurrentLimiter.limit(name, endless)
|
{:error, :overload} = ConcurrentLimiter.limit(name, endless)
|
||||||
{:error, :overload} = ConcurrentLimiter.limit(name, endless)
|
{:error, :overload} = ConcurrentLimiter.limit(name, endless)
|
||||||
{:error, :overload} = ConcurrentLimiter.limit(name, endless)
|
{:error, :overload} = ConcurrentLimiter.limit(name, endless)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "decrements correctly when current pid exits" do
|
||||||
|
name = "l1crash"
|
||||||
|
ConcurrentLimiter.new(name, 1, 0, max_retries: 0)
|
||||||
|
endless = fn -> :timer.sleep(100) end
|
||||||
|
|
||||||
|
pid =
|
||||||
|
spawn(fn ->
|
||||||
|
ConcurrentLimiter.limit(name, endless)
|
||||||
|
end)
|
||||||
|
|
||||||
|
# let some time for spawn to execute
|
||||||
|
:timer.sleep(5)
|
||||||
|
{:error, :overload} = ConcurrentLimiter.limit(name, endless)
|
||||||
|
Process.exit(pid, :kill)
|
||||||
|
# let some time for exit to execute
|
||||||
|
:timer.sleep(5)
|
||||||
|
:ok = ConcurrentLimiter.limit(name, fn -> :ok end)
|
||||||
|
end
|
||||||
|
|
||||||
test "limiter is atomic" do
|
test "limiter is atomic" do
|
||||||
name = "test"
|
name = "test"
|
||||||
ConcurrentLimiter.new(name, 2, 2)
|
ConcurrentLimiter.new(name, 2, 2)
|
||||||
|
|
|
@ -1,24 +1,13 @@
|
||||||
infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000
|
infinite = 1_000_000_000_000_000_000_000_000_000_000_000_000_000_000_000
|
||||||
ConcurrentLimiter.new(:bench, infinite, 0)
|
ConcurrentLimiter.new(:bench, infinite, 0)
|
||||||
ConcurrentLimiter.new(:bench_s, infinite, 0, ets: ConcurrentLimiterTest)
|
ConcurrentLimiter.new(:bench_no_sentinel, infinite, 0, sentinel: false)
|
||||||
|
|
||||||
concurrent = [{:read_concurrency, true}, {:write_concurrency, true}]
|
|
||||||
|
|
||||||
ConcurrentLimiter.new(:bench_rw, infinite, 0)
|
|
||||||
ConcurrentLimiter.new(:bench_s_rw, infinite, 0, ets: ConcurrentLimiterTest, ets_opts: concurrent)
|
|
||||||
|
|
||||||
single = %{
|
single = %{
|
||||||
"ConcurrentLimiter.limit/2" => fn ->
|
"ConcurrentLimiter.limit/2 (with sentinels)" => fn ->
|
||||||
ConcurrentLimiter.limit(:bench, fn -> :ok end)
|
ConcurrentLimiter.limit(:bench, fn -> :ok end)
|
||||||
end,
|
end,
|
||||||
"ConcurrentLimiter.limit/2 with concurrency" => fn ->
|
"ConcurrentLimiter.limit/2 (without sentinels)" => fn ->
|
||||||
ConcurrentLimiter.limit(:bench_rw, fn -> :ok end)
|
ConcurrentLimiter.limit(:bench_no_sentinel, fn -> :ok end)
|
||||||
end,
|
|
||||||
"ConcurrentLimiter:limit/2 with shared ets" => fn ->
|
|
||||||
ConcurrentLimiter.limit(:bench_s, fn -> :ok end)
|
|
||||||
end,
|
|
||||||
"ConcurrentLimiter:limit/2 with shared ets and concurrency" => fn ->
|
|
||||||
ConcurrentLimiter.limit(:bench_s_rw, fn -> :ok end)
|
|
||||||
end
|
end
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue