diff --git a/config/config.exs b/config/config.exs index 1777a54c0..df6ea09ae 100644 --- a/config/config.exs +++ b/config/config.exs @@ -50,6 +50,15 @@ config :pleroma, :uri_schemes, # Configures the endpoint config :pleroma, Pleroma.Web.Endpoint, url: [host: "localhost"], + http: [ + dispatch: [ + {:_, + [ + {"/api/v1/streaming", Elixir.Pleroma.Web.MastodonAPI.WebsocketHandler, []}, + {:_, Plug.Adapters.Cowboy.Handler, {Pleroma.Web.Endpoint, []}} + ]} + ] + ], protocol: "https", secret_key_base: "aK4Abxf29xU9TTDKre9coZPUgevcVCFQJe/5xP/7Lt4BEif6idBIbjupVbOrbKxl", signing_salt: "CqaoopA2", diff --git a/lib/pleroma/web/endpoint.ex b/lib/pleroma/web/endpoint.ex index aacc88195..e52667c72 100644 --- a/lib/pleroma/web/endpoint.ex +++ b/lib/pleroma/web/endpoint.ex @@ -3,8 +3,6 @@ defmodule Pleroma.Web.Endpoint do socket("/socket", Pleroma.Web.UserSocket) - socket("/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket, websocket: [path: "/streaming"]) - # Serve at "/" the static files from "priv/static" directory. # # You should set gzip to true if you are running phoenix.digest diff --git a/lib/pleroma/web/mastodon_api/mastodon_socket.ex b/lib/pleroma/web/mastodon_api/mastodon_socket.ex deleted file mode 100644 index 1b75897b5..000000000 --- a/lib/pleroma/web/mastodon_api/mastodon_socket.ex +++ /dev/null @@ -1,74 +0,0 @@ -defmodule Pleroma.Web.MastodonAPI.MastodonSocket do - use Phoenix.Socket - - alias Pleroma.Web.OAuth.Token - alias Pleroma.{User, Repo} - - @spec connect(params :: map(), Phoenix.Socket.t()) :: {:ok, Phoenix.Socket.t()} | :error - def connect(%{"access_token" => token} = params, socket) do - with %Token{user_id: user_id} <- Repo.get_by(Token, token: token), - %User{} = user <- Repo.get(User, user_id), - stream - when stream in [ - "public", - "public:local", - "public:media", - "public:local:media", - "user", - "direct", - "list", - "hashtag" - ] <- params["stream"] do - topic = - case stream do - "hashtag" -> "hashtag:#{params["tag"]}" - "list" -> "list:#{params["list"]}" - _ -> stream - end - - socket = - socket - |> assign(:topic, topic) - |> assign(:user, user) - - Pleroma.Web.Streamer.add_socket(topic, socket) - {:ok, socket} - else - _e -> :error - end - end - - def connect(%{"stream" => stream} = params, socket) - when stream in ["public", "public:local", "hashtag"] do - topic = - case stream do - "hashtag" -> "hashtag:#{params["tag"]}" - _ -> stream - end - - socket = - socket - |> assign(:topic, topic) - - Pleroma.Web.Streamer.add_socket(topic, socket) - {:ok, socket} - end - - def connect(_params, _socket), do: :error - - def id(_), do: nil - - def handle(:text, message, _state) do - # | :ok - # | state - # | {:text, message} - # | {:text, message, state} - # | {:close, "Goodbye!"} - {:text, message} - end - - def handle(:closed, _, %{socket: socket}) do - topic = socket.assigns[:topic] - Pleroma.Web.Streamer.remove_socket(topic, socket) - end -end diff --git a/lib/pleroma/web/mastodon_api/websocket_handler.ex b/lib/pleroma/web/mastodon_api/websocket_handler.ex new file mode 100644 index 000000000..11e0e1696 --- /dev/null +++ b/lib/pleroma/web/mastodon_api/websocket_handler.ex @@ -0,0 +1,120 @@ +defmodule Pleroma.Web.MastodonAPI.WebsocketHandler do + require Logger + + alias Pleroma.Web.OAuth.Token + alias Pleroma.{User, Repo} + + @behaviour :cowboy_websocket_handler + + @streams [ + "public", + "public:local", + "public:media", + "public:local:media", + "user", + "direct", + "list", + "hashtag" + ] + @anonymous_streams ["public", "public:local", "hashtag"] + + # Handled by periodic keepalive in Pleroma.Web.Streamer. + @timeout :infinity + + def init(_type, _req, _opts) do + {:upgrade, :protocol, :cowboy_websocket} + end + + def websocket_init(_type, req, _opts) do + with {qs, req} <- :cowboy_req.qs(req), + params <- :cow_qs.parse_qs(qs), + access_token <- List.keyfind(params, "access_token", 0), + {_, stream} <- List.keyfind(params, "stream", 0), + {:ok, user} <- allow_request(stream, access_token), + topic when is_binary(topic) <- expand_topic(stream, params) do + send(self(), :subscribe) + {:ok, req, %{user: user, topic: topic}, @timeout} + else + {:error, code} -> + Logger.debug("#{__MODULE__} denied connection: #{inspect(code)} - #{inspect(req)}") + {:ok, req} = :cowboy_req.reply(code, req) + {:shutdown, req} + + error -> + Logger.debug("#{__MODULE__} denied connection: #{inspect(error)} - #{inspect(req)}") + {:shutdown, req} + end + end + + # We never receive messages. + def websocket_handle(_frame, req, state) do + {:ok, req, state} + end + + def websocket_info(:subscribe, req, state) do + Logger.debug( + "#{__MODULE__} accepted websocket connection for user #{ + (state.user || %{id: "anonymous"}).id + }, topic #{state.topic}" + ) + + Pleroma.Web.Streamer.add_socket(state.topic, streamer_socket(state)) + {:ok, req, state} + end + + def websocket_info({:text, message}, req, state) do + {:reply, {:text, message}, req, state} + end + + def websocket_terminate(reason, _req, state) do + Logger.debug( + "#{__MODULE__} terminating websocket connection for user #{ + (state.user || %{id: "anonymous"}).id + }, topic #{state.topic || "?"}: #{inspect(reason)}" + ) + + Pleroma.Web.Streamer.remove_socket(state.topic, streamer_socket(state)) + :ok + end + + # Public streams without authentication. + defp allow_request(stream, nil) when stream in @anonymous_streams do + {:ok, nil} + end + + # Authenticated streams. + defp allow_request(stream, {"access_token", access_token}) when stream in @streams do + with %Token{user_id: user_id} <- Repo.get_by(Token, token: access_token), + user = %User{} <- Repo.get(User, user_id) do + {:ok, user} + else + _ -> {:error, 403} + end + end + + # Not authenticated. + defp allow_request(stream, _) when stream in @streams, do: {:error, 403} + + # No matching stream. + defp allow_request(_, _), do: {:error, 404} + + defp expand_topic("hashtag", params) do + case List.keyfind(params, "tag", 0) do + {_, tag} -> "hashtag:#{tag}" + _ -> nil + end + end + + defp expand_topic("list", params) do + case List.keyfind(params, "list", 0) do + {_, list} -> "list:#{list}" + _ -> nil + end + end + + defp expand_topic(topic, _), do: topic + + defp streamer_socket(state) do + %{transport_pid: self(), assigns: state} + end +end diff --git a/lib/pleroma/web/streamer.ex b/lib/pleroma/web/streamer.ex index 29c44e9d5..e1eecba4d 100644 --- a/lib/pleroma/web/streamer.ex +++ b/lib/pleroma/web/streamer.ex @@ -4,17 +4,9 @@ defmodule Pleroma.Web.Streamer do alias Pleroma.{User, Notification, Activity, Object, Repo} alias Pleroma.Web.ActivityPub.ActivityPub - def init(args) do - {:ok, args} - end + @keepalive_interval :timer.seconds(30) def start_link do - spawn(fn -> - # 30 seconds - Process.sleep(1000 * 30) - GenServer.cast(__MODULE__, %{action: :ping}) - end) - GenServer.start_link(__MODULE__, %{}, name: __MODULE__) end @@ -30,6 +22,16 @@ defmodule Pleroma.Web.Streamer do GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item}) end + def init(args) do + spawn(fn -> + # 30 seconds + Process.sleep(@keepalive_interval) + GenServer.cast(__MODULE__, %{action: :ping}) + end) + + {:ok, args} + end + def handle_cast(%{action: :ping}, topics) do Map.values(topics) |> List.flatten() @@ -40,7 +42,7 @@ defmodule Pleroma.Web.Streamer do spawn(fn -> # 30 seconds - Process.sleep(1000 * 30) + Process.sleep(@keepalive_interval) GenServer.cast(__MODULE__, %{action: :ping}) end)