diff --git a/cli/command_repository_set_parameters.go b/cli/command_repository_set_parameters.go index 96aee8972..435eb4aeb 100644 --- a/cli/command_repository_set_parameters.go +++ b/cli/command_repository_set_parameters.go @@ -113,7 +113,7 @@ func (c *commandRepositorySetParameters) setRetentionModeParameter(ctx context.C func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.DirectRepositoryWriter) error { var anyChange bool - mp := rep.ContentReader().ContentFormat().MutableParameters + mp := rep.ContentReader().ContentFormat().GetMutableParameters() blobcfg := rep.BlobCfg() upgradeToEpochManager := false diff --git a/cli/command_repository_status.go b/cli/command_repository_status.go index 1f54f1fbf..ded7ec0c5 100644 --- a/cli/command_repository_status.go +++ b/cli/command_repository_status.go @@ -63,8 +63,8 @@ func (c *commandRepositoryStatus) outputJSON(ctx context.Context, r repo.Reposit s.UniqueIDHex = hex.EncodeToString(dr.UniqueID()) s.ObjectFormat = dr.ObjectFormat() s.BlobRetention = dr.BlobCfg() - s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) // nolint:forcetypeassert - s.ContentFormat = scrubber.ScrubSensitiveData(reflect.ValueOf(dr.ContentReader().ContentFormat())).Interface().(content.FormattingOptions) // nolint:forcetypeassert + s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) // nolint:forcetypeassert + s.ContentFormat = scrubber.ScrubSensitiveData(reflect.ValueOf(dr.ContentReader().ContentFormat().Struct())).Interface().(content.FormattingOptions) // nolint:forcetypeassert switch cp, err := dr.BlobVolume().GetCapacity(ctx); { case err == nil: @@ -170,17 +170,19 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) c.out.printStdout("Storage config: %v\n", string(cjson)) } + contentFormat := dr.ContentReader().ContentFormat() + c.out.printStdout("\n") c.out.printStdout("Unique ID: %x\n", dr.UniqueID()) - c.out.printStdout("Hash: %v\n", dr.ContentReader().ContentFormat().Hash) - c.out.printStdout("Encryption: %v\n", dr.ContentReader().ContentFormat().Encryption) + c.out.printStdout("Hash: %v\n", contentFormat.GetHashFunction()) + c.out.printStdout("Encryption: %v\n", contentFormat.GetEncryptionAlgorithm()) c.out.printStdout("Splitter: %v\n", dr.ObjectFormat().Splitter) - c.out.printStdout("Format version: %v\n", dr.ContentReader().ContentFormat().Version) + c.out.printStdout("Format version: %v\n", contentFormat.FormatVersion()) c.out.printStdout("Content compression: %v\n", dr.ContentReader().SupportsContentCompression()) - c.out.printStdout("Password changes: %v\n", dr.ContentReader().ContentFormat().EnablePasswordChange) + c.out.printStdout("Password changes: %v\n", contentFormat.SupportsPasswordChange()) - c.out.printStdout("Max pack length: %v\n", units.BytesStringBase2(int64(dr.ContentReader().ContentFormat().MaxPackSize))) - c.out.printStdout("Index Format: v%v\n", dr.ContentReader().ContentFormat().IndexVersion) + c.out.printStdout("Max pack length: %v\n", units.BytesStringBase2(int64(contentFormat.MaxPackBlobSize()))) + c.out.printStdout("Index Format: v%v\n", contentFormat.WriteIndexVersion()) if emgr, ok := dr.ContentReader().EpochManager(); ok { c.out.printStdout("\n") @@ -192,10 +194,10 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) } c.out.printStdout("\n") - c.out.printStdout("Epoch refresh frequency: %v\n", emgr.Params.EpochRefreshFrequency) - c.out.printStdout("Epoch advance on: %v blobs or %v, minimum %v\n", emgr.Params.EpochAdvanceOnCountThreshold, units.BytesStringBase2(emgr.Params.EpochAdvanceOnTotalSizeBytesThreshold), emgr.Params.MinEpochDuration) - c.out.printStdout("Epoch cleanup margin: %v\n", emgr.Params.CleanupSafetyMargin) - c.out.printStdout("Epoch checkpoint every: %v epochs\n", emgr.Params.FullCheckpointFrequency) + c.out.printStdout("Epoch refresh frequency: %v\n", contentFormat.GetEpochRefreshFrequency()) + c.out.printStdout("Epoch advance on: %v blobs or %v, minimum %v\n", contentFormat.GetEpochAdvanceOnCountThreshold(), units.BytesStringBase2(contentFormat.GetEpochAdvanceOnTotalSizeBytesThreshold()), contentFormat.GetMinEpochDuration()) + c.out.printStdout("Epoch cleanup margin: %v\n", contentFormat.GetEpochCleanupSafetyMargin()) + c.out.printStdout("Epoch checkpoint every: %v epochs\n", contentFormat.GetEpochFullCheckpointFrequency()) } else { c.out.printStdout("Epoch Manager: disabled\n") } diff --git a/cli/command_repository_upgrade.go b/cli/command_repository_upgrade.go index 2c6ce52f2..49c7779b6 100644 --- a/cli/command_repository_upgrade.go +++ b/cli/command_repository_upgrade.go @@ -122,7 +122,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D } now := rep.Time() - mp := rep.ContentReader().ContentFormat().MutableParameters + mp := rep.ContentReader().ContentFormat().GetMutableParameters() openOpts := c.svc.optionsFromFlags(ctx) l := &repo.UpgradeLockIntent{ OwnerID: openOpts.UpgradeOwnerID, @@ -163,7 +163,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D // skipped until the lock is fully established. func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.DirectRepositoryWriter) error { cf := rep.ContentReader().ContentFormat() - if cf.MutableParameters.EpochParameters.Enabled { + if cf.GetEpochManagerEnabled() { log(ctx).Infof("Repository indices have already been migrated to the epoch format, no need to drain other clients") l, err := rep.GetUpgradeLockIntent(ctx) @@ -250,7 +250,7 @@ func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo // repository. This phase runs after the lock has been acquired in one of the // prior phases. func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectRepositoryWriter) error { - mp := rep.ContentReader().ContentFormat().MutableParameters + mp := rep.ContentReader().ContentFormat().GetMutableParameters() if mp.EpochParameters.Enabled { // nothing to upgrade on format, so let the next action commit the upgraded format blob return nil diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index 79407a187..54a2cfcef 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -29,6 +29,33 @@ maxRefreshAttemptSleepExponent = 1.5 ) +// ParametersProvider provides epoch manager parameters. +type ParametersProvider interface { + // whether epoch manager is enabled, must be true. + GetEpochManagerEnabled() bool + + // how frequently each client will list blobs to determine the current epoch. + GetEpochRefreshFrequency() time.Duration + + // number of epochs between full checkpoints. + GetEpochFullCheckpointFrequency() int + + // do not delete uncompacted blobs if the corresponding compacted blob age is less than this. + GetEpochCleanupSafetyMargin() time.Duration + + // minimum duration of an epoch + GetMinEpochDuration() time.Duration + + // advance epoch if number of files exceeds this + GetEpochAdvanceOnCountThreshold() int + + // advance epoch if total size of files exceeds this. + GetEpochAdvanceOnTotalSizeBytesThreshold() int64 + + // number of blobs to delete in parallel during cleanup + GetEpochDeleteParallelism() int +} + // ErrVerySlowIndexWrite is returned by WriteIndex if a write takes more than 2 epochs (usually >48h). // This is theoretically possible with laptops going to sleep, etc. var ErrVerySlowIndexWrite = errors.Errorf("extremely slow index write - index write took more than two epochs") @@ -60,6 +87,46 @@ type Parameters struct { DeleteParallelism int } +// GetEpochManagerEnabled returns whether epoch manager is enabled, must be true. +func (p *Parameters) GetEpochManagerEnabled() bool { + return p.Enabled +} + +// GetEpochRefreshFrequency determines how frequently each client will list blobs to determine the current epoch. +func (p *Parameters) GetEpochRefreshFrequency() time.Duration { + return p.EpochRefreshFrequency +} + +// GetEpochFullCheckpointFrequency returns the number of epochs between full checkpoints. +func (p *Parameters) GetEpochFullCheckpointFrequency() int { + return p.FullCheckpointFrequency +} + +// GetEpochCleanupSafetyMargin returns safety margin to prevent uncompacted blobs from being deleted if the corresponding compacted blob age is less than this. +func (p *Parameters) GetEpochCleanupSafetyMargin() time.Duration { + return p.CleanupSafetyMargin +} + +// GetMinEpochDuration returns the minimum duration of an epoch. +func (p *Parameters) GetMinEpochDuration() time.Duration { + return p.MinEpochDuration +} + +// GetEpochAdvanceOnCountThreshold returns the number of files above which epoch should be advanced. +func (p *Parameters) GetEpochAdvanceOnCountThreshold() int { + return p.EpochAdvanceOnCountThreshold +} + +// GetEpochAdvanceOnTotalSizeBytesThreshold returns the total size of files above which the epoch should be advanced. +func (p *Parameters) GetEpochAdvanceOnTotalSizeBytesThreshold() int64 { + return p.EpochAdvanceOnTotalSizeBytesThreshold +} + +// GetEpochDeleteParallelism returns the number of blobs to delete in parallel during cleanup. +func (p *Parameters) GetEpochDeleteParallelism() int { + return p.DeleteParallelism +} + // Validate validates epoch parameters. // nolint:gomnd func (p *Parameters) Validate() error { @@ -129,7 +196,7 @@ func (cs *CurrentSnapshot) isSettledEpochNumber(epoch int) bool { // Manager manages repository epochs. type Manager struct { - Params Parameters + Parameters ParametersProvider st blob.Storage compact CompactionFunc @@ -264,7 +331,7 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error // only delete blobs if a suitable replacement exists and has been written sufficiently // long ago. we don't want to delete blobs that are created too recently, because other clients // may have not observed them yet. - maxReplacementTime := maxTime.Add(-e.Params.CleanupSafetyMargin) + maxReplacementTime := maxTime.Add(-e.Parameters.GetEpochCleanupSafetyMargin()) eg.Go(func() error { return e.cleanupEpochMarkers(ctx, cs) @@ -289,7 +356,7 @@ func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) e } } - return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism), "error deleting index blob marker") + return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()), "error deleting index blob marker") } func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, maxReplacementTime time.Time) error { @@ -310,7 +377,7 @@ func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, max } } - return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism), "error deleting watermark blobs") + return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()), "error deleting watermark blobs") } // CleanupSupersededIndexes cleans up the indexes which have been superseded by compacted ones. @@ -331,7 +398,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error { // only delete blobs if a suitable replacement exists and has been written sufficiently // long ago. we don't want to delete blobs that are created too recently, because other clients // may have not observed them yet. - maxReplacementTime := maxTime.Add(-e.Params.CleanupSafetyMargin) + maxReplacementTime := maxTime.Add(-e.Parameters.GetEpochCleanupSafetyMargin()) e.log.Debugw("Cleaning up superseded index blobs...", "maxReplacementTime", maxReplacementTime) @@ -353,7 +420,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error { } } - if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.Params.DeleteParallelism); err != nil { + if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()); err != nil { return errors.Wrap(err, "unable to delete uncompacted blobs") } @@ -378,7 +445,7 @@ func (e *Manager) refreshLocked(ctx context.Context) error { nextDelayTime := initiaRefreshAttemptSleep - if !e.Params.Enabled { + if !e.Parameters.GetEpochManagerEnabled() { return errors.Errorf("epoch manager not enabled") } @@ -497,7 +564,7 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs firstNonRangeCompacted = cs.LongestRangeCheckpointSets[len(cs.LongestRangeCheckpointSets)-1].MaxEpoch + 1 } - if latestSettled-firstNonRangeCompacted < e.Params.FullCheckpointFrequency { + if latestSettled-firstNonRangeCompacted < e.Parameters.GetEpochFullCheckpointFrequency() { e.log.Debugf("not generating range checkpoint") return @@ -575,7 +642,7 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { EpochStartTime: map[int]time.Time{}, UncompactedEpochSets: map[int][]blob.Metadata{}, SingleEpochCompactionSets: map[int][]blob.Metadata{}, - ValidUntil: e.timeFunc().Add(e.Params.EpochRefreshFrequency), + ValidUntil: e.timeFunc().Add(e.Parameters.GetEpochRefreshFrequency()), } e.log.Debugf("refreshAttemptLocked") @@ -613,7 +680,7 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { len(ues[cs.WriteEpoch+1]), cs.ValidUntil.Format(time.RFC3339Nano)) - if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], e.Params.MinEpochDuration, e.Params.EpochAdvanceOnCountThreshold, e.Params.EpochAdvanceOnTotalSizeBytesThreshold) { + if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], e.Parameters.GetMinEpochDuration(), e.Parameters.GetEpochAdvanceOnCountThreshold(), e.Parameters.GetEpochAdvanceOnTotalSizeBytesThreshold()) { if err := e.advanceEpoch(ctx, cs); err != nil { return errors.Wrap(err, "error advancing epoch") } @@ -700,7 +767,7 @@ func (e *Manager) WriteIndex(ctx context.Context, dataShards map[blob.ID]blob.By for { // make sure we have at least 75% of remaining time // nolint:gomnd - cs, err := e.committedState(ctx, 3*e.Params.EpochRefreshFrequency/4) + cs, err := e.committedState(ctx, 3*e.Parameters.GetEpochRefreshFrequency()/4) if err != nil { return nil, errors.Wrap(err, "error getting committed state") } @@ -924,13 +991,13 @@ func rangeCheckpointBlobPrefix(epoch1, epoch2 int) blob.ID { } // NewManager creates new epoch manager. -func NewManager(st blob.Storage, params Parameters, compactor CompactionFunc, log logging.Logger, timeNow func() time.Time) *Manager { +func NewManager(st blob.Storage, paramProvider ParametersProvider, compactor CompactionFunc, log logging.Logger, timeNow func() time.Time) *Manager { return &Manager{ st: st, log: log, compact: compactor, timeFunc: timeNow, - Params: params, + Parameters: paramProvider, getCompleteIndexSetTooSlow: new(int32), committedStateRefreshTooSlow: new(int32), writeIndexTooSlow: new(int32), diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index fb21a00f7..576d8cd1b 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -92,7 +92,7 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv { st = fs st = logging.NewWrapper(st, testlogging.NewTestLogger(t), "[STORAGE] ") te := &epochManagerTestEnv{unloggedst: unloggedst, st: st, ft: ft} - m := NewManager(te.st, Parameters{ + m := NewManager(te.st, &Parameters{ Enabled: true, EpochRefreshFrequency: 20 * time.Minute, FullCheckpointFrequency: 7, @@ -121,7 +121,7 @@ func (te *epochManagerTestEnv) another() *epochManagerTestEnv { faultyStorage: te.faultyStorage, } - te2.mgr = NewManager(te2.st, te.mgr.Params, te2.compact, te.mgr.log, te.mgr.timeFunc) + te2.mgr = NewManager(te2.st, te.mgr.Parameters, te2.compact, te.mgr.log, te.mgr.timeFunc) return te2 } @@ -577,7 +577,7 @@ func verifySequentialWrites(t *testing.T, te *epochManagerTestEnv) { func TestIndexEpochManager_Disabled(t *testing.T) { te := newTestEnv(t) - te.mgr.Params.Enabled = false + (te.mgr.Parameters).(*Parameters).Enabled = false _, err := te.mgr.Current(testlogging.Context(t)) require.Error(t, err) diff --git a/internal/server/api_repo.go b/internal/server/api_repo.go index 806658917..42475dcdc 100644 --- a/internal/server/api_repo.go +++ b/internal/server/api_repo.go @@ -34,8 +34,8 @@ func handleRepoParameters(ctx context.Context, rc requestContext) (interface{}, } rp := &remoterepoapi.Parameters{ - HashFunction: dr.ContentReader().ContentFormat().Hash, - HMACSecret: dr.ContentReader().ContentFormat().HMACSecret, + HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(), + HMACSecret: dr.ContentReader().ContentFormat().GetHmacSecret(), Format: dr.ObjectFormat(), SupportsContentCompression: dr.ContentReader().SupportsContentCompression(), } @@ -56,9 +56,9 @@ func handleRepoStatus(ctx context.Context, rc requestContext) (interface{}, *api return &serverapi.StatusResponse{ Connected: true, ConfigFile: dr.ConfigFilename(), - Hash: dr.ContentReader().ContentFormat().Hash, - Encryption: dr.ContentReader().ContentFormat().Encryption, - MaxPackSize: dr.ContentReader().ContentFormat().MaxPackSize, + Hash: dr.ContentReader().ContentFormat().GetHashFunction(), + Encryption: dr.ContentReader().ContentFormat().GetEncryptionAlgorithm(), + MaxPackSize: dr.ContentReader().ContentFormat().MaxPackBlobSize(), Splitter: dr.ObjectFormat().Splitter, Storage: dr.BlobReader().ConnectionInfo().Type, ClientOptions: dr.ClientOptions(), diff --git a/internal/server/grpc_session.go b/internal/server/grpc_session.go index 1e334a09b..2bab85e65 100644 --- a/internal/server/grpc_session.go +++ b/internal/server/grpc_session.go @@ -459,8 +459,8 @@ func (s *Server) handleInitialSessionHandshake(srv grpcapi.KopiaRepository_Sessi Response: &grpcapi.SessionResponse_InitializeSession{ InitializeSession: &grpcapi.InitializeSessionResponse{ Parameters: &grpcapi.RepositoryParameters{ - HashFunction: dr.ContentReader().ContentFormat().Hash, - HmacSecret: dr.ContentReader().ContentFormat().HMACSecret, + HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(), + HmacSecret: dr.ContentReader().ContentFormat().GetHmacSecret(), Splitter: dr.ObjectFormat().Splitter, SupportsContentCompression: dr.ContentReader().SupportsContentCompression(), }, diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 02adfa56c..188d776fa 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -21,7 +21,6 @@ "github.com/kopia/kopia/repo/blob/filesystem" "github.com/kopia/kopia/repo/blob/sharded" "github.com/kopia/kopia/repo/compression" - "github.com/kopia/kopia/repo/content/index" "github.com/kopia/kopia/repo/hashing" "github.com/kopia/kopia/repo/logging" ) @@ -70,6 +69,11 @@ type indexBlobManager interface { invalidate(ctx context.Context) } +// CrypterProvider returns the crypter to use for encrypting contents and blobs. +type CrypterProvider interface { + Crypter() *Crypter +} + // SharedManager is responsible for read-only access to committed data. type SharedManager struct { // +checkatomic @@ -80,7 +84,6 @@ type SharedManager struct { Stats *Stats st blob.Storage - indexBlobManager indexBlobManager // points at either indexBlobManagerV0 or indexBlobManagerV1 indexBlobManagerV0 *indexBlobManagerV0 indexBlobManagerV1 *indexBlobManagerV1 @@ -88,7 +91,6 @@ type SharedManager struct { metadataCache cache.ContentCache indexBlobCache *cache.PersistentCache committedContents *committedContentIndex - crypter *Crypter enc *encryptedBlobMgr timeNow func() time.Time @@ -101,16 +103,12 @@ type SharedManager struct { // +checklocks:indexesLock refreshIndexesAfter time.Time - format FormattingOptions + format FormattingOptionsProvider + checkInvariantsOnUnlock bool - writeFormatVersion int32 // format version to write - maxPackSize int minPreambleLength int maxPreambleLength int paddingUnit int - repositoryFormatBytes []byte - indexVersion int - indexShardSize int // logger where logs should be written log logging.Logger @@ -123,7 +121,7 @@ type SharedManager struct { // Crypter returns the crypter. func (sm *SharedManager) Crypter() *Crypter { - return sm.crypter + return sm.format.Crypter() } func (sm *SharedManager) readPackFileLocalIndex(ctx context.Context, packFile blob.ID, packFileLength int64, output *gather.WriteBuffer) error { @@ -198,13 +196,13 @@ func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error { } if i > 0 { - sm.indexBlobManager.flushCache(ctx) + sm.indexBlobManager().flushCache(ctx) sm.log.Debugf("encountered NOT_FOUND when loading, sleeping %v before retrying #%v", nextSleepTime, i) time.Sleep(nextSleepTime) nextSleepTime *= 2 } - indexBlobs, ignoreDeletedBefore, err := sm.indexBlobManager.listActiveIndexBlobs(ctx) + indexBlobs, ignoreDeletedBefore, err := sm.indexBlobManager().listActiveIndexBlobs(ctx) if err != nil { return errors.Wrap(err, "error listing index blobs") } @@ -246,6 +244,14 @@ func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache { return sm.contentCache } +func (sm *SharedManager) indexBlobManager() indexBlobManager { + if sm.format.GetEpochManagerEnabled() { + return sm.indexBlobManagerV1 + } + + return sm.indexBlobManagerV0 +} + func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info, output *gather.WriteBuffer) error { sm.Stats.readContent(payload.Length()) @@ -285,7 +291,7 @@ func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info, } func (sm *SharedManager) decryptAndVerify(encrypted gather.Bytes, iv []byte, output *gather.WriteBuffer) error { - if err := sm.crypter.Encryptor.Decrypt(encrypted, iv, output); err != nil { + if err := sm.format.Crypter().Encryptor.Decrypt(encrypted, iv, output); err != nil { sm.Stats.foundInvalidContent() return errors.Wrap(err, "decrypt") } @@ -316,7 +322,7 @@ func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) ( return result, nil } - blobs, _, err := sm.indexBlobManager.listActiveIndexBlobs(ctx) + blobs, _, err := sm.indexBlobManager().listActiveIndexBlobs(ctx) // nolint:wrapcheck return blobs, err @@ -429,54 +435,49 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca } sm.enc = &encryptedBlobMgr{ - st: cachedSt, - crypter: sm.crypter, - indexBlobCache: indexBlobCache, - log: sm.namedLogger("encrypted-blob-manager"), + st: cachedSt, + crypterProvider: sm.format, + indexBlobCache: indexBlobCache, + log: sm.namedLogger("encrypted-blob-manager"), } // set up legacy index blob manager sm.indexBlobManagerV0 = &indexBlobManagerV0{ - st: cachedSt, - enc: sm.enc, - timeNow: sm.timeNow, - maxPackSize: sm.maxPackSize, - indexVersion: sm.indexVersion, - indexShardSize: sm.indexShardSize, - log: sm.namedLogger("index-blob-manager"), + st: cachedSt, + enc: sm.enc, + timeNow: sm.timeNow, + formattingOptions: sm.format, + log: sm.namedLogger("index-blob-manager"), } // set up new index blob manager sm.indexBlobManagerV1 = &indexBlobManagerV1{ - st: cachedSt, - enc: sm.enc, - timeNow: sm.timeNow, - maxPackSize: sm.maxPackSize, - indexShardSize: sm.indexShardSize, - indexVersion: sm.indexVersion, - log: sm.namedLogger("index-blob-manager"), + st: cachedSt, + enc: sm.enc, + timeNow: sm.timeNow, + formattingOptions: sm.format, + log: sm.namedLogger("index-blob-manager"), } - sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, sm.format.EpochParameters, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow) - // select active index blob manager based on parameters - if sm.format.EpochParameters.Enabled { - sm.indexBlobManager = sm.indexBlobManagerV1 - } else { - sm.indexBlobManager = sm.indexBlobManagerV0 - } + sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, sm.format, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow) // once everything is ready, set it up sm.contentCache = dataCache sm.metadataCache = metadataCache sm.indexBlobCache = indexBlobCache - sm.committedContents = newCommittedContentIndex(caching, uint32(sm.crypter.Encryptor.Overhead()), sm.indexVersion, sm.enc.getEncryptedBlob, sm.namedLogger("committed-content-index"), caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge)) + sm.committedContents = newCommittedContentIndex(caching, + uint32(sm.format.Crypter().Encryptor.Overhead()), + sm.format.WriteIndexVersion(), + sm.enc.getEncryptedBlob, + sm.namedLogger("committed-content-index"), + caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge)) return nil } // EpochManager returns the epoch manager. func (sm *SharedManager) EpochManager() (*epoch.Manager, bool) { - ibm1, ok := sm.indexBlobManager.(*indexBlobManagerV1) + ibm1, ok := sm.indexBlobManager().(*indexBlobManagerV1) if !ok { return nil, false } @@ -546,36 +547,14 @@ func (sm *SharedManager) shouldRefreshIndexes() bool { } // NewSharedManager returns SharedManager that is used by SessionWriteManagers on top of a repository. -func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, opts *ManagerOptions) (*SharedManager, error) { +func NewSharedManager(ctx context.Context, st blob.Storage, prov FormattingOptionsProvider, caching *CachingOptions, opts *ManagerOptions) (*SharedManager, error) { opts = opts.CloneOrDefault() if opts.TimeNow == nil { opts.TimeNow = clock.Now } - if f.Version < minSupportedReadVersion || f.Version > currentWriteVersion { - return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", f.Version, minSupportedReadVersion, maxSupportedReadVersion) - } - - if f.Version < minSupportedWriteVersion || f.Version > currentWriteVersion { - return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", f.Version, minSupportedWriteVersion, maxSupportedWriteVersion) - } - - crypter, err := CreateCrypter(f) - if err != nil { - return nil, err - } - - actualIndexVersion := f.IndexVersion - if actualIndexVersion == 0 { - actualIndexVersion = legacyIndexVersion - } - - if actualIndexVersion < index.Version1 || actualIndexVersion > index.Version2 { - return nil, errors.Errorf("index version %v is not supported", actualIndexVersion) - } - // create internal logger that will be writing logs as encrypted repository blobs. - ilm := newInternalLogManager(ctx, st, crypter) + ilm := newInternalLogManager(ctx, st, prov) // sharedBaseLogger writes to the both context and internal log // and is used as a base for all content manager components. @@ -588,19 +567,13 @@ func NewSharedManager(ctx context.Context, st blob.Storage, f *FormattingOptions sm := &SharedManager{ st: st, - crypter: crypter, Stats: new(Stats), timeNow: opts.TimeNow, - format: *f, - maxPackSize: f.MaxPackSize, + format: prov, minPreambleLength: defaultMinPreambleLength, maxPreambleLength: defaultMaxPreambleLength, paddingUnit: defaultPaddingUnit, - repositoryFormatBytes: opts.RepositoryFormatBytes, checkInvariantsOnUnlock: os.Getenv("KOPIA_VERIFY_INVARIANTS") != "", - writeFormatVersion: int32(f.Version), - indexVersion: actualIndexVersion, - indexShardSize: defaultIndexShardSize, internalLogManager: ilm, internalLogger: internalLog, contextLogger: logging.Module(FormatLogModule)(ctx), diff --git a/repo/content/content_formatter_test.go b/repo/content/content_formatter_test.go index 7bcbc7a62..c0f74d78f 100644 --- a/repo/content/content_formatter_test.go +++ b/repo/content/content_formatter_test.go @@ -100,7 +100,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp keyTime := map[blob.ID]time.Time{} st := blobtesting.NewMapStorage(data, keyTime, nil) - bm, err := NewManagerForTesting(testlogging.Context(t), st, &FormattingOptions{ + bm, err := NewManagerForTesting(testlogging.Context(t), st, mustCreateFormatProvider(t, &FormattingOptions{ Hash: hashAlgo, Encryption: encryptionAlgo, HMACSecret: hmacSecret, @@ -109,7 +109,7 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp MaxPackSize: maxPackSize, }, MasterKey: make([]byte, 32), // zero key, does not matter - }, nil, nil) + }), nil, nil) if err != nil { t.Errorf("can't create content manager with hash %v and encryption %v: %v", hashAlgo, encryptionAlgo, err.Error()) return @@ -159,3 +159,12 @@ func verifyEndToEndFormatter(ctx context.Context, t *testing.T, hashAlgo, encryp } } } + +func mustCreateFormatProvider(t *testing.T, f *FormattingOptions) FormattingOptionsProvider { + t.Helper() + + fop, err := NewFormattingOptionsProvider(f, nil) + require.NoError(t, err) + + return fop +} diff --git a/repo/content/content_formatting_options.go b/repo/content/content_formatting_options.go index 666e1720b..b91a07216 100644 --- a/repo/content/content_formatting_options.go +++ b/repo/content/content_formatting_options.go @@ -9,6 +9,8 @@ "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content/index" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/hashing" ) const ( @@ -61,6 +63,16 @@ func (f *FormattingOptions) ResolveFormatVersion() error { } } +// GetMutableParameters implements FormattingOptionsProvider. +func (f *FormattingOptions) GetMutableParameters() MutableParameters { + return f.MutableParameters +} + +// SupportsPasswordChange implements FormattingOptionsProvider. +func (f *FormattingOptions) SupportsPasswordChange() bool { + return f.EnablePasswordChange +} + // MutableParameters represents parameters of the content manager that can be mutated after the repository // is created. type MutableParameters struct { @@ -111,6 +123,149 @@ func (f *FormattingOptions) GetHmacSecret() []byte { return f.HMACSecret } +// FormattingOptionsProvider provides current formatting options. The options returned +// should not be cached for more than a few seconds as they are subject to change. +type FormattingOptionsProvider interface { + epoch.ParametersProvider + IndexFormattingOptions + CrypterProvider + + encryption.Parameters + hashing.Parameters + + GetMutableParameters() MutableParameters + GetMasterKey() []byte + SupportsPasswordChange() bool + FormatVersion() FormatVersion + MaxPackBlobSize() int + RepositoryFormatBytes() []byte + Struct() FormattingOptions +} + +type formattingOptionsProvider struct { + *FormattingOptions + + crypter *Crypter + actualFormatVersion FormatVersion + actualIndexVersion int + formatBytes []byte +} + +func (f *formattingOptionsProvider) FormatVersion() FormatVersion { + return f.Version +} + +// whether epoch manager is enabled, must be true. +func (f *formattingOptionsProvider) GetEpochManagerEnabled() bool { + return f.EpochParameters.Enabled +} + +// how frequently each client will list blobs to determine the current epoch. +func (f *formattingOptionsProvider) GetEpochRefreshFrequency() time.Duration { + return f.EpochParameters.EpochRefreshFrequency +} + +// number of epochs between full checkpoints. +func (f *formattingOptionsProvider) GetEpochFullCheckpointFrequency() int { + return f.EpochParameters.FullCheckpointFrequency +} + +// GetEpochCleanupSafetyMargin returns safety margin to prevent uncompacted blobs from being deleted if the corresponding compacted blob age is less than this. +func (f *formattingOptionsProvider) GetEpochCleanupSafetyMargin() time.Duration { + return f.EpochParameters.CleanupSafetyMargin +} + +// GetMinEpochDuration returns the minimum duration of an epoch. +func (f *formattingOptionsProvider) GetMinEpochDuration() time.Duration { + return f.EpochParameters.MinEpochDuration +} + +// GetEpochAdvanceOnCountThreshold returns the number of files above which epoch should be advanced. +func (f *formattingOptionsProvider) GetEpochAdvanceOnCountThreshold() int { + return f.EpochParameters.EpochAdvanceOnCountThreshold +} + +// GetEpochAdvanceOnTotalSizeBytesThreshold returns the total size of files above which the epoch should be advanced. +func (f *formattingOptionsProvider) GetEpochAdvanceOnTotalSizeBytesThreshold() int64 { + return f.EpochParameters.EpochAdvanceOnTotalSizeBytesThreshold +} + +// GetEpochDeleteParallelism returns the number of blobs to delete in parallel during cleanup. +func (f *formattingOptionsProvider) GetEpochDeleteParallelism() int { + return f.EpochParameters.DeleteParallelism +} + +func (f *formattingOptionsProvider) Struct() FormattingOptions { + return *f.FormattingOptions +} + +// NewFormattingOptionsProvider validates the provided formatting options and returns static +// FormattingOptionsProvider based on them. +func NewFormattingOptionsProvider(f *FormattingOptions, formatBytes []byte) (FormattingOptionsProvider, error) { + formatVersion := f.Version + + if formatVersion < minSupportedReadVersion || formatVersion > currentWriteVersion { + return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", formatVersion, minSupportedReadVersion, maxSupportedReadVersion) + } + + if formatVersion < minSupportedWriteVersion || formatVersion > currentWriteVersion { + return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", formatVersion, minSupportedWriteVersion, maxSupportedWriteVersion) + } + + actualIndexVersion := f.IndexVersion + if actualIndexVersion == 0 { + actualIndexVersion = legacyIndexVersion + } + + if actualIndexVersion < index.Version1 || actualIndexVersion > index.Version2 { + return nil, errors.Errorf("index version %v is not supported", actualIndexVersion) + } + + crypter, err := CreateCrypter(f) + if err != nil { + return nil, errors.Wrap(err, "unable to create crypter") + } + + return &formattingOptionsProvider{ + FormattingOptions: f, + + crypter: crypter, + actualIndexVersion: actualIndexVersion, + actualFormatVersion: f.Version, + formatBytes: formatBytes, + }, nil +} + +func (f *formattingOptionsProvider) Crypter() *Crypter { + return f.crypter +} + +func (f *formattingOptionsProvider) WriteIndexVersion() int { + return f.actualIndexVersion +} + +func (f *formattingOptionsProvider) MaxIndexBlobSize() int64 { + return int64(f.MaxPackSize) +} + +func (f *formattingOptionsProvider) MaxPackBlobSize() int { + return f.MaxPackSize +} + +func (f *formattingOptionsProvider) GetEpochManagerParameters() epoch.Parameters { + return f.EpochParameters +} + +func (f *formattingOptionsProvider) IndexShardSize() int { + return defaultIndexShardSize +} + +func (f *formattingOptionsProvider) RepositoryFormatBytes() []byte { + return f.formatBytes +} + +var _ FormattingOptionsProvider = (*formattingOptionsProvider)(nil) + // BlobCfgBlob is the content for `kopia.blobcfg` blob which contains the blob // management configuration options. type BlobCfgBlob struct { diff --git a/repo/content/content_index_recovery.go b/repo/content/content_index_recovery.go index ca50479e9..ee4781c5f 100644 --- a/repo/content/content_index_recovery.go +++ b/repo/content/content_index_recovery.go @@ -22,7 +22,7 @@ func (bm *WriteManager) RecoverIndexFromPackBlob(ctx context.Context, packFile b return nil, err } - ndx, err := index.Open(localIndexBytes.Bytes().ToByteSlice(), nil, uint32(bm.crypter.Encryptor.Overhead())) + ndx, err := index.Open(localIndexBytes.Bytes().ToByteSlice(), nil, uint32(bm.format.Crypter().Encryptor.Overhead())) if err != nil { return nil, errors.Errorf("unable to open index in file %v", packFile) } @@ -171,7 +171,7 @@ func decodePostamble(payload []byte) *packContentPostamble { } func (sm *SharedManager) buildLocalIndex(pending index.Builder, output *gather.WriteBuffer) error { - if err := pending.Build(output, sm.indexVersion); err != nil { + if err := pending.Build(output, sm.format.WriteIndexVersion()); err != nil { return errors.Wrap(err, "unable to build local index") } @@ -195,7 +195,7 @@ func (sm *SharedManager) appendPackFileIndexRecoveryData(pending index.Builder, var encryptedLocalIndex gather.WriteBuffer defer encryptedLocalIndex.Close() - if err := sm.crypter.Encryptor.Encrypt(localIndex.Bytes(), localIndexIV, &encryptedLocalIndex); err != nil { + if err := sm.format.Crypter().Encryptor.Encrypt(localIndex.Bytes(), localIndexIV, &encryptedLocalIndex); err != nil { return errors.Wrap(err, "encryption error") } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 6f453a542..26e7208a6 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -331,7 +331,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat PackBlobID: pp.packBlobID, PackOffset: uint32(pp.currentPackData.Length()), TimestampSeconds: bm.contentWriteTime(previousWriteTime), - FormatVersion: byte(bm.writeFormatVersion), + FormatVersion: byte(bm.format.FormatVersion()), OriginalLength: uint32(data.Length()), } @@ -345,7 +345,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat pp.currentPackItems[contentID] = info - shouldWrite := pp.currentPackData.Length() >= bm.maxPackSize + shouldWrite := pp.currentPackData.Length() >= bm.format.MaxPackBlobSize() if shouldWrite { // we're about to write to storage without holding a lock // remove from pendingPacks so other goroutine tries to mess with this pending pack. @@ -413,7 +413,7 @@ func (bm *WriteManager) verifyPackIndexBuilderLocked() { bm.assertInvariant(cpi.GetPackBlobID() == "", "content can't be both deleted and have a pack content: %v", cpi.GetContentID()) } else { bm.assertInvariant(cpi.GetPackBlobID() != "", "content that's not deleted must have a pack content: %+v", cpi) - bm.assertInvariant(cpi.GetFormatVersion() == byte(bm.writeFormatVersion), "content that's not deleted must have a valid format version: %+v", cpi) + bm.assertInvariant(cpi.GetFormatVersion() == byte(bm.format.FormatVersion()), "content that's not deleted must have a valid format version: %+v", cpi) } bm.assertInvariant(cpi.GetTimestampSeconds() != 0, "content has no timestamp: %v", cpi.GetContentID()) @@ -438,7 +438,7 @@ func (bm *WriteManager) writeIndexBlobs(ctx context.Context, dataShards []gather defer span.End() // nolint:wrapcheck - return bm.indexBlobManager.writeIndexBlobs(ctx, dataShards, sessionID) + return bm.indexBlobManager().writeIndexBlobs(ctx, dataShards, sessionID) } // +checklocksread:bm.indexesLock @@ -461,7 +461,7 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error { if len(bm.packIndexBuilder) > 0 { _, span2 := tracer.Start(ctx, "BuildShards") - dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(bm.indexVersion, true, bm.indexShardSize) + dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(bm.format.WriteIndexVersion(), true, bm.format.IndexShardSize()) span2.End() @@ -592,7 +592,7 @@ func removePendingPack(slice []*pendingPackInfo, pp *pendingPackInfo) []*pending } // ContentFormat returns formatting options. -func (bm *WriteManager) ContentFormat() FormattingOptions { +func (bm *WriteManager) ContentFormat() FormattingOptionsProvider { return bm.format } @@ -743,7 +743,7 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr return nil, errors.Wrap(err, "unable to read crypto bytes") } - b.Append(bm.repositoryFormatBytes) + b.Append(bm.format.RepositoryFormatBytes()) // nolint:gosec if err := writeRandomBytesToBuffer(b, rand.Intn(bm.maxPreambleLength-bm.minPreambleLength+1)+bm.minPreambleLength); err != nil { @@ -762,7 +762,7 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr // SupportsContentCompression returns true if content manager supports content-compression. func (bm *WriteManager) SupportsContentCompression() bool { - return bm.format.IndexVersion >= index.Version2 + return bm.format.WriteIndexVersion() >= index.Version2 } // WriteContent saves a given content of data to a pack group with a provided name and returns a contentID @@ -938,7 +938,7 @@ func (o *ManagerOptions) CloneOrDefault() *ManagerOptions { } // NewManagerForTesting creates new content manager with given packing options and a formatter. -func NewManagerForTesting(ctx context.Context, st blob.Storage, f *FormattingOptions, caching *CachingOptions, options *ManagerOptions) (*WriteManager, error) { +func NewManagerForTesting(ctx context.Context, st blob.Storage, f FormattingOptionsProvider, caching *CachingOptions, options *ManagerOptions) (*WriteManager, error) { options = options.CloneOrDefault() if options.TimeNow == nil { options.TimeNow = clock.Now diff --git a/repo/content/content_manager_indexes.go b/repo/content/content_manager_indexes.go index 9074f53e5..af3ba7237 100644 --- a/repo/content/content_manager_indexes.go +++ b/repo/content/content_manager_indexes.go @@ -38,7 +38,7 @@ func (sm *SharedManager) Refresh(ctx context.Context) error { sm.log.Debugf("Refresh started") - sm.indexBlobManager.invalidate(ctx) + sm.indexBlobManager().invalidate(ctx) timer := timetrack.StartTimer() @@ -57,7 +57,7 @@ func (sm *SharedManager) CompactIndexes(ctx context.Context, opt CompactOptions) sm.log.Debugf("CompactIndexes(%+v)", opt) - if err := sm.indexBlobManager.compact(ctx, opt); err != nil { + if err := sm.indexBlobManager().compact(ctx, opt); err != nil { return errors.Wrap(err, "error performing compaction") } diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index 14e7d74c7..52a194b06 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -30,14 +30,14 @@ func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes // If the content is prefixed (which represents Kopia's own metadata as opposed to user data), // and we're on V2 format or greater, enable internal compression even when not requested. - if contentID.HasPrefix() && comp == NoCompression && sm.format.IndexVersion >= index.Version2 { + if contentID.HasPrefix() && comp == NoCompression && sm.format.WriteIndexVersion() >= index.Version2 { // 'zstd-fastest' has a good mix of being fast, low memory usage and high compression for JSON. comp = compression.HeaderZstdFastest } // nolint:nestif if comp != NoCompression { - if sm.format.IndexVersion < index.Version2 { + if sm.format.WriteIndexVersion() < index.Version2 { return NoCompression, errors.Errorf("compression is not enabled for this repository") } @@ -62,7 +62,7 @@ func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes } } - if err := sm.crypter.Encryptor.Encrypt(data, iv, output); err != nil { + if err := sm.Crypter().Encryptor.Encrypt(data, iv, output); err != nil { return NoCompression, errors.Wrap(err, "unable to encrypt") } @@ -181,7 +181,7 @@ func (sm *SharedManager) writePackFileNotLocked(ctx context.Context, packFile bl func (sm *SharedManager) hashData(output []byte, data gather.Bytes) []byte { // Hash the content and compute encryption key. - contentID := sm.crypter.HashFunction(output, data) + contentID := sm.format.Crypter().HashFunction(output, data) sm.Stats.hashedContent(data.Length()) return contentID diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 48ff15309..945420770 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -354,13 +354,13 @@ func (s *contentManagerSuite) TestContentManagerFailedToWritePack(t *testing.T) ta := faketime.NewTimeAdvance(fakeTime, 0) - bm, err := NewManagerForTesting(testlogging.Context(t), st, &FormattingOptions{ + bm, err := NewManagerForTesting(testlogging.Context(t), st, mustCreateFormatProvider(t, &FormattingOptions{ Hash: "HMAC-SHA256-128", Encryption: "AES256-GCM-HMAC-SHA256", MutableParameters: s.mutableParameters, HMACSecret: []byte("foo"), MasterKey: []byte("0123456789abcdef0123456789abcdef"), - }, nil, &ManagerOptions{TimeNow: ta.NowFunc()}) + }), nil, &ManagerOptions{TimeNow: ta.NowFunc()}) if err != nil { t.Fatalf("can't create bm: %v", err) } @@ -1877,11 +1877,11 @@ func (s *contentManagerSuite) verifyVersionCompat(t *testing.T, writeVersion For data := blobtesting.DataMap{} st := blobtesting.NewMapStorage(data, nil, nil) - mgr := s.newTestContentManager(t, st) + mgr := s.newTestContentManagerWithTweaks(t, st, &contentManagerTestTweaks{ + formatVersion: writeVersion, + }) defer mgr.Close(ctx) - mgr.writeFormatVersion = int32(writeVersion) - dataSet := map[ID][]byte{} for i := 0; i < 3000000; i = (i + 1) * 2 { @@ -2331,8 +2331,9 @@ type contentManagerTestTweaks struct { CachingOptions ManagerOptions - indexVersion int - maxPackSize int + indexVersion int + maxPackSize int + formatVersion FormatVersion } func (s *contentManagerSuite) newTestContentManagerWithTweaks(t *testing.T, st blob.Storage, tweaks *contentManagerTestTweaks) *WriteManager { @@ -2357,13 +2358,17 @@ func (s *contentManagerSuite) newTestContentManagerWithTweaks(t *testing.T, st b mp.MaxPackSize = mps } + if tweaks.formatVersion != 0 { + mp.Version = tweaks.formatVersion + } + ctx := testlogging.Context(t) - fo := &FormattingOptions{ + fo := mustCreateFormatProvider(t, &FormattingOptions{ Hash: "HMAC-SHA256", Encryption: "AES256-GCM-HMAC-SHA256", HMACSecret: hmacSecret, MutableParameters: mp, - } + }) bm, err := NewManagerForTesting(ctx, st, fo, &tweaks.CachingOptions, &tweaks.ManagerOptions) if err != nil { diff --git a/repo/content/content_reader.go b/repo/content/content_reader.go index 6b4d51788..79630fa4f 100644 --- a/repo/content/content_reader.go +++ b/repo/content/content_reader.go @@ -9,7 +9,7 @@ // Reader defines content read API. type Reader interface { SupportsContentCompression() bool - ContentFormat() FormattingOptions + ContentFormat() FormattingOptionsProvider GetContent(ctx context.Context, id ID) ([]byte, error) ContentInfo(ctx context.Context, id ID) (Info, error) IterateContents(ctx context.Context, opts IterateOptions, callback IterateCallback) error diff --git a/repo/content/encrypted_blob_mgr.go b/repo/content/encrypted_blob_mgr.go index dbcf17722..e9686555f 100644 --- a/repo/content/encrypted_blob_mgr.go +++ b/repo/content/encrypted_blob_mgr.go @@ -12,10 +12,10 @@ ) type encryptedBlobMgr struct { - st blob.Storage - crypter *Crypter - indexBlobCache *cache.PersistentCache - log logging.Logger + st blob.Storage + crypterProvider CrypterProvider + indexBlobCache *cache.PersistentCache + log logging.Logger } func (m *encryptedBlobMgr) getEncryptedBlob(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error { @@ -29,14 +29,14 @@ func (m *encryptedBlobMgr) getEncryptedBlob(ctx context.Context, blobID blob.ID, return errors.Wrap(err, "getContent") } - return m.crypter.DecryptBLOB(payload.Bytes(), blobID, output) + return m.crypterProvider.Crypter().DecryptBLOB(payload.Bytes(), blobID, output) } func (m *encryptedBlobMgr) encryptAndWriteBlob(ctx context.Context, data gather.Bytes, prefix blob.ID, sessionID SessionID) (blob.Metadata, error) { var data2 gather.WriteBuffer defer data2.Close() - blobID, err := m.crypter.EncryptBLOB(data, prefix, sessionID, &data2) + blobID, err := m.crypterProvider.Crypter().EncryptBLOB(data, prefix, sessionID, &data2) if err != nil { return blob.Metadata{}, errors.Wrap(err, "error encrypting") } diff --git a/repo/content/encrypted_blob_mgr_test.go b/repo/content/encrypted_blob_mgr_test.go index a4b233200..ed589e3c1 100644 --- a/repo/content/encrypted_blob_mgr_test.go +++ b/repo/content/encrypted_blob_mgr_test.go @@ -43,10 +43,10 @@ func TestEncryptedBlobManager(t *testing.T) { } ebm := encryptedBlobMgr{ - st: fs, - crypter: cr, - indexBlobCache: nil, - log: logging.NullLogger, + st: fs, + crypterProvider: staticCrypterProvider{cr}, + indexBlobCache: nil, + log: logging.NullLogger, } ctx := testlogging.Context(t) @@ -85,3 +85,11 @@ func TestEncryptedBlobManager(t *testing.T) { _, err = ebm.encryptAndWriteBlob(ctx, gather.FromSlice([]byte{1, 2, 3, 4}), "x", "session1") require.ErrorIs(t, err, someError2) } + +type staticCrypterProvider struct { + theCrypter *Crypter +} + +func (p staticCrypterProvider) Crypter() *Crypter { + return p.theCrypter +} diff --git a/repo/content/index_blob_manager_v0.go b/repo/content/index_blob_manager_v0.go index b8903ea5b..b817e95d4 100644 --- a/repo/content/index_blob_manager_v0.go +++ b/repo/content/index_blob_manager_v0.go @@ -45,14 +45,20 @@ type cleanupEntry struct { age time.Duration // not serialized, computed on load } +// IndexFormattingOptions provides options for formatting index blobs. +type IndexFormattingOptions interface { + MaxIndexBlobSize() int64 + WriteIndexVersion() int + IndexShardSize() int +} + type indexBlobManagerV0 struct { - st blob.Storage - enc *encryptedBlobMgr - timeNow func() time.Time - log logging.Logger - maxPackSize int - indexVersion int - indexShardSize int + st blob.Storage + enc *encryptedBlobMgr + timeNow func() time.Time + log logging.Logger + + formattingOptions IndexFormattingOptions } func (m *indexBlobManagerV0) listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, time.Time, error) { @@ -413,14 +419,14 @@ func (m *indexBlobManagerV0) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt C var mediumSizedBlobCount int for _, b := range indexBlobs { - if b.Length > int64(m.maxPackSize) && !opt.AllIndexes { + if b.Length > m.formattingOptions.MaxIndexBlobSize() && !opt.AllIndexes { continue } nonCompactedBlobs = append(nonCompactedBlobs, b) totalSizeNonCompactedBlobs += b.Length - if b.Length < int64(m.maxPackSize/verySmallContentFraction) { + if b.Length < m.formattingOptions.MaxIndexBlobSize()/verySmallContentFraction { verySmallBlobs = append(verySmallBlobs, b) totalSizeVerySmallBlobs += b.Length } else { @@ -468,7 +474,7 @@ func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs [ // we must do it after all input blobs have been merged, otherwise we may resurrect contents. m.dropContentsFromBuilder(bld, opt) - dataShards, cleanupShards, err := bld.BuildShards(m.indexVersion, false, m.indexShardSize) + dataShards, cleanupShards, err := bld.BuildShards(m.formattingOptions.WriteIndexVersion(), false, m.formattingOptions.IndexShardSize()) if err != nil { return errors.Wrap(err, "unable to build an index") } @@ -520,7 +526,7 @@ func addIndexBlobsToBuilder(ctx context.Context, enc *encryptedBlobMgr, bld inde return errors.Wrapf(err, "error getting index %q", indexBlobID) } - ndx, err := index.Open(data.ToByteSlice(), nil, uint32(enc.crypter.Encryptor.Overhead())) + ndx, err := index.Open(data.ToByteSlice(), nil, uint32(enc.crypterProvider.Crypter().Encryptor.Overhead())) if err != nil { return errors.Wrapf(err, "unable to open index blob %q", indexBlobID) } diff --git a/repo/content/index_blob_manager_v0_test.go b/repo/content/index_blob_manager_v0_test.go index 351d7ddc3..99e30686b 100644 --- a/repo/content/index_blob_manager_v0_test.go +++ b/repo/content/index_blob_manager_v0_test.go @@ -794,16 +794,14 @@ func newIndexBlobManagerForTesting(t *testing.T, st blob.Storage, localTimeNow f enc: &encryptedBlobMgr{ st: st, indexBlobCache: nil, - crypter: &Crypter{ + crypterProvider: staticCrypterProvider{&Crypter{ HashFunction: hf, Encryptor: enc, - }, + }}, log: log, }, - timeNow: localTimeNow, - maxPackSize: 20 << 20, - indexVersion: 1, - log: log, + timeNow: localTimeNow, + log: log, } return m diff --git a/repo/content/index_blob_manager_v1.go b/repo/content/index_blob_manager_v1.go index bcbd5ad4e..74f62929e 100644 --- a/repo/content/index_blob_manager_v1.go +++ b/repo/content/index_blob_manager_v1.go @@ -16,14 +16,12 @@ ) type indexBlobManagerV1 struct { - st blob.Storage - enc *encryptedBlobMgr - epochMgr *epoch.Manager - timeNow func() time.Time - log logging.Logger - maxPackSize int - indexVersion int - indexShardSize int + st blob.Storage + enc *encryptedBlobMgr + epochMgr *epoch.Manager + timeNow func() time.Time + log logging.Logger + formattingOptions IndexFormattingOptions } func (m *indexBlobManagerV1) listActiveIndexBlobs(ctx context.Context) ([]IndexBlobInfo, time.Time, error) { @@ -70,7 +68,7 @@ func (m *indexBlobManagerV1) compactEpoch(ctx context.Context, blobIDs []blob.ID } } - dataShards, cleanupShards, err := tmpbld.BuildShards(m.indexVersion, true, m.indexShardSize) + dataShards, cleanupShards, err := tmpbld.BuildShards(m.formattingOptions.WriteIndexVersion(), true, m.formattingOptions.IndexShardSize()) if err != nil { return errors.Wrap(err, "unable to build index dataShards") } @@ -91,7 +89,7 @@ func (m *indexBlobManagerV1) compactEpoch(ctx context.Context, blobIDs []blob.ID for _, data := range dataShards { data2.Reset() - blobID, err := m.enc.crypter.EncryptBLOB(data, outputPrefix, SessionID(sessionID), &data2) + blobID, err := m.enc.crypterProvider.Crypter().EncryptBLOB(data, outputPrefix, SessionID(sessionID), &data2) if err != nil { return errors.Wrap(err, "error encrypting") } @@ -115,7 +113,7 @@ func (m *indexBlobManagerV1) writeIndexBlobs(ctx context.Context, dataShards []g data2 := gather.NewWriteBuffer() defer data2.Close() //nolint:gocritic - unprefixedBlobID, err := m.enc.crypter.EncryptBLOB(data, "", sessionID, data2) + unprefixedBlobID, err := m.enc.crypterProvider.Crypter().EncryptBLOB(data, "", sessionID, data2) if err != nil { return nil, errors.Wrap(err, "error encrypting") } @@ -131,8 +129,6 @@ func (m *indexBlobManagerV1) writeIndexBlobs(ctx context.Context, dataShards []g // PrepareUpgradeToIndexBlobManagerV1 prepares the repository for migrating to IndexBlobManagerV1. func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context, params epoch.Parameters) error { - sm.indexBlobManagerV1.epochMgr.Params = params - ibl, _, err := sm.indexBlobManagerV0.listActiveIndexBlobs(ctx) if err != nil { return errors.Wrap(err, "error listing active index blobs") @@ -148,7 +144,5 @@ func (sm *SharedManager) PrepareUpgradeToIndexBlobManagerV1(ctx context.Context, return errors.Wrap(err, "unable to generate initial epoch") } - sm.indexBlobManager = sm.indexBlobManagerV1 - return nil } diff --git a/repo/content/internal_logger.go b/repo/content/internal_logger.go index 2510987c4..7931ceade 100644 --- a/repo/content/internal_logger.go +++ b/repo/content/internal_logger.go @@ -33,7 +33,7 @@ type internalLogManager struct { ctx context.Context // nolint:containedctx st blob.Storage - bc *Crypter + bc CrypterProvider wg sync.WaitGroup timeFunc func() time.Time flushThreshold int @@ -48,7 +48,7 @@ func (m *internalLogManager) encryptAndWriteLogBlob(prefix blob.ID, data gather. encrypted := gather.NewWriteBuffer() // Close happens in a goroutine - blobID, err := m.bc.EncryptBLOB(data, prefix, "", encrypted) + blobID, err := m.bc.Crypter().EncryptBLOB(data, prefix, "", encrypted) if err != nil { encrypted.Close() @@ -208,7 +208,7 @@ func (l *internalLogger) Sync() error { } // newInternalLogManager creates a new blobLogManager that will emit logs as repository blobs with a given prefix. -func newInternalLogManager(ctx context.Context, st blob.Storage, bc *Crypter) *internalLogManager { +func newInternalLogManager(ctx context.Context, st blob.Storage, bc CrypterProvider) *internalLogManager { return &internalLogManager{ ctx: ctx, st: st, diff --git a/repo/content/sessions.go b/repo/content/sessions.go index 46a2af444..7c522f2f6 100644 --- a/repo/content/sessions.go +++ b/repo/content/sessions.go @@ -111,7 +111,7 @@ func (bm *WriteManager) writeSessionMarkerLocked(ctx context.Context) error { var encrypted gather.WriteBuffer defer encrypted.Close() - sessionBlobID, err := bm.crypter.EncryptBLOB(gather.FromSlice(js), BlobIDPrefixSession, bm.currentSessionInfo.ID, &encrypted) + sessionBlobID, err := bm.format.Crypter().EncryptBLOB(gather.FromSlice(js), BlobIDPrefixSession, bm.currentSessionInfo.ID, &encrypted) if err != nil { return errors.Wrap(err, "unable to encrypt session marker") } @@ -178,7 +178,7 @@ func (bm *WriteManager) ListActiveSessions(ctx context.Context) (map[SessionID]* return nil, errors.Wrapf(err, "error loading session: %v", b.BlobID) } - err = bm.crypter.DecryptBLOB(payload.Bytes(), b.BlobID, &decrypted) + err = bm.format.Crypter().DecryptBLOB(payload.Bytes(), b.BlobID, &decrypted) if err != nil { return nil, errors.Wrapf(err, "error decrypting session: %v", b.BlobID) } diff --git a/repo/maintenance/content_rewrite.go b/repo/maintenance/content_rewrite.go index 38a45211a..c8f2561b3 100644 --- a/repo/maintenance/content_rewrite.go +++ b/repo/maintenance/content_rewrite.go @@ -136,7 +136,7 @@ func getContentToRewrite(ctx context.Context, rep repo.DirectRepository, opt *Re // add all content IDs from short packs if opt.ShortPacks { - threshold := int64(rep.ContentReader().ContentFormat().MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd + threshold := int64(rep.ContentReader().ContentFormat().MaxPackBlobSize() * shortPackThresholdPercent / 100) //nolint:gomnd findContentInShortPacks(ctx, rep, ch, threshold, opt) } diff --git a/repo/manifest/manifest_manager_test.go b/repo/manifest/manifest_manager_test.go index 6b6b10d62..4ee851e17 100644 --- a/repo/manifest/manifest_manager_test.go +++ b/repo/manifest/manifest_manager_test.go @@ -142,20 +142,19 @@ func TestManifestInitCorruptedBlock(t *testing.T) { data := blobtesting.DataMap{} st := blobtesting.NewMapStorage(data, nil, nil) - f := &content.FormattingOptions{ + fop, err := content.NewFormattingOptionsProvider(&content.FormattingOptions{ Hash: hashing.DefaultAlgorithm, Encryption: encryption.DefaultAlgorithm, MutableParameters: content.MutableParameters{ Version: 1, MaxPackSize: 100000, }, - } + }, nil) + require.NoError(t, err) // write some data to storage - bm, err := content.NewManagerForTesting(ctx, st, f, nil, nil) - if err != nil { - t.Fatalf("err: %v", err) - } + bm, err := content.NewManagerForTesting(ctx, st, fop, nil, nil) + require.NoError(t, err) bm0 := bm @@ -182,10 +181,8 @@ func TestManifestInitCorruptedBlock(t *testing.T) { } // make a new content manager based on corrupted data. - bm, err = content.NewManagerForTesting(ctx, st, f, nil, nil) - if err != nil { - t.Fatalf("err: %v", err) - } + bm, err = content.NewManagerForTesting(ctx, st, fop, nil, nil) + require.NoError(t, err) t.Cleanup(func() { bm.Close(ctx) }) @@ -305,24 +302,24 @@ func newManagerForTesting(ctx context.Context, t *testing.T, data blobtesting.Da st := blobtesting.NewMapStorage(data, nil, nil) - bm, err := content.NewManagerForTesting(ctx, st, &content.FormattingOptions{ + fop, err := content.NewFormattingOptionsProvider(&content.FormattingOptions{ Hash: hashing.DefaultAlgorithm, Encryption: encryption.DefaultAlgorithm, MutableParameters: content.MutableParameters{ Version: 1, MaxPackSize: 100000, }, - }, nil, nil) - if err != nil { - t.Fatalf("can't create content manager: %v", err) - } + }, nil) + + require.NoError(t, err) + + bm, err := content.NewManagerForTesting(ctx, st, fop, nil, nil) + require.NoError(t, err) t.Cleanup(func() { bm.Close(ctx) }) mm, err := NewManager(ctx, bm, ManagerOptions{}) - if err != nil { - t.Fatalf("can't create manifest manager: %v", err) - } + require.NoError(t, err) return mm } diff --git a/repo/open.go b/repo/open.go index db0b508e7..010e1e446 100644 --- a/repo/open.go +++ b/repo/open.go @@ -333,7 +333,12 @@ func openWithConfig(ctx context.Context, st blob.Storage, lc *LocalConfig, passw st = upgradeLockMonitor(options.UpgradeOwnerID, st, password, cacheOpts, lc.FormatBlobCacheDuration, ufb.cacheMTime, cmOpts.TimeNow) - scm, err := content.NewSharedManager(ctx, st, fo, cacheOpts, cmOpts) + fop, err := content.NewFormattingOptionsProvider(fo, cmOpts.RepositoryFormatBytes) + if err != nil { + return nil, errors.Wrap(err, "unable to create format options provider") + } + + scm, err := content.NewSharedManager(ctx, st, fop, cacheOpts, cmOpts) if err != nil { return nil, errors.Wrap(err, "unable to create shared content manager") } diff --git a/repo/repository.go b/repo/repository.go index ccf450809..b718fafa8 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -110,8 +110,8 @@ type directRepository struct { // DeriveKey derives encryption key of the provided length from the master key. func (r *directRepository) DeriveKey(purpose []byte, keyLength int) []byte { - if r.cmgr.ContentFormat().EnablePasswordChange { - return deriveKeyFromMasterKey(r.cmgr.ContentFormat().MasterKey, r.uniqueID, purpose, keyLength) + if r.cmgr.ContentFormat().SupportsPasswordChange() { + return deriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.uniqueID, purpose, keyLength) } // version of kopia