From 6b646f7e9d19734da7e1a248edfe0bbfa5522adc Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 5 Jun 2021 13:34:20 -0700 Subject: [PATCH] Content manager cleanups (#1118) * nit: dead code cleanup * content: cleaned up committedContextIndex usage, removed unused code * content: moved fetching index blobs into committedContentIndex --- repo/content/committed_content_index.go | 133 +++++++++++++++++++----- repo/content/committed_read_manager.go | 118 ++++----------------- repo/content/content_manager.go | 14 +-- repo/content/content_manager_indexes.go | 4 +- repo/repository.go | 9 +- 5 files changed, 134 insertions(+), 144 deletions(-) diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index bdd3f7378..f41cd4f9f 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -8,6 +8,7 @@ "sync/atomic" "github.com/pkg/errors" + "golang.org/x/sync/errgroup" "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/repo/blob" @@ -31,6 +32,9 @@ type committedContentIndex struct { v1PerContentOverhead uint32 indexVersion int + // fetchOne loads one index blob + fetchOne func(ctx context.Context, blobID blob.ID) ([]byte, error) + log logging.Logger } @@ -112,43 +116,54 @@ func (c *committedContentIndex) packFilesChanged(packFiles []blob.ID) bool { return false } -// Uses packFiles for indexing and returns whether or not the set of index -// packs have changed compared to the previous set. An error is returned if the -// indices cannot be read for any reason. -func (c *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (bool, error) { - c.mu.Lock() - defer c.mu.Unlock() - - if !c.packFilesChanged(packFiles) { - return false, nil - } - - atomic.AddInt64(&c.rev, 1) - - var newMerged mergedIndex - - newInUse := map[blob.ID]packIndex{} +func (c *committedContentIndex) merge(ctx context.Context, packFiles []blob.ID) (merged mergedIndex, used map[blob.ID]packIndex, finalErr error) { + used = map[blob.ID]packIndex{} defer func() { - newMerged.Close() //nolint:errcheck + // we failed along the way, close the merged index. + if finalErr != nil { + merged.Close() //nolint:errcheck + } }() for _, e := range packFiles { ndx, err := c.cache.openIndex(ctx, e) if err != nil { - return false, errors.Wrapf(err, "unable to open pack index %q", e) + return nil, nil, errors.Wrapf(err, "unable to open pack index %q", e) } - newMerged = append(newMerged, ndx) - newInUse[e] = ndx + merged = append(merged, ndx) + used[e] = ndx } - mergedAndCombined, err := c.combineSmallIndexes(newMerged) + mergedAndCombined, err := c.combineSmallIndexes(merged) if err != nil { - return false, errors.Wrap(err, "unable to combine small indexes") + return nil, nil, errors.Wrap(err, "unable to combine small indexes") } - c.log.Debugf("combined %v into %v index segments", len(newMerged), len(mergedAndCombined)) + c.log.Debugf("combined %v into %v index segments", len(merged), len(mergedAndCombined)) + + merged = mergedAndCombined + + return +} + +// Uses packFiles for indexing. An error is returned if the +// indices cannot be read for any reason. +func (c *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) error { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.packFilesChanged(packFiles) { + return nil + } + + mergedAndCombined, newInUse, err := c.merge(ctx, packFiles) + if err != nil { + return err + } + + atomic.AddInt64(&c.rev, 1) c.merged = mergedAndCombined c.inUse = newInUse @@ -157,9 +172,7 @@ func (c *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b c.log.Errorf("unable to expire unused content index files: %v", err) } - newMerged = nil // prevent closing newMerged indices - - return true, nil + return nil } func (c *committedContentIndex) combineSmallIndexes(m mergedIndex) (mergedIndex, error) { @@ -215,7 +228,72 @@ func (c *committedContentIndex) close() error { return nil } -func newCommittedContentIndex(caching *CachingOptions, v1PerContentOverhead uint32, indexVersion int, baseLog logging.Logger) *committedContentIndex { +func (c *committedContentIndex) fetchIndexBlobs( + ctx context.Context, + indexBlobs []blob.ID, +) error { + ch, err := c.missingIndexBlobs(ctx, indexBlobs) + if err != nil { + return err + } + + if len(ch) == 0 { + return nil + } + + c.log.Debugf("Downloading %v new index blobs...", len(indexBlobs)) + + eg, ctx := errgroup.WithContext(ctx) + for i := 0; i < parallelFetches; i++ { + eg.Go(func() error { + for indexBlobID := range ch { + data, err := c.fetchOne(ctx, indexBlobID) + if err != nil { + return errors.Wrapf(err, "error loading index blob %v", indexBlobID) + } + + if err := c.addContent(ctx, indexBlobID, data, false); err != nil { + return errors.Wrap(err, "unable to add to committed content cache") + } + } + return nil + }) + } + + if err := eg.Wait(); err != nil { + return errors.Wrap(err, "error downloading indexes") + } + + c.log.Debugf("Index blobs downloaded.") + + return nil +} + +// missingIndexBlobs returns a closed channel filled with blob IDs that are not in committedContents cache. +func (c *committedContentIndex) missingIndexBlobs(ctx context.Context, blobs []blob.ID) (<-chan blob.ID, error) { + ch := make(chan blob.ID, len(blobs)) + defer close(ch) + + for _, id := range blobs { + has, err := c.cache.hasIndexBlobID(ctx, id) + if err != nil { + return nil, errors.Wrapf(err, "error determining whether index blob %v has been downloaded", id) + } + + if !has { + ch <- id + } + } + + return ch, nil +} + +func newCommittedContentIndex(caching *CachingOptions, + v1PerContentOverhead uint32, + indexVersion int, + fetchOne func(ctx context.Context, blobID blob.ID) ([]byte, error), + baseLog logging.Logger, +) *committedContentIndex { log := logging.WithPrefix("[committed-content-index] ", baseLog) var cache committedContentIndexCache @@ -235,6 +313,7 @@ func newCommittedContentIndex(caching *CachingOptions, v1PerContentOverhead uint inUse: map[blob.ID]packIndex{}, v1PerContentOverhead: v1PerContentOverhead, indexVersion: indexVersion, + fetchOne: fetchOne, log: baseLog, } } diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index d24f5de08..b683f3414 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -4,7 +4,6 @@ "bytes" "context" "os" - "sync" "sync/atomic" "time" @@ -118,13 +117,13 @@ func (sm *SharedManager) attemptReadPackFileLocalIndex(ctx context.Context, pack return localIndexBytes, nil } -func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBlobInfo, bool, error) { +func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBlobInfo, error) { nextSleepTime := 100 * time.Millisecond //nolint:gomnd for i := 0; i < indexLoadAttempts; i++ { if err := ctx.Err(); err != nil { // nolint:wrapcheck - return nil, false, err + return nil, err } if i > 0 { @@ -136,109 +135,34 @@ func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBl indexBlobs, err := sm.indexBlobManager.listIndexBlobs(ctx, false) if err != nil { - return nil, false, errors.Wrap(err, "error listing index blobs") + return nil, errors.Wrap(err, "error listing index blobs") } - err = sm.tryLoadPackIndexBlobsUnlocked(ctx, indexBlobs) + var indexBlobIDs []blob.ID + for _, b := range indexBlobs { + indexBlobIDs = append(indexBlobIDs, b.BlobID) + } + + err = sm.committedContents.fetchIndexBlobs(ctx, indexBlobIDs) if err == nil { - var indexBlobIDs []blob.ID - for _, b := range indexBlobs { - indexBlobIDs = append(indexBlobIDs, b.BlobID) - } - - var updated bool - - updated, err = sm.committedContents.use(ctx, indexBlobIDs) + err = sm.committedContents.use(ctx, indexBlobIDs) if err != nil { - return nil, false, err + return nil, err } if len(indexBlobs) > indexBlobCompactionWarningThreshold { sm.log.Errorf("Found too many index blobs (%v), this may result in degraded performance.\n\nPlease ensure periodic repository maintenance is enabled or run 'kopia maintenance'.", len(indexBlobs)) } - return indexBlobs, updated, nil + return indexBlobs, nil } if !errors.Is(err, blob.ErrBlobNotFound) { - return nil, false, err + return nil, err } } - return nil, false, errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts) -} - -func (sm *SharedManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, indexBlobs []IndexBlobInfo) error { - ch, unprocessedIndexesSize, err := sm.unprocessedIndexBlobsUnlocked(ctx, indexBlobs) - if err != nil { - return err - } - - if len(ch) == 0 { - return nil - } - - sm.log.Debugf("downloading %v new index blobs (%v bytes)...", len(ch), unprocessedIndexesSize) - - var wg sync.WaitGroup - - errch := make(chan error, parallelFetches) - - for i := 0; i < parallelFetches; i++ { - wg.Add(1) - - go func() { - defer wg.Done() - - for indexBlobID := range ch { - data, err := sm.indexBlobManager.getIndexBlob(ctx, indexBlobID) - if err != nil { - errch <- err - return - } - - if err := sm.committedContents.addContent(ctx, indexBlobID, data, false); err != nil { - errch <- errors.Wrap(err, "unable to add to committed content cache") - return - } - } - }() - } - - wg.Wait() - close(errch) - - // Propagate async errors, if any. - for err := range errch { - return err - } - - sm.log.Debugf("Index contents downloaded.") - - return nil -} - -// unprocessedIndexBlobsUnlocked returns a closed channel filled with content IDs that are not in committedContents cache. -func (sm *SharedManager) unprocessedIndexBlobsUnlocked(ctx context.Context, contents []IndexBlobInfo) (resultCh <-chan blob.ID, totalSize int64, err error) { - ch := make(chan blob.ID, len(contents)) - defer close(ch) - - for _, c := range contents { - has, err := sm.committedContents.cache.hasIndexBlobID(ctx, c.BlobID) - if err != nil { - return nil, 0, errors.Wrapf(err, "error determining whether index blob %v has been downloaded", c.BlobID) - } - - if has { - sm.log.Debugf("index-already-cached %v", c.BlobID) - continue - } - - ch <- c.BlobID - totalSize += c.Length - } - - return ch, totalSize, nil + return nil, errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts) } func (sm *SharedManager) getCacheForContentID(id ID) contentCache { @@ -344,13 +268,6 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca return errors.Wrap(err, "unable to initialize own writes cache") } - contentIndex := newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.sharedBaseLogger) - - // once everything is ready, set it up - sm.contentCache = dataCache - sm.metadataCache = metadataCache - sm.committedContents = contentIndex - sm.indexBlobManager = &indexBlobManagerImpl{ st: sm.st, crypter: sm.crypter, @@ -361,6 +278,11 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca log: logging.WithPrefix("[index-blob-manager] ", sm.sharedBaseLogger), } + // once everything is ready, set it up + sm.contentCache = dataCache + sm.metadataCache = metadataCache + sm.committedContents = newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.indexBlobManager.getIndexBlob, sm.sharedBaseLogger) + return nil } @@ -486,7 +408,7 @@ func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions return nil, errors.Wrap(err, "error setting up read manager caches") } - if _, _, err := sm.loadPackIndexesUnlocked(ctx); err != nil { + if _, err := sm.loadPackIndexesUnlocked(ctx); err != nil { return nil, errors.Wrap(err, "error loading indexes") } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 27ac9ee0a..6679e0358 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -743,7 +743,7 @@ func (bm *WriteManager) unlock() { } // Refresh reloads the committed content indexes. -func (bm *WriteManager) Refresh(ctx context.Context) (bool, error) { +func (bm *WriteManager) Refresh(ctx context.Context) error { bm.lock() defer bm.unlock() @@ -751,10 +751,10 @@ func (bm *WriteManager) Refresh(ctx context.Context) (bool, error) { t0 := clock.Now() - _, updated, err := bm.loadPackIndexesUnlocked(ctx) - bm.log.Debugf("Refresh completed in %v and updated=%v", clock.Since(t0), updated) + _, err := bm.loadPackIndexesUnlocked(ctx) + bm.log.Debugf("Refresh completed in %v", clock.Since(t0)) - return updated, err + return err } // SyncMetadataCache synchronizes metadata cache with metadata blobs in storage. @@ -768,12 +768,6 @@ func (bm *WriteManager) SyncMetadataCache(ctx context.Context) error { return nil } -// DecryptBlob returns the contents of an encrypted blob that can be decrypted (n,m,l). -func (bm *WriteManager) DecryptBlob(ctx context.Context, blobID blob.ID) ([]byte, error) { - // nolint:wrapcheck - return bm.indexBlobManager.getIndexBlob(ctx, blobID) -} - // ManagerOptions are the optional parameters for manager creation. type ManagerOptions struct { RepositoryFormatBytes []byte diff --git a/repo/content/content_manager_indexes.go b/repo/content/content_manager_indexes.go index 65fc466dc..868e5c867 100644 --- a/repo/content/content_manager_indexes.go +++ b/repo/content/content_manager_indexes.go @@ -36,7 +36,7 @@ func (bm *WriteManager) CompactIndexes(ctx context.Context, opt CompactOptions) bm.lock() defer bm.unlock() - indexBlobs, _, err := bm.loadPackIndexesUnlocked(ctx) + indexBlobs, err := bm.loadPackIndexesUnlocked(ctx) if err != nil { return errors.Wrap(err, "error loading indexes") } @@ -52,7 +52,7 @@ func (bm *WriteManager) CompactIndexes(ctx context.Context, opt CompactOptions) } // reload indexes after cleanup. - if _, _, err := bm.loadPackIndexesUnlocked(ctx); err != nil { + if _, err := bm.loadPackIndexesUnlocked(ctx); err != nil { return errors.Wrap(err, "error re-loading indexes") } diff --git a/repo/repository.go b/repo/repository.go index fd2ee6faa..21359dd82 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -285,14 +285,9 @@ func (r *directRepository) IndexBlobReader() content.IndexBlobReader { return r.cmgr } -// Refresh periodically makes external changes visible to repository. +// Refresh makes external changes visible to repository. func (r *directRepository) Refresh(ctx context.Context) error { - _, err := r.cmgr.Refresh(ctx) - if err != nil { - return errors.Wrap(err, "error refreshing content index") - } - - return nil + return errors.Wrap(r.cmgr.Refresh(ctx), "error refreshing content index") } // RefreshPeriodically periodically refreshes the repository to reflect the changes made by other hosts.