Merge branch 'fix/activity-expirations-again' into 'develop'

Fix activity expirations again

See merge request 
This commit is contained in:
feld 2020-08-13 17:38:19 +00:00
commit 14a06e63f6
8 changed files with 76 additions and 5 deletions

View file

@ -114,6 +114,7 @@ To add configuration to your config file, you can copy it from the base config.
* `Pleroma.Web.ActivityPub.MRF.MentionPolicy`: Drops posts mentioning configurable users. (See [`:mrf_mention`](#mrf_mention)).
* `Pleroma.Web.ActivityPub.MRF.VocabularyPolicy`: Restricts activities to a configured set of vocabulary. (See [`:mrf_vocabulary`](#mrf_vocabulary)).
* `Pleroma.Web.ActivityPub.MRF.ObjectAgePolicy`: Rejects or delists posts based on their age when received. (See [`:mrf_object_age`](#mrf_object_age)).
* `Pleroma.Web.ActivityPub.MRF.ActivityExpirationPolicy`: Sets a default expiration on all posts made by users of the local instance. Requires `Pleroma.ActivityExpiration` to be enabled for processing the scheduled delections.
* `transparency`: Make the content of your Message Rewrite Facility settings public (via nodeinfo).
* `transparency_exclusions`: Exclude specific instance names from MRF transparency. The use of the exclusions feature will be disclosed in nodeinfo as a boolean value.
@ -220,6 +221,8 @@ config :pleroma, :mrf_user_allowlist, %{
## Pleroma.ActivityExpiration
Enables the worker which processes posts scheduled for deletion. Pinned posts are exempt from expiration.
* `enabled`: whether expired activities will be sent to the job queue to be deleted
## Frontends

View file

@ -134,13 +134,23 @@ def run(["ensure_expiration"]) do
Pleroma.Activity
|> join(:left, [a], u in assoc(a, :expiration))
|> join(:inner, [a, _u], o in Object,
on:
fragment(
"(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
o.data,
a.data,
a.data
)
)
|> where(local: true)
|> where([a, u], is_nil(u))
|> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
|> where([_a, _u, o], fragment("?->>'type' = 'Note'", o.data))
|> Pleroma.RepoStreamer.chunk_stream(100)
|> Stream.each(fn activities ->
Enum.each(activities, fn activity ->
expires_at = Timex.shift(activity.inserted_at, days: days)
Pleroma.ActivityExpiration.create(activity, expires_at, false)
end)
end)

View file

@ -340,4 +340,10 @@ def direct_conversation_id(activity, for_user) do
_ -> nil
end
end
@spec pinned_by_actor?(Activity.t()) :: boolean()
def pinned_by_actor?(%Activity{} = activity) do
actor = user_actor(activity)
activity.id in actor.pinned_activities
end
end

View file

@ -46,7 +46,12 @@ def due_expirations(offset \\ 0) do
ActivityExpiration
|> where([exp], exp.scheduled_at < ^naive_datetime)
|> limit(50)
|> preload(:activity)
|> Repo.all()
|> Enum.reject(fn %{activity: activity} ->
Activity.pinned_by_actor?(activity)
end)
end
def validate_scheduled_at(changeset, false), do: changeset

View file

@ -23,6 +23,8 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
alias Pleroma.Web.Streamer
alias Pleroma.Workers.BackgroundWorker
require Logger
def handle(object, meta \\ [])
# Task this handles
@ -251,13 +253,15 @@ def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
# - Stream out the activity
def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object, meta) do
deleted_object =
Object.normalize(deleted_object, false) || User.get_cached_by_ap_id(deleted_object)
Object.normalize(deleted_object, false) ||
User.get_cached_by_ap_id(deleted_object)
result =
case deleted_object do
%Object{} ->
with {:ok, deleted_object, activity} <- Object.delete(deleted_object),
%User{} = user <- User.get_cached_by_ap_id(deleted_object.data["actor"]) do
{_, actor} when is_binary(actor) <- {:actor, deleted_object.data["actor"]},
%User{} = user <- User.get_cached_by_ap_id(actor) do
User.remove_pinnned_activity(user, activity)
{:ok, user} = ActivityPub.decrease_note_count_if_public(user, deleted_object)
@ -271,6 +275,10 @@ def handle(%{data: %{"type" => "Delete", "object" => deleted_object}} = object,
ActivityPub.stream_out(object)
ActivityPub.stream_out_participations(deleted_object, user)
:ok
else
{:actor, _} ->
Logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
:no_object_actor
end
%User{} ->

View file

@ -0,0 +1,19 @@
defmodule Pleroma.Repo.Migrations.OnlyExpireCreates do
use Ecto.Migration
def up do
statement = """
DELETE FROM
activity_expirations a_exp USING activities a, objects o
WHERE
a_exp.activity_id = a.id AND (o.data->>'id') = COALESCE(a.data->'object'->>'id', a.data->>'object')
AND (a.data->>'type' != 'Create' OR o.data->>'type' != 'Note');
"""
execute(statement)
end
def down do
:ok
end
end

View file

@ -11,7 +11,10 @@ defmodule Pleroma.ActivityExpirationTest do
test "finds activities due to be deleted only" do
activity = insert(:note_activity)
expiration_due = insert(:expiration_in_the_past, %{activity_id: activity.id})
expiration_due =
insert(:expiration_in_the_past, %{activity_id: activity.id}) |> Repo.preload(:activity)
activity2 = insert(:note_activity)
insert(:expiration_in_the_future, %{activity_id: activity2.id})

View file

@ -19,8 +19,9 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
alias Pleroma.Web.ActivityPub.SideEffects
alias Pleroma.Web.CommonAPI
import Pleroma.Factory
import ExUnit.CaptureLog
import Mock
import Pleroma.Factory
describe "handle_after_transaction" do
test "it streams out notifications and streams" do
@ -221,6 +222,22 @@ test "it handles user deletions", %{delete_user: delete, user: user} do
assert User.get_cached_by_ap_id(user.ap_id).deactivated
end
test "it logs issues with objects deletion", %{
delete: delete,
object: object
} do
{:ok, object} =
object
|> Object.change(%{data: Map.delete(object.data, "actor")})
|> Repo.update()
Object.invalid_object_cache(object)
assert capture_log(fn ->
{:error, :no_object_actor} = SideEffects.handle(delete)
end) =~ "object doesn't have an actor"
end
end
describe "EmojiReact objects" do