Merge pull request #78 from Bluetab/feature/bulk-index-action
Bulk "index" action and HEAD support
This commit is contained in:
commit
3332e6eb3b
7 changed files with 102 additions and 17 deletions
|
|
@ -100,6 +100,10 @@ config :my_app, MyApp.ElasticsearchCluster,
|
|||
# Likewise, wait a given period between posting pages to give
|
||||
# Elasticsearch time to catch up.
|
||||
bulk_wait_interval: 15_000 # 15 seconds
|
||||
|
||||
# By default bulk indexing uses the "create" action. To allow existing
|
||||
# documents to be replaced, use the "index" action instead.
|
||||
bulk_action: "create"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
|
|
|||
|
|
@ -425,6 +425,61 @@ defmodule Elasticsearch do
|
|||
|> unwrap!()
|
||||
end
|
||||
|
||||
@doc """
|
||||
Determines whether a resource exists at a given Elasticsearch path.
|
||||
|
||||
## Examples
|
||||
|
||||
iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json")
|
||||
...> Elasticsearch.head(Cluster, "/posts")
|
||||
{:ok, ""}
|
||||
|
||||
It returns an error if the given resource does not exist.
|
||||
|
||||
iex> Elasticsearch.head(Cluster, "/nonexistent")
|
||||
{:error,
|
||||
%Elasticsearch.Exception{
|
||||
col: nil,
|
||||
line: nil,
|
||||
message: "",
|
||||
query: nil,
|
||||
raw: nil,
|
||||
status: nil,
|
||||
type: nil
|
||||
}}
|
||||
"""
|
||||
@spec head(Cluster.t(), url) :: response
|
||||
@spec head(Cluster.t(), url, opts) :: response
|
||||
def head(cluster, url, opts \\ []) do
|
||||
config = Config.get(cluster)
|
||||
|
||||
config
|
||||
|> config.api.request(:head, url, "", opts)
|
||||
|> format()
|
||||
end
|
||||
|
||||
@doc """
|
||||
Same as `head/2`, but returns the response and raises errors.
|
||||
|
||||
## Examples
|
||||
|
||||
iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json")
|
||||
...> Elasticsearch.head!(Cluster, "/posts")
|
||||
""
|
||||
|
||||
Raises an error if the resource does not exist.
|
||||
|
||||
iex> Elasticsearch.head!(Cluster, "/nonexistent")
|
||||
** (Elasticsearch.Exception)
|
||||
"""
|
||||
@spec head!(Cluster.t(), url) :: map | no_return
|
||||
@spec head!(Cluster.t(), url, opts) :: map | no_return
|
||||
def head!(cluster, url, opts \\ []) do
|
||||
cluster
|
||||
|> head(url, opts)
|
||||
|> unwrap!()
|
||||
end
|
||||
|
||||
defp format({:ok, %{status_code: code, body: body}})
|
||||
when code >= 200 and code < 300 do
|
||||
{:ok, body}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ defmodule Elasticsearch.API do
|
|||
"""
|
||||
|
||||
@typedoc "An HTTP method"
|
||||
@type method :: :get | :put | :post | :delete
|
||||
@type method :: :get | :put | :post | :delete | :head
|
||||
|
||||
@typedoc "The URL to request from the API"
|
||||
@type url :: String.t()
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ defmodule Elasticsearch.Cluster.Config do
|
|||
defp validate_index({_name, settings}) do
|
||||
Vex.validate(
|
||||
settings,
|
||||
settings: [presence: true, by: &is_binary/1],
|
||||
settings: [by: &(is_binary(&1) or is_map(&1))],
|
||||
store: [presence: true, by: &is_module/1],
|
||||
sources: [
|
||||
presence: true,
|
||||
|
|
|
|||
|
|
@ -27,11 +27,11 @@ defmodule Elasticsearch.Index.Bulk do
|
|||
%Protocol.UndefinedError{description: "",
|
||||
protocol: Elasticsearch.Document, value: 123}}
|
||||
"""
|
||||
@spec encode(Cluster.t(), struct, String.t()) ::
|
||||
@spec encode(Cluster.t(), struct, String.t(), String.t()) ::
|
||||
{:ok, String.t()}
|
||||
| {:error, Error.t()}
|
||||
def encode(cluster, struct, index) do
|
||||
{:ok, encode!(cluster, struct, index)}
|
||||
def encode(cluster, struct, index, action \\ "create") do
|
||||
{:ok, encode!(cluster, struct, index, action)}
|
||||
rescue
|
||||
exception ->
|
||||
{:error, exception}
|
||||
|
|
@ -51,9 +51,9 @@ defmodule Elasticsearch.Index.Bulk do
|
|||
iex> Bulk.encode!(Cluster, 123, "my-index")
|
||||
** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123 of type Integer
|
||||
"""
|
||||
def encode!(cluster, struct, index) do
|
||||
def encode!(cluster, struct, index, action \\ "create") do
|
||||
config = Cluster.Config.get(cluster)
|
||||
header = header(config, "create", index, struct)
|
||||
header = header(config, action, index, struct)
|
||||
|
||||
document =
|
||||
struct
|
||||
|
|
@ -99,16 +99,17 @@ defmodule Elasticsearch.Index.Bulk do
|
|||
config = Cluster.Config.get(cluster)
|
||||
bulk_page_size = index_config[:bulk_page_size] || 5000
|
||||
bulk_wait_interval = index_config[:bulk_wait_interval] || 0
|
||||
action = index_config[:bulk_action] || "create"
|
||||
|
||||
errors =
|
||||
store.transaction(fn ->
|
||||
source
|
||||
|> store.stream()
|
||||
|> Stream.map(&encode!(config, &1, index_name))
|
||||
|> Stream.map(&encode!(config, &1, index_name, action))
|
||||
|> Stream.chunk_every(bulk_page_size)
|
||||
|> Stream.intersperse(bulk_wait_interval)
|
||||
|> Stream.map(&put_bulk_page(config, index_name, &1))
|
||||
|> Enum.reduce(errors, &collect_errors/2)
|
||||
|> Enum.reduce(errors, &collect_errors(&1, &2, action))
|
||||
end)
|
||||
|
||||
upload(config, index_name, %{index_config | sources: tail}, errors)
|
||||
|
|
@ -123,21 +124,21 @@ defmodule Elasticsearch.Index.Bulk do
|
|||
Elasticsearch.put(config, "/#{index_name}/_doc/_bulk", Enum.join(items))
|
||||
end
|
||||
|
||||
defp collect_errors({:ok, %{"errors" => true} = response}, errors) do
|
||||
defp collect_errors({:ok, %{"errors" => true} = response}, errors, action) do
|
||||
new_errors =
|
||||
response["items"]
|
||||
|> Enum.filter(&(&1["create"]["error"] != nil))
|
||||
|> Enum.map(& &1["create"])
|
||||
|> Enum.filter(&(&1[action]["error"] != nil))
|
||||
|> Enum.map(& &1[action])
|
||||
|> Enum.map(&Elasticsearch.Exception.exception(response: &1))
|
||||
|
||||
new_errors ++ errors
|
||||
end
|
||||
|
||||
defp collect_errors({:error, error}, errors) do
|
||||
defp collect_errors({:error, error}, errors, _action) do
|
||||
[error | errors]
|
||||
end
|
||||
|
||||
defp collect_errors(_response, errors) do
|
||||
defp collect_errors(_response, errors, _action) do
|
||||
errors
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -30,9 +30,9 @@ defmodule Elasticsearch.Index do
|
|||
alias = alias_to_atom(alias)
|
||||
name = build_name(alias)
|
||||
config = Config.get(cluster)
|
||||
%{settings: settings_file} = index_config = config[:indexes][alias]
|
||||
%{settings: settings} = index_config = config[:indexes][alias]
|
||||
|
||||
with :ok <- create_from_file(config, name, settings_file),
|
||||
with :ok <- create_from_settings(config, name, settings),
|
||||
:ok <- Bulk.upload(config, name, index_config),
|
||||
:ok <- __MODULE__.alias(config, name, to_string(alias)),
|
||||
:ok <- clean_starting_with(config, to_string(alias), 2),
|
||||
|
|
@ -273,6 +273,31 @@ defmodule Elasticsearch.Index do
|
|||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Creates an index with the given name, with settings loaded from a map or a JSON file (see `create_from_file/3`).
|
||||
|
||||
## Example
|
||||
|
||||
iex> Index.create_from_settings(Cluster, "posts-1", %{})
|
||||
:ok
|
||||
|
||||
iex> Index.create_from_settings(Cluster, "posts-1", "nonexistent.json")
|
||||
{:error, :enoent}
|
||||
"""
|
||||
@spec create_from_settings(Cluster.t(), String.t(), map | Path.t()) ::
|
||||
:ok
|
||||
| {:error, File.posix()}
|
||||
| {:error, Elasticsearch.Exception.t()}
|
||||
def create_from_settings(cluster, name, settings)
|
||||
|
||||
def create_from_settings(cluster, name, settings) when is_map(settings) do
|
||||
create(cluster, name, settings)
|
||||
end
|
||||
|
||||
def create_from_settings(cluster, name, file) do
|
||||
create_from_file(cluster, name, file)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Generates a name for an index that will be aliased to a given `alias`.
|
||||
Similar to migrations, the name will contain a timestamp.
|
||||
|
|
|
|||
|
|
@ -103,7 +103,7 @@ defmodule Elasticsearch.ClusterTest do
|
|||
test "validates indexes" do
|
||||
errors = errors_on(%{valid_config() | indexes: %{example: %{}}})
|
||||
|
||||
for field <- [:settings, :store, :sources, :bulk_page_size, :bulk_wait_interval] do
|
||||
for field <- [:store, :sources, :bulk_page_size, :bulk_wait_interval] do
|
||||
assert {"must be present", validation: :presence} in errors[field]
|
||||
end
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue