Index manager refactor (#1131)

* content: cleaned up and refactored indexBlobManager api, no code movement

* content: refactored indexBlobManagerImpl - code move only
This commit is contained in:
Jarek Kowalski
2021-06-11 19:05:33 -07:00
committed by GitHub
parent 42c59cb65b
commit 17ba77f57f
5 changed files with 174 additions and 162 deletions

View File

@@ -117,13 +117,13 @@ func (sm *SharedManager) attemptReadPackFileLocalIndex(ctx context.Context, pack
return localIndexBytes, nil
}
func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBlobInfo, error) {
func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) error {
nextSleepTime := 100 * time.Millisecond //nolint:gomnd
for i := 0; i < indexLoadAttempts; i++ {
if err := ctx.Err(); err != nil {
// nolint:wrapcheck
return nil, err
return err
}
if i > 0 {
@@ -135,7 +135,7 @@ func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBl
indexBlobs, err := sm.indexBlobManager.listActiveIndexBlobs(ctx)
if err != nil {
return nil, errors.Wrap(err, "error listing index blobs")
return errors.Wrap(err, "error listing index blobs")
}
var indexBlobIDs []blob.ID
@@ -147,22 +147,22 @@ func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBl
if err == nil {
err = sm.committedContents.use(ctx, indexBlobIDs)
if err != nil {
return nil, err
return err
}
if len(indexBlobs) > indexBlobCompactionWarningThreshold {
sm.log.Errorf("Found too many index blobs (%v), this may result in degraded performance.\n\nPlease ensure periodic repository maintenance is enabled or run 'kopia maintenance'.", len(indexBlobs))
}
return indexBlobs, nil
return nil
}
if !errors.Is(err, blob.ErrBlobNotFound) {
return nil, err
return err
}
}
return nil, errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts)
return errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts)
}
func (sm *SharedManager) getCacheForContentID(id ID) contentCache {
@@ -280,6 +280,8 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
ownWritesCache: owc,
listCache: listCache,
indexBlobCache: metadataCache,
maxPackSize: sm.maxPackSize,
indexVersion: sm.indexVersion,
log: logging.WithPrefix("[index-blob-manager] ", sm.sharedBaseLogger),
}
@@ -413,7 +415,7 @@ func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions
return nil, errors.Wrap(err, "error setting up read manager caches")
}
if _, err := sm.loadPackIndexesUnlocked(ctx); err != nil {
if err := sm.loadPackIndexesUnlocked(ctx); err != nil {
return nil, errors.Wrap(err, "error loading indexes")
}

View File

@@ -751,7 +751,7 @@ func (bm *WriteManager) Refresh(ctx context.Context) error {
t0 := clock.Now()
_, err := bm.loadPackIndexesUnlocked(ctx)
err := bm.loadPackIndexesUnlocked(ctx)
bm.log.Debugf("Refresh completed in %v", clock.Since(t0))
return err

View File

@@ -33,164 +33,18 @@ func (co *CompactOptions) maxEventualConsistencySettleTime() time.Duration {
func (bm *WriteManager) CompactIndexes(ctx context.Context, opt CompactOptions) error {
bm.log.Debugf("CompactIndexes(%+v)", opt)
bm.lock()
defer bm.unlock()
indexBlobs, err := bm.loadPackIndexesUnlocked(ctx)
if err != nil {
return errors.Wrap(err, "error loading indexes")
}
blobsToCompact := bm.getBlobsToCompact(indexBlobs, opt)
if err := bm.compactIndexBlobs(ctx, blobsToCompact, opt); err != nil {
if err := bm.indexBlobManager.compact(ctx, opt); err != nil {
return errors.Wrap(err, "error performing compaction")
}
if err := bm.indexBlobManager.cleanup(ctx, opt.maxEventualConsistencySettleTime()); err != nil {
return errors.Wrap(err, "error cleaning up index blobs")
}
// reload indexes after cleanup.
if _, err := bm.loadPackIndexesUnlocked(ctx); err != nil {
// reload indexes after compaction.
if err := bm.loadPackIndexesUnlocked(ctx); err != nil {
return errors.Wrap(err, "error re-loading indexes")
}
return nil
}
func (sm *SharedManager) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt CompactOptions) []IndexBlobInfo {
var nonCompactedBlobs, verySmallBlobs []IndexBlobInfo
var totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64
var mediumSizedBlobCount int
for _, b := range indexBlobs {
if b.Length > int64(sm.maxPackSize) && !opt.AllIndexes {
continue
}
nonCompactedBlobs = append(nonCompactedBlobs, b)
totalSizeNonCompactedBlobs += b.Length
if b.Length < int64(sm.maxPackSize/verySmallContentFraction) {
verySmallBlobs = append(verySmallBlobs, b)
totalSizeVerySmallBlobs += b.Length
} else {
mediumSizedBlobCount++
totalSizeMediumSizedBlobs += b.Length
}
}
if len(nonCompactedBlobs) < opt.MaxSmallBlobs {
// current count is below min allowed - nothing to do
sm.log.Debugf("no small contents to compact")
return nil
}
if len(verySmallBlobs) > len(nonCompactedBlobs)/2 && mediumSizedBlobCount+1 < opt.MaxSmallBlobs {
sm.log.Debugf("compacting %v very small contents", len(verySmallBlobs))
return verySmallBlobs
}
sm.log.Debugf("compacting all %v non-compacted contents", len(nonCompactedBlobs))
return nonCompactedBlobs
}
func (sm *SharedManager) compactIndexBlobs(ctx context.Context, indexBlobs []IndexBlobInfo, opt CompactOptions) error {
if len(indexBlobs) <= 1 && opt.DropDeletedBefore.IsZero() && len(opt.DropContents) == 0 {
return nil
}
bld := make(packIndexBuilder)
var inputs, outputs []blob.Metadata
for i, indexBlob := range indexBlobs {
sm.log.Debugf("compacting-entries[%v/%v] %v", i, len(indexBlobs), indexBlob)
if err := sm.addIndexBlobsToBuilder(ctx, bld, indexBlob); err != nil {
return errors.Wrap(err, "error adding index to builder")
}
inputs = append(inputs, indexBlob.Metadata)
}
// after we built index map in memory, drop contents from it
// we must do it after all input blobs have been merged, otherwise we may resurrect contents.
sm.dropContentsFromBuilder(bld, opt)
var buf bytes.Buffer
if err := bld.Build(&buf, sm.indexVersion); err != nil {
return errors.Wrap(err, "unable to build an index")
}
compactedIndexBlob, err := sm.indexBlobManager.writeIndexBlob(ctx, buf.Bytes(), "")
if err != nil {
return errors.Wrap(err, "unable to write compacted indexes")
}
// compaction wrote index blob that's the same as one of the sources
// it must be a no-op.
for _, indexBlob := range indexBlobs {
if indexBlob.BlobID == compactedIndexBlob.BlobID {
sm.log.Debugf("compaction-noop")
return nil
}
}
outputs = append(outputs, compactedIndexBlob)
if err := sm.indexBlobManager.registerCompaction(ctx, inputs, outputs, opt.maxEventualConsistencySettleTime()); err != nil {
return errors.Wrap(err, "unable to register compaction")
}
return nil
}
func (sm *SharedManager) dropContentsFromBuilder(bld packIndexBuilder, opt CompactOptions) {
for _, dc := range opt.DropContents {
if _, ok := bld[dc]; ok {
sm.log.Debugf("manual-drop-from-index %v", dc)
delete(bld, dc)
}
}
if !opt.DropDeletedBefore.IsZero() {
sm.log.Debugf("drop-content-deleted-before %v", opt.DropDeletedBefore)
for _, i := range bld {
if i.GetDeleted() && i.Timestamp().Before(opt.DropDeletedBefore) {
sm.log.Debugf("drop-from-index-old-deleted %v %v", i.GetContentID(), i.Timestamp())
delete(bld, i.GetContentID())
}
}
sm.log.Debugf("finished drop-content-deleted-before %v", opt.DropDeletedBefore)
}
}
func (sm *SharedManager) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuilder, indexBlob IndexBlobInfo) error {
data, err := sm.indexBlobManager.getIndexBlob(ctx, indexBlob.BlobID)
if err != nil {
return errors.Wrapf(err, "error getting index %q", indexBlob.BlobID)
}
index, err := openPackIndex(bytes.NewReader(data), uint32(sm.crypter.Encryptor.Overhead()))
if err != nil {
return errors.Wrapf(err, "unable to open index blob %q", indexBlob)
}
_ = index.Iterate(AllIDs, func(i Info) error {
bld.Add(i)
return nil
})
return nil
}
// ParseIndexBlob loads entries in a given index blob and returns them.
func (sm *SharedManager) ParseIndexBlob(ctx context.Context, blobID blob.ID) ([]Info, error) {
data, err := sm.indexBlobManager.getIndexBlob(ctx, blobID)

View File

@@ -1,6 +1,7 @@
package content
import (
"bytes"
"context"
"encoding/json"
"time"
@@ -19,8 +20,8 @@ type indexBlobManager interface {
listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error)
listAllIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error)
getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error)
compact(ctx context.Context, opts CompactOptions) error
registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error
cleanup(ctx context.Context, maxEventualConsistencySettleTime time.Duration) error
flushCache()
}
@@ -63,6 +64,8 @@ type indexBlobManagerImpl struct {
timeNow func() time.Time
indexBlobCache contentCache
log logging.Logger
maxPackSize int
indexVersion int
}
func (m *indexBlobManagerImpl) listAndMergeOwnWrites(ctx context.Context, prefix blob.ID) ([]blob.Metadata, error) {
@@ -145,6 +148,25 @@ func (m *indexBlobManagerImpl) flushCache() {
m.listCache.deleteListCache(compactionLogBlobPrefix)
}
func (m *indexBlobManagerImpl) compact(ctx context.Context, opt CompactOptions) error {
indexBlobs, err := m.listActiveIndexBlobs(ctx)
if err != nil {
return errors.Wrap(err, "error listing active index blobs")
}
blobsToCompact := m.getBlobsToCompact(indexBlobs, opt)
if err := m.compactIndexBlobs(ctx, blobsToCompact, opt); err != nil {
return errors.Wrap(err, "error performing compaction")
}
if err := m.cleanup(ctx, opt.maxEventualConsistencySettleTime()); err != nil {
return errors.Wrap(err, "error cleaning up index blobs")
}
return nil
}
func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error {
logEntryBytes, err := json.Marshal(&compactionLogEntry{
InputMetadata: inputs,
@@ -435,6 +457,138 @@ func (m *indexBlobManagerImpl) cleanup(ctx context.Context, maxEventualConsisten
return nil
}
func (m *indexBlobManagerImpl) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt CompactOptions) []IndexBlobInfo {
var nonCompactedBlobs, verySmallBlobs []IndexBlobInfo
var totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64
var mediumSizedBlobCount int
for _, b := range indexBlobs {
if b.Length > int64(m.maxPackSize) && !opt.AllIndexes {
continue
}
nonCompactedBlobs = append(nonCompactedBlobs, b)
totalSizeNonCompactedBlobs += b.Length
if b.Length < int64(m.maxPackSize/verySmallContentFraction) {
verySmallBlobs = append(verySmallBlobs, b)
totalSizeVerySmallBlobs += b.Length
} else {
mediumSizedBlobCount++
totalSizeMediumSizedBlobs += b.Length
}
}
if len(nonCompactedBlobs) < opt.MaxSmallBlobs {
// current count is below min allowed - nothing to do
m.log.Debugf("no small contents to compact")
return nil
}
if len(verySmallBlobs) > len(nonCompactedBlobs)/2 && mediumSizedBlobCount+1 < opt.MaxSmallBlobs {
m.log.Debugf("compacting %v very small contents", len(verySmallBlobs))
return verySmallBlobs
}
m.log.Debugf("compacting all %v non-compacted contents", len(nonCompactedBlobs))
return nonCompactedBlobs
}
func (m *indexBlobManagerImpl) compactIndexBlobs(ctx context.Context, indexBlobs []IndexBlobInfo, opt CompactOptions) error {
if len(indexBlobs) <= 1 && opt.DropDeletedBefore.IsZero() && len(opt.DropContents) == 0 {
return nil
}
bld := make(packIndexBuilder)
var inputs, outputs []blob.Metadata
for i, indexBlob := range indexBlobs {
m.log.Debugf("compacting-entries[%v/%v] %v", i, len(indexBlobs), indexBlob)
if err := m.addIndexBlobsToBuilder(ctx, bld, indexBlob); err != nil {
return errors.Wrap(err, "error adding index to builder")
}
inputs = append(inputs, indexBlob.Metadata)
}
// after we built index map in memory, drop contents from it
// we must do it after all input blobs have been merged, otherwise we may resurrect contents.
m.dropContentsFromBuilder(bld, opt)
var buf bytes.Buffer
if err := bld.Build(&buf, m.indexVersion); err != nil {
return errors.Wrap(err, "unable to build an index")
}
compactedIndexBlob, err := m.writeIndexBlob(ctx, buf.Bytes(), "")
if err != nil {
return errors.Wrap(err, "unable to write compacted indexes")
}
// compaction wrote index blob that's the same as one of the sources
// it must be a no-op.
for _, indexBlob := range indexBlobs {
if indexBlob.BlobID == compactedIndexBlob.BlobID {
m.log.Debugf("compaction-noop")
return nil
}
}
outputs = append(outputs, compactedIndexBlob)
if err := m.registerCompaction(ctx, inputs, outputs, opt.maxEventualConsistencySettleTime()); err != nil {
return errors.Wrap(err, "unable to register compaction")
}
return nil
}
func (m *indexBlobManagerImpl) dropContentsFromBuilder(bld packIndexBuilder, opt CompactOptions) {
for _, dc := range opt.DropContents {
if _, ok := bld[dc]; ok {
m.log.Debugf("manual-drop-from-index %v", dc)
delete(bld, dc)
}
}
if !opt.DropDeletedBefore.IsZero() {
m.log.Debugf("drop-content-deleted-before %v", opt.DropDeletedBefore)
for _, i := range bld {
if i.GetDeleted() && i.Timestamp().Before(opt.DropDeletedBefore) {
m.log.Debugf("drop-from-index-old-deleted %v %v", i.GetContentID(), i.Timestamp())
delete(bld, i.GetContentID())
}
}
m.log.Debugf("finished drop-content-deleted-before %v", opt.DropDeletedBefore)
}
}
func (m *indexBlobManagerImpl) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuilder, indexBlob IndexBlobInfo) error {
data, err := m.getIndexBlob(ctx, indexBlob.BlobID)
if err != nil {
return errors.Wrapf(err, "error getting index %q", indexBlob.BlobID)
}
index, err := openPackIndex(bytes.NewReader(data), uint32(m.crypter.Encryptor.Overhead()))
if err != nil {
return errors.Wrapf(err, "unable to open index blob %q", indexBlob)
}
_ = index.Iterate(AllIDs, func(i Info) error {
bld.Add(i)
return nil
})
return nil
}
func blobsOlderThan(m []blob.Metadata, cutoffTime time.Time) []blob.Metadata {
var res []blob.Metadata

View File

@@ -796,9 +796,11 @@ func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow f
HashFunction: hf,
Encryptor: enc,
},
listCache: lc,
timeNow: localTimeNow,
log: repologging.Printf(t.Logf)("test"),
listCache: lc,
timeNow: localTimeNow,
maxPackSize: 20 << 20,
indexVersion: 1,
log: repologging.Printf(t.Logf)("test"),
}
return m