From 3487cc191e6f850e79ad2f80ce97dec3b7bc8cb4 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 22 Dec 2018 19:58:00 -0800 Subject: [PATCH] removed asynchronous mode for object.Manager --- object/object_manager.go | 35 ++------------------ object/object_manager_test.go | 4 --- object/object_writer.go | 42 ++++-------------------- repository.go | 6 ---- tests/repository_test/repository_test.go | 3 -- 5 files changed, 9 insertions(+), 81 deletions(-) diff --git a/object/object_manager.go b/object/object_manager.go index f56d1c1ee..c85ed783c 100644 --- a/object/object_manager.go +++ b/object/object_manager.go @@ -7,7 +7,6 @@ "context" "fmt" "io" - "sync" "github.com/kopia/repo/block" "github.com/kopia/repo/internal/jsonstream" @@ -40,22 +39,11 @@ type Manager struct { Format Format blockMgr blockManager - - async bool - writeBackWG sync.WaitGroup - writeBackSemaphore semaphore - - trace func(message string, args ...interface{}) + trace func(message string, args ...interface{}) newSplitter func() objectSplitter } -// Close closes the connection to the underlying blob storage and releases any resources. -func (om *Manager) Close(ctx context.Context) error { - om.writeBackWG.Wait() - return om.Flush(ctx) -} - // NewWriter creates an ObjectWriter for writing to the repository. func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer { w := &objectWriter{ @@ -78,9 +66,6 @@ func (om *Manager) Open(ctx context.Context, objectID ID) (Reader, error) { // log.Printf("Repository::Open %v", objectID.String()) // defer log.Printf("finished Repository::Open() %v", objectID.String()) - // Flush any pending writes. - om.writeBackWG.Wait() - if indexObjectID, ok := objectID.IndexObjectID(); ok { rd, err := om.Open(ctx, indexObjectID) if err != nil { @@ -109,9 +94,6 @@ func (om *Manager) Open(ctx context.Context, objectID ID) (Reader, error) { // VerifyObject ensures that all objects backing ObjectID are present in the repository // and returns the total length of the object and storage blocks of which it is composed. func (om *Manager) VerifyObject(ctx context.Context, oid ID) (int64, []string, error) { - // Flush any pending writes. - om.writeBackWG.Wait() - blocks := &blockTracker{} l, err := om.verifyObjectInternal(ctx, oid, blocks) if err != nil { @@ -169,20 +151,12 @@ func (om *Manager) verifyObjectInternal(ctx context.Context, oid ID, blocks *blo } -// Flush closes any pending pack files. Once this method returns, ObjectIDs returned by ObjectManager are -// ok to be used. -func (om *Manager) Flush(ctx context.Context) error { - om.writeBackWG.Wait() - return nil -} - func nullTrace(message string, args ...interface{}) { } // ManagerOptions specifies object manager options. type ManagerOptions struct { - WriteBack int - Trace func(message string, args ...interface{}) + Trace func(message string, args ...interface{}) } // NewObjectManager creates an ObjectManager with the specified block manager and format. @@ -213,11 +187,6 @@ func NewObjectManager(ctx context.Context, bm blockManager, f Format, opts Manag om.trace = nullTrace } - if opts.WriteBack > 0 { - om.async = true - om.writeBackSemaphore = make(semaphore, opts.WriteBack) - } - return om, nil } diff --git a/object/object_manager_test.go b/object/object_manager_test.go index 5b14345ea..f2b712fb4 100644 --- a/object/object_manager_test.go +++ b/object/object_manager_test.go @@ -105,8 +105,6 @@ func TestWriters(t *testing.T) { continue } - om.writeBackWG.Wait() - if !objectIDsEqual(result, c.objectID) { t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID.String(), result.String()) } @@ -204,8 +202,6 @@ func TestIndirection(t *testing.T) { t.Errorf("unexpected block count for %v: %v, expected %v", c.dataLength, got, want) } - om.Flush(ctx) - l, b, err := om.VerifyObject(ctx, result) if err != nil { t.Errorf("error verifying %q: %v", result, err) diff --git a/object/object_writer.go b/object/object_writer.go index 2f11bd966..b8f4ddba7 100644 --- a/object/object_writer.go +++ b/object/object_writer.go @@ -59,13 +59,10 @@ type objectWriter struct { splitter objectSplitter pendingBlocksWG sync.WaitGroup - - err asyncErrors } func (w *objectWriter) Close() error { - w.pendingBlocksWG.Wait() - return w.err.check() + return nil } func (w *objectWriter) Write(data []byte) (n int, err error) { @@ -97,34 +94,14 @@ func (w *objectWriter) flushBuffer() error { w.buffer.WriteTo(&b2) //nolint:errcheck w.buffer.Reset() - do := func() { - blockID, err := w.repo.blockMgr.WriteBlock(w.ctx, b2.Bytes(), w.prefix) - w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, blockID, length) - if err != nil { - w.err.add(fmt.Errorf("error when flushing chunk %d of %s: %v", chunkID, w.description, err)) - return - } - - w.blockIndex[chunkID].Object = DirectObjectID(blockID) + blockID, err := w.repo.blockMgr.WriteBlock(w.ctx, b2.Bytes(), w.prefix) + w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, blockID, length) + if err != nil { + return fmt.Errorf("error when flushing chunk %d of %s: %v", chunkID, w.description, err) } - if w.repo.async { - w.repo.writeBackSemaphore.Lock() - w.pendingBlocksWG.Add(1) - w.repo.writeBackWG.Add(1) - - go func() { - defer w.pendingBlocksWG.Done() - defer w.repo.writeBackWG.Done() - defer w.repo.writeBackSemaphore.Unlock() - do() - }() - - return nil - } - - do() - return w.err.check() + w.blockIndex[chunkID].Object = DirectObjectID(blockID) + return nil } func (w *objectWriter) Result() (ID, error) { @@ -133,11 +110,6 @@ func (w *objectWriter) Result() (ID, error) { return "", err } } - w.pendingBlocksWG.Wait() - - if err := w.err.check(); err != nil { - return "", err - } if len(w.blockIndex) == 1 { return w.blockIndex[0].Object, nil diff --git a/repository.go b/repository.go index 539d1d66f..163b941ba 100644 --- a/repository.go +++ b/repository.go @@ -28,9 +28,6 @@ func (r *Repository) Close(ctx context.Context) error { if err := r.Manifests.Flush(ctx); err != nil { return fmt.Errorf("error flushing manifests: %v", err) } - if err := r.Objects.Close(ctx); err != nil { - return fmt.Errorf("error closing objects: %v", err) - } if err := r.Blocks.Flush(ctx); err != nil { return fmt.Errorf("error closing blocks: %v", err) } @@ -45,9 +42,6 @@ func (r *Repository) Flush(ctx context.Context) error { if err := r.Manifests.Flush(ctx); err != nil { return err } - if err := r.Objects.Flush(ctx); err != nil { - return err - } return r.Blocks.Flush(ctx) } diff --git a/tests/repository_test/repository_test.go b/tests/repository_test/repository_test.go index 514cb6498..29fb036ae 100644 --- a/tests/repository_test/repository_test.go +++ b/tests/repository_test/repository_test.go @@ -45,8 +45,6 @@ func TestWriters(t *testing.T) { continue } - env.Repository.Objects.Flush(ctx) - if !objectIDsEqual(result, c.objectID) { t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID.String(), result.String()) } @@ -96,7 +94,6 @@ func TestPackingSimple(t *testing.T) { oid2c := writeObject(ctx, t, env.Repository, []byte(content2), "packed-object-2c") oid1c := writeObject(ctx, t, env.Repository, []byte(content1), "packed-object-1c") - env.Repository.Objects.Flush(ctx) env.Repository.Blocks.Flush(ctx) if got, want := oid1a.String(), oid1b.String(); got != want {