Introduce Elasticsearch.Cluster

This commit is contained in:
Daniel Berkompas 2018-04-11 15:53:23 -07:00
commit 148974af06
23 changed files with 849 additions and 481 deletions

View file

@ -23,19 +23,27 @@ def deps do
end
```
Then, create an `Elasticsearch.Cluster` in your application:
```elixir
defmodule MyApp.ElasticsearchCluster do
use Elasticsearch.Cluster, otp_app: :my_app
end
```
## Configuration
See the annotated example configuration below.
```elixir
config :elasticsearch,
config :my_app, MyApp.ElasticsearchCluster,
# The URL where Elasticsearch is hosted on your system
url: "http://localhost:9200", # or {:system, "ELASTICSEARCH_URL"}
url: "http://localhost:9200",
# If your Elasticsearch cluster uses HTTP basic authentication,
# specify the username and password here:
username: "username", # or {:system, "ELASTICSEARCH_USERNAME"}
password: "password", # or {:system, "ELASTICSEARCH_PASSWORD"}
username: "username",
password: "password",
# When indexing data using the `mix elasticsearch.build` task,
# control the data ingestion rate by raising or lowering the number
@ -49,7 +57,7 @@ config :elasticsearch,
# If you want to mock the responses of the Elasticsearch JSON API
# for testing or other purposes, you can inject a different module
# here. It must implement the Elasticsearch.API behaviour.
api_module: Elasticsearch.API.HTTP,
api: Elasticsearch.API.HTTP,
# Customize the library used for JSON encoding/decoding.
json_library: Poison, # or Jason
@ -83,7 +91,7 @@ config :elasticsearch,
}
```
## Protocols & Behaviours
## Protocols and Behaviours
#### Elasticsearch.Store
@ -130,8 +138,8 @@ This can be used in test mode, for example:
```elixir
# config/test.exs
config :elasticsearch,
api_module: MyApp.ElasticsearchMock
config :my_app, MyApp.ElasticsearchCluster,
api: MyApp.ElasticsearchMock
```
Your mock can then stub requests and responses from Elasticsearch.
@ -140,7 +148,8 @@ Your mock can then stub requests and responses from Elasticsearch.
defmodule MyApp.ElasticsearchMock do
@behaviour Elasticsearch.API
def get("/posts/1", _headers, _opts) do
@impl true
def request(_config, :get, "/posts/1", _headers, _opts) do
{:ok, %HTTPoison.Response{
status_code: 404,
body: %{
@ -161,7 +170,7 @@ hot-swap technique with Elasticsearch aliases.
```bash
# This will read the `indexes[posts]` configuration seen above, to build
# an index, `posts-123123123`, which will then be aliased to `posts`.
$ mix elasticsearch.build posts
$ mix elasticsearch.build posts --cluster MyApp.ElasticsearchCluster
```
See the docs on `Mix.Tasks.Elasticsearch.Build` and `Elasticsearch.Index`
@ -169,29 +178,29 @@ for more details.
#### Individual Documents
Use `Elasticsearch.put_document/2` to upload a document to a particular index.
Use `Elasticsearch.put_document/3` to upload a document to a particular index.
```elixir
# MyApp.Post must implement Elasticsearch.Document
Elasticsearch.put_document(%MyApp.Post{}, "index-name")
Elasticsearch.put_document(MyApp.ElasticsearchCluster, %MyApp.Post{}, "index-name")
```
To remove documents, use `Elasticsearch.delete_document/2`:
To remove documents, use `Elasticsearch.delete_document/3`:
```elixir
Elasticsearch.delete_document(%MyApp.Post{}, "index-name")
Elasticsearch.delete_document(MyApp.ElasticsearchCluster, %MyApp.Post{}, "index-name")
```
## Querying
You can query Elasticsearch the `post/2` function:
You can query Elasticsearch using the `post/3` function:
```elixir
# Raw query
Elasticsearch.post("/posts/post/_search", '{"query": {"match_all": {}}}')
Elasticsearch.post(MyApp.ElasticsearchCluster, "/posts/post/_search", '{"query": {"match_all": {}}}')
# Using a map
Elasticsearch.post("/posts/post/_search", %{"query" => %{"match_all" => %{}}})
Elasticsearch.post(MyApp.ElasticsearchCluster, "/posts/post/_search", %{"query" => %{"match_all" => %{}}})
```
See the official Elasticsearch [documentation](https://www.elastic.co/guide/en/elasticsearch/reference/6.x/index.html)

8
bin/coverage Executable file
View file

@ -0,0 +1,8 @@
#!/usr/bin/env bash
#
# Coverage Report
#
# Generates and opens a code coverage report for the entire project.
mix coveralls.html || { echo 'Tests failed!'; exit 1; }
open cover/excoveralls.html

9
coveralls.json Normal file
View file

@ -0,0 +1,9 @@
{
"skip_files": [
"lib/elasticsearch/api/api.ex",
"test/support/*"
],
"coverage_options": {
"treat_no_relevant_lines_as_covered": true
}
}

View file

@ -1,28 +1,23 @@
defmodule Elasticsearch do
@moduledoc """
An Elixir interface to the Elasticsearch JSON API.
Entry-point for interacting with your Elasticsearch cluster(s).
## Configuration
You can customize the API module used by this module to make requests to
the Elasticsearch API. (Default: `Elasticsearch.API.HTTP`)
config :elasticsearch,
api_module: MyApp.CustomAPI
You can also specify default headers or default options to pass to
`HTTPoison`.
config :elasticsearch,
default_headers: [{"authorization", "custom-value"}],
default_options: [ssl: [{:versions, [:'tlsv1.2']}]]
You should configure at least one `Elasticsearch.Cluster` in order to
use the functions in this module, or else you'll need to pass all the
configuration for the cluster into each function call.
"""
alias Elasticsearch.Document
alias Elasticsearch.{
Document,
Cluster,
Cluster.Config
}
@type response ::
{:ok, map}
| {:error, Elasticsearch.Exception.t()}
@type index_name :: String.t()
@type url :: Path.t()
@type opts :: Keyword.t()
@type data :: map | String.t()
@type response :: {:ok, map} :: {:error, Elasticsearch.Exception.t()}
@doc """
Creates or updates a document in a given index.
@ -31,30 +26,26 @@ defmodule Elasticsearch do
## Example
iex> Elasticsearch.Index.create_from_file("posts-1", "test/support/settings/posts.json")
iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json")
...> struct = %Post{id: 123, title: "Post", author: "Author"}
...> Elasticsearch.put_document(struct, "posts-1")
...> Elasticsearch.put_document(Cluster, struct, "posts-1")
{:ok,
%{"_id" => "123", "_index" => "posts-1",
"_shards" => %{"failed" => 0, "successful" => 1, "total" => 2},
"_type" => "post", "_version" => 1, "created" => true,
"result" => "created"}}
"""
@spec put_document(Document.t(), String.t()) :: response
def put_document(document, index) do
document
|> document_url(index)
|> put(Document.encode(document))
@spec put_document(Cluster.t(), Document.t(), index_name) :: response
def put_document(cluster, document, index) do
put(cluster, document_url(document, index), Document.encode(document))
end
@doc """
Same as `put_document/2`, but raises on errors.
"""
@spec put_document!(Document.t(), String.t()) :: map
def put_document!(document, index) do
document
|> put_document(index)
|> unwrap!()
@spec put_document(Cluster.t(), Document.t(), index_name) :: map | no_return
def put_document!(cluster, document, index) do
put!(cluster, document_url(document, index), Document.encode(document))
end
@doc """
@ -62,21 +53,17 @@ defmodule Elasticsearch do
The document must implement the `Elasticsearch.Document` protocol.
"""
@spec delete_document(Document.t(), String.t()) :: response
def delete_document(document, index) do
document
|> document_url(index)
|> delete()
@spec delete_document(Cluster.t(), Document.t(), index_name) :: response
def delete_document(cluster, document, index) do
delete(cluster, document_url(document, index))
end
@doc """
Same as `delete_document/2`, but raises on errors.
"""
@spec delete_document!(Document.t(), String.t()) :: map
def delete_document!(document, index) do
document
|> delete_document(index)
|> unwrap!()
@spec delete_document!(Cluster.t(), Document.t(), index_name) :: map | no_return
def delete_document!(cluster, document, index) do
delete!(cluster, document_url(document, index))
end
defp document_url(document, index) do
@ -84,23 +71,17 @@ defmodule Elasticsearch do
end
@doc """
Waits for Elasticsearch to be available at the configured url.
Waits for a given Elasticsearch cluster to be available.
It will try a given number of times, with 1sec delay between tries.
## Example
iex> {:ok, resp} = Elasticsearch.wait_for_boot(15)
...> is_list(resp)
true
"""
@spec wait_for_boot(integer) ::
@spec wait_for_boot(Cluster.t(), integer) ::
{:ok, map}
| {:error, RuntimeError.t()}
| {:error, Elasticsearch.Exception.t()}
def wait_for_boot(tries, count \\ 0)
def wait_for_boot(cluster, tries, count \\ 0)
def wait_for_boot(tries, count) when count == tries do
def wait_for_boot(_cluster, tries, count) when count == tries do
{
:error,
RuntimeError.exception("""
@ -109,10 +90,10 @@ defmodule Elasticsearch do
}
end
def wait_for_boot(tries, count) do
with {:error, _} <- get("/_cat/health?format=json") do
def wait_for_boot(cluster, tries, count) do
with {:error, _} <- get(cluster, "/_cat/health?format=json") do
:timer.sleep(1000)
wait_for_boot(tries, count + 1)
wait_for_boot(cluster, tries, count + 1)
end
end
@ -121,11 +102,11 @@ defmodule Elasticsearch do
## Examples
iex> {:ok, resp} = Elasticsearch.get("/_cat/health?format=json")
iex> {:ok, resp} = Elasticsearch.get(Cluster, "/_cat/health?format=json")
...> is_list(resp)
true
iex> Elasticsearch.get("/nonexistent")
iex> Elasticsearch.get(Cluster, "/nonexistent")
{:error,
%Elasticsearch.Exception{col: nil, line: nil,
message: "no such index", query: nil,
@ -141,11 +122,13 @@ defmodule Elasticsearch do
"type" => "index_not_found_exception"}, "status" => 404},
status: 404, type: "index_not_found_exception"}}
"""
@spec get(String.t()) :: response
@spec get(String.t(), Keyword.t()) :: response
def get(url, opts \\ []) do
url
|> api_module().get(default_headers(), Keyword.merge(default_opts(), opts))
@spec get(Cluster.t(), url) :: response
@spec get(Cluster.t(), url, opts) :: response
def get(cluster, url, opts \\ []) do
config = Config.get(cluster)
config
|> config.api.request(:get, url, "", opts)
|> format()
end
@ -155,18 +138,18 @@ defmodule Elasticsearch do
## Examples
iex> resp = Elasticsearch.get!("/_cat/health?format=json")
iex> resp = Elasticsearch.get!(Cluster, "/_cat/health?format=json")
...> is_list(resp)
true
iex> Elasticsearch.get!("/nonexistent")
iex> Elasticsearch.get!(Cluster, "/nonexistent")
** (Elasticsearch.Exception) (index_not_found_exception) no such index
"""
@spec get!(String.t()) :: map
@spec get!(String.t(), Keyword.t()) :: map
def get!(url, opts \\ []) do
url
|> get(opts)
@spec get!(Cluster.t(), url) :: map | no_return
@spec get!(Cluster.t(), url, opts) :: map | no_return
def get!(cluster, url, opts \\ []) do
cluster
|> get(url, opts)
|> unwrap!()
end
@ -175,25 +158,27 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.Index.create_from_file("posts-1", "test/support/settings/posts.json")
...> Elasticsearch.put("/posts-1/post/id", %{"title" => "title", "author" => "author"})
iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json")
...> Elasticsearch.put(Cluster, "/posts-1/post/id", %{"title" => "title", "author" => "author"})
{:ok,
%{"_id" => "id", "_index" => "posts-1",
"_shards" => %{"failed" => 0, "successful" => 1, "total" => 2},
"_type" => "post", "_version" => 1, "created" => true,
"result" => "created"}}
iex> Elasticsearch.put("/bad/url", %{"title" => "title", "author" => "author"})
iex> Elasticsearch.put(Cluster, "/bad/url", %{"title" => "title", "author" => "author"})
{:error,
%Elasticsearch.Exception{col: nil, line: nil,
message: "No handler found for uri [/bad/url] and method [PUT]",
query: nil, raw: nil, status: nil, type: nil}}
"""
@spec put(String.t(), map | binary) :: response
@spec put(String.t(), map | binary, Keyword.t()) :: response
def put(url, data, opts \\ []) do
url
|> api_module().put(data, default_headers(), Keyword.merge(default_opts(), opts))
@spec put(Cluster.t(), url, data) :: response
@spec put(Cluster.t(), url, data, opts) :: response
def put(cluster, url, data, opts \\ []) do
config = Config.get(cluster)
config
|> config.api.request(:put, url, data, opts)
|> format()
end
@ -203,21 +188,21 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.Index.create_from_file("posts", "test/support/settings/posts.json")
...> Elasticsearch.put!("/posts/post/id", %{"name" => "name", "author" => "author"})
iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json")
...> Elasticsearch.put!(Cluster, "/posts/post/id", %{"name" => "name", "author" => "author"})
%{"_id" => "id", "_index" => "posts",
"_shards" => %{"failed" => 0, "successful" => 1, "total" => 2},
"_type" => "post", "_version" => 1, "created" => true,
"result" => "created"}
iex> Elasticsearch.put!("/bad/url", %{"data" => "here"})
iex> Elasticsearch.put!(Cluster, "/bad/url", %{"data" => "here"})
** (Elasticsearch.Exception) No handler found for uri [/bad/url] and method [PUT]
"""
@spec put!(String.t(), map) :: map
@spec put!(String.t(), map, Keyword.t()) :: map
def put!(url, data, opts \\ []) do
url
|> put(data, opts)
@spec put!(Cluster.t(), url, data) :: map | no_return
@spec put!(Cluster.t(), url, data, opts) :: map | no_return
def put!(cluster, url, data, opts \\ []) do
cluster
|> put(url, data, opts)
|> unwrap!()
end
@ -227,17 +212,19 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.Index.create_from_file("posts", "test/support/settings/posts.json")
iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json")
...> query = %{"query" => %{"match_all" => %{}}}
...> {:ok, resp} = Elasticsearch.post("/posts/_search", query)
...> {:ok, resp} = Elasticsearch.post(Cluster, "/posts/_search", query)
...> resp["hits"]["hits"]
[]
"""
@spec post(String.t(), map) :: response
@spec post(String.t(), map, Keyword.t()) :: response
def post(url, data, opts \\ []) do
url
|> api_module().post(data, default_headers(), Keyword.merge(default_opts(), opts))
@spec post(Cluster.t(), url, data) :: response
@spec post(Cluster.t(), url, data, opts) :: response
def post(cluster, url, data, opts \\ []) do
config = Config.get(cluster)
config
|> config.api.request(:post, url, data, opts)
|> format()
end
@ -246,23 +233,23 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.Index.create_from_file("posts", "test/support/settings/posts.json")
iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json")
...> query = %{"query" => %{"match_all" => %{}}}
...> resp = Elasticsearch.post!("/posts/_search", query)
...> resp = Elasticsearch.post!(Cluster, "/posts/_search", query)
...> is_map(resp)
true
Raises an error if the path is invalid or another error occurs:
iex> query = %{"query" => %{"match_all" => %{}}}
...> Elasticsearch.post!("/nonexistent/_search", query)
...> Elasticsearch.post!(Cluster, "/nonexistent/_search", query)
** (Elasticsearch.Exception) (index_not_found_exception) no such index
"""
@spec post!(String.t(), map) :: map
@spec post!(String.t(), map, Keyword.t()) :: map
def post!(url, data, opts \\ []) do
url
|> post(data, opts)
@spec post!(Cluster.t(), url, data) :: map | no_return
@spec post!(Cluster.t(), url, data, opts) :: map | no_return
def post!(cluster, url, data, opts \\ []) do
cluster
|> post(url, data, opts)
|> unwrap!()
end
@ -271,13 +258,13 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.Index.create_from_file("posts", "test/support/settings/posts.json")
...> Elasticsearch.delete("/posts")
iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json")
...> Elasticsearch.delete(Cluster, "/posts")
{:ok, %{"acknowledged" => true}}
It returns an error if the given resource does not exist.
iex> Elasticsearch.delete("/nonexistent")
iex> Elasticsearch.delete(Cluster, "/nonexistent")
{:error,
%Elasticsearch.Exception{col: nil, line: nil,
message: "no such index", query: nil,
@ -293,10 +280,14 @@ defmodule Elasticsearch do
"type" => "index_not_found_exception"}, "status" => 404},
status: 404, type: "index_not_found_exception"}}
"""
@spec delete(String.t()) :: response
@spec delete(String.t(), Keyword.t()) :: response
def delete(url, opts \\ []) do
format(api_module().delete(url, default_headers(), Keyword.merge(default_opts(), opts)))
@spec delete(Cluster.t(), url) :: response
@spec delete(Cluster.t(), url, opts) :: response
def delete(cluster, url, opts \\ []) do
config = Config.get(cluster)
config
|> config.api.request(:delete, url, "", opts)
|> format()
end
@doc """
@ -304,20 +295,20 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.Index.create_from_file("posts", "test/support/settings/posts.json")
...> Elasticsearch.delete!("/posts")
iex> Index.create_from_file(Cluster, "posts", "test/support/settings/posts.json")
...> Elasticsearch.delete!(Cluster, "/posts")
%{"acknowledged" => true}
Raises an error if the resource is invalid.
iex> Elasticsearch.delete!("/nonexistent")
iex> Elasticsearch.delete!(Cluster, "/nonexistent")
** (Elasticsearch.Exception) (index_not_found_exception) no such index
"""
@spec delete!(String.t()) :: map
@spec delete!(String.t(), Keyword.t()) :: map
def delete!(url, opts \\ []) do
url
|> delete(opts)
@spec delete!(Cluster.t(), url) :: map | no_return
@spec delete!(Cluster.t(), url, opts) :: map | no_return
def delete!(cluster, url, opts \\ []) do
cluster
|> delete(url, opts)
|> unwrap!()
end
@ -335,20 +326,4 @@ defmodule Elasticsearch do
defp unwrap!({:ok, value}), do: value
defp unwrap!({:error, exception}), do: raise(exception)
defp api_module do
config()[:api_module] || Elasticsearch.API.HTTP
end
defp default_opts do
Application.get_env(:elasticsearch, :default_opts, [])
end
defp default_headers do
Application.get_env(:elasticsearch, :default_headers, [])
end
defp config do
Application.get_all_env(:elasticsearch)
end
end

View file

@ -1,20 +1,26 @@
defmodule Elasticsearch.API do
@moduledoc """
Defines the necessary callbacks for integrating with the Elasticsearch
JSON API.
Behaviour for interacting with the Elasticsearch JSON API.
"""
@typedoc "An HTTP method"
@type method :: :get | :put | :post | :delete
@typedoc "The URL to request from the API"
@type url :: String.t()
@type data :: map | Keyword.t()
@typedoc "A payload of data to send, relevant to :put and :post requests"
@type data :: binary | map | Keyword.t()
@typedoc "A keyword list of options to pass to HTTPoison/Hackney"
@type opts :: Keyword.t()
@type headers :: Keyword.t()
@type response ::
{:ok, HTTPoison.Response.t() | HTTPoison.AsyncResponse.t()}
| {:error, HTTPoison.Error.t()}
@callback get(url, headers, opts) :: response
@callback put(url, data, headers, opts) :: response
@callback post(url, data, headers, opts) :: response
@callback delete(url, headers, opts) :: response
@doc """
Makes a request to an Elasticsearch JSON API URL using the given method.
"""
@callback request(config :: Elasticsearch.Cluster.config(), method, url, data, opts) :: response
end

View file

@ -1,27 +1,67 @@
defmodule Elasticsearch.API.HTTP do
@moduledoc """
An HTTP implementation of `Elasticsearch.API`, using `HTTPoison`.
A "real" HTTP implementation of `Elasticsearch.API`.
"""
@behaviour Elasticsearch.API
use HTTPoison.Base
alias Elasticsearch.Config
###
# HTTPoison Callbacks
###
@doc false
def process_url(url) do
Config.url() <> url
@impl true
def request(config, method, url, data, opts) do
method
|> HTTPoison.request(
process_url(url, config),
process_request_body(data, config),
headers(config),
opts ++ Map.get(config, :default_opts, [])
)
|> process_response(config)
end
def process_request_headers(_headers) do
headers = [{"Content-Type", "application/json"}]
# Respect absolute URLs if passed
defp process_url("http" <> _rest = url, _config) do
url
end
credentials = Config.http_basic_credentials()
# On relative urls, prepend the configured base URL
defp process_url(url, config) do
Path.join(config.url, url)
end
# Converts the request body into JSON, unless it has already
# been converted
defp process_request_body(data, _config) when is_binary(data) do
data
end
defp process_request_body(data, config) when is_map(data) do
json_library(config).encode!(data)
end
# Converts the response body string from JSON into a map, if it looks like it
# is actually JSON
defp process_response({:ok, %{body: body} = response}, config) do
body =
cond do
json?(body) -> json_library(config).decode!(body)
true -> body
end
{:ok, %{response | body: body}}
end
defp process_response(response, _config) do
response
end
defp json?(str) when is_binary(str) do
str =~ ~r/^\{/ || str =~ ~r/^\[/
end
# Produces request headers for the request, based on the configuration
defp headers(config) do
headers = [{"Content-Type", "application/json"}] ++ Map.get(config, :default_headers, [])
credentials = http_basic_credentials(config)
if credentials do
[{"Authorization", "Basic #{credentials}"} | headers]
@ -30,23 +70,19 @@ defmodule Elasticsearch.API.HTTP do
end
end
@doc false
def process_request_body(string) when is_binary(string), do: string
def process_request_body(map) when is_map(map) do
Config.json_library().encode!(map)
defp http_basic_credentials(%{username: username, password: password}) do
Base.encode64("#{username}:#{password}")
end
@doc false
def process_response_body(body) do
if json?(body) do
Config.json_library().decode!(body)
else
body
end
defp http_basic_credentials(_config) do
nil
end
defp json?(str) do
str =~ ~r/^\{/ || str =~ ~r/^\[/
defp json_library(%{json_library: json_library}) do
json_library
end
defp json_library(_config) do
Poison
end
end

View file

@ -0,0 +1,195 @@
defmodule Elasticsearch.Cluster do
  @moduledoc """
  Defines and holds configuration for your Elasticsearch cluster.

      defmodule MyApp.ElasticsearchCluster do
        use Elasticsearch.Cluster
      end

  Once you have created your cluster, add it to your application's supervision tree:

      children = [
        MyApp.ElasticsearchCluster
      ]

  Finally, you can issue requests to Elasticsearch using it.

      Elasticsearch.get(MyApp.ElasticsearchCluster, "/_cat/health")

  ## Configuration

  Clusters can be configured in several ways.

  #### Via Mix

  Clusters can read configuration from the mix config, if you pass the
  `:otp_app` option:

      defmodule MyApp.ElasticsearchCluster do
        use Elasticsearch.Cluster, otp_app: :my_app
      end

      # In your config/config.exs...
      config :my_app, MyApp.ElasticsearchCluster,
        url: "http://localhost:9200",
        # ...

  #### Via `init/1`

  When a cluster starts, you can override its configuration via the `init/1`
  callback. This is a good place to read from environment variables.

      defmodule MyApp.ElasticsearchCluster do
        use Elasticsearch.Cluster

        def init(config) do
          config =
            config
            |> Map.put(:url, System.get_env("ELASTICSEARCH_URL"))
            # ...

          {:ok, config}
        end
      end

  #### Via `start_link/1`

  You can also pass configuration into the cluster directly when you start it
  with `start_link/1`.

      MyApp.Elasticsearch.start_link(url: "http://localhost:9200", ...)

  ### Configuration Options

  The following options are available for configuration.

  * `:url` - The URL at which the Elasticsearch cluster is available.
  * `:api` - The API module to use to communicate with Elasticsearch. Must implement the
    `Elasticsearch.API` behaviour.
  * `:bulk_page_size` - When creating indexes via bulk upload, how many documents to include
    per request.
  * `:bulk_wait_interval` - The number of milliseconds to wait between bulk upload requests.
  * `:indexes` - A map of indexes. Used by `mix elasticsearch.build` to build indexes.
    * `:settings`: The file path of the JSON settings for the index.
    * `:store`: An `Elasticsearch.Store` module to use to load data for the index.
    * `:sources`: A list of sources you want to load for this index.
  * `:json_library` (Optional) - The JSON library to use. (E.g. `Poison` or `Jason`)
  * `:username` (Optional) - The HTTP Basic username for the Elasticsearch endpoint, if any.
  * `:password` (Optional) - The HTTP Basic password for the Elasticsearch endpoint, if any.
  * `:default_headers` (Optional) - A list of default headers to send with the each request.
  * `:default_opts` (Optional) - A list of default HTTPoison/Hackney options to send with
    each request.

  ### Configuration Example

      %{
        api: Elasticsearch.API.HTTP,
        bulk_page_size: 5000,
        bulk_wait_interval: 5000,
        json_library: Poison,
        url: "http://localhost:9200",
        username: "username",
        password: "password",
        default_headers: [{"authorization", "custom-value"}],
        default_opts: [ssl: [{:versions, [:'tlsv1.2']}]],
        indexes: %{
          posts: %{
            settings: "priv/elasticsearch/posts.json",
            store: MyApp.ElasticsearchStore,
            sources: [MyApp.Post]
          }
        }
      }
  """

  alias Elasticsearch.Cluster.Config

  @typedoc """
  Defines valid configuration for a cluster.
  """
  # NOTE(review): this type declares `optional(:default_options)`, but
  # `Elasticsearch.API.HTTP` reads the `:default_opts` key (and the example
  # above uses `default_opts`) — confirm which key is canonical.
  @type config :: %{
          :url => String.t(),
          :api => module,
          :bulk_page_size => integer,
          :bulk_wait_interval => integer,
          optional(:json_library) => module,
          optional(:username) => String.t(),
          optional(:password) => String.t(),
          optional(:default_headers) => [{String.t(), String.t()}],
          optional(:default_options) => Keyword.t(),
          optional(:indexes) => %{
            optional(atom) => %{
              settings: Path.t(),
              store: module,
              sources: [module]
            }
          }
        }

  @typedoc """
  A cluster is either a module defined with `Elasticsearch.Cluster`, or a
  map that has all the required configuration keys.
  """
  @type t :: module | config

  @doc false
  defmacro __using__(opts) do
    quote do
      use GenServer

      alias Elasticsearch.Cluster.Config

      # Cache configuration into the state of the GenServer so that
      # we aren't running potentially expensive logic to load configuration
      # on each function call.
      def start_link(config \\ []) do
        # Merge mix config (if :otp_app was given) with the config passed
        # directly to start_link/1; the latter takes precedence.
        config = Config.build(unquote(opts[:otp_app]), __MODULE__, config)

        # Ensure that the configuration is validated on startup
        with {:ok, pid} <- GenServer.start_link(__MODULE__, config, name: __MODULE__),
             :ok <- GenServer.call(pid, :validate) do
          {:ok, pid}
        else
          error ->
            # NOTE(review): if GenServer.start_link/3 itself failed, no process
            # is registered under __MODULE__, so this stop call will exit —
            # confirm this branch is only expected after a validation failure.
            GenServer.stop(__MODULE__)
            error
        end
      end

      @impl GenServer
      def init(config), do: {:ok, config}

      # Returns the cached configuration held in the GenServer state.
      # Called by Elasticsearch.Cluster.Config.get/1.
      @doc false
      def __config__ do
        GenServer.call(__MODULE__, :config)
      end

      @impl GenServer
      @doc false
      def handle_call(:config, _from, config) do
        {:reply, config, config}
      end

      # Runs Config.validate/1 against the cached config; replies :ok or
      # {:error, errors} without altering the state.
      def handle_call(:validate, _from, config) do
        case Config.validate(config) do
          {:ok, _config} ->
            {:reply, :ok, config}

          error ->
            {:reply, error, config}
        end
      end

      # Allow users to override init/1 (e.g. to read environment variables).
      defoverridable init: 1
    end
  end
end

View file

@ -0,0 +1,86 @@
defmodule Elasticsearch.Cluster.Config do
  @moduledoc false

  # Resolves a cluster reference into a plain configuration map.
  # A module-based cluster exposes its cached config via __config__/0;
  # a raw map or keyword list is normalized into a map.
  def get(cluster) when is_atom(cluster) do
    cluster.__config__()
  end

  def get(config) when is_map(config) or is_list(config) do
    Enum.into(config, %{})
  end

  # Builds the final configuration map by merging mix config (if an
  # :otp_app was given) with directly-passed config; direct config wins.
  #
  # NOTE(review): this arity-2 clause is never hit by
  # `Config.build(nil, __MODULE__, config)` from a cluster without
  # `:otp_app` — that call lands in the arity-3 clause with a nil app.
  # Confirm which call sites use this clause.
  @doc false
  def build(nil, config) do
    Enum.into(config, %{})
  end

  def build(otp_app, module, config) do
    config = Enum.into(config, %{})

    from_app =
      otp_app
      |> Application.get_env(module, [])
      |> Enum.into(%{})

    Map.merge(from_app, config)
  end

  # Validates a configuration map. Returns {:ok, config} when both the
  # top-level keys and every entry under :indexes pass validation, or
  # {:error, errors} where errors are grouped by field.
  @doc false
  def validate(config) do
    with {:ok, config} <-
           Vex.validate(
             config,
             url: &(is_binary(&1) && String.starts_with?(&1, "http")),
             username: [presence: [unless: &(&1[:password] == nil)]],
             password: [presence: [unless: &(&1[:username] == nil)]],
             api: [presence: true, by: &is_module/1],
             json_library: [by: &(is_nil(&1) || is_module(&1))],
             bulk_page_size: [presence: true, by: &is_integer/1],
             bulk_wait_interval: [presence: true, by: &is_integer/1]
           ),
         :ok <- validate_indexes(config[:indexes] || %{}) do
      {:ok, config}
    else
      {:error, errors} ->
        {:error, validation_errors(errors)}
    end
  end

  # True when the value is an atom naming a loadable module.
  defp is_module(module) do
    is_atom(module) && Code.ensure_loaded?(module)
  end

  # Reshapes Vex error tuples into a map grouped by field:
  # %{field => [{message, validation: validation}, ...]}
  defp validation_errors(errors) do
    errors
    |> Enum.map(&Tuple.delete_at(&1, 0))
    |> Enum.group_by(&elem(&1, 0), fn {_field, validation, message} ->
      {message, validation: validation}
    end)
  end

  # Validates each index definition; returns :ok or {:error, errors}
  # with the errors from every invalid index flattened together.
  defp validate_indexes(indexes) do
    invalid =
      indexes
      |> Enum.map(&validate_index/1)
      |> Enum.reject(&match?({:ok, _}, &1))
      |> Enum.map(&elem(&1, 1))

    if invalid == [] do
      :ok
    else
      {:error, List.flatten(invalid)}
    end
  end

  defp validate_index({_name, settings}) do
    Vex.validate(
      settings,
      settings: [presence: true, by: &is_binary/1],
      store: [presence: true, by: &is_module/1],
      sources: [
        presence: true,
        # FIX: was `Enum.map(&1, fn source -> is_atom(source) end)`, which
        # returns a (truthy) list and therefore accepted non-atom sources;
        # every source must actually be an atom (module name).
        by: fn sources -> is_list(sources) && Enum.all?(sources, &is_atom/1) end
      ]
    )
  end
end

View file

@ -1,132 +0,0 @@
defmodule Elasticsearch.Config do
  @moduledoc """
  Convenience functions for fetching configuration values for `Elasticsearch`.
  """

  alias Elasticsearch.Store

  @doc """
  Returns the configured Elasticsearch URL.

  ## Configuration

      config :elasticsearch,
        url: "http://localhost:9200"

  System tuples are also supported:

      config :elasticsearch,
        url: {:system, "ELASTICSEARCH_URL"}

  ## Example

      iex> Config.url()
      "http://localhost:9200"
  """
  @spec url :: String.t()
  def url, do: from_env(:elasticsearch, :url)

  @doc """
  Returns HTTP basic credential header contents based on the configured
  `:username` and `:password`, or `nil` when either is missing.

  ## Configuration

      config :elasticsearch,
        username: "username",
        password: "password"

  System tuples are also supported:

      config :elasticsearch,
        username: {:system, "ELASTICSEARCH_USERNAME"},
        password: {:system, "ELASTICSEARCH_PASSWORD"}

  ## Example

      iex> Config.http_basic_credentials()
      "dXNlcm5hbWU6cGFzc3dvcmQ="
  """
  @spec http_basic_credentials :: String.t() | nil
  def http_basic_credentials do
    username = from_env(:elasticsearch, :username)
    password = from_env(:elasticsearch, :password)

    cond do
      username && password -> Base.encode64("#{username}:#{password}")
      true -> nil
    end
  end

  @doc """
  Gets the full configuration for a given index, or `nil` if the index
  is not configured.

  ## Configuration

      config :elasticsearch,
        indexes: %{
          posts: %{
            settings: "test/support/settings/posts.json",
            store: Elasticsearch.Test.Store,
            sources: [Post]
          }
        }

  ## Example

      iex> Config.config_for_index(:posts)
      %{
        settings: "test/support/settings/posts.json",
        store: Elasticsearch.Test.Store,
        sources: [Post]
      }
  """
  @spec config_for_index(atom) ::
          %{
            settings: String.t(),
            store: Store.t(),
            sources: [Store.source()]
          }
          | nil
  def config_for_index(index) do
    get_in(all(), [:indexes, index])
  end

  @doc """
  Returns all configuration values for `Elasticsearch`.
  """
  @spec all :: Keyword.t()
  def all, do: Application.get_all_env(:elasticsearch)

  @doc """
  Returns the JSON library to use for encoding/decoding.

  Default: `Poison`

  ## Configuration

      config :elasticsearch, json_library: Jason
  """
  @spec json_library :: module
  def json_library do
    Application.get_env(:elasticsearch, :json_library) || Poison
  end

  @doc """
  A light wrapper around `Application.get_env/2`, providing automatic support for
  `{:system, "VAR"}` tuples.
  """
  @spec from_env(atom, atom, any) :: any
  def from_env(otp_app, key, default \\ nil) do
    otp_app
    |> Application.get_env(key, default)
    |> resolve_system_tuple(default)
  end

  # Resolves a {:system, "VAR"} tuple against the environment, falling back
  # to the default when the variable is unset; passes other values through.
  defp resolve_system_tuple({:system, env}, default), do: System.get_env(env) || default
  defp resolve_system_tuple(value, _default), do: value
end

View file

@ -4,6 +4,7 @@ defmodule Elasticsearch.Index.Bulk do
"""
alias Elasticsearch.{
Cluster,
DataStream,
Document
}
@ -47,7 +48,7 @@ defmodule Elasticsearch.Index.Bulk do
{"create":{"_type":"post","_index":"my-index","_id":"my-id"}}
{"title":null,"author":null}
\"\"\"
iex> Bulk.encode!(123, "my-index")
** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123. This protocol is implemented for: Post
"""
@ -66,21 +67,24 @@ defmodule Elasticsearch.Index.Bulk do
Uploads all the data from the list of `sources` to the given index.
Data for each `source` will be fetched using the configured `:store`.
"""
@spec upload(String.t(), Elasticsearch.Store.t(), list) :: :ok | {:error, [map]}
def upload(index_name, store, sources, errors \\ [])
def upload(_index_name, _store, [], []), do: :ok
def upload(_index_name, _store, [], errors), do: {:error, errors}
@spec upload(Cluster.t(), index_name :: String.t(), Elasticsearch.Store.t(), list) ::
:ok | {:error, [map]}
def upload(cluster, index_name, store, sources, errors \\ [])
def upload(_cluster, _index_name, _store, [], []), do: :ok
def upload(_cluster, _index_name, _store, [], errors), do: {:error, errors}
def upload(cluster, index_name, store, [source | tail] = _sources, errors) do
config = Cluster.Config.get(cluster)
def upload(index_name, store, [source | tail] = _sources, errors) do
errors =
source
|> DataStream.stream(store)
config
|> DataStream.stream(source, store)
|> Stream.map(&encode!(&1, index_name))
|> Stream.chunk_every(config()[:bulk_page_size])
|> Stream.map(&Elasticsearch.put("/#{index_name}/_bulk", Enum.join(&1)))
|> Stream.chunk_every(config.bulk_page_size)
|> Stream.map(&Elasticsearch.put(cluster, "/#{index_name}/_bulk", Enum.join(&1)))
|> Enum.reduce(errors, &collect_errors/2)
upload(index_name, tail, errors)
upload(cluster, index_name, tail, errors)
end
defp collect_errors({:ok, %{"errors" => true} = response}, errors) do
@ -125,8 +129,4 @@ defmodule Elasticsearch.Index.Bulk do
header
end
end
defp config do
Application.get_all_env(:elasticsearch)
end
end

View file

@ -3,7 +3,10 @@ defmodule Elasticsearch.Index do
Functions for manipulating Elasticsearch indexes.
"""
alias Elasticsearch.Index.Bulk
alias Elasticsearch.{
Cluster.Config,
Index.Bulk
}
@doc """
Creates an index using a zero-downtime hot-swap technique.
@ -20,20 +23,27 @@ defmodule Elasticsearch.Index do
iex> file = "test/support/settings/posts.json"
...> store = Elasticsearch.Test.Store
...> Index.hot_swap("posts", file, store, [Post])
...> Index.hot_swap(Cluster, "posts", file, store, [Post])
:ok
"""
@spec hot_swap(String.t() | atom, String.t(), Elasticsearch.Store.t(), list) ::
@spec hot_swap(
Cluster.t(),
alias :: String.t() | atom,
settings_path :: String.t(),
Elasticsearch.Store.t(),
list
) ::
:ok
| {:error, Elasticsearch.Exception.t()}
def hot_swap(alias, settings_file, store, sources) do
def hot_swap(cluster, alias, settings_file, store, sources) do
name = build_name(alias)
config = Config.get(cluster)
with :ok <- create_from_file(name, settings_file),
:ok <- Bulk.upload(name, store, sources),
:ok <- __MODULE__.alias(name, alias),
:ok <- clean_starting_with(alias, 2),
:ok <- refresh(name) do
with :ok <- create_from_file(config, name, settings_file),
:ok <- Bulk.upload(config, name, store, sources),
:ok <- __MODULE__.alias(config, name, alias),
:ok <- clean_starting_with(config, alias, 2),
:ok <- refresh(config, name) do
:ok
end
end
@ -43,15 +53,15 @@ defmodule Elasticsearch.Index do
## Example
iex> Index.create_from_file("posts-1", "test/support/settings/posts.json")
...> Index.starting_with("posts")
iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json")
...> Index.starting_with(Cluster, "posts")
{:ok, ["posts-1"]}
"""
@spec starting_with(String.t() | atom) ::
@spec starting_with(Cluster.t(), String.t() | atom) ::
{:ok, [String.t()]}
| {:error, Elasticsearch.Exception.t()}
def starting_with(prefix) do
with {:ok, indexes} <- Elasticsearch.get("/_cat/indices?format=json") do
def starting_with(cluster, prefix) do
with {:ok, indexes} <- Elasticsearch.get(cluster, "/_cat/indices?format=json") do
prefix = to_string(prefix)
indexes =
@ -70,15 +80,16 @@ defmodule Elasticsearch.Index do
## Example
iex> Index.create_from_file("posts-1", "test/support/settings/posts.json")
...> Index.alias("posts-1", "posts")
iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json")
...> Index.alias(Cluster, "posts-1", "posts")
:ok
"""
@spec alias(String.t(), String.t()) ::
@spec alias(Cluster.t(), String.t(), String.t()) ::
:ok
| {:error, Elasticsearch.Exception.t()}
def alias(name, alias) do
with {:ok, indexes} <- starting_with(alias), indexes = Enum.reject(indexes, &(&1 == name)) do
def alias(cluster, name, alias) do
with {:ok, indexes} <- starting_with(cluster, alias),
indexes = Enum.reject(indexes, &(&1 == name)) do
remove_actions =
Enum.map(indexes, fn index ->
%{"remove" => %{"index" => index, "alias" => alias}}
@ -88,7 +99,7 @@ defmodule Elasticsearch.Index do
"actions" => remove_actions ++ [%{"add" => %{"index" => name, "alias" => alias}}]
}
with {:ok, _response} <- Elasticsearch.post("/_aliases", actions), do: :ok
with {:ok, _response} <- Elasticsearch.post(cluster, "/_aliases", actions), do: :ok
end
end
@ -97,22 +108,22 @@ defmodule Elasticsearch.Index do
## Examples
iex> Index.create_from_file("posts-1", "test/support/settings/posts.json")
...> Index.create_from_file("posts-2", "test/support/settings/posts.json")
...> Index.latest_starting_with("posts")
iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json")
...> Index.create_from_file(Cluster, "posts-2", "test/support/settings/posts.json")
...> Index.latest_starting_with(Cluster, "posts")
{:ok, "posts-2"}
If there are no indexes matching that prefix:
iex> Index.latest_starting_with("nonexistent")
iex> Index.latest_starting_with(Cluster, "nonexistent")
{:error, :not_found}
"""
@spec latest_starting_with(String.t() | atom) ::
@spec latest_starting_with(Cluster.t(), String.t() | atom) ::
{:ok, String.t()}
| {:error, :not_found}
| {:error, Elasticsearch.Exception.t()}
def latest_starting_with(prefix) do
with {:ok, indexes} <- starting_with(prefix) do
def latest_starting_with(cluster, prefix) do
with {:ok, indexes} <- starting_with(cluster, prefix) do
index =
indexes
|> Enum.sort()
@ -130,14 +141,14 @@ defmodule Elasticsearch.Index do
## Example
iex> Index.create_from_file("posts-1", "test/support/settings/posts.json")
...> Index.refresh("posts-1")
iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json")
...> Index.refresh(Cluster, "posts-1")
:ok
"""
@spec refresh(String.t()) :: :ok | {:error, Elasticsearch.Exception.t()}
def refresh(name) do
with {:ok, _} <- Elasticsearch.post("/#{name}/_forcemerge?max_num_segments=5", %{}),
{:ok, _} <- Elasticsearch.post("/#{name}/_refresh", %{}),
@spec refresh(Cluster.t(), String.t()) :: :ok | {:error, Elasticsearch.Exception.t()}
def refresh(cluster, name) do
with {:ok, _} <- Elasticsearch.post(cluster, "/#{name}/_forcemerge?max_num_segments=5", %{}),
{:ok, _} <- Elasticsearch.post(cluster, "/#{name}/_refresh", %{}),
do: :ok
end
@ -146,16 +157,16 @@ defmodule Elasticsearch.Index do
## Examples
iex> Index.create_from_file("posts-1", "test/support/settings/posts.json")
...> Index.refresh!("posts-1")
iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json")
...> Index.refresh!(Cluster, "posts-1")
:ok
iex> Index.refresh!("nonexistent")
iex> Index.refresh!(Cluster, "nonexistent")
** (Elasticsearch.Exception) (index_not_found_exception) no such index
"""
@spec refresh!(String.t()) :: :ok
def refresh!(name) do
case refresh(name) do
@spec refresh!(Cluster.t(), String.t()) :: :ok
def refresh!(cluster, name) do
case refresh(cluster, name) do
:ok ->
:ok
@ -173,24 +184,24 @@ defmodule Elasticsearch.Index do
If there is only one index, and `num_to_keep` is >= 1, the index is not deleted.
iex> Index.create_from_file("posts-1", "test/support/settings/posts.json")
...> Index.clean_starting_with("posts", 1)
...> Index.starting_with("posts")
iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json")
...> Index.clean_starting_with(Cluster, "posts", 1)
...> Index.starting_with(Cluster, "posts")
{:ok, ["posts-1"]}
If `num_to_keep` is less than the number of indexes, the older indexes are
deleted.
iex> Index.create_from_file("posts-1", "test/support/settings/posts.json")
...> Index.clean_starting_with("posts", 0)
...> Index.starting_with("posts")
iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json")
...> Index.clean_starting_with(Cluster, "posts", 0)
...> Index.starting_with(Cluster, "posts")
{:ok, []}
"""
@spec clean_starting_with(String.t(), integer) ::
@spec clean_starting_with(Cluster.t(), String.t(), integer) ::
:ok
| {:error, [Elasticsearch.Exception.t()]}
def clean_starting_with(prefix, num_to_keep) when is_integer(num_to_keep) do
with {:ok, indexes} <- starting_with(prefix) do
def clean_starting_with(cluster, prefix, num_to_keep) when is_integer(num_to_keep) do
with {:ok, indexes} <- starting_with(cluster, prefix) do
total = length(indexes)
num_to_delete = total - num_to_keep
num_to_delete = if num_to_delete >= 0, do: num_to_delete, else: 0
@ -199,7 +210,7 @@ defmodule Elasticsearch.Index do
indexes
|> Enum.sort()
|> Enum.take(num_to_delete)
|> Enum.map(&Elasticsearch.delete("/#{&1}"))
|> Enum.map(&Elasticsearch.delete(cluster, "/#{&1}"))
|> Enum.filter(&(elem(&1, 0) == :error))
|> Enum.map(&elem(&1, 1))
@ -216,14 +227,14 @@ defmodule Elasticsearch.Index do
## Examples
iex> Index.create("posts-1", "{}")
iex> Index.create(Cluster, "posts-1", "{}")
:ok
"""
@spec create(String.t(), map | String.t()) ::
@spec create(Cluster.t(), String.t(), map | String.t()) ::
:ok
| {:error, Elasticsearch.Exception.t()}
def create(name, settings) do
with {:ok, _response} <- Elasticsearch.put("/#{name}", settings), do: :ok
def create(cluster, name, settings) do
with {:ok, _response} <- Elasticsearch.put(cluster, "/#{name}", settings), do: :ok
end
@doc """
@ -231,10 +242,10 @@ defmodule Elasticsearch.Index do
## Example
iex> Index.create_from_file("posts-1", "test/support/settings/posts.json")
iex> Index.create_from_file(Cluster, "posts-1", "test/support/settings/posts.json")
:ok
iex> Index.create_from_file("posts-1", "nonexistent.json")
iex> Index.create_from_file(Cluster, "posts-1", "nonexistent.json")
{:error, :enoent}
The `posts.json` file contains regular index settings as described in the
@ -255,13 +266,13 @@ defmodule Elasticsearch.Index do
}
}
"""
@spec create_from_file(String.t(), Path.t()) ::
@spec create_from_file(Cluster.t(), String.t(), Path.t()) ::
:ok
| {:error, File.posix()}
| {:error, Elasticsearch.Exception.t()}
def create_from_file(name, file) do
def create_from_file(cluster, name, file) do
with {:ok, settings} <- File.read(file) do
create(name, settings)
create(cluster, name, settings)
end
end

View file

@ -6,37 +6,29 @@ defmodule Elasticsearch.DataStream do
@type source :: any
alias Elasticsearch.Config
alias Elasticsearch.Cluster
@doc """
Creates a `Stream` from a given source.
## Configuration
Your configured `:store` module must handle the given data source.
The stream will be paginated based on the `:bulk_page_size` in the
configuration.
config :elasticsearch,
bulk_page_size: 5000
## Example
iex> stream = DataStream.stream(MyApp.Schema, Elasticsearch.Test.Store)
iex> stream = DataStream.stream(Cluster, MyApp.Schema, Elasticsearch.Test.Store)
...> is_function(stream)
true
"""
@spec stream(source, Elasticsearch.Store.t()) :: Stream.t()
def stream(source, store) do
Stream.resource(&init/0, &next(&1, source, store), &finish/1)
@spec stream(Cluster.t(), source, Elasticsearch.Store.t()) :: Stream.t()
def stream(cluster, source, store) do
config = Cluster.Config.get(cluster)
Stream.resource(fn -> init(config) end, &next(&1, source, store), &finish/1)
end
# Store state in the following format:
#
# {items, offset, limit}
defp init do
{[], 0, Config.all()[:bulk_page_size]}
defp init(config) do
{[], 0, config.bulk_page_size}
end
# If no items, load another page of items
@ -52,8 +44,6 @@ defmodule Elasticsearch.DataStream do
# Fetch a new page of items
defp load_page(source, store, offset, limit) do
page_size = Config.all()[:bulk_page_size]
case store.load(source, offset, limit) do
# If the load returns no more items (i.e., we've iterated through them
# all) then halt the stream and leave offset and limit unchanged.
@ -64,7 +54,7 @@ defmodule Elasticsearch.DataStream do
# tail into the state. Also, increment offset and limit by the
# configured `:bulk_page_size`.
[h | t] ->
{[h], {t, offset + page_size, limit}}
{[h], {t, offset + limit, limit}}
end
end

View file

@ -8,53 +8,55 @@ defmodule Mix.Tasks.Elasticsearch.Build do
4. Remove old indexes beginning with `alias`.
5. Refresh `alias-12323123`.
For a functional version of this approach, see
For a functional version of this approach, see
`Elasticsearch.Index.hot_swap/5`.
## Example
$ mix elasticsearch.build posts [index2] [index3]
$ mix elasticsearch.build posts [index2] [index3] --cluster MyApp.Cluster
To build an index only if it does not exist, use the `--existing` option:
$ mix elasticsearch.build posts --existing
$ mix elasticsearch.build posts --existing --cluster MyApp.Cluster
Index posts already exists.
"""
require Logger
alias Elasticsearch.{
Index,
Config
Cluster.Config,
Index
}
@doc false
def run(args) do
Mix.Task.run("app.start", [])
{indexes, type} = parse_args!(args)
{cluster, indexes, type} = parse_args!(args)
config = Config.get(cluster)
for alias <- indexes do
config = Config.config_for_index(alias)
build(alias, config, type)
build(config, alias, type)
end
end
defp build(alias, config, :existing) do
case Index.latest_starting_with(alias) do
defp build(config, alias, :existing) do
case Index.latest_starting_with(config, alias) do
{:ok, name} ->
IO.puts("Index already exists: #{name}")
{:error, :not_found} ->
build(alias, config, :rebuild)
build(config, alias, :rebuild)
{:error, exception} ->
Mix.raise(exception)
end
end
defp build(alias, %{settings: settings, store: store, sources: sources}, :rebuild) do
with :ok <- Index.hot_swap(alias, settings, store, sources) do
defp build(config, alias, :rebuild) do
%{settings: settings, store: store, sources: sources} = config.indexes[alias]
with :ok <- Index.hot_swap(config, alias, settings, store, sources) do
:ok
else
{:error, errors} when is_list(errors) ->
@ -85,35 +87,32 @@ defmodule Mix.Tasks.Elasticsearch.Build do
end
defp parse_args!(args) do
{options, indexes} =
OptionParser.parse!(
args,
switches: [
existing: :boolean
]
)
{options, indexes} = OptionParser.parse!(args, strict: [cluster: :string, existing: :boolean])
cluster =
if options[:cluster] do
:"Elixir.#{options[:cluster]}"
else
Mix.raise("""
Please specify a cluster:
--cluster MyApp.ClusterName
""")
end
indexes =
indexes
|> Enum.map(&String.to_atom/1)
|> MapSet.new()
|> validate_indexes!(cluster)
type =
cond do
options[:existing] ->
:existing
type = if options[:existing], do: :existing, else: :rebuild
true ->
:rebuild
end
validate_indexes!(indexes)
{indexes, type}
{cluster, indexes, type}
end
defp validate_indexes!(indexes) do
configured = configured_names()
defp validate_indexes!(indexes, cluster) do
configured = configured_index_names(cluster)
cond do
MapSet.size(indexes) == 0 ->
@ -131,18 +130,15 @@ defmodule Mix.Tasks.Elasticsearch.Build do
""")
true ->
:ok
indexes
end
end
defp configured_names do
config()
|> Keyword.get(:indexes)
defp configured_index_names(cluster) do
cluster
|> Config.get()
|> Map.get(:indexes)
|> Enum.map(fn {key, _val} -> key end)
|> MapSet.new()
end
defp config do
Application.get_all_env(:elasticsearch)
end
end

View file

@ -58,6 +58,8 @@ defmodule Elasticsearch.Mixfile do
[
{:poison, ">= 0.0.0", optional: true},
{:httpoison, ">= 0.0.0"},
{:vex, "~> 0.6.0"},
{:stream_data, ">= 0.0.0", only: [:dev, :test]},
{:dialyze, ">= 0.0.0", only: [:dev, :test]},
{:ex_doc, ">= 0.0.0", only: [:dev, :test]},
{:excoveralls, ">= 0.0.0", only: :test}
@ -75,7 +77,7 @@ defmodule Elasticsearch.Mixfile do
Elasticsearch.API.HTTP
],
Config: [
Elasticsearch.Config
Elasticsearch.Cluster
],
Indexing: [
Elasticsearch.Index,
@ -92,4 +94,4 @@ defmodule Elasticsearch.Mixfile do
]
]
end
end
end

View file

@ -14,5 +14,7 @@
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [], [], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [], [], "hexpm"},
"stream_data": {:hex, :stream_data, "0.4.2", "fa86b78c88ec4eaa482c0891350fcc23f19a79059a687760ddcf8680aac2799b", [:mix], [], "hexpm"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [], [], "hexpm"},
"vex": {:hex, :vex, "0.6.0", "4e79b396b2ec18cd909eed0450b19108d9631842598d46552dc05031100b7a56", [:mix], [], "hexpm"},
}

View file

@ -0,0 +1,139 @@
defmodule Elasticsearch.ClusterTest do
  use ExUnit.Case, async: false

  # Returns a fully valid cluster configuration map. Serves both as the
  # happy-path fixture and as a template that individual validation tests
  # override with invalid values.
  def valid_config do
    %{
      api: Elasticsearch.API.HTTP,
      bulk_page_size: 5000,
      bulk_wait_interval: 5000,
      json_library: Poison,
      url: "http://localhost:9200",
      username: "username",
      password: "password",
      indexes: %{
        posts: %{
          settings: "test/support/settings/posts.json",
          store: Elasticsearch.Test.Store,
          sources: [Post]
        }
      }
    }
  end

  # Seed the application env so MixConfiguredCluster (configured with
  # `otp_app: :elasticsearch`) can read its configuration on startup.
  setup do
    Application.put_env(
      :elasticsearch,
      Elasticsearch.ClusterTest.MixConfiguredCluster,
      valid_config()
    )
  end

  # Cluster with no compile-time configuration; config is passed directly
  # to start_link/1.
  defmodule Cluster do
    use Elasticsearch.Cluster
  end

  # Cluster that reads its configuration from the :elasticsearch app env
  # (populated by the setup block above).
  defmodule MixConfiguredCluster do
    use Elasticsearch.Cluster, otp_app: :elasticsearch
  end

  # Cluster that supplies its configuration via the init/1 callback.
  defmodule InitConfiguredCluster do
    use Elasticsearch.Cluster

    def init(_config) do
      {:ok, Elasticsearch.ClusterTest.valid_config()}
    end
  end

  describe "configuration" do
    test "accepts Mix configuration" do
      assert {:ok, _pid} = MixConfiguredCluster.start_link()
      assert MixConfiguredCluster.__config__() == valid_config()
    end

    test "accepts init configuration" do
      assert {:ok, _pid} = InitConfiguredCluster.start_link()
      assert InitConfiguredCluster.__config__() == valid_config()
    end

    test "accepts configuration on startup" do
      assert {:ok, _pid} = Cluster.start_link(valid_config())
      assert Cluster.__config__() == valid_config()
    end
  end

  describe ".start_link/1" do
    test "validates url" do
      refute errors_on(url: "http://localhost:9200")[:url]
      assert errors_on(url: "werlkjweoqwelj").url
    end

    # username/password are optional, but each must be present when the
    # other one is given.
    test "validates username" do
      assert {"must be present", validation: :presence} in errors_on(%{password: "password"}).username
      refute errors_on([])[:username]
    end

    test "validates password" do
      assert {"must be present", validation: :presence} in errors_on(%{username: "username"}).password
      refute errors_on([])[:password]
    end

    test "validates api" do
      assert {"must be present", validation: :presence} in errors_on([]).api
      for invalid <- [Nonexistent.Module, "string"] do
        assert {"must be valid", validation: :by} in errors_on(api: invalid).api
      end
    end

    test "validates json_library" do
      refute errors_on([])[:json_library]
      refute errors_on(json_library: Poison)[:json_library]
      assert {"must be valid", validation: :by} in errors_on(json_library: Nonexistent.Module).json_library
    end

    test "validates bulk_page_size" do
      assert {"must be present", validation: :presence} in errors_on([]).bulk_page_size
      for invalid <- [nil, "string", :atom] do
        assert {"must be valid", validation: :by} in errors_on(bulk_page_size: invalid).bulk_page_size
      end
    end

    test "validates bulk_wait_interval" do
      assert {"must be present", validation: :presence} in errors_on([]).bulk_wait_interval
      for invalid <- [nil, "string", :atom] do
        assert {"must be valid", validation: :by} in errors_on(bulk_wait_interval: invalid).bulk_wait_interval
      end
    end

    # Each index entry must provide :settings, :store, and :sources, and
    # each of those must pass its :by validation.
    test "validates indexes" do
      errors = errors_on(%{valid_config() | indexes: %{example: %{}}})
      for field <- [:settings, :store, :sources] do
        assert {"must be present", validation: :presence} in errors[field]
      end
      errors =
        errors_on(%{
          valid_config()
          | indexes: %{example: %{settings: :atom, store: Nonexistent.Module, sources: 123}}
        })
      for field <- [:settings, :store, :sources] do
        assert {"must be valid", validation: :by} in errors[field]
      end
    end

    test "accepts valid configuration" do
      assert {:ok, pid} = Cluster.start_link(valid_config())
      assert is_pid(pid)
    end
  end

  # Starts the bare Cluster with `config` and returns the validation
  # errors it fails with. Assumes start_link/1 returns {:error, errors}
  # for invalid config — will MatchError if startup unexpectedly succeeds.
  defp errors_on(config) do
    {:error, errors} = Cluster.start_link(config)
    errors
  end
end

View file

@ -1,7 +0,0 @@
defmodule Elasticsearch.ConfigTest do
  use ExUnit.Case

  # Aliased so doctest examples can refer to `Config` directly — presumably
  # the doctests in Elasticsearch.Config use the short name; verify there.
  alias Elasticsearch.Config

  # All assertions for this module live in its doctests.
  doctest Elasticsearch.Config
end

View file

@ -1,13 +1,16 @@
defmodule Elasticsearch.IndexTest do
defmodule Elasticsearch.Cluster.IndexTest do
use ExUnit.Case
alias Elasticsearch.Index
alias Elasticsearch.{
Index,
Test.Cluster
}
doctest Elasticsearch.Index
setup do
for index <- ["posts"] do
Elasticsearch.delete("/#{index}*")
Elasticsearch.delete(Cluster, "/#{index}*")
end
end
end

View file

@ -1,6 +1,7 @@
defmodule Elasticsearch.DataStreamTest do
use ExUnit.Case
alias Elasticsearch.Test.Cluster
alias Elasticsearch.DataStream
doctest Elasticsearch.DataStream

View file

@ -1,16 +1,21 @@
defmodule ElasticsearchTest do
use ExUnit.Case
alias Elasticsearch.{
Index,
Test.Cluster
}
doctest Elasticsearch
setup do
on_exit(fn ->
"posts"
|> Elasticsearch.Index.starting_with()
Cluster
|> Index.starting_with("posts")
|> elem(1)
|> Enum.map(&Elasticsearch.delete!("/#{&1}"))
|> Enum.map(&Elasticsearch.delete!(Cluster, "/#{&1}"))
Elasticsearch.delete("/nonexistent")
Elasticsearch.delete(Cluster, "/nonexistent")
end)
end
end

View file

@ -4,18 +4,20 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do
import Mix.Task, only: [rerun: 2]
import ExUnit.CaptureIO
alias Elasticsearch
alias Elasticsearch.Index
alias Elasticsearch.Test.Cluster, as: TestCluster
setup do
on_exit(fn ->
"posts"
|> Index.starting_with()
TestCluster
|> Index.starting_with("posts")
|> elem(1)
|> Enum.map(&Elasticsearch.delete("/#{&1}"))
|> Enum.map(&Elasticsearch.delete(TestCluster, "/#{&1}"))
end)
end
@cluster_opts ["--cluster", "Elasticsearch.Test.Cluster"]
describe ".run" do
test "raises error on invalid options" do
assert_raise Mix.Error, fn ->
@ -23,45 +25,51 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do
end
end
test "raises error if cluster not specified" do
assert_raise Mix.Error, fn ->
rerun("elasticsearch.build", ["posts"])
end
end
test "raises error on unconfigured indexes" do
assert_raise Mix.Error, fn ->
rerun("elasticsearch.build", ["nonexistent"])
rerun("elasticsearch.build", ["nonexistent"] ++ @cluster_opts)
end
end
test "raises error if no index specified" do
assert_raise Mix.Error, fn ->
rerun("elasticsearch.build", [])
rerun("elasticsearch.build", [] ++ @cluster_opts)
end
end
test "builds configured index" do
rerun("elasticsearch.build", ["posts"])
rerun("elasticsearch.build", ["posts"] ++ @cluster_opts)
resp = Elasticsearch.get!("/posts/_search")
resp = Elasticsearch.get!(TestCluster, "/posts/_search")
assert resp["hits"]["total"] == 10_000
end
test "only keeps two index versions" do
for _ <- 1..3 do
rerun("elasticsearch.build", ["posts"])
rerun("elasticsearch.build", ["posts"] ++ @cluster_opts)
:timer.sleep(1000)
end
{:ok, indexes} = Index.starting_with("posts")
{:ok, indexes} = Index.starting_with(TestCluster, "posts")
assert length(indexes) == 2
[_previous, current] = Enum.sort(indexes)
# assert that the most recent index is the one that is aliased
assert {:ok, %{^current => _}} = Elasticsearch.get("/posts/_alias")
assert {:ok, %{^current => _}} = Elasticsearch.get(TestCluster, "/posts/_alias")
end
test "--existing checks if index exists" do
rerun("elasticsearch.build", ["posts"])
rerun("elasticsearch.build", ["posts"] ++ @cluster_opts)
io =
capture_io(fn ->
rerun("elasticsearch.build", ["posts", "--existing"])
rerun("elasticsearch.build", ["posts", "--existing"] ++ @cluster_opts)
end)
assert io =~ "Index already exists: posts-"

25
test/support/cluster.ex Normal file
View file

@ -0,0 +1,25 @@
defmodule Elasticsearch.Test.Cluster do
  @moduledoc false

  use Elasticsearch.Cluster

  # Static configuration for the test cluster, provided through the init/1
  # callback instead of Mix config so the test suite needs no app env.
  @config %{
    url: "http://localhost:9200",
    username: "username",
    password: "password",
    api: Elasticsearch.API.HTTP,
    json_library: Poison,
    bulk_page_size: 5000,
    bulk_wait_interval: 5000,
    indexes: %{
      posts: %{
        settings: "test/support/settings/posts.json",
        store: Elasticsearch.Test.Store,
        sources: [Post]
      }
    }
  }

  # Ignores any configuration passed in and always returns the fixed map.
  def init(_config), do: {:ok, @config}
end

View file

@ -8,4 +8,5 @@ unless System.get_env("CI") do
)
end
{:ok, _} = Elasticsearch.wait_for_boot(15)
{:ok, _} = Elasticsearch.Test.Cluster.start_link()
{:ok, _} = Elasticsearch.wait_for_boot(Elasticsearch.Test.Cluster, 15)