Format codebase
Using more recent best practices.
This commit is contained in:
parent
7a329acb66
commit
a33e809557
22 changed files with 192 additions and 145 deletions
6
.formatter.exs
Normal file
6
.formatter.exs
Normal file
|
@ -0,0 +1,6 @@
|
|||
[
|
||||
inputs: [
|
||||
"{lib,config,test}/**/*.{ex,exs}"
|
||||
],
|
||||
locals_without_parens: []
|
||||
]
|
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -20,3 +20,4 @@ erl_crash.dump
|
|||
*.ez
|
||||
|
||||
vendor/
|
||||
.elixir_ls/
|
||||
|
|
|
@ -49,7 +49,7 @@ echo "----------------------------------------------------------"
|
|||
echo "Running tests..."
|
||||
echo "----------------------------------------------------------"
|
||||
|
||||
bin/ci || { exit 1; }
|
||||
bin/test || { exit 1; }
|
||||
|
||||
echo "----------------------------------------------------------"
|
||||
echo "Setup complete!"
|
||||
|
|
|
@ -7,12 +7,13 @@ config :elasticsearch,
|
|||
username: "username",
|
||||
password: "password",
|
||||
bulk_page_size: 5000,
|
||||
bulk_wait_interval: 15_000, # 15 seconds
|
||||
# 15 seconds
|
||||
bulk_wait_interval: 15_000,
|
||||
api_module: Elasticsearch.API.HTTP,
|
||||
indexes: %{
|
||||
posts: %{
|
||||
settings: "test/support/settings/posts.json",
|
||||
loader: Elasticsearch.Test.DataLoader,
|
||||
sources: [Post]
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,8 +6,8 @@ defmodule Elasticsearch do
|
|||
alias Elasticsearch.Document
|
||||
|
||||
@type response ::
|
||||
{:ok, map} |
|
||||
{:error, Elasticsearch.Exception.t}
|
||||
{:ok, map}
|
||||
| {:error, Elasticsearch.Exception.t()}
|
||||
|
||||
@doc """
|
||||
Creates an index with the given name from either a JSON string or Elixir map.
|
||||
|
@ -17,9 +17,9 @@ defmodule Elasticsearch do
|
|||
iex> Elasticsearch.create_index("posts-1", "{}")
|
||||
:ok
|
||||
"""
|
||||
@spec create_index(String.t, map | String.t) ::
|
||||
:ok |
|
||||
{:error, Elasticsearch.Exception.t}
|
||||
@spec create_index(String.t(), map | String.t()) ::
|
||||
:ok
|
||||
| {:error, Elasticsearch.Exception.t()}
|
||||
def create_index(name, settings) do
|
||||
with {:ok, _response} <- put("/#{name}", settings), do: :ok
|
||||
end
|
||||
|
@ -28,17 +28,17 @@ defmodule Elasticsearch do
|
|||
Creates an index with the given name, with settings loaded from a JSON file.
|
||||
|
||||
## Example
|
||||
|
||||
|
||||
iex> Elasticsearch.create_index_from_file("posts-1", "test/support/settings/posts.json")
|
||||
:ok
|
||||
|
||||
iex> Elasticsearch.create_index_from_file("posts-1", "nonexistent.json")
|
||||
{:error, :enoent}
|
||||
"""
|
||||
@spec create_index_from_file(String.t, Path.t) ::
|
||||
:ok |
|
||||
{:error, File.posix} |
|
||||
{:error, Elasticsearch.Exception.t}
|
||||
@spec create_index_from_file(String.t(), Path.t()) ::
|
||||
:ok
|
||||
| {:error, File.posix()}
|
||||
| {:error, Elasticsearch.Exception.t()}
|
||||
def create_index_from_file(name, file) do
|
||||
with {:ok, settings} <- File.read(file) do
|
||||
create_index(name, settings)
|
||||
|
@ -61,7 +61,7 @@ defmodule Elasticsearch do
|
|||
"_type" => "post", "_version" => 1, "created" => true,
|
||||
"result" => "created"}}
|
||||
"""
|
||||
@spec put_document(Document.t, String.t) :: response
|
||||
@spec put_document(Document.t(), String.t()) :: response
|
||||
def put_document(document, index) do
|
||||
document
|
||||
|> document_url(index)
|
||||
|
@ -71,7 +71,7 @@ defmodule Elasticsearch do
|
|||
@doc """
|
||||
Same as `put_document/2`, but raises on errors.
|
||||
"""
|
||||
@spec put_document!(Document.t, String.t) :: map
|
||||
@spec put_document!(Document.t(), String.t()) :: map
|
||||
def put_document!(document, index) do
|
||||
document
|
||||
|> put_document(index)
|
||||
|
@ -83,7 +83,7 @@ defmodule Elasticsearch do
|
|||
|
||||
The document must implement the `Elasticsearch.Document` protocol.
|
||||
"""
|
||||
@spec delete_document(Document.t, String.t) :: response
|
||||
@spec delete_document(Document.t(), String.t()) :: response
|
||||
def delete_document(document, index) do
|
||||
document
|
||||
|> document_url(index)
|
||||
|
@ -93,7 +93,7 @@ defmodule Elasticsearch do
|
|||
@doc """
|
||||
Same as `delete_document/2`, but raises on errors.
|
||||
"""
|
||||
@spec delete_document!(Document.t, String.t) :: map
|
||||
@spec delete_document!(Document.t(), String.t()) :: map
|
||||
def delete_document!(document, index) do
|
||||
document
|
||||
|> delete_document(index)
|
||||
|
@ -109,28 +109,25 @@ defmodule Elasticsearch do
|
|||
indexes, with zero downtime.
|
||||
|
||||
## Example
|
||||
|
||||
|
||||
iex> Elasticsearch.create_index_from_file("posts-1", "test/support/settings/posts.json")
|
||||
...> Elasticsearch.alias_index("posts-1", "posts")
|
||||
:ok
|
||||
"""
|
||||
@spec alias_index(String.t, String.t) ::
|
||||
:ok |
|
||||
{:error, Elasticsearch.Exception.t}
|
||||
@spec alias_index(String.t(), String.t()) ::
|
||||
:ok
|
||||
| {:error, Elasticsearch.Exception.t()}
|
||||
def alias_index(index_name, index_alias) do
|
||||
with {:ok, indexes} <- indexes_starting_with(index_alias),
|
||||
indexes = Enum.reject(indexes, &(&1 == index_name)) do
|
||||
|
||||
remove_actions =
|
||||
Enum.map indexes, fn(index) ->
|
||||
remove_actions =
|
||||
Enum.map(indexes, fn index ->
|
||||
%{"remove" => %{"index" => index, "alias" => index_alias}}
|
||||
end
|
||||
end)
|
||||
|
||||
actions = %{
|
||||
"actions" =>
|
||||
remove_actions ++
|
||||
[%{"add" => %{"index" => index_name, "alias" => index_alias}}
|
||||
]
|
||||
remove_actions ++ [%{"add" => %{"index" => index_name, "alias" => index_alias}}]
|
||||
}
|
||||
|
||||
with {:ok, _response} <- post("/_aliases", actions), do: :ok
|
||||
|
@ -149,15 +146,20 @@ defmodule Elasticsearch do
|
|||
true
|
||||
"""
|
||||
@spec wait_for_boot(integer) ::
|
||||
{:ok, map} |
|
||||
{:error, RuntimeError.t} |
|
||||
{:error, Elasticsearch.Exception.t}
|
||||
{:ok, map}
|
||||
| {:error, RuntimeError.t()}
|
||||
| {:error, Elasticsearch.Exception.t()}
|
||||
def wait_for_boot(tries, count \\ 0)
|
||||
|
||||
def wait_for_boot(tries, count) when count == tries do
|
||||
{:error, RuntimeError.exception("""
|
||||
Elasticsearch could not be found after #{count} tries. Make sure it's running?
|
||||
""")}
|
||||
{
|
||||
:error,
|
||||
RuntimeError.exception("""
|
||||
Elasticsearch could not be found after #{count} tries. Make sure it's running?
|
||||
""")
|
||||
}
|
||||
end
|
||||
|
||||
def wait_for_boot(tries, count) do
|
||||
with {:error, _} <- get("/_cat/health?format=json") do
|
||||
:timer.sleep(1000)
|
||||
|
@ -169,21 +171,21 @@ defmodule Elasticsearch do
|
|||
Returns all indexes which start with a given string.
|
||||
|
||||
## Example
|
||||
|
||||
|
||||
iex> Elasticsearch.create_index_from_file("posts-1", "test/support/settings/posts.json")
|
||||
...> Elasticsearch.indexes_starting_with("posts")
|
||||
{:ok, ["posts-1"]}
|
||||
"""
|
||||
@spec indexes_starting_with(String.t | atom) ::
|
||||
{:ok, [String.t]} |
|
||||
{:error, Elasticsearch.Exception.t}
|
||||
@spec indexes_starting_with(String.t() | atom) ::
|
||||
{:ok, [String.t()]}
|
||||
| {:error, Elasticsearch.Exception.t()}
|
||||
def indexes_starting_with(prefix) do
|
||||
with {:ok, indexes} <- get("/_cat/indices?format=json") do
|
||||
prefix = to_string(prefix)
|
||||
|
||||
indexes =
|
||||
indexes
|
||||
|> Enum.map(&(&1["index"]))
|
||||
|> Enum.map(& &1["index"])
|
||||
|> Enum.filter(&String.starts_with?(&1, prefix))
|
||||
|> Enum.sort()
|
||||
|
||||
|
@ -206,10 +208,10 @@ defmodule Elasticsearch do
|
|||
iex> Elasticsearch.latest_index_starting_with("nonexistent")
|
||||
{:error, :not_found}
|
||||
"""
|
||||
@spec latest_index_starting_with(String.t | atom) ::
|
||||
{:ok, String.t} |
|
||||
{:error, :not_found} |
|
||||
{:error, Elasticsearch.Exception.t}
|
||||
@spec latest_index_starting_with(String.t() | atom) ::
|
||||
{:ok, String.t()}
|
||||
| {:error, :not_found}
|
||||
| {:error, Elasticsearch.Exception.t()}
|
||||
def latest_index_starting_with(prefix) do
|
||||
with {:ok, indexes} <- indexes_starting_with(prefix) do
|
||||
index =
|
||||
|
@ -228,12 +230,12 @@ defmodule Elasticsearch do
|
|||
Refreshes a given index with recently added data.
|
||||
|
||||
## Example
|
||||
|
||||
|
||||
iex> Elasticsearch.create_index_from_file("posts-1", "test/support/settings/posts.json")
|
||||
...> Elasticsearch.refresh_index("posts-1")
|
||||
:ok
|
||||
"""
|
||||
@spec refresh_index(String.t) :: :ok | {:error, Elasticsearch.Exception.t}
|
||||
@spec refresh_index(String.t()) :: :ok | {:error, Elasticsearch.Exception.t()}
|
||||
def refresh_index(index_name) do
|
||||
with {:ok, _} <- post("/#{index_name}/_forcemerge?max_num_segments=5", %{}),
|
||||
{:ok, _} <- post("/#{index_name}/_refresh", %{}),
|
||||
|
@ -244,7 +246,7 @@ defmodule Elasticsearch do
|
|||
Same as `refresh_index/1`, but raises an error on failure.
|
||||
|
||||
## Examples
|
||||
|
||||
|
||||
iex> Elasticsearch.create_index_from_file("posts-1", "test/support/settings/posts.json")
|
||||
...> Elasticsearch.refresh_index!("posts-1")
|
||||
:ok
|
||||
|
@ -252,11 +254,12 @@ defmodule Elasticsearch do
|
|||
iex> Elasticsearch.refresh_index!("nonexistent")
|
||||
** (Elasticsearch.Exception) (index_not_found_exception) no such index
|
||||
"""
|
||||
@spec refresh_index!(String.t) :: :ok
|
||||
@spec refresh_index!(String.t()) :: :ok
|
||||
def refresh_index!(index_name) do
|
||||
case refresh_index(index_name) do
|
||||
:ok ->
|
||||
:ok ->
|
||||
:ok
|
||||
|
||||
{:error, error} ->
|
||||
raise error
|
||||
end
|
||||
|
@ -284,21 +287,21 @@ defmodule Elasticsearch do
|
|||
...> Elasticsearch.indexes_starting_with("posts")
|
||||
{:ok, []}
|
||||
"""
|
||||
@spec clean_indexes_starting_with(String.t, integer) ::
|
||||
:ok |
|
||||
{:error, [Elasticsearch.Exception.t]}
|
||||
@spec clean_indexes_starting_with(String.t(), integer) ::
|
||||
:ok
|
||||
| {:error, [Elasticsearch.Exception.t()]}
|
||||
def clean_indexes_starting_with(prefix, num_to_keep) when is_integer(num_to_keep) do
|
||||
with {:ok, indexes} <- indexes_starting_with(prefix) do
|
||||
total = length(indexes)
|
||||
num_to_delete = total - num_to_keep
|
||||
num_to_delete = if num_to_delete >= 0, do: num_to_delete, else: 0
|
||||
|
||||
errors =
|
||||
errors =
|
||||
indexes
|
||||
|> Enum.sort()
|
||||
|> Enum.take(num_to_delete)
|
||||
|> Enum.map(&delete("/#{&1}"))
|
||||
|> Enum.filter(&elem(&1, 0) == :error)
|
||||
|> Enum.filter(&(elem(&1, 0) == :error))
|
||||
|> Enum.map(&elem(&1, 1))
|
||||
|
||||
if length(errors) > 0 do
|
||||
|
@ -334,7 +337,7 @@ defmodule Elasticsearch do
|
|||
"type" => "index_not_found_exception"}, "status" => 404},
|
||||
status: 404, type: "index_not_found_exception"}}
|
||||
"""
|
||||
@spec get(String.t) :: response
|
||||
@spec get(String.t()) :: response
|
||||
def get(url) do
|
||||
format(api_module().get(url))
|
||||
end
|
||||
|
@ -352,7 +355,7 @@ defmodule Elasticsearch do
|
|||
iex> Elasticsearch.get!("/nonexistent")
|
||||
** (Elasticsearch.Exception) (index_not_found_exception) no such index
|
||||
"""
|
||||
@spec get!(String.t) :: map
|
||||
@spec get!(String.t()) :: map
|
||||
def get!(url) do
|
||||
url
|
||||
|> get()
|
||||
|
@ -363,7 +366,7 @@ defmodule Elasticsearch do
|
|||
Puts data to a given Elasticsearch API path.
|
||||
|
||||
## Examples
|
||||
|
||||
|
||||
iex> Elasticsearch.create_index_from_file("posts-1", "test/support/settings/posts.json")
|
||||
...> Elasticsearch.put("/posts-1/post/id", %{"title" => "title", "author" => "author"})
|
||||
{:ok,
|
||||
|
@ -378,7 +381,7 @@ defmodule Elasticsearch do
|
|||
message: "No handler found for uri [/bad/url] and method [PUT]",
|
||||
query: nil, raw: nil, status: nil, type: nil}}
|
||||
"""
|
||||
@spec put(String.t, map | binary) :: response
|
||||
@spec put(String.t(), map | binary) :: response
|
||||
def put(url, data) do
|
||||
format(api_module().put(url, data))
|
||||
end
|
||||
|
@ -388,7 +391,7 @@ defmodule Elasticsearch do
|
|||
errors.
|
||||
|
||||
## Examples
|
||||
|
||||
|
||||
iex> Elasticsearch.create_index_from_file("posts", "test/support/settings/posts.json")
|
||||
...> Elasticsearch.put!("/posts/post/id", %{"name" => "name", "author" => "author"})
|
||||
%{"_id" => "id", "_index" => "posts",
|
||||
|
@ -399,7 +402,7 @@ defmodule Elasticsearch do
|
|||
iex> Elasticsearch.put!("/bad/url", %{"data" => "here"})
|
||||
** (Elasticsearch.Exception) No handler found for uri [/bad/url] and method [PUT]
|
||||
"""
|
||||
@spec put!(String.t, map) :: map
|
||||
@spec put!(String.t(), map) :: map
|
||||
def put!(url, data) do
|
||||
url
|
||||
|> put(data)
|
||||
|
@ -418,7 +421,7 @@ defmodule Elasticsearch do
|
|||
...> resp["hits"]["hits"]
|
||||
[]
|
||||
"""
|
||||
@spec post(String.t, map) :: response
|
||||
@spec post(String.t(), map) :: response
|
||||
def post(url, data) do
|
||||
format(api_module().post(url, data))
|
||||
end
|
||||
|
@ -440,7 +443,7 @@ defmodule Elasticsearch do
|
|||
...> Elasticsearch.post!("/nonexistent/_search", query)
|
||||
** (Elasticsearch.Exception) (index_not_found_exception) no such index
|
||||
"""
|
||||
@spec post!(String.t, map) :: map
|
||||
@spec post!(String.t(), map) :: map
|
||||
def post!(url, data) do
|
||||
url
|
||||
|> post(data)
|
||||
|
@ -474,7 +477,7 @@ defmodule Elasticsearch do
|
|||
"type" => "index_not_found_exception"}, "status" => 404},
|
||||
status: 404, type: "index_not_found_exception"}}
|
||||
"""
|
||||
@spec delete(String.t) :: response
|
||||
@spec delete(String.t()) :: response
|
||||
def delete(url) do
|
||||
format(api_module().delete(url))
|
||||
end
|
||||
|
@ -483,7 +486,7 @@ defmodule Elasticsearch do
|
|||
Same as `delete/1`, but returns the response and raises errors.
|
||||
|
||||
## Examples
|
||||
|
||||
|
||||
iex> Elasticsearch.create_index_from_file("posts", "test/support/settings/posts.json")
|
||||
...> Elasticsearch.delete!("/posts")
|
||||
%{"acknowledged" => true}
|
||||
|
@ -493,7 +496,7 @@ defmodule Elasticsearch do
|
|||
iex> Elasticsearch.delete!("/nonexistent")
|
||||
** (Elasticsearch.Exception) (index_not_found_exception) no such index
|
||||
"""
|
||||
@spec delete!(String.t) :: map
|
||||
@spec delete!(String.t()) :: map
|
||||
def delete!(url) do
|
||||
url
|
||||
|> delete()
|
||||
|
@ -501,7 +504,7 @@ defmodule Elasticsearch do
|
|||
end
|
||||
|
||||
defp format({:ok, %{status_code: code, body: body}})
|
||||
when code >= 200 and code < 300 do
|
||||
when code >= 200 and code < 300 do
|
||||
{:ok, body}
|
||||
end
|
||||
|
||||
|
@ -513,7 +516,7 @@ defmodule Elasticsearch do
|
|||
defp format(error), do: error
|
||||
|
||||
defp unwrap!({:ok, value}), do: value
|
||||
defp unwrap!({:error, exception}), do: raise exception
|
||||
defp unwrap!({:error, exception}), do: raise(exception)
|
||||
|
||||
defp api_module do
|
||||
config()[:api_module] || Elasticsearch.API.HTTP
|
||||
|
|
|
@ -3,14 +3,14 @@ defmodule Elasticsearch.API do
|
|||
A behaviour that an Elasticsearch API must adhere to.
|
||||
"""
|
||||
|
||||
@type url :: String.t
|
||||
@type data :: map | Keyword.t
|
||||
@type opts :: Keyword.t
|
||||
@type headers :: Keyword.t
|
||||
@type url :: String.t()
|
||||
@type data :: map | Keyword.t()
|
||||
@type opts :: Keyword.t()
|
||||
@type headers :: Keyword.t()
|
||||
|
||||
@type response ::
|
||||
{:ok, HTTPoison.Response.t | HTTPoison.AsyncResponse.t} |
|
||||
{:error, HTTPoison.Error.t}
|
||||
@type response ::
|
||||
{:ok, HTTPoison.Response.t() | HTTPoison.AsyncResponse.t()}
|
||||
| {:error, HTTPoison.Error.t()}
|
||||
|
||||
@callback get(url, headers, opts) :: response
|
||||
@callback put(url, data, headers, opts) :: response
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
defmodule Elasticsearch.API.HTTP do
|
||||
@moduledoc """
|
||||
An HTTP implementation of `Elasticsearch.API`.
|
||||
An HTTP implementation of `Elasticsearch.API`, using `HTTPoison`.
|
||||
"""
|
||||
|
||||
@behaviour Elasticsearch.API
|
||||
|
@ -15,7 +15,7 @@ defmodule Elasticsearch.API.HTTP do
|
|||
|
||||
@doc false
|
||||
def process_url(url) do
|
||||
Config.url <> url
|
||||
Config.url() <> url
|
||||
end
|
||||
|
||||
def process_request_headers(_headers) do
|
||||
|
@ -32,6 +32,7 @@ defmodule Elasticsearch.API.HTTP do
|
|||
|
||||
@doc false
|
||||
def process_request_body(string) when is_binary(string), do: string
|
||||
|
||||
def process_request_body(map) when is_map(map) do
|
||||
Poison.encode!(map)
|
||||
end
|
||||
|
|
|
@ -21,9 +21,9 @@ defmodule Elasticsearch.Builder do
|
|||
...> Builder.hot_swap_index("posts", file, loader, [Post])
|
||||
:ok
|
||||
"""
|
||||
@spec hot_swap_index(String.t | atom, String.t, Elasticsearch.DataLoader.t, list) ::
|
||||
:ok |
|
||||
{:error, Elasticsearch.Exception.t}
|
||||
@spec hot_swap_index(String.t() | atom, String.t(), Elasticsearch.DataLoader.t(), list) ::
|
||||
:ok
|
||||
| {:error, Elasticsearch.Exception.t()}
|
||||
def hot_swap_index(alias, settings_file, loader, sources) do
|
||||
index_name = build_index_name(alias)
|
||||
|
||||
|
@ -32,8 +32,8 @@ defmodule Elasticsearch.Builder do
|
|||
:ok <- Elasticsearch.alias_index(index_name, alias),
|
||||
:ok <- Elasticsearch.clean_indexes_starting_with(alias, 2),
|
||||
:ok <- Elasticsearch.refresh_index(index_name) do
|
||||
:ok
|
||||
end
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -45,12 +45,12 @@ defmodule Elasticsearch.Builder do
|
|||
Config.build_index_name("main")
|
||||
# => "main-1509581256"
|
||||
"""
|
||||
@spec build_index_name(String.t | atom) :: String.t
|
||||
@spec build_index_name(String.t() | atom) :: String.t()
|
||||
def build_index_name(alias) do
|
||||
"#{alias}-#{system_timestamp()}"
|
||||
end
|
||||
|
||||
defp system_timestamp do
|
||||
DateTime.to_unix(DateTime.utc_now)
|
||||
DateTime.to_unix(DateTime.utc_now())
|
||||
end
|
||||
end
|
||||
|
|
|
@ -15,7 +15,7 @@ defmodule Elasticsearch.Bulk do
|
|||
must implement `Elasticsearch.Document`.
|
||||
|
||||
## Examples
|
||||
|
||||
|
||||
iex> Bulk.encode(%Post{id: "my-id"}, "my-index")
|
||||
{:ok, \"\"\"
|
||||
{"create":{"_type":"post","_index":"my-index","_id":"my-id"}}
|
||||
|
@ -27,9 +27,9 @@ defmodule Elasticsearch.Bulk do
|
|||
%Protocol.UndefinedError{description: "",
|
||||
protocol: Elasticsearch.Document, value: 123}}
|
||||
"""
|
||||
@spec encode(struct, String.t) ::
|
||||
{:ok, String.t} |
|
||||
{:error, Error.t}
|
||||
@spec encode(struct, String.t()) ::
|
||||
{:ok, String.t()}
|
||||
| {:error, Error.t()}
|
||||
def encode(struct, index) do
|
||||
{:ok, encode!(struct, index)}
|
||||
rescue
|
||||
|
@ -41,7 +41,7 @@ defmodule Elasticsearch.Bulk do
|
|||
Same as `encode/1`, but returns the request and raises errors.
|
||||
|
||||
## Example
|
||||
|
||||
|
||||
iex> Bulk.encode!(%Post{id: "my-id"}, "my-index")
|
||||
\"\"\"
|
||||
{"create":{"_type":"post","_index":"my-index","_id":"my-id"}}
|
||||
|
@ -56,8 +56,8 @@ defmodule Elasticsearch.Bulk do
|
|||
|
||||
document =
|
||||
struct
|
||||
|> Document.encode
|
||||
|> Poison.encode!
|
||||
|> Document.encode()
|
||||
|> Poison.encode!()
|
||||
|
||||
"#{header}\n#{document}\n"
|
||||
end
|
||||
|
@ -66,10 +66,11 @@ defmodule Elasticsearch.Bulk do
|
|||
Uploads all the data from the list of `sources` to the given index.
|
||||
Data for each `source` will be fetched using the configured `:loader`.
|
||||
"""
|
||||
@spec upload(String.t, Elasticsearch.DataLoader.t, list) :: :ok | {:error, [map]}
|
||||
@spec upload(String.t(), Elasticsearch.DataLoader.t(), list) :: :ok | {:error, [map]}
|
||||
def upload(index_name, loader, sources, errors \\ [])
|
||||
def upload(_index_name, _loader, [], []), do: :ok
|
||||
def upload(_index_name, _loader, [], errors), do: {:error, errors}
|
||||
|
||||
def upload(index_name, loader, [source | tail] = _sources, errors) do
|
||||
errors =
|
||||
source
|
||||
|
@ -83,17 +84,19 @@ defmodule Elasticsearch.Bulk do
|
|||
end
|
||||
|
||||
defp collect_errors({:ok, %{"errors" => true} = response}, errors) do
|
||||
new_errors =
|
||||
new_errors =
|
||||
response["items"]
|
||||
|> Enum.filter(&(&1["create"]["error"] != nil))
|
||||
|> Enum.map(&(&1["create"]))
|
||||
|> Enum.map(& &1["create"])
|
||||
|> Enum.map(&Elasticsearch.Exception.exception(response: &1))
|
||||
|
||||
new_errors ++ errors
|
||||
end
|
||||
|
||||
defp collect_errors({:error, error}, errors) do
|
||||
[error | errors]
|
||||
end
|
||||
|
||||
defp collect_errors(_response, errors) do
|
||||
errors
|
||||
end
|
||||
|
@ -105,7 +108,7 @@ defmodule Elasticsearch.Bulk do
|
|||
"_id" => Document.id(struct)
|
||||
}
|
||||
|
||||
header =
|
||||
header =
|
||||
%{}
|
||||
|> Map.put(type, attrs)
|
||||
|> put_parent(type, struct)
|
||||
|
|
|
@ -23,7 +23,7 @@ defmodule Elasticsearch.Config do
|
|||
iex> Config.url()
|
||||
"http://localhost:9200"
|
||||
"""
|
||||
@spec url :: String.t
|
||||
@spec url :: String.t()
|
||||
def url do
|
||||
from_env(:elasticsearch, :url)
|
||||
end
|
||||
|
@ -33,7 +33,7 @@ defmodule Elasticsearch.Config do
|
|||
`:username` and `:password`.
|
||||
|
||||
## Configuration
|
||||
|
||||
|
||||
config :elasticsearch,
|
||||
username: "username",
|
||||
password: "password"
|
||||
|
@ -49,7 +49,7 @@ defmodule Elasticsearch.Config do
|
|||
iex> Config.http_basic_credentials()
|
||||
"dXNlcm5hbWU6cGFzc3dvcmQ="
|
||||
"""
|
||||
@spec http_basic_credentials :: String.t | nil
|
||||
@spec http_basic_credentials :: String.t() | nil
|
||||
def http_basic_credentials do
|
||||
username = from_env(:elasticsearch, :username)
|
||||
password = from_env(:elasticsearch, :password)
|
||||
|
@ -74,7 +74,7 @@ defmodule Elasticsearch.Config do
|
|||
}
|
||||
|
||||
## Example
|
||||
|
||||
|
||||
iex> Config.config_for_index(:posts)
|
||||
%{
|
||||
settings: "test/support/settings/posts.json",
|
||||
|
@ -82,11 +82,13 @@ defmodule Elasticsearch.Config do
|
|||
sources: [Post]
|
||||
}
|
||||
"""
|
||||
@spec config_for_index(atom) :: %{
|
||||
settings: String.t,
|
||||
loader: DataLoader.t,
|
||||
sources: [DataLoader.source]
|
||||
} | nil
|
||||
@spec config_for_index(atom) ::
|
||||
%{
|
||||
settings: String.t(),
|
||||
loader: DataLoader.t(),
|
||||
sources: [DataLoader.source()]
|
||||
}
|
||||
| nil
|
||||
def config_for_index(index) do
|
||||
all()[:indexes][index]
|
||||
end
|
||||
|
@ -101,6 +103,7 @@ defmodule Elasticsearch.Config do
|
|||
"""
|
||||
@spec from_env(atom, atom, any) :: any
|
||||
def from_env(otp_app, key, default \\ nil)
|
||||
|
||||
def from_env(otp_app, key, default) do
|
||||
otp_app
|
||||
|> Application.get_env(key, default)
|
||||
|
|
|
@ -23,13 +23,13 @@ defmodule Elasticsearch.DataStream do
|
|||
bulk_page_size: 5000
|
||||
|
||||
## Example
|
||||
|
||||
|
||||
iex> stream = DataStream.stream(MyApp.Schema, Elasticsearch.Test.DataLoader)
|
||||
...> is_function(stream)
|
||||
true
|
||||
|
||||
"""
|
||||
@spec stream(source, Elasticsearch.DataLoader.t) :: Stream.t
|
||||
@spec stream(source, Elasticsearch.DataLoader.t()) :: Stream.t()
|
||||
def stream(source, loader) do
|
||||
Stream.resource(&init/0, &next(&1, source, loader), &finish/1)
|
||||
end
|
||||
|
@ -59,7 +59,7 @@ defmodule Elasticsearch.DataStream do
|
|||
case loader.load(source, offset, limit) do
|
||||
# If the load returns no more items (i.e., we've iterated through them
|
||||
# all) then halt the stream and leave offset and limit unchanged.
|
||||
[] ->
|
||||
[] ->
|
||||
{:halt, {[], offset, limit}}
|
||||
|
||||
# If the load returns items, then return the first item, and put the
|
||||
|
|
|
@ -34,14 +34,14 @@ defprotocol Elasticsearch.Document do
|
|||
|
||||
def type(_item), do: "item"
|
||||
"""
|
||||
@spec type(any) :: String.t
|
||||
@spec type(any) :: String.t()
|
||||
def type(item)
|
||||
|
||||
@doc """
|
||||
Returns the parent ID of the document, or `false` if there is no parent.
|
||||
|
||||
## Examples
|
||||
|
||||
|
||||
# For structs that have parents
|
||||
def parent(%{parent_id: id}) when id != nil, do: id
|
||||
|
||||
|
|
|
@ -58,6 +58,7 @@ defmodule Elasticsearch.Exception do
|
|||
defp type(%{"error" => %{"root_cause" => causes}}) do
|
||||
get_in(causes, [Access.at(0), "type"])
|
||||
end
|
||||
|
||||
defp type(%{"error" => %{"type" => type}}) do
|
||||
type
|
||||
end
|
||||
|
|
|
@ -29,11 +29,11 @@ defmodule Elasticsearch.Executable do
|
|||
wrap = Application.app_dir(:elasticsearch) <> "/priv/bin/wrap"
|
||||
port = Port.open({:spawn, "#{wrap} #{executable} --port #{port_number}"}, [])
|
||||
{:os_pid, os_pid} = Port.info(port, :os_pid)
|
||||
IO.puts "[info] Running #{name} with PID #{os_pid} on port #{port_number}"
|
||||
IO.puts("[info] Running #{name} with PID #{os_pid} on port #{port_number}")
|
||||
{:ok, port}
|
||||
|
||||
_other ->
|
||||
IO.puts "[info] Detected #{name} already running on port #{port_number}"
|
||||
IO.puts("[info] Detected #{name} already running on port #{port_number}")
|
||||
{:ok, nil}
|
||||
end
|
||||
end
|
||||
|
|
|
@ -33,7 +33,7 @@ defmodule Mix.Tasks.Elasticsearch.Build do
|
|||
Mix.Task.run("app.start", [])
|
||||
|
||||
{indexes, type} = parse_args!(args)
|
||||
|
||||
|
||||
for alias <- indexes do
|
||||
config = Config.config_for_index(alias)
|
||||
build(alias, config, type)
|
||||
|
@ -44,8 +44,10 @@ defmodule Mix.Tasks.Elasticsearch.Build do
|
|||
case Elasticsearch.latest_index_starting_with(alias) do
|
||||
{:ok, index_name} ->
|
||||
IO.puts("Index already exists: #{index_name}")
|
||||
|
||||
{:error, :not_found} ->
|
||||
build(alias, config, :rebuild)
|
||||
|
||||
{:error, exception} ->
|
||||
Mix.raise(exception)
|
||||
end
|
||||
|
@ -58,22 +60,25 @@ defmodule Mix.Tasks.Elasticsearch.Build do
|
|||
{:error, errors} when is_list(errors) ->
|
||||
errors = for error <- errors, do: "#{inspect(error)}\n"
|
||||
|
||||
Mix.raise """
|
||||
Mix.raise("""
|
||||
Index created, but not aliased: #{alias}
|
||||
The following errors occurred:
|
||||
|
||||
#{errors}
|
||||
"""
|
||||
""")
|
||||
|
||||
{:error, :enoent} ->
|
||||
Mix.raise """
|
||||
Mix.raise("""
|
||||
Schema file not found at #{settings}.
|
||||
"""
|
||||
""")
|
||||
|
||||
{:error, exception} ->
|
||||
Mix.raise """
|
||||
Mix.raise("""
|
||||
Index #{alias} could not be created.
|
||||
|
||||
#{inspect exception}
|
||||
"""
|
||||
#{inspect(exception)}
|
||||
""")
|
||||
|
||||
error ->
|
||||
Mix.raise(error)
|
||||
end
|
||||
|
@ -81,19 +86,23 @@ defmodule Mix.Tasks.Elasticsearch.Build do
|
|||
|
||||
defp parse_args!(args) do
|
||||
{options, indexes} =
|
||||
OptionParser.parse!(args, switches: [
|
||||
existing: :boolean
|
||||
])
|
||||
OptionParser.parse!(
|
||||
args,
|
||||
switches: [
|
||||
existing: :boolean
|
||||
]
|
||||
)
|
||||
|
||||
indexes =
|
||||
indexes
|
||||
|> Enum.map(&String.to_atom/1)
|
||||
|> MapSet.new
|
||||
|> MapSet.new()
|
||||
|
||||
type =
|
||||
cond do
|
||||
options[:existing] ->
|
||||
:existing
|
||||
|
||||
true ->
|
||||
:rebuild
|
||||
end
|
||||
|
@ -108,18 +117,19 @@ defmodule Mix.Tasks.Elasticsearch.Build do
|
|||
|
||||
cond do
|
||||
MapSet.size(indexes) == 0 ->
|
||||
Mix.raise """
|
||||
Mix.raise("""
|
||||
No indexes specified. The following indexes are configured:
|
||||
|
||||
#{inspect Enum.to_list(configured)}
|
||||
"""
|
||||
#{inspect(Enum.to_list(configured))}
|
||||
""")
|
||||
|
||||
MapSet.subset?(indexes, configured) == false ->
|
||||
Mix.raise """
|
||||
Mix.raise("""
|
||||
The following indexes are not configured:
|
||||
|
||||
#{inspect Enum.to_list(MapSet.difference(indexes, configured))}
|
||||
"""
|
||||
#{inspect(Enum.to_list(MapSet.difference(indexes, configured)))}
|
||||
""")
|
||||
|
||||
true ->
|
||||
:ok
|
||||
end
|
||||
|
@ -129,7 +139,7 @@ defmodule Mix.Tasks.Elasticsearch.Build do
|
|||
config()
|
||||
|> Keyword.get(:indexes)
|
||||
|> Enum.map(fn {key, _val} -> key end)
|
||||
|> MapSet.new
|
||||
|> MapSet.new()
|
||||
end
|
||||
|
||||
defp config do
|
||||
|
|
|
@ -9,44 +9,56 @@ defmodule Mix.Tasks.Elasticsearch.Install do
|
|||
supervision tree in the Mix `:dev` environment.
|
||||
|
||||
## Example
|
||||
|
||||
|
||||
# Installs Elasticsearch and Kibana 5.1.1 to vendor/
|
||||
mix elasticsearch.install vendor --version 5.1.1
|
||||
"""
|
||||
|
||||
@doc false
|
||||
def run(args) do
|
||||
with {[{:version, version}], [location], _} <- OptionParser.parse(args, switches: [ version: :string ]) do
|
||||
with {[{:version, version}], [location], _} <-
|
||||
OptionParser.parse(args, switches: [version: :string]) do
|
||||
download_elasticsearch(version, location)
|
||||
download_kibana(version, location)
|
||||
else
|
||||
_ ->
|
||||
Mix.raise """
|
||||
Mix.raise("""
|
||||
Invalid options. See `mix help elasticsearch.install`
|
||||
"""
|
||||
""")
|
||||
end
|
||||
end
|
||||
|
||||
defp download_elasticsearch(version, location) do
|
||||
name = "elasticsearch-#{version}"
|
||||
name = "elasticsearch-#{version}"
|
||||
tar = "#{name}.tar.gz"
|
||||
|
||||
System.cmd("curl", ["-L", "-O", "https://artifacts.elastic.co/downloads/elasticsearch/#{tar}"], cd: location)
|
||||
System.cmd(
|
||||
"curl",
|
||||
["-L", "-O", "https://artifacts.elastic.co/downloads/elasticsearch/#{tar}"],
|
||||
cd: location
|
||||
)
|
||||
|
||||
unpack(tar, name, "elasticsearch", location)
|
||||
end
|
||||
|
||||
defp download_kibana(version, location) do
|
||||
name =
|
||||
case :os.type do
|
||||
case :os.type() do
|
||||
{:unix, :darwin} ->
|
||||
"kibana-#{version}-darwin-x86_64"
|
||||
|
||||
other ->
|
||||
Mix.raise "Unsupported system for Kibana: #{inspect other}"
|
||||
Mix.raise("Unsupported system for Kibana: #{inspect(other)}")
|
||||
end
|
||||
|
||||
tar = "#{name}.tar.gz"
|
||||
|
||||
System.cmd("curl", ["-L", "-O", "https://artifacts.elastic.co/downloads/kibana/#{tar}"], cd: location)
|
||||
System.cmd(
|
||||
"curl",
|
||||
["-L", "-O", "https://artifacts.elastic.co/downloads/kibana/#{tar}"],
|
||||
cd: location
|
||||
)
|
||||
|
||||
unpack(tar, name, "kibana", location)
|
||||
end
|
||||
|
||||
|
|
|
@ -4,12 +4,13 @@ defmodule ElasticsearchTest do
|
|||
doctest Elasticsearch
|
||||
|
||||
setup do
|
||||
on_exit fn ->
|
||||
on_exit(fn ->
|
||||
"posts"
|
||||
|> Elasticsearch.indexes_starting_with()
|
||||
|> elem(1)
|
||||
|> Enum.map(&Elasticsearch.delete!("/#{&1}"))
|
||||
|
||||
Elasticsearch.delete("/nonexistent")
|
||||
end
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -7,12 +7,12 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do
|
|||
alias Elasticsearch
|
||||
|
||||
setup do
|
||||
on_exit fn ->
|
||||
on_exit(fn ->
|
||||
"posts"
|
||||
|> Elasticsearch.indexes_starting_with()
|
||||
|> elem(1)
|
||||
|> Enum.map(&Elasticsearch.delete("/#{&1}"))
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
describe ".run" do
|
||||
|
@ -59,9 +59,9 @@ defmodule Mix.Tasks.Elasticsearch.BuildTest do
|
|||
rerun("elasticsearch.build", ["posts"])
|
||||
|
||||
io =
|
||||
capture_io fn ->
|
||||
capture_io(fn ->
|
||||
rerun("elasticsearch.build", ["posts", "--existing"])
|
||||
end
|
||||
end)
|
||||
|
||||
assert io =~ "Index already exists: posts-"
|
||||
end
|
||||
|
|
|
@ -2,7 +2,7 @@ defmodule Elasticsearch.Test.DataLoader do
|
|||
@moduledoc false
|
||||
@behaviour Elasticsearch.DataLoader
|
||||
|
||||
def load(Post, _offset, limit) when limit <= 10_000 do
|
||||
def load(Post, _offset, limit) when limit <= 10_000 do
|
||||
[%Post{title: "Name", author: "Author"}]
|
||||
|> Stream.cycle()
|
||||
|> Stream.map(&Map.put(&1, :id, random_str()))
|
||||
|
|
|
@ -7,6 +7,7 @@ defimpl Elasticsearch.Document, for: Post do
|
|||
def id(item), do: item.id
|
||||
def type(_item), do: "post"
|
||||
def parent(_item), do: false
|
||||
|
||||
def encode(item) do
|
||||
%{
|
||||
title: item.title,
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
ExUnit.start()
|
||||
|
||||
unless System.get_env("CI") do
|
||||
Elasticsearch.Executable.start_link("Elasticsearch", "./vendor/elasticsearch/bin/elasticsearch", 9200)
|
||||
Elasticsearch.Executable.start_link(
|
||||
"Elasticsearch",
|
||||
"./vendor/elasticsearch/bin/elasticsearch",
|
||||
9200
|
||||
)
|
||||
end
|
||||
|
||||
{:ok, _} = Elasticsearch.wait_for_boot(15)
|
||||
|
|
Loading…
Reference in a new issue