From d3f66bd5d9771dade16709eff8c852da62746167 Mon Sep 17 00:00:00 2001 From: Marcel Otto Date: Thu, 5 Nov 2020 13:36:08 +0100 Subject: [PATCH] Support for streaming on file read and write serialization functions --- CHANGELOG.md | 6 +- lib/rdf/serialization/reader.ex | 37 +++++++++- lib/rdf/serialization/serialization.ex | 33 +++++++++ lib/rdf/serialization/writer.ex | 38 +++++++++- .../unit/serialization/serialization_test.exs | 74 +++++++++++++++---- 5 files changed, 165 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35bedf3..580757e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ This project adheres to [Semantic Versioning](http://semver.org/) and ### Changed +- all `read_file/3` and `write_file/3` functions on `RDF.Serialization` and the + modules of RDF serialization formats can use streaming via the `:stream` flag + option; for `read_file/3` and `write_file/3` it defaults to `false`, while for + `read_file!/3` and `write_file!/3` it defaults to `true` when the respective + format supports streams - the Inspect form of the RDF data structures are now Turtle-based and respect the usual `:limit` behaviour - more compact Inspect form for `RDF.PrefixMap` @@ -27,7 +32,6 @@ This project adheres to [Semantic Versioning](http://semver.org/) and efficient in terms of performance and memory consumption than the previous ref-based blank nodes - ### Fixed - `RDF.BlankNode`s based on refs weren't serializable to Turtle diff --git a/lib/rdf/serialization/reader.ex b/lib/rdf/serialization/reader.ex index 79719fd..2433b8f 100644 --- a/lib/rdf/serialization/reader.ex +++ b/lib/rdf/serialization/reader.ex @@ -7,7 +7,7 @@ defmodule RDF.Serialization.Reader do implicitly use the proper `RDF.Serialization.Decoder` module. """ - alias RDF.{Dataset, Graph} + alias RDF.{Serialization, Dataset, Graph} @spec read_string(module, String.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any} def read_string(decoder, content, opts \\ []) do @@ -30,15 +30,44 @@ defmodule RDF.Serialization.Reader do @spec read_file(module, Path.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any} def read_file(decoder, file, opts \\ []) do + decoder + |> Serialization.use_file_streaming(opts) + |> do_read_file(decoder, file, opts) + end + + defp do_read_file(false, decoder, file, opts) do case File.read(file) do - {:ok, content} -> read_string(decoder, content, opts) + {:ok, content} -> decoder.decode(content, opts) {:error, reason} -> {:error, reason} end end + defp do_read_file(true, decoder, file, opts) do + {:ok, + file + |> File.stream!() + |> decoder.decode_from_stream(opts)} + rescue + error in RuntimeError -> {:error, error.message} + error -> {:error, error} + end + @spec read_file!(module, Path.t(), keyword) :: Graph.t() | Dataset.t() def read_file!(decoder, file, opts \\ []) do - content = File.read!(file) - read_string!(decoder, content, opts) + decoder + |> Serialization.use_file_streaming!(opts) + |> do_read_file!(decoder, file, opts) + end + + defp do_read_file!(false, decoder, file, opts) do + file + |> File.read!() + |> decoder.decode!(opts) + end + + defp do_read_file!(true, decoder, file, opts) do + file + |> File.stream!() + |> decoder.decode_from_stream(opts) end end diff --git a/lib/rdf/serialization/serialization.ex b/lib/rdf/serialization/serialization.ex index ff22f97..877d4ea 100644 --- a/lib/rdf/serialization/serialization.ex +++ b/lib/rdf/serialization/serialization.ex @@ -350,4 +350,37 @@ defmodule RDF.Serialization do {:error, "unable to detect serialization format"} end end + + @doc false + def use_file_streaming(mod, opts) do + case Keyword.get(opts, :stream) do + true -> + if mod.stream_support?() do + true + else + raise "#{inspect(mod)} does not support streams" + end + + _ -> + false + end + end + + @doc false + def use_file_streaming!(mod, opts) do + case Keyword.get(opts, :stream) do + nil -> + mod.stream_support?() + + true -> + if mod.stream_support?() do + true + else + raise "#{inspect(mod)} does not support streams" + end + + false -> + false + end + end end diff --git a/lib/rdf/serialization/writer.ex b/lib/rdf/serialization/writer.ex index b0164ca..a0a3f9f 100644 --- a/lib/rdf/serialization/writer.ex +++ b/lib/rdf/serialization/writer.ex @@ -7,6 +7,8 @@ defmodule RDF.Serialization.Writer do implicitly use the proper `RDF.Serialization.Encoder` module. """ + alias RDF.Serialization + @default_file_mode ~w[write exclusive]a @spec write_string(module, RDF.Data.t(), keyword) :: {:ok, String.t()} | {:error, any} @@ -30,16 +32,48 @@ defmodule RDF.Serialization.Writer do @spec write_file(module, RDF.Data.t(), Path.t(), keyword) :: :ok | {:error, any} def write_file(encoder, data, path, opts \\ []) do - with {:ok, encoded_string} <- write_string(encoder, data, opts) do + encoder + |> Serialization.use_file_streaming(opts) + |> do_write_file(encoder, data, path, opts) + + :ok + rescue + error in RuntimeError -> {:error, error.message} + error -> {:error, error} + end + + defp do_write_file(false, encoder, data, path, opts) do + with {:ok, encoded_string} <- encoder.encode(data, opts) do File.write(path, encoded_string, file_mode(encoder, opts)) end end + defp do_write_file(true, encoder, data, path, opts) do + data + |> encoder.stream(opts) + |> Enum.into(File.stream!(path, file_mode(encoder, opts))) + end + @spec write_file!(module, RDF.Data.t(), Path.t(), keyword) :: :ok def write_file!(encoder, data, path, opts \\ []) do - encoded_string = write_string!(encoder, data, opts) + encoder + |> Serialization.use_file_streaming!(opts) + |> do_write_file!(encoder, data, path, opts) + end + + defp do_write_file!(false, encoder, data, path, opts) do + encoded_string = encoder.encode!(data, opts) File.write!(path, encoded_string, file_mode(encoder, opts)) end + defp do_write_file!(true, encoder, data, path, opts) do + data + |> encoder.stream(opts) + |> Enum.into(File.stream!(path, file_mode(encoder, opts))) + + :ok + end + + end defp file_mode(_encoder, opts) do file_mode = Keyword.get(opts, :file_mode, @default_file_mode) diff --git a/test/unit/serialization/serialization_test.exs b/test/unit/serialization/serialization_test.exs index 06c4373..de7a394 100644 --- a/test/unit/serialization/serialization_test.exs +++ b/test/unit/serialization/serialization_test.exs @@ -3,7 +3,7 @@ defmodule RDF.SerializationTest do doctest RDF.Serialization - alias RDF.Serialization + alias RDF.{Serialization, NTriples, Turtle} @example_graph Graph.new([{EX.S, RDF.type(), EX.O}], prefixes: %{"" => EX}) @example_ntriples_file "test/data/serialization_test_graph.nt" @@ -211,19 +211,17 @@ defmodule RDF.SerializationTest do end describe "write_file/2" do - test "without arguments, i.e. via file extension" do - file = file("write_file_test.ttl") + test "without :format option, i.e. via file extension and with streaming" do + file = file("write_file_test.nt") if File.exists?(file), do: File.rm(file) - assert Serialization.write_file(@example_graph, file, prefixes: %{"" => EX.__base_iri__()}) == - :ok - + assert Serialization.write_file(@example_graph, file, stream: true) == :ok assert File.exists?(file) - assert File.read!(file) == @example_turtle_string + assert File.read!(file) == @example_ntriples_string File.rm(file) end - test "with format name" do + test "with format name and without streaming" do file = file("write_file_test.nt") if File.exists?(file), do: File.rm(file) @@ -239,7 +237,7 @@ defmodule RDF.SerializationTest do end describe "write_file!/2" do - test "without arguments, i.e. via file extension" do + test "without :format option, i.e. via file extension and without streaming" do file = file("write_file_test.ttl") if File.exists?(file), do: File.rm(file) @@ -251,18 +249,62 @@ defmodule RDF.SerializationTest do File.rm(file) end - test "with format name" do + test "with format name and with streaming" do file = file("write_file_test.nt") if File.exists?(file), do: File.rm(file) - assert Serialization.write_file!(@example_graph, file, - format: :turtle, - prefixes: %{"" => EX.__base_iri__()} - ) == :ok - + assert Serialization.write_file!(@example_graph, file, format: :ntriples) == :ok assert File.exists?(file) - assert File.read!(file) == @example_turtle_string + assert File.read!(file) == @example_ntriples_string File.rm(file) end end + + describe "use_file_streaming/2" do + test "without opts" do + refute Serialization.use_file_streaming(NTriples.Decoder, []) + refute Serialization.use_file_streaming(NTriples.Encoder, []) + refute Serialization.use_file_streaming(Turtle.Decoder, []) + refute Serialization.use_file_streaming(Turtle.Encoder, []) + end + + test "when stream: true and format does support streams" do + assert Serialization.use_file_streaming(NTriples.Decoder, stream: true) + assert Serialization.use_file_streaming(NTriples.Encoder, stream: true) + end + + test "when stream: true and format does not support streams" do + assert_raise RuntimeError, "RDF.Turtle.Decoder does not support streams", fn -> + Serialization.use_file_streaming(Turtle.Decoder, stream: true) + end + + assert_raise RuntimeError, "RDF.Turtle.Encoder does not support streams", fn -> + Serialization.use_file_streaming(Turtle.Encoder, stream: true) + end + end + end + + describe "use_file_streaming!/2" do + test "without opts" do + assert Serialization.use_file_streaming!(NTriples.Decoder, []) + assert Serialization.use_file_streaming!(NTriples.Encoder, []) + refute Serialization.use_file_streaming!(Turtle.Decoder, []) + refute Serialization.use_file_streaming!(Turtle.Encoder, []) + end + + test "when stream: true and format does support streams" do + assert Serialization.use_file_streaming!(NTriples.Decoder, stream: true) + assert Serialization.use_file_streaming!(NTriples.Encoder, stream: true) + end + + test "when stream: true and format does not support streams" do + assert_raise RuntimeError, "RDF.Turtle.Decoder does not support streams", fn -> + Serialization.use_file_streaming!(Turtle.Decoder, stream: true) + end + + assert_raise RuntimeError, "RDF.Turtle.Encoder does not support streams", fn -> + Serialization.use_file_streaming!(Turtle.Encoder, stream: true) + end + end + end end