Merge branch 'streamer-worker-registry' into 'develop'

Streamer rework

See merge request pleroma/pleroma!2426
This commit is contained in:
lain 2020-05-07 09:13:32 +00:00
commit 4061841846
17 changed files with 535 additions and 957 deletions

View file

@ -173,7 +173,14 @@ defp chat_enabled?, do: Config.get([:chat, :enabled])
defp streamer_child(env) when env in [:test, :benchmark], do: [] defp streamer_child(env) when env in [:test, :benchmark], do: []
defp streamer_child(_) do defp streamer_child(_) do
[Pleroma.Web.Streamer.supervisor()] [
{Registry,
[
name: Pleroma.Web.Streamer.registry(),
keys: :duplicate,
partitions: System.schedulers_online()
]}
]
end end
defp chat_child(_env, true) do defp chat_child(_env, true) do

View file

@ -170,12 +170,6 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id}) BackgroundWorker.enqueue("fetch_data_for_activity", %{"activity_id" => activity.id})
Notification.create_notifications(activity)
conversation = create_or_bump_conversation(activity, map["actor"])
participations = get_participations(conversation)
stream_out(activity)
stream_out_participations(participations)
{:ok, activity} {:ok, activity}
else else
%Activity{} = activity -> %Activity{} = activity ->
@ -198,6 +192,15 @@ def insert(map, local \\ true, fake \\ false, bypass_actor_check \\ false) when
end end
end end
def notify_and_stream(activity) do
Notification.create_notifications(activity)
conversation = create_or_bump_conversation(activity, activity.actor)
participations = get_participations(conversation)
stream_out(activity)
stream_out_participations(participations)
end
defp create_or_bump_conversation(activity, actor) do defp create_or_bump_conversation(activity, actor) do
with {:ok, conversation} <- Conversation.create_or_bump_for(activity), with {:ok, conversation} <- Conversation.create_or_bump_for(activity),
%User{} = user <- User.get_cached_by_ap_id(actor), %User{} = user <- User.get_cached_by_ap_id(actor),
@ -274,6 +277,7 @@ defp do_create(%{to: to, actor: actor, context: context, object: object} = param
_ <- increase_poll_votes_if_vote(create_data), _ <- increase_poll_votes_if_vote(create_data),
{:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity}, {:quick_insert, false, activity} <- {:quick_insert, quick_insert?, activity},
{:ok, _actor} <- increase_note_count_if_public(actor, activity), {:ok, _actor} <- increase_note_count_if_public(actor, activity),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity} {:ok, activity}
else else
@ -301,6 +305,7 @@ def listen(%{to: to, actor: actor, context: context, object: object} = params) d
additional additional
), ),
{:ok, activity} <- insert(listen_data, local), {:ok, activity} <- insert(listen_data, local),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity} {:ok, activity}
end end
@ -325,6 +330,7 @@ def accept_or_reject(type, %{to: to, actor: actor, object: object} = params) do
%{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object} %{"to" => to, "type" => type, "actor" => actor.ap_id, "object" => object}
|> Utils.maybe_put("id", activity_id), |> Utils.maybe_put("id", activity_id),
{:ok, activity} <- insert(data, local), {:ok, activity} <- insert(data, local),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity} {:ok, activity}
end end
@ -344,6 +350,7 @@ def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
}, },
data <- Utils.maybe_put(data, "id", activity_id), data <- Utils.maybe_put(data, "id", activity_id),
{:ok, activity} <- insert(data, local), {:ok, activity} <- insert(data, local),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity} {:ok, activity}
end end
@ -365,6 +372,7 @@ defp do_react_with_emoji(user, object, emoji, options) do
reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id), reaction_data <- make_emoji_reaction_data(user, object, emoji, activity_id),
{:ok, activity} <- insert(reaction_data, local), {:ok, activity} <- insert(reaction_data, local),
{:ok, object} <- add_emoji_reaction_to_object(activity, object), {:ok, object} <- add_emoji_reaction_to_object(activity, object),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity, object} {:ok, activity, object}
else else
@ -391,6 +399,7 @@ defp do_unreact_with_emoji(user, reaction_id, options) do
unreact_data <- make_undo_data(user, reaction_activity, activity_id), unreact_data <- make_undo_data(user, reaction_activity, activity_id),
{:ok, activity} <- insert(unreact_data, local), {:ok, activity} <- insert(unreact_data, local),
{:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object), {:ok, object} <- remove_emoji_reaction_from_object(reaction_activity, object),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity, object} {:ok, activity, object}
else else
@ -413,6 +422,7 @@ defp do_unlike(actor, object, activity_id, local) do
{:ok, unlike_activity} <- insert(unlike_data, local), {:ok, unlike_activity} <- insert(unlike_data, local),
{:ok, _activity} <- Repo.delete(like_activity), {:ok, _activity} <- Repo.delete(like_activity),
{:ok, object} <- remove_like_from_object(like_activity, object), {:ok, object} <- remove_like_from_object(like_activity, object),
_ <- notify_and_stream(unlike_activity),
:ok <- maybe_federate(unlike_activity) do :ok <- maybe_federate(unlike_activity) do
{:ok, unlike_activity, like_activity, object} {:ok, unlike_activity, like_activity, object}
else else
@ -442,6 +452,7 @@ defp do_announce(user, object, activity_id, local, public) do
announce_data <- make_announce_data(user, object, activity_id, public), announce_data <- make_announce_data(user, object, activity_id, public),
{:ok, activity} <- insert(announce_data, local), {:ok, activity} <- insert(announce_data, local),
{:ok, object} <- add_announce_to_object(activity, object), {:ok, object} <- add_announce_to_object(activity, object),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity, object} {:ok, activity, object}
else else
@ -468,6 +479,7 @@ defp do_unannounce(actor, object, activity_id, local) do
with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object), with %Activity{} = announce_activity <- get_existing_announce(actor.ap_id, object),
unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id), unannounce_data <- make_unannounce_data(actor, announce_activity, activity_id),
{:ok, unannounce_activity} <- insert(unannounce_data, local), {:ok, unannounce_activity} <- insert(unannounce_data, local),
_ <- notify_and_stream(unannounce_activity),
:ok <- maybe_federate(unannounce_activity), :ok <- maybe_federate(unannounce_activity),
{:ok, _activity} <- Repo.delete(announce_activity), {:ok, _activity} <- Repo.delete(announce_activity),
{:ok, object} <- remove_announce_from_object(announce_activity, object) do {:ok, object} <- remove_announce_from_object(announce_activity, object) do
@ -490,6 +502,7 @@ def follow(follower, followed, activity_id \\ nil, local \\ true) do
defp do_follow(follower, followed, activity_id, local) do defp do_follow(follower, followed, activity_id, local) do
with data <- make_follow_data(follower, followed, activity_id), with data <- make_follow_data(follower, followed, activity_id),
{:ok, activity} <- insert(data, local), {:ok, activity} <- insert(data, local),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity} {:ok, activity}
else else
@ -511,6 +524,7 @@ defp do_unfollow(follower, followed, activity_id, local) do
{:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"), {:ok, follow_activity} <- update_follow_state(follow_activity, "cancelled"),
unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id), unfollow_data <- make_unfollow_data(follower, followed, follow_activity, activity_id),
{:ok, activity} <- insert(unfollow_data, local), {:ok, activity} <- insert(unfollow_data, local),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity} {:ok, activity}
else else
@ -540,6 +554,7 @@ defp do_block(blocker, blocked, activity_id, local) do
with true <- outgoing_blocks, with true <- outgoing_blocks,
block_data <- make_block_data(blocker, blocked, activity_id), block_data <- make_block_data(blocker, blocked, activity_id),
{:ok, activity} <- insert(block_data, local), {:ok, activity} <- insert(block_data, local),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity} {:ok, activity}
else else
@ -560,6 +575,7 @@ defp do_unblock(blocker, blocked, activity_id, local) do
with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked), with %Activity{} = block_activity <- fetch_latest_block(blocker, blocked),
unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id), unblock_data <- make_unblock_data(blocker, blocked, block_activity, activity_id),
{:ok, activity} <- insert(unblock_data, local), {:ok, activity} <- insert(unblock_data, local),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(activity) do :ok <- maybe_federate(activity) do
{:ok, activity} {:ok, activity}
else else
@ -594,6 +610,7 @@ def flag(
with flag_data <- make_flag_data(params, additional), with flag_data <- make_flag_data(params, additional),
{:ok, activity} <- insert(flag_data, local), {:ok, activity} <- insert(flag_data, local),
{:ok, stripped_activity} <- strip_report_status_data(activity), {:ok, stripped_activity} <- strip_report_status_data(activity),
_ <- notify_and_stream(activity),
:ok <- maybe_federate(stripped_activity) do :ok <- maybe_federate(stripped_activity) do
User.all_superusers() User.all_superusers()
|> Enum.filter(fn user -> not is_nil(user.email) end) |> Enum.filter(fn user -> not is_nil(user.email) end)
@ -617,7 +634,8 @@ def move(%User{} = origin, %User{} = target, local \\ true) do
} }
with true <- origin.ap_id in target.also_known_as, with true <- origin.ap_id in target.also_known_as,
{:ok, activity} <- insert(params, local) do {:ok, activity} <- insert(params, local),
_ <- notify_and_stream(activity) do
maybe_federate(activity) maybe_federate(activity)
BackgroundWorker.enqueue("move_following", %{ BackgroundWorker.enqueue("move_following", %{

View file

@ -12,6 +12,11 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
@behaviour :cowboy_websocket @behaviour :cowboy_websocket
# Cowboy timeout period.
@timeout :timer.seconds(30)
# Hibernate every X messages
@hibernate_every 100
@streams [ @streams [
"public", "public",
"public:local", "public:local",
@ -25,9 +30,6 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
] ]
@anonymous_streams ["public", "public:local", "hashtag"] @anonymous_streams ["public", "public:local", "hashtag"]
# Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
@timeout :infinity
def init(%{qs: qs} = req, state) do def init(%{qs: qs} = req, state) do
with params <- :cow_qs.parse_qs(qs), with params <- :cow_qs.parse_qs(qs),
sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
@ -42,7 +44,7 @@ def init(%{qs: qs} = req, state) do
req req
end end
{:cowboy_websocket, req, %{user: user, topic: topic}, %{idle_timeout: @timeout}} {:cowboy_websocket, req, %{user: user, topic: topic, count: 0}, %{idle_timeout: @timeout}}
else else
{:error, code} -> {:error, code} ->
Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}") Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}")
@ -57,7 +59,13 @@ def init(%{qs: qs} = req, state) do
end end
def websocket_init(state) do def websocket_init(state) do
send(self(), :subscribe) Logger.debug(
"#{__MODULE__} accepted websocket connection for user #{
(state.user || %{id: "anonymous"}).id
}, topic #{state.topic}"
)
Streamer.add_socket(state.topic, state.user)
{:ok, state} {:ok, state}
end end
@ -66,19 +74,24 @@ def websocket_handle(_frame, state) do
{:ok, state} {:ok, state}
end end
def websocket_info(:subscribe, state) do def websocket_info({:render_with_user, view, template, item}, state) do
Logger.debug( user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
"#{__MODULE__} accepted websocket connection for user #{
(state.user || %{id: "anonymous"}).id
}, topic #{state.topic}"
)
Streamer.add_socket(state.topic, streamer_socket(state)) unless Streamer.filtered_by_user?(user, item) do
websocket_info({:text, view.render(template, user, item)}, %{state | user: user})
else
{:ok, state} {:ok, state}
end end
end
def websocket_info({:text, message}, state) do def websocket_info({:text, message}, state) do
{:reply, {:text, message}, state} # If the websocket processed X messages, force an hibernate/GC.
# We don't hibernate at every message to balance CPU usage/latency with RAM usage.
if state.count > @hibernate_every do
{:reply, {:text, message}, %{state | count: 0}, :hibernate}
else
{:reply, {:text, message}, %{state | count: state.count + 1}}
end
end end
def terminate(reason, _req, state) do def terminate(reason, _req, state) do
@ -88,7 +101,7 @@ def terminate(reason, _req, state) do
}, topic #{state.topic || "?"}: #{inspect(reason)}" }, topic #{state.topic || "?"}: #{inspect(reason)}"
) )
Streamer.remove_socket(state.topic, streamer_socket(state)) Streamer.remove_socket(state.topic)
:ok :ok
end end
@ -136,8 +149,4 @@ defp expand_topic("list", params) do
end end
defp expand_topic(topic, _), do: topic defp expand_topic(topic, _), do: topic
defp streamer_socket(state) do
%{transport_pid: self(), assigns: state}
end
end end

View file

@ -1,37 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer.Ping do
use GenServer
require Logger
alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.StreamerSocket
@keepalive_interval :timer.seconds(30)
def start_link(opts) do
ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
end
def init(%{ping_interval: ping_interval} = args) do
Process.send_after(self(), :ping, ping_interval)
{:ok, args}
end
def handle_info(:ping, %{ping_interval: ping_interval} = state) do
State.get_sockets()
|> Map.values()
|> List.flatten()
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
Logger.debug("Sending keepalive ping")
send(transport_pid, {:text, ""})
end)
Process.send_after(self(), :ping, ping_interval)
{:noreply, state}
end
end

View file

@ -1,82 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer.State do
use GenServer
require Logger
alias Pleroma.Web.Streamer.StreamerSocket
@env Mix.env()
def start_link(_) do
GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
end
def add_socket(topic, socket) do
GenServer.call(__MODULE__, {:add, topic, socket})
end
def remove_socket(topic, socket) do
do_remove_socket(@env, topic, socket)
end
def get_sockets do
%{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
stream_sockets
end
def init(init_arg) do
{:ok, init_arg}
end
def handle_call(:get_state, _from, state) do
{:reply, state, state}
end
def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
internal_topic = internal_topic(topic, socket)
stream_socket = StreamerSocket.from_socket(socket)
sockets_for_topic =
sockets
|> Map.get(internal_topic, [])
|> List.insert_at(0, stream_socket)
|> Enum.uniq()
state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
Logger.debug("Got new conn for #{topic}")
{:reply, state, state}
end
def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
internal_topic = internal_topic(topic, socket)
stream_socket = StreamerSocket.from_socket(socket)
sockets_for_topic =
sockets
|> Map.get(internal_topic, [])
|> List.delete(stream_socket)
state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
{:reply, state, state}
end
defp do_remove_socket(:test, _, _) do
:ok
end
defp do_remove_socket(_env, topic, socket) do
GenServer.call(__MODULE__, {:remove, topic, socket})
end
defp internal_topic(topic, socket)
when topic in ~w[user user:notification direct] do
"#{topic}:#{socket.assigns[:user].id}"
end
defp internal_topic(topic, _) do
topic
end
end

View file

@ -3,44 +3,220 @@
# SPDX-License-Identifier: AGPL-3.0-only # SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer do defmodule Pleroma.Web.Streamer do
alias Pleroma.Web.Streamer.State require Logger
alias Pleroma.Web.Streamer.Worker
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.StreamerView
@timeout 60_000
@mix_env Mix.env() @mix_env Mix.env()
@registry Pleroma.Web.StreamerRegistry
def add_socket(topic, socket) do def registry, do: @registry
State.add_socket(topic, socket)
def add_socket(topic, %User{} = user) do
if should_env_send?(), do: Registry.register(@registry, user_topic(topic, user), true)
end end
def remove_socket(topic, socket) do def add_socket(topic, _) do
State.remove_socket(topic, socket) if should_env_send?(), do: Registry.register(@registry, topic, false)
end end
def get_sockets do def remove_socket(topic) do
State.get_sockets() if should_env_send?(), do: Registry.unregister(@registry, topic)
end end
def stream(topics, items) do def stream(topics, item) when is_list(topics) do
if should_send?() do if should_env_send?() do
Task.async(fn -> Enum.each(topics, fn t ->
:poolboy.transaction( spawn(fn -> do_stream(t, item) end)
:streamer_worker,
&Worker.stream(&1, topics, items),
@timeout
)
end) end)
end end
:ok
end end
def supervisor, do: Pleroma.Web.Streamer.Supervisor def stream(topic, items) when is_list(items) do
if should_env_send?() do
Enum.each(items, fn i ->
spawn(fn -> do_stream(topic, i) end)
end)
defp should_send? do :ok
handle_should_send(@mix_env) end
end end
defp handle_should_send(:test) do def stream(topic, item) do
case Process.whereis(:streamer_worker) do if should_env_send?() do
spawn(fn -> do_stream(topic, item) end)
end
:ok
end
def filtered_by_user?(%User{} = user, %Activity{} = item) do
%{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
recipients = MapSet.new(item.recipients)
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
with parent <- Object.normalize(item) || item,
true <-
Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
true <- MapSet.disjoint?(recipients, recipient_blocks),
%{host: item_host} <- URI.parse(item.actor),
%{host: parent_host} <- URI.parse(parent.data["actor"]),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
true <- thread_containment(item, user),
false <- CommonAPI.thread_muted?(user, item) do
false
else
_ -> true
end
end
def filtered_by_user?(%User{} = user, %Notification{activity: activity}) do
filtered_by_user?(user, activity)
end
defp do_stream("direct", item) do
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
Enum.each(recipient_topics, fn user_topic ->
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
push_to_socket(user_topic, item)
end)
end
defp do_stream("participation", participation) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
push_to_socket(user_topic, participation)
end
defp do_stream("list", item) do
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
case Visibility.is_public?(item) do
true ->
Pleroma.List.get_lists_from_activity(item)
_ ->
Pleroma.List.get_lists_from_activity(item)
|> Enum.filter(fn list ->
owner = User.get_cached_by_id(list.user_id)
Visibility.visible_for_user?(item, owner)
end)
end
recipient_topics =
recipient_lists
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
Enum.each(recipient_topics, fn list_topic ->
Logger.debug("Trying to push message to #{list_topic}\n\n")
push_to_socket(list_topic, item)
end)
end
defp do_stream(topic, %Notification{} = item)
when topic in ["user", "user:notification"] do
Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
Enum.each(list, fn {pid, _auth} ->
send(pid, {:render_with_user, StreamerView, "notification.json", item})
end)
end)
end
defp do_stream("user", item) do
Logger.debug("Trying to push to users")
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
Enum.each(recipient_topics, fn topic ->
push_to_socket(topic, item)
end)
end
defp do_stream(topic, item) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
push_to_socket(topic, item)
end
defp push_to_socket(topic, %Participation{} = participation) do
rendered = StreamerView.render("conversation.json", participation)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _} ->
send(pid, {:text, rendered})
end)
end)
end
defp push_to_socket(topic, %Activity{
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
}) do
rendered = Jason.encode!(%{event: "delete", payload: to_string(deleted_activity_id)})
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _} ->
send(pid, {:text, rendered})
end)
end)
end
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
defp push_to_socket(topic, item) do
anon_render = StreamerView.render("update.json", item)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, auth?} ->
if auth? do
send(pid, {:render_with_user, StreamerView, "update.json", item})
else
send(pid, {:text, anon_render})
end
end)
end)
end
defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
defp thread_containment(activity, user) do
if Config.get([:instance, :skip_thread_containment]) do
true
else
ActivityPub.contain_activity(activity, user)
end
end
# In test environement, only return true if the registry is started.
# In benchmark environment, returns false.
# In any other environment, always returns true.
cond do
@mix_env == :test ->
def should_env_send? do
case Process.whereis(@registry) do
nil -> nil ->
false false
@ -49,7 +225,19 @@ defp handle_should_send(:test) do
end end
end end
defp handle_should_send(:benchmark), do: false @mix_env == :benchmark ->
def should_env_send?, do: false
defp handle_should_send(_), do: true true ->
def should_env_send?, do: true
end
defp user_topic(topic, user)
when topic in ~w[user user:notification direct] do
"#{topic}:#{user.id}"
end
defp user_topic(topic, _) do
topic
end
end end

View file

@ -1,35 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer.StreamerSocket do
defstruct transport_pid: nil, user: nil
alias Pleroma.User
alias Pleroma.Web.Streamer.StreamerSocket
def from_socket(%{
transport_pid: transport_pid,
assigns: %{user: nil}
}) do
%StreamerSocket{
transport_pid: transport_pid
}
end
def from_socket(%{
transport_pid: transport_pid,
assigns: %{user: %User{} = user}
}) do
%StreamerSocket{
transport_pid: transport_pid,
user: user
}
end
def from_socket(%{transport_pid: transport_pid}) do
%StreamerSocket{
transport_pid: transport_pid
}
end
end

View file

@ -1,37 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer.Supervisor do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(args) do
children = [
{Pleroma.Web.Streamer.State, args},
{Pleroma.Web.Streamer.Ping, args},
:poolboy.child_spec(:streamer_worker, poolboy_config())
]
opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
Supervisor.init(children, opts)
end
defp poolboy_config do
opts =
Pleroma.Config.get(:streamer,
workers: 3,
overflow_workers: 2
)
[
{:name, {:local, :streamer_worker}},
{:worker_module, Pleroma.Web.Streamer.Worker},
{:size, opts[:workers]},
{:max_overflow, opts[:overflow_workers]}
]
end
end

View file

@ -1,208 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer.Worker do
use GenServer
require Logger
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.StreamerSocket
alias Pleroma.Web.StreamerView
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, [])
end
def init(init_arg) do
{:ok, init_arg}
end
def stream(pid, topics, items) do
GenServer.call(pid, {:stream, topics, items})
end
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
Enum.each(topics, fn t ->
do_stream(%{topic: t, item: item})
end)
{:reply, state, state}
end
def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
Enum.each(items, fn i ->
do_stream(%{topic: topic, item: i})
end)
{:reply, state, state}
end
def handle_call({:stream, topic, item}, _from, state) do
do_stream(%{topic: topic, item: item})
{:reply, state, state}
end
defp do_stream(%{topic: "direct", item: item}) do
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
Enum.each(recipient_topics, fn user_topic ->
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
push_to_socket(State.get_sockets(), user_topic, item)
end)
end
defp do_stream(%{topic: "participation", item: participation}) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
push_to_socket(State.get_sockets(), user_topic, participation)
end
defp do_stream(%{topic: "list", item: item}) do
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
case Visibility.is_public?(item) do
true ->
Pleroma.List.get_lists_from_activity(item)
_ ->
Pleroma.List.get_lists_from_activity(item)
|> Enum.filter(fn list ->
owner = User.get_cached_by_id(list.user_id)
Visibility.visible_for_user?(item, owner)
end)
end
recipient_topics =
recipient_lists
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
Enum.each(recipient_topics, fn list_topic ->
Logger.debug("Trying to push message to #{list_topic}\n\n")
push_to_socket(State.get_sockets(), list_topic, item)
end)
end
defp do_stream(%{topic: topic, item: %Notification{} = item})
when topic in ["user", "user:notification"] do
State.get_sockets()
|> Map.get("#{topic}:#{item.user_id}", [])
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
true <- should_send?(user, item) do
send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
end
end)
end
defp do_stream(%{topic: "user", item: item}) do
Logger.debug("Trying to push to users")
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
Enum.each(recipient_topics, fn topic ->
push_to_socket(State.get_sockets(), topic, item)
end)
end
defp do_stream(%{topic: topic, item: item}) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
push_to_socket(State.get_sockets(), topic, item)
end
defp should_send?(%User{} = user, %Activity{} = item) do
%{block: blocked_ap_ids, mute: muted_ap_ids, reblog_mute: reblog_muted_ap_ids} =
User.outgoing_relationships_ap_ids(user, [:block, :mute, :reblog_mute])
recipient_blocks = MapSet.new(blocked_ap_ids ++ muted_ap_ids)
recipients = MapSet.new(item.recipients)
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.domain_blocks)
with parent <- Object.normalize(item) || item,
true <-
Enum.all?([blocked_ap_ids, muted_ap_ids], &(item.actor not in &1)),
true <- item.data["type"] != "Announce" || item.actor not in reblog_muted_ap_ids,
true <- Enum.all?([blocked_ap_ids, muted_ap_ids], &(parent.data["actor"] not in &1)),
true <- MapSet.disjoint?(recipients, recipient_blocks),
%{host: item_host} <- URI.parse(item.actor),
%{host: parent_host} <- URI.parse(parent.data["actor"]),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
true <- thread_containment(item, user),
false <- CommonAPI.thread_muted?(user, item) do
true
else
_ -> false
end
end
defp should_send?(%User{} = user, %Notification{activity: activity}) do
should_send?(user, activity)
end
def push_to_socket(topics, topic, %Participation{} = participation) do
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
end)
end
def push_to_socket(topics, topic, %Activity{
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
}) do
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
send(
transport_pid,
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
)
end)
end
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn %StreamerSocket{
transport_pid: transport_pid,
user: socket_user
} ->
# Get the current user so we have up-to-date blocks etc.
if socket_user do
user = User.get_cached_by_ap_id(socket_user.ap_id)
if should_send?(user, item) do
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
end
else
send(transport_pid, {:text, StreamerView.render("update.json", item)})
end
end)
end
@spec thread_containment(Activity.t(), User.t()) :: boolean()
defp thread_containment(_activity, %User{skip_thread_containment: true}), do: true
defp thread_containment(activity, user) do
if Config.get([:instance, :skip_thread_containment]) do
true
else
ActivityPub.contain_activity(activity, user)
end
end
end

View file

@ -12,17 +12,14 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.OAuth alias Pleroma.Web.OAuth
@moduletag needs_streamer: true, capture_log: true
@path Pleroma.Web.Endpoint.url() @path Pleroma.Web.Endpoint.url()
|> URI.parse() |> URI.parse()
|> Map.put(:scheme, "ws") |> Map.put(:scheme, "ws")
|> Map.put(:path, "/api/v1/streaming") |> Map.put(:path, "/api/v1/streaming")
|> URI.to_string() |> URI.to_string()
setup_all do
start_supervised(Pleroma.Web.Streamer.supervisor())
:ok
end
def start_socket(qs \\ nil, headers \\ []) do def start_socket(qs \\ nil, headers \\ []) do
path = path =
case qs do case qs do

View file

@ -162,14 +162,18 @@ test "does not create a notification for subscribed users if status is a reply"
@tag needs_streamer: true @tag needs_streamer: true
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
user = insert(:user) user = insert(:user)
task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
task_user_notification = Task.async(fn -> assert_receive {:text, _}, 4_000 end)
Streamer.add_socket("user", %{transport_pid: task.pid, assigns: %{user: user}})
Streamer.add_socket( task =
"user:notification", Task.async(fn ->
%{transport_pid: task_user_notification.pid, assigns: %{user: user}} Streamer.add_socket("user", user)
) assert_receive {:render_with_user, _, _, _}, 4_000
end)
task_user_notification =
Task.async(fn ->
Streamer.add_socket("user:notification", user)
assert_receive {:render_with_user, _, _, _}, 4_000
end)
activity = insert(:note_activity) activity = insert(:note_activity)

View file

@ -21,7 +21,15 @@ def build(data \\ %{}, opts \\ %{}) do
def insert(data \\ %{}, opts \\ %{}) do def insert(data \\ %{}, opts \\ %{}) do
activity = build(data, opts) activity = build(data, opts)
ActivityPub.insert(activity)
case ActivityPub.insert(activity) do
ok = {:ok, activity} ->
ActivityPub.notify_and_stream(activity)
ok
error ->
error
end
end end
def insert_list(times, data \\ %{}, opts \\ %{}) do def insert_list(times, data \\ %{}, opts \\ %{}) do

View file

@ -139,7 +139,11 @@ defp ensure_federating_or_authenticated(conn, url, user) do
end end
if tags[:needs_streamer] do if tags[:needs_streamer] do
start_supervised(Pleroma.Web.Streamer.supervisor()) start_supervised(%{
id: Pleroma.Web.Streamer.registry(),
start:
{Registry, :start_link, [[keys: :duplicate, name: Pleroma.Web.Streamer.registry()]]}
})
end end
{:ok, conn: Phoenix.ConnTest.build_conn()} {:ok, conn: Phoenix.ConnTest.build_conn()}

View file

@ -40,7 +40,11 @@ defmodule Pleroma.DataCase do
end end
if tags[:needs_streamer] do if tags[:needs_streamer] do
start_supervised(Pleroma.Web.Streamer.supervisor()) start_supervised(%{
id: Pleroma.Web.Streamer.registry(),
start:
{Registry, :start_link, [[keys: :duplicate, name: Pleroma.Web.Streamer.registry()]]}
})
end end
:ok :ok

View file

@ -1,36 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.PingTest do
use Pleroma.DataCase
import Pleroma.Factory
alias Pleroma.Web.Streamer
setup do
start_supervised({Streamer.supervisor(), [ping_interval: 30]})
:ok
end
describe "sockets" do
setup do
user = insert(:user)
{:ok, %{user: user}}
end
test "it sends pings", %{user: user} do
task =
Task.async(fn ->
assert_receive {:text, received_event}, 40
assert_receive {:text, received_event}, 40
assert_receive {:text, received_event}, 40
end)
Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}})
Task.await(task)
end
end
end

View file

@ -1,54 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.StateTest do
use Pleroma.DataCase
import Pleroma.Factory
alias Pleroma.Web.Streamer
alias Pleroma.Web.Streamer.StreamerSocket
@moduletag needs_streamer: true
describe "sockets" do
setup do
user = insert(:user)
user2 = insert(:user)
{:ok, %{user: user, user2: user2}}
end
test "it can add a socket", %{user: user} do
Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets())
end
test "it can add multiple sockets per user", %{user: user} do
Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}})
assert(
%{
"public" => [
%StreamerSocket{transport_pid: 2},
%StreamerSocket{transport_pid: 1}
]
} = Streamer.get_sockets()
)
end
test "it will not add a duplicate socket", %{user: user} do
Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
assert(
%{
"activity" => [
%StreamerSocket{transport_pid: 1}
]
} = Streamer.get_sockets()
)
end
end
end

View file

@ -12,13 +12,9 @@ defmodule Pleroma.Web.StreamerTest do
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer alias Pleroma.Web.Streamer
alias Pleroma.Web.Streamer.StreamerSocket
alias Pleroma.Web.Streamer.Worker
@moduletag needs_streamer: true, capture_log: true @moduletag needs_streamer: true, capture_log: true
@streamer_timeout 150
@streamer_start_wait 10
setup do: clear_config([:instance, :skip_thread_containment]) setup do: clear_config([:instance, :skip_thread_containment])
describe "user streams" do describe "user streams" do
@ -29,69 +25,35 @@ defmodule Pleroma.Web.StreamerTest do
end end
test "it streams the user's post in the 'user' stream", %{user: user} do test "it streams the user's post in the 'user' stream", %{user: user} do
task = Streamer.add_socket("user", user)
Task.async(fn ->
assert_receive {:text, _}, @streamer_timeout
end)
Streamer.add_socket(
"user",
%{transport_pid: task.pid, assigns: %{user: user}}
)
{:ok, activity} = CommonAPI.post(user, %{"status" => "hey"}) {:ok, activity} = CommonAPI.post(user, %{"status" => "hey"})
assert_receive {:render_with_user, _, _, ^activity}
Streamer.stream("user", activity) refute Streamer.filtered_by_user?(user, activity)
Task.await(task)
end end
test "it streams boosts of the user in the 'user' stream", %{user: user} do test "it streams boosts of the user in the 'user' stream", %{user: user} do
task = Streamer.add_socket("user", user)
Task.async(fn ->
assert_receive {:text, _}, @streamer_timeout
end)
Streamer.add_socket(
"user",
%{transport_pid: task.pid, assigns: %{user: user}}
)
other_user = insert(:user) other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"}) {:ok, activity} = CommonAPI.post(other_user, %{"status" => "hey"})
{:ok, announce, _} = CommonAPI.repeat(activity.id, user) {:ok, announce, _} = CommonAPI.repeat(activity.id, user)
Streamer.stream("user", announce) assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
Task.await(task) refute Streamer.filtered_by_user?(user, announce)
end end
test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
task = Streamer.add_socket("user", user)
Task.async(fn ->
assert_receive {:text, _}, @streamer_timeout
end)
Streamer.add_socket(
"user",
%{transport_pid: task.pid, assigns: %{user: user}}
)
Streamer.stream("user", notify) Streamer.stream("user", notify)
Task.await(task) assert_receive {:render_with_user, _, _, ^notify}
refute Streamer.filtered_by_user?(user, notify)
end end
test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
task = Streamer.add_socket("user:notification", user)
Task.async(fn ->
assert_receive {:text, _}, @streamer_timeout
end)
Streamer.add_socket(
"user:notification",
%{transport_pid: task.pid, assigns: %{user: user}}
)
Streamer.stream("user:notification", notify) Streamer.stream("user:notification", notify)
Task.await(task) assert_receive {:render_with_user, _, _, ^notify}
refute Streamer.filtered_by_user?(user, notify)
end end
test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{ test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
@ -100,18 +62,12 @@ test "it doesn't send notify to the 'user:notification' stream when a user is bl
blocked = insert(:user) blocked = insert(:user)
{:ok, _user_relationship} = User.block(user, blocked) {:ok, _user_relationship} = User.block(user, blocked)
task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end) Streamer.add_socket("user:notification", user)
Streamer.add_socket(
"user:notification",
%{transport_pid: task.pid, assigns: %{user: user}}
)
{:ok, activity} = CommonAPI.post(user, %{"status" => ":("}) {:ok, activity} = CommonAPI.post(user, %{"status" => ":("})
{:ok, notif} = CommonAPI.favorite(blocked, activity.id) {:ok, _} = CommonAPI.favorite(blocked, activity.id)
Streamer.stream("user:notification", notif) refute_receive _
Task.await(task)
end end
test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{ test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
@ -119,45 +75,50 @@ test "it doesn't send notify to the 'user:notification' stream when a thread is
} do } do
user2 = insert(:user) user2 = insert(:user)
task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
Streamer.add_socket(
"user:notification",
%{transport_pid: task.pid, assigns: %{user: user}}
)
{:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
{:ok, activity} = CommonAPI.add_mute(user, activity) {:ok, _} = CommonAPI.add_mute(user, activity)
{:ok, notif} = CommonAPI.favorite(user2, activity.id)
Streamer.stream("user:notification", notif) Streamer.add_socket("user:notification", user)
Task.await(task)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
refute_receive _
assert Streamer.filtered_by_user?(user, favorite_activity)
end end
test "it doesn't send notify to the 'user:notification' stream' when a domain is blocked", %{ test "it sends favorite to 'user:notification' stream'", %{
user: user user: user
} do } do
user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"}) user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end) {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
Streamer.add_socket("user:notification", user)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
Streamer.add_socket( assert_receive {:render_with_user, _, "notification.json", notif}
"user:notification", assert notif.activity.id == favorite_activity.id
%{transport_pid: task.pid, assigns: %{user: user}} refute Streamer.filtered_by_user?(user, notif)
) end
test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
user: user
} do
user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
{:ok, user} = User.block_domain(user, "hecking-lewd-place.com") {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
{:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
{:ok, notif} = CommonAPI.favorite(user2, activity.id) Streamer.add_socket("user:notification", user)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
Streamer.stream("user:notification", notif) refute_receive _
Task.await(task) assert Streamer.filtered_by_user?(user, favorite_activity)
end end
test "it sends follow activities to the 'user:notification' stream", %{ test "it sends follow activities to the 'user:notification' stream", %{
user: user user: user
} do } do
user_url = user.ap_id user_url = user.ap_id
user2 = insert(:user)
body = body =
File.read!("test/fixtures/users_mock/localhost.json") File.read!("test/fixtures/users_mock/localhost.json")
@ -169,47 +130,24 @@ test "it sends follow activities to the 'user:notification' stream", %{
%Tesla.Env{status: 200, body: body} %Tesla.Env{status: 200, body: body}
end) end)
user2 = insert(:user) Streamer.add_socket("user:notification", user)
task = Task.async(fn -> assert_receive {:text, _}, @streamer_timeout end) {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
Process.sleep(@streamer_start_wait) assert_receive {:render_with_user, _, "notification.json", notif}
assert notif.activity.id == follow_activity.id
Streamer.add_socket( refute Streamer.filtered_by_user?(user, notif)
"user:notification",
%{transport_pid: task.pid, assigns: %{user: user}}
)
{:ok, _follower, _followed, _activity} = CommonAPI.follow(user2, user)
# We don't directly pipe the notification to the streamer as it's already
# generated as a side effect of CommonAPI.follow().
Task.await(task)
end end
end end
test "it sends to public" do test "it sends to public authenticated" do
user = insert(:user) user = insert(:user)
other_user = insert(:user) other_user = insert(:user)
task = Streamer.add_socket("public", other_user)
Task.async(fn ->
assert_receive {:text, _}, @streamer_timeout
end)
fake_socket = %StreamerSocket{ {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"})
transport_pid: task.pid, assert_receive {:render_with_user, _, _, ^activity}
user: user refute Streamer.filtered_by_user?(user, activity)
}
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
topics = %{
"public" => [fake_socket]
}
Worker.push_to_socket(topics, "public", activity)
Task.await(task)
end end
test "works for deletions" do test "works for deletions" do
@ -217,37 +155,32 @@ test "works for deletions" do
other_user = insert(:user) other_user = insert(:user)
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"}) {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
task = Streamer.add_socket("public", user)
Task.async(fn ->
expected_event =
%{
"event" => "delete",
"payload" => activity.id
}
|> Jason.encode!()
assert_receive {:text, received_event}, @streamer_timeout {:ok, _} = CommonAPI.delete(activity.id, other_user)
assert received_event == expected_event activity_id = activity.id
end) assert_receive {:text, event}
assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end
fake_socket = %StreamerSocket{ test "it sends to public unauthenticated" do
transport_pid: task.pid, user = insert(:user)
user: user
}
{:ok, activity} = CommonAPI.delete(activity.id, other_user) Streamer.add_socket("public", nil)
topics = %{ {:ok, activity} = CommonAPI.post(user, %{"status" => "Test"})
"public" => [fake_socket] activity_id = activity.id
} assert_receive {:text, event}
assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
assert %{"id" => ^activity_id} = Jason.decode!(payload)
Worker.push_to_socket(topics, "public", activity) {:ok, _} = CommonAPI.delete(activity.id, user)
assert_receive {:text, event}
Task.await(task) assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
end end
describe "thread_containment" do describe "thread_containment" do
test "it doesn't send to user if recipients invalid and thread containment is enabled" do test "it filters to user if recipients invalid and thread containment is enabled" do
Pleroma.Config.put([:instance, :skip_thread_containment], false) Pleroma.Config.put([:instance, :skip_thread_containment], false)
author = insert(:user) author = insert(:user)
user = insert(:user) user = insert(:user)
@ -262,12 +195,10 @@ test "it doesn't send to user if recipients invalid and thread containment is en
) )
) )
task = Task.async(fn -> refute_receive {:text, _}, 1_000 end) Streamer.add_socket("public", user)
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} Streamer.stream("public", activity)
topics = %{"public" => [fake_socket]} assert_receive {:render_with_user, _, _, ^activity}
Worker.push_to_socket(topics, "public", activity) assert Streamer.filtered_by_user?(user, activity)
Task.await(task)
end end
test "it sends message if recipients invalid and thread containment is disabled" do test "it sends message if recipients invalid and thread containment is disabled" do
@ -285,12 +216,11 @@ test "it sends message if recipients invalid and thread containment is disabled"
) )
) )
task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) Streamer.add_socket("public", user)
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} Streamer.stream("public", activity)
topics = %{"public" => [fake_socket]}
Worker.push_to_socket(topics, "public", activity)
Task.await(task) assert_receive {:render_with_user, _, _, ^activity}
refute Streamer.filtered_by_user?(user, activity)
end end
test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do test "it sends message if recipients invalid and thread containment is enabled but user's thread containment is disabled" do
@ -308,79 +238,53 @@ test "it sends message if recipients invalid and thread containment is enabled b
) )
) )
task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) Streamer.add_socket("public", user)
fake_socket = %StreamerSocket{transport_pid: task.pid, user: user} Streamer.stream("public", activity)
topics = %{"public" => [fake_socket]}
Worker.push_to_socket(topics, "public", activity)
Task.await(task) assert_receive {:render_with_user, _, _, ^activity}
refute Streamer.filtered_by_user?(user, activity)
end end
end end
describe "blocks" do describe "blocks" do
test "it doesn't send messages involving blocked users" do test "it filters messages involving blocked users" do
user = insert(:user) user = insert(:user)
blocked_user = insert(:user) blocked_user = insert(:user)
{:ok, _user_relationship} = User.block(user, blocked_user) {:ok, _user_relationship} = User.block(user, blocked_user)
Streamer.add_socket("public", user)
{:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"}) {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
assert_receive {:render_with_user, _, _, ^activity}
task = assert Streamer.filtered_by_user?(user, activity)
Task.async(fn ->
refute_receive {:text, _}, 1_000
end)
fake_socket = %StreamerSocket{
transport_pid: task.pid,
user: user
}
topics = %{
"public" => [fake_socket]
}
Worker.push_to_socket(topics, "public", activity)
Task.await(task)
end end
test "it doesn't send messages transitively involving blocked users" do test "it filters messages transitively involving blocked users" do
blocker = insert(:user) blocker = insert(:user)
blockee = insert(:user) blockee = insert(:user)
friend = insert(:user) friend = insert(:user)
task = Streamer.add_socket("public", blocker)
Task.async(fn ->
refute_receive {:text, _}, 1_000
end)
fake_socket = %StreamerSocket{
transport_pid: task.pid,
user: blocker
}
topics = %{
"public" => [fake_socket]
}
{:ok, _user_relationship} = User.block(blocker, blockee) {:ok, _user_relationship} = User.block(blocker, blockee)
{:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"}) {:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})
Worker.push_to_socket(topics, "public", activity_one) assert_receive {:render_with_user, _, _, ^activity_one}
assert Streamer.filtered_by_user?(blocker, activity_one)
{:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"}) {:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})
Worker.push_to_socket(topics, "public", activity_two) assert_receive {:render_with_user, _, _, ^activity_two}
assert Streamer.filtered_by_user?(blocker, activity_two)
{:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"}) {:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})
Worker.push_to_socket(topics, "public", activity_three) assert_receive {:render_with_user, _, _, ^activity_three}
assert Streamer.filtered_by_user?(blocker, activity_three)
Task.await(task)
end end
end end
describe "lists" do
test "it doesn't send unwanted DMs to list" do test "it doesn't send unwanted DMs to list" do
user_a = insert(:user) user_a = insert(:user)
user_b = insert(:user) user_b = insert(:user)
@ -391,29 +295,15 @@ test "it doesn't send unwanted DMs to list" do
{:ok, list} = List.create("Test", user_a) {:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b) {:ok, list} = List.follow(list, user_b)
{:ok, activity} = Streamer.add_socket("list:#{list.id}", user_a)
{:ok, _activity} =
CommonAPI.post(user_b, %{ CommonAPI.post(user_b, %{
"status" => "@#{user_c.nickname} Test", "status" => "@#{user_c.nickname} Test",
"visibility" => "direct" "visibility" => "direct"
}) })
task = refute_receive _
Task.async(fn ->
refute_receive {:text, _}, 1_000
end)
fake_socket = %StreamerSocket{
transport_pid: task.pid,
user: user_a
}
topics = %{
"list:#{list.id}" => [fake_socket]
}
Worker.handle_call({:stream, "list", activity}, self(), topics)
Task.await(task)
end end
test "it doesn't send unwanted private posts to list" do test "it doesn't send unwanted private posts to list" do
@ -423,29 +313,15 @@ test "it doesn't send unwanted private posts to list" do
{:ok, list} = List.create("Test", user_a) {:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b) {:ok, list} = List.follow(list, user_b)
{:ok, activity} = Streamer.add_socket("list:#{list.id}", user_a)
{:ok, _activity} =
CommonAPI.post(user_b, %{ CommonAPI.post(user_b, %{
"status" => "Test", "status" => "Test",
"visibility" => "private" "visibility" => "private"
}) })
task = refute_receive _
Task.async(fn ->
refute_receive {:text, _}, 1_000
end)
fake_socket = %StreamerSocket{
transport_pid: task.pid,
user: user_a
}
topics = %{
"list:#{list.id}" => [fake_socket]
}
Worker.handle_call({:stream, "list", activity}, self(), topics)
Task.await(task)
end end
test "it sends wanted private posts to list" do test "it sends wanted private posts to list" do
@ -457,106 +333,73 @@ test "it sends wanted private posts to list" do
{:ok, list} = List.create("Test", user_a) {:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b) {:ok, list} = List.follow(list, user_b)
Streamer.add_socket("list:#{list.id}", user_a)
{:ok, activity} = {:ok, activity} =
CommonAPI.post(user_b, %{ CommonAPI.post(user_b, %{
"status" => "Test", "status" => "Test",
"visibility" => "private" "visibility" => "private"
}) })
task = assert_receive {:render_with_user, _, _, ^activity}
Task.async(fn -> refute Streamer.filtered_by_user?(user_a, activity)
assert_receive {:text, _}, 1_000 end
end)
fake_socket = %StreamerSocket{
transport_pid: task.pid,
user: user_a
}
Streamer.add_socket(
"list:#{list.id}",
fake_socket
)
Worker.handle_call({:stream, "list", activity}, self(), %{})
Task.await(task)
end end
test "it doesn't send muted reblogs" do describe "muted reblogs" do
test "it filters muted reblogs" do
user1 = insert(:user) user1 = insert(:user)
user2 = insert(:user) user2 = insert(:user)
user3 = insert(:user) user3 = insert(:user)
CommonAPI.follow(user1, user2)
CommonAPI.hide_reblogs(user1, user2) CommonAPI.hide_reblogs(user1, user2)
{:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"}) {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
Streamer.add_socket("user", user1)
{:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2) {:ok, announce_activity, _} = CommonAPI.repeat(create_activity.id, user2)
assert_receive {:render_with_user, _, _, ^announce_activity}
task = assert Streamer.filtered_by_user?(user1, announce_activity)
Task.async(fn ->
refute_receive {:text, _}, 1_000
end)
fake_socket = %StreamerSocket{
transport_pid: task.pid,
user: user1
}
topics = %{
"public" => [fake_socket]
}
Worker.push_to_socket(topics, "public", announce_activity)
Task.await(task)
end end
test "it does send non-reblog notification for reblog-muted actors" do test "it filters reblog notification for reblog-muted actors" do
user1 = insert(:user) user1 = insert(:user)
user2 = insert(:user) user2 = insert(:user)
user3 = insert(:user) CommonAPI.follow(user1, user2)
CommonAPI.hide_reblogs(user1, user2) CommonAPI.hide_reblogs(user1, user2)
{:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"}) {:ok, create_activity} = CommonAPI.post(user1, %{"status" => "I'm kawen"})
{:ok, favorite_activity} = CommonAPI.favorite(user2, create_activity.id) Streamer.add_socket("user", user1)
{:ok, _favorite_activity, _} = CommonAPI.repeat(create_activity.id, user2)
task = assert_receive {:render_with_user, _, "notification.json", notif}
Task.async(fn -> assert Streamer.filtered_by_user?(user1, notif)
assert_receive {:text, _}, 1_000
end)
fake_socket = %StreamerSocket{
transport_pid: task.pid,
user: user1
}
topics = %{
"public" => [fake_socket]
}
Worker.push_to_socket(topics, "public", favorite_activity)
Task.await(task)
end end
test "it doesn't send posts from muted threads" do test "it send non-reblog notification for reblog-muted actors" do
user1 = insert(:user)
user2 = insert(:user)
CommonAPI.follow(user1, user2)
CommonAPI.hide_reblogs(user1, user2)
{:ok, create_activity} = CommonAPI.post(user1, %{"status" => "I'm kawen"})
Streamer.add_socket("user", user1)
{:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
assert_receive {:render_with_user, _, "notification.json", notif}
refute Streamer.filtered_by_user?(user1, notif)
end
end
test "it filters posts from muted threads" do
user = insert(:user) user = insert(:user)
user2 = insert(:user) user2 = insert(:user)
Streamer.add_socket("user", user2)
{:ok, user2, user, _activity} = CommonAPI.follow(user2, user) {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
{:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"}) {:ok, activity} = CommonAPI.post(user, %{"status" => "super hot take"})
{:ok, _} = CommonAPI.add_mute(user2, activity)
{:ok, activity} = CommonAPI.add_mute(user2, activity) assert_receive {:render_with_user, _, _, ^activity}
assert Streamer.filtered_by_user?(user2, activity)
task = Task.async(fn -> refute_receive {:text, _}, @streamer_timeout end)
Streamer.add_socket(
"user",
%{transport_pid: task.pid, assigns: %{user: user2}}
)
Streamer.stream("user", activity)
Task.await(task)
end end
describe "direct streams" do describe "direct streams" do
@ -568,22 +411,7 @@ test "it sends conversation update to the 'direct' stream", %{} do
user = insert(:user) user = insert(:user)
another_user = insert(:user) another_user = insert(:user)
task = Streamer.add_socket("direct", user)
Task.async(fn ->
assert_receive {:text, received_event}, @streamer_timeout
assert %{"event" => "conversation", "payload" => received_payload} =
Jason.decode!(received_event)
assert %{"last_status" => last_status} = Jason.decode!(received_payload)
[participation] = Participation.for_user(user)
assert last_status["pleroma"]["direct_conversation_id"] == participation.id
end)
Streamer.add_socket(
"direct",
%{transport_pid: task.pid, assigns: %{user: user}}
)
{:ok, _create_activity} = {:ok, _create_activity} =
CommonAPI.post(another_user, %{ CommonAPI.post(another_user, %{
@ -591,42 +419,47 @@ test "it sends conversation update to the 'direct' stream", %{} do
"visibility" => "direct" "visibility" => "direct"
}) })
Task.await(task) assert_receive {:text, received_event}
assert %{"event" => "conversation", "payload" => received_payload} =
Jason.decode!(received_event)
assert %{"last_status" => last_status} = Jason.decode!(received_payload)
[participation] = Participation.for_user(user)
assert last_status["pleroma"]["direct_conversation_id"] == participation.id
end end
test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
user = insert(:user) user = insert(:user)
another_user = insert(:user) another_user = insert(:user)
Streamer.add_socket("direct", user)
{:ok, create_activity} = {:ok, create_activity} =
CommonAPI.post(another_user, %{ CommonAPI.post(another_user, %{
"status" => "hi @#{user.nickname}", "status" => "hi @#{user.nickname}",
"visibility" => "direct" "visibility" => "direct"
}) })
task = create_activity_id = create_activity.id
Task.async(fn -> assert_receive {:render_with_user, _, _, ^create_activity}
assert_receive {:text, received_event}, @streamer_timeout assert_receive {:text, received_conversation1}
assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event) assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
refute_receive {:text, _}, @streamer_timeout {:ok, _} = CommonAPI.delete(create_activity_id, another_user)
end)
Process.sleep(@streamer_start_wait) assert_receive {:text, received_event}
Streamer.add_socket( assert %{"event" => "delete", "payload" => ^create_activity_id} =
"direct", Jason.decode!(received_event)
%{transport_pid: task.pid, assigns: %{user: user}}
)
{:ok, _} = CommonAPI.delete(create_activity.id, another_user) refute_receive _
Task.await(task)
end end
test "it sends conversation update to the 'direct' stream when a message is deleted" do test "it sends conversation update to the 'direct' stream when a message is deleted" do
user = insert(:user) user = insert(:user)
another_user = insert(:user) another_user = insert(:user)
Streamer.add_socket("direct", user)
{:ok, create_activity} = {:ok, create_activity} =
CommonAPI.post(another_user, %{ CommonAPI.post(another_user, %{
@ -636,35 +469,30 @@ test "it sends conversation update to the 'direct' stream when a message is dele
{:ok, create_activity2} = {:ok, create_activity2} =
CommonAPI.post(another_user, %{ CommonAPI.post(another_user, %{
"status" => "hi @#{user.nickname}", "status" => "hi @#{user.nickname} 2",
"in_reply_to_status_id" => create_activity.id, "in_reply_to_status_id" => create_activity.id,
"visibility" => "direct" "visibility" => "direct"
}) })
task = assert_receive {:render_with_user, _, _, ^create_activity}
Task.async(fn -> assert_receive {:render_with_user, _, _, ^create_activity2}
assert_receive {:text, received_event}, @streamer_timeout assert_receive {:text, received_conversation1}
assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
assert_receive {:text, received_conversation1}
assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
{:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
assert_receive {:text, received_event}
assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event) assert %{"event" => "delete", "payload" => _} = Jason.decode!(received_event)
assert_receive {:text, received_event}, @streamer_timeout assert_receive {:text, received_event}
assert %{"event" => "conversation", "payload" => received_payload} = assert %{"event" => "conversation", "payload" => received_payload} =
Jason.decode!(received_event) Jason.decode!(received_event)
assert %{"last_status" => last_status} = Jason.decode!(received_payload) assert %{"last_status" => last_status} = Jason.decode!(received_payload)
assert last_status["id"] == to_string(create_activity.id) assert last_status["id"] == to_string(create_activity.id)
end)
Process.sleep(@streamer_start_wait)
Streamer.add_socket(
"direct",
%{transport_pid: task.pid, assigns: %{user: user}}
)
{:ok, _} = CommonAPI.delete(create_activity2.id, another_user)
Task.await(task)
end end
end end
end end