# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Web.Federator.RetryQueue do
  @moduledoc """
  Keeps failed deliveries in an ETS table and retries them with an increasing
  backoff, up to a configurable number of attempts.
  """

  use GenServer

  require Logger

  def init(args) do
    # Jobs are stored as {due_unix_time, job} tuples in a protected bag table.
    queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])

    {:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
  end

  def start_link() do
    enabled =
      if Mix.env() == :test, do: true, else: Pleroma.Config.get([__MODULE__, :enabled], false)

    if enabled do
      Logger.info("Starting retry queue")

      linkres =
        GenServer.start_link(
          __MODULE__,
          %{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
          name: __MODULE__
        )

      maybe_kickoff_timer()
      linkres
    else
      Logger.info("Retry queue disabled")
      :ignore
    end
  end

  def enqueue(data, transport, retries \\ 0) do
    GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
  end

  def get_stats() do
    GenServer.call(__MODULE__, :get_stats)
  end

  def reset_stats() do
    GenServer.call(__MODULE__, :reset_stats)
  end

  def get_retry_params(retries) do
    if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
      {:drop, "Max retries reached"}
    else
      {:retry, growth_function(retries)}
    end
  end

  def get_retry_timer_interval() do
    Pleroma.Config.get([:retry_queue, :interval], 1000)
  end

  defp ets_count_expires(table, current_time) do
    :ets.select_count(
      table,
      [
        {
          {:"$1", :"$2"},
          [{:"=<", :"$1", {:const, current_time}}],
          [true]
        }
      ]
    )
  end

  defp ets_pop_n_expired(table, current_time, desired) do
    {popped, _continuation} =
      :ets.select(
        table,
        [
          {
            {:"$1", :"$2"},
            [{:"=<", :"$1", {:const, current_time}}],
            [:"$_"]
          }
        ],
        desired
      )

    popped
    |> Enum.each(fn e ->
      :ets.delete_object(table, e)
    end)

    popped
  end
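  # Illustration only (never executed by this module): the two helpers above
  # treat each ETS object as a {due_unix_time, job} tuple and select those
  # whose due time is at or before `current_time`. The `data` and `transport`
  # names below are placeholders.
  #
  #   now = DateTime.to_unix(DateTime.utc_now())
  #   :ets.insert(queue_table, {now - 1, {:send, data, transport, 1}})
  #   ets_count_expires(queue_table, now)
  #   #=> 1
  #   ets_pop_n_expired(queue_table, now, 10)
  #   #=> [{now - 1, {:send, data, transport, 1}}] (the object is also deleted)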
  def maybe_start_job(running_jobs, queue_table) do
    # we don't want to hit the ets or the DateTime more times than we have to
    # could optimize slightly further by not using the count, and instead grabbing
    # up to N objects early...
    current_time = DateTime.to_unix(DateTime.utc_now())
    n_running_jobs = :sets.size(running_jobs)

    if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
      n_ready_jobs = ets_count_expires(queue_table, current_time)

      if n_ready_jobs > 0 do
        # figure out how many we could start
        available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
        start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
      else
        running_jobs
      end
    else
      running_jobs
    end
  end

  defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
    running_jobs
  end

  defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
       when available_job_slots > 0 do
    candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)

    candidates
    |> List.foldl(running_jobs, fn {_, e}, rj ->
      {:ok, pid} = Task.start(fn -> worker(e) end)
      mref = Process.monitor(pid)
      :sets.add_element(mref, rj)
    end)
  end

  def worker({:send, data, transport, retries}) do
    case transport.publish_one(data) do
      {:ok, _} ->
        GenServer.cast(__MODULE__, :inc_delivered)
        :delivered

      {:error, _reason} ->
        enqueue(data, transport, retries)
        :retry
    end
  end

  def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
    {:reply, %{delivered: delivery_count, dropped: drop_count}, state}
  end

  def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
    {:reply, %{delivered: delivery_count, dropped: drop_count},
     %{state | delivered: 0, dropped: 0}}
  end

  def handle_cast(:reset_stats, state) do
    {:noreply, %{state | delivered: 0, dropped: 0}}
  end

  def handle_cast(
        {:maybe_enqueue, data, transport, retries},
        %{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
      ) do
    case get_retry_params(retries) do
      {:retry, timeout} ->
        :ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
        running_jobs = maybe_start_job(running_jobs, queue_table)
        {:noreply, %{state | running_jobs: running_jobs}}

      {:drop, message} ->
        Logger.debug(message)
        {:noreply, %{state | dropped: drop_count + 1}}
    end
  end

  def handle_cast(:kickoff_timer, state) do
    retry_interval = get_retry_timer_interval()
    Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
    {:noreply, state}
  end

  def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
    {:noreply, %{state | delivered: delivery_count + 1}}
  end

  def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
    {:noreply, %{state | dropped: drop_count + 1}}
  end

  def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
    case transport.publish_one(data) do
      {:ok, _} ->
        {:noreply, %{state | delivered: delivery_count + 1}}

      {:error, _reason} ->
        enqueue(data, transport, retries)
        {:noreply, state}
    end
  end

  def handle_info(
        :retry_timer_run,
        %{queue_table: queue_table, running_jobs: running_jobs} = state
      ) do
    maybe_kickoff_timer()
    running_jobs = maybe_start_job(running_jobs, queue_table)
    {:noreply, %{state | running_jobs: running_jobs}}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    %{running_jobs: running_jobs, queue_table: queue_table} = state
    running_jobs = :sets.del_element(ref, running_jobs)
    running_jobs = maybe_start_job(running_jobs, queue_table)
    {:noreply, %{state | running_jobs: running_jobs}}
  end

  def handle_info(unknown, state) do
    Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
    {:noreply, state}
  end
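  # Worked example for the production growth_function below, assuming an
  # initial_timeout of 30 (seconds); the value is illustrative and not read
  # from this file:
  #
  #   retries = 1  ->  due 30 * 1^3 = 30 seconds from now
  #   retries = 2  ->  due 30 * 2^3 = 240 seconds from now
  #   retries = 3  ->  due 30 * 3^3 = 810 seconds from now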
  if Mix.env() == :test do
    # In tests the job is due immediately; the config read is kept but its
    # value is not used.
    defp growth_function(_retries) do
      _shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])

      DateTime.to_unix(DateTime.utc_now()) - 1
    end
  else
    # Cubic backoff: the job becomes due initial_timeout * retries^3 seconds
    # past the current unix time.
    defp growth_function(retries) do
      round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
        DateTime.to_unix(DateTime.utc_now())
    end
  end

  defp maybe_kickoff_timer() do
    GenServer.cast(__MODULE__, :kickoff_timer)
  end
end
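# A minimal sketch of the configuration and call site this module expects,
# based on the keys read above; the values, the payload, and the publisher
# module name are placeholders, not taken from this file:
#
#   config :pleroma, Pleroma.Web.Federator.RetryQueue,
#     enabled: true,
#     max_jobs: 20,
#     initial_timeout: 30,
#     max_retries: 5
#
#   config :pleroma, :retry_queue, interval: 1000
#
#   # `transport` must export publish_one/1 returning {:ok, _} or {:error, _}:
#   Pleroma.Web.Federator.RetryQueue.enqueue(payload, SomePublisher)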