diff --git a/lib/pleroma/gun/connection_pool.ex b/lib/pleroma/gun/connection_pool.ex index 8b41a668c..c6894be53 100644 --- a/lib/pleroma/gun/connection_pool.ex +++ b/lib/pleroma/gun/connection_pool.ex @@ -19,7 +19,7 @@ def get_conn(uri, opts) do get_gun_pid_from_worker(worker_pid, true) [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] -> - GenServer.cast(worker_pid, {:add_client, self(), false}) + GenServer.call(worker_pid, :add_client) {:ok, gun_pid} [] -> @@ -70,7 +70,7 @@ def release_conn(conn_pid) do case query_result do [worker_pid] -> - GenServer.cast(worker_pid, {:remove_client, self()}) + GenServer.call(worker_pid, :remove_client) [] -> :ok diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex index f33447cb6..a61892c60 100644 --- a/lib/pleroma/gun/connection_pool/worker.ex +++ b/lib/pleroma/gun/connection_pool/worker.ex @@ -36,7 +36,16 @@ def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do end @impl true - def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) do + def handle_cast({:add_client, client_pid, send}, state) do + case handle_call(:add_client, {client_pid, nil}, state) do + {:reply, conn_pid, state, :hibernate} -> + if send, do: send(client_pid, {:conn_pid, conn_pid}) + {:noreply, state, :hibernate} + end + end + + @impl true + def handle_call(:add_client, {client_pid, _}, %{key: key} = state) do time = :erlang.monotonic_time(:millisecond) {{conn_pid, _, _, _}, _} = @@ -44,8 +53,6 @@ def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) d {conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time} end) - if send_pid_back, do: send(client_pid, {:conn_pid, conn_pid}) - state = if state.timer != nil do Process.cancel_timer(state[:timer]) @@ -57,11 +64,11 @@ def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) d ref = Process.monitor(client_pid) state = put_in(state.client_monitors[client_pid], ref) - {:noreply, state, :hibernate} + {:reply, conn_pid, state, :hibernate} end @impl true - def handle_cast({:remove_client, client_pid}, %{key: key} = state) do + def handle_call(:remove_client, {client_pid, _}, %{key: key} = state) do {{_conn_pid, used_by, _crf, _last_reference}, _} = Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} -> {conn_pid, List.delete(used_by, client_pid), crf, last_reference} @@ -78,7 +85,7 @@ def handle_cast({:remove_client, client_pid}, %{key: key} = state) do nil end - {:noreply, %{state | timer: timer}, :hibernate} + {:reply, :ok, %{state | timer: timer}, :hibernate} end @impl true @@ -102,22 +109,13 @@ def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_me @impl true def handle_info({:DOWN, _ref, :process, pid, reason}, state) do - # Sometimes the client is dead before we demonitor it in :remove_client, so the message - # arrives anyway + :telemetry.execute( + [:pleroma, :connection_pool, :client_death], + %{client_pid: pid, reason: reason}, + %{key: state.key} + ) - case state.client_monitors[pid] do - nil -> - {:noreply, state, :hibernate} - - _ref -> - :telemetry.execute( - [:pleroma, :connection_pool, :client_death], - %{client_pid: pid, reason: reason}, - %{key: state.key} - ) - - handle_cast({:remove_client, pid}, state) - end + handle_cast({:remove_client, pid, false}, state) end # LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478