diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8b73c783f..c9d6fef17 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,7 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Mastodon API: Unsubscribe followers when they unfollow a user
- AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses)
- Improve digest email template
-- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) with [Oban](https://github.com/sorentwo/oban)
+- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) and `Pleroma.Web.Federator.RetryQueue` with [Oban](https://github.com/sorentwo/oban) (see [`docs/config.md`](docs/config.md) on migrating customized worker / retry settings).
- Introduced [quantum](https://github.com/quantum-elixir/quantum-core) job scheduler
### Fixed
diff --git a/config/config.exs b/config/config.exs
index da89aa3e9..6fb4a0969 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -470,7 +470,7 @@
config :pleroma, Oban,
repo: Pleroma.Repo,
verbose: false,
- prune: {:maxage, 60 * 60 * 24 * 7},
+ prune: {:maxlen, 1500},
queues: [
activity_expiration: 10,
federator_incoming: 50,
diff --git a/config/test.exs b/config/test.exs
index 0ef809ac1..df512b5d7 100644
--- a/config/test.exs
+++ b/config/test.exs
@@ -65,6 +65,8 @@
queues: false,
prune: :disabled
+config :pleroma, Pleroma.Scheduler, jobs: []
+
config :pleroma, Pleroma.ScheduledActivity,
daily_user_limit: 2,
total_user_limit: 3,
diff --git a/docs/config.md b/docs/config.md
index 2e351e272..29a4d4c97 100644
--- a/docs/config.md
+++ b/docs/config.md
@@ -404,20 +404,29 @@ curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerando
[Oban](https://github.com/sorentwo/oban) asynchronous job processor configuration.
+Configuration options described in [Oban readme](https://github.com/sorentwo/oban#usage):
+* `repo` - app's Ecto repo (`Pleroma.Repo`)
+* `verbose` - logs verbosity
+* `prune` - non-retryable jobs [pruning settings](https://github.com/sorentwo/oban#pruning) (`:disabled` / `{:maxlen, value}` / `{:maxage, value}`)
+* `queues` - job queues (see below)
+
Pleroma has the following queues:
+* `activity_expiration` - Activity expiration
* `federator_outgoing` - Outgoing federation
* `federator_incoming` - Incoming federation
-* `mailer` - Email sender, see [`Pleroma.Emails.Mailer`](#pleroma-emails-mailer)
+* `mailer` - Email sender, see [`Pleroma.Emails.Mailer`](#pleromaemailsmailer)
* `transmogrifier` - Transmogrifier
* `web_push` - Web push notifications
-* `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivities`](#pleromascheduledactivity)
+* `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivity`](#pleromascheduledactivity)
Example:
```elixir
config :pleroma, Oban,
repo: Pleroma.Repo,
+ verbose: false,
+ prune: {:maxlen, 1500},
queues: [
federator_incoming: 50,
federator_outgoing: 50
@@ -426,12 +435,37 @@ config :pleroma, Oban,
This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the number of max concurrent jobs set to `50`.
+### Migrating `pleroma_job_queue` settings
+
+`config :pleroma_job_queue, :queues` is replaced by `config :pleroma, Oban, :queues` and uses the same format (keys are queues' names, values are max concurrent jobs numbers).
+
+### Note on running with PostgreSQL in silent mode
+
+If you are running PostgreSQL in [`silent_mode`](https://postgresqlco.nf/en/doc/param/silent_mode?version=9.1), it's advised to set [`log_destination`](https://postgresqlco.nf/en/doc/param/log_destination?version=9.1) to `syslog`,
+otherwise `postmaster.log` file may grow because of "you don't own a lock of type ShareLock" warnings (see https://github.com/sorentwo/oban/issues/52).
+
## :workers
Includes custom worker options not interpretable directly by `Oban`.
* `retries` — keyword lists where keys are `Oban` queues (see above) and values are numbers of max attempts for failed jobs.
+Example:
+
+```elixir
+config :pleroma, :workers,
+ retries: [
+ federator_incoming: 5,
+ federator_outgoing: 5
+ ]
+```
+
+### Migrating `Pleroma.Web.Federator.RetryQueue` settings
+
+* `max_retries` is replaced with `config :pleroma, :workers, retries: [federator_outgoing: 5]`
+* `enabled: false` corresponds to `config :pleroma, :workers, retries: [federator_outgoing: 1]`
+* deprecated options: `max_jobs`, `initial_timeout`
+
## Pleroma.Web.Metadata
* `providers`: a list of metadata providers to enable. Providers available:
* Pleroma.Web.Metadata.Providers.OpenGraph
@@ -491,6 +525,24 @@ config :auto_linker,
]
```
+## Pleroma.Scheduler
+
+Configuration for [Quantum](https://github.com/quantum-elixir/quantum-core) jobs scheduler.
+
+See [Quantum readme](https://github.com/quantum-elixir/quantum-core#usage) for the list of supported options.
+
+Example:
+
+```elixir
+config :pleroma, Pleroma.Scheduler,
+ global: true,
+ overlap: true,
+ timezone: :utc,
+ jobs: [{"0 */6 * * * *", {Pleroma.Web.Websub, :refresh_subscriptions, []}}]
+```
+
+The above example defines a single job which invokes `Pleroma.Web.Websub.refresh_subscriptions()` every 6 hours ("0 */6 * * * *", [crontab format](https://en.wikipedia.org/wiki/Cron)).
+
## Pleroma.ScheduledActivity
* `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day (Default: `25`)
diff --git a/lib/pleroma/activity_expiration_worker.ex b/lib/pleroma/activity_expiration_worker.ex
index 5c0c53232..7aba7eece 100644
--- a/lib/pleroma/activity_expiration_worker.ex
+++ b/lib/pleroma/activity_expiration_worker.ex
@@ -9,13 +9,13 @@ defmodule Pleroma.ActivityExpirationWorker do
alias Pleroma.Repo
alias Pleroma.User
alias Pleroma.Web.CommonAPI
- alias Pleroma.Workers.BackgroundWorker
+ alias Pleroma.Workers.ActivityExpirationWorker
require Logger
use GenServer
import Ecto.Query
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
@schedule_interval :timer.minutes(1)
@@ -57,7 +57,7 @@ def handle_info(:perform, state) do
"op" => "activity_expiration",
"activity_expiration_id" => expiration.id
}
- |> BackgroundWorker.new(worker_args(:activity_expiration))
+ |> ActivityExpirationWorker.new(worker_args(:activity_expiration))
|> Repo.insert()
end)
diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 7d38ed5c4..f8f866dbd 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -43,7 +43,7 @@ def start(_type, _args) do
hackney_pool_children() ++
[
Pleroma.Stats,
- {Oban, Application.get_env(:pleroma, Oban)},
+ {Oban, Pleroma.Config.get(Oban)},
%{
id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
diff --git a/lib/pleroma/digest_email_worker.ex b/lib/pleroma/digest_email_worker.ex
index ffc48bfab..4ab2a4ef4 100644
--- a/lib/pleroma/digest_email_worker.ex
+++ b/lib/pleroma/digest_email_worker.ex
@@ -4,11 +4,11 @@
defmodule Pleroma.DigestEmailWorker do
alias Pleroma.Repo
- alias Pleroma.Workers.Mailer, as: MailerWorker
+ alias Pleroma.Workers.MailerWorker
import Ecto.Query
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def perform do
config = Pleroma.Config.get([:email_notifications, :digest])
diff --git a/lib/pleroma/emails/mailer.ex b/lib/pleroma/emails/mailer.ex
index bb534f602..9cbe7313c 100644
--- a/lib/pleroma/emails/mailer.ex
+++ b/lib/pleroma/emails/mailer.ex
@@ -10,7 +10,7 @@ defmodule Pleroma.Emails.Mailer do
"""
alias Pleroma.Repo
- alias Pleroma.Workers.Mailer, as: MailerWorker
+ alias Pleroma.Workers.MailerWorker
alias Swoosh.DeliveryError
@otp_app :pleroma
@@ -19,7 +19,7 @@ defmodule Pleroma.Emails.Mailer do
@spec enabled?() :: boolean()
def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled])
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
@doc "add email to queue"
def deliver_async(email, config \\ []) do
diff --git a/lib/pleroma/scheduled_activity_worker.ex b/lib/pleroma/scheduled_activity_worker.ex
index a01fb4fcb..8bf534f42 100644
--- a/lib/pleroma/scheduled_activity_worker.ex
+++ b/lib/pleroma/scheduled_activity_worker.ex
@@ -18,7 +18,7 @@ defmodule Pleroma.ScheduledActivityWorker do
@schedule_interval :timer.minutes(1)
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def start_link(_) do
GenServer.start_link(__MODULE__, nil)
diff --git a/lib/pleroma/user.ex b/lib/pleroma/user.ex
index 18bba0fbb..abfa063fb 100644
--- a/lib/pleroma/user.ex
+++ b/lib/pleroma/user.ex
@@ -41,7 +41,7 @@ defmodule Pleroma.User do
@strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
@extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
schema "users" do
field(:bio, :string)
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 50279cca5..74c5eb91c 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -26,7 +26,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
require Logger
require Pleroma.Constants
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
# For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary.
diff --git a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
index b188164ee..178321558 100644
--- a/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
+++ b/lib/pleroma/web/activity_pub/mrf/mediaproxy_warming_policy.ex
@@ -18,7 +18,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
recv_timeout: 10_000
]
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def perform(:prefetch, url) do
Logger.info("Prefetching #{inspect(url)}")
diff --git a/lib/pleroma/web/activity_pub/publisher.ex b/lib/pleroma/web/activity_pub/publisher.ex
index 24d101dc8..a6322e25a 100644
--- a/lib/pleroma/web/activity_pub/publisher.ex
+++ b/lib/pleroma/web/activity_pub/publisher.ex
@@ -85,7 +85,7 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa
end
def publish_one(%{actor_id: actor_id} = params) do
- actor = User.get_by_id(actor_id)
+ actor = User.get_cached_by_id(actor_id)
params
|> Map.delete(:actor_id)
diff --git a/lib/pleroma/web/activity_pub/transmogrifier.ex b/lib/pleroma/web/activity_pub/transmogrifier.ex
index b068d28a7..9437f9a16 100644
--- a/lib/pleroma/web/activity_pub/transmogrifier.ex
+++ b/lib/pleroma/web/activity_pub/transmogrifier.ex
@@ -15,14 +15,14 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator
- alias Pleroma.Workers.Transmogrifier, as: TransmogrifierWorker
+ alias Pleroma.Workers.TransmogrifierWorker
import Ecto.Query
require Logger
require Pleroma.Constants
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
@doc """
Modifies an incoming AP object (mastodon format) to our internal format.
diff --git a/lib/pleroma/web/federator/federator.ex b/lib/pleroma/web/federator/federator.ex
index cf7e50fee..8f43066e3 100644
--- a/lib/pleroma/web/federator/federator.ex
+++ b/lib/pleroma/web/federator/federator.ex
@@ -12,13 +12,13 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.Web.Federator.Publisher
alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub
- alias Pleroma.Workers.Publisher, as: PublisherWorker
- alias Pleroma.Workers.Receiver, as: ReceiverWorker
- alias Pleroma.Workers.Subscriber, as: SubscriberWorker
+ alias Pleroma.Workers.PublisherWorker
+ alias Pleroma.Workers.ReceiverWorker
+ alias Pleroma.Workers.SubscriberWorker
require Logger
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def init do
# To do: consider removing this call in favor of scheduled execution (`quantum`-based)
diff --git a/lib/pleroma/web/federator/publisher.ex b/lib/pleroma/web/federator/publisher.ex
index 05d2be615..42be109ab 100644
--- a/lib/pleroma/web/federator/publisher.ex
+++ b/lib/pleroma/web/federator/publisher.ex
@@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.User
- alias Pleroma.Workers.Publisher, as: PublisherWorker
+ alias Pleroma.Workers.PublisherWorker
require Logger
@@ -31,12 +31,7 @@ defmodule Pleroma.Web.Federator.Publisher do
"""
@spec enqueue_one(module(), Map.t()) :: :ok
def enqueue_one(module, %{} = params) do
- worker_args =
- if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do
- [max_attempts: max_attempts]
- else
- []
- end
+ worker_args = Pleroma.Workers.WorkerHelper.worker_args(:federator_outgoing)
%{"op" => "publish_one", "module" => to_string(module), "params" => params}
|> PublisherWorker.new(worker_args)
diff --git a/lib/pleroma/web/oauth/token/clean_worker.ex b/lib/pleroma/web/oauth/token/clean_worker.ex
index 943e73289..b150a68a7 100644
--- a/lib/pleroma/web/oauth/token/clean_worker.ex
+++ b/lib/pleroma/web/oauth/token/clean_worker.ex
@@ -20,7 +20,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
alias Pleroma.Web.OAuth.Token
alias Pleroma.Workers.BackgroundWorker
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def start_link(_), do: GenServer.start_link(__MODULE__, %{})
diff --git a/lib/pleroma/web/push/push.ex b/lib/pleroma/web/push/push.ex
index b4f0e5127..4973b529c 100644
--- a/lib/pleroma/web/push/push.ex
+++ b/lib/pleroma/web/push/push.ex
@@ -4,11 +4,11 @@
defmodule Pleroma.Web.Push do
alias Pleroma.Repo
- alias Pleroma.Workers.WebPusher
+ alias Pleroma.Workers.WebPusherWorker
require Logger
- defdelegate worker_args(queue), to: Pleroma.Workers.Helper
+ import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def init do
unless enabled() do
@@ -36,7 +36,7 @@ def enabled do
def send(notification) do
%{"op" => "web_push", "notification_id" => notification.id}
- |> WebPusher.new(worker_args(:web_push))
+ |> WebPusherWorker.new(worker_args(:web_push))
|> Repo.insert()
end
end
diff --git a/lib/pleroma/web/salmon/salmon.ex b/lib/pleroma/web/salmon/salmon.ex
index bbaa293fd..8ba7380c0 100644
--- a/lib/pleroma/web/salmon/salmon.ex
+++ b/lib/pleroma/web/salmon/salmon.ex
@@ -171,7 +171,7 @@ def publish_one(%{recipient: url, feed: feed} = params) when is_binary(url) do
end
def publish_one(%{recipient_id: recipient_id} = params) do
- recipient = User.get_by_id(recipient_id)
+ recipient = User.get_cached_by_id(recipient_id)
params
|> Map.delete(:recipient_id)
diff --git a/lib/pleroma/workers/activity_expiration_worker.ex b/lib/pleroma/workers/activity_expiration_worker.ex
new file mode 100644
index 000000000..0b491eabb
--- /dev/null
+++ b/lib/pleroma/workers/activity_expiration_worker.ex
@@ -0,0 +1,21 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.ActivityExpirationWorker do
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
+ use Oban.Worker,
+ queue: "activity_expiration",
+ max_attempts: 1
+
+ @impl Oban.Worker
+ def perform(
+ %{
+ "op" => "activity_expiration",
+ "activity_expiration_id" => activity_expiration_id
+ },
+ _job
+ ) do
+ Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id)
+ end
+end
diff --git a/lib/pleroma/workers/background_worker.ex b/lib/pleroma/workers/background_worker.ex
index fbce7d789..7b5575a5f 100644
--- a/lib/pleroma/workers/background_worker.ex
+++ b/lib/pleroma/workers/background_worker.ex
@@ -8,24 +8,24 @@ defmodule Pleroma.Workers.BackgroundWorker do
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
alias Pleroma.Web.OAuth.Token.CleanWorker
- # Note: `max_attempts` is intended to be overridden in `new/1` call
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "background",
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
- user = User.get_by_id(user_id)
+ user = User.get_cached_by_id(user_id)
User.perform(:fetch_initial_posts, user)
end
def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
- user = User.get_by_id(user_id)
+ user = User.get_cached_by_id(user_id)
User.perform(:deactivate_async, user, status)
end
def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
- user = User.get_by_id(user_id)
+ user = User.get_cached_by_id(user_id)
User.perform(:delete, user)
end
@@ -37,7 +37,7 @@ def perform(
},
_job
) do
- blocker = User.get_by_id(blocker_id)
+ blocker = User.get_cached_by_id(blocker_id)
User.perform(:blocks_import, blocker, blocked_identifiers)
end
@@ -49,7 +49,7 @@ def perform(
},
_job
) do
- follower = User.get_by_id(follower_id)
+ follower = User.get_cached_by_id(follower_id)
User.perform(:follow_import, follower, followed_identifiers)
end
@@ -69,11 +69,4 @@ def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id},
activity = Activity.get_by_id(activity_id)
Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
end
-
- def perform(
- %{"op" => "activity_expiration", "activity_expiration_id" => activity_expiration_id},
- _job
- ) do
- Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id)
- end
end
diff --git a/lib/pleroma/workers/helper.ex b/lib/pleroma/workers/helper.ex
deleted file mode 100644
index 3286ce0e8..000000000
--- a/lib/pleroma/workers/helper.ex
+++ /dev/null
@@ -1,13 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2019 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Workers.Helper do
- def worker_args(queue) do
- if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
- [max_attempts: max_attempts]
- else
- []
- end
- end
-end
diff --git a/lib/pleroma/workers/mailer.ex b/lib/pleroma/workers/mailer_worker.ex
similarity index 58%
rename from lib/pleroma/workers/mailer.ex
rename to lib/pleroma/workers/mailer_worker.ex
index 1cce2ea03..4f73d61bc 100644
--- a/lib/pleroma/workers/mailer.ex
+++ b/lib/pleroma/workers/mailer_worker.ex
@@ -2,26 +2,25 @@
# Copyright © 2017-2019 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Workers.Mailer do
+defmodule Pleroma.Workers.MailerWorker do
alias Pleroma.User
- # Note: `max_attempts` is intended to be overridden in `new/1` call
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "mailer",
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
- email =
- encoded_email
- |> Base.decode64!()
- |> :erlang.binary_to_term()
-
- Pleroma.Emails.Mailer.deliver(email, config)
+ encoded_email
+ |> Base.decode64!()
+ |> :erlang.binary_to_term()
+ |> Pleroma.Emails.Mailer.deliver(config)
end
def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
- user = User.get_by_id(user_id)
- Pleroma.DigestEmailWorker.perform(user)
+ user_id
+ |> User.get_cached_by_id()
+ |> Pleroma.DigestEmailWorker.perform()
end
end
diff --git a/lib/pleroma/workers/publisher.ex b/lib/pleroma/workers/publisher_worker.ex
similarity index 76%
rename from lib/pleroma/workers/publisher.ex
rename to lib/pleroma/workers/publisher_worker.ex
index 00fae99c7..5671d2a29 100644
--- a/lib/pleroma/workers/publisher.ex
+++ b/lib/pleroma/workers/publisher_worker.ex
@@ -2,15 +2,19 @@
# Copyright © 2017-2019 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Workers.Publisher do
+defmodule Pleroma.Workers.PublisherWorker do
alias Pleroma.Activity
alias Pleroma.Web.Federator
- # Note: `max_attempts` is intended to be overridden in `new/1` call
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "federator_outgoing",
max_attempts: 1
+ def backoff(attempt) when is_integer(attempt) do
+ Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
+ end
+
@impl Oban.Worker
def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
activity = Activity.get_by_id(activity_id)
diff --git a/lib/pleroma/workers/receiver.ex b/lib/pleroma/workers/receiver_worker.ex
similarity index 83%
rename from lib/pleroma/workers/receiver.ex
rename to lib/pleroma/workers/receiver_worker.ex
index 4ee270d74..cdce630f2 100644
--- a/lib/pleroma/workers/receiver.ex
+++ b/lib/pleroma/workers/receiver_worker.ex
@@ -2,10 +2,10 @@
# Copyright © 2017-2019 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Workers.Receiver do
+defmodule Pleroma.Workers.ReceiverWorker do
alias Pleroma.Web.Federator
- # Note: `max_attempts` is intended to be overridden in `new/1` call
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "federator_incoming",
max_attempts: 1
diff --git a/lib/pleroma/workers/scheduled_activity_worker.ex b/lib/pleroma/workers/scheduled_activity_worker.ex
index d9724c78a..4094411ae 100644
--- a/lib/pleroma/workers/scheduled_activity_worker.ex
+++ b/lib/pleroma/workers/scheduled_activity_worker.ex
@@ -3,7 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ScheduledActivityWorker do
- # Note: `max_attempts` is intended to be overridden in `new/1` call
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "scheduled_activities",
max_attempts: 1
diff --git a/lib/pleroma/workers/subscriber.ex b/lib/pleroma/workers/subscriber_worker.ex
similarity index 88%
rename from lib/pleroma/workers/subscriber.ex
rename to lib/pleroma/workers/subscriber_worker.ex
index e960b35bf..22d1dc956 100644
--- a/lib/pleroma/workers/subscriber.ex
+++ b/lib/pleroma/workers/subscriber_worker.ex
@@ -2,12 +2,12 @@
# Copyright © 2017-2019 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Workers.Subscriber do
+defmodule Pleroma.Workers.SubscriberWorker do
alias Pleroma.Repo
alias Pleroma.Web.Federator
alias Pleroma.Web.Websub
- # Note: `max_attempts` is intended to be overridden in `new/1` call
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "federator_outgoing",
max_attempts: 1
diff --git a/lib/pleroma/workers/transmogrifier.ex b/lib/pleroma/workers/transmogrifier_worker.ex
similarity index 73%
rename from lib/pleroma/workers/transmogrifier.ex
rename to lib/pleroma/workers/transmogrifier_worker.ex
index e13202c06..6f5c1a2f2 100644
--- a/lib/pleroma/workers/transmogrifier.ex
+++ b/lib/pleroma/workers/transmogrifier_worker.ex
@@ -2,17 +2,17 @@
# Copyright © 2017-2019 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Workers.Transmogrifier do
+defmodule Pleroma.Workers.TransmogrifierWorker do
alias Pleroma.User
- # Note: `max_attempts` is intended to be overridden in `new/1` call
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "transmogrifier",
max_attempts: 1
@impl Oban.Worker
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
- user = User.get_by_id(user_id)
+ user = User.get_cached_by_id(user_id)
Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
end
end
diff --git a/lib/pleroma/workers/web_pusher.ex b/lib/pleroma/workers/web_pusher_worker.ex
similarity index 82%
rename from lib/pleroma/workers/web_pusher.ex
rename to lib/pleroma/workers/web_pusher_worker.ex
index 7b78bb3ea..2b1d3b99a 100644
--- a/lib/pleroma/workers/web_pusher.ex
+++ b/lib/pleroma/workers/web_pusher_worker.ex
@@ -2,11 +2,11 @@
# Copyright © 2017-2019 Pleroma Authors
# SPDX-License-Identifier: AGPL-3.0-only
-defmodule Pleroma.Workers.WebPusher do
+defmodule Pleroma.Workers.WebPusherWorker do
alias Pleroma.Notification
alias Pleroma.Repo
- # Note: `max_attempts` is intended to be overridden in `new/1` call
+ # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "web_push",
max_attempts: 1
diff --git a/lib/pleroma/workers/worker_helper.ex b/lib/pleroma/workers/worker_helper.ex
new file mode 100644
index 000000000..f9ed2e64d
--- /dev/null
+++ b/lib/pleroma/workers/worker_helper.ex
@@ -0,0 +1,23 @@
+# Pleroma: A lightweight social networking server
+# Copyright © 2017-2019 Pleroma Authors
+# SPDX-License-Identifier: AGPL-3.0-only
+
+defmodule Pleroma.Workers.WorkerHelper do
+ alias Pleroma.Config
+
+ def worker_args(queue) do
+ case Config.get([:workers, :retries, queue]) do
+ nil -> []
+ max_attempts -> [max_attempts: max_attempts]
+ end
+ end
+
+ def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
+ backoff =
+ :math.pow(attempt, pow) +
+ base_backoff +
+ :rand.uniform(2 * base_backoff) * attempt
+
+ trunc(backoff)
+ end
+end
diff --git a/test/user_test.exs b/test/user_test.exs
index 86232de99..0acd0db4e 100644
--- a/test/user_test.exs
+++ b/test/user_test.exs
@@ -1123,7 +1123,7 @@ test "it deletes a user, all follow relationships and all activities", %{user: u
"id" => "pleroma:fakeid"
}
},
- all_enqueued(worker: Pleroma.Workers.Publisher)
+ all_enqueued(worker: Pleroma.Workers.PublisherWorker)
)
end
end
diff --git a/test/web/activity_pub/activity_pub_controller_test.exs b/test/web/activity_pub/activity_pub_controller_test.exs
index a1b567a46..f1c1bb503 100644
--- a/test/web/activity_pub/activity_pub_controller_test.exs
+++ b/test/web/activity_pub/activity_pub_controller_test.exs
@@ -17,7 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
alias Pleroma.Web.ActivityPub.UserView
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.CommonAPI
- alias Pleroma.Workers.Receiver, as: ReceiverWorker
+ alias Pleroma.Workers.ReceiverWorker
setup_all do
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
diff --git a/test/web/federator_test.exs b/test/web/federator_test.exs
index 5724672fd..4096d4690 100644
--- a/test/web/federator_test.exs
+++ b/test/web/federator_test.exs
@@ -7,7 +7,7 @@ defmodule Pleroma.Web.FederatorTest do
alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Federator
- alias Pleroma.Workers.Publisher, as: PublisherWorker
+ alias Pleroma.Workers.PublisherWorker
use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo
diff --git a/test/web/websub/websub_test.exs b/test/web/websub/websub_test.exs
index 414610879..929acf5a2 100644
--- a/test/web/websub/websub_test.exs
+++ b/test/web/websub/websub_test.exs
@@ -11,7 +11,7 @@ defmodule Pleroma.Web.WebsubTest do
alias Pleroma.Web.Websub
alias Pleroma.Web.Websub.WebsubClientSubscription
alias Pleroma.Web.Websub.WebsubServerSubscription
- alias Pleroma.Workers.Subscriber, as: SubscriberWorker
+ alias Pleroma.Workers.SubscriberWorker
import Pleroma.Factory
import Tesla.Mock