add multi-stream sockets
ci/woodpecker/push/woodpecker Pipeline is pending Details

This commit is contained in:
FloatingGhost 2022-08-16 12:05:24 +01:00
parent dcf6531eba
commit 1b75aeead6
13 changed files with 161 additions and 43 deletions

View File

@ -734,6 +734,14 @@ config :pleroma, :frontends,
"build_dir" => "distribution",
"ref" => "akkoma"
},
"fedibird-fe" => %{
"name" => "fedibird-fe",
"git" => "https://akkoma.dev/AkkomaGang/fedibird-fe",
"build_url" =>
"https://akkoma-updates.s3-website.fr-par.scw.cloud/frontend/${ref}/fedibird-fe.zip",
"build_dir" => "distribution",
"ref" => "akkoma"
},
"admin-fe" => %{
"name" => "admin-fe",
"git" => "https://akkoma.dev/AkkomaGang/admin-fe",

View File

@ -27,9 +27,21 @@ defmodule Pleroma.Web.MastoFEController do
def index(conn, _params) do
with %{assigns: %{user: %User{} = user, token: %Token{app_id: token_app_id} = token}} <- conn,
{:ok, %{id: ^token_app_id}} <- AuthController.local_mastofe_app() do
flavour =
[:frontends, :mastodon]
|> Pleroma.Config.get()
|> Map.get("name", "mastodon-fe")
index =
if flavour == "fedibird-fe" do
"fedibird.index.html"
else
"glitchsoc.index.html"
end
conn
|> put_layout(false)
|> render("index.html",
|> render(index,
token: token.token,
user: user,
custom_emojis: Pleroma.Emoji.get_all()

View File

@ -32,8 +32,15 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
req
end
{:cowboy_websocket, req, %{user: user, topic: topic, count: 0, timer: nil},
%{idle_timeout: @timeout}}
{:cowboy_websocket, req,
%{
user: user,
topic: topic,
count: 0,
timer: nil,
subscriptions: [],
oauth_token: oauth_token
}, %{idle_timeout: @timeout}}
else
{:error, :bad_topic} ->
Logger.debug("#{__MODULE__} bad topic #{inspect(req)}")
@ -70,16 +77,45 @@ defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do
{:reply, {:text, "pong"}, %{state | timer: timer()}}
end
def websocket_handle({:text, text}, state) do
with {:ok, json} <- Jason.decode(text) do
websocket_handle({:json, json}, state)
else
_ ->
Logger.error("#{__MODULE__} received text frame: #{text}")
{:ok, state}
end
end
def websocket_handle(
{:json, %{"type" => "subscribe", "stream" => stream_name}},
%{user: user, oauth_token: token} = state
) do
with {:ok, topic} <- Streamer.get_topic(stream_name, user, token, %{}) do
new_subscriptions =
[topic | Map.get(state, :subscriptions, [])]
|> Enum.uniq()
{:ok, _topic} = Streamer.add_socket(topic, user)
{:ok, Map.put(state, :subscriptions, new_subscriptions)}
else
_ ->
Logger.error("#{__MODULE__} received invalid topic: #{stream_name}")
{:ok, state}
end
end
def websocket_handle(frame, state) do
Logger.error("#{__MODULE__} received frame: #{inspect(frame)}")
{:ok, state}
end
def websocket_info({:render_with_user, view, template, item}, state) do
def websocket_info({:render_with_user, view, template, item, topic}, state) do
user = %User{} = User.get_cached_by_ap_id(state.user.ap_id)
unless Streamer.filtered_by_user?(user, item) do
websocket_info({:text, view.render(template, item, user)}, %{state | user: user})
websocket_info({:text, view.render(template, item, user, topic)}, %{state | user: user})
else
{:ok, state}
end

View File

@ -75,6 +75,7 @@ defmodule Pleroma.Web.PleromaAPI.EmojiReactionController do
def create(%{assigns: %{user: user}} = conn, %{id: activity_id, emoji: emoji}) do
emoji = Pleroma.Emoji.maybe_quote(emoji)
with {:ok, _activity} <- CommonAPI.react_with_emoji(activity_id, user, emoji) do
activity = Activity.get_by_id(activity_id)

View File

@ -114,6 +114,11 @@ defmodule Pleroma.Web.Streamer do
{:error, :unauthorized}
end
# mastodon multi-topic WS
def get_topic(nil, _user, _oauth_token, _params) do
{:ok, :multi}
end
def get_topic(_stream, _user, _oauth_token, _params) do
{:error, :bad_topic}
end
@ -186,8 +191,8 @@ defmodule Pleroma.Web.Streamer do
end
defp do_stream("follow_relationship", item) do
text = StreamerView.render("follow_relationships_update.json", item)
user_topic = "user:#{item.follower.id}"
text = StreamerView.render("follow_relationships_update.json", item, user_topic)
Logger.debug("Trying to push follow relationship update to #{user_topic}\n\n")
@ -235,7 +240,7 @@ defmodule Pleroma.Web.Streamer do
when topic in ["user", "user:notification"] do
Registry.dispatch(@registry, "#{topic}:#{item.user_id}", fn list ->
Enum.each(list, fn {pid, _auth} ->
send(pid, {:render_with_user, StreamerView, "notification.json", item})
send(pid, {:render_with_user, StreamerView, "notification.json", item, topic})
end)
end)
end
@ -259,7 +264,7 @@ defmodule Pleroma.Web.Streamer do
end
defp push_to_socket(topic, %Participation{} = participation) do
rendered = StreamerView.render("conversation.json", participation)
rendered = StreamerView.render("conversation.json", participation, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, _} ->
@ -283,12 +288,12 @@ defmodule Pleroma.Web.Streamer do
defp push_to_socket(_topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
defp push_to_socket(topic, item) do
anon_render = StreamerView.render("update.json", item)
anon_render = StreamerView.render("update.json", item, topic)
Registry.dispatch(@registry, topic, fn list ->
Enum.each(list, fn {pid, auth?} ->
if auth? do
send(pid, {:render_with_user, StreamerView, "update.json", item})
send(pid, {:render_with_user, StreamerView, "update.json", item, topic})
else
send(pid, {:text, anon_render})
end

View File

@ -0,0 +1,35 @@
<!DOCTYPE html>
<html lang='en'>
<head>
<meta charset='utf-8'>
<meta content='width=device-width, initial-scale=1' name='viewport'>
<title>
<%= Config.get([:instance, :name]) %>
</title>
<link rel="icon" type="image/png" href="/favicon.png"/>
<link rel="manifest" type="applicaton/manifest+json" href="<%= Routes.masto_fe_path(Pleroma.Web.Endpoint, :manifest) %>" />
<meta name="theme-color" content="<%= Config.get([:manifest, :theme_color]) %>" />
<script crossorigin='anonymous' src="/packs/js/locales.js"></script>
<script crossorigin='anonymous' src="/packs/js/locales/glitch/en.js"></script>
<link rel='preload' as='script' crossorigin='anonymous' href='/packs/js/features/getting_started.js'>
<link rel='preload' as='script' crossorigin='anonymous' href='/packs/js/features/compose.js'>
<link rel='preload' as='script' crossorigin='anonymous' href='/packs/js/features/home_timeline.js'>
<link rel='preload' as='script' crossorigin='anonymous' href='/packs/js/features/notifications.js'>
<script id='initial-state' type='application/json'><%= initial_state(@token, @user, @custom_emojis) %></script>
<script src="/packs/js/core/common.js"></script>
<link rel="stylesheet" media="all" href="/packs/css/core/common.css" />
<script src="/packs/js/flavours/glitch/common.js"></script>
<link rel="stylesheet" media="all" href="/packs/css/flavours/glitch/common.css" />
<script src="/packs/js/flavours/glitch/home.js"></script>
</head>
<body class='app-body no-reduce-motion system-font'>
<div class='app-holder' data-props='{&quot;locale&quot;:&quot;en&quot;}' id='mastodon'>
</div>
</body>
</html>

View File

@ -30,7 +30,8 @@ defmodule Pleroma.Web.MastoFEView do
mascot: User.get_mascot(user)["url"],
show_quote_button: true,
enable_reaction: true,
compact_reaction: false
compact_reaction: false,
advanced_layout: true
},
poll_limits: Config.get([:instance, :poll_limits]),
rights: %{

View File

@ -11,8 +11,9 @@ defmodule Pleroma.Web.StreamerView do
alias Pleroma.User
alias Pleroma.Web.MastodonAPI.NotificationView
def render("update.json", %Activity{} = activity, %User{} = user) do
def render("update.json", %Activity{} = activity, %User{} = user, topic) do
%{
stream: [topic],
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
@ -25,8 +26,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
def render("notification.json", %Notification{} = notify, %User{} = user) do
def render("notification.json", %Notification{} = notify, %User{} = user, topic) do
%{
stream: [topic],
event: "notification",
payload:
NotificationView.render(
@ -38,8 +40,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
def render("update.json", %Activity{} = activity) do
def render("update.json", %Activity{} = activity, topic) do
%{
stream: [topic],
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
@ -51,8 +54,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
def render("follow_relationships_update.json", item) do
def render("follow_relationships_update.json", item, topic) do
%{
stream: [topic],
event: "pleroma:follow_relationships_update",
payload:
%{
@ -73,8 +77,9 @@ defmodule Pleroma.Web.StreamerView do
|> Jason.encode!()
end
def render("conversation.json", %Participation{} = participation) do
def render("conversation.json", %Participation{} = participation, topic) do
%{
stream: [topic],
event: "conversation",
payload:
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{

View File

@ -31,9 +31,9 @@ defmodule Pleroma.Integration.MastodonWebsocketTest do
WebsocketClient.start_link(self(), path, headers)
end
test "refuses invalid requests" do
test "allows multi-streams" do
capture_log(fn ->
assert {:error, {404, _}} = start_socket()
assert {:ok, _} = start_socket()
assert {:error, {404, _}} = start_socket("?stream=ncjdk")
Process.sleep(30)
end)

View File

@ -224,7 +224,7 @@ defmodule Pleroma.NotificationTest do
task =
Task.async(fn ->
{:ok, _topic} = Streamer.get_topic_and_add_socket("user", user, oauth_token)
assert_receive {:render_with_user, _, _, _}, 4_000
assert_receive {:render_with_user, _, _, _, "user"}, 4_000
end)
task_user_notification =
@ -232,7 +232,7 @@ defmodule Pleroma.NotificationTest do
{:ok, _topic} =
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
assert_receive {:render_with_user, _, _, _}, 4_000
assert_receive {:render_with_user, _, _, _, "user:notification"}, 4_000
end)
activity = insert(:note_activity)

View File

@ -272,6 +272,7 @@ defmodule Pleroma.Web.MastodonAPI.StatusViewTest do
spoiler_text: HTML.filter_tags(object_data["summary"]),
visibility: "public",
media_attachments: [],
emoji_reactions: [],
mentions: [],
tags: [
%{

View File

@ -157,7 +157,8 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user, oauth_token)
{:ok, activity} = CommonAPI.post(user, %{status: "hey"})
assert_receive {:render_with_user, _, _, ^activity}
stream_name = "user:#{user.id}"
assert_receive {:render_with_user, _, _, ^activity, ^stream_name}
refute Streamer.filtered_by_user?(user, activity)
end
@ -168,7 +169,11 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, activity} = CommonAPI.post(other_user, %{status: "hey"})
{:ok, announce} = CommonAPI.repeat(activity.id, user)
assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
stream_name = "user:#{user.id}"
assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce,
^stream_name}
refute Streamer.filtered_by_user?(user, announce)
end
@ -221,7 +226,11 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, %Pleroma.Activity{data: _data, local: false} = announce} =
Pleroma.Web.ActivityPub.Transmogrifier.handle_incoming(data)
assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce}
stream_name = "user:#{user.id}"
assert_receive {:render_with_user, Pleroma.Web.StreamerView, "update.json", ^announce,
^stream_name}
refute Streamer.filtered_by_user?(user, announce)
end
@ -233,7 +242,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user, oauth_token)
Streamer.stream("user", notify)
assert_receive {:render_with_user, _, _, ^notify}
assert_receive {:render_with_user, _, _, ^notify, "user"}
refute Streamer.filtered_by_user?(user, notify)
end
@ -245,7 +254,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
Streamer.stream("user:notification", notify)
assert_receive {:render_with_user, _, _, ^notify}
assert_receive {:render_with_user, _, _, ^notify, "user:notification"}
refute Streamer.filtered_by_user?(user, notify)
end
@ -291,7 +300,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, favorite_activity} = CommonAPI.favorite(user2, activity.id)
assert_receive {:render_with_user, _, "notification.json", notif}
assert_receive {:render_with_user, _, "notification.json", notif, "user:notification"}
assert notif.activity.id == favorite_activity.id
refute Streamer.filtered_by_user?(user, notif)
end
@ -320,7 +329,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user:notification", user, oauth_token)
{:ok, _follower, _followed, follow_activity} = CommonAPI.follow(user2, user)
assert_receive {:render_with_user, _, "notification.json", notif}
assert_receive {:render_with_user, _, "notification.json", notif, "user:notification"}
assert notif.activity.id == follow_activity.id
refute Streamer.filtered_by_user?(user, notif)
end
@ -384,7 +393,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, activity} = CommonAPI.post(other_user, %{status: "Test"})
assert_receive {:render_with_user, _, _, ^activity}
assert_receive {:render_with_user, _, _, ^activity, "public"}
refute Streamer.filtered_by_user?(other_user, activity)
end
@ -436,7 +445,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity}
assert_receive {:render_with_user, _, _, ^activity, "public"}
assert Streamer.filtered_by_user?(user, activity)
end
@ -458,7 +467,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity}
assert_receive {:render_with_user, _, _, ^activity, "public"}
refute Streamer.filtered_by_user?(user, activity)
end
@ -481,7 +490,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token)
Streamer.stream("public", activity)
assert_receive {:render_with_user, _, _, ^activity}
assert_receive {:render_with_user, _, _, ^activity, "public"}
refute Streamer.filtered_by_user?(user, activity)
end
end
@ -495,7 +504,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("public", user, oauth_token)
{:ok, activity} = CommonAPI.post(blocked_user, %{status: "Test"})
assert_receive {:render_with_user, _, _, ^activity}
assert_receive {:render_with_user, _, _, ^activity, "public"}
assert Streamer.filtered_by_user?(user, activity)
end
@ -512,17 +521,17 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, activity_one} = CommonAPI.post(friend, %{status: "hey! @#{blockee.nickname}"})
assert_receive {:render_with_user, _, _, ^activity_one}
assert_receive {:render_with_user, _, _, ^activity_one, "public"}
assert Streamer.filtered_by_user?(blocker, activity_one)
{:ok, activity_two} = CommonAPI.post(blockee, %{status: "hey! @#{friend.nickname}"})
assert_receive {:render_with_user, _, _, ^activity_two}
assert_receive {:render_with_user, _, _, ^activity_two, "public"}
assert Streamer.filtered_by_user?(blocker, activity_two)
{:ok, activity_three} = CommonAPI.post(blockee, %{status: "hey! @#{blocker.nickname}"})
assert_receive {:render_with_user, _, _, ^activity_three}
assert_receive {:render_with_user, _, _, ^activity_three, "public"}
assert Streamer.filtered_by_user?(blocker, activity_three)
end
end
@ -583,7 +592,8 @@ defmodule Pleroma.Web.StreamerTest do
visibility: "private"
})
assert_receive {:render_with_user, _, _, ^activity}
stream_name = "list:#{list.id}"
assert_receive {:render_with_user, _, _, ^activity, ^stream_name}
refute Streamer.filtered_by_user?(user_a, activity)
end
end
@ -601,7 +611,8 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, announce_activity} = CommonAPI.repeat(create_activity.id, user2)
assert_receive {:render_with_user, _, _, ^announce_activity}
stream_name = "user:#{user1.id}"
assert_receive {:render_with_user, _, _, ^announce_activity, ^stream_name}
assert Streamer.filtered_by_user?(user1, announce_activity)
end
@ -617,7 +628,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, _announce_activity} = CommonAPI.repeat(create_activity.id, user2)
assert_receive {:render_with_user, _, "notification.json", notif}
assert_receive {:render_with_user, _, "notification.json", notif, "user"}
assert Streamer.filtered_by_user?(user1, notif)
end
@ -633,7 +644,7 @@ defmodule Pleroma.Web.StreamerTest do
Streamer.get_topic_and_add_socket("user", user1, user1_token)
{:ok, _favorite_activity} = CommonAPI.favorite(user2, create_activity.id)
assert_receive {:render_with_user, _, "notification.json", notif}
assert_receive {:render_with_user, _, "notification.json", notif, "user"}
refute Streamer.filtered_by_user?(user1, notif)
end
end
@ -648,7 +659,8 @@ defmodule Pleroma.Web.StreamerTest do
{:ok, activity} = CommonAPI.post(user, %{status: "super hot take"})
{:ok, _} = CommonAPI.add_mute(user2, activity)
assert_receive {:render_with_user, _, _, ^activity}
stream_name = "user:#{user2.id}"
assert_receive {:render_with_user, _, _, ^activity, ^stream_name}
assert Streamer.filtered_by_user?(user2, activity)
end
end
@ -690,7 +702,8 @@ defmodule Pleroma.Web.StreamerTest do
})
create_activity_id = create_activity.id
assert_receive {:render_with_user, _, _, ^create_activity}
stream_name = "direct:#{user.id}"
assert_receive {:render_with_user, _, _, ^create_activity, ^stream_name}
assert_receive {:text, received_conversation1}
assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
@ -725,8 +738,9 @@ defmodule Pleroma.Web.StreamerTest do
visibility: "direct"
})
assert_receive {:render_with_user, _, _, ^create_activity}
assert_receive {:render_with_user, _, _, ^create_activity2}
stream_name = "direct:#{user.id}"
assert_receive {:render_with_user, _, _, ^create_activity, ^stream_name}
assert_receive {:render_with_user, _, _, ^create_activity2, ^stream_name}
assert_receive {:text, received_conversation1}
assert %{"event" => "conversation", "payload" => _} = Jason.decode!(received_conversation1)
assert_receive {:text, received_conversation1}