mirror of
https://github.com/kopia/kopia.git
synced 2026-05-24 06:34:46 -04:00
fixed locking of pack manager, changed how it writes pack and index objects to avoid using object writer, which prevents certain cycles
This commit is contained in:
@@ -65,12 +65,11 @@ func (r *ObjectManager) Optimize(cutoffTime time.Time) error {
|
||||
// NewWriter creates an ObjectWriter for writing to the repository.
|
||||
func (r *ObjectManager) NewWriter(opt WriterOptions) ObjectWriter {
|
||||
w := &objectWriter{
|
||||
repo: r,
|
||||
splitter: r.newSplitter(),
|
||||
description: opt.Description,
|
||||
prefix: opt.BlockNamePrefix,
|
||||
isPackInternalObject: opt.isPackInternalObject,
|
||||
packGroup: opt.PackGroup,
|
||||
repo: r,
|
||||
splitter: r.newSplitter(),
|
||||
description: opt.Description,
|
||||
prefix: opt.BlockNamePrefix,
|
||||
packGroup: opt.PackGroup,
|
||||
}
|
||||
|
||||
if opt.splitter != nil {
|
||||
@@ -278,28 +277,30 @@ func (r *ObjectManager) hashEncryptAndWrite(packGroup string, buffer *bytes.Buff
|
||||
atomic.AddInt32(&r.stats.HashedBlocks, 1)
|
||||
atomic.AddInt64(&r.stats.HashedBytes, int64(len(data)))
|
||||
|
||||
if !isPackInternalObject && r.format.MaxPackedContentLength > 0 && len(data) <= r.format.MaxPackedContentLength {
|
||||
packOID, err := r.packMgr.AddToPack(packGroup, objectID.StorageBlock, data)
|
||||
return packOID, err
|
||||
}
|
||||
if !isPackInternalObject {
|
||||
if r.format.MaxPackedContentLength > 0 && len(data) <= r.format.MaxPackedContentLength {
|
||||
packOID, err := r.packMgr.AddToPack(packGroup, objectID.StorageBlock, data)
|
||||
return packOID, err
|
||||
}
|
||||
|
||||
// Before performing encryption, check if the block is already there.
|
||||
blockSize, err := r.blockSizeCache.getSize(objectID.StorageBlock)
|
||||
atomic.AddInt32(&r.stats.CheckedBlocks, int32(1))
|
||||
if err == nil && blockSize == int64(len(data)) {
|
||||
atomic.AddInt32(&r.stats.PresentBlocks, int32(1))
|
||||
// Block already exists in storage, correct size, return without uploading.
|
||||
return objectID, nil
|
||||
}
|
||||
// Before performing encryption, check if the block is already there.
|
||||
blockSize, err := r.blockSizeCache.getSize(objectID.StorageBlock)
|
||||
atomic.AddInt32(&r.stats.CheckedBlocks, int32(1))
|
||||
if err == nil && blockSize == int64(len(data)) {
|
||||
atomic.AddInt32(&r.stats.PresentBlocks, int32(1))
|
||||
// Block already exists in storage, correct size, return without uploading.
|
||||
return objectID, nil
|
||||
}
|
||||
|
||||
if err != nil && err != blob.ErrBlockNotFound {
|
||||
// Don't know whether block exists in storage.
|
||||
return NullObjectID, err
|
||||
if err != nil && err != blob.ErrBlockNotFound {
|
||||
// Don't know whether block exists in storage.
|
||||
return NullObjectID, err
|
||||
}
|
||||
}
|
||||
|
||||
// Encrypt the block in-place.
|
||||
atomic.AddInt64(&r.stats.EncryptedBytes, int64(len(data)))
|
||||
data, err = r.formatter.Encrypt(data, objectID, 0)
|
||||
data, err := r.formatter.Encrypt(data, objectID, 0)
|
||||
if err != nil {
|
||||
return NullObjectID, err
|
||||
}
|
||||
@@ -312,7 +313,9 @@ func (r *ObjectManager) hashEncryptAndWrite(packGroup string, buffer *bytes.Buff
|
||||
}
|
||||
r.blockSizeCache.put(objectID.StorageBlock, int64(len(data)))
|
||||
|
||||
r.packMgr.RegisterUnpackedBlock(objectID.StorageBlock, int64(len(data)), isPackInternalObject)
|
||||
if !isPackInternalObject {
|
||||
r.packMgr.RegisterUnpackedBlock(objectID.StorageBlock, int64(len(data)))
|
||||
}
|
||||
return objectID, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -240,13 +240,13 @@ func TestIndirection(t *testing.T) {
|
||||
expectedBlockCount int
|
||||
expectedIndirection int
|
||||
}{
|
||||
//{dataLength: 200, expectedBlockCount: 1, expectedIndirection: 0},
|
||||
{dataLength: 200, expectedBlockCount: 1, expectedIndirection: 0},
|
||||
{dataLength: 250, expectedBlockCount: 3, expectedIndirection: 1},
|
||||
// {dataLength: 1400, expectedBlockCount: 7, expectedIndirection: 3},
|
||||
// {dataLength: 2000, expectedBlockCount: 8, expectedIndirection: 3},
|
||||
// {dataLength: 3000, expectedBlockCount: 9, expectedIndirection: 3},
|
||||
// {dataLength: 4000, expectedBlockCount: 14, expectedIndirection: 4},
|
||||
// {dataLength: 10000, expectedBlockCount: 25, expectedIndirection: 4},
|
||||
{dataLength: 1400, expectedBlockCount: 7, expectedIndirection: 3},
|
||||
{dataLength: 2000, expectedBlockCount: 8, expectedIndirection: 3},
|
||||
{dataLength: 3000, expectedBlockCount: 9, expectedIndirection: 3},
|
||||
{dataLength: 4000, expectedBlockCount: 14, expectedIndirection: 4},
|
||||
{dataLength: 10000, expectedBlockCount: 25, expectedIndirection: 4},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
|
||||
@@ -55,10 +55,8 @@ type objectWriter struct {
|
||||
|
||||
description string
|
||||
|
||||
splitter objectSplitter
|
||||
|
||||
isPackInternalObject bool
|
||||
packGroup string
|
||||
splitter objectSplitter
|
||||
packGroup string
|
||||
|
||||
pendingBlocksWG sync.WaitGroup
|
||||
|
||||
@@ -100,7 +98,7 @@ func (w *objectWriter) flushBuffer() error {
|
||||
w.buffer.Reset()
|
||||
|
||||
do := func() {
|
||||
objectID, err := w.repo.hashEncryptAndWrite(w.packGroup, &b2, w.prefix, w.isPackInternalObject)
|
||||
objectID, err := w.repo.hashEncryptAndWrite(w.packGroup, &b2, w.prefix, false)
|
||||
w.repo.trace("OBJECT_WRITER(%q) stored %v (%v bytes)", w.description, objectID, length)
|
||||
if err != nil {
|
||||
w.err.add(fmt.Errorf("error when flushing chunk %d of %s: %v", chunkID, w.description, err))
|
||||
@@ -112,7 +110,7 @@ func (w *objectWriter) flushBuffer() error {
|
||||
|
||||
// When writing pack internal object don't use asynchronous write, since we're already under the semaphore
|
||||
// and it may lead to a deadlock.
|
||||
if w.repo.async && !w.isPackInternalObject {
|
||||
if w.repo.async {
|
||||
w.repo.writeBackSemaphore.Lock()
|
||||
w.pendingBlocksWG.Add(1)
|
||||
w.repo.writeBackWG.Add(1)
|
||||
@@ -150,9 +148,7 @@ func (w *objectWriter) Result() (ObjectID, error) {
|
||||
prefix: w.prefix,
|
||||
description: "LIST(" + w.description + ")",
|
||||
splitter: w.repo.newSplitter(),
|
||||
|
||||
isPackInternalObject: w.isPackInternalObject,
|
||||
packGroup: w.packGroup,
|
||||
packGroup: w.packGroup,
|
||||
}
|
||||
|
||||
jw := jsonstream.NewWriter(iw, indirectStreamType)
|
||||
@@ -173,6 +169,5 @@ type WriterOptions struct {
|
||||
Description string
|
||||
PackGroup string
|
||||
|
||||
splitter objectSplitter
|
||||
isPackInternalObject bool
|
||||
splitter objectSplitter
|
||||
}
|
||||
|
||||
@@ -38,7 +38,7 @@ type packManager struct {
|
||||
objectManager *ObjectManager
|
||||
storage blob.Storage
|
||||
|
||||
mu sync.RWMutex
|
||||
mu sync.Mutex
|
||||
blockToIndex map[string]*packIndex
|
||||
|
||||
pendingPackIndexes packIndexes
|
||||
@@ -52,6 +52,9 @@ func (p *packManager) blockIDToPackSection(blockID string) (ObjectIDSection, boo
|
||||
return ObjectIDSection{}, false, nil
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
pi, err := p.ensurePackIndexesLoaded()
|
||||
if err != nil {
|
||||
return ObjectIDSection{}, false, fmt.Errorf("can't load pack index: %v", err)
|
||||
@@ -92,24 +95,15 @@ func (p *packManager) blockIDToPackSection(blockID string) (ObjectIDSection, boo
|
||||
return ObjectIDSection{}, false, fmt.Errorf("invalid pack index for %q", blockID)
|
||||
}
|
||||
|
||||
func (p *packManager) RegisterUnpackedBlock(blockID string, dataLength int64, isInternal bool) error {
|
||||
func (p *packManager) RegisterUnpackedBlock(blockID string, dataLength int64) error {
|
||||
if strings.HasPrefix(blockID, packObjectPrefix) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !isInternal {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
}
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
// See if we already have this block ID in an unpacked pack group.
|
||||
ndx, ok := p.blockToIndex[blockID]
|
||||
if ok && ndx.PackGroup == unpackedObjectsPackGroup {
|
||||
return nil
|
||||
}
|
||||
|
||||
g := p.ensurePackGroupLocked(unpackedObjectsPackGroup)
|
||||
g.currentPackIndex.Items[blockID] = fmt.Sprintf("0+%v", dataLength)
|
||||
g := p.registerUnpackedBlockLockedNoFlush(blockID, dataLength)
|
||||
|
||||
if time.Now().After(p.flushPackIndexesAfter) || len(g.currentPackIndex.Items) > maxNonPackedBlocksPerPackIndex {
|
||||
if err := p.finishPackAndMaybeFlushIndexes(g); err != nil {
|
||||
@@ -120,16 +114,29 @@ func (p *packManager) RegisterUnpackedBlock(blockID string, dataLength int64, is
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *packManager) registerUnpackedBlockLockedNoFlush(blockID string, dataLength int64) *packInfo {
|
||||
g := p.ensurePackGroupLocked(unpackedObjectsPackGroup)
|
||||
|
||||
// See if we already have this block ID in an unpacked pack group.
|
||||
ndx, ok := p.blockToIndex[blockID]
|
||||
if ok && ndx.PackGroup == unpackedObjectsPackGroup {
|
||||
return g
|
||||
}
|
||||
|
||||
g.currentPackIndex.Items[blockID] = fmt.Sprintf("0+%v", dataLength)
|
||||
return g
|
||||
|
||||
}
|
||||
func (p *packManager) AddToPack(packGroup string, blockID string, data []byte) (ObjectID, error) {
|
||||
if strings.HasPrefix(blockID, packObjectPrefix) {
|
||||
return NullObjectID, fmt.Errorf("pack objects can't be packed: %v", blockID)
|
||||
}
|
||||
|
||||
p.ensurePackIndexesLoaded()
|
||||
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
p.ensurePackIndexesLoaded()
|
||||
|
||||
// See if we already have this block ID in some pack.
|
||||
if _, ok := p.blockToIndex[blockID]; ok {
|
||||
return ObjectID{StorageBlock: blockID}, nil
|
||||
@@ -207,21 +214,15 @@ func (p *packManager) flushPackIndexesLocked() error {
|
||||
}
|
||||
|
||||
func (p *packManager) writePackIndexes(ndx packIndexes) (string, error) {
|
||||
w := p.objectManager.NewWriter(WriterOptions{
|
||||
isPackInternalObject: true,
|
||||
Description: "pack index",
|
||||
BlockNamePrefix: packObjectPrefix,
|
||||
splitter: newNeverSplitter(),
|
||||
})
|
||||
defer w.Close()
|
||||
var buf bytes.Buffer
|
||||
|
||||
zw := gzip.NewWriter(w)
|
||||
zw := gzip.NewWriter(&buf)
|
||||
if err := json.NewEncoder(zw).Encode(ndx); err != nil {
|
||||
return "", fmt.Errorf("can't encode pack index: %v", err)
|
||||
}
|
||||
zw.Close()
|
||||
|
||||
oid, err := w.Result()
|
||||
oid, err := p.objectManager.hashEncryptAndWrite("", &buf, packObjectPrefix, true)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("can't save pack index object: %v", err)
|
||||
}
|
||||
@@ -247,21 +248,11 @@ func (p *packManager) finishPackLocked(g *packInfo) error {
|
||||
if g.currentPackIndex == nil {
|
||||
return nil
|
||||
}
|
||||
p.pendingPackIndexes[g.currentPackID] = g.currentPackIndex
|
||||
|
||||
if g.currentPackData.Len() > 0 {
|
||||
w := p.objectManager.NewWriter(WriterOptions{
|
||||
Description: fmt.Sprintf("pack:%v", g.currentPackID),
|
||||
splitter: newNeverSplitter(),
|
||||
isPackInternalObject: true,
|
||||
})
|
||||
defer w.Close()
|
||||
|
||||
if _, err := g.currentPackData.WriteTo(w); err != nil {
|
||||
return fmt.Errorf("unable to write pack: %v", err)
|
||||
}
|
||||
dataLength := int64(g.currentPackData.Len())
|
||||
oid, err := p.objectManager.hashEncryptAndWrite(unpackedObjectsPackGroup, &g.currentPackData, "", true)
|
||||
g.currentPackData.Reset()
|
||||
oid, err := w.Result()
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't save pack data: %v", err)
|
||||
@@ -271,15 +262,18 @@ func (p *packManager) finishPackLocked(g *packInfo) error {
|
||||
return fmt.Errorf("storage block is empty: %v", oid)
|
||||
}
|
||||
|
||||
p.registerUnpackedBlockLockedNoFlush(oid.StorageBlock, dataLength)
|
||||
|
||||
g.currentPackIndex.PackBlockID = oid.StorageBlock
|
||||
}
|
||||
|
||||
p.pendingPackIndexes[g.currentPackID] = g.currentPackIndex
|
||||
g.currentPackIndex = nil
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *packManager) loadMergedPackIndex(olderThan *time.Time) (map[string]*packIndex, []string, error) {
|
||||
func (p *packManager) loadMergedPackIndexLocked(olderThan *time.Time) (map[string]*packIndex, []string, error) {
|
||||
ch, cancel := p.objectManager.storage.ListBlocks(packObjectPrefix)
|
||||
defer cancel()
|
||||
|
||||
@@ -389,17 +383,12 @@ func (p *packManager) loadMergedPackIndex(olderThan *time.Time) (map[string]*pac
|
||||
}
|
||||
|
||||
func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) {
|
||||
p.mu.RLock()
|
||||
pi := p.blockToIndex
|
||||
p.mu.RUnlock()
|
||||
if pi != nil {
|
||||
return pi, nil
|
||||
}
|
||||
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
merged, _, err := p.loadMergedPackIndex(nil)
|
||||
merged, _, err := p.loadMergedPackIndexLocked(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -418,7 +407,10 @@ func (p *packManager) ensurePackIndexesLoaded() (map[string]*packIndex, error) {
|
||||
}
|
||||
|
||||
func (p *packManager) Compact(cutoffTime time.Time) error {
|
||||
merged, blockIDs, err := p.loadMergedPackIndex(&cutoffTime)
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
merged, blockIDs, err := p.loadMergedPackIndexLocked(&cutoffTime)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user