Merge branch '1149-oban-job-queue' into 'develop'

[#1149] `oban`-based job & retry queues

Closes #1149

See merge request pleroma/pleroma!1518
This commit is contained in:
rinpatch 2019-09-15 20:22:17 +00:00
commit 2990c0a53b
63 changed files with 1579 additions and 1327 deletions

View file

@ -30,6 +30,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses) - AdminAPI: Add "godmode" while fetching user statuses (i.e. admin can see private statuses)
- Improve digest email template - Improve digest email template
Pagination: (optional) return `total` alongside with `items` when paginating Pagination: (optional) return `total` alongside with `items` when paginating
- Replaced [pleroma_job_queue](https://git.pleroma.social/pleroma/pleroma_job_queue) and `Pleroma.Web.Federator.RetryQueue` with [Oban](https://github.com/sorentwo/oban) (see [`docs/config.md`](docs/config.md) on migrating customized worker / retry settings)
- Introduced [quantum](https://github.com/quantum-elixir/quantum-core) job scheduler
### Fixed ### Fixed
- Following from Osada - Following from Osada

View file

@ -51,6 +51,24 @@
telemetry_event: [Pleroma.Repo.Instrumenter], telemetry_event: [Pleroma.Repo.Instrumenter],
migration_lock: nil migration_lock: nil
scheduled_jobs =
with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
true <- digest_config[:active] do
[{digest_config[:schedule], {Pleroma.Daemons.DigestEmailDaemon, :perform, []}}]
else
_ -> []
end
scheduled_jobs =
scheduled_jobs ++
[{"0 */6 * * * *", {Pleroma.Web.Websub, :refresh_subscriptions, []}}]
config :pleroma, Pleroma.Scheduler,
global: true,
overlap: true,
timezone: :utc,
jobs: scheduled_jobs
config :pleroma, Pleroma.Captcha, config :pleroma, Pleroma.Captcha,
enabled: false, enabled: false,
seconds_valid: 60, seconds_valid: 60,
@ -451,21 +469,26 @@
"web" "web"
] ]
config :pleroma, Pleroma.Web.Federator.RetryQueue, config :pleroma, Oban,
enabled: false, repo: Pleroma.Repo,
max_jobs: 20, verbose: false,
initial_timeout: 30, prune: {:maxlen, 1500},
max_retries: 5 queues: [
activity_expiration: 10,
federator_incoming: 50,
federator_outgoing: 50,
web_push: 50,
mailer: 10,
transmogrifier: 20,
scheduled_activities: 10,
background: 5
]
config :pleroma_job_queue, :queues, config :pleroma, :workers,
activity_expiration: 10, retries: [
federator_incoming: 50, federator_incoming: 5,
federator_outgoing: 50, federator_outgoing: 5
web_push: 50, ]
mailer: 10,
transmogrifier: 20,
scheduled_activities: 10,
background: 5
config :pleroma, :fetch_initial_posts, config :pleroma, :fetch_initial_posts,
enabled: false, enabled: false,

View file

@ -1778,87 +1778,141 @@
group: :pleroma_job_queue, group: :pleroma_job_queue,
key: :queues, key: :queues,
type: :group, type: :group,
description: "Pleroma Job Queue configuration: a list of queues with maximum concurrent jobs", description: "[Deprecated] Replaced with `Oban`/`:queues` (keeping the same format)",
children: [ children: []
%{
key: :federator_outgoing,
type: :integer,
description: "Outgoing federation queue",
suggestions: [50]
},
%{
key: :federator_incoming,
type: :integer,
description: "Incoming federation queue",
suggestions: [50]
},
%{
key: :mailer,
type: :integer,
description: "Email sender queue, see Pleroma.Emails.Mailer",
suggestions: [10]
},
%{
key: :web_push,
type: :integer,
description: "Web push notifications queue",
suggestions: [50]
},
%{
key: :transmogrifier,
type: :integer,
description: "Transmogrifier queue",
suggestions: [20]
},
%{
key: :scheduled_activities,
type: :integer,
description: "Scheduled activities queue, see Pleroma.ScheduledActivities",
suggestions: [10]
},
%{
key: :activity_expiration,
type: :integer,
description: "Activity expiration queue",
suggestions: [10]
},
%{
key: :background,
type: :integer,
description: "Background queue",
suggestions: [5]
}
]
}, },
%{ %{
group: :pleroma, group: :pleroma,
key: Pleroma.Web.Federator.RetryQueue, key: Pleroma.Web.Federator.RetryQueue,
type: :group, type: :group,
description: "", description: "[Deprecated] See `Oban` and `:workers` sections for configuration notes",
children: [ children: [
%{
key: :enabled,
type: :boolean,
description: "If set to true, failed federation jobs will be retried",
suggestions: [true, false]
},
%{
key: :max_jobs,
type: :integer,
description: "The maximum amount of parallel federation jobs running at the same time",
suggestions: [20]
},
%{
key: :initial_timeout,
type: :integer,
description: "The initial timeout in seconds",
suggestions: [30]
},
%{ %{
key: :max_retries, key: :max_retries,
type: :integer, type: :integer,
description: "The maximum number of times a federation job is retried", description: "[Deprecated] Replaced as `Oban`/`:queues`/`:outgoing_federation` value",
suggestions: [5] suggestions: []
}
]
},
%{
group: :pleroma,
key: Oban,
type: :group,
description: """
[Oban](https://github.com/sorentwo/oban) asynchronous job processor configuration.
Note: if you are running PostgreSQL in [`silent_mode`](https://postgresqlco.nf/en/doc/param/silent_mode?version=9.1),
it's advised to set [`log_destination`](https://postgresqlco.nf/en/doc/param/log_destination?version=9.1) to `syslog`,
otherwise `postmaster.log` file may grow because of "you don't own a lock of type ShareLock" warnings
(see https://github.com/sorentwo/oban/issues/52).
""",
children: [
%{
key: :repo,
type: :module,
description: "Application's Ecto repo",
suggestions: [Pleroma.Repo]
},
%{
key: :verbose,
type: :boolean,
description: "Logs verbose mode",
suggestions: [false, true]
},
%{
key: :prune,
type: [:atom, :tuple],
description:
"Non-retryable jobs [pruning settings](https://github.com/sorentwo/oban#pruning)",
suggestions: [:disabled, {:maxlen, 1500}, {:maxage, 60 * 60}]
},
%{
key: :queues,
type: :keyword,
description:
"Background jobs queues (keys: queues, values: max numbers of concurrent jobs)",
suggestions: [
[
activity_expiration: 10,
background: 5,
federator_incoming: 50,
federator_outgoing: 50,
mailer: 10,
scheduled_activities: 10,
transmogrifier: 20,
web_push: 50
]
],
children: [
%{
key: :activity_expiration,
type: :integer,
description: "Activity expiration queue",
suggestions: [10]
},
%{
key: :background,
type: :integer,
description: "Background queue",
suggestions: [5]
},
%{
key: :federator_incoming,
type: :integer,
description: "Incoming federation queue",
suggestions: [50]
},
%{
key: :federator_outgoing,
type: :integer,
description: "Outgoing federation queue",
suggestions: [50]
},
%{
key: :mailer,
type: :integer,
description: "Email sender queue, see Pleroma.Emails.Mailer",
suggestions: [10]
},
%{
key: :scheduled_activities,
type: :integer,
description: "Scheduled activities queue, see Pleroma.ScheduledActivities",
suggestions: [10]
},
%{
key: :transmogrifier,
type: :integer,
description: "Transmogrifier queue",
suggestions: [20]
},
%{
key: :web_push,
type: :integer,
description: "Web push notifications queue",
suggestions: [50]
}
]
}
]
},
%{
group: :pleroma,
key: :workers,
type: :group,
description: "Includes custom worker options not interpretable directly by `Oban`",
children: [
%{
key: :retries,
type: :keyword,
description: "Max retry attempts for failed jobs, per `Oban` queue",
suggestions: [
[
federator_incoming: 5,
federator_outgoing: 5
]
]
} }
] ]
}, },

View file

@ -61,7 +61,11 @@
config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock config :web_push_encryption, :http_client, Pleroma.Web.WebPushHttpClientMock
config :pleroma_job_queue, disabled: true config :pleroma, Oban,
queues: false,
prune: :disabled
config :pleroma, Pleroma.Scheduler, jobs: []
config :pleroma, Pleroma.ScheduledActivity, config :pleroma, Pleroma.ScheduledActivity,
daily_user_limit: 2, daily_user_limit: 2,

View file

@ -400,35 +400,71 @@ You can then do
curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken" curl "http://localhost:4000/api/pleroma/admin/invite_token?admin_token=somerandomtoken"
``` ```
## :pleroma_job_queue ## Oban
[Pleroma Job Queue](https://git.pleroma.social/pleroma/pleroma_job_queue) configuration: a list of queues with maximum concurrent jobs. [Oban](https://github.com/sorentwo/oban) asynchronous job processor configuration.
Configuration options described in [Oban readme](https://github.com/sorentwo/oban#usage):
* `repo` - app's Ecto repo (`Pleroma.Repo`)
* `verbose` - logs verbosity
* `prune` - non-retryable jobs [pruning settings](https://github.com/sorentwo/oban#pruning) (`:disabled` / `{:maxlen, value}` / `{:maxage, value}`)
* `queues` - job queues (see below)
Pleroma has the following queues: Pleroma has the following queues:
* `activity_expiration` - Activity expiration
* `federator_outgoing` - Outgoing federation * `federator_outgoing` - Outgoing federation
* `federator_incoming` - Incoming federation * `federator_incoming` - Incoming federation
* `mailer` - Email sender, see [`Pleroma.Emails.Mailer`](#pleroma-emails-mailer) * `mailer` - Email sender, see [`Pleroma.Emails.Mailer`](#pleromaemailsmailer)
* `transmogrifier` - Transmogrifier * `transmogrifier` - Transmogrifier
* `web_push` - Web push notifications * `web_push` - Web push notifications
* `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivities`](#pleromascheduledactivity) * `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivity`](#pleromascheduledactivity)
Example: Example:
```elixir ```elixir
config :pleroma_job_queue, :queues, config :pleroma, Oban,
federator_incoming: 50, repo: Pleroma.Repo,
federator_outgoing: 50 verbose: false,
prune: {:maxlen, 1500},
queues: [
federator_incoming: 50,
federator_outgoing: 50
]
``` ```
This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the `max_jobs` set to `50`. This config contains two queues: `federator_incoming` and `federator_outgoing`. Both have the number of max concurrent jobs set to `50`.
## Pleroma.Web.Federator.RetryQueue ### Migrating `pleroma_job_queue` settings
* `enabled`: If set to `true`, failed federation jobs will be retried `config :pleroma_job_queue, :queues` is replaced by `config :pleroma, Oban, :queues` and uses the same format (keys are queues' names, values are max concurrent jobs numbers).
* `max_jobs`: The maximum amount of parallel federation jobs running at the same time.
* `initial_timeout`: The initial timeout in seconds ### Note on running with PostgreSQL in silent mode
* `max_retries`: The maximum number of times a federation job is retried
If you are running PostgreSQL in [`silent_mode`](https://postgresqlco.nf/en/doc/param/silent_mode?version=9.1), it's advised to set [`log_destination`](https://postgresqlco.nf/en/doc/param/log_destination?version=9.1) to `syslog`,
otherwise `postmaster.log` file may grow because of "you don't own a lock of type ShareLock" warnings (see https://github.com/sorentwo/oban/issues/52).
## :workers
Includes custom worker options not interpretable directly by `Oban`.
* `retries` — keyword lists where keys are `Oban` queues (see above) and values are numbers of max attempts for failed jobs.
Example:
```elixir
config :pleroma, :workers,
retries: [
federator_incoming: 5,
federator_outgoing: 5
]
```
### Migrating `Pleroma.Web.Federator.RetryQueue` settings
* `max_retries` is replaced with `config :pleroma, :workers, retries: [federator_outgoing: 5]`
* `enabled: false` corresponds to `config :pleroma, :workers, retries: [federator_outgoing: 1]`
* deprecated options: `max_jobs`, `initial_timeout`
## Pleroma.Web.Metadata ## Pleroma.Web.Metadata
* `providers`: a list of metadata providers to enable. Providers available: * `providers`: a list of metadata providers to enable. Providers available:
@ -489,6 +525,24 @@ config :auto_linker,
] ]
``` ```
## Pleroma.Scheduler
Configuration for [Quantum](https://github.com/quantum-elixir/quantum-core) jobs scheduler.
See [Quantum readme](https://github.com/quantum-elixir/quantum-core#usage) for the list of supported options.
Example:
```elixir
config :pleroma, Pleroma.Scheduler,
global: true,
overlap: true,
timezone: :utc,
jobs: [{"0 */6 * * * *", {Pleroma.Web.Websub, :refresh_subscriptions, []}}]
```
The above example defines a single job which invokes `Pleroma.Web.Websub.refresh_subscriptions()` every 6 hours ("0 */6 * * * *", [crontab format](https://en.wikipedia.org/wiki/Cron)).
## Pleroma.ScheduledActivity ## Pleroma.ScheduledActivity
* `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day (Default: `25`) * `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day (Default: `25`)

View file

@ -31,18 +31,19 @@ def start(_type, _args) do
children = children =
[ [
Pleroma.Repo, Pleroma.Repo,
Pleroma.Scheduler,
Pleroma.Config.TransferTask, Pleroma.Config.TransferTask,
Pleroma.Emoji, Pleroma.Emoji,
Pleroma.Captcha, Pleroma.Captcha,
Pleroma.FlakeId, Pleroma.FlakeId,
Pleroma.ScheduledActivityWorker, Pleroma.Daemons.ScheduledActivityDaemon,
Pleroma.ActivityExpirationWorker Pleroma.Daemons.ActivityExpirationDaemon
] ++ ] ++
cachex_children() ++ cachex_children() ++
hackney_pool_children() ++ hackney_pool_children() ++
[ [
Pleroma.Web.Federator.RetryQueue,
Pleroma.Stats, Pleroma.Stats,
{Oban, Pleroma.Config.get(Oban)},
%{ %{
id: :web_push_init, id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]}, start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
@ -70,9 +71,7 @@ def start(_type, _args) do
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options # for other strategies and supported options
opts = [strategy: :one_for_one, name: Pleroma.Supervisor] opts = [strategy: :one_for_one, name: Pleroma.Supervisor]
result = Supervisor.start_link(children, opts) Supervisor.start_link(children, opts)
:ok = after_supervisor_start()
result
end end
defp setup_instrumenters do defp setup_instrumenters do
@ -164,17 +163,4 @@ defp hackney_pool_children do
:hackney_pool.child_spec(pool, options) :hackney_pool.child_spec(pool, options)
end end
end end
defp after_supervisor_start do
with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest],
true <- digest_config[:active] do
PleromaJobQueue.schedule(
digest_config[:schedule],
:digest_emails,
Pleroma.DigestEmailWorker
)
end
:ok
end
end end

View file

@ -2,13 +2,14 @@
# Copyright © 2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ActivityExpirationWorker do defmodule Pleroma.Daemons.ActivityExpirationDaemon do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.ActivityExpiration alias Pleroma.ActivityExpiration
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
require Logger require Logger
use GenServer use GenServer
import Ecto.Query import Ecto.Query
@ -49,7 +50,10 @@ def perform(:execute, expiration_id) do
def handle_info(:perform, state) do def handle_info(:perform, state) do
ActivityExpiration.due_expirations(@schedule_interval) ActivityExpiration.due_expirations(@schedule_interval)
|> Enum.each(fn expiration -> |> Enum.each(fn expiration ->
PleromaJobQueue.enqueue(:activity_expiration, __MODULE__, [:execute, expiration.id]) Pleroma.Workers.ActivityExpirationWorker.enqueue(
"activity_expiration",
%{"activity_expiration_id" => expiration.id}
)
end) end)
schedule_next() schedule_next()

View file

@ -2,10 +2,11 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.DigestEmailWorker do defmodule Pleroma.Daemons.DigestEmailDaemon do
import Ecto.Query alias Pleroma.Repo
alias Pleroma.Workers.DigestEmailsWorker
@queue_name :digest_emails import Ecto.Query
def perform do def perform do
config = Pleroma.Config.get([:email_notifications, :digest]) config = Pleroma.Config.get([:email_notifications, :digest])
@ -20,8 +21,10 @@ def perform do
where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"), where: u.last_digest_emailed_at < datetime_add(^now, ^negative_interval, "day"),
select: u select: u
) )
|> Pleroma.Repo.all() |> Repo.all()
|> Enum.each(&PleromaJobQueue.enqueue(@queue_name, __MODULE__, [&1])) |> Enum.each(fn user ->
DigestEmailsWorker.enqueue("digest_email", %{"user_id" => user.id})
end)
end end
@doc """ @doc """

View file

@ -2,7 +2,7 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ScheduledActivityWorker do defmodule Pleroma.Daemons.ScheduledActivityDaemon do
@moduledoc """ @moduledoc """
Sends scheduled activities to the job queue. Sends scheduled activities to the job queue.
""" """
@ -11,6 +11,7 @@ defmodule Pleroma.ScheduledActivityWorker do
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
use GenServer use GenServer
require Logger require Logger
@ -45,7 +46,10 @@ def perform(:execute, scheduled_activity_id) do
def handle_info(:perform, state) do def handle_info(:perform, state) do
ScheduledActivity.due_activities(@schedule_interval) ScheduledActivity.due_activities(@schedule_interval)
|> Enum.each(fn scheduled_activity -> |> Enum.each(fn scheduled_activity ->
PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id]) Pleroma.Workers.ScheduledActivityWorker.enqueue(
"execute",
%{"activity_id" => scheduled_activity.id}
)
end) end)
schedule_next() schedule_next()

View file

@ -9,6 +9,7 @@ defmodule Pleroma.Emails.Mailer do
The module contains functions to delivery email using Swoosh.Mailer. The module contains functions to delivery email using Swoosh.Mailer.
""" """
alias Pleroma.Workers.MailerWorker
alias Swoosh.DeliveryError alias Swoosh.DeliveryError
@otp_app :pleroma @otp_app :pleroma
@ -19,7 +20,12 @@ def enabled?, do: Pleroma.Config.get([__MODULE__, :enabled])
@doc "add email to queue" @doc "add email to queue"
def deliver_async(email, config \\ []) do def deliver_async(email, config \\ []) do
PleromaJobQueue.enqueue(:mailer, __MODULE__, [:deliver_async, email, config]) encoded_email =
email
|> :erlang.term_to_binary()
|> Base.encode64()
MailerWorker.enqueue("email", %{"encoded_email" => encoded_email, "config" => config})
end end
@doc "callback to perform send email from queue" @doc "callback to perform send email from queue"

View file

@ -90,7 +90,7 @@ def set_reachable(_), do: {:error, nil}
def set_unreachable(url_or_host, unreachable_since \\ nil) def set_unreachable(url_or_host, unreachable_since \\ nil)
def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host) do
unreachable_since = unreachable_since || DateTime.utc_now() unreachable_since = parse_datetime(unreachable_since) || NaiveDateTime.utc_now()
host = host(url_or_host) host = host(url_or_host)
existing_record = Repo.get_by(Instance, %{host: host}) existing_record = Repo.get_by(Instance, %{host: host})
@ -114,4 +114,10 @@ def set_unreachable(url_or_host, unreachable_since) when is_binary(url_or_host)
end end
def set_unreachable(_, _), do: {:error, nil} def set_unreachable(_, _), do: {:error, nil}
defp parse_datetime(datetime) when is_binary(datetime) do
NaiveDateTime.from_iso8601(datetime)
end
defp parse_datetime(datetime), do: datetime
end end

7
lib/pleroma/scheduler.ex Normal file
View file

@ -0,0 +1,7 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Scheduler do
use Quantum.Scheduler, otp_app: :pleroma
end

View file

@ -27,6 +27,7 @@ defmodule Pleroma.User do
alias Pleroma.Web.OStatus alias Pleroma.Web.OStatus
alias Pleroma.Web.RelMe alias Pleroma.Web.RelMe
alias Pleroma.Web.Websub alias Pleroma.Web.Websub
alias Pleroma.Workers.BackgroundWorker
require Logger require Logger
@ -647,8 +648,9 @@ def get_or_fetch_by_nickname(nickname) do
end end
@doc "Fetch some posts when the user has just been federated with" @doc "Fetch some posts when the user has just been federated with"
def fetch_initial_posts(user), def fetch_initial_posts(user) do
do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user]) BackgroundWorker.enqueue("fetch_initial_posts", %{"user_id" => user.id})
end
@spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t() @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
def get_followers_query(%User{} = user, nil) do def get_followers_query(%User{} = user, nil) do
@ -1078,7 +1080,7 @@ def unblock_domain(user, domain) do
end end
def deactivate_async(user, status \\ true) do def deactivate_async(user, status \\ true) do
PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status]) BackgroundWorker.enqueue("deactivate_user", %{"user_id" => user.id, "status" => status})
end end
def deactivate(%User{} = user, status \\ true) do def deactivate(%User{} = user, status \\ true) do
@ -1106,9 +1108,9 @@ def update_notification_settings(%User{} = user, settings \\ %{}) do
|> update_and_set_cache() |> update_and_set_cache()
end end
@spec delete(User.t()) :: :ok def delete(%User{} = user) do
def delete(%User{} = user), BackgroundWorker.enqueue("delete_user", %{"user_id" => user.id})
do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user]) end
@spec perform(atom(), User.t()) :: {:ok, User.t()} @spec perform(atom(), User.t()) :: {:ok, User.t()}
def perform(:delete, %User{} = user) do def perform(:delete, %User{} = user) do
@ -1215,21 +1217,20 @@ def external_users(opts \\ []) do
Repo.all(query) Repo.all(query)
end end
def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers), def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
do: BackgroundWorker.enqueue("blocks_import", %{
PleromaJobQueue.enqueue(:background, __MODULE__, [ "blocker_id" => blocker.id,
:blocks_import, "blocked_identifiers" => blocked_identifiers
blocker, })
blocked_identifiers end
])
def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers), def follow_import(%User{} = follower, followed_identifiers)
do: when is_list(followed_identifiers) do
PleromaJobQueue.enqueue(:background, __MODULE__, [ BackgroundWorker.enqueue("follow_import", %{
:follow_import, "follower_id" => follower.id,
follower, "followed_identifiers" => followed_identifiers
followed_identifiers })
]) end
def delete_user_activities(%User{ap_id: ap_id} = user) do def delete_user_activities(%User{ap_id: ap_id} = user) do
ap_id ap_id

View file

@ -17,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.WebFinger alias Pleroma.Web.WebFinger
alias Pleroma.Workers.BackgroundWorker
import Ecto.Query import Ecto.Query
import Pleroma.Web.ActivityPub.Utils import Pleroma.Web.ActivityPub.Utils
@ -145,7 +146,7 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
activity activity
end end
PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
Notification.create_notifications(activity) Notification.create_notifications(activity)

View file

@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
alias Pleroma.HTTP alias Pleroma.HTTP
alias Pleroma.Web.MediaProxy alias Pleroma.Web.MediaProxy
alias Pleroma.Workers.BackgroundWorker
require Logger require Logger
@ -30,7 +31,7 @@ def perform(:preload, %{"object" => %{"attachment" => attachments}} = _message)
url url
|> Enum.each(fn |> Enum.each(fn
%{"href" => href} -> %{"href" => href} ->
PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href]) BackgroundWorker.enqueue("media_proxy_prefetch", %{"url" => href})
x -> x ->
Logger.debug("Unhandled attachment URL object #{inspect(x)}") Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@ -46,7 +47,7 @@ def filter(
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
) )
when is_list(attachments) and length(attachments) > 0 do when is_list(attachments) and length(attachments) > 0 do
PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message]) BackgroundWorker.enqueue("media_proxy_preload", %{"message" => message})
{:ok, message} {:ok, message}
end end

View file

@ -84,6 +84,15 @@ def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = pa
end end
end end
def publish_one(%{actor_id: actor_id} = params) do
actor = User.get_cached_by_id(actor_id)
params
|> Map.delete(:actor_id)
|> Map.put(:actor, actor)
|> publish_one()
end
defp should_federate?(inbox, public) do defp should_federate?(inbox, public) do
if public do if public do
true true
@ -159,7 +168,8 @@ def determine_inbox(
Publishes an activity with BCC to all relevant peers. Publishes an activity with BCC to all relevant peers.
""" """
def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bcc != [] do def publish(%User{} = actor, %{data: %{"bcc" => bcc}} = activity)
when is_list(bcc) and bcc != [] do
public = is_public?(activity) public = is_public?(activity)
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data) {:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
@ -186,7 +196,7 @@ def publish(actor, %{data: %{"bcc" => bcc}} = activity) when is_list(bcc) and bc
Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{ Pleroma.Web.Federator.Publisher.enqueue_one(__MODULE__, %{
inbox: inbox, inbox: inbox,
json: json, json: json,
actor: actor, actor_id: actor.id,
id: activity.data["id"], id: activity.data["id"],
unreachable_since: unreachable_since unreachable_since: unreachable_since
}) })
@ -221,7 +231,7 @@ def publish(%User{} = actor, %Activity{} = activity) do
%{ %{
inbox: inbox, inbox: inbox,
json: json, json: json,
actor: actor, actor_id: actor.id,
id: activity.data["id"], id: activity.data["id"],
unreachable_since: unreachable_since unreachable_since: unreachable_since
} }

View file

@ -15,6 +15,7 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
alias Pleroma.Workers.TransmogrifierWorker
import Ecto.Query import Ecto.Query
@ -1051,7 +1052,7 @@ def upgrade_user_from_ap_id(ap_id) do
already_ap <- User.ap_enabled?(user), already_ap <- User.ap_enabled?(user),
{:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do
unless already_ap do unless already_ap do
PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user]) TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
end end
{:ok, user} {:ok, user}

View file

@ -167,14 +167,7 @@ def create_context(context) do
@spec maybe_federate(any()) :: :ok @spec maybe_federate(any()) :: :ok
def maybe_federate(%Activity{local: true} = activity) do def maybe_federate(%Activity{local: true} = activity) do
if Pleroma.Config.get!([:instance, :federating]) do if Pleroma.Config.get!([:instance, :federating]) do
priority = Pleroma.Web.Federator.publish(activity)
case activity.data["type"] do
"Delete" -> 10
"Create" -> 1
_ -> 5
end
Pleroma.Web.Federator.publish(activity, priority)
end end
:ok :ok

View file

@ -10,16 +10,17 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher alias Pleroma.Web.Federator.Publisher
alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.OStatus alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub alias Pleroma.Web.Websub
alias Pleroma.Workers.PublisherWorker
alias Pleroma.Workers.ReceiverWorker
alias Pleroma.Workers.SubscriberWorker
require Logger require Logger
def init do def init do
# 1 minute # To do: consider removing this call in favor of scheduled execution (`quantum`-based)
Process.sleep(1000 * 60) refresh_subscriptions(schedule_in: 60)
refresh_subscriptions()
end end
@doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)" @doc "Addresses [memory leaks on recursive replies fetching](https://git.pleroma.social/pleroma/pleroma/issues/161)"
@ -37,50 +38,38 @@ def allowed_incoming_reply_depth?(depth) do
# Client API # Client API
def incoming_doc(doc) do def incoming_doc(doc) do
PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_doc, doc]) ReceiverWorker.enqueue("incoming_doc", %{"body" => doc})
end end
def incoming_ap_doc(params) do def incoming_ap_doc(params) do
PleromaJobQueue.enqueue(:federator_incoming, __MODULE__, [:incoming_ap_doc, params]) ReceiverWorker.enqueue("incoming_ap_doc", %{"params" => params})
end end
def publish(activity, priority \\ 1) do def publish(%{id: "pleroma:fakeid"} = activity) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish, activity], priority) perform(:publish, activity)
end
def publish(activity) do
PublisherWorker.enqueue("publish", %{"activity_id" => activity.id})
end end
def verify_websub(websub) do def verify_websub(websub) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:verify_websub, websub]) SubscriberWorker.enqueue("verify_websub", %{"websub_id" => websub.id})
end end
def request_subscription(sub) do def request_subscription(websub) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:request_subscription, sub]) SubscriberWorker.enqueue("request_subscription", %{"websub_id" => websub.id})
end end
def refresh_subscriptions do def refresh_subscriptions(worker_args \\ []) do
PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:refresh_subscriptions]) SubscriberWorker.enqueue("refresh_subscriptions", %{}, worker_args ++ [max_attempts: 1])
end end
# Job Worker Callbacks # Job Worker Callbacks
def perform(:refresh_subscriptions) do @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
Logger.debug("Federator running refresh subscriptions") def perform(:publish_one, module, params) do
Websub.refresh_subscriptions() apply(module, :publish_one, [params])
spawn(fn ->
# 6 hours
Process.sleep(1000 * 60 * 60 * 6)
refresh_subscriptions()
end)
end
def perform(:request_subscription, websub) do
Logger.debug("Refreshing #{websub.topic}")
with {:ok, websub} <- Websub.request_subscription(websub) do
Logger.debug("Successfully refreshed #{websub.topic}")
else
_e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end end
def perform(:publish, activity) do def perform(:publish, activity) do
@ -92,14 +81,6 @@ def perform(:publish, activity) do
end end
end end
def perform(:verify_websub, websub) do
Logger.debug(fn ->
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
end)
Websub.verify(websub)
end
def perform(:incoming_doc, doc) do def perform(:incoming_doc, doc) do
Logger.info("Got document, trying to parse") Logger.info("Got document, trying to parse")
OStatus.handle_incoming(doc) OStatus.handle_incoming(doc)
@ -130,22 +111,27 @@ def perform(:incoming_ap_doc, params) do
end end
end end
def perform( def perform(:request_subscription, websub) do
:publish_single_websub, Logger.debug("Refreshing #{websub.topic}")
%{xml: _xml, topic: _topic, callback: _callback, secret: _secret} = params
) do
case Websub.publish_one(params) do
{:ok, _} ->
:ok
{:error, _} -> with {:ok, websub} <- Websub.request_subscription(websub) do
RetryQueue.enqueue(params, Websub) Logger.debug("Successfully refreshed #{websub.topic}")
else
_e -> Logger.debug("Couldn't refresh #{websub.topic}")
end end
end end
def perform(type, _) do def perform(:verify_websub, websub) do
Logger.debug(fn -> "Unknown task: #{type}" end) Logger.debug(fn ->
{:error, "Don't know what to do with this"} "Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
end)
Websub.verify(websub)
end
def perform(:refresh_subscriptions) do
Logger.debug("Federator running refresh subscriptions")
Websub.refresh_subscriptions()
end end
def ap_enabled_actor(id) do def ap_enabled_actor(id) do

View file

@ -6,7 +6,7 @@ defmodule Pleroma.Web.Federator.Publisher do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.Federator.RetryQueue alias Pleroma.Workers.PublisherWorker
require Logger require Logger
@ -30,23 +30,11 @@ defmodule Pleroma.Web.Federator.Publisher do
Enqueue publishing a single activity. Enqueue publishing a single activity.
""" """
@spec enqueue_one(module(), Map.t()) :: :ok @spec enqueue_one(module(), Map.t()) :: :ok
def enqueue_one(module, %{} = params), def enqueue_one(module, %{} = params) do
do: PleromaJobQueue.enqueue(:federator_outgoing, __MODULE__, [:publish_one, module, params]) PublisherWorker.enqueue(
"publish_one",
@spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()} %{"module" => to_string(module), "params" => params}
def perform(:publish_one, module, params) do )
case apply(module, :publish_one, [params]) do
{:ok, _} ->
:ok
{:error, _e} ->
RetryQueue.enqueue(params, module)
end
end
def perform(type, _, _) do
Logger.debug("Unknown task: #{type}")
{:error, "Don't know what to do with this"}
end end
@doc """ @doc """

View file

@ -1,239 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator.RetryQueue do
use GenServer
require Logger
def init(args) do
queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
{:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
end
def start_link(_) do
enabled =
if Pleroma.Config.get(:env) == :test,
do: true,
else: Pleroma.Config.get([__MODULE__, :enabled], false)
if enabled do
Logger.info("Starting retry queue")
linkres =
GenServer.start_link(
__MODULE__,
%{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
name: __MODULE__
)
maybe_kickoff_timer()
linkres
else
Logger.info("Retry queue disabled")
:ignore
end
end
def enqueue(data, transport, retries \\ 0) do
GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
end
def get_stats do
GenServer.call(__MODULE__, :get_stats)
end
def reset_stats do
GenServer.call(__MODULE__, :reset_stats)
end
def get_retry_params(retries) do
if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
{:drop, "Max retries reached"}
else
{:retry, growth_function(retries)}
end
end
def get_retry_timer_interval do
Pleroma.Config.get([:retry_queue, :interval], 1000)
end
defp ets_count_expires(table, current_time) do
:ets.select_count(
table,
[
{
{:"$1", :"$2"},
[{:"=<", :"$1", {:const, current_time}}],
[true]
}
]
)
end
defp ets_pop_n_expired(table, current_time, desired) do
{popped, _continuation} =
:ets.select(
table,
[
{
{:"$1", :"$2"},
[{:"=<", :"$1", {:const, current_time}}],
[:"$_"]
}
],
desired
)
popped
|> Enum.each(fn e ->
:ets.delete_object(table, e)
end)
popped
end
def maybe_start_job(running_jobs, queue_table) do
# we don't want to hit the ets or the DateTime more times than we have to
# could optimize slightly further by not using the count, and instead grabbing
# up to N objects early...
current_time = DateTime.to_unix(DateTime.utc_now())
n_running_jobs = :sets.size(running_jobs)
if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
n_ready_jobs = ets_count_expires(queue_table, current_time)
if n_ready_jobs > 0 do
# figure out how many we could start
available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
else
running_jobs
end
else
running_jobs
end
end
defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
running_jobs
end
defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
when available_job_slots > 0 do
candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
candidates
|> List.foldl(running_jobs, fn {_, e}, rj ->
{:ok, pid} = Task.start(fn -> worker(e) end)
mref = Process.monitor(pid)
:sets.add_element(mref, rj)
end)
end
def worker({:send, data, transport, retries}) do
case transport.publish_one(data) do
{:ok, _} ->
GenServer.cast(__MODULE__, :inc_delivered)
:delivered
{:error, _reason} ->
enqueue(data, transport, retries)
:retry
end
end
def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
{:reply, %{delivered: delivery_count, dropped: drop_count}, state}
end
def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
{:reply, %{delivered: delivery_count, dropped: drop_count},
%{state | delivered: 0, dropped: 0}}
end
def handle_cast(:reset_stats, state) do
{:noreply, %{state | delivered: 0, dropped: 0}}
end
def handle_cast(
{:maybe_enqueue, data, transport, retries},
%{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
) do
case get_retry_params(retries) do
{:retry, timeout} ->
:ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
running_jobs = maybe_start_job(running_jobs, queue_table)
{:noreply, %{state | running_jobs: running_jobs}}
{:drop, message} ->
Logger.debug(message)
{:noreply, %{state | dropped: drop_count + 1}}
end
end
def handle_cast(:kickoff_timer, state) do
retry_interval = get_retry_timer_interval()
Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
{:noreply, state}
end
def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
{:noreply, %{state | delivered: delivery_count + 1}}
end
def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
{:noreply, %{state | dropped: drop_count + 1}}
end
def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
case transport.publish_one(data) do
{:ok, _} ->
{:noreply, %{state | delivered: delivery_count + 1}}
{:error, _reason} ->
enqueue(data, transport, retries)
{:noreply, state}
end
end
def handle_info(
:retry_timer_run,
%{queue_table: queue_table, running_jobs: running_jobs} = state
) do
maybe_kickoff_timer()
running_jobs = maybe_start_job(running_jobs, queue_table)
{:noreply, %{state | running_jobs: running_jobs}}
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
%{running_jobs: running_jobs, queue_table: queue_table} = state
running_jobs = :sets.del_element(ref, running_jobs)
running_jobs = maybe_start_job(running_jobs, queue_table)
{:noreply, %{state | running_jobs: running_jobs}}
end
def handle_info(unknown, state) do
Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
{:noreply, state}
end
if Pleroma.Config.get(:env) == :test do
defp growth_function(_retries) do
_shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
DateTime.to_unix(DateTime.utc_now()) - 1
end
else
defp growth_function(retries) do
round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
DateTime.to_unix(DateTime.utc_now())
end
end
defp maybe_kickoff_timer do
GenServer.cast(__MODULE__, :kickoff_timer)
end
end

View file

@ -17,6 +17,7 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
) )
alias Pleroma.Web.OAuth.Token alias Pleroma.Web.OAuth.Token
alias Pleroma.Workers.BackgroundWorker
def start_link(_), do: GenServer.start_link(__MODULE__, %{}) def start_link(_), do: GenServer.start_link(__MODULE__, %{})
@ -27,9 +28,11 @@ def init(_) do
@doc false @doc false
def handle_info(:perform, state) do def handle_info(:perform, state) do
Token.delete_expired_tokens() BackgroundWorker.enqueue("clean_expired_tokens", %{})
Process.send_after(self(), :perform, @interval) Process.send_after(self(), :perform, @interval)
{:noreply, state} {:noreply, state}
end end
def perform(:clean), do: Token.delete_expired_tokens()
end end

View file

@ -3,7 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Push do defmodule Pleroma.Web.Push do
alias Pleroma.Web.Push.Impl alias Pleroma.Workers.WebPusherWorker
require Logger require Logger
@ -31,6 +31,7 @@ def enabled do
end end
end end
def send(notification), def send(notification) do
do: PleromaJobQueue.enqueue(:web_push, Impl, [notification]) WebPusherWorker.enqueue("web_push", %{"notification_id" => notification.id})
end
end end

View file

@ -170,6 +170,15 @@ def publish_one(%{recipient: url, feed: feed} = params) when is_binary(url) do
end end
end end
def publish_one(%{recipient_id: recipient_id} = params) do
recipient = User.get_cached_by_id(recipient_id)
params
|> Map.delete(:recipient_id)
|> Map.put(:recipient, recipient)
|> publish_one()
end
def publish_one(_), do: :noop def publish_one(_), do: :noop
@supported_activities [ @supported_activities [
@ -218,7 +227,7 @@ def publish(%{info: %{keys: keys}} = user, %{data: %{"type" => type}} = activity
Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end) Logger.debug(fn -> "Sending Salmon to #{remote_user.ap_id}" end)
Publisher.enqueue_one(__MODULE__, %{ Publisher.enqueue_one(__MODULE__, %{
recipient: remote_user, recipient_id: remote_user.id,
feed: feed, feed: feed,
unreachable_since: reachable_urls_metadata[remote_user.info.salmon] unreachable_since: reachable_urls_metadata[remote_user.info.salmon]
}) })

View file

@ -265,12 +265,7 @@ def follow_import(%{assigns: %{user: follower}} = conn, %{"list" => list}) do
String.split(line, ",") |> List.first() String.split(line, ",") |> List.first()
end) end)
|> List.delete("Account address") do |> List.delete("Account address") do
PleromaJobQueue.enqueue(:background, User, [ User.follow_import(follower, followed_identifiers)
:follow_import,
follower,
followed_identifiers
])
json(conn, "job started") json(conn, "job started")
end end
end end
@ -281,12 +276,7 @@ def blocks_import(conn, %{"list" => %Plug.Upload{} = listfile}) do
def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do
with blocked_identifiers <- String.split(list) do with blocked_identifiers <- String.split(list) do
PleromaJobQueue.enqueue(:background, User, [ User.blocks_import(blocker, blocked_identifiers)
:blocks_import,
blocker,
blocked_identifiers
])
json(conn, "job started") json(conn, "job started")
end end
end end

View file

@ -0,0 +1,18 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ActivityExpirationWorker do
use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
@impl Oban.Worker
def perform(
%{
"op" => "activity_expiration",
"activity_expiration_id" => activity_expiration_id
},
_job
) do
Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id)
end
end

View file

@ -0,0 +1,69 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.BackgroundWorker do
alias Pleroma.Activity
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
alias Pleroma.Web.OAuth.Token.CleanWorker
use Pleroma.Workers.WorkerHelper, queue: "background"
@impl Oban.Worker
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id)
User.perform(:fetch_initial_posts, user)
end
def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
user = User.get_cached_by_id(user_id)
User.perform(:deactivate_async, user, status)
end
def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id)
User.perform(:delete, user)
end
def perform(
%{
"op" => "blocks_import",
"blocker_id" => blocker_id,
"blocked_identifiers" => blocked_identifiers
},
_job
) do
blocker = User.get_cached_by_id(blocker_id)
User.perform(:blocks_import, blocker, blocked_identifiers)
end
def perform(
%{
"op" => "follow_import",
"follower_id" => follower_id,
"followed_identifiers" => followed_identifiers
},
_job
) do
follower = User.get_cached_by_id(follower_id)
User.perform(:follow_import, follower, followed_identifiers)
end
def perform(%{"op" => "clean_expired_tokens"}, _job) do
CleanWorker.perform(:clean)
end
def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
MediaProxyWarmingPolicy.perform(:preload, message)
end
def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do
MediaProxyWarmingPolicy.perform(:prefetch, url)
end
def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do
activity = Activity.get_by_id(activity_id)
Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
end
end

View file

@ -0,0 +1,16 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.DigestEmailsWorker do
alias Pleroma.User
use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
@impl Oban.Worker
def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
user_id
|> User.get_cached_by_id()
|> Pleroma.Daemons.DigestEmailDaemon.perform()
end
end

View file

@ -0,0 +1,15 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.MailerWorker do
use Pleroma.Workers.WorkerHelper, queue: "mailer"
@impl Oban.Worker
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
encoded_email
|> Base.decode64!()
|> :erlang.binary_to_term()
|> Pleroma.Emails.Mailer.deliver(config)
end
end

View file

@ -0,0 +1,25 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.PublisherWorker do
alias Pleroma.Activity
alias Pleroma.Web.Federator
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
def backoff(attempt) when is_integer(attempt) do
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
end
@impl Oban.Worker
def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
activity = Activity.get_by_id(activity_id)
Federator.perform(:publish, activity)
end
def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do
params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
Federator.perform(:publish_one, String.to_atom(module_name), params)
end
end

View file

@ -0,0 +1,18 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ReceiverWorker do
alias Pleroma.Web.Federator
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
@impl Oban.Worker
def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
Federator.perform(:incoming_doc, doc)
end
def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do
Federator.perform(:incoming_ap_doc, params)
end
end

View file

@ -0,0 +1,12 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ScheduledActivityWorker do
use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
@impl Oban.Worker
def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id)
end
end

View file

@ -0,0 +1,26 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.SubscriberWorker do
alias Pleroma.Repo
alias Pleroma.Web.Federator
alias Pleroma.Web.Websub
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
@impl Oban.Worker
def perform(%{"op" => "refresh_subscriptions"}, _job) do
Federator.perform(:refresh_subscriptions)
end
def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do
websub = Repo.get(Websub.WebsubClientSubscription, websub_id)
Federator.perform(:request_subscription, websub)
end
def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do
websub = Repo.get(Websub.WebsubServerSubscription, websub_id)
Federator.perform(:verify_websub, websub)
end
end

View file

@ -0,0 +1,15 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.TransmogrifierWorker do
alias Pleroma.User
use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
@impl Oban.Worker
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
user = User.get_cached_by_id(user_id)
Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
end
end

View file

@ -0,0 +1,16 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.WebPusherWorker do
alias Pleroma.Notification
alias Pleroma.Repo
use Pleroma.Workers.WorkerHelper, queue: "web_push"
@impl Oban.Worker
def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
notification = Repo.get(Notification, notification_id)
Pleroma.Web.Push.Impl.perform(notification)
end
end

View file

@ -0,0 +1,46 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.WorkerHelper do
alias Pleroma.Config
alias Pleroma.Workers.WorkerHelper
def worker_args(queue) do
case Config.get([:workers, :retries, queue]) do
nil -> []
max_attempts -> [max_attempts: max_attempts]
end
end
def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
backoff =
:math.pow(attempt, pow) +
base_backoff +
:rand.uniform(2 * base_backoff) * attempt
trunc(backoff)
end
defmacro __using__(opts) do
caller_module = __CALLER__.module
queue = Keyword.fetch!(opts, :queue)
quote do
# Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: unquote(queue),
max_attempts: 1
def enqueue(op, params, worker_args \\ []) do
params = Map.merge(%{"op" => op}, params)
queue_atom = String.to_atom(unquote(queue))
worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
unquote(caller_module)
|> apply(:new, [params, worker_args])
|> Pleroma.Repo.insert()
end
end
end
end

View file

@ -101,6 +101,8 @@ defp deps do
{:phoenix_ecto, "~> 4.0"}, {:phoenix_ecto, "~> 4.0"},
{:ecto_sql, "~> 3.1"}, {:ecto_sql, "~> 3.1"},
{:postgrex, ">= 0.13.5"}, {:postgrex, ">= 0.13.5"},
{:oban, "~> 0.7"},
{:quantum, "~> 2.3"},
{:gettext, "~> 0.15"}, {:gettext, "~> 0.15"},
{:comeonin, "~> 4.1.1"}, {:comeonin, "~> 4.1.1"},
{:pbkdf2_elixir, "~> 0.12.3"}, {:pbkdf2_elixir, "~> 0.12.3"},
@ -141,7 +143,6 @@ defp deps do
{:http_signatures, {:http_signatures,
git: "https://git.pleroma.social/pleroma/http_signatures.git", git: "https://git.pleroma.social/pleroma/http_signatures.git",
ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"}, ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"},
{:pleroma_job_queue, "~> 0.3"},
{:telemetry, "~> 0.3"}, {:telemetry, "~> 0.3"},
{:prometheus_ex, "~> 3.0"}, {:prometheus_ex, "~> 3.0"},
{:prometheus_plugs, "~> 1.1"}, {:prometheus_plugs, "~> 1.1"},

View file

@ -17,7 +17,7 @@
"credo": {:hex, :credo, "0.9.3", "76fa3e9e497ab282e0cf64b98a624aa11da702854c52c82db1bf24e54ab7c97a", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, "credo": {:hex, :credo, "0.9.3", "76fa3e9e497ab282e0cf64b98a624aa11da702854c52c82db1bf24e54ab7c97a", [:mix], [{:bunt, "~> 0.2.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:poison, ">= 0.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"},
"crontab": {:hex, :crontab, "1.1.7", "b9219f0bdc8678b94143655a8f229716c5810c0636a4489f98c0956137e53985", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm"}, "crontab": {:hex, :crontab, "1.1.7", "b9219f0bdc8678b94143655a8f229716c5810c0636a4489f98c0956137e53985", [:mix], [{:ecto, "~> 1.0 or ~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: true]}], "hexpm"},
"crypt": {:git, "https://github.com/msantos/crypt", "1f2b58927ab57e72910191a7ebaeff984382a1d3", [ref: "1f2b58927ab57e72910191a7ebaeff984382a1d3"]}, "crypt": {:git, "https://github.com/msantos/crypt", "1f2b58927ab57e72910191a7ebaeff984382a1d3", [ref: "1f2b58927ab57e72910191a7ebaeff984382a1d3"]},
"db_connection": {:hex, :db_connection, "2.0.6", "bde2f85d047969c5b5800cb8f4b3ed6316c8cb11487afedac4aa5f93fd39abfa", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"}, "db_connection": {:hex, :db_connection, "2.1.1", "a51e8a2ee54ef2ae6ec41a668c85787ed40cb8944928c191280fe34c15b76ae5", [:mix], [{:connection, "~> 1.0.2", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"},
"decimal": {:hex, :decimal, "1.8.0", "ca462e0d885f09a1c5a342dbd7c1dcf27ea63548c65a65e67334f4b61803822e", [:mix], [], "hexpm"}, "decimal": {:hex, :decimal, "1.8.0", "ca462e0d885f09a1c5a342dbd7c1dcf27ea63548c65a65e67334f4b61803822e", [:mix], [], "hexpm"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.3.6", "ce1d0675e10a5bb46b007549362bd3f5f08908843957687d8484fe7f37466b19", [:mix], [], "hexpm"}, "earmark": {:hex, :earmark, "1.3.6", "ce1d0675e10a5bb46b007549362bd3f5f08908843957687d8484fe7f37466b19", [:mix], [], "hexpm"},
@ -36,6 +36,8 @@
"excoveralls": {:hex, :excoveralls, "0.11.1", "dd677fbdd49114fdbdbf445540ec735808250d56b011077798316505064edb2c", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"}, "excoveralls": {:hex, :excoveralls, "0.11.1", "dd677fbdd49114fdbdbf445540ec735808250d56b011077798316505064edb2c", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm"},
"floki": {:hex, :floki, "0.20.4", "be42ac911fece24b4c72f3b5846774b6e61b83fe685c2fc9d62093277fb3bc86", [:mix], [{:html_entities, "~> 0.4.0", [hex: :html_entities, repo: "hexpm", optional: false]}, {:mochiweb, "~> 2.15", [hex: :mochiweb, repo: "hexpm", optional: false]}], "hexpm"}, "floki": {:hex, :floki, "0.20.4", "be42ac911fece24b4c72f3b5846774b6e61b83fe685c2fc9d62093277fb3bc86", [:mix], [{:html_entities, "~> 0.4.0", [hex: :html_entities, repo: "hexpm", optional: false]}, {:mochiweb, "~> 2.15", [hex: :mochiweb, repo: "hexpm", optional: false]}], "hexpm"},
"gen_smtp": {:hex, :gen_smtp, "0.14.0", "39846a03522456077c6429b4badfd1d55e5e7d0fdfb65e935b7c5e38549d9202", [:rebar3], [], "hexpm"}, "gen_smtp": {:hex, :gen_smtp, "0.14.0", "39846a03522456077c6429b4badfd1d55e5e7d0fdfb65e935b7c5e38549d9202", [:rebar3], [], "hexpm"},
"gen_stage": {:hex, :gen_stage, "0.14.2", "6a2a578a510c5bfca8a45e6b27552f613b41cf584b58210f017088d3d17d0b14", [:mix], [], "hexpm"},
"gen_state_machine": {:hex, :gen_state_machine, "2.0.5", "9ac15ec6e66acac994cc442dcc2c6f9796cf380ec4b08267223014be1c728a95", [:mix], [], "hexpm"},
"gettext": {:hex, :gettext, "0.17.0", "abe21542c831887a2b16f4c94556db9c421ab301aee417b7c4fbde7fbdbe01ec", [:mix], [], "hexpm"}, "gettext": {:hex, :gettext, "0.17.0", "abe21542c831887a2b16f4c94556db9c421ab301aee417b7c4fbde7fbdbe01ec", [:mix], [], "hexpm"},
"hackney": {:hex, :hackney, "1.15.1", "9f8f471c844b8ce395f7b6d8398139e26ddca9ebc171a8b91342ee15a19963f4", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.4", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"}, "hackney": {:hex, :hackney, "1.15.1", "9f8f471c844b8ce395f7b6d8398139e26ddca9ebc171a8b91342ee15a19963f4", [:rebar3], [{:certifi, "2.5.1", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "6.0.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.4", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"html_entities": {:hex, :html_entities, "0.4.0", "f2fee876858cf6aaa9db608820a3209e45a087c5177332799592142b50e89a6b", [:mix], [], "hexpm"}, "html_entities": {:hex, :html_entities, "0.4.0", "f2fee876858cf6aaa9db608820a3209e45a087c5177332799592142b50e89a6b", [:mix], [], "hexpm"},
@ -46,6 +48,7 @@
"jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
"joken": {:hex, :joken, "2.0.1", "ec9ab31bf660f343380da033b3316855197c8d4c6ef597fa3fcb451b326beb14", [:mix], [{:jose, "~> 1.9", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm"}, "joken": {:hex, :joken, "2.0.1", "ec9ab31bf660f343380da033b3316855197c8d4c6ef597fa3fcb451b326beb14", [:mix], [{:jose, "~> 1.9", [hex: :jose, repo: "hexpm", optional: false]}], "hexpm"},
"jose": {:hex, :jose, "1.9.0", "4167c5f6d06ffaebffd15cdb8da61a108445ef5e85ab8f5a7ad926fdf3ada154", [:mix, :rebar3], [{:base64url, "~> 0.0.1", [hex: :base64url, repo: "hexpm", optional: false]}], "hexpm"}, "jose": {:hex, :jose, "1.9.0", "4167c5f6d06ffaebffd15cdb8da61a108445ef5e85ab8f5a7ad926fdf3ada154", [:mix, :rebar3], [{:base64url, "~> 0.0.1", [hex: :base64url, repo: "hexpm", optional: false]}], "hexpm"},
"libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm"},
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm"}, "meck": {:hex, :meck, "0.8.13", "ffedb39f99b0b99703b8601c6f17c7f76313ee12de6b646e671e3188401f7866", [:rebar3], [], "hexpm"},
@ -57,6 +60,7 @@
"mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"}, "mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
"mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"}, "mox": {:hex, :mox, "0.5.1", "f86bb36026aac1e6f924a4b6d024b05e9adbed5c63e8daa069bd66fb3292165b", [:mix], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"}, "nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"},
"oban": {:hex, :oban, "0.7.1", "171bdd1b69c1a4a839f8c768f5e962fc22d1de1513d459fb6b8e0cbd34817a9a", [:mix], [{:ecto_sql, "~> 3.1", [hex: :ecto_sql, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.14", [hex: :postgrex, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
"pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"}, "pbkdf2_elixir": {:hex, :pbkdf2_elixir, "0.12.3", "6706a148809a29c306062862c803406e88f048277f6e85b68faf73291e820b84", [:mix], [], "hexpm"},
"phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, "phoenix": {:hex, :phoenix, "1.4.9", "746d098e10741c334d88143d3c94cab1756435f94387a63441792e66ec0ee974", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 1.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.8.1 or ~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"},
@ -64,7 +68,6 @@
"phoenix_html": {:hex, :phoenix_html, "2.13.1", "fa8f034b5328e2dfa0e4131b5569379003f34bc1fafdaa84985b0b9d2f12e68b", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "phoenix_html": {:hex, :phoenix_html, "2.13.1", "fa8f034b5328e2dfa0e4131b5569379003f34bc1fafdaa84985b0b9d2f12e68b", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.2", "496c303bdf1b2e98a9d26e89af5bba3ab487ba3a3735f74bf1f4064d2a845a3e", [:mix], [], "hexpm"}, "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.2", "496c303bdf1b2e98a9d26e89af5bba3ab487ba3a3735f74bf1f4064d2a845a3e", [:mix], [], "hexpm"},
"phoenix_swoosh": {:hex, :phoenix_swoosh, "0.2.0", "a7e0b32077cd6d2323ae15198839b05d9caddfa20663fd85787479e81f89520e", [:mix], [{:phoenix, "~> 1.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.2", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:swoosh, "~> 0.1", [hex: :swoosh, repo: "hexpm", optional: false]}], "hexpm"}, "phoenix_swoosh": {:hex, :phoenix_swoosh, "0.2.0", "a7e0b32077cd6d2323ae15198839b05d9caddfa20663fd85787479e81f89520e", [:mix], [{:phoenix, "~> 1.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.2", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:swoosh, "~> 0.1", [hex: :swoosh, repo: "hexpm", optional: false]}], "hexpm"},
"pleroma_job_queue": {:hex, :pleroma_job_queue, "0.3.0", "b84538d621f0c3d6fcc1cff9d5648d3faaf873b8b21b94e6503428a07a48ec47", [:mix], [{:crontab, "~> 1.1", [hex: :crontab, repo: "hexpm", optional: false]}], "hexpm"},
"plug": {:hex, :plug, "1.8.2", "0bcce1daa420f189a6491f3940cc77ea7fb1919761175c9c3b59800d897440fc", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm"}, "plug": {:hex, :plug, "1.8.2", "0bcce1daa420f189a6491f3940cc77ea7fb1919761175c9c3b59800d897440fc", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm"},
"plug_cowboy": {:hex, :plug_cowboy, "2.1.0", "b75768153c3a8a9e8039d4b25bb9b14efbc58e9c4a6e6a270abff1cd30cbe320", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "plug_cowboy": {:hex, :plug_cowboy, "2.1.0", "b75768153c3a8a9e8039d4b25bb9b14efbc58e9c4a6e6a270abff1cd30cbe320", [:mix], [{:cowboy, "~> 2.5", [hex: :cowboy, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"}, "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},
@ -77,9 +80,11 @@
"prometheus_phoenix": {:hex, :prometheus_phoenix, "1.3.0", "c4b527e0b3a9ef1af26bdcfbfad3998f37795b9185d475ca610fe4388fdd3bb5", [:mix], [{:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.3 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"}, "prometheus_phoenix": {:hex, :prometheus_phoenix, "1.3.0", "c4b527e0b3a9ef1af26bdcfbfad3998f37795b9185d475ca610fe4388fdd3bb5", [:mix], [{:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.3 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"},
"prometheus_plugs": {:hex, :prometheus_plugs, "1.1.5", "25933d48f8af3a5941dd7b621c889749894d8a1082a6ff7c67cc99dec26377c5", [:mix], [{:accept, "~> 0.1", [hex: :accept, repo: "hexpm", optional: false]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}, {:prometheus_process_collector, "~> 1.1", [hex: :prometheus_process_collector, repo: "hexpm", optional: true]}], "hexpm"}, "prometheus_plugs": {:hex, :prometheus_plugs, "1.1.5", "25933d48f8af3a5941dd7b621c889749894d8a1082a6ff7c67cc99dec26377c5", [:mix], [{:accept, "~> 0.1", [hex: :accept, repo: "hexpm", optional: false]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}, {:prometheus_process_collector, "~> 1.1", [hex: :prometheus_process_collector, repo: "hexpm", optional: true]}], "hexpm"},
"quack": {:hex, :quack, "0.1.1", "cca7b4da1a233757fdb44b3334fce80c94785b3ad5a602053b7a002b5a8967bf", [:mix], [{:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: false]}, {:tesla, "~> 1.2.0", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm"}, "quack": {:hex, :quack, "0.1.1", "cca7b4da1a233757fdb44b3334fce80c94785b3ad5a602053b7a002b5a8967bf", [:mix], [{:poison, ">= 1.0.0", [hex: :poison, repo: "hexpm", optional: false]}, {:tesla, "~> 1.2.0", [hex: :tesla, repo: "hexpm", optional: false]}], "hexpm"},
"quantum": {:hex, :quantum, "2.3.4", "72a0e8855e2adc101459eac8454787cb74ab4169de6ca50f670e72142d4960e9", [:mix], [{:calendar, "~> 0.17", [hex: :calendar, repo: "hexpm", optional: true]}, {:crontab, "~> 1.1", [hex: :crontab, repo: "hexpm", optional: false]}, {:gen_stage, "~> 0.12", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:swarm, "~> 3.3", [hex: :swarm, repo: "hexpm", optional: false]}, {:timex, "~> 3.1", [hex: :timex, repo: "hexpm", optional: true]}], "hexpm"},
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"}, "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"},
"recon": {:git, "https://github.com/ferd/recon.git", "75d70c7c08926d2f24f1ee6de14ee50fe8a52763", [tag: "2.4.0"]}, "recon": {:git, "https://github.com/ferd/recon.git", "75d70c7c08926d2f24f1ee6de14ee50fe8a52763", [tag: "2.4.0"]},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm"},
"swarm": {:hex, :swarm, "3.4.0", "64f8b30055d74640d2186c66354b33b999438692a91be275bb89cdc7e401f448", [:mix], [{:gen_state_machine, "~> 2.0", [hex: :gen_state_machine, repo: "hexpm", optional: false]}, {:libring, "~> 1.0", [hex: :libring, repo: "hexpm", optional: false]}], "hexpm"},
"sweet_xml": {:hex, :sweet_xml, "0.6.6", "fc3e91ec5dd7c787b6195757fbcf0abc670cee1e4172687b45183032221b66b8", [:mix], [], "hexpm"}, "sweet_xml": {:hex, :sweet_xml, "0.6.6", "fc3e91ec5dd7c787b6195757fbcf0abc670cee1e4172687b45183032221b66b8", [:mix], [], "hexpm"},
"swoosh": {:hex, :swoosh, "0.23.2", "7dda95ff0bf54a2298328d6899c74dae1223777b43563ccebebb4b5d2b61df38", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"}, "swoosh": {:hex, :swoosh, "0.23.2", "7dda95ff0bf54a2298328d6899c74dae1223777b43563ccebebb4b5d2b61df38", [:mix], [{:cowboy, "~> 1.0.1 or ~> 1.1 or ~> 2.4", [hex: :cowboy, repo: "hexpm", optional: true]}, {:gen_smtp, "~> 0.13", [hex: :gen_smtp, repo: "hexpm", optional: true]}, {:hackney, "~> 1.9", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mail, "~> 0.2", [hex: :mail, repo: "hexpm", optional: true]}, {:mime, "~> 1.1", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 1.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}], "hexpm"},
"syslog": {:git, "https://github.com/Vagabond/erlang-syslog.git", "4a6c6f2c996483e86c1320e9553f91d337bcb6aa", [tag: "1.0.5"]}, "syslog": {:git, "https://github.com/Vagabond/erlang-syslog.git", "4a6c6f2c996483e86c1320e9553f91d337bcb6aa", [tag: "1.0.5"]},

View file

@ -0,0 +1,6 @@
defmodule Pleroma.Repo.Migrations.AddObanJobsTable do
use Ecto.Migration
defdelegate up, to: Oban.Migrations
defdelegate down, to: Oban.Migrations
end

View file

@ -7,6 +7,7 @@ defmodule Pleroma.ActivityTest do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Bookmark alias Pleroma.Bookmark
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Tests.ObanHelpers
alias Pleroma.ThreadMute alias Pleroma.ThreadMute
import Pleroma.Factory import Pleroma.Factory
@ -125,7 +126,8 @@ test "when association is not loaded" do
} }
{:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"}) {:ok, local_activity} = Pleroma.Web.CommonAPI.post(user, %{"status" => "find me!"})
{:ok, remote_activity} = Pleroma.Web.Federator.incoming_ap_doc(params) {:ok, job} = Pleroma.Web.Federator.incoming_ap_doc(params)
{:ok, remote_activity} = ObanHelpers.perform(job)
%{local_activity: local_activity, remote_activity: remote_activity, user: user} %{local_activity: local_activity, remote_activity: remote_activity, user: user}
end end

View file

@ -22,6 +22,8 @@ test "it goes through old direct conversations" do
{:ok, _activity} = {:ok, _activity} =
CommonAPI.post(user, %{"visibility" => "direct", "status" => "hey @#{other_user.nickname}"}) CommonAPI.post(user, %{"visibility" => "direct", "status" => "hey @#{other_user.nickname}"})
Pleroma.Tests.ObanHelpers.perform_all()
Repo.delete_all(Conversation) Repo.delete_all(Conversation)
Repo.delete_all(Conversation.Participation) Repo.delete_all(Conversation.Participation)

View file

@ -10,7 +10,7 @@ defmodule Pleroma.ActivityExpirationWorkerTest do
test "deletes an activity" do test "deletes an activity" do
activity = insert(:note_activity) activity = insert(:note_activity)
expiration = insert(:expiration_in_the_past, %{activity_id: activity.id}) expiration = insert(:expiration_in_the_past, %{activity_id: activity.id})
Pleroma.ActivityExpirationWorker.perform(:execute, expiration.id) Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, expiration.id)
refute Repo.get(Activity, activity.id) refute Repo.get(Activity, activity.id)
end end

View file

@ -2,11 +2,12 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.DigestEmailWorkerTest do defmodule Pleroma.DigestEmailDaemonTest do
use Pleroma.DataCase use Pleroma.DataCase
import Pleroma.Factory import Pleroma.Factory
alias Pleroma.DigestEmailWorker alias Pleroma.Daemons.DigestEmailDaemon
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
@ -22,7 +23,10 @@ test "it sends digest emails" do
User.switch_email_notifications(user2, "digest", true) User.switch_email_notifications(user2, "digest", true)
CommonAPI.post(user, %{"status" => "hey @#{user2.nickname}!"}) CommonAPI.post(user, %{"status" => "hey @#{user2.nickname}!"})
DigestEmailWorker.perform() DigestEmailDaemon.perform()
ObanHelpers.perform_all()
# Performing job(s) enqueued at previous step
ObanHelpers.perform_all()
assert_received {:email, email} assert_received {:email, email}
assert email.to == [{user2.name, user2.email}] assert email.to == [{user2.name, user2.email}]

View file

@ -2,7 +2,7 @@
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ScheduledActivityWorkerTest do defmodule Pleroma.ScheduledActivityDaemonTest do
use Pleroma.DataCase use Pleroma.DataCase
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
import Pleroma.Factory import Pleroma.Factory
@ -10,7 +10,7 @@ defmodule Pleroma.ScheduledActivityWorkerTest do
test "creates a status from the scheduled activity" do test "creates a status from the scheduled activity" do
user = insert(:user) user = insert(:user)
scheduled_activity = insert(:scheduled_activity, user: user, params: %{status: "hi"}) scheduled_activity = insert(:scheduled_activity, user: user, params: %{status: "hi"})
Pleroma.ScheduledActivityWorker.perform(:execute, scheduled_activity.id) Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, scheduled_activity.id)
refute Repo.get(ScheduledActivity, scheduled_activity.id) refute Repo.get(ScheduledActivity, scheduled_activity.id)
activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id)) activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id))

View file

@ -8,6 +8,7 @@ defmodule Pleroma.NotificationTest do
import Pleroma.Factory import Pleroma.Factory
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
@ -588,7 +589,8 @@ test "notifications are deleted if a local user is deleted" do
refute Enum.empty?(Notification.for_user(other_user)) refute Enum.empty?(Notification.for_user(other_user))
User.delete(user) {:ok, job} = User.delete(user)
ObanHelpers.perform(job)
assert Enum.empty?(Notification.for_user(other_user)) assert Enum.empty?(Notification.for_user(other_user))
end end
@ -633,6 +635,7 @@ test "notifications are deleted if a remote user is deleted" do
} }
{:ok, _delete_activity} = Transmogrifier.handle_incoming(delete_user_message) {:ok, _delete_activity} = Transmogrifier.handle_incoming(delete_user_message)
ObanHelpers.perform_all()
assert Enum.empty?(Notification.for_user(local_user)) assert Enum.empty?(Notification.for_user(local_user))
end end

View file

@ -0,0 +1,42 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Tests.ObanHelpers do
@moduledoc """
Oban test helpers.
"""
alias Pleroma.Repo
def perform_all do
Oban.Job
|> Repo.all()
|> perform()
end
def perform(%Oban.Job{} = job) do
res = apply(String.to_existing_atom("Elixir." <> job.worker), :perform, [job.args, job])
Repo.delete(job)
res
end
def perform(jobs) when is_list(jobs) do
for job <- jobs, do: perform(job)
end
def member?(%{} = job_args, jobs) when is_list(jobs) do
Enum.any?(jobs, fn job ->
member?(job_args, job.args)
end)
end
def member?(%{} = test_attrs, %{} = attrs) do
Enum.all?(
test_attrs,
fn {k, _v} -> member?(test_attrs[k], attrs[k]) end
)
end
def member?(x, y), do: x == y
end

View file

@ -4,6 +4,7 @@ defmodule Mix.Tasks.Pleroma.DigestTest do
import Pleroma.Factory import Pleroma.Factory
import Swoosh.TestAssertions import Swoosh.TestAssertions
alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
setup_all do setup_all do
@ -39,6 +40,8 @@ test "Sends digest to the given user" do
:ok = Mix.Tasks.Pleroma.Digest.run(["test", user2.nickname, yesterday_date]) :ok = Mix.Tasks.Pleroma.Digest.run(["test", user2.nickname, yesterday_date])
ObanHelpers.perform_all()
assert_receive {:mix_shell, :info, [message]} assert_receive {:mix_shell, :info, [message]}
assert message =~ "Digest email have been sent" assert message =~ "Digest email have been sent"

View file

@ -7,14 +7,16 @@ defmodule Pleroma.UserTest do
alias Pleroma.Builders.UserBuilder alias Pleroma.Builders.UserBuilder
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
use Pleroma.DataCase use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo
import Pleroma.Factory
import Mock import Mock
import Pleroma.Factory
setup_all do setup_all do
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@ -709,7 +711,9 @@ test "it imports user followings from list" do
user3.nickname user3.nickname
] ]
result = User.follow_import(user1, identifiers) {:ok, job} = User.follow_import(user1, identifiers)
result = ObanHelpers.perform(job)
assert is_list(result) assert is_list(result)
assert result == [user2, user3] assert result == [user2, user3]
end end
@ -920,7 +924,9 @@ test "it imports user blocks from list" do
user3.nickname user3.nickname
] ]
result = User.blocks_import(user1, identifiers) {:ok, job} = User.blocks_import(user1, identifiers)
result = ObanHelpers.perform(job)
assert is_list(result) assert is_list(result)
assert result == [user2, user3] assert result == [user2, user3]
end end
@ -1037,7 +1043,9 @@ test ".delete_user_activities deletes all create activities", %{user: user} do
test "it deletes deactivated user" do test "it deletes deactivated user" do
{:ok, user} = insert(:user, info: %{deactivated: true}) |> User.set_cache() {:ok, user} = insert(:user, info: %{deactivated: true}) |> User.set_cache()
assert {:ok, _} = User.delete(user) {:ok, job} = User.delete(user)
{:ok, _user} = ObanHelpers.perform(job)
refute User.get_by_id(user.id) refute User.get_by_id(user.id)
end end
@ -1055,7 +1063,8 @@ test "it deletes a user, all follow relationships and all activities", %{user: u
{:ok, like_two, _} = CommonAPI.favorite(activity.id, follower) {:ok, like_two, _} = CommonAPI.favorite(activity.id, follower)
{:ok, repeat, _} = CommonAPI.repeat(activity_two.id, user) {:ok, repeat, _} = CommonAPI.repeat(activity_two.id, user)
{:ok, _} = User.delete(user) {:ok, job} = User.delete(user)
{:ok, _user} = ObanHelpers.perform(job)
follower = User.get_cached_by_id(follower.id) follower = User.get_cached_by_id(follower.id)
@ -1087,12 +1096,18 @@ test "it deletes a user, all follow relationships and all activities", %{user: u
{:ok, follower} = User.get_or_fetch_by_ap_id("http://mastodon.example.org/users/admin") {:ok, follower} = User.get_or_fetch_by_ap_id("http://mastodon.example.org/users/admin")
{:ok, _} = User.follow(follower, user) {:ok, _} = User.follow(follower, user)
{:ok, _user} = User.delete(user) {:ok, job} = User.delete(user)
{:ok, _user} = ObanHelpers.perform(job)
assert called( assert ObanHelpers.member?(
Pleroma.Web.ActivityPub.Publisher.publish_one(%{ %{
inbox: "http://mastodon.example.org/inbox" "op" => "publish_one",
}) "params" => %{
"inbox" => "http://mastodon.example.org/inbox",
"id" => "pleroma:fakeid"
}
},
all_enqueued(worker: Pleroma.Workers.PublisherWorker)
) )
end end
end end
@ -1186,7 +1201,8 @@ test "invalidate_cache works" do
test "User.delete() plugs any possible zombie objects" do test "User.delete() plugs any possible zombie objects" do
user = insert(:user) user = insert(:user)
{:ok, _} = User.delete(user) {:ok, job} = User.delete(user)
{:ok, _} = ObanHelpers.perform(job)
{:ok, cached_user} = Cachex.get(:user_cache, "ap_id:#{user.ap_id}") {:ok, cached_user} = Cachex.get(:user_cache, "ap_id:#{user.ap_id}")

View file

@ -4,16 +4,20 @@
defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
use Pleroma.Web.ConnCase use Pleroma.Web.ConnCase
use Oban.Testing, repo: Pleroma.Repo
import Pleroma.Factory import Pleroma.Factory
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Instances alias Pleroma.Instances
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ObjectView alias Pleroma.Web.ActivityPub.ObjectView
alias Pleroma.Web.ActivityPub.Relay alias Pleroma.Web.ActivityPub.Relay
alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Web.ActivityPub.UserView
alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Workers.ReceiverWorker
setup_all do setup_all do
Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end) Tesla.Mock.mock_global(fn env -> apply(HttpRequestMock, :request, [env]) end)
@ -365,7 +369,8 @@ test "it inserts an incoming activity into the database", %{conn: conn} do
|> post("/inbox", data) |> post("/inbox", data)
assert "ok" == json_response(conn, 200) assert "ok" == json_response(conn, 200)
:timer.sleep(500)
ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
assert Activity.get_by_ap_id(data["id"]) assert Activity.get_by_ap_id(data["id"])
end end
@ -407,7 +412,7 @@ test "it inserts an incoming activity into the database", %{conn: conn, data: da
|> post("/users/#{user.nickname}/inbox", data) |> post("/users/#{user.nickname}/inbox", data)
assert "ok" == json_response(conn, 200) assert "ok" == json_response(conn, 200)
:timer.sleep(500) ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
assert Activity.get_by_ap_id(data["id"]) assert Activity.get_by_ap_id(data["id"])
end end
@ -436,7 +441,7 @@ test "it accepts messages from actors that are followed by the user", %{
|> post("/users/#{recipient.nickname}/inbox", data) |> post("/users/#{recipient.nickname}/inbox", data)
assert "ok" == json_response(conn, 200) assert "ok" == json_response(conn, 200)
:timer.sleep(500) ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
assert Activity.get_by_ap_id(data["id"]) assert Activity.get_by_ap_id(data["id"])
end end
@ -526,6 +531,8 @@ test "it removes all follower collections but actor's", %{conn: conn} do
|> post("/users/#{recipient.nickname}/inbox", data) |> post("/users/#{recipient.nickname}/inbox", data)
|> json_response(200) |> json_response(200)
ObanHelpers.perform(all_enqueued(worker: ReceiverWorker))
activity = Activity.get_by_ap_id(data["id"]) activity = Activity.get_by_ap_id(data["id"])
assert activity.id assert activity.id
@ -601,6 +608,7 @@ test "it inserts an incoming create activity into the database", %{conn: conn} d
|> post("/users/#{user.nickname}/outbox", data) |> post("/users/#{user.nickname}/outbox", data)
result = json_response(conn, 201) result = json_response(conn, 201)
assert Activity.get_by_ap_id(result["id"]) assert Activity.get_by_ap_id(result["id"])
end end

View file

@ -686,7 +686,7 @@ test "returns reblogs for users for whom reblogs have not been muted" do
user = insert(:user) user = insert(:user)
{:ok, like_activity, _object} = ActivityPub.like(user, object_activity) {:ok, like_activity, _object} = ActivityPub.like(user, object_activity)
assert called(Pleroma.Web.Federator.publish(like_activity, 5)) assert called(Pleroma.Web.Federator.publish(like_activity))
end end
test "returns exist activity if object already liked" do test "returns exist activity if object already liked" do
@ -747,7 +747,7 @@ test "adds a like activity to the db" do
{:ok, unlike_activity, _, object} = ActivityPub.unlike(user, object) {:ok, unlike_activity, _, object} = ActivityPub.unlike(user, object)
assert object.data["like_count"] == 0 assert object.data["like_count"] == 0
assert called(Pleroma.Web.Federator.publish(unlike_activity, 5)) assert called(Pleroma.Web.Federator.publish(unlike_activity))
end end
test "unliking a previously liked object" do test "unliking a previously liked object" do

View file

@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
use Pleroma.DataCase use Pleroma.DataCase
alias Pleroma.HTTP alias Pleroma.HTTP
alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
import Mock import Mock
@ -24,6 +25,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
test "it prefetches media proxy URIs" do test "it prefetches media proxy URIs" do
with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do
MediaProxyWarmingPolicy.filter(@message) MediaProxyWarmingPolicy.filter(@message)
ObanHelpers.perform_all()
# Performing jobs which has been just enqueued
ObanHelpers.perform_all()
assert called(HTTP.get(:_, :_, :_)) assert called(HTTP.get(:_, :_, :_))
end end
end end

View file

@ -263,7 +263,7 @@ test "it returns inbox for messages involving single recipients in total" do
assert called( assert called(
Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{ Pleroma.Web.Federator.Publisher.enqueue_one(Publisher, %{
inbox: "https://domain.com/users/nick1/inbox", inbox: "https://domain.com/users/nick1/inbox",
actor: actor, actor_id: actor.id,
id: note_activity.data["id"] id: note_activity.data["id"]
}) })
) )

View file

@ -99,7 +99,7 @@ test "returns error when object is unknown" do
assert activity.data["type"] == "Announce" assert activity.data["type"] == "Announce"
assert activity.data["actor"] == service_actor.ap_id assert activity.data["actor"] == service_actor.ap_id
assert activity.data["object"] == obj.data["id"] assert activity.data["object"] == obj.data["id"]
assert called(Pleroma.Web.Federator.publish(activity, 5)) assert called(Pleroma.Web.Federator.publish(activity))
end end
test_with_mock "returns announce activity and not publish to federate", test_with_mock "returns announce activity and not publish to federate",
@ -113,7 +113,7 @@ test "returns error when object is unknown" do
assert activity.data["type"] == "Announce" assert activity.data["type"] == "Announce"
assert activity.data["actor"] == service_actor.ap_id assert activity.data["actor"] == service_actor.ap_id
assert activity.data["object"] == obj.data["id"] assert activity.data["object"] == obj.data["id"]
refute called(Pleroma.Web.Federator.publish(activity, 5)) refute called(Pleroma.Web.Federator.publish(activity))
end end
end end
end end

View file

@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Object.Fetcher alias Pleroma.Object.Fetcher
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
@ -648,6 +649,7 @@ test "it works for incoming user deletes" do
|> Poison.decode!() |> Poison.decode!()
{:ok, _} = Transmogrifier.handle_incoming(data) {:ok, _} = Transmogrifier.handle_incoming(data)
ObanHelpers.perform_all()
refute User.get_cached_by_ap_id(ap_id) refute User.get_cached_by_ap_id(ap_id)
end end
@ -1210,6 +1212,8 @@ test "it upgrades a user to activitypub" do
assert user.info.note_count == 1 assert user.info.note_count == 1
{:ok, user} = Transmogrifier.upgrade_user_from_ap_id("https://niu.moe/users/rye") {:ok, user} = Transmogrifier.upgrade_user_from_ap_id("https://niu.moe/users/rye")
ObanHelpers.perform_all()
assert user.info.ap_enabled assert user.info.ap_enabled
assert user.info.note_count == 1 assert user.info.note_count == 1
assert user.follower_address == "https://niu.moe/users/rye/followers" assert user.follower_address == "https://niu.moe/users/rye/followers"

View file

@ -2096,7 +2096,7 @@ test "queues key as atom", %{conn: conn} do
post(conn, "/api/pleroma/admin/config", %{ post(conn, "/api/pleroma/admin/config", %{
configs: [ configs: [
%{ %{
"group" => "pleroma_job_queue", "group" => "oban",
"key" => ":queues", "key" => ":queues",
"value" => [ "value" => [
%{"tuple" => [":federator_incoming", 50]}, %{"tuple" => [":federator_incoming", 50]},
@ -2114,7 +2114,7 @@ test "queues key as atom", %{conn: conn} do
assert json_response(conn, 200) == %{ assert json_response(conn, 200) == %{
"configs" => [ "configs" => [
%{ %{
"group" => "pleroma_job_queue", "group" => "oban",
"key" => ":queues", "key" => ":queues",
"value" => [ "value" => [
%{"tuple" => [":federator_incoming", 50]}, %{"tuple" => [":federator_incoming", 50]},

View file

@ -4,9 +4,14 @@
defmodule Pleroma.Web.FederatorTest do defmodule Pleroma.Web.FederatorTest do
alias Pleroma.Instances alias Pleroma.Instances
alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
alias Pleroma.Workers.PublisherWorker
use Pleroma.DataCase use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo
import Pleroma.Factory import Pleroma.Factory
import Mock import Mock
@ -24,15 +29,6 @@ defmodule Pleroma.Web.FederatorTest do
clear_config([:instance, :rewrite_policy]) clear_config([:instance, :rewrite_policy])
clear_config([:mrf_keyword]) clear_config([:mrf_keyword])
describe "Publisher.perform" do
test "call `perform` with unknown task" do
assert {
:error,
"Don't know what to do with this"
} = Pleroma.Web.Federator.Publisher.perform("test", :ok, :ok)
end
end
describe "Publish an activity" do describe "Publish an activity" do
setup do setup do
user = insert(:user) user = insert(:user)
@ -53,6 +49,7 @@ test "with relays active, it publishes to the relay", %{
} do } do
with_mocks([relay_mock]) do with_mocks([relay_mock]) do
Federator.publish(activity) Federator.publish(activity)
ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
end end
assert_received :relay_publish assert_received :relay_publish
@ -66,6 +63,7 @@ test "with relays deactivated, it does not publish to the relay", %{
with_mocks([relay_mock]) do with_mocks([relay_mock]) do
Federator.publish(activity) Federator.publish(activity)
ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
end end
refute_received :relay_publish refute_received :relay_publish
@ -73,10 +71,7 @@ test "with relays deactivated, it does not publish to the relay", %{
end end
describe "Targets reachability filtering in `publish`" do describe "Targets reachability filtering in `publish`" do
test_with_mock "it federates only to reachable instances via AP", test "it federates only to reachable instances via AP" do
Pleroma.Web.ActivityPub.Publisher,
[:passthrough],
[] do
user = insert(:user) user = insert(:user)
{inbox1, inbox2} = {inbox1, inbox2} =
@ -104,20 +99,20 @@ test "with relays deactivated, it does not publish to the relay", %{
{:ok, _activity} = {:ok, _activity} =
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
assert called( expected_dt = NaiveDateTime.to_iso8601(dt)
Pleroma.Web.ActivityPub.Publisher.publish_one(%{
inbox: inbox1,
unreachable_since: dt
})
)
refute called(Pleroma.Web.ActivityPub.Publisher.publish_one(%{inbox: inbox2})) ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
assert ObanHelpers.member?(
%{
"op" => "publish_one",
"params" => %{"inbox" => inbox1, "unreachable_since" => expected_dt}
},
all_enqueued(worker: PublisherWorker)
)
end end
test_with_mock "it federates only to reachable instances via Websub", test "it federates only to reachable instances via Websub" do
Pleroma.Web.Websub,
[:passthrough],
[] do
user = insert(:user) user = insert(:user)
websub_topic = Pleroma.Web.OStatus.feed_path(user) websub_topic = Pleroma.Web.OStatus.feed_path(user)
@ -142,23 +137,27 @@ test "with relays deactivated, it does not publish to the relay", %{
{:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"}) {:ok, _activity} = CommonAPI.post(user, %{"status" => "HI"})
assert called( expected_callback = sub2.callback
Pleroma.Web.Websub.publish_one(%{ expected_dt = NaiveDateTime.to_iso8601(dt)
callback: sub2.callback,
unreachable_since: dt
})
)
refute called(Pleroma.Web.Websub.publish_one(%{callback: sub1.callback})) ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
assert ObanHelpers.member?(
%{
"op" => "publish_one",
"params" => %{
"callback" => expected_callback,
"unreachable_since" => expected_dt
}
},
all_enqueued(worker: PublisherWorker)
)
end end
test_with_mock "it federates only to reachable instances via Salmon", test "it federates only to reachable instances via Salmon" do
Pleroma.Web.Salmon,
[:passthrough],
[] do
user = insert(:user) user = insert(:user)
remote_user1 = _remote_user1 =
insert(:user, %{ insert(:user, %{
local: false, local: false,
nickname: "nick1@domain.com", nickname: "nick1@domain.com",
@ -174,6 +173,8 @@ test "with relays deactivated, it does not publish to the relay", %{
info: %{salmon: "https://domain2.com/salmon"} info: %{salmon: "https://domain2.com/salmon"}
}) })
remote_user2_id = remote_user2.id
dt = NaiveDateTime.utc_now() dt = NaiveDateTime.utc_now()
Instances.set_unreachable(remote_user2.ap_id, dt) Instances.set_unreachable(remote_user2.ap_id, dt)
@ -182,14 +183,20 @@ test "with relays deactivated, it does not publish to the relay", %{
{:ok, _activity} = {:ok, _activity} =
CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"}) CommonAPI.post(user, %{"status" => "HI @nick1@domain.com, @nick2@domain2.com!"})
assert called( expected_dt = NaiveDateTime.to_iso8601(dt)
Pleroma.Web.Salmon.publish_one(%{
recipient: remote_user2,
unreachable_since: dt
})
)
refute called(Pleroma.Web.Salmon.publish_one(%{recipient: remote_user1})) ObanHelpers.perform(all_enqueued(worker: PublisherWorker))
assert ObanHelpers.member?(
%{
"op" => "publish_one",
"params" => %{
"recipient_id" => remote_user2_id,
"unreachable_since" => expected_dt
}
},
all_enqueued(worker: PublisherWorker)
)
end end
end end
@ -209,7 +216,8 @@ test "successfully processes incoming AP docs with correct origin" do
"to" => ["https://www.w3.org/ns/activitystreams#Public"] "to" => ["https://www.w3.org/ns/activitystreams#Public"]
} }
{:ok, _activity} = Federator.incoming_ap_doc(params) assert {:ok, job} = Federator.incoming_ap_doc(params)
assert {:ok, _activity} = ObanHelpers.perform(job)
end end
test "rejects incoming AP docs with incorrect origin" do test "rejects incoming AP docs with incorrect origin" do
@ -227,7 +235,8 @@ test "rejects incoming AP docs with incorrect origin" do
"to" => ["https://www.w3.org/ns/activitystreams#Public"] "to" => ["https://www.w3.org/ns/activitystreams#Public"]
} }
:error = Federator.incoming_ap_doc(params) assert {:ok, job} = Federator.incoming_ap_doc(params)
assert :error = ObanHelpers.perform(job)
end end
test "it does not crash if MRF rejects the post" do test "it does not crash if MRF rejects the post" do
@ -242,7 +251,8 @@ test "it does not crash if MRF rejects the post" do
File.read!("test/fixtures/mastodon-post-activity.json") File.read!("test/fixtures/mastodon-post-activity.json")
|> Poison.decode!() |> Poison.decode!()
assert Federator.incoming_ap_doc(params) == :error assert {:ok, job} = Federator.incoming_ap_doc(params)
assert :error = ObanHelpers.perform(job)
end end
end end
end end

View file

@ -16,7 +16,8 @@ defmodule Pleroma.Instances.InstanceTest do
describe "set_reachable/1" do describe "set_reachable/1" do
test "clears `unreachable_since` of existing matching Instance record having non-nil `unreachable_since`" do test "clears `unreachable_since` of existing matching Instance record having non-nil `unreachable_since`" do
instance = insert(:instance, unreachable_since: NaiveDateTime.utc_now()) unreachable_since = NaiveDateTime.to_iso8601(NaiveDateTime.utc_now())
instance = insert(:instance, unreachable_since: unreachable_since)
assert {:ok, instance} = Instance.set_reachable(instance.host) assert {:ok, instance} = Instance.set_reachable(instance.host)
refute instance.unreachable_since refute instance.unreachable_since

View file

@ -13,6 +13,7 @@ defmodule Pleroma.Web.MastodonAPI.MastodonAPIControllerTest do
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
@ -3897,6 +3898,7 @@ test "it creates a PasswordResetToken record for user", %{user: user} do
end end
test "it sends an email to user", %{user: user} do test "it sends an email to user", %{user: user} do
ObanHelpers.perform_all()
token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id) token_record = Repo.get_by(Pleroma.PasswordResetToken, user_id: user.id)
email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token) email = Pleroma.Emails.UserEmail.password_reset_email(user, token_record.token)
@ -3957,6 +3959,8 @@ test "resend account confirmation email", %{conn: conn, user: user} do
|> post("/api/v1/pleroma/accounts/confirmation_resend?email=#{user.email}") |> post("/api/v1/pleroma/accounts/confirmation_resend?email=#{user.email}")
|> json_response(:no_content) |> json_response(:no_content)
ObanHelpers.perform_all()
email = Pleroma.Emails.UserEmail.account_confirmation_email(user) email = Pleroma.Emails.UserEmail.account_confirmation_email(user)
notify_email = Config.get([:instance, :notify_email]) notify_email = Config.get([:instance, :notify_email])
instance_name = Config.get([:instance, :name]) instance_name = Config.get([:instance, :name])

View file

@ -1,48 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule MockActivityPub do
def publish_one({ret, waiter}) do
send(waiter, :complete)
{ret, "success"}
end
end
defmodule Pleroma.Web.Federator.RetryQueueTest do
use Pleroma.DataCase
alias Pleroma.Web.Federator.RetryQueue
@small_retry_count 0
@hopeless_retry_count 10
setup do
RetryQueue.reset_stats()
end
test "RetryQueue responds to stats request" do
assert %{delivered: 0, dropped: 0} == RetryQueue.get_stats()
end
test "failed posts are retried" do
{:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count)
wait_task =
Task.async(fn ->
receive do
:complete -> :ok
end
end)
RetryQueue.enqueue({:ok, wait_task.pid}, MockActivityPub, @small_retry_count)
Task.await(wait_task)
assert %{delivered: 1, dropped: 0} == RetryQueue.get_stats()
end
test "posts that have been tried too many times are dropped" do
{:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count)
RetryQueue.enqueue({:ok, nil}, MockActivityPub, @hopeless_retry_count)
assert %{delivered: 0, dropped: 1} == RetryQueue.get_stats()
end
end

View file

@ -96,6 +96,6 @@ test "it gets a magic key" do
Salmon.publish(user, activity) Salmon.publish(user, activity)
assert called(Publisher.enqueue_one(Salmon, %{recipient: mentioned_user})) assert called(Publisher.enqueue_one(Salmon, %{recipient_id: mentioned_user.id}))
end end
end end

View file

@ -5,6 +5,7 @@
defmodule Pleroma.Web.TwitterAPI.TwitterAPITest do defmodule Pleroma.Web.TwitterAPI.TwitterAPITest do
use Pleroma.DataCase use Pleroma.DataCase
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.UserInviteToken alias Pleroma.UserInviteToken
alias Pleroma.Web.MastodonAPI.AccountView alias Pleroma.Web.MastodonAPI.AccountView
@ -68,6 +69,7 @@ test "it sends confirmation email if :account_activation_required is specified i
} }
{:ok, user} = TwitterAPI.register_user(data) {:ok, user} = TwitterAPI.register_user(data)
ObanHelpers.perform_all()
assert user.info.confirmation_pending assert user.info.confirmation_pending

View file

@ -4,8 +4,10 @@
defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
use Pleroma.Web.ConnCase use Pleroma.Web.ConnCase
use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
import ExUnit.CaptureLog import ExUnit.CaptureLog
@ -43,8 +45,7 @@ test "it imports follow lists from file", %{conn: conn} do
{File, [], {File, [],
read!: fn "follow_list.txt" -> read!: fn "follow_list.txt" ->
"Account address,Show boosts\n#{user2.ap_id},true" "Account address,Show boosts\n#{user2.ap_id},true"
end}, end}
{PleromaJobQueue, [:passthrough], []}
]) do ]) do
response = response =
conn conn
@ -52,15 +53,16 @@ test "it imports follow lists from file", %{conn: conn} do
|> post("/api/pleroma/follow_import", %{"list" => %Plug.Upload{path: "follow_list.txt"}}) |> post("/api/pleroma/follow_import", %{"list" => %Plug.Upload{path: "follow_list.txt"}})
|> json_response(:ok) |> json_response(:ok)
assert called(
PleromaJobQueue.enqueue(
:background,
User,
[:follow_import, user1, [user2.ap_id]]
)
)
assert response == "job started" assert response == "job started"
assert ObanHelpers.member?(
%{
"op" => "follow_import",
"follower_id" => user1.id,
"followed_identifiers" => [user2.ap_id]
},
all_enqueued(worker: Pleroma.Workers.BackgroundWorker)
)
end end
end end
@ -119,8 +121,7 @@ test "it imports blocks users from file", %{conn: conn} do
user3 = insert(:user) user3 = insert(:user)
with_mocks([ with_mocks([
{File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}, {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}
{PleromaJobQueue, [:passthrough], []}
]) do ]) do
response = response =
conn conn
@ -128,15 +129,16 @@ test "it imports blocks users from file", %{conn: conn} do
|> post("/api/pleroma/blocks_import", %{"list" => %Plug.Upload{path: "blocks_list.txt"}}) |> post("/api/pleroma/blocks_import", %{"list" => %Plug.Upload{path: "blocks_list.txt"}})
|> json_response(:ok) |> json_response(:ok)
assert called(
PleromaJobQueue.enqueue(
:background,
User,
[:blocks_import, user1, [user2.ap_id, user3.ap_id]]
)
)
assert response == "job started" assert response == "job started"
assert ObanHelpers.member?(
%{
"op" => "blocks_import",
"blocker_id" => user1.id,
"blocked_identifiers" => [user2.ap_id, user3.ap_id]
},
all_enqueued(worker: Pleroma.Workers.BackgroundWorker)
)
end end
end end
end end
@ -560,6 +562,7 @@ test "it returns HTTP 200", %{conn: conn} do
|> json_response(:ok) |> json_response(:ok)
assert response == %{"status" => "success"} assert response == %{"status" => "success"}
ObanHelpers.perform_all()
user = User.get_cached_by_id(user.id) user = User.get_cached_by_id(user.id)

View file

@ -4,11 +4,14 @@
defmodule Pleroma.Web.WebsubTest do defmodule Pleroma.Web.WebsubTest do
use Pleroma.DataCase use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.Router.Helpers alias Pleroma.Web.Router.Helpers
alias Pleroma.Web.Websub alias Pleroma.Web.Websub
alias Pleroma.Web.Websub.WebsubClientSubscription alias Pleroma.Web.Websub.WebsubClientSubscription
alias Pleroma.Web.Websub.WebsubServerSubscription alias Pleroma.Web.Websub.WebsubServerSubscription
alias Pleroma.Workers.SubscriberWorker
import Pleroma.Factory import Pleroma.Factory
import Tesla.Mock import Tesla.Mock
@ -224,6 +227,7 @@ test "it renews subscriptions that have less than a day of time left" do
}) })
_refresh = Websub.refresh_subscriptions() _refresh = Websub.refresh_subscriptions()
ObanHelpers.perform(all_enqueued(worker: SubscriberWorker))
assert still_good == Repo.get(WebsubClientSubscription, still_good.id) assert still_good == Repo.get(WebsubClientSubscription, still_good.id)
refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id) refute needs_refresh == Repo.get(WebsubClientSubscription, needs_refresh.id)