Content manager cleanups (#1118)

* nit: dead code cleanup

* content: cleaned up committedContentIndex usage, removed unused code

* content: moved fetching index blobs into committedContentIndex
Author: Jarek Kowalski
Date: 2021-06-05 13:34:20 -07:00
Committed by: GitHub
Parent: f094b97628
Commit: 6b646f7e9d

5 changed files with 134 additions and 144 deletions
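
The substantive change is the third bullet: committedContentIndex now owns downloading of index blobs, with the actual blob fetch injected as a callback (fetchOne) when the index is constructed. The standalone sketch below illustrates the shape of that injection pattern; it is not kopia code, and every name except fetchOne and fetchIndexBlobs is hypothetical.

package main

import (
    "context"
    "fmt"
)

// blobID and fetchFunc stand in for kopia's blob.ID and the injected
// fetch callback; only fetchOne and fetchIndexBlobs mirror names from
// the diffs below.
type blobID string

type fetchFunc func(ctx context.Context, id blobID) ([]byte, error)

type committedIndex struct {
    fetchOne fetchFunc
    cached   map[blobID][]byte
}

func newCommittedIndex(fetchOne fetchFunc) *committedIndex {
    return &committedIndex{fetchOne: fetchOne, cached: map[blobID][]byte{}}
}

// fetchIndexBlobs downloads any blobs not yet cached using the injected
// callback, so the index no longer reaches back into a manager.
func (c *committedIndex) fetchIndexBlobs(ctx context.Context, ids []blobID) error {
    for _, id := range ids {
        if _, ok := c.cached[id]; ok {
            continue // already downloaded
        }

        data, err := c.fetchOne(ctx, id)
        if err != nil {
            return err
        }

        c.cached[id] = data
    }

    return nil
}

func main() {
    // The manager supplies its blob getter at construction time,
    // mirroring sm.indexBlobManager.getIndexBlob in the diffs below.
    idx := newCommittedIndex(func(_ context.Context, id blobID) ([]byte, error) {
        return []byte("data-for-" + string(id)), nil
    })

    if err := idx.fetchIndexBlobs(context.Background(), []blobID{"n1", "n2"}); err != nil {
        panic(err)
    }

    fmt.Println(len(idx.cached), "index blobs cached")
}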

View File

@@ -8,6 +8,7 @@
"sync/atomic"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/repo/blob"
@@ -31,6 +32,9 @@ type committedContentIndex struct {
 	v1PerContentOverhead uint32
 	indexVersion         int
+
+	// fetchOne loads one index blob
+	fetchOne func(ctx context.Context, blobID blob.ID) ([]byte, error)

 	log logging.Logger
 }
@@ -112,43 +116,54 @@ func (c *committedContentIndex) packFilesChanged(packFiles []blob.ID) bool {
 	return false
 }

-// Uses packFiles for indexing and returns whether or not the set of index
-// packs have changed compared to the previous set. An error is returned if the
-// indices cannot be read for any reason.
-func (c *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (bool, error) {
-	c.mu.Lock()
-	defer c.mu.Unlock()
-
-	if !c.packFilesChanged(packFiles) {
-		return false, nil
-	}
-
-	atomic.AddInt64(&c.rev, 1)
-
-	var newMerged mergedIndex
-
-	newInUse := map[blob.ID]packIndex{}
+func (c *committedContentIndex) merge(ctx context.Context, packFiles []blob.ID) (merged mergedIndex, used map[blob.ID]packIndex, finalErr error) {
+	used = map[blob.ID]packIndex{}

 	defer func() {
-		newMerged.Close() //nolint:errcheck
+		// we failed along the way, close the merged index.
+		if finalErr != nil {
+			merged.Close() //nolint:errcheck
+		}
 	}()

 	for _, e := range packFiles {
 		ndx, err := c.cache.openIndex(ctx, e)
 		if err != nil {
-			return false, errors.Wrapf(err, "unable to open pack index %q", e)
+			return nil, nil, errors.Wrapf(err, "unable to open pack index %q", e)
 		}

-		newMerged = append(newMerged, ndx)
-		newInUse[e] = ndx
+		merged = append(merged, ndx)
+		used[e] = ndx
 	}

-	mergedAndCombined, err := c.combineSmallIndexes(newMerged)
+	mergedAndCombined, err := c.combineSmallIndexes(merged)
 	if err != nil {
-		return false, errors.Wrap(err, "unable to combine small indexes")
+		return nil, nil, errors.Wrap(err, "unable to combine small indexes")
 	}

-	c.log.Debugf("combined %v into %v index segments", len(newMerged), len(mergedAndCombined))
+	c.log.Debugf("combined %v into %v index segments", len(merged), len(mergedAndCombined))
+
+	merged = mergedAndCombined
+
+	return
+}
+
+// Uses packFiles for indexing. An error is returned if the
+// indices cannot be read for any reason.
+func (c *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if !c.packFilesChanged(packFiles) {
+		return nil
+	}
+
+	mergedAndCombined, newInUse, err := c.merge(ctx, packFiles)
+	if err != nil {
+		return err
+	}
+
+	atomic.AddInt64(&c.rev, 1)

 	c.merged = mergedAndCombined
 	c.inUse = newInUse
@@ -157,9 +172,7 @@ func (c *committedContentIndex) use(ctx context.Context, packFiles []blob.ID) (b
 		c.log.Errorf("unable to expire unused content index files: %v", err)
 	}

-	newMerged = nil // prevent closing newMerged indices
-
-	return true, nil
+	return nil
 }
func (c *committedContentIndex) combineSmallIndexes(m mergedIndex) (mergedIndex, error) {
@@ -215,7 +228,72 @@ func (c *committedContentIndex) close() error {
 	return nil
 }

-func newCommittedContentIndex(caching *CachingOptions, v1PerContentOverhead uint32, indexVersion int, baseLog logging.Logger) *committedContentIndex {
+func (c *committedContentIndex) fetchIndexBlobs(
+	ctx context.Context,
+	indexBlobs []blob.ID,
+) error {
+	ch, err := c.missingIndexBlobs(ctx, indexBlobs)
+	if err != nil {
+		return err
+	}
+
+	if len(ch) == 0 {
+		return nil
+	}
+
+	c.log.Debugf("Downloading %v new index blobs...", len(indexBlobs))
+
+	eg, ctx := errgroup.WithContext(ctx)
+
+	for i := 0; i < parallelFetches; i++ {
+		eg.Go(func() error {
+			for indexBlobID := range ch {
+				data, err := c.fetchOne(ctx, indexBlobID)
+				if err != nil {
+					return errors.Wrapf(err, "error loading index blob %v", indexBlobID)
+				}
+
+				if err := c.addContent(ctx, indexBlobID, data, false); err != nil {
+					return errors.Wrap(err, "unable to add to committed content cache")
+				}
+			}
+
+			return nil
+		})
+	}
+
+	if err := eg.Wait(); err != nil {
+		return errors.Wrap(err, "error downloading indexes")
+	}
+
+	c.log.Debugf("Index blobs downloaded.")
+
+	return nil
+}
+
+// missingIndexBlobs returns a closed channel filled with blob IDs that are not in committedContents cache.
+func (c *committedContentIndex) missingIndexBlobs(ctx context.Context, blobs []blob.ID) (<-chan blob.ID, error) {
+	ch := make(chan blob.ID, len(blobs))
+	defer close(ch)
+
+	for _, id := range blobs {
+		has, err := c.cache.hasIndexBlobID(ctx, id)
+		if err != nil {
+			return nil, errors.Wrapf(err, "error determining whether index blob %v has been downloaded", id)
+		}
+
+		if !has {
+			ch <- id
+		}
+	}
+
+	return ch, nil
+}
+
+func newCommittedContentIndex(caching *CachingOptions,
+	v1PerContentOverhead uint32,
+	indexVersion int,
+	fetchOne func(ctx context.Context, blobID blob.ID) ([]byte, error),
+	baseLog logging.Logger,
+) *committedContentIndex {
 	log := logging.WithPrefix("[committed-content-index] ", baseLog)

 	var cache committedContentIndexCache
@@ -235,6 +313,7 @@ func newCommittedContentIndex(caching *CachingOptions, v1PerContentOverhead uint
 		inUse:                map[blob.ID]packIndex{},
 		v1PerContentOverhead: v1PerContentOverhead,
 		indexVersion:         indexVersion,
+		fetchOne:             fetchOne,
 		log:                  baseLog,
 	}
 }
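
The fetchIndexBlobs added above replaces a hand-rolled worker pool (a sync.WaitGroup plus a buffered error channel, removed from the next file) with golang.org/x/sync/errgroup. A minimal standalone sketch of that worker pattern, with illustrative names except parallelFetches:

package main

import (
    "context"
    "fmt"

    "golang.org/x/sync/errgroup"
)

func main() {
    // The work queue is filled and closed up front, as missingIndexBlobs
    // does above; ranging over a closed channel ends once it is drained.
    ch := make(chan string, 3)
    for _, id := range []string{"n1", "n2", "n3"} {
        ch <- id
    }
    close(ch)

    eg, ctx := errgroup.WithContext(context.Background())

    const parallelFetches = 2 // same role as the package-level constant in the diff

    for i := 0; i < parallelFetches; i++ {
        eg.Go(func() error {
            for id := range ch {
                if err := ctx.Err(); err != nil {
                    return err // another worker failed; stop early
                }

                fmt.Println("fetched", id) // stand-in for fetchOne + addContent
            }

            return nil
        })
    }

    // Wait returns the first worker error, which is what the removed
    // WaitGroup-plus-error-channel code approximated by hand.
    if err := eg.Wait(); err != nil {
        fmt.Println("error:", err)
    }
}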

View File

@@ -4,7 +4,6 @@
"bytes"
"context"
"os"
"sync"
"sync/atomic"
"time"
@@ -118,13 +117,13 @@ func (sm *SharedManager) attemptReadPackFileLocalIndex(ctx context.Context, pack
 	return localIndexBytes, nil
 }

-func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBlobInfo, bool, error) {
+func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBlobInfo, error) {
 	nextSleepTime := 100 * time.Millisecond //nolint:gomnd

 	for i := 0; i < indexLoadAttempts; i++ {
 		if err := ctx.Err(); err != nil {
 			// nolint:wrapcheck
-			return nil, false, err
+			return nil, err
 		}

 		if i > 0 {
@@ -136,109 +135,34 @@ func (sm *SharedManager) loadPackIndexesUnlocked(ctx context.Context) ([]IndexBl
 		indexBlobs, err := sm.indexBlobManager.listIndexBlobs(ctx, false)
 		if err != nil {
-			return nil, false, errors.Wrap(err, "error listing index blobs")
+			return nil, errors.Wrap(err, "error listing index blobs")
 		}

-		err = sm.tryLoadPackIndexBlobsUnlocked(ctx, indexBlobs)
+		var indexBlobIDs []blob.ID
+		for _, b := range indexBlobs {
+			indexBlobIDs = append(indexBlobIDs, b.BlobID)
+		}
+
+		err = sm.committedContents.fetchIndexBlobs(ctx, indexBlobIDs)
 		if err == nil {
-			var indexBlobIDs []blob.ID
-			for _, b := range indexBlobs {
-				indexBlobIDs = append(indexBlobIDs, b.BlobID)
-			}
-
-			var updated bool
-
-			updated, err = sm.committedContents.use(ctx, indexBlobIDs)
+			err = sm.committedContents.use(ctx, indexBlobIDs)
 			if err != nil {
-				return nil, false, err
+				return nil, err
 			}

 			if len(indexBlobs) > indexBlobCompactionWarningThreshold {
 				sm.log.Errorf("Found too many index blobs (%v), this may result in degraded performance.\n\nPlease ensure periodic repository maintenance is enabled or run 'kopia maintenance'.", len(indexBlobs))
 			}

-			return indexBlobs, updated, nil
+			return indexBlobs, nil
 		}

 		if !errors.Is(err, blob.ErrBlobNotFound) {
-			return nil, false, err
+			return nil, err
 		}
 	}

-	return nil, false, errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts)
-}
-
-func (sm *SharedManager) tryLoadPackIndexBlobsUnlocked(ctx context.Context, indexBlobs []IndexBlobInfo) error {
-	ch, unprocessedIndexesSize, err := sm.unprocessedIndexBlobsUnlocked(ctx, indexBlobs)
-	if err != nil {
-		return err
-	}
-
-	if len(ch) == 0 {
-		return nil
-	}
-
-	sm.log.Debugf("downloading %v new index blobs (%v bytes)...", len(ch), unprocessedIndexesSize)
-
-	var wg sync.WaitGroup
-
-	errch := make(chan error, parallelFetches)
-
-	for i := 0; i < parallelFetches; i++ {
-		wg.Add(1)
-
-		go func() {
-			defer wg.Done()
-
-			for indexBlobID := range ch {
-				data, err := sm.indexBlobManager.getIndexBlob(ctx, indexBlobID)
-				if err != nil {
-					errch <- err
-					return
-				}
-
-				if err := sm.committedContents.addContent(ctx, indexBlobID, data, false); err != nil {
-					errch <- errors.Wrap(err, "unable to add to committed content cache")
-					return
-				}
-			}
-		}()
-	}
-
-	wg.Wait()
-	close(errch)
-
-	// Propagate async errors, if any.
-	for err := range errch {
-		return err
-	}
-
-	sm.log.Debugf("Index contents downloaded.")
-
-	return nil
-}
-
-// unprocessedIndexBlobsUnlocked returns a closed channel filled with content IDs that are not in committedContents cache.
-func (sm *SharedManager) unprocessedIndexBlobsUnlocked(ctx context.Context, contents []IndexBlobInfo) (resultCh <-chan blob.ID, totalSize int64, err error) {
-	ch := make(chan blob.ID, len(contents))
-	defer close(ch)
-
-	for _, c := range contents {
-		has, err := sm.committedContents.cache.hasIndexBlobID(ctx, c.BlobID)
-		if err != nil {
-			return nil, 0, errors.Wrapf(err, "error determining whether index blob %v has been downloaded", c.BlobID)
-		}
-
-		if has {
-			sm.log.Debugf("index-already-cached %v", c.BlobID)
-			continue
-		}
-
-		ch <- c.BlobID
-		totalSize += c.Length
-	}
-
-	return ch, totalSize, nil
+	return nil, errors.Errorf("unable to load pack indexes despite %v retries", indexLoadAttempts)
 }

 func (sm *SharedManager) getCacheForContentID(id ID) contentCache {
@@ -344,13 +268,6 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
return errors.Wrap(err, "unable to initialize own writes cache")
}
contentIndex := newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.sharedBaseLogger)
// once everything is ready, set it up
sm.contentCache = dataCache
sm.metadataCache = metadataCache
sm.committedContents = contentIndex
sm.indexBlobManager = &indexBlobManagerImpl{
st: sm.st,
crypter: sm.crypter,
@@ -361,6 +278,11 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
 		log: logging.WithPrefix("[index-blob-manager] ", sm.sharedBaseLogger),
 	}

+	// once everything is ready, set it up
+	sm.contentCache = dataCache
+	sm.metadataCache = metadataCache
+	sm.committedContents = newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.indexBlobManager.getIndexBlob, sm.sharedBaseLogger)
+
 	return nil
 }
@@ -486,7 +408,7 @@ func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions
 		return nil, errors.Wrap(err, "error setting up read manager caches")
 	}

-	if _, _, err := sm.loadPackIndexesUnlocked(ctx); err != nil {
+	if _, err := sm.loadPackIndexesUnlocked(ctx); err != nil {
 		return nil, errors.Wrap(err, "error loading indexes")
 	}
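
One behavior worth noting in loadPackIndexesUnlocked above: only blob.ErrBlobNotFound is retried (an index blob can disappear between listing and reading, for example during compaction), with a sleep between attempts. Below is a self-contained sketch of that retry shape; the doubling backoff is an assumption, since the sleep update itself is outside the visible hunks:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// errBlobNotFound stands in for blob.ErrBlobNotFound; attempts and the
// initial sleep mirror indexLoadAttempts and nextSleepTime above.
var errBlobNotFound = errors.New("blob not found")

func loadWithRetry(ctx context.Context, attempts int, load func() error) error {
    nextSleepTime := 100 * time.Millisecond

    for i := 0; i < attempts; i++ {
        if err := ctx.Err(); err != nil {
            return err
        }

        if i > 0 {
            time.Sleep(nextSleepTime)
            nextSleepTime *= 2 // assumed growth; not shown in the diff
        }

        err := load()
        if err == nil {
            return nil
        }

        if !errors.Is(err, errBlobNotFound) {
            return err // anything else is fatal, as in the diff
        }
    }

    return fmt.Errorf("unable to load pack indexes despite %v retries", attempts)
}

func main() {
    calls := 0

    err := loadWithRetry(context.Background(), 10, func() error {
        calls++
        if calls < 3 {
            return errBlobNotFound
        }
        return nil
    })

    fmt.Println("calls:", calls, "err:", err)
}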

View File

@@ -743,7 +743,7 @@ func (bm *WriteManager) unlock() {
 }

 // Refresh reloads the committed content indexes.
-func (bm *WriteManager) Refresh(ctx context.Context) (bool, error) {
+func (bm *WriteManager) Refresh(ctx context.Context) error {
 	bm.lock()
 	defer bm.unlock()
@@ -751,10 +751,10 @@ func (bm *WriteManager) Refresh(ctx context.Context) (bool, error) {
 	t0 := clock.Now()

-	_, updated, err := bm.loadPackIndexesUnlocked(ctx)
-	bm.log.Debugf("Refresh completed in %v and updated=%v", clock.Since(t0), updated)
+	_, err := bm.loadPackIndexesUnlocked(ctx)
+	bm.log.Debugf("Refresh completed in %v", clock.Since(t0))

-	return updated, err
+	return err
 }

 // SyncMetadataCache synchronizes metadata cache with metadata blobs in storage.
@@ -768,12 +768,6 @@ func (bm *WriteManager) SyncMetadataCache(ctx context.Context) error {
 	return nil
 }

-// DecryptBlob returns the contents of an encrypted blob that can be decrypted (n,m,l).
-func (bm *WriteManager) DecryptBlob(ctx context.Context, blobID blob.ID) ([]byte, error) {
-	// nolint:wrapcheck
-	return bm.indexBlobManager.getIndexBlob(ctx, blobID)
-}
-
 // ManagerOptions are the optional parameters for manager creation.
 type ManagerOptions struct {
 	RepositoryFormatBytes []byte

View File

@@ -36,7 +36,7 @@ func (bm *WriteManager) CompactIndexes(ctx context.Context, opt CompactOptions)
 	bm.lock()
 	defer bm.unlock()

-	indexBlobs, _, err := bm.loadPackIndexesUnlocked(ctx)
+	indexBlobs, err := bm.loadPackIndexesUnlocked(ctx)
 	if err != nil {
 		return errors.Wrap(err, "error loading indexes")
 	}
@@ -52,7 +52,7 @@ func (bm *WriteManager) CompactIndexes(ctx context.Context, opt CompactOptions)
 	}

 	// reload indexes after cleanup.
-	if _, _, err := bm.loadPackIndexesUnlocked(ctx); err != nil {
+	if _, err := bm.loadPackIndexesUnlocked(ctx); err != nil {
 		return errors.Wrap(err, "error re-loading indexes")
 	}

View File

@@ -285,14 +285,9 @@ func (r *directRepository) IndexBlobReader() content.IndexBlobReader {
 	return r.cmgr
 }

-// Refresh periodically makes external changes visible to repository.
+// Refresh makes external changes visible to repository.
 func (r *directRepository) Refresh(ctx context.Context) error {
-	_, err := r.cmgr.Refresh(ctx)
-	if err != nil {
-		return errors.Wrap(err, "error refreshing content index")
-	}
-
-	return nil
+	return errors.Wrap(r.cmgr.Refresh(ctx), "error refreshing content index")
 }

 // RefreshPeriodically periodically refreshes the repository to reflect the changes made by other hosts.
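
The collapsed Refresh above is safe on the success path because of a documented github.com/pkg/errors property: Wrap returns nil when the error it is given is nil. A quick standalone check:

package main

import (
    "fmt"

    "github.com/pkg/errors"
)

func main() {
    // errors.Wrap(nil, ...) == nil, so the one-line form still returns
    // nil when r.cmgr.Refresh succeeds.
    fmt.Println(errors.Wrap(nil, "error refreshing content index") == nil) // prints true
}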