From cbb492ea09dcb1cf093abffc5f876fb810dc56ba Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Tue, 8 Aug 2017 07:56:05 +0200 Subject: [PATCH] removed buffer manager --- repo/buffer_manager.go | 52 ------------------- repo/buffer_manager_test.go | 45 ----------------- repo/object_manager.go | 11 ++--- repo/object_manager_test.go | 4 -- repo/object_writer.go | 99 +++++++++++++++++-------------------- 5 files changed, 47 insertions(+), 164 deletions(-) delete mode 100644 repo/buffer_manager.go delete mode 100644 repo/buffer_manager_test.go diff --git a/repo/buffer_manager.go b/repo/buffer_manager.go deleted file mode 100644 index 6e553ca49..000000000 --- a/repo/buffer_manager.go +++ /dev/null @@ -1,52 +0,0 @@ -package repo - -import ( - "bytes" - "log" - "sync" - "sync/atomic" -) - -var panicOnBufferLeaks = false - -// bufferManager manages pool of reusable bytes.Buffer objects. -type bufferManager struct { - outstandingCount int32 - - pool sync.Pool -} - -// newBuffer returns a new or reused bytes.Buffer. -func (mgr *bufferManager) newBuffer() *bytes.Buffer { - atomic.AddInt32(&mgr.outstandingCount, 1) - b := mgr.pool.New().(*bytes.Buffer) - b.Reset() - return b -} - -// returnBuffer returns the give buffer to the pool -func (mgr *bufferManager) returnBuffer(b *bytes.Buffer) { - atomic.AddInt32(&mgr.outstandingCount, -1) - mgr.pool.Put(b) -} - -func (mgr *bufferManager) close() { - if mgr.outstandingCount != 0 { - if panicOnBufferLeaks { - log.Panicf("WARNING: Found %v buffer leaks.", mgr.outstandingCount) - } else { - log.Printf("WARNING: Found %v buffer leaks.", mgr.outstandingCount) - } - } -} - -func newBufferManager(blockSize int) *bufferManager { - mgr := &bufferManager{} - mgr.pool = sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(make([]byte, blockSize)) - }, - } - return mgr - -} diff --git a/repo/buffer_manager_test.go b/repo/buffer_manager_test.go deleted file mode 100644 index f30b32b9d..000000000 --- a/repo/buffer_manager_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package repo - -import ( - "bytes" - "testing" -) - -func TestBufferManager(t *testing.T) { - mgr := newBufferManager(10) - defer mgr.close() - - verifyBufferClean := func(b *bytes.Buffer) { - if b.Cap() != 10 { - t.Errorf("unexpected cap: %v", b.Cap()) - } - if b.Len() != 0 { - t.Errorf("unexpected len: %v", b.Len()) - } - } - - b := mgr.newBuffer() - if mgr.outstandingCount != 1 { - t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) - } - b1 := mgr.newBuffer() - verifyBufferClean(b) - verifyBufferClean(b1) - if mgr.outstandingCount != 2 { - t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) - } - mgr.returnBuffer(b) - mgr.returnBuffer(b) - if mgr.outstandingCount != 0 { - t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) - } - b2 := mgr.newBuffer() - if mgr.outstandingCount != 1 { - t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) - } - verifyBufferClean(b2) - mgr.returnBuffer(b2) - if mgr.outstandingCount != 0 { - t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount) - } -} diff --git a/repo/object_manager.go b/repo/object_manager.go index 425101676..2ec81c27a 100644 --- a/repo/object_manager.go +++ b/repo/object_manager.go @@ -28,10 +28,9 @@ type ObjectManager struct { stats Stats storage blob.Storage - verbose bool - bufferManager *bufferManager - format config.RepositoryObjectFormat - formatter objectFormatter + verbose bool + format config.RepositoryObjectFormat + formatter objectFormatter packMgr *packManager writeBack writebackManager @@ -42,7 +41,6 @@ type ObjectManager struct { // Close closes the connection to the underlying blob storage and releases any resources. func (r *ObjectManager) Close() error { r.writeBack.flush() - r.bufferManager.close() return nil } @@ -195,7 +193,6 @@ func newObjectManager(s blob.Storage, f config.RepositoryObjectFormat, opts *Opt r.writeBack.workers = opts.WriteBack } - r.bufferManager = newBufferManager(int(r.format.MaxBlockSize)) if r.writeBack.enabled() { r.writeBack.semaphore = make(semaphore, r.writeBack.workers) } @@ -246,8 +243,6 @@ func (r *ObjectManager) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, pref } func (r *ObjectManager) encryptAndMaybeWrite(objectID ObjectID, buffer *bytes.Buffer, prefix string) (ObjectID, error) { - defer r.bufferManager.returnBuffer(buffer) - var data []byte if buffer != nil { data = buffer.Bytes() diff --git a/repo/object_manager_test.go b/repo/object_manager_test.go index 55073b1aa..4b0136aee 100644 --- a/repo/object_manager_test.go +++ b/repo/object_manager_test.go @@ -18,10 +18,6 @@ "github.com/kopia/kopia/internal/storagetesting" ) -func init() { - panicOnBufferLeaks = true -} - func setupTest(t *testing.T, mods ...func(o *NewRepositoryOptions)) (data map[string][]byte, om *Repository) { data = map[string][]byte{} st := storagetesting.NewMapStorage(data) diff --git a/repo/object_writer.go b/repo/object_writer.go index ae7802a31..1463b8fca 100644 --- a/repo/object_writer.go +++ b/repo/object_writer.go @@ -39,7 +39,7 @@ func (t *blockTracker) blockIDs() []string { type objectWriter struct { repo *ObjectManager - buffer *bytes.Buffer + buffer bytes.Buffer totalLength int64 prefix string @@ -59,11 +59,6 @@ type objectWriter struct { } func (w *objectWriter) Close() error { - if w.buffer != nil { - w.repo.bufferManager.returnBuffer(w.buffer) - w.buffer = nil - } - if w.listWriter != nil { w.listWriter.Close() w.listWriter = nil @@ -76,10 +71,6 @@ func (w *objectWriter) Write(data []byte) (n int, err error) { w.totalLength += int64(dataLen) for _, d := range data { - if w.buffer == nil { - w.buffer = w.repo.bufferManager.newBuffer() - } - w.buffer.WriteByte(d) if w.splitter.add(d) { @@ -93,57 +84,55 @@ func (w *objectWriter) Write(data []byte) (n int, err error) { } func (w *objectWriter) flushBuffer(force bool) error { - // log.Printf("flushing bufer") - // defer log.Printf("flushed") - if w.buffer != nil || force { - var length int - if w.buffer != nil { - length = w.buffer.Len() - } - - b := w.buffer - w.buffer = nil - - objectID, err := w.repo.hashEncryptAndWriteMaybeAsync(b, w.prefix, w.disablePacking) - if err != nil { - return fmt.Errorf( - "error when flushing chunk %d of %s: %#v", - w.flushedObjectCount, - w.description, - err) - } - - w.blockTracker.addBlock(objectID.StorageBlock) - - w.flushedObjectCount++ - w.lastFlushedObject = objectID - if w.listWriter == nil { - w.listWriter = &objectWriter{ - repo: w.repo, - indirectLevel: w.indirectLevel + 1, - prefix: w.prefix, - description: "LIST(" + w.description + ")", - blockTracker: w.blockTracker, - splitter: w.repo.newSplitter(), - } - w.listProtoWriter = jsonstream.NewWriter(w.listWriter, indirectStreamType) - w.listCurrentPos = 0 - } - - w.listProtoWriter.Write(&indirectObjectEntry{ - Object: &objectID, - Start: w.listCurrentPos, - Length: int64(length), - }) - - w.listCurrentPos += int64(length) + if !force && w.buffer.Len() == 0 { + return nil } + + length := w.buffer.Len() + + var b2 bytes.Buffer + w.buffer.WriteTo(&b2) + w.buffer.Reset() + + objectID, err := w.repo.hashEncryptAndWriteMaybeAsync(&b2, w.prefix, w.disablePacking) + if err != nil { + return fmt.Errorf( + "error when flushing chunk %d of %s: %#v", + w.flushedObjectCount, + w.description, + err) + } + + w.blockTracker.addBlock(objectID.StorageBlock) + + w.flushedObjectCount++ + w.lastFlushedObject = objectID + if w.listWriter == nil { + w.listWriter = &objectWriter{ + repo: w.repo, + indirectLevel: w.indirectLevel + 1, + prefix: w.prefix, + description: "LIST(" + w.description + ")", + blockTracker: w.blockTracker, + splitter: w.repo.newSplitter(), + } + w.listProtoWriter = jsonstream.NewWriter(w.listWriter, indirectStreamType) + w.listCurrentPos = 0 + } + + w.listProtoWriter.Write(&indirectObjectEntry{ + Object: &objectID, + Start: w.listCurrentPos, + Length: int64(length), + }) + + w.listCurrentPos += int64(length) return nil } func (w *objectWriter) Result(forceStored bool) (ObjectID, error) { if !forceStored && w.flushedObjectCount == 0 { - if w.buffer == nil { + if w.buffer.Len() == 0 { return NullObjectID, nil }