[#26] Move bulk configuration to index level

Daniel Berkompas 2018-04-27 10:28:56 -07:00
parent b2d2e73d01
commit 8aa939a88a
10 changed files with 77 additions and 39 deletions

README.md

@@ -59,15 +59,6 @@ config :my_app, MyApp.ElasticsearchCluster,
   username: "username",
   password: "password",
-  # When indexing data using the `mix elasticsearch.build` task,
-  # control the data ingestion rate by raising or lowering the number
-  # of items to send in each bulk request.
-  bulk_page_size: 5000,
-  # Likewise, wait a given period between posting pages to give
-  # Elasticsearch time to catch up.
-  bulk_wait_interval: 15_000, # 15 seconds
   # If you want to mock the responses of the Elasticsearch JSON API
   # for testing or other purposes, you can inject a different module
   # here. It must implement the Elasticsearch.API behaviour.
@@ -100,7 +91,16 @@ config :my_app, MyApp.ElasticsearchCluster,
       #
       # Each piece of data that is returned by the store must implement the
       # Elasticsearch.Document protocol.
-      sources: [MyApp.Post]
+      sources: [MyApp.Post],
+      # When indexing data using the `mix elasticsearch.build` task,
+      # control the data ingestion rate by raising or lowering the number
+      # of items to send in each bulk request.
+      bulk_page_size: 5000,
+      # Likewise, wait a given period between posting pages to give
+      # Elasticsearch time to catch up.
+      bulk_wait_interval: 15_000 # 15 seconds
     }
   }
 ```

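Taken together, the two README hunks above move `bulk_page_size` and `bulk_wait_interval` from the cluster level into each entry under `indexes:`. A sketch of the resulting shape (the `settings` path and the store module are illustrative, following the README's `MyApp` example):

```elixir
config :my_app, MyApp.ElasticsearchCluster,
  username: "username",
  password: "password",
  indexes: %{
    posts: %{
      settings: "priv/elasticsearch/posts.json", # illustrative path
      store: MyApp.ElasticsearchStore,           # illustrative store module
      sources: [MyApp.Post],
      # Bulk options now live beside the index they tune:
      bulk_page_size: 5000,
      bulk_wait_interval: 15_000 # 15 seconds
    }
  }
```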
config/test.exs

@@ -13,4 +13,4 @@ config :elasticsearch, Elasticsearch.Test.Repo,
 config :elasticsearch, ecto_repos: [Elasticsearch.Test.Repo]
 
-config :logger, level: :warn
+config :logger, level: :debug

lib/elasticsearch/index/bulk.ex

@@ -79,28 +79,35 @@ defmodule Elasticsearch.Index.Bulk do
   """
   @spec upload(Cluster.t(), index_name :: String.t(), Elasticsearch.Store.t(), list) ::
           :ok | {:error, [map]}
-  def upload(cluster, index_name, store, sources, errors \\ [])
-  def upload(_cluster, _index_name, _store, [], []), do: :ok
-  def upload(_cluster, _index_name, _store, [], errors), do: {:error, errors}
+  def upload(cluster, index_name, index_config, errors \\ [])
+  def upload(_cluster, _index_name, %{sources: []}, []), do: :ok
+  def upload(_cluster, _index_name, %{sources: []}, errors), do: {:error, errors}
 
-  def upload(cluster, index_name, store, [source | tail] = _sources, errors)
+  def upload(
+        cluster,
+        index_name,
+        %{store: store, sources: [source | tail]} = index_config,
+        errors
+      )
       when is_atom(store) do
     config = Cluster.Config.get(cluster)
+    bulk_page_size = index_config[:bulk_page_size] || 5000
+    bulk_wait_interval = index_config[:bulk_wait_interval] || 0
 
     errors =
       config
       |> DataStream.stream(source, store)
       |> Stream.map(&encode!(config, &1, index_name))
-      |> Stream.chunk_every(config.bulk_page_size)
-      |> Stream.intersperse(config.bulk_wait_interval)
+      |> Stream.chunk_every(bulk_page_size)
+      |> Stream.intersperse(bulk_wait_interval)
       |> Stream.map(&put_bulk_page(config, index_name, &1))
      |> Enum.reduce(errors, &collect_errors/2)
 
-    upload(cluster, index_name, store, tail, errors)
+    upload(config, index_name, %{index_config | sources: tail}, errors)
   end
 
   defp put_bulk_page(_config, _index_name, wait_interval) when is_integer(wait_interval) do
-    IO.puts("Pausing #{wait_interval}ms between bulk pages")
+    Logger.debug("Pausing #{wait_interval}ms between bulk pages")
     :timer.sleep(wait_interval)
   end

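The rewritten `upload/4` reads its paging options from the index map, falling back to 5000 items per page and no wait when the keys are absent. A minimal sketch of the two call shapes, assuming hypothetical `MyApp.Cluster`, `MyApp.Store`, and `MyApp.Post` modules:

```elixir
alias Elasticsearch.Index.Bulk

# Explicit bulk settings: 500-item pages with a 1s pause between them.
Bulk.upload(MyApp.Cluster, "posts", %{
  store: MyApp.Store,
  sources: [MyApp.Post],
  bulk_page_size: 500,
  bulk_wait_interval: 1_000
})

# Without the bulk_* keys, upload/4 falls back to 5000 and 0.
Bulk.upload(MyApp.Cluster, "posts", %{store: MyApp.Store, sources: [MyApp.Post]})
```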
lib/elasticsearch/index.ex

@@ -23,24 +23,20 @@ defmodule Elasticsearch.Index do
       iex> file = "test/support/settings/posts.json"
       ...> store = Elasticsearch.Test.Store
-      ...> Index.hot_swap(Cluster, "posts", file, store, [Post])
+      ...> Index.hot_swap(Cluster, "posts", %{settings: file, store: store, sources: [Post]})
       :ok
   """
-  @spec hot_swap(
-          Cluster.t(),
-          alias :: String.t() | atom,
-          settings_path :: String.t(),
-          Elasticsearch.Store.t(),
-          list
-        ) ::
-          :ok
-          | {:error, Elasticsearch.Exception.t()}
-  def hot_swap(cluster, alias, settings_file, store, sources) do
+  @spec hot_swap(Cluster.t(), alias :: String.t() | atom, %{
+          settings: Path.t(),
+          store: module,
+          sources: [any]
+        }) :: :ok | {:error, Elasticsearch.Exception.t()}
+  def hot_swap(cluster, alias, %{settings: settings_file} = index_config) do
     name = build_name(alias)
     config = Config.get(cluster)
 
     with :ok <- create_from_file(config, name, settings_file),
-         :ok <- Bulk.upload(config, name, store, sources),
+         :ok <- Bulk.upload(config, name, index_config),
          :ok <- __MODULE__.alias(config, name, alias),
          :ok <- clean_starting_with(config, alias, 2),
          :ok <- refresh(config, name) do

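Since `hot_swap/3` now takes the whole index map, callers can pass an entry from the cluster's `indexes` config straight through. A sketch with hypothetical `MyApp.*` modules and an illustrative settings path:

```elixir
index_config = %{
  settings: "priv/elasticsearch/posts.json", # illustrative path
  store: MyApp.Store,
  sources: [MyApp.Post],
  bulk_page_size: 1_000,
  bulk_wait_interval: 0
}

# Creates a timestamped "posts-..." index, bulk-uploads into it,
# then swaps the "posts" alias over and cleans up old indexes.
:ok = Elasticsearch.Index.hot_swap(MyApp.Cluster, "posts", index_config)
```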
lib/mix/tasks/elasticsearch.build.ex

@@ -23,6 +23,8 @@ defmodule Mix.Tasks.Elasticsearch.Build do
   require Logger
 
+  import Maybe
+
   alias Elasticsearch.{
     Cluster.Config,
     Index
@@ -54,9 +56,7 @@ defmodule Mix.Tasks.Elasticsearch.Build do
   end
 
   defp build(config, alias, :rebuild) do
-    %{settings: settings, store: store, sources: sources} = config.indexes[alias]
-
-    with :ok <- Index.hot_swap(config, alias, settings, store, sources) do
+    with :ok <- Index.hot_swap(config, alias, config.indexes[alias]) do
       :ok
     else
       {:error, errors} when is_list(errors) ->
@@ -71,7 +71,7 @@ defmodule Mix.Tasks.Elasticsearch.Build do
       {:error, :enoent} ->
         Mix.raise("""
-        Schema file not found at #{settings}.
+        Settings file not found at #{maybe(config, [:indexes, alias, :settings])}.
         """)
 
       {:error, exception} ->

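The error message now uses `maybe/2` from the newly added `maybe` dependency instead of a bare `#{settings}` interpolation, so a missing or misconfigured index doesn't raise while the task is already reporting an error. A rough sketch of the access pattern as used here (the nil-on-missing behavior is an assumption based on this call site, not a documented guarantee):

```elixir
import Maybe

config = %{indexes: %{posts: %{settings: "priv/elasticsearch/posts.json"}}}

maybe(config, [:indexes, :posts, :settings])
#=> "priv/elasticsearch/posts.json"

# Assumed behavior at this call site: a missing key yields nil
# rather than raising, keeping the Mix.raise message intact.
maybe(config, [:indexes, :missing, :settings])
```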
mix.exs

@@ -61,6 +61,7 @@ defmodule Elasticsearch.Mixfile do
       {:poison, ">= 0.0.0", optional: true},
       {:httpoison, ">= 0.0.0"},
       {:vex, "~> 0.6.0"},
+      {:maybe, "~> 1.0.0"},
       {:postgrex, ">= 0.0.0", only: [:dev, :test]},
       {:ex_doc, ">= 0.0.0", only: [:dev, :test]},
       {:ecto, ">= 0.0.0", only: [:dev, :test]},

mix.lock

@@ -14,6 +14,7 @@
   "idna": {:hex, :idna, "5.1.0", "d72b4effeb324ad5da3cab1767cb16b17939004e789d8c0ad5b70f3cea20c89a", [], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
   "jason": {:hex, :jason, "1.0.0-rc.1", "c8421d4e6e6ef0dd7c2b64ff63589f8561116808fa003dddfd5360cde7bb4625", [], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"},
   "jsx": {:hex, :jsx, "2.8.3", "a05252d381885240744d955fbe3cf810504eb2567164824e19303ea59eef62cf", [], [], "hexpm"},
+  "maybe": {:hex, :maybe, "1.0.0", "65311dd7e16659579116666b268d03d7e1d1b3da8776c81a6b199de7177b43d6", [:mix], [], "hexpm"},
   "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [], [], "hexpm"},
   "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [], [], "hexpm"},
   "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},

test/elasticsearch/index/bulk_test.exs

@@ -1,6 +1,8 @@
 defmodule Elasticsearch.Index.BulkTest do
   use Elasticsearch.DataCase
 
+  import ExUnit.CaptureLog
+
   alias Elasticsearch.{
     Test.Cluster,
     Test.Store,
@@ -31,12 +33,12 @@ defmodule Elasticsearch.Index.BulkTest do
   doctest Elasticsearch.Index.Bulk
 
-  describe ".upload/5" do
+  describe ".upload/4" do
     # Regression test for https://github.com/infinitered/elasticsearch-elixir/issues/10
     @tag :regression
     test "calls itself recursively properly" do
       assert {:error, [%TestException{}]} =
-               Bulk.upload(Cluster, :posts, Store, [Post], [%TestException{}])
+               Bulk.upload(Cluster, :posts, %{store: Store, sources: [Post]}, [%TestException{}])
     end
 
     test "collects errors properly" do
@@ -46,7 +48,33 @@ defmodule Elasticsearch.Index.BulkTest do
               Cluster
               |> Elasticsearch.Cluster.Config.get()
               |> Map.put(:api, ErrorAPI)
-              |> Bulk.upload(:posts, Store, [Post])
+              |> Bulk.upload(:posts, %{store: Store, sources: [Post]})
     end
+
+    test "respects bulk_* settings" do
+      populate_posts_table(2)
+
+      Logger.configure(level: :debug)
+
+      output =
+        capture_log([level: :debug], fn ->
+          Elasticsearch.Index.create_from_file(
+            Cluster,
+            "posts-bulk-test",
+            "test/support/settings/posts.json"
+          )
+
+          Bulk.upload(Cluster, "posts-bulk-test", %{
+            store: Store,
+            sources: [Post],
+            bulk_page_size: 1,
+            bulk_wait_interval: 10
+          })
+
+          Elasticsearch.delete!(Cluster, "/posts-bulk-test")
+        end)
+
+      assert output =~ "Pausing 10ms between bulk pages"
+    end
   end
 end

test/mix/tasks/elasticsearch.build_test.exs

@@ -2,6 +2,7 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do
   use Elasticsearch.DataCase, async: false
 
   import Mix.Task, only: [rerun: 2]
+  import ExUnit.CaptureLog
   import ExUnit.CaptureIO
 
   alias Elasticsearch.Index
@@ -46,8 +47,10 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do
   test "builds configured index" do
     populate_posts_table()
 
+    Logger.configure(level: :debug)
+
     output =
-      capture_io(fn ->
+      capture_log([level: :debug], fn ->
         rerun("elasticsearch.build", ["posts"] ++ @cluster_opts)
       end)

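Because pauses between bulk pages are now reported through `Logger.debug/1` rather than `IO.puts/1`, this test switches from `capture_io/1` to `capture_log/2`. The general pattern:

```elixir
import ExUnit.CaptureLog
require Logger

# capture_log/2 collects everything logged at or above the given
# level while the function runs, so the test can assert on it.
log =
  capture_log([level: :debug], fn ->
    Logger.debug("Pausing 10ms between bulk pages")
  end)

assert log =~ "Pausing 10ms"
```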
test/support/data_case.ex

@@ -31,6 +31,8 @@ defmodule Elasticsearch.DataCase do
       Ecto.Adapters.SQL.Sandbox.mode(Elasticsearch.Test.Repo, {:shared, self()})
     end
 
+    Logger.configure(level: :warn)
+
     :ok
   end