From 6eb87aba5aba593600ba7179a035e1b052677518 Mon Sep 17 00:00:00 2001 From: Peter Zingg Date: Sun, 18 Dec 2022 12:43:35 -0800 Subject: [PATCH] rsscloud and websub implementation, passing tests --- lib/pleroma/feed/headers.ex | 27 + lib/pleroma/feed/subscription.ex | 47 ++ lib/pleroma/feed/subscription_update.ex | 23 + lib/pleroma/feed/subscriptions.ex | 482 ++++++++++++++++++ lib/pleroma/feed/topic.ex | 23 + lib/pleroma/feed/update.ex | 25 + lib/pleroma/feed/updates.ex | 133 +++++ lib/pleroma/web/feed/rsscloud_controller.ex | 158 ++++++ lib/pleroma/web/feed/websub_controller.ex | 78 +++ lib/pleroma/web/router.ex | 17 + .../workers/dispatch_feed_update_worker.ex | 120 +++++ .../prune_feed_subscriptions_worker.ex | 46 ++ test/pleroma/feed/query_test.exs | 142 ++++++ test/pleroma/feed/rsscloud_test.exs | 479 +++++++++++++++++ test/pleroma/feed/subscriptions_test.exs | 3 + test/pleroma/feed/updates_test.exs | 80 +++ test/pleroma/feed/websub_test.exs | 472 +++++++++++++++++ .../web/feed/rsscloud_controller_test.exs | 118 +++++ .../web/feed/websub_controller_test.exs | 81 +++ test/support/conn_case.ex | 3 + test/support/data_case.ex | 3 + test/support/tesla_mock_agent.ex | 36 ++ 22 files changed, 2596 insertions(+) create mode 100644 lib/pleroma/feed/headers.ex create mode 100644 lib/pleroma/feed/subscription.ex create mode 100644 lib/pleroma/feed/subscription_update.ex create mode 100644 lib/pleroma/feed/subscriptions.ex create mode 100644 lib/pleroma/feed/topic.ex create mode 100644 lib/pleroma/feed/update.ex create mode 100644 lib/pleroma/feed/updates.ex create mode 100644 lib/pleroma/web/feed/rsscloud_controller.ex create mode 100644 lib/pleroma/web/feed/websub_controller.ex create mode 100644 lib/pleroma/workers/dispatch_feed_update_worker.ex create mode 100644 lib/pleroma/workers/prune_feed_subscriptions_worker.ex create mode 100644 test/pleroma/feed/query_test.exs create mode 100644 test/pleroma/feed/rsscloud_test.exs create mode 100644 test/pleroma/feed/subscriptions_test.exs create mode 100644 test/pleroma/feed/updates_test.exs create mode 100644 test/pleroma/feed/websub_test.exs create mode 100644 test/pleroma/web/feed/rsscloud_controller_test.exs create mode 100644 test/pleroma/web/feed/websub_controller_test.exs create mode 100644 test/support/tesla_mock_agent.ex diff --git a/lib/pleroma/feed/headers.ex b/lib/pleroma/feed/headers.ex new file mode 100644 index 000000000..753df5e01 --- /dev/null +++ b/lib/pleroma/feed/headers.ex @@ -0,0 +1,27 @@ +defmodule Pleroma.Feed.Headers do + @behaviour Ecto.Type + def type, do: :binary + + # Provide our own casting rules. + def cast(term) do + {:ok, term} + end + + def embed_as(_format) do + :self + end + + def equal?(a, b) do + a == b + end + + # When loading data from the database, we are guaranteed to + # receive an integer (as databases are strict) and we will + # just return it to be stored in the schema struct. + def load(binary), do: {:ok, :erlang.binary_to_term(binary)} + + # When dumping data to the database, we *expect* an integer + # but any value could be inserted into the struct, so we need + # guard against them. + def dump(term), do: {:ok, :erlang.term_to_binary(term)} +end diff --git a/lib/pleroma/feed/subscription.ex b/lib/pleroma/feed/subscription.ex new file mode 100644 index 000000000..70e9489db --- /dev/null +++ b/lib/pleroma/feed/subscription.ex @@ -0,0 +1,47 @@ +defmodule Pleroma.Feed.Subscription do + use Ecto.Schema + import Ecto.Changeset + + schema "feed_subscriptions" do + belongs_to(:topic, Pleroma.Feed.Topic) + has_many(:subscription_updates, Pleroma.Feed.SubscriptionUpdate) + + field(:callback_url, :string) + field(:lease_seconds, :float) + field(:expires_at, :naive_datetime) + field(:secret, :string) + field(:diff_domain, :boolean) + field(:api, Ecto.Enum, values: [:websub, :rsscloud]) + + timestamps() + end + + @doc false + def changeset(subscription, attrs) do + subscription + |> cast(attrs, [ + :topic_id, + :api, + :callback_url, + :lease_seconds, + :expires_at, + :secret, + :diff_domain + ]) + |> validate_required([:api, :callback_url, :lease_seconds, :expires_at]) + |> validate_method() + |> foreign_key_constraint(:topic_id) + |> unique_constraint([:api, :topic_id, :callback_url]) + end + + defp validate_method(changeset) do + api = get_field(changeset, :api) + + case api do + :websub -> changeset + :rsscloud -> validate_required(changeset, :diff_domain) + nil -> add_error(changeset, :api, "is required") + other -> add_error(changeset, :api, "is invalid: #{other}") + end + end +end diff --git a/lib/pleroma/feed/subscription_update.ex b/lib/pleroma/feed/subscription_update.ex new file mode 100644 index 000000000..398004000 --- /dev/null +++ b/lib/pleroma/feed/subscription_update.ex @@ -0,0 +1,23 @@ +defmodule Pleroma.Feed.SubscriptionUpdate do + use Ecto.Schema + import Ecto.Changeset + + schema "feed_subscription_updates" do + belongs_to(:update, Pleroma.Feed.Update) + belongs_to(:subscription, Pleroma.Feed.Subscription) + + field(:pushed_at, :naive_datetime) + field(:status_code, :integer) + + timestamps() + end + + @doc false + def changeset(topic, attrs) do + topic + |> cast(attrs, [:update_id, :subscription_id, :pushed_at, :status_code]) + |> validate_required([:pushed_at, :status_code]) + |> foreign_key_constraint(:update_id) + |> foreign_key_constraint(:subscription_id) + end +end diff --git a/lib/pleroma/feed/subscriptions.ex b/lib/pleroma/feed/subscriptions.ex new file mode 100644 index 000000000..d48398708 --- /dev/null +++ b/lib/pleroma/feed/subscriptions.ex @@ -0,0 +1,482 @@ +defmodule Pleroma.Feed.Subscriptions do + @moduledoc """ + The Subscriptions context. + """ + require Logger + + import Ecto.Query, warn: false + alias Pleroma.Repo + alias Pleroma.HTTP + + alias Pleroma.Feed.Subscription + alias Pleroma.Feed.Topic + + def subscribe(api, topic_url, callback_url, subscription_lease_seconds, opts \\ []) do + with {:ok, _} <- validate_url(topic_url), + {:ok, _callback_uri} <- validate_url(callback_url), + {:ok, topic} <- find_or_create_topic(topic_url), + :ok <- validate_subscription(api, topic, callback_url, subscription_lease_seconds, opts) do + find_or_create_subscription(api, topic, callback_url, subscription_lease_seconds, opts) + else + {:subscribe_validation_error, reason} -> + # WebSub must notify callback on failure. Ignore return value. + # RSSCloud just returns an error to the caller. + _ = deny_subscription(api, callback_url, topic_url, reason) + {:error, reason} + + {:error, %Ecto.Changeset{} = changeset} -> + Logger.error("subscribe data error: #{inspect(changeset.errors)}") + {:error, "data error"} + + {:error, reason} -> + {:error, reason} + end + end + + def unsubscribe(topic_url, callback_url) do + with {:ok, _} <- validate_url(topic_url), + {:ok, callback_uri} <- validate_url(callback_url), + %Topic{} = topic <- get_topic_by_url(topic_url), + %Subscription{api: api} = subscription <- + Repo.get_by(Subscription, topic_id: topic.id, callback_url: callback_url) do + if api == :websub do + _ = validate_unsubscribe(topic, callback_uri) + end + + subscription + |> Subscription.changeset(%{ + expires_at: NaiveDateTime.utc_now() + }) + |> Repo.update() + else + _ -> {:error, :subscription_not_found} + end + end + + @doc """ + We callback on WebSub subscriptions just before deleting them. + """ + def final_unsubscribe(%Subscription{api: :websub} = subscription) do + with {:ok, callback_uri} <- validate_url(subscription.callback_url) do + validate_unsubscribe(subscription.topic, callback_uri) + else + _ -> + {:unsubscribe_validation_error, "Subscription with improper callback_url"} + end + end + + def final_unsubscribe(%Subscription{api: :rsscloud}), do: :ok + + def get_topic_by_url(topic_url) do + Repo.get_by(Topic, url: topic_url) + end + + @doc """ + Find or create a topic. + + Topics can exist without any valid subscriptions. Additionally a subscription can fail to validate and a topic still exist. + + ## Examples + + iex> find_or_create_topic("https://some-topic-url") + {:ok, %Topic{}} + """ + def find_or_create_topic(topic_url) do + case Repo.get_by(Topic, url: topic_url) do + %Topic{} = topic -> + {:ok, topic} + + nil -> + %Topic{} + |> Topic.changeset(%{ + url: topic_url, + expires_at: ~N[2046-12-31 23:59:00] + }) + |> Repo.insert() + end + end + + def find_subscription_by_api_topic_and_url(api, %Topic{} = topic, callback_url) do + Repo.get_by(Subscription, api: api, topic_id: topic.id, callback_url: callback_url) + end + + def find_or_create_subscription( + api, + %Topic{} = topic, + callback_url, + subscription_lease_seconds, + opts + ) do + # BACKPORT api: api + lease_seconds = convert_lease_seconds(subscription_lease_seconds) + + case Repo.get_by(Subscription, api: api, topic_id: topic.id, callback_url: callback_url) do + %Subscription{} = subscription -> + subscription + |> Subscription.changeset(%{ + lease_seconds: lease_seconds, + expires_at: from_now(lease_seconds), + diff_domain: Keyword.get(opts, :diff_domain, false), + secret: Keyword.get(opts, :secret) + }) + |> Repo.update() + + nil -> + create_subscription(api, topic, callback_url, lease_seconds, opts) + end + end + + @doc """ + Validate a WebSub subscription by sending a HTTP GET to the subscriber's callback_url. + Validate an RSSCloud subscription by sending a HTTP GET or POST to the subscriber's callback_url. + """ + def validate_subscription( + :websub, + %Topic{} = topic, + callback_url, + subscription_lease_seconds, + _opts + ) do + challenge = :crypto.strong_rand_bytes(32) |> Base.url_encode64() |> binary_part(0, 32) + + query = [ + {"hub.mode", "subscribe"}, + {"hub.topic", topic.url}, + {"hub.challenge", challenge}, + {"hub.lease_seconds", to_string(subscription_lease_seconds)} + ] + + case HTTP.get(callback_url, [], params: query) do + {:ok, %Tesla.Env{status: code, body: body}} when code >= 200 and code < 300 -> + # Ensure the response body matches our challenge + if challenge != String.trim(body) do + {:subscribe_validation_error, :failed_challenge_body} + else + :ok + end + + other -> + handle_validation_errors(other) + end + end + + def validate_subscription( + :rsscloud, + %Topic{} = topic, + callback_url, + _lease_seconds, + opts + ) do + diff_domain = Keyword.get(opts, :diff_domain, false) + validate_rsscloud_subscription(topic, callback_url, diff_domain) + end + + def validate_rsscloud_subscription(topic, callback_url, true) do + challenge = :crypto.strong_rand_bytes(32) |> Base.url_encode64() |> binary_part(0, 32) + + query = [ + {"url", topic.url}, + {"challenge", challenge} + ] + + case HTTP.get(callback_url, [], params: query) do + {:ok, %Tesla.Env{status: code, body: body}} when code >= 200 and code < 300 -> + # Ensure the response body contains our challenge + if String.contains?(body, challenge) do + :ok + else + {:subscribe_validation_error, :failed_challenge_body} + end + + other -> + handle_validation_errors(other) + end + end + + def validate_rsscloud_subscription(topic, callback_uri, false) do + callback_url = to_string(callback_uri) + body = %{url: topic.url} |> URI.encode_query() + headers = [{"content-type", "application/x-www-form-urlencoded"}] + + case HTTP.post(callback_url, body, headers) do + {:ok, %Tesla.Env{status: code}} when code >= 200 and code < 300 -> + :ok + + other -> + handle_validation_errors(other) + end + end + + def handle_validation_errors({:ok, %Tesla.Env{status: 404}}) do + {:subscribe_validation_error, :failed_404_response} + end + + def handle_validation_errors({:ok, %Tesla.Env{} = env}) do + Logger.error("failed_unknown_response #{inspect(env)}") + {:subscribe_validation_error, :failed_unknown_response} + end + + def handle_validation_errors({:error, :invalid_request}) do + {:subscribe_validation_error, :invalid_request} + end + + def handle_validation_errors({:error, reason}) do + Logger.error("Got unexpected error from validate subscription call: #{reason}") + {:subscribe_validation_error, :failed_unknown_error} + end + + @doc """ + Validate a WebSub unsubscription by sending a HTTP GET to the subscriber's callback_url. + """ + def validate_unsubscribe( + %Topic{} = topic, + %URI{} = callback_uri + ) do + challenge = :crypto.strong_rand_bytes(32) |> Base.url_encode64() |> binary_part(0, 32) + + query = [ + {"hub.mode", "unsubscribe"}, + {"hub.topic", topic.url}, + {"hub.challenge", challenge} + ] + + callback_url = to_string(callback_uri) + + case HTTP.get(callback_url, [], params: query) do + {:ok, %Tesla.Env{}} -> + :ok + + {:error, reason} -> + Logger.error("Got unexpected error from validate unsubscribe call: #{reason}") + {:unsubscribe_validation_error, :failed_unknown_error} + end + end + + def create_subscription(api, %Topic{} = topic, callback_url, subscription_lease_seconds, opts) do + lease_seconds = convert_lease_seconds(subscription_lease_seconds) + # BACKPORT + %Subscription{ + topic_id: topic.id + } + |> Subscription.changeset(%{ + api: api, + callback_url: callback_url, + lease_seconds: lease_seconds, + expires_at: from_now(lease_seconds), + diff_domain: Keyword.get(opts, :diff_domain, false), + secret: Keyword.get(opts, :secret) + }) + |> Repo.insert() + end + + defp convert_lease_seconds(seconds) when is_integer(seconds), do: seconds + + defp convert_lease_seconds(seconds) when is_binary(seconds) do + case String.trim(seconds) |> Integer.parse() do + {seconds, ""} -> + seconds + + _ -> + Logger.error("Invalid lease value. not an integer: '#{seconds}'") + 0 + end + end + + def deny_subscription(:websub, callback_url, topic_url, reason) do + # If (and when) the subscription is denied, the hub MUST inform the subscriber by sending an HTTP [RFC7231] + # (or HTTPS [RFC2818]) GET request to the subscriber's callback URL as given in the subscription request. This request has the following query string arguments appended (format described in Section 4 of [URL]): + with {:ok, callback_uri} <- validate_url(callback_url) do + query = [ + {"hub.mode", "denied"}, + {"hub.topic", topic_url}, + {"hub.reason", reason_string(reason)} + ] + + final_url = to_string(callback_uri) + + # We don't especially care about a response on this one + case HTTP.get(final_url, [], params: query) do + {:ok, %Tesla.Env{}} -> + :ok + + {:error, reason} -> + {:error, reason} + end + else + {:error, reason} -> + {:error, reason} + end + end + + def deny_subscription(:rsscloud, _callback_url, _topic_url, _reason), do: :ok + + def reason_string(reason) when is_binary(reason), do: reason + def reason_string(reason) when is_atom(reason), do: Atom.to_string(reason) + def reason_string(reason), do: IO.inspect(reason) + + def list_active_topic_subscriptions(%Topic{} = topic) do + now = NaiveDateTime.utc_now() + + from(s in Subscription, + where: s.topic_id == ^topic.id and s.expires_at >= ^now + ) + |> Repo.all() + end + + def list_inactive_subscriptions(now) do + from(s in Subscription, + where: s.expires_at < ^now, + join: t in assoc(s, :topic), + preload: [topic: t] + ) + |> Repo.all() + end + + @spec delete_subscription(Subscription.t(), non_neg_integer(), NaiveDateTime.t() | nil) :: + {non_neg_integer(), non_neg_integer(), [integer()]} | {:error, term()} + def delete_subscription(subscription, topic_lease_seconds, now \\ nil) do + now = now || NaiveDateTime.utc_now() + + Repo.transaction(fn -> + topic_id = subscription.topic_id + + case Repo.delete(subscription) do + {:ok, _} -> + {n_topics, topic_ids} = update_topic_expirations([topic_id], topic_lease_seconds, now) + {1, n_topics, topic_ids} + + _ -> + {0, 0, []} + end + end) + |> case do + {:ok, res} -> res + {:error, reason} -> {:error, reason} + end + end + + # BACKPORT + @spec delete_all_inactive_subscriptions(non_neg_integer(), NaiveDateTime.t() | nil) :: + {non_neg_integer(), non_neg_integer(), [integer()]} | {:error, term()} + def delete_all_inactive_subscriptions(topic_lease_seconds, now \\ nil) do + Repo.transaction(fn -> + # Cascades to delete all SubscriptionUpdates as well + {n_subs, topic_ids} = + from(s in Subscription, + select: s.topic_id, + where: s.expires_at < ^now + ) + |> Repo.delete_all() + + # Update those topics who now don't have a subscription + {n_topics, topic_ids} = update_topic_expirations(topic_ids, topic_lease_seconds, now) + + {n_subs, n_topics, topic_ids} + end) + |> case do + {:ok, res} -> res + {:error, reason} -> {:error, reason} + end + end + + def list_inactive_topics(now) do + from(t in Topic, + where: + t.expires_at < ^now and + fragment("NOT EXISTS (SELECT * FROM feed_subscriptions s WHERE s.topic_id = ?)", t.id) + ) + |> Repo.all() + end + + @spec delete_all_inactive_topics(NaiveDateTime.t() | nil) :: {non_neg_integer(), [integer()]} + def delete_all_inactive_topics(now) do + now = now || NaiveDateTime.utc_now() + + from(t in Topic, + where: + t.expires_at < ^now and + fragment("NOT EXISTS (SELECT * FROM feed_subscriptions s WHERE s.topic_id = ?)", t.id) + ) + |> Repo.delete_all() + end + + @spec update_topic_expirations([integer()], non_neg_integer(), NaiveDateTime.t() | nil) :: + {non_neg_integer(), [integer()]} + def update_topic_expirations(topic_ids, topic_lease_seconds, now \\ nil) do + now = now || NaiveDateTime.utc_now() + lease_seconds = convert_lease_seconds(topic_lease_seconds) + expires_at = NaiveDateTime.add(now, lease_seconds, :second) + + from(t in Topic, + select: t.id, + where: + not exists( + from(s in Subscription, + where: s.topic_id in ^topic_ids + ) + ), + update: [set: [updated_at: ^now, expires_at: ^expires_at]] + ) + |> Repo.update_all([]) + end + + def from_now(seconds) do + NaiveDateTime.utc_now() |> NaiveDateTime.add(seconds, :second) + end + + defp validate_url(url) when is_binary(url) do + case URI.new(url) do + {:ok, uri} -> + if uri.scheme in ["http", "https"] do + {:ok, uri} + else + {:error, :url_not_http} + end + + err -> + err + end + end + + defp validate_url(_), do: {:error, :url_not_binary} + + def count_topics do + Repo.one( + from(u in Topic, + select: count(u.id) + ) + ) + end + + def count_active_subscriptions do + now = NaiveDateTime.utc_now() + + Repo.one( + from(s in Subscription, + where: s.expires_at >= ^now, + select: count(s.id) + ) + ) + end + + def subscription_updates_chart do + case Repo.query(""" + select date(pushed_at) as "date", count(*) as "count" + from subscription_updates + group by date(pushed_at) + order by date(pushed_at) desc + limit 30; + """) do + {:ok, %Postgrex.Result{rows: rows}} -> + flipped = Enum.reverse(rows) + + %{ + keys: Enum.map(flipped, fn [key, _] -> key end), + values: Enum.map(flipped, fn [_, value] -> value end) + } + + _ -> + %{keys: [], values: []} + end + end +end diff --git a/lib/pleroma/feed/topic.ex b/lib/pleroma/feed/topic.ex new file mode 100644 index 000000000..de3703520 --- /dev/null +++ b/lib/pleroma/feed/topic.ex @@ -0,0 +1,23 @@ +defmodule Pleroma.Feed.Topic do + use Ecto.Schema + import Ecto.Changeset + + schema "feed_topics" do + # BACKPORT + has_many(:subscriptions, Pleroma.Feed.Subscription) + has_many(:updates, Pleroma.Feed.Update) + + field(:url, :string) + field(:expires_at, :naive_datetime) + + timestamps() + end + + @doc false + def changeset(topic, attrs) do + topic + |> cast(attrs, [:url, :expires_at]) + |> validate_required([:url]) + |> unique_constraint([:url]) + end +end diff --git a/lib/pleroma/feed/update.ex b/lib/pleroma/feed/update.ex new file mode 100644 index 000000000..ffa7a8f5e --- /dev/null +++ b/lib/pleroma/feed/update.ex @@ -0,0 +1,25 @@ +defmodule Pleroma.Feed.Update do + use Ecto.Schema + import Ecto.Changeset + + schema "feed_updates" do + belongs_to(:topic, Pleroma.Feed.Topic) + has_many(:subscription_updates, Pleroma.Feed.SubscriptionUpdate) + + field(:body, :binary) + field(:headers, Pleroma.Feed.Headers) + field(:content_type, :string) + field(:links, {:array, :string}) + field(:hash, :string) + + timestamps() + end + + @doc false + def changeset(topic, attrs) do + topic + |> cast(attrs, [:topic_id, :body, :headers, :content_type, :hash, :links]) + |> validate_required([:body, :content_type, :hash, :links]) + |> foreign_key_constraint(:topic_id) + end +end diff --git a/lib/pleroma/feed/updates.ex b/lib/pleroma/feed/updates.ex new file mode 100644 index 000000000..2922b4802 --- /dev/null +++ b/lib/pleroma/feed/updates.ex @@ -0,0 +1,133 @@ +defmodule Pleroma.Feed.Updates do + @moduledoc """ + The Updates context. + """ + require Logger + + import Ecto.Query, warn: false + alias Pleroma.Repo + alias Pleroma.HTTP + + alias Pleroma.Feed.Subscriptions + alias Pleroma.Feed.SubscriptionUpdate + alias Pleroma.Feed.Topic + alias Pleroma.Feed.Update + + def publish(topic_url) do + case Subscriptions.get_topic_by_url(topic_url) do + # Get all active subscriptions and publish the update to them + %Topic{} = topic -> + case HTTP.get(topic.url, []) do + {:ok, %Tesla.Env{status: code} = env} + when code >= 200 and code < 300 -> + with {:ok, update} <- create_update(topic, env) do + # Realistically we should do all of this async, for now we'll do querying line and dispatch async + subscribers = Subscriptions.list_active_topic_subscriptions(topic) + + Enum.each(subscribers, fn subscription -> + Logger.debug("Queueing dispatch to #{subscription.callback_url}") + + Pleroma.Workers.DispatchFeedUpdateWorker.new(%{ + callback_url: subscription.callback_url, + update_id: update.id, + subscription_id: subscription.id, + subscription_api: subscription.api, + secret: subscription.secret + }) + |> Oban.insert() + end) + + Logger.info("Updates.publish: Sending updates for #{topic_url}") + {:ok, update} + else + {:error, _changeset} -> + Logger.error("Updates.publish: Could not create new record for #{topic.url}") + {:error, "Error creating update record."} + end + + _ -> + Logger.error("Updates.publish: Unsuccessful response code for #{topic.url}") + {:error, "Publish URL did not return a successful status code."} + end + + nil -> + # Nothing found + Logger.error("Updates.publish: Did not find topic for #{topic_url}") + {:error, "Topic not found for topic URL."} + + err -> + Logger.error("Updates.publish: Unknown error #{inspect(err)}") + {:error, "Unknown error."} + end + end + + def create_update(%Topic{} = topic, %Tesla.Env{body: body, headers: headers} = env) + when is_binary(body) do + content_type = Tesla.get_header(env, "content-type") || "application/octet-stream" + + # BACKPORT + %Update{ + topic_id: topic.id + } + |> Update.changeset(%{ + body: body, + headers: headers, + content_type: content_type, + links: Tesla.get_headers(env, "link"), + hash: :crypto.hash(:sha256, body) |> Base.encode16(case: :lower) + }) + |> Repo.insert() + end + + @doc """ + Create a subscription update, uses ID's for quick insertion + """ + def create_subscription_update(update_id, subscription_id, status_code) + when is_integer(update_id) and is_integer(subscription_id) do + %SubscriptionUpdate{ + update_id: update_id, + subscription_id: subscription_id + } + |> SubscriptionUpdate.changeset(%{ + pushed_at: NaiveDateTime.utc_now(), + status_code: status_code + }) + |> Repo.insert() + end + + def get_update(id) do + Repo.get(Update, id) + end + + def get_update_and_topic(id) do + Repo.get(Update, id) |> Repo.preload(:topic) + end + + def get_subscription_update(id) do + Repo.get(SubscriptionUpdate, id) + end + + def count_30min_updates do + now = NaiveDateTime.utc_now() + time_ago = NaiveDateTime.add(now, -1_800, :second) + + Repo.one( + from(u in Update, + where: u.inserted_at > ^time_ago and u.inserted_at < ^now, + select: count(u.id) + ) + ) + end + + def count_30min_subscription_updates do + now = NaiveDateTime.utc_now() + time_ago = NaiveDateTime.add(now, -1_800, :second) + + Repo.one( + from(u in SubscriptionUpdate, + where: u.inserted_at > ^time_ago and u.inserted_at < ^now, + select: count(u.id) + ) + ) + end +end diff --git a/lib/pleroma/web/feed/rsscloud_controller.ex b/lib/pleroma/web/feed/rsscloud_controller.ex new file mode 100644 index 000000000..cbf12d3d7 --- /dev/null +++ b/lib/pleroma/web/feed/rsscloud_controller.ex @@ -0,0 +1,158 @@ +defmodule Pleroma.Web.Feed.RSSCloudController do + use Pleroma.Web, :controller + + require Logger + + alias Pleroma.Feed.Subscriptions + + # RSSCloud subscriptions expire after 25 hours (no options!) + @subscription_lease_seconds 90_000 + + def ping(conn, _params) do + Logger.error("ping not implemented") + + handle_response({:error, :unimplemented}, conn) + end + + def please_notify(conn, _params) do + remote_ip = + case Plug.Conn.get_req_header(conn, "x-forwarded-for") do + [ip] -> + ip + + _ -> + with {a, b, c, d} <- conn.remote_ip do + "#{a}.#{b}.#{c}.#{d}" + else + _ -> "127.0.0.1" + end + end + |> case do + "127.0.0.1" -> "localhost" + ip_or_address -> ip_or_address + end + + result = + with {:ok, {callback, topics, diff_domain}} <- + parse_body_params(conn.body_params, remote_ip) do + {good, bad} = + topics + |> Enum.map(fn topic -> + Subscriptions.subscribe(:rsscloud, topic, callback, @subscription_lease_seconds, + diff_domain: diff_domain + ) + end) + |> Enum.split_with(fn res -> elem(res, 0) == :ok end) + + case bad do + [] -> hd(good) + _ -> hd(bad) + end + else + error -> error + end + + handle_response(result, conn) + end + + defp parse_body_params( + %{"protocol" => protocol, "port" => port, "path" => path} = params, + remote_ip + ) do + scheme = + case protocol do + "http-rest" -> "http" + "https-rest" -> "https" + _ -> nil + end + + port = + case Integer.parse(port) do + {p, ""} -> p + _ -> nil + end + + cond do + is_nil(scheme) -> + Logger.error("protocol '#{protocol}' invalid") + {:error, :invalid_request} + + is_nil(port) -> + Logger.error("port '#{port}' invalid") + {:error, :invalid_request} + + String.first(path) != "/" -> + Logger.error("path '#{path}' invalid") + {:error, :invalid_request} + + true -> + Enum.reduce(params, [], fn {k, v}, acc -> + if Regex.match?(~r/^url\d+$/, k) do + [v | acc] + else + acc + end + end) + |> case do + [] -> + Logger.error("no urls parsed") + {:error, :invalid_request} + + topics -> + domain = + case Map.get(params, "domain", remote_ip) do + "127.0.0.1" -> "localhost" + ip_or_address -> ip_or_address + end + + callback = "#{scheme}://#{domain}:#{port}#{path}" + + # diff_domain = domain != remote_ip + diff_domain = Map.has_key?(params, "domain") + {:ok, {callback, topics, diff_domain}} + end + end + end + + defp parse_body_params(_invalid_params, _remote_ip), do: {:error, :invalid_request} + + defp handle_response({:ok, _subscription}, conn) do + conn + |> Phoenix.Controller.json(%{success: true, msg: "subscribed"}) + |> Plug.Conn.halt() + end + + defp handle_response({:error, message}, conn) when is_binary(message) do + conn + |> Plug.Conn.put_status(500) + |> Phoenix.Controller.json(%{success: false, msg: message}) + |> Plug.Conn.halt() + end + + defp handle_response({:error, reason}, conn) when is_atom(reason) do + {status_code, message} = + case reason do + :invalid_request -> {400, "invalid_request"} + :failed_challenge_body -> {403, "failed_challenge_body"} + :failed_404_response -> {403, "failed_404_response"} + :failed_unknown_response -> {403, "failed_unknown_response"} + :failed_unknown_error -> {500, "failed_unknown_error"} + :unimplemented -> {500, "unimplemented"} + _ -> {500, "failed_unknown_reason"} + end + + conn + |> Plug.Conn.put_status(status_code) + |> Phoenix.Controller.json(%{success: false, msg: message}) + |> Plug.Conn.halt() + end + + defp handle_response({:error, error}, conn) do + Logger.error("RSSCloudController unknown error: #{inspect(error)}") + + conn + |> Plug.Conn.put_status(500) + |> Phoenix.Controller.json(%{success: false, msg: "unknown error"}) + |> Plug.Conn.halt() + end +end diff --git a/lib/pleroma/web/feed/websub_controller.ex b/lib/pleroma/web/feed/websub_controller.ex new file mode 100644 index 000000000..f821b0d3b --- /dev/null +++ b/lib/pleroma/web/feed/websub_controller.ex @@ -0,0 +1,78 @@ +defmodule Pleroma.Web.Feed.WebSubController do + use Pleroma.Web, :controller + + alias Pleroma.Feed.Subscriptions + alias Pleroma.Feed.Updates + + # By default WebSub subscriptions expire after 10 days + @subscription_lease_seconds 864_000 + + def hub(conn, _params) do + conn + |> handle_request(conn.params) + end + + defp handle_request( + conn, + %{"hub.mode" => "subscribe", "hub.topic" => topic, "hub.callback" => callback} = params + ) do + lease_seconds = Map.get(params, "hub.lease_seconds", @subscription_lease_seconds) + secret = Map.get(params, "hub.secret") + + Subscriptions.subscribe(:websub, topic, callback, lease_seconds, secret: secret) + |> handle_response(conn) + end + + defp handle_request(conn, %{ + "hub.mode" => "unsubscribe", + "hub.topic" => topic, + "hub.callback" => callback + }) do + Subscriptions.unsubscribe(topic, callback) + |> handle_response(conn) + end + + defp handle_request(conn, %{"hub.mode" => "publish", "hub.topic" => topic}) do + Updates.publish(topic) + |> handle_response(conn) + end + + defp handle_request(conn, %{"hub.mode" => "publish", "hub.url" => topic}) do + # Compatability with https://pubsubhubbub.appspot.com/ + Updates.publish(topic) + |> handle_response(conn) + end + + defp handle_response({:ok, _message}, conn) do + conn + |> Plug.Conn.send_resp(202, "") + |> Plug.Conn.halt() + end + + defp handle_response({:error, message}, conn) when is_binary(message) do + conn + |> Plug.Conn.send_resp(500, message) + |> Plug.Conn.halt() + end + + defp handle_response({:error, reason}, conn) when is_atom(reason) do + {status_code, message} = + case reason do + :failed_challenge_body -> {403, "failed_challenge_body"} + :failed_404_response -> {403, "failed_404_response"} + :failed_unknown_response -> {403, "failed_unknown_response"} + :failed_unknown_error -> {500, "failed_unknown_error"} + _ -> {500, "failed_unknown_reason"} + end + + conn + |> Plug.Conn.send_resp(status_code, message) + |> Plug.Conn.halt() + end + + defp handle_response({:error, _}, conn) do + conn + |> Plug.Conn.send_resp(500, "unknown error") + |> Plug.Conn.halt() + end +end diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex index a34dd26ce..78b0d6242 100644 --- a/lib/pleroma/web/router.ex +++ b/lib/pleroma/web/router.ex @@ -843,6 +843,23 @@ defmodule Pleroma.Web.Router do get("/:sig/:url/:filename", MediaProxy.MediaProxyController, :remote) end + pipeline :accepts_www_forms do + plug :accepts, ["x-www-form-urlencoded"] + end + + scope "/hub", Pleroma.Web.Feed do + pipe_through :accepts_www_forms + + post "/", WebSubController, :hub + end + + scope "/rsscloud", Pleroma.Web.Feed do + pipe_through :accepts_www_forms + + post "/ping", RSSCloudController, :ping + post "/pleaseNotify", RSSCloudController, :please_notify + end + if Pleroma.Config.get(:env) == :dev do scope "/dev" do pipe_through([:mailbox_preview]) diff --git a/lib/pleroma/workers/dispatch_feed_update_worker.ex b/lib/pleroma/workers/dispatch_feed_update_worker.ex new file mode 100644 index 000000000..0dfaeae00 --- /dev/null +++ b/lib/pleroma/workers/dispatch_feed_update_worker.ex @@ -0,0 +1,120 @@ +defmodule Pleroma.Workers.DispatchFeedUpdateWorker do + use Oban.Worker, queue: :feed_updates, max_attempts: 3 + require Logger + + alias Pleroma.HTTP + + alias Pleroma.Feed.Update + alias Pleroma.Feed.Updates + + @impl Oban.Worker + def perform(%Oban.Job{ + args: %{ + "update_id" => update_id, + "subscription_id" => subscription_id, + "subscription_api" => api, + "callback_url" => callback_url, + "secret" => secret + } + }) do + with %Update{} = update <- Pleroma.Feed.Updates.get_update_and_topic(update_id) do + topic_url = update.topic.url + api = String.to_existing_atom(api) + + perform_request(api, callback_url, topic_url, update, secret) + |> log_request(update.id, subscription_id) + else + # In case update has already been removed. + _ -> + Logger.error("Could not find update #{update_id}") + {:error, "Update not found"} + end + end + + defp perform_request(:websub, callback_url, topic_url, update, secret) do + links = [ + "<#{topic_url}>; rel=self", + "; rel=hub" + ] + + headers = [ + {"content-type", update.content_type}, + {"link", Enum.join(links, ", ")} + ] + + headers = + if secret do + hmac = :crypto.mac(:hmac, :sha256, secret, update.body) |> Base.encode16(case: :lower) + [{"x-hub-signature", "sha256=" <> hmac} | headers] + else + headers + end + + case HTTP.post(callback_url, update.body, headers) do + {:ok, %Tesla.Env{status: code}} when code >= 200 and code < 300 -> + Logger.debug("WebSub got OK response from #{callback_url}") + {:ok, code} + + {:ok, %Tesla.Env{status: 410}} -> + # Invalidate this subscription + {:ok, 410} + + {:ok, %Tesla.Env{status: code}} -> + {:failed, code} + + {:error, reason} -> + {:error, reason} + end + end + + defp perform_request(:rsscloud, callback_url, topic_url, _update, _secret) do + body = %{url: topic_url} |> URI.encode_query() + headers = [{"content-type", "application/x-www-form-urlencoded"}] + + case HTTP.post(callback_url, body, headers) do + {:ok, %Tesla.Env{status: code}} when code >= 200 and code < 300 -> + Logger.debug("RSSCloud got OK response from #{callback_url}") + {:ok, code} + + {:ok, %Tesla.Env{status: 410}} -> + # Invalidate this subscription + {:ok, 410} + + {:ok, %Tesla.Env{status: code}} -> + {:failed, code} + + {:error, reason} -> + Logger.error("RSSCloud got ERROR at #{callback_url}: #{inspect(reason)}") + {:error, 500} + end + end + + defp perform_request(api, _callback_url, _topic_url, _update, _secret) do + Logger.error("No such api #{api}") + {:error, "invalid api"} + end + + defp log_request(res, update_id, subscription_id) do + status_code = + case res do + {_, code} when is_integer(code) -> + code + + _ -> + 599 + end + + # Will fail if either update at update_id or subscription at subscription_id is gone + _ = + case Updates.create_subscription_update(update_id, subscription_id, status_code) do + {:ok, %{id: id}} -> + Logger.debug("New update #{id} for subscription #{subscription_id}") + + {:error, changeset} -> + Logger.error("Failed to create update for subscription #{subscription_id}") + Logger.error(" -> #{inspect(changeset.errors)}") + end + + res + end +end diff --git a/lib/pleroma/workers/prune_feed_subscriptions_worker.ex b/lib/pleroma/workers/prune_feed_subscriptions_worker.ex new file mode 100644 index 000000000..9250c1b89 --- /dev/null +++ b/lib/pleroma/workers/prune_feed_subscriptions_worker.ex @@ -0,0 +1,46 @@ +defmodule Pleroma.Workers.PruneFeedSubscriptionsWorker do + use Oban.Worker, queue: :prune_feed_subscriptions, max_attempts: 1 + require Logger + + alias Pleroma.Feed.Subscription + alias Pleroma.Feed.Subscriptions + + # Orphaned topics live for another 3 hours + @topic_lease_seconds 10_800 + + @impl Oban.Worker + def perform(%Oban.Job{args: args}) do + expiring = + case Map.get(args, "expiring") do + expiring when is_binary(expiring) -> + {:ok, expiring, _} = DateTime.from_iso8601(expiring) + DateTime.to_naive(expiring) + + _ -> + NaiveDateTime.utc_now() + end + + Logger.error("Pruning subscriptions and topics expiring before #{expiring}") + + _ = + Subscriptions.list_inactive_subscriptions(expiring) + |> Enum.map(fn %Subscription{id: id} = subscription -> + _ = Subscriptions.final_unsubscribe(subscription) + id + end) + + _ = + case Subscriptions.delete_all_inactive_subscriptions(@topic_lease_seconds, expiring) do + {:error, reason} -> + Logger.error("Transaction error deleting inactive subscriptions: #{reason}") + + {count, _, _} -> + Logger.error("Deleted #{count} inactive subscriptions") + end + + {count, _} = Subscriptions.delete_all_inactive_topics(expiring) + Logger.error("Deleted #{count} inactive topics") + + :ok + end +end diff --git a/test/pleroma/feed/query_test.exs b/test/pleroma/feed/query_test.exs new file mode 100644 index 000000000..355cc20f0 --- /dev/null +++ b/test/pleroma/feed/query_test.exs @@ -0,0 +1,142 @@ +defmodule Pleroma.Feed.QueryTest do + use Pleroma.DataCase + + alias Pleroma.Feed.Subscriptions + alias Pleroma.Feed.SubscriptionUpdate + alias Pleroma.Feed.Topic + alias Pleroma.Feed.Updates + + @subscription_lease_seconds 1 + @topic_lease_seconds 5 + + test "delete_all subscriptions cascades" do + %{topics: _, subscriptions: subscriptions} = build_data() + + subscription_updates = + subscriptions + |> Enum.map(fn subscription -> + subscription = Repo.preload(subscription, :topic) + topic = subscription.topic + env = %Tesla.Env{status: 200, body: topic.url} + + Enum.map(1..5, fn _i -> + {:ok, update} = Updates.create_update(topic, env) + + Enum.map(1..5, fn _j -> + {:ok, subscription_update} = + Updates.create_subscription_update(update.id, subscription.id, 200) + + subscription_update + end) + end) + end) + |> List.flatten() + + assert 100 == Enum.count(subscription_updates) + + expiring = + DateTime.utc_now() + |> DateTime.add(60, :second) + + assert {4, 2, _} = + Subscriptions.delete_all_inactive_subscriptions(@topic_lease_seconds, expiring) + + assert 0 == SubscriptionUpdate |> Repo.aggregate(:count, :id) + end + + test "removing all subscriptions for a topic sets an expiration" do + %{topics: topics, subscriptions: _} = build_data() + + [topic1_exp, topic2_exp] = + topics + |> Enum.with_index(fn topic, i -> + topic_url = topic.url + %Topic{subscriptions: subscriptions} = Repo.preload(topic, :subscriptions) + + now = NaiveDateTime.utc_now() + + # Remove one sub in topic1 + # Remove two subs (all of them) in topic2 + 0..i + |> Enum.map(fn j -> + Enum.at(subscriptions, j) + |> Subscriptions.delete_subscription(@topic_lease_seconds, now) + end) + + %Topic{expires_at: expires_at} = Subscriptions.get_topic_by_url(topic_url) + expires_at + end) + + assert NaiveDateTime.compare(topic1_exp, ~N[2040-01-01 00:00:00]) == :gt + # After removing all subs in topic2, topic2 will expire soon! + assert NaiveDateTime.compare(topic2_exp, ~N[2040-01-01 00:00:00]) == :lt + end + + test "prunes topics after removing all subscriptions" do + %{topics: _, subscriptions: _} = build_data() + + expiring = Subscriptions.from_now(2 * @subscription_lease_seconds) + + assert {4, 2, _} = + Subscriptions.delete_all_inactive_subscriptions(@topic_lease_seconds, expiring) + + expiring = NaiveDateTime.add(expiring, 2 * @topic_lease_seconds, :second) + topics_will_expire = Subscriptions.list_inactive_topics(expiring) + assert Enum.count(topics_will_expire) == 2 + + assert {2, _} = Subscriptions.delete_all_inactive_topics(expiring) + end + + def build_data() do + {:ok, topic1} = Subscriptions.find_or_create_topic("http://publisher/topic1") + {:ok, topic2} = Subscriptions.find_or_create_topic("http://publisher/topic2") + assert topic1.id != topic2.id + + {:ok, sub1_topic1} = + Subscriptions.find_or_create_subscription( + :websub, + topic1, + "http://subscriber/sub1", + @subscription_lease_seconds, + [] + ) + + {:ok, sub1_topic2} = + Subscriptions.find_or_create_subscription( + :websub, + topic2, + "http://subscriber/sub1", + @subscription_lease_seconds, + [] + ) + + {:ok, sub2_topic1} = + Subscriptions.find_or_create_subscription( + :websub, + topic1, + "http://subscriber/sub2", + @subscription_lease_seconds, + [] + ) + + {:ok, sub2_topic2} = + Subscriptions.find_or_create_subscription( + :websub, + topic2, + "http://subscriber/sub2", + @subscription_lease_seconds, + [] + ) + + topic1 = Repo.preload(topic1, :subscriptions) + assert 2 == Enum.count(topic1.subscriptions) + + topic2 = Repo.preload(topic2, :subscriptions) + assert 2 == Enum.count(topic2.subscriptions) + + %{ + topics: [topic1, topic2], + subscriptions: [sub1_topic1, sub1_topic2, sub2_topic1, sub2_topic2] + } + end +end diff --git a/test/pleroma/feed/rsscloud_test.exs b/test/pleroma/feed/rsscloud_test.exs new file mode 100644 index 000000000..85401c32f --- /dev/null +++ b/test/pleroma/feed/rsscloud_test.exs @@ -0,0 +1,479 @@ +defmodule Pleroma.Feed.RSSCloudTest do + use Pleroma.DataCase + use Oban.Testing, repo: Pleroma.Repo + + @subscription_lease_seconds 90_000 + @content_type_text_plain [{"content-type", "text/plain"}] + @html_body """ + + + + + blah + + +

I'm the content

+ + + """ + @text_body "Hello world" + @json_body %{"hello" => "world"} + @xml_body """ + + + + Scripting News + http://scripting.com/ + It's even worse than it appears.. + Wed, 14 Dec 2022 16:36:13 GMT + Wed, 14 Dec 2022 17:54:45 GMT + + The idea of textcasting is like podcasting. Wed, 14 Dec 2022 13:44:21 GMT + http://scripting.com/2022/12/14.html#a134421 + http://scripting.com/2022/12/14.html#a134421 + + + + """ + + @moduledoc """ + Implements the tests described by https://websub.rocks/hub + """ + + alias Pleroma.Feed.Updates + alias Pleroma.Feed.Subscriptions + + describe "100 - Typical subscriber request" do + @doc """ + This subscriber will include only the parameters hub.mode, hub.topic and hub.callback. The hub should deliver notifications with no signature. + """ + + setup :setup_html_publisher + + test "100 - Typical subscriber request", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + assert {:ok, subscription} = + Subscriptions.subscribe( + :rsscloud, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + assert {:ok, update} = Updates.publish(topic_url) + assert update.content_type == "text/html; charset=UTF-8" + + assert_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + subscription_api: "rsscloud", + callback_url: callback_url, + secret: nil + } + ) + + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates) + + assert Tesla.Mock.Agent.hits(:publisher) == 1 + assert Tesla.Mock.Agent.hits(:subscriber) == 2 + + [_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber) + assert publish.body == "url=" <> URI.encode_www_form(topic_url) + + assert Tesla.get_header(publish, "content-type") == + "application/x-www-form-urlencoded" + end + + test "does not get publish if already unsubscribed", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + assert {:ok, subscription} = + Subscriptions.subscribe( + :rsscloud, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + {:ok, _} = Subscriptions.unsubscribe(topic_url, callback_url) + + # Quick sleep + :timer.sleep(1000) + + assert {:ok, update} = Updates.publish(topic_url) + + refute_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + callback_url: callback_url, + secret: nil + } + ) + + assert Tesla.Mock.Agent.hits(:publisher) == 1 + + # Note that :rsscloud does not notify on unsubscription + assert Tesla.Mock.Agent.hits(:subscriber) == 1 + end + end + + describe "102 - Subscriber sends additional parameters" do + @doc """ + This subscriber will include some additional parameters in the request, which must be ignored by the hub if the hub doesn't recognize them. + """ + test "102 - Subscriber sends additional parameters", %{} do + end + end + + @doc """ + This subscriber tests whether the hub allows subscriptions to be re-subscribed before they expire. The hub must allow a subscription to be re-activated, and must update the previous subscription based on the topic+callback pair, rather than creating a new subscription. + """ + test "103 - Subscriber re-subscribes before the subscription expires", %{} do + end + + @doc """ + This test will first subscribe to a topic, and will then send an unsubscription request. You will be able to test that the unsubscription is confirmed by seeing that a notification is not received when a new post is published. + """ + test "104 - Unsubscribe request", %{} do + end + + describe "105 - Plaintext content" do + @doc """ + This test will check whether your hub can handle delivering content that is not HTML or XML. The content at the topic URL of this test is plaintext. + """ + + setup :setup_text_publisher + + test "105 - Plaintext content", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + assert {:ok, subscription} = + Subscriptions.subscribe( + :rsscloud, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + assert {:ok, update} = Updates.publish(topic_url) + assert update.content_type == "text/plain" + + assert_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + subscription_api: "rsscloud", + callback_url: callback_url, + secret: nil + } + ) + + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates) + + assert Tesla.Mock.Agent.hits(:publisher) == 1 + assert Tesla.Mock.Agent.hits(:subscriber) == 2 + + [_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber) + assert publish.body == "url=" <> URI.encode_www_form(topic_url) + + assert Tesla.get_header(publish, "content-type") == + "application/x-www-form-urlencoded" + end + end + + describe "106 - JSON content" do + @doc """ + This test will check whether your hub can handle delivering content that is not HTML or XML. The content at the topic URL of this test is JSON. + """ + + setup :setup_json_publisher + + test "106 - JSON content", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + assert {:ok, subscription} = + Subscriptions.subscribe( + :rsscloud, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + assert {:ok, update} = Updates.publish(topic_url) + assert update.content_type == "application/json" + + assert_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + subscription_api: "rsscloud", + callback_url: callback_url, + secret: nil + } + ) + + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates) + + assert Tesla.Mock.Agent.hits(:publisher) == 1 + assert Tesla.Mock.Agent.hits(:subscriber) == 2 + + [_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber) + assert publish.body == "url=" <> URI.encode_www_form(topic_url) + + assert Tesla.get_header(publish, "content-type") == + "application/x-www-form-urlencoded" + end + end + + describe "XML content" do + @doc """ + This test will check whether your hub can handle delivering content that is not HTML or XML. The content at the topic URL of this test is JSON. + """ + + setup :setup_xml_publisher + + test "XML content", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + assert {:ok, subscription} = + Subscriptions.subscribe( + :rsscloud, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + assert {:ok, update} = Updates.publish(topic_url) + assert update.content_type == "application/rss+xml" + + assert_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + subscription_api: "rsscloud", + callback_url: callback_url, + secret: nil + } + ) + + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates) + + assert Tesla.Mock.Agent.hits(:publisher) == 1 + assert Tesla.Mock.Agent.hits(:subscriber) == 2 + + [_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber) + assert publish.body == "url=" <> URI.encode_www_form(topic_url) + + assert Tesla.get_header(publish, "content-type") == + "application/x-www-form-urlencoded" + end + end + + describe "pruning" do + @doc """ + This test will check whether we can prune expired subscriptions. + """ + + setup :setup_xml_publisher + + test "after typical subscriber request", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + assert {:ok, subscription} = + Subscriptions.subscribe( + :rsscloud, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + assert {:ok, update} = Updates.publish(topic_url) + assert update.content_type == "application/rss+xml" + + assert_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + subscription_api: "rsscloud", + callback_url: callback_url, + secret: nil + } + ) + + expiring = + Subscriptions.from_now(1_000_000) + |> DateTime.from_naive!("Etc/UTC") + |> DateTime.to_iso8601() + + Pleroma.Workers.PruneFeedSubscriptionsWorker.new(%{expiring: expiring}) + |> Oban.insert() + + assert_enqueued( + worker: Pleroma.Workers.PruneFeedSubscriptionsWorker, + args: %{ + expiring: expiring + } + ) + + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :prune_feed_subscriptions) + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates) + end + end + + def setup_html_publisher(_) do + publisher_url = "http://localhost/publisher/posts" + subscriber_url = "http://localhost/subscriber/callback" + + Tesla.Mock.mock(fn + %{url: ^publisher_url} = req -> + Tesla.Mock.Agent.add_hit(:publisher, req) + + %Tesla.Env{ + status: 200, + body: @html_body, + headers: [ + {"content-type", "text/html; charset=UTF-8"} + ] + } + + %{url: ^subscriber_url} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + + %Tesla.Env{ + status: 200, + body: "ok", + headers: @content_type_text_plain + } + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: @content_type_text_plain + } + end) + + [publisher_url: publisher_url, subscriber_url: subscriber_url] + end + + def setup_text_publisher(_) do + publisher_url = "http://localhost/publisher/posts" + subscriber_url = "http://localhost/subscriber/callback" + + Tesla.Mock.mock(fn + %{url: ^publisher_url} = req -> + Tesla.Mock.Agent.add_hit(:publisher, req) + + %Tesla.Env{ + status: 200, + body: @text_body, + headers: @content_type_text_plain + } + + %{url: ^subscriber_url} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + + %Tesla.Env{ + status: 200, + body: "ok", + headers: @content_type_text_plain + } + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: @content_type_text_plain + } + end) + + [publisher_url: publisher_url, subscriber_url: subscriber_url] + end + + def setup_json_publisher(_) do + publisher_url = "http://localhost/publisher/posts" + subscriber_url = "http://localhost/subscriber/callback" + + Tesla.Mock.mock(fn + %{url: ^publisher_url} = req -> + Tesla.Mock.Agent.add_hit(:publisher, req) + + %Tesla.Env{ + status: 200, + body: Jason.encode!(@json_body), + headers: [ + {"content-type", "application/json"} + ] + } + + %{url: ^subscriber_url} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + + %Tesla.Env{ + status: 200, + body: "ok", + headers: @content_type_text_plain + } + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: @content_type_text_plain + } + end) + + [publisher_url: publisher_url, subscriber_url: subscriber_url] + end + + def setup_xml_publisher(_) do + publisher_url = "http://localhost/publisher/posts" + subscriber_url = "http://localhost/subscriber/callback" + + Tesla.Mock.mock(fn + %{url: ^publisher_url} = req -> + Tesla.Mock.Agent.add_hit(:publisher, req) + + %Tesla.Env{ + status: 200, + body: @xml_body, + headers: [ + {"content-type", "application/rss+xml"} + ] + } + + %{url: ^subscriber_url} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + + %Tesla.Env{ + status: 200, + body: "ok", + headers: @content_type_text_plain + } + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: @content_type_text_plain + } + end) + + [publisher_url: publisher_url, subscriber_url: subscriber_url] + end +end diff --git a/test/pleroma/feed/subscriptions_test.exs b/test/pleroma/feed/subscriptions_test.exs new file mode 100644 index 000000000..2602dfe7d --- /dev/null +++ b/test/pleroma/feed/subscriptions_test.exs @@ -0,0 +1,3 @@ +defmodule Pleroma.Feed.SubscriptionsTest do + use Pleroma.DataCase +end diff --git a/test/pleroma/feed/updates_test.exs b/test/pleroma/feed/updates_test.exs new file mode 100644 index 000000000..329729586 --- /dev/null +++ b/test/pleroma/feed/updates_test.exs @@ -0,0 +1,80 @@ +defmodule Pleroma.Feed.UpdatesTest do + use Pleroma.DataCase + use Oban.Testing, repo: Pleroma.Repo + + alias Pleroma.Feed.Subscriptions + alias Pleroma.Feed.Updates + + @subscription_lease_seconds 5 + @content_type_text_plain [{"content-type", "text/plain"}] + @html_body """ + + + + + blah + + +

I'm the content

+ + + """ + + describe "updates" do + test "publishing update dispatches jobs" do + topic_url = "https://localhost/publisher/topic/123" + callback_url = "http://localhost/subscriber/callback" + + Tesla.Mock.mock(fn + %{url: ^topic_url} = req -> + Tesla.Mock.Agent.add_hit(:publisher, req) + + %Tesla.Env{ + status: 200, + body: @html_body, + headers: [ + {"content-type", "text/html; charset=UTF-8"} + ] + } + + %{method: :get, url: ^callback_url, query: query} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + + query = Map.new(query) + + if Map.has_key?(query, "hub.challenge") do + %Tesla.Env{ + status: 200, + body: Map.get(query, "hub.challenge"), + headers: @content_type_text_plain + } + else + %Tesla.Env{status: 400, body: "no challenge", headers: @content_type_text_plain} + end + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: @content_type_text_plain + } + end) + + assert {:ok, _} = + Subscriptions.subscribe( + :websub, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + assert {:ok, update} = Updates.publish(topic_url) + + assert Tesla.Mock.Agent.hits(:subscriber) == 1 + + assert [job] = all_enqueued(worker: Pleroma.Workers.DispatchFeedUpdateWorker) + assert job.args["update_id"] == update.id + assert job.args["callback_url"] == callback_url + end + end +end diff --git a/test/pleroma/feed/websub_test.exs b/test/pleroma/feed/websub_test.exs new file mode 100644 index 000000000..c90fbec06 --- /dev/null +++ b/test/pleroma/feed/websub_test.exs @@ -0,0 +1,472 @@ +defmodule Pleroma.Feed.WebSubTest do + use Pleroma.DataCase + use Oban.Testing, repo: Pleroma.Repo + + # WebSub subcriptions expire after 10 days + @subscription_lease_seconds 864_000 + @content_type_text_plain [{"content-type", "text/plain"}] + @html_body """ + + + + + blah + + +

I'm the content

+ + + """ + @text_body "Hello world" + @json_body %{"hello" => "world"} + @xml_body """ + + + + Scripting News + http://scripting.com/ + It's even worse than it appears.. + Wed, 14 Dec 2022 16:36:13 GMT + Wed, 14 Dec 2022 17:54:45 GMT + + The idea of textcasting is like podcasting. Wed, 14 Dec 2022 13:44:21 GMT + http://scripting.com/2022/12/14.html#a134421 + http://scripting.com/2022/12/14.html#a134421 + + + + """ + + @moduledoc """ + Implements the tests described by https://websub.rocks/hub + """ + + alias Pleroma.Feed.Updates + alias Pleroma.Feed.Subscriptions + + describe "100 - Typical subscriber request" do + @doc """ + This subscriber will include only the parameters hub.mode, hub.topic and hub.callback. The hub should deliver notifications with no signature. + """ + + setup :setup_html_publisher + + test "100 - Typical subscriber request", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + assert {:ok, subscription} = + Subscriptions.subscribe( + :websub, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + assert {:ok, update} = Updates.publish(topic_url) + + assert_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + subscription_api: "websub", + callback_url: callback_url, + secret: nil + } + ) + + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates) + + assert Tesla.Mock.Agent.hits(:publisher) == 1 + assert Tesla.Mock.Agent.hits(:subscriber) == 2 + [_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber) + assert publish.body == @html_body + end + + test "does not get publish if already unsubscribed", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + assert {:ok, subscription} = + Subscriptions.subscribe( + :websub, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + {:ok, _} = Subscriptions.unsubscribe(topic_url, callback_url) + + # Quick sleep + :timer.sleep(1000) + + assert {:ok, update} = Updates.publish(topic_url) + + refute_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + callback_url: callback_url, + secret: nil + } + ) + + assert Tesla.Mock.Agent.hits(:publisher) == 1 + assert Tesla.Mock.Agent.hits(:subscriber) == 2 + end + end + + describe "101 - Subscriber includes a secret" do + @doc """ + This subscriber will include the parameters hub.mode, hub.topic, hub.callback and hub.secret. The hub should deliver notifications with a signature computed using this secret. + """ + + setup :setup_html_publisher + + test "101 - Subscriber includes a secret", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + {:ok, subscription} = + Subscriptions.subscribe(:websub, topic_url, callback_url, @subscription_lease_seconds, + secret: "some_secret" + ) + + assert {:ok, update} = Updates.publish(topic_url) + + assert_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + subscription_api: "websub", + callback_url: callback_url, + secret: "some_secret" + } + ) + + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates) + + assert Tesla.Mock.Agent.hits(:publisher) == 1 + assert Tesla.Mock.Agent.hits(:subscriber) == 2 + + [_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber) + assert publish.body == @html_body + + assert Tesla.get_header(publish, "x-hub-signature") == + "sha256=9d63c6c06dca350aaa6955f9e4017b801fc56b4a904f2e4dab68652b6abfda4c" + end + end + + describe "102 - Subscriber sends additional parameters" do + @doc """ + This subscriber will include some additional parameters in the request, which must be ignored by the hub if the hub doesn't recognize them. + """ + test "102 - Subscriber sends additional parameters", %{} do + end + end + + @doc """ + This subscriber tests whether the hub allows subscriptions to be re-subscribed before they expire. The hub must allow a subscription to be re-activated, and must update the previous subscription based on the topic+callback pair, rather than creating a new subscription. + """ + test "103 - Subscriber re-subscribes before the subscription expires", %{} do + end + + @doc """ + This test will first subscribe to a topic, and will then send an unsubscription request. You will be able to test that the unsubscription is confirmed by seeing that a notification is not received when a new post is published. + """ + test "104 - Unsubscribe request", %{} do + end + + describe "105 - Plaintext content" do + @doc """ + This test will check whether your hub can handle delivering content that is not HTML or XML. The content at the topic URL of this test is plaintext. + """ + + setup :setup_text_publisher + + test "105 - Plaintext content", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + assert {:ok, subscription} = + Subscriptions.subscribe( + :websub, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + assert {:ok, update} = Updates.publish(topic_url) + + assert_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + subscription_api: "websub", + callback_url: callback_url, + secret: nil + } + ) + + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates) + + assert Tesla.Mock.Agent.hits(:publisher) == 1 + assert Tesla.Mock.Agent.hits(:subscriber) == 2 + + [_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber) + assert publish.body == @text_body + assert Tesla.get_header(publish, "content-type") == "text/plain" + + assert Tesla.get_header(publish, "link") == + "<#{topic_url}>; rel=self, ; rel=hub" + end + end + + describe "106 - JSON content" do + @doc """ + This test will check whether your hub can handle delivering content that is not HTML or XML. The content at the topic URL of this test is JSON. + """ + + setup :setup_json_publisher + + test "106 - JSON content", %{ + subscriber_url: callback_url, + publisher_url: topic_url + } do + assert {:ok, subscription} = + Subscriptions.subscribe( + :websub, + topic_url, + callback_url, + @subscription_lease_seconds + ) + + assert {:ok, update} = Updates.publish(topic_url) + + assert_enqueued( + worker: Pleroma.Workers.DispatchFeedUpdateWorker, + args: %{ + update_id: update.id, + subscription_id: subscription.id, + subscription_api: "websub", + callback_url: callback_url, + secret: nil + } + ) + + assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates) + + assert Tesla.Mock.Agent.hits(:publisher) == 1 + assert Tesla.Mock.Agent.hits(:subscriber) == 2 + + [_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber) + assert Jason.decode!(publish.body) == @json_body + assert Tesla.get_header(publish, "content-type") == "application/json" + + assert Tesla.get_header(publish, "link") == + "<#{topic_url}>; rel=self, ; rel=hub" + end + end + + def setup_html_publisher(_) do + publisher_url = "http://localhost/publisher/posts" + subscriber_url = "http://localhost/subscriber/callback" + + Tesla.Mock.mock(fn + %{url: ^publisher_url} = req -> + Tesla.Mock.Agent.add_hit(:publisher, req) + + %Tesla.Env{ + status: 200, + body: @html_body, + headers: [ + {"content-type", "text/html; charset=UTF-8"} + ] + } + + %{url: ^subscriber_url, method: :get, query: query} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + query = Map.new(query) + + if Map.has_key?(query, "hub.challenge") do + %Tesla.Env{ + status: 200, + body: Map.get(query, "hub.challenge"), + headers: @content_type_text_plain + } + else + %Tesla.Env{status: 400, body: "no challenge", headers: @content_type_text_plain} + end + + %{url: ^subscriber_url, method: :post} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + + %Tesla.Env{ + status: 200, + body: "ok", + headers: @content_type_text_plain + } + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: [{"content-type", "text/plain"}] + } + end) + + [publisher_url: publisher_url, subscriber_url: subscriber_url] + end + + def setup_text_publisher(_) do + publisher_url = "http://localhost/publisher/posts" + subscriber_url = "http://localhost/subscriber/callback" + + Tesla.Mock.mock(fn + %{url: ^publisher_url} = req -> + Tesla.Mock.Agent.add_hit(:publisher, req) + + %Tesla.Env{ + status: 200, + body: @text_body, + headers: @content_type_text_plain + } + + %{url: ^subscriber_url, method: :get, query: query} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + query = Map.new(query) + + if Map.has_key?(query, "hub.challenge") do + %Tesla.Env{ + status: 200, + body: Map.get(query, "hub.challenge"), + headers: @content_type_text_plain + } + else + %Tesla.Env{status: 400, body: "no challenge", headers: @content_type_text_plain} + end + + %{url: ^subscriber_url, method: :post} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + + %Tesla.Env{ + status: 200, + body: "ok", + headers: @content_type_text_plain + } + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: [{"content-type", "text/plain"}] + } + end) + + [publisher_url: publisher_url, subscriber_url: subscriber_url] + end + + def setup_json_publisher(_) do + publisher_url = "http://localhost/publisher/posts" + subscriber_url = "http://localhost/subscriber/callback" + + Tesla.Mock.mock(fn + %{url: ^publisher_url} = req -> + Tesla.Mock.Agent.add_hit(:publisher, req) + + %Tesla.Env{ + status: 200, + body: Jason.encode!(@json_body), + headers: [ + {"content-type", "application/json"} + ] + } + + %{url: ^subscriber_url, method: :get, query: query} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + query = Map.new(query) + + if Map.has_key?(query, "hub.challenge") do + %Tesla.Env{ + status: 200, + body: Map.get(query, "hub.challenge"), + headers: @content_type_text_plain + } + else + %Tesla.Env{status: 400, body: "no challenge", headers: @content_type_text_plain} + end + + %{url: ^subscriber_url, method: :post} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + + %Tesla.Env{ + status: 200, + body: "ok", + headers: @content_type_text_plain + } + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: [{"content-type", "text/plain"}] + } + end) + + [publisher_url: publisher_url, subscriber_url: subscriber_url] + end + + def setup_xml_publisher(_) do + publisher_url = "http://localhost/publisher/posts" + subscriber_url = "http://localhost/subscriber/callback" + + Tesla.Mock.mock(fn + %{url: ^publisher_url} = req -> + Tesla.Mock.Agent.add_hit(:publisher, req) + + %Tesla.Env{ + status: 200, + body: @xml_body, + headers: [ + {"content-type", "application/rss+xml"} + ] + } + + %{url: ^subscriber_url, method: :get, query: query} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + query = Map.new(query) + + if Map.has_key?(query, "hub.challenge") do + %Tesla.Env{ + status: 200, + body: Map.get(query, "hub.challenge"), + headers: @content_type_text_plain + } + else + %Tesla.Env{status: 400, body: "no challenge", headers: @content_type_text_plain} + end + + %{url: ^subscriber_url, method: :post} = req -> + Tesla.Mock.Agent.add_hit(:subscriber, req) + + %Tesla.Env{ + status: 200, + body: "ok", + headers: @content_type_text_plain + } + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: [{"content-type", "text/plain"}] + } + end) + + [publisher_url: publisher_url, subscriber_url: subscriber_url] + end +end diff --git a/test/pleroma/web/feed/rsscloud_controller_test.exs b/test/pleroma/web/feed/rsscloud_controller_test.exs new file mode 100644 index 000000000..e196a2efa --- /dev/null +++ b/test/pleroma/web/feed/rsscloud_controller_test.exs @@ -0,0 +1,118 @@ +defmodule Pleroma.Feed.RSSCloudControllerTest do + use Pleroma.Web.ConnCase + + setup do + [subscriber_port: Enum.random(7000..8000)] + end + + test "subscribing to a specific topic with diff_domain = true", %{ + conn: conn, + subscriber_port: subscriber_port + } do + callback_url = "http://localhost:#{subscriber_port}/callback" + + Tesla.Mock.mock(fn + %{method: :get, url: ^callback_url, query: query} -> + query = Map.new(query) + + if Map.has_key?(query, "challenge") && Map.has_key?(query, "url") do + %Tesla.Env{ + status: 200, + body: Map.get(query, "challenge"), + headers: [{"content-type", "text/plain"}] + } + else + %Tesla.Env{status: 400, body: "no challenge", headers: [{"content-type", "text/plain"}]} + end + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: [{"content-type", "text/plain"}] + } + end) + + params = %{ + "protocol" => "http-rest", + "domain" => "localhost", + "port" => "#{subscriber_port}", + "path" => "/callback", + "notifyProcedure" => "", + "url1" => "http://localhost:1234/topic" + } + + conn = form_post(conn, "/rsscloud/pleaseNotify", params) + + assert response(conn, 200) =~ "" + end + + test "subscribing to a specific topic with diff_domain = false", %{ + conn: conn, + subscriber_port: subscriber_port + } do + callback_url = "http://localhost:#{subscriber_port}/callback" + + Tesla.Mock.mock(fn + %{method: :post, url: ^callback_url} -> + %Tesla.Env{status: 200, body: "ok", headers: [{"content-type", "text/plain"}]} + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: [{"content-type", "text/plain"}] + } + end) + + params = %{ + "protocol" => "http-rest", + "port" => "#{subscriber_port}", + "path" => "/callback", + "notifyProcedure" => "", + "url1" => "http://localhost:1234/topic" + } + + conn = form_post(conn, "/rsscloud/pleaseNotify", params) + + assert response(conn, 200) =~ "" + end + + test "subscribing with an invalid response", %{ + conn: conn, + subscriber_port: subscriber_port + } do + callback_url = "http://localhost:#{subscriber_port}/callback" + + Tesla.Mock.mock(fn + %{method: :get, url: ^callback_url} -> + %Tesla.Env{status: 200, body: "wrong answer", headers: [{"content-type", "text/plain"}]} + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: [{"content-type", "text/plain"}] + } + end) + + params = %{ + "protocol" => "http-rest", + "domain" => "localhost", + "port" => "#{subscriber_port}", + "path" => "/callback", + "notifyProcedure" => "", + "url1" => "http://localhost:1234/topic" + } + + conn = form_post(conn, "/rsscloud/pleaseNotify", params) + + assert response(conn, 403) =~ "failed_challenge_body" + end + + defp form_post(conn, path, params) do + conn + |> Plug.Conn.put_req_header("content-type", "application/x-www-form-urlencoded") + |> post(path, params) + end +end diff --git a/test/pleroma/web/feed/websub_controller_test.exs b/test/pleroma/web/feed/websub_controller_test.exs new file mode 100644 index 000000000..170de25b7 --- /dev/null +++ b/test/pleroma/web/feed/websub_controller_test.exs @@ -0,0 +1,81 @@ +defmodule Pleroma.Web.WebSubControllerTest do + use Pleroma.Web.ConnCase + + setup do + [subscriber_url: "http://localhost/subscriber/callback"] + end + + test "subscribing to a specific topic", %{ + conn: conn, + subscriber_url: callback_url + } do + Tesla.Mock.mock(fn + %{method: :get, url: ^callback_url, query: query} -> + query = Map.new(query) + + if Map.has_key?(query, "hub.challenge") do + %Tesla.Env{ + status: 200, + body: Map.get(query, "hub.challenge"), + headers: [{"content-type", "text/plain"}] + } + else + %Tesla.Env{status: 400, body: "no challenge", headers: [{"content-type", "text/plain"}]} + end + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: [{"content-type", "text/plain"}] + } + end) + + params = %{ + "hub.mode" => "subscribe", + "hub.topic" => "http://localhost:1234/topic", + "hub.callback" => callback_url + } + + conn = form_post(conn, "/hub", params) + + assert response(conn, 202) =~ "" + end + + test "subscribing with an invalid response", %{ + conn: conn, + subscriber_url: callback_url + } do + Tesla.Mock.mock(fn + %{method: :get, url: ^callback_url} -> + %Tesla.Env{ + status: 200, + body: "whut?", + headers: [{"content-type", "text/plain"}] + } + + _not_matched -> + %Tesla.Env{ + status: 404, + body: "not found", + headers: [{"content-type", "text/plain"}] + } + end) + + params = %{ + "hub.mode" => "subscribe", + "hub.topic" => "http://localhost:1234/topic", + "hub.callback" => callback_url + } + + conn = form_post(conn, "/hub", params) + + assert response(conn, 403) =~ "failed_challenge_body" + end + + defp form_post(conn, path, params) do + conn + |> Plug.Conn.put_req_header("content-type", "application/x-www-form-urlencoded") + |> post(path, params) + end +end diff --git a/test/support/conn_case.ex b/test/support/conn_case.ex index eab469833..60e737056 100644 --- a/test/support/conn_case.ex +++ b/test/support/conn_case.ex @@ -120,6 +120,9 @@ defmodule Pleroma.Web.ConnCase do Mox.verify_on_exit!() + Tesla.Mock.Agent.init() + on_exit(fn -> Tesla.Mock.Agent.stop() end) + {:ok, conn: Phoenix.ConnTest.build_conn()} end end diff --git a/test/support/data_case.ex b/test/support/data_case.ex index 0ee2aa4a2..f59edc6a5 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -100,6 +100,9 @@ defmodule Pleroma.DataCase do Mox.verify_on_exit!() + Tesla.Mock.Agent.init() + on_exit(fn -> Tesla.Mock.Agent.stop() end) + :ok end diff --git a/test/support/tesla_mock_agent.ex b/test/support/tesla_mock_agent.ex new file mode 100644 index 000000000..61f7f8c66 --- /dev/null +++ b/test/support/tesla_mock_agent.ex @@ -0,0 +1,36 @@ +defmodule Tesla.Mock.Agent do + @moduledoc false + + # require Logger + + def init() do + Agent.start_link(fn -> [] end, name: __MODULE__) + end + + def stop() do + # Agent.stop(__MODULE__) + end + + def add_hit(key, env) do + # if Enum.member?([:post, :put], env.method) do + # Logger.error("#{key} #{env.method} #{env.url} #{inspect(env.body)}") + # else + # Logger.error("#{key} #{env.method} #{env.url} #{inspect(env.query)}") + # end + + Agent.update(__MODULE__, fn state -> [{key, env} | state] end) + end + + def access_list(key) do + Agent.get(__MODULE__, fn state -> + Enum.filter(state, fn {k, _v} -> k == key end) + |> Enum.map(fn {_k, v} -> v end) + |> Enum.reverse() + end) + end + + def hits(key) do + access_list(key) + |> Enum.count() + end +end