diff --git a/internal/epoch/complete_set.go b/internal/epoch/complete_set.go
new file mode 100644
index 000000000..db66fed7f
--- /dev/null
+++ b/internal/epoch/complete_set.go
@@ -0,0 +1,45 @@
+package epoch
+
+import (
+	"strconv"
+	"strings"
+
+	"github.com/kopia/kopia/repo/blob"
+)
+
+// findCompleteSetOfBlobs looks for a complete set of blob IDs following a naming convention:
+// '<prefix>-s<set>-c<count>'
+// where:
+// 'prefix' is an arbitrary string not containing a dash ('-')
+// 'set' is a random string shared by all indexes in the same set
+// 'count' is a number that specifies how many items must be in the set to make it complete.
+//
+// The algorithm returns IDs of blobs that form the first complete set.
+func findCompleteSetOfBlobs(bms []blob.Metadata) []blob.Metadata {
+	sets := map[string][]blob.Metadata{}
+
+	for _, bm := range bms {
+		id := bm.BlobID
+		parts := strings.Split(string(id), "-")
+
+		if len(parts) < 3 || !strings.HasPrefix(parts[1], "s") || !strings.HasPrefix(parts[2], "c") {
+			// malformed ID, ignore
+			continue
+		}
+
+		count, err := strconv.Atoi(parts[2][1:])
+		if err != nil {
+			// malformed ID, ignore
+			continue
+		}
+
+		setID := parts[1]
+		sets[setID] = append(sets[setID], bm)
+
+		if len(sets[setID]) == count {
+			return sets[setID]
+		}
+	}
+
+	return nil
+}
diff --git a/internal/epoch/complete_set_test.go b/internal/epoch/complete_set_test.go
new file mode 100644
index 000000000..8409e44c9
--- /dev/null
+++ b/internal/epoch/complete_set_test.go
@@ -0,0 +1,96 @@
+package epoch
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/kopia/kopia/repo/blob"
+)
+
+func TestFindCompleteSetOfBlobs(t *testing.T) {
+	cases := []struct {
+		input []blob.ID
+		want  []blob.ID
+	}{
+		{
+			input: []blob.ID{},
+			want:  []blob.ID{},
+		},
+
+		// one complete session of size 2
+		{
+			input: []blob.ID{
+				"a-s0-c2",
+				"b-s0-c2",
+			},
+			want: []blob.ID{
+				"a-s0-c2",
+				"b-s0-c2",
+			},
+		},
+		// one complete session with some malformed name
+		{
+			input: []blob.ID{
+				"a-s0-c2",
+				"malformed",
+				"b-s0-c2",
+			},
+			want: []blob.ID{
+				"a-s0-c2",
+				"b-s0-c2",
+			},
+		},
+		// one complete session with some malformed blob ID
+		{
+			input: []blob.ID{
+				"a-s0-c2",
+				"malformed-s0-x2",
+				"b-s0-c2",
+			},
+			want: []blob.ID{
+				"a-s0-c2",
+				"b-s0-c2",
+			},
+		},
+		// one complete session with some malformed count
+		{
+			input: []blob.ID{
+				"a-s0-c2",
+				"malformed-s0-cNAN",
+				"b-s0-c2",
+			},
+			want: []blob.ID{
+				"a-s0-c2",
+				"b-s0-c2",
+			},
+		},
+		// two complete sessions, we pick 's0' as it's the first one to become complete.
+		{
+			input: []blob.ID{
+				"foo-s0-c2",
+				"aaa-s1-c2",
+				"bar-s0-c2",
+				"bbb-s1-c2",
+			},
+			want: []blob.ID{
+				"foo-s0-c2",
+				"bar-s0-c2",
+			},
+		},
+	}
+
+	for _, tc := range cases {
+		require.Equal(t, tc.want, blob.IDsFromMetadata(findCompleteSetOfBlobs(dummyMetadataForIDs(tc.input))), "invalid result for %v", tc.input)
+	}
+}
+
+func dummyMetadataForIDs(ids []blob.ID) []blob.Metadata {
+	var result []blob.Metadata
+
+	for _, id := range ids {
+		result = append(result, blob.Metadata{BlobID: id})
+	}
+
+	return result
+}
diff --git a/internal/epoch/epoch_advance.go b/internal/epoch/epoch_advance.go
new file mode 100644
index 000000000..f7ede899b
--- /dev/null
+++ b/internal/epoch/epoch_advance.go
@@ -0,0 +1,53 @@
+package epoch
+
+import (
+	"time"
+
+	"github.com/kopia/kopia/repo/blob"
+)
+
+// shouldAdvance determines if the current epoch should be advanced based on the set of blobs in it.
+//
+// Epoch will be advanced if it's been more than 'minEpochDuration' between earliest and
+// most recent write AND at least one of the criteria has been met:
+//
+// - number of blobs in the epoch exceeds 'countThreshold'
+// - total size of blobs in the epoch exceeds 'totalSizeBytesThreshold'.
+func shouldAdvance(bms []blob.Metadata, minEpochDuration time.Duration, countThreshold int, totalSizeBytesThreshold int64) bool {
+	if len(bms) == 0 {
+		return false
+	}
+
+	var (
+		min       = bms[0].Timestamp
+		max       = bms[0].Timestamp
+		totalSize = int64(0)
+	)
+
+	for _, bm := range bms {
+		if bm.Timestamp.Before(min) {
+			min = bm.Timestamp
+		}
+
+		if bm.Timestamp.After(max) {
+			max = bm.Timestamp
+		}
+
+		totalSize += bm.Length
+	}
+
+	// not enough time between first and last write in an epoch.
+	if max.Sub(min) < minEpochDuration {
+		return false
+	}
+
+	if len(bms) >= countThreshold {
+		return true
+	}
+
+	if totalSize >= totalSizeBytesThreshold {
+		return true
+	}
+
+	return false
+}
diff --git a/internal/epoch/epoch_advance_test.go b/internal/epoch/epoch_advance_test.go
new file mode 100644
index 000000000..0cabdb346
--- /dev/null
+++ b/internal/epoch/epoch_advance_test.go
@@ -0,0 +1,89 @@
+package epoch
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/kopia/kopia/repo/blob"
+)
+
+func TestShouldAdvanceEpoch(t *testing.T) {
+	t0 := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
+
+	var lotsOfMetadata []blob.Metadata
+
+	lotsOfMetadata = append(lotsOfMetadata, blob.Metadata{
+		Timestamp: t0, Length: 1,
+	})
+
+	for i := 0; i < defaultParams.EpochAdvanceOnCountThreshold; i++ {
+		lotsOfMetadata = append(lotsOfMetadata, blob.Metadata{
+			Timestamp: t0.Add(defaultParams.MinEpochDuration),
+			Length:    1,
+		})
+	}
+
+	cases := []struct {
+		desc string
+		bms  []blob.Metadata
+		want bool
+	}{
+		{
+			desc: "zero blobs",
+			bms:  []blob.Metadata{},
+			want: false,
+		},
+		{
+			desc: "one blob",
+			bms: []blob.Metadata{
+				{Timestamp: t0, Length: 1},
+			},
+			want: false,
+		},
+		{
+			desc: "two blobs, not enough time passed, size enough to advance",
+			bms: []blob.Metadata{
+				{Timestamp: t0.Add(defaultParams.MinEpochDuration - 1), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold},
+				{Timestamp: t0, Length: 1},
+			},
+			want: false,
+		},
+		{
+			desc: "two blobs, enough time passed, total size enough to advance",
+			bms: []blob.Metadata{
+				{Timestamp: t0, Length: 1},
+				{Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold},
+			},
+			want: true,
+		},
+		{
+			desc: "two blobs, enough time passed, total size not enough to advance",
+			bms: []blob.Metadata{
+				{Timestamp: t0, Length: 1},
+				{Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold - 2},
+			},
+			want: false,
+		},
+		{
+			desc: "enough time passed, count not enough to advance",
+			bms: []blob.Metadata{
+				{Timestamp: t0, Length: 1},
+				{Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: 1},
+			},
+			want: false,
+		},
+		{
+			desc: "enough time passed, count enough to advance",
+			bms:  lotsOfMetadata,
+			want: true,
+		},
+	}
+
+	for _, tc := range cases {
+		require.Equal(t, tc.want,
+			shouldAdvance(tc.bms, defaultParams.MinEpochDuration, defaultParams.EpochAdvanceOnCountThreshold, defaultParams.EpochAdvanceOnTotalSizeBytesThreshold),
+			tc.desc)
+	}
+}
diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go
new file mode 100644
index 000000000..36bf9b3d9
--- /dev/null
+++ b/internal/epoch/epoch_manager.go
@@ -0,0 +1,582 @@
+// Package epoch manages repository epochs.
+// It implements the protocol described at https://github.com/kopia/kopia/issues/1090 and is intentionally
+// separate from 'content' package to be able to test in isolation.
+package epoch
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/pkg/errors"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/kopia/kopia/internal/clock"
+	"github.com/kopia/kopia/internal/gather"
+	"github.com/kopia/kopia/internal/retry"
+	"github.com/kopia/kopia/repo/blob"
+	"github.com/kopia/kopia/repo/logging"
+)
+
+// Parameters encapsulates all parameters that influence the behavior of epoch manager.
+type Parameters struct { + // how frequently each client will list blobs to determine the current epoch. + EpochRefreshFrequency time.Duration + + // number of epochs between full checkpoints. + FullCheckpointFrequency int + + // do not delete uncompacted blobs if the corresponding compacted blob age is less than this. + CleanupSafetyMargin time.Duration + + // minimum duration of an epoch + MinEpochDuration time.Duration + + // advance epoch if number of files exceeds this + EpochAdvanceOnCountThreshold int + + // advance epoch if total size of files exceeds this. + EpochAdvanceOnTotalSizeBytesThreshold int64 + + // number of blobs to delete in parallel during cleanup + DeleteParallelism int +} + +// nolint:gomnd +var defaultParams = Parameters{ + EpochRefreshFrequency: 20 * time.Minute, + FullCheckpointFrequency: 7, + CleanupSafetyMargin: 1 * time.Hour, + MinEpochDuration: 6 * time.Hour, + EpochAdvanceOnCountThreshold: 100, + EpochAdvanceOnTotalSizeBytesThreshold: 10 << 20, + DeleteParallelism: 4, +} + +// snapshot captures a point-in time snapshot of a repository indexes, including current epoch +// information and existing checkpoints. +type snapshot struct { + WriteEpoch int `json:"writeEpoch"` + LatestFullCheckpointEpoch int `json:"latestCheckpointEpoch"` + FullCheckpointSets map[int][]blob.Metadata `json:"fullCheckpointSets"` + SingleEpochCompactionSets map[int][]blob.Metadata `json:"singleEpochCompactionSets"` + EpochStartTime map[int]time.Time `json:"epochStartTimes"` + ValidUntil time.Time `json:"validUntil"` // time after which the contents of this struct are no longer valid +} + +func (cs *snapshot) isSettledEpochNumber(epoch int) bool { + return epoch <= cs.WriteEpoch-numUnsettledEpochs +} + +// Manager manages repository epochs. +type Manager struct { + st blob.Storage + compact CompactionFunc + log logging.Logger + timeFunc func() time.Time + params Parameters + + // wait group that waits for all compaction and cleanup goroutines. 
+ backgroundWork sync.WaitGroup + + // mutable under lock, data invalid until refresh succeeds at least once. + mu sync.Mutex + lastKnownState snapshot + + // counters keeping track of the number of times operations were too slow and had to + // be retried, for testability. + committedStateRefreshTooSlow *int32 + getCompleteIndexSetTooSlow *int32 +} + +const ( + epochMarkerIndexBlobPrefix blob.ID = "xe" + uncompactedIndexBlobPrefix blob.ID = "xn" + singleEpochCompactionBlobPrefix blob.ID = "xs" + fullCheckpointIndexBlobPrefix blob.ID = "xf" + + numUnsettledEpochs = 2 +) + +// CompactionFunc merges the given set of index blobs into a new index blob set with a given prefix +// and writes them out as a set following naming convention established in 'complete_set.go'. +type CompactionFunc func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error + +// Flush waits for all in-process compaction work to complete. +func (e *Manager) Flush() { + // ensure all background compactions complete. + e.backgroundWork.Wait() +} + +// Refresh refreshes information about current epoch. +func (e *Manager) Refresh(ctx context.Context) error { + e.mu.Lock() + defer e.mu.Unlock() + + return e.refreshLocked(ctx) +} + +// Cleanup cleans up the old indexes for which there's a compacted replacement. 
+func (e *Manager) Cleanup(ctx context.Context) error { + cs, err := e.committedState(ctx) + if err != nil { + return err + } + + return e.cleanupInternal(ctx, cs) +} + +func (e *Manager) cleanupInternal(ctx context.Context, cs snapshot) error { + eg, ctx := errgroup.WithContext(ctx) + + // delete epoch markers for epoch < current-1 + eg.Go(func() error { + var toDelete []blob.ID + + if err := e.st.ListBlobs(ctx, epochMarkerIndexBlobPrefix, func(bm blob.Metadata) error { + if n, ok := epochNumberFromBlobID(bm.BlobID); ok { + if n < cs.WriteEpoch-1 { + toDelete = append(toDelete, bm.BlobID) + } + } + + return nil + }); err != nil { + return errors.Wrap(err, "error listing epoch markers") + } + + return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism), "error deleting index blob marker") + }) + + // delete uncompacted indexes for epochs that already have single-epoch compaction + // that was written sufficiently long ago. + eg.Go(func() error { + blobs, err := blob.ListAllBlobs(ctx, e.st, uncompactedIndexBlobPrefix) + if err != nil { + return errors.Wrap(err, "error listing uncompacted blobs") + } + + var toDelete []blob.ID + + for _, bm := range blobs { + if cs.safeToDeleteUncompactedBlob(bm, e.params.CleanupSafetyMargin) { + toDelete = append(toDelete, bm.BlobID) + } + } + + if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism); err != nil { + return errors.Wrap(err, "unable to delete uncompacted blobs") + } + + return nil + }) + + // delete single-epoch compacted indexes epoch numbers for which full-world state compacted exist + if cs.LatestFullCheckpointEpoch > 0 { + eg.Go(func() error { + blobs, err := blob.ListAllBlobs(ctx, e.st, singleEpochCompactionBlobPrefix) + if err != nil { + return errors.Wrap(err, "error refreshing epochs") + } + + var toDelete []blob.ID + + for _, bm := range blobs { + epoch, ok := epochNumberFromBlobID(bm.BlobID) + if !ok { + continue + } + + if epoch < 
cs.LatestFullCheckpointEpoch { + toDelete = append(toDelete, bm.BlobID) + } + } + + return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism), "error deleting single-epoch compacted blobs") + }) + } + + return errors.Wrap(eg.Wait(), "error cleaning up index blobs") +} + +func (cs *snapshot) safeToDeleteUncompactedBlob(bm blob.Metadata, safetyMargin time.Duration) bool { + epoch, ok := epochNumberFromBlobID(bm.BlobID) + if !ok { + return false + } + + if epoch < cs.LatestFullCheckpointEpoch { + return true + } + + cset := cs.SingleEpochCompactionSets[epoch] + if cset == nil { + // single-epoch compaction set does not exist for this epoch, don't delete. + return false + } + + // compaction set was written sufficiently long ago to be reliably discovered by all + // other clients - we can delete uncompacted blobs for this epoch. + compactionSetWriteTime := blob.MaxTimestamp(cset) + + return compactionSetWriteTime.Add(safetyMargin).Before(cs.EpochStartTime[cs.WriteEpoch]) +} + +func (e *Manager) refreshLocked(ctx context.Context) error { + return errors.Wrap(retry.WithExponentialBackoffNoValue(ctx, "epoch manager refresh", func() error { + return e.refreshAttemptLocked(ctx) + }, retry.Always), "error refreshing") +} + +func (e *Manager) loadWriteEpoch(ctx context.Context, cs *snapshot) error { + blobs, err := blob.ListAllBlobs(ctx, e.st, epochMarkerIndexBlobPrefix) + if err != nil { + return errors.Wrap(err, "error loading write epoch") + } + + for epoch, bm := range groupByEpochNumber(blobs) { + cs.EpochStartTime[epoch] = bm[0].Timestamp + + if epoch > cs.WriteEpoch { + cs.WriteEpoch = epoch + } + } + + return nil +} + +func (e *Manager) loadFullCheckpoints(ctx context.Context, cs *snapshot) error { + blobs, err := blob.ListAllBlobs(ctx, e.st, fullCheckpointIndexBlobPrefix) + if err != nil { + return errors.Wrap(err, "error loading full checkpoints") + } + + for epoch, bms := range groupByEpochNumber(blobs) { + if comp := 
findCompleteSetOfBlobs(bms); comp != nil { + cs.FullCheckpointSets[epoch] = comp + + if epoch > cs.LatestFullCheckpointEpoch { + cs.LatestFullCheckpointEpoch = epoch + } + } + } + + return nil +} + +func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *snapshot) error { + blobs, err := blob.ListAllBlobs(ctx, e.st, singleEpochCompactionBlobPrefix) + if err != nil { + return errors.Wrap(err, "error loading single-epoch compactions") + } + + for epoch, bms := range groupByEpochNumber(blobs) { + if comp := findCompleteSetOfBlobs(bms); comp != nil { + cs.SingleEpochCompactionSets[epoch] = comp + } + } + + return nil +} + +func (e *Manager) maybeStartFullCheckpointAsync(ctx context.Context, cs snapshot) { + if cs.WriteEpoch-cs.LatestFullCheckpointEpoch < e.params.FullCheckpointFrequency { + return + } + + e.backgroundWork.Add(1) + + go func() { + defer e.backgroundWork.Done() + + if err := e.generateFullCheckpointFromCommittedState(ctx, cs, cs.WriteEpoch-numUnsettledEpochs); err != nil { + e.log.Errorf("unable to generate full checkpoint: %v, performance will be affected", err) + } + }() +} + +func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs snapshot) { + e.backgroundWork.Add(1) + + go func() { + defer e.backgroundWork.Done() + + if err := e.cleanupInternal(ctx, cs); err != nil { + e.log.Errorf("error cleaning up index blobs: %v, performance may be affected", err) + } + }() +} + +// refreshAttemptLocked attempts to load the committedState of +// the index and updates `lastKnownState` state atomically when complete. 
+func (e *Manager) refreshAttemptLocked(ctx context.Context) error { + cs := snapshot{ + WriteEpoch: 0, + EpochStartTime: map[int]time.Time{}, + SingleEpochCompactionSets: map[int][]blob.Metadata{}, + LatestFullCheckpointEpoch: 0, + FullCheckpointSets: map[int][]blob.Metadata{}, + ValidUntil: e.timeFunc().Add(e.params.EpochRefreshFrequency), + } + + eg, ctx := errgroup.WithContext(ctx) + eg.Go(func() error { + return e.loadWriteEpoch(ctx, &cs) + }) + eg.Go(func() error { + return e.loadSingleEpochCompactions(ctx, &cs) + }) + eg.Go(func() error { + return e.loadFullCheckpoints(ctx, &cs) + }) + + if err := eg.Wait(); err != nil { + return errors.Wrap(err, "error refreshing") + } + + if e.timeFunc().After(cs.ValidUntil) { + atomic.AddInt32(e.committedStateRefreshTooSlow, 1) + + return errors.Errorf("refreshing committed state took too long") + } + + e.lastKnownState = cs + + e.maybeStartFullCheckpointAsync(ctx, cs) + e.maybeStartCleanupAsync(ctx, cs) + + e.log.Debugf("current epoch %v started at %v", cs.WriteEpoch, cs.EpochStartTime[cs.WriteEpoch]) + + return nil +} + +func (e *Manager) committedState(ctx context.Context) (snapshot, error) { + e.mu.Lock() + defer e.mu.Unlock() + + if e.timeFunc().After(e.lastKnownState.ValidUntil) { + if err := e.refreshLocked(ctx); err != nil { + return snapshot{}, err + } + } + + return e.lastKnownState, nil +} + +// Current returns the current epoch number. +func (e *Manager) Current(ctx context.Context) (int, error) { + cs, err := e.committedState(ctx) + if err != nil { + return 0, err + } + + return cs.WriteEpoch, nil +} + +// GetCompleteIndexSet returns the set of blobs forming a complete index set up to the provided epoch number. 
+func (e *Manager) GetCompleteIndexSet(ctx context.Context, maxEpoch int) ([]blob.ID, error) { + for { + cs, err := e.committedState(ctx) + if err != nil { + return nil, err + } + + result, err := e.getCompleteIndexSetForCommittedState(ctx, cs, maxEpoch) + if e.timeFunc().Before(cs.ValidUntil) { + return result, err + } + + // We need to retry if local process took too long (e.g. because the machine went + // to sleep at the wrong moment) and committed state is no longer valid. + // + // One scenario where this matters is cleanup: if determining the set of indexes takes + // too long, it's possible for a cleanup process to delete some of the uncompacted + // indexes that are still treated as authoritative according to old committed state. + // + // Retrying will re-examine the state of the world and re-do the logic. + e.log.Debugf("GetCompleteIndexSet took too long, retrying to ensure correctness") + atomic.AddInt32(e.getCompleteIndexSetTooSlow, 1) + } +} + +func (e *Manager) getCompleteIndexSetForCommittedState(ctx context.Context, cs snapshot, maxEpoch int) ([]blob.ID, error) { + var ( + startEpoch int + + resultMutex sync.Mutex + result []blob.ID + ) + + for i := maxEpoch; i >= 0; i-- { + if blobs := cs.FullCheckpointSets[i]; blobs != nil { + result = append(result, blob.IDsFromMetadata(blobs)...) + startEpoch = i + 1 + + e.log.Debugf("using full checkpoint at epoch %v", i) + + break + } + } + + eg, ctx := errgroup.WithContext(ctx) + + e.log.Debugf("adding incremental state for epochs %v..%v", startEpoch, maxEpoch) + + for i := startEpoch; i <= maxEpoch; i++ { + i := i + + eg.Go(func() error { + s, err := e.getIndexesFromEpochInternal(ctx, cs, i) + if err != nil { + return errors.Wrapf(err, "error getting indexes for epoch %v", i) + } + + resultMutex.Lock() + result = append(result, s...) + resultMutex.Unlock() + + return nil + }) + } + + return result, errors.Wrap(eg.Wait(), "error getting indexes") +} + +// WroteIndex is invoked after writing an index blob. 
It will validate whether the index was written
+// in the correct epoch.
+func (e *Manager) WroteIndex(ctx context.Context, bm blob.Metadata) error {
+	cs, err := e.committedState(ctx)
+	if err != nil {
+		return err
+	}
+
+	epoch, ok := epochNumberFromBlobID(bm.BlobID)
+	if !ok {
+		return errors.Errorf("invalid blob ID written")
+	}
+
+	if cs.isSettledEpochNumber(epoch) {
+		return errors.Errorf("index write took too long")
+	}
+
+	e.invalidate()
+
+	return nil
+}
+
+func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs snapshot, epoch int) ([]blob.ID, error) {
+	// check if the epoch is old enough to possibly have compacted blobs
+	epochSettled := cs.isSettledEpochNumber(epoch)
+	if epochSettled && cs.SingleEpochCompactionSets[epoch] != nil {
+		return blob.IDsFromMetadata(cs.SingleEpochCompactionSets[epoch]), nil
+	}
+
+	// load uncompacted blobs for this epoch
+	uncompactedBlobs, err := blob.ListAllBlobs(ctx, e.st, uncompactedEpochBlobPrefix(epoch))
+	if err != nil {
+		return nil, errors.Wrapf(err, "error listing uncompacted indexes for epoch %v", epoch)
+	}
+
+	// Ignore blobs written after the epoch has been settled.
+	//
+	// Epoch N is 'settled' after epoch N+2 has been started and that makes N subject to compaction,
+	// because at this point all clients will agree that we're in epoch N+1 or N+2.
+	//
+	// In a pathological case it's possible for a client to write a blob for a 'settled' epoch if they:
+	//
+	// 1. determine current epoch number (N).
+	// 2. go to sleep for a very long time, enough for epoch >=N+2 to become current.
+	// 3. write blob for the epoch number N
+	uncompactedBlobs = blobsWrittenBefore(
+		uncompactedBlobs,
+		cs.EpochStartTime[epoch+numUnsettledEpochs],
+	)
+
+	if epochSettled {
+		e.backgroundWork.Add(1)
+
+		go func() {
+			defer e.backgroundWork.Done()
+
+			e.log.Debugf("starting single-epoch compaction of %v", epoch)
+
+			if err := e.compact(ctx, blob.IDsFromMetadata(uncompactedBlobs), compactedEpochBlobPrefix(epoch)); err != nil {
+				e.log.Errorf("unable to compact blobs for epoch %v: %v, performance will be affected", epoch, err)
+			}
+		}()
+	}
+
+	advance := shouldAdvance(uncompactedBlobs, e.params.MinEpochDuration, e.params.EpochAdvanceOnCountThreshold, e.params.EpochAdvanceOnTotalSizeBytesThreshold)
+	if advance && epoch == cs.WriteEpoch {
+		if err := e.advanceEpoch(ctx, cs.WriteEpoch+1); err != nil {
+			e.log.Errorf("unable to advance epoch: %v, performance will be affected", err)
+		}
+	}
+
+	// return uncompacted blobs to the caller while we're compacting them in background
+	return blob.IDsFromMetadata(uncompactedBlobs), nil
+}
+
+func (e *Manager) advanceEpoch(ctx context.Context, newEpoch int) error {
+	blobID := blob.ID(fmt.Sprintf("%v%v", string(epochMarkerIndexBlobPrefix), newEpoch))
+
+	if err := e.st.PutBlob(ctx, blobID, gather.FromSlice([]byte("epoch-marker"))); err != nil {
+		return errors.Wrap(err, "error writing epoch marker")
+	}
+
+	e.invalidate()
+
+	return nil
+}
+
+func (e *Manager) invalidate() {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	e.lastKnownState = snapshot{}
+}
+
+func (e *Manager) generateFullCheckpointFromCommittedState(ctx context.Context, cs snapshot, epoch int) error {
+	e.log.Debugf("generating full checkpoint until epoch %v", epoch)
+
+	completeSet, err := e.getCompleteIndexSetForCommittedState(ctx, cs, epoch)
+	if err != nil {
+		return errors.Wrap(err, "unable to get full checkpoint")
+	}
+
+	if e.timeFunc().After(cs.ValidUntil) {
+		return errors.Errorf("not generating full checkpoint - the committed state is no longer valid")
+	}
+
+	if err := e.compact(ctx, completeSet, fullCheckpointBlobPrefix(epoch)); err != nil {
+		return errors.Wrap(err, "unable to compact blobs")
+	}
+
+	return nil
+}
+
+func uncompactedEpochBlobPrefix(epoch int) blob.ID {
+	return blob.ID(fmt.Sprintf("%v%v_", uncompactedIndexBlobPrefix, epoch))
+}
+
+func compactedEpochBlobPrefix(epoch int) blob.ID {
+	return blob.ID(fmt.Sprintf("%v%v_", singleEpochCompactionBlobPrefix, epoch))
+}
+
+func fullCheckpointBlobPrefix(epoch int) blob.ID {
+	return blob.ID(fmt.Sprintf("%v%v_", fullCheckpointIndexBlobPrefix, epoch))
+}
+
+// NewManager creates new epoch manager.
+func NewManager(st blob.Storage, params Parameters, compactor CompactionFunc, sharedBaseLogger logging.Logger) *Manager {
+	return &Manager{
+		st:                           st,
+		log:                          logging.WithPrefix("[epoch-manager] ", sharedBaseLogger),
+		compact:                      compactor,
+		timeFunc:                     clock.Now,
+		params:                       params,
+		getCompleteIndexSetTooSlow:   new(int32),
+		committedStateRefreshTooSlow: new(int32),
+	}
+}
diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go
new file mode 100644
index 000000000..c8d8ef010
--- /dev/null
+++ b/internal/epoch/epoch_manager_test.go
@@ -0,0 +1,445 @@
+package epoch
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"math"
+	"math/rand"
+	"sort"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
+
+	"github.com/kopia/kopia/internal/blobtesting"
+	"github.com/kopia/kopia/internal/faketime"
+	"github.com/kopia/kopia/internal/gather"
+	"github.com/kopia/kopia/internal/testlogging"
+	"github.com/kopia/kopia/repo/blob"
+	"github.com/kopia/kopia/repo/blob/logging"
+)
+
+const verifyAllEpochs = -1
+
+type fakeIndex struct {
+	Entries []int `json:"entries"`
+}
+
+func (n *fakeIndex) Bytes() []byte {
+	v, err := json.Marshal(n)
+	if err != nil {
+		panic("err: " + err.Error())
+	}
+
+	return v
+}
+
+func parseFakeIndex(b []byte) (*fakeIndex, error) {
+	r := &fakeIndex{}
+	err := json.Unmarshal(b, &r)
+
+	
return r, errors.Wrap(err, "error unmashaling JSON") +} + +func newFakeIndexWithEntries(entries ...int) *fakeIndex { + return &fakeIndex{ + Entries: entries, + } +} + +type epochManagerTestEnv struct { + data blobtesting.DataMap + unloggedst blob.Storage + st blob.Storage + ft *faketime.ClockTimeWithOffset + mgr *Manager + faultyStorage *blobtesting.FaultyStorage +} + +func (te *epochManagerTestEnv) compact(ctx context.Context, blobs []blob.ID, prefix blob.ID) error { + merged, err := te.getMergedIndexContents(ctx, blobs) + if err != nil { + return errors.Wrap(err, "unable to merge") + } + + return errors.Wrap( + te.st.PutBlob(ctx, blob.ID(fmt.Sprintf("%v%016x-s0-c1", prefix, rand.Int63())), gather.FromSlice(merged.Bytes())), + "PutBlob error") +} + +// write two dummy compaction blobs instead of 3, simulating a compaction that crashed before fully complete. +func (te *epochManagerTestEnv) interruptedCompaction(ctx context.Context, _ []blob.ID, prefix blob.ID) error { + sess := rand.Int63() + + te.st.PutBlob(ctx, blob.ID(fmt.Sprintf("%v%016x-s%v-c3", prefix, sess, rand.Int63())), gather.FromSlice([]byte("dummy"))) + te.st.PutBlob(ctx, blob.ID(fmt.Sprintf("%v%016x-s%v-c3", prefix, sess, rand.Int63())), gather.FromSlice([]byte("dummy"))) + + return errors.Errorf("failed for some reason") +} + +func newTestEnv(t *testing.T) *epochManagerTestEnv { + t.Helper() + + data := blobtesting.DataMap{} + ft := faketime.NewClockTimeWithOffset(0) + st := blobtesting.NewMapStorage(data, nil, ft.NowFunc()) + unloggedst := st + fs := &blobtesting.FaultyStorage{ + Base: st, + } + st = fs + st = logging.NewWrapper(st, t.Logf, "[STORAGE] ") + te := &epochManagerTestEnv{unloggedst: unloggedst, st: st, ft: ft} + m := NewManager(te.st, Parameters{ + EpochRefreshFrequency: 20 * time.Minute, + FullCheckpointFrequency: 7, + CleanupSafetyMargin: 1 * time.Hour, + MinEpochDuration: 12 * time.Hour, + EpochAdvanceOnCountThreshold: 25, + EpochAdvanceOnTotalSizeBytesThreshold: 20 << 20, + 
DeleteParallelism: 1, + }, te.compact, testlogging.NewTestLogger(t)) + m.timeFunc = te.ft.NowFunc() + te.mgr = m + te.faultyStorage = fs + te.data = data + + return te +} + +func TestIndexEpochManager_Regular(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + + verifySequentialWrites(t, te) +} + +func TestIndexEpochManager_RogueBlobs(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + + te.data[epochMarkerIndexBlobPrefix+"zzzz"] = []byte{1} + te.data[singleEpochCompactionBlobPrefix+"zzzz"] = []byte{1} + te.data[fullCheckpointIndexBlobPrefix+"zzzz"] = []byte{1} + + verifySequentialWrites(t, te) + te.mgr.Cleanup(testlogging.Context(t)) +} + +func TestIndexEpochManager_CompactionSilentlyDoesNothing(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + + // set up test environment in which compactions never succeed for whatever reason. + te.mgr.compact = func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error { + return nil + } + + verifySequentialWrites(t, te) +} + +func TestIndexEpochManager_CompactionAlwaysFails(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + + // set up test environment in which compactions never succeed for whatever reason. + te.mgr.compact = func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error { + return nil + } + + verifySequentialWrites(t, te) +} + +func TestIndexEpochManager_CompactionRandomlyCrashed(t *testing.T) { + t.Parallel() + + te := newTestEnv(t) + + // set up test environment in which compactions never succeed for whatever reason. 
+	te.mgr.compact = func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
+		if rand.Intn(100) < 20 {
+			return te.interruptedCompaction(ctx, blobIDs, outputPrefix)
+		}
+
+		return te.compact(ctx, blobIDs, outputPrefix)
+	}
+
+	verifySequentialWrites(t, te)
+}
+
+func TestIndexEpochManager_DeletionFailing(t *testing.T) {
+	t.Parallel()
+
+	te := newTestEnv(t)
+	te.faultyStorage.Faults = map[string][]*blobtesting.Fault{
+		"DeleteBlob": {
+			{Repeat: math.MaxInt32, Err: errors.Errorf("something bad happened")},
+		},
+	}
+
+	// set up test environment in which deletions always fail and ~20% of compactions are interrupted mid-way.
+	te.mgr.compact = func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
+		if rand.Intn(100) < 20 {
+			return te.interruptedCompaction(ctx, blobIDs, outputPrefix)
+		}
+
+		return te.compact(ctx, blobIDs, outputPrefix)
+	}
+
+	verifySequentialWrites(t, te)
+}
+
+func TestRefreshRetriesIfTakingTooLong(t *testing.T) {
+	te := newTestEnv(t)
+	defer te.mgr.Flush()
+
+	te.faultyStorage.Faults = map[string][]*blobtesting.Fault{
+		"ListBlobs": {
+			&blobtesting.Fault{
+				Repeat: 4, // refresh does 3 lists, so this will cause 2 unsuccessful retries
+				ErrCallback: func() error {
+					te.ft.Advance(24 * time.Hour)
+
+					return nil
+				},
+			},
+		},
+	}
+
+	ctx := testlogging.Context(t)
+
+	require.NoError(t, te.mgr.Refresh(ctx))
+
+	require.EqualValues(t, 2, *te.mgr.committedStateRefreshTooSlow)
+}
+
+func TestGetCompleteIndexSetRetriesIfTookTooLong(t *testing.T) {
+	te := newTestEnv(t)
+	defer te.mgr.Flush()
+
+	ctx := testlogging.Context(t)
+
+	// load committed state
+	require.NoError(t, te.mgr.Refresh(ctx))
+
+	cnt := new(int32)
+
+	// ensure we're not running any background goroutines before modifying 'Faults'
+	te.mgr.Flush()
+
+	te.faultyStorage.Faults = map[string][]*blobtesting.Fault{
+		"ListBlobs": {
+			&blobtesting.Fault{
+				Repeat: 1000,
+				ErrCallback: func() error {
+					if atomic.AddInt32(cnt, 1) == 1 {
+						te.ft.Advance(24 * time.Hour)
+					}
+
+					return nil
+				},
+			},
+		},
+	}
+
+	_, err := te.mgr.GetCompleteIndexSet(ctx, 0)
+	require.NoError(t, err)
+
+	require.EqualValues(t, 1, *te.mgr.getCompleteIndexSetTooSlow)
+}
+
+func TestLateWriteIsIgnored(t *testing.T) {
+	te := newTestEnv(t)
+	defer te.mgr.Flush()
+
+	ctx := testlogging.Context(t)
+
+	// get current epoch number
+	epoch, err := te.mgr.Current(ctx)
+	require.NoError(t, err)
+
+	var rnd [8]byte
+
+	rand.Read(rnd[:])
+
+	blobID1 := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:]))
+
+	rand.Read(rnd[:])
+	blobID2 := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:]))
+
+	// at this point it's possible that the process hangs for a very long time, during which the
+	// current epoch moves by 2. This would be dangerous, since we'd potentially modify an already
+	// settled epoch.
+	// To verify this, we call WroteIndex() after the write which will fail if the write finished
+	// late. During read we will ignore index files with dates that are too late.
+
+	// simulate the process hanging for a very long time, during which time the epoch moves.
+	for i := 0; i < 30; i++ {
+		te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(100+i))
+		te.ft.Advance(time.Hour)
+	}
+
+	// epoch advance is triggered during reads.
+	_, err = te.mgr.GetCompleteIndexSet(ctx, epoch+1)
+	require.NoError(t, err)
+
+	// make sure the epoch has moved
+	epoch2, err := te.mgr.Current(ctx)
+	require.NoError(t, err)
+	require.Equal(t, epoch+1, epoch2)
+
+	require.NoError(t, te.st.PutBlob(ctx, blobID1, gather.FromSlice([]byte("dummy"))))
+	bm, err := te.unloggedst.GetMetadata(ctx, blobID1)
+
+	require.NoError(t, err)
+
+	// it's not an error to finish the write in the next epoch.
+	require.NoError(t, te.mgr.WroteIndex(ctx, bm))
+
+	// move the epoch one more.
+	for i := 0; i < 30; i++ {
+		te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(100+i))
+		te.ft.Advance(time.Hour)
+	}
+
+	// epoch advance is triggered during reads.
+	_, err = te.mgr.GetCompleteIndexSet(ctx, epoch+2)
+	require.NoError(t, err)
+
+	// make sure the epoch has moved
+	epoch3, err := te.mgr.Current(ctx)
+	require.NoError(t, err)
+	require.Equal(t, epoch+2, epoch3)
+
+	// on Windows the time does not always move forward, give it a nudge.
+	te.ft.Advance(2 * time.Second)
+
+	require.NoError(t, te.st.PutBlob(ctx, blobID2, gather.FromSlice([]byte("dummy"))))
+	bm, err = te.unloggedst.GetMetadata(ctx, blobID2)
+
+	require.NoError(t, err)
+
+	// at this point WroteIndex() will fail because epoch #0 is already settled.
+	require.Error(t, te.mgr.WroteIndex(ctx, bm))
+
+	iset, err := te.mgr.GetCompleteIndexSet(ctx, epoch3)
+	require.NoError(t, err)
+
+	// blobID1 will be included in the index.
+	require.Contains(t, iset, blobID1)
+
+	// blobID2 will be excluded from the index.
+	require.NotContains(t, iset, blobID2)
+}
+
+// nolint:thelper
+func verifySequentialWrites(t *testing.T, te *epochManagerTestEnv) {
+	ctx := testlogging.Context(t)
+	expected := &fakeIndex{}
+
+	endTime := te.ft.NowFunc()().Add(90 * 24 * time.Hour) // simulate ~90 days of writes
+
+	indexNum := 1
+
+	for te.ft.NowFunc()().Before(endTime) {
+		indexNum++
+
+		te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(indexNum))
+
+		expected.Entries = append(expected.Entries, indexNum)
+		te.verifyCompleteIndexSet(ctx, t, verifyAllEpochs, expected)
+
+		dt := randomTime(1*time.Minute, 8*time.Hour)
+		t.Logf("advancing time by %v", dt)
+		te.ft.Advance(dt)
+
+		if indexNum%7 == 0 {
+			require.NoError(t, te.mgr.Refresh(ctx))
+		}
+
+		if indexNum%27 == 0 {
+			// do not require.NoError because we'll be sometimes inducing faults
+			te.mgr.Cleanup(ctx)
+		}
+	}
+
+	te.mgr.Flush()
+
+	for k, v := range te.data {
+		t.Logf("data: %v (%v)", k, len(v))
+	}
+
+	t.Logf("total written %v", indexNum)
+	t.Logf("total remaining %v", len(te.data))
+}
+
+func randomTime(min, max time.Duration) time.Duration {
+	return time.Duration(float64(max-min)*rand.Float64() + float64(min)) // uniform in [min, max)
+}
+
+func (te *epochManagerTestEnv) verifyCompleteIndexSet(ctx context.Context, t *testing.T, maxEpoch int, want *fakeIndex) {
+	t.Helper()
+
+	if maxEpoch == verifyAllEpochs {
+		n, err := te.mgr.Current(ctx)
+		require.NoError(t, err)
+
+		maxEpoch = n + 1
+	}
+
+	blobs, err := te.mgr.GetCompleteIndexSet(ctx, maxEpoch)
+	t.Logf("complete set length: %v", len(blobs))
+	require.NoError(t, err)
+
+	merged, err := te.getMergedIndexContents(ctx, blobs)
+	require.NoError(t, err)
+	require.Equal(t, want.Entries, merged.Entries)
+}
+
+func (te *epochManagerTestEnv) getMergedIndexContents(ctx context.Context, blobIDs []blob.ID) (*fakeIndex, error) {
+	result := &fakeIndex{}
+
+	for _, blobID := range blobIDs {
+		v, err := te.unloggedst.GetBlob(ctx, blobID, 0, -1)
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to get blob")
+		}
+
+		ndx, err := parseFakeIndex(v)
+		if err != nil {
+			return nil, errors.Wrap(err, "unable to parse fake index")
+		}
+
+		result.Entries = append(result.Entries, ndx.Entries...)
+	}
+
+	sort.Ints(result.Entries)
+
+	return result, nil
+}
+
+func (te *epochManagerTestEnv) mustWriteIndexFile(ctx context.Context, t *testing.T, ndx *fakeIndex) {
+	t.Helper()
+
+	epoch, err := te.mgr.Current(ctx)
+	require.NoError(t, err)
+
+	var rnd [8]byte
+
+	rand.Read(rnd[:])
+
+	blobID := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:]))
+
+	require.NoError(t, te.st.PutBlob(ctx, blobID, gather.FromSlice(ndx.Bytes())))
+	bm, err := te.unloggedst.GetMetadata(ctx, blobID)
+
+	require.NoError(t, err)
+	require.NoError(t, te.mgr.WroteIndex(ctx, bm))
+}
diff --git a/internal/epoch/epoch_utils.go b/internal/epoch/epoch_utils.go
new file mode 100644
index 000000000..aaebca3c0
--- /dev/null
+++ b/internal/epoch/epoch_utils.go
@@ -0,0 +1,57 @@
+package epoch
+
+import (
+	"strconv"
+	"strings"
+	"time"
+	"unicode"
+
+	"github.com/kopia/kopia/repo/blob"
+)
+
+// epochNumberFromBlobID extracts the epoch number from a string formatted as
+// <prefix><epochNumber>_<remainder>.
+func epochNumberFromBlobID(blobID blob.ID) (int, bool) {
+	s := string(blobID)
+
+	if p := strings.IndexByte(s, '_'); p >= 0 { // drop everything at and after the first '_'
+		s = s[0:p]
+	}
+
+	for len(s) > 0 && !unicode.IsDigit(rune(s[0])) { // strip the non-digit prefix
+		s = s[1:]
+	}
+
+	n, err := strconv.Atoi(s) // ok==false when no parseable number remains
+	if err != nil {
+		return 0, false
+	}
+
+	return n, true
+}
+
+func blobsWrittenBefore(bms []blob.Metadata, maxTime time.Time) []blob.Metadata {
+	var result []blob.Metadata
+
+	for _, bm := range bms {
+		if !maxTime.IsZero() && bm.Timestamp.After(maxTime) { // zero maxTime means no cutoff
+			continue
+		}
+
+		result = append(result, bm)
+	}
+
+	return result
+}
+
+func groupByEpochNumber(bms []blob.Metadata) map[int][]blob.Metadata {
+	result := map[int][]blob.Metadata{}
+
+	for _, bm := range bms {
+		if n, ok := epochNumberFromBlobID(bm.BlobID); ok { // blobs without a parseable epoch number are silently dropped
+			result[n] = append(result[n], bm)
+		}
+	}
+
+	return result
+}
diff --git a/internal/epoch/epoch_utils_test.go b/internal/epoch/epoch_utils_test.go
new file mode 100644
index 000000000..50c31361a
--- /dev/null
+++ b/internal/epoch/epoch_utils_test.go
@@ -0,0 +1,106 @@
+package epoch
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/kopia/kopia/repo/blob"
+)
+
+func TestEpochNumberFromBlobID(t *testing.T) {
+	cases := []struct {
+		input blob.ID
+		want  int
+	}{
+		{"pppp9", 9},
+		{"x7", 7},
+		{"x01234_1235", 1234},
+		{"x0_1235", 0},
+		{"abc01234_", 1234},
+		{"abc1234_", 1234},
+		{"abc1234_xxxx-sadfasd", 1234},
+	}
+
+	for _, tc := range cases {
+		n, ok := epochNumberFromBlobID(tc.input)
+		require.True(t, ok, tc.input)
+		require.Equal(t, tc.want, n)
+	}
+}
+
+func TestEpochNumberFromBlobID_Invalid(t *testing.T) {
+	cases := []blob.ID{
+		"_",
+		"a_",
+		"x123x_", // digits followed by a non-digit before '_' do not parse
+	}
+
+	for _, tc := range cases {
+		_, ok := epochNumberFromBlobID(tc)
+		require.False(t, ok, "epochNumberFromBlobID(%v)", tc)
+	}
+}
+
+func TestBlobsWrittenBefore(t *testing.T) {
+	t0 := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
+	bm0 := blob.Metadata{BlobID: "a", Timestamp: t0}
+
+	t1 := time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)
+	bm1 := blob.Metadata{BlobID: "b", Timestamp: t1}
+
+	t2 := time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)
+	bm2 := blob.Metadata{BlobID: "c", Timestamp: t2}
+
+	cases := []struct {
+		bms    []blob.Metadata
+		cutoff time.Time
+		want   []blob.Metadata
+	}{
+		{[]blob.Metadata{bm0, bm1, bm2}, time.Time{}, []blob.Metadata{bm0, bm1, bm2}},
+		{[]blob.Metadata{bm0, bm1, bm2}, t0.Add(-1), nil},
+		{[]blob.Metadata{bm0, bm1, bm2}, t0, []blob.Metadata{bm0}},
+		{[]blob.Metadata{bm0, bm1, bm2}, t1.Add(-1), []blob.Metadata{bm0}},
+		{[]blob.Metadata{bm0, bm1, bm2}, t1, []blob.Metadata{bm0, bm1}},
+		{[]blob.Metadata{bm0, bm1, bm2}, t2.Add(-1), []blob.Metadata{bm0, bm1}},
+		{[]blob.Metadata{bm0, bm1, bm2}, t2, []blob.Metadata{bm0, bm1, bm2}},
+	}
+
+	for i, tc := range cases {
+		require.Equal(t, tc.want, blobsWrittenBefore(tc.bms, tc.cutoff), "#%v blobsWrittenBefore(%v,%v)", i, tc.bms, tc.cutoff)
+	}
+}
+
+func TestGroupByEpochNumber(t *testing.T) {
+	cases := []struct {
+		input []blob.Metadata
+		want  map[int][]blob.Metadata
+	}{
+		{
+			input: []blob.Metadata{
+				{BlobID: "e1_abc"},
+				{BlobID: "e2_cde"},
+				{BlobID: "e1_def"},
+				{BlobID: "e3_fgh"},
+			},
+			want: map[int][]blob.Metadata{
+				1: {
+					{BlobID: "e1_abc"},
+					{BlobID: "e1_def"},
+				},
+				2: {
+					{BlobID: "e2_cde"},
+				},
+				3: {
+					{BlobID: "e3_fgh"},
+				},
+			},
+		},
+	}
+
+	for _, tc := range cases {
+		got := groupByEpochNumber(tc.input)
+		require.Equal(t, tc.want, got)
+	}
+}
diff --git a/internal/faketime/faketime.go b/internal/faketime/faketime.go
index 5650189e1..f14519af9 100644
--- a/internal/faketime/faketime.go
+++ b/internal/faketime/faketime.go
@@ -2,6 +2,7 @@
 package faketime
 
 import (
+	"sync"
 	"sync/atomic"
 	"time"
 
@@ -59,6 +60,7 @@ func (t *TimeAdvance) Advance(dt time.Duration) time.Time {
 
 // ClockTimeWithOffset allows controlling the passage of time. Intended to be used in
 // tests.
 type ClockTimeWithOffset struct {
+	mu     sync.Mutex // guards offset; pointer receivers below, so the struct must not be copied
 	offset time.Duration
 }
 
@@ -70,6 +72,9 @@ func NewClockTimeWithOffset(offset time.Duration) *ClockTimeWithOffset {
 // NowFunc returns a time provider function for t.
 func (t *ClockTimeWithOffset) NowFunc() func() time.Time {
 	return func() time.Time {
+		t.mu.Lock()
+		defer t.mu.Unlock()
+
 		return clock.Now().Add(t.offset)
 	}
 }
 
@@ -77,6 +82,10 @@ func (t *ClockTimeWithOffset) NowFunc() func() time.Time {
 
 // Advance advances t by dt, such that the next call to t.NowFunc()() returns
 // current t + dt.
 func (t *ClockTimeWithOffset) Advance(dt time.Duration) time.Time {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	t.offset += dt
+
 	return clock.Now().Add(t.offset)
 }