diff --git a/repo/repository.go b/repo/repository.go index 2124a237f..ac61a39d4 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -7,7 +7,6 @@ "io" "log" "strings" - "sync" "sync/atomic" "github.com/kopia/kopia/blob" @@ -23,17 +22,6 @@ type ObjectReader interface { Length() int64 } -type empty struct{} -type semaphore chan empty - -func (s semaphore) Lock() { - s <- empty{} -} - -func (s semaphore) Unlock() { - <-s -} - // Repository implements a content-addressable storage on top of blob storage. type Repository struct { Stats Stats // vital statistics @@ -44,44 +32,11 @@ type Repository struct { format Format formatter ObjectFormatter - writeBackWorkers int - writeBackSemaphore semaphore - writeBackErrors asyncErrors - waitGroup sync.WaitGroup + writeBack writebackManager newSplitter func() objectSplitter } -type asyncErrors struct { - sync.RWMutex - errors []error -} - -func (e *asyncErrors) add(err error) { - e.Lock() - e.errors = append(e.errors, err) - e.Unlock() -} - -func (e *asyncErrors) check() error { - e.RLock() - defer e.RUnlock() - - switch len(e.errors) { - case 0: - return nil - case 1: - return e.errors[0] - default: - msg := make([]string, len(e.errors)) - for i, err := range e.errors { - msg[i] = err.Error() - } - - return fmt.Errorf("%v errors: %v", len(e.errors), strings.Join(msg, ";")) - } -} - // Close closes the connection to the underlying blob storage and releases any resources. func (r *Repository) Close() error { r.Flush() @@ -95,9 +50,7 @@ func (r *Repository) Close() error { // Flush waits for all in-flight writes to complete. func (r *Repository) Flush() error { - if r.writeBackWorkers > 0 { - r.waitGroup.Wait() - } + r.writeBack.flush() return nil } @@ -164,7 +117,7 @@ func (r *Repository) Open(objectID ObjectID) (ObjectReader, error) { // of goroutines. func WriteBack(writeBackWorkers int) RepositoryOption { return func(o *Repository) { - o.writeBackWorkers = writeBackWorkers + o.writeBack.workers = writeBackWorkers } } @@ -214,8 +167,8 @@ func New(s blob.Storage, f *Format, options ...RepositoryOption) (*Repository, e } r.bufferManager = newBufferManager(int(r.format.MaxBlockSize)) - if r.writeBackWorkers > 0 { - r.writeBackSemaphore = make(semaphore, r.writeBackWorkers) + if r.writeBack.enabled() { + r.writeBack.semaphore = make(semaphore, r.writeBack.workers) } return r, nil @@ -230,7 +183,7 @@ func (r *Repository) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, prefix data = buffer.Bytes() } - if err := r.writeBackErrors.check(); err != nil { + if err := r.writeBack.errors.check(); err != nil { return NullObjectID, err } @@ -240,16 +193,16 @@ func (r *Repository) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, prefix atomic.AddInt32(&r.Stats.HashedBlocks, 1) atomic.AddInt64(&r.Stats.HashedBytes, int64(len(data))) - if r.writeBackWorkers > 0 { + if r.writeBack.enabled() { // Tell the defer block not to return the buffer synchronously. - r.waitGroup.Add(1) - r.writeBackSemaphore.Lock() + r.writeBack.waitGroup.Add(1) + r.writeBack.semaphore.Lock() go func() { if _, err := r.encryptAndMaybeWrite(objectID, buffer, prefix); err != nil { - r.writeBackErrors.add(err) + r.writeBack.errors.add(err) } - r.writeBackSemaphore.Unlock() - r.waitGroup.Done() + r.writeBack.semaphore.Unlock() + r.writeBack.waitGroup.Done() }() // async will fail later. @@ -292,7 +245,7 @@ func (r *Repository) encryptAndMaybeWrite(objectID ObjectID, buffer *bytes.Buffe atomic.AddInt64(&r.Stats.WrittenBytes, int64(len(data))) if err := r.Storage.PutBlock(objectID.StorageBlock, data, blob.PutOptionsDefault); err != nil { - r.writeBackErrors.add(err) + r.writeBack.errors.add(err) } return objectID, nil diff --git a/repo/semaphore.go b/repo/semaphore.go new file mode 100644 index 000000000..1828f9097 --- /dev/null +++ b/repo/semaphore.go @@ -0,0 +1,12 @@ +package repo + +type empty struct{} +type semaphore chan empty + +func (s semaphore) Lock() { + s <- empty{} +} + +func (s semaphore) Unlock() { + <-s +} diff --git a/repo/writeback.go b/repo/writeback.go new file mode 100644 index 000000000..ccd05b21d --- /dev/null +++ b/repo/writeback.go @@ -0,0 +1,54 @@ +package repo + +import ( + "fmt" + "strings" + "sync" +) + +type writebackManager struct { + workers int + semaphore semaphore + errors asyncErrors + waitGroup sync.WaitGroup +} + +func (w *writebackManager) enabled() bool { + return w.workers > 0 +} + +func (w *writebackManager) flush() { + if w.workers > 0 { + w.waitGroup.Wait() + } +} + +type asyncErrors struct { + sync.RWMutex + errors []error +} + +func (e *asyncErrors) add(err error) { + e.Lock() + e.errors = append(e.errors, err) + e.Unlock() +} + +func (e *asyncErrors) check() error { + e.RLock() + defer e.RUnlock() + + switch len(e.errors) { + case 0: + return nil + case 1: + return e.errors[0] + default: + msg := make([]string, len(e.errors)) + for i, err := range e.errors { + msg[i] = err.Error() + } + + return fmt.Errorf("%v errors: %v", len(e.errors), strings.Join(msg, ";")) + } +}