From 2e5b08a3b843fed4bcfaea985f72ef329b323591 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 25 Aug 2016 19:41:13 -0700 Subject: [PATCH] Added write-back support to Repository. --- Makefile | 4 ++ cmd/kopia/command_backup.go | 2 +- repo/buffer_manager.go | 3 -- repo/repository.go | 99 ++++++++++++++++++++++++++++++++----- repo/repository_test.go | 7 ++- 5 files changed, 96 insertions(+), 19 deletions(-) diff --git a/Makefile b/Makefile index 36ab70538..820a05e6a 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,10 @@ install: @echo Building version: $(BUILD_INFO) / $(BUILD_VERSION) go install -ldflags $(LDARGS) github.com/kopia/kopia/cmd/kopia +install-race: + @echo Building version: $(BUILD_INFO) / $(BUILD_VERSION) + go install -race -ldflags $(LDARGS) github.com/kopia/kopia/cmd/kopia + build: go build github.com/kopia/kopia/... diff --git a/cmd/kopia/command_backup.go b/cmd/kopia/command_backup.go index a1960030a..58d90828c 100644 --- a/cmd/kopia/command_backup.go +++ b/cmd/kopia/command_backup.go @@ -52,7 +52,7 @@ func runBackupCommand(context *kingpin.ParseContext) error { repoOptions = append(repoOptions, repo.WriteBack(*backupWriteBack)) } - vlt, r := mustOpenVaultAndRepository() + vlt, r := mustOpenVaultAndRepository(repoOptions...) defer r.Close() var options []fs.UploaderOption diff --git a/repo/buffer_manager.go b/repo/buffer_manager.go index 515c7d378..6e553ca49 100644 --- a/repo/buffer_manager.go +++ b/repo/buffer_manager.go @@ -19,8 +19,6 @@ type bufferManager struct { // newBuffer returns a new or reused bytes.Buffer. func (mgr *bufferManager) newBuffer() *bytes.Buffer { atomic.AddInt32(&mgr.outstandingCount, 1) - //debug.PrintStack() - //b := mgr.pool.Get().(*bytes.Buffer) b := mgr.pool.New().(*bytes.Buffer) b.Reset() return b @@ -28,7 +26,6 @@ func (mgr *bufferManager) newBuffer() *bytes.Buffer { // returnBuffer returns the give buffer to the pool func (mgr *bufferManager) returnBuffer(b *bytes.Buffer) { - //log.Printf("returning buffer") atomic.AddInt32(&mgr.outstandingCount, -1) mgr.pool.Put(b) } diff --git a/repo/repository.go b/repo/repository.go index 031a0c939..277d4b21a 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -3,12 +3,11 @@ import ( "bufio" "bytes" - "crypto/hmac" "fmt" - "hash" "io" "log" "strings" + "sync" "sync/atomic" "github.com/kopia/kopia/internal/jsonstream" @@ -65,7 +64,16 @@ type RepositoryStats struct { ValidBlocks int32 } -type keygenFunc func([]byte) (blockIDBytes []byte, key []byte) +type empty struct{} +type semaphore chan empty + +func (s semaphore) Lock() { + s <- empty{} +} + +func (s semaphore) Unlock() { + <-s +} type repository struct { storage storage.Storage @@ -76,10 +84,46 @@ type repository struct { format Format formatter ObjectFormatter - workerCount int + writeBackWorkers int + + writeBackSemaphore semaphore + writeBackErrors asyncErrors + + waitGroup sync.WaitGroup +} + +type asyncErrors struct { + sync.RWMutex + errors []error +} + +func (e *asyncErrors) add(err error) { + e.Lock() + e.errors = append(e.errors, err) + e.Unlock() +} + +func (e *asyncErrors) check() error { + e.RLock() + defer e.RUnlock() + + switch len(e.errors) { + case 0: + return nil + case 1: + return e.errors[0] + default: + msg := make([]string, len(e.errors)) + for i, err := range e.errors { + msg[i] = err.Error() + } + + return fmt.Errorf("%v errors: %v", len(e.errors), strings.Join(msg, ";")) + } } func (r *repository) Close() error { + r.Flush() if err := r.storage.Close(); err != nil { return err } @@ -89,6 +133,9 @@ func (r *repository) Close() error { } func (r *repository) Flush() error { + if r.writeBackWorkers > 0 { + r.waitGroup.Wait() + } return nil } @@ -117,6 +164,9 @@ func (r *repository) Open(objectID ObjectID) (ObjectReader, error) { // log.Printf("Repository::Open %v", objectID.String()) // defer log.Printf("finished Repository::Open() %v", objectID.String()) + // Flush any pending writes. + r.Flush() + if objectID.Section != nil { baseReader, err := r.Open(objectID.Section.Base) if err != nil { @@ -155,9 +205,9 @@ func (r *repository) Open(objectID ObjectID) (ObjectReader, error) { // WriteBack is an RepositoryOption that enables asynchronous writes to the storage using the pool // of goroutines. -func WriteBack(workerCount int) RepositoryOption { +func WriteBack(writeBackWorkers int) RepositoryOption { return func(o *repository) error { - o.workerCount = workerCount + o.writeBackWorkers = writeBackWorkers return nil } } @@ -170,12 +220,6 @@ func EnableLogging(options ...logging.Option) RepositoryOption { } } -func hmacFunc(key []byte, hf func() hash.Hash) func() hash.Hash { - return func() hash.Hash { - return hmac.New(hf, key) - } -} - // New creates a Repository with the specified storage, format and options. func New(s storage.Storage, f *Format, options ...RepositoryOption) (Repository, error) { if f.MaxBlockSize < 100 { @@ -203,6 +247,9 @@ func New(s storage.Storage, f *Format, options ...RepositoryOption) (Repository, } r.bufferManager = newBufferManager(int(r.format.MaxBlockSize)) + if r.writeBackWorkers > 0 { + r.writeBackSemaphore = make(semaphore, r.writeBackWorkers) + } return r, nil } @@ -225,6 +272,10 @@ func (r *repository) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, prefix } }() + if err := r.writeBackErrors.check(); err != nil { + return NullObjectID, err + } + // Hash the block and compute encryption key. blockID, encryptionKey := r.formatter.ComputeBlockIDAndKey(data, r.format.Secret) atomic.AddInt32(&r.stats.HashedBlocks, 1) @@ -255,7 +306,29 @@ func (r *repository) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, prefix } } - // Write the block + if r.writeBackWorkers > 0 { + // Tell the defer block not to return the buffer synchronously. + isAsync = true + + r.waitGroup.Add(1) + r.writeBackSemaphore.Lock() + go func() { + defer func() { + r.bufferManager.returnBuffer(buffer) + r.writeBackSemaphore.Unlock() + r.waitGroup.Done() + }() + + if err := r.storage.PutBlock(objectID.StorageBlock, data, storage.PutOptionsDefault); err != nil { + r.writeBackErrors.add(err) + } + }() + + // async will fail later. + return objectID, nil + } + + // Synchronous case if err := r.storage.PutBlock(objectID.StorageBlock, data, storage.PutOptionsDefault); err != nil { return NullObjectID, err } diff --git a/repo/repository_test.go b/repo/repository_test.go index 718641a72..a326b3b99 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -18,7 +18,7 @@ ) func init() { - //panicOnBufferLeaks = true + panicOnBufferLeaks = true } func testFormat() *Format { @@ -48,7 +48,7 @@ func setupTest(t *testing.T) (data map[string][]byte, repo Repository) { data = map[string][]byte{} st := storagetesting.NewMapStorage(data) - repo, err := New(st, testFormat()) + repo, err := New(st, testFormat(), WriteBack(5)) if err != nil { t.Errorf("cannot create manager: %v", err) } @@ -86,6 +86,8 @@ func TestWriters(t *testing.T) { continue } + repo.Flush() + if !objectIDsEqual(result, c.objectID) { t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID.UIString(), result) } @@ -174,6 +176,7 @@ func TestIndirection(t *testing.T) { writer := repo.NewWriter() writer.Write(contentBytes) result, err := writer.Result(false) + repo.Flush() if err != nil { t.Errorf("error getting writer results: %v", err) }