updated ScheduledActivity

This commit is contained in:
Maksim Pechnikov 2019-12-03 21:30:10 +03:00
parent b7c449118b
commit 652cc6ba4b
9 changed files with 142 additions and 72 deletions

View file

@ -496,7 +496,6 @@
crontab: [ crontab: [
{"0 0 * * *", Pleroma.Workers.Cron.ClearOauthTokenWorker}, {"0 0 * * *", Pleroma.Workers.Cron.ClearOauthTokenWorker},
{"0 * * * *", Pleroma.Workers.Cron.StatsWorker}, {"0 * * * *", Pleroma.Workers.Cron.StatsWorker},
{"* * * * *", Pleroma.Workers.Cron.ScheduledActivityWorker},
{"* * * * *", Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker}, {"* * * * *", Pleroma.Workers.Cron.PurgeExpiredActivitiesWorker},
{"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker} {"0 0 * * 0", Pleroma.Workers.Cron.DigestEmailsWorker}
] ]

View file

@ -5,11 +5,13 @@
defmodule Pleroma.ScheduledActivity do defmodule Pleroma.ScheduledActivity do
use Ecto.Schema use Ecto.Schema
alias Ecto.Multi
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI.Utils alias Pleroma.Web.CommonAPI.Utils
alias Pleroma.Workers.ScheduledActivityWorker
import Ecto.Query import Ecto.Query
import Ecto.Changeset import Ecto.Changeset
@ -105,14 +107,29 @@ def far_enough?(scheduled_at) do
end end
def new(%User{} = user, attrs) do def new(%User{} = user, attrs) do
%ScheduledActivity{user_id: user.id} changeset(%ScheduledActivity{user_id: user.id}, attrs)
|> changeset(attrs)
end end
@doc """
Creates ScheduledActivity and add to queue to perform at scheduled_at date
"""
@spec create(User.t(), map()) :: {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
def create(%User{} = user, attrs) do def create(%User{} = user, attrs) do
user Multi.new()
|> new(attrs) |> Multi.insert(:scheduled_activity, new(user, attrs))
|> Repo.insert() |> Multi.run(:scheduled_activity_job, fn _repo, %{scheduled_activity: activity} ->
%{activity_id: activity.id}
|> ScheduledActivityWorker.new(scheduled_at: activity.scheduled_at)
|> Oban.insert()
end)
|> Repo.transaction()
|> case do
{:ok, %{scheduled_activity: scheduled_activity}} ->
{:ok, scheduled_activity}
{:error, _, changeset, _} ->
{:error, changeset}
end
end end
def get(%User{} = user, scheduled_activity_id) do def get(%User{} = user, scheduled_activity_id) do
@ -122,15 +139,35 @@ def get(%User{} = user, scheduled_activity_id) do
|> Repo.one() |> Repo.one()
end end
def update(%ScheduledActivity{} = scheduled_activity, attrs) do @spec update(ScheduledActivity.t(), map()) ::
scheduled_activity {:ok, ScheduledActivity.t()} | {:error, Ecto.Changeset.t()}
|> update_changeset(attrs) def update(%ScheduledActivity{id: id} = scheduled_activity, attrs) do
|> Repo.update() with {:error, %Ecto.Changeset{valid?: true} = changeset} <-
{:error, update_changeset(scheduled_activity, attrs)} do
Multi.new()
|> Multi.update(:scheduled_activity, changeset)
|> Multi.update_all(:scheduled_job, job_query(id),
set: [scheduled_at: changeset.changes[:scheduled_at]]
)
|> Repo.transaction()
|> case do
{:ok, %{scheduled_activity: scheduled_activity}} ->
{:ok, scheduled_activity}
{:error, _, changeset, _} ->
{:error, changeset}
end
end
end
def delete_job(%ScheduledActivity{id: id} = _scheduled_activity) do
id
|> job_query
|> Repo.delete_all()
end end
def delete(%ScheduledActivity{} = scheduled_activity) do def delete(%ScheduledActivity{} = scheduled_activity) do
scheduled_activity Repo.delete(scheduled_activity)
|> Repo.delete()
end end
def delete(id) when is_binary(id) or is_integer(id) do def delete(id) when is_binary(id) or is_integer(id) do
@ -158,4 +195,11 @@ def due_activities(offset \\ 0) do
|> where([sa], sa.scheduled_at < ^naive_datetime) |> where([sa], sa.scheduled_at < ^naive_datetime)
|> Repo.all() |> Repo.all()
end end
def job_query(scheduled_activity_id) do
from(j in Oban.Job,
where: j.queue == "scheduled_activities",
where: fragment("args ->> 'activity_id' = ?::text", ^to_string(scheduled_activity_id))
)
end
end end

View file

@ -45,7 +45,8 @@ def update(%{assigns: %{scheduled_activity: scheduled_activity}} = conn, params)
@doc "DELETE /api/v1/scheduled_statuses/:id" @doc "DELETE /api/v1/scheduled_statuses/:id"
def delete(%{assigns: %{scheduled_activity: scheduled_activity}} = conn, _params) do def delete(%{assigns: %{scheduled_activity: scheduled_activity}} = conn, _params) do
with {:ok, scheduled_activity} <- ScheduledActivity.delete(scheduled_activity) do with {:ok, scheduled_activity} <- ScheduledActivity.delete(scheduled_activity),
_ <- ScheduledActivity.delete_job(scheduled_activity) do
render(conn, "show.json", scheduled_activity: scheduled_activity) render(conn, "show.json", scheduled_activity: scheduled_activity)
end end
end end

View file

@ -124,15 +124,18 @@ def create(
) do ) do
params = Map.put(params, "in_reply_to_status_id", params["in_reply_to_id"]) params = Map.put(params, "in_reply_to_status_id", params["in_reply_to_id"])
if ScheduledActivity.far_enough?(scheduled_at) do with {:far_enough, true} <- {:far_enough, ScheduledActivity.far_enough?(scheduled_at)},
with {:ok, scheduled_activity} <- attrs <- %{"params" => params, "scheduled_at" => scheduled_at},
ScheduledActivity.create(user, %{"params" => params, "scheduled_at" => scheduled_at}) do {:ok, scheduled_activity} <- ScheduledActivity.create(user, attrs) do
conn conn
|> put_view(ScheduledActivityView) |> put_view(ScheduledActivityView)
|> render("show.json", scheduled_activity: scheduled_activity) |> render("show.json", scheduled_activity: scheduled_activity)
end
else else
{:far_enough, _} ->
create(conn, Map.drop(params, ["scheduled_at"])) create(conn, Map.drop(params, ["scheduled_at"]))
error ->
error
end end
end end

View file

@ -2,12 +2,13 @@
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/> # Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Cron.ScheduledActivityWorker do defmodule Pleroma.Workers.ScheduledActivityWorker do
@moduledoc """ @moduledoc """
The worker to post scheduled actvities. The worker to post scheduled activity.
""" """
use Oban.Worker, queue: "scheduled_activities" use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
alias Pleroma.User alias Pleroma.User
@ -15,18 +16,20 @@ defmodule Pleroma.Workers.Cron.ScheduledActivityWorker do
require Logger require Logger
@schedule_interval :timer.minutes(1)
@impl Oban.Worker @impl Oban.Worker
def perform(_opts, _job) do def perform(%{"activity_id" => activity_id}, _job) do
if Config.get([ScheduledActivity, :enabled]) do if Config.get([ScheduledActivity, :enabled]) do
@schedule_interval case Pleroma.Repo.get(ScheduledActivity, activity_id) do
|> ScheduledActivity.due_activities() %ScheduledActivity{} = scheduled_activity ->
|> Enum.each(&post_activity/1) post_activity(scheduled_activity)
_ ->
Logger.error("#{__MODULE__} Couldn't find scheduled activity: #{activity_id}")
end
end end
end end
def post_activity(scheduled_activity) do defp post_activity(%ScheduledActivity{} = scheduled_activity) do
try do try do
{:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity) {:ok, scheduled_activity} = ScheduledActivity.delete(scheduled_activity)
%User{} = user = User.get_cached_by_id(scheduled_activity.user_id) %User{} = user = User.get_cached_by_id(scheduled_activity.user_id)

View file

@ -26,6 +26,7 @@ test "when daily user limit is exceeded" do
attrs = %{params: %{}, scheduled_at: today} attrs = %{params: %{}, scheduled_at: today}
{:ok, _} = ScheduledActivity.create(user, attrs) {:ok, _} = ScheduledActivity.create(user, attrs)
{:ok, _} = ScheduledActivity.create(user, attrs) {:ok, _} = ScheduledActivity.create(user, attrs)
{:error, changeset} = ScheduledActivity.create(user, attrs) {:error, changeset} = ScheduledActivity.create(user, attrs)
assert changeset.errors == [scheduled_at: {"daily limit exceeded", []}] assert changeset.errors == [scheduled_at: {"daily limit exceeded", []}]
end end
@ -83,7 +84,10 @@ test "creates a status from the scheduled activity" do
params: %{status: "hi"} params: %{status: "hi"}
) )
Pleroma.Workers.Cron.ScheduledActivityWorker.perform(:opts, :pid) Pleroma.Workers.ScheduledActivityWorker.perform(
%{"activity_id" => scheduled_activity.id},
:pid
)
refute Repo.get(ScheduledActivity, scheduled_activity.id) refute Repo.get(ScheduledActivity, scheduled_activity.id)
activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id)) activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id))

View file

@ -53,6 +53,12 @@ defmacro __using__(_opts) do
clear_config_all: 2 clear_config_all: 2
] ]
def to_datetime(naive_datetime) do
naive_datetime
|> DateTime.from_naive!("Etc/UTC")
|> DateTime.truncate(:second)
end
def collect_ids(collection) do def collect_ids(collection) do
collection collection
|> Enum.map(& &1.id) |> Enum.map(& &1.id)

View file

@ -9,6 +9,7 @@ defmodule Pleroma.Web.MastodonAPI.ScheduledActivityControllerTest do
alias Pleroma.ScheduledActivity alias Pleroma.ScheduledActivity
import Pleroma.Factory import Pleroma.Factory
import Ecto.Query
test "shows scheduled activities", %{conn: conn} do test "shows scheduled activities", %{conn: conn} do
user = insert(:user) user = insert(:user)
@ -68,7 +69,30 @@ test "shows a scheduled activity", %{conn: conn} do
test "updates a scheduled activity", %{conn: conn} do test "updates a scheduled activity", %{conn: conn} do
user = insert(:user) user = insert(:user)
scheduled_activity = insert(:scheduled_activity, user: user)
scheduled_at =
NaiveDateTime.add(
NaiveDateTime.utc_now(),
:timer.minutes(60),
:millisecond
)
{:ok, scheduled_activity} =
ScheduledActivity.create(
user,
%{
scheduled_at: scheduled_at,
params: build(:note).data
}
)
scheduled_activity_job =
Repo.one(from(j in Oban.Job, where: j.queue == "scheduled_activities"))
assert scheduled_activity_job.args == %{"activity_id" => scheduled_activity.id}
assert DateTime.truncate(scheduled_activity_job.scheduled_at, :second) ==
to_datetime(scheduled_at)
new_scheduled_at = new_scheduled_at =
NaiveDateTime.add(NaiveDateTime.utc_now(), :timer.minutes(120), :millisecond) NaiveDateTime.add(NaiveDateTime.utc_now(), :timer.minutes(120), :millisecond)
@ -82,6 +106,10 @@ test "updates a scheduled activity", %{conn: conn} do
assert %{"scheduled_at" => expected_scheduled_at} = json_response(res_conn, 200) assert %{"scheduled_at" => expected_scheduled_at} = json_response(res_conn, 200)
assert expected_scheduled_at == Pleroma.Web.CommonAPI.Utils.to_masto_date(new_scheduled_at) assert expected_scheduled_at == Pleroma.Web.CommonAPI.Utils.to_masto_date(new_scheduled_at)
scheduled_activity_job = refresh_record(scheduled_activity_job)
assert DateTime.truncate(scheduled_activity_job.scheduled_at, :second) ==
to_datetime(new_scheduled_at)
res_conn = res_conn =
conn conn
@ -93,7 +121,25 @@ test "updates a scheduled activity", %{conn: conn} do
test "deletes a scheduled activity", %{conn: conn} do test "deletes a scheduled activity", %{conn: conn} do
user = insert(:user) user = insert(:user)
scheduled_activity = insert(:scheduled_activity, user: user)
{:ok, scheduled_activity} =
ScheduledActivity.create(
user,
%{
scheduled_at:
NaiveDateTime.add(
NaiveDateTime.utc_now(),
:timer.minutes(60),
:millisecond
),
params: build(:note).data
}
)
scheduled_activity_job =
Repo.one(from(j in Oban.Job, where: j.queue == "scheduled_activities"))
assert scheduled_activity_job.args == %{"activity_id" => scheduled_activity.id}
res_conn = res_conn =
conn conn
@ -101,7 +147,8 @@ test "deletes a scheduled activity", %{conn: conn} do
|> delete("/api/v1/scheduled_statuses/#{scheduled_activity.id}") |> delete("/api/v1/scheduled_statuses/#{scheduled_activity.id}")
assert %{} = json_response(res_conn, 200) assert %{} = json_response(res_conn, 200)
assert nil == Repo.get(ScheduledActivity, scheduled_activity.id) refute Repo.get(ScheduledActivity, scheduled_activity.id)
refute Repo.get(Oban.Job, scheduled_activity_job.id)
res_conn = res_conn =
conn conn

View file

@ -1,37 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Workers.Cron.ScheduledActivityWorkerTest do
use Pleroma.DataCase
alias Pleroma.ScheduledActivity
import Pleroma.Factory
clear_config([ScheduledActivity, :enabled])
test "creates a status from the scheduled activity" do
Pleroma.Config.put([ScheduledActivity, :enabled], true)
user = insert(:user)
naive_datetime =
NaiveDateTime.add(
NaiveDateTime.utc_now(),
-:timer.minutes(2),
:millisecond
)
scheduled_activity =
insert(
:scheduled_activity,
scheduled_at: naive_datetime,
user: user,
params: %{status: "hi"}
)
Pleroma.Workers.Cron.ScheduledActivityWorker.perform(:opts, :pid)
refute Repo.get(ScheduledActivity, scheduled_activity.id)
activity = Repo.all(Pleroma.Activity) |> Enum.find(&(&1.actor == user.ap_id))
assert Pleroma.Object.normalize(activity).data["content"] == "hi"
end
end