removed asynchronous mode for object.Manager

This commit is contained in:
Jarek Kowalski
2018-12-22 19:58:00 -08:00
parent b8ecd1dd10
commit 3487cc191e
5 changed files with 9 additions and 81 deletions

View File

@@ -7,7 +7,6 @@
"context"
"fmt"
"io"
"sync"
"github.com/kopia/repo/block"
"github.com/kopia/repo/internal/jsonstream"
@@ -40,22 +39,11 @@ type Manager struct {
Format Format
blockMgr blockManager
async bool
writeBackWG sync.WaitGroup
writeBackSemaphore semaphore
trace func(message string, args ...interface{})
trace func(message string, args ...interface{})
newSplitter func() objectSplitter
}
// Close closes the connection to the underlying blob storage and releases any resources.
func (om *Manager) Close(ctx context.Context) error {
om.writeBackWG.Wait()
return om.Flush(ctx)
}
// NewWriter creates an ObjectWriter for writing to the repository.
func (om *Manager) NewWriter(ctx context.Context, opt WriterOptions) Writer {
w := &objectWriter{
@@ -78,9 +66,6 @@ func (om *Manager) Open(ctx context.Context, objectID ID) (Reader, error) {
// log.Printf("Repository::Open %v", objectID.String())
// defer log.Printf("finished Repository::Open() %v", objectID.String())
// Flush any pending writes.
om.writeBackWG.Wait()
if indexObjectID, ok := objectID.IndexObjectID(); ok {
rd, err := om.Open(ctx, indexObjectID)
if err != nil {
@@ -109,9 +94,6 @@ func (om *Manager) Open(ctx context.Context, objectID ID) (Reader, error) {
// VerifyObject ensures that all objects backing ObjectID are present in the repository
// and returns the total length of the object and storage blocks of which it is composed.
func (om *Manager) VerifyObject(ctx context.Context, oid ID) (int64, []string, error) {
// Flush any pending writes.
om.writeBackWG.Wait()
blocks := &blockTracker{}
l, err := om.verifyObjectInternal(ctx, oid, blocks)
if err != nil {
@@ -169,20 +151,12 @@ func (om *Manager) verifyObjectInternal(ctx context.Context, oid ID, blocks *blo
}
// Flush closes any pending pack files. Once this method returns, ObjectIDs returned by ObjectManager are
// ok to be used.
func (om *Manager) Flush(ctx context.Context) error {
om.writeBackWG.Wait()
return nil
}
func nullTrace(message string, args ...interface{}) {
}
// ManagerOptions specifies object manager options.
type ManagerOptions struct {
WriteBack int
Trace func(message string, args ...interface{})
Trace func(message string, args ...interface{})
}
// NewObjectManager creates an ObjectManager with the specified block manager and format.
@@ -213,11 +187,6 @@ func NewObjectManager(ctx context.Context, bm blockManager, f Format, opts Manag
om.trace = nullTrace
}
if opts.WriteBack > 0 {
om.async = true
om.writeBackSemaphore = make(semaphore, opts.WriteBack)
}
return om, nil
}

View File

@@ -105,8 +105,6 @@ func TestWriters(t *testing.T) {
continue
}
om.writeBackWG.Wait()
if !objectIDsEqual(result, c.objectID) {
t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID.String(), result.String())
}
@@ -204,8 +202,6 @@ func TestIndirection(t *testing.T) {
t.Errorf("unexpected block count for %v: %v, expected %v", c.dataLength, got, want)
}
om.Flush(ctx)
l, b, err := om.VerifyObject(ctx, result)
if err != nil {
t.Errorf("error verifying %q: %v", result, err)

View File

@@ -59,13 +59,10 @@ type objectWriter struct {
splitter objectSplitter
pendingBlocksWG sync.WaitGroup
err asyncErrors
}
func (w *objectWriter) Close() error {
w.pendingBlocksWG.Wait()
return w.err.check()
return nil
}
func (w *objectWriter) Write(data []byte) (n int, err error) {
@@ -97,34 +94,14 @@ func (w *objectWriter) flushBuffer() error {
w.buffer.WriteTo(&b2) //nolint:errcheck
w.buffer.Reset()
do := func() {
blockID, err := w.repo.blockMgr.WriteBlock(w.ctx, b2.Bytes(), w.prefix)
w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, blockID, length)
if err != nil {
w.err.add(fmt.Errorf("error when flushing chunk %d of %s: %v", chunkID, w.description, err))
return
}
w.blockIndex[chunkID].Object = DirectObjectID(blockID)
blockID, err := w.repo.blockMgr.WriteBlock(w.ctx, b2.Bytes(), w.prefix)
w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, blockID, length)
if err != nil {
return fmt.Errorf("error when flushing chunk %d of %s: %v", chunkID, w.description, err)
}
if w.repo.async {
w.repo.writeBackSemaphore.Lock()
w.pendingBlocksWG.Add(1)
w.repo.writeBackWG.Add(1)
go func() {
defer w.pendingBlocksWG.Done()
defer w.repo.writeBackWG.Done()
defer w.repo.writeBackSemaphore.Unlock()
do()
}()
return nil
}
do()
return w.err.check()
w.blockIndex[chunkID].Object = DirectObjectID(blockID)
return nil
}
func (w *objectWriter) Result() (ID, error) {
@@ -133,11 +110,6 @@ func (w *objectWriter) Result() (ID, error) {
return "", err
}
}
w.pendingBlocksWG.Wait()
if err := w.err.check(); err != nil {
return "", err
}
if len(w.blockIndex) == 1 {
return w.blockIndex[0].Object, nil

View File

@@ -28,9 +28,6 @@ func (r *Repository) Close(ctx context.Context) error {
if err := r.Manifests.Flush(ctx); err != nil {
return fmt.Errorf("error flushing manifests: %v", err)
}
if err := r.Objects.Close(ctx); err != nil {
return fmt.Errorf("error closing objects: %v", err)
}
if err := r.Blocks.Flush(ctx); err != nil {
return fmt.Errorf("error closing blocks: %v", err)
}
@@ -45,9 +42,6 @@ func (r *Repository) Flush(ctx context.Context) error {
if err := r.Manifests.Flush(ctx); err != nil {
return err
}
if err := r.Objects.Flush(ctx); err != nil {
return err
}
return r.Blocks.Flush(ctx)
}

View File

@@ -45,8 +45,6 @@ func TestWriters(t *testing.T) {
continue
}
env.Repository.Objects.Flush(ctx)
if !objectIDsEqual(result, c.objectID) {
t.Errorf("incorrect result for %v, expected: %v got: %v", c.data, c.objectID.String(), result.String())
}
@@ -96,7 +94,6 @@ func TestPackingSimple(t *testing.T) {
oid2c := writeObject(ctx, t, env.Repository, []byte(content2), "packed-object-2c")
oid1c := writeObject(ctx, t, env.Repository, []byte(content1), "packed-object-1c")
env.Repository.Objects.Flush(ctx)
env.Repository.Blocks.Flush(ctx)
if got, want := oid1a.String(), oid1b.String(); got != want {