From 79440ab969bebde1c31bba658af003dfe138f89f Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sun, 24 Sep 2017 21:35:19 -0700 Subject: [PATCH] fixed locking of pack manager, changed how it writes pack and index objects to avoid using object writer, which prevents certain cycles --- repo/object_manager.go | 49 +++++++++++----------- repo/object_manager_test.go | 12 +++--- repo/object_writer.go | 17 +++----- repo/pack_manager.go | 82 +++++++++++++++++-------------------- 4 files changed, 75 insertions(+), 85 deletions(-) diff --git a/repo/object_manager.go b/repo/object_manager.go index ee82df5d1..776c77293 100644 --- a/repo/object_manager.go +++ b/repo/object_manager.go @@ -65,12 +65,11 @@ func (r *ObjectManager) Optimize(cutoffTime time.Time) error { // NewWriter creates an ObjectWriter for writing to the repository. func (r *ObjectManager) NewWriter(opt WriterOptions) ObjectWriter { w := &objectWriter{ - repo: r, - splitter: r.newSplitter(), - description: opt.Description, - prefix: opt.BlockNamePrefix, - isPackInternalObject: opt.isPackInternalObject, - packGroup: opt.PackGroup, + repo: r, + splitter: r.newSplitter(), + description: opt.Description, + prefix: opt.BlockNamePrefix, + packGroup: opt.PackGroup, } if opt.splitter != nil { @@ -278,28 +277,30 @@ func (r *ObjectManager) hashEncryptAndWrite(packGroup string, buffer *bytes.Buff atomic.AddInt32(&r.stats.HashedBlocks, 1) atomic.AddInt64(&r.stats.HashedBytes, int64(len(data))) - if !isPackInternalObject && r.format.MaxPackedContentLength > 0 && len(data) <= r.format.MaxPackedContentLength { - packOID, err := r.packMgr.AddToPack(packGroup, objectID.StorageBlock, data) - return packOID, err - } + if !isPackInternalObject { + if r.format.MaxPackedContentLength > 0 && len(data) <= r.format.MaxPackedContentLength { + packOID, err := r.packMgr.AddToPack(packGroup, objectID.StorageBlock, data) + return packOID, err + } - // Before performing encryption, check if the block is already there. - blockSize, err := r.blockSizeCache.getSize(objectID.StorageBlock) - atomic.AddInt32(&r.stats.CheckedBlocks, int32(1)) - if err == nil && blockSize == int64(len(data)) { - atomic.AddInt32(&r.stats.PresentBlocks, int32(1)) - // Block already exists in storage, correct size, return without uploading. - return objectID, nil - } + // Before performing encryption, check if the block is already there. + blockSize, err := r.blockSizeCache.getSize(objectID.StorageBlock) + atomic.AddInt32(&r.stats.CheckedBlocks, int32(1)) + if err == nil && blockSize == int64(len(data)) { + atomic.AddInt32(&r.stats.PresentBlocks, int32(1)) + // Block already exists in storage, correct size, return without uploading. + return objectID, nil + } - if err != nil && err != blob.ErrBlockNotFound { - // Don't know whether block exists in storage. - return NullObjectID, err + if err != nil && err != blob.ErrBlockNotFound { + // Don't know whether block exists in storage. + return NullObjectID, err + } } // Encrypt the block in-place. atomic.AddInt64(&r.stats.EncryptedBytes, int64(len(data))) - data, err = r.formatter.Encrypt(data, objectID, 0) + data, err := r.formatter.Encrypt(data, objectID, 0) if err != nil { return NullObjectID, err } @@ -312,7 +313,9 @@ func (r *ObjectManager) hashEncryptAndWrite(packGroup string, buffer *bytes.Buff } r.blockSizeCache.put(objectID.StorageBlock, int64(len(data))) - r.packMgr.RegisterUnpackedBlock(objectID.StorageBlock, int64(len(data)), isPackInternalObject) + if !isPackInternalObject { + r.packMgr.RegisterUnpackedBlock(objectID.StorageBlock, int64(len(data))) + } return objectID, nil } diff --git a/repo/object_manager_test.go b/repo/object_manager_test.go index 48ba126bc..3cca342e6 100644 --- a/repo/object_manager_test.go +++ b/repo/object_manager_test.go @@ -240,13 +240,13 @@ func TestIndirection(t *testing.T) { expectedBlockCount int expectedIndirection int }{ - //{dataLength: 200, expectedBlockCount: 1, expectedIndirection: 0}, + {dataLength: 200, expectedBlockCount: 1, expectedIndirection: 0}, {dataLength: 250, expectedBlockCount: 3, expectedIndirection: 1}, - // {dataLength: 1400, expectedBlockCount: 7, expectedIndirection: 3}, - // {dataLength: 2000, expectedBlockCount: 8, expectedIndirection: 3}, - // {dataLength: 3000, expectedBlockCount: 9, expectedIndirection: 3}, - // {dataLength: 4000, expectedBlockCount: 14, expectedIndirection: 4}, - // {dataLength: 10000, expectedBlockCount: 25, expectedIndirection: 4}, + {dataLength: 1400, expectedBlockCount: 7, expectedIndirection: 3}, + {dataLength: 2000, expectedBlockCount: 8, expectedIndirection: 3}, + {dataLength: 3000, expectedBlockCount: 9, expectedIndirection: 3}, + {dataLength: 4000, expectedBlockCount: 14, expectedIndirection: 4}, + {dataLength: 10000, expectedBlockCount: 25, expectedIndirection: 4}, } for _, c := range cases { diff --git a/repo/object_writer.go b/repo/object_writer.go index 6285a2df4..3050d1c89 100644 --- a/repo/object_writer.go +++ b/repo/object_writer.go @@ -55,10 +55,8 @@ type objectWriter struct { description string - splitter objectSplitter - - isPackInternalObject bool - packGroup string + splitter objectSplitter + packGroup string pendingBlocksWG sync.WaitGroup @@ -100,7 +98,7 @@ func (w *objectWriter) flushBuffer() error { w.buffer.Reset() do := func() { - objectID, err := w.repo.hashEncryptAndWrite(w.packGroup, &b2, w.prefix, w.isPackInternalObject) + objectID, err := w.repo.hashEncryptAndWrite(w.packGroup, &b2, w.prefix, false) w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, objectID, length) if err != nil { w.err.add(fmt.Errorf("error when flushing chunk %d of %s: %v", chunkID, w.description, err)) @@ -112,7 +110,7 @@ func (w *objectWriter) flushBuffer() error { // When writing pack internal object don't use asynchronous write, since we're already under the semaphore // and it may lead to a deadlock. - if w.repo.async && !w.isPackInternalObject { + if w.repo.async { w.repo.writeBackSemaphore.Lock() w.pendingBlocksWG.Add(1) w.repo.writeBackWG.Add(1) @@ -150,9 +148,7 @@ func (w *objectWriter) Result() (ObjectID, error) { prefix: w.prefix, description: "LIST(" + w.description + ")", splitter: w.repo.newSplitter(), - - isPackInternalObject: w.isPackInternalObject, - packGroup: w.packGroup, + packGroup: w.packGroup, } jw := jsonstream.NewWriter(iw, indirectStreamType) @@ -173,6 +169,5 @@ type WriterOptions struct { Description string PackGroup string - splitter objectSplitter - isPackInternalObject bool + splitter objectSplitter } diff --git a/repo/pack_manager.go b/repo/pack_manager.go index 24696c2f3..7f444f166 100644 --- a/repo/pack_manager.go +++ b/repo/pack_manager.go @@ -38,7 +38,7 @@ type packManager struct { objectManager *ObjectManager storage blob.Storage - mu sync.RWMutex + mu sync.Mutex blockToIndex map[string]*packIndex pendingPackIndexes packIndexes @@ -52,6 +52,9 @@ func (p *packManager) blockIDToPackSection(blockID string) (ObjectIDSection, boo return ObjectIDSection{}, false, nil } + p.mu.Lock() + defer p.mu.Unlock() + pi, err := p.ensurePackIndexesLoaded() if err != nil { return ObjectIDSection{}, false, fmt.Errorf("can't load pack index: %v", err) @@ -92,24 +95,15 @@ func (p *packManager) blockIDToPackSection(blockID string) (ObjectIDSection, boo return ObjectIDSection{}, false, fmt.Errorf("invalid pack index for %q", blockID) } -func (p *packManager) RegisterUnpackedBlock(blockID string, dataLength int64, isInternal bool) error { +func (p *packManager) RegisterUnpackedBlock(blockID string, dataLength int64) error { if strings.HasPrefix(blockID, packObjectPrefix) { return nil } - if !isInternal { - p.mu.Lock() - defer p.mu.Unlock() - } + p.mu.Lock() + defer p.mu.Unlock() - // See if we already have this block ID in an unpacked pack group. - ndx, ok := p.blockToIndex[blockID] - if ok && ndx.PackGroup == unpackedObjectsPackGroup { - return nil - } - - g := p.ensurePackGroupLocked(unpackedObjectsPackGroup) - g.currentPackIndex.Items[blockID] = fmt.Sprintf("0+%v", dataLength) + g := p.registerUnpackedBlockLockedNoFlush(blockID, dataLength) if time.Now().After(p.flushPackIndexesAfter) || len(g.currentPackIndex.Items) > maxNonPackedBlocksPerPackIndex { if err := p.finishPackAndMaybeFlushIndexes(g); err != nil { @@ -120,16 +114,29 @@ func (p *packManager) RegisterUnpackedBlock(blockID string, dataLength int64, is return nil } +func (p *packManager) registerUnpackedBlockLockedNoFlush(blockID string, dataLength int64) *packInfo { + g := p.ensurePackGroupLocked(unpackedObjectsPackGroup) + + // See if we already have this block ID in an unpacked pack group. + ndx, ok := p.blockToIndex[blockID] + if ok && ndx.PackGroup == unpackedObjectsPackGroup { + return g + } + + g.currentPackIndex.Items[blockID] = fmt.Sprintf("0+%v", dataLength) + return g + +} func (p *packManager) AddToPack(packGroup string, blockID string, data []byte) (ObjectID, error) { if strings.HasPrefix(blockID, packObjectPrefix) { return NullObjectID, fmt.Errorf("pack objects can't be packed: %v", blockID) } - p.ensurePackIndexesLoaded() - p.mu.Lock() defer p.mu.Unlock() + p.ensurePackIndexesLoaded() + // See if we already have this block ID in some pack. if _, ok := p.blockToIndex[blockID]; ok { return ObjectID{StorageBlock: blockID}, nil @@ -207,21 +214,15 @@ func (p *packManager) flushPackIndexesLocked() error { } func (p *packManager) writePackIndexes(ndx packIndexes) (string, error) { - w := p.objectManager.NewWriter(WriterOptions{ - isPackInternalObject: true, - Description: "pack index", - BlockNamePrefix: packObjectPrefix, - splitter: newNeverSplitter(), - }) - defer w.Close() + var buf bytes.Buffer - zw := gzip.NewWriter(w) + zw := gzip.NewWriter(&buf) if err := json.NewEncoder(zw).Encode(ndx); err != nil { return "", fmt.Errorf("can't encode pack index: %v", err) } zw.Close() - oid, err := w.Result() + oid, err := p.objectManager.hashEncryptAndWrite("", &buf, packObjectPrefix, true) if err != nil { return "", fmt.Errorf("can't save pack index object: %v", err) } @@ -247,21 +248,11 @@ func (p *packManager) finishPackLocked(g *packInfo) error { if g.currentPackIndex == nil { return nil } - p.pendingPackIndexes[g.currentPackID] = g.currentPackIndex if g.currentPackData.Len() > 0 { - w := p.objectManager.NewWriter(WriterOptions{ - Description: fmt.Sprintf("pack:%v", g.currentPackID), - splitter: newNeverSplitter(), - isPackInternalObject: true, - }) - defer w.Close() - - if _, err := g.currentPackData.WriteTo(w); err != nil { - return fmt.Errorf("unable to write pack: %v", err) - } + dataLength := int64(g.currentPackData.Len()) + oid, err := p.objectManager.hashEncryptAndWrite(unpackedObjectsPackGroup, &g.currentPackData, "", true) g.currentPackData.Reset() - oid, err := w.Result() if err != nil { return fmt.Errorf("can't save pack data: %v", err) @@ -271,15 +262,18 @@ func (p *packManager) finishPackLocked(g *packInfo) error { return fmt.Errorf("storage block is empty: %v", oid) } + p.registerUnpackedBlockLockedNoFlush(oid.StorageBlock, dataLength) + g.currentPackIndex.PackBlockID = oid.StorageBlock } + p.pendingPackIndexes[g.currentPackID] = g.currentPackIndex g.currentPackIndex = nil return nil } -func (p *packManager) loadMergedPackIndex(olderThan *time.Time) (map[string]*packIndex, []string, error) { +func (p *packManager) loadMergedPackIndexLocked(olderThan *time.Time) (map[string]*packIndex, []string, error) { ch, cancel := p.objectManager.storage.ListBlocks(packObjectPrefix) defer cancel() @@ -389,17 +383,12 @@ func (p *packManager) loadMergedPackIndex(olderThan *time.Time) (map[string]*pac } func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) { - p.mu.RLock() pi := p.blockToIndex - p.mu.RUnlock() if pi != nil { return pi, nil } - p.mu.Lock() - defer p.mu.Unlock() - - merged, _, err := p.loadMergedPackIndex(nil) + merged, _, err := p.loadMergedPackIndexLocked(nil) if err != nil { return nil, err } @@ -418,7 +407,10 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) { } func (p *packManager) Compact(cutoffTime time.Time) error { - merged, blockIDs, err := p.loadMergedPackIndex(&cutoffTime) + p.mu.Lock() + defer p.mu.Unlock() + + merged, blockIDs, err := p.loadMergedPackIndexLocked(&cutoffTime) if err != nil { return err }