epoch: added epoch manager + unit tests (#1128)

This commit is contained in:
Jarek Kowalski
2021-06-11 18:57:21 -07:00
committed by GitHub
parent 46a6cc3f24
commit 7735fcc525
9 changed files with 1482 additions and 0 deletions

View File

@@ -0,0 +1,45 @@
package epoch
import (
"strconv"
"strings"
"github.com/kopia/kopia/repo/blob"
)
// findCompleteSetOfBlobs looks for a complete set of blob IDs following a naming convention:
// '<any>-s<set>-c<count>'
// where:
// 'prefix' is an arbitrary string not containing a dash ('-')
// 'set' is a random string shared by all indexes in the same set
// 'count' is a number that specifies how many items must be in the set to make it complete.
//
// It returns the members of the first set to reach its declared count, or nil when no
// set is complete.
func findCompleteSetOfBlobs(bms []blob.Metadata) []blob.Metadata {
	membersBySet := map[string][]blob.Metadata{}

	for _, bm := range bms {
		parts := strings.Split(string(bm.BlobID), "-")

		// require at least '<any>-s<set>-c<count>' shape; skip anything else.
		if len(parts) < 3 || !strings.HasPrefix(parts[1], "s") || !strings.HasPrefix(parts[2], "c") {
			// malformed ID, ignore
			continue
		}

		wantCount, err := strconv.Atoi(parts[2][1:])
		if err != nil {
			// malformed ID, ignore
			continue
		}

		setID := parts[1]
		membersBySet[setID] = append(membersBySet[setID], bm)

		if len(membersBySet[setID]) == wantCount {
			return membersBySet[setID]
		}
	}

	return nil
}

View File

@@ -0,0 +1,96 @@
package epoch
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/repo/blob"
)
// TestFindCompleteSetOfBlobs exercises set-completion detection over a table of
// blob ID lists, including malformed IDs and multiple concurrent sets.
func TestFindCompleteSetOfBlobs(t *testing.T) {
cases := []struct {
input []blob.ID
want []blob.ID
}{
// empty input yields empty output.
// NOTE(review): this case relies on blob.IDsFromMetadata(nil) returning a
// non-nil empty slice so require.Equal matches []blob.ID{} — confirm.
{
input: []blob.ID{},
want: []blob.ID{},
},
// one complete session of size 2
{
input: []blob.ID{
"a-s0-c2",
"b-s0-c2",
},
want: []blob.ID{
"a-s0-c2",
"b-s0-c2",
},
},
// one complete session with some malformed name
{
input: []blob.ID{
"a-s0-c2",
"malformed",
"b-s0-c2",
},
want: []blob.ID{
"a-s0-c2",
"b-s0-c2",
},
},
// one complete session with some malformed blob ID
// ("x2" does not start with 'c' so it is skipped).
{
input: []blob.ID{
"a-s0-c2",
"malformed-s0-x2",
"b-s0-c2",
},
want: []blob.ID{
"a-s0-c2",
"b-s0-c2",
},
},
// one complete session with some malformed count
// ("cNAN" fails strconv.Atoi so it is skipped).
{
input: []blob.ID{
"a-s0-c2",
"malformed-s0-cNAN",
"b-s0-c2",
},
want: []blob.ID{
"a-s0-c2",
"b-s0-c2",
},
},
// two complete sessions, we pick 's0' as it's the first one to become complete.
{
input: []blob.ID{
"foo-s0-c2",
"aaa-s1-c2",
"bar-s0-c2",
"bbb-s1-c2",
},
want: []blob.ID{
"foo-s0-c2",
"bar-s0-c2",
},
},
}
for _, tc := range cases {
require.Equal(t, tc.want, blob.IDsFromMetadata(findCompleteSetOfBlobs(dummyMetadataForIDs(tc.input))), "invalid result for %v", tc.input)
}
}
// dummyMetadataForIDs wraps each blob ID in a blob.Metadata with all other
// fields left at their zero values. Returns nil for an empty input.
func dummyMetadataForIDs(ids []blob.ID) []blob.Metadata {
	var result []blob.Metadata

	for i := range ids {
		result = append(result, blob.Metadata{BlobID: ids[i]})
	}

	return result
}

View File

@@ -0,0 +1,53 @@
package epoch
import (
"time"
"github.com/kopia/kopia/repo/blob"
)
// shouldAdvance determines if the current epoch should be advanced based on the set of blobs in it.
//
// Epoch will be advanced if it's been more than 'minEpochDuration' between earliest and
// most recent write AND at least one of the criteria has been met:
//
// - number of blobs in the epoch exceeds 'countThreshold'
// - total size of blobs in the epoch exceeds 'totalSizeBytesThreshold'.
func shouldAdvance(bms []blob.Metadata, minEpochDuration time.Duration, countThreshold int, totalSizeBytesThreshold int64) bool {
	if len(bms) == 0 {
		return false
	}

	var (
		earliest  = bms[0].Timestamp
		latest    = bms[0].Timestamp
		totalSize = int64(0)
	)

	for _, bm := range bms {
		if bm.Timestamp.Before(earliest) {
			earliest = bm.Timestamp
		}

		if bm.Timestamp.After(latest) {
			latest = bm.Timestamp
		}

		totalSize += bm.Length
	}

	// not enough time between first and last write in an epoch.
	if latest.Sub(earliest) < minEpochDuration {
		return false
	}

	// either criterion (count or total size) is enough to trigger an advance.
	return len(bms) >= countThreshold || totalSize >= totalSizeBytesThreshold
}

View File

@@ -0,0 +1,89 @@
package epoch
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/repo/blob"
)
// TestShouldAdvanceEpoch verifies the advance decision over a table combining
// elapsed time, blob count and total-size criteria against defaultParams.
func TestShouldAdvanceEpoch(t *testing.T) {
t0 := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
// build a metadata list that is one over the count threshold, spanning MinEpochDuration.
var lotsOfMetadata []blob.Metadata
lotsOfMetadata = append(lotsOfMetadata, blob.Metadata{
Timestamp: t0, Length: 1,
})
for i := 0; i < defaultParams.EpochAdvanceOnCountThreshold; i++ {
lotsOfMetadata = append(lotsOfMetadata, blob.Metadata{
Timestamp: t0.Add(defaultParams.MinEpochDuration),
Length: 1,
})
}
cases := []struct {
desc string
bms []blob.Metadata
want bool
}{
{
desc: "zero blobs",
bms: []blob.Metadata{},
want: false,
},
{
desc: "one blob",
bms: []blob.Metadata{
{Timestamp: t0, Length: 1},
},
want: false,
},
{
desc: "two blobs, not enough time passed, size enough to advance",
bms: []blob.Metadata{
{Timestamp: t0.Add(defaultParams.MinEpochDuration - 1), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold},
{Timestamp: t0, Length: 1},
},
want: false,
},
{
desc: "two blobs, enough time passed, total size enough to advance",
bms: []blob.Metadata{
{Timestamp: t0, Length: 1},
{Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold},
},
want: true,
},
{
desc: "two blobs, enough time passed, total size not enough to advance",
bms: []blob.Metadata{
{Timestamp: t0, Length: 1},
{Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: defaultParams.EpochAdvanceOnTotalSizeBytesThreshold - 2},
},
want: false,
},
{
desc: "enough time passed, count not enough to advance",
bms: []blob.Metadata{
{Timestamp: t0, Length: 1},
{Timestamp: t0.Add(defaultParams.MinEpochDuration), Length: 1},
},
want: false,
},
{
desc: "enough time passed, count enough to advance",
bms: lotsOfMetadata,
want: true,
},
}
for _, tc := range cases {
require.Equal(t, tc.want,
shouldAdvance(tc.bms, defaultParams.MinEpochDuration, defaultParams.EpochAdvanceOnCountThreshold, defaultParams.EpochAdvanceOnTotalSizeBytesThreshold),
tc.desc)
}
}

View File

@@ -0,0 +1,582 @@
// Package epoch manages repository epochs.
// It implements the protocol described at https://github.com/kopia/kopia/issues/1090 and is intentionally
// separate from the 'content' package so that it can be tested in isolation.
package epoch
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/retry"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
)
// Parameters encapsulates all parameters that influence the behavior of epoch manager.
// All durations/thresholds are read-only after construction; see defaultParams for
// reference values.
type Parameters struct {
// how frequently each client will list blobs to determine the current epoch.
EpochRefreshFrequency time.Duration
// number of epochs between full checkpoints.
FullCheckpointFrequency int
// do not delete uncompacted blobs if the corresponding compacted blob age is less than this.
CleanupSafetyMargin time.Duration
// minimum duration of an epoch
MinEpochDuration time.Duration
// advance epoch if number of files exceeds this
EpochAdvanceOnCountThreshold int
// advance epoch if total size of files exceeds this.
EpochAdvanceOnTotalSizeBytesThreshold int64
// number of blobs to delete in parallel during cleanup
DeleteParallelism int
}
// defaultParams holds the default tuning values used by the epoch manager.
// nolint:gomnd
var defaultParams = Parameters{
EpochRefreshFrequency: 20 * time.Minute,
FullCheckpointFrequency: 7,
CleanupSafetyMargin: 1 * time.Hour,
MinEpochDuration: 6 * time.Hour,
EpochAdvanceOnCountThreshold: 100,
EpochAdvanceOnTotalSizeBytesThreshold: 10 << 20,
DeleteParallelism: 4,
}
// snapshot captures a point-in time snapshot of a repository indexes, including current epoch
// information and existing checkpoints. It is rebuilt on each refresh and only valid
// until ValidUntil.
// NOTE(review): the JSON tag for LatestFullCheckpointEpoch is "latestCheckpointEpoch"
// (not "latestFullCheckpointEpoch") — changing it would break serialized state; confirm intent.
type snapshot struct {
WriteEpoch int `json:"writeEpoch"`
LatestFullCheckpointEpoch int `json:"latestCheckpointEpoch"`
FullCheckpointSets map[int][]blob.Metadata `json:"fullCheckpointSets"`
SingleEpochCompactionSets map[int][]blob.Metadata `json:"singleEpochCompactionSets"`
EpochStartTime map[int]time.Time `json:"epochStartTimes"`
ValidUntil time.Time `json:"validUntil"` // time after which the contents of this struct are no longer valid
}
// isSettledEpochNumber reports whether the given epoch is old enough to be
// considered settled, i.e. at least numUnsettledEpochs behind the write epoch.
func (cs *snapshot) isSettledEpochNumber(epoch int) bool {
	return cs.WriteEpoch-epoch >= numUnsettledEpochs
}
// Manager manages repository epochs. It maintains a cached snapshot of the
// repository index state and launches background compaction/cleanup goroutines;
// Flush() waits for those to finish.
type Manager struct {
st blob.Storage
compact CompactionFunc
log logging.Logger
timeFunc func() time.Time
params Parameters
// wait group that waits for all compaction and cleanup goroutines.
backgroundWork sync.WaitGroup
// mutable under lock, data invalid until refresh succeeds at least once.
mu sync.Mutex
lastKnownState snapshot
// counters keeping track of the number of times operations were too slow and had to
// be retried, for testability.
committedStateRefreshTooSlow *int32
getCompleteIndexSetTooSlow *int32
}
const (
// blob ID prefixes for the four kinds of blobs managed by this package.
epochMarkerIndexBlobPrefix blob.ID = "xe"
uncompactedIndexBlobPrefix blob.ID = "xn"
singleEpochCompactionBlobPrefix blob.ID = "xs"
fullCheckpointIndexBlobPrefix blob.ID = "xf"
// number of most recent epochs that may still be receiving writes and therefore
// must not be compacted; see isSettledEpochNumber.
numUnsettledEpochs = 2
)
// CompactionFunc merges the given set of index blobs into a new index blob set with a given prefix
// and writes them out as a set following naming convention established in 'complete_set.go'.
type CompactionFunc func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error
// Flush waits for all in-process compaction work to complete.
// It blocks on the backgroundWork wait group and must not be called while holding e.mu.
func (e *Manager) Flush() {
// ensure all background compactions complete.
e.backgroundWork.Wait()
}
// Refresh refreshes information about current epoch.
// It takes the manager lock and retries internally until a consistent state is read.
func (e *Manager) Refresh(ctx context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()
return e.refreshLocked(ctx)
}
// Cleanup cleans up the old indexes for which there's a compacted replacement.
// It first obtains a fresh committed state (refreshing if the cached one expired).
func (e *Manager) Cleanup(ctx context.Context) error {
cs, err := e.committedState(ctx)
if err != nil {
return err
}
return e.cleanupInternal(ctx, cs)
}
// cleanupInternal removes blobs that are superseded according to the provided
// snapshot. Three deletions run concurrently via errgroup: stale epoch markers,
// uncompacted indexes replaced by single-epoch compactions, and single-epoch
// compactions replaced by a full checkpoint.
func (e *Manager) cleanupInternal(ctx context.Context, cs snapshot) error {
eg, ctx := errgroup.WithContext(ctx)
// delete epoch markers for epoch < current-1
eg.Go(func() error {
var toDelete []blob.ID
if err := e.st.ListBlobs(ctx, epochMarkerIndexBlobPrefix, func(bm blob.Metadata) error {
if n, ok := epochNumberFromBlobID(bm.BlobID); ok {
if n < cs.WriteEpoch-1 {
toDelete = append(toDelete, bm.BlobID)
}
}
return nil
}); err != nil {
return errors.Wrap(err, "error listing epoch markers")
}
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism), "error deleting index blob marker")
})
// delete uncompacted indexes for epochs that already have single-epoch compaction
// that was written sufficiently long ago.
eg.Go(func() error {
blobs, err := blob.ListAllBlobs(ctx, e.st, uncompactedIndexBlobPrefix)
if err != nil {
return errors.Wrap(err, "error listing uncompacted blobs")
}
var toDelete []blob.ID
for _, bm := range blobs {
if cs.safeToDeleteUncompactedBlob(bm, e.params.CleanupSafetyMargin) {
toDelete = append(toDelete, bm.BlobID)
}
}
if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism); err != nil {
return errors.Wrap(err, "unable to delete uncompacted blobs")
}
return nil
})
// delete single-epoch compacted indexes epoch numbers for which full-world state compacted exist
if cs.LatestFullCheckpointEpoch > 0 {
eg.Go(func() error {
blobs, err := blob.ListAllBlobs(ctx, e.st, singleEpochCompactionBlobPrefix)
if err != nil {
return errors.Wrap(err, "error refreshing epochs")
}
var toDelete []blob.ID
for _, bm := range blobs {
epoch, ok := epochNumberFromBlobID(bm.BlobID)
if !ok {
continue
}
if epoch < cs.LatestFullCheckpointEpoch {
toDelete = append(toDelete, bm.BlobID)
}
}
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.params.DeleteParallelism), "error deleting single-epoch compacted blobs")
})
}
return errors.Wrap(eg.Wait(), "error cleaning up index blobs")
}
// safeToDeleteUncompactedBlob reports whether the given uncompacted index blob
// may be removed: either a newer full checkpoint covers its epoch, or a
// single-epoch compaction set replaced it at least safetyMargin before the
// current epoch started.
func (cs *snapshot) safeToDeleteUncompactedBlob(bm blob.Metadata, safetyMargin time.Duration) bool {
	epoch, ok := epochNumberFromBlobID(bm.BlobID)
	if !ok {
		// unparseable ID - never delete.
		return false
	}

	// already covered by a full checkpoint.
	if epoch < cs.LatestFullCheckpointEpoch {
		return true
	}

	cset := cs.SingleEpochCompactionSets[epoch]
	if cset == nil {
		// single-epoch compaction set does not exist for this epoch, don't delete.
		return false
	}

	// compaction set was written sufficiently long ago to be reliably discovered by all
	// other clients - we can delete uncompacted blobs for this epoch.
	compactedAt := blob.MaxTimestamp(cset)

	return compactedAt.Add(safetyMargin).Before(cs.EpochStartTime[cs.WriteEpoch])
}
// refreshLocked reloads committed state with exponential-backoff retries.
// Caller must hold e.mu.
func (e *Manager) refreshLocked(ctx context.Context) error {
return errors.Wrap(retry.WithExponentialBackoffNoValue(ctx, "epoch manager refresh", func() error {
return e.refreshAttemptLocked(ctx)
}, retry.Always), "error refreshing")
}
// loadWriteEpoch scans epoch marker blobs to determine the current write epoch
// (highest marker seen) and the start time recorded for each epoch.
func (e *Manager) loadWriteEpoch(ctx context.Context, cs *snapshot) error {
	markers, err := blob.ListAllBlobs(ctx, e.st, epochMarkerIndexBlobPrefix)
	if err != nil {
		return errors.Wrap(err, "error loading write epoch")
	}

	maxEpoch := cs.WriteEpoch

	for epoch, bms := range groupByEpochNumber(markers) {
		// first marker's timestamp is treated as the epoch start time.
		cs.EpochStartTime[epoch] = bms[0].Timestamp

		if epoch > maxEpoch {
			maxEpoch = epoch
		}
	}

	cs.WriteEpoch = maxEpoch

	return nil
}
// loadFullCheckpoints scans full-checkpoint blobs, records the complete
// checkpoint set for each epoch that has one, and tracks the highest such epoch.
func (e *Manager) loadFullCheckpoints(ctx context.Context, cs *snapshot) error {
	blobs, err := blob.ListAllBlobs(ctx, e.st, fullCheckpointIndexBlobPrefix)
	if err != nil {
		return errors.Wrap(err, "error loading full checkpoints")
	}

	for epoch, bms := range groupByEpochNumber(blobs) {
		complete := findCompleteSetOfBlobs(bms)
		if complete == nil {
			// no complete checkpoint set for this epoch (e.g. interrupted compaction).
			continue
		}

		cs.FullCheckpointSets[epoch] = complete

		if epoch > cs.LatestFullCheckpointEpoch {
			cs.LatestFullCheckpointEpoch = epoch
		}
	}

	return nil
}
// loadSingleEpochCompactions scans single-epoch compaction blobs and records
// the complete compaction set for each epoch that has one.
func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *snapshot) error {
	blobs, err := blob.ListAllBlobs(ctx, e.st, singleEpochCompactionBlobPrefix)
	if err != nil {
		return errors.Wrap(err, "error loading single-epoch compactions")
	}

	for epoch, bms := range groupByEpochNumber(blobs) {
		complete := findCompleteSetOfBlobs(bms)
		if complete == nil {
			// incomplete set - ignore until all members are written.
			continue
		}

		cs.SingleEpochCompactionSets[epoch] = complete
	}

	return nil
}
// maybeStartFullCheckpointAsync launches a background full-checkpoint generation
// when enough epochs have elapsed since the last one. The goroutine is tracked by
// backgroundWork so Flush() can wait for it; errors are only logged.
func (e *Manager) maybeStartFullCheckpointAsync(ctx context.Context, cs snapshot) {
if cs.WriteEpoch-cs.LatestFullCheckpointEpoch < e.params.FullCheckpointFrequency {
return
}
e.backgroundWork.Add(1)
go func() {
defer e.backgroundWork.Done()
// checkpoint up to the most recent settled epoch only.
if err := e.generateFullCheckpointFromCommittedState(ctx, cs, cs.WriteEpoch-numUnsettledEpochs); err != nil {
e.log.Errorf("unable to generate full checkpoint: %v, performance will be affected", err)
}
}()
}
// maybeStartCleanupAsync launches background cleanup of superseded index blobs.
// The goroutine is tracked by backgroundWork; errors are only logged since cleanup
// is best-effort and will be retried on a later refresh.
func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs snapshot) {
e.backgroundWork.Add(1)
go func() {
defer e.backgroundWork.Done()
if err := e.cleanupInternal(ctx, cs); err != nil {
e.log.Errorf("error cleaning up index blobs: %v, performance may be affected", err)
}
}()
}
// refreshAttemptLocked attempts to load the committedState of
// the index and updates `lastKnownState` state atomically when complete.
// The three blob-category scans run in parallel; if the whole refresh takes
// longer than EpochRefreshFrequency the attempt is rejected (and counted) so
// the retry loop in refreshLocked re-runs it. Caller must hold e.mu.
func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
cs := snapshot{
WriteEpoch: 0,
EpochStartTime: map[int]time.Time{},
SingleEpochCompactionSets: map[int][]blob.Metadata{},
LatestFullCheckpointEpoch: 0,
FullCheckpointSets: map[int][]blob.Metadata{},
ValidUntil: e.timeFunc().Add(e.params.EpochRefreshFrequency),
}
eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
return e.loadWriteEpoch(ctx, &cs)
})
eg.Go(func() error {
return e.loadSingleEpochCompactions(ctx, &cs)
})
eg.Go(func() error {
return e.loadFullCheckpoints(ctx, &cs)
})
if err := eg.Wait(); err != nil {
return errors.Wrap(err, "error refreshing")
}
// reject the snapshot if it took longer than its own validity window to build.
if e.timeFunc().After(cs.ValidUntil) {
atomic.AddInt32(e.committedStateRefreshTooSlow, 1)
return errors.Errorf("refreshing committed state took too long")
}
e.lastKnownState = cs
e.maybeStartFullCheckpointAsync(ctx, cs)
e.maybeStartCleanupAsync(ctx, cs)
e.log.Debugf("current epoch %v started at %v", cs.WriteEpoch, cs.EpochStartTime[cs.WriteEpoch])
return nil
}
// committedState returns the cached snapshot, refreshing it first if it has
// expired. Returns a copy so callers can use it without holding e.mu.
func (e *Manager) committedState(ctx context.Context) (snapshot, error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.timeFunc().After(e.lastKnownState.ValidUntil) {
if err := e.refreshLocked(ctx); err != nil {
return snapshot{}, err
}
}
return e.lastKnownState, nil
}
// Current returns the current epoch number.
// It may trigger a refresh if the cached committed state has expired.
func (e *Manager) Current(ctx context.Context) (int, error) {
cs, err := e.committedState(ctx)
if err != nil {
return 0, err
}
return cs.WriteEpoch, nil
}
// GetCompleteIndexSet returns the set of blobs forming a complete index set up to the provided epoch number.
// It loops until the result was computed entirely within the validity window of the
// committed state it was based on; each failed attempt is counted for testability.
func (e *Manager) GetCompleteIndexSet(ctx context.Context, maxEpoch int) ([]blob.ID, error) {
for {
cs, err := e.committedState(ctx)
if err != nil {
return nil, err
}
result, err := e.getCompleteIndexSetForCommittedState(ctx, cs, maxEpoch)
if e.timeFunc().Before(cs.ValidUntil) {
return result, err
}
// We need to retry if local process took too long (e.g. because the machine went
// to sleep at the wrong moment) and committed state is no longer valid.
//
// One scenario where this matters is cleanup: if determining the set of indexes takes
// too long, it's possible for a cleanup process to delete some of the uncompacted
// indexes that are still treated as authoritative according to old committed state.
//
// Retrying will re-examine the state of the world and re-do the logic.
e.log.Debugf("GetCompleteIndexSet took too long, retrying to ensure correctness")
atomic.AddInt32(e.getCompleteIndexSetTooSlow, 1)
}
}
// getCompleteIndexSetForCommittedState computes the index blob set for epochs
// 0..maxEpoch: it starts from the newest full checkpoint at or below maxEpoch
// (if any) and then fetches each remaining epoch's indexes in parallel,
// guarding 'result' with a mutex.
func (e *Manager) getCompleteIndexSetForCommittedState(ctx context.Context, cs snapshot, maxEpoch int) ([]blob.ID, error) {
var (
startEpoch int
resultMutex sync.Mutex
result []blob.ID
)
// find the newest full checkpoint we can build on.
for i := maxEpoch; i >= 0; i-- {
if blobs := cs.FullCheckpointSets[i]; blobs != nil {
result = append(result, blob.IDsFromMetadata(blobs)...)
startEpoch = i + 1
e.log.Debugf("using full checkpoint at epoch %v", i)
break
}
}
eg, ctx := errgroup.WithContext(ctx)
e.log.Debugf("adding incremental state for epochs %v..%v", startEpoch, maxEpoch)
for i := startEpoch; i <= maxEpoch; i++ {
i := i
eg.Go(func() error {
s, err := e.getIndexesFromEpochInternal(ctx, cs, i)
if err != nil {
return errors.Wrapf(err, "error getting indexes for epoch %v", i)
}
resultMutex.Lock()
result = append(result, s...)
resultMutex.Unlock()
return nil
})
}
return result, errors.Wrap(eg.Wait(), "error getting indexes")
}
// WroteIndex is invoked after writing an index blob. It will validate whether the index was written
// in the correct epoch and invalidate cached state so the new index becomes visible.
// It returns an error when the blob ID cannot be parsed or when the blob landed in an
// already-settled epoch (meaning the write took so long that other clients may have
// compacted that epoch without seeing it).
func (e *Manager) WroteIndex(ctx context.Context, bm blob.Metadata) error {
	cs, err := e.committedState(ctx)
	if err != nil {
		return err
	}

	epoch, ok := epochNumberFromBlobID(bm.BlobID)
	if !ok {
		return errors.Errorf("invalid blob ID written")
	}

	// writing into a settled epoch is unsafe - reject it.
	if cs.isSettledEpochNumber(epoch) {
		// fixed typo: was "took to long"
		return errors.Errorf("index write took too long")
	}

	e.invalidate()

	return nil
}
// getIndexesFromEpochInternal returns the index blob IDs for a single epoch:
// the single-epoch compaction set when the epoch is settled and compacted,
// otherwise the uncompacted blobs (filtered to exclude writes that landed after
// the epoch settled). As side effects it may kick off background compaction of
// a settled epoch and advance the write epoch when thresholds are met.
func (e *Manager) getIndexesFromEpochInternal(ctx context.Context, cs snapshot, epoch int) ([]blob.ID, error) {
// check if the epoch is old enough to possibly have compacted blobs
epochSettled := cs.isSettledEpochNumber(epoch)
if epochSettled && cs.SingleEpochCompactionSets[epoch] != nil {
return blob.IDsFromMetadata(cs.SingleEpochCompactionSets[epoch]), nil
}
// load uncompacted blobs for this epoch
uncompactedBlobs, err := blob.ListAllBlobs(ctx, e.st, uncompactedEpochBlobPrefix(epoch))
if err != nil {
return nil, errors.Wrapf(err, "error listing uncompacted indexes for epoch %v", epoch)
}
// Ignore blobs written after the epoch has been settled.
//
// Epochs N is 'settled' after epoch N+2 has been started and that makes N subject to compaction,
// because at this point all clients will agree that we're in epoch N+1 or N+2.
//
// In a pathological case it's possible for client to write a blob for a 'settled' epoch if they:
//
// 1. determine current epoch number (N).
// 2. go to sleep for a very long time, enough for epoch >=N+2 to become current.
// 3. write blob for the epoch number N
uncompactedBlobs = blobsWrittenBefore(
uncompactedBlobs,
cs.EpochStartTime[epoch+numUnsettledEpochs],
)
if epochSettled {
// settled but not yet compacted - compact in the background; errors only logged.
e.backgroundWork.Add(1)
go func() {
defer e.backgroundWork.Done()
e.log.Debugf("starting single-epoch compaction of %v", epoch)
if err := e.compact(ctx, blob.IDsFromMetadata(uncompactedBlobs), compactedEpochBlobPrefix(epoch)); err != nil {
e.log.Errorf("unable to compact blobs for epoch %v: %v, performance will be affected", epoch, err)
}
}()
}
// advance the epoch opportunistically during reads when thresholds are exceeded.
advance := shouldAdvance(uncompactedBlobs, e.params.MinEpochDuration, e.params.EpochAdvanceOnCountThreshold, e.params.EpochAdvanceOnTotalSizeBytesThreshold)
if advance && epoch == cs.WriteEpoch {
if err := e.advanceEpoch(ctx, cs.WriteEpoch+1); err != nil {
e.log.Errorf("unable to advance epoch: %v, performance will be affected", err)
}
}
// return uncompacted blobs to the caller while we're compacting them in background
return blob.IDsFromMetadata(uncompactedBlobs), nil
}
// advanceEpoch writes an epoch marker blob for newEpoch and invalidates the
// cached state so the new epoch is picked up on the next refresh.
func (e *Manager) advanceEpoch(ctx context.Context, newEpoch int) error {
	markerID := blob.ID(fmt.Sprintf("%v%v", string(epochMarkerIndexBlobPrefix), newEpoch))

	err := e.st.PutBlob(ctx, markerID, gather.FromSlice([]byte("epoch-marker")))
	if err != nil {
		return errors.Wrap(err, "error writing epoch marker")
	}

	e.invalidate()

	return nil
}
// invalidate discards the cached committed state, forcing the next read to refresh.
func (e *Manager) invalidate() {
e.mu.Lock()
defer e.mu.Unlock()
// zero-value snapshot has zero ValidUntil, so it is always expired.
e.lastKnownState = snapshot{}
}
// generateFullCheckpointFromCommittedState compacts the complete index set up to
// the given epoch into a full-checkpoint blob set. It aborts (without writing)
// if the committed state expired while gathering the set, to avoid checkpointing
// from stale data.
func (e *Manager) generateFullCheckpointFromCommittedState(ctx context.Context, cs snapshot, epoch int) error {
e.log.Debugf("generating full checkpoint until epoch %v", epoch)
completeSet, err := e.getCompleteIndexSetForCommittedState(ctx, cs, epoch)
if err != nil {
return errors.Wrap(err, "unable to get full checkpoint")
}
if e.timeFunc().After(cs.ValidUntil) {
return errors.Errorf("not generating full checkpoint - the committed state is no longer valid")
}
if err := e.compact(ctx, completeSet, fullCheckpointBlobPrefix(epoch)); err != nil {
return errors.Wrap(err, "unable to compact blobs")
}
return nil
}
// uncompactedEpochBlobPrefix returns the blob ID prefix ("xn<epoch>_") for
// uncompacted index blobs in the given epoch.
func uncompactedEpochBlobPrefix(epoch int) blob.ID {
	return blob.ID(fmt.Sprintf("%s%d_", uncompactedIndexBlobPrefix, epoch))
}
// compactedEpochBlobPrefix returns the blob ID prefix ("xs<epoch>_") for
// single-epoch compaction blobs in the given epoch.
func compactedEpochBlobPrefix(epoch int) blob.ID {
	return blob.ID(fmt.Sprintf("%s%d_", singleEpochCompactionBlobPrefix, epoch))
}
// fullCheckpointBlobPrefix returns the blob ID prefix ("xf<epoch>_") for
// full-checkpoint blobs covering epochs up to the given one.
func fullCheckpointBlobPrefix(epoch int) blob.ID {
	return blob.ID(fmt.Sprintf("%s%d_", fullCheckpointIndexBlobPrefix, epoch))
}
// NewManager creates new epoch manager backed by the given storage, using the
// provided compaction function and a prefixed logger derived from sharedBaseLogger.
func NewManager(st blob.Storage, params Parameters, compactor CompactionFunc, sharedBaseLogger logging.Logger) *Manager {
	m := &Manager{
		st:       st,
		log:      logging.WithPrefix("[epoch-manager] ", sharedBaseLogger),
		compact:  compactor,
		timeFunc: clock.Now,
		params:   params,
	}

	// retry counters are pointers so tests can observe them.
	m.committedStateRefreshTooSlow = new(int32)
	m.getCompleteIndexSetTooSlow = new(int32)

	return m
}

View File

@@ -0,0 +1,445 @@
package epoch
import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
"sort"
"sync/atomic"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/logging"
)
const verifyAllEpochs = -1
// fakeIndex is a trivial JSON-serializable stand-in for a real index blob,
// holding a list of integer entries.
type fakeIndex struct {
Entries []int `json:"entries"`
}
// Bytes returns the JSON encoding of the fake index, panicking on marshal
// failure (which cannot reasonably happen for this struct).
func (n *fakeIndex) Bytes() []byte {
	data, err := json.Marshal(n)
	if err != nil {
		panic("err: " + err.Error())
	}

	return data
}
// parseFakeIndex decodes a fakeIndex from its JSON representation.
func parseFakeIndex(b []byte) (*fakeIndex, error) {
	r := &fakeIndex{}
	err := json.Unmarshal(b, &r)

	// fixed typo in error message: was "unmashaling"
	return r, errors.Wrap(err, "error unmarshaling JSON")
}
// newFakeIndexWithEntries constructs a fakeIndex holding the given entries.
func newFakeIndexWithEntries(entries ...int) *fakeIndex {
	return &fakeIndex{Entries: entries}
}
// epochManagerTestEnv bundles a Manager under test with its layered storage:
// data (raw map) -> faultyStorage (fault injection) -> logging wrapper (st);
// unloggedst bypasses fault injection and logging for direct verification.
type epochManagerTestEnv struct {
data blobtesting.DataMap
unloggedst blob.Storage
st blob.Storage
ft *faketime.ClockTimeWithOffset
mgr *Manager
faultyStorage *blobtesting.FaultyStorage
}
// compact is a CompactionFunc for tests: it merges the given fake index blobs
// and writes the result as a single-member complete set ("-s0-c1" suffix).
func (te *epochManagerTestEnv) compact(ctx context.Context, blobs []blob.ID, prefix blob.ID) error {
merged, err := te.getMergedIndexContents(ctx, blobs)
if err != nil {
return errors.Wrap(err, "unable to merge")
}
return errors.Wrap(
te.st.PutBlob(ctx, blob.ID(fmt.Sprintf("%v%016x-s0-c1", prefix, rand.Int63())), gather.FromSlice(merged.Bytes())),
"PutBlob error")
}
// write two dummy compaction blobs instead of 3, simulating a compaction that crashed before fully complete.
// PutBlob errors are deliberately ignored - this helper models a best-effort
// partial write followed by a failure.
func (te *epochManagerTestEnv) interruptedCompaction(ctx context.Context, _ []blob.ID, prefix blob.ID) error {
sess := rand.Int63()
te.st.PutBlob(ctx, blob.ID(fmt.Sprintf("%v%016x-s%v-c3", prefix, sess, rand.Int63())), gather.FromSlice([]byte("dummy")))
te.st.PutBlob(ctx, blob.ID(fmt.Sprintf("%v%016x-s%v-c3", prefix, sess, rand.Int63())), gather.FromSlice([]byte("dummy")))
return errors.Errorf("failed for some reason")
}
// newTestEnv builds a test environment with a fake clock, fault-injecting
// storage wrapped in a logging layer, and a Manager using test-specific
// parameters (note: MinEpochDuration and thresholds differ from defaultParams).
func newTestEnv(t *testing.T) *epochManagerTestEnv {
t.Helper()
data := blobtesting.DataMap{}
ft := faketime.NewClockTimeWithOffset(0)
st := blobtesting.NewMapStorage(data, nil, ft.NowFunc())
unloggedst := st
fs := &blobtesting.FaultyStorage{
Base: st,
}
st = fs
st = logging.NewWrapper(st, t.Logf, "[STORAGE] ")
te := &epochManagerTestEnv{unloggedst: unloggedst, st: st, ft: ft}
m := NewManager(te.st, Parameters{
EpochRefreshFrequency: 20 * time.Minute,
FullCheckpointFrequency: 7,
CleanupSafetyMargin: 1 * time.Hour,
MinEpochDuration: 12 * time.Hour,
EpochAdvanceOnCountThreshold: 25,
EpochAdvanceOnTotalSizeBytesThreshold: 20 << 20,
DeleteParallelism: 1,
}, te.compact, testlogging.NewTestLogger(t))
// use the fake clock so tests can advance time deterministically.
m.timeFunc = te.ft.NowFunc()
te.mgr = m
te.faultyStorage = fs
te.data = data
return te
}
// TestIndexEpochManager_Regular runs the sequential-write scenario with no
// induced faults.
func TestIndexEpochManager_Regular(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
verifySequentialWrites(t, te)
}
// TestIndexEpochManager_RogueBlobs verifies that unparseable blobs with valid
// prefixes do not break epoch management or cleanup.
func TestIndexEpochManager_RogueBlobs(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
te.data[epochMarkerIndexBlobPrefix+"zzzz"] = []byte{1}
te.data[singleEpochCompactionBlobPrefix+"zzzz"] = []byte{1}
te.data[fullCheckpointIndexBlobPrefix+"zzzz"] = []byte{1}
verifySequentialWrites(t, te)
// error deliberately ignored - only checking that Cleanup tolerates rogue blobs.
te.mgr.Cleanup(testlogging.Context(t))
}
// TestIndexEpochManager_CompactionSilentlyDoesNothing verifies correctness when
// the compaction callback reports success but writes nothing.
func TestIndexEpochManager_CompactionSilentlyDoesNothing(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
// set up test environment in which compactions do nothing but report success.
te.mgr.compact = func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
return nil
}
verifySequentialWrites(t, te)
}
// TestIndexEpochManager_CompactionAlwaysFails verifies correctness when the
// compaction callback always returns an error.
func TestIndexEpochManager_CompactionAlwaysFails(t *testing.T) {
	t.Parallel()

	te := newTestEnv(t)

	// set up test environment in which compactions never succeed.
	// NOTE: previously this returned nil, which duplicated the
	// "SilentlyDoesNothing" test instead of exercising the failure path.
	te.mgr.compact = func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
		return errors.Errorf("compaction error")
	}

	verifySequentialWrites(t, te)
}
// TestIndexEpochManager_CompactionRandomlyCrashed verifies correctness when
// roughly 20% of compactions leave behind a partial (incomplete) blob set.
func TestIndexEpochManager_CompactionRandomlyCrashed(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
// set up test environment in which some compactions crash mid-write,
// leaving an incomplete set, while the rest succeed normally.
te.mgr.compact = func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
if rand.Intn(100) < 20 {
return te.interruptedCompaction(ctx, blobIDs, outputPrefix)
}
return te.compact(ctx, blobIDs, outputPrefix)
}
verifySequentialWrites(t, te)
}
// TestIndexEpochManager_DeletionFailing verifies correctness when every
// DeleteBlob call fails (so cleanup can never remove anything) while
// compactions also randomly crash.
func TestIndexEpochManager_DeletionFailing(t *testing.T) {
t.Parallel()
te := newTestEnv(t)
te.faultyStorage.Faults = map[string][]*blobtesting.Fault{
"DeleteBlob": {
{Repeat: math.MaxInt32, Err: errors.Errorf("something bad happened")},
},
}
// set up test environment in which some compactions crash mid-write and
// the rest succeed (same mix as the RandomlyCrashed test).
te.mgr.compact = func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
if rand.Intn(100) < 20 {
return te.interruptedCompaction(ctx, blobIDs, outputPrefix)
}
return te.compact(ctx, blobIDs, outputPrefix)
}
verifySequentialWrites(t, te)
}
// TestRefreshRetriesIfTakingTooLong injects clock jumps into ListBlobs so the
// first refresh attempts exceed their validity window, and verifies the retry
// counter matches the expected number of rejected attempts.
func TestRefreshRetriesIfTakingTooLong(t *testing.T) {
te := newTestEnv(t)
defer te.mgr.Flush()
te.faultyStorage.Faults = map[string][]*blobtesting.Fault{
"ListBlobs": {
&blobtesting.Fault{
Repeat: 4, // refresh does 3 lists, so this will cause 2 unsuccessful retries
ErrCallback: func() error {
te.ft.Advance(24 * time.Hour)
return nil
},
},
},
}
ctx := testlogging.Context(t)
require.NoError(t, te.mgr.Refresh(ctx))
require.EqualValues(t, 2, *te.mgr.committedStateRefreshTooSlow)
}
// TestGetCompleteIndexSetRetriesIfTookTooLong makes the first ListBlobs call
// during GetCompleteIndexSet advance the clock past the committed-state
// validity window, and verifies exactly one retry is recorded.
func TestGetCompleteIndexSetRetriesIfTookTooLong(t *testing.T) {
te := newTestEnv(t)
defer te.mgr.Flush()
ctx := testlogging.Context(t)
// load committed state
require.NoError(t, te.mgr.Refresh(ctx))
cnt := new(int32)
// ensure we're not running any background goroutines before modifying 'Faults'
te.mgr.Flush()
te.faultyStorage.Faults = map[string][]*blobtesting.Fault{
"ListBlobs": {
&blobtesting.Fault{
Repeat: 1000,
ErrCallback: func() error {
// only the first call jumps the clock; subsequent attempts succeed in time.
if atomic.AddInt32(cnt, 1) == 1 {
te.ft.Advance(24 * time.Hour)
}
return nil
},
},
},
}
_, err := te.mgr.GetCompleteIndexSet(ctx, 0)
require.NoError(t, err)
require.EqualValues(t, 1, *te.mgr.getCompleteIndexSetTooSlow)
}
// TestLateWriteIsIgnored simulates a client writing an index blob for an epoch
// that has since settled: a write finishing one epoch late is accepted, but a
// write finishing two epochs late is rejected by WroteIndex and excluded from
// the complete index set.
func TestLateWriteIsIgnored(t *testing.T) {
te := newTestEnv(t)
defer te.mgr.Flush()
ctx := testlogging.Context(t)
// get current epoch number
epoch, err := te.mgr.Current(ctx)
require.NoError(t, err)
var rnd [8]byte
// rand.Read on a math/rand source never fails; error intentionally ignored.
rand.Read(rnd[:])
blobID1 := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:]))
rand.Read(rnd[:])
blobID2 := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, rnd[:]))
// at this point it's possible that the process hangs for a very long time, during which the
// current epoch moves by 2. This would be dangerous, since we'd potentially modify an already
// settled epoch.
// To verify this, we call WroteIndex() after the write which will fail if the write finished
// late. During read we will ignore index files with dates that are too late.
// simulate process process hanging for a very long time, during which time the epoch moves.
for i := 0; i < 30; i++ {
te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(100+i))
te.ft.Advance(time.Hour)
}
// epoch advance is triggered during reads.
_, err = te.mgr.GetCompleteIndexSet(ctx, epoch+1)
require.NoError(t, err)
// make sure the epoch has moved
epoch2, err := te.mgr.Current(ctx)
require.NoError(t, err)
require.Equal(t, epoch+1, epoch2)
require.NoError(t, te.st.PutBlob(ctx, blobID1, gather.FromSlice([]byte("dummy"))))
bm, err := te.unloggedst.GetMetadata(ctx, blobID1)
require.NoError(t, err)
// it's not an error to finish the write in the next epoch.
require.NoError(t, te.mgr.WroteIndex(ctx, bm))
// move the epoch one more.
for i := 0; i < 30; i++ {
te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(100+i))
te.ft.Advance(time.Hour)
}
// epoch advance is triggered during reads.
_, err = te.mgr.GetCompleteIndexSet(ctx, epoch+2)
require.NoError(t, err)
// make sure the epoch has moved
epoch3, err := te.mgr.Current(ctx)
require.NoError(t, err)
require.Equal(t, epoch+2, epoch3)
// on Windows the time does not always move forward, give it a nudge.
te.ft.Advance(2 * time.Second)
require.NoError(t, te.st.PutBlob(ctx, blobID2, gather.FromSlice([]byte("dummy"))))
bm, err = te.unloggedst.GetMetadata(ctx, blobID2)
require.NoError(t, err)
// at this point WroteIndex() will fail because epoch #0 is already settled.
require.Error(t, te.mgr.WroteIndex(ctx, bm))
iset, err := te.mgr.GetCompleteIndexSet(ctx, epoch3)
require.NoError(t, err)
// blobID1 will be included in the index.
require.Contains(t, iset, blobID1)
// blobID2 will be excluded from the index.
require.NotContains(t, iset, blobID2)
}
// verifySequentialWrites drives ~90 simulated days of index writes at random
// intervals, verifying after every write that the complete index set matches
// all entries written so far, with periodic Refresh and Cleanup calls mixed in.
// nolint:thelper
func verifySequentialWrites(t *testing.T, te *epochManagerTestEnv) {
ctx := testlogging.Context(t)
expected := &fakeIndex{}
endTime := te.ft.NowFunc()().Add(90 * 24 * time.Hour)
indexNum := 1
for te.ft.NowFunc()().Before(endTime) {
indexNum++
te.mustWriteIndexFile(ctx, t, newFakeIndexWithEntries(indexNum))
expected.Entries = append(expected.Entries, indexNum)
te.verifyCompleteIndexSet(ctx, t, verifyAllEpochs, expected)
dt := randomTime(1*time.Minute, 8*time.Hour)
t.Logf("advancing time by %v", dt)
te.ft.Advance(dt)
if indexNum%7 == 0 {
require.NoError(t, te.mgr.Refresh(ctx))
}
if indexNum%27 == 0 {
// do not require.NoError because we'll be sometimes inducing faults
te.mgr.Cleanup(ctx)
}
}
te.mgr.Flush()
for k, v := range te.data {
t.Logf("data: %v (%v)", k, len(v))
}
t.Logf("total written %v", indexNum)
t.Logf("total remaining %v", len(te.data))
}
func randomTime(min, max time.Duration) time.Duration {
return time.Duration(float64(max-min)*rand.Float64() + float64(min))
}
// verifyCompleteIndexSet fetches the complete index set up to maxEpoch,
// merges its contents and asserts they match the expected fake index.
// Passing verifyAllEpochs means "everything up to and including the current epoch".
func (te *epochManagerTestEnv) verifyCompleteIndexSet(ctx context.Context, t *testing.T, maxEpoch int, want *fakeIndex) {
	t.Helper()

	if maxEpoch == verifyAllEpochs {
		cur, err := te.mgr.Current(ctx)
		require.NoError(t, err)

		maxEpoch = cur + 1
	}

	blobs, err := te.mgr.GetCompleteIndexSet(ctx, maxEpoch)
	t.Logf("complete set length: %v", len(blobs))
	require.NoError(t, err)

	merged, err := te.getMergedIndexContents(ctx, blobs)
	require.NoError(t, err)
	require.Equal(t, want.Entries, merged.Entries)
}
// getMergedIndexContents downloads and parses each of the given blobs and
// returns a single fakeIndex with all of their entries combined and sorted.
func (te *epochManagerTestEnv) getMergedIndexContents(ctx context.Context, blobIDs []blob.ID) (*fakeIndex, error) {
	merged := &fakeIndex{}

	for _, id := range blobIDs {
		payload, err := te.unloggedst.GetBlob(ctx, id, 0, -1)
		if err != nil {
			return nil, errors.Wrap(err, "unable to get blob")
		}

		parsed, err := parseFakeIndex(payload)
		if err != nil {
			return nil, errors.Wrap(err, "unable to parse fake index")
		}

		merged.Entries = append(merged.Entries, parsed.Entries...)
	}

	sort.Ints(merged.Entries)

	return merged, nil
}
// mustWriteIndexFile stores the provided fake index under a randomly-named
// blob tagged with the current epoch number and notifies the epoch manager.
func (te *epochManagerTestEnv) mustWriteIndexFile(ctx context.Context, t *testing.T, ndx *fakeIndex) {
	t.Helper()

	epoch, err := te.mgr.Current(ctx)
	require.NoError(t, err)

	// random suffix makes each index blob name unique within the epoch.
	var suffix [8]byte

	rand.Read(suffix[:])

	blobID := blob.ID(fmt.Sprintf("%v%v_%x", uncompactedIndexBlobPrefix, epoch, suffix[:]))

	require.NoError(t, te.st.PutBlob(ctx, blobID, gather.FromSlice(ndx.Bytes())))

	bm, err := te.unloggedst.GetMetadata(ctx, blobID)
	require.NoError(t, err)
	require.NoError(t, te.mgr.WroteIndex(ctx, bm))
}

View File

@@ -0,0 +1,57 @@
package epoch
import (
"strconv"
"strings"
"time"
"unicode"
"github.com/kopia/kopia/repo/blob"
)
// epochNumberFromBlobID extracts the epoch number from a string formatted as
// <prefix><epochNumber>_<remainder>. It returns false if no valid number can
// be parsed.
func epochNumberFromBlobID(blobID blob.ID) (int, bool) {
	s := string(blobID)

	// only the part before the first '_' can carry the epoch number.
	if p := strings.IndexByte(s, '_'); p >= 0 {
		s = s[:p]
	}

	// skip the leading non-digit prefix; scanning byte-by-byte matches the
	// ASCII digits that strconv.Atoi accepts.
	start := 0
	for start < len(s) && !unicode.IsDigit(rune(s[start])) {
		start++
	}

	n, err := strconv.Atoi(s[start:])
	if err != nil {
		return 0, false
	}

	return n, true
}
// blobsWrittenBefore returns the subset of bms whose timestamps are at or
// before maxTime. A zero maxTime disables the cutoff and keeps everything.
func blobsWrittenBefore(bms []blob.Metadata, maxTime time.Time) []blob.Metadata {
	var result []blob.Metadata

	for _, bm := range bms {
		if maxTime.IsZero() || !bm.Timestamp.After(maxTime) {
			result = append(result, bm)
		}
	}

	return result
}
// groupByEpochNumber buckets the provided blob metadata by the epoch number
// embedded in each blob ID; blobs without a parsable epoch number are dropped.
func groupByEpochNumber(bms []blob.Metadata) map[int][]blob.Metadata {
	byEpoch := map[int][]blob.Metadata{}

	for _, bm := range bms {
		n, ok := epochNumberFromBlobID(bm.BlobID)
		if !ok {
			continue
		}

		byEpoch[n] = append(byEpoch[n], bm)
	}

	return byEpoch
}

View File

@@ -0,0 +1,106 @@
package epoch
import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/repo/blob"
)
// TestEpochNumberFromBlobID verifies that well-formed blob IDs yield their
// embedded epoch number.
func TestEpochNumberFromBlobID(t *testing.T) {
	cases := []struct {
		input blob.ID
		want  int
	}{
		{input: "pppp9", want: 9},
		{input: "x7", want: 7},
		{input: "x01234_1235", want: 1234},
		{input: "x0_1235", want: 0},
		{input: "abc01234_", want: 1234},
		{input: "abc1234_", want: 1234},
		{input: "abc1234_xxxx-sadfasd", want: 1234},
	}

	for _, tc := range cases {
		got, ok := epochNumberFromBlobID(tc.input)
		require.True(t, ok, tc.input)
		require.Equal(t, tc.want, got)
	}
}
// TestEpochNumberFromBlobID_Invalid verifies that malformed blob IDs are rejected.
func TestEpochNumberFromBlobID_Invalid(t *testing.T) {
	for _, blobID := range []blob.ID{"_", "a_", "x123x_"} {
		_, ok := epochNumberFromBlobID(blobID)
		require.False(t, ok, "epochNumberFromBlobID(%v)", blobID)
	}
}
// TestBlobsWrittenBefore verifies filtering of blob metadata by write time,
// including the zero cutoff which keeps everything.
func TestBlobsWrittenBefore(t *testing.T) {
	t0 := time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)
	t1 := time.Date(2000, 1, 2, 0, 0, 0, 0, time.UTC)
	t2 := time.Date(2000, 1, 3, 0, 0, 0, 0, time.UTC)

	bm0 := blob.Metadata{BlobID: "a", Timestamp: t0}
	bm1 := blob.Metadata{BlobID: "b", Timestamp: t1}
	bm2 := blob.Metadata{BlobID: "c", Timestamp: t2}

	all := []blob.Metadata{bm0, bm1, bm2}

	cases := []struct {
		bms    []blob.Metadata
		cutoff time.Time
		want   []blob.Metadata
	}{
		{all, time.Time{}, []blob.Metadata{bm0, bm1, bm2}},
		{all, t0.Add(-1), nil},
		{all, t0, []blob.Metadata{bm0}},
		{all, t1.Add(-1), []blob.Metadata{bm0}},
		{all, t1, []blob.Metadata{bm0, bm1}},
		{all, t2.Add(-1), []blob.Metadata{bm0, bm1}},
		{all, t2, []blob.Metadata{bm0, bm1, bm2}},
	}

	for i, tc := range cases {
		require.Equal(t, tc.want, blobsWrittenBefore(tc.bms, tc.cutoff), "#%v blobsWrittenBefore(%v,%v)", i, tc.bms, tc.cutoff)
	}
}
// TestGroupByEpochNumber verifies that blob metadata entries are bucketed by
// the epoch number embedded in each blob ID, preserving input order per bucket.
func TestGroupByEpochNumber(t *testing.T) {
	input := []blob.Metadata{
		{BlobID: "e1_abc"},
		{BlobID: "e2_cde"},
		{BlobID: "e1_def"},
		{BlobID: "e3_fgh"},
	}

	want := map[int][]blob.Metadata{
		1: {{BlobID: "e1_abc"}, {BlobID: "e1_def"}},
		2: {{BlobID: "e2_cde"}},
		3: {{BlobID: "e3_fgh"}},
	}

	require.Equal(t, want, groupByEpochNumber(input))
}

View File

@@ -2,6 +2,7 @@
package faketime
import (
"sync"
"sync/atomic"
"time"
@@ -59,6 +60,7 @@ func (t *TimeAdvance) Advance(dt time.Duration) time.Time {
// ClockTimeWithOffset allows controlling the passage of time. Intended to be used in
// tests.
type ClockTimeWithOffset struct {
	mu sync.Mutex // guards offset against concurrent NowFunc/Advance calls

	// offset is added to the real clock reading to produce the simulated time.
	offset time.Duration
}
@@ -70,6 +72,9 @@ func NewClockTimeWithOffset(offset time.Duration) *ClockTimeWithOffset {
// NowFunc returns a time provider function for t.
// The returned closure reads the current offset under the lock, so it is
// safe to call concurrently with Advance.
func (t *ClockTimeWithOffset) NowFunc() func() time.Time {
	return func() time.Time {
		t.mu.Lock()
		defer t.mu.Unlock()

		return clock.Now().Add(t.offset)
	}
}
@@ -77,6 +82,10 @@ func (t *ClockTimeWithOffset) NowFunc() func() time.Time {
// Advance advances t by dt, such that the next call to t.NowFunc()() returns
// current t + dt.
// It returns the new simulated time and is safe for concurrent use.
func (t *ClockTimeWithOffset) Advance(dt time.Duration) time.Time {
	t.mu.Lock()
	defer t.mu.Unlock()

	// accumulate the offset; all future NowFunc readings shift by dt as well.
	t.offset += dt

	return clock.Now().Add(t.offset)
}