From c17d83cd7330d5c874f647a5224a3a130ed88fb0 Mon Sep 17 00:00:00 2001 From: Alexander Strizhakov Date: Wed, 19 Aug 2020 10:22:59 +0300 Subject: [PATCH 1/5] improvements and fixes for http requests - fix for gun worker termination in some circumstances - pool for http clients (ex_aws, tzdata) - default pool timeouts for gun - gun retries on gun_down messages - s3 upload timeout if streaming enabled --- config/config.exs | 12 +++++--- lib/pleroma/gun/connection_pool/worker.ex | 35 ++++++++++++++++------- lib/pleroma/http/adapter_helper.ex | 16 +++++++---- lib/pleroma/http/adapter_helper/gun.ex | 18 ++++++------ lib/pleroma/http/ex_aws.ex | 2 ++ lib/pleroma/http/tzdata.ex | 4 +++ lib/pleroma/uploaders/s3.ex | 19 ++++++++---- 7 files changed, 70 insertions(+), 36 deletions(-) diff --git a/config/config.exs b/config/config.exs index 246712b9f..ea8869d48 100644 --- a/config/config.exs +++ b/config/config.exs @@ -740,19 +740,23 @@ config :pleroma, :pools, federation: [ size: 50, - max_waiting: 10 + max_waiting: 10, + timeout: 10_000 ], media: [ size: 50, - max_waiting: 10 + max_waiting: 10, + timeout: 10_000 ], upload: [ size: 25, - max_waiting: 5 + max_waiting: 5, + timeout: 15_000 ], default: [ size: 10, - max_waiting: 2 + max_waiting: 2, + timeout: 5_000 ] config :pleroma, :hackney_pools, diff --git a/lib/pleroma/gun/connection_pool/worker.ex b/lib/pleroma/gun/connection_pool/worker.ex index fec9d0efa..c36332817 100644 --- a/lib/pleroma/gun/connection_pool/worker.ex +++ b/lib/pleroma/gun/connection_pool/worker.ex @@ -83,17 +83,25 @@ def handle_call(:remove_client, {client_pid, _}, %{key: key} = state) do end) {ref, state} = pop_in(state.client_monitors[client_pid]) - Process.demonitor(ref) - - timer = - if used_by == [] do - max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000) - Process.send_after(self(), :idle_close, max_idle) + # DOWN message can receive right after `remove_client` call and cause worker to terminate + state = + if is_nil(ref) do + state else - nil + Process.demonitor(ref) + + timer = + if used_by == [] do + max_idle = Pleroma.Config.get([:connections_pool, :max_idle_time], 30_000) + Process.send_after(self(), :idle_close, max_idle) + else + nil + end + + %{state | timer: timer} end - {:reply, :ok, %{state | timer: timer}, :hibernate} + {:reply, :ok, state, :hibernate} end @impl true @@ -103,16 +111,21 @@ def handle_info(:idle_close, state) do {:stop, :normal, state} end + @impl true + def handle_info({:gun_up, _pid, _protocol}, state) do + {:noreply, state, :hibernate} + end + # Gracefully shutdown if the connection got closed without any streams left @impl true def handle_info({:gun_down, _pid, _protocol, _reason, []}, state) do {:stop, :normal, state} end - # Otherwise, shutdown with an error + # Otherwise, wait for retry @impl true - def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams} = down_message, state) do - {:stop, {:error, down_message}, state} + def handle_info({:gun_down, _pid, _protocol, _reason, _killed_streams}, state) do + {:noreply, state, :hibernate} end @impl true diff --git a/lib/pleroma/http/adapter_helper.ex b/lib/pleroma/http/adapter_helper.ex index 9ec3836b0..740e6e9ff 100644 --- a/lib/pleroma/http/adapter_helper.ex +++ b/lib/pleroma/http/adapter_helper.ex @@ -10,6 +10,7 @@ defmodule Pleroma.HTTP.AdapterHelper do @type proxy_type() :: :socks4 | :socks5 @type host() :: charlist() | :inet.ip_address() + @type pool() :: :federation | :upload | :media | :default alias Pleroma.Config alias Pleroma.HTTP.AdapterHelper @@ -44,14 +45,13 @@ def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy) @spec options(URI.t(), keyword()) :: keyword() def options(%URI{} = uri, opts \\ []) do @defaults - |> put_timeout() |> Keyword.merge(opts) + |> put_timeout() |> adapter_helper().options(uri) end - # For Hackney, this is the time a connection can stay idle in the pool. - # For Gun, this is the timeout to receive a message from Gun. - defp put_timeout(opts) do + @spec pool_timeout(pool()) :: non_neg_integer() + def pool_timeout(pool) do {config_key, default} = if adapter() == Tesla.Adapter.Gun do {:pools, Config.get([:pools, :default, :timeout], 5_000)} @@ -59,9 +59,13 @@ defp put_timeout(opts) do {:hackney_pools, 10_000} end - timeout = Config.get([config_key, opts[:pool], :timeout], default) + Config.get([config_key, pool, :timeout], default) + end - Keyword.merge(opts, timeout: timeout) + # For Hackney, this is the time a connection can stay idle in the pool. + # For Gun, this is the timeout to receive a message from Gun. + defp put_timeout(opts) do + Keyword.put_new(opts, :timeout, pool_timeout(opts[:pool])) end def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts) diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex index b4ff8306c..db0a298b3 100644 --- a/lib/pleroma/http/adapter_helper/gun.ex +++ b/lib/pleroma/http/adapter_helper/gun.ex @@ -5,6 +5,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do @behaviour Pleroma.HTTP.AdapterHelper + alias Pleroma.Config alias Pleroma.Gun.ConnectionPool alias Pleroma.HTTP.AdapterHelper @@ -14,7 +15,7 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do connect_timeout: 5_000, domain_lookup_timeout: 5_000, tls_handshake_timeout: 5_000, - retry: 0, + retry: 1, retry_timeout: 1000, await_up_timeout: 5_000 ] @@ -22,10 +23,11 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do @spec options(keyword(), URI.t()) :: keyword() def options(incoming_opts \\ [], %URI{} = uri) do proxy = - Pleroma.Config.get([:http, :proxy_url]) + [:http, :proxy_url] + |> Config.get() |> AdapterHelper.format_proxy() - config_opts = Pleroma.Config.get([:http, :adapter], []) + config_opts = Config.get([:http, :adapter], []) @defaults |> Keyword.merge(config_opts) @@ -37,8 +39,7 @@ def options(incoming_opts \\ [], %URI{} = uri) do defp add_scheme_opts(opts, %{scheme: "http"}), do: opts defp add_scheme_opts(opts, %{scheme: "https"}) do - opts - |> Keyword.put(:certificates_verification, true) + Keyword.put(opts, :certificates_verification, true) end @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()} @@ -51,11 +52,11 @@ def get_conn(uri, opts) do @prefix Pleroma.Gun.ConnectionPool def limiter_setup do - wait = Pleroma.Config.get([:connections_pool, :connection_acquisition_wait]) - retries = Pleroma.Config.get([:connections_pool, :connection_acquisition_retries]) + wait = Config.get([:connections_pool, :connection_acquisition_wait]) + retries = Config.get([:connections_pool, :connection_acquisition_retries]) :pools - |> Pleroma.Config.get([]) + |> Config.get([]) |> Enum.each(fn {name, opts} -> max_running = Keyword.get(opts, :size, 50) max_waiting = Keyword.get(opts, :max_waiting, 10) @@ -69,7 +70,6 @@ def limiter_setup do case result do :ok -> :ok {:error, :existing} -> :ok - e -> raise e end end) diff --git a/lib/pleroma/http/ex_aws.ex b/lib/pleroma/http/ex_aws.ex index e53e64077..2fe8beafc 100644 --- a/lib/pleroma/http/ex_aws.ex +++ b/lib/pleroma/http/ex_aws.ex @@ -11,6 +11,8 @@ defmodule Pleroma.HTTP.ExAws do @impl true def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do + http_opts = Keyword.put(http_opts, :adapter, pool: :upload) + case HTTP.request(method, url, body, headers, http_opts) do {:ok, env} -> {:ok, %{status_code: env.status, headers: env.headers, body: env.body}} diff --git a/lib/pleroma/http/tzdata.ex b/lib/pleroma/http/tzdata.ex index 34bb253a7..7dd3352ff 100644 --- a/lib/pleroma/http/tzdata.ex +++ b/lib/pleroma/http/tzdata.ex @@ -11,6 +11,8 @@ defmodule Pleroma.HTTP.Tzdata do @impl true def get(url, headers, options) do + options = Keyword.put(options, :adapter, pool: :upload) + with {:ok, %Tesla.Env{} = env} <- HTTP.get(url, headers, options) do {:ok, {env.status, env.headers, env.body}} end @@ -18,6 +20,8 @@ def get(url, headers, options) do @impl true def head(url, headers, options) do + options = Keyword.put(options, :adapter, pool: :upload) + with {:ok, %Tesla.Env{} = env} <- HTTP.head(url, headers, options) do {:ok, {env.status, env.headers}} end diff --git a/lib/pleroma/uploaders/s3.ex b/lib/pleroma/uploaders/s3.ex index a13ff23b6..ed9794ca2 100644 --- a/lib/pleroma/uploaders/s3.ex +++ b/lib/pleroma/uploaders/s3.ex @@ -46,12 +46,19 @@ def put_file(%Pleroma.Upload{} = upload) do op = if streaming do - upload.tempfile - |> ExAws.S3.Upload.stream_file() - |> ExAws.S3.upload(bucket, s3_name, [ - {:acl, :public_read}, - {:content_type, upload.content_type} - ]) + op = + upload.tempfile + |> ExAws.S3.Upload.stream_file() + |> ExAws.S3.upload(bucket, s3_name, [ + {:acl, :public_read}, + {:content_type, upload.content_type} + ]) + + # set s3 upload timeout to respect :upload pool timeout + # timeout should be slightly larger, so s3 can retry upload on fail + timeout = Pleroma.HTTP.AdapterHelper.pool_timeout(:upload) + 1_000 + opts = Keyword.put(op.opts, :timeout, timeout) + Map.put(op, :opts, opts) else {:ok, file_data} = File.read(upload.tempfile) From 5e8adf91b46c3f23ec423d53afccbb062df4a241 Mon Sep 17 00:00:00 2001 From: Alexander Strizhakov Date: Tue, 1 Sep 2020 11:30:56 +0300 Subject: [PATCH 2/5] don't overwrite passed pool option in http clients --- lib/pleroma/http/ex_aws.ex | 2 +- lib/pleroma/http/tzdata.ex | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/pleroma/http/ex_aws.ex b/lib/pleroma/http/ex_aws.ex index 2fe8beafc..c3f335c73 100644 --- a/lib/pleroma/http/ex_aws.ex +++ b/lib/pleroma/http/ex_aws.ex @@ -11,7 +11,7 @@ defmodule Pleroma.HTTP.ExAws do @impl true def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do - http_opts = Keyword.put(http_opts, :adapter, pool: :upload) + http_opts = Keyword.put_new(http_opts, :adapter, pool: :upload) case HTTP.request(method, url, body, headers, http_opts) do {:ok, env} -> diff --git a/lib/pleroma/http/tzdata.ex b/lib/pleroma/http/tzdata.ex index 7dd3352ff..356799aab 100644 --- a/lib/pleroma/http/tzdata.ex +++ b/lib/pleroma/http/tzdata.ex @@ -11,7 +11,7 @@ defmodule Pleroma.HTTP.Tzdata do @impl true def get(url, headers, options) do - options = Keyword.put(options, :adapter, pool: :upload) + options = Keyword.put_new(options, :adapter, pool: :upload) with {:ok, %Tesla.Env{} = env} <- HTTP.get(url, headers, options) do {:ok, {env.status, env.headers, env.body}} @@ -20,7 +20,7 @@ def get(url, headers, options) do @impl true def head(url, headers, options) do - options = Keyword.put(options, :adapter, pool: :upload) + options = Keyword.put_new(options, :adapter, pool: :upload) with {:ok, %Tesla.Env{} = env} <- HTTP.head(url, headers, options) do {:ok, {env.status, env.headers}} From 79f65b4374908a32ebf39db176a30a01152a9141 Mon Sep 17 00:00:00 2001 From: Alexander Strizhakov Date: Wed, 2 Sep 2020 09:16:51 +0300 Subject: [PATCH 3/5] correct pool and uniform headers format --- lib/mix/tasks/pleroma/frontend.ex | 4 +++- lib/pleroma/instances/instance.ex | 4 +++- lib/pleroma/object/fetcher.ex | 6 +++--- lib/pleroma/web/rich_media/helpers.ex | 2 +- lib/pleroma/web/web_finger/web_finger.ex | 4 ++-- test/support/http_request_mock.ex | 4 ++-- 6 files changed, 14 insertions(+), 10 deletions(-) diff --git a/lib/mix/tasks/pleroma/frontend.ex b/lib/mix/tasks/pleroma/frontend.ex index 2adbf8d72..484af6da7 100644 --- a/lib/mix/tasks/pleroma/frontend.ex +++ b/lib/mix/tasks/pleroma/frontend.ex @@ -124,7 +124,9 @@ defp download_build(frontend_info, dest) do url = String.replace(frontend_info["build_url"], "${ref}", frontend_info["ref"]) with {:ok, %{status: 200, body: zip_body}} <- - Pleroma.HTTP.get(url, [], timeout: 120_000, recv_timeout: 120_000) do + Pleroma.HTTP.get(url, [], + adapter: [pool: :media, timeout: 120_000, recv_timeout: 120_000] + ) do unzip(zip_body, dest) else e -> {:error, e} diff --git a/lib/pleroma/instances/instance.ex b/lib/pleroma/instances/instance.ex index a1f935232..711c42158 100644 --- a/lib/pleroma/instances/instance.ex +++ b/lib/pleroma/instances/instance.ex @@ -150,7 +150,9 @@ def get_or_update_favicon(%URI{host: host} = instance_uri) do defp scrape_favicon(%URI{} = instance_uri) do try do with {:ok, %Tesla.Env{body: html}} <- - Pleroma.HTTP.get(to_string(instance_uri), [{:Accept, "text/html"}]), + Pleroma.HTTP.get(to_string(instance_uri), [{"accept", "text/html"}], + adapter: [pool: :media] + ), favicon_rel <- html |> Floki.parse_document!() diff --git a/lib/pleroma/object/fetcher.ex b/lib/pleroma/object/fetcher.ex index 6fdbc8efd..374d8704a 100644 --- a/lib/pleroma/object/fetcher.ex +++ b/lib/pleroma/object/fetcher.ex @@ -164,12 +164,12 @@ defp make_signature(id, date) do date: date }) - [{"signature", signature}] + {"signature", signature} end defp sign_fetch(headers, id, date) do if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do - headers ++ make_signature(id, date) + [make_signature(id, date) | headers] else headers end @@ -177,7 +177,7 @@ defp sign_fetch(headers, id, date) do defp maybe_date_fetch(headers, date) do if Pleroma.Config.get([:activitypub, :sign_object_fetches]) do - headers ++ [{"date", date}] + [{"date", date} | headers] else headers end diff --git a/lib/pleroma/web/rich_media/helpers.ex b/lib/pleroma/web/rich_media/helpers.ex index 6210f2c5a..2fb482b51 100644 --- a/lib/pleroma/web/rich_media/helpers.ex +++ b/lib/pleroma/web/rich_media/helpers.ex @@ -96,6 +96,6 @@ def rich_media_get(url) do @rich_media_options end - Pleroma.HTTP.get(url, headers, options) + Pleroma.HTTP.get(url, headers, adapter: options) end end diff --git a/lib/pleroma/web/web_finger/web_finger.ex b/lib/pleroma/web/web_finger/web_finger.ex index c4051e63e..6629f5356 100644 --- a/lib/pleroma/web/web_finger/web_finger.ex +++ b/lib/pleroma/web/web_finger/web_finger.ex @@ -136,12 +136,12 @@ def get_template_from_xml(body) do def find_lrdd_template(domain) do with {:ok, %{status: status, body: body}} when status in 200..299 <- - HTTP.get("http://#{domain}/.well-known/host-meta", []) do + HTTP.get("http://#{domain}/.well-known/host-meta") do get_template_from_xml(body) else _ -> with {:ok, %{body: body, status: status}} when status in 200..299 <- - HTTP.get("https://#{domain}/.well-known/host-meta", []) do + HTTP.get("https://#{domain}/.well-known/host-meta") do get_template_from_xml(body) else e -> {:error, "Can't find LRDD template: #{inspect(e)}"} diff --git a/test/support/http_request_mock.ex b/test/support/http_request_mock.ex index eeeba7880..a0ebf65d9 100644 --- a/test/support/http_request_mock.ex +++ b/test/support/http_request_mock.ex @@ -1350,11 +1350,11 @@ def get("https://relay.mastodon.host/actor", _, _, _) do {:ok, %Tesla.Env{status: 200, body: File.read!("test/fixtures/relay/relay.json")}} end - def get("http://localhost:4001/", _, "", Accept: "text/html") do + def get("http://localhost:4001/", _, "", [{"accept", "text/html"}]) do {:ok, %Tesla.Env{status: 200, body: File.read!("test/fixtures/tesla_mock/7369654.html")}} end - def get("https://osada.macgirvin.com/", _, "", Accept: "text/html") do + def get("https://osada.macgirvin.com/", _, "", [{"accept", "text/html"}]) do {:ok, %Tesla.Env{ status: 200, From 1c57ef44983e150f3cc016290fe99f159eb79cb0 Mon Sep 17 00:00:00 2001 From: Alexander Strizhakov Date: Wed, 2 Sep 2020 10:33:43 +0300 Subject: [PATCH 4/5] default pool for tz_data client --- lib/pleroma/http/tzdata.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/pleroma/http/tzdata.ex b/lib/pleroma/http/tzdata.ex index 356799aab..4539ac359 100644 --- a/lib/pleroma/http/tzdata.ex +++ b/lib/pleroma/http/tzdata.ex @@ -11,7 +11,7 @@ defmodule Pleroma.HTTP.Tzdata do @impl true def get(url, headers, options) do - options = Keyword.put_new(options, :adapter, pool: :upload) + options = Keyword.put_new(options, :adapter, pool: :default) with {:ok, %Tesla.Env{} = env} <- HTTP.get(url, headers, options) do {:ok, {env.status, env.headers, env.body}} @@ -20,7 +20,7 @@ def get(url, headers, options) do @impl true def head(url, headers, options) do - options = Keyword.put_new(options, :adapter, pool: :upload) + options = Keyword.put_new(options, :adapter, pool: :default) with {:ok, %Tesla.Env{} = env} <- HTTP.head(url, headers, options) do {:ok, {env.status, env.headers}} From 84fbf1616104c09e0f4f5442d86ca2c573ae4056 Mon Sep 17 00:00:00 2001 From: Alexander Strizhakov Date: Wed, 2 Sep 2020 10:50:51 +0300 Subject: [PATCH 5/5] timeout option moved to gun adapter helper --- lib/pleroma/http/adapter_helper.ex | 23 ++--------------------- lib/pleroma/http/adapter_helper/gun.ex | 15 +++++++++++++++ lib/pleroma/uploaders/s3.ex | 14 +++++++++----- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/lib/pleroma/http/adapter_helper.ex b/lib/pleroma/http/adapter_helper.ex index 740e6e9ff..0728cbaa2 100644 --- a/lib/pleroma/http/adapter_helper.ex +++ b/lib/pleroma/http/adapter_helper.ex @@ -10,9 +10,7 @@ defmodule Pleroma.HTTP.AdapterHelper do @type proxy_type() :: :socks4 | :socks5 @type host() :: charlist() | :inet.ip_address() - @type pool() :: :federation | :upload | :media | :default - alias Pleroma.Config alias Pleroma.HTTP.AdapterHelper require Logger @@ -46,29 +44,12 @@ def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy) def options(%URI{} = uri, opts \\ []) do @defaults |> Keyword.merge(opts) - |> put_timeout() |> adapter_helper().options(uri) end - @spec pool_timeout(pool()) :: non_neg_integer() - def pool_timeout(pool) do - {config_key, default} = - if adapter() == Tesla.Adapter.Gun do - {:pools, Config.get([:pools, :default, :timeout], 5_000)} - else - {:hackney_pools, 10_000} - end - - Config.get([config_key, pool, :timeout], default) - end - - # For Hackney, this is the time a connection can stay idle in the pool. - # For Gun, this is the timeout to receive a message from Gun. - defp put_timeout(opts) do - Keyword.put_new(opts, :timeout, pool_timeout(opts[:pool])) - end - + @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()} def get_conn(uri, opts), do: adapter_helper().get_conn(uri, opts) + defp adapter, do: Application.get_env(:tesla, :adapter) defp adapter_helper do diff --git a/lib/pleroma/http/adapter_helper/gun.ex b/lib/pleroma/http/adapter_helper/gun.ex index db0a298b3..02e20f2d1 100644 --- a/lib/pleroma/http/adapter_helper/gun.ex +++ b/lib/pleroma/http/adapter_helper/gun.ex @@ -20,6 +20,8 @@ defmodule Pleroma.HTTP.AdapterHelper.Gun do await_up_timeout: 5_000 ] + @type pool() :: :federation | :upload | :media | :default + @spec options(keyword(), URI.t()) :: keyword() def options(incoming_opts \\ [], %URI{} = uri) do proxy = @@ -34,6 +36,7 @@ def options(incoming_opts \\ [], %URI{} = uri) do |> add_scheme_opts(uri) |> AdapterHelper.maybe_add_proxy(proxy) |> Keyword.merge(incoming_opts) + |> put_timeout() end defp add_scheme_opts(opts, %{scheme: "http"}), do: opts @@ -42,6 +45,18 @@ defp add_scheme_opts(opts, %{scheme: "https"}) do Keyword.put(opts, :certificates_verification, true) end + defp put_timeout(opts) do + # this is the timeout to receive a message from Gun + Keyword.put_new(opts, :timeout, pool_timeout(opts[:pool])) + end + + @spec pool_timeout(pool()) :: non_neg_integer() + def pool_timeout(pool) do + default = Config.get([:pools, :default, :timeout], 5_000) + + Config.get([:pools, pool, :timeout], default) + end + @spec get_conn(URI.t(), keyword()) :: {:ok, keyword()} | {:error, atom()} def get_conn(uri, opts) do case ConnectionPool.get_conn(uri, opts) do diff --git a/lib/pleroma/uploaders/s3.ex b/lib/pleroma/uploaders/s3.ex index ed9794ca2..6dbef9085 100644 --- a/lib/pleroma/uploaders/s3.ex +++ b/lib/pleroma/uploaders/s3.ex @@ -54,11 +54,15 @@ def put_file(%Pleroma.Upload{} = upload) do {:content_type, upload.content_type} ]) - # set s3 upload timeout to respect :upload pool timeout - # timeout should be slightly larger, so s3 can retry upload on fail - timeout = Pleroma.HTTP.AdapterHelper.pool_timeout(:upload) + 1_000 - opts = Keyword.put(op.opts, :timeout, timeout) - Map.put(op, :opts, opts) + if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do + # set s3 upload timeout to respect :upload pool timeout + # timeout should be slightly larger, so s3 can retry upload on fail + timeout = Pleroma.HTTP.AdapterHelper.Gun.pool_timeout(:upload) + 1_000 + opts = Keyword.put(op.opts, :timeout, timeout) + Map.put(op, :opts, opts) + else + op + end else {:ok, file_data} = File.read(upload.tempfile)