From 58a4f350a8bc361d793cb96442f856362c18f195 Mon Sep 17 00:00:00 2001 From: rinpatch Date: Wed, 6 May 2020 01:51:10 +0300 Subject: [PATCH] Refactor gun pooling and simplify adapter option insertion This patch refactors gun pooling to use Elixir process registry and simplifies adapter option insertion. Having the pool use process registry instead of a GenServer has a number of advantages: - Simpler code: the initial implementation adds about half the lines of code it deletes - Concurrency: unlike a GenServer, ETS-based registry can handle multiple checkout/checkin requests at the same time - Precise and easy idle connection clousure: current proposal for closing idle connections in the GenServer-based pool needs to filter through all connections once a minute and compare their last active time with closing time. With Elixir process registry this can be done by just using `Process.send_after`/`Process.cancel_timer` in the worker process. - Lower memory footprint: In my tests `gun-memory-leak` branch uses about 290mb on peak load (250 connections) and 235mb on idle (5-10 connections). Registry-based pool uses 210mb on idle and 240mb on peak load --- config/config.exs | 2 + lib/pleroma/application.ex | 8 +- lib/pleroma/gun/conn.ex | 78 +----- lib/pleroma/gun/connection_pool.ex | 129 ++++++++++ lib/pleroma/gun/connection_pool/worker.ex | 95 +++++++ lib/pleroma/http/adapter_helper.ex | 133 +++++++++- lib/pleroma/http/adapter_helper/default.ex | 17 ++ lib/pleroma/http/adapter_helper/gun.ex | 32 +-- lib/pleroma/http/adapter_helper/hackney.ex | 3 + lib/pleroma/http/connection.ex | 124 --------- lib/pleroma/http/http.ex | 53 ++-- lib/pleroma/pool/connections.ex | 283 --------------------- lib/pleroma/pool/pool.ex | 22 -- lib/pleroma/pool/request.ex | 65 ----- lib/pleroma/pool/supervisor.ex | 42 --- lib/pleroma/reverse_proxy/client/tesla.ex | 2 +- 16 files changed, 402 insertions(+), 686 deletions(-) create mode 100644 lib/pleroma/gun/connection_pool.ex create mode 100644 lib/pleroma/gun/connection_pool/worker.ex create mode 100644 lib/pleroma/http/adapter_helper/default.ex delete mode 100644 lib/pleroma/http/connection.ex delete mode 100644 lib/pleroma/pool/connections.ex delete mode 100644 lib/pleroma/pool/pool.ex delete mode 100644 lib/pleroma/pool/request.ex delete mode 100644 lib/pleroma/pool/supervisor.ex diff --git a/config/config.exs b/config/config.exs index 6fc84efc2..577ccc198 100644 --- a/config/config.exs +++ b/config/config.exs @@ -647,8 +647,10 @@ prepare: :unnamed config :pleroma, :connections_pool, + reclaim_multiplier: 0.1, checkin_timeout: 250, max_connections: 250, + max_idle_time: 30_000, retry: 1, retry_timeout: 1000, await_up_timeout: 5_000 diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 3282c6882..be14c1f9f 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -223,9 +223,7 @@ defp task_children(_) do # start hackney and gun pools in tests defp http_children(_, :test) do - hackney_options = Config.get([:hackney_pools, :federation]) - hackney_pool = :hackney_pool.child_spec(:federation, hackney_options) - [hackney_pool, Pleroma.Pool.Supervisor] + http_children(Tesla.Adapter.Hackney, nil) ++ http_children(Tesla.Adapter.Gun, nil) end defp http_children(Tesla.Adapter.Hackney, _) do @@ -244,7 +242,9 @@ defp http_children(Tesla.Adapter.Hackney, _) do end end - defp http_children(Tesla.Adapter.Gun, _), do: [Pleroma.Pool.Supervisor] + defp http_children(Tesla.Adapter.Gun, _) do + [{Registry, keys: :unique, name: Pleroma.Gun.ConnectionPool}] + end defp http_children(_, _), do: [] end diff --git a/lib/pleroma/gun/conn.ex b/lib/pleroma/gun/conn.ex index cd25a2e74..77f78c7ff 100644 --- a/lib/pleroma/gun/conn.ex +++ b/lib/pleroma/gun/conn.ex @@ -3,40 +3,11 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Gun.Conn do - @moduledoc """ - Struct for gun connection data - """ alias Pleroma.Gun - alias Pleroma.Pool.Connections require Logger - @type gun_state :: :up | :down - @type conn_state :: :active | :idle - - @type t :: %__MODULE__{ - conn: pid(), - gun_state: gun_state(), - conn_state: conn_state(), - used_by: [pid()], - last_reference: pos_integer(), - crf: float(), - retries: pos_integer() - } - - defstruct conn: nil, - gun_state: :open, - conn_state: :init, - used_by: [], - last_reference: 0, - crf: 1, - retries: 0 - - @spec open(String.t() | URI.t(), atom(), keyword()) :: :ok | nil - def open(url, name, opts \\ []) - def open(url, name, opts) when is_binary(url), do: open(URI.parse(url), name, opts) - - def open(%URI{} = uri, name, opts) do + def open(%URI{} = uri, opts) do pool_opts = Pleroma.Config.get([:connections_pool], []) opts = @@ -45,30 +16,10 @@ def open(%URI{} = uri, name, opts) do |> Map.put_new(:retry, pool_opts[:retry] || 1) |> Map.put_new(:retry_timeout, pool_opts[:retry_timeout] || 1000) |> Map.put_new(:await_up_timeout, pool_opts[:await_up_timeout] || 5_000) + |> Map.put_new(:supervise, false) |> maybe_add_tls_opts(uri) - key = "#{uri.scheme}:#{uri.host}:#{uri.port}" - - max_connections = pool_opts[:max_connections] || 250 - - conn_pid = - if Connections.count(name) < max_connections do - do_open(uri, opts) - else - close_least_used_and_do_open(name, uri, opts) - end - - if is_pid(conn_pid) do - conn = %Pleroma.Gun.Conn{ - conn: conn_pid, - gun_state: :up, - conn_state: :active, - last_reference: :os.system_time(:second) - } - - :ok = Gun.set_owner(conn_pid, Process.whereis(name)) - Connections.add_conn(name, key, conn) - end + do_open(uri, opts) end defp maybe_add_tls_opts(opts, %URI{scheme: "http"}), do: opts @@ -81,7 +32,7 @@ defp maybe_add_tls_opts(opts, %URI{scheme: "https", host: host}) do reuse_sessions: false, verify_fun: {&:ssl_verify_hostname.verify_fun/3, - [check_hostname: Pleroma.HTTP.Connection.format_host(host)]} + [check_hostname: Pleroma.HTTP.AdapterHelper.format_host(host)]} ] tls_opts = @@ -105,7 +56,7 @@ defp do_open(uri, %{proxy: {proxy_host, proxy_port}} = opts) do {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]), stream <- Gun.connect(conn, connect_opts), {:response, :fin, 200, _} <- Gun.await(conn, stream) do - conn + {:ok, conn} else error -> Logger.warn( @@ -141,7 +92,7 @@ defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do with {:ok, conn} <- Gun.open(proxy_host, proxy_port, opts), {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do - conn + {:ok, conn} else error -> Logger.warn( @@ -155,11 +106,11 @@ defp do_open(uri, %{proxy: {proxy_type, proxy_host, proxy_port}} = opts) do end defp do_open(%URI{host: host, port: port} = uri, opts) do - host = Pleroma.HTTP.Connection.parse_host(host) + host = Pleroma.HTTP.AdapterHelper.parse_host(host) with {:ok, conn} <- Gun.open(host, port, opts), {:ok, _} <- Gun.await_up(conn, opts[:await_up_timeout]) do - conn + {:ok, conn} else error -> Logger.warn( @@ -171,7 +122,7 @@ defp do_open(%URI{host: host, port: port} = uri, opts) do end defp destination_opts(%URI{host: host, port: port}) do - host = Pleroma.HTTP.Connection.parse_host(host) + host = Pleroma.HTTP.AdapterHelper.parse_host(host) %{host: host, port: port} end @@ -181,17 +132,6 @@ defp add_http2_opts(opts, "https", tls_opts) do defp add_http2_opts(opts, _, _), do: opts - defp close_least_used_and_do_open(name, uri, opts) do - with [{key, conn} | _conns] <- Connections.get_unused_conns(name), - :ok <- Gun.close(conn.conn) do - Connections.remove_conn(name, key) - - do_open(uri, opts) - else - [] -> {:error, :pool_overflowed} - end - end - def compose_uri_log(%URI{scheme: scheme, host: host, path: path}) do "#{scheme}://#{host}#{path}" end diff --git a/lib/pleroma/gun/connection_pool.ex b/lib/pleroma/gun/connection_pool.ex new file mode 100644 index 000000000..e6abee69c --- /dev/null +++ b/lib/pleroma/gun/connection_pool.ex @@ -0,0 +1,129 @@ +defmodule Pleroma.Gun.ConnectionPool do + @registry __MODULE__ + + def get_conn(uri, opts) do + case enforce_pool_limits() do + :ok -> + key = "#{uri.scheme}:#{uri.host}:#{uri.port}" + + case Registry.lookup(@registry, key) do + # The key has already been registered, but connection is not up yet + [{worker_pid, {nil, _used_by, _crf, _last_reference}}] -> + get_gun_pid_from_worker(worker_pid) + + [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] -> + GenServer.cast(worker_pid, {:add_client, self(), false}) + {:ok, gun_pid} + + [] -> + # :gun.set_owner fails in :connected state for whatevever reason, + # so we open the connection in the process directly and send it's pid back + # We trust gun to handle timeouts by itself + case GenServer.start(Pleroma.Gun.ConnectionPool.Worker, [uri, key, opts, self()], + timeout: :infinity + ) do + {:ok, _worker_pid} -> + receive do + {:conn_pid, pid} -> {:ok, pid} + end + + {:error, {:error, {:already_registered, worker_pid}}} -> + get_gun_pid_from_worker(worker_pid) + + err -> + err + end + end + + :error -> + {:error, :pool_full} + end + end + + @enforcer_key "enforcer" + defp enforce_pool_limits() do + max_connections = Pleroma.Config.get([:connections_pool, :max_connections]) + + if Registry.count(@registry) >= max_connections do + case Registry.lookup(@registry, @enforcer_key) do + [] -> + pid = + spawn(fn -> + {:ok, _pid} = Registry.register(@registry, @enforcer_key, nil) + + reclaim_max = + [:connections_pool, :reclaim_multiplier] + |> Pleroma.Config.get() + |> Kernel.*(max_connections) + |> round + |> max(1) + + unused_conns = + Registry.select( + @registry, + [ + {{:_, :"$1", {:_, :"$2", :"$3", :"$4"}}, [{:==, :"$2", []}], + [{{:"$1", :"$3", :"$4"}}]} + ] + ) + + case unused_conns do + [] -> + exit(:pool_full) + + unused_conns -> + unused_conns + |> Enum.sort(fn {_pid1, crf1, last_reference1}, + {_pid2, crf2, last_reference2} -> + crf1 <= crf2 and last_reference1 <= last_reference2 + end) + |> Enum.take(reclaim_max) + |> Enum.each(fn {pid, _, _} -> GenServer.call(pid, :idle_close) end) + end + end) + + wait_for_enforcer_finish(pid) + + [{pid, _}] -> + wait_for_enforcer_finish(pid) + end + else + :ok + end + end + + defp wait_for_enforcer_finish(pid) do + ref = Process.monitor(pid) + + receive do + {:DOWN, ^ref, :process, ^pid, :pool_full} -> + :error + + {:DOWN, ^ref, :process, ^pid, :normal} -> + :ok + end + end + + defp get_gun_pid_from_worker(worker_pid) do + # GenServer.call will block the process for timeout length if + # the server crashes on startup (which will happen if gun fails to connect) + # so instead we use cast + monitor + + ref = Process.monitor(worker_pid) + GenServer.cast(worker_pid, {:add_client, self(), true}) + + receive do + {:conn_pid, pid} -> {:ok, pid} + {:DOWN, ^ref, :process, ^worker_pid, reason} -> reason + end + end + + def release_conn(conn_pid) do + [worker_pid] = + Registry.select(@registry, [ + {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]} + ]) + + GenServer.cast(worker_pid, {:remove_client, self()}) + end +end diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex new file mode 100644 index 000000000..ebde4bbf6 --- /dev/null +++ b/lib/pleroma/gun/connection_pool/worker.ex @@ -0,0 +1,95 @@ +defmodule Pleroma.Gun.ConnectionPool.Worker do + alias Pleroma.Gun + use GenServer + + @registry Pleroma.Gun.ConnectionPool + + @impl true + def init([uri, key, opts, client_pid]) do + time = :os.system_time(:second) + # Register before opening connection to prevent race conditions + with {:ok, _owner} <- Registry.register(@registry, key, {nil, [client_pid], 1, time}), + {:ok, conn_pid} <- Gun.Conn.open(uri, opts), + Process.link(conn_pid) do + {_, _} = + Registry.update_value(@registry, key, fn {_, used_by, crf, last_reference} -> + {conn_pid, used_by, crf, last_reference} + end) + + send(client_pid, {:conn_pid, conn_pid}) + {:ok, %{key: key, timer: nil}, :hibernate} + else + err -> {:stop, err} + end + end + + @impl true + def handle_cast({:add_client, client_pid, send_pid_back}, %{key: key} = state) do + time = :os.system_time(:second) + + {{conn_pid, _, _, _}, _} = + Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} -> + {conn_pid, [client_pid | used_by], crf(time - last_reference, crf), time} + end) + + if send_pid_back, do: send(client_pid, {:conn_pid, conn_pid}) + + state = + if state.timer != nil do + Process.cancel_timer(state[:timer]) + %{state | timer: nil} + else + state + end + + {:noreply, state, :hibernate} + end + + @impl true + def handle_cast({:remove_client, client_pid}, %{key: key} = state) do + {{_conn_pid, used_by, _crf, _last_reference}, _} = + Registry.update_value(@registry, key, fn {conn_pid, used_by, crf, last_reference} -> + {conn_pid, List.delete(used_by, client_pid), crf, last_reference} + end) + + timer = + if used_by == [] do + max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000) + Process.send_after(self(), :idle_close, max_idle) + else + nil + end + + {:noreply, %{state | timer: timer}, :hibernate} + end + + @impl true + def handle_info(:idle_close, state) do + # Gun monitors the owner process, and will close the connection automatically + # when it's terminated + {:stop, :normal, state} + end + + # Gracefully shutdown if the connection got closed without any streams left + @impl true + def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do + {:stop, :normal, state} + end + + # Otherwise, shutdown with an error + @impl true + def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do + {:stop, {:error, down_message}, state} + end + + @impl true + def handle_call(:idle_close, _, %{key: key} = state) do + Registry.unregister(@registry, key) + {:stop, :normal, state} + end + + # LRFU policy: https://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.55.1478 + defp crf(time_delta, prev_crf) do + 1 + :math.pow(0.5, time_delta / 100) * prev_crf + end +end diff --git a/lib/pleroma/http/adapter_helper.ex b/lib/pleroma/http/adapter_helper.ex index 510722ff9..0532ea31d 100644 --- a/lib/pleroma/http/adapter_helper.ex +++ b/lib/pleroma/http/adapter_helper.ex @@ -3,7 +3,21 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.HTTP.AdapterHelper do - alias Pleroma.HTTP.Connection + @moduledoc """ + Configure Tesla.Client with default and customized adapter options. + """ + @defaults [pool: :federation] + + @type ip_address :: ipv4_address() | ipv6_address() + @type ipv4_address :: {0..255, 0..255, 0..255, 0..255} + @type ipv6_address :: + {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535} + @type proxy_type() :: :socks4 | :socks5 + @type host() :: charlist() | ip_address() + + alias Pleroma.Config + alias Pleroma.HTTP.AdapterHelper + require Logger @type proxy :: {Connection.host(), pos_integer()} @@ -11,24 +25,13 @@ defmodule Pleroma.HTTP.AdapterHelper do @callback options(keyword(), URI.t()) :: keyword() @callback after_request(keyword()) :: :ok - - @spec options(keyword(), URI.t()) :: keyword() - def options(opts, _uri) do - proxy = Pleroma.Config.get([:http, :proxy_url], nil) - maybe_add_proxy(opts, format_proxy(proxy)) - end - - @spec maybe_get_conn(URI.t(), keyword()) :: keyword() - def maybe_get_conn(_uri, opts), do: opts - - @spec after_request(keyword()) :: :ok - def after_request(_opts), do: :ok + @callback get_conn(URI.t(), keyword()) :: {:ok, term()} | {:error, term()} @spec format_proxy(String.t() | tuple() | nil) :: proxy() | nil def format_proxy(nil), do: nil def format_proxy(proxy_url) do - case Connection.parse_proxy(proxy_url) do + case parse_proxy(proxy_url) do {:ok, host, port} -> {host, port} {:ok, type, host, port} -> {type, host, port} _ -> nil @@ -38,4 +41,106 @@ def format_proxy(proxy_url) do @spec maybe_add_proxy(keyword(), proxy() | nil) :: keyword() def maybe_add_proxy(opts, nil), do: opts def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy) + + @doc """ + Merge default connection & adapter options with received ones. + """ + + @spec options(URI.t(), keyword()) :: keyword() + def options(%URI{} = uri, opts \\ []) do + @defaults + |> pool_timeout() + |> Keyword.merge(opts) + |> adapter_helper().options(uri) + end + + defp pool_timeout(opts) do + {config_key, default} = + if adapter() == Tesla.Adapter.Gun do + {:pools, Config.get([:pools, :default, :timeout])} + else + {:hackney_pools, 10_000} + end + + timeout = Config.get([config_key, opts[:pool], :timeout], default) + + Keyword.merge(opts, timeout: timeout) + end + + @spec after_request(keyword()) :: :ok + def after_request(opts), do: adapter_helper().after_request(opts) + + def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts) + defp adapter, do: Application.get_env(:tesla, :adapter) + + defp adapter_helper do + case adapter() do + Tesla.Adapter.Gun -> AdapterHelper.Gun + Tesla.Adapter.Hackney -> AdapterHelper.Hackney + _ -> AdapterHelper.Default + end + end + + @spec parse_proxy(String.t() | tuple() | nil) :: + {:ok, host(), pos_integer()} + | {:ok, proxy_type(), host(), pos_integer()} + | {:error, atom()} + | nil + + def parse_proxy(nil), do: nil + + def parse_proxy(proxy) when is_binary(proxy) do + with [host, port] <- String.split(proxy, ":"), + {port, ""} <- Integer.parse(port) do + {:ok, parse_host(host), port} + else + {_, _} -> + Logger.warn("Parsing port failed #{inspect(proxy)}") + {:error, :invalid_proxy_port} + + :error -> + Logger.warn("Parsing port failed #{inspect(proxy)}") + {:error, :invalid_proxy_port} + + _ -> + Logger.warn("Parsing proxy failed #{inspect(proxy)}") + {:error, :invalid_proxy} + end + end + + def parse_proxy(proxy) when is_tuple(proxy) do + with {type, host, port} <- proxy do + {:ok, type, parse_host(host), port} + else + _ -> + Logger.warn("Parsing proxy failed #{inspect(proxy)}") + {:error, :invalid_proxy} + end + end + + @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address() + def parse_host(host) when is_list(host), do: host + def parse_host(host) when is_atom(host), do: to_charlist(host) + + def parse_host(host) when is_binary(host) do + host = to_charlist(host) + + case :inet.parse_address(host) do + {:error, :einval} -> host + {:ok, ip} -> ip + end + end + + @spec format_host(String.t()) :: charlist() + def format_host(host) do + host_charlist = to_charlist(host) + + case :inet.parse_address(host_charlist) do + {:error, :einval} -> + :idna.encode(host_charlist) + + {:ok, _ip} -> + host_charlist + end + end end diff --git a/lib/pleroma/http/adapter_helper/default.ex b/lib/pleroma/http/adapter_helper/default.ex new file mode 100644 index 000000000..218cfacc0 --- /dev/null +++ b/lib/pleroma/http/adapter_helper/default.ex @@ -0,0 +1,17 @@ +defmodule Pleroma.HTTP.AdapterHelper.Default do + alias Pleroma.HTTP.AdapterHelper + + @behaviour Pleroma.HTTP.AdapterHelper + + @spec options(keyword(), URI.t()) :: keyword() + def options(opts, _uri) do + proxy = Pleroma.Config.get([:http, :proxy_url], nil) + AdapterHelper.maybe_add_proxy(opts, AdapterHelper.format_proxy(proxy)) + end + + @spec after_request(keyword()) :: :ok + def after_request(_opts), do: :ok + + @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} + def get_conn(_uri, opts), do: {:ok, opts} +end diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex index ead7cdc6b..6f7cc9784 100644 --- a/lib/pleroma/http/adapter_helper/gun.ex +++ b/lib/pleroma/http/adapter_helper/gun.ex @@ -5,8 +5,8 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do @behaviour Pleroma.HTTP.AdapterHelper + alias Pleroma.Gun.ConnectionPool alias Pleroma.HTTP.AdapterHelper - alias Pleroma.Pool.Connections require Logger @@ -31,13 +31,13 @@ def options(incoming_opts \\ [], %URI{} = uri) do |> Keyword.merge(config_opts) |> add_scheme_opts(uri) |> AdapterHelper.maybe_add_proxy(proxy) - |> maybe_get_conn(uri, incoming_opts) + |> Keyword.merge(incoming_opts) end @spec after_request(keyword()) :: :ok def after_request(opts) do if opts[:conn] && opts[:body_as] != :chunks do - Connections.checkout(opts[:conn], self(), :gun_connections) + ConnectionPool.release_conn(opts[:conn]) end :ok @@ -51,27 +51,11 @@ defp add_scheme_opts(opts, %{scheme: "https"}) do |> Keyword.put(:tls_opts, log_level: :warning) end - defp maybe_get_conn(adapter_opts, uri, incoming_opts) do - {receive_conn?, opts} = - adapter_opts - |> Keyword.merge(incoming_opts) - |> Keyword.pop(:receive_conn, true) - - if Connections.alive?(:gun_connections) and receive_conn? do - checkin_conn(uri, opts) - else - opts - end - end - - defp checkin_conn(uri, opts) do - case Connections.checkin(uri, :gun_connections) do - nil -> - Task.start(Pleroma.Gun.Conn, :open, [uri, :gun_connections, opts]) - opts - - conn when is_pid(conn) -> - Keyword.merge(opts, conn: conn, close_conn: false) + @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()} + def get_conn(uri, opts) do + case ConnectionPool.get_conn(uri, opts) do + {:ok, conn_pid} -> {:ok, Keyword.merge(opts, conn: conn_pid, close_conn: false)} + err -> err end end end diff --git a/lib/pleroma/http/adapter_helper/hackney.ex b/lib/pleroma/http/adapter_helper/hackney.ex index 3972a03a9..42d552740 100644 --- a/lib/pleroma/http/adapter_helper/hackney.ex +++ b/lib/pleroma/http/adapter_helper/hackney.ex @@ -25,4 +25,7 @@ def options(connection_opts \\ [], %URI{} = uri) do defp add_scheme_opts(opts, _), do: opts def after_request(_), do: :ok + + @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} + def get_conn(_uri, opts), do: {:ok, opts} end diff --git a/lib/pleroma/http/connection.ex b/lib/pleroma/http/connection.ex deleted file mode 100644 index ebacf7902..000000000 --- a/lib/pleroma/http/connection.ex +++ /dev/null @@ -1,124 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.HTTP.Connection do - @moduledoc """ - Configure Tesla.Client with default and customized adapter options. - """ - - alias Pleroma.Config - alias Pleroma.HTTP.AdapterHelper - - require Logger - - @defaults [pool: :federation] - - @type ip_address :: ipv4_address() | ipv6_address() - @type ipv4_address :: {0..255, 0..255, 0..255, 0..255} - @type ipv6_address :: - {0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535, 0..65_535} - @type proxy_type() :: :socks4 | :socks5 - @type host() :: charlist() | ip_address() - - @doc """ - Merge default connection & adapter options with received ones. - """ - - @spec options(URI.t(), keyword()) :: keyword() - def options(%URI{} = uri, opts \\ []) do - @defaults - |> pool_timeout() - |> Keyword.merge(opts) - |> adapter_helper().options(uri) - end - - defp pool_timeout(opts) do - {config_key, default} = - if adapter() == Tesla.Adapter.Gun do - {:pools, Config.get([:pools, :default, :timeout])} - else - {:hackney_pools, 10_000} - end - - timeout = Config.get([config_key, opts[:pool], :timeout], default) - - Keyword.merge(opts, timeout: timeout) - end - - @spec after_request(keyword()) :: :ok - def after_request(opts), do: adapter_helper().after_request(opts) - - defp adapter, do: Application.get_env(:tesla, :adapter) - - defp adapter_helper do - case adapter() do - Tesla.Adapter.Gun -> AdapterHelper.Gun - Tesla.Adapter.Hackney -> AdapterHelper.Hackney - _ -> AdapterHelper - end - end - - @spec parse_proxy(String.t() | tuple() | nil) :: - {:ok, host(), pos_integer()} - | {:ok, proxy_type(), host(), pos_integer()} - | {:error, atom()} - | nil - - def parse_proxy(nil), do: nil - - def parse_proxy(proxy) when is_binary(proxy) do - with [host, port] <- String.split(proxy, ":"), - {port, ""} <- Integer.parse(port) do - {:ok, parse_host(host), port} - else - {_, _} -> - Logger.warn("Parsing port failed #{inspect(proxy)}") - {:error, :invalid_proxy_port} - - :error -> - Logger.warn("Parsing port failed #{inspect(proxy)}") - {:error, :invalid_proxy_port} - - _ -> - Logger.warn("Parsing proxy failed #{inspect(proxy)}") - {:error, :invalid_proxy} - end - end - - def parse_proxy(proxy) when is_tuple(proxy) do - with {type, host, port} <- proxy do - {:ok, type, parse_host(host), port} - else - _ -> - Logger.warn("Parsing proxy failed #{inspect(proxy)}") - {:error, :invalid_proxy} - end - end - - @spec parse_host(String.t() | atom() | charlist()) :: charlist() | ip_address() - def parse_host(host) when is_list(host), do: host - def parse_host(host) when is_atom(host), do: to_charlist(host) - - def parse_host(host) when is_binary(host) do - host = to_charlist(host) - - case :inet.parse_address(host) do - {:error, :einval} -> host - {:ok, ip} -> ip - end - end - - @spec format_host(String.t()) :: charlist() - def format_host(host) do - host_charlist = to_charlist(host) - - case :inet.parse_address(host_charlist) do - {:error, :einval} -> - :idna.encode(host_charlist) - - {:ok, _ip} -> - host_charlist - end - end -end diff --git a/lib/pleroma/http/http.ex b/lib/pleroma/http/http.ex index 66ca75367..8ded76601 100644 --- a/lib/pleroma/http/http.ex +++ b/lib/pleroma/http/http.ex @@ -7,7 +7,7 @@ defmodule Pleroma.HTTP do Wrapper for `Tesla.request/2`. """ - alias Pleroma.HTTP.Connection + alias Pleroma.HTTP.AdapterHelper alias Pleroma.HTTP.Request alias Pleroma.HTTP.RequestBuilder, as: Builder alias Tesla.Client @@ -60,49 +60,26 @@ def post(url, body, headers \\ [], options \\ []), {:ok, Env.t()} | {:error, any()} def request(method, url, body, headers, options) when is_binary(url) do uri = URI.parse(url) - adapter_opts = Connection.options(uri, options[:adapter] || []) - options = put_in(options[:adapter], adapter_opts) - params = options[:params] || [] - request = build_request(method, headers, options, url, body, params) + adapter_opts = AdapterHelper.options(uri, options[:adapter] || []) - adapter = Application.get_env(:tesla, :adapter) - client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter) + case AdapterHelper.get_conn(uri, adapter_opts) do + {:ok, adapter_opts} -> + options = put_in(options[:adapter], adapter_opts) + params = options[:params] || [] + request = build_request(method, headers, options, url, body, params) - pid = Process.whereis(adapter_opts[:pool]) + adapter = Application.get_env(:tesla, :adapter) + client = Tesla.client([Tesla.Middleware.FollowRedirects], adapter) - pool_alive? = - if adapter == Tesla.Adapter.Gun && pid do - Process.alive?(pid) - else - false - end + response = request(client, request) - request_opts = - adapter_opts - |> Enum.into(%{}) - |> Map.put(:env, Pleroma.Config.get([:env])) - |> Map.put(:pool_alive?, pool_alive?) + AdapterHelper.after_request(adapter_opts) - response = request(client, request, request_opts) + response - Connection.after_request(adapter_opts) - - response - end - - @spec request(Client.t(), keyword(), map()) :: {:ok, Env.t()} | {:error, any()} - def request(%Client{} = client, request, %{env: :test}), do: request(client, request) - - def request(%Client{} = client, request, %{body_as: :chunks}), do: request(client, request) - - def request(%Client{} = client, request, %{pool_alive?: false}), do: request(client, request) - - def request(%Client{} = client, request, %{pool: pool, timeout: timeout}) do - :poolboy.transaction( - pool, - &Pleroma.Pool.Request.execute(&1, client, request, timeout), - timeout - ) + err -> + err + end end @spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()} diff --git a/lib/pleroma/pool/connections.ex b/lib/pleroma/pool/connections.ex deleted file mode 100644 index acafe1bea..000000000 --- a/lib/pleroma/pool/connections.ex +++ /dev/null @@ -1,283 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Pool.Connections do - use GenServer - - alias Pleroma.Config - alias Pleroma.Gun - - require Logger - - @type domain :: String.t() - @type conn :: Pleroma.Gun.Conn.t() - - @type t :: %__MODULE__{ - conns: %{domain() => conn()}, - opts: keyword() - } - - defstruct conns: %{}, opts: [] - - @spec start_link({atom(), keyword()}) :: {:ok, pid()} - def start_link({name, opts}) do - GenServer.start_link(__MODULE__, opts, name: name) - end - - @impl true - def init(opts), do: {:ok, %__MODULE__{conns: %{}, opts: opts}} - - @spec checkin(String.t() | URI.t(), atom()) :: pid() | nil - def checkin(url, name) - def checkin(url, name) when is_binary(url), do: checkin(URI.parse(url), name) - - def checkin(%URI{} = uri, name) do - timeout = Config.get([:connections_pool, :checkin_timeout], 250) - - GenServer.call(name, {:checkin, uri}, timeout) - end - - @spec alive?(atom()) :: boolean() - def alive?(name) do - if pid = Process.whereis(name) do - Process.alive?(pid) - else - false - end - end - - @spec get_state(atom()) :: t() - def get_state(name) do - GenServer.call(name, :state) - end - - @spec count(atom()) :: pos_integer() - def count(name) do - GenServer.call(name, :count) - end - - @spec get_unused_conns(atom()) :: [{domain(), conn()}] - def get_unused_conns(name) do - GenServer.call(name, :unused_conns) - end - - @spec checkout(pid(), pid(), atom()) :: :ok - def checkout(conn, pid, name) do - GenServer.cast(name, {:checkout, conn, pid}) - end - - @spec add_conn(atom(), String.t(), Pleroma.Gun.Conn.t()) :: :ok - def add_conn(name, key, conn) do - GenServer.cast(name, {:add_conn, key, conn}) - end - - @spec remove_conn(atom(), String.t()) :: :ok - def remove_conn(name, key) do - GenServer.cast(name, {:remove_conn, key}) - end - - @impl true - def handle_cast({:add_conn, key, conn}, state) do - state = put_in(state.conns[key], conn) - - Process.monitor(conn.conn) - {:noreply, state} - end - - @impl true - def handle_cast({:checkout, conn_pid, pid}, state) do - state = - with true <- Process.alive?(conn_pid), - {key, conn} <- find_conn(state.conns, conn_pid), - used_by <- List.keydelete(conn.used_by, pid, 0) do - conn_state = if used_by == [], do: :idle, else: conn.conn_state - - put_in(state.conns[key], %{conn | conn_state: conn_state, used_by: used_by}) - else - false -> - Logger.debug("checkout for closed conn #{inspect(conn_pid)}") - state - - nil -> - Logger.debug("checkout for alive conn #{inspect(conn_pid)}, but is not in state") - state - end - - {:noreply, state} - end - - @impl true - def handle_cast({:remove_conn, key}, state) do - state = put_in(state.conns, Map.delete(state.conns, key)) - {:noreply, state} - end - - @impl true - def handle_call({:checkin, uri}, from, state) do - key = "#{uri.scheme}:#{uri.host}:#{uri.port}" - - case state.conns[key] do - %{conn: pid, gun_state: :up} = conn -> - time = :os.system_time(:second) - last_reference = time - conn.last_reference - crf = crf(last_reference, 100, conn.crf) - - state = - put_in(state.conns[key], %{ - conn - | last_reference: time, - crf: crf, - conn_state: :active, - used_by: [from | conn.used_by] - }) - - {:reply, pid, state} - - %{gun_state: :down} -> - {:reply, nil, state} - - nil -> - {:reply, nil, state} - end - end - - @impl true - def handle_call(:state, _from, state), do: {:reply, state, state} - - @impl true - def handle_call(:count, _from, state) do - {:reply, Enum.count(state.conns), state} - end - - @impl true - def handle_call(:unused_conns, _from, state) do - unused_conns = - state.conns - |> Enum.filter(&filter_conns/1) - |> Enum.sort(&sort_conns/2) - - {:reply, unused_conns, state} - end - - defp filter_conns({_, %{conn_state: :idle, used_by: []}}), do: true - defp filter_conns(_), do: false - - defp sort_conns({_, c1}, {_, c2}) do - c1.crf <= c2.crf and c1.last_reference <= c2.last_reference - end - - @impl true - def handle_info({:gun_up, conn_pid, _protocol}, state) do - %{origin_host: host, origin_scheme: scheme, origin_port: port} = Gun.info(conn_pid) - - host = - case :inet.ntoa(host) do - {:error, :einval} -> host - ip -> ip - end - - key = "#{scheme}:#{host}:#{port}" - - state = - with {key, conn} <- find_conn(state.conns, conn_pid, key), - {true, key} <- {Process.alive?(conn_pid), key} do - put_in(state.conns[key], %{ - conn - | gun_state: :up, - conn_state: :active, - retries: 0 - }) - else - {false, key} -> - put_in( - state.conns, - Map.delete(state.conns, key) - ) - - nil -> - :ok = Gun.close(conn_pid) - - state - end - - {:noreply, state} - end - - @impl true - def handle_info({:gun_down, conn_pid, _protocol, _reason, _killed}, state) do - retries = Config.get([:connections_pool, :retry], 1) - # we can't get info on this pid, because pid is dead - state = - with {key, conn} <- find_conn(state.conns, conn_pid), - {true, key} <- {Process.alive?(conn_pid), key} do - if conn.retries == retries do - :ok = Gun.close(conn.conn) - - put_in( - state.conns, - Map.delete(state.conns, key) - ) - else - put_in(state.conns[key], %{ - conn - | gun_state: :down, - retries: conn.retries + 1 - }) - end - else - {false, key} -> - put_in( - state.conns, - Map.delete(state.conns, key) - ) - - nil -> - Logger.debug(":gun_down for conn which isn't found in state") - - state - end - - {:noreply, state} - end - - @impl true - def handle_info({:DOWN, _ref, :process, conn_pid, reason}, state) do - Logger.debug("received DOWN message for #{inspect(conn_pid)} reason -> #{inspect(reason)}") - - state = - with {key, conn} <- find_conn(state.conns, conn_pid) do - Enum.each(conn.used_by, fn {pid, _ref} -> - Process.exit(pid, reason) - end) - - put_in( - state.conns, - Map.delete(state.conns, key) - ) - else - nil -> - Logger.debug(":DOWN for conn which isn't found in state") - - state - end - - {:noreply, state} - end - - defp find_conn(conns, conn_pid) do - Enum.find(conns, fn {_key, conn} -> - conn.conn == conn_pid - end) - end - - defp find_conn(conns, conn_pid, conn_key) do - Enum.find(conns, fn {key, conn} -> - key == conn_key and conn.conn == conn_pid - end) - end - - def crf(current, steps, crf) do - 1 + :math.pow(0.5, current / steps) * crf - end -end diff --git a/lib/pleroma/pool/pool.ex b/lib/pleroma/pool/pool.ex deleted file mode 100644 index 21a6fbbc5..000000000 --- a/lib/pleroma/pool/pool.ex +++ /dev/null @@ -1,22 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Pool do - def child_spec(opts) do - poolboy_opts = - opts - |> Keyword.put(:worker_module, Pleroma.Pool.Request) - |> Keyword.put(:name, {:local, opts[:name]}) - |> Keyword.put(:size, opts[:size]) - |> Keyword.put(:max_overflow, opts[:max_overflow]) - - %{ - id: opts[:id] || {__MODULE__, make_ref()}, - start: {:poolboy, :start_link, [poolboy_opts, [name: opts[:name]]]}, - restart: :permanent, - shutdown: 5000, - type: :worker - } - end -end diff --git a/lib/pleroma/pool/request.ex b/lib/pleroma/pool/request.ex deleted file mode 100644 index 3fb930db7..000000000 --- a/lib/pleroma/pool/request.ex +++ /dev/null @@ -1,65 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Pool.Request do - use GenServer - - require Logger - - def start_link(args) do - GenServer.start_link(__MODULE__, args) - end - - @impl true - def init(_), do: {:ok, []} - - @spec execute(pid() | atom(), Tesla.Client.t(), keyword(), pos_integer()) :: - {:ok, Tesla.Env.t()} | {:error, any()} - def execute(pid, client, request, timeout) do - GenServer.call(pid, {:execute, client, request}, timeout) - end - - @impl true - def handle_call({:execute, client, request}, _from, state) do - response = Pleroma.HTTP.request(client, request) - - {:reply, response, state} - end - - @impl true - def handle_info({:gun_data, _conn, _stream, _, _}, state) do - {:noreply, state} - end - - @impl true - def handle_info({:gun_up, _conn, _protocol}, state) do - {:noreply, state} - end - - @impl true - def handle_info({:gun_down, _conn, _protocol, _reason, _killed}, state) do - {:noreply, state} - end - - @impl true - def handle_info({:gun_error, _conn, _stream, _error}, state) do - {:noreply, state} - end - - @impl true - def handle_info({:gun_push, _conn, _stream, _new_stream, _method, _uri, _headers}, state) do - {:noreply, state} - end - - @impl true - def handle_info({:gun_response, _conn, _stream, _, _status, _headers}, state) do - {:noreply, state} - end - - @impl true - def handle_info(msg, state) do - Logger.warn("Received unexpected message #{inspect(__MODULE__)} #{inspect(msg)}") - {:noreply, state} - end -end diff --git a/lib/pleroma/pool/supervisor.ex b/lib/pleroma/pool/supervisor.ex deleted file mode 100644 index faf646cb2..000000000 --- a/lib/pleroma/pool/supervisor.ex +++ /dev/null @@ -1,42 +0,0 @@ -# Pleroma: A lightweight social networking server -# Copyright © 2017-2020 Pleroma Authors -# SPDX-License-Identifier: AGPL-3.0-only - -defmodule Pleroma.Pool.Supervisor do - use Supervisor - - alias Pleroma.Config - alias Pleroma.Pool - - def start_link(args) do - Supervisor.start_link(__MODULE__, args, name: __MODULE__) - end - - def init(_) do - conns_child = %{ - id: Pool.Connections, - start: - {Pool.Connections, :start_link, [{:gun_connections, Config.get([:connections_pool])}]} - } - - Supervisor.init([conns_child | pools()], strategy: :one_for_one) - end - - defp pools do - pools = Config.get(:pools) - - pools = - if Config.get([Pleroma.Upload, :proxy_remote]) == false do - Keyword.delete(pools, :upload) - else - pools - end - - for {pool_name, pool_opts} <- pools do - pool_opts - |> Keyword.put(:id, {Pool, pool_name}) - |> Keyword.put(:name, pool_name) - |> Pool.child_spec() - end - end -end diff --git a/lib/pleroma/reverse_proxy/client/tesla.ex b/lib/pleroma/reverse_proxy/client/tesla.ex index e81ea8bde..65785445d 100644 --- a/lib/pleroma/reverse_proxy/client/tesla.ex +++ b/lib/pleroma/reverse_proxy/client/tesla.ex @@ -48,7 +48,7 @@ def stream_body(%{pid: pid, opts: opts, fin: true}) do # if there were redirects we need to checkout old conn conn = opts[:old_conn] || opts[:conn] - if conn, do: :ok = Pleroma.Pool.Connections.checkout(conn, self(), :gun_connections) + if conn, do: :ok = Pleroma.Gun.ConnectionPool.release_conn(conn) :done end