diff --git a/lib/pleroma/plugs/oauth_scopes_plug.ex b/lib/pleroma/plugs/oauth_scopes_plug.ex
index efc25b79f..b1a736d78 100644
--- a/lib/pleroma/plugs/oauth_scopes_plug.ex
+++ b/lib/pleroma/plugs/oauth_scopes_plug.ex
@@ -53,7 +53,7 @@ def drop_auth_info(conn) do
     |> assign(:token, nil)
   end
 
-  @doc "Filters descendants of supported scopes"
+  @doc "Keeps those of `scopes` which are descendants of `supported_scopes`"
   def filter_descendants(scopes, supported_scopes) do
     Enum.filter(
       scopes,
diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex
index cf923ded8..439cdd716 100644
--- a/lib/pleroma/web/mastodon_api/websocket_handler.ex
+++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex
@@ -23,8 +23,8 @@ def init(%{qs: qs} = req, state) do
     with params <- Enum.into(:cow_qs.parse_qs(qs), %{}),
          sec_websocket <- :cowboy_req.header("sec-websocket-protocol", req, nil),
          access_token <- Map.get(params, "access_token"),
-         {:ok, user} <- authenticate_request(access_token, sec_websocket),
-         {:ok, topic} <- Streamer.get_topic(Map.get(params, "stream"), user, params) do
+         {:ok, user, oauth_token} <- authenticate_request(access_token, sec_websocket),
+         {:ok, topic} <- Streamer.get_topic(params["stream"], user, oauth_token, params) do
       req =
         if sec_websocket do
           :cowboy_req.set_resp_header("sec-websocket-protocol", sec_websocket, req)
@@ -117,7 +117,7 @@ def terminate(reason, _req, state) do
 
   # Public streams without authentication.
   defp authenticate_request(nil, nil) do
-    {:ok, nil}
+    {:ok, nil, nil}
   end
 
   # Authenticated streams.
@@ -125,9 +125,9 @@ defp authenticate_request(access_token, sec_websocket) do
     token = access_token || sec_websocket
 
     with true <- is_bitstring(token),
-         %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
+         oauth_token = %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
          user = %User{} <- User.get_cached_by_id(user_id) do
-      {:ok, user}
+      {:ok, user, oauth_token}
     else
       _ -> {:error, :unauthorized}
     end
diff --git a/lib/pleroma/web/streamer/streamer.ex b/lib/pleroma/web/streamer/streamer.ex
index d1d70e556..5475f18a6 100644
--- a/lib/pleroma/web/streamer/streamer.ex
+++ b/lib/pleroma/web/streamer/streamer.ex
@@ -11,10 +11,12 @@ defmodule Pleroma.Web.Streamer do
   alias Pleroma.Conversation.Participation
   alias Pleroma.Notification
   alias Pleroma.Object
+  alias Pleroma.Plugs.OAuthScopesPlug
   alias Pleroma.User
   alias Pleroma.Web.ActivityPub.ActivityPub
   alias Pleroma.Web.ActivityPub.Visibility
   alias Pleroma.Web.CommonAPI
+  alias Pleroma.Web.OAuth.Token
   alias Pleroma.Web.StreamerView
 
   @mix_env Mix.env()
@@ -26,53 +28,87 @@ def registry, do: @registry
   @user_streams ["user", "user:notification", "direct", "user:pleroma_chat"]
 
   @doc "Expands and authorizes a stream, and registers the process for streaming."
-  @spec get_topic_and_add_socket(stream :: String.t(), User.t() | nil, Map.t() | nil) ::
+  @spec get_topic_and_add_socket(
+          stream :: String.t(),
+          User.t() | nil,
+          Token.t() | nil,
+          Map.t() | nil
+        ) ::
           {:ok, topic :: String.t()} | {:error, :bad_topic} | {:error, :unauthorized}
-  def get_topic_and_add_socket(stream, user, params \\ %{}) do
-    case get_topic(stream, user, params) do
+  def get_topic_and_add_socket(stream, user, oauth_token, params \\ %{}) do
+    case get_topic(stream, user, oauth_token, params) do
       {:ok, topic} -> add_socket(topic, user)
       error -> error
     end
   end
 
   @doc "Expand and authorizes a stream"
-  @spec get_topic(stream :: String.t(), User.t() | nil, Map.t()) ::
+  @spec get_topic(stream :: String.t(), User.t() | nil, Token.t() | nil, Map.t()) ::
           {:ok, topic :: String.t()} | {:error, :bad_topic}
-  def get_topic(stream, user, params \\ %{})
+  def get_topic(stream, user, oauth_token, params \\ %{})
 
   # Allow all public steams.
-  def get_topic(stream, _, _) when stream in @public_streams do
+  def get_topic(stream, _user, _oauth_token, _params) when stream in @public_streams do
     {:ok, stream}
   end
 
   # Allow all hashtags streams.
-  def get_topic("hashtag", _, %{"tag" => tag}) do
+  def get_topic("hashtag", _user, _oauth_token, %{"tag" => tag} = _params) do
     {:ok, "hashtag:" <> tag}
   end
 
   # Expand user streams.
-  def get_topic(stream, %User{} = user, _) when stream in @user_streams do
-    {:ok, stream <> ":" <> to_string(user.id)}
+  def get_topic(
+        stream,
+        %User{id: user_id} = user,
+        %Token{user_id: token_user_id} = oauth_token,
+        _params
+      )
+      when stream in @user_streams and user_id == token_user_id do
+    # Note: "read" works for all user streams (not mentioning it since it's an ancestor scope)
+    required_scopes =
+      if stream == "user:notification" do
+        ["read:notifications"]
+      else
+        ["read:statuses"]
+      end
+
+    if OAuthScopesPlug.filter_descendants(required_scopes, oauth_token.scopes) == [] do
+      {:error, :unauthorized}
+    else
+      {:ok, stream <> ":" <> to_string(user.id)}
+    end
   end
 
-  def get_topic(stream, _, _) when stream in @user_streams do
+  def get_topic(stream, _user, _oauth_token, _params) when stream in @user_streams do
     {:error, :unauthorized}
   end
 
   # List streams.
-  def get_topic("list", %User{} = user, %{"list" => id}) do
-    if Pleroma.List.get(id, user) do
-      {:ok, "list:" <> to_string(id)}
-    else
-      {:error, :bad_topic}
+  def get_topic(
+        "list",
+        %User{id: user_id} = user,
+        %Token{user_id: token_user_id} = oauth_token,
+        %{"list" => id}
+      )
+      when user_id == token_user_id do
+    cond do
+      OAuthScopesPlug.filter_descendants(["read", "read:lists"], oauth_token.scopes) == [] ->
+        {:error, :unauthorized}
+
+      Pleroma.List.get(id, user) ->
+        {:ok, "list:" <> to_string(id)}
+
+      true ->
+        {:error, :bad_topic}
     end
   end
 
-  def get_topic("list", _, _) do
+  def get_topic("list", _user, _oauth_token, _params) do
     {:error, :unauthorized}
   end
 
-  def get_topic(_, _, _) do
+  def get_topic(_stream, _user, _oauth_token, _params) do
     {:error, :bad_topic}
   end
 
diff --git a/test/integration/mastodon_websocket_test.exs b/test/integration/mastodon_websocket_test.exs
index 76fbc8bda..0f2e6cc2b 100644
--- a/test/integration/mastodon_websocket_test.exs
+++ b/test/integration/mastodon_websocket_test.exs
@@ -78,7 +78,7 @@ test "receives well formatted events" do
         Pleroma.Repo.insert(
           OAuth.App.register_changeset(%OAuth.App{}, %{
             client_name: "client",
-            scopes: ["scope"],
+            scopes: ["read"],
             redirect_uris: "url"
           })
         )
diff --git a/test/notification_test.exs b/test/notification_test.exs
index a09b08675..f2e0f0b0d 100644
--- a/test/notification_test.exs
+++ b/test/notification_test.exs
@@ -179,17 +179,19 @@ test "does not create a notification for subscribed users if status is a reply"
   describe "create_notification" do
     @tag needs_streamer: true
     test "it creates a notification for user and send to the 'user' and the 'user:notification' stream" do
-      user = insert(:user)
+      %{user: user, token: oauth_token} = oauth_access(["read"])
 
       task =
         Task.async(fn ->
-          Streamer.get_topic_and_add_socket("user", user)
+          {:ok, _topic} = Streamer.get_topic_and_add_socket("user", user, oauth_token)
           assert_receive {:render_with_user, _, _, _}, 4_000
         end)
 
       task_user_notification =
         Task.async(fn ->
-          Streamer.get_topic_and_add_socket("user:notification", user)
+          {:ok, _topic} =
+            Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
+
           assert_receive {:render_with_user, _, _, _}, 4_000
         end)
 
diff --git a/test/support/data_case.ex b/test/support/data_case.ex
index ba8848952..d5456521c 100644
--- a/test/support/data_case.ex
+++ b/test/support/data_case.ex
@@ -27,6 +27,21 @@ defmodule Pleroma.DataCase do
       import Ecto.Query
       import Pleroma.DataCase
       use Pleroma.Tests.Helpers
+
+      # Sets up OAuth access with specified scopes
+      defp oauth_access(scopes, opts \\ []) do
+        user =
+          Keyword.get_lazy(opts, :user, fn ->
+            Pleroma.Factory.insert(:user)
+          end)
+
+        token =
+          Keyword.get_lazy(opts, :oauth_token, fn ->
+            Pleroma.Factory.insert(:oauth_token, user: user, scopes: scopes)
+          end)
+
+        %{user: user, token: token}
+      end
     end
   end
 
diff --git a/test/web/streamer/streamer_test.exs b/test/web/streamer/streamer_test.exs
index d56d74464..185724a9f 100644
--- a/test/web/streamer/streamer_test.exs
+++ b/test/web/streamer/streamer_test.exs
@@ -21,92 +21,148 @@ defmodule Pleroma.Web.StreamerTest do
 
   setup do: clear_config([:instance, :skip_thread_containment])
 
-  describe "get_topic without an user" do
+  describe "get_topic/_ (unauthenticated)" do
     test "allows public" do
-      assert {:ok, "public"} = Streamer.get_topic("public", nil)
-      assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil)
-      assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil)
-      assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil)
+      assert {:ok, "public"} = Streamer.get_topic("public", nil, nil)
+      assert {:ok, "public:local"} = Streamer.get_topic("public:local", nil, nil)
+      assert {:ok, "public:media"} = Streamer.get_topic("public:media", nil, nil)
+      assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", nil, nil)
     end
 
     test "allows hashtag streams" do
-      assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, %{"tag" => "cofe"})
+      assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", nil, nil, %{"tag" => "cofe"})
     end
 
     test "disallows user streams" do
-      assert {:error, _} = Streamer.get_topic("user", nil)
-      assert {:error, _} = Streamer.get_topic("user:notification", nil)
-      assert {:error, _} = Streamer.get_topic("direct", nil)
+      assert {:error, _} = Streamer.get_topic("user", nil, nil)
+      assert {:error, _} = Streamer.get_topic("user:notification", nil, nil)
+      assert {:error, _} = Streamer.get_topic("direct", nil, nil)
     end
 
     test "disallows list streams" do
-      assert {:error, _} = Streamer.get_topic("list", nil, %{"list" => 42})
+      assert {:error, _} = Streamer.get_topic("list", nil, nil, %{"list" => 42})
     end
   end
 
-  describe "get_topic with an user" do
-    setup do
-      user = insert(:user)
-      {:ok, %{user: user}}
+  describe "get_topic/_ (authenticated)" do
+    setup do: oauth_access(["read"])
+
+    test "allows public streams (regardless of OAuth token scopes)", %{
+      user: user,
+      token: read_oauth_token
+    } do
+      with oauth_token <- [nil, read_oauth_token] do
+        assert {:ok, "public"} = Streamer.get_topic("public", user, oauth_token)
+        assert {:ok, "public:local"} = Streamer.get_topic("public:local", user, oauth_token)
+        assert {:ok, "public:media"} = Streamer.get_topic("public:media", user, oauth_token)
+
+        assert {:ok, "public:local:media"} =
+                 Streamer.get_topic("public:local:media", user, oauth_token)
+      end
     end
 
-    test "allows public streams", %{user: user} do
-      assert {:ok, "public"} = Streamer.get_topic("public", user)
-      assert {:ok, "public:local"} = Streamer.get_topic("public:local", user)
-      assert {:ok, "public:media"} = Streamer.get_topic("public:media", user)
-      assert {:ok, "public:local:media"} = Streamer.get_topic("public:local:media", user)
-    end
+    test "allows user streams (with proper OAuth token scopes)", %{
+      user: user,
+      token: read_oauth_token
+    } do
+      %{token: read_notifications_token} = oauth_access(["read:notifications"], user: user)
+      %{token: read_statuses_token} = oauth_access(["read:statuses"], user: user)
+      %{token: badly_scoped_token} = oauth_access(["irrelevant:scope"], user: user)
 
-    test "allows user streams", %{user: user} do
       expected_user_topic = "user:#{user.id}"
-      expected_notif_topic = "user:notification:#{user.id}"
+      expected_notification_topic = "user:notification:#{user.id}"
       expected_direct_topic = "direct:#{user.id}"
-      assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user)
-      assert {:ok, ^expected_notif_topic} = Streamer.get_topic("user:notification", user)
-      assert {:ok, ^expected_direct_topic} = Streamer.get_topic("direct", user)
+      expected_pleroma_chat_topic = "user:pleroma_chat:#{user.id}"
+
+      for valid_user_token <- [read_oauth_token, read_statuses_token] do
+        assert {:ok, ^expected_user_topic} = Streamer.get_topic("user", user, valid_user_token)
+
+        assert {:ok, ^expected_direct_topic} =
+                 Streamer.get_topic("direct", user, valid_user_token)
+
+        assert {:ok, ^expected_pleroma_chat_topic} =
+                 Streamer.get_topic("user:pleroma_chat", user, valid_user_token)
+      end
+
+      for invalid_user_token <- [read_notifications_token, badly_scoped_token],
+          user_topic <- ["user", "direct", "user:pleroma_chat"] do
+        assert {:error, :unauthorized} = Streamer.get_topic(user_topic, user, invalid_user_token)
+      end
+
+      for valid_notification_token <- [read_oauth_token, read_notifications_token] do
+        assert {:ok, ^expected_notification_topic} =
+                 Streamer.get_topic("user:notification", user, valid_notification_token)
+      end
+
+      for invalid_notification_token <- [read_statuses_token, badly_scoped_token] do
+        assert {:error, :unauthorized} =
+                 Streamer.get_topic("user:notification", user, invalid_notification_token)
+      end
     end
 
-    test "allows hashtag streams", %{user: user} do
-      assert {:ok, "hashtag:cofe"} = Streamer.get_topic("hashtag", user, %{"tag" => "cofe"})
+    test "allows hashtag streams (regardless of OAuth token scopes)", %{
+      user: user,
+      token: read_oauth_token
+    } do
+      for oauth_token <- [nil, read_oauth_token] do
+        assert {:ok, "hashtag:cofe"} =
+                 Streamer.get_topic("hashtag", user, oauth_token, %{"tag" => "cofe"})
+      end
     end
 
-    test "disallows registering to an user stream", %{user: user} do
+    test "disallows registering to another user's stream", %{user: user, token: read_oauth_token} do
       another_user = insert(:user)
-      assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user)
-      assert {:error, _} = Streamer.get_topic("user:notification:#{another_user.id}", user)
-      assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user)
+      assert {:error, _} = Streamer.get_topic("user:#{another_user.id}", user, read_oauth_token)
+
+      assert {:error, _} =
+               Streamer.get_topic("user:notification:#{another_user.id}", user, read_oauth_token)
+
+      assert {:error, _} = Streamer.get_topic("direct:#{another_user.id}", user, read_oauth_token)
     end
 
-    test "allows list stream that are owned by the user", %{user: user} do
+    test "allows list stream that are owned by the user (with `read` or `read:lists` scopes)", %{
+      user: user,
+      token: read_oauth_token
+    } do
+      %{token: read_lists_token} = oauth_access(["read:lists"], user: user)
+      %{token: invalid_token} = oauth_access(["irrelevant:scope"], user: user)
       {:ok, list} = List.create("Test", user)
-      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user)
-      assert {:ok, _} = Streamer.get_topic("list", user, %{"list" => list.id})
+
+      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, read_oauth_token)
+
+      for valid_token <- [read_oauth_token, read_lists_token] do
+        assert {:ok, _} = Streamer.get_topic("list", user, valid_token, %{"list" => list.id})
+      end
+
+      assert {:error, _} = Streamer.get_topic("list", user, invalid_token, %{"list" => list.id})
     end
 
-    test "disallows list stream that are not owned by the user", %{user: user} do
+    test "disallows list stream that are not owned by the user", %{user: user, token: oauth_token} do
       another_user = insert(:user)
       {:ok, list} = List.create("Test", another_user)
-      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user)
-      assert {:error, _} = Streamer.get_topic("list", user, %{"list" => list.id})
+
+      assert {:error, _} = Streamer.get_topic("list:#{list.id}", user, oauth_token)
+      assert {:error, _} = Streamer.get_topic("list", user, oauth_token, %{"list" => list.id})
     end
   end
 
   describe "user streams" do
     setup do
-      user = insert(:user)
+      %{user: user, token: token} = oauth_access(["read"])
       notify = insert(:notification, user: user, activity: build(:note_activity))
-      {:ok, %{user: user, notify: notify}}
+      {:ok, %{user: user, notify: notify, token: token}}
     end
 
-    test "it streams the user's post in the 'user' stream", %{user: user} do
-      Streamer.get_topic_and_add_socket("user", user)
+    test "it streams the user's post in the 'user' stream", %{user: user, token: oauth_token} do
+      Streamer.get_topic_and_add_socket("user", user, oauth_token)
       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
+
       assert_receive {:render_with_user, _, _, ^activity}
       refute Streamer.filtered_by_user?(user, activity)
     end
 
-    test "it streams boosts of the user in the 'user' stream", %{user: user} do
-      Streamer.get_topic_and_add_socket("user", user)
+    test "it streams boosts of the user in the 'user' stream", %{user: user, token: oauth_token} do
+      Streamer.get_topic_and_add_socket("user", user, oauth_token)
 
       other_user = insert(:user)
       {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
@@ -117,9 +173,10 @@ test "it streams boosts of the user in the 'user' stream", %{user: user} do
     end
 
     test "it does not stream announces of the user's own posts in the 'user' stream", %{
-      user: user
+      user: user,
+      token: oauth_token
     } do
-      Streamer.get_topic_and_add_socket("user", user)
+      Streamer.get_topic_and_add_socket("user", user, oauth_token)
 
       other_user = insert(:user)
       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
@@ -129,9 +186,10 @@ test "it does not stream announces of the user's own posts in the 'user' stream"
     end
 
     test "it does stream notifications announces of the user's own posts in the 'user' stream", %{
-      user: user
+      user: user,
+      token: oauth_token
     } do
-      Streamer.get_topic_and_add_socket("user", user)
+      Streamer.get_topic_and_add_socket("user", user, oauth_token)
 
       other_user = insert(:user)
       {:ok, activity} = CommonAPI.post(user, %{status: "hey"})
@@ -145,8 +203,11 @@ test "it does stream notifications announces of the user's own posts in the 'use
       refute Streamer.filtered_by_user?(user, notification)
     end
 
-    test "it streams boosts of mastodon user in the 'user' stream", %{user: user} do
-      Streamer.get_topic_and_add_socket("user", user)
+    test "it streams boosts of mastodon user in the 'user' stream", %{
+      user: user,
+      token: oauth_token
+    } do
+      Streamer.get_topic_and_add_socket("user", user, oauth_token)
 
       other_user = insert(:user)
       {:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
@@ -164,21 +225,34 @@ test "it streams boosts of mastodon user in the 'user' stream", %{user: user} do
       refute Streamer.filtered_by_user?(user, announce)
     end
 
-    test "it sends notify to in the 'user' stream", %{user: user, notify: notify} do
-      Streamer.get_topic_and_add_socket("user", user)
+    test "it sends notify to in the 'user' stream", %{
+      user: user,
+      token: oauth_token,
+      notify: notify
+    } do
+      Streamer.get_topic_and_add_socket("user", user, oauth_token)
       Streamer.stream("user", notify)
+
       assert_receive {:render_with_user, _, _, ^notify}
       refute Streamer.filtered_by_user?(user, notify)
     end
 
-    test "it sends notify to in the 'user:notification' stream", %{user: user, notify: notify} do
-      Streamer.get_topic_and_add_socket("user:notification", user)
+    test "it sends notify to in the 'user:notification' stream", %{
+      user: user,
+      token: oauth_token,
+      notify: notify
+    } do
+      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
       Streamer.stream("user:notification", notify)
+
       assert_receive {:render_with_user, _, _, ^notify}
       refute Streamer.filtered_by_user?(user, notify)
     end
 
-    test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} do
+    test "it sends chat messages to the 'user:pleroma_chat' stream", %{
+      user: user,
+      token: oauth_token
+    } do
       other_user = insert(:user)
 
       {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
@@ -187,7 +261,7 @@ test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} d
       cm_ref = MessageReference.for_chat_and_object(chat, object)
       cm_ref = %{cm_ref | chat: chat, object: object}
 
-      Streamer.get_topic_and_add_socket("user:pleroma_chat", user)
+      Streamer.get_topic_and_add_socket("user:pleroma_chat", user, oauth_token)
       Streamer.stream("user:pleroma_chat", {user, cm_ref})
 
       text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
@@ -196,7 +270,7 @@ test "it sends chat messages to the 'user:pleroma_chat' stream", %{user: user} d
       assert_receive {:text, ^text}
     end
 
-    test "it sends chat messages to the 'user' stream", %{user: user} do
+    test "it sends chat messages to the 'user' stream", %{user: user, token: oauth_token} do
       other_user = insert(:user)
 
       {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey cirno")
@@ -205,7 +279,7 @@ test "it sends chat messages to the 'user' stream", %{user: user} do
       cm_ref = MessageReference.for_chat_and_object(chat, object)
       cm_ref = %{cm_ref | chat: chat, object: object}
 
-      Streamer.get_topic_and_add_socket("user", user)
+      Streamer.get_topic_and_add_socket("user", user, oauth_token)
       Streamer.stream("user", {user, cm_ref})
 
       text = StreamerView.render("chat_update.json", %{chat_message_reference: cm_ref})
@@ -214,7 +288,10 @@ test "it sends chat messages to the 'user' stream", %{user: user} do
       assert_receive {:text, ^text}
     end
 
-    test "it sends chat message notifications to the 'user:notification' stream", %{user: user} do
+    test "it sends chat message notifications to the 'user:notification' stream", %{
+      user: user,
+      token: oauth_token
+    } do
       other_user = insert(:user)
 
       {:ok, create_activity} = CommonAPI.post_chat_message(other_user, user, "hey")
@@ -223,19 +300,21 @@ test "it sends chat message notifications to the 'user:notification' stream", %{
         Repo.get_by(Pleroma.Notification, user_id: user.id, activity_id: create_activity.id)
         |> Repo.preload(:activity)
 
-      Streamer.get_topic_and_add_socket("user:notification", user)
+      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
       Streamer.stream("user:notification", notify)
+
       assert_receive {:render_with_user, _, _, ^notify}
       refute Streamer.filtered_by_user?(user, notify)
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a user is blocked", %{
-      user: user
+      user: user,
+      token: oauth_token
     } do
       blocked = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked)
 
-      Streamer.get_topic_and_add_socket("user:notification", user)
+      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
 
       {:ok, activity} = CommonAPI.post(user, %{status: ":("})
       {:ok, _} = CommonAPI.favorite(blocked, activity.id)
@@ -244,14 +323,15 @@ test "it doesn't send notify to the 'user:notification' stream when a user is bl
     end
 
     test "it doesn't send notify to the 'user:notification' stream when a thread is muted", %{
-      user: user
+      user: user,
+      token: oauth_token
     } do
       user2 = insert(:user)
 
       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
       {:ok, _} = CommonAPI.add_mute(user, activity)
 
-      Streamer.get_topic_and_add_socket("user:notification", user)
+      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
 
       {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
 
@@ -260,12 +340,13 @@ test "it doesn't send notify to the 'user:notification' stream when a thread is
     end
 
     test "it sends favorite to 'user:notification' stream'", %{
-      user: user
+      user: user,
+      token: oauth_token
     } do
       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
 
       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
-      Streamer.get_topic_and_add_socket("user:notification", user)
+      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
       {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
 
       assert_receive {:render_with_user, _, "notification.json", notif}
@@ -274,13 +355,14 @@ test "it sends favorite to 'user:notification' stream'", %{
     end
 
     test "it doesn't send the 'user:notification' stream' when a domain is blocked", %{
-      user: user
+      user: user,
+      token: oauth_token
     } do
       user2 = insert(:user, %{ap_id: "https://hecking-lewd-place.com/user/meanie"})
 
       {:ok, user} = User.block_domain(user, "hecking-lewd-place.com")
       {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
-      Streamer.get_topic_and_add_socket("user:notification", user)
+      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
       {:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
 
       refute_receive _
@@ -288,7 +370,8 @@ test "it doesn't send the 'user:notification' stream' when a domain is blocked",
     end
 
     test "it sends follow activities to the 'user:notification' stream", %{
-      user: user
+      user: user,
+      token: oauth_token
     } do
       user_url = user.ap_id
       user2 = insert(:user)
@@ -303,7 +386,7 @@ test "it sends follow activities to the 'user:notification' stream", %{
           %Tesla.Env{status: 200, body: body}
       end)
 
-      Streamer.get_topic_and_add_socket("user:notification", user)
+      Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
       {:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
 
       assert_receive {:render_with_user, _, "notification.json", notif}
@@ -312,51 +395,53 @@ test "it sends follow activities to the 'user:notification' stream", %{
     end
   end
 
-  test "it sends to public authenticated" do
-    user = insert(:user)
-    other_user = insert(:user)
+  describe "public streams" do
+    test "it sends to public (authenticated)" do
+      %{user: user, token: oauth_token} = oauth_access(["read"])
+      other_user = insert(:user)
 
-    Streamer.get_topic_and_add_socket("public", other_user)
+      Streamer.get_topic_and_add_socket("public", user, oauth_token)
 
-    {:ok, activity} = CommonAPI.post(user, %{status: "Test"})
-    assert_receive {:render_with_user, _, _, ^activity}
-    refute Streamer.filtered_by_user?(user, activity)
+      {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
+      assert_receive {:render_with_user, _, _, ^activity}
+      refute Streamer.filtered_by_user?(other_user, activity)
+    end
+
+    test "it sends to public (unauthenticated)" do
+      user = insert(:user)
+
+      Streamer.get_topic_and_add_socket("public", nil, nil)
+
+      {:ok, activity} = CommonAPI.post(user, %{status: "Test"})
+      activity_id = activity.id
+      assert_receive {:text, event}
+      assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
+      assert %{"id" => ^activity_id} = Jason.decode!(payload)
+
+      {:ok, _} = CommonAPI.delete(activity.id, user)
+      assert_receive {:text, event}
+      assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
+    end
+
+    test "handles deletions" do
+      %{user: user, token: oauth_token} = oauth_access(["read"])
+      other_user = insert(:user)
+      {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
+
+      Streamer.get_topic_and_add_socket("public", user, oauth_token)
+
+      {:ok, _} = CommonAPI.delete(activity.id, other_user)
+      activity_id = activity.id
+      assert_receive {:text, event}
+      assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
+    end
   end
 
-  test "works for deletions" do
-    user = insert(:user)
-    other_user = insert(:user)
-    {:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
-
-    Streamer.get_topic_and_add_socket("public", user)
-
-    {:ok, _} = CommonAPI.delete(activity.id, other_user)
-    activity_id = activity.id
-    assert_receive {:text, event}
-    assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
-  end
-
-  test "it sends to public unauthenticated" do
-    user = insert(:user)
-
-    Streamer.get_topic_and_add_socket("public", nil)
-
-    {:ok, activity} = CommonAPI.post(user, %{status: "Test"})
-    activity_id = activity.id
-    assert_receive {:text, event}
-    assert %{"event" => "update", "payload" => payload} = Jason.decode!(event)
-    assert %{"id" => ^activity_id} = Jason.decode!(payload)
-
-    {:ok, _} = CommonAPI.delete(activity.id, user)
-    assert_receive {:text, event}
-    assert %{"event" => "delete", "payload" => ^activity_id} = Jason.decode!(event)
-  end
-
-  describe "thread_containment" do
+  describe "thread_containment/2" do
     test "it filters to user if recipients invalid and thread containment is enabled" do
       Pleroma.Config.put([:instance, :skip_thread_containment], false)
       author = insert(:user)
-      user = insert(:user)
+      %{user: user, token: oauth_token} = oauth_access(["read"])
       User.follow(user, author, :follow_accept)
 
       activity =
@@ -368,7 +453,7 @@ test "it filters to user if recipients invalid and thread containment is enabled
             )
         )
 
-      Streamer.get_topic_and_add_socket("public", user)
+      Streamer.get_topic_and_add_socket("public", user, oauth_token)
       Streamer.stream("public", activity)
       assert_receive {:render_with_user, _, _, ^activity}
       assert Streamer.filtered_by_user?(user, activity)
@@ -377,7 +462,7 @@ test "it filters to user if recipients invalid and thread containment is enabled
     test "it sends message if recipients invalid and thread containment is disabled" do
       Pleroma.Config.put([:instance, :skip_thread_containment], true)
       author = insert(:user)
-      user = insert(:user)
+      %{user: user, token: oauth_token} = oauth_access(["read"])
       User.follow(user, author, :follow_accept)
 
       activity =
@@ -389,7 +474,7 @@ test "it sends message if recipients invalid and thread containment is disabled"
             )
         )
 
-      Streamer.get_topic_and_add_socket("public", user)
+      Streamer.get_topic_and_add_socket("public", user, oauth_token)
       Streamer.stream("public", activity)
 
       assert_receive {:render_with_user, _, _, ^activity}
@@ -400,6 +485,7 @@ test "it sends message if recipients invalid and thread containment is enabled b
       Pleroma.Config.put([:instance, :skip_thread_containment], false)
       author = insert(:user)
       user = insert(:user, skip_thread_containment: true)
+      %{token: oauth_token} = oauth_access(["read"], user: user)
       User.follow(user, author, :follow_accept)
 
       activity =
@@ -411,7 +497,7 @@ test "it sends message if recipients invalid and thread containment is enabled b
             )
         )
 
-      Streamer.get_topic_and_add_socket("public", user)
+      Streamer.get_topic_and_add_socket("public", user, oauth_token)
       Streamer.stream("public", activity)
 
       assert_receive {:render_with_user, _, _, ^activity}
@@ -420,23 +506,26 @@ test "it sends message if recipients invalid and thread containment is enabled b
   end
 
   describe "blocks" do
-    test "it filters messages involving blocked users" do
-      user = insert(:user)
+    setup do: oauth_access(["read"])
+
+    test "it filters messages involving blocked users", %{user: user, token: oauth_token} do
       blocked_user = insert(:user)
       {:ok, _user_relationship} = User.block(user, blocked_user)
 
-      Streamer.get_topic_and_add_socket("public", user)
+      Streamer.get_topic_and_add_socket("public", user, oauth_token)
       {:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
       assert_receive {:render_with_user, _, _, ^activity}
       assert Streamer.filtered_by_user?(user, activity)
     end
 
-    test "it filters messages transitively involving blocked users" do
-      blocker = insert(:user)
+    test "it filters messages transitively involving blocked users", %{
+      user: blocker,
+      token: blocker_token
+    } do
       blockee = insert(:user)
       friend = insert(:user)
 
-      Streamer.get_topic_and_add_socket("public", blocker)
+      Streamer.get_topic_and_add_socket("public", blocker, blocker_token)
 
       {:ok, _user_relationship} = User.block(blocker, blockee)
 
@@ -458,8 +547,9 @@ test "it filters messages transitively involving blocked users" do
   end
 
   describe "lists" do
-    test "it doesn't send unwanted DMs to list" do
-      user_a = insert(:user)
+    setup do: oauth_access(["read"])
+
+    test "it doesn't send unwanted DMs to list", %{user: user_a, token: user_a_token} do
       user_b = insert(:user)
       user_c = insert(:user)
 
@@ -468,7 +558,7 @@ test "it doesn't send unwanted DMs to list" do
       {:ok, list} = List.create("Test", user_a)
       {:ok, list} = List.follow(list, user_b)
 
-      Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id})
+      Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
 
       {:ok, _activity} =
         CommonAPI.post(user_b, %{
@@ -479,14 +569,13 @@ test "it doesn't send unwanted DMs to list" do
       refute_receive _
     end
 
-    test "it doesn't send unwanted private posts to list" do
-      user_a = insert(:user)
+    test "it doesn't send unwanted private posts to list", %{user: user_a, token: user_a_token} do
       user_b = insert(:user)
 
       {:ok, list} = List.create("Test", user_a)
       {:ok, list} = List.follow(list, user_b)
 
-      Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id})
+      Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
 
       {:ok, _activity} =
         CommonAPI.post(user_b, %{
@@ -497,8 +586,7 @@ test "it doesn't send unwanted private posts to list" do
       refute_receive _
     end
 
-    test "it sends wanted private posts to list" do
-      user_a = insert(:user)
+    test "it sends wanted private posts to list", %{user: user_a, token: user_a_token} do
       user_b = insert(:user)
 
       {:ok, user_a} = User.follow(user_a, user_b)
@@ -506,7 +594,7 @@ test "it sends wanted private posts to list" do
       {:ok, list} = List.create("Test", user_a)
       {:ok, list} = List.follow(list, user_b)
 
-      Streamer.get_topic_and_add_socket("list", user_a, %{"list" => list.id})
+      Streamer.get_topic_and_add_socket("list", user_a, user_a_token, %{"list" => list.id})
 
       {:ok, activity} =
         CommonAPI.post(user_b, %{
@@ -520,8 +608,9 @@ test "it sends wanted private posts to list" do
   end
 
   describe "muted reblogs" do
-    test "it filters muted reblogs" do
-      user1 = insert(:user)
+    setup do: oauth_access(["read"])
+
+    test "it filters muted reblogs", %{user: user1, token: user1_token} do
       user2 = insert(:user)
       user3 = insert(:user)
       CommonAPI.follow(user1, user2)
@@ -529,34 +618,38 @@ test "it filters muted reblogs" do
 
       {:ok, create_activity} = CommonAPI.post(user3, %{status: "I'm kawen"})
 
-      Streamer.get_topic_and_add_socket("user", user1)
+      Streamer.get_topic_and_add_socket("user", user1, user1_token)
       {:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
       assert_receive {:render_with_user, _, _, ^announce_activity}
       assert Streamer.filtered_by_user?(user1, announce_activity)
     end
 
-    test "it filters reblog notification for reblog-muted actors" do
-      user1 = insert(:user)
+    test "it filters reblog notification for reblog-muted actors", %{
+      user: user1,
+      token: user1_token
+    } do
       user2 = insert(:user)
       CommonAPI.follow(user1, user2)
       CommonAPI.hide_reblogs(user1, user2)
 
       {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
-      Streamer.get_topic_and_add_socket("user", user1)
+      Streamer.get_topic_and_add_socket("user", user1, user1_token)
       {:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
 
       assert_receive {:render_with_user, _, "notification.json", notif}
       assert Streamer.filtered_by_user?(user1, notif)
     end
 
-    test "it send non-reblog notification for reblog-muted actors" do
-      user1 = insert(:user)
+    test "it send non-reblog notification for reblog-muted actors", %{
+      user: user1,
+      token: user1_token
+    } do
       user2 = insert(:user)
       CommonAPI.follow(user1, user2)
       CommonAPI.hide_reblogs(user1, user2)
 
       {:ok, create_activity} = CommonAPI.post(user1, %{status: "I'm kawen"})
-      Streamer.get_topic_and_add_socket("user", user1)
+      Streamer.get_topic_and_add_socket("user", user1, user1_token)
       {:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
 
       assert_receive {:render_with_user, _, "notification.json", notif}
@@ -564,27 +657,28 @@ test "it send non-reblog notification for reblog-muted actors" do
     end
   end
 
-  test "it filters posts from muted threads" do
-    user = insert(:user)
-    user2 = insert(:user)
-    Streamer.get_topic_and_add_socket("user", user2)
-    {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
-    {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
-    {:ok, _} = CommonAPI.add_mute(user2, activity)
-    assert_receive {:render_with_user, _, _, ^activity}
-    assert Streamer.filtered_by_user?(user2, activity)
+  describe "muted threads" do
+    test "it filters posts from muted threads" do
+      user = insert(:user)
+      %{user: user2, token: user2_token} = oauth_access(["read"])
+      Streamer.get_topic_and_add_socket("user", user2, user2_token)
+
+      {:ok, user2, user, _activity} = CommonAPI.follow(user2, user)
+      {:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
+      {:ok, _} = CommonAPI.add_mute(user2, activity)
+
+      assert_receive {:render_with_user, _, _, ^activity}
+      assert Streamer.filtered_by_user?(user2, activity)
+    end
   end
 
   describe "direct streams" do
-    setup do
-      :ok
-    end
+    setup do: oauth_access(["read"])
 
-    test "it sends conversation update to the 'direct' stream", %{} do
-      user = insert(:user)
+    test "it sends conversation update to the 'direct' stream", %{user: user, token: oauth_token} do
       another_user = insert(:user)
 
-      Streamer.get_topic_and_add_socket("direct", user)
+      Streamer.get_topic_and_add_socket("direct", user, oauth_token)
 
       {:ok, _create_activity} =
         CommonAPI.post(another_user, %{
@@ -602,11 +696,11 @@ test "it sends conversation update to the 'direct' stream", %{} do
       assert last_status["pleroma"]["direct_conversation_id"] == participation.id
     end
 
-    test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted" do
-      user = insert(:user)
+    test "it doesn't send conversation update to the 'direct' stream when the last message in the conversation is deleted",
+         %{user: user, token: oauth_token} do
       another_user = insert(:user)
 
-      Streamer.get_topic_and_add_socket("direct", user)
+      Streamer.get_topic_and_add_socket("direct", user, oauth_token)
 
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{
@@ -629,10 +723,12 @@ test "it doesn't send conversation update to the 'direct' stream when the last m
       refute_receive _
     end
 
-    test "it sends conversation update to the 'direct' stream when a message is deleted" do
-      user = insert(:user)
+    test "it sends conversation update to the 'direct' stream when a message is deleted", %{
+      user: user,
+      token: oauth_token
+    } do
       another_user = insert(:user)
-      Streamer.get_topic_and_add_socket("direct", user)
+      Streamer.get_topic_and_add_socket("direct", user, oauth_token)
 
       {:ok, create_activity} =
         CommonAPI.post(another_user, %{