mirror of
https://github.com/kopia/kopia.git
synced 2026-05-18 03:34:41 -04:00
feat(cli): ability to read repository status ... (#2799)
* fix spelling * permissive index read * fixup v1 permissive index * fixups for lint * trigger rebuild * Update repo/local_config.go Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com> * rename permissive read flag * extend "permissive-read" rename * hide permissive command-line flag * syntax fixup * fixup for test * fixups for tests * Update .golangci.yml Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com> --------- Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>
This commit is contained in:
1
Makefile
1
Makefile
@@ -480,3 +480,4 @@ perf-benchmark-results:
|
||||
gcloud compute scp $(PERF_BENCHMARK_INSTANCE):psrecord-* tests/perf_benchmark --zone=$(PERF_BENCHMARK_INSTANCE_ZONE)
|
||||
gcloud compute scp $(PERF_BENCHMARK_INSTANCE):repo-size-* tests/perf_benchmark --zone=$(PERF_BENCHMARK_INSTANCE_ZONE)
|
||||
(cd tests/perf_benchmark && go run process_results.go)
|
||||
|
||||
|
||||
@@ -54,6 +54,7 @@ type connectOptions struct {
|
||||
connectUsername string
|
||||
connectCheckForUpdates bool
|
||||
connectReadonly bool
|
||||
connectPermissiveCacheLoading bool
|
||||
connectDescription string
|
||||
connectEnableActions bool
|
||||
|
||||
@@ -72,6 +73,7 @@ func (c *connectOptions) setup(svc appServices, cmd *kingpin.CmdClause) {
|
||||
cmd.Flag("override-username", "Override username used by this repository connection").Hidden().StringVar(&c.connectUsername)
|
||||
cmd.Flag("check-for-updates", "Periodically check for Kopia updates on GitHub").Default("true").Envar(svc.EnvName(checkForUpdatesEnvar)).BoolVar(&c.connectCheckForUpdates)
|
||||
cmd.Flag("readonly", "Make repository read-only to avoid accidental changes").BoolVar(&c.connectReadonly)
|
||||
cmd.Flag("permissive-cache-loading", "Do not fail when loading bad cache index entries. Repository must be opened in read-only mode").Hidden().BoolVar(&c.connectPermissiveCacheLoading)
|
||||
cmd.Flag("description", "Human-readable description of the repository").StringVar(&c.connectDescription)
|
||||
cmd.Flag("enable-actions", "Allow snapshot actions").BoolVar(&c.connectEnableActions)
|
||||
cmd.Flag("repository-format-cache-duration", "Duration of kopia.repository format blob cache").Hidden().DurationVar(&c.formatBlobCacheDuration)
|
||||
@@ -98,6 +100,7 @@ func (c *connectOptions) toRepoConnectOptions() *repo.ConnectOptions {
|
||||
Hostname: c.connectHostname,
|
||||
Username: c.connectUsername,
|
||||
ReadOnly: c.connectReadonly,
|
||||
PermissiveCacheLoading: c.connectPermissiveCacheLoading,
|
||||
Description: c.connectDescription,
|
||||
EnableActions: c.connectEnableActions,
|
||||
FormatBlobCacheDuration: c.getFormatBlobCacheDuration(),
|
||||
|
||||
@@ -10,11 +10,12 @@
|
||||
)
|
||||
|
||||
type commandRepositorySetClient struct {
|
||||
repoClientOptionsReadOnly bool
|
||||
repoClientOptionsReadWrite bool
|
||||
repoClientOptionsDescription []string
|
||||
repoClientOptionsUsername []string
|
||||
repoClientOptionsHostname []string
|
||||
repoClientOptionsReadOnly bool
|
||||
repoClientOptionsReadWrite bool
|
||||
repoClientOptionsPermissiveCacheLoading bool
|
||||
repoClientOptionsDescription []string
|
||||
repoClientOptionsUsername []string
|
||||
repoClientOptionsHostname []string
|
||||
|
||||
formatBlobCacheDuration time.Duration
|
||||
disableFormatBlobCache bool
|
||||
@@ -27,6 +28,7 @@ func (c *commandRepositorySetClient) setup(svc appServices, parent commandParent
|
||||
|
||||
cmd.Flag("read-only", "Set repository to read-only").BoolVar(&c.repoClientOptionsReadOnly)
|
||||
cmd.Flag("read-write", "Set repository to read-write").BoolVar(&c.repoClientOptionsReadWrite)
|
||||
cmd.Flag("permissive-cache-loading", "Do not fail when loading bad cache index entries. Repository must be opened in read-only mode").Hidden().BoolVar(&c.repoClientOptionsPermissiveCacheLoading)
|
||||
cmd.Flag("description", "Change description").StringsVar(&c.repoClientOptionsDescription)
|
||||
cmd.Flag("username", "Change username").StringsVar(&c.repoClientOptionsUsername)
|
||||
cmd.Flag("hostname", "Change hostname").StringsVar(&c.repoClientOptionsHostname)
|
||||
@@ -64,6 +66,17 @@ func (c *commandRepositorySetClient) run(ctx context.Context, rep repo.Repositor
|
||||
}
|
||||
}
|
||||
|
||||
if c.repoClientOptionsPermissiveCacheLoading {
|
||||
if !opt.PermissiveCacheLoading {
|
||||
log(ctx).Infof("Repository fails on read of bad index blobs.")
|
||||
} else {
|
||||
opt.PermissiveCacheLoading = true
|
||||
anyChange = true
|
||||
|
||||
log(ctx).Infof("Setting to load indicies into cache permissively.")
|
||||
}
|
||||
}
|
||||
|
||||
if v := c.repoClientOptionsDescription; len(v) > 0 {
|
||||
opt.Description = v[0]
|
||||
anyChange = true
|
||||
|
||||
@@ -72,7 +72,7 @@ func isRetriable(err error) bool {
|
||||
case errors.Is(err, blob.ErrBlobAlreadyExists):
|
||||
return false
|
||||
|
||||
case errors.Is(err, repo.ErrRepositoryUnavailableDueToUpgrageInProgress):
|
||||
case errors.Is(err, repo.ErrRepositoryUnavailableDueToUpgradeInProgress):
|
||||
// hard-fail when upgrade is in progress
|
||||
return false
|
||||
|
||||
|
||||
@@ -26,8 +26,9 @@
|
||||
const smallIndexEntryCountThreshold = 100
|
||||
|
||||
type committedContentIndex struct {
|
||||
rev atomic.Int64
|
||||
cache committedContentIndexCache
|
||||
rev atomic.Int64
|
||||
cache committedContentIndexCache
|
||||
permissiveCacheLoading bool
|
||||
|
||||
mu sync.RWMutex
|
||||
// +checklocks:mu
|
||||
@@ -168,6 +169,10 @@ func (c *committedContentIndex) merge(ctx context.Context, indexFiles []blob.ID)
|
||||
|
||||
ndx, err = c.cache.openIndex(ctx, e)
|
||||
if err != nil {
|
||||
if c.permissiveCacheLoading {
|
||||
continue
|
||||
}
|
||||
|
||||
newlyOpened.Close() //nolint:errcheck
|
||||
|
||||
return nil, nil, errors.Wrapf(err, "unable to open pack index %q", e)
|
||||
@@ -291,7 +296,7 @@ func (c *committedContentIndex) close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *committedContentIndex) fetchIndexBlobs(ctx context.Context, indexBlobs []blob.ID) error {
|
||||
func (c *committedContentIndex) fetchIndexBlobs(ctx context.Context, isPermissiveCacheLoading bool, indexBlobs []blob.ID) error {
|
||||
ch, err := c.missingIndexBlobs(ctx, indexBlobs)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -304,6 +309,7 @@ func (c *committedContentIndex) fetchIndexBlobs(ctx context.Context, indexBlobs
|
||||
c.log.Debugf("Downloading %v new index blobs...", len(indexBlobs))
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
for i := 0; i < parallelFetches; i++ {
|
||||
eg.Go(func() error {
|
||||
var data gather.WriteBuffer
|
||||
@@ -313,6 +319,10 @@ func (c *committedContentIndex) fetchIndexBlobs(ctx context.Context, indexBlobs
|
||||
data.Reset()
|
||||
|
||||
if err := c.fetchOne(ctx, indexBlobID, &data); err != nil {
|
||||
if isPermissiveCacheLoading {
|
||||
c.log.Errorf("skipping bad read of index blob %v", indexBlobID)
|
||||
continue
|
||||
}
|
||||
return errors.Wrapf(err, "error loading index blob %v", indexBlobID)
|
||||
}
|
||||
|
||||
@@ -355,6 +365,7 @@ func (c *committedContentIndex) missingIndexBlobs(ctx context.Context, blobs []b
|
||||
func newCommittedContentIndex(caching *CachingOptions,
|
||||
v1PerContentOverhead func() int,
|
||||
formatProvider format.Provider,
|
||||
permissiveCacheLoading bool,
|
||||
fetchOne func(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error,
|
||||
log logging.Logger,
|
||||
minSweepAge time.Duration,
|
||||
@@ -372,11 +383,12 @@ func newCommittedContentIndex(caching *CachingOptions,
|
||||
}
|
||||
|
||||
return &committedContentIndex{
|
||||
cache: cache,
|
||||
inUse: map[blob.ID]index.Index{},
|
||||
v1PerContentOverhead: v1PerContentOverhead,
|
||||
formatProvider: formatProvider,
|
||||
fetchOne: fetchOne,
|
||||
log: log,
|
||||
cache: cache,
|
||||
permissiveCacheLoading: permissiveCacheLoading,
|
||||
inUse: map[blob.ID]index.Index{},
|
||||
v1PerContentOverhead: v1PerContentOverhead,
|
||||
formatProvider: formatProvider,
|
||||
fetchOne: fetchOne,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,7 +87,8 @@ type SharedManager struct {
|
||||
// lock to protect the set of committed indexes
|
||||
// shared lock will be acquired when writing new content to allow it to happen in parallel
|
||||
// exclusive lock will be acquired during compaction or refresh.
|
||||
indexesLock sync.RWMutex
|
||||
indexesLock sync.RWMutex
|
||||
permissiveCacheLoading bool
|
||||
|
||||
// maybeRefreshIndexes() will call Refresh() after this point in ime.
|
||||
// +checklocks:indexesLock
|
||||
@@ -228,7 +229,7 @@ func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error {
|
||||
indexBlobIDs = append(indexBlobIDs, b.BlobID)
|
||||
}
|
||||
|
||||
err = sm.committedContents.fetchIndexBlobs(ctx, indexBlobIDs)
|
||||
err = sm.committedContents.fetchIndexBlobs(ctx, sm.permissiveCacheLoading, indexBlobIDs)
|
||||
if err == nil {
|
||||
err = sm.committedContents.use(ctx, indexBlobIDs, ignoreDeletedBefore)
|
||||
if err != nil {
|
||||
@@ -260,17 +261,19 @@ func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache {
|
||||
return sm.contentCache
|
||||
}
|
||||
|
||||
// indexBlobManager return the index manager for content.
|
||||
func (sm *SharedManager) indexBlobManager() (indexblob.Manager, error) {
|
||||
mp, mperr := sm.format.GetMutableParameters()
|
||||
if mperr != nil {
|
||||
return nil, errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
|
||||
var q indexblob.Manager = sm.indexBlobManagerV0
|
||||
if mp.EpochParameters.Enabled {
|
||||
return sm.indexBlobManagerV1, nil
|
||||
q = sm.indexBlobManagerV1
|
||||
}
|
||||
|
||||
return sm.indexBlobManagerV0, nil
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info, output *gather.WriteBuffer) error {
|
||||
@@ -505,6 +508,7 @@ func(ctx context.Context, blobIDs []blob.ID, outputPrefix blob.ID) error {
|
||||
sm.committedContents = newCommittedContentIndex(caching,
|
||||
sm.format.Encryptor().Overhead,
|
||||
sm.format,
|
||||
sm.permissiveCacheLoading,
|
||||
enc.GetEncryptedBlob,
|
||||
sm.namedLogger("committed-content-index"),
|
||||
caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge))
|
||||
@@ -598,6 +602,7 @@ func NewSharedManager(ctx context.Context, st blob.Storage, prov format.Provider
|
||||
Stats: new(Stats),
|
||||
timeNow: opts.TimeNow,
|
||||
format: prov,
|
||||
permissiveCacheLoading: opts.PermissiveCacheLoading,
|
||||
minPreambleLength: defaultMinPreambleLength,
|
||||
maxPreambleLength: defaultMaxPreambleLength,
|
||||
paddingUnit: defaultPaddingUnit,
|
||||
|
||||
@@ -161,6 +161,10 @@ func (bm *WriteManager) DeleteContent(ctx context.Context, contentID ID) error {
|
||||
}
|
||||
|
||||
func (bm *WriteManager) maybeRefreshIndexes(ctx context.Context) error {
|
||||
if bm.permissiveCacheLoading {
|
||||
return nil
|
||||
}
|
||||
|
||||
if !bm.disableIndexRefresh.Load() && bm.shouldRefreshIndexes() {
|
||||
if err := bm.Refresh(ctx); err != nil {
|
||||
return errors.Wrap(err, "error refreshing indexes")
|
||||
@@ -943,10 +947,11 @@ func (bm *WriteManager) MetadataCache() cache.ContentCache {
|
||||
|
||||
// ManagerOptions are the optional parameters for manager creation.
|
||||
type ManagerOptions struct {
|
||||
TimeNow func() time.Time // Time provider
|
||||
DisableInternalLog bool
|
||||
RetentionMode string
|
||||
RetentionPeriod time.Duration
|
||||
TimeNow func() time.Time // Time provider
|
||||
DisableInternalLog bool
|
||||
RetentionMode string
|
||||
RetentionPeriod time.Duration
|
||||
PermissiveCacheLoading bool
|
||||
}
|
||||
|
||||
// CloneOrDefault returns a clone of provided ManagerOptions or default empty struct if nil.
|
||||
|
||||
@@ -2283,6 +2283,110 @@ func (s *contentManagerSuite) TestPrefetchContent(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestContentPermissiveCacheLoading check that permissive reads read content as recorded.
|
||||
func (s *contentManagerSuite) TestContentPermissiveCacheLoading(t *testing.T) {
|
||||
data := blobtesting.DataMap{}
|
||||
timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second)
|
||||
st := blobtesting.NewMapStorage(data, nil, timeNow)
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
|
||||
tweaks := &contentManagerTestTweaks{
|
||||
ManagerOptions: ManagerOptions{
|
||||
TimeNow: timeNow,
|
||||
},
|
||||
}
|
||||
|
||||
bm := s.newTestContentManagerWithTweaks(t, st, tweaks)
|
||||
|
||||
ids := make([]ID, 100)
|
||||
for i := 0; i < len(ids); i++ {
|
||||
ids[i] = writeContentAndVerify(ctx, t, bm, seededRandomData(i, maxPackCapacity/2))
|
||||
|
||||
for j := 0; j < i; j++ {
|
||||
// verify all contents written so far
|
||||
verifyContent(ctx, t, bm, ids[j], seededRandomData(j, maxPackCapacity/2))
|
||||
}
|
||||
|
||||
// every 10 contents, create new content manager
|
||||
if i%10 == 0 {
|
||||
t.Logf("------- flushing & reopening -----")
|
||||
require.NoError(t, bm.Flush(ctx))
|
||||
require.NoError(t, bm.CloseShared(ctx))
|
||||
bm = s.newTestContentManagerWithTweaks(t, st, tweaks)
|
||||
}
|
||||
}
|
||||
|
||||
require.NoError(t, bm.Flush(ctx))
|
||||
require.NoError(t, bm.CloseShared(ctx))
|
||||
|
||||
tweaks = &contentManagerTestTweaks{
|
||||
ManagerOptions: ManagerOptions{
|
||||
TimeNow: timeNow,
|
||||
PermissiveCacheLoading: true,
|
||||
},
|
||||
}
|
||||
|
||||
bm = s.newTestContentManagerWithTweaks(t, st, tweaks)
|
||||
|
||||
for i := 0; i < len(ids); i++ {
|
||||
verifyContent(ctx, t, bm, ids[i], seededRandomData(i, maxPackCapacity/2))
|
||||
}
|
||||
}
|
||||
|
||||
// TestContentIndexPermissiveReadsWithFault check that permissive reads read content as recorded.
|
||||
func (s *contentManagerSuite) TestContentIndexPermissiveReadsWithFault(t *testing.T) {
|
||||
data := blobtesting.DataMap{}
|
||||
timeNow := faketime.AutoAdvance(fakeTime, 1*time.Second)
|
||||
st := blobtesting.NewMapStorage(data, nil, timeNow)
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
|
||||
tweaks := &contentManagerTestTweaks{
|
||||
ManagerOptions: ManagerOptions{
|
||||
TimeNow: timeNow,
|
||||
},
|
||||
}
|
||||
|
||||
bm := s.newTestContentManagerWithTweaks(t, st, tweaks)
|
||||
|
||||
ids := make([]ID, 100)
|
||||
for i := 0; i < len(ids); i++ {
|
||||
ids[i] = writeContentAndVerify(ctx, t, bm, seededRandomData(i, maxPackCapacity/2))
|
||||
|
||||
for j := 0; j < i; j++ {
|
||||
// verify all contents written so far
|
||||
verifyContent(ctx, t, bm, ids[j], seededRandomData(j, maxPackCapacity/2))
|
||||
}
|
||||
|
||||
// every 10 contents, create new content manager
|
||||
if i%10 == 0 {
|
||||
t.Logf("------- flushing & reopening -----")
|
||||
require.NoError(t, bm.Flush(ctx))
|
||||
require.NoError(t, bm.CloseShared(ctx))
|
||||
bm = s.newTestContentManagerWithTweaks(t, st, tweaks)
|
||||
}
|
||||
}
|
||||
|
||||
require.NoError(t, format.WriteLegacyIndexPoisonBlob(ctx, st))
|
||||
|
||||
require.NoError(t, bm.Flush(ctx))
|
||||
require.NoError(t, bm.CloseShared(ctx))
|
||||
|
||||
tweaks = &contentManagerTestTweaks{
|
||||
ManagerOptions: ManagerOptions{
|
||||
TimeNow: timeNow,
|
||||
PermissiveCacheLoading: true,
|
||||
},
|
||||
}
|
||||
|
||||
bm = s.newTestContentManagerWithTweaks(t, st, tweaks)
|
||||
|
||||
for i := 0; i < len(ids); i++ {
|
||||
verifyContent(ctx, t, bm, ids[i], seededRandomData(i, maxPackCapacity/2))
|
||||
}
|
||||
}
|
||||
|
||||
func wipeCache(t *testing.T, st cache.Storage) {
|
||||
t.Helper()
|
||||
|
||||
|
||||
@@ -391,11 +391,11 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) {
|
||||
curTime = curTime.Add(formatBlockCacheDuration + time.Second)
|
||||
|
||||
// ongoing writes should get interrupted this time
|
||||
require.ErrorIs(t, w1.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
|
||||
require.ErrorIs(t, w1.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgradeInProgress)
|
||||
|
||||
require.ErrorIs(t, w2.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
|
||||
require.ErrorIs(t, w3.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
|
||||
require.ErrorIs(t, lw.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
|
||||
require.ErrorIs(t, w2.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgradeInProgress)
|
||||
require.ErrorIs(t, w3.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgradeInProgress)
|
||||
require.ErrorIs(t, lw.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgradeInProgress)
|
||||
}
|
||||
|
||||
func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) {
|
||||
|
||||
@@ -20,12 +20,16 @@
|
||||
|
||||
const configDirMode = 0o700
|
||||
|
||||
// ErrCannotWriteToRepoConnectionWithPermissiveCacheLoading error to indicate.
|
||||
var ErrCannotWriteToRepoConnectionWithPermissiveCacheLoading = errors.Errorf("cannot write to repo connection with permissive cache loading")
|
||||
|
||||
// ClientOptions contains client-specific options that are persisted in local configuration file.
|
||||
type ClientOptions struct {
|
||||
Hostname string `json:"hostname"`
|
||||
Username string `json:"username"`
|
||||
|
||||
ReadOnly bool `json:"readonly,omitempty"`
|
||||
ReadOnly bool `json:"readonly,omitempty"`
|
||||
PermissiveCacheLoading bool `json:"permissiveCacheLoading,omitempty"`
|
||||
|
||||
// Description is human-readable description of the repository to use in the UI.
|
||||
Description string `json:"description,omitempty"`
|
||||
@@ -149,5 +153,9 @@ func LoadConfigFromFile(fileName string) (*LocalConfig, error) {
|
||||
}
|
||||
}
|
||||
|
||||
if lc.PermissiveCacheLoading && os.Getenv("KOPIA_UPGRADE_LOCK_ENABLED") == "" {
|
||||
return nil, errors.New("must have set KOPIA_UPGRADE_LOCK_ENABLED when connecting to repository with permissive cache loading")
|
||||
}
|
||||
|
||||
return &lc, nil
|
||||
}
|
||||
|
||||
64
repo/open.go
64
repo/open.go
@@ -85,9 +85,9 @@ type Options struct {
|
||||
// ErrAlreadyInitialized is returned when repository is already initialized in the provided storage.
|
||||
var ErrAlreadyInitialized = format.ErrAlreadyInitialized
|
||||
|
||||
// ErrRepositoryUnavailableDueToUpgrageInProgress is returned when repository
|
||||
// ErrRepositoryUnavailableDueToUpgradeInProgress is returned when repository
|
||||
// is undergoing upgrade that requires exclusive access.
|
||||
var ErrRepositoryUnavailableDueToUpgrageInProgress = errors.Errorf("repository upgrade in progress")
|
||||
var ErrRepositoryUnavailableDueToUpgradeInProgress = errors.Errorf("repository upgrade in progress")
|
||||
|
||||
// Open opens a Repository specified in the configuration file.
|
||||
func Open(ctx context.Context, configFile, password string, options *Options) (rep Repository, err error) {
|
||||
@@ -121,6 +121,10 @@ func Open(ctx context.Context, configFile, password string, options *Options) (r
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if lc.PermissiveCacheLoading && !lc.ReadOnly {
|
||||
return nil, ErrCannotWriteToRepoConnectionWithPermissiveCacheLoading
|
||||
}
|
||||
|
||||
if lc.APIServer != nil {
|
||||
return openAPIServer(ctx, lc.APIServer, lc.ClientOptions, lc.Caching, password, options)
|
||||
}
|
||||
@@ -235,8 +239,9 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor
|
||||
func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, password string, options *Options, cacheOpts *content.CachingOptions, configFile string) (DirectRepository, error) {
|
||||
cacheOpts = cacheOpts.CloneOrDefault()
|
||||
cmOpts := &content.ManagerOptions{
|
||||
TimeNow: defaultTime(options.TimeNowFunc),
|
||||
DisableInternalLog: options.DisableInternalLog,
|
||||
TimeNow: defaultTime(options.TimeNowFunc),
|
||||
DisableInternalLog: options.DisableInternalLog,
|
||||
PermissiveCacheLoading: cliOpts.PermissiveCacheLoading,
|
||||
}
|
||||
|
||||
mr := metrics.NewRegistry()
|
||||
@@ -247,25 +252,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
|
||||
return nil, errors.Wrap(ferr, "unable to create format manager")
|
||||
}
|
||||
|
||||
if _, err := retry.WithExponentialBackoffMaxRetries(ctx, -1, "wait for upgrade", func() (interface{}, error) {
|
||||
uli, err := fmgr.UpgradeLockIntent()
|
||||
if err != nil {
|
||||
//nolint:wrapcheck
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// retry if upgrade lock has been taken
|
||||
if locked, _ := uli.IsLocked(cmOpts.TimeNow()); locked && options.UpgradeOwnerID != uli.OwnerID {
|
||||
return nil, ErrRepositoryUnavailableDueToUpgrageInProgress
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}, func(internalErr error) bool {
|
||||
return !options.DoNotWaitForUpgrade && errors.Is(internalErr, ErrRepositoryUnavailableDueToUpgrageInProgress)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check features before and perform configuration before performing IO
|
||||
if err := handleMissingRequiredFeatures(ctx, fmgr, options.TestOnlyIgnoreMissingRequiredFeatures); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -307,8 +294,33 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
|
||||
st = wrapLockingStorage(st, blobcfg)
|
||||
}
|
||||
|
||||
// background/interleaving upgrade lock storage monitor
|
||||
st = upgradeLockMonitor(fmgr, options.UpgradeOwnerID, st, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures)
|
||||
_, err = retry.WithExponentialBackoffMaxRetries(ctx, -1, "wait for upgrade", func() (interface{}, error) {
|
||||
//nolint:govet
|
||||
uli, err := fmgr.UpgradeLockIntent()
|
||||
if err != nil {
|
||||
//nolint:wrapcheck
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// retry if upgrade lock has been taken
|
||||
if !cliOpts.PermissiveCacheLoading {
|
||||
if locked, _ := uli.IsLocked(cmOpts.TimeNow()); locked && options.UpgradeOwnerID != uli.OwnerID {
|
||||
return nil, ErrRepositoryUnavailableDueToUpgradeInProgress
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}, func(internalErr error) bool {
|
||||
return !options.DoNotWaitForUpgrade && errors.Is(internalErr, ErrRepositoryUnavailableDueToUpgradeInProgress)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !cliOpts.PermissiveCacheLoading {
|
||||
// background/interleaving upgrade lock storage monitor
|
||||
st = upgradeLockMonitor(fmgr, options.UpgradeOwnerID, st, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures)
|
||||
}
|
||||
|
||||
scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts, mr)
|
||||
if ferr != nil {
|
||||
@@ -456,7 +468,7 @@ func upgradeLockMonitor(
|
||||
if uli != nil {
|
||||
// only allow the upgrade owner to perform storage operations
|
||||
if locked, _ := uli.IsLocked(now()); locked && upgradeOwnerID != uli.OwnerID {
|
||||
return ErrRepositoryUnavailableDueToUpgrageInProgress
|
||||
return ErrRepositoryUnavailableDueToUpgradeInProgress
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/internal/uitask"
|
||||
"github.com/kopia/kopia/repo"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/filesystem"
|
||||
"github.com/kopia/kopia/snapshot"
|
||||
@@ -253,6 +254,9 @@ func TestServerStartAsyncRepoConnect(t *testing.T) {
|
||||
func TestServerCreateAndConnectViaAPI(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
//nolint:tenv
|
||||
os.Setenv("KOPIA_UPGRADE_LOCK_ENABLED", "true")
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
|
||||
runner := testenv.NewInProcRunner(t)
|
||||
@@ -309,6 +313,9 @@ func TestServerCreateAndConnectViaAPI(t *testing.T) {
|
||||
ConnectRepositoryRequest: serverapi.ConnectRepositoryRequest{
|
||||
Password: "foofoo",
|
||||
Storage: connInfo,
|
||||
ClientOptions: repo.ClientOptions{
|
||||
PermissiveCacheLoading: true,
|
||||
},
|
||||
},
|
||||
}); err != nil {
|
||||
t.Fatalf("create error: %v", err)
|
||||
|
||||
Reference in New Issue
Block a user