mirror of
https://github.com/kopia/kopia.git
synced 2026-05-24 22:54:55 -04:00
refactor(repository): added context to potentially blocking repository methods (#3654)
Primarily for wiring a context.Context to a call to content.Manager.refresh, which was using a detached context.
This commit is contained in:
@@ -40,7 +40,7 @@ func setDefaultMaintenanceParameters(ctx context.Context, rep repo.RepositoryWri
|
||||
p.Owner = rep.ClientOptions().UsernameAtHost()
|
||||
|
||||
if dw, ok := rep.(repo.DirectRepositoryWriter); ok {
|
||||
_, ok, err := dw.ContentReader().EpochManager()
|
||||
_, ok, err := dw.ContentReader().EpochManager(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "epoch manager")
|
||||
}
|
||||
|
||||
@@ -23,7 +23,7 @@ func (c *commandIndexEpochList) setup(svc appServices, parent commandParent) {
|
||||
}
|
||||
|
||||
func (c *commandIndexEpochList) run(ctx context.Context, rep repo.DirectRepository) error {
|
||||
emgr, ok, err := rep.ContentReader().EpochManager()
|
||||
emgr, ok, err := rep.ContentReader().EpochManager(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "epoch manager")
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ func (c *commandMaintenanceRun) setup(svc appServices, parent commandParent) {
|
||||
func (c *commandMaintenanceRun) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
mode := maintenance.ModeQuick
|
||||
|
||||
_, supportsEpochManager, err := rep.ContentManager().EpochManager()
|
||||
_, supportsEpochManager, err := rep.ContentManager().EpochManager(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "EpochManager")
|
||||
}
|
||||
|
||||
@@ -177,7 +177,7 @@ func (c *commandMaintenanceSet) run(ctx context.Context, rep repo.DirectReposito
|
||||
return errors.Errorf("no changes specified")
|
||||
}
|
||||
|
||||
blobCfg, err := rep.FormatManager().BlobCfgBlob()
|
||||
blobCfg, err := rep.FormatManager().BlobCfgBlob(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "blob configuration")
|
||||
}
|
||||
|
||||
@@ -174,17 +174,17 @@ func (c *commandRepositorySetParameters) disableBlobRetention(ctx context.Contex
|
||||
}
|
||||
|
||||
func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
mp, err := rep.FormatManager().GetMutableParameters()
|
||||
mp, err := rep.FormatManager().GetMutableParameters(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "mutable parameters")
|
||||
}
|
||||
|
||||
blobcfg, err := rep.FormatManager().BlobCfgBlob()
|
||||
blobcfg, err := rep.FormatManager().BlobCfgBlob(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "blob configuration")
|
||||
}
|
||||
|
||||
requiredFeatures, err := rep.FormatManager().RequiredFeatures()
|
||||
requiredFeatures, err := rep.FormatManager().RequiredFeatures(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to get required features")
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ func (c *commandRepositoryStatus) outputJSON(ctx context.Context, r repo.Reposit
|
||||
ci := dr.BlobReader().ConnectionInfo()
|
||||
s.UniqueIDHex = hex.EncodeToString(dr.UniqueID())
|
||||
s.ObjectFormat = dr.ObjectFormat()
|
||||
s.BlobRetention, _ = dr.FormatManager().BlobCfgBlob()
|
||||
s.BlobRetention, _ = dr.FormatManager().BlobCfgBlob(ctx)
|
||||
s.Storage = scrubber.ScrubSensitiveData(reflect.ValueOf(ci)).Interface().(blob.ConnectionInfo) //nolint:forcetypeassert
|
||||
s.ContentFormat = dr.FormatManager().ScrubbedContentFormat()
|
||||
|
||||
@@ -82,13 +82,13 @@ func (c *commandRepositoryStatus) outputJSON(ctx context.Context, r repo.Reposit
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *commandRepositoryStatus) dumpUpgradeStatus(dr repo.DirectRepository) error {
|
||||
func (c *commandRepositoryStatus) dumpUpgradeStatus(ctx context.Context, dr repo.DirectRepository) error {
|
||||
drw, isDr := dr.(repo.DirectRepositoryWriter)
|
||||
if !isDr {
|
||||
return nil
|
||||
}
|
||||
|
||||
l, err := drw.FormatManager().GetUpgradeLockIntent()
|
||||
l, err := drw.FormatManager().GetUpgradeLockIntent(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to get the upgrade lock intent")
|
||||
}
|
||||
@@ -120,8 +120,8 @@ func (c *commandRepositoryStatus) dumpUpgradeStatus(dr repo.DirectRepository) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *commandRepositoryStatus) dumpRetentionStatus(dr repo.DirectRepository) {
|
||||
if blobcfg, _ := dr.FormatManager().BlobCfgBlob(); blobcfg.IsRetentionEnabled() {
|
||||
func (c *commandRepositoryStatus) dumpRetentionStatus(ctx context.Context, dr repo.DirectRepository) {
|
||||
if blobcfg, _ := dr.FormatManager().BlobCfgBlob(ctx); blobcfg.IsRetentionEnabled() {
|
||||
c.out.printStdout("\n")
|
||||
c.out.printStdout("Blob retention mode: %s\n", blobcfg.RetentionMode)
|
||||
c.out.printStdout("Blob retention period: %s\n", blobcfg.RetentionPeriod)
|
||||
@@ -174,7 +174,7 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository)
|
||||
|
||||
contentFormat := dr.ContentReader().ContentFormat()
|
||||
|
||||
mp, mperr := contentFormat.GetMutableParameters()
|
||||
mp, mperr := contentFormat.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
@@ -188,12 +188,12 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository)
|
||||
c.out.printStdout("Content compression: %v\n", mp.IndexVersion >= index.Version2)
|
||||
c.out.printStdout("Password changes: %v\n", contentFormat.SupportsPasswordChange())
|
||||
|
||||
c.outputRequiredFeatures(dr)
|
||||
c.outputRequiredFeatures(ctx, dr)
|
||||
|
||||
c.out.printStdout("Max pack length: %v\n", units.BytesString(int64(mp.MaxPackSize)))
|
||||
c.out.printStdout("Index Format: v%v\n", mp.IndexVersion)
|
||||
|
||||
emgr, epochMgrEnabled, emerr := dr.ContentReader().EpochManager()
|
||||
emgr, epochMgrEnabled, emerr := dr.ContentReader().EpochManager(ctx)
|
||||
if emerr != nil {
|
||||
return errors.Wrap(emerr, "epoch manager")
|
||||
}
|
||||
@@ -216,9 +216,9 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository)
|
||||
c.out.printStdout("Epoch Manager: disabled\n")
|
||||
}
|
||||
|
||||
c.dumpRetentionStatus(dr)
|
||||
c.dumpRetentionStatus(ctx, dr)
|
||||
|
||||
if err := c.dumpUpgradeStatus(dr); err != nil {
|
||||
if err := c.dumpUpgradeStatus(ctx, dr); err != nil {
|
||||
return errors.Wrap(err, "failed to dump upgrade status")
|
||||
}
|
||||
|
||||
@@ -251,8 +251,8 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *commandRepositoryStatus) outputRequiredFeatures(dr repo.DirectRepository) {
|
||||
if req, _ := dr.FormatManager().RequiredFeatures(); len(req) > 0 {
|
||||
func (c *commandRepositoryStatus) outputRequiredFeatures(ctx context.Context, dr repo.DirectRepository) {
|
||||
if req, _ := dr.FormatManager().RequiredFeatures(ctx); len(req) > 0 {
|
||||
var featureIDs []string
|
||||
|
||||
for _, r := range req {
|
||||
|
||||
@@ -297,7 +297,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D
|
||||
|
||||
now := rep.Time()
|
||||
|
||||
mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters()
|
||||
mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
@@ -355,7 +355,7 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D
|
||||
func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
cf := rep.ContentReader().ContentFormat()
|
||||
|
||||
mp, mperr := cf.GetMutableParameters()
|
||||
mp, mperr := cf.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
@@ -363,7 +363,7 @@ func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.D
|
||||
if mp.EpochParameters.Enabled {
|
||||
log(ctx).Infof("Repository indices have already been migrated to the epoch format, no need to drain other clients")
|
||||
|
||||
l, err := rep.FormatManager().GetUpgradeLockIntent()
|
||||
l, err := rep.FormatManager().GetUpgradeLockIntent(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to get upgrade lock intent")
|
||||
}
|
||||
@@ -406,7 +406,7 @@ func (c *commandRepositoryUpgrade) sleepWithContext(ctx context.Context, dur tim
|
||||
|
||||
func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
for {
|
||||
l, err := rep.FormatManager().GetUpgradeLockIntent()
|
||||
l, err := rep.FormatManager().GetUpgradeLockIntent(ctx)
|
||||
|
||||
upgradeTime := l.UpgradeTime()
|
||||
now := rep.Time()
|
||||
@@ -436,12 +436,12 @@ func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo
|
||||
// repository. This phase runs after the lock has been acquired in one of the
|
||||
// prior phases.
|
||||
func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectRepositoryWriter) error {
|
||||
mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters()
|
||||
mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
|
||||
rf, err := rep.FormatManager().RequiredFeatures()
|
||||
rf, err := rep.FormatManager().RequiredFeatures(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error getting repository features")
|
||||
}
|
||||
@@ -460,7 +460,7 @@ func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectR
|
||||
return errors.Wrap(uerr, "error upgrading indices")
|
||||
}
|
||||
|
||||
blobCfg, err := rep.FormatManager().BlobCfgBlob()
|
||||
blobCfg, err := rep.FormatManager().BlobCfgBlob(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "error getting blob configuration")
|
||||
}
|
||||
|
||||
@@ -33,7 +33,7 @@
|
||||
|
||||
// ParametersProvider provides epoch manager parameters.
|
||||
type ParametersProvider interface {
|
||||
GetParameters() (*Parameters, error)
|
||||
GetParameters(ctx context.Context) (*Parameters, error)
|
||||
}
|
||||
|
||||
// ErrVerySlowIndexWrite is returned by WriteIndex if a write takes more than 2 epochs (usually >48h).
|
||||
@@ -327,7 +327,7 @@ func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) e
|
||||
}
|
||||
}
|
||||
|
||||
p, err := e.getParameters()
|
||||
p, err := e.getParameters(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -363,7 +363,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
p, err := e.getParameters()
|
||||
p, err := e.getParameters(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -419,8 +419,8 @@ func blobSetWrittenEarlyEnough(replacementSet []blob.Metadata, maxReplacementTim
|
||||
return blob.MaxTimestamp(replacementSet).Before(maxReplacementTime)
|
||||
}
|
||||
|
||||
func (e *Manager) getParameters() (*Parameters, error) {
|
||||
emp, err := e.paramProvider.GetParameters()
|
||||
func (e *Manager) getParameters(ctx context.Context) (*Parameters, error) {
|
||||
emp, err := e.paramProvider.GetParameters(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "epoch manager parameters")
|
||||
}
|
||||
@@ -433,7 +433,7 @@ func (e *Manager) refreshLocked(ctx context.Context) error {
|
||||
return errors.Wrap(ctx.Err(), "refreshLocked")
|
||||
}
|
||||
|
||||
p, err := e.getParameters()
|
||||
p, err := e.getParameters(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -666,7 +666,7 @@ func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[
|
||||
func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
|
||||
e.log.Debug("refreshAttemptLocked")
|
||||
|
||||
p, perr := e.getParameters()
|
||||
p, perr := e.getParameters(ctx)
|
||||
if perr != nil {
|
||||
return perr
|
||||
}
|
||||
@@ -789,7 +789,7 @@ func (e *Manager) WriteIndex(ctx context.Context, dataShards map[blob.ID]blob.By
|
||||
for {
|
||||
e.log.Debug("WriteIndex")
|
||||
|
||||
p, err := e.getParameters()
|
||||
p, err := e.getParameters(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -348,7 +348,7 @@ func TestIndexEpochManager_NoCompactionInReadOnly(t *testing.T) {
|
||||
return nil
|
||||
}
|
||||
|
||||
p, err := te.mgr.getParameters()
|
||||
p, err := te.mgr.getParameters(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Write data to the index such that the next time it's opened it should
|
||||
@@ -806,7 +806,7 @@ type parameterProvider struct {
|
||||
*Parameters
|
||||
}
|
||||
|
||||
func (p parameterProvider) GetParameters() (*Parameters, error) {
|
||||
func (p parameterProvider) GetParameters(ctx context.Context) (*Parameters, error) {
|
||||
return p.Parameters, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ func handleRepoParameters(ctx context.Context, rc requestContext) (interface{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
scc, err := dr.ContentReader().SupportsContentCompression()
|
||||
scc, err := dr.ContentReader().SupportsContentCompression(ctx)
|
||||
if err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
@@ -60,12 +60,14 @@ func handleRepoStatus(ctx context.Context, rc requestContext) (interface{}, *api
|
||||
|
||||
dr, ok := rc.rep.(repo.DirectRepository)
|
||||
if ok {
|
||||
mp, mperr := dr.ContentReader().ContentFormat().GetMutableParameters()
|
||||
contentFormat := dr.ContentReader().ContentFormat()
|
||||
|
||||
mp, mperr := contentFormat.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return nil, internalServerError(mperr)
|
||||
}
|
||||
|
||||
scc, err := dr.ContentReader().SupportsContentCompression()
|
||||
scc, err := dr.ContentReader().SupportsContentCompression(ctx)
|
||||
if err != nil {
|
||||
return nil, internalServerError(err)
|
||||
}
|
||||
@@ -74,10 +76,10 @@ func handleRepoStatus(ctx context.Context, rc requestContext) (interface{}, *api
|
||||
Connected: true,
|
||||
ConfigFile: dr.ConfigFilename(),
|
||||
FormatVersion: mp.Version,
|
||||
Hash: dr.ContentReader().ContentFormat().GetHashFunction(),
|
||||
Encryption: dr.ContentReader().ContentFormat().GetEncryptionAlgorithm(),
|
||||
ECC: dr.ContentReader().ContentFormat().GetECCAlgorithm(),
|
||||
ECCOverheadPercent: dr.ContentReader().ContentFormat().GetECCOverheadPercent(),
|
||||
Hash: contentFormat.GetHashFunction(),
|
||||
Encryption: contentFormat.GetEncryptionAlgorithm(),
|
||||
ECC: contentFormat.GetECCAlgorithm(),
|
||||
ECCOverheadPercent: contentFormat.GetECCOverheadPercent(),
|
||||
MaxPackSize: mp.MaxPackSize,
|
||||
Splitter: dr.ObjectFormat().Splitter,
|
||||
Storage: dr.BlobReader().ConnectionInfo().Type,
|
||||
|
||||
@@ -551,7 +551,7 @@ func (s *Server) handleInitialSessionHandshake(srv grpcapi.KopiaRepository_Sessi
|
||||
return repo.WriteSessionOptions{}, errors.Errorf("missing initialization request")
|
||||
}
|
||||
|
||||
scc, err := dr.ContentReader().SupportsContentCompression()
|
||||
scc, err := dr.ContentReader().SupportsContentCompression(srv.Context())
|
||||
if err != nil {
|
||||
return repo.WriteSessionOptions{}, errors.Wrap(err, "supports content compression")
|
||||
}
|
||||
|
||||
@@ -157,7 +157,7 @@ func (r *apiServerRepository) Flush(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *apiServerRepository) SupportsContentCompression() (bool, error) {
|
||||
func (r *apiServerRepository) SupportsContentCompression(ctx context.Context) (bool, error) {
|
||||
return r.serverSupportsContentCompression, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -186,7 +186,7 @@ func (c *committedContentIndex) merge(ctx context.Context, indexFiles []blob.ID)
|
||||
newUsedMap[e] = ndx
|
||||
}
|
||||
|
||||
mergedAndCombined, err := c.combineSmallIndexes(newMerged)
|
||||
mergedAndCombined, err := c.combineSmallIndexes(ctx, newMerged)
|
||||
if err != nil {
|
||||
newlyOpened.Close() //nolint:errcheck
|
||||
|
||||
@@ -239,7 +239,7 @@ func (c *committedContentIndex) use(ctx context.Context, indexFiles []blob.ID, i
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *committedContentIndex) combineSmallIndexes(m index.Merged) (index.Merged, error) {
|
||||
func (c *committedContentIndex) combineSmallIndexes(ctx context.Context, m index.Merged) (index.Merged, error) {
|
||||
var toKeep, toMerge index.Merged
|
||||
|
||||
for _, ndx := range m {
|
||||
@@ -265,7 +265,7 @@ func (c *committedContentIndex) combineSmallIndexes(m index.Merged) (index.Merge
|
||||
}
|
||||
}
|
||||
|
||||
mp, mperr := c.formatProvider.GetMutableParameters()
|
||||
mp, mperr := c.formatProvider.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return nil, errors.Wrap(mperr, "error getting mutable parameters")
|
||||
}
|
||||
|
||||
@@ -204,7 +204,7 @@ func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error {
|
||||
nextSleepTime := 100 * time.Millisecond //nolint:gomnd
|
||||
|
||||
for i := 0; i < indexLoadAttempts; i++ {
|
||||
ibm, err0 := sm.indexBlobManager()
|
||||
ibm, err0 := sm.indexBlobManager(ctx)
|
||||
if err0 != nil {
|
||||
return err0
|
||||
}
|
||||
@@ -268,8 +268,8 @@ func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache {
|
||||
}
|
||||
|
||||
// indexBlobManager return the index manager for content.
|
||||
func (sm *SharedManager) indexBlobManager() (indexblob.Manager, error) {
|
||||
mp, mperr := sm.format.GetMutableParameters()
|
||||
func (sm *SharedManager) indexBlobManager(ctx context.Context) (indexblob.Manager, error) {
|
||||
mp, mperr := sm.format.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return nil, errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
@@ -359,7 +359,7 @@ func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) (
|
||||
return result, nil
|
||||
}
|
||||
|
||||
ibm, err0 := sm.indexBlobManager()
|
||||
ibm, err0 := sm.indexBlobManager(ctx)
|
||||
if err0 != nil {
|
||||
return nil, err0
|
||||
}
|
||||
@@ -539,8 +539,8 @@ type epochParameters struct {
|
||||
prov format.Provider
|
||||
}
|
||||
|
||||
func (p epochParameters) GetParameters() (*epoch.Parameters, error) {
|
||||
mp, mperr := p.prov.GetMutableParameters()
|
||||
func (p epochParameters) GetParameters(ctx context.Context) (*epoch.Parameters, error) {
|
||||
mp, mperr := p.prov.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return nil, errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
@@ -549,8 +549,8 @@ func (p epochParameters) GetParameters() (*epoch.Parameters, error) {
|
||||
}
|
||||
|
||||
// EpochManager returns the epoch manager.
|
||||
func (sm *SharedManager) EpochManager() (*epoch.Manager, bool, error) {
|
||||
ibm, err := sm.indexBlobManager()
|
||||
func (sm *SharedManager) EpochManager(ctx context.Context) (*epoch.Manager, bool, error) {
|
||||
ibm, err := sm.indexBlobManager(ctx)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/content/index"
|
||||
"github.com/kopia/kopia/repo/format"
|
||||
)
|
||||
|
||||
// RecoverIndexFromPackBlob attempts to recover index blob entries from a given pack file.
|
||||
@@ -42,7 +43,7 @@ func (bm *WriteManager) RecoverIndexFromPackBlob(ctx context.Context, packFile b
|
||||
|
||||
if commit {
|
||||
bm.lock()
|
||||
defer bm.unlock()
|
||||
defer bm.unlock(ctx)
|
||||
|
||||
for _, is := range recovered {
|
||||
bm.packIndexBuilder.Add(is)
|
||||
@@ -170,12 +171,7 @@ func decodePostamble(payload []byte) *packContentPostamble {
|
||||
}
|
||||
}
|
||||
|
||||
func (sm *SharedManager) buildLocalIndex(pending index.Builder, output *gather.WriteBuffer) error {
|
||||
mp, mperr := sm.format.GetMutableParameters()
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
|
||||
func (sm *SharedManager) buildLocalIndex(mp format.MutableParameters, pending index.Builder, output *gather.WriteBuffer) error {
|
||||
if err := pending.Build(output, mp.IndexVersion); err != nil {
|
||||
return errors.Wrap(err, "unable to build local index")
|
||||
}
|
||||
@@ -184,14 +180,14 @@ func (sm *SharedManager) buildLocalIndex(pending index.Builder, output *gather.W
|
||||
}
|
||||
|
||||
// appendPackFileIndexRecoveryData appends data designed to help with recovery of pack index in case it gets damaged or lost.
|
||||
func (sm *SharedManager) appendPackFileIndexRecoveryData(pending index.Builder, output *gather.WriteBuffer) error {
|
||||
func (sm *SharedManager) appendPackFileIndexRecoveryData(mp format.MutableParameters, pending index.Builder, output *gather.WriteBuffer) error {
|
||||
// build, encrypt and append local index
|
||||
localIndexOffset := output.Length()
|
||||
|
||||
var localIndex gather.WriteBuffer
|
||||
defer localIndex.Close()
|
||||
|
||||
if err := sm.buildLocalIndex(pending, &localIndex); err != nil {
|
||||
if err := sm.buildLocalIndex(mp, pending, &localIndex); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@@ -121,7 +121,7 @@ func (bm *WriteManager) Revision() int64 {
|
||||
// of randomness or a contemporaneous timestamp that will never reappear.
|
||||
func (bm *WriteManager) DeleteContent(ctx context.Context, contentID ID) error {
|
||||
bm.lock()
|
||||
defer bm.unlock()
|
||||
defer bm.unlock(ctx)
|
||||
|
||||
bm.revision.Add(1)
|
||||
|
||||
@@ -213,7 +213,7 @@ func (bm *WriteManager) contentWriteTime(previousUnixTimeSeconds int64) int64 {
|
||||
func (bm *WriteManager) maybeFlushBasedOnTimeUnlocked(ctx context.Context) error {
|
||||
bm.lock()
|
||||
shouldFlush := bm.timeNow().After(bm.flushPackIndexesAfter)
|
||||
bm.unlock()
|
||||
bm.unlock(ctx)
|
||||
|
||||
if !shouldFlush {
|
||||
return nil
|
||||
@@ -224,7 +224,7 @@ func (bm *WriteManager) maybeFlushBasedOnTimeUnlocked(ctx context.Context) error
|
||||
|
||||
func (bm *WriteManager) maybeRetryWritingFailedPacksUnlocked(ctx context.Context) error {
|
||||
bm.lock()
|
||||
defer bm.unlock()
|
||||
defer bm.unlock(ctx)
|
||||
|
||||
// do not start new uploads while flushing
|
||||
for bm.flushing {
|
||||
@@ -271,7 +271,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
|
||||
if previousWriteTime < 0 {
|
||||
if _, _, err = bm.getContentInfoReadLocked(ctx, contentID); err == nil {
|
||||
// we lost the race while compressing the content, the content now exists.
|
||||
bm.unlock()
|
||||
bm.unlock(ctx)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -294,14 +294,14 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
|
||||
bm.log.Debugf("retry-write %v", pp.packBlobID)
|
||||
|
||||
if err = bm.writePackAndAddToIndexLocked(ctx, pp); err != nil {
|
||||
bm.unlock()
|
||||
bm.unlock(ctx)
|
||||
return errors.Wrap(err, "error writing previously failed pack")
|
||||
}
|
||||
}
|
||||
|
||||
pp, err := bm.getOrCreatePendingPackInfoLocked(ctx, prefix)
|
||||
if err != nil {
|
||||
bm.unlock()
|
||||
bm.unlock(ctx)
|
||||
return errors.Wrap(err, "unable to create pending pack")
|
||||
}
|
||||
|
||||
@@ -316,7 +316,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
|
||||
}
|
||||
|
||||
if _, err := compressedAndEncrypted.Bytes().WriteTo(pp.currentPackData); err != nil {
|
||||
bm.unlock()
|
||||
bm.unlock(ctx)
|
||||
return errors.Wrapf(err, "unable to append %q to pack data", contentID)
|
||||
}
|
||||
|
||||
@@ -333,7 +333,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
|
||||
bm.writingPacks = append(bm.writingPacks, pp)
|
||||
}
|
||||
|
||||
bm.unlock()
|
||||
bm.unlock(ctx)
|
||||
|
||||
// at this point we're unlocked so different goroutines can encrypt and
|
||||
// save to storage in parallel.
|
||||
@@ -349,7 +349,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
|
||||
// DisableIndexFlush increments the counter preventing automatic index flushes.
|
||||
func (bm *WriteManager) DisableIndexFlush(ctx context.Context) {
|
||||
bm.lock()
|
||||
defer bm.unlock()
|
||||
defer bm.unlock(ctx)
|
||||
bm.log.Debugf("DisableIndexFlush()")
|
||||
bm.disableIndexFlushCount++
|
||||
}
|
||||
@@ -358,7 +358,7 @@ func (bm *WriteManager) DisableIndexFlush(ctx context.Context) {
|
||||
// The flushes will be re-enabled when the index drops to zero.
|
||||
func (bm *WriteManager) EnableIndexFlush(ctx context.Context) {
|
||||
bm.lock()
|
||||
defer bm.unlock()
|
||||
defer bm.unlock(ctx)
|
||||
bm.log.Debugf("EnableIndexFlush()")
|
||||
bm.disableIndexFlushCount--
|
||||
}
|
||||
@@ -417,7 +417,7 @@ func (bm *WriteManager) writeIndexBlobs(ctx context.Context, dataShards []gather
|
||||
ctx, span := tracer.Start(ctx, "WriteIndexBlobs")
|
||||
defer span.End()
|
||||
|
||||
ibm, err := bm.indexBlobManager()
|
||||
ibm, err := bm.indexBlobManager(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -509,7 +509,7 @@ func (bm *WriteManager) writePackAndAddToIndexUnlocked(ctx context.Context, pp *
|
||||
packFileIndex, writeErr := bm.prepareAndWritePackInternal(ctx, pp, bm.onUpload)
|
||||
|
||||
bm.lock()
|
||||
defer bm.unlock()
|
||||
defer bm.unlock(ctx)
|
||||
|
||||
return bm.processWritePackResultLocked(pp, packFileIndex, writeErr)
|
||||
}
|
||||
@@ -547,7 +547,12 @@ func (bm *WriteManager) processWritePackResultLocked(pp *pendingPackInfo, packFi
|
||||
}
|
||||
|
||||
func (sm *SharedManager) prepareAndWritePackInternal(ctx context.Context, pp *pendingPackInfo, onUpload func(int64)) (index.Builder, error) {
|
||||
packFileIndex, err := sm.preparePackDataContent(pp)
|
||||
mp, mperr := sm.format.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return nil, errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
|
||||
packFileIndex, err := sm.preparePackDataContent(mp, pp)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "error preparing data content")
|
||||
}
|
||||
@@ -590,13 +595,13 @@ func (bm *WriteManager) setFlushingLocked(v bool) {
|
||||
// Any pending writes completed before Flush() has started are guaranteed to be committed to the
|
||||
// repository before Flush() returns.
|
||||
func (bm *WriteManager) Flush(ctx context.Context) error {
|
||||
mp, mperr := bm.format.GetMutableParameters()
|
||||
mp, mperr := bm.format.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
|
||||
bm.lock()
|
||||
defer bm.unlock()
|
||||
defer bm.unlock(ctx)
|
||||
|
||||
bm.log.Debugf("flush")
|
||||
|
||||
@@ -645,7 +650,7 @@ func (bm *WriteManager) Flush(ctx context.Context) error {
|
||||
func (bm *WriteManager) RewriteContent(ctx context.Context, contentID ID) error {
|
||||
bm.log.Debugf("rewrite-content %v", contentID)
|
||||
|
||||
mp, mperr := bm.format.GetMutableParameters()
|
||||
mp, mperr := bm.format.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
@@ -676,7 +681,7 @@ func (bm *WriteManager) getContentDataAndInfo(ctx context.Context, contentID ID,
|
||||
func (bm *WriteManager) UndeleteContent(ctx context.Context, contentID ID) error {
|
||||
bm.log.Debugf("UndeleteContent(%q)", contentID)
|
||||
|
||||
mp, mperr := bm.format.GetMutableParameters()
|
||||
mp, mperr := bm.format.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
@@ -740,7 +745,7 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr
|
||||
return nil, errors.Wrap(err, "unable to read crypto bytes")
|
||||
}
|
||||
|
||||
suffix, berr := bm.format.RepositoryFormatBytes()
|
||||
suffix, berr := bm.format.RepositoryFormatBytes(ctx)
|
||||
if berr != nil {
|
||||
return nil, errors.Wrap(berr, "format bytes")
|
||||
}
|
||||
@@ -763,8 +768,8 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr
|
||||
}
|
||||
|
||||
// SupportsContentCompression returns true if content manager supports content-compression.
|
||||
func (bm *WriteManager) SupportsContentCompression() (bool, error) {
|
||||
mp, mperr := bm.format.GetMutableParameters()
|
||||
func (bm *WriteManager) SupportsContentCompression(ctx context.Context) (bool, error) {
|
||||
mp, mperr := bm.format.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return false, errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
@@ -780,7 +785,7 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, pre
|
||||
bm.writeContentBytes.Observe(int64(data.Length()), t0.Elapsed())
|
||||
}()
|
||||
|
||||
mp, mperr := bm.format.GetMutableParameters()
|
||||
mp, mperr := bm.format.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return EmptyID, errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
@@ -924,9 +929,9 @@ func (bm *WriteManager) lock() {
|
||||
}
|
||||
|
||||
// +checklocksrelease:bm.mu
|
||||
func (bm *WriteManager) unlock() {
|
||||
func (bm *WriteManager) unlock(ctx context.Context) {
|
||||
if bm.checkInvariantsOnUnlock {
|
||||
mp, mperr := bm.format.GetMutableParameters()
|
||||
mp, mperr := bm.format.GetMutableParameters(ctx)
|
||||
if mperr == nil {
|
||||
bm.verifyInvariantsLocked(mp)
|
||||
}
|
||||
|
||||
@@ -20,7 +20,7 @@ func (sm *SharedManager) Refresh(ctx context.Context) error {
|
||||
|
||||
sm.log.Debugf("Refresh started")
|
||||
|
||||
ibm, err := sm.indexBlobManager()
|
||||
ibm, err := sm.indexBlobManager(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -44,7 +44,7 @@ func (sm *SharedManager) CompactIndexes(ctx context.Context, opt indexblob.Compa
|
||||
|
||||
sm.log.Debugf("CompactIndexes(%+v)", opt)
|
||||
|
||||
ibm, err := sm.indexBlobManager()
|
||||
ibm, err := sm.indexBlobManager(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -84,9 +84,9 @@ func maybeParallelExecutor(parallel int, originalCallback IterateCallback) (Iter
|
||||
return callback, cleanup
|
||||
}
|
||||
|
||||
func (bm *WriteManager) snapshotUncommittedItems() index.Builder {
|
||||
func (bm *WriteManager) snapshotUncommittedItems(ctx context.Context) index.Builder {
|
||||
bm.lock()
|
||||
defer bm.unlock()
|
||||
defer bm.unlock(ctx)
|
||||
|
||||
overlay := bm.packIndexBuilder.Clone()
|
||||
|
||||
@@ -116,7 +116,7 @@ func (bm *WriteManager) IterateContents(ctx context.Context, opts IterateOptions
|
||||
callback, cleanup := maybeParallelExecutor(opts.Parallel, callback)
|
||||
defer cleanup() //nolint:errcheck
|
||||
|
||||
uncommitted := bm.snapshotUncommittedItems()
|
||||
uncommitted := bm.snapshotUncommittedItems(ctx)
|
||||
|
||||
invokeCallback := func(i Info) error {
|
||||
if !opts.IncludeDeleted {
|
||||
|
||||
@@ -121,7 +121,7 @@ func (sm *SharedManager) getContentDataReadLocked(ctx context.Context, pp *pendi
|
||||
return sm.decryptContentAndVerify(payload.Bytes(), bi, output)
|
||||
}
|
||||
|
||||
func (sm *SharedManager) preparePackDataContent(pp *pendingPackInfo) (index.Builder, error) {
|
||||
func (sm *SharedManager) preparePackDataContent(mp format.MutableParameters, pp *pendingPackInfo) (index.Builder, error) {
|
||||
packFileIndex := index.Builder{}
|
||||
haveContent := false
|
||||
|
||||
@@ -173,7 +173,7 @@ func (sm *SharedManager) preparePackDataContent(pp *pendingPackInfo) (index.Buil
|
||||
}
|
||||
}
|
||||
|
||||
err := sm.appendPackFileIndexRecoveryData(packFileIndex, pp.currentPackData)
|
||||
err := sm.appendPackFileIndexRecoveryData(mp, packFileIndex, pp.currentPackData)
|
||||
|
||||
return packFileIndex, err
|
||||
}
|
||||
|
||||
@@ -1830,7 +1830,7 @@ func (s *contentManagerSuite) TestAutoCompressionOfMetadata(t *testing.T) {
|
||||
info, err := bm.ContentInfo(ctx, contentID)
|
||||
require.NoError(t, err)
|
||||
|
||||
if scc, _ := bm.SupportsContentCompression(); scc {
|
||||
if scc, _ := bm.SupportsContentCompression(ctx); scc {
|
||||
require.Equal(t, compression.HeaderZstdFastest, info.GetCompressionHeaderID())
|
||||
} else {
|
||||
require.Equal(t, NoCompression, info.GetCompressionHeaderID())
|
||||
|
||||
@@ -9,12 +9,12 @@
|
||||
|
||||
// Reader defines content read API.
|
||||
type Reader interface {
|
||||
SupportsContentCompression() (bool, error)
|
||||
SupportsContentCompression(ctx context.Context) (bool, error)
|
||||
ContentFormat() format.Provider
|
||||
GetContent(ctx context.Context, id ID) ([]byte, error)
|
||||
ContentInfo(ctx context.Context, id ID) (Info, error)
|
||||
IterateContents(ctx context.Context, opts IterateOptions, callback IterateCallback) error
|
||||
IteratePacks(ctx context.Context, opts IteratePackOptions, callback IteratePacksCallback) error
|
||||
ListActiveSessions(ctx context.Context) (map[SessionID]*SessionInfo, error)
|
||||
EpochManager() (*epoch.Manager, bool, error)
|
||||
EpochManager(ctx context.Context) (*epoch.Manager, bool, error)
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ type cleanupEntry struct {
|
||||
|
||||
// IndexFormattingOptions provides options for formatting index blobs.
|
||||
type IndexFormattingOptions interface {
|
||||
GetMutableParameters() (format.MutableParameters, error)
|
||||
GetMutableParameters(ctx context.Context) (format.MutableParameters, error)
|
||||
}
|
||||
|
||||
// ManagerV0 is a V0 (legacy) implementation of index blob manager.
|
||||
@@ -155,7 +155,7 @@ func (m *ManagerV0) Compact(ctx context.Context, opt CompactOptions) error {
|
||||
return errors.Wrap(err, "error listing active index blobs")
|
||||
}
|
||||
|
||||
mp, mperr := m.formattingOptions.GetMutableParameters()
|
||||
mp, mperr := m.formattingOptions.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
@@ -484,7 +484,7 @@ func (m *ManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs []Metadata
|
||||
return nil
|
||||
}
|
||||
|
||||
mp, mperr := m.formattingOptions.GetMutableParameters()
|
||||
mp, mperr := m.formattingOptions.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
|
||||
@@ -76,7 +76,7 @@ func (m *ManagerV1) CompactEpoch(ctx context.Context, blobIDs []blob.ID, outputP
|
||||
}
|
||||
}
|
||||
|
||||
mp, mperr := m.formattingOptions.GetMutableParameters()
|
||||
mp, mperr := m.formattingOptions.GetMutableParameters(ctx)
|
||||
if mperr != nil {
|
||||
return errors.Wrap(mperr, "mutable parameters")
|
||||
}
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package format
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/epoch"
|
||||
@@ -44,7 +46,7 @@ func (f *ContentFormat) ResolveFormatVersion() error {
|
||||
}
|
||||
|
||||
// GetMutableParameters implements FormattingOptionsProvider.
|
||||
func (f *ContentFormat) GetMutableParameters() (MutableParameters, error) {
|
||||
func (f *ContentFormat) GetMutableParameters(ctx context.Context) (MutableParameters, error) {
|
||||
return f.MutableParameters, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -59,8 +59,8 @@ type Manager struct {
|
||||
ignoreCacheOnFirstRefresh bool
|
||||
}
|
||||
|
||||
func (m *Manager) getOrRefreshFormat() (Provider, error) {
|
||||
if err := m.maybeRefreshNotLocked(); err != nil {
|
||||
func (m *Manager) getOrRefreshFormat(ctx context.Context) (Provider, error) {
|
||||
if err := m.maybeRefreshNotLocked(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -70,7 +70,7 @@ func (m *Manager) getOrRefreshFormat() (Provider, error) {
|
||||
return m.current, nil
|
||||
}
|
||||
|
||||
func (m *Manager) maybeRefreshNotLocked() error {
|
||||
func (m *Manager) maybeRefreshNotLocked(ctx context.Context) error {
|
||||
m.mu.RLock()
|
||||
val := m.validUntil
|
||||
m.mu.RUnlock()
|
||||
@@ -80,7 +80,7 @@ func (m *Manager) maybeRefreshNotLocked() error {
|
||||
}
|
||||
|
||||
// current format not valid anymore, kick off a refresh
|
||||
return m.refresh(m.ctx)
|
||||
return m.refresh(ctx)
|
||||
}
|
||||
|
||||
// readAndCacheRepositoryBlobBytes reads the provided blob from the repository or cache directory.
|
||||
@@ -247,31 +247,31 @@ func (m *Manager) SupportsPasswordChange() bool {
|
||||
|
||||
// RepositoryFormatBytes returns the bytes of `kopia.repository` blob.
|
||||
// This function blocks to refresh the format blob if necessary.
|
||||
func (m *Manager) RepositoryFormatBytes() ([]byte, error) {
|
||||
f, err := m.getOrRefreshFormat()
|
||||
func (m *Manager) RepositoryFormatBytes(ctx context.Context) ([]byte, error) {
|
||||
f, err := m.getOrRefreshFormat(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//nolint:wrapcheck
|
||||
return f.RepositoryFormatBytes()
|
||||
return f.RepositoryFormatBytes(ctx)
|
||||
}
|
||||
|
||||
// GetMutableParameters gets mutable paramers of the repository.
|
||||
// This function blocks to refresh the format blob if necessary.
|
||||
func (m *Manager) GetMutableParameters() (MutableParameters, error) {
|
||||
f, err := m.getOrRefreshFormat()
|
||||
func (m *Manager) GetMutableParameters(ctx context.Context) (MutableParameters, error) {
|
||||
f, err := m.getOrRefreshFormat(ctx)
|
||||
if err != nil {
|
||||
return MutableParameters{}, err
|
||||
}
|
||||
|
||||
//nolint:wrapcheck
|
||||
return f.GetMutableParameters()
|
||||
return f.GetMutableParameters(ctx)
|
||||
}
|
||||
|
||||
// UpgradeLockIntent returns the current lock intent.
|
||||
func (m *Manager) UpgradeLockIntent() (*UpgradeLockIntent, error) {
|
||||
if err := m.maybeRefreshNotLocked(); err != nil {
|
||||
func (m *Manager) UpgradeLockIntent(ctx context.Context) (*UpgradeLockIntent, error) {
|
||||
if err := m.maybeRefreshNotLocked(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -282,8 +282,8 @@ func (m *Manager) UpgradeLockIntent() (*UpgradeLockIntent, error) {
|
||||
}
|
||||
|
||||
// RequiredFeatures returns the list of features required to open the repository.
|
||||
func (m *Manager) RequiredFeatures() ([]feature.Required, error) {
|
||||
if err := m.maybeRefreshNotLocked(); err != nil {
|
||||
func (m *Manager) RequiredFeatures(ctx context.Context) ([]feature.Required, error) {
|
||||
if err := m.maybeRefreshNotLocked(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -326,8 +326,8 @@ func (m *Manager) UniqueID() []byte {
|
||||
}
|
||||
|
||||
// BlobCfgBlob gets the BlobStorageConfiguration.
|
||||
func (m *Manager) BlobCfgBlob() (BlobStorageConfiguration, error) {
|
||||
if err := m.maybeRefreshNotLocked(); err != nil {
|
||||
func (m *Manager) BlobCfgBlob(ctx context.Context) (BlobStorageConfiguration, error) {
|
||||
if err := m.maybeRefreshNotLocked(ctx); err != nil {
|
||||
return BlobStorageConfiguration{}, err
|
||||
}
|
||||
|
||||
|
||||
@@ -402,7 +402,7 @@ func TestFormatManagerValidDuration(t *testing.T) {
|
||||
func mustGetMutableParameters(t *testing.T, mgr *format.Manager) format.MutableParameters {
|
||||
t.Helper()
|
||||
|
||||
mp, err := mgr.GetMutableParameters()
|
||||
mp, err := mgr.GetMutableParameters(testlogging.Context(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
return mp
|
||||
@@ -411,7 +411,7 @@ func mustGetMutableParameters(t *testing.T, mgr *format.Manager) format.MutableP
|
||||
func mustGetUpgradeLockIntent(t *testing.T, mgr *format.Manager) *format.UpgradeLockIntent {
|
||||
t.Helper()
|
||||
|
||||
uli, err := mgr.GetUpgradeLockIntent()
|
||||
uli, err := mgr.GetUpgradeLockIntent(testlogging.Context(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
return uli
|
||||
@@ -420,7 +420,7 @@ func mustGetUpgradeLockIntent(t *testing.T, mgr *format.Manager) *format.Upgrade
|
||||
func mustGetRepositoryFormatBytes(t *testing.T, mgr *format.Manager) []byte {
|
||||
t.Helper()
|
||||
|
||||
b, err := mgr.RepositoryFormatBytes()
|
||||
b, err := mgr.RepositoryFormatBytes(testlogging.Context(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
return b
|
||||
@@ -429,7 +429,7 @@ func mustGetRepositoryFormatBytes(t *testing.T, mgr *format.Manager) []byte {
|
||||
func mustGetRequiredFeatures(t *testing.T, mgr *format.Manager) []feature.Required {
|
||||
t.Helper()
|
||||
|
||||
rf, err := mgr.RequiredFeatures()
|
||||
rf, err := mgr.RequiredFeatures(testlogging.Context(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
return rf
|
||||
@@ -438,7 +438,7 @@ func mustGetRequiredFeatures(t *testing.T, mgr *format.Manager) []feature.Requir
|
||||
func mustGetBlobStorageConfiguration(t *testing.T, mgr *format.Manager) format.BlobStorageConfiguration {
|
||||
t.Helper()
|
||||
|
||||
cfg, err := mgr.BlobCfgBlob()
|
||||
cfg, err := mgr.BlobCfgBlob(testlogging.Context(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
return cfg
|
||||
@@ -447,7 +447,7 @@ func mustGetBlobStorageConfiguration(t *testing.T, mgr *format.Manager) format.B
|
||||
func expectMutableParametersError(t *testing.T, mgr *format.Manager) error {
|
||||
t.Helper()
|
||||
|
||||
_, err := mgr.GetMutableParameters()
|
||||
_, err := mgr.GetMutableParameters(testlogging.Context(t))
|
||||
require.Error(t, err)
|
||||
|
||||
return err
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package format
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
@@ -56,11 +58,11 @@ type Provider interface {
|
||||
|
||||
// this is typically cached, but sometimes refreshes MutableParameters from
|
||||
// the repository so the results should not be cached.
|
||||
GetMutableParameters() (MutableParameters, error)
|
||||
GetMutableParameters(ctx context.Context) (MutableParameters, error)
|
||||
SupportsPasswordChange() bool
|
||||
GetMasterKey() []byte
|
||||
|
||||
RepositoryFormatBytes() ([]byte, error)
|
||||
RepositoryFormatBytes(ctx context.Context) ([]byte, error)
|
||||
}
|
||||
|
||||
type formattingOptionsProvider struct {
|
||||
@@ -149,7 +151,7 @@ func (f *formattingOptionsProvider) HashFunc() hashing.HashFunc {
|
||||
return f.h
|
||||
}
|
||||
|
||||
func (f *formattingOptionsProvider) RepositoryFormatBytes() ([]byte, error) {
|
||||
func (f *formattingOptionsProvider) RepositoryFormatBytes(ctx context.Context) ([]byte, error) {
|
||||
if f.SupportsPasswordChange() {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@@ -37,7 +37,7 @@ func BackupBlobID(l UpgradeLockIntent) blob.ID {
|
||||
// should cause the unsupporting clients (non-upgrade capable) to fail
|
||||
// connecting to the repository.
|
||||
func (m *Manager) SetUpgradeLockIntent(ctx context.Context, l UpgradeLockIntent) (*UpgradeLockIntent, error) {
|
||||
if err := m.maybeRefreshNotLocked(); err != nil {
|
||||
if err := m.maybeRefreshNotLocked(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -96,7 +96,7 @@ func WriteLegacyIndexPoisonBlob(ctx context.Context, st blob.Storage) error {
|
||||
// blob. This in-effect commits the new repository format to the repository and
|
||||
// resumes all access to the repository.
|
||||
func (m *Manager) CommitUpgrade(ctx context.Context) error {
|
||||
if err := m.maybeRefreshNotLocked(); err != nil {
|
||||
if err := m.maybeRefreshNotLocked(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -125,7 +125,7 @@ func (m *Manager) CommitUpgrade(ctx context.Context) error {
|
||||
// hence using this API could render the repository corrupted and unreadable by
|
||||
// clients.
|
||||
func (m *Manager) RollbackUpgrade(ctx context.Context) error {
|
||||
if err := m.maybeRefreshNotLocked(); err != nil {
|
||||
if err := m.maybeRefreshNotLocked(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -186,8 +186,8 @@ func (m *Manager) RollbackUpgrade(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// GetUpgradeLockIntent gets the current upgrade lock intent.
|
||||
func (m *Manager) GetUpgradeLockIntent() (*UpgradeLockIntent, error) {
|
||||
if err := m.maybeRefreshNotLocked(); err != nil {
|
||||
func (m *Manager) GetUpgradeLockIntent(ctx context.Context) (*UpgradeLockIntent, error) {
|
||||
if err := m.maybeRefreshNotLocked(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -192,7 +192,7 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
|
||||
opts.UpgradeOwnerID = "another-upgrade-owner"
|
||||
})
|
||||
|
||||
mp, mperr := env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters()
|
||||
mp, mperr := env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters(ctx)
|
||||
require.NoError(t, mperr)
|
||||
require.Equal(t, format.FormatVersion3, mp.Version)
|
||||
|
||||
@@ -213,7 +213,7 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
|
||||
require.EqualError(t, env.RepositoryWriter.FormatManager().CommitUpgrade(ctx), "no upgrade in progress")
|
||||
|
||||
// verify that we are back to the original version where we started from
|
||||
mp, err = env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters()
|
||||
mp, err = env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, format.FormatVersion1, mp.Version)
|
||||
|
||||
@@ -682,7 +682,7 @@ func (r *grpcInnerSession) GetContent(ctx context.Context, contentID content.ID)
|
||||
return nil, errNoSessionResponse()
|
||||
}
|
||||
|
||||
func (r *grpcRepositoryClient) SupportsContentCompression() (bool, error) {
|
||||
func (r *grpcRepositoryClient) SupportsContentCompression(ctx context.Context) (bool, error) {
|
||||
return r.serverSupportsContentCompression, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ func ExtendBlobRetentionTime(ctx context.Context, rep repo.DirectRepositoryWrite
|
||||
opt.Parallel = runtime.NumCPU() * parallelBlobRetainCPUMultiplier
|
||||
}
|
||||
|
||||
blobCfg, err := rep.FormatManager().BlobCfgBlob()
|
||||
blobCfg, err := rep.FormatManager().BlobCfgBlob(ctx)
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "blob configuration")
|
||||
}
|
||||
|
||||
@@ -136,7 +136,7 @@ func getContentToRewrite(ctx context.Context, rep repo.DirectRepository, opt *Re
|
||||
|
||||
// add all content IDs from short packs
|
||||
if opt.ShortPacks {
|
||||
mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters()
|
||||
mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters(ctx)
|
||||
if mperr == nil {
|
||||
threshold := int64(mp.MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd
|
||||
findContentInShortPacks(ctx, rep, ch, threshold, opt)
|
||||
|
||||
@@ -247,7 +247,7 @@ func Run(ctx context.Context, runParams RunParameters, safety SafetyParameters)
|
||||
}
|
||||
|
||||
func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
|
||||
_, ok, emerr := runParams.rep.ContentManager().EpochManager()
|
||||
_, ok, emerr := runParams.rep.ContentManager().EpochManager(ctx)
|
||||
if ok {
|
||||
log(ctx).Debugf("quick maintenance not required for epoch manager")
|
||||
return nil
|
||||
@@ -328,7 +328,7 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul
|
||||
}
|
||||
|
||||
func runTaskCleanupEpochManager(ctx context.Context, runParams RunParameters, s *Schedule) error {
|
||||
em, ok, emerr := runParams.rep.ContentManager().EpochManager()
|
||||
em, ok, emerr := runParams.rep.ContentManager().EpochManager(ctx)
|
||||
if emerr != nil {
|
||||
return errors.Wrap(emerr, "epoch manager")
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ type contentReader interface {
|
||||
|
||||
type contentManager interface {
|
||||
contentReader
|
||||
SupportsContentCompression() (bool, error)
|
||||
SupportsContentCompression(ctx context.Context) (bool, error)
|
||||
WriteContent(ctx context.Context, data gather.Bytes, prefix content.IDPrefix, comp compression.HeaderID) (content.ID, error)
|
||||
}
|
||||
|
||||
|
||||
@@ -79,7 +79,7 @@ func (f *fakeContentManager) WriteContent(ctx context.Context, data gather.Bytes
|
||||
return contentID, nil
|
||||
}
|
||||
|
||||
func (f *fakeContentManager) SupportsContentCompression() (bool, error) {
|
||||
func (f *fakeContentManager) SupportsContentCompression(ctx context.Context) (bool, error) {
|
||||
return f.supportsContentCompression, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -188,7 +188,7 @@ func (w *objectWriter) prepareAndWriteContentChunk(chunkID int, data gather.Byte
|
||||
comp := content.NoCompression
|
||||
objectComp := w.compressor
|
||||
|
||||
scc, err := w.om.contentMgr.SupportsContentCompression()
|
||||
scc, err := w.om.contentMgr.SupportsContentCompression(w.ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "supports content compression")
|
||||
}
|
||||
|
||||
@@ -286,7 +286,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
|
||||
return lc2.writeToFile(configFile)
|
||||
})
|
||||
|
||||
blobcfg, err := fmgr.BlobCfgBlob()
|
||||
blobcfg, err := fmgr.BlobCfgBlob(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "blob configuration")
|
||||
}
|
||||
@@ -297,7 +297,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
|
||||
|
||||
_, err = retry.WithExponentialBackoffMaxRetries(ctx, -1, "wait for upgrade", func() (interface{}, error) {
|
||||
//nolint:govet
|
||||
uli, err := fmgr.UpgradeLockIntent()
|
||||
uli, err := fmgr.UpgradeLockIntent(ctx)
|
||||
if err != nil {
|
||||
//nolint:wrapcheck
|
||||
return nil, err
|
||||
@@ -377,7 +377,7 @@ func openWithConfig(ctx context.Context, st blob.Storage, cliOpts ClientOptions,
|
||||
}
|
||||
|
||||
func handleMissingRequiredFeatures(ctx context.Context, fmgr *format.Manager, ignoreErrors bool) error {
|
||||
required, err := fmgr.RequiredFeatures()
|
||||
required, err := fmgr.RequiredFeatures(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "required features")
|
||||
}
|
||||
@@ -455,7 +455,7 @@ func upgradeLockMonitor(
|
||||
return nil
|
||||
}
|
||||
|
||||
uli, err := fmgr.UpgradeLockIntent()
|
||||
uli, err := fmgr.UpgradeLockIntent(ctx)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "upgrade lock intent")
|
||||
}
|
||||
|
||||
@@ -893,16 +893,16 @@ func TestDeriveKey(t *testing.T) {
|
||||
dw1Upgraded := env.Repository.(repo.DirectRepositoryWriter)
|
||||
cf := dw1Upgraded.ContentReader().ContentFormat()
|
||||
|
||||
mp, mperr := cf.GetMutableParameters()
|
||||
mp, mperr := cf.GetMutableParameters(ctx)
|
||||
require.NoError(t, mperr)
|
||||
|
||||
feat, err := dw1Upgraded.FormatManager().RequiredFeatures()
|
||||
feat, err := dw1Upgraded.FormatManager().RequiredFeatures(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
// perform upgrade
|
||||
mp.Version = v2
|
||||
|
||||
blobCfg, err := dw1Upgraded.FormatManager().BlobCfgBlob()
|
||||
blobCfg, err := dw1Upgraded.FormatManager().BlobCfgBlob(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, dw1Upgraded.FormatManager().SetParameters(ctx, mp, blobCfg, feat))
|
||||
@@ -927,7 +927,7 @@ func TestDeriveKey(t *testing.T) {
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
mp, err := tc.dw.FormatManager().GetMutableParameters()
|
||||
mp, err := tc.dw.FormatManager().GetMutableParameters(testlogging.Context(t))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, tc.wantFormat, mp.Version)
|
||||
|
||||
Reference in New Issue
Block a user