diff --git a/lib/pleroma/notification.ex b/lib/pleroma/notification.ex index e5b880b10..49e27c05a 100644 --- a/lib/pleroma/notification.ex +++ b/lib/pleroma/notification.ex @@ -334,30 +334,34 @@ defmodule Pleroma.Notification do end end - def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity) do + def create_notifications(activity, options \\ []) + + def create_notifications(%Activity{data: %{"to" => _, "type" => "Create"}} = activity, options) do object = Object.normalize(activity, false) if object && object.data["type"] == "Answer" do {:ok, []} else - do_create_notifications(activity) + do_create_notifications(activity, options) end end - def create_notifications(%Activity{data: %{"type" => type}} = activity) + def create_notifications(%Activity{data: %{"type" => type}} = activity, options) when type in ["Follow", "Like", "Announce", "Move", "EmojiReact"] do - do_create_notifications(activity) + do_create_notifications(activity, options) end - def create_notifications(_), do: {:ok, []} + def create_notifications(_, _), do: {:ok, []} + + defp do_create_notifications(%Activity{} = activity, options) do + do_send = Keyword.get(options, :do_send, true) - defp do_create_notifications(%Activity{} = activity) do {enabled_receivers, disabled_receivers} = get_notified_from_activity(activity) potential_receivers = enabled_receivers ++ disabled_receivers notifications = Enum.map(potential_receivers, fn user -> - do_send = user in enabled_receivers + do_send = do_send && user in enabled_receivers create_notification(activity, user, do_send) end) @@ -623,4 +627,12 @@ defmodule Pleroma.Notification do end def skip?(_, _, _), do: false + + def for_user_and_activity(user, activity) do + from(n in __MODULE__, + where: n.user_id == ^user.id, + where: n.activity_id == ^activity.id + ) + |> Repo.one() + end end diff --git a/lib/pleroma/web/activity_pub/pipeline.ex b/lib/pleroma/web/activity_pub/pipeline.ex index 0c54c4b23..6875c47f6 100644 --- a/lib/pleroma/web/activity_pub/pipeline.ex +++ b/lib/pleroma/web/activity_pub/pipeline.ex @@ -17,6 +17,10 @@ defmodule Pleroma.Web.ActivityPub.Pipeline do {:ok, Activity.t() | Object.t(), keyword()} | {:error, any()} def common_pipeline(object, meta) do case Repo.transaction(fn -> do_common_pipeline(object, meta) end) do + {:ok, {:ok, activity, meta}} -> + SideEffects.handle_after_transaction(meta) + {:ok, activity, meta} + {:ok, value} -> value diff --git a/lib/pleroma/web/activity_pub/side_effects.ex b/lib/pleroma/web/activity_pub/side_effects.ex index b3aacff40..10136789a 100644 --- a/lib/pleroma/web/activity_pub/side_effects.ex +++ b/lib/pleroma/web/activity_pub/side_effects.ex @@ -16,6 +16,7 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do alias Pleroma.Web.ActivityPub.Pipeline alias Pleroma.Web.ActivityPub.Utils alias Pleroma.Web.Streamer + alias Pleroma.Web.Push def handle(object, meta \\ []) @@ -37,7 +38,12 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do # - Set up notifications def handle(%{data: %{"type" => "Create"}} = activity, meta) do with {:ok, _object, _meta} <- handle_object_creation(meta[:object_data], meta) do - Notification.create_notifications(activity) + {:ok, notifications} = Notification.create_notifications(activity, do_send: false) + + meta = + meta + |> add_notifications(notifications) + {:ok, activity, meta} else e -> Repo.rollback(e) @@ -200,4 +206,26 @@ defmodule Pleroma.Web.ActivityPub.SideEffects do end def handle_undoing(object), do: {:error, ["don't know how to handle", object]} + + defp send_notifications(meta) do + Keyword.get(meta, :created_notifications, []) + |> Enum.each(fn notification -> + Streamer.stream(["user", "user:notification"], notification) + Push.send(notification) + end) + + meta + end + + defp add_notifications(meta, notifications) do + existing = Keyword.get(meta, :created_notifications, []) + + meta + |> Keyword.put(:created_notifications, notifications ++ existing) + end + + def handle_after_transaction(meta) do + meta + |> send_notifications() + end end diff --git a/test/web/activity_pub/pipeline_test.exs b/test/web/activity_pub/pipeline_test.exs index 26557720b..8deb64501 100644 --- a/test/web/activity_pub/pipeline_test.exs +++ b/test/web/activity_pub/pipeline_test.exs @@ -33,7 +33,10 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do { Pleroma.Web.ActivityPub.SideEffects, [], - [handle: fn o, m -> {:ok, o, m} end] + [ + handle: fn o, m -> {:ok, o, m} end, + handle_after_transaction: fn m -> m end + ] }, { Pleroma.Web.Federator, @@ -71,7 +74,7 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do { Pleroma.Web.ActivityPub.SideEffects, [], - [handle: fn o, m -> {:ok, o, m} end] + [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end] }, { Pleroma.Web.Federator, @@ -110,7 +113,7 @@ defmodule Pleroma.Web.ActivityPub.PipelineTest do { Pleroma.Web.ActivityPub.SideEffects, [], - [handle: fn o, m -> {:ok, o, m} end] + [handle: fn o, m -> {:ok, o, m} end, handle_after_transaction: fn m -> m end] }, { Pleroma.Web.Federator, diff --git a/test/web/activity_pub/side_effects_test.exs b/test/web/activity_pub/side_effects_test.exs index 40df664eb..43ffe1337 100644 --- a/test/web/activity_pub/side_effects_test.exs +++ b/test/web/activity_pub/side_effects_test.exs @@ -22,6 +22,47 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do import Pleroma.Factory import Mock + describe "handle_after_transaction" do + test "it streams out notifications" do + author = insert(:user, local: true) + recipient = insert(:user, local: true) + + {:ok, chat_message_data, _meta} = Builder.chat_message(author, recipient.ap_id, "hey") + + {:ok, create_activity_data, _meta} = + Builder.create(author, chat_message_data["id"], [recipient.ap_id]) + + {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false) + + {:ok, _create_activity, meta} = + SideEffects.handle(create_activity, local: false, object_data: chat_message_data) + + assert [notification] = meta[:created_notifications] + + with_mocks([ + { + Pleroma.Web.Streamer, + [], + [ + stream: fn _, _ -> nil end + ] + }, + { + Pleroma.Web.Push, + [], + [ + send: fn _ -> nil end + ] + } + ]) do + SideEffects.handle_after_transaction(meta) + + assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification)) + assert called(Pleroma.Web.Push.send(notification)) + end + end + end + describe "delete objects" do setup do user = insert(:user) @@ -361,22 +402,47 @@ defmodule Pleroma.Web.ActivityPub.SideEffectsTest do {:ok, create_activity, _meta} = ActivityPub.persist(create_activity_data, local: false) - {:ok, _create_activity, _meta} = - SideEffects.handle(create_activity, local: false, object_data: chat_message_data) + with_mocks([ + { + Pleroma.Web.Streamer, + [], + [ + stream: fn _, _ -> nil end + ] + }, + { + Pleroma.Web.Push, + [], + [ + send: fn _ -> nil end + ] + } + ]) do + {:ok, _create_activity, meta} = + SideEffects.handle(create_activity, local: false, object_data: chat_message_data) - chat = Chat.get(author.id, recipient.ap_id) + # The notification gets created + assert [notification] = meta[:created_notifications] + assert notification.activity_id == create_activity.id - [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all() + # But it is not sent out + refute called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification)) + refute called(Pleroma.Web.Push.send(notification)) - assert cm_ref.object.data["content"] == "hey" - assert cm_ref.unread == false + chat = Chat.get(author.id, recipient.ap_id) - chat = Chat.get(recipient.id, author.ap_id) + [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all() - [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all() + assert cm_ref.object.data["content"] == "hey" + assert cm_ref.unread == false - assert cm_ref.object.data["content"] == "hey" - assert cm_ref.unread == true + chat = Chat.get(recipient.id, author.ap_id) + + [cm_ref] = ChatMessageReference.for_chat_query(chat) |> Repo.all() + + assert cm_ref.object.data["content"] == "hey" + assert cm_ref.unread == true + end end test "it creates a Chat for the local users and bumps the unread count" do diff --git a/test/web/common_api/common_api_test.exs b/test/web/common_api/common_api_test.exs index 611a9ae66..63b59820e 100644 --- a/test/web/common_api/common_api_test.exs +++ b/test/web/common_api/common_api_test.exs @@ -7,6 +7,7 @@ defmodule Pleroma.Web.CommonAPITest do alias Pleroma.Activity alias Pleroma.Chat alias Pleroma.Conversation.Participation + alias Pleroma.Notification alias Pleroma.Object alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub @@ -39,15 +40,41 @@ defmodule Pleroma.Web.CommonAPITest do {:ok, upload} = ActivityPub.upload(file, actor: author.ap_id) - {:ok, activity} = - CommonAPI.post_chat_message( - author, - recipient, - nil, - media_id: upload.id - ) + with_mocks([ + { + Pleroma.Web.Streamer, + [], + [ + stream: fn _, _ -> + nil + end + ] + }, + { + Pleroma.Web.Push, + [], + [ + send: fn _ -> nil end + ] + } + ]) do + {:ok, activity} = + CommonAPI.post_chat_message( + author, + recipient, + nil, + media_id: upload.id + ) - assert activity + notification = + Notification.for_user_and_activity(recipient, activity) + |> Repo.preload(:activity) + + assert called(Pleroma.Web.Push.send(notification)) + assert called(Pleroma.Web.Streamer.stream(["user", "user:notification"], notification)) + + assert activity + end end test "it adds html newlines" do