cleanup: moved writeback to separate file

This commit is contained in:
Jarek Kowalski
2017-07-18 10:45:34 +02:00
parent c3f0d5ddc4
commit cda3086dea
3 changed files with 79 additions and 60 deletions

View File

@@ -7,7 +7,6 @@
"io"
"log"
"strings"
"sync"
"sync/atomic"
"github.com/kopia/kopia/blob"
@@ -23,17 +22,6 @@ type ObjectReader interface {
Length() int64
}
type empty struct{}
type semaphore chan empty
func (s semaphore) Lock() {
s <- empty{}
}
func (s semaphore) Unlock() {
<-s
}
// Repository implements a content-addressable storage on top of blob storage.
type Repository struct {
Stats Stats // vital statistics
@@ -44,44 +32,11 @@ type Repository struct {
format Format
formatter ObjectFormatter
writeBackWorkers int
writeBackSemaphore semaphore
writeBackErrors asyncErrors
waitGroup sync.WaitGroup
writeBack writebackManager
newSplitter func() objectSplitter
}
type asyncErrors struct {
sync.RWMutex
errors []error
}
func (e *asyncErrors) add(err error) {
e.Lock()
e.errors = append(e.errors, err)
e.Unlock()
}
func (e *asyncErrors) check() error {
e.RLock()
defer e.RUnlock()
switch len(e.errors) {
case 0:
return nil
case 1:
return e.errors[0]
default:
msg := make([]string, len(e.errors))
for i, err := range e.errors {
msg[i] = err.Error()
}
return fmt.Errorf("%v errors: %v", len(e.errors), strings.Join(msg, ";"))
}
}
// Close closes the connection to the underlying blob storage and releases any resources.
func (r *Repository) Close() error {
r.Flush()
@@ -95,9 +50,7 @@ func (r *Repository) Close() error {
// Flush waits for all in-flight writes to complete.
func (r *Repository) Flush() error {
if r.writeBackWorkers > 0 {
r.waitGroup.Wait()
}
r.writeBack.flush()
return nil
}
@@ -164,7 +117,7 @@ func (r *Repository) Open(objectID ObjectID) (ObjectReader, error) {
// of goroutines.
func WriteBack(writeBackWorkers int) RepositoryOption {
return func(o *Repository) {
o.writeBackWorkers = writeBackWorkers
o.writeBack.workers = writeBackWorkers
}
}
@@ -214,8 +167,8 @@ func New(s blob.Storage, f *Format, options ...RepositoryOption) (*Repository, e
}
r.bufferManager = newBufferManager(int(r.format.MaxBlockSize))
if r.writeBackWorkers > 0 {
r.writeBackSemaphore = make(semaphore, r.writeBackWorkers)
if r.writeBack.enabled() {
r.writeBack.semaphore = make(semaphore, r.writeBack.workers)
}
return r, nil
@@ -230,7 +183,7 @@ func (r *Repository) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, prefix
data = buffer.Bytes()
}
if err := r.writeBackErrors.check(); err != nil {
if err := r.writeBack.errors.check(); err != nil {
return NullObjectID, err
}
@@ -240,16 +193,16 @@ func (r *Repository) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, prefix
atomic.AddInt32(&r.Stats.HashedBlocks, 1)
atomic.AddInt64(&r.Stats.HashedBytes, int64(len(data)))
if r.writeBackWorkers > 0 {
if r.writeBack.enabled() {
// Tell the defer block not to return the buffer synchronously.
r.waitGroup.Add(1)
r.writeBackSemaphore.Lock()
r.writeBack.waitGroup.Add(1)
r.writeBack.semaphore.Lock()
go func() {
if _, err := r.encryptAndMaybeWrite(objectID, buffer, prefix); err != nil {
r.writeBackErrors.add(err)
r.writeBack.errors.add(err)
}
r.writeBackSemaphore.Unlock()
r.waitGroup.Done()
r.writeBack.semaphore.Unlock()
r.writeBack.waitGroup.Done()
}()
// async will fail later.
@@ -292,7 +245,7 @@ func (r *Repository) encryptAndMaybeWrite(objectID ObjectID, buffer *bytes.Buffe
atomic.AddInt64(&r.Stats.WrittenBytes, int64(len(data)))
if err := r.Storage.PutBlock(objectID.StorageBlock, data, blob.PutOptionsDefault); err != nil {
r.writeBackErrors.add(err)
r.writeBack.errors.add(err)
}
return objectID, nil

12
repo/semaphore.go Normal file
View File

@@ -0,0 +1,12 @@
package repo
type empty struct{}
type semaphore chan empty
func (s semaphore) Lock() {
s <- empty{}
}
func (s semaphore) Unlock() {
<-s
}

54
repo/writeback.go Normal file
View File

@@ -0,0 +1,54 @@
package repo
import (
"fmt"
"strings"
"sync"
)
type writebackManager struct {
workers int
semaphore semaphore
errors asyncErrors
waitGroup sync.WaitGroup
}
func (w *writebackManager) enabled() bool {
return w.workers > 0
}
func (w *writebackManager) flush() {
if w.workers > 0 {
w.waitGroup.Wait()
}
}
type asyncErrors struct {
sync.RWMutex
errors []error
}
func (e *asyncErrors) add(err error) {
e.Lock()
e.errors = append(e.errors, err)
e.Unlock()
}
func (e *asyncErrors) check() error {
e.RLock()
defer e.RUnlock()
switch len(e.errors) {
case 0:
return nil
case 1:
return e.errors[0]
default:
msg := make([]string, len(e.errors))
for i, err := range e.errors {
msg[i] = err.Error()
}
return fmt.Errorf("%v errors: %v", len(e.errors), strings.Join(msg, ";"))
}
}