Added write-back support to Repository.

This commit is contained in:
Jarek Kowalski
2016-08-25 19:41:13 -07:00
parent 381d3780dc
commit 2e5b08a3b8
5 changed files with 96 additions and 19 deletions

View File

@@ -14,6 +14,10 @@ install:
@echo Building version: $(BUILD_INFO) / $(BUILD_VERSION)
go install -ldflags $(LDARGS) github.com/kopia/kopia/cmd/kopia
install-race:
@echo Building version: $(BUILD_INFO) / $(BUILD_VERSION)
go install -race -ldflags $(LDARGS) github.com/kopia/kopia/cmd/kopia
build:
go build github.com/kopia/kopia/...

View File

@@ -52,7 +52,7 @@ func runBackupCommand(context *kingpin.ParseContext) error {
repoOptions = append(repoOptions, repo.WriteBack(*backupWriteBack))
}
vlt, r := mustOpenVaultAndRepository()
vlt, r := mustOpenVaultAndRepository(repoOptions...)
defer r.Close()
var options []fs.UploaderOption

View File

@@ -19,8 +19,6 @@ type bufferManager struct {
// newBuffer returns a new or reused bytes.Buffer.
func (mgr *bufferManager) newBuffer() *bytes.Buffer {
atomic.AddInt32(&mgr.outstandingCount, 1)
//debug.PrintStack()
//b := mgr.pool.Get().(*bytes.Buffer)
b := mgr.pool.New().(*bytes.Buffer)
b.Reset()
return b
@@ -28,7 +26,6 @@ func (mgr *bufferManager) newBuffer() *bytes.Buffer {
// returnBuffer returns the give buffer to the pool
func (mgr *bufferManager) returnBuffer(b *bytes.Buffer) {
//log.Printf("returning buffer")
atomic.AddInt32(&mgr.outstandingCount, -1)
mgr.pool.Put(b)
}

View File

@@ -3,12 +3,11 @@
import (
"bufio"
"bytes"
"crypto/hmac"
"fmt"
"hash"
"io"
"log"
"strings"
"sync"
"sync/atomic"
"github.com/kopia/kopia/internal/jsonstream"
@@ -65,7 +64,16 @@ type RepositoryStats struct {
ValidBlocks int32
}
type keygenFunc func([]byte) (blockIDBytes []byte, key []byte)
type empty struct{}
type semaphore chan empty
func (s semaphore) Lock() {
s <- empty{}
}
func (s semaphore) Unlock() {
<-s
}
type repository struct {
storage storage.Storage
@@ -76,10 +84,46 @@ type repository struct {
format Format
formatter ObjectFormatter
workerCount int
writeBackWorkers int
writeBackSemaphore semaphore
writeBackErrors asyncErrors
waitGroup sync.WaitGroup
}
type asyncErrors struct {
sync.RWMutex
errors []error
}
func (e *asyncErrors) add(err error) {
e.Lock()
e.errors = append(e.errors, err)
e.Unlock()
}
func (e *asyncErrors) check() error {
e.RLock()
defer e.RUnlock()
switch len(e.errors) {
case 0:
return nil
case 1:
return e.errors[0]
default:
msg := make([]string, len(e.errors))
for i, err := range e.errors {
msg[i] = err.Error()
}
return fmt.Errorf("%v errors: %v", len(e.errors), strings.Join(msg, ";"))
}
}
func (r *repository) Close() error {
r.Flush()
if err := r.storage.Close(); err != nil {
return err
}
@@ -89,6 +133,9 @@ func (r *repository) Close() error {
}
func (r *repository) Flush() error {
if r.writeBackWorkers > 0 {
r.waitGroup.Wait()
}
return nil
}
@@ -117,6 +164,9 @@ func (r *repository) Open(objectID ObjectID) (ObjectReader, error) {
// log.Printf("Repository::Open %v", objectID.String())
// defer log.Printf("finished Repository::Open() %v", objectID.String())
// Flush any pending writes.
r.Flush()
if objectID.Section != nil {
baseReader, err := r.Open(objectID.Section.Base)
if err != nil {
@@ -155,9 +205,9 @@ func (r *repository) Open(objectID ObjectID) (ObjectReader, error) {
// WriteBack is an RepositoryOption that enables asynchronous writes to the storage using the pool
// of goroutines.
func WriteBack(workerCount int) RepositoryOption {
func WriteBack(writeBackWorkers int) RepositoryOption {
return func(o *repository) error {
o.workerCount = workerCount
o.writeBackWorkers = writeBackWorkers
return nil
}
}
@@ -170,12 +220,6 @@ func EnableLogging(options ...logging.Option) RepositoryOption {
}
}
func hmacFunc(key []byte, hf func() hash.Hash) func() hash.Hash {
return func() hash.Hash {
return hmac.New(hf, key)
}
}
// New creates a Repository with the specified storage, format and options.
func New(s storage.Storage, f *Format, options ...RepositoryOption) (Repository, error) {
if f.MaxBlockSize < 100 {
@@ -203,6 +247,9 @@ func New(s storage.Storage, f *Format, options ...RepositoryOption) (Repository,
}
r.bufferManager = newBufferManager(int(r.format.MaxBlockSize))
if r.writeBackWorkers > 0 {
r.writeBackSemaphore = make(semaphore, r.writeBackWorkers)
}
return r, nil
}
@@ -225,6 +272,10 @@ func (r *repository) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, prefix
}
}()
if err := r.writeBackErrors.check(); err != nil {
return NullObjectID, err
}
// Hash the block and compute encryption key.
blockID, encryptionKey := r.formatter.ComputeBlockIDAndKey(data, r.format.Secret)
atomic.AddInt32(&r.stats.HashedBlocks, 1)
@@ -255,7 +306,29 @@ func (r *repository) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, prefix
}
}
// Write the block
if r.writeBackWorkers > 0 {
// Tell the defer block not to return the buffer synchronously.
isAsync = true
r.waitGroup.Add(1)
r.writeBackSemaphore.Lock()
go func() {
defer func() {
r.bufferManager.returnBuffer(buffer)
r.writeBackSemaphore.Unlock()
r.waitGroup.Done()
}()
if err := r.storage.PutBlock(objectID.StorageBlock, data, storage.PutOptionsDefault); err != nil {
r.writeBackErrors.add(err)
}
}()
// async will fail later.
return objectID, nil
}
// Synchronous case
if err := r.storage.PutBlock(objectID.StorageBlock, data, storage.PutOptionsDefault); err != nil {
return NullObjectID, err
}

View File

@@ -18,7 +18,7 @@
)
func init() {
//panicOnBufferLeaks = true
panicOnBufferLeaks = true
}
func testFormat() *Format {
@@ -48,7 +48,7 @@ func setupTest(t *testing.T) (data map[string][]byte, repo Repository) {
data = map[string][]byte{}
st := storagetesting.NewMapStorage(data)
repo, err := New(st, testFormat())
repo, err := New(st, testFormat(), WriteBack(5))
if err != nil {
t.Errorf("cannot create manager: %v", err)
}
@@ -86,6 +86,8 @@ func TestWriters(t *testing.T) {
continue
}
repo.Flush()
if !objectIDsEqual(result, c.objectID) {
t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID.UIString(), result)
}
@@ -174,6 +176,7 @@ func TestIndirection(t *testing.T) {
writer := repo.NewWriter()
writer.Write(contentBytes)
result, err := writer.Result(false)
repo.Flush()
if err != nil {
t.Errorf("error getting writer results: %v", err)
}