diff --git a/internal/repotesting/repotesting.go b/internal/repotesting/repotesting.go index 756bf9329..0bfd69211 100644 --- a/internal/repotesting/repotesting.go +++ b/internal/repotesting/repotesting.go @@ -168,12 +168,12 @@ func (e *Environment) MustReopen(tb testing.TB, openOpts ...func(*repo.Options)) } // MustOpenAnother opens another repository backend by the same storage. -func (e *Environment) MustOpenAnother(tb testing.TB) repo.RepositoryWriter { +func (e *Environment) MustOpenAnother(tb testing.TB, openOpts ...func(*repo.Options)) repo.RepositoryWriter { tb.Helper() ctx := testlogging.Context(tb) - rep2, err := repo.Open(ctx, e.ConfigFile(), e.Password, &repo.Options{}) + rep2, err := repo.Open(ctx, e.ConfigFile(), e.Password, repoOptions(openOpts)) if err != nil { tb.Fatalf("err: %v", err) } diff --git a/internal/retry/retry.go b/internal/retry/retry.go index 20fc0860b..f7eb5d84e 100644 --- a/internal/retry/retry.go +++ b/internal/retry/retry.go @@ -34,6 +34,13 @@ func WithExponentialBackoff(ctx context.Context, desc string, attempt AttemptFun return internalRetry(ctx, desc, attempt, isRetriableError, retryInitialSleepAmount, retryMaxSleepAmount, maxAttempts, retryExponent) } +// WithExponentialBackoffMaxRetries is the same as WithExponentialBackoff, +// additionally it allows customizing the max number of retries before giving +// up (count parameter). A negative value for count would run this forever. +func WithExponentialBackoffMaxRetries(ctx context.Context, count int, desc string, attempt AttemptFunc, isRetriableError IsRetriableFunc) (interface{}, error) { + return internalRetry(ctx, desc, attempt, isRetriableError, retryInitialSleepAmount, retryMaxSleepAmount, count, retryExponent) +} + // Periodically runs the provided attempt until it succeeds, waiting given fixed amount between attempts. func Periodically(ctx context.Context, interval time.Duration, count int, desc string, attempt AttemptFunc, isRetriableError IsRetriableFunc) (interface{}, error) { return internalRetry(ctx, desc, attempt, isRetriableError, interval, interval, count, 1) @@ -54,9 +61,12 @@ func PeriodicallyNoValue(ctx context.Context, interval time.Duration, count int, func internalRetry(ctx context.Context, desc string, attempt AttemptFunc, isRetriableError IsRetriableFunc, initial, max time.Duration, count int, factor float64) (interface{}, error) { sleepAmount := initial - var lastError error + var ( + lastError error + i = 0 + ) - for i := 0; i < count; i++ { + for ; i < count || count < 0; i++ { if cerr := ctx.Err(); cerr != nil { // nolint:wrapcheck return nil, cerr @@ -82,7 +92,7 @@ func internalRetry(ctx context.Context, desc string, attempt AttemptFunc, isRetr } } - return nil, errors.Wrapf(lastError, "unable to complete %v despite %v retries", desc, count) + return nil, errors.Wrapf(lastError, "unable to complete %v despite %v retries", desc, i) } // WithExponentialBackoffNoValue is a shorthand for WithExponentialBackoff except the diff --git a/repo/blob/beforeop/beforeop.go b/repo/blob/beforeop/beforeop.go index 4277b79fd..f6e58f1eb 100644 --- a/repo/blob/beforeop/beforeop.go +++ b/repo/blob/beforeop/beforeop.go @@ -71,3 +71,15 @@ func NewWrapper(wrapped blob.Storage, onGetBlob onGetBlobCallback, onGetMetadata onPutBlob: onPutBlob, } } + +// NewUniformWrapper is same as NewWrapper except that it only accepts a single +// uniform callback for all the operations for simpler use-cases. +func NewUniformWrapper(wrapped blob.Storage, cb callback) blob.Storage { + return &beforeOp{ + Storage: wrapped, + onGetBlob: func(_ blob.ID) error { return cb() }, + onGetMetadata: cb, + onDeleteBlob: cb, + onPutBlob: func(_ blob.ID, _ *blob.PutOptions) error { return cb() }, + } +} diff --git a/repo/open.go b/repo/open.go index 7e59ea92c..838f24559 100644 --- a/repo/open.go +++ b/repo/open.go @@ -6,6 +6,7 @@ "os" "path/filepath" "strings" + "sync" "time" "github.com/pkg/errors" @@ -16,6 +17,7 @@ "github.com/kopia/kopia/internal/clock" "github.com/kopia/kopia/internal/epoch" "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/retry" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/beforeop" loggingwrapper "github.com/kopia/kopia/repo/blob/logging" @@ -69,6 +71,7 @@ type Options struct { TraceStorage bool // Logs all storage access using provided Printf-style function TimeNowFunc func() time.Time // Time provider DisableInternalLog bool // Disable internal log + UpgradeOwnerID string // Owner-ID of any upgrade in progress, when this is not set the access may be restricted } // ErrInvalidPassword is returned when repository password is invalid. @@ -183,66 +186,109 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor return r, nil } -// openWithConfig opens the repository with a given configuration, avoiding the need for a config file. -// nolint:funlen,gocyclo -func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, caching *content.CachingOptions, configFile string) (DirectRepository, error) { - caching = caching.CloneOrDefault() +type unpackedFormatBlob struct { + f *formatBlob + fb []byte // serialized format blob + cacheMTime time.Time // mod time of the format blob cache file + repoConfig *repositoryObjectFormat // unencrypted format blob structure + formatEncryptionKey []byte // key derived from the password +} - // Read repo blobs, potentially from cache. - fb, rb, err := readAndCacheRepositoryBlobs(ctx, st, caching.CacheDirectory, lc.FormatBlobCacheDuration) +func readAndCacheRepoConfig(ctx context.Context, st blob.Storage, password string, cacheOpts *content.CachingOptions, validDuration time.Duration) (ufb *unpackedFormatBlob, err error) { + ufb = &unpackedFormatBlob{} + + // Read format blob, potentially from cache. + ufb.fb, ufb.cacheMTime, err = readAndCacheRepositoryBlobBytes(ctx, st, cacheOpts.CacheDirectory, FormatBlobID, validDuration) if err != nil { - return nil, errors.Wrap(err, "unable to read repository blobs") + return nil, errors.Wrap(err, "unable to read format blob") } - if err = writeCacheMarker(caching.CacheDirectory); err != nil { + if err = writeCacheMarker(cacheOpts.CacheDirectory); err != nil { return nil, errors.Wrap(err, "unable to write cache directory marker") } - f, err := parseFormatBlob(fb) + ufb.f, err = parseFormatBlob(ufb.fb) if err != nil { return nil, errors.Wrap(err, "can't parse format blob") } - fb, err = addFormatBlobChecksumAndLength(fb) + ufb.fb, err = addFormatBlobChecksumAndLength(ufb.fb) if err != nil { return nil, errors.Errorf("unable to add checksum") } - formatEncryptionKey, err := f.deriveFormatEncryptionKeyFromPassword(password) + ufb.formatEncryptionKey, err = ufb.f.deriveFormatEncryptionKeyFromPassword(password) if err != nil { return nil, err } - repoConfig, err := f.decryptFormatBytes(formatEncryptionKey) + ufb.repoConfig, err = ufb.f.decryptFormatBytes(ufb.formatEncryptionKey) if err != nil { return nil, ErrInvalidPassword } - blobcfg, err := deserializeBlobCfgBytes(f, rb, formatEncryptionKey) + return ufb, nil +} + +// openWithConfig opens the repository with a given configuration, avoiding the need for a config file. +// nolint:funlen,gocyclo +func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, cacheOpts *content.CachingOptions, configFile string) (DirectRepository, error) { + cacheOpts = cacheOpts.CloneOrDefault() + cmOpts := &content.ManagerOptions{ + TimeNow: defaultTime(options.TimeNowFunc), + DisableInternalLog: options.DisableInternalLog, + } + + var ufb *unpackedFormatBlob + + if _, err := retry.WithExponentialBackoffMaxRetries(ctx, -1, "read repo config and wait for upgrade", func() (interface{}, error) { + var internalErr error + ufb, internalErr = readAndCacheRepoConfig(ctx, st, password, cacheOpts, + lc.FormatBlobCacheDuration) + if internalErr != nil { + return nil, internalErr + } + + // retry if upgrade lock has been taken + if locked, _ := ufb.repoConfig.UpgradeLock.IsLocked(cmOpts.TimeNow()); locked && options.UpgradeOwnerID != ufb.repoConfig.UpgradeLock.OwnerID { + return nil, ErrRepositoryUnavailableDueToUpgrageInProgress + } + + return nil, nil + }, func(internalErr error) bool { + return errors.Is(internalErr, ErrRepositoryUnavailableDueToUpgrageInProgress) + }); err != nil { + // nolint:wrapcheck + return nil, err + } + + cmOpts.RepositoryFormatBytes = ufb.fb + + // Read blobcfg blob, potentially from cache. + bb, _, err := readAndCacheRepositoryBlobBytes(ctx, st, cacheOpts.CacheDirectory, BlobCfgBlobID, lc.FormatBlobCacheDuration) + if err != nil && !errors.Is(err, blob.ErrBlobNotFound) { + return nil, errors.Wrap(err, "unable to read blobcfg blob") + } + + blobcfg, err := deserializeBlobCfgBytes(ufb.f, bb, ufb.formatEncryptionKey) if err != nil { return nil, ErrInvalidPassword } - if repoConfig.FormattingOptions.EnablePasswordChange { - caching.HMACSecret = deriveKeyFromMasterKey(repoConfig.HMACSecret, f.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength) + if ufb.repoConfig.FormattingOptions.EnablePasswordChange { + cacheOpts.HMACSecret = deriveKeyFromMasterKey(ufb.repoConfig.HMACSecret, ufb.f.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength) } else { - // deriving from formatEncryptionKey was actually a bug, that only matters will change when we change the password - caching.HMACSecret = deriveKeyFromMasterKey(formatEncryptionKey, f.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength) + // deriving from ufb.formatEncryptionKey was actually a bug, that only matters will change when we change the password + cacheOpts.HMACSecret = deriveKeyFromMasterKey(ufb.formatEncryptionKey, ufb.f.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength) } - fo := &repoConfig.FormattingOptions + fo := &ufb.repoConfig.FormattingOptions if fo.MaxPackSize == 0 { // legacy only, apply default fo.MaxPackSize = 20 << 20 // nolint:gomnd } - cmOpts := &content.ManagerOptions{ - RepositoryFormatBytes: fb, - TimeNow: defaultTime(options.TimeNowFunc), - DisableInternalLog: options.DisableInternalLog, - } - // do not embed repository format info in pack blobs when password change is enabled. if fo.EnablePasswordChange { cmOpts.RepositoryFormatBytes = nil @@ -257,7 +303,11 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw st = wrapLockingStorage(st, blobcfg) } - scm, err := content.NewSharedManager(ctx, st, fo, caching, cmOpts) + // background/interleaving upgrade lock storage monitor + st = upgradeLockMonitor(ctx, options.UpgradeOwnerID, st, password, cacheOpts, lc.FormatBlobCacheDuration, + ufb.cacheMTime, cmOpts.TimeNow) + + scm, err := content.NewSharedManager(ctx, st, fo, cacheOpts, cmOpts) if err != nil { return nil, errors.Wrap(err, "unable to create shared content manager") } @@ -267,7 +317,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw SessionHost: lc.Hostname, }, "") - om, err := object.NewObjectManager(ctx, cm, repoConfig.Format) + om, err := object.NewObjectManager(ctx, cm, ufb.repoConfig.Format) if err != nil { return nil, errors.Wrap(err, "unable to open object manager") } @@ -285,11 +335,11 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw sm: scm, throttler: throttler, directRepositoryParameters: directRepositoryParameters{ - uniqueID: f.UniqueID, - cachingOptions: *caching, - formatBlob: f, + uniqueID: ufb.f.UniqueID, + cachingOptions: *cacheOpts, + formatBlob: ufb.f, blobCfgBlob: blobcfg, - formatEncryptionKey: formatEncryptionKey, + formatEncryptionKey: ufb.formatEncryptionKey, timeNow: cmOpts.TimeNow, cliOpts: lc.ClientOptions.ApplyDefaults(ctx, "Repository in "+st.DisplayName()), configFile: configFile, @@ -333,6 +383,61 @@ func addThrottler(ctx context.Context, st blob.Storage) (blob.Storage, throttlin return throttling.NewWrapper(st, throttler), throttler, nil } +func upgradeLockMonitor( + ctx context.Context, + upgradeOwnerID string, + st blob.Storage, + password string, + cacheOpts *content.CachingOptions, + lockRefreshInterval time.Duration, + lastSync time.Time, + now func() time.Time, +) blob.Storage { + var ( + m sync.RWMutex + nextSync = lastSync.Add(lockRefreshInterval) + ) + + cb := func() error { + // protected read for nextSync because it will be shared between + // parallel storage operations + m.RLock() + if nextSync.After(now()) { + m.RUnlock() + return nil + } + m.RUnlock() + + // upgrade the lock and verify again in-case someone else won the race to refresh + m.Lock() + defer m.Unlock() + + if nextSync.After(now()) { + return nil + } + + ufb, err := readAndCacheRepoConfig(ctx, st, password, cacheOpts, lockRefreshInterval) + if err != nil { + return err + } + + // only allow the upgrade owner to perform storage operations + if locked, _ := ufb.repoConfig.UpgradeLock.IsLocked(now()); locked && upgradeOwnerID != ufb.repoConfig.UpgradeLock.OwnerID { + return ErrRepositoryUnavailableDueToUpgrageInProgress + } + + // prevent backward jumps on nextSync + newNextSync := ufb.cacheMTime.Add(lockRefreshInterval) + if newNextSync.After(nextSync) { + nextSync = newNextSync + } + + return nil + } + + return beforeop.NewUniformWrapper(st, cb) +} + func throttlingLimitsFromConnectionInfo(ctx context.Context, ci blob.ConnectionInfo) throttling.Limits { v, err := json.Marshal(ci.Config) if err != nil { @@ -387,25 +492,31 @@ func formatBytesCachingEnabled(cacheDirectory string, validDuration time.Duratio return validDuration > 0 } -func readRepositoryBlobBytesFromCache(ctx context.Context, cachedFile string, validDuration time.Duration) ([]byte, error) { +func readRepositoryBlobBytesFromCache(ctx context.Context, cachedFile string, validDuration time.Duration) (data []byte, cacheMTime time.Time, err error) { cst, err := os.Stat(cachedFile) if err != nil { - return nil, errors.Wrap(err, "unable to open cache file") + return nil, time.Time{}, errors.Wrap(err, "unable to open cache file") } - if clock.Now().Sub(cst.ModTime()) > validDuration { + cacheMTime = cst.ModTime() + if clock.Now().Sub(cacheMTime) > validDuration { // got cached file, but it's too old, remove it - if err := os.Remove(cachedFile); err != nil { + if err = os.Remove(cachedFile); err != nil { log(ctx).Debugf("unable to remove cache file: %v", err) } - return nil, errors.Errorf("cached file too old") + return nil, time.Time{}, errors.Errorf("cached file too old") } - return os.ReadFile(cachedFile) //nolint:wrapcheck,gosec + data, err = os.ReadFile(cachedFile) // nolint:gosec + if err != nil { + return nil, time.Time{}, errors.Wrapf(err, "failed to read the cache file %q", cachedFile) + } + + return data, cacheMTime, nil } -func readAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cacheDirectory, blobID string, validDuration time.Duration) ([]byte, error) { +func readAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cacheDirectory, blobID string, validDuration time.Duration) ([]byte, time.Time, error) { cachedFile := filepath.Join(cacheDirectory, blobID) if validDuration == 0 { @@ -420,11 +531,11 @@ func readAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cache cacheEnabled := formatBytesCachingEnabled(cacheDirectory, validDuration) if cacheEnabled { - b, err := readRepositoryBlobBytesFromCache(ctx, cachedFile, validDuration) + data, cacheMTime, err := readRepositoryBlobBytesFromCache(ctx, cachedFile, validDuration) if err == nil { log(ctx).Debugf("%s retrieved from cache", blobID) - return b, nil + return data, cacheMTime, nil } log(ctx).Debugf("%s could not be fetched from cache: %v", blobID, err) @@ -436,7 +547,7 @@ func readAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cache defer b.Close() if err := st.GetBlob(ctx, blob.ID(blobID), 0, -1, &b); err != nil { - return nil, errors.Wrapf(err, "error getting %s blob", blobID) + return nil, time.Time{}, errors.Wrapf(err, "error getting %s blob", blobID) } if cacheEnabled { @@ -445,21 +556,5 @@ func readAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cache } } - return b.ToByteSlice(), nil -} - -func readAndCacheRepositoryBlobs(ctx context.Context, st blob.Storage, cacheDirectory string, validDuration time.Duration) (format, blobcfg []byte, err error) { - // Read format blob, potentially from cache. - fb, err := readAndCacheRepositoryBlobBytes(ctx, st, cacheDirectory, FormatBlobID, validDuration) - if err != nil { - return nil, nil, errors.Wrap(err, "unable to read format blob") - } - - // Read blobcfg blob, potentially from cache. - rb, err := readAndCacheRepositoryBlobBytes(ctx, st, cacheDirectory, BlobCfgBlobID, validDuration) - if err != nil && !errors.Is(err, blob.ErrBlobNotFound) { - return nil, nil, errors.Wrap(err, "unable to read blobcfg blob") - } - - return fb, rb, nil + return b.ToByteSlice(), clock.Now(), nil } diff --git a/repo/upgrade_lock_test.go b/repo/upgrade_lock_test.go index 3fe1f5169..de4f5183f 100644 --- a/repo/upgrade_lock_test.go +++ b/repo/upgrade_lock_test.go @@ -7,6 +7,7 @@ "sort" "strings" "testing" + "time" "github.com/pkg/errors" "github.com/stretchr/testify/require" @@ -25,12 +26,15 @@ ) func TestFormatUpgradeSetLock(t *testing.T) { - ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1) + ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1, repotesting.Options{OpenOptions: func(opts *repo.Options) { + // nolint:goconst + opts.UpgradeOwnerID = "upgrade-owner" + }}) formatBlockCacheDuration := env.Repository.ClientOptions().FormatBlobCacheDuration l := &content.UpgradeLock{ CreationTime: env.Repository.Time(), - AdvanceNoticeDuration: 0, + AdvanceNoticeDuration: 15 * time.Hour, IODrainTimeout: formatBlockCacheDuration * 2, StatusPollInterval: formatBlockCacheDuration, Message: "upgrading from format version 2 -> 3", @@ -84,7 +88,9 @@ func TestFormatUpgradeAlreadyUpgraded(t *testing.T) { } func TestFormatUpgradeCommit(t *testing.T) { - ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1) + ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1, repotesting.Options{OpenOptions: func(opts *repo.Options) { + opts.UpgradeOwnerID = "upgrade-owner" + }}) formatBlockCacheDuration := env.Repository.ClientOptions().FormatBlobCacheDuration l := &content.UpgradeLock{ @@ -109,7 +115,9 @@ func TestFormatUpgradeCommit(t *testing.T) { } func TestFormatUpgradeRollback(t *testing.T) { - ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1) + ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1, repotesting.Options{OpenOptions: func(opts *repo.Options) { + opts.UpgradeOwnerID = "upgrade-owner" + }}) formatBlockCacheDuration := env.Repository.ClientOptions().FormatBlobCacheDuration l := &content.UpgradeLock{ @@ -134,8 +142,10 @@ func TestFormatUpgradeRollback(t *testing.T) { require.EqualError(t, env.RepositoryWriter.CommitUpgrade(ctx), "no upgrade in progress") } -func TestFormatMultipleLocksRollback(t *testing.T) { - ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1) +func TestFormatUpgradeMultipleLocksRollback(t *testing.T) { + ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1, repotesting.Options{OpenOptions: func(opts *repo.Options) { + opts.UpgradeOwnerID = "upgrade-owner" + }}) formatBlockCacheDuration := env.Repository.ClientOptions().FormatBlobCacheDuration l := &content.UpgradeLock{ @@ -148,7 +158,9 @@ func TestFormatMultipleLocksRollback(t *testing.T) { MaxPermittedClockDrift: formatBlockCacheDuration / 3, } - secondWriter := env.MustOpenAnother(t) + secondWriter := env.MustOpenAnother(t, func(opts *repo.Options) { + opts.UpgradeOwnerID = "upgrade-owner" + }) // first lock by primary creator _, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l) @@ -174,7 +186,9 @@ func TestFormatMultipleLocksRollback(t *testing.T) { } // verify that we have upgraded our format version - env.MustReopen(t) + env.MustReopen(t, func(opts *repo.Options) { + opts.UpgradeOwnerID = "another-upgrade-owner" + }) require.Equal(t, content.FormatVersion2, env.RepositoryWriter.ContentManager().ContentFormat().MutableParameters.Version) @@ -264,7 +278,7 @@ func(id blob.ID, _ *blob.PutOptions) error { require.NoError(t, repo.Connect(testlogging.Context(t), configFile, st, "password", &connectOpts)) - r, err := repo.Open(testlogging.Context(t), configFile, "password", nil) + r, err := repo.Open(testlogging.Context(t), configFile, "password", &repo.Options{UpgradeOwnerID: "allowed-upgrade-owner"}) require.NoError(t, err) _, err = r.(repo.DirectRepositoryWriter).SetUpgradeLockIntent(testlogging.Context(t), *faultyLock) @@ -293,3 +307,85 @@ func(id blob.ID, _ *blob.PutOptions) error { require.NoError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t))) } + +func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) { + curTime := clock.Now() + ctx, env := repotesting.NewEnvironment(t, content.FormatVersion1, repotesting.Options{ + // new environment with controlled time + OpenOptions: func(opts *repo.Options) { + opts.TimeNowFunc = func() time.Time { + return curTime + } + }, + }) + + rep := env.Repository // read-only + + lw := rep.(repo.RepositoryWriter) + + // w1, w2, w3 are indepdendent sessions. + _, w1, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer1"}) + require.NoError(t, err) + + defer w1.Close(ctx) + + _, w2, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer2"}) + require.NoError(t, err) + + defer w2.Close(ctx) + + _, w3, err := rep.NewWriter(ctx, repo.WriteSessionOptions{Purpose: "writer3"}) + require.NoError(t, err) + + defer w3.Close(ctx) + + o1Data := []byte{1, 2, 3} + o2Data := []byte{2, 3, 4} + o3Data := []byte{3, 4, 5} + o4Data := []byte{4, 5, 6} + + writeObject(ctx, t, w1, o1Data, "o1") + writeObject(ctx, t, w2, o2Data, "o2") + writeObject(ctx, t, w3, o3Data, "o3") + writeObject(ctx, t, lw, o4Data, "o4") + + formatBlockCacheDuration := env.Repository.ClientOptions().FormatBlobCacheDuration + l := content.UpgradeLock{ + OwnerID: "upgrade-owner", + CreationTime: env.Repository.Time(), + AdvanceNoticeDuration: 0, + IODrainTimeout: formatBlockCacheDuration * 2, + StatusPollInterval: formatBlockCacheDuration, + Message: "upgrading from format version 2 -> 3", + MaxPermittedClockDrift: formatBlockCacheDuration / 3, + } + + _, err = env.RepositoryWriter.SetUpgradeLockIntent(ctx, l) + require.NoError(t, err) + + // ongoing writes should NOT get interrupted because the upgrade lock + // monitor could not have noticed the lock yet + require.NoError(t, w1.Flush(ctx)) + require.NoError(t, w2.Flush(ctx)) + require.NoError(t, w3.Flush(ctx)) + require.NoError(t, lw.Flush(ctx)) + + o5Data := []byte{7, 8, 9} + o6Data := []byte{10, 11, 12} + o7Data := []byte{13, 14, 15} + o8Data := []byte{16, 17, 18} + + writeObject(ctx, t, w1, o5Data, "o5") + writeObject(ctx, t, w2, o6Data, "o6") + writeObject(ctx, t, w3, o7Data, "o7") + writeObject(ctx, t, lw, o8Data, "o8") + + // move time forward by the lock refresh interval + curTime = curTime.Add(formatBlockCacheDuration + time.Second) + + // ongoing writes should get interrupted this time + require.ErrorIs(t, w1.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) + require.ErrorIs(t, w2.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) + require.ErrorIs(t, w3.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) + require.ErrorIs(t, lw.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) +}