From 825077a5b0dc0c90d93bc94ae83398c4f68d0003 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Wed, 26 Jun 2019 18:36:42 +0700 Subject: [PATCH 1/5] Add Idempotency plug --- lib/pleroma/plugs/idempotency_plug.ex | 82 +++++++++++++++++++ test/plugs/idempotency_plug_test.exs | 110 ++++++++++++++++++++++++++ 2 files changed, 192 insertions(+) create mode 100644 lib/pleroma/plugs/idempotency_plug.ex create mode 100644 test/plugs/idempotency_plug_test.exs diff --git a/lib/pleroma/plugs/idempotency_plug.ex b/lib/pleroma/plugs/idempotency_plug.ex new file mode 100644 index 000000000..442573d60 --- /dev/null +++ b/lib/pleroma/plugs/idempotency_plug.ex @@ -0,0 +1,82 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Plugs.IdempotencyPlug do + import Phoenix.Controller, only: [json: 2] + import Plug.Conn + + @behaviour Plug + + @impl true + def init(opts), do: opts + + # Sending idempotency keys in `GET` and `DELETE` requests has no effect and should be avoided, as these requests are idempotent by definition. + @impl true + def call(%{method: method} = conn, _) when method in ["POST", "PUT", "PATCH"] do + case get_req_header(conn, "idempotency-key") do + [key] -> process_request(conn, key) + _ -> conn + end + end + + def call(conn, _), do: conn + + def process_request(conn, key) do + case Cachex.get(:idempotency_cache, key) do + {:ok, nil} -> + cache_resposnse(conn, key) + + {atom, message} when atom in [:ignore, :error] -> + render_error(conn, message) + + {:ok, record} -> + send_cached(conn, key, record) + end + end + + defp cache_resposnse(conn, key) do + Plug.Conn.register_before_send(conn, fn conn -> + [request_id] = get_resp_header(conn, "x-request-id") + content_type = get_content_type(conn) + + record = {request_id, content_type, conn.status, conn.resp_body} + {:ok, _} = Cachex.put(:idempotency_cache, key, record) + + conn + |> put_resp_header("idempotency-key", key) + |> put_resp_header("x-original-request-id", request_id) + end) + end + + defp send_cached(conn, key, record) do + {request_id, content_type, status, body} = record + + conn + |> put_resp_header("idempotency-key", key) + |> put_resp_header("idempotent-replayed", "true") + |> put_resp_header("x-original-request-id", request_id) + |> put_resp_content_type(content_type) + |> send_resp(status, body) + |> halt() + end + + defp render_error(conn, message) do + conn + |> put_status(:unprocessable_entity) + |> json(%{error: message}) + |> halt() + end + + defp get_content_type(conn) do + [content_type] = get_resp_header(conn, "content-type") + + if String.contains?(content_type, ";") do + content_type + |> String.split(";") + |> hd() + else + content_type + end + end +end diff --git a/test/plugs/idempotency_plug_test.exs b/test/plugs/idempotency_plug_test.exs new file mode 100644 index 000000000..aebc463e9 --- /dev/null +++ b/test/plugs/idempotency_plug_test.exs @@ -0,0 +1,110 @@ +# Pleroma: A lightweight social networking server +# Copyright © 2017-2019 Pleroma Authors +# SPDX-License-Identifier: AGPL-3.0-only + +defmodule Pleroma.Plugs.IdempotencyPlugTest do + use ExUnit.Case, async: true + use Plug.Test + + alias Pleroma.Plugs.IdempotencyPlug + alias Plug.Conn + + test "returns result from cache" do + key = "test1" + orig_request_id = "test1" + second_request_id = "test2" + body = "testing" + status = 200 + + :post + |> conn("/cofe") + |> put_req_header("idempotency-key", key) + |> Conn.put_resp_header("x-request-id", orig_request_id) + |> Conn.put_resp_content_type("application/json") + |> IdempotencyPlug.call([]) + |> Conn.send_resp(status, body) + + conn2 = + :post + |> conn("/cofe") + |> put_req_header("idempotency-key", key) + |> Conn.put_resp_header("x-request-id", second_request_id) + |> Conn.put_resp_content_type("application/json") + |> IdempotencyPlug.call([]) + + assert_raise Conn.AlreadySentError, fn -> + Conn.send_resp(conn2, :im_a_teapot, "no cofe") + end + + assert conn2.resp_body == body + assert conn2.status == status + + assert [^second_request_id] = Conn.get_resp_header(conn2, "x-request-id") + assert [^orig_request_id] = Conn.get_resp_header(conn2, "x-original-request-id") + assert [^key] = Conn.get_resp_header(conn2, "idempotency-key") + assert ["true"] = Conn.get_resp_header(conn2, "idempotent-replayed") + assert ["application/json; charset=utf-8"] = Conn.get_resp_header(conn2, "content-type") + end + + test "pass conn downstream if the cache not found" do + key = "test2" + orig_request_id = "test3" + body = "testing" + status = 200 + + conn = + :post + |> conn("/cofe") + |> put_req_header("idempotency-key", key) + |> Conn.put_resp_header("x-request-id", orig_request_id) + |> Conn.put_resp_content_type("application/json") + |> IdempotencyPlug.call([]) + |> Conn.send_resp(status, body) + + assert conn.resp_body == body + assert conn.status == status + + assert [] = Conn.get_resp_header(conn, "idempotent-replayed") + assert [^key] = Conn.get_resp_header(conn, "idempotency-key") + end + + test "passes conn downstream if idempotency is not present in headers" do + orig_request_id = "test4" + body = "testing" + status = 200 + + conn = + :post + |> conn("/cofe") + |> Conn.put_resp_header("x-request-id", orig_request_id) + |> Conn.put_resp_content_type("application/json") + |> IdempotencyPlug.call([]) + |> Conn.send_resp(status, body) + + assert [] = Conn.get_resp_header(conn, "idempotency-key") + end + + test "doesn't work with GET/DELETE" do + key = "test3" + body = "testing" + status = 200 + + conn = + :get + |> conn("/cofe") + |> put_req_header("idempotency-key", key) + |> IdempotencyPlug.call([]) + |> Conn.send_resp(status, body) + + assert [] = Conn.get_resp_header(conn, "idempotency-key") + + conn = + :delete + |> conn("/cofe") + |> put_req_header("idempotency-key", key) + |> IdempotencyPlug.call([]) + |> Conn.send_resp(status, body) + + assert [] = Conn.get_resp_header(conn, "idempotency-key") + end +end From 74132e371545c0c6090cfefa22c4ad3f5c505a40 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Wed, 26 Jun 2019 18:42:49 +0700 Subject: [PATCH 2/5] Enable IdempotencyPlug for the all API --- lib/pleroma/web/router.ex | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/pleroma/web/router.ex b/lib/pleroma/web/router.ex index c504116b6..055289dc5 100644 --- a/lib/pleroma/web/router.ex +++ b/lib/pleroma/web/router.ex @@ -27,6 +27,7 @@ defmodule Pleroma.Web.Router do plug(Pleroma.Plugs.UserEnabledPlug) plug(Pleroma.Plugs.SetUserSessionIdPlug) plug(Pleroma.Plugs.EnsureUserKeyPlug) + plug(Pleroma.Plugs.IdempotencyPlug) end pipeline :authenticated_api do @@ -41,6 +42,7 @@ defmodule Pleroma.Web.Router do plug(Pleroma.Plugs.UserEnabledPlug) plug(Pleroma.Plugs.SetUserSessionIdPlug) plug(Pleroma.Plugs.EnsureAuthenticatedPlug) + plug(Pleroma.Plugs.IdempotencyPlug) end pipeline :admin_api do @@ -57,6 +59,7 @@ defmodule Pleroma.Web.Router do plug(Pleroma.Plugs.SetUserSessionIdPlug) plug(Pleroma.Plugs.EnsureAuthenticatedPlug) plug(Pleroma.Plugs.UserIsAdminPlug) + plug(Pleroma.Plugs.IdempotencyPlug) end pipeline :mastodon_html do From 0b8aeac0f3ff560442f412301acb581c2ef1684b Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Wed, 26 Jun 2019 18:49:14 +0700 Subject: [PATCH 3/5] Remove previous idempotency implementation from `post_status` --- .../mastodon_api/mastodon_api_controller.ex | 24 ++----------------- 1 file changed, 2 insertions(+), 22 deletions(-) diff --git a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex index 9b9eca2a1..d2f08d503 100644 --- a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex +++ b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex @@ -561,18 +561,13 @@ def post_status(%{assigns: %{user: user}} = conn, %{"status" => _} = params) do else params = Map.drop(params, ["scheduled_at"]) - case get_cached_status_or_post(conn, params) do - {:ignore, message} -> - conn - |> put_status(422) - |> json(%{error: message}) - + case CommonAPI.post(user, params) do {:error, message} -> conn |> put_status(422) |> json(%{error: message}) - {_, activity} -> + {:ok, activity} -> conn |> put_view(StatusView) |> try_render("status.json", %{activity: activity, for: user, as: :activity}) @@ -580,21 +575,6 @@ def post_status(%{assigns: %{user: user}} = conn, %{"status" => _} = params) do end end - defp get_cached_status_or_post(%{assigns: %{user: user}} = conn, params) do - idempotency_key = - case get_req_header(conn, "idempotency-key") do - [key] -> key - _ -> Ecto.UUID.generate() - end - - Cachex.fetch(:idempotency_cache, idempotency_key, fn _ -> - case CommonAPI.post(user, params) do - {:ok, activity} -> activity - {:error, message} -> {:ignore, message} - end - end) - end - def delete_status(%{assigns: %{user: user}} = conn, %{"id" => id}) do with {:ok, %Activity{}} <- CommonAPI.delete(id, user) do json(conn, %{}) From 159630b21cba565e02716e06e9d4f8ad1bf5dab5 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Wed, 26 Jun 2019 19:19:07 +0700 Subject: [PATCH 4/5] Fix credo warning --- lib/pleroma/plugs/idempotency_plug.ex | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/pleroma/plugs/idempotency_plug.ex b/lib/pleroma/plugs/idempotency_plug.ex index 442573d60..7c06be9ea 100644 --- a/lib/pleroma/plugs/idempotency_plug.ex +++ b/lib/pleroma/plugs/idempotency_plug.ex @@ -11,7 +11,9 @@ defmodule Pleroma.Plugs.IdempotencyPlug do @impl true def init(opts), do: opts - # Sending idempotency keys in `GET` and `DELETE` requests has no effect and should be avoided, as these requests are idempotent by definition. + # Sending idempotency keys in `GET` and `DELETE` requests has no effect + # and should be avoided, as these requests are idempotent by definition. + @impl true def call(%{method: method} = conn, _) when method in ["POST", "PUT", "PATCH"] do case get_req_header(conn, "idempotency-key") do From 889a9c3a3f427f5fdf2708fd281c1218d08b8fd7 Mon Sep 17 00:00:00 2001 From: Egor Kislitsyn Date: Thu, 27 Jun 2019 01:53:36 +0700 Subject: [PATCH 5/5] Polish IdempotencyPlug --- lib/pleroma/plugs/idempotency_plug.ex | 8 ++++---- .../mastodon_api/mastodon_api_controller.ex | 2 +- test/plugs/idempotency_plug_test.exs | 18 +++++++++--------- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/lib/pleroma/plugs/idempotency_plug.ex b/lib/pleroma/plugs/idempotency_plug.ex index 7c06be9ea..e99c5d279 100644 --- a/lib/pleroma/plugs/idempotency_plug.ex +++ b/lib/pleroma/plugs/idempotency_plug.ex @@ -29,16 +29,16 @@ def process_request(conn, key) do {:ok, nil} -> cache_resposnse(conn, key) - {atom, message} when atom in [:ignore, :error] -> - render_error(conn, message) - {:ok, record} -> send_cached(conn, key, record) + + {atom, message} when atom in [:ignore, :error] -> + render_error(conn, message) end end defp cache_resposnse(conn, key) do - Plug.Conn.register_before_send(conn, fn conn -> + register_before_send(conn, fn conn -> [request_id] = get_resp_header(conn, "x-request-id") content_type = get_content_type(conn) diff --git a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex index d2f08d503..7cdba4cc0 100644 --- a/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex +++ b/lib/pleroma/web/mastodon_api/mastodon_api_controller.ex @@ -564,7 +564,7 @@ def post_status(%{assigns: %{user: user}} = conn, %{"status" => _} = params) do case CommonAPI.post(user, params) do {:error, message} -> conn - |> put_status(422) + |> put_status(:unprocessable_entity) |> json(%{error: message}) {:ok, activity} -> diff --git a/test/plugs/idempotency_plug_test.exs b/test/plugs/idempotency_plug_test.exs index aebc463e9..ac1735f13 100644 --- a/test/plugs/idempotency_plug_test.exs +++ b/test/plugs/idempotency_plug_test.exs @@ -24,7 +24,7 @@ test "returns result from cache" do |> IdempotencyPlug.call([]) |> Conn.send_resp(status, body) - conn2 = + conn = :post |> conn("/cofe") |> put_req_header("idempotency-key", key) @@ -33,17 +33,17 @@ test "returns result from cache" do |> IdempotencyPlug.call([]) assert_raise Conn.AlreadySentError, fn -> - Conn.send_resp(conn2, :im_a_teapot, "no cofe") + Conn.send_resp(conn, :im_a_teapot, "no cofe") end - assert conn2.resp_body == body - assert conn2.status == status + assert conn.resp_body == body + assert conn.status == status - assert [^second_request_id] = Conn.get_resp_header(conn2, "x-request-id") - assert [^orig_request_id] = Conn.get_resp_header(conn2, "x-original-request-id") - assert [^key] = Conn.get_resp_header(conn2, "idempotency-key") - assert ["true"] = Conn.get_resp_header(conn2, "idempotent-replayed") - assert ["application/json; charset=utf-8"] = Conn.get_resp_header(conn2, "content-type") + assert [^second_request_id] = Conn.get_resp_header(conn, "x-request-id") + assert [^orig_request_id] = Conn.get_resp_header(conn, "x-original-request-id") + assert [^key] = Conn.get_resp_header(conn, "idempotency-key") + assert ["true"] = Conn.get_resp_header(conn, "idempotent-replayed") + assert ["application/json; charset=utf-8"] = Conn.get_resp_header(conn, "content-type") end test "pass conn downstream if the cache not found" do