Merge branch 'bugfix/widen-streamer-blocks-for-1.1' into 'maint/1.1'

widen streaming API blocks (for 1.1)

See merge request pleroma/pleroma!1785
This commit is contained in:
kaniini 2019-10-04 17:39:28 +00:00
commit ca6f1644aa
27 changed files with 1030 additions and 495 deletions

4
.gitignore vendored
View file

@ -43,3 +43,7 @@ docs/generated_config.md
# Code test coverage # Code test coverage
/cover /cover
/Elixir.*.coverdata /Elixir.*.coverdata
.idea
pleroma.iml

View file

@ -69,6 +69,7 @@ Fixed:
- ActivityPub: Deactivated user deletion - ActivityPub: Deactivated user deletion
- ActivityPub: Fix `/users/:nickname/inbox` crashing without an authenticated user - ActivityPub: Fix `/users/:nickname/inbox` crashing without an authenticated user
- MRF: fix ability to follow a relay when AntiFollowbotPolicy was enabled - MRF: fix ability to follow a relay when AntiFollowbotPolicy was enabled
- Mastodon API: Blocks are now treated consistently between the Streaming API and the Timeline APIs
### Added ### Added
- Expiring/ephemeral activites. All activities can have expires_at value set, which controls when they should be deleted automatically. - Expiring/ephemeral activites. All activities can have expires_at value set, which controls when they should be deleted automatically.

View file

@ -313,6 +313,10 @@
follow_handshake_timeout: 500, follow_handshake_timeout: 500,
sign_object_fetches: true sign_object_fetches: true
config :pleroma, :streamer,
workers: 3,
overflow_workers: 2
config :pleroma, :user, deny_follow_blocked: true config :pleroma, :user, deny_follow_blocked: true
config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default config :pleroma, :mrf_normalize_markup, scrub_policy: Pleroma.HTML.Scrubber.Default

View file

@ -0,0 +1,63 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Activity.Ir.Topics do
alias Pleroma.Object
alias Pleroma.Web.ActivityPub.Visibility
def get_activity_topics(activity) do
activity
|> Object.normalize()
|> generate_topics(activity)
|> List.flatten()
end
defp generate_topics(%{data: %{"type" => "Answer"}}, _) do
[]
end
defp generate_topics(object, activity) do
["user", "list"] ++ visibility_tags(object, activity)
end
defp visibility_tags(object, activity) do
case Visibility.get_visibility(activity) do
"public" ->
if activity.local do
["public", "public:local"]
else
["public"]
end
|> item_creation_tags(object, activity)
"direct" ->
["direct"]
_ ->
[]
end
end
defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do
tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity)
end
defp item_creation_tags(tags, _, _) do
tags
end
defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
tags
|> Enum.filter(&is_bitstring(&1))
|> Enum.map(fn tag -> "hashtag:" <> tag end)
end
defp hashtags_to_topics(_), do: []
defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: []
defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"]
defp attachment_topics(_object, _act), do: ["public:media"]
end

View file

@ -42,23 +42,9 @@ def start(_type, _args) do
hackney_pool_children() ++ hackney_pool_children() ++
[ [
Pleroma.Web.Federator.RetryQueue, Pleroma.Web.Federator.RetryQueue,
Pleroma.Stats, Pleroma.Stats
%{
id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
restart: :temporary
},
%{
id: :federator_init,
start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]},
restart: :temporary
},
%{
id: :internal_fetch_init,
start: {Task, :start_link, [&Pleroma.Web.ActivityPub.InternalFetchActor.init/0]},
restart: :temporary
}
] ++ ] ++
task_children(@env) ++
oauth_cleanup_child(oauth_cleanup_enabled?()) ++ oauth_cleanup_child(oauth_cleanup_enabled?()) ++
streamer_child(@env) ++ streamer_child(@env) ++
chat_child(@env, chat_enabled?()) ++ chat_child(@env, chat_enabled?()) ++
@ -70,9 +56,7 @@ def start(_type, _args) do
# See http://elixir-lang.org/docs/stable/elixir/Supervisor.html # See http://elixir-lang.org/docs/stable/elixir/Supervisor.html
# for other strategies and supported options # for other strategies and supported options
opts = [strategy: :one_for_one, name: Pleroma.Supervisor] opts = [strategy: :one_for_one, name: Pleroma.Supervisor]
result = Supervisor.start_link(children, opts) Supervisor.start_link(children, opts)
:ok = after_supervisor_start()
result
end end
defp setup_instrumenters do defp setup_instrumenters do
@ -142,7 +126,7 @@ defp oauth_cleanup_enabled?,
defp streamer_child(:test), do: [] defp streamer_child(:test), do: []
defp streamer_child(_) do defp streamer_child(_) do
[Pleroma.Web.Streamer] [Pleroma.Web.Streamer.supervisor()]
end end
defp oauth_cleanup_child(true), defp oauth_cleanup_child(true),
@ -165,16 +149,38 @@ defp hackney_pool_children do
end end
end end
defp after_supervisor_start do defp task_children(:test) do
with digest_config <- Application.get_env(:pleroma, :email_notifications)[:digest], [
true <- digest_config[:active] do %{
PleromaJobQueue.schedule( id: :web_push_init,
digest_config[:schedule], start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
:digest_emails, restart: :temporary
Pleroma.DigestEmailWorker },
) %{
end id: :federator_init,
start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]},
restart: :temporary
}
]
end
:ok defp task_children(_) do
[
%{
id: :web_push_init,
start: {Task, :start_link, [&Pleroma.Web.Push.init/0]},
restart: :temporary
},
%{
id: :federator_init,
start: {Task, :start_link, [&Pleroma.Web.Federator.init/0]},
restart: :temporary
},
%{
id: :internal_fetch_init,
start: {Task, :start_link, [&Pleroma.Web.ActivityPub.InternalFetchActor.init/0]},
restart: :temporary
}
]
end end
end end

View file

@ -210,8 +210,10 @@ def create_notification(%Activity{} = activity, %User{} = user) do
unless skip?(activity, user) do unless skip?(activity, user) do
notification = %Notification{user_id: user.id, activity: activity} notification = %Notification{user_id: user.id, activity: activity}
{:ok, notification} = Repo.insert(notification) {:ok, notification} = Repo.insert(notification)
Streamer.stream("user", notification)
Streamer.stream("user:notification", notification) ["user", "user:notification"]
|> Streamer.stream(notification)
Push.send(notification) Push.send(notification)
notification notification
end end

View file

@ -4,6 +4,7 @@
defmodule Pleroma.Web.ActivityPub.ActivityPub do defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Activity.Ir.Topics
alias Pleroma.Config alias Pleroma.Config
alias Pleroma.Conversation alias Pleroma.Conversation
alias Pleroma.Notification alias Pleroma.Notification
@ -16,6 +17,7 @@ defmodule Pleroma.Web.ActivityPub.ActivityPub do
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.MRF
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
alias Pleroma.Web.Streamer
alias Pleroma.Web.WebFinger alias Pleroma.Web.WebFinger
import Ecto.Query import Ecto.Query
@ -186,9 +188,7 @@ def stream_out_participations(participations) do
participations participations
|> Repo.preload(:user) |> Repo.preload(:user)
Enum.each(participations, fn participation -> Streamer.stream("participation", participations)
Pleroma.Web.Streamer.stream("participation", participation)
end)
end end
def stream_out_participations(%Object{data: %{"context" => context}}, user) do def stream_out_participations(%Object{data: %{"context" => context}}, user) do
@ -207,41 +207,15 @@ def stream_out_participations(%Object{data: %{"context" => context}}, user) do
def stream_out_participations(_, _), do: :noop def stream_out_participations(_, _), do: :noop
def stream_out(activity) do def stream_out(%Activity{data: %{"type" => data_type}} = activity)
if activity.data["type"] in ["Create", "Announce", "Delete"] do when data_type in ["Create", "Announce", "Delete"] do
object = Object.normalize(activity) activity
# Do not stream out poll replies |> Topics.get_activity_topics()
unless object.data["type"] == "Answer" do |> Streamer.stream(activity)
Pleroma.Web.Streamer.stream("user", activity) end
Pleroma.Web.Streamer.stream("list", activity)
if get_visibility(activity) == "public" do def stream_out(_activity) do
Pleroma.Web.Streamer.stream("public", activity) :noop
if activity.local do
Pleroma.Web.Streamer.stream("public:local", activity)
end
if activity.data["type"] in ["Create"] do
object.data
|> Map.get("tag", [])
|> Enum.filter(fn tag -> is_bitstring(tag) end)
|> Enum.each(fn tag -> Pleroma.Web.Streamer.stream("hashtag:" <> tag, activity) end)
if object.data["attachment"] != [] do
Pleroma.Web.Streamer.stream("public:media", activity)
if activity.local do
Pleroma.Web.Streamer.stream("public:local:media", activity)
end
end
end
else
if get_visibility(activity) == "direct",
do: Pleroma.Web.Streamer.stream("direct", activity)
end
end
end
end end
def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do def create(%{to: to, actor: actor, context: context, object: object} = params, fake \\ false) do

View file

@ -8,6 +8,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
alias Pleroma.Repo alias Pleroma.Repo
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.OAuth.Token alias Pleroma.Web.OAuth.Token
alias Pleroma.Web.Streamer
@behaviour :cowboy_websocket @behaviour :cowboy_websocket
@ -24,7 +25,7 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
] ]
@anonymous_streams ["public", "public:local", "hashtag"] @anonymous_streams ["public", "public:local", "hashtag"]
# Handled by periodic keepalive in Pleroma.Web.Streamer. # Handled by periodic keepalive in Pleroma.Web.Streamer.Ping.
@timeout :infinity @timeout :infinity
def init(%{qs: qs} = req, state) do def init(%{qs: qs} = req, state) do
@ -65,7 +66,7 @@ def websocket_info(:subscribe, state) do
}, topic #{state.topic}" }, topic #{state.topic}"
) )
Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state)) Streamer.add_socket(state.topic, streamer_socket(state))
{:ok, state} {:ok, state}
end end
@ -80,7 +81,7 @@ def terminate(reason, _req, state) do
}, topic #{state.topic || "?"}: #{inspect(reason)}" }, topic #{state.topic || "?"}: #{inspect(reason)}"
) )
Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state)) Streamer.remove_socket(state.topic, streamer_socket(state))
:ok :ok
end end

View file

@ -1,318 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer do
use GenServer
require Logger
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.MastodonAPI.NotificationView
@keepalive_interval :timer.seconds(30)
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def add_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
end
def remove_socket(topic, socket) do
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
end
def stream(topic, item) do
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
end
def init(args) do
Process.send_after(self(), %{action: :ping}, @keepalive_interval)
{:ok, args}
end
def handle_info(%{action: :ping}, topics) do
topics
|> Map.values()
|> List.flatten()
|> Enum.each(fn socket ->
Logger.debug("Sending keepalive ping")
send(socket.transport_pid, {:text, ""})
end)
Process.send_after(self(), %{action: :ping}, @keepalive_interval)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
Enum.each(recipient_topics || [], fn user_topic ->
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
push_to_socket(topics, user_topic, item)
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
push_to_socket(topics, user_topic, participation)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
case Visibility.is_public?(item) do
true ->
Pleroma.List.get_lists_from_activity(item)
_ ->
Pleroma.List.get_lists_from_activity(item)
|> Enum.filter(fn list ->
owner = User.get_cached_by_id(list.user_id)
Visibility.visible_for_user?(item, owner)
end)
end
recipient_topics =
recipient_lists
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
Enum.each(recipient_topics || [], fn list_topic ->
Logger.debug("Trying to push message to #{list_topic}\n\n")
push_to_socket(topics, list_topic, item)
end)
{:noreply, topics}
end
def handle_cast(
%{action: :stream, topic: topic, item: %Notification{} = item},
topics
)
when topic in ["user", "user:notification"] do
topics
|> Map.get("#{topic}:#{item.user_id}", [])
|> Enum.each(fn socket ->
with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
true <- should_send?(user, item) do
send(
socket.transport_pid,
{:text, represent_notification(socket.assigns[:user], item)}
)
end
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
Logger.debug("Trying to push to users")
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
Enum.each(recipient_topics, fn topic ->
push_to_socket(topics, topic, item)
end)
{:noreply, topics}
end
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
push_to_socket(topics, topic, item)
{:noreply, topics}
end
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Got new conn for #{topic}")
{:noreply, sockets}
end
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
topic = internal_topic(topic, socket)
sockets_for_topic = sockets[topic] || []
sockets_for_topic = List.delete(sockets_for_topic, socket)
sockets = Map.put(sockets, topic, sockets_for_topic)
Logger.debug("Removed conn for #{topic}")
{:noreply, sockets}
end
def handle_cast(m, state) do
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
{:noreply, state}
end
defp represent_update(%Activity{} = activity, %User{} = user) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity,
for: user
)
|> Jason.encode!()
}
|> Jason.encode!()
end
defp represent_update(%Activity{} = activity) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity
)
|> Jason.encode!()
}
|> Jason.encode!()
end
def represent_conversation(%Participation{} = participation) do
%{
event: "conversation",
payload:
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
participation: participation,
for: participation.user
})
|> Jason.encode!()
}
|> Jason.encode!()
end
@spec represent_notification(User.t(), Notification.t()) :: binary()
defp represent_notification(%User{} = user, %Notification{} = notify) do
%{
event: "notification",
payload:
NotificationView.render(
"show.json",
%{notification: notify, for: user}
)
|> Jason.encode!()
}
|> Jason.encode!()
end
defp should_send?(%User{} = user, %Activity{} = item) do
blocks = user.info.blocks || []
mutes = user.info.mutes || []
reblog_mutes = user.info.muted_reblogs || []
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
with parent when not is_nil(parent) <- Object.normalize(item),
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
%{host: item_host} <- URI.parse(item.actor),
%{host: parent_host} <- URI.parse(parent.data["actor"]),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
true <- thread_containment(item, user),
false <- CommonAPI.thread_muted?(user, item) do
true
else
_ -> false
end
end
defp should_send?(%User{} = user, %Notification{activity: activity}) do
should_send?(user, activity)
end
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
if socket.assigns[:user] do
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
if should_send?(user, item) do
send(socket.transport_pid, {:text, represent_update(item, user)})
end
else
send(socket.transport_pid, {:text, represent_update(item)})
end
end)
end
def push_to_socket(topics, topic, %Participation{} = participation) do
Enum.each(topics[topic] || [], fn socket ->
send(socket.transport_pid, {:text, represent_conversation(participation)})
end)
end
def push_to_socket(topics, topic, %Activity{
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
}) do
Enum.each(topics[topic] || [], fn socket ->
send(
socket.transport_pid,
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
)
end)
end
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc.
if socket.assigns[:user] do
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
blocks = user.info.blocks || []
mutes = user.info.mutes || []
with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
true <- thread_containment(item, user) do
send(socket.transport_pid, {:text, represent_update(item, user)})
end
else
send(socket.transport_pid, {:text, represent_update(item)})
end
end)
end
defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
"#{topic}:#{socket.assigns[:user].id}"
end
defp internal_topic(topic, _), do: topic
@spec thread_containment(Activity.t(), User.t()) :: boolean()
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
defp thread_containment(activity, user) do
if Config.get([:instance, :skip_thread_containment]) do
true
else
ActivityPub.contain_activity(activity, user)
end
end
end

View file

@ -0,0 +1,37 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer.Ping do
use GenServer
require Logger
alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.StreamerSocket
@keepalive_interval :timer.seconds(30)
def start_link(opts) do
ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
end
def init(%{ping_interval: ping_interval} = args) do
Process.send_after(self(), :ping, ping_interval)
{:ok, args}
end
def handle_info(:ping, %{ping_interval: ping_interval} = state) do
State.get_sockets()
|> Map.values()
|> List.flatten()
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
Logger.debug("Sending keepalive ping")
send(transport_pid, {:text, ""})
end)
Process.send_after(self(), :ping, ping_interval)
{:noreply, state}
end
end

View file

@ -0,0 +1,82 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer.State do
use GenServer
require Logger
alias Pleroma.Web.Streamer.StreamerSocket
@env Mix.env()
def start_link(_) do
GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
end
def add_socket(topic, socket) do
GenServer.call(__MODULE__, {:add, topic, socket})
end
def remove_socket(topic, socket) do
do_remove_socket(@env, topic, socket)
end
def get_sockets do
%{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
stream_sockets
end
def init(init_arg) do
{:ok, init_arg}
end
def handle_call(:get_state, _from, state) do
{:reply, state, state}
end
def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
internal_topic = internal_topic(topic, socket)
stream_socket = StreamerSocket.from_socket(socket)
sockets_for_topic =
sockets
|> Map.get(internal_topic, [])
|> List.insert_at(0, stream_socket)
|> Enum.uniq()
state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
Logger.debug("Got new conn for #{topic}")
{:reply, state, state}
end
def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
internal_topic = internal_topic(topic, socket)
stream_socket = StreamerSocket.from_socket(socket)
sockets_for_topic =
sockets
|> Map.get(internal_topic, [])
|> List.delete(stream_socket)
state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
{:reply, state, state}
end
defp do_remove_socket(:test, _, _) do
:ok
end
defp do_remove_socket(_env, topic, socket) do
GenServer.call(__MODULE__, {:remove, topic, socket})
end
defp internal_topic(topic, socket)
when topic in ~w[user user:notification direct] do
"#{topic}:#{socket.assigns[:user].id}"
end
defp internal_topic(topic, _) do
topic
end
end

View file

@ -0,0 +1,55 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer do
alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.Worker
@timeout 60_000
@mix_env Mix.env()
def add_socket(topic, socket) do
State.add_socket(topic, socket)
end
def remove_socket(topic, socket) do
State.remove_socket(topic, socket)
end
def get_sockets do
State.get_sockets()
end
def stream(topics, items) do
if should_send?() do
Task.async(fn ->
:poolboy.transaction(
:streamer_worker,
&Worker.stream(&1, topics, items),
@timeout
)
end)
end
end
def supervisor, do: Pleroma.Web.Streamer.Supervisor
defp should_send? do
handle_should_send(@mix_env)
end
defp handle_should_send(:test) do
case Process.whereis(:streamer_worker) do
nil ->
false
pid ->
Process.alive?(pid)
end
end
defp handle_should_send(_) do
true
end
end

View file

@ -0,0 +1,35 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer.StreamerSocket do
defstruct transport_pid: nil, user: nil
alias Pleroma.User
alias Pleroma.Web.Streamer.StreamerSocket
def from_socket(%{
transport_pid: transport_pid,
assigns: %{user: nil}
}) do
%StreamerSocket{
transport_pid: transport_pid
}
end
def from_socket(%{
transport_pid: transport_pid,
assigns: %{user: %User{} = user}
}) do
%StreamerSocket{
transport_pid: transport_pid,
user: user
}
end
def from_socket(%{transport_pid: transport_pid}) do
%StreamerSocket{
transport_pid: transport_pid
}
end
end

View file

@ -0,0 +1,37 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer.Supervisor do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(args) do
children = [
{Pleroma.Web.Streamer.State, args},
{Pleroma.Web.Streamer.Ping, args},
:poolboy.child_spec(:streamer_worker, poolboy_config())
]
opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
Supervisor.init(children, opts)
end
defp poolboy_config do
opts =
Pleroma.Config.get(:streamer,
workers: 3,
overflow_workers: 2
)
[
{:name, {:local, :streamer_worker}},
{:worker_module, Pleroma.Web.Streamer.Worker},
{:size, opts[:workers]},
{:max_overflow, opts[:overflow_workers]}
]
end
end

View file

@ -0,0 +1,224 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.Streamer.Worker do
use GenServer
require Logger
alias Pleroma.Activity
alias Pleroma.Config
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.Object
alias Pleroma.User
alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Visibility
alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer.State
alias Pleroma.Web.Streamer.StreamerSocket
alias Pleroma.Web.StreamerView
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, [])
end
def init(init_arg) do
{:ok, init_arg}
end
def stream(pid, topics, items) do
GenServer.call(pid, {:stream, topics, items})
end
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
Enum.each(topics, fn t ->
do_stream(%{topic: t, item: item})
end)
{:reply, state, state}
end
def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
Enum.each(items, fn i ->
do_stream(%{topic: topic, item: i})
end)
{:reply, state, state}
end
def handle_call({:stream, topic, item}, _from, state) do
do_stream(%{topic: topic, item: item})
{:reply, state, state}
end
defp do_stream(%{topic: "direct", item: item}) do
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
Enum.each(recipient_topics, fn user_topic ->
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
push_to_socket(State.get_sockets(), user_topic, item)
end)
end
defp do_stream(%{topic: "participation", item: participation}) do
user_topic = "direct:#{participation.user_id}"
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
push_to_socket(State.get_sockets(), user_topic, participation)
end
defp do_stream(%{topic: "list", item: item}) do
# filter the recipient list if the activity is not public, see #270.
recipient_lists =
case Visibility.is_public?(item) do
true ->
Pleroma.List.get_lists_from_activity(item)
_ ->
Pleroma.List.get_lists_from_activity(item)
|> Enum.filter(fn list ->
owner = User.get_cached_by_id(list.user_id)
Visibility.visible_for_user?(item, owner)
end)
end
recipient_topics =
recipient_lists
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
Enum.each(recipient_topics, fn list_topic ->
Logger.debug("Trying to push message to #{list_topic}\n\n")
push_to_socket(State.get_sockets(), list_topic, item)
end)
end
defp do_stream(%{topic: topic, item: %Notification{} = item})
when topic in ["user", "user:notification"] do
State.get_sockets()
|> Map.get("#{topic}:#{item.user_id}", [])
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
true <- should_send?(user, item) do
send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
end
end)
end
defp do_stream(%{topic: "user", item: item}) do
Logger.debug("Trying to push to users")
recipient_topics =
User.get_recipients_from_activity(item)
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
Enum.each(recipient_topics, fn topic ->
push_to_socket(State.get_sockets(), topic, item)
end)
end
defp do_stream(%{topic: topic, item: item}) do
Logger.debug("Trying to push to #{topic}")
Logger.debug("Pushing item to #{topic}")
push_to_socket(State.get_sockets(), topic, item)
end
defp should_send?(%User{} = user, %Activity{} = item) do
blocks = user.info.blocks || []
mutes = user.info.mutes || []
reblog_mutes = user.info.muted_reblogs || []
recipient_blocks = MapSet.new(blocks ++ mutes)
recipients = MapSet.new(item.recipients)
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
with parent when not is_nil(parent) <- Object.normalize(item),
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
true <- MapSet.disjoint?(recipients, recipient_blocks),
%{host: item_host} <- URI.parse(item.actor),
%{host: parent_host} <- URI.parse(parent.data["actor"]),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
true <- thread_containment(item, user),
false <- CommonAPI.thread_muted?(user, item) do
true
else
_ -> false
end
end
defp should_send?(%User{} = user, %Notification{activity: activity}) do
should_send?(user, activity)
end
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn %StreamerSocket{
transport_pid: transport_pid,
user: socket_user
} ->
# Get the current user so we have up-to-date blocks etc.
if socket_user do
user = User.get_cached_by_ap_id(socket_user.ap_id)
if should_send?(user, item) do
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
end
else
send(transport_pid, {:text, StreamerView.render("update.json", item)})
end
end)
end
def push_to_socket(topics, topic, %Participation{} = participation) do
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
end)
end
def push_to_socket(topics, topic, %Activity{
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
}) do
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
send(
transport_pid,
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
)
end)
end
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn %StreamerSocket{
transport_pid: transport_pid,
user: socket_user
} ->
# Get the current user so we have up-to-date blocks etc.
if socket_user do
user = User.get_cached_by_ap_id(socket_user.ap_id)
if should_send?(user, item) do
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
end
else
send(transport_pid, {:text, StreamerView.render("update.json", item)})
end
end)
end
@spec thread_containment(Activity.t(), User.t()) :: boolean()
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
defp thread_containment(activity, user) do
if Config.get([:instance, :skip_thread_containment]) do
true
else
ActivityPub.contain_activity(activity, user)
end
end
end

View file

@ -0,0 +1,66 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.StreamerView do
use Pleroma.Web, :view
alias Pleroma.Activity
alias Pleroma.Conversation.Participation
alias Pleroma.Notification
alias Pleroma.User
alias Pleroma.Web.MastodonAPI.NotificationView
def render("update.json", %Activity{} = activity, %User{} = user) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity,
for: user
)
|> Jason.encode!()
}
|> Jason.encode!()
end
def render("notification.json", %User{} = user, %Notification{} = notify) do
%{
event: "notification",
payload:
NotificationView.render(
"show.json",
%{notification: notify, for: user}
)
|> Jason.encode!()
}
|> Jason.encode!()
end
def render("update.json", %Activity{} = activity) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity
)
|> Jason.encode!()
}
|> Jason.encode!()
end
def render("conversation.json", %Participation{} = participation) do
%{
event: "conversation",
payload:
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
participation: participation,
for: participation.user
})
|> Jason.encode!()
}
|> Jason.encode!()
end
end

View file

@ -143,6 +143,7 @@ defp deps do
ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"}, ref: "293d77bb6f4a67ac8bde1428735c3b42f22cbb30"},
{:pleroma_job_queue, "~> 0.3"}, {:pleroma_job_queue, "~> 0.3"},
{:telemetry, "~> 0.3"}, {:telemetry, "~> 0.3"},
{:poolboy, "~> 1.5"},
{:prometheus_ex, "~> 3.0"}, {:prometheus_ex, "~> 3.0"},
{:prometheus_plugs, "~> 1.1"}, {:prometheus_plugs, "~> 1.1"},
{:prometheus_phoenix, "~> 1.3"}, {:prometheus_phoenix, "~> 1.3"},

View file

@ -70,6 +70,7 @@
"plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"}, "plug_crypto": {:hex, :plug_crypto, "1.0.0", "18e49317d3fa343f24620ed22795ec29d4a5e602d52d1513ccea0b07d8ea7d4d", [:mix], [], "hexpm"},
"plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"}, "plug_static_index_html": {:hex, :plug_static_index_html, "1.0.0", "840123d4d3975585133485ea86af73cb2600afd7f2a976f9f5fd8b3808e636a0", [:mix], [{:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"},
"postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"}, "postgrex": {:hex, :postgrex, "0.14.3", "5754dee2fdf6e9e508cbf49ab138df964278700b764177e8f3871e658b345a1e", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:db_connection, "~> 2.0", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}], "hexpm"},
"prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"}, "prometheus": {:hex, :prometheus, "4.4.1", "1e96073b3ed7788053768fea779cbc896ddc3bdd9ba60687f2ad50b252ac87d6", [:mix, :rebar3], [], "hexpm"},
"prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"}, "prometheus_ecto": {:hex, :prometheus_ecto, "1.4.1", "6c768ea9654de871e5b32fab2eac348467b3021604ebebbcbd8bcbe806a65ed5", [:mix], [{:ecto, "~> 2.0 or ~> 3.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:prometheus_ex, "~> 1.1 or ~> 2.0 or ~> 3.0", [hex: :prometheus_ex, repo: "hexpm", optional: false]}], "hexpm"},

View file

@ -0,0 +1,141 @@
defmodule Pleroma.Activity.Ir.TopicsTest do
use Pleroma.DataCase
alias Pleroma.Activity
alias Pleroma.Activity.Ir.Topics
alias Pleroma.Object
require Pleroma.Constants
describe "poll answer" do
test "produce no topics" do
activity = %Activity{object: %Object{data: %{"type" => "Answer"}}}
assert [] == Topics.get_activity_topics(activity)
end
end
describe "non poll answer" do
test "always add user and list topics" do
activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}}
topics = Topics.get_activity_topics(activity)
assert Enum.member?(topics, "user")
assert Enum.member?(topics, "list")
end
end
describe "public visibility" do
setup do
activity = %Activity{
object: %Object{data: %{"type" => "Note"}},
data: %{"to" => [Pleroma.Constants.as_public()]}
}
{:ok, activity: activity}
end
test "produces public topic", %{activity: activity} do
topics = Topics.get_activity_topics(activity)
assert Enum.member?(topics, "public")
end
test "local action produces public:local topic", %{activity: activity} do
activity = %{activity | local: true}
topics = Topics.get_activity_topics(activity)
assert Enum.member?(topics, "public:local")
end
test "non-local action does not produce public:local topic", %{activity: activity} do
activity = %{activity | local: false}
topics = Topics.get_activity_topics(activity)
refute Enum.member?(topics, "public:local")
end
end
describe "public visibility create events" do
setup do
activity = %Activity{
object: %Object{data: %{"type" => "Create", "attachment" => []}},
data: %{"to" => [Pleroma.Constants.as_public()]}
}
{:ok, activity: activity}
end
test "with no attachments doesn't produce public:media topics", %{activity: activity} do
topics = Topics.get_activity_topics(activity)
refute Enum.member?(topics, "public:media")
refute Enum.member?(topics, "public:local:media")
end
test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do
tagged_data = Map.put(data, "tag", ["foo", "bar"])
activity = %{activity | object: %{object | data: tagged_data}}
topics = Topics.get_activity_topics(activity)
assert Enum.member?(topics, "hashtag:foo")
assert Enum.member?(topics, "hashtag:bar")
end
test "only converts strinngs to hash tags", %{
activity: %{object: %{data: data} = object} = activity
} do
tagged_data = Map.put(data, "tag", [2])
activity = %{activity | object: %{object | data: tagged_data}}
topics = Topics.get_activity_topics(activity)
refute Enum.member?(topics, "hashtag:2")
end
end
describe "public visibility create events with attachments" do
setup do
activity = %Activity{
object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}},
data: %{"to" => [Pleroma.Constants.as_public()]}
}
{:ok, activity: activity}
end
test "produce public:media topics", %{activity: activity} do
topics = Topics.get_activity_topics(activity)
assert Enum.member?(topics, "public:media")
end
test "local produces public:local:media topics", %{activity: activity} do
topics = Topics.get_activity_topics(activity)
assert Enum.member?(topics, "public:local:media")
end
test "non-local doesn't produce public:local:media topics", %{activity: activity} do
activity = %{activity | local: false}
topics = Topics.get_activity_topics(activity)
refute Enum.member?(topics, "public:local:media")
end
end
describe "non-public visibility" do
test "produces direct topic" do
activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}}
topics = Topics.get_activity_topics(activity)
assert Enum.member?(topics, "direct")
refute Enum.member?(topics, "public")
refute Enum.member?(topics, "public:local")
refute Enum.member?(topics, "public:media")
refute Enum.member?(topics, "public:local:media")
end
end
end

View file

@ -5,12 +5,12 @@
defmodule Pleroma.Integration.MastodonWebsocketTest do defmodule Pleroma.Integration.MastodonWebsocketTest do
use Pleroma.DataCase use Pleroma.DataCase
import ExUnit.CaptureLog
import Pleroma.Factory import Pleroma.Factory
alias Pleroma.Integration.WebsocketClient alias Pleroma.Integration.WebsocketClient
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.OAuth alias Pleroma.Web.OAuth
alias Pleroma.Web.Streamer
@path Pleroma.Web.Endpoint.url() @path Pleroma.Web.Endpoint.url()
|> URI.parse() |> URI.parse()
@ -18,14 +18,9 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
|> Map.put(:path, "/api/v1/streaming") |> Map.put(:path, "/api/v1/streaming")
|> URI.to_string() |> URI.to_string()
setup do setup_all do
GenServer.start(Streamer, %{}, name: Streamer) start_supervised(Pleroma.Web.Streamer.supervisor())
:ok
on_exit(fn ->
if pid = Process.whereis(Streamer) do
Process.exit(pid, :kill)
end
end)
end end
def start_socket(qs \\ nil, headers \\ []) do def start_socket(qs \\ nil, headers \\ []) do
@ -39,13 +34,19 @@ def start_socket(qs \\ nil, headers \\ []) do
end end
test "refuses invalid requests" do test "refuses invalid requests" do
assert {:error, {400, _}} = start_socket() capture_log(fn ->
assert {:error, {404, _}} = start_socket("?stream=ncjdk") assert {:error, {400, _}} = start_socket()
assert {:error, {404, _}} = start_socket("?stream=ncjdk")
Process.sleep(30)
end)
end end
test "requires authentication and a valid token for protected streams" do test "requires authentication and a valid token for protected streams" do
assert {:error, {403, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa") capture_log(fn ->
assert {:error, {403, _}} = start_socket("?stream=user") assert {:error, {403, _}} = start_socket("?stream=user&access_token=aaaaaaaaaaaa")
assert {:error, {403, _}} = start_socket("?stream=user")
Process.sleep(30)
end)
end end
test "allows public streams without authentication" do test "allows public streams without authentication" do
@ -100,19 +101,31 @@ test "accepts valid tokens", state do
test "accepts the 'user' stream", %{token: token} = _state do test "accepts the 'user' stream", %{token: token} = _state do
assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}") assert {:ok, _} = start_socket("?stream=user&access_token=#{token.token}")
assert {:error, {403, "Forbidden"}} = start_socket("?stream=user")
assert capture_log(fn ->
assert {:error, {403, "Forbidden"}} = start_socket("?stream=user")
Process.sleep(30)
end) =~ ":badarg"
end end
test "accepts the 'user:notification' stream", %{token: token} = _state do test "accepts the 'user:notification' stream", %{token: token} = _state do
assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}") assert {:ok, _} = start_socket("?stream=user:notification&access_token=#{token.token}")
assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification")
assert capture_log(fn ->
assert {:error, {403, "Forbidden"}} = start_socket("?stream=user:notification")
Process.sleep(30)
end) =~ ":badarg"
end end
test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do test "accepts valid token on Sec-WebSocket-Protocol header", %{token: token} do
assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}]) assert {:ok, _} = start_socket("?stream=user", [{"Sec-WebSocket-Protocol", token.token}])
assert {:error, {403, "Forbidden"}} = assert capture_log(fn ->
start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}]) assert {:error, {403, "Forbidden"}} =
start_socket("?stream=user", [{"Sec-WebSocket-Protocol", "I am a friend"}])
Process.sleep(30)
end) =~ ":badarg"
end end
end end
end end

View file

@ -68,16 +68,7 @@ test "does not create a notification for subscribed users if status is a reply"
end end
describe "create_notification" do describe "create_notification" do
setup do @tag needs_streamer: true
GenServer.start(Streamer, %{}, name: Streamer)
on_exit(fn ->
if pid = Process.whereis(Streamer) do
Process.exit(pid, :kill)
end
end)
end
test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
user = insert(:user) user = insert(:user)
task = Task.async(fn -> assert_receive {:text, _}, 4_000 end) task = Task.async(fn -> assert_receive {:text, _}, 4_000 end)

View file

@ -40,6 +40,10 @@ defmodule Pleroma.Web.ConnCase do
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()}) Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
end end
if tags[:needs_streamer] do
start_supervised(Pleroma.Web.Streamer.supervisor())
end
{:ok, conn: Phoenix.ConnTest.build_conn()} {:ok, conn: Phoenix.ConnTest.build_conn()}
end end
end end

View file

@ -39,6 +39,10 @@ defmodule Pleroma.DataCase do
Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()}) Ecto.Adapters.SQL.Sandbox.mode(Pleroma.Repo, {:shared, self()})
end end
if tags[:needs_streamer] do
start_supervised(Pleroma.Web.Streamer.supervisor())
end
:ok :ok
end end

View file

@ -38,9 +38,7 @@ test "it streams them out" do
stream: fn _, _ -> nil end do stream: fn _, _ -> nil end do
ActivityPub.stream_out_participations(conversation.participations) ActivityPub.stream_out_participations(conversation.participations)
Enum.each(participations, fn participation -> assert called(Pleroma.Web.Streamer.stream("participation", participations))
assert called(Pleroma.Web.Streamer.stream("participation", participation))
end)
end end
end end
end end

View file

@ -0,0 +1,36 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.PingTest do
use Pleroma.DataCase
import Pleroma.Factory
alias Pleroma.Web.Streamer
setup do
start_supervised({Streamer.supervisor(), [ping_interval: 30]})
:ok
end
describe "sockets" do
setup do
user = insert(:user)
{:ok, %{user: user}}
end
test "it sends pings", %{user: user} do
task =
Task.async(fn ->
assert_receive {:text, received_event}, 40
assert_receive {:text, received_event}, 40
assert_receive {:text, received_event}, 40
end)
Streamer.add_socket("public", %{transport_pid: task.pid, assigns: %{user: user}})
Task.await(task)
end
end
end

View file

@ -0,0 +1,54 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Web.StateTest do
use Pleroma.DataCase
import Pleroma.Factory
alias Pleroma.Web.Streamer
alias Pleroma.Web.Streamer.StreamerSocket
@moduletag needs_streamer: true
describe "sockets" do
setup do
user = insert(:user)
user2 = insert(:user)
{:ok, %{user: user, user2: user2}}
end
test "it can add a socket", %{user: user} do
Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
assert(%{"public" => [%StreamerSocket{transport_pid: 1}]} = Streamer.get_sockets())
end
test "it can add multiple sockets per user", %{user: user} do
Streamer.add_socket("public", %{transport_pid: 1, assigns: %{user: user}})
Streamer.add_socket("public", %{transport_pid: 2, assigns: %{user: user}})
assert(
%{
"public" => [
%StreamerSocket{transport_pid: 2},
%StreamerSocket{transport_pid: 1}
]
} = Streamer.get_sockets()
)
end
test "it will not add a duplicate socket", %{user: user} do
Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
Streamer.add_socket("activity", %{transport_pid: 1, assigns: %{user: user}})
assert(
%{
"activity" => [
%StreamerSocket{transport_pid: 1}
]
} = Streamer.get_sockets()
)
end
end
end

View file

@ -5,24 +5,20 @@
defmodule Pleroma.Web.StreamerTest do defmodule Pleroma.Web.StreamerTest do
use Pleroma.DataCase use Pleroma.DataCase
import Pleroma.Factory
alias Pleroma.List alias Pleroma.List
alias Pleroma.User alias Pleroma.User
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
alias Pleroma.Web.Streamer alias Pleroma.Web.Streamer
import Pleroma.Factory alias Pleroma.Web.Streamer.StreamerSocket
alias Pleroma.Web.Streamer.Worker
@moduletag needs_streamer: true
clear_config_all([:instance, :skip_thread_containment]) clear_config_all([:instance, :skip_thread_containment])
describe "user streams" do describe "user streams" do
setup do setup do
GenServer.start(Streamer, %{}, name: Streamer)
on_exit(fn ->
if pid = Process.whereis(Streamer) do
Process.exit(pid, :kill)
end
end)
user = insert(:user) user = insert(:user)
notify = insert(:notification, user: user, activity: build(:note_activity)) notify = insert(:notification, user: user, activity: build(:note_activity))
{:ok, %{user: user, notify: notify}} {:ok, %{user: user, notify: notify}}
@ -125,11 +121,9 @@ test "it sends to public" do
assert_receive {:text, _}, 4_000 assert_receive {:text, _}, 4_000
end) end)
fake_socket = %{ fake_socket = %StreamerSocket{
transport_pid: task.pid, transport_pid: task.pid,
assigns: %{ user: user
user: user
}
} }
{:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"}) {:ok, activity} = CommonAPI.post(other_user, %{"status" => "Test"})
@ -138,7 +132,7 @@ test "it sends to public" do
"public" => [fake_socket] "public" => [fake_socket]
} }
Streamer.push_to_socket(topics, "public", activity) Worker.push_to_socket(topics, "public", activity)
Task.await(task) Task.await(task)
@ -155,11 +149,9 @@ test "it sends to public" do
assert received_event == expected_event assert received_event == expected_event
end) end)
fake_socket = %{ fake_socket = %StreamerSocket{
transport_pid: task.pid, transport_pid: task.pid,
assigns: %{ user: user
user: user
}
} }
{:ok, activity} = CommonAPI.delete(activity.id, other_user) {:ok, activity} = CommonAPI.delete(activity.id, other_user)
@ -168,7 +160,7 @@ test "it sends to public" do
"public" => [fake_socket] "public" => [fake_socket]
} }
Streamer.push_to_socket(topics, "public", activity) Worker.push_to_socket(topics, "public", activity)
Task.await(task) Task.await(task)
end end
@ -189,9 +181,9 @@ test "it doesn't send to user if recipients invalid and thread containment is en
) )
task = Task.async(fn -> refute_receive {:text, _}, 1_000 end) task = Task.async(fn -> refute_receive {:text, _}, 1_000 end)
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
topics = %{"public" => [fake_socket]} topics = %{"public" => [fake_socket]}
Streamer.push_to_socket(topics, "public", activity) Worker.push_to_socket(topics, "public", activity)
Task.await(task) Task.await(task)
end end
@ -211,9 +203,9 @@ test "it sends message if recipients invalid and thread containment is disabled"
) )
task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
topics = %{"public" => [fake_socket]} topics = %{"public" => [fake_socket]}
Streamer.push_to_socket(topics, "public", activity) Worker.push_to_socket(topics, "public", activity)
Task.await(task) Task.await(task)
end end
@ -233,40 +225,76 @@ test "it sends message if recipients invalid and thread containment is enabled b
) )
task = Task.async(fn -> assert_receive {:text, _}, 1_000 end) task = Task.async(fn -> assert_receive {:text, _}, 1_000 end)
fake_socket = %{transport_pid: task.pid, assigns: %{user: user}} fake_socket = %StreamerSocket{transport_pid: task.pid, user: user}
topics = %{"public" => [fake_socket]} topics = %{"public" => [fake_socket]}
Streamer.push_to_socket(topics, "public", activity) Worker.push_to_socket(topics, "public", activity)
Task.await(task) Task.await(task)
end end
end end
test "it doesn't send to blocked users" do describe "blocks" do
user = insert(:user) test "it doesn't send messages involving blocked users" do
blocked_user = insert(:user) user = insert(:user)
{:ok, user} = User.block(user, blocked_user) blocked_user = insert(:user)
{:ok, user} = User.block(user, blocked_user)
task = task =
Task.async(fn -> Task.async(fn ->
refute_receive {:text, _}, 1_000 refute_receive {:text, _}, 1_000
end) end)
fake_socket = %{ fake_socket = %StreamerSocket{
transport_pid: task.pid, transport_pid: task.pid,
assigns: %{
user: user user: user
} }
}
{:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"}) {:ok, activity} = CommonAPI.post(blocked_user, %{"status" => "Test"})
topics = %{ topics = %{
"public" => [fake_socket] "public" => [fake_socket]
} }
Streamer.push_to_socket(topics, "public", activity) Worker.push_to_socket(topics, "public", activity)
Task.await(task) Task.await(task)
end
test "it doesn't send messages transitively involving blocked users" do
blocker = insert(:user)
blockee = insert(:user)
friend = insert(:user)
task =
Task.async(fn ->
refute_receive {:text, _}, 1_000
end)
fake_socket = %StreamerSocket{
transport_pid: task.pid,
user: blocker
}
topics = %{
"public" => [fake_socket]
}
{:ok, blocker} = User.block(blocker, blockee)
{:ok, activity_one} = CommonAPI.post(friend, %{"status" => "hey! @#{blockee.nickname}"})
Worker.push_to_socket(topics, "public", activity_one)
{:ok, activity_two} = CommonAPI.post(blockee, %{"status" => "hey! @#{friend.nickname}"})
Worker.push_to_socket(topics, "public", activity_two)
{:ok, activity_three} = CommonAPI.post(blockee, %{"status" => "hey! @#{blocker.nickname}"})
Worker.push_to_socket(topics, "public", activity_three)
Task.await(task)
end
end end
test "it doesn't send unwanted DMs to list" do test "it doesn't send unwanted DMs to list" do
@ -284,11 +312,9 @@ test "it doesn't send unwanted DMs to list" do
refute_receive {:text, _}, 1_000 refute_receive {:text, _}, 1_000
end) end)
fake_socket = %{ fake_socket = %StreamerSocket{
transport_pid: task.pid, transport_pid: task.pid,
assigns: %{ user: user_a
user: user_a
}
} }
{:ok, activity} = {:ok, activity} =
@ -301,7 +327,7 @@ test "it doesn't send unwanted DMs to list" do
"list:#{list.id}" => [fake_socket] "list:#{list.id}" => [fake_socket]
} }
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) Worker.handle_call({:stream, "list", activity}, self(), topics)
Task.await(task) Task.await(task)
end end
@ -318,11 +344,9 @@ test "it doesn't send unwanted private posts to list" do
refute_receive {:text, _}, 1_000 refute_receive {:text, _}, 1_000
end) end)
fake_socket = %{ fake_socket = %StreamerSocket{
transport_pid: task.pid, transport_pid: task.pid,
assigns: %{ user: user_a
user: user_a
}
} }
{:ok, activity} = {:ok, activity} =
@ -335,12 +359,12 @@ test "it doesn't send unwanted private posts to list" do
"list:#{list.id}" => [fake_socket] "list:#{list.id}" => [fake_socket]
} }
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) Worker.handle_call({:stream, "list", activity}, self(), topics)
Task.await(task) Task.await(task)
end end
test "it send wanted private posts to list" do test "it sends wanted private posts to list" do
user_a = insert(:user) user_a = insert(:user)
user_b = insert(:user) user_b = insert(:user)
@ -354,11 +378,9 @@ test "it send wanted private posts to list" do
assert_receive {:text, _}, 1_000 assert_receive {:text, _}, 1_000
end) end)
fake_socket = %{ fake_socket = %StreamerSocket{
transport_pid: task.pid, transport_pid: task.pid,
assigns: %{ user: user_a
user: user_a
}
} }
{:ok, activity} = {:ok, activity} =
@ -367,11 +389,12 @@ test "it send wanted private posts to list" do
"visibility" => "private" "visibility" => "private"
}) })
topics = %{ Streamer.add_socket(
"list:#{list.id}" => [fake_socket] "list:#{list.id}",
} fake_socket
)
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics) Worker.handle_call({:stream, "list", activity}, self(), %{})
Task.await(task) Task.await(task)
end end
@ -387,11 +410,9 @@ test "it doesn't send muted reblogs" do
refute_receive {:text, _}, 1_000 refute_receive {:text, _}, 1_000
end) end)
fake_socket = %{ fake_socket = %StreamerSocket{
transport_pid: task.pid, transport_pid: task.pid,
assigns: %{ user: user1
user: user1
}
} }
{:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"}) {:ok, create_activity} = CommonAPI.post(user3, %{"status" => "I'm kawen"})
@ -401,7 +422,7 @@ test "it doesn't send muted reblogs" do
"public" => [fake_socket] "public" => [fake_socket]
} }
Streamer.push_to_socket(topics, "public", announce_activity) Worker.push_to_socket(topics, "public", announce_activity)
Task.await(task) Task.await(task)
end end
@ -417,6 +438,8 @@ test "it doesn't send posts from muted threads" do
task = Task.async(fn -> refute_receive {:text, _}, 4_000 end) task = Task.async(fn -> refute_receive {:text, _}, 4_000 end)
Process.sleep(4000)
Streamer.add_socket( Streamer.add_socket(
"user", "user",
%{transport_pid: task.pid, assigns: %{user: user2}} %{transport_pid: task.pid, assigns: %{user: user2}}
@ -428,14 +451,6 @@ test "it doesn't send posts from muted threads" do
describe "direct streams" do describe "direct streams" do
setup do setup do
GenServer.start(Streamer, %{}, name: Streamer)
on_exit(fn ->
if pid = Process.whereis(Streamer) do
Process.exit(pid, :kill)
end
end)
:ok :ok
end end
@ -480,6 +495,8 @@ test "it doesn't send conversation update to the 'direct' streamj when the last
refute_receive {:text, _}, 4_000 refute_receive {:text, _}, 4_000
end) end)
Process.sleep(1000)
Streamer.add_socket( Streamer.add_socket(
"direct", "direct",
%{transport_pid: task.pid, assigns: %{user: user}} %{transport_pid: task.pid, assigns: %{user: user}}
@ -521,6 +538,8 @@ test "it sends conversation update to the 'direct' stream when a message is dele
assert last_status["id"] == to_string(create_activity.id) assert last_status["id"] == to_string(create_activity.id)
end) end)
Process.sleep(1000)
Streamer.add_socket( Streamer.add_socket(
"direct", "direct",
%{transport_pid: task.pid, assigns: %{user: user}} %{transport_pid: task.pid, assigns: %{user: user}}