From cd2fcfeb4f092f174d5ffb70911a9fbbb7ceebf4 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 19 Aug 2017 14:08:42 -0700 Subject: [PATCH] refactored ObjectWriter to allow parallel hashing of parts of large files --- repo/blocks.go | 6 +- repo/indirect.go | 6 +- repo/object_manager.go | 53 ++++--------- repo/object_manager_test.go | 5 +- repo/object_reader.go | 2 +- repo/object_writer.go | 143 +++++++++++++++++++++--------------- repo/repository.go | 2 +- repo/writeback.go | 17 ----- 8 files changed, 105 insertions(+), 129 deletions(-) diff --git a/repo/blocks.go b/repo/blocks.go index 9e23df504..5ce828268 100644 --- a/repo/blocks.go +++ b/repo/blocks.go @@ -47,10 +47,8 @@ func (r *ObjectManager) addStorageBlocks(result map[string]bool, oid ObjectID) e return err } for _, st := range chunks { - if st.Object != nil { - if err := r.addStorageBlocks(result, *st.Object); err != nil { - return err - } + if err := r.addStorageBlocks(result, st.Object); err != nil { + return err } } diff --git a/repo/indirect.go b/repo/indirect.go index 1845a7c14..b3ec9a2b5 100644 --- a/repo/indirect.go +++ b/repo/indirect.go @@ -4,7 +4,7 @@ // indirectObjectEntry represents an entry in indirect object stream. type indirectObjectEntry struct { - Start int64 `json:"s,omitempty"` - Length int64 `json:"l,omitempty"` - Object *ObjectID `json:"o,omitempty"` + Start int64 `json:"s,omitempty"` + Length int64 `json:"l,omitempty"` + Object ObjectID `json:"o,omitempty"` } diff --git a/repo/object_manager.go b/repo/object_manager.go index 807d424d4..3352efbd9 100644 --- a/repo/object_manager.go +++ b/repo/object_manager.go @@ -7,6 +7,7 @@ "io" "log" "strings" + "sync" "sync/atomic" "github.com/kopia/kopia/blob" @@ -32,9 +33,12 @@ type ObjectManager struct { formatter objectFormatter packMgr *packManager - writeBack writebackManager blockSizeCache *blockSizeCache + async bool + writeBackWG sync.WaitGroup + writeBackSemaphore semaphore + trace func(message string, args ...interface{}) newSplitter func() objectSplitter @@ -42,7 +46,7 @@ type ObjectManager struct { // Close closes the connection to the underlying blob storage and releases any resources. func (r *ObjectManager) Close() error { - r.writeBack.flush() + r.writeBackWG.Wait() r.blockSizeCache.close() return nil @@ -73,7 +77,7 @@ func (r *ObjectManager) Open(objectID ObjectID) (ObjectReader, error) { // defer log.Printf("finished Repository::Open() %v", objectID.String()) // Flush any pending writes. - r.writeBack.flush() + r.writeBackWG.Wait() if objectID.Section != nil { baseReader, err := r.Open(objectID.Section.Base) @@ -164,29 +168,24 @@ func newObjectManager(s blob.Storage, f config.RepositoryObjectFormat, opts *Opt } else { r.trace = nullTrace } - r.writeBack.workers = opts.WriteBack - } - - if r.writeBack.enabled() { - r.writeBack.semaphore = make(semaphore, r.writeBack.workers) + if opts.WriteBack > 0 { + r.async = true + r.writeBackSemaphore = make(semaphore, opts.WriteBack) + } } return r, nil } -// hashEncryptAndWriteMaybeAsync computes hash of a given buffer, optionally encrypts and writes it to storage. +// hashEncryptAndWrite computes hash of a given buffer, optionally encrypts and writes it to storage. // The write is not guaranteed to complete synchronously in case write-back is used, but by the time // Repository.Close() returns all writes are guaranteed be over. -func (r *ObjectManager) hashEncryptAndWriteMaybeAsync(packGroup string, buffer *bytes.Buffer, prefix string, disablePacking bool) (ObjectID, error) { +func (r *ObjectManager) hashEncryptAndWrite(packGroup string, buffer *bytes.Buffer, prefix string, disablePacking bool) (ObjectID, error) { var data []byte if buffer != nil { data = buffer.Bytes() } - if err := r.writeBack.errors.check(); err != nil { - return NullObjectID, err - } - // Hash the block and compute encryption key. objectID := r.formatter.ComputeObjectID(data) objectID.StorageBlock = prefix + objectID.StorageBlock @@ -198,30 +197,6 @@ func (r *ObjectManager) hashEncryptAndWriteMaybeAsync(packGroup string, buffer * return packOID, err } - if r.writeBack.enabled() { - r.writeBack.waitGroup.Add(1) - r.writeBack.semaphore.Lock() - go func() { - if _, err := r.encryptAndMaybeWrite(objectID, buffer, prefix); err != nil { - r.writeBack.errors.add(err) - } - r.writeBack.semaphore.Unlock() - r.writeBack.waitGroup.Done() - }() - - // async will fail later. - return objectID, nil - } - - return r.encryptAndMaybeWrite(objectID, buffer, prefix) -} - -func (r *ObjectManager) encryptAndMaybeWrite(objectID ObjectID, buffer *bytes.Buffer, prefix string) (ObjectID, error) { - var data []byte - if buffer != nil { - data = buffer.Bytes() - } - // Before performing encryption, check if the block is already there. blockSize, err := r.blockSizeCache.getSize(objectID.StorageBlock) atomic.AddInt32(&r.stats.CheckedBlocks, int32(1)) @@ -247,7 +222,7 @@ func (r *ObjectManager) encryptAndMaybeWrite(objectID ObjectID, buffer *bytes.Bu atomic.AddInt64(&r.stats.WrittenBytes, int64(len(data))) if err := r.storage.PutBlock(objectID.StorageBlock, data); err != nil { - r.writeBack.errors.add(err) + return NullObjectID, err } return objectID, nil diff --git a/repo/object_manager_test.go b/repo/object_manager_test.go index 8211b790a..ee9b50cbe 100644 --- a/repo/object_manager_test.go +++ b/repo/object_manager_test.go @@ -84,7 +84,7 @@ func TestWriters(t *testing.T) { continue } - repo.writeBack.flush() + repo.writeBackWG.Wait() if !objectIDsEqual(result, c.objectID) { t.Errorf("incorrect result for %v, expected: %v got: %v %#v", c.data, c.objectID.String(), result.String(), result.BinaryContent) @@ -238,13 +238,12 @@ func TestIndirection(t *testing.T) { writer := repo.NewWriter(WriterOptions{}) writer.Write(contentBytes) result, err := writer.Result(false) - repo.writeBack.flush() if err != nil { t.Errorf("error getting writer results: %v", err) } if indirectionLevel(result) != c.expectedIndirection { - t.Errorf("incorrect indirection level for size: %v: %v, expected %v", c.dataLength, result.Indirect, c.expectedIndirection) + t.Errorf("incorrect indirection level for size: %v: %v, expected %v", c.dataLength, indirectionLevel(result), c.expectedIndirection) } if got, want := len(data)-2, c.expectedBlockCount; got != want { diff --git a/repo/object_reader.go b/repo/object_reader.go index f20f0b1fa..1f1388f72 100644 --- a/repo/object_reader.go +++ b/repo/object_reader.go @@ -65,7 +65,7 @@ func (r *objectReader) Read(buffer []byte) (int, error) { func (r *objectReader) openCurrentChunk() error { st := r.seekTable[r.currentChunkIndex] - blockData, err := r.repo.Open(*st.Object) + blockData, err := r.repo.Open(st.Object) if err != nil { return err } diff --git a/repo/object_writer.go b/repo/object_writer.go index 22ded92d1..50bbaf91f 100644 --- a/repo/object_writer.go +++ b/repo/object_writer.go @@ -4,6 +4,7 @@ "bytes" "fmt" "io" + "sync" "github.com/kopia/kopia/internal/jsonstream" ) @@ -18,10 +19,14 @@ type ObjectWriter interface { } type blockTracker struct { + mu sync.Mutex blocks map[string]bool } func (t *blockTracker) addBlock(blockID string) { + t.mu.Lock() + defer t.mu.Unlock() + if t.blocks == nil { t.blocks = make(map[string]bool) } @@ -29,6 +34,9 @@ func (t *blockTracker) addBlock(blockID string) { } func (t *blockTracker) blockIDs() []string { + t.mu.Lock() + defer t.mu.Unlock() + result := make([]string, 0, len(t.blocks)) for k := range t.blocks { result = append(result, k) @@ -42,12 +50,9 @@ type objectWriter struct { buffer bytes.Buffer totalLength int64 - prefix string - listWriter *objectWriter - listProtoWriter *jsonstream.Writer - listCurrentPos int64 - flushedObjectCount int - lastFlushedObject ObjectID + prefix string + currentPosition int64 + blockIndex []indirectObjectEntry description string indirectLevel int32 @@ -57,14 +62,15 @@ type objectWriter struct { disablePacking bool packGroup string + + pendingBlocksWG sync.WaitGroup + + err asyncErrors } func (w *objectWriter) Close() error { - if w.listWriter != nil { - w.listWriter.Close() - w.listWriter = nil - } - return nil + w.pendingBlocksWG.Wait() + return w.err.check() } func (w *objectWriter) Write(data []byte) (n int, err error) { @@ -85,59 +91,56 @@ func (w *objectWriter) Write(data []byte) (n int, err error) { } func (w *objectWriter) flushBuffer(force bool) error { - if !force && w.buffer.Len() == 0 { + length := w.buffer.Len() + + if !force && length == 0 { w.repo.trace("OBJECT_WRITER(%q).flushBuffer(force=%v) empty", w.description, force) return nil } - length := w.buffer.Len() + chunkID := len(w.blockIndex) + w.blockIndex = append(w.blockIndex, indirectObjectEntry{}) + w.blockIndex[chunkID].Start = w.currentPosition + w.blockIndex[chunkID].Length = int64(length) + w.currentPosition += int64(length) var b2 bytes.Buffer w.buffer.WriteTo(&b2) w.buffer.Reset() - objectID, err := w.repo.hashEncryptAndWriteMaybeAsync(w.packGroup, &b2, w.prefix, w.disablePacking) - w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, objectID, length) - if err != nil { - return fmt.Errorf( - "error when flushing chunk %d of %s: %#v", - w.flushedObjectCount, - w.description, - err) - } - - w.blockTracker.addBlock(objectID.StorageBlock) - - w.flushedObjectCount++ - w.lastFlushedObject = objectID - if w.listWriter == nil { - w.listWriter = &objectWriter{ - repo: w.repo, - indirectLevel: w.indirectLevel + 1, - prefix: w.prefix, - description: "LIST(" + w.description + ")", - blockTracker: w.blockTracker, - splitter: w.repo.newSplitter(), - - disablePacking: w.disablePacking, - packGroup: w.packGroup, + do := func() { + objectID, err := w.repo.hashEncryptAndWrite(w.packGroup, &b2, w.prefix, w.disablePacking) + w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, objectID, length) + if err != nil { + w.err.add(fmt.Errorf("error when flushing chunk %d of %s: %v", chunkID, w.description, err)) + return } - w.listProtoWriter = jsonstream.NewWriter(w.listWriter, indirectStreamType) - w.listCurrentPos = 0 + + w.blockTracker.addBlock(objectID.StorageBlock) + w.blockIndex[chunkID].Object = objectID } - w.listProtoWriter.Write(&indirectObjectEntry{ - Object: &objectID, - Start: w.listCurrentPos, - Length: int64(length), - }) + if w.repo.async { + w.repo.writeBackSemaphore.Lock() + w.pendingBlocksWG.Add(1) + w.repo.writeBackWG.Add(1) - w.listCurrentPos += int64(length) - return nil + go func() { + defer w.pendingBlocksWG.Done() + defer w.repo.writeBackWG.Done() + defer w.repo.writeBackSemaphore.Unlock() + do() + }() + + return nil + } + + do() + return w.err.check() } func (w *objectWriter) Result(forceStored bool) (ObjectID, error) { - if !forceStored && w.flushedObjectCount == 0 { + if !forceStored && len(w.blockIndex) == 0 { if w.buffer.Len() == 0 { return NullObjectID, nil } @@ -148,21 +151,38 @@ func (w *objectWriter) Result(forceStored bool) (ObjectID, error) { } w.flushBuffer(forceStored) - defer func() { - if w.listWriter != nil { - w.listWriter.Close() - } - }() + w.pendingBlocksWG.Wait() - if w.flushedObjectCount == 1 { - w.lastFlushedObject = addIndirection(w.lastFlushedObject, w.indirectLevel) - return w.lastFlushedObject, nil - } else if w.flushedObjectCount == 0 { - return NullObjectID, nil - } else { - w.listProtoWriter.Finalize() - return w.listWriter.Result(true) + if err := w.err.check(); err != nil { + return NullObjectID, err } + + if len(w.blockIndex) == 1 { + return addIndirection(w.blockIndex[0].Object, w.indirectLevel), nil + } + + if len(w.blockIndex) == 0 { + return NullObjectID, nil + } + + iw := &objectWriter{ + repo: w.repo, + indirectLevel: w.indirectLevel + 1, + prefix: w.prefix, + description: "LIST(" + w.description + ")", + blockTracker: w.blockTracker, + splitter: w.repo.newSplitter(), + + disablePacking: w.disablePacking, + packGroup: w.packGroup, + } + + jw := jsonstream.NewWriter(iw, indirectStreamType) + for _, e := range w.blockIndex { + jw.Write(&e) + } + jw.Finalize() + return iw.Result(true) } func addIndirection(oid ObjectID, level int32) ObjectID { @@ -174,6 +194,7 @@ func addIndirection(oid ObjectID, level int32) ObjectID { } func (w *objectWriter) StorageBlocks() []string { + w.pendingBlocksWG.Wait() return w.blockTracker.blockIDs() } diff --git a/repo/repository.go b/repo/repository.go index e073d3b09..95e4d31f5 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -82,7 +82,7 @@ func (r *Repository) Close() error { // Flush waits for all in-flight writes to complete. func (r *Repository) Flush() error { - r.ObjectManager.writeBack.flush() + r.ObjectManager.writeBackWG.Wait() return nil } diff --git a/repo/writeback.go b/repo/writeback.go index ccd05b21d..29da44830 100644 --- a/repo/writeback.go +++ b/repo/writeback.go @@ -6,23 +6,6 @@ "sync" ) -type writebackManager struct { - workers int - semaphore semaphore - errors asyncErrors - waitGroup sync.WaitGroup -} - -func (w *writebackManager) enabled() bool { - return w.workers > 0 -} - -func (w *writebackManager) flush() { - if w.workers > 0 { - w.waitGroup.Wait() - } -} - type asyncErrors struct { sync.RWMutex errors []error