From 55affbca7fcb214c71b3f8378b0de869c4d4d072 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Mon, 28 Jan 2019 22:17:17 +0700 Subject: [PATCH 1/8] add a job queue --- config/config.exs | 6 +- config/test.exs | 2 + docs/config.md | 31 +++- lib/pleroma/application.ex | 5 +- lib/pleroma/jobs.ex | 153 ++++++++++++++++++ lib/pleroma/web/activity_pub/activity_pub.ex | 20 ++- .../activity_pub/activity_pub_controller.ex | 4 +- lib/pleroma/web/activity_pub/utils.ex | 2 +- lib/pleroma/web/federator/federator.ex | 139 ++++++---------- lib/pleroma/web/ostatus/ostatus_controller.ex | 2 +- lib/pleroma/web/websub/websub.ex | 8 +- lib/pleroma/web/websub/websub_controller.ex | 2 +- test/jobs_test.exs | 83 ++++++++++ test/support/jobs_worker_mock.ex | 19 +++ test/web/federator_test.exs | 24 +-- 15 files changed, 358 insertions(+), 142 deletions(-) create mode 100644 lib/pleroma/jobs.ex create mode 100644 test/jobs_test.exs create mode 100644 test/support/jobs_worker_mock.ex diff --git a/config/config.exs b/config/config.exs index e8cf2ed3a..98dd8eb65 100644 --- a/config/config.exs +++ b/config/config.exs @@ -271,14 +271,16 @@ "web" ] -config :pleroma, Pleroma.Web.Federator, max_jobs: 50 - config :pleroma, Pleroma.Web.Federator.RetryQueue, enabled: false, max_jobs: 20, initial_timeout: 30, max_retries: 5 +config :pleroma, Pleroma.Jobs, + federator_incoming: [max_jobs: 50], + federator_outgoing: [max_jobs: 50] + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{Mix.env()}.exs" diff --git a/config/test.exs b/config/test.exs index 67ed4737f..7e833fbb3 100644 --- a/config/test.exs +++ b/config/test.exs @@ -43,6 +43,8 @@ "BLH1qVhJItRGCfxgTtONfsOKDc9VRAraXw-3NsmjMngWSh7NxOizN6bkuRA7iLTMPS82PjwJAr3UoK9EC1IFrz4", private_key: "_-XZ0iebPrRfZ_o0-IatTdszYa8VCH1yLN-JauK7HHA" +config :pleroma, Pleroma.Jobs, testing: [max_jobs: 2] + try do import_config "test.secret.exs" rescue diff --git a/docs/config.md b/docs/config.md index 5464fa90d..84a1e3607 100644 --- a/docs/config.md +++ b/docs/config.md @@ -36,14 +36,15 @@ This filter replaces the filename (not the path) of an upload. For complete obfu An example for Sendgrid adapter: -``` +```exs config :pleroma, Pleroma.Mailer, adapter: Swoosh.Adapters.Sendgrid, api_key: "YOUR_API_KEY" ``` An example for SMTP adapter: -``` + +```exs config :pleroma, Pleroma.Mailer, adapter: Swoosh.Adapters.SMTP, relay: "smtp.gmail.com", @@ -163,7 +164,7 @@ their ActivityPub ID. An example: -``` +```exs config :pleroma, :mrf_user_allowlist, "example.org": ["https://example.org/users/admin"] ``` @@ -192,18 +193,34 @@ the source code is here: https://github.com/koto-bank/kocaptcha. The default end Allows to set a token that can be used to authenticate with the admin api without using an actual user by giving it as the 'admin_token' parameter. Example: -``` +```exs config :pleroma, :admin_token, "somerandomtoken" ``` You can then do -``` + +```sh curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken" ``` -## Pleroma.Web.Federator +## Pleroma.Jobs + +A list of job queues and their settings. + +Job queue settings: + +* `max_jobs`: The maximum amount of parallel jobs running at the same time. + +Example: + +```exs +config :pleroma, Pleroma.Jobs, + federator_incoming: [max_jobs: 50], + federator_outgoing: [max_jobs: 50] +``` + +This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. -* `max_jobs`: The maximum amount of parallel federation jobs running at the same time. ## Pleroma.Web.Federator.RetryQueue diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index 47c0e5b68..60cba1580 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -101,9 +101,10 @@ def start(_type, _args) do ), worker(Pleroma.FlakeId, []), worker(Pleroma.Web.Federator.RetryQueue, []), - worker(Pleroma.Web.Federator, []), worker(Pleroma.Stats, []), - worker(Pleroma.Web.Push, []) + worker(Pleroma.Web.Push, []), + worker(Pleroma.Jobs, []), + worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary) ] ++ streamer_child() ++ chat_child() ++ diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex new file mode 100644 index 000000000..dff0f2197 --- /dev/null +++ b/lib/pleroma/jobs.ex @@ -0,0 +1,153 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Jobs do + @moduledoc """ + A basic job queue + """ + use GenServer + + require Logger + + def init(args) do + {:ok, args} + end + + def start_link do + queues = + Pleroma.Config.get(Pleroma.Jobs) + |> Enum.map(fn {name, _} -> create_queue(name) end) + |> Enum.into(%{}) + + state = %{ + queues: queues, + refs: %{} + } + + GenServer.start_link(__MODULE__, state, name: __MODULE__) + end + + def create_queue(name) do + {name, {:sets.new(), []}} + end + + @doc """ + Enqueues a job. + + Returns `:ok`. + + ## Arguments + + - `queue_name` - a queue name(must be specified in the config). + - `mod` - a worker module, must have `perform` function. + - `args` - a list of arguments for the `perform` function of the worker module. + - `priority` - a job priority (`0` by default). + + ## Examples + + Enqueue `Module.perform/0` with `priority=1`: + + iex> Pleroma.Jobs.enqueue(:example_queue, Module, []) + :ok + + Enqueue `Module.perform(:job_name)` with `priority=5`: + + iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:job_name], 5) + :ok + + Enqueue `Module.perform(:another_job, data)` with `priority=1`: + + iex> data = "foobar" + iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:another_job, data]) + :ok + + Enqueue `Module.perform(:foobar_job, :foo, :bar, 42)` with `priority=1`: + + iex> Pleroma.Jobs.enqueue(:example_queue, Module, [:foobar_job, :foo, :bar, 42]) + :ok + + """ + + def enqueue(queue_name, mod, args, priority \\ 1) + + if Mix.env() == :test do + def enqueue(_queue_name, mod, args, _priority) do + apply(mod, :perform, args) + end + else + @spec enqueue(atom(), atom(), [any()], integer()) :: :ok + def enqueue(queue_name, mod, args, priority \\ 1) do + GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority}) + end + end + + def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do + {running_jobs, queue} = state[:queues][queue_name] + + queue = enqueue_sorted(queue, {mod, args}, priority) + + state = + state + |> update_queue(queue_name, {running_jobs, queue}) + |> maybe_start_job(queue_name, running_jobs, queue) + + {:noreply, state} + end + + def handle_cast(m, state) do + IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}") + {:noreply, state} + end + + def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do + queue_name = state.refs[ref] + + {running_jobs, queue} = state[:queues][queue_name] + + running_jobs = :sets.del_element(ref, running_jobs) + + state = state |> remove_ref(ref) |> maybe_start_job(queue_name, running_jobs, queue) + + {:noreply, state} + end + + def maybe_start_job(state, queue_name, running_jobs, queue) do + if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, queue_name, :max_jobs]) && + queue != [] do + {{mod, args}, queue} = queue_pop(queue) + {:ok, pid} = Task.start(fn -> apply(mod, :perform, args) end) + mref = Process.monitor(pid) + + state + |> add_ref(queue_name, mref) + |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue}) + else + update_queue(state, queue_name, {running_jobs, queue}) + end + end + + def enqueue_sorted(queue, element, priority) do + [%{item: element, priority: priority} | queue] + |> Enum.sort_by(fn %{priority: priority} -> priority end) + end + + def queue_pop([%{item: element} | queue]) do + {element, queue} + end + + defp add_ref(state, queue_name, ref) do + refs = Map.put(state[:refs], ref, queue_name) + Map.put(state, :refs, refs) + end + + defp remove_ref(state, ref) do + refs = Map.delete(state[:refs], ref) + Map.put(state, :refs, refs) + end + + defp update_queue(state, queue_name, data) do + queues = Map.put(state[:queues], queue_name, data) + Map.put(state, :queues, queues) + end +end diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 6b4682e35..5be359887 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -711,20 +711,18 @@ def publish(actor, activity) do public = is_public?(activity) - remote_inboxes = - (Pleroma.Web.Salmon.remote_users(activity) ++ followers) - |> Enum.filter(fn user -> User.ap_enabled?(user) end) - |> Enum.map(fn %{info: %{source_data: data}} -> - (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] - end) - |> Enum.uniq() - |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) - {:ok, data} = Transmogrifier.prepare_outgoing(activity.data) json = Jason.encode!(data) - Enum.each(remote_inboxes, fn inbox -> - Federator.enqueue(:publish_single_ap, %{ + (Pleroma.Web.Salmon.remote_users(activity) ++ followers) + |> Enum.filter(fn user -> User.ap_enabled?(user) end) + |> Enum.map(fn %{info: %{source_data: data}} -> + (is_map(data["endpoints"]) && Map.get(data["endpoints"], "sharedInbox")) || data["inbox"] + end) + |> Enum.uniq() + |> Enum.filter(fn inbox -> should_federate?(inbox, public) end) + |> Enum.each(fn inbox -> + Federator.publish_single_ap(%{ inbox: inbox, json: json, actor: actor, diff --git a/lib/pleroma/web/activity_pub/activity_pub_controller.ex b/lib/pleroma/web/activity_pub/activity_pub_controller.ex index 7eed0a600..04c6fef3f 100644 --- a/lib/pleroma/web/activity_pub/activity_pub_controller.ex +++ b/lib/pleroma/web/activity_pub/activity_pub_controller.ex @@ -150,13 +150,13 @@ def inbox(%{assigns: %{valid_signature: true}} = conn, %{"nickname" => nickname} with %User{} = user <- User.get_cached_by_nickname(nickname), true <- Utils.recipient_in_message(user.ap_id, params), params <- Utils.maybe_splice_recipient(user.ap_id, params) do - Federator.enqueue(:incoming_ap_doc, params) + Federator.incoming_ap_doc(params) json(conn, "ok") end end def inbox(%{assigns: %{valid_signature: true}} = conn, params) do - Federator.enqueue(:incoming_ap_doc, params) + Federator.incoming_ap_doc(params) json(conn, "ok") end diff --git a/lib/pleroma/web/activity_pub/utils.ex b/lib/pleroma/web/activity_pub/utils.ex index e40d05fcd..2ff8a1e8a 100644 --- a/lib/pleroma/web/activity_pub/utils.ex +++ b/lib/pleroma/web/activity_pub/utils.ex @@ -156,7 +156,7 @@ def maybe_federate(%Activity{local: true} = activity) do _ -> 5 end - Pleroma.Web.Federator.enqueue(:publish, activity, priority) + Pleroma.Web.Federator.publish(activity, priority) :ok end diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index f3a0e18b8..4d03b4622 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -3,9 +3,9 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Federator do - use GenServer alias Pleroma.User alias Pleroma.Activity + alias Pleroma.Jobs alias Pleroma.Web.{WebFinger, Websub} alias Pleroma.Web.Federator.RetryQueue alias Pleroma.Web.ActivityPub.ActivityPub @@ -18,39 +18,60 @@ defmodule Pleroma.Web.Federator do @websub Application.get_env(:pleroma, :websub) @ostatus Application.get_env(:pleroma, :ostatus) - def init(args) do - {:ok, args} + def init() do + # 1 minute + Process.sleep(1000 * 60 * 1) + refresh_subscriptions() end - def start_link do - spawn(fn -> - # 1 minute - Process.sleep(1000 * 60 * 1) - enqueue(:refresh_subscriptions, nil) - end) + # Client API - GenServer.start_link( - __MODULE__, - %{ - in: {:sets.new(), []}, - out: {:sets.new(), []} - }, - name: __MODULE__ - ) + def incoming_doc(doc) do + Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) end - def handle(:refresh_subscriptions, _) do + def incoming_ap_doc(params) do + Jobs.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) + end + + def publish(activity, priority \\ 1) do + Jobs.enqueue(:federator_out, __MODULE__, [:publish, activity], priority) + end + + def publish_single_ap(params) do + Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_ap, params]) + end + + def publish_single_websub(websub) do + Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_websub, websub]) + end + + def verify_websub(websub) do + Jobs.enqueue(:federator_out, __MODULE__, [:verify_websub, websub]) + end + + def request_subscription(sub) do + Jobs.enqueue(:federator_out, __MODULE__, [:request_subscription, sub]) + end + + def refresh_subscriptions() do + Jobs.enqueue(:federator_out, __MODULE__, [:refresh_subscriptions]) + end + + # Job Worker Callbacks + + def perform(:refresh_subscriptions) do Logger.debug("Federator running refresh subscriptions") Websub.refresh_subscriptions() spawn(fn -> # 6 hours Process.sleep(1000 * 60 * 60 * 6) - enqueue(:refresh_subscriptions, nil) + refresh_subscriptions() end) end - def handle(:request_subscription, websub) do + def perform(:request_subscription, websub) do Logger.debug("Refreshing #{websub.topic}") with {:ok, websub} <- Websub.request_subscription(websub) do @@ -60,7 +81,7 @@ def handle(:request_subscription, websub) do end end - def handle(:publish, activity) do + def perform(:publish, activity) do Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end) with actor when not is_nil(actor) <- User.get_cached_by_ap_id(activity.data["actor"]) do @@ -86,7 +107,7 @@ def handle(:publish, activity) do end end - def handle(:verify_websub, websub) do + def perform(:verify_websub, websub) do Logger.debug(fn -> "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})" end) @@ -94,12 +115,12 @@ def handle(:verify_websub, websub) do @websub.verify(websub) end - def handle(:incoming_doc, doc) do + def perform(:incoming_doc, doc) do Logger.info("Got document, trying to parse") @ostatus.handle_incoming(doc) end - def handle(:incoming_ap_doc, params) do + def perform(:incoming_ap_doc, params) do Logger.info("Handling incoming AP activity") params = Utils.normalize_params(params) @@ -124,7 +145,7 @@ def handle(:incoming_ap_doc, params) do end end - def handle(:publish_single_ap, params) do + def perform(:publish_single_ap, params) do case ActivityPub.publish_one(params) do {:ok, _} -> :ok @@ -134,7 +155,7 @@ def handle(:publish_single_ap, params) do end end - def handle( + def perform( :publish_single_websub, %{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params ) do @@ -147,75 +168,11 @@ def handle( end end - def handle(type, _) do + def perform(type, _) do Logger.debug(fn -> "Unknown task: #{type}" end) {:error, "Don't know what to do with this"} end - if Mix.env() == :test do - def enqueue(type, payload, _priority \\ 1) do - if Pleroma.Config.get([:instance, :federating]) do - handle(type, payload) - end - end - else - def enqueue(type, payload, priority \\ 1) do - if Pleroma.Config.get([:instance, :federating]) do - GenServer.cast(__MODULE__, {:enqueue, type, payload, priority}) - end - end - end - - def maybe_start_job(running_jobs, queue) do - if :sets.size(running_jobs) < Pleroma.Config.get([__MODULE__, :max_jobs]) && queue != [] do - {{type, payload}, queue} = queue_pop(queue) - {:ok, pid} = Task.start(fn -> handle(type, payload) end) - mref = Process.monitor(pid) - {:sets.add_element(mref, running_jobs), queue} - else - {running_jobs, queue} - end - end - - def handle_cast({:enqueue, type, payload, _priority}, state) - when type in [:incoming_doc, :incoming_ap_doc] do - %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - i_queue = enqueue_sorted(i_queue, {type, payload}, 1) - {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) - {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} - end - - def handle_cast({:enqueue, type, payload, _priority}, state) do - %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - o_queue = enqueue_sorted(o_queue, {type, payload}, 1) - {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) - {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} - end - - def handle_cast(m, state) do - IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}") - {:noreply, state} - end - - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do - %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state - i_running_jobs = :sets.del_element(ref, i_running_jobs) - o_running_jobs = :sets.del_element(ref, o_running_jobs) - {i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue) - {o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue) - - {:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}} - end - - def enqueue_sorted(queue, element, priority) do - [%{item: element, priority: priority} | queue] - |> Enum.sort_by(fn %{priority: priority} -> priority end) - end - - def queue_pop([%{item: element} | queue]) do - {element, queue} - end - def ap_enabled_actor(id) do user = User.get_by_ap_id(id) diff --git a/lib/pleroma/web/ostatus/ostatus_controller.ex b/lib/pleroma/web/ostatus/ostatus_controller.ex index 823619edb..845bc60bb 100644 --- a/lib/pleroma/web/ostatus/ostatus_controller.ex +++ b/lib/pleroma/web/ostatus/ostatus_controller.ex @@ -82,7 +82,7 @@ def salmon_incoming(conn, _) do {:ok, body, _conn} = read_body(conn) {:ok, doc} = decode_or_retry(body) - Federator.enqueue(:incoming_doc, doc) + Federator.incoming_doc(doc) conn |> send_resp(200, "") diff --git a/lib/pleroma/web/websub/websub.ex b/lib/pleroma/web/websub/websub.ex index 7ca62c83b..652ffd92c 100644 --- a/lib/pleroma/web/websub/websub.ex +++ b/lib/pleroma/web/websub/websub.ex @@ -7,7 +7,7 @@ defmodule Pleroma.Web.Websub do alias Pleroma.Repo alias Pleroma.Web.Websub.{WebsubServerSubscription, WebsubClientSubscription} alias Pleroma.Web.OStatus.FeedRepresenter - alias Pleroma.Web.{XML, Endpoint, OStatus} + alias Pleroma.Web.{XML, Endpoint, OStatus, Federator} alias Pleroma.Web.Router.Helpers require Logger @@ -77,7 +77,7 @@ def publish(topic, user, %{data: %{"type" => type}} = activity) secret: sub.secret } - Pleroma.Web.Federator.enqueue(:publish_single_websub, data) + Federator.publish_single_websub(data) end) end @@ -109,7 +109,7 @@ def incoming_subscription_request(user, %{"hub.mode" => "subscribe"} = params) d websub = Repo.update!(change) - Pleroma.Web.Federator.enqueue(:verify_websub, websub) + Federator.verify_websub(websub) {:ok, websub} else @@ -259,7 +259,7 @@ def refresh_subscriptions(delta \\ 60 * 60 * 24) do subs = Repo.all(query) Enum.each(subs, fn sub -> - Pleroma.Web.Federator.enqueue(:request_subscription, sub) + Federator.request_subscription(sub) end) end diff --git a/lib/pleroma/web/websub/websub_controller.ex b/lib/pleroma/web/websub/websub_controller.ex index e58f144e5..eb10227cb 100644 --- a/lib/pleroma/web/websub/websub_controller.ex +++ b/lib/pleroma/web/websub/websub_controller.ex @@ -80,7 +80,7 @@ def websub_incoming(conn, %{"id" => id}) do %WebsubClientSubscription{} = websub <- Repo.get(WebsubClientSubscription, id), {:ok, body, _conn} = read_body(conn), ^signature <- Websub.sign(websub.secret, body) do - Federator.enqueue(:incoming_doc, body) + Federator.incoming_doc(body) conn |> send_resp(200, "OK") diff --git a/test/jobs_test.exs b/test/jobs_test.exs new file mode 100644 index 000000000..ccb518dec --- /dev/null +++ b/test/jobs_test.exs @@ -0,0 +1,83 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.JobsTest do + use ExUnit.Case, async: true + + alias Pleroma.Jobs + alias Jobs.WorkerMock + + setup do + state = %{ + queues: Enum.into([Jobs.create_queue(:testing)], %{}), + refs: %{} + } + + [state: state] + end + + test "creates queue" do + queue = Jobs.create_queue(:foobar) + + assert {:foobar, set} = queue + assert :set == elem(set, 0) |> elem(0) + end + + test "enqueues an element according to priority" do + queue = [%{item: 1, priority: 2}] + + new_queue = Jobs.enqueue_sorted(queue, 2, 1) + assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}] + + new_queue = Jobs.enqueue_sorted(queue, 2, 3) + assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}] + end + + test "pop first item" do + queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}] + + assert {2, [%{item: 1, priority: 2}]} = Jobs.queue_pop(queue) + end + + test "enqueue a job", %{state: state} do + assert {:noreply, new_state} = + Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state) + + assert %{queues: %{testing: {running_jobs, []}}, refs: _} = new_state + assert :sets.size(running_jobs) == 1 + assert [ref] = :sets.to_list(running_jobs) + assert %{refs: %{^ref => :testing}} = new_state + end + + test "max jobs setting", %{state: state} do + max_jobs = Pleroma.Config.get([Jobs, :testing, :max_jobs]) + + {:noreply, state} = + Enum.reduce(1..(max_jobs + 1), {:noreply, state}, fn _, {:noreply, state} -> + Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state) + end) + + assert %{ + queues: %{ + testing: + {running_jobs, [%{item: {WorkerMock, [:test_job, :foo, :bar]}, priority: 3}]} + } + } = state + + assert :sets.size(running_jobs) == max_jobs + end + + test "remove job after it finished", %{state: state} do + {:noreply, new_state} = + Jobs.handle_cast({:enqueue, :testing, WorkerMock, [:test_job, :foo, :bar], 3}, state) + + %{queues: %{testing: {running_jobs, []}}} = new_state + [ref] = :sets.to_list(running_jobs) + + assert {:noreply, %{queues: %{testing: {running_jobs, []}}, refs: %{}}} = + Jobs.handle_info({:DOWN, ref, :process, nil, nil}, new_state) + + assert :sets.size(running_jobs) == 0 + end +end diff --git a/test/support/jobs_worker_mock.ex b/test/support/jobs_worker_mock.ex new file mode 100644 index 000000000..0fb976d05 --- /dev/null +++ b/test/support/jobs_worker_mock.ex @@ -0,0 +1,19 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Jobs.WorkerMock do + require Logger + + def perform(:test_job, arg, arg2) do + Logger.debug({:perform, :test_job, arg, arg2}) + end + + def perform(:test_job, payload) do + Logger.debug({:perform, :test_job, payload}) + end + + def test_job(payload) do + Pleroma.Jobs.enqueue(:testing, __MODULE__, [:test_job, payload]) + end +end diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs index a49265c0c..d58621cc3 100644 --- a/test/web/federator_test.exs +++ b/test/web/federator_test.exs @@ -14,22 +14,6 @@ defmodule Pleroma.Web.FederatorTest do :ok end - test "enqueues an element according to priority" do - queue = [%{item: 1, priority: 2}] - - new_queue = Federator.enqueue_sorted(queue, 2, 1) - assert new_queue == [%{item: 2, priority: 1}, %{item: 1, priority: 2}] - - new_queue = Federator.enqueue_sorted(queue, 2, 3) - assert new_queue == [%{item: 1, priority: 2}, %{item: 2, priority: 3}] - end - - test "pop first item" do - queue = [%{item: 2, priority: 1}, %{item: 1, priority: 2}] - - assert {2, [%{item: 1, priority: 2}]} = Federator.queue_pop(queue) - end - describe "Publish an activity" do setup do user = insert(:user) @@ -49,7 +33,7 @@ test "with relays active, it publishes to the relay", %{ relay_mock: relay_mock } do with_mocks([relay_mock]) do - Federator.handle(:publish, activity) + Federator.publish(activity) end assert_received :relay_publish @@ -62,7 +46,7 @@ test "with relays deactivated, it does not publish to the relay", %{ Pleroma.Config.put([:instance, :allow_relay], false) with_mocks([relay_mock]) do - Federator.handle(:publish, activity) + Federator.publish(activity) end refute_received :relay_publish @@ -87,7 +71,7 @@ test "successfully processes incoming AP docs with correct origin" do "to" => ["https://www.w3.org/ns/activitystreams#Public"] } - {:ok, _activity} = Federator.handle(:incoming_ap_doc, params) + {:ok, _activity} = Federator.incoming_ap_doc(params) end test "rejects incoming AP docs with incorrect origin" do @@ -105,7 +89,7 @@ test "rejects incoming AP docs with incorrect origin" do "to" => ["https://www.w3.org/ns/activitystreams#Public"] } - :error = Federator.handle(:incoming_ap_doc, params) + :error = Federator.incoming_ap_doc(params) end end end From b2e9700785124a8ba4cfb037ae92a6bfc7c3b224 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Mon, 28 Jan 2019 23:12:02 +0700 Subject: [PATCH 2/8] cleanup --- lib/pleroma/jobs.ex | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex index dff0f2197..2a75ff529 100644 --- a/lib/pleroma/jobs.ex +++ b/lib/pleroma/jobs.ex @@ -40,7 +40,7 @@ def create_queue(name) do ## Arguments - `queue_name` - a queue name(must be specified in the config). - - `mod` - a worker module, must have `perform` function. + - `mod` - a worker module (must have `perform` function). - `args` - a list of arguments for the `perform` function of the worker module. - `priority` - a job priority (`0` by default). @@ -76,7 +76,6 @@ def enqueue(_queue_name, mod, args, _priority) do apply(mod, :perform, args) end else - @spec enqueue(atom(), atom(), [any()], integer()) :: :ok def enqueue(queue_name, mod, args, priority \\ 1) do GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority}) end @@ -95,11 +94,6 @@ def handle_cast({:enqueue, queue_name, mod, args, priority}, state) do {:noreply, state} end - def handle_cast(m, state) do - IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}") - {:noreply, state} - end - def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do queue_name = state.refs[ref] From 56533495b53c12223678de3f2fc6df8ee1165c19 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Tue, 29 Jan 2019 00:33:08 +0700 Subject: [PATCH 3/8] fix defaults --- lib/pleroma/jobs.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex index 2a75ff529..d42688193 100644 --- a/lib/pleroma/jobs.ex +++ b/lib/pleroma/jobs.ex @@ -76,7 +76,7 @@ def enqueue(_queue_name, mod, args, _priority) do apply(mod, :perform, args) end else - def enqueue(queue_name, mod, args, priority \\ 1) do + def enqueue(queue_name, mod, args, priority) do GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority}) end end From 1724a6b34b099dc83b94687d27d2492aaf97013e Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Tue, 29 Jan 2019 00:35:57 +0700 Subject: [PATCH 4/8] add spec back --- lib/pleroma/jobs.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex index d42688193..16dccb682 100644 --- a/lib/pleroma/jobs.ex +++ b/lib/pleroma/jobs.ex @@ -76,6 +76,7 @@ def enqueue(_queue_name, mod, args, _priority) do apply(mod, :perform, args) end else + @spec enqueue(atom(), atom(), [any()], integer()) :: :ok def enqueue(queue_name, mod, args, priority) do GenServer.cast(__MODULE__, {:enqueue, queue_name, mod, args, priority}) end From ab31adf15bbec1597a9b7cf065898fb3f712eef3 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Wed, 30 Jan 2019 22:56:59 +0700 Subject: [PATCH 5/8] tiny improve --- lib/pleroma/jobs.ex | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/pleroma/jobs.ex b/lib/pleroma/jobs.ex index 16dccb682..24b7e5e46 100644 --- a/lib/pleroma/jobs.ex +++ b/lib/pleroma/jobs.ex @@ -102,7 +102,11 @@ def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do running_jobs = :sets.del_element(ref, running_jobs) - state = state |> remove_ref(ref) |> maybe_start_job(queue_name, running_jobs, queue) + state = + state + |> remove_ref(ref) + |> update_queue(queue_name, {running_jobs, queue}) + |> maybe_start_job(queue_name, running_jobs, queue) {:noreply, state} end @@ -118,7 +122,7 @@ def maybe_start_job(state, queue_name, running_jobs, queue) do |> add_ref(queue_name, mref) |> update_queue(queue_name, {:sets.add_element(mref, running_jobs), queue}) else - update_queue(state, queue_name, {running_jobs, queue}) + state end end From 58e250d9d27205116df5634d909ec1e43ea55286 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Thu, 31 Jan 2019 15:23:50 +0700 Subject: [PATCH 6/8] fix merge --- lib/pleroma/application.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex index e81816e35..e44c48c35 100644 --- a/lib/pleroma/application.ex +++ b/lib/pleroma/application.ex @@ -109,7 +109,7 @@ def start(_type, _args) do worker(Pleroma.Stats, []), worker(Pleroma.Web.Push, []), worker(Pleroma.Jobs, []), - worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary) + worker(Task, [&Pleroma.Web.Federator.init/0], restart: :temporary) ] ++ streamer_child() ++ chat_child() ++ From 5b1d7c3c5672af065af503891d156b6e0cf5a8c1 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Wed, 6 Feb 2019 12:17:41 +0700 Subject: [PATCH 7/8] fix tests --- test/web/federator_test.exs | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs index 147086918..b56694a8b 100644 --- a/test/web/federator_test.exs +++ b/test/web/federator_test.exs @@ -87,11 +87,9 @@ test "with relays deactivated, it does not publish to the relay", %{ {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) - assert called( - Federator.enqueue(:publish_single_ap, %{inbox: inbox1, unreachable_since: dt}) - ) + assert called(Federator.publish_single_ap(%{inbox: inbox1, unreachable_since: dt})) - refute called(Federator.enqueue(:publish_single_ap, %{inbox: inbox2})) + refute called(Federator.publish_single_ap(%{inbox: inbox2})) end test_with_mock "it federates only to reachable instances via Websub", @@ -123,13 +121,13 @@ test "with relays deactivated, it does not publish to the relay", %{ {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"}) assert called( - Federator.enqueue(:publish_single_websub, %{ + Federator.publish_single_websub(%{ callback: sub2.callback, unreachable_since: dt }) ) - refute called(Federator.enqueue(:publish_single_websub, %{callback: sub1.callback})) + refute called(Federator.publish_single_websub(%{callback: sub1.callback})) end test_with_mock "it federates only to reachable instances via Salmon", @@ -163,13 +161,13 @@ test "with relays deactivated, it does not publish to the relay", %{ CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) assert called( - Federator.enqueue(:publish_single_salmon, %{ + Federator.publish_single_salmon(%{ recipient: remote_user2, unreachable_since: dt }) ) - refute called(Federator.enqueue(:publish_single_websub, %{recipient: remote_user1})) + refute called(Federator.publish_single_websub(%{recipient: remote_user1})) end end From 3f32d7b937a2368707794b55d1256bfa9fa508f4 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Thu, 14 Feb 2019 17:02:47 +0700 Subject: [PATCH 8/8] Fix queue name --- lib/pleroma/web/federator/federator.ex | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 7df75aca6..d4e2a9742 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -38,31 +38,31 @@ def incoming_ap_doc(params) do end def publish(activity, priority \\ 1) do - Jobs.enqueue(:federator_out, __MODULE__, [:publish, activity], priority) + Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) end def publish_single_ap(params) do - Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_ap, params]) + Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_ap, params]) end def publish_single_websub(websub) do - Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_websub, websub]) + Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_websub, websub]) end def verify_websub(websub) do - Jobs.enqueue(:federator_out, __MODULE__, [:verify_websub, websub]) + Jobs.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) end def request_subscription(sub) do - Jobs.enqueue(:federator_out, __MODULE__, [:request_subscription, sub]) + Jobs.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) end def refresh_subscriptions() do - Jobs.enqueue(:federator_out, __MODULE__, [:refresh_subscriptions]) + Jobs.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) end def publish_single_salmon(params) do - Jobs.enqueue(:federator_out, __MODULE__, [:publish_single_salmon, params]) + Jobs.enqueue(:federator_outgoing, __MODULE__, [:publish_single_salmon, params]) end # Job Worker Callbacks