removed buffer manager

This commit is contained in:
Jarek Kowalski
2017-08-08 07:56:05 +02:00
parent 65d188ec59
commit cbb492ea09
5 changed files with 47 additions and 164 deletions

View File

@@ -1,52 +0,0 @@
package repo
import (
"bytes"
"log"
"sync"
"sync/atomic"
)
var panicOnBufferLeaks = false
// bufferManager manages pool of reusable bytes.Buffer objects.
type bufferManager struct {
outstandingCount int32
pool sync.Pool
}
// newBuffer returns a new or reused bytes.Buffer.
func (mgr *bufferManager) newBuffer() *bytes.Buffer {
atomic.AddInt32(&mgr.outstandingCount, 1)
b := mgr.pool.New().(*bytes.Buffer)
b.Reset()
return b
}
// returnBuffer returns the give buffer to the pool
func (mgr *bufferManager) returnBuffer(b *bytes.Buffer) {
atomic.AddInt32(&mgr.outstandingCount, -1)
mgr.pool.Put(b)
}
func (mgr *bufferManager) close() {
if mgr.outstandingCount != 0 {
if panicOnBufferLeaks {
log.Panicf("WARNING: Found %v buffer leaks.", mgr.outstandingCount)
} else {
log.Printf("WARNING: Found %v buffer leaks.", mgr.outstandingCount)
}
}
}
func newBufferManager(blockSize int) *bufferManager {
mgr := &bufferManager{}
mgr.pool = sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, blockSize))
},
}
return mgr
}

View File

@@ -1,45 +0,0 @@
package repo
import (
"bytes"
"testing"
)
func TestBufferManager(t *testing.T) {
mgr := newBufferManager(10)
defer mgr.close()
verifyBufferClean := func(b *bytes.Buffer) {
if b.Cap() != 10 {
t.Errorf("unexpected cap: %v", b.Cap())
}
if b.Len() != 0 {
t.Errorf("unexpected len: %v", b.Len())
}
}
b := mgr.newBuffer()
if mgr.outstandingCount != 1 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
b1 := mgr.newBuffer()
verifyBufferClean(b)
verifyBufferClean(b1)
if mgr.outstandingCount != 2 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
mgr.returnBuffer(b)
mgr.returnBuffer(b)
if mgr.outstandingCount != 0 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
b2 := mgr.newBuffer()
if mgr.outstandingCount != 1 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
verifyBufferClean(b2)
mgr.returnBuffer(b2)
if mgr.outstandingCount != 0 {
t.Errorf("unexpected outstandingCount: %v", mgr.outstandingCount)
}
}

View File

@@ -28,10 +28,9 @@ type ObjectManager struct {
stats Stats
storage blob.Storage
verbose bool
bufferManager *bufferManager
format config.RepositoryObjectFormat
formatter objectFormatter
verbose bool
format config.RepositoryObjectFormat
formatter objectFormatter
packMgr *packManager
writeBack writebackManager
@@ -42,7 +41,6 @@ type ObjectManager struct {
// Close closes the connection to the underlying blob storage and releases any resources.
func (r *ObjectManager) Close() error {
r.writeBack.flush()
r.bufferManager.close()
return nil
}
@@ -195,7 +193,6 @@ func newObjectManager(s blob.Storage, f config.RepositoryObjectFormat, opts *Opt
r.writeBack.workers = opts.WriteBack
}
r.bufferManager = newBufferManager(int(r.format.MaxBlockSize))
if r.writeBack.enabled() {
r.writeBack.semaphore = make(semaphore, r.writeBack.workers)
}
@@ -246,8 +243,6 @@ func (r *ObjectManager) hashEncryptAndWriteMaybeAsync(buffer *bytes.Buffer, pref
}
func (r *ObjectManager) encryptAndMaybeWrite(objectID ObjectID, buffer *bytes.Buffer, prefix string) (ObjectID, error) {
defer r.bufferManager.returnBuffer(buffer)
var data []byte
if buffer != nil {
data = buffer.Bytes()

View File

@@ -18,10 +18,6 @@
"github.com/kopia/kopia/internal/storagetesting"
)
func init() {
panicOnBufferLeaks = true
}
func setupTest(t *testing.T, mods ...func(o *NewRepositoryOptions)) (data map[string][]byte, om *Repository) {
data = map[string][]byte{}
st := storagetesting.NewMapStorage(data)

View File

@@ -39,7 +39,7 @@ func (t *blockTracker) blockIDs() []string {
type objectWriter struct {
repo *ObjectManager
buffer *bytes.Buffer
buffer bytes.Buffer
totalLength int64
prefix string
@@ -59,11 +59,6 @@ type objectWriter struct {
}
func (w *objectWriter) Close() error {
if w.buffer != nil {
w.repo.bufferManager.returnBuffer(w.buffer)
w.buffer = nil
}
if w.listWriter != nil {
w.listWriter.Close()
w.listWriter = nil
@@ -76,10 +71,6 @@ func (w *objectWriter) Write(data []byte) (n int, err error) {
w.totalLength += int64(dataLen)
for _, d := range data {
if w.buffer == nil {
w.buffer = w.repo.bufferManager.newBuffer()
}
w.buffer.WriteByte(d)
if w.splitter.add(d) {
@@ -93,57 +84,55 @@ func (w *objectWriter) Write(data []byte) (n int, err error) {
}
func (w *objectWriter) flushBuffer(force bool) error {
// log.Printf("flushing bufer")
// defer log.Printf("flushed")
if w.buffer != nil || force {
var length int
if w.buffer != nil {
length = w.buffer.Len()
}
b := w.buffer
w.buffer = nil
objectID, err := w.repo.hashEncryptAndWriteMaybeAsync(b, w.prefix, w.disablePacking)
if err != nil {
return fmt.Errorf(
"error when flushing chunk %d of %s: %#v",
w.flushedObjectCount,
w.description,
err)
}
w.blockTracker.addBlock(objectID.StorageBlock)
w.flushedObjectCount++
w.lastFlushedObject = objectID
if w.listWriter == nil {
w.listWriter = &objectWriter{
repo: w.repo,
indirectLevel: w.indirectLevel + 1,
prefix: w.prefix,
description: "LIST(" + w.description + ")",
blockTracker: w.blockTracker,
splitter: w.repo.newSplitter(),
}
w.listProtoWriter = jsonstream.NewWriter(w.listWriter, indirectStreamType)
w.listCurrentPos = 0
}
w.listProtoWriter.Write(&indirectObjectEntry{
Object: &objectID,
Start: w.listCurrentPos,
Length: int64(length),
})
w.listCurrentPos += int64(length)
if !force && w.buffer.Len() == 0 {
return nil
}
length := w.buffer.Len()
var b2 bytes.Buffer
w.buffer.WriteTo(&b2)
w.buffer.Reset()
objectID, err := w.repo.hashEncryptAndWriteMaybeAsync(&b2, w.prefix, w.disablePacking)
if err != nil {
return fmt.Errorf(
"error when flushing chunk %d of %s: %#v",
w.flushedObjectCount,
w.description,
err)
}
w.blockTracker.addBlock(objectID.StorageBlock)
w.flushedObjectCount++
w.lastFlushedObject = objectID
if w.listWriter == nil {
w.listWriter = &objectWriter{
repo: w.repo,
indirectLevel: w.indirectLevel + 1,
prefix: w.prefix,
description: "LIST(" + w.description + ")",
blockTracker: w.blockTracker,
splitter: w.repo.newSplitter(),
}
w.listProtoWriter = jsonstream.NewWriter(w.listWriter, indirectStreamType)
w.listCurrentPos = 0
}
w.listProtoWriter.Write(&indirectObjectEntry{
Object: &objectID,
Start: w.listCurrentPos,
Length: int64(length),
})
w.listCurrentPos += int64(length)
return nil
}
func (w *objectWriter) Result(forceStored bool) (ObjectID, error) {
if !forceStored && w.flushedObjectCount == 0 {
if w.buffer == nil {
if w.buffer.Len() == 0 {
return NullObjectID, nil
}