[#1149] Added more oban workers. Refactoring.

This commit is contained in:
Ivan Tashkinov 2019-08-13 20:20:26 +03:00
parent 33a5fc4a70
commit 0e1c481a94
30 changed files with 402 additions and 159 deletions

View file

@ -1,6 +1,11 @@
defmodule Pleroma.DigestEmailWorker do defmodule Pleroma.DigestEmailWorker do
alias Pleroma.Repo
alias Pleroma.Workers.Mailer, as: MailerWorker
import Ecto.Query import Ecto.Query
defdelegate worker_args(queue), to: Pleroma.Workers.Helper
def perform do def perform do
config = Pleroma.Config.get([:email_notifications, :digest]) config = Pleroma.Config.get([:email_notifications, :digest])
negative_interval = -Map.fetch!(config, :interval) negative_interval = -Map.fetch!(config, :interval)
@ -15,7 +20,11 @@ defmodule Pleroma.DigestEmailWorker do
select: u select: u
) )
|> Pleroma.Repo.all() |> Pleroma.Repo.all()
|> Enum.each(&PleromaJobQueue.enqueue(:digest_emails, __MODULE__, [&1])) |> Enum.each(fn user ->
%{"op" => "digest_email", "user_id" => user.id}
|> MailerWorker.new([queue: "digest_emails"] ++ worker_args(:digest_emails))
|> Repo.insert()
end)
end end
@doc """ @doc """

View file

@ -8,14 +8,18 @@ defmodule Pleroma.ScheduledActivityWorker do
""" """
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Repo
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
use GenServer use GenServer
require Logger require Logger
@schedule_interval :timer.minutes(1) @schedule_interval :timer.minutes(1)
defdelegate worker_args(queue), to: Pleroma.Workers.Helper
def start_link do def start_link do
GenServer.start_link(__MODULE__, nil) GenServer.start_link(__MODULE__, nil)
end end
@ -45,7 +49,9 @@ defmodule Pleroma.ScheduledActivityWorker do
def handle_info(:perform, state) do def handle_info(:perform, state) do
ScheduledActivity.due_activities(@schedule_interval) ScheduledActivity.due_activities(@schedule_interval)
|> Enum.each(fn scheduled_activity -> |> Enum.each(fn scheduled_activity ->
PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id]) %{"op" => "execute", "activity_id" => scheduled_activity.id}
|> Pleroma.Workers.ScheduledActivityWorker.new(worker_args(:scheduled_activities))
|> Repo.insert()
end) end)
schedule_next() schedule_next()

View file

@ -26,6 +26,7 @@ defmodule Pleroma.User do
alias Pleroma.Web.OStatus alias Pleroma.Web.OStatus
alias Pleroma.Web.RelMe alias Pleroma.Web.RelMe
alias Pleroma.Web.Websub alias Pleroma.Web.Websub
alias Pleroma.Workers.BackgroundWorker
require Logger require Logger
@ -39,6 +40,8 @@ defmodule Pleroma.User do
@strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/ @strict_local_nickname_regex ~r/^[a-zA-Z\d]+$/
@extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/ @extended_local_nickname_regex ~r/^[a-zA-Z\d_-]+$/
defdelegate worker_args(queue), to: Pleroma.Workers.Helper
schema "users" do schema "users" do
field(:bio, :string) field(:bio, :string)
field(:email, :string) field(:email, :string)
@ -579,8 +582,11 @@ defmodule Pleroma.User do
end end
@doc "Fetch some posts when the user has just been federated with" @doc "Fetch some posts when the user has just been federated with"
def fetch_initial_posts(user), def fetch_initial_posts(user) do
do: PleromaJobQueue.enqueue(:background, __MODULE__, [:fetch_initial_posts, user]) %{"op" => "fetch_initial_posts", "user_id" => user.id}
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
end
@spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t() @spec get_followers_query(User.t(), pos_integer() | nil) :: Ecto.Query.t()
def get_followers_query(%User{} = user, nil) do def get_followers_query(%User{} = user, nil) do
@ -1001,7 +1007,9 @@ defmodule Pleroma.User do
end end
def deactivate_async(user, status \\ true) do def deactivate_async(user, status \\ true) do
PleromaJobQueue.enqueue(:background, __MODULE__, [:deactivate_async, user, status]) %{"op" => "deactivate_user", "user_id" => user.id, "status" => status}
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
end end
def deactivate(%User{} = user, status \\ true) do def deactivate(%User{} = user, status \\ true) do
@ -1029,9 +1037,11 @@ defmodule Pleroma.User do
|> update_and_set_cache() |> update_and_set_cache()
end end
@spec delete(User.t()) :: :ok def delete(%User{} = user) do
def delete(%User{} = user), %{"op" => "delete_user", "user_id" => user.id}
do: PleromaJobQueue.enqueue(:background, __MODULE__, [:delete, user]) |> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
end
@spec perform(atom(), User.t()) :: {:ok, User.t()} @spec perform(atom(), User.t()) :: {:ok, User.t()}
def perform(:delete, %User{} = user) do def perform(:delete, %User{} = user) do
@ -1138,21 +1148,26 @@ defmodule Pleroma.User do
Repo.all(query) Repo.all(query)
end end
def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers), def blocks_import(%User{} = blocker, blocked_identifiers) when is_list(blocked_identifiers) do
do: %{
PleromaJobQueue.enqueue(:background, __MODULE__, [ "op" => "blocks_import",
:blocks_import, "blocker_id" => blocker.id,
blocker, "blocked_identifiers" => blocked_identifiers
blocked_identifiers }
]) |> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
end
def follow_import(%User{} = follower, followed_identifiers) when is_list(followed_identifiers), def follow_import(%User{} = follower, followed_identifiers)
do: when is_list(followed_identifiers) do
PleromaJobQueue.enqueue(:background, __MODULE__, [ %{
:follow_import, "op" => "follow_import",
follower, "follower_id" => follower.id,
followed_identifiers "followed_identifiers" => followed_identifiers
]) }
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
end
def delete_user_activities(%User{ap_id: ap_id} = user) do def delete_user_activities(%User{ap_id: ap_id} = user) do
ap_id ap_id

View file

@ -17,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.WebFinger alias Pleroma.Web.WebFinger
alias Pleroma.Workers.BackgroundWorker
import Ecto.Query import Ecto.Query
import Pleroma.Web.ActivityPub.Utils import Pleroma.Web.ActivityPub.Utils
@ -25,6 +26,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
require Logger require Logger
require Pleroma.Constants require Pleroma.Constants
defdelegate worker_args(queue), to: Pleroma.Workers.Helper
# For Announce activities, we filter the recipients based on following status for any actors # For Announce activities, we filter the recipients based on following status for any actors
# that match actual users. See issue #164 for more information about why this is necessary. # that match actual users. See issue #164 for more information about why this is necessary.
defp get_recipients(%{"type" => "Announce"} = data) do defp get_recipients(%{"type" => "Announce"} = data) do
@ -145,7 +148,9 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
activity activity
end end
PleromaJobQueue.enqueue(:background, Pleroma.Web.RichMedia.Helpers, [:fetch, activity]) %{"op" => "fetch_data_for_activity", "activity_id" => activity.id}
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
Notification.create_notifications(activity) Notification.create_notifications(activity)

View file

@ -7,7 +7,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
@behaviour Pleroma.Web.ActivityPub.MRF @behaviour Pleroma.Web.ActivityPub.MRF
alias Pleroma.HTTP alias Pleroma.HTTP
alias Pleroma.Repo
alias Pleroma.Web.MediaProxy alias Pleroma.Web.MediaProxy
alias Pleroma.Workers.BackgroundWorker
require Logger require Logger
@ -16,6 +18,8 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
recv_timeout: 10_000 recv_timeout: 10_000
] ]
defdelegate worker_args(queue), to: Pleroma.Workers.Helper
def perform(:prefetch, url) do def perform(:prefetch, url) do
Logger.info("Prefetching #{inspect(url)}") Logger.info("Prefetching #{inspect(url)}")
@ -30,7 +34,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
url url
|> Enum.each(fn |> Enum.each(fn
%{"href" => href} -> %{"href" => href} ->
PleromaJobQueue.enqueue(:background, __MODULE__, [:prefetch, href]) %{"op" => "media_proxy_prefetch", "url" => href}
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
x -> x ->
Logger.debug("Unhandled attachment URL object #{inspect(x)}") Logger.debug("Unhandled attachment URL object #{inspect(x)}")
@ -46,7 +52,9 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy do
%{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message %{"type" => "Create", "object" => %{"attachment" => attachments} = _object} = message
) )
when is_list(attachments) and length(attachments) > 0 do when is_list(attachments) and length(attachments) > 0 do
PleromaJobQueue.enqueue(:background, __MODULE__, [:preload, message]) %{"op" => "media_proxy_preload", "message" => message}
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
{:ok, message} {:ok, message}
end end

View file

@ -15,12 +15,15 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
alias Pleroma.Workers.Transmogrifier, as: TransmogrifierWorker
import Ecto.Query import Ecto.Query
require Logger require Logger
require Pleroma.Constants require Pleroma.Constants
defdelegate worker_args(queue), to: Pleroma.Workers.Helper
@doc """ @doc """
Modifies an incoming AP object (mastodon format) to our internal format. Modifies an incoming AP object (mastodon format) to our internal format.
""" """
@ -1073,7 +1076,9 @@ defmodule Pleroma.Web.ActivityPub.Transmogrifier do
already_ap <- User.ap_enabled?(user), already_ap <- User.ap_enabled?(user),
{:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do {:ok, user} <- user |> User.upgrade_changeset(data) |> User.update_and_set_cache() do
unless already_ap do unless already_ap do
PleromaJobQueue.enqueue(:transmogrifier, __MODULE__, [:user_upgrade, user]) %{"op" => "user_upgrade", "user_id" => user.id}
|> TransmogrifierWorker.new(worker_args(:transmogrifier))
|> Repo.insert()
end end
{:ok, user} {:ok, user}

View file

@ -3,12 +3,23 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Federator do defmodule Pleroma.Web.Federator do
alias Pleroma.Activity
alias Pleroma.Object.Containment
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.Federator.Publisher
alias Pleroma.Web.OStatus
alias Pleroma.Web.Websub
alias Pleroma.Workers.Publisher, as: PublisherWorker alias Pleroma.Workers.Publisher, as: PublisherWorker
alias Pleroma.Workers.Receiver, as: ReceiverWorker alias Pleroma.Workers.Receiver, as: ReceiverWorker
alias Pleroma.Workers.Subscriber, as: SubscriberWorker alias Pleroma.Workers.Subscriber, as: SubscriberWorker
require Logger require Logger
defdelegate worker_args(queue), to: Pleroma.Workers.Helper
def init do def init do
# 1 minute # 1 minute
refresh_subscriptions(schedule_in: 60) refresh_subscriptions(schedule_in: 60)
@ -41,7 +52,7 @@ defmodule Pleroma.Web.Federator do
end end
def publish(%{id: "pleroma:fakeid"} = activity) do def publish(%{id: "pleroma:fakeid"} = activity) do
PublisherWorker.perform_publish(activity) perform(:publish, activity)
end end
def publish(activity) do def publish(activity) do
@ -68,11 +79,88 @@ defmodule Pleroma.Web.Federator do
|> Pleroma.Repo.insert() |> Pleroma.Repo.insert()
end end
defp worker_args(queue) do # Job Worker Callbacks
if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
[max_attempts: max_attempts] @spec perform(atom(), module(), any()) :: {:ok, any()} | {:error, any()}
def perform(:publish_one, module, params) do
apply(module, :publish_one, [params])
end
def perform(:publish, activity) do
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
{:ok, actor} <- User.ensure_keys_present(actor) do
Publisher.publish(actor, activity)
end
end
def perform(:incoming_doc, doc) do
Logger.info("Got document, trying to parse")
OStatus.handle_incoming(doc)
end
def perform(:incoming_ap_doc, params) do
Logger.info("Handling incoming AP activity")
params = Utils.normalize_params(params)
# NOTE: we use the actor ID to do the containment, this is fine because an
# actor shouldn't be acting on objects outside their own AP server.
with {:ok, _user} <- ap_enabled_actor(params["actor"]),
nil <- Activity.normalize(params["id"]),
:ok <- Containment.contain_origin_from_id(params["actor"], params),
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
{:ok, activity}
else else
[] %Activity{} ->
Logger.info("Already had #{params["id"]}")
:error
_e ->
# Just drop those for now
Logger.info("Unhandled activity")
Logger.info(Jason.encode!(params, pretty: true))
:error
end
end
def perform(:request_subscription, websub) do
Logger.debug("Refreshing #{websub.topic}")
with {:ok, websub} <- Websub.request_subscription(websub) do
Logger.debug("Successfully refreshed #{websub.topic}")
else
_e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end
def perform(:verify_websub, websub) do
Logger.debug(fn ->
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
end)
Websub.verify(websub)
end
def perform(:refresh_subscriptions) do
Logger.debug("Federator running refresh subscriptions")
Websub.refresh_subscriptions()
spawn(fn ->
# 6 hours
Process.sleep(1000 * 60 * 60 * 6)
refresh_subscriptions()
end)
end
def ap_enabled_actor(id) do
user = User.get_cached_by_ap_id(id)
if User.ap_enabled?(user) do
{:ok, user}
else
ActivityPub.make_user_from_ap_id(id)
end end
end end
end end

View file

@ -14,9 +14,12 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
[:oauth2, :clean_expired_tokens_interval], [:oauth2, :clean_expired_tokens_interval],
86_400_000 86_400_000
) )
@queue :background
alias Pleroma.Repo
alias Pleroma.Web.OAuth.Token alias Pleroma.Web.OAuth.Token
alias Pleroma.Workers.BackgroundWorker
defdelegate worker_args(queue), to: Pleroma.Workers.Helper
def start_link, do: GenServer.start_link(__MODULE__, nil) def start_link, do: GenServer.start_link(__MODULE__, nil)
@ -31,8 +34,11 @@ defmodule Pleroma.Web.OAuth.Token.CleanWorker do
@doc false @doc false
def handle_info(:perform, state) do def handle_info(:perform, state) do
%{"op" => "clean_expired_tokens"}
|> BackgroundWorker.new(worker_args(:background))
|> Repo.insert()
Process.send_after(self(), :perform, @interval) Process.send_after(self(), :perform, @interval)
PleromaJobQueue.enqueue(@queue, __MODULE__, [:clean])
{:noreply, state} {:noreply, state}
end end

View file

@ -3,10 +3,13 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Push do defmodule Pleroma.Web.Push do
alias Pleroma.Web.Push.Impl alias Pleroma.Repo
alias Pleroma.Workers.WebPusher
require Logger require Logger
defdelegate worker_args(queue), to: Pleroma.Workers.Helper
def init do def init do
unless enabled() do unless enabled() do
Logger.warn(""" Logger.warn("""
@ -31,6 +34,9 @@ defmodule Pleroma.Web.Push do
end end
end end
def send(notification), def send(notification) do
do: PleromaJobQueue.enqueue(:web_push, Impl, [notification]) %{"op" => "web_push", "notification_id" => notification.id}
|> WebPusher.new(worker_args(:web_push))
|> Repo.insert()
end
end end

View file

@ -265,12 +265,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
String.split(line, ",") |> List.first() String.split(line, ",") |> List.first()
end) end)
|> List.delete("Account address") do |> List.delete("Account address") do
PleromaJobQueue.enqueue(:background, User, [ User.follow_import(follower, followed_identifiers)
:follow_import,
follower,
followed_identifiers
])
json(conn, "job started") json(conn, "job started")
end end
end end
@ -281,12 +276,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilController do
def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do def blocks_import(%{assigns: %{user: blocker}} = conn, %{"list" => list}) do
with blocked_identifiers <- String.split(list) do with blocked_identifiers <- String.split(list) do
PleromaJobQueue.enqueue(:background, User, [ User.blocks_import(blocker, blocked_identifiers)
:blocks_import,
blocker,
blocked_identifiers
])
json(conn, "job started") json(conn, "job started")
end end
end end

View file

@ -0,0 +1,66 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.BackgroundWorker do
alias Pleroma.Activity
alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
alias Pleroma.Web.OAuth.Token.CleanWorker
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "background",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
@impl Oban.Worker
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}) do
user = User.get_by_id(user_id)
User.perform(:fetch_initial_posts, user)
end
def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}) do
user = User.get_by_id(user_id)
User.perform(:deactivate_async, user, status)
end
def perform(%{"op" => "delete_user", "user_id" => user_id}) do
user = User.get_by_id(user_id)
User.perform(:delete, user)
end
def perform(%{
"op" => "blocks_import",
"blocker_id" => blocker_id,
"blocked_identifiers" => blocked_identifiers
}) do
blocker = User.get_by_id(blocker_id)
User.perform(:blocks_import, blocker, blocked_identifiers)
end
def perform(%{
"op" => "follow_import",
"follower_id" => follower_id,
"followed_identifiers" => followed_identifiers
}) do
follower = User.get_by_id(follower_id)
User.perform(:follow_import, follower, followed_identifiers)
end
def perform(%{"op" => "clean_expired_tokens"}) do
CleanWorker.perform(:clean)
end
def perform(%{"op" => "media_proxy_preload", "message" => message}) do
MediaProxyWarmingPolicy.perform(:preload, message)
end
def perform(%{"op" => "media_proxy_prefetch", "url" => url}) do
MediaProxyWarmingPolicy.perform(:prefetch, url)
end
def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}) do
activity = Activity.get_by_id(activity_id)
Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
end
end

View file

@ -0,0 +1,13 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Helper do
def worker_args(queue) do
if max_attempts = Pleroma.Config.get([:workers, :retries, queue]) do
[max_attempts: max_attempts]
else
[]
end
end
end

View file

@ -0,0 +1,18 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Mailer do
alias Pleroma.User
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "mailer",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
@impl Oban.Worker
def perform(%{"op" => "digest_email", "user_id" => user_id}) do
user = User.get_by_id(user_id)
Pleroma.DigestEmailWorker.perform(user)
end
end

View file

@ -4,7 +4,7 @@
defmodule Pleroma.Workers.Publisher do defmodule Pleroma.Workers.Publisher do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.User alias Pleroma.Web.Federator
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker, use Oban.Worker,
@ -13,23 +13,11 @@ defmodule Pleroma.Workers.Publisher do
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "publish", "activity_id" => activity_id}) do def perform(%{"op" => "publish", "activity_id" => activity_id}) do
with %Activity{} = activity <- Activity.get_by_id(activity_id) do activity = Activity.get_by_id(activity_id)
perform_publish(activity) Federator.perform(:publish, activity)
else
_ -> raise "Non-existing activity: #{activity_id}"
end
end end
def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}) do
module_name Federator.perform(:publish_one, String.to_atom(module_name), params)
|> String.to_atom()
|> apply(:publish_one, [params])
end
def perform_publish(%Activity{} = activity) do
with %User{} = actor <- User.get_cached_by_ap_id(activity.data["actor"]),
{:ok, actor} <- User.ensure_keys_present(actor) do
Pleroma.Web.Federator.Publisher.publish(actor, activity)
end
end end
end end

View file

@ -3,15 +3,7 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Receiver do defmodule Pleroma.Workers.Receiver do
alias Pleroma.Activity alias Pleroma.Web.Federator
alias Pleroma.Object.Containment
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.ActivityPub.Utils
alias Pleroma.Web.OStatus
require Logger
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker, use Oban.Worker,
@ -20,42 +12,10 @@ defmodule Pleroma.Workers.Receiver do
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "incoming_doc", "body" => doc}) do def perform(%{"op" => "incoming_doc", "body" => doc}) do
Logger.info("Got incoming document, trying to parse") Federator.perform(:incoming_doc, doc)
OStatus.handle_incoming(doc)
end end
def perform(%{"op" => "incoming_ap_doc", "params" => params}) do def perform(%{"op" => "incoming_ap_doc", "params" => params}) do
Logger.info("Handling incoming AP activity") Federator.perform(:incoming_ap_doc, params)
params = Utils.normalize_params(params)
# NOTE: we use the actor ID to do the containment, this is fine because an
# actor shouldn't be acting on objects outside their own AP server.
with {:ok, _user} <- ap_enabled_actor(params["actor"]),
nil <- Activity.normalize(params["id"]),
:ok <- Containment.contain_origin_from_id(params["actor"], params),
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
{:ok, activity}
else
%Activity{} ->
Logger.info("Already had #{params["id"]}")
:error
_e ->
# Just drop those for now
Logger.info("Unhandled activity")
Logger.info(Jason.encode!(params, pretty: true))
:error
end
end
defp ap_enabled_actor(id) do
user = User.get_cached_by_ap_id(id)
if User.ap_enabled?(user) do
{:ok, user}
else
ActivityPub.make_user_from_ap_id(id)
end
end end
end end

View file

@ -0,0 +1,15 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.ScheduledActivityWorker do
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "scheduled_activities",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
@impl Oban.Worker
def perform(%{"op" => "execute", "activity_id" => activity_id}) do
Pleroma.ScheduledActivityWorker.perform(:execute, activity_id)
end
end

View file

@ -4,11 +4,9 @@
defmodule Pleroma.Workers.Subscriber do defmodule Pleroma.Workers.Subscriber do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Web.Websub alias Pleroma.Web.Federator
alias Pleroma.Web.Websub.WebsubClientSubscription alias Pleroma.Web.Websub.WebsubClientSubscription
require Logger
# Note: `max_attempts` is intended to be overridden in `new/1` call # Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker, use Oban.Worker,
queue: "federator_outgoing", queue: "federator_outgoing",
@ -16,29 +14,16 @@ defmodule Pleroma.Workers.Subscriber do
@impl Oban.Worker @impl Oban.Worker
def perform(%{"op" => "refresh_subscriptions"}) do def perform(%{"op" => "refresh_subscriptions"}) do
Websub.refresh_subscriptions() Federator.perform(:refresh_subscriptions)
# Schedule the next run in 6 hours
Pleroma.Web.Federator.refresh_subscriptions(schedule_in: 3600 * 6)
end end
def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do def perform(%{"op" => "request_subscription", "websub_id" => websub_id}) do
websub = Repo.get(WebsubClientSubscription, websub_id) websub = Repo.get(WebsubClientSubscription, websub_id)
Logger.debug("Refreshing #{websub.topic}") Federator.perform(:request_subscription, websub)
with {:ok, websub} <- Websub.request_subscription(websub) do
Logger.debug("Successfully refreshed #{websub.topic}")
else
_e -> Logger.debug("Couldn't refresh #{websub.topic}")
end
end end
def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do def perform(%{"op" => "verify_websub", "websub_id" => websub_id}) do
websub = Repo.get(WebsubClientSubscription, websub_id) websub = Repo.get(WebsubClientSubscription, websub_id)
Federator.perform(:verify_websub, websub)
Logger.debug(fn ->
"Running WebSub verification for #{websub.id} (#{websub.topic}, #{websub.callback})"
end)
Websub.verify(websub)
end end
end end

View file

@ -0,0 +1,18 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Transmogrifier do
alias Pleroma.User
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "transmogrifier",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
@impl Oban.Worker
def perform(%{"op" => "user_upgrade", "user_id" => user_id}) do
user = User.get_by_id(user_id)
Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
end
end

View file

@ -0,0 +1,19 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.WebPusher do
alias Pleroma.Notification
alias Pleroma.Repo
# Note: `max_attempts` is intended to be overridden in `new/1` call
use Oban.Worker,
queue: "web_push",
max_attempts: Pleroma.Config.get([:workers, :retries, :compile_time_default])
@impl Oban.Worker
def perform(%{"op" => "web_push", "notification_id" => notification_id}) do
notification = Repo.get(Notification, notification_id)
Pleroma.Web.Push.Impl.perform(notification)
end
end

View file

@ -6,8 +6,8 @@ defmodule Pleroma.ActivityTest do
use Pleroma.DataCase use Pleroma.DataCase
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Bookmark alias Pleroma.Bookmark
alias Pleroma.ObanHelpers
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Tests.ObanHelpers
alias Pleroma.ThreadMute alias Pleroma.ThreadMute
import Pleroma.Factory import Pleroma.Factory

View file

@ -28,7 +28,7 @@ defmodule Pleroma.ConversationTest do
{:ok, _activity} = {:ok, _activity} =
CommonAPI.post(user, %{"visibility" => "direct", "status" => "hey @#{other_user.nickname}"}) CommonAPI.post(user, %{"visibility" => "direct", "status" => "hey @#{other_user.nickname}"})
Pleroma.ObanHelpers.perform_all() Pleroma.Tests.ObanHelpers.perform_all()
Repo.delete_all(Conversation) Repo.delete_all(Conversation)
Repo.delete_all(Conversation.Participation) Repo.delete_all(Conversation.Participation)

View file

@ -8,6 +8,7 @@ defmodule Pleroma.NotificationTest do
import Pleroma.Factory import Pleroma.Factory
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
@ -621,7 +622,8 @@ defmodule Pleroma.NotificationTest do
refute Enum.empty?(Notification.for_user(other_user)) refute Enum.empty?(Notification.for_user(other_user))
User.delete(user) {:ok, job} = User.delete(user)
ObanHelpers.perform(job)
assert Enum.empty?(Notification.for_user(other_user)) assert Enum.empty?(Notification.for_user(other_user))
end end
@ -666,6 +668,7 @@ defmodule Pleroma.NotificationTest do
} }
{:ok, _delete_activity} = Transmogrifier.handle_incoming(delete_user_message) {:ok, _delete_activity} = Transmogrifier.handle_incoming(delete_user_message)
ObanHelpers.perform_all()
assert Enum.empty?(Notification.for_user(local_user)) assert Enum.empty?(Notification.for_user(local_user))
end end

View file

@ -2,7 +2,7 @@
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ObanHelpers do defmodule Pleroma.Tests.ObanHelpers do
@moduledoc """ @moduledoc """
Oban test helpers. Oban test helpers.
""" """

View file

@ -5,9 +5,9 @@
defmodule Pleroma.UserTest do defmodule Pleroma.UserTest do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Builders.UserBuilder alias Pleroma.Builders.UserBuilder
alias Pleroma.ObanHelpers
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
@ -676,7 +676,9 @@ defmodule Pleroma.UserTest do
user3.nickname user3.nickname
] ]
result = User.follow_import(user1, identifiers) {:ok, job} = User.follow_import(user1, identifiers)
result = ObanHelpers.perform(job)
assert is_list(result) assert is_list(result)
assert result == [user2, user3] assert result == [user2, user3]
end end
@ -887,7 +889,9 @@ defmodule Pleroma.UserTest do
user3.nickname user3.nickname
] ]
result = User.blocks_import(user1, identifiers) {:ok, job} = User.blocks_import(user1, identifiers)
result = ObanHelpers.perform(job)
assert is_list(result) assert is_list(result)
assert result == [user2, user3] assert result == [user2, user3]
end end
@ -1013,7 +1017,8 @@ defmodule Pleroma.UserTest do
{:ok, like_two, _} = CommonAPI.favorite(activity.id, follower) {:ok, like_two, _} = CommonAPI.favorite(activity.id, follower)
{:ok, repeat, _} = CommonAPI.repeat(activity_two.id, user) {:ok, repeat, _} = CommonAPI.repeat(activity_two.id, user)
{:ok, _} = User.delete(user) {:ok, job} = User.delete(user)
{:ok, _user} = ObanHelpers.perform(job)
follower = User.get_cached_by_id(follower.id) follower = User.get_cached_by_id(follower.id)
@ -1043,7 +1048,8 @@ defmodule Pleroma.UserTest do
{:ok, follower} = User.get_or_fetch_by_ap_id("http://mastodon.example.org/users/admin") {:ok, follower} = User.get_or_fetch_by_ap_id("http://mastodon.example.org/users/admin")
{:ok, _} = User.follow(follower, user) {:ok, _} = User.follow(follower, user)
{:ok, _user} = User.delete(user) {:ok, job} = User.delete(user)
{:ok, _user} = ObanHelpers.perform(job)
assert ObanHelpers.member?( assert ObanHelpers.member?(
%{ %{
@ -1100,7 +1106,8 @@ defmodule Pleroma.UserTest do
test "User.delete() plugs any possible zombie objects" do test "User.delete() plugs any possible zombie objects" do
user = insert(:user) user = insert(:user)
{:ok, _} = User.delete(user) {:ok, job} = User.delete(user)
{:ok, _} = ObanHelpers.perform(job)
{:ok, cached_user} = Cachex.get(:user_cache, "ap_id:#{user.ap_id}") {:ok, cached_user} = Cachex.get(:user_cache, "ap_id:#{user.ap_id}")

View file

@ -9,8 +9,8 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubControllerTest do
import Pleroma.Factory import Pleroma.Factory
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Instances alias Pleroma.Instances
alias Pleroma.ObanHelpers
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ObjectView alias Pleroma.Web.ActivityPub.ObjectView
alias Pleroma.Web.ActivityPub.UserView alias Pleroma.Web.ActivityPub.UserView

View file

@ -6,6 +6,7 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
use Pleroma.DataCase use Pleroma.DataCase
alias Pleroma.HTTP alias Pleroma.HTTP
alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
import Mock import Mock
@ -24,6 +25,11 @@ defmodule Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicyTest do
test "it prefetches media proxy URIs" do test "it prefetches media proxy URIs" do
with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do with_mock HTTP, get: fn _, _, _ -> {:ok, []} end do
MediaProxyWarmingPolicy.filter(@message) MediaProxyWarmingPolicy.filter(@message)
ObanHelpers.perform_all()
# Performing jobs which has been just enqueued
ObanHelpers.perform_all()
assert called(HTTP.get(:_, :_, :_)) assert called(HTTP.get(:_, :_, :_))
end end
end end

View file

@ -8,6 +8,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Object.Fetcher alias Pleroma.Object.Fetcher
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
@ -563,6 +564,7 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
|> Poison.decode!() |> Poison.decode!()
{:ok, _} = Transmogrifier.handle_incoming(data) {:ok, _} = Transmogrifier.handle_incoming(data)
ObanHelpers.perform_all()
refute User.get_cached_by_ap_id(ap_id) refute User.get_cached_by_ap_id(ap_id)
end end
@ -1132,6 +1134,8 @@ defmodule Pleroma.Web.ActivityPub.TransmogrifierTest do
assert user.info.note_count == 1 assert user.info.note_count == 1
{:ok, user} = Transmogrifier.upgrade_user_from_ap_id("https://niu.moe/users/rye") {:ok, user} = Transmogrifier.upgrade_user_from_ap_id("https://niu.moe/users/rye")
ObanHelpers.perform_all()
assert user.info.ap_enabled assert user.info.ap_enabled
assert user.info.note_count == 1 assert user.info.note_count == 1
assert user.follower_address == "https://niu.moe/users/rye/followers" assert user.follower_address == "https://niu.moe/users/rye/followers"

View file

@ -4,7 +4,7 @@
defmodule Pleroma.Web.FederatorTest do defmodule Pleroma.Web.FederatorTest do
alias Pleroma.Instances alias Pleroma.Instances
alias Pleroma.ObanHelpers alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Federator alias Pleroma.Web.Federator
alias Pleroma.Workers.Publisher, as: PublisherWorker alias Pleroma.Workers.Publisher, as: PublisherWorker

View file

@ -4,9 +4,11 @@
defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
use Pleroma.Web.ConnCase use Pleroma.Web.ConnCase
use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.Tests.ObanHelpers
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
import Pleroma.Factory import Pleroma.Factory
@ -50,8 +52,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
{File, [], {File, [],
read!: fn "follow_list.txt" -> read!: fn "follow_list.txt" ->
"Account address,Show boosts\n#{user2.ap_id},true" "Account address,Show boosts\n#{user2.ap_id},true"
end}, end}
{PleromaJobQueue, [:passthrough], []}
]) do ]) do
response = response =
conn conn
@ -59,15 +60,16 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
|> post("/api/pleroma/follow_import", %{"list" => %Plug.Upload{path: "follow_list.txt"}}) |> post("/api/pleroma/follow_import", %{"list" => %Plug.Upload{path: "follow_list.txt"}})
|> json_response(:ok) |> json_response(:ok)
assert called(
PleromaJobQueue.enqueue(
:background,
User,
[:follow_import, user1, [user2.ap_id]]
)
)
assert response == "job started" assert response == "job started"
assert ObanHelpers.member?(
%{
"op" => "follow_import",
"follower_id" => user1.id,
"followed_identifiers" => [user2.ap_id]
},
all_enqueued(worker: Pleroma.Workers.BackgroundWorker)
)
end end
end end
@ -126,8 +128,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
user3 = insert(:user) user3 = insert(:user)
with_mocks([ with_mocks([
{File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}, {File, [], read!: fn "blocks_list.txt" -> "#{user2.ap_id} #{user3.ap_id}" end}
{PleromaJobQueue, [:passthrough], []}
]) do ]) do
response = response =
conn conn
@ -135,15 +136,16 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
|> post("/api/pleroma/blocks_import", %{"list" => %Plug.Upload{path: "blocks_list.txt"}}) |> post("/api/pleroma/blocks_import", %{"list" => %Plug.Upload{path: "blocks_list.txt"}})
|> json_response(:ok) |> json_response(:ok)
assert called(
PleromaJobQueue.enqueue(
:background,
User,
[:blocks_import, user1, [user2.ap_id, user3.ap_id]]
)
)
assert response == "job started" assert response == "job started"
assert ObanHelpers.member?(
%{
"op" => "blocks_import",
"blocker_id" => user1.id,
"blocked_identifiers" => [user2.ap_id, user3.ap_id]
},
all_enqueued(worker: Pleroma.Workers.BackgroundWorker)
)
end end
end end
end end
@ -607,6 +609,7 @@ defmodule Pleroma.Web.TwitterAPI.UtilControllerTest do
|> json_response(:ok) |> json_response(:ok)
assert response == %{"status" => "success"} assert response == %{"status" => "success"}
ObanHelpers.perform_all()
user = User.get_cached_by_id(user.id) user = User.get_cached_by_id(user.id)

View file

@ -6,7 +6,7 @@ defmodule Pleroma.Web.WebsubTest do
use Pleroma.DataCase use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo use Oban.Testing, repo: Pleroma.Repo
alias Pleroma.ObanHelpers alias Pleroma.Tests.ObanHelpers
alias Pleroma.Web.Router.Helpers alias Pleroma.Web.Router.Helpers
alias Pleroma.Web.Websub alias Pleroma.Web.Websub
alias Pleroma.Web.Websub.WebsubClientSubscription alias Pleroma.Web.Websub.WebsubClientSubscription