From dd017c65a4b86501c435f5cb01804300e6b7c6dd Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sat, 31 Aug 2019 21:58:42 +0300 Subject: [PATCH] [#1149] Refactored Oban workers API (introduced `enqueue/3`). --- lib/pleroma/activity_expiration_worker.ex | 13 +++------ lib/pleroma/digest_email_worker.ex | 10 ++----- lib/pleroma/emails/mailer.ex | 7 +---- lib/pleroma/scheduled_activity_worker.ex | 10 +++---- lib/pleroma/user.ex | 28 +++++-------------- lib/pleroma/web/activity_pub/activity_pub.ex | 6 +--- .../mrf/mediaproxy_warming_policy.ex | 11 ++------ .../web/activity_pub/transmogrifier.ex | 6 +--- lib/pleroma/web/federator/federator.ex | 26 ++++------------- lib/pleroma/web/federator/publisher.ex | 9 +++--- lib/pleroma/web/oauth/token/clean_worker.ex | 7 +---- lib/pleroma/web/push/push.ex | 7 +---- .../workers/activity_expiration_worker.ex | 2 ++ lib/pleroma/workers/background_worker.ex | 2 ++ lib/pleroma/workers/digest_emails_worker.ex | 21 ++++++++++++++ lib/pleroma/workers/mailer_worker.ex | 10 ++----- lib/pleroma/workers/publisher_worker.ex | 2 ++ lib/pleroma/workers/receiver_worker.ex | 2 ++ .../workers/scheduled_activity_worker.ex | 2 ++ lib/pleroma/workers/subscriber_worker.ex | 2 ++ lib/pleroma/workers/transmogrifier_worker.ex | 2 ++ lib/pleroma/workers/web_pusher_worker.ex | 2 ++ lib/pleroma/workers/worker_helper.ex | 18 ++++++++++++ 23 files changed, 92 insertions(+), 113 deletions(-) create mode 100644 lib/pleroma/workers/digest_emails_worker.ex diff --git a/lib/pleroma/activity_expiration_worker.ex b/lib/pleroma/activity_expiration_worker.ex index 7aba7eece..c0820c202 100644 --- a/lib/pleroma/activity_expiration_worker.ex +++ b/lib/pleroma/activity_expiration_worker.ex @@ -9,14 +9,11 @@ defmodule Pleroma.ActivityExpirationWorker do alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.CommonAPI - alias Pleroma.Workers.ActivityExpirationWorker require Logger use GenServer import Ecto.Query - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - @schedule_interval :timer.minutes(1) def start_link(_) do @@ -53,12 +50,10 @@ defmodule Pleroma.ActivityExpirationWorker do def handle_info(:perform, state) do ActivityExpiration.due_expirations(@schedule_interval) |> Enum.each(fn expiration -> - %{ - "op" => "activity_expiration", - "activity_expiration_id" => expiration.id - } - |> ActivityExpirationWorker.new(worker_args(:activity_expiration)) - |> Repo.insert() + Pleroma.Workers.ActivityExpirationWorker.enqueue( + "activity_expiration", + %{"activity_expiration_id" => expiration.id} + ) end) schedule_next() diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/digest_email_worker.ex index 4ab2a4ef4..5be7cf26b 100644 --- a/lib/pleroma/digest_email_worker.ex +++ b/lib/pleroma/digest_email_worker.ex @@ -4,12 +4,10 @@ defmodule Pleroma.DigestEmailWorker do alias Pleroma.Repo - alias Pleroma.Workers.MailerWorker + alias Pleroma.Workers.DigestEmailsWorker import Ecto.Query - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - def perform do config = Pleroma.Config.get([:email_notifications, :digest]) negative_interval = -Map.fetch!(config, :interval) @@ -23,11 +21,9 @@ defmodule Pleroma.DigestEmailWorker do where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"), select: u ) - |> Pleroma.Repo.all() + |> Repo.all() |> Enum.each(fn user -> - %{"op" => "digest_email", "user_id" => user.id} - |> MailerWorker.new([queue: "digest_emails"] ++ worker_args(:digest_emails)) - |> Repo.insert() + DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id}) end) end diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex index 9cbe7313c..eb96f2e8b 100644 --- a/lib/pleroma/emails/mailer.ex +++ b/lib/pleroma/emails/mailer.ex @@ -9,7 +9,6 @@ defmodule Pleroma.Emails.Mailer do The module contains functions to delivery email using Swoosh.Mailer. """ - alias Pleroma.Repo alias Pleroma.Workers.MailerWorker alias Swoosh.DeliveryError @@ -19,8 +18,6 @@ defmodule Pleroma.Emails.Mailer do @spec enabled?() :: boolean() def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled]) - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - @doc "add email to queue" def deliver_async(email, config \\ []) do encoded_email = @@ -28,9 +25,7 @@ defmodule Pleroma.Emails.Mailer do |> :erlang.term_to_binary() |> Base.encode64() - %{"op" => "email", "encoded_email" => encoded_email, "config" => config} - |> MailerWorker.new(worker_args(:mailer)) - |> Repo.insert() + MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config}) end @doc "callback to perform send email from queue" diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/scheduled_activity_worker.ex index 8bf534f42..c41a542de 100644 --- a/lib/pleroma/scheduled_activity_worker.ex +++ b/lib/pleroma/scheduled_activity_worker.ex @@ -8,7 +8,6 @@ defmodule Pleroma.ScheduledActivityWorker do """ alias Pleroma.Config - alias Pleroma.Repo alias Pleroma.ScheduledActivity alias Pleroma.User alias Pleroma.Web.CommonAPI @@ -18,8 +17,6 @@ defmodule Pleroma.ScheduledActivityWorker do @schedule_interval :timer.minutes(1) - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - def start_link(_) do GenServer.start_link(__MODULE__, nil) end @@ -49,9 +46,10 @@ defmodule Pleroma.ScheduledActivityWorker do def handle_info(:perform, state) do ScheduledActivity.due_activities(@schedule_interval) |> Enum.each(fn scheduled_activity -> - %{"op" => "execute", "activity_id" => scheduled_activity.id} - |> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities)) - |> Repo.insert() + Pleroma.Workers.ScheduledActivityWorker.enqueue( + "execute", + %{"activity_id" => scheduled_activity.id} + ) end) schedule_next() diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex index abfa063fb..2fe7e1748 100644 --- a/lib/pleroma/user.ex +++ b/lib/pleroma/user.ex @@ -41,8 +41,6 @@ defmodule Pleroma.User do @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/ @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/ - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - schema "users" do field(:bio, :string) field(:email, :string) @@ -623,9 +621,7 @@ defmodule Pleroma.User do @doc "Fetch some posts when the user has just been federated with" def fetch_initial_posts(user) do - %{"op" => "fetch_initial_posts", "user_id" => user.id} - |> BackgroundWorker.new(worker_args(:background)) - |> Repo.insert() + BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id}) end @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t() @@ -1056,9 +1052,7 @@ defmodule Pleroma.User do end def deactivate_async(user, status \\ true) do - %{"op" => "deactivate_user", "user_id" => user.id, "status" => status} - |> BackgroundWorker.new(worker_args(:background)) - |> Repo.insert() + BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status}) end def deactivate(%User{} = user, status \\ true) do @@ -1087,9 +1081,7 @@ defmodule Pleroma.User do end def delete(%User{} = user) do - %{"op" => "delete_user", "user_id" => user.id} - |> BackgroundWorker.new(worker_args(:background)) - |> Repo.insert() + BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id}) end @spec perform(atom(), User.t()) :: {:ok, User.t()} @@ -1198,24 +1190,18 @@ defmodule Pleroma.User do end def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do - %{ - "op" => "blocks_import", + BackgroundWorker.enqueue("blocks_import", %{ "blocker_id" => blocker.id, "blocked_identifiers" => blocked_identifiers - } - |> BackgroundWorker.new(worker_args(:background)) - |> Repo.insert() + }) end def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers) do - %{ - "op" => "follow_import", + BackgroundWorker.enqueue("follow_import", %{ "follower_id" => follower.id, "followed_identifiers" => followed_identifiers - } - |> BackgroundWorker.new(worker_args(:background)) - |> Repo.insert() + }) end def delete_user_activities(%User{ap_id: ap_id} = user) do diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex index 74c5eb91c..90b409606 100644 --- a/lib/pleroma/web/activity_pub/activity_pub.ex +++ b/lib/pleroma/web/activity_pub/activity_pub.ex @@ -26,8 +26,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do require Logger require Pleroma.Constants - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - # For Announce activities, we filter the recipients based on following status for any actors # that match actual users. See issue #164 for more information about why this is necessary. defp get_recipients(%{"type" => "Announce"} = data) do @@ -148,9 +146,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do activity end - %{"op" => "fetch_data_for_activity", "activity_id" => activity.id} - |> BackgroundWorker.new(worker_args(:background)) - |> Repo.insert() + BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) Notification.create_notifications(activity) diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex index 178321558..26b8539fe 100644 --- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex +++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex @@ -7,7 +7,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do @behaviour Pleroma.Web.ActivityPub.MRF alias Pleroma.HTTP - alias Pleroma.Repo alias Pleroma.Web.MediaProxy alias Pleroma.Workers.BackgroundWorker @@ -18,8 +17,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do recv_timeout: 10_000 ] - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - def perform(:prefetch, url) do Logger.info("Prefetching #{inspect(url)}") @@ -34,9 +31,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do url |> Enum.each(fn %{"href" => href} -> - %{"op" => "media_proxy_prefetch", "url" => href} - |> BackgroundWorker.new(worker_args(:background)) - |> Repo.insert() + BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href}) x -> Logger.debug("Unhandled attachment URL object #{inspect(x)}") @@ -52,9 +47,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message ) when is_list(attachments) and length(attachments) > 0 do - %{"op" => "media_proxy_preload", "message" => message} - |> BackgroundWorker.new(worker_args(:background)) - |> Repo.insert() + BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message}) {:ok, message} end diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex index 9437f9a16..f27455e8b 100644 --- a/lib/pleroma/web/activity_pub/transmogrifier.ex +++ b/lib/pleroma/web/activity_pub/transmogrifier.ex @@ -22,8 +22,6 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do require Logger require Pleroma.Constants - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - @doc """ Modifies an incoming AP object (mastodon format) to our internal format. """ @@ -1054,9 +1052,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do already_ap <- User.ap_enabled?(user), {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do unless already_ap do - %{"op" => "user_upgrade", "user_id" => user.id} - |> TransmogrifierWorker.new(worker_args(:transmogrifier)) - |> Repo.insert() + TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id}) end {:ok, user} diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex index 8f43066e3..1a2da014a 100644 --- a/lib/pleroma/web/federator/federator.ex +++ b/lib/pleroma/web/federator/federator.ex @@ -18,8 +18,6 @@ defmodule Pleroma.Web.Federator do require Logger - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - def init do # To do: consider removing this call in favor of scheduled execution (`quantum`-based) refresh_subscriptions(schedule_in: 60) @@ -40,15 +38,11 @@ defmodule Pleroma.Web.Federator do # Client API def incoming_doc(doc) do - %{"op" => "incoming_doc", "body" => doc} - |> ReceiverWorker.new(worker_args(:federator_incoming)) - |> Pleroma.Repo.insert() + ReceiverWorker.enqueue("incoming_doc", %{"body" => doc}) end def incoming_ap_doc(params) do - %{"op" => "incoming_ap_doc", "params" => params} - |> ReceiverWorker.new(worker_args(:federator_incoming)) - |> Pleroma.Repo.insert() + ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params}) end def publish(%{id: "pleroma:fakeid"} = activity) do @@ -56,27 +50,19 @@ defmodule Pleroma.Web.Federator do end def publish(activity) do - %{"op" => "publish", "activity_id" => activity.id} - |> PublisherWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + PublisherWorker.enqueue("publish", %{"activity_id" => activity.id}) end def verify_websub(websub) do - %{"op" => "verify_websub", "websub_id" => websub.id} - |> SubscriberWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id}) end def request_subscription(websub) do - %{"op" => "request_subscription", "websub_id" => websub.id} - |> SubscriberWorker.new(worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id}) end def refresh_subscriptions(worker_args \\ []) do - %{"op" => "refresh_subscriptions"} - |> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing)) - |> Pleroma.Repo.insert() + SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1]) end # Job Worker Callbacks diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex index 42be109ab..937064638 100644 --- a/lib/pleroma/web/federator/publisher.ex +++ b/lib/pleroma/web/federator/publisher.ex @@ -31,11 +31,10 @@ defmodule Pleroma.Web.Federator.Publisher do """ @spec enqueue_one(module(), Map.t()) :: :ok def enqueue_one(module, %{} = params) do - worker_args = Pleroma.Workers.WorkerHelper.worker_args(:federator_outgoing) - - %{"op" => "publish_one", "module" => to_string(module), "params" => params} - |> PublisherWorker.new(worker_args) - |> Pleroma.Repo.insert() + PublisherWorker.enqueue( + "publish_one", + %{"module" => to_string(module), "params" => params} + ) end @doc """ diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex index b150a68a7..eb94bf86f 100644 --- a/lib/pleroma/web/oauth/token/clean_worker.ex +++ b/lib/pleroma/web/oauth/token/clean_worker.ex @@ -16,12 +16,9 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do @one_day ) - alias Pleroma.Repo alias Pleroma.Web.OAuth.Token alias Pleroma.Workers.BackgroundWorker - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - def start_link(_), do: GenServer.start_link(__MODULE__, %{}) def init(_) do @@ -31,9 +28,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do @doc false def handle_info(:perform, state) do - %{"op" => "clean_expired_tokens"} - |> BackgroundWorker.new(worker_args(:background)) - |> Repo.insert() + BackgroundWorker.enqueue("clean_expired_tokens", %{}) Process.send_after(self(), :perform, @interval) {:noreply, state} diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex index 4973b529c..7ef1532ac 100644 --- a/lib/pleroma/web/push/push.ex +++ b/lib/pleroma/web/push/push.ex @@ -3,13 +3,10 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.Push do - alias Pleroma.Repo alias Pleroma.Workers.WebPusherWorker require Logger - import Pleroma.Workers.WorkerHelper, only: [worker_args: 1] - def init do unless enabled() do Logger.warn(""" @@ -35,8 +32,6 @@ defmodule Pleroma.Web.Push do end def send(notification) do - %{"op" => "web_push", "notification_id" => notification.id} - |> WebPusherWorker.new(worker_args(:web_push)) - |> Repo.insert() + WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id}) end end diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex index 0b491eabb..60dd3feba 100644 --- a/lib/pleroma/workers/activity_expiration_worker.ex +++ b/lib/pleroma/workers/activity_expiration_worker.ex @@ -8,6 +8,8 @@ defmodule Pleroma.Workers.ActivityExpirationWorker do queue: "activity_expiration", max_attempts: 1 + use Pleroma.Workers.WorkerHelper, queue: "activity_expiration" + @impl Oban.Worker def perform( %{ diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex index 7b5575a5f..b9aef3a92 100644 --- a/lib/pleroma/workers/background_worker.ex +++ b/lib/pleroma/workers/background_worker.ex @@ -13,6 +13,8 @@ defmodule Pleroma.Workers.BackgroundWorker do queue: "background", max_attempts: 1 + use Pleroma.Workers.WorkerHelper, queue: "background" + @impl Oban.Worker def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do user = User.get_cached_by_id(user_id) diff --git a/lib/pleroma/workers/digest_emails_worker.ex b/lib/pleroma/workers/digest_emails_worker.ex new file mode 100644 index 000000000..ca073ce67 --- /dev/null +++ b/lib/pleroma/workers/digest_emails_worker.ex @@ -0,0 +1,21 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Workers.DigestEmailsWorker do + alias Pleroma.User + + # Note: `max_attempts` is intended to be overridden in `new/2` call + use Oban.Worker, + queue: "digest_emails", + max_attempts: 1 + + use Pleroma.Workers.WorkerHelper, queue: "digest_emails" + + @impl Oban.Worker + def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do + user_id + |> User.get_cached_by_id() + |> Pleroma.DigestEmailWorker.perform() + end +end diff --git a/lib/pleroma/workers/mailer_worker.ex b/lib/pleroma/workers/mailer_worker.ex index 4f73d61bc..a4bd54a6c 100644 --- a/lib/pleroma/workers/mailer_worker.ex +++ b/lib/pleroma/workers/mailer_worker.ex @@ -3,13 +3,13 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Workers.MailerWorker do - alias Pleroma.User - # Note: `max_attempts` is intended to be overridden in `new/2` call use Oban.Worker, queue: "mailer", max_attempts: 1 + use Pleroma.Workers.WorkerHelper, queue: "mailer" + @impl Oban.Worker def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do encoded_email @@ -17,10 +17,4 @@ defmodule Pleroma.Workers.MailerWorker do |> :erlang.binary_to_term() |> Pleroma.Emails.Mailer.deliver(config) end - - def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do - user_id - |> User.get_cached_by_id() - |> Pleroma.DigestEmailWorker.perform() - end end diff --git a/lib/pleroma/workers/publisher_worker.ex b/lib/pleroma/workers/publisher_worker.ex index 5671d2a29..a3ac22635 100644 --- a/lib/pleroma/workers/publisher_worker.ex +++ b/lib/pleroma/workers/publisher_worker.ex @@ -11,6 +11,8 @@ defmodule Pleroma.Workers.PublisherWorker do queue: "federator_outgoing", max_attempts: 1 + use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" + def backoff(attempt) when is_integer(attempt) do Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5) end diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index cdce630f2..3cc415ce4 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -10,6 +10,8 @@ defmodule Pleroma.Workers.ReceiverWorker do queue: "federator_incoming", max_attempts: 1 + use Pleroma.Workers.WorkerHelper, queue: "federator_incoming" + @impl Oban.Worker def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do Federator.perform(:incoming_doc, doc) diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex index 4094411ae..936bb64d3 100644 --- a/lib/pleroma/workers/scheduled_activity_worker.ex +++ b/lib/pleroma/workers/scheduled_activity_worker.ex @@ -8,6 +8,8 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do queue: "scheduled_activities", max_attempts: 1 + use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities" + @impl Oban.Worker def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do Pleroma.ScheduledActivityWorker.perform(:execute, activity_id) diff --git a/lib/pleroma/workers/subscriber_worker.ex b/lib/pleroma/workers/subscriber_worker.ex index 22d1dc956..4fb994554 100644 --- a/lib/pleroma/workers/subscriber_worker.ex +++ b/lib/pleroma/workers/subscriber_worker.ex @@ -12,6 +12,8 @@ defmodule Pleroma.Workers.SubscriberWorker do queue: "federator_outgoing", max_attempts: 1 + use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing" + @impl Oban.Worker def perform(%{"op" => "refresh_subscriptions"}, _job) do Federator.perform(:refresh_subscriptions) diff --git a/lib/pleroma/workers/transmogrifier_worker.ex b/lib/pleroma/workers/transmogrifier_worker.ex index 6f5c1a2f2..6fecc2bf9 100644 --- a/lib/pleroma/workers/transmogrifier_worker.ex +++ b/lib/pleroma/workers/transmogrifier_worker.ex @@ -10,6 +10,8 @@ defmodule Pleroma.Workers.TransmogrifierWorker do queue: "transmogrifier", max_attempts: 1 + use Pleroma.Workers.WorkerHelper, queue: "transmogrifier" + @impl Oban.Worker def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do user = User.get_cached_by_id(user_id) diff --git a/lib/pleroma/workers/web_pusher_worker.ex b/lib/pleroma/workers/web_pusher_worker.ex index 2b1d3b99a..4c2591a5c 100644 --- a/lib/pleroma/workers/web_pusher_worker.ex +++ b/lib/pleroma/workers/web_pusher_worker.ex @@ -11,6 +11,8 @@ defmodule Pleroma.Workers.WebPusherWorker do queue: "web_push", max_attempts: 1 + use Pleroma.Workers.WorkerHelper, queue: "web_push" + @impl Oban.Worker def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do notification = Repo.get(Notification, notification_id) diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex index f9ed2e64d..b12f198d4 100644 --- a/lib/pleroma/workers/worker_helper.ex +++ b/lib/pleroma/workers/worker_helper.ex @@ -4,6 +4,7 @@ defmodule Pleroma.Workers.WorkerHelper do alias Pleroma.Config + alias Pleroma.Workers.WorkerHelper def worker_args(queue) do case Config.get([:workers, :retries, queue]) do @@ -20,4 +21,21 @@ defmodule Pleroma.Workers.WorkerHelper do trunc(backoff) end + + defmacro __using__(opts) do + caller_module = __CALLER__.module + queue = Keyword.fetch!(opts, :queue) + + quote do + def enqueue(op, params, worker_args \\ []) do + params = Map.merge(%{"op" => op}, params) + queue_atom = String.to_atom(unquote(queue)) + worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom) + + unquote(caller_module) + |> apply(:new, [params, worker_args]) + |> Pleroma.Repo.insert() + end + end + end end