[#1149] Addressed code review comments (code style, jobs pruning etc.).

This commit is contained in:
Ivan Tashkinov 2019-08-31 19:08:56 +03:00
parent e890ea7e82
commit a90ea8ba15
34 changed files with 163 additions and 87 deletions

View file

@ -19,7 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- Mastodon API: Unsubscribe followers when they unfollow a user - Mastodon API: Unsubscribe followers when they unfollow a user
- AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses) - AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses)
- Improve digest email template - Improve digest email template
- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) with [Oban](https://github.com/sorentwo/oban) - Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) and `Pleroma.Web.Federator.RetryQueue` with [Oban](https://github.com/sorentwo/oban) (see [`docs/config.md`](docs/config.md) on migrating customized worker / retry settings).
- Introduced [quantum](https://github.com/quantum-elixir/quantum-core) job scheduler - Introduced [quantum](https://github.com/quantum-elixir/quantum-core) job scheduler
### Fixed ### Fixed

View file

@ -470,7 +470,7 @@
config :pleroma, Oban, config :pleroma, Oban,
repo: Pleroma.Repo, repo: Pleroma.Repo,
verbose: false, verbose: false,
prune: {:maxage, 60 * 60 * 24 * 7}, prune: {:maxlen, 1500},
queues: [ queues: [
activity_expiration: 10, activity_expiration: 10,
federator_incoming: 50, federator_incoming: 50,

View file

@ -65,6 +65,8 @@
queues: false, queues: false,
prune: :disabled prune: :disabled
config :pleroma, Pleroma.Scheduler, jobs: []
config :pleroma, Pleroma.ScheduledActivity, config :pleroma, Pleroma.ScheduledActivity,
daily_user_limit: 2, daily_user_limit: 2,
total_user_limit: 3, total_user_limit: 3,

View file

@ -404,20 +404,29 @@ curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerando
[Oban](https://github.com/sorentwo/oban) asynchronous job processor configuration. [Oban](https://github.com/sorentwo/oban) asynchronous job processor configuration.
Configuration options described in [Oban readme](https://github.com/sorentwo/oban#usage):
* `repo` - app's Ecto repo (`Pleroma.Repo`)
* `verbose` - logs verbosity
* `prune` - non-retryable jobs [pruning settings](https://github.com/sorentwo/oban#pruning) (`:disabled` / `{:maxlen, value}` / `{:maxage, value}`)
* `queues` - job queues (see below)
Pleroma has the following queues: Pleroma has the following queues:
* `activity_expiration` - Activity expiration
* `federator_outgoing` - Outgoing federation * `federator_outgoing` - Outgoing federation
* `federator_incoming` - Incoming federation * `federator_incoming` - Incoming federation
* `mailer` - Email sender, see [`Pleroma.Emails.Mailer`](#pleroma-emails-mailer) * `mailer` - Email sender, see [`Pleroma.Emails.Mailer`](#pleromaemailsmailer)
* `transmogrifier` - Transmogrifier * `transmogrifier` - Transmogrifier
* `web_push` - Web push notifications * `web_push` - Web push notifications
* `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivities`](#pleromascheduledactivity) * `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivity`](#pleromascheduledactivity)
Example: Example:
```elixir ```elixir
config :pleroma, Oban, config :pleroma, Oban,
repo: Pleroma.Repo, repo: Pleroma.Repo,
verbose: false,
prune: {:maxlen, 1500},
queues: [ queues: [
federator_incoming: 50, federator_incoming: 50,
federator_outgoing: 50 federator_outgoing: 50
@ -426,12 +435,37 @@ config :pleroma, Oban,
This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the number of max concurrent jobs set to `50`. This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the number of max concurrent jobs set to `50`.
### Migrating `pleroma_job_queue` settings
`config :pleroma_job_queue, :queues` is replaced by `config :pleroma, Oban, :queues` and uses the same format (keys are queues' names, values are max concurrent jobs numbers).
### Note on running with PostgreSQL in silent mode
If you are running PostgreSQL in [`silent_mode`](https://postgresqlco.nf/en/doc/param/silent_mode?version=9.1), it's advised to set [`log_destination`](https://postgresqlco.nf/en/doc/param/log_destination?version=9.1) to `syslog`,
otherwise `postmaster.log` file may grow because of "you don't own a lock of type ShareLock" warnings (see https://github.com/sorentwo/oban/issues/52).
## :workers ## :workers
Includes custom worker options not interpretable directly by `Oban`. Includes custom worker options not interpretable directly by `Oban`.
* `retries` — keyword lists where keys are `Oban` queues (see above) and values are numbers of max attempts for failed jobs. * `retries` — keyword lists where keys are `Oban` queues (see above) and values are numbers of max attempts for failed jobs.
Example:
```elixir
config :pleroma, :workers,
retries: [
federator_incoming: 5,
federator_outgoing: 5
]
```
### Migrating `Pleroma.Web.Federator.RetryQueue` settings
* `max_retries` is replaced with `config :pleroma, :workers, retries: [federator_outgoing: 5]`
* `enabled: false` corresponds to `config :pleroma, :workers, retries: [federator_outgoing: 1]`
* deprecated options: `max_jobs`, `initial_timeout`
## Pleroma.Web.Metadata ## Pleroma.Web.Metadata
* `providers`: a list of metadata providers to enable. Providers available: * `providers`: a list of metadata providers to enable. Providers available:
* Pleroma.Web.Metadata.Providers.OpenGraph * Pleroma.Web.Metadata.Providers.OpenGraph
@ -491,6 +525,24 @@ config :auto_linker,
] ]
``` ```
## Pleroma.Scheduler
Configuration for [Quantum](https://github.com/quantum-elixir/quantum-core) jobs scheduler.
See [Quantum readme](https://github.com/quantum-elixir/quantum-core#usage) for the list of supported options.
Example:
```elixir
config :pleroma, Pleroma.Scheduler,
global: true,
overlap: true,
timezone: :utc,
jobs: [{"0 */6 * * * *", {Pleroma.Web.Websub, :refresh_subscriptions, []}}]
```
The above example defines a single job which invokes `Pleroma.Web.Websub.refresh_subscriptions()` every 6 hours ("0 */6 * * * *", [crontab format](https://en.wikipedia.org/wiki/Cron)).
## Pleroma.ScheduledActivity ## Pleroma.ScheduledActivity
* `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day (Default: `25`) * `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day (Default: `25`)

View file

@ -9,13 +9,13 @@ defmodule Pleroma.ActivityExpirationWorker do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Workers.BackgroundWorker alias Pleroma.Workers.ActivityExpirationWorker
require Logger require Logger
use GenServer use GenServer
import Ecto.Query import Ecto.Query
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
@schedule_interval :timer.minutes(1) @schedule_interval :timer.minutes(1)
@ -57,7 +57,7 @@ def handle_info(:perform, state) do
"op" => "activity_expiration", "op" => "activity_expiration",
"activity_expiration_id" => expiration.id "activity_expiration_id" => expiration.id
} }
|> BackgroundWorker.new(worker_args(:activity_expiration)) |> ActivityExpirationWorker.new(worker_args(:activity_expiration))
|> Repo.insert() |> Repo.insert()
end) end)

View file

@ -43,7 +43,7 @@ def start(_type, _args) do
hackney_pool_children() ++ hackney_pool_children() ++
[ [
Pleroma.Stats, Pleroma.Stats,
{Oban, Application.get_env(:pleroma, Oban)}, {Oban, Pleroma.Config.get(Oban)},
%{ %{
id: :web_push_init, id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]}, start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},

View file

@ -4,11 +4,11 @@
defmodule Pleroma.DigestEmailWorker do defmodule Pleroma.DigestEmailWorker do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Workers.Mailer, as: MailerWorker alias Pleroma.Workers.MailerWorker
import Ecto.Query import Ecto.Query
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def perform do def perform do
config = Pleroma.Config.get([:email_notifications, :digest]) config = Pleroma.Config.get([:email_notifications, :digest])

View file

@ -10,7 +10,7 @@ defmodule Pleroma.Emails.Mailer do
""" """
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Workers.Mailer, as: MailerWorker alias Pleroma.Workers.MailerWorker
alias Swoosh.DeliveryError alias Swoosh.DeliveryError
@otp_app :pleroma @otp_app :pleroma
@ -19,7 +19,7 @@ defmodule Pleroma.Emails.Mailer do
@spec enabled?() :: boolean() @spec enabled?() :: boolean()
def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled]) def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled])
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
@doc "add email to queue" @doc "add email to queue"
def deliver_async(email, config \\ []) do def deliver_async(email, config \\ []) do

View file

@ -18,7 +18,7 @@ defmodule Pleroma.ScheduledActivityWorker do
@schedule_interval :timer.minutes(1) @schedule_interval :timer.minutes(1)
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def start_link(_) do def start_link(_) do
GenServer.start_link(__MODULE__, nil) GenServer.start_link(__MODULE__, nil)

View file

@ -41,7 +41,7 @@ defmodule Pleroma.User do
@strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/ @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
@extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/ @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
schema "users" do schema "users" do
field(:bio, :string) field(:bio, :string)

View file

@ -26,7 +26,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
require Logger require Logger
require Pleroma.Constants require Pleroma.Constants
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
# For Announce activities, we filter the recipients based on following status for any actors # For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary. # that match actual users. See issue #164 for more information about why this is necessary.

View file

@ -18,7 +18,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
recv_timeout: 10_000 recv_timeout: 10_000
] ]
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def perform(:prefetch, url) do def perform(:prefetch, url) do
Logger.info("Prefetching #{inspect(url)}") Logger.info("Prefetching #{inspect(url)}")

View file

@ -85,7 +85,7 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa
end end
def publish_one(%{actor_id: actor_id} = params) do def publish_one(%{actor_id: actor_id} = params) do
actor = User.get_by_id(actor_id) actor = User.get_cached_by_id(actor_id)
params params
|> Map.delete(:actor_id) |> Map.delete(:actor_id)

View file

@ -15,14 +15,14 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
alias Pleroma.Workers.Transmogrifier, as: TransmogrifierWorker alias Pleroma.Workers.TransmogrifierWorker
import Ecto.Query import Ecto.Query
require Logger require Logger
require Pleroma.Constants require Pleroma.Constants
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
@doc """ @doc """
Modifies an incoming AP object (mastodon format) to our internal format. Modifies an incoming AP object (mastodon format) to our internal format.

View file

@ -12,13 +12,13 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.Web.Federator.Publisher alias Pleroma.Web.Federator.Publisher
alias Pleroma.Web.OStatus alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub alias Pleroma.Web.Websub
alias Pleroma.Workers.Publisher, as: PublisherWorker alias Pleroma.Workers.PublisherWorker
alias Pleroma.Workers.Receiver, as: ReceiverWorker alias Pleroma.Workers.ReceiverWorker
alias Pleroma.Workers.Subscriber, as: SubscriberWorker alias Pleroma.Workers.SubscriberWorker
require Logger require Logger
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def init do def init do
# To do: consider removing this call in favor of scheduled execution (`quantum`-based) # To do: consider removing this call in favor of scheduled execution (`quantum`-based)

View file

@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.User alias Pleroma.User
alias Pleroma.Workers.Publisher, as: PublisherWorker alias Pleroma.Workers.PublisherWorker
require Logger require Logger
@ -31,12 +31,7 @@ defmodule Pleroma.Web.Federator.Publisher do
""" """
@spec enqueue_one(module(), Map.t()) :: :ok @spec enqueue_one(module(), Map.t()) :: :ok
def enqueue_one(module, %{} = params) do def enqueue_one(module, %{} = params) do
worker_args = worker_args = Pleroma.Workers.WorkerHelper.worker_args(:federator_outgoing)
if max_attempts = Pleroma.Config.get([:workers, :retries, :federator_outgoing]) do
[max_attempts: max_attempts]
else
[]
end
%{"op" => "publish_one", "module" => to_string(module), "params" => params} %{"op" => "publish_one", "module" => to_string(module), "params" => params}
|> PublisherWorker.new(worker_args) |> PublisherWorker.new(worker_args)

View file

@ -20,7 +20,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
alias Pleroma.Web.OAuth.Token alias Pleroma.Web.OAuth.Token
alias Pleroma.Workers.BackgroundWorker alias Pleroma.Workers.BackgroundWorker
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def start_link(_), do: GenServer.start_link(__MODULE__, %{}) def start_link(_), do: GenServer.start_link(__MODULE__, %{})

View file

@ -4,11 +4,11 @@
defmodule Pleroma.Web.Push do defmodule Pleroma.Web.Push do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Workers.WebPusher alias Pleroma.Workers.WebPusherWorker
require Logger require Logger
defdelegate worker_args(queue), to: Pleroma.Workers.Helper import Pleroma.Workers.WorkerHelper, only: [worker_args: 1]
def init do def init do
unless enabled() do unless enabled() do
@ -36,7 +36,7 @@ def enabled do
def send(notification) do def send(notification) do
%{"op" => "web_push", "notification_id" => notification.id} %{"op" => "web_push", "notification_id" => notification.id}
|> WebPusher.new(worker_args(:web_push)) |> WebPusherWorker.new(worker_args(:web_push))
|> Repo.insert() |> Repo.insert()
end end
end end

View file

@ -171,7 +171,7 @@ def publish_one(%{recipient: url, feed: feed} = params) when is_binary(url) do
end end
def publish_one(%{recipient_id: recipient_id} = params) do def publish_one(%{recipient_id: recipient_id} = params) do
recipient = User.get_by_id(recipient_id) recipient = User.get_cached_by_id(recipient_id)
params params
|> Map.delete(:recipient_id) |> Map.delete(:recipient_id)

View file

@ -0,0 +1,21 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ActivityExpirationWorker do
# Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: "activity_expiration",
max_attempts: 1
@impl Oban.Worker
def perform(
%{
"op" => "activity_expiration",
"activity_expiration_id" => activity_expiration_id
},
_job
) do
Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id)
end
end

View file

@ -8,24 +8,24 @@ defmodule Pleroma.Workers.BackgroundWorker do
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
alias Pleroma.Web.OAuth.Token.CleanWorker alias Pleroma.Web.OAuth.Token.CleanWorker
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker, use Oban.Worker,
queue: "background", queue: "background",
max_attempts: 1 max_attempts: 1
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
user = User.get_by_id(user_id) user = User.get_cached_by_id(user_id)
User.perform(:fetch_initial_posts, user) User.perform(:fetch_initial_posts, user)
end end
def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
user = User.get_by_id(user_id) user = User.get_cached_by_id(user_id)
User.perform(:deactivate_async, user, status) User.perform(:deactivate_async, user, status)
end end
def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
user = User.get_by_id(user_id) user = User.get_cached_by_id(user_id)
User.perform(:delete, user) User.perform(:delete, user)
end end
@ -37,7 +37,7 @@ def perform(
}, },
_job _job
) do ) do
blocker = User.get_by_id(blocker_id) blocker = User.get_cached_by_id(blocker_id)
User.perform(:blocks_import, blocker, blocked_identifiers) User.perform(:blocks_import, blocker, blocked_identifiers)
end end
@ -49,7 +49,7 @@ def perform(
}, },
_job _job
) do ) do
follower = User.get_by_id(follower_id) follower = User.get_cached_by_id(follower_id)
User.perform(:follow_import, follower, followed_identifiers) User.perform(:follow_import, follower, followed_identifiers)
end end
@ -69,11 +69,4 @@ def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id},
activity = Activity.get_by_id(activity_id) activity = Activity.get_by_id(activity_id)
Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity) Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
end end
def perform(
%{"op" => "activity_expiration", "activity_expiration_id" => activity_expiration_id},
_job
) do
Pleroma.ActivityExpirationWorker.perform(:execute, activity_expiration_id)
end
end end

View file

@ -1,13 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Helper do
def worker_args(queue) do
if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
[max_attempts: max_attempts]
else
[]
end
end
end

View file

@ -2,26 +2,25 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Mailer do defmodule Pleroma.Workers.MailerWorker do
alias Pleroma.User alias Pleroma.User
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker, use Oban.Worker,
queue: "mailer", queue: "mailer",
max_attempts: 1 max_attempts: 1
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
email = encoded_email
encoded_email |> Base.decode64!()
|> Base.decode64!() |> :erlang.binary_to_term()
|> :erlang.binary_to_term() |> Pleroma.Emails.Mailer.deliver(config)
Pleroma.Emails.Mailer.deliver(email, config)
end end
def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
user = User.get_by_id(user_id) user_id
Pleroma.DigestEmailWorker.perform(user) |> User.get_cached_by_id()
|> Pleroma.DigestEmailWorker.perform()
end end
end end

View file

@ -2,15 +2,19 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Publisher do defmodule Pleroma.Workers.PublisherWorker do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker, use Oban.Worker,
queue: "federator_outgoing", queue: "federator_outgoing",
max_attempts: 1 max_attempts: 1
def backoff(attempt) when is_integer(attempt) do
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
end
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
activity = Activity.get_by_id(activity_id) activity = Activity.get_by_id(activity_id)

View file

@ -2,10 +2,10 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Receiver do defmodule Pleroma.Workers.ReceiverWorker do
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker, use Oban.Worker,
queue: "federator_incoming", queue: "federator_incoming",
max_attempts: 1 max_attempts: 1

View file

@ -3,7 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ScheduledActivityWorker do defmodule Pleroma.Workers.ScheduledActivityWorker do
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker, use Oban.Worker,
queue: "scheduled_activities", queue: "scheduled_activities",
max_attempts: 1 max_attempts: 1

View file

@ -2,12 +2,12 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Subscriber do defmodule Pleroma.Workers.SubscriberWorker do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
alias Pleroma.Web.Websub alias Pleroma.Web.Websub
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker, use Oban.Worker,
queue: "federator_outgoing", queue: "federator_outgoing",
max_attempts: 1 max_attempts: 1

View file

@ -2,17 +2,17 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Transmogrifier do defmodule Pleroma.Workers.TransmogrifierWorker do
alias Pleroma.User alias Pleroma.User
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker, use Oban.Worker,
queue: "transmogrifier", queue: "transmogrifier",
max_attempts: 1 max_attempts: 1
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
user = User.get_by_id(user_id) user = User.get_cached_by_id(user_id)
Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user) Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
end end
end end

View file

@ -2,11 +2,11 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.WebPusher do defmodule Pleroma.Workers.WebPusherWorker do
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.Repo alias Pleroma.Repo
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker, use Oban.Worker,
queue: "web_push", queue: "web_push",
max_attempts: 1 max_attempts: 1

View file

@ -0,0 +1,23 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.WorkerHelper do
alias Pleroma.Config
def worker_args(queue) do
case Config.get([:workers, :retries, queue]) do
nil -> []
max_attempts -> [max_attempts: max_attempts]
end
end
def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
backoff =
:math.pow(attempt, pow) +
base_backoff +
:rand.uniform(2 * base_backoff) * attempt
trunc(backoff)
end
end

View file

@ -1123,7 +1123,7 @@ test "it deletes a user, all follow relationships and all activities", %{user: u
"id" => "pleroma:fakeid" "id" => "pleroma:fakeid"
} }
}, },
all_enqueued(worker: Pleroma.Workers.Publisher) all_enqueued(worker: Pleroma.Workers.PublisherWorker)
) )
end end
end end

View file

@ -17,7 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Web.ActivityPub.UserView
alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Workers.Receiver, as: ReceiverWorker alias Pleroma.Workers.ReceiverWorker
setup_all do setup_all do
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)

View file

@ -7,7 +7,7 @@ defmodule Pleroma.Web.FederatorTest do
alias Pleroma.Tests.ObanHelpers alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
alias Pleroma.Workers.Publisher, as: PublisherWorker alias Pleroma.Workers.PublisherWorker
use Pleroma.DataCase use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo use Oban.Testing, repo: Pleroma.Repo

View file

@ -11,7 +11,7 @@ defmodule Pleroma.Web.WebsubTest do
alias Pleroma.Web.Websub alias Pleroma.Web.Websub
alias Pleroma.Web.Websub.WebsubClientSubscription alias Pleroma.Web.Websub.WebsubClientSubscription
alias Pleroma.Web.Websub.WebsubServerSubscription alias Pleroma.Web.Websub.WebsubServerSubscription
alias Pleroma.Workers.Subscriber, as: SubscriberWorker alias Pleroma.Workers.SubscriberWorker
import Pleroma.Factory import Pleroma.Factory
import Tesla.Mock import Tesla.Mock