Streamer, SideEffects: Stream out ChatMessageReferences
Saves us a few calles to fetch things from the DB that we already have.
This commit is contained in:
parent
903955b189
commit
fb4ae9c720
5 changed files with 58 additions and 50 deletions
|
@ -140,11 +140,15 @@ def handle_object_creation(%{"type" => "ChatMessage"} = object, meta) do
|
||||||
|> Enum.each(fn [user, other_user] ->
|
|> Enum.each(fn [user, other_user] ->
|
||||||
if user.local do
|
if user.local do
|
||||||
{:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
|
{:ok, chat} = Chat.bump_or_create(user.id, other_user.ap_id)
|
||||||
ChatMessageReference.create(chat, object, user.ap_id == actor.ap_id)
|
{:ok, cm_ref} = ChatMessageReference.create(chat, object, user.ap_id == actor.ap_id)
|
||||||
|
|
||||||
|
Streamer.stream(
|
||||||
|
["user", "user:pleroma_chat"],
|
||||||
|
{user, %{cm_ref | chat: chat, object: object}}
|
||||||
|
)
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
|
|
||||||
Streamer.stream(["user", "user:pleroma_chat"], object)
|
|
||||||
{:ok, object, meta}
|
{:ok, object, meta}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -6,11 +6,11 @@ defmodule Pleroma.Web.Streamer do
|
||||||
require Logger
|
require Logger
|
||||||
|
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
|
alias Pleroma.ChatMessageReference
|
||||||
alias Pleroma.Config
|
alias Pleroma.Config
|
||||||
alias Pleroma.Conversation.Participation
|
alias Pleroma.Conversation.Participation
|
||||||
alias Pleroma.Notification
|
alias Pleroma.Notification
|
||||||
alias Pleroma.Object
|
alias Pleroma.Object
|
||||||
alias Pleroma.Repo
|
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||||
alias Pleroma.Web.ActivityPub.Visibility
|
alias Pleroma.Web.ActivityPub.Visibility
|
||||||
|
@ -201,24 +201,17 @@ defp do_stream(topic, %Notification{} = item)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp do_stream(topic, %{data: %{"type" => "ChatMessage"}} = object)
|
defp do_stream(topic, {user, %ChatMessageReference{} = cm_ref})
|
||||||
when topic in ["user", "user:pleroma_chat"] do
|
when topic in ["user", "user:pleroma_chat"] do
|
||||||
recipients = [object.data["actor"] | object.data["to"]]
|
topic = "#{topic}:#{user.id}"
|
||||||
|
|
||||||
topics =
|
text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
|
||||||
%{ap_id: recipients, local: true}
|
|
||||||
|> Pleroma.User.Query.build()
|
|
||||||
|> Repo.all()
|
|
||||||
|> Enum.map(fn %{id: id} = user -> {user, "#{topic}:#{id}"} end)
|
|
||||||
|
|
||||||
Enum.each(topics, fn {user, topic} ->
|
|
||||||
Registry.dispatch(@registry, topic, fn list ->
|
Registry.dispatch(@registry, topic, fn list ->
|
||||||
Enum.each(list, fn {pid, _auth} ->
|
Enum.each(list, fn {pid, _auth} ->
|
||||||
text = StreamerView.render("chat_update.json", object, user, recipients)
|
|
||||||
send(pid, {:text, text})
|
send(pid, {:text, text})
|
||||||
end)
|
end)
|
||||||
end)
|
end)
|
||||||
end)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
defp do_stream("user", item) do
|
defp do_stream("user", item) do
|
||||||
|
|
|
@ -6,36 +6,11 @@ defmodule Pleroma.Web.StreamerView do
|
||||||
use Pleroma.Web, :view
|
use Pleroma.Web, :view
|
||||||
|
|
||||||
alias Pleroma.Activity
|
alias Pleroma.Activity
|
||||||
alias Pleroma.Chat
|
|
||||||
alias Pleroma.ChatMessageReference
|
|
||||||
alias Pleroma.Conversation.Participation
|
alias Pleroma.Conversation.Participation
|
||||||
alias Pleroma.Notification
|
alias Pleroma.Notification
|
||||||
alias Pleroma.User
|
alias Pleroma.User
|
||||||
alias Pleroma.Web.MastodonAPI.NotificationView
|
alias Pleroma.Web.MastodonAPI.NotificationView
|
||||||
|
|
||||||
def render("chat_update.json", object, user, recipients) do
|
|
||||||
chat = Chat.get(user.id, hd(recipients -- [user.ap_id]))
|
|
||||||
|
|
||||||
# Explicitly giving the cmr for the object here, so we don't accidentally
|
|
||||||
# send a later 'last_message' that was inserted between inserting this and
|
|
||||||
# streaming it out
|
|
||||||
cm_ref = ChatMessageReference.for_chat_and_object(chat, object)
|
|
||||||
|
|
||||||
representation =
|
|
||||||
Pleroma.Web.PleromaAPI.ChatView.render(
|
|
||||||
"show.json",
|
|
||||||
%{last_message: cm_ref, chat: chat}
|
|
||||||
)
|
|
||||||
|
|
||||||
%{
|
|
||||||
event: "pleroma:chat_update",
|
|
||||||
payload:
|
|
||||||
representation
|
|
||||||
|> Jason.encode!()
|
|
||||||
}
|
|
||||||
|> Jason.encode!()
|
|
||||||
end
|
|
||||||
|
|
||||||
def render("update.json", %Activity{} = activity, %User{} = user) do
|
def render("update.json", %Activity{} = activity, %User{} = user) do
|
||||||
%{
|
%{
|
||||||
event: "update",
|
event: "update",
|
||||||
|
@ -76,6 +51,27 @@ def render("update.json", %Activity{} = activity) do
|
||||||
|> Jason.encode!()
|
|> Jason.encode!()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def render("chat_update.json", %{chat_message_reference: cm_ref}) do
|
||||||
|
# Explicitly giving the cmr for the object here, so we don't accidentally
|
||||||
|
# send a later 'last_message' that was inserted between inserting this and
|
||||||
|
# streaming it out
|
||||||
|
Logger.debug("Trying to stream out #{inspect(cm_ref)}")
|
||||||
|
|
||||||
|
representation =
|
||||||
|
Pleroma.Web.PleromaAPI.ChatView.render(
|
||||||
|
"show.json",
|
||||||
|
%{last_message: cm_ref, chat: cm_ref.chat}
|
||||||
|
)
|
||||||
|
|
||||||
|
%{
|
||||||
|
event: "pleroma:chat_update",
|
||||||
|
payload:
|
||||||
|
representation
|
||||||
|
|> Jason.encode!()
|
||||||
|
}
|
||||||
|
|> Jason.encode!()
|
||||||
|
end
|
||||||
|
|
||||||
def render("conversation.json", %Participation{} = participation) do
|
def render("conversation.json", %Participation{} = participation) do
|
||||||
%{
|
%{
|
||||||
event: "conversation",
|
event: "conversation",
|
||||||
|
|
|
@ -325,9 +325,8 @@ test "it streams the created ChatMessage" do
|
||||||
{:ok, _create_activity, _meta} =
|
{:ok, _create_activity, _meta} =
|
||||||
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
SideEffects.handle(create_activity, local: false, object_data: chat_message_data)
|
||||||
|
|
||||||
object = Object.normalize(create_activity, false)
|
assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {author, :_}))
|
||||||
|
assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], {recipient, :_}))
|
||||||
assert called(Pleroma.Web.Streamer.stream(["user", "user:pleroma_chat"], object))
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,8 @@ defmodule Pleroma.Web.StreamerTest do
|
||||||
|
|
||||||
import Pleroma.Factory
|
import Pleroma.Factory
|
||||||
|
|
||||||
|
alias Pleroma.Chat
|
||||||
|
alias Pleroma.ChatMessageReference
|
||||||
alias Pleroma.Conversation.Participation
|
alias Pleroma.Conversation.Participation
|
||||||
alias Pleroma.List
|
alias Pleroma.List
|
||||||
alias Pleroma.Object
|
alias Pleroma.Object
|
||||||
|
@ -150,22 +152,36 @@ test "it sends notify to in the 'user:notification' stream", %{user: user, notif
|
||||||
test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do
|
test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do
|
||||||
other_user = insert(:user)
|
other_user = insert(:user)
|
||||||
|
|
||||||
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
|
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
|
||||||
object = Object.normalize(create_activity, false)
|
object = Object.normalize(create_activity, false)
|
||||||
|
chat = Chat.get(user.id, other_user.ap_id)
|
||||||
|
cm_ref = ChatMessageReference.for_chat_and_object(chat, object)
|
||||||
|
cm_ref = %{cm_ref | chat: chat, object: object}
|
||||||
|
|
||||||
Streamer.get_topic_and_add_socket("user:pleroma_chat", user)
|
Streamer.get_topic_and_add_socket("user:pleroma_chat", user)
|
||||||
Streamer.stream("user:pleroma_chat", object)
|
Streamer.stream("user:pleroma_chat", {user, cm_ref})
|
||||||
text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
|
|
||||||
|
text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
|
||||||
|
|
||||||
|
assert text =~ "hey cirno"
|
||||||
assert_receive {:text, ^text}
|
assert_receive {:text, ^text}
|
||||||
end
|
end
|
||||||
|
|
||||||
test "it sends chat messages to the 'user' stream", %{user: user} do
|
test "it sends chat messages to the 'user' stream", %{user: user} do
|
||||||
other_user = insert(:user)
|
other_user = insert(:user)
|
||||||
|
|
||||||
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
|
{:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
|
||||||
object = Object.normalize(create_activity, false)
|
object = Object.normalize(create_activity, false)
|
||||||
|
chat = Chat.get(user.id, other_user.ap_id)
|
||||||
|
cm_ref = ChatMessageReference.for_chat_and_object(chat, object)
|
||||||
|
cm_ref = %{cm_ref | chat: chat, object: object}
|
||||||
|
|
||||||
Streamer.get_topic_and_add_socket("user", user)
|
Streamer.get_topic_and_add_socket("user", user)
|
||||||
Streamer.stream("user", object)
|
Streamer.stream("user", {user, cm_ref})
|
||||||
text = StreamerView.render("chat_update.json", object, user, [user.ap_id, other_user.ap_id])
|
|
||||||
|
text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
|
||||||
|
|
||||||
|
assert text =~ "hey cirno"
|
||||||
assert_receive {:text, ^text}
|
assert_receive {:text, ^text}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue