# akkoma/lib/pleroma/web/federator/retry_queue.ex

# Pleroma: A lightweight social networking server
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator.RetryQueue do
  @moduledoc """
  GenServer that re-attempts failed federation deliveries with a growing delay.

  A delivery is described by an opaque `data` term and a `transport` module
  that exports `publish_one/1`. Each attempt is scheduled with
  `Process.send_after/3`; when the timer fires, `transport.publish_one(data)`
  is called. An `{:error, _}` result re-enqueues the delivery with an
  incremented retry counter until the counter exceeds `@max_retries`, at which
  point the delivery is dropped.

  The server state is a map with two counters:

    * `:delivered` - number of deliveries that eventually succeeded
    * `:dropped` - number of deliveries abandoned after too many retries

  The queue only starts when the `[:retry_queue, :enabled]` configuration
  value is truthy; otherwise `start_link/0` returns `:ignore`.
  """

  use GenServer

  require Logger

  # Base delay in milliseconds (30 seconds). The actual delay before attempt
  # `n` is `@initial_timeout * n^3`, see `growth_function/1`.
  @initial_timeout 30_000
  # A delivery whose retry counter exceeds this value is dropped.
  @max_retries 5

  ## Client API

  @doc """
  Starts the retry queue, registered under the module name.

  Reads `[:retry_queue, :enabled]` through `Pleroma.Config.get/2` (default
  `false`). Returns `:ignore` when the queue is disabled, otherwise the result
  of `GenServer.start_link/3`.
  """
  @spec start_link() :: GenServer.on_start()
  def start_link do
    if Pleroma.Config.get([:retry_queue, :enabled], false) do
      Logger.info("Starting retry queue")
      GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
    else
      Logger.info("Retry queue disabled")
      :ignore
    end
  end

  @doc """
  Asks the queue to schedule another delivery attempt for `data`.

  `retries` is the number of attempts already made; it is incremented before
  being handed to the server, so a first call with the default `0` is treated
  as retry number 1. The call is asynchronous and always returns `:ok`.
  """
  @spec enqueue(term(), module(), non_neg_integer()) :: :ok
  def enqueue(data, transport, retries \\ 0) do
    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
  end

  @doc """
  Decides what to do with a delivery that is on its `retries`-th retry.

  Returns `{:drop, message}` once `retries` exceeds the maximum (5), otherwise
  `{:retry, delay_in_ms}` where the delay is `30_000 * retries^3`.

  ## Examples

      iex> Pleroma.Web.Federator.RetryQueue.get_retry_params(2)
      {:retry, 240000}

      iex> Pleroma.Web.Federator.RetryQueue.get_retry_params(6)
      {:drop, "Max retries reached"}
  """
  @spec get_retry_params(number()) :: {:drop, String.t()} | {:retry, integer()}
  def get_retry_params(retries) do
    if retries > @max_retries do
      {:drop, "Max retries reached"}
    else
      {:retry, growth_function(retries)}
    end
  end

  ## Server callbacks

  @impl true
  def init(args) do
    {:ok, args}
  end

  # Either arms a timer for the next delivery attempt or, once the retry
  # budget is exhausted, counts the delivery as dropped.
  @impl true
  def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
    case get_retry_params(retries) do
      {:retry, timeout} ->
        # The timer is addressed to the registered name; the message is
        # handled by `handle_info/2` below.
        Process.send_after(
          __MODULE__,
          {:send, data, transport, retries},
          timeout
        )

        {:noreply, state}

      {:drop, message} ->
        Logger.debug(message)
        {:noreply, %{state | dropped: drop_count + 1}}
    end
  end

  # Timer fired: perform the actual delivery attempt. A failure goes back
  # through `enqueue/3`, which bumps the retry counter.
  @impl true
  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
    case transport.publish_one(data) do
      {:ok, _} ->
        {:noreply, %{state | delivered: delivery_count + 1}}

      {:error, _reason} ->
        enqueue(data, transport, retries)
        {:noreply, state}
    end
  end

  # Any other message is logged and ignored so it cannot crash the queue.
  def handle_info(unknown, state) do
    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
    {:noreply, state}
  end

  ## Helpers

  # Cubic back-off: 30 s, 4 min, 13.5 min, 32 min and 62.5 min for retries
  # 1 through 5. `:math.pow/2` yields a float, hence the `round/1`.
  defp growth_function(retries) do
    round(@initial_timeout * :math.pow(retries, 3))
  end
end