Streamer: Stream out Conversations/Participations.

This commit is contained in:
lain 2019-05-03 13:39:14 +02:00
parent 8af55728e4
commit 81d1aa424d
7 changed files with 98 additions and 7 deletions

View file

@ -63,10 +63,13 @@ def create_or_bump_for(activity) do
participation participation
end) end)
%{ {:ok,
conversation %{
| participations: participations conversation
} | participations: participations
}}
else
e -> {:error, e}
end end
end end
end end

View file

@ -142,8 +142,14 @@ def insert(map, local \\ true, fake \\ false) when is_map(map) do
end) end)
Notification.create_notifications(activity) Notification.create_notifications(activity)
Conversation.create_or_bump_for(activity)
participations =
activity
|> Conversation.create_or_bump_for()
|> get_participations()
stream_out(activity) stream_out(activity)
stream_out_participations(participations)
{:ok, activity} {:ok, activity}
else else
%Activity{} = activity -> %Activity{} = activity ->
@ -166,6 +172,19 @@ def insert(map, local \\ true, fake \\ false) when is_map(map) do
end end
end end
defp get_participations({:ok, %{participations: participations}}), do: participations
defp get_participations(_), do: []
def stream_out_participations(participations) do
participations =
participations
|> Repo.preload(:user)
Enum.each(participations, fn participation ->
Pleroma.Web.Streamer.stream("participation", participation)
end)
end
def stream_out(activity) do def stream_out(activity) do
public = "https://www.w3.org/ns/activitystreams#Public" public = "https://www.w3.org/ns/activitystreams#Public"
@ -197,6 +216,7 @@ def stream_out(activity) do
end end
end end
else else
# TODO: Write test, replace with visibility test
if !Enum.member?(activity.data["cc"] || [], public) && if !Enum.member?(activity.data["cc"] || [], public) &&
!Enum.member?( !Enum.member?(
activity.data["to"], activity.data["to"],

View file

@ -5,6 +5,7 @@
defmodule Pleroma.Web.Streamer do defmodule Pleroma.Web.Streamer do
use GenServer use GenServer
require Logger require Logger
alias Pleroma.Conversation.Participation
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Notification alias Pleroma.Notification
alias Pleroma.Object alias Pleroma.Object
@ -71,6 +72,15 @@ def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
{:noreply, topics} {:noreply, topics}
end end
def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
push_to_socket(topics, user_topic, participation)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
# filter the recipient list if the activity is not public, see #270. # filter the recipient list if the activity is not public, see #270.
recipient_lists = recipient_lists =
@ -192,6 +202,19 @@ defp represent_update(%Activity{} = activity) do
|> Jason.encode!() |> Jason.encode!()
end end
def represent_conversation(%Participation{} = participation) do
%{
event: "conversation",
payload:
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
participation: participation,
user: participation.user
})
|> Jason.encode!()
}
|> Jason.encode!()
end
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn socket -> Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc. # Get the current user so we have up-to-date blocks etc.
@ -214,6 +237,12 @@ def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = ite
end) end)
end end
def push_to_socket(topics, topic, %Participation{} = participation) do
Enum.each(topics[topic] || [], fn socket ->
send(socket.transport_pid, {:text, represent_conversation(participation)})
end)
end
def push_to_socket(topics, topic, %Activity{ def push_to_socket(topics, topic, %Activity{
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id} data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
}) do }) do

View file

@ -87,7 +87,7 @@ defp deps do
{:bbcode, "~> 0.1"}, {:bbcode, "~> 0.1"},
{:ex_machina, "~> 2.3", only: :test}, {:ex_machina, "~> 2.3", only: :test},
{:credo, "~> 0.9.3", only: [:dev, :test]}, {:credo, "~> 0.9.3", only: [:dev, :test]},
{:mock, "~> 0.3.1", only: :test}, {:mock, "~> 0.3.3", only: :test},
{:crypt, {:crypt,
git: "https://github.com/msantos/crypt", ref: "1f2b58927ab57e72910191a7ebaeff984382a1d3"}, git: "https://github.com/msantos/crypt", ref: "1f2b58927ab57e72910191a7ebaeff984382a1d3"},
{:cors_plug, "~> 1.5"}, {:cors_plug, "~> 1.5"},

View file

@ -43,7 +43,7 @@
"mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm"}, "mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"},
"mochiweb": {:hex, :mochiweb, "2.15.0", "e1daac474df07651e5d17cc1e642c4069c7850dc4508d3db7263a0651330aacc", [:rebar3], [], "hexpm"}, "mochiweb": {:hex, :mochiweb, "2.15.0", "e1daac474df07651e5d17cc1e642c4069c7850dc4508d3db7263a0651330aacc", [:rebar3], [], "hexpm"},
"mock": {:hex, :mock, "0.3.1", "994f00150f79a0ea50dc9d86134cd9ebd0d177ad60bd04d1e46336cdfdb98ff9", [:mix], [{:meck, "~> 0.8.8", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"}, "mock": {:hex, :mock, "0.3.3", "42a433794b1291a9cf1525c6d26b38e039e0d3a360732b5e467bfc77ef26c914", [:mix], [{:meck, "~> 0.8.13", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm"},
"mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"}, "mogrify": {:hex, :mogrify, "0.6.1", "de1b527514f2d95a7bbe9642eb556061afb337e220cf97adbf3a4e6438ed70af", [:mix], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"}, "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},

View file

@ -117,4 +117,21 @@ test "it creates or updates a conversation and participations for a given DM" do
tridi.id == user_id tridi.id == user_id
end) end)
end end
test "create_or_bump_for returns the conversation with participations" do
har = insert(:user)
jafnhar = insert(:user, local: false)
{:ok, activity} =
CommonAPI.post(har, %{"status" => "Hey @#{jafnhar.nickname}", "visibility" => "direct"})
{:ok, conversation} = Conversation.create_or_bump_for(activity)
assert length(conversation.participations) == 2
{:ok, activity} =
CommonAPI.post(har, %{"status" => "Hey @#{jafnhar.nickname}", "visibility" => "public"})
assert {:error, _} = Conversation.create_or_bump_for(activity)
end
end end

View file

@ -22,6 +22,28 @@ defmodule Pleroma.Web.ActivityPub.ActivityPubTest do
:ok :ok
end end
describe "streaming out participations" do
test "it streams them out" do
user = insert(:user)
{:ok, activity} = CommonAPI.post(user, %{"status" => ".", "visibility" => "direct"})
{:ok, conversation} = Pleroma.Conversation.create_or_bump_for(activity)
participations =
conversation.participations
|> Repo.preload(:user)
with_mock Pleroma.Web.Streamer,
stream: fn _, _ -> nil end do
ActivityPub.stream_out_participations(conversation.participations)
Enum.each(participations, fn participation ->
assert called(Pleroma.Web.Streamer.stream("participation", participation))
end)
end
end
end
describe "fetching restricted by visibility" do describe "fetching restricted by visibility" do
test "it restricts by the appropriate visibility" do test "it restricts by the appropriate visibility" do
user = insert(:user) user = insert(:user)