From 0e578386940801ee25a4dada05eb3087f080c446 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 21 Mar 2016 22:54:29 -0700 Subject: [PATCH] non-encrypted formatter --- Makefile | 6 + auth/credentials.go | 83 +++++++++ auth/credentials_test.go | 31 ++++ auth/keys.go | 43 +++++ cas/buffer_manager.go | 69 ++++++++ cas/buffer_manager_test.go | 49 ++++++ cas/crypto_stream.go | 91 ++++++++++ cas/crypto_stream_test.go | 103 +++++++++++ cas/format.go | 9 + cas/formatter.go | 69 ++++++++ cas/object_manager.go | 187 ++++++++++++++++++++ cas/object_manager_test.go | 352 +++++++++++++++++++++++++++++++++++++ cas/object_reader.go | 253 ++++++++++++++++++++++++++ cas/object_writer.go | 204 +++++++++++++++++++++ content/objectid.go | 4 +- 15 files changed, 1551 insertions(+), 2 deletions(-) create mode 100644 auth/credentials.go create mode 100644 auth/credentials_test.go create mode 100644 auth/keys.go create mode 100644 cas/buffer_manager.go create mode 100644 cas/buffer_manager_test.go create mode 100644 cas/crypto_stream.go create mode 100644 cas/crypto_stream_test.go create mode 100644 cas/format.go create mode 100644 cas/formatter.go create mode 100644 cas/object_manager.go create mode 100644 cas/object_manager_test.go create mode 100644 cas/object_reader.go create mode 100644 cas/object_writer.go diff --git a/Makefile b/Makefile index 699492bfd..8cc017b76 100644 --- a/Makefile +++ b/Makefile @@ -4,5 +4,11 @@ build: deps: go get -u -t -v github.com/kopia/kopia/... +dev-deps: + go get -u golang.org/x/tools/cmd/gorename + go get -u github.com/golang/lint/golint + go get -u golang.org/x/tools/cmd/oracle + go get -u github.com/nsf/gocode + test: go test -timeout 30s github.com/kopia/kopia/... diff --git a/auth/credentials.go b/auth/credentials.go new file mode 100644 index 000000000..a23b94a63 --- /dev/null +++ b/auth/credentials.go @@ -0,0 +1,83 @@ +package auth + +import ( + "crypto/sha256" + "io/ioutil" + "sync" + + "golang.org/x/crypto/pbkdf2" +) + +type Credentials interface { + Username() string + PrivateKey() *UserPrivateKey +} + +type credentials struct { + sync.Mutex + once sync.Once + + username string + privateKey *UserPrivateKey + passwordPrompt func() string +} + +func (pc *credentials) Username() string { + return pc.username +} + +func (pc *credentials) PrivateKey() *UserPrivateKey { + pc.once.Do(pc.deriveKeyFromPassword) + + return pc.privateKey +} + +func (pc *credentials) deriveKeyFromPassword() { + if pc.privateKey != nil { + return + } + + password := pc.passwordPrompt() + k := pbkdf2.Key([]byte(password), []byte(pc.username), pbkdf2Rounds, 32, sha256.New) + pk, err := newPrivateKey(k) + if err != nil { + panic("should not happen") + } + pc.privateKey = pk +} + +func Password(username, password string) Credentials { + return &credentials{ + username: username, + passwordPrompt: func() string { + return password + }, + } +} + +func PasswordPrompt(username string, prompt func() string) Credentials { + return &credentials{ + username: username, + passwordPrompt: prompt, + } +} + +func Key(username string, key []byte) (Credentials, error) { + pk, err := newPrivateKey(key) + if err != nil { + return nil, err + } + + return &credentials{ + username: username, + privateKey: pk, + }, nil +} + +func KeyFromFile(username string, fileName string) (Credentials, error) { + k, err := ioutil.ReadFile(fileName) + if err != nil { + return nil, err + } + return Key(username, k) +} diff --git a/auth/credentials_test.go b/auth/credentials_test.go new file mode 100644 index 000000000..5bd6e807b --- /dev/null +++ b/auth/credentials_test.go @@ -0,0 +1,31 @@ +package auth + +import ( + "bytes" + "encoding/hex" + + "testing" +) + +func TestCredentials(t *testing.T) { + cases := []struct { + username string + password string + expectedKey string + }{ + {"foo", "bar", "60d6051cfbff0f53344ff64cd9770c65747ced5c541748b7f992cf575bffa2ad"}, + {"user", "bar", "fff2b04b391c1a31a41dab88843311ce7f93393ec97fb8a1be3697c5a88b85ca"}, + } + + for i, c := range cases { + creds := Password(c.username, c.password) + if u := creds.Username(); u != c.username { + t.Errorf("invalid username #%v: %v expected %v", i, u, c.username) + } + + expectedKeyBytes, _ := hex.DecodeString(c.expectedKey) + if v := creds.PrivateKey(); !bytes.Equal(expectedKeyBytes, v.Bytes()) { + t.Errorf("invalid key #%v: expected %x, got: %x", i, expectedKeyBytes, v.Bytes()) + } + } +} diff --git a/auth/keys.go b/auth/keys.go new file mode 100644 index 000000000..544e4cd75 --- /dev/null +++ b/auth/keys.go @@ -0,0 +1,43 @@ +package auth + +import ( + "errors" + + "golang.org/x/crypto/curve25519" +) + +const ( + pbkdf2Rounds = 10000 + masterKeySize = 32 +) + +type UserPrivateKey struct { + key [32]byte +} + +func (prv UserPrivateKey) Bytes() []byte { + r := make([]byte, 32) + copy(r, prv.key[:]) + return r +} + +func (prv UserPrivateKey) PublicKey() *UserPublicKey { + pub := &UserPublicKey{} + + curve25519.ScalarBaseMult(&pub.key, &prv.key) + return pub +} + +func newPrivateKey(key []byte) (*UserPrivateKey, error) { + if len(key) != 32 { + return nil, errors.New("unsupported key length") + } + + k := &UserPrivateKey{} + copy(k.key[:], key) + return k, nil +} + +type UserPublicKey struct { + key [32]byte +} diff --git a/cas/buffer_manager.go b/cas/buffer_manager.go new file mode 100644 index 000000000..1c5731224 --- /dev/null +++ b/cas/buffer_manager.go @@ -0,0 +1,69 @@ +package cas + +import ( + "bytes" + "io" + "log" + "sync" + "sync/atomic" +) + +// bufferManager manages pool of reusable bytes.Buffer objects. +type bufferManager struct { + outstandingCount int32 + + pool sync.Pool +} + +// newBuffer returns a new or reused bytes.Buffer. +func (mgr *bufferManager) newBuffer() *bytes.Buffer { + atomic.AddInt32(&mgr.outstandingCount, 1) + + b := mgr.pool.Get().(*bytes.Buffer) + b.Reset() + return b +} + +// returnBuffer returns the give buffer to the pool +func (mgr *bufferManager) returnBuffer(b *bytes.Buffer) { + atomic.AddInt32(&mgr.outstandingCount, -1) + mgr.pool.Put(b) +} + +func (mgr *bufferManager) returnBufferOnClose(b *bytes.Buffer) io.ReadCloser { + return &returnOnCloser{ + buffer: b, + mgr: mgr, + } +} + +func (mgr *bufferManager) close() { + if mgr.outstandingCount != 0 { + log.Println("WARNING: Found buffer leaks.") + } +} + +type returnOnCloser struct { + buffer *bytes.Buffer + mgr *bufferManager +} + +func (roc *returnOnCloser) Read(b []byte) (int, error) { + return roc.buffer.Read(b) +} + +func (roc *returnOnCloser) Close() error { + roc.mgr.returnBuffer(roc.buffer) + return nil +} + +func newBufferManager(blockSize int) *bufferManager { + mgr := &bufferManager{} + mgr.pool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, blockSize)) + }, + } + return mgr + +} diff --git a/cas/buffer_manager_test.go b/cas/buffer_manager_test.go new file mode 100644 index 000000000..aa40cd6f6 --- /dev/null +++ b/cas/buffer_manager_test.go @@ -0,0 +1,49 @@ +package cas + +import ( + "bytes" + "testing" +) + +func TestBufferManager(t *testing.T) { + mgr := newBufferManager(10) + defer mgr.close() + + verifyBufferClean := func(b *bytes.Buffer) { + if b.Cap() != 10 { + t.Errorf("unexpected cap: %v", b.Cap()) + } + if b.Len() != 0 { + t.Errorf("unexpected len: %v", b.Len()) + } + } + + b := mgr.newBuffer() + if mgr.outstandingCount != 1 { + t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) + } + b1 := mgr.newBuffer() + verifyBufferClean(b) + verifyBufferClean(b1) + if mgr.outstandingCount != 2 { + t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) + } + closer := mgr.returnBufferOnClose(b) + closer.Close() + if mgr.outstandingCount != 1 { + t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) + } + mgr.returnBuffer(b) + if mgr.outstandingCount != 0 { + t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) + } + b2 := mgr.newBuffer() + if mgr.outstandingCount != 1 { + t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) + } + verifyBufferClean(b2) + mgr.returnBuffer(b2) + if mgr.outstandingCount != 0 { + t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) + } +} diff --git a/cas/crypto_stream.go b/cas/crypto_stream.go new file mode 100644 index 000000000..57742bf28 --- /dev/null +++ b/cas/crypto_stream.go @@ -0,0 +1,91 @@ +package cas + +import ( + "bytes" + "crypto/cipher" + "hash" + "io" +) + +// encryptingReader wraps an io.Reader and returns data encrypted using a stream cipher +type encryptingReader struct { + source io.Reader + header io.Reader + checksum io.Reader + closer io.Closer + + cipher cipher.Stream + hash hash.Hash +} + +func (er *encryptingReader) Read(b []byte) (int, error) { + read := 0 + for len(b) > 0 { + switch { + case er.header != nil: + n, err := er.header.Read(b) + er.addToChecksum(b[0:n]) + read += n + b = b[n:] + if err == io.EOF { + er.header = nil + } else if err != nil { + return read, err + } + + case er.source != nil: + n, err := er.source.Read(b) + got := b[0:n] + er.cipher.XORKeyStream(got, got) + er.addToChecksum(got) + read += n + b = b[n:] + if err == io.EOF { + er.source = nil + er.checksum = bytes.NewReader(er.hash.Sum(nil)) + } else if err != nil { + return read, err + } + + case er.checksum != nil: + n, err := er.checksum.Read(b) + b = b[n:] + read += n + if err == io.EOF { + er.checksum = nil + } + if err != nil { + return read, err + } + + default: + return read, io.EOF + } + } + return read, nil +} + +func (er *encryptingReader) addToChecksum(b []byte) { + n, err := er.hash.Write(b) + if err != nil || n != len(b) { + panic("unexpected hashing error") + } +} + +func (er *encryptingReader) Close() error { + if er.closer != nil { + return er.closer.Close() + } + + return nil +} + +func newEncryptingReader(source io.ReadCloser, header []byte, c cipher.Stream, hash hash.Hash) io.ReadCloser { + return &encryptingReader{ + source: source, + header: bytes.NewReader(header), + cipher: c, + hash: hash, + closer: source, + } +} diff --git a/cas/crypto_stream_test.go b/cas/crypto_stream_test.go new file mode 100644 index 000000000..6f7ab3ff9 --- /dev/null +++ b/cas/crypto_stream_test.go @@ -0,0 +1,103 @@ +package cas + +import ( + "bytes" + "crypto/cipher" + "crypto/hmac" + "crypto/md5" + "crypto/sha1" + "encoding/hex" + "fmt" + "hash" + "io/ioutil" + + "testing" +) + +type fakeStreamCipher struct { + xor byte +} + +func (nsc *fakeStreamCipher) XORKeyStream(dst, src []byte) { + for i := 0; i < len(src); i++ { + dst[i] = src[i] ^ nsc.xor + } +} + +func TestCryptoStream(t *testing.T) { + for _, s := range []struct { + cipher cipher.Stream + hash func() hash.Hash + header string + data string + expected string + }{ + { + cipher: &fakeStreamCipher{}, + hash: sha1.New, + header: "", + data: "", + expected: "da39a3ee5e6b4b0d3255bfef95601890afd80709", // SHA1 of empty string + }, + { + cipher: &fakeStreamCipher{}, + hash: md5.New, + header: "", + data: "", + expected: "d41d8cd98f00b204e9800998ecf8427e", // MD5 of empty string + }, + { + cipher: &fakeStreamCipher{}, + hash: sha1.New, + header: "0000", + data: "", + expected: "00001489f923c4dca729178b3e3233458550d8dddf29", + }, + { + cipher: &fakeStreamCipher{}, + hash: sha1.New, + header: "1234", + data: "", + expected: "1234ffa76d854a2969e7b9d83868d455512fce0fd74d", + }, + { + cipher: &fakeStreamCipher{}, + hash: sha1.New, + header: "deadbeef", + data: "cafeb1bab3c0", + expected: "deadbeefcafeb1bab3c00b01e595963c80cee1e04a6c1079dc2e186a553f", + }, + { + cipher: &fakeStreamCipher{0}, + hash: func() hash.Hash { return hmac.New(sha1.New, []byte{1, 2, 3}) }, + header: "deadbeef", + data: "cafeb1bab3c0", + expected: "deadbeefcafeb1bab3c0a5c88d0104f5fafc8ee629002104199b523665e5", + }, + } { + header, err := hex.DecodeString(s.header) + if err != nil { + t.Errorf("error decoding IV: %v", err) + continue + } + data, err := hex.DecodeString(s.data) + if err != nil { + t.Errorf("error decoding data: %v", err) + continue + } + enc := newEncryptingReader( + ioutil.NopCloser(bytes.NewReader(data)), + header, + s.cipher, + s.hash()) + v, err := ioutil.ReadAll(enc) + actual := fmt.Sprintf("%x", v) + if err != nil { + t.Errorf("expected %v got error: %v", s.expected, err) + } + + if actual != s.expected { + t.Errorf("expected %v got: %v", s.expected, actual) + } + } +} diff --git a/cas/format.go b/cas/format.go new file mode 100644 index 000000000..7fa142cc3 --- /dev/null +++ b/cas/format.go @@ -0,0 +1,9 @@ +package cas + +// Format describes the format of object data. +type Format struct { + Algorithm string `json:"algorithm"` + Secret []byte `json:"secret,omitempty"` + MaxInlineBlobSize int `json:"maxInlineBlobSize"` + MaxBlobSize int `json:"maxBlobSize"` +} diff --git a/cas/formatter.go b/cas/formatter.go new file mode 100644 index 000000000..2b1d8d7aa --- /dev/null +++ b/cas/formatter.go @@ -0,0 +1,69 @@ +package cas + +import ( + "crypto/aes" + "crypto/cipher" + "crypto/hmac" + "crypto/rand" + "crypto/sha256" + "crypto/sha512" + "encoding/hex" + "hash" + "io" + + "github.com/kopia/kopia/content" +) + +type streamTransformer func(io.ReadCloser) io.ReadCloser + +type objectFormatter interface { + Do(b []byte, prefix string) (content.ObjectID, streamTransformer) +} + +type nonEncryptingFormatter struct { + hash func() hash.Hash +} + +func (f *nonEncryptingFormatter) Do(b []byte, prefix string) (content.ObjectID, streamTransformer) { + h := f.hash() + h.Write(b) + blockID := hex.EncodeToString(h.Sum(nil)) + + return content.ObjectID(prefix + blockID), func(r io.ReadCloser) io.ReadCloser { return r } +} + +func newNonEncryptingFormatter(hash func() hash.Hash) objectFormatter { + return &nonEncryptingFormatter{ + hash: hash, + } +} + +type aesEncryptingFormatter struct { + masterContentSecret []byte +} + +func (f *aesEncryptingFormatter) Do(b []byte, prefix string) (content.ObjectID, streamTransformer) { + // Compute HMAC-SHA512 of the content + s := hmac.New(sha512.New, f.masterContentSecret) + s.Write(b) + contentHash := s.Sum(nil) + + // Split the hash into two portions - encryption key and content ID. + aesKey := contentHash[0:32] + return content.ObjectID(prefix + hex.EncodeToString(contentHash[32:64]) + ".e"), + func(r io.ReadCloser) io.ReadCloser { + var iv [aes.BlockSize]byte + rand.Read(iv[:]) + + validationKey := []byte{1, 2, 3, 4} + + aes, err := aes.NewCipher(aesKey) + if err != nil { + panic("") + } + + ctr := cipher.NewCTR(aes, iv[:]) + + return newEncryptingReader(r, iv[:], ctr, hmac.New(sha256.New, validationKey)) + } +} diff --git a/cas/object_manager.go b/cas/object_manager.go new file mode 100644 index 000000000..151057fb4 --- /dev/null +++ b/cas/object_manager.go @@ -0,0 +1,187 @@ +package cas + +import ( + "crypto/hmac" + "crypto/md5" + "crypto/sha1" + "crypto/sha256" + "crypto/sha512" + "fmt" + "hash" + "io" + "strings" + + "github.com/kopia/kopia/content" + "github.com/kopia/kopia/storage" +) + +// ObjectManager manages objects stored in a repository and allows reading and writing them. +type ObjectManager interface { + // NewWriter opens an ObjectWriter for writing new content to the repository. + NewWriter(options ...WriterOption) ObjectWriter + + // Open creates an io.ReadSeeker for reading object with a specified ID. + Open(objectID content.ObjectID) (io.ReadSeeker, error) + + Flush() error + Repository() storage.Repository + Close() +} + +type objectManager struct { + repository storage.Repository + verbose bool + formatter objectFormatter + bufferManager *bufferManager + + maxInlineBlobSize int + maxBlobSize int +} + +func (mgr *objectManager) Close() { + mgr.Flush() + mgr.bufferManager.close() +} + +func (mgr *objectManager) Flush() error { + return mgr.repository.Flush() +} + +func (mgr *objectManager) Repository() storage.Repository { + return mgr.repository +} + +func (mgr *objectManager) NewWriter(options ...WriterOption) ObjectWriter { + result := newObjectWriter( + objectWriterConfig{ + mgr: mgr, + putOptions: storage.PutOptions{}, + }, + content.ObjectIDTypeStored) + + for _, option := range options { + option(result) + } + + return result +} + +func (mgr *objectManager) Open(objectID content.ObjectID) (io.ReadSeeker, error) { + r, err := mgr.newRawReader(objectID) + if err != nil { + return nil, err + } + + if objectID.Type() == content.ObjectIDTypeList { + seekTable := make([]seekTableEntry, 0, 100) + + seekTable, err = mgr.flattenListChunk(seekTable, objectID, r) + if err != nil { + return nil, err + } + + totalLength := seekTable[len(seekTable)-1].endOffset() + + return &objectReader{ + repository: mgr.repository, + seekTable: seekTable, + totalLength: totalLength, + }, nil + } + return r, err +} + +// ObjectManagerOption controls the behavior of ObjectManager. +type ObjectManagerOption func(o *objectManager) error + +// WriteBack is an ObjectManagerOption that enables asynchronous writes to the repository using the pool +// of goroutines. +func WriteBack(workerCount int) ObjectManagerOption { + return func(o *objectManager) error { + o.repository = storage.NewWriteBackWrapper(o.repository, workerCount) + return nil + } +} + +// WriteLimit is an ObjectManagerOption that sets the limit on the number of bytes that can be written +// to the repository in this ObjectManager session. Once the limit is reached, the repository will +// return ErrWriteLimitExceeded. +func WriteLimit(maxBytes int64) ObjectManagerOption { + return func(o *objectManager) error { + o.repository = storage.NewWriteLimitWrapper(o.repository, maxBytes) + return nil + } +} + +// EnableLogging is an ObjectManagerOption that causes all repository access to be logged. +func EnableLogging() ObjectManagerOption { + return func(o *objectManager) error { + o.repository = storage.NewLoggingWrapper(o.repository) + return nil + } +} + +func MaxInlineBlobSize(sizeBytes int) ObjectManagerOption { + return func(o *objectManager) error { + o.maxInlineBlobSize = sizeBytes + return nil + } +} + +func MaxBlobSize(sizeBytes int) ObjectManagerOption { + return func(o *objectManager) error { + o.maxBlobSize = sizeBytes + return nil + } +} + +// NewObjectManager creates new ObjectManager with the specified repository, options, and key provider. +func NewObjectManager( + r storage.Repository, + f Format, + options ...ObjectManagerOption, +) (ObjectManager, error) { + mgr := &objectManager{ + repository: r, + maxInlineBlobSize: f.MaxInlineBlobSize, + maxBlobSize: f.MaxBlobSize, + } + + var hashFunc func() hash.Hash + + hashAlgo := f.Algorithm + hf := strings.TrimPrefix(hashAlgo, "hmac-") + + switch hf { + case "md5": + hashFunc = md5.New + case "sha1": + hashFunc = sha1.New + case "sha256": + hashFunc = sha256.New + case "sha512": + hashFunc = sha512.New + default: + return nil, fmt.Errorf("unknown hash function: %v", hf) + } + + if strings.HasPrefix(hashAlgo, "hmac-") { + rawHashFunc := hashFunc + hashFunc = func() hash.Hash { + return hmac.New(rawHashFunc, f.Secret) + } + } + + mgr.formatter = newNonEncryptingFormatter(hashFunc) + + for _, o := range options { + if err := o(mgr); err != nil { + mgr.Close() + return nil, err + } + } + + mgr.bufferManager = newBufferManager(mgr.maxBlobSize) + + return mgr, nil +} diff --git a/cas/object_manager_test.go b/cas/object_manager_test.go new file mode 100644 index 000000000..44f41c933 --- /dev/null +++ b/cas/object_manager_test.go @@ -0,0 +1,352 @@ +package cas + +import ( + "bytes" + "crypto/md5" + cryptorand "crypto/rand" + "encoding/hex" + "fmt" + "io/ioutil" + "math/rand" + "reflect" + "strings" + "testing" + + "github.com/kopia/kopia/content" + "github.com/kopia/kopia/storage" +) + +func testFormat() Format { + return Format{ + MaxBlobSize: 200, + MaxInlineBlobSize: 20, + Algorithm: "md5", + } +} + +func getMd5Digest(data []byte) string { + hash := md5.New() + hash.Write(data) + return hex.EncodeToString(hash.Sum(nil)) +} + +func getMd5CObjectID(data []byte) string { + return fmt.Sprintf("C%s", getMd5Digest(data)) +} + +func getMd5LObjectID(data []byte) string { + return fmt.Sprintf("L%s", getMd5Digest(data)) +} + +func setupTest(t *testing.T) (data map[string][]byte, mgr ObjectManager) { + data = map[string][]byte{} + st := storage.NewMapRepository(data) + + mgr, err := NewObjectManager(st, testFormat()) + if err != nil { + t.Errorf("cannot create manager: %v", err) + } + return +} + +func TestWriters(t *testing.T) { + cases := []struct { + data []byte + objectID content.ObjectID + }{ + {[]byte{}, "B"}, + {[]byte("quick brown fox"), "Tquick brown fox"}, + {[]byte{1, 2, 3, 4}, "BAQIDBA=="}, + {[]byte{0xc2, 0x28}, "Bwig="}, // invalid UTF-8, will be represented as binary + { + []byte("the quick brown fox jumps over the lazy dog"), + "CX77add1d5f41223d5582fca736a5cb335", + }, + {make([]byte, 100), "CX6d0bb00954ceb7fbee436bb55a8397a9"}, // 100 zero bytes + } + + for _, c := range cases { + data, mgr := setupTest(t) + + writer := mgr.NewWriter( + WithBlockNamePrefix("X"), + ) + + writer.Write(c.data) + + result, err := writer.Result(false) + if err != nil { + t.Errorf("error getting writer results for %v, expected: %v", c.data, c.objectID) + continue + } + + if result != c.objectID { + t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID, result) + } + + if !c.objectID.Type().IsStored() { + if len(data) != 0 { + t.Errorf("unexpected data written to the repository: %v", data) + } + } else { + if len(data) != 1 { + t.Errorf("unexpected data written to the repository: %v", data) + } + } + } + +} + +func TestWriterCompleteChunkInTwoWrites(t *testing.T) { + _, mgr := setupTest(t) + + bytes := make([]byte, 100) + writer := mgr.NewWriter( + WithBlockNamePrefix("X"), + ) + writer.Write(bytes[0:50]) + writer.Write(bytes[0:50]) + result, err := writer.Result(false) + if string(result) != "CX6d0bb00954ceb7fbee436bb55a8397a9" { + t.Errorf("unexpected result: %v err: %v", result, err) + } +} + +func TestWriterListChunk(t *testing.T) { + data, mgr := setupTest(t) + + contentBytes := make([]byte, 250) + contentMd5Sum200 := getMd5Digest(contentBytes[0:200]) // hash of 200 zero bytes + contentMd5Sum50 := getMd5Digest(contentBytes[200:250]) // hash of 50 zero bytes + listChunkContent := []byte("200,C" + contentMd5Sum200 + "\n50,C" + contentMd5Sum50 + "\n") + listChunkObjectID := getMd5LObjectID(listChunkContent) + + writer := mgr.NewWriter() + writer.Write(contentBytes) + result, err := writer.Result(false) + if err != nil { + t.Errorf("error getting writer results: %v", err) + } + + if string(result) != listChunkObjectID { + t.Errorf("incorrect list chunk ID: %v, expected: %v", result, listChunkObjectID) + } + + // We have 3 chunks - 200 zero bytes, 50 zero bytes and the list. + if !reflect.DeepEqual(data, map[string][]byte{ + contentMd5Sum200: contentBytes[0:200], + contentMd5Sum50: contentBytes[0:50], + getMd5Digest(listChunkContent): listChunkContent, + }) { + t.Errorf("invalid repository contents: %v", data) + } +} + +func TestWriterListOfListsChunk(t *testing.T) { + data, mgr := setupTest(t) + + contentBytes := make([]byte, 1400) + chunk1Id := getMd5CObjectID(contentBytes[0:200]) // hash of 200 zero bytes + + list1ChunkContent := []byte(strings.Repeat("200,"+chunk1Id+"\n", 5)) + list1ObjectID := fmt.Sprintf("%v,%v", len(list1ChunkContent), getMd5LObjectID(list1ChunkContent)) + + list2ChunkContent := []byte(strings.Repeat("200,"+chunk1Id+"\n", 2)) + list2ObjectID := fmt.Sprintf("%v,%v", len(list2ChunkContent), getMd5LObjectID(list2ChunkContent)) + + listOfListsChunkContent := []byte(list1ObjectID + "\n" + list2ObjectID + "\n") + listOfListsObjectID := getMd5LObjectID(listOfListsChunkContent) + + writer := mgr.NewWriter() + writer.Write(contentBytes) + result, err := writer.Result(false) + if string(result) != listOfListsObjectID || err != nil { + t.Errorf("unexpected result: %v expected: %v, err: %v", result, listOfListsObjectID, err) + } + + // We have 4 chunks - 200 zero bytes, 2 lists, and 1 list-of-lists. + if !reflect.DeepEqual(data, map[string][]byte{ + getMd5Digest(contentBytes[0:200]): contentBytes[0:200], + getMd5Digest(list1ChunkContent): list1ChunkContent, + getMd5Digest(list2ChunkContent): list2ChunkContent, + getMd5Digest(listOfListsChunkContent): listOfListsChunkContent, + }) { + t.Errorf("invalid repository contents: %v", data) + } +} + +func TestWriterListOfListsOfListsChunk(t *testing.T) { + data, mgr := setupTest(t) + + writtenData := make([]byte, 10000) + chunk1Id := getMd5CObjectID(writtenData[0:200]) // hash of 200 zero bytes + + // First level list chunk has 5 C[] chunk IDs, because that's how many IDs fit in one chunk. + // (that number 200 was chosen for a reason, to make testing easy) + // + // 200,Cfbaf48ec981a5eecdb57b929fdd426e8\n + // 200,Cfbaf48ec981a5eecdb57b929fdd426e8\n + // 200,Cfbaf48ec981a5eecdb57b929fdd426e8\n + // 200,Cfbaf48ec981a5eecdb57b929fdd426e8\n + // 200,Cfbaf48ec981a5eecdb57b929fdd426e8\n + list1ChunkContent := []byte(strings.Repeat("200,"+chunk1Id+"\n", 5)) + list1ObjectID := fmt.Sprintf("%v,%v", len(list1ChunkContent), getMd5LObjectID(list1ChunkContent)) + + // Second level lists L[] chunks from the first level. They have all the same content + // because all lists are identical. + // 190,L52760f658059fef754f5deabdd01df93\n + // 190,L52760f658059fef754f5deabdd01df93\n + // 190,L52760f658059fef754f5deabdd01df93\n + // 190,L52760f658059fef754f5deabdd01df93\n + // 190,L52760f658059fef754f5deabdd01df93\n + list2ChunkContent := []byte(strings.Repeat(string(list1ObjectID)+"\n", 5)) + list2ObjectID := fmt.Sprintf("%v,%v", len(list2ChunkContent), getMd5LObjectID(list2ChunkContent)) + + // Now those lists are also identical and represent 5000 bytes each, so + // the top-level list-of-lists-of-lists will have 2 entries: + // + // 190,Lb99b28e34c87e4934b4cc5631bb38ee8\n + // 190,Lb99b28e34c87e4934b4cc5631bb38ee8\n + list3ChunkContent := []byte(strings.Repeat(string(list2ObjectID)+"\n", 2)) + list3ObjectID := getMd5LObjectID(list3ChunkContent) + + writer := mgr.NewWriter() + writer.Write(writtenData) + + result, err := writer.Result(false) + if string(result) != list3ObjectID { + t.Errorf("unexpected result: %v err: %v", result, err) + } + + // We have 4 data blocks representing 10000 bytes of zero. Not bad! + if !reflect.DeepEqual(data, map[string][]byte{ + getMd5Digest(writtenData[0:200]): writtenData[0:200], + getMd5Digest(list1ChunkContent): list1ChunkContent, + getMd5Digest(list2ChunkContent): list2ChunkContent, + getMd5Digest(list3ChunkContent): list3ChunkContent, + }) { + t.Errorf("invalid repository contents: %v", data) + } +} + +func TestEncryption(t *testing.T) { + data := map[string][]byte{} + content := bytes.Repeat([]byte{0xcd}, 50) + + s := testFormat() + s.Algorithm = "hmac-md5" + s.Secret = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25} + + mgr, err := NewObjectManager(storage.NewMapRepository(data), s) + if err != nil { + t.Errorf("cannot create manager: %v", err) + } + w := mgr.NewWriter() + w.Write(content) + result, err := w.Result(false) + if string(result) != "C697eaf0aca3a3aea3a75164746ffaa79" { + t.Errorf("unexpected result: %v err: %v", result, err) + } +} + +func TestReader(t *testing.T) { + data, mgr := setupTest(t) + + storedPayload := []byte("foo\nbar") + data["abcdef"] = storedPayload + + cases := []struct { + text string + payload []byte + }{ + {"B", []byte{}}, + {"BAQIDBA==", []byte{1, 2, 3, 4}}, + {"T", []byte{}}, + {"Tfoo\nbar", []byte("foo\nbar")}, + {"Cabcdef", storedPayload}, + } + + for _, c := range cases { + objectID, err := content.ParseObjectID(c.text) + if err != nil { + t.Errorf("cannot parse object ID: %v", err) + continue + } + + reader, err := mgr.Open(objectID) + if err != nil { + t.Errorf("cannot create reader for %v: %v", objectID, err) + continue + } + + d, err := ioutil.ReadAll(reader) + if err != nil { + t.Errorf("cannot read all data for %v: %v", objectID, err) + continue + } + if !bytes.Equal(d, c.payload) { + t.Errorf("incorrect payload for %v: expected: %v got: %v", objectID, c.payload, d) + continue + } + } +} + +func TestReaderStoredBlockNotFound(t *testing.T) { + _, mgr := setupTest(t) + + objectID, err := content.ParseObjectID("Cno-such-block") + if err != nil { + t.Errorf("cannot parse object ID: %v", err) + } + reader, err := mgr.Open(objectID) + if err != storage.ErrBlockNotFound || reader != nil { + t.Errorf("unexpected result: reader: %v err: %v", reader, err) + } +} + +func TestEndToEndReadAndSeek(t *testing.T) { + _, mgr := setupTest(t) + + for _, forceStored := range []bool{false, true} { + for _, size := range []int{1, 199, 200, 201, 9999, 512434} { + // Create some random data sample of the specified size. + randomData := make([]byte, size) + cryptorand.Read(randomData) + + writer := mgr.NewWriter( + WithBlockNamePrefix("X"), + ) + writer.Write(randomData) + objectID, err := writer.Result(forceStored) + if err != nil { + t.Errorf("cannot get writer result for %v/%v: %v", forceStored, size, err) + continue + } + + reader, err := mgr.Open(objectID) + if err != nil { + t.Errorf("cannot get reader for %v/%v: %v", forceStored, size, err) + continue + } + + for i := 0; i < 20; i++ { + sampleSize := int(rand.Int31n(300)) + seekOffset := int(rand.Int31n(int32(len(randomData)))) + if seekOffset+sampleSize > len(randomData) { + sampleSize = len(randomData) - seekOffset + } + if sampleSize > 0 { + got := make([]byte, sampleSize) + reader.Seek(int64(seekOffset), 0) + reader.Read(got) + + expected := randomData[seekOffset : seekOffset+sampleSize] + + if !bytes.Equal(expected, got) { + t.Errorf("incorrect data read for %v/%v: expected: %v, got: %v", forceStored, size, expected, got) + } + } + } + } + } +} diff --git a/cas/object_reader.go b/cas/object_reader.go new file mode 100644 index 000000000..d983dabbf --- /dev/null +++ b/cas/object_reader.go @@ -0,0 +1,253 @@ +package cas + +import ( + "bufio" + "bytes" + "fmt" + "io" + "path/filepath" + "strconv" + "strings" + + "github.com/kopia/kopia/content" + "github.com/kopia/kopia/storage" +) + +type repositoryReader interface { + GetBlock(id storage.BlockID) ([]byte, error) +} + +type seekTableEntry struct { + startOffset int64 + length int64 + blockID storage.BlockID +} + +func (r *seekTableEntry) endOffset() int64 { + return r.startOffset + int64(r.length) +} + +func (r *seekTableEntry) String() string { + return fmt.Sprintf("start: %d len: %d end: %d block: %s", + r.startOffset, + r.length, + r.endOffset(), + r.blockID) +} + +type objectReader struct { + repository repositoryReader + + seekTable []seekTableEntry + + currentPosition int64 // Overall position in the objectReader + totalLength int64 // Overall length + + currentChunkIndex int // Index of current chunk in the seek table + currentChunkData []byte // Current chunk data + currentChunkPosition int // Read position in the current chunk +} + +func (r *objectReader) Read(buffer []byte) (int, error) { + readBytes := 0 + remaining := len(buffer) + + for remaining > 0 { + if r.currentChunkData != nil { + toCopy := len(r.currentChunkData) - r.currentChunkPosition + if toCopy == 0 { + // EOF on curren chunk + r.closeCurrentChunk() + r.currentChunkIndex++ + continue + } + + if toCopy > remaining { + toCopy = remaining + } + + copy(buffer[readBytes:], + r.currentChunkData[r.currentChunkPosition:r.currentChunkPosition+toCopy]) + r.currentChunkPosition += toCopy + r.currentPosition += int64(toCopy) + readBytes += toCopy + remaining -= toCopy + } else if r.currentChunkIndex < len(r.seekTable) { + err := r.openCurrentChunk() + if err != nil { + return 0, err + } + } else { + break + } + } + + if readBytes == 0 { + return readBytes, io.EOF + } + + return readBytes, nil +} + +func (r *objectReader) openCurrentChunk() error { + blockID := r.seekTable[r.currentChunkIndex].blockID + blockData, err := r.repository.GetBlock(blockID) + if err != nil { + return err + } + + r.currentChunkData = blockData + r.currentChunkPosition = 0 + return nil +} + +func (r *objectReader) closeCurrentChunk() { + r.currentChunkData = nil +} + +func (r *objectReader) findChunkIndexForOffset(offset int64) int { + left := 0 + right := len(r.seekTable) - 1 + for left <= right { + middle := (left + right) / 2 + + if offset < r.seekTable[middle].startOffset { + right = middle - 1 + continue + } + + if offset >= r.seekTable[middle].endOffset() { + left = middle + 1 + continue + } + + return middle + } + + panic("Unreachable code") +} + +func (r *objectReader) Seek(offset int64, whence int) (int64, error) { + if whence == 1 { + return r.Seek(r.currentPosition+offset, 0) + } + + if whence == 2 { + return r.Seek(r.totalLength+offset, 0) + } + + if offset < 0 { + return -1, fmt.Errorf("Invalid seek.") + } + + if offset > r.totalLength { + offset = r.totalLength + } + + index := r.findChunkIndexForOffset(offset) + + chunkStartOffset := r.seekTable[index].startOffset + + if index != r.currentChunkIndex { + r.closeCurrentChunk() + r.currentChunkIndex = index + } + + if r.currentChunkData == nil { + r.openCurrentChunk() + } + + r.currentChunkPosition = int(offset - chunkStartOffset) + r.currentPosition = offset + + return r.currentPosition, nil +} + +func (mgr *objectManager) newRawReader(objectID content.ObjectID) (io.ReadSeeker, error) { + inline := objectID.InlineData() + if inline != nil { + return bytes.NewReader(inline), nil + } + + blockID := objectID.BlockID() + ext := filepath.Ext(string(blockID)) + payload, err := mgr.repository.GetBlock(blockID) + if err != nil { + return nil, err + } + + switch ext { + case "": + // Unencrypted data + return bytes.NewReader(payload), nil + + case ".e": + return mgr.decryptPayload(payload) + + default: + return nil, fmt.Errorf("unsupported block format: %v", blockID) + + } +} + +func (mgr *objectManager) decryptPayload(b []byte) (io.ReadSeeker, error) { + return nil, nil +} + +func (mgr *objectManager) flattenListChunk( + seekTable []seekTableEntry, + listObjectID content.ObjectID, + rawReader io.Reader) ([]seekTableEntry, error) { + + scanner := bufio.NewScanner(rawReader) + + for scanner.Scan() { + c := scanner.Text() + comma := strings.Index(c, ",") + if comma <= 0 { + return nil, fmt.Errorf("unsupported entry '%v' in list '%s'", c, listObjectID) + } + + length, err := strconv.ParseInt(c[0:comma], 10, 64) + + objectID, err := content.ParseObjectID(c[comma+1:]) + if err != nil { + return nil, fmt.Errorf("unsupported entry '%v' in list '%s': %#v", c, listObjectID, err) + } + + switch objectID.Type() { + case content.ObjectIDTypeList: + subreader, err := mgr.newRawReader(objectID) + if err != nil { + return nil, err + } + + seekTable, err = mgr.flattenListChunk(seekTable, objectID, subreader) + if err != nil { + return nil, err + } + + case content.ObjectIDTypeStored: + var startOffset int64 + if len(seekTable) > 0 { + startOffset = seekTable[len(seekTable)-1].endOffset() + } else { + startOffset = 0 + } + + seekTable = append( + seekTable, + seekTableEntry{ + blockID: objectID.BlockID(), + startOffset: startOffset, + length: length, + }) + + default: + return nil, fmt.Errorf("unsupported entry '%v' in list '%v'", objectID, listObjectID) + + } + } + + return seekTable, nil +} diff --git a/cas/object_writer.go b/cas/object_writer.go new file mode 100644 index 000000000..dc7b67044 --- /dev/null +++ b/cas/object_writer.go @@ -0,0 +1,204 @@ +package cas + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "unicode/utf8" + + "github.com/kopia/kopia/content" + "github.com/kopia/kopia/storage" +) + +type blockHasher interface { + hashBuffer(data []byte) string +} + +// ObjectWriter allows writing content to the repository and supports automatic deduplication and encryption +// of written data. +type ObjectWriter interface { + io.WriteCloser + + Result(forceStored bool) (content.ObjectID, error) +} + +// objectWriterConfig +type objectWriterConfig struct { + mgr *objectManager + putOptions storage.PutOptions +} + +type objectWriter struct { + objectWriterConfig + + buffer *bytes.Buffer + totalLength int64 + + prefix string + listWriter *objectWriter + flushedObjectCount int + lastFlushedObject content.ObjectID + + description string + objectType content.ObjectIDType + + atomicWrites bool +} + +func (w *objectWriter) Close() error { + if w.buffer != nil { + w.mgr.bufferManager.returnBuffer(w.buffer) + w.buffer = nil + } + if w.listWriter != nil { + w.listWriter.Close() + w.listWriter = nil + } + return nil +} + +func (w *objectWriter) Write(data []byte) (n int, err error) { + remaining := len(data) + w.totalLength += int64(remaining) + + for remaining > 0 { + if w.buffer == nil { + w.buffer = w.mgr.bufferManager.newBuffer() + } + room := w.buffer.Cap() - w.buffer.Len() + + if remaining <= room { + w.buffer.Write(data) + remaining = 0 + } else { + if w.atomicWrites { + if w.buffer == nil { + // We're at the beginning of a buffer, fail if the buffer is too small. + return 0, fmt.Errorf("object writer buffer too small, need: %v, have: %v", remaining, room) + } + if err := w.flushBuffer(); err != nil { + return 0, err + } + + continue + } + + w.buffer.Write(data[0:room]) + + if err := w.flushBuffer(); err != nil { + return 0, err + } + data = data[room:] + remaining = len(data) + } + } + return len(data), nil +} + +func (w *objectWriter) flushBuffer() error { + if w.buffer != nil { + data := w.buffer.Bytes() + length := w.buffer.Len() + + b := w.mgr.bufferManager.returnBufferOnClose(w.buffer) + w.buffer = nil + + objectID, transformer := w.mgr.formatter.Do(data, string(w.objectType)+w.prefix) + b = transformer(b) + + if err := w.mgr.repository.PutBlock(objectID.BlockID(), b, storage.PutOptions{}); err != nil { + return fmt.Errorf( + "error when flushing chunk %d of %s to %#v: %#v", + w.flushedObjectCount, + w.description, + objectID.BlockID(), + err) + } + + w.flushedObjectCount++ + w.lastFlushedObject = objectID + if w.listWriter == nil { + w.listWriter = newObjectWriter(w.objectWriterConfig, content.ObjectIDTypeList) + w.listWriter.description = "LIST(" + w.description + ")" + w.listWriter.atomicWrites = true + } + + fmt.Fprintf(w.listWriter, "%v,%v\n", length, objectID) + } + return nil +} + +func newObjectWriter(cfg objectWriterConfig, objectType content.ObjectIDType) *objectWriter { + return &objectWriter{ + objectWriterConfig: cfg, + objectType: objectType, + } +} + +func (w *objectWriter) Result(forceStored bool) (content.ObjectID, error) { + if !forceStored && w.flushedObjectCount == 0 { + if w.buffer == nil { + return content.NewInlineBinaryObjectID(nil), nil + } + + if w.buffer.Len() < w.mgr.maxInlineBlobSize { + data := w.buffer.Bytes() + if !utf8.Valid(data) { + return content.NewInlineBinaryObjectID(data), nil + } + + // If the binary represents valid UTF8, try encoding it as text (JSON) or binary chunk + // and pick the one that has shorter representation. + dataString := string(data) + jsonData, _ := json.Marshal(dataString) + + jsonLen := len(jsonData) + base64Len := base64.StdEncoding.EncodedLen(len(data)) + + if jsonLen < base64Len { + return content.NewInlineTextObjectID(dataString), nil + } + + return content.NewInlineBinaryObjectID(data), nil + } + } + + w.flushBuffer() + defer func() { + w.listWriter.Close() + }() + + if w.flushedObjectCount == 1 { + return w.lastFlushedObject, nil + } else if w.flushedObjectCount == 0 { + return content.NullObjectID, nil + } else { + return w.listWriter.Result(true) + } +} + +// WriterOption is an option that can be passed to ObjectManager.NewWriter() +type WriterOption func(*objectWriter) + +// WithBlockNamePrefix causes the ObjectWriter to prefix any blocks emitted to the repository with a given string. +func WithBlockNamePrefix(prefix string) WriterOption { + return func(w *objectWriter) { + w.prefix = prefix + } +} + +// WithDescription is used for debugging only and causes the following string to be emitted in logs. +func WithDescription(description string) WriterOption { + return func(w *objectWriter) { + w.description = description + } +} + +// WithPutOptions causes the ObjectWriter to use the specified options when writing blocks to the repository. +func WithPutOptions(options storage.PutOptions) WriterOption { + return func(w *objectWriter) { + w.putOptions = options + } +} diff --git a/content/objectid.go b/content/objectid.go index 6d2b3f7b9..69a1b6644 100644 --- a/content/objectid.go +++ b/content/objectid.go @@ -142,11 +142,11 @@ func (c ObjectID) EncryptionInfo() ObjectEncryptionInfo { return NoEncryption } -func newInlineBinaryObjectID(data []byte) ObjectID { +func NewInlineBinaryObjectID(data []byte) ObjectID { return ObjectID("B" + base64.StdEncoding.EncodeToString(data)) } -func newInlineTextObjectID(text string) ObjectID { +func NewInlineTextObjectID(text string) ObjectID { return ObjectID("T" + text) }