Finch everywhere
This commit is contained in:
parent
28612096ba
commit
02c62dd97f
27 changed files with 117 additions and 546 deletions
|
@ -175,7 +175,7 @@
|
|||
"application/ld+json" => ["activity+json"]
|
||||
}
|
||||
|
||||
config :tesla, adapter: Tesla.Adapter.Hackney
|
||||
config :tesla, :adapter, {Tesla.Adapter.Finch, name: MyFinch}
|
||||
|
||||
# Configures http settings, upstream proxy etc.
|
||||
config :pleroma, :http,
|
||||
|
@ -441,8 +441,7 @@
|
|||
# Note: max_read_duration defaults to Pleroma.ReverseProxy.max_read_duration_default/1
|
||||
max_read_duration: 30_000,
|
||||
http: [
|
||||
follow_redirect: true,
|
||||
pool: :media
|
||||
follow_redirect: true
|
||||
]
|
||||
],
|
||||
whitelist: []
|
||||
|
@ -764,51 +763,6 @@
|
|||
parameters: [gin_fuzzy_search_limit: "500"],
|
||||
prepare: :unnamed
|
||||
|
||||
config :pleroma, :connections_pool,
|
||||
reclaim_multiplier: 0.1,
|
||||
connection_acquisition_wait: 250,
|
||||
connection_acquisition_retries: 5,
|
||||
max_connections: 250,
|
||||
max_idle_time: 30_000,
|
||||
retry: 0,
|
||||
connect_timeout: 5_000
|
||||
|
||||
config :pleroma, :pools,
|
||||
federation: [
|
||||
size: 50,
|
||||
max_waiting: 10,
|
||||
recv_timeout: 10_000
|
||||
],
|
||||
media: [
|
||||
size: 50,
|
||||
max_waiting: 20,
|
||||
recv_timeout: 15_000
|
||||
],
|
||||
upload: [
|
||||
size: 25,
|
||||
max_waiting: 5,
|
||||
recv_timeout: 15_000
|
||||
],
|
||||
default: [
|
||||
size: 10,
|
||||
max_waiting: 2,
|
||||
recv_timeout: 5_000
|
||||
]
|
||||
|
||||
config :pleroma, :hackney_pools,
|
||||
federation: [
|
||||
max_connections: 50,
|
||||
timeout: 150_000
|
||||
],
|
||||
media: [
|
||||
max_connections: 50,
|
||||
timeout: 150_000
|
||||
],
|
||||
upload: [
|
||||
max_connections: 25,
|
||||
timeout: 300_000
|
||||
]
|
||||
|
||||
config :pleroma, :majic_pool, size: 2
|
||||
|
||||
private_instance? = :if_instance_is_private
|
||||
|
|
|
@ -104,12 +104,8 @@
|
|||
|
||||
config :joken, default_signer: "yU8uHKq+yyAkZ11Hx//jcdacWc8yQ1bxAAGrplzB0Zwwjkp35v0RK9SO8WTPr6QZ"
|
||||
|
||||
config :pleroma, Pleroma.ReverseProxy.Client, Pleroma.ReverseProxy.ClientMock
|
||||
|
||||
config :pleroma, :modules, runtime_dir: "test/fixtures/modules"
|
||||
|
||||
config :pleroma, Pleroma.Gun, Pleroma.GunMock
|
||||
|
||||
config :pleroma, Pleroma.Emails.NewUsersDigestEmail, enabled: true
|
||||
|
||||
config :pleroma, Pleroma.Web.Plugs.RemoteIp, enabled: false
|
||||
|
|
|
@ -28,16 +28,7 @@ def start_pleroma do
|
|||
Logger.remove_backend(:console)
|
||||
end
|
||||
|
||||
adapter = Application.get_env(:tesla, :adapter)
|
||||
|
||||
apps =
|
||||
if adapter == Tesla.Adapter.Gun do
|
||||
[:gun | @apps]
|
||||
else
|
||||
[:hackney | @apps]
|
||||
end
|
||||
|
||||
Enum.each(apps, &Application.ensure_all_started/1)
|
||||
Enum.each(@apps, &Application.ensure_all_started/1)
|
||||
|
||||
oban_config = [
|
||||
crontab: [],
|
||||
|
@ -57,7 +48,6 @@ def start_pleroma do
|
|||
{Majic.Pool,
|
||||
[name: Pleroma.MajicPool, pool_size: Pleroma.Config.get([:majic_pool, :size], 2)]}
|
||||
] ++
|
||||
http_children(adapter) ++
|
||||
elasticsearch_children()
|
||||
|
||||
cachex_children = Enum.map(@cachex_children, &Pleroma.Application.build_cachex(&1, []))
|
||||
|
@ -131,13 +121,6 @@ def escape_sh_path(path) do
|
|||
~S(') <> String.replace(path, ~S('), ~S(\')) <> ~S(')
|
||||
end
|
||||
|
||||
defp http_children(Tesla.Adapter.Gun) do
|
||||
Pleroma.Gun.ConnectionPool.children() ++
|
||||
[{Task, &Pleroma.HTTP.AdapterHelper.Gun.limiter_setup/0}]
|
||||
end
|
||||
|
||||
defp http_children(_), do: []
|
||||
|
||||
def elasticsearch_children do
|
||||
config = Pleroma.Config.get([Pleroma.Search, :module])
|
||||
|
||||
|
|
|
@ -74,40 +74,4 @@ def run(["render_timeline", nickname | _] = args) do
|
|||
inputs: inputs
|
||||
)
|
||||
end
|
||||
|
||||
def run(["adapters"]) do
|
||||
start_pleroma()
|
||||
|
||||
:ok =
|
||||
Pleroma.Gun.Conn.open(
|
||||
"https://httpbin.org/stream-bytes/1500",
|
||||
:gun_connections
|
||||
)
|
||||
|
||||
Process.sleep(1_500)
|
||||
|
||||
Benchee.run(
|
||||
%{
|
||||
"Without conn and without pool" => fn ->
|
||||
{:ok, %Tesla.Env{}} =
|
||||
Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [],
|
||||
pool: :no_pool,
|
||||
receive_conn: false
|
||||
)
|
||||
end,
|
||||
"Without conn and with pool" => fn ->
|
||||
{:ok, %Tesla.Env{}} =
|
||||
Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [], receive_conn: false)
|
||||
end,
|
||||
"With reused conn and without pool" => fn ->
|
||||
{:ok, %Tesla.Env{}} =
|
||||
Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [], pool: :no_pool)
|
||||
end,
|
||||
"With reused conn and with pool" => fn ->
|
||||
{:ok, %Tesla.Env{}} = Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500")
|
||||
end
|
||||
},
|
||||
parallel: 10
|
||||
)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -59,34 +59,8 @@ def start(_type, _args) do
|
|||
Pleroma.Docs.JSON.compile()
|
||||
limiters_setup()
|
||||
|
||||
adapter = Application.get_env(:tesla, :adapter)
|
||||
|
||||
if match?({Tesla.Adapter.Finch, _}, adapter) do
|
||||
Logger.info("Starting Finch")
|
||||
Finch.start_link(name: MyFinch)
|
||||
end
|
||||
|
||||
if adapter == Tesla.Adapter.Gun do
|
||||
if version = Pleroma.OTPVersion.version() do
|
||||
[major, minor] =
|
||||
version
|
||||
|> String.split(".")
|
||||
|> Enum.map(&String.to_integer/1)
|
||||
|> Enum.take(2)
|
||||
|
||||
if (major == 22 and minor < 2) or major < 22 do
|
||||
raise "
|
||||
!!!OTP VERSION WARNING!!!
|
||||
You are using gun adapter with OTP version #{version}, which doesn't support correct handling of unordered certificates chains. Please update your Erlang/OTP to at least 22.2.
|
||||
"
|
||||
end
|
||||
else
|
||||
raise "
|
||||
!!!OTP VERSION WARNING!!!
|
||||
To support correct handling of unordered certificates chains - OTP version must be > 22.2.
|
||||
"
|
||||
end
|
||||
end
|
||||
Logger.info("Starting Finch")
|
||||
Finch.start_link(name: MyFinch)
|
||||
|
||||
# Define workers and child supervisors to be supervised
|
||||
children =
|
||||
|
@ -97,7 +71,6 @@ def start(_type, _args) do
|
|||
Pleroma.Web.Plugs.RateLimiter.Supervisor
|
||||
] ++
|
||||
cachex_children() ++
|
||||
http_children(adapter, @mix_env) ++
|
||||
[
|
||||
Pleroma.Stats,
|
||||
Pleroma.JobQueueMonitor,
|
||||
|
@ -276,34 +249,6 @@ defp task_children(_) do
|
|||
]
|
||||
end
|
||||
|
||||
# start hackney and gun pools in tests
|
||||
defp http_children(_, :test) do
|
||||
http_children(Tesla.Adapter.Hackney, nil) ++ http_children(Tesla.Adapter.Gun, nil)
|
||||
end
|
||||
|
||||
defp http_children(Tesla.Adapter.Hackney, _) do
|
||||
pools = [:federation, :media]
|
||||
|
||||
pools =
|
||||
if Config.get([Pleroma.Upload, :proxy_remote]) do
|
||||
[:upload | pools]
|
||||
else
|
||||
pools
|
||||
end
|
||||
|
||||
for pool <- pools do
|
||||
options = Config.get([:hackney_pools, pool])
|
||||
:hackney_pool.child_spec(pool, options)
|
||||
end
|
||||
end
|
||||
|
||||
defp http_children(Tesla.Adapter.Gun, _) do
|
||||
Pleroma.Gun.ConnectionPool.children() ++
|
||||
[{Task, &Pleroma.HTTP.AdapterHelper.Gun.limiter_setup/0}]
|
||||
end
|
||||
|
||||
defp http_children(_, _), do: []
|
||||
|
||||
def elasticsearch_children do
|
||||
config = Config.get([Pleroma.Search, :module])
|
||||
|
||||
|
|
|
@ -173,7 +173,6 @@ def warn do
|
|||
check_old_mrf_config(),
|
||||
check_media_proxy_whitelist_config(),
|
||||
check_welcome_message_config(),
|
||||
check_gun_pool_options(),
|
||||
check_activity_expiration_config(),
|
||||
check_remote_ip_plug_name(),
|
||||
check_uploders_s3_public_endpoint(),
|
||||
|
@ -257,51 +256,6 @@ def check_media_proxy_whitelist_config do
|
|||
end
|
||||
end
|
||||
|
||||
def check_gun_pool_options do
|
||||
pool_config = Config.get(:connections_pool)
|
||||
|
||||
if timeout = pool_config[:await_up_timeout] do
|
||||
Logger.warn("""
|
||||
!!!DEPRECATION WARNING!!!
|
||||
Your config is using old setting `config :pleroma, :connections_pool, await_up_timeout`. Please change to `config :pleroma, :connections_pool, connect_timeout` to ensure compatibility with future releases.
|
||||
""")
|
||||
|
||||
Config.put(:connections_pool, Keyword.put_new(pool_config, :connect_timeout, timeout))
|
||||
end
|
||||
|
||||
pools_configs = Config.get(:pools)
|
||||
|
||||
warning_preface = """
|
||||
!!!DEPRECATION WARNING!!!
|
||||
Your config is using old setting name `timeout` instead of `recv_timeout` in pool settings. Setting should work for now, but you are advised to change format to scheme with port to prevent possible issues later.
|
||||
"""
|
||||
|
||||
updated_config =
|
||||
Enum.reduce(pools_configs, [], fn {pool_name, config}, acc ->
|
||||
if timeout = config[:timeout] do
|
||||
Keyword.put(acc, pool_name, Keyword.put_new(config, :recv_timeout, timeout))
|
||||
else
|
||||
acc
|
||||
end
|
||||
end)
|
||||
|
||||
if updated_config != [] do
|
||||
pool_warnings =
|
||||
updated_config
|
||||
|> Keyword.keys()
|
||||
|> Enum.map(fn pool_name ->
|
||||
"\n* `:timeout` options in #{pool_name} pool is now `:recv_timeout`"
|
||||
end)
|
||||
|
||||
Logger.warn(Enum.join([warning_preface | pool_warnings]))
|
||||
|
||||
Config.put(:pools, updated_config)
|
||||
:error
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
@spec check_activity_expiration_config() :: :ok | nil
|
||||
def check_activity_expiration_config do
|
||||
warning_preface = """
|
||||
|
|
|
@ -15,14 +15,11 @@ defmodule Pleroma.Config.TransferTask do
|
|||
|
||||
defp reboot_time_keys,
|
||||
do: [
|
||||
{:pleroma, :hackney_pools},
|
||||
{:pleroma, :shout},
|
||||
{:pleroma, Oban},
|
||||
{:pleroma, :rate_limit},
|
||||
{:pleroma, :markup},
|
||||
{:pleroma, :streamer},
|
||||
{:pleroma, :pools},
|
||||
{:pleroma, :connections_pool}
|
||||
{:pleroma, :streamer}
|
||||
]
|
||||
|
||||
defp reboot_time_subkeys,
|
||||
|
|
|
@ -542,7 +542,7 @@ defp get_filename(pack, shortcode) do
|
|||
defp http_get(%URI{} = url), do: url |> to_string() |> http_get()
|
||||
|
||||
defp http_get(url) do
|
||||
with {:ok, %{body: body}} <- Pleroma.HTTP.get(url, [], pool: :default) do
|
||||
with {:ok, %{body: body}} <- Pleroma.HTTP.get(url, [], []) do
|
||||
Jason.decode(body)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -93,7 +93,7 @@ defp download_build(frontend_info, dest) do
|
|||
url = String.replace(frontend_info["build_url"], "${ref}", frontend_info["ref"])
|
||||
|
||||
with {:ok, %{status: 200, body: zip_body}} <-
|
||||
Pleroma.HTTP.get(url, [], pool: :media, recv_timeout: 120_000) do
|
||||
Pleroma.HTTP.get(url, [], recv_timeout: 120_000) do
|
||||
unzip(zip_body, dest)
|
||||
else
|
||||
{:error, e} -> {:error, e}
|
||||
|
|
|
@ -24,7 +24,7 @@ def missing_dependencies do
|
|||
def image_resize(url, options) do
|
||||
with executable when is_binary(executable) <- System.find_executable("convert"),
|
||||
{:ok, args} <- prepare_image_resize_args(options),
|
||||
{:ok, env} <- HTTP.get(url, [], pool: :media),
|
||||
{:ok, env} <- HTTP.get(url, [], []),
|
||||
{:ok, fifo_path} <- mkfifo() do
|
||||
args = List.flatten([fifo_path, args])
|
||||
run_fifo(fifo_path, env, executable, args)
|
||||
|
@ -73,7 +73,7 @@ defp prepare_image_resize_args(_), do: {:error, :missing_options}
|
|||
# Note: video thumbnail is intentionally not resized (always has original dimensions)
|
||||
def video_framegrab(url) do
|
||||
with executable when is_binary(executable) <- System.find_executable("ffmpeg"),
|
||||
{:ok, env} <- HTTP.get(url, [], pool: :media),
|
||||
{:ok, env} <- HTTP.get(url, [], []),
|
||||
{:ok, fifo_path} <- mkfifo(),
|
||||
args = [
|
||||
"-y",
|
||||
|
|
|
@ -66,17 +66,9 @@ def request(method, url, body, headers, options) when is_binary(url) do
|
|||
params = options[:params] || []
|
||||
request = build_request(method, headers, options, url, body, params)
|
||||
|
||||
adapter = Application.get_env(:tesla, :adapter)
|
||||
client = Tesla.client([Tesla.Middleware.FollowRedirects])
|
||||
|
||||
client = Tesla.client(adapter_middlewares(adapter), adapter)
|
||||
|
||||
maybe_limit(
|
||||
fn ->
|
||||
request(client, request)
|
||||
end,
|
||||
adapter,
|
||||
adapter_opts
|
||||
)
|
||||
request(client, request)
|
||||
end
|
||||
|
||||
@spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()}
|
||||
|
@ -92,19 +84,4 @@ defp build_request(method, headers, options, url, body, params) do
|
|||
|> Builder.add_param(:query, :query, params)
|
||||
|> Builder.convert_to_keyword()
|
||||
end
|
||||
|
||||
@prefix Pleroma.Gun.ConnectionPool
|
||||
defp maybe_limit(fun, Tesla.Adapter.Gun, opts) do
|
||||
ConcurrentLimiter.limit(:"#{@prefix}.#{opts[:pool] || :default}", fun)
|
||||
end
|
||||
|
||||
defp maybe_limit(fun, _, _) do
|
||||
fun.()
|
||||
end
|
||||
|
||||
defp adapter_middlewares(Tesla.Adapter.Gun) do
|
||||
[Tesla.Middleware.FollowRedirects, Pleroma.Tesla.Middleware.ConnectionPool]
|
||||
end
|
||||
|
||||
defp adapter_middlewares(_), do: []
|
||||
end
|
||||
|
|
|
@ -6,7 +6,7 @@ defmodule Pleroma.HTTP.AdapterHelper do
|
|||
@moduledoc """
|
||||
Configure Tesla.Client with default and customized adapter options.
|
||||
"""
|
||||
@defaults [pool: :federation, connect_timeout: 5_000, recv_timeout: 5_000]
|
||||
@defaults [name: MyFinch, connect_timeout: 5_000, recv_timeout: 5_000]
|
||||
|
||||
@type proxy_type() :: :socks4 | :socks5
|
||||
@type host() :: charlist() | :inet.ip_address()
|
||||
|
@ -43,17 +43,7 @@ def maybe_add_proxy(opts, proxy), do: Keyword.put_new(opts, :proxy, proxy)
|
|||
def options(%URI{} = uri, opts \\ []) do
|
||||
@defaults
|
||||
|> Keyword.merge(opts)
|
||||
|> adapter_helper().options(uri)
|
||||
end
|
||||
|
||||
defp adapter, do: Application.get_env(:tesla, :adapter)
|
||||
|
||||
defp adapter_helper do
|
||||
case adapter() do
|
||||
Tesla.Adapter.Gun -> AdapterHelper.Gun
|
||||
Tesla.Adapter.Hackney -> AdapterHelper.Hackney
|
||||
_ -> AdapterHelper.Default
|
||||
end
|
||||
|> AdapterHelper.Default.options(uri)
|
||||
end
|
||||
|
||||
@spec parse_proxy(String.t() | tuple() | nil) ::
|
||||
|
|
|
@ -11,8 +11,6 @@ defmodule Pleroma.HTTP.ExAws do
|
|||
|
||||
@impl true
|
||||
def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do
|
||||
http_opts = Keyword.put_new(http_opts, :pool, :upload)
|
||||
|
||||
case HTTP.request(method, url, body, headers, http_opts) do
|
||||
{:ok, env} ->
|
||||
{:ok, %{status_code: env.status, headers: env.headers, body: env.body}}
|
||||
|
|
|
@ -11,8 +11,6 @@ defmodule Pleroma.HTTP.Tzdata do
|
|||
|
||||
@impl true
|
||||
def get(url, headers, options) do
|
||||
options = Keyword.put_new(options, :pool, :default)
|
||||
|
||||
with {:ok, %Tesla.Env{} = env} <- HTTP.get(url, headers, options) do
|
||||
{:ok, {env.status, env.headers, env.body}}
|
||||
end
|
||||
|
@ -20,8 +18,6 @@ def get(url, headers, options) do
|
|||
|
||||
@impl true
|
||||
def head(url, headers, options) do
|
||||
options = Keyword.put_new(options, :pool, :default)
|
||||
|
||||
with {:ok, %Tesla.Env{} = env} <- HTTP.head(url, headers, options) do
|
||||
{:ok, {env.status, env.headers}}
|
||||
end
|
||||
|
|
|
@ -170,7 +170,7 @@ defp scrape_favicon(%URI{} = instance_uri) do
|
|||
try do
|
||||
with {_, true} <- {:reachable, reachable?(instance_uri.host)},
|
||||
{:ok, %Tesla.Env{body: html}} <-
|
||||
Pleroma.HTTP.get(to_string(instance_uri), [{"accept", "text/html"}], pool: :media),
|
||||
Pleroma.HTTP.get(to_string(instance_uri), [{"accept", "text/html"}], []),
|
||||
{_, [favicon_rel | _]} when is_binary(favicon_rel) <-
|
||||
{:parse,
|
||||
html |> Floki.parse_document!() |> Floki.attribute("link[rel=icon]", "href")},
|
||||
|
|
|
@ -59,11 +59,7 @@ def default_cache_control_header, do: @default_cache_control_header
|
|||
|
||||
* `req_headers`, `resp_headers` additional headers.
|
||||
|
||||
* `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).
|
||||
|
||||
"""
|
||||
@default_options [pool: :media]
|
||||
|
||||
@inline_content_types [
|
||||
"image/gif",
|
||||
"image/jpeg",
|
||||
|
@ -94,7 +90,7 @@ def default_cache_control_header, do: @default_cache_control_header
|
|||
def call(_conn, _url, _opts \\ [])
|
||||
|
||||
def call(conn = %{method: method}, url, opts) when method in @methods do
|
||||
client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))
|
||||
client_opts = Keyword.get(opts, :http, [])
|
||||
|
||||
req_headers = build_req_headers(conn.req_headers, opts)
|
||||
|
||||
|
@ -106,32 +102,39 @@ def call(conn = %{method: method}, url, opts) when method in @methods do
|
|||
end
|
||||
|
||||
with {:ok, nil} <- @cachex.get(:failed_proxy_url_cache, url),
|
||||
{:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
|
||||
{:ok, status, headers, body} <- request(method, url, req_headers, client_opts),
|
||||
:ok <-
|
||||
header_length_constraint(
|
||||
headers,
|
||||
Keyword.get(opts, :max_body_length, @max_body_length)
|
||||
) do
|
||||
response(conn, client, url, code, headers, opts)
|
||||
conn
|
||||
|> put_private(:proxied_url, url)
|
||||
|> response(body, status, headers, opts)
|
||||
else
|
||||
{:ok, true} ->
|
||||
conn
|
||||
|> error_or_redirect(url, 500, "Request failed", opts)
|
||||
|> error_or_redirect(500, "Request failed", opts)
|
||||
|> halt()
|
||||
|
||||
{:ok, code, headers} ->
|
||||
head_response(conn, url, code, headers, opts)
|
||||
{:ok, status, headers} ->
|
||||
conn
|
||||
|> put_private(:proxied_url, url)
|
||||
|> head_response(status, headers, opts)
|
||||
|> halt()
|
||||
|
||||
{:error, {:invalid_http_response, code}} ->
|
||||
Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
|
||||
track_failed_url(url, code, opts)
|
||||
{:error, {:invalid_http_response, status}} ->
|
||||
Logger.error(
|
||||
"#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{status}"
|
||||
)
|
||||
|
||||
track_failed_url(url, status, opts)
|
||||
|
||||
conn
|
||||
|> put_private(:proxied_url, url)
|
||||
|> error_or_redirect(
|
||||
url,
|
||||
code,
|
||||
"Request failed: " <> Plug.Conn.Status.reason_phrase(code),
|
||||
status,
|
||||
"Request failed: " <> Plug.Conn.Status.reason_phrase(status),
|
||||
opts
|
||||
)
|
||||
|> halt()
|
||||
|
@ -141,7 +144,8 @@ def call(conn = %{method: method}, url, opts) when method in @methods do
|
|||
track_failed_url(url, error, opts)
|
||||
|
||||
conn
|
||||
|> error_or_redirect(url, 500, "Request failed", opts)
|
||||
|> put_private(:proxied_url, url)
|
||||
|> error_or_redirect(500, "Request failed", opts)
|
||||
|> halt()
|
||||
end
|
||||
end
|
||||
|
@ -156,93 +160,48 @@ defp request(method, url, headers, opts) do
|
|||
Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
|
||||
method = method |> String.downcase() |> String.to_existing_atom()
|
||||
|
||||
case client().request(method, url, headers, "", opts) do
|
||||
{:ok, code, headers, client} when code in @valid_resp_codes ->
|
||||
{:ok, code, downcase_headers(headers), client}
|
||||
opts = opts ++ [receive_timeout: @max_read_duration]
|
||||
|
||||
{:ok, code, headers} when code in @valid_resp_codes ->
|
||||
{:ok, code, downcase_headers(headers)}
|
||||
case Pleroma.HTTP.request(method, url, "", headers, opts) do
|
||||
{:ok, %Tesla.Env{status: status, headers: headers, body: body}}
|
||||
when status in @valid_resp_codes ->
|
||||
{:ok, status, downcase_headers(headers), body}
|
||||
|
||||
{:ok, code, _, _} ->
|
||||
{:error, {:invalid_http_response, code}}
|
||||
{:ok, %Tesla.Env{status: status, headers: headers}} when status in @valid_resp_codes ->
|
||||
{:ok, status, downcase_headers(headers)}
|
||||
|
||||
{:ok, code, _} ->
|
||||
{:error, {:invalid_http_response, code}}
|
||||
{:ok, %Tesla.Env{status: status}} ->
|
||||
{:error, {:invalid_http_response, status}}
|
||||
|
||||
{:error, error} ->
|
||||
{:error, error}
|
||||
end
|
||||
end
|
||||
|
||||
defp response(conn, client, url, status, headers, opts) do
|
||||
Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")
|
||||
|
||||
result =
|
||||
conn
|
||||
|> put_resp_headers(build_resp_headers(headers, opts))
|
||||
|> send_chunked(status)
|
||||
|> chunk_reply(client, opts)
|
||||
|
||||
case result do
|
||||
{:ok, conn} ->
|
||||
halt(conn)
|
||||
|
||||
{:error, :closed, conn} ->
|
||||
client().close(client)
|
||||
halt(conn)
|
||||
|
||||
{:error, error, conn} ->
|
||||
Logger.warn(
|
||||
"#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
|
||||
)
|
||||
|
||||
client().close(client)
|
||||
halt(conn)
|
||||
end
|
||||
end
|
||||
|
||||
defp chunk_reply(conn, client, opts) do
|
||||
chunk_reply(conn, client, opts, 0, 0)
|
||||
end
|
||||
|
||||
defp chunk_reply(conn, client, opts, sent_so_far, duration) do
|
||||
with {:ok, duration} <-
|
||||
check_read_duration(
|
||||
duration,
|
||||
Keyword.get(opts, :max_read_duration, @max_read_duration)
|
||||
),
|
||||
{:ok, data, client} <- client().stream_body(client),
|
||||
{:ok, duration} <- increase_read_duration(duration),
|
||||
sent_so_far = sent_so_far + byte_size(data),
|
||||
:ok <-
|
||||
body_size_constraint(
|
||||
sent_so_far,
|
||||
Keyword.get(opts, :max_body_length, @max_body_length)
|
||||
),
|
||||
{:ok, conn} <- chunk(conn, data) do
|
||||
chunk_reply(conn, client, opts, sent_so_far, duration)
|
||||
else
|
||||
:done -> {:ok, conn}
|
||||
{:error, error} -> {:error, error, conn}
|
||||
end
|
||||
end
|
||||
|
||||
defp head_response(conn, url, code, headers, opts) do
|
||||
Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")
|
||||
defp response(conn, body, status, headers, opts) do
|
||||
Logger.debug("#{__MODULE__} #{status} #{conn.private[:proxied_url]} #{inspect(headers)}")
|
||||
|
||||
conn
|
||||
|> put_resp_headers(build_resp_headers(headers, opts))
|
||||
|> send_resp(code, "")
|
||||
|> send_resp(status, body)
|
||||
end
|
||||
|
||||
defp error_or_redirect(conn, url, code, body, opts) do
|
||||
defp head_response(conn, status, headers, opts) do
|
||||
Logger.debug("#{__MODULE__} #{status} #{conn.private[:proxied_url]} #{inspect(headers)}")
|
||||
|
||||
conn
|
||||
|> put_resp_headers(build_resp_headers(headers, opts))
|
||||
|> send_resp(status, "")
|
||||
end
|
||||
|
||||
defp error_or_redirect(conn, status, body, opts) do
|
||||
if Keyword.get(opts, :redirect_on_failure, false) do
|
||||
conn
|
||||
|> Phoenix.Controller.redirect(external: url)
|
||||
|> Phoenix.Controller.redirect(external: conn.private[:proxied_url])
|
||||
|> halt()
|
||||
else
|
||||
conn
|
||||
|> send_resp(code, body)
|
||||
|> send_resp(status, body)
|
||||
|> halt
|
||||
end
|
||||
end
|
||||
|
@ -382,37 +341,6 @@ defp header_length_constraint(headers, limit) when is_integer(limit) and limit >
|
|||
|
||||
defp header_length_constraint(_, _), do: :ok
|
||||
|
||||
defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size >= limit do
|
||||
{:error, :body_too_large}
|
||||
end
|
||||
|
||||
defp body_size_constraint(_, _), do: :ok
|
||||
|
||||
defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max)
|
||||
|
||||
defp check_read_duration(duration, max)
|
||||
when is_integer(duration) and is_integer(max) and max > 0 do
|
||||
if duration > max do
|
||||
{:error, :read_duration_exceeded}
|
||||
else
|
||||
{:ok, {duration, :erlang.system_time(:millisecond)}}
|
||||
end
|
||||
end
|
||||
|
||||
defp check_read_duration(_, _), do: {:ok, :no_duration_limit, :no_duration_limit}
|
||||
|
||||
defp increase_read_duration({previous_duration, started})
|
||||
when is_integer(previous_duration) and is_integer(started) do
|
||||
duration = :erlang.system_time(:millisecond) - started
|
||||
{:ok, previous_duration + duration}
|
||||
end
|
||||
|
||||
defp increase_read_duration(_) do
|
||||
{:ok, :no_duration_limit, :no_duration_limit}
|
||||
end
|
||||
|
||||
defp client, do: Pleroma.ReverseProxy.Client.Wrapper
|
||||
|
||||
defp track_failed_url(url, error, opts) do
|
||||
ttl =
|
||||
unless error in [:body_too_large, 400, 204] do
|
||||
|
|
|
@ -8,11 +8,7 @@ defmodule Pleroma.Telemetry.Logger do
|
|||
require Logger
|
||||
|
||||
@events [
|
||||
[:pleroma, :connection_pool, :reclaim, :start],
|
||||
[:pleroma, :connection_pool, :reclaim, :stop],
|
||||
[:pleroma, :connection_pool, :provision_failure],
|
||||
[:pleroma, :connection_pool, :client, :dead],
|
||||
[:pleroma, :connection_pool, :client, :add]
|
||||
[:pleroma, :repo, :query]
|
||||
]
|
||||
def attach do
|
||||
:telemetry.attach_many(
|
||||
|
@ -28,68 +24,62 @@ def attach do
|
|||
# out anyway due to higher log level configured
|
||||
|
||||
def handle_event(
|
||||
[:pleroma, :connection_pool, :reclaim, :start],
|
||||
_,
|
||||
%{max_connections: max_connections, reclaim_max: reclaim_max},
|
||||
_
|
||||
[:pleroma, :repo, :query] = _name,
|
||||
%{query_time: query_time} = measurements,
|
||||
%{source: source} = metadata,
|
||||
config
|
||||
) do
|
||||
Logger.debug(fn ->
|
||||
"Connection pool is exhausted (reached #{max_connections} connections). Starting idle connection cleanup to reclaim as much as #{reclaim_max} connections"
|
||||
end)
|
||||
logging_config = Pleroma.Config.get([:telemetry, :slow_queries_logging], [])
|
||||
|
||||
if logging_config[:enabled] &&
|
||||
logging_config[:min_duration] &&
|
||||
query_time > logging_config[:min_duration] and
|
||||
(is_nil(logging_config[:exclude_sources]) or
|
||||
source not in logging_config[:exclude_sources]) do
|
||||
log_slow_query(measurements, metadata, config)
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
def handle_event(
|
||||
[:pleroma, :connection_pool, :reclaim, :stop],
|
||||
%{reclaimed_count: 0},
|
||||
_,
|
||||
_
|
||||
) do
|
||||
Logger.error(fn ->
|
||||
"Connection pool failed to reclaim any connections due to all of them being in use. It will have to drop requests for opening connections to new hosts"
|
||||
end)
|
||||
end
|
||||
defp log_slow_query(
|
||||
%{query_time: query_time} = _measurements,
|
||||
%{source: _source, query: query, params: query_params, repo: repo} = _metadata,
|
||||
_config
|
||||
) do
|
||||
sql_explain =
|
||||
with {:ok, %{rows: explain_result_rows}} <-
|
||||
repo.query("EXPLAIN " <> query, query_params, log: false) do
|
||||
Enum.map_join(explain_result_rows, "\n", & &1)
|
||||
end
|
||||
|
||||
def handle_event(
|
||||
[:pleroma, :connection_pool, :reclaim, :stop],
|
||||
%{reclaimed_count: reclaimed_count},
|
||||
_,
|
||||
_
|
||||
) do
|
||||
Logger.debug(fn -> "Connection pool cleaned up #{reclaimed_count} idle connections" end)
|
||||
end
|
||||
{:current_stacktrace, stacktrace} = Process.info(self(), :current_stacktrace)
|
||||
|
||||
def handle_event(
|
||||
[:pleroma, :connection_pool, :provision_failure],
|
||||
%{opts: [key | _]},
|
||||
_,
|
||||
_
|
||||
) do
|
||||
Logger.error(fn ->
|
||||
"Connection pool had to refuse opening a connection to #{key} due to connection limit exhaustion"
|
||||
end)
|
||||
end
|
||||
pleroma_stacktrace =
|
||||
Enum.filter(stacktrace, fn
|
||||
{__MODULE__, _, _, _} ->
|
||||
false
|
||||
|
||||
{mod, _, _, _} ->
|
||||
mod
|
||||
|> to_string()
|
||||
|> String.starts_with?("Elixir.Pleroma.")
|
||||
end)
|
||||
|
||||
def handle_event(
|
||||
[:pleroma, :connection_pool, :client, :dead],
|
||||
%{client_pid: client_pid, reason: reason},
|
||||
%{key: key},
|
||||
_
|
||||
) do
|
||||
Logger.warn(fn ->
|
||||
"Pool worker for #{key}: Client #{inspect(client_pid)} died before releasing the connection with #{inspect(reason)}"
|
||||
"""
|
||||
Slow query!
|
||||
|
||||
Total time: #{round(query_time / 1_000)} ms
|
||||
|
||||
#{query}
|
||||
|
||||
#{inspect(query_params, limit: :infinity)}
|
||||
|
||||
#{sql_explain}
|
||||
|
||||
#{Exception.format_stacktrace(pleroma_stacktrace)}
|
||||
"""
|
||||
end)
|
||||
end
|
||||
|
||||
def handle_event(
|
||||
[:pleroma, :connection_pool, :client, :add],
|
||||
%{clients: [_, _ | _] = clients},
|
||||
%{key: key, protocol: :http},
|
||||
_
|
||||
) do
|
||||
Logger.info(fn ->
|
||||
"Pool worker for #{key}: #{length(clients)} clients are using an HTTP1 connection at the same time, head-of-line blocking might occur."
|
||||
end)
|
||||
end
|
||||
|
||||
def handle_event([:pleroma, :connection_pool, :client, :add], _, _, _), do: :ok
|
||||
end
|
||||
|
|
|
@ -30,23 +30,12 @@ def put_file(%Pleroma.Upload{} = upload) do
|
|||
|
||||
op =
|
||||
if streaming do
|
||||
op =
|
||||
upload.tempfile
|
||||
|> ExAws.S3.Upload.stream_file()
|
||||
|> ExAws.S3.upload(bucket, s3_name, [
|
||||
{:acl, :public_read},
|
||||
{:content_type, upload.content_type}
|
||||
])
|
||||
|
||||
if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
|
||||
# set s3 upload timeout to respect :upload pool timeout
|
||||
# timeout should be slightly larger, so s3 can retry upload on fail
|
||||
timeout = Pleroma.HTTP.AdapterHelper.Gun.pool_timeout(:upload) + 1_000
|
||||
opts = Keyword.put(op.opts, :timeout, timeout)
|
||||
Map.put(op, :opts, opts)
|
||||
else
|
||||
op
|
||||
end
|
||||
upload.tempfile
|
||||
|> ExAws.S3.Upload.stream_file()
|
||||
|> ExAws.S3.upload(bucket, s3_name, [
|
||||
{:acl, :public_read},
|
||||
{:content_type, upload.content_type}
|
||||
])
|
||||
else
|
||||
{:ok, file_data} = File.read(upload.tempfile)
|
||||
|
||||
|
|
|
@ -12,7 +12,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
|
|||
require Logger
|
||||
|
||||
@adapter_options [
|
||||
pool: :media,
|
||||
recv_timeout: 10_000
|
||||
]
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ defp handle_preview(conn, url) do
|
|||
media_proxy_url = MediaProxy.url(url)
|
||||
|
||||
with {:ok, %{status: status} = head_response} when status in 200..299 <-
|
||||
Pleroma.HTTP.request("head", media_proxy_url, [], [], pool: :media) do
|
||||
Pleroma.HTTP.request("head", media_proxy_url, [], [], name: MyFinch) do
|
||||
content_type = Tesla.get_header(head_response, "content-type")
|
||||
content_length = Tesla.get_header(head_response, "content-length")
|
||||
content_length = content_length && String.to_integer(content_length)
|
||||
|
|
|
@ -89,8 +89,7 @@ defp get_media(conn, {:static_dir, directory}, _, opts) do
|
|||
defp get_media(conn, {:url, url}, true, _) do
|
||||
proxy_opts = [
|
||||
http: [
|
||||
follow_redirect: true,
|
||||
pool: :upload
|
||||
follow_redirect: true
|
||||
]
|
||||
]
|
||||
|
||||
|
|
|
@ -4,7 +4,6 @@
|
|||
|
||||
defmodule Pleroma.Web.RelMe do
|
||||
@options [
|
||||
pool: :media,
|
||||
max_body: 2_000_000,
|
||||
recv_timeout: 2_000
|
||||
]
|
||||
|
|
|
@ -10,7 +10,6 @@ defmodule Pleroma.Web.RichMedia.Helpers do
|
|||
alias Pleroma.Web.RichMedia.Parser
|
||||
|
||||
@options [
|
||||
pool: :media,
|
||||
max_body: 2_000_000,
|
||||
recv_timeout: 2_000
|
||||
]
|
||||
|
|
1
mix.exs
1
mix.exs
|
@ -215,7 +215,6 @@ defp deps do
|
|||
{:mock, "~> 0.3.5", only: :test},
|
||||
# temporary downgrade for excoveralls, hackney until hackney max_connections bug will be fixed
|
||||
{:excoveralls, "0.12.3", only: :test},
|
||||
{:hackney, "~> 1.18.0", override: true},
|
||||
{:mox, "~> 1.0", only: :test},
|
||||
{:websocket_client, git: "https://github.com/jeremyong/websocket_client.git", only: :test}
|
||||
] ++ oauth_deps()
|
||||
|
|
|
@ -280,50 +280,6 @@ test "check_uploders_s3_public_endpoint/0" do
|
|||
"Your config is using the old setting for controlling the URL of media uploaded to your S3 bucket."
|
||||
end
|
||||
|
||||
describe "check_gun_pool_options/0" do
|
||||
test "await_up_timeout" do
|
||||
config = Config.get(:connections_pool)
|
||||
clear_config(:connections_pool, Keyword.put(config, :await_up_timeout, 5_000))
|
||||
|
||||
assert capture_log(fn ->
|
||||
DeprecationWarnings.check_gun_pool_options()
|
||||
end) =~
|
||||
"Your config is using old setting `config :pleroma, :connections_pool, await_up_timeout`."
|
||||
end
|
||||
|
||||
test "pool timeout" do
|
||||
old_config = [
|
||||
federation: [
|
||||
size: 50,
|
||||
max_waiting: 10,
|
||||
timeout: 10_000
|
||||
],
|
||||
media: [
|
||||
size: 50,
|
||||
max_waiting: 10,
|
||||
timeout: 10_000
|
||||
],
|
||||
upload: [
|
||||
size: 25,
|
||||
max_waiting: 5,
|
||||
timeout: 15_000
|
||||
],
|
||||
default: [
|
||||
size: 10,
|
||||
max_waiting: 2,
|
||||
timeout: 5_000
|
||||
]
|
||||
]
|
||||
|
||||
clear_config(:pools, old_config)
|
||||
|
||||
assert capture_log(fn ->
|
||||
DeprecationWarnings.check_gun_pool_options()
|
||||
end) =~
|
||||
"Your config is using old setting name `timeout` instead of `recv_timeout` in pool settings"
|
||||
end
|
||||
end
|
||||
|
||||
test "check_old_chat_shoutbox/0" do
|
||||
clear_config([:instance, :chat_limit], 1_000)
|
||||
clear_config([:chat, :enabled], true)
|
||||
|
|
|
@ -8,44 +8,10 @@ defmodule Pleroma.ReverseProxyTest do
|
|||
import Mox
|
||||
|
||||
alias Pleroma.ReverseProxy
|
||||
alias Pleroma.ReverseProxy.ClientMock
|
||||
alias Plug.Conn
|
||||
|
||||
setup_all do
|
||||
{:ok, _} = Registry.start_link(keys: :unique, name: ClientMock)
|
||||
:ok
|
||||
end
|
||||
|
||||
setup :verify_on_exit!
|
||||
|
||||
defp request_mock(invokes) do
|
||||
ClientMock
|
||||
|> expect(:request, fn :get, url, headers, _body, _opts ->
|
||||
Registry.register(ClientMock, url, 0)
|
||||
body = headers |> Enum.into(%{}) |> Jason.encode!()
|
||||
|
||||
{:ok, 200,
|
||||
[
|
||||
{"content-type", "application/json"},
|
||||
{"content-length", byte_size(body) |> to_string()}
|
||||
], %{url: url, body: body}}
|
||||
end)
|
||||
|> expect(:stream_body, invokes, fn %{url: url, body: body} = client ->
|
||||
case Registry.lookup(ClientMock, url) do
|
||||
[{_, 0}] ->
|
||||
Registry.update_value(ClientMock, url, &(&1 + 1))
|
||||
{:ok, body, client}
|
||||
|
||||
[{_, 1}] ->
|
||||
Registry.unregister(ClientMock, url)
|
||||
:done
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
describe "reverse proxy" do
|
||||
test "do not track successful request", %{conn: conn} do
|
||||
request_mock(2)
|
||||
url = "/success"
|
||||
|
||||
conn = ReverseProxy.call(conn, url)
|
||||
|
@ -56,8 +22,6 @@ test "do not track successful request", %{conn: conn} do
|
|||
end
|
||||
|
||||
test "use Pleroma's user agent in the request; don't pass the client's", %{conn: conn} do
|
||||
request_mock(2)
|
||||
|
||||
conn =
|
||||
conn
|
||||
|> Plug.Conn.put_req_header("user-agent", "fake/1.0")
|
||||
|
@ -110,8 +74,6 @@ defp stream_mock(invokes, with_close? \\ false) do
|
|||
|
||||
describe "max_body" do
|
||||
test "length returns error if content-length more than option", %{conn: conn} do
|
||||
request_mock(0)
|
||||
|
||||
assert capture_log(fn ->
|
||||
ReverseProxy.call(conn, "/huge-file", max_body_length: 4)
|
||||
end) =~
|
||||
|
|
|
@ -7,9 +7,6 @@
|
|||
|
||||
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, :manual)
|
||||
|
||||
Mox.defmock(Pleroma.ReverseProxy.ClientMock, for: Pleroma.ReverseProxy.Client)
|
||||
Mox.defmock(Pleroma.GunMock, for: Pleroma.Gun)
|
||||
|
||||
{:ok, _} = Application.ensure_all_started(:ex_machina)
|
||||
|
||||
ExUnit.after_suite(fn _results ->
|
||||
|
|
Loading…
Reference in a new issue