Merge branch 'idempotency-plug' into 'develop'

Add IdempotencyPlug and use it in all of the api

Closes #1003

See merge request pleroma/pleroma!1339
This commit is contained in:
kaniini 2019-06-27 04:20:17 +00:00
commit 0369a5db16
4 changed files with 200 additions and 23 deletions

View file

@ -0,0 +1,84 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Plugs.IdempotencyPlug do
import Phoenix.Controller, only: [json: 2]
import Plug.Conn
@behaviour Plug
@impl true
def init(opts), do: opts
# Sending idempotency keys in `GET` and `DELETE` requests has no effect
# and should be avoided, as these requests are idempotent by definition.
@impl true
def call(%{method: method} = conn, _) when method in ["POST", "PUT", "PATCH"] do
case get_req_header(conn, "idempotency-key") do
[key] -> process_request(conn, key)
_ -> conn
end
end
def call(conn, _), do: conn
def process_request(conn, key) do
case Cachex.get(:idempotency_cache, key) do
{:ok, nil} ->
cache_resposnse(conn, key)
{:ok, record} ->
send_cached(conn, key, record)
{atom, message} when atom in [:ignore, :error] ->
render_error(conn, message)
end
end
defp cache_resposnse(conn, key) do
register_before_send(conn, fn conn ->
[request_id] = get_resp_header(conn, "x-request-id")
content_type = get_content_type(conn)
record = {request_id, content_type, conn.status, conn.resp_body}
{:ok, _} = Cachex.put(:idempotency_cache, key, record)
conn
|> put_resp_header("idempotency-key", key)
|> put_resp_header("x-original-request-id", request_id)
end)
end
defp send_cached(conn, key, record) do
{request_id, content_type, status, body} = record
conn
|> put_resp_header("idempotency-key", key)
|> put_resp_header("idempotent-replayed", "true")
|> put_resp_header("x-original-request-id", request_id)
|> put_resp_content_type(content_type)
|> send_resp(status, body)
|> halt()
end
defp render_error(conn, message) do
conn
|> put_status(:unprocessable_entity)
|> json(%{error: message})
|> halt()
end
defp get_content_type(conn) do
[content_type] = get_resp_header(conn, "content-type")
if String.contains?(content_type, ";") do
content_type
|> String.split(";")
|> hd()
else
content_type
end
end
end

View file

@ -561,18 +561,13 @@ def post_status(%{assigns: %{user: user}} = conn, %{"status" => _} = params) do
else else
params = Map.drop(params, ["scheduled_at"]) params = Map.drop(params, ["scheduled_at"])
case get_cached_status_or_post(conn, params) do case CommonAPI.post(user, params) do
{:ignore, message} ->
conn
|> put_status(422)
|> json(%{error: message})
{:error, message} -> {:error, message} ->
conn conn
|> put_status(422) |> put_status(:unprocessable_entity)
|> json(%{error: message}) |> json(%{error: message})
{_, activity} -> {:ok, activity} ->
conn conn
|> put_view(StatusView) |> put_view(StatusView)
|> try_render("status.json", %{activity: activity, for: user, as: :activity}) |> try_render("status.json", %{activity: activity, for: user, as: :activity})
@ -580,21 +575,6 @@ def post_status(%{assigns: %{user: user}} = conn, %{"status" => _} = params) do
end end
end end
defp get_cached_status_or_post(%{assigns: %{user: user}} = conn, params) do
idempotency_key =
case get_req_header(conn, "idempotency-key") do
[key] -> key
_ -> Ecto.UUID.generate()
end
Cachex.fetch(:idempotency_cache, idempotency_key, fn _ ->
case CommonAPI.post(user, params) do
{:ok, activity} -> activity
{:error, message} -> {:ignore, message}
end
end)
end
def delete_status(%{assigns: %{user: user}} = conn, %{"id" => id}) do def delete_status(%{assigns: %{user: user}} = conn, %{"id" => id}) do
with {:ok, %Activity{}} <- CommonAPI.delete(id, user) do with {:ok, %Activity{}} <- CommonAPI.delete(id, user) do
json(conn, %{}) json(conn, %{})

View file

@ -27,6 +27,7 @@ defmodule Pleroma.Web.Router do
plug(Pleroma.Plugs.UserEnabledPlug) plug(Pleroma.Plugs.UserEnabledPlug)
plug(Pleroma.Plugs.SetUserSessionIdPlug) plug(Pleroma.Plugs.SetUserSessionIdPlug)
plug(Pleroma.Plugs.EnsureUserKeyPlug) plug(Pleroma.Plugs.EnsureUserKeyPlug)
plug(Pleroma.Plugs.IdempotencyPlug)
end end
pipeline :authenticated_api do pipeline :authenticated_api do
@ -41,6 +42,7 @@ defmodule Pleroma.Web.Router do
plug(Pleroma.Plugs.UserEnabledPlug) plug(Pleroma.Plugs.UserEnabledPlug)
plug(Pleroma.Plugs.SetUserSessionIdPlug) plug(Pleroma.Plugs.SetUserSessionIdPlug)
plug(Pleroma.Plugs.EnsureAuthenticatedPlug) plug(Pleroma.Plugs.EnsureAuthenticatedPlug)
plug(Pleroma.Plugs.IdempotencyPlug)
end end
pipeline :admin_api do pipeline :admin_api do
@ -57,6 +59,7 @@ defmodule Pleroma.Web.Router do
plug(Pleroma.Plugs.SetUserSessionIdPlug) plug(Pleroma.Plugs.SetUserSessionIdPlug)
plug(Pleroma.Plugs.EnsureAuthenticatedPlug) plug(Pleroma.Plugs.EnsureAuthenticatedPlug)
plug(Pleroma.Plugs.UserIsAdminPlug) plug(Pleroma.Plugs.UserIsAdminPlug)
plug(Pleroma.Plugs.IdempotencyPlug)
end end
pipeline :mastodon_html do pipeline :mastodon_html do

View file

@ -0,0 +1,110 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.Plugs.IdempotencyPlugTest do
use ExUnit.Case, async: true
use Plug.Test
alias Pleroma.Plugs.IdempotencyPlug
alias Plug.Conn
test "returns result from cache" do
key = "test1"
orig_request_id = "test1"
second_request_id = "test2"
body = "testing"
status = 200
:post
|> conn("/cofe")
|> put_req_header("idempotency-key", key)
|> Conn.put_resp_header("x-request-id", orig_request_id)
|> Conn.put_resp_content_type("application/json")
|> IdempotencyPlug.call([])
|> Conn.send_resp(status, body)
conn =
:post
|> conn("/cofe")
|> put_req_header("idempotency-key", key)
|> Conn.put_resp_header("x-request-id", second_request_id)
|> Conn.put_resp_content_type("application/json")
|> IdempotencyPlug.call([])
assert_raise Conn.AlreadySentError, fn ->
Conn.send_resp(conn, :im_a_teapot, "no cofe")
end
assert conn.resp_body == body
assert conn.status == status
assert [^second_request_id] = Conn.get_resp_header(conn, "x-request-id")
assert [^orig_request_id] = Conn.get_resp_header(conn, "x-original-request-id")
assert [^key] = Conn.get_resp_header(conn, "idempotency-key")
assert ["true"] = Conn.get_resp_header(conn, "idempotent-replayed")
assert ["application/json; charset=utf-8"] = Conn.get_resp_header(conn, "content-type")
end
test "pass conn downstream if the cache not found" do
key = "test2"
orig_request_id = "test3"
body = "testing"
status = 200
conn =
:post
|> conn("/cofe")
|> put_req_header("idempotency-key", key)
|> Conn.put_resp_header("x-request-id", orig_request_id)
|> Conn.put_resp_content_type("application/json")
|> IdempotencyPlug.call([])
|> Conn.send_resp(status, body)
assert conn.resp_body == body
assert conn.status == status
assert [] = Conn.get_resp_header(conn, "idempotent-replayed")
assert [^key] = Conn.get_resp_header(conn, "idempotency-key")
end
test "passes conn downstream if idempotency is not present in headers" do
orig_request_id = "test4"
body = "testing"
status = 200
conn =
:post
|> conn("/cofe")
|> Conn.put_resp_header("x-request-id", orig_request_id)
|> Conn.put_resp_content_type("application/json")
|> IdempotencyPlug.call([])
|> Conn.send_resp(status, body)
assert [] = Conn.get_resp_header(conn, "idempotency-key")
end
test "doesn't work with GET/DELETE" do
key = "test3"
body = "testing"
status = 200
conn =
:get
|> conn("/cofe")
|> put_req_header("idempotency-key", key)
|> IdempotencyPlug.call([])
|> Conn.send_resp(status, body)
assert [] = Conn.get_resp_header(conn, "idempotency-key")
conn =
:delete
|> conn("/cofe")
|> put_req_header("idempotency-key", key)
|> IdempotencyPlug.call([])
|> Conn.send_resp(status, body)
assert [] = Conn.get_resp_header(conn, "idempotency-key")
end
end