non-encrypted formatter

Author: Jarek Kowalski
Date: 2016-03-21 22:54:29 -07:00
Parent: 3e3c4de77b
Commit: 0e57838694
15 changed files with 1551 additions and 2 deletions

Makefile (modified)

@@ -4,5 +4,11 @@ build:
deps:
	go get -u -t -v github.com/kopia/kopia/...
dev-deps:
	go get -u golang.org/x/tools/cmd/gorename
	go get -u github.com/golang/lint/golint
	go get -u golang.org/x/tools/cmd/oracle
	go get -u github.com/nsf/gocode
test:
	go test -timeout 30s github.com/kopia/kopia/...

auth/credentials.go (new file, 83 lines)

@@ -0,0 +1,83 @@
package auth
import (
"crypto/sha256"
"io/ioutil"
"sync"
"golang.org/x/crypto/pbkdf2"
)
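// Credentials identifies a user and provides access to the user's private key.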
type Credentials interface {
Username() string
PrivateKey() *UserPrivateKey
}
type credentials struct {
sync.Mutex
once sync.Once
username string
privateKey *UserPrivateKey
passwordPrompt func() string
}
func (pc *credentials) Username() string {
return pc.username
}
func (pc *credentials) PrivateKey() *UserPrivateKey {
pc.once.Do(pc.deriveKeyFromPassword)
return pc.privateKey
}
func (pc *credentials) deriveKeyFromPassword() {
if pc.privateKey != nil {
return
}
password := pc.passwordPrompt()
k := pbkdf2.Key([]byte(password), []byte(pc.username), pbkdf2Rounds, 32, sha256.New)
pk, err := newPrivateKey(k)
if err != nil {
panic("should not happen")
}
pc.privateKey = pk
}
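// Password returns Credentials for the given username with a fixed password.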
func Password(username, password string) Credentials {
return &credentials{
username: username,
passwordPrompt: func() string {
return password
},
}
}
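// PasswordPrompt returns Credentials that invoke the given prompt function to obtain
// the password the first time the private key is needed.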
func PasswordPrompt(username string, prompt func() string) Credentials {
return &credentials{
username: username,
passwordPrompt: prompt,
}
}
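// Key returns Credentials based on a raw 32-byte private key.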
func Key(username string, key []byte) (Credentials, error) {
pk, err := newPrivateKey(key)
if err != nil {
return nil, err
}
return &credentials{
username: username,
privateKey: pk,
}, nil
}
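// KeyFromFile returns Credentials with the private key read from the given file.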
func KeyFromFile(username string, fileName string) (Credentials, error) {
k, err := ioutil.ReadFile(fileName)
if err != nil {
return nil, err
}
return Key(username, k)
}
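A minimal usage sketch (import path as in this repository; the expected output is the first test vector from the test file below):

package main

import (
	"fmt"

	"github.com/kopia/kopia/auth"
)

func main() {
	// The key is derived lazily on first use: PBKDF2 with 10000 rounds of SHA-256,
	// using the username as the salt.
	creds := auth.Password("foo", "bar")
	fmt.Printf("%x\n", creds.PrivateKey().Bytes())
	// Output: 60d6051cfbff0f53344ff64cd9770c65747ced5c541748b7f992cf575bffa2ad
}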

auth/credentials_test.go (new file, 31 lines)

@@ -0,0 +1,31 @@
package auth
import (
"bytes"
"encoding/hex"
"testing"
)
func TestCredentials(t *testing.T) {
cases := []struct {
username string
password string
expectedKey string
}{
{"foo", "bar", "60d6051cfbff0f53344ff64cd9770c65747ced5c541748b7f992cf575bffa2ad"},
{"user", "bar", "fff2b04b391c1a31a41dab88843311ce7f93393ec97fb8a1be3697c5a88b85ca"},
}
for i, c := range cases {
creds := Password(c.username, c.password)
if u := creds.Username(); u != c.username {
t.Errorf("invalid username #%v: %v expected %v", i, u, c.username)
}
expectedKeyBytes, _ := hex.DecodeString(c.expectedKey)
if v := creds.PrivateKey(); !bytes.Equal(expectedKeyBytes, v.Bytes()) {
t.Errorf("invalid key #%v: expected %x, got: %x", i, expectedKeyBytes, v.Bytes())
}
}
}

auth/keys.go (new file, 43 lines)

@@ -0,0 +1,43 @@
package auth
import (
"errors"
"golang.org/x/crypto/curve25519"
)
const (
pbkdf2Rounds = 10000
masterKeySize = 32
)
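// UserPrivateKey is a Curve25519 private key, either derived from a password or supplied directly.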
type UserPrivateKey struct {
key [32]byte
}
func (prv UserPrivateKey) Bytes() []byte {
r := make([]byte, 32)
copy(r, prv.key[:])
return r
}
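// PublicKey returns the Curve25519 public key corresponding to this private key.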
func (prv UserPrivateKey) PublicKey() *UserPublicKey {
pub := &UserPublicKey{}
curve25519.ScalarBaseMult(&pub.key, &prv.key)
return pub
}
func newPrivateKey(key []byte) (*UserPrivateKey, error) {
if len(key) != 32 {
return nil, errors.New("unsupported key length")
}
k := &UserPrivateKey{}
copy(k.key[:], key)
return k, nil
}
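// UserPublicKey is a Curve25519 public key.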
type UserPublicKey struct {
key [32]byte
}

cas/buffer_manager.go (new file, 69 lines)

@@ -0,0 +1,69 @@
package cas
import (
"bytes"
"io"
"log"
"sync"
"sync/atomic"
)
// bufferManager manages a pool of reusable bytes.Buffer objects.
type bufferManager struct {
outstandingCount int32
pool sync.Pool
}
// newBuffer returns a new or reused bytes.Buffer.
func (mgr *bufferManager) newBuffer() *bytes.Buffer {
atomic.AddInt32(&mgr.outstandingCount, 1)
b := mgr.pool.Get().(*bytes.Buffer)
b.Reset()
return b
}
// returnBuffer returns the given buffer to the pool.
func (mgr *bufferManager) returnBuffer(b *bytes.Buffer) {
atomic.AddInt32(&mgr.outstandingCount, -1)
mgr.pool.Put(b)
}
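// returnBufferOnClose wraps the buffer in an io.ReadCloser that returns the buffer to the pool when closed.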
func (mgr *bufferManager) returnBufferOnClose(b *bytes.Buffer) io.ReadCloser {
return &returnOnCloser{
buffer: b,
mgr: mgr,
}
}
func (mgr *bufferManager) close() {
if atomic.LoadInt32(&mgr.outstandingCount) != 0 {
log.Println("WARNING: Found buffer leaks.")
}
}
type returnOnCloser struct {
buffer *bytes.Buffer
mgr *bufferManager
}
func (roc *returnOnCloser) Read(b []byte) (int, error) {
return roc.buffer.Read(b)
}
func (roc *returnOnCloser) Close() error {
roc.mgr.returnBuffer(roc.buffer)
return nil
}
func newBufferManager(blockSize int) *bufferManager {
mgr := &bufferManager{}
mgr.pool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, blockSize))
},
}
return mgr
}

cas/buffer_manager_test.go (new file, 49 lines)

@@ -0,0 +1,49 @@
package cas
import (
"bytes"
"testing"
)
func TestBufferManager(t *testing.T) {
mgr := newBufferManager(10)
defer mgr.close()
verifyBufferClean := func(b *bytes.Buffer) {
if b.Cap() != 10 {
t.Errorf("unexpected cap: %v", b.Cap())
}
if b.Len() != 0 {
t.Errorf("unexpected len: %v", b.Len())
}
}
b := mgr.newBuffer()
if mgr.outstandingCount != 1 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
b1 := mgr.newBuffer()
verifyBufferClean(b)
verifyBufferClean(b1)
if mgr.outstandingCount != 2 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
closer := mgr.returnBufferOnClose(b)
closer.Close()
if mgr.outstandingCount != 1 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
mgr.returnBuffer(b)
if mgr.outstandingCount != 0 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
b2 := mgr.newBuffer()
if mgr.outstandingCount != 1 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
verifyBufferClean(b2)
mgr.returnBuffer(b2)
if mgr.outstandingCount != 0 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
}

cas/crypto_stream.go (new file, 91 lines)

@@ -0,0 +1,91 @@
package cas
import (
"bytes"
"crypto/cipher"
"hash"
"io"
)
// encryptingReader wraps an io.Reader and emits a plaintext header, the source data
// encrypted with a stream cipher, and finally a checksum computed over everything emitted before it.
type encryptingReader struct {
source io.Reader
header io.Reader
checksum io.Reader
closer io.Closer
cipher cipher.Stream
hash hash.Hash
}
func (er *encryptingReader) Read(b []byte) (int, error) {
read := 0
for len(b) > 0 {
switch {
case er.header != nil:
n, err := er.header.Read(b)
er.addToChecksum(b[0:n])
read += n
b = b[n:]
if err == io.EOF {
er.header = nil
} else if err != nil {
return read, err
}
case er.source != nil:
n, err := er.source.Read(b)
got := b[0:n]
er.cipher.XORKeyStream(got, got)
er.addToChecksum(got)
read += n
b = b[n:]
if err == io.EOF {
er.source = nil
er.checksum = bytes.NewReader(er.hash.Sum(nil))
} else if err != nil {
return read, err
}
case er.checksum != nil:
n, err := er.checksum.Read(b)
b = b[n:]
read += n
if err == io.EOF {
er.checksum = nil
}
if err != nil {
return read, err
}
default:
return read, io.EOF
}
}
return read, nil
}
func (er *encryptingReader) addToChecksum(b []byte) {
n, err := er.hash.Write(b)
if err != nil || n != len(b) {
panic("unexpected hashing error")
}
}
func (er *encryptingReader) Close() error {
if er.closer != nil {
return er.closer.Close()
}
return nil
}
func newEncryptingReader(source io.ReadCloser, header []byte, c cipher.Stream, hash hash.Hash) io.ReadCloser {
return &encryptingReader{
source: source,
header: bytes.NewReader(header),
cipher: c,
hash: hash,
closer: source,
}
}
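A sketch of how this reader is wired up, mirroring the AES-CTR construction in cas/formatter.go below. The key and validation key are illustrative placeholders, and the code is written as if it lived inside package cas, since newEncryptingReader is unexported:

package cas

import (
	"bytes"
	"crypto/aes"
	"crypto/cipher"
	"crypto/hmac"
	"crypto/rand"
	"crypto/sha256"
	"io/ioutil"
)

func exampleEncryptingReader() ([]byte, error) {
	key := make([]byte, 32) // placeholder AES-256 key
	var iv [aes.BlockSize]byte
	if _, err := rand.Read(iv[:]); err != nil {
		return nil, err
	}
	blockCipher, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	r := newEncryptingReader(
		ioutil.NopCloser(bytes.NewReader([]byte("payload"))),
		iv[:], // emitted verbatim as the stream header
		cipher.NewCTR(blockCipher, iv[:]),
		hmac.New(sha256.New, []byte("placeholder-validation-key")))
	defer r.Close()
	// The result is header || CTR(payload) || HMAC over both.
	return ioutil.ReadAll(r)
}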

cas/crypto_stream_test.go (new file, 103 lines)

@@ -0,0 +1,103 @@
package cas
import (
"bytes"
"crypto/cipher"
"crypto/hmac"
"crypto/md5"
"crypto/sha1"
"encoding/hex"
"fmt"
"hash"
"io/ioutil"
"testing"
)
type fakeStreamCipher struct {
xor byte
}
func (nsc *fakeStreamCipher) XORKeyStream(dst, src []byte) {
for i := 0; i < len(src); i++ {
dst[i] = src[i] ^ nsc.xor
}
}
func TestCryptoStream(t *testing.T) {
for _, s := range []struct {
cipher cipher.Stream
hash func() hash.Hash
header string
data string
expected string
}{
{
cipher: &fakeStreamCipher{},
hash: sha1.New,
header: "",
data: "",
expected: "da39a3ee5e6b4b0d3255bfef95601890afd80709", // SHA1 of empty string
},
{
cipher: &fakeStreamCipher{},
hash: md5.New,
header: "",
data: "",
expected: "d41d8cd98f00b204e9800998ecf8427e", // MD5 of empty string
},
{
cipher: &fakeStreamCipher{},
hash: sha1.New,
header: "0000",
data: "",
expected: "00001489f923c4dca729178b3e3233458550d8dddf29",
},
{
cipher: &fakeStreamCipher{},
hash: sha1.New,
header: "1234",
data: "",
expected: "1234ffa76d854a2969e7b9d83868d455512fce0fd74d",
},
{
cipher: &fakeStreamCipher{},
hash: sha1.New,
header: "deadbeef",
data: "cafeb1bab3c0",
expected: "deadbeefcafeb1bab3c00b01e595963c80cee1e04a6c1079dc2e186a553f",
},
{
cipher: &fakeStreamCipher{0},
hash: func() hash.Hash { return hmac.New(sha1.New, []byte{1, 2, 3}) },
header: "deadbeef",
data: "cafeb1bab3c0",
expected: "deadbeefcafeb1bab3c0a5c88d0104f5fafc8ee629002104199b523665e5",
},
} {
header, err := hex.DecodeString(s.header)
if err != nil {
t.Errorf("error decoding IV: %v", err)
continue
}
data, err := hex.DecodeString(s.data)
if err != nil {
t.Errorf("error decoding data: %v", err)
continue
}
enc := newEncryptingReader(
ioutil.NopCloser(bytes.NewReader(data)),
header,
s.cipher,
s.hash())
v, err := ioutil.ReadAll(enc)
actual := fmt.Sprintf("%x", v)
if err != nil {
t.Errorf("expected %v got error: %v", s.expected, err)
}
if actual != s.expected {
t.Errorf("expected %v got: %v", s.expected, actual)
}
}
}

cas/format.go (new file, 9 lines)

@@ -0,0 +1,9 @@
package cas
// Format describes the format of object data.
type Format struct {
Algorithm string `json:"algorithm"`
Secret []byte `json:"secret,omitempty"`
MaxInlineBlobSize int `json:"maxInlineBlobSize"`
MaxBlobSize int `json:"maxBlobSize"`
}
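For reference, a Format like the one used by the tests below (md5, 20-byte inline limit, 200-byte blobs) serializes to:

{"algorithm":"md5","maxInlineBlobSize":20,"maxBlobSize":200}

Secret is dropped by omitempty when unset.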

cas/formatter.go (new file, 69 lines)

@@ -0,0 +1,69 @@
package cas
import (
"crypto/aes"
"crypto/cipher"
"crypto/hmac"
"crypto/rand"
"crypto/sha256"
"crypto/sha512"
"encoding/hex"
"hash"
"io"
"github.com/kopia/kopia/content"
)
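// streamTransformer post-processes the stream of object bytes (for example, by encrypting it)
// before it reaches storage.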
type streamTransformer func(io.ReadCloser) io.ReadCloser
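// objectFormatter computes the object ID for the given content and returns the
// transformation to apply to the stored stream.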
type objectFormatter interface {
Do(b []byte, prefix string) (content.ObjectID, streamTransformer)
}
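// nonEncryptingFormatter derives the object ID by hashing the content and stores the stream unmodified.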
type nonEncryptingFormatter struct {
hash func() hash.Hash
}
func (f *nonEncryptingFormatter) Do(b []byte, prefix string) (content.ObjectID, streamTransformer) {
h := f.hash()
h.Write(b)
blockID := hex.EncodeToString(h.Sum(nil))
return content.ObjectID(prefix + blockID), func(r io.ReadCloser) io.ReadCloser { return r }
}
func newNonEncryptingFormatter(hash func() hash.Hash) objectFormatter {
return &nonEncryptingFormatter{
hash: hash,
}
}
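// aesEncryptingFormatter derives both the object ID and the AES key from an HMAC-SHA512
// of the content, so identical content encrypts to identical blocks (convergent encryption).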
type aesEncryptingFormatter struct {
masterContentSecret []byte
}
func (f *aesEncryptingFormatter) Do(b []byte, prefix string) (content.ObjectID, streamTransformer) {
// Compute HMAC-SHA512 of the content
s := hmac.New(sha512.New, f.masterContentSecret)
s.Write(b)
contentHash := s.Sum(nil)
// Split the hash into two portions - encryption key and content ID.
aesKey := contentHash[0:32]
return content.ObjectID(prefix + hex.EncodeToString(contentHash[32:64]) + ".e"),
func(r io.ReadCloser) io.ReadCloser {
var iv [aes.BlockSize]byte
if _, err := rand.Read(iv[:]); err != nil {
	panic(err) // lack of system entropy is not recoverable here
}
validationKey := []byte{1, 2, 3, 4} // placeholder validation key
blockCipher, err := aes.NewCipher(aesKey)
if err != nil {
	panic(err) // aesKey is always 32 bytes, so this cannot fail
}
ctr := cipher.NewCTR(blockCipher, iv[:])
return newEncryptingReader(r, iv[:], ctr, hmac.New(sha256.New, validationKey))
}
}
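A quick sketch of the non-encrypting path (again as if inside package cas): the object ID is simply the prefix followed by the hex digest, and the stream transformer is the identity:

package cas

import (
	"crypto/sha256"
	"fmt"
)

func exampleNonEncryptingFormatter() {
	f := newNonEncryptingFormatter(sha256.New)
	oid, transform := f.Do([]byte("hello world"), "C")
	fmt.Println(oid) // Cb94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9
	_ = transform    // identity transform - returns the reader unchanged
}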

cas/object_manager.go (new file, 187 lines)

@@ -0,0 +1,187 @@
package cas
import (
"crypto/hmac"
"crypto/md5"
"crypto/sha1"
"crypto/sha256"
"crypto/sha512"
"fmt"
"hash"
"io"
"strings"
"github.com/kopia/kopia/content"
"github.com/kopia/kopia/storage"
)
// ObjectManager manages objects stored in a repository and allows reading and writing them.
type ObjectManager interface {
// NewWriter opens an ObjectWriter for writing new content to the repository.
NewWriter(options ...WriterOption) ObjectWriter
// Open creates an io.ReadSeeker for reading object with a specified ID.
Open(objectID content.ObjectID) (io.ReadSeeker, error)
// Flush waits for all pending writes to complete.
Flush() error
// Repository returns the storage.Repository that backs this ObjectManager.
Repository() storage.Repository
// Close flushes pending writes and releases resources.
Close()
}
type objectManager struct {
repository storage.Repository
verbose bool
formatter objectFormatter
bufferManager *bufferManager
maxInlineBlobSize int
maxBlobSize int
}
func (mgr *objectManager) Close() {
mgr.Flush()
mgr.bufferManager.close()
}
func (mgr *objectManager) Flush() error {
return mgr.repository.Flush()
}
func (mgr *objectManager) Repository() storage.Repository {
return mgr.repository
}
func (mgr *objectManager) NewWriter(options ...WriterOption) ObjectWriter {
result := newObjectWriter(
objectWriterConfig{
mgr: mgr,
putOptions: storage.PutOptions{},
},
content.ObjectIDTypeStored)
for _, option := range options {
option(result)
}
return result
}
func (mgr *objectManager) Open(objectID content.ObjectID) (io.ReadSeeker, error) {
r, err := mgr.newRawReader(objectID)
if err != nil {
return nil, err
}
if objectID.Type() == content.ObjectIDTypeList {
seekTable := make([]seekTableEntry, 0, 100)
seekTable, err = mgr.flattenListChunk(seekTable, objectID, r)
if err != nil {
return nil, err
}
totalLength := seekTable[len(seekTable)-1].endOffset()
return &objectReader{
repository: mgr.repository,
seekTable: seekTable,
totalLength: totalLength,
}, nil
}
return r, err
}
// ObjectManagerOption controls the behavior of ObjectManager.
type ObjectManagerOption func(o *objectManager) error
// WriteBack is an ObjectManagerOption that enables asynchronous writes to the repository using a pool
// of goroutines.
func WriteBack(workerCount int) ObjectManagerOption {
return func(o *objectManager) error {
o.repository = storage.NewWriteBackWrapper(o.repository, workerCount)
return nil
}
}
// WriteLimit is an ObjectManagerOption that sets the limit on the number of bytes that can be written
// to the repository in this ObjectManager session. Once the limit is reached, the repository will
// return ErrWriteLimitExceeded.
func WriteLimit(maxBytes int64) ObjectManagerOption {
return func(o *objectManager) error {
o.repository = storage.NewWriteLimitWrapper(o.repository, maxBytes)
return nil
}
}
// EnableLogging is an ObjectManagerOption that causes all repository access to be logged.
func EnableLogging() ObjectManagerOption {
return func(o *objectManager) error {
o.repository = storage.NewLoggingWrapper(o.repository)
return nil
}
}
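// MaxInlineBlobSize is an ObjectManagerOption that sets the maximum size of an object
// that will be stored inline in its object ID rather than in the repository.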
func MaxInlineBlobSize(sizeBytes int) ObjectManagerOption {
return func(o *objectManager) error {
o.maxInlineBlobSize = sizeBytes
return nil
}
}
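// MaxBlobSize is an ObjectManagerOption that sets the maximum size of a single block
// written to the repository; larger objects are split into list chunks.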
func MaxBlobSize(sizeBytes int) ObjectManagerOption {
return func(o *objectManager) error {
o.maxBlobSize = sizeBytes
return nil
}
}
// NewObjectManager creates an ObjectManager backed by the specified repository, using the given Format and options.
func NewObjectManager(
r storage.Repository,
f Format,
options ...ObjectManagerOption,
) (ObjectManager, error) {
mgr := &objectManager{
repository: r,
maxInlineBlobSize: f.MaxInlineBlobSize,
maxBlobSize: f.MaxBlobSize,
}
var hashFunc func() hash.Hash
hashAlgo := f.Algorithm
hf := strings.TrimPrefix(hashAlgo, "hmac-")
switch hf {
case "md5":
hashFunc = md5.New
case "sha1":
hashFunc = sha1.New
case "sha256":
hashFunc = sha256.New
case "sha512":
hashFunc = sha512.New
default:
return nil, fmt.Errorf("unknown hash function: %v", hf)
}
if strings.HasPrefix(hashAlgo, "hmac-") {
rawHashFunc := hashFunc
hashFunc = func() hash.Hash {
return hmac.New(rawHashFunc, f.Secret)
}
}
mgr.formatter = newNonEncryptingFormatter(hashFunc)
for _, o := range options {
if err := o(mgr); err != nil {
mgr.Close()
return nil, err
}
}
mgr.bufferManager = newBufferManager(mgr.maxBlobSize)
return mgr, nil
}
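An end-to-end sketch using the in-memory repository from the tests below (import paths as in this repository):

package main

import (
	"fmt"
	"io/ioutil"

	"github.com/kopia/kopia/cas"
	"github.com/kopia/kopia/storage"
)

func main() {
	data := map[string][]byte{}
	mgr, err := cas.NewObjectManager(storage.NewMapRepository(data), cas.Format{
		Algorithm:         "sha256",
		MaxInlineBlobSize: 20,
		MaxBlobSize:       200,
	})
	if err != nil {
		panic(err)
	}
	defer mgr.Close()

	w := mgr.NewWriter(cas.WithDescription("example"))
	w.Write([]byte("hello world")) // 11 bytes - small enough to be stored inline
	oid, err := w.Result(false)
	if err != nil {
		panic(err)
	}
	w.Close()

	r, err := mgr.Open(oid)
	if err != nil {
		panic(err)
	}
	b, _ := ioutil.ReadAll(r)
	fmt.Printf("%v => %s\n", oid, b) // inline objects come back without touching storage
}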

cas/object_manager_test.go (new file, 352 lines)

@@ -0,0 +1,352 @@
package cas
import (
"bytes"
"crypto/md5"
cryptorand "crypto/rand"
"encoding/hex"
"fmt"
"io/ioutil"
"math/rand"
"reflect"
"strings"
"testing"
"github.com/kopia/kopia/content"
"github.com/kopia/kopia/storage"
)
func testFormat() Format {
return Format{
MaxBlobSize: 200,
MaxInlineBlobSize: 20,
Algorithm: "md5",
}
}
func getMd5Digest(data []byte) string {
hash := md5.New()
hash.Write(data)
return hex.EncodeToString(hash.Sum(nil))
}
func getMd5CObjectID(data []byte) string {
return fmt.Sprintf("C%s", getMd5Digest(data))
}
func getMd5LObjectID(data []byte) string {
return fmt.Sprintf("L%s", getMd5Digest(data))
}
func setupTest(t *testing.T) (data map[string][]byte, mgr ObjectManager) {
data = map[string][]byte{}
st := storage.NewMapRepository(data)
mgr, err := NewObjectManager(st, testFormat())
if err != nil {
t.Errorf("cannot create manager: %v", err)
}
return
}
func TestWriters(t *testing.T) {
cases := []struct {
data []byte
objectID content.ObjectID
}{
{[]byte{}, "B"},
{[]byte("quick brown fox"), "Tquick brown fox"},
{[]byte{1, 2, 3, 4}, "BAQIDBA=="},
{[]byte{0xc2, 0x28}, "Bwig="}, // invalid UTF-8, will be represented as binary
{
[]byte("the quick brown fox jumps over the lazy dog"),
"CX77add1d5f41223d5582fca736a5cb335",
},
{make([]byte, 100), "CX6d0bb00954ceb7fbee436bb55a8397a9"}, // 100 zero bytes
}
for _, c := range cases {
data, mgr := setupTest(t)
writer := mgr.NewWriter(
WithBlockNamePrefix("X"),
)
writer.Write(c.data)
result, err := writer.Result(false)
if err != nil {
t.Errorf("error getting writer results for %v, expected: %v", c.data, c.objectID)
continue
}
if result != c.objectID {
t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID, result)
}
if !c.objectID.Type().IsStored() {
if len(data) != 0 {
t.Errorf("unexpected data written to the repository: %v", data)
}
} else {
if len(data) != 1 {
t.Errorf("unexpected data written to the repository: %v", data)
}
}
}
}
func TestWriterCompleteChunkInTwoWrites(t *testing.T) {
_, mgr := setupTest(t)
bytes := make([]byte, 100)
writer := mgr.NewWriter(
WithBlockNamePrefix("X"),
)
writer.Write(bytes[0:50])
writer.Write(bytes[0:50])
result, err := writer.Result(false)
if string(result) != "CX6d0bb00954ceb7fbee436bb55a8397a9" {
t.Errorf("unexpected result: %v err: %v", result, err)
}
}
func TestWriterListChunk(t *testing.T) {
data, mgr := setupTest(t)
contentBytes := make([]byte, 250)
contentMd5Sum200 := getMd5Digest(contentBytes[0:200]) // hash of 200 zero bytes
contentMd5Sum50 := getMd5Digest(contentBytes[200:250]) // hash of 50 zero bytes
listChunkContent := []byte("200,C" + contentMd5Sum200 + "\n50,C" + contentMd5Sum50 + "\n")
listChunkObjectID := getMd5LObjectID(listChunkContent)
writer := mgr.NewWriter()
writer.Write(contentBytes)
result, err := writer.Result(false)
if err != nil {
t.Errorf("error getting writer results: %v", err)
}
if string(result) != listChunkObjectID {
t.Errorf("incorrect list chunk ID: %v, expected: %v", result, listChunkObjectID)
}
// We have 3 chunks - 200 zero bytes, 50 zero bytes and the list.
if !reflect.DeepEqual(data, map[string][]byte{
contentMd5Sum200: contentBytes[0:200],
contentMd5Sum50: contentBytes[0:50],
getMd5Digest(listChunkContent): listChunkContent,
}) {
t.Errorf("invalid repository contents: %v", data)
}
}
func TestWriterListOfListsChunk(t *testing.T) {
data, mgr := setupTest(t)
contentBytes := make([]byte, 1400)
chunk1Id := getMd5CObjectID(contentBytes[0:200]) // hash of 200 zero bytes
list1ChunkContent := []byte(strings.Repeat("200,"+chunk1Id+"\n", 5))
list1ObjectID := fmt.Sprintf("%v,%v", len(list1ChunkContent), getMd5LObjectID(list1ChunkContent))
list2ChunkContent := []byte(strings.Repeat("200,"+chunk1Id+"\n", 2))
list2ObjectID := fmt.Sprintf("%v,%v", len(list2ChunkContent), getMd5LObjectID(list2ChunkContent))
listOfListsChunkContent := []byte(list1ObjectID + "\n" + list2ObjectID + "\n")
listOfListsObjectID := getMd5LObjectID(listOfListsChunkContent)
writer := mgr.NewWriter()
writer.Write(contentBytes)
result, err := writer.Result(false)
if string(result) != listOfListsObjectID || err != nil {
t.Errorf("unexpected result: %v expected: %v, err: %v", result, listOfListsObjectID, err)
}
// We have 4 chunks - 200 zero bytes, 2 lists, and 1 list-of-lists.
if !reflect.DeepEqual(data, map[string][]byte{
getMd5Digest(contentBytes[0:200]): contentBytes[0:200],
getMd5Digest(list1ChunkContent): list1ChunkContent,
getMd5Digest(list2ChunkContent): list2ChunkContent,
getMd5Digest(listOfListsChunkContent): listOfListsChunkContent,
}) {
t.Errorf("invalid repository contents: %v", data)
}
}
func TestWriterListOfListsOfListsChunk(t *testing.T) {
data, mgr := setupTest(t)
writtenData := make([]byte, 10000)
chunk1Id := getMd5CObjectID(writtenData[0:200]) // hash of 200 zero bytes
// First level list chunk has 5 C[] chunk IDs, because that's how many IDs fit in one chunk.
// (that number 200 was chosen for a reason, to make testing easy)
//
// 200,Cfbaf48ec981a5eecdb57b929fdd426e8\n
// 200,Cfbaf48ec981a5eecdb57b929fdd426e8\n
// 200,Cfbaf48ec981a5eecdb57b929fdd426e8\n
// 200,Cfbaf48ec981a5eecdb57b929fdd426e8\n
// 200,Cfbaf48ec981a5eecdb57b929fdd426e8\n
list1ChunkContent := []byte(strings.Repeat("200,"+chunk1Id+"\n", 5))
list1ObjectID := fmt.Sprintf("%v,%v", len(list1ChunkContent), getMd5LObjectID(list1ChunkContent))
// Second level lists L[] chunks from the first level. They have all the same content
// because all lists are identical.
// 190,L52760f658059fef754f5deabdd01df93\n
// 190,L52760f658059fef754f5deabdd01df93\n
// 190,L52760f658059fef754f5deabdd01df93\n
// 190,L52760f658059fef754f5deabdd01df93\n
// 190,L52760f658059fef754f5deabdd01df93\n
list2ChunkContent := []byte(strings.Repeat(string(list1ObjectID)+"\n", 5))
list2ObjectID := fmt.Sprintf("%v,%v", len(list2ChunkContent), getMd5LObjectID(list2ChunkContent))
// Now those lists are also identical and represent 5000 bytes each, so
// the top-level list-of-lists-of-lists will have 2 entries:
//
// 190,Lb99b28e34c87e4934b4cc5631bb38ee8\n
// 190,Lb99b28e34c87e4934b4cc5631bb38ee8\n
list3ChunkContent := []byte(strings.Repeat(string(list2ObjectID)+"\n", 2))
list3ObjectID := getMd5LObjectID(list3ChunkContent)
writer := mgr.NewWriter()
writer.Write(writtenData)
result, err := writer.Result(false)
if string(result) != list3ObjectID {
t.Errorf("unexpected result: %v err: %v", result, err)
}
// We have 4 data blocks representing 10000 bytes of zero. Not bad!
if !reflect.DeepEqual(data, map[string][]byte{
getMd5Digest(writtenData[0:200]): writtenData[0:200],
getMd5Digest(list1ChunkContent): list1ChunkContent,
getMd5Digest(list2ChunkContent): list2ChunkContent,
getMd5Digest(list3ChunkContent): list3ChunkContent,
}) {
t.Errorf("invalid repository contents: %v", data)
}
}
func TestEncryption(t *testing.T) {
data := map[string][]byte{}
content := bytes.Repeat([]byte{0xcd}, 50)
s := testFormat()
s.Algorithm = "hmac-md5"
s.Secret = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25}
mgr, err := NewObjectManager(storage.NewMapRepository(data), s)
if err != nil {
t.Errorf("cannot create manager: %v", err)
}
w := mgr.NewWriter()
w.Write(content)
result, err := w.Result(false)
if string(result) != "C697eaf0aca3a3aea3a75164746ffaa79" {
t.Errorf("unexpected result: %v err: %v", result, err)
}
}
func TestReader(t *testing.T) {
data, mgr := setupTest(t)
storedPayload := []byte("foo\nbar")
data["abcdef"] = storedPayload
cases := []struct {
text string
payload []byte
}{
{"B", []byte{}},
{"BAQIDBA==", []byte{1, 2, 3, 4}},
{"T", []byte{}},
{"Tfoo\nbar", []byte("foo\nbar")},
{"Cabcdef", storedPayload},
}
for _, c := range cases {
objectID, err := content.ParseObjectID(c.text)
if err != nil {
t.Errorf("cannot parse object ID: %v", err)
continue
}
reader, err := mgr.Open(objectID)
if err != nil {
t.Errorf("cannot create reader for %v: %v", objectID, err)
continue
}
d, err := ioutil.ReadAll(reader)
if err != nil {
t.Errorf("cannot read all data for %v: %v", objectID, err)
continue
}
if !bytes.Equal(d, c.payload) {
t.Errorf("incorrect payload for %v: expected: %v got: %v", objectID, c.payload, d)
continue
}
}
}
func TestReaderStoredBlockNotFound(t *testing.T) {
_, mgr := setupTest(t)
objectID, err := content.ParseObjectID("Cno-such-block")
if err != nil {
t.Errorf("cannot parse object ID: %v", err)
}
reader, err := mgr.Open(objectID)
if err != storage.ErrBlockNotFound || reader != nil {
t.Errorf("unexpected result: reader: %v err: %v", reader, err)
}
}
func TestEndToEndReadAndSeek(t *testing.T) {
_, mgr := setupTest(t)
for _, forceStored := range []bool{false, true} {
for _, size := range []int{1, 199, 200, 201, 9999, 512434} {
// Create some random data sample of the specified size.
randomData := make([]byte, size)
cryptorand.Read(randomData)
writer := mgr.NewWriter(
WithBlockNamePrefix("X"),
)
writer.Write(randomData)
objectID, err := writer.Result(forceStored)
if err != nil {
t.Errorf("cannot get writer result for %v/%v: %v", forceStored, size, err)
continue
}
reader, err := mgr.Open(objectID)
if err != nil {
t.Errorf("cannot get reader for %v/%v: %v", forceStored, size, err)
continue
}
for i := 0; i < 20; i++ {
sampleSize := int(rand.Int31n(300))
seekOffset := int(rand.Int31n(int32(len(randomData))))
if seekOffset+sampleSize > len(randomData) {
sampleSize = len(randomData) - seekOffset
}
if sampleSize > 0 {
got := make([]byte, sampleSize)
reader.Seek(int64(seekOffset), 0)
reader.Read(got)
expected := randomData[seekOffset : seekOffset+sampleSize]
if !bytes.Equal(expected, got) {
t.Errorf("incorrect data read for %v/%v: expected: %v, got: %v", forceStored, size, expected, got)
}
}
}
}
}
}

cas/object_reader.go (new file, 253 lines)

@@ -0,0 +1,253 @@
package cas
import (
"bufio"
"bytes"
"fmt"
"io"
"path/filepath"
"strconv"
"strings"
"github.com/kopia/kopia/content"
"github.com/kopia/kopia/storage"
)
type repositoryReader interface {
GetBlock(id storage.BlockID) ([]byte, error)
}
type seekTableEntry struct {
startOffset int64
length int64
blockID storage.BlockID
}
func (r *seekTableEntry) endOffset() int64 {
return r.startOffset + int64(r.length)
}
func (r *seekTableEntry) String() string {
return fmt.Sprintf("start: %d len: %d end: %d block: %s",
r.startOffset,
r.length,
r.endOffset(),
r.blockID)
}
type objectReader struct {
repository repositoryReader
seekTable []seekTableEntry
currentPosition int64 // Overall position in the objectReader
totalLength int64 // Overall length
currentChunkIndex int // Index of current chunk in the seek table
currentChunkData []byte // Current chunk data
currentChunkPosition int // Read position in the current chunk
}
func (r *objectReader) Read(buffer []byte) (int, error) {
readBytes := 0
remaining := len(buffer)
for remaining > 0 {
if r.currentChunkData != nil {
toCopy := len(r.currentChunkData) - r.currentChunkPosition
if toCopy == 0 {
// EOF on current chunk
r.closeCurrentChunk()
r.currentChunkIndex++
continue
}
if toCopy > remaining {
toCopy = remaining
}
copy(buffer[readBytes:],
r.currentChunkData[r.currentChunkPosition:r.currentChunkPosition+toCopy])
r.currentChunkPosition += toCopy
r.currentPosition += int64(toCopy)
readBytes += toCopy
remaining -= toCopy
} else if r.currentChunkIndex < len(r.seekTable) {
err := r.openCurrentChunk()
if err != nil {
return 0, err
}
} else {
break
}
}
if readBytes == 0 {
return readBytes, io.EOF
}
return readBytes, nil
}
func (r *objectReader) openCurrentChunk() error {
blockID := r.seekTable[r.currentChunkIndex].blockID
blockData, err := r.repository.GetBlock(blockID)
if err != nil {
return err
}
r.currentChunkData = blockData
r.currentChunkPosition = 0
return nil
}
func (r *objectReader) closeCurrentChunk() {
r.currentChunkData = nil
}
func (r *objectReader) findChunkIndexForOffset(offset int64) int {
left := 0
right := len(r.seekTable) - 1
for left <= right {
middle := (left + right) / 2
if offset < r.seekTable[middle].startOffset {
right = middle - 1
continue
}
if offset >= r.seekTable[middle].endOffset() {
left = middle + 1
continue
}
return middle
}
panic("Unreachable code")
}
func (r *objectReader) Seek(offset int64, whence int) (int64, error) {
if whence == 1 {
return r.Seek(r.currentPosition+offset, 0)
}
if whence == 2 {
return r.Seek(r.totalLength+offset, 0)
}
if offset < 0 {
return -1, fmt.Errorf("invalid seek offset: %v", offset)
}
if offset >= r.totalLength {
	// Clamp to EOF; no chunk contains this offset, so skip the seek-table lookup
	// (which would otherwise panic in findChunkIndexForOffset).
	r.closeCurrentChunk()
	r.currentChunkIndex = len(r.seekTable)
	r.currentPosition = r.totalLength
	return r.currentPosition, nil
}
index := r.findChunkIndexForOffset(offset)
chunkStartOffset := r.seekTable[index].startOffset
if index != r.currentChunkIndex {
r.closeCurrentChunk()
r.currentChunkIndex = index
}
if r.currentChunkData == nil {
	if err := r.openCurrentChunk(); err != nil {
		return -1, err
	}
}
r.currentChunkPosition = int(offset - chunkStartOffset)
r.currentPosition = offset
return r.currentPosition, nil
}
func (mgr *objectManager) newRawReader(objectID content.ObjectID) (io.ReadSeeker, error) {
inline := objectID.InlineData()
if inline != nil {
return bytes.NewReader(inline), nil
}
blockID := objectID.BlockID()
ext := filepath.Ext(string(blockID))
payload, err := mgr.repository.GetBlock(blockID)
if err != nil {
return nil, err
}
switch ext {
case "":
// Unencrypted data
return bytes.NewReader(payload), nil
case ".e":
return mgr.decryptPayload(payload)
default:
return nil, fmt.Errorf("unsupported block format: %v", blockID)
}
}
func (mgr *objectManager) decryptPayload(b []byte) (io.ReadSeeker, error) {
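// Not implemented yet - encrypted blocks written by aesEncryptingFormatter cannot be read back in this commit.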
return nil, nil
}
func (mgr *objectManager) flattenListChunk(
seekTable []seekTableEntry,
listObjectID content.ObjectID,
rawReader io.Reader) ([]seekTableEntry, error) {
scanner := bufio.NewScanner(rawReader)
for scanner.Scan() {
c := scanner.Text()
comma := strings.Index(c, ",")
if comma <= 0 {
return nil, fmt.Errorf("unsupported entry '%v' in list '%s'", c, listObjectID)
}
length, err := strconv.ParseInt(c[0:comma], 10, 64)
if err != nil {
	return nil, fmt.Errorf("unsupported entry '%v' in list '%s': %v", c, listObjectID, err)
}
objectID, err := content.ParseObjectID(c[comma+1:])
if err != nil {
return nil, fmt.Errorf("unsupported entry '%v' in list '%s': %#v", c, listObjectID, err)
}
switch objectID.Type() {
case content.ObjectIDTypeList:
subreader, err := mgr.newRawReader(objectID)
if err != nil {
return nil, err
}
seekTable, err = mgr.flattenListChunk(seekTable, objectID, subreader)
if err != nil {
return nil, err
}
case content.ObjectIDTypeStored:
var startOffset int64
if len(seekTable) > 0 {
startOffset = seekTable[len(seekTable)-1].endOffset()
} else {
startOffset = 0
}
seekTable = append(
seekTable,
seekTableEntry{
blockID: objectID.BlockID(),
startOffset: startOffset,
length: length,
})
default:
return nil, fmt.Errorf("unsupported entry '%v' in list '%v'", objectID, listObjectID)
}
}
return seekTable, nil
}
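For illustration, a list chunk is just newline-separated "length,objectID" entries (the format written by flushBuffer in cas/object_writer.go below). A two-entry chunk such as

200,Cfbaf48ec981a5eecdb57b929fdd426e8
50,C<md5 of 50 zero bytes>

(the first ID is the MD5 of 200 zero bytes, quoted from the tests above) flattens into a seek table whose entries cover offsets [0,200) and [200,250); nested L-type entries are expanded recursively.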

cas/object_writer.go (new file, 204 lines)

@@ -0,0 +1,204 @@
package cas
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"unicode/utf8"
"github.com/kopia/kopia/content"
"github.com/kopia/kopia/storage"
)
type blockHasher interface {
hashBuffer(data []byte) string
}
// ObjectWriter allows writing content to the repository and supports automatic deduplication and encryption
// of written data.
type ObjectWriter interface {
io.WriteCloser
Result(forceStored bool) (content.ObjectID, error)
}
// objectWriterConfig holds state shared by all writers created by a single objectManager.
type objectWriterConfig struct {
mgr *objectManager
putOptions storage.PutOptions
}
type objectWriter struct {
objectWriterConfig
buffer *bytes.Buffer
totalLength int64
prefix string
listWriter *objectWriter
flushedObjectCount int
lastFlushedObject content.ObjectID
description string
objectType content.ObjectIDType
atomicWrites bool
}
func (w *objectWriter) Close() error {
if w.buffer != nil {
w.mgr.bufferManager.returnBuffer(w.buffer)
w.buffer = nil
}
if w.listWriter != nil {
w.listWriter.Close()
w.listWriter = nil
}
return nil
}
func (w *objectWriter) Write(data []byte) (n int, err error) {
	n = len(data) // report the full write length; data is re-sliced below
	remaining := len(data)
	w.totalLength += int64(remaining)
for remaining > 0 {
if w.buffer == nil {
w.buffer = w.mgr.bufferManager.newBuffer()
}
room := w.buffer.Cap() - w.buffer.Len()
if remaining <= room {
w.buffer.Write(data)
remaining = 0
} else {
if w.atomicWrites {
if w.buffer.Len() == 0 {
	// Fresh buffer and the data still does not fit - flushing would not help.
	return 0, fmt.Errorf("object writer buffer too small, need: %v, have: %v", remaining, room)
}
}
if err := w.flushBuffer(); err != nil {
return 0, err
}
continue
}
w.buffer.Write(data[0:room])
if err := w.flushBuffer(); err != nil {
return 0, err
}
data = data[room:]
remaining = len(data)
}
}
return n, nil
}
func (w *objectWriter) flushBuffer() error {
if w.buffer != nil {
data := w.buffer.Bytes()
length := w.buffer.Len()
b := w.mgr.bufferManager.returnBufferOnClose(w.buffer)
w.buffer = nil
objectID, transformer := w.mgr.formatter.Do(data, string(w.objectType)+w.prefix)
b = transformer(b)
if err := w.mgr.repository.PutBlock(objectID.BlockID(), b, storage.PutOptions{}); err != nil {
return fmt.Errorf(
"error when flushing chunk %d of %s to %#v: %#v",
w.flushedObjectCount,
w.description,
objectID.BlockID(),
err)
}
w.flushedObjectCount++
w.lastFlushedObject = objectID
if w.listWriter == nil {
w.listWriter = newObjectWriter(w.objectWriterConfig, content.ObjectIDTypeList)
w.listWriter.description = "LIST(" + w.description + ")"
w.listWriter.atomicWrites = true
}
fmt.Fprintf(w.listWriter, "%v,%v\n", length, objectID)
}
return nil
}
func newObjectWriter(cfg objectWriterConfig, objectType content.ObjectIDType) *objectWriter {
return &objectWriter{
objectWriterConfig: cfg,
objectType: objectType,
}
}
func (w *objectWriter) Result(forceStored bool) (content.ObjectID, error) {
if !forceStored && w.flushedObjectCount == 0 {
if w.buffer == nil {
return content.NewInlineBinaryObjectID(nil), nil
}
if w.buffer.Len() < w.mgr.maxInlineBlobSize {
data := w.buffer.Bytes()
if !utf8.Valid(data) {
return content.NewInlineBinaryObjectID(data), nil
}
// The data is valid UTF-8 here; compare the JSON (text) encoding against
// base64 (binary) and pick whichever representation is shorter.
dataString := string(data)
jsonData, _ := json.Marshal(dataString)
jsonLen := len(jsonData)
base64Len := base64.StdEncoding.EncodedLen(len(data))
if jsonLen < base64Len {
return content.NewInlineTextObjectID(dataString), nil
}
return content.NewInlineBinaryObjectID(data), nil
}
}
if err := w.flushBuffer(); err != nil {
	return content.NullObjectID, err
}
defer func() {
	if w.listWriter != nil {
		w.listWriter.Close()
	}
}()
if w.flushedObjectCount == 1 {
return w.lastFlushedObject, nil
} else if w.flushedObjectCount == 0 {
return content.NullObjectID, nil
} else {
return w.listWriter.Result(true)
}
}
// WriterOption is an option that can be passed to ObjectManager.NewWriter()
type WriterOption func(*objectWriter)
// WithBlockNamePrefix causes the ObjectWriter to prefix any blocks emitted to the repository with a given string.
func WithBlockNamePrefix(prefix string) WriterOption {
return func(w *objectWriter) {
w.prefix = prefix
}
}
// WithDescription is used for debugging only and causes the following string to be emitted in logs.
func WithDescription(description string) WriterOption {
return func(w *objectWriter) {
w.description = description
}
}
// WithPutOptions causes the ObjectWriter to use the specified options when writing blocks to the repository.
func WithPutOptions(options storage.PutOptions) WriterOption {
return func(w *objectWriter) {
w.putOptions = options
}
}

content/objectid.go (modified)

@@ -142,11 +142,11 @@ func (c ObjectID) EncryptionInfo() ObjectEncryptionInfo {
 	return NoEncryption
 }
-func newInlineBinaryObjectID(data []byte) ObjectID {
+func NewInlineBinaryObjectID(data []byte) ObjectID {
 	return ObjectID("B" + base64.StdEncoding.EncodeToString(data))
 }
-func newInlineTextObjectID(text string) ObjectID {
+func NewInlineTextObjectID(text string) ObjectID {
 	return ObjectID("T" + text)
 }