Support for streaming on file read and write serialization functions

This commit is contained in:
Marcel Otto 2020-11-05 13:36:08 +01:00
parent 2bff651276
commit d3f66bd5d9
5 changed files with 165 additions and 23 deletions

View file

@ -17,6 +17,11 @@ This project adheres to [Semantic Versioning](http://semver.org/) and
### Changed
- all `read_file/3` and `write_file/3` functions on `RDF.Serialization` and the
modules of RDF serialization formats can use streaming via the `:stream` flag
option; for `read_file/3` and `write_file/3` it defaults to `false`, while for
`read_file!/3` and `write_file!/3` it defaults to `true` when the respective
format supports streams
- the Inspect form of the RDF data structures are now Turtle-based and respect
the usual `:limit` behaviour
- more compact Inspect form for `RDF.PrefixMap`
@ -27,7 +32,6 @@ This project adheres to [Semantic Versioning](http://semver.org/) and
efficient in terms of performance and memory consumption than the previous
ref-based blank nodes
### Fixed
- `RDF.BlankNode`s based on refs weren't serializable to Turtle

View file

@ -7,7 +7,7 @@ defmodule RDF.Serialization.Reader do
implicitly use the proper `RDF.Serialization.Decoder` module.
"""
alias RDF.{Dataset, Graph}
alias RDF.{Serialization, Dataset, Graph}
@spec read_string(module, String.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any}
def read_string(decoder, content, opts \\ []) do
@ -30,15 +30,44 @@ defmodule RDF.Serialization.Reader do
@spec read_file(module, Path.t(), keyword) :: {:ok, Graph.t() | Dataset.t()} | {:error, any}
def read_file(decoder, file, opts \\ []) do
decoder
|> Serialization.use_file_streaming(opts)
|> do_read_file(decoder, file, opts)
end
defp do_read_file(false, decoder, file, opts) do
case File.read(file) do
{:ok, content} -> read_string(decoder, content, opts)
{:ok, content} -> decoder.decode(content, opts)
{:error, reason} -> {:error, reason}
end
end
defp do_read_file(true, decoder, file, opts) do
{:ok,
file
|> File.stream!()
|> decoder.decode_from_stream(opts)}
rescue
error in RuntimeError -> {:error, error.message}
error -> {:error, error}
end
@spec read_file!(module, Path.t(), keyword) :: Graph.t() | Dataset.t()
def read_file!(decoder, file, opts \\ []) do
content = File.read!(file)
read_string!(decoder, content, opts)
decoder
|> Serialization.use_file_streaming!(opts)
|> do_read_file!(decoder, file, opts)
end
defp do_read_file!(false, decoder, file, opts) do
file
|> File.read!()
|> decoder.decode!(opts)
end
defp do_read_file!(true, decoder, file, opts) do
file
|> File.stream!()
|> decoder.decode_from_stream(opts)
end
end

View file

@ -350,4 +350,37 @@ defmodule RDF.Serialization do
{:error, "unable to detect serialization format"}
end
end
@doc false
def use_file_streaming(mod, opts) do
case Keyword.get(opts, :stream) do
true ->
if mod.stream_support?() do
true
else
raise "#{inspect(mod)} does not support streams"
end
_ ->
false
end
end
@doc false
def use_file_streaming!(mod, opts) do
case Keyword.get(opts, :stream) do
nil ->
mod.stream_support?()
true ->
if mod.stream_support?() do
true
else
raise "#{inspect(mod)} does not support streams"
end
false ->
false
end
end
end

View file

@ -7,6 +7,8 @@ defmodule RDF.Serialization.Writer do
implicitly use the proper `RDF.Serialization.Encoder` module.
"""
alias RDF.Serialization
@default_file_mode ~w[write exclusive]a
@spec write_string(module, RDF.Data.t(), keyword) :: {:ok, String.t()} | {:error, any}
@ -30,16 +32,48 @@ defmodule RDF.Serialization.Writer do
@spec write_file(module, RDF.Data.t(), Path.t(), keyword) :: :ok | {:error, any}
def write_file(encoder, data, path, opts \\ []) do
with {:ok, encoded_string} <- write_string(encoder, data, opts) do
encoder
|> Serialization.use_file_streaming(opts)
|> do_write_file(encoder, data, path, opts)
:ok
rescue
error in RuntimeError -> {:error, error.message}
error -> {:error, error}
end
defp do_write_file(false, encoder, data, path, opts) do
with {:ok, encoded_string} <- encoder.encode(data, opts) do
File.write(path, encoded_string, file_mode(encoder, opts))
end
end
defp do_write_file(true, encoder, data, path, opts) do
data
|> encoder.stream(opts)
|> Enum.into(File.stream!(path, file_mode(encoder, opts)))
end
@spec write_file!(module, RDF.Data.t(), Path.t(), keyword) :: :ok
def write_file!(encoder, data, path, opts \\ []) do
encoded_string = write_string!(encoder, data, opts)
encoder
|> Serialization.use_file_streaming!(opts)
|> do_write_file!(encoder, data, path, opts)
end
defp do_write_file!(false, encoder, data, path, opts) do
encoded_string = encoder.encode!(data, opts)
File.write!(path, encoded_string, file_mode(encoder, opts))
end
defp do_write_file!(true, encoder, data, path, opts) do
data
|> encoder.stream(opts)
|> Enum.into(File.stream!(path, file_mode(encoder, opts)))
:ok
end
end
defp file_mode(_encoder, opts) do
file_mode = Keyword.get(opts, :file_mode, @default_file_mode)

View file

@ -3,7 +3,7 @@ defmodule RDF.SerializationTest do
doctest RDF.Serialization
alias RDF.Serialization
alias RDF.{Serialization, NTriples, Turtle}
@example_graph Graph.new([{EX.S, RDF.type(), EX.O}], prefixes: %{"" => EX})
@example_ntriples_file "test/data/serialization_test_graph.nt"
@ -211,19 +211,17 @@ defmodule RDF.SerializationTest do
end
describe "write_file/2" do
test "without arguments, i.e. via file extension" do
file = file("write_file_test.ttl")
test "without :format option, i.e. via file extension and with streaming" do
file = file("write_file_test.nt")
if File.exists?(file), do: File.rm(file)
assert Serialization.write_file(@example_graph, file, prefixes: %{"" => EX.__base_iri__()}) ==
:ok
assert Serialization.write_file(@example_graph, file, stream: true) == :ok
assert File.exists?(file)
assert File.read!(file) == @example_turtle_string
assert File.read!(file) == @example_ntriples_string
File.rm(file)
end
test "with format name" do
test "with format name and without streaming" do
file = file("write_file_test.nt")
if File.exists?(file), do: File.rm(file)
@ -239,7 +237,7 @@ defmodule RDF.SerializationTest do
end
describe "write_file!/2" do
test "without arguments, i.e. via file extension" do
test "without :format option, i.e. via file extension and without streaming" do
file = file("write_file_test.ttl")
if File.exists?(file), do: File.rm(file)
@ -251,18 +249,62 @@ defmodule RDF.SerializationTest do
File.rm(file)
end
test "with format name" do
test "with format name and with streaming" do
file = file("write_file_test.nt")
if File.exists?(file), do: File.rm(file)
assert Serialization.write_file!(@example_graph, file,
format: :turtle,
prefixes: %{"" => EX.__base_iri__()}
) == :ok
assert Serialization.write_file!(@example_graph, file, format: :ntriples) == :ok
assert File.exists?(file)
assert File.read!(file) == @example_turtle_string
assert File.read!(file) == @example_ntriples_string
File.rm(file)
end
end
describe "use_file_streaming/2" do
test "without opts" do
refute Serialization.use_file_streaming(NTriples.Decoder, [])
refute Serialization.use_file_streaming(NTriples.Encoder, [])
refute Serialization.use_file_streaming(Turtle.Decoder, [])
refute Serialization.use_file_streaming(Turtle.Encoder, [])
end
test "when stream: true and format does support streams" do
assert Serialization.use_file_streaming(NTriples.Decoder, stream: true)
assert Serialization.use_file_streaming(NTriples.Encoder, stream: true)
end
test "when stream: true and format does not support streams" do
assert_raise RuntimeError, "RDF.Turtle.Decoder does not support streams", fn ->
Serialization.use_file_streaming(Turtle.Decoder, stream: true)
end
assert_raise RuntimeError, "RDF.Turtle.Encoder does not support streams", fn ->
Serialization.use_file_streaming(Turtle.Encoder, stream: true)
end
end
end
describe "use_file_streaming!/2" do
test "without opts" do
assert Serialization.use_file_streaming!(NTriples.Decoder, [])
assert Serialization.use_file_streaming!(NTriples.Encoder, [])
refute Serialization.use_file_streaming!(Turtle.Decoder, [])
refute Serialization.use_file_streaming!(Turtle.Encoder, [])
end
test "when stream: true and format does support streams" do
assert Serialization.use_file_streaming!(NTriples.Decoder, stream: true)
assert Serialization.use_file_streaming!(NTriples.Encoder, stream: true)
end
test "when stream: true and format does not support streams" do
assert_raise RuntimeError, "RDF.Turtle.Decoder does not support streams", fn ->
Serialization.use_file_streaming!(Turtle.Decoder, stream: true)
end
assert_raise RuntimeError, "RDF.Turtle.Encoder does not support streams", fn ->
Serialization.use_file_streaming!(Turtle.Encoder, stream: true)
end
end
end
end