Functionalize mix tasks

Mix tasks should be clients on the business logic, and contain nothing
that doesn't relate to being a mix task.
This commit is contained in:
Daniel Berkompas 2017-11-03 14:22:47 -07:00
commit 832dc9db99
10 changed files with 121 additions and 74 deletions

View file

@ -8,17 +8,18 @@ config :elasticsearch,
password: "password",
bulk_page_size: 5000,
bulk_wait_interval: 15_000, # 15 seconds
loader: Elasticsearch.Test.DataLoader,
api_module: Elasticsearch.API.HTTP,
indexes: %{
index1: %{
alias: "index1_alias",
schema: "priv/elasticsearch/index1.json",
schema: "test/support/settings/index1.json",
loader: Elasticsearch.Test.DataLoader,
sources: [Type1]
},
index2: %{
alias: "index2_alias",
schema: "priv/elasticsearch/index2.json",
schema: "test/support/settings/index2.json",
loader: Elasticsearch.Test.DataLoader,
sources: [Type2]
}
}

View file

@ -32,7 +32,7 @@ defmodule Elasticsearch do
## Example
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
:ok
iex> Elasticsearch.create_index_from_file("test2", "nonexistent.json")
@ -55,7 +55,7 @@ defmodule Elasticsearch do
## Example
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> struct = %Type1{id: 123, name: "Post", author: "Author"}
...> Elasticsearch.put_document(struct, "test1")
{:ok,
@ -113,7 +113,7 @@ defmodule Elasticsearch do
## Example
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.alias_index("test1", "test")
:ok
"""
@ -173,8 +173,8 @@ defmodule Elasticsearch do
## Example
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.create_index_from_file("test2", "priv/elasticsearch/index2.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.create_index_from_file("test2", "test/support/settings/index2.json")
...> Elasticsearch.indexes_starting_with("test")
{:ok, ["test1", "test2"]}
"""
@ -195,8 +195,8 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
...> Elasticsearch.create_index_from_file("test2", "priv/elasticsearch/index2.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.create_index_from_file("test2", "test/support/settings/index2.json")
...> Elasticsearch.latest_index_starting_with("test")
{:ok, "test2"}
@ -228,7 +228,7 @@ defmodule Elasticsearch do
## Example
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.refresh_index("test1")
:ok
"""
@ -244,7 +244,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.refresh_index!("test1")
:ok
@ -270,7 +270,7 @@ defmodule Elasticsearch do
If there is only one index, and `num_to_keep` is >= 1, the index is not deleted.
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.clean_indexes_starting_with("test", 1)
...> Elasticsearch.indexes_starting_with("test")
{:ok, ["test1"]}
@ -278,7 +278,7 @@ defmodule Elasticsearch do
If `num_to_keep` is less than the number of indexes, the older indexes are
deleted.
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.clean_indexes_starting_with("test", 0)
...> Elasticsearch.indexes_starting_with("test")
{:ok, []}
@ -363,7 +363,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.put("/test1/type1/id", %{"name" => "name", "author" => "author"})
{:ok,
%{"_id" => "id", "_index" => "test1",
@ -388,7 +388,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.put!("/test1/type1/id", %{"name" => "name", "author" => "author"})
%{"_id" => "id", "_index" => "test1",
"_shards" => %{"failed" => 0, "successful" => 1, "total" => 2},
@ -411,7 +411,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> query = %{"query" => %{"match_all" => %{}}}
...> {:ok, resp} = Elasticsearch.post("/test1/_search", query)
...> resp["hits"]["hits"]
@ -427,11 +427,11 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> query = %{"query" => %{"match_all" => %{}}}
...> resp = Elasticsearch.post!("/test1/_search", query)
...> resp["hits"]["hits"]
[]
...> is_map(resp)
true
Raises an error if the path is invalid or another error occurs:
@ -451,7 +451,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.delete("/test1")
{:ok, %{"acknowledged" => true}}
@ -483,7 +483,7 @@ defmodule Elasticsearch do
## Examples
iex> Elasticsearch.create_index_from_file("test1", "priv/elasticsearch/index1.json")
iex> Elasticsearch.create_index_from_file("test1", "test/support/settings/index1.json")
...> Elasticsearch.delete!("/test1")
%{"acknowledged" => true}

View file

@ -0,0 +1,56 @@
defmodule Elasticsearch.Builder do
@moduledoc """
Wrapper functions that make it easier to build indexes from scratch.
"""
@doc """
Creates an index using a zero-downtime hot-swap technique.
1. Build an index for the given `alias`, with a timestamp: `alias-12323123`
2. Bulk upload data to that index using `loader` and `sources`.
3. Alias the `alias` to `alias-12323123`.
4. Remove old indexes beginning with `alias`.
5. Refresh `alias-12323123`.
This allows an old index to be served while a new index for `alias` is built.
## Example
iex> file = "test/support/settings/index1.json"
...> loader = Elasticsearch.Test.DataLoader
...> Builder.hot_swap_index("index1", file, loader, [Type1])
:ok
"""
@spec hot_swap_index(String.t, Path.t, Elasticsearch.DataLoader.t, list) ::
:ok |
{:error, Elasticsearch.Exception.t}
def hot_swap_index(alias, settings_file, loader, sources) do
index_name = build_index_name(alias)
with :ok <- Elasticsearch.create_index_from_file(index_name, settings_file),
:ok <- Elasticsearch.Bulk.upload(index_name, loader, sources),
:ok <- Elasticsearch.alias_index(index_name, alias),
:ok <- Elasticsearch.clean_indexes_starting_with(alias, 2),
:ok <- Elasticsearch.refresh_index(index_name) do
:ok
end
end
@doc """
Generates a name for an index that will be aliased to a given `alias`.
Similar to migrations, the name will contain a timestamp.
## Example
Config.build_index_name("main")
# => "main-1509581256"
"""
@spec build_index_name(String.t) :: String.t
def build_index_name(alias) do
"#{alias}-#{system_timestamp()}"
end
defp system_timestamp do
DateTime.to_unix(DateTime.utc_now)
end
end

View file

@ -68,17 +68,17 @@ defmodule Elasticsearch.Bulk do
## Example
iex> Bulk.upload("test1", [:type1])
iex> Bulk.upload("test1", Elasticsearch.Test.DataLoader, [Type1])
:ok
"""
@spec upload(String.t, list) :: :ok | {:error, [map]}
def upload(index_name, sources, errors \\ [])
def upload(_index_name, [], []), do: :ok
def upload(_index_name, [], errors), do: {:error, errors}
def upload(index_name, [source | tail] = _sources, errors) do
@spec upload(String.t, Elasticsearch.DataLoader.t, list) :: :ok | {:error, [map]}
def upload(index_name, loader, sources, errors \\ [])
def upload(_index_name, _loader, [], []), do: :ok
def upload(_index_name, _loader, [], errors), do: {:error, errors}
def upload(index_name, loader, [source | tail] = _sources, errors) do
errors =
source
|> DataStream.stream()
|> DataStream.stream(loader)
|> Stream.map(&encode!(&1, index_name))
|> Stream.chunk_every(config()[:bulk_page_size])
|> Stream.map(&Elasticsearch.put("/#{index_name}/_bulk", Enum.join(&1)))

View file

@ -59,20 +59,6 @@ defmodule Elasticsearch.Config do
end
end
@doc """
Generates a name for an index that will be aliased to a given `alias`.
Similar to migrations, the name will contain a timestamp.
## Example
Config.build_index_name("main")
# => "main-1509581256"
"""
@spec build_index_name(String.t) :: String.t
def build_index_name(alias) do
"#{alias}-#{system_timestamp()}"
end
@doc """
Gets the full configuration for a given index.
@ -82,7 +68,7 @@ defmodule Elasticsearch.Config do
indexes: %{
index1: %{
alias: "index1_alias",
schema: "priv/elasticsearch/index1.json",
schema: "test/support/settings/index1.json",
sources: [Type1]
}
}
@ -92,13 +78,17 @@ defmodule Elasticsearch.Config do
iex> Config.config_for_index(:index1)
%{
alias: "index1_alias",
schema: "priv/elasticsearch/index1.json",
schema: "test/support/settings/index1.json",
loader: Elasticsearch.Test.DataLoader,
sources: [Type1]
}
"""
@spec config_for_index(atom) ::
%{alias: String.t, schema: String.t, sources: [DataLoader.source]} |
nil
@spec config_for_index(atom) :: %{
alias: String.t,
schema: String.t,
loader: DataLoader.t,
sources: [DataLoader.source]
} | nil
def config_for_index(index) do
all()[:indexes][index]
end
@ -121,8 +111,4 @@ defmodule Elasticsearch.Config do
defp read_from_system({:system, env}, default), do: System.get_env(env) || default
defp read_from_system(value, _default), do: value
defp system_timestamp do
DateTime.to_unix(DateTime.utc_now)
end
end

View file

@ -24,14 +24,14 @@ defmodule Elasticsearch.DataStream do
## Example
iex> stream = DataStream.stream(MyApp.Schema)
iex> stream = DataStream.stream(MyApp.Schema, Elasticsearch.Test.DataLoader)
...> is_function(stream)
true
"""
@spec stream(source) :: Stream.t
def stream(source) do
Stream.resource(&init/0, &next(&1, source), &finish/1)
@spec stream(source, Elasticsearch.DataLoader.t) :: Stream.t
def stream(source, loader) do
Stream.resource(&init/0, &next(&1, source, loader), &finish/1)
end
# Store state in the following format:
@ -42,21 +42,21 @@ defmodule Elasticsearch.DataStream do
end
# If no items, load another page of items
defp next({[], offset, limit}, source) do
load_page(source, offset, limit)
defp next({[], offset, limit}, source, loader) do
load_page(source, loader, offset, limit)
end
# If there are items, return the next item, and set the new state equal to
# {tail, offset, limit}
defp next({[h | t], offset, limit}, _source) do
defp next({[h | t], offset, limit}, _source, _loader) do
{[h], {t, offset, limit}}
end
# Fetch a new page of items
defp load_page(source, offset, limit) do
defp load_page(source, loader, offset, limit) do
page_size = config()[:bulk_page_size]
case config()[:loader].load(source, offset, limit) do
case loader.load(source, offset, limit) do
# If the load returns no more items (i.e., we've iterated through them
# all) then halt the stream and leave offset and limit unchanged.
[] ->

View file

@ -5,7 +5,10 @@ defmodule Mix.Tasks.Elasticsearch.Build do
require Logger
alias Elasticsearch.Config
alias Elasticsearch.{
Builder,
Config
}
@doc false
def run(args) do
@ -30,32 +33,26 @@ defmodule Mix.Tasks.Elasticsearch.Build do
end
end
defp build(config, :rebuild) do
index_name = Config.build_index_name(config[:alias])
with :ok <- Elasticsearch.create_index_from_file(index_name, config[:schema]),
:ok <- Elasticsearch.Bulk.upload(index_name, config[:sources]),
:ok <- Elasticsearch.alias_index(index_name, config[:alias]),
:ok <- Elasticsearch.clean_indexes_starting_with(config[:alias], 2),
:ok <- Elasticsearch.refresh_index(index_name) do
:ok
defp build(%{alias: alias, schema: schema, loader: loader, sources: sources}, :rebuild) do
with :ok <- Builder.hot_swap_index(alias, schema, loader, sources) do
:ok
else
{:error, errors} when is_list(errors) ->
errors = for error <- errors, do: "#{inspect(error)}\n"
Mix.raise """
Index created, but not aliased: #{index_name}
Index created, but not aliased: #{alias}
The following errors occurred:
#{errors}
"""
{:error, :enoent} ->
Mix.raise """
Schema file not found at #{config[:schema]}.
Schema file not found at #{schema}.
"""
{:error, exception} ->
Mix.raise """
Index #{index_name} could not be created.
Index #{alias} could not be created.
#{inspect exception}
"""

View file

@ -0,0 +1,7 @@
defmodule Elasticsearch.BuilderTest do
use ExUnit.Case
alias Elasticsearch.Builder
doctest Elasticsearch.Builder
end