forked from AkkomaGang/akkoma
ConnectionPool: Log possible HTTP1 blocks
This commit is contained in:
parent
53dc61ba90
commit
89a7efab69
3 changed files with 38 additions and 14 deletions
|
@ -50,10 +50,10 @@ defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do
|
||||||
|
|
||||||
with open_opts <- Map.delete(opts, :tls_opts),
|
with open_opts <- Map.delete(opts, :tls_opts),
|
||||||
{:ok, conn} <- Gun.open(proxy_host, proxy_port, open_opts),
|
{:ok, conn} <- Gun.open(proxy_host, proxy_port, open_opts),
|
||||||
{:ok, _} <- Gun.await_up(conn, opts[:connect_timeout]),
|
{:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]),
|
||||||
stream <- Gun.connect(conn, connect_opts),
|
stream <- Gun.connect(conn, connect_opts),
|
||||||
{:response, :fin, 200, _} <- Gun.await(conn, stream) do
|
{:response, :fin, 200, _} <- Gun.await(conn, stream) do
|
||||||
{:ok, conn}
|
{:ok, conn, protocol}
|
||||||
else
|
else
|
||||||
error ->
|
error ->
|
||||||
Logger.warn(
|
Logger.warn(
|
||||||
|
@ -88,8 +88,8 @@ defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
|
||||||
|> Map.put(:socks_opts, socks_opts)
|
|> Map.put(:socks_opts, socks_opts)
|
||||||
|
|
||||||
with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
|
with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
|
||||||
{:ok, _} <- Gun.await_up(conn, opts[:connect_timeout]) do
|
{:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]) do
|
||||||
{:ok, conn}
|
{:ok, conn, protocol}
|
||||||
else
|
else
|
||||||
error ->
|
error ->
|
||||||
Logger.warn(
|
Logger.warn(
|
||||||
|
@ -106,8 +106,8 @@ defp do_open(%URI{host: host, port: port} = uri, opts) do
|
||||||
host = Pleroma.HTTP.AdapterHelper.parse_host(host)
|
host = Pleroma.HTTP.AdapterHelper.parse_host(host)
|
||||||
|
|
||||||
with {:ok, conn} <- Gun.open(host, port, opts),
|
with {:ok, conn} <- Gun.open(host, port, opts),
|
||||||
{:ok, _} <- Gun.await_up(conn, opts[:connect_timeout]) do
|
{:ok, protocol} <- Gun.await_up(conn, opts[:connect_timeout]) do
|
||||||
{:ok, conn}
|
{:ok, conn, protocol}
|
||||||
else
|
else
|
||||||
error ->
|
error ->
|
||||||
Logger.warn(
|
Logger.warn(
|
||||||
|
|
|
@ -15,7 +15,7 @@ def init([_key, _uri, _opts, _client_pid] = opts) do
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
|
def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
|
||||||
with {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
|
with {:ok, conn_pid, protocol} <- Gun.Conn.open(uri, opts),
|
||||||
Process.link(conn_pid) do
|
Process.link(conn_pid) do
|
||||||
time = :erlang.monotonic_time(:millisecond)
|
time = :erlang.monotonic_time(:millisecond)
|
||||||
|
|
||||||
|
@ -27,8 +27,12 @@ def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
|
||||||
send(client_pid, {:conn_pid, conn_pid})
|
send(client_pid, {:conn_pid, conn_pid})
|
||||||
|
|
||||||
{:noreply,
|
{:noreply,
|
||||||
%{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}},
|
%{
|
||||||
:hibernate}
|
key: key,
|
||||||
|
timer: nil,
|
||||||
|
client_monitors: %{client_pid => Process.monitor(client_pid)},
|
||||||
|
protocol: protocol
|
||||||
|
}, :hibernate}
|
||||||
else
|
else
|
||||||
err ->
|
err ->
|
||||||
{:stop, {:shutdown, err}, nil}
|
{:stop, {:shutdown, err}, nil}
|
||||||
|
@ -53,14 +57,20 @@ def handle_cast({:remove_client, client_pid}, state) do
|
||||||
end
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def handle_call(:add_client, {client_pid, _}, %{key: key} = state) do
|
def handle_call(:add_client, {client_pid, _}, %{key: key, protocol: protocol} = state) do
|
||||||
time = :erlang.monotonic_time(:millisecond)
|
time = :erlang.monotonic_time(:millisecond)
|
||||||
|
|
||||||
{{conn_pid, _, _, _}, _} =
|
{{conn_pid, used_by, _, _}, _} =
|
||||||
Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
|
Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
|
||||||
{conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
|
{conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
:telemetry.execute(
|
||||||
|
[:pleroma, :connection_pool, :client, :add],
|
||||||
|
%{client_pid: client_pid, clients: used_by},
|
||||||
|
%{key: state.key, protocol: protocol}
|
||||||
|
)
|
||||||
|
|
||||||
state =
|
state =
|
||||||
if state.timer != nil do
|
if state.timer != nil do
|
||||||
Process.cancel_timer(state[:timer])
|
Process.cancel_timer(state[:timer])
|
||||||
|
@ -131,7 +141,7 @@ def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams}, state) d
|
||||||
@impl true
|
@impl true
|
||||||
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
|
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
|
||||||
:telemetry.execute(
|
:telemetry.execute(
|
||||||
[:pleroma, :connection_pool, :client_death],
|
[:pleroma, :connection_pool, :client, :dead],
|
||||||
%{client_pid: pid, reason: reason},
|
%{client_pid: pid, reason: reason},
|
||||||
%{key: state.key}
|
%{key: state.key}
|
||||||
)
|
)
|
||||||
|
|
|
@ -7,7 +7,8 @@ defmodule Pleroma.Telemetry.Logger do
|
||||||
[:pleroma, :connection_pool, :reclaim, :start],
|
[:pleroma, :connection_pool, :reclaim, :start],
|
||||||
[:pleroma, :connection_pool, :reclaim, :stop],
|
[:pleroma, :connection_pool, :reclaim, :stop],
|
||||||
[:pleroma, :connection_pool, :provision_failure],
|
[:pleroma, :connection_pool, :provision_failure],
|
||||||
[:pleroma, :connection_pool, :client_death]
|
[:pleroma, :connection_pool, :client, :dead],
|
||||||
|
[:pleroma, :connection_pool, :client, :add]
|
||||||
]
|
]
|
||||||
def attach do
|
def attach do
|
||||||
:telemetry.attach_many("pleroma-logger", @events, &handle_event/4, [])
|
:telemetry.attach_many("pleroma-logger", @events, &handle_event/4, [])
|
||||||
|
@ -62,7 +63,7 @@ def handle_event(
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_event(
|
def handle_event(
|
||||||
[:pleroma, :connection_pool, :client_death],
|
[:pleroma, :connection_pool, :client, :dead],
|
||||||
%{client_pid: client_pid, reason: reason},
|
%{client_pid: client_pid, reason: reason},
|
||||||
%{key: key},
|
%{key: key},
|
||||||
_
|
_
|
||||||
|
@ -73,4 +74,17 @@ def handle_event(
|
||||||
}"
|
}"
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def handle_event(
|
||||||
|
[:pleroma, :connection_pool, :client, :add],
|
||||||
|
%{clients: [_, _ | _] = clients},
|
||||||
|
%{key: key, protocol: :http},
|
||||||
|
_
|
||||||
|
) do
|
||||||
|
Logger.info(fn ->
|
||||||
|
"Pool worker for #{key}: #{length(clients)} clients are using an HTTP1 connection at the same time, head-of-line blocking might occur."
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_event([:pleroma, :connection_pool, :client, :add], _, _, _), do: :ok
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue