Mark Felder
5da9cbd8a5
Rich Media parsing was previously handled on-demand with a 2 second HTTP request timeout and retained only in Cachex. Every time a Pleroma instance was restarted it had to request and parse the data again for each status with a URL detected. When fetching a batch of statuses they were processed in parallel to attempt to keep the maximum latency at 2 seconds, but this often resulted in a timeline appearing to hang during loading due to a URL that could not be successfully reached. URLs which had image links that expire (Amazon AWS) were parsed and inserted with a TTL to ensure the image link would not break. Rich Media data is now cached in the database and fetched asynchronously. Cachex is used as a read-through cache. When the data becomes available we stream an update to the clients. If the result is returned quickly the experience is almost seamless. Activities were already processed for their Rich Media data during ingestion to warm the cache, so users should not normally encounter the asynchronous loading of the Rich Media data.
Implementation notes: - The async worker is a Task with a globally unique process name to prevent duplicate processing of the same URL - The Task will attempt to fetch the data 3 times with increasing sleep time between attempts - The HTTP request obeys the default HTTP request timeout value instead of 2 seconds - URLs that cannot be successfully parsed due to an unexpected error receive a negative cache entry for 15 minutes - URLs that fail with an expected error receive a negative cache entry with no TTL - Activities that have no detected URLs insert a nil value in the Cachex :scrubber_cache so we do not repeat parsing the object content with Floki every time the activity is rendered - Expiring image URLs are handled with an Oban job - There is no automatic cleanup of the Rich Media data in the database, but it is safe to delete at any time - The post draft/preview feature makes the URL processing synchronous so the rendered post preview will have an accurate rendering Overall performance of timelines and creating new posts which contain URLs is greatly improved.
588 lines
18 KiB
Elixir
588 lines
18 KiB
Elixir
# Pleroma: A lightweight social networking server
|
|
# Copyright © 2017-2022 Pleroma Authors <https://pleroma.social/>
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
defmodule Pleroma.Web.ActivityPub.SideEffects do
|
|
@moduledoc """
|
|
This module looks at an inserted object and executes the side effects that it
|
|
implies. For example, a `Like` activity will increase the like count on the
|
|
liked object, a `Follow` activity will add the user to the follower
|
|
collection, and so on.
|
|
"""
|
|
alias Pleroma.Activity
|
|
alias Pleroma.FollowingRelationship
|
|
alias Pleroma.Notification
|
|
alias Pleroma.Object
|
|
alias Pleroma.Repo
|
|
alias Pleroma.User
|
|
alias Pleroma.Web.ActivityPub.ActivityPub
|
|
alias Pleroma.Web.ActivityPub.Builder
|
|
alias Pleroma.Web.ActivityPub.Pipeline
|
|
alias Pleroma.Web.ActivityPub.Utils
|
|
alias Pleroma.Web.Push
|
|
alias Pleroma.Web.Streamer
|
|
alias Pleroma.Workers.PollWorker
|
|
|
|
require Pleroma.Constants
|
|
require Logger
|
|
|
|
@logger Pleroma.Config.get([:side_effects, :logger], Logger)
|
|
|
|
@behaviour Pleroma.Web.ActivityPub.SideEffects.Handling
|
|
|
|
# Looks up the module used to stream activities out. It is read from the
# `[:side_effects, :ap_streamer]` config key and defaults to `ActivityPub`.
defp ap_streamer do
  Pleroma.Config.get([:side_effects, :ap_streamer], ActivityPub)
end
|
|
|
|
@impl true
|
|
# Bodiless function head: declares the default for `meta` (an empty list) once
# for all `handle/2` clauses, which dispatch on the activity's "type".
def handle(object, meta \\ [])
|
|
|
|
# Tasks this handles:
# - Sets the state of the referenced follow activity to "accept"
# - Updates the following relationship with `:follow_accept`
# - Updates the type of the follow notification
#
# Always returns `{:ok, object, meta}`; when a lookup or update step does not
# succeed the remaining steps are skipped.
@impl true
def handle(
      %{data: %{"type" => "Accept", "actor" => actor, "object" => follow_activity_id}} = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = pending_follow <-
         Activity.get_by_ap_id(follow_activity_id),
       %User{} = followed <- User.get_cached_by_ap_id(actor),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, accepted_follow} <- Utils.update_follow_state_for_all(pending_follow, "accept"),
       {:ok, _follower, followed} <-
         FollowingRelationship.update(follower, followed, :follow_accept) do
    Notification.update_notification_type(followed, accepted_follow)
  end

  {:ok, object, meta}
end
|
|
|
|
# Tasks this handles:
# - Sets the state of the referenced follow activity to "reject"
# - Updates the following relationship with `:follow_reject`
# - Dismisses the notification of the follow activity
#
# Always returns `{:ok, object, meta}`; when a lookup or the state update does
# not succeed the remaining steps are skipped.
@impl true
def handle(
      %{data: %{"type" => "Reject", "actor" => actor, "object" => follow_activity_id}} = object,
      meta
    ) do
  with %Activity{actor: follower_ap_id} = follow_activity <-
         Activity.get_by_ap_id(follow_activity_id),
       %User{} = followed <- User.get_cached_by_ap_id(actor),
       %User{} = follower <- User.get_cached_by_ap_id(follower_ap_id),
       {:ok, _rejected_follow} <-
         Utils.update_follow_state_for_all(follow_activity, "reject") do
    FollowingRelationship.update(follower, followed, :follow_reject)
    # The notification is dismissed using the activity as it was originally fetched.
    Notification.dismiss(follow_activity)
  end

  {:ok, object, meta}
end
|
|
|
|
# Tasks this handles:
# - Follows if possible (the follow is created as `:follow_pending`)
# - Generates an Accept when the followed user is local and can be followed
#   directly, or a Reject when the follow attempt returns an error
# - Queues the notifications (they are created with `do_send: false`)
#
# Returns `{:ok, activity, meta}` where `activity` is the Follow activity
# re-read by its id after the steps above.
@impl true
def handle(
      %{
        data: %{
          "type" => "Follow",
          "id" => follow_id,
          "actor" => following_user,
          "object" => followed_user
        }
      } = object,
      meta
    ) do
  with %User{} = follower <- User.get_cached_by_ap_id(following_user),
       %User{} = followed <- User.get_cached_by_ap_id(followed_user),
       {_, {:ok, _, _}, _, _} <-
         {:following, User.follow(follower, followed, :follow_pending), follower, followed} do
    if followed.local && User.can_direct_follow_local(follower, followed) do
      {:ok, accept_data, _} = Builder.accept(followed, object)
      {:ok, _accept_activity, _} = Pipeline.common_pipeline(accept_data, local: true)
    end
  else
    # The follow attempt itself failed: answer with a Reject from the followed user.
    {:following, {:error, _}, _follower, followed} ->
      {:ok, reject_data, _} = Builder.reject(followed, object)
      {:ok, _reject_activity, _} = Pipeline.common_pipeline(reject_data, local: true)

    # One of the users could not be found: nothing to accept or reject.
    _ ->
      nil
  end

  {:ok, notifications} = Notification.create_notifications(object, do_send: false)
  meta = add_notifications(meta, notifications)

  {:ok, Activity.get_by_ap_id(follow_id), meta}
end
|
|
|
|
# Tasks this handles:
# - Blocks the target user on behalf of the actor via `User.block/2`
#
# Always returns `{:ok, object, meta}`; nothing is blocked when either user
# cannot be found.
@impl true
def handle(
      %{data: %{"type" => "Block", "actor" => blocking_user, "object" => blocked_user}} = object,
      meta
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocking_user),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_user) do
    User.block(blocker, blocked)
  end

  {:ok, object, meta}
end
|
|
|
|
# Tasks this handles:
# - Update the user
# - Update a non-user object (Note, Question, etc.)
#
# For a local user, we also get a changeset with the full information, so we
# can update non-federating, non-activitypub settings as well.
#
# The embedded object must have a binary "id" and a "type"; otherwise the
# activity is passed through untouched.
@impl true
def handle(%{data: %{"type" => "Update", "object" => updated_object}} = object, meta) do
  case {is_binary(updated_object["id"]), updated_object} do
    {true, %{"type" => type}} ->
      if type in Pleroma.Constants.actor_types() do
        handle_update_user(object, meta)
      else
        handle_update_object(object, meta)
      end

    _ ->
      {:ok, object, meta}
  end
end
|
|
|
|
# Tasks this handles:
# - Add like to object
# - Set up notification
@impl true
def handle(%{data: %{"type" => "Like"}} = object, meta) do
  Utils.add_like_to_object(object, Object.get_by_ap_id(object.data["object"]))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
|
|
|
|
# Tasks this handles:
# - Actually create the object (see `handle_object_creation/3`)
# - Rollback if we couldn't create it or the actor is unknown
# - Increase the user note count and update the last status timestamp
# - Increase the replies count of the object being replied to (not for Answers)
# - Ask for scraping of nodeinfo
# - Enqueue fetching of the listed replies while within the allowed thread distance
# - Request the Rich Media card for the activity
# - Index the post for search
# - Queue notifications and stream out the activity
@impl true
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
  with {:ok, object, meta} <- handle_object_creation(meta[:object_data], activity, meta),
       %User{} = user <- User.get_cached_by_ap_id(activity.data["actor"]) do
    {:ok, notifications} = Notification.create_notifications(activity, do_send: false)
    {:ok, _user} = ActivityPub.increase_note_count_if_public(user, object)
    {:ok, _user} = ActivityPub.update_last_status_at_if_public(user, object)

    # Poll answers carry "inReplyTo" as well, but are not counted as replies.
    in_reply_to = object.data["type"] != "Answer" && object.data["inReplyTo"]

    if in_reply_to do
      Object.increase_replies_count(in_reply_to)
    end

    reply_depth = (meta[:depth] || 0) + 1

    Pleroma.Workers.NodeInfoFetcherWorker.enqueue("process", %{
      "source_url" => activity.data["actor"]
    })

    # FIXME: Force inReplyTo to replies
    if Pleroma.Web.Federator.allowed_thread_distance?(reply_depth) and
         object.data["replies"] != nil do
      Enum.each(object.data["replies"], fn reply_id ->
        Pleroma.Workers.RemoteFetcherWorker.enqueue("fetch_remote", %{
          "id" => reply_id,
          "depth" => reply_depth
        })
      end)
    end

    Pleroma.Web.RichMedia.Card.get_by_activity(activity)

    Pleroma.Search.add_to_index(Map.put(activity, :object, object))

    meta = add_notifications(meta, notifications)

    ap_streamer().stream_out(activity)

    {:ok, activity, meta}
  else
    failure ->
      Logger.error(inspect(failure))
      Repo.rollback(failure)
  end
end
|
|
|
|
# Tasks this handles:
# - Add announce to object
# - Set up notification
# - Stream out the announce
#
# Notifications and streaming are skipped when the announcing actor is an
# internal user.
@impl true
def handle(%{data: %{"type" => "Announce"}} = object, meta) do
  announced_object = Object.get_by_ap_id(object.data["object"])
  announcer = User.get_cached_by_ap_id(object.data["actor"])

  Utils.add_announce_to_object(object, announced_object)

  if !User.is_internal_user?(announcer) do
    Notification.create_notifications(object)
    ap_streamer().stream_out(object)
  end

  {:ok, object, meta}
end
|
|
|
|
# Tasks this handles:
# - Looks up the activity being undone and reverts it via `handle_undoing/1`
#
# Returns `{:ok, object, meta}` when the undo succeeds; any other result of
# `handle_undoing/1` is returned unchanged.
@impl true
def handle(%{data: %{"type" => "Undo", "object" => undone_object}} = object, meta) do
  case undone_object |> Activity.get_by_ap_id() |> handle_undoing() do
    :ok -> {:ok, object, meta}
    failure -> failure
  end
end
|
|
|
|
# Tasks this handles:
# - Add reaction to object
# - Set up notification
@impl true
def handle(%{data: %{"type" => "EmojiReact"}} = object, meta) do
  Utils.add_emoji_reaction_to_object(object, Object.get_by_ap_id(object.data["object"]))
  Notification.create_notifications(object)

  {:ok, object, meta}
end
|
|
|
|
# Tasks this handles:
# - Deletes the object, or the user when the target is a user
# - Removes the object from the actor's pinned objects
# - Reduce the user note count
# - Reduce the reply count
# - Stream out the activity
# - Removes posts from search index (if needed)
#
# Returns `{:ok, object, meta}` on success and `{:error, reason}` when the
# deletion step reports anything other than `:ok`.
@impl true
def handle(%{data: %{"type" => "Delete", "object" => deleted_ref}} = object, meta) do
  # The target is either a known object or, failing that, a cached user.
  deleted_object =
    Object.normalize(deleted_ref, fetch: false) ||
      User.get_cached_by_ap_id(deleted_ref)

  result =
    case deleted_object do
      %Object{} ->
        with {:ok, removed, _activity} <- Object.delete(deleted_object),
             {_, actor} when is_binary(actor) <- {:actor, removed.data["actor"]},
             %User{} = user <- User.get_cached_by_ap_id(actor) do
          User.remove_pinned_object_id(user, removed.data["id"])

          {:ok, user} = ActivityPub.decrease_note_count_if_public(user, removed)

          if in_reply_to = removed.data["inReplyTo"] do
            Object.decrease_replies_count(in_reply_to)
          end

          ap_streamer().stream_out(object)
          ap_streamer().stream_out_participations(removed, user)
          :ok
        else
          # Bindings from the `with` clauses are not visible here, so the log
          # shows the object as it was before `Object.delete/1` ran.
          {:actor, _} ->
            @logger.error("The object doesn't have an actor: #{inspect(deleted_object)}")
            :no_object_actor
        end

      %User{} ->
        case User.delete(deleted_object) do
          {:ok, _} -> :ok
          other -> other
        end
    end

  case result do
    :ok ->
      # Only remove from index when deleting actual objects, not users or anything else
      if match?(%Object{}, deleted_object) do
        Pleroma.Search.remove_from_index(deleted_object)
      end

      {:ok, object, meta}

    failure ->
      {:error, failure}
  end
end
|
|
|
|
# Tasks this handles:
# - adds pin to user
# - removes expiration job for pinned activity, if was set for expiration
#
# Returns `{:ok, object, meta}` on success. Failures are always reported as an
# `{:error, reason}` tuple, where `reason` is `:user_not_found`,
# `:pinned_statuses_limit_reached`, or the changeset's error list.
@impl true
def handle(%{data: %{"type" => "Add"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.add_pinned_object_id(user, data["object"]) do
    # if pinned activity was scheduled for deletion, we remove job
    if expiration = Pleroma.Workers.PurgeExpiredActivity.get_expiration(meta[:activity_id]) do
      Oban.cancel_job(expiration.id)
    end

    {:ok, object, meta}
  else
    nil ->
      {:error, :user_not_found}

    {:error, changeset} ->
      if changeset.errors[:pinned_objects] do
        {:error, :pinned_statuses_limit_reached}
      else
        # Wrap the raw error list so this branch has the same `{:error, _}`
        # shape as every other failure path instead of returning a bare list.
        {:error, changeset.errors}
      end
  end
end
|
|
|
|
# Tasks this handles:
# - removes pin from user
# - removes corresponding Add activity
# - if activity had expiration, recreates activity expiration job
#
# Returns `{:ok, object, meta}` on success, `{:error, :user_not_found}` when
# the actor is unknown, and otherwise whatever the unpinning step returned.
@impl true
def handle(%{data: %{"type" => "Remove"} = data} = object, meta) do
  with %User{} = user <- User.get_cached_by_ap_id(data["actor"]),
       {:ok, _user} <- User.remove_pinned_object_id(user, data["object"]) do
    add_activities_query =
      Activity.add_by_params_query(data["object"], user.ap_id, user.featured_address)

    Repo.delete_all(add_activities_query)

    # if pinned activity was scheduled for deletion, we reschedule it for deletion
    if meta[:expires_at] do
      # MRF.ActivityExpirationPolicy used UTC timestamps for expires_at in original implementation
      {:ok, expires_at} =
        Pleroma.EctoType.ActivityPub.ObjectValidators.DateTime.cast(meta[:expires_at])

      Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
        activity_id: meta[:activity_id],
        expires_at: expires_at
      })
    end

    {:ok, object, meta}
  else
    nil -> {:error, :user_not_found}
    failure -> failure
  end
end
|
|
|
|
# Catch-all: activities matched by none of the clauses above have no side
# effects and are passed through unchanged.
@impl true
def handle(object, meta), do: {:ok, object, meta}
|
|
|
|
# Applies an Update activity whose embedded object is an actor.
#
# When `meta` carries a `:user_update_changeset`, that changeset is applied
# as-is. Otherwise the user data is derived from the embedded actor object and
# applied to the user found by its ap id. The result of the update is not
# inspected; `{:ok, object, meta}` is always returned.
defp handle_update_user(
       %{data: %{"type" => "Update", "object" => updated_object}} = object,
       meta
     ) do
  case Keyword.get(meta, :user_update_changeset) do
    absent when absent in [nil, false] ->
      {:ok, new_user_data} = ActivityPub.user_data_from_user_object(updated_object)
      user = User.get_by_ap_id(updated_object["id"])

      user
      |> User.remote_user_changeset(new_user_data)
      |> User.update_and_set_cache()

    changeset ->
      User.update_and_set_cache(changeset)
  end

  {:ok, object, meta}
end
|
|
|
|
# Applies an Update activity whose embedded object is not an actor.
#
# Only objects whose stored type is listed in
# `Pleroma.Constants.updatable_object_types()` are changed. After a successful
# write the object cache and the cached HTML are refreshed; when the update
# counts as an actual change, notifications are sent and the Update is
# streamed. Always returns `{:ok, object, meta}`.
defp handle_update_object(
       %{data: %{"type" => "Update", "object" => updated_object}} = object,
       meta
     ) do
  orig_object_ap_id = updated_object["id"]
  orig_object = Object.get_by_ap_id(orig_object_ap_id)
  orig_object_data = orig_object.data

  # If this is a local Update, we don't process it by transmogrifier,
  # so we use the embedded object as-is.
  incoming_data = if meta[:local], do: updated_object, else: meta[:object_data]

  if orig_object_data["type"] in Pleroma.Constants.updatable_object_types() do
    %{
      updated_data: new_data,
      updated: changed?,
      used_history_in_new_object?: history_reused?
    } = Object.Updater.make_new_object_data_from_update_object(orig_object_data, incoming_data)

    changeset =
      orig_object
      |> Repo.preload(:hashtags)
      |> Object.change(%{data: new_data})

    with {:ok, new_object} <- Repo.update(changeset),
         {:ok, _} <- Object.invalid_object_cache(new_object),
         {:ok, _} <- Object.set_cache(new_object),
         # The metadata/utils.ex uses the object id for the cache.
         {:ok, _} <- Pleroma.Activity.HTML.invalidate_cache_for(new_object.id) do
      if history_reused? do
        # Best effort: the outcome of this invalidation is deliberately ignored.
        case Pleroma.Activity.get_create_by_object_ap_id(orig_object_ap_id) do
          nil -> nil
          create_activity -> Pleroma.Activity.HTML.invalidate_cache_for(create_activity.id)
        end
      end

      if changed? do
        object
        |> Activity.normalize()
        |> ActivityPub.notify_and_stream()
      end
    end
  end

  {:ok, object, meta}
end
|
|
|
|
# Questions: run the object through the common pipeline and, on success,
# schedule the poll end for the creating activity. Any non-`{:ok, _, _}`
# pipeline result is returned unchanged.
def handle_object_creation(%{"type" => "Question"} = object, activity, meta) do
  case Pipeline.common_pipeline(object, meta) do
    {:ok, created, meta} ->
      PollWorker.schedule_poll_end(activity)
      {:ok, created, meta}

    other ->
      other
  end
end
|
|
|
|
# Answers (poll votes): run the object through the common pipeline and, on
# success, increase the vote count of the chosen option on the object named in
# "inReplyTo". Any non-`{:ok, _, _}` pipeline result is returned unchanged.
def handle_object_creation(%{"type" => "Answer"} = object_map, _activity, meta) do
  case Pipeline.common_pipeline(object_map, meta) do
    {:ok, answer, meta} ->
      %{"inReplyTo" => _} = _shape_hint = Map.put_new(answer.data, "inReplyTo", nil)

      Object.increase_vote_count(
        answer.data["inReplyTo"],
        answer.data["name"],
        answer.data["actor"]
      )

      {:ok, answer, meta}

    other ->
      other
  end
end
|
|
|
|
# Plain content objects only need to go through the common pipeline; its
# result, success or failure, is returned as-is.
def handle_object_creation(%{"type" => objtype} = object, _activity, meta)
    when objtype in ~w[Audio Video Event Article Note Page] do
  Pipeline.common_pipeline(object, meta)
end
|
|
|
|
# Any other object data needs no creation step and is passed through.
def handle_object_creation(object, _activity, meta), do: {:ok, object, meta}
|
|
|
|
# Reverts a Like. When the liked object no longer exists only the Like itself
# is deleted; otherwise the like is first removed from the object, and a
# failure of that step is returned unchanged.
defp undo_like(nil, object) do
  delete_object(object)
end

defp undo_like(%Object{} = liked_object, object) do
  case Utils.remove_like_from_object(object, liked_object) do
    {:ok, _} -> delete_object(object)
    failure -> failure
  end
end
|
|
|
|
# Undo of a Like: looks up the liked object (which may be gone) and delegates
# to `undo_like/2`.
def handle_undoing(%{data: %{"type" => "Like"}} = object) do
  liked_object = Object.get_by_ap_id(object.data["object"])
  undo_like(liked_object, object)
end
|
|
|
|
# Undo of an EmojiReact: removes the reaction from the reacted object, then
# deletes the reaction activity. Returns `:ok`, or the first step result that
# did not match.
def handle_undoing(%{data: %{"type" => "EmojiReact"}} = object) do
  with %Object{} = target <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_emoji_reaction_from_object(object, target),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
|
|
|
|
# Undo of an Announce: removes the announce from the announced object, then
# deletes the announce activity. Returns `:ok`, or the first step result that
# did not match.
def handle_undoing(%{data: %{"type" => "Announce"}} = object) do
  with %Object{} = announced_object <- Object.get_by_ap_id(object.data["object"]),
       {:ok, _} <- Utils.remove_announce_from_object(object, announced_object),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
|
|
|
|
# Undo of a Block: unblocks the target user, then deletes the block activity.
# Returns `:ok`, or the first step result that did not match.
def handle_undoing(
      %{data: %{"type" => "Block", "actor" => blocker_ap_id, "object" => blocked_ap_id}} =
        object
    ) do
  with %User{} = blocker <- User.get_cached_by_ap_id(blocker_ap_id),
       %User{} = blocked <- User.get_cached_by_ap_id(blocked_ap_id),
       {:ok, _} <- User.unblock(blocker, blocked),
       {:ok, _} <- Repo.delete(object),
       do: :ok
end
|
|
|
|
# Anything else (including `nil` when the undone activity was not found)
# cannot be undone and yields an error tuple.
def handle_undoing(object) do
  {:error, ["don't know how to handle", object]}
end
|
|
|
|
# Deletes the given record through the repo, collapsing a successful result to
# `:ok` and returning any other result unchanged.
# NOTE(review): the only caller, `undo_like/2`, passes the Like activity being
# undone while the spec names `Object.t()` — confirm which type is intended.
@spec delete_object(Object.t()) :: :ok | {:error, Ecto.Changeset.t()}
defp delete_object(object) do
  case Repo.delete(object) do
    {:ok, _} -> :ok
    failure -> failure
  end
end
|
|
|
|
# Streams and pushes every notification queued under `:notifications` in
# `meta`, then returns `meta` unchanged so it can be piped further.
defp send_notifications(meta) do
  queued = Keyword.get(meta, :notifications, [])

  Enum.each(queued, fn notification ->
    Streamer.stream(["user", "user:notification"], notification)
    Push.send(notification)
  end)

  meta
end
|
|
|
|
# Streams every `{topics, items}` pair queued under `:streamables` in `meta`,
# then returns `meta` unchanged.
defp send_streamables(meta) do
  queued = Keyword.get(meta, :streamables, [])

  # A plain function clause (not a comprehension filter) so that a malformed
  # entry still raises instead of being skipped.
  Enum.each(queued, fn {topics, items} -> Streamer.stream(topics, items) end)

  meta
end
|
|
|
|
# Queues `notifications` in `meta` under `:notifications`, placing them in
# front of any notifications that were already queued.
defp add_notifications(meta, notifications) do
  already_queued = Keyword.get(meta, :notifications, [])

  Keyword.put(meta, :notifications, notifications ++ already_queued)
end
|
|
|
|
# Runs the deferred side effects: first sends the queued notifications, then
# the queued streamables. Returns `meta` as returned by those steps.
@impl true
def handle_after_transaction(meta) do
  meta = send_notifications(meta)
  send_streamables(meta)
end
|
|
end
|