From 8aa939a88a10c2b8d2d17c79f31e0405df711227 Mon Sep 17 00:00:00 2001 From: Daniel Berkompas Date: Fri, 27 Apr 2018 10:28:56 -0700 Subject: [PATCH] [#26] Move bulk configuration to index level --- README.md | 20 ++++++------- config/config.exs | 2 +- lib/elasticsearch/indexing/bulk.ex | 23 +++++++++------ lib/elasticsearch/indexing/index.ex | 20 ++++++------- lib/mix/elasticsearch.build.ex | 8 +++--- mix.exs | 1 + mix.lock | 1 + test/elasticsearch/indexing/bulk_test.exs | 34 +++++++++++++++++++++-- test/mix/elasticsearch.build_test.exs | 5 +++- test/support/data_case.ex | 2 ++ 10 files changed, 77 insertions(+), 39 deletions(-) diff --git a/README.md b/README.md index 5c22d75..f619895 100644 --- a/README.md +++ b/README.md @@ -59,15 +59,6 @@ config :my_app, MyApp.ElasticsearchCluster, username: "username", password: "password", - # When indexing data using the `mix elasticsearch.build` task, - # control the data ingestion rate by raising or lowering the number - # of items to send in each bulk request. - bulk_page_size: 5000, - - # Likewise, wait a given period between posting pages to give - # Elasticsearch time to catch up. - bulk_wait_interval: 15_000, # 15 seconds - # If you want to mock the responses of the Elasticsearch JSON API # for testing or other purposes, you can inject a different module # here. It must implement the Elasticsearch.API behaviour. @@ -100,7 +91,16 @@ config :my_app, MyApp.ElasticsearchCluster, # # Each piece of data that is returned by the store must implement the # Elasticsearch.Document protocol. - sources: [MyApp.Post] + sources: [MyApp.Post], + + # When indexing data using the `mix elasticsearch.build` task, + # control the data ingestion rate by raising or lowering the number + # of items to send in each bulk request. + bulk_page_size: 5000, + + # Likewise, wait a given period between posting pages to give + # Elasticsearch time to catch up. + bulk_wait_interval: 15_000 # 15 seconds } } ``` diff --git a/config/config.exs b/config/config.exs index 9eff770..7da48e6 100644 --- a/config/config.exs +++ b/config/config.exs @@ -13,4 +13,4 @@ config :elasticsearch, Elasticsearch.Test.Repo, config :elasticsearch, ecto_repos: [Elasticsearch.Test.Repo] -config :logger, level: :warn +config :logger, level: :debug diff --git a/lib/elasticsearch/indexing/bulk.ex b/lib/elasticsearch/indexing/bulk.ex index 10a286f..5610a1c 100644 --- a/lib/elasticsearch/indexing/bulk.ex +++ b/lib/elasticsearch/indexing/bulk.ex @@ -79,28 +79,35 @@ defmodule Elasticsearch.Index.Bulk do """ @spec upload(Cluster.t(), index_name :: String.t(), Elasticsearch.Store.t(), list) :: :ok | {:error, [map]} - def upload(cluster, index_name, store, sources, errors \\ []) - def upload(_cluster, _index_name, _store, [], []), do: :ok - def upload(_cluster, _index_name, _store, [], errors), do: {:error, errors} + def upload(cluster, index_name, index_config, errors \\ []) + def upload(_cluster, _index_name, %{sources: []}, []), do: :ok + def upload(_cluster, _index_name, %{sources: []}, errors), do: {:error, errors} - def upload(cluster, index_name, store, [source | tail] = _sources, errors) + def upload( + cluster, + index_name, + %{store: store, sources: [source | tail]} = index_config, + errors + ) when is_atom(store) do config = Cluster.Config.get(cluster) + bulk_page_size = index_config[:bulk_page_size] || 5000 + bulk_wait_interval = index_config[:bulk_wait_interval] || 0 errors = config |> DataStream.stream(source, store) |> Stream.map(&encode!(config, &1, index_name)) - |> Stream.chunk_every(config.bulk_page_size) - |> Stream.intersperse(config.bulk_wait_interval) + |> Stream.chunk_every(bulk_page_size) + |> Stream.intersperse(bulk_wait_interval) |> Stream.map(&put_bulk_page(config, index_name, &1)) |> Enum.reduce(errors, &collect_errors/2) - upload(cluster, index_name, store, tail, errors) + upload(config, index_name, %{index_config | sources: tail}, errors) end defp put_bulk_page(_config, _index_name, wait_interval) when is_integer(wait_interval) do - IO.puts("Pausing #{wait_interval}ms between bulk pages") + Logger.debug("Pausing #{wait_interval}ms between bulk pages") :timer.sleep(wait_interval) end diff --git a/lib/elasticsearch/indexing/index.ex b/lib/elasticsearch/indexing/index.ex index a7af9ba..cdae962 100644 --- a/lib/elasticsearch/indexing/index.ex +++ b/lib/elasticsearch/indexing/index.ex @@ -23,24 +23,20 @@ defmodule Elasticsearch.Index do iex> file = "test/support/settings/posts.json" ...> store = Elasticsearch.Test.Store - ...> Index.hot_swap(Cluster, "posts", file, store, [Post]) + ...> Index.hot_swap(Cluster, "posts", %{settings: file, store: store, sources: [Post]}) :ok """ - @spec hot_swap( - Cluster.t(), - alias :: String.t() | atom, - settings_path :: String.t(), - Elasticsearch.Store.t(), - list - ) :: - :ok - | {:error, Elasticsearch.Exception.t()} - def hot_swap(cluster, alias, settings_file, store, sources) do + @spec hot_swap(Cluster.t(), alias :: String.t() | atom, %{ + settings: Path.t(), + store: module, + sources: [any] + }) :: :ok | {:error, Elasticsearch.Exception.t()} + def hot_swap(cluster, alias, %{settings: settings_file} = index_config) do name = build_name(alias) config = Config.get(cluster) with :ok <- create_from_file(config, name, settings_file), - :ok <- Bulk.upload(config, name, store, sources), + :ok <- Bulk.upload(config, name, index_config), :ok <- __MODULE__.alias(config, name, alias), :ok <- clean_starting_with(config, alias, 2), :ok <- refresh(config, name) do diff --git a/lib/mix/elasticsearch.build.ex b/lib/mix/elasticsearch.build.ex index 865f145..f98c711 100644 --- a/lib/mix/elasticsearch.build.ex +++ b/lib/mix/elasticsearch.build.ex @@ -23,6 +23,8 @@ defmodule Mix.Tasks.Elasticsearch.Build do require Logger + import Maybe + alias Elasticsearch.{ Cluster.Config, Index @@ -54,9 +56,7 @@ defmodule Mix.Tasks.Elasticsearch.Build do end defp build(config, alias, :rebuild) do - %{settings: settings, store: store, sources: sources} = config.indexes[alias] - - with :ok <- Index.hot_swap(config, alias, settings, store, sources) do + with :ok <- Index.hot_swap(config, alias, config.indexes[alias]) do :ok else {:error, errors} when is_list(errors) -> @@ -71,7 +71,7 @@ defmodule Mix.Tasks.Elasticsearch.Build do {:error, :enoent} -> Mix.raise(""" - Schema file not found at #{settings}. + Settings file not found at #{maybe(config, [:indexes, alias, :settings])}. """) {:error, exception} -> diff --git a/mix.exs b/mix.exs index 29a8995..53c895c 100644 --- a/mix.exs +++ b/mix.exs @@ -61,6 +61,7 @@ defmodule Elasticsearch.Mixfile do {:poison, ">= 0.0.0", optional: true}, {:httpoison, ">= 0.0.0"}, {:vex, "~> 0.6.0"}, + {:maybe, "~> 1.0.0"}, {:postgrex, ">= 0.0.0", only: [:dev, :test]}, {:ex_doc, ">= 0.0.0", only: [:dev, :test]}, {:ecto, ">= 0.0.0", only: [:dev, :test]}, diff --git a/mix.lock b/mix.lock index 72767fc..fbc9d1c 100644 --- a/mix.lock +++ b/mix.lock @@ -14,6 +14,7 @@ "idna": {:hex, :idna, "5.1.0", "d72b4effeb324ad5da3cab1767cb16b17939004e789d8c0ad5b70f3cea20c89a", [], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"}, "jason": {:hex, :jason, "1.0.0-rc.1", "c8421d4e6e6ef0dd7c2b64ff63589f8561116808fa003dddfd5360cde7bb4625", [], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [], [], "hexpm"}, + "maybe": {:hex, :maybe, "1.0.0", "65311dd7e16659579116666b268d03d7e1d1b3da8776c81a6b199de7177b43d6", [:mix], [], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [], [], "hexpm"}, "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [], [], "hexpm"}, "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, diff --git a/test/elasticsearch/indexing/bulk_test.exs b/test/elasticsearch/indexing/bulk_test.exs index 7dc9de7..9930db4 100644 --- a/test/elasticsearch/indexing/bulk_test.exs +++ b/test/elasticsearch/indexing/bulk_test.exs @@ -1,6 +1,8 @@ defmodule Elasticsearch.Index.BulkTest do use Elasticsearch.DataCase + import ExUnit.CaptureLog + alias Elasticsearch.{ Test.Cluster, Test.Store, @@ -31,12 +33,12 @@ defmodule Elasticsearch.Index.BulkTest do doctest Elasticsearch.Index.Bulk - describe ".upload/5" do + describe ".upload/4" do # Regression test for https://github.com/infinitered/elasticsearch-elixir/issues/10 @tag :regression test "calls itself recursively properly" do assert {:error, [%TestException{}]} = - Bulk.upload(Cluster, :posts, Store, [Post], [%TestException{}]) + Bulk.upload(Cluster, :posts, %{store: Store, sources: [Post]}, [%TestException{}]) end test "collects errors properly" do @@ -46,7 +48,33 @@ defmodule Elasticsearch.Index.BulkTest do Cluster |> Elasticsearch.Cluster.Config.get() |> Map.put(:api, ErrorAPI) - |> Bulk.upload(:posts, Store, [Post]) + |> Bulk.upload(:posts, %{store: Store, sources: [Post]}) + end + + test "respects bulk_* settings" do + populate_posts_table(2) + + Logger.configure(level: :debug) + + output = + capture_log([level: :debug], fn -> + Elasticsearch.Index.create_from_file( + Cluster, + "posts-bulk-test", + "test/support/settings/posts.json" + ) + + Bulk.upload(Cluster, "posts-bulk-test", %{ + store: Store, + sources: [Post], + bulk_page_size: 1, + bulk_wait_interval: 10 + }) + + Elasticsearch.delete!(Cluster, "/posts-bulk-test") + end) + + assert output =~ "Pausing 10ms between bulk pages" end end end diff --git a/test/mix/elasticsearch.build_test.exs b/test/mix/elasticsearch.build_test.exs index 55391b3..f70e205 100644 --- a/test/mix/elasticsearch.build_test.exs +++ b/test/mix/elasticsearch.build_test.exs @@ -2,6 +2,7 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do use Elasticsearch.DataCase, async: false import Mix.Task, only: [rerun: 2] + import ExUnit.CaptureLog import ExUnit.CaptureIO alias Elasticsearch.Index @@ -46,8 +47,10 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do test "builds configured index" do populate_posts_table() + Logger.configure(level: :debug) + output = - capture_io(fn -> + capture_log([level: :debug], fn -> rerun("elasticsearch.build", ["posts"] ++ @cluster_opts) end) diff --git a/test/support/data_case.ex b/test/support/data_case.ex index cafd1df..af9bfd7 100644 --- a/test/support/data_case.ex +++ b/test/support/data_case.ex @@ -31,6 +31,8 @@ defmodule Elasticsearch.DataCase do Ecto.Adapters.SQL.Sandbox.mode(Elasticsearch.Test.Repo, {:shared, self()}) end + Logger.configure(level: :warn) + :ok end