From 56f3046d8ac9180e389d9b1cd89de7ffdc2efb83 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 28 Jul 2022 17:27:04 -0700 Subject: [PATCH] refactor(repository): introduce interface for reading FormattingOptions (#2235) Instead of passing static content.FormattingOptions (and caching it) we now introduce an interface to provide its values. This will allow the values to dynamically change at runtime in the future to support cases like live migration. --- cli/command_repository_set_parameters.go | 2 +- cli/command_repository_status.go | 26 ++-- cli/command_repository_upgrade.go | 6 +- internal/epoch/epoch_manager.go | 93 +++++++++++-- internal/epoch/epoch_manager_test.go | 6 +- internal/server/api_repo.go | 10 +- internal/server/grpc_session.go | 4 +- repo/content/committed_read_manager.go | 117 ++++++---------- repo/content/content_formatter_test.go | 13 +- repo/content/content_formatting_options.go | 155 +++++++++++++++++++++ repo/content/content_index_recovery.go | 6 +- repo/content/content_manager.go | 18 +-- repo/content/content_manager_indexes.go | 4 +- repo/content/content_manager_lock_free.go | 8 +- repo/content/content_manager_test.go | 23 +-- repo/content/content_reader.go | 2 +- repo/content/encrypted_blob_mgr.go | 12 +- repo/content/encrypted_blob_mgr_test.go | 16 ++- repo/content/index_blob_manager_v0.go | 28 ++-- repo/content/index_blob_manager_v0_test.go | 10 +- repo/content/index_blob_manager_v1.go | 24 ++-- repo/content/internal_logger.go | 6 +- repo/content/sessions.go | 4 +- repo/maintenance/content_rewrite.go | 2 +- repo/manifest/manifest_manager_test.go | 33 ++--- repo/open.go | 7 +- repo/repository.go | 4 +- repo/upgrade_lock_test.go | 4 +- tests/stress_test/stress_test.go | 23 +-- 29 files changed, 445 insertions(+), 221 deletions(-) diff --git a/cli/command_repository_set_parameters.go b/cli/command_repository_set_parameters.go index 96aee8972..435eb4aeb 100644 --- a/cli/command_repository_set_parameters.go +++ b/cli/command_repository_set_parameters.go @@ -113,7 +113,7 @@ func (c *commandRepositorySetParameters) setRetentionModeParameter(ctx context.C func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.DirectRepositoryWriter) error { var anyChange bool - mp := rep.ContentReader().ContentFormat().MutableParameters + mp := rep.ContentReader().ContentFormat().GetMutableParameters() blobcfg := rep.BlobCfg() upgradeToEpochManager := false diff --git a/cli/command_repository_status.go b/cli/command_repository_status.go index 1f54f1fbf..ded7ec0c5 100644 --- a/cli/command_repository_status.go +++ b/cli/command_repository_status.go @@ -63,8 +63,8 @@ func (c *commandRepositoryStatus) outputJSON(ctx context.Context, r repo.Reposit s.UniqueIDHex = hex.EncodeToString(dr.UniqueID()) s.ObjectFormat = dr.ObjectFormat() s.BlobRetention = dr.BlobCfg() - s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) // nolint:forcetypeassert - s.ContentFormat = scrubber.ScrubSensitiveData(reflect.ValueOf(dr.ContentReader().ContentFormat())).Interface().(content.FormattingOptions) // nolint:forcetypeassert + s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) // nolint:forcetypeassert + s.ContentFormat = scrubber.ScrubSensitiveData(reflect.ValueOf(dr.ContentReader().ContentFormat().Struct())).Interface().(content.FormattingOptions) // nolint:forcetypeassert switch cp, err := dr.BlobVolume().GetCapacity(ctx); { case err == nil: @@ -170,17 +170,19 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) c.out.printStdout("Storage config: %v\n", string(cjson)) } + contentFormat := dr.ContentReader().ContentFormat() + c.out.printStdout("\n") c.out.printStdout("Unique ID: %x\n", dr.UniqueID()) - c.out.printStdout("Hash: %v\n", dr.ContentReader().ContentFormat().Hash) - c.out.printStdout("Encryption: %v\n", dr.ContentReader().ContentFormat().Encryption) + c.out.printStdout("Hash: %v\n", contentFormat.GetHashFunction()) + c.out.printStdout("Encryption: %v\n", contentFormat.GetEncryptionAlgorithm()) c.out.printStdout("Splitter: %v\n", dr.ObjectFormat().Splitter) - c.out.printStdout("Format version: %v\n", dr.ContentReader().ContentFormat().Version) + c.out.printStdout("Format version: %v\n", contentFormat.FormatVersion()) c.out.printStdout("Content compression: %v\n", dr.ContentReader().SupportsContentCompression()) - c.out.printStdout("Password changes: %v\n", dr.ContentReader().ContentFormat().EnablePasswordChange) + c.out.printStdout("Password changes: %v\n", contentFormat.SupportsPasswordChange()) - c.out.printStdout("Max pack length: %v\n", units.BytesStringBase2(int64(dr.ContentReader().ContentFormat().MaxPackSize))) - c.out.printStdout("Index Format: v%v\n", dr.ContentReader().ContentFormat().IndexVersion) + c.out.printStdout("Max pack length: %v\n", units.BytesStringBase2(int64(contentFormat.MaxPackBlobSize()))) + c.out.printStdout("Index Format: v%v\n", contentFormat.WriteIndexVersion()) if emgr, ok := dr.ContentReader().EpochManager(); ok { c.out.printStdout("\n") @@ -192,10 +194,10 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) } c.out.printStdout("\n") - c.out.printStdout("Epoch refresh frequency: %v\n", emgr.Params.EpochRefreshFrequency) - c.out.printStdout("Epoch advance on: %v blobs or %v, minimum %v\n", emgr.Params.EpochAdvanceOnCountThreshold, units.BytesStringBase2(emgr.Params.EpochAdvanceOnTotalSizeBytesThreshold), emgr.Params.MinEpochDuration) - c.out.printStdout("Epoch cleanup margin: %v\n", emgr.Params.CleanupSafetyMargin) - c.out.printStdout("Epoch checkpoint every: %v epochs\n", emgr.Params.FullCheckpointFrequency) + c.out.printStdout("Epoch refresh frequency: %v\n", contentFormat.GetEpochRefreshFrequency()) + c.out.printStdout("Epoch advance on: %v blobs or %v, minimum %v\n", contentFormat.GetEpochAdvanceOnCountThreshold(), units.BytesStringBase2(contentFormat.GetEpochAdvanceOnTotalSizeBytesThreshold()), contentFormat.GetMinEpochDuration()) + c.out.printStdout("Epoch cleanup margin: %v\n", contentFormat.GetEpochCleanupSafetyMargin()) + c.out.printStdout("Epoch checkpoint every: %v epochs\n", contentFormat.GetEpochFullCheckpointFrequency()) } else { c.out.printStdout("Epoch Manager: disabled\n") } diff --git a/cli/command_repository_upgrade.go b/cli/command_repository_upgrade.go index 2c6ce52f2..49c7779b6 100644 --- a/cli/command_repository_upgrade.go +++ b/cli/command_repository_upgrade.go @@ -122,7 +122,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D } now := rep.Time() - mp := rep.ContentReader().ContentFormat().MutableParameters + mp := rep.ContentReader().ContentFormat().GetMutableParameters() openOpts := c.svc.optionsFromFlags(ctx) l := &repo.UpgradeLockIntent{ OwnerID: openOpts.UpgradeOwnerID, @@ -163,7 +163,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D // skipped until the lock is fully established. func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.DirectRepositoryWriter) error { cf := rep.ContentReader().ContentFormat() - if cf.MutableParameters.EpochParameters.Enabled { + if cf.GetEpochManagerEnabled() { log(ctx).Infof("Repository indices have already been migrated to the epoch format, no need to drain other clients") l, err := rep.GetUpgradeLockIntent(ctx) @@ -250,7 +250,7 @@ func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo // repository. This phase runs after the lock has been acquired in one of the // prior phases. func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectRepositoryWriter) error { - mp := rep.ContentReader().ContentFormat().MutableParameters + mp := rep.ContentReader().ContentFormat().GetMutableParameters() if mp.EpochParameters.Enabled { // nothing to upgrade on format, so let the next action commit the upgraded format blob return nil diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index 79407a187..54a2cfcef 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -29,6 +29,33 @@ maxRefreshAttemptSleepExponent = 1.5 ) +// ParametersProvider provides epoch manager parameters. +type ParametersProvider interface { + // whether epoch manager is enabled, must be true. + GetEpochManagerEnabled() bool + + // how frequently each client will list blobs to determine the current epoch. + GetEpochRefreshFrequency() time.Duration + + // number of epochs between full checkpoints. + GetEpochFullCheckpointFrequency() int + + // do not delete uncompacted blobs if the corresponding compacted blob age is less than this. + GetEpochCleanupSafetyMargin() time.Duration + + // minimum duration of an epoch + GetMinEpochDuration() time.Duration + + // advance epoch if number of files exceeds this + GetEpochAdvanceOnCountThreshold() int + + // advance epoch if total size of files exceeds this. + GetEpochAdvanceOnTotalSizeBytesThreshold() int64 + + // number of blobs to delete in parallel during cleanup + GetEpochDeleteParallelism() int +} + // ErrVerySlowIndexWrite is returned by WriteIndex if a write takes more than 2 epochs (usually >48h). // This is theoretically possible with laptops going to sleep, etc. var ErrVerySlowIndexWrite = errors.Errorf("extremely slow index write - index write took more than two epochs") @@ -60,6 +87,46 @@ type Parameters struct { DeleteParallelism int } +// GetEpochManagerEnabled returns whether epoch manager is enabled, must be true. +func (p *Parameters) GetEpochManagerEnabled() bool { + return p.Enabled +} + +// GetEpochRefreshFrequency determines how frequently each client will list blobs to determine the current epoch. +func (p *Parameters) GetEpochRefreshFrequency() time.Duration { + return p.EpochRefreshFrequency +} + +// GetEpochFullCheckpointFrequency returns the number of epochs between full checkpoints. +func (p *Parameters) GetEpochFullCheckpointFrequency() int { + return p.FullCheckpointFrequency +} + +// GetEpochCleanupSafetyMargin returns safety margin to prevent uncompacted blobs from being deleted if the corresponding compacted blob age is less than this. +func (p *Parameters) GetEpochCleanupSafetyMargin() time.Duration { + return p.CleanupSafetyMargin +} + +// GetMinEpochDuration returns the minimum duration of an epoch. +func (p *Parameters) GetMinEpochDuration() time.Duration { + return p.MinEpochDuration +} + +// GetEpochAdvanceOnCountThreshold returns the number of files above which epoch should be advanced. +func (p *Parameters) GetEpochAdvanceOnCountThreshold() int { + return p.EpochAdvanceOnCountThreshold +} + +// GetEpochAdvanceOnTotalSizeBytesThreshold returns the total size of files above which the epoch should be advanced. +func (p *Parameters) GetEpochAdvanceOnTotalSizeBytesThreshold() int64 { + return p.EpochAdvanceOnTotalSizeBytesThreshold +} + +// GetEpochDeleteParallelism returns the number of blobs to delete in parallel during cleanup. +func (p *Parameters) GetEpochDeleteParallelism() int { + return p.DeleteParallelism +} + // Validate validates epoch parameters. // nolint:gomnd func (p *Parameters) Validate() error { @@ -129,7 +196,7 @@ func (cs *CurrentSnapshot) isSettledEpochNumber(epoch int) bool { // Manager manages repository epochs. type Manager struct { - Params Parameters + Parameters ParametersProvider st blob.Storage compact CompactionFunc @@ -264,7 +331,7 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error // only delete blobs if a suitable replacement exists and has been written sufficiently // long ago. we don't want to delete blobs that are created too recently, because other clients // may have not observed them yet. - maxReplacementTime := maxTime.Add(-e.Params.CleanupSafetyMargin) + maxReplacementTime := maxTime.Add(-e.Parameters.GetEpochCleanupSafetyMargin()) eg.Go(func() error { return e.cleanupEpochMarkers(ctx, cs) @@ -289,7 +356,7 @@ func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) e } } - return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism), "error deleting index blob marker") + return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()), "error deleting index blob marker") } func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, maxReplacementTime time.Time) error { @@ -310,7 +377,7 @@ func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, max } } - return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism), "error deleting watermark blobs") + return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()), "error deleting watermark blobs") } // CleanupSupersededIndexes cleans up the indexes which have been superseded by compacted ones. @@ -331,7 +398,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error { // only delete blobs if a suitable replacement exists and has been written sufficiently // long ago. we don't want to delete blobs that are created too recently, because other clients // may have not observed them yet. - maxReplacementTime := maxTime.Add(-e.Params.CleanupSafetyMargin) + maxReplacementTime := maxTime.Add(-e.Parameters.GetEpochCleanupSafetyMargin()) e.log.Debugw("Cleaning up superseded index blobs...", "maxReplacementTime", maxReplacementTime) @@ -353,7 +420,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error { } } - if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism); err != nil { + if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()); err != nil { return errors.Wrap(err, "unable to delete uncompacted blobs") } @@ -378,7 +445,7 @@ func (e *Manager) refreshLocked(ctx context.Context) error { nextDelayTime := initiaRefreshAttemptSleep - if !e.Params.Enabled { + if !e.Parameters.GetEpochManagerEnabled() { return errors.Errorf("epoch manager not enabled") } @@ -497,7 +564,7 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs firstNonRangeCompacted = cs.LongestRangeCheckpointSets[len(cs.LongestRangeCheckpointSets)-1].MaxEpoch + 1 } - if latestSettled-firstNonRangeCompacted < e.Params.FullCheckpointFrequency { + if latestSettled-firstNonRangeCompacted < e.Parameters.GetEpochFullCheckpointFrequency() { e.log.Debugf("not generating range checkpoint") return @@ -575,7 +642,7 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { EpochStartTime: map[int]time.Time{}, UncompactedEpochSets: map[int][]blob.Metadata{}, SingleEpochCompactionSets: map[int][]blob.Metadata{}, - ValidUntil: e.timeFunc().Add(e.Params.EpochRefreshFrequency), + ValidUntil: e.timeFunc().Add(e.Parameters.GetEpochRefreshFrequency()), } e.log.Debugf("refreshAttemptLocked") @@ -613,7 +680,7 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { len(ues[cs.WriteEpoch+1]), cs.ValidUntil.Format(time.RFC3339Nano)) - if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], e.Params.MinEpochDuration, e.Params.EpochAdvanceOnCountThreshold, e.Params.EpochAdvanceOnTotalSizeBytesThreshold) { + if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], e.Parameters.GetMinEpochDuration(), e.Parameters.GetEpochAdvanceOnCountThreshold(), e.Parameters.GetEpochAdvanceOnTotalSizeBytesThreshold()) { if err := e.advanceEpoch(ctx, cs); err != nil { return errors.Wrap(err, "error advancing epoch") } @@ -700,7 +767,7 @@ func (e *Manager) WriteIndex(ctx context.Context, dataShards map[blob.ID]blob.By for { // make sure we have at least 75% of remaining time // nolint:gomnd - cs, err := e.committedState(ctx, 3*e.Params.EpochRefreshFrequency/4) + cs, err := e.committedState(ctx, 3*e.Parameters.GetEpochRefreshFrequency()/4) if err != nil { return nil, errors.Wrap(err, "error getting committed state") } @@ -924,13 +991,13 @@ func rangeCheckpointBlobPrefix(epoch1, epoch2 int) blob.ID { } // NewManager creates new epoch manager. -func NewManager(st blob.Storage, params Parameters, compactor CompactionFunc, log logging.Logger, timeNow func() time.Time) *Manager { +func NewManager(st blob.Storage, paramProvider ParametersProvider, compactor CompactionFunc, log logging.Logger, timeNow func() time.Time) *Manager { return &Manager{ st: st, log: log, compact: compactor, timeFunc: timeNow, - Params: params, + Parameters: paramProvider, getCompleteIndexSetTooSlow: new(int32), committedStateRefreshTooSlow: new(int32), writeIndexTooSlow: new(int32), diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index fb21a00f7..576d8cd1b 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -92,7 +92,7 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv { st = fs st = logging.NewWrapper(st, testlogging.NewTestLogger(t), "[STORAGE] ") te := &epochManagerTestEnv{unloggedst: unloggedst, st: st, ft: ft} - m := NewManager(te.st, Parameters{ + m := NewManager(te.st, &Parameters{ Enabled: true, EpochRefreshFrequency: 20 * time.Minute, FullCheckpointFrequency: 7, @@ -121,7 +121,7 @@ func (te *epochManagerTestEnv) another() *epochManagerTestEnv { faultyStorage: te.faultyStorage, } - te2.mgr = NewManager(te2.st, te.mgr.Params, te2.compact, te.mgr.log, te.mgr.timeFunc) + te2.mgr = NewManager(te2.st, te.mgr.Parameters, te2.compact, te.mgr.log, te.mgr.timeFunc) return te2 } @@ -577,7 +577,7 @@ func verifySequentialWrites(t *testing.T, te *epochManagerTestEnv) { func TestIndexEpochManager_Disabled(t *testing.T) { te := newTestEnv(t) - te.mgr.Params.Enabled = false + (te.mgr.Parameters).(*Parameters).Enabled = false _, err := te.mgr.Current(testlogging.Context(t)) require.Error(t, err) diff --git a/internal/server/api_repo.go b/internal/server/api_repo.go index 806658917..42475dcdc 100644 --- a/internal/server/api_repo.go +++ b/internal/server/api_repo.go @@ -34,8 +34,8 @@ func handleRepoParameters(ctx context.Context, rc requestContext) (interface{}, } rp := &remoterepoapi.Parameters{ - HashFunction: dr.ContentReader().ContentFormat().Hash, - HMACSecret: dr.ContentReader().ContentFormat().HMACSecret, + HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(), + HMACSecret: dr.ContentReader().ContentFormat().GetHmacSecret(), Format: dr.ObjectFormat(), SupportsContentCompression: dr.ContentReader().SupportsContentCompression(), } @@ -56,9 +56,9 @@ func handleRepoStatus(ctx context.Context, rc requestContext) (interface{}, *api return &serverapi.StatusResponse{ Connected: true, ConfigFile: dr.ConfigFilename(), - Hash: dr.ContentReader().ContentFormat().Hash, - Encryption: dr.ContentReader().ContentFormat().Encryption, - MaxPackSize: dr.ContentReader().ContentFormat().MaxPackSize, + Hash: dr.ContentReader().ContentFormat().GetHashFunction(), + Encryption: dr.ContentReader().ContentFormat().GetEncryptionAlgorithm(), + MaxPackSize: dr.ContentReader().ContentFormat().MaxPackBlobSize(), Splitter: dr.ObjectFormat().Splitter, Storage: dr.BlobReader().ConnectionInfo().Type, ClientOptions: dr.ClientOptions(), diff --git a/internal/server/grpc_session.go b/internal/server/grpc_session.go index 1e334a09b..2bab85e65 100644 --- a/internal/server/grpc_session.go +++ b/internal/server/grpc_session.go @@ -459,8 +459,8 @@ func (s *Server) handleInitialSessionHandshake(srv grpcapi.KopiaRepository_Sessi Response: &grpcapi.SessionResponse_InitializeSession{ InitializeSession: &grpcapi.InitializeSessionResponse{ Parameters: &grpcapi.RepositoryParameters{ - HashFunction: dr.ContentReader().ContentFormat().Hash, - HmacSecret: dr.ContentReader().ContentFormat().HMACSecret, + HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(), + HmacSecret: dr.ContentReader().ContentFormat().GetHmacSecret(), Splitter: dr.ObjectFormat().Splitter, SupportsContentCompression: dr.ContentReader().SupportsContentCompression(), }, diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 02adfa56c..188d776fa 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -21,7 +21,6 @@ "github.com/kopia/kopia/repo/blob/filesystem" "github.com/kopia/kopia/repo/blob/sharded" "github.com/kopia/kopia/repo/compression" - "github.com/kopia/kopia/repo/content/index" "github.com/kopia/kopia/repo/hashing" "github.com/kopia/kopia/repo/logging" ) @@ -70,6 +69,11 @@ type indexBlobManager interface { invalidate(ctx context.Context) } +// CrypterProvider returns the crypter to use for encrypting contents and blobs. +type CrypterProvider interface { + Crypter() *Crypter +} + // SharedManager is responsible for read-only access to committed data. type SharedManager struct { // +checkatomic @@ -80,7 +84,6 @@ type SharedManager struct { Stats *Stats st blob.Storage - indexBlobManager indexBlobManager // points at either indexBlobManagerV0 or indexBlobManagerV1 indexBlobManagerV0 *indexBlobManagerV0 indexBlobManagerV1 *indexBlobManagerV1 @@ -88,7 +91,6 @@ type SharedManager struct { metadataCache cache.ContentCache indexBlobCache *cache.PersistentCache committedContents *committedContentIndex - crypter *Crypter enc *encryptedBlobMgr timeNow func() time.Time @@ -101,16 +103,12 @@ type SharedManager struct { // +checklocks:indexesLock refreshIndexesAfter time.Time - format FormattingOptions + format FormattingOptionsProvider + checkInvariantsOnUnlock bool - writeFormatVersion int32 // format version to write - maxPackSize int minPreambleLength int maxPreambleLength int paddingUnit int - repositoryFormatBytes []byte - indexVersion int - indexShardSize int // logger where logs should be written log logging.Logger @@ -123,7 +121,7 @@ type SharedManager struct { // Crypter returns the crypter. func (sm *SharedManager) Crypter() *Crypter { - return sm.crypter + return sm.format.Crypter() } func (sm *SharedManager) readPackFileLocalIndex(ctx context.Context, packFile blob.ID, packFileLength int64, output *gather.WriteBuffer) error { @@ -198,13 +196,13 @@ func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error { } if i > 0 { - sm.indexBlobManager.flushCache(ctx) + sm.indexBlobManager().flushCache(ctx) sm.log.Debugf("encountered NOT_FOUND when loading, sleeping %v before retrying #%v", nextSleepTime, i) time.Sleep(nextSleepTime) nextSleepTime *= 2 } - indexBlobs, ignoreDeletedBefore, err := sm.indexBlobManager.listActiveIndexBlobs(ctx) + indexBlobs, ignoreDeletedBefore, err := sm.indexBlobManager().listActiveIndexBlobs(ctx) if err != nil { return errors.Wrap(err, "error listing index blobs") } @@ -246,6 +244,14 @@ func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache { return sm.contentCache } +func (sm *SharedManager) indexBlobManager() indexBlobManager { + if sm.format.GetEpochManagerEnabled() { + return sm.indexBlobManagerV1 + } + + return sm.indexBlobManagerV0 +} + func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info, output *gather.WriteBuffer) error { sm.Stats.readContent(payload.Length()) @@ -285,7 +291,7 @@ func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info, } func (sm *SharedManager) decryptAndVerify(encrypted gather.Bytes, iv []byte, output *gather.WriteBuffer) error { - if err := sm.crypter.Encryptor.Decrypt(encrypted, iv, output); err != nil { + if err := sm.format.Crypter().Encryptor.Decrypt(encrypted, iv, output); err != nil { sm.Stats.foundInvalidContent() return errors.Wrap(err, "decrypt") } @@ -316,7 +322,7 @@ func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) ( return result, nil } - blobs, _, err := sm.indexBlobManager.listActiveIndexBlobs(ctx) + blobs, _, err := sm.indexBlobManager().listActiveIndexBlobs(ctx) // nolint:wrapcheck return blobs, err @@ -429,54 +435,49 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca } sm.enc = &encryptedBlobMgr{ - st: cachedSt, - crypter: sm.crypter, - indexBlobCache: indexBlobCache, - log: sm.namedLogger("encrypted-blob-manager"), + st: cachedSt, + crypterProvider: sm.format, + indexBlobCache: indexBlobCache, + log: sm.namedLogger("encrypted-blob-manager"), } // set up legacy index blob manager sm.indexBlobManagerV0 = &indexBlobManagerV0{ - st: cachedSt, - enc: sm.enc, - timeNow: sm.timeNow, - maxPackSize: sm.maxPackSize, - indexVersion: sm.indexVersion, - indexShardSize: sm.indexShardSize, - log: sm.namedLogger("index-blob-manager"), + st: cachedSt, + enc: sm.enc, + timeNow: sm.timeNow, + formattingOptions: sm.format, + log: sm.namedLogger("index-blob-manager"), } // set up new index blob manager sm.indexBlobManagerV1 = &indexBlobManagerV1{ - st: cachedSt, - enc: sm.enc, - timeNow: sm.timeNow, - maxPackSize: sm.maxPackSize, - indexShardSize: sm.indexShardSize, - indexVersion: sm.indexVersion, - log: sm.namedLogger("index-blob-manager"), + st: cachedSt, + enc: sm.enc, + timeNow: sm.timeNow, + formattingOptions: sm.format, + log: sm.namedLogger("index-blob-manager"), } - sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, sm.format.EpochParameters, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow) - // select active index blob manager based on parameters - if sm.format.EpochParameters.Enabled { - sm.indexBlobManager = sm.indexBlobManagerV1 - } else { - sm.indexBlobManager = sm.indexBlobManagerV0 - } + sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, sm.format, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow) // once everything is ready, set it up sm.contentCache = dataCache sm.metadataCache = metadataCache sm.indexBlobCache = indexBlobCache - sm.committedContents = newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.enc.getEncryptedBlob, sm.namedLogger("committed-content-index"), caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge)) + sm.committedContents = newCommittedContentIndex(caching, + uint32(sm.format.Crypter().Encryptor.Overhead()), + sm.format.WriteIndexVersion(), + sm.enc.getEncryptedBlob, + sm.namedLogger("committed-content-index"), + caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge)) return nil } // EpochManager returns the epoch manager. func (sm *SharedManager) EpochManager() (*epoch.Manager, bool) { - ibm1, ok := sm.indexBlobManager.(*indexBlobManagerV1) + ibm1, ok := sm.indexBlobManager().(*indexBlobManagerV1) if !ok { return nil, false } @@ -546,36 +547,14 @@ func (sm *SharedManager) shouldRefreshIndexes() bool { } // NewSharedManager returns SharedManager that is used by SessionWriteManagers on top of a repository. -func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, opts *ManagerOptions) (*SharedManager, error) { +func NewSharedManager(ctx context.Context, st blob.Storage, prov FormattingOptionsProvider, caching *CachingOptions, opts *ManagerOptions) (*SharedManager, error) { opts = opts.CloneOrDefault() if opts.TimeNow == nil { opts.TimeNow = clock.Now } - if f.Version < minSupportedReadVersion || f.Version > currentWriteVersion { - return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", f.Version, minSupportedReadVersion, maxSupportedReadVersion) - } - - if f.Version < minSupportedWriteVersion || f.Version > currentWriteVersion { - return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", f.Version, minSupportedWriteVersion, maxSupportedWriteVersion) - } - - crypter, err := CreateCrypter(f) - if err != nil { - return nil, err - } - - actualIndexVersion := f.IndexVersion - if actualIndexVersion == 0 { - actualIndexVersion = legacyIndexVersion - } - - if actualIndexVersion < index.Version1 || actualIndexVersion > index.Version2 { - return nil, errors.Errorf("index version %v is not supported", actualIndexVersion) - } - // create internal logger that will be writing logs as encrypted repository blobs. - ilm := newInternalLogManager(ctx, st, crypter) + ilm := newInternalLogManager(ctx, st, prov) // sharedBaseLogger writes to the both context and internal log // and is used as a base for all content manager components. @@ -588,19 +567,13 @@ func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions sm := &SharedManager{ st: st, - crypter: crypter, Stats: new(Stats), timeNow: opts.TimeNow, - format: *f, - maxPackSize: f.MaxPackSize, + format: prov, minPreambleLength: defaultMinPreambleLength, maxPreambleLength: defaultMaxPreambleLength, paddingUnit: defaultPaddingUnit, - repositoryFormatBytes: opts.RepositoryFormatBytes, checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "", - writeFormatVersion: int32(f.Version), - indexVersion: actualIndexVersion, - indexShardSize: defaultIndexShardSize, internalLogManager: ilm, internalLogger: internalLog, contextLogger: logging.Module(FormatLogModule)(ctx), diff --git a/repo/content/content_formatter_test.go b/repo/content/content_formatter_test.go index 7bcbc7a62..c0f74d78f 100644 --- a/repo/content/content_formatter_test.go +++ b/repo/content/content_formatter_test.go @@ -100,7 +100,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp keyTime := map[blob.ID]time.Time{} st := blobtesting.NewMapStorage(data, keyTime, nil) - bm, err := NewManagerForTesting(testlogging.Context(t), st, &FormattingOptions{ + bm, err := NewManagerForTesting(testlogging.Context(t), st, mustCreateFormatProvider(t, &FormattingOptions{ Hash: hashAlgo, Encryption: encryptionAlgo, HMACSecret: hmacSecret, @@ -109,7 +109,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp MaxPackSize: maxPackSize, }, MasterKey: make([]byte, 32), // zero key, does not matter - }, nil, nil) + }), nil, nil) if err != nil { t.Errorf("can't create content manager with hash %v and encryption %v: %v", hashAlgo, encryptionAlgo, err.Error()) return @@ -159,3 +159,12 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp } } } + +func mustCreateFormatProvider(t *testing.T, f *FormattingOptions) FormattingOptionsProvider { + t.Helper() + + fop, err := NewFormattingOptionsProvider(f, nil) + require.NoError(t, err) + + return fop +} diff --git a/repo/content/content_formatting_options.go b/repo/content/content_formatting_options.go index 666e1720b..b91a07216 100644 --- a/repo/content/content_formatting_options.go +++ b/repo/content/content_formatting_options.go @@ -9,6 +9,8 @@ "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content/index" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/hashing" ) const ( @@ -61,6 +63,16 @@ func (f *FormattingOptions) ResolveFormatVersion() error { } } +// GetMutableParameters implements FormattingOptionsProvider. +func (f *FormattingOptions) GetMutableParameters() MutableParameters { + return f.MutableParameters +} + +// SupportsPasswordChange implements FormattingOptionsProvider. +func (f *FormattingOptions) SupportsPasswordChange() bool { + return f.EnablePasswordChange +} + // MutableParameters represents parameters of the content manager that can be mutated after the repository // is created. type MutableParameters struct { @@ -111,6 +123,149 @@ func (f *FormattingOptions) GetHmacSecret() []byte { return f.HMACSecret } +// FormattingOptionsProvider provides current formatting options. The options returned +// should not be cached for more than a few seconds as they are subject to change. +type FormattingOptionsProvider interface { + epoch.ParametersProvider + IndexFormattingOptions + CrypterProvider + + encryption.Parameters + hashing.Parameters + + GetMutableParameters() MutableParameters + GetMasterKey() []byte + SupportsPasswordChange() bool + FormatVersion() FormatVersion + MaxPackBlobSize() int + RepositoryFormatBytes() []byte + Struct() FormattingOptions +} + +type formattingOptionsProvider struct { + *FormattingOptions + + crypter *Crypter + actualFormatVersion FormatVersion + actualIndexVersion int + formatBytes []byte +} + +func (f *formattingOptionsProvider) FormatVersion() FormatVersion { + return f.Version +} + +// whether epoch manager is enabled, must be true. +func (f *formattingOptionsProvider) GetEpochManagerEnabled() bool { + return f.EpochParameters.Enabled +} + +// how frequently each client will list blobs to determine the current epoch. +func (f *formattingOptionsProvider) GetEpochRefreshFrequency() time.Duration { + return f.EpochParameters.EpochRefreshFrequency +} + +// number of epochs between full checkpoints. +func (f *formattingOptionsProvider) GetEpochFullCheckpointFrequency() int { + return f.EpochParameters.FullCheckpointFrequency +} + +// GetEpochCleanupSafetyMargin returns safety margin to prevent uncompacted blobs from being deleted if the corresponding compacted blob age is less than this. +func (f *formattingOptionsProvider) GetEpochCleanupSafetyMargin() time.Duration { + return f.EpochParameters.CleanupSafetyMargin +} + +// GetMinEpochDuration returns the minimum duration of an epoch. +func (f *formattingOptionsProvider) GetMinEpochDuration() time.Duration { + return f.EpochParameters.MinEpochDuration +} + +// GetEpochAdvanceOnCountThreshold returns the number of files above which epoch should be advanced. +func (f *formattingOptionsProvider) GetEpochAdvanceOnCountThreshold() int { + return f.EpochParameters.EpochAdvanceOnCountThreshold +} + +// GetEpochAdvanceOnTotalSizeBytesThreshold returns the total size of files above which the epoch should be advanced. +func (f *formattingOptionsProvider) GetEpochAdvanceOnTotalSizeBytesThreshold() int64 { + return f.EpochParameters.EpochAdvanceOnTotalSizeBytesThreshold +} + +// GetEpochDeleteParallelism returns the number of blobs to delete in parallel during cleanup. +func (f *formattingOptionsProvider) GetEpochDeleteParallelism() int { + return f.EpochParameters.DeleteParallelism +} + +func (f *formattingOptionsProvider) Struct() FormattingOptions { + return *f.FormattingOptions +} + +// NewFormattingOptionsProvider validates the provided formatting options and returns static +// FormattingOptionsProvider based on them. +func NewFormattingOptionsProvider(f *FormattingOptions, formatBytes []byte) (FormattingOptionsProvider, error) { + formatVersion := f.Version + + if formatVersion < minSupportedReadVersion || formatVersion > currentWriteVersion { + return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", formatVersion, minSupportedReadVersion, maxSupportedReadVersion) + } + + if formatVersion < minSupportedWriteVersion || formatVersion > currentWriteVersion { + return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", formatVersion, minSupportedWriteVersion, maxSupportedWriteVersion) + } + + actualIndexVersion := f.IndexVersion + if actualIndexVersion == 0 { + actualIndexVersion = legacyIndexVersion + } + + if actualIndexVersion < index.Version1 || actualIndexVersion > index.Version2 { + return nil, errors.Errorf("index version %v is not supported", actualIndexVersion) + } + + crypter, err := CreateCrypter(f) + if err != nil { + return nil, errors.Wrap(err, "unable to create crypter") + } + + return &formattingOptionsProvider{ + FormattingOptions: f, + + crypter: crypter, + actualIndexVersion: actualIndexVersion, + actualFormatVersion: f.Version, + formatBytes: formatBytes, + }, nil +} + +func (f *formattingOptionsProvider) Crypter() *Crypter { + return f.crypter +} + +func (f *formattingOptionsProvider) WriteIndexVersion() int { + return f.actualIndexVersion +} + +func (f *formattingOptionsProvider) MaxIndexBlobSize() int64 { + return int64(f.MaxPackSize) +} + +func (f *formattingOptionsProvider) MaxPackBlobSize() int { + return f.MaxPackSize +} + +func (f *formattingOptionsProvider) GetEpochManagerParameters() epoch.Parameters { + return f.EpochParameters +} + +func (f *formattingOptionsProvider) IndexShardSize() int { + return defaultIndexShardSize +} + +func (f *formattingOptionsProvider) RepositoryFormatBytes() []byte { + return f.formatBytes +} + +var _ FormattingOptionsProvider = (*formattingOptionsProvider)(nil) + // BlobCfgBlob is the content for `kopia.blobcfg` blob which contains the blob // management configuration options. type BlobCfgBlob struct { diff --git a/repo/content/content_index_recovery.go b/repo/content/content_index_recovery.go index ca50479e9..ee4781c5f 100644 --- a/repo/content/content_index_recovery.go +++ b/repo/content/content_index_recovery.go @@ -22,7 +22,7 @@ func (bm *WriteManager) RecoverIndexFromPackBlob(ctx context.Context, packFile b return nil, err } - ndx, err := index.Open(localIndexBytes.Bytes().ToByteSlice(), nil, uint32(bm.crypter.Encryptor.Overhead())) + ndx, err := index.Open(localIndexBytes.Bytes().ToByteSlice(), nil, uint32(bm.format.Crypter().Encryptor.Overhead())) if err != nil { return nil, errors.Errorf("unable to open index in file %v", packFile) } @@ -171,7 +171,7 @@ func decodePostamble(payload []byte) *packContentPostamble { } func (sm *SharedManager) buildLocalIndex(pending index.Builder, output *gather.WriteBuffer) error { - if err := pending.Build(output, sm.indexVersion); err != nil { + if err := pending.Build(output, sm.format.WriteIndexVersion()); err != nil { return errors.Wrap(err, "unable to build local index") } @@ -195,7 +195,7 @@ func (sm *SharedManager) appendPackFileIndexRecoveryData(pending index.Builder, var encryptedLocalIndex gather.WriteBuffer defer encryptedLocalIndex.Close() - if err := sm.crypter.Encryptor.Encrypt(localIndex.Bytes(), localIndexIV, &encryptedLocalIndex); err != nil { + if err := sm.format.Crypter().Encryptor.Encrypt(localIndex.Bytes(), localIndexIV, &encryptedLocalIndex); err != nil { return errors.Wrap(err, "encryption error") } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 6f453a542..26e7208a6 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -331,7 +331,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat PackBlobID: pp.packBlobID, PackOffset: uint32(pp.currentPackData.Length()), TimestampSeconds: bm.contentWriteTime(previousWriteTime), - FormatVersion: byte(bm.writeFormatVersion), + FormatVersion: byte(bm.format.FormatVersion()), OriginalLength: uint32(data.Length()), } @@ -345,7 +345,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat pp.currentPackItems[contentID] = info - shouldWrite := pp.currentPackData.Length() >= bm.maxPackSize + shouldWrite := pp.currentPackData.Length() >= bm.format.MaxPackBlobSize() if shouldWrite { // we're about to write to storage without holding a lock // remove from pendingPacks so other goroutine tries to mess with this pending pack. @@ -413,7 +413,7 @@ func (bm *WriteManager) verifyPackIndexBuilderLocked() { bm.assertInvariant(cpi.GetPackBlobID() == "", "content can't be both deleted and have a pack content: %v", cpi.GetContentID()) } else { bm.assertInvariant(cpi.GetPackBlobID() != "", "content that's not deleted must have a pack content: %+v", cpi) - bm.assertInvariant(cpi.GetFormatVersion() == byte(bm.writeFormatVersion), "content that's not deleted must have a valid format version: %+v", cpi) + bm.assertInvariant(cpi.GetFormatVersion() == byte(bm.format.FormatVersion()), "content that's not deleted must have a valid format version: %+v", cpi) } bm.assertInvariant(cpi.GetTimestampSeconds() != 0, "content has no timestamp: %v", cpi.GetContentID()) @@ -438,7 +438,7 @@ func (bm *WriteManager) writeIndexBlobs(ctx context.Context, dataShards []gather defer span.End() // nolint:wrapcheck - return bm.indexBlobManager.writeIndexBlobs(ctx, dataShards, sessionID) + return bm.indexBlobManager().writeIndexBlobs(ctx, dataShards, sessionID) } // +checklocksread:bm.indexesLock @@ -461,7 +461,7 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error { if len(bm.packIndexBuilder) > 0 { _, span2 := tracer.Start(ctx, "BuildShards") - dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(bm.indexVersion, true, bm.indexShardSize) + dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(bm.format.WriteIndexVersion(), true, bm.format.IndexShardSize()) span2.End() @@ -592,7 +592,7 @@ func removePendingPack(slice []*pendingPackInfo, pp *pendingPackInfo) []*pending } // ContentFormat returns formatting options. -func (bm *WriteManager) ContentFormat() FormattingOptions { +func (bm *WriteManager) ContentFormat() FormattingOptionsProvider { return bm.format } @@ -743,7 +743,7 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr return nil, errors.Wrap(err, "unable to read crypto bytes") } - b.Append(bm.repositoryFormatBytes) + b.Append(bm.format.RepositoryFormatBytes()) // nolint:gosec if err := writeRandomBytesToBuffer(b, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength); err != nil { @@ -762,7 +762,7 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr // SupportsContentCompression returns true if content manager supports content-compression. func (bm *WriteManager) SupportsContentCompression() bool { - return bm.format.IndexVersion >= index.Version2 + return bm.format.WriteIndexVersion() >= index.Version2 } // WriteContent saves a given content of data to a pack group with a provided name and returns a contentID @@ -938,7 +938,7 @@ func (o *ManagerOptions) CloneOrDefault() *ManagerOptions { } // NewManagerForTesting creates new content manager with given packing options and a formatter. -func NewManagerForTesting(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, options *ManagerOptions) (*WriteManager, error) { +func NewManagerForTesting(ctx context.Context, st blob.Storage, f FormattingOptionsProvider, caching *CachingOptions, options *ManagerOptions) (*WriteManager, error) { options = options.CloneOrDefault() if options.TimeNow == nil { options.TimeNow = clock.Now diff --git a/repo/content/content_manager_indexes.go b/repo/content/content_manager_indexes.go index 9074f53e5..af3ba7237 100644 --- a/repo/content/content_manager_indexes.go +++ b/repo/content/content_manager_indexes.go @@ -38,7 +38,7 @@ func (sm *SharedManager) Refresh(ctx context.Context) error { sm.log.Debugf("Refresh started") - sm.indexBlobManager.invalidate(ctx) + sm.indexBlobManager().invalidate(ctx) timer := timetrack.StartTimer() @@ -57,7 +57,7 @@ func (sm *SharedManager) CompactIndexes(ctx context.Context, opt CompactOptions) sm.log.Debugf("CompactIndexes(%+v)", opt) - if err := sm.indexBlobManager.compact(ctx, opt); err != nil { + if err := sm.indexBlobManager().compact(ctx, opt); err != nil { return errors.Wrap(err, "error performing compaction") } diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index 14e7d74c7..52a194b06 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -30,14 +30,14 @@ func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes // If the content is prefixed (which represents Kopia's own metadata as opposed to user data), // and we're on V2 format or greater, enable internal compression even when not requested. - if contentID.HasPrefix() && comp == NoCompression && sm.format.IndexVersion >= index.Version2 { + if contentID.HasPrefix() && comp == NoCompression && sm.format.WriteIndexVersion() >= index.Version2 { // 'zstd-fastest' has a good mix of being fast, low memory usage and high compression for JSON. comp = compression.HeaderZstdFastest } // nolint:nestif if comp != NoCompression { - if sm.format.IndexVersion < index.Version2 { + if sm.format.WriteIndexVersion() < index.Version2 { return NoCompression, errors.Errorf("compression is not enabled for this repository") } @@ -62,7 +62,7 @@ func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes } } - if err := sm.crypter.Encryptor.Encrypt(data, iv, output); err != nil { + if err := sm.Crypter().Encryptor.Encrypt(data, iv, output); err != nil { return NoCompression, errors.Wrap(err, "unable to encrypt") } @@ -181,7 +181,7 @@ func (sm *SharedManager) writePackFileNotLocked(ctx context.Context, packFile bl func (sm *SharedManager) hashData(output []byte, data gather.Bytes) []byte { // Hash the content and compute encryption key. - contentID := sm.crypter.HashFunction(output, data) + contentID := sm.format.Crypter().HashFunction(output, data) sm.Stats.hashedContent(data.Length()) return contentID diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 48ff15309..945420770 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -354,13 +354,13 @@ func (s *contentManagerSuite) TestContentManagerFailedToWritePack(t *testing.T) ta := faketime.NewTimeAdvance(fakeTime, 0) - bm, err := NewManagerForTesting(testlogging.Context(t), st, &FormattingOptions{ + bm, err := NewManagerForTesting(testlogging.Context(t), st, mustCreateFormatProvider(t, &FormattingOptions{ Hash: "HMAC-SHA256-128", Encryption: "AES256-GCM-HMAC-SHA256", MutableParameters: s.mutableParameters, HMACSecret: []byte("foo"), MasterKey: []byte("0123456789abcdef0123456789abcdef"), - }, nil, &ManagerOptions{TimeNow: ta.NowFunc()}) + }), nil, &ManagerOptions{TimeNow: ta.NowFunc()}) if err != nil { t.Fatalf("can't create bm: %v", err) } @@ -1877,11 +1877,11 @@ func (s *contentManagerSuite) verifyVersionCompat(t *testing.T, writeVersion For data := blobtesting.DataMap{} st := blobtesting.NewMapStorage(data, nil, nil) - mgr := s.newTestContentManager(t, st) + mgr := s.newTestContentManagerWithTweaks(t, st, &contentManagerTestTweaks{ + formatVersion: writeVersion, + }) defer mgr.Close(ctx) - mgr.writeFormatVersion = int32(writeVersion) - dataSet := map[ID][]byte{} for i := 0; i < 3000000; i = (i + 1) * 2 { @@ -2331,8 +2331,9 @@ type contentManagerTestTweaks struct { CachingOptions ManagerOptions - indexVersion int - maxPackSize int + indexVersion int + maxPackSize int + formatVersion FormatVersion } func (s *contentManagerSuite) newTestContentManagerWithTweaks(t *testing.T, st blob.Storage, tweaks *contentManagerTestTweaks) *WriteManager { @@ -2357,13 +2358,17 @@ func (s *contentManagerSuite) newTestContentManagerWithTweaks(t *testing.T, st b mp.MaxPackSize = mps } + if tweaks.formatVersion != 0 { + mp.Version = tweaks.formatVersion + } + ctx := testlogging.Context(t) - fo := &FormattingOptions{ + fo := mustCreateFormatProvider(t, &FormattingOptions{ Hash: "HMAC-SHA256", Encryption: "AES256-GCM-HMAC-SHA256", HMACSecret: hmacSecret, MutableParameters: mp, - } + }) bm, err := NewManagerForTesting(ctx, st, fo, &tweaks.CachingOptions, &tweaks.ManagerOptions) if err != nil { diff --git a/repo/content/content_reader.go b/repo/content/content_reader.go index 6b4d51788..79630fa4f 100644 --- a/repo/content/content_reader.go +++ b/repo/content/content_reader.go @@ -9,7 +9,7 @@ // Reader defines content read API. type Reader interface { SupportsContentCompression() bool - ContentFormat() FormattingOptions + ContentFormat() FormattingOptionsProvider GetContent(ctx context.Context, id ID) ([]byte, error) ContentInfo(ctx context.Context, id ID) (Info, error) IterateContents(ctx context.Context, opts IterateOptions, callback IterateCallback) error diff --git a/repo/content/encrypted_blob_mgr.go b/repo/content/encrypted_blob_mgr.go index dbcf17722..e9686555f 100644 --- a/repo/content/encrypted_blob_mgr.go +++ b/repo/content/encrypted_blob_mgr.go @@ -12,10 +12,10 @@ ) type encryptedBlobMgr struct { - st blob.Storage - crypter *Crypter - indexBlobCache *cache.PersistentCache - log logging.Logger + st blob.Storage + crypterProvider CrypterProvider + indexBlobCache *cache.PersistentCache + log logging.Logger } func (m *encryptedBlobMgr) getEncryptedBlob(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error { @@ -29,14 +29,14 @@ func (m *encryptedBlobMgr) getEncryptedBlob(ctx context.Context, blobID blob.ID, return errors.Wrap(err, "getContent") } - return m.crypter.DecryptBLOB(payload.Bytes(), blobID, output) + return m.crypterProvider.Crypter().DecryptBLOB(payload.Bytes(), blobID, output) } func (m *encryptedBlobMgr) encryptAndWriteBlob(ctx context.Context, data gather.Bytes, prefix blob.ID, sessionID SessionID) (blob.Metadata, error) { var data2 gather.WriteBuffer defer data2.Close() - blobID, err := m.crypter.EncryptBLOB(data, prefix, sessionID, &data2) + blobID, err := m.crypterProvider.Crypter().EncryptBLOB(data, prefix, sessionID, &data2) if err != nil { return blob.Metadata{}, errors.Wrap(err, "error encrypting") } diff --git a/repo/content/encrypted_blob_mgr_test.go b/repo/content/encrypted_blob_mgr_test.go index a4b233200..ed589e3c1 100644 --- a/repo/content/encrypted_blob_mgr_test.go +++ b/repo/content/encrypted_blob_mgr_test.go @@ -43,10 +43,10 @@ func TestEncryptedBlobManager(t *testing.T) { } ebm := encryptedBlobMgr{ - st: fs, - crypter: cr, - indexBlobCache: nil, - log: logging.NullLogger, + st: fs, + crypterProvider: staticCrypterProvider{cr}, + indexBlobCache: nil, + log: logging.NullLogger, } ctx := testlogging.Context(t) @@ -85,3 +85,11 @@ func TestEncryptedBlobManager(t *testing.T) { _, err = ebm.encryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1") require.ErrorIs(t, err, someError2) } + +type staticCrypterProvider struct { + theCrypter *Crypter +} + +func (p staticCrypterProvider) Crypter() *Crypter { + return p.theCrypter +} diff --git a/repo/content/index_blob_manager_v0.go b/repo/content/index_blob_manager_v0.go index b8903ea5b..b817e95d4 100644 --- a/repo/content/index_blob_manager_v0.go +++ b/repo/content/index_blob_manager_v0.go @@ -45,14 +45,20 @@ type cleanupEntry struct { age time.Duration // not serialized, computed on load } +// IndexFormattingOptions provides options for formatting index blobs. +type IndexFormattingOptions interface { + MaxIndexBlobSize() int64 + WriteIndexVersion() int + IndexShardSize() int +} + type indexBlobManagerV0 struct { - st blob.Storage - enc *encryptedBlobMgr - timeNow func() time.Time - log logging.Logger - maxPackSize int - indexVersion int - indexShardSize int + st blob.Storage + enc *encryptedBlobMgr + timeNow func() time.Time + log logging.Logger + + formattingOptions IndexFormattingOptions } func (m *indexBlobManagerV0) listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, time.Time, error) { @@ -413,14 +419,14 @@ func (m *indexBlobManagerV0) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt C var mediumSizedBlobCount int for _, b := range indexBlobs { - if b.Length > int64(m.maxPackSize) && !opt.AllIndexes { + if b.Length > m.formattingOptions.MaxIndexBlobSize() && !opt.AllIndexes { continue } nonCompactedBlobs = append(nonCompactedBlobs, b) totalSizeNonCompactedBlobs += b.Length - if b.Length < int64(m.maxPackSize/verySmallContentFraction) { + if b.Length < m.formattingOptions.MaxIndexBlobSize()/verySmallContentFraction { verySmallBlobs = append(verySmallBlobs, b) totalSizeVerySmallBlobs += b.Length } else { @@ -468,7 +474,7 @@ func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs [ // we must do it after all input blobs have been merged, otherwise we may resurrect contents. m.dropContentsFromBuilder(bld, opt) - dataShards, cleanupShards, err := bld.BuildShards(m.indexVersion, false, m.indexShardSize) + dataShards, cleanupShards, err := bld.BuildShards(m.formattingOptions.WriteIndexVersion(), false, m.formattingOptions.IndexShardSize()) if err != nil { return errors.Wrap(err, "unable to build an index") } @@ -520,7 +526,7 @@ func addIndexBlobsToBuilder(ctx context.Context, enc *encryptedBlobMgr, bld inde return errors.Wrapf(err, "error getting index %q", indexBlobID) } - ndx, err := index.Open(data.ToByteSlice(), nil, uint32(enc.crypter.Encryptor.Overhead())) + ndx, err := index.Open(data.ToByteSlice(), nil, uint32(enc.crypterProvider.Crypter().Encryptor.Overhead())) if err != nil { return errors.Wrapf(err, "unable to open index blob %q", indexBlobID) } diff --git a/repo/content/index_blob_manager_v0_test.go b/repo/content/index_blob_manager_v0_test.go index 351d7ddc3..99e30686b 100644 --- a/repo/content/index_blob_manager_v0_test.go +++ b/repo/content/index_blob_manager_v0_test.go @@ -794,16 +794,14 @@ func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow f enc: &encryptedBlobMgr{ st: st, indexBlobCache: nil, - crypter: &Crypter{ + crypterProvider: staticCrypterProvider{&Crypter{ HashFunction: hf, Encryptor: enc, - }, + }}, log: log, }, - timeNow: localTimeNow, - maxPackSize: 20 << 20, - indexVersion: 1, - log: log, + timeNow: localTimeNow, + log: log, } return m diff --git a/repo/content/index_blob_manager_v1.go b/repo/content/index_blob_manager_v1.go index bcbd5ad4e..74f62929e 100644 --- a/repo/content/index_blob_manager_v1.go +++ b/repo/content/index_blob_manager_v1.go @@ -16,14 +16,12 @@ ) type indexBlobManagerV1 struct { - st blob.Storage - enc *encryptedBlobMgr - epochMgr *epoch.Manager - timeNow func() time.Time - log logging.Logger - maxPackSize int - indexVersion int - indexShardSize int + st blob.Storage + enc *encryptedBlobMgr + epochMgr *epoch.Manager + timeNow func() time.Time + log logging.Logger + formattingOptions IndexFormattingOptions } func (m *indexBlobManagerV1) listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, time.Time, error) { @@ -70,7 +68,7 @@ func (m *indexBlobManagerV1) compactEpoch(ctx context.Context, blobIDs []blob.ID } } - dataShards, cleanupShards, err := tmpbld.BuildShards(m.indexVersion, true, m.indexShardSize) + dataShards, cleanupShards, err := tmpbld.BuildShards(m.formattingOptions.WriteIndexVersion(), true, m.formattingOptions.IndexShardSize()) if err != nil { return errors.Wrap(err, "unable to build index dataShards") } @@ -91,7 +89,7 @@ func (m *indexBlobManagerV1) compactEpoch(ctx context.Context, blobIDs []blob.ID for _, data := range dataShards { data2.Reset() - blobID, err := m.enc.crypter.EncryptBLOB(data, outputPrefix, SessionID(sessionID), &data2) + blobID, err := m.enc.crypterProvider.Crypter().EncryptBLOB(data, outputPrefix, SessionID(sessionID), &data2) if err != nil { return errors.Wrap(err, "error encrypting") } @@ -115,7 +113,7 @@ func (m *indexBlobManagerV1) writeIndexBlobs(ctx context.Context, dataShards []g data2 := gather.NewWriteBuffer() defer data2.Close() //nolint:gocritic - unprefixedBlobID, err := m.enc.crypter.EncryptBLOB(data, "", sessionID, data2) + unprefixedBlobID, err := m.enc.crypterProvider.Crypter().EncryptBLOB(data, "", sessionID, data2) if err != nil { return nil, errors.Wrap(err, "error encrypting") } @@ -131,8 +129,6 @@ func (m *indexBlobManagerV1) writeIndexBlobs(ctx context.Context, dataShards []g // PrepareUpgradeToIndexBlobManagerV1 prepares the repository for migrating to IndexBlobManagerV1. func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context, params epoch.Parameters) error { - sm.indexBlobManagerV1.epochMgr.Params = params - ibl, _, err := sm.indexBlobManagerV0.listActiveIndexBlobs(ctx) if err != nil { return errors.Wrap(err, "error listing active index blobs") @@ -148,7 +144,5 @@ func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context, return errors.Wrap(err, "unable to generate initial epoch") } - sm.indexBlobManager = sm.indexBlobManagerV1 - return nil } diff --git a/repo/content/internal_logger.go b/repo/content/internal_logger.go index 2510987c4..7931ceade 100644 --- a/repo/content/internal_logger.go +++ b/repo/content/internal_logger.go @@ -33,7 +33,7 @@ type internalLogManager struct { ctx context.Context // nolint:containedctx st blob.Storage - bc *Crypter + bc CrypterProvider wg sync.WaitGroup timeFunc func() time.Time flushThreshold int @@ -48,7 +48,7 @@ func (m *internalLogManager) encryptAndWriteLogBlob(prefix blob.ID, data gather. encrypted := gather.NewWriteBuffer() // Close happens in a goroutine - blobID, err := m.bc.EncryptBLOB(data, prefix, "", encrypted) + blobID, err := m.bc.Crypter().EncryptBLOB(data, prefix, "", encrypted) if err != nil { encrypted.Close() @@ -208,7 +208,7 @@ func (l *internalLogger) Sync() error { } // newInternalLogManager creates a new blobLogManager that will emit logs as repository blobs with a given prefix. -func newInternalLogManager(ctx context.Context, st blob.Storage, bc *Crypter) *internalLogManager { +func newInternalLogManager(ctx context.Context, st blob.Storage, bc CrypterProvider) *internalLogManager { return &internalLogManager{ ctx: ctx, st: st, diff --git a/repo/content/sessions.go b/repo/content/sessions.go index 46a2af444..7c522f2f6 100644 --- a/repo/content/sessions.go +++ b/repo/content/sessions.go @@ -111,7 +111,7 @@ func (bm *WriteManager) writeSessionMarkerLocked(ctx context.Context) error { var encrypted gather.WriteBuffer defer encrypted.Close() - sessionBlobID, err := bm.crypter.EncryptBLOB(gather.FromSlice(js), BlobIDPrefixSession, bm.currentSessionInfo.ID, &encrypted) + sessionBlobID, err := bm.format.Crypter().EncryptBLOB(gather.FromSlice(js), BlobIDPrefixSession, bm.currentSessionInfo.ID, &encrypted) if err != nil { return errors.Wrap(err, "unable to encrypt session marker") } @@ -178,7 +178,7 @@ func (bm *WriteManager) ListActiveSessions(ctx context.Context) (map[SessionID]* return nil, errors.Wrapf(err, "error loading session: %v", b.BlobID) } - err = bm.crypter.DecryptBLOB(payload.Bytes(), b.BlobID, &decrypted) + err = bm.format.Crypter().DecryptBLOB(payload.Bytes(), b.BlobID, &decrypted) if err != nil { return nil, errors.Wrapf(err, "error decrypting session: %v", b.BlobID) } diff --git a/repo/maintenance/content_rewrite.go b/repo/maintenance/content_rewrite.go index 38a45211a..c8f2561b3 100644 --- a/repo/maintenance/content_rewrite.go +++ b/repo/maintenance/content_rewrite.go @@ -136,7 +136,7 @@ func getContentToRewrite(ctx context.Context, rep repo.DirectRepository, opt *Re // add all content IDs from short packs if opt.ShortPacks { - threshold := int64(rep.ContentReader().ContentFormat().MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd + threshold := int64(rep.ContentReader().ContentFormat().MaxPackBlobSize() * shortPackThresholdPercent / 100) //nolint:gomnd findContentInShortPacks(ctx, rep, ch, threshold, opt) } diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index 6b6b10d62..4ee851e17 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -142,20 +142,19 @@ func TestManifestInitCorruptedBlock(t *testing.T) { data := blobtesting.DataMap{} st := blobtesting.NewMapStorage(data, nil, nil) - f := &content.FormattingOptions{ + fop, err := content.NewFormattingOptionsProvider(&content.FormattingOptions{ Hash: hashing.DefaultAlgorithm, Encryption: encryption.DefaultAlgorithm, MutableParameters: content.MutableParameters{ Version: 1, MaxPackSize: 100000, }, - } + }, nil) + require.NoError(t, err) // write some data to storage - bm, err := content.NewManagerForTesting(ctx, st, f, nil, nil) - if err != nil { - t.Fatalf("err: %v", err) - } + bm, err := content.NewManagerForTesting(ctx, st, fop, nil, nil) + require.NoError(t, err) bm0 := bm @@ -182,10 +181,8 @@ func TestManifestInitCorruptedBlock(t *testing.T) { } // make a new content manager based on corrupted data. - bm, err = content.NewManagerForTesting(ctx, st, f, nil, nil) - if err != nil { - t.Fatalf("err: %v", err) - } + bm, err = content.NewManagerForTesting(ctx, st, fop, nil, nil) + require.NoError(t, err) t.Cleanup(func() { bm.Close(ctx) }) @@ -305,24 +302,24 @@ func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.Da st := blobtesting.NewMapStorage(data, nil, nil) - bm, err := content.NewManagerForTesting(ctx, st, &content.FormattingOptions{ + fop, err := content.NewFormattingOptionsProvider(&content.FormattingOptions{ Hash: hashing.DefaultAlgorithm, Encryption: encryption.DefaultAlgorithm, MutableParameters: content.MutableParameters{ Version: 1, MaxPackSize: 100000, }, - }, nil, nil) - if err != nil { - t.Fatalf("can't create content manager: %v", err) - } + }, nil) + + require.NoError(t, err) + + bm, err := content.NewManagerForTesting(ctx, st, fop, nil, nil) + require.NoError(t, err) t.Cleanup(func() { bm.Close(ctx) }) mm, err := NewManager(ctx, bm, ManagerOptions{}) - if err != nil { - t.Fatalf("can't create manifest manager: %v", err) - } + require.NoError(t, err) return mm } diff --git a/repo/open.go b/repo/open.go index db0b508e7..010e1e446 100644 --- a/repo/open.go +++ b/repo/open.go @@ -333,7 +333,12 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw st = upgradeLockMonitor(options.UpgradeOwnerID, st, password, cacheOpts, lc.FormatBlobCacheDuration, ufb.cacheMTime, cmOpts.TimeNow) - scm, err := content.NewSharedManager(ctx, st, fo, cacheOpts, cmOpts) + fop, err := content.NewFormattingOptionsProvider(fo, cmOpts.RepositoryFormatBytes) + if err != nil { + return nil, errors.Wrap(err, "unable to create format options provider") + } + + scm, err := content.NewSharedManager(ctx, st, fop, cacheOpts, cmOpts) if err != nil { return nil, errors.Wrap(err, "unable to create shared content manager") } diff --git a/repo/repository.go b/repo/repository.go index ccf450809..b718fafa8 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -110,8 +110,8 @@ type directRepository struct { // DeriveKey derives encryption key of the provided length from the master key. func (r *directRepository) DeriveKey(purpose []byte, keyLength int) []byte { - if r.cmgr.ContentFormat().EnablePasswordChange { - return deriveKeyFromMasterKey(r.cmgr.ContentFormat().MasterKey, r.uniqueID, purpose, keyLength) + if r.cmgr.ContentFormat().SupportsPasswordChange() { + return deriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.uniqueID, purpose, keyLength) } // version of kopia