From 23299c34512e640d50e1ddff888758a119a09cb8 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Sat, 6 Aug 2022 18:11:32 -0700 Subject: [PATCH] refactor(repository): ensure MutableParameters are never cached (#2284) --- cli/auto_upgrade.go | 7 +- cli/command_index_epoch_list.go | 6 +- cli/command_maintenance_run.go | 8 +- cli/command_repository_status.go | 31 +++++-- cli/command_repository_upgrade.go | 20 +++- internal/epoch/epoch_manager.go | 104 ++++++++++++--------- internal/epoch/epoch_manager_test.go | 16 +++- internal/server/api_repo.go | 21 ++++- internal/server/grpc_session.go | 7 +- repo/api_server_repository.go | 4 +- repo/content/committed_content_index.go | 14 ++- repo/content/committed_read_manager.go | 63 ++++++++++--- repo/content/content_index_recovery.go | 7 +- repo/content/content_manager.go | 73 +++++++++++---- repo/content/content_manager_indexes.go | 16 +++- repo/content/content_manager_lock_free.go | 7 +- repo/content/content_manager_test.go | 7 +- repo/content/content_reader.go | 4 +- repo/content/index_blob_manager_v0.go | 37 +++++--- repo/content/index_blob_manager_v1.go | 7 +- repo/format/content_format.go | 4 +- repo/format/format_provider.go | 108 +++------------------- repo/grpc_repository_client.go | 4 +- repo/maintenance/content_rewrite.go | 7 +- repo/maintenance/maintenance_run.go | 13 ++- repo/maintenance/maintenance_schedule.go | 7 +- repo/object/object_manager.go | 2 +- repo/object/object_manager_test.go | 4 +- repo/object/object_writer.go | 7 +- repo/repository.go | 15 ++- repo/upgrade_lock_test.go | 12 ++- 31 files changed, 387 insertions(+), 255 deletions(-) diff --git a/cli/auto_upgrade.go b/cli/auto_upgrade.go index f84f9d5fa..d217ee2a1 100644 --- a/cli/auto_upgrade.go +++ b/cli/auto_upgrade.go @@ -40,7 +40,12 @@ func setDefaultMaintenanceParameters(ctx context.Context, rep repo.RepositoryWri p.Owner = rep.ClientOptions().UsernameAtHost() if dw, ok := rep.(repo.DirectRepositoryWriter); ok { - if _, ok := dw.ContentReader().EpochManager(); ok { + _, ok, err := dw.ContentReader().EpochManager() + if err != nil { + return errors.Wrap(err, "epoch manager") + } + + if ok { // disable quick maintenance cycle p.QuickCycle.Enabled = false } diff --git a/cli/command_index_epoch_list.go b/cli/command_index_epoch_list.go index f50755fb9..027a8700e 100644 --- a/cli/command_index_epoch_list.go +++ b/cli/command_index_epoch_list.go @@ -23,7 +23,11 @@ func (c *commandIndexEpochList) setup(svc appServices, parent commandParent) { } func (c *commandIndexEpochList) run(ctx context.Context, rep repo.DirectRepository) error { - emgr, ok := rep.ContentReader().EpochManager() + emgr, ok, err := rep.ContentReader().EpochManager() + if err != nil { + return errors.Wrap(err, "epoch manager") + } + if !ok { return errors.Errorf("epoch manager is not active") } diff --git a/cli/command_maintenance_run.go b/cli/command_maintenance_run.go index 60a205090..a01c5e4d0 100644 --- a/cli/command_maintenance_run.go +++ b/cli/command_maintenance_run.go @@ -3,6 +3,8 @@ import ( "context" + "github.com/pkg/errors" + "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/maintenance" "github.com/kopia/kopia/snapshot/snapshotmaintenance" @@ -25,7 +27,11 @@ func (c *commandMaintenanceRun) setup(svc appServices, parent commandParent) { func (c *commandMaintenanceRun) run(ctx context.Context, rep repo.DirectRepositoryWriter) error { mode := maintenance.ModeQuick - _, supportsEpochManager := rep.ContentManager().EpochManager() + + _, supportsEpochManager, err := rep.ContentManager().EpochManager() + if err != nil { + return errors.Wrap(err, "EpochManager") + } if c.maintenanceRunFull || supportsEpochManager { mode = maintenance.ModeFull diff --git a/cli/command_repository_status.go b/cli/command_repository_status.go index 38d919e31..b1f55bb67 100644 --- a/cli/command_repository_status.go +++ b/cli/command_repository_status.go @@ -15,6 +15,7 @@ "github.com/kopia/kopia/internal/units" "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/content/index" "github.com/kopia/kopia/repo/format" ) @@ -126,7 +127,7 @@ func (c *commandRepositoryStatus) dumpRetentionStatus(dr repo.DirectRepository) } } -// nolint: funlen +// nolint: funlen,gocyclo func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) error { if c.jo.jsonOutput { return c.outputJSON(ctx, rep) @@ -172,21 +173,31 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) contentFormat := dr.ContentReader().ContentFormat() + mp, mperr := contentFormat.GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } + c.out.printStdout("\n") c.out.printStdout("Unique ID: %x\n", dr.UniqueID()) c.out.printStdout("Hash: %v\n", contentFormat.GetHashFunction()) c.out.printStdout("Encryption: %v\n", contentFormat.GetEncryptionAlgorithm()) c.out.printStdout("Splitter: %v\n", dr.ObjectFormat().Splitter) - c.out.printStdout("Format version: %v\n", contentFormat.FormatVersion()) - c.out.printStdout("Content compression: %v\n", dr.ContentReader().SupportsContentCompression()) + c.out.printStdout("Format version: %v\n", mp.Version) + c.out.printStdout("Content compression: %v\n", mp.IndexVersion >= index.Version2) c.out.printStdout("Password changes: %v\n", contentFormat.SupportsPasswordChange()) c.outputRequiredFeatures(dr) - c.out.printStdout("Max pack length: %v\n", units.BytesStringBase2(int64(contentFormat.MaxPackBlobSize()))) - c.out.printStdout("Index Format: v%v\n", contentFormat.WriteIndexVersion()) + c.out.printStdout("Max pack length: %v\n", units.BytesStringBase2(int64(mp.MaxPackSize))) + c.out.printStdout("Index Format: v%v\n", mp.IndexVersion) - if emgr, ok := dr.ContentReader().EpochManager(); ok { + emgr, epochMgrEnabled, emerr := dr.ContentReader().EpochManager() + if emerr != nil { + return errors.Wrap(emerr, "epoch manager") + } + + if epochMgrEnabled { c.out.printStdout("\n") c.out.printStdout("Epoch Manager: enabled\n") @@ -196,10 +207,10 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) } c.out.printStdout("\n") - c.out.printStdout("Epoch refresh frequency: %v\n", contentFormat.GetEpochRefreshFrequency()) - c.out.printStdout("Epoch advance on: %v blobs or %v, minimum %v\n", contentFormat.GetEpochAdvanceOnCountThreshold(), units.BytesStringBase2(contentFormat.GetEpochAdvanceOnTotalSizeBytesThreshold()), contentFormat.GetMinEpochDuration()) - c.out.printStdout("Epoch cleanup margin: %v\n", contentFormat.GetEpochCleanupSafetyMargin()) - c.out.printStdout("Epoch checkpoint every: %v epochs\n", contentFormat.GetEpochFullCheckpointFrequency()) + c.out.printStdout("Epoch refresh frequency: %v\n", mp.EpochParameters.EpochRefreshFrequency) + c.out.printStdout("Epoch advance on: %v blobs or %v, minimum %v\n", mp.EpochParameters.EpochAdvanceOnCountThreshold, units.BytesStringBase2(mp.EpochParameters.EpochAdvanceOnTotalSizeBytesThreshold), mp.EpochParameters.MinEpochDuration) + c.out.printStdout("Epoch cleanup margin: %v\n", mp.EpochParameters.CleanupSafetyMargin) + c.out.printStdout("Epoch checkpoint every: %v epochs\n", mp.EpochParameters.FullCheckpointFrequency) } else { c.out.printStdout("Epoch Manager: disabled\n") } diff --git a/cli/command_repository_upgrade.go b/cli/command_repository_upgrade.go index 36a850fd1..b8dae36ae 100644 --- a/cli/command_repository_upgrade.go +++ b/cli/command_repository_upgrade.go @@ -123,7 +123,12 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D } now := rep.Time() - mp := rep.ContentReader().ContentFormat().GetMutableParameters() + + mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } + openOpts := c.svc.optionsFromFlags(ctx) l := &format.UpgradeLockIntent{ OwnerID: openOpts.UpgradeOwnerID, @@ -164,7 +169,13 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D // skipped until the lock is fully established. func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.DirectRepositoryWriter) error { cf := rep.ContentReader().ContentFormat() - if cf.GetEpochManagerEnabled() { + + mp, mperr := cf.GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } + + if mp.EpochParameters.Enabled { log(ctx).Infof("Repository indices have already been migrated to the epoch format, no need to drain other clients") l, err := rep.GetUpgradeLockIntent(ctx) @@ -251,7 +262,10 @@ func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo // repository. This phase runs after the lock has been acquired in one of the // prior phases. func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectRepositoryWriter) error { - mp := rep.ContentReader().ContentFormat().GetMutableParameters() + mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } rf, err := rep.RequiredFeatures() if err != nil { diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index 54a2cfcef..b4709216c 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -31,29 +31,7 @@ // ParametersProvider provides epoch manager parameters. type ParametersProvider interface { - // whether epoch manager is enabled, must be true. - GetEpochManagerEnabled() bool - - // how frequently each client will list blobs to determine the current epoch. - GetEpochRefreshFrequency() time.Duration - - // number of epochs between full checkpoints. - GetEpochFullCheckpointFrequency() int - - // do not delete uncompacted blobs if the corresponding compacted blob age is less than this. - GetEpochCleanupSafetyMargin() time.Duration - - // minimum duration of an epoch - GetMinEpochDuration() time.Duration - - // advance epoch if number of files exceeds this - GetEpochAdvanceOnCountThreshold() int - - // advance epoch if total size of files exceeds this. - GetEpochAdvanceOnTotalSizeBytesThreshold() int64 - - // number of blobs to delete in parallel during cleanup - GetEpochDeleteParallelism() int + GetParameters() (*Parameters, error) } // ErrVerySlowIndexWrite is returned by WriteIndex if a write takes more than 2 epochs (usually >48h). @@ -196,7 +174,7 @@ func (cs *CurrentSnapshot) isSettledEpochNumber(epoch int) bool { // Manager manages repository epochs. type Manager struct { - Parameters ParametersProvider + paramProvider ParametersProvider st blob.Storage compact CompactionFunc @@ -317,7 +295,7 @@ func (e *Manager) maxCleanupTime(cs CurrentSnapshot) time.Time { return maxTime } -func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error { +func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Parameters) error { eg, ctx := errgroup.WithContext(ctx) // find max timestamp recently written to the repository to establish storage clock. @@ -331,14 +309,14 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error // only delete blobs if a suitable replacement exists and has been written sufficiently // long ago. we don't want to delete blobs that are created too recently, because other clients // may have not observed them yet. - maxReplacementTime := maxTime.Add(-e.Parameters.GetEpochCleanupSafetyMargin()) + maxReplacementTime := maxTime.Add(-p.CleanupSafetyMargin) eg.Go(func() error { return e.cleanupEpochMarkers(ctx, cs) }) eg.Go(func() error { - return e.cleanupWatermarks(ctx, cs, maxReplacementTime) + return e.cleanupWatermarks(ctx, cs, p, maxReplacementTime) }) return errors.Wrap(eg.Wait(), "error cleaning up index blobs") @@ -356,10 +334,15 @@ func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) e } } - return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()), "error deleting index blob marker") + p, err := e.getParameters() + if err != nil { + return err + } + + return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting index blob marker") } -func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, maxReplacementTime time.Time) error { +func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, p *Parameters, maxReplacementTime time.Time) error { var toDelete []blob.ID for _, bm := range cs.DeletionWatermarkBlobs { @@ -377,7 +360,7 @@ func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, max } } - return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()), "error deleting watermark blobs") + return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting watermark blobs") } // CleanupSupersededIndexes cleans up the indexes which have been superseded by compacted ones. @@ -387,6 +370,11 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error { return err } + p, err := e.getParameters() + if err != nil { + return err + } + // find max timestamp recently written to the repository to establish storage clock. // we will be deleting blobs whose timestamps are sufficiently old enough relative // to this max time. This assumes that storage clock moves forward somewhat reasonably. @@ -398,7 +386,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error { // only delete blobs if a suitable replacement exists and has been written sufficiently // long ago. we don't want to delete blobs that are created too recently, because other clients // may have not observed them yet. - maxReplacementTime := maxTime.Add(-e.Parameters.GetEpochCleanupSafetyMargin()) + maxReplacementTime := maxTime.Add(-p.CleanupSafetyMargin) e.log.Debugw("Cleaning up superseded index blobs...", "maxReplacementTime", maxReplacementTime) @@ -420,7 +408,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error { } } - if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()); err != nil { + if err := blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism); err != nil { return errors.Wrap(err, "unable to delete uncompacted blobs") } @@ -438,14 +426,28 @@ func blobSetWrittenEarlyEnough(replacementSet []blob.Metadata, maxReplacementTim return blob.MaxTimestamp(replacementSet).Before(maxReplacementTime) } +func (e *Manager) getParameters() (*Parameters, error) { + emp, err := e.paramProvider.GetParameters() + if err != nil { + return nil, errors.Wrap(err, "epoch manager parameters") + } + + return emp, nil +} + func (e *Manager) refreshLocked(ctx context.Context) error { if ctx.Err() != nil { return errors.Wrap(ctx.Err(), "refreshLocked") } + p, err := e.getParameters() + if err != nil { + return err + } + nextDelayTime := initiaRefreshAttemptSleep - if !e.Parameters.GetEpochManagerEnabled() { + if !p.Enabled { return errors.Errorf("epoch manager not enabled") } @@ -553,7 +555,7 @@ func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *CurrentSna return nil } -func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot) { +func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) { latestSettled := cs.WriteEpoch - numUnsettledEpochs if latestSettled < 0 { return @@ -564,7 +566,7 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs firstNonRangeCompacted = cs.LongestRangeCheckpointSets[len(cs.LongestRangeCheckpointSets)-1].MaxEpoch + 1 } - if latestSettled-firstNonRangeCompacted < e.Parameters.GetEpochFullCheckpointFrequency() { + if latestSettled-firstNonRangeCompacted < p.FullCheckpointFrequency { e.log.Debugf("not generating range checkpoint") return @@ -587,14 +589,14 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) { } -func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs CurrentSnapshot) { +func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) { e.backgroundWork.Add(1) // we're starting background work, ignore parent cancelation signal. ctxutil.GoDetached(ctx, func(ctx context.Context) { defer e.backgroundWork.Done() - if err := e.cleanupInternal(ctx, cs); err != nil { + if err := e.cleanupInternal(ctx, cs, p); err != nil { e.log.Errorf("error cleaning up index blobs: %v, performance may be affected", err) } }) @@ -637,16 +639,21 @@ func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[ // refreshAttemptLocked attempts to load the committedState of // the index and updates `lastKnownState` state atomically when complete. func (e *Manager) refreshAttemptLocked(ctx context.Context) error { + e.log.Debugf("refreshAttemptLocked") + + p, perr := e.getParameters() + if perr != nil { + return perr + } + cs := CurrentSnapshot{ WriteEpoch: 0, EpochStartTime: map[int]time.Time{}, UncompactedEpochSets: map[int][]blob.Metadata{}, SingleEpochCompactionSets: map[int][]blob.Metadata{}, - ValidUntil: e.timeFunc().Add(e.Parameters.GetEpochRefreshFrequency()), + ValidUntil: e.timeFunc().Add(p.EpochRefreshFrequency), } - e.log.Debugf("refreshAttemptLocked") - eg, ctx1 := errgroup.WithContext(ctx) eg.Go(func() error { @@ -680,7 +687,7 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { len(ues[cs.WriteEpoch+1]), cs.ValidUntil.Format(time.RFC3339Nano)) - if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], e.Parameters.GetMinEpochDuration(), e.Parameters.GetEpochAdvanceOnCountThreshold(), e.Parameters.GetEpochAdvanceOnTotalSizeBytesThreshold()) { + if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], p.MinEpochDuration, p.EpochAdvanceOnCountThreshold, p.EpochAdvanceOnTotalSizeBytesThreshold) { if err := e.advanceEpoch(ctx, cs); err != nil { return errors.Wrap(err, "error advancing epoch") } @@ -694,8 +701,8 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { e.lastKnownState = cs - e.maybeGenerateNextRangeCheckpointAsync(ctx, cs) - e.maybeStartCleanupAsync(ctx, cs) + e.maybeGenerateNextRangeCheckpointAsync(ctx, cs, p) + e.maybeStartCleanupAsync(ctx, cs, p) e.maybeOptimizeRangeCheckpointsAsync(ctx, cs) return nil @@ -765,9 +772,16 @@ func (e *Manager) WriteIndex(ctx context.Context, dataShards map[blob.ID]blob.By writtenForEpoch := -1 for { + e.log.Debugf("refreshAttemptLocked") + + p, err := e.getParameters() + if err != nil { + return nil, err + } + // make sure we have at least 75% of remaining time // nolint:gomnd - cs, err := e.committedState(ctx, 3*e.Parameters.GetEpochRefreshFrequency()/4) + cs, err := e.committedState(ctx, 3*p.EpochRefreshFrequency/4) if err != nil { return nil, errors.Wrap(err, "error getting committed state") } @@ -997,7 +1011,7 @@ func NewManager(st blob.Storage, paramProvider ParametersProvider, compactor Com log: log, compact: compactor, timeFunc: timeNow, - Parameters: paramProvider, + paramProvider: paramProvider, getCompleteIndexSetTooSlow: new(int32), committedStateRefreshTooSlow: new(int32), writeIndexTooSlow: new(int32), diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index 576d8cd1b..30926b9fb 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -92,7 +92,7 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv { st = fs st = logging.NewWrapper(st, testlogging.NewTestLogger(t), "[STORAGE] ") te := &epochManagerTestEnv{unloggedst: unloggedst, st: st, ft: ft} - m := NewManager(te.st, &Parameters{ + m := NewManager(te.st, parameterProvider{&Parameters{ Enabled: true, EpochRefreshFrequency: 20 * time.Minute, FullCheckpointFrequency: 7, @@ -102,7 +102,7 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv { EpochAdvanceOnCountThreshold: 15, EpochAdvanceOnTotalSizeBytesThreshold: 20 << 20, DeleteParallelism: 1, - }, te.compact, testlogging.NewTestLogger(t), te.ft.NowFunc()) + }}, te.compact, testlogging.NewTestLogger(t), te.ft.NowFunc()) te.mgr = m te.faultyStorage = fs te.data = data @@ -121,7 +121,7 @@ func (te *epochManagerTestEnv) another() *epochManagerTestEnv { faultyStorage: te.faultyStorage, } - te2.mgr = NewManager(te2.st, te.mgr.Parameters, te2.compact, te.mgr.log, te.mgr.timeFunc) + te2.mgr = NewManager(te2.st, te.mgr.paramProvider, te2.compact, te.mgr.log, te.mgr.timeFunc) return te2 } @@ -577,7 +577,7 @@ func verifySequentialWrites(t *testing.T, te *epochManagerTestEnv) { func TestIndexEpochManager_Disabled(t *testing.T) { te := newTestEnv(t) - (te.mgr.Parameters).(*Parameters).Enabled = false + te.mgr.paramProvider.(parameterProvider).Parameters.Enabled = false _, err := te.mgr.Current(testlogging.Context(t)) require.Error(t, err) @@ -736,3 +736,11 @@ func (te *epochManagerTestEnv) mustWriteIndexFiles(ctx context.Context, t *testi require.NoError(t, err) } + +type parameterProvider struct { + *Parameters +} + +func (p parameterProvider) GetParameters() (*Parameters, error) { + return p.Parameters, nil +} diff --git a/internal/server/api_repo.go b/internal/server/api_repo.go index 41e6db635..bf46f2d34 100644 --- a/internal/server/api_repo.go +++ b/internal/server/api_repo.go @@ -34,11 +34,16 @@ func handleRepoParameters(ctx context.Context, rc requestContext) (interface{}, }, nil } + scc, err := dr.ContentReader().SupportsContentCompression() + if err != nil { + return nil, internalServerError(err) + } + rp := &remoterepoapi.Parameters{ HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(), HMACSecret: dr.ContentReader().ContentFormat().GetHmacSecret(), ObjectFormat: dr.ObjectFormat(), - SupportsContentCompression: dr.ContentReader().SupportsContentCompression(), + SupportsContentCompression: scc, } return rp, nil @@ -54,16 +59,26 @@ func handleRepoStatus(ctx context.Context, rc requestContext) (interface{}, *api dr, ok := rc.rep.(repo.DirectRepository) if ok { + mp, mperr := dr.ContentReader().ContentFormat().GetMutableParameters() + if mperr != nil { + return nil, internalServerError(mperr) + } + + scc, err := dr.ContentReader().SupportsContentCompression() + if err != nil { + return nil, internalServerError(err) + } + return &serverapi.StatusResponse{ Connected: true, ConfigFile: dr.ConfigFilename(), Hash: dr.ContentReader().ContentFormat().GetHashFunction(), Encryption: dr.ContentReader().ContentFormat().GetEncryptionAlgorithm(), - MaxPackSize: dr.ContentReader().ContentFormat().MaxPackBlobSize(), + MaxPackSize: mp.MaxPackSize, Splitter: dr.ObjectFormat().Splitter, Storage: dr.BlobReader().ConnectionInfo().Type, ClientOptions: dr.ClientOptions(), - SupportsContentCompression: dr.ContentReader().SupportsContentCompression(), + SupportsContentCompression: scc, }, nil } diff --git a/internal/server/grpc_session.go b/internal/server/grpc_session.go index 2bab85e65..3ea1f1864 100644 --- a/internal/server/grpc_session.go +++ b/internal/server/grpc_session.go @@ -455,6 +455,11 @@ func (s *Server) handleInitialSessionHandshake(srv grpcapi.KopiaRepository_Sessi return repo.WriteSessionOptions{}, errors.Errorf("missing initialization request") } + scc, err := dr.ContentReader().SupportsContentCompression() + if err != nil { + return repo.WriteSessionOptions{}, errors.Wrap(err, "supports content compression") + } + if err := s.send(srv, initializeReq.GetRequestId(), &grpcapi.SessionResponse{ Response: &grpcapi.SessionResponse_InitializeSession{ InitializeSession: &grpcapi.InitializeSessionResponse{ @@ -462,7 +467,7 @@ func (s *Server) handleInitialSessionHandshake(srv grpcapi.KopiaRepository_Sessi HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(), HmacSecret: dr.ContentReader().ContentFormat().GetHmacSecret(), Splitter: dr.ObjectFormat().Splitter, - SupportsContentCompression: dr.ContentReader().SupportsContentCompression(), + SupportsContentCompression: scc, }, }, }, diff --git a/repo/api_server_repository.go b/repo/api_server_repository.go index aaaab8d2a..8bc36cb6f 100644 --- a/repo/api_server_repository.go +++ b/repo/api_server_repository.go @@ -145,8 +145,8 @@ func (r *apiServerRepository) Flush(ctx context.Context) error { return errors.Wrap(r.cli.Post(ctx, "flush", nil, nil), "Flush") } -func (r *apiServerRepository) SupportsContentCompression() bool { - return r.serverSupportsContentCompression +func (r *apiServerRepository) SupportsContentCompression() (bool, error) { + return r.serverSupportsContentCompression, nil } func (r *apiServerRepository) NewWriter(ctx context.Context, opt WriteSessionOptions) (context.Context, RepositoryWriter, error) { diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index 7ed432bed..dd5b90c36 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -15,6 +15,7 @@ "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content/index" + "github.com/kopia/kopia/repo/format" "github.com/kopia/kopia/repo/logging" ) @@ -38,7 +39,7 @@ type committedContentIndex struct { merged index.Merged v1PerContentOverhead uint32 - indexVersion int + formatProvider format.Provider // fetchOne loads one index blob fetchOne func(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error @@ -259,9 +260,14 @@ func (c *committedContentIndex) combineSmallIndexes(m index.Merged) (index.Merge } } + mp, mperr := c.formatProvider.GetMutableParameters() + if mperr != nil { + return nil, errors.Wrap(mperr, "error getting mutable parameters") + } + var buf bytes.Buffer - if err := b.Build(&buf, c.indexVersion); err != nil { + if err := b.Build(&buf, mp.IndexVersion); err != nil { return nil, errors.Wrap(err, "error building combined in-memory index") } @@ -349,7 +355,7 @@ func (c *committedContentIndex) missingIndexBlobs(ctx context.Context, blobs []b func newCommittedContentIndex(caching *CachingOptions, v1PerContentOverhead uint32, - indexVersion int, + formatProvider format.Provider, fetchOne func(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error, log logging.Logger, minSweepAge time.Duration, @@ -370,7 +376,7 @@ func newCommittedContentIndex(caching *CachingOptions, cache: cache, inUse: map[blob.ID]index.Index{}, v1PerContentOverhead: v1PerContentOverhead, - indexVersion: indexVersion, + formatProvider: formatProvider, fetchOne: fetchOne, log: log, } diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 483d7a784..bde047fd0 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -181,19 +181,24 @@ func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error { nextSleepTime := 100 * time.Millisecond //nolint:gomnd for i := 0; i < indexLoadAttempts; i++ { + ibm, err0 := sm.indexBlobManager() + if err0 != nil { + return err0 + } + if err := ctx.Err(); err != nil { // nolint:wrapcheck return err } if i > 0 { - sm.indexBlobManager().flushCache(ctx) + ibm.flushCache(ctx) sm.log.Debugf("encountered NOT_FOUND when loading, sleeping %v before retrying #%v", nextSleepTime, i) time.Sleep(nextSleepTime) nextSleepTime *= 2 } - indexBlobs, ignoreDeletedBefore, err := sm.indexBlobManager().listActiveIndexBlobs(ctx) + indexBlobs, ignoreDeletedBefore, err := ibm.listActiveIndexBlobs(ctx) if err != nil { return errors.Wrap(err, "error listing index blobs") } @@ -235,12 +240,17 @@ func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache { return sm.contentCache } -func (sm *SharedManager) indexBlobManager() indexBlobManager { - if sm.format.GetEpochManagerEnabled() { - return sm.indexBlobManagerV1 +func (sm *SharedManager) indexBlobManager() (indexBlobManager, error) { + mp, mperr := sm.format.GetMutableParameters() + if mperr != nil { + return nil, errors.Wrap(mperr, "mutable parameters") } - return sm.indexBlobManagerV0 + if mp.EpochParameters.Enabled { + return sm.indexBlobManagerV1, nil + } + + return sm.indexBlobManagerV0, nil } func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info, output *gather.WriteBuffer) error { @@ -313,7 +323,12 @@ func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) ( return result, nil } - blobs, _, err := sm.indexBlobManager().listActiveIndexBlobs(ctx) + ibm, err0 := sm.indexBlobManager() + if err0 != nil { + return nil, err0 + } + + blobs, _, err := ibm.listActiveIndexBlobs(ctx) // nolint:wrapcheck return blobs, err @@ -450,7 +465,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca log: sm.namedLogger("index-blob-manager"), } - sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, sm.format, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow) + sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, epochParameters{sm.format}, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow) // once everything is ready, set it up sm.contentCache = dataCache @@ -458,7 +473,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca sm.indexBlobCache = indexBlobCache sm.committedContents = newCommittedContentIndex(caching, uint32(sm.format.Encryptor().Overhead()), - sm.format.WriteIndexVersion(), + sm.format, sm.enc.getEncryptedBlob, sm.namedLogger("committed-content-index"), caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge)) @@ -466,14 +481,32 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca return nil } -// EpochManager returns the epoch manager. -func (sm *SharedManager) EpochManager() (*epoch.Manager, bool) { - ibm1, ok := sm.indexBlobManager().(*indexBlobManagerV1) - if !ok { - return nil, false +type epochParameters struct { + prov format.Provider +} + +func (p epochParameters) GetParameters() (*epoch.Parameters, error) { + mp, mperr := p.prov.GetMutableParameters() + if mperr != nil { + return nil, errors.Wrap(mperr, "mutable parameters") } - return ibm1.epochMgr, true + return &mp.EpochParameters, nil +} + +// EpochManager returns the epoch manager. +func (sm *SharedManager) EpochManager() (*epoch.Manager, bool, error) { + ibm, err := sm.indexBlobManager() + if err != nil { + return nil, false, err + } + + ibm1, ok := ibm.(*indexBlobManagerV1) + if !ok { + return nil, false, nil + } + + return ibm1.epochMgr, true, nil } // AddRef adds a reference to shared manager to prevents its closing on Release(). diff --git a/repo/content/content_index_recovery.go b/repo/content/content_index_recovery.go index 3b78d9b7b..fb09bd277 100644 --- a/repo/content/content_index_recovery.go +++ b/repo/content/content_index_recovery.go @@ -171,7 +171,12 @@ func decodePostamble(payload []byte) *packContentPostamble { } func (sm *SharedManager) buildLocalIndex(pending index.Builder, output *gather.WriteBuffer) error { - if err := pending.Build(output, sm.format.WriteIndexVersion()); err != nil { + mp, mperr := sm.format.GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } + + if err := pending.Build(output, mp.IndexVersion); err != nil { return errors.Wrap(err, "unable to build local index") } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 12bee95b0..3611b30d8 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -255,7 +255,7 @@ func (bm *WriteManager) maybeRetryWritingFailedPacksUnlocked(ctx context.Context return nil } -func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, data gather.Bytes, isDeleted bool, comp compression.HeaderID, previousWriteTime int64) error { +func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, data gather.Bytes, isDeleted bool, comp compression.HeaderID, previousWriteTime int64, mp format.MutableParameters) error { // see if the current index is old enough to cause automatic flush. if err := bm.maybeFlushBasedOnTimeUnlocked(ctx); err != nil { return errors.Wrap(err, "unable to flush old pending writes") @@ -267,7 +267,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat defer compressedAndEncrypted.Close() // encrypt and compress before taking lock - actualComp, err := bm.maybeCompressAndEncryptDataForPacking(data, contentID, comp, &compressedAndEncrypted) + actualComp, err := bm.maybeCompressAndEncryptDataForPacking(data, contentID, comp, &compressedAndEncrypted, mp) if err != nil { return errors.Wrapf(err, "unable to encrypt %q", contentID) } @@ -317,7 +317,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat PackBlobID: pp.packBlobID, PackOffset: uint32(pp.currentPackData.Length()), TimestampSeconds: bm.contentWriteTime(previousWriteTime), - FormatVersion: byte(bm.format.FormatVersion()), + FormatVersion: byte(mp.Version), OriginalLength: uint32(data.Length()), } @@ -331,7 +331,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat pp.currentPackItems[contentID] = info - shouldWrite := pp.currentPackData.Length() >= bm.format.MaxPackBlobSize() + shouldWrite := pp.currentPackData.Length() >= mp.MaxPackSize if shouldWrite { // we're about to write to storage without holding a lock // remove from pendingPacks so other goroutine tries to mess with this pending pack. @@ -370,9 +370,9 @@ func (bm *WriteManager) EnableIndexFlush(ctx context.Context) { } // +checklocks:bm.mu -func (bm *WriteManager) verifyInvariantsLocked() { +func (bm *WriteManager) verifyInvariantsLocked(mp format.MutableParameters) { bm.verifyCurrentPackItemsLocked() - bm.verifyPackIndexBuilderLocked() + bm.verifyPackIndexBuilderLocked(mp) } // +checklocks:bm.mu @@ -391,7 +391,7 @@ func (bm *WriteManager) verifyCurrentPackItemsLocked() { } // +checklocks:bm.mu -func (bm *WriteManager) verifyPackIndexBuilderLocked() { +func (bm *WriteManager) verifyPackIndexBuilderLocked(mp format.MutableParameters) { for k, cpi := range bm.packIndexBuilder { bm.assertInvariant(cpi.GetContentID() == k, "content ID entry has invalid key: %v %v", cpi.GetContentID(), k) @@ -399,7 +399,7 @@ func (bm *WriteManager) verifyPackIndexBuilderLocked() { bm.assertInvariant(cpi.GetPackBlobID() == "", "content can't be both deleted and have a pack content: %v", cpi.GetContentID()) } else { bm.assertInvariant(cpi.GetPackBlobID() != "", "content that's not deleted must have a pack content: %+v", cpi) - bm.assertInvariant(cpi.GetFormatVersion() == byte(bm.format.FormatVersion()), "content that's not deleted must have a valid format version: %+v", cpi) + bm.assertInvariant(cpi.GetFormatVersion() == byte(mp.Version), "content that's not deleted must have a valid format version: %+v", cpi) } bm.assertInvariant(cpi.GetTimestampSeconds() != 0, "content has no timestamp: %v", cpi.GetContentID()) @@ -423,8 +423,13 @@ func (bm *WriteManager) writeIndexBlobs(ctx context.Context, dataShards []gather ctx, span := tracer.Start(ctx, "WriteIndexBlobs") defer span.End() + ibm, err := bm.indexBlobManager() + if err != nil { + return nil, err + } + // nolint:wrapcheck - return bm.indexBlobManager().writeIndexBlobs(ctx, dataShards, sessionID) + return ibm.writeIndexBlobs(ctx, dataShards, sessionID) } // +checklocksread:bm.indexesLock @@ -436,7 +441,7 @@ func (bm *WriteManager) addIndexBlob(ctx context.Context, indexBlobID blob.ID, d } // +checklocks:bm.mu -func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error { +func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context, mp format.MutableParameters) error { ctx, span := tracer.Start(ctx, "FlushPackIndexes") defer span.End() @@ -447,7 +452,7 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error { if len(bm.packIndexBuilder) > 0 { _, span2 := tracer.Start(ctx, "BuildShards") - dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(bm.format.WriteIndexVersion(), true, bm.format.IndexShardSize()) + dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(mp.IndexVersion, true, defaultIndexShardSize) span2.End() @@ -596,6 +601,11 @@ func (bm *WriteManager) setFlushingLocked(v bool) { // Any pending writes completed before Flush() has started are guaranteed to be committed to the // repository before Flush() returns. func (bm *WriteManager) Flush(ctx context.Context) error { + mp, mperr := bm.format.GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } + bm.lock() defer bm.unlock() @@ -633,7 +643,7 @@ func (bm *WriteManager) Flush(ctx context.Context) error { return errors.Wrap(err, "error writing pending content") } - if err := bm.flushPackIndexesLocked(ctx); err != nil { + if err := bm.flushPackIndexesLocked(ctx, mp); err != nil { return errors.Wrap(err, "error flushing indexes") } @@ -646,7 +656,12 @@ func (bm *WriteManager) Flush(ctx context.Context) error { func (bm *WriteManager) RewriteContent(ctx context.Context, contentID ID) error { bm.log.Debugf("rewrite-content %v", contentID) - return bm.rewriteContent(ctx, contentID, false) + mp, mperr := bm.format.GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } + + return bm.rewriteContent(ctx, contentID, false, mp) } func (bm *WriteManager) getContentDataAndInfo(ctx context.Context, contentID ID, output *gather.WriteBuffer) (Info, error) { @@ -672,14 +687,19 @@ func (bm *WriteManager) getContentDataAndInfo(ctx context.Context, contentID ID, func (bm *WriteManager) UndeleteContent(ctx context.Context, contentID ID) error { bm.log.Debugf("UndeleteContent(%q)", contentID) - return bm.rewriteContent(ctx, contentID, true) + mp, mperr := bm.format.GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } + + return bm.rewriteContent(ctx, contentID, true, mp) } // When onlyRewriteDelete is true, the content is only rewritten if the existing // content is marked as deleted. The new content is NOT marked deleted. // When onlyRewriteDelete is false, the content is unconditionally rewritten // and the content's deleted status is preserved. -func (bm *WriteManager) rewriteContent(ctx context.Context, contentID ID, onlyRewriteDeleted bool) error { +func (bm *WriteManager) rewriteContent(ctx context.Context, contentID ID, onlyRewriteDeleted bool, mp format.MutableParameters) error { var data gather.WriteBuffer defer data.Close() @@ -698,7 +718,7 @@ func (bm *WriteManager) rewriteContent(ctx context.Context, contentID ID, onlyRe isDeleted = false } - return bm.addToPackUnlocked(ctx, contentID, data.Bytes(), isDeleted, bi.GetCompressionHeaderID(), bi.GetTimestampSeconds()) + return bm.addToPackUnlocked(ctx, contentID, data.Bytes(), isDeleted, bi.GetCompressionHeaderID(), bi.GetTimestampSeconds(), mp) } func packPrefixForContentID(contentID ID) blob.ID { @@ -747,13 +767,23 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr } // SupportsContentCompression returns true if content manager supports content-compression. -func (bm *WriteManager) SupportsContentCompression() bool { - return bm.format.WriteIndexVersion() >= index.Version2 +func (bm *WriteManager) SupportsContentCompression() (bool, error) { + mp, mperr := bm.format.GetMutableParameters() + if mperr != nil { + return false, errors.Wrap(mperr, "mutable parameters") + } + + return mp.IndexVersion >= index.Version2, nil } // WriteContent saves a given content of data to a pack group with a provided name and returns a contentID // that's based on the contents of data written. func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, prefix index.IDPrefix, comp compression.HeaderID) (ID, error) { + mp, mperr := bm.format.GetMutableParameters() + if mperr != nil { + return EmptyID, errors.Wrap(mperr, "mutable parameters") + } + if err := bm.maybeRetryWritingFailedPacksUnlocked(ctx); err != nil { return EmptyID, err } @@ -797,7 +827,7 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, pre bm.log.Debugf(logbuf.String()) - return contentID, bm.addToPackUnlocked(ctx, contentID, data, false, comp, previousWriteTime) + return contentID, bm.addToPackUnlocked(ctx, contentID, data, false, comp, previousWriteTime, mp) } // GetContent gets the contents of a given content. If the content is not found returns ErrContentNotFound. @@ -892,7 +922,10 @@ func (bm *WriteManager) lock() { // +checklocksrelease:bm.mu func (bm *WriteManager) unlock() { if bm.checkInvariantsOnUnlock { - bm.verifyInvariantsLocked() + mp, mperr := bm.format.GetMutableParameters() + if mperr == nil { + bm.verifyInvariantsLocked(mp) + } } bm.mu.Unlock() diff --git a/repo/content/content_manager_indexes.go b/repo/content/content_manager_indexes.go index 32786d8ba..bdbcc816a 100644 --- a/repo/content/content_manager_indexes.go +++ b/repo/content/content_manager_indexes.go @@ -38,11 +38,16 @@ func (sm *SharedManager) Refresh(ctx context.Context) error { sm.log.Debugf("Refresh started") - sm.indexBlobManager().invalidate(ctx) + ibm, err := sm.indexBlobManager() + if err != nil { + return err + } + + ibm.invalidate(ctx) timer := timetrack.StartTimer() - err := sm.loadPackIndexesLocked(ctx) + err = sm.loadPackIndexesLocked(ctx) sm.log.Debugf("Refresh completed in %v", timer.Elapsed()) return err @@ -57,7 +62,12 @@ func (sm *SharedManager) CompactIndexes(ctx context.Context, opt CompactOptions) sm.log.Debugf("CompactIndexes(%+v)", opt) - if err := sm.indexBlobManager().compact(ctx, opt); err != nil { + ibm, err := sm.indexBlobManager() + if err != nil { + return err + } + + if err := ibm.compact(ctx, opt); err != nil { return errors.Wrap(err, "error performing compaction") } diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index 6ed6d4772..ecd691ce9 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -16,27 +16,28 @@ "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/compression" "github.com/kopia/kopia/repo/content/index" + "github.com/kopia/kopia/repo/format" "github.com/kopia/kopia/repo/hashing" "github.com/kopia/kopia/repo/logging" ) const indexBlobCompactionWarningThreshold = 1000 -func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes, contentID ID, comp compression.HeaderID, output *gather.WriteBuffer) (compression.HeaderID, error) { +func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes, contentID ID, comp compression.HeaderID, output *gather.WriteBuffer, mp format.MutableParameters) (compression.HeaderID, error) { var hashOutput [hashing.MaxHashSize]byte iv := getPackedContentIV(hashOutput[:0], contentID) // If the content is prefixed (which represents Kopia's own metadata as opposed to user data), // and we're on V2 format or greater, enable internal compression even when not requested. - if contentID.HasPrefix() && comp == NoCompression && sm.format.WriteIndexVersion() >= index.Version2 { + if contentID.HasPrefix() && comp == NoCompression && mp.IndexVersion >= index.Version2 { // 'zstd-fastest' has a good mix of being fast, low memory usage and high compression for JSON. comp = compression.HeaderZstdFastest } // nolint:nestif if comp != NoCompression { - if sm.format.WriteIndexVersion() < index.Version2 { + if mp.IndexVersion < index.Version2 { return NoCompression, errors.Errorf("compression is not enabled for this repository") } diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 7e5d2d0b4..52f9f9ab4 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -1829,7 +1829,7 @@ func (s *contentManagerSuite) TestAutoCompressionOfMetadata(t *testing.T) { info, err := bm.ContentInfo(ctx, contentID) require.NoError(t, err) - if bm.SupportsContentCompression() { + if scc, _ := bm.SupportsContentCompression(); scc { require.Equal(t, compression.HeaderZstdFastest, info.GetCompressionHeaderID()) } else { require.Equal(t, NoCompression, info.GetCompressionHeaderID()) @@ -2030,7 +2030,6 @@ func (s *contentManagerSuite) TestCompression_Disabled(t *testing.T) { indexVersion: index.Version1, }) - require.False(t, bm.SupportsContentCompression()) ctx := testlogging.Context(t) compressibleData := bytes.Repeat([]byte{1, 2, 3, 4}, 1000) @@ -2046,8 +2045,6 @@ func (s *contentManagerSuite) TestCompression_CompressibleData(t *testing.T) { indexVersion: index.Version2, }) - require.True(t, bm.SupportsContentCompression()) - ctx := testlogging.Context(t) compressibleData := bytes.Repeat([]byte{1, 2, 3, 4}, 1000) headerID := compression.ByName["gzip"].HeaderID() @@ -2081,8 +2078,6 @@ func (s *contentManagerSuite) TestCompression_NonCompressibleData(t *testing.T) indexVersion: index.Version2, }) - require.True(t, bm.SupportsContentCompression()) - ctx := testlogging.Context(t) nonCompressibleData := make([]byte, 65000) headerID := compression.ByName["pgzip"].HeaderID() diff --git a/repo/content/content_reader.go b/repo/content/content_reader.go index d5c03c265..f9bb95e5f 100644 --- a/repo/content/content_reader.go +++ b/repo/content/content_reader.go @@ -9,12 +9,12 @@ // Reader defines content read API. type Reader interface { - SupportsContentCompression() bool + SupportsContentCompression() (bool, error) ContentFormat() format.Provider GetContent(ctx context.Context, id ID) ([]byte, error) ContentInfo(ctx context.Context, id ID) (Info, error) IterateContents(ctx context.Context, opts IterateOptions, callback IterateCallback) error IteratePacks(ctx context.Context, opts IteratePackOptions, callback IteratePacksCallback) error ListActiveSessions(ctx context.Context) (map[SessionID]*SessionInfo, error) - EpochManager() (*epoch.Manager, bool) + EpochManager() (*epoch.Manager, bool, error) } diff --git a/repo/content/index_blob_manager_v0.go b/repo/content/index_blob_manager_v0.go index ca5067d5e..469d92777 100644 --- a/repo/content/index_blob_manager_v0.go +++ b/repo/content/index_blob_manager_v0.go @@ -11,6 +11,7 @@ "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content/index" + "github.com/kopia/kopia/repo/format" "github.com/kopia/kopia/repo/logging" ) @@ -20,6 +21,8 @@ const ( legacyIndexPoisonBlobID = "n00000000000000000000000000000000-repository_unreadable_by_this_kopia_version_upgrade_required" + defaultIndexShardSize = 16e6 // slightly less than 2^24, which lets index use 24-bit/3-byte indexes + defaultEventualConsistencySettleTime = 1 * time.Hour compactionLogBlobPrefix = "m" cleanupBlobPrefix = "l" @@ -52,9 +55,7 @@ type cleanupEntry struct { // IndexFormattingOptions provides options for formatting index blobs. type IndexFormattingOptions interface { - MaxIndexBlobSize() int64 - WriteIndexVersion() int - IndexShardSize() int + GetMutableParameters() (format.MutableParameters, error) } type indexBlobManagerV0 struct { @@ -136,7 +137,12 @@ func (m *indexBlobManagerV0) compact(ctx context.Context, opt CompactOptions) er return errors.Wrap(err, "error listing active index blobs") } - blobsToCompact := m.getBlobsToCompact(indexBlobs, opt) + mp, mperr := m.formattingOptions.GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } + + blobsToCompact := m.getBlobsToCompact(indexBlobs, opt, mp) if err := m.compactIndexBlobs(ctx, blobsToCompact, opt); err != nil { return errors.Wrap(err, "error performing compaction") @@ -416,22 +422,22 @@ func (m *indexBlobManagerV0) cleanup(ctx context.Context, maxEventualConsistency return nil } -func (m *indexBlobManagerV0) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt CompactOptions) []IndexBlobInfo { - var nonCompactedBlobs, verySmallBlobs []IndexBlobInfo - - var totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64 - - var mediumSizedBlobCount int +func (m *indexBlobManagerV0) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt CompactOptions, mp format.MutableParameters) []IndexBlobInfo { + var ( + nonCompactedBlobs, verySmallBlobs []IndexBlobInfo + totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64 + mediumSizedBlobCount int + ) for _, b := range indexBlobs { - if b.Length > m.formattingOptions.MaxIndexBlobSize() && !opt.AllIndexes { + if b.Length > int64(mp.MaxPackSize) && !opt.AllIndexes { continue } nonCompactedBlobs = append(nonCompactedBlobs, b) totalSizeNonCompactedBlobs += b.Length - if b.Length < m.formattingOptions.MaxIndexBlobSize()/verySmallContentFraction { + if b.Length < int64(mp.MaxPackSize)/verySmallContentFraction { verySmallBlobs = append(verySmallBlobs, b) totalSizeVerySmallBlobs += b.Length } else { @@ -461,6 +467,11 @@ func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs [ return nil } + mp, mperr := m.formattingOptions.GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } + bld := make(index.Builder) var inputs, outputs []blob.Metadata @@ -479,7 +490,7 @@ func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs [ // we must do it after all input blobs have been merged, otherwise we may resurrect contents. m.dropContentsFromBuilder(bld, opt) - dataShards, cleanupShards, err := bld.BuildShards(m.formattingOptions.WriteIndexVersion(), false, m.formattingOptions.IndexShardSize()) + dataShards, cleanupShards, err := bld.BuildShards(mp.IndexVersion, false, defaultIndexShardSize) if err != nil { return errors.Wrap(err, "unable to build an index") } diff --git a/repo/content/index_blob_manager_v1.go b/repo/content/index_blob_manager_v1.go index 885fdbc76..3b4884bea 100644 --- a/repo/content/index_blob_manager_v1.go +++ b/repo/content/index_blob_manager_v1.go @@ -68,7 +68,12 @@ func (m *indexBlobManagerV1) compactEpoch(ctx context.Context, blobIDs []blob.ID } } - dataShards, cleanupShards, err := tmpbld.BuildShards(m.formattingOptions.WriteIndexVersion(), true, m.formattingOptions.IndexShardSize()) + mp, mperr := m.formattingOptions.GetMutableParameters() + if mperr != nil { + return errors.Wrap(mperr, "mutable parameters") + } + + dataShards, cleanupShards, err := tmpbld.BuildShards(mp.IndexVersion, true, defaultIndexShardSize) if err != nil { return errors.Wrap(err, "unable to build index dataShards") } diff --git a/repo/format/content_format.go b/repo/format/content_format.go index 2c026b35b..9360caeaa 100644 --- a/repo/format/content_format.go +++ b/repo/format/content_format.go @@ -42,8 +42,8 @@ func (f *ContentFormat) ResolveFormatVersion() error { } // GetMutableParameters implements FormattingOptionsProvider. -func (f *ContentFormat) GetMutableParameters() MutableParameters { - return f.MutableParameters +func (f *ContentFormat) GetMutableParameters() (MutableParameters, error) { + return f.MutableParameters, nil } // SupportsPasswordChange implements FormattingOptionsProvider. diff --git a/repo/format/format_provider.go b/repo/format/format_provider.go index 401ba79a1..8f954b7e4 100644 --- a/repo/format/format_provider.go +++ b/repo/format/format_provider.go @@ -1,11 +1,8 @@ package format import ( - "time" - "github.com/pkg/errors" - "github.com/kopia/kopia/internal/epoch" "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/content/index" "github.com/kopia/kopia/repo/encryption" @@ -16,8 +13,6 @@ minValidPackSize = 10 << 20 maxValidPackSize = 120 << 20 - defaultIndexShardSize = 16e6 // slightly less than 2^24, which lets index use 24-bit/3-byte indexes - // CurrentWriteVersion is the version of the repository applied to new repositories. CurrentWriteVersion = FormatVersion3 @@ -51,23 +46,17 @@ // Provider provides current formatting options. The options returned // should not be cached for more than a few seconds as they are subject to change. type Provider interface { - epoch.ParametersProvider - - MaxIndexBlobSize() int64 - WriteIndexVersion() int - IndexShardSize() int - encryption.Parameters hashing.Parameters HashFunc() hashing.HashFunc Encryptor() encryption.Encryptor - GetMutableParameters() MutableParameters - GetMasterKey() []byte + // this is typically cached, but sometimes refreshes MutableParameters from + // the repository so the results should not be cached. + GetMutableParameters() (MutableParameters, error) SupportsPasswordChange() bool - FormatVersion() Version - MaxPackBlobSize() int + GetMasterKey() []byte RepositoryFormatBytes() []byte Struct() ContentFormat } @@ -75,55 +64,9 @@ type Provider interface { type formattingOptionsProvider struct { *ContentFormat - h hashing.HashFunc - e encryption.Encryptor - actualFormatVersion Version - actualIndexVersion int - formatBytes []byte -} - -func (f *formattingOptionsProvider) FormatVersion() Version { - return f.Version -} - -// whether epoch manager is enabled, must be true. -func (f *formattingOptionsProvider) GetEpochManagerEnabled() bool { - return f.EpochParameters.Enabled -} - -// how frequently each client will list blobs to determine the current epoch. -func (f *formattingOptionsProvider) GetEpochRefreshFrequency() time.Duration { - return f.EpochParameters.EpochRefreshFrequency -} - -// number of epochs between full checkpoints. -func (f *formattingOptionsProvider) GetEpochFullCheckpointFrequency() int { - return f.EpochParameters.FullCheckpointFrequency -} - -// GetEpochCleanupSafetyMargin returns safety margin to prevent uncompacted blobs from being deleted if the corresponding compacted blob age is less than this. -func (f *formattingOptionsProvider) GetEpochCleanupSafetyMargin() time.Duration { - return f.EpochParameters.CleanupSafetyMargin -} - -// GetMinEpochDuration returns the minimum duration of an epoch. -func (f *formattingOptionsProvider) GetMinEpochDuration() time.Duration { - return f.EpochParameters.MinEpochDuration -} - -// GetEpochAdvanceOnCountThreshold returns the number of files above which epoch should be advanced. -func (f *formattingOptionsProvider) GetEpochAdvanceOnCountThreshold() int { - return f.EpochParameters.EpochAdvanceOnCountThreshold -} - -// GetEpochAdvanceOnTotalSizeBytesThreshold returns the total size of files above which the epoch should be advanced. -func (f *formattingOptionsProvider) GetEpochAdvanceOnTotalSizeBytesThreshold() int64 { - return f.EpochParameters.EpochAdvanceOnTotalSizeBytesThreshold -} - -// GetEpochDeleteParallelism returns the number of blobs to delete in parallel during cleanup. -func (f *formattingOptionsProvider) GetEpochDeleteParallelism() int { - return f.EpochParameters.DeleteParallelism + h hashing.HashFunc + e encryption.Encryptor + formatBytes []byte } func (f *formattingOptionsProvider) Struct() ContentFormat { @@ -143,13 +86,12 @@ func NewFormattingOptionsProvider(f *ContentFormat, formatBytes []byte) (Provide return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", formatVersion, MinSupportedWriteVersion, MaxSupportedWriteVersion) } - actualIndexVersion := f.IndexVersion - if actualIndexVersion == 0 { - actualIndexVersion = legacyIndexVersion + if f.IndexVersion == 0 { + f.IndexVersion = legacyIndexVersion } - if actualIndexVersion < index.Version1 || actualIndexVersion > index.Version2 { - return nil, errors.Errorf("index version %v is not supported", actualIndexVersion) + if f.IndexVersion < index.Version1 || f.IndexVersion > index.Version2 { + return nil, errors.Errorf("index version %v is not supported", f.IndexVersion) } h, err := hashing.CreateHashFunc(f) @@ -175,11 +117,9 @@ func NewFormattingOptionsProvider(f *ContentFormat, formatBytes []byte) (Provide return &formattingOptionsProvider{ ContentFormat: f, - h: h, - e: e, - actualIndexVersion: actualIndexVersion, - actualFormatVersion: f.Version, - formatBytes: formatBytes, + h: h, + e: e, + formatBytes: formatBytes, }, nil } @@ -191,26 +131,6 @@ func (f *formattingOptionsProvider) HashFunc() hashing.HashFunc { return f.h } -func (f *formattingOptionsProvider) WriteIndexVersion() int { - return f.actualIndexVersion -} - -func (f *formattingOptionsProvider) MaxIndexBlobSize() int64 { - return int64(f.MaxPackSize) -} - -func (f *formattingOptionsProvider) MaxPackBlobSize() int { - return f.MaxPackSize -} - -func (f *formattingOptionsProvider) GetEpochManagerParameters() epoch.Parameters { - return f.EpochParameters -} - -func (f *formattingOptionsProvider) IndexShardSize() int { - return defaultIndexShardSize -} - func (f *formattingOptionsProvider) RepositoryFormatBytes() []byte { return f.formatBytes } diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index 798871fb3..789b20d5f 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -633,8 +633,8 @@ func (r *grpcInnerSession) GetContent(ctx context.Context, contentID content.ID) return nil, errNoSessionResponse() } -func (r *grpcRepositoryClient) SupportsContentCompression() bool { - return r.serverSupportsContentCompression +func (r *grpcRepositoryClient) SupportsContentCompression() (bool, error) { + return r.serverSupportsContentCompression, nil } func (r *grpcRepositoryClient) doWrite(ctx context.Context, contentID content.ID, data []byte, prefix content.IDPrefix, comp compression.HeaderID) error { diff --git a/repo/maintenance/content_rewrite.go b/repo/maintenance/content_rewrite.go index c8f2561b3..12986613d 100644 --- a/repo/maintenance/content_rewrite.go +++ b/repo/maintenance/content_rewrite.go @@ -136,8 +136,11 @@ func getContentToRewrite(ctx context.Context, rep repo.DirectRepository, opt *Re // add all content IDs from short packs if opt.ShortPacks { - threshold := int64(rep.ContentReader().ContentFormat().MaxPackBlobSize() * shortPackThresholdPercent / 100) //nolint:gomnd - findContentInShortPacks(ctx, rep, ch, threshold, opt) + mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters() + if mperr == nil { + threshold := int64(mp.MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd + findContentInShortPacks(ctx, rep, ch, threshold, opt) + } } // add all blocks with given format version diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go index 49df01f20..930dd7c3b 100644 --- a/repo/maintenance/maintenance_run.go +++ b/repo/maintenance/maintenance_run.go @@ -246,11 +246,16 @@ func Run(ctx context.Context, runParams RunParameters, safety SafetyParameters) } func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error { - if _, ok := runParams.rep.ContentManager().EpochManager(); ok { + _, ok, emerr := runParams.rep.ContentManager().EpochManager() + if ok { log(ctx).Debugf("quick maintenance not required for epoch manager") return nil } + if emerr != nil { + return errors.Wrap(emerr, "epoch manager") + } + s, err := GetSchedule(ctx, runParams.rep) if err != nil { return errors.Wrap(err, "unable to get schedule") @@ -321,7 +326,11 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul } func runTaskCleanupEpochManager(ctx context.Context, runParams RunParameters, s *Schedule) error { - em, ok := runParams.rep.ContentManager().EpochManager() + em, ok, emerr := runParams.rep.ContentManager().EpochManager() + if emerr != nil { + return errors.Wrap(emerr, "epoch manager") + } + if !ok { return nil } diff --git a/repo/maintenance/maintenance_schedule.go b/repo/maintenance/maintenance_schedule.go index 2e472e34a..46f0a9ec0 100644 --- a/repo/maintenance/maintenance_schedule.go +++ b/repo/maintenance/maintenance_schedule.go @@ -62,7 +62,12 @@ func (s *Schedule) ReportRun(taskType TaskType, info RunInfo) { } func getAES256GCM(rep repo.DirectRepository) (cipher.AEAD, error) { - c, err := aes.NewCipher(rep.DeriveKey(maintenanceScheduleKeyPurpose, maintenanceScheduleKeySize)) + key, err := rep.DeriveKey(maintenanceScheduleKeyPurpose, maintenanceScheduleKeySize) + if err != nil { + return nil, errors.Wrap(err, "derive key") + } + + c, err := aes.NewCipher(key) if err != nil { return nil, errors.Wrap(err, "unable to create AES-256 cipher") } diff --git a/repo/object/object_manager.go b/repo/object/object_manager.go index ed9519b10..e0e0df711 100644 --- a/repo/object/object_manager.go +++ b/repo/object/object_manager.go @@ -34,7 +34,7 @@ type contentReader interface { type contentManager interface { contentReader - SupportsContentCompression() bool + SupportsContentCompression() (bool, error) WriteContent(ctx context.Context, data gather.Bytes, prefix content.IDPrefix, comp compression.HeaderID) (content.ID, error) } diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index 770a3ac82..357afdc90 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -80,8 +80,8 @@ func (f *fakeContentManager) WriteContent(ctx context.Context, data gather.Bytes return contentID, nil } -func (f *fakeContentManager) SupportsContentCompression() bool { - return f.supportsContentCompression +func (f *fakeContentManager) SupportsContentCompression() (bool, error) { + return f.supportsContentCompression, nil } func (f *fakeContentManager) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) { diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index a22e4475d..67c511f58 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -188,8 +188,13 @@ func (w *objectWriter) prepareAndWriteContentChunk(chunkID int, data gather.Byte comp := content.NoCompression objectComp := w.compressor + scc, err := w.om.contentMgr.SupportsContentCompression() + if err != nil { + return errors.Wrap(err, "supports content compression") + } + // do not compress in this layer, instead pass comp to the content manager. - if w.om.contentMgr.SupportsContentCompression() && w.compressor != nil { + if scc && w.compressor != nil { comp = w.compressor.HeaderID() objectComp = nil } diff --git a/repo/repository.go b/repo/repository.go index 2513e10f6..c5df08248 100644 --- a/repo/repository.go +++ b/repo/repository.go @@ -63,7 +63,7 @@ type DirectRepository interface { AlsoLogToContentLog(ctx context.Context) context.Context UniqueID() []byte ConfigFilename() string - DeriveKey(purpose []byte, keyLength int) []byte + DeriveKey(purpose []byte, keyLength int) ([]byte, error) Token(password string) (string, error) Throttler() throttling.SettableThrottler RequiredFeatures() ([]feature.Required, error) @@ -111,15 +111,20 @@ type directRepository struct { } // DeriveKey derives encryption key of the provided length from the master key. -func (r *directRepository) DeriveKey(purpose []byte, keyLength int) []byte { - if r.cmgr.ContentFormat().SupportsPasswordChange() { - return format.DeriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.uniqueID, purpose, keyLength) +func (r *directRepository) DeriveKey(purpose []byte, keyLength int) ([]byte, error) { + mp, mperr := r.cmgr.ContentFormat().GetMutableParameters() + if mperr != nil { + return nil, errors.Wrap(mperr, "mutable parameters") + } + + if mp.Version >= format.FormatVersion2 { + return format.DeriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.uniqueID, purpose, keyLength), nil } // version of kopia