Streamer: Add a chat message stream.

This commit is contained in:
lain 2020-05-29 15:24:41 +02:00
parent af6d01ec93
commit c86a88edec
3 changed files with 65 additions and 1 deletions

View file

@ -10,6 +10,7 @@ defmodule Pleroma.Web.Streamer do
alias Pleroma.Conversation.Participation alias Pleroma.Conversation.Participation
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.Object alias Pleroma.Object
alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.ActivityPub.Visibility
@ -22,7 +23,7 @@ defmodule Pleroma.Web.Streamer do
def registry, do: @registry def registry, do: @registry
@public_streams ["public", "public:local", "public:media", "public:local:media"] @public_streams ["public", "public:local", "public:media", "public:local:media"]
@user_streams ["user", "user:notification", "direct"] @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
@doc "Expands and authorizes a stream, and registers the process for streaming." @doc "Expands and authorizes a stream, and registers the process for streaming."
@spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) :: @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) ::
@ -200,6 +201,26 @@ defp do_stream(topic, %Notification{} = item)
end) end)
end end
defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object)
when topic in ["user", "user:pleroma_chat"] do
recipients = [object.data["actor"] | object.data["to"]]
topics =
%{ap_id: recipients, local: true}
|> Pleroma.User.Query.build()
|> Repo.all()
|> Enum.map(fn %{id: id} = user -> {user, "#{topic}:#{id}"} end)
Enum.each(topics, fn {user, topic} ->
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _auth} ->
text = StreamerView.render("chat_update.json", object, user, recipients)
send(pid, {:text, text})
end)
end)
end)
end
defp do_stream("user", item) do defp do_stream("user", item) do
Logger.debug("Trying to push to users") Logger.debug("Trying to push to users")

View file

@ -6,11 +6,30 @@ defmodule Pleroma.Web.StreamerView do
use Pleroma.Web, :view use Pleroma.Web, :view
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Chat
alias Pleroma.Conversation.Participation alias Pleroma.Conversation.Participation
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.MastodonAPI.NotificationView alias Pleroma.Web.MastodonAPI.NotificationView
def render("chat_update.json", object, user, recipients) do
chat = Chat.get(user.id, hd(recipients -- [user.ap_id]))
representation =
Pleroma.Web.PleromaAPI.ChatMessageView.render(
"show.json",
%{object: object, chat: chat}
)
%{
event: "pleroma:chat_update",
payload:
representation
|> Jason.encode!()
}
|> Jason.encode!()
end
def render("update.json", %Activity{} = activity, %User{} = user) do def render("update.json", %Activity{} = activity, %User{} = user) do
%{ %{
event: "update", event: "update",

View file

@ -9,9 +9,11 @@ defmodule Pleroma.Web.StreamerTest do
alias Pleroma.Conversation.Participation alias Pleroma.Conversation.Participation
alias Pleroma.List alias Pleroma.List
alias Pleroma.Object
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer alias Pleroma.Web.Streamer
alias Pleroma.Web.StreamerView
@moduletag needs_streamer: true, capture_log: true @moduletag needs_streamer: true, capture_log: true
@ -126,6 +128,28 @@ test "it sends notify to in the 'user:notification' stream", %{user: user, notif
refute Streamer.filtered_by_user?(user, notify) refute Streamer.filtered_by_user?(user, notify)
end end
test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do
other_user = insert(:user)
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
object = Object.normalize(create_activity, false)
Streamer.get_topic_and_add_socket("user:pleroma_chat", user)
Streamer.stream("user:pleroma_chat", object)
text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
assert_receive {:text, ^text}
end
test "it sends chat messages to the 'user' stream", %{user: user} do
other_user = insert(:user)
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
object = Object.normalize(create_activity, false)
Streamer.get_topic_and_add_socket("user", user)
Streamer.stream("user", object)
text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
assert_receive {:text, ^text}
end
test "it sends chat message notifications to the 'user:notification' stream", %{user: user} do test "it sends chat message notifications to the 'user:notification' stream", %{user: user} do
other_user = insert(:user) other_user = insert(:user)