From c86a88edec75223f650faa2bb442c09aa95ad694 Mon Sep 17 00:00:00 2001 From: lain Date: Fri, 29 May 2020 15:24:41 +0200 Subject: [PATCH] Streamer: Add a chat message stream. --- lib/pleroma/web/streamer/streamer.ex | 23 ++++++++++++++++++++++- lib/pleroma/web/views/streamer_view.ex | 19 +++++++++++++++++++ test/web/streamer/streamer_test.exs | 24 ++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index 49a400df7..331490a78 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -10,6 +10,7 @@ defmodule Pleroma.Web.Streamer do alias Pleroma.Conversation.Participation alias Pleroma.Notification alias Pleroma.Object + alias Pleroma.Repo alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Visibility @@ -22,7 +23,7 @@ defmodule Pleroma.Web.Streamer do def registry, do: @registry @public_streams ["public", "public:local", "public:media", "public:local:media"] - @user_streams ["user", "user:notification", "direct"] + @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] @doc "Expands and authorizes a stream, and registers the process for streaming." @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) :: @@ -200,6 +201,26 @@ defp do_stream(topic, %Notification{} = item) end) end + defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object) + when topic in ["user", "user:pleroma_chat"] do + recipients = [object.data["actor"] | object.data["to"]] + + topics = + %{ap_id: recipients, local: true} + |> Pleroma.User.Query.build() + |> Repo.all() + |> Enum.map(fn %{id: id} = user -> {user, "#{topic}:#{id}"} end) + + Enum.each(topics, fn {user, topic} -> + Registry.dispatch(@registry, topic, fn list -> + Enum.each(list, fn {pid, _auth} -> + text = StreamerView.render("chat_update.json", object, user, recipients) + send(pid, {:text, text}) + end) + end) + end) + end + defp do_stream("user", item) do Logger.debug("Trying to push to users") diff --git a/lib/pleroma/web/views/streamer_view.ex b/lib/pleroma/web/views/streamer_view.ex index 237b29ded..949e2ed37 100644 --- a/lib/pleroma/web/views/streamer_view.ex +++ b/lib/pleroma/web/views/streamer_view.ex @@ -6,11 +6,30 @@ defmodule Pleroma.Web.StreamerView do use Pleroma.Web, :view alias Pleroma.Activity + alias Pleroma.Chat alias Pleroma.Conversation.Participation alias Pleroma.Notification alias Pleroma.User alias Pleroma.Web.MastodonAPI.NotificationView + def render("chat_update.json", object, user, recipients) do + chat = Chat.get(user.id, hd(recipients -- [user.ap_id])) + + representation = + Pleroma.Web.PleromaAPI.ChatMessageView.render( + "show.json", + %{object: object, chat: chat} + ) + + %{ + event: "pleroma:chat_update", + payload: + representation + |> Jason.encode!() + } + |> Jason.encode!() + end + def render("update.json", %Activity{} = activity, %User{} = user) do %{ event: "update", diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs index 115ba4703..ffbff35ca 100644 --- a/test/web/streamer/streamer_test.exs +++ b/test/web/streamer/streamer_test.exs @@ -9,9 +9,11 @@ defmodule Pleroma.Web.StreamerTest do alias Pleroma.Conversation.Participation alias Pleroma.List + alias Pleroma.Object alias Pleroma.User alias Pleroma.Web.CommonAPI alias Pleroma.Web.Streamer + alias Pleroma.Web.StreamerView @moduletag needs_streamer: true, capture_log: true @@ -126,6 +128,28 @@ test "it sends notify to in the 'user:notification' stream", %{user: user, notif refute Streamer.filtered_by_user?(user, notify) end + test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do + other_user = insert(:user) + + {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey") + object = Object.normalize(create_activity, false) + Streamer.get_topic_and_add_socket("user:pleroma_chat", user) + Streamer.stream("user:pleroma_chat", object) + text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id]) + assert_receive {:text, ^text} + end + + test "it sends chat messages to the 'user' stream", %{user: user} do + other_user = insert(:user) + + {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey") + object = Object.normalize(create_activity, false) + Streamer.get_topic_and_add_socket("user", user) + Streamer.stream("user", object) + text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id]) + assert_receive {:text, ^text} + end + test "it sends chat message notifications to the 'user:notification' stream", %{user: user} do other_user = insert(:user)