Merge branch 'refactor/gun-pool-registry' into 'develop'

Refactor gun pooling and simplify adapter option insertion

Closes #1834, #1700, and #1680

See merge request pleroma/pleroma!2479
This commit is contained in:
feld 2020-07-15 23:10:52 +00:00
commit 3a2b2cb6f2
29 changed files with 905 additions and 1925 deletions

View file

@ -172,7 +172,7 @@
"application/ld+json" => ["activity+json"]
}
config :tesla, adapter: Tesla.Adapter.Hackney
config :tesla, adapter: Tesla.Adapter.Gun
# Configures http settings, upstream proxy etc.
config :pleroma, :http,
@ -648,32 +648,30 @@
prepare: :unnamed
config :pleroma, :connections_pool,
checkin_timeout: 250,
reclaim_multiplier: 0.1,
connection_acquisition_wait: 250,
connection_acquisition_retries: 5,
max_connections: 250,
retry: 1,
retry_timeout: 1000,
max_idle_time: 30_000,
retry: 0,
await_up_timeout: 5_000
config :pleroma, :pools,
federation: [
size: 50,
max_overflow: 10,
timeout: 150_000
max_waiting: 10
],
media: [
size: 50,
max_overflow: 10,
timeout: 150_000
max_waiting: 10
],
upload: [
size: 25,
max_overflow: 5,
timeout: 300_000
max_waiting: 5
],
default: [
size: 10,
max_overflow: 2,
timeout: 10_000
max_waiting: 2
]
config :pleroma, :hackney_pools,

View file

@ -3161,36 +3161,37 @@
description: "Advanced settings for `gun` connections pool",
children: [
%{
key: :checkin_timeout,
key: :connection_acquisition_wait,
type: :integer,
description: "Timeout to checkin connection from pool. Default: 250ms.",
description:
"Timeout to acquire a connection from pool.The total max time is this value multiplied by the number of retries. Default: 250ms.",
suggestions: [250]
},
%{
key: :connection_acquisition_retries,
type: :integer,
description:
"Number of attempts to acquire the connection from the pool if it is overloaded. Default: 5",
suggestions: [5]
},
%{
key: :max_connections,
type: :integer,
description: "Maximum number of connections in the pool. Default: 250 connections.",
suggestions: [250]
},
%{
key: :retry,
type: :integer,
description:
"Number of retries, while `gun` will try to reconnect if connection goes down. Default: 1.",
suggestions: [1]
},
%{
key: :retry_timeout,
type: :integer,
description:
"Time between retries when `gun` will try to reconnect in milliseconds. Default: 1000ms.",
suggestions: [1000]
},
%{
key: :await_up_timeout,
type: :integer,
description: "Timeout while `gun` will wait until connection is up. Default: 5000ms.",
suggestions: [5000]
},
%{
key: :reclaim_multiplier,
type: :integer,
description:
"Multiplier for the number of idle connection to be reclaimed if the pool is full. For example if the pool maxes out at 250 connections and this setting is set to 0.3, the pool will reclaim at most 75 idle connections if it's overloaded. Default: 0.1",
suggestions: [0.1]
}
]
},
@ -3199,108 +3200,29 @@
key: :pools,
type: :group,
description: "Advanced settings for `gun` workers pools",
children: [
children:
Enum.map([:federation, :media, :upload, :default], fn pool_name ->
%{
key: :federation,
key: pool_name,
type: :keyword,
description: "Settings for federation pool.",
description: "Settings for #{pool_name} pool.",
children: [
%{
key: :size,
type: :integer,
description: "Number workers in the pool.",
description: "Maximum number of concurrent requests in the pool.",
suggestions: [50]
},
%{
key: :max_overflow,
key: :max_waiting,
type: :integer,
description: "Number of additional workers if pool is under load.",
description:
"Maximum number of requests waiting for other requests to finish. After this number is reached, the pool will start returning errrors when a new request is made",
suggestions: [10]
},
%{
key: :timeout,
type: :integer,
description: "Timeout while `gun` will wait for response.",
suggestions: [150_000]
}
]
},
%{
key: :media,
type: :keyword,
description: "Settings for media pool.",
children: [
%{
key: :size,
type: :integer,
description: "Number workers in the pool.",
suggestions: [50]
},
%{
key: :max_overflow,
type: :integer,
description: "Number of additional workers if pool is under load.",
suggestions: [10]
},
%{
key: :timeout,
type: :integer,
description: "Timeout while `gun` will wait for response.",
suggestions: [150_000]
}
]
},
%{
key: :upload,
type: :keyword,
description: "Settings for upload pool.",
children: [
%{
key: :size,
type: :integer,
description: "Number workers in the pool.",
suggestions: [25]
},
%{
key: :max_overflow,
type: :integer,
description: "Number of additional workers if pool is under load.",
suggestions: [5]
},
%{
key: :timeout,
type: :integer,
description: "Timeout while `gun` will wait for response.",
suggestions: [300_000]
}
]
},
%{
key: :default,
type: :keyword,
description: "Settings for default pool.",
children: [
%{
key: :size,
type: :integer,
description: "Number workers in the pool.",
suggestions: [10]
},
%{
key: :max_overflow,
type: :integer,
description: "Number of additional workers if pool is under load.",
suggestions: [2]
},
%{
key: :timeout,
type: :integer,
description: "Timeout while `gun` will wait for response.",
suggestions: [10_000]
}
]
}
]
end)
},
%{
group: :pleroma,

View file

@ -448,36 +448,32 @@ For each pool, the options are:
*For `gun` adapter*
Advanced settings for connections pool. Pool with opened connections. These connections can be reused in worker pools.
Settings for HTTP connection pool.
For big instances it's recommended to increase `config :pleroma, :connections_pool, max_connections: 500` up to 500-1000.
It will increase memory usage, but federation would work faster.
* `:checkin_timeout` - timeout to checkin connection from pool. Default: 250ms.
* `:max_connections` - maximum number of connections in the pool. Default: 250 connections.
* `:retry` - number of retries, while `gun` will try to reconnect if connection goes down. Default: 1.
* `:retry_timeout` - time between retries when `gun` will try to reconnect in milliseconds. Default: 1000ms.
* `:await_up_timeout` - timeout while `gun` will wait until connection is up. Default: 5000ms.
* `:connection_acquisition_wait` - Timeout to acquire a connection from pool.The total max time is this value multiplied by the number of retries.
* `connection_acquisition_retries` - Number of attempts to acquire the connection from the pool if it is overloaded. Each attempt is timed `:connection_acquisition_wait` apart.
* `:max_connections` - Maximum number of connections in the pool.
* `:await_up_timeout` - Timeout to connect to the host.
* `:reclaim_multiplier` - Multiplied by `:max_connections` this will be the maximum number of idle connections that will be reclaimed in case the pool is overloaded.
### :pools
*For `gun` adapter*
Advanced settings for workers pools.
Settings for request pools. These pools are limited on top of `:connections_pool`.
There are four pools used:
* `:federation` for the federation jobs.
You may want this pool max_connections to be at least equal to the number of federator jobs + retry queue jobs.
* `:media` for rich media, media proxy
* `:upload` for uploaded media (if using a remote uploader and `proxy_remote: true`)
* `:default` for other requests
* `:federation` for the federation jobs. You may want this pool's max_connections to be at least equal to the number of federator jobs + retry queue jobs.
* `:media` - for rich media, media proxy.
* `:upload` - for proxying media when a remote uploader is used and `proxy_remote: true`.
* `:default` - for other requests.
For each pool, the options are:
* `:size` - how much workers the pool can hold
* `:size` - limit to how much requests can be concurrently executed.
* `:timeout` - timeout while `gun` will wait for response
* `:max_overflow` - additional workers if pool is under load
* `:max_waiting` - limit to how much requests can be waiting for others to finish, after this is reached, subsequent requests will be dropped.
## Captcha

View file

@ -39,6 +39,7 @@ def start(_type, _args) do
# every time the application is restarted, so we disable module
# conflicts at runtime
Code.compiler_options(ignore_module_conflict: true)
Pleroma.Telemetry.Logger.attach()
Config.Holder.save_default()
Pleroma.HTML.compile_scrubbers()
Config.DeprecationWarnings.warn()
@ -223,9 +224,7 @@ defp task_children(_) do
# start hackney and gun pools in tests
defp http_children(_, :test) do
hackney_options = Config.get([:hackney_pools, :federation])
hackney_pool = :hackney_pool.child_spec(:federation, hackney_options)
[hackney_pool, Pleroma.Pool.Supervisor]
http_children(Tesla.Adapter.Hackney, nil) ++ http_children(Tesla.Adapter.Gun, nil)
end
defp http_children(Tesla.Adapter.Hackney, _) do
@ -244,7 +243,10 @@ defp http_children(Tesla.Adapter.Hackney, _) do
end
end
defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor]
defp http_children(Tesla.Adapter.Gun, _) do
Pleroma.Gun.ConnectionPool.children() ++
[{Task, &Pleroma.HTTP.AdapterHelper.Gun.limiter_setup/0}]
end
defp http_children(_, _), do: []
end

View file

@ -19,7 +19,8 @@ defmodule Pleroma.Gun.API do
:tls_opts,
:tcp_opts,
:socks_opts,
:ws_opts
:ws_opts,
:supervise
]
@impl Gun

View file

@ -3,85 +3,33 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Gun.Conn do
@moduledoc """
Struct for gun connection data
"""
alias Pleroma.Gun
alias Pleroma.Pool.Connections
require Logger
@type gun_state :: :up | :down
@type conn_state :: :active | :idle
@type t :: %__MODULE__{
conn: pid(),
gun_state: gun_state(),
conn_state: conn_state(),
used_by: [pid()],
last_reference: pos_integer(),
crf: float(),
retries: pos_integer()
}
defstruct conn: nil,
gun_state: :open,
conn_state: :init,
used_by: [],
last_reference: 0,
crf: 1,
retries: 0
@spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil
def open(url, name, opts \\ [])
def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts)
def open(%URI{} = uri, name, opts) do
def open(%URI{} = uri, opts) do
pool_opts = Pleroma.Config.get([:connections_pool], [])
opts =
opts
|> Enum.into(%{})
|> Map.put_new(:retry, pool_opts[:retry] || 1)
|> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 1000)
|> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000)
|> Map.put_new(:supervise, false)
|> maybe_add_tls_opts(uri)
key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
max_connections = pool_opts[:max_connections] || 250
conn_pid =
if Connections.count(name) < max_connections do
do_open(uri, opts)
else
close_least_used_and_do_open(name, uri, opts)
end
if is_pid(conn_pid) do
conn = %Pleroma.Gun.Conn{
conn: conn_pid,
gun_state: :up,
conn_state: :active,
last_reference: :os.system_time(:second)
}
:ok = Gun.set_owner(conn_pid, Process.whereis(name))
Connections.add_conn(name, key, conn)
end
end
defp maybe_add_tls_opts(opts, %URI{scheme: "http"}), do: opts
defp maybe_add_tls_opts(opts, %URI{scheme: "https", host: host}) do
defp maybe_add_tls_opts(opts, %URI{scheme: "https"}) do
tls_opts = [
verify: :verify_peer,
cacertfile: CAStore.file_path(),
depth: 20,
reuse_sessions: false,
verify_fun:
{&:ssl_verify_hostname.verify_fun/3,
[check_hostname: Pleroma.HTTP.Connection.format_host(host)]}
log_level: :warning,
customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)]
]
tls_opts =
@ -105,7 +53,7 @@ defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]),
stream <- Gun.connect(conn, connect_opts),
{:response, :fin, 200, _} <- Gun.await(conn, stream) do
conn
{:ok, conn}
else
error ->
Logger.warn(
@ -141,7 +89,7 @@ defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
conn
{:ok, conn}
else
error ->
Logger.warn(
@ -155,11 +103,11 @@ defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do
end
defp do_open(%URI{host: host, port: port} = uri, opts) do
host = Pleroma.HTTP.Connection.parse_host(host)
host = Pleroma.HTTP.AdapterHelper.parse_host(host)
with {:ok, conn} <- Gun.open(host, port, opts),
{:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do
conn
{:ok, conn}
else
error ->
Logger.warn(
@ -171,7 +119,7 @@ defp do_open(%URI{host: host, port: port} = uri, opts) do
end
defp destination_opts(%URI{host: host, port: port}) do
host = Pleroma.HTTP.Connection.parse_host(host)
host = Pleroma.HTTP.AdapterHelper.parse_host(host)
%{host: host, port: port}
end
@ -181,17 +129,6 @@ defp add_http2_opts(opts, "https", tls_opts) do
defp add_http2_opts(opts, _, _), do: opts
defp close_least_used_and_do_open(name, uri, opts) do
with [{key, conn} | _conns] <- Connections.get_unused_conns(name),
:ok <- Gun.close(conn.conn) do
Connections.remove_conn(name, key)
do_open(uri, opts)
else
[] -> {:error, :pool_overflowed}
end
end
def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do
"#{scheme}://#{host}#{path}"
end

View file

@ -0,0 +1,79 @@
defmodule Pleroma.Gun.ConnectionPool do
@registry __MODULE__
alias Pleroma.Gun.ConnectionPool.WorkerSupervisor
def children do
[
{Registry, keys: :unique, name: @registry},
Pleroma.Gun.ConnectionPool.WorkerSupervisor
]
end
def get_conn(uri, opts) do
key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
case Registry.lookup(@registry, key) do
# The key has already been registered, but connection is not up yet
[{worker_pid, nil}] ->
get_gun_pid_from_worker(worker_pid, true)
[{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
GenServer.cast(worker_pid, {:add_client, self(), false})
{:ok, gun_pid}
[] ->
# :gun.set_owner fails in :connected state for whatevever reason,
# so we open the connection in the process directly and send it's pid back
# We trust gun to handle timeouts by itself
case WorkerSupervisor.start_worker([key, uri, opts, self()]) do
{:ok, worker_pid} ->
get_gun_pid_from_worker(worker_pid, false)
{:error, {:already_started, worker_pid}} ->
get_gun_pid_from_worker(worker_pid, true)
err ->
err
end
end
end
defp get_gun_pid_from_worker(worker_pid, register) do
# GenServer.call will block the process for timeout length if
# the server crashes on startup (which will happen if gun fails to connect)
# so instead we use cast + monitor
ref = Process.monitor(worker_pid)
if register, do: GenServer.cast(worker_pid, {:add_client, self(), true})
receive do
{:conn_pid, pid} ->
Process.demonitor(ref)
{:ok, pid}
{:DOWN, ^ref, :process, ^worker_pid, reason} ->
case reason do
{:shutdown, error} -> error
_ -> {:error, reason}
end
end
end
def release_conn(conn_pid) do
# :ets.fun2ms(fn {_, {worker_pid, {gun_pid, _, _, _}}} when gun_pid == conn_pid ->
# worker_pid end)
query_result =
Registry.select(@registry, [
{{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}
])
case query_result do
[worker_pid] ->
GenServer.cast(worker_pid, {:remove_client, self()})
[] ->
:ok
end
end
end

View file

@ -0,0 +1,85 @@
defmodule Pleroma.Gun.ConnectionPool.Reclaimer do
use GenServer, restart: :temporary
@registry Pleroma.Gun.ConnectionPool
def start_monitor do
pid =
case :gen_server.start(__MODULE__, [], name: {:via, Registry, {@registry, "reclaimer"}}) do
{:ok, pid} ->
pid
{:error, {:already_registered, pid}} ->
pid
end
{pid, Process.monitor(pid)}
end
@impl true
def init(_) do
{:ok, nil, {:continue, :reclaim}}
end
@impl true
def handle_continue(:reclaim, _) do
max_connections = Pleroma.Config.get([:connections_pool, :max_connections])
reclaim_max =
[:connections_pool, :reclaim_multiplier]
|> Pleroma.Config.get()
|> Kernel.*(max_connections)
|> round
|> max(1)
:telemetry.execute([:pleroma, :connection_pool, :reclaim, :start], %{}, %{
max_connections: max_connections,
reclaim_max: reclaim_max
})
# :ets.fun2ms(
# fn {_, {worker_pid, {_, used_by, crf, last_reference}}} when used_by == [] ->
# {worker_pid, crf, last_reference} end)
unused_conns =
Registry.select(
@registry,
[
{{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}], [{{:"$1", :"$3", :"$4"}}]}
]
)
case unused_conns do
[] ->
:telemetry.execute(
[:pleroma, :connection_pool, :reclaim, :stop],
%{reclaimed_count: 0},
%{
max_connections: max_connections
}
)
{:stop, :no_unused_conns, nil}
unused_conns ->
reclaimed =
unused_conns
|> Enum.sort(fn {_pid1, crf1, last_reference1}, {_pid2, crf2, last_reference2} ->
crf1 <= crf2 and last_reference1 <= last_reference2
end)
|> Enum.take(reclaim_max)
reclaimed
|> Enum.each(fn {pid, _, _} ->
DynamicSupervisor.terminate_child(Pleroma.Gun.ConnectionPool.WorkerSupervisor, pid)
end)
:telemetry.execute(
[:pleroma, :connection_pool, :reclaim, :stop],
%{reclaimed_count: Enum.count(reclaimed)},
%{max_connections: max_connections}
)
{:stop, :normal, nil}
end
end
end

View file

@ -0,0 +1,127 @@
defmodule Pleroma.Gun.ConnectionPool.Worker do
alias Pleroma.Gun
use GenServer, restart: :temporary
@registry Pleroma.Gun.ConnectionPool
def start_link([key | _] = opts) do
GenServer.start_link(__MODULE__, opts, name: {:via, Registry, {@registry, key}})
end
@impl true
def init([_key, _uri, _opts, _client_pid] = opts) do
{:ok, nil, {:continue, {:connect, opts}}}
end
@impl true
def handle_continue({:connect, [key, uri, opts, client_pid]}, _) do
with {:ok, conn_pid} <- Gun.Conn.open(uri, opts),
Process.link(conn_pid) do
time = :erlang.monotonic_time(:millisecond)
{_, _} =
Registry.update_value(@registry, key, fn _ ->
{conn_pid, [client_pid], 1, time}
end)
send(client_pid, {:conn_pid, conn_pid})
{:noreply,
%{key: key, timer: nil, client_monitors: %{client_pid => Process.monitor(client_pid)}},
:hibernate}
else
err ->
{:stop, {:shutdown, err}, nil}
end
end
@impl true
def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) do
time = :erlang.monotonic_time(:millisecond)
{{conn_pid, _, _, _}, _} =
Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
{conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time}
end)
if send_pid_back, do: send(client_pid, {:conn_pid, conn_pid})
state =
if state.timer != nil do
Process.cancel_timer(state[:timer])
%{state | timer: nil}
else
state
end
ref = Process.monitor(client_pid)
state = put_in(state.client_monitors[client_pid], ref)
{:noreply, state, :hibernate}
end
@impl true
def handle_cast({:remove_client, client_pid}, %{key: key} = state) do
{{_conn_pid, used_by, _crf, _last_reference}, _} =
Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} ->
{conn_pid, List.delete(used_by, client_pid), crf, last_reference}
end)
{ref, state} = pop_in(state.client_monitors[client_pid])
Process.demonitor(ref)
timer =
if used_by == [] do
max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000)
Process.send_after(self(), :idle_close, max_idle)
else
nil
end
{:noreply, %{state | timer: timer}, :hibernate}
end
@impl true
def handle_info(:idle_close, state) do
# Gun monitors the owner process, and will close the connection automatically
# when it's terminated
{:stop, :normal, state}
end
# Gracefully shutdown if the connection got closed without any streams left
@impl true
def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do
{:stop, :normal, state}
end
# Otherwise, shutdown with an error
@impl true
def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do
{:stop, {:error, down_message}, state}
end
@impl true
def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
# Sometimes the client is dead before we demonitor it in :remove_client, so the message
# arrives anyway
case state.client_monitors[pid] do
nil ->
{:noreply, state, :hibernate}
_ref ->
:telemetry.execute(
[:pleroma, :connection_pool, :client_death],
%{client_pid: pid, reason: reason},
%{key: state.key}
)
handle_cast({:remove_client, pid}, state)
end
end
# LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478
defp crf(time_delta, prev_crf) do
1 + :math.pow(0.5, 0.0001 * time_delta) * prev_crf
end
end

View file

@ -0,0 +1,45 @@
defmodule Pleroma.Gun.ConnectionPool.WorkerSupervisor do
@moduledoc "Supervisor for pool workers. Does not do anything except enforce max connection limit"
use DynamicSupervisor
def start_link(opts) do
DynamicSupervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
DynamicSupervisor.init(
strategy: :one_for_one,
max_children: Pleroma.Config.get([:connections_pool, :max_connections])
)
end
def start_worker(opts, retry \\ false) do
case DynamicSupervisor.start_child(__MODULE__, {Pleroma.Gun.ConnectionPool.Worker, opts}) do
{:error, :max_children} ->
if retry or free_pool() == :error do
:telemetry.execute([:pleroma, :connection_pool, :provision_failure], %{opts: opts})
{:error, :pool_full}
else
start_worker(opts, true)
end
res ->
res
end
end
defp free_pool do
wait_for_reclaimer_finish(Pleroma.Gun.ConnectionPool.Reclaimer.start_monitor())
end
defp wait_for_reclaimer_finish({pid, mon}) do
receive do
{:DOWN, ^mon, :process, ^pid, :no_unused_conns} ->
:error
{:DOWN, ^mon, :process, ^pid, :normal} ->
:ok
end
end
end

View file

@ -3,32 +3,30 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.HTTP.AdapterHelper do
alias Pleroma.HTTP.Connection
@moduledoc """
Configure Tesla.Client with default and customized adapter options.
"""
@defaults [pool: :federation]
@type proxy_type() :: :socks4 | :socks5
@type host() :: charlist() | :inet.ip_address()
alias Pleroma.Config
alias Pleroma.HTTP.AdapterHelper
require Logger
@type proxy ::
{Connection.host(), pos_integer()}
| {Connection.proxy_type(), Connection.host(), pos_integer()}
@callback options(keyword(), URI.t()) :: keyword()
@callback after_request(keyword()) :: :ok
@spec options(keyword(), URI.t()) :: keyword()
def options(opts, _uri) do
proxy = Pleroma.Config.get([:http, :proxy_url], nil)
maybe_add_proxy(opts, format_proxy(proxy))
end
@spec maybe_get_conn(URI.t(), keyword()) :: keyword()
def maybe_get_conn(_uri, opts), do: opts
@spec after_request(keyword()) :: :ok
def after_request(_opts), do: :ok
@callback get_conn(URI.t(), keyword()) :: {:ok, term()} | {:error, term()}
@spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil
def format_proxy(nil), do: nil
def format_proxy(proxy_url) do
case Connection.parse_proxy(proxy_url) do
case parse_proxy(proxy_url) do
{:ok, host, port} -> {host, port}
{:ok, type, host, port} -> {type, host, port}
_ -> nil
@ -38,4 +36,105 @@ def format_proxy(proxy_url) do
@spec maybe_add_proxy(keyword(), proxy() | nil) :: keyword()
def maybe_add_proxy(opts, nil), do: opts
def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy)
@doc """
Merge default connection & adapter options with received ones.
"""
@spec options(URI.t(), keyword()) :: keyword()
def options(%URI{} = uri, opts \\ []) do
@defaults
|> put_timeout()
|> Keyword.merge(opts)
|> adapter_helper().options(uri)
end
# For Hackney, this is the time a connection can stay idle in the pool.
# For Gun, this is the timeout to receive a message from Gun.
defp put_timeout(opts) do
{config_key, default} =
if adapter() == Tesla.Adapter.Gun do
{:pools, Config.get([:pools, :default, :timeout], 5_000)}
else
{:hackney_pools, 10_000}
end
timeout = Config.get([config_key, opts[:pool], :timeout], default)
Keyword.merge(opts, timeout: timeout)
end
def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts)
defp adapter, do: Application.get_env(:tesla, :adapter)
defp adapter_helper do
case adapter() do
Tesla.Adapter.Gun -> AdapterHelper.Gun
Tesla.Adapter.Hackney -> AdapterHelper.Hackney
_ -> AdapterHelper.Default
end
end
@spec parse_proxy(String.t() | tuple() | nil) ::
{:ok, host(), pos_integer()}
| {:ok, proxy_type(), host(), pos_integer()}
| {:error, atom()}
| nil
def parse_proxy(nil), do: nil
def parse_proxy(proxy) when is_binary(proxy) do
with [host, port] <- String.split(proxy, ":"),
{port, ""} <- Integer.parse(port) do
{:ok, parse_host(host), port}
else
{_, _} ->
Logger.warn("Parsing port failed #{inspect(proxy)}")
{:error, :invalid_proxy_port}
:error ->
Logger.warn("Parsing port failed #{inspect(proxy)}")
{:error, :invalid_proxy_port}
_ ->
Logger.warn("Parsing proxy failed #{inspect(proxy)}")
{:error, :invalid_proxy}
end
end
def parse_proxy(proxy) when is_tuple(proxy) do
with {type, host, port} <- proxy do
{:ok, type, parse_host(host), port}
else
_ ->
Logger.warn("Parsing proxy failed #{inspect(proxy)}")
{:error, :invalid_proxy}
end
end
@spec parse_host(String.t() | atom() | charlist()) :: charlist() | :inet.ip_address()
def parse_host(host) when is_list(host), do: host
def parse_host(host) when is_atom(host), do: to_charlist(host)
def parse_host(host) when is_binary(host) do
host = to_charlist(host)
case :inet.parse_address(host) do
{:error, :einval} -> host
{:ok, ip} -> ip
end
end
@spec format_host(String.t()) :: charlist()
def format_host(host) do
host_charlist = to_charlist(host)
case :inet.parse_address(host_charlist) do
{:error, :einval} ->
:idna.encode(host_charlist)
{:ok, _ip} ->
host_charlist
end
end
end

View file

@ -0,0 +1,14 @@
defmodule Pleroma.HTTP.AdapterHelper.Default do
alias Pleroma.HTTP.AdapterHelper
@behaviour Pleroma.HTTP.AdapterHelper
@spec options(keyword(), URI.t()) :: keyword()
def options(opts, _uri) do
proxy = Pleroma.Config.get([:http, :proxy_url], nil)
AdapterHelper.maybe_add_proxy(opts, AdapterHelper.format_proxy(proxy))
end
@spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
def get_conn(_uri, opts), do: {:ok, opts}
end

View file

@ -5,8 +5,8 @@
defmodule Pleroma.HTTP.AdapterHelper.Gun do
@behaviour Pleroma.HTTP.AdapterHelper
alias Pleroma.Gun.ConnectionPool
alias Pleroma.HTTP.AdapterHelper
alias Pleroma.Pool.Connections
require Logger
@ -14,7 +14,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do
connect_timeout: 5_000,
domain_lookup_timeout: 5_000,
tls_handshake_timeout: 5_000,
retry: 1,
retry: 0,
retry_timeout: 1000,
await_up_timeout: 5_000
]
@ -31,16 +31,7 @@ def options(incoming_opts \\ [], %URI{} = uri) do
|> Keyword.merge(config_opts)
|> add_scheme_opts(uri)
|> AdapterHelper.maybe_add_proxy(proxy)
|> maybe_get_conn(uri, incoming_opts)
end
@spec after_request(keyword()) :: :ok
def after_request(opts) do
if opts[:conn] && opts[:body_as] != :chunks do
Connections.checkout(opts[:conn], self(), :gun_connections)
end
:ok
|> Keyword.merge(incoming_opts)
end
defp add_scheme_opts(opts, %{scheme: "http"}), do: opts
@ -48,30 +39,40 @@ defp add_scheme_opts(opts, %{scheme: "http"}), do: opts
defp add_scheme_opts(opts, %{scheme: "https"}) do
opts
|> Keyword.put(:certificates_verification, true)
|> Keyword.put(:tls_opts, log_level: :warning)
end
defp maybe_get_conn(adapter_opts, uri, incoming_opts) do
{receive_conn?, opts} =
adapter_opts
|> Keyword.merge(incoming_opts)
|> Keyword.pop(:receive_conn, true)
if Connections.alive?(:gun_connections) and receive_conn? do
checkin_conn(uri, opts)
else
opts
@spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()}
def get_conn(uri, opts) do
case ConnectionPool.get_conn(uri, opts) do
{:ok, conn_pid} -> {:ok, Keyword.merge(opts, conn: conn_pid, close_conn: false)}
err -> err
end
end
defp checkin_conn(uri, opts) do
case Connections.checkin(uri, :gun_connections) do
nil ->
Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts])
opts
@prefix Pleroma.Gun.ConnectionPool
def limiter_setup do
wait = Pleroma.Config.get([:connections_pool, :connection_acquisition_wait])
retries = Pleroma.Config.get([:connections_pool, :connection_acquisition_retries])
conn when is_pid(conn) ->
Keyword.merge(opts, conn: conn, close_conn: false)
:pools
|> Pleroma.Config.get([])
|> Enum.each(fn {name, opts} ->
max_running = Keyword.get(opts, :size, 50)
max_waiting = Keyword.get(opts, :max_waiting, 10)
result =
ConcurrentLimiter.new(:"#{@prefix}.#{name}", max_running, max_waiting,
wait: wait,
max_retries: retries
)
case result do
:ok -> :ok
{:error, :existing} -> :ok
e -> raise e
end
end)
:ok
end
end

View file

@ -24,5 +24,6 @@ def options(connection_opts \\ [], %URI{} = uri) do
defp add_scheme_opts(opts, _), do: opts
def after_request(_), do: :ok
@spec get_conn(URI.t(), keyword()) :: {:ok, keyword()}
def get_conn(_uri, opts), do: {:ok, opts}
end

View file

@ -1,124 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.HTTP.Connection do
@moduledoc """
Configure Tesla.Client with default and customized adapter options.
"""
alias Pleroma.Config
alias Pleroma.HTTP.AdapterHelper
require Logger
@defaults [pool: :federation]
@type ip_address :: ipv4_address() | ipv6_address()
@type ipv4_address :: {0..255, 0..255, 0..255, 0..255}
@type ipv6_address ::
{0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535}
@type proxy_type() :: :socks4 | :socks5
@type host() :: charlist() | ip_address()
@doc """
Merge default connection & adapter options with received ones.
"""
@spec options(URI.t(), keyword()) :: keyword()
def options(%URI{} = uri, opts \\ []) do
@defaults
|> pool_timeout()
|> Keyword.merge(opts)
|> adapter_helper().options(uri)
end
defp pool_timeout(opts) do
{config_key, default} =
if adapter() == Tesla.Adapter.Gun do
{:pools, Config.get([:pools, :default, :timeout])}
else
{:hackney_pools, 10_000}
end
timeout = Config.get([config_key, opts[:pool], :timeout], default)
Keyword.merge(opts, timeout: timeout)
end
@spec after_request(keyword()) :: :ok
def after_request(opts), do: adapter_helper().after_request(opts)
defp adapter, do: Application.get_env(:tesla, :adapter)
defp adapter_helper do
case adapter() do
Tesla.Adapter.Gun -> AdapterHelper.Gun
Tesla.Adapter.Hackney -> AdapterHelper.Hackney
_ -> AdapterHelper
end
end
@spec parse_proxy(String.t() | tuple() | nil) ::
{:ok, host(), pos_integer()}
| {:ok, proxy_type(), host(), pos_integer()}
| {:error, atom()}
| nil
def parse_proxy(nil), do: nil
def parse_proxy(proxy) when is_binary(proxy) do
with [host, port] <- String.split(proxy, ":"),
{port, ""} <- Integer.parse(port) do
{:ok, parse_host(host), port}
else
{_, _} ->
Logger.warn("Parsing port failed #{inspect(proxy)}")
{:error, :invalid_proxy_port}
:error ->
Logger.warn("Parsing port failed #{inspect(proxy)}")
{:error, :invalid_proxy_port}
_ ->
Logger.warn("Parsing proxy failed #{inspect(proxy)}")
{:error, :invalid_proxy}
end
end
def parse_proxy(proxy) when is_tuple(proxy) do
with {type, host, port} <- proxy do
{:ok, type, parse_host(host), port}
else
_ ->
Logger.warn("Parsing proxy failed #{inspect(proxy)}")
{:error, :invalid_proxy}
end
end
@spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address()
def parse_host(host) when is_list(host), do: host
def parse_host(host) when is_atom(host), do: to_charlist(host)
def parse_host(host) when is_binary(host) do
host = to_charlist(host)
case :inet.parse_address(host) do
{:error, :einval} -> host
{:ok, ip} -> ip
end
end
@spec format_host(String.t()) :: charlist()
def format_host(host) do
host_charlist = to_charlist(host)
case :inet.parse_address(host_charlist) do
{:error, :einval} ->
:idna.encode(host_charlist)
{:ok, _ip} ->
host_charlist
end
end
end

View file

@ -7,7 +7,7 @@ defmodule Pleroma.HTTP do
Wrapper for `Tesla.request/2`.
"""
alias Pleroma.HTTP.Connection
alias Pleroma.HTTP.AdapterHelper
alias Pleroma.HTTP.Request
alias Pleroma.HTTP.RequestBuilder, as: Builder
alias Tesla.Client
@ -60,49 +60,29 @@ def post(url, body, headers \\ [], options \\ []),
{:ok, Env.t()} | {:error, any()}
def request(method, url, body, headers, options) when is_binary(url) do
uri = URI.parse(url)
adapter_opts = Connection.options(uri, options[:adapter] || [])
adapter_opts = AdapterHelper.options(uri, options[:adapter] || [])
case AdapterHelper.get_conn(uri, adapter_opts) do
{:ok, adapter_opts} ->
options = put_in(options[:adapter], adapter_opts)
params = options[:params] || []
request = build_request(method, headers, options, url, body, params)
adapter = Application.get_env(:tesla, :adapter)
client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter)
client = Tesla.client([Pleroma.HTTP.Middleware.FollowRedirects], adapter)
pid = Process.whereis(adapter_opts[:pool])
pool_alive? =
if adapter == Tesla.Adapter.Gun && pid do
Process.alive?(pid)
else
false
end
request_opts =
maybe_limit(
fn ->
request(client, request)
end,
adapter,
adapter_opts
|> Enum.into(%{})
|> Map.put(:env, Pleroma.Config.get([:env]))
|> Map.put(:pool_alive?, pool_alive?)
response = request(client, request, request_opts)
Connection.after_request(adapter_opts)
response
end
@spec request(Client.t(), keyword(), map()) :: {:ok, Env.t()} | {:error, any()}
def request(%Client{} = client, request, %{env: :test}), do: request(client, request)
def request(%Client{} = client, request, %{body_as: :chunks}), do: request(client, request)
def request(%Client{} = client, request, %{pool_alive?: false}), do: request(client, request)
def request(%Client{} = client, request, %{pool: pool, timeout: timeout}) do
:poolboy.transaction(
pool,
&Pleroma.Pool.Request.execute(&1, client, request, timeout),
timeout
)
# Connection release is handled in a custom FollowRedirects middleware
err ->
err
end
end
@spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()}
@ -118,4 +98,13 @@ defp build_request(method, headers, options, url, body, params) do
|> Builder.add_param(:query, :query, params)
|> Builder.convert_to_keyword()
end
@prefix Pleroma.Gun.ConnectionPool
defp maybe_limit(fun, Tesla.Adapter.Gun, opts) do
ConcurrentLimiter.limit(:"#{@prefix}.#{opts[:pool] || :default}", fun)
end
defp maybe_limit(fun, _, _) do
fun.()
end
end

View file

@ -1,283 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Pool.Connections do
use GenServer
alias Pleroma.Config
alias Pleroma.Gun
require Logger
@type domain :: String.t()
@type conn :: Pleroma.Gun.Conn.t()
@type t :: %__MODULE__{
conns: %{domain() => conn()},
opts: keyword()
}
defstruct conns: %{}, opts: []
@spec start_link({atom(), keyword()}) :: {:ok, pid()}
def start_link({name, opts}) do
GenServer.start_link(__MODULE__, opts, name: name)
end
@impl true
def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}}
@spec checkin(String.t() | URI.t(), atom()) :: pid() | nil
def checkin(url, name)
def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name)
def checkin(%URI{} = uri, name) do
timeout = Config.get([:connections_pool, :checkin_timeout], 250)
GenServer.call(name, {:checkin, uri}, timeout)
end
@spec alive?(atom()) :: boolean()
def alive?(name) do
if pid = Process.whereis(name) do
Process.alive?(pid)
else
false
end
end
@spec get_state(atom()) :: t()
def get_state(name) do
GenServer.call(name, :state)
end
@spec count(atom()) :: pos_integer()
def count(name) do
GenServer.call(name, :count)
end
@spec get_unused_conns(atom()) :: [{domain(), conn()}]
def get_unused_conns(name) do
GenServer.call(name, :unused_conns)
end
@spec checkout(pid(), pid(), atom()) :: :ok
def checkout(conn, pid, name) do
GenServer.cast(name, {:checkout, conn, pid})
end
@spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok
def add_conn(name, key, conn) do
GenServer.cast(name, {:add_conn, key, conn})
end
@spec remove_conn(atom(), String.t()) :: :ok
def remove_conn(name, key) do
GenServer.cast(name, {:remove_conn, key})
end
@impl true
def handle_cast({:add_conn, key, conn}, state) do
state = put_in(state.conns[key], conn)
Process.monitor(conn.conn)
{:noreply, state}
end
@impl true
def handle_cast({:checkout, conn_pid, pid}, state) do
state =
with true <- Process.alive?(conn_pid),
{key, conn} <- find_conn(state.conns, conn_pid),
used_by <- List.keydelete(conn.used_by, pid, 0) do
conn_state = if used_by == [], do: :idle, else: conn.conn_state
put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by})
else
false ->
Logger.debug("checkout for closed conn #{inspect(conn_pid)}")
state
nil ->
Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state")
state
end
{:noreply, state}
end
@impl true
def handle_cast({:remove_conn, key}, state) do
state = put_in(state.conns, Map.delete(state.conns, key))
{:noreply, state}
end
@impl true
def handle_call({:checkin, uri}, from, state) do
key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
case state.conns[key] do
%{conn: pid, gun_state: :up} = conn ->
time = :os.system_time(:second)
last_reference = time - conn.last_reference
crf = crf(last_reference, 100, conn.crf)
state =
put_in(state.conns[key], %{
conn
| last_reference: time,
crf: crf,
conn_state: :active,
used_by: [from | conn.used_by]
})
{:reply, pid, state}
%{gun_state: :down} ->
{:reply, nil, state}
nil ->
{:reply, nil, state}
end
end
@impl true
def handle_call(:state, _from, state), do: {:reply, state, state}
@impl true
def handle_call(:count, _from, state) do
{:reply, Enum.count(state.conns), state}
end
@impl true
def handle_call(:unused_conns, _from, state) do
unused_conns =
state.conns
|> Enum.filter(&filter_conns/1)
|> Enum.sort(&sort_conns/2)
{:reply, unused_conns, state}
end
defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true
defp filter_conns(_), do: false
defp sort_conns({_, c1}, {_, c2}) do
c1.crf <= c2.crf and c1.last_reference <= c2.last_reference
end
@impl true
def handle_info({:gun_up, conn_pid, _protocol}, state) do
%{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid)
host =
case :inet.ntoa(host) do
{:error, :einval} -> host
ip -> ip
end
key = "#{scheme}:#{host}:#{port}"
state =
with {key, conn} <- find_conn(state.conns, conn_pid, key),
{true, key} <- {Process.alive?(conn_pid), key} do
put_in(state.conns[key], %{
conn
| gun_state: :up,
conn_state: :active,
retries: 0
})
else
{false, key} ->
put_in(
state.conns,
Map.delete(state.conns, key)
)
nil ->
:ok = Gun.close(conn_pid)
state
end
{:noreply, state}
end
@impl true
def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do
retries = Config.get([:connections_pool, :retry], 1)
# we can't get info on this pid, because pid is dead
state =
with {key, conn} <- find_conn(state.conns, conn_pid),
{true, key} <- {Process.alive?(conn_pid), key} do
if conn.retries == retries do
:ok = Gun.close(conn.conn)
put_in(
state.conns,
Map.delete(state.conns, key)
)
else
put_in(state.conns[key], %{
conn
| gun_state: :down,
retries: conn.retries + 1
})
end
else
{false, key} ->
put_in(
state.conns,
Map.delete(state.conns, key)
)
nil ->
Logger.debug(":gun_down for conn which isn't found in state")
state
end
{:noreply, state}
end
@impl true
def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do
Logger.debug("received DOWN message for #{inspect(conn_pid)} reason -> #{inspect(reason)}")
state =
with {key, conn} <- find_conn(state.conns, conn_pid) do
Enum.each(conn.used_by, fn {pid, _ref} ->
Process.exit(pid, reason)
end)
put_in(
state.conns,
Map.delete(state.conns, key)
)
else
nil ->
Logger.debug(":DOWN for conn which isn't found in state")
state
end
{:noreply, state}
end
defp find_conn(conns, conn_pid) do
Enum.find(conns, fn {_key, conn} ->
conn.conn == conn_pid
end)
end
defp find_conn(conns, conn_pid, conn_key) do
Enum.find(conns, fn {key, conn} ->
key == conn_key and conn.conn == conn_pid
end)
end
def crf(current, steps, crf) do
1 + :math.pow(0.5, current / steps) * crf
end
end

View file

@ -1,22 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Pool do
def child_spec(opts) do
poolboy_opts =
opts
|> Keyword.put(:worker_module, Pleroma.Pool.Request)
|> Keyword.put(:name, {:local, opts[:name]})
|> Keyword.put(:size, opts[:size])
|> Keyword.put(:max_overflow, opts[:max_overflow])
%{
id: opts[:id] || {__MODULE__, make_ref()},
start: {:poolboy, :start_link, [poolboy_opts, [name: opts[:name]]]},
restart: :permanent,
shutdown: 5000,
type: :worker
}
end
end

View file

@ -1,65 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Pool.Request do
use GenServer
require Logger
def start_link(args) do
GenServer.start_link(__MODULE__, args)
end
@impl true
def init(_), do: {:ok, []}
@spec execute(pid() | atom(), Tesla.Client.t(), keyword(), pos_integer()) ::
{:ok, Tesla.Env.t()} | {:error, any()}
def execute(pid, client, request, timeout) do
GenServer.call(pid, {:execute, client, request}, timeout)
end
@impl true
def handle_call({:execute, client, request}, _from, state) do
response = Pleroma.HTTP.request(client, request)
{:reply, response, state}
end
@impl true
def handle_info({:gun_data, _conn, _stream, _, _}, state) do
{:noreply, state}
end
@impl true
def handle_info({:gun_up, _conn, _protocol}, state) do
{:noreply, state}
end
@impl true
def handle_info({:gun_down, _conn, _protocol, _reason, _killed}, state) do
{:noreply, state}
end
@impl true
def handle_info({:gun_error, _conn, _stream, _error}, state) do
{:noreply, state}
end
@impl true
def handle_info({:gun_push, _conn, _stream, _new_stream, _method, _uri, _headers}, state) do
{:noreply, state}
end
@impl true
def handle_info({:gun_response, _conn, _stream, _, _status, _headers}, state) do
{:noreply, state}
end
@impl true
def handle_info(msg, state) do
Logger.warn("Received unexpected message #{inspect(__MODULE__)} #{inspect(msg)}")
{:noreply, state}
end
end

View file

@ -1,42 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Pool.Supervisor do
use Supervisor
alias Pleroma.Config
alias Pleroma.Pool
def start_link(args) do
Supervisor.start_link(__MODULE__, args, name: __MODULE__)
end
def init(_) do
conns_child = %{
id: Pool.Connections,
start:
{Pool.Connections, :start_link, [{:gun_connections, Config.get([:connections_pool])}]}
}
Supervisor.init([conns_child | pools()], strategy: :one_for_one)
end
defp pools do
pools = Config.get(:pools)
pools =
if Config.get([Pleroma.Upload, :proxy_remote]) == false do
Keyword.delete(pools, :upload)
else
pools
end
for {pool_name, pool_opts} <- pools do
pool_opts
|> Keyword.put(:id, {Pool, pool_name})
|> Keyword.put(:name, pool_name)
|> Pool.child_spec()
end
end
end

View file

@ -48,7 +48,7 @@ def stream_body(%{pid: pid, opts: opts, fin: true}) do
# if there were redirects we need to checkout old conn
conn = opts[:old_conn] || opts[:conn]
if conn, do: :ok = Pleroma.Pool.Connections.checkout(conn, self(), :gun_connections)
if conn, do: :ok = Pleroma.Gun.ConnectionPool.release_conn(conn)
:done
end

View file

@ -0,0 +1,76 @@
defmodule Pleroma.Telemetry.Logger do
@moduledoc "Transforms Pleroma telemetry events to logs"
require Logger
@events [
[:pleroma, :connection_pool, :reclaim, :start],
[:pleroma, :connection_pool, :reclaim, :stop],
[:pleroma, :connection_pool, :provision_failure],
[:pleroma, :connection_pool, :client_death]
]
def attach do
:telemetry.attach_many("pleroma-logger", @events, &handle_event/4, [])
end
# Passing anonymous functions instead of strings to logger is intentional,
# that way strings won't be concatenated if the message is going to be thrown
# out anyway due to higher log level configured
def handle_event(
[:pleroma, :connection_pool, :reclaim, :start],
_,
%{max_connections: max_connections, reclaim_max: reclaim_max},
_
) do
Logger.debug(fn ->
"Connection pool is exhausted (reached #{max_connections} connections). Starting idle connection cleanup to reclaim as much as #{
reclaim_max
} connections"
end)
end
def handle_event(
[:pleroma, :connection_pool, :reclaim, :stop],
%{reclaimed_count: 0},
_,
_
) do
Logger.error(fn ->
"Connection pool failed to reclaim any connections due to all of them being in use. It will have to drop requests for opening connections to new hosts"
end)
end
def handle_event(
[:pleroma, :connection_pool, :reclaim, :stop],
%{reclaimed_count: reclaimed_count},
_,
_
) do
Logger.debug(fn -> "Connection pool cleaned up #{reclaimed_count} idle connections" end)
end
def handle_event(
[:pleroma, :connection_pool, :provision_failure],
%{opts: [key | _]},
_,
_
) do
Logger.error(fn ->
"Connection pool had to refuse opening a connection to #{key} due to connection limit exhaustion"
end)
end
def handle_event(
[:pleroma, :connection_pool, :client_death],
%{client_pid: client_pid, reason: reason},
%{key: key},
_
) do
Logger.warn(fn ->
"Pool worker for #{key}: Client #{inspect(client_pid)} died before releasing the connection with #{
inspect(reason)
}"
end)
end
end

View file

@ -0,0 +1,110 @@
# Pleroma: A lightweight social networking server
# Copyright © 2015-2020 Tymon Tobolski <https://github.com/teamon/tesla/blob/master/lib/tesla/middleware/follow_redirects.ex>
# Copyright © 2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.HTTP.Middleware.FollowRedirects do
@moduledoc """
Pool-aware version of https://github.com/teamon/tesla/blob/master/lib/tesla/middleware/follow_redirects.ex
Follow 3xx redirects
## Options
- `:max_redirects` - limit number of redirects (default: `5`)
"""
alias Pleroma.Gun.ConnectionPool
@behaviour Tesla.Middleware
@max_redirects 5
@redirect_statuses [301, 302, 303, 307, 308]
@impl Tesla.Middleware
def call(env, next, opts \\ []) do
max = Keyword.get(opts, :max_redirects, @max_redirects)
redirect(env, next, max)
end
defp redirect(env, next, left) do
opts = env.opts[:adapter]
case Tesla.run(env, next) do
{:ok, %{status: status} = res} when status in @redirect_statuses and left > 0 ->
release_conn(opts)
case Tesla.get_header(res, "location") do
nil ->
{:ok, res}
location ->
location = parse_location(location, res)
case get_conn(location, opts) do
{:ok, opts} ->
%{env | opts: Keyword.put(env.opts, :adapter, opts)}
|> new_request(res.status, location)
|> redirect(next, left - 1)
e ->
e
end
end
{:ok, %{status: status}} when status in @redirect_statuses ->
release_conn(opts)
{:error, {__MODULE__, :too_many_redirects}}
{:error, _} = e ->
release_conn(opts)
e
other ->
unless opts[:body_as] == :chunks do
release_conn(opts)
end
other
end
end
defp get_conn(location, opts) do
uri = URI.parse(location)
case ConnectionPool.get_conn(uri, opts) do
{:ok, conn} ->
{:ok, Keyword.merge(opts, conn: conn)}
e ->
e
end
end
defp release_conn(opts) do
ConnectionPool.release_conn(opts[:conn])
end
# The 303 (See Other) redirect was added in HTTP/1.1 to indicate that the originally
# requested resource is not available, however a related resource (or another redirect)
# available via GET is available at the specified location.
# https://tools.ietf.org/html/rfc7231#section-6.4.4
defp new_request(env, 303, location), do: %{env | url: location, method: :get, query: []}
# The 307 (Temporary Redirect) status code indicates that the target
# resource resides temporarily under a different URI and the user agent
# MUST NOT change the request method (...)
# https://tools.ietf.org/html/rfc7231#section-6.4.7
defp new_request(env, 307, location), do: %{env | url: location}
defp new_request(env, _, location), do: %{env | url: location, query: []}
defp parse_location("https://" <> _rest = location, _env), do: location
defp parse_location("http://" <> _rest = location, _env), do: location
defp parse_location(location, env) do
env.url
|> URI.parse()
|> URI.merge(location)
|> URI.to_string()
end
end

View file

@ -135,13 +135,11 @@ defp deps do
{:poison, "~> 3.0", override: true},
# {:tesla, "~> 1.3", override: true},
{:tesla,
git: "https://git.pleroma.social/pleroma/elixir-libraries/tesla.git",
ref: "61b7503cef33f00834f78ddfafe0d5d9dec2270b",
override: true},
github: "teamon/tesla", ref: "af3707078b10793f6a534938e56b963aff82fe3c", override: true},
{:castore, "~> 0.1"},
{:cowlib, "~> 2.8", override: true},
{:gun,
github: "ninenines/gun", ref: "e1a69b36b180a574c0ac314ced9613fdd52312cc", override: true},
github: "ninenines/gun", ref: "921c47146b2d9567eac7e9a4d2ccc60fffd4f327", override: true},
{:jason, "~> 1.0"},
{:mogrify, "~> 0.6.1"},
{:ex_aws, "~> 2.1"},
@ -191,6 +189,9 @@ defp deps do
{:plug_static_index_html, "~> 1.0.0"},
{:excoveralls, "~> 0.12.1", only: :test},
{:flake_id, "~> 0.1.0"},
{:concurrent_limiter,
git: "https://git.pleroma.social/pleroma/elixir-libraries/concurrent_limiter",
ref: "8eee96c6ba39b9286ec44c51c52d9f2758951365"},
{:remote_ip,
git: "https://git.pleroma.social/pleroma/remote_ip.git",
ref: "b647d0deecaa3acb140854fe4bda5b7e1dc6d1c8"},

View file

@ -15,6 +15,7 @@
"certifi": {:hex, :certifi, "2.5.2", "b7cfeae9d2ed395695dd8201c57a2d019c0c43ecaf8b8bcb9320b40d6662f340", [:rebar3], [{:parse_trans, "~>3.3", [hex: :parse_trans, repo: "hexpm", optional: false]}], "hexpm", "3b3b5f36493004ac3455966991eaf6e768ce9884693d9968055aeeeb1e575040"},
"combine": {:hex, :combine, "0.10.0", "eff8224eeb56498a2af13011d142c5e7997a80c8f5b97c499f84c841032e429f", [:mix], [], "hexpm", "1b1dbc1790073076580d0d1d64e42eae2366583e7aecd455d1215b0d16f2451b"},
"comeonin": {:hex, :comeonin, "5.3.1", "7fe612b739c78c9c1a75186ef2d322ce4d25032d119823269d0aa1e2f1e20025", [:mix], [], "hexpm", "d6222483060c17f0977fad1b7401ef0c5863c985a64352755f366aee3799c245"},
"concurrent_limiter": {:git, "https://git.pleroma.social/pleroma/elixir-libraries/concurrent_limiter", "8eee96c6ba39b9286ec44c51c52d9f2758951365", [ref: "8eee96c6ba39b9286ec44c51c52d9f2758951365"]},
"connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [:mix], [], "hexpm", "4a0850c9be22a43af9920a71ab17c051f5f7d45c209e40269a1938832510e4d9"},
"cors_plug": {:hex, :cors_plug, "1.5.2", "72df63c87e4f94112f458ce9d25800900cc88608c1078f0e4faddf20933eda6e", [:mix], [{:plug, "~> 1.3 or ~> 1.4 or ~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "9af027d20dc12dd0c4345a6b87247e0c62965871feea0bfecf9764648b02cc69"},
"cowboy": {:hex, :cowboy, "2.7.0", "91ed100138a764355f43316b1d23d7ff6bdb0de4ea618cb5d8677c93a7a2f115", [:rebar3], [{:cowlib, "~> 2.8.0", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "04fd8c6a39edc6aaa9c26123009200fc61f92a3a94f3178c527b70b767c6e605"},
@ -49,7 +50,7 @@
"gen_stage": {:hex, :gen_stage, "0.14.3", "d0c66f1c87faa301c1a85a809a3ee9097a4264b2edf7644bf5c123237ef732bf", [:mix], [], "hexpm"},
"gen_state_machine": {:hex, :gen_state_machine, "2.0.5", "9ac15ec6e66acac994cc442dcc2c6f9796cf380ec4b08267223014be1c728a95", [:mix], [], "hexpm"},
"gettext": {:hex, :gettext, "0.17.4", "f13088e1ec10ce01665cf25f5ff779e7df3f2dc71b37084976cf89d1aa124d5c", [:mix], [], "hexpm", "3c75b5ea8288e2ee7ea503ff9e30dfe4d07ad3c054576a6e60040e79a801e14d"},
"gun": {:git, "https://github.com/ninenines/gun.git", "e1a69b36b180a574c0ac314ced9613fdd52312cc", [ref: "e1a69b36b180a574c0ac314ced9613fdd52312cc"]},
"gun": {:git, "https://github.com/ninenines/gun.git", "921c47146b2d9567eac7e9a4d2ccc60fffd4f327", [ref: "921c47146b2d9567eac7e9a4d2ccc60fffd4f327"]},
"hackney": {:hex, :hackney, "1.16.0", "5096ac8e823e3a441477b2d187e30dd3fff1a82991a806b2003845ce72ce2d84", [:rebar3], [{:certifi, "2.5.2", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.1", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.0", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.6", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "3bf0bebbd5d3092a3543b783bf065165fa5d3ad4b899b836810e513064134e18"},
"html_entities": {:hex, :html_entities, "0.5.1", "1c9715058b42c35a2ab65edc5b36d0ea66dd083767bef6e3edb57870ef556549", [:mix], [], "hexpm", "30efab070904eb897ff05cd52fa61c1025d7f8ef3a9ca250bc4e6513d16c32de"},
"html_sanitize_ex": {:hex, :html_sanitize_ex, "1.3.0", "f005ad692b717691203f940c686208aa3d8ffd9dd4bb3699240096a51fa9564e", [:mix], [{:mochiweb, "~> 2.15", [hex: :mochiweb, repo: "hexpm", optional: false]}], "hexpm"},
@ -107,7 +108,7 @@
"swoosh": {:git, "https://github.com/swoosh/swoosh", "c96e0ca8a00d8f211ec1f042a4626b09f249caa5", [ref: "c96e0ca8a00d8f211ec1f042a4626b09f249caa5"]},
"syslog": {:hex, :syslog, "1.1.0", "6419a232bea84f07b56dc575225007ffe34d9fdc91abe6f1b2f254fd71d8efc2", [:rebar3], [], "hexpm", "4c6a41373c7e20587be33ef841d3de6f3beba08519809329ecc4d27b15b659e1"},
"telemetry": {:hex, :telemetry, "0.4.2", "2808c992455e08d6177322f14d3bdb6b625fbcfd233a73505870d8738a2f4599", [:rebar3], [], "hexpm", "2d1419bd9dda6a206d7b5852179511722e2b18812310d304620c7bd92a13fcef"},
"tesla": {:git, "https://git.pleroma.social/pleroma/elixir-libraries/tesla.git", "61b7503cef33f00834f78ddfafe0d5d9dec2270b", [ref: "61b7503cef33f00834f78ddfafe0d5d9dec2270b"]},
"tesla": {:git, "https://github.com/teamon/tesla.git", "af3707078b10793f6a534938e56b963aff82fe3c", [ref: "af3707078b10793f6a534938e56b963aff82fe3c"]},
"timex": {:hex, :timex, "3.6.1", "efdf56d0e67a6b956cc57774353b0329c8ab7726766a11547e529357ffdc1d56", [:mix], [{:combine, "~> 0.10", [hex: :combine, repo: "hexpm", optional: false]}, {:gettext, "~> 0.10", [hex: :gettext, repo: "hexpm", optional: false]}, {:tzdata, "~> 0.1.8 or ~> 0.5 or ~> 1.0.0", [hex: :tzdata, repo: "hexpm", optional: false]}], "hexpm", "f354efb2400dd7a80fd9eb6c8419068c4f632da4ac47f3d8822d6e33f08bc852"},
"trailing_format_plug": {:hex, :trailing_format_plug, "0.0.7", "64b877f912cf7273bed03379936df39894149e35137ac9509117e59866e10e45", [:mix], [{:plug, "> 0.12.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "bd4fde4c15f3e993a999e019d64347489b91b7a9096af68b2bdadd192afa693f"},
"tzdata": {:hex, :tzdata, "1.0.3", "73470ad29dde46e350c60a66e6b360d3b99d2d18b74c4c349dbebbc27a09a3eb", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a6e1ee7003c4d04ecbd21dd3ec690d4c6662db5d3bbdd7262d53cdf5e7c746c1"},

View file

@ -0,0 +1,101 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Gun.ConnectionPoolTest do
use Pleroma.DataCase
import Mox
import ExUnit.CaptureLog
alias Pleroma.Config
alias Pleroma.Gun.ConnectionPool
defp gun_mock(_) do
Pleroma.GunMock
|> stub(:open, fn _, _, _ -> Task.start_link(fn -> Process.sleep(100) end) end)
|> stub(:await_up, fn _, _ -> {:ok, :http} end)
|> stub(:set_owner, fn _, _ -> :ok end)
:ok
end
setup :set_mox_from_context
setup :gun_mock
test "gives the same connection to 2 concurrent requests" do
Enum.map(
[
"http://www.korean-books.com.kp/KBMbooks/en/periodic/pictorial/20200530163914.pdf",
"http://www.korean-books.com.kp/KBMbooks/en/periodic/pictorial/20200528183427.pdf"
],
fn uri ->
uri = URI.parse(uri)
task_parent = self()
Task.start_link(fn ->
{:ok, conn} = ConnectionPool.get_conn(uri, [])
ConnectionPool.release_conn(conn)
send(task_parent, conn)
end)
end
)
[pid, pid] =
for _ <- 1..2 do
receive do
pid -> pid
end
end
end
test "connection limit is respected with concurrent requests" do
clear_config([:connections_pool, :max_connections]) do
Config.put([:connections_pool, :max_connections], 1)
# The supervisor needs a reboot to apply the new config setting
Process.exit(Process.whereis(Pleroma.Gun.ConnectionPool.WorkerSupervisor), :kill)
on_exit(fn ->
Process.exit(Process.whereis(Pleroma.Gun.ConnectionPool.WorkerSupervisor), :kill)
end)
end
capture_log(fn ->
Enum.map(
[
"https://ninenines.eu/",
"https://youtu.be/PFGwMiDJKNY"
],
fn uri ->
uri = URI.parse(uri)
task_parent = self()
Task.start_link(fn ->
result = ConnectionPool.get_conn(uri, [])
# Sleep so that we don't end up with a situation,
# where request from the second process gets processed
# only after the first process already released the connection
Process.sleep(50)
case result do
{:ok, pid} ->
ConnectionPool.release_conn(pid)
_ ->
nil
end
send(task_parent, result)
end)
end
)
[{:error, :pool_full}, {:ok, _pid}] =
for _ <- 1..2 do
receive do
result -> result
end
end
|> Enum.sort()
end)
end
end

View file

@ -9,24 +9,10 @@ defmodule Pleroma.HTTP.AdapterHelper.GunTest do
import Mox
alias Pleroma.Config
alias Pleroma.Gun.Conn
alias Pleroma.HTTP.AdapterHelper.Gun
alias Pleroma.Pool.Connections
setup :verify_on_exit!
defp gun_mock(_) do
gun_mock()
:ok
end
defp gun_mock do
Pleroma.GunMock
|> stub(:open, fn _, _, _ -> Task.start_link(fn -> Process.sleep(1000) end) end)
|> stub(:await_up, fn _, _ -> {:ok, :http} end)
|> stub(:set_owner, fn _, _ -> :ok end)
end
describe "options/1" do
setup do: clear_config([:http, :adapter], a: 1, b: 2)
@ -35,7 +21,6 @@ test "https url with default port" do
opts = Gun.options([receive_conn: false], uri)
assert opts[:certificates_verification]
assert opts[:tls_opts][:log_level] == :warning
end
test "https ipv4 with default port" do
@ -43,7 +28,6 @@ test "https ipv4 with default port" do
opts = Gun.options([receive_conn: false], uri)
assert opts[:certificates_verification]
assert opts[:tls_opts][:log_level] == :warning
end
test "https ipv6 with default port" do
@ -51,7 +35,6 @@ test "https ipv6 with default port" do
opts = Gun.options([receive_conn: false], uri)
assert opts[:certificates_verification]
assert opts[:tls_opts][:log_level] == :warning
end
test "https url with non standart port" do
@ -62,46 +45,12 @@ test "https url with non standart port" do
assert opts[:certificates_verification]
end
test "get conn on next request" do
gun_mock()
level = Application.get_env(:logger, :level)
Logger.configure(level: :debug)
on_exit(fn -> Logger.configure(level: level) end)
uri = URI.parse("http://some-domain2.com")
opts = Gun.options(uri)
assert opts[:conn] == nil
assert opts[:close_conn] == nil
Process.sleep(50)
opts = Gun.options(uri)
assert is_pid(opts[:conn])
assert opts[:close_conn] == false
end
test "merges with defaul http adapter config" do
defaults = Gun.options([receive_conn: false], URI.parse("https://example.com"))
assert Keyword.has_key?(defaults, :a)
assert Keyword.has_key?(defaults, :b)
end
test "default ssl adapter opts with connection" do
gun_mock()
uri = URI.parse("https://some-domain.com")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri)
assert opts[:certificates_verification]
refute opts[:tls_opts] == []
assert opts[:close_conn] == false
assert is_pid(opts[:conn])
end
test "parses string proxy host & port" do
proxy = Config.get([:http, :proxy_url])
Config.put([:http, :proxy_url], "localhost:8123")
@ -132,127 +81,4 @@ test "passed opts have more weight than defaults" do
assert opts[:proxy] == {'example.com', 4321}
end
end
describe "options/1 with receive_conn parameter" do
setup :gun_mock
test "receive conn by default" do
uri = URI.parse("http://another-domain.com")
:ok = Conn.open(uri, :gun_connections)
received_opts = Gun.options(uri)
assert received_opts[:close_conn] == false
assert is_pid(received_opts[:conn])
end
test "don't receive conn if receive_conn is false" do
uri = URI.parse("http://another-domain.com")
:ok = Conn.open(uri, :gun_connections)
opts = [receive_conn: false]
received_opts = Gun.options(opts, uri)
assert received_opts[:close_conn] == nil
assert received_opts[:conn] == nil
end
end
describe "after_request/1" do
setup :gun_mock
test "body_as not chunks" do
uri = URI.parse("http://some-domain.com")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri)
:ok = Gun.after_request(opts)
conn = opts[:conn]
assert %Connections{
conns: %{
"http:some-domain.com:80" => %Pleroma.Gun.Conn{
conn: ^conn,
conn_state: :idle,
used_by: []
}
}
} = Connections.get_state(:gun_connections)
end
test "body_as chunks" do
uri = URI.parse("http://some-domain.com")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options([body_as: :chunks], uri)
:ok = Gun.after_request(opts)
conn = opts[:conn]
self = self()
assert %Connections{
conns: %{
"http:some-domain.com:80" => %Pleroma.Gun.Conn{
conn: ^conn,
conn_state: :active,
used_by: [{^self, _}]
}
}
} = Connections.get_state(:gun_connections)
end
test "with no connection" do
uri = URI.parse("http://uniq-domain.com")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options([body_as: :chunks], uri)
conn = opts[:conn]
opts = Keyword.delete(opts, :conn)
self = self()
:ok = Gun.after_request(opts)
assert %Connections{
conns: %{
"http:uniq-domain.com:80" => %Pleroma.Gun.Conn{
conn: ^conn,
conn_state: :active,
used_by: [{^self, _}]
}
}
} = Connections.get_state(:gun_connections)
end
test "with ipv4" do
uri = URI.parse("http://127.0.0.1")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri)
:ok = Gun.after_request(opts)
conn = opts[:conn]
assert %Connections{
conns: %{
"http:127.0.0.1:80" => %Pleroma.Gun.Conn{
conn: ^conn,
conn_state: :idle,
used_by: []
}
}
} = Connections.get_state(:gun_connections)
end
test "with ipv6" do
uri = URI.parse("http://[2a03:2880:f10c:83:face:b00c:0:25de]")
:ok = Conn.open(uri, :gun_connections)
opts = Gun.options(uri)
:ok = Gun.after_request(opts)
conn = opts[:conn]
assert %Connections{
conns: %{
"http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Pleroma.Gun.Conn{
conn: ^conn,
conn_state: :idle,
used_by: []
}
}
} = Connections.get_state(:gun_connections)
end
end
end

View file

@ -1,135 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.HTTP.ConnectionTest do
use ExUnit.Case
use Pleroma.Tests.Helpers
import ExUnit.CaptureLog
alias Pleroma.Config
alias Pleroma.HTTP.Connection
describe "parse_host/1" do
test "as atom to charlist" do
assert Connection.parse_host(:localhost) == 'localhost'
end
test "as string to charlist" do
assert Connection.parse_host("localhost.com") == 'localhost.com'
end
test "as string ip to tuple" do
assert Connection.parse_host("127.0.0.1") == {127, 0, 0, 1}
end
end
describe "parse_proxy/1" do
test "ip with port" do
assert Connection.parse_proxy("127.0.0.1:8123") == {:ok, {127, 0, 0, 1}, 8123}
end
test "host with port" do
assert Connection.parse_proxy("localhost:8123") == {:ok, 'localhost', 8123}
end
test "as tuple" do
assert Connection.parse_proxy({:socks4, :localhost, 9050}) ==
{:ok, :socks4, 'localhost', 9050}
end
test "as tuple with string host" do
assert Connection.parse_proxy({:socks5, "localhost", 9050}) ==
{:ok, :socks5, 'localhost', 9050}
end
end
describe "parse_proxy/1 errors" do
test "ip without port" do
capture_log(fn ->
assert Connection.parse_proxy("127.0.0.1") == {:error, :invalid_proxy}
end) =~ "parsing proxy fail \"127.0.0.1\""
end
test "host without port" do
capture_log(fn ->
assert Connection.parse_proxy("localhost") == {:error, :invalid_proxy}
end) =~ "parsing proxy fail \"localhost\""
end
test "host with bad port" do
capture_log(fn ->
assert Connection.parse_proxy("localhost:port") == {:error, :invalid_proxy_port}
end) =~ "parsing port in proxy fail \"localhost:port\""
end
test "ip with bad port" do
capture_log(fn ->
assert Connection.parse_proxy("127.0.0.1:15.9") == {:error, :invalid_proxy_port}
end) =~ "parsing port in proxy fail \"127.0.0.1:15.9\""
end
test "as tuple without port" do
capture_log(fn ->
assert Connection.parse_proxy({:socks5, :localhost}) == {:error, :invalid_proxy}
end) =~ "parsing proxy fail {:socks5, :localhost}"
end
test "with nil" do
assert Connection.parse_proxy(nil) == nil
end
end
describe "options/3" do
setup do: clear_config([:http, :proxy_url])
test "without proxy_url in config" do
Config.delete([:http, :proxy_url])
opts = Connection.options(%URI{})
refute Keyword.has_key?(opts, :proxy)
end
test "parses string proxy host & port" do
Config.put([:http, :proxy_url], "localhost:8123")
opts = Connection.options(%URI{})
assert opts[:proxy] == {'localhost', 8123}
end
test "parses tuple proxy scheme host and port" do
Config.put([:http, :proxy_url], {:socks, 'localhost', 1234})
opts = Connection.options(%URI{})
assert opts[:proxy] == {:socks, 'localhost', 1234}
end
test "passed opts have more weight than defaults" do
Config.put([:http, :proxy_url], {:socks5, 'localhost', 1234})
opts = Connection.options(%URI{}, proxy: {'example.com', 4321})
assert opts[:proxy] == {'example.com', 4321}
end
end
describe "format_host/1" do
test "with domain" do
assert Connection.format_host("example.com") == 'example.com'
end
test "with idna domain" do
assert Connection.format_host("ですexample.com") == 'xn--example-183fne.com'
end
test "with ipv4" do
assert Connection.format_host("127.0.0.1") == '127.0.0.1'
end
test "with ipv6" do
assert Connection.format_host("2a03:2880:f10c:83:face:b00c:0:25de") ==
'2a03:2880:f10c:83:face:b00c:0:25de'
end
end
end

View file

@ -1,760 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Pool.ConnectionsTest do
use ExUnit.Case, async: true
use Pleroma.Tests.Helpers
import ExUnit.CaptureLog
import Mox
alias Pleroma.Gun.Conn
alias Pleroma.GunMock
alias Pleroma.Pool.Connections
setup :verify_on_exit!
setup_all do
name = :test_connections
{:ok, pid} = Connections.start_link({name, [checkin_timeout: 150]})
{:ok, _} = Registry.start_link(keys: :unique, name: Pleroma.GunMock)
on_exit(fn ->
if Process.alive?(pid), do: GenServer.stop(name)
end)
{:ok, name: name}
end
defp open_mock(num \\ 1) do
GunMock
|> expect(:open, num, &start_and_register(&1, &2, &3))
|> expect(:await_up, num, fn _, _ -> {:ok, :http} end)
|> expect(:set_owner, num, fn _, _ -> :ok end)
end
defp connect_mock(mock) do
mock
|> expect(:connect, &connect(&1, &2))
|> expect(:await, &await(&1, &2))
end
defp info_mock(mock), do: expect(mock, :info, &info(&1))
defp start_and_register('gun-not-up.com', _, _), do: {:error, :timeout}
defp start_and_register(host, port, _) do
{:ok, pid} = Task.start_link(fn -> Process.sleep(1000) end)
scheme =
case port do
443 -> "https"
_ -> "http"
end
Registry.register(GunMock, pid, %{
origin_scheme: scheme,
origin_host: host,
origin_port: port
})
{:ok, pid}
end
defp info(pid) do
[{_, info}] = Registry.lookup(GunMock, pid)
info
end
defp connect(pid, _) do
ref = make_ref()
Registry.register(GunMock, ref, pid)
ref
end
defp await(pid, ref) do
[{_, ^pid}] = Registry.lookup(GunMock, ref)
{:response, :fin, 200, []}
end
defp now, do: :os.system_time(:second)
describe "alive?/2" do
test "is alive", %{name: name} do
assert Connections.alive?(name)
end
test "returns false if not started" do
refute Connections.alive?(:some_random_name)
end
end
test "opens connection and reuse it on next request", %{name: name} do
open_mock()
url = "http://some-domain.com"
key = "http:some-domain.com:80"
refute Connections.checkin(url, name)
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
assert is_pid(conn)
assert Process.alive?(conn)
self = self()
%Connections{
conns: %{
^key => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [{^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin(url, name)
assert conn == reused_conn
%Connections{
conns: %{
^key => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [{^self, _}, {^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
:ok = Connections.checkout(conn, self, name)
%Connections{
conns: %{
^key => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [{^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
:ok = Connections.checkout(conn, self, name)
%Connections{
conns: %{
^key => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [],
conn_state: :idle
}
}
} = Connections.get_state(name)
end
test "reuse connection for idna domains", %{name: name} do
open_mock()
url = "http://ですsome-domain.com"
refute Connections.checkin(url, name)
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
assert is_pid(conn)
assert Process.alive?(conn)
self = self()
%Connections{
conns: %{
"http:ですsome-domain.com:80" => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [{^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin(url, name)
assert conn == reused_conn
end
test "reuse for ipv4", %{name: name} do
open_mock()
url = "http://127.0.0.1"
refute Connections.checkin(url, name)
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
assert is_pid(conn)
assert Process.alive?(conn)
self = self()
%Connections{
conns: %{
"http:127.0.0.1:80" => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [{^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin(url, name)
assert conn == reused_conn
:ok = Connections.checkout(conn, self, name)
:ok = Connections.checkout(reused_conn, self, name)
%Connections{
conns: %{
"http:127.0.0.1:80" => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [],
conn_state: :idle
}
}
} = Connections.get_state(name)
end
test "reuse for ipv6", %{name: name} do
open_mock()
url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]"
refute Connections.checkin(url, name)
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
assert is_pid(conn)
assert Process.alive?(conn)
self = self()
%Connections{
conns: %{
"http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [{^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin(url, name)
assert conn == reused_conn
end
test "up and down ipv4", %{name: name} do
open_mock()
|> info_mock()
|> allow(self(), name)
self = self()
url = "http://127.0.0.1"
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
send(name, {:gun_down, conn, nil, nil, nil})
send(name, {:gun_up, conn, nil})
%Connections{
conns: %{
"http:127.0.0.1:80" => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [{^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
end
test "up and down ipv6", %{name: name} do
self = self()
open_mock()
|> info_mock()
|> allow(self, name)
url = "http://[2a03:2880:f10c:83:face:b00c:0:25de]"
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
send(name, {:gun_down, conn, nil, nil, nil})
send(name, {:gun_up, conn, nil})
%Connections{
conns: %{
"http:2a03:2880:f10c:83:face:b00c:0:25de:80" => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [{^self, _}],
conn_state: :active
}
}
} = Connections.get_state(name)
end
test "reuses connection based on protocol", %{name: name} do
open_mock(2)
http_url = "http://some-domain.com"
http_key = "http:some-domain.com:80"
https_url = "https://some-domain.com"
https_key = "https:some-domain.com:443"
refute Connections.checkin(http_url, name)
:ok = Conn.open(http_url, name)
conn = Connections.checkin(http_url, name)
assert is_pid(conn)
assert Process.alive?(conn)
refute Connections.checkin(https_url, name)
:ok = Conn.open(https_url, name)
https_conn = Connections.checkin(https_url, name)
refute conn == https_conn
reused_https = Connections.checkin(https_url, name)
refute conn == reused_https
assert reused_https == https_conn
%Connections{
conns: %{
^http_key => %Conn{
conn: ^conn,
gun_state: :up
},
^https_key => %Conn{
conn: ^https_conn,
gun_state: :up
}
}
} = Connections.get_state(name)
end
test "connection can't get up", %{name: name} do
expect(GunMock, :open, &start_and_register(&1, &2, &3))
url = "http://gun-not-up.com"
assert capture_log(fn ->
refute Conn.open(url, name)
refute Connections.checkin(url, name)
end) =~
"Opening connection to http://gun-not-up.com failed with error {:error, :timeout}"
end
test "process gun_down message and then gun_up", %{name: name} do
self = self()
open_mock()
|> info_mock()
|> allow(self, name)
url = "http://gun-down-and-up.com"
key = "http:gun-down-and-up.com:80"
:ok = Conn.open(url, name)
conn = Connections.checkin(url, name)
assert is_pid(conn)
assert Process.alive?(conn)
%Connections{
conns: %{
^key => %Conn{
conn: ^conn,
gun_state: :up,
used_by: [{^self, _}]
}
}
} = Connections.get_state(name)
send(name, {:gun_down, conn, :http, nil, nil})
%Connections{
conns: %{
^key => %Conn{
conn: ^conn,
gun_state: :down,
used_by: [{^self, _}]
}
}
} = Connections.get_state(name)
send(name, {:gun_up, conn, :http})
conn2 = Connections.checkin(url, name)
assert conn == conn2
assert is_pid(conn2)
assert Process.alive?(conn2)
%Connections{
conns: %{
^key => %Conn{
conn: _,
gun_state: :up,
used_by: [{^self, _}, {^self, _}]
}
}
} = Connections.get_state(name)
end
test "async processes get same conn for same domain", %{name: name} do
open_mock()
url = "http://some-domain.com"
:ok = Conn.open(url, name)
tasks =
for _ <- 1..5 do
Task.async(fn ->
Connections.checkin(url, name)
end)
end
tasks_with_results = Task.yield_many(tasks)
results =
Enum.map(tasks_with_results, fn {task, res} ->
res || Task.shutdown(task, :brutal_kill)
end)
conns = for {:ok, value} <- results, do: value
%Connections{
conns: %{
"http:some-domain.com:80" => %Conn{
conn: conn,
gun_state: :up
}
}
} = Connections.get_state(name)
assert Enum.all?(conns, fn res -> res == conn end)
end
test "remove frequently used and idle", %{name: name} do
open_mock(3)
self = self()
http_url = "http://some-domain.com"
https_url = "https://some-domain.com"
:ok = Conn.open(https_url, name)
:ok = Conn.open(http_url, name)
conn1 = Connections.checkin(https_url, name)
[conn2 | _conns] =
for _ <- 1..4 do
Connections.checkin(http_url, name)
end
http_key = "http:some-domain.com:80"
%Connections{
conns: %{
^http_key => %Conn{
conn: ^conn2,
gun_state: :up,
conn_state: :active,
used_by: [{^self, _}, {^self, _}, {^self, _}, {^self, _}]
},
"https:some-domain.com:443" => %Conn{
conn: ^conn1,
gun_state: :up,
conn_state: :active,
used_by: [{^self, _}]
}
}
} = Connections.get_state(name)
:ok = Connections.checkout(conn1, self, name)
another_url = "http://another-domain.com"
:ok = Conn.open(another_url, name)
conn = Connections.checkin(another_url, name)
%Connections{
conns: %{
"http:another-domain.com:80" => %Conn{
conn: ^conn,
gun_state: :up
},
^http_key => %Conn{
conn: _,
gun_state: :up
}
}
} = Connections.get_state(name)
end
describe "with proxy" do
test "as ip", %{name: name} do
open_mock()
|> connect_mock()
url = "http://proxy-string.com"
key = "http:proxy-string.com:80"
:ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123})
conn = Connections.checkin(url, name)
%Connections{
conns: %{
^key => %Conn{
conn: ^conn,
gun_state: :up
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin(url, name)
assert reused_conn == conn
end
test "as host", %{name: name} do
open_mock()
|> connect_mock()
url = "http://proxy-tuple-atom.com"
:ok = Conn.open(url, name, proxy: {'localhost', 9050})
conn = Connections.checkin(url, name)
%Connections{
conns: %{
"http:proxy-tuple-atom.com:80" => %Conn{
conn: ^conn,
gun_state: :up
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin(url, name)
assert reused_conn == conn
end
test "as ip and ssl", %{name: name} do
open_mock()
|> connect_mock()
url = "https://proxy-string.com"
:ok = Conn.open(url, name, proxy: {{127, 0, 0, 1}, 8123})
conn = Connections.checkin(url, name)
%Connections{
conns: %{
"https:proxy-string.com:443" => %Conn{
conn: ^conn,
gun_state: :up
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin(url, name)
assert reused_conn == conn
end
test "as host and ssl", %{name: name} do
open_mock()
|> connect_mock()
url = "https://proxy-tuple-atom.com"
:ok = Conn.open(url, name, proxy: {'localhost', 9050})
conn = Connections.checkin(url, name)
%Connections{
conns: %{
"https:proxy-tuple-atom.com:443" => %Conn{
conn: ^conn,
gun_state: :up
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin(url, name)
assert reused_conn == conn
end
test "with socks type", %{name: name} do
open_mock()
url = "http://proxy-socks.com"
:ok = Conn.open(url, name, proxy: {:socks5, 'localhost', 1234})
conn = Connections.checkin(url, name)
%Connections{
conns: %{
"http:proxy-socks.com:80" => %Conn{
conn: ^conn,
gun_state: :up
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin(url, name)
assert reused_conn == conn
end
test "with socks4 type and ssl", %{name: name} do
open_mock()
url = "https://proxy-socks.com"
:ok = Conn.open(url, name, proxy: {:socks4, 'localhost', 1234})
conn = Connections.checkin(url, name)
%Connections{
conns: %{
"https:proxy-socks.com:443" => %Conn{
conn: ^conn,
gun_state: :up
}
}
} = Connections.get_state(name)
reused_conn = Connections.checkin(url, name)
assert reused_conn == conn
end
end
describe "crf/3" do
setup do
crf = Connections.crf(1, 10, 1)
{:ok, crf: crf}
end
test "more used will have crf higher", %{crf: crf} do
# used 3 times
crf1 = Connections.crf(1, 10, crf)
crf1 = Connections.crf(1, 10, crf1)
# used 2 times
crf2 = Connections.crf(1, 10, crf)
assert crf1 > crf2
end
test "recently used will have crf higher on equal references", %{crf: crf} do
# used 3 sec ago
crf1 = Connections.crf(3, 10, crf)
# used 4 sec ago
crf2 = Connections.crf(4, 10, crf)
assert crf1 > crf2
end
test "equal crf on equal reference and time", %{crf: crf} do
# used 2 times
crf1 = Connections.crf(1, 10, crf)
# used 2 times
crf2 = Connections.crf(1, 10, crf)
assert crf1 == crf2
end
test "recently used will have higher crf", %{crf: crf} do
crf1 = Connections.crf(2, 10, crf)
crf1 = Connections.crf(1, 10, crf1)
crf2 = Connections.crf(3, 10, crf)
crf2 = Connections.crf(4, 10, crf2)
assert crf1 > crf2
end
end
describe "get_unused_conns/1" do
test "crf is equalent, sorting by reference", %{name: name} do
Connections.add_conn(name, "1", %Conn{
conn_state: :idle,
last_reference: now() - 1
})
Connections.add_conn(name, "2", %Conn{
conn_state: :idle,
last_reference: now()
})
assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
end
test "reference is equalent, sorting by crf", %{name: name} do
Connections.add_conn(name, "1", %Conn{
conn_state: :idle,
crf: 1.999
})
Connections.add_conn(name, "2", %Conn{
conn_state: :idle,
crf: 2
})
assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
end
test "higher crf and lower reference", %{name: name} do
Connections.add_conn(name, "1", %Conn{
conn_state: :idle,
crf: 3,
last_reference: now() - 1
})
Connections.add_conn(name, "2", %Conn{
conn_state: :idle,
crf: 2,
last_reference: now()
})
assert [{"2", _unused_conn} | _others] = Connections.get_unused_conns(name)
end
test "lower crf and lower reference", %{name: name} do
Connections.add_conn(name, "1", %Conn{
conn_state: :idle,
crf: 1.99,
last_reference: now() - 1
})
Connections.add_conn(name, "2", %Conn{
conn_state: :idle,
crf: 2,
last_reference: now()
})
assert [{"1", _unused_conn} | _others] = Connections.get_unused_conns(name)
end
end
test "count/1" do
name = :test_count
{:ok, _} = Connections.start_link({name, [checkin_timeout: 150]})
assert Connections.count(name) == 0
Connections.add_conn(name, "1", %Conn{conn: self()})
assert Connections.count(name) == 1
Connections.remove_conn(name, "1")
assert Connections.count(name) == 0
end
end