diff --git a/cli/command_repository_change_password.go b/cli/command_repository_change_password.go index e8e1b2424..adfd841f8 100644 --- a/cli/command_repository_change_password.go +++ b/cli/command_repository_change_password.go @@ -36,7 +36,7 @@ func (c *commandRepositoryChangePassword) run(ctx context.Context, rep repo.Dire newPass = c.newPassword } - if err := rep.ChangePassword(ctx, newPass); err != nil { + if err := rep.FormatManager().ChangePassword(ctx, newPass); err != nil { return errors.Wrap(err, "unable to change password") } diff --git a/cli/command_repository_set_parameters.go b/cli/command_repository_set_parameters.go index 6bbafed33..43048bde7 100644 --- a/cli/command_repository_set_parameters.go +++ b/cli/command_repository_set_parameters.go @@ -125,10 +125,17 @@ func (c *commandRepositorySetParameters) setRetentionModeParameter(ctx context.C func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.DirectRepositoryWriter) error { var anyChange bool - mp := rep.ContentReader().ContentFormat().Struct().MutableParameters - blobcfg := rep.BlobCfg() + mp, err := rep.FormatManager().GetMutableParameters() + if err != nil { + return errors.Wrap(err, "mutable parameters") + } - requiredFeatures, err := rep.RequiredFeatures() + blobcfg, err := rep.FormatManager().BlobCfgBlob() + if err != nil { + return errors.Wrap(err, "blob configuration") + } + + requiredFeatures, err := rep.FormatManager().RequiredFeatures() if err != nil { return errors.Wrap(err, "unable to get required features") } @@ -187,7 +194,7 @@ func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.Direc } } - if err := rep.SetParameters(ctx, mp, blobcfg, requiredFeatures); err != nil { + if err := rep.FormatManager().SetParameters(ctx, mp, blobcfg, requiredFeatures); err != nil { return errors.Wrap(err, "error setting parameters") } diff --git a/cli/command_repository_status.go b/cli/command_repository_status.go index 1b8cd2be2..aa12f1d20 100644 --- a/cli/command_repository_status.go +++ b/cli/command_repository_status.go @@ -63,9 +63,9 @@ func (c *commandRepositoryStatus) outputJSON(ctx context.Context, r repo.Reposit ci := dr.BlobReader().ConnectionInfo() s.UniqueIDHex = hex.EncodeToString(dr.UniqueID()) s.ObjectFormat = dr.ObjectFormat() - s.BlobRetention = dr.BlobCfg() - s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) //nolint:forcetypeassert - s.ContentFormat = scrubber.ScrubSensitiveData(reflect.ValueOf(dr.ContentReader().ContentFormat().Struct())).Interface().(format.ContentFormat) //nolint:forcetypeassert + s.BlobRetention, _ = dr.FormatManager().BlobCfgBlob() + s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) //nolint:forcetypeassert + s.ContentFormat = dr.FormatManager().ScrubbedContentFormat() switch cp, err := dr.BlobVolume().GetCapacity(ctx); { case err == nil: @@ -88,7 +88,7 @@ func (c *commandRepositoryStatus) dumpUpgradeStatus(ctx context.Context, dr repo return nil } - l, err := drw.GetUpgradeLockIntent(ctx) + l, err := drw.FormatManager().GetUpgradeLockIntent(ctx) if err != nil { return errors.Wrap(err, "failed to get the upgrade lock intent") } @@ -120,7 +120,7 @@ func (c *commandRepositoryStatus) dumpUpgradeStatus(ctx context.Context, dr repo } func (c *commandRepositoryStatus) dumpRetentionStatus(dr repo.DirectRepository) { - if blobcfg := dr.BlobCfg(); blobcfg.IsRetentionEnabled() { + if blobcfg, _ := dr.FormatManager().BlobCfgBlob(); blobcfg.IsRetentionEnabled() { c.out.printStdout("\n") c.out.printStdout("Blob retention mode: %s\n", blobcfg.RetentionMode) c.out.printStdout("Blob retention period: %s\n", blobcfg.RetentionPeriod) @@ -251,7 +251,7 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) } func (c *commandRepositoryStatus) outputRequiredFeatures(dr repo.DirectRepository) { - if req, _ := dr.RequiredFeatures(); len(req) > 0 { + if req, _ := dr.FormatManager().RequiredFeatures(); len(req) > 0 { var featureIDs []string for _, r := range req { diff --git a/cli/command_repository_upgrade.go b/cli/command_repository_upgrade.go index 5af370c17..98217d838 100644 --- a/cli/command_repository_upgrade.go +++ b/cli/command_repository_upgrade.go @@ -4,7 +4,6 @@ "context" "fmt" "os" - "path/filepath" "time" "github.com/alecthomas/kingpin" @@ -86,7 +85,7 @@ func (c *commandRepositoryUpgrade) forceRollbackAction(ctx context.Context, rep return errors.New("repository upgrade lock can only be revoked unsafely; please use the --force flag") } - if err := rep.RollbackUpgrade(ctx); err != nil { + if err := rep.FormatManager().RollbackUpgrade(ctx); err != nil { return errors.Wrap(err, "failed to rollback the upgrade") } @@ -141,7 +140,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D // Update format-blob and clear the cache. // This will fail if we have already upgraded. - l, err := rep.SetUpgradeLockIntent(ctx, *l) + l, err := rep.FormatManager().SetUpgradeLockIntent(ctx, *l) if err != nil { return errors.Wrap(err, "error setting the upgrade lock intent") } @@ -182,7 +181,7 @@ func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.D if mp.EpochParameters.Enabled { log(ctx).Infof("Repository indices have already been migrated to the epoch format, no need to drain other clients") - l, err := rep.GetUpgradeLockIntent(ctx) + l, err := rep.FormatManager().GetUpgradeLockIntent(ctx) if err != nil { return errors.Wrap(err, "failed to get upgrade lock intent") } @@ -224,28 +223,8 @@ func (c *commandRepositoryUpgrade) sleepWithContext(ctx context.Context, dur tim } func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo.DirectRepositoryWriter) error { - password, err := c.svc.getPasswordFromFlags(ctx, false, false) - if err != nil { - return errors.Wrap(err, "getting password") - } - - configFile, err := filepath.Abs(c.svc.repositoryConfigFileName()) - if err != nil { - return errors.Wrap(err, "error resolving config file path") - } - - lc, err := repo.LoadConfigFromFile(configFile) - if err != nil { - return errors.Wrapf(err, "error loading config file %q", configFile) - } - - cacheOpts := lc.Caching.CloneOrDefault() - for { - l, err := format.ReadAndCacheRepoUpgradeLock(ctx, rep.BlobStorage(), password, cacheOpts.CacheDirectory, -1) - if err != nil { - return errors.Wrap(err, "unable to reload the repository format blob") - } + l, err := rep.FormatManager().GetUpgradeLockIntent(ctx) upgradeTime := l.UpgradeTime() now := rep.Time() @@ -280,7 +259,7 @@ func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectR return errors.Wrap(mperr, "mutable parameters") } - rf, err := rep.RequiredFeatures() + rf, err := rep.FormatManager().RequiredFeatures() if err != nil { return errors.Wrap(err, "error getting repository features") } @@ -295,12 +274,17 @@ func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectR log(ctx).Infof("migrating current indices to epoch format") - if err := rep.ContentManager().PrepareUpgradeToIndexBlobManagerV1(ctx, mp.EpochParameters); err != nil { - return errors.Wrap(err, "error upgrading indices") + if uerr := rep.ContentManager().PrepareUpgradeToIndexBlobManagerV1(ctx, mp.EpochParameters); uerr != nil { + return errors.Wrap(uerr, "error upgrading indices") + } + + blobCfg, err := rep.FormatManager().BlobCfgBlob() + if err != nil { + return errors.Wrap(err, "error getting blob configuration") } // update format-blob and clear the cache - if err := rep.SetParameters(ctx, mp, rep.BlobCfg(), rf); err != nil { + if err := rep.FormatManager().SetParameters(ctx, mp, blobCfg, rf); err != nil { return errors.Wrap(err, "error setting parameters") } @@ -322,7 +306,7 @@ func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectR // cleanup and backups used for the rollback mechanism, so we cannot rollback // after this phase. func (c *commandRepositoryUpgrade) commitUpgrade(ctx context.Context, rep repo.DirectRepositoryWriter) error { - if err := rep.CommitUpgrade(ctx); err != nil { + if err := rep.FormatManager().CommitUpgrade(ctx); err != nil { return errors.Wrap(err, "error finalizing upgrade") } // we need to reopen the repository after this point diff --git a/repo/change_password.go b/repo/change_password.go deleted file mode 100644 index 60aa82c1b..000000000 --- a/repo/change_password.go +++ /dev/null @@ -1,58 +0,0 @@ -package repo - -import ( - "context" - "os" - "path/filepath" - - "github.com/pkg/errors" - - "github.com/kopia/kopia/repo/format" -) - -// ChangePassword changes the repository password and rewrites -// `kopia.repository` & `kopia.blobcfg`. -func (r *directRepository) ChangePassword(ctx context.Context, newPassword string) error { - f := r.formatBlob - - repoConfig, err := f.DecryptRepositoryConfig(r.formatEncryptionKey) - if err != nil { - return errors.Wrap(err, "unable to decrypt repository config") - } - - if !repoConfig.EnablePasswordChange { - return errors.Errorf("password changes are not supported for repositories created using Kopia v0.8 or older") - } - - newFormatEncryptionKey, err := f.DeriveFormatEncryptionKeyFromPassword(newPassword) - if err != nil { - return errors.Wrap(err, "unable to derive master key") - } - - r.formatEncryptionKey = newFormatEncryptionKey - - if err := f.EncryptRepositoryConfig(repoConfig, newFormatEncryptionKey); err != nil { - return errors.Wrap(err, "unable to encrypt format bytes") - } - - if err := f.WriteBlobCfgBlob(ctx, r.blobs, r.blobCfgBlob, newFormatEncryptionKey); err != nil { - return errors.Wrap(err, "unable to write blobcfg blob") - } - - if err := f.WriteKopiaRepositoryBlob(ctx, r.blobs, r.blobCfgBlob); err != nil { - return errors.Wrap(err, "unable to write format blob") - } - - // remove cached kopia.repository blob. - if cd := r.cachingOptions.CacheDirectory; cd != "" { - if err := os.Remove(filepath.Join(cd, format.KopiaRepositoryBlobID)); err != nil { - log(ctx).Errorf("unable to remove %s: %v", format.KopiaRepositoryBlobID, err) - } - - if err := os.Remove(filepath.Join(cd, format.KopiaBlobCfgBlobID)); err != nil && !os.IsNotExist(err) { - log(ctx).Errorf("unable to remove %s: %v", format.KopiaBlobCfgBlobID, err) - } - } - - return nil -} diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 634a7ff66..2459882f2 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -752,7 +752,12 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr return nil, errors.Wrap(err, "unable to read crypto bytes") } - b.Append(bm.format.RepositoryFormatBytes()) + suffix, berr := bm.format.RepositoryFormatBytes() + if berr != nil { + return nil, errors.Wrap(berr, "format bytes") + } + + b.Append(suffix) //nolint:gosec if err := writeRandomBytesToBuffer(b, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength); err != nil { @@ -941,11 +946,10 @@ func (bm *WriteManager) MetadataCache() cache.ContentCache { // ManagerOptions are the optional parameters for manager creation. type ManagerOptions struct { - RepositoryFormatBytes []byte - TimeNow func() time.Time // Time provider - DisableInternalLog bool - RetentionMode string - RetentionPeriod time.Duration + TimeNow func() time.Time // Time provider + DisableInternalLog bool + RetentionMode string + RetentionPeriod time.Duration } // CloneOrDefault returns a clone of provided ManagerOptions or default empty struct if nil. diff --git a/repo/format/blobcfg_blob.go b/repo/format/blobcfg_blob.go index da248481a..31833513f 100644 --- a/repo/format/blobcfg_blob.go +++ b/repo/format/blobcfg_blob.go @@ -59,8 +59,8 @@ func serializeBlobCfgBytes(f *KopiaRepositoryJSON, r BlobStorageConfiguration, f } } -// DeserializeBlobCfgBytes decrypts and deserializes the given bytes into BlobStorageConfiguration. -func (f *KopiaRepositoryJSON) DeserializeBlobCfgBytes(encryptedBlobCfgBytes, formatEncryptionKey []byte) (BlobStorageConfiguration, error) { +// deserializeBlobCfgBytes decrypts and deserializes the given bytes into BlobStorageConfiguration. +func deserializeBlobCfgBytes(j *KopiaRepositoryJSON, encryptedBlobCfgBytes, formatEncryptionKey []byte) (BlobStorageConfiguration, error) { var ( plainText []byte r BlobStorageConfiguration @@ -71,18 +71,18 @@ func (f *KopiaRepositoryJSON) DeserializeBlobCfgBytes(encryptedBlobCfgBytes, for return r, nil } - switch f.EncryptionAlgorithm { + switch j.EncryptionAlgorithm { case "NONE": // do nothing plainText = encryptedBlobCfgBytes case aes256GcmEncryption: - plainText, err = decryptRepositoryBlobBytesAes256Gcm(encryptedBlobCfgBytes, formatEncryptionKey, f.UniqueID) + plainText, err = decryptRepositoryBlobBytesAes256Gcm(encryptedBlobCfgBytes, formatEncryptionKey, j.UniqueID) if err != nil { return BlobStorageConfiguration{}, errors.Errorf("unable to decrypt repository blobcfg blob") } default: - return BlobStorageConfiguration{}, errors.Errorf("unknown encryption algorithm: '%v'", f.EncryptionAlgorithm) + return BlobStorageConfiguration{}, errors.Errorf("unknown encryption algorithm: '%v'", j.EncryptionAlgorithm) } if err = json.Unmarshal(plainText, &r); err != nil { diff --git a/repo/format/format_blob.go b/repo/format/format_blob.go index b38b4e7fb..9ce6ba43a 100644 --- a/repo/format/format_blob.go +++ b/repo/format/format_blob.go @@ -33,7 +33,7 @@ const KopiaRepositoryBlobID = "kopia.repository" // ErrInvalidPassword is returned when repository password is invalid. -var ErrInvalidPassword = errors.Errorf("invalid repository password") +var ErrInvalidPassword = errors.Errorf("invalid repository password") // +checklocksignore //nolint:gochecknoglobals var ( diff --git a/repo/format/format_blob_cache.go b/repo/format/format_blob_cache.go index b458f9f45..6563ef442 100644 --- a/repo/format/format_blob_cache.go +++ b/repo/format/format_blob_cache.go @@ -1,9 +1,11 @@ package format import ( + "bytes" "context" "os" "path/filepath" + "sync" "time" "github.com/pkg/errors" @@ -12,133 +14,163 @@ "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/cachedir" "github.com/kopia/kopia/internal/clock" - "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/blob" - "github.com/kopia/kopia/repo/logging" ) // DefaultRepositoryBlobCacheDuration is the duration for which we treat cached kopia.repository // as valid. const DefaultRepositoryBlobCacheDuration = 15 * time.Minute -var log = logging.Module("kopia/repo/format") - -func formatBytesCachingEnabled(cacheDirectory string, validDuration time.Duration) bool { - if cacheDirectory == "" { - return false - } - - return validDuration > 0 +// blobCache encapsulates cache for format blobs. +// Note that the cache only stores very small number of blobs at the root of the repository, +// usually 1 or 2. +type blobCache interface { + Get(ctx context.Context, blobID blob.ID) ([]byte, time.Time, bool) + Put(ctx context.Context, blobID blob.ID, data []byte) (time.Time, error) + Remove(ctx context.Context, ids []blob.ID) } -func readRepositoryBlobBytesFromCache(ctx context.Context, cachedFile string, validDuration time.Duration) (data []byte, cacheMTime time.Time, err error) { +type nullCache struct{} + +func (nullCache) Get(ctx context.Context, blobID blob.ID) ([]byte, time.Time, bool) { + return nil, time.Time{}, false +} + +func (nullCache) Put(ctx context.Context, blobID blob.ID, data []byte) (time.Time, error) { + return clock.Now(), nil +} + +func (nullCache) Remove(ctx context.Context, ids []blob.ID) { +} + +type inMemoryCache struct { + timeNow func() time.Time // +checklocksignore + + mu sync.Mutex + // +checklocks:mu + data map[blob.ID][]byte + // +checklocks:mu + times map[blob.ID]time.Time +} + +func (c *inMemoryCache) Get(ctx context.Context, blobID blob.ID) ([]byte, time.Time, bool) { + c.mu.Lock() + defer c.mu.Unlock() + + data, ok := c.data[blobID] + if ok { + return data, c.times[blobID], true + } + + return nil, time.Time{}, false +} + +func (c *inMemoryCache) Put(ctx context.Context, blobID blob.ID, data []byte) (time.Time, error) { + c.mu.Lock() + defer c.mu.Unlock() + + c.data[blobID] = data + c.times[blobID] = c.timeNow() + + return c.times[blobID], nil +} + +func (c *inMemoryCache) Remove(ctx context.Context, ids []blob.ID) { + c.mu.Lock() + defer c.mu.Unlock() + + for _, blobID := range ids { + delete(c.data, blobID) + delete(c.times, blobID) + } +} + +type onDiskCache struct { + cacheDirectory string +} + +func (c *onDiskCache) Get(ctx context.Context, blobID blob.ID) ([]byte, time.Time, bool) { + cachedFile := filepath.Join(c.cacheDirectory, string(blobID)) + cst, err := os.Stat(cachedFile) if err != nil { - return nil, time.Time{}, errors.Wrap(err, "unable to open cache file") + return nil, time.Time{}, false } - cacheMTime = cst.ModTime() - if clock.Now().Sub(cacheMTime) > validDuration { - // got cached file, but it's too old, remove it - if err = os.Remove(cachedFile); err != nil { - log(ctx).Debugf("unable to remove cache file: %v", err) - } + cacheMTime := cst.ModTime() - return nil, time.Time{}, errors.Errorf("cached file too old") - } + //nolint:gosec + data, err := os.ReadFile(cachedFile) - data, err = os.ReadFile(cachedFile) //nolint:gosec - if err != nil { - return nil, time.Time{}, errors.Wrapf(err, "failed to read the cache file %q", cachedFile) - } - - return data, cacheMTime, nil + return data, cacheMTime, err == nil } -// ReadAndCacheRepositoryBlobBytes reads the provided blob from the repository or cache directory. -func ReadAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cacheDirectory, blobID string, validDuration time.Duration) ([]byte, time.Time, error) { - cachedFile := filepath.Join(cacheDirectory, blobID) +func (c *onDiskCache) Put(ctx context.Context, blobID blob.ID, data []byte) (time.Time, error) { + cachedFile := filepath.Join(c.cacheDirectory, string(blobID)) - if validDuration == 0 { - validDuration = DefaultRepositoryBlobCacheDuration - } + // optimistically assume cache directory exist, create it if not + if err := atomicfile.Write(cachedFile, bytes.NewReader(data)); err != nil { + if err := os.MkdirAll(c.cacheDirectory, cache.DirMode); err != nil && !os.IsExist(err) { + return time.Time{}, errors.Wrap(err, "unable to create cache directory") + } - if cacheDirectory != "" { - if err := os.MkdirAll(cacheDirectory, cache.DirMode); err != nil && !os.IsExist(err) { - log(ctx).Errorf("unable to create cache directory: %v", err) + if err := cachedir.WriteCacheMarker(c.cacheDirectory); err != nil { + return time.Time{}, errors.Wrap(err, "unable to write cache directory marker") + } + + if err := atomicfile.Write(cachedFile, bytes.NewReader(data)); err != nil { + return time.Time{}, errors.Wrapf(err, "unable to write to cache: %v", string(blobID)) } } - cacheEnabled := formatBytesCachingEnabled(cacheDirectory, validDuration) - if cacheEnabled { - data, cacheMTime, err := readRepositoryBlobBytesFromCache(ctx, cachedFile, validDuration) - if err == nil { - log(ctx).Debugf("%s retrieved from cache", blobID) - - return data, cacheMTime, nil - } - - if os.IsNotExist(err) { - log(ctx).Debugf("%s could not be fetched from cache: %v", blobID, err) - } - } else { - log(ctx).Debugf("%s cache not enabled", blobID) + cst, err := os.Stat(cachedFile) + if err != nil { + return time.Time{}, errors.Wrap(err, "unable to open cache file") } - var b gather.WriteBuffer - defer b.Close() - - if err := st.GetBlob(ctx, blob.ID(blobID), 0, -1, &b); err != nil { - return nil, time.Time{}, errors.Wrapf(err, "error getting %s blob", blobID) - } - - if cacheEnabled { - if err := atomicfile.Write(cachedFile, b.Bytes().Reader()); err != nil { - log(ctx).Warnf("unable to write cache: %v", err) - } - } - - return b.ToByteSlice(), clock.Now(), nil + return cst.ModTime(), nil } -// ReadAndCacheDecodedRepositoryConfig reads `kopia.repository` blob, potentially from cache and decodes it. -func ReadAndCacheDecodedRepositoryConfig(ctx context.Context, st blob.Storage, password, cacheDir string, validDuration time.Duration) (ufb *DecodedRepositoryConfig, err error) { - ufb = &DecodedRepositoryConfig{} +func (c *onDiskCache) Remove(ctx context.Context, ids []blob.ID) { + for _, blobID := range ids { + fname := filepath.Join(c.cacheDirectory, string(blobID)) + log(ctx).Infof("deleting %v", fname) - ufb.KopiaRepositoryBytes, ufb.CacheMTime, err = ReadAndCacheRepositoryBlobBytes(ctx, st, cacheDir, KopiaRepositoryBlobID, validDuration) - if err != nil { - return nil, errors.Wrap(err, "unable to read format blob") + if err := os.Remove(fname); err != nil && !os.IsNotExist(err) { + log(ctx).Debugf("unable to remove cached repository format blob: %v", err) + } } - - if err = cachedir.WriteCacheMarker(cacheDir); err != nil { - return nil, errors.Wrap(err, "unable to write cache directory marker") - } - - ufb.KopiaRepository, err = ParseKopiaRepositoryJSON(ufb.KopiaRepositoryBytes) - if err != nil { - return nil, errors.Wrap(err, "can't parse format blob") - } - - ufb.KopiaRepositoryBytes, err = addFormatBlobChecksumAndLength(ufb.KopiaRepositoryBytes) - if err != nil { - return nil, errors.Errorf("unable to add checksum") - } - - ufb.FormatEncryptionKey, err = ufb.KopiaRepository.DeriveFormatEncryptionKeyFromPassword(password) - if err != nil { - return nil, err - } - - ufb.RepoConfig, err = ufb.KopiaRepository.DecryptRepositoryConfig(ufb.FormatEncryptionKey) - if err != nil { - return nil, ErrInvalidPassword - } - - return ufb, nil } -// ReadAndCacheRepoUpgradeLock loads the lock config from cache and returns it. -func ReadAndCacheRepoUpgradeLock(ctx context.Context, st blob.Storage, password, cacheDir string, validDuration time.Duration) (*UpgradeLockIntent, error) { - ufb, err := ReadAndCacheDecodedRepositoryConfig(ctx, st, password, cacheDir, validDuration) - return ufb.RepoConfig.UpgradeLock, err +// NewDiskCache returns on-disk blob cache. +func NewDiskCache(cacheDir string) blobCache { + return &onDiskCache{cacheDir} } + +// NewMemoryBlobCache returns in-memory blob cache. +func NewMemoryBlobCache(timeNow func() time.Time) blobCache { + return &inMemoryCache{ + timeNow: timeNow, + data: map[blob.ID][]byte{}, + times: map[blob.ID]time.Time{}, + } +} + +// NewFormatBlobCache creates an implementationof blobCache for particular cache settings. +func NewFormatBlobCache(cacheDir string, validDuration time.Duration, timeNow func() time.Time) blobCache { + if cacheDir != "" { + return NewDiskCache(cacheDir) + } + + if validDuration > 0 { + return NewMemoryBlobCache(timeNow) + } + + return &nullCache{} +} + +var ( + _ blobCache = (*nullCache)(nil) + _ blobCache = (*inMemoryCache)(nil) + _ blobCache = (*onDiskCache)(nil) +) diff --git a/repo/format/format_blob_cache_test.go b/repo/format/format_blob_cache_test.go new file mode 100644 index 000000000..105b005ef --- /dev/null +++ b/repo/format/format_blob_cache_test.go @@ -0,0 +1,98 @@ +package format + +import ( + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/clock" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/repo/blob" +) + +func TestFormatBlobCache(t *testing.T) { + tempdir1 := testutil.TempDirectory(t) + tempdir2 := filepath.Join(testutil.TempDirectory(t), "subdir") + + cases := []struct { + desc string + fbc blobCache + isDurable bool + }{ + {"NullCache", NewFormatBlobCache("", -1, clock.Now), false}, + {"DiskCache-Exists", NewFormatBlobCache(tempdir1, -1, clock.Now), true}, + {"DiskCache-NotExists", NewFormatBlobCache(tempdir2, -1, clock.Now), true}, + {"MemoryCache", NewFormatBlobCache("", 10*time.Second, clock.Now), true}, + } + + t.Run("Cases", func(t *testing.T) { + for _, tc := range cases { + tc := tc + + t.Run(tc.desc, func(t *testing.T) { + t.Parallel() + + ctx := testlogging.Context(t) + + v, mtime, ok := tc.fbc.Get(ctx, "blob1") + require.False(t, ok) + require.Nil(t, v) + require.Zero(t, mtime) + + mtime, err := tc.fbc.Put(ctx, "blob1", []byte{1, 2, 3}) + require.NoError(t, err) + require.NotZero(t, mtime) + + _, err = tc.fbc.Put(ctx, "blob2", []byte{3, 4, 5}) + require.NoError(t, err) + + v2, mtime2, ok := tc.fbc.Get(ctx, "blob1") + + if tc.isDurable { + require.True(t, ok) + require.Equal(t, []byte{1, 2, 3}, v2) + require.Equal(t, mtime, mtime2) + } else { + require.False(t, ok) + require.Nil(t, v) + require.Zero(t, mtime2) + } + + time.Sleep(3 * time.Second) + + // overwrite + mtime3, err := tc.fbc.Put(ctx, "blob1", []byte{2, 3, 4}) + require.NoError(t, err) + require.Greater(t, mtime3, mtime2) + + v2, mtime4, ok := tc.fbc.Get(ctx, "blob1") + + if tc.isDurable { + require.True(t, ok) + require.Equal(t, []byte{2, 3, 4}, v2) + require.Equal(t, mtime3, mtime4) + } else { + require.False(t, ok) + require.Nil(t, v2) + require.Zero(t, mtime4) + } + + tc.fbc.Remove(ctx, []blob.ID{"blob1"}) + + v3, mtime5, ok := tc.fbc.Get(ctx, "blob1") + + require.False(t, ok) + require.Nil(t, v3) + require.Zero(t, mtime5) + }) + } + }) + + require.NoFileExists(t, filepath.Join(tempdir1, "blob1")) + require.NoFileExists(t, filepath.Join(tempdir2, "blob1")) + require.FileExists(t, filepath.Join(tempdir1, "blob2")) + require.FileExists(t, filepath.Join(tempdir2, "blob2")) +} diff --git a/repo/format/format_change_password.go b/repo/format/format_change_password.go new file mode 100644 index 000000000..28927bd7d --- /dev/null +++ b/repo/format/format_change_password.go @@ -0,0 +1,44 @@ +package format + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo/blob" +) + +// ChangePassword changes the repository password and rewrites +// `kopia.repository` & `kopia.blobcfg`. +func (m *Manager) ChangePassword(ctx context.Context, newPassword string) error { + m.mu.Lock() + defer m.mu.Unlock() + + if !m.repoConfig.EnablePasswordChange { + return errors.Errorf("password changes are not supported for repositories created using Kopia v0.8 or older") + } + + newFormatEncryptionKey, err := m.j.DeriveFormatEncryptionKeyFromPassword(newPassword) + if err != nil { + return errors.Wrap(err, "unable to derive master key") + } + + m.formatEncryptionKey = newFormatEncryptionKey + m.password = newPassword + + if err := m.j.EncryptRepositoryConfig(m.repoConfig, newFormatEncryptionKey); err != nil { + return errors.Wrap(err, "unable to encrypt format bytes") + } + + if err := m.j.WriteBlobCfgBlob(ctx, m.blobs, m.blobCfgBlob, newFormatEncryptionKey); err != nil { + return errors.Wrap(err, "unable to write blobcfg blob") + } + + if err := m.j.WriteKopiaRepositoryBlob(ctx, m.blobs, m.blobCfgBlob); err != nil { + return errors.Wrap(err, "unable to write format blob") + } + + m.cache.Remove(ctx, []blob.ID{KopiaRepositoryBlobID, KopiaBlobCfgBlobID}) + + return nil +} diff --git a/repo/format/format_manager.go b/repo/format/format_manager.go new file mode 100644 index 000000000..1b78ad6dc --- /dev/null +++ b/repo/format/format_manager.go @@ -0,0 +1,492 @@ +package format + +import ( + "context" + "crypto/rand" + "io" + "sync" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/feature" + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/hashing" + "github.com/kopia/kopia/repo/logging" +) + +var log = logging.Module("kopia/repo/format") + +// UniqueIDLengthBytes is the length of random unique ID of each repository. +const UniqueIDLengthBytes = 32 + +// Manager manages the contents of `kopia.repository` and `kopia.blobcfg`. +type Manager struct { + //nolint:containedctx + ctx context.Context // +checklocksignore + blobs blob.Storage // +checklocksignore + validDuration time.Duration // +checklocksignore + password string // +checklocksignore + cache blobCache // +checklocksignore + + // provider for immutable parts of the format data, used to avoid locks. + immutable Provider + + timeNow func() time.Time // +checklocksignore + + // all the stuff protected by a mutex is valid until `validUntil` + mu sync.RWMutex + // +checklocks:mu + formatEncryptionKey []byte + // +checklocks:mu + j *KopiaRepositoryJSON + // +checklocks:mu + repoConfig *RepositoryConfig + // +checklocks:mu + blobCfgBlob BlobStorageConfiguration + // +checklocks:mu + current Provider + // +checklocks:mu + validUntil time.Time + // +checklocks:mu + loadedTime time.Time + // +checklocks:mu + refreshCounter int + // +checklocks:mu + ignoreCacheOnFirstRefresh bool +} + +func (m *Manager) getOrRefreshFormat() (Provider, error) { + if err := m.maybeRefreshNotLocked(); err != nil { + return nil, err + } + + m.mu.RLock() + defer m.mu.RUnlock() + + return m.current, nil +} + +func (m *Manager) maybeRefreshNotLocked() error { + m.mu.RLock() + val := m.validUntil + m.mu.RUnlock() + + if m.timeNow().Before(val) { + return nil + } + + // current format not valid anymore, kick off a refresh + return m.refresh(m.ctx) +} + +// readAndCacheRepositoryBlobBytes reads the provided blob from the repository or cache directory. +// +checklocks:m.mu +func (m *Manager) readAndCacheRepositoryBlobBytes(ctx context.Context, blobID blob.ID) ([]byte, time.Time, error) { + if !m.ignoreCacheOnFirstRefresh { + if data, mtime, ok := m.cache.Get(ctx, blobID); ok { + // read from cache and still valid + age := m.timeNow().Sub(mtime) + + if age < m.validDuration { + return data, mtime, nil + } + } + } + + var b gather.WriteBuffer + defer b.Close() + + if err := m.blobs.GetBlob(ctx, blobID, 0, -1, &b); err != nil { + return nil, time.Time{}, errors.Wrapf(err, "error getting %s blob", blobID) + } + + data := b.ToByteSlice() + + mtime, err := m.cache.Put(ctx, blobID, data) + + return data, mtime, errors.Wrapf(err, "error adding %s blob", blobID) +} + +// ValidCacheDuration returns the duration for which each blob in the cache is valid. +func (m *Manager) ValidCacheDuration() time.Duration { + return m.validDuration +} + +// RefreshCount returns the number of time the format has been refreshed. +func (m *Manager) RefreshCount() int { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.refreshCounter +} + +// refresh reads `kopia.repository` blob, potentially from cache and decodes it. +func (m *Manager) refresh(ctx context.Context) error { + m.mu.Lock() + defer m.mu.Unlock() + + b, cacheMTime, err := m.readAndCacheRepositoryBlobBytes(ctx, KopiaRepositoryBlobID) + if err != nil { + return errors.Wrap(err, "unable to read format blob") + } + + j, err := ParseKopiaRepositoryJSON(b) + if err != nil { + return errors.Wrap(err, "can't parse format blob") + } + + b, err = addFormatBlobChecksumAndLength(b) + if err != nil { + return errors.Errorf("unable to add checksum") + } + + var formatEncryptionKey []byte + + // try decrypting using old key, if present to avoid deriving it, which is expensive + repoConfig, err := j.decryptRepositoryConfig(m.formatEncryptionKey) + if err == nil { + // still valid, no need to derive + formatEncryptionKey = m.formatEncryptionKey + } else { + formatEncryptionKey, err = j.DeriveFormatEncryptionKeyFromPassword(m.password) + if err != nil { + return errors.Wrap(err, "derive format encryption key") + } + + repoConfig, err = j.decryptRepositoryConfig(formatEncryptionKey) + if err != nil { + return ErrInvalidPassword + } + } + + var blobCfg BlobStorageConfiguration + + if b2, _, err2 := m.readAndCacheRepositoryBlobBytes(ctx, KopiaBlobCfgBlobID); err2 == nil { + var e2 error + + blobCfg, e2 = deserializeBlobCfgBytes(j, b2, formatEncryptionKey) + if e2 != nil { + return errors.Wrap(e2, "deserialize blob config") + } + } else if !errors.Is(err2, blob.ErrBlobNotFound) { + return errors.Wrap(err2, "load blob config") + } + + prov, err := NewFormattingOptionsProvider(&repoConfig.ContentFormat, b) + if err != nil { + return errors.Wrap(err, "error creating format provider") + } + + m.current = prov + m.j = j + m.repoConfig = repoConfig + m.validUntil = cacheMTime.Add(m.validDuration) + m.formatEncryptionKey = formatEncryptionKey + m.loadedTime = cacheMTime + m.blobCfgBlob = blobCfg + m.ignoreCacheOnFirstRefresh = false + + if m.immutable == nil { + // on first refresh, set `immutable`` + m.immutable = prov + } + + m.refreshCounter++ + + return nil +} + +// GetEncryptionAlgorithm returns the encryption algorithm. +func (m *Manager) GetEncryptionAlgorithm() string { + return m.immutable.GetEncryptionAlgorithm() +} + +// GetHashFunction returns the hash function. +func (m *Manager) GetHashFunction() string { + return m.immutable.GetHashFunction() +} + +// GetECCAlgorithm returns the ECC algorithm. +func (m *Manager) GetECCAlgorithm() string { + return m.immutable.GetECCAlgorithm() +} + +// GetECCOverheadPercent returns the ECC overhead percent. +func (m *Manager) GetECCOverheadPercent() int { + return m.immutable.GetECCOverheadPercent() +} + +// GetHmacSecret returns the HMAC function. +func (m *Manager) GetHmacSecret() []byte { + return m.immutable.GetHmacSecret() +} + +// HashFunc returns the resolved hash function. +func (m *Manager) HashFunc() hashing.HashFunc { + return m.immutable.HashFunc() +} + +// Encryptor returns the resolved encryptor. +func (m *Manager) Encryptor() encryption.Encryptor { + return m.immutable.Encryptor() +} + +// GetMasterKey gets the master key. +func (m *Manager) GetMasterKey() []byte { + return m.immutable.GetMasterKey() +} + +// SupportsPasswordChange returns true if the repository supports password change. +func (m *Manager) SupportsPasswordChange() bool { + return m.immutable.SupportsPasswordChange() +} + +// RepositoryFormatBytes returns the bytes of `kopia.repository` blob. +// This function blocks to refresh the format blob if necessary. +func (m *Manager) RepositoryFormatBytes() ([]byte, error) { + f, err := m.getOrRefreshFormat() + if err != nil { + return nil, err + } + + //nolint:wrapcheck + return f.RepositoryFormatBytes() +} + +// GetMutableParameters gets mutable paramers of the repository. +// This function blocks to refresh the format blob if necessary. +func (m *Manager) GetMutableParameters() (MutableParameters, error) { + f, err := m.getOrRefreshFormat() + if err != nil { + return MutableParameters{}, err + } + + //nolint:wrapcheck + return f.GetMutableParameters() +} + +// UpgradeLockIntent returns the current lock intent. +func (m *Manager) UpgradeLockIntent() (*UpgradeLockIntent, error) { + if err := m.maybeRefreshNotLocked(); err != nil { + return nil, err + } + + m.mu.RLock() + defer m.mu.RUnlock() + + return m.repoConfig.UpgradeLock.Clone(), nil +} + +// RequiredFeatures returns the list of features required to open the repository. +func (m *Manager) RequiredFeatures() ([]feature.Required, error) { + if err := m.maybeRefreshNotLocked(); err != nil { + return nil, err + } + + m.mu.RLock() + defer m.mu.RUnlock() + + return m.repoConfig.RequiredFeatures, nil +} + +// LoadedTime gets the time when the config was last reloaded. +func (m *Manager) LoadedTime() time.Time { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.loadedTime +} + +// updateRepoConfigLocked updates repository config and rewrites kopia.repository blob. +// +checklocks:m.mu +func (m *Manager) updateRepoConfigLocked(ctx context.Context) error { + if err := m.j.EncryptRepositoryConfig(m.repoConfig, m.formatEncryptionKey); err != nil { + return errors.Errorf("unable to encrypt format bytes") + } + + if err := m.j.WriteKopiaRepositoryBlob(ctx, m.blobs, m.blobCfgBlob); err != nil { + return errors.Wrap(err, "unable to write format blob") + } + + m.cache.Remove(ctx, []blob.ID{KopiaRepositoryBlobID}) + + return nil +} + +// UniqueID gets the unique ID of a repository allocated at creation time. +func (m *Manager) UniqueID() []byte { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.j.UniqueID +} + +// BlobCfgBlob gets the BlobStorageConfiguration. +func (m *Manager) BlobCfgBlob() (BlobStorageConfiguration, error) { + if err := m.maybeRefreshNotLocked(); err != nil { + return BlobStorageConfiguration{}, err + } + + m.mu.RLock() + defer m.mu.RUnlock() + + return m.blobCfgBlob, nil +} + +// ObjectFormat gets the object format. +func (m *Manager) ObjectFormat() ObjectFormat { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.repoConfig.ObjectFormat +} + +// FormatEncryptionKey gets the format encryption key derived from the password. +func (m *Manager) FormatEncryptionKey() []byte { + m.mu.RLock() + defer m.mu.RUnlock() + + return m.formatEncryptionKey +} + +// ScrubbedContentFormat returns scrubbed content format with all sensitive data replaced. +func (m *Manager) ScrubbedContentFormat() ContentFormat { + m.mu.RLock() + defer m.mu.RUnlock() + + cf := m.repoConfig.ContentFormat + cf.MasterKey = nil + cf.HMACSecret = nil + + return cf +} + +// NewManager creates new format manager which automatically refreshes format blob on reads (in a blocking manner). +func NewManager( + ctx context.Context, + st blob.Storage, + cacheDir string, + validDuration time.Duration, + password string, + timeNow func() time.Time, +) (*Manager, error) { + return NewManagerWithCache(ctx, st, validDuration, password, timeNow, NewFormatBlobCache(cacheDir, validDuration, timeNow)) +} + +// NewManagerWithCache creates new format manager which automatically refreshes format blob on reads (in a blocking manner) +// and uses the provided cache. +func NewManagerWithCache( + ctx context.Context, + st blob.Storage, + validDuration time.Duration, + password string, + timeNow func() time.Time, + cache blobCache, +) (*Manager, error) { + var ignoreCacheOnFirstRefresh bool + + if validDuration < 0 { + // valid duration less than zero indicates we want to skip cache on first access + validDuration = DefaultRepositoryBlobCacheDuration + ignoreCacheOnFirstRefresh = true + } + + if validDuration > DefaultRepositoryBlobCacheDuration { + log(ctx).Infof("repository format cache duration capped at %v", DefaultRepositoryBlobCacheDuration) + + validDuration = DefaultRepositoryBlobCacheDuration + } + + m := &Manager{ + ctx: ctx, + blobs: st, + validDuration: validDuration, + password: password, + cache: cache, + timeNow: timeNow, + ignoreCacheOnFirstRefresh: ignoreCacheOnFirstRefresh, + } + + err := m.refresh(ctx) + + return m, err +} + +// ErrAlreadyInitialized indicates that repository has already been initialized. +var ErrAlreadyInitialized = errors.Errorf("repository already initialized") + +// Initialize initializes the format blob in a given storage. +func Initialize(ctx context.Context, st blob.Storage, formatBlob *KopiaRepositoryJSON, repoConfig *RepositoryConfig, blobcfg BlobStorageConfiguration, password string) error { + // get the blob - expect ErrNotFound + var tmp gather.WriteBuffer + defer tmp.Close() + + err := st.GetBlob(ctx, KopiaRepositoryBlobID, 0, -1, &tmp) + if err == nil { + return ErrAlreadyInitialized + } + + if !errors.Is(err, blob.ErrBlobNotFound) { + return errors.Wrap(err, "unexpected error when checking for format blob") + } + + err = st.GetBlob(ctx, KopiaBlobCfgBlobID, 0, -1, &tmp) + if err == nil { + return errors.Errorf("possible corruption: blobcfg blob exists, but format blob is not found") + } + + if !errors.Is(err, blob.ErrBlobNotFound) { + return errors.Wrap(err, "unexpected error when checking for blobcfg blob") + } + + if formatBlob.EncryptionAlgorithm == "" { + formatBlob.EncryptionAlgorithm = DefaultFormatEncryption + } + + if formatBlob.KeyDerivationAlgorithm == "" { + formatBlob.KeyDerivationAlgorithm = DefaultKeyDerivationAlgorithm + } + + if len(formatBlob.UniqueID) == 0 { + formatBlob.UniqueID = randomBytes(UniqueIDLengthBytes) + } + + formatEncryptionKey, err := formatBlob.DeriveFormatEncryptionKeyFromPassword(password) + if err != nil { + return errors.Wrap(err, "unable to derive format encryption key") + } + + if err = repoConfig.MutableParameters.Validate(); err != nil { + return errors.Wrap(err, "invalid parameters") + } + + if err = blobcfg.Validate(); err != nil { + return errors.Wrap(err, "blob config") + } + + if err = formatBlob.EncryptRepositoryConfig(repoConfig, formatEncryptionKey); err != nil { + return errors.Wrap(err, "unable to encrypt format bytes") + } + + if err := formatBlob.WriteBlobCfgBlob(ctx, st, blobcfg, formatEncryptionKey); err != nil { + return errors.Wrap(err, "unable to write blobcfg blob") + } + + if err := formatBlob.WriteKopiaRepositoryBlob(ctx, st, blobcfg); err != nil { + return errors.Wrap(err, "unable to write format blob") + } + + return nil +} + +var _ Provider = (*Manager)(nil) + +func randomBytes(n int) []byte { + b := make([]byte, n) + io.ReadFull(rand.Reader, b) //nolint:errcheck + + return b +} diff --git a/repo/format/format_manager_test.go b/repo/format/format_manager_test.go new file mode 100644 index 000000000..2b0c1b92a --- /dev/null +++ b/repo/format/format_manager_test.go @@ -0,0 +1,309 @@ +package format_test + +import ( + "bytes" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/internal/blobtesting" + "github.com/kopia/kopia/internal/epoch" + "github.com/kopia/kopia/internal/faketime" + "github.com/kopia/kopia/internal/feature" + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/internal/testlogging" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/format" + "github.com/kopia/kopia/repo/hashing" +) + +var ( + errSomeError = errors.Errorf("some error") + + cf = format.ContentFormat{ + MutableParameters: format.MutableParameters{ + Version: format.FormatVersion1, + EpochParameters: epoch.DefaultParameters(), + MaxPackSize: 20e6, + IndexVersion: 2, + }, + Hash: hashing.DefaultAlgorithm, + Encryption: encryption.DefaultAlgorithm, + HMACSecret: []byte{1, 2, 3, 4, 5}, + } + + uli = &format.UpgradeLockIntent{ + OwnerID: "foo@bar", + } + + rc = &format.RepositoryConfig{ + ContentFormat: cf, + UpgradeLock: uli, + } + + cacheDuration = 10 * time.Minute +) + +func TestFormatManager(t *testing.T) { + ctx := testlogging.Context(t) + + startTime := time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC) + ta := faketime.NewTimeAdvance(startTime, 0) + nowFunc := ta.NowFunc() + cache := format.NewMemoryBlobCache(nowFunc) + + st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil) + fst := blobtesting.NewFaultyStorage(st) + require.NoError(t, format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password")) + + rawBytes := mustGetBytes(t, st, "kopia.repository") + + mgr, err := format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache) + require.NoError(t, err) + + require.Equal(t, cf.HMACSecret, mgr.GetHmacSecret()) + require.Equal(t, cf.Encryption, mgr.GetEncryptionAlgorithm()) + require.Equal(t, cf.Hash, mgr.GetHashFunction()) + require.NotNil(t, mgr.HashFunc()) + require.NotNil(t, mgr.Encryptor()) + require.Equal(t, cf.MasterKey, mgr.GetMasterKey()) + require.Equal(t, false, mgr.SupportsPasswordChange()) + require.Equal(t, startTime, mgr.LoadedTime()) + require.Equal(t, cf.MutableParameters, mustGetMutableParameters(t, mgr)) + require.True(t, bytes.Contains(mustGetRepositoryFormatBytes(t, mgr), rawBytes)) + require.Equal(t, uli, mustGetUpgradeLockIntent(t, mgr)) + + // move time to be 1ns shy of when the cache expires + fst.AddFault(blobtesting.MethodGetBlob).ErrorInstead(errSomeError) + ta.Advance(cacheDuration - 1) + + // despite the failure, we still trust the cache + mustGetMutableParameters(t, mgr) + + // now move the final nanosecond, this will trigger a load and storage errors + ta.Advance(1) + + // error on first read, subsequent reads are ok + require.ErrorIs(t, expectMutableParametersError(t, mgr), errSomeError) + mustGetMutableParameters(t, mgr) + mustGetMutableParameters(t, mgr) + + n := mgr.LoadedTime() + + require.Equal(t, 2, mgr.RefreshCount()) + + // open another manager when cache is still valid, it will reuse old cached time + ta.Advance(5) + + mgr2, err := format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache) + require.NoError(t, err) + + mustGetMutableParameters(t, mgr2) + + require.Equal(t, n, mgr2.LoadedTime()) + // open another manager when cache has already expired + ta.Advance(2 * cacheDuration) + + n = ta.NowFunc()() + + mgr3, err := format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache) + require.NoError(t, err) + + // make sure we're using current time + require.Equal(t, n, mgr3.LoadedTime()) + + // update using mgr3 + mp := mustGetMutableParameters(t, mgr3) + bc2 := mustGetBlobStorageConfiguration(t, mgr3) + rf2 := mustGetRequiredFeatures(t, mgr3) + + // make some changes + mp.MaxPackSize++ + + require.NoError(t, mgr3.SetParameters(ctx, mp, bc2, rf2)) + + // enough time has passed since last read, so mgr will notice the update immediately + require.Equal(t, mp, mustGetMutableParameters(t, mgr)) + + // update again + oldmp := mp + mp.MaxPackSize++ + require.NoError(t, mgr3.SetParameters(ctx, mp, bc2, rf2)) + + // mgr still sees old mp + require.Equal(t, oldmp, mustGetMutableParameters(t, mgr)) + + // advance time, the now update is now visible + ta.Advance(cacheDuration) + require.Equal(t, mp, mustGetMutableParameters(t, mgr)) +} + +func TestInitialize(t *testing.T) { + ctx := testlogging.Context(t) + + st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil) + fst := blobtesting.NewFaultyStorage(st) + + // error fetching first blob - kopia.repository + fst.AddFault(blobtesting.MethodGetBlob).ErrorInstead(errSomeError) + require.ErrorIs(t, + format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password"), + errSomeError) + + // error fetching second blob - kopia.blobcfg + fst.AddFault(blobtesting.MethodGetBlob) + fst.AddFault(blobtesting.MethodGetBlob).ErrorInstead(errSomeError) + require.ErrorIs(t, + format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password"), + errSomeError) + + // success + require.NoError(t, format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password")) + + // already initialized + require.ErrorIs(t, + format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password"), + format.ErrAlreadyInitialized) +} + +func TestChangePassword(t *testing.T) { + ctx := testlogging.Context(t) + + startTime := time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC) + ta := faketime.NewTimeAdvance(startTime, 0) + nowFunc := ta.NowFunc() + cache := format.NewMemoryBlobCache(nowFunc) + + cf2 := cf + cf2.Version = format.FormatVersion3 + cf2.EnablePasswordChange = true + + rc = &format.RepositoryConfig{ + ContentFormat: cf2, + UpgradeLock: uli, + } + + st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil) + fst := blobtesting.NewFaultyStorage(st) + require.NoError(t, format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password")) + + mgr, err := format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache) + require.NoError(t, err) + + mgr2, err := format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache) + require.NoError(t, err) + + require.NoError(t, mgr2.ChangePassword(ctx, "new-password")) + + // immediately after changing the password, both managers can still read the repo + mustGetMutableParameters(t, mgr) + mustGetMutableParameters(t, mgr2) + + ta.Advance(cacheDuration) + + require.ErrorIs(t, expectMutableParametersError(t, mgr), format.ErrInvalidPassword) + mustGetMutableParameters(t, mgr2) + + _, err = format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache) + require.ErrorIs(t, err, format.ErrInvalidPassword) +} + +func TestFormatManagerValidDuration(t *testing.T) { + cases := map[time.Duration]time.Duration{ + -1: 15 * time.Minute, + time.Second: time.Second, + 30 * time.Minute: 15 * time.Minute, + 10 * time.Minute: 10 * time.Minute, + } + + for requestedCacheDuration, actualCacheDuration := range cases { + ctx := testlogging.Context(t) + + startTime := time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC) + ta := faketime.NewTimeAdvance(startTime, 0) + nowFunc := ta.NowFunc() + cache := format.NewMemoryBlobCache(nowFunc) + + st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil) + fst := blobtesting.NewFaultyStorage(st) + require.NoError(t, format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password")) + + if requestedCacheDuration < 0 { + // plant a malformed cache entry to ensure it's not being used + cache.Put(ctx, "kopia.repository", []byte("malformed")) + } + + mgr, err := format.NewManagerWithCache(ctx, fst, requestedCacheDuration, "some-password", nowFunc, cache) + require.NoError(t, err) + + require.Equal(t, actualCacheDuration, mgr.ValidCacheDuration()) + } +} + +func mustGetMutableParameters(t *testing.T, mgr *format.Manager) format.MutableParameters { + t.Helper() + + mp, err := mgr.GetMutableParameters() + require.NoError(t, err) + + return mp +} + +func mustGetUpgradeLockIntent(t *testing.T, mgr *format.Manager) *format.UpgradeLockIntent { + t.Helper() + + uli, err := mgr.GetUpgradeLockIntent(testlogging.Context(t)) + require.NoError(t, err) + + return uli +} + +func mustGetRepositoryFormatBytes(t *testing.T, mgr *format.Manager) []byte { + t.Helper() + + b, err := mgr.RepositoryFormatBytes() + require.NoError(t, err) + + return b +} + +func mustGetRequiredFeatures(t *testing.T, mgr *format.Manager) []feature.Required { + t.Helper() + + rf, err := mgr.RequiredFeatures() + require.NoError(t, err) + + return rf +} + +func mustGetBlobStorageConfiguration(t *testing.T, mgr *format.Manager) format.BlobStorageConfiguration { + t.Helper() + + cfg, err := mgr.BlobCfgBlob() + require.NoError(t, err) + + return cfg +} + +func expectMutableParametersError(t *testing.T, mgr *format.Manager) error { + t.Helper() + + _, err := mgr.GetMutableParameters() + require.Error(t, err) + + return err +} + +func mustGetBytes(t *testing.T, st blob.Storage, blobID blob.ID) []byte { + t.Helper() + + var tmp gather.WriteBuffer + defer tmp.Close() + + require.NoError(t, st.GetBlob(testlogging.Context(t), blobID, 0, -1, &tmp)) + + return tmp.ToByteSlice() +} diff --git a/repo/format/format_provider.go b/repo/format/format_provider.go index 21d8c6345..f0d23e0e7 100644 --- a/repo/format/format_provider.go +++ b/repo/format/format_provider.go @@ -59,8 +59,8 @@ type Provider interface { GetMutableParameters() (MutableParameters, error) SupportsPasswordChange() bool GetMasterKey() []byte - RepositoryFormatBytes() []byte - Struct() ContentFormat + + RepositoryFormatBytes() ([]byte, error) } type formattingOptionsProvider struct { @@ -71,13 +71,11 @@ type formattingOptionsProvider struct { formatBytes []byte } -func (f *formattingOptionsProvider) Struct() ContentFormat { - return *f.ContentFormat -} - // NewFormattingOptionsProvider validates the provided formatting options and returns static // FormattingOptionsProvider based on them. -func NewFormattingOptionsProvider(f *ContentFormat, formatBytes []byte) (Provider, error) { +func NewFormattingOptionsProvider(f0 *ContentFormat, formatBytes []byte) (Provider, error) { + clone := *f0 + f := &clone formatVersion := f.Version if formatVersion < MinSupportedReadVersion || formatVersion > CurrentWriteVersion { @@ -96,6 +94,12 @@ func NewFormattingOptionsProvider(f *ContentFormat, formatBytes []byte) (Provide return nil, errors.Errorf("index version %v is not supported", f.IndexVersion) } + // apply default + if f.MaxPackSize == 0 { + // legacy only, apply default + f.MaxPackSize = 20 << 20 //nolint:gomnd + } + h, err := hashing.CreateHashFunc(f) if err != nil { return nil, errors.Wrap(err, "unable to create hash") @@ -145,8 +149,12 @@ func (f *formattingOptionsProvider) HashFunc() hashing.HashFunc { return f.h } -func (f *formattingOptionsProvider) RepositoryFormatBytes() []byte { - return f.formatBytes +func (f *formattingOptionsProvider) RepositoryFormatBytes() ([]byte, error) { + if f.SupportsPasswordChange() { + return nil, nil + } + + return f.formatBytes, nil } var _ Provider = (*formattingOptionsProvider)(nil) diff --git a/repo/format/format_set_parameters.go b/repo/format/format_set_parameters.go new file mode 100644 index 000000000..e417f5c71 --- /dev/null +++ b/repo/format/format_set_parameters.go @@ -0,0 +1,48 @@ +package format + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/feature" + "github.com/kopia/kopia/repo/blob" +) + +// SetParameters sets the mutable repository parameters. +func (m *Manager) SetParameters( + ctx context.Context, + mp MutableParameters, + blobcfg BlobStorageConfiguration, + requiredFeatures []feature.Required, +) error { + m.mu.Lock() + defer m.mu.Unlock() + + if err := mp.Validate(); err != nil { + return errors.Wrap(err, "invalid parameters") + } + + if err := blobcfg.Validate(); err != nil { + return errors.Wrap(err, "invalid blob-config options") + } + + m.repoConfig.ContentFormat.MutableParameters = mp + m.repoConfig.RequiredFeatures = requiredFeatures + + if err := m.j.EncryptRepositoryConfig(m.repoConfig, m.formatEncryptionKey); err != nil { + return errors.Errorf("unable to encrypt format bytes") + } + + if err := m.j.WriteBlobCfgBlob(ctx, m.blobs, blobcfg, m.formatEncryptionKey); err != nil { + return errors.Wrap(err, "unable to write blobcfg blob") + } + + if err := m.j.WriteKopiaRepositoryBlob(ctx, m.blobs, m.blobCfgBlob); err != nil { + return errors.Wrap(err, "unable to write format blob") + } + + m.cache.Remove(ctx, []blob.ID{KopiaRepositoryBlobID, KopiaBlobCfgBlobID}) + + return nil +} diff --git a/repo/format/repository_config.go b/repo/format/repository_config.go index 134c4ecdf..69d81b306 100644 --- a/repo/format/repository_config.go +++ b/repo/format/repository_config.go @@ -2,7 +2,6 @@ import ( "encoding/json" - "time" "github.com/pkg/errors" @@ -24,17 +23,8 @@ type EncryptedRepositoryConfig struct { Format RepositoryConfig `json:"format"` } -// DecodedRepositoryConfig encapsulates contents of decoded `kopia.repository` blob. -type DecodedRepositoryConfig struct { - KopiaRepository *KopiaRepositoryJSON - KopiaRepositoryBytes []byte // serialized format blob - CacheMTime time.Time // mod time of the format blob cache file - RepoConfig *RepositoryConfig // unencrypted format blob structure - FormatEncryptionKey []byte // key derived from the password -} - -// DecryptRepositoryConfig decrypts RepositoryConfig stored in EncryptedFormatBytes. -func (f *KopiaRepositoryJSON) DecryptRepositoryConfig(masterKey []byte) (*RepositoryConfig, error) { +// decryptRepositoryConfig decrypts RepositoryConfig stored in EncryptedFormatBytes. +func (f *KopiaRepositoryJSON) decryptRepositoryConfig(masterKey []byte) (*RepositoryConfig, error) { switch f.EncryptionAlgorithm { case aes256GcmEncryption: plainText, err := decryptRepositoryBlobBytesAes256Gcm(f.EncryptedFormatBytes, masterKey, f.UniqueID) diff --git a/repo/format/upgrade_lock.go b/repo/format/upgrade_lock.go new file mode 100644 index 000000000..faee00bb4 --- /dev/null +++ b/repo/format/upgrade_lock.go @@ -0,0 +1,172 @@ +package format + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/internal/gather" + "github.com/kopia/kopia/repo/blob" +) + +// BackupBlobIDPrefix is the prefix for all identifiers of the BLOBs that +// keep a backup copy of the FormatBlobID BLOB for the purposes of rollback +// during upgrade. +const BackupBlobIDPrefix = "kopia.repository.backup." + +// BackupBlobID gets the upgrade backu pblob-id fro mthe lock. +func BackupBlobID(l UpgradeLockIntent) blob.ID { + return blob.ID(BackupBlobIDPrefix + l.OwnerID) +} + +// SetUpgradeLockIntent sets the upgrade lock intent on the repository format +// blob for other clients to notice. If a lock intent was already placed then +// it updates the existing lock using the output of the UpgradeLock.Update(). +// +// This method also backs up the original format version on the upgrade lock +// intent and sets the latest format-version o nthe repository blob. This +// should cause the unsupporting clients (non-upgrade capable) to fail +// connecting to the repository. +func (m *Manager) SetUpgradeLockIntent(ctx context.Context, l UpgradeLockIntent) (*UpgradeLockIntent, error) { + if err := m.maybeRefreshNotLocked(); err != nil { + return nil, err + } + + m.mu.Lock() + defer m.mu.Unlock() + + if err := l.Validate(); err != nil { + return nil, errors.Wrap(err, "invalid upgrade lock intent") + } + + if m.repoConfig.UpgradeLock == nil { + // when we are putting a new lock then ensure that we can upgrade + // to that version + if m.repoConfig.ContentFormat.Version >= MaxFormatVersion { + return nil, errors.Errorf("repository is using version %d, and version %d is the maximum", + m.repoConfig.ContentFormat.Version, MaxFormatVersion) + } + + // backup the current repository config from local cache to the + // repository when we place the lock for the first time + if err := m.j.WriteKopiaRepositoryBlobWithID(ctx, m.blobs, m.blobCfgBlob, BackupBlobID(l)); err != nil { + return nil, errors.Wrap(err, "failed to backup the repo format blob") + } + + // set a new lock or revoke an existing lock + m.repoConfig.UpgradeLock = &l + // mark the upgrade to the new format version, this will ensure that older + // clients won't be able to parse the new version + m.repoConfig.ContentFormat.Version = MaxFormatVersion + } else if newL, err := m.repoConfig.UpgradeLock.Update(&l); err == nil { + m.repoConfig.UpgradeLock = newL + } else { + return nil, errors.Wrap(err, "failed to update the existing lock") + } + + if err := m.updateRepoConfigLocked(ctx); err != nil { + return nil, errors.Wrap(err, "error updating repo config") + } + + return m.repoConfig.UpgradeLock.Clone(), nil +} + +// CommitUpgrade removes the upgrade lock from the from the repository format +// blob. This in-effect commits the new repository format t othe repository and +// resumes all access to the repository. +func (m *Manager) CommitUpgrade(ctx context.Context) error { + if err := m.maybeRefreshNotLocked(); err != nil { + return err + } + + m.mu.Lock() + defer m.mu.Unlock() + + if m.repoConfig.UpgradeLock == nil { + return errors.New("no upgrade in progress") + } + + // restore the old format version + m.repoConfig.UpgradeLock = nil + + return m.updateRepoConfigLocked(ctx) +} + +// RollbackUpgrade removes the upgrade lock while also restoring the +// format-blob's original version. This method does not restore the original +// repository data format and neither does it validate against any repository +// changes. Rolling back the repository format is currently not supported and +// hence using this API could render the repository corrupted and unreadable by +// clients. +func (m *Manager) RollbackUpgrade(ctx context.Context) error { + if err := m.maybeRefreshNotLocked(); err != nil { + return err + } + + m.mu.Lock() + defer m.mu.Unlock() + + if m.repoConfig.UpgradeLock == nil { + return errors.New("no upgrade in progress") + } + + // restore the oldest backup and delete the rest + var oldestBackup *blob.Metadata + + if err := m.blobs.ListBlobs(ctx, BackupBlobIDPrefix, func(bm blob.Metadata) error { + var delID blob.ID + if oldestBackup == nil || bm.Timestamp.Before(oldestBackup.Timestamp) { + if oldestBackup != nil { + // delete the current candidate because we have found an even older one + delID = oldestBackup.BlobID + } + oldestBackup = &bm + } else { + delID = bm.BlobID + } + + if delID != "" { + // delete the backup that we are not going to need for rollback + if err := m.blobs.DeleteBlob(ctx, delID); err != nil { + return errors.Wrapf(err, "failed to delete the format blob backup %q", delID) + } + } + + return nil + }); err != nil { + return errors.Wrap(err, "failed to list backup blobs") + } + + // restore only when we find a backup, otherwise simply cleanup the local cache + if oldestBackup != nil { + var d gather.WriteBuffer + if err := m.blobs.GetBlob(ctx, oldestBackup.BlobID, 0, -1, &d); err != nil { + return errors.Wrapf(err, "failed to read from backup %q", oldestBackup.BlobID) + } + + if err := m.blobs.PutBlob(ctx, KopiaRepositoryBlobID, d.Bytes(), blob.PutOptions{}); err != nil { + return errors.Wrapf(err, "failed to restore format blob from backup %q", oldestBackup.BlobID) + } + + // delete the backup after we have restored the format-blob + if err := m.blobs.DeleteBlob(ctx, oldestBackup.BlobID); err != nil { + return errors.Wrapf(err, "failed to delete the format blob backup %q", oldestBackup.BlobID) + } + } + + m.cache.Remove(ctx, []blob.ID{KopiaRepositoryBlobID}) + + return nil +} + +// GetUpgradeLockIntent gets the current upgrade lock intent. +func (m *Manager) GetUpgradeLockIntent(ctx context.Context) (*UpgradeLockIntent, error) { + if err := m.maybeRefreshNotLocked(); err != nil { + return nil, err + } + + m.mu.RLock() + defer m.mu.RUnlock() + + return m.repoConfig.UpgradeLock, nil +} diff --git a/repo/format/upgrade_lock_intent.go b/repo/format/upgrade_lock_intent.go index b75e4ba34..8b9e9fd9f 100644 --- a/repo/format/upgrade_lock_intent.go +++ b/repo/format/upgrade_lock_intent.go @@ -51,7 +51,12 @@ func (l *UpgradeLockIntent) Update(other *UpgradeLockIntent) (*UpgradeLockIntent // Clone creates a copy of the UpgradeLock instance. func (l *UpgradeLockIntent) Clone() *UpgradeLockIntent { + if l == nil { + return nil + } + clone := *l + return &clone } diff --git a/repo/upgrade_lock_test.go b/repo/format/upgrade_lock_test.go similarity index 78% rename from repo/upgrade_lock_test.go rename to repo/format/upgrade_lock_test.go index 1969161c8..9deb21a49 100644 --- a/repo/upgrade_lock_test.go +++ b/repo/format/upgrade_lock_test.go @@ -1,4 +1,4 @@ -package repo_test +package format_test import ( "context" @@ -24,6 +24,7 @@ "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/encryption" "github.com/kopia/kopia/repo/format" + "github.com/kopia/kopia/repo/object" ) func TestFormatUpgradeSetLock(t *testing.T) { @@ -43,17 +44,17 @@ func TestFormatUpgradeSetLock(t *testing.T) { } // set invalid lock - _, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l) + _, err := env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l) require.EqualError(t, err, "invalid upgrade lock intent: no owner-id set, it is required to set a unique owner-id") l.OwnerID = "upgrade-owner" - l, err = env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l) + l, err = env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l) require.NoError(t, err) l.OwnerID = "new-upgrade-owner" // verify that second owner cannot set / update the lock - _, err = env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l) + _, err = env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l) require.EqualError(t, err, "failed to update the existing lock: upgrade owner-id mismatch \"new-upgrade-owner\" != \"upgrade-owner\", you are not the owner of the upgrade lock") @@ -63,10 +64,10 @@ func TestFormatUpgradeSetLock(t *testing.T) { l.AdvanceNoticeDuration *= 2 // update the lock - _, err = env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l) + _, err = env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l) require.NoError(t, err) - require.NoError(t, env.RepositoryWriter.CommitUpgrade(ctx)) + require.NoError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx)) } func TestFormatUpgradeAlreadyUpgraded(t *testing.T) { @@ -83,7 +84,7 @@ func TestFormatUpgradeAlreadyUpgraded(t *testing.T) { MaxPermittedClockDrift: formatBlockCacheDuration / 3, } - _, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l) + _, err := env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l) require.EqualError(t, err, fmt.Sprintf("repository is using version %d, and version %d is the maximum", format.MaxFormatVersion, format.MaxFormatVersion)) } @@ -104,15 +105,15 @@ func TestFormatUpgradeCommit(t *testing.T) { MaxPermittedClockDrift: formatBlockCacheDuration / 3, } - require.EqualError(t, env.RepositoryWriter.CommitUpgrade(ctx), "no upgrade in progress") + require.EqualError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx), "no upgrade in progress") - _, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l) + _, err := env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l) require.NoError(t, err) - require.NoError(t, env.RepositoryWriter.CommitUpgrade(ctx)) + require.NoError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx)) // verify that rollback after commit fails - require.EqualError(t, env.RepositoryWriter.RollbackUpgrade(ctx), "no upgrade in progress") + require.EqualError(t, env.RepositoryWriter.FormatManager().RollbackUpgrade(ctx), "no upgrade in progress") } func TestFormatUpgradeRollback(t *testing.T) { @@ -131,16 +132,16 @@ func TestFormatUpgradeRollback(t *testing.T) { MaxPermittedClockDrift: formatBlockCacheDuration / 3, } - _, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l) + _, err := env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l) require.NoError(t, err) - require.NoError(t, env.RepositoryWriter.RollbackUpgrade(ctx)) + require.NoError(t, env.RepositoryWriter.FormatManager().RollbackUpgrade(ctx)) // reopen the repo because we still have the lock in-memory env.MustReopen(t) // verify that commit after rollback fails - require.EqualError(t, env.RepositoryWriter.CommitUpgrade(ctx), "no upgrade in progress") + require.EqualError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx), "no upgrade in progress") } func TestFormatUpgradeMultipleLocksRollback(t *testing.T) { @@ -164,25 +165,25 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) { }) // first lock by primary creator - _, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l) + _, err := env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l) require.NoError(t, err) // second lock from a random owner secondL := l.Clone() secondL.OwnerID = "another-upgrade-owner" - _, err = secondWriter.(repo.DirectRepositoryWriter).SetUpgradeLockIntent(ctx, *secondL) + _, err = secondWriter.(repo.DirectRepositoryWriter).FormatManager().SetUpgradeLockIntent(ctx, *secondL) require.NoError(t, err) // verify that we have two repository backups, the second one will contain // the first owner's lock { var backups []string - require.NoError(t, env.RootStorage().ListBlobs(ctx, repo.FormatBlobBackupIDPrefix, func(bm blob.Metadata) error { + require.NoError(t, env.RootStorage().ListBlobs(ctx, format.BackupBlobIDPrefix, func(bm blob.Metadata) error { backups = append(backups, string(bm.BlobID)) return nil })) sort.Strings(backups) - require.Equal(t, []string{string(repo.FormatBlobBackupID(*secondL)), string(repo.FormatBlobBackupID(*l))}, + require.Equal(t, []string{string(format.BackupBlobID(*secondL)), string(format.BackupBlobID(*l))}, backups, "invalid backups list") } @@ -195,10 +196,10 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) { require.NoError(t, mperr) require.Equal(t, format.FormatVersion3, mp.Version) - require.NoError(t, env.RepositoryWriter.RollbackUpgrade(ctx)) + require.NoError(t, env.RepositoryWriter.FormatManager().RollbackUpgrade(ctx)) // verify that we have no repository backups pending - require.NoError(t, env.RootStorage().ListBlobs(ctx, repo.FormatBlobBackupIDPrefix, func(bm blob.Metadata) error { + require.NoError(t, env.RootStorage().ListBlobs(ctx, format.BackupBlobIDPrefix, func(bm blob.Metadata) error { t.Fatalf("found unexpected backup: %s", bm.BlobID) return nil })) @@ -209,7 +210,7 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) { // verify that commit after rollback fails, this ensures that the correct // backup got restored because if the second backup was restored then we'd // still get a lock to be committed without any error - require.EqualError(t, env.RepositoryWriter.CommitUpgrade(ctx), "no upgrade in progress") + require.EqualError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx), "no upgrade in progress") // verify that we are back to the original version where we started from mp, err = env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters() @@ -238,7 +239,7 @@ func TestFormatUpgradeFailureToBackupFormatBlobOnLock(t *testing.T) { blobtesting.NewVersionedMapStorage(nil), // GetBlob filter func(ctx context.Context, id blob.ID) error { - if !allowGets && id == repo.FormatBlobBackupID(allowedLock) { + if !allowGets && id == format.BackupBlobID(allowedLock) { return errors.New("unexpected error on get") } return nil @@ -252,7 +253,7 @@ func(ctx context.Context) error { }, // PutBlob callback func(ctx context.Context, id blob.ID, _ *blob.PutOptions) error { - if !allowPuts || (strings.HasPrefix(string(id), repo.FormatBlobBackupIDPrefix) && id != repo.FormatBlobBackupID(allowedLock)) { + if !allowPuts || (strings.HasPrefix(string(id), format.BackupBlobIDPrefix) && id != format.BackupBlobID(allowedLock)) { return errors.New("unexpected error") } return nil @@ -286,31 +287,31 @@ func(ctx context.Context, id blob.ID, _ *blob.PutOptions) error { r, err := repo.Open(testlogging.Context(t), configFile, "password", &repo.Options{UpgradeOwnerID: "allowed-upgrade-owner"}) require.NoError(t, err) - _, err = r.(repo.DirectRepositoryWriter).SetUpgradeLockIntent(testlogging.Context(t), *faultyLock) + _, err = r.(repo.DirectRepositoryWriter).FormatManager().SetUpgradeLockIntent(testlogging.Context(t), *faultyLock) require.EqualError(t, err, "failed to backup the repo format blob: unable to write format blob \"kopia.repository.backup.faulty-upgrade-owner\": unexpected error") - _, err = r.(repo.DirectRepositoryWriter).SetUpgradeLockIntent(testlogging.Context(t), allowedLock) + _, err = r.(repo.DirectRepositoryWriter).FormatManager().SetUpgradeLockIntent(testlogging.Context(t), allowedLock) require.NoError(t, err) - require.EqualError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t)), + require.EqualError(t, r.(repo.DirectRepositoryWriter).FormatManager().RollbackUpgrade(testlogging.Context(t)), "failed to delete the format blob backup \"kopia.repository.backup.allowed-upgrade-owner\": unexpected error") - require.EqualError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t)), + require.EqualError(t, r.(repo.DirectRepositoryWriter).FormatManager().RollbackUpgrade(testlogging.Context(t)), "failed to delete the format blob backup \"kopia.repository.backup.allowed-upgrade-owner\": unexpected error") allowPuts = false - require.EqualError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t)), + require.EqualError(t, r.(repo.DirectRepositoryWriter).FormatManager().RollbackUpgrade(testlogging.Context(t)), "failed to restore format blob from backup \"kopia.repository.backup.allowed-upgrade-owner\": unexpected error") allowGets = false - require.EqualError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t)), + require.EqualError(t, r.(repo.DirectRepositoryWriter).FormatManager().RollbackUpgrade(testlogging.Context(t)), "failed to read from backup \"kopia.repository.backup.allowed-upgrade-owner\": unexpected error on get") allowPuts, allowGets, allowDeletes = true, true, true - require.NoError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t))) + require.NoError(t, r.(repo.DirectRepositoryWriter).FormatManager().RollbackUpgrade(testlogging.Context(t))) } func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) { @@ -365,7 +366,8 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) { MaxPermittedClockDrift: formatBlockCacheDuration / 3, } - _, err = env.RepositoryWriter.SetUpgradeLockIntent(ctx, l) + // set upgrade lock using independent client + _, err = env.MustConnectOpenAnother(t).(repo.DirectRepositoryWriter).FormatManager().SetUpgradeLockIntent(ctx, l) require.NoError(t, err) // ongoing writes should NOT get interrupted because the upgrade lock @@ -390,7 +392,20 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) { // ongoing writes should get interrupted this time require.ErrorIs(t, w1.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) + require.ErrorIs(t, w2.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) require.ErrorIs(t, w3.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) require.ErrorIs(t, lw.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress) } + +func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) { + t.Helper() + + w := rep.NewObjectWriter(ctx, object.WriterOptions{}) + + _, err := w.Write(data) + require.NoError(t, err, testCaseID) + + _, err = w.Result() + require.NoError(t, err, testCaseID) +} diff --git a/repo/initialize.go b/repo/initialize.go index 5fc63e2aa..96d71e2ed 100644 --- a/repo/initialize.go +++ b/repo/initialize.go @@ -9,7 +9,6 @@ "github.com/pkg/errors" - "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content" "github.com/kopia/kopia/repo/ecc" @@ -31,7 +30,6 @@ const ( hmacSecretLength = 32 masterKeyLength = 32 - uniqueIDLength = 32 ) // NewRepositoryOptions specifies options that apply to newly created repositories. @@ -45,67 +43,22 @@ type NewRepositoryOptions struct { RetentionPeriod time.Duration `json:"retentionPeriod,omitempty"` } -// ErrAlreadyInitialized indicates that repository has already been initialized. -var ErrAlreadyInitialized = errors.Errorf("repository already initialized") - // Initialize creates initial repository data structures in the specified storage with given credentials. func Initialize(ctx context.Context, st blob.Storage, opt *NewRepositoryOptions, password string) error { if opt == nil { opt = &NewRepositoryOptions{} } - // get the blob - expect ErrNotFound - var tmp gather.WriteBuffer - defer tmp.Close() - - err := st.GetBlob(ctx, format.KopiaRepositoryBlobID, 0, -1, &tmp) - if err == nil { - return ErrAlreadyInitialized - } - - if !errors.Is(err, blob.ErrBlobNotFound) { - return errors.Wrap(err, "unexpected error when checking for format blob") - } - - err = st.GetBlob(ctx, format.KopiaBlobCfgBlobID, 0, -1, &tmp) - if err == nil { - return errors.Errorf("possible corruption: blobcfg blob exists, but format blob is not found") - } - - if !errors.Is(err, blob.ErrBlobNotFound) { - return errors.Wrap(err, "unexpected error when checking for blobcfg blob") - } - formatBlob := formatBlobFromOptions(opt) blobcfg := blobCfgBlobFromOptions(opt) - formatEncryptionKey, err := formatBlob.DeriveFormatEncryptionKeyFromPassword(password) - if err != nil { - return errors.Wrap(err, "unable to derive format encryption key") - } - - f, err := repositoryObjectFormatFromOptions(opt) + repoConfig, err := repositoryObjectFormatFromOptions(opt) if err != nil { return errors.Wrap(err, "invalid parameters") } - if err = f.MutableParameters.Validate(); err != nil { - return errors.Wrap(err, "invalid parameters") - } - - if err = formatBlob.EncryptRepositoryConfig(f, formatEncryptionKey); err != nil { - return errors.Wrap(err, "unable to encrypt format bytes") - } - - if err := formatBlob.WriteBlobCfgBlob(ctx, st, blobcfg, formatEncryptionKey); err != nil { - return errors.Wrap(err, "unable to write blobcfg blob") - } - - if err := formatBlob.WriteKopiaRepositoryBlob(ctx, st, blobcfg); err != nil { - return errors.Wrap(err, "unable to write format blob") - } - - return nil + //nolint:wrapcheck + return format.Initialize(ctx, st, formatBlob, repoConfig, blobcfg, password) } func formatBlobFromOptions(opt *NewRepositoryOptions) *format.KopiaRepositoryJSON { @@ -114,7 +67,7 @@ func formatBlobFromOptions(opt *NewRepositoryOptions) *format.KopiaRepositoryJSO BuildInfo: BuildInfo, BuildVersion: BuildVersion, KeyDerivationAlgorithm: format.DefaultKeyDerivationAlgorithm, - UniqueID: applyDefaultRandomBytes(opt.UniqueID, uniqueIDLength), + UniqueID: applyDefaultRandomBytes(opt.UniqueID, format.UniqueIDLengthBytes), EncryptionAlgorithm: format.DefaultFormatEncryption, } } @@ -178,13 +131,6 @@ func repositoryObjectFormatFromOptions(opt *NewRepositoryOptions) (*format.Repos return f, nil } -func randomBytes(n int) []byte { - b := make([]byte, n) - io.ReadFull(rand.Reader, b) //nolint:errcheck - - return b -} - func applyDefaultInt(v, def int) int { if v == 0 { return def @@ -211,6 +157,13 @@ func applyDefaultString(v, def string) string { return v } +func randomBytes(n int) []byte { + b := make([]byte, n) + io.ReadFull(rand.Reader, b) //nolint:errcheck + + return b +} + func applyDefaultRandomBytes(b []byte, n int) []byte { if b == nil { return randomBytes(n) diff --git a/repo/open.go b/repo/open.go index 292d4540c..67633d9b4 100644 --- a/repo/open.go +++ b/repo/open.go @@ -77,6 +77,9 @@ type Options struct { // ErrInvalidPassword is returned when repository password is invalid. var ErrInvalidPassword = format.ErrInvalidPassword +// ErrAlreadyInitialized is returned when repository is already initialized in the provided storage. +var ErrAlreadyInitialized = format.ErrAlreadyInitialized + // ErrRepositoryUnavailableDueToUpgrageInProgress is returned when repository // is undergoing upgrade that requires exclusive access. var ErrRepositoryUnavailableDueToUpgrageInProgress = errors.Errorf("repository upgrade in progress") @@ -198,7 +201,7 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor // openWithConfig opens the repository with a given configuration, avoiding the need for a config file. // -//nolint:funlen,gocyclo,cyclop +//nolint:funlen,gocyclo func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, cacheOpts *content.CachingOptions, configFile string) (DirectRepository, error) { cacheOpts = cacheOpts.CloneOrDefault() cmOpts := &content.ManagerOptions{ @@ -206,19 +209,20 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw DisableInternalLog: options.DisableInternalLog, } - var ufb *format.DecodedRepositoryConfig + fmgr, ferr := format.NewManager(ctx, st, cacheOpts.CacheDirectory, lc.FormatBlobCacheDuration, password, cmOpts.TimeNow) + if ferr != nil { + return nil, errors.Wrap(ferr, "unable to create format manager") + } - if _, err := retry.WithExponentialBackoffMaxRetries(ctx, -1, "read repo config and wait for upgrade", func() (interface{}, error) { - var internalErr error - ufb, internalErr = format.ReadAndCacheDecodedRepositoryConfig(ctx, st, password, cacheOpts.CacheDirectory, - lc.FormatBlobCacheDuration) - if internalErr != nil { + if _, err := retry.WithExponentialBackoffMaxRetries(ctx, -1, "wait for upgrade", func() (interface{}, error) { + uli, err := fmgr.UpgradeLockIntent() + if err != nil { //nolint:wrapcheck - return nil, internalErr + return nil, err } // retry if upgrade lock has been taken - if locked, _ := ufb.RepoConfig.UpgradeLock.IsLocked(cmOpts.TimeNow()); locked && options.UpgradeOwnerID != ufb.RepoConfig.UpgradeLock.OwnerID { + if locked, _ := uli.IsLocked(cmOpts.TimeNow()); locked && options.UpgradeOwnerID != uli.OwnerID { return nil, ErrRepositoryUnavailableDueToUpgrageInProgress } @@ -230,40 +234,15 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw return nil, err } - if err := handleMissingRequiredFeatures(ctx, ufb.RepoConfig, options.TestOnlyIgnoreMissingRequiredFeatures); err != nil { + if err := handleMissingRequiredFeatures(ctx, fmgr, options.TestOnlyIgnoreMissingRequiredFeatures); err != nil { return nil, err } - cmOpts.RepositoryFormatBytes = ufb.KopiaRepositoryBytes - - // Read blobcfg blob, potentially from cache. - bb, _, err := format.ReadAndCacheRepositoryBlobBytes(ctx, st, cacheOpts.CacheDirectory, format.KopiaBlobCfgBlobID, lc.FormatBlobCacheDuration) - if err != nil && !errors.Is(err, blob.ErrBlobNotFound) { - return nil, errors.Wrap(err, "unable to read blobcfg blob") - } - - blobcfg, err := ufb.KopiaRepository.DeserializeBlobCfgBytes(bb, ufb.FormatEncryptionKey) - if err != nil { - return nil, ErrInvalidPassword - } - - if ufb.RepoConfig.ContentFormat.EnablePasswordChange { - cacheOpts.HMACSecret = format.DeriveKeyFromMasterKey(ufb.RepoConfig.HMACSecret, ufb.KopiaRepository.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength) + if fmgr.SupportsPasswordChange() { + cacheOpts.HMACSecret = format.DeriveKeyFromMasterKey(fmgr.GetHmacSecret(), fmgr.UniqueID(), localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength) } else { // deriving from ufb.FormatEncryptionKey was actually a bug, that only matters will change when we change the password - cacheOpts.HMACSecret = format.DeriveKeyFromMasterKey(ufb.FormatEncryptionKey, ufb.KopiaRepository.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength) - } - - fo := &ufb.RepoConfig.ContentFormat - - if fo.MaxPackSize == 0 { - // legacy only, apply default - fo.MaxPackSize = 20 << 20 //nolint:gomnd - } - - // do not embed repository format info in pack blobs when password change is enabled. - if fo.EnablePasswordChange { - cmOpts.RepositoryFormatBytes = nil + cacheOpts.HMACSecret = format.DeriveKeyFromMasterKey(fmgr.FormatEncryptionKey(), fmgr.UniqueID(), localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength) } limits := throttlingLimitsFromConnectionInfo(ctx, st.ConnectionInfo()) @@ -271,9 +250,9 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw limits = *lc.Throttling } - st, throttler, err := addThrottler(st, limits) - if err != nil { - return nil, errors.Wrap(err, "unable to add throttler") + st, throttler, ferr := addThrottler(st, limits) + if ferr != nil { + return nil, errors.Wrap(ferr, "unable to add throttler") } throttler.OnUpdate(func(l throttling.Limits) error { @@ -287,22 +266,21 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw return lc2.writeToFile(configFile) }) + blobcfg, err := fmgr.BlobCfgBlob() + if err != nil { + return nil, errors.Wrap(err, "blob configuration") + } + if blobcfg.IsRetentionEnabled() { st = wrapLockingStorage(st, blobcfg) } // background/interleaving upgrade lock storage monitor - st = upgradeLockMonitor(options.UpgradeOwnerID, st, password, cacheOpts, lc.FormatBlobCacheDuration, - ufb.CacheMTime, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures) + st = upgradeLockMonitor(fmgr, options.UpgradeOwnerID, st, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures) - fop, err := format.NewFormattingOptionsProvider(fo, cmOpts.RepositoryFormatBytes) - if err != nil { - return nil, errors.Wrap(err, "unable to create format options provider") - } - - scm, err := content.NewSharedManager(ctx, st, fop, cacheOpts, cmOpts) - if err != nil { - return nil, errors.Wrap(err, "unable to create shared content manager") + scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts) + if ferr != nil { + return nil, errors.Wrap(ferr, "unable to create shared content manager") } cm := content.NewWriteManager(ctx, scm, content.SessionOptions{ @@ -310,14 +288,14 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw SessionHost: lc.Hostname, }, "") - om, err := object.NewObjectManager(ctx, cm, ufb.RepoConfig.ObjectFormat) - if err != nil { - return nil, errors.Wrap(err, "unable to open object manager") + om, ferr := object.NewObjectManager(ctx, cm, fmgr.ObjectFormat()) + if ferr != nil { + return nil, errors.Wrap(ferr, "unable to open object manager") } - manifests, err := manifest.NewManager(ctx, cm, manifest.ManagerOptions{TimeNow: cmOpts.TimeNow}) - if err != nil { - return nil, errors.Wrap(err, "unable to open manifests") + manifests, ferr := manifest.NewManager(ctx, cm, manifest.ManagerOptions{TimeNow: cmOpts.TimeNow}) + if ferr != nil { + return nil, errors.Wrap(ferr, "unable to open manifests") } dr := &directRepository{ @@ -327,16 +305,13 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw mmgr: manifests, sm: scm, directRepositoryParameters: directRepositoryParameters{ - uniqueID: ufb.KopiaRepository.UniqueID, - cachingOptions: *cacheOpts, - formatBlob: ufb.KopiaRepository, - blobCfgBlob: blobcfg, - formatEncryptionKey: ufb.FormatEncryptionKey, - timeNow: cmOpts.TimeNow, - cliOpts: lc.ClientOptions.ApplyDefaults(ctx, "Repository in "+st.DisplayName()), - configFile: configFile, - nextWriterID: new(int32), - throttler: throttler, + cachingOptions: *cacheOpts, + fmgr: fmgr, + timeNow: cmOpts.TimeNow, + cliOpts: lc.ClientOptions.ApplyDefaults(ctx, "Repository in "+st.DisplayName()), + configFile: configFile, + nextWriterID: new(int32), + throttler: throttler, }, closed: make(chan struct{}), } @@ -344,10 +319,15 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw return dr, nil } -func handleMissingRequiredFeatures(ctx context.Context, repoConfig *format.RepositoryConfig, ignoreErrors bool) error { +func handleMissingRequiredFeatures(ctx context.Context, fmgr *format.Manager, ignoreErrors bool) error { + required, err := fmgr.RequiredFeatures() + if err != nil { + return errors.Wrap(err, "required features") + } + // See if the current version of Kopia supports all features required by the repository format. // so we can safely fail to start in case repository has been upgraded to a new, incompatible version. - if missingFeatures := feature.GetUnsupportedFeatures(repoConfig.RequiredFeatures, supportedFeatures); len(missingFeatures) > 0 { + if missingFeatures := feature.GetUnsupportedFeatures(required, supportedFeatures); len(missingFeatures) > 0 { for _, mf := range missingFeatures { if ignoreErrors || mf.IfNotUnderstood.Warn { log(ctx).Warnf("%s", mf.UnsupportedMessage()) @@ -393,26 +373,22 @@ func addThrottler(st blob.Storage, limits throttling.Limits) (blob.Storage, thro } func upgradeLockMonitor( + fmgr *format.Manager, upgradeOwnerID string, st blob.Storage, - password string, - cacheOpts *content.CachingOptions, - lockRefreshInterval time.Duration, - lastSync time.Time, now func() time.Time, onFatalError func(err error), ignoreMissingRequiredFeatures bool, ) blob.Storage { var ( - m sync.RWMutex - nextSync = lastSync.Add(lockRefreshInterval) + m sync.RWMutex + lastCheckTime time.Time ) cb := func(ctx context.Context) error { - // protected read for nextSync because it will be shared between - // parallel storage operations m.RLock() - if nextSync.After(now()) { + // see if we already checked that revision + if lastCheckTime.Equal(fmgr.LoadedTime()) { m.RUnlock() return nil } @@ -422,31 +398,30 @@ func upgradeLockMonitor( m.Lock() defer m.Unlock() - if nextSync.After(now()) { + ltime := fmgr.LoadedTime() + + if lastCheckTime.Equal(ltime) { return nil } - ufb, err := format.ReadAndCacheDecodedRepositoryConfig(ctx, st, password, cacheOpts.CacheDirectory, lockRefreshInterval) + uli, err := fmgr.UpgradeLockIntent() if err != nil { - //nolint:wrapcheck - return err + return errors.Wrap(err, "upgrade lock intent") } - if err := handleMissingRequiredFeatures(ctx, ufb.RepoConfig, ignoreMissingRequiredFeatures); err != nil { + if err := handleMissingRequiredFeatures(ctx, fmgr, ignoreMissingRequiredFeatures); err != nil { onFatalError(err) return err } - // only allow the upgrade owner to perform storage operations - if locked, _ := ufb.RepoConfig.UpgradeLock.IsLocked(now()); locked && upgradeOwnerID != ufb.RepoConfig.UpgradeLock.OwnerID { - return ErrRepositoryUnavailableDueToUpgrageInProgress + if uli != nil { + // only allow the upgrade owner to perform storage operations + if locked, _ := uli.IsLocked(now()); locked && upgradeOwnerID != uli.OwnerID { + return ErrRepositoryUnavailableDueToUpgrageInProgress + } } - // prevent backward jumps on nextSync - newNextSync := ufb.CacheMTime.Add(lockRefreshInterval) - if newNextSync.After(nextSync) { - nextSync = newNextSync - } + lastCheckTime = ltime return nil } diff --git a/repo/parameters.go b/repo/parameters.go deleted file mode 100644 index 0afad138e..000000000 --- a/repo/parameters.go +++ /dev/null @@ -1,71 +0,0 @@ -package repo - -import ( - "context" - "os" - "path/filepath" - - "github.com/pkg/errors" - - "github.com/kopia/kopia/internal/feature" - "github.com/kopia/kopia/repo/format" -) - -func (r *directRepository) RequiredFeatures() ([]feature.Required, error) { - repoConfig, err := r.formatBlob.DecryptRepositoryConfig(r.formatEncryptionKey) - if err != nil { - return nil, errors.Wrap(err, "unable to decrypt repository config") - } - - return repoConfig.RequiredFeatures, nil -} - -// SetParameters changes mutable repository parameters. -func (r *directRepository) SetParameters( - ctx context.Context, - m format.MutableParameters, - blobcfg format.BlobStorageConfiguration, - requiredFeatures []feature.Required, -) error { - f := r.formatBlob - - repoConfig, err := f.DecryptRepositoryConfig(r.formatEncryptionKey) - if err != nil { - return errors.Wrap(err, "unable to decrypt repository config") - } - - if err := m.Validate(); err != nil { - return errors.Wrap(err, "invalid parameters") - } - - if err := blobcfg.Validate(); err != nil { - return errors.Wrap(err, "invalid blob-config options") - } - - repoConfig.ContentFormat.MutableParameters = m - repoConfig.RequiredFeatures = requiredFeatures - - if err := f.EncryptRepositoryConfig(repoConfig, r.formatEncryptionKey); err != nil { - return errors.Errorf("unable to encrypt format bytes") - } - - if err := f.WriteBlobCfgBlob(ctx, r.blobs, blobcfg, r.formatEncryptionKey); err != nil { - return errors.Wrap(err, "unable to write blobcfg blob") - } - - if err := f.WriteKopiaRepositoryBlob(ctx, r.blobs, r.blobCfgBlob); err != nil { - return errors.Wrap(err, "unable to write format blob") - } - - if cd := r.cachingOptions.CacheDirectory; cd != "" { - if err := os.Remove(filepath.Join(cd, format.KopiaRepositoryBlobID)); err != nil { - log(ctx).Errorf("unable to remove %s: %v", format.KopiaRepositoryBlobID, err) - } - - if err := os.Remove(filepath.Join(cd, format.KopiaBlobCfgBlobID)); err != nil && !os.IsNotExist(err) { - log(ctx).Errorf("unable to remove %s: %v", format.KopiaBlobCfgBlobID, err) - } - } - - return nil -} diff --git a/repo/repository.go b/repo/repository.go index 72e3b1272..adbea6428 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -10,7 +10,6 @@ "go.opentelemetry.io/otel" "github.com/kopia/kopia/internal/clock" - "github.com/kopia/kopia/internal/feature" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/blob/throttling" "github.com/kopia/kopia/repo/content" @@ -54,7 +53,7 @@ type DirectRepository interface { Repository ObjectFormat() format.ObjectFormat - BlobCfg() format.BlobStorageConfiguration + FormatManager() *format.Manager BlobReader() blob.Reader BlobVolume() blob.Volume ContentReader() content.Reader @@ -66,7 +65,6 @@ type DirectRepository interface { DeriveKey(purpose []byte, keyLength int) []byte Token(password string) (string, error) Throttler() throttling.SettableThrottler - RequiredFeatures() ([]feature.Required, error) DisableIndexRefresh() } @@ -76,25 +74,22 @@ type DirectRepositoryWriter interface { DirectRepository BlobStorage() blob.Storage ContentManager() *content.WriteManager - SetParameters(ctx context.Context, m format.MutableParameters, blobcfg format.BlobStorageConfiguration, requiredFeatures []feature.Required) error - ChangePassword(ctx context.Context, newPassword string) error - GetUpgradeLockIntent(ctx context.Context) (*format.UpgradeLockIntent, error) - SetUpgradeLockIntent(ctx context.Context, l format.UpgradeLockIntent) (*format.UpgradeLockIntent, error) - CommitUpgrade(ctx context.Context) error - RollbackUpgrade(ctx context.Context) error + // SetParameters(ctx context.Context, m format.MutableParameters, blobcfg format.BlobStorageConfiguration, requiredFeatures []feature.Required) error + // ChangePassword(ctx context.Context, newPassword string) error + // GetUpgradeLockIntent(ctx context.Context) (*format.UpgradeLockIntent, error) + // SetUpgradeLockIntent(ctx context.Context, l format.UpgradeLockIntent) (*format.UpgradeLockIntent, error) + // CommitUpgrade(ctx context.Context) error + // RollbackUpgrade(ctx context.Context) error } type directRepositoryParameters struct { - uniqueID []byte - configFile string - cachingOptions content.CachingOptions - cliOpts ClientOptions - timeNow func() time.Time - formatBlob *format.KopiaRepositoryJSON - blobCfgBlob format.BlobStorageConfiguration - formatEncryptionKey []byte - nextWriterID *int32 - throttler throttling.SettableThrottler + configFile string + cachingOptions content.CachingOptions + cliOpts ClientOptions + timeNow func() time.Time + fmgr *format.Manager + nextWriterID *int32 + throttler throttling.SettableThrottler } // directRepository is an implementation of repository that directly manipulates underlying storage. @@ -113,13 +108,13 @@ type directRepository struct { // DeriveKey derives encryption key of the provided length from the master key. func (r *directRepository) DeriveKey(purpose []byte, keyLength int) []byte { if r.cmgr.ContentFormat().SupportsPasswordChange() { - return format.DeriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.uniqueID, purpose, keyLength) + return format.DeriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.UniqueID(), purpose, keyLength) } // version of kopia = format.MaxFormatVersion { - return errors.Errorf("repository is using version %d, and version %d is the maximum", - repoConfig.ContentFormat.Version, format.MaxFormatVersion) - } - - // backup the current repository config from local cache to the - // repository when we place the lock for the first time - if err := r.formatBlob.WriteKopiaRepositoryBlobWithID(ctx, r.blobs, r.blobCfgBlob, FormatBlobBackupID(l)); err != nil { - return errors.Wrap(err, "failed to backup the repo format blob") - } - - // set a new lock or revoke an existing lock - repoConfig.UpgradeLock = &l - // mark the upgrade to the new format version, this will ensure that older - // clients won't be able to parse the new version - repoConfig.ContentFormat.Version = format.MaxFormatVersion - } else if newL, err := repoConfig.UpgradeLock.Update(&l); err == nil { - repoConfig.UpgradeLock = newL - } else { - return errors.Wrap(err, "failed to update the existing lock") - } - - return nil - }) - if err != nil { - return nil, err - } - - return repoConfig.UpgradeLock, nil -} - -// CommitUpgrade removes the upgrade lock from the from the repository format -// blob. This in-effect commits the new repository format t othe repository and -// resumes all access to the repository. -func (r *directRepository) CommitUpgrade(ctx context.Context) error { - _, err := r.updateRepoConfig(ctx, func(repoConfig *format.RepositoryConfig) error { - if repoConfig.UpgradeLock == nil { - return errors.New("no upgrade in progress") - } - - // restore the old format version - repoConfig.UpgradeLock = nil - - return nil - }) - - return err -} - -// RollbackUpgrade removes the upgrade lock while also restoring the -// format-blob's original version. This method does not restore the original -// repository data format and neither does it validate against any repository -// changes. Rolling back the repository format is currently not supported and -// hence using this API could render the repository corrupted and unreadable by -// clients. -// -//nolint:gocyclo -func (r *directRepository) RollbackUpgrade(ctx context.Context) error { - f := r.formatBlob - - repoConfig, err := f.DecryptRepositoryConfig(r.formatEncryptionKey) - if err != nil { - return errors.Wrap(err, "unable to decrypt repository config") - } - - if repoConfig.UpgradeLock == nil { - return errors.New("no upgrade in progress") - } - - // restore the oldest backup and delete the rest - var oldestBackup *blob.Metadata - - if err = r.blobs.ListBlobs(ctx, FormatBlobBackupIDPrefix, func(bm blob.Metadata) error { - var delID blob.ID - if oldestBackup == nil || bm.Timestamp.Before(oldestBackup.Timestamp) { - if oldestBackup != nil { - // delete the current candidate because we have found an even older one - delID = oldestBackup.BlobID - } - oldestBackup = &bm - } else { - delID = bm.BlobID - } - - if delID != "" { - // delete the backup that we are not going to need for rollback - if err = r.blobs.DeleteBlob(ctx, delID); err != nil { - return errors.Wrapf(err, "failed to delete the format blob backup %q", delID) - } - } - - return nil - }); err != nil { - return errors.Wrap(err, "failed to list backup blobs") - } - - // restore only when we find a backup, otherwise simply cleanup the local cache - if oldestBackup != nil { - var d gather.WriteBuffer - if err = r.blobs.GetBlob(ctx, oldestBackup.BlobID, 0, -1, &d); err != nil { - return errors.Wrapf(err, "failed to read from backup %q", oldestBackup.BlobID) - } - - if err = r.blobs.PutBlob(ctx, format.KopiaRepositoryBlobID, d.Bytes(), blob.PutOptions{}); err != nil { - return errors.Wrapf(err, "failed to restore format blob from backup %q", oldestBackup.BlobID) - } - - // delete the backup after we have restored the format-blob - if err = r.blobs.DeleteBlob(ctx, oldestBackup.BlobID); err != nil { - return errors.Wrapf(err, "failed to delete the format blob backup %q", oldestBackup.BlobID) - } - } - - if cd := r.cachingOptions.CacheDirectory; cd != "" { - if err = os.Remove(filepath.Join(cd, format.KopiaRepositoryBlobID)); err != nil && !os.IsNotExist(err) { - return errors.Errorf("unable to remove cached repository format blob: %v", err) - } - } - - return nil -} - -func (r *directRepository) GetUpgradeLockIntent(ctx context.Context) (*format.UpgradeLockIntent, error) { - f := r.formatBlob - - repoConfig, err := f.DecryptRepositoryConfig(r.formatEncryptionKey) - if err != nil { - return nil, errors.Wrap(err, "unable to decrypt repository config") - } - - return repoConfig.UpgradeLock, nil -} diff --git a/snapshot/snapshotmaintenance/snapshotmaintenance_test.go b/snapshot/snapshotmaintenance/snapshotmaintenance_test.go index c3b1ca4d3..a03155ceb 100644 --- a/snapshot/snapshotmaintenance/snapshotmaintenance_test.go +++ b/snapshot/snapshotmaintenance/snapshotmaintenance_test.go @@ -208,7 +208,7 @@ func (s *formatSpecificTestSuite) TestSnapshotGCMinContentAgeSafety(t *testing.T // Advance time so the first content created above is close fall of the // SafetyFull.MinContentAgeSubjectToGC window. Leave a small buffer - // of 10 seconds below for "passage of time" between now an when snapshot + // of 20 seconds below for "passage of time" between now an when snapshot // GC actually starts. // Note: The duration of this buffer is a "magical number" that depends on // the number of times time is advanced between "now" and when snapshot @@ -217,7 +217,7 @@ func (s *formatSpecificTestSuite) TestSnapshotGCMinContentAgeSafety(t *testing.T require.NoError(t, err) require.NotEmpty(t, ci) - timeAdvance := safety.MinContentAgeSubjectToGC - th.fakeTime.NowFunc()().Sub(ci.Timestamp()) - 10*time.Second + timeAdvance := safety.MinContentAgeSubjectToGC - th.fakeTime.NowFunc()().Sub(ci.Timestamp()) - 20*time.Second require.Positive(t, timeAdvance) th.fakeTime.Advance(timeAdvance)