From 524ffaf4b8adc754d3ca42fb0f774c179cb1ba2b Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Tue, 20 Feb 2024 14:48:23 -0800 Subject: [PATCH] refactor(repository): added context to potentially blocking repository methods (#3654) Primarily for wiring a context.Context to a call to content.Manager.refresh, which was using a detached context. --- cli/auto_upgrade.go | 2 +- cli/command_index_epoch_list.go | 2 +- cli/command_maintenance_run.go | 2 +- cli/command_maintenance_set.go | 2 +- cli/command_repository_set_parameters.go | 6 +-- cli/command_repository_status.go | 24 ++++----- cli/command_repository_upgrade.go | 14 ++--- internal/epoch/epoch_manager.go | 16 +++--- internal/epoch/epoch_manager_test.go | 4 +- internal/server/api_repo.go | 16 +++--- internal/server/grpc_session.go | 2 +- repo/api_server_repository.go | 2 +- repo/content/committed_content_index.go | 6 +-- repo/content/committed_read_manager.go | 16 +++--- repo/content/content_index_recovery.go | 14 ++--- repo/content/content_manager.go | 51 ++++++++++--------- repo/content/content_manager_indexes.go | 4 +- repo/content/content_manager_iterate.go | 6 +-- repo/content/content_manager_lock_free.go | 4 +- repo/content/content_manager_test.go | 2 +- repo/content/content_reader.go | 4 +- .../indexblob/index_blob_manager_v0.go | 6 +-- .../indexblob/index_blob_manager_v1.go | 2 +- repo/format/content_format.go | 4 +- repo/format/format_manager.go | 32 ++++++------ repo/format/format_manager_test.go | 12 ++--- repo/format/format_provider.go | 8 +-- repo/format/upgrade_lock.go | 10 ++-- repo/format/upgrade_lock_test.go | 4 +- repo/grpc_repository_client.go | 2 +- repo/maintenance/blob_retain.go | 2 +- repo/maintenance/content_rewrite.go | 2 +- repo/maintenance/maintenance_run.go | 4 +- repo/object/object_manager.go | 2 +- repo/object/object_manager_test.go | 2 +- repo/object/object_writer.go | 2 +- repo/open.go | 8 +-- repo/repository_test.go | 8 +-- 38 files changed, 158 insertions(+), 151 deletions(-) diff --git a/cli/auto_upgrade.go b/cli/auto_upgrade.go index 11f684b5a..285378c7a 100644 --- a/cli/auto_upgrade.go +++ b/cli/auto_upgrade.go @@ -40,7 +40,7 @@ func setDefaultMaintenanceParameters(ctx context.Context, rep repo.RepositoryWri p.Owner = rep.ClientOptions().UsernameAtHost() if dw, ok := rep.(repo.DirectRepositoryWriter); ok { - _, ok, err := dw.ContentReader().EpochManager() + _, ok, err := dw.ContentReader().EpochManager(ctx) if err != nil { return errors.Wrap(err, "epoch manager") } diff --git a/cli/command_index_epoch_list.go b/cli/command_index_epoch_list.go index 36d961f07..327f7ffe0 100644 --- a/cli/command_index_epoch_list.go +++ b/cli/command_index_epoch_list.go @@ -23,7 +23,7 @@ func (c *commandIndexEpochList) setup(svc appServices, parent commandParent) { } func (c *commandIndexEpochList) run(ctx context.Context, rep repo.DirectRepository) error { - emgr, ok, err := rep.ContentReader().EpochManager() + emgr, ok, err := rep.ContentReader().EpochManager(ctx) if err != nil { return errors.Wrap(err, "epoch manager") } diff --git a/cli/command_maintenance_run.go b/cli/command_maintenance_run.go index 75306752e..d2e42c612 100644 --- a/cli/command_maintenance_run.go +++ b/cli/command_maintenance_run.go @@ -28,7 +28,7 @@ func (c *commandMaintenanceRun) setup(svc appServices, parent commandParent) { func (c *commandMaintenanceRun) run(ctx context.Context, rep repo.DirectRepositoryWriter) error { mode := maintenance.ModeQuick - _, supportsEpochManager, err := rep.ContentManager().EpochManager() + _, supportsEpochManager, err := rep.ContentManager().EpochManager(ctx) if err != nil { return errors.Wrap(err, "EpochManager") } diff --git a/cli/command_maintenance_set.go b/cli/command_maintenance_set.go index d2f5bd9d3..93b3ddf92 100644 --- a/cli/command_maintenance_set.go +++ b/cli/command_maintenance_set.go @@ -177,7 +177,7 @@ func (c *commandMaintenanceSet) run(ctx context.Context, rep repo.DirectReposito return errors.Errorf("no changes specified") } - blobCfg, err := rep.FormatManager().BlobCfgBlob() + blobCfg, err := rep.FormatManager().BlobCfgBlob(ctx) if err != nil { return errors.Wrap(err, "blob configuration") } diff --git a/cli/command_repository_set_parameters.go b/cli/command_repository_set_parameters.go index e7204259b..5930ce995 100644 --- a/cli/command_repository_set_parameters.go +++ b/cli/command_repository_set_parameters.go @@ -174,17 +174,17 @@ func (c *commandRepositorySetParameters) disableBlobRetention(ctx context.Contex } func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.DirectRepositoryWriter) error { - mp, err := rep.FormatManager().GetMutableParameters() + mp, err := rep.FormatManager().GetMutableParameters(ctx) if err != nil { return errors.Wrap(err, "mutable parameters") } - blobcfg, err := rep.FormatManager().BlobCfgBlob() + blobcfg, err := rep.FormatManager().BlobCfgBlob(ctx) if err != nil { return errors.Wrap(err, "blob configuration") } - requiredFeatures, err := rep.FormatManager().RequiredFeatures() + requiredFeatures, err := rep.FormatManager().RequiredFeatures(ctx) if err != nil { return errors.Wrap(err, "unable to get required features") } diff --git a/cli/command_repository_status.go b/cli/command_repository_status.go index 060195b1e..7121c8191 100644 --- a/cli/command_repository_status.go +++ b/cli/command_repository_status.go @@ -63,7 +63,7 @@ func (c *commandRepositoryStatus) outputJSON(ctx context.Context, r repo.Reposit ci := dr.BlobReader().ConnectionInfo() s.UniqueIDHex = hex.EncodeToString(dr.UniqueID()) s.ObjectFormat = dr.ObjectFormat() - s.BlobRetention, _ = dr.FormatManager().BlobCfgBlob() + s.BlobRetention, _ = dr.FormatManager().BlobCfgBlob(ctx) s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) //nolint:forcetypeassert s.ContentFormat = dr.FormatManager().ScrubbedContentFormat() @@ -82,13 +82,13 @@ func (c *commandRepositoryStatus) outputJSON(ctx context.Context, r repo.Reposit return nil } -func (c *commandRepositoryStatus) dumpUpgradeStatus(dr repo.DirectRepository) error { +func (c *commandRepositoryStatus) dumpUpgradeStatus(ctx context.Context, dr repo.DirectRepository) error { drw, isDr := dr.(repo.DirectRepositoryWriter) if !isDr { return nil } - l, err := drw.FormatManager().GetUpgradeLockIntent() + l, err := drw.FormatManager().GetUpgradeLockIntent(ctx) if err != nil { return errors.Wrap(err, "failed to get the upgrade lock intent") } @@ -120,8 +120,8 @@ func (c *commandRepositoryStatus) dumpUpgradeStatus(dr repo.DirectRepository) er return nil } -func (c *commandRepositoryStatus) dumpRetentionStatus(dr repo.DirectRepository) { - if blobcfg, _ := dr.FormatManager().BlobCfgBlob(); blobcfg.IsRetentionEnabled() { +func (c *commandRepositoryStatus) dumpRetentionStatus(ctx context.Context, dr repo.DirectRepository) { + if blobcfg, _ := dr.FormatManager().BlobCfgBlob(ctx); blobcfg.IsRetentionEnabled() { c.out.printStdout("\n") c.out.printStdout("Blob retention mode: %s\n", blobcfg.RetentionMode) c.out.printStdout("Blob retention period: %s\n", blobcfg.RetentionPeriod) @@ -174,7 +174,7 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) contentFormat := dr.ContentReader().ContentFormat() - mp, mperr := contentFormat.GetMutableParameters() + mp, mperr := contentFormat.GetMutableParameters(ctx) if mperr != nil { return errors.Wrap(mperr, "mutable parameters") } @@ -188,12 +188,12 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) c.out.printStdout("Content compression: %v\n", mp.IndexVersion >= index.Version2) c.out.printStdout("Password changes: %v\n", contentFormat.SupportsPasswordChange()) - c.outputRequiredFeatures(dr) + c.outputRequiredFeatures(ctx, dr) c.out.printStdout("Max pack length: %v\n", units.BytesString(int64(mp.MaxPackSize))) c.out.printStdout("Index Format: v%v\n", mp.IndexVersion) - emgr, epochMgrEnabled, emerr := dr.ContentReader().EpochManager() + emgr, epochMgrEnabled, emerr := dr.ContentReader().EpochManager(ctx) if emerr != nil { return errors.Wrap(emerr, "epoch manager") } @@ -216,9 +216,9 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) c.out.printStdout("Epoch Manager: disabled\n") } - c.dumpRetentionStatus(dr) + c.dumpRetentionStatus(ctx, dr) - if err := c.dumpUpgradeStatus(dr); err != nil { + if err := c.dumpUpgradeStatus(ctx, dr); err != nil { return errors.Wrap(err, "failed to dump upgrade status") } @@ -251,8 +251,8 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) return nil } -func (c *commandRepositoryStatus) outputRequiredFeatures(dr repo.DirectRepository) { - if req, _ := dr.FormatManager().RequiredFeatures(); len(req) > 0 { +func (c *commandRepositoryStatus) outputRequiredFeatures(ctx context.Context, dr repo.DirectRepository) { + if req, _ := dr.FormatManager().RequiredFeatures(ctx); len(req) > 0 { var featureIDs []string for _, r := range req { diff --git a/cli/command_repository_upgrade.go b/cli/command_repository_upgrade.go index 546903dc4..f8a737e84 100644 --- a/cli/command_repository_upgrade.go +++ b/cli/command_repository_upgrade.go @@ -297,7 +297,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D now := rep.Time() - mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters() + mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters(ctx) if mperr != nil { return errors.Wrap(mperr, "mutable parameters") } @@ -355,7 +355,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.DirectRepositoryWriter) error { cf := rep.ContentReader().ContentFormat() - mp, mperr := cf.GetMutableParameters() + mp, mperr := cf.GetMutableParameters(ctx) if mperr != nil { return errors.Wrap(mperr, "mutable parameters") } @@ -363,7 +363,7 @@ func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.D if mp.EpochParameters.Enabled { log(ctx).Infof("Repository indices have already been migrated to the epoch format, no need to drain other clients") - l, err := rep.FormatManager().GetUpgradeLockIntent() + l, err := rep.FormatManager().GetUpgradeLockIntent(ctx) if err != nil { return errors.Wrap(err, "failed to get upgrade lock intent") } @@ -406,7 +406,7 @@ func (c *commandRepositoryUpgrade) sleepWithContext(ctx context.Context, dur tim func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo.DirectRepositoryWriter) error { for { - l, err := rep.FormatManager().GetUpgradeLockIntent() + l, err := rep.FormatManager().GetUpgradeLockIntent(ctx) upgradeTime := l.UpgradeTime() now := rep.Time() @@ -436,12 +436,12 @@ func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo // repository. This phase runs after the lock has been acquired in one of the // prior phases. func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectRepositoryWriter) error { - mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters() + mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters(ctx) if mperr != nil { return errors.Wrap(mperr, "mutable parameters") } - rf, err := rep.FormatManager().RequiredFeatures() + rf, err := rep.FormatManager().RequiredFeatures(ctx) if err != nil { return errors.Wrap(err, "error getting repository features") } @@ -460,7 +460,7 @@ func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectR return errors.Wrap(uerr, "error upgrading indices") } - blobCfg, err := rep.FormatManager().BlobCfgBlob() + blobCfg, err := rep.FormatManager().BlobCfgBlob(ctx) if err != nil { return errors.Wrap(err, "error getting blob configuration") } diff --git a/internal/epoch/epoch_manager.go b/internal/epoch/epoch_manager.go index 88f7a0716..4fc617523 100644 --- a/internal/epoch/epoch_manager.go +++ b/internal/epoch/epoch_manager.go @@ -33,7 +33,7 @@ // ParametersProvider provides epoch manager parameters. type ParametersProvider interface { - GetParameters() (*Parameters, error) + GetParameters(ctx context.Context) (*Parameters, error) } // ErrVerySlowIndexWrite is returned by WriteIndex if a write takes more than 2 epochs (usually >48h). @@ -327,7 +327,7 @@ func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) e } } - p, err := e.getParameters() + p, err := e.getParameters(ctx) if err != nil { return err } @@ -363,7 +363,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error { return err } - p, err := e.getParameters() + p, err := e.getParameters(ctx) if err != nil { return err } @@ -419,8 +419,8 @@ func blobSetWrittenEarlyEnough(replacementSet []blob.Metadata, maxReplacementTim return blob.MaxTimestamp(replacementSet).Before(maxReplacementTime) } -func (e *Manager) getParameters() (*Parameters, error) { - emp, err := e.paramProvider.GetParameters() +func (e *Manager) getParameters(ctx context.Context) (*Parameters, error) { + emp, err := e.paramProvider.GetParameters(ctx) if err != nil { return nil, errors.Wrap(err, "epoch manager parameters") } @@ -433,7 +433,7 @@ func (e *Manager) refreshLocked(ctx context.Context) error { return errors.Wrap(ctx.Err(), "refreshLocked") } - p, err := e.getParameters() + p, err := e.getParameters(ctx) if err != nil { return err } @@ -666,7 +666,7 @@ func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[ func (e *Manager) refreshAttemptLocked(ctx context.Context) error { e.log.Debug("refreshAttemptLocked") - p, perr := e.getParameters() + p, perr := e.getParameters(ctx) if perr != nil { return perr } @@ -789,7 +789,7 @@ func (e *Manager) WriteIndex(ctx context.Context, dataShards map[blob.ID]blob.By for { e.log.Debug("WriteIndex") - p, err := e.getParameters() + p, err := e.getParameters(ctx) if err != nil { return nil, err } diff --git a/internal/epoch/epoch_manager_test.go b/internal/epoch/epoch_manager_test.go index 608a86659..313a99c0c 100644 --- a/internal/epoch/epoch_manager_test.go +++ b/internal/epoch/epoch_manager_test.go @@ -348,7 +348,7 @@ func TestIndexEpochManager_NoCompactionInReadOnly(t *testing.T) { return nil } - p, err := te.mgr.getParameters() + p, err := te.mgr.getParameters(ctx) require.NoError(t, err) // Write data to the index such that the next time it's opened it should @@ -806,7 +806,7 @@ type parameterProvider struct { *Parameters } -func (p parameterProvider) GetParameters() (*Parameters, error) { +func (p parameterProvider) GetParameters(ctx context.Context) (*Parameters, error) { return p.Parameters, nil } diff --git a/internal/server/api_repo.go b/internal/server/api_repo.go index 3e499c50e..5223d592a 100644 --- a/internal/server/api_repo.go +++ b/internal/server/api_repo.go @@ -35,7 +35,7 @@ func handleRepoParameters(ctx context.Context, rc requestContext) (interface{}, }, nil } - scc, err := dr.ContentReader().SupportsContentCompression() + scc, err := dr.ContentReader().SupportsContentCompression(ctx) if err != nil { return nil, internalServerError(err) } @@ -60,12 +60,14 @@ func handleRepoStatus(ctx context.Context, rc requestContext) (interface{}, *api dr, ok := rc.rep.(repo.DirectRepository) if ok { - mp, mperr := dr.ContentReader().ContentFormat().GetMutableParameters() + contentFormat := dr.ContentReader().ContentFormat() + + mp, mperr := contentFormat.GetMutableParameters(ctx) if mperr != nil { return nil, internalServerError(mperr) } - scc, err := dr.ContentReader().SupportsContentCompression() + scc, err := dr.ContentReader().SupportsContentCompression(ctx) if err != nil { return nil, internalServerError(err) } @@ -74,10 +76,10 @@ func handleRepoStatus(ctx context.Context, rc requestContext) (interface{}, *api Connected: true, ConfigFile: dr.ConfigFilename(), FormatVersion: mp.Version, - Hash: dr.ContentReader().ContentFormat().GetHashFunction(), - Encryption: dr.ContentReader().ContentFormat().GetEncryptionAlgorithm(), - ECC: dr.ContentReader().ContentFormat().GetECCAlgorithm(), - ECCOverheadPercent: dr.ContentReader().ContentFormat().GetECCOverheadPercent(), + Hash: contentFormat.GetHashFunction(), + Encryption: contentFormat.GetEncryptionAlgorithm(), + ECC: contentFormat.GetECCAlgorithm(), + ECCOverheadPercent: contentFormat.GetECCOverheadPercent(), MaxPackSize: mp.MaxPackSize, Splitter: dr.ObjectFormat().Splitter, Storage: dr.BlobReader().ConnectionInfo().Type, diff --git a/internal/server/grpc_session.go b/internal/server/grpc_session.go index fcd3f4256..b8d020c34 100644 --- a/internal/server/grpc_session.go +++ b/internal/server/grpc_session.go @@ -551,7 +551,7 @@ func (s *Server) handleInitialSessionHandshake(srv grpcapi.KopiaRepository_Sessi return repo.WriteSessionOptions{}, errors.Errorf("missing initialization request") } - scc, err := dr.ContentReader().SupportsContentCompression() + scc, err := dr.ContentReader().SupportsContentCompression(srv.Context()) if err != nil { return repo.WriteSessionOptions{}, errors.Wrap(err, "supports content compression") } diff --git a/repo/api_server_repository.go b/repo/api_server_repository.go index 8233980d0..0d9b6e2ee 100644 --- a/repo/api_server_repository.go +++ b/repo/api_server_repository.go @@ -157,7 +157,7 @@ func (r *apiServerRepository) Flush(ctx context.Context) error { return nil } -func (r *apiServerRepository) SupportsContentCompression() (bool, error) { +func (r *apiServerRepository) SupportsContentCompression(ctx context.Context) (bool, error) { return r.serverSupportsContentCompression, nil } diff --git a/repo/content/committed_content_index.go b/repo/content/committed_content_index.go index ef0ae5eb4..fc222b5a4 100644 --- a/repo/content/committed_content_index.go +++ b/repo/content/committed_content_index.go @@ -186,7 +186,7 @@ func (c *committedContentIndex) merge(ctx context.Context, indexFiles []blob.ID) newUsedMap[e] = ndx } - mergedAndCombined, err := c.combineSmallIndexes(newMerged) + mergedAndCombined, err := c.combineSmallIndexes(ctx, newMerged) if err != nil { newlyOpened.Close() //nolint:errcheck @@ -239,7 +239,7 @@ func (c *committedContentIndex) use(ctx context.Context, indexFiles []blob.ID, i return nil } -func (c *committedContentIndex) combineSmallIndexes(m index.Merged) (index.Merged, error) { +func (c *committedContentIndex) combineSmallIndexes(ctx context.Context, m index.Merged) (index.Merged, error) { var toKeep, toMerge index.Merged for _, ndx := range m { @@ -265,7 +265,7 @@ func (c *committedContentIndex) combineSmallIndexes(m index.Merged) (index.Merge } } - mp, mperr := c.formatProvider.GetMutableParameters() + mp, mperr := c.formatProvider.GetMutableParameters(ctx) if mperr != nil { return nil, errors.Wrap(mperr, "error getting mutable parameters") } diff --git a/repo/content/committed_read_manager.go b/repo/content/committed_read_manager.go index 3b8cd9872..4b4174c96 100644 --- a/repo/content/committed_read_manager.go +++ b/repo/content/committed_read_manager.go @@ -204,7 +204,7 @@ func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error { nextSleepTime := 100 * time.Millisecond //nolint:gomnd for i := 0; i < indexLoadAttempts; i++ { - ibm, err0 := sm.indexBlobManager() + ibm, err0 := sm.indexBlobManager(ctx) if err0 != nil { return err0 } @@ -268,8 +268,8 @@ func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache { } // indexBlobManager return the index manager for content. -func (sm *SharedManager) indexBlobManager() (indexblob.Manager, error) { - mp, mperr := sm.format.GetMutableParameters() +func (sm *SharedManager) indexBlobManager(ctx context.Context) (indexblob.Manager, error) { + mp, mperr := sm.format.GetMutableParameters(ctx) if mperr != nil { return nil, errors.Wrap(mperr, "mutable parameters") } @@ -359,7 +359,7 @@ func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) ( return result, nil } - ibm, err0 := sm.indexBlobManager() + ibm, err0 := sm.indexBlobManager(ctx) if err0 != nil { return nil, err0 } @@ -539,8 +539,8 @@ type epochParameters struct { prov format.Provider } -func (p epochParameters) GetParameters() (*epoch.Parameters, error) { - mp, mperr := p.prov.GetMutableParameters() +func (p epochParameters) GetParameters(ctx context.Context) (*epoch.Parameters, error) { + mp, mperr := p.prov.GetMutableParameters(ctx) if mperr != nil { return nil, errors.Wrap(mperr, "mutable parameters") } @@ -549,8 +549,8 @@ func (p epochParameters) GetParameters() (*epoch.Parameters, error) { } // EpochManager returns the epoch manager. -func (sm *SharedManager) EpochManager() (*epoch.Manager, bool, error) { - ibm, err := sm.indexBlobManager() +func (sm *SharedManager) EpochManager(ctx context.Context) (*epoch.Manager, bool, error) { + ibm, err := sm.indexBlobManager(ctx) if err != nil { return nil, false, err } diff --git a/repo/content/content_index_recovery.go b/repo/content/content_index_recovery.go index 5adf99abd..95a0892f3 100644 --- a/repo/content/content_index_recovery.go +++ b/repo/content/content_index_recovery.go @@ -10,6 +10,7 @@ "github.com/kopia/kopia/internal/gather" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/content/index" + "github.com/kopia/kopia/repo/format" ) // RecoverIndexFromPackBlob attempts to recover index blob entries from a given pack file. @@ -42,7 +43,7 @@ func (bm *WriteManager) RecoverIndexFromPackBlob(ctx context.Context, packFile b if commit { bm.lock() - defer bm.unlock() + defer bm.unlock(ctx) for _, is := range recovered { bm.packIndexBuilder.Add(is) @@ -170,12 +171,7 @@ func decodePostamble(payload []byte) *packContentPostamble { } } -func (sm *SharedManager) buildLocalIndex(pending index.Builder, output *gather.WriteBuffer) error { - mp, mperr := sm.format.GetMutableParameters() - if mperr != nil { - return errors.Wrap(mperr, "mutable parameters") - } - +func (sm *SharedManager) buildLocalIndex(mp format.MutableParameters, pending index.Builder, output *gather.WriteBuffer) error { if err := pending.Build(output, mp.IndexVersion); err != nil { return errors.Wrap(err, "unable to build local index") } @@ -184,14 +180,14 @@ func (sm *SharedManager) buildLocalIndex(pending index.Builder, output *gather.W } // appendPackFileIndexRecoveryData appends data designed to help with recovery of pack index in case it gets damaged or lost. -func (sm *SharedManager) appendPackFileIndexRecoveryData(pending index.Builder, output *gather.WriteBuffer) error { +func (sm *SharedManager) appendPackFileIndexRecoveryData(mp format.MutableParameters, pending index.Builder, output *gather.WriteBuffer) error { // build, encrypt and append local index localIndexOffset := output.Length() var localIndex gather.WriteBuffer defer localIndex.Close() - if err := sm.buildLocalIndex(pending, &localIndex); err != nil { + if err := sm.buildLocalIndex(mp, pending, &localIndex); err != nil { return err } diff --git a/repo/content/content_manager.go b/repo/content/content_manager.go index 06a3431bc..bbf0477e4 100644 --- a/repo/content/content_manager.go +++ b/repo/content/content_manager.go @@ -121,7 +121,7 @@ func (bm *WriteManager) Revision() int64 { // of randomness or a contemporaneous timestamp that will never reappear. func (bm *WriteManager) DeleteContent(ctx context.Context, contentID ID) error { bm.lock() - defer bm.unlock() + defer bm.unlock(ctx) bm.revision.Add(1) @@ -213,7 +213,7 @@ func (bm *WriteManager) contentWriteTime(previousUnixTimeSeconds int64) int64 { func (bm *WriteManager) maybeFlushBasedOnTimeUnlocked(ctx context.Context) error { bm.lock() shouldFlush := bm.timeNow().After(bm.flushPackIndexesAfter) - bm.unlock() + bm.unlock(ctx) if !shouldFlush { return nil @@ -224,7 +224,7 @@ func (bm *WriteManager) maybeFlushBasedOnTimeUnlocked(ctx context.Context) error func (bm *WriteManager) maybeRetryWritingFailedPacksUnlocked(ctx context.Context) error { bm.lock() - defer bm.unlock() + defer bm.unlock(ctx) // do not start new uploads while flushing for bm.flushing { @@ -271,7 +271,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat if previousWriteTime < 0 { if _, _, err = bm.getContentInfoReadLocked(ctx, contentID); err == nil { // we lost the race while compressing the content, the content now exists. - bm.unlock() + bm.unlock(ctx) return nil } } @@ -294,14 +294,14 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat bm.log.Debugf("retry-write %v", pp.packBlobID) if err = bm.writePackAndAddToIndexLocked(ctx, pp); err != nil { - bm.unlock() + bm.unlock(ctx) return errors.Wrap(err, "error writing previously failed pack") } } pp, err := bm.getOrCreatePendingPackInfoLocked(ctx, prefix) if err != nil { - bm.unlock() + bm.unlock(ctx) return errors.Wrap(err, "unable to create pending pack") } @@ -316,7 +316,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat } if _, err := compressedAndEncrypted.Bytes().WriteTo(pp.currentPackData); err != nil { - bm.unlock() + bm.unlock(ctx) return errors.Wrapf(err, "unable to append %q to pack data", contentID) } @@ -333,7 +333,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat bm.writingPacks = append(bm.writingPacks, pp) } - bm.unlock() + bm.unlock(ctx) // at this point we're unlocked so different goroutines can encrypt and // save to storage in parallel. @@ -349,7 +349,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat // DisableIndexFlush increments the counter preventing automatic index flushes. func (bm *WriteManager) DisableIndexFlush(ctx context.Context) { bm.lock() - defer bm.unlock() + defer bm.unlock(ctx) bm.log.Debugf("DisableIndexFlush()") bm.disableIndexFlushCount++ } @@ -358,7 +358,7 @@ func (bm *WriteManager) DisableIndexFlush(ctx context.Context) { // The flushes will be re-enabled when the index drops to zero. func (bm *WriteManager) EnableIndexFlush(ctx context.Context) { bm.lock() - defer bm.unlock() + defer bm.unlock(ctx) bm.log.Debugf("EnableIndexFlush()") bm.disableIndexFlushCount-- } @@ -417,7 +417,7 @@ func (bm *WriteManager) writeIndexBlobs(ctx context.Context, dataShards []gather ctx, span := tracer.Start(ctx, "WriteIndexBlobs") defer span.End() - ibm, err := bm.indexBlobManager() + ibm, err := bm.indexBlobManager(ctx) if err != nil { return nil, err } @@ -509,7 +509,7 @@ func (bm *WriteManager) writePackAndAddToIndexUnlocked(ctx context.Context, pp * packFileIndex, writeErr := bm.prepareAndWritePackInternal(ctx, pp, bm.onUpload) bm.lock() - defer bm.unlock() + defer bm.unlock(ctx) return bm.processWritePackResultLocked(pp, packFileIndex, writeErr) } @@ -547,7 +547,12 @@ func (bm *WriteManager) processWritePackResultLocked(pp *pendingPackInfo, packFi } func (sm *SharedManager) prepareAndWritePackInternal(ctx context.Context, pp *pendingPackInfo, onUpload func(int64)) (index.Builder, error) { - packFileIndex, err := sm.preparePackDataContent(pp) + mp, mperr := sm.format.GetMutableParameters(ctx) + if mperr != nil { + return nil, errors.Wrap(mperr, "mutable parameters") + } + + packFileIndex, err := sm.preparePackDataContent(mp, pp) if err != nil { return nil, errors.Wrap(err, "error preparing data content") } @@ -590,13 +595,13 @@ func (bm *WriteManager) setFlushingLocked(v bool) { // Any pending writes completed before Flush() has started are guaranteed to be committed to the // repository before Flush() returns. func (bm *WriteManager) Flush(ctx context.Context) error { - mp, mperr := bm.format.GetMutableParameters() + mp, mperr := bm.format.GetMutableParameters(ctx) if mperr != nil { return errors.Wrap(mperr, "mutable parameters") } bm.lock() - defer bm.unlock() + defer bm.unlock(ctx) bm.log.Debugf("flush") @@ -645,7 +650,7 @@ func (bm *WriteManager) Flush(ctx context.Context) error { func (bm *WriteManager) RewriteContent(ctx context.Context, contentID ID) error { bm.log.Debugf("rewrite-content %v", contentID) - mp, mperr := bm.format.GetMutableParameters() + mp, mperr := bm.format.GetMutableParameters(ctx) if mperr != nil { return errors.Wrap(mperr, "mutable parameters") } @@ -676,7 +681,7 @@ func (bm *WriteManager) getContentDataAndInfo(ctx context.Context, contentID ID, func (bm *WriteManager) UndeleteContent(ctx context.Context, contentID ID) error { bm.log.Debugf("UndeleteContent(%q)", contentID) - mp, mperr := bm.format.GetMutableParameters() + mp, mperr := bm.format.GetMutableParameters(ctx) if mperr != nil { return errors.Wrap(mperr, "mutable parameters") } @@ -740,7 +745,7 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr return nil, errors.Wrap(err, "unable to read crypto bytes") } - suffix, berr := bm.format.RepositoryFormatBytes() + suffix, berr := bm.format.RepositoryFormatBytes(ctx) if berr != nil { return nil, errors.Wrap(berr, "format bytes") } @@ -763,8 +768,8 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr } // SupportsContentCompression returns true if content manager supports content-compression. -func (bm *WriteManager) SupportsContentCompression() (bool, error) { - mp, mperr := bm.format.GetMutableParameters() +func (bm *WriteManager) SupportsContentCompression(ctx context.Context) (bool, error) { + mp, mperr := bm.format.GetMutableParameters(ctx) if mperr != nil { return false, errors.Wrap(mperr, "mutable parameters") } @@ -780,7 +785,7 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, pre bm.writeContentBytes.Observe(int64(data.Length()), t0.Elapsed()) }() - mp, mperr := bm.format.GetMutableParameters() + mp, mperr := bm.format.GetMutableParameters(ctx) if mperr != nil { return EmptyID, errors.Wrap(mperr, "mutable parameters") } @@ -924,9 +929,9 @@ func (bm *WriteManager) lock() { } // +checklocksrelease:bm.mu -func (bm *WriteManager) unlock() { +func (bm *WriteManager) unlock(ctx context.Context) { if bm.checkInvariantsOnUnlock { - mp, mperr := bm.format.GetMutableParameters() + mp, mperr := bm.format.GetMutableParameters(ctx) if mperr == nil { bm.verifyInvariantsLocked(mp) } diff --git a/repo/content/content_manager_indexes.go b/repo/content/content_manager_indexes.go index 07742ffce..62368d976 100644 --- a/repo/content/content_manager_indexes.go +++ b/repo/content/content_manager_indexes.go @@ -20,7 +20,7 @@ func (sm *SharedManager) Refresh(ctx context.Context) error { sm.log.Debugf("Refresh started") - ibm, err := sm.indexBlobManager() + ibm, err := sm.indexBlobManager(ctx) if err != nil { return err } @@ -44,7 +44,7 @@ func (sm *SharedManager) CompactIndexes(ctx context.Context, opt indexblob.Compa sm.log.Debugf("CompactIndexes(%+v)", opt) - ibm, err := sm.indexBlobManager() + ibm, err := sm.indexBlobManager(ctx) if err != nil { return err } diff --git a/repo/content/content_manager_iterate.go b/repo/content/content_manager_iterate.go index af563f823..6e6691e42 100644 --- a/repo/content/content_manager_iterate.go +++ b/repo/content/content_manager_iterate.go @@ -84,9 +84,9 @@ func maybeParallelExecutor(parallel int, originalCallback IterateCallback) (Iter return callback, cleanup } -func (bm *WriteManager) snapshotUncommittedItems() index.Builder { +func (bm *WriteManager) snapshotUncommittedItems(ctx context.Context) index.Builder { bm.lock() - defer bm.unlock() + defer bm.unlock(ctx) overlay := bm.packIndexBuilder.Clone() @@ -116,7 +116,7 @@ func (bm *WriteManager) IterateContents(ctx context.Context, opts IterateOptions callback, cleanup := maybeParallelExecutor(opts.Parallel, callback) defer cleanup() //nolint:errcheck - uncommitted := bm.snapshotUncommittedItems() + uncommitted := bm.snapshotUncommittedItems(ctx) invokeCallback := func(i Info) error { if !opts.IncludeDeleted { diff --git a/repo/content/content_manager_lock_free.go b/repo/content/content_manager_lock_free.go index 8e769f07f..493428ae4 100644 --- a/repo/content/content_manager_lock_free.go +++ b/repo/content/content_manager_lock_free.go @@ -121,7 +121,7 @@ func (sm *SharedManager) getContentDataReadLocked(ctx context.Context, pp *pendi return sm.decryptContentAndVerify(payload.Bytes(), bi, output) } -func (sm *SharedManager) preparePackDataContent(pp *pendingPackInfo) (index.Builder, error) { +func (sm *SharedManager) preparePackDataContent(mp format.MutableParameters, pp *pendingPackInfo) (index.Builder, error) { packFileIndex := index.Builder{} haveContent := false @@ -173,7 +173,7 @@ func (sm *SharedManager) preparePackDataContent(pp *pendingPackInfo) (index.Buil } } - err := sm.appendPackFileIndexRecoveryData(packFileIndex, pp.currentPackData) + err := sm.appendPackFileIndexRecoveryData(mp, packFileIndex, pp.currentPackData) return packFileIndex, err } diff --git a/repo/content/content_manager_test.go b/repo/content/content_manager_test.go index 26404088b..eeb077236 100644 --- a/repo/content/content_manager_test.go +++ b/repo/content/content_manager_test.go @@ -1830,7 +1830,7 @@ func (s *contentManagerSuite) TestAutoCompressionOfMetadata(t *testing.T) { info, err := bm.ContentInfo(ctx, contentID) require.NoError(t, err) - if scc, _ := bm.SupportsContentCompression(); scc { + if scc, _ := bm.SupportsContentCompression(ctx); scc { require.Equal(t, compression.HeaderZstdFastest, info.GetCompressionHeaderID()) } else { require.Equal(t, NoCompression, info.GetCompressionHeaderID()) diff --git a/repo/content/content_reader.go b/repo/content/content_reader.go index f9bb95e5f..0b1cfeae2 100644 --- a/repo/content/content_reader.go +++ b/repo/content/content_reader.go @@ -9,12 +9,12 @@ // Reader defines content read API. type Reader interface { - SupportsContentCompression() (bool, error) + SupportsContentCompression(ctx context.Context) (bool, error) ContentFormat() format.Provider GetContent(ctx context.Context, id ID) ([]byte, error) ContentInfo(ctx context.Context, id ID) (Info, error) IterateContents(ctx context.Context, opts IterateOptions, callback IterateCallback) error IteratePacks(ctx context.Context, opts IteratePackOptions, callback IteratePacksCallback) error ListActiveSessions(ctx context.Context) (map[SessionID]*SessionInfo, error) - EpochManager() (*epoch.Manager, bool, error) + EpochManager(ctx context.Context) (*epoch.Manager, bool, error) } diff --git a/repo/content/indexblob/index_blob_manager_v0.go b/repo/content/indexblob/index_blob_manager_v0.go index 1f76f493f..a88bb72f4 100644 --- a/repo/content/indexblob/index_blob_manager_v0.go +++ b/repo/content/indexblob/index_blob_manager_v0.go @@ -53,7 +53,7 @@ type cleanupEntry struct { // IndexFormattingOptions provides options for formatting index blobs. type IndexFormattingOptions interface { - GetMutableParameters() (format.MutableParameters, error) + GetMutableParameters(ctx context.Context) (format.MutableParameters, error) } // ManagerV0 is a V0 (legacy) implementation of index blob manager. @@ -155,7 +155,7 @@ func (m *ManagerV0) Compact(ctx context.Context, opt CompactOptions) error { return errors.Wrap(err, "error listing active index blobs") } - mp, mperr := m.formattingOptions.GetMutableParameters() + mp, mperr := m.formattingOptions.GetMutableParameters(ctx) if mperr != nil { return errors.Wrap(mperr, "mutable parameters") } @@ -484,7 +484,7 @@ func (m *ManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs []Metadata return nil } - mp, mperr := m.formattingOptions.GetMutableParameters() + mp, mperr := m.formattingOptions.GetMutableParameters(ctx) if mperr != nil { return errors.Wrap(mperr, "mutable parameters") } diff --git a/repo/content/indexblob/index_blob_manager_v1.go b/repo/content/indexblob/index_blob_manager_v1.go index d9a415db5..04df58d0f 100644 --- a/repo/content/indexblob/index_blob_manager_v1.go +++ b/repo/content/indexblob/index_blob_manager_v1.go @@ -76,7 +76,7 @@ func (m *ManagerV1) CompactEpoch(ctx context.Context, blobIDs []blob.ID, outputP } } - mp, mperr := m.formattingOptions.GetMutableParameters() + mp, mperr := m.formattingOptions.GetMutableParameters(ctx) if mperr != nil { return errors.Wrap(mperr, "mutable parameters") } diff --git a/repo/format/content_format.go b/repo/format/content_format.go index fb5e98ef5..0dfbe06c3 100644 --- a/repo/format/content_format.go +++ b/repo/format/content_format.go @@ -1,6 +1,8 @@ package format import ( + "context" + "github.com/pkg/errors" "github.com/kopia/kopia/internal/epoch" @@ -44,7 +46,7 @@ func (f *ContentFormat) ResolveFormatVersion() error { } // GetMutableParameters implements FormattingOptionsProvider. -func (f *ContentFormat) GetMutableParameters() (MutableParameters, error) { +func (f *ContentFormat) GetMutableParameters(ctx context.Context) (MutableParameters, error) { return f.MutableParameters, nil } diff --git a/repo/format/format_manager.go b/repo/format/format_manager.go index dcb86f632..48b375dc2 100644 --- a/repo/format/format_manager.go +++ b/repo/format/format_manager.go @@ -59,8 +59,8 @@ type Manager struct { ignoreCacheOnFirstRefresh bool } -func (m *Manager) getOrRefreshFormat() (Provider, error) { - if err := m.maybeRefreshNotLocked(); err != nil { +func (m *Manager) getOrRefreshFormat(ctx context.Context) (Provider, error) { + if err := m.maybeRefreshNotLocked(ctx); err != nil { return nil, err } @@ -70,7 +70,7 @@ func (m *Manager) getOrRefreshFormat() (Provider, error) { return m.current, nil } -func (m *Manager) maybeRefreshNotLocked() error { +func (m *Manager) maybeRefreshNotLocked(ctx context.Context) error { m.mu.RLock() val := m.validUntil m.mu.RUnlock() @@ -80,7 +80,7 @@ func (m *Manager) maybeRefreshNotLocked() error { } // current format not valid anymore, kick off a refresh - return m.refresh(m.ctx) + return m.refresh(ctx) } // readAndCacheRepositoryBlobBytes reads the provided blob from the repository or cache directory. @@ -247,31 +247,31 @@ func (m *Manager) SupportsPasswordChange() bool { // RepositoryFormatBytes returns the bytes of `kopia.repository` blob. // This function blocks to refresh the format blob if necessary. -func (m *Manager) RepositoryFormatBytes() ([]byte, error) { - f, err := m.getOrRefreshFormat() +func (m *Manager) RepositoryFormatBytes(ctx context.Context) ([]byte, error) { + f, err := m.getOrRefreshFormat(ctx) if err != nil { return nil, err } //nolint:wrapcheck - return f.RepositoryFormatBytes() + return f.RepositoryFormatBytes(ctx) } // GetMutableParameters gets mutable paramers of the repository. // This function blocks to refresh the format blob if necessary. -func (m *Manager) GetMutableParameters() (MutableParameters, error) { - f, err := m.getOrRefreshFormat() +func (m *Manager) GetMutableParameters(ctx context.Context) (MutableParameters, error) { + f, err := m.getOrRefreshFormat(ctx) if err != nil { return MutableParameters{}, err } //nolint:wrapcheck - return f.GetMutableParameters() + return f.GetMutableParameters(ctx) } // UpgradeLockIntent returns the current lock intent. -func (m *Manager) UpgradeLockIntent() (*UpgradeLockIntent, error) { - if err := m.maybeRefreshNotLocked(); err != nil { +func (m *Manager) UpgradeLockIntent(ctx context.Context) (*UpgradeLockIntent, error) { + if err := m.maybeRefreshNotLocked(ctx); err != nil { return nil, err } @@ -282,8 +282,8 @@ func (m *Manager) UpgradeLockIntent() (*UpgradeLockIntent, error) { } // RequiredFeatures returns the list of features required to open the repository. -func (m *Manager) RequiredFeatures() ([]feature.Required, error) { - if err := m.maybeRefreshNotLocked(); err != nil { +func (m *Manager) RequiredFeatures(ctx context.Context) ([]feature.Required, error) { + if err := m.maybeRefreshNotLocked(ctx); err != nil { return nil, err } @@ -326,8 +326,8 @@ func (m *Manager) UniqueID() []byte { } // BlobCfgBlob gets the BlobStorageConfiguration. -func (m *Manager) BlobCfgBlob() (BlobStorageConfiguration, error) { - if err := m.maybeRefreshNotLocked(); err != nil { +func (m *Manager) BlobCfgBlob(ctx context.Context) (BlobStorageConfiguration, error) { + if err := m.maybeRefreshNotLocked(ctx); err != nil { return BlobStorageConfiguration{}, err } diff --git a/repo/format/format_manager_test.go b/repo/format/format_manager_test.go index 69e9335c1..2a3e3db36 100644 --- a/repo/format/format_manager_test.go +++ b/repo/format/format_manager_test.go @@ -402,7 +402,7 @@ func TestFormatManagerValidDuration(t *testing.T) { func mustGetMutableParameters(t *testing.T, mgr *format.Manager) format.MutableParameters { t.Helper() - mp, err := mgr.GetMutableParameters() + mp, err := mgr.GetMutableParameters(testlogging.Context(t)) require.NoError(t, err) return mp @@ -411,7 +411,7 @@ func mustGetMutableParameters(t *testing.T, mgr *format.Manager) format.MutableP func mustGetUpgradeLockIntent(t *testing.T, mgr *format.Manager) *format.UpgradeLockIntent { t.Helper() - uli, err := mgr.GetUpgradeLockIntent() + uli, err := mgr.GetUpgradeLockIntent(testlogging.Context(t)) require.NoError(t, err) return uli @@ -420,7 +420,7 @@ func mustGetUpgradeLockIntent(t *testing.T, mgr *format.Manager) *format.Upgrade func mustGetRepositoryFormatBytes(t *testing.T, mgr *format.Manager) []byte { t.Helper() - b, err := mgr.RepositoryFormatBytes() + b, err := mgr.RepositoryFormatBytes(testlogging.Context(t)) require.NoError(t, err) return b @@ -429,7 +429,7 @@ func mustGetRepositoryFormatBytes(t *testing.T, mgr *format.Manager) []byte { func mustGetRequiredFeatures(t *testing.T, mgr *format.Manager) []feature.Required { t.Helper() - rf, err := mgr.RequiredFeatures() + rf, err := mgr.RequiredFeatures(testlogging.Context(t)) require.NoError(t, err) return rf @@ -438,7 +438,7 @@ func mustGetRequiredFeatures(t *testing.T, mgr *format.Manager) []feature.Requir func mustGetBlobStorageConfiguration(t *testing.T, mgr *format.Manager) format.BlobStorageConfiguration { t.Helper() - cfg, err := mgr.BlobCfgBlob() + cfg, err := mgr.BlobCfgBlob(testlogging.Context(t)) require.NoError(t, err) return cfg @@ -447,7 +447,7 @@ func mustGetBlobStorageConfiguration(t *testing.T, mgr *format.Manager) format.B func expectMutableParametersError(t *testing.T, mgr *format.Manager) error { t.Helper() - _, err := mgr.GetMutableParameters() + _, err := mgr.GetMutableParameters(testlogging.Context(t)) require.Error(t, err) return err diff --git a/repo/format/format_provider.go b/repo/format/format_provider.go index f0d23e0e7..3042b137d 100644 --- a/repo/format/format_provider.go +++ b/repo/format/format_provider.go @@ -1,6 +1,8 @@ package format import ( + "context" + "github.com/pkg/errors" "github.com/kopia/kopia/internal/gather" @@ -56,11 +58,11 @@ type Provider interface { // this is typically cached, but sometimes refreshes MutableParameters from // the repository so the results should not be cached. - GetMutableParameters() (MutableParameters, error) + GetMutableParameters(ctx context.Context) (MutableParameters, error) SupportsPasswordChange() bool GetMasterKey() []byte - RepositoryFormatBytes() ([]byte, error) + RepositoryFormatBytes(ctx context.Context) ([]byte, error) } type formattingOptionsProvider struct { @@ -149,7 +151,7 @@ func (f *formattingOptionsProvider) HashFunc() hashing.HashFunc { return f.h } -func (f *formattingOptionsProvider) RepositoryFormatBytes() ([]byte, error) { +func (f *formattingOptionsProvider) RepositoryFormatBytes(ctx context.Context) ([]byte, error) { if f.SupportsPasswordChange() { return nil, nil } diff --git a/repo/format/upgrade_lock.go b/repo/format/upgrade_lock.go index fda433d14..48680e598 100644 --- a/repo/format/upgrade_lock.go +++ b/repo/format/upgrade_lock.go @@ -37,7 +37,7 @@ func BackupBlobID(l UpgradeLockIntent) blob.ID { // should cause the unsupporting clients (non-upgrade capable) to fail // connecting to the repository. func (m *Manager) SetUpgradeLockIntent(ctx context.Context, l UpgradeLockIntent) (*UpgradeLockIntent, error) { - if err := m.maybeRefreshNotLocked(); err != nil { + if err := m.maybeRefreshNotLocked(ctx); err != nil { return nil, err } @@ -96,7 +96,7 @@ func WriteLegacyIndexPoisonBlob(ctx context.Context, st blob.Storage) error { // blob. This in-effect commits the new repository format to the repository and // resumes all access to the repository. func (m *Manager) CommitUpgrade(ctx context.Context) error { - if err := m.maybeRefreshNotLocked(); err != nil { + if err := m.maybeRefreshNotLocked(ctx); err != nil { return err } @@ -125,7 +125,7 @@ func (m *Manager) CommitUpgrade(ctx context.Context) error { // hence using this API could render the repository corrupted and unreadable by // clients. func (m *Manager) RollbackUpgrade(ctx context.Context) error { - if err := m.maybeRefreshNotLocked(); err != nil { + if err := m.maybeRefreshNotLocked(ctx); err != nil { return err } @@ -186,8 +186,8 @@ func (m *Manager) RollbackUpgrade(ctx context.Context) error { } // GetUpgradeLockIntent gets the current upgrade lock intent. -func (m *Manager) GetUpgradeLockIntent() (*UpgradeLockIntent, error) { - if err := m.maybeRefreshNotLocked(); err != nil { +func (m *Manager) GetUpgradeLockIntent(ctx context.Context) (*UpgradeLockIntent, error) { + if err := m.maybeRefreshNotLocked(ctx); err != nil { return nil, err } diff --git a/repo/format/upgrade_lock_test.go b/repo/format/upgrade_lock_test.go index 6088716cd..e1a701786 100644 --- a/repo/format/upgrade_lock_test.go +++ b/repo/format/upgrade_lock_test.go @@ -192,7 +192,7 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) { opts.UpgradeOwnerID = "another-upgrade-owner" }) - mp, mperr := env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters() + mp, mperr := env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters(ctx) require.NoError(t, mperr) require.Equal(t, format.FormatVersion3, mp.Version) @@ -213,7 +213,7 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) { require.EqualError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx), "no upgrade in progress") // verify that we are back to the original version where we started from - mp, err = env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters() + mp, err = env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters(ctx) require.NoError(t, err) require.Equal(t, format.FormatVersion1, mp.Version) diff --git a/repo/grpc_repository_client.go b/repo/grpc_repository_client.go index f6ef3ef95..961228f4e 100644 --- a/repo/grpc_repository_client.go +++ b/repo/grpc_repository_client.go @@ -682,7 +682,7 @@ func (r *grpcInnerSession) GetContent(ctx context.Context, contentID content.ID) return nil, errNoSessionResponse() } -func (r *grpcRepositoryClient) SupportsContentCompression() (bool, error) { +func (r *grpcRepositoryClient) SupportsContentCompression(ctx context.Context) (bool, error) { return r.serverSupportsContentCompression, nil } diff --git a/repo/maintenance/blob_retain.go b/repo/maintenance/blob_retain.go index e195f63c0..041ecee10 100644 --- a/repo/maintenance/blob_retain.go +++ b/repo/maintenance/blob_retain.go @@ -40,7 +40,7 @@ func ExtendBlobRetentionTime(ctx context.Context, rep repo.DirectRepositoryWrite opt.Parallel = runtime.NumCPU() * parallelBlobRetainCPUMultiplier } - blobCfg, err := rep.FormatManager().BlobCfgBlob() + blobCfg, err := rep.FormatManager().BlobCfgBlob(ctx) if err != nil { return 0, errors.Wrap(err, "blob configuration") } diff --git a/repo/maintenance/content_rewrite.go b/repo/maintenance/content_rewrite.go index 77f17737a..221c3a764 100644 --- a/repo/maintenance/content_rewrite.go +++ b/repo/maintenance/content_rewrite.go @@ -136,7 +136,7 @@ func getContentToRewrite(ctx context.Context, rep repo.DirectRepository, opt *Re // add all content IDs from short packs if opt.ShortPacks { - mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters() + mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters(ctx) if mperr == nil { threshold := int64(mp.MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd findContentInShortPacks(ctx, rep, ch, threshold, opt) diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go index 794df4117..e18fc22c3 100644 --- a/repo/maintenance/maintenance_run.go +++ b/repo/maintenance/maintenance_run.go @@ -247,7 +247,7 @@ func Run(ctx context.Context, runParams RunParameters, safety SafetyParameters) } func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error { - _, ok, emerr := runParams.rep.ContentManager().EpochManager() + _, ok, emerr := runParams.rep.ContentManager().EpochManager(ctx) if ok { log(ctx).Debugf("quick maintenance not required for epoch manager") return nil @@ -328,7 +328,7 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul } func runTaskCleanupEpochManager(ctx context.Context, runParams RunParameters, s *Schedule) error { - em, ok, emerr := runParams.rep.ContentManager().EpochManager() + em, ok, emerr := runParams.rep.ContentManager().EpochManager(ctx) if emerr != nil { return errors.Wrap(emerr, "epoch manager") } diff --git a/repo/object/object_manager.go b/repo/object/object_manager.go index a6f7526fc..5ff04526a 100644 --- a/repo/object/object_manager.go +++ b/repo/object/object_manager.go @@ -35,7 +35,7 @@ type contentReader interface { type contentManager interface { contentReader - SupportsContentCompression() (bool, error) + SupportsContentCompression(ctx context.Context) (bool, error) WriteContent(ctx context.Context, data gather.Bytes, prefix content.IDPrefix, comp compression.HeaderID) (content.ID, error) } diff --git a/repo/object/object_manager_test.go b/repo/object/object_manager_test.go index 5979a9a76..4372f8ff7 100644 --- a/repo/object/object_manager_test.go +++ b/repo/object/object_manager_test.go @@ -79,7 +79,7 @@ func (f *fakeContentManager) WriteContent(ctx context.Context, data gather.Bytes return contentID, nil } -func (f *fakeContentManager) SupportsContentCompression() (bool, error) { +func (f *fakeContentManager) SupportsContentCompression(ctx context.Context) (bool, error) { return f.supportsContentCompression, nil } diff --git a/repo/object/object_writer.go b/repo/object/object_writer.go index 032932bb1..e4f8b69ee 100644 --- a/repo/object/object_writer.go +++ b/repo/object/object_writer.go @@ -188,7 +188,7 @@ func (w *objectWriter) prepareAndWriteContentChunk(chunkID int, data gather.Byte comp := content.NoCompression objectComp := w.compressor - scc, err := w.om.contentMgr.SupportsContentCompression() + scc, err := w.om.contentMgr.SupportsContentCompression(w.ctx) if err != nil { return errors.Wrap(err, "supports content compression") } diff --git a/repo/open.go b/repo/open.go index 2f82c1c4f..ca2d72fea 100644 --- a/repo/open.go +++ b/repo/open.go @@ -286,7 +286,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, return lc2.writeToFile(configFile) }) - blobcfg, err := fmgr.BlobCfgBlob() + blobcfg, err := fmgr.BlobCfgBlob(ctx) if err != nil { return nil, errors.Wrap(err, "blob configuration") } @@ -297,7 +297,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, _, err = retry.WithExponentialBackoffMaxRetries(ctx, -1, "wait for upgrade", func() (interface{}, error) { //nolint:govet - uli, err := fmgr.UpgradeLockIntent() + uli, err := fmgr.UpgradeLockIntent(ctx) if err != nil { //nolint:wrapcheck return nil, err @@ -377,7 +377,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions, } func handleMissingRequiredFeatures(ctx context.Context, fmgr *format.Manager, ignoreErrors bool) error { - required, err := fmgr.RequiredFeatures() + required, err := fmgr.RequiredFeatures(ctx) if err != nil { return errors.Wrap(err, "required features") } @@ -455,7 +455,7 @@ func upgradeLockMonitor( return nil } - uli, err := fmgr.UpgradeLockIntent() + uli, err := fmgr.UpgradeLockIntent(ctx) if err != nil { return errors.Wrap(err, "upgrade lock intent") } diff --git a/repo/repository_test.go b/repo/repository_test.go index 60afcfcff..d1b658b8f 100644 --- a/repo/repository_test.go +++ b/repo/repository_test.go @@ -893,16 +893,16 @@ func TestDeriveKey(t *testing.T) { dw1Upgraded := env.Repository.(repo.DirectRepositoryWriter) cf := dw1Upgraded.ContentReader().ContentFormat() - mp, mperr := cf.GetMutableParameters() + mp, mperr := cf.GetMutableParameters(ctx) require.NoError(t, mperr) - feat, err := dw1Upgraded.FormatManager().RequiredFeatures() + feat, err := dw1Upgraded.FormatManager().RequiredFeatures(ctx) require.NoError(t, err) // perform upgrade mp.Version = v2 - blobCfg, err := dw1Upgraded.FormatManager().BlobCfgBlob() + blobCfg, err := dw1Upgraded.FormatManager().BlobCfgBlob(ctx) require.NoError(t, err) require.NoError(t, dw1Upgraded.FormatManager().SetParameters(ctx, mp, blobCfg, feat)) @@ -927,7 +927,7 @@ func TestDeriveKey(t *testing.T) { for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { - mp, err := tc.dw.FormatManager().GetMutableParameters() + mp, err := tc.dw.FormatManager().GetMutableParameters(testlogging.Context(t)) require.NoError(t, err) require.Equal(t, tc.wantFormat, mp.Version)