forked from AkkomaGang/akkoma
Remove FedSockets
Current FedSocket implementation has a bunch of problems. It doesn't have proper error handling (in case of an error the server just doesn't respond until the connection is closed, while the client doesn't match any error messages and just assumes there has been an error after 15s) and the code is full of bad descisions (see: fetch registry which uses uuids for no reason and waits for a response by recursively querying a ets table until the value changes, or double JSON encoding). Sometime ago I almost completed rewriting fedsockets from scrach to adress these issues. However, while doing so, I realized that fedsockets are just too overkill for what they were trying to accomplish, which is reduce the overhead of federation by not signing every message. This could be done without reimplementing failure states and endpoint logic we already have with HTTP by, for example, using TLS cert auth, or switching to a more performant signature algorithm. I opened https://git.pleroma.social/pleroma/pleroma/-/issues/2262 for further discussion on alternatives to fedsockets. From discussions I had with other Pleroma developers it seems like they would approve the descision to remove them as well, therefore I am submitting this patch.
This commit is contained in:
parent
9b9afe6b3f
commit
2c55f7d7cb
23 changed files with 32 additions and 1443 deletions
|
@ -129,7 +129,6 @@
|
|||
dispatch: [
|
||||
{:_,
|
||||
[
|
||||
{"/api/fedsocket/v1", Pleroma.Web.FedSockets.IncomingHandler, []},
|
||||
{"/api/v1/streaming", Pleroma.Web.MastodonAPI.WebsocketHandler, []},
|
||||
{"/websocket", Phoenix.Endpoint.CowboyWebSocket,
|
||||
{Phoenix.Transports.WebSocket,
|
||||
|
|
|
@ -272,19 +272,6 @@
|
|||
}
|
||||
]
|
||||
},
|
||||
%{
|
||||
group: :pleroma,
|
||||
key: :fed_sockets,
|
||||
type: :group,
|
||||
description: "Websocket based federation",
|
||||
children: [
|
||||
%{
|
||||
key: :enabled,
|
||||
type: :boolean,
|
||||
description: "Enable FedSockets"
|
||||
}
|
||||
]
|
||||
},
|
||||
%{
|
||||
group: :pleroma,
|
||||
key: Pleroma.Emails.Mailer,
|
||||
|
|
|
@ -220,18 +220,6 @@ config :pleroma, :mrf_user_allowlist, %{
|
|||
* `total_user_limit`: the number of scheduled activities a user is allowed to create in total (Default: `300`)
|
||||
* `enabled`: whether scheduled activities are sent to the job queue to be executed
|
||||
|
||||
## FedSockets
|
||||
FedSockets is an experimental feature allowing for Pleroma backends to federate using a persistant websocket connection as opposed to making each federation a seperate http connection. This feature is currently off by default. It is configurable throught he following options.
|
||||
|
||||
### :fedsockets
|
||||
* `enabled`: Enables FedSockets for this instance. `false` by default.
|
||||
* `connection_duration`: Time an idle websocket is kept open.
|
||||
* `rejection_duration`: Failures to connect via FedSockets will not be retried for this period of time.
|
||||
* `fed_socket_fetches` and `fed_socket_rejections`: Settings passed to `cachex` for the fetch registry, and rejection stacks. See `Pleroma.Web.FedSockets` for more details.
|
||||
|
||||
|
||||
## Frontends
|
||||
|
||||
### :frontend_configurations
|
||||
|
||||
This can be used to configure a keyword list that keeps the configuration data for any kind of frontend. By default, settings for `pleroma_fe` and `masto_fe` are configured. You can find the documentation for `pleroma_fe` configuration into [Pleroma-FE configuration and customization for instance administrators](/frontend/CONFIGURATION/#options).
|
||||
|
|
|
@ -93,9 +93,4 @@ server {
|
|||
chunked_transfer_encoding on;
|
||||
proxy_pass http://phoenix;
|
||||
}
|
||||
|
||||
location /api/fedsocket/v1 {
|
||||
proxy_request_buffering off;
|
||||
proxy_pass http://phoenix/api/fedsocket/v1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -207,8 +207,7 @@ defp dont_run_in_test(_) do
|
|||
name: Pleroma.Web.Streamer.registry(),
|
||||
keys: :duplicate,
|
||||
partitions: System.schedulers_online()
|
||||
]},
|
||||
Pleroma.Web.FedSockets.Supervisor
|
||||
]}
|
||||
]
|
||||
end
|
||||
|
||||
|
|
|
@ -12,7 +12,6 @@ defmodule Pleroma.Object.Fetcher do
|
|||
alias Pleroma.Web.ActivityPub.ObjectValidator
|
||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||
alias Pleroma.Web.Federator
|
||||
alias Pleroma.Web.FedSockets
|
||||
|
||||
require Logger
|
||||
require Pleroma.Constants
|
||||
|
@ -183,16 +182,16 @@ defp maybe_date_fetch(headers, date) do
|
|||
end
|
||||
end
|
||||
|
||||
def fetch_and_contain_remote_object_from_id(prm, opts \\ [])
|
||||
def fetch_and_contain_remote_object_from_id(id)
|
||||
|
||||
def fetch_and_contain_remote_object_from_id(%{"id" => id}, opts),
|
||||
do: fetch_and_contain_remote_object_from_id(id, opts)
|
||||
def fetch_and_contain_remote_object_from_id(%{"id" => id}),
|
||||
do: fetch_and_contain_remote_object_from_id(id)
|
||||
|
||||
def fetch_and_contain_remote_object_from_id(id, opts) when is_binary(id) do
|
||||
def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
|
||||
Logger.debug("Fetching object #{id} via AP")
|
||||
|
||||
with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
|
||||
{:ok, body} <- get_object(id, opts),
|
||||
{:ok, body} <- get_object(id),
|
||||
{:ok, data} <- safe_json_decode(body),
|
||||
:ok <- Containment.contain_origin_from_id(id, data) do
|
||||
{:ok, data}
|
||||
|
@ -208,22 +207,10 @@ def fetch_and_contain_remote_object_from_id(id, opts) when is_binary(id) do
|
|||
end
|
||||
end
|
||||
|
||||
def fetch_and_contain_remote_object_from_id(_id, _opts),
|
||||
def fetch_and_contain_remote_object_from_id(_id),
|
||||
do: {:error, "id must be a string"}
|
||||
|
||||
defp get_object(id, opts) do
|
||||
with false <- Keyword.get(opts, :force_http, false),
|
||||
{:ok, fedsocket} <- FedSockets.get_or_create_fed_socket(id) do
|
||||
Logger.debug("fetching via fedsocket - #{inspect(id)}")
|
||||
FedSockets.fetch(fedsocket, id)
|
||||
else
|
||||
_other ->
|
||||
Logger.debug("fetching via http - #{inspect(id)}")
|
||||
get_object_http(id)
|
||||
end
|
||||
end
|
||||
|
||||
defp get_object_http(id) do
|
||||
defp get_object(id) do
|
||||
date = Pleroma.Signature.signed_date()
|
||||
|
||||
headers =
|
||||
|
|
|
@ -39,7 +39,7 @@ def key_id_to_actor_id(key_id) do
|
|||
def fetch_public_key(conn) do
|
||||
with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
|
||||
{:ok, actor_id} <- key_id_to_actor_id(kid),
|
||||
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do
|
||||
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do
|
||||
{:ok, public_key}
|
||||
else
|
||||
e ->
|
||||
|
@ -50,8 +50,8 @@ def fetch_public_key(conn) do
|
|||
def refetch_public_key(conn) do
|
||||
with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
|
||||
{:ok, actor_id} <- key_id_to_actor_id(kid),
|
||||
{:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id, force_http: true),
|
||||
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do
|
||||
{:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id),
|
||||
{:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do
|
||||
{:ok, public_key}
|
||||
else
|
||||
e ->
|
||||
|
|
|
@ -1772,12 +1772,12 @@ def html_filter_policy(%User{no_rich_text: true}) do
|
|||
|
||||
def html_filter_policy(_), do: Config.get([:markup, :scrub_policy])
|
||||
|
||||
def fetch_by_ap_id(ap_id, opts \\ []), do: ActivityPub.make_user_from_ap_id(ap_id, opts)
|
||||
def fetch_by_ap_id(ap_id), do: ActivityPub.make_user_from_ap_id(ap_id)
|
||||
|
||||
def get_or_fetch_by_ap_id(ap_id, opts \\ []) do
|
||||
def get_or_fetch_by_ap_id(ap_id) do
|
||||
cached_user = get_cached_by_ap_id(ap_id)
|
||||
|
||||
maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id, opts)
|
||||
maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id)
|
||||
|
||||
case {cached_user, maybe_fetched_user} do
|
||||
{_, {:ok, %User{} = user}} ->
|
||||
|
@ -1850,8 +1850,8 @@ def public_key(%{public_key: public_key_pem}) when is_binary(public_key_pem) do
|
|||
|
||||
def public_key(_), do: {:error, "key not found"}
|
||||
|
||||
def get_public_key_for_ap_id(ap_id, opts \\ []) do
|
||||
with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id, opts),
|
||||
def get_public_key_for_ap_id(ap_id) do
|
||||
with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id),
|
||||
{:ok, public_key} <- public_key(user) do
|
||||
{:ok, public_key}
|
||||
else
|
||||
|
|
|
@ -1289,12 +1289,10 @@ defp object_to_user_data(data) do
|
|||
|
||||
def fetch_follow_information_for_user(user) do
|
||||
with {:ok, following_data} <-
|
||||
Fetcher.fetch_and_contain_remote_object_from_id(user.following_address,
|
||||
force_http: true
|
||||
),
|
||||
Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
|
||||
{:ok, hide_follows} <- collection_private(following_data),
|
||||
{:ok, followers_data} <-
|
||||
Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true),
|
||||
Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
|
||||
{:ok, hide_followers} <- collection_private(followers_data) do
|
||||
{:ok,
|
||||
%{
|
||||
|
@ -1368,8 +1366,8 @@ def user_data_from_user_object(data) do
|
|||
end
|
||||
end
|
||||
|
||||
def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
|
||||
with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
|
||||
def fetch_and_prepare_user_from_ap_id(ap_id) do
|
||||
with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
|
||||
{:ok, data} <- user_data_from_user_object(data) do
|
||||
{:ok, maybe_update_follow_information(data)}
|
||||
else
|
||||
|
@ -1412,13 +1410,13 @@ def maybe_handle_clashing_nickname(data) do
|
|||
end
|
||||
end
|
||||
|
||||
def make_user_from_ap_id(ap_id, opts \\ []) do
|
||||
def make_user_from_ap_id(ap_id) do
|
||||
user = User.get_cached_by_ap_id(ap_id)
|
||||
|
||||
if user && !User.ap_enabled?(user) do
|
||||
Transmogrifier.upgrade_user_from_ap_id(ap_id)
|
||||
else
|
||||
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do
|
||||
with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
|
||||
if user do
|
||||
user
|
||||
|> User.remote_user_changeset(data)
|
||||
|
|
|
@ -13,7 +13,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
|
|||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.Relay
|
||||
alias Pleroma.Web.ActivityPub.Transmogrifier
|
||||
alias Pleroma.Web.FedSockets
|
||||
|
||||
require Pleroma.Constants
|
||||
|
||||
|
@ -50,28 +49,6 @@ def is_representable?(%Activity{} = activity) do
|
|||
"""
|
||||
def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do
|
||||
Logger.debug("Federating #{id} to #{inbox}")
|
||||
|
||||
case FedSockets.get_or_create_fed_socket(inbox) do
|
||||
{:ok, fedsocket} ->
|
||||
Logger.debug("publishing via fedsockets - #{inspect(inbox)}")
|
||||
FedSockets.publish(fedsocket, json)
|
||||
|
||||
_ ->
|
||||
Logger.debug("publishing via http - #{inspect(inbox)}")
|
||||
http_publish(inbox, actor, json, params)
|
||||
end
|
||||
end
|
||||
|
||||
def publish_one(%{actor_id: actor_id} = params) do
|
||||
actor = User.get_cached_by_id(actor_id)
|
||||
|
||||
params
|
||||
|> Map.delete(:actor_id)
|
||||
|> Map.put(:actor, actor)
|
||||
|> publish_one()
|
||||
end
|
||||
|
||||
defp http_publish(inbox, actor, json, params) do
|
||||
uri = %{path: path} = URI.parse(inbox)
|
||||
digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
|
||||
|
||||
|
@ -110,6 +87,15 @@ defp http_publish(inbox, actor, json, params) do
|
|||
end
|
||||
end
|
||||
|
||||
def publish_one(%{actor_id: actor_id} = params) do
|
||||
actor = User.get_cached_by_id(actor_id)
|
||||
|
||||
params
|
||||
|> Map.delete(:actor_id)
|
||||
|> Map.put(:actor, actor)
|
||||
|> publish_one()
|
||||
end
|
||||
|
||||
defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
|
||||
if port == URI.default_port(scheme) do
|
||||
host
|
||||
|
|
|
@ -1008,7 +1008,7 @@ def perform(:user_upgrade, user) do
|
|||
|
||||
def upgrade_user_from_ap_id(ap_id) do
|
||||
with %User{local: false} = user <- User.get_cached_by_ap_id(ap_id),
|
||||
{:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id, force_http: true),
|
||||
{:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id),
|
||||
{:ok, user} <- update_user(user, data) do
|
||||
TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
|
||||
{:ok, user}
|
||||
|
|
|
@ -1,185 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets do
|
||||
@moduledoc """
|
||||
This documents the FedSockets framework. A framework for federating
|
||||
ActivityPub objects between servers via persistant WebSocket connections.
|
||||
|
||||
FedSockets allow servers to authenticate on first contact and maintain that
|
||||
connection, eliminating the need to authenticate every time data needs to be shared.
|
||||
|
||||
## Protocol
|
||||
FedSockets currently support 2 types of data transfer:
|
||||
* `publish` method which doesn't require a response
|
||||
* `fetch` method requires a response be sent
|
||||
|
||||
### Publish
|
||||
The publish operation sends a json encoded map of the shape:
|
||||
%{action: :publish, data: json}
|
||||
and accepts (but does not require) a reply of form:
|
||||
%{"action" => "publish_reply"}
|
||||
|
||||
The outgoing params represent
|
||||
* data: ActivityPub object encoded into json
|
||||
|
||||
|
||||
### Fetch
|
||||
The fetch operation sends a json encoded map of the shape:
|
||||
%{action: :fetch, data: id, uuid: fetch_uuid}
|
||||
and requires a reply of form:
|
||||
%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}
|
||||
|
||||
The outgoing params represent
|
||||
* id: an ActivityPub object URI
|
||||
* uuid: a unique uuid generated by the sender
|
||||
|
||||
The reply params represent
|
||||
* data: an ActivityPub object encoded into json
|
||||
* uuid: the uuid sent along with the fetch request
|
||||
|
||||
## Examples
|
||||
Clients of FedSocket transfers shouldn't need to use any of the functions outside of this module.
|
||||
|
||||
A typical publish operation can be performed through the following code, and a fetch operation in a similar manner.
|
||||
|
||||
case FedSockets.get_or_create_fed_socket(inbox) do
|
||||
{:ok, fedsocket} ->
|
||||
FedSockets.publish(fedsocket, json)
|
||||
|
||||
_ ->
|
||||
alternative_publish(inbox, actor, json, params)
|
||||
end
|
||||
|
||||
## Configuration
|
||||
FedSockets have the following config settings
|
||||
|
||||
config :pleroma, :fed_sockets,
|
||||
enabled: true,
|
||||
ping_interval: :timer.seconds(15),
|
||||
connection_duration: :timer.hours(1),
|
||||
rejection_duration: :timer.hours(1),
|
||||
fed_socket_fetches: [
|
||||
default: 12_000,
|
||||
interval: 3_000,
|
||||
lazy: false
|
||||
]
|
||||
* enabled - turn FedSockets on or off with this flag. Can be toggled at runtime.
|
||||
* connection_duration - How long a FedSocket can sit idle before it's culled.
|
||||
* rejection_duration - After failing to make a FedSocket connection a host will be excluded
|
||||
from further connections for this amount of time
|
||||
* fed_socket_fetches - Use these parameters to pass options to the Cachex queue backing the FetchRegistry
|
||||
* fed_socket_rejections - Use these parameters to pass options to the Cachex queue backing the FedRegistry
|
||||
|
||||
Cachex options are
|
||||
* default: the minimum amount of time a fetch can wait before it times out.
|
||||
* interval: the interval between checks for timed out entries. This plus the default represent the maximum time allowed
|
||||
* lazy: leave at false for consistant and fast lookups, set to true for stricter timeout enforcement
|
||||
|
||||
"""
|
||||
require Logger
|
||||
|
||||
alias Pleroma.Web.FedSockets.FedRegistry
|
||||
alias Pleroma.Web.FedSockets.FedSocket
|
||||
alias Pleroma.Web.FedSockets.SocketInfo
|
||||
|
||||
@doc """
|
||||
returns a FedSocket for the given origin. Will reuse an existing one or create a new one.
|
||||
|
||||
address is expected to be a fully formed URL such as:
|
||||
"http://www.example.com" or "http://www.example.com:8080"
|
||||
|
||||
It can and usually does include additional path parameters,
|
||||
but these are ignored as the FedSockets are organized by host and port info alone.
|
||||
"""
|
||||
def get_or_create_fed_socket(address) do
|
||||
with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)},
|
||||
{:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)},
|
||||
{:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do
|
||||
Logger.debug("fedsocket created for - #{inspect(address)}")
|
||||
{:ok, fed_socket}
|
||||
else
|
||||
{:cache, {:ok, socket}} ->
|
||||
Logger.debug("fedsocket found in cache - #{inspect(address)}")
|
||||
{:ok, socket}
|
||||
|
||||
{:cache, {:error, :rejected} = e} ->
|
||||
e
|
||||
|
||||
{:connect, {:error, _host}} ->
|
||||
Logger.debug("set host rejected for - #{inspect(address)}")
|
||||
FedRegistry.set_host_rejected(address)
|
||||
{:error, :rejected}
|
||||
|
||||
{_, {:error, :disabled}} ->
|
||||
{:error, :disabled}
|
||||
|
||||
{_, {:error, reason}} ->
|
||||
Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}")
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist.
|
||||
|
||||
address is expected to be a fully formed URL such as:
|
||||
"http://www.example.com" or "http://www.example.com:8080"
|
||||
"""
|
||||
def get_fed_socket(address) do
|
||||
origin = SocketInfo.origin(address)
|
||||
|
||||
with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)},
|
||||
{:ok, socket} <- FedRegistry.get_fed_socket(origin) do
|
||||
{:ok, socket}
|
||||
else
|
||||
{:config, _} ->
|
||||
{:error, :disabled}
|
||||
|
||||
{:error, :rejected} ->
|
||||
Logger.debug("FedSocket previously rejected - #{inspect(origin)}")
|
||||
{:error, :rejected}
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Sends the supplied data via the publish protocol.
|
||||
It will not block waiting for a reply.
|
||||
Returns :ok but this is not an indication of a successful transfer.
|
||||
|
||||
the data is expected to be JSON encoded binary data.
|
||||
"""
|
||||
def publish(%SocketInfo{} = fed_socket, json) do
|
||||
FedSocket.publish(fed_socket, json)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Sends the supplied data via the fetch protocol.
|
||||
It will block waiting for a reply or timeout.
|
||||
|
||||
Returns {:ok, object} where object is the requested object (or nil)
|
||||
{:error, :timeout} in the event the message was not responded to
|
||||
|
||||
the id is expected to be the URI of an ActivityPub object.
|
||||
"""
|
||||
def fetch(%SocketInfo{} = fed_socket, id) do
|
||||
FedSocket.fetch(fed_socket, id)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Disconnect all and restart FedSockets.
|
||||
This is mainly used in development and testing but could be useful in production.
|
||||
"""
|
||||
def reset do
|
||||
FedRegistry
|
||||
|> Process.whereis()
|
||||
|> Process.exit(:testing)
|
||||
end
|
||||
|
||||
def uri_for_origin(origin),
|
||||
do: "ws://#{origin}/api/fedsocket/v1"
|
||||
end
|
|
@ -1,185 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.FedRegistry do
|
||||
@moduledoc """
|
||||
The FedRegistry stores the active FedSockets for quick retrieval.
|
||||
|
||||
The storage and retrieval portion of the FedRegistry is done in process through
|
||||
elixir's `Registry` module for speed and its ability to monitor for terminated processes.
|
||||
|
||||
Dropped connections will be caught by `Registry` and deleted. Since the next
|
||||
message will initiate a new connection there is no reason to try and reconnect at that point.
|
||||
|
||||
Normally outside modules should have no need to call or use the FedRegistry themselves.
|
||||
"""
|
||||
|
||||
alias Pleroma.Web.FedSockets.FedSocket
|
||||
alias Pleroma.Web.FedSockets.SocketInfo
|
||||
|
||||
require Logger
|
||||
|
||||
@default_rejection_duration 15 * 60 * 1000
|
||||
@rejections :fed_socket_rejections
|
||||
|
||||
@doc """
|
||||
Retrieves a FedSocket from the Registry given it's origin.
|
||||
|
||||
The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"
|
||||
|
||||
Will return:
|
||||
* {:ok, fed_socket} for working FedSockets
|
||||
* {:error, :rejected} for origins that have been tried and refused within the rejection duration interval
|
||||
* {:error, some_reason} usually :missing for unknown origins
|
||||
"""
|
||||
def get_fed_socket(origin) do
|
||||
case get_registry_data(origin) do
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
|
||||
{:ok, %{state: :connected} = socket_info} ->
|
||||
{:ok, socket_info}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Adds a connected FedSocket to the Registry.
|
||||
|
||||
Always returns {:ok, fed_socket}
|
||||
"""
|
||||
def add_fed_socket(origin, pid \\ nil) do
|
||||
origin
|
||||
|> SocketInfo.build(pid)
|
||||
|> SocketInfo.connect()
|
||||
|> add_socket_info
|
||||
end
|
||||
|
||||
defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do
|
||||
case Registry.register(FedSockets.Registry, origin, socket_info) do
|
||||
{:ok, _owner} ->
|
||||
clear_prior_rejection(origin)
|
||||
Logger.debug("fedsocket added: #{inspect(origin)}")
|
||||
|
||||
{:ok, socket_info}
|
||||
|
||||
{:error, {:already_registered, _pid}} ->
|
||||
FedSocket.close(socket_info)
|
||||
existing_socket_info = Registry.lookup(FedSockets.Registry, origin)
|
||||
|
||||
{:ok, existing_socket_info}
|
||||
|
||||
_ ->
|
||||
{:error, :error_adding_socket}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Mark this origin as having rejected a connection attempt.
|
||||
This will keep it from getting additional connection attempts
|
||||
for a period of time specified in the config.
|
||||
|
||||
Always returns {:ok, new_reg_data}
|
||||
"""
|
||||
def set_host_rejected(uri) do
|
||||
new_reg_data =
|
||||
uri
|
||||
|> SocketInfo.origin()
|
||||
|> get_or_create_registry_data()
|
||||
|> set_to_rejected()
|
||||
|> save_registry_data()
|
||||
|
||||
{:ok, new_reg_data}
|
||||
end
|
||||
|
||||
@doc """
|
||||
Retrieves the FedRegistryData from the Registry given it's origin.
|
||||
|
||||
The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"
|
||||
|
||||
Will return:
|
||||
* {:ok, fed_registry_data} for known origins
|
||||
* {:error, :missing} for uniknown origins
|
||||
* {:error, :cache_error} indicating some low level runtime issues
|
||||
"""
|
||||
def get_registry_data(origin) do
|
||||
case Registry.lookup(FedSockets.Registry, origin) do
|
||||
[] ->
|
||||
if is_rejected?(origin) do
|
||||
Logger.debug("previously rejected fedsocket requested")
|
||||
{:error, :rejected}
|
||||
else
|
||||
{:error, :missing}
|
||||
end
|
||||
|
||||
[{_pid, %{state: :connected} = socket_info}] ->
|
||||
{:ok, socket_info}
|
||||
|
||||
_ ->
|
||||
{:error, :cache_error}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo
|
||||
"""
|
||||
def list_all do
|
||||
(list_all_connected() ++ list_all_rejected())
|
||||
|> Enum.into(%{})
|
||||
end
|
||||
|
||||
defp list_all_connected do
|
||||
FedSockets.Registry
|
||||
|> Registry.select([{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}])
|
||||
end
|
||||
|
||||
defp list_all_rejected do
|
||||
{:ok, keys} = Cachex.keys(@rejections)
|
||||
|
||||
{:ok, registry_data} =
|
||||
Cachex.execute(@rejections, fn worker ->
|
||||
Enum.map(keys, fn k -> {k, Cachex.get!(worker, k)} end)
|
||||
end)
|
||||
|
||||
registry_data
|
||||
end
|
||||
|
||||
defp clear_prior_rejection(origin),
|
||||
do: Cachex.del(@rejections, origin)
|
||||
|
||||
defp is_rejected?(origin) do
|
||||
case Cachex.get(@rejections, origin) do
|
||||
{:ok, nil} ->
|
||||
false
|
||||
|
||||
{:ok, _} ->
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
defp get_or_create_registry_data(origin) do
|
||||
case get_registry_data(origin) do
|
||||
{:error, :missing} ->
|
||||
%SocketInfo{origin: origin}
|
||||
|
||||
{:ok, socket_info} ->
|
||||
socket_info
|
||||
end
|
||||
end
|
||||
|
||||
defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do
|
||||
{:ok, true} = Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end)
|
||||
socket_info
|
||||
end
|
||||
|
||||
defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do
|
||||
rejection_expiration =
|
||||
Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration)
|
||||
|
||||
{:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration)
|
||||
socket_info
|
||||
end
|
||||
|
||||
defp set_to_rejected(%SocketInfo{} = socket_info),
|
||||
do: %SocketInfo{socket_info | state: :rejected}
|
||||
end
|
|
@ -1,137 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.FedSocket do
|
||||
@moduledoc """
|
||||
The FedSocket module abstracts the actions to be taken taken on connections regardless of
|
||||
whether the connection started as inbound or outbound.
|
||||
|
||||
|
||||
Normally outside modules will have no need to call the FedSocket module directly.
|
||||
"""
|
||||
|
||||
alias Pleroma.Object
|
||||
alias Pleroma.Object.Containment
|
||||
alias Pleroma.User
|
||||
alias Pleroma.Web.ActivityPub.ObjectView
|
||||
alias Pleroma.Web.ActivityPub.UserView
|
||||
alias Pleroma.Web.ActivityPub.Visibility
|
||||
alias Pleroma.Web.FedSockets.FetchRegistry
|
||||
alias Pleroma.Web.FedSockets.IngesterWorker
|
||||
alias Pleroma.Web.FedSockets.OutgoingHandler
|
||||
alias Pleroma.Web.FedSockets.SocketInfo
|
||||
|
||||
require Logger
|
||||
|
||||
@shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103"
|
||||
|
||||
def connect_to_host(uri) do
|
||||
case OutgoingHandler.start_link(uri) do
|
||||
{:ok, pid} ->
|
||||
{:ok, pid}
|
||||
|
||||
error ->
|
||||
{:error, error}
|
||||
end
|
||||
end
|
||||
|
||||
def close(%SocketInfo{pid: socket_pid}),
|
||||
do: Process.send(socket_pid, :close, [])
|
||||
|
||||
def publish(%SocketInfo{pid: socket_pid}, json) do
|
||||
%{action: :publish, data: json}
|
||||
|> Jason.encode!()
|
||||
|> send_packet(socket_pid)
|
||||
end
|
||||
|
||||
def fetch(%SocketInfo{pid: socket_pid}, id) do
|
||||
fetch_uuid = FetchRegistry.register_fetch(id)
|
||||
|
||||
%{action: :fetch, data: id, uuid: fetch_uuid}
|
||||
|> Jason.encode!()
|
||||
|> send_packet(socket_pid)
|
||||
|
||||
wait_for_fetch_to_return(fetch_uuid, 0)
|
||||
end
|
||||
|
||||
def receive_package(%SocketInfo{} = fed_socket, json) do
|
||||
json
|
||||
|> Jason.decode!()
|
||||
|> process_package(fed_socket)
|
||||
end
|
||||
|
||||
defp wait_for_fetch_to_return(uuid, cntr) do
|
||||
case FetchRegistry.check_fetch(uuid) do
|
||||
{:error, :waiting} ->
|
||||
Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc())
|
||||
wait_for_fetch_to_return(uuid, cntr + 1)
|
||||
|
||||
{:error, :missing} ->
|
||||
Logger.error("FedSocket fetch timed out - #{inspect(uuid)}")
|
||||
{:error, :timeout}
|
||||
|
||||
{:ok, _fr} ->
|
||||
FetchRegistry.pop_fetch(uuid)
|
||||
end
|
||||
end
|
||||
|
||||
defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
|
||||
if Containment.contain_origin(origin, data) do
|
||||
IngesterWorker.enqueue("ingest", %{"object" => data})
|
||||
end
|
||||
|
||||
{:reply, %{"action" => "publish_reply", "status" => "processed"}}
|
||||
end
|
||||
|
||||
defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
|
||||
FetchRegistry.register_fetch_received(uuid, data)
|
||||
{:noreply, nil}
|
||||
end
|
||||
|
||||
defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
|
||||
{:ok, data} = render_fetched_data(ap_id, uuid)
|
||||
{:reply, data}
|
||||
end
|
||||
|
||||
defp process_package(%{"action" => "publish_reply"}, _fed_socket) do
|
||||
{:noreply, nil}
|
||||
end
|
||||
|
||||
defp process_package(other, _fed_socket) do
|
||||
Logger.warn("unknown json packages received #{inspect(other)}")
|
||||
{:noreply, nil}
|
||||
end
|
||||
|
||||
defp render_fetched_data(ap_id, uuid) do
|
||||
{:ok,
|
||||
%{
|
||||
"action" => "fetch_reply",
|
||||
"status" => "processed",
|
||||
"uuid" => uuid,
|
||||
"data" => represent_item(ap_id)
|
||||
}}
|
||||
end
|
||||
|
||||
defp represent_item(ap_id) do
|
||||
case User.get_by_ap_id(ap_id) do
|
||||
nil ->
|
||||
object = Object.get_cached_by_ap_id(ap_id)
|
||||
|
||||
if Visibility.is_public?(object) do
|
||||
Phoenix.View.render_to_string(ObjectView, "object.json", object: object)
|
||||
else
|
||||
nil
|
||||
end
|
||||
|
||||
user ->
|
||||
Phoenix.View.render_to_string(UserView, "user.json", user: user)
|
||||
end
|
||||
end
|
||||
|
||||
defp send_packet(data, socket_pid) do
|
||||
Process.send(socket_pid, {:send, data}, [])
|
||||
end
|
||||
|
||||
def shake, do: @shake
|
||||
end
|
|
@ -1,151 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.FetchRegistry do
|
||||
@moduledoc """
|
||||
The FetchRegistry acts as a broker for fetch requests and return values.
|
||||
This allows calling processes to block while waiting for a reply.
|
||||
It doesn't impose it's own process instead using `Cachex` to handle fetches in process, allowing
|
||||
multi threaded processes to avoid bottlenecking.
|
||||
|
||||
Normally outside modules will have no need to call or use the FetchRegistry themselves.
|
||||
|
||||
The `Cachex` parameters can be controlled from the config. Since exact timeout intervals
|
||||
aren't necessary the following settings are used by default:
|
||||
|
||||
config :pleroma, :fed_sockets,
|
||||
fed_socket_fetches: [
|
||||
default: 12_000,
|
||||
interval: 3_000,
|
||||
lazy: false
|
||||
]
|
||||
|
||||
"""
|
||||
|
||||
defmodule FetchRegistryData do
|
||||
defstruct uuid: nil,
|
||||
sent_json: nil,
|
||||
received_json: nil,
|
||||
sent_at: nil,
|
||||
received_at: nil
|
||||
end
|
||||
|
||||
alias Ecto.UUID
|
||||
|
||||
require Logger
|
||||
|
||||
@fetches :fed_socket_fetches
|
||||
|
||||
@doc """
|
||||
Registers a json request wth the FetchRegistry and returns the identifying UUID.
|
||||
"""
|
||||
def register_fetch(json) do
|
||||
%FetchRegistryData{uuid: uuid} =
|
||||
json
|
||||
|> new_registry_data
|
||||
|> save_registry_data
|
||||
|
||||
uuid
|
||||
end
|
||||
|
||||
@doc """
|
||||
Reports on the status of a Fetch given the identifying UUID.
|
||||
|
||||
Will return
|
||||
* {:ok, fetched_object} if a fetch has completed
|
||||
* {:error, :waiting} if a fetch is still pending
|
||||
* {:error, other_error} usually :missing to indicate a fetch that has timed out
|
||||
"""
|
||||
def check_fetch(uuid) do
|
||||
case get_registry_data(uuid) do
|
||||
{:ok, %FetchRegistryData{received_at: nil}} ->
|
||||
{:error, :waiting}
|
||||
|
||||
{:ok, %FetchRegistryData{} = reg_data} ->
|
||||
{:ok, reg_data}
|
||||
|
||||
e ->
|
||||
e
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Retrieves the response to a fetch given the identifying UUID.
|
||||
The completed fetch will be deleted from the FetchRegistry
|
||||
|
||||
Will return
|
||||
* {:ok, fetched_object} if a fetch has completed
|
||||
* {:error, :waiting} if a fetch is still pending
|
||||
* {:error, other_error} usually :missing to indicate a fetch that has timed out
|
||||
"""
|
||||
def pop_fetch(uuid) do
|
||||
case check_fetch(uuid) do
|
||||
{:ok, %FetchRegistryData{received_json: received_json}} ->
|
||||
delete_registry_data(uuid)
|
||||
{:ok, received_json}
|
||||
|
||||
e ->
|
||||
e
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
This is called to register a fetch has returned.
|
||||
It expects the result data along with the UUID that was sent in the request
|
||||
|
||||
Will return the fetched object or :error
|
||||
"""
|
||||
def register_fetch_received(uuid, data) do
|
||||
case get_registry_data(uuid) do
|
||||
{:ok, %FetchRegistryData{received_at: nil} = reg_data} ->
|
||||
reg_data
|
||||
|> set_fetch_received(data)
|
||||
|> save_registry_data()
|
||||
|
||||
{:ok, %FetchRegistryData{} = reg_data} ->
|
||||
Logger.warn("tried to add fetched data twice - #{uuid}")
|
||||
reg_data
|
||||
|
||||
{:error, _} ->
|
||||
Logger.warn("Error adding fetch to registry - #{uuid}")
|
||||
:error
|
||||
end
|
||||
end
|
||||
|
||||
defp new_registry_data(json) do
|
||||
%FetchRegistryData{
|
||||
uuid: UUID.generate(),
|
||||
sent_json: json,
|
||||
sent_at: :erlang.monotonic_time(:millisecond)
|
||||
}
|
||||
end
|
||||
|
||||
defp get_registry_data(origin) do
|
||||
case Cachex.get(@fetches, origin) do
|
||||
{:ok, nil} ->
|
||||
{:error, :missing}
|
||||
|
||||
{:ok, reg_data} ->
|
||||
{:ok, reg_data}
|
||||
|
||||
_ ->
|
||||
{:error, :cache_error}
|
||||
end
|
||||
end
|
||||
|
||||
defp set_fetch_received(%FetchRegistryData{} = reg_data, data),
|
||||
do: %FetchRegistryData{
|
||||
reg_data
|
||||
| received_at: :erlang.monotonic_time(:millisecond),
|
||||
received_json: data
|
||||
}
|
||||
|
||||
defp save_registry_data(%FetchRegistryData{uuid: uuid} = reg_data) do
|
||||
{:ok, true} = Cachex.put(@fetches, uuid, reg_data)
|
||||
reg_data
|
||||
end
|
||||
|
||||
defp delete_registry_data(origin),
|
||||
do: {:ok, true} = Cachex.del(@fetches, origin)
|
||||
end
|
|
@ -1,88 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.IncomingHandler do
|
||||
require Logger
|
||||
|
||||
alias Pleroma.Web.FedSockets.FedRegistry
|
||||
alias Pleroma.Web.FedSockets.FedSocket
|
||||
alias Pleroma.Web.FedSockets.SocketInfo
|
||||
|
||||
import HTTPSignatures, only: [validate_conn: 1, split_signature: 1]
|
||||
|
||||
@behaviour :cowboy_websocket
|
||||
|
||||
def init(req, state) do
|
||||
shake = FedSocket.shake()
|
||||
|
||||
with true <- Pleroma.Config.get([:fed_sockets, :enabled]),
|
||||
sec_protocol <- :cowboy_req.header("sec-websocket-protocol", req, nil),
|
||||
headers = %{"(request-target)" => ^shake} <- :cowboy_req.headers(req),
|
||||
true <- validate_conn(%{req_headers: headers}),
|
||||
%{"keyId" => origin} <- split_signature(headers["signature"]) do
|
||||
req =
|
||||
if is_nil(sec_protocol) do
|
||||
req
|
||||
else
|
||||
:cowboy_req.set_resp_header("sec-websocket-protocol", sec_protocol, req)
|
||||
end
|
||||
|
||||
{:cowboy_websocket, req, %{origin: origin}, %{}}
|
||||
else
|
||||
_ ->
|
||||
{:ok, req, state}
|
||||
end
|
||||
end
|
||||
|
||||
def websocket_init(%{origin: origin}) do
|
||||
case FedRegistry.add_fed_socket(origin) do
|
||||
{:ok, socket_info} ->
|
||||
{:ok, socket_info}
|
||||
|
||||
e ->
|
||||
Logger.error("FedSocket websocket_init failed - #{inspect(e)}")
|
||||
{:error, inspect(e)}
|
||||
end
|
||||
end
|
||||
|
||||
# Use the ping to check if the connection should be expired
|
||||
def websocket_handle(:ping, socket_info) do
|
||||
if SocketInfo.expired?(socket_info) do
|
||||
{:stop, socket_info}
|
||||
else
|
||||
{:ok, socket_info, :hibernate}
|
||||
end
|
||||
end
|
||||
|
||||
def websocket_handle({:text, data}, socket_info) do
|
||||
socket_info = SocketInfo.touch(socket_info)
|
||||
|
||||
case FedSocket.receive_package(socket_info, data) do
|
||||
{:noreply, _} ->
|
||||
{:ok, socket_info}
|
||||
|
||||
{:reply, reply} ->
|
||||
{:reply, {:text, Jason.encode!(reply)}, socket_info}
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.error("incoming error - receive_package: #{inspect(reason)}")
|
||||
{:ok, socket_info}
|
||||
end
|
||||
end
|
||||
|
||||
def websocket_info({:send, message}, socket_info) do
|
||||
socket_info = SocketInfo.touch(socket_info)
|
||||
|
||||
{:reply, {:text, message}, socket_info}
|
||||
end
|
||||
|
||||
def websocket_info(:close, state) do
|
||||
{:stop, state}
|
||||
end
|
||||
|
||||
def websocket_info(message, state) do
|
||||
Logger.debug("#{__MODULE__} unknown message #{inspect(message)}")
|
||||
{:ok, state}
|
||||
end
|
||||
end
|
|
@ -1,33 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.IngesterWorker do
|
||||
use Pleroma.Workers.WorkerHelper, queue: "ingestion_queue"
|
||||
require Logger
|
||||
|
||||
alias Pleroma.Web.Federator
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%Job{args: %{"op" => "ingest", "object" => ingestee}}) do
|
||||
try do
|
||||
ingestee
|
||||
|> Jason.decode!()
|
||||
|> do_ingestion()
|
||||
rescue
|
||||
e ->
|
||||
Logger.error("IngesterWorker error - #{inspect(e)}")
|
||||
e
|
||||
end
|
||||
end
|
||||
|
||||
defp do_ingestion(params) do
|
||||
case Federator.incoming_ap_doc(params) do
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
|
||||
{:ok, object} ->
|
||||
{:ok, object}
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,151 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.OutgoingHandler do
|
||||
use GenServer
|
||||
|
||||
require Logger
|
||||
|
||||
alias Pleroma.Application
|
||||
alias Pleroma.Web.ActivityPub.InternalFetchActor
|
||||
alias Pleroma.Web.FedSockets
|
||||
alias Pleroma.Web.FedSockets.FedRegistry
|
||||
alias Pleroma.Web.FedSockets.FedSocket
|
||||
alias Pleroma.Web.FedSockets.SocketInfo
|
||||
|
||||
def start_link(uri) do
|
||||
GenServer.start_link(__MODULE__, %{uri: uri})
|
||||
end
|
||||
|
||||
def init(%{uri: uri}) do
|
||||
case initiate_connection(uri) do
|
||||
{:ok, ws_origin, conn_pid} ->
|
||||
FedRegistry.add_fed_socket(ws_origin, conn_pid)
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.debug("Outgoing connection failed - #{inspect(reason)}")
|
||||
:ignore
|
||||
end
|
||||
end
|
||||
|
||||
def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do
|
||||
socket_info = SocketInfo.touch(socket_info)
|
||||
|
||||
case FedSocket.receive_package(socket_info, data) do
|
||||
{:noreply, _} ->
|
||||
{:noreply, socket_info}
|
||||
|
||||
{:reply, reply} ->
|
||||
:gun.ws_send(conn_pid, {:text, Jason.encode!(reply)})
|
||||
{:noreply, socket_info}
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.error("incoming error - receive_package: #{inspect(reason)}")
|
||||
{:noreply, socket_info}
|
||||
end
|
||||
end
|
||||
|
||||
def handle_info(:close, state) do
|
||||
Logger.debug("Sending close frame !!!!!!!")
|
||||
{:close, state}
|
||||
end
|
||||
|
||||
def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do
|
||||
{:stop, :normal, state}
|
||||
end
|
||||
|
||||
def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do
|
||||
socket_info = SocketInfo.touch(socket_info)
|
||||
:gun.ws_send(conn_pid, {:text, data})
|
||||
{:noreply, socket_info}
|
||||
end
|
||||
|
||||
def handle_info({:gun_ws, _, _, :pong}, state) do
|
||||
{:noreply, state, :hibernate}
|
||||
end
|
||||
|
||||
def handle_info(msg, state) do
|
||||
Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}")
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def terminate(reason, state) do
|
||||
Logger.debug(
|
||||
"#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}"
|
||||
)
|
||||
|
||||
{:ok, state}
|
||||
end
|
||||
|
||||
def initiate_connection(uri) do
|
||||
ws_uri =
|
||||
uri
|
||||
|> SocketInfo.origin()
|
||||
|> FedSockets.uri_for_origin()
|
||||
|
||||
%{host: host, port: port, path: path} = URI.parse(ws_uri)
|
||||
|
||||
with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}),
|
||||
{:ok, _} <- :gun.await_up(conn_pid),
|
||||
reference <-
|
||||
:gun.get(conn_pid, to_charlist(path), [
|
||||
{'user-agent', to_charlist(Application.user_agent())}
|
||||
]),
|
||||
{:response, :fin, 204, _} <- :gun.await(conn_pid, reference),
|
||||
headers <- build_headers(uri),
|
||||
ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do
|
||||
receive do
|
||||
{:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} ->
|
||||
{:ok, ws_uri, conn_pid}
|
||||
after
|
||||
15_000 ->
|
||||
Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}")
|
||||
{:error, :timeout}
|
||||
end
|
||||
else
|
||||
{:response, :nofin, 404, _} ->
|
||||
{:error, :fedsockets_not_supported}
|
||||
|
||||
e ->
|
||||
Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
|
||||
{:error, e}
|
||||
end
|
||||
end
|
||||
|
||||
defp build_headers(uri) do
|
||||
host_for_sig = uri |> URI.parse() |> host_signature()
|
||||
|
||||
shake = FedSocket.shake()
|
||||
digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64())
|
||||
date = Pleroma.Signature.signed_date()
|
||||
shake_size = byte_size(shake)
|
||||
|
||||
signature_opts = %{
|
||||
"(request-target)": shake,
|
||||
"content-length": to_charlist("#{shake_size}"),
|
||||
date: date,
|
||||
digest: digest,
|
||||
host: host_for_sig
|
||||
}
|
||||
|
||||
signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts)
|
||||
|
||||
[
|
||||
{'signature', to_charlist(signature)},
|
||||
{'date', date},
|
||||
{'digest', to_charlist(digest)},
|
||||
{'content-length', to_charlist("#{shake_size}")},
|
||||
{to_charlist("(request-target)"), to_charlist(shake)},
|
||||
{'user-agent', to_charlist(Application.user_agent())}
|
||||
]
|
||||
end
|
||||
|
||||
defp host_signature(%{host: host, scheme: scheme, port: port}) do
|
||||
if port == URI.default_port(scheme) do
|
||||
host
|
||||
else
|
||||
"#{host}:#{port}"
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,52 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.SocketInfo do
|
||||
defstruct origin: nil,
|
||||
pid: nil,
|
||||
conn_pid: nil,
|
||||
state: :default,
|
||||
connected_until: nil
|
||||
|
||||
alias Pleroma.Web.FedSockets.SocketInfo
|
||||
@default_connection_duration 15 * 60 * 1000
|
||||
|
||||
def build(uri, conn_pid \\ nil) do
|
||||
uri
|
||||
|> build_origin()
|
||||
|> build_pids(conn_pid)
|
||||
|> touch()
|
||||
end
|
||||
|
||||
def touch(%SocketInfo{} = socket_info),
|
||||
do: %{socket_info | connected_until: new_ttl()}
|
||||
|
||||
def connect(%SocketInfo{} = socket_info),
|
||||
do: %{socket_info | state: :connected}
|
||||
|
||||
def expired?(%{connected_until: connected_until}),
|
||||
do: connected_until < :erlang.monotonic_time(:millisecond)
|
||||
|
||||
def origin(uri),
|
||||
do: build_origin(uri).origin
|
||||
|
||||
defp build_pids(socket_info, conn_pid),
|
||||
do: struct(socket_info, pid: self(), conn_pid: conn_pid)
|
||||
|
||||
defp build_origin(uri) when is_binary(uri),
|
||||
do: uri |> URI.parse() |> build_origin
|
||||
|
||||
defp build_origin(%{host: host, port: nil, scheme: scheme}),
|
||||
do: build_origin(%{host: host, port: URI.default_port(scheme)})
|
||||
|
||||
defp build_origin(%{host: host, port: port}),
|
||||
do: %SocketInfo{origin: "#{host}:#{port}"}
|
||||
|
||||
defp new_ttl do
|
||||
connection_duration =
|
||||
Pleroma.Config.get([:fed_sockets, :connection_duration], @default_connection_duration)
|
||||
|
||||
:erlang.monotonic_time(:millisecond) + connection_duration
|
||||
end
|
||||
end
|
|
@ -1,59 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.Supervisor do
|
||||
use Supervisor
|
||||
import Cachex.Spec
|
||||
|
||||
def start_link(opts) do
|
||||
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
|
||||
end
|
||||
|
||||
def init(args) do
|
||||
children = [
|
||||
build_cache(:fed_socket_fetches, args),
|
||||
build_cache(:fed_socket_rejections, args),
|
||||
{Registry, keys: :unique, name: FedSockets.Registry, meta: [rejected: %{}]}
|
||||
]
|
||||
|
||||
opts = [strategy: :one_for_all, name: Pleroma.Web.Streamer.Supervisor]
|
||||
Supervisor.init(children, opts)
|
||||
end
|
||||
|
||||
defp build_cache(name, args) do
|
||||
opts = get_opts(name, args)
|
||||
|
||||
%{
|
||||
id: String.to_atom("#{name}_cache"),
|
||||
start: {Cachex, :start_link, [name, opts]},
|
||||
type: :worker
|
||||
}
|
||||
end
|
||||
|
||||
defp get_opts(cache_name, args)
|
||||
when cache_name in [:fed_socket_fetches, :fed_socket_rejections] do
|
||||
default = get_opts_or_config(args, cache_name, :default, 15_000)
|
||||
interval = get_opts_or_config(args, cache_name, :interval, 3_000)
|
||||
lazy = get_opts_or_config(args, cache_name, :lazy, false)
|
||||
|
||||
[expiration: expiration(default: default, interval: interval, lazy: lazy)]
|
||||
end
|
||||
|
||||
defp get_opts(name, args) do
|
||||
Keyword.get(args, name, [])
|
||||
end
|
||||
|
||||
defp get_opts_or_config(args, name, key, default) do
|
||||
args
|
||||
|> Keyword.get(name, [])
|
||||
|> Keyword.get(key)
|
||||
|> case do
|
||||
nil ->
|
||||
Pleroma.Config.get([:fed_sockets, name, key], default)
|
||||
|
||||
value ->
|
||||
value
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,124 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.FedRegistryTest do
|
||||
use ExUnit.Case
|
||||
|
||||
alias Pleroma.Web.FedSockets
|
||||
alias Pleroma.Web.FedSockets.FedRegistry
|
||||
alias Pleroma.Web.FedSockets.SocketInfo
|
||||
|
||||
@good_domain "http://good.domain"
|
||||
@good_domain_origin "good.domain:80"
|
||||
|
||||
setup do
|
||||
start_supervised({Pleroma.Web.FedSockets.Supervisor, []})
|
||||
build_test_socket(@good_domain)
|
||||
Process.sleep(10)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
describe "add_fed_socket/1 without conflicting sockets" do
|
||||
test "can be added" do
|
||||
Process.sleep(10)
|
||||
assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin)
|
||||
assert origin == "good.domain:80"
|
||||
end
|
||||
|
||||
test "multiple origins can be added" do
|
||||
build_test_socket("http://anothergood.domain")
|
||||
Process.sleep(10)
|
||||
|
||||
assert {:ok, %SocketInfo{origin: origin_1}} =
|
||||
FedRegistry.get_fed_socket(@good_domain_origin)
|
||||
|
||||
assert {:ok, %SocketInfo{origin: origin_2}} =
|
||||
FedRegistry.get_fed_socket("anothergood.domain:80")
|
||||
|
||||
assert origin_1 == "good.domain:80"
|
||||
assert origin_2 == "anothergood.domain:80"
|
||||
assert FedRegistry.list_all() |> Enum.count() == 2
|
||||
end
|
||||
end
|
||||
|
||||
describe "add_fed_socket/1 when duplicate sockets conflict" do
|
||||
setup do
|
||||
build_test_socket(@good_domain)
|
||||
build_test_socket(@good_domain)
|
||||
Process.sleep(10)
|
||||
:ok
|
||||
end
|
||||
|
||||
test "will be ignored" do
|
||||
assert {:ok, %SocketInfo{origin: origin, pid: _pid_one}} =
|
||||
FedRegistry.get_fed_socket(@good_domain_origin)
|
||||
|
||||
assert origin == "good.domain:80"
|
||||
|
||||
assert FedRegistry.list_all() |> Enum.count() == 1
|
||||
end
|
||||
|
||||
test "the newer process will be closed" do
|
||||
pid_two = build_test_socket(@good_domain)
|
||||
|
||||
assert {:ok, %SocketInfo{origin: origin, pid: _pid_one}} =
|
||||
FedRegistry.get_fed_socket(@good_domain_origin)
|
||||
|
||||
assert origin == "good.domain:80"
|
||||
Process.sleep(10)
|
||||
|
||||
refute Process.alive?(pid_two)
|
||||
|
||||
assert FedRegistry.list_all() |> Enum.count() == 1
|
||||
end
|
||||
end
|
||||
|
||||
describe "get_fed_socket/1" do
|
||||
test "returns missing for unknown hosts" do
|
||||
assert {:error, :missing} = FedRegistry.get_fed_socket("not_a_dmoain")
|
||||
end
|
||||
|
||||
test "returns rejected for hosts previously rejected" do
|
||||
"rejected.domain:80"
|
||||
|> FedSockets.uri_for_origin()
|
||||
|> FedRegistry.set_host_rejected()
|
||||
|
||||
assert {:error, :rejected} = FedRegistry.get_fed_socket("rejected.domain:80")
|
||||
end
|
||||
|
||||
test "can retrieve a previously added SocketInfo" do
|
||||
build_test_socket(@good_domain)
|
||||
Process.sleep(10)
|
||||
assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin)
|
||||
assert origin == "good.domain:80"
|
||||
end
|
||||
|
||||
test "removes references to SocketInfos when the process crashes" do
|
||||
assert {:ok, %SocketInfo{origin: origin, pid: pid}} =
|
||||
FedRegistry.get_fed_socket(@good_domain_origin)
|
||||
|
||||
assert origin == "good.domain:80"
|
||||
|
||||
Process.exit(pid, :testing)
|
||||
Process.sleep(100)
|
||||
assert {:error, :missing} = FedRegistry.get_fed_socket(@good_domain_origin)
|
||||
end
|
||||
end
|
||||
|
||||
def build_test_socket(uri) do
|
||||
Kernel.spawn(fn -> fed_socket_almost(uri) end)
|
||||
end
|
||||
|
||||
def fed_socket_almost(origin) do
|
||||
FedRegistry.add_fed_socket(origin)
|
||||
|
||||
receive do
|
||||
:close ->
|
||||
:ok
|
||||
after
|
||||
5_000 -> :timeout
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,67 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.FetchRegistryTest do
|
||||
use ExUnit.Case
|
||||
|
||||
alias Pleroma.Web.FedSockets.FetchRegistry
|
||||
alias Pleroma.Web.FedSockets.FetchRegistry.FetchRegistryData
|
||||
|
||||
@json_message "hello"
|
||||
@json_reply "hello back"
|
||||
|
||||
setup do
|
||||
start_supervised(
|
||||
{Pleroma.Web.FedSockets.Supervisor,
|
||||
[
|
||||
ping_interval: 8,
|
||||
connection_duration: 15,
|
||||
rejection_duration: 5,
|
||||
fed_socket_fetches: [default: 10, interval: 10]
|
||||
]}
|
||||
)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
test "fetches can be stored" do
|
||||
uuid = FetchRegistry.register_fetch(@json_message)
|
||||
|
||||
assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
|
||||
end
|
||||
|
||||
test "fetches can return" do
|
||||
uuid = FetchRegistry.register_fetch(@json_message)
|
||||
task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end)
|
||||
|
||||
assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
|
||||
Task.await(task)
|
||||
|
||||
assert {:ok, %FetchRegistryData{received_json: received_json}} =
|
||||
FetchRegistry.check_fetch(uuid)
|
||||
|
||||
assert received_json == @json_reply
|
||||
end
|
||||
|
||||
test "fetches are deleted once popped from stack" do
|
||||
uuid = FetchRegistry.register_fetch(@json_message)
|
||||
task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end)
|
||||
Task.await(task)
|
||||
|
||||
assert {:ok, %FetchRegistryData{received_json: received_json}} =
|
||||
FetchRegistry.check_fetch(uuid)
|
||||
|
||||
assert received_json == @json_reply
|
||||
assert {:ok, @json_reply} = FetchRegistry.pop_fetch(uuid)
|
||||
|
||||
assert {:error, :missing} = FetchRegistry.check_fetch(uuid)
|
||||
end
|
||||
|
||||
test "fetches can time out" do
|
||||
uuid = FetchRegistry.register_fetch(@json_message)
|
||||
assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
|
||||
Process.sleep(500)
|
||||
assert {:error, :missing} = FetchRegistry.check_fetch(uuid)
|
||||
end
|
||||
end
|
|
@ -1,118 +0,0 @@
|
|||
# Pleroma: A lightweight social networking server
|
||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
|
||||
# SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
defmodule Pleroma.Web.FedSockets.SocketInfoTest do
|
||||
use ExUnit.Case
|
||||
|
||||
alias Pleroma.Web.FedSockets
|
||||
alias Pleroma.Web.FedSockets.SocketInfo
|
||||
|
||||
describe "uri_for_origin" do
|
||||
test "provides the fed_socket URL given the origin information" do
|
||||
endpoint = "example.com:4000"
|
||||
assert FedSockets.uri_for_origin(endpoint) =~ "ws://"
|
||||
assert FedSockets.uri_for_origin(endpoint) =~ endpoint
|
||||
end
|
||||
end
|
||||
|
||||
describe "origin" do
|
||||
test "will provide the origin field given a url" do
|
||||
endpoint = "example.com:4000"
|
||||
assert SocketInfo.origin("ws://#{endpoint}") == endpoint
|
||||
assert SocketInfo.origin("http://#{endpoint}") == endpoint
|
||||
assert SocketInfo.origin("https://#{endpoint}") == endpoint
|
||||
end
|
||||
|
||||
test "will proide the origin field given a uri" do
|
||||
endpoint = "example.com:4000"
|
||||
uri = URI.parse("http://#{endpoint}")
|
||||
|
||||
assert SocketInfo.origin(uri) == endpoint
|
||||
end
|
||||
end
|
||||
|
||||
describe "touch" do
|
||||
test "will update the TTL" do
|
||||
endpoint = "example.com:4000"
|
||||
socket = SocketInfo.build("ws://#{endpoint}")
|
||||
Process.sleep(2)
|
||||
touched_socket = SocketInfo.touch(socket)
|
||||
|
||||
assert socket.connected_until < touched_socket.connected_until
|
||||
end
|
||||
end
|
||||
|
||||
describe "expired?" do
|
||||
setup do
|
||||
start_supervised(
|
||||
{Pleroma.Web.FedSockets.Supervisor,
|
||||
[
|
||||
ping_interval: 8,
|
||||
connection_duration: 5,
|
||||
rejection_duration: 5,
|
||||
fed_socket_rejections: [lazy: true]
|
||||
]}
|
||||
)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
test "tests if the TTL is exceeded" do
|
||||
endpoint = "example.com:4000"
|
||||
socket = SocketInfo.build("ws://#{endpoint}")
|
||||
refute SocketInfo.expired?(socket)
|
||||
Process.sleep(10)
|
||||
|
||||
assert SocketInfo.expired?(socket)
|
||||
end
|
||||
end
|
||||
|
||||
describe "creating outgoing connection records" do
|
||||
test "can be passed a string" do
|
||||
assert %{conn_pid: :pid, origin: _origin} = SocketInfo.build("example.com:4000", :pid)
|
||||
end
|
||||
|
||||
test "can be passed a URI" do
|
||||
uri = URI.parse("http://example.com:4000")
|
||||
assert %{conn_pid: :pid, origin: origin} = SocketInfo.build(uri, :pid)
|
||||
assert origin =~ "example.com:4000"
|
||||
end
|
||||
|
||||
test "will include the port number" do
|
||||
assert %{conn_pid: :pid, origin: origin} = SocketInfo.build("http://example.com:4000", :pid)
|
||||
|
||||
assert origin =~ ":4000"
|
||||
end
|
||||
|
||||
test "will provide the port if missing" do
|
||||
assert %{conn_pid: :pid, origin: "example.com:80"} =
|
||||
SocketInfo.build("http://example.com", :pid)
|
||||
|
||||
assert %{conn_pid: :pid, origin: "example.com:443"} =
|
||||
SocketInfo.build("https://example.com", :pid)
|
||||
end
|
||||
end
|
||||
|
||||
describe "creating incoming connection records" do
|
||||
test "can be passed a string" do
|
||||
assert %{pid: _, origin: _origin} = SocketInfo.build("example.com:4000")
|
||||
end
|
||||
|
||||
test "can be passed a URI" do
|
||||
uri = URI.parse("example.com:4000")
|
||||
assert %{pid: _, origin: _origin} = SocketInfo.build(uri)
|
||||
end
|
||||
|
||||
test "will include the port number" do
|
||||
assert %{pid: _, origin: origin} = SocketInfo.build("http://example.com:4000")
|
||||
|
||||
assert origin =~ ":4000"
|
||||
end
|
||||
|
||||
test "will provide the port if missing" do
|
||||
assert %{pid: _, origin: "example.com:80"} = SocketInfo.build("http://example.com")
|
||||
assert %{pid: _, origin: "example.com:443"} = SocketInfo.build("https://example.com")
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue