Use finch everywhere (#33)
ci/woodpecker/push/lint Pipeline failed Details
ci/woodpecker/push/test unknown status Details
ci/woodpecker/push/release Pipeline was successful Details

Reviewed-on: #33
pull/47/head
floatingghost 5 months ago
parent 058bf96798
commit 364b6969eb

@ -70,8 +70,6 @@ config :pleroma, :rate_limit,
config :pleroma, :http_security, report_uri: "https://endpoint.com"
config :pleroma, :http, send_user_agent: false
rum_enabled = System.get_env("RUM_ENABLED") == "true"
config :pleroma, :database, rum_enabled: rum_enabled
IO.puts("RUM enabled: #{rum_enabled}")

@ -175,12 +175,11 @@ config :mime, :types, %{
"application/ld+json" => ["activity+json"]
}
config :tesla, adapter: Tesla.Adapter.Hackney
config :tesla, :adapter, {Tesla.Adapter.Finch, name: MyFinch}
# Configures http settings, upstream proxy etc.
config :pleroma, :http,
proxy_url: nil,
send_user_agent: true,
user_agent: :default,
adapter: []
@ -440,11 +439,7 @@ config :pleroma, :media_proxy,
redirect_on_failure: false,
max_body_length: 25 * 1_048_576,
# Note: max_read_duration defaults to Pleroma.ReverseProxy.max_read_duration_default/1
max_read_duration: 30_000,
http: [
follow_redirect: true,
pool: :media
]
max_read_duration: 30_000
],
whitelist: []
@ -765,51 +760,6 @@ config :pleroma, Pleroma.Repo,
parameters: [gin_fuzzy_search_limit: "500"],
prepare: :unnamed
config :pleroma, :connections_pool,
reclaim_multiplier: 0.1,
connection_acquisition_wait: 250,
connection_acquisition_retries: 5,
max_connections: 250,
max_idle_time: 30_000,
retry: 0,
connect_timeout: 5_000
config :pleroma, :pools,
federation: [
size: 50,
max_waiting: 10,
recv_timeout: 10_000
],
media: [
size: 50,
max_waiting: 20,
recv_timeout: 15_000
],
upload: [
size: 25,
max_waiting: 5,
recv_timeout: 15_000
],
default: [
size: 10,
max_waiting: 2,
recv_timeout: 5_000
]
config :pleroma, :hackney_pools,
federation: [
max_connections: 50,
timeout: 150_000
],
media: [
max_connections: 50,
timeout: 150_000
],
upload: [
max_connections: 25,
timeout: 300_000
]
config :pleroma, :majic_pool, size: 2
private_instance? = :if_instance_is_private
@ -826,8 +776,6 @@ config :pleroma, :mrf,
transparency: true,
transparency_exclusions: []
config :tzdata, :http_client, Pleroma.HTTP.Tzdata
config :ex_aws, http_client: Pleroma.HTTP.ExAws
config :web_push_encryption, http_client: Pleroma.HTTP.WebPush

@ -2625,10 +2625,6 @@ config :pleroma, :config_description, [
description: "Proxy URL",
suggestions: ["localhost:9020", {:socks5, :localhost, 3090}]
},
%{
key: :send_user_agent,
type: :boolean
},
%{
key: :user_agent,
type: [:string, :atom],
@ -2941,147 +2937,6 @@ config :pleroma, :config_description, [
}
]
},
%{
group: :pleroma,
key: :connections_pool,
type: :group,
description: "Advanced settings for `Gun` connections pool",
children: [
%{
key: :connection_acquisition_wait,
type: :integer,
description:
"Timeout to acquire a connection from pool. The total max time is this value multiplied by the number of retries. Default: 250ms.",
suggestions: [250]
},
%{
key: :connection_acquisition_retries,
type: :integer,
description:
"Number of attempts to acquire the connection from the pool if it is overloaded. Default: 5",
suggestions: [5]
},
%{
key: :max_connections,
type: :integer,
description: "Maximum number of connections in the pool. Default: 250 connections.",
suggestions: [250]
},
%{
key: :connect_timeout,
type: :integer,
description: "Timeout while `gun` will wait until connection is up. Default: 5000ms.",
suggestions: [5000]
},
%{
key: :reclaim_multiplier,
type: :integer,
description:
"Multiplier for the number of idle connection to be reclaimed if the pool is full. For example if the pool maxes out at 250 connections and this setting is set to 0.3, the pool will reclaim at most 75 idle connections if it's overloaded. Default: 0.1",
suggestions: [0.1]
}
]
},
%{
group: :pleroma,
key: :pools,
type: :group,
description: "Advanced settings for `Gun` workers pools",
children:
Enum.map([:federation, :media, :upload, :default], fn pool_name ->
%{
key: pool_name,
type: :keyword,
description: "Settings for #{pool_name} pool.",
children: [
%{
key: :size,
type: :integer,
description: "Maximum number of concurrent requests in the pool.",
suggestions: [50]
},
%{
key: :max_waiting,
type: :integer,
description:
"Maximum number of requests waiting for other requests to finish. After this number is reached, the pool will start returning errrors when a new request is made",
suggestions: [10]
},
%{
key: :recv_timeout,
type: :integer,
description: "Timeout for the pool while gun will wait for response",
suggestions: [10_000]
}
]
}
end)
},
%{
group: :pleroma,
key: :hackney_pools,
type: :group,
description: "Advanced settings for `Hackney` connections pools",
children: [
%{
key: :federation,
type: :keyword,
description: "Settings for federation pool.",
children: [
%{
key: :max_connections,
type: :integer,
description: "Number workers in the pool.",
suggestions: [50]
},
%{
key: :timeout,
type: :integer,
description: "Timeout while `hackney` will wait for response.",
suggestions: [150_000]
}
]
},
%{
key: :media,
type: :keyword,
description: "Settings for media pool.",
children: [
%{
key: :max_connections,
type: :integer,
description: "Number workers in the pool.",
suggestions: [50]
},
%{
key: :timeout,
type: :integer,
description: "Timeout while `hackney` will wait for response.",
suggestions: [150_000]
}
]
},
%{
key: :upload,
type: :keyword,
description: "Settings for upload pool.",
children: [
%{
key: :max_connections,
type: :integer,
description: "Number workers in the pool.",
suggestions: [25]
},
%{
key: :timeout,
type: :integer,
description: "Timeout while `hackney` will wait for response.",
suggestions: [300_000]
}
]
}
]
},
%{
group: :pleroma,
key: :restrict_unauthenticated,

@ -45,7 +45,7 @@ config :pleroma, Pleroma.Repo,
adapter: Ecto.Adapters.Postgres,
username: "postgres",
password: "postgres",
database: "pleroma_test",
database: "akkoma_test",
hostname: System.get_env("DB_HOST") || "localhost",
pool: Ecto.Adapters.SQL.Sandbox,
pool_size: 50,
@ -104,12 +104,8 @@ IO.puts("RUM enabled: #{rum_enabled}")
config :joken, default_signer: "yU8uHKq+yyAkZ11Hx//jcdacWc8yQ1bxAAGrplzB0Zwwjkp35v0RK9SO8WTPr6QZ"
config :pleroma, Pleroma.ReverseProxy.Client, Pleroma.ReverseProxy.ClientMock
config :pleroma, :modules, runtime_dir: "test/fixtures/modules"
config :pleroma, Pleroma.Gun, Pleroma.GunMock
config :pleroma, Pleroma.Emails.NewUsersDigestEmail, enabled: true
config :pleroma, Pleroma.Web.Plugs.RemoteIp, enabled: false

@ -23,21 +23,14 @@ defmodule Mix.Pleroma do
Pleroma.Config.Oban.warn()
Pleroma.Application.limiters_setup()
Application.put_env(:phoenix, :serve_endpoints, false, persistent: true)
Finch.start_link(name: MyFinch)
unless System.get_env("DEBUG") do
Logger.remove_backend(:console)
end
adapter = Application.get_env(:tesla, :adapter)
apps =
if adapter == Tesla.Adapter.Gun do
[:gun | @apps]
else
[:hackney | @apps]
end
Enum.each(apps, &Application.ensure_all_started/1)
Enum.each(@apps, &Application.ensure_all_started/1)
oban_config = [
crontab: [],
@ -57,7 +50,6 @@ defmodule Mix.Pleroma do
{Majic.Pool,
[name: Pleroma.MajicPool, pool_size: Pleroma.Config.get([:majic_pool, :size], 2)]}
] ++
http_children(adapter) ++
elasticsearch_children()
cachex_children = Enum.map(@cachex_children, &Pleroma.Application.build_cachex(&1, []))
@ -131,13 +123,6 @@ defmodule Mix.Pleroma do
~S(') <> String.replace(path, ~S('), ~S(\')) <> ~S(')
end
defp http_children(Tesla.Adapter.Gun) do
Pleroma.Gun.ConnectionPool.children() ++
[{Task, &Pleroma.HTTP.AdapterHelper.Gun.limiter_setup/0}]
end
defp http_children(_), do: []
def elasticsearch_children do
config = Pleroma.Config.get([Pleroma.Search, :module])

@ -74,40 +74,4 @@ defmodule Mix.Tasks.Pleroma.Benchmark do
inputs: inputs
)
end
def run(["adapters"]) do
start_pleroma()
:ok =
Pleroma.Gun.Conn.open(
"https://httpbin.org/stream-bytes/1500",
:gun_connections
)
Process.sleep(1_500)
Benchee.run(
%{
"Without conn and without pool" => fn ->
{:ok, %Tesla.Env{}} =
Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [],
pool: :no_pool,
receive_conn: false
)
end,
"Without conn and with pool" => fn ->
{:ok, %Tesla.Env{}} =
Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [], receive_conn: false)
end,
"With reused conn and without pool" => fn ->
{:ok, %Tesla.Env{}} =
Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500", [], pool: :no_pool)
end,
"With reused conn and with pool" => fn ->
{:ok, %Tesla.Env{}} = Pleroma.HTTP.get("https://httpbin.org/stream-bytes/1500")
end
},
parallel: 10
)
end
end

@ -59,34 +59,8 @@ defmodule Pleroma.Application do
Pleroma.Docs.JSON.compile()
limiters_setup()
adapter = Application.get_env(:tesla, :adapter)
if match?({Tesla.Adapter.Finch, _}, adapter) do
Logger.info("Starting Finch")
Finch.start_link(name: MyFinch)
end
if adapter == Tesla.Adapter.Gun do
if version = Pleroma.OTPVersion.version() do
[major, minor] =
version
|> String.split(".")
|> Enum.map(&String.to_integer/1)
|> Enum.take(2)
if (major == 22 and minor < 2) or major < 22 do
raise "
!!!OTP VERSION WARNING!!!
You are using gun adapter with OTP version #{version}, which doesn't support correct handling of unordered certificates chains. Please update your Erlang/OTP to at least 22.2.
"
end
else
raise "
!!!OTP VERSION WARNING!!!
To support correct handling of unordered certificates chains - OTP version must be > 22.2.
"
end
end
Logger.info("Starting Finch")
Finch.start_link(name: MyFinch)
# Define workers and child supervisors to be supervised
children =
@ -97,7 +71,6 @@ defmodule Pleroma.Application do
Pleroma.Web.Plugs.RateLimiter.Supervisor
] ++
cachex_children() ++
http_children(adapter, @mix_env) ++
[
Pleroma.Stats,
Pleroma.JobQueueMonitor,
@ -276,34 +249,6 @@ defmodule Pleroma.Application do
]
end
# start hackney and gun pools in tests
defp http_children(_, :test) do
http_children(Tesla.Adapter.Hackney, nil) ++ http_children(Tesla.Adapter.Gun, nil)
end
defp http_children(Tesla.Adapter.Hackney, _) do
pools = [:federation, :media]
pools =
if Config.get([Pleroma.Upload, :proxy_remote]) do
[:upload | pools]
else
pools
end
for pool <- pools do
options = Config.get([:hackney_pools, pool])
:hackney_pool.child_spec(pool, options)
end
end
defp http_children(Tesla.Adapter.Gun, _) do
Pleroma.Gun.ConnectionPool.children() ++
[{Task, &Pleroma.HTTP.AdapterHelper.Gun.limiter_setup/0}]
end
defp http_children(_, _), do: []
def elasticsearch_children do
config = Config.get([Pleroma.Search, :module])

@ -173,7 +173,6 @@ defmodule Pleroma.Config.DeprecationWarnings do
check_old_mrf_config(),
check_media_proxy_whitelist_config(),
check_welcome_message_config(),
check_gun_pool_options(),
check_activity_expiration_config(),
check_remote_ip_plug_name(),
check_uploders_s3_public_endpoint(),
@ -257,51 +256,6 @@ defmodule Pleroma.Config.DeprecationWarnings do
end
end
def check_gun_pool_options do
pool_config = Config.get(:connections_pool)
if timeout = pool_config[:await_up_timeout] do
Logger.warn("""
!!!DEPRECATION WARNING!!!
Your config is using old setting `config :pleroma, :connections_pool, await_up_timeout`. Please change to `config :pleroma, :connections_pool, connect_timeout` to ensure compatibility with future releases.
""")
Config.put(:connections_pool, Keyword.put_new(pool_config, :connect_timeout, timeout))
end
pools_configs = Config.get(:pools)
warning_preface = """
!!!DEPRECATION WARNING!!!
Your config is using old setting name `timeout` instead of `recv_timeout` in pool settings. Setting should work for now, but you are advised to change format to scheme with port to prevent possible issues later.
"""
updated_config =
Enum.reduce(pools_configs, [], fn {pool_name, config}, acc ->
if timeout = config[:timeout] do
Keyword.put(acc, pool_name, Keyword.put_new(config, :recv_timeout, timeout))
else
acc
end
end)
if updated_config != [] do
pool_warnings =
updated_config
|> Keyword.keys()
|> Enum.map(fn pool_name ->
"\n* `:timeout` options in #{pool_name} pool is now `:recv_timeout`"
end)
Logger.warn(Enum.join([warning_preface | pool_warnings]))
Config.put(:pools, updated_config)
:error
else
:ok
end
end
@spec check_activity_expiration_config() :: :ok | nil
def check_activity_expiration_config do
warning_preface = """

@ -15,14 +15,11 @@ defmodule Pleroma.Config.TransferTask do
defp reboot_time_keys,
do: [
{:pleroma, :hackney_pools},
{:pleroma, :shout},
{:pleroma, Oban},
{:pleroma, :rate_limit},
{:pleroma, :markup},
{:pleroma, :streamer},
{:pleroma, :pools},
{:pleroma, :connections_pool}
{:pleroma, :streamer}
]
defp reboot_time_subkeys,

@ -542,7 +542,7 @@ defmodule Pleroma.Emoji.Pack do
defp http_get(%URI{} = url), do: url |> to_string() |> http_get()
defp http_get(url) do
with {:ok, %{body: body}} <- Pleroma.HTTP.get(url, [], pool: :default) do
with {:ok, %{body: body}} <- Pleroma.HTTP.get(url, [], []) do
Jason.decode(body)
end
end

@ -93,7 +93,7 @@ defmodule Pleroma.Frontend do
url = String.replace(frontend_info["build_url"], "${ref}", frontend_info["ref"])
with {:ok, %{status: 200, body: zip_body}} <-
Pleroma.HTTP.get(url, [], pool: :media, recv_timeout: 120_000) do
Pleroma.HTTP.get(url, [], recv_timeout: 120_000) do
unzip(zip_body, dest)
else
{:error, e} -> {:error, e}

@ -24,7 +24,7 @@ defmodule Pleroma.Helpers.MediaHelper do
def image_resize(url, options) do
with executable when is_binary(executable) <- System.find_executable("convert"),
{:ok, args} <- prepare_image_resize_args(options),
{:ok, env} <- HTTP.get(url, [], pool: :media),
{:ok, env} <- HTTP.get(url, [], []),
{:ok, fifo_path} <- mkfifo() do
args = List.flatten([fifo_path, args])
run_fifo(fifo_path, env, executable, args)
@ -73,7 +73,7 @@ defmodule Pleroma.Helpers.MediaHelper do
# Note: video thumbnail is intentionally not resized (always has original dimensions)
def video_framegrab(url) do
with executable when is_binary(executable) <- System.find_executable("ffmpeg"),
{:ok, env} <- HTTP.get(url, [], pool: :media),
{:ok, env} <- HTTP.get(url, [], []),
{:ok, fifo_path} <- mkfifo(),
args = [
"-y",

@ -66,17 +66,9 @@ defmodule Pleroma.HTTP do
params = options[:params] || []
request = build_request(method, headers, options, url, body, params)
adapter = Application.get_env(:tesla, :adapter)
client = Tesla.client([Tesla.Middleware.FollowRedirects])
client = Tesla.client(adapter_middlewares(adapter), adapter)
maybe_limit(
fn ->
request(client, request)
end,
adapter,
adapter_opts
)
request(client, request)
end
@spec request(Client.t(), keyword()) :: {:ok, Env.t()} | {:error, any()}
@ -92,19 +84,4 @@ defmodule Pleroma.HTTP do
|> Builder.add_param(:query, :query, params)
|> Builder.convert_to_keyword()
end
@prefix Pleroma.Gun.ConnectionPool
defp maybe_limit(fun, Tesla.Adapter.Gun, opts) do
ConcurrentLimiter.limit(:"#{@prefix}.#{opts[:pool] || :default}", fun)
end
defp maybe_limit(fun, _, _) do
fun.()
end
defp adapter_middlewares(Tesla.Adapter.Gun) do
[Tesla.Middleware.FollowRedirects, Pleroma.Tesla.Middleware.ConnectionPool]
end
defp adapter_middlewares(_), do: []
end

@ -6,7 +6,7 @@ defmodule Pleroma.HTTP.AdapterHelper do
@moduledoc """
Configure Tesla.Client with default and customized adapter options.
"""
@defaults [pool: :federation, connect_timeout: 5_000, recv_timeout: 5_000]
@defaults [name: MyFinch, connect_timeout: 5_000, recv_timeout: 5_000]
@type proxy_type() :: :socks4 | :socks5
@type host() :: charlist() | :inet.ip_address()
@ -43,17 +43,7 @@ defmodule Pleroma.HTTP.AdapterHelper do
def options(%URI{} = uri, opts \\ []) do
@defaults
|> Keyword.merge(opts)
|> adapter_helper().options(uri)
end
defp adapter, do: Application.get_env(:tesla, :adapter)
defp adapter_helper do
case adapter() do
Tesla.Adapter.Gun -> AdapterHelper.Gun
Tesla.Adapter.Hackney -> AdapterHelper.Hackney
_ -> AdapterHelper.Default
end
|> AdapterHelper.Default.options(uri)
end
@spec parse_proxy(String.t() | tuple() | nil) ::

@ -11,8 +11,6 @@ defmodule Pleroma.HTTP.ExAws do
@impl true
def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do
http_opts = Keyword.put_new(http_opts, :pool, :upload)
case HTTP.request(method, url, body, headers, http_opts) do
{:ok, env} ->
{:ok, %{status_code: env.status, headers: env.headers, body: env.body}}

@ -10,6 +10,8 @@ defmodule Pleroma.HTTP.RequestBuilder do
alias Pleroma.HTTP.Request
alias Tesla.Multipart
@mix_env Mix.env()
@doc """
Creates new request
"""
@ -33,14 +35,7 @@ defmodule Pleroma.HTTP.RequestBuilder do
"""
@spec headers(Request.t(), Request.headers()) :: Request.t()
def headers(request, headers) do
headers_list =
with true <- Pleroma.Config.get([:http, :send_user_agent]),
nil <- Enum.find(headers, fn {key, _val} -> String.downcase(key) == "user-agent" end) do
[{"user-agent", Pleroma.Application.user_agent()} | headers]
else
_ ->
headers
end
headers_list = maybe_add_user_agent(headers, @mix_env)
%{request | headers: headers_list}
end
@ -92,4 +87,16 @@ defmodule Pleroma.HTTP.RequestBuilder do
|> Map.from_struct()
|> Enum.into([])
end
defp maybe_add_user_agent(headers, :test) do
with true <- Pleroma.Config.get([:http, :send_user_agent]) do
[{"user-agent", Pleroma.Application.user_agent()} | headers]
else
_ ->
headers
end
end
defp maybe_add_user_agent(headers, _),
do: [{"user-agent", Pleroma.Application.user_agent()} | headers]
end

@ -11,8 +11,6 @@ defmodule Pleroma.HTTP.Tzdata do
@impl true
def get(url, headers, options) do
options = Keyword.put_new(options, :pool, :default)
with {:ok, %Tesla.Env{} = env} <- HTTP.get(url, headers, options) do
{:ok, {env.status, env.headers, env.body}}
end
@ -20,8 +18,6 @@ defmodule Pleroma.HTTP.Tzdata do
@impl true
def head(url, headers, options) do
options = Keyword.put_new(options, :pool, :default)
with {:ok, %Tesla.Env{} = env} <- HTTP.head(url, headers, options) do
{:ok, {env.status, env.headers}}
end

@ -170,7 +170,7 @@ defmodule Pleroma.Instances.Instance do
try do
with {_, true} <- {:reachable, reachable?(instance_uri.host)},
{:ok, %Tesla.Env{body: html}} <-
Pleroma.HTTP.get(to_string(instance_uri), [{"accept", "text/html"}], pool: :media),
Pleroma.HTTP.get(to_string(instance_uri), [{"accept", "text/html"}], []),
{_, [favicon_rel | _]} when is_binary(favicon_rel) <-
{:parse,
html |> Floki.parse_document!() |> Floki.attribute("link[rel=icon]", "href")},

@ -9,7 +9,7 @@ defmodule Pleroma.ReverseProxy do
@resp_cache_headers ~w(etag date last-modified)
@keep_resp_headers @resp_cache_headers ++
~w(content-length content-type content-disposition content-encoding) ++
~w(content-range accept-ranges vary)
~w(content-range accept-ranges vary expires)
@default_cache_control_header "public, max-age=1209600"
@valid_resp_codes [200, 206, 304]
@max_read_duration :timer.seconds(30)
@ -59,11 +59,7 @@ defmodule Pleroma.ReverseProxy do
* `req_headers`, `resp_headers` additional headers.
* `http`: options for [hackney](https://github.com/benoitc/hackney) or [gun](https://github.com/ninenines/gun).
"""
@default_options [pool: :media]
@inline_content_types [
"image/gif",
"image/jpeg",
@ -94,7 +90,7 @@ defmodule Pleroma.ReverseProxy do
def call(_conn, _url, _opts \\ [])
def call(conn = %{method: method}, url, opts) when method in @methods do
client_opts = Keyword.merge(@default_options, Keyword.get(opts, :http, []))
client_opts = Keyword.get(opts, :http, [])
req_headers = build_req_headers(conn.req_headers, opts)
@ -106,32 +102,39 @@ defmodule Pleroma.ReverseProxy do
end
with {:ok, nil} <- @cachex.get(:failed_proxy_url_cache, url),
{:ok, code, headers, client} <- request(method, url, req_headers, client_opts),
{:ok, status, headers, body} <- request(method, url, req_headers, client_opts),
:ok <-
header_length_constraint(
headers,
Keyword.get(opts, :max_body_length, @max_body_length)
) do
response(conn, client, url, code, headers, opts)
conn
|> put_private(:proxied_url, url)
|> response(body, status, headers, opts)
else
{:ok, true} ->
conn
|> error_or_redirect(url, 500, "Request failed", opts)
|> error_or_redirect(500, "Request failed", opts)
|> halt()
{:ok, code, headers} ->
head_response(conn, url, code, headers, opts)
{:ok, status, headers} ->
conn
|> put_private(:proxied_url, url)
|> head_response(status, headers, opts)
|> halt()
{:error, {:invalid_http_response, code}} ->
Logger.error("#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{code}")
track_failed_url(url, code, opts)
{:error, {:invalid_http_response, status}} ->
Logger.error(
"#{__MODULE__}: request to #{inspect(url)} failed with HTTP status #{status}"
)
track_failed_url(url, status, opts)
conn
|> put_private(:proxied_url, url)
|> error_or_redirect(
url,
code,
"Request failed: " <> Plug.Conn.Status.reason_phrase(code),
status,
"Request failed: " <> Plug.Conn.Status.reason_phrase(status),
opts
)
|> halt()
@ -141,7 +144,8 @@ defmodule Pleroma.ReverseProxy do
track_failed_url(url, error, opts)
conn
|> error_or_redirect(url, 500, "Request failed", opts)
|> put_private(:proxied_url, url)
|> error_or_redirect(500, "Request failed", opts)
|> halt()
end
end
@ -156,93 +160,48 @@ defmodule Pleroma.ReverseProxy do
Logger.debug("#{__MODULE__} #{method} #{url} #{inspect(headers)}")
method = method |> String.downcase() |> String.to_existing_atom()
case client().request(method, url, headers, "", opts) do
{:ok, code, headers, client} when code in @valid_resp_codes ->
{:ok, code, downcase_headers(headers), client}
opts = opts ++ [receive_timeout: @max_read_duration]
{:ok, code, headers} when code in @valid_resp_codes ->
{:ok, code, downcase_headers(headers)}
case Pleroma.HTTP.request(method, url, "", headers, opts) do
{:ok, %Tesla.Env{status: status, headers: headers, body: body}}
when status in @valid_resp_codes ->
{:ok, status, downcase_headers(headers), body}
{:ok, code, _, _} ->
{:error, {:invalid_http_response, code}}
{:ok, %Tesla.Env{status: status, headers: headers}} when status in @valid_resp_codes ->
{:ok, status, downcase_headers(headers)}
{:ok, code, _} ->
{:error, {:invalid_http_response, code}}
{:ok, %Tesla.Env{status: status}} ->
{:error, {:invalid_http_response, status}}
{:error, error} ->
{:error, error}
end
end
defp response(conn, client, url, status, headers, opts) do
Logger.debug("#{__MODULE__} #{status} #{url} #{inspect(headers)}")
result =
conn
|> put_resp_headers(build_resp_headers(headers, opts))
|> send_chunked(status)
|> chunk_reply(client, opts)
case result do
{:ok, conn} ->
halt(conn)
{:error, :closed, conn} ->
client().close(client)
halt(conn)
{:error, error, conn} ->
Logger.warn(
"#{__MODULE__} request to #{url} failed while reading/chunking: #{inspect(error)}"
)
client().close(client)
halt(conn)
end
end
defp chunk_reply(conn, client, opts) do
chunk_reply(conn, client, opts, 0, 0)
end
defp response(conn, body, status, headers, opts) do
Logger.debug("#{__MODULE__} #{status} #{conn.private[:proxied_url]} #{inspect(headers)}")
defp chunk_reply(conn, client, opts, sent_so_far, duration) do
with {:ok, duration} <-
check_read_duration(
duration,
Keyword.get(opts, :max_read_duration, @max_read_duration)
),
{:ok, data, client} <- client().stream_body(client),
{:ok, duration} <- increase_read_duration(duration),
sent_so_far = sent_so_far + byte_size(data),
:ok <-
body_size_constraint(
sent_so_far,
Keyword.get(opts, :max_body_length, @max_body_length)
),
{:ok, conn} <- chunk(conn, data) do
chunk_reply(conn, client, opts, sent_so_far, duration)
else
:done -> {:ok, conn}
{:error, error} -> {:error, error, conn}
end
conn
|> put_resp_headers(build_resp_headers(headers, opts))
|> send_resp(status, body)
end
defp head_response(conn, url, code, headers, opts) do
Logger.debug("#{__MODULE__} #{code} #{url} #{inspect(headers)}")
defp head_response(conn, status, headers, opts) do
Logger.debug("#{__MODULE__} #{status} #{conn.private[:proxied_url]} #{inspect(headers)}")
conn
|> put_resp_headers(build_resp_headers(headers, opts))
|> send_resp(code, "")
|> send_resp(status, "")
end
defp error_or_redirect(conn, url, code, body, opts) do
defp error_or_redirect(conn, status, body, opts) do
if Keyword.get(opts, :redirect_on_failure, false) do
conn
|> Phoenix.Controller.redirect(external: url)
|> Phoenix.Controller.redirect(external: conn.private[:proxied_url])
|> halt()
else
conn
|> send_resp(code, body)
|> send_resp(status, body)
|> halt
end
end
@ -272,7 +231,6 @@ defmodule Pleroma.ReverseProxy do
|> downcase_headers()
|> Enum.filter(fn {k, _} -> k in @keep_req_headers end)
|> build_req_range_or_encoding_header(opts)
|> build_req_user_agent_header(opts)
|> Keyword.merge(Keyword.get(opts, :req_headers, []))
end
@ -287,15 +245,6 @@ defmodule Pleroma.ReverseProxy do
end
end
defp build_req_user_agent_header(headers, _opts) do
List.keystore(
headers,
"user-agent",
0,
{"user-agent", Pleroma.Application.user_agent()}
)
end
defp build_resp_headers(headers, opts) do
headers
|> Enum.filter(fn {k, _} -> k in @keep_resp_headers end)
@ -382,37 +331,6 @@ defmodule Pleroma.ReverseProxy do
defp header_length_constraint(_, _), do: :ok
defp body_size_constraint(size, limit) when is_integer(limit) and limit > 0 and size >= limit do
{:error, :body_too_large}
end
defp body_size_constraint(_, _), do: :ok
defp check_read_duration(nil = _duration, max), do: check_read_duration(@max_read_duration, max)
defp check_read_duration(duration, max)
when is_integer(duration) and is_integer(max) and max > 0 do
if duration > max do
{:error, :read_duration_exceeded}
else
{:ok, {duration, :erlang.system_time(:millisecond)}}
end
end
defp check_read_duration(_, _), do: {:ok, :no_duration_limit, :no_duration_limit}
defp increase_read_duration({previous_duration, started})
when is_integer(previous_duration) and is_integer(started) do
duration = :erlang.system_time(:millisecond) - started
{:ok, previous_duration + duration}
end
defp increase_read_duration(_) do
{:ok, :no_duration_limit, :no_duration_limit}
end
defp client, do: Pleroma.ReverseProxy.Client.Wrapper
defp track_failed_url(url, error, opts) do
ttl =
unless error in [:body_too_large, 400, 204] do

@ -8,11 +8,7 @@ defmodule Pleroma.Telemetry.Logger do
require Logger
@events [
[:pleroma, :connection_pool, :reclaim, :start],
[:pleroma, :connection_pool, :reclaim, :stop],
[:pleroma, :connection_pool, :provision_failure],
[:pleroma, :connection_pool, :client, :dead],
[:pleroma, :connection_pool, :client, :add]
[:pleroma, :repo, :query]
]
def attach do
:telemetry.attach_many(
@ -28,68 +24,62 @@ defmodule Pleroma.Telemetry.Logger do
# out anyway due to higher log level configured
def handle_event(
[:pleroma, :connection_pool, :reclaim, :start],
_,
%{max_connections: max_connections, reclaim_max: reclaim_max},
_
[:pleroma, :repo, :query] = _name,
%{query_time: query_time} = measurements,
%{source: source} = metadata,
config
) do
Logger.debug(fn ->
"Connection pool is exhausted (reached #{max_connections} connections). Starting idle connection cleanup to reclaim as much as #{reclaim_max} connections"
end)
end
logging_config = Pleroma.Config.get([:telemetry, :slow_queries_logging], [])
def handle_event(
[:pleroma, :connection_pool, :reclaim, :stop],
%{reclaimed_count: 0},
_,
_
) do
Logger.error(fn ->
"Connection pool failed to reclaim any connections due to all of them being in use. It will have to drop requests for opening connections to new hosts"
end)
if logging_config[:enabled] &&
logging_config[:min_duration] &&
query_time > logging_config[:min_duration] and
(is_nil(logging_config[:exclude_sources]) or
source not in logging_config[:exclude_sources]) do
log_slow_query(measurements, metadata, config)
else
:ok
end
end
def handle_event(
[:pleroma, :connection_pool, :reclaim, :stop],
%{reclaimed_count: reclaimed_count},
_,
_
) do
Logger.debug(fn -> "Connection pool cleaned up #{reclaimed_count} idle connections" end)
end
defp log_slow_query(
%{query_time: query_time} = _measurements,
%{source: _source, query: query, params: query_params, repo: repo} = _metadata,
_config
) do
sql_explain =
with {:ok, %{rows: explain_result_rows}} <-
repo.query("EXPLAIN " <> query, query_params, log: false) do
Enum.map_join(explain_result_rows, "\n", & &1)
end
def handle_event(
[:pleroma, :connection_pool, :provision_failure],
%{opts: [key | _]},
_,
_
) do
Logger.error(fn ->
"Connection pool had to refuse opening a connection to #{key} due to connection limit exhaustion"
end)
end
{:current_stacktrace, stacktrace} = Process.info(self(), :current_stacktrace)
pleroma_stacktrace =
Enum.filter(stacktrace, fn
{__MODULE__, _, _, _} ->
false
{mod, _, _, _} ->
mod
|> to_string()
|> String.starts_with?("Elixir.Pleroma.")
end)
def handle_event(
[:pleroma, :connection_pool, :client, :dead],
%{client_pid: client_pid, reason: reason},
%{key: key},
_
) do
Logger.warn(fn ->
"Pool worker for #{key}: Client #{inspect(client_pid)} died before releasing the connection with #{inspect(reason)}"
end)
end
"""
Slow query!
def handle_event(
[:pleroma, :connection_pool, :client, :add],
%{clients: [_, _ | _] = clients},
%{key: key, protocol: :http},
_
) do
Logger.info(fn ->
"Pool worker for #{key}: #{length(clients)} clients are using an HTTP1 connection at the same time, head-of-line blocking might occur."
Total time: #{round(query_time / 1_000)} ms
#{query}
#{inspect(query_params, limit: :infinity)}
#{sql_explain}
#{Exception.format_stacktrace(pleroma_stacktrace)}
"""
end)
end
def handle_event([:pleroma, :connection_pool, :client, :add], _, _, _), do: :ok
end

@ -30,23 +30,12 @@ defmodule Pleroma.Uploaders.S3 do
op =
if streaming do
op =
upload.tempfile
|> ExAws.S3.Upload.stream_file()
|> ExAws.S3.upload(bucket, s3_name, [
{:acl, :public_read},
{:content_type, upload.content_type}
])
if Application.get_env(:tesla, :adapter) == Tesla.Adapter.Gun do
# set s3 upload timeout to respect :upload pool timeout
# timeout should be slightly larger, so s3 can retry upload on fail
timeout = Pleroma.HTTP.AdapterHelper.Gun.pool_timeout(:upload) + 1_000
opts = Keyword.put(op.opts, :timeout, timeout)
Map.put(op, :opts, opts)
else
op
end
upload.tempfile
|> ExAws.S3.Upload.stream_file()
|> ExAws.S3.upload(bucket, s3_name, [
{:acl, :public_read},
{:content_type, upload.content_type}
])
else
{:ok, file_data} = File.read(upload.tempfile)

@ -12,7 +12,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
require Logger
@adapter_options [
pool: :media,
recv_timeout: 10_000
]

@ -54,7 +54,7 @@ defmodule Pleroma.Web.MediaProxy.MediaProxyController do
media_proxy_url = MediaProxy.url(url)
with {:ok, %{status: status} = head_response} when status in 200..299 <-
Pleroma.HTTP.request("head", media_proxy_url, [], [], pool: :media) do
Pleroma.HTTP.request("head", media_proxy_url, [], [], name: MyFinch) do
content_type = Tesla.get_header(head_response, "content-type")
content_length = Tesla.get_header(head_response, "content-length")
content_length = content_length && String.to_integer(content_length)

@ -47,10 +47,9 @@ defmodule Pleroma.Web.Plugs.UploadedMedia do
config = Pleroma.Config.get(Pleroma.Upload)
with uploader <- Keyword.fetch!(config, :uploader),
proxy_remote = Keyword.get(config, :proxy_remote, false),
{:ok, get_method} <- uploader.get_file(file),
false <- media_is_banned(conn, get_method) do
get_media(conn, get_method, proxy_remote, opts)
get_media(conn, get_method, opts)
else
_ ->
conn
@ -69,7 +68,7 @@ defmodule Pleroma.Web.Plugs.UploadedMedia do
defp media_is_banned(_, _), do: false
defp get_media(conn, {:static_dir, directory}, _, opts) do
defp get_media(conn, {:static_dir, directory}, opts) do
static_opts =
Map.get(opts, :static_plug_opts)
|> Map.put(:at, [@path])
@ -86,25 +85,13 @@ defmodule Pleroma.Web.Plugs.UploadedMedia do
end
end
defp get_media(conn, {:url, url}, true, _) do
proxy_opts = [
http: [
follow_redirect: true,
pool: :upload
]
]
conn
|> Pleroma.ReverseProxy.call(url, proxy_opts)
end
defp get_media(conn, {:url, url}, _, _) do
defp get_media(conn, {:url, url}, _) do
conn
|> Phoenix.Controller.redirect(external: url)
|> halt()
end
defp get_media(conn, unknown, _, _) do
defp get_media(conn, unknown, _) do
Logger.error("#{__MODULE__}: Unknown get startegy: #{inspect(unknown)}")
conn

@ -4,7 +4,6 @@
defmodule Pleroma.Web.RelMe do
@options [
pool: :media,
max_body: 2_000_000,
recv_timeout: 2_000
]

@ -10,7 +10,6 @@ defmodule Pleroma.Web.RichMedia.Helpers do
alias Pleroma.Web.RichMedia.Parser
@options [
pool: :media,
max_body: 2_000_000,
recv_timeout: 2_000
]

@ -215,7 +215,6 @@ defmodule Pleroma.Mixfile do
{:mock, "~> 0.3.5", only: :test},
# temporary downgrade for excoveralls, hackney until hackney max_connections bug will be fixed
{:excoveralls, "0.12.3", only: :test},
{:hackney, "~> 1.18.0", override: true},
{:mox, "~> 1.0", only: :test},
{:websocket_client, git: "https://github.com/jeremyong/websocket_client.git", only: :test}
] ++ oauth_deps()

@ -280,50 +280,6 @@ defmodule Pleroma.Config.DeprecationWarningsTest do
"Your config is using the old setting for controlling the URL of media uploaded to your S3 bucket."
end
describe "check_gun_pool_options/0" do
test "await_up_timeout" do
config = Config.get(:connections_pool)
clear_config(:connections_pool, Keyword.put(config, :await_up_timeout, 5_000))
assert capture_log(fn ->
DeprecationWarnings.check_gun_pool_options()
end) =~
"Your config is using old setting `config :pleroma, :connections_pool, await_up_timeout`."
end
test "pool timeout" do