akkoma/lib/pleroma/gun/connection_pool/worker.ex
rinpatch 58a4f350a8 Refactor gun pooling and simplify adapter option insertion
This patch refactors gun pooling to use Elixir process registry and
simplifies adapter option insertion.

Having the pool use process registry instead of a GenServer has a number of advantages:
- Simpler code: the initial implementation adds about half the lines of code it deletes
- Concurrency: unlike a GenServer, ETS-based registry can handle multiple checkout/checkin
requests at the same time
- Precise and easy idle connection clousure: current proposal for closing idle connections in
the GenServer-based pool needs to filter through all connections once a minute and compare their
last active time with closing time. With Elixir process registry this can be done
by just using `Process.send_after`/`Process.cancel_timer` in the worker process.
- Lower memory footprint: In my tests `gun-memory-leak` branch uses about 290mb on peak load (250 connections)
and 235mb on idle (5-10 connections). Registry-based pool uses 210mb on idle and 240mb on peak load
2020-07-15 15:17:27 +03:00

95 lines
2.8 KiB
Elixir

defmodule Pleroma.Gun.ConnectionPool.Worker do
alias Pleroma.Gun
use GenServer
@registry Pleroma.Gun.ConnectionPool
@impl true
def init([uri, key, opts, client_pid]) do
time = :os.system_time(:second)
# Register before opening connection to prevent race conditions
with {:ok, _owner} <- Registry.register(@registry, key, {nil, [client_pid], 1, time}),
{:ok, conn_pid} <- Gun.Conn.open(uri, opts),
Process.link(conn_pid) do
{_, _} =
Registry.update_value(@registry, key, fn {_, used_by, crf, last_reference} ->
{conn_pid, used_by, crf, last_reference}
end)
send(client_pid, {:conn_pid, conn_pid})
{:ok, %{key: key, timer: nil}, :hibernate}
else
err -> {:stop, err}
end
end
@impl true
def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) do
time = :os.system_time(:second)
{{conn_pid, _, _, _}, _} =
Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
{conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
end)
if send_pid_back, do: send(client_pid, {:conn_pid, conn_pid})
state =
if state.timer != nil do
Process.cancel_timer(state[:timer])
%{state | timer: nil}
else
state
end
{:noreply, state, :hibernate}
end
@impl true
def handle_cast({:remove_client, client_pid}, %{key: key} = state) do
{{_conn_pid, used_by, _crf, _last_reference}, _} =
Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
{conn_pid, List.delete(used_by, client_pid), crf, last_reference}
end)
timer =
if used_by == [] do
max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000)
Process.send_after(self(), :idle_close, max_idle)
else
nil
end
{:noreply, %{state | timer: timer}, :hibernate}
end
@impl true
def handle_info(:idle_close, state) do
# Gun monitors the owner process, and will close the connection automatically
# when it's terminated
{:stop, :normal, state}
end
# Gracefully shutdown if the connection got closed without any streams left
@impl true
def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
{:stop, :normal, state}
end
# Otherwise, shutdown with an error
@impl true
def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do
{:stop, {:error, down_message}, state}
end
@impl true
def handle_call(:idle_close, _, %{key: key} = state) do
Registry.unregister(@registry, key)
{:stop, :normal, state}
end
# LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478
defp crf(time_delta, prev_crf) do
1 + :math.pow(0.5, time_delta / 100) * prev_crf
end
end