mirror of
https://github.com/kopia/kopia.git
synced 2026-04-22 23:18:03 -04:00
feat(general): Upgrade lock retries & monitoring during open & write sessions (#1758)
* background upgrade lock monitor * retry lock forever on connect * pr feedback * remove time computations under read lock for efficiency * extend the unit test to cover lock monitoring with a controlled time function * more cleanup Co-authored-by: Shikhar Mall <small@kopia.io>
This commit is contained in:
@@ -168,12 +168,12 @@ func (e *Environment) MustReopen(tb testing.TB, openOpts ...func(*repo.Options))
|
||||
}
|
||||
|
||||
// MustOpenAnother opens another repository backend by the same storage.
|
||||
func (e *Environment) MustOpenAnother(tb testing.TB) repo.RepositoryWriter {
|
||||
func (e *Environment) MustOpenAnother(tb testing.TB, openOpts ...func(*repo.Options)) repo.RepositoryWriter {
|
||||
tb.Helper()
|
||||
|
||||
ctx := testlogging.Context(tb)
|
||||
|
||||
rep2, err := repo.Open(ctx, e.ConfigFile(), e.Password, &repo.Options{})
|
||||
rep2, err := repo.Open(ctx, e.ConfigFile(), e.Password, repoOptions(openOpts))
|
||||
if err != nil {
|
||||
tb.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
@@ -34,6 +34,13 @@ func WithExponentialBackoff(ctx context.Context, desc string, attempt AttemptFun
|
||||
return internalRetry(ctx, desc, attempt, isRetriableError, retryInitialSleepAmount, retryMaxSleepAmount, maxAttempts, retryExponent)
|
||||
}
|
||||
|
||||
// WithExponentialBackoffMaxRetries is the same as WithExponentialBackoff,
|
||||
// additionally it allows customizing the max number of retries before giving
|
||||
// up (count parameter). A negative value for count would run this forever.
|
||||
func WithExponentialBackoffMaxRetries(ctx context.Context, count int, desc string, attempt AttemptFunc, isRetriableError IsRetriableFunc) (interface{}, error) {
|
||||
return internalRetry(ctx, desc, attempt, isRetriableError, retryInitialSleepAmount, retryMaxSleepAmount, count, retryExponent)
|
||||
}
|
||||
|
||||
// Periodically runs the provided attempt until it succeeds, waiting given fixed amount between attempts.
|
||||
func Periodically(ctx context.Context, interval time.Duration, count int, desc string, attempt AttemptFunc, isRetriableError IsRetriableFunc) (interface{}, error) {
|
||||
return internalRetry(ctx, desc, attempt, isRetriableError, interval, interval, count, 1)
|
||||
@@ -54,9 +61,12 @@ func PeriodicallyNoValue(ctx context.Context, interval time.Duration, count int,
|
||||
func internalRetry(ctx context.Context, desc string, attempt AttemptFunc, isRetriableError IsRetriableFunc, initial, max time.Duration, count int, factor float64) (interface{}, error) {
|
||||
sleepAmount := initial
|
||||
|
||||
var lastError error
|
||||
var (
|
||||
lastError error
|
||||
i = 0
|
||||
)
|
||||
|
||||
for i := 0; i < count; i++ {
|
||||
for ; i < count || count < 0; i++ {
|
||||
if cerr := ctx.Err(); cerr != nil {
|
||||
// nolint:wrapcheck
|
||||
return nil, cerr
|
||||
@@ -82,7 +92,7 @@ func internalRetry(ctx context.Context, desc string, attempt AttemptFunc, isRetr
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.Wrapf(lastError, "unable to complete %v despite %v retries", desc, count)
|
||||
return nil, errors.Wrapf(lastError, "unable to complete %v despite %v retries", desc, i)
|
||||
}
|
||||
|
||||
// WithExponentialBackoffNoValue is a shorthand for WithExponentialBackoff except the
|
||||
|
||||
@@ -71,3 +71,15 @@ func NewWrapper(wrapped blob.Storage, onGetBlob onGetBlobCallback, onGetMetadata
|
||||
onPutBlob: onPutBlob,
|
||||
}
|
||||
}
|
||||
|
||||
// NewUniformWrapper is same as NewWrapper except that it only accepts a single
|
||||
// uniform callback for all the operations for simpler use-cases.
|
||||
func NewUniformWrapper(wrapped blob.Storage, cb callback) blob.Storage {
|
||||
return &beforeOp{
|
||||
Storage: wrapped,
|
||||
onGetBlob: func(_ blob.ID) error { return cb() },
|
||||
onGetMetadata: cb,
|
||||
onDeleteBlob: cb,
|
||||
onPutBlob: func(_ blob.ID, _ *blob.PutOptions) error { return cb() },
|
||||
}
|
||||
}
|
||||
|
||||
209
repo/open.go
209
repo/open.go
@@ -6,6 +6,7 @@
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
@@ -16,6 +17,7 @@
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/epoch"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/retry"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/beforeop"
|
||||
loggingwrapper "github.com/kopia/kopia/repo/blob/logging"
|
||||
@@ -69,6 +71,7 @@ type Options struct {
|
||||
TraceStorage bool // Logs all storage access using provided Printf-style function
|
||||
TimeNowFunc func() time.Time // Time provider
|
||||
DisableInternalLog bool // Disable internal log
|
||||
UpgradeOwnerID string // Owner-ID of any upgrade in progress, when this is not set the access may be restricted
|
||||
}
|
||||
|
||||
// ErrInvalidPassword is returned when repository password is invalid.
|
||||
@@ -183,66 +186,109 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// openWithConfig opens the repository with a given configuration, avoiding the need for a config file.
|
||||
// nolint:funlen,gocyclo
|
||||
func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, caching *content.CachingOptions, configFile string) (DirectRepository, error) {
|
||||
caching = caching.CloneOrDefault()
|
||||
type unpackedFormatBlob struct {
|
||||
f *formatBlob
|
||||
fb []byte // serialized format blob
|
||||
cacheMTime time.Time // mod time of the format blob cache file
|
||||
repoConfig *repositoryObjectFormat // unencrypted format blob structure
|
||||
formatEncryptionKey []byte // key derived from the password
|
||||
}
|
||||
|
||||
// Read repo blobs, potentially from cache.
|
||||
fb, rb, err := readAndCacheRepositoryBlobs(ctx, st, caching.CacheDirectory, lc.FormatBlobCacheDuration)
|
||||
func readAndCacheRepoConfig(ctx context.Context, st blob.Storage, password string, cacheOpts *content.CachingOptions, validDuration time.Duration) (ufb *unpackedFormatBlob, err error) {
|
||||
ufb = &unpackedFormatBlob{}
|
||||
|
||||
// Read format blob, potentially from cache.
|
||||
ufb.fb, ufb.cacheMTime, err = readAndCacheRepositoryBlobBytes(ctx, st, cacheOpts.CacheDirectory, FormatBlobID, validDuration)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to read repository blobs")
|
||||
return nil, errors.Wrap(err, "unable to read format blob")
|
||||
}
|
||||
|
||||
if err = writeCacheMarker(caching.CacheDirectory); err != nil {
|
||||
if err = writeCacheMarker(cacheOpts.CacheDirectory); err != nil {
|
||||
return nil, errors.Wrap(err, "unable to write cache directory marker")
|
||||
}
|
||||
|
||||
f, err := parseFormatBlob(fb)
|
||||
ufb.f, err = parseFormatBlob(ufb.fb)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't parse format blob")
|
||||
}
|
||||
|
||||
fb, err = addFormatBlobChecksumAndLength(fb)
|
||||
ufb.fb, err = addFormatBlobChecksumAndLength(ufb.fb)
|
||||
if err != nil {
|
||||
return nil, errors.Errorf("unable to add checksum")
|
||||
}
|
||||
|
||||
formatEncryptionKey, err := f.deriveFormatEncryptionKeyFromPassword(password)
|
||||
ufb.formatEncryptionKey, err = ufb.f.deriveFormatEncryptionKeyFromPassword(password)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
repoConfig, err := f.decryptFormatBytes(formatEncryptionKey)
|
||||
ufb.repoConfig, err = ufb.f.decryptFormatBytes(ufb.formatEncryptionKey)
|
||||
if err != nil {
|
||||
return nil, ErrInvalidPassword
|
||||
}
|
||||
|
||||
blobcfg, err := deserializeBlobCfgBytes(f, rb, formatEncryptionKey)
|
||||
return ufb, nil
|
||||
}
|
||||
|
||||
// openWithConfig opens the repository with a given configuration, avoiding the need for a config file.
|
||||
// nolint:funlen,gocyclo
|
||||
func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, cacheOpts *content.CachingOptions, configFile string) (DirectRepository, error) {
|
||||
cacheOpts = cacheOpts.CloneOrDefault()
|
||||
cmOpts := &content.ManagerOptions{
|
||||
TimeNow: defaultTime(options.TimeNowFunc),
|
||||
DisableInternalLog: options.DisableInternalLog,
|
||||
}
|
||||
|
||||
var ufb *unpackedFormatBlob
|
||||
|
||||
if _, err := retry.WithExponentialBackoffMaxRetries(ctx, -1, "read repo config and wait for upgrade", func() (interface{}, error) {
|
||||
var internalErr error
|
||||
ufb, internalErr = readAndCacheRepoConfig(ctx, st, password, cacheOpts,
|
||||
lc.FormatBlobCacheDuration)
|
||||
if internalErr != nil {
|
||||
return nil, internalErr
|
||||
}
|
||||
|
||||
// retry if upgrade lock has been taken
|
||||
if locked, _ := ufb.repoConfig.UpgradeLock.IsLocked(cmOpts.TimeNow()); locked && options.UpgradeOwnerID != ufb.repoConfig.UpgradeLock.OwnerID {
|
||||
return nil, ErrRepositoryUnavailableDueToUpgrageInProgress
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}, func(internalErr error) bool {
|
||||
return errors.Is(internalErr, ErrRepositoryUnavailableDueToUpgrageInProgress)
|
||||
}); err != nil {
|
||||
// nolint:wrapcheck
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cmOpts.RepositoryFormatBytes = ufb.fb
|
||||
|
||||
// Read blobcfg blob, potentially from cache.
|
||||
bb, _, err := readAndCacheRepositoryBlobBytes(ctx, st, cacheOpts.CacheDirectory, BlobCfgBlobID, lc.FormatBlobCacheDuration)
|
||||
if err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
return nil, errors.Wrap(err, "unable to read blobcfg blob")
|
||||
}
|
||||
|
||||
blobcfg, err := deserializeBlobCfgBytes(ufb.f, bb, ufb.formatEncryptionKey)
|
||||
if err != nil {
|
||||
return nil, ErrInvalidPassword
|
||||
}
|
||||
|
||||
if repoConfig.FormattingOptions.EnablePasswordChange {
|
||||
caching.HMACSecret = deriveKeyFromMasterKey(repoConfig.HMACSecret, f.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength)
|
||||
if ufb.repoConfig.FormattingOptions.EnablePasswordChange {
|
||||
cacheOpts.HMACSecret = deriveKeyFromMasterKey(ufb.repoConfig.HMACSecret, ufb.f.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength)
|
||||
} else {
|
||||
// deriving from formatEncryptionKey was actually a bug, that only matters will change when we change the password
|
||||
caching.HMACSecret = deriveKeyFromMasterKey(formatEncryptionKey, f.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength)
|
||||
// deriving from ufb.formatEncryptionKey was actually a bug, that only matters will change when we change the password
|
||||
cacheOpts.HMACSecret = deriveKeyFromMasterKey(ufb.formatEncryptionKey, ufb.f.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength)
|
||||
}
|
||||
|
||||
fo := &repoConfig.FormattingOptions
|
||||
fo := &ufb.repoConfig.FormattingOptions
|
||||
|
||||
if fo.MaxPackSize == 0 {
|
||||
// legacy only, apply default
|
||||
fo.MaxPackSize = 20 << 20 // nolint:gomnd
|
||||
}
|
||||
|
||||
cmOpts := &content.ManagerOptions{
|
||||
RepositoryFormatBytes: fb,
|
||||
TimeNow: defaultTime(options.TimeNowFunc),
|
||||
DisableInternalLog: options.DisableInternalLog,
|
||||
}
|
||||
|
||||
// do not embed repository format info in pack blobs when password change is enabled.
|
||||
if fo.EnablePasswordChange {
|
||||
cmOpts.RepositoryFormatBytes = nil
|
||||
@@ -257,7 +303,11 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
|
||||
st = wrapLockingStorage(st, blobcfg)
|
||||
}
|
||||
|
||||
scm, err := content.NewSharedManager(ctx, st, fo, caching, cmOpts)
|
||||
// background/interleaving upgrade lock storage monitor
|
||||
st = upgradeLockMonitor(ctx, options.UpgradeOwnerID, st, password, cacheOpts, lc.FormatBlobCacheDuration,
|
||||
ufb.cacheMTime, cmOpts.TimeNow)
|
||||
|
||||
scm, err := content.NewSharedManager(ctx, st, fo, cacheOpts, cmOpts)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to create shared content manager")
|
||||
}
|
||||
@@ -267,7 +317,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
|
||||
SessionHost: lc.Hostname,
|
||||
}, "")
|
||||
|
||||
om, err := object.NewObjectManager(ctx, cm, repoConfig.Format)
|
||||
om, err := object.NewObjectManager(ctx, cm, ufb.repoConfig.Format)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to open object manager")
|
||||
}
|
||||
@@ -285,11 +335,11 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
|
||||
sm: scm,
|
||||
throttler: throttler,
|
||||
directRepositoryParameters: directRepositoryParameters{
|
||||
uniqueID: f.UniqueID,
|
||||
cachingOptions: *caching,
|
||||
formatBlob: f,
|
||||
uniqueID: ufb.f.UniqueID,
|
||||
cachingOptions: *cacheOpts,
|
||||
formatBlob: ufb.f,
|
||||
blobCfgBlob: blobcfg,
|
||||
formatEncryptionKey: formatEncryptionKey,
|
||||
formatEncryptionKey: ufb.formatEncryptionKey,
|
||||
timeNow: cmOpts.TimeNow,
|
||||
cliOpts: lc.ClientOptions.ApplyDefaults(ctx, "Repository in "+st.DisplayName()),
|
||||
configFile: configFile,
|
||||
@@ -333,6 +383,61 @@ func addThrottler(ctx context.Context, st blob.Storage) (blob.Storage, throttlin
|
||||
return throttling.NewWrapper(st, throttler), throttler, nil
|
||||
}
|
||||
|
||||
func upgradeLockMonitor(
|
||||
ctx context.Context,
|
||||
upgradeOwnerID string,
|
||||
st blob.Storage,
|
||||
password string,
|
||||
cacheOpts *content.CachingOptions,
|
||||
lockRefreshInterval time.Duration,
|
||||
lastSync time.Time,
|
||||
now func() time.Time,
|
||||
) blob.Storage {
|
||||
var (
|
||||
m sync.RWMutex
|
||||
nextSync = lastSync.Add(lockRefreshInterval)
|
||||
)
|
||||
|
||||
cb := func() error {
|
||||
// protected read for nextSync because it will be shared between
|
||||
// parallel storage operations
|
||||
m.RLock()
|
||||
if nextSync.After(now()) {
|
||||
m.RUnlock()
|
||||
return nil
|
||||
}
|
||||
m.RUnlock()
|
||||
|
||||
// upgrade the lock and verify again in-case someone else won the race to refresh
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
if nextSync.After(now()) {
|
||||
return nil
|
||||
}
|
||||
|
||||
ufb, err := readAndCacheRepoConfig(ctx, st, password, cacheOpts, lockRefreshInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// only allow the upgrade owner to perform storage operations
|
||||
if locked, _ := ufb.repoConfig.UpgradeLock.IsLocked(now()); locked && upgradeOwnerID != ufb.repoConfig.UpgradeLock.OwnerID {
|
||||
return ErrRepositoryUnavailableDueToUpgrageInProgress
|
||||
}
|
||||
|
||||
// prevent backward jumps on nextSync
|
||||
newNextSync := ufb.cacheMTime.Add(lockRefreshInterval)
|
||||
if newNextSync.After(nextSync) {
|
||||
nextSync = newNextSync
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return beforeop.NewUniformWrapper(st, cb)
|
||||
}
|
||||
|
||||
func throttlingLimitsFromConnectionInfo(ctx context.Context, ci blob.ConnectionInfo) throttling.Limits {
|
||||
v, err := json.Marshal(ci.Config)
|
||||
if err != nil {
|
||||
@@ -387,25 +492,31 @@ func formatBytesCachingEnabled(cacheDirectory string, validDuration time.Duratio
|
||||
return validDuration > 0
|
||||
}
|
||||
|
||||
func readRepositoryBlobBytesFromCache(ctx context.Context, cachedFile string, validDuration time.Duration) ([]byte, error) {
|
||||
func readRepositoryBlobBytesFromCache(ctx context.Context, cachedFile string, validDuration time.Duration) (data []byte, cacheMTime time.Time, err error) {
|
||||
cst, err := os.Stat(cachedFile)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "unable to open cache file")
|
||||
return nil, time.Time{}, errors.Wrap(err, "unable to open cache file")
|
||||
}
|
||||
|
||||
if clock.Now().Sub(cst.ModTime()) > validDuration {
|
||||
cacheMTime = cst.ModTime()
|
||||
if clock.Now().Sub(cacheMTime) > validDuration {
|
||||
// got cached file, but it's too old, remove it
|
||||
if err := os.Remove(cachedFile); err != nil {
|
||||
if err = os.Remove(cachedFile); err != nil {
|
||||
log(ctx).Debugf("unable to remove cache file: %v", err)
|
||||
}
|
||||
|
||||
return nil, errors.Errorf("cached file too old")
|
||||
return nil, time.Time{}, errors.Errorf("cached file too old")
|
||||
}
|
||||
|
||||
return os.ReadFile(cachedFile) //nolint:wrapcheck,gosec
|
||||
data, err = os.ReadFile(cachedFile) // nolint:gosec
|
||||
if err != nil {
|
||||
return nil, time.Time{}, errors.Wrapf(err, "failed to read the cache file %q", cachedFile)
|
||||
}
|
||||
|
||||
return data, cacheMTime, nil
|
||||
}
|
||||
|
||||
func readAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cacheDirectory, blobID string, validDuration time.Duration) ([]byte, error) {
|
||||
func readAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cacheDirectory, blobID string, validDuration time.Duration) ([]byte, time.Time, error) {
|
||||
cachedFile := filepath.Join(cacheDirectory, blobID)
|
||||
|
||||
if validDuration == 0 {
|
||||
@@ -420,11 +531,11 @@ func readAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cache
|
||||
|
||||
cacheEnabled := formatBytesCachingEnabled(cacheDirectory, validDuration)
|
||||
if cacheEnabled {
|
||||
b, err := readRepositoryBlobBytesFromCache(ctx, cachedFile, validDuration)
|
||||
data, cacheMTime, err := readRepositoryBlobBytesFromCache(ctx, cachedFile, validDuration)
|
||||
if err == nil {
|
||||
log(ctx).Debugf("%s retrieved from cache", blobID)
|
||||
|
||||
return b, nil
|
||||
return data, cacheMTime, nil
|
||||
}
|
||||
|
||||
log(ctx).Debugf("%s could not be fetched from cache: %v", blobID, err)
|
||||
@@ -436,7 +547,7 @@ func readAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cache
|
||||
defer b.Close()
|
||||
|
||||
if err := st.GetBlob(ctx, blob.ID(blobID), 0, -1, &b); err != nil {
|
||||
return nil, errors.Wrapf(err, "error getting %s blob", blobID)
|
||||
return nil, time.Time{}, errors.Wrapf(err, "error getting %s blob", blobID)
|
||||
}
|
||||
|
||||
if cacheEnabled {
|
||||
@@ -445,21 +556,5 @@ func readAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cache
|
||||
}
|
||||
}
|
||||
|
||||
return b.ToByteSlice(), nil
|
||||
}
|
||||
|
||||
func readAndCacheRepositoryBlobs(ctx context.Context, st blob.Storage, cacheDirectory string, validDuration time.Duration) (format, blobcfg []byte, err error) {
|
||||
// Read format blob, potentially from cache.
|
||||
fb, err := readAndCacheRepositoryBlobBytes(ctx, st, cacheDirectory, FormatBlobID, validDuration)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "unable to read format blob")
|
||||
}
|
||||
|
||||
// Read blobcfg blob, potentially from cache.
|
||||
rb, err := readAndCacheRepositoryBlobBytes(ctx, st, cacheDirectory, BlobCfgBlobID, validDuration)
|
||||
if err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
|
||||
return nil, nil, errors.Wrap(err, "unable to read blobcfg blob")
|
||||
}
|
||||
|
||||
return fb, rb, nil
|
||||
return b.ToByteSlice(), clock.Now(), nil
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
@@ -25,12 +26,15 @@
|
||||
)
|
||||
|
||||
func TestFormatUpgradeSetLock(t *testing.T) {
|
||||
ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1)
|
||||
ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1, repotesting.Options{OpenOptions: func(opts *repo.Options) {
|
||||
// nolint:goconst
|
||||
opts.UpgradeOwnerID = "upgrade-owner"
|
||||
}})
|
||||
formatBlockCacheDuration := env.Repository.ClientOptions().FormatBlobCacheDuration
|
||||
|
||||
l := &content.UpgradeLock{
|
||||
CreationTime: env.Repository.Time(),
|
||||
AdvanceNoticeDuration: 0,
|
||||
AdvanceNoticeDuration: 15 * time.Hour,
|
||||
IODrainTimeout: formatBlockCacheDuration * 2,
|
||||
StatusPollInterval: formatBlockCacheDuration,
|
||||
Message: "upgrading from format version 2 -> 3",
|
||||
@@ -84,7 +88,9 @@ func TestFormatUpgradeAlreadyUpgraded(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFormatUpgradeCommit(t *testing.T) {
|
||||
ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1)
|
||||
ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1, repotesting.Options{OpenOptions: func(opts *repo.Options) {
|
||||
opts.UpgradeOwnerID = "upgrade-owner"
|
||||
}})
|
||||
formatBlockCacheDuration := env.Repository.ClientOptions().FormatBlobCacheDuration
|
||||
|
||||
l := &content.UpgradeLock{
|
||||
@@ -109,7 +115,9 @@ func TestFormatUpgradeCommit(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFormatUpgradeRollback(t *testing.T) {
|
||||
ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1)
|
||||
ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1, repotesting.Options{OpenOptions: func(opts *repo.Options) {
|
||||
opts.UpgradeOwnerID = "upgrade-owner"
|
||||
}})
|
||||
formatBlockCacheDuration := env.Repository.ClientOptions().FormatBlobCacheDuration
|
||||
|
||||
l := &content.UpgradeLock{
|
||||
@@ -134,8 +142,10 @@ func TestFormatUpgradeRollback(t *testing.T) {
|
||||
require.EqualError(t, env.RepositoryWriter.CommitUpgrade(ctx), "no upgrade in progress")
|
||||
}
|
||||
|
||||
func TestFormatMultipleLocksRollback(t *testing.T) {
|
||||
ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1)
|
||||
func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
|
||||
ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1, repotesting.Options{OpenOptions: func(opts *repo.Options) {
|
||||
opts.UpgradeOwnerID = "upgrade-owner"
|
||||
}})
|
||||
formatBlockCacheDuration := env.Repository.ClientOptions().FormatBlobCacheDuration
|
||||
|
||||
l := &content.UpgradeLock{
|
||||
@@ -148,7 +158,9 @@ func TestFormatMultipleLocksRollback(t *testing.T) {
|
||||
MaxPermittedClockDrift: formatBlockCacheDuration / 3,
|
||||
}
|
||||
|
||||
secondWriter := env.MustOpenAnother(t)
|
||||
secondWriter := env.MustOpenAnother(t, func(opts *repo.Options) {
|
||||
opts.UpgradeOwnerID = "upgrade-owner"
|
||||
})
|
||||
|
||||
// first lock by primary creator
|
||||
_, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l)
|
||||
@@ -174,7 +186,9 @@ func TestFormatMultipleLocksRollback(t *testing.T) {
|
||||
}
|
||||
|
||||
// verify that we have upgraded our format version
|
||||
env.MustReopen(t)
|
||||
env.MustReopen(t, func(opts *repo.Options) {
|
||||
opts.UpgradeOwnerID = "another-upgrade-owner"
|
||||
})
|
||||
require.Equal(t, content.FormatVersion2,
|
||||
env.RepositoryWriter.ContentManager().ContentFormat().MutableParameters.Version)
|
||||
|
||||
@@ -264,7 +278,7 @@ func(id blob.ID, _ *blob.PutOptions) error {
|
||||
|
||||
require.NoError(t, repo.Connect(testlogging.Context(t), configFile, st, "password", &connectOpts))
|
||||
|
||||
r, err := repo.Open(testlogging.Context(t), configFile, "password", nil)
|
||||
r, err := repo.Open(testlogging.Context(t), configFile, "password", &repo.Options{UpgradeOwnerID: "allowed-upgrade-owner"})
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = r.(repo.DirectRepositoryWriter).SetUpgradeLockIntent(testlogging.Context(t), *faultyLock)
|
||||
@@ -293,3 +307,85 @@ func(id blob.ID, _ *blob.PutOptions) error {
|
||||
|
||||
require.NoError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t)))
|
||||
}
|
||||
|
||||
func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) {
|
||||
curTime := clock.Now()
|
||||
ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1, repotesting.Options{
|
||||
// new environment with controlled time
|
||||
OpenOptions: func(opts *repo.Options) {
|
||||
opts.TimeNowFunc = func() time.Time {
|
||||
return curTime
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
rep := env.Repository // read-only
|
||||
|
||||
lw := rep.(repo.RepositoryWriter)
|
||||
|
||||
// w1, w2, w3 are indepdendent sessions.
|
||||
_, w1, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer1"})
|
||||
require.NoError(t, err)
|
||||
|
||||
defer w1.Close(ctx)
|
||||
|
||||
_, w2, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer2"})
|
||||
require.NoError(t, err)
|
||||
|
||||
defer w2.Close(ctx)
|
||||
|
||||
_, w3, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer3"})
|
||||
require.NoError(t, err)
|
||||
|
||||
defer w3.Close(ctx)
|
||||
|
||||
o1Data := []byte{1, 2, 3}
|
||||
o2Data := []byte{2, 3, 4}
|
||||
o3Data := []byte{3, 4, 5}
|
||||
o4Data := []byte{4, 5, 6}
|
||||
|
||||
writeObject(ctx, t, w1, o1Data, "o1")
|
||||
writeObject(ctx, t, w2, o2Data, "o2")
|
||||
writeObject(ctx, t, w3, o3Data, "o3")
|
||||
writeObject(ctx, t, lw, o4Data, "o4")
|
||||
|
||||
formatBlockCacheDuration := env.Repository.ClientOptions().FormatBlobCacheDuration
|
||||
l := content.UpgradeLock{
|
||||
OwnerID: "upgrade-owner",
|
||||
CreationTime: env.Repository.Time(),
|
||||
AdvanceNoticeDuration: 0,
|
||||
IODrainTimeout: formatBlockCacheDuration * 2,
|
||||
StatusPollInterval: formatBlockCacheDuration,
|
||||
Message: "upgrading from format version 2 -> 3",
|
||||
MaxPermittedClockDrift: formatBlockCacheDuration / 3,
|
||||
}
|
||||
|
||||
_, err = env.RepositoryWriter.SetUpgradeLockIntent(ctx, l)
|
||||
require.NoError(t, err)
|
||||
|
||||
// ongoing writes should NOT get interrupted because the upgrade lock
|
||||
// monitor could not have noticed the lock yet
|
||||
require.NoError(t, w1.Flush(ctx))
|
||||
require.NoError(t, w2.Flush(ctx))
|
||||
require.NoError(t, w3.Flush(ctx))
|
||||
require.NoError(t, lw.Flush(ctx))
|
||||
|
||||
o5Data := []byte{7, 8, 9}
|
||||
o6Data := []byte{10, 11, 12}
|
||||
o7Data := []byte{13, 14, 15}
|
||||
o8Data := []byte{16, 17, 18}
|
||||
|
||||
writeObject(ctx, t, w1, o5Data, "o5")
|
||||
writeObject(ctx, t, w2, o6Data, "o6")
|
||||
writeObject(ctx, t, w3, o7Data, "o7")
|
||||
writeObject(ctx, t, lw, o8Data, "o8")
|
||||
|
||||
// move time forward by the lock refresh interval
|
||||
curTime = curTime.Add(formatBlockCacheDuration + time.Second)
|
||||
|
||||
// ongoing writes should get interrupted this time
|
||||
require.ErrorIs(t, w1.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
|
||||
require.ErrorIs(t, w2.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
|
||||
require.ErrorIs(t, w3.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
|
||||
require.ErrorIs(t, lw.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user