Files
kopia/internal/jsonstream/reader.go
2016-08-21 00:21:40 -07:00

91 lines
2.0 KiB
Go

package jsonstream
import (
"encoding/json"
"fmt"
"io"
)
// Reader reads a stream of JSON objects.
type Reader struct {
decoder *json.Decoder
}
// Read reads the next JSON objects from the stream, returns io.EOF on the end of stream.
func (r *Reader) Read(v interface{}) error {
if r.decoder.More() {
return r.decoder.Decode(v)
}
if err := ensureDelimiter(r.decoder, json.Delim(']')); err != nil {
return invalidStreamFormatError(err)
}
if err := ensureDelimiter(r.decoder, json.Delim('}')); err != nil {
return invalidStreamFormatError(err)
}
return io.EOF
}
func ensureDelimiter(d *json.Decoder, expected json.Delim) error {
t, err := d.Token()
if err != nil {
return err
}
if t != expected {
return fmt.Errorf("expected '%v', got %v", expected.String(), t)
}
return nil
}
func ensureStringToken(d *json.Decoder, expected string) error {
t, err := d.Token()
if err != nil {
return err
}
if s, ok := t.(string); ok {
if s == expected {
return nil
}
}
return fmt.Errorf("expected '%v', got '%v'", expected, t)
}
func invalidStreamFormatError(cause error) error {
return fmt.Errorf("invalid stream format: %v", cause)
}
// NewReader returns new Reader on top of a given buffered reader.
// The provided header must match the beginning of a stream.
func NewReader(r io.Reader, header string) (*Reader, error) {
dr := Reader{
decoder: json.NewDecoder(r),
}
if err := ensureDelimiter(dr.decoder, json.Delim('{')); err != nil {
return nil, invalidStreamFormatError(err)
}
if err := ensureStringToken(dr.decoder, "stream"); err != nil {
return nil, invalidStreamFormatError(err)
}
if err := ensureStringToken(dr.decoder, header); err != nil {
return nil, invalidStreamFormatError(err)
}
if err := ensureStringToken(dr.decoder, "entries"); err != nil {
return nil, invalidStreamFormatError(err)
}
if err := ensureDelimiter(dr.decoder, json.Delim('[')); err != nil {
return nil, invalidStreamFormatError(err)
}
return &dr, nil
}