diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 7a158a375..4d6a558db 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -117,13 +117,13 @@ func (sm *SharedManager) attemptReadPackFileLocalIndex(ctx context.Context, pack return localIndexBytes, nil } -func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBlobInfo, error) { +func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) error { nextSleepTime := 100 * time.Millisecond //nolint:gomnd for i := 0; i < indexLoadAttempts; i++ { if err := ctx.Err(); err != nil { // nolint:wrapcheck - return nil, err + return err } if i > 0 { @@ -135,7 +135,7 @@ func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBl indexBlobs, err := sm.indexBlobManager.listActiveIndexBlobs(ctx) if err != nil { - return nil, errors.Wrap(err, "error listing index blobs") + return errors.Wrap(err, "error listing index blobs") } var indexBlobIDs []blob.ID @@ -147,22 +147,22 @@ func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBl if err == nil { err = sm.committedContents.use(ctx, indexBlobIDs) if err != nil { - return nil, err + return err } if len(indexBlobs) > indexBlobCompactionWarningThreshold { sm.log.Errorf("Found too many index blobs (%v), this may result in degraded performance.\n\nPlease ensure periodic repository maintenance is enabled or run 'kopia maintenance'.", len(indexBlobs)) } - return indexBlobs, nil + return nil } if !errors.Is(err, blob.ErrBlobNotFound) { - return nil, err + return err } } - return nil, errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts) + return errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts) } func (sm *SharedManager) getCacheForContentID(id ID) contentCache { @@ -280,6 +280,8 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca ownWritesCache: owc, listCache: listCache, indexBlobCache: metadataCache, + maxPackSize: sm.maxPackSize, + indexVersion: sm.indexVersion, log: logging.WithPrefix("[index-blob-manager] ", sm.sharedBaseLogger), } @@ -413,7 +415,7 @@ func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions return nil, errors.Wrap(err, "error setting up read manager caches") } - if _, err := sm.loadPackIndexesUnlocked(ctx); err != nil { + if err := sm.loadPackIndexesUnlocked(ctx); err != nil { return nil, errors.Wrap(err, "error loading indexes") } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 6679e0358..4f7d1ec1f 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -751,7 +751,7 @@ func (bm *WriteManager) Refresh(ctx context.Context) error { t0 := clock.Now() - _, err := bm.loadPackIndexesUnlocked(ctx) + err := bm.loadPackIndexesUnlocked(ctx) bm.log.Debugf("Refresh completed in %v", clock.Since(t0)) return err diff --git a/repo/content/content_manager_indexes.go b/repo/content/content_manager_indexes.go index 868e5c867..3e980e941 100644 --- a/repo/content/content_manager_indexes.go +++ b/repo/content/content_manager_indexes.go @@ -33,164 +33,18 @@ func (co *CompactOptions) maxEventualConsistencySettleTime() time.Duration { func (bm *WriteManager) CompactIndexes(ctx context.Context, opt CompactOptions) error { bm.log.Debugf("CompactIndexes(%+v)", opt) - bm.lock() - defer bm.unlock() - - indexBlobs, err := bm.loadPackIndexesUnlocked(ctx) - if err != nil { - return errors.Wrap(err, "error loading indexes") - } - - blobsToCompact := bm.getBlobsToCompact(indexBlobs, opt) - - if err := bm.compactIndexBlobs(ctx, blobsToCompact, opt); err != nil { + if err := bm.indexBlobManager.compact(ctx, opt); err != nil { return errors.Wrap(err, "error performing compaction") } - if err := bm.indexBlobManager.cleanup(ctx, opt.maxEventualConsistencySettleTime()); err != nil { - return errors.Wrap(err, "error cleaning up index blobs") - } - - // reload indexes after cleanup. - if _, err := bm.loadPackIndexesUnlocked(ctx); err != nil { + // reload indexes after compaction. + if err := bm.loadPackIndexesUnlocked(ctx); err != nil { return errors.Wrap(err, "error re-loading indexes") } return nil } -func (sm *SharedManager) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt CompactOptions) []IndexBlobInfo { - var nonCompactedBlobs, verySmallBlobs []IndexBlobInfo - - var totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64 - - var mediumSizedBlobCount int - - for _, b := range indexBlobs { - if b.Length > int64(sm.maxPackSize) && !opt.AllIndexes { - continue - } - - nonCompactedBlobs = append(nonCompactedBlobs, b) - totalSizeNonCompactedBlobs += b.Length - - if b.Length < int64(sm.maxPackSize/verySmallContentFraction) { - verySmallBlobs = append(verySmallBlobs, b) - totalSizeVerySmallBlobs += b.Length - } else { - mediumSizedBlobCount++ - totalSizeMediumSizedBlobs += b.Length - } - } - - if len(nonCompactedBlobs) < opt.MaxSmallBlobs { - // current count is below min allowed - nothing to do - sm.log.Debugf("no small contents to compact") - return nil - } - - if len(verySmallBlobs) > len(nonCompactedBlobs)/2 && mediumSizedBlobCount+1 < opt.MaxSmallBlobs { - sm.log.Debugf("compacting %v very small contents", len(verySmallBlobs)) - return verySmallBlobs - } - - sm.log.Debugf("compacting all %v non-compacted contents", len(nonCompactedBlobs)) - - return nonCompactedBlobs -} - -func (sm *SharedManager) compactIndexBlobs(ctx context.Context, indexBlobs []IndexBlobInfo, opt CompactOptions) error { - if len(indexBlobs) <= 1 && opt.DropDeletedBefore.IsZero() && len(opt.DropContents) == 0 { - return nil - } - - bld := make(packIndexBuilder) - - var inputs, outputs []blob.Metadata - - for i, indexBlob := range indexBlobs { - sm.log.Debugf("compacting-entries[%v/%v] %v", i, len(indexBlobs), indexBlob) - - if err := sm.addIndexBlobsToBuilder(ctx, bld, indexBlob); err != nil { - return errors.Wrap(err, "error adding index to builder") - } - - inputs = append(inputs, indexBlob.Metadata) - } - - // after we built index map in memory, drop contents from it - // we must do it after all input blobs have been merged, otherwise we may resurrect contents. - sm.dropContentsFromBuilder(bld, opt) - - var buf bytes.Buffer - if err := bld.Build(&buf, sm.indexVersion); err != nil { - return errors.Wrap(err, "unable to build an index") - } - - compactedIndexBlob, err := sm.indexBlobManager.writeIndexBlob(ctx, buf.Bytes(), "") - if err != nil { - return errors.Wrap(err, "unable to write compacted indexes") - } - - // compaction wrote index blob that's the same as one of the sources - // it must be a no-op. - for _, indexBlob := range indexBlobs { - if indexBlob.BlobID == compactedIndexBlob.BlobID { - sm.log.Debugf("compaction-noop") - return nil - } - } - - outputs = append(outputs, compactedIndexBlob) - - if err := sm.indexBlobManager.registerCompaction(ctx, inputs, outputs, opt.maxEventualConsistencySettleTime()); err != nil { - return errors.Wrap(err, "unable to register compaction") - } - - return nil -} - -func (sm *SharedManager) dropContentsFromBuilder(bld packIndexBuilder, opt CompactOptions) { - for _, dc := range opt.DropContents { - if _, ok := bld[dc]; ok { - sm.log.Debugf("manual-drop-from-index %v", dc) - delete(bld, dc) - } - } - - if !opt.DropDeletedBefore.IsZero() { - sm.log.Debugf("drop-content-deleted-before %v", opt.DropDeletedBefore) - - for _, i := range bld { - if i.GetDeleted() && i.Timestamp().Before(opt.DropDeletedBefore) { - sm.log.Debugf("drop-from-index-old-deleted %v %v", i.GetContentID(), i.Timestamp()) - delete(bld, i.GetContentID()) - } - } - - sm.log.Debugf("finished drop-content-deleted-before %v", opt.DropDeletedBefore) - } -} - -func (sm *SharedManager) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuilder, indexBlob IndexBlobInfo) error { - data, err := sm.indexBlobManager.getIndexBlob(ctx, indexBlob.BlobID) - if err != nil { - return errors.Wrapf(err, "error getting index %q", indexBlob.BlobID) - } - - index, err := openPackIndex(bytes.NewReader(data), uint32(sm.crypter.Encryptor.Overhead())) - if err != nil { - return errors.Wrapf(err, "unable to open index blob %q", indexBlob) - } - - _ = index.Iterate(AllIDs, func(i Info) error { - bld.Add(i) - return nil - }) - - return nil -} - // ParseIndexBlob loads entries in a given index blob and returns them. func (sm *SharedManager) ParseIndexBlob(ctx context.Context, blobID blob.ID) ([]Info, error) { data, err := sm.indexBlobManager.getIndexBlob(ctx, blobID) diff --git a/repo/content/index_blob_manager.go b/repo/content/index_blob_manager.go index f740ca643..99cbbe888 100644 --- a/repo/content/index_blob_manager.go +++ b/repo/content/index_blob_manager.go @@ -1,6 +1,7 @@ package content import ( + "bytes" "context" "encoding/json" "time" @@ -19,8 +20,8 @@ type indexBlobManager interface { listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) listAllIndexBlobs(ctx context.Context) ([]IndexBlobInfo, error) getIndexBlob(ctx context.Context, blobID blob.ID) ([]byte, error) + compact(ctx context.Context, opts CompactOptions) error registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error - cleanup(ctx context.Context, maxEventualConsistencySettleTime time.Duration) error flushCache() } @@ -63,6 +64,8 @@ type indexBlobManagerImpl struct { timeNow func() time.Time indexBlobCache contentCache log logging.Logger + maxPackSize int + indexVersion int } func (m *indexBlobManagerImpl) listAndMergeOwnWrites(ctx context.Context, prefix blob.ID) ([]blob.Metadata, error) { @@ -145,6 +148,25 @@ func (m *indexBlobManagerImpl) flushCache() { m.listCache.deleteListCache(compactionLogBlobPrefix) } +func (m *indexBlobManagerImpl) compact(ctx context.Context, opt CompactOptions) error { + indexBlobs, err := m.listActiveIndexBlobs(ctx) + if err != nil { + return errors.Wrap(err, "error listing active index blobs") + } + + blobsToCompact := m.getBlobsToCompact(indexBlobs, opt) + + if err := m.compactIndexBlobs(ctx, blobsToCompact, opt); err != nil { + return errors.Wrap(err, "error performing compaction") + } + + if err := m.cleanup(ctx, opt.maxEventualConsistencySettleTime()); err != nil { + return errors.Wrap(err, "error cleaning up index blobs") + } + + return nil +} + func (m *indexBlobManagerImpl) registerCompaction(ctx context.Context, inputs, outputs []blob.Metadata, maxEventualConsistencySettleTime time.Duration) error { logEntryBytes, err := json.Marshal(&compactionLogEntry{ InputMetadata: inputs, @@ -435,6 +457,138 @@ func (m *indexBlobManagerImpl) cleanup(ctx context.Context, maxEventualConsisten return nil } +func (m *indexBlobManagerImpl) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt CompactOptions) []IndexBlobInfo { + var nonCompactedBlobs, verySmallBlobs []IndexBlobInfo + + var totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64 + + var mediumSizedBlobCount int + + for _, b := range indexBlobs { + if b.Length > int64(m.maxPackSize) && !opt.AllIndexes { + continue + } + + nonCompactedBlobs = append(nonCompactedBlobs, b) + totalSizeNonCompactedBlobs += b.Length + + if b.Length < int64(m.maxPackSize/verySmallContentFraction) { + verySmallBlobs = append(verySmallBlobs, b) + totalSizeVerySmallBlobs += b.Length + } else { + mediumSizedBlobCount++ + totalSizeMediumSizedBlobs += b.Length + } + } + + if len(nonCompactedBlobs) < opt.MaxSmallBlobs { + // current count is below min allowed - nothing to do + m.log.Debugf("no small contents to compact") + return nil + } + + if len(verySmallBlobs) > len(nonCompactedBlobs)/2 && mediumSizedBlobCount+1 < opt.MaxSmallBlobs { + m.log.Debugf("compacting %v very small contents", len(verySmallBlobs)) + return verySmallBlobs + } + + m.log.Debugf("compacting all %v non-compacted contents", len(nonCompactedBlobs)) + + return nonCompactedBlobs +} + +func (m *indexBlobManagerImpl) compactIndexBlobs(ctx context.Context, indexBlobs []IndexBlobInfo, opt CompactOptions) error { + if len(indexBlobs) <= 1 && opt.DropDeletedBefore.IsZero() && len(opt.DropContents) == 0 { + return nil + } + + bld := make(packIndexBuilder) + + var inputs, outputs []blob.Metadata + + for i, indexBlob := range indexBlobs { + m.log.Debugf("compacting-entries[%v/%v] %v", i, len(indexBlobs), indexBlob) + + if err := m.addIndexBlobsToBuilder(ctx, bld, indexBlob); err != nil { + return errors.Wrap(err, "error adding index to builder") + } + + inputs = append(inputs, indexBlob.Metadata) + } + + // after we built index map in memory, drop contents from it + // we must do it after all input blobs have been merged, otherwise we may resurrect contents. + m.dropContentsFromBuilder(bld, opt) + + var buf bytes.Buffer + if err := bld.Build(&buf, m.indexVersion); err != nil { + return errors.Wrap(err, "unable to build an index") + } + + compactedIndexBlob, err := m.writeIndexBlob(ctx, buf.Bytes(), "") + if err != nil { + return errors.Wrap(err, "unable to write compacted indexes") + } + + // compaction wrote index blob that's the same as one of the sources + // it must be a no-op. + for _, indexBlob := range indexBlobs { + if indexBlob.BlobID == compactedIndexBlob.BlobID { + m.log.Debugf("compaction-noop") + return nil + } + } + + outputs = append(outputs, compactedIndexBlob) + + if err := m.registerCompaction(ctx, inputs, outputs, opt.maxEventualConsistencySettleTime()); err != nil { + return errors.Wrap(err, "unable to register compaction") + } + + return nil +} + +func (m *indexBlobManagerImpl) dropContentsFromBuilder(bld packIndexBuilder, opt CompactOptions) { + for _, dc := range opt.DropContents { + if _, ok := bld[dc]; ok { + m.log.Debugf("manual-drop-from-index %v", dc) + delete(bld, dc) + } + } + + if !opt.DropDeletedBefore.IsZero() { + m.log.Debugf("drop-content-deleted-before %v", opt.DropDeletedBefore) + + for _, i := range bld { + if i.GetDeleted() && i.Timestamp().Before(opt.DropDeletedBefore) { + m.log.Debugf("drop-from-index-old-deleted %v %v", i.GetContentID(), i.Timestamp()) + delete(bld, i.GetContentID()) + } + } + + m.log.Debugf("finished drop-content-deleted-before %v", opt.DropDeletedBefore) + } +} + +func (m *indexBlobManagerImpl) addIndexBlobsToBuilder(ctx context.Context, bld packIndexBuilder, indexBlob IndexBlobInfo) error { + data, err := m.getIndexBlob(ctx, indexBlob.BlobID) + if err != nil { + return errors.Wrapf(err, "error getting index %q", indexBlob.BlobID) + } + + index, err := openPackIndex(bytes.NewReader(data), uint32(m.crypter.Encryptor.Overhead())) + if err != nil { + return errors.Wrapf(err, "unable to open index blob %q", indexBlob) + } + + _ = index.Iterate(AllIDs, func(i Info) error { + bld.Add(i) + return nil + }) + + return nil +} + func blobsOlderThan(m []blob.Metadata, cutoffTime time.Time) []blob.Metadata { var res []blob.Metadata diff --git a/repo/content/index_blob_manager_test.go b/repo/content/index_blob_manager_test.go index fef4701a1..a9eaf44a9 100644 --- a/repo/content/index_blob_manager_test.go +++ b/repo/content/index_blob_manager_test.go @@ -796,9 +796,11 @@ func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow f HashFunction: hf, Encryptor: enc, }, - listCache: lc, - timeNow: localTimeNow, - log: repologging.Printf(t.Logf)("test"), + listCache: lc, + timeNow: localTimeNow, + maxPackSize: 20 << 20, + indexVersion: 1, + log: repologging.Printf(t.Logf)("test"), } return m