From 3806269d3ef6b34313b59bc1e071ca2ef0bf534a Mon Sep 17 00:00:00 2001 From: Marcel Otto Date: Fri, 6 Nov 2020 13:03:01 +0100 Subject: [PATCH] Add read_stream!/2 functions --- lib/rdf/serialization/decoder.ex | 37 ++++++++++++++++--- lib/rdf/serialization/format.ex | 15 +++++++- lib/rdf/serialization/reader.ex | 23 ++++++++---- lib/rdf/serialization/serialization.ex | 23 +++++++++++- lib/rdf/serializations/nquads_decoder.ex | 10 ++--- lib/rdf/serializations/ntriples_decoder.ex | 10 ++--- test/unit/nquads_decoder_test.exs | 20 ++++++++++ test/unit/ntriples_decoder_test.exs | 19 ++++++++++ .../unit/serialization/serialization_test.exs | 34 +++++++++++++++-- 9 files changed, 163 insertions(+), 28 deletions(-) diff --git a/lib/rdf/serialization/decoder.ex b/lib/rdf/serialization/decoder.ex index c98064c..659ba39 100644 --- a/lib/rdf/serialization/decoder.ex +++ b/lib/rdf/serialization/decoder.ex @@ -16,26 +16,40 @@ defmodule RDF.Serialization.Decoder do @doc """ Decodes a serialized `RDF.Graph` or `RDF.Dataset` from a string. - As opposed to `decode`, it raises an exception if an error occurs. + As opposed to `decode/2`, it raises an exception if an error occurs. Note: The `__using__` macro automatically provides an overridable default implementation based on the non-bang `decode` function. """ - @callback decode!(String.t(), keyword) :: RDF.Graph.t() | RDF.Dataset.t() + @callback decode!(String.t(), keyword) :: Graph.t() | Dataset.t() @doc """ Decodes a serialized `RDF.Graph` or `RDF.Dataset` from a stream. - """ - @callback decode_from_stream(Enumerable.t(), keyword) :: RDF.Graph.t() | RDF.Dataset.t() - @optional_callbacks decode_from_stream: 2 + It returns an `{:ok, data}` tuple, with `data` being the deserialized graph or + dataset, or `{:error, reason}` if an error occurs. + """ + @callback decode_from_stream(Enumerable.t(), keyword) :: + {:ok, Graph.t() | Dataset.t()} | {:error, any} + + @doc """ + Decodes a serialized `RDF.Graph` or `RDF.Dataset` from a stream. + + As opposed to `decode_from_stream/2`, it raises an exception if an error occurs. + + Note: The `__using__` macro automatically provides an overridable default + implementation based on the non-bang `decode` function. + """ + @callback decode_from_stream!(Enumerable.t(), keyword) :: Graph.t() | Dataset.t() + + @optional_callbacks decode_from_stream: 2, decode_from_stream!: 2 defmacro __using__(_) do quote bind_quoted: [], unquote: true do @behaviour unquote(__MODULE__) @impl unquote(__MODULE__) - @spec decode!(String.t(), keyword) :: RDF.Graph.t() | RDF.Dataset.t() + @spec decode!(String.t(), keyword) :: Graph.t() | Dataset.t() def decode!(content, opts \\ []) do case decode(content, opts) do {:ok, data} -> data @@ -56,6 +70,17 @@ defmodule RDF.Serialization.Decoder do |> Keyword.has_key?(:decode_from_stream) @doc false def stream_support?, do: @stream_support + + if @stream_support and + not (__MODULE__ |> Module.definitions_in() |> Keyword.has_key?(:decode_from_stream!)) do + @impl unquote(__MODULE__) + def decode_from_stream!(stream, opts \\ []) do + case decode_from_stream(stream, opts) do + {:ok, data} -> data + {:error, reason} -> raise reason + end + end + end end end end diff --git a/lib/rdf/serialization/format.ex b/lib/rdf/serialization/format.ex index 1d69b1a..5665303 100644 --- a/lib/rdf/serialization/format.ex +++ b/lib/rdf/serialization/format.ex @@ -106,11 +106,24 @@ defmodule RDF.Serialization.Format do @doc """ Deserializes a graph or dataset from a stream. + It returns an `{:ok, data}` tuple, with `data` being the deserialized graph or + dataset, or `{:error, reason}` if an error occurs. + #{@decoder_doc_ref} """ - @spec read_stream(Enumerable.t(), keyword) :: Graph.t() | Dataset.t() + @spec read_stream(Enumerable.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any} def read_stream(stream, opts \\ []), do: Reader.read_stream(decoder(), stream, opts) + @doc """ + Deserializes a graph or dataset from a stream. + + As opposed to `read_stream/2`, it raises an exception if an error occurs. + + #{@decoder_doc_ref} + """ + @spec read_stream!(Enumerable.t(), keyword) :: Graph.t() | Dataset.t() + def read_stream!(stream, opts \\ []), do: Reader.read_stream!(decoder(), stream, opts) + @doc """ Deserializes a graph or dataset from a file. diff --git a/lib/rdf/serialization/reader.ex b/lib/rdf/serialization/reader.ex index 4d2e680..e0fee00 100644 --- a/lib/rdf/serialization/reader.ex +++ b/lib/rdf/serialization/reader.ex @@ -21,7 +21,8 @@ defmodule RDF.Serialization.Reader do decoder.decode!(content, opts) end - @spec read_stream(module, Enumerable.t(), keyword) :: Graph.t() | Dataset.t() + @spec read_stream(module, Enumerable.t(), keyword) :: + {:ok, Graph.t() | Dataset.t()} | {:error, any} def read_stream(decoder, stream, opts \\ []) do if decoder.stream_support?() do decoder.decode_from_stream(stream, opts) @@ -30,10 +31,19 @@ defmodule RDF.Serialization.Reader do end end + @spec read_stream!(module, Enumerable.t(), keyword) :: Graph.t() | Dataset.t() + def read_stream!(decoder, stream, opts \\ []) do + if decoder.stream_support?() do + decoder.decode_from_stream!(stream, opts) + else + raise "#{inspect(decoder)} does not support streaming" + end + end + @spec read_file(module, Path.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any} def read_file(decoder, file, opts \\ []) do decoder - |> Serialization.use_file_streaming(opts) + |> Serialization.use_file_streaming!(opts) |> do_read_file(decoder, file, opts) end @@ -48,10 +58,9 @@ defmodule RDF.Serialization.Reader do end defp do_read_file(true, decoder, file, opts) do - {:ok, - file - |> File.stream!(file_mode(decoder, opts)) - |> decoder.decode_from_stream(opts)} + file + |> File.stream!(file_mode(decoder, opts)) + |> decoder.decode_from_stream(opts) rescue error in RuntimeError -> {:error, error.message} error -> {:error, error} @@ -77,7 +86,7 @@ defmodule RDF.Serialization.Reader do defp do_read_file!(_stream_mode, decoder, file, opts) do file |> File.stream!(file_mode(decoder, opts)) - |> decoder.decode_from_stream(opts) + |> decoder.decode_from_stream!(opts) end @doc false diff --git a/lib/rdf/serialization/serialization.ex b/lib/rdf/serialization/serialization.ex index 1b05d48..4c10c70 100644 --- a/lib/rdf/serialization/serialization.ex +++ b/lib/rdf/serialization/serialization.ex @@ -167,16 +167,37 @@ defmodule RDF.Serialization do @doc """ Deserializes a graph or dataset from a stream. + It returns an `{:ok, data}` tuple, with `data` being the deserialized graph or + dataset, or `{:error, reason}` if an error occurs. + The format must be specified with the `format` option and a format name or the `media_type` option and the media type of the format. Please refer to the documentation of the decoder of a RDF serialization format for format-specific options. """ - @spec read_stream(Enumerable.t(), keyword) :: Graph.t() | Dataset.t() + @spec read_stream(Enumerable.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any} def read_stream(stream, opts) do with {:ok, format} <- string_format(opts) do format.read_stream(stream, opts) + end + end + + @doc """ + Deserializes a graph or dataset from a stream. + + As opposed to `read_stream/2`, it raises an exception if an error occurs. + + The format must be specified with the `format` option and a format name or the + `media_type` option and the media type of the format. + + Please refer to the documentation of the decoder of a RDF serialization format + for format-specific options. + """ + @spec read_stream!(Enumerable.t(), keyword) :: Graph.t() | Dataset.t() + def read_stream!(stream, opts) do + with {:ok, format} <- string_format(opts) do + format.read_stream!(stream, opts) else {:error, error} -> raise error end diff --git a/lib/rdf/serializations/nquads_decoder.ex b/lib/rdf/serializations/nquads_decoder.ex index e935bba..7a45078 100644 --- a/lib/rdf/serializations/nquads_decoder.ex +++ b/lib/rdf/serializations/nquads_decoder.ex @@ -16,13 +16,13 @@ defmodule RDF.NQuads.Decoder do end @impl RDF.Serialization.Decoder - @spec decode_from_stream(Enumerable.t(), keyword) :: Dataset.t() + @spec decode_from_stream(Enumerable.t(), keyword) :: {:ok, Dataset.t()} | {:error, any} def decode_from_stream(stream, _opts \\ []) do - Enum.reduce(stream, Dataset.new(), fn line, dataset -> + Enum.reduce_while(stream, {:ok, Dataset.new()}, fn line, {:ok, dataset} -> case do_decode(line, false) do - {:ok, []} -> dataset - {:ok, [[quad]]} -> Dataset.add(dataset, quad) - {:error, error} -> raise error + {:ok, []} -> {:cont, {:ok, dataset}} + {:ok, [[quad]]} -> {:cont, {:ok, Dataset.add(dataset, quad)}} + {:error, _} = error -> {:halt, error} end end) end diff --git a/lib/rdf/serializations/ntriples_decoder.ex b/lib/rdf/serializations/ntriples_decoder.ex index fd63ef9..986a4e3 100644 --- a/lib/rdf/serializations/ntriples_decoder.ex +++ b/lib/rdf/serializations/ntriples_decoder.ex @@ -16,13 +16,13 @@ defmodule RDF.NTriples.Decoder do end @impl RDF.Serialization.Decoder - @spec decode_from_stream(Enumerable.t(), keyword) :: Graph.t() + @spec decode_from_stream(Enumerable.t(), keyword) :: {:ok, Graph.t()} | {:error, any} def decode_from_stream(stream, _opts \\ []) do - Enum.reduce(stream, Graph.new(), fn line, graph -> + Enum.reduce_while(stream, {:ok, Graph.new()}, fn line, {:ok, graph} -> case do_decode(line, false) do - {:ok, []} -> graph - {:ok, [[triple]]} -> Graph.add(graph, triple) - {:error, error} -> raise error + {:ok, []} -> {:cont, {:ok, graph}} + {:ok, [[triple]]} -> {:cont, {:ok, Graph.add(graph, triple)}} + {:error, _} = error -> {:halt, error} end end) end diff --git a/test/unit/nquads_decoder_test.exs b/test/unit/nquads_decoder_test.exs index 685886d..8c05dfa 100644 --- a/test/unit/nquads_decoder_test.exs +++ b/test/unit/nquads_decoder_test.exs @@ -188,6 +188,26 @@ defmodule RDF.NQuads.DecoderTest do """ |> string_to_stream() |> Decoder.decode_from_stream() == + {:ok, + Dataset.new([ + {EX.S1, EX.p1(), EX.O1, EX.G}, + {EX.S1, EX.p2(), EX.O2, EX.G}, + {EX.S2, EX.p3(), ~B"foo", EX.G}, + {EX.S2, EX.p3(), ~L"foo"en} + ])} + end + + test "decode_from_stream!/2" do + assert """ + . + . + _:foo . + + + "foo"@en . + """ + |> string_to_stream() + |> Decoder.decode_from_stream!() == Dataset.new([ {EX.S1, EX.p1(), EX.O1, EX.G}, {EX.S1, EX.p2(), EX.O2, EX.G}, diff --git a/test/unit/ntriples_decoder_test.exs b/test/unit/ntriples_decoder_test.exs index 4590960..6930c13 100644 --- a/test/unit/ntriples_decoder_test.exs +++ b/test/unit/ntriples_decoder_test.exs @@ -152,6 +152,25 @@ defmodule RDF.NTriples.DecoderTest do """ |> string_to_stream() |> Decoder.decode_from_stream() == + {:ok, + Graph.new([ + {EX.S1, EX.p1(), EX.O1}, + {EX.S1, EX.p2(), ~B"foo"}, + {EX.S2, EX.p3(), ~L"foo"en} + ])} + end + + test "decode_from_stream!/2" do + assert """ + . + + _:foo . + + + "foo"@en . + """ + |> string_to_stream() + |> Decoder.decode_from_stream!() == Graph.new([ {EX.S1, EX.p1(), EX.O1}, {EX.S1, EX.p2(), ~B"foo"}, diff --git a/test/unit/serialization/serialization_test.exs b/test/unit/serialization/serialization_test.exs index ce59947..e215bba 100644 --- a/test/unit/serialization/serialization_test.exs +++ b/test/unit/serialization/serialization_test.exs @@ -71,6 +71,34 @@ defmodule RDF.SerializationTest do assert @example_ntriples_string |> string_to_stream() |> Serialization.read_stream(format: :ntriples) == + {:ok, Graph.clear_metadata(@example_graph)} + end + + test "with wrong format name" do + assert {:error, "N-Triple scanner error" <> _} = + @example_turtle_string + |> string_to_stream() + |> Serialization.read_stream(format: :ntriples) + end + + test "with invalid format name" do + assert {:error, "unable to detect serialization format"} == + Serialization.read_stream(@example_ntriples_string, format: :foo) + end + + test "with media_type" do + assert @example_ntriples_string + |> string_to_stream() + |> Serialization.read_stream(media_type: "application/n-triples") == + {:ok, Graph.clear_metadata(@example_graph)} + end + end + + describe "read_stream!/2" do + test "with correct format name" do + assert @example_ntriples_string + |> string_to_stream() + |> Serialization.read_stream!(format: :ntriples) == Graph.clear_metadata(@example_graph) end @@ -78,20 +106,20 @@ defmodule RDF.SerializationTest do assert_raise RuntimeError, fn -> @example_ntriples_string |> string_to_stream() - |> Serialization.read_stream(format: :turtle) + |> Serialization.read_stream!(format: :turtle) end end test "with invalid format name" do assert_raise RuntimeError, "unable to detect serialization format", fn -> - Serialization.read_stream(@example_ntriples_string, format: :foo) + Serialization.read_stream!(@example_ntriples_string, format: :foo) end end test "with media_type" do assert @example_ntriples_string |> string_to_stream() - |> Serialization.read_stream(media_type: "application/n-triples") == + |> Serialization.read_stream!(media_type: "application/n-triples") == Graph.clear_metadata(@example_graph) end end