diff --git a/lib/pleroma/application.ex b/lib/pleroma/application.ex
index 308d8cffa..a00bc0624 100644
--- a/lib/pleroma/application.ex
+++ b/lib/pleroma/application.ex
@@ -173,7 +173,14 @@ defp chat_enabled?, do: Config.get([:chat, :enabled])
defp streamer_child(env) when env in [:test, :benchmark], do: []
defp streamer_child(_) do
- [Pleroma.Web.Streamer.supervisor()]
+ [
+ {Registry,
+ [
+ name: Pleroma.Web.Streamer.registry(),
+ keys: :duplicate,
+ partitions: System.schedulers_online()
+ ]}
+ ]
end
defp chat_child(_env, true) do
diff --git a/lib/pleroma/web/activity_pub/activity_pub.ex b/lib/pleroma/web/activity_pub/activity_pub.ex
index 099df5879..8baaf97ac 100644
--- a/lib/pleroma/web/activity_pub/activity_pub.ex
+++ b/lib/pleroma/web/activity_pub/activity_pub.ex
@@ -170,12 +170,6 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
- Notification.create_notifications(activity)
-
- conversation = create_or_bump_conversation(activity, map["actor"])
- participations = get_participations(conversation)
- stream_out(activity)
- stream_out_participations(participations)
{:ok, activity}
else
%Activity{} = activity ->
@@ -198,6 +192,15 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
end
end
+ def notify_and_stream(activity) do
+ Notification.create_notifications(activity)
+
+ conversation = create_or_bump_conversation(activity, activity.actor)
+ participations = get_participations(conversation)
+ stream_out(activity)
+ stream_out_participations(participations)
+ end
+
defp create_or_bump_conversation(activity, actor) do
with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
%User{} = user <- User.get_cached_by_ap_id(actor),
@@ -274,6 +277,7 @@ defp do_create(%{to: to, actor: actor, context: context, object: object} = param
_ <- increase_poll_votes_if_vote(create_data),
{:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
{:ok, _actor} <- increase_note_count_if_public(actor, activity),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
@@ -301,6 +305,7 @@ def listen(%{to: to, actor: actor, context: context, object: object} = params) d
additional
),
{:ok, activity} <- insert(listen_data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
@@ -325,6 +330,7 @@ def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
%{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
|> Utils.maybe_put("id", activity_id),
{:ok, activity} <- insert(data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
@@ -344,6 +350,7 @@ def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
},
data <- Utils.maybe_put(data, "id", activity_id),
{:ok, activity} <- insert(data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
@@ -365,6 +372,7 @@ defp do_react_with_emoji(user, object, emoji, options) do
reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
{:ok, activity} <- insert(reaction_data, local),
{:ok, object} <- add_emoji_reaction_to_object(activity, object),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity, object}
else
@@ -391,6 +399,7 @@ defp do_unreact_with_emoji(user, reaction_id, options) do
unreact_data <- make_undo_data(user, reaction_activity, activity_id),
{:ok, activity} <- insert(unreact_data, local),
{:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity, object}
else
@@ -413,6 +422,7 @@ defp do_unlike(actor, object, activity_id, local) do
{:ok, unlike_activity} <- insert(unlike_data, local),
{:ok, _activity} <- Repo.delete(like_activity),
{:ok, object} <- remove_like_from_object(like_activity, object),
+ _ <- notify_and_stream(unlike_activity),
:ok <- maybe_federate(unlike_activity) do
{:ok, unlike_activity, like_activity, object}
else
@@ -442,6 +452,7 @@ defp do_announce(user, object, activity_id, local, public) do
announce_data <- make_announce_data(user, object, activity_id, public),
{:ok, activity} <- insert(announce_data, local),
{:ok, object} <- add_announce_to_object(activity, object),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity, object}
else
@@ -468,6 +479,7 @@ defp do_unannounce(actor, object, activity_id, local) do
with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
{:ok, unannounce_activity} <- insert(unannounce_data, local),
+ _ <- notify_and_stream(unannounce_activity),
:ok <- maybe_federate(unannounce_activity),
{:ok, _activity} <- Repo.delete(announce_activity),
{:ok, object} <- remove_announce_from_object(announce_activity, object) do
@@ -490,6 +502,7 @@ def follow(follower, followed, activity_id \\ nil, local \\ true) do
defp do_follow(follower, followed, activity_id, local) do
with data <- make_follow_data(follower, followed, activity_id),
{:ok, activity} <- insert(data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
@@ -511,6 +524,7 @@ defp do_unfollow(follower, followed, activity_id, local) do
{:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
{:ok, activity} <- insert(unfollow_data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
@@ -540,6 +554,7 @@ defp do_block(blocker, blocked, activity_id, local) do
with true <- outgoing_blocks,
block_data <- make_block_data(blocker, blocked, activity_id),
{:ok, activity} <- insert(block_data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
@@ -560,6 +575,7 @@ defp do_unblock(blocker, blocked, activity_id, local) do
with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
{:ok, activity} <- insert(unblock_data, local),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do
{:ok, activity}
else
@@ -594,6 +610,7 @@ def flag(
with flag_data <- make_flag_data(params, additional),
{:ok, activity} <- insert(flag_data, local),
{:ok, stripped_activity} <- strip_report_status_data(activity),
+ _ <- notify_and_stream(activity),
:ok <- maybe_federate(stripped_activity) do
User.all_superusers()
|> Enum.filter(fn user -> not is_nil(user.email) end)
@@ -617,7 +634,8 @@ def move(%User{} = origin, %User{} = target, local \\ true) do
}
with true <- origin.ap_id in target.also_known_as,
- {:ok, activity} <- insert(params, local) do
+ {:ok, activity} <- insert(params, local),
+ _ <- notify_and_stream(activity) do
maybe_federate(activity)
BackgroundWorker.enqueue("move_following", %{
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index 5652a37c1..6ef3fe2dd 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -12,6 +12,11 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
@behaviour :cowboy_websocket
+ # Cowboy timeout period.
+ @timeout :timer.seconds(30)
+ # Hibernate every X messages
+ @hibernate_every 100
+
@streams [
"public",
"public:local",
@@ -25,9 +30,6 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
]
@anonymous_streams ["public", "public:local", "hashtag"]
- # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
- @timeout :infinity
-
def init(%{qs: qs} = req, state) do
with params <- :cow_qs.parse_qs(qs),
sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
@@ -42,7 +44,7 @@ def init(%{qs: qs} = req, state) do
req
end
- {:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}}
+ {:cowboy_websocket, req, %{user: user, topic: topic, count: 0}, %{idle_timeout: @timeout}}
else
{:error, code} ->
Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}")
@@ -57,7 +59,13 @@ def init(%{qs: qs} = req, state) do
end
def websocket_init(state) do
- send(self(), :subscribe)
+ Logger.debug(
+ "#{__MODULE__} accepted websocket connection for user #{
+ (state.user || %{id: "anonymous"}).id
+ }, topic #{state.topic}"
+ )
+
+ Streamer.add_socket(state.topic, state.user)
{:ok, state}
end
@@ -66,19 +74,24 @@ def websocket_handle(_frame, state) do
{:ok, state}
end
- def websocket_info(:subscribe, state) do
- Logger.debug(
- "#{__MODULE__} accepted websocket connection for user #{
- (state.user || %{id: "anonymous"}).id
- }, topic #{state.topic}"
- )
+ def websocket_info({:render_with_user, view, template, item}, state) do
+ user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
- Streamer.add_socket(state.topic, streamer_socket(state))
- {:ok, state}
+ unless Streamer.filtered_by_user?(user, item) do
+ websocket_info({:text, view.render(template, user, item)}, %{state | user: user})
+ else
+ {:ok, state}
+ end
end
def websocket_info({:text, message}, state) do
- {:reply, {:text, message}, state}
+ # If the websocket processed X messages, force an hibernate/GC.
+ # We don't hibernate at every message to balance CPU usage/latency with RAM usage.
+ if state.count > @hibernate_every do
+ {:reply, {:text, message}, %{state | count: 0}, :hibernate}
+ else
+ {:reply, {:text, message}, %{state | count: state.count + 1}}
+ end
end
def terminate(reason, _req, state) do
@@ -88,7 +101,7 @@ def terminate(reason, _req, state) do
}, topic #{state.topic || "?"}: #{inspect(reason)}"
)
- Streamer.remove_socket(state.topic, streamer_socket(state))
+ Streamer.remove_socket(state.topic)
:ok
end
@@ -136,8 +149,4 @@ defp expand_topic("list", params) do
end
defp expand_topic(topic, _), do: topic
-
- defp streamer_socket(state) do
- %{transport_pid: self(), assigns: state}
- end
end
diff --git a/lib/pleroma/web/streamer/ping.ex b/lib/pleroma/web/streamer/ping.ex
deleted file mode 100644
index 7a08202a9..000000000
--- a/lib/pleroma/web/streamer/ping.ex
+++ /dev/null
@@ -1,37 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.Ping do
- use GenServer
- require Logger
-
- alias Pleroma.Web.Streamer.State
- alias Pleroma.Web.Streamer.StreamerSocket
-
- @keepalive_interval :timer.seconds(30)
-
- def start_link(opts) do
- ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
- GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
- end
-
- def init(%{ping_interval: ping_interval} = args) do
- Process.send_after(self(), :ping, ping_interval)
- {:ok, args}
- end
-
- def handle_info(:ping, %{ping_interval: ping_interval} = state) do
- State.get_sockets()
- |> Map.values()
- |> List.flatten()
- |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
- Logger.debug("Sending keepalive ping")
- send(transport_pid, {:text, ""})
- end)
-
- Process.send_after(self(), :ping, ping_interval)
-
- {:noreply, state}
- end
-end
diff --git a/lib/pleroma/web/streamer/state.ex b/lib/pleroma/web/streamer/state.ex
deleted file mode 100644
index 999550b88..000000000
--- a/lib/pleroma/web/streamer/state.ex
+++ /dev/null
@@ -1,82 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.State do
- use GenServer
- require Logger
-
- alias Pleroma.Web.Streamer.StreamerSocket
-
- @env Mix.env()
-
- def start_link(_) do
- GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
- end
-
- def add_socket(topic, socket) do
- GenServer.call(__MODULE__, {:add, topic, socket})
- end
-
- def remove_socket(topic, socket) do
- do_remove_socket(@env, topic, socket)
- end
-
- def get_sockets do
- %{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
- stream_sockets
- end
-
- def init(init_arg) do
- {:ok, init_arg}
- end
-
- def handle_call(:get_state, _from, state) do
- {:reply, state, state}
- end
-
- def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
- internal_topic = internal_topic(topic, socket)
- stream_socket = StreamerSocket.from_socket(socket)
-
- sockets_for_topic =
- sockets
- |> Map.get(internal_topic, [])
- |> List.insert_at(0, stream_socket)
- |> Enum.uniq()
-
- state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
- Logger.debug("Got new conn for #{topic}")
- {:reply, state, state}
- end
-
- def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
- internal_topic = internal_topic(topic, socket)
- stream_socket = StreamerSocket.from_socket(socket)
-
- sockets_for_topic =
- sockets
- |> Map.get(internal_topic, [])
- |> List.delete(stream_socket)
-
- state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
- {:reply, state, state}
- end
-
- defp do_remove_socket(:test, _, _) do
- :ok
- end
-
- defp do_remove_socket(_env, topic, socket) do
- GenServer.call(__MODULE__, {:remove, topic, socket})
- end
-
- defp internal_topic(topic, socket)
- when topic in ~w[user user:notification direct] do
- "#{topic}:#{socket.assigns[:user].id}"
- end
-
- defp internal_topic(topic, _) do
- topic
- end
-end
diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex
index 814d5a729..5ad4aa936 100644
--- a/lib/pleroma/web/streamer/streamer.ex
+++ b/lib/pleroma/web/streamer/streamer.ex
@@ -3,53 +3,241 @@
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer do
- alias Pleroma.Web.Streamer.State
- alias Pleroma.Web.Streamer.Worker
+ require Logger
+
+ alias Pleroma.Activity
+ alias Pleroma.Config
+ alias Pleroma.Conversation.Participation
+ alias Pleroma.Notification
+ alias Pleroma.Object
+ alias Pleroma.User
+ alias Pleroma.Web.ActivityPub.ActivityPub
+ alias Pleroma.Web.ActivityPub.Visibility
+ alias Pleroma.Web.CommonAPI
+ alias Pleroma.Web.StreamerView
- @timeout 60_000
@mix_env Mix.env()
+ @registry Pleroma.Web.StreamerRegistry
- def add_socket(topic, socket) do
- State.add_socket(topic, socket)
+ def registry, do: @registry
+
+ def add_socket(topic, %User{} = user) do
+ if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true)
end
- def remove_socket(topic, socket) do
- State.remove_socket(topic, socket)
+ def add_socket(topic, _) do
+ if should_env_send?(), do: Registry.register(@registry, topic, false)
end
- def get_sockets do
- State.get_sockets()
+ def remove_socket(topic) do
+ if should_env_send?(), do: Registry.unregister(@registry, topic)
end
- def stream(topics, items) do
- if should_send?() do
- Task.async(fn ->
- :poolboy.transaction(
- :streamer_worker,
- &Worker.stream(&1, topics, items),
- @timeout
- )
+ def stream(topics, item) when is_list(topics) do
+ if should_env_send?() do
+ Enum.each(topics, fn t ->
+ spawn(fn -> do_stream(t, item) end)
end)
end
+
+ :ok
end
- def supervisor, do: Pleroma.Web.Streamer.Supervisor
+ def stream(topic, items) when is_list(items) do
+ if should_env_send?() do
+ Enum.each(items, fn i ->
+ spawn(fn -> do_stream(topic, i) end)
+ end)
- defp should_send? do
- handle_should_send(@mix_env)
- end
-
- defp handle_should_send(:test) do
- case Process.whereis(:streamer_worker) do
- nil ->
- false
-
- pid ->
- Process.alive?(pid)
+ :ok
end
end
- defp handle_should_send(:benchmark), do: false
+ def stream(topic, item) do
+ if should_env_send?() do
+ spawn(fn -> do_stream(topic, item) end)
+ end
- defp handle_should_send(_), do: true
+ :ok
+ end
+
+ def filtered_by_user?(%User{} = user, %Activity{} = item) do
+ %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
+ User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
+
+ recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
+ recipients = MapSet.new(item.recipients)
+ domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
+
+ with parent <- Object.normalize(item) || item,
+ true <-
+ Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
+ true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
+ true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
+ true <- MapSet.disjoint?(recipients, recipient_blocks),
+ %{host: item_host} <- URI.parse(item.actor),
+ %{host: parent_host} <- URI.parse(parent.data["actor"]),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
+ false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
+ true <- thread_containment(item, user),
+ false <- CommonAPI.thread_muted?(user, item) do
+ false
+ else
+ _ -> true
+ end
+ end
+
+ def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
+ filtered_by_user?(user, activity)
+ end
+
+ defp do_stream("direct", item) do
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
+
+ Enum.each(recipient_topics, fn user_topic ->
+ Logger.debug("Trying to push direct message to #{user_topic}\n\n")
+ push_to_socket(user_topic, item)
+ end)
+ end
+
+ defp do_stream("participation", participation) do
+ user_topic = "direct:#{participation.user_id}"
+ Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
+
+ push_to_socket(user_topic, participation)
+ end
+
+ defp do_stream("list", item) do
+ # filter the recipient list if the activity is not public, see #270.
+ recipient_lists =
+ case Visibility.is_public?(item) do
+ true ->
+ Pleroma.List.get_lists_from_activity(item)
+
+ _ ->
+ Pleroma.List.get_lists_from_activity(item)
+ |> Enum.filter(fn list ->
+ owner = User.get_cached_by_id(list.user_id)
+
+ Visibility.visible_for_user?(item, owner)
+ end)
+ end
+
+ recipient_topics =
+ recipient_lists
+ |> Enum.map(fn %{id: id} -> "list:#{id}" end)
+
+ Enum.each(recipient_topics, fn list_topic ->
+ Logger.debug("Trying to push message to #{list_topic}\n\n")
+ push_to_socket(list_topic, item)
+ end)
+ end
+
+ defp do_stream(topic, %Notification{} = item)
+ when topic in ["user", "user:notification"] do
+ Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
+ Enum.each(list, fn {pid, _auth} ->
+ send(pid, {:render_with_user, StreamerView, "notification.json", item})
+ end)
+ end)
+ end
+
+ defp do_stream("user", item) do
+ Logger.debug("Trying to push to users")
+
+ recipient_topics =
+ User.get_recipients_from_activity(item)
+ |> Enum.map(fn %{id: id} -> "user:#{id}" end)
+
+ Enum.each(recipient_topics, fn topic ->
+ push_to_socket(topic, item)
+ end)
+ end
+
+ defp do_stream(topic, item) do
+ Logger.debug("Trying to push to #{topic}")
+ Logger.debug("Pushing item to #{topic}")
+ push_to_socket(topic, item)
+ end
+
+ defp push_to_socket(topic, %Participation{} = participation) do
+ rendered = StreamerView.render("conversation.json", participation)
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _} ->
+ send(pid, {:text, rendered})
+ end)
+ end)
+ end
+
+ defp push_to_socket(topic, %Activity{
+ data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
+ }) do
+ rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, _} ->
+ send(pid, {:text, rendered})
+ end)
+ end)
+ end
+
+ defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
+
+ defp push_to_socket(topic, item) do
+ anon_render = StreamerView.render("update.json", item)
+
+ Registry.dispatch(@registry, topic, fn list ->
+ Enum.each(list, fn {pid, auth?} ->
+ if auth? do
+ send(pid, {:render_with_user, StreamerView, "update.json", item})
+ else
+ send(pid, {:text, anon_render})
+ end
+ end)
+ end)
+ end
+
+ defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
+
+ defp thread_containment(activity, user) do
+ if Config.get([:instance, :skip_thread_containment]) do
+ true
+ else
+ ActivityPub.contain_activity(activity, user)
+ end
+ end
+
+ # In test environement, only return true if the registry is started.
+ # In benchmark environment, returns false.
+ # In any other environment, always returns true.
+ cond do
+ @mix_env == :test ->
+ def should_env_send? do
+ case Process.whereis(@registry) do
+ nil ->
+ false
+
+ pid ->
+ Process.alive?(pid)
+ end
+ end
+
+ @mix_env == :benchmark ->
+ def should_env_send?, do: false
+
+ true ->
+ def should_env_send?, do: true
+ end
+
+ defp user_topic(topic, user)
+ when topic in ~w[user user:notification direct] do
+ "#{topic}:#{user.id}"
+ end
+
+ defp user_topic(topic, _) do
+ topic
+ end
end
diff --git a/lib/pleroma/web/streamer/streamer_socket.ex b/lib/pleroma/web/streamer/streamer_socket.ex
deleted file mode 100644
index 7d5dcd34e..000000000
--- a/lib/pleroma/web/streamer/streamer_socket.ex
+++ /dev/null
@@ -1,35 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.StreamerSocket do
- defstruct transport_pid: nil, user: nil
-
- alias Pleroma.User
- alias Pleroma.Web.Streamer.StreamerSocket
-
- def from_socket(%{
- transport_pid: transport_pid,
- assigns: %{user: nil}
- }) do
- %StreamerSocket{
- transport_pid: transport_pid
- }
- end
-
- def from_socket(%{
- transport_pid: transport_pid,
- assigns: %{user: %User{} = user}
- }) do
- %StreamerSocket{
- transport_pid: transport_pid,
- user: user
- }
- end
-
- def from_socket(%{transport_pid: transport_pid}) do
- %StreamerSocket{
- transport_pid: transport_pid
- }
- end
-end
diff --git a/lib/pleroma/web/streamer/supervisor.ex b/lib/pleroma/web/streamer/supervisor.ex
deleted file mode 100644
index bd9029bc0..000000000
--- a/lib/pleroma/web/streamer/supervisor.ex
+++ /dev/null
@@ -1,37 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.Supervisor do
- use Supervisor
-
- def start_link(opts) do
- Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
- end
-
- def init(args) do
- children = [
- {Pleroma.Web.Streamer.State, args},
- {Pleroma.Web.Streamer.Ping, args},
- :poolboy.child_spec(:streamer_worker, poolboy_config())
- ]
-
- opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
- Supervisor.init(children, opts)
- end
-
- defp poolboy_config do
- opts =
- Pleroma.Config.get(:streamer,
- workers: 3,
- overflow_workers: 2
- )
-
- [
- {:name, {:local, :streamer_worker}},
- {:worker_module, Pleroma.Web.Streamer.Worker},
- {:size, opts[:workers]},
- {:max_overflow, opts[:overflow_workers]}
- ]
- end
-end
diff --git a/lib/pleroma/web/streamer/worker.ex b/lib/pleroma/web/streamer/worker.ex
deleted file mode 100644
index f6160fa4d..000000000
--- a/lib/pleroma/web/streamer/worker.ex
+++ /dev/null
@@ -1,208 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.Streamer.Worker do
- use GenServer
-
- require Logger
-
- alias Pleroma.Activity
- alias Pleroma.Config
- alias Pleroma.Conversation.Participation
- alias Pleroma.Notification
- alias Pleroma.Object
- alias Pleroma.User
- alias Pleroma.Web.ActivityPub.ActivityPub
- alias Pleroma.Web.ActivityPub.Visibility
- alias Pleroma.Web.CommonAPI
- alias Pleroma.Web.Streamer.State
- alias Pleroma.Web.Streamer.StreamerSocket
- alias Pleroma.Web.StreamerView
-
- def start_link(_) do
- GenServer.start_link(__MODULE__, %{}, [])
- end
-
- def init(init_arg) do
- {:ok, init_arg}
- end
-
- def stream(pid, topics, items) do
- GenServer.call(pid, {:stream, topics, items})
- end
-
- def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
- Enum.each(topics, fn t ->
- do_stream(%{topic: t, item: item})
- end)
-
- {:reply, state, state}
- end
-
- def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
- Enum.each(items, fn i ->
- do_stream(%{topic: topic, item: i})
- end)
-
- {:reply, state, state}
- end
-
- def handle_call({:stream, topic, item}, _from, state) do
- do_stream(%{topic: topic, item: item})
-
- {:reply, state, state}
- end
-
- defp do_stream(%{topic: "direct", item: item}) do
- recipient_topics =
- User.get_recipients_from_activity(item)
- |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
-
- Enum.each(recipient_topics, fn user_topic ->
- Logger.debug("Trying to push direct message to #{user_topic}\n\n")
- push_to_socket(State.get_sockets(), user_topic, item)
- end)
- end
-
- defp do_stream(%{topic: "participation", item: participation}) do
- user_topic = "direct:#{participation.user_id}"
- Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
-
- push_to_socket(State.get_sockets(), user_topic, participation)
- end
-
- defp do_stream(%{topic: "list", item: item}) do
- # filter the recipient list if the activity is not public, see #270.
- recipient_lists =
- case Visibility.is_public?(item) do
- true ->
- Pleroma.List.get_lists_from_activity(item)
-
- _ ->
- Pleroma.List.get_lists_from_activity(item)
- |> Enum.filter(fn list ->
- owner = User.get_cached_by_id(list.user_id)
-
- Visibility.visible_for_user?(item, owner)
- end)
- end
-
- recipient_topics =
- recipient_lists
- |> Enum.map(fn %{id: id} -> "list:#{id}" end)
-
- Enum.each(recipient_topics, fn list_topic ->
- Logger.debug("Trying to push message to #{list_topic}\n\n")
- push_to_socket(State.get_sockets(), list_topic, item)
- end)
- end
-
- defp do_stream(%{topic: topic, item: %Notification{} = item})
- when topic in ["user", "user:notification"] do
- State.get_sockets()
- |> Map.get("#{topic}:#{item.user_id}", [])
- |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
- with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
- true <- should_send?(user, item) do
- send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
- end
- end)
- end
-
- defp do_stream(%{topic: "user", item: item}) do
- Logger.debug("Trying to push to users")
-
- recipient_topics =
- User.get_recipients_from_activity(item)
- |> Enum.map(fn %{id: id} -> "user:#{id}" end)
-
- Enum.each(recipient_topics, fn topic ->
- push_to_socket(State.get_sockets(), topic, item)
- end)
- end
-
- defp do_stream(%{topic: topic, item: item}) do
- Logger.debug("Trying to push to #{topic}")
- Logger.debug("Pushing item to #{topic}")
- push_to_socket(State.get_sockets(), topic, item)
- end
-
- defp should_send?(%User{} = user, %Activity{} = item) do
- %{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
- User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
-
- recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
- recipients = MapSet.new(item.recipients)
- domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
-
- with parent <- Object.normalize(item) || item,
- true <-
- Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
- true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
- true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
- true <- MapSet.disjoint?(recipients, recipient_blocks),
- %{host: item_host} <- URI.parse(item.actor),
- %{host: parent_host} <- URI.parse(parent.data["actor"]),
- false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
- false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
- true <- thread_containment(item, user),
- false <- CommonAPI.thread_muted?(user, item) do
- true
- else
- _ -> false
- end
- end
-
- defp should_send?(%User{} = user, %Notification{activity: activity}) do
- should_send?(user, activity)
- end
-
- def push_to_socket(topics, topic, %Participation{} = participation) do
- Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
- send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
- end)
- end
-
- def push_to_socket(topics, topic, %Activity{
- data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
- }) do
- Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
- send(
- transport_pid,
- {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
- )
- end)
- end
-
- def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
-
- def push_to_socket(topics, topic, item) do
- Enum.each(topics[topic] || [], fn %StreamerSocket{
- transport_pid: transport_pid,
- user: socket_user
- } ->
- # Get the current user so we have up-to-date blocks etc.
- if socket_user do
- user = User.get_cached_by_ap_id(socket_user.ap_id)
-
- if should_send?(user, item) do
- send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
- end
- else
- send(transport_pid, {:text, StreamerView.render("update.json", item)})
- end
- end)
- end
-
- @spec thread_containment(Activity.t(), User.t()) :: boolean()
- defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
-
- defp thread_containment(activity, user) do
- if Config.get([:instance, :skip_thread_containment]) do
- true
- else
- ActivityPub.contain_activity(activity, user)
- end
- end
-end
diff --git a/test/integration/mastodon_websocket_test.exs b/test/integration/mastodon_websocket_test.exs
index bd229c55f..109c7b4cb 100644
--- a/test/integration/mastodon_websocket_test.exs
+++ b/test/integration/mastodon_websocket_test.exs
@@ -12,17 +12,14 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.OAuth
+ @moduletag needs_streamer: true, capture_log: true
+
@path Pleroma.Web.Endpoint.url()
|> URI.parse()
|> Map.put(:scheme, "ws")
|> Map.put(:path, "/api/v1/streaming")
|> URI.to_string()
- setup_all do
- start_supervised(Pleroma.Web.Streamer.supervisor())
- :ok
- end
-
def start_socket(qs \\ nil, headers \\ []) do
path =
case qs do
diff --git a/test/notification_test.exs b/test/notification_test.exs
index 601a6c0ca..5c85f3368 100644
--- a/test/notification_test.exs
+++ b/test/notification_test.exs
@@ -162,14 +162,18 @@ test "does not create a notification for subscribed users if status is a reply"
@tag needs_streamer: true
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
user = insert(:user)
- task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
- task_user_notification = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
- Streamer.add_socket("user", %{transport_pid: task.pid, assigns: %{user: user}})
- Streamer.add_socket(
- "user:notification",
- %{transport_pid: task_user_notification.pid, assigns: %{user: user}}
- )
+ task =
+ Task.async(fn ->
+ Streamer.add_socket("user", user)
+ assert_receive {:render_with_user, _, _, _}, 4_000
+ end)
+
+ task_user_notification =
+ Task.async(fn ->
+ Streamer.add_socket("user:notification", user)
+ assert_receive {:render_with_user, _, _, _}, 4_000
+ end)
activity = insert(:note_activity)
diff --git a/test/support/builders/activity_builder.ex b/test/support/builders/activity_builder.ex
index 6e5a8e059..7c4950bfa 100644
--- a/test/support/builders/activity_builder.ex
+++ b/test/support/builders/activity_builder.ex
@@ -21,7 +21,15 @@ def build(data \\ %{}, opts \\ %{}) do
def insert(data \\ %{}, opts \\ %{}) do
activity = build(data, opts)
- ActivityPub.insert(activity)
+
+ case ActivityPub.insert(activity) do
+ ok = {:ok, activity} ->
+ ActivityPub.notify_and_stream(activity)
+ ok
+
+ error ->
+ error
+ end
end
def insert_list(times, data \\ %{}, opts \\ %{}) do
diff --git a/test/support/conn_case.ex b/test/support/conn_case.ex
index 91c03b1a8..b23918dd1 100644
--- a/test/support/conn_case.ex
+++ b/test/support/conn_case.ex
@@ -139,7 +139,11 @@ defp ensure_federating_or_authenticated(conn, url, user) do
end
if tags[:needs_streamer] do
- start_supervised(Pleroma.Web.Streamer.supervisor())
+ start_supervised(%{
+ id: Pleroma.Web.Streamer.registry(),
+ start:
+ {Registry, :start_link, [[keys: :duplicate, name: Pleroma.Web.Streamer.registry()]]}
+ })
end
{:ok, conn: Phoenix.ConnTest.build_conn()}
diff --git a/test/support/data_case.ex b/test/support/data_case.ex
index 1669f2520..ba8848952 100644
--- a/test/support/data_case.ex
+++ b/test/support/data_case.ex
@@ -40,7 +40,11 @@ defmodule Pleroma.DataCase do
end
if tags[:needs_streamer] do
- start_supervised(Pleroma.Web.Streamer.supervisor())
+ start_supervised(%{
+ id: Pleroma.Web.Streamer.registry(),
+ start:
+ {Registry, :start_link, [[keys: :duplicate, name: Pleroma.Web.Streamer.registry()]]}
+ })
end
:ok
diff --git a/test/web/streamer/ping_test.exs b/test/web/streamer/ping_test.exs
deleted file mode 100644
index 5df6c1cc3..000000000
--- a/test/web/streamer/ping_test.exs
+++ /dev/null
@@ -1,36 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.PingTest do
- use Pleroma.DataCase
-
- import Pleroma.Factory
- alias Pleroma.Web.Streamer
-
- setup do
- start_supervised({Streamer.supervisor(), [ping_interval: 30]})
-
- :ok
- end
-
- describe "sockets" do
- setup do
- user = insert(:user)
- {:ok, %{user: user}}
- end
-
- test "it sends pings", %{user: user} do
- task =
- Task.async(fn ->
- assert_receive {:text, received_event}, 40
- assert_receive {:text, received_event}, 40
- assert_receive {:text, received_event}, 40
- end)
-
- Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}})
-
- Task.await(task)
- end
- end
-end
diff --git a/test/web/streamer/state_test.exs b/test/web/streamer/state_test.exs
deleted file mode 100644
index a755e75c0..000000000
--- a/test/web/streamer/state_test.exs
+++ /dev/null
@@ -1,54 +0,0 @@
-# Pleroma: A lightweight social networking server
-# Copyright © 2017-2020 Pleroma Authors
-# SPDX-License-Identifier: AGPL-3.0-only
-
-defmodule Pleroma.Web.StateTest do
- use Pleroma.DataCase
-
- import Pleroma.Factory
- alias Pleroma.Web.Streamer
- alias Pleroma.Web.Streamer.StreamerSocket
-
- @moduletag needs_streamer: true
-
- describe "sockets" do
- setup do
- user = insert(:user)
- user2 = insert(:user)
- {:ok, %{user: user, user2: user2}}
- end
-
- test "it can add a socket", %{user: user} do
- Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
-
- assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets())
- end
-
- test "it can add multiple sockets per user", %{user: user} do
- Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
- Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}})
-
- assert(
- %{
- "public" => [
- %StreamerSocket{transport_pid: 2},
- %StreamerSocket{transport_pid: 1}
- ]
- } = Streamer.get_sockets()
- )
- end
-
- test "it will not add a duplicate socket", %{user: user} do
- Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
- Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
-
- assert(
- %{
- "activity" => [
- %StreamerSocket{transport_pid: 1}
- ]
- } = Streamer.get_sockets()
- )
- end
- end
-end
diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs
index 3c0f240f5..ee530f4e9 100644
--- a/test/web/streamer/streamer_test.exs
+++ b/test/web/streamer/streamer_test.exs
@@ -12,13 +12,9 @@ defmodule Pleroma.Web.StreamerTest do
alias Pleroma.User
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer
- alias Pleroma.Web.Streamer.StreamerSocket
- alias Pleroma.Web.Streamer.Worker
@moduletag needs_streamer: true, capture_log: true
- @streamer_timeout 150
- @streamer_start_wait 10
setup do: clear_config([:instance, :skip_thread_containment])
describe "user streams" do
@@ -29,69 +25,35 @@ defmodule Pleroma.Web.StreamerTest do
end
test "it streams the user's post in the 'user' stream", %{user: user} do
- task =
- Task.async(fn ->
- assert_receive {:text, _}, @streamer_timeout
- end)
-
- Streamer.add_socket(
- "user",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
-
+ Streamer.add_socket("user", user)
{:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})
-
- Streamer.stream("user", activity)
- Task.await(task)
+ assert_receive {:render_with_user, _, _, ^activity}
+ refute Streamer.filtered_by_user?(user, activity)
end
test "it streams boosts of the user in the 'user' stream", %{user: user} do
- task =
- Task.async(fn ->
- assert_receive {:text, _}, @streamer_timeout
- end)
-
- Streamer.add_socket(
- "user",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
+ Streamer.add_socket("user", user)
other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
{:ok, announce, _} = CommonAPI.repeat(activity.id, user)
- Streamer.stream("user", announce)
- Task.await(task)
+ assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
+ refute Streamer.filtered_by_user?(user, announce)
end
test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
- task =
- Task.async(fn ->
- assert_receive {:text, _}, @streamer_timeout
- end)
-
- Streamer.add_socket(
- "user",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
-
+ Streamer.add_socket("user", user)
Streamer.stream("user", notify)
- Task.await(task)
+ assert_receive {:render_with_user, _, _, ^notify}
+ refute Streamer.filtered_by_user?(user, notify)
end
test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
- task =
- Task.async(fn ->
- assert_receive {:text, _}, @streamer_timeout
- end)
-
- Streamer.add_socket(
- "user:notification",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
-
+ Streamer.add_socket("user:notification", user)
Streamer.stream("user:notification", notify)
- Task.await(task)
+ assert_receive {:render_with_user, _, _, ^notify}
+ refute Streamer.filtered_by_user?(user, notify)
end
test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
@@ -100,18 +62,12 @@ test "it doesn't send notify to the 'user:notification' stream when a user is bl
blocked = insert(:user)
{:ok, _user_relationship} = User.block(user, blocked)
- task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
-
- Streamer.add_socket(
- "user:notification",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
+ Streamer.add_socket("user:notification", user)
{:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
- {:ok, notif} = CommonAPI.favorite(blocked, activity.id)
+ {:ok, _} = CommonAPI.favorite(blocked, activity.id)
- Streamer.stream("user:notification", notif)
- Task.await(task)
+ refute_receive _
end
test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
@@ -119,45 +75,50 @@ test "it doesn't send notify to the 'user:notification' stream when a thread is
} do
user2 = insert(:user)
- task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
-
- Streamer.add_socket(
- "user:notification",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
-
{:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
- {:ok, activity} = CommonAPI.add_mute(user, activity)
- {:ok, notif} = CommonAPI.favorite(user2, activity.id)
+ {:ok, _} = CommonAPI.add_mute(user, activity)
- Streamer.stream("user:notification", notif)
- Task.await(task)
+ Streamer.add_socket("user:notification", user)
+
+ {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
+
+ refute_receive _
+ assert Streamer.filtered_by_user?(user, favorite_activity)
end
- test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{
+ test "it sends favorite to 'user:notification' stream'", %{
user: user
} do
user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
- task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
+ {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
+ Streamer.add_socket("user:notification", user)
+ {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
- Streamer.add_socket(
- "user:notification",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
+ assert_receive {:render_with_user, _, "notification.json", notif}
+ assert notif.activity.id == favorite_activity.id
+ refute Streamer.filtered_by_user?(user, notif)
+ end
+
+ test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
+ user: user
+ } do
+ user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
{:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
{:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
- {:ok, notif} = CommonAPI.favorite(user2, activity.id)
+ Streamer.add_socket("user:notification", user)
+ {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
- Streamer.stream("user:notification", notif)
- Task.await(task)
+ refute_receive _
+ assert Streamer.filtered_by_user?(user, favorite_activity)
end
test "it sends follow activities to the 'user:notification' stream", %{
user: user
} do
user_url = user.ap_id
+ user2 = insert(:user)
body =
File.read!("test/fixtures/users_mock/localhost.json")
@@ -169,47 +130,24 @@ test "it sends follow activities to the 'user:notification' stream", %{
%Tesla.Env{status: 200, body: body}
end)
- user2 = insert(:user)
- task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end)
+ Streamer.add_socket("user:notification", user)
+ {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
- Process.sleep(@streamer_start_wait)
-
- Streamer.add_socket(
- "user:notification",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
-
- {:ok, _follower, _followed, _activity} = CommonAPI.follow(user2, user)
-
- # We don't directly pipe the notification to the streamer as it's already
- # generated as a side effect of CommonAPI.follow().
- Task.await(task)
+ assert_receive {:render_with_user, _, "notification.json", notif}
+ assert notif.activity.id == follow_activity.id
+ refute Streamer.filtered_by_user?(user, notif)
end
end
- test "it sends to public" do
+ test "it sends to public authenticated" do
user = insert(:user)
other_user = insert(:user)
- task =
- Task.async(fn ->
- assert_receive {:text, _}, @streamer_timeout
- end)
+ Streamer.add_socket("public", other_user)
- fake_socket = %StreamerSocket{
- transport_pid: task.pid,
- user: user
- }
-
- {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
-
- topics = %{
- "public" => [fake_socket]
- }
-
- Worker.push_to_socket(topics, "public", activity)
-
- Task.await(task)
+ {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"})
+ assert_receive {:render_with_user, _, _, ^activity}
+ refute Streamer.filtered_by_user?(user, activity)
end
test "works for deletions" do
@@ -217,37 +155,32 @@ test "works for deletions" do
other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
- task =
- Task.async(fn ->
- expected_event =
- %{
- "event" => "delete",
- "payload" => activity.id
- }
- |> Jason.encode!()
+ Streamer.add_socket("public", user)
- assert_receive {:text, received_event}, @streamer_timeout
- assert received_event == expected_event
- end)
+ {:ok, _} = CommonAPI.delete(activity.id, other_user)
+ activity_id = activity.id
+ assert_receive {:text, event}
+ assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
+ end
- fake_socket = %StreamerSocket{
- transport_pid: task.pid,
- user: user
- }
+ test "it sends to public unauthenticated" do
+ user = insert(:user)
- {:ok, activity} = CommonAPI.delete(activity.id, other_user)
+ Streamer.add_socket("public", nil)
- topics = %{
- "public" => [fake_socket]
- }
+ {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"})
+ activity_id = activity.id
+ assert_receive {:text, event}
+ assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
+ assert %{"id" => ^activity_id} = Jason.decode!(payload)
- Worker.push_to_socket(topics, "public", activity)
-
- Task.await(task)
+ {:ok, _} = CommonAPI.delete(activity.id, user)
+ assert_receive {:text, event}
+ assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end
describe "thread_containment" do
- test "it doesn't send to user if recipients invalid and thread containment is enabled" do
+ test "it filters to user if recipients invalid and thread containment is enabled" do
Pleroma.Config.put([:instance, :skip_thread_containment], false)
author = insert(:user)
user = insert(:user)
@@ -262,12 +195,10 @@ test "it doesn't send to user if recipients invalid and thread containment is en
)
)
- task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
- fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
- topics = %{"public" => [fake_socket]}
- Worker.push_to_socket(topics, "public", activity)
-
- Task.await(task)
+ Streamer.add_socket("public", user)
+ Streamer.stream("public", activity)
+ assert_receive {:render_with_user, _, _, ^activity}
+ assert Streamer.filtered_by_user?(user, activity)
end
test "it sends message if recipients invalid and thread containment is disabled" do
@@ -285,12 +216,11 @@ test "it sends message if recipients invalid and thread containment is disabled"
)
)
- task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
- fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
- topics = %{"public" => [fake_socket]}
- Worker.push_to_socket(topics, "public", activity)
+ Streamer.add_socket("public", user)
+ Streamer.stream("public", activity)
- Task.await(task)
+ assert_receive {:render_with_user, _, _, ^activity}
+ refute Streamer.filtered_by_user?(user, activity)
end
test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
@@ -308,255 +238,168 @@ test "it sends message if recipients invalid and thread containment is enabled b
)
)
- task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
- fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
- topics = %{"public" => [fake_socket]}
- Worker.push_to_socket(topics, "public", activity)
+ Streamer.add_socket("public", user)
+ Streamer.stream("public", activity)
- Task.await(task)
+ assert_receive {:render_with_user, _, _, ^activity}
+ refute Streamer.filtered_by_user?(user, activity)
end
end
describe "blocks" do
- test "it doesn't send messages involving blocked users" do
+ test "it filters messages involving blocked users" do
user = insert(:user)
blocked_user = insert(:user)
{:ok, _user_relationship} = User.block(user, blocked_user)
+ Streamer.add_socket("public", user)
{:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
-
- task =
- Task.async(fn ->
- refute_receive {:text, _}, 1_000
- end)
-
- fake_socket = %StreamerSocket{
- transport_pid: task.pid,
- user: user
- }
-
- topics = %{
- "public" => [fake_socket]
- }
-
- Worker.push_to_socket(topics, "public", activity)
-
- Task.await(task)
+ assert_receive {:render_with_user, _, _, ^activity}
+ assert Streamer.filtered_by_user?(user, activity)
end
- test "it doesn't send messages transitively involving blocked users" do
+ test "it filters messages transitively involving blocked users" do
blocker = insert(:user)
blockee = insert(:user)
friend = insert(:user)
- task =
- Task.async(fn ->
- refute_receive {:text, _}, 1_000
- end)
-
- fake_socket = %StreamerSocket{
- transport_pid: task.pid,
- user: blocker
- }
-
- topics = %{
- "public" => [fake_socket]
- }
+ Streamer.add_socket("public", blocker)
{:ok, _user_relationship} = User.block(blocker, blockee)
{:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})
- Worker.push_to_socket(topics, "public", activity_one)
+ assert_receive {:render_with_user, _, _, ^activity_one}
+ assert Streamer.filtered_by_user?(blocker, activity_one)
{:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})
- Worker.push_to_socket(topics, "public", activity_two)
+ assert_receive {:render_with_user, _, _, ^activity_two}
+ assert Streamer.filtered_by_user?(blocker, activity_two)
{:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})
- Worker.push_to_socket(topics, "public", activity_three)
-
- Task.await(task)
+ assert_receive {:render_with_user, _, _, ^activity_three}
+ assert Streamer.filtered_by_user?(blocker, activity_three)
end
end
- test "it doesn't send unwanted DMs to list" do
- user_a = insert(:user)
- user_b = insert(:user)
- user_c = insert(:user)
+ describe "lists" do
+ test "it doesn't send unwanted DMs to list" do
+ user_a = insert(:user)
+ user_b = insert(:user)
+ user_c = insert(:user)
- {:ok, user_a} = User.follow(user_a, user_b)
+ {:ok, user_a} = User.follow(user_a, user_b)
- {:ok, list} = List.create("Test", user_a)
- {:ok, list} = List.follow(list, user_b)
+ {:ok, list} = List.create("Test", user_a)
+ {:ok, list} = List.follow(list, user_b)
- {:ok, activity} =
- CommonAPI.post(user_b, %{
- "status" => "@#{user_c.nickname} Test",
- "visibility" => "direct"
- })
+ Streamer.add_socket("list:#{list.id}", user_a)
- task =
- Task.async(fn ->
- refute_receive {:text, _}, 1_000
- end)
+ {:ok, _activity} =
+ CommonAPI.post(user_b, %{
+ "status" => "@#{user_c.nickname} Test",
+ "visibility" => "direct"
+ })
- fake_socket = %StreamerSocket{
- transport_pid: task.pid,
- user: user_a
- }
+ refute_receive _
+ end
- topics = %{
- "list:#{list.id}" => [fake_socket]
- }
+ test "it doesn't send unwanted private posts to list" do
+ user_a = insert(:user)
+ user_b = insert(:user)
- Worker.handle_call({:stream, "list", activity}, self(), topics)
+ {:ok, list} = List.create("Test", user_a)
+ {:ok, list} = List.follow(list, user_b)
- Task.await(task)
+ Streamer.add_socket("list:#{list.id}", user_a)
+
+ {:ok, _activity} =
+ CommonAPI.post(user_b, %{
+ "status" => "Test",
+ "visibility" => "private"
+ })
+
+ refute_receive _
+ end
+
+ test "it sends wanted private posts to list" do
+ user_a = insert(:user)
+ user_b = insert(:user)
+
+ {:ok, user_a} = User.follow(user_a, user_b)
+
+ {:ok, list} = List.create("Test", user_a)
+ {:ok, list} = List.follow(list, user_b)
+
+ Streamer.add_socket("list:#{list.id}", user_a)
+
+ {:ok, activity} =
+ CommonAPI.post(user_b, %{
+ "status" => "Test",
+ "visibility" => "private"
+ })
+
+ assert_receive {:render_with_user, _, _, ^activity}
+ refute Streamer.filtered_by_user?(user_a, activity)
+ end
end
- test "it doesn't send unwanted private posts to list" do
- user_a = insert(:user)
- user_b = insert(:user)
+ describe "muted reblogs" do
+ test "it filters muted reblogs" do
+ user1 = insert(:user)
+ user2 = insert(:user)
+ user3 = insert(:user)
+ CommonAPI.follow(user1, user2)
+ CommonAPI.hide_reblogs(user1, user2)
- {:ok, list} = List.create("Test", user_a)
- {:ok, list} = List.follow(list, user_b)
+ {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
- {:ok, activity} =
- CommonAPI.post(user_b, %{
- "status" => "Test",
- "visibility" => "private"
- })
+ Streamer.add_socket("user", user1)
+ {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
+ assert_receive {:render_with_user, _, _, ^announce_activity}
+ assert Streamer.filtered_by_user?(user1, announce_activity)
+ end
- task =
- Task.async(fn ->
- refute_receive {:text, _}, 1_000
- end)
+ test "it filters reblog notification for reblog-muted actors" do
+ user1 = insert(:user)
+ user2 = insert(:user)
+ CommonAPI.follow(user1, user2)
+ CommonAPI.hide_reblogs(user1, user2)
- fake_socket = %StreamerSocket{
- transport_pid: task.pid,
- user: user_a
- }
+ {:ok, create_activity} = CommonAPI.post(user1, %{"status" => "I'm kawen"})
+ Streamer.add_socket("user", user1)
+ {:ok, _favorite_activity, _} = CommonAPI.repeat(create_activity.id, user2)
- topics = %{
- "list:#{list.id}" => [fake_socket]
- }
+ assert_receive {:render_with_user, _, "notification.json", notif}
+ assert Streamer.filtered_by_user?(user1, notif)
+ end
- Worker.handle_call({:stream, "list", activity}, self(), topics)
+ test "it send non-reblog notification for reblog-muted actors" do
+ user1 = insert(:user)
+ user2 = insert(:user)
+ CommonAPI.follow(user1, user2)
+ CommonAPI.hide_reblogs(user1, user2)
- Task.await(task)
+ {:ok, create_activity} = CommonAPI.post(user1, %{"status" => "I'm kawen"})
+ Streamer.add_socket("user", user1)
+ {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
+
+ assert_receive {:render_with_user, _, "notification.json", notif}
+ refute Streamer.filtered_by_user?(user1, notif)
+ end
end
- test "it sends wanted private posts to list" do
- user_a = insert(:user)
- user_b = insert(:user)
-
- {:ok, user_a} = User.follow(user_a, user_b)
-
- {:ok, list} = List.create("Test", user_a)
- {:ok, list} = List.follow(list, user_b)
-
- {:ok, activity} =
- CommonAPI.post(user_b, %{
- "status" => "Test",
- "visibility" => "private"
- })
-
- task =
- Task.async(fn ->
- assert_receive {:text, _}, 1_000
- end)
-
- fake_socket = %StreamerSocket{
- transport_pid: task.pid,
- user: user_a
- }
-
- Streamer.add_socket(
- "list:#{list.id}",
- fake_socket
- )
-
- Worker.handle_call({:stream, "list", activity}, self(), %{})
-
- Task.await(task)
- end
-
- test "it doesn't send muted reblogs" do
- user1 = insert(:user)
- user2 = insert(:user)
- user3 = insert(:user)
- CommonAPI.hide_reblogs(user1, user2)
-
- {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
- {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
-
- task =
- Task.async(fn ->
- refute_receive {:text, _}, 1_000
- end)
-
- fake_socket = %StreamerSocket{
- transport_pid: task.pid,
- user: user1
- }
-
- topics = %{
- "public" => [fake_socket]
- }
-
- Worker.push_to_socket(topics, "public", announce_activity)
-
- Task.await(task)
- end
-
- test "it does send non-reblog notification for reblog-muted actors" do
- user1 = insert(:user)
- user2 = insert(:user)
- user3 = insert(:user)
- CommonAPI.hide_reblogs(user1, user2)
-
- {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
- {:ok, favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
-
- task =
- Task.async(fn ->
- assert_receive {:text, _}, 1_000
- end)
-
- fake_socket = %StreamerSocket{
- transport_pid: task.pid,
- user: user1
- }
-
- topics = %{
- "public" => [fake_socket]
- }
-
- Worker.push_to_socket(topics, "public", favorite_activity)
-
- Task.await(task)
- end
-
- test "it doesn't send posts from muted threads" do
+ test "it filters posts from muted threads" do
user = insert(:user)
user2 = insert(:user)
+ Streamer.add_socket("user", user2)
{:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
-
{:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
-
- {:ok, activity} = CommonAPI.add_mute(user2, activity)
-
- task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
-
- Streamer.add_socket(
- "user",
- %{transport_pid: task.pid, assigns: %{user: user2}}
- )
-
- Streamer.stream("user", activity)
- Task.await(task)
+ {:ok, _} = CommonAPI.add_mute(user2, activity)
+ assert_receive {:render_with_user, _, _, ^activity}
+ assert Streamer.filtered_by_user?(user2, activity)
end
describe "direct streams" do
@@ -568,22 +411,7 @@ test "it sends conversation update to the 'direct' stream", %{} do
user = insert(:user)
another_user = insert(:user)
- task =
- Task.async(fn ->
- assert_receive {:text, received_event}, @streamer_timeout
-
- assert %{"event" => "conversation", "payload" => received_payload} =
- Jason.decode!(received_event)
-
- assert %{"last_status" => last_status} = Jason.decode!(received_payload)
- [participation] = Participation.for_user(user)
- assert last_status["pleroma"]["direct_conversation_id"] == participation.id
- end)
-
- Streamer.add_socket(
- "direct",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
+ Streamer.add_socket("direct", user)
{:ok, _create_activity} =
CommonAPI.post(another_user, %{
@@ -591,42 +419,47 @@ test "it sends conversation update to the 'direct' stream", %{} do
"visibility" => "direct"
})
- Task.await(task)
+ assert_receive {:text, received_event}
+
+ assert %{"event" => "conversation", "payload" => received_payload} =
+ Jason.decode!(received_event)
+
+ assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+ [participation] = Participation.for_user(user)
+ assert last_status["pleroma"]["direct_conversation_id"] == participation.id
end
test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
user = insert(:user)
another_user = insert(:user)
+ Streamer.add_socket("direct", user)
+
{:ok, create_activity} =
CommonAPI.post(another_user, %{
"status" => "hi @#{user.nickname}",
"visibility" => "direct"
})
- task =
- Task.async(fn ->
- assert_receive {:text, received_event}, @streamer_timeout
- assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
+ create_activity_id = create_activity.id
+ assert_receive {:render_with_user, _, _, ^create_activity}
+ assert_receive {:text, received_conversation1}
+ assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
- refute_receive {:text, _}, @streamer_timeout
- end)
+ {:ok, _} = CommonAPI.delete(create_activity_id, another_user)
- Process.sleep(@streamer_start_wait)
+ assert_receive {:text, received_event}
- Streamer.add_socket(
- "direct",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
+ assert %{"event" => "delete", "payload" => ^create_activity_id} =
+ Jason.decode!(received_event)
- {:ok, _} = CommonAPI.delete(create_activity.id, another_user)
-
- Task.await(task)
+ refute_receive _
end
test "it sends conversation update to the 'direct' stream when a message is deleted" do
user = insert(:user)
another_user = insert(:user)
+ Streamer.add_socket("direct", user)
{:ok, create_activity} =
CommonAPI.post(another_user, %{
@@ -636,35 +469,30 @@ test "it sends conversation update to the 'direct' stream when a message is dele
{:ok, create_activity2} =
CommonAPI.post(another_user, %{
- "status" => "hi @#{user.nickname}",
+ "status" => "hi @#{user.nickname} 2",
"in_reply_to_status_id" => create_activity.id,
"visibility" => "direct"
})
- task =
- Task.async(fn ->
- assert_receive {:text, received_event}, @streamer_timeout
- assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
-
- assert_receive {:text, received_event}, @streamer_timeout
-
- assert %{"event" => "conversation", "payload" => received_payload} =
- Jason.decode!(received_event)
-
- assert %{"last_status" => last_status} = Jason.decode!(received_payload)
- assert last_status["id"] == to_string(create_activity.id)
- end)
-
- Process.sleep(@streamer_start_wait)
-
- Streamer.add_socket(
- "direct",
- %{transport_pid: task.pid, assigns: %{user: user}}
- )
+ assert_receive {:render_with_user, _, _, ^create_activity}
+ assert_receive {:render_with_user, _, _, ^create_activity2}
+ assert_receive {:text, received_conversation1}
+ assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
+ assert_receive {:text, received_conversation1}
+ assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
{:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
- Task.await(task)
+ assert_receive {:text, received_event}
+ assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
+
+ assert_receive {:text, received_event}
+
+ assert %{"event" => "conversation", "payload" => received_payload} =
+ Jason.decode!(received_event)
+
+ assert %{"last_status" => last_status} = Jason.decode!(received_payload)
+ assert last_status["id"] == to_string(create_activity.id)
end
end
end