Fix counters not decrementing when the process is exited
This fixes the issue by trapping exits for the duration of the fun, then turning exit trapping off and killing the process if there is an exit message in the mailbox. The real-world case where this fixes things is Pleroma MediaProxy. Without exit trapping, the process would get killed if the connection was closed client-side and a counter would be left incremented. I am not sure if trapping exits is the optimal solution, but I don't see any other option.
This commit is contained in:
parent
55e92f84b4
commit
8c1caa0f10
1 changed files with 29 additions and 4 deletions
|
@ -101,25 +101,50 @@ defmodule ConcurrentLimiter do
|
||||||
|
|
||||||
cond do
|
cond do
|
||||||
counter <= max_running ->
|
counter <= max_running ->
|
||||||
:telemetry.execute([:concurrent_limiter, :execution], %{counter: counter}, %{limiter: name})
|
:telemetry.execute([:concurrent_limiter, :execution], %{counter: counter}, %{
|
||||||
|
limiter: name
|
||||||
|
})
|
||||||
|
|
||||||
|
Process.flag(:trap_exit, true)
|
||||||
|
|
||||||
try do
|
try do
|
||||||
fun.()
|
fun.()
|
||||||
after
|
after
|
||||||
dec(ref, name)
|
dec(ref, name)
|
||||||
|
Process.flag(:trap_exit, false)
|
||||||
|
|
||||||
|
receive do
|
||||||
|
{:EXIT, _, reason} ->
|
||||||
|
Process.exit(self(), reason)
|
||||||
|
after
|
||||||
|
0 -> :noop
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
counter > max ->
|
counter > max ->
|
||||||
:telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{limiter: name, scope: "max"})
|
:telemetry.execute([:concurrent_limiter, :overload], %{counter: counter}, %{
|
||||||
|
limiter: name,
|
||||||
|
scope: "max"
|
||||||
|
})
|
||||||
|
|
||||||
dec(ref, name)
|
dec(ref, name)
|
||||||
{:error, :overload}
|
{:error, :overload}
|
||||||
|
|
||||||
retries + 1 > max_retries ->
|
retries + 1 > max_retries ->
|
||||||
:telemetry.execute([:concurrent_limiter, :max_retries], %{counter: counter}, %{limiter: name, retries: retries + 1})
|
:telemetry.execute([:concurrent_limiter, :max_retries], %{counter: counter}, %{
|
||||||
|
limiter: name,
|
||||||
|
retries: retries + 1
|
||||||
|
})
|
||||||
|
|
||||||
dec(ref, name)
|
dec(ref, name)
|
||||||
{:error, :overload}
|
{:error, :overload}
|
||||||
|
|
||||||
counter > max_running ->
|
counter > max_running ->
|
||||||
:telemetry.execute([:concurrent_limiter, :wait], %{counter: counter}, %{limiter: name, retries: retries + 1})
|
:telemetry.execute([:concurrent_limiter, :wait], %{counter: counter}, %{
|
||||||
|
limiter: name,
|
||||||
|
retries: retries + 1
|
||||||
|
})
|
||||||
|
|
||||||
wait(ref, name, fun, wait, opts, retries + 1)
|
wait(ref, name, fun, wait, opts, retries + 1)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue