From 60b025b782eb27b86a791451149b6690431371dc Mon Sep 17 00:00:00 2001 From: Ivan Tashkinov Date: Sat, 19 Sep 2020 19:16:55 +0300 Subject: [PATCH] [#2074] OAuth scope checking in Streaming API. --- lib/pleroma/plugs/oauth_scopes_plug.ex | 2 +- .../web/mastodon_api/websocket_handler.ex | 10 +- lib/pleroma/web/streamer/streamer.ex | 70 ++- test/integration/mastodon_websocket_test.exs | 2 +- test/notification_test.exs | 8 +- test/support/data_case.ex | 15 + test/web/streamer/streamer_test.exs | 408 +++++++++++------- 7 files changed, 332 insertions(+), 183 deletions(-) diff --git a/lib/pleroma/plugs/oauth_scopes_plug.ex b/lib/pleroma/plugs/oauth_scopes_plug.ex index efc25b79f..b1a736d78 100644 --- a/lib/pleroma/plugs/oauth_scopes_plug.ex +++ b/lib/pleroma/plugs/oauth_scopes_plug.ex @@ -53,7 +53,7 @@ def drop_auth_info(conn) do |> assign(:token, nil) end - @doc "Filters descendants of supported scopes" + @doc "Keeps those of `scopes` which are descendants of `supported_scopes`" def filter_descendants(scopes, supported_scopes) do Enum.filter( scopes, diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex index cf923ded8..439cdd716 100644 --- a/lib/pleroma/web/mastodon_api/websocket_handler.ex +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -23,8 +23,8 @@ def init(%{qs: qs} = req, state) do with params <- Enum.into(:cow_qs.parse_qs(qs), %{}), sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil), access_token <- Map.get(params, "access_token"), - {:ok, user} <- authenticate_request(access_token, sec_websocket), - {:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do + {:ok, user, oauth_token} <- authenticate_request(access_token, sec_websocket), + {:ok, topic} <- Streamer.get_topic(params["stream"], user, oauth_token, params) do req = if sec_websocket do :cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req) @@ -117,7 +117,7 @@ def terminate(reason, _req, state) do # Public streams without authentication. defp authenticate_request(nil, nil) do - {:ok, nil} + {:ok, nil, nil} end # Authenticated streams. @@ -125,9 +125,9 @@ defp authenticate_request(access_token, sec_websocket) do token = access_token || sec_websocket with true <- is_bitstring(token), - %Token{user_id: user_id} <- Repo.get_by(Token, token: token), + oauth_token = %Token{user_id: user_id} <- Repo.get_by(Token, token: token), user = %User{} <- User.get_cached_by_id(user_id) do - {:ok, user} + {:ok, user, oauth_token} else _ -> {:error, :unauthorized} end diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex index d1d70e556..5475f18a6 100644 --- a/lib/pleroma/web/streamer/streamer.ex +++ b/lib/pleroma/web/streamer/streamer.ex @@ -11,10 +11,12 @@ defmodule Pleroma.Web.Streamer do alias Pleroma.Conversation.Participation alias Pleroma.Notification alias Pleroma.Object + alias Pleroma.Plugs.OAuthScopesPlug alias Pleroma.User alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.Visibility alias Pleroma.Web.CommonAPI + alias Pleroma.Web.OAuth.Token alias Pleroma.Web.StreamerView @mix_env Mix.env() @@ -26,53 +28,87 @@ def registry, do: @registry @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"] @doc "Expands and authorizes a stream, and registers the process for streaming." - @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) :: + @spec get_topic_and_add_socket( + stream :: String.t(), + User.t() | nil, + Token.t() | nil, + Map.t() | nil + ) :: {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized} - def get_topic_and_add_socket(stream, user, params \\ %{}) do - case get_topic(stream, user, params) do + def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do + case get_topic(stream, user, oauth_token, params) do {:ok, topic} -> add_socket(topic, user) error -> error end end @doc "Expand and authorizes a stream" - @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) :: + @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) :: {:ok, topic :: String.t()} | {:error, :bad_topic} - def get_topic(stream, user, params \\ %{}) + def get_topic(stream, user, oauth_token, params \\ %{}) # Allow all public steams. - def get_topic(stream, _, _) when stream in @public_streams do + def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do {:ok, stream} end # Allow all hashtags streams. - def get_topic("hashtag", _, %{"tag" => tag}) do + def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do {:ok, "hashtag:" <> tag} end # Expand user streams. - def get_topic(stream, %User{} = user, _) when stream in @user_streams do - {:ok, stream <> ":" <> to_string(user.id)} + def get_topic( + stream, + %User{id: user_id} = user, + %Token{user_id: token_user_id} = oauth_token, + _params + ) + when stream in @user_streams and user_id == token_user_id do + # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope) + required_scopes = + if stream == "user:notification" do + ["read:notifications"] + else + ["read:statuses"] + end + + if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do + {:error, :unauthorized} + else + {:ok, stream <> ":" <> to_string(user.id)} + end end - def get_topic(stream, _, _) when stream in @user_streams do + def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do {:error, :unauthorized} end # List streams. - def get_topic("list", %User{} = user, %{"list" => id}) do - if Pleroma.List.get(id, user) do - {:ok, "list:" <> to_string(id)} - else - {:error, :bad_topic} + def get_topic( + "list", + %User{id: user_id} = user, + %Token{user_id: token_user_id} = oauth_token, + %{"list" => id} + ) + when user_id == token_user_id do + cond do + OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] -> + {:error, :unauthorized} + + Pleroma.List.get(id, user) -> + {:ok, "list:" <> to_string(id)} + + true -> + {:error, :bad_topic} end end - def get_topic("list", _, _) do + def get_topic("list", _user, _oauth_token, _params) do {:error, :unauthorized} end - def get_topic(_, _, _) do + def get_topic(_stream, _user, _oauth_token, _params) do {:error, :bad_topic} end diff --git a/test/integration/mastodon_websocket_test.exs b/test/integration/mastodon_websocket_test.exs index 76fbc8bda..0f2e6cc2b 100644 --- a/test/integration/mastodon_websocket_test.exs +++ b/test/integration/mastodon_websocket_test.exs @@ -78,7 +78,7 @@ test "receives well formatted events" do Pleroma.Repo.insert( OAuth.App.register_changeset(%OAuth.App{}, %{ client_name: "client", - scopes: ["scope"], + scopes: ["read"], redirect_uris: "url" }) ) diff --git a/test/notification_test.exs b/test/notification_test.exs index a09b08675..f2e0f0b0d 100644 --- a/test/notification_test.exs +++ b/test/notification_test.exs @@ -179,17 +179,19 @@ test "does not create a notification for subscribed users if status is a reply" describe "create_notification" do @tag needs_streamer: true test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do - user = insert(:user) + %{user: user, token: oauth_token} = oauth_access(["read"]) task = Task.async(fn -> - Streamer.get_topic_and_add_socket("user", user) + {:ok, _topic} = Streamer.get_topic_and_add_socket("user", user, oauth_token) assert_receive {:render_with_user, _, _, _}, 4_000 end) task_user_notification = Task.async(fn -> - Streamer.get_topic_and_add_socket("user:notification", user) + {:ok, _topic} = + Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) + assert_receive {:render_with_user, _, _, _}, 4_000 end) diff --git a/test/support/data_case.ex b/test/support/data_case.ex index ba8848952..d5456521c 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -27,6 +27,21 @@ defmodule Pleroma.DataCase do import Ecto.Query import Pleroma.DataCase use Pleroma.Tests.Helpers + + # Sets up OAuth access with specified scopes + defp oauth_access(scopes, opts \\ []) do + user = + Keyword.get_lazy(opts, :user, fn -> + Pleroma.Factory.insert(:user) + end) + + token = + Keyword.get_lazy(opts, :oauth_token, fn -> + Pleroma.Factory.insert(:oauth_token, user: user, scopes: scopes) + end) + + %{user: user, token: token} + end end end diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs index d56d74464..185724a9f 100644 --- a/test/web/streamer/streamer_test.exs +++ b/test/web/streamer/streamer_test.exs @@ -21,92 +21,148 @@ defmodule Pleroma.Web.StreamerTest do setup do: clear_config([:instance, :skip_thread_containment]) - describe "get_topic without an user" do + describe "get_topic/_ (unauthenticated)" do test "allows public" do - assert {:ok, "public"} = Streamer.get_topic("public", nil) - assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil) - assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil) - assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil) + assert {:ok, "public"} = Streamer.get_topic("public", nil, nil) + assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil) + assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil, nil) + assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil, nil) end test "allows hashtag streams" do - assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, %{"tag" => "cofe"}) + assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, nil, %{"tag" => "cofe"}) end test "disallows user streams" do - assert {:error, _} = Streamer.get_topic("user", nil) - assert {:error, _} = Streamer.get_topic("user:notification", nil) - assert {:error, _} = Streamer.get_topic("direct", nil) + assert {:error, _} = Streamer.get_topic("user", nil, nil) + assert {:error, _} = Streamer.get_topic("user:notification", nil, nil) + assert {:error, _} = Streamer.get_topic("direct", nil, nil) end test "disallows list streams" do - assert {:error, _} = Streamer.get_topic("list", nil, %{"list" => 42}) + assert {:error, _} = Streamer.get_topic("list", nil, nil, %{"list" => 42}) end end - describe "get_topic with an user" do - setup do - user = insert(:user) - {:ok, %{user: user}} + describe "get_topic/_ (authenticated)" do + setup do: oauth_access(["read"]) + + test "allows public streams (regardless of OAuth token scopes)", %{ + user: user, + token: read_oauth_token + } do + with oauth_token <- [nil, read_oauth_token] do + assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token) + assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token) + assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token) + + assert {:ok, "public:local:media"} = + Streamer.get_topic("public:local:media", user, oauth_token) + end end - test "allows public streams", %{user: user} do - assert {:ok, "public"} = Streamer.get_topic("public", user) - assert {:ok, "public:local"} = Streamer.get_topic("public:local", user) - assert {:ok, "public:media"} = Streamer.get_topic("public:media", user) - assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", user) - end + test "allows user streams (with proper OAuth token scopes)", %{ + user: user, + token: read_oauth_token + } do + %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user) + %{token: read_statuses_token} = oauth_access(["read:statuses"], user: user) + %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user) - test "allows user streams", %{user: user} do expected_user_topic = "user:#{user.id}" - expected_notif_topic = "user:notification:#{user.id}" + expected_notification_topic = "user:notification:#{user.id}" expected_direct_topic = "direct:#{user.id}" - assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user) - assert {:ok, ^expected_notif_topic} = Streamer.get_topic("user:notification", user) - assert {:ok, ^expected_direct_topic} = Streamer.get_topic("direct", user) + expected_pleroma_chat_topic = "user:pleroma_chat:#{user.id}" + + for valid_user_token <- [read_oauth_token, read_statuses_token] do + assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user, valid_user_token) + + assert {:ok, ^expected_direct_topic} = + Streamer.get_topic("direct", user, valid_user_token) + + assert {:ok, ^expected_pleroma_chat_topic} = + Streamer.get_topic("user:pleroma_chat", user, valid_user_token) + end + + for invalid_user_token <- [read_notifications_token, badly_scoped_token], + user_topic <- ["user", "direct", "user:pleroma_chat"] do + assert {:error, :unauthorized} = Streamer.get_topic(user_topic, user, invalid_user_token) + end + + for valid_notification_token <- [read_oauth_token, read_notifications_token] do + assert {:ok, ^expected_notification_topic} = + Streamer.get_topic("user:notification", user, valid_notification_token) + end + + for invalid_notification_token <- [read_statuses_token, badly_scoped_token] do + assert {:error, :unauthorized} = + Streamer.get_topic("user:notification", user, invalid_notification_token) + end end - test "allows hashtag streams", %{user: user} do - assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", user, %{"tag" => "cofe"}) + test "allows hashtag streams (regardless of OAuth token scopes)", %{ + user: user, + token: read_oauth_token + } do + for oauth_token <- [nil, read_oauth_token] do + assert {:ok, "hashtag:cofe"} = + Streamer.get_topic("hashtag", user, oauth_token, %{"tag" => "cofe"}) + end end - test "disallows registering to an user stream", %{user: user} do + test "disallows registering to another user's stream", %{user: user, token: read_oauth_token} do another_user = insert(:user) - assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user) - assert {:error, _} = Streamer.get_topic("user:notification:#{another_user.id}", user) - assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user) + assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user, read_oauth_token) + + assert {:error, _} = + Streamer.get_topic("user:notification:#{another_user.id}", user, read_oauth_token) + + assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user, read_oauth_token) end - test "allows list stream that are owned by the user", %{user: user} do + test "allows list stream that are owned by the user (with `read` or `read:lists` scopes)", %{ + user: user, + token: read_oauth_token + } do + %{token: read_lists_token} = oauth_access(["read:lists"], user: user) + %{token: invalid_token} = oauth_access(["irrelevant:scope"], user: user) {:ok, list} = List.create("Test", user) - assert {:error, _} = Streamer.get_topic("list:#{list.id}", user) - assert {:ok, _} = Streamer.get_topic("list", user, %{"list" => list.id}) + + assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, read_oauth_token) + + for valid_token <- [read_oauth_token, read_lists_token] do + assert {:ok, _} = Streamer.get_topic("list", user, valid_token, %{"list" => list.id}) + end + + assert {:error, _} = Streamer.get_topic("list", user, invalid_token, %{"list" => list.id}) end - test "disallows list stream that are not owned by the user", %{user: user} do + test "disallows list stream that are not owned by the user", %{user: user, token: oauth_token} do another_user = insert(:user) {:ok, list} = List.create("Test", another_user) - assert {:error, _} = Streamer.get_topic("list:#{list.id}", user) - assert {:error, _} = Streamer.get_topic("list", user, %{"list" => list.id}) + + assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, oauth_token) + assert {:error, _} = Streamer.get_topic("list", user, oauth_token, %{"list" => list.id}) end end describe "user streams" do setup do - user = insert(:user) + %{user: user, token: token} = oauth_access(["read"]) notify = insert(:notification, user: user, activity: build(:note_activity)) - {:ok, %{user: user, notify: notify}} + {:ok, %{user: user, notify: notify, token: token}} end - test "it streams the user's post in the 'user' stream", %{user: user} do - Streamer.get_topic_and_add_socket("user", user) + test "it streams the user's post in the 'user' stream", %{user: user, token: oauth_token} do + Streamer.get_topic_and_add_socket("user", user, oauth_token) {:ok, activity} = CommonAPI.post(user, %{status: "hey"}) + assert_receive {:render_with_user, _, _, ^activity} refute Streamer.filtered_by_user?(user, activity) end - test "it streams boosts of the user in the 'user' stream", %{user: user} do - Streamer.get_topic_and_add_socket("user", user) + test "it streams boosts of the user in the 'user' stream", %{user: user, token: oauth_token} do + Streamer.get_topic_and_add_socket("user", user, oauth_token) other_user = insert(:user) {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"}) @@ -117,9 +173,10 @@ test "it streams boosts of the user in the 'user' stream", %{user: user} do end test "it does not stream announces of the user's own posts in the 'user' stream", %{ - user: user + user: user, + token: oauth_token } do - Streamer.get_topic_and_add_socket("user", user) + Streamer.get_topic_and_add_socket("user", user, oauth_token) other_user = insert(:user) {:ok, activity} = CommonAPI.post(user, %{status: "hey"}) @@ -129,9 +186,10 @@ test "it does not stream announces of the user's own posts in the 'user' stream" end test "it does stream notifications announces of the user's own posts in the 'user' stream", %{ - user: user + user: user, + token: oauth_token } do - Streamer.get_topic_and_add_socket("user", user) + Streamer.get_topic_and_add_socket("user", user, oauth_token) other_user = insert(:user) {:ok, activity} = CommonAPI.post(user, %{status: "hey"}) @@ -145,8 +203,11 @@ test "it does stream notifications announces of the user's own posts in the 'use refute Streamer.filtered_by_user?(user, notification) end - test "it streams boosts of mastodon user in the 'user' stream", %{user: user} do - Streamer.get_topic_and_add_socket("user", user) + test "it streams boosts of mastodon user in the 'user' stream", %{ + user: user, + token: oauth_token + } do + Streamer.get_topic_and_add_socket("user", user, oauth_token) other_user = insert(:user) {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"}) @@ -164,21 +225,34 @@ test "it streams boosts of mastodon user in the 'user' stream", %{user: user} do refute Streamer.filtered_by_user?(user, announce) end - test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do - Streamer.get_topic_and_add_socket("user", user) + test "it sends notify to in the 'user' stream", %{ + user: user, + token: oauth_token, + notify: notify + } do + Streamer.get_topic_and_add_socket("user", user, oauth_token) Streamer.stream("user", notify) + assert_receive {:render_with_user, _, _, ^notify} refute Streamer.filtered_by_user?(user, notify) end - test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do - Streamer.get_topic_and_add_socket("user:notification", user) + test "it sends notify to in the 'user:notification' stream", %{ + user: user, + token: oauth_token, + notify: notify + } do + Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) Streamer.stream("user:notification", notify) + assert_receive {:render_with_user, _, _, ^notify} refute Streamer.filtered_by_user?(user, notify) end - test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do + test "it sends chat messages to the 'user:pleroma_chat' stream", %{ + user: user, + token: oauth_token + } do other_user = insert(:user) {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno") @@ -187,7 +261,7 @@ test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} d cm_ref = MessageReference.for_chat_and_object(chat, object) cm_ref = %{cm_ref | chat: chat, object: object} - Streamer.get_topic_and_add_socket("user:pleroma_chat", user) + Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token) Streamer.stream("user:pleroma_chat", {user, cm_ref}) text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) @@ -196,7 +270,7 @@ test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} d assert_receive {:text, ^text} end - test "it sends chat messages to the 'user' stream", %{user: user} do + test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_token} do other_user = insert(:user) {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno") @@ -205,7 +279,7 @@ test "it sends chat messages to the 'user' stream", %{user: user} do cm_ref = MessageReference.for_chat_and_object(chat, object) cm_ref = %{cm_ref | chat: chat, object: object} - Streamer.get_topic_and_add_socket("user", user) + Streamer.get_topic_and_add_socket("user", user, oauth_token) Streamer.stream("user", {user, cm_ref}) text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref}) @@ -214,7 +288,10 @@ test "it sends chat messages to the 'user' stream", %{user: user} do assert_receive {:text, ^text} end - test "it sends chat message notifications to the 'user:notification' stream", %{user: user} do + test "it sends chat message notifications to the 'user:notification' stream", %{ + user: user, + token: oauth_token + } do other_user = insert(:user) {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey") @@ -223,19 +300,21 @@ test "it sends chat message notifications to the 'user:notification' stream", %{ Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id) |> Repo.preload(:activity) - Streamer.get_topic_and_add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) Streamer.stream("user:notification", notify) + assert_receive {:render_with_user, _, _, ^notify} refute Streamer.filtered_by_user?(user, notify) end test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{ - user: user + user: user, + token: oauth_token } do blocked = insert(:user) {:ok, _user_relationship} = User.block(user, blocked) - Streamer.get_topic_and_add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) {:ok, activity} = CommonAPI.post(user, %{status: ":("}) {:ok, _} = CommonAPI.favorite(blocked, activity.id) @@ -244,14 +323,15 @@ test "it doesn't send notify to the 'user:notification' stream when a user is bl end test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{ - user: user + user: user, + token: oauth_token } do user2 = insert(:user) {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) {:ok, _} = CommonAPI.add_mute(user, activity) - Streamer.get_topic_and_add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) @@ -260,12 +340,13 @@ test "it doesn't send notify to the 'user:notification' stream when a thread is end test "it sends favorite to 'user:notification' stream'", %{ - user: user + user: user, + token: oauth_token } do user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"}) {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) - Streamer.get_topic_and_add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) assert_receive {:render_with_user, _, "notification.json", notif} @@ -274,13 +355,14 @@ test "it sends favorite to 'user:notification' stream'", %{ end test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{ - user: user + user: user, + token: oauth_token } do user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"}) {:ok, user} = User.block_domain(user, "hecking-lewd-place.com") {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) - Streamer.get_topic_and_add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id) refute_receive _ @@ -288,7 +370,8 @@ test "it doesn't send the 'user:notification' stream' when a domain is blocked", end test "it sends follow activities to the 'user:notification' stream", %{ - user: user + user: user, + token: oauth_token } do user_url = user.ap_id user2 = insert(:user) @@ -303,7 +386,7 @@ test "it sends follow activities to the 'user:notification' stream", %{ %Tesla.Env{status: 200, body: body} end) - Streamer.get_topic_and_add_socket("user:notification", user) + Streamer.get_topic_and_add_socket("user:notification", user, oauth_token) {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user) assert_receive {:render_with_user, _, "notification.json", notif} @@ -312,51 +395,53 @@ test "it sends follow activities to the 'user:notification' stream", %{ end end - test "it sends to public authenticated" do - user = insert(:user) - other_user = insert(:user) + describe "public streams" do + test "it sends to public (authenticated)" do + %{user: user, token: oauth_token} = oauth_access(["read"]) + other_user = insert(:user) - Streamer.get_topic_and_add_socket("public", other_user) + Streamer.get_topic_and_add_socket("public", user, oauth_token) - {:ok, activity} = CommonAPI.post(user, %{status: "Test"}) - assert_receive {:render_with_user, _, _, ^activity} - refute Streamer.filtered_by_user?(user, activity) + {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"}) + assert_receive {:render_with_user, _, _, ^activity} + refute Streamer.filtered_by_user?(other_user, activity) + end + + test "it sends to public (unauthenticated)" do + user = insert(:user) + + Streamer.get_topic_and_add_socket("public", nil, nil) + + {:ok, activity} = CommonAPI.post(user, %{status: "Test"}) + activity_id = activity.id + assert_receive {:text, event} + assert %{"event" => "update", "payload" => payload} = Jason.decode!(event) + assert %{"id" => ^activity_id} = Jason.decode!(payload) + + {:ok, _} = CommonAPI.delete(activity.id, user) + assert_receive {:text, event} + assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event) + end + + test "handles deletions" do + %{user: user, token: oauth_token} = oauth_access(["read"]) + other_user = insert(:user) + {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"}) + + Streamer.get_topic_and_add_socket("public", user, oauth_token) + + {:ok, _} = CommonAPI.delete(activity.id, other_user) + activity_id = activity.id + assert_receive {:text, event} + assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event) + end end - test "works for deletions" do - user = insert(:user) - other_user = insert(:user) - {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"}) - - Streamer.get_topic_and_add_socket("public", user) - - {:ok, _} = CommonAPI.delete(activity.id, other_user) - activity_id = activity.id - assert_receive {:text, event} - assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event) - end - - test "it sends to public unauthenticated" do - user = insert(:user) - - Streamer.get_topic_and_add_socket("public", nil) - - {:ok, activity} = CommonAPI.post(user, %{status: "Test"}) - activity_id = activity.id - assert_receive {:text, event} - assert %{"event" => "update", "payload" => payload} = Jason.decode!(event) - assert %{"id" => ^activity_id} = Jason.decode!(payload) - - {:ok, _} = CommonAPI.delete(activity.id, user) - assert_receive {:text, event} - assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event) - end - - describe "thread_containment" do + describe "thread_containment/2" do test "it filters to user if recipients invalid and thread containment is enabled" do Pleroma.Config.put([:instance, :skip_thread_containment], false) author = insert(:user) - user = insert(:user) + %{user: user, token: oauth_token} = oauth_access(["read"]) User.follow(user, author, :follow_accept) activity = @@ -368,7 +453,7 @@ test "it filters to user if recipients invalid and thread containment is enabled ) ) - Streamer.get_topic_and_add_socket("public", user) + Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.stream("public", activity) assert_receive {:render_with_user, _, _, ^activity} assert Streamer.filtered_by_user?(user, activity) @@ -377,7 +462,7 @@ test "it filters to user if recipients invalid and thread containment is enabled test "it sends message if recipients invalid and thread containment is disabled" do Pleroma.Config.put([:instance, :skip_thread_containment], true) author = insert(:user) - user = insert(:user) + %{user: user, token: oauth_token} = oauth_access(["read"]) User.follow(user, author, :follow_accept) activity = @@ -389,7 +474,7 @@ test "it sends message if recipients invalid and thread containment is disabled" ) ) - Streamer.get_topic_and_add_socket("public", user) + Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.stream("public", activity) assert_receive {:render_with_user, _, _, ^activity} @@ -400,6 +485,7 @@ test "it sends message if recipients invalid and thread containment is enabled b Pleroma.Config.put([:instance, :skip_thread_containment], false) author = insert(:user) user = insert(:user, skip_thread_containment: true) + %{token: oauth_token} = oauth_access(["read"], user: user) User.follow(user, author, :follow_accept) activity = @@ -411,7 +497,7 @@ test "it sends message if recipients invalid and thread containment is enabled b ) ) - Streamer.get_topic_and_add_socket("public", user) + Streamer.get_topic_and_add_socket("public", user, oauth_token) Streamer.stream("public", activity) assert_receive {:render_with_user, _, _, ^activity} @@ -420,23 +506,26 @@ test "it sends message if recipients invalid and thread containment is enabled b end describe "blocks" do - test "it filters messages involving blocked users" do - user = insert(:user) + setup do: oauth_access(["read"]) + + test "it filters messages involving blocked users", %{user: user, token: oauth_token} do blocked_user = insert(:user) {:ok, _user_relationship} = User.block(user, blocked_user) - Streamer.get_topic_and_add_socket("public", user) + Streamer.get_topic_and_add_socket("public", user, oauth_token) {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"}) assert_receive {:render_with_user, _, _, ^activity} assert Streamer.filtered_by_user?(user, activity) end - test "it filters messages transitively involving blocked users" do - blocker = insert(:user) + test "it filters messages transitively involving blocked users", %{ + user: blocker, + token: blocker_token + } do blockee = insert(:user) friend = insert(:user) - Streamer.get_topic_and_add_socket("public", blocker) + Streamer.get_topic_and_add_socket("public", blocker, blocker_token) {:ok, _user_relationship} = User.block(blocker, blockee) @@ -458,8 +547,9 @@ test "it filters messages transitively involving blocked users" do end describe "lists" do - test "it doesn't send unwanted DMs to list" do - user_a = insert(:user) + setup do: oauth_access(["read"]) + + test "it doesn't send unwanted DMs to list", %{user: user_a, token: user_a_token} do user_b = insert(:user) user_c = insert(:user) @@ -468,7 +558,7 @@ test "it doesn't send unwanted DMs to list" do {:ok, list} = List.create("Test", user_a) {:ok, list} = List.follow(list, user_b) - Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) + Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id}) {:ok, _activity} = CommonAPI.post(user_b, %{ @@ -479,14 +569,13 @@ test "it doesn't send unwanted DMs to list" do refute_receive _ end - test "it doesn't send unwanted private posts to list" do - user_a = insert(:user) + test "it doesn't send unwanted private posts to list", %{user: user_a, token: user_a_token} do user_b = insert(:user) {:ok, list} = List.create("Test", user_a) {:ok, list} = List.follow(list, user_b) - Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) + Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id}) {:ok, _activity} = CommonAPI.post(user_b, %{ @@ -497,8 +586,7 @@ test "it doesn't send unwanted private posts to list" do refute_receive _ end - test "it sends wanted private posts to list" do - user_a = insert(:user) + test "it sends wanted private posts to list", %{user: user_a, token: user_a_token} do user_b = insert(:user) {:ok, user_a} = User.follow(user_a, user_b) @@ -506,7 +594,7 @@ test "it sends wanted private posts to list" do {:ok, list} = List.create("Test", user_a) {:ok, list} = List.follow(list, user_b) - Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id}) + Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id}) {:ok, activity} = CommonAPI.post(user_b, %{ @@ -520,8 +608,9 @@ test "it sends wanted private posts to list" do end describe "muted reblogs" do - test "it filters muted reblogs" do - user1 = insert(:user) + setup do: oauth_access(["read"]) + + test "it filters muted reblogs", %{user: user1, token: user1_token} do user2 = insert(:user) user3 = insert(:user) CommonAPI.follow(user1, user2) @@ -529,34 +618,38 @@ test "it filters muted reblogs" do {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"}) - Streamer.get_topic_and_add_socket("user", user1) + Streamer.get_topic_and_add_socket("user", user1, user1_token) {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2) assert_receive {:render_with_user, _, _, ^announce_activity} assert Streamer.filtered_by_user?(user1, announce_activity) end - test "it filters reblog notification for reblog-muted actors" do - user1 = insert(:user) + test "it filters reblog notification for reblog-muted actors", %{ + user: user1, + token: user1_token + } do user2 = insert(:user) CommonAPI.follow(user1, user2) CommonAPI.hide_reblogs(user1, user2) {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"}) - Streamer.get_topic_and_add_socket("user", user1) + Streamer.get_topic_and_add_socket("user", user1, user1_token) {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2) assert_receive {:render_with_user, _, "notification.json", notif} assert Streamer.filtered_by_user?(user1, notif) end - test "it send non-reblog notification for reblog-muted actors" do - user1 = insert(:user) + test "it send non-reblog notification for reblog-muted actors", %{ + user: user1, + token: user1_token + } do user2 = insert(:user) CommonAPI.follow(user1, user2) CommonAPI.hide_reblogs(user1, user2) {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"}) - Streamer.get_topic_and_add_socket("user", user1) + Streamer.get_topic_and_add_socket("user", user1, user1_token) {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id) assert_receive {:render_with_user, _, "notification.json", notif} @@ -564,27 +657,28 @@ test "it send non-reblog notification for reblog-muted actors" do end end - test "it filters posts from muted threads" do - user = insert(:user) - user2 = insert(:user) - Streamer.get_topic_and_add_socket("user", user2) - {:ok, user2, user, _activity} = CommonAPI.follow(user2, user) - {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) - {:ok, _} = CommonAPI.add_mute(user2, activity) - assert_receive {:render_with_user, _, _, ^activity} - assert Streamer.filtered_by_user?(user2, activity) + describe "muted threads" do + test "it filters posts from muted threads" do + user = insert(:user) + %{user: user2, token: user2_token} = oauth_access(["read"]) + Streamer.get_topic_and_add_socket("user", user2, user2_token) + + {:ok, user2, user, _activity} = CommonAPI.follow(user2, user) + {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"}) + {:ok, _} = CommonAPI.add_mute(user2, activity) + + assert_receive {:render_with_user, _, _, ^activity} + assert Streamer.filtered_by_user?(user2, activity) + end end describe "direct streams" do - setup do - :ok - end + setup do: oauth_access(["read"]) - test "it sends conversation update to the 'direct' stream", %{} do - user = insert(:user) + test "it sends conversation update to the 'direct' stream", %{user: user, token: oauth_token} do another_user = insert(:user) - Streamer.get_topic_and_add_socket("direct", user) + Streamer.get_topic_and_add_socket("direct", user, oauth_token) {:ok, _create_activity} = CommonAPI.post(another_user, %{ @@ -602,11 +696,11 @@ test "it sends conversation update to the 'direct' stream", %{} do assert last_status["pleroma"]["direct_conversation_id"] == participation.id end - test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do - user = insert(:user) + test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted", + %{user: user, token: oauth_token} do another_user = insert(:user) - Streamer.get_topic_and_add_socket("direct", user) + Streamer.get_topic_and_add_socket("direct", user, oauth_token) {:ok, create_activity} = CommonAPI.post(another_user, %{ @@ -629,10 +723,12 @@ test "it doesn't send conversation update to the 'direct' stream when the last m refute_receive _ end - test "it sends conversation update to the 'direct' stream when a message is deleted" do - user = insert(:user) + test "it sends conversation update to the 'direct' stream when a message is deleted", %{ + user: user, + token: oauth_token + } do another_user = insert(:user) - Streamer.get_topic_and_add_socket("direct", user) + Streamer.get_topic_and_add_socket("direct", user, oauth_token) {:ok, create_activity} = CommonAPI.post(another_user, %{