Pipeline: Add a side effects step after the transaction finishes
This is to run things like streaming notifications out, which will sometimes need data that is created by the transaction, but is streamed out asynchronously.
This commit is contained in:
parent
65689ba9bd
commit
115d08a754
6 changed files with 169 additions and 29 deletions
|
@ -334,30 +334,34 @@ def dismiss(%{id: user_id} = _user, id) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity) do
|
def create_notifications(activity, options \\ [])
|
||||||
|
|
||||||
|
def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity, options) do
|
||||||
object = Object.normalize(activity, false)
|
object = Object.normalize(activity, false)
|
||||||
|
|
||||||
if object && object.data["type"] == "Answer" do
|
if object && object.data["type"] == "Answer" do
|
||||||
{:ok, []}
|
{:ok, []}
|
||||||
else
|
else
|
||||||
do_create_notifications(activity)
|
do_create_notifications(activity, options)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def create_notifications(%Activity{data: %{"type" => type}} = activity)
|
def create_notifications(%Activity{data: %{"type" => type}} = activity, options)
|
||||||
when type in ["Follow", "Like", "Announce", "Move", "EmojiReact"] do
|
when type in ["Follow", "Like", "Announce", "Move", "EmojiReact"] do
|
||||||
do_create_notifications(activity)
|
do_create_notifications(activity, options)
|
||||||
end
|
end
|
||||||
|
|
||||||
def create_notifications(_), do: {:ok, []}
|
def create_notifications(_, _), do: {:ok, []}
|
||||||
|
|
||||||
|
defp do_create_notifications(%Activity{} = activity, options) do
|
||||||
|
do_send = Keyword.get(options, :do_send, true)
|
||||||
|
|
||||||
defp do_create_notifications(%Activity{} = activity) do
|
|
||||||
{enabled_receivers, disabled_receivers} = get_notified_from_activity(activity)
|
{enabled_receivers, disabled_receivers} = get_notified_from_activity(activity)
|
||||||
potential_receivers = enabled_receivers ++ disabled_receivers
|
potential_receivers = enabled_receivers ++ disabled_receivers
|
||||||
|
|
||||||
notifications =
|
notifications =
|
||||||
Enum.map(potential_receivers, fn user ->
|
Enum.map(potential_receivers, fn user ->
|
||||||
do_send = user in enabled_receivers
|
do_send = do_send && user in enabled_receivers
|
||||||
create_notification(activity, user, do_send)
|
create_notification(activity, user, do_send)
|
||||||
end)
|
end)
|
||||||
|
|
||||||
|
@ -623,4 +627,12 @@ def skip?(:recently_followed, %Activity{data: %{"type" => "Follow"}} = activity,
|
||||||
end
|
end
|
||||||
|
|
||||||
def skip?(_, _, _), do: false
|
def skip?(_, _, _), do: false
|
||||||
|
|
||||||
|
def for_user_and_activity(user, activity) do
|
||||||
|
from(n in __MODULE__,
|
||||||
|
where: n.user_id == ^user.id,
|
||||||
|
where: n.activity_id == ^activity.id
|
||||||
|
)
|
||||||
|
|> Repo.one()
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -17,6 +17,10 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do
|
||||||
{:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
|
{:ok, Activity.t() | Object.t(), keyword()} | {:error, any()}
|
||||||
def common_pipeline(object, meta) do
|
def common_pipeline(object, meta) do
|
||||||
case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do
|
case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do
|
||||||
|
{:ok, {:ok, activity, meta}} ->
|
||||||
|
SideEffects.handle_after_transaction(meta)
|
||||||
|
{:ok, activity, meta}
|
||||||
|
|
||||||
{:ok, value} ->
|
{:ok, value} ->
|
||||||
value
|
value
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do
|
||||||
alias Pleroma.Web.ActivityPub.Pipeline
|
alias Pleroma.Web.ActivityPub.Pipeline
|
||||||
alias Pleroma.Web.ActivityPub.Utils
|
alias Pleroma.Web.ActivityPub.Utils
|
||||||
alias Pleroma.Web.Streamer
|
alias Pleroma.Web.Streamer
|
||||||
|
alias Pleroma.Web.Push
|
||||||
|
|
||||||
def handle(object, meta \\ [])
|
def handle(object, meta \\ [])
|
||||||
|
|
||||||
|
@ -37,7 +38,12 @@ def handle(%{data: %{"type" => "Like"}} = object, meta) do
|
||||||
# - Set up notifications
|
# - Set up notifications
|
||||||
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
|
def handle(%{data: %{"type" => "Create"}} = activity, meta) do
|
||||||
with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
|
with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do
|
||||||
Notification.create_notifications(activity)
|
{:ok, notifications} = Notification.create_notifications(activity, do_send: false)
|
||||||
|
|
||||||
|
meta =
|
||||||
|
meta
|
||||||
|
|> add_notifications(notifications)
|
||||||
|
|
||||||
{:ok, activity, meta}
|
{:ok, activity, meta}
|
||||||
else
|
else
|
||||||
e -> Repo.rollback(e)
|
e -> Repo.rollback(e)
|
||||||
|
@ -200,4 +206,26 @@ def handle_undoing(
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
|
def handle_undoing(object), do: {:error, ["don't know how to handle", object]}
|
||||||
|
|
||||||
|
defp send_notifications(meta) do
|
||||||
|
Keyword.get(meta, :created_notifications, [])
|
||||||
|
|> Enum.each(fn notification ->
|
||||||
|
Streamer.stream(["user", "user:notification"], notification)
|
||||||
|
Push.send(notification)
|
||||||
|
end)
|
||||||
|
|
||||||
|
meta
|
||||||
|
end
|
||||||
|
|
||||||
|
defp add_notifications(meta, notifications) do
|
||||||
|
existing = Keyword.get(meta, :created_notifications, [])
|
||||||
|
|
||||||
|
meta
|
||||||
|
|> Keyword.put(:created_notifications, notifications ++ existing)
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_after_transaction(meta) do
|
||||||
|
meta
|
||||||
|
|> send_notifications()
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -33,7 +33,10 @@ test "it goes through validation, filtering, persisting, side effects and federa
|
||||||
{
|
{
|
||||||
Pleroma.Web.ActivityPub.SideEffects,
|
Pleroma.Web.ActivityPub.SideEffects,
|
||||||
[],
|
[],
|
||||||
[handle: fn o, m -> {:ok, o, m} end]
|
[
|
||||||
|
handle: fn o, m -> {:ok, o, m} end,
|
||||||
|
handle_after_transaction: fn m -> m end
|
||||||
|
]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Pleroma.Web.Federator,
|
Pleroma.Web.Federator,
|
||||||
|
@ -71,7 +74,7 @@ test "it goes through validation, filtering, persisting, side effects without fe
|
||||||
{
|
{
|
||||||
Pleroma.Web.ActivityPub.SideEffects,
|
Pleroma.Web.ActivityPub.SideEffects,
|
||||||
[],
|
[],
|
||||||
[handle: fn o, m -> {:ok, o, m} end]
|
[handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Pleroma.Web.Federator,
|
Pleroma.Web.Federator,
|
||||||
|
@ -110,7 +113,7 @@ test "it goes through validation, filtering, persisting, side effects without fe
|
||||||
{
|
{
|
||||||
Pleroma.Web.ActivityPub.SideEffects,
|
Pleroma.Web.ActivityPub.SideEffects,
|
||||||
[],
|
[],
|
||||||
[handle: fn o, m -> {:ok, o, m} end]
|
[handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end]
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Pleroma.Web.Federator,
|
Pleroma.Web.Federator,
|
||||||
|
|
|
@ -22,6 +22,47 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do
|
||||||
import Pleroma.Factory
|
import Pleroma.Factory
|
||||||
import Mock
|
import Mock
|
||||||
|
|
||||||
|
describe "handle_after_transaction" do
|
||||||
|
test "it streams out notifications" do
|
||||||
|
author = insert(:user, local: true)
|
||||||
|
recipient = insert(:user, local: true)
|
||||||
|
|
||||||
|
{:ok, chat_message_data, _meta} = Builder.chat_message(author, recipient.ap_id, "hey")
|
||||||
|
|
||||||
|
{:ok, create_activity_data, _meta} =
|
||||||
|
Builder.create(author, chat_message_data["id"], [recipient.ap_id])
|
||||||
|
|
||||||
|
{:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
|
||||||
|
|
||||||
|
{:ok, _create_activity, meta} =
|
||||||
|
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
||||||
|
|
||||||
|
assert [notification] = meta[:created_notifications]
|
||||||
|
|
||||||
|
with_mocks([
|
||||||
|
{
|
||||||
|
Pleroma.Web.Streamer,
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
stream: fn _, _ -> nil end
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Pleroma.Web.Push,
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
send: fn _ -> nil end
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]) do
|
||||||
|
SideEffects.handle_after_transaction(meta)
|
||||||
|
|
||||||
|
assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
|
||||||
|
assert called(Pleroma.Web.Push.send(notification))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe "delete objects" do
|
describe "delete objects" do
|
||||||
setup do
|
setup do
|
||||||
user = insert(:user)
|
user = insert(:user)
|
||||||
|
@ -361,22 +402,47 @@ test "it creates a Chat and ChatMessageReferences for the local users and bumps
|
||||||
|
|
||||||
{:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
|
{:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false)
|
||||||
|
|
||||||
{:ok, _create_activity, _meta} =
|
with_mocks([
|
||||||
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
{
|
||||||
|
Pleroma.Web.Streamer,
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
stream: fn _, _ -> nil end
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Pleroma.Web.Push,
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
send: fn _ -> nil end
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]) do
|
||||||
|
{:ok, _create_activity, meta} =
|
||||||
|
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
||||||
|
|
||||||
chat = Chat.get(author.id, recipient.ap_id)
|
# The notification gets created
|
||||||
|
assert [notification] = meta[:created_notifications]
|
||||||
|
assert notification.activity_id == create_activity.id
|
||||||
|
|
||||||
[cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
|
# But it is not sent out
|
||||||
|
refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
|
||||||
|
refute called(Pleroma.Web.Push.send(notification))
|
||||||
|
|
||||||
assert cm_ref.object.data["content"] == "hey"
|
chat = Chat.get(author.id, recipient.ap_id)
|
||||||
assert cm_ref.unread == false
|
|
||||||
|
|
||||||
chat = Chat.get(recipient.id, author.ap_id)
|
[cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
|
||||||
|
|
||||||
[cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
|
assert cm_ref.object.data["content"] == "hey"
|
||||||
|
assert cm_ref.unread == false
|
||||||
|
|
||||||
assert cm_ref.object.data["content"] == "hey"
|
chat = Chat.get(recipient.id, author.ap_id)
|
||||||
assert cm_ref.unread == true
|
|
||||||
|
[cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all()
|
||||||
|
|
||||||
|
assert cm_ref.object.data["content"] == "hey"
|
||||||
|
assert cm_ref.unread == true
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
test "it creates a Chat for the local users and bumps the unread count" do
|
test "it creates a Chat for the local users and bumps the unread count" do
|
||||||
|
|
|
@ -7,6 +7,7 @@ defmodule Pleroma.Web.CommonAPITest do
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
alias Pleroma.Chat
|
alias Pleroma.Chat
|
||||||
alias Pleroma.Conversation.Participation
|
alias Pleroma.Conversation.Participation
|
||||||
|
alias Pleroma.Notification
|
||||||
alias Pleroma.Object
|
alias Pleroma.Object
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||||
|
@ -39,15 +40,41 @@ test "it posts a chat message without content but with an attachment" do
|
||||||
|
|
||||||
{:ok, upload} = ActivityPub.upload(file, actor: author.ap_id)
|
{:ok, upload} = ActivityPub.upload(file, actor: author.ap_id)
|
||||||
|
|
||||||
{:ok, activity} =
|
with_mocks([
|
||||||
CommonAPI.post_chat_message(
|
{
|
||||||
author,
|
Pleroma.Web.Streamer,
|
||||||
recipient,
|
[],
|
||||||
nil,
|
[
|
||||||
media_id: upload.id
|
stream: fn _, _ ->
|
||||||
)
|
nil
|
||||||
|
end
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Pleroma.Web.Push,
|
||||||
|
[],
|
||||||
|
[
|
||||||
|
send: fn _ -> nil end
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]) do
|
||||||
|
{:ok, activity} =
|
||||||
|
CommonAPI.post_chat_message(
|
||||||
|
author,
|
||||||
|
recipient,
|
||||||
|
nil,
|
||||||
|
media_id: upload.id
|
||||||
|
)
|
||||||
|
|
||||||
assert activity
|
notification =
|
||||||
|
Notification.for_user_and_activity(recipient, activity)
|
||||||
|
|> Repo.preload(:activity)
|
||||||
|
|
||||||
|
assert called(Pleroma.Web.Push.send(notification))
|
||||||
|
assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification))
|
||||||
|
|
||||||
|
assert activity
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
test "it adds html newlines" do
|
test "it adds html newlines" do
|
||||||
|
|
Loading…
Reference in a new issue