Add read_stream!/2 functions

This commit is contained in:
Marcel Otto 2020-11-06 13:03:01 +01:00
parent 8b8c3feef7
commit 3806269d3e
9 changed files with 163 additions and 28 deletions

View file

@ -16,26 +16,40 @@ defmodule RDF.Serialization.Decoder do
@doc """ @doc """
Decodes a serialized `RDF.Graph` or `RDF.Dataset` from a string. Decodes a serialized `RDF.Graph` or `RDF.Dataset` from a string.
As opposed to `decode`, it raises an exception if an error occurs. As opposed to `decode/2`, it raises an exception if an error occurs.
Note: The `__using__` macro automatically provides an overridable default Note: The `__using__` macro automatically provides an overridable default
implementation based on the non-bang `decode` function. implementation based on the non-bang `decode` function.
""" """
@callback decode!(String.t(), keyword) :: RDF.Graph.t() | RDF.Dataset.t() @callback decode!(String.t(), keyword) :: Graph.t() | Dataset.t()
@doc """ @doc """
Decodes a serialized `RDF.Graph` or `RDF.Dataset` from a stream. Decodes a serialized `RDF.Graph` or `RDF.Dataset` from a stream.
"""
@callback decode_from_stream(Enumerable.t(), keyword) :: RDF.Graph.t() | RDF.Dataset.t()
@optional_callbacks decode_from_stream: 2 It returns an `{:ok, data}` tuple, with `data` being the deserialized graph or
dataset, or `{:error, reason}` if an error occurs.
"""
@callback decode_from_stream(Enumerable.t(), keyword) ::
{:ok, Graph.t() | Dataset.t()} | {:error, any}
@doc """
Decodes a serialized `RDF.Graph` or `RDF.Dataset` from a stream.
As opposed to `decode_from_stream/2`, it raises an exception if an error occurs.
Note: The `__using__` macro automatically provides a default implementation
based on the non-bang `decode_from_stream/2` function, when the using module
supports streaming and does not define `decode_from_stream!/2` itself.
"""
@callback decode_from_stream!(Enumerable.t(), keyword) :: Graph.t() | Dataset.t()
@optional_callbacks decode_from_stream: 2, decode_from_stream!: 2
defmacro __using__(_) do defmacro __using__(_) do
quote bind_quoted: [], unquote: true do quote bind_quoted: [], unquote: true do
@behaviour unquote(__MODULE__) @behaviour unquote(__MODULE__)
@impl unquote(__MODULE__) @impl unquote(__MODULE__)
@spec decode!(String.t(), keyword) :: RDF.Graph.t() | RDF.Dataset.t() @spec decode!(String.t(), keyword) :: Graph.t() | Dataset.t()
def decode!(content, opts \\ []) do def decode!(content, opts \\ []) do
case decode(content, opts) do case decode(content, opts) do
{:ok, data} -> data {:ok, data} -> data
@ -56,6 +70,17 @@ defmodule RDF.Serialization.Decoder do
|> Keyword.has_key?(:decode_from_stream) |> Keyword.has_key?(:decode_from_stream)
@doc false @doc false
def stream_support?, do: @stream_support def stream_support?, do: @stream_support
# Generate a default bang variant only when stream support was detected and
# the module being compiled has not already defined `decode_from_stream!`.
if @stream_support and
not (__MODULE__ |> Module.definitions_in() |> Keyword.has_key?(:decode_from_stream!)) do
@impl unquote(__MODULE__)
def decode_from_stream!(stream, opts \\ []) do
# Delegate to the non-bang variant and unwrap its tagged result.
case decode_from_stream(stream, opts) do
{:ok, data} -> data
# NOTE(review): `raise/1` expects a message string or an exception;
# assumes `reason` is one of those - confirm against the decoders.
{:error, reason} -> raise reason
end
end
end
end end
end end
end end

View file

@ -106,11 +106,24 @@ defmodule RDF.Serialization.Format do
@doc """ @doc """
Deserializes a graph or dataset from a stream. Deserializes a graph or dataset from a stream.
It returns an `{:ok, data}` tuple, with `data` being the deserialized graph or
dataset, or `{:error, reason}` if an error occurs.
#{@decoder_doc_ref} #{@decoder_doc_ref}
""" """
@spec read_stream(Enumerable.t(), keyword) :: Graph.t() | Dataset.t() @spec read_stream(Enumerable.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any}
def read_stream(stream, opts \\ []), do: Reader.read_stream(decoder(), stream, opts) def read_stream(stream, opts \\ []), do: Reader.read_stream(decoder(), stream, opts)
@doc """
Reads a graph or dataset from a stream, raising on failure.
This is the bang counterpart of `read_stream/2`: the deserialized data is
returned directly, and an exception is raised if an error occurs.
#{@decoder_doc_ref}
"""
@spec read_stream!(Enumerable.t(), keyword) :: Graph.t() | Dataset.t()
def read_stream!(stream, opts \\ []) do
  format_decoder = decoder()
  Reader.read_stream!(format_decoder, stream, opts)
end
@doc """ @doc """
Deserializes a graph or dataset from a file. Deserializes a graph or dataset from a file.

View file

@ -21,7 +21,8 @@ defmodule RDF.Serialization.Reader do
decoder.decode!(content, opts) decoder.decode!(content, opts)
end end
@spec read_stream(module, Enumerable.t(), keyword) :: Graph.t() | Dataset.t() @spec read_stream(module, Enumerable.t(), keyword) ::
{:ok, Graph.t() | Dataset.t()} | {:error, any}
def read_stream(decoder, stream, opts \\ []) do def read_stream(decoder, stream, opts \\ []) do
if decoder.stream_support?() do if decoder.stream_support?() do
decoder.decode_from_stream(stream, opts) decoder.decode_from_stream(stream, opts)
@ -30,10 +31,19 @@ defmodule RDF.Serialization.Reader do
end end
end end
# Bang variant of stream reading: hands `stream` and `opts` to the decoder's
# `decode_from_stream!/2` and returns its result as-is. Raises a RuntimeError
# when the decoder reports that it has no stream support.
@spec read_stream!(module, Enumerable.t(), keyword) :: Graph.t() | Dataset.t()
def read_stream!(decoder, stream, opts \\ []) do
  # Guard clause: reject decoders without stream support up front.
  if !decoder.stream_support?() do
    raise "#{inspect(decoder)} does not support streaming"
  end

  decoder.decode_from_stream!(stream, opts)
end
@spec read_file(module, Path.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any} @spec read_file(module, Path.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any}
def read_file(decoder, file, opts \\ []) do def read_file(decoder, file, opts \\ []) do
decoder decoder
|> Serialization.use_file_streaming(opts) |> Serialization.use_file_streaming!(opts)
|> do_read_file(decoder, file, opts) |> do_read_file(decoder, file, opts)
end end
@ -48,10 +58,9 @@ defmodule RDF.Serialization.Reader do
end end
defp do_read_file(true, decoder, file, opts) do defp do_read_file(true, decoder, file, opts) do
{:ok, file
file |> File.stream!(file_mode(decoder, opts))
|> File.stream!(file_mode(decoder, opts)) |> decoder.decode_from_stream(opts)
|> decoder.decode_from_stream(opts)}
rescue rescue
error in RuntimeError -> {:error, error.message} error in RuntimeError -> {:error, error.message}
error -> {:error, error} error -> {:error, error}
@ -77,7 +86,7 @@ defmodule RDF.Serialization.Reader do
defp do_read_file!(_stream_mode, decoder, file, opts) do defp do_read_file!(_stream_mode, decoder, file, opts) do
file file
|> File.stream!(file_mode(decoder, opts)) |> File.stream!(file_mode(decoder, opts))
|> decoder.decode_from_stream(opts) |> decoder.decode_from_stream!(opts)
end end
@doc false @doc false

View file

@ -167,16 +167,37 @@ defmodule RDF.Serialization do
@doc """ @doc """
Deserializes a graph or dataset from a stream. Deserializes a graph or dataset from a stream.
It returns an `{:ok, data}` tuple, with `data` being the deserialized graph or
dataset, or `{:error, reason}` if an error occurs.
The format must be specified with the `format` option and a format name or the The format must be specified with the `format` option and a format name or the
`media_type` option and the media type of the format. `media_type` option and the media type of the format.
Please refer to the documentation of the decoder of a RDF serialization format Please refer to the documentation of the decoder of a RDF serialization format
for format-specific options. for format-specific options.
""" """
@spec read_stream(Enumerable.t(), keyword) :: Graph.t() | Dataset.t() @spec read_stream(Enumerable.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any}
def read_stream(stream, opts) do def read_stream(stream, opts) do
with {:ok, format} <- string_format(opts) do with {:ok, format} <- string_format(opts) do
format.read_stream(stream, opts) format.read_stream(stream, opts)
end
end
@doc """
Deserializes a graph or dataset from a stream.
As opposed to `read_stream/2`, it raises an exception if an error occurs.
The format must be specified with the `format` option and a format name or the
`media_type` option and the media type of the format.
Please refer to the documentation of the decoder of a RDF serialization format
for format-specific options.
"""
@spec read_stream!(Enumerable.t(), keyword) :: Graph.t() | Dataset.t()
def read_stream!(stream, opts) do
with {:ok, format} <- string_format(opts) do
format.read_stream!(stream, opts)
else else
{:error, error} -> raise error {:error, error} -> raise error
end end

View file

@ -16,13 +16,13 @@ defmodule RDF.NQuads.Decoder do
end end
@impl RDF.Serialization.Decoder @impl RDF.Serialization.Decoder
@spec decode_from_stream(Enumerable.t(), keyword) :: Dataset.t() @spec decode_from_stream(Enumerable.t(), keyword) :: {:ok, Dataset.t()} | {:error, any}
def decode_from_stream(stream, _opts \\ []) do def decode_from_stream(stream, _opts \\ []) do
Enum.reduce(stream, Dataset.new(), fn line, dataset -> Enum.reduce_while(stream, {:ok, Dataset.new()}, fn line, {:ok, dataset} ->
case do_decode(line, false) do case do_decode(line, false) do
{:ok, []} -> dataset {:ok, []} -> {:cont, {:ok, dataset}}
{:ok, [[quad]]} -> Dataset.add(dataset, quad) {:ok, [[quad]]} -> {:cont, {:ok, Dataset.add(dataset, quad)}}
{:error, error} -> raise error {:error, _} = error -> {:halt, error}
end end
end) end)
end end

View file

@ -16,13 +16,13 @@ defmodule RDF.NTriples.Decoder do
end end
@impl RDF.Serialization.Decoder @impl RDF.Serialization.Decoder
@spec decode_from_stream(Enumerable.t(), keyword) :: Graph.t() @spec decode_from_stream(Enumerable.t(), keyword) :: {:ok, Graph.t()} | {:error, any}
def decode_from_stream(stream, _opts \\ []) do def decode_from_stream(stream, _opts \\ []) do
Enum.reduce(stream, Graph.new(), fn line, graph -> Enum.reduce_while(stream, {:ok, Graph.new()}, fn line, {:ok, graph} ->
case do_decode(line, false) do case do_decode(line, false) do
{:ok, []} -> graph {:ok, []} -> {:cont, {:ok, graph}}
{:ok, [[triple]]} -> Graph.add(graph, triple) {:ok, [[triple]]} -> {:cont, {:ok, Graph.add(graph, triple)}}
{:error, error} -> raise error {:error, _} = error -> {:halt, error}
end end
end) end)
end end

View file

@ -188,6 +188,26 @@ defmodule RDF.NQuads.DecoderTest do
""" """
|> string_to_stream() |> string_to_stream()
|> Decoder.decode_from_stream() == |> Decoder.decode_from_stream() ==
{:ok,
Dataset.new([
{EX.S1, EX.p1(), EX.O1, EX.G},
{EX.S1, EX.p2(), EX.O2, EX.G},
{EX.S2, EX.p3(), ~B"foo", EX.G},
{EX.S2, EX.p3(), ~L"foo"en}
])}
end
test "decode_from_stream!/2" do
assert """
<http://example.org/#S1> <http://example.org/#p1> <http://example.org/#O1> <http://example.org/#G> .
<http://example.org/#S1> <http://example.org/#p2> <http://example.org/#O2> <http://example.org/#G> .
<http://example.org/#S2> <http://example.org/#p3> _:foo <http://example.org/#G> .
<http://example.org/#S2> <http://example.org/#p3> "foo"@en .
"""
|> string_to_stream()
|> Decoder.decode_from_stream!() ==
Dataset.new([ Dataset.new([
{EX.S1, EX.p1(), EX.O1, EX.G}, {EX.S1, EX.p1(), EX.O1, EX.G},
{EX.S1, EX.p2(), EX.O2, EX.G}, {EX.S1, EX.p2(), EX.O2, EX.G},

View file

@ -152,6 +152,25 @@ defmodule RDF.NTriples.DecoderTest do
""" """
|> string_to_stream() |> string_to_stream()
|> Decoder.decode_from_stream() == |> Decoder.decode_from_stream() ==
{:ok,
Graph.new([
{EX.S1, EX.p1(), EX.O1},
{EX.S1, EX.p2(), ~B"foo"},
{EX.S2, EX.p3(), ~L"foo"en}
])}
end
test "decode_from_stream!/2" do
assert """
<http://example.org/#S1> <http://example.org/#p1> <http://example.org/#O1> .
<http://example.org/#S1> <http://example.org/#p2> _:foo .
<http://example.org/#S2> <http://example.org/#p3> "foo"@en .
"""
|> string_to_stream()
|> Decoder.decode_from_stream!() ==
Graph.new([ Graph.new([
{EX.S1, EX.p1(), EX.O1}, {EX.S1, EX.p1(), EX.O1},
{EX.S1, EX.p2(), ~B"foo"}, {EX.S1, EX.p2(), ~B"foo"},

View file

@ -71,6 +71,34 @@ defmodule RDF.SerializationTest do
assert @example_ntriples_string assert @example_ntriples_string
|> string_to_stream() |> string_to_stream()
|> Serialization.read_stream(format: :ntriples) == |> Serialization.read_stream(format: :ntriples) ==
{:ok, Graph.clear_metadata(@example_graph)}
end
test "with wrong format name" do
assert {:error, "N-Triple scanner error" <> _} =
@example_turtle_string
|> string_to_stream()
|> Serialization.read_stream(format: :ntriples)
end
test "with invalid format name" do
assert {:error, "unable to detect serialization format"} ==
Serialization.read_stream(@example_ntriples_string, format: :foo)
end
test "with media_type" do
assert @example_ntriples_string
|> string_to_stream()
|> Serialization.read_stream(media_type: "application/n-triples") ==
{:ok, Graph.clear_metadata(@example_graph)}
end
end
describe "read_stream!/2" do
test "with correct format name" do
assert @example_ntriples_string
|> string_to_stream()
|> Serialization.read_stream!(format: :ntriples) ==
Graph.clear_metadata(@example_graph) Graph.clear_metadata(@example_graph)
end end
@ -78,20 +106,20 @@ defmodule RDF.SerializationTest do
assert_raise RuntimeError, fn -> assert_raise RuntimeError, fn ->
@example_ntriples_string @example_ntriples_string
|> string_to_stream() |> string_to_stream()
|> Serialization.read_stream(format: :turtle) |> Serialization.read_stream!(format: :turtle)
end end
end end
test "with invalid format name" do test "with invalid format name" do
assert_raise RuntimeError, "unable to detect serialization format", fn -> assert_raise RuntimeError, "unable to detect serialization format", fn ->
Serialization.read_stream(@example_ntriples_string, format: :foo) Serialization.read_stream!(@example_ntriples_string, format: :foo)
end end
end end
test "with media_type" do test "with media_type" do
assert @example_ntriples_string assert @example_ntriples_string
|> string_to_stream() |> string_to_stream()
|> Serialization.read_stream(media_type: "application/n-triples") == |> Serialization.read_stream!(media_type: "application/n-triples") ==
Graph.clear_metadata(@example_graph) Graph.clear_metadata(@example_graph)
end end
end end