forked from AkkomaGang/akkoma
Compare commits
2 Commits
8e5a88edf7
...
57ccae8dbf
Author | SHA1 | Date |
---|---|---|
Peter Zingg | 57ccae8dbf | |
Peter Zingg | 6eb87aba5a |
|
@ -0,0 +1,27 @@
|
|||
defmodule Pleroma.Feed.Headers do
|
||||
@behaviour Ecto.Type
|
||||
def type, do: :binary
|
||||
|
||||
# Provide our own casting rules.
|
||||
def cast(term) do
|
||||
{:ok, term}
|
||||
end
|
||||
|
||||
def embed_as(_format) do
|
||||
:self
|
||||
end
|
||||
|
||||
def equal?(a, b) do
|
||||
a == b
|
||||
end
|
||||
|
||||
# When loading data from the database, we are guaranteed to
|
||||
# receive an integer (as databases are strict) and we will
|
||||
# just return it to be stored in the schema struct.
|
||||
def load(binary), do: {:ok, :erlang.binary_to_term(binary)}
|
||||
|
||||
# When dumping data to the database, we *expect* an integer
|
||||
# but any value could be inserted into the struct, so we need
|
||||
# guard against them.
|
||||
def dump(term), do: {:ok, :erlang.term_to_binary(term)}
|
||||
end
|
|
@ -0,0 +1,47 @@
|
|||
defmodule Pleroma.Feed.Subscription do
|
||||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
|
||||
schema "feed_subscriptions" do
|
||||
belongs_to(:topic, Pleroma.Feed.Topic)
|
||||
has_many(:subscription_updates, Pleroma.Feed.SubscriptionUpdate)
|
||||
|
||||
field(:callback_url, :string)
|
||||
field(:lease_seconds, :float)
|
||||
field(:expires_at, :naive_datetime)
|
||||
field(:secret, :string)
|
||||
field(:diff_domain, :boolean)
|
||||
field(:api, Ecto.Enum, values: [:websub, :rsscloud])
|
||||
|
||||
timestamps()
|
||||
end
|
||||
|
||||
@doc false
|
||||
def changeset(subscription, attrs) do
|
||||
subscription
|
||||
|> cast(attrs, [
|
||||
:topic_id,
|
||||
:api,
|
||||
:callback_url,
|
||||
:lease_seconds,
|
||||
:expires_at,
|
||||
:secret,
|
||||
:diff_domain
|
||||
])
|
||||
|> validate_required([:api, :callback_url, :lease_seconds, :expires_at])
|
||||
|> validate_method()
|
||||
|> foreign_key_constraint(:topic_id)
|
||||
|> unique_constraint([:api, :topic_id, :callback_url])
|
||||
end
|
||||
|
||||
defp validate_method(changeset) do
|
||||
api = get_field(changeset, :api)
|
||||
|
||||
case api do
|
||||
:websub -> changeset
|
||||
:rsscloud -> validate_required(changeset, :diff_domain)
|
||||
nil -> add_error(changeset, :api, "is required")
|
||||
other -> add_error(changeset, :api, "is invalid: #{other}")
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,23 @@
|
|||
defmodule Pleroma.Feed.SubscriptionUpdate do
|
||||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
|
||||
schema "feed_subscription_updates" do
|
||||
belongs_to(:update, Pleroma.Feed.Update)
|
||||
belongs_to(:subscription, Pleroma.Feed.Subscription)
|
||||
|
||||
field(:pushed_at, :naive_datetime)
|
||||
field(:status_code, :integer)
|
||||
|
||||
timestamps()
|
||||
end
|
||||
|
||||
@doc false
|
||||
def changeset(topic, attrs) do
|
||||
topic
|
||||
|> cast(attrs, [:update_id, :subscription_id, :pushed_at, :status_code])
|
||||
|> validate_required([:pushed_at, :status_code])
|
||||
|> foreign_key_constraint(:update_id)
|
||||
|> foreign_key_constraint(:subscription_id)
|
||||
end
|
||||
end
|
|
@ -0,0 +1,482 @@
|
|||
defmodule Pleroma.Feed.Subscriptions do
|
||||
@moduledoc """
|
||||
The Subscriptions context.
|
||||
"""
|
||||
require Logger
|
||||
|
||||
import Ecto.Query, warn: false
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.HTTP
|
||||
|
||||
alias Pleroma.Feed.Subscription
|
||||
alias Pleroma.Feed.Topic
|
||||
|
||||
def subscribe(api, topic_url, callback_url, subscription_lease_seconds, opts \\ []) do
|
||||
with {:ok, _} <- validate_url(topic_url),
|
||||
{:ok, _callback_uri} <- validate_url(callback_url),
|
||||
{:ok, topic} <- find_or_create_topic(topic_url),
|
||||
:ok <- validate_subscription(api, topic, callback_url, subscription_lease_seconds, opts) do
|
||||
find_or_create_subscription(api, topic, callback_url, subscription_lease_seconds, opts)
|
||||
else
|
||||
{:subscribe_validation_error, reason} ->
|
||||
# WebSub must notify callback on failure. Ignore return value.
|
||||
# RSSCloud just returns an error to the caller.
|
||||
_ = deny_subscription(api, callback_url, topic_url, reason)
|
||||
{:error, reason}
|
||||
|
||||
{:error, %Ecto.Changeset{} = changeset} ->
|
||||
Logger.error("subscribe data error: #{inspect(changeset.errors)}")
|
||||
{:error, "data error"}
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
def unsubscribe(topic_url, callback_url) do
|
||||
with {:ok, _} <- validate_url(topic_url),
|
||||
{:ok, callback_uri} <- validate_url(callback_url),
|
||||
%Topic{} = topic <- get_topic_by_url(topic_url),
|
||||
%Subscription{api: api} = subscription <-
|
||||
Repo.get_by(Subscription, topic_id: topic.id, callback_url: callback_url) do
|
||||
if api == :websub do
|
||||
_ = validate_unsubscribe(topic, callback_uri)
|
||||
end
|
||||
|
||||
subscription
|
||||
|> Subscription.changeset(%{
|
||||
expires_at: NaiveDateTime.utc_now()
|
||||
})
|
||||
|> Repo.update()
|
||||
else
|
||||
_ -> {:error, :subscription_not_found}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
We callback on WebSub subscriptions just before deleting them.
|
||||
"""
|
||||
def final_unsubscribe(%Subscription{api: :websub} = subscription) do
|
||||
with {:ok, callback_uri} <- validate_url(subscription.callback_url) do
|
||||
validate_unsubscribe(subscription.topic, callback_uri)
|
||||
else
|
||||
_ ->
|
||||
{:unsubscribe_validation_error, "Subscription with improper callback_url"}
|
||||
end
|
||||
end
|
||||
|
||||
def final_unsubscribe(%Subscription{api: :rsscloud}), do: :ok
|
||||
|
||||
def get_topic_by_url(topic_url) do
|
||||
Repo.get_by(Topic, url: topic_url)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Find or create a topic.
|
||||
|
||||
Topics can exist without any valid subscriptions. Additionally a subscription can fail to validate and a topic still exist.
|
||||
|
||||
## Examples
|
||||
|
||||
iex> find_or_create_topic("https://some-topic-url")
|
||||
{:ok, %Topic{}}
|
||||
"""
|
||||
def find_or_create_topic(topic_url) do
|
||||
case Repo.get_by(Topic, url: topic_url) do
|
||||
%Topic{} = topic ->
|
||||
{:ok, topic}
|
||||
|
||||
nil ->
|
||||
%Topic{}
|
||||
|> Topic.changeset(%{
|
||||
url: topic_url,
|
||||
expires_at: ~N[2046-12-31 23:59:00]
|
||||
})
|
||||
|> Repo.insert()
|
||||
end
|
||||
end
|
||||
|
||||
def find_subscription_by_api_topic_and_url(api, %Topic{} = topic, callback_url) do
|
||||
Repo.get_by(Subscription, api: api, topic_id: topic.id, callback_url: callback_url)
|
||||
end
|
||||
|
||||
def find_or_create_subscription(
|
||||
api,
|
||||
%Topic{} = topic,
|
||||
callback_url,
|
||||
subscription_lease_seconds,
|
||||
opts
|
||||
) do
|
||||
# BACKPORT api: api
|
||||
lease_seconds = convert_lease_seconds(subscription_lease_seconds)
|
||||
|
||||
case Repo.get_by(Subscription, api: api, topic_id: topic.id, callback_url: callback_url) do
|
||||
%Subscription{} = subscription ->
|
||||
subscription
|
||||
|> Subscription.changeset(%{
|
||||
lease_seconds: lease_seconds,
|
||||
expires_at: from_now(lease_seconds),
|
||||
diff_domain: Keyword.get(opts, :diff_domain, false),
|
||||
secret: Keyword.get(opts, :secret)
|
||||
})
|
||||
|> Repo.update()
|
||||
|
||||
nil ->
|
||||
create_subscription(api, topic, callback_url, lease_seconds, opts)
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Validate a WebSub subscription by sending a HTTP GET to the subscriber's callback_url.
|
||||
Validate an RSSCloud subscription by sending a HTTP GET or POST to the subscriber's callback_url.
|
||||
"""
|
||||
def validate_subscription(
|
||||
:websub,
|
||||
%Topic{} = topic,
|
||||
callback_url,
|
||||
subscription_lease_seconds,
|
||||
_opts
|
||||
) do
|
||||
challenge = :crypto.strong_rand_bytes(32) |> Base.url_encode64() |> binary_part(0, 32)
|
||||
|
||||
query = [
|
||||
{"hub.mode", "subscribe"},
|
||||
{"hub.topic", topic.url},
|
||||
{"hub.challenge", challenge},
|
||||
{"hub.lease_seconds", to_string(subscription_lease_seconds)}
|
||||
]
|
||||
|
||||
case HTTP.get(callback_url, [], params: query) do
|
||||
{:ok, %Tesla.Env{status: code, body: body}} when code >= 200 and code < 300 ->
|
||||
# Ensure the response body matches our challenge
|
||||
if challenge != String.trim(body) do
|
||||
{:subscribe_validation_error, :failed_challenge_body}
|
||||
else
|
||||
:ok
|
||||
end
|
||||
|
||||
other ->
|
||||
handle_validation_errors(other)
|
||||
end
|
||||
end
|
||||
|
||||
def validate_subscription(
|
||||
:rsscloud,
|
||||
%Topic{} = topic,
|
||||
callback_url,
|
||||
_lease_seconds,
|
||||
opts
|
||||
) do
|
||||
diff_domain = Keyword.get(opts, :diff_domain, false)
|
||||
validate_rsscloud_subscription(topic, callback_url, diff_domain)
|
||||
end
|
||||
|
||||
def validate_rsscloud_subscription(topic, callback_url, true) do
|
||||
challenge = :crypto.strong_rand_bytes(32) |> Base.url_encode64() |> binary_part(0, 32)
|
||||
|
||||
query = [
|
||||
{"url", topic.url},
|
||||
{"challenge", challenge}
|
||||
]
|
||||
|
||||
case HTTP.get(callback_url, [], params: query) do
|
||||
{:ok, %Tesla.Env{status: code, body: body}} when code >= 200 and code < 300 ->
|
||||
# Ensure the response body contains our challenge
|
||||
if String.contains?(body, challenge) do
|
||||
:ok
|
||||
else
|
||||
{:subscribe_validation_error, :failed_challenge_body}
|
||||
end
|
||||
|
||||
other ->
|
||||
handle_validation_errors(other)
|
||||
end
|
||||
end
|
||||
|
||||
def validate_rsscloud_subscription(topic, callback_uri, false) do
|
||||
callback_url = to_string(callback_uri)
|
||||
body = %{url: topic.url} |> URI.encode_query()
|
||||
headers = [{"content-type", "application/x-www-form-urlencoded"}]
|
||||
|
||||
case HTTP.post(callback_url, body, headers) do
|
||||
{:ok, %Tesla.Env{status: code}} when code >= 200 and code < 300 ->
|
||||
:ok
|
||||
|
||||
other ->
|
||||
handle_validation_errors(other)
|
||||
end
|
||||
end
|
||||
|
||||
def handle_validation_errors({:ok, %Tesla.Env{status: 404}}) do
|
||||
{:subscribe_validation_error, :failed_404_response}
|
||||
end
|
||||
|
||||
def handle_validation_errors({:ok, %Tesla.Env{} = env}) do
|
||||
Logger.error("failed_unknown_response #{inspect(env)}")
|
||||
{:subscribe_validation_error, :failed_unknown_response}
|
||||
end
|
||||
|
||||
def handle_validation_errors({:error, :invalid_request}) do
|
||||
{:subscribe_validation_error, :invalid_request}
|
||||
end
|
||||
|
||||
def handle_validation_errors({:error, reason}) do
|
||||
Logger.error("Got unexpected error from validate subscription call: #{reason}")
|
||||
{:subscribe_validation_error, :failed_unknown_error}
|
||||
end
|
||||
|
||||
@doc """
|
||||
Validate a WebSub unsubscription by sending a HTTP GET to the subscriber's callback_url.
|
||||
"""
|
||||
def validate_unsubscribe(
|
||||
%Topic{} = topic,
|
||||
%URI{} = callback_uri
|
||||
) do
|
||||
challenge = :crypto.strong_rand_bytes(32) |> Base.url_encode64() |> binary_part(0, 32)
|
||||
|
||||
query = [
|
||||
{"hub.mode", "unsubscribe"},
|
||||
{"hub.topic", topic.url},
|
||||
{"hub.challenge", challenge}
|
||||
]
|
||||
|
||||
callback_url = to_string(callback_uri)
|
||||
|
||||
case HTTP.get(callback_url, [], params: query) do
|
||||
{:ok, %Tesla.Env{}} ->
|
||||
:ok
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.error("Got unexpected error from validate unsubscribe call: #{reason}")
|
||||
{:unsubscribe_validation_error, :failed_unknown_error}
|
||||
end
|
||||
end
|
||||
|
||||
def create_subscription(api, %Topic{} = topic, callback_url, subscription_lease_seconds, opts) do
|
||||
lease_seconds = convert_lease_seconds(subscription_lease_seconds)
|
||||
# BACKPORT
|
||||
%Subscription{
|
||||
topic_id: topic.id
|
||||
}
|
||||
|> Subscription.changeset(%{
|
||||
api: api,
|
||||
callback_url: callback_url,
|
||||
lease_seconds: lease_seconds,
|
||||
expires_at: from_now(lease_seconds),
|
||||
diff_domain: Keyword.get(opts, :diff_domain, false),
|
||||
secret: Keyword.get(opts, :secret)
|
||||
})
|
||||
|> Repo.insert()
|
||||
end
|
||||
|
||||
defp convert_lease_seconds(seconds) when is_integer(seconds), do: seconds
|
||||
|
||||
defp convert_lease_seconds(seconds) when is_binary(seconds) do
|
||||
case String.trim(seconds) |> Integer.parse() do
|
||||
{seconds, ""} ->
|
||||
seconds
|
||||
|
||||
_ ->
|
||||
Logger.error("Invalid lease value. not an integer: '#{seconds}'")
|
||||
0
|
||||
end
|
||||
end
|
||||
|
||||
def deny_subscription(:websub, callback_url, topic_url, reason) do
|
||||
# If (and when) the subscription is denied, the hub MUST inform the subscriber by sending an HTTP [RFC7231]
|
||||
# (or HTTPS [RFC2818]) GET request to the subscriber's callback URL as given in the subscription request. This request has the following query string arguments appended (format described in Section 4 of [URL]):
|
||||
with {:ok, callback_uri} <- validate_url(callback_url) do
|
||||
query = [
|
||||
{"hub.mode", "denied"},
|
||||
{"hub.topic", topic_url},
|
||||
{"hub.reason", reason_string(reason)}
|
||||
]
|
||||
|
||||
final_url = to_string(callback_uri)
|
||||
|
||||
# We don't especially care about a response on this one
|
||||
case HTTP.get(final_url, [], params: query) do
|
||||
{:ok, %Tesla.Env{}} ->
|
||||
:ok
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
else
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
def deny_subscription(:rsscloud, _callback_url, _topic_url, _reason), do: :ok
|
||||
|
||||
def reason_string(reason) when is_binary(reason), do: reason
|
||||
def reason_string(reason) when is_atom(reason), do: Atom.to_string(reason)
|
||||
def reason_string(reason), do: IO.inspect(reason)
|
||||
|
||||
def list_active_topic_subscriptions(%Topic{} = topic) do
|
||||
now = NaiveDateTime.utc_now()
|
||||
|
||||
from(s in Subscription,
|
||||
where: s.topic_id == ^topic.id and s.expires_at >= ^now
|
||||
)
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
def list_inactive_subscriptions(now) do
|
||||
from(s in Subscription,
|
||||
where: s.expires_at < ^now,
|
||||
join: t in assoc(s, :topic),
|
||||
preload: [topic: t]
|
||||
)
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
@spec delete_subscription(Subscription.t(), non_neg_integer(), NaiveDateTime.t() | nil) ::
|
||||
{non_neg_integer(), non_neg_integer(), [integer()]} | {:error, term()}
|
||||
def delete_subscription(subscription, topic_lease_seconds, now \\ nil) do
|
||||
now = now || NaiveDateTime.utc_now()
|
||||
|
||||
Repo.transaction(fn ->
|
||||
topic_id = subscription.topic_id
|
||||
|
||||
case Repo.delete(subscription) do
|
||||
{:ok, _} ->
|
||||
{n_topics, topic_ids} = update_topic_expirations([topic_id], topic_lease_seconds, now)
|
||||
{1, n_topics, topic_ids}
|
||||
|
||||
_ ->
|
||||
{0, 0, []}
|
||||
end
|
||||
end)
|
||||
|> case do
|
||||
{:ok, res} -> res
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
# BACKPORT
|
||||
@spec delete_all_inactive_subscriptions(non_neg_integer(), NaiveDateTime.t() | nil) ::
|
||||
{non_neg_integer(), non_neg_integer(), [integer()]} | {:error, term()}
|
||||
def delete_all_inactive_subscriptions(topic_lease_seconds, now \\ nil) do
|
||||
Repo.transaction(fn ->
|
||||
# Cascades to delete all SubscriptionUpdates as well
|
||||
{n_subs, topic_ids} =
|
||||
from(s in Subscription,
|
||||
select: s.topic_id,
|
||||
where: s.expires_at < ^now
|
||||
)
|
||||
|> Repo.delete_all()
|
||||
|
||||
# Update those topics who now don't have a subscription
|
||||
{n_topics, topic_ids} = update_topic_expirations(topic_ids, topic_lease_seconds, now)
|
||||
|
||||
{n_subs, n_topics, topic_ids}
|
||||
end)
|
||||
|> case do
|
||||
{:ok, res} -> res
|
||||
{:error, reason} -> {:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
def list_inactive_topics(now) do
|
||||
from(t in Topic,
|
||||
where:
|
||||
t.expires_at < ^now and
|
||||
fragment("NOT EXISTS (SELECT * FROM feed_subscriptions s WHERE s.topic_id = ?)", t.id)
|
||||
)
|
||||
|> Repo.all()
|
||||
end
|
||||
|
||||
@spec delete_all_inactive_topics(NaiveDateTime.t() | nil) :: {non_neg_integer(), [integer()]}
|
||||
def delete_all_inactive_topics(now) do
|
||||
now = now || NaiveDateTime.utc_now()
|
||||
|
||||
from(t in Topic,
|
||||
where:
|
||||
t.expires_at < ^now and
|
||||
fragment("NOT EXISTS (SELECT * FROM feed_subscriptions s WHERE s.topic_id = ?)", t.id)
|
||||
)
|
||||
|> Repo.delete_all()
|
||||
end
|
||||
|
||||
@spec update_topic_expirations([integer()], non_neg_integer(), NaiveDateTime.t() | nil) ::
|
||||
{non_neg_integer(), [integer()]}
|
||||
def update_topic_expirations(topic_ids, topic_lease_seconds, now \\ nil) do
|
||||
now = now || NaiveDateTime.utc_now()
|
||||
lease_seconds = convert_lease_seconds(topic_lease_seconds)
|
||||
expires_at = NaiveDateTime.add(now, lease_seconds, :second)
|
||||
|
||||
from(t in Topic,
|
||||
select: t.id,
|
||||
where:
|
||||
not exists(
|
||||
from(s in Subscription,
|
||||
where: s.topic_id in ^topic_ids
|
||||
)
|
||||
),
|
||||
update: [set: [updated_at: ^now, expires_at: ^expires_at]]
|
||||
)
|
||||
|> Repo.update_all([])
|
||||
end
|
||||
|
||||
def from_now(seconds) do
|
||||
NaiveDateTime.utc_now() |> NaiveDateTime.add(seconds, :second)
|
||||
end
|
||||
|
||||
defp validate_url(url) when is_binary(url) do
|
||||
case URI.new(url) do
|
||||
{:ok, uri} ->
|
||||
if uri.scheme in ["http", "https"] do
|
||||
{:ok, uri}
|
||||
else
|
||||
{:error, :url_not_http}
|
||||
end
|
||||
|
||||
err ->
|
||||
err
|
||||
end
|
||||
end
|
||||
|
||||
defp validate_url(_), do: {:error, :url_not_binary}
|
||||
|
||||
def count_topics do
|
||||
Repo.one(
|
||||
from(u in Topic,
|
||||
select: count(u.id)
|
||||
)
|
||||
)
|
||||
end
|
||||
|
||||
def count_active_subscriptions do
|
||||
now = NaiveDateTime.utc_now()
|
||||
|
||||
Repo.one(
|
||||
from(s in Subscription,
|
||||
where: s.expires_at >= ^now,
|
||||
select: count(s.id)
|
||||
)
|
||||
)
|
||||
end
|
||||
|
||||
def subscription_updates_chart do
|
||||
case Repo.query("""
|
||||
select date(pushed_at) as "date", count(*) as "count"
|
||||
from subscription_updates
|
||||
group by date(pushed_at)
|
||||
order by date(pushed_at) desc
|
||||
limit 30;
|
||||
""") do
|
||||
{:ok, %Postgrex.Result{rows: rows}} ->
|
||||
flipped = Enum.reverse(rows)
|
||||
|
||||
%{
|
||||
keys: Enum.map(flipped, fn [key, _] -> key end),
|
||||
values: Enum.map(flipped, fn [_, value] -> value end)
|
||||
}
|
||||
|
||||
_ ->
|
||||
%{keys: [], values: []}
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,23 @@
|
|||
defmodule Pleroma.Feed.Topic do
|
||||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
|
||||
schema "feed_topics" do
|
||||
# BACKPORT
|
||||
has_many(:subscriptions, Pleroma.Feed.Subscription)
|
||||
has_many(:updates, Pleroma.Feed.Update)
|
||||
|
||||
field(:url, :string)
|
||||
field(:expires_at, :naive_datetime)
|
||||
|
||||
timestamps()
|
||||
end
|
||||
|
||||
@doc false
|
||||
def changeset(topic, attrs) do
|
||||
topic
|
||||
|> cast(attrs, [:url, :expires_at])
|
||||
|> validate_required([:url])
|
||||
|> unique_constraint([:url])
|
||||
end
|
||||
end
|
|
@ -0,0 +1,25 @@
|
|||
defmodule Pleroma.Feed.Update do
|
||||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
|
||||
schema "feed_updates" do
|
||||
belongs_to(:topic, Pleroma.Feed.Topic)
|
||||
has_many(:subscription_updates, Pleroma.Feed.SubscriptionUpdate)
|
||||
|
||||
field(:body, :binary)
|
||||
field(:headers, Pleroma.Feed.Headers)
|
||||
field(:content_type, :string)
|
||||
field(:links, {:array, :string})
|
||||
field(:hash, :string)
|
||||
|
||||
timestamps()
|
||||
end
|
||||
|
||||
@doc false
|
||||
def changeset(topic, attrs) do
|
||||
topic
|
||||
|> cast(attrs, [:topic_id, :body, :headers, :content_type, :hash, :links])
|
||||
|> validate_required([:body, :content_type, :hash, :links])
|
||||
|> foreign_key_constraint(:topic_id)
|
||||
end
|
||||
end
|
|
@ -0,0 +1,133 @@
|
|||
defmodule Pleroma.Feed.Updates do
|
||||
@moduledoc """
|
||||
The Updates context.
|
||||
"""
|
||||
require Logger
|
||||
|
||||
import Ecto.Query, warn: false
|
||||
alias Pleroma.Repo
|
||||
alias Pleroma.HTTP
|
||||
|
||||
alias Pleroma.Feed.Subscriptions
|
||||
alias Pleroma.Feed.SubscriptionUpdate
|
||||
alias Pleroma.Feed.Topic
|
||||
alias Pleroma.Feed.Update
|
||||
|
||||
def publish(topic_url) do
|
||||
case Subscriptions.get_topic_by_url(topic_url) do
|
||||
# Get all active subscriptions and publish the update to them
|
||||
%Topic{} = topic ->
|
||||
case HTTP.get(topic.url, []) do
|
||||
{:ok, %Tesla.Env{status: code} = env}
|
||||
when code >= 200 and code < 300 ->
|
||||
with {:ok, update} <- create_update(topic, env) do
|
||||
# Realistically we should do all of this async, for now we'll do querying line and dispatch async
|
||||
subscribers = Subscriptions.list_active_topic_subscriptions(topic)
|
||||
|
||||
Enum.each(subscribers, fn subscription ->
|
||||
Logger.debug("Queueing dispatch to #{subscription.callback_url}")
|
||||
|
||||
Pleroma.Workers.DispatchFeedUpdateWorker.new(%{
|
||||
callback_url: subscription.callback_url,
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
subscription_api: subscription.api,
|
||||
secret: subscription.secret
|
||||
})
|
||||
|> Oban.insert()
|
||||
end)
|
||||
|
||||
Logger.info("Updates.publish: Sending updates for #{topic_url}")
|
||||
{:ok, update}
|
||||
else
|
||||
{:error, _changeset} ->
|
||||
Logger.error("Updates.publish: Could not create new record for #{topic.url}")
|
||||
{:error, "Error creating update record."}
|
||||
end
|
||||
|
||||
_ ->
|
||||
Logger.error("Updates.publish: Unsuccessful response code for #{topic.url}")
|
||||
{:error, "Publish URL did not return a successful status code."}
|
||||
end
|
||||
|
||||
nil ->
|
||||
# Nothing found
|
||||
Logger.error("Updates.publish: Did not find topic for #{topic_url}")
|
||||
{:error, "Topic not found for topic URL."}
|
||||
|
||||
err ->
|
||||
Logger.error("Updates.publish: Unknown error #{inspect(err)}")
|
||||
{:error, "Unknown error."}
|
||||
end
|
||||
end
|
||||
|
||||
def create_update(%Topic{} = topic, %Tesla.Env{body: body, headers: headers} = env)
|
||||
when is_binary(body) do
|
||||
content_type = Tesla.get_header(env, "content-type") || "application/octet-stream"
|
||||
|
||||
# BACKPORT
|
||||
%Update{
|
||||
topic_id: topic.id
|
||||
}
|
||||
|> Update.changeset(%{
|
||||
body: body,
|
||||
headers: headers,
|
||||
content_type: content_type,
|
||||
links: Tesla.get_headers(env, "link"),
|
||||
hash: :crypto.hash(:sha256, body) |> Base.encode16(case: :lower)
|
||||
})
|
||||
|> Repo.insert()
|
||||
end
|
||||
|
||||
@doc """
|
||||
Create a subscription update, uses ID's for quick insertion
|
||||
"""
|
||||
def create_subscription_update(update_id, subscription_id, status_code)
|
||||
when is_integer(update_id) and is_integer(subscription_id) do
|
||||
%SubscriptionUpdate{
|
||||
update_id: update_id,
|
||||
subscription_id: subscription_id
|
||||
}
|
||||
|> SubscriptionUpdate.changeset(%{
|
||||
pushed_at: NaiveDateTime.utc_now(),
|
||||
status_code: status_code
|
||||
})
|
||||
|> Repo.insert()
|
||||
end
|
||||
|
||||
def get_update(id) do
|
||||
Repo.get(Update, id)
|
||||
end
|
||||
|
||||
def get_update_and_topic(id) do
|
||||
Repo.get(Update, id) |> Repo.preload(:topic)
|
||||
end
|
||||
|
||||
def get_subscription_update(id) do
|
||||
Repo.get(SubscriptionUpdate, id)
|
||||
end
|
||||
|
||||
def count_30min_updates do
|
||||
now = NaiveDateTime.utc_now()
|
||||
time_ago = NaiveDateTime.add(now, -1_800, :second)
|
||||
|
||||
Repo.one(
|
||||
from(u in Update,
|
||||
where: u.inserted_at > ^time_ago and u.inserted_at < ^now,
|
||||
select: count(u.id)
|
||||
)
|
||||
)
|
||||
end
|
||||
|
||||
def count_30min_subscription_updates do
|
||||
now = NaiveDateTime.utc_now()
|
||||
time_ago = NaiveDateTime.add(now, -1_800, :second)
|
||||
|
||||
Repo.one(
|
||||
from(u in SubscriptionUpdate,
|
||||
where: u.inserted_at > ^time_ago and u.inserted_at < ^now,
|
||||
select: count(u.id)
|
||||
)
|
||||
)
|
||||
end
|
||||
end
|
|
@ -0,0 +1,158 @@
|
|||
defmodule Pleroma.Web.Feed.RSSCloudController do
|
||||
use Pleroma.Web, :controller
|
||||
|
||||
require Logger
|
||||
|
||||
alias Pleroma.Feed.Subscriptions
|
||||
|
||||
# RSSCloud subscriptions expire after 25 hours (no options!)
|
||||
@subscription_lease_seconds 90_000
|
||||
|
||||
def ping(conn, _params) do
|
||||
Logger.error("ping not implemented")
|
||||
|
||||
handle_response({:error, :unimplemented}, conn)
|
||||
end
|
||||
|
||||
def please_notify(conn, _params) do
|
||||
remote_ip =
|
||||
case Plug.Conn.get_req_header(conn, "x-forwarded-for") do
|
||||
[ip] ->
|
||||
ip
|
||||
|
||||
_ ->
|
||||
with {a, b, c, d} <- conn.remote_ip do
|
||||
"#{a}.#{b}.#{c}.#{d}"
|
||||
else
|
||||
_ -> "127.0.0.1"
|
||||
end
|
||||
end
|
||||
|> case do
|
||||
"127.0.0.1" -> "localhost"
|
||||
ip_or_address -> ip_or_address
|
||||
end
|
||||
|
||||
result =
|
||||
with {:ok, {callback, topics, diff_domain}} <-
|
||||
parse_body_params(conn.body_params, remote_ip) do
|
||||
{good, bad} =
|
||||
topics
|
||||
|> Enum.map(fn topic ->
|
||||
Subscriptions.subscribe(:rsscloud, topic, callback, @subscription_lease_seconds,
|
||||
diff_domain: diff_domain
|
||||
)
|
||||
end)
|
||||
|> Enum.split_with(fn res -> elem(res, 0) == :ok end)
|
||||
|
||||
case bad do
|
||||
[] -> hd(good)
|
||||
_ -> hd(bad)
|
||||
end
|
||||
else
|
||||
error -> error
|
||||
end
|
||||
|
||||
handle_response(result, conn)
|
||||
end
|
||||
|
||||
defp parse_body_params(
|
||||
%{"protocol" => protocol, "port" => port, "path" => path} = params,
|
||||
remote_ip
|
||||
) do
|
||||
scheme =
|
||||
case protocol do
|
||||
"http-rest" -> "http"
|
||||
"https-rest" -> "https"
|
||||
_ -> nil
|
||||
end
|
||||
|
||||
port =
|
||||
case Integer.parse(port) do
|
||||
{p, ""} -> p
|
||||
_ -> nil
|
||||
end
|
||||
|
||||
cond do
|
||||
is_nil(scheme) ->
|
||||
Logger.error("protocol '#{protocol}' invalid")
|
||||
{:error, :invalid_request}
|
||||
|
||||
is_nil(port) ->
|
||||
Logger.error("port '#{port}' invalid")
|
||||
{:error, :invalid_request}
|
||||
|
||||
String.first(path) != "/" ->
|
||||
Logger.error("path '#{path}' invalid")
|
||||
{:error, :invalid_request}
|
||||
|
||||
true ->
|
||||
Enum.reduce(params, [], fn {k, v}, acc ->
|
||||
if Regex.match?(~r/^url\d+$/, k) do
|
||||
[v | acc]
|
||||
else
|
||||
acc
|
||||
end
|
||||
end)
|
||||
|> case do
|
||||
[] ->
|
||||
Logger.error("no urls parsed")
|
||||
{:error, :invalid_request}
|
||||
|
||||
topics ->
|
||||
domain =
|
||||
case Map.get(params, "domain", remote_ip) do
|
||||
"127.0.0.1" -> "localhost"
|
||||
ip_or_address -> ip_or_address
|
||||
end
|
||||
|
||||
callback = "#{scheme}://#{domain}:#{port}#{path}"
|
||||
|
||||
# diff_domain = domain != remote_ip
|
||||
diff_domain = Map.has_key?(params, "domain")
|
||||
{:ok, {callback, topics, diff_domain}}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp parse_body_params(_invalid_params, _remote_ip), do: {:error, :invalid_request}
|
||||
|
||||
defp handle_response({:ok, _subscription}, conn) do
|
||||
conn
|
||||
|> Phoenix.Controller.json(%{success: true, msg: "subscribed"})
|
||||
|> Plug.Conn.halt()
|
||||
end
|
||||
|
||||
defp handle_response({:error, message}, conn) when is_binary(message) do
|
||||
conn
|
||||
|> Plug.Conn.put_status(500)
|
||||
|> Phoenix.Controller.json(%{success: false, msg: message})
|
||||
|> Plug.Conn.halt()
|
||||
end
|
||||
|
||||
defp handle_response({:error, reason}, conn) when is_atom(reason) do
|
||||
{status_code, message} =
|
||||
case reason do
|
||||
:invalid_request -> {400, "invalid_request"}
|
||||
:failed_challenge_body -> {403, "failed_challenge_body"}
|
||||
:failed_404_response -> {403, "failed_404_response"}
|
||||
:failed_unknown_response -> {403, "failed_unknown_response"}
|
||||
:failed_unknown_error -> {500, "failed_unknown_error"}
|
||||
:unimplemented -> {500, "unimplemented"}
|
||||
_ -> {500, "failed_unknown_reason"}
|
||||
end
|
||||
|
||||
conn
|
||||
|> Plug.Conn.put_status(status_code)
|
||||
|> Phoenix.Controller.json(%{success: false, msg: message})
|
||||
|> Plug.Conn.halt()
|
||||
end
|
||||
|
||||
defp handle_response({:error, error}, conn) do
|
||||
Logger.error("RSSCloudController unknown error: #{inspect(error)}")
|
||||
|
||||
conn
|
||||
|> Plug.Conn.put_status(500)
|
||||
|> Phoenix.Controller.json(%{success: false, msg: "unknown error"})
|
||||
|> Plug.Conn.halt()
|
||||
end
|
||||
end
|
|
@ -54,6 +54,10 @@ defmodule Pleroma.Web.Feed.UserController do
|
|||
|> Pleroma.Maps.put_if_present(:max_id, params["max_id"])
|
||||
|> ActivityPub.fetch_public_or_unlisted_activities()
|
||||
|
||||
rss_cloud_config =
|
||||
Routes.rss_cloud_register_url(conn, :please_notify)
|
||||
|> URI.parse()
|
||||
|
||||
conn
|
||||
|> put_resp_content_type("application/#{format}+xml")
|
||||
|> put_view(FeedView)
|
||||
|
@ -61,6 +65,7 @@ defmodule Pleroma.Web.Feed.UserController do
|
|||
user: user,
|
||||
activities: activities,
|
||||
feed_config: Config.get([:feed]),
|
||||
rss_cloud_config: rss_cloud_config,
|
||||
view_module: FeedView
|
||||
)
|
||||
end
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
defmodule Pleroma.Web.Feed.WebSubController do
|
||||
use Pleroma.Web, :controller
|
||||
|
||||
alias Pleroma.Feed.Subscriptions
|
||||
alias Pleroma.Feed.Updates
|
||||
|
||||
# By default WebSub subscriptions expire after 10 days
|
||||
@subscription_lease_seconds 864_000
|
||||
|
||||
def hub(conn, _params) do
|
||||
conn
|
||||
|> handle_request(conn.params)
|
||||
end
|
||||
|
||||
defp handle_request(
|
||||
conn,
|
||||
%{"hub.mode" => "subscribe", "hub.topic" => topic, "hub.callback" => callback} = params
|
||||
) do
|
||||
lease_seconds = Map.get(params, "hub.lease_seconds", @subscription_lease_seconds)
|
||||
secret = Map.get(params, "hub.secret")
|
||||
|
||||
Subscriptions.subscribe(:websub, topic, callback, lease_seconds, secret: secret)
|
||||
|> handle_response(conn)
|
||||
end
|
||||
|
||||
defp handle_request(conn, %{
|
||||
"hub.mode" => "unsubscribe",
|
||||
"hub.topic" => topic,
|
||||
"hub.callback" => callback
|
||||
}) do
|
||||
Subscriptions.unsubscribe(topic, callback)
|
||||
|> handle_response(conn)
|
||||
end
|
||||
|
||||
defp handle_request(conn, %{"hub.mode" => "publish", "hub.topic" => topic}) do
|
||||
Updates.publish(topic)
|
||||
|> handle_response(conn)
|
||||
end
|
||||
|
||||
defp handle_request(conn, %{"hub.mode" => "publish", "hub.url" => topic}) do
|
||||
# Compatability with https://pubsubhubbub.appspot.com/
|
||||
Updates.publish(topic)
|
||||
|> handle_response(conn)
|
||||
end
|
||||
|
||||
defp handle_response({:ok, _message}, conn) do
|
||||
conn
|
||||
|> Plug.Conn.send_resp(202, "")
|
||||
|> Plug.Conn.halt()
|
||||
end
|
||||
|
||||
defp handle_response({:error, message}, conn) when is_binary(message) do
|
||||
conn
|
||||
|> Plug.Conn.send_resp(500, message)
|
||||
|> Plug.Conn.halt()
|
||||
end
|
||||
|
||||
defp handle_response({:error, reason}, conn) when is_atom(reason) do
|
||||
{status_code, message} =
|
||||
case reason do
|
||||
:failed_challenge_body -> {403, "failed_challenge_body"}
|
||||
:failed_404_response -> {403, "failed_404_response"}
|
||||
:failed_unknown_response -> {403, "failed_unknown_response"}
|
||||
:failed_unknown_error -> {500, "failed_unknown_error"}
|
||||
_ -> {500, "failed_unknown_reason"}
|
||||
end
|
||||
|
||||
conn
|
||||
|> Plug.Conn.send_resp(status_code, message)
|
||||
|> Plug.Conn.halt()
|
||||
end
|
||||
|
||||
defp handle_response({:error, _}, conn) do
|
||||
conn
|
||||
|> Plug.Conn.send_resp(500, "unknown error")
|
||||
|> Plug.Conn.halt()
|
||||
end
|
||||
end
|
|
@ -843,6 +843,23 @@ defmodule Pleroma.Web.Router do
|
|||
get("/:sig/:url/:filename", MediaProxy.MediaProxyController, :remote)
|
||||
end
|
||||
|
||||
pipeline :accepts_www_forms do
|
||||
plug :accepts, ["x-www-form-urlencoded"]
|
||||
end
|
||||
|
||||
scope "/hub", Pleroma.Web.Feed do
|
||||
pipe_through :accepts_www_forms
|
||||
|
||||
post "/", WebSubController, :hub, as: :web_sub_hub
|
||||
end
|
||||
|
||||
scope "/rsscloud", Pleroma.Web.Feed do
|
||||
pipe_through :accepts_www_forms
|
||||
|
||||
post "/ping", RSSCloudController, :ping
|
||||
post "/pleaseNotify", RSSCloudController, :please_notify, as: :rss_cloud_register
|
||||
end
|
||||
|
||||
if Pleroma.Config.get(:env) == :dev do
|
||||
scope "/dev" do
|
||||
pipe_through([:mailbox_preview])
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
<logo><%= feed_logo() %></logo>
|
||||
<updated><%= most_recent_update(@activities) %></updated>
|
||||
<link rel="self" href="<%= '#{Routes.tag_feed_url(@conn, :feed, @tag)}.atom' %>" type="application/atom+xml"/>
|
||||
<link rel="hub" href="<%= Routes.web_sub_hub_url(@conn, :hub) %>"/>
|
||||
<%= for activity <- @activities do %>
|
||||
<%= render @view_module, "_tag_activity.atom", Map.merge(assigns, prepare_activity(activity, actor: true)) %>
|
||||
<% end %>
|
||||
|
|
|
@ -6,6 +6,8 @@
|
|||
<title>#<%= @tag %></title>
|
||||
<description><%= Gettext.dpgettext("static_pages", "tag feed description", "These are public toots tagged with #%{tag}. You can interact with them if you have an account anywhere in the fediverse.", tag: @tag) %></description>
|
||||
<link><%= '#{Routes.tag_feed_url(@conn, :feed, @tag)}.rss' %></link>
|
||||
<cloud protocol="<%= @rss_cloud_config.scheme %>-rest" domain="<%= @rss_cloud_config.host %>"
|
||||
port="<%= @rss_cloud_config.port %>" path="<%= @rss_cloud_config.path %>" registerProcedure=""/>
|
||||
<webfeeds:logo><%= feed_logo() %></webfeeds:logo>
|
||||
<webfeeds:accentColor>2b90d9</webfeeds:accentColor>
|
||||
<%= for activity <- @activities do %>
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
<updated><%= most_recent_update(@activities, @user) %></updated>
|
||||
<logo><%= logo(@user) %></logo>
|
||||
<link rel="self" href="<%= '#{Routes.user_feed_url(@conn, :feed, @user.nickname)}.atom' %>" type="application/atom+xml"/>
|
||||
<link rel="hub" href="<%= Routes.web_sub_hub_url(@conn, :hub) %>"/>
|
||||
|
||||
<%= render @view_module, "_author.atom", assigns %>
|
||||
|
||||
|
|
|
@ -6,6 +6,8 @@
|
|||
<updated><%= most_recent_update(@activities, @user) %></updated>
|
||||
<image><%= logo(@user) %></image>
|
||||
<link><%= '#{Routes.user_feed_url(@conn, :feed, @user.nickname)}.rss' %></link>
|
||||
<cloud protocol="<%= @rss_cloud_config.scheme %>-rest" domain="<%= @rss_cloud_config.host %>"
|
||||
port="<%= @rss_cloud_config.port %>" path="<%= @rss_cloud_config.path %>" registerProcedure=""/>
|
||||
|
||||
<%= render @view_module, "_author.rss", assigns %>
|
||||
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
defmodule Pleroma.Workers.DispatchFeedUpdateWorker do
|
||||
use Oban.Worker, queue: :feed_updates, max_attempts: 3
|
||||
require Logger
|
||||
|
||||
alias Pleroma.HTTP
|
||||
|
||||
alias Pleroma.Feed.Update
|
||||
alias Pleroma.Feed.Updates
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%Oban.Job{
|
||||
args: %{
|
||||
"update_id" => update_id,
|
||||
"subscription_id" => subscription_id,
|
||||
"subscription_api" => api,
|
||||
"callback_url" => callback_url,
|
||||
"secret" => secret
|
||||
}
|
||||
}) do
|
||||
with %Update{} = update <- Pleroma.Feed.Updates.get_update_and_topic(update_id) do
|
||||
topic_url = update.topic.url
|
||||
api = String.to_existing_atom(api)
|
||||
|
||||
perform_request(api, callback_url, topic_url, update, secret)
|
||||
|> log_request(update.id, subscription_id)
|
||||
else
|
||||
# In case update has already been removed.
|
||||
_ ->
|
||||
Logger.error("Could not find update #{update_id}")
|
||||
{:error, "Update not found"}
|
||||
end
|
||||
end
|
||||
|
||||
defp perform_request(:websub, callback_url, topic_url, update, secret) do
|
||||
links = [
|
||||
"<#{topic_url}>; rel=self",
|
||||
"<https://cloud_hub.com/hub>; rel=hub"
|
||||
]
|
||||
|
||||
headers = [
|
||||
{"content-type", update.content_type},
|
||||
{"link", Enum.join(links, ", ")}
|
||||
]
|
||||
|
||||
headers =
|
||||
if secret do
|
||||
hmac = :crypto.mac(:hmac, :sha256, secret, update.body) |> Base.encode16(case: :lower)
|
||||
[{"x-hub-signature", "sha256=" <> hmac} | headers]
|
||||
else
|
||||
headers
|
||||
end
|
||||
|
||||
case HTTP.post(callback_url, update.body, headers) do
|
||||
{:ok, %Tesla.Env{status: code}} when code >= 200 and code < 300 ->
|
||||
Logger.debug("WebSub got OK response from #{callback_url}")
|
||||
{:ok, code}
|
||||
|
||||
{:ok, %Tesla.Env{status: 410}} ->
|
||||
# Invalidate this subscription
|
||||
{:ok, 410}
|
||||
|
||||
{:ok, %Tesla.Env{status: code}} ->
|
||||
{:failed, code}
|
||||
|
||||
{:error, reason} ->
|
||||
{:error, reason}
|
||||
end
|
||||
end
|
||||
|
||||
defp perform_request(:rsscloud, callback_url, topic_url, _update, _secret) do
|
||||
body = %{url: topic_url} |> URI.encode_query()
|
||||
headers = [{"content-type", "application/x-www-form-urlencoded"}]
|
||||
|
||||
case HTTP.post(callback_url, body, headers) do
|
||||
{:ok, %Tesla.Env{status: code}} when code >= 200 and code < 300 ->
|
||||
Logger.debug("RSSCloud got OK response from #{callback_url}")
|
||||
{:ok, code}
|
||||
|
||||
{:ok, %Tesla.Env{status: 410}} ->
|
||||
# Invalidate this subscription
|
||||
{:ok, 410}
|
||||
|
||||
{:ok, %Tesla.Env{status: code}} ->
|
||||
{:failed, code}
|
||||
|
||||
{:error, reason} ->
|
||||
Logger.error("RSSCloud got ERROR at #{callback_url}: #{inspect(reason)}")
|
||||
{:error, 500}
|
||||
end
|
||||
end
|
||||
|
||||
defp perform_request(api, _callback_url, _topic_url, _update, _secret) do
|
||||
Logger.error("No such api #{api}")
|
||||
{:error, "invalid api"}
|
||||
end
|
||||
|
||||
defp log_request(res, update_id, subscription_id) do
|
||||
status_code =
|
||||
case res do
|
||||
{_, code} when is_integer(code) ->
|
||||
code
|
||||
|
||||
_ ->
|
||||
599
|
||||
end
|
||||
|
||||
# Will fail if either update at update_id or subscription at subscription_id is gone
|
||||
_ =
|
||||
case Updates.create_subscription_update(update_id, subscription_id, status_code) do
|
||||
{:ok, %{id: id}} ->
|
||||
Logger.debug("New update #{id} for subscription #{subscription_id}")
|
||||
|
||||
{:error, changeset} ->
|
||||
Logger.error("Failed to create update for subscription #{subscription_id}")
|
||||
Logger.error(" -> #{inspect(changeset.errors)}")
|
||||
end
|
||||
|
||||
res
|
||||
end
|
||||
end
|
|
@ -0,0 +1,46 @@
|
|||
defmodule Pleroma.Workers.PruneFeedSubscriptionsWorker do
|
||||
use Oban.Worker, queue: :prune_feed_subscriptions, max_attempts: 1
|
||||
require Logger
|
||||
|
||||
alias Pleroma.Feed.Subscription
|
||||
alias Pleroma.Feed.Subscriptions
|
||||
|
||||
# Orphaned topics live for another 3 hours
|
||||
@topic_lease_seconds 10_800
|
||||
|
||||
@impl Oban.Worker
|
||||
def perform(%Oban.Job{args: args}) do
|
||||
expiring =
|
||||
case Map.get(args, "expiring") do
|
||||
expiring when is_binary(expiring) ->
|
||||
{:ok, expiring, _} = DateTime.from_iso8601(expiring)
|
||||
DateTime.to_naive(expiring)
|
||||
|
||||
_ ->
|
||||
NaiveDateTime.utc_now()
|
||||
end
|
||||
|
||||
Logger.error("Pruning subscriptions and topics expiring before #{expiring}")
|
||||
|
||||
_ =
|
||||
Subscriptions.list_inactive_subscriptions(expiring)
|
||||
|> Enum.map(fn %Subscription{id: id} = subscription ->
|
||||
_ = Subscriptions.final_unsubscribe(subscription)
|
||||
id
|
||||
end)
|
||||
|
||||
_ =
|
||||
case Subscriptions.delete_all_inactive_subscriptions(@topic_lease_seconds, expiring) do
|
||||
{:error, reason} ->
|
||||
Logger.error("Transaction error deleting inactive subscriptions: #{reason}")
|
||||
|
||||
{count, _, _} ->
|
||||
Logger.error("Deleted #{count} inactive subscriptions")
|
||||
end
|
||||
|
||||
{count, _} = Subscriptions.delete_all_inactive_topics(expiring)
|
||||
Logger.error("Deleted #{count} inactive topics")
|
||||
|
||||
:ok
|
||||
end
|
||||
end
|
|
@ -0,0 +1,52 @@
|
|||
defmodule Pleroma.Repo.Migrations.CreateFeedTables do
|
||||
use Ecto.Migration
|
||||
|
||||
def change do
|
||||
create table(:feed_topics) do
|
||||
add(:url, :text)
|
||||
add(:expires_at, :naive_datetime)
|
||||
|
||||
timestamps()
|
||||
end
|
||||
|
||||
create unique_index(:feed_topics, [:url])
|
||||
create index(:feed_topics, :expires_at)
|
||||
|
||||
create table(:feed_subscriptions) do
|
||||
add(:topic_id, references(:feed_topics, on_delete: :delete_all))
|
||||
add(:api, :string)
|
||||
add(:callback_url, :text)
|
||||
add(:diff_domain, :boolean, null: false, default: false)
|
||||
add(:secret, :string, nullable: true)
|
||||
add(:lease_seconds, :float)
|
||||
add(:expires_at, :naive_datetime)
|
||||
|
||||
timestamps()
|
||||
end
|
||||
|
||||
create(unique_index(:feed_subscriptions, [:api, :topic_id, :callback_url]))
|
||||
create(index(:feed_subscriptions, :expires_at))
|
||||
|
||||
create table(:feed_updates) do
|
||||
add(:topic_id, references(:feed_topics, on_delete: :delete_all))
|
||||
add(:headers, :binary)
|
||||
add(:content_type, :text)
|
||||
add(:links, {:array, :text})
|
||||
add(:body, :binary)
|
||||
add(:hash, :string)
|
||||
|
||||
timestamps()
|
||||
end
|
||||
|
||||
create(index(:feed_updates, :inserted_at))
|
||||
|
||||
create table(:feed_subscription_updates) do
|
||||
add(:update_id, references(:feed_updates, on_delete: :delete_all))
|
||||
add(:subscription_id, references(:feed_subscriptions, on_delete: :delete_all))
|
||||
add(:status_code, :integer)
|
||||
add(:pushed_at, :naive_datetime)
|
||||
|
||||
timestamps()
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,142 @@
|
|||
defmodule Pleroma.Feed.QueryTest do
|
||||
use Pleroma.DataCase
|
||||
|
||||
alias Pleroma.Feed.Subscriptions
|
||||
alias Pleroma.Feed.SubscriptionUpdate
|
||||
alias Pleroma.Feed.Topic
|
||||
alias Pleroma.Feed.Updates
|
||||
|
||||
@subscription_lease_seconds 1
|
||||
@topic_lease_seconds 5
|
||||
|
||||
test "delete_all subscriptions cascades" do
|
||||
%{topics: _, subscriptions: subscriptions} = build_data()
|
||||
|
||||
subscription_updates =
|
||||
subscriptions
|
||||
|> Enum.map(fn subscription ->
|
||||
subscription = Repo.preload(subscription, :topic)
|
||||
topic = subscription.topic
|
||||
env = %Tesla.Env{status: 200, body: topic.url}
|
||||
|
||||
Enum.map(1..5, fn _i ->
|
||||
{:ok, update} = Updates.create_update(topic, env)
|
||||
|
||||
Enum.map(1..5, fn _j ->
|
||||
{:ok, subscription_update} =
|
||||
Updates.create_subscription_update(update.id, subscription.id, 200)
|
||||
|
||||
subscription_update
|
||||
end)
|
||||
end)
|
||||
end)
|
||||
|> List.flatten()
|
||||
|
||||
assert 100 == Enum.count(subscription_updates)
|
||||
|
||||
expiring =
|
||||
DateTime.utc_now()
|
||||
|> DateTime.add(60, :second)
|
||||
|
||||
assert {4, 2, _} =
|
||||
Subscriptions.delete_all_inactive_subscriptions(@topic_lease_seconds, expiring)
|
||||
|
||||
assert 0 == SubscriptionUpdate |> Repo.aggregate(:count, :id)
|
||||
end
|
||||
|
||||
test "removing all subscriptions for a topic sets an expiration" do
|
||||
%{topics: topics, subscriptions: _} = build_data()
|
||||
|
||||
[topic1_exp, topic2_exp] =
|
||||
topics
|
||||
|> Enum.with_index(fn topic, i ->
|
||||
topic_url = topic.url
|
||||
%Topic{subscriptions: subscriptions} = Repo.preload(topic, :subscriptions)
|
||||
|
||||
now = NaiveDateTime.utc_now()
|
||||
|
||||
# Remove one sub in topic1
|
||||
# Remove two subs (all of them) in topic2
|
||||
0..i
|
||||
|> Enum.map(fn j ->
|
||||
Enum.at(subscriptions, j)
|
||||
|> Subscriptions.delete_subscription(@topic_lease_seconds, now)
|
||||
end)
|
||||
|
||||
%Topic{expires_at: expires_at} = Subscriptions.get_topic_by_url(topic_url)
|
||||
expires_at
|
||||
end)
|
||||
|
||||
assert NaiveDateTime.compare(topic1_exp, ~N[2040-01-01 00:00:00]) == :gt
|
||||
# After removing all subs in topic2, topic2 will expire soon!
|
||||
assert NaiveDateTime.compare(topic2_exp, ~N[2040-01-01 00:00:00]) == :lt
|
||||
end
|
||||
|
||||
test "prunes topics after removing all subscriptions" do
|
||||
%{topics: _, subscriptions: _} = build_data()
|
||||
|
||||
expiring = Subscriptions.from_now(2 * @subscription_lease_seconds)
|
||||
|
||||
assert {4, 2, _} =
|
||||
Subscriptions.delete_all_inactive_subscriptions(@topic_lease_seconds, expiring)
|
||||
|
||||
expiring = NaiveDateTime.add(expiring, 2 * @topic_lease_seconds, :second)
|
||||
topics_will_expire = Subscriptions.list_inactive_topics(expiring)
|
||||
assert Enum.count(topics_will_expire) == 2
|
||||
|
||||
assert {2, _} = Subscriptions.delete_all_inactive_topics(expiring)
|
||||
end
|
||||
|
||||
def build_data() do
|
||||
{:ok, topic1} = Subscriptions.find_or_create_topic("http://publisher/topic1")
|
||||
{:ok, topic2} = Subscriptions.find_or_create_topic("http://publisher/topic2")
|
||||
assert topic1.id != topic2.id
|
||||
|
||||
{:ok, sub1_topic1} =
|
||||
Subscriptions.find_or_create_subscription(
|
||||
:websub,
|
||||
topic1,
|
||||
"http://subscriber/sub1",
|
||||
@subscription_lease_seconds,
|
||||
[]
|
||||
)
|
||||
|
||||
{:ok, sub1_topic2} =
|
||||
Subscriptions.find_or_create_subscription(
|
||||
:websub,
|
||||
topic2,
|
||||
"http://subscriber/sub1",
|
||||
@subscription_lease_seconds,
|
||||
[]
|
||||
)
|
||||
|
||||
{:ok, sub2_topic1} =
|
||||
Subscriptions.find_or_create_subscription(
|
||||
:websub,
|
||||
topic1,
|
||||
"http://subscriber/sub2",
|
||||
@subscription_lease_seconds,
|
||||
[]
|
||||
)
|
||||
|
||||
{:ok, sub2_topic2} =
|
||||
Subscriptions.find_or_create_subscription(
|
||||
:websub,
|
||||
topic2,
|
||||
"http://subscriber/sub2",
|
||||
@subscription_lease_seconds,
|
||||
[]
|
||||
)
|
||||
|
||||
topic1 = Repo.preload(topic1, :subscriptions)
|
||||
assert 2 == Enum.count(topic1.subscriptions)
|
||||
|
||||
topic2 = Repo.preload(topic2, :subscriptions)
|
||||
assert 2 == Enum.count(topic2.subscriptions)
|
||||
|
||||
%{
|
||||
topics: [topic1, topic2],
|
||||
subscriptions: [sub1_topic1, sub1_topic2, sub2_topic1, sub2_topic2]
|
||||
}
|
||||
end
|
||||
end
|
|
@ -0,0 +1,479 @@
|
|||
defmodule Pleroma.Feed.RSSCloudTest do
|
||||
use Pleroma.DataCase
|
||||
use Oban.Testing, repo: Pleroma.Repo
|
||||
|
||||
@subscription_lease_seconds 90_000
|
||||
@content_type_text_plain [{"content-type", "text/plain"}]
|
||||
@html_body """
|
||||
<!doctype html>
|
||||
<html lang=en>
|
||||
<head>
|
||||
<meta charset=utf-8>
|
||||
<title>blah</title>
|
||||
</head>
|
||||
<body>
|
||||
<p>I'm the content</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
@text_body "Hello world"
|
||||
@json_body %{"hello" => "world"}
|
||||
@xml_body """
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<rss version="2.0">
|
||||
<channel>
|
||||
<title>Scripting News</title>
|
||||
<link>http://scripting.com/</link>
|
||||
<description>It's even worse than it appears..</description>
|
||||
<pubDate>Wed, 14 Dec 2022 16:36:13 GMT</pubDate>
|
||||
<lastBuildDate>Wed, 14 Dec 2022 17:54:45 GMT</lastBuildDate>
|
||||
<item>
|
||||
<description>The idea of <a href="http://textcasting.org/">textcasting</a> is like podcasting.</description> <pubDate>Wed, 14 Dec 2022 13:44:21 GMT</pubDate>
|
||||
<link>http://scripting.com/2022/12/14.html#a134421</link>
|
||||
<guid>http://scripting.com/2022/12/14.html#a134421</guid>
|
||||
</item>
|
||||
</channel>
|
||||
</rss>
|
||||
"""
|
||||
|
||||
@moduledoc """
|
||||
Implements the tests described by https://websub.rocks/hub
|
||||
"""
|
||||
|
||||
alias Pleroma.Feed.Updates
|
||||
alias Pleroma.Feed.Subscriptions
|
||||
|
||||
describe "100 - Typical subscriber request" do
|
||||
@doc """
|
||||
This subscriber will include only the parameters hub.mode, hub.topic and hub.callback. The hub should deliver notifications with no signature.
|
||||
"""
|
||||
|
||||
setup :setup_html_publisher
|
||||
|
||||
test "100 - Typical subscriber request", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
assert {:ok, subscription} =
|
||||
Subscriptions.subscribe(
|
||||
:rsscloud,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
assert update.content_type == "text/html; charset=UTF-8"
|
||||
|
||||
assert_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
subscription_api: "rsscloud",
|
||||
callback_url: callback_url,
|
||||
secret: nil
|
||||
}
|
||||
)
|
||||
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:publisher) == 1
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 2
|
||||
|
||||
[_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber)
|
||||
assert publish.body == "url=" <> URI.encode_www_form(topic_url)
|
||||
|
||||
assert Tesla.get_header(publish, "content-type") ==
|
||||
"application/x-www-form-urlencoded"
|
||||
end
|
||||
|
||||
test "does not get publish if already unsubscribed", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
assert {:ok, subscription} =
|
||||
Subscriptions.subscribe(
|
||||
:rsscloud,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
{:ok, _} = Subscriptions.unsubscribe(topic_url, callback_url)
|
||||
|
||||
# Quick sleep
|
||||
:timer.sleep(1000)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
|
||||
refute_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
callback_url: callback_url,
|
||||
secret: nil
|
||||
}
|
||||
)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:publisher) == 1
|
||||
|
||||
# Note that :rsscloud does not notify on unsubscription
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 1
|
||||
end
|
||||
end
|
||||
|
||||
describe "102 - Subscriber sends additional parameters" do
|
||||
@doc """
|
||||
This subscriber will include some additional parameters in the request, which must be ignored by the hub if the hub doesn't recognize them.
|
||||
"""
|
||||
test "102 - Subscriber sends additional parameters", %{} do
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
This subscriber tests whether the hub allows subscriptions to be re-subscribed before they expire. The hub must allow a subscription to be re-activated, and must update the previous subscription based on the topic+callback pair, rather than creating a new subscription.
|
||||
"""
|
||||
test "103 - Subscriber re-subscribes before the subscription expires", %{} do
|
||||
end
|
||||
|
||||
@doc """
|
||||
This test will first subscribe to a topic, and will then send an unsubscription request. You will be able to test that the unsubscription is confirmed by seeing that a notification is not received when a new post is published.
|
||||
"""
|
||||
test "104 - Unsubscribe request", %{} do
|
||||
end
|
||||
|
||||
describe "105 - Plaintext content" do
|
||||
@doc """
|
||||
This test will check whether your hub can handle delivering content that is not HTML or XML. The content at the topic URL of this test is plaintext.
|
||||
"""
|
||||
|
||||
setup :setup_text_publisher
|
||||
|
||||
test "105 - Plaintext content", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
assert {:ok, subscription} =
|
||||
Subscriptions.subscribe(
|
||||
:rsscloud,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
assert update.content_type == "text/plain"
|
||||
|
||||
assert_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
subscription_api: "rsscloud",
|
||||
callback_url: callback_url,
|
||||
secret: nil
|
||||
}
|
||||
)
|
||||
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:publisher) == 1
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 2
|
||||
|
||||
[_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber)
|
||||
assert publish.body == "url=" <> URI.encode_www_form(topic_url)
|
||||
|
||||
assert Tesla.get_header(publish, "content-type") ==
|
||||
"application/x-www-form-urlencoded"
|
||||
end
|
||||
end
|
||||
|
||||
describe "106 - JSON content" do
|
||||
@doc """
|
||||
This test will check whether your hub can handle delivering content that is not HTML or XML. The content at the topic URL of this test is JSON.
|
||||
"""
|
||||
|
||||
setup :setup_json_publisher
|
||||
|
||||
test "106 - JSON content", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
assert {:ok, subscription} =
|
||||
Subscriptions.subscribe(
|
||||
:rsscloud,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
assert update.content_type == "application/json"
|
||||
|
||||
assert_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
subscription_api: "rsscloud",
|
||||
callback_url: callback_url,
|
||||
secret: nil
|
||||
}
|
||||
)
|
||||
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:publisher) == 1
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 2
|
||||
|
||||
[_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber)
|
||||
assert publish.body == "url=" <> URI.encode_www_form(topic_url)
|
||||
|
||||
assert Tesla.get_header(publish, "content-type") ==
|
||||
"application/x-www-form-urlencoded"
|
||||
end
|
||||
end
|
||||
|
||||
describe "XML content" do
|
||||
@doc """
|
||||
This test will check whether your hub can handle delivering content that is not HTML or XML. The content at the topic URL of this test is JSON.
|
||||
"""
|
||||
|
||||
setup :setup_xml_publisher
|
||||
|
||||
test "XML content", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
assert {:ok, subscription} =
|
||||
Subscriptions.subscribe(
|
||||
:rsscloud,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
assert update.content_type == "application/rss+xml"
|
||||
|
||||
assert_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
subscription_api: "rsscloud",
|
||||
callback_url: callback_url,
|
||||
secret: nil
|
||||
}
|
||||
)
|
||||
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:publisher) == 1
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 2
|
||||
|
||||
[_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber)
|
||||
assert publish.body == "url=" <> URI.encode_www_form(topic_url)
|
||||
|
||||
assert Tesla.get_header(publish, "content-type") ==
|
||||
"application/x-www-form-urlencoded"
|
||||
end
|
||||
end
|
||||
|
||||
describe "pruning" do
|
||||
@doc """
|
||||
This test will check whether we can prune expired subscriptions.
|
||||
"""
|
||||
|
||||
setup :setup_xml_publisher
|
||||
|
||||
test "after typical subscriber request", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
assert {:ok, subscription} =
|
||||
Subscriptions.subscribe(
|
||||
:rsscloud,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
assert update.content_type == "application/rss+xml"
|
||||
|
||||
assert_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
subscription_api: "rsscloud",
|
||||
callback_url: callback_url,
|
||||
secret: nil
|
||||
}
|
||||
)
|
||||
|
||||
expiring =
|
||||
Subscriptions.from_now(1_000_000)
|
||||
|> DateTime.from_naive!("Etc/UTC")
|
||||
|> DateTime.to_iso8601()
|
||||
|
||||
Pleroma.Workers.PruneFeedSubscriptionsWorker.new(%{expiring: expiring})
|
||||
|> Oban.insert()
|
||||
|
||||
assert_enqueued(
|
||||
worker: Pleroma.Workers.PruneFeedSubscriptionsWorker,
|
||||
args: %{
|
||||
expiring: expiring
|
||||
}
|
||||
)
|
||||
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :prune_feed_subscriptions)
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates)
|
||||
end
|
||||
end
|
||||
|
||||
def setup_html_publisher(_) do
|
||||
publisher_url = "http://localhost/publisher/posts"
|
||||
subscriber_url = "http://localhost/subscriber/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{url: ^publisher_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:publisher, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: @html_body,
|
||||
headers: [
|
||||
{"content-type", "text/html; charset=UTF-8"}
|
||||
]
|
||||
}
|
||||
|
||||
%{url: ^subscriber_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: "ok",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
end)
|
||||
|
||||
[publisher_url: publisher_url, subscriber_url: subscriber_url]
|
||||
end
|
||||
|
||||
def setup_text_publisher(_) do
|
||||
publisher_url = "http://localhost/publisher/posts"
|
||||
subscriber_url = "http://localhost/subscriber/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{url: ^publisher_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:publisher, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: @text_body,
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
|
||||
%{url: ^subscriber_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: "ok",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
end)
|
||||
|
||||
[publisher_url: publisher_url, subscriber_url: subscriber_url]
|
||||
end
|
||||
|
||||
def setup_json_publisher(_) do
|
||||
publisher_url = "http://localhost/publisher/posts"
|
||||
subscriber_url = "http://localhost/subscriber/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{url: ^publisher_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:publisher, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: Jason.encode!(@json_body),
|
||||
headers: [
|
||||
{"content-type", "application/json"}
|
||||
]
|
||||
}
|
||||
|
||||
%{url: ^subscriber_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: "ok",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
end)
|
||||
|
||||
[publisher_url: publisher_url, subscriber_url: subscriber_url]
|
||||
end
|
||||
|
||||
def setup_xml_publisher(_) do
|
||||
publisher_url = "http://localhost/publisher/posts"
|
||||
subscriber_url = "http://localhost/subscriber/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{url: ^publisher_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:publisher, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: @xml_body,
|
||||
headers: [
|
||||
{"content-type", "application/rss+xml"}
|
||||
]
|
||||
}
|
||||
|
||||
%{url: ^subscriber_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: "ok",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
end)
|
||||
|
||||
[publisher_url: publisher_url, subscriber_url: subscriber_url]
|
||||
end
|
||||
end
|
|
@ -0,0 +1,3 @@
|
|||
defmodule Pleroma.Feed.SubscriptionsTest do
|
||||
use Pleroma.DataCase
|
||||
end
|
|
@ -0,0 +1,80 @@
|
|||
defmodule Pleroma.Feed.UpdatesTest do
|
||||
use Pleroma.DataCase
|
||||
use Oban.Testing, repo: Pleroma.Repo
|
||||
|
||||
alias Pleroma.Feed.Subscriptions
|
||||
alias Pleroma.Feed.Updates
|
||||
|
||||
@subscription_lease_seconds 5
|
||||
@content_type_text_plain [{"content-type", "text/plain"}]
|
||||
@html_body """
|
||||
<!doctype html>
|
||||
<html lang=en>
|
||||
<head>
|
||||
<meta charset=utf-8>
|
||||
<title>blah</title>
|
||||
</head>
|
||||
<body>
|
||||
<p>I'm the content</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
|
||||
describe "updates" do
|
||||
test "publishing update dispatches jobs" do
|
||||
topic_url = "https://localhost/publisher/topic/123"
|
||||
callback_url = "http://localhost/subscriber/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{url: ^topic_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:publisher, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: @html_body,
|
||||
headers: [
|
||||
{"content-type", "text/html; charset=UTF-8"}
|
||||
]
|
||||
}
|
||||
|
||||
%{method: :get, url: ^callback_url, query: query} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
|
||||
query = Map.new(query)
|
||||
|
||||
if Map.has_key?(query, "hub.challenge") do
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: Map.get(query, "hub.challenge"),
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
else
|
||||
%Tesla.Env{status: 400, body: "no challenge", headers: @content_type_text_plain}
|
||||
end
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
end)
|
||||
|
||||
assert {:ok, _} =
|
||||
Subscriptions.subscribe(
|
||||
:websub,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 1
|
||||
|
||||
assert [job] = all_enqueued(worker: Pleroma.Workers.DispatchFeedUpdateWorker)
|
||||
assert job.args["update_id"] == update.id
|
||||
assert job.args["callback_url"] == callback_url
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,472 @@
|
|||
defmodule Pleroma.Feed.WebSubTest do
|
||||
use Pleroma.DataCase
|
||||
use Oban.Testing, repo: Pleroma.Repo
|
||||
|
||||
# WebSub subcriptions expire after 10 days
|
||||
@subscription_lease_seconds 864_000
|
||||
@content_type_text_plain [{"content-type", "text/plain"}]
|
||||
@html_body """
|
||||
<!doctype html>
|
||||
<html lang=en>
|
||||
<head>
|
||||
<meta charset=utf-8>
|
||||
<title>blah</title>
|
||||
</head>
|
||||
<body>
|
||||
<p>I'm the content</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
@text_body "Hello world"
|
||||
@json_body %{"hello" => "world"}
|
||||
@xml_body """
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<rss version="2.0">
|
||||
<channel>
|
||||
<title>Scripting News</title>
|
||||
<link>http://scripting.com/</link>
|
||||
<description>It's even worse than it appears..</description>
|
||||
<pubDate>Wed, 14 Dec 2022 16:36:13 GMT</pubDate>
|
||||
<lastBuildDate>Wed, 14 Dec 2022 17:54:45 GMT</lastBuildDate>
|
||||
<item>
|
||||
<description>The idea of <a href="http://textcasting.org/">textcasting</a> is like podcasting.</description> <pubDate>Wed, 14 Dec 2022 13:44:21 GMT</pubDate>
|
||||
<link>http://scripting.com/2022/12/14.html#a134421</link>
|
||||
<guid>http://scripting.com/2022/12/14.html#a134421</guid>
|
||||
</item>
|
||||
</channel>
|
||||
</rss>
|
||||
"""
|
||||
|
||||
@moduledoc """
|
||||
Implements the tests described by https://websub.rocks/hub
|
||||
"""
|
||||
|
||||
alias Pleroma.Feed.Updates
|
||||
alias Pleroma.Feed.Subscriptions
|
||||
|
||||
describe "100 - Typical subscriber request" do
|
||||
@doc """
|
||||
This subscriber will include only the parameters hub.mode, hub.topic and hub.callback. The hub should deliver notifications with no signature.
|
||||
"""
|
||||
|
||||
setup :setup_html_publisher
|
||||
|
||||
test "100 - Typical subscriber request", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
assert {:ok, subscription} =
|
||||
Subscriptions.subscribe(
|
||||
:websub,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
|
||||
assert_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
subscription_api: "websub",
|
||||
callback_url: callback_url,
|
||||
secret: nil
|
||||
}
|
||||
)
|
||||
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:publisher) == 1
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 2
|
||||
[_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber)
|
||||
assert publish.body == @html_body
|
||||
end
|
||||
|
||||
test "does not get publish if already unsubscribed", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
assert {:ok, subscription} =
|
||||
Subscriptions.subscribe(
|
||||
:websub,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
{:ok, _} = Subscriptions.unsubscribe(topic_url, callback_url)
|
||||
|
||||
# Quick sleep
|
||||
:timer.sleep(1000)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
|
||||
refute_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
callback_url: callback_url,
|
||||
secret: nil
|
||||
}
|
||||
)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:publisher) == 1
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 2
|
||||
end
|
||||
end
|
||||
|
||||
describe "101 - Subscriber includes a secret" do
|
||||
@doc """
|
||||
This subscriber will include the parameters hub.mode, hub.topic, hub.callback and hub.secret. The hub should deliver notifications with a signature computed using this secret.
|
||||
"""
|
||||
|
||||
setup :setup_html_publisher
|
||||
|
||||
test "101 - Subscriber includes a secret", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
{:ok, subscription} =
|
||||
Subscriptions.subscribe(:websub, topic_url, callback_url, @subscription_lease_seconds,
|
||||
secret: "some_secret"
|
||||
)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
|
||||
assert_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
subscription_api: "websub",
|
||||
callback_url: callback_url,
|
||||
secret: "some_secret"
|
||||
}
|
||||
)
|
||||
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:publisher) == 1
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 2
|
||||
|
||||
[_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber)
|
||||
assert publish.body == @html_body
|
||||
|
||||
assert Tesla.get_header(publish, "x-hub-signature") ==
|
||||
"sha256=9d63c6c06dca350aaa6955f9e4017b801fc56b4a904f2e4dab68652b6abfda4c"
|
||||
end
|
||||
end
|
||||
|
||||
describe "102 - Subscriber sends additional parameters" do
|
||||
@doc """
|
||||
This subscriber will include some additional parameters in the request, which must be ignored by the hub if the hub doesn't recognize them.
|
||||
"""
|
||||
test "102 - Subscriber sends additional parameters", %{} do
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
This subscriber tests whether the hub allows subscriptions to be re-subscribed before they expire. The hub must allow a subscription to be re-activated, and must update the previous subscription based on the topic+callback pair, rather than creating a new subscription.
|
||||
"""
|
||||
test "103 - Subscriber re-subscribes before the subscription expires", %{} do
|
||||
end
|
||||
|
||||
@doc """
|
||||
This test will first subscribe to a topic, and will then send an unsubscription request. You will be able to test that the unsubscription is confirmed by seeing that a notification is not received when a new post is published.
|
||||
"""
|
||||
test "104 - Unsubscribe request", %{} do
|
||||
end
|
||||
|
||||
describe "105 - Plaintext content" do
|
||||
@doc """
|
||||
This test will check whether your hub can handle delivering content that is not HTML or XML. The content at the topic URL of this test is plaintext.
|
||||
"""
|
||||
|
||||
setup :setup_text_publisher
|
||||
|
||||
test "105 - Plaintext content", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
assert {:ok, subscription} =
|
||||
Subscriptions.subscribe(
|
||||
:websub,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
|
||||
assert_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
subscription_api: "websub",
|
||||
callback_url: callback_url,
|
||||
secret: nil
|
||||
}
|
||||
)
|
||||
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:publisher) == 1
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 2
|
||||
|
||||
[_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber)
|
||||
assert publish.body == @text_body
|
||||
assert Tesla.get_header(publish, "content-type") == "text/plain"
|
||||
|
||||
assert Tesla.get_header(publish, "link") ==
|
||||
"<#{topic_url}>; rel=self, <https://cloud_hub.com/hub>; rel=hub"
|
||||
end
|
||||
end
|
||||
|
||||
describe "106 - JSON content" do
|
||||
@doc """
|
||||
This test will check whether your hub can handle delivering content that is not HTML or XML. The content at the topic URL of this test is JSON.
|
||||
"""
|
||||
|
||||
setup :setup_json_publisher
|
||||
|
||||
test "106 - JSON content", %{
|
||||
subscriber_url: callback_url,
|
||||
publisher_url: topic_url
|
||||
} do
|
||||
assert {:ok, subscription} =
|
||||
Subscriptions.subscribe(
|
||||
:websub,
|
||||
topic_url,
|
||||
callback_url,
|
||||
@subscription_lease_seconds
|
||||
)
|
||||
|
||||
assert {:ok, update} = Updates.publish(topic_url)
|
||||
|
||||
assert_enqueued(
|
||||
worker: Pleroma.Workers.DispatchFeedUpdateWorker,
|
||||
args: %{
|
||||
update_id: update.id,
|
||||
subscription_id: subscription.id,
|
||||
subscription_api: "websub",
|
||||
callback_url: callback_url,
|
||||
secret: nil
|
||||
}
|
||||
)
|
||||
|
||||
assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :feed_updates)
|
||||
|
||||
assert Tesla.Mock.Agent.hits(:publisher) == 1
|
||||
assert Tesla.Mock.Agent.hits(:subscriber) == 2
|
||||
|
||||
[_challenge, publish] = Tesla.Mock.Agent.access_list(:subscriber)
|
||||
assert Jason.decode!(publish.body) == @json_body
|
||||
assert Tesla.get_header(publish, "content-type") == "application/json"
|
||||
|
||||
assert Tesla.get_header(publish, "link") ==
|
||||
"<#{topic_url}>; rel=self, <https://cloud_hub.com/hub>; rel=hub"
|
||||
end
|
||||
end
|
||||
|
||||
def setup_html_publisher(_) do
|
||||
publisher_url = "http://localhost/publisher/posts"
|
||||
subscriber_url = "http://localhost/subscriber/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{url: ^publisher_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:publisher, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: @html_body,
|
||||
headers: [
|
||||
{"content-type", "text/html; charset=UTF-8"}
|
||||
]
|
||||
}
|
||||
|
||||
%{url: ^subscriber_url, method: :get, query: query} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
query = Map.new(query)
|
||||
|
||||
if Map.has_key?(query, "hub.challenge") do
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: Map.get(query, "hub.challenge"),
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
else
|
||||
%Tesla.Env{status: 400, body: "no challenge", headers: @content_type_text_plain}
|
||||
end
|
||||
|
||||
%{url: ^subscriber_url, method: :post} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: "ok",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
end)
|
||||
|
||||
[publisher_url: publisher_url, subscriber_url: subscriber_url]
|
||||
end
|
||||
|
||||
def setup_text_publisher(_) do
|
||||
publisher_url = "http://localhost/publisher/posts"
|
||||
subscriber_url = "http://localhost/subscriber/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{url: ^publisher_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:publisher, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: @text_body,
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
|
||||
%{url: ^subscriber_url, method: :get, query: query} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
query = Map.new(query)
|
||||
|
||||
if Map.has_key?(query, "hub.challenge") do
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: Map.get(query, "hub.challenge"),
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
else
|
||||
%Tesla.Env{status: 400, body: "no challenge", headers: @content_type_text_plain}
|
||||
end
|
||||
|
||||
%{url: ^subscriber_url, method: :post} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: "ok",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
end)
|
||||
|
||||
[publisher_url: publisher_url, subscriber_url: subscriber_url]
|
||||
end
|
||||
|
||||
def setup_json_publisher(_) do
|
||||
publisher_url = "http://localhost/publisher/posts"
|
||||
subscriber_url = "http://localhost/subscriber/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{url: ^publisher_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:publisher, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: Jason.encode!(@json_body),
|
||||
headers: [
|
||||
{"content-type", "application/json"}
|
||||
]
|
||||
}
|
||||
|
||||
%{url: ^subscriber_url, method: :get, query: query} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
query = Map.new(query)
|
||||
|
||||
if Map.has_key?(query, "hub.challenge") do
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: Map.get(query, "hub.challenge"),
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
else
|
||||
%Tesla.Env{status: 400, body: "no challenge", headers: @content_type_text_plain}
|
||||
end
|
||||
|
||||
%{url: ^subscriber_url, method: :post} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: "ok",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
end)
|
||||
|
||||
[publisher_url: publisher_url, subscriber_url: subscriber_url]
|
||||
end
|
||||
|
||||
def setup_xml_publisher(_) do
|
||||
publisher_url = "http://localhost/publisher/posts"
|
||||
subscriber_url = "http://localhost/subscriber/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{url: ^publisher_url} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:publisher, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: @xml_body,
|
||||
headers: [
|
||||
{"content-type", "application/rss+xml"}
|
||||
]
|
||||
}
|
||||
|
||||
%{url: ^subscriber_url, method: :get, query: query} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
query = Map.new(query)
|
||||
|
||||
if Map.has_key?(query, "hub.challenge") do
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: Map.get(query, "hub.challenge"),
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
else
|
||||
%Tesla.Env{status: 400, body: "no challenge", headers: @content_type_text_plain}
|
||||
end
|
||||
|
||||
%{url: ^subscriber_url, method: :post} = req ->
|
||||
Tesla.Mock.Agent.add_hit(:subscriber, req)
|
||||
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: "ok",
|
||||
headers: @content_type_text_plain
|
||||
}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
end)
|
||||
|
||||
[publisher_url: publisher_url, subscriber_url: subscriber_url]
|
||||
end
|
||||
end
|
|
@ -0,0 +1,118 @@
|
|||
defmodule Pleroma.Feed.RSSCloudControllerTest do
|
||||
use Pleroma.Web.ConnCase
|
||||
|
||||
setup do
|
||||
[subscriber_port: Enum.random(7000..8000)]
|
||||
end
|
||||
|
||||
test "subscribing to a specific topic with diff_domain = true", %{
|
||||
conn: conn,
|
||||
subscriber_port: subscriber_port
|
||||
} do
|
||||
callback_url = "http://localhost:#{subscriber_port}/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{method: :get, url: ^callback_url, query: query} ->
|
||||
query = Map.new(query)
|
||||
|
||||
if Map.has_key?(query, "challenge") && Map.has_key?(query, "url") do
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: Map.get(query, "challenge"),
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
else
|
||||
%Tesla.Env{status: 400, body: "no challenge", headers: [{"content-type", "text/plain"}]}
|
||||
end
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
end)
|
||||
|
||||
params = %{
|
||||
"protocol" => "http-rest",
|
||||
"domain" => "localhost",
|
||||
"port" => "#{subscriber_port}",
|
||||
"path" => "/callback",
|
||||
"notifyProcedure" => "",
|
||||
"url1" => "http://localhost:1234/topic"
|
||||
}
|
||||
|
||||
conn = form_post(conn, "/rsscloud/pleaseNotify", params)
|
||||
|
||||
assert response(conn, 200) =~ ""
|
||||
end
|
||||
|
||||
test "subscribing to a specific topic with diff_domain = false", %{
|
||||
conn: conn,
|
||||
subscriber_port: subscriber_port
|
||||
} do
|
||||
callback_url = "http://localhost:#{subscriber_port}/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{method: :post, url: ^callback_url} ->
|
||||
%Tesla.Env{status: 200, body: "ok", headers: [{"content-type", "text/plain"}]}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
end)
|
||||
|
||||
params = %{
|
||||
"protocol" => "http-rest",
|
||||
"port" => "#{subscriber_port}",
|
||||
"path" => "/callback",
|
||||
"notifyProcedure" => "",
|
||||
"url1" => "http://localhost:1234/topic"
|
||||
}
|
||||
|
||||
conn = form_post(conn, "/rsscloud/pleaseNotify", params)
|
||||
|
||||
assert response(conn, 200) =~ ""
|
||||
end
|
||||
|
||||
test "subscribing with an invalid response", %{
|
||||
conn: conn,
|
||||
subscriber_port: subscriber_port
|
||||
} do
|
||||
callback_url = "http://localhost:#{subscriber_port}/callback"
|
||||
|
||||
Tesla.Mock.mock(fn
|
||||
%{method: :get, url: ^callback_url} ->
|
||||
%Tesla.Env{status: 200, body: "wrong answer", headers: [{"content-type", "text/plain"}]}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
end)
|
||||
|
||||
params = %{
|
||||
"protocol" => "http-rest",
|
||||
"domain" => "localhost",
|
||||
"port" => "#{subscriber_port}",
|
||||
"path" => "/callback",
|
||||
"notifyProcedure" => "",
|
||||
"url1" => "http://localhost:1234/topic"
|
||||
}
|
||||
|
||||
conn = form_post(conn, "/rsscloud/pleaseNotify", params)
|
||||
|
||||
assert response(conn, 403) =~ "failed_challenge_body"
|
||||
end
|
||||
|
||||
defp form_post(conn, path, params) do
|
||||
conn
|
||||
|> Plug.Conn.put_req_header("content-type", "application/x-www-form-urlencoded")
|
||||
|> post(path, params)
|
||||
end
|
||||
end
|
|
@ -0,0 +1,81 @@
|
|||
defmodule Pleroma.Web.WebSubControllerTest do
|
||||
use Pleroma.Web.ConnCase
|
||||
|
||||
setup do
|
||||
[subscriber_url: "http://localhost/subscriber/callback"]
|
||||
end
|
||||
|
||||
test "subscribing to a specific topic", %{
|
||||
conn: conn,
|
||||
subscriber_url: callback_url
|
||||
} do
|
||||
Tesla.Mock.mock(fn
|
||||
%{method: :get, url: ^callback_url, query: query} ->
|
||||
query = Map.new(query)
|
||||
|
||||
if Map.has_key?(query, "hub.challenge") do
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: Map.get(query, "hub.challenge"),
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
else
|
||||
%Tesla.Env{status: 400, body: "no challenge", headers: [{"content-type", "text/plain"}]}
|
||||
end
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
end)
|
||||
|
||||
params = %{
|
||||
"hub.mode" => "subscribe",
|
||||
"hub.topic" => "http://localhost:1234/topic",
|
||||
"hub.callback" => callback_url
|
||||
}
|
||||
|
||||
conn = form_post(conn, "/hub", params)
|
||||
|
||||
assert response(conn, 202) =~ ""
|
||||
end
|
||||
|
||||
test "subscribing with an invalid response", %{
|
||||
conn: conn,
|
||||
subscriber_url: callback_url
|
||||
} do
|
||||
Tesla.Mock.mock(fn
|
||||
%{method: :get, url: ^callback_url} ->
|
||||
%Tesla.Env{
|
||||
status: 200,
|
||||
body: "whut?",
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
|
||||
_not_matched ->
|
||||
%Tesla.Env{
|
||||
status: 404,
|
||||
body: "not found",
|
||||
headers: [{"content-type", "text/plain"}]
|
||||
}
|
||||
end)
|
||||
|
||||
params = %{
|
||||
"hub.mode" => "subscribe",
|
||||
"hub.topic" => "http://localhost:1234/topic",
|
||||
"hub.callback" => callback_url
|
||||
}
|
||||
|
||||
conn = form_post(conn, "/hub", params)
|
||||
|
||||
assert response(conn, 403) =~ "failed_challenge_body"
|
||||
end
|
||||
|
||||
defp form_post(conn, path, params) do
|
||||
conn
|
||||
|> Plug.Conn.put_req_header("content-type", "application/x-www-form-urlencoded")
|
||||
|> post(path, params)
|
||||
end
|
||||
end
|
|
@ -120,6 +120,9 @@ defmodule Pleroma.Web.ConnCase do
|
|||
|
||||
Mox.verify_on_exit!()
|
||||
|
||||
Tesla.Mock.Agent.init()
|
||||
on_exit(fn -> Tesla.Mock.Agent.stop() end)
|
||||
|
||||
{:ok, conn: Phoenix.ConnTest.build_conn()}
|
||||
end
|
||||
end
|
||||
|
|
|
@ -100,6 +100,9 @@ defmodule Pleroma.DataCase do
|
|||
|
||||
Mox.verify_on_exit!()
|
||||
|
||||
Tesla.Mock.Agent.init()
|
||||
on_exit(fn -> Tesla.Mock.Agent.stop() end)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
defmodule Tesla.Mock.Agent do
|
||||
@moduledoc false
|
||||
|
||||
# require Logger
|
||||
|
||||
def init() do
|
||||
Agent.start_link(fn -> [] end, name: __MODULE__)
|
||||
end
|
||||
|
||||
def stop() do
|
||||
# Agent.stop(__MODULE__)
|
||||
end
|
||||
|
||||
def add_hit(key, env) do
|
||||
# if Enum.member?([:post, :put], env.method) do
|
||||
# Logger.error("#{key} #{env.method} #{env.url} #{inspect(env.body)}")
|
||||
# else
|
||||
# Logger.error("#{key} #{env.method} #{env.url} #{inspect(env.query)}")
|
||||
# end
|
||||
|
||||
Agent.update(__MODULE__, fn state -> [{key, env} | state] end)
|
||||
end
|
||||
|
||||
def access_list(key) do
|
||||
Agent.get(__MODULE__, fn state ->
|
||||
Enum.filter(state, fn {k, _v} -> k == key end)
|
||||
|> Enum.map(fn {_k, v} -> v end)
|
||||
|> Enum.reverse()
|
||||
end)
|
||||
end
|
||||
|
||||
def hits(key) do
|
||||
access_list(key)
|
||||
|> Enum.count()
|
||||
end
|
||||
end
|
Loading…
Reference in New Issue