feat(repository): implemented format.Manager - take 2 (#2349)

* implemented format blob cache abstraction

* moved upgrade lock logic to repo/format

* moved set parameters logic to repo/format

* moved change password functionality to repo/format

* mechanical changes

* mechanical changes to react to format manager interface

* get current repository format bytes instead of static

* implemented format.Manager which dynamically fetches and caches latest format blob

* repo changes to use format.Manager

* fixed failing unit test due to different timings

* reduced lock contention by using RWMutex

* serve immutable parts of format without any locks

* increase test timeout

* fixed handling of negative validDuration

The new rules are:

- validDuration < 0 - ignore initial cached file, refresh every 15min
- validDuration > 15min - refresh every 15 minutes
- validDuration > 0 && validDuration <= 15min - refresh using provided
  interval (mostly used for testing)
This commit is contained in:
Jarek Kowalski
2022-09-02 18:55:01 -07:00
committed by GitHub
parent 7bda16ab33
commit cbb7d68fb6
26 changed files with 1524 additions and 717 deletions

View File

@@ -36,7 +36,7 @@ func (c *commandRepositoryChangePassword) run(ctx context.Context, rep repo.Dire
newPass = c.newPassword
}
if err := rep.ChangePassword(ctx, newPass); err != nil {
if err := rep.FormatManager().ChangePassword(ctx, newPass); err != nil {
return errors.Wrap(err, "unable to change password")
}

View File

@@ -125,10 +125,17 @@ func (c *commandRepositorySetParameters) setRetentionModeParameter(ctx context.C
func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
var anyChange bool
mp := rep.ContentReader().ContentFormat().Struct().MutableParameters
blobcfg := rep.BlobCfg()
mp, err := rep.FormatManager().GetMutableParameters()
if err != nil {
return errors.Wrap(err, "mutable parameters")
}
requiredFeatures, err := rep.RequiredFeatures()
blobcfg, err := rep.FormatManager().BlobCfgBlob()
if err != nil {
return errors.Wrap(err, "blob configuration")
}
requiredFeatures, err := rep.FormatManager().RequiredFeatures()
if err != nil {
return errors.Wrap(err, "unable to get required features")
}
@@ -187,7 +194,7 @@ func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.Direc
}
}
if err := rep.SetParameters(ctx, mp, blobcfg, requiredFeatures); err != nil {
if err := rep.FormatManager().SetParameters(ctx, mp, blobcfg, requiredFeatures); err != nil {
return errors.Wrap(err, "error setting parameters")
}

View File

@@ -63,9 +63,9 @@ func (c *commandRepositoryStatus) outputJSON(ctx context.Context, r repo.Reposit
ci := dr.BlobReader().ConnectionInfo()
s.UniqueIDHex = hex.EncodeToString(dr.UniqueID())
s.ObjectFormat = dr.ObjectFormat()
s.BlobRetention = dr.BlobCfg()
s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) //nolint:forcetypeassert
s.ContentFormat = scrubber.ScrubSensitiveData(reflect.ValueOf(dr.ContentReader().ContentFormat().Struct())).Interface().(format.ContentFormat) //nolint:forcetypeassert
s.BlobRetention, _ = dr.FormatManager().BlobCfgBlob()
s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) //nolint:forcetypeassert
s.ContentFormat = dr.FormatManager().ScrubbedContentFormat()
switch cp, err := dr.BlobVolume().GetCapacity(ctx); {
case err == nil:
@@ -88,7 +88,7 @@ func (c *commandRepositoryStatus) dumpUpgradeStatus(ctx context.Context, dr repo
return nil
}
l, err := drw.GetUpgradeLockIntent(ctx)
l, err := drw.FormatManager().GetUpgradeLockIntent(ctx)
if err != nil {
return errors.Wrap(err, "failed to get the upgrade lock intent")
}
@@ -120,7 +120,7 @@ func (c *commandRepositoryStatus) dumpUpgradeStatus(ctx context.Context, dr repo
}
func (c *commandRepositoryStatus) dumpRetentionStatus(dr repo.DirectRepository) {
if blobcfg := dr.BlobCfg(); blobcfg.IsRetentionEnabled() {
if blobcfg, _ := dr.FormatManager().BlobCfgBlob(); blobcfg.IsRetentionEnabled() {
c.out.printStdout("\n")
c.out.printStdout("Blob retention mode: %s\n", blobcfg.RetentionMode)
c.out.printStdout("Blob retention period: %s\n", blobcfg.RetentionPeriod)
@@ -251,7 +251,7 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository)
}
func (c *commandRepositoryStatus) outputRequiredFeatures(dr repo.DirectRepository) {
if req, _ := dr.RequiredFeatures(); len(req) > 0 {
if req, _ := dr.FormatManager().RequiredFeatures(); len(req) > 0 {
var featureIDs []string
for _, r := range req {

View File

@@ -4,7 +4,6 @@
"context"
"fmt"
"os"
"path/filepath"
"time"
"github.com/alecthomas/kingpin"
@@ -86,7 +85,7 @@ func (c *commandRepositoryUpgrade) forceRollbackAction(ctx context.Context, rep
return errors.New("repository upgrade lock can only be revoked unsafely; please use the --force flag")
}
if err := rep.RollbackUpgrade(ctx); err != nil {
if err := rep.FormatManager().RollbackUpgrade(ctx); err != nil {
return errors.Wrap(err, "failed to rollback the upgrade")
}
@@ -141,7 +140,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D
// Update format-blob and clear the cache.
// This will fail if we have already upgraded.
l, err := rep.SetUpgradeLockIntent(ctx, *l)
l, err := rep.FormatManager().SetUpgradeLockIntent(ctx, *l)
if err != nil {
return errors.Wrap(err, "error setting the upgrade lock intent")
}
@@ -182,7 +181,7 @@ func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.D
if mp.EpochParameters.Enabled {
log(ctx).Infof("Repository indices have already been migrated to the epoch format, no need to drain other clients")
l, err := rep.GetUpgradeLockIntent(ctx)
l, err := rep.FormatManager().GetUpgradeLockIntent(ctx)
if err != nil {
return errors.Wrap(err, "failed to get upgrade lock intent")
}
@@ -224,28 +223,8 @@ func (c *commandRepositoryUpgrade) sleepWithContext(ctx context.Context, dur tim
}
func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo.DirectRepositoryWriter) error {
password, err := c.svc.getPasswordFromFlags(ctx, false, false)
if err != nil {
return errors.Wrap(err, "getting password")
}
configFile, err := filepath.Abs(c.svc.repositoryConfigFileName())
if err != nil {
return errors.Wrap(err, "error resolving config file path")
}
lc, err := repo.LoadConfigFromFile(configFile)
if err != nil {
return errors.Wrapf(err, "error loading config file %q", configFile)
}
cacheOpts := lc.Caching.CloneOrDefault()
for {
l, err := format.ReadAndCacheRepoUpgradeLock(ctx, rep.BlobStorage(), password, cacheOpts.CacheDirectory, -1)
if err != nil {
return errors.Wrap(err, "unable to reload the repository format blob")
}
l, err := rep.FormatManager().GetUpgradeLockIntent(ctx)
upgradeTime := l.UpgradeTime()
now := rep.Time()
@@ -280,7 +259,7 @@ func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectR
return errors.Wrap(mperr, "mutable parameters")
}
rf, err := rep.RequiredFeatures()
rf, err := rep.FormatManager().RequiredFeatures()
if err != nil {
return errors.Wrap(err, "error getting repository features")
}
@@ -295,12 +274,17 @@ func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectR
log(ctx).Infof("migrating current indices to epoch format")
if err := rep.ContentManager().PrepareUpgradeToIndexBlobManagerV1(ctx, mp.EpochParameters); err != nil {
return errors.Wrap(err, "error upgrading indices")
if uerr := rep.ContentManager().PrepareUpgradeToIndexBlobManagerV1(ctx, mp.EpochParameters); uerr != nil {
return errors.Wrap(uerr, "error upgrading indices")
}
blobCfg, err := rep.FormatManager().BlobCfgBlob()
if err != nil {
return errors.Wrap(err, "error getting blob configuration")
}
// update format-blob and clear the cache
if err := rep.SetParameters(ctx, mp, rep.BlobCfg(), rf); err != nil {
if err := rep.FormatManager().SetParameters(ctx, mp, blobCfg, rf); err != nil {
return errors.Wrap(err, "error setting parameters")
}
@@ -322,7 +306,7 @@ func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectR
// cleanup and backups used for the rollback mechanism, so we cannot rollback
// after this phase.
func (c *commandRepositoryUpgrade) commitUpgrade(ctx context.Context, rep repo.DirectRepositoryWriter) error {
if err := rep.CommitUpgrade(ctx); err != nil {
if err := rep.FormatManager().CommitUpgrade(ctx); err != nil {
return errors.Wrap(err, "error finalizing upgrade")
}
// we need to reopen the repository after this point

View File

@@ -1,58 +0,0 @@
package repo
import (
"context"
"os"
"path/filepath"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/format"
)
// ChangePassword changes the repository password and rewrites
// `kopia.repository` & `kopia.blobcfg`.
func (r *directRepository) ChangePassword(ctx context.Context, newPassword string) error {
f := r.formatBlob
repoConfig, err := f.DecryptRepositoryConfig(r.formatEncryptionKey)
if err != nil {
return errors.Wrap(err, "unable to decrypt repository config")
}
if !repoConfig.EnablePasswordChange {
return errors.Errorf("password changes are not supported for repositories created using Kopia v0.8 or older")
}
newFormatEncryptionKey, err := f.DeriveFormatEncryptionKeyFromPassword(newPassword)
if err != nil {
return errors.Wrap(err, "unable to derive master key")
}
r.formatEncryptionKey = newFormatEncryptionKey
if err := f.EncryptRepositoryConfig(repoConfig, newFormatEncryptionKey); err != nil {
return errors.Wrap(err, "unable to encrypt format bytes")
}
if err := f.WriteBlobCfgBlob(ctx, r.blobs, r.blobCfgBlob, newFormatEncryptionKey); err != nil {
return errors.Wrap(err, "unable to write blobcfg blob")
}
if err := f.WriteKopiaRepositoryBlob(ctx, r.blobs, r.blobCfgBlob); err != nil {
return errors.Wrap(err, "unable to write format blob")
}
// remove cached kopia.repository blob.
if cd := r.cachingOptions.CacheDirectory; cd != "" {
if err := os.Remove(filepath.Join(cd, format.KopiaRepositoryBlobID)); err != nil {
log(ctx).Errorf("unable to remove %s: %v", format.KopiaRepositoryBlobID, err)
}
if err := os.Remove(filepath.Join(cd, format.KopiaBlobCfgBlobID)); err != nil && !os.IsNotExist(err) {
log(ctx).Errorf("unable to remove %s: %v", format.KopiaBlobCfgBlobID, err)
}
}
return nil
}

View File

@@ -752,7 +752,12 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr
return nil, errors.Wrap(err, "unable to read crypto bytes")
}
b.Append(bm.format.RepositoryFormatBytes())
suffix, berr := bm.format.RepositoryFormatBytes()
if berr != nil {
return nil, errors.Wrap(berr, "format bytes")
}
b.Append(suffix)
//nolint:gosec
if err := writeRandomBytesToBuffer(b, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength); err != nil {
@@ -941,11 +946,10 @@ func (bm *WriteManager) MetadataCache() cache.ContentCache {
// ManagerOptions are the optional parameters for manager creation.
type ManagerOptions struct {
RepositoryFormatBytes []byte
TimeNow func() time.Time // Time provider
DisableInternalLog bool
RetentionMode string
RetentionPeriod time.Duration
TimeNow func() time.Time // Time provider
DisableInternalLog bool
RetentionMode string
RetentionPeriod time.Duration
}
// CloneOrDefault returns a clone of provided ManagerOptions or default empty struct if nil.

View File

@@ -59,8 +59,8 @@ func serializeBlobCfgBytes(f *KopiaRepositoryJSON, r BlobStorageConfiguration, f
}
}
// DeserializeBlobCfgBytes decrypts and deserializes the given bytes into BlobStorageConfiguration.
func (f *KopiaRepositoryJSON) DeserializeBlobCfgBytes(encryptedBlobCfgBytes, formatEncryptionKey []byte) (BlobStorageConfiguration, error) {
// deserializeBlobCfgBytes decrypts and deserializes the given bytes into BlobStorageConfiguration.
func deserializeBlobCfgBytes(j *KopiaRepositoryJSON, encryptedBlobCfgBytes, formatEncryptionKey []byte) (BlobStorageConfiguration, error) {
var (
plainText []byte
r BlobStorageConfiguration
@@ -71,18 +71,18 @@ func (f *KopiaRepositoryJSON) DeserializeBlobCfgBytes(encryptedBlobCfgBytes, for
return r, nil
}
switch f.EncryptionAlgorithm {
switch j.EncryptionAlgorithm {
case "NONE": // do nothing
plainText = encryptedBlobCfgBytes
case aes256GcmEncryption:
plainText, err = decryptRepositoryBlobBytesAes256Gcm(encryptedBlobCfgBytes, formatEncryptionKey, f.UniqueID)
plainText, err = decryptRepositoryBlobBytesAes256Gcm(encryptedBlobCfgBytes, formatEncryptionKey, j.UniqueID)
if err != nil {
return BlobStorageConfiguration{}, errors.Errorf("unable to decrypt repository blobcfg blob")
}
default:
return BlobStorageConfiguration{}, errors.Errorf("unknown encryption algorithm: '%v'", f.EncryptionAlgorithm)
return BlobStorageConfiguration{}, errors.Errorf("unknown encryption algorithm: '%v'", j.EncryptionAlgorithm)
}
if err = json.Unmarshal(plainText, &r); err != nil {

View File

@@ -33,7 +33,7 @@
const KopiaRepositoryBlobID = "kopia.repository"
// ErrInvalidPassword is returned when repository password is invalid.
var ErrInvalidPassword = errors.Errorf("invalid repository password")
var ErrInvalidPassword = errors.Errorf("invalid repository password") // +checklocksignore
//nolint:gochecknoglobals
var (

View File

@@ -1,9 +1,11 @@
package format
import (
"bytes"
"context"
"os"
"path/filepath"
"sync"
"time"
"github.com/pkg/errors"
@@ -12,133 +14,163 @@
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/cachedir"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/logging"
)
// DefaultRepositoryBlobCacheDuration is the duration for which we treat cached kopia.repository
// as valid.
const DefaultRepositoryBlobCacheDuration = 15 * time.Minute
var log = logging.Module("kopia/repo/format")
func formatBytesCachingEnabled(cacheDirectory string, validDuration time.Duration) bool {
if cacheDirectory == "" {
return false
}
return validDuration > 0
// blobCache encapsulates cache for format blobs.
// Note that the cache only stores very small number of blobs at the root of the repository,
// usually 1 or 2.
type blobCache interface {
Get(ctx context.Context, blobID blob.ID) ([]byte, time.Time, bool)
Put(ctx context.Context, blobID blob.ID, data []byte) (time.Time, error)
Remove(ctx context.Context, ids []blob.ID)
}
func readRepositoryBlobBytesFromCache(ctx context.Context, cachedFile string, validDuration time.Duration) (data []byte, cacheMTime time.Time, err error) {
type nullCache struct{}
func (nullCache) Get(ctx context.Context, blobID blob.ID) ([]byte, time.Time, bool) {
return nil, time.Time{}, false
}
func (nullCache) Put(ctx context.Context, blobID blob.ID, data []byte) (time.Time, error) {
return clock.Now(), nil
}
func (nullCache) Remove(ctx context.Context, ids []blob.ID) {
}
type inMemoryCache struct {
timeNow func() time.Time // +checklocksignore
mu sync.Mutex
// +checklocks:mu
data map[blob.ID][]byte
// +checklocks:mu
times map[blob.ID]time.Time
}
func (c *inMemoryCache) Get(ctx context.Context, blobID blob.ID) ([]byte, time.Time, bool) {
c.mu.Lock()
defer c.mu.Unlock()
data, ok := c.data[blobID]
if ok {
return data, c.times[blobID], true
}
return nil, time.Time{}, false
}
func (c *inMemoryCache) Put(ctx context.Context, blobID blob.ID, data []byte) (time.Time, error) {
c.mu.Lock()
defer c.mu.Unlock()
c.data[blobID] = data
c.times[blobID] = c.timeNow()
return c.times[blobID], nil
}
func (c *inMemoryCache) Remove(ctx context.Context, ids []blob.ID) {
c.mu.Lock()
defer c.mu.Unlock()
for _, blobID := range ids {
delete(c.data, blobID)
delete(c.times, blobID)
}
}
type onDiskCache struct {
cacheDirectory string
}
func (c *onDiskCache) Get(ctx context.Context, blobID blob.ID) ([]byte, time.Time, bool) {
cachedFile := filepath.Join(c.cacheDirectory, string(blobID))
cst, err := os.Stat(cachedFile)
if err != nil {
return nil, time.Time{}, errors.Wrap(err, "unable to open cache file")
return nil, time.Time{}, false
}
cacheMTime = cst.ModTime()
if clock.Now().Sub(cacheMTime) > validDuration {
// got cached file, but it's too old, remove it
if err = os.Remove(cachedFile); err != nil {
log(ctx).Debugf("unable to remove cache file: %v", err)
}
cacheMTime := cst.ModTime()
return nil, time.Time{}, errors.Errorf("cached file too old")
}
//nolint:gosec
data, err := os.ReadFile(cachedFile)
data, err = os.ReadFile(cachedFile) //nolint:gosec
if err != nil {
return nil, time.Time{}, errors.Wrapf(err, "failed to read the cache file %q", cachedFile)
}
return data, cacheMTime, nil
return data, cacheMTime, err == nil
}
// ReadAndCacheRepositoryBlobBytes reads the provided blob from the repository or cache directory.
func ReadAndCacheRepositoryBlobBytes(ctx context.Context, st blob.Storage, cacheDirectory, blobID string, validDuration time.Duration) ([]byte, time.Time, error) {
cachedFile := filepath.Join(cacheDirectory, blobID)
func (c *onDiskCache) Put(ctx context.Context, blobID blob.ID, data []byte) (time.Time, error) {
cachedFile := filepath.Join(c.cacheDirectory, string(blobID))
if validDuration == 0 {
validDuration = DefaultRepositoryBlobCacheDuration
}
// optimistically assume cache directory exist, create it if not
if err := atomicfile.Write(cachedFile, bytes.NewReader(data)); err != nil {
if err := os.MkdirAll(c.cacheDirectory, cache.DirMode); err != nil && !os.IsExist(err) {
return time.Time{}, errors.Wrap(err, "unable to create cache directory")
}
if cacheDirectory != "" {
if err := os.MkdirAll(cacheDirectory, cache.DirMode); err != nil && !os.IsExist(err) {
log(ctx).Errorf("unable to create cache directory: %v", err)
if err := cachedir.WriteCacheMarker(c.cacheDirectory); err != nil {
return time.Time{}, errors.Wrap(err, "unable to write cache directory marker")
}
if err := atomicfile.Write(cachedFile, bytes.NewReader(data)); err != nil {
return time.Time{}, errors.Wrapf(err, "unable to write to cache: %v", string(blobID))
}
}
cacheEnabled := formatBytesCachingEnabled(cacheDirectory, validDuration)
if cacheEnabled {
data, cacheMTime, err := readRepositoryBlobBytesFromCache(ctx, cachedFile, validDuration)
if err == nil {
log(ctx).Debugf("%s retrieved from cache", blobID)
return data, cacheMTime, nil
}
if os.IsNotExist(err) {
log(ctx).Debugf("%s could not be fetched from cache: %v", blobID, err)
}
} else {
log(ctx).Debugf("%s cache not enabled", blobID)
cst, err := os.Stat(cachedFile)
if err != nil {
return time.Time{}, errors.Wrap(err, "unable to open cache file")
}
var b gather.WriteBuffer
defer b.Close()
if err := st.GetBlob(ctx, blob.ID(blobID), 0, -1, &b); err != nil {
return nil, time.Time{}, errors.Wrapf(err, "error getting %s blob", blobID)
}
if cacheEnabled {
if err := atomicfile.Write(cachedFile, b.Bytes().Reader()); err != nil {
log(ctx).Warnf("unable to write cache: %v", err)
}
}
return b.ToByteSlice(), clock.Now(), nil
return cst.ModTime(), nil
}
// ReadAndCacheDecodedRepositoryConfig reads `kopia.repository` blob, potentially from cache and decodes it.
func ReadAndCacheDecodedRepositoryConfig(ctx context.Context, st blob.Storage, password, cacheDir string, validDuration time.Duration) (ufb *DecodedRepositoryConfig, err error) {
ufb = &DecodedRepositoryConfig{}
func (c *onDiskCache) Remove(ctx context.Context, ids []blob.ID) {
for _, blobID := range ids {
fname := filepath.Join(c.cacheDirectory, string(blobID))
log(ctx).Infof("deleting %v", fname)
ufb.KopiaRepositoryBytes, ufb.CacheMTime, err = ReadAndCacheRepositoryBlobBytes(ctx, st, cacheDir, KopiaRepositoryBlobID, validDuration)
if err != nil {
return nil, errors.Wrap(err, "unable to read format blob")
if err := os.Remove(fname); err != nil && !os.IsNotExist(err) {
log(ctx).Debugf("unable to remove cached repository format blob: %v", err)
}
}
if err = cachedir.WriteCacheMarker(cacheDir); err != nil {
return nil, errors.Wrap(err, "unable to write cache directory marker")
}
ufb.KopiaRepository, err = ParseKopiaRepositoryJSON(ufb.KopiaRepositoryBytes)
if err != nil {
return nil, errors.Wrap(err, "can't parse format blob")
}
ufb.KopiaRepositoryBytes, err = addFormatBlobChecksumAndLength(ufb.KopiaRepositoryBytes)
if err != nil {
return nil, errors.Errorf("unable to add checksum")
}
ufb.FormatEncryptionKey, err = ufb.KopiaRepository.DeriveFormatEncryptionKeyFromPassword(password)
if err != nil {
return nil, err
}
ufb.RepoConfig, err = ufb.KopiaRepository.DecryptRepositoryConfig(ufb.FormatEncryptionKey)
if err != nil {
return nil, ErrInvalidPassword
}
return ufb, nil
}
// ReadAndCacheRepoUpgradeLock loads the lock config from cache and returns it.
func ReadAndCacheRepoUpgradeLock(ctx context.Context, st blob.Storage, password, cacheDir string, validDuration time.Duration) (*UpgradeLockIntent, error) {
ufb, err := ReadAndCacheDecodedRepositoryConfig(ctx, st, password, cacheDir, validDuration)
return ufb.RepoConfig.UpgradeLock, err
// NewDiskCache returns on-disk blob cache.
func NewDiskCache(cacheDir string) blobCache {
return &onDiskCache{cacheDir}
}
// NewMemoryBlobCache returns in-memory blob cache.
func NewMemoryBlobCache(timeNow func() time.Time) blobCache {
return &inMemoryCache{
timeNow: timeNow,
data: map[blob.ID][]byte{},
times: map[blob.ID]time.Time{},
}
}
// NewFormatBlobCache creates an implementationof blobCache for particular cache settings.
func NewFormatBlobCache(cacheDir string, validDuration time.Duration, timeNow func() time.Time) blobCache {
if cacheDir != "" {
return NewDiskCache(cacheDir)
}
if validDuration > 0 {
return NewMemoryBlobCache(timeNow)
}
return &nullCache{}
}
var (
_ blobCache = (*nullCache)(nil)
_ blobCache = (*inMemoryCache)(nil)
_ blobCache = (*onDiskCache)(nil)
)

View File

@@ -0,0 +1,98 @@
package format
import (
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo/blob"
)
func TestFormatBlobCache(t *testing.T) {
tempdir1 := testutil.TempDirectory(t)
tempdir2 := filepath.Join(testutil.TempDirectory(t), "subdir")
cases := []struct {
desc string
fbc blobCache
isDurable bool
}{
{"NullCache", NewFormatBlobCache("", -1, clock.Now), false},
{"DiskCache-Exists", NewFormatBlobCache(tempdir1, -1, clock.Now), true},
{"DiskCache-NotExists", NewFormatBlobCache(tempdir2, -1, clock.Now), true},
{"MemoryCache", NewFormatBlobCache("", 10*time.Second, clock.Now), true},
}
t.Run("Cases", func(t *testing.T) {
for _, tc := range cases {
tc := tc
t.Run(tc.desc, func(t *testing.T) {
t.Parallel()
ctx := testlogging.Context(t)
v, mtime, ok := tc.fbc.Get(ctx, "blob1")
require.False(t, ok)
require.Nil(t, v)
require.Zero(t, mtime)
mtime, err := tc.fbc.Put(ctx, "blob1", []byte{1, 2, 3})
require.NoError(t, err)
require.NotZero(t, mtime)
_, err = tc.fbc.Put(ctx, "blob2", []byte{3, 4, 5})
require.NoError(t, err)
v2, mtime2, ok := tc.fbc.Get(ctx, "blob1")
if tc.isDurable {
require.True(t, ok)
require.Equal(t, []byte{1, 2, 3}, v2)
require.Equal(t, mtime, mtime2)
} else {
require.False(t, ok)
require.Nil(t, v)
require.Zero(t, mtime2)
}
time.Sleep(3 * time.Second)
// overwrite
mtime3, err := tc.fbc.Put(ctx, "blob1", []byte{2, 3, 4})
require.NoError(t, err)
require.Greater(t, mtime3, mtime2)
v2, mtime4, ok := tc.fbc.Get(ctx, "blob1")
if tc.isDurable {
require.True(t, ok)
require.Equal(t, []byte{2, 3, 4}, v2)
require.Equal(t, mtime3, mtime4)
} else {
require.False(t, ok)
require.Nil(t, v2)
require.Zero(t, mtime4)
}
tc.fbc.Remove(ctx, []blob.ID{"blob1"})
v3, mtime5, ok := tc.fbc.Get(ctx, "blob1")
require.False(t, ok)
require.Nil(t, v3)
require.Zero(t, mtime5)
})
}
})
require.NoFileExists(t, filepath.Join(tempdir1, "blob1"))
require.NoFileExists(t, filepath.Join(tempdir2, "blob1"))
require.FileExists(t, filepath.Join(tempdir1, "blob2"))
require.FileExists(t, filepath.Join(tempdir2, "blob2"))
}

View File

@@ -0,0 +1,44 @@
package format
import (
"context"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/blob"
)
// ChangePassword changes the repository password and rewrites
// `kopia.repository` & `kopia.blobcfg`.
func (m *Manager) ChangePassword(ctx context.Context, newPassword string) error {
m.mu.Lock()
defer m.mu.Unlock()
if !m.repoConfig.EnablePasswordChange {
return errors.Errorf("password changes are not supported for repositories created using Kopia v0.8 or older")
}
newFormatEncryptionKey, err := m.j.DeriveFormatEncryptionKeyFromPassword(newPassword)
if err != nil {
return errors.Wrap(err, "unable to derive master key")
}
m.formatEncryptionKey = newFormatEncryptionKey
m.password = newPassword
if err := m.j.EncryptRepositoryConfig(m.repoConfig, newFormatEncryptionKey); err != nil {
return errors.Wrap(err, "unable to encrypt format bytes")
}
if err := m.j.WriteBlobCfgBlob(ctx, m.blobs, m.blobCfgBlob, newFormatEncryptionKey); err != nil {
return errors.Wrap(err, "unable to write blobcfg blob")
}
if err := m.j.WriteKopiaRepositoryBlob(ctx, m.blobs, m.blobCfgBlob); err != nil {
return errors.Wrap(err, "unable to write format blob")
}
m.cache.Remove(ctx, []blob.ID{KopiaRepositoryBlobID, KopiaBlobCfgBlobID})
return nil
}

View File

@@ -0,0 +1,492 @@
package format
import (
"context"
"crypto/rand"
"io"
"sync"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/feature"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/hashing"
"github.com/kopia/kopia/repo/logging"
)
var log = logging.Module("kopia/repo/format")
// UniqueIDLengthBytes is the length of random unique ID of each repository.
const UniqueIDLengthBytes = 32
// Manager manages the contents of `kopia.repository` and `kopia.blobcfg`.
type Manager struct {
//nolint:containedctx
ctx context.Context // +checklocksignore
blobs blob.Storage // +checklocksignore
validDuration time.Duration // +checklocksignore
password string // +checklocksignore
cache blobCache // +checklocksignore
// provider for immutable parts of the format data, used to avoid locks.
immutable Provider
timeNow func() time.Time // +checklocksignore
// all the stuff protected by a mutex is valid until `validUntil`
mu sync.RWMutex
// +checklocks:mu
formatEncryptionKey []byte
// +checklocks:mu
j *KopiaRepositoryJSON
// +checklocks:mu
repoConfig *RepositoryConfig
// +checklocks:mu
blobCfgBlob BlobStorageConfiguration
// +checklocks:mu
current Provider
// +checklocks:mu
validUntil time.Time
// +checklocks:mu
loadedTime time.Time
// +checklocks:mu
refreshCounter int
// +checklocks:mu
ignoreCacheOnFirstRefresh bool
}
func (m *Manager) getOrRefreshFormat() (Provider, error) {
if err := m.maybeRefreshNotLocked(); err != nil {
return nil, err
}
m.mu.RLock()
defer m.mu.RUnlock()
return m.current, nil
}
func (m *Manager) maybeRefreshNotLocked() error {
m.mu.RLock()
val := m.validUntil
m.mu.RUnlock()
if m.timeNow().Before(val) {
return nil
}
// current format not valid anymore, kick off a refresh
return m.refresh(m.ctx)
}
// readAndCacheRepositoryBlobBytes reads the provided blob from the repository or cache directory.
// +checklocks:m.mu
func (m *Manager) readAndCacheRepositoryBlobBytes(ctx context.Context, blobID blob.ID) ([]byte, time.Time, error) {
if !m.ignoreCacheOnFirstRefresh {
if data, mtime, ok := m.cache.Get(ctx, blobID); ok {
// read from cache and still valid
age := m.timeNow().Sub(mtime)
if age < m.validDuration {
return data, mtime, nil
}
}
}
var b gather.WriteBuffer
defer b.Close()
if err := m.blobs.GetBlob(ctx, blobID, 0, -1, &b); err != nil {
return nil, time.Time{}, errors.Wrapf(err, "error getting %s blob", blobID)
}
data := b.ToByteSlice()
mtime, err := m.cache.Put(ctx, blobID, data)
return data, mtime, errors.Wrapf(err, "error adding %s blob", blobID)
}
// ValidCacheDuration returns the duration for which each blob in the cache is valid.
func (m *Manager) ValidCacheDuration() time.Duration {
return m.validDuration
}
// RefreshCount returns the number of time the format has been refreshed.
func (m *Manager) RefreshCount() int {
m.mu.RLock()
defer m.mu.RUnlock()
return m.refreshCounter
}
// refresh reads `kopia.repository` blob, potentially from cache and decodes it.
func (m *Manager) refresh(ctx context.Context) error {
m.mu.Lock()
defer m.mu.Unlock()
b, cacheMTime, err := m.readAndCacheRepositoryBlobBytes(ctx, KopiaRepositoryBlobID)
if err != nil {
return errors.Wrap(err, "unable to read format blob")
}
j, err := ParseKopiaRepositoryJSON(b)
if err != nil {
return errors.Wrap(err, "can't parse format blob")
}
b, err = addFormatBlobChecksumAndLength(b)
if err != nil {
return errors.Errorf("unable to add checksum")
}
var formatEncryptionKey []byte
// try decrypting using old key, if present to avoid deriving it, which is expensive
repoConfig, err := j.decryptRepositoryConfig(m.formatEncryptionKey)
if err == nil {
// still valid, no need to derive
formatEncryptionKey = m.formatEncryptionKey
} else {
formatEncryptionKey, err = j.DeriveFormatEncryptionKeyFromPassword(m.password)
if err != nil {
return errors.Wrap(err, "derive format encryption key")
}
repoConfig, err = j.decryptRepositoryConfig(formatEncryptionKey)
if err != nil {
return ErrInvalidPassword
}
}
var blobCfg BlobStorageConfiguration
if b2, _, err2 := m.readAndCacheRepositoryBlobBytes(ctx, KopiaBlobCfgBlobID); err2 == nil {
var e2 error
blobCfg, e2 = deserializeBlobCfgBytes(j, b2, formatEncryptionKey)
if e2 != nil {
return errors.Wrap(e2, "deserialize blob config")
}
} else if !errors.Is(err2, blob.ErrBlobNotFound) {
return errors.Wrap(err2, "load blob config")
}
prov, err := NewFormattingOptionsProvider(&repoConfig.ContentFormat, b)
if err != nil {
return errors.Wrap(err, "error creating format provider")
}
m.current = prov
m.j = j
m.repoConfig = repoConfig
m.validUntil = cacheMTime.Add(m.validDuration)
m.formatEncryptionKey = formatEncryptionKey
m.loadedTime = cacheMTime
m.blobCfgBlob = blobCfg
m.ignoreCacheOnFirstRefresh = false
if m.immutable == nil {
// on first refresh, set `immutable``
m.immutable = prov
}
m.refreshCounter++
return nil
}
// GetEncryptionAlgorithm returns the encryption algorithm.
func (m *Manager) GetEncryptionAlgorithm() string {
return m.immutable.GetEncryptionAlgorithm()
}
// GetHashFunction returns the hash function.
func (m *Manager) GetHashFunction() string {
return m.immutable.GetHashFunction()
}
// GetECCAlgorithm returns the ECC algorithm.
func (m *Manager) GetECCAlgorithm() string {
return m.immutable.GetECCAlgorithm()
}
// GetECCOverheadPercent returns the ECC overhead percent.
func (m *Manager) GetECCOverheadPercent() int {
return m.immutable.GetECCOverheadPercent()
}
// GetHmacSecret returns the HMAC function.
func (m *Manager) GetHmacSecret() []byte {
return m.immutable.GetHmacSecret()
}
// HashFunc returns the resolved hash function.
func (m *Manager) HashFunc() hashing.HashFunc {
return m.immutable.HashFunc()
}
// Encryptor returns the resolved encryptor.
func (m *Manager) Encryptor() encryption.Encryptor {
return m.immutable.Encryptor()
}
// GetMasterKey gets the master key.
func (m *Manager) GetMasterKey() []byte {
return m.immutable.GetMasterKey()
}
// SupportsPasswordChange returns true if the repository supports password change.
func (m *Manager) SupportsPasswordChange() bool {
return m.immutable.SupportsPasswordChange()
}
// RepositoryFormatBytes returns the bytes of `kopia.repository` blob.
// This function blocks to refresh the format blob if necessary.
func (m *Manager) RepositoryFormatBytes() ([]byte, error) {
f, err := m.getOrRefreshFormat()
if err != nil {
return nil, err
}
//nolint:wrapcheck
return f.RepositoryFormatBytes()
}
// GetMutableParameters gets mutable paramers of the repository.
// This function blocks to refresh the format blob if necessary.
func (m *Manager) GetMutableParameters() (MutableParameters, error) {
f, err := m.getOrRefreshFormat()
if err != nil {
return MutableParameters{}, err
}
//nolint:wrapcheck
return f.GetMutableParameters()
}
// UpgradeLockIntent returns the current lock intent.
func (m *Manager) UpgradeLockIntent() (*UpgradeLockIntent, error) {
if err := m.maybeRefreshNotLocked(); err != nil {
return nil, err
}
m.mu.RLock()
defer m.mu.RUnlock()
return m.repoConfig.UpgradeLock.Clone(), nil
}
// RequiredFeatures returns the list of features required to open the repository.
func (m *Manager) RequiredFeatures() ([]feature.Required, error) {
if err := m.maybeRefreshNotLocked(); err != nil {
return nil, err
}
m.mu.RLock()
defer m.mu.RUnlock()
return m.repoConfig.RequiredFeatures, nil
}
// LoadedTime gets the time when the config was last reloaded.
func (m *Manager) LoadedTime() time.Time {
m.mu.RLock()
defer m.mu.RUnlock()
return m.loadedTime
}
// updateRepoConfigLocked updates repository config and rewrites kopia.repository blob.
// +checklocks:m.mu
func (m *Manager) updateRepoConfigLocked(ctx context.Context) error {
if err := m.j.EncryptRepositoryConfig(m.repoConfig, m.formatEncryptionKey); err != nil {
return errors.Errorf("unable to encrypt format bytes")
}
if err := m.j.WriteKopiaRepositoryBlob(ctx, m.blobs, m.blobCfgBlob); err != nil {
return errors.Wrap(err, "unable to write format blob")
}
m.cache.Remove(ctx, []blob.ID{KopiaRepositoryBlobID})
return nil
}
// UniqueID gets the unique ID of a repository allocated at creation time.
func (m *Manager) UniqueID() []byte {
m.mu.RLock()
defer m.mu.RUnlock()
return m.j.UniqueID
}
// BlobCfgBlob gets the BlobStorageConfiguration.
func (m *Manager) BlobCfgBlob() (BlobStorageConfiguration, error) {
if err := m.maybeRefreshNotLocked(); err != nil {
return BlobStorageConfiguration{}, err
}
m.mu.RLock()
defer m.mu.RUnlock()
return m.blobCfgBlob, nil
}
// ObjectFormat gets the object format.
func (m *Manager) ObjectFormat() ObjectFormat {
m.mu.RLock()
defer m.mu.RUnlock()
return m.repoConfig.ObjectFormat
}
// FormatEncryptionKey gets the format encryption key derived from the password.
func (m *Manager) FormatEncryptionKey() []byte {
m.mu.RLock()
defer m.mu.RUnlock()
return m.formatEncryptionKey
}
// ScrubbedContentFormat returns scrubbed content format with all sensitive data replaced.
func (m *Manager) ScrubbedContentFormat() ContentFormat {
m.mu.RLock()
defer m.mu.RUnlock()
cf := m.repoConfig.ContentFormat
cf.MasterKey = nil
cf.HMACSecret = nil
return cf
}
// NewManager creates new format manager which automatically refreshes format blob on reads (in a blocking manner).
func NewManager(
ctx context.Context,
st blob.Storage,
cacheDir string,
validDuration time.Duration,
password string,
timeNow func() time.Time,
) (*Manager, error) {
return NewManagerWithCache(ctx, st, validDuration, password, timeNow, NewFormatBlobCache(cacheDir, validDuration, timeNow))
}
// NewManagerWithCache creates new format manager which automatically refreshes format blob on reads (in a blocking manner)
// and uses the provided cache.
func NewManagerWithCache(
ctx context.Context,
st blob.Storage,
validDuration time.Duration,
password string,
timeNow func() time.Time,
cache blobCache,
) (*Manager, error) {
var ignoreCacheOnFirstRefresh bool
if validDuration < 0 {
// valid duration less than zero indicates we want to skip cache on first access
validDuration = DefaultRepositoryBlobCacheDuration
ignoreCacheOnFirstRefresh = true
}
if validDuration > DefaultRepositoryBlobCacheDuration {
log(ctx).Infof("repository format cache duration capped at %v", DefaultRepositoryBlobCacheDuration)
validDuration = DefaultRepositoryBlobCacheDuration
}
m := &Manager{
ctx: ctx,
blobs: st,
validDuration: validDuration,
password: password,
cache: cache,
timeNow: timeNow,
ignoreCacheOnFirstRefresh: ignoreCacheOnFirstRefresh,
}
err := m.refresh(ctx)
return m, err
}
// ErrAlreadyInitialized indicates that repository has already been initialized.
var ErrAlreadyInitialized = errors.Errorf("repository already initialized")
// Initialize initializes the format blob in a given storage.
func Initialize(ctx context.Context, st blob.Storage, formatBlob *KopiaRepositoryJSON, repoConfig *RepositoryConfig, blobcfg BlobStorageConfiguration, password string) error {
// get the blob - expect ErrNotFound
var tmp gather.WriteBuffer
defer tmp.Close()
err := st.GetBlob(ctx, KopiaRepositoryBlobID, 0, -1, &tmp)
if err == nil {
return ErrAlreadyInitialized
}
if !errors.Is(err, blob.ErrBlobNotFound) {
return errors.Wrap(err, "unexpected error when checking for format blob")
}
err = st.GetBlob(ctx, KopiaBlobCfgBlobID, 0, -1, &tmp)
if err == nil {
return errors.Errorf("possible corruption: blobcfg blob exists, but format blob is not found")
}
if !errors.Is(err, blob.ErrBlobNotFound) {
return errors.Wrap(err, "unexpected error when checking for blobcfg blob")
}
if formatBlob.EncryptionAlgorithm == "" {
formatBlob.EncryptionAlgorithm = DefaultFormatEncryption
}
if formatBlob.KeyDerivationAlgorithm == "" {
formatBlob.KeyDerivationAlgorithm = DefaultKeyDerivationAlgorithm
}
if len(formatBlob.UniqueID) == 0 {
formatBlob.UniqueID = randomBytes(UniqueIDLengthBytes)
}
formatEncryptionKey, err := formatBlob.DeriveFormatEncryptionKeyFromPassword(password)
if err != nil {
return errors.Wrap(err, "unable to derive format encryption key")
}
if err = repoConfig.MutableParameters.Validate(); err != nil {
return errors.Wrap(err, "invalid parameters")
}
if err = blobcfg.Validate(); err != nil {
return errors.Wrap(err, "blob config")
}
if err = formatBlob.EncryptRepositoryConfig(repoConfig, formatEncryptionKey); err != nil {
return errors.Wrap(err, "unable to encrypt format bytes")
}
if err := formatBlob.WriteBlobCfgBlob(ctx, st, blobcfg, formatEncryptionKey); err != nil {
return errors.Wrap(err, "unable to write blobcfg blob")
}
if err := formatBlob.WriteKopiaRepositoryBlob(ctx, st, blobcfg); err != nil {
return errors.Wrap(err, "unable to write format blob")
}
return nil
}
var _ Provider = (*Manager)(nil)
func randomBytes(n int) []byte {
b := make([]byte, n)
io.ReadFull(rand.Reader, b) //nolint:errcheck
return b
}

View File

@@ -0,0 +1,309 @@
package format_test
import (
"bytes"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/feature"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/testlogging"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/hashing"
)
var (
errSomeError = errors.Errorf("some error")
cf = format.ContentFormat{
MutableParameters: format.MutableParameters{
Version: format.FormatVersion1,
EpochParameters: epoch.DefaultParameters(),
MaxPackSize: 20e6,
IndexVersion: 2,
},
Hash: hashing.DefaultAlgorithm,
Encryption: encryption.DefaultAlgorithm,
HMACSecret: []byte{1, 2, 3, 4, 5},
}
uli = &format.UpgradeLockIntent{
OwnerID: "foo@bar",
}
rc = &format.RepositoryConfig{
ContentFormat: cf,
UpgradeLock: uli,
}
cacheDuration = 10 * time.Minute
)
func TestFormatManager(t *testing.T) {
ctx := testlogging.Context(t)
startTime := time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC)
ta := faketime.NewTimeAdvance(startTime, 0)
nowFunc := ta.NowFunc()
cache := format.NewMemoryBlobCache(nowFunc)
st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil)
fst := blobtesting.NewFaultyStorage(st)
require.NoError(t, format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password"))
rawBytes := mustGetBytes(t, st, "kopia.repository")
mgr, err := format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache)
require.NoError(t, err)
require.Equal(t, cf.HMACSecret, mgr.GetHmacSecret())
require.Equal(t, cf.Encryption, mgr.GetEncryptionAlgorithm())
require.Equal(t, cf.Hash, mgr.GetHashFunction())
require.NotNil(t, mgr.HashFunc())
require.NotNil(t, mgr.Encryptor())
require.Equal(t, cf.MasterKey, mgr.GetMasterKey())
require.Equal(t, false, mgr.SupportsPasswordChange())
require.Equal(t, startTime, mgr.LoadedTime())
require.Equal(t, cf.MutableParameters, mustGetMutableParameters(t, mgr))
require.True(t, bytes.Contains(mustGetRepositoryFormatBytes(t, mgr), rawBytes))
require.Equal(t, uli, mustGetUpgradeLockIntent(t, mgr))
// move time to be 1ns shy of when the cache expires
fst.AddFault(blobtesting.MethodGetBlob).ErrorInstead(errSomeError)
ta.Advance(cacheDuration - 1)
// despite the failure, we still trust the cache
mustGetMutableParameters(t, mgr)
// now move the final nanosecond, this will trigger a load and storage errors
ta.Advance(1)
// error on first read, subsequent reads are ok
require.ErrorIs(t, expectMutableParametersError(t, mgr), errSomeError)
mustGetMutableParameters(t, mgr)
mustGetMutableParameters(t, mgr)
n := mgr.LoadedTime()
require.Equal(t, 2, mgr.RefreshCount())
// open another manager when cache is still valid, it will reuse old cached time
ta.Advance(5)
mgr2, err := format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache)
require.NoError(t, err)
mustGetMutableParameters(t, mgr2)
require.Equal(t, n, mgr2.LoadedTime())
// open another manager when cache has already expired
ta.Advance(2 * cacheDuration)
n = ta.NowFunc()()
mgr3, err := format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache)
require.NoError(t, err)
// make sure we're using current time
require.Equal(t, n, mgr3.LoadedTime())
// update using mgr3
mp := mustGetMutableParameters(t, mgr3)
bc2 := mustGetBlobStorageConfiguration(t, mgr3)
rf2 := mustGetRequiredFeatures(t, mgr3)
// make some changes
mp.MaxPackSize++
require.NoError(t, mgr3.SetParameters(ctx, mp, bc2, rf2))
// enough time has passed since last read, so mgr will notice the update immediately
require.Equal(t, mp, mustGetMutableParameters(t, mgr))
// update again
oldmp := mp
mp.MaxPackSize++
require.NoError(t, mgr3.SetParameters(ctx, mp, bc2, rf2))
// mgr still sees old mp
require.Equal(t, oldmp, mustGetMutableParameters(t, mgr))
// advance time, the now update is now visible
ta.Advance(cacheDuration)
require.Equal(t, mp, mustGetMutableParameters(t, mgr))
}
func TestInitialize(t *testing.T) {
ctx := testlogging.Context(t)
st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil)
fst := blobtesting.NewFaultyStorage(st)
// error fetching first blob - kopia.repository
fst.AddFault(blobtesting.MethodGetBlob).ErrorInstead(errSomeError)
require.ErrorIs(t,
format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password"),
errSomeError)
// error fetching second blob - kopia.blobcfg
fst.AddFault(blobtesting.MethodGetBlob)
fst.AddFault(blobtesting.MethodGetBlob).ErrorInstead(errSomeError)
require.ErrorIs(t,
format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password"),
errSomeError)
// success
require.NoError(t, format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password"))
// already initialized
require.ErrorIs(t,
format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password"),
format.ErrAlreadyInitialized)
}
func TestChangePassword(t *testing.T) {
ctx := testlogging.Context(t)
startTime := time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC)
ta := faketime.NewTimeAdvance(startTime, 0)
nowFunc := ta.NowFunc()
cache := format.NewMemoryBlobCache(nowFunc)
cf2 := cf
cf2.Version = format.FormatVersion3
cf2.EnablePasswordChange = true
rc = &format.RepositoryConfig{
ContentFormat: cf2,
UpgradeLock: uli,
}
st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil)
fst := blobtesting.NewFaultyStorage(st)
require.NoError(t, format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password"))
mgr, err := format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache)
require.NoError(t, err)
mgr2, err := format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache)
require.NoError(t, err)
require.NoError(t, mgr2.ChangePassword(ctx, "new-password"))
// immediately after changing the password, both managers can still read the repo
mustGetMutableParameters(t, mgr)
mustGetMutableParameters(t, mgr2)
ta.Advance(cacheDuration)
require.ErrorIs(t, expectMutableParametersError(t, mgr), format.ErrInvalidPassword)
mustGetMutableParameters(t, mgr2)
_, err = format.NewManagerWithCache(ctx, fst, cacheDuration, "some-password", nowFunc, cache)
require.ErrorIs(t, err, format.ErrInvalidPassword)
}
func TestFormatManagerValidDuration(t *testing.T) {
cases := map[time.Duration]time.Duration{
-1: 15 * time.Minute,
time.Second: time.Second,
30 * time.Minute: 15 * time.Minute,
10 * time.Minute: 10 * time.Minute,
}
for requestedCacheDuration, actualCacheDuration := range cases {
ctx := testlogging.Context(t)
startTime := time.Date(2020, 1, 1, 12, 0, 0, 0, time.UTC)
ta := faketime.NewTimeAdvance(startTime, 0)
nowFunc := ta.NowFunc()
cache := format.NewMemoryBlobCache(nowFunc)
st := blobtesting.NewMapStorage(blobtesting.DataMap{}, nil, nil)
fst := blobtesting.NewFaultyStorage(st)
require.NoError(t, format.Initialize(ctx, fst, &format.KopiaRepositoryJSON{}, rc, format.BlobStorageConfiguration{}, "some-password"))
if requestedCacheDuration < 0 {
// plant a malformed cache entry to ensure it's not being used
cache.Put(ctx, "kopia.repository", []byte("malformed"))
}
mgr, err := format.NewManagerWithCache(ctx, fst, requestedCacheDuration, "some-password", nowFunc, cache)
require.NoError(t, err)
require.Equal(t, actualCacheDuration, mgr.ValidCacheDuration())
}
}
func mustGetMutableParameters(t *testing.T, mgr *format.Manager) format.MutableParameters {
t.Helper()
mp, err := mgr.GetMutableParameters()
require.NoError(t, err)
return mp
}
func mustGetUpgradeLockIntent(t *testing.T, mgr *format.Manager) *format.UpgradeLockIntent {
t.Helper()
uli, err := mgr.GetUpgradeLockIntent(testlogging.Context(t))
require.NoError(t, err)
return uli
}
func mustGetRepositoryFormatBytes(t *testing.T, mgr *format.Manager) []byte {
t.Helper()
b, err := mgr.RepositoryFormatBytes()
require.NoError(t, err)
return b
}
func mustGetRequiredFeatures(t *testing.T, mgr *format.Manager) []feature.Required {
t.Helper()
rf, err := mgr.RequiredFeatures()
require.NoError(t, err)
return rf
}
func mustGetBlobStorageConfiguration(t *testing.T, mgr *format.Manager) format.BlobStorageConfiguration {
t.Helper()
cfg, err := mgr.BlobCfgBlob()
require.NoError(t, err)
return cfg
}
func expectMutableParametersError(t *testing.T, mgr *format.Manager) error {
t.Helper()
_, err := mgr.GetMutableParameters()
require.Error(t, err)
return err
}
func mustGetBytes(t *testing.T, st blob.Storage, blobID blob.ID) []byte {
t.Helper()
var tmp gather.WriteBuffer
defer tmp.Close()
require.NoError(t, st.GetBlob(testlogging.Context(t), blobID, 0, -1, &tmp))
return tmp.ToByteSlice()
}

View File

@@ -59,8 +59,8 @@ type Provider interface {
GetMutableParameters() (MutableParameters, error)
SupportsPasswordChange() bool
GetMasterKey() []byte
RepositoryFormatBytes() []byte
Struct() ContentFormat
RepositoryFormatBytes() ([]byte, error)
}
type formattingOptionsProvider struct {
@@ -71,13 +71,11 @@ type formattingOptionsProvider struct {
formatBytes []byte
}
func (f *formattingOptionsProvider) Struct() ContentFormat {
return *f.ContentFormat
}
// NewFormattingOptionsProvider validates the provided formatting options and returns static
// FormattingOptionsProvider based on them.
func NewFormattingOptionsProvider(f *ContentFormat, formatBytes []byte) (Provider, error) {
func NewFormattingOptionsProvider(f0 *ContentFormat, formatBytes []byte) (Provider, error) {
clone := *f0
f := &clone
formatVersion := f.Version
if formatVersion < MinSupportedReadVersion || formatVersion > CurrentWriteVersion {
@@ -96,6 +94,12 @@ func NewFormattingOptionsProvider(f *ContentFormat, formatBytes []byte) (Provide
return nil, errors.Errorf("index version %v is not supported", f.IndexVersion)
}
// apply default
if f.MaxPackSize == 0 {
// legacy only, apply default
f.MaxPackSize = 20 << 20 //nolint:gomnd
}
h, err := hashing.CreateHashFunc(f)
if err != nil {
return nil, errors.Wrap(err, "unable to create hash")
@@ -145,8 +149,12 @@ func (f *formattingOptionsProvider) HashFunc() hashing.HashFunc {
return f.h
}
func (f *formattingOptionsProvider) RepositoryFormatBytes() []byte {
return f.formatBytes
func (f *formattingOptionsProvider) RepositoryFormatBytes() ([]byte, error) {
if f.SupportsPasswordChange() {
return nil, nil
}
return f.formatBytes, nil
}
var _ Provider = (*formattingOptionsProvider)(nil)

View File

@@ -0,0 +1,48 @@
package format
import (
"context"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/feature"
"github.com/kopia/kopia/repo/blob"
)
// SetParameters sets the mutable repository parameters.
func (m *Manager) SetParameters(
ctx context.Context,
mp MutableParameters,
blobcfg BlobStorageConfiguration,
requiredFeatures []feature.Required,
) error {
m.mu.Lock()
defer m.mu.Unlock()
if err := mp.Validate(); err != nil {
return errors.Wrap(err, "invalid parameters")
}
if err := blobcfg.Validate(); err != nil {
return errors.Wrap(err, "invalid blob-config options")
}
m.repoConfig.ContentFormat.MutableParameters = mp
m.repoConfig.RequiredFeatures = requiredFeatures
if err := m.j.EncryptRepositoryConfig(m.repoConfig, m.formatEncryptionKey); err != nil {
return errors.Errorf("unable to encrypt format bytes")
}
if err := m.j.WriteBlobCfgBlob(ctx, m.blobs, blobcfg, m.formatEncryptionKey); err != nil {
return errors.Wrap(err, "unable to write blobcfg blob")
}
if err := m.j.WriteKopiaRepositoryBlob(ctx, m.blobs, m.blobCfgBlob); err != nil {
return errors.Wrap(err, "unable to write format blob")
}
m.cache.Remove(ctx, []blob.ID{KopiaRepositoryBlobID, KopiaBlobCfgBlobID})
return nil
}

View File

@@ -2,7 +2,6 @@
import (
"encoding/json"
"time"
"github.com/pkg/errors"
@@ -24,17 +23,8 @@ type EncryptedRepositoryConfig struct {
Format RepositoryConfig `json:"format"`
}
// DecodedRepositoryConfig encapsulates contents of decoded `kopia.repository` blob.
type DecodedRepositoryConfig struct {
KopiaRepository *KopiaRepositoryJSON
KopiaRepositoryBytes []byte // serialized format blob
CacheMTime time.Time // mod time of the format blob cache file
RepoConfig *RepositoryConfig // unencrypted format blob structure
FormatEncryptionKey []byte // key derived from the password
}
// DecryptRepositoryConfig decrypts RepositoryConfig stored in EncryptedFormatBytes.
func (f *KopiaRepositoryJSON) DecryptRepositoryConfig(masterKey []byte) (*RepositoryConfig, error) {
// decryptRepositoryConfig decrypts RepositoryConfig stored in EncryptedFormatBytes.
func (f *KopiaRepositoryJSON) decryptRepositoryConfig(masterKey []byte) (*RepositoryConfig, error) {
switch f.EncryptionAlgorithm {
case aes256GcmEncryption:
plainText, err := decryptRepositoryBlobBytesAes256Gcm(f.EncryptedFormatBytes, masterKey, f.UniqueID)

172
repo/format/upgrade_lock.go Normal file
View File

@@ -0,0 +1,172 @@
package format
import (
"context"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
)
// BackupBlobIDPrefix is the prefix for all identifiers of the BLOBs that
// keep a backup copy of the FormatBlobID BLOB for the purposes of rollback
// during upgrade.
const BackupBlobIDPrefix = "kopia.repository.backup."
// BackupBlobID gets the upgrade backu pblob-id fro mthe lock.
func BackupBlobID(l UpgradeLockIntent) blob.ID {
return blob.ID(BackupBlobIDPrefix + l.OwnerID)
}
// SetUpgradeLockIntent sets the upgrade lock intent on the repository format
// blob for other clients to notice. If a lock intent was already placed then
// it updates the existing lock using the output of the UpgradeLock.Update().
//
// This method also backs up the original format version on the upgrade lock
// intent and sets the latest format-version o nthe repository blob. This
// should cause the unsupporting clients (non-upgrade capable) to fail
// connecting to the repository.
func (m *Manager) SetUpgradeLockIntent(ctx context.Context, l UpgradeLockIntent) (*UpgradeLockIntent, error) {
if err := m.maybeRefreshNotLocked(); err != nil {
return nil, err
}
m.mu.Lock()
defer m.mu.Unlock()
if err := l.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid upgrade lock intent")
}
if m.repoConfig.UpgradeLock == nil {
// when we are putting a new lock then ensure that we can upgrade
// to that version
if m.repoConfig.ContentFormat.Version >= MaxFormatVersion {
return nil, errors.Errorf("repository is using version %d, and version %d is the maximum",
m.repoConfig.ContentFormat.Version, MaxFormatVersion)
}
// backup the current repository config from local cache to the
// repository when we place the lock for the first time
if err := m.j.WriteKopiaRepositoryBlobWithID(ctx, m.blobs, m.blobCfgBlob, BackupBlobID(l)); err != nil {
return nil, errors.Wrap(err, "failed to backup the repo format blob")
}
// set a new lock or revoke an existing lock
m.repoConfig.UpgradeLock = &l
// mark the upgrade to the new format version, this will ensure that older
// clients won't be able to parse the new version
m.repoConfig.ContentFormat.Version = MaxFormatVersion
} else if newL, err := m.repoConfig.UpgradeLock.Update(&l); err == nil {
m.repoConfig.UpgradeLock = newL
} else {
return nil, errors.Wrap(err, "failed to update the existing lock")
}
if err := m.updateRepoConfigLocked(ctx); err != nil {
return nil, errors.Wrap(err, "error updating repo config")
}
return m.repoConfig.UpgradeLock.Clone(), nil
}
// CommitUpgrade removes the upgrade lock from the from the repository format
// blob. This in-effect commits the new repository format t othe repository and
// resumes all access to the repository.
func (m *Manager) CommitUpgrade(ctx context.Context) error {
if err := m.maybeRefreshNotLocked(); err != nil {
return err
}
m.mu.Lock()
defer m.mu.Unlock()
if m.repoConfig.UpgradeLock == nil {
return errors.New("no upgrade in progress")
}
// restore the old format version
m.repoConfig.UpgradeLock = nil
return m.updateRepoConfigLocked(ctx)
}
// RollbackUpgrade removes the upgrade lock while also restoring the
// format-blob's original version. This method does not restore the original
// repository data format and neither does it validate against any repository
// changes. Rolling back the repository format is currently not supported and
// hence using this API could render the repository corrupted and unreadable by
// clients.
func (m *Manager) RollbackUpgrade(ctx context.Context) error {
if err := m.maybeRefreshNotLocked(); err != nil {
return err
}
m.mu.Lock()
defer m.mu.Unlock()
if m.repoConfig.UpgradeLock == nil {
return errors.New("no upgrade in progress")
}
// restore the oldest backup and delete the rest
var oldestBackup *blob.Metadata
if err := m.blobs.ListBlobs(ctx, BackupBlobIDPrefix, func(bm blob.Metadata) error {
var delID blob.ID
if oldestBackup == nil || bm.Timestamp.Before(oldestBackup.Timestamp) {
if oldestBackup != nil {
// delete the current candidate because we have found an even older one
delID = oldestBackup.BlobID
}
oldestBackup = &bm
} else {
delID = bm.BlobID
}
if delID != "" {
// delete the backup that we are not going to need for rollback
if err := m.blobs.DeleteBlob(ctx, delID); err != nil {
return errors.Wrapf(err, "failed to delete the format blob backup %q", delID)
}
}
return nil
}); err != nil {
return errors.Wrap(err, "failed to list backup blobs")
}
// restore only when we find a backup, otherwise simply cleanup the local cache
if oldestBackup != nil {
var d gather.WriteBuffer
if err := m.blobs.GetBlob(ctx, oldestBackup.BlobID, 0, -1, &d); err != nil {
return errors.Wrapf(err, "failed to read from backup %q", oldestBackup.BlobID)
}
if err := m.blobs.PutBlob(ctx, KopiaRepositoryBlobID, d.Bytes(), blob.PutOptions{}); err != nil {
return errors.Wrapf(err, "failed to restore format blob from backup %q", oldestBackup.BlobID)
}
// delete the backup after we have restored the format-blob
if err := m.blobs.DeleteBlob(ctx, oldestBackup.BlobID); err != nil {
return errors.Wrapf(err, "failed to delete the format blob backup %q", oldestBackup.BlobID)
}
}
m.cache.Remove(ctx, []blob.ID{KopiaRepositoryBlobID})
return nil
}
// GetUpgradeLockIntent gets the current upgrade lock intent.
func (m *Manager) GetUpgradeLockIntent(ctx context.Context) (*UpgradeLockIntent, error) {
if err := m.maybeRefreshNotLocked(); err != nil {
return nil, err
}
m.mu.RLock()
defer m.mu.RUnlock()
return m.repoConfig.UpgradeLock, nil
}

View File

@@ -51,7 +51,12 @@ func (l *UpgradeLockIntent) Update(other *UpgradeLockIntent) (*UpgradeLockIntent
// Clone creates a copy of the UpgradeLock instance.
func (l *UpgradeLockIntent) Clone() *UpgradeLockIntent {
if l == nil {
return nil
}
clone := *l
return &clone
}

View File

@@ -1,4 +1,4 @@
package repo_test
package format_test
import (
"context"
@@ -24,6 +24,7 @@
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/object"
)
func TestFormatUpgradeSetLock(t *testing.T) {
@@ -43,17 +44,17 @@ func TestFormatUpgradeSetLock(t *testing.T) {
}
// set invalid lock
_, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l)
_, err := env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l)
require.EqualError(t, err, "invalid upgrade lock intent: no owner-id set, it is required to set a unique owner-id")
l.OwnerID = "upgrade-owner"
l, err = env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l)
l, err = env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l)
require.NoError(t, err)
l.OwnerID = "new-upgrade-owner"
// verify that second owner cannot set / update the lock
_, err = env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l)
_, err = env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l)
require.EqualError(t, err,
"failed to update the existing lock: upgrade owner-id mismatch \"new-upgrade-owner\" != \"upgrade-owner\", you are not the owner of the upgrade lock")
@@ -63,10 +64,10 @@ func TestFormatUpgradeSetLock(t *testing.T) {
l.AdvanceNoticeDuration *= 2
// update the lock
_, err = env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l)
_, err = env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l)
require.NoError(t, err)
require.NoError(t, env.RepositoryWriter.CommitUpgrade(ctx))
require.NoError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx))
}
func TestFormatUpgradeAlreadyUpgraded(t *testing.T) {
@@ -83,7 +84,7 @@ func TestFormatUpgradeAlreadyUpgraded(t *testing.T) {
MaxPermittedClockDrift: formatBlockCacheDuration / 3,
}
_, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l)
_, err := env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l)
require.EqualError(t, err, fmt.Sprintf("repository is using version %d, and version %d is the maximum",
format.MaxFormatVersion, format.MaxFormatVersion))
}
@@ -104,15 +105,15 @@ func TestFormatUpgradeCommit(t *testing.T) {
MaxPermittedClockDrift: formatBlockCacheDuration / 3,
}
require.EqualError(t, env.RepositoryWriter.CommitUpgrade(ctx), "no upgrade in progress")
require.EqualError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx), "no upgrade in progress")
_, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l)
_, err := env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l)
require.NoError(t, err)
require.NoError(t, env.RepositoryWriter.CommitUpgrade(ctx))
require.NoError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx))
// verify that rollback after commit fails
require.EqualError(t, env.RepositoryWriter.RollbackUpgrade(ctx), "no upgrade in progress")
require.EqualError(t, env.RepositoryWriter.FormatManager().RollbackUpgrade(ctx), "no upgrade in progress")
}
func TestFormatUpgradeRollback(t *testing.T) {
@@ -131,16 +132,16 @@ func TestFormatUpgradeRollback(t *testing.T) {
MaxPermittedClockDrift: formatBlockCacheDuration / 3,
}
_, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l)
_, err := env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l)
require.NoError(t, err)
require.NoError(t, env.RepositoryWriter.RollbackUpgrade(ctx))
require.NoError(t, env.RepositoryWriter.FormatManager().RollbackUpgrade(ctx))
// reopen the repo because we still have the lock in-memory
env.MustReopen(t)
// verify that commit after rollback fails
require.EqualError(t, env.RepositoryWriter.CommitUpgrade(ctx), "no upgrade in progress")
require.EqualError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx), "no upgrade in progress")
}
func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
@@ -164,25 +165,25 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
})
// first lock by primary creator
_, err := env.RepositoryWriter.SetUpgradeLockIntent(ctx, *l)
_, err := env.RepositoryWriter.FormatManager().SetUpgradeLockIntent(ctx, *l)
require.NoError(t, err)
// second lock from a random owner
secondL := l.Clone()
secondL.OwnerID = "another-upgrade-owner"
_, err = secondWriter.(repo.DirectRepositoryWriter).SetUpgradeLockIntent(ctx, *secondL)
_, err = secondWriter.(repo.DirectRepositoryWriter).FormatManager().SetUpgradeLockIntent(ctx, *secondL)
require.NoError(t, err)
// verify that we have two repository backups, the second one will contain
// the first owner's lock
{
var backups []string
require.NoError(t, env.RootStorage().ListBlobs(ctx, repo.FormatBlobBackupIDPrefix, func(bm blob.Metadata) error {
require.NoError(t, env.RootStorage().ListBlobs(ctx, format.BackupBlobIDPrefix, func(bm blob.Metadata) error {
backups = append(backups, string(bm.BlobID))
return nil
}))
sort.Strings(backups)
require.Equal(t, []string{string(repo.FormatBlobBackupID(*secondL)), string(repo.FormatBlobBackupID(*l))},
require.Equal(t, []string{string(format.BackupBlobID(*secondL)), string(format.BackupBlobID(*l))},
backups, "invalid backups list")
}
@@ -195,10 +196,10 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
require.NoError(t, mperr)
require.Equal(t, format.FormatVersion3, mp.Version)
require.NoError(t, env.RepositoryWriter.RollbackUpgrade(ctx))
require.NoError(t, env.RepositoryWriter.FormatManager().RollbackUpgrade(ctx))
// verify that we have no repository backups pending
require.NoError(t, env.RootStorage().ListBlobs(ctx, repo.FormatBlobBackupIDPrefix, func(bm blob.Metadata) error {
require.NoError(t, env.RootStorage().ListBlobs(ctx, format.BackupBlobIDPrefix, func(bm blob.Metadata) error {
t.Fatalf("found unexpected backup: %s", bm.BlobID)
return nil
}))
@@ -209,7 +210,7 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
// verify that commit after rollback fails, this ensures that the correct
// backup got restored because if the second backup was restored then we'd
// still get a lock to be committed without any error
require.EqualError(t, env.RepositoryWriter.CommitUpgrade(ctx), "no upgrade in progress")
require.EqualError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx), "no upgrade in progress")
// verify that we are back to the original version where we started from
mp, err = env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters()
@@ -238,7 +239,7 @@ func TestFormatUpgradeFailureToBackupFormatBlobOnLock(t *testing.T) {
blobtesting.NewVersionedMapStorage(nil),
// GetBlob filter
func(ctx context.Context, id blob.ID) error {
if !allowGets && id == repo.FormatBlobBackupID(allowedLock) {
if !allowGets && id == format.BackupBlobID(allowedLock) {
return errors.New("unexpected error on get")
}
return nil
@@ -252,7 +253,7 @@ func(ctx context.Context) error {
},
// PutBlob callback
func(ctx context.Context, id blob.ID, _ *blob.PutOptions) error {
if !allowPuts || (strings.HasPrefix(string(id), repo.FormatBlobBackupIDPrefix) && id != repo.FormatBlobBackupID(allowedLock)) {
if !allowPuts || (strings.HasPrefix(string(id), format.BackupBlobIDPrefix) && id != format.BackupBlobID(allowedLock)) {
return errors.New("unexpected error")
}
return nil
@@ -286,31 +287,31 @@ func(ctx context.Context, id blob.ID, _ *blob.PutOptions) error {
r, err := repo.Open(testlogging.Context(t), configFile, "password", &repo.Options{UpgradeOwnerID: "allowed-upgrade-owner"})
require.NoError(t, err)
_, err = r.(repo.DirectRepositoryWriter).SetUpgradeLockIntent(testlogging.Context(t), *faultyLock)
_, err = r.(repo.DirectRepositoryWriter).FormatManager().SetUpgradeLockIntent(testlogging.Context(t), *faultyLock)
require.EqualError(t, err, "failed to backup the repo format blob: unable to write format blob \"kopia.repository.backup.faulty-upgrade-owner\": unexpected error")
_, err = r.(repo.DirectRepositoryWriter).SetUpgradeLockIntent(testlogging.Context(t), allowedLock)
_, err = r.(repo.DirectRepositoryWriter).FormatManager().SetUpgradeLockIntent(testlogging.Context(t), allowedLock)
require.NoError(t, err)
require.EqualError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t)),
require.EqualError(t, r.(repo.DirectRepositoryWriter).FormatManager().RollbackUpgrade(testlogging.Context(t)),
"failed to delete the format blob backup \"kopia.repository.backup.allowed-upgrade-owner\": unexpected error")
require.EqualError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t)),
require.EqualError(t, r.(repo.DirectRepositoryWriter).FormatManager().RollbackUpgrade(testlogging.Context(t)),
"failed to delete the format blob backup \"kopia.repository.backup.allowed-upgrade-owner\": unexpected error")
allowPuts = false
require.EqualError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t)),
require.EqualError(t, r.(repo.DirectRepositoryWriter).FormatManager().RollbackUpgrade(testlogging.Context(t)),
"failed to restore format blob from backup \"kopia.repository.backup.allowed-upgrade-owner\": unexpected error")
allowGets = false
require.EqualError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t)),
require.EqualError(t, r.(repo.DirectRepositoryWriter).FormatManager().RollbackUpgrade(testlogging.Context(t)),
"failed to read from backup \"kopia.repository.backup.allowed-upgrade-owner\": unexpected error on get")
allowPuts, allowGets, allowDeletes = true, true, true
require.NoError(t, r.(repo.DirectRepositoryWriter).RollbackUpgrade(testlogging.Context(t)))
require.NoError(t, r.(repo.DirectRepositoryWriter).FormatManager().RollbackUpgrade(testlogging.Context(t)))
}
func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) {
@@ -365,7 +366,8 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) {
MaxPermittedClockDrift: formatBlockCacheDuration / 3,
}
_, err = env.RepositoryWriter.SetUpgradeLockIntent(ctx, l)
// set upgrade lock using independent client
_, err = env.MustConnectOpenAnother(t).(repo.DirectRepositoryWriter).FormatManager().SetUpgradeLockIntent(ctx, l)
require.NoError(t, err)
// ongoing writes should NOT get interrupted because the upgrade lock
@@ -390,7 +392,20 @@ func TestFormatUpgradeDuringOngoingWriteSessions(t *testing.T) {
// ongoing writes should get interrupted this time
require.ErrorIs(t, w1.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
require.ErrorIs(t, w2.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
require.ErrorIs(t, w3.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
require.ErrorIs(t, lw.Flush(ctx), repo.ErrRepositoryUnavailableDueToUpgrageInProgress)
}
func writeObject(ctx context.Context, t *testing.T, rep repo.RepositoryWriter, data []byte, testCaseID string) {
t.Helper()
w := rep.NewObjectWriter(ctx, object.WriterOptions{})
_, err := w.Write(data)
require.NoError(t, err, testCaseID)
_, err = w.Result()
require.NoError(t, err, testCaseID)
}

View File

@@ -9,7 +9,6 @@
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/ecc"
@@ -31,7 +30,6 @@
const (
hmacSecretLength = 32
masterKeyLength = 32
uniqueIDLength = 32
)
// NewRepositoryOptions specifies options that apply to newly created repositories.
@@ -45,67 +43,22 @@ type NewRepositoryOptions struct {
RetentionPeriod time.Duration `json:"retentionPeriod,omitempty"`
}
// ErrAlreadyInitialized indicates that repository has already been initialized.
var ErrAlreadyInitialized = errors.Errorf("repository already initialized")
// Initialize creates initial repository data structures in the specified storage with given credentials.
func Initialize(ctx context.Context, st blob.Storage, opt *NewRepositoryOptions, password string) error {
if opt == nil {
opt = &NewRepositoryOptions{}
}
// get the blob - expect ErrNotFound
var tmp gather.WriteBuffer
defer tmp.Close()
err := st.GetBlob(ctx, format.KopiaRepositoryBlobID, 0, -1, &tmp)
if err == nil {
return ErrAlreadyInitialized
}
if !errors.Is(err, blob.ErrBlobNotFound) {
return errors.Wrap(err, "unexpected error when checking for format blob")
}
err = st.GetBlob(ctx, format.KopiaBlobCfgBlobID, 0, -1, &tmp)
if err == nil {
return errors.Errorf("possible corruption: blobcfg blob exists, but format blob is not found")
}
if !errors.Is(err, blob.ErrBlobNotFound) {
return errors.Wrap(err, "unexpected error when checking for blobcfg blob")
}
formatBlob := formatBlobFromOptions(opt)
blobcfg := blobCfgBlobFromOptions(opt)
formatEncryptionKey, err := formatBlob.DeriveFormatEncryptionKeyFromPassword(password)
if err != nil {
return errors.Wrap(err, "unable to derive format encryption key")
}
f, err := repositoryObjectFormatFromOptions(opt)
repoConfig, err := repositoryObjectFormatFromOptions(opt)
if err != nil {
return errors.Wrap(err, "invalid parameters")
}
if err = f.MutableParameters.Validate(); err != nil {
return errors.Wrap(err, "invalid parameters")
}
if err = formatBlob.EncryptRepositoryConfig(f, formatEncryptionKey); err != nil {
return errors.Wrap(err, "unable to encrypt format bytes")
}
if err := formatBlob.WriteBlobCfgBlob(ctx, st, blobcfg, formatEncryptionKey); err != nil {
return errors.Wrap(err, "unable to write blobcfg blob")
}
if err := formatBlob.WriteKopiaRepositoryBlob(ctx, st, blobcfg); err != nil {
return errors.Wrap(err, "unable to write format blob")
}
return nil
//nolint:wrapcheck
return format.Initialize(ctx, st, formatBlob, repoConfig, blobcfg, password)
}
func formatBlobFromOptions(opt *NewRepositoryOptions) *format.KopiaRepositoryJSON {
@@ -114,7 +67,7 @@ func formatBlobFromOptions(opt *NewRepositoryOptions) *format.KopiaRepositoryJSO
BuildInfo: BuildInfo,
BuildVersion: BuildVersion,
KeyDerivationAlgorithm: format.DefaultKeyDerivationAlgorithm,
UniqueID: applyDefaultRandomBytes(opt.UniqueID, uniqueIDLength),
UniqueID: applyDefaultRandomBytes(opt.UniqueID, format.UniqueIDLengthBytes),
EncryptionAlgorithm: format.DefaultFormatEncryption,
}
}
@@ -178,13 +131,6 @@ func repositoryObjectFormatFromOptions(opt *NewRepositoryOptions) (*format.Repos
return f, nil
}
func randomBytes(n int) []byte {
b := make([]byte, n)
io.ReadFull(rand.Reader, b) //nolint:errcheck
return b
}
func applyDefaultInt(v, def int) int {
if v == 0 {
return def
@@ -211,6 +157,13 @@ func applyDefaultString(v, def string) string {
return v
}
func randomBytes(n int) []byte {
b := make([]byte, n)
io.ReadFull(rand.Reader, b) //nolint:errcheck
return b
}
func applyDefaultRandomBytes(b []byte, n int) []byte {
if b == nil {
return randomBytes(n)

View File

@@ -77,6 +77,9 @@ type Options struct {
// ErrInvalidPassword is returned when repository password is invalid.
var ErrInvalidPassword = format.ErrInvalidPassword
// ErrAlreadyInitialized is returned when repository is already initialized in the provided storage.
var ErrAlreadyInitialized = format.ErrAlreadyInitialized
// ErrRepositoryUnavailableDueToUpgrageInProgress is returned when repository
// is undergoing upgrade that requires exclusive access.
var ErrRepositoryUnavailableDueToUpgrageInProgress = errors.Errorf("repository upgrade in progress")
@@ -198,7 +201,7 @@ func openDirect(ctx context.Context, configFile string, lc *LocalConfig, passwor
// openWithConfig opens the repository with a given configuration, avoiding the need for a config file.
//
//nolint:funlen,gocyclo,cyclop
//nolint:funlen,gocyclo
func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, password string, options *Options, cacheOpts *content.CachingOptions, configFile string) (DirectRepository, error) {
cacheOpts = cacheOpts.CloneOrDefault()
cmOpts := &content.ManagerOptions{
@@ -206,19 +209,20 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
DisableInternalLog: options.DisableInternalLog,
}
var ufb *format.DecodedRepositoryConfig
fmgr, ferr := format.NewManager(ctx, st, cacheOpts.CacheDirectory, lc.FormatBlobCacheDuration, password, cmOpts.TimeNow)
if ferr != nil {
return nil, errors.Wrap(ferr, "unable to create format manager")
}
if _, err := retry.WithExponentialBackoffMaxRetries(ctx, -1, "read repo config and wait for upgrade", func() (interface{}, error) {
var internalErr error
ufb, internalErr = format.ReadAndCacheDecodedRepositoryConfig(ctx, st, password, cacheOpts.CacheDirectory,
lc.FormatBlobCacheDuration)
if internalErr != nil {
if _, err := retry.WithExponentialBackoffMaxRetries(ctx, -1, "wait for upgrade", func() (interface{}, error) {
uli, err := fmgr.UpgradeLockIntent()
if err != nil {
//nolint:wrapcheck
return nil, internalErr
return nil, err
}
// retry if upgrade lock has been taken
if locked, _ := ufb.RepoConfig.UpgradeLock.IsLocked(cmOpts.TimeNow()); locked && options.UpgradeOwnerID != ufb.RepoConfig.UpgradeLock.OwnerID {
if locked, _ := uli.IsLocked(cmOpts.TimeNow()); locked && options.UpgradeOwnerID != uli.OwnerID {
return nil, ErrRepositoryUnavailableDueToUpgrageInProgress
}
@@ -230,40 +234,15 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
return nil, err
}
if err := handleMissingRequiredFeatures(ctx, ufb.RepoConfig, options.TestOnlyIgnoreMissingRequiredFeatures); err != nil {
if err := handleMissingRequiredFeatures(ctx, fmgr, options.TestOnlyIgnoreMissingRequiredFeatures); err != nil {
return nil, err
}
cmOpts.RepositoryFormatBytes = ufb.KopiaRepositoryBytes
// Read blobcfg blob, potentially from cache.
bb, _, err := format.ReadAndCacheRepositoryBlobBytes(ctx, st, cacheOpts.CacheDirectory, format.KopiaBlobCfgBlobID, lc.FormatBlobCacheDuration)
if err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
return nil, errors.Wrap(err, "unable to read blobcfg blob")
}
blobcfg, err := ufb.KopiaRepository.DeserializeBlobCfgBytes(bb, ufb.FormatEncryptionKey)
if err != nil {
return nil, ErrInvalidPassword
}
if ufb.RepoConfig.ContentFormat.EnablePasswordChange {
cacheOpts.HMACSecret = format.DeriveKeyFromMasterKey(ufb.RepoConfig.HMACSecret, ufb.KopiaRepository.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength)
if fmgr.SupportsPasswordChange() {
cacheOpts.HMACSecret = format.DeriveKeyFromMasterKey(fmgr.GetHmacSecret(), fmgr.UniqueID(), localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength)
} else {
// deriving from ufb.FormatEncryptionKey was actually a bug, that only matters will change when we change the password
cacheOpts.HMACSecret = format.DeriveKeyFromMasterKey(ufb.FormatEncryptionKey, ufb.KopiaRepository.UniqueID, localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength)
}
fo := &ufb.RepoConfig.ContentFormat
if fo.MaxPackSize == 0 {
// legacy only, apply default
fo.MaxPackSize = 20 << 20 //nolint:gomnd
}
// do not embed repository format info in pack blobs when password change is enabled.
if fo.EnablePasswordChange {
cmOpts.RepositoryFormatBytes = nil
cacheOpts.HMACSecret = format.DeriveKeyFromMasterKey(fmgr.FormatEncryptionKey(), fmgr.UniqueID(), localCacheIntegrityPurpose, localCacheIntegrityHMACSecretLength)
}
limits := throttlingLimitsFromConnectionInfo(ctx, st.ConnectionInfo())
@@ -271,9 +250,9 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
limits = *lc.Throttling
}
st, throttler, err := addThrottler(st, limits)
if err != nil {
return nil, errors.Wrap(err, "unable to add throttler")
st, throttler, ferr := addThrottler(st, limits)
if ferr != nil {
return nil, errors.Wrap(ferr, "unable to add throttler")
}
throttler.OnUpdate(func(l throttling.Limits) error {
@@ -287,22 +266,21 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
return lc2.writeToFile(configFile)
})
blobcfg, err := fmgr.BlobCfgBlob()
if err != nil {
return nil, errors.Wrap(err, "blob configuration")
}
if blobcfg.IsRetentionEnabled() {
st = wrapLockingStorage(st, blobcfg)
}
// background/interleaving upgrade lock storage monitor
st = upgradeLockMonitor(options.UpgradeOwnerID, st, password, cacheOpts, lc.FormatBlobCacheDuration,
ufb.CacheMTime, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures)
st = upgradeLockMonitor(fmgr, options.UpgradeOwnerID, st, cmOpts.TimeNow, options.OnFatalError, options.TestOnlyIgnoreMissingRequiredFeatures)
fop, err := format.NewFormattingOptionsProvider(fo, cmOpts.RepositoryFormatBytes)
if err != nil {
return nil, errors.Wrap(err, "unable to create format options provider")
}
scm, err := content.NewSharedManager(ctx, st, fop, cacheOpts, cmOpts)
if err != nil {
return nil, errors.Wrap(err, "unable to create shared content manager")
scm, ferr := content.NewSharedManager(ctx, st, fmgr, cacheOpts, cmOpts)
if ferr != nil {
return nil, errors.Wrap(ferr, "unable to create shared content manager")
}
cm := content.NewWriteManager(ctx, scm, content.SessionOptions{
@@ -310,14 +288,14 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
SessionHost: lc.Hostname,
}, "")
om, err := object.NewObjectManager(ctx, cm, ufb.RepoConfig.ObjectFormat)
if err != nil {
return nil, errors.Wrap(err, "unable to open object manager")
om, ferr := object.NewObjectManager(ctx, cm, fmgr.ObjectFormat())
if ferr != nil {
return nil, errors.Wrap(ferr, "unable to open object manager")
}
manifests, err := manifest.NewManager(ctx, cm, manifest.ManagerOptions{TimeNow: cmOpts.TimeNow})
if err != nil {
return nil, errors.Wrap(err, "unable to open manifests")
manifests, ferr := manifest.NewManager(ctx, cm, manifest.ManagerOptions{TimeNow: cmOpts.TimeNow})
if ferr != nil {
return nil, errors.Wrap(ferr, "unable to open manifests")
}
dr := &directRepository{
@@ -327,16 +305,13 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
mmgr: manifests,
sm: scm,
directRepositoryParameters: directRepositoryParameters{
uniqueID: ufb.KopiaRepository.UniqueID,
cachingOptions: *cacheOpts,
formatBlob: ufb.KopiaRepository,
blobCfgBlob: blobcfg,
formatEncryptionKey: ufb.FormatEncryptionKey,
timeNow: cmOpts.TimeNow,
cliOpts: lc.ClientOptions.ApplyDefaults(ctx, "Repository in "+st.DisplayName()),
configFile: configFile,
nextWriterID: new(int32),
throttler: throttler,
cachingOptions: *cacheOpts,
fmgr: fmgr,
timeNow: cmOpts.TimeNow,
cliOpts: lc.ClientOptions.ApplyDefaults(ctx, "Repository in "+st.DisplayName()),
configFile: configFile,
nextWriterID: new(int32),
throttler: throttler,
},
closed: make(chan struct{}),
}
@@ -344,10 +319,15 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
return dr, nil
}
func handleMissingRequiredFeatures(ctx context.Context, repoConfig *format.RepositoryConfig, ignoreErrors bool) error {
func handleMissingRequiredFeatures(ctx context.Context, fmgr *format.Manager, ignoreErrors bool) error {
required, err := fmgr.RequiredFeatures()
if err != nil {
return errors.Wrap(err, "required features")
}
// See if the current version of Kopia supports all features required by the repository format.
// so we can safely fail to start in case repository has been upgraded to a new, incompatible version.
if missingFeatures := feature.GetUnsupportedFeatures(repoConfig.RequiredFeatures, supportedFeatures); len(missingFeatures) > 0 {
if missingFeatures := feature.GetUnsupportedFeatures(required, supportedFeatures); len(missingFeatures) > 0 {
for _, mf := range missingFeatures {
if ignoreErrors || mf.IfNotUnderstood.Warn {
log(ctx).Warnf("%s", mf.UnsupportedMessage())
@@ -393,26 +373,22 @@ func addThrottler(st blob.Storage, limits throttling.Limits) (blob.Storage, thro
}
func upgradeLockMonitor(
fmgr *format.Manager,
upgradeOwnerID string,
st blob.Storage,
password string,
cacheOpts *content.CachingOptions,
lockRefreshInterval time.Duration,
lastSync time.Time,
now func() time.Time,
onFatalError func(err error),
ignoreMissingRequiredFeatures bool,
) blob.Storage {
var (
m sync.RWMutex
nextSync = lastSync.Add(lockRefreshInterval)
m sync.RWMutex
lastCheckTime time.Time
)
cb := func(ctx context.Context) error {
// protected read for nextSync because it will be shared between
// parallel storage operations
m.RLock()
if nextSync.After(now()) {
// see if we already checked that revision
if lastCheckTime.Equal(fmgr.LoadedTime()) {
m.RUnlock()
return nil
}
@@ -422,31 +398,30 @@ func upgradeLockMonitor(
m.Lock()
defer m.Unlock()
if nextSync.After(now()) {
ltime := fmgr.LoadedTime()
if lastCheckTime.Equal(ltime) {
return nil
}
ufb, err := format.ReadAndCacheDecodedRepositoryConfig(ctx, st, password, cacheOpts.CacheDirectory, lockRefreshInterval)
uli, err := fmgr.UpgradeLockIntent()
if err != nil {
//nolint:wrapcheck
return err
return errors.Wrap(err, "upgrade lock intent")
}
if err := handleMissingRequiredFeatures(ctx, ufb.RepoConfig, ignoreMissingRequiredFeatures); err != nil {
if err := handleMissingRequiredFeatures(ctx, fmgr, ignoreMissingRequiredFeatures); err != nil {
onFatalError(err)
return err
}
// only allow the upgrade owner to perform storage operations
if locked, _ := ufb.RepoConfig.UpgradeLock.IsLocked(now()); locked && upgradeOwnerID != ufb.RepoConfig.UpgradeLock.OwnerID {
return ErrRepositoryUnavailableDueToUpgrageInProgress
if uli != nil {
// only allow the upgrade owner to perform storage operations
if locked, _ := uli.IsLocked(now()); locked && upgradeOwnerID != uli.OwnerID {
return ErrRepositoryUnavailableDueToUpgrageInProgress
}
}
// prevent backward jumps on nextSync
newNextSync := ufb.CacheMTime.Add(lockRefreshInterval)
if newNextSync.After(nextSync) {
nextSync = newNextSync
}
lastCheckTime = ltime
return nil
}

View File

@@ -1,71 +0,0 @@
package repo
import (
"context"
"os"
"path/filepath"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/feature"
"github.com/kopia/kopia/repo/format"
)
func (r *directRepository) RequiredFeatures() ([]feature.Required, error) {
repoConfig, err := r.formatBlob.DecryptRepositoryConfig(r.formatEncryptionKey)
if err != nil {
return nil, errors.Wrap(err, "unable to decrypt repository config")
}
return repoConfig.RequiredFeatures, nil
}
// SetParameters changes mutable repository parameters.
func (r *directRepository) SetParameters(
ctx context.Context,
m format.MutableParameters,
blobcfg format.BlobStorageConfiguration,
requiredFeatures []feature.Required,
) error {
f := r.formatBlob
repoConfig, err := f.DecryptRepositoryConfig(r.formatEncryptionKey)
if err != nil {
return errors.Wrap(err, "unable to decrypt repository config")
}
if err := m.Validate(); err != nil {
return errors.Wrap(err, "invalid parameters")
}
if err := blobcfg.Validate(); err != nil {
return errors.Wrap(err, "invalid blob-config options")
}
repoConfig.ContentFormat.MutableParameters = m
repoConfig.RequiredFeatures = requiredFeatures
if err := f.EncryptRepositoryConfig(repoConfig, r.formatEncryptionKey); err != nil {
return errors.Errorf("unable to encrypt format bytes")
}
if err := f.WriteBlobCfgBlob(ctx, r.blobs, blobcfg, r.formatEncryptionKey); err != nil {
return errors.Wrap(err, "unable to write blobcfg blob")
}
if err := f.WriteKopiaRepositoryBlob(ctx, r.blobs, r.blobCfgBlob); err != nil {
return errors.Wrap(err, "unable to write format blob")
}
if cd := r.cachingOptions.CacheDirectory; cd != "" {
if err := os.Remove(filepath.Join(cd, format.KopiaRepositoryBlobID)); err != nil {
log(ctx).Errorf("unable to remove %s: %v", format.KopiaRepositoryBlobID, err)
}
if err := os.Remove(filepath.Join(cd, format.KopiaBlobCfgBlobID)); err != nil && !os.IsNotExist(err) {
log(ctx).Errorf("unable to remove %s: %v", format.KopiaBlobCfgBlobID, err)
}
}
return nil
}

View File

@@ -10,7 +10,6 @@
"go.opentelemetry.io/otel"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/feature"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/blob/throttling"
"github.com/kopia/kopia/repo/content"
@@ -54,7 +53,7 @@ type DirectRepository interface {
Repository
ObjectFormat() format.ObjectFormat
BlobCfg() format.BlobStorageConfiguration
FormatManager() *format.Manager
BlobReader() blob.Reader
BlobVolume() blob.Volume
ContentReader() content.Reader
@@ -66,7 +65,6 @@ type DirectRepository interface {
DeriveKey(purpose []byte, keyLength int) []byte
Token(password string) (string, error)
Throttler() throttling.SettableThrottler
RequiredFeatures() ([]feature.Required, error)
DisableIndexRefresh()
}
@@ -76,25 +74,22 @@ type DirectRepositoryWriter interface {
DirectRepository
BlobStorage() blob.Storage
ContentManager() *content.WriteManager
SetParameters(ctx context.Context, m format.MutableParameters, blobcfg format.BlobStorageConfiguration, requiredFeatures []feature.Required) error
ChangePassword(ctx context.Context, newPassword string) error
GetUpgradeLockIntent(ctx context.Context) (*format.UpgradeLockIntent, error)
SetUpgradeLockIntent(ctx context.Context, l format.UpgradeLockIntent) (*format.UpgradeLockIntent, error)
CommitUpgrade(ctx context.Context) error
RollbackUpgrade(ctx context.Context) error
// SetParameters(ctx context.Context, m format.MutableParameters, blobcfg format.BlobStorageConfiguration, requiredFeatures []feature.Required) error
// ChangePassword(ctx context.Context, newPassword string) error
// GetUpgradeLockIntent(ctx context.Context) (*format.UpgradeLockIntent, error)
// SetUpgradeLockIntent(ctx context.Context, l format.UpgradeLockIntent) (*format.UpgradeLockIntent, error)
// CommitUpgrade(ctx context.Context) error
// RollbackUpgrade(ctx context.Context) error
}
type directRepositoryParameters struct {
uniqueID []byte
configFile string
cachingOptions content.CachingOptions
cliOpts ClientOptions
timeNow func() time.Time
formatBlob *format.KopiaRepositoryJSON
blobCfgBlob format.BlobStorageConfiguration
formatEncryptionKey []byte
nextWriterID *int32
throttler throttling.SettableThrottler
configFile string
cachingOptions content.CachingOptions
cliOpts ClientOptions
timeNow func() time.Time
fmgr *format.Manager
nextWriterID *int32
throttler throttling.SettableThrottler
}
// directRepository is an implementation of repository that directly manipulates underlying storage.
@@ -113,13 +108,13 @@ type directRepository struct {
// DeriveKey derives encryption key of the provided length from the master key.
func (r *directRepository) DeriveKey(purpose []byte, keyLength int) []byte {
if r.cmgr.ContentFormat().SupportsPasswordChange() {
return format.DeriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.uniqueID, purpose, keyLength)
return format.DeriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.UniqueID(), purpose, keyLength)
}
// version of kopia <v0.9 had a bug where certain keys were derived directly from
// the password and not from the random master key. This made it impossible to change
// password.
return format.DeriveKeyFromMasterKey(r.formatEncryptionKey, r.uniqueID, purpose, keyLength)
return format.DeriveKeyFromMasterKey(r.fmgr.FormatEncryptionKey(), r.UniqueID(), purpose, keyLength)
}
// ClientOptions returns client options.
@@ -308,7 +303,7 @@ func (r *directRepository) ObjectFormat() format.ObjectFormat {
// UniqueID returns unique repository ID from which many keys and secrets are derived.
func (r *directRepository) UniqueID() []byte {
return r.uniqueID
return r.fmgr.UniqueID()
}
// BlobReader returns the blob reader.
@@ -342,8 +337,9 @@ func (r *directRepository) Time() time.Time {
return defaultTime(r.timeNow)()
}
func (r *directRepository) BlobCfg() format.BlobStorageConfiguration {
return r.directRepositoryParameters.blobCfgBlob
// FormatManager returns the format manager.
func (r *directRepository) FormatManager() *format.Manager {
return r.fmgr
}
// WriteSessionOptions describes options for a write session.

View File

@@ -405,7 +405,7 @@ func TestInitializeWithBlobCfgRetentionBlob(t *testing.T) {
// verify that the blobcfg retention blob is created
require.NoError(t, env.RepositoryWriter.BlobStorage().GetBlob(ctx, format.KopiaBlobCfgBlobID, 0, -1, &d))
require.NoError(t, env.RepositoryWriter.ChangePassword(ctx, "new-password"))
require.NoError(t, env.RepositoryWriter.FormatManager().ChangePassword(ctx, "new-password"))
// verify that the blobcfg retention blob is created and is different after
// password-change
require.NoError(t, env.RepositoryWriter.BlobStorage().GetBlob(ctx, format.KopiaBlobCfgBlobID, 0, -1, &d))
@@ -425,7 +425,7 @@ func TestInitializeWithBlobCfgRetentionBlob(t *testing.T) {
// verify that we error out on corrupted blobcfg blob
_, err := repo.Open(ctx, env.ConfigFile(), env.Password, &repo.Options{})
require.EqualError(t, err, "invalid repository password")
require.ErrorContains(t, err, "invalid repository password")
// restore the original blob
require.NoError(t, env.RepositoryWriter.BlobStorage().PutBlob(ctx, format.KopiaBlobCfgBlobID, d.Bytes(), blob.PutOptions{}))
@@ -637,9 +637,9 @@ func (s *formatSpecificTestSuite) TestWriteSessionFlushOnFailure(t *testing.T) {
func (s *formatSpecificTestSuite) TestChangePassword(t *testing.T) {
ctx, env := repotesting.NewEnvironment(t, s.formatVersion)
if s.formatVersion == format.FormatVersion1 {
require.Error(t, env.RepositoryWriter.ChangePassword(ctx, "new-password"))
require.Error(t, env.RepositoryWriter.FormatManager().ChangePassword(ctx, "new-password"))
} else {
require.NoError(t, env.RepositoryWriter.ChangePassword(ctx, "new-password"))
require.NoError(t, env.RepositoryWriter.FormatManager().ChangePassword(ctx, "new-password"))
r, err := repo.Open(ctx, env.RepositoryWriter.ConfigFilename(), "new-password", nil)
require.NoError(t, err)
@@ -690,13 +690,16 @@ func TestDeriveKey(t *testing.T) {
mp, mperr := cf.GetMutableParameters()
require.NoError(t, mperr)
feat, err := dw1Upgraded.RequiredFeatures()
feat, err := dw1Upgraded.FormatManager().RequiredFeatures()
require.NoError(t, err)
// perform upgrade
mp.Version = v2
require.NoError(t, dw1Upgraded.SetParameters(ctx, mp, dw1Upgraded.BlobCfg(), feat))
blobCfg, err := dw1Upgraded.FormatManager().BlobCfgBlob()
require.NoError(t, err)
require.NoError(t, dw1Upgraded.FormatManager().SetParameters(ctx, mp, blobCfg, feat))
return env.MustConnectOpenAnother(t).(repo.DirectRepositoryWriter)
}
@@ -718,7 +721,10 @@ func TestDeriveKey(t *testing.T) {
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
require.Equal(t, tc.wantFormat, tc.dw.ContentReader().ContentFormat().Struct().Version)
mp, err := tc.dw.FormatManager().GetMutableParameters()
require.NoError(t, err)
require.Equal(t, tc.wantFormat, mp.Version)
require.Equal(t, tc.wantKey, tc.dw.DeriveKey(testPurpose, testKeyLength))
})
}

View File

@@ -1,202 +0,0 @@
package repo
import (
"context"
"os"
"path/filepath"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/format"
)
// FormatBlobBackupIDPrefix is the prefix for all identifiers of the BLOBs that
// keep a backup copy of the FormatBlobID BLOB for the purposes of rollback
// during upgrade.
const FormatBlobBackupIDPrefix = "kopia.repository.backup."
// FormatBlobBackupID gets the upgrade backu pblob-id fro mthe lock.
func FormatBlobBackupID(l format.UpgradeLockIntent) blob.ID {
return blob.ID(FormatBlobBackupIDPrefix + l.OwnerID)
}
func (r *directRepository) updateRepoConfig(ctx context.Context, cb func(repoConfig *format.RepositoryConfig) error) (*format.RepositoryConfig, error) {
f := r.formatBlob
repoConfig, err := f.DecryptRepositoryConfig(r.formatEncryptionKey)
if err != nil {
return nil, errors.Wrap(err, "unable to decrypt repository config")
}
if err := cb(repoConfig); err != nil {
return nil, err
}
if err := f.EncryptRepositoryConfig(repoConfig, r.formatEncryptionKey); err != nil {
return nil, errors.Errorf("unable to encrypt format bytes")
}
if err := f.WriteKopiaRepositoryBlob(ctx, r.blobs, r.blobCfgBlob); err != nil {
return nil, errors.Wrap(err, "unable to write format blob")
}
if cd := r.cachingOptions.CacheDirectory; cd != "" {
if err := os.Remove(filepath.Join(cd, format.KopiaRepositoryBlobID)); err != nil && !os.IsNotExist(err) {
return nil, errors.Errorf("unable to remove cached repository format blob: %v", err)
}
}
return repoConfig, nil
}
// SetUpgradeLockIntent sets the upgrade lock intent on the repository format
// blob for other clients to notice. If a lock intent was already placed then
// it updates the existing lock using the output of the UpgradeLock.Update().
//
// This method also backs up the original format version on the upgrade lock
// intent and sets the latest format-version o nthe repository blob. This
// should cause the unsupporting clients (non-upgrade capable) to fail
// connecting to the repository.
func (r *directRepository) SetUpgradeLockIntent(ctx context.Context, l format.UpgradeLockIntent) (*format.UpgradeLockIntent, error) {
repoConfig, err := r.updateRepoConfig(ctx, func(repoConfig *format.RepositoryConfig) error {
if err := l.Validate(); err != nil {
return errors.Wrap(err, "invalid upgrade lock intent")
}
if repoConfig.UpgradeLock == nil {
// when we are putting a new lock then ensure that we can upgrade
// to that version
if repoConfig.ContentFormat.Version >= format.MaxFormatVersion {
return errors.Errorf("repository is using version %d, and version %d is the maximum",
repoConfig.ContentFormat.Version, format.MaxFormatVersion)
}
// backup the current repository config from local cache to the
// repository when we place the lock for the first time
if err := r.formatBlob.WriteKopiaRepositoryBlobWithID(ctx, r.blobs, r.blobCfgBlob, FormatBlobBackupID(l)); err != nil {
return errors.Wrap(err, "failed to backup the repo format blob")
}
// set a new lock or revoke an existing lock
repoConfig.UpgradeLock = &l
// mark the upgrade to the new format version, this will ensure that older
// clients won't be able to parse the new version
repoConfig.ContentFormat.Version = format.MaxFormatVersion
} else if newL, err := repoConfig.UpgradeLock.Update(&l); err == nil {
repoConfig.UpgradeLock = newL
} else {
return errors.Wrap(err, "failed to update the existing lock")
}
return nil
})
if err != nil {
return nil, err
}
return repoConfig.UpgradeLock, nil
}
// CommitUpgrade removes the upgrade lock from the from the repository format
// blob. This in-effect commits the new repository format t othe repository and
// resumes all access to the repository.
func (r *directRepository) CommitUpgrade(ctx context.Context) error {
_, err := r.updateRepoConfig(ctx, func(repoConfig *format.RepositoryConfig) error {
if repoConfig.UpgradeLock == nil {
return errors.New("no upgrade in progress")
}
// restore the old format version
repoConfig.UpgradeLock = nil
return nil
})
return err
}
// RollbackUpgrade removes the upgrade lock while also restoring the
// format-blob's original version. This method does not restore the original
// repository data format and neither does it validate against any repository
// changes. Rolling back the repository format is currently not supported and
// hence using this API could render the repository corrupted and unreadable by
// clients.
//
//nolint:gocyclo
func (r *directRepository) RollbackUpgrade(ctx context.Context) error {
f := r.formatBlob
repoConfig, err := f.DecryptRepositoryConfig(r.formatEncryptionKey)
if err != nil {
return errors.Wrap(err, "unable to decrypt repository config")
}
if repoConfig.UpgradeLock == nil {
return errors.New("no upgrade in progress")
}
// restore the oldest backup and delete the rest
var oldestBackup *blob.Metadata
if err = r.blobs.ListBlobs(ctx, FormatBlobBackupIDPrefix, func(bm blob.Metadata) error {
var delID blob.ID
if oldestBackup == nil || bm.Timestamp.Before(oldestBackup.Timestamp) {
if oldestBackup != nil {
// delete the current candidate because we have found an even older one
delID = oldestBackup.BlobID
}
oldestBackup = &bm
} else {
delID = bm.BlobID
}
if delID != "" {
// delete the backup that we are not going to need for rollback
if err = r.blobs.DeleteBlob(ctx, delID); err != nil {
return errors.Wrapf(err, "failed to delete the format blob backup %q", delID)
}
}
return nil
}); err != nil {
return errors.Wrap(err, "failed to list backup blobs")
}
// restore only when we find a backup, otherwise simply cleanup the local cache
if oldestBackup != nil {
var d gather.WriteBuffer
if err = r.blobs.GetBlob(ctx, oldestBackup.BlobID, 0, -1, &d); err != nil {
return errors.Wrapf(err, "failed to read from backup %q", oldestBackup.BlobID)
}
if err = r.blobs.PutBlob(ctx, format.KopiaRepositoryBlobID, d.Bytes(), blob.PutOptions{}); err != nil {
return errors.Wrapf(err, "failed to restore format blob from backup %q", oldestBackup.BlobID)
}
// delete the backup after we have restored the format-blob
if err = r.blobs.DeleteBlob(ctx, oldestBackup.BlobID); err != nil {
return errors.Wrapf(err, "failed to delete the format blob backup %q", oldestBackup.BlobID)
}
}
if cd := r.cachingOptions.CacheDirectory; cd != "" {
if err = os.Remove(filepath.Join(cd, format.KopiaRepositoryBlobID)); err != nil && !os.IsNotExist(err) {
return errors.Errorf("unable to remove cached repository format blob: %v", err)
}
}
return nil
}
func (r *directRepository) GetUpgradeLockIntent(ctx context.Context) (*format.UpgradeLockIntent, error) {
f := r.formatBlob
repoConfig, err := f.DecryptRepositoryConfig(r.formatEncryptionKey)
if err != nil {
return nil, errors.Wrap(err, "unable to decrypt repository config")
}
return repoConfig.UpgradeLock, nil
}

View File

@@ -208,7 +208,7 @@ func (s *formatSpecificTestSuite) TestSnapshotGCMinContentAgeSafety(t *testing.T
// Advance time so the first content created above is close fall of the
// SafetyFull.MinContentAgeSubjectToGC window. Leave a small buffer
// of 10 seconds below for "passage of time" between now an when snapshot
// of 20 seconds below for "passage of time" between now an when snapshot
// GC actually starts.
// Note: The duration of this buffer is a "magical number" that depends on
// the number of times time is advanced between "now" and when snapshot
@@ -217,7 +217,7 @@ func (s *formatSpecificTestSuite) TestSnapshotGCMinContentAgeSafety(t *testing.T
require.NoError(t, err)
require.NotEmpty(t, ci)
timeAdvance := safety.MinContentAgeSubjectToGC - th.fakeTime.NowFunc()().Sub(ci.Timestamp()) - 10*time.Second
timeAdvance := safety.MinContentAgeSubjectToGC - th.fakeTime.NowFunc()().Sub(ci.Timestamp()) - 20*time.Second
require.Positive(t, timeAdvance)
th.fakeTime.Advance(timeAdvance)