From c65613ccba2b2d74e95d890a914434ebb78489c0 Mon Sep 17 00:00:00 2001 From: Aaron Alpar <55999015+aaron-kasten@users.noreply.github.com> Date: Mon, 27 Mar 2023 16:08:11 -0700 Subject: [PATCH] feat(cli): ability to read repository status ... (#2799) * fix spelling * permissive index read * fixup v1 permissive index * fixups for lint * trigger rebuild * Update repo/local_config.go Co-authored-by: Shikhar Mall * rename permissive read flag * extend "permissive-read" rename * hide permissive command-line flag * syntax fixup * fixup for test * fixups for tests * Update .golangci.yml Co-authored-by: Shikhar Mall --------- Co-authored-by: Shikhar Mall --- Makefile | 1 + cli/command_repository_connect.go | 3 + cli/command_repository_set_client.go | 23 ++++- repo/blob/retrying/retrying_storage.go | 2 +- repo/content/committed_content_index.go | 30 ++++-- repo/content/committed_read_manager.go | 13 ++- repo/content/content_manager.go | 13 ++- repo/content/content_manager_test.go | 104 +++++++++++++++++++++ repo/format/upgrade_lock_test.go | 8 +- repo/local_config.go | 10 +- repo/open.go | 64 +++++++------ tests/end_to_end_test/server_start_test.go | 7 ++ 12 files changed, 224 insertions(+), 54 deletions(-) diff --git a/Makefile b/Makefile index 677f0d5d4..d5ecf31fe 100644 --- a/Makefile +++ b/Makefile @@ -480,3 +480,4 @@ perf-benchmark-results: gcloud compute scp $(PERF_BENCHMARK_INSTANCE):psrecord-* tests/perf_benchmark --zone=$(PERF_BENCHMARK_INSTANCE_ZONE) gcloud compute scp $(PERF_BENCHMARK_INSTANCE):repo-size-* tests/perf_benchmark --zone=$(PERF_BENCHMARK_INSTANCE_ZONE) (cd tests/perf_benchmark && go run process_results.go) + diff --git a/cli/command_repository_connect.go b/cli/command_repository_connect.go index 60cd9e3b6..d4a599ad9 100644 --- a/cli/command_repository_connect.go +++ b/cli/command_repository_connect.go @@ -54,6 +54,7 @@ type connectOptions struct { connectUsername string connectCheckForUpdates bool connectReadonly bool + connectPermissiveCacheLoading bool connectDescription string connectEnableActions bool @@ -72,6 +73,7 @@ func (c *connectOptions) setup(svc appServices, cmd *kingpin.CmdClause) { cmd.Flag("override-username", "Override username used by this repository connection").Hidden().StringVar(&c.connectUsername) cmd.Flag("check-for-updates", "Periodically check for Kopia updates on GitHub").Default("true").Envar(svc.EnvName(checkForUpdatesEnvar)).BoolVar(&c.connectCheckForUpdates) cmd.Flag("readonly", "Make repository read-only to avoid accidental changes").BoolVar(&c.connectReadonly) + cmd.Flag("permissive-cache-loading", "Do not fail when loading bad cache index entries. Repository must be opened in read-only mode").Hidden().BoolVar(&c.connectPermissiveCacheLoading) cmd.Flag("description", "Human-readable description of the repository").StringVar(&c.connectDescription) cmd.Flag("enable-actions", "Allow snapshot actions").BoolVar(&c.connectEnableActions) cmd.Flag("repository-format-cache-duration", "Duration of kopia.repository format blob cache").Hidden().DurationVar(&c.formatBlobCacheDuration) @@ -98,6 +100,7 @@ func (c *connectOptions) toRepoConnectOptions() *repo.ConnectOptions { Hostname: c.connectHostname, Username: c.connectUsername, ReadOnly: c.connectReadonly, + PermissiveCacheLoading: c.connectPermissiveCacheLoading, Description: c.connectDescription, EnableActions: c.connectEnableActions, FormatBlobCacheDuration: c.getFormatBlobCacheDuration(), diff --git a/cli/command_repository_set_client.go b/cli/command_repository_set_client.go index 94dce0b46..b691b4fcb 100644 --- a/cli/command_repository_set_client.go +++ b/cli/command_repository_set_client.go @@ -10,11 +10,12 @@ ) type commandRepositorySetClient struct { - repoClientOptionsReadOnly bool - repoClientOptionsReadWrite bool - repoClientOptionsDescription []string - repoClientOptionsUsername []string - repoClientOptionsHostname []string + repoClientOptionsReadOnly bool + repoClientOptionsReadWrite bool + repoClientOptionsPermissiveCacheLoading bool + repoClientOptionsDescription []string + repoClientOptionsUsername []string + repoClientOptionsHostname []string formatBlobCacheDuration time.Duration disableFormatBlobCache bool @@ -27,6 +28,7 @@ func (c *commandRepositorySetClient) setup(svc appServices, parent commandParent cmd.Flag("read-only", "Set repository to read-only").BoolVar(&c.repoClientOptionsReadOnly) cmd.Flag("read-write", "Set repository to read-write").BoolVar(&c.repoClientOptionsReadWrite) + cmd.Flag("permissive-cache-loading", "Do not fail when loading bad cache index entries. Repository must be opened in read-only mode").Hidden().BoolVar(&c.repoClientOptionsPermissiveCacheLoading) cmd.Flag("description", "Change description").StringsVar(&c.repoClientOptionsDescription) cmd.Flag("username", "Change username").StringsVar(&c.repoClientOptionsUsername) cmd.Flag("hostname", "Change hostname").StringsVar(&c.repoClientOptionsHostname) @@ -64,6 +66,17 @@ func (c *commandRepositorySetClient) run(ctx context.Context, rep repo.Repositor } } + if c.repoClientOptionsPermissiveCacheLoading { + if !opt.PermissiveCacheLoading { + log(ctx).Infof("Repository fails on read of bad index blobs.") + } else { + opt.PermissiveCacheLoading = true + anyChange = true + + log(ctx).Infof("Setting to load indicies into cache permissively.") + } + } + if v := c.repoClientOptionsDescription; len(v) > 0 { opt.Description = v[0] anyChange = true diff --git a/repo/blob/retrying/retrying_storage.go b/repo/blob/retrying/retrying_storage.go index 9c158703f..dda05ccf1 100644 --- a/repo/blob/retrying/retrying_storage.go +++ b/repo/blob/retrying/retrying_storage.go @@ -72,7 +72,7 @@ func isRetriable(err error) bool { case errors.Is(err, blob.ErrBlobAlreadyExists): return false - case errors.Is(err, repo.ErrRepositoryUnavailableDueToUpgrageInProgress): + case errors.Is(err, repo.ErrRepositoryUnavailableDueToUpgradeInProgress): // hard-fail when upgrade is in progress return false diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index 90d313c16..c4a872f95 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -26,8 +26,9 @@ const smallIndexEntryCountThreshold = 100 type committedContentIndex struct { - rev atomic.Int64 - cache committedContentIndexCache + rev atomic.Int64 + cache committedContentIndexCache + permissiveCacheLoading bool mu sync.RWMutex // +checklocks:mu @@ -168,6 +169,10 @@ func (c *committedContentIndex) merge(ctx context.Context, indexFiles []blob.ID) ndx, err = c.cache.openIndex(ctx, e) if err != nil { + if c.permissiveCacheLoading { + continue + } + newlyOpened.Close() //nolint:errcheck return nil, nil, errors.Wrapf(err, "unable to open pack index %q", e) @@ -291,7 +296,7 @@ func (c *committedContentIndex) close() error { return nil } -func (c *committedContentIndex) fetchIndexBlobs(ctx context.Context, indexBlobs []blob.ID) error { +func (c *committedContentIndex) fetchIndexBlobs(ctx context.Context, isPermissiveCacheLoading bool, indexBlobs []blob.ID) error { ch, err := c.missingIndexBlobs(ctx, indexBlobs) if err != nil { return err @@ -304,6 +309,7 @@ func (c *committedContentIndex) fetchIndexBlobs(ctx context.Context, indexBlobs c.log.Debugf("Downloading %v new index blobs...", len(indexBlobs)) eg, ctx := errgroup.WithContext(ctx) + for i := 0; i < parallelFetches; i++ { eg.Go(func() error { var data gather.WriteBuffer @@ -313,6 +319,10 @@ func (c *committedContentIndex) fetchIndexBlobs(ctx context.Context, indexBlobs data.Reset() if err := c.fetchOne(ctx, indexBlobID, &data); err != nil { + if isPermissiveCacheLoading { + c.log.Errorf("skipping bad read of index blob %v", indexBlobID) + continue + } return errors.Wrapf(err, "error loading index blob %v", indexBlobID) } @@ -355,6 +365,7 @@ func (c *committedContentIndex) missingIndexBlobs(ctx context.Context, blobs []b func newCommittedContentIndex(caching *CachingOptions, v1PerContentOverhead func() int, formatProvider format.Provider, + permissiveCacheLoading bool, fetchOne func(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error, log logging.Logger, minSweepAge time.Duration, @@ -372,11 +383,12 @@ func newCommittedContentIndex(caching *CachingOptions, } return &committedContentIndex{ - cache: cache, - inUse: map[blob.ID]index.Index{}, - v1PerContentOverhead: v1PerContentOverhead, - formatProvider: formatProvider, - fetchOne: fetchOne, - log: log, + cache: cache, + permissiveCacheLoading: permissiveCacheLoading, + inUse: map[blob.ID]index.Index{}, + v1PerContentOverhead: v1PerContentOverhead, + formatProvider: formatProvider, + fetchOne: fetchOne, + log: log, } } diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 4c3b940c3..bca87cc08 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -87,7 +87,8 @@ type SharedManager struct { // lock to protect the set of committed indexes // shared lock will be acquired when writing new content to allow it to happen in parallel // exclusive lock will be acquired during compaction or refresh. - indexesLock sync.RWMutex + indexesLock sync.RWMutex + permissiveCacheLoading bool // maybeRefreshIndexes() will call Refresh() after this point in ime. // +checklocks:indexesLock @@ -228,7 +229,7 @@ func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error { indexBlobIDs = append(indexBlobIDs, b.BlobID) } - err = sm.committedContents.fetchIndexBlobs(ctx, indexBlobIDs) + err = sm.committedContents.fetchIndexBlobs(ctx, sm.permissiveCacheLoading, indexBlobIDs) if err == nil { err = sm.committedContents.use(ctx, indexBlobIDs, ignoreDeletedBefore) if err != nil { @@ -260,17 +261,19 @@ func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache { return sm.contentCache } +// indexBlobManager return the index manager for content. func (sm *SharedManager) indexBlobManager() (indexblob.Manager, error) { mp, mperr := sm.format.GetMutableParameters() if mperr != nil { return nil, errors.Wrap(mperr, "mutable parameters") } + var q indexblob.Manager = sm.indexBlobManagerV0 if mp.EpochParameters.Enabled { - return sm.indexBlobManagerV1, nil + q = sm.indexBlobManagerV1 } - return sm.indexBlobManagerV0, nil + return q, nil } func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info, output *gather.WriteBuffer) error { @@ -505,6 +508,7 @@ func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error { sm.committedContents = newCommittedContentIndex(caching, sm.format.Encryptor().Overhead, sm.format, + sm.permissiveCacheLoading, enc.GetEncryptedBlob, sm.namedLogger("committed-content-index"), caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge)) @@ -598,6 +602,7 @@ func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider Stats: new(Stats), timeNow: opts.TimeNow, format: prov, + permissiveCacheLoading: opts.PermissiveCacheLoading, minPreambleLength: defaultMinPreambleLength, maxPreambleLength: defaultMaxPreambleLength, paddingUnit: defaultPaddingUnit, diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 5d66695c2..4d4b5c6b3 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -161,6 +161,10 @@ func (bm *WriteManager) DeleteContent(ctx context.Context, contentID ID) error { } func (bm *WriteManager) maybeRefreshIndexes(ctx context.Context) error { + if bm.permissiveCacheLoading { + return nil + } + if !bm.disableIndexRefresh.Load() && bm.shouldRefreshIndexes() { if err := bm.Refresh(ctx); err != nil { return errors.Wrap(err, "error refreshing indexes") @@ -943,10 +947,11 @@ func (bm *WriteManager) MetadataCache() cache.ContentCache { // ManagerOptions are the optional parameters for manager creation. type ManagerOptions struct { - TimeNow func() time.Time // Time provider - DisableInternalLog bool - RetentionMode string - RetentionPeriod time.Duration + TimeNow func() time.Time // Time provider + DisableInternalLog bool + RetentionMode string + RetentionPeriod time.Duration + PermissiveCacheLoading bool } // CloneOrDefault returns a clone of provided ManagerOptions or default empty struct if nil. diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 09b775ea6..7da6b759b 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -2283,6 +2283,110 @@ func (s *contentManagerSuite) TestPrefetchContent(t *testing.T) { } } +// TestContentPermissiveCacheLoading check that permissive reads read content as recorded. +func (s *contentManagerSuite) TestContentPermissiveCacheLoading(t *testing.T) { + data := blobtesting.DataMap{} + timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second) + st := blobtesting.NewMapStorage(data, nil, timeNow) + + ctx := testlogging.Context(t) + + tweaks := &contentManagerTestTweaks{ + ManagerOptions: ManagerOptions{ + TimeNow: timeNow, + }, + } + + bm := s.newTestContentManagerWithTweaks(t, st, tweaks) + + ids := make([]ID, 100) + for i := 0; i < len(ids); i++ { + ids[i] = writeContentAndVerify(ctx, t, bm, seededRandomData(i, maxPackCapacity/2)) + + for j := 0; j < i; j++ { + // verify all contents written so far + verifyContent(ctx, t, bm, ids[j], seededRandomData(j, maxPackCapacity/2)) + } + + // every 10 contents, create new content manager + if i%10 == 0 { + t.Logf("------- flushing & reopening -----") + require.NoError(t, bm.Flush(ctx)) + require.NoError(t, bm.CloseShared(ctx)) + bm = s.newTestContentManagerWithTweaks(t, st, tweaks) + } + } + + require.NoError(t, bm.Flush(ctx)) + require.NoError(t, bm.CloseShared(ctx)) + + tweaks = &contentManagerTestTweaks{ + ManagerOptions: ManagerOptions{ + TimeNow: timeNow, + PermissiveCacheLoading: true, + }, + } + + bm = s.newTestContentManagerWithTweaks(t, st, tweaks) + + for i := 0; i < len(ids); i++ { + verifyContent(ctx, t, bm, ids[i], seededRandomData(i, maxPackCapacity/2)) + } +} + +// TestContentIndexPermissiveReadsWithFault check that permissive reads read content as recorded. +func (s *contentManagerSuite) TestContentIndexPermissiveReadsWithFault(t *testing.T) { + data := blobtesting.DataMap{} + timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second) + st := blobtesting.NewMapStorage(data, nil, timeNow) + + ctx := testlogging.Context(t) + + tweaks := &contentManagerTestTweaks{ + ManagerOptions: ManagerOptions{ + TimeNow: timeNow, + }, + } + + bm := s.newTestContentManagerWithTweaks(t, st, tweaks) + + ids := make([]ID, 100) + for i := 0; i < len(ids); i++ { + ids[i] = writeContentAndVerify(ctx, t, bm, seededRandomData(i, maxPackCapacity/2)) + + for j := 0; j < i; j++ { + // verify all contents written so far + verifyContent(ctx, t, bm, ids[j], seededRandomData(j, maxPackCapacity/2)) + } + + // every 10 contents, create new content manager + if i%10 == 0 { + t.Logf("------- flushing & reopening -----") + require.NoError(t, bm.Flush(ctx)) + require.NoError(t, bm.CloseShared(ctx)) + bm = s.newTestContentManagerWithTweaks(t, st, tweaks) + } + } + + require.NoError(t, format.WriteLegacyIndexPoisonBlob(ctx, st)) + + require.NoError(t, bm.Flush(ctx)) + require.NoError(t, bm.CloseShared(ctx)) + + tweaks = &contentManagerTestTweaks{ + ManagerOptions: ManagerOptions{ + TimeNow: timeNow, + PermissiveCacheLoading: true, + }, + } + + bm = s.newTestContentManagerWithTweaks(t, st, tweaks) + + for i := 0; i < len(ids); i++ { + verifyContent(ctx, t, bm, ids[i], seededRandomData(i, maxPackCapacity/2)) + } +} + func wipeCache(t *testing.T, st cache.Storage) { t.Helper() diff --git a/repo/format/upgrade_lock_test.go b/repo/format/upgrade_lock_test.go index 994202285..6088716cd 100644 --- a/repo/format/upgrade_lock_test.go +++ b/repo/format/upgrade_lock_test.go @@ -391,11 +391,11 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) { curTime = curTime.Add(formatBlockCacheDuration + time.Second) // ongoing writes should get interrupted this time - require.ErrorIs(t, w1.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) + require.ErrorIs(t, w1.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgradeInProgress) - require.ErrorIs(t, w2.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) - require.ErrorIs(t, w3.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) - require.ErrorIs(t, lw.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) + require.ErrorIs(t, w2.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgradeInProgress) + require.ErrorIs(t, w3.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgradeInProgress) + require.ErrorIs(t, lw.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgradeInProgress) } func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) { diff --git a/repo/local_config.go b/repo/local_config.go index 4efcc1ef7..a5c672a1d 100644 --- a/repo/local_config.go +++ b/repo/local_config.go @@ -20,12 +20,16 @@ const configDirMode = 0o700 +// ErrCannotWriteToRepoConnectionWithPermissiveCacheLoading error to indicate. +var ErrCannotWriteToRepoConnectionWithPermissiveCacheLoading = errors.Errorf("cannot write to repo connection with permissive cache loading") + // ClientOptions contains client-specific options that are persisted in local configuration file. type ClientOptions struct { Hostname string `json:"hostname"` Username string `json:"username"` - ReadOnly bool `json:"readonly,omitempty"` + ReadOnly bool `json:"readonly,omitempty"` + PermissiveCacheLoading bool `json:"permissiveCacheLoading,omitempty"` // Description is human-readable description of the repository to use in the UI. Description string `json:"description,omitempty"` @@ -149,5 +153,9 @@ func LoadConfigFromFile(fileName string) (*LocalConfig, error) { } } + if lc.PermissiveCacheLoading && os.Getenv("KOPIA_UPGRADE_LOCK_ENABLED") == "" { + return nil, errors.New("must have set KOPIA_UPGRADE_LOCK_ENABLED when connecting to repository with permissive cache loading") + } + return &lc, nil } diff --git a/repo/open.go b/repo/open.go index d262662d8..472912bdb 100644 --- a/repo/open.go +++ b/repo/open.go @@ -85,9 +85,9 @@ type Options struct { // ErrAlreadyInitialized is returned when repository is already initialized in the provided storage. var ErrAlreadyInitialized = format.ErrAlreadyInitialized -// ErrRepositoryUnavailableDueToUpgrageInProgress is returned when repository +// ErrRepositoryUnavailableDueToUpgradeInProgress is returned when repository // is undergoing upgrade that requires exclusive access. -var ErrRepositoryUnavailableDueToUpgrageInProgress = errors.Errorf("repository upgrade in progress") +var ErrRepositoryUnavailableDueToUpgradeInProgress = errors.Errorf("repository upgrade in progress") // Open opens a Repository specified in the configuration file. func Open(ctx context.Context, configFile, password string, options *Options) (rep Repository, err error) { @@ -121,6 +121,10 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r return nil, err } + if lc.PermissiveCacheLoading && !lc.ReadOnly { + return nil, ErrCannotWriteToRepoConnectionWithPermissiveCacheLoading + } + if lc.APIServer != nil { return openAPIServer(ctx, lc.APIServer, lc.ClientOptions, lc.Caching, password, options) } @@ -235,8 +239,9 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, password string, options *Options, cacheOpts *content.CachingOptions, configFile string) (DirectRepository, error) { cacheOpts = cacheOpts.CloneOrDefault() cmOpts := &content.ManagerOptions{ - TimeNow: defaultTime(options.TimeNowFunc), - DisableInternalLog: options.DisableInternalLog, + TimeNow: defaultTime(options.TimeNowFunc), + DisableInternalLog: options.DisableInternalLog, + PermissiveCacheLoading: cliOpts.PermissiveCacheLoading, } mr := metrics.NewRegistry() @@ -247,25 +252,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, return nil, errors.Wrap(ferr, "unable to create format manager") } - if _, err := retry.WithExponentialBackoffMaxRetries(ctx, -1, "wait for upgrade", func() (interface{}, error) { - uli, err := fmgr.UpgradeLockIntent() - if err != nil { - //nolint:wrapcheck - return nil, err - } - - // retry if upgrade lock has been taken - if locked, _ := uli.IsLocked(cmOpts.TimeNow()); locked && options.UpgradeOwnerID != uli.OwnerID { - return nil, ErrRepositoryUnavailableDueToUpgrageInProgress - } - - return false, nil - }, func(internalErr error) bool { - return !options.DoNotWaitForUpgrade && errors.Is(internalErr, ErrRepositoryUnavailableDueToUpgrageInProgress) - }); err != nil { - return nil, err - } - + // check features before and perform configuration before performing IO if err := handleMissingRequiredFeatures(ctx, fmgr, options.TestOnlyIgnoreMissingRequiredFeatures); err != nil { return nil, err } @@ -307,8 +294,33 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, st = wrapLockingStorage(st, blobcfg) } - // background/interleaving upgrade lock storage monitor - st = upgradeLockMonitor(fmgr, options.UpgradeOwnerID, st, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures) + _, err = retry.WithExponentialBackoffMaxRetries(ctx, -1, "wait for upgrade", func() (interface{}, error) { + //nolint:govet + uli, err := fmgr.UpgradeLockIntent() + if err != nil { + //nolint:wrapcheck + return nil, err + } + + // retry if upgrade lock has been taken + if !cliOpts.PermissiveCacheLoading { + if locked, _ := uli.IsLocked(cmOpts.TimeNow()); locked && options.UpgradeOwnerID != uli.OwnerID { + return nil, ErrRepositoryUnavailableDueToUpgradeInProgress + } + } + + return false, nil + }, func(internalErr error) bool { + return !options.DoNotWaitForUpgrade && errors.Is(internalErr, ErrRepositoryUnavailableDueToUpgradeInProgress) + }) + if err != nil { + return nil, err + } + + if !cliOpts.PermissiveCacheLoading { + // background/interleaving upgrade lock storage monitor + st = upgradeLockMonitor(fmgr, options.UpgradeOwnerID, st, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures) + } scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, mr) if ferr != nil { @@ -456,7 +468,7 @@ func upgradeLockMonitor( if uli != nil { // only allow the upgrade owner to perform storage operations if locked, _ := uli.IsLocked(now()); locked && upgradeOwnerID != uli.OwnerID { - return ErrRepositoryUnavailableDueToUpgrageInProgress + return ErrRepositoryUnavailableDueToUpgradeInProgress } } diff --git a/tests/end_to_end_test/server_start_test.go b/tests/end_to_end_test/server_start_test.go index 63473bab9..d08dca42b 100644 --- a/tests/end_to_end_test/server_start_test.go +++ b/tests/end_to_end_test/server_start_test.go @@ -20,6 +20,7 @@ "github.com/kopia/kopia/internal/testlogging" "github.com/kopia/kopia/internal/testutil" "github.com/kopia/kopia/internal/uitask" + "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/filesystem" "github.com/kopia/kopia/snapshot" @@ -253,6 +254,9 @@ func TestServerStartAsyncRepoConnect(t *testing.T) { func TestServerCreateAndConnectViaAPI(t *testing.T) { t.Parallel() + //nolint:tenv + os.Setenv("KOPIA_UPGRADE_LOCK_ENABLED", "true") + ctx := testlogging.Context(t) runner := testenv.NewInProcRunner(t) @@ -309,6 +313,9 @@ func TestServerCreateAndConnectViaAPI(t *testing.T) { ConnectRepositoryRequest: serverapi.ConnectRepositoryRequest{ Password: "foofoo", Storage: connInfo, + ClientOptions: repo.ClientOptions{ + PermissiveCacheLoading: true, + }, }, }); err != nil { t.Fatalf("create error: %v", err)