refactor(repository): introduce interface for reading FormattingOptions (#2235)

Instead of passing static content.FormattingOptions (and caching it)
we now introduce an interface to provide its values.

This will allow the values to dynamically change at runtime in the
future to support cases like live migration.
This commit is contained in:
Jarek Kowalski
2022-07-28 17:27:04 -07:00
committed by GitHub
parent 71ebf14de2
commit 56f3046d8a
29 changed files with 445 additions and 221 deletions

View File

@@ -113,7 +113,7 @@ func (c *commandRepositorySetParameters) setRetentionModeParameter(ctx context.C
func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
var anyChange bool
mp := rep.ContentReader().ContentFormat().MutableParameters
mp := rep.ContentReader().ContentFormat().GetMutableParameters()
blobcfg := rep.BlobCfg()
upgradeToEpochManager := false

View File

@@ -63,8 +63,8 @@ func (c *commandRepositoryStatus) outputJSON(ctx context.Context, r repo.Reposit
s.UniqueIDHex = hex.EncodeToString(dr.UniqueID())
s.ObjectFormat = dr.ObjectFormat()
s.BlobRetention = dr.BlobCfg()
s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) // nolint:forcetypeassert
s.ContentFormat = scrubber.ScrubSensitiveData(reflect.ValueOf(dr.ContentReader().ContentFormat())).Interface().(content.FormattingOptions) // nolint:forcetypeassert
s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) // nolint:forcetypeassert
s.ContentFormat = scrubber.ScrubSensitiveData(reflect.ValueOf(dr.ContentReader().ContentFormat().Struct())).Interface().(content.FormattingOptions) // nolint:forcetypeassert
switch cp, err := dr.BlobVolume().GetCapacity(ctx); {
case err == nil:
@@ -170,17 +170,19 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository)
c.out.printStdout("Storage config: %v\n", string(cjson))
}
contentFormat := dr.ContentReader().ContentFormat()
c.out.printStdout("\n")
c.out.printStdout("Unique ID: %x\n", dr.UniqueID())
c.out.printStdout("Hash: %v\n", dr.ContentReader().ContentFormat().Hash)
c.out.printStdout("Encryption: %v\n", dr.ContentReader().ContentFormat().Encryption)
c.out.printStdout("Hash: %v\n", contentFormat.GetHashFunction())
c.out.printStdout("Encryption: %v\n", contentFormat.GetEncryptionAlgorithm())
c.out.printStdout("Splitter: %v\n", dr.ObjectFormat().Splitter)
c.out.printStdout("Format version: %v\n", dr.ContentReader().ContentFormat().Version)
c.out.printStdout("Format version: %v\n", contentFormat.FormatVersion())
c.out.printStdout("Content compression: %v\n", dr.ContentReader().SupportsContentCompression())
c.out.printStdout("Password changes: %v\n", dr.ContentReader().ContentFormat().EnablePasswordChange)
c.out.printStdout("Password changes: %v\n", contentFormat.SupportsPasswordChange())
c.out.printStdout("Max pack length: %v\n", units.BytesStringBase2(int64(dr.ContentReader().ContentFormat().MaxPackSize)))
c.out.printStdout("Index Format: v%v\n", dr.ContentReader().ContentFormat().IndexVersion)
c.out.printStdout("Max pack length: %v\n", units.BytesStringBase2(int64(contentFormat.MaxPackBlobSize())))
c.out.printStdout("Index Format: v%v\n", contentFormat.WriteIndexVersion())
if emgr, ok := dr.ContentReader().EpochManager(); ok {
c.out.printStdout("\n")
@@ -192,10 +194,10 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository)
}
c.out.printStdout("\n")
c.out.printStdout("Epoch refresh frequency: %v\n", emgr.Params.EpochRefreshFrequency)
c.out.printStdout("Epoch advance on: %v blobs or %v, minimum %v\n", emgr.Params.EpochAdvanceOnCountThreshold, units.BytesStringBase2(emgr.Params.EpochAdvanceOnTotalSizeBytesThreshold), emgr.Params.MinEpochDuration)
c.out.printStdout("Epoch cleanup margin: %v\n", emgr.Params.CleanupSafetyMargin)
c.out.printStdout("Epoch checkpoint every: %v epochs\n", emgr.Params.FullCheckpointFrequency)
c.out.printStdout("Epoch refresh frequency: %v\n", contentFormat.GetEpochRefreshFrequency())
c.out.printStdout("Epoch advance on: %v blobs or %v, minimum %v\n", contentFormat.GetEpochAdvanceOnCountThreshold(), units.BytesStringBase2(contentFormat.GetEpochAdvanceOnTotalSizeBytesThreshold()), contentFormat.GetMinEpochDuration())
c.out.printStdout("Epoch cleanup margin: %v\n", contentFormat.GetEpochCleanupSafetyMargin())
c.out.printStdout("Epoch checkpoint every: %v epochs\n", contentFormat.GetEpochFullCheckpointFrequency())
} else {
c.out.printStdout("Epoch Manager: disabled\n")
}

View File

@@ -122,7 +122,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D
}
now := rep.Time()
mp := rep.ContentReader().ContentFormat().MutableParameters
mp := rep.ContentReader().ContentFormat().GetMutableParameters()
openOpts := c.svc.optionsFromFlags(ctx)
l := &repo.UpgradeLockIntent{
OwnerID: openOpts.UpgradeOwnerID,
@@ -163,7 +163,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D
// skipped until the lock is fully established.
func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.DirectRepositoryWriter) error {
cf := rep.ContentReader().ContentFormat()
if cf.MutableParameters.EpochParameters.Enabled {
if cf.GetEpochManagerEnabled() {
log(ctx).Infof("Repository indices have already been migrated to the epoch format, no need to drain other clients")
l, err := rep.GetUpgradeLockIntent(ctx)
@@ -250,7 +250,7 @@ func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo
// repository. This phase runs after the lock has been acquired in one of the
// prior phases.
func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectRepositoryWriter) error {
mp := rep.ContentReader().ContentFormat().MutableParameters
mp := rep.ContentReader().ContentFormat().GetMutableParameters()
if mp.EpochParameters.Enabled {
// nothing to upgrade on format, so let the next action commit the upgraded format blob
return nil

View File

@@ -29,6 +29,33 @@
maxRefreshAttemptSleepExponent = 1.5
)
// ParametersProvider provides epoch manager parameters.
type ParametersProvider interface {
// GetEpochManagerEnabled returns whether the epoch manager is enabled; must be true for the manager to operate.
GetEpochManagerEnabled() bool
// GetEpochRefreshFrequency returns how frequently each client will list blobs to determine the current epoch.
GetEpochRefreshFrequency() time.Duration
// GetEpochFullCheckpointFrequency returns the number of epochs between full checkpoints.
GetEpochFullCheckpointFrequency() int
// GetEpochCleanupSafetyMargin returns the minimum age a compacted blob must reach before the corresponding uncompacted blobs may be deleted.
GetEpochCleanupSafetyMargin() time.Duration
// GetMinEpochDuration returns the minimum duration of an epoch.
GetMinEpochDuration() time.Duration
// GetEpochAdvanceOnCountThreshold returns the number of files above which the epoch is advanced.
GetEpochAdvanceOnCountThreshold() int
// GetEpochAdvanceOnTotalSizeBytesThreshold returns the total size of files above which the epoch is advanced.
GetEpochAdvanceOnTotalSizeBytesThreshold() int64
// GetEpochDeleteParallelism returns the number of blobs to delete in parallel during cleanup.
GetEpochDeleteParallelism() int
}
// ErrVerySlowIndexWrite is returned by WriteIndex if a write takes more than 2 epochs (usually >48h).
// This is theoretically possible with laptops going to sleep, etc.
var ErrVerySlowIndexWrite = errors.Errorf("extremely slow index write - index write took more than two epochs")
@@ -60,6 +87,46 @@ type Parameters struct {
DeleteParallelism int
}
// GetEpochManagerEnabled returns whether the epoch manager is enabled; must be true for the manager to operate.
func (p *Parameters) GetEpochManagerEnabled() bool {
return p.Enabled
}
// GetEpochRefreshFrequency returns how frequently each client will list blobs to determine the current epoch.
func (p *Parameters) GetEpochRefreshFrequency() time.Duration {
return p.EpochRefreshFrequency
}
// GetEpochFullCheckpointFrequency returns the number of epochs between full checkpoints.
func (p *Parameters) GetEpochFullCheckpointFrequency() int {
return p.FullCheckpointFrequency
}
// GetEpochCleanupSafetyMargin returns the safety margin that prevents uncompacted blobs from being deleted while the corresponding compacted blob is younger than this.
func (p *Parameters) GetEpochCleanupSafetyMargin() time.Duration {
return p.CleanupSafetyMargin
}
// GetMinEpochDuration returns the minimum duration of an epoch.
func (p *Parameters) GetMinEpochDuration() time.Duration {
return p.MinEpochDuration
}
// GetEpochAdvanceOnCountThreshold returns the number of files above which the epoch should be advanced.
func (p *Parameters) GetEpochAdvanceOnCountThreshold() int {
return p.EpochAdvanceOnCountThreshold
}
// GetEpochAdvanceOnTotalSizeBytesThreshold returns the total size of files above which the epoch should be advanced.
func (p *Parameters) GetEpochAdvanceOnTotalSizeBytesThreshold() int64 {
return p.EpochAdvanceOnTotalSizeBytesThreshold
}
// GetEpochDeleteParallelism returns the number of blobs to delete in parallel during cleanup.
func (p *Parameters) GetEpochDeleteParallelism() int {
return p.DeleteParallelism
}
// Validate validates epoch parameters.
// nolint:gomnd
func (p *Parameters) Validate() error {
@@ -129,7 +196,7 @@ func (cs *CurrentSnapshot) isSettledEpochNumber(epoch int) bool {
// Manager manages repository epochs.
type Manager struct {
Params Parameters
Parameters ParametersProvider
st blob.Storage
compact CompactionFunc
@@ -264,7 +331,7 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error
// only delete blobs if a suitable replacement exists and has been written sufficiently
// long ago. we don't want to delete blobs that are created too recently, because other clients
// may have not observed them yet.
maxReplacementTime := maxTime.Add(-e.Params.CleanupSafetyMargin)
maxReplacementTime := maxTime.Add(-e.Parameters.GetEpochCleanupSafetyMargin())
eg.Go(func() error {
return e.cleanupEpochMarkers(ctx, cs)
@@ -289,7 +356,7 @@ func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) e
}
}
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism), "error deleting index blob marker")
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()), "error deleting index blob marker")
}
func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, maxReplacementTime time.Time) error {
@@ -310,7 +377,7 @@ func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, max
}
}
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism), "error deleting watermark blobs")
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()), "error deleting watermark blobs")
}
// CleanupSupersededIndexes cleans up the indexes which have been superseded by compacted ones.
@@ -331,7 +398,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error {
// only delete blobs if a suitable replacement exists and has been written sufficiently
// long ago. we don't want to delete blobs that are created too recently, because other clients
// may have not observed them yet.
maxReplacementTime := maxTime.Add(-e.Params.CleanupSafetyMargin)
maxReplacementTime := maxTime.Add(-e.Parameters.GetEpochCleanupSafetyMargin())
e.log.Debugw("Cleaning up superseded index blobs...",
"maxReplacementTime", maxReplacementTime)
@@ -353,7 +420,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error {
}
}
if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism); err != nil {
if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()); err != nil {
return errors.Wrap(err, "unable to delete uncompacted blobs")
}
@@ -378,7 +445,7 @@ func (e *Manager) refreshLocked(ctx context.Context) error {
nextDelayTime := initiaRefreshAttemptSleep
if !e.Params.Enabled {
if !e.Parameters.GetEpochManagerEnabled() {
return errors.Errorf("epoch manager not enabled")
}
@@ -497,7 +564,7 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs
firstNonRangeCompacted = cs.LongestRangeCheckpointSets[len(cs.LongestRangeCheckpointSets)-1].MaxEpoch + 1
}
if latestSettled-firstNonRangeCompacted < e.Params.FullCheckpointFrequency {
if latestSettled-firstNonRangeCompacted < e.Parameters.GetEpochFullCheckpointFrequency() {
e.log.Debugf("not generating range checkpoint")
return
@@ -575,7 +642,7 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
EpochStartTime: map[int]time.Time{},
UncompactedEpochSets: map[int][]blob.Metadata{},
SingleEpochCompactionSets: map[int][]blob.Metadata{},
ValidUntil: e.timeFunc().Add(e.Params.EpochRefreshFrequency),
ValidUntil: e.timeFunc().Add(e.Parameters.GetEpochRefreshFrequency()),
}
e.log.Debugf("refreshAttemptLocked")
@@ -613,7 +680,7 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
len(ues[cs.WriteEpoch+1]),
cs.ValidUntil.Format(time.RFC3339Nano))
if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], e.Params.MinEpochDuration, e.Params.EpochAdvanceOnCountThreshold, e.Params.EpochAdvanceOnTotalSizeBytesThreshold) {
if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], e.Parameters.GetMinEpochDuration(), e.Parameters.GetEpochAdvanceOnCountThreshold(), e.Parameters.GetEpochAdvanceOnTotalSizeBytesThreshold()) {
if err := e.advanceEpoch(ctx, cs); err != nil {
return errors.Wrap(err, "error advancing epoch")
}
@@ -700,7 +767,7 @@ func (e *Manager) WriteIndex(ctx context.Context, dataShards map[blob.ID]blob.By
for {
// make sure we have at least 75% of remaining time
// nolint:gomnd
cs, err := e.committedState(ctx, 3*e.Params.EpochRefreshFrequency/4)
cs, err := e.committedState(ctx, 3*e.Parameters.GetEpochRefreshFrequency()/4)
if err != nil {
return nil, errors.Wrap(err, "error getting committed state")
}
@@ -924,13 +991,13 @@ func rangeCheckpointBlobPrefix(epoch1, epoch2 int) blob.ID {
}
// NewManager creates new epoch manager.
func NewManager(st blob.Storage, params Parameters, compactor CompactionFunc, log logging.Logger, timeNow func() time.Time) *Manager {
func NewManager(st blob.Storage, paramProvider ParametersProvider, compactor CompactionFunc, log logging.Logger, timeNow func() time.Time) *Manager {
return &Manager{
st: st,
log: log,
compact: compactor,
timeFunc: timeNow,
Params: params,
Parameters: paramProvider,
getCompleteIndexSetTooSlow: new(int32),
committedStateRefreshTooSlow: new(int32),
writeIndexTooSlow: new(int32),

View File

@@ -92,7 +92,7 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv {
st = fs
st = logging.NewWrapper(st, testlogging.NewTestLogger(t), "[STORAGE] ")
te := &epochManagerTestEnv{unloggedst: unloggedst, st: st, ft: ft}
m := NewManager(te.st, Parameters{
m := NewManager(te.st, &Parameters{
Enabled: true,
EpochRefreshFrequency: 20 * time.Minute,
FullCheckpointFrequency: 7,
@@ -121,7 +121,7 @@ func (te *epochManagerTestEnv) another() *epochManagerTestEnv {
faultyStorage: te.faultyStorage,
}
te2.mgr = NewManager(te2.st, te.mgr.Params, te2.compact, te.mgr.log, te.mgr.timeFunc)
te2.mgr = NewManager(te2.st, te.mgr.Parameters, te2.compact, te.mgr.log, te.mgr.timeFunc)
return te2
}
@@ -577,7 +577,7 @@ func verifySequentialWrites(t *testing.T, te *epochManagerTestEnv) {
func TestIndexEpochManager_Disabled(t *testing.T) {
te := newTestEnv(t)
te.mgr.Params.Enabled = false
(te.mgr.Parameters).(*Parameters).Enabled = false
_, err := te.mgr.Current(testlogging.Context(t))
require.Error(t, err)

View File

@@ -34,8 +34,8 @@ func handleRepoParameters(ctx context.Context, rc requestContext) (interface{},
}
rp := &remoterepoapi.Parameters{
HashFunction: dr.ContentReader().ContentFormat().Hash,
HMACSecret: dr.ContentReader().ContentFormat().HMACSecret,
HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(),
HMACSecret: dr.ContentReader().ContentFormat().GetHmacSecret(),
Format: dr.ObjectFormat(),
SupportsContentCompression: dr.ContentReader().SupportsContentCompression(),
}
@@ -56,9 +56,9 @@ func handleRepoStatus(ctx context.Context, rc requestContext) (interface{}, *api
return &serverapi.StatusResponse{
Connected: true,
ConfigFile: dr.ConfigFilename(),
Hash: dr.ContentReader().ContentFormat().Hash,
Encryption: dr.ContentReader().ContentFormat().Encryption,
MaxPackSize: dr.ContentReader().ContentFormat().MaxPackSize,
Hash: dr.ContentReader().ContentFormat().GetHashFunction(),
Encryption: dr.ContentReader().ContentFormat().GetEncryptionAlgorithm(),
MaxPackSize: dr.ContentReader().ContentFormat().MaxPackBlobSize(),
Splitter: dr.ObjectFormat().Splitter,
Storage: dr.BlobReader().ConnectionInfo().Type,
ClientOptions: dr.ClientOptions(),

View File

@@ -459,8 +459,8 @@ func (s *Server) handleInitialSessionHandshake(srv grpcapi.KopiaRepository_Sessi
Response: &grpcapi.SessionResponse_InitializeSession{
InitializeSession: &grpcapi.InitializeSessionResponse{
Parameters: &grpcapi.RepositoryParameters{
HashFunction: dr.ContentReader().ContentFormat().Hash,
HmacSecret: dr.ContentReader().ContentFormat().HMACSecret,
HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(),
HmacSecret: dr.ContentReader().ContentFormat().GetHmacSecret(),
Splitter: dr.ObjectFormat().Splitter,
SupportsContentCompression: dr.ContentReader().SupportsContentCompression(),
},

View File

@@ -21,7 +21,6 @@
"github.com/kopia/kopia/repo/blob/filesystem"
"github.com/kopia/kopia/repo/blob/sharded"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/hashing"
"github.com/kopia/kopia/repo/logging"
)
@@ -70,6 +69,11 @@ type indexBlobManager interface {
invalidate(ctx context.Context)
}
// CrypterProvider returns the crypter to use for encrypting contents and blobs.
type CrypterProvider interface {
// Crypter returns the Crypter used for encrypting contents and blobs.
Crypter() *Crypter
}
// SharedManager is responsible for read-only access to committed data.
type SharedManager struct {
// +checkatomic
@@ -80,7 +84,6 @@ type SharedManager struct {
Stats *Stats
st blob.Storage
indexBlobManager indexBlobManager // points at either indexBlobManagerV0 or indexBlobManagerV1
indexBlobManagerV0 *indexBlobManagerV0
indexBlobManagerV1 *indexBlobManagerV1
@@ -88,7 +91,6 @@ type SharedManager struct {
metadataCache cache.ContentCache
indexBlobCache *cache.PersistentCache
committedContents *committedContentIndex
crypter *Crypter
enc *encryptedBlobMgr
timeNow func() time.Time
@@ -101,16 +103,12 @@ type SharedManager struct {
// +checklocks:indexesLock
refreshIndexesAfter time.Time
format FormattingOptions
format FormattingOptionsProvider
checkInvariantsOnUnlock bool
writeFormatVersion int32 // format version to write
maxPackSize int
minPreambleLength int
maxPreambleLength int
paddingUnit int
repositoryFormatBytes []byte
indexVersion int
indexShardSize int
// logger where logs should be written
log logging.Logger
@@ -123,7 +121,7 @@ type SharedManager struct {
// Crypter returns the crypter.
func (sm *SharedManager) Crypter() *Crypter {
return sm.crypter
return sm.format.Crypter()
}
func (sm *SharedManager) readPackFileLocalIndex(ctx context.Context, packFile blob.ID, packFileLength int64, output *gather.WriteBuffer) error {
@@ -198,13 +196,13 @@ func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error {
}
if i > 0 {
sm.indexBlobManager.flushCache(ctx)
sm.indexBlobManager().flushCache(ctx)
sm.log.Debugf("encountered NOT_FOUND when loading, sleeping %v before retrying #%v", nextSleepTime, i)
time.Sleep(nextSleepTime)
nextSleepTime *= 2
}
indexBlobs, ignoreDeletedBefore, err := sm.indexBlobManager.listActiveIndexBlobs(ctx)
indexBlobs, ignoreDeletedBefore, err := sm.indexBlobManager().listActiveIndexBlobs(ctx)
if err != nil {
return errors.Wrap(err, "error listing index blobs")
}
@@ -246,6 +244,14 @@ func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache {
return sm.contentCache
}
// indexBlobManager returns the active index blob manager implementation.
// The choice is made dynamically on each call, based on whether the epoch
// manager is currently enabled in the formatting options provider, so it
// can change at runtime (e.g. during a live migration).
func (sm *SharedManager) indexBlobManager() indexBlobManager {
if sm.format.GetEpochManagerEnabled() {
return sm.indexBlobManagerV1
}
return sm.indexBlobManagerV0
}
func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info, output *gather.WriteBuffer) error {
sm.Stats.readContent(payload.Length())
@@ -285,7 +291,7 @@ func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info,
}
func (sm *SharedManager) decryptAndVerify(encrypted gather.Bytes, iv []byte, output *gather.WriteBuffer) error {
if err := sm.crypter.Encryptor.Decrypt(encrypted, iv, output); err != nil {
if err := sm.format.Crypter().Encryptor.Decrypt(encrypted, iv, output); err != nil {
sm.Stats.foundInvalidContent()
return errors.Wrap(err, "decrypt")
}
@@ -316,7 +322,7 @@ func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) (
return result, nil
}
blobs, _, err := sm.indexBlobManager.listActiveIndexBlobs(ctx)
blobs, _, err := sm.indexBlobManager().listActiveIndexBlobs(ctx)
// nolint:wrapcheck
return blobs, err
@@ -429,54 +435,49 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
}
sm.enc = &encryptedBlobMgr{
st: cachedSt,
crypter: sm.crypter,
indexBlobCache: indexBlobCache,
log: sm.namedLogger("encrypted-blob-manager"),
st: cachedSt,
crypterProvider: sm.format,
indexBlobCache: indexBlobCache,
log: sm.namedLogger("encrypted-blob-manager"),
}
// set up legacy index blob manager
sm.indexBlobManagerV0 = &indexBlobManagerV0{
st: cachedSt,
enc: sm.enc,
timeNow: sm.timeNow,
maxPackSize: sm.maxPackSize,
indexVersion: sm.indexVersion,
indexShardSize: sm.indexShardSize,
log: sm.namedLogger("index-blob-manager"),
st: cachedSt,
enc: sm.enc,
timeNow: sm.timeNow,
formattingOptions: sm.format,
log: sm.namedLogger("index-blob-manager"),
}
// set up new index blob manager
sm.indexBlobManagerV1 = &indexBlobManagerV1{
st: cachedSt,
enc: sm.enc,
timeNow: sm.timeNow,
maxPackSize: sm.maxPackSize,
indexShardSize: sm.indexShardSize,
indexVersion: sm.indexVersion,
log: sm.namedLogger("index-blob-manager"),
st: cachedSt,
enc: sm.enc,
timeNow: sm.timeNow,
formattingOptions: sm.format,
log: sm.namedLogger("index-blob-manager"),
}
sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, sm.format.EpochParameters, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow)
// select active index blob manager based on parameters
if sm.format.EpochParameters.Enabled {
sm.indexBlobManager = sm.indexBlobManagerV1
} else {
sm.indexBlobManager = sm.indexBlobManagerV0
}
sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, sm.format, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow)
// once everything is ready, set it up
sm.contentCache = dataCache
sm.metadataCache = metadataCache
sm.indexBlobCache = indexBlobCache
sm.committedContents = newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.enc.getEncryptedBlob, sm.namedLogger("committed-content-index"), caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge))
sm.committedContents = newCommittedContentIndex(caching,
uint32(sm.format.Crypter().Encryptor.Overhead()),
sm.format.WriteIndexVersion(),
sm.enc.getEncryptedBlob,
sm.namedLogger("committed-content-index"),
caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge))
return nil
}
// EpochManager returns the epoch manager.
func (sm *SharedManager) EpochManager() (*epoch.Manager, bool) {
ibm1, ok := sm.indexBlobManager.(*indexBlobManagerV1)
ibm1, ok := sm.indexBlobManager().(*indexBlobManagerV1)
if !ok {
return nil, false
}
@@ -546,36 +547,14 @@ func (sm *SharedManager) shouldRefreshIndexes() bool {
}
// NewSharedManager returns SharedManager that is used by SessionWriteManagers on top of a repository.
func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, opts *ManagerOptions) (*SharedManager, error) {
func NewSharedManager(ctx context.Context, st blob.Storage, prov FormattingOptionsProvider, caching *CachingOptions, opts *ManagerOptions) (*SharedManager, error) {
opts = opts.CloneOrDefault()
if opts.TimeNow == nil {
opts.TimeNow = clock.Now
}
if f.Version < minSupportedReadVersion || f.Version > currentWriteVersion {
return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", f.Version, minSupportedReadVersion, maxSupportedReadVersion)
}
if f.Version < minSupportedWriteVersion || f.Version > currentWriteVersion {
return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", f.Version, minSupportedWriteVersion, maxSupportedWriteVersion)
}
crypter, err := CreateCrypter(f)
if err != nil {
return nil, err
}
actualIndexVersion := f.IndexVersion
if actualIndexVersion == 0 {
actualIndexVersion = legacyIndexVersion
}
if actualIndexVersion < index.Version1 || actualIndexVersion > index.Version2 {
return nil, errors.Errorf("index version %v is not supported", actualIndexVersion)
}
// create internal logger that will be writing logs as encrypted repository blobs.
ilm := newInternalLogManager(ctx, st, crypter)
ilm := newInternalLogManager(ctx, st, prov)
// sharedBaseLogger writes to the both context and internal log
// and is used as a base for all content manager components.
@@ -588,19 +567,13 @@ func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions
sm := &SharedManager{
st: st,
crypter: crypter,
Stats: new(Stats),
timeNow: opts.TimeNow,
format: *f,
maxPackSize: f.MaxPackSize,
format: prov,
minPreambleLength: defaultMinPreambleLength,
maxPreambleLength: defaultMaxPreambleLength,
paddingUnit: defaultPaddingUnit,
repositoryFormatBytes: opts.RepositoryFormatBytes,
checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "",
writeFormatVersion: int32(f.Version),
indexVersion: actualIndexVersion,
indexShardSize: defaultIndexShardSize,
internalLogManager: ilm,
internalLogger: internalLog,
contextLogger: logging.Module(FormatLogModule)(ctx),

View File

@@ -100,7 +100,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp
keyTime := map[blob.ID]time.Time{}
st := blobtesting.NewMapStorage(data, keyTime, nil)
bm, err := NewManagerForTesting(testlogging.Context(t), st, &FormattingOptions{
bm, err := NewManagerForTesting(testlogging.Context(t), st, mustCreateFormatProvider(t, &FormattingOptions{
Hash: hashAlgo,
Encryption: encryptionAlgo,
HMACSecret: hmacSecret,
@@ -109,7 +109,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp
MaxPackSize: maxPackSize,
},
MasterKey: make([]byte, 32), // zero key, does not matter
}, nil, nil)
}), nil, nil)
if err != nil {
t.Errorf("can't create content manager with hash %v and encryption %v: %v", hashAlgo, encryptionAlgo, err.Error())
return
@@ -159,3 +159,12 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp
}
}
}
// mustCreateFormatProvider wraps the provided FormattingOptions in a static
// FormattingOptionsProvider, failing the test immediately if validation fails.
func mustCreateFormatProvider(t *testing.T, f *FormattingOptions) FormattingOptionsProvider {
t.Helper()
fop, err := NewFormattingOptionsProvider(f, nil)
require.NoError(t, err)
return fop
}

View File

@@ -9,6 +9,8 @@
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/hashing"
)
const (
@@ -61,6 +63,16 @@ func (f *FormattingOptions) ResolveFormatVersion() error {
}
}
// GetMutableParameters implements FormattingOptionsProvider and returns the
// parameters that can be mutated after the repository is created.
func (f *FormattingOptions) GetMutableParameters() MutableParameters {
return f.MutableParameters
}
// SupportsPasswordChange implements FormattingOptionsProvider and returns
// whether password changes are enabled for this repository.
func (f *FormattingOptions) SupportsPasswordChange() bool {
return f.EnablePasswordChange
}
// MutableParameters represents parameters of the content manager that can be mutated after the repository
// is created.
type MutableParameters struct {
@@ -111,6 +123,149 @@ func (f *FormattingOptions) GetHmacSecret() []byte {
return f.HMACSecret
}
// FormattingOptionsProvider provides current formatting options. The options returned
// should not be cached for more than a few seconds as they are subject to change.
type FormattingOptionsProvider interface {
epoch.ParametersProvider
IndexFormattingOptions
CrypterProvider
encryption.Parameters
hashing.Parameters
// GetMutableParameters returns the parameters that can change after the repository is created.
GetMutableParameters() MutableParameters
// GetMasterKey returns the master key bytes.
GetMasterKey() []byte
// SupportsPasswordChange returns whether password changes are enabled.
SupportsPasswordChange() bool
// FormatVersion returns the format version of the repository.
FormatVersion() FormatVersion
// MaxPackBlobSize returns the maximum size of a pack blob.
MaxPackBlobSize() int
// RepositoryFormatBytes returns the raw format blob bytes provided at construction.
RepositoryFormatBytes() []byte
// Struct returns a copy of the underlying FormattingOptions.
Struct() FormattingOptions
}
// formattingOptionsProvider is a static FormattingOptionsProvider implementation
// backed by FormattingOptions validated at construction time.
type formattingOptionsProvider struct {
*FormattingOptions
crypter *Crypter // crypter pre-built from the formatting options
actualFormatVersion FormatVersion
actualIndexVersion int // index version after resolving the legacy (zero) default
formatBytes []byte // raw format blob bytes provided at construction
}
// FormatVersion returns the format version from the underlying FormattingOptions.
func (f *formattingOptionsProvider) FormatVersion() FormatVersion {
return f.Version
}
// GetEpochManagerEnabled returns whether the epoch manager is enabled.
func (f *formattingOptionsProvider) GetEpochManagerEnabled() bool {
return f.EpochParameters.Enabled
}
// GetEpochRefreshFrequency returns how frequently each client will list blobs to determine the current epoch.
func (f *formattingOptionsProvider) GetEpochRefreshFrequency() time.Duration {
return f.EpochParameters.EpochRefreshFrequency
}
// GetEpochFullCheckpointFrequency returns the number of epochs between full checkpoints.
func (f *formattingOptionsProvider) GetEpochFullCheckpointFrequency() int {
return f.EpochParameters.FullCheckpointFrequency
}
// GetEpochCleanupSafetyMargin returns the safety margin that prevents uncompacted blobs from being deleted while the corresponding compacted blob is younger than this.
func (f *formattingOptionsProvider) GetEpochCleanupSafetyMargin() time.Duration {
return f.EpochParameters.CleanupSafetyMargin
}
// GetMinEpochDuration returns the minimum duration of an epoch.
func (f *formattingOptionsProvider) GetMinEpochDuration() time.Duration {
return f.EpochParameters.MinEpochDuration
}
// GetEpochAdvanceOnCountThreshold returns the number of files above which the epoch should be advanced.
func (f *formattingOptionsProvider) GetEpochAdvanceOnCountThreshold() int {
return f.EpochParameters.EpochAdvanceOnCountThreshold
}
// GetEpochAdvanceOnTotalSizeBytesThreshold returns the total size of files above which the epoch should be advanced.
func (f *formattingOptionsProvider) GetEpochAdvanceOnTotalSizeBytesThreshold() int64 {
return f.EpochParameters.EpochAdvanceOnTotalSizeBytesThreshold
}
// GetEpochDeleteParallelism returns the number of blobs to delete in parallel during cleanup.
func (f *formattingOptionsProvider) GetEpochDeleteParallelism() int {
return f.EpochParameters.DeleteParallelism
}
// Struct returns a copy of the underlying FormattingOptions.
func (f *formattingOptionsProvider) Struct() FormattingOptions {
return *f.FormattingOptions
}
// NewFormattingOptionsProvider validates the provided formatting options and returns static
// FormattingOptionsProvider based on them.
//
// It verifies that the format version and index version are within the supported
// ranges and pre-builds the Crypter so that accessor methods cannot fail later.
// Returns an error if any validation step fails.
func NewFormattingOptionsProvider(f *FormattingOptions, formatBytes []byte) (FormattingOptionsProvider, error) {
formatVersion := f.Version
// Report the bound actually enforced (currentWriteVersion) so the
// diagnostic matches the condition being tested.
if formatVersion < minSupportedReadVersion || formatVersion > currentWriteVersion {
return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", formatVersion, minSupportedReadVersion, currentWriteVersion)
}
if formatVersion < minSupportedWriteVersion || formatVersion > currentWriteVersion {
return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", formatVersion, minSupportedWriteVersion, currentWriteVersion)
}
// An index version of zero means "not set" and maps to the legacy default.
actualIndexVersion := f.IndexVersion
if actualIndexVersion == 0 {
actualIndexVersion = legacyIndexVersion
}
if actualIndexVersion < index.Version1 || actualIndexVersion > index.Version2 {
return nil, errors.Errorf("index version %v is not supported", actualIndexVersion)
}
crypter, err := CreateCrypter(f)
if err != nil {
return nil, errors.Wrap(err, "unable to create crypter")
}
return &formattingOptionsProvider{
FormattingOptions: f,
crypter: crypter,
actualIndexVersion: actualIndexVersion,
actualFormatVersion: f.Version,
formatBytes: formatBytes,
}, nil
}
func (f *formattingOptionsProvider) Crypter() *Crypter {
return f.crypter
}
func (f *formattingOptionsProvider) WriteIndexVersion() int {
return f.actualIndexVersion
}
func (f *formattingOptionsProvider) MaxIndexBlobSize() int64 {
return int64(f.MaxPackSize)
}
// MaxPackBlobSize returns the maximum size of a pack blob,
// taken from the formatting options.
func (f *formattingOptionsProvider) MaxPackBlobSize() int {
	return f.MaxPackSize
}
// GetEpochManagerParameters returns the epoch manager parameters
// stored in the formatting options.
func (f *formattingOptionsProvider) GetEpochManagerParameters() epoch.Parameters {
	return f.EpochParameters
}
// IndexShardSize returns the maximum number of entries per index shard.
// This is a static default and is not configurable via FormattingOptions.
func (f *formattingOptionsProvider) IndexShardSize() int {
	return defaultIndexShardSize
}
// RepositoryFormatBytes returns the raw bytes of the repository format blob
// that were supplied at construction time.
// NOTE(review): the stored slice is returned without copying — callers are
// presumably expected not to mutate it; confirm with callers.
func (f *formattingOptionsProvider) RepositoryFormatBytes() []byte {
	return f.formatBytes
}
var _ FormattingOptionsProvider = (*formattingOptionsProvider)(nil)
// BlobCfgBlob is the content for `kopia.blobcfg` blob which contains the blob
// management configuration options.
type BlobCfgBlob struct {

View File

@@ -22,7 +22,7 @@ func (bm *WriteManager) RecoverIndexFromPackBlob(ctx context.Context, packFile b
return nil, err
}
ndx, err := index.Open(localIndexBytes.Bytes().ToByteSlice(), nil, uint32(bm.crypter.Encryptor.Overhead()))
ndx, err := index.Open(localIndexBytes.Bytes().ToByteSlice(), nil, uint32(bm.format.Crypter().Encryptor.Overhead()))
if err != nil {
return nil, errors.Errorf("unable to open index in file %v", packFile)
}
@@ -171,7 +171,7 @@ func decodePostamble(payload []byte) *packContentPostamble {
}
func (sm *SharedManager) buildLocalIndex(pending index.Builder, output *gather.WriteBuffer) error {
if err := pending.Build(output, sm.indexVersion); err != nil {
if err := pending.Build(output, sm.format.WriteIndexVersion()); err != nil {
return errors.Wrap(err, "unable to build local index")
}
@@ -195,7 +195,7 @@ func (sm *SharedManager) appendPackFileIndexRecoveryData(pending index.Builder,
var encryptedLocalIndex gather.WriteBuffer
defer encryptedLocalIndex.Close()
if err := sm.crypter.Encryptor.Encrypt(localIndex.Bytes(), localIndexIV, &encryptedLocalIndex); err != nil {
if err := sm.format.Crypter().Encryptor.Encrypt(localIndex.Bytes(), localIndexIV, &encryptedLocalIndex); err != nil {
return errors.Wrap(err, "encryption error")
}

View File

@@ -331,7 +331,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
PackBlobID: pp.packBlobID,
PackOffset: uint32(pp.currentPackData.Length()),
TimestampSeconds: bm.contentWriteTime(previousWriteTime),
FormatVersion: byte(bm.writeFormatVersion),
FormatVersion: byte(bm.format.FormatVersion()),
OriginalLength: uint32(data.Length()),
}
@@ -345,7 +345,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
pp.currentPackItems[contentID] = info
shouldWrite := pp.currentPackData.Length() >= bm.maxPackSize
shouldWrite := pp.currentPackData.Length() >= bm.format.MaxPackBlobSize()
if shouldWrite {
// we're about to write to storage without holding a lock
// remove from pendingPacks so other goroutine tries to mess with this pending pack.
@@ -413,7 +413,7 @@ func (bm *WriteManager) verifyPackIndexBuilderLocked() {
bm.assertInvariant(cpi.GetPackBlobID() == "", "content can't be both deleted and have a pack content: %v", cpi.GetContentID())
} else {
bm.assertInvariant(cpi.GetPackBlobID() != "", "content that's not deleted must have a pack content: %+v", cpi)
bm.assertInvariant(cpi.GetFormatVersion() == byte(bm.writeFormatVersion), "content that's not deleted must have a valid format version: %+v", cpi)
bm.assertInvariant(cpi.GetFormatVersion() == byte(bm.format.FormatVersion()), "content that's not deleted must have a valid format version: %+v", cpi)
}
bm.assertInvariant(cpi.GetTimestampSeconds() != 0, "content has no timestamp: %v", cpi.GetContentID())
@@ -438,7 +438,7 @@ func (bm *WriteManager) writeIndexBlobs(ctx context.Context, dataShards []gather
defer span.End()
// nolint:wrapcheck
return bm.indexBlobManager.writeIndexBlobs(ctx, dataShards, sessionID)
return bm.indexBlobManager().writeIndexBlobs(ctx, dataShards, sessionID)
}
// +checklocksread:bm.indexesLock
@@ -461,7 +461,7 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error {
if len(bm.packIndexBuilder) > 0 {
_, span2 := tracer.Start(ctx, "BuildShards")
dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(bm.indexVersion, true, bm.indexShardSize)
dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(bm.format.WriteIndexVersion(), true, bm.format.IndexShardSize())
span2.End()
@@ -592,7 +592,7 @@ func removePendingPack(slice []*pendingPackInfo, pp *pendingPackInfo) []*pending
}
// ContentFormat returns formatting options.
func (bm *WriteManager) ContentFormat() FormattingOptions {
func (bm *WriteManager) ContentFormat() FormattingOptionsProvider {
return bm.format
}
@@ -743,7 +743,7 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr
return nil, errors.Wrap(err, "unable to read crypto bytes")
}
b.Append(bm.repositoryFormatBytes)
b.Append(bm.format.RepositoryFormatBytes())
// nolint:gosec
if err := writeRandomBytesToBuffer(b, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength); err != nil {
@@ -762,7 +762,7 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr
// SupportsContentCompression returns true if content manager supports content-compression.
func (bm *WriteManager) SupportsContentCompression() bool {
return bm.format.IndexVersion >= index.Version2
return bm.format.WriteIndexVersion() >= index.Version2
}
// WriteContent saves a given content of data to a pack group with a provided name and returns a contentID
@@ -938,7 +938,7 @@ func (o *ManagerOptions) CloneOrDefault() *ManagerOptions {
}
// NewManagerForTesting creates new content manager with given packing options and a formatter.
func NewManagerForTesting(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, options *ManagerOptions) (*WriteManager, error) {
func NewManagerForTesting(ctx context.Context, st blob.Storage, f FormattingOptionsProvider, caching *CachingOptions, options *ManagerOptions) (*WriteManager, error) {
options = options.CloneOrDefault()
if options.TimeNow == nil {
options.TimeNow = clock.Now

View File

@@ -38,7 +38,7 @@ func (sm *SharedManager) Refresh(ctx context.Context) error {
sm.log.Debugf("Refresh started")
sm.indexBlobManager.invalidate(ctx)
sm.indexBlobManager().invalidate(ctx)
timer := timetrack.StartTimer()
@@ -57,7 +57,7 @@ func (sm *SharedManager) CompactIndexes(ctx context.Context, opt CompactOptions)
sm.log.Debugf("CompactIndexes(%+v)", opt)
if err := sm.indexBlobManager.compact(ctx, opt); err != nil {
if err := sm.indexBlobManager().compact(ctx, opt); err != nil {
return errors.Wrap(err, "error performing compaction")
}

View File

@@ -30,14 +30,14 @@ func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes
// If the content is prefixed (which represents Kopia's own metadata as opposed to user data),
// and we're on V2 format or greater, enable internal compression even when not requested.
if contentID.HasPrefix() && comp == NoCompression && sm.format.IndexVersion >= index.Version2 {
if contentID.HasPrefix() && comp == NoCompression && sm.format.WriteIndexVersion() >= index.Version2 {
// 'zstd-fastest' has a good mix of being fast, low memory usage and high compression for JSON.
comp = compression.HeaderZstdFastest
}
// nolint:nestif
if comp != NoCompression {
if sm.format.IndexVersion < index.Version2 {
if sm.format.WriteIndexVersion() < index.Version2 {
return NoCompression, errors.Errorf("compression is not enabled for this repository")
}
@@ -62,7 +62,7 @@ func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes
}
}
if err := sm.crypter.Encryptor.Encrypt(data, iv, output); err != nil {
if err := sm.Crypter().Encryptor.Encrypt(data, iv, output); err != nil {
return NoCompression, errors.Wrap(err, "unable to encrypt")
}
@@ -181,7 +181,7 @@ func (sm *SharedManager) writePackFileNotLocked(ctx context.Context, packFile bl
func (sm *SharedManager) hashData(output []byte, data gather.Bytes) []byte {
// Hash the content and compute encryption key.
contentID := sm.crypter.HashFunction(output, data)
contentID := sm.format.Crypter().HashFunction(output, data)
sm.Stats.hashedContent(data.Length())
return contentID

View File

@@ -354,13 +354,13 @@ func (s *contentManagerSuite) TestContentManagerFailedToWritePack(t *testing.T)
ta := faketime.NewTimeAdvance(fakeTime, 0)
bm, err := NewManagerForTesting(testlogging.Context(t), st, &FormattingOptions{
bm, err := NewManagerForTesting(testlogging.Context(t), st, mustCreateFormatProvider(t, &FormattingOptions{
Hash: "HMAC-SHA256-128",
Encryption: "AES256-GCM-HMAC-SHA256",
MutableParameters: s.mutableParameters,
HMACSecret: []byte("foo"),
MasterKey: []byte("0123456789abcdef0123456789abcdef"),
}, nil, &ManagerOptions{TimeNow: ta.NowFunc()})
}), nil, &ManagerOptions{TimeNow: ta.NowFunc()})
if err != nil {
t.Fatalf("can't create bm: %v", err)
}
@@ -1877,11 +1877,11 @@ func (s *contentManagerSuite) verifyVersionCompat(t *testing.T, writeVersion For
data := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(data, nil, nil)
mgr := s.newTestContentManager(t, st)
mgr := s.newTestContentManagerWithTweaks(t, st, &contentManagerTestTweaks{
formatVersion: writeVersion,
})
defer mgr.Close(ctx)
mgr.writeFormatVersion = int32(writeVersion)
dataSet := map[ID][]byte{}
for i := 0; i < 3000000; i = (i + 1) * 2 {
@@ -2331,8 +2331,9 @@ type contentManagerTestTweaks struct {
CachingOptions
ManagerOptions
indexVersion int
maxPackSize int
indexVersion int
maxPackSize int
formatVersion FormatVersion
}
func (s *contentManagerSuite) newTestContentManagerWithTweaks(t *testing.T, st blob.Storage, tweaks *contentManagerTestTweaks) *WriteManager {
@@ -2357,13 +2358,17 @@ func (s *contentManagerSuite) newTestContentManagerWithTweaks(t *testing.T, st b
mp.MaxPackSize = mps
}
if tweaks.formatVersion != 0 {
mp.Version = tweaks.formatVersion
}
ctx := testlogging.Context(t)
fo := &FormattingOptions{
fo := mustCreateFormatProvider(t, &FormattingOptions{
Hash: "HMAC-SHA256",
Encryption: "AES256-GCM-HMAC-SHA256",
HMACSecret: hmacSecret,
MutableParameters: mp,
}
})
bm, err := NewManagerForTesting(ctx, st, fo, &tweaks.CachingOptions, &tweaks.ManagerOptions)
if err != nil {

View File

@@ -9,7 +9,7 @@
// Reader defines content read API.
type Reader interface {
SupportsContentCompression() bool
ContentFormat() FormattingOptions
ContentFormat() FormattingOptionsProvider
GetContent(ctx context.Context, id ID) ([]byte, error)
ContentInfo(ctx context.Context, id ID) (Info, error)
IterateContents(ctx context.Context, opts IterateOptions, callback IterateCallback) error

View File

@@ -12,10 +12,10 @@
)
type encryptedBlobMgr struct {
st blob.Storage
crypter *Crypter
indexBlobCache *cache.PersistentCache
log logging.Logger
st blob.Storage
crypterProvider CrypterProvider
indexBlobCache *cache.PersistentCache
log logging.Logger
}
func (m *encryptedBlobMgr) getEncryptedBlob(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error {
@@ -29,14 +29,14 @@ func (m *encryptedBlobMgr) getEncryptedBlob(ctx context.Context, blobID blob.ID,
return errors.Wrap(err, "getContent")
}
return m.crypter.DecryptBLOB(payload.Bytes(), blobID, output)
return m.crypterProvider.Crypter().DecryptBLOB(payload.Bytes(), blobID, output)
}
func (m *encryptedBlobMgr) encryptAndWriteBlob(ctx context.Context, data gather.Bytes, prefix blob.ID, sessionID SessionID) (blob.Metadata, error) {
var data2 gather.WriteBuffer
defer data2.Close()
blobID, err := m.crypter.EncryptBLOB(data, prefix, sessionID, &data2)
blobID, err := m.crypterProvider.Crypter().EncryptBLOB(data, prefix, sessionID, &data2)
if err != nil {
return blob.Metadata{}, errors.Wrap(err, "error encrypting")
}

View File

@@ -43,10 +43,10 @@ func TestEncryptedBlobManager(t *testing.T) {
}
ebm := encryptedBlobMgr{
st: fs,
crypter: cr,
indexBlobCache: nil,
log: logging.NullLogger,
st: fs,
crypterProvider: staticCrypterProvider{cr},
indexBlobCache: nil,
log: logging.NullLogger,
}
ctx := testlogging.Context(t)
@@ -85,3 +85,11 @@ func TestEncryptedBlobManager(t *testing.T) {
_, err = ebm.encryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1")
require.ErrorIs(t, err, someError2)
}
type staticCrypterProvider struct {
theCrypter *Crypter
}
func (p staticCrypterProvider) Crypter() *Crypter {
return p.theCrypter
}

View File

@@ -45,14 +45,20 @@ type cleanupEntry struct {
age time.Duration // not serialized, computed on load
}
// IndexFormattingOptions provides options for formatting index blobs.
type IndexFormattingOptions interface {
MaxIndexBlobSize() int64
WriteIndexVersion() int
IndexShardSize() int
}
type indexBlobManagerV0 struct {
st blob.Storage
enc *encryptedBlobMgr
timeNow func() time.Time
log logging.Logger
maxPackSize int
indexVersion int
indexShardSize int
st blob.Storage
enc *encryptedBlobMgr
timeNow func() time.Time
log logging.Logger
formattingOptions IndexFormattingOptions
}
func (m *indexBlobManagerV0) listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, time.Time, error) {
@@ -413,14 +419,14 @@ func (m *indexBlobManagerV0) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt C
var mediumSizedBlobCount int
for _, b := range indexBlobs {
if b.Length > int64(m.maxPackSize) && !opt.AllIndexes {
if b.Length > m.formattingOptions.MaxIndexBlobSize() && !opt.AllIndexes {
continue
}
nonCompactedBlobs = append(nonCompactedBlobs, b)
totalSizeNonCompactedBlobs += b.Length
if b.Length < int64(m.maxPackSize/verySmallContentFraction) {
if b.Length < m.formattingOptions.MaxIndexBlobSize()/verySmallContentFraction {
verySmallBlobs = append(verySmallBlobs, b)
totalSizeVerySmallBlobs += b.Length
} else {
@@ -468,7 +474,7 @@ func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs [
// we must do it after all input blobs have been merged, otherwise we may resurrect contents.
m.dropContentsFromBuilder(bld, opt)
dataShards, cleanupShards, err := bld.BuildShards(m.indexVersion, false, m.indexShardSize)
dataShards, cleanupShards, err := bld.BuildShards(m.formattingOptions.WriteIndexVersion(), false, m.formattingOptions.IndexShardSize())
if err != nil {
return errors.Wrap(err, "unable to build an index")
}
@@ -520,7 +526,7 @@ func addIndexBlobsToBuilder(ctx context.Context, enc *encryptedBlobMgr, bld inde
return errors.Wrapf(err, "error getting index %q", indexBlobID)
}
ndx, err := index.Open(data.ToByteSlice(), nil, uint32(enc.crypter.Encryptor.Overhead()))
ndx, err := index.Open(data.ToByteSlice(), nil, uint32(enc.crypterProvider.Crypter().Encryptor.Overhead()))
if err != nil {
return errors.Wrapf(err, "unable to open index blob %q", indexBlobID)
}

View File

@@ -794,16 +794,14 @@ func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow f
enc: &encryptedBlobMgr{
st: st,
indexBlobCache: nil,
crypter: &Crypter{
crypterProvider: staticCrypterProvider{&Crypter{
HashFunction: hf,
Encryptor: enc,
},
}},
log: log,
},
timeNow: localTimeNow,
maxPackSize: 20 << 20,
indexVersion: 1,
log: log,
timeNow: localTimeNow,
log: log,
}
return m

View File

@@ -16,14 +16,12 @@
)
type indexBlobManagerV1 struct {
st blob.Storage
enc *encryptedBlobMgr
epochMgr *epoch.Manager
timeNow func() time.Time
log logging.Logger
maxPackSize int
indexVersion int
indexShardSize int
st blob.Storage
enc *encryptedBlobMgr
epochMgr *epoch.Manager
timeNow func() time.Time
log logging.Logger
formattingOptions IndexFormattingOptions
}
func (m *indexBlobManagerV1) listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, time.Time, error) {
@@ -70,7 +68,7 @@ func (m *indexBlobManagerV1) compactEpoch(ctx context.Context, blobIDs []blob.ID
}
}
dataShards, cleanupShards, err := tmpbld.BuildShards(m.indexVersion, true, m.indexShardSize)
dataShards, cleanupShards, err := tmpbld.BuildShards(m.formattingOptions.WriteIndexVersion(), true, m.formattingOptions.IndexShardSize())
if err != nil {
return errors.Wrap(err, "unable to build index dataShards")
}
@@ -91,7 +89,7 @@ func (m *indexBlobManagerV1) compactEpoch(ctx context.Context, blobIDs []blob.ID
for _, data := range dataShards {
data2.Reset()
blobID, err := m.enc.crypter.EncryptBLOB(data, outputPrefix, SessionID(sessionID), &data2)
blobID, err := m.enc.crypterProvider.Crypter().EncryptBLOB(data, outputPrefix, SessionID(sessionID), &data2)
if err != nil {
return errors.Wrap(err, "error encrypting")
}
@@ -115,7 +113,7 @@ func (m *indexBlobManagerV1) writeIndexBlobs(ctx context.Context, dataShards []g
data2 := gather.NewWriteBuffer()
defer data2.Close() //nolint:gocritic
unprefixedBlobID, err := m.enc.crypter.EncryptBLOB(data, "", sessionID, data2)
unprefixedBlobID, err := m.enc.crypterProvider.Crypter().EncryptBLOB(data, "", sessionID, data2)
if err != nil {
return nil, errors.Wrap(err, "error encrypting")
}
@@ -131,8 +129,6 @@ func (m *indexBlobManagerV1) writeIndexBlobs(ctx context.Context, dataShards []g
// PrepareUpgradeToIndexBlobManagerV1 prepares the repository for migrating to IndexBlobManagerV1.
func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context, params epoch.Parameters) error {
sm.indexBlobManagerV1.epochMgr.Params = params
ibl, _, err := sm.indexBlobManagerV0.listActiveIndexBlobs(ctx)
if err != nil {
return errors.Wrap(err, "error listing active index blobs")
@@ -148,7 +144,5 @@ func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context,
return errors.Wrap(err, "unable to generate initial epoch")
}
sm.indexBlobManager = sm.indexBlobManagerV1
return nil
}

View File

@@ -33,7 +33,7 @@ type internalLogManager struct {
ctx context.Context // nolint:containedctx
st blob.Storage
bc *Crypter
bc CrypterProvider
wg sync.WaitGroup
timeFunc func() time.Time
flushThreshold int
@@ -48,7 +48,7 @@ func (m *internalLogManager) encryptAndWriteLogBlob(prefix blob.ID, data gather.
encrypted := gather.NewWriteBuffer()
// Close happens in a goroutine
blobID, err := m.bc.EncryptBLOB(data, prefix, "", encrypted)
blobID, err := m.bc.Crypter().EncryptBLOB(data, prefix, "", encrypted)
if err != nil {
encrypted.Close()
@@ -208,7 +208,7 @@ func (l *internalLogger) Sync() error {
}
// newInternalLogManager creates a new blobLogManager that will emit logs as repository blobs with a given prefix.
func newInternalLogManager(ctx context.Context, st blob.Storage, bc *Crypter) *internalLogManager {
func newInternalLogManager(ctx context.Context, st blob.Storage, bc CrypterProvider) *internalLogManager {
return &internalLogManager{
ctx: ctx,
st: st,

View File

@@ -111,7 +111,7 @@ func (bm *WriteManager) writeSessionMarkerLocked(ctx context.Context) error {
var encrypted gather.WriteBuffer
defer encrypted.Close()
sessionBlobID, err := bm.crypter.EncryptBLOB(gather.FromSlice(js), BlobIDPrefixSession, bm.currentSessionInfo.ID, &encrypted)
sessionBlobID, err := bm.format.Crypter().EncryptBLOB(gather.FromSlice(js), BlobIDPrefixSession, bm.currentSessionInfo.ID, &encrypted)
if err != nil {
return errors.Wrap(err, "unable to encrypt session marker")
}
@@ -178,7 +178,7 @@ func (bm *WriteManager) ListActiveSessions(ctx context.Context) (map[SessionID]*
return nil, errors.Wrapf(err, "error loading session: %v", b.BlobID)
}
err = bm.crypter.DecryptBLOB(payload.Bytes(), b.BlobID, &decrypted)
err = bm.format.Crypter().DecryptBLOB(payload.Bytes(), b.BlobID, &decrypted)
if err != nil {
return nil, errors.Wrapf(err, "error decrypting session: %v", b.BlobID)
}

View File

@@ -136,7 +136,7 @@ func getContentToRewrite(ctx context.Context, rep repo.DirectRepository, opt *Re
// add all content IDs from short packs
if opt.ShortPacks {
threshold := int64(rep.ContentReader().ContentFormat().MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd
threshold := int64(rep.ContentReader().ContentFormat().MaxPackBlobSize() * shortPackThresholdPercent / 100) //nolint:gomnd
findContentInShortPacks(ctx, rep, ch, threshold, opt)
}

View File

@@ -142,20 +142,19 @@ func TestManifestInitCorruptedBlock(t *testing.T) {
data := blobtesting.DataMap{}
st := blobtesting.NewMapStorage(data, nil, nil)
f := &content.FormattingOptions{
fop, err := content.NewFormattingOptionsProvider(&content.FormattingOptions{
Hash: hashing.DefaultAlgorithm,
Encryption: encryption.DefaultAlgorithm,
MutableParameters: content.MutableParameters{
Version: 1,
MaxPackSize: 100000,
},
}
}, nil)
require.NoError(t, err)
// write some data to storage
bm, err := content.NewManagerForTesting(ctx, st, f, nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
bm, err := content.NewManagerForTesting(ctx, st, fop, nil, nil)
require.NoError(t, err)
bm0 := bm
@@ -182,10 +181,8 @@ func TestManifestInitCorruptedBlock(t *testing.T) {
}
// make a new content manager based on corrupted data.
bm, err = content.NewManagerForTesting(ctx, st, f, nil, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
bm, err = content.NewManagerForTesting(ctx, st, fop, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { bm.Close(ctx) })
@@ -305,24 +302,24 @@ func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.Da
st := blobtesting.NewMapStorage(data, nil, nil)
bm, err := content.NewManagerForTesting(ctx, st, &content.FormattingOptions{
fop, err := content.NewFormattingOptionsProvider(&content.FormattingOptions{
Hash: hashing.DefaultAlgorithm,
Encryption: encryption.DefaultAlgorithm,
MutableParameters: content.MutableParameters{
Version: 1,
MaxPackSize: 100000,
},
}, nil, nil)
if err != nil {
t.Fatalf("can't create content manager: %v", err)
}
}, nil)
require.NoError(t, err)
bm, err := content.NewManagerForTesting(ctx, st, fop, nil, nil)
require.NoError(t, err)
t.Cleanup(func() { bm.Close(ctx) })
mm, err := NewManager(ctx, bm, ManagerOptions{})
if err != nil {
t.Fatalf("can't create manifest manager: %v", err)
}
require.NoError(t, err)
return mm
}

View File

@@ -333,7 +333,12 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw
st = upgradeLockMonitor(options.UpgradeOwnerID, st, password, cacheOpts, lc.FormatBlobCacheDuration,
ufb.cacheMTime, cmOpts.TimeNow)
scm, err := content.NewSharedManager(ctx, st, fo, cacheOpts, cmOpts)
fop, err := content.NewFormattingOptionsProvider(fo, cmOpts.RepositoryFormatBytes)
if err != nil {
return nil, errors.Wrap(err, "unable to create format options provider")
}
scm, err := content.NewSharedManager(ctx, st, fop, cacheOpts, cmOpts)
if err != nil {
return nil, errors.Wrap(err, "unable to create shared content manager")
}

View File

@@ -110,8 +110,8 @@ type directRepository struct {
// DeriveKey derives encryption key of the provided length from the master key.
func (r *directRepository) DeriveKey(purpose []byte, keyLength int) []byte {
if r.cmgr.ContentFormat().EnablePasswordChange {
return deriveKeyFromMasterKey(r.cmgr.ContentFormat().MasterKey, r.uniqueID, purpose, keyLength)
if r.cmgr.ContentFormat().SupportsPasswordChange() {
return deriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.uniqueID, purpose, keyLength)
}
// version of kopia <v0.9 had a bug where certain keys were derived directly from

View File

@@ -191,7 +191,7 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
opts.UpgradeOwnerID = "another-upgrade-owner"
})
require.Equal(t, content.FormatVersion3,
env.RepositoryWriter.ContentManager().ContentFormat().MutableParameters.Version)
env.RepositoryWriter.ContentManager().ContentFormat().FormatVersion())
require.NoError(t, env.RepositoryWriter.RollbackUpgrade(ctx))
@@ -211,7 +211,7 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
// verify that we are back to the original version where we started from
require.Equal(t, content.FormatVersion1,
env.RepositoryWriter.ContentManager().ContentFormat().MutableParameters.Version)
env.RepositoryWriter.ContentManager().ContentFormat().FormatVersion())
}
func TestFormatUpgradeFailureToBackupFormatBlobOnLock(t *testing.T) {

View File

@@ -9,6 +9,8 @@
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/blobtesting"
"github.com/kopia/kopia/internal/clock"
"github.com/kopia/kopia/internal/gather"
@@ -45,16 +47,19 @@ func TestStressBlockManager(t *testing.T) {
func stressTestWithStorage(t *testing.T, st blob.Storage, duration time.Duration) {
ctx := testlogging.Context(t)
fop, err := content.NewFormattingOptionsProvider(&content.FormattingOptions{
Hash: "HMAC-SHA256-128",
Encryption: encryption.DefaultAlgorithm,
MutableParameters: content.MutableParameters{
Version: 1,
MaxPackSize: 20000000,
},
MasterKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
}, nil)
require.NoError(t, err)
openMgr := func() (*content.WriteManager, error) {
return content.NewManagerForTesting(ctx, st, &content.FormattingOptions{
Hash: "HMAC-SHA256-128",
Encryption: encryption.DefaultAlgorithm,
MutableParameters: content.MutableParameters{
Version: 1,
MaxPackSize: 20000000,
},
MasterKey: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15},
}, nil, nil)
return content.NewManagerForTesting(ctx, st, fop, nil, nil)
}
seed0 := clock.Now().Nanosecond()