Draft version

Contains the basic building blocks.
This commit is contained in:
Daniel Berkompas 2017-10-30 12:26:40 -07:00
parent 6d65bd3b20
commit b75fd2ed90
24 changed files with 1308 additions and 98 deletions

103
README.md
View file

@ -1,6 +1,9 @@
# Elasticsearch
**TODO: Add description**
A simple, no-nonsense Elasticsearch library for Elixir. Highlights include:
- **No DSLs.** Interact directly with the `Elasticsearch` JSON API.
- **Zero-downtime index (re)building.** Via the `mix elasticsearch.build` task.
## Installation
@ -15,7 +18,99 @@ def deps do
end
```
Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at [https://hexdocs.pm/elasticsearch](https://hexdocs.pm/elasticsearch).
## Configuration
See the annotated example configuration below.
```elixir
config :elasticsearch,
# The URL where Elasticsearch is hosted on your system
url: "http://localhost:9200", # or {:system, "ELASTICSEARCH_URL"}
# If your Elasticsearch cluster uses HTTP basic authentication,
# specify the username and password here:
username: "username", # or {:system, "ELASTICSEARCH_USERNAME"}
password: "password", # or {:system, "ELASTICSEARCH_PASSWORD"}
# When indexing data using the `mix elasticsearch.build` task,
# control the data ingestion rate by raising or lowering the number
# of items to send in each bulk request.
bulk_page_size: 5000,
# Likewise, wait a given period between posting pages to give
# Elasticsearch time to catch up.
bulk_wait_interval: 15_000, # 15 seconds
# This loader module must implement the Elasticsearch.DataLoader
# behaviour. It will be used to fetch data for each source in each
# index's `sources` list, below:
loader: MyApp.ElasticsearchLoader,
# If you want to mock the responses of the Elasticsearch JSON API
# for testing or other purposes, you can inject a different module
# here. It must implement the Elasticsearch.API behaviour.
api_module: Elasticsearch.API.HTTP,
# You should configure each index which you maintain in Elasticsearch here.
indexes: %{
# `:cities` becomes the Elixir name for this index, which you'll use in
# queries, etc.
cities: %{
# This is the base name of the Elasticsearch index. Each index will be
# built with a timestamp included in the name, like "cities-5902341238".
# It will then be aliased to "cities" for easy querying.
alias: "cities",
# This file describes the mappings and settings for your index. It will
# be posted as-is to Elasticsearch when you create your index, and
# therefore allows all the settings you could post directly.
schema: "priv/elasticsearch/cities.json",
# This is the list of data sources that should be used to populate this
# index. The `:loader` module above will be passed each one of these
# sources for fetching.
#
# Each piece of data that is returned by the loader must implement the
# Elasticsearch.Document protocol.
sources: [Type1]
}
}
```
## Querying
You can query Elasticsearch using raw requests, or with the help of
the `Elasticsearch.Query` struct.
```elixir
# Raw query
Elasticsearch.post("/cities/city/_search", '{"query": {"match_all": {}}}')
# Using a map
Elasticsearch.post("/cities/city/_search", %{"query" => %{"match_all" => %{}}})
# Using a query
query = %Elasticsearch.Query{
indexes: [:cities],
types: [:city],
query: %{
"query" => %{
"match_all" => %{}
}
}
}
Elasticsearch.execute(query)
```
TODOS:
- [ ] Write tests
- [ ] Update documentation in `Elasticsearch` module
- [ ] Update documentation in `mix elasticsearch.build` task
- [ ] Document how to mock Elasticsearch for testing
- [ ] Push to IR owned repo
- [ ] Prepare for publishing as hex package
- [ ] Update README
- [ ] Spec for `--append` option

View file

@ -3,20 +3,22 @@
use Mix.Config
config :elasticsearch,
url: "url here",
url: "http://localhost:9200",
username: "username",
password: "password",
bulk_page_size: 5000,
bulk_wait_interval: 15_000, # 15 seconds
loader: Elasticsearch.Test.DataLoader,
api_module: Elasticsearch.API.HTTP,
indexes: %{
index1: %{
alias: "index1_alias",
schema: "priv/elasticsearch/index1.json",
sources: [MyApp.Main] # Ecto schemas
sources: [Type1]
},
index2: %{
alias: "index2_alias",
schema: "priv/elasticsearch/index2.json",
sources: [MyApp.City]
sources: [Type2]
}
}

View file

@ -1,18 +1,507 @@
defmodule Elasticsearch do
@moduledoc """
A simple, no-nonsense client for the Elasticsearch JSON API.
"""
alias Elasticsearch.{
Query
}
@type response ::
{:ok, map} |
{:error, Elasticsearch.Exception.t}
@doc """
Executes an `Elasticsearch.Query`, and returns the response.

## Example

    query = %Query{
      indexes: [:index1],
      types: [:type1],
      query: %{
        "size" => 1,
        "query" => %{
          "match_all" => %{}
        }
      }
    }

    Elasticsearch.execute(query)
    # => {:ok, %{
    #      "_shards" => %{
    #        "failed" => 0,
    #        "successful" => 5,
    #        "total" => 5
    #      },
    #      "hits" => %{
    #        "hits" => [%{
    #          "_id" => "89phLzwlKSMUqKbTYoswsncqEb5vWdfDlteg+HuFLG4=",
    #          "_index" => "index1_alias-1509582436", "_score" => 1.0,
    #          "_source" => %{
    #            "author" => "Author",
    #            "name" => "Name"
    #          },
    #          "_type" => "type1"
    #        }],
    #        "max_score" => 1.0,
    #        "total" => 10000
    #      },
    #      "timed_out" => false,
    #      "took" => 1
    #    }}
"""
@spec execute(Query.t) :: response
def execute(query) do
  # The URL (indexes/types/_search) is derived from the query struct;
  # the raw query map is posted as the request body.
  post("#{Query.url(query)}", query.query)
end
@doc """
Same as `execute/1`, but raises errors.

## Example

    iex> query = %Query{
    ...>   indexes: [:index1],
    ...>   types: [:type1],
    ...>   query: %{"query" => %{"match_all" => %{}}}
    ...> }
    ...> Elasticsearch.execute!(query)
    ** (Elasticsearch.Exception) (index_not_found_exception) no such index
"""
@spec execute!(Query.t) :: map
def execute!(query) do
  # Consistent with get!/put!/post!/delete!: unwrap the tagged tuple,
  # raising the exception on {:error, _}.
  query
  |> execute()
  |> unwrap!()
end
@doc """
Creates an index with the given name from a JSON schema file.

Returns `{:error, posix}` when the schema file cannot be read, or
`{:error, exception}` when Elasticsearch rejects the request.

## Example

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    :ok

    iex> Elasticsearch.create_index("test2", "nonexistent.json")
    {:error, :enoent}
"""
@spec create_index(String.t, Path.t) ::
  :ok |
  {:error, File.posix} |
  {:error, Elasticsearch.Exception.t}
def create_index(name, schema) do
  case File.read(schema) do
    {:ok, contents} ->
      # The schema file is posted verbatim, so any valid index settings
      # payload is supported.
      with {:ok, _response} <- put("/#{name}", contents), do: :ok

    posix_error ->
      posix_error
  end
end
@doc """
Assigns an alias to a given index, simultaneously removing it from prior
indexes, with zero downtime.
The previous index will be preserved, to make it easier to rollback to
an earlier index.

## Example

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> Elasticsearch.alias_index("test1", "test")
    :ok
"""
@spec alias_index(String.t, String.t) ::
  :ok |
  {:error, Elasticsearch.Exception.t}
def alias_index(index_name, index_alias) do
  # Collect every other index sharing this alias prefix; they will have
  # the alias removed in the same atomic `_aliases` call that adds it to
  # `index_name`.
  with {:ok, indexes} <- indexes_starting_with(index_alias),
       indexes = Enum.reject(indexes, &(&1 == index_name)) do
    remove_actions =
      Enum.map indexes, fn(index) ->
        %{"remove" => %{"index" => index, "alias" => index_alias}}
      end

    # A single actions payload makes the remove+add swap atomic on the
    # Elasticsearch side — this is what gives zero downtime.
    actions = %{
      "actions" =>
        remove_actions ++
        [%{"add" => %{"index" => index_name, "alias" => index_alias}}
      ]
    }

    # Delete all but the most recent index
    indexes_to_delete = indexes -- [List.last(indexes)]

    with {:ok, _} <- post("/_aliases", actions),
         :ok <- delete_indexes(indexes_to_delete) do
      :ok
    end
  end
end
@doc """
Returns all indexes which start with a given string.

## Example

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> Elasticsearch.create_index("test2", "priv/elasticsearch/index2.json")
    ...> Elasticsearch.indexes_starting_with("test")
    {:ok, ["test1", "test2"]}
"""
@spec indexes_starting_with(String.t) ::
  {:ok, [String.t]} |
  {:error, Elasticsearch.Exception.t}
def indexes_starting_with(prefix) do
  with {:ok, indexes} <- get("/_cat/indices?format=json") do
    # `_cat/indices` returns a list of maps; each entry's "index" key is
    # the index name.
    indexes =
      indexes
      |> Stream.map(&(&1["index"]))
      |> Stream.filter(&String.starts_with?(&1, prefix))
      |> Enum.sort()

    {:ok, indexes}
  end
end
@doc """
Gets the most recent index name with the given prefix.

## Examples

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> Elasticsearch.create_index("test2", "priv/elasticsearch/index2.json")
    ...> Elasticsearch.latest_index_starting_with("test")
    {:ok, "test2"}

If there are no indexes matching that prefix:

    iex> Elasticsearch.latest_index_starting_with("nonexistent")
    {:error, :not_found}
"""
@spec latest_index_starting_with(String.t) ::
  {:ok, String.t} |
  {:error, :not_found} |
  {:error, Elasticsearch.Exception.t}
def latest_index_starting_with(prefix) do
  with {:ok, indexes} <- indexes_starting_with(prefix) do
    # Index names are suffixed with a unix timestamp (see
    # Config.build_index_name/1), so the lexicographic maximum is the
    # most recently built index.
    index =
      indexes
      |> Enum.sort()
      |> List.last()

    case index do
      nil -> {:error, :not_found}
      index -> {:ok, index}
    end
  end
end
@doc """
Refreshes a given index with recently added data.

## Example

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> Elasticsearch.refresh_index("test1")
    :ok
"""
@spec refresh_index(String.t) :: :ok | {:error, Elasticsearch.Exception.t}
def refresh_index(index_name) do
  # Force-merge segments first, then refresh so new documents become
  # visible to searches. Stops at the first failing call.
  case post("/#{index_name}/_forcemerge?max_num_segments=5", %{}) do
    {:ok, _} ->
      case post("/#{index_name}/_refresh", %{}) do
        {:ok, _} -> :ok
        error -> error
      end

    error ->
      error
  end
end
@doc """
Same as `refresh_index/1`, but raises an error on failure.

## Examples

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> Elasticsearch.refresh_index!("test1")
    :ok

    iex> Elasticsearch.refresh_index!("nonexistent")
    ** (Elasticsearch.Exception) (index_not_found_exception) no such index
"""
@spec refresh_index!(String.t) :: :ok
def refresh_index!(index_name) do
  # `:ok` falls through the `with` untouched; only an error tuple
  # matches and raises.
  with {:error, error} <- refresh_index(index_name) do
    raise error
  end
end
@doc """
Deletes multiple indexes in one function call.

If you only need to delete one index, see either of these two functions
instead:

- `delete/1`
- `delete!/1`

## Examples

If any given index fails to delete, a list of `Elasticsearch.Exception`s will
be returned.

    iex> Elasticsearch.delete_indexes(["nonexistent"])
    {:error,
     [%Elasticsearch.Exception{col: nil, line: nil,
       message: "no such index", query: nil,
       raw: %{"error" => %{"index" => "nonexistent",
           "index_uuid" => "_na_", "reason" => "no such index",
           "resource.id" => "nonexistent",
           "resource.type" => "index_or_alias",
           "root_cause" => [%{"index" => "nonexistent",
              "index_uuid" => "_na_", "reason" => "no such index",
              "resource.id" => "nonexistent",
              "resource.type" => "index_or_alias",
              "type" => "index_not_found_exception"}],
           "type" => "index_not_found_exception"}, "status" => 404},
       status: 404, type: "index_not_found_exception"}]}

Otherwise, you'll get `:ok`:

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> Elasticsearch.delete_indexes(["test1"])
    :ok
"""
@spec delete_indexes([String.t]) ::
  :ok |
  {:error, [Elasticsearch.Exception.t]}
def delete_indexes(indexes) do
  # Attempt every deletion, keeping only the exceptions from failures.
  failures =
    Enum.flat_map(indexes, fn index ->
      case delete("/#{index}") do
        {:error, exception} -> [exception]
        _success -> []
      end
    end)

  case failures do
    [] -> :ok
    failures -> {:error, failures}
  end
end
@doc """
Gets the contents of a path from the Elasticsearch API.

Returns `{:ok, response}` for 2xx responses, or `{:error, exception}`
when Elasticsearch responds with an error.

## Examples

    iex> {:ok, resp} = Elasticsearch.get("/_cat/health?format=json")
    ...> is_list(resp)
    true

    iex> Elasticsearch.get("/nonexistent")
    {:error,
     %Elasticsearch.Exception{message: "no such index",
       status: 404, type: "index_not_found_exception",
       col: nil, line: nil, query: nil,
       raw: %{"error" => %{"index" => "nonexistent",
           "index_uuid" => "_na_", "reason" => "no such index",
           "resource.id" => "nonexistent",
           "resource.type" => "index_or_alias",
           "root_cause" => [%{"index" => "nonexistent",
              "index_uuid" => "_na_", "reason" => "no such index",
              "resource.id" => "nonexistent",
              "resource.type" => "index_or_alias",
              "type" => "index_not_found_exception"}],
           "type" => "index_not_found_exception"}, "status" => 404}}}
"""
@spec get(String.t) :: response
def get(url) do
  url
  |> api_module().get()
  |> format()
end
@doc """
The same as `get/1`, but returns the response instead of a tuple. Raises on
errors.

## Examples

    iex> resp = Elasticsearch.get!("/_cat/health?format=json")
    ...> is_list(resp)
    true

    iex> Elasticsearch.get!("/nonexistent")
    ** (Elasticsearch.Exception) (index_not_found_exception) no such index
"""
@spec get!(String.t) :: map
def get!(url) do
  unwrap!(get(url))
end
@doc """
Puts data to a given Elasticsearch API path.

`data` may be a map (JSON-encoded by the API module) or an
already-encoded binary.

## Examples

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> Elasticsearch.put("/test1/type1/id", %{"name" => "name", "author" => "author"})
    {:ok,
     %{"_id" => "id", "_index" => "test1",
       "_shards" => %{"failed" => 0, "successful" => 1, "total" => 2},
       "_type" => "type1", "_version" => 1, "created" => true,
       "result" => "created"}}

    iex> Elasticsearch.put("/bad/url", %{"name" => "name", "author" => "author"})
    {:error,
     %Elasticsearch.Exception{col: nil, line: nil,
      message: "No handler found for uri [/bad/url] and method [PUT]",
      query: nil, raw: nil, status: nil, type: nil}}
"""
@spec put(String.t, map | binary) :: response
def put(url, data) do
  url
  |> api_module().put(data)
  |> format()
end
@doc """
The same as `put/2`, but returns the response instead of a tuple. Raises on
errors.

## Examples

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> Elasticsearch.put!("/test1/type1/id", %{"name" => "name", "author" => "author"})
    %{"_id" => "id", "_index" => "test1",
      "_shards" => %{"failed" => 0, "successful" => 1, "total" => 2},
      "_type" => "type1", "_version" => 1, "created" => true,
      "result" => "created"}

    iex> Elasticsearch.put!("/bad/url", %{"data" => "here"})
    ** (Elasticsearch.Exception) No handler found for uri [/bad/url] and method [PUT]
"""
@spec put!(String.t, map) :: map
def put!(url, data) do
  unwrap!(put(url, data))
end
@doc """
Posts data or queries to a given Elasticsearch path. If you want to execute
an `Elasticsearch.Query`, see `execute/1` instead.

## Examples

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> query = %{"query" => %{"match_all" => %{}}}
    ...> Elasticsearch.post("/test1/_search", query)
    {:ok,
     %{"_shards" => %{"failed" => 0, "successful" => 5, "total" => 5},
       "hits" => %{"hits" => [], "max_score" => nil, "total" => 0},
       "timed_out" => false, "took" => 1}}
"""
@spec post(String.t, map) :: response
def post(url, data) do
  url
  |> api_module().post(data)
  |> format()
end
@doc """
The same as `post/2`, but returns the response. Raises on errors.

## Examples

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> query = %{"query" => %{"match_all" => %{}}}
    ...> Elasticsearch.post!("/test1/_search", query)
    %{"_shards" => %{"failed" => 0, "successful" => 5, "total" => 5},
      "hits" => %{"hits" => [], "max_score" => nil, "total" => 0},
      "timed_out" => false, "took" => 1}

Raises an error if the path is invalid or another error occurs:

    iex> query = %{"query" => %{"match_all" => %{}}}
    ...> Elasticsearch.post!("/nonexistent/_search", query)
    ** (Elasticsearch.Exception) (index_not_found_exception) no such index
"""
@spec post!(String.t, map) :: map
def post!(url, data) do
  url
  |> post(data)
  |> unwrap!()
end
@doc """
Deletes data at a given Elasticsearch URL.

## Examples

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> Elasticsearch.delete("/test1")
    {:ok, %{"acknowledged" => true}}

It returns an error if the given resource does not exist.

    iex> Elasticsearch.delete("/nonexistent")
    {:error,
     %Elasticsearch.Exception{message: "no such index",
       status: 404, type: "index_not_found_exception",
       col: nil, line: nil, query: nil,
       raw: %{"error" => %{"index" => "nonexistent",
           "index_uuid" => "_na_", "reason" => "no such index",
           "resource.id" => "nonexistent",
           "resource.type" => "index_or_alias",
           "root_cause" => [%{"index" => "nonexistent",
              "index_uuid" => "_na_", "reason" => "no such index",
              "resource.id" => "nonexistent",
              "resource.type" => "index_or_alias",
              "type" => "index_not_found_exception"}],
           "type" => "index_not_found_exception"}, "status" => 404}}}
"""
@spec delete(String.t) :: response
def delete(url) do
  url
  |> api_module().delete()
  |> format()
end
@doc """
Same as `delete/1`, but returns the response and raises errors.

## Examples

    iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
    ...> Elasticsearch.delete!("/test1")
    %{"acknowledged" => true}

Raises an error if the resource is invalid.

    iex> Elasticsearch.delete!("/nonexistent")
    ** (Elasticsearch.Exception) (index_not_found_exception) no such index
"""
@spec delete!(String.t) :: map
def delete!(url) do
  unwrap!(delete(url))
end
# Any 2xx status is success: unwrap to `{:ok, body}`. The body has
# already been decoded by the API module's response processing.
defp format({:ok, %{status_code: code, body: body}})
when code >= 200 and code < 300 do
  {:ok, body}
end

# Non-2xx responses are converted into an `Elasticsearch.Exception`
# built from the response body.
defp format({:ok, %{body: body}}) do
  error = Elasticsearch.Exception.exception(response: body)
  {:error, error}
end

# Transport-level failures (already `{:error, _}`) pass through as-is.
defp format(error), do: error

# Unwraps `{:ok, value}` or raises the exception in `{:error, exception}`.
defp unwrap!({:ok, value}), do: value
defp unwrap!({:error, exception}), do: raise exception

# The API implementation is configurable via `:api_module` so tests can
# substitute a mock; defaults to the real HTTP client.
defp api_module do
  config()[:api_module] || Elasticsearch.API.HTTP
end

defp config do
  Application.get_all_env(:elasticsearch)
end
end

19
lib/elasticsearch/api.ex Normal file
View file

@ -0,0 +1,19 @@
defmodule Elasticsearch.API do
  @moduledoc """
  A behaviour that an Elasticsearch API must adhere to.

  The implementation in use is selected via the `:api_module` config key;
  swap in a mock implementation for testing.
  """

  @type url :: String.t
  # Request payloads may be maps or keyword lists.
  @type data :: map | Keyword.t
  @type opts :: Keyword.t
  @type headers :: Keyword.t

  # Responses mirror HTTPoison's return contract.
  @type response ::
    {:ok, HTTPoison.Response.t | HTTPoison.AsyncResponse.t} |
    {:error, HTTPoison.Error.t}

  # NOTE(review): these callbacks declare explicit `headers`/`opts`
  # arguments, but callers in `Elasticsearch` invoke e.g. `get(url)` with
  # one argument — presumably relying on HTTPoison.Base's optional
  # arguments. Confirm the declared arities match the implementations.
  @callback get(url, headers, opts) :: response
  @callback put(url, data, headers, opts) :: response
  @callback post(url, data, headers, opts) :: response
  @callback delete(url, headers, opts) :: response
end

View file

@ -0,0 +1,47 @@
defmodule Elasticsearch.API.HTTP do
  @moduledoc """
  An HTTP implementation of `Elasticsearch.API`.
  """

  @behaviour Elasticsearch.API

  use HTTPoison.Base

  alias Elasticsearch.Config

  ###
  # HTTPoison Callbacks
  ###

  # Prefixes every request path with the configured Elasticsearch URL.
  @doc false
  def process_url(url) do
    Config.url <> url
  end

  # Always sends JSON; adds an HTTP basic auth header only when
  # credentials are configured (see Config.http_basic_credentials/0).
  # Note: incoming headers are discarded, not merged.
  def process_request_headers(_headers) do
    headers = [{"Content-Type", "application/json"}]

    credentials = Config.http_basic_credentials()

    if credentials do
      [{"Authorization", "Basic #{credentials}"} | headers]
    else
      headers
    end
  end

  # Binaries are assumed to be pre-encoded and passed through untouched;
  # maps are JSON-encoded.
  @doc false
  def process_request_body(string) when is_binary(string), do: string
  def process_request_body(map) when is_map(map) do
    Poison.encode!(map)
  end

  # Decodes the response body as JSON when it appears to contain JSON.
  # NOTE(review): `body =~ "{"` is a heuristic — any body containing a
  # brace anywhere is decoded, and `Poison.decode!` raises on malformed
  # JSON. Confirm this holds for all endpoints used (e.g. _cat APIs).
  @doc false
  def process_response_body(body) do
    if body =~ "{" do
      Poison.decode!(body)
    else
      body
    end
  end
end

134
lib/elasticsearch/bulk.ex Normal file
View file

@ -0,0 +1,134 @@
defmodule Elasticsearch.Bulk do
@moduledoc """
Functions for creating bulk indexing requests.
"""
alias Elasticsearch.{
DataStream,
Document
}
require Logger
@doc """
Encodes a given variable into an Elasticsearch bulk request. The variable
must implement `Elasticsearch.Document`.

## Examples

    iex> Bulk.encode(%Type1{id: "my-id"}, "my-index")
    {:ok, \"\"\"
    {"create":{"_type":"type1","_index":"my-index","_id":"my-id"}}
    {"name":null,"author":null}
    \"\"\"}

    iex> Bulk.encode(123, "my-index")
    {:error,
     %Protocol.UndefinedError{description: "",
       protocol: Elasticsearch.Document, value: 123}}
"""
@spec encode(struct, String.t) ::
  {:ok, String.t} |
  {:error, Exception.t}
def encode(struct, index) do
  {:ok, encode!(struct, index)}
rescue
  # Any exception raised by encode!/2 (typically Protocol.UndefinedError
  # when `struct` doesn't implement Elasticsearch.Document) is returned
  # as an error tuple, hence the broad `Exception.t` in the spec.
  # (The previous spec referenced `Error.t`, a nonexistent module.)
  exception ->
    {:error, exception}
end
@doc """
Same as `encode/2`, but returns the request and raises errors.

## Example

    iex> Bulk.encode!(%Type1{id: "my-id"}, "my-index")
    \"\"\"
    {"create":{"_type":"type1","_index":"my-index","_id":"my-id"}}
    {"name":null,"author":null}
    \"\"\"

    iex> Bulk.encode!(123, "my-index")
    ** (Protocol.UndefinedError) protocol Elasticsearch.Document not implemented for 123. This protocol is implemented for: Type1, Type2
"""
def encode!(struct, index) do
  # One bulk entry is two newline-terminated JSON lines: the "create"
  # action header, then the document source.
  header = header("create", index, struct)

  document =
    struct
    |> Document.encode
    |> Poison.encode!

  "#{header}\n#{document}\n"
end
@doc """
Uploads all the data from the list of `sources` to the given index.
Data for each `source` will be fetched using the configured `:loader`.

## Example

    iex> Bulk.upload("test1", [:type1])
    :ok
"""
@spec upload(String.t, list) :: :ok | {:error, [map]}
def upload(index_name, sources, errors \\ [])
def upload(_index_name, [], []), do: :ok
def upload(_index_name, [], errors), do: {:error, errors}
def upload(index_name, [source | tail] = _sources, errors) do
  # Stream the source's data, encode each struct into a bulk entry, and
  # POST pages of `:bulk_page_size` entries to the `_bulk` endpoint,
  # accumulating per-document and transport errors. Recurses over the
  # remaining sources, threading the error accumulator through.
  errors =
    source
    |> DataStream.stream()
    |> Stream.map(&encode!(&1, index_name))
    |> Stream.chunk_every(config()[:bulk_page_size])
    |> Stream.map(&Elasticsearch.put("/#{index_name}/_bulk", Enum.join(&1)))
    |> Enum.reduce(errors, &collect_errors/2)

  upload(index_name, tail, errors)
end

# A 2xx bulk response may still report per-document failures via
# `"errors" => true`; convert each failed "create" item into an
# Elasticsearch.Exception and prepend them to the accumulator.
defp collect_errors({:ok, %{"errors" => true} = response}, errors) do
  new_errors =
    response["items"]
    |> Enum.filter(&(&1["create"]["error"] != nil))
    |> Enum.map(&(&1["create"]))
    |> Enum.map(&Elasticsearch.Exception.exception(response: &1))

  new_errors ++ errors
end

# Request-level failures are collected as-is.
defp collect_errors({:error, error}, errors) do
  [error | errors]
end

# Successful pages contribute nothing.
defp collect_errors(_response, errors) do
  errors
end
# Builds the JSON action line that precedes each document in a bulk
# request, e.g. `{"create":{"_index":...,"_type":...,"_id":...}}`.
# `_type` and `_id` come from the struct's Elasticsearch.Document impl.
defp header(type, index, struct) do
  attrs = %{
    "_index" => index,
    "_type" => Document.type(struct),
    "_id" => Document.id(struct)
  }

  header =
    %{}
    |> Map.put(type, attrs)
    |> put_parent(type, struct)

  Poison.encode!(header)
end

# Adds a `_parent` field to the action only when the document declares a
# parent; otherwise the header is returned unchanged.
defp put_parent(header, type, struct) do
  parent = Document.parent(struct)

  if parent do
    put_in(header[type]["_parent"], parent)
  else
    header
  end
end

defp config do
  Application.get_all_env(:elasticsearch)
end
end

128
lib/elasticsearch/config.ex Normal file
View file

@ -0,0 +1,128 @@
defmodule Elasticsearch.Config do
  @moduledoc """
  Conveniences for fetching configuration values for `Elasticsearch`.
  """

  alias Elasticsearch.DataLoader

  @doc """
  Returns the configured Elasticsearch URL.

  ## Configuration

      config :elasticsearch,
        url: "http://localhost:9200"

  System tuples are also supported:

      config :elasticsearch,
        url: {:system, "ELASTICSEARCH_URL"}

  ## Example

      iex> Config.url()
      "http://localhost:9200"
  """
  @spec url :: String.t
  def url do
    from_env(:elasticsearch, :url)
  end

  @doc """
  Returns HTTP basic credential header contents based on the configured
  `:username` and `:password`. Returns `nil` unless both are set.

  ## Configuration

      config :elasticsearch,
        username: "username",
        password: "password"

  System tuples are also supported:

      config :elasticsearch,
        username: {:system, "ELASTICSEARCH_USERNAME"},
        password: {:system, "ELASTICSEARCH_PASSWORD"}

  ## Example

      iex> Config.http_basic_credentials()
      "dXNlcm5hbWU6cGFzc3dvcmQ="
  """
  @spec http_basic_credentials :: String.t | nil
  def http_basic_credentials do
    username = from_env(:elasticsearch, :username)
    password = from_env(:elasticsearch, :password)

    if username && password do
      Base.encode64("#{username}:#{password}")
    end
  end

  @doc """
  Generates a name for an index that will be aliased to a given `alias`.
  Similar to migrations, the name will contain a timestamp.

  ## Example

      Config.build_index_name("main")
      # => "main-1509581256"
  """
  @spec build_index_name(String.t) :: String.t
  # Param renamed from `alias` to avoid shadowing the `alias` special form.
  def build_index_name(index_alias) do
    "#{index_alias}-#{system_timestamp()}"
  end

  @doc """
  Gets the full configuration for a given index.

  ## Configuration

      config :elasticsearch,
        indexes: %{
          index1: %{
            alias: "index1_alias",
            schema: "priv/elasticsearch/index1.json",
            sources: [Type1]
          }
        }

  ## Example

      iex> Config.config_for_index(:index1)
      %{
        alias: "index1_alias",
        schema: "priv/elasticsearch/index1.json",
        sources: [Type1]
      }
  """
  @spec config_for_index(atom) ::
    %{alias: String.t, schema: String.t, sources: [DataLoader.source]} |
    nil
  def config_for_index(index) do
    all()[:indexes][index]
  end

  @doc """
  Returns the entire `:elasticsearch` application configuration.
  """
  @spec all :: Keyword.t
  def all do
    Application.get_all_env(:elasticsearch)
  end

  @doc """
  A light wrapper around `Application.get_env/2`, providing automatic support for
  `{:system, "VAR"}` tuples.
  """
  @spec from_env(atom, atom, any) :: any
  # Single clause, so the default can be declared inline instead of in a
  # separate bodiless head.
  def from_env(otp_app, key, default \\ nil) do
    otp_app
    |> Application.get_env(key, default)
    |> read_from_system(default)
  end

  # `{:system, var}` values are resolved from the environment at runtime,
  # falling back to `default` when the variable is unset.
  defp read_from_system({:system, env}, default), do: System.get_env(env) || default
  defp read_from_system(value, _default), do: value

  defp system_timestamp do
    DateTime.to_unix(DateTime.utc_now())
  end
end

View file

@ -0,0 +1,8 @@
defmodule Elasticsearch.DataLoader do
  @moduledoc """
  A behaviour for loading pages of data to be indexed. The module
  configured under the `:loader` key must implement `load/3`.
  """

  # An opaque identifier for a data source (one entry from an index's
  # `:sources` list).
  @type source :: any
  @type data :: any
  @type offset :: integer
  @type limit :: integer

  # Returns up to `limit` items starting at `offset`; an empty list
  # signals that the source is exhausted.
  @callback load(source, offset, limit) :: [data]
end

View file

@ -1,54 +0,0 @@
# NOTE(review): this protocol appears to be removed in this commit in
# favor of the `Elasticsearch.DataLoader` behaviour — confirm no
# remaining callers before deleting.
defprotocol Elasticsearch.DataSource do
  @moduledoc """
  A protocol for fetching structs from a database to insert into Elasticsearch.
  Each struct that is returned must implement `Elasticsearch.Document`.

  ## Configuration

  The `Elasticsearch.DataSource` protocol will be used to fetch data from each
  `:source` specified in the `:sources` in your index configuration:

      config :elasticsearch,
        indexes: %{
          index1: %{
            alias: "index1_alias",
            schema: "priv/elasticsearch/index1.json",
            sources: [MyApp.SchemaName] # Each source must implement `DataSource`
          }
        }

  ## Example

  Since `:sources` will usually be a list of atoms, you can implement the
  `Elasticsearch.DataSource` protocol for `Atom`:

      defimpl Elasticsearch.DataSource, for: Atom do
        import Ecto.Query
        alias MyApp.Repo

        def fetch(MyApp.SchemaName = module, offset, limit) do
          module
          |> offset(^offset)
          |> limit(^limit)
          |> Repo.all
        end
      end

  If different modules should fetch their data differently, you can simply
  add additional `fetch` definitions:

      def fetch(MyApp.AnotherSchema = module, offset, limit) do
        module
        # ... custom logic here
      end
  """

  @type t :: any

  @doc """
  Returns a list of structs for the data source, based on `limit` and `offset`.
  The structs returned must implement `Elasticsearch.Document`.
  """
  @spec fetch(t, integer, integer) :: [map]
  def fetch(source, offset, limit)
end

View file

@ -1,19 +1,23 @@
defmodule Elasticsearch.DataStream do
@moduledoc """
Functions for building `Stream`s out of `Elasticsearch.DataSource`s.
See `stream/1` for details.
Functions for building `Stream`s using the configured
`Elasticsearch.DataLoader`.
config :elasticsearch,
# A module that implements the Elasticsearch.DataLoader behaviour
loader: MyApp.ElasticsearchLoader
"""
alias Elasticsearch.DataSource
@type source :: any
@doc """
Creates a `Stream` from a given `Elasticsearch.DataSource`.
Creates a `Stream` from a given source.
## Configuration
You must first implement the `Elasticsearch.DataSource` protocol for the
source that you want to stream. The stream will be paginated based on
the `:bulk_page_size` in the configuration.
Your configured `:loader` module must handle the given data source.
The stream will be paginated based on the `:bulk_page_size` in the
configuration.
config :elasticsearch,
bulk_page_size: 5000
@ -25,7 +29,7 @@ defmodule Elasticsearch.DataStream do
true
"""
@spec stream(DataSource.t) :: Stream.t
@spec stream(source) :: Stream.t
def stream(source) do
Stream.resource(&init/0, &next(&1, source), &finish/1)
end
@ -37,9 +41,9 @@ defmodule Elasticsearch.DataStream do
{[], 0, config()[:bulk_page_size]}
end
# If no items, fetch another page of items
# If no items, load another page of items
defp next({[], offset, limit}, source) do
fetch_page(source, offset, limit)
load_page(source, offset, limit)
end
# If there are items, return the next item, and set the new state equal to
@ -49,16 +53,16 @@ defmodule Elasticsearch.DataStream do
end
# Fetch a new page of items
defp fetch_page(source, offset, limit) do
defp load_page(source, offset, limit) do
page_size = config()[:bulk_page_size]
case DataSource.fetch(source, offset, limit) do
# If the fetch returns no more items (i.e., we've iterated through them
case config()[:loader].load(source, offset, limit) do
# If the load returns no more items (i.e., we've iterated through them
# all) then halt the stream and leave offset and limit unchanged.
[] ->
{:halt, {[], offset, limit}}
# If the fetch returns items, then return the first item, and put the
# If the load returns items, then return the first item, and put the
# tail into the state. Also, increment offset and limit by the
# configured `:bulk_page_size`.
[h | t] ->

View file

@ -11,28 +11,54 @@ defmodule Elasticsearch.Exception do
:col,
:message,
:type,
:query
:query,
:raw
]
@enforce_keys @keys
defexception @keys
def exception(opts \\ []) do
%Exception{
status: opts[:response]["status"],
line: get_in(opts[:response], ["error", "line"]),
col: get_in(opts[:response], ["error", "col"]),
message: get_in(opts[:response], ["error", "reason"]),
type: get_in(opts[:response], ["error", "root_cause", Access.at(0), "type"]),
query: opts[:query]
}
attrs = build(opts[:response], opts[:query])
struct(Exception, attrs)
end
def message(exception) do
"""
(#{exception.type}) #{exception.message}
type = if exception.type, do: "(#{exception.type})"
msg = if exception.message, do: exception.message
#{inspect(exception.query)}
"""
[type, msg]
|> Enum.reject(&is_nil/1)
|> Enum.join(" ")
end
defp build(response, query) when is_map(response) do
[
status: response["status"],
line: get_in(response, ["error", "line"]),
col: get_in(response, ["error", "col"]),
message: get_in(response, ["error", "reason"]),
type: type(response),
raw: response,
query: query
]
end
defp build(response, query) when is_binary(response) do
[
status: nil,
line: nil,
col: nil,
message: response,
type: nil,
query: query
]
end
defp type(%{"error" => %{"root_cause" => causes}}) do
get_in(causes, [Access.at(0), "type"])
end
defp type(%{"error" => %{"type" => type}}) do
type
end
end

View file

@ -21,6 +21,8 @@ defmodule Elasticsearch.Query do
alias __MODULE__
@enforce_keys [:indexes, :types, :query]
defstruct indexes: [],
types: [],
query: %{}

View file

@ -0,0 +1,122 @@
defmodule Mix.Tasks.Elasticsearch.Build do
  @moduledoc """
  Builds Elasticsearch indexes using a zero-downtime, hot-swap technique.

      mix elasticsearch.build index1 index2 [--existing]
  """

  @shortdoc "Builds Elasticsearch indexes"

  # Required for `mix` to discover and run this module as a task.
  use Mix.Task

  require Logger

  alias Elasticsearch.Config

  @doc false
  def run(args) do
    Mix.Task.run("app.start", [])

    {indexes, type} = parse_args!(args)

    for index <- indexes do
      config = Config.config_for_index(index)
      build(config, type)
    end
  end

  # --existing: skip the build when an index with this alias exists.
  defp build(config, :existing) do
    case Elasticsearch.latest_index_starting_with(config[:alias]) do
      {:ok, index_name} ->
        IO.puts("Index already exists: #{index_name}")

      {:error, :not_found} ->
        build(config, :rebuild)

      {:error, exception} ->
        # Mix.raise/1 expects a binary message, not an exception struct.
        Mix.raise(Exception.message(exception))
    end
  end

  # Full rebuild: create a fresh timestamped index, bulk-upload every
  # source, hot-swap the alias, then refresh.
  defp build(config, :rebuild) do
    index_name = Config.build_index_name(config[:alias])

    with :ok <- Elasticsearch.create_index(index_name, config[:schema]),
         :ok <- Elasticsearch.Bulk.upload(index_name, config[:sources]),
         :ok <- Elasticsearch.alias_index(index_name, config[:alias]),
         :ok <- Elasticsearch.refresh_index(index_name) do
      :ok
    else
      {:error, errors} when is_list(errors) ->
        errors = for error <- errors, do: "#{inspect(error)}\n"

        Mix.raise """
        Index created, but not aliased: #{index_name}
        The following errors occurred:
        #{errors}
        """

      {:error, :enoent} ->
        Mix.raise """
        Schema file not found at #{config[:schema]}.
        """

      {:error, exception} ->
        Mix.raise """
        Index #{index_name} could not be created.
        #{inspect exception}
        """

      error ->
        # Mix.raise/1 expects a binary; stringify unexpected terms.
        Mix.raise(inspect(error))
    end
  end

  # Parses CLI args into {set_of_index_atoms, :existing | :rebuild},
  # raising on unknown or missing indexes.
  defp parse_args!(args) do
    {options, indexes} =
      OptionParser.parse!(args, switches: [
        existing: :boolean
      ])

    # NOTE(review): String.to_atom/1 on CLI input creates atoms; bounded
    # here by validate_indexes!/1 against the configured set, but
    # String.to_existing_atom/1 would be stricter.
    indexes =
      indexes
      |> Enum.map(&String.to_atom/1)
      |> MapSet.new

    type = if options[:existing], do: :existing, else: :rebuild

    validate_indexes!(indexes)

    {indexes, type}
  end

  # Raises with a helpful message when no indexes were given or when any
  # given index is not present in the configuration.
  defp validate_indexes!(indexes) do
    configured = configured_index_names()

    cond do
      MapSet.size(indexes) == 0 ->
        Mix.raise """
        No indexes specified. The following indexes are configured:
        #{inspect Enum.to_list(configured)}
        """

      MapSet.subset?(indexes, configured) == false ->
        Mix.raise """
        The following indexes are not configured:
        #{inspect Enum.to_list(MapSet.difference(indexes, configured))}
        """

      true ->
        :ok
    end
  end

  defp configured_index_names do
    config()
    |> Keyword.get(:indexes)
    |> Enum.map(fn {key, _val} -> key end)
    |> MapSet.new
  end

  defp config do
    Application.get_all_env(:elasticsearch)
  end
end

14
mix.exs
View file

@ -7,6 +7,8 @@ defmodule Elasticsearch.Mixfile do
version: "0.1.0",
elixir: "~> 1.5",
start_permanent: Mix.env == :prod,
elixirc_paths: elixirc_paths(Mix.env),
docs: docs(),
deps: deps()
]
end
@ -19,12 +21,24 @@ defmodule Elasticsearch.Mixfile do
]
end
# Compile `test/support` (shared fixtures) in the dev and test
# environments only; production builds compile just `lib`.
defp elixirc_paths(env) when env in [:test, :dev], do: ["lib", "test/support"]
defp elixirc_paths(_env), do: ["lib"]
# Run "mix help deps" to learn about dependencies.
defp deps do
  [
    # JSON encoding/decoding for Elasticsearch request/response bodies
    {:poison, ">= 0.0.0"},
    # HTTP client used to talk to the Elasticsearch REST API
    {:httpoison, ">= 0.0.0"},
    # Dialyzer wrapper for static analysis (dev only)
    {:dialyze, ">= 0.0.0", only: :dev},
    # Documentation generation via ExDoc (dev only)
    {:ex_doc, ">= 0.0.0", only: :dev}
  ]
end
# ExDoc configuration: render the README as the docs landing page.
defp docs do
  [main: "README", extras: ["README.md"]]
end
end

View file

@ -1,4 +1,12 @@
%{"certifi": {:hex, :certifi, "2.0.0", "a0c0e475107135f76b8c1d5bc7efb33cd3815cb3cf3dea7aefdd174dabead064", [], [], "hexpm"},
"dialyze": {:hex, :dialyze, "0.2.1", "9fb71767f96649020d769db7cbd7290059daff23707d6e851e206b1fdfa92f9d", [], [], "hexpm"},
"earmark": {:hex, :earmark, "1.2.3", "206eb2e2ac1a794aa5256f3982de7a76bf4579ff91cb28d0e17ea2c9491e46a4", [], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.18.1", "37c69d2ef62f24928c1f4fdc7c724ea04aecfdf500c4329185f8e3649c915baf", [], [{:earmark, "~> 1.1", [hex: :earmark, repo: "hexpm", optional: false]}], "hexpm"},
"hackney": {:hex, :hackney, "1.10.1", "c38d0ca52ea80254936a32c45bb7eb414e7a96a521b4ce76d00a69753b157f21", [], [{:certifi, "2.0.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "5.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "1.0.1", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "1.1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm"},
"httpoison": {:hex, :httpoison, "0.13.0", "bfaf44d9f133a6599886720f3937a7699466d23bb0cd7a88b6ba011f53c6f562", [], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm"},
"idna": {:hex, :idna, "5.1.0", "d72b4effeb324ad5da3cab1767cb16b17939004e789d8c0ad5b70f3cea20c89a", [], [{:unicode_util_compat, "0.3.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [], [], "hexpm"},
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [], [], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.1", "28a4d65b7f59893bc2c7de786dec1e1555bd742d336043fe644ae956c3497fbe", [], [], "hexpm"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.3.1", "a1f612a7b512638634a603c8f401892afbf99b8ce93a45041f8aaca99cadb85e", [], [], "hexpm"}}

View file

@ -0,0 +1,14 @@
{
"mappings": {
"type1": {
"properties": {
"name": {
"type": "string"
},
"author": {
"type": "string"
}
}
}
}
}

View file

@ -0,0 +1,14 @@
{
"mappings": {
"type2": {
"properties": {
"name": {
"type": "string"
},
"author": {
"type": "string"
}
}
}
}
}

View file

@ -0,0 +1,7 @@
defmodule Elasticsearch.BulkTest do
  @moduledoc false
  use ExUnit.Case

  alias Elasticsearch.Bulk

  # FIX: reference the module through its alias; the original used the
  # fully-qualified name, leaving the alias unused (compiler warning).
  doctest Bulk
end

View file

@ -0,0 +1,7 @@
defmodule Elasticsearch.ConfigTest do
  @moduledoc false
  use ExUnit.Case

  alias Elasticsearch.Config

  # FIX: reference the module through its alias; the original used the
  # fully-qualified name, leaving the alias unused (compiler warning).
  doctest Config
end

View file

@ -1,8 +1,15 @@
defmodule ElasticsearchTest do
  @moduledoc false
  use ExUnit.Case

  alias Elasticsearch.Query

  doctest Elasticsearch

  # FIX: the rendered diff left the removed "greets the world" test
  # interleaved with this setup block, producing unbalanced code; only
  # the setup survives in the new revision.
  #
  # Delete any indexes the doctests create so test runs stay isolated.
  setup do
    on_exit fn ->
      for index <- ["test1", "test2", "nonexistent"] do
        Elasticsearch.delete("/#{index}")
      end
    end
  end
end

View file

@ -0,0 +1,64 @@
defmodule Mix.Tasks.Elasticsearch.BuildTest do
  use ExUnit.Case

  import Mix.Task, only: [rerun: 2]
  import ExUnit.CaptureIO

  alias Elasticsearch

  # Delete the built alias after each test so runs are isolated.
  # NOTE(review): these are integration tests — they assume a live
  # Elasticsearch instance at the configured URL.
  setup do
    on_exit fn ->
      for index <- ["index1_alias"] do
        Elasticsearch.delete("/#{index}")
      end
    end
  end

  describe ".run" do
    # Unknown switches should make the task fail loudly.
    test "raises error on invalid options" do
      assert_raise Mix.Error, fn ->
        rerun("elasticsearch.build", ["--fake"])
      end
    end

    # Index names must appear in the :indexes application config.
    test "raises error on unconfigured indexes" do
      assert_raise Mix.Error, fn ->
        rerun("elasticsearch.build", ["nonexistent"])
      end
    end

    # At least one index argument is required.
    test "raises error if no index specified" do
      assert_raise Mix.Error, fn ->
        rerun("elasticsearch.build", [])
      end
    end

    # A successful build makes all uploaded documents searchable via
    # the alias. NOTE(review): 10_000 presumably equals the test data
    # loader's total output across bulk pages — confirm against the
    # loader and the bulk_page_size config.
    test "builds configured index" do
      rerun("elasticsearch.build", ["index1"])

      resp = Elasticsearch.get!("/index1_alias/_search")
      assert resp["hits"]["total"] == 10_000
    end

    # Repeated rebuilds should garbage-collect old timestamped index
    # versions, keeping only the newest two. The sleep gives
    # Elasticsearch time to settle between rebuilds.
    test "only keeps two index versions" do
      for _ <- 1..3 do
        rerun("elasticsearch.build", ["index1"])
        :timer.sleep(1000)
      end

      {:ok, indexes} = Elasticsearch.indexes_starting_with("index1")
      assert length(indexes) == 2
    end

    # With --existing, a second build is a no-op that reports the
    # already-built timestamped index on stdout.
    test "--existing checks if index exists" do
      rerun("elasticsearch.build", ["index1"])

      io =
        capture_io fn ->
          rerun("elasticsearch.build", ["index1", "--existing"])
        end

      assert io =~ "Index already exists: index1_alias-"
    end
  end
end

View file

@ -0,0 +1,21 @@
defmodule Elasticsearch.Test.DataLoader do
  @moduledoc false

  @behaviour Elasticsearch.DataLoader

  # Emits a page of 5,000 identical Type1 fixtures, each with a fresh
  # random id. The offset is ignored, matching the original behavior.
  def load(Type1, _offset, limit) when limit <= 10_000 do
    Enum.map(1..5000, fn _ -> build_record() end)
  end

  # Any other module yields no data.
  def load(_module, _offset, _limit), do: []

  # One fixture document with a unique random identifier.
  defp build_record do
    %Type1{id: random_str(), name: "Name", author: "Author"}
  end

  # 32 random bytes, Base64-encoded, as a collision-resistant id.
  defp random_str do
    Base.encode64(:crypto.strong_rand_bytes(32))
  end
end

16
test/support/type1.ex Normal file
View file

@ -0,0 +1,16 @@
defmodule Type1 do
  @moduledoc false

  # Minimal test fixture: a document with its own id, a name, and an
  # author.
  defstruct id: nil, name: nil, author: nil
end

defimpl Elasticsearch.Document, for: Type1 do
  # The Elasticsearch document id comes from the struct's id field.
  def id(item), do: item.id

  # Mapping type; matches the "type1" mapping declared in the test
  # schema (type1.json).
  def type(_item), do: "type1"

  # Type1 documents have no parent/child relationship.
  def parent(_item), do: false

  # Only name and author are indexed; the id travels separately via
  # id/1 above.
  def encode(item) do
    %{
      name: item.name,
      author: item.author
    }
  end
end

16
test/support/type2.ex Normal file
View file

@ -0,0 +1,16 @@
defmodule Type2 do
  @moduledoc false

  # Test fixture without a dedicated id field; the name doubles as the
  # document id.
  defstruct name: nil, author: nil
end

defimpl Elasticsearch.Document, for: Type2 do
  # Type2 has no id field, so the (assumed unique) name is used.
  def id(item), do: item.name

  # FIX: previously returned "type1", apparently copied from the Type1
  # implementation. The mapping for this struct is declared as "type2"
  # in the test schema (type2.json), so documents must be indexed under
  # that type.
  def type(_item), do: "type2"

  # Type2 documents have no parent/child relationship.
  def parent(_item), do: false

  # Only name and author are indexed.
  def encode(item) do
    %{
      name: item.name,
      author: item.author
    }
  end
end