Add more useful functions to Elasticsearch

This commit is contained in:
Daniel Berkompas 2017-11-02 14:25:07 -07:00
parent b75fd2ed90
commit 13d98ad1d1
6 changed files with 139 additions and 251 deletions

View file

@ -1,115 +1,119 @@
defmodule Elasticsearch do
@moduledoc """
An Elixir interface to the Elasticsearch API.
"""
alias Elasticsearch.{
Query
}
alias Elasticsearch.Document
@type response ::
{:ok, map} |
{:error, Elasticsearch.Exception.t}
@doc """
Executes an `Elasticsearch.Query`, and returns the response.
Creates an index with the given name from either a JSON string or Elixir map.
## Example
## Examples
query = %Query{
indexes: [:index1],
types: [:type1],
query: %{
"size" => 1,
"query" => %{
"match_all" => %{}
}
}
}
Elasticsearch.execute(query)
# => {:ok, %{
# "_shards" => %{
# "failed" => 0,
# "successful" => 5,
# "total" => 5
# },
# "hits" => %{
# "hits" => [%{
# "_id" => "89phLzwlKSMUqKbTYoswsncqEb5vWdfDlteg+HuFLG4=",
# "_index" => "index1_alias-1509582436", "_score" => 1.0,
# "_source" => %{
# "author" => "Author",
# "name" => "Name"
# },
# "_type" => "type1"
# }],
# "max_score" => 1.0,
# "total" => 10000
# },
# "timed_out" => false,
# "took" => 1
# }}
"""
@spec execute(Query.t) :: response
def execute(query) do
post("#{Query.url(query)}", query.query)
end
@doc """
Same as `execute/1`, but raises errors.
## Example
iex> query = %Query{
...> indexes: [:index1],
...> types: [:type1],
...> query: %{"query" => %{"match_all" => %{}}}
...> }
...> Elasticsearch.execute!(query)
** (Elasticsearch.Exception) (index_not_found_exception) no such index
"""
@spec execute!(Query.t) :: map
def execute!(query) do
case execute(query) do
{:ok, response} ->
response
{:error, error} ->
raise error
end
end
@doc """
Creates an index with the given name from a JSON schema file.
## Example
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index("test1", "{}")
:ok
iex> Elasticsearch.create_index("test2", "nonexistent.json")
iex> Elasticsearch.create_index("test1", %{})
:ok
"""
@spec create_index(String.t, map | String.t) ::
:ok |
{:error, Elasticsearch.Exception.t}
def create_index(name, settings) do
with {:ok, _response} <- put("/#{name}", settings), do: :ok
end
@doc """
Creates an index with the given name, with settings loaded from a JSON file.
## Example
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
:ok
iex> Elasticsearch.create_index_from_file("test2", "nonexistent.json")
{:error, :enoent}
"""
@spec create_index(String.t, Path.t) ::
@spec create_index_from_file(String.t, Path.t) ::
:ok |
{:error, File.posix} |
{:error, Elasticsearch.Exception.t}
def create_index(name, schema) do
with {:ok, contents} <- File.read(schema),
{:ok, _response} <- put("/#{name}", contents) do
:ok
def create_index_from_file(name, file) do
with {:ok, settings} <- File.read(file) do
create_index(name, settings)
end
end
@doc """
Creates or updates a document in a given index.
The document must implement the `Elasticsearch.Document` protocol.
## Example
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> struct = %Type1{id: 123, name: "Post", author: "Author"}
...> Elasticsearch.put_document(struct, "test1")
{:ok,
%{"_id" => "123", "_index" => "test1",
"_shards" => %{"failed" => 0, "successful" => 1, "total" => 2},
"_type" => "type1", "_version" => 1, "created" => true,
"result" => "created"}}
"""
@spec put_document(Document.t, String.t) :: response
def put_document(document, index) do
document
|> document_url(index)
|> put(Document.encode(document))
end
@doc """
Same as `put_document/2`, but raises on errors.
"""
@spec put_document!(Document.t, String.t) :: map
def put_document!(document, index) do
document
|> put_document(index)
|> unwrap!()
end
@doc """
Deletes a document from a given index.
The document must implement the `Elasticsearch.Document` protocol.
"""
@spec delete_document(Document.t, String.t) :: response
def delete_document(document, index) do
document
|> document_url(index)
|> delete()
end
@doc """
Same as `delete_document/2`, but raises on errors.
"""
@spec delete_document!(Document.t, String.t) :: map
def delete_document!(document, index) do
document
|> delete_document(index)
|> unwrap!()
end
defp document_url(document, index) do
"/#{index}/#{Document.type(document)}/#{Document.id(document)}"
end
@doc """
Assigns an alias to a given index, simultaneously removing it from prior
indexes, with zero downtime.
The previous index will be preserved, to make it easier to rollback to
an earlier index.
## Example
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.alias_index("test1", "test")
:ok
"""
@ -132,13 +136,7 @@ defmodule Elasticsearch do
]
}
# Delete all but the most recent index
indexes_to_delete = indexes -- [List.last(indexes)]
with {:ok, _} <- post("/_aliases", actions),
:ok <- delete_indexes(indexes_to_delete) do
:ok
end
with {:ok, _response} <- post("/_aliases", actions), do: :ok
end
end
@ -147,8 +145,8 @@ defmodule Elasticsearch do
## Example
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.create_index("test2", "priv/elasticsearch/index2.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.create_index_from_file("test2", "priv/elasticsearch/index2.json")
...> Elasticsearch.indexes_starting_with("test")
{:ok, ["test1", "test2"]}
"""
@ -169,8 +167,8 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.create_index("test2", "priv/elasticsearch/index2.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.create_index_from_file("test2", "priv/elasticsearch/index2.json")
...> Elasticsearch.latest_index_starting_with("test")
{:ok, "test2"}
@ -202,7 +200,7 @@ defmodule Elasticsearch do
## Example
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.refresh_index("test1")
:ok
"""
@ -218,7 +216,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.refresh_index!("test1")
:ok
@ -236,55 +234,49 @@ defmodule Elasticsearch do
end
@doc """
Deletes multiple indexes in one function call.
Removes indexes starting with the given prefix, keeping a certain number.
If you only need to delete one index, see either of these two functions
instead:
- `delete/1`
- `delete!/1`
Can be used to garbage collect old indexes that are no longer used.
## Examples
If any given index fails to delete, a list of `Elasticsearch.Exception`s will
be returned.
If there is only one index, and `num_to_keep` is >= 1, the index is not deleted.
iex> Elasticsearch.delete_indexes(["nonexistent"])
{:error,
[%Elasticsearch.Exception{col: nil, line: nil,
message: "no such index", query: nil,
raw: %{"error" => %{"index" => "nonexistent",
"index_uuid" => "_na_", "reason" => "no such index",
"resource.id" => "nonexistent",
"resource.type" => "index_or_alias",
"root_cause" => [%{"index" => "nonexistent",
"index_uuid" => "_na_", "reason" => "no such index",
"resource.id" => "nonexistent",
"resource.type" => "index_or_alias",
"type" => "index_not_found_exception"}],
"type" => "index_not_found_exception"}, "status" => 404},
status: 404, type: "index_not_found_exception"}]}
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.clean_indexes_starting_with("test", 1)
...> Elasticsearch.indexes_starting_with("test")
{:ok, ["test1"]}
Otherwise, you'll get `:ok`:
If `num_to_keep` is less than the number of indexes, the older indexes are
deleted.
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.delete_indexes(["test1"])
:ok
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.clean_indexes_starting_with("test", 0)
...> Elasticsearch.indexes_starting_with("test")
{:ok, []}
"""
@spec delete_indexes([String.t]) ::
@spec clean_indexes_starting_with(String.t, integer) ::
:ok |
{:error, [Elasticsearch.Exception.t]}
def delete_indexes(indexes) do
errors =
indexes
|> Stream.map(&delete("/#{&1}"))
|> Stream.filter(&(elem(&1, 0) == :error))
|> Enum.map(&elem(&1, 1))
def clean_indexes_starting_with(prefix, num_to_keep) when is_integer(num_to_keep) do
with {:ok, indexes} <- indexes_starting_with(prefix) do
total = length(indexes)
num_to_delete = total - num_to_keep
num_to_delete = if num_to_delete >= 0, do: num_to_delete, else: 0
if errors == [] do
:ok
else
{:error, errors}
errors =
indexes
|> Enum.sort()
|> Enum.take(num_to_delete)
|> Enum.map(&delete("/#{&1}"))
|> Enum.filter(&elem(&1, 0) == :error)
|> Enum.map(&elem(&1, 1))
if length(errors) > 0 do
{:error, errors}
else
:ok
end
end
end
@ -343,7 +335,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.put("/test1/type1/id", %{"name" => "name", "author" => "author"})
{:ok,
%{"_id" => "id", "_index" => "test1",
@ -368,7 +360,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.put!("/test1/type1/id", %{"name" => "name", "author" => "author"})
%{"_id" => "id", "_index" => "test1",
"_shards" => %{"failed" => 0, "successful" => 1, "total" => 2},
@ -391,7 +383,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> query = %{"query" => %{"match_all" => %{}}}
...> Elasticsearch.post("/test1/_search", query)
{:ok,
@ -409,7 +401,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> query = %{"query" => %{"match_all" => %{}}}
...> Elasticsearch.post!("/test1/_search", query)
%{"_shards" => %{"failed" => 0, "successful" => 5, "total" => 5},
@ -434,7 +426,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.delete("/test1")
{:ok, %{"acknowledged" => true}}
@ -466,7 +458,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.delete!("/test1")
%{"acknowledged" => true}

View file

@ -1,100 +0,0 @@
defmodule Elasticsearch.Query do
@moduledoc """
Represents an Elasticsearch query.
## Example
%Elasticsearch.Query{
indexes: [:index1, :index2],
types: [:type1, :type2],
query: %{
# Use regular Elasticsearch queries here, straight out of the
# Elasticsearch documentation, just converted to Elixir map syntax.
"query" => %{
"term" => %{
"field" => "value"
}
}
}
}
"""
alias __MODULE__
@enforce_keys [:indexes, :types, :query]
defstruct indexes: [],
types: [],
query: %{}
@type t :: %Query{
indexes: [atom],
types: [atom],
query: map
}
@doc """
Returns the Elasticsearch API path a given query should POST to. Respects
aliases for indexes.
## Example
iex> query = %Elasticsearch.Query{
...> indexes: [:index1, :index2],
...> types: [:type1, :type2],
...> query: %{"query" => %{}}
...> }
...> Query.url(query)
"/index1_alias,index2_alias/type1,type2/_search"
"""
@spec url(Query.t) :: String.t
def url(query) do
indexes =
query.indexes
|> Enum.map(&(config()[&1][:alias]))
|> Enum.join(",")
types = Enum.join(query.types, ",")
"/#{indexes}/#{types}/_search"
end
@doc """
Converts a query to a string that can be copy/pasted into Kibana for manual
testing.
## Example
iex> query = %Elasticsearch.Query{
...> indexes: [:index1, :index2],
...> types: [:type1, :type2],
...> query: %{
...> "query" => %{
...> "term" => %{ "field1" => "value" }
...> }
...> }
...> }
...> Query.to_string(query)
\"\"\"
POST /index1_alias,index2_alias/type1,type2/_search
{
"query": {
"term": {
"field1": "value"
}
}
}
\"\"\"
"""
@spec to_string(Query.t) :: String.t
def to_string(%Query{} = query) do
"""
POST #{url(query)}
#{Poison.encode!(query.query, pretty: true)}
"""
end
defp config do
Application.get_env(:elasticsearch, :indexes)
end
end

View file

@ -33,9 +33,10 @@ defmodule Mix.Tasks.Elasticsearch.Build do
defp build(config, :rebuild) do
index_name = Config.build_index_name(config[:alias])
with :ok <- Elasticsearch.create_index(index_name, config[:schema]),
with :ok <- Elasticsearch.create_index_from_file(index_name, config[:schema]),
:ok <- Elasticsearch.Bulk.upload(index_name, config[:sources]),
:ok <- Elasticsearch.alias_index(index_name, config[:alias]),
:ok <- Elasticsearch.clean_indexes_starting_with(config[:alias], 2),
:ok <- Elasticsearch.refresh_index(index_name) do
:ok
else

View file

@ -1,7 +0,0 @@
defmodule Elasticsearch.QueryTest do
use ExUnit.Case
alias Elasticsearch.Query
doctest Elasticsearch.Query
end

View file

@ -1,8 +1,6 @@
defmodule ElasticsearchTest do
use ExUnit.Case
alias Elasticsearch.Query
doctest Elasticsearch
setup do

View file

@ -48,6 +48,10 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do
{:ok, indexes} = Elasticsearch.indexes_starting_with("index1")
assert length(indexes) == 2
[_previous, current] = Enum.sort(indexes)
# assert that the most recent index is the one that is aliased
assert {:ok, %{^current => _}} = Elasticsearch.get("/index1_alias/_alias")
end
test "--existing checks if index exists" do