From ca32ae5cd3305d98708687190a0b3c98094c7cbb Mon Sep 17 00:00:00 2001 From: Tom Crossland Date: Fri, 27 Sep 2019 14:08:50 +0200 Subject: [PATCH 1/8] feat: allow document type to be specified in bulk encoding --- lib/elasticsearch/indexing/bulk.ex | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/lib/elasticsearch/indexing/bulk.ex b/lib/elasticsearch/indexing/bulk.ex index 7f0a7b5..705edd6 100644 --- a/lib/elasticsearch/indexing/bulk.ex +++ b/lib/elasticsearch/indexing/bulk.ex @@ -27,11 +27,11 @@ defmodule Elasticsearch.Index.Bulk do %Protocol.UndefinedError{description: "", protocol: Elasticsearch.Document, value: 123}} """ - @spec encode(Cluster.t(), struct, String.t()) :: + @spec encode(Cluster.t(), struct, String.t(), Keyword.t()) :: {:ok, String.t()} | {:error, Error.t()} - def encode(cluster, struct, index) do - {:ok, encode!(cluster, struct, index)} + def encode(cluster, struct, index, opts \\ []) do + {:ok, encode!(cluster, struct, index, opts)} rescue exception -> {:error, exception} @@ -51,9 +51,9 @@ defmodule Elasticsearch.Index.Bulk do iex> Bulk.encode!(Cluster, 123, "my-index") ** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123. 
This protocol is implemented for: Comment, Post """ - def encode!(cluster, struct, index) do + def encode!(cluster, struct, index, opts \\ []) do config = Cluster.Config.get(cluster) - header = header(config, "create", index, struct) + header = header(config, "create", index, struct, opts) document = struct @@ -63,12 +63,17 @@ defmodule Elasticsearch.Index.Bulk do "#{header}\n#{document}\n" end - defp header(config, type, index, struct) do + defp header(config, type, index, struct, opts) do attrs = %{ "_index" => index, "_id" => Document.id(struct) } + attrs = case opts[:type] do + nil -> attrs + type -> Map.put(attrs, "_type", type) + end + attrs = if routing = Document.routing(struct) do Map.put(attrs, "_routing", routing) From 5b00d000182688fe812b04eb0e70ed3ba85afcd3 Mon Sep 17 00:00:00 2001 From: Tom Crossland Date: Fri, 27 Sep 2019 14:11:41 +0200 Subject: [PATCH 2/8] feat: allow action to be specified in bulk encoding --- lib/elasticsearch/indexing/bulk.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/elasticsearch/indexing/bulk.ex b/lib/elasticsearch/indexing/bulk.ex index 705edd6..de91522 100644 --- a/lib/elasticsearch/indexing/bulk.ex +++ b/lib/elasticsearch/indexing/bulk.ex @@ -52,8 +52,9 @@ defmodule Elasticsearch.Index.Bulk do ** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123. 
This protocol is implemented for: Comment, Post """ def encode!(cluster, struct, index, opts \\ []) do + action = Keyword.get(opts, :action, "create") config = Cluster.Config.get(cluster) - header = header(config, "create", index, struct, opts) + header = header(config, action, index, struct, opts) document = struct From dc890cabbd4e239b932da66c7ce6090a7fb9dd80 Mon Sep 17 00:00:00 2001 From: Tom Crossland Date: Mon, 30 Sep 2019 09:08:10 +0200 Subject: [PATCH 3/8] refactor: we can specify the type in the bulk API url --- lib/elasticsearch/indexing/bulk.ex | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/lib/elasticsearch/indexing/bulk.ex b/lib/elasticsearch/indexing/bulk.ex index de91522..6553f06 100644 --- a/lib/elasticsearch/indexing/bulk.ex +++ b/lib/elasticsearch/indexing/bulk.ex @@ -27,11 +27,11 @@ defmodule Elasticsearch.Index.Bulk do %Protocol.UndefinedError{description: "", protocol: Elasticsearch.Document, value: 123}} """ - @spec encode(Cluster.t(), struct, String.t(), Keyword.t()) :: + @spec encode(Cluster.t(), struct, String.t(), String.t()) :: {:ok, String.t()} | {:error, Error.t()} - def encode(cluster, struct, index, opts \\ []) do - {:ok, encode!(cluster, struct, index, opts)} + def encode(cluster, struct, index, action \\ "create") do + {:ok, encode!(cluster, struct, index, action)} rescue exception -> {:error, exception} @@ -51,10 +51,9 @@ defmodule Elasticsearch.Index.Bulk do iex> Bulk.encode!(Cluster, 123, "my-index") ** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123. 
This protocol is implemented for: Comment, Post """ - def encode!(cluster, struct, index, opts \\ []) do - action = Keyword.get(opts, :action, "create") + def encode!(cluster, struct, index, action \\ "create") do config = Cluster.Config.get(cluster) - header = header(config, action, index, struct, opts) + header = header(config, action, index, struct) document = struct @@ -64,17 +63,12 @@ defmodule Elasticsearch.Index.Bulk do "#{header}\n#{document}\n" end - defp header(config, type, index, struct, opts) do + defp header(config, type, index, struct) do attrs = %{ "_index" => index, "_id" => Document.id(struct) } - attrs = case opts[:type] do - nil -> attrs - type -> Map.put(attrs, "_type", type) - end - attrs = if routing = Document.routing(struct) do Map.put(attrs, "_routing", routing) From 3674d6e4e6020dfa8ef93eade0d5c22024fbc703 Mon Sep 17 00:00:00 2001 From: Tom Crossland Date: Wed, 2 Oct 2019 11:12:40 +0200 Subject: [PATCH 4/8] feat: support for bulk "index" action --- README.md | 4 +++ lib/elasticsearch.ex | 55 ++++++++++++++++++++++++++++++ lib/elasticsearch/api/api.ex | 2 +- lib/elasticsearch/indexing/bulk.ex | 15 ++++---- 4 files changed, 68 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 95ec492..537628c 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,10 @@ config :my_app, MyApp.ElasticsearchCluster, # Likewise, wait a given period between posting pages to give # Elasticsearch time to catch up. bulk_wait_interval: 15_000 # 15 seconds + + # By default bulk indexing uses the "create" action. To allow existing + # documents to be replaced, use the "index" action instead. 
+ bulk_action: "create" } } ``` diff --git a/lib/elasticsearch.ex b/lib/elasticsearch.ex index 21e33f1..ffe15f7 100644 --- a/lib/elasticsearch.ex +++ b/lib/elasticsearch.ex @@ -425,6 +425,61 @@ defmodule Elasticsearch do |> unwrap!() end + @doc """ + Determines whether a resource exists at a given Elasticsearch path. + + ## Examples + + iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json") + ...> Elasticsearch.head(Cluster, "/posts") + {:ok, ""} + + It returns an error if the given resource does not exist. + + iex> Elasticsearch.head(Cluster, "/nonexistent") + {:error, + %Elasticsearch.Exception{ + col: nil, + line: nil, + message: "", + query: nil, + raw: nil, + status: nil, + type: nil + }} + """ + @spec head(Cluster.t(), url) :: response + @spec head(Cluster.t(), url, opts) :: response + def head(cluster, url, opts \\ []) do + config = Config.get(cluster) + + config + |> config.api.request(:head, url, "", opts) + |> format() + end + + @doc """ + Same as `head/3`, but returns the response and raises errors. + + ## Examples + + iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json") + ...> Elasticsearch.head!(Cluster, "/posts") + "" + + Raises an error if the resource is invalid. 
+ + iex> Elasticsearch.head!(Cluster, "/nonexistent") + ** (Elasticsearch.Exception) + """ + @spec head!(Cluster.t(), url) :: map | no_return + @spec head!(Cluster.t(), url, opts) :: map | no_return + def head!(cluster, url, opts \\ []) do + cluster + |> head(url, opts) + |> unwrap!() + end + defp format({:ok, %{status_code: code, body: body}}) when code >= 200 and code < 300 do {:ok, body} diff --git a/lib/elasticsearch/api/api.ex b/lib/elasticsearch/api/api.ex index 55cd2ce..54632ae 100644 --- a/lib/elasticsearch/api/api.ex +++ b/lib/elasticsearch/api/api.ex @@ -4,7 +4,7 @@ defmodule Elasticsearch.API do """ @typedoc "An HTTP method" - @type method :: :get | :put | :post | :delete + @type method :: :get | :put | :post | :delete | :head @typedoc "The URL to request from the API" @type url :: String.t() diff --git a/lib/elasticsearch/indexing/bulk.ex b/lib/elasticsearch/indexing/bulk.ex index 6553f06..8f9ad81 100644 --- a/lib/elasticsearch/indexing/bulk.ex +++ b/lib/elasticsearch/indexing/bulk.ex @@ -99,16 +99,17 @@ defmodule Elasticsearch.Index.Bulk do config = Cluster.Config.get(cluster) bulk_page_size = index_config[:bulk_page_size] || 5000 bulk_wait_interval = index_config[:bulk_wait_interval] || 0 + action = index_config[:bulk_action] || "create" errors = store.transaction(fn -> source |> store.stream() - |> Stream.map(&encode!(config, &1, index_name)) + |> Stream.map(&encode!(config, &1, index_name, action)) |> Stream.chunk_every(bulk_page_size) |> Stream.intersperse(bulk_wait_interval) |> Stream.map(&put_bulk_page(config, index_name, &1)) - |> Enum.reduce(errors, &collect_errors/2) + |> Enum.reduce(errors, &collect_errors(&1, &2, action)) end) upload(config, index_name, %{index_config | sources: tail}, errors) @@ -123,21 +124,21 @@ defmodule Elasticsearch.Index.Bulk do Elasticsearch.put(config, "/#{index_name}/_doc/_bulk", Enum.join(items)) end - defp collect_errors({:ok, %{"errors" => true} = response}, errors) do + defp collect_errors({:ok, %{"errors" => 
true} = response}, errors, action) do new_errors = response["items"] - |> Enum.filter(&(&1["create"]["error"] != nil)) - |> Enum.map(& &1["create"]) + |> Enum.filter(&(&1[action]["error"] != nil)) + |> Enum.map(& &1[action]) |> Enum.map(&Elasticsearch.Exception.exception(response: &1)) new_errors ++ errors end - defp collect_errors({:error, error}, errors) do + defp collect_errors({:error, error}, errors, _action) do [error | errors] end - defp collect_errors(_response, errors) do + defp collect_errors(_response, errors, _action) do errors end end From 747aa6eedd72e8c991418522d41513c842f1d4e5 Mon Sep 17 00:00:00 2001 From: Tom Crossland Date: Fri, 4 Oct 2019 21:30:27 +0200 Subject: [PATCH 5/8] feat: allow settings to be read from a map instead of a file --- lib/elasticsearch/indexing/index.ex | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/lib/elasticsearch/indexing/index.ex b/lib/elasticsearch/indexing/index.ex index 6eaf7a9..2e7619c 100644 --- a/lib/elasticsearch/indexing/index.ex +++ b/lib/elasticsearch/indexing/index.ex @@ -30,9 +30,9 @@ defmodule Elasticsearch.Index do alias = alias_to_atom(alias) name = build_name(alias) config = Config.get(cluster) - %{settings: settings_file} = index_config = config[:indexes][alias] + %{settings: settings} = index_config = config[:indexes][alias] - with :ok <- create_from_file(config, name, settings_file), + with :ok <- create_from_settings(config, name, settings), :ok <- Bulk.upload(config, name, index_config), :ok <- __MODULE__.alias(config, name, to_string(alias)), :ok <- clean_starting_with(config, to_string(alias), 2), @@ -273,6 +273,31 @@ defmodule Elasticsearch.Index do end end + @doc """ + Creates an index with the given name, with settings loaded from a map or a JSON file (see `create_from_file/3`). 
+ ## Example + iex> Index.create_from_settings(Cluster, "posts-1", %{}) + :ok + + iex> Index.create_from_settings(Cluster, "posts-1", "nonexistent.json") + {:error, :enoent} + """ + @spec create_from_settings(Cluster.t(), String.t(), map | Path.t()) :: + :ok + | {:error, File.posix()} + | {:error, Elasticsearch.Exception.t()} + def create_from_settings(cluster, name, settings) + + def create_from_settings(cluster, name, settings) when is_map(settings) do + create(cluster, name, settings) + end + + def create_from_settings(cluster, name, file) do + create_from_file(cluster, name, file) + end + @doc """ Generates a name for an index that will be aliased to a given `alias`. Similar to migrations, the name will contain a timestamp. From a71ae8336cd642ed9dd28e14bbddc0e6b0eb8655 Mon Sep 17 00:00:00 2001 From: Tom Crossland Date: Fri, 4 Oct 2019 21:50:53 +0200 Subject: [PATCH 6/8] fix: an empty map is valid for index settings --- lib/elasticsearch/cluster/config.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/elasticsearch/cluster/config.ex b/lib/elasticsearch/cluster/config.ex index 2bd2ea4..6bcd19e 100644 --- a/lib/elasticsearch/cluster/config.ex +++ b/lib/elasticsearch/cluster/config.ex @@ -73,7 +73,7 @@ defmodule Elasticsearch.Cluster.Config do defp validate_index({_name, settings}) do Vex.validate( settings, - settings: [presence: true, by: &is_binary/1], + settings: [presence: [unless: &is_map/1], by: &(is_binary(&1) or is_map(&1))], store: [presence: true, by: &is_module/1], sources: [ presence: true, From 3b3af970c09fb1629dfec25182164b80963c0f50 Mon Sep 17 00:00:00 2001 From: Tom Crossland Date: Mon, 7 Oct 2019 11:04:24 +0200 Subject: [PATCH 7/8] fix: failing test after changing settings validation --- test/elasticsearch/cluster/cluster_test.exs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/elasticsearch/cluster/cluster_test.exs b/test/elasticsearch/cluster/cluster_test.exs index ca1f9b3..1859b18 100644 --- 
a/test/elasticsearch/cluster/cluster_test.exs +++ b/test/elasticsearch/cluster/cluster_test.exs @@ -103,7 +103,7 @@ defmodule Elasticsearch.ClusterTest do test "validates indexes" do errors = errors_on(%{valid_config() | indexes: %{example: %{}}}) - for field <- [:settings, :store, :sources, :bulk_page_size, :bulk_wait_interval] do + for field <- [:store, :sources, :bulk_page_size, :bulk_wait_interval] do assert {"must be present", validation: :presence} in errors[field] end From 3d44a0b63e2c9dc28469572e4fab1ed06e3bc1fd Mon Sep 17 00:00:00 2001 From: Tom Crossland Date: Mon, 7 Oct 2019 11:18:34 +0200 Subject: [PATCH 8/8] refactor: remove presence validation for settings (empty map is valid) --- lib/elasticsearch/cluster/config.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/elasticsearch/cluster/config.ex b/lib/elasticsearch/cluster/config.ex index 6bcd19e..02ddf75 100644 --- a/lib/elasticsearch/cluster/config.ex +++ b/lib/elasticsearch/cluster/config.ex @@ -73,7 +73,7 @@ defmodule Elasticsearch.Cluster.Config do defp validate_index({_name, settings}) do Vex.validate( settings, - settings: [presence: [unless: &is_map/1], by: &(is_binary(&1) or is_map(&1))], + settings: [by: &(is_binary(&1) or is_map(&1))], store: [presence: true, by: &is_module/1], sources: [ presence: true,