[#1149] Refactored Oban workers API (introduced enqueue/3).

This commit is contained in:
Ivan Tashkinov 2019-08-31 21:58:42 +03:00
parent a90ea8ba15
commit dd017c65a4
23 changed files with 92 additions and 113 deletions

View file

@ -9,14 +9,11 @@ defmodule Pleroma.ActivityExpirationWorker do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Workers.ActivityExpirationWorker
require Logger require Logger
use GenServer use GenServer
import Ecto.Query import Ecto.Query
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
@schedule_interval :timer.minutes(1) @schedule_interval :timer.minutes(1)
def start_link(_) do def start_link(_) do
@ -53,12 +50,10 @@ def perform(:execute, expiration_id) do
def handle_info(:perform, state) do def handle_info(:perform, state) do
ActivityExpiration.due_expirations(@schedule_interval) ActivityExpiration.due_expirations(@schedule_interval)
|> Enum.each(fn expiration -> |> Enum.each(fn expiration ->
%{ Pleroma.Workers.ActivityExpirationWorker.enqueue(
"op" => "activity_expiration", "activity_expiration",
"activity_expiration_id" => expiration.id %{"activity_expiration_id" => expiration.id}
} )
|> ActivityExpirationWorker.new(worker_args(:activity_expiration))
|> Repo.insert()
end) end)
schedule_next() schedule_next()

View file

@ -4,12 +4,10 @@
defmodule Pleroma.DigestEmailWorker do defmodule Pleroma.DigestEmailWorker do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Workers.MailerWorker alias Pleroma.Workers.DigestEmailsWorker
import Ecto.Query import Ecto.Query
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def perform do def perform do
config = Pleroma.Config.get([:email_notifications, :digest]) config = Pleroma.Config.get([:email_notifications, :digest])
negative_interval = -Map.fetch!(config, :interval) negative_interval = -Map.fetch!(config, :interval)
@ -23,11 +21,9 @@ def perform do
where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"), where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
select: u select: u
) )
|> Pleroma.Repo.all() |> Repo.all()
|> Enum.each(fn user -> |> Enum.each(fn user ->
%{"op" => "digest_email", "user_id" => user.id} DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
|> MailerWorker.new([queue: "digest_emails"] ++ worker_args(:digest_emails))
|> Repo.insert()
end) end)
end end

View file

@ -9,7 +9,6 @@ defmodule Pleroma.Emails.Mailer do
The module contains functions to delivery email using Swoosh.Mailer. The module contains functions to delivery email using Swoosh.Mailer.
""" """
alias Pleroma.Repo
alias Pleroma.Workers.MailerWorker alias Pleroma.Workers.MailerWorker
alias Swoosh.DeliveryError alias Swoosh.DeliveryError
@ -19,8 +18,6 @@ defmodule Pleroma.Emails.Mailer do
@spec enabled?() :: boolean() @spec enabled?() :: boolean()
def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled]) def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled])
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
@doc "add email to queue" @doc "add email to queue"
def deliver_async(email, config \\ []) do def deliver_async(email, config \\ []) do
encoded_email = encoded_email =
@ -28,9 +25,7 @@ def deliver_async(email, config \\ []) do
|> :erlang.term_to_binary() |> :erlang.term_to_binary()
|> Base.encode64() |> Base.encode64()
%{"op" => "email", "encoded_email" => encoded_email, "config" => config} MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config})
|> MailerWorker.new(worker_args(:mailer))
|> Repo.insert()
end end
@doc "callback to perform send email from queue" @doc "callback to perform send email from queue"

View file

@ -8,7 +8,6 @@ defmodule Pleroma.ScheduledActivityWorker do
""" """
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Repo
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
@ -18,8 +17,6 @@ defmodule Pleroma.ScheduledActivityWorker do
@schedule_interval :timer.minutes(1) @schedule_interval :timer.minutes(1)
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def start_link(_) do def start_link(_) do
GenServer.start_link(__MODULE__, nil) GenServer.start_link(__MODULE__, nil)
end end
@ -49,9 +46,10 @@ def perform(:execute, scheduled_activity_id) do
def handle_info(:perform, state) do def handle_info(:perform, state) do
ScheduledActivity.due_activities(@schedule_interval) ScheduledActivity.due_activities(@schedule_interval)
|> Enum.each(fn scheduled_activity -> |> Enum.each(fn scheduled_activity ->
%{"op" => "execute", "activity_id" => scheduled_activity.id} Pleroma.Workers.ScheduledActivityWorker.enqueue(
|> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities)) "execute",
|> Repo.insert() %{"activity_id" => scheduled_activity.id}
)
end) end)
schedule_next() schedule_next()

View file

@ -41,8 +41,6 @@ defmodule Pleroma.User do
@strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/ @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
@extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/ @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
schema "users" do schema "users" do
field(:bio, :string) field(:bio, :string)
field(:email, :string) field(:email, :string)
@ -623,9 +621,7 @@ def get_or_fetch_by_nickname(nickname) do
@doc "Fetch some posts when the user has just been federated with" @doc "Fetch some posts when the user has just been federated with"
def fetch_initial_posts(user) do def fetch_initial_posts(user) do
%{"op" => "fetch_initial_posts", "user_id" => user.id} BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
end end
@spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t() @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
@ -1056,9 +1052,7 @@ def unblock_domain(user, domain) do
end end
def deactivate_async(user, status \\ true) do def deactivate_async(user, status \\ true) do
%{"op" => "deactivate_user", "user_id" => user.id, "status" => status} BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
end end
def deactivate(%User{} = user, status \\ true) do def deactivate(%User{} = user, status \\ true) do
@ -1087,9 +1081,7 @@ def update_notification_settings(%User{} = user, settings \\ %{}) do
end end
def delete(%User{} = user) do def delete(%User{} = user) do
%{"op" => "delete_user", "user_id" => user.id} BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
end end
@spec perform(atom(), User.t()) :: {:ok, User.t()} @spec perform(atom(), User.t()) :: {:ok, User.t()}
@ -1198,24 +1190,18 @@ def external_users(opts \\ []) do
end end
def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
%{ BackgroundWorker.enqueue("blocks_import", %{
"op" => "blocks_import",
"blocker_id" => blocker.id, "blocker_id" => blocker.id,
"blocked_identifiers" => blocked_identifiers "blocked_identifiers" => blocked_identifiers
} })
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
end end
def follow_import(%User{} = follower, followed_identifiers) def follow_import(%User{} = follower, followed_identifiers)
when is_list(followed_identifiers) do when is_list(followed_identifiers) do
%{ BackgroundWorker.enqueue("follow_import", %{
"op" => "follow_import",
"follower_id" => follower.id, "follower_id" => follower.id,
"followed_identifiers" => followed_identifiers "followed_identifiers" => followed_identifiers
} })
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
end end
def delete_user_activities(%User{ap_id: ap_id} = user) do def delete_user_activities(%User{ap_id: ap_id} = user) do

View file

@ -26,8 +26,6 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
require Logger require Logger
require Pleroma.Constants require Pleroma.Constants
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
# For Announce activities, we filter the recipients based on following status for any actors # For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary. # that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do defp get_recipients(%{"type" => "Announce"} = data) do
@ -148,9 +146,7 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
activity activity
end end
%{"op" => "fetch_data_for_activity", "activity_id" => activity.id} BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
Notification.create_notifications(activity) Notification.create_notifications(activity)

View file

@ -7,7 +7,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
@behaviour Pleroma.Web.ActivityPub.MRF @behaviour Pleroma.Web.ActivityPub.MRF
alias Pleroma.HTTP alias Pleroma.HTTP
alias Pleroma.Repo
alias Pleroma.Web.MediaProxy alias Pleroma.Web.MediaProxy
alias Pleroma.Workers.BackgroundWorker alias Pleroma.Workers.BackgroundWorker
@ -18,8 +17,6 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
recv_timeout: 10_000 recv_timeout: 10_000
] ]
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def perform(:prefetch, url) do def perform(:prefetch, url) do
Logger.info("Prefetching #{inspect(url)}") Logger.info("Prefetching #{inspect(url)}")
@ -34,9 +31,7 @@ def perform(:preload, %{"object" => %{"attachment" => attachments}} = _message)
url url
|> Enum.each(fn |> Enum.each(fn
%{"href" => href} -> %{"href" => href} ->
%{"op" => "media_proxy_prefetch", "url" => href} BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href})
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
x -> x ->
Logger.debug("Unhandled attachment URL object #{inspect(x)}") Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@ -52,9 +47,7 @@ def filter(
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
) )
when is_list(attachments) and length(attachments) > 0 do when is_list(attachments) and length(attachments) > 0 do
%{"op" => "media_proxy_preload", "message" => message} BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message})
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
{:ok, message} {:ok, message}
end end

View file

@ -22,8 +22,6 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
require Logger require Logger
require Pleroma.Constants require Pleroma.Constants
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
@doc """ @doc """
Modifies an incoming AP object (mastodon format) to our internal format. Modifies an incoming AP object (mastodon format) to our internal format.
""" """
@ -1054,9 +1052,7 @@ def upgrade_user_from_ap_id(ap_id) do
already_ap <- User.ap_enabled?(user), already_ap <- User.ap_enabled?(user),
{:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do
unless already_ap do unless already_ap do
%{"op" => "user_upgrade", "user_id" => user.id} TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
|> TransmogrifierWorker.new(worker_args(:transmogrifier))
|> Repo.insert()
end end
{:ok, user} {:ok, user}

View file

@ -18,8 +18,6 @@ defmodule Pleroma.Web.Federator do
require Logger require Logger
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def init do def init do
# To do: consider removing this call in favor of scheduled execution (`quantum`-based) # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
refresh_subscriptions(schedule_in: 60) refresh_subscriptions(schedule_in: 60)
@ -40,15 +38,11 @@ def allowed_incoming_reply_depth?(depth) do
# Client API # Client API
def incoming_doc(doc) do def incoming_doc(doc) do
%{"op" => "incoming_doc", "body" => doc} ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
|> ReceiverWorker.new(worker_args(:federator_incoming))
|> Pleroma.Repo.insert()
end end
def incoming_ap_doc(params) do def incoming_ap_doc(params) do
%{"op" => "incoming_ap_doc", "params" => params} ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
|> ReceiverWorker.new(worker_args(:federator_incoming))
|> Pleroma.Repo.insert()
end end
def publish(%{id: "pleroma:fakeid"} = activity) do def publish(%{id: "pleroma:fakeid"} = activity) do
@ -56,27 +50,19 @@ def publish(%{id: "pleroma:fakeid"} = activity) do
end end
def publish(activity) do def publish(activity) do
%{"op" => "publish", "activity_id" => activity.id} PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
|> PublisherWorker.new(worker_args(:federator_outgoing))
|> Pleroma.Repo.insert()
end end
def verify_websub(websub) do def verify_websub(websub) do
%{"op" => "verify_websub", "websub_id" => websub.id} SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
|> SubscriberWorker.new(worker_args(:federator_outgoing))
|> Pleroma.Repo.insert()
end end
def request_subscription(websub) do def request_subscription(websub) do
%{"op" => "request_subscription", "websub_id" => websub.id} SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
|> SubscriberWorker.new(worker_args(:federator_outgoing))
|> Pleroma.Repo.insert()
end end
def refresh_subscriptions(worker_args \\ []) do def refresh_subscriptions(worker_args \\ []) do
%{"op" => "refresh_subscriptions"} SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
|> SubscriberWorker.new(worker_args ++ [max_attempts: 1] ++ worker_args(:federator_outgoing))
|> Pleroma.Repo.insert()
end end
# Job Worker Callbacks # Job Worker Callbacks

View file

@ -31,11 +31,10 @@ defmodule Pleroma.Web.Federator.Publisher do
""" """
@spec enqueue_one(module(), Map.t()) :: :ok @spec enqueue_one(module(), Map.t()) :: :ok
def enqueue_one(module, %{} = params) do def enqueue_one(module, %{} = params) do
worker_args = Pleroma.Workers.WorkerHelper.worker_args(:federator_outgoing) PublisherWorker.enqueue(
"publish_one",
%{"op" => "publish_one", "module" => to_string(module), "params" => params} %{"module" => to_string(module), "params" => params}
|> PublisherWorker.new(worker_args) )
|> Pleroma.Repo.insert()
end end
@doc """ @doc """

View file

@ -16,12 +16,9 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
@one_day @one_day
) )
alias Pleroma.Repo
alias Pleroma.Web.OAuth.Token alias Pleroma.Web.OAuth.Token
alias Pleroma.Workers.BackgroundWorker alias Pleroma.Workers.BackgroundWorker
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def start_link(_), do: GenServer.start_link(__MODULE__, %{}) def start_link(_), do: GenServer.start_link(__MODULE__, %{})
def init(_) do def init(_) do
@ -31,9 +28,7 @@ def init(_) do
@doc false @doc false
def handle_info(:perform, state) do def handle_info(:perform, state) do
%{"op" => "clean_expired_tokens"} BackgroundWorker.enqueue("clean_expired_tokens", %{})
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
Process.send_after(self(), :perform, @interval) Process.send_after(self(), :perform, @interval)
{:noreply, state} {:noreply, state}

View file

@ -3,13 +3,10 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Push do defmodule Pleroma.Web.Push do
alias Pleroma.Repo
alias Pleroma.Workers.WebPusherWorker alias Pleroma.Workers.WebPusherWorker
require Logger require Logger
import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def init do def init do
unless enabled() do unless enabled() do
Logger.warn(""" Logger.warn("""
@ -35,8 +32,6 @@ def enabled do
end end
def send(notification) do def send(notification) do
%{"op" => "web_push", "notification_id" => notification.id} WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id})
|> WebPusherWorker.new(worker_args(:web_push))
|> Repo.insert()
end end
end end

View file

@ -8,6 +8,8 @@ defmodule Pleroma.Workers.ActivityExpirationWorker do
queue: "activity_expiration", queue: "activity_expiration",
max_attempts: 1 max_attempts: 1
use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
@impl Oban.Worker @impl Oban.Worker
def perform( def perform(
%{ %{

View file

@ -13,6 +13,8 @@ defmodule Pleroma.Workers.BackgroundWorker do
queue: "background", queue: "background",
max_attempts: 1 max_attempts: 1
use Pleroma.Workers.WorkerHelper, queue: "background"
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id) user = User.get_cached_by_id(user_id)

View file

@ -0,0 +1,21 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.DigestEmailsWorker do
alias Pleroma.User
# Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "digest_emails",
max_attempts: 1
use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
@impl Oban.Worker
def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
user_id
|> User.get_cached_by_id()
|> Pleroma.DigestEmailWorker.perform()
end
end

View file

@ -3,13 +3,13 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.MailerWorker do defmodule Pleroma.Workers.MailerWorker do
alias Pleroma.User
# Note: `max_attempts` is intended to be overridden in `new/2` call # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker, use Oban.Worker,
queue: "mailer", queue: "mailer",
max_attempts: 1 max_attempts: 1
use Pleroma.Workers.WorkerHelper, queue: "mailer"
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
encoded_email encoded_email
@ -17,10 +17,4 @@ def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => con
|> :erlang.binary_to_term() |> :erlang.binary_to_term()
|> Pleroma.Emails.Mailer.deliver(config) |> Pleroma.Emails.Mailer.deliver(config)
end end
def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
user_id
|> User.get_cached_by_id()
|> Pleroma.DigestEmailWorker.perform()
end
end end

View file

@ -11,6 +11,8 @@ defmodule Pleroma.Workers.PublisherWorker do
queue: "federator_outgoing", queue: "federator_outgoing",
max_attempts: 1 max_attempts: 1
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
def backoff(attempt) when is_integer(attempt) do def backoff(attempt) when is_integer(attempt) do
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5) Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
end end

View file

@ -10,6 +10,8 @@ defmodule Pleroma.Workers.ReceiverWorker do
queue: "federator_incoming", queue: "federator_incoming",
max_attempts: 1 max_attempts: 1
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
Federator.perform(:incoming_doc, doc) Federator.perform(:incoming_doc, doc)

View file

@ -8,6 +8,8 @@ defmodule Pleroma.Workers.ScheduledActivityWorker do
queue: "scheduled_activities", queue: "scheduled_activities",
max_attempts: 1 max_attempts: 1
use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
Pleroma.ScheduledActivityWorker.perform(:execute, activity_id) Pleroma.ScheduledActivityWorker.perform(:execute, activity_id)

View file

@ -12,6 +12,8 @@ defmodule Pleroma.Workers.SubscriberWorker do
queue: "federator_outgoing", queue: "federator_outgoing",
max_attempts: 1 max_attempts: 1
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "refresh_subscriptions"}, _job) do def perform(%{"op" => "refresh_subscriptions"}, _job) do
Federator.perform(:refresh_subscriptions) Federator.perform(:refresh_subscriptions)

View file

@ -10,6 +10,8 @@ defmodule Pleroma.Workers.TransmogrifierWorker do
queue: "transmogrifier", queue: "transmogrifier",
max_attempts: 1 max_attempts: 1
use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id) user = User.get_cached_by_id(user_id)

View file

@ -11,6 +11,8 @@ defmodule Pleroma.Workers.WebPusherWorker do
queue: "web_push", queue: "web_push",
max_attempts: 1 max_attempts: 1
use Pleroma.Workers.WorkerHelper, queue: "web_push"
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
notification = Repo.get(Notification, notification_id) notification = Repo.get(Notification, notification_id)

View file

@ -4,6 +4,7 @@
defmodule Pleroma.Workers.WorkerHelper do defmodule Pleroma.Workers.WorkerHelper do
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Workers.WorkerHelper
def worker_args(queue) do def worker_args(queue) do
case Config.get([:workers, :retries, queue]) do case Config.get([:workers, :retries, queue]) do
@ -20,4 +21,21 @@ def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
trunc(backoff) trunc(backoff)
end end
defmacro __using__(opts) do
caller_module = __CALLER__.module
queue = Keyword.fetch!(opts, :queue)
quote do
def enqueue(op, params, worker_args \\ []) do
params = Map.merge(%{"op" => op}, params)
queue_atom = String.to_atom(unquote(queue))
worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
unquote(caller_module)
|> apply(:new, [params, worker_args])
|> Pleroma.Repo.insert()
end
end
end
end end