Add scheduler for sending scheduled activities to the queue

This commit is contained in:
eugenijm 2019-04-03 18:55:04 +03:00
parent fc92a0fd8d
commit 2056efa714
13 changed files with 196 additions and 68 deletions

View file

@ -361,16 +361,13 @@
config :pleroma_job_queue, :queues, config :pleroma_job_queue, :queues,
federator_incoming: 50, federator_incoming: 50,
federator_outgoing: 50, federator_outgoing: 50,
mailer: 10 mailer: 10,
scheduled_activities: 10
config :pleroma, :fetch_initial_posts, config :pleroma, :fetch_initial_posts,
enabled: false, enabled: false,
pages: 5 pages: 5
config :pleroma, Pleroma.ScheduledActivity,
daily_user_limit: 25,
total_user_limit: 100
config :auto_linker, config :auto_linker,
opts: [ opts: [
scheme: true, scheme: true,
@ -396,6 +393,11 @@
config :prometheus, Pleroma.Web.Endpoint.MetricsExporter, path: "/api/pleroma/app_metrics" config :prometheus, Pleroma.Web.Endpoint.MetricsExporter, path: "/api/pleroma/app_metrics"
config :pleroma, Pleroma.ScheduledActivity,
daily_user_limit: 25,
total_user_limit: 300,
enabled: true
# Import environment specific config. This must remain at the bottom # Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above. # of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs" import_config "#{Mix.env()}.exs"

View file

@ -50,6 +50,11 @@
config :pleroma_job_queue, disabled: true config :pleroma_job_queue, disabled: true
config :pleroma, Pleroma.ScheduledActivity,
daily_user_limit: 2,
total_user_limit: 3,
enabled: false
try do try do
import_config "test.secret.exs" import_config "test.secret.exs"
rescue rescue

View file

@ -317,6 +317,7 @@ Pleroma has the following queues:
* `federator_outgoing` - Outgoing federation * `federator_outgoing` - Outgoing federation
* `federator_incoming` - Incoming federation * `federator_incoming` - Incoming federation
* `mailer` - Email sender, see [`Pleroma.Mailer`](#pleroma-mailer) * `mailer` - Email sender, see [`Pleroma.Mailer`](#pleroma-mailer)
* `scheduled_activities` - Scheduled activities, see [`Pleroma.ScheduledActivities`](#pleromascheduledactivity)
Example: Example:
@ -413,7 +414,8 @@ Pleroma account will be created with the same name as the LDAP user name.
* `Pleroma.Web.Auth.PleromaAuthenticator`: default database authenticator * `Pleroma.Web.Auth.PleromaAuthenticator`: default database authenticator
* `Pleroma.Web.Auth.LDAPAuthenticator`: LDAP authentication * `Pleroma.Web.Auth.LDAPAuthenticator`: LDAP authentication
## Pleroma.ScheduledActivity ## Pleroma.ScheduledActivity
* `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day * `daily_user_limit`: the number of scheduled activities a user is allowed to create in a single day (Default: `25`)
* `total_user_limit`: the number of scheduled activities a user is allowed to create in total * `total_user_limit`: the number of scheduled activities a user is allowed to create in total (Default: `300`)
* `enabled`: whether scheduled activities are sent to the job queue to be executed

View file

@ -104,7 +104,8 @@ def start(_type, _args) do
], ],
id: :cachex_idem id: :cachex_idem
), ),
worker(Pleroma.FlakeId, []) worker(Pleroma.FlakeId, []),
worker(Pleroma.ScheduledActivityWorker, [])
] ++ ] ++
hackney_pool_children() ++ hackney_pool_children() ++
[ [

View file

@ -184,12 +184,4 @@ def decrease_replies_count(ap_id) do
_ -> {:error, "Not found"} _ -> {:error, "Not found"}
end end
end end
def enforce_user_objects(user, object_ids) do
Object
|> where([o], fragment("?->>'actor' = ?", o.data, ^user.ap_id))
|> where([o], o.id in ^object_ids)
|> select([o], o.id)
|> Repo.all()
end
end end

View file

@ -6,7 +6,6 @@ defmodule Pleroma.ScheduledActivity do
use Ecto.Schema use Ecto.Schema
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Object
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
alias Pleroma.User alias Pleroma.User
@ -37,8 +36,6 @@ defp with_media_attachments(
%{changes: %{params: %{"media_ids" => media_ids} = params}} = changeset %{changes: %{params: %{"media_ids" => media_ids} = params}} = changeset
) )
when is_list(media_ids) do when is_list(media_ids) do
user = User.get_cached_by_id(changeset.data.user_id)
media_ids = Object.enforce_user_objects(user, media_ids) |> Enum.map(&to_string(&1))
media_attachments = Utils.attachments_from_ids(%{"media_ids" => media_ids}) media_attachments = Utils.attachments_from_ids(%{"media_ids" => media_ids})
params = params =
@ -79,8 +76,8 @@ def validate_scheduled_at(changeset) do
def exceeds_daily_user_limit?(user_id, scheduled_at) do def exceeds_daily_user_limit?(user_id, scheduled_at) do
ScheduledActivity ScheduledActivity
|> where(user_id: ^user_id) |> where(user_id: ^user_id)
|> where([s], type(s.scheduled_at, :date) == type(^scheduled_at, :date)) |> where([sa], type(sa.scheduled_at, :date) == type(^scheduled_at, :date))
|> select([u], count(u.id)) |> select([sa], count(sa.id))
|> Repo.one() |> Repo.one()
|> Kernel.>=(Config.get([ScheduledActivity, :daily_user_limit])) |> Kernel.>=(Config.get([ScheduledActivity, :daily_user_limit]))
end end
@ -88,7 +85,7 @@ def exceeds_daily_user_limit?(user_id, scheduled_at) do
def exceeds_total_user_limit?(user_id) do def exceeds_total_user_limit?(user_id) do
ScheduledActivity ScheduledActivity
|> where(user_id: ^user_id) |> where(user_id: ^user_id)
|> select([u], count(u.id)) |> select([sa], count(sa.id))
|> Repo.one() |> Repo.one()
|> Kernel.>=(Config.get([ScheduledActivity, :total_user_limit])) |> Kernel.>=(Config.get([ScheduledActivity, :total_user_limit]))
end end
@ -125,19 +122,40 @@ def get(%User{} = user, scheduled_activity_id) do
|> Repo.one() |> Repo.one()
end end
def update(scheduled_activity, attrs) do def update(%ScheduledActivity{} = scheduled_activity, attrs) do
scheduled_activity scheduled_activity
|> update_changeset(attrs) |> update_changeset(attrs)
|> Repo.update() |> Repo.update()
end end
def delete(scheduled_activity) do def delete(%ScheduledActivity{} = scheduled_activity) do
scheduled_activity scheduled_activity
|> Repo.delete() |> Repo.delete()
end end
def delete(id) when is_binary(id) or is_integer(id) do
ScheduledActivity
|> where(id: ^id)
|> select([sa], sa)
|> Repo.delete_all()
|> case do
{1, [scheduled_activity]} -> {:ok, scheduled_activity}
_ -> :error
end
end
def for_user_query(%User{} = user) do def for_user_query(%User{} = user) do
ScheduledActivity ScheduledActivity
|> where(user_id: ^user.id) |> where(user_id: ^user.id)
end end
def due_activities(offset \\ 0) do
naive_datetime =
NaiveDateTime.utc_now()
|> NaiveDateTime.add(offset, :millisecond)
ScheduledActivity
|> where([sa], sa.scheduled_at < ^naive_datetime)
|> Repo.all()
end
end end

View file

@ -0,0 +1,58 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ScheduledActivityWorker do
@moduledoc """
Sends scheduled activities to the job queue.
"""
alias Pleroma.Config
alias Pleroma.ScheduledActivity
alias Pleroma.User
alias Pleroma.Web.CommonAPI
use GenServer
require Logger
@schedule_interval :timer.minutes(1)
def start_link do
GenServer.start_link(__MODULE__, nil)
end
def init(_) do
if Config.get([ScheduledActivity, :enabled]) do
schedule_next()
{:ok, nil}
else
:ignore
end
end
def perform(:execute, scheduled_activity_id) do
try do
{:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity_id)
%User{} = user = User.get_cached_by_id(scheduled_activity.user_id)
{:ok, _result} = CommonAPI.post(user, scheduled_activity.params)
rescue
error ->
Logger.error(
"#{__MODULE__} Couldn't create a status from the scheduled activity: #{inspect(error)}"
)
end
end
def handle_info(:perform, state) do
ScheduledActivity.due_activities(@schedule_interval)
|> Enum.each(fn scheduled_activity ->
PleromaJobQueue.enqueue(:scheduled_activities, __MODULE__, [:execute, scheduled_activity.id])
end)
schedule_next()
{:noreply, state}
end
defp schedule_next do
Process.send_after(self(), :perform, @schedule_interval)
end
end

View file

@ -5,6 +5,7 @@
defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do defmodule Pleroma.Web.MastodonAPI.MastodonAPIController do
use Pleroma.Web, :controller use Pleroma.Web, :controller
alias Ecto.Changeset
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Filter alias Pleroma.Filter
@ -438,14 +439,12 @@ def post_status(%{assigns: %{user: user}} = conn, %{"status" => _} = params) do
scheduled_at = params["scheduled_at"] scheduled_at = params["scheduled_at"]
if scheduled_at && ScheduledActivity.far_enough?(scheduled_at) do if scheduled_at && ScheduledActivity.far_enough?(scheduled_at) do
{:ok, scheduled_activity} = with {:ok, scheduled_activity} <-
Cachex.fetch!(:idempotency_cache, idempotency_key, fn _ -> ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) do
ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) conn
end) |> put_view(ScheduledActivityView)
|> render("show.json", %{scheduled_activity: scheduled_activity})
conn end
|> put_view(ScheduledActivityView)
|> render("show.json", %{scheduled_activity: scheduled_activity})
else else
params = Map.drop(params, ["scheduled_at"]) params = Map.drop(params, ["scheduled_at"])
@ -1474,6 +1473,17 @@ def delete_filter(%{assigns: %{user: user}} = conn, %{"id" => filter_id}) do
# fallback action # fallback action
# #
def errors(conn, {:error, %Changeset{} = changeset}) do
error_message =
changeset
|> Changeset.traverse_errors(fn {message, _opt} -> message end)
|> Enum.map_join(", ", fn {_k, v} -> v end)
conn
|> put_status(422)
|> json(%{error: error_message})
end
def errors(conn, {:error, :not_found}) do def errors(conn, {:error, :not_found}) do
conn conn
|> put_status(404) |> put_status(404)

View file

@ -16,16 +16,20 @@ def render("index.json", %{scheduled_activities: scheduled_activities}) do
def render("show.json", %{scheduled_activity: %ScheduledActivity{} = scheduled_activity}) do def render("show.json", %{scheduled_activity: %ScheduledActivity{} = scheduled_activity}) do
%{ %{
id: scheduled_activity.id |> to_string, id: to_string(scheduled_activity.id),
scheduled_at: scheduled_activity.scheduled_at |> CommonAPI.Utils.to_masto_date(), scheduled_at: CommonAPI.Utils.to_masto_date(scheduled_activity.scheduled_at),
params: status_params(scheduled_activity.params) params: status_params(scheduled_activity.params)
} }
|> with_media_attachments(scheduled_activity) |> with_media_attachments(scheduled_activity)
end end
defp with_media_attachments(data, %{params: %{"media_attachments" => media_attachments}}) do defp with_media_attachments(data, %{params: %{"media_attachments" => media_attachments}}) do
attachments = render_many(media_attachments, StatusView, "attachment.json", as: :attachment) try do
Map.put(data, :media_attachments, attachments) attachments = render_many(media_attachments, StatusView, "attachment.json", as: :attachment)
Map.put(data, :media_attachments, attachments)
rescue
_ -> data
end
end end
defp with_media_attachments(data, _), do: data defp with_media_attachments(data, _), do: data

View file

@ -4,15 +4,11 @@
defmodule Pleroma.ScheduledActivityTest do defmodule Pleroma.ScheduledActivityTest do
use Pleroma.DataCase use Pleroma.DataCase
alias Pleroma.Config
alias Pleroma.DataCase alias Pleroma.DataCase
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
alias Pleroma.Web.ActivityPub.ActivityPub
import Pleroma.Factory import Pleroma.Factory
setup context do setup context do
Config.put([ScheduledActivity, :daily_user_limit], 2)
Config.put([ScheduledActivity, :total_user_limit], 3)
DataCase.ensure_local_uploader(context) DataCase.ensure_local_uploader(context)
end end
@ -42,7 +38,7 @@ test "when total user limit is exceeded" do
tomorrow = tomorrow =
NaiveDateTime.utc_now() NaiveDateTime.utc_now()
|> NaiveDateTime.add(:timer.hours(24), :millisecond) |> NaiveDateTime.add(:timer.hours(36), :millisecond)
|> NaiveDateTime.to_iso8601() |> NaiveDateTime.to_iso8601()
{:ok, _} = ScheduledActivity.create(user, %{params: %{}, scheduled_at: today}) {:ok, _} = ScheduledActivity.create(user, %{params: %{}, scheduled_at: today})
@ -64,30 +60,5 @@ test "when scheduled_at is earlier than 5 minute from now" do
{:error, changeset} = ScheduledActivity.create(user, attrs) {:error, changeset} = ScheduledActivity.create(user, attrs)
assert changeset.errors == [scheduled_at: {"must be at least 5 minutes from now", []}] assert changeset.errors == [scheduled_at: {"must be at least 5 minutes from now", []}]
end end
test "excludes attachments belonging to another user" do
user = insert(:user)
another_user = insert(:user)
scheduled_at =
NaiveDateTime.utc_now()
|> NaiveDateTime.add(:timer.minutes(10), :millisecond)
|> NaiveDateTime.to_iso8601()
file = %Plug.Upload{
content_type: "image/jpg",
path: Path.absname("test/fixtures/image.jpg"),
filename: "an_image.jpg"
}
{:ok, user_upload} = ActivityPub.upload(file, actor: user.ap_id)
{:ok, another_user_upload} = ActivityPub.upload(file, actor: another_user.ap_id)
media_ids = [user_upload.id, another_user_upload.id]
attrs = %{params: %{"media_ids" => media_ids}, scheduled_at: scheduled_at}
{:ok, scheduled_activity} = ScheduledActivity.create(user, attrs)
assert to_string(user_upload.id) in scheduled_activity.params["media_ids"]
refute to_string(another_user_upload.id) in scheduled_activity.params["media_ids"]
end
end end
end end

View file

@ -0,0 +1,19 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2018 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.ScheduledActivityWorkerTest do
use Pleroma.DataCase
alias Pleroma.ScheduledActivity
import Pleroma.Factory
test "creates a status from the scheduled activity" do
user = insert(:user)
scheduled_activity = insert(:scheduled_activity, user: user, params: %{status: "hi"})
Pleroma.ScheduledActivityWorker.perform(:execute, scheduled_activity.id)
refute Repo.get(ScheduledActivity, scheduled_activity.id)
activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id))
assert activity.data["object"]["content"] == "hi"
end
end

View file

@ -2471,6 +2471,52 @@ test "skips the scheduling and creates the activity if scheduled_at is earlier t
assert [] == Repo.all(ScheduledActivity) assert [] == Repo.all(ScheduledActivity)
end end
test "returns error when daily user limit is exceeded", %{conn: conn} do
user = insert(:user)
today =
NaiveDateTime.utc_now()
|> NaiveDateTime.add(:timer.minutes(6), :millisecond)
|> NaiveDateTime.to_iso8601()
attrs = %{params: %{}, scheduled_at: today}
{:ok, _} = ScheduledActivity.create(user, attrs)
{:ok, _} = ScheduledActivity.create(user, attrs)
conn =
conn
|> assign(:user, user)
|> post("/api/v1/statuses", %{"status" => "scheduled", "scheduled_at" => today})
assert %{"error" => "daily limit exceeded"} == json_response(conn, 422)
end
test "returns error when total user limit is exceeded", %{conn: conn} do
user = insert(:user)
today =
NaiveDateTime.utc_now()
|> NaiveDateTime.add(:timer.minutes(6), :millisecond)
|> NaiveDateTime.to_iso8601()
tomorrow =
NaiveDateTime.utc_now()
|> NaiveDateTime.add(:timer.hours(36), :millisecond)
|> NaiveDateTime.to_iso8601()
attrs = %{params: %{}, scheduled_at: today}
{:ok, _} = ScheduledActivity.create(user, attrs)
{:ok, _} = ScheduledActivity.create(user, attrs)
{:ok, _} = ScheduledActivity.create(user, %{params: %{}, scheduled_at: tomorrow})
conn =
conn
|> assign(:user, user)
|> post("/api/v1/statuses", %{"status" => "scheduled", "scheduled_at" => tomorrow})
assert %{"error" => "total limit exceeded"} == json_response(conn, 422)
end
test "shows scheduled activities", %{conn: conn} do test "shows scheduled activities", %{conn: conn} do
user = insert(:user) user = insert(:user)
scheduled_activity_id1 = insert(:scheduled_activity, user: user).id |> to_string() scheduled_activity_id1 = insert(:scheduled_activity, user: user).id |> to_string()

View file

@ -52,7 +52,7 @@ test "A scheduled activity with a media attachment" do
|> Enum.map(&StatusView.render("attachment.json", %{attachment: &1})), |> Enum.map(&StatusView.render("attachment.json", %{attachment: &1})),
params: %{ params: %{
in_reply_to_id: to_string(activity.id), in_reply_to_id: to_string(activity.id),
media_ids: [to_string(upload.id)], media_ids: [upload.id],
poll: nil, poll: nil,
scheduled_at: nil, scheduled_at: nil,
sensitive: true, sensitive: true,