Merge remote-tracking branch 'origin' into validate-user-info

This commit is contained in:
lain 2018-11-30 17:34:20 +01:00
commit d0ec2812bd
46 changed files with 1016 additions and 91 deletions

View file

@ -9,6 +9,11 @@ variables:
POSTGRES_PASSWORD: postgres POSTGRES_PASSWORD: postgres
DB_HOST: postgres DB_HOST: postgres
cache:
key: ${CI_COMMIT_REF_SLUG}
paths:
- deps
- _build
stages: stages:
- lint - lint
- test - test

View file

@ -52,6 +52,7 @@
url: [host: "localhost"], url: [host: "localhost"],
protocol: "https", protocol: "https",
secret_key_base: "aK4Abxf29xU9TTDKre9coZPUgevcVCFQJe/5xP/7Lt4BEif6idBIbjupVbOrbKxl", secret_key_base: "aK4Abxf29xU9TTDKre9coZPUgevcVCFQJe/5xP/7Lt4BEif6idBIbjupVbOrbKxl",
signing_salt: "CqaoopA2",
render_errors: [view: Pleroma.Web.ErrorView, accepts: ~w(json)], render_errors: [view: Pleroma.Web.ErrorView, accepts: ~w(json)],
pubsub: [name: Pleroma.PubSub, adapter: Phoenix.PubSub.PG2], pubsub: [name: Pleroma.PubSub, adapter: Phoenix.PubSub.PG2],
secure_cookie_flag: true secure_cookie_flag: true
@ -72,18 +73,10 @@
config :pleroma, :ostatus, Pleroma.Web.OStatus config :pleroma, :ostatus, Pleroma.Web.OStatus
config :pleroma, :httpoison, Pleroma.HTTP config :pleroma, :httpoison, Pleroma.HTTP
version =
with {version, 0} <- System.cmd("git", ["rev-parse", "HEAD"]) do
"Pleroma #{Mix.Project.config()[:version]} #{String.trim(version)}"
else
_ -> "Pleroma #{Mix.Project.config()[:version]} dev"
end
# Configures http settings, upstream proxy etc. # Configures http settings, upstream proxy etc.
config :pleroma, :http, proxy_url: nil config :pleroma, :http, proxy_url: nil
config :pleroma, :instance, config :pleroma, :instance,
version: version,
name: "Pleroma", name: "Pleroma",
email: "example@example.com", email: "example@example.com",
description: "A Pleroma instance, an alternative fediverse server", description: "A Pleroma instance, an alternative fediverse server",

View file

@ -87,3 +87,16 @@ This section is used to configure Pleroma-FE, unless ``:managed_config`` in ``:i
* ``sts_max_age``: The maximum age for the `Strict-Transport-Security` header if sent * ``sts_max_age``: The maximum age for the `Strict-Transport-Security` header if sent
* ``ct_max_age``: The maximum age for the `Expect-CT` header if sent * ``ct_max_age``: The maximum age for the `Expect-CT` header if sent
* ``referrer_policy``: The referrer policy to use, either `"same-origin"` or `"no-referrer"`. * ``referrer_policy``: The referrer policy to use, either `"same-origin"` or `"no-referrer"`.
## :mrf_user_allowlist
The keys in this section are the domain names that the policy should apply to.
Each key should be assigned a list of users that should be allowed through by
their ActivityPub ID.
An example:
```
config :pleroma, :mrf_user_allowlist,
"example.org": ["https://example.org/users/admin"]
```

View file

@ -14,9 +14,11 @@ defmodule Mix.Tasks.RelayFollow do
def run([target]) do def run([target]) do
Mix.Task.run("app.start") Mix.Task.run("app.start")
:ok = Relay.follow(target) with {:ok, activity} <- Relay.follow(target) do
# put this task to sleep to allow the genserver to push out the messages # put this task to sleep to allow the genserver to push out the messages
:timer.sleep(500) :timer.sleep(500)
else
{:error, e} -> Mix.shell().error("Error while following #{target}: #{inspect(e)}")
end
end end
end end

View file

@ -13,9 +13,11 @@ defmodule Mix.Tasks.RelayUnfollow do
def run([target]) do def run([target]) do
Mix.Task.run("app.start") Mix.Task.run("app.start")
:ok = Relay.unfollow(target) with {:ok, activity} <- Relay.follow(target) do
# put this task to sleep to allow the genserver to push out the messages # put this task to sleep to allow the genserver to push out the messages
:timer.sleep(500) :timer.sleep(500)
else
{:error, e} -> Mix.shell().error("Error while following #{target}: #{inspect(e)}")
end
end end
end end

View file

@ -0,0 +1,32 @@
defmodule Mix.Tasks.SetAdmin do
use Mix.Task
alias Pleroma.User
@doc """
Sets admin status
Usage: set_admin nickname [true|false]
"""
def run([nickname | rest]) do
Application.ensure_all_started(:pleroma)
status =
case rest do
[status] -> status == "true"
_ -> true
end
with %User{local: true} = user <- User.get_by_nickname(nickname) do
info =
user.info
|> Map.put("is_admin", !!status)
cng = User.info_changeset(user, %{info: info})
{:ok, user} = User.update_and_set_cache(cng)
IO.puts("Admin status of #{nickname}: #{user.info["is_admin"]}")
else
_ ->
IO.puts("No local user #{nickname}")
end
end
end

View file

@ -1,8 +1,15 @@
defmodule Pleroma.Application do defmodule Pleroma.Application do
use Application use Application
@name "Pleroma"
@version Mix.Project.config()[:version]
def name, do: @name
def version, do: @version
def named_version(), do: @name <> " " <> @version
# See http://elixir-lang.org/docs/stable/elixir/Application.html # See http://elixir-lang.org/docs/stable/elixir/Application.html
# for more information on OTP Applications # for more information on OTP Applications
@env Mix.env()
def start(_type, _args) do def start(_type, _args) do
import Supervisor.Spec import Supervisor.Spec
import Cachex.Spec import Cachex.Spec
@ -57,10 +64,11 @@ def start(_type, _args) do
id: :cachex_idem id: :cachex_idem
), ),
worker(Pleroma.Web.Federator, []), worker(Pleroma.Web.Federator, []),
worker(Pleroma.Stats, []), worker(Pleroma.Web.Federator.RetryQueue, []),
worker(Pleroma.Gopher.Server, []) worker(Pleroma.Gopher.Server, []),
worker(Pleroma.Stats, [])
] ++ ] ++
if Mix.env() == :test, if @env == :test,
do: [], do: [],
else: else:
[worker(Pleroma.Web.Streamer, [])] ++ [worker(Pleroma.Web.Streamer, [])] ++

View file

@ -31,10 +31,12 @@ def normalize(obj) when is_map(obj), do: Object.get_by_ap_id(obj["id"])
def normalize(ap_id) when is_binary(ap_id), do: Object.get_by_ap_id(ap_id) def normalize(ap_id) when is_binary(ap_id), do: Object.get_by_ap_id(ap_id)
def normalize(_), do: nil def normalize(_), do: nil
def get_cached_by_ap_id(ap_id) do
if Mix.env() == :test do if Mix.env() == :test do
def get_cached_by_ap_id(ap_id) do
get_by_ap_id(ap_id) get_by_ap_id(ap_id)
end
else else
def get_cached_by_ap_id(ap_id) do
key = "object:#{ap_id}" key = "object:#{ap_id}"
Cachex.fetch!(:object_cache, key, fn _ -> Cachex.fetch!(:object_cache, key, fn _ ->

View file

@ -29,6 +29,8 @@ defp headers do
end end
defp csp_string do defp csp_string do
protocol = Config.get([Pleroma.Web.Endpoint, :protocol])
[ [
"default-src 'none'", "default-src 'none'",
"base-uri 'self'", "base-uri 'self'",
@ -39,7 +41,10 @@ defp csp_string do
"font-src 'self'", "font-src 'self'",
"script-src 'self'", "script-src 'self'",
"connect-src 'self' " <> String.replace(Pleroma.Web.Endpoint.static_url(), "http", "ws"), "connect-src 'self' " <> String.replace(Pleroma.Web.Endpoint.static_url(), "http", "ws"),
"manifest-src 'self'",
if @protocol == "https" do
"upgrade-insecure-requests" "upgrade-insecure-requests"
end
] ]
|> Enum.join("; ") |> Enum.join("; ")
end end

View file

@ -0,0 +1,19 @@
defmodule Pleroma.Plugs.UserIsAdminPlug do
import Plug.Conn
alias Pleroma.User
def init(options) do
options
end
def call(%{assigns: %{user: %User{info: %{"is_admin" => true}}}} = conn, _) do
conn
end
def call(conn, _) do
conn
|> put_resp_content_type("application/json")
|> send_resp(403, Jason.encode!(%{error: "User is not admin."}))
|> halt
end
end

View file

@ -162,7 +162,13 @@ def get_content_type(file) do
"audio/mpeg" "audio/mpeg"
<<0x4F, 0x67, 0x67, 0x53, 0x00, 0x02, 0x00, 0x00>> -> <<0x4F, 0x67, 0x67, 0x53, 0x00, 0x02, 0x00, 0x00>> ->
case IO.binread(f, 27) do
<<_::size(160), 0x80, 0x74, 0x68, 0x65, 0x6F, 0x72, 0x61>> ->
"video/ogg"
_ ->
"audio/ogg" "audio/ogg"
end
<<0x52, 0x49, 0x46, 0x46, _, _, _, _>> -> <<0x52, 0x49, 0x46, 0x46, _, _, _, _>> ->
"audio/wav" "audio/wav"

View file

@ -0,0 +1,23 @@
defmodule Pleroma.Web.ActivityPub.MRF.UserAllowListPolicy do
alias Pleroma.Config
@behaviour Pleroma.Web.ActivityPub.MRF
defp filter_by_list(object, []), do: {:ok, object}
defp filter_by_list(%{"actor" => actor} = object, allow_list) do
if actor in allow_list do
{:ok, object}
else
{:reject, nil}
end
end
@impl true
def filter(object) do
actor_info = URI.parse(object["actor"])
allow_list = Config.get([:mrf_user_allowlist, String.to_atom(actor_info.host)], [])
filter_by_list(object, allow_list)
end
end

View file

@ -12,11 +12,12 @@ def follow(target_instance) do
%User{} = target_user <- User.get_or_fetch_by_ap_id(target_instance), %User{} = target_user <- User.get_or_fetch_by_ap_id(target_instance),
{:ok, activity} <- ActivityPub.follow(local_user, target_user) do {:ok, activity} <- ActivityPub.follow(local_user, target_user) do
Logger.info("relay: followed instance: #{target_instance}; id=#{activity.data["id"]}") Logger.info("relay: followed instance: #{target_instance}; id=#{activity.data["id"]}")
{:ok, activity}
else else
e -> Logger.error("error: #{inspect(e)}") e ->
Logger.error("error: #{inspect(e)}")
{:error, e}
end end
:ok
end end
def unfollow(target_instance) do def unfollow(target_instance) do
@ -24,11 +25,12 @@ def unfollow(target_instance) do
%User{} = target_user <- User.get_or_fetch_by_ap_id(target_instance), %User{} = target_user <- User.get_or_fetch_by_ap_id(target_instance),
{:ok, activity} <- ActivityPub.unfollow(local_user, target_user) do {:ok, activity} <- ActivityPub.unfollow(local_user, target_user) do
Logger.info("relay: unfollowed instance: #{target_instance}: id=#{activity.data["id"]}") Logger.info("relay: unfollowed instance: #{target_instance}: id=#{activity.data["id"]}")
{:ok, activity}
else else
e -> Logger.error("error: #{inspect(e)}") e ->
Logger.error("error: #{inspect(e)}")
{:error, e}
end end
:ok
end end
def publish(%Activity{data: %{"type" => "Create"}} = activity) do def publish(%Activity{data: %{"type" => "Create"}} = activity) do

View file

@ -0,0 +1,158 @@
defmodule Pleroma.Web.AdminAPI.AdminAPIController do
use Pleroma.Web, :controller
alias Pleroma.{User, Repo}
alias Pleroma.Web.ActivityPub.Relay
require Logger
action_fallback(:errors)
def user_delete(conn, %{"nickname" => nickname}) do
user = User.get_by_nickname(nickname)
if user.local == true do
User.delete(user)
else
User.delete(user)
end
conn
|> json(nickname)
end
def user_create(
conn,
%{"nickname" => nickname, "email" => email, "password" => password}
) do
new_user = %{
nickname: nickname,
name: nickname,
email: email,
password: password,
password_confirmation: password,
bio: "."
}
User.register_changeset(%User{}, new_user)
|> Repo.insert!()
conn
|> json(new_user.nickname)
end
def right_add(conn, %{"permission_group" => permission_group, "nickname" => nickname})
when permission_group in ["moderator", "admin"] do
user = User.get_by_nickname(nickname)
info =
user.info
|> Map.put("is_" <> permission_group, true)
cng = User.info_changeset(user, %{info: info})
{:ok, user} = User.update_and_set_cache(cng)
conn
|> json(user.info)
end
def right_get(conn, %{"nickname" => nickname}) do
user = User.get_by_nickname(nickname)
conn
|> json(user.info)
end
def right_add(conn, _) do
conn
|> put_status(404)
|> json(%{error: "No such permission_group"})
end
def right_delete(
%{assigns: %{user: %User{:nickname => admin_nickname}}} = conn,
%{
"permission_group" => permission_group,
"nickname" => nickname
}
)
when permission_group in ["moderator", "admin"] do
if admin_nickname == nickname do
conn
|> put_status(403)
|> json(%{error: "You can't revoke your own admin status."})
else
user = User.get_by_nickname(nickname)
info =
user.info
|> Map.put("is_" <> permission_group, false)
cng = User.info_changeset(user, %{info: info})
{:ok, user} = User.update_and_set_cache(cng)
conn
|> json(user.info)
end
end
def right_delete(conn, _) do
conn
|> put_status(404)
|> json(%{error: "No such permission_group"})
end
def relay_follow(conn, %{"relay_url" => target}) do
{status, message} = Relay.follow(target)
if status == :ok do
conn
|> json(target)
else
conn
|> put_status(500)
|> json(target)
end
end
def relay_unfollow(conn, %{"relay_url" => target}) do
{status, message} = Relay.unfollow(target)
if status == :ok do
conn
|> json(target)
else
conn
|> put_status(500)
|> json(target)
end
end
@shortdoc "Get a account registeration invite token (base64 string)"
def get_invite_token(conn, _params) do
{:ok, token} = Pleroma.UserInviteToken.create_token()
conn
|> json(token.token)
end
@shortdoc "Get a password reset token (base64 string) for given nickname"
def get_password_reset(conn, %{"nickname" => nickname}) do
(%User{local: true} = user) = User.get_by_nickname(nickname)
{:ok, token} = Pleroma.PasswordResetToken.create_token(user)
conn
|> json(token.token)
end
def errors(conn, {:param_cast, _}) do
conn
|> put_status(400)
|> json("Invalid parameters")
end
def errors(conn, _) do
conn
|> put_status(500)
|> json("Something went wrong")
end
end

View file

@ -4,9 +4,7 @@ defmodule Pleroma.Web.UserSocket do
## Channels ## Channels
# channel "room:*", Pleroma.Web.RoomChannel # channel "room:*", Pleroma.Web.RoomChannel
if Application.get_env(:pleroma, :chat) |> Keyword.get(:enabled) do
channel("chat:*", Pleroma.Web.ChatChannel) channel("chat:*", Pleroma.Web.ChatChannel)
end
## Transports ## Transports
transport(:websocket, Phoenix.Transports.WebSocket) transport(:websocket, Phoenix.Transports.WebSocket)
@ -24,7 +22,8 @@ defmodule Pleroma.Web.UserSocket do
# See `Phoenix.Token` documentation for examples in # See `Phoenix.Token` documentation for examples in
# performing token verification on connect. # performing token verification on connect.
def connect(%{"token" => token}, socket) do def connect(%{"token" => token}, socket) do
with {:ok, user_id} <- Phoenix.Token.verify(socket, "user socket", token, max_age: 84600), with true <- Pleroma.Config.get([:chat, :enabled]),
{:ok, user_id} <- Phoenix.Token.verify(socket, "user socket", token, max_age: 84600),
%User{} = user <- Pleroma.Repo.get(User, user_id) do %User{} = user <- Pleroma.Repo.get(User, user_id) do
{:ok, assign(socket, :user_name, user.nickname)} {:ok, assign(socket, :user_name, user.nickname)}
else else

View file

@ -1,9 +1,7 @@
defmodule Pleroma.Web.Endpoint do defmodule Pleroma.Web.Endpoint do
use Phoenix.Endpoint, otp_app: :pleroma use Phoenix.Endpoint, otp_app: :pleroma
if Application.get_env(:pleroma, :chat) |> Keyword.get(:enabled) do
socket("/socket", Pleroma.Web.UserSocket) socket("/socket", Pleroma.Web.UserSocket)
end
socket("/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket) socket("/api/v1", Pleroma.Web.MastodonAPI.MastodonSocket)
@ -58,7 +56,7 @@ defmodule Pleroma.Web.Endpoint do
Plug.Session, Plug.Session,
store: :cookie, store: :cookie,
key: cookie_name, key: cookie_name,
signing_salt: "CqaoopA2", signing_salt: {Pleroma.Config, :get, [[__MODULE__, :signing_salt], "CqaoopA2"]},
http_only: true, http_only: true,
secure: secure:
Application.get_env(:pleroma, Pleroma.Web.Endpoint) |> Keyword.get(:secure_cookie_flag), Application.get_env(:pleroma, Pleroma.Web.Endpoint) |> Keyword.get(:secure_cookie_flag),

View file

@ -3,6 +3,7 @@ defmodule Pleroma.Web.Federator do
alias Pleroma.User alias Pleroma.User
alias Pleroma.Activity alias Pleroma.Activity
alias Pleroma.Web.{WebFinger, Websub} alias Pleroma.Web.{WebFinger, Websub}
alias Pleroma.Web.Federator.RetryQueue
alias Pleroma.Web.ActivityPub.ActivityPub alias Pleroma.Web.ActivityPub.ActivityPub
alias Pleroma.Web.ActivityPub.Relay alias Pleroma.Web.ActivityPub.Relay
alias Pleroma.Web.ActivityPub.Transmogrifier alias Pleroma.Web.ActivityPub.Transmogrifier
@ -122,29 +123,25 @@ def handle(:incoming_ap_doc, params) do
end end
def handle(:publish_single_ap, params) do def handle(:publish_single_ap, params) do
ActivityPub.publish_one(params) case ActivityPub.publish_one(params) do
{:ok, _} ->
:ok
{:error, _} ->
RetryQueue.enqueue(params, ActivityPub)
end
end end
def handle(:publish_single_websub, %{xml: xml, topic: topic, callback: callback, secret: secret}) do def handle(
signature = @websub.sign(secret || "", xml) :publish_single_websub,
Logger.debug(fn -> "Pushing #{topic} to #{callback}" end) %{xml: xml, topic: topic, callback: callback, secret: secret} = params
with {:ok, %{status_code: code}} <-
@httpoison.post(
callback,
xml,
[
{"Content-Type", "application/atom+xml"},
{"X-Hub-Signature", "sha1=#{signature}"}
],
timeout: 10000,
recv_timeout: 20000,
hackney: [pool: :default]
) do ) do
Logger.debug(fn -> "Pushed to #{callback}, code #{code}" end) case Websub.publish_one(params) do
else {:ok, _} ->
e -> :ok
Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
{:error, _} ->
RetryQueue.enqueue(params, Websub)
end end
end end
@ -153,11 +150,15 @@ def handle(type, _) do
{:error, "Don't know what to do with this"} {:error, "Don't know what to do with this"}
end end
if Mix.env() == :test do
def enqueue(type, payload, priority \\ 1) do def enqueue(type, payload, priority \\ 1) do
if Pleroma.Config.get([:instance, :federating]) do if Pleroma.Config.get([:instance, :federating]) do
if Mix.env() == :test do
handle(type, payload) handle(type, payload)
end
end
else else
def enqueue(type, payload, priority \\ 1) do
if Pleroma.Config.get([:instance, :federating]) do
GenServer.cast(__MODULE__, {:enqueue, type, payload, priority}) GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
end end
end end

View file

@ -0,0 +1,71 @@
defmodule Pleroma.Web.Federator.RetryQueue do
use GenServer
alias Pleroma.Web.{WebFinger, Websub}
alias Pleroma.Web.ActivityPub.ActivityPub
require Logger
@websub Application.get_env(:pleroma, :websub)
@ostatus Application.get_env(:pleroma, :websub)
@httpoison Application.get_env(:pleroma, :websub)
@instance Application.get_env(:pleroma, :websub)
# initial timeout, 5 min
@initial_timeout 30_000
@max_retries 5
def init(args) do
{:ok, args}
end
def start_link() do
GenServer.start_link(__MODULE__, %{delivered: 0, dropped: 0}, name: __MODULE__)
end
def enqueue(data, transport, retries \\ 0) do
GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
end
def get_retry_params(retries) do
if retries > @max_retries do
{:drop, "Max retries reached"}
else
{:retry, growth_function(retries)}
end
end
def handle_cast({:maybe_enqueue, data, transport, retries}, %{dropped: drop_count} = state) do
case get_retry_params(retries) do
{:retry, timeout} ->
Process.send_after(
__MODULE__,
{:send, data, transport, retries},
growth_function(retries)
)
{:noreply, state}
{:drop, message} ->
Logger.debug(message)
{:noreply, %{state | dropped: drop_count + 1}}
end
end
def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
case transport.publish_one(data) do
{:ok, _} ->
{:noreply, %{state | delivered: delivery_count + 1}}
{:error, reason} ->
enqueue(data, transport, retries)
{:noreply, state}
end
end
def handle_info(unknown, state) do
Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
{:noreply, state}
end
defp growth_function(retries) do
round(@initial_timeout * :math.pow(retries, 3))
end
end

View file

@ -141,7 +141,7 @@ def masto_instance(conn, _params) do
uri: Web.base_url(), uri: Web.base_url(),
title: Keyword.get(instance, :name), title: Keyword.get(instance, :name),
description: Keyword.get(instance, :description), description: Keyword.get(instance, :description),
version: "#{@mastodon_api_level} (compatible; #{Keyword.get(instance, :version)})", version: "#{@mastodon_api_level} (compatible; #{Pleroma.Application.named_version()})",
email: Keyword.get(instance, :email), email: Keyword.get(instance, :email),
urls: %{ urls: %{
streaming_api: String.replace(Pleroma.Web.Endpoint.static_url(), "http", "ws") streaming_api: String.replace(Pleroma.Web.Endpoint.static_url(), "http", "ws")

View file

@ -11,9 +11,8 @@ defmodule Pleroma.Web.MastodonAPI.MastodonSocket do
timeout: :infinity timeout: :infinity
) )
def connect(params, socket) do def connect(%{"access_token" => token} = params, socket) do
with token when not is_nil(token) <- params["access_token"], with %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
%Token{user_id: user_id} <- Repo.get_by(Token, token: token),
%User{} = user <- Repo.get(User, user_id), %User{} = user <- Repo.get(User, user_id),
stream stream
when stream in [ when stream in [
@ -45,6 +44,24 @@ def connect(params, socket) do
end end
end end
def connect(%{"stream" => stream} = params, socket)
when stream in ["public", "public:local", "hashtag"] do
topic =
case stream do
"hashtag" -> "hashtag:#{params["tag"]}"
_ -> stream
end
with socket =
socket
|> assign(:topic, topic) do
Pleroma.Web.Streamer.add_socket(topic, socket)
{:ok, socket}
else
_e -> :error
end
end
def id(_), do: nil def id(_), do: nil
def handle(:text, message, _state) do def handle(:text, message, _state) do

View file

@ -3,6 +3,8 @@ defmodule Pleroma.Web.MediaProxy do
def url(nil), do: nil def url(nil), do: nil
def url(""), do: nil
def url(url = "/" <> _), do: url def url(url = "/" <> _), do: url
def url(url) do def url(url) do
@ -15,10 +17,10 @@ def url(url) do
base64 = Base.url_encode64(url, @base64_opts) base64 = Base.url_encode64(url, @base64_opts)
sig = :crypto.hmac(:sha, secret, base64) sig = :crypto.hmac(:sha, secret, base64)
sig64 = sig |> Base.url_encode64(@base64_opts) sig64 = sig |> Base.url_encode64(@base64_opts)
filename = Path.basename(URI.parse(url).path) filename = if path = URI.parse(url).path, do: "/" <> Path.basename(path), else: ""
Keyword.get(config, :base_url, Pleroma.Web.base_url()) <> Keyword.get(config, :base_url, Pleroma.Web.base_url()) <>
"/proxy/#{sig64}/#{base64}/#{filename}" "/proxy/#{sig64}/#{base64}#{filename}"
end end
end end

View file

@ -4,6 +4,7 @@ defmodule Pleroma.Web.Nodeinfo.NodeinfoController do
alias Pleroma.Stats alias Pleroma.Stats
alias Pleroma.Web alias Pleroma.Web
alias Pleroma.{User, Repo} alias Pleroma.{User, Repo}
alias Pleroma.Config
alias Pleroma.Web.ActivityPub.MRF alias Pleroma.Web.ActivityPub.MRF
plug(Pleroma.Web.FederatingPlug) plug(Pleroma.Web.FederatingPlug)
@ -52,6 +53,10 @@ def nodeinfo(conn, %{"version" => "2.0"}) do
|> Repo.all() |> Repo.all()
|> Enum.map(fn u -> u.ap_id end) |> Enum.map(fn u -> u.ap_id end)
mrf_user_allowlist =
Config.get([:mrf_user_allowlist], [])
|> Enum.into(%{}, fn {k, v} -> {k, length(v)} end)
mrf_transparency = Keyword.get(instance, :mrf_transparency) mrf_transparency = Keyword.get(instance, :mrf_transparency)
federation_response = federation_response =
@ -59,6 +64,7 @@ def nodeinfo(conn, %{"version" => "2.0"}) do
%{ %{
mrf_policies: mrf_policies, mrf_policies: mrf_policies,
mrf_simple: mrf_simple, mrf_simple: mrf_simple,
mrf_user_allowlist: mrf_user_allowlist,
quarantined_instances: quarantined quarantined_instances: quarantined
} }
else else
@ -86,8 +92,8 @@ def nodeinfo(conn, %{"version" => "2.0"}) do
response = %{ response = %{
version: "2.0", version: "2.0",
software: %{ software: %{
name: "pleroma", name: Pleroma.Application.name(),
version: Keyword.get(instance, :version) version: Pleroma.Application.version()
}, },
protocols: ["ostatus", "activitypub"], protocols: ["ostatus", "activitypub"],
services: %{ services: %{

View file

@ -136,7 +136,7 @@ def notice(conn, %{"id" => id}) do
"html" -> "html" ->
conn conn
|> put_resp_content_type("text/html") |> put_resp_content_type("text/html")
|> send_file(200, "priv/static/index.html") |> send_file(200, Application.app_dir(:pleroma, "priv/static/index.html"))
_ -> _ ->
represent_activity(conn, format, activity, user) represent_activity(conn, format, activity, user)

View file

@ -31,6 +31,21 @@ defmodule Pleroma.Web.Router do
plug(Pleroma.Plugs.EnsureAuthenticatedPlug) plug(Pleroma.Plugs.EnsureAuthenticatedPlug)
end end
pipeline :admin_api do
plug(:accepts, ["json"])
plug(:fetch_session)
plug(Pleroma.Plugs.OAuthPlug)
plug(Pleroma.Plugs.BasicAuthDecoderPlug)
plug(Pleroma.Plugs.UserFetcherPlug)
plug(Pleroma.Plugs.SessionAuthenticationPlug)
plug(Pleroma.Plugs.LegacyAuthenticationPlug)
plug(Pleroma.Plugs.AuthenticationPlug)
plug(Pleroma.Plugs.UserEnabledPlug)
plug(Pleroma.Plugs.SetUserSessionIdPlug)
plug(Pleroma.Plugs.EnsureAuthenticatedPlug)
plug(Pleroma.Plugs.UserIsAdminPlug)
end
pipeline :mastodon_html do pipeline :mastodon_html do
plug(:accepts, ["html"]) plug(:accepts, ["html"])
plug(:fetch_session) plug(:fetch_session)
@ -79,6 +94,23 @@ defmodule Pleroma.Web.Router do
get("/emoji", UtilController, :emoji) get("/emoji", UtilController, :emoji)
end end
scope "/api/pleroma/admin", Pleroma.Web.AdminAPI do
pipe_through(:admin_api)
delete("/user", AdminAPIController, :user_delete)
post("/user", AdminAPIController, :user_create)
get("/permission_group/:nickname", AdminAPIController, :right_get)
get("/permission_group/:nickname/:permission_group", AdminAPIController, :right_get)
post("/permission_group/:nickname/:permission_group", AdminAPIController, :right_add)
delete("/permission_group/:nickname/:permission_group", AdminAPIController, :right_delete)
post("/relay", AdminAPIController, :relay_follow)
delete("/relay", AdminAPIController, :relay_unfollow)
get("/invite_token", AdminAPIController, :get_invite_token)
get("/password_reset", AdminAPIController, :get_password_reset)
end
scope "/", Pleroma.Web.TwitterAPI do scope "/", Pleroma.Web.TwitterAPI do
pipe_through(:pleroma_html) pipe_through(:pleroma_html)
get("/ostatus_subscribe", UtilController, :remote_follow) get("/ostatus_subscribe", UtilController, :remote_follow)
@ -398,11 +430,9 @@ defmodule Fallback.RedirectController do
use Pleroma.Web, :controller use Pleroma.Web, :controller
def redirector(conn, _params) do def redirector(conn, _params) do
if Mix.env() != :test do
conn conn
|> put_resp_content_type("text/html") |> put_resp_content_type("text/html")
|> send_file(200, "priv/static/index.html") |> send_file(200, Application.app_dir(:pleroma, "priv/static/index.html"))
end
end end
def registration_page(conn, params) do def registration_page(conn, params) do

View file

@ -73,7 +73,8 @@ def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
Pleroma.List.get_lists_from_activity(item) Pleroma.List.get_lists_from_activity(item)
|> Enum.filter(fn list -> |> Enum.filter(fn list ->
owner = Repo.get(User, list.user_id) owner = Repo.get(User, list.user_id)
author.follower_address in owner.following
ActivityPub.visible_for_user?(item, owner)
end) end)
end end
@ -169,9 +170,23 @@ defp represent_update(%Activity{} = activity, %User{} = user) do
|> Jason.encode!() |> Jason.encode!()
end end
defp represent_update(%Activity{} = activity) do
%{
event: "update",
payload:
Pleroma.Web.MastodonAPI.StatusView.render(
"status.json",
activity: activity
)
|> Jason.encode!()
}
|> Jason.encode!()
end
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
Enum.each(topics[topic] || [], fn socket -> Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc. # Get the current user so we have up-to-date blocks etc.
if socket.assigns[:user] do
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
blocks = user.info.blocks || [] blocks = user.info.blocks || []
@ -180,18 +195,25 @@ def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = ite
unless is_nil(parent) or item.actor in blocks or parent.data["actor"] in blocks do unless is_nil(parent) or item.actor in blocks or parent.data["actor"] in blocks do
send(socket.transport_pid, {:text, represent_update(item, user)}) send(socket.transport_pid, {:text, represent_update(item, user)})
end end
else
send(socket.transport_pid, {:text, represent_update(item)})
end
end) end)
end end
def push_to_socket(topics, topic, item) do def push_to_socket(topics, topic, item) do
Enum.each(topics[topic] || [], fn socket -> Enum.each(topics[topic] || [], fn socket ->
# Get the current user so we have up-to-date blocks etc. # Get the current user so we have up-to-date blocks etc.
if socket.assigns[:user] do
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id) user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
blocks = user.info.blocks || [] blocks = user.info.blocks || []
unless item.actor in blocks do unless item.actor in blocks do
send(socket.transport_pid, {:text, represent_update(item, user)}) send(socket.transport_pid, {:text, represent_update(item, user)})
end end
else
send(socket.transport_pid, {:text, represent_update(item)})
end
end) end)
end end

View file

@ -2,7 +2,9 @@
<html> <html>
<head> <head>
<meta charset=utf-8 /> <meta charset=utf-8 />
<title>Pleroma</title> <title>
<%= Application.get_env(:pleroma, :instance)[:name] %>
</title>
<style> <style>
body { body {
background-color: #282c37; background-color: #282c37;

View file

@ -197,7 +197,7 @@ def config(conn, _params) do
end end
def version(conn, _params) do def version(conn, _params) do
version = Pleroma.Config.get([:instance, :version]) version = Pleroma.Application.named_version()
case get_format(conn) do case get_format(conn) do
"xml" -> "xml" ->

View file

@ -252,4 +252,29 @@ def refresh_subscriptions(delta \\ 60 * 60 * 24) do
Pleroma.Web.Federator.enqueue(:request_subscription, sub) Pleroma.Web.Federator.enqueue(:request_subscription, sub)
end) end)
end end
def publish_one(%{xml: xml, topic: topic, callback: callback, secret: secret}) do
signature = sign(secret || "", xml)
Logger.info(fn -> "Pushing #{topic} to #{callback}" end)
with {:ok, %{status_code: code}} <-
@httpoison.post(
callback,
xml,
[
{"Content-Type", "application/atom+xml"},
{"X-Hub-Signature", "sha1=#{signature}"}
],
timeout: 10000,
recv_timeout: 20000,
hackney: [pool: :default]
) do
Logger.info(fn -> "Pushed to #{callback}, code #{code}" end)
{:ok, code}
else
e ->
Logger.debug(fn -> "Couldn't push to #{callback}, #{inspect(e)}" end)
{:error, e}
end
end
end end

49
mix.exs
View file

@ -4,7 +4,7 @@ defmodule Pleroma.Mixfile do
def project do def project do
[ [
app: :pleroma, app: :pleroma,
version: "0.9.0", version: version("0.9.0"),
elixir: "~> 1.4", elixir: "~> 1.4",
elixirc_paths: elixirc_paths(Mix.env()), elixirc_paths: elixirc_paths(Mix.env()),
compilers: [:phoenix, :gettext] ++ Mix.compilers(), compilers: [:phoenix, :gettext] ++ Mix.compilers(),
@ -84,4 +84,51 @@ defp aliases do
test: ["ecto.create --quiet", "ecto.migrate", "test"] test: ["ecto.create --quiet", "ecto.migrate", "test"]
] ]
end end
# Builds a version string made of:
# * the application version
# * a pre-release if ahead of the tag: the describe string (-count-commithash)
# * build info:
# * a build name if `PLEROMA_BUILD_NAME` or `:pleroma, :build_name` is defined
# * the mix environment if different than prod
defp version(version) do
{git_tag, git_pre_release} =
with {tag, 0} <- System.cmd("git", ["describe", "--tags", "--abbrev=0"]),
tag = String.trim(tag),
{describe, 0} <- System.cmd("git", ["describe", "--tags", "--abbrev=8"]),
describe = String.trim(describe),
ahead <- String.replace(describe, tag, "") do
{String.replace_prefix(tag, "v", ""), if(ahead != "", do: String.trim(ahead))}
else
_ -> {nil, nil}
end
if git_tag && version != git_tag do
Mix.shell().error(
"Application version #{inspect(version)} does not match git tag #{inspect(git_tag)}"
)
end
build_name =
cond do
name = Application.get_env(:pleroma, :build_name) -> name
name = System.get_env("PLEROMA_BUILD_NAME") -> name
true -> nil
end
env_name = if Mix.env() != :prod, do: to_string(Mix.env())
build =
[build_name, env_name]
|> Enum.filter(fn string -> string && string != "" end)
|> Enum.join("-")
|> (fn
"" -> nil
string -> "+" <> string
end).()
[version, git_pre_release, build]
|> Enum.filter(fn string -> string && string != "" end)
|> Enum.join()
end
end end

View file

@ -1 +1 @@
<!DOCTYPE html><html lang=en><head><meta charset=utf-8><meta name=viewport content="width=device-width,initial-scale=1"><title>Pleroma</title><link rel=icon type=image/png href=/favicon.png><link rel=stylesheet href=/static/font/css/fontello.css><link rel=stylesheet href=/static/font/css/animation.css><link href=/static/css/app.0808aeafc6252b3050ea95b17dcaff1a.css rel=stylesheet></head><body style="display: none"><div id=app></div><script type=text/javascript src=/static/js/manifest.35cf9744b80c38efa9fa.js></script><script type=text/javascript src=/static/js/vendor.7e4a5b87ce584522089d.js></script><script type=text/javascript src=/static/js/app.a65abd01bcd13a691048.js></script></body></html> <!DOCTYPE html><html lang=en><head><meta charset=utf-8><meta name=viewport content="width=device-width,initial-scale=1"><title>Pleroma</title><link rel=icon type=image/png href=/favicon.png><link rel=stylesheet href=/static/font/css/fontello.css><link rel=stylesheet href=/static/font/css/animation.css><link href=/static/css/app.0808aeafc6252b3050ea95b17dcaff1a.css rel=stylesheet></head><body style="display: none"><div id=app></div><script type=text/javascript src=/static/js/manifest.34667c2817916147413f.js></script><script type=text/javascript src=/static/js/vendor.32c621c7157f34c20923.js></script><script type=text/javascript src=/static/js/app.065638d22ade92dea420.js></script></body></html>

Binary file not shown.

Binary file not shown.

Binary file not shown.

113
test/media_proxy_test.exs Normal file
View file

@ -0,0 +1,113 @@
defmodule Pleroma.MediaProxyTest do
use ExUnit.Case
import Pleroma.Web.MediaProxy
describe "when enabled" do
setup do
enabled = Pleroma.Config.get([:media_proxy, :enabled])
unless enabled do
Pleroma.Config.put([:media_proxy, :enabled], true)
on_exit(fn -> Pleroma.Config.put([:media_proxy, :enabled], enabled) end)
end
:ok
end
test "ignores invalid url" do
assert url(nil) == nil
assert url("") == nil
end
test "ignores relative url" do
assert url("/local") == "/local"
assert url("/") == "/"
end
test "ignores local url" do
local_url = Pleroma.Web.Endpoint.url() <> "/hello"
local_root = Pleroma.Web.Endpoint.url()
assert url(local_url) == local_url
assert url(local_root) == local_root
end
test "encodes and decodes URL" do
url = "https://pleroma.soykaf.com/static/logo.png"
encoded = url(url)
assert String.starts_with?(
encoded,
Pleroma.Config.get([:media_proxy, :base_url], Pleroma.Web.base_url())
)
assert String.ends_with?(encoded, "/logo.png")
assert decode_result(encoded) == url
end
test "encodes and decodes URL without a path" do
url = "https://pleroma.soykaf.com"
encoded = url(url)
assert decode_result(encoded) == url
end
test "encodes and decodes URL without an extension" do
url = "https://pleroma.soykaf.com/path/"
encoded = url(url)
assert String.ends_with?(encoded, "/path")
assert decode_result(encoded) == url
end
test "encodes and decodes URL and ignores query params for the path" do
url = "https://pleroma.soykaf.com/static/logo.png?93939393939&bunny=true"
encoded = url(url)
assert String.ends_with?(encoded, "/logo.png")
assert decode_result(encoded) == url
end
test "validates signature" do
secret_key_base = Pleroma.Config.get([Pleroma.Web.Endpoint, :secret_key_base])
on_exit(fn ->
Pleroma.Config.put([Pleroma.Web.Endpoint, :secret_key_base], secret_key_base)
end)
encoded = url("https://pleroma.social")
Pleroma.Config.put(
[Pleroma.Web.Endpoint, :secret_key_base],
"00000000000000000000000000000000000000000000000"
)
[_, "proxy", sig, base64 | _] = URI.parse(encoded).path |> String.split("/")
assert decode_url(sig, base64) == {:error, :invalid_signature}
end
end
describe "when disabled" do
setup do
enabled = Pleroma.Config.get([:media_proxy, :enabled])
if enabled do
Pleroma.Config.put([:media_proxy, :enabled], false)
on_exit(fn ->
Pleroma.Config.put([:media_proxy, :enabled], enabled)
:ok
end)
end
:ok
end
test "does not encode remote urls" do
assert url("https://google.fr") == "https://google.fr"
end
end
defp decode_result(encoded) do
[_, "proxy", sig, base64 | _] = URI.parse(encoded).path |> String.split("/")
{:ok, decoded} = decode_url(sig, base64)
decoded
end
end

View file

@ -0,0 +1,39 @@
defmodule Pleroma.Plugs.UserIsAdminPlugTest do
use Pleroma.Web.ConnCase, async: true
alias Pleroma.Plugs.UserIsAdminPlug
import Pleroma.Factory
test "accepts a user that is admin", %{conn: conn} do
user = insert(:user, info: %{"is_admin" => true})
conn =
build_conn()
|> assign(:user, user)
ret_conn =
conn
|> UserIsAdminPlug.call(%{})
assert conn == ret_conn
end
test "denies a user that isn't admin", %{conn: conn} do
user = insert(:user)
conn =
build_conn()
|> assign(:user, user)
|> UserIsAdminPlug.call(%{})
assert conn.status == 403
end
test "denies when a user isn't set", %{conn: conn} do
conn =
build_conn()
|> UserIsAdminPlug.call(%{})
assert conn.status == 403
end
end

View file

@ -0,0 +1,112 @@
defmodule Pleroma.Web.AdminAPI.AdminAPIControllerTest do
use Pleroma.Web.ConnCase
alias Pleroma.{Repo, User}
import Pleroma.Factory
import ExUnit.CaptureLog
describe "/api/pleroma/admin/user" do
test "Delete" do
admin = insert(:user, info: %{"is_admin" => true})
user = insert(:user)
conn =
build_conn()
|> assign(:user, admin)
|> put_req_header("accept", "application/json")
|> delete("/api/pleroma/admin/user?nickname=#{user.nickname}")
assert json_response(conn, 200) == user.nickname
end
test "Create" do
admin = insert(:user, info: %{"is_admin" => true})
conn =
build_conn()
|> assign(:user, admin)
|> put_req_header("accept", "application/json")
|> post("/api/pleroma/admin/user", %{
"nickname" => "lain",
"email" => "lain@example.org",
"password" => "test"
})
assert json_response(conn, 200) == "lain"
end
end
describe "/api/pleroma/admin/permission_group" do
test "GET is giving user_info" do
admin = insert(:user, info: %{"is_admin" => true})
conn =
build_conn()
|> assign(:user, admin)
|> put_req_header("accept", "application/json")
|> get("/api/pleroma/admin/permission_group/#{admin.nickname}")
assert json_response(conn, 200) == admin.info
end
test "/:right POST, can add to a permission group" do
admin = insert(:user, info: %{"is_admin" => true})
user = insert(:user)
user_info =
user.info
|> Map.put("is_admin", true)
conn =
build_conn()
|> assign(:user, admin)
|> put_req_header("accept", "application/json")
|> post("/api/pleroma/admin/permission_group/#{user.nickname}/admin")
assert json_response(conn, 200) == user_info
end
test "/:right DELETE, can remove from a permission group" do
admin = insert(:user, info: %{"is_admin" => true})
user = insert(:user, info: %{"is_admin" => true})
user_info =
user.info
|> Map.put("is_admin", false)
conn =
build_conn()
|> assign(:user, admin)
|> put_req_header("accept", "application/json")
|> delete("/api/pleroma/admin/permission_group/#{user.nickname}/admin")
assert json_response(conn, 200) == user_info
end
end
test "/api/pleroma/admin/invite_token" do
admin = insert(:user, info: %{"is_admin" => true})
conn =
build_conn()
|> assign(:user, admin)
|> put_req_header("accept", "application/json")
|> get("/api/pleroma/admin/invite_token")
assert conn.status == 200
end
test "/api/pleroma/admin/password_reset" do
admin = insert(:user, info: %{"is_admin" => true})
user = insert(:user, info: %{"is_admin" => true})
conn =
build_conn()
|> assign(:user, admin)
|> put_req_header("accept", "application/json")
|> get("/api/pleroma/admin/password_reset?nickname=#{user.nickname}")
assert conn.status == 200
end
end

View file

@ -0,0 +1,33 @@
defmodule Pleroma.Web.MastodonApi.MastodonSocketTest do
use Pleroma.DataCase
alias Pleroma.Web.MastodonApi.MastodonSocket
alias Pleroma.Web.{Streamer, CommonAPI}
alias Pleroma.User
import Pleroma.Factory
test "public is working when non-authenticated" do
user = insert(:user)
task =
Task.async(fn ->
assert_receive {:text, _}, 4_000
end)
fake_socket = %{
transport_pid: task.pid,
assigns: %{}
}
topics = %{
"public" => [fake_socket]
}
{:ok, activity} = CommonAPI.post(user, %{"status" => "Test"})
Streamer.push_to_socket(topics, "public", activity)
Task.await(task)
end
end

View file

@ -0,0 +1,31 @@
defmodule MockActivityPub do
def publish_one(ret) do
{ret, "success"}
end
end
defmodule Pleroma.ActivityTest do
use Pleroma.DataCase
alias Pleroma.Web.Federator.RetryQueue
@small_retry_count 0
@hopeless_retry_count 10
test "failed posts are retried" do
{:retry, _timeout} = RetryQueue.get_retry_params(@small_retry_count)
assert {:noreply, %{delivered: 1}} ==
RetryQueue.handle_info({:send, :ok, MockActivityPub, @small_retry_count}, %{
delivered: 0
})
end
test "posts that have been tried too many times are dropped" do
{:drop, _timeout} = RetryQueue.get_retry_params(@hopeless_retry_count)
assert {:noreply, %{dropped: 1}} ==
RetryQueue.handle_cast({:maybe_enqueue, %{}, nil, @hopeless_retry_count}, %{
dropped: 0
})
end
end

View file

@ -2,7 +2,7 @@ defmodule Pleroma.Web.StreamerTest do
use Pleroma.DataCase use Pleroma.DataCase
alias Pleroma.Web.Streamer alias Pleroma.Web.Streamer
alias Pleroma.User alias Pleroma.{List, User}
alias Pleroma.Web.CommonAPI alias Pleroma.Web.CommonAPI
import Pleroma.Factory import Pleroma.Factory
@ -60,4 +60,111 @@ test "it doesn't send to blocked users" do
Task.await(task) Task.await(task)
end end
test "it doesn't send unwanted DMs to list" do
user_a = insert(:user)
user_b = insert(:user)
user_c = insert(:user)
{:ok, user_a} = User.follow(user_a, user_b)
{:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b)
task =
Task.async(fn ->
refute_receive {:text, _}, 1_000
end)
fake_socket = %{
transport_pid: task.pid,
assigns: %{
user: user_a
}
}
{:ok, activity} =
CommonAPI.post(user_b, %{
"status" => "@#{user_c.nickname} Test",
"visibility" => "direct"
})
topics = %{
"list:#{list.id}" => [fake_socket]
}
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
Task.await(task)
end
test "it doesn't send unwanted private posts to list" do
user_a = insert(:user)
user_b = insert(:user)
{:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b)
task =
Task.async(fn ->
refute_receive {:text, _}, 1_000
end)
fake_socket = %{
transport_pid: task.pid,
assigns: %{
user: user_a
}
}
{:ok, activity} =
CommonAPI.post(user_b, %{
"status" => "Test",
"visibility" => "private"
})
topics = %{
"list:#{list.id}" => [fake_socket]
}
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
Task.await(task)
end
test "it send wanted private posts to list" do
user_a = insert(:user)
user_b = insert(:user)
{:ok, user_a} = User.follow(user_a, user_b)
{:ok, list} = List.create("Test", user_a)
{:ok, list} = List.follow(list, user_b)
task =
Task.async(fn ->
assert_receive {:text, _}, 1_000
end)
fake_socket = %{
transport_pid: task.pid,
assigns: %{
user: user_a
}
}
{:ok, activity} =
CommonAPI.post(user_b, %{
"status" => "Test",
"visibility" => "private"
})
topics = %{
"list:#{list.id}" => [fake_socket]
}
Streamer.handle_cast(%{action: :stream, topic: "list", item: activity}, topics)
Task.await(task)
end
end end