refactor(repository): ensure MutableParameters are never cached (#2284)

This commit is contained in:
Jarek Kowalski
2022-08-06 18:11:32 -07:00
committed by GitHub
parent 46697a69ae
commit 23299c3451
31 changed files with 387 additions and 255 deletions

View File

@@ -40,7 +40,12 @@ func setDefaultMaintenanceParameters(ctx context.Context, rep repo.RepositoryWri
p.Owner = rep.ClientOptions().UsernameAtHost()
if dw, ok := rep.(repo.DirectRepositoryWriter); ok {
if _, ok := dw.ContentReader().EpochManager(); ok {
_, ok, err := dw.ContentReader().EpochManager()
if err != nil {
return errors.Wrap(err, "epoch manager")
}
if ok {
// disable quick maintenance cycle
p.QuickCycle.Enabled = false
}

View File

@@ -23,7 +23,11 @@ func (c *commandIndexEpochList) setup(svc appServices, parent commandParent) {
}
func (c *commandIndexEpochList) run(ctx context.Context, rep repo.DirectRepository) error {
emgr, ok := rep.ContentReader().EpochManager()
emgr, ok, err := rep.ContentReader().EpochManager()
if err != nil {
return errors.Wrap(err, "epoch manager")
}
if !ok {
return errors.Errorf("epoch manager is not active")
}

View File

@@ -3,6 +3,8 @@
import (
"context"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/snapshot/snapshotmaintenance"
@@ -25,7 +27,11 @@ func (c *commandMaintenanceRun) setup(svc appServices, parent commandParent) {
func (c *commandMaintenanceRun) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
mode := maintenance.ModeQuick
_, supportsEpochManager := rep.ContentManager().EpochManager()
_, supportsEpochManager, err := rep.ContentManager().EpochManager()
if err != nil {
return errors.Wrap(err, "EpochManager")
}
if c.maintenanceRunFull || supportsEpochManager {
mode = maintenance.ModeFull

View File

@@ -15,6 +15,7 @@
"github.com/kopia/kopia/internal/units"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/format"
)
@@ -126,7 +127,7 @@ func (c *commandRepositoryStatus) dumpRetentionStatus(dr repo.DirectRepository)
}
}
// nolint: funlen
// nolint: funlen,gocyclo
func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository) error {
if c.jo.jsonOutput {
return c.outputJSON(ctx, rep)
@@ -172,21 +173,31 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository)
contentFormat := dr.ContentReader().ContentFormat()
mp, mperr := contentFormat.GetMutableParameters()
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
}
c.out.printStdout("\n")
c.out.printStdout("Unique ID: %x\n", dr.UniqueID())
c.out.printStdout("Hash: %v\n", contentFormat.GetHashFunction())
c.out.printStdout("Encryption: %v\n", contentFormat.GetEncryptionAlgorithm())
c.out.printStdout("Splitter: %v\n", dr.ObjectFormat().Splitter)
c.out.printStdout("Format version: %v\n", contentFormat.FormatVersion())
c.out.printStdout("Content compression: %v\n", dr.ContentReader().SupportsContentCompression())
c.out.printStdout("Format version: %v\n", mp.Version)
c.out.printStdout("Content compression: %v\n", mp.IndexVersion >= index.Version2)
c.out.printStdout("Password changes: %v\n", contentFormat.SupportsPasswordChange())
c.outputRequiredFeatures(dr)
c.out.printStdout("Max pack length: %v\n", units.BytesStringBase2(int64(contentFormat.MaxPackBlobSize())))
c.out.printStdout("Index Format: v%v\n", contentFormat.WriteIndexVersion())
c.out.printStdout("Max pack length: %v\n", units.BytesStringBase2(int64(mp.MaxPackSize)))
c.out.printStdout("Index Format: v%v\n", mp.IndexVersion)
if emgr, ok := dr.ContentReader().EpochManager(); ok {
emgr, epochMgrEnabled, emerr := dr.ContentReader().EpochManager()
if emerr != nil {
return errors.Wrap(emerr, "epoch manager")
}
if epochMgrEnabled {
c.out.printStdout("\n")
c.out.printStdout("Epoch Manager: enabled\n")
@@ -196,10 +207,10 @@ func (c *commandRepositoryStatus) run(ctx context.Context, rep repo.Repository)
}
c.out.printStdout("\n")
c.out.printStdout("Epoch refresh frequency: %v\n", contentFormat.GetEpochRefreshFrequency())
c.out.printStdout("Epoch advance on: %v blobs or %v, minimum %v\n", contentFormat.GetEpochAdvanceOnCountThreshold(), units.BytesStringBase2(contentFormat.GetEpochAdvanceOnTotalSizeBytesThreshold()), contentFormat.GetMinEpochDuration())
c.out.printStdout("Epoch cleanup margin: %v\n", contentFormat.GetEpochCleanupSafetyMargin())
c.out.printStdout("Epoch checkpoint every: %v epochs\n", contentFormat.GetEpochFullCheckpointFrequency())
c.out.printStdout("Epoch refresh frequency: %v\n", mp.EpochParameters.EpochRefreshFrequency)
c.out.printStdout("Epoch advance on: %v blobs or %v, minimum %v\n", mp.EpochParameters.EpochAdvanceOnCountThreshold, units.BytesStringBase2(mp.EpochParameters.EpochAdvanceOnTotalSizeBytesThreshold), mp.EpochParameters.MinEpochDuration)
c.out.printStdout("Epoch cleanup margin: %v\n", mp.EpochParameters.CleanupSafetyMargin)
c.out.printStdout("Epoch checkpoint every: %v epochs\n", mp.EpochParameters.FullCheckpointFrequency)
} else {
c.out.printStdout("Epoch Manager: disabled\n")
}

View File

@@ -123,7 +123,12 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D
}
now := rep.Time()
mp := rep.ContentReader().ContentFormat().GetMutableParameters()
mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters()
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
}
openOpts := c.svc.optionsFromFlags(ctx)
l := &format.UpgradeLockIntent{
OwnerID: openOpts.UpgradeOwnerID,
@@ -164,7 +169,13 @@ func (c *commandRepositoryUpgrade) setLockIntent(ctx context.Context, rep repo.D
// skipped until the lock is fully established.
func (c *commandRepositoryUpgrade) drainOrCommit(ctx context.Context, rep repo.DirectRepositoryWriter) error {
cf := rep.ContentReader().ContentFormat()
if cf.GetEpochManagerEnabled() {
mp, mperr := cf.GetMutableParameters()
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
}
if mp.EpochParameters.Enabled {
log(ctx).Infof("Repository indices have already been migrated to the epoch format, no need to drain other clients")
l, err := rep.GetUpgradeLockIntent(ctx)
@@ -251,7 +262,10 @@ func (c *commandRepositoryUpgrade) drainAllClients(ctx context.Context, rep repo
// repository. This phase runs after the lock has been acquired in one of the
// prior phases.
func (c *commandRepositoryUpgrade) upgrade(ctx context.Context, rep repo.DirectRepositoryWriter) error {
mp := rep.ContentReader().ContentFormat().GetMutableParameters()
mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters()
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
}
rf, err := rep.RequiredFeatures()
if err != nil {

View File

@@ -31,29 +31,7 @@
// ParametersProvider provides epoch manager parameters.
type ParametersProvider interface {
// whether epoch manager is enabled, must be true.
GetEpochManagerEnabled() bool
// how frequently each client will list blobs to determine the current epoch.
GetEpochRefreshFrequency() time.Duration
// number of epochs between full checkpoints.
GetEpochFullCheckpointFrequency() int
// do not delete uncompacted blobs if the corresponding compacted blob age is less than this.
GetEpochCleanupSafetyMargin() time.Duration
// minimum duration of an epoch
GetMinEpochDuration() time.Duration
// advance epoch if number of files exceeds this
GetEpochAdvanceOnCountThreshold() int
// advance epoch if total size of files exceeds this.
GetEpochAdvanceOnTotalSizeBytesThreshold() int64
// number of blobs to delete in parallel during cleanup
GetEpochDeleteParallelism() int
GetParameters() (*Parameters, error)
}
// ErrVerySlowIndexWrite is returned by WriteIndex if a write takes more than 2 epochs (usually >48h).
@@ -196,7 +174,7 @@ func (cs *CurrentSnapshot) isSettledEpochNumber(epoch int) bool {
// Manager manages repository epochs.
type Manager struct {
Parameters ParametersProvider
paramProvider ParametersProvider
st blob.Storage
compact CompactionFunc
@@ -317,7 +295,7 @@ func (e *Manager) maxCleanupTime(cs CurrentSnapshot) time.Time {
return maxTime
}
func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error {
func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot, p *Parameters) error {
eg, ctx := errgroup.WithContext(ctx)
// find max timestamp recently written to the repository to establish storage clock.
@@ -331,14 +309,14 @@ func (e *Manager) cleanupInternal(ctx context.Context, cs CurrentSnapshot) error
// only delete blobs if a suitable replacement exists and has been written sufficiently
// long ago. we don't want to delete blobs that are created too recently, because other clients
// may have not observed them yet.
maxReplacementTime := maxTime.Add(-e.Parameters.GetEpochCleanupSafetyMargin())
maxReplacementTime := maxTime.Add(-p.CleanupSafetyMargin)
eg.Go(func() error {
return e.cleanupEpochMarkers(ctx, cs)
})
eg.Go(func() error {
return e.cleanupWatermarks(ctx, cs, maxReplacementTime)
return e.cleanupWatermarks(ctx, cs, p, maxReplacementTime)
})
return errors.Wrap(eg.Wait(), "error cleaning up index blobs")
@@ -356,10 +334,15 @@ func (e *Manager) cleanupEpochMarkers(ctx context.Context, cs CurrentSnapshot) e
}
}
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()), "error deleting index blob marker")
p, err := e.getParameters()
if err != nil {
return err
}
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting index blob marker")
}
func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, maxReplacementTime time.Time) error {
func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, p *Parameters, maxReplacementTime time.Time) error {
var toDelete []blob.ID
for _, bm := range cs.DeletionWatermarkBlobs {
@@ -377,7 +360,7 @@ func (e *Manager) cleanupWatermarks(ctx context.Context, cs CurrentSnapshot, max
}
}
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()), "error deleting watermark blobs")
return errors.Wrap(blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism), "error deleting watermark blobs")
}
// CleanupSupersededIndexes cleans up the indexes which have been superseded by compacted ones.
@@ -387,6 +370,11 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error {
return err
}
p, err := e.getParameters()
if err != nil {
return err
}
// find max timestamp recently written to the repository to establish storage clock.
// we will be deleting blobs whose timestamps are sufficiently old enough relative
// to this max time. This assumes that storage clock moves forward somewhat reasonably.
@@ -398,7 +386,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error {
// only delete blobs if a suitable replacement exists and has been written sufficiently
// long ago. we don't want to delete blobs that are created too recently, because other clients
// may have not observed them yet.
maxReplacementTime := maxTime.Add(-e.Parameters.GetEpochCleanupSafetyMargin())
maxReplacementTime := maxTime.Add(-p.CleanupSafetyMargin)
e.log.Debugw("Cleaning up superseded index blobs...",
"maxReplacementTime", maxReplacementTime)
@@ -420,7 +408,7 @@ func (e *Manager) CleanupSupersededIndexes(ctx context.Context) error {
}
}
if err := blob.DeleteMultiple(ctx, e.st, toDelete, e.Parameters.GetEpochDeleteParallelism()); err != nil {
if err := blob.DeleteMultiple(ctx, e.st, toDelete, p.DeleteParallelism); err != nil {
return errors.Wrap(err, "unable to delete uncompacted blobs")
}
@@ -438,14 +426,28 @@ func blobSetWrittenEarlyEnough(replacementSet []blob.Metadata, maxReplacementTim
return blob.MaxTimestamp(replacementSet).Before(maxReplacementTime)
}
// getParameters fetches the current epoch-manager parameters from the
// configured provider. Parameters are re-fetched on each call (rather than
// cached on the Manager) so that changes to mutable repository parameters
// are always observed; any provider error is wrapped with context.
func (e *Manager) getParameters() (*Parameters, error) {
emp, err := e.paramProvider.GetParameters()
if err != nil {
return nil, errors.Wrap(err, "epoch manager parameters")
}
return emp, nil
}
func (e *Manager) refreshLocked(ctx context.Context) error {
if ctx.Err() != nil {
return errors.Wrap(ctx.Err(), "refreshLocked")
}
p, err := e.getParameters()
if err != nil {
return err
}
nextDelayTime := initiaRefreshAttemptSleep
if !e.Parameters.GetEpochManagerEnabled() {
if !p.Enabled {
return errors.Errorf("epoch manager not enabled")
}
@@ -553,7 +555,7 @@ func (e *Manager) loadSingleEpochCompactions(ctx context.Context, cs *CurrentSna
return nil
}
func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot) {
func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) {
latestSettled := cs.WriteEpoch - numUnsettledEpochs
if latestSettled < 0 {
return
@@ -564,7 +566,7 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs
firstNonRangeCompacted = cs.LongestRangeCheckpointSets[len(cs.LongestRangeCheckpointSets)-1].MaxEpoch + 1
}
if latestSettled-firstNonRangeCompacted < e.Parameters.GetEpochFullCheckpointFrequency() {
if latestSettled-firstNonRangeCompacted < p.FullCheckpointFrequency {
e.log.Debugf("not generating range checkpoint")
return
@@ -587,14 +589,14 @@ func (e *Manager) maybeGenerateNextRangeCheckpointAsync(ctx context.Context, cs
func (e *Manager) maybeOptimizeRangeCheckpointsAsync(ctx context.Context, cs CurrentSnapshot) {
}
func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs CurrentSnapshot) {
func (e *Manager) maybeStartCleanupAsync(ctx context.Context, cs CurrentSnapshot, p *Parameters) {
e.backgroundWork.Add(1)
// we're starting background work, ignore parent cancelation signal.
ctxutil.GoDetached(ctx, func(ctx context.Context) {
defer e.backgroundWork.Done()
if err := e.cleanupInternal(ctx, cs); err != nil {
if err := e.cleanupInternal(ctx, cs, p); err != nil {
e.log.Errorf("error cleaning up index blobs: %v, performance may be affected", err)
}
})
@@ -637,16 +639,21 @@ func (e *Manager) loadUncompactedEpochs(ctx context.Context, min, max int) (map[
// refreshAttemptLocked attempts to load the committedState of
// the index and updates `lastKnownState` state atomically when complete.
func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
e.log.Debugf("refreshAttemptLocked")
p, perr := e.getParameters()
if perr != nil {
return perr
}
cs := CurrentSnapshot{
WriteEpoch: 0,
EpochStartTime: map[int]time.Time{},
UncompactedEpochSets: map[int][]blob.Metadata{},
SingleEpochCompactionSets: map[int][]blob.Metadata{},
ValidUntil: e.timeFunc().Add(e.Parameters.GetEpochRefreshFrequency()),
ValidUntil: e.timeFunc().Add(p.EpochRefreshFrequency),
}
e.log.Debugf("refreshAttemptLocked")
eg, ctx1 := errgroup.WithContext(ctx)
eg.Go(func() error {
@@ -680,7 +687,7 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
len(ues[cs.WriteEpoch+1]),
cs.ValidUntil.Format(time.RFC3339Nano))
if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], e.Parameters.GetMinEpochDuration(), e.Parameters.GetEpochAdvanceOnCountThreshold(), e.Parameters.GetEpochAdvanceOnTotalSizeBytesThreshold()) {
if shouldAdvance(cs.UncompactedEpochSets[cs.WriteEpoch], p.MinEpochDuration, p.EpochAdvanceOnCountThreshold, p.EpochAdvanceOnTotalSizeBytesThreshold) {
if err := e.advanceEpoch(ctx, cs); err != nil {
return errors.Wrap(err, "error advancing epoch")
}
@@ -694,8 +701,8 @@ func (e *Manager) refreshAttemptLocked(ctx context.Context) error {
e.lastKnownState = cs
e.maybeGenerateNextRangeCheckpointAsync(ctx, cs)
e.maybeStartCleanupAsync(ctx, cs)
e.maybeGenerateNextRangeCheckpointAsync(ctx, cs, p)
e.maybeStartCleanupAsync(ctx, cs, p)
e.maybeOptimizeRangeCheckpointsAsync(ctx, cs)
return nil
@@ -765,9 +772,16 @@ func (e *Manager) WriteIndex(ctx context.Context, dataShards map[blob.ID]blob.By
writtenForEpoch := -1
for {
e.log.Debugf("refreshAttemptLocked")
p, err := e.getParameters()
if err != nil {
return nil, err
}
// make sure we have at least 75% of remaining time
// nolint:gomnd
cs, err := e.committedState(ctx, 3*e.Parameters.GetEpochRefreshFrequency()/4)
cs, err := e.committedState(ctx, 3*p.EpochRefreshFrequency/4)
if err != nil {
return nil, errors.Wrap(err, "error getting committed state")
}
@@ -997,7 +1011,7 @@ func NewManager(st blob.Storage, paramProvider ParametersProvider, compactor Com
log: log,
compact: compactor,
timeFunc: timeNow,
Parameters: paramProvider,
paramProvider: paramProvider,
getCompleteIndexSetTooSlow: new(int32),
committedStateRefreshTooSlow: new(int32),
writeIndexTooSlow: new(int32),

View File

@@ -92,7 +92,7 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv {
st = fs
st = logging.NewWrapper(st, testlogging.NewTestLogger(t), "[STORAGE] ")
te := &epochManagerTestEnv{unloggedst: unloggedst, st: st, ft: ft}
m := NewManager(te.st, &Parameters{
m := NewManager(te.st, parameterProvider{&Parameters{
Enabled: true,
EpochRefreshFrequency: 20 * time.Minute,
FullCheckpointFrequency: 7,
@@ -102,7 +102,7 @@ func newTestEnv(t *testing.T) *epochManagerTestEnv {
EpochAdvanceOnCountThreshold: 15,
EpochAdvanceOnTotalSizeBytesThreshold: 20 << 20,
DeleteParallelism: 1,
}, te.compact, testlogging.NewTestLogger(t), te.ft.NowFunc())
}}, te.compact, testlogging.NewTestLogger(t), te.ft.NowFunc())
te.mgr = m
te.faultyStorage = fs
te.data = data
@@ -121,7 +121,7 @@ func (te *epochManagerTestEnv) another() *epochManagerTestEnv {
faultyStorage: te.faultyStorage,
}
te2.mgr = NewManager(te2.st, te.mgr.Parameters, te2.compact, te.mgr.log, te.mgr.timeFunc)
te2.mgr = NewManager(te2.st, te.mgr.paramProvider, te2.compact, te.mgr.log, te.mgr.timeFunc)
return te2
}
@@ -577,7 +577,7 @@ func verifySequentialWrites(t *testing.T, te *epochManagerTestEnv) {
func TestIndexEpochManager_Disabled(t *testing.T) {
te := newTestEnv(t)
(te.mgr.Parameters).(*Parameters).Enabled = false
te.mgr.paramProvider.(parameterProvider).Parameters.Enabled = false
_, err := te.mgr.Current(testlogging.Context(t))
require.Error(t, err)
@@ -736,3 +736,11 @@ func (te *epochManagerTestEnv) mustWriteIndexFiles(ctx context.Context, t *testi
require.NoError(t, err)
}
// parameterProvider is a test helper implementing ParametersProvider by
// returning a fixed, in-memory *Parameters value.
type parameterProvider struct {
*Parameters
}
// GetParameters returns the embedded static parameters and never fails.
func (p parameterProvider) GetParameters() (*Parameters, error) {
return p.Parameters, nil
}

View File

@@ -34,11 +34,16 @@ func handleRepoParameters(ctx context.Context, rc requestContext) (interface{},
}, nil
}
scc, err := dr.ContentReader().SupportsContentCompression()
if err != nil {
return nil, internalServerError(err)
}
rp := &remoterepoapi.Parameters{
HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(),
HMACSecret: dr.ContentReader().ContentFormat().GetHmacSecret(),
ObjectFormat: dr.ObjectFormat(),
SupportsContentCompression: dr.ContentReader().SupportsContentCompression(),
SupportsContentCompression: scc,
}
return rp, nil
@@ -54,16 +59,26 @@ func handleRepoStatus(ctx context.Context, rc requestContext) (interface{}, *api
dr, ok := rc.rep.(repo.DirectRepository)
if ok {
mp, mperr := dr.ContentReader().ContentFormat().GetMutableParameters()
if mperr != nil {
return nil, internalServerError(mperr)
}
scc, err := dr.ContentReader().SupportsContentCompression()
if err != nil {
return nil, internalServerError(err)
}
return &serverapi.StatusResponse{
Connected: true,
ConfigFile: dr.ConfigFilename(),
Hash: dr.ContentReader().ContentFormat().GetHashFunction(),
Encryption: dr.ContentReader().ContentFormat().GetEncryptionAlgorithm(),
MaxPackSize: dr.ContentReader().ContentFormat().MaxPackBlobSize(),
MaxPackSize: mp.MaxPackSize,
Splitter: dr.ObjectFormat().Splitter,
Storage: dr.BlobReader().ConnectionInfo().Type,
ClientOptions: dr.ClientOptions(),
SupportsContentCompression: dr.ContentReader().SupportsContentCompression(),
SupportsContentCompression: scc,
}, nil
}

View File

@@ -455,6 +455,11 @@ func (s *Server) handleInitialSessionHandshake(srv grpcapi.KopiaRepository_Sessi
return repo.WriteSessionOptions{}, errors.Errorf("missing initialization request")
}
scc, err := dr.ContentReader().SupportsContentCompression()
if err != nil {
return repo.WriteSessionOptions{}, errors.Wrap(err, "supports content compression")
}
if err := s.send(srv, initializeReq.GetRequestId(), &grpcapi.SessionResponse{
Response: &grpcapi.SessionResponse_InitializeSession{
InitializeSession: &grpcapi.InitializeSessionResponse{
@@ -462,7 +467,7 @@ func (s *Server) handleInitialSessionHandshake(srv grpcapi.KopiaRepository_Sessi
HashFunction: dr.ContentReader().ContentFormat().GetHashFunction(),
HmacSecret: dr.ContentReader().ContentFormat().GetHmacSecret(),
Splitter: dr.ObjectFormat().Splitter,
SupportsContentCompression: dr.ContentReader().SupportsContentCompression(),
SupportsContentCompression: scc,
},
},
},

View File

@@ -145,8 +145,8 @@ func (r *apiServerRepository) Flush(ctx context.Context) error {
return errors.Wrap(r.cli.Post(ctx, "flush", nil, nil), "Flush")
}
func (r *apiServerRepository) SupportsContentCompression() bool {
return r.serverSupportsContentCompression
func (r *apiServerRepository) SupportsContentCompression() (bool, error) {
return r.serverSupportsContentCompression, nil
}
func (r *apiServerRepository) NewWriter(ctx context.Context, opt WriteSessionOptions) (context.Context, RepositoryWriter, error) {

View File

@@ -15,6 +15,7 @@
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/logging"
)
@@ -38,7 +39,7 @@ type committedContentIndex struct {
merged index.Merged
v1PerContentOverhead uint32
indexVersion int
formatProvider format.Provider
// fetchOne loads one index blob
fetchOne func(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error
@@ -259,9 +260,14 @@ func (c *committedContentIndex) combineSmallIndexes(m index.Merged) (index.Merge
}
}
mp, mperr := c.formatProvider.GetMutableParameters()
if mperr != nil {
return nil, errors.Wrap(mperr, "error getting mutable parameters")
}
var buf bytes.Buffer
if err := b.Build(&buf, c.indexVersion); err != nil {
if err := b.Build(&buf, mp.IndexVersion); err != nil {
return nil, errors.Wrap(err, "error building combined in-memory index")
}
@@ -349,7 +355,7 @@ func (c *committedContentIndex) missingIndexBlobs(ctx context.Context, blobs []b
func newCommittedContentIndex(caching *CachingOptions,
v1PerContentOverhead uint32,
indexVersion int,
formatProvider format.Provider,
fetchOne func(ctx context.Context, blobID blob.ID, output *gather.WriteBuffer) error,
log logging.Logger,
minSweepAge time.Duration,
@@ -370,7 +376,7 @@ func newCommittedContentIndex(caching *CachingOptions,
cache: cache,
inUse: map[blob.ID]index.Index{},
v1PerContentOverhead: v1PerContentOverhead,
indexVersion: indexVersion,
formatProvider: formatProvider,
fetchOne: fetchOne,
log: log,
}

View File

@@ -181,19 +181,24 @@ func (sm *SharedManager) loadPackIndexesLocked(ctx context.Context) error {
nextSleepTime := 100 * time.Millisecond //nolint:gomnd
for i := 0; i < indexLoadAttempts; i++ {
ibm, err0 := sm.indexBlobManager()
if err0 != nil {
return err0
}
if err := ctx.Err(); err != nil {
// nolint:wrapcheck
return err
}
if i > 0 {
sm.indexBlobManager().flushCache(ctx)
ibm.flushCache(ctx)
sm.log.Debugf("encountered NOT_FOUND when loading, sleeping %v before retrying #%v", nextSleepTime, i)
time.Sleep(nextSleepTime)
nextSleepTime *= 2
}
indexBlobs, ignoreDeletedBefore, err := sm.indexBlobManager().listActiveIndexBlobs(ctx)
indexBlobs, ignoreDeletedBefore, err := ibm.listActiveIndexBlobs(ctx)
if err != nil {
return errors.Wrap(err, "error listing index blobs")
}
@@ -235,12 +240,17 @@ func (sm *SharedManager) getCacheForContentID(id ID) cache.ContentCache {
return sm.contentCache
}
func (sm *SharedManager) indexBlobManager() indexBlobManager {
if sm.format.GetEpochManagerEnabled() {
return sm.indexBlobManagerV1
func (sm *SharedManager) indexBlobManager() (indexBlobManager, error) {
mp, mperr := sm.format.GetMutableParameters()
if mperr != nil {
return nil, errors.Wrap(mperr, "mutable parameters")
}
return sm.indexBlobManagerV0
if mp.EpochParameters.Enabled {
return sm.indexBlobManagerV1, nil
}
return sm.indexBlobManagerV0, nil
}
func (sm *SharedManager) decryptContentAndVerify(payload gather.Bytes, bi Info, output *gather.WriteBuffer) error {
@@ -313,7 +323,12 @@ func (sm *SharedManager) IndexBlobs(ctx context.Context, includeInactive bool) (
return result, nil
}
blobs, _, err := sm.indexBlobManager().listActiveIndexBlobs(ctx)
ibm, err0 := sm.indexBlobManager()
if err0 != nil {
return nil, err0
}
blobs, _, err := ibm.listActiveIndexBlobs(ctx)
// nolint:wrapcheck
return blobs, err
@@ -450,7 +465,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
log: sm.namedLogger("index-blob-manager"),
}
sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, sm.format, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow)
sm.indexBlobManagerV1.epochMgr = epoch.NewManager(cachedSt, epochParameters{sm.format}, sm.indexBlobManagerV1.compactEpoch, sm.namedLogger("epoch-manager"), sm.timeNow)
// once everything is ready, set it up
sm.contentCache = dataCache
@@ -458,7 +473,7 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
sm.indexBlobCache = indexBlobCache
sm.committedContents = newCommittedContentIndex(caching,
uint32(sm.format.Encryptor().Overhead()),
sm.format.WriteIndexVersion(),
sm.format,
sm.enc.getEncryptedBlob,
sm.namedLogger("committed-content-index"),
caching.MinIndexSweepAge.DurationOrDefault(DefaultIndexCacheSweepAge))
@@ -466,14 +481,32 @@ func (sm *SharedManager) setupReadManagerCaches(ctx context.Context, caching *Ca
return nil
}
// EpochManager returns the epoch manager.
func (sm *SharedManager) EpochManager() (*epoch.Manager, bool) {
ibm1, ok := sm.indexBlobManager().(*indexBlobManagerV1)
if !ok {
return nil, false
// epochParameters adapts a format.Provider to the epoch manager's
// ParametersProvider interface, deriving epoch parameters from the
// repository's mutable format parameters on each request.
type epochParameters struct {
prov format.Provider
}
func (p epochParameters) GetParameters() (*epoch.Parameters, error) {
mp, mperr := p.prov.GetMutableParameters()
if mperr != nil {
return nil, errors.Wrap(mperr, "mutable parameters")
}
return ibm1.epochMgr, true
return &mp.EpochParameters, nil
}
// EpochManager returns the epoch manager.
func (sm *SharedManager) EpochManager() (*epoch.Manager, bool, error) {
ibm, err := sm.indexBlobManager()
if err != nil {
return nil, false, err
}
ibm1, ok := ibm.(*indexBlobManagerV1)
if !ok {
return nil, false, nil
}
return ibm1.epochMgr, true, nil
}
// AddRef adds a reference to shared manager to prevents its closing on Release().

View File

@@ -171,7 +171,12 @@ func decodePostamble(payload []byte) *packContentPostamble {
}
func (sm *SharedManager) buildLocalIndex(pending index.Builder, output *gather.WriteBuffer) error {
if err := pending.Build(output, sm.format.WriteIndexVersion()); err != nil {
mp, mperr := sm.format.GetMutableParameters()
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
}
if err := pending.Build(output, mp.IndexVersion); err != nil {
return errors.Wrap(err, "unable to build local index")
}

View File

@@ -255,7 +255,7 @@ func (bm *WriteManager) maybeRetryWritingFailedPacksUnlocked(ctx context.Context
return nil
}
func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, data gather.Bytes, isDeleted bool, comp compression.HeaderID, previousWriteTime int64) error {
func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, data gather.Bytes, isDeleted bool, comp compression.HeaderID, previousWriteTime int64, mp format.MutableParameters) error {
// see if the current index is old enough to cause automatic flush.
if err := bm.maybeFlushBasedOnTimeUnlocked(ctx); err != nil {
return errors.Wrap(err, "unable to flush old pending writes")
@@ -267,7 +267,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
defer compressedAndEncrypted.Close()
// encrypt and compress before taking lock
actualComp, err := bm.maybeCompressAndEncryptDataForPacking(data, contentID, comp, &compressedAndEncrypted)
actualComp, err := bm.maybeCompressAndEncryptDataForPacking(data, contentID, comp, &compressedAndEncrypted, mp)
if err != nil {
return errors.Wrapf(err, "unable to encrypt %q", contentID)
}
@@ -317,7 +317,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
PackBlobID: pp.packBlobID,
PackOffset: uint32(pp.currentPackData.Length()),
TimestampSeconds: bm.contentWriteTime(previousWriteTime),
FormatVersion: byte(bm.format.FormatVersion()),
FormatVersion: byte(mp.Version),
OriginalLength: uint32(data.Length()),
}
@@ -331,7 +331,7 @@ func (bm *WriteManager) addToPackUnlocked(ctx context.Context, contentID ID, dat
pp.currentPackItems[contentID] = info
shouldWrite := pp.currentPackData.Length() >= bm.format.MaxPackBlobSize()
shouldWrite := pp.currentPackData.Length() >= mp.MaxPackSize
if shouldWrite {
// we're about to write to storage without holding a lock
// remove from pendingPacks so other goroutine tries to mess with this pending pack.
@@ -370,9 +370,9 @@ func (bm *WriteManager) EnableIndexFlush(ctx context.Context) {
}
// +checklocks:bm.mu
func (bm *WriteManager) verifyInvariantsLocked() {
func (bm *WriteManager) verifyInvariantsLocked(mp format.MutableParameters) {
bm.verifyCurrentPackItemsLocked()
bm.verifyPackIndexBuilderLocked()
bm.verifyPackIndexBuilderLocked(mp)
}
// +checklocks:bm.mu
@@ -391,7 +391,7 @@ func (bm *WriteManager) verifyCurrentPackItemsLocked() {
}
// +checklocks:bm.mu
func (bm *WriteManager) verifyPackIndexBuilderLocked() {
func (bm *WriteManager) verifyPackIndexBuilderLocked(mp format.MutableParameters) {
for k, cpi := range bm.packIndexBuilder {
bm.assertInvariant(cpi.GetContentID() == k, "content ID entry has invalid key: %v %v", cpi.GetContentID(), k)
@@ -399,7 +399,7 @@ func (bm *WriteManager) verifyPackIndexBuilderLocked() {
bm.assertInvariant(cpi.GetPackBlobID() == "", "content can't be both deleted and have a pack content: %v", cpi.GetContentID())
} else {
bm.assertInvariant(cpi.GetPackBlobID() != "", "content that's not deleted must have a pack content: %+v", cpi)
bm.assertInvariant(cpi.GetFormatVersion() == byte(bm.format.FormatVersion()), "content that's not deleted must have a valid format version: %+v", cpi)
bm.assertInvariant(cpi.GetFormatVersion() == byte(mp.Version), "content that's not deleted must have a valid format version: %+v", cpi)
}
bm.assertInvariant(cpi.GetTimestampSeconds() != 0, "content has no timestamp: %v", cpi.GetContentID())
@@ -423,8 +423,13 @@ func (bm *WriteManager) writeIndexBlobs(ctx context.Context, dataShards []gather
ctx, span := tracer.Start(ctx, "WriteIndexBlobs")
defer span.End()
ibm, err := bm.indexBlobManager()
if err != nil {
return nil, err
}
// nolint:wrapcheck
return bm.indexBlobManager().writeIndexBlobs(ctx, dataShards, sessionID)
return ibm.writeIndexBlobs(ctx, dataShards, sessionID)
}
// +checklocksread:bm.indexesLock
@@ -436,7 +441,7 @@ func (bm *WriteManager) addIndexBlob(ctx context.Context, indexBlobID blob.ID, d
}
// +checklocks:bm.mu
func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error {
func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context, mp format.MutableParameters) error {
ctx, span := tracer.Start(ctx, "FlushPackIndexes")
defer span.End()
@@ -447,7 +452,7 @@ func (bm *WriteManager) flushPackIndexesLocked(ctx context.Context) error {
if len(bm.packIndexBuilder) > 0 {
_, span2 := tracer.Start(ctx, "BuildShards")
dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(bm.format.WriteIndexVersion(), true, bm.format.IndexShardSize())
dataShards, closeShards, err := bm.packIndexBuilder.BuildShards(mp.IndexVersion, true, defaultIndexShardSize)
span2.End()
@@ -596,6 +601,11 @@ func (bm *WriteManager) setFlushingLocked(v bool) {
// Any pending writes completed before Flush() has started are guaranteed to be committed to the
// repository before Flush() returns.
func (bm *WriteManager) Flush(ctx context.Context) error {
mp, mperr := bm.format.GetMutableParameters()
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
}
bm.lock()
defer bm.unlock()
@@ -633,7 +643,7 @@ func (bm *WriteManager) Flush(ctx context.Context) error {
return errors.Wrap(err, "error writing pending content")
}
if err := bm.flushPackIndexesLocked(ctx); err != nil {
if err := bm.flushPackIndexesLocked(ctx, mp); err != nil {
return errors.Wrap(err, "error flushing indexes")
}
@@ -646,7 +656,12 @@ func (bm *WriteManager) Flush(ctx context.Context) error {
// RewriteContent reads the given content and writes it back using the current
// format parameters. The content's deleted status is preserved.
//
// MutableParameters are fetched fresh on every call and must not be cached.
func (bm *WriteManager) RewriteContent(ctx context.Context, contentID ID) error {
	bm.log.Debugf("rewrite-content %v", contentID)

	mp, mperr := bm.format.GetMutableParameters()
	if mperr != nil {
		return errors.Wrap(mperr, "mutable parameters")
	}

	// onlyRewriteDeleted=false: rewrite unconditionally.
	return bm.rewriteContent(ctx, contentID, false, mp)
}
func (bm *WriteManager) getContentDataAndInfo(ctx context.Context, contentID ID, output *gather.WriteBuffer) (Info, error) {
@@ -672,14 +687,19 @@ func (bm *WriteManager) getContentDataAndInfo(ctx context.Context, contentID ID,
// UndeleteContent rewrites the given content only if it is currently marked
// deleted; the rewritten content is NOT marked deleted.
//
// MutableParameters are fetched fresh on every call and must not be cached.
func (bm *WriteManager) UndeleteContent(ctx context.Context, contentID ID) error {
	bm.log.Debugf("UndeleteContent(%q)", contentID)

	mp, mperr := bm.format.GetMutableParameters()
	if mperr != nil {
		return errors.Wrap(mperr, "mutable parameters")
	}

	// onlyRewriteDeleted=true: only previously-deleted content is rewritten.
	return bm.rewriteContent(ctx, contentID, true, mp)
}
// When onlyRewriteDelete is true, the content is only rewritten if the existing
// content is marked as deleted. The new content is NOT marked deleted.
// When onlyRewriteDelete is false, the content is unconditionally rewritten
// and the content's deleted status is preserved.
func (bm *WriteManager) rewriteContent(ctx context.Context, contentID ID, onlyRewriteDeleted bool) error {
func (bm *WriteManager) rewriteContent(ctx context.Context, contentID ID, onlyRewriteDeleted bool, mp format.MutableParameters) error {
var data gather.WriteBuffer
defer data.Close()
@@ -698,7 +718,7 @@ func (bm *WriteManager) rewriteContent(ctx context.Context, contentID ID, onlyRe
isDeleted = false
}
return bm.addToPackUnlocked(ctx, contentID, data.Bytes(), isDeleted, bi.GetCompressionHeaderID(), bi.GetTimestampSeconds())
return bm.addToPackUnlocked(ctx, contentID, data.Bytes(), isDeleted, bi.GetCompressionHeaderID(), bi.GetTimestampSeconds(), mp)
}
func packPrefixForContentID(contentID ID) blob.ID {
@@ -747,13 +767,23 @@ func (bm *WriteManager) getOrCreatePendingPackInfoLocked(ctx context.Context, pr
}
// SupportsContentCompression returns true if content manager supports content-compression.
func (bm *WriteManager) SupportsContentCompression() bool {
return bm.format.WriteIndexVersion() >= index.Version2
func (bm *WriteManager) SupportsContentCompression() (bool, error) {
mp, mperr := bm.format.GetMutableParameters()
if mperr != nil {
return false, errors.Wrap(mperr, "mutable parameters")
}
return mp.IndexVersion >= index.Version2, nil
}
// WriteContent saves a given content of data to a pack group with a provided name and returns a contentID
// that's based on the contents of data written.
func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, prefix index.IDPrefix, comp compression.HeaderID) (ID, error) {
mp, mperr := bm.format.GetMutableParameters()
if mperr != nil {
return EmptyID, errors.Wrap(mperr, "mutable parameters")
}
if err := bm.maybeRetryWritingFailedPacksUnlocked(ctx); err != nil {
return EmptyID, err
}
@@ -797,7 +827,7 @@ func (bm *WriteManager) WriteContent(ctx context.Context, data gather.Bytes, pre
bm.log.Debugf(logbuf.String())
return contentID, bm.addToPackUnlocked(ctx, contentID, data, false, comp, previousWriteTime)
return contentID, bm.addToPackUnlocked(ctx, contentID, data, false, comp, previousWriteTime, mp)
}
// GetContent gets the contents of a given content. If the content is not found returns ErrContentNotFound.
@@ -892,7 +922,10 @@ func (bm *WriteManager) lock() {
// +checklocksrelease:bm.mu
func (bm *WriteManager) unlock() {
if bm.checkInvariantsOnUnlock {
bm.verifyInvariantsLocked()
mp, mperr := bm.format.GetMutableParameters()
if mperr == nil {
bm.verifyInvariantsLocked(mp)
}
}
bm.mu.Unlock()

View File

@@ -38,11 +38,16 @@ func (sm *SharedManager) Refresh(ctx context.Context) error {
sm.log.Debugf("Refresh started")
sm.indexBlobManager().invalidate(ctx)
ibm, err := sm.indexBlobManager()
if err != nil {
return err
}
ibm.invalidate(ctx)
timer := timetrack.StartTimer()
err := sm.loadPackIndexesLocked(ctx)
err = sm.loadPackIndexesLocked(ctx)
sm.log.Debugf("Refresh completed in %v", timer.Elapsed())
return err
@@ -57,7 +62,12 @@ func (sm *SharedManager) CompactIndexes(ctx context.Context, opt CompactOptions)
sm.log.Debugf("CompactIndexes(%+v)", opt)
if err := sm.indexBlobManager().compact(ctx, opt); err != nil {
ibm, err := sm.indexBlobManager()
if err != nil {
return err
}
if err := ibm.compact(ctx, opt); err != nil {
return errors.Wrap(err, "error performing compaction")
}

View File

@@ -16,27 +16,28 @@
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/compression"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/hashing"
"github.com/kopia/kopia/repo/logging"
)
const indexBlobCompactionWarningThreshold = 1000
func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes, contentID ID, comp compression.HeaderID, output *gather.WriteBuffer) (compression.HeaderID, error) {
func (sm *SharedManager) maybeCompressAndEncryptDataForPacking(data gather.Bytes, contentID ID, comp compression.HeaderID, output *gather.WriteBuffer, mp format.MutableParameters) (compression.HeaderID, error) {
var hashOutput [hashing.MaxHashSize]byte
iv := getPackedContentIV(hashOutput[:0], contentID)
// If the content is prefixed (which represents Kopia's own metadata as opposed to user data),
// and we're on V2 format or greater, enable internal compression even when not requested.
if contentID.HasPrefix() && comp == NoCompression && sm.format.WriteIndexVersion() >= index.Version2 {
if contentID.HasPrefix() && comp == NoCompression && mp.IndexVersion >= index.Version2 {
// 'zstd-fastest' has a good mix of being fast, low memory usage and high compression for JSON.
comp = compression.HeaderZstdFastest
}
// nolint:nestif
if comp != NoCompression {
if sm.format.WriteIndexVersion() < index.Version2 {
if mp.IndexVersion < index.Version2 {
return NoCompression, errors.Errorf("compression is not enabled for this repository")
}

View File

@@ -1829,7 +1829,7 @@ func (s *contentManagerSuite) TestAutoCompressionOfMetadata(t *testing.T) {
info, err := bm.ContentInfo(ctx, contentID)
require.NoError(t, err)
if bm.SupportsContentCompression() {
if scc, _ := bm.SupportsContentCompression(); scc {
require.Equal(t, compression.HeaderZstdFastest, info.GetCompressionHeaderID())
} else {
require.Equal(t, NoCompression, info.GetCompressionHeaderID())
@@ -2030,7 +2030,6 @@ func (s *contentManagerSuite) TestCompression_Disabled(t *testing.T) {
indexVersion: index.Version1,
})
require.False(t, bm.SupportsContentCompression())
ctx := testlogging.Context(t)
compressibleData := bytes.Repeat([]byte{1, 2, 3, 4}, 1000)
@@ -2046,8 +2045,6 @@ func (s *contentManagerSuite) TestCompression_CompressibleData(t *testing.T) {
indexVersion: index.Version2,
})
require.True(t, bm.SupportsContentCompression())
ctx := testlogging.Context(t)
compressibleData := bytes.Repeat([]byte{1, 2, 3, 4}, 1000)
headerID := compression.ByName["gzip"].HeaderID()
@@ -2081,8 +2078,6 @@ func (s *contentManagerSuite) TestCompression_NonCompressibleData(t *testing.T)
indexVersion: index.Version2,
})
require.True(t, bm.SupportsContentCompression())
ctx := testlogging.Context(t)
nonCompressibleData := make([]byte, 65000)
headerID := compression.ByName["pgzip"].HeaderID()

View File

@@ -9,12 +9,12 @@
// Reader defines content read API.
type Reader interface {
	// SupportsContentCompression returns whether the repository supports
	// content compression.
	SupportsContentCompression() (bool, error)
	ContentFormat() format.Provider
	GetContent(ctx context.Context, id ID) ([]byte, error)
	ContentInfo(ctx context.Context, id ID) (Info, error)
	IterateContents(ctx context.Context, opts IterateOptions, callback IterateCallback) error
	IteratePacks(ctx context.Context, opts IteratePackOptions, callback IteratePacksCallback) error
	ListActiveSessions(ctx context.Context) (map[SessionID]*SessionInfo, error)
	// EpochManager returns the epoch manager and whether it is enabled.
	EpochManager() (*epoch.Manager, bool, error)
}

View File

@@ -11,6 +11,7 @@
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/logging"
)
@@ -20,6 +21,8 @@
const (
legacyIndexPoisonBlobID = "n00000000000000000000000000000000-repository_unreadable_by_this_kopia_version_upgrade_required"
defaultIndexShardSize = 16e6 // slightly less than 2^24, which lets index use 24-bit/3-byte indexes
defaultEventualConsistencySettleTime = 1 * time.Hour
compactionLogBlobPrefix = "m"
cleanupBlobPrefix = "l"
@@ -52,9 +55,7 @@ type cleanupEntry struct {
// IndexFormattingOptions provides options for formatting index blobs.
type IndexFormattingOptions interface {
	MaxIndexBlobSize() int64

	// GetMutableParameters returns the current mutable format parameters;
	// the result must not be cached as the parameters may change.
	GetMutableParameters() (format.MutableParameters, error)
}
type indexBlobManagerV0 struct {
@@ -136,7 +137,12 @@ func (m *indexBlobManagerV0) compact(ctx context.Context, opt CompactOptions) er
return errors.Wrap(err, "error listing active index blobs")
}
blobsToCompact := m.getBlobsToCompact(indexBlobs, opt)
mp, mperr := m.formattingOptions.GetMutableParameters()
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
}
blobsToCompact := m.getBlobsToCompact(indexBlobs, opt, mp)
if err := m.compactIndexBlobs(ctx, blobsToCompact, opt); err != nil {
return errors.Wrap(err, "error performing compaction")
@@ -416,22 +422,22 @@ func (m *indexBlobManagerV0) cleanup(ctx context.Context, maxEventualConsistency
return nil
}
func (m *indexBlobManagerV0) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt CompactOptions) []IndexBlobInfo {
var nonCompactedBlobs, verySmallBlobs []IndexBlobInfo
var totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64
var mediumSizedBlobCount int
func (m *indexBlobManagerV0) getBlobsToCompact(indexBlobs []IndexBlobInfo, opt CompactOptions, mp format.MutableParameters) []IndexBlobInfo {
var (
nonCompactedBlobs, verySmallBlobs []IndexBlobInfo
totalSizeNonCompactedBlobs, totalSizeVerySmallBlobs, totalSizeMediumSizedBlobs int64
mediumSizedBlobCount int
)
for _, b := range indexBlobs {
if b.Length > m.formattingOptions.MaxIndexBlobSize() && !opt.AllIndexes {
if b.Length > int64(mp.MaxPackSize) && !opt.AllIndexes {
continue
}
nonCompactedBlobs = append(nonCompactedBlobs, b)
totalSizeNonCompactedBlobs += b.Length
if b.Length < m.formattingOptions.MaxIndexBlobSize()/verySmallContentFraction {
if b.Length < int64(mp.MaxPackSize)/verySmallContentFraction {
verySmallBlobs = append(verySmallBlobs, b)
totalSizeVerySmallBlobs += b.Length
} else {
@@ -461,6 +467,11 @@ func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs [
return nil
}
mp, mperr := m.formattingOptions.GetMutableParameters()
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
}
bld := make(index.Builder)
var inputs, outputs []blob.Metadata
@@ -479,7 +490,7 @@ func (m *indexBlobManagerV0) compactIndexBlobs(ctx context.Context, indexBlobs [
// we must do it after all input blobs have been merged, otherwise we may resurrect contents.
m.dropContentsFromBuilder(bld, opt)
dataShards, cleanupShards, err := bld.BuildShards(m.formattingOptions.WriteIndexVersion(), false, m.formattingOptions.IndexShardSize())
dataShards, cleanupShards, err := bld.BuildShards(mp.IndexVersion, false, defaultIndexShardSize)
if err != nil {
return errors.Wrap(err, "unable to build an index")
}

View File

@@ -68,7 +68,12 @@ func (m *indexBlobManagerV1) compactEpoch(ctx context.Context, blobIDs []blob.ID
}
}
dataShards, cleanupShards, err := tmpbld.BuildShards(m.formattingOptions.WriteIndexVersion(), true, m.formattingOptions.IndexShardSize())
mp, mperr := m.formattingOptions.GetMutableParameters()
if mperr != nil {
return errors.Wrap(mperr, "mutable parameters")
}
dataShards, cleanupShards, err := tmpbld.BuildShards(mp.IndexVersion, true, defaultIndexShardSize)
if err != nil {
return errors.Wrap(err, "unable to build index dataShards")
}

View File

@@ -42,8 +42,8 @@ func (f *ContentFormat) ResolveFormatVersion() error {
}
// GetMutableParameters implements FormattingOptionsProvider.
func (f *ContentFormat) GetMutableParameters() MutableParameters {
return f.MutableParameters
func (f *ContentFormat) GetMutableParameters() (MutableParameters, error) {
return f.MutableParameters, nil
}
// SupportsPasswordChange implements FormattingOptionsProvider.

View File

@@ -1,11 +1,8 @@
package format
import (
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/repo/content/index"
"github.com/kopia/kopia/repo/encryption"
@@ -16,8 +13,6 @@
minValidPackSize = 10 << 20
maxValidPackSize = 120 << 20
defaultIndexShardSize = 16e6 // slightly less than 2^24, which lets index use 24-bit/3-byte indexes
// CurrentWriteVersion is the version of the repository applied to new repositories.
CurrentWriteVersion = FormatVersion3
@@ -51,23 +46,17 @@
// Provider provides current formatting options. The options returned
// should not be cached for more than a few seconds as they are subject to change.
type Provider interface {
epoch.ParametersProvider
MaxIndexBlobSize() int64
WriteIndexVersion() int
IndexShardSize() int
encryption.Parameters
hashing.Parameters
HashFunc() hashing.HashFunc
Encryptor() encryption.Encryptor
GetMutableParameters() MutableParameters
GetMasterKey() []byte
// this is typically cached, but sometimes refreshes MutableParameters from
// the repository so the results should not be cached.
GetMutableParameters() (MutableParameters, error)
SupportsPasswordChange() bool
FormatVersion() Version
MaxPackBlobSize() int
GetMasterKey() []byte
RepositoryFormatBytes() []byte
Struct() ContentFormat
}
@@ -75,55 +64,9 @@ type Provider interface {
type formattingOptionsProvider struct {
*ContentFormat
h hashing.HashFunc
e encryption.Encryptor
actualFormatVersion Version
actualIndexVersion int
formatBytes []byte
}
// FormatVersion returns the format version of the repository.
func (f *formattingOptionsProvider) FormatVersion() Version {
	return f.Version
}
// GetEpochManagerEnabled returns whether the epoch manager is enabled; must be true.
func (f *formattingOptionsProvider) GetEpochManagerEnabled() bool {
	return f.EpochParameters.Enabled
}
// GetEpochRefreshFrequency returns how frequently each client will list blobs
// to determine the current epoch.
func (f *formattingOptionsProvider) GetEpochRefreshFrequency() time.Duration {
	return f.EpochParameters.EpochRefreshFrequency
}
// GetEpochFullCheckpointFrequency returns the number of epochs between full checkpoints.
func (f *formattingOptionsProvider) GetEpochFullCheckpointFrequency() int {
	return f.EpochParameters.FullCheckpointFrequency
}
// GetEpochCleanupSafetyMargin returns safety margin to prevent uncompacted blobs
// from being deleted if the corresponding compacted blob age is less than this.
func (f *formattingOptionsProvider) GetEpochCleanupSafetyMargin() time.Duration {
	return f.EpochParameters.CleanupSafetyMargin
}
// GetMinEpochDuration returns the minimum duration of an epoch.
func (f *formattingOptionsProvider) GetMinEpochDuration() time.Duration {
	return f.EpochParameters.MinEpochDuration
}
// GetEpochAdvanceOnCountThreshold returns the number of files above which
// the epoch should be advanced.
func (f *formattingOptionsProvider) GetEpochAdvanceOnCountThreshold() int {
	return f.EpochParameters.EpochAdvanceOnCountThreshold
}
// GetEpochAdvanceOnTotalSizeBytesThreshold returns the total size of files
// above which the epoch should be advanced.
func (f *formattingOptionsProvider) GetEpochAdvanceOnTotalSizeBytesThreshold() int64 {
	return f.EpochParameters.EpochAdvanceOnTotalSizeBytesThreshold
}
// GetEpochDeleteParallelism returns the number of blobs to delete in parallel during cleanup.
func (f *formattingOptionsProvider) GetEpochDeleteParallelism() int {
return f.EpochParameters.DeleteParallelism
h hashing.HashFunc
e encryption.Encryptor
formatBytes []byte
}
func (f *formattingOptionsProvider) Struct() ContentFormat {
@@ -143,13 +86,12 @@ func NewFormattingOptionsProvider(f *ContentFormat, formatBytes []byte) (Provide
return nil, errors.Errorf("can't handle repositories created using version %v (min supported %v, max supported %v)", formatVersion, MinSupportedWriteVersion, MaxSupportedWriteVersion)
}
actualIndexVersion := f.IndexVersion
if actualIndexVersion == 0 {
actualIndexVersion = legacyIndexVersion
if f.IndexVersion == 0 {
f.IndexVersion = legacyIndexVersion
}
if actualIndexVersion < index.Version1 || actualIndexVersion > index.Version2 {
return nil, errors.Errorf("index version %v is not supported", actualIndexVersion)
if f.IndexVersion < index.Version1 || f.IndexVersion > index.Version2 {
return nil, errors.Errorf("index version %v is not supported", f.IndexVersion)
}
h, err := hashing.CreateHashFunc(f)
@@ -175,11 +117,9 @@ func NewFormattingOptionsProvider(f *ContentFormat, formatBytes []byte) (Provide
return &formattingOptionsProvider{
ContentFormat: f,
h: h,
e: e,
actualIndexVersion: actualIndexVersion,
actualFormatVersion: f.Version,
formatBytes: formatBytes,
h: h,
e: e,
formatBytes: formatBytes,
}, nil
}
@@ -191,26 +131,6 @@ func (f *formattingOptionsProvider) HashFunc() hashing.HashFunc {
return f.h
}
// WriteIndexVersion returns the index version used when writing index blobs.
func (f *formattingOptionsProvider) WriteIndexVersion() int {
	return f.actualIndexVersion
}
// MaxIndexBlobSize returns the maximum size of an index blob
// (equal to the maximum pack size).
func (f *formattingOptionsProvider) MaxIndexBlobSize() int64 {
	return int64(f.MaxPackSize)
}
// MaxPackBlobSize returns the maximum size of a pack blob.
func (f *formattingOptionsProvider) MaxPackBlobSize() int {
	return f.MaxPackSize
}
// GetEpochManagerParameters returns the epoch manager parameters.
func (f *formattingOptionsProvider) GetEpochManagerParameters() epoch.Parameters {
	return f.EpochParameters
}
// IndexShardSize returns the size of a single index shard.
func (f *formattingOptionsProvider) IndexShardSize() int {
	return defaultIndexShardSize
}
// RepositoryFormatBytes returns the serialized repository format blob bytes.
func (f *formattingOptionsProvider) RepositoryFormatBytes() []byte {
	return f.formatBytes
}

View File

@@ -633,8 +633,8 @@ func (r *grpcInnerSession) GetContent(ctx context.Context, contentID content.ID)
return nil, errNoSessionResponse()
}
func (r *grpcRepositoryClient) SupportsContentCompression() bool {
return r.serverSupportsContentCompression
func (r *grpcRepositoryClient) SupportsContentCompression() (bool, error) {
return r.serverSupportsContentCompression, nil
}
func (r *grpcRepositoryClient) doWrite(ctx context.Context, contentID content.ID, data []byte, prefix content.IDPrefix, comp compression.HeaderID) error {

View File

@@ -136,8 +136,11 @@ func getContentToRewrite(ctx context.Context, rep repo.DirectRepository, opt *Re
// add all content IDs from short packs
if opt.ShortPacks {
threshold := int64(rep.ContentReader().ContentFormat().MaxPackBlobSize() * shortPackThresholdPercent / 100) //nolint:gomnd
findContentInShortPacks(ctx, rep, ch, threshold, opt)
mp, mperr := rep.ContentReader().ContentFormat().GetMutableParameters()
if mperr == nil {
threshold := int64(mp.MaxPackSize * shortPackThresholdPercent / 100) //nolint:gomnd
findContentInShortPacks(ctx, rep, ch, threshold, opt)
}
}
// add all blocks with given format version

View File

@@ -246,11 +246,16 @@ func Run(ctx context.Context, runParams RunParameters, safety SafetyParameters)
}
func runQuickMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
if _, ok := runParams.rep.ContentManager().EpochManager(); ok {
_, ok, emerr := runParams.rep.ContentManager().EpochManager()
if ok {
log(ctx).Debugf("quick maintenance not required for epoch manager")
return nil
}
if emerr != nil {
return errors.Wrap(emerr, "epoch manager")
}
s, err := GetSchedule(ctx, runParams.rep)
if err != nil {
return errors.Wrap(err, "unable to get schedule")
@@ -321,7 +326,11 @@ func runTaskCleanupLogs(ctx context.Context, runParams RunParameters, s *Schedul
}
func runTaskCleanupEpochManager(ctx context.Context, runParams RunParameters, s *Schedule) error {
em, ok := runParams.rep.ContentManager().EpochManager()
em, ok, emerr := runParams.rep.ContentManager().EpochManager()
if emerr != nil {
return errors.Wrap(emerr, "epoch manager")
}
if !ok {
return nil
}

View File

@@ -62,7 +62,12 @@ func (s *Schedule) ReportRun(taskType TaskType, info RunInfo) {
}
func getAES256GCM(rep repo.DirectRepository) (cipher.AEAD, error) {
c, err := aes.NewCipher(rep.DeriveKey(maintenanceScheduleKeyPurpose, maintenanceScheduleKeySize))
key, err := rep.DeriveKey(maintenanceScheduleKeyPurpose, maintenanceScheduleKeySize)
if err != nil {
return nil, errors.Wrap(err, "derive key")
}
c, err := aes.NewCipher(key)
if err != nil {
return nil, errors.Wrap(err, "unable to create AES-256 cipher")
}

View File

@@ -34,7 +34,7 @@ type contentReader interface {
// contentManager extends contentReader with write operations.
type contentManager interface {
	contentReader

	SupportsContentCompression() (bool, error)
	WriteContent(ctx context.Context, data gather.Bytes, prefix content.IDPrefix, comp compression.HeaderID) (content.ID, error)
}

View File

@@ -80,8 +80,8 @@ func (f *fakeContentManager) WriteContent(ctx context.Context, data gather.Bytes
return contentID, nil
}
func (f *fakeContentManager) SupportsContentCompression() bool {
return f.supportsContentCompression
func (f *fakeContentManager) SupportsContentCompression() (bool, error) {
return f.supportsContentCompression, nil
}
func (f *fakeContentManager) ContentInfo(ctx context.Context, contentID content.ID) (content.Info, error) {

View File

@@ -188,8 +188,13 @@ func (w *objectWriter) prepareAndWriteContentChunk(chunkID int, data gather.Byte
comp := content.NoCompression
objectComp := w.compressor
scc, err := w.om.contentMgr.SupportsContentCompression()
if err != nil {
return errors.Wrap(err, "supports content compression")
}
// do not compress in this layer, instead pass comp to the content manager.
if w.om.contentMgr.SupportsContentCompression() && w.compressor != nil {
if scc && w.compressor != nil {
comp = w.compressor.HeaderID()
objectComp = nil
}

View File

@@ -63,7 +63,7 @@ type DirectRepository interface {
AlsoLogToContentLog(ctx context.Context) context.Context
UniqueID() []byte
ConfigFilename() string
DeriveKey(purpose []byte, keyLength int) []byte
DeriveKey(purpose []byte, keyLength int) ([]byte, error)
Token(password string) (string, error)
Throttler() throttling.SettableThrottler
RequiredFeatures() ([]feature.Required, error)
@@ -111,15 +111,20 @@ type directRepository struct {
}
// DeriveKey derives encryption key of the provided length from the master key.
func (r *directRepository) DeriveKey(purpose []byte, keyLength int) []byte {
if r.cmgr.ContentFormat().SupportsPasswordChange() {
return format.DeriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.uniqueID, purpose, keyLength)
func (r *directRepository) DeriveKey(purpose []byte, keyLength int) ([]byte, error) {
mp, mperr := r.cmgr.ContentFormat().GetMutableParameters()
if mperr != nil {
return nil, errors.Wrap(mperr, "mutable parameters")
}
if mp.Version >= format.FormatVersion2 {
return format.DeriveKeyFromMasterKey(r.cmgr.ContentFormat().GetMasterKey(), r.uniqueID, purpose, keyLength), nil
}
// version of kopia <v0.9 had a bug where certain keys were derived directly from
// the password and not from the random master key. This made it impossible to change
// password.
return format.DeriveKeyFromMasterKey(r.formatEncryptionKey, r.uniqueID, purpose, keyLength)
return format.DeriveKeyFromMasterKey(r.formatEncryptionKey, r.uniqueID, purpose, keyLength), nil
}
// ClientOptions returns client options.

View File

@@ -190,8 +190,10 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
env.MustReopen(t, func(opts *repo.Options) {
opts.UpgradeOwnerID = "another-upgrade-owner"
})
require.Equal(t, format.FormatVersion3,
env.RepositoryWriter.ContentManager().ContentFormat().FormatVersion())
mp, mperr := env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters()
require.NoError(t, mperr)
require.Equal(t, format.FormatVersion3, mp.Version)
require.NoError(t, env.RepositoryWriter.RollbackUpgrade(ctx))
@@ -210,8 +212,10 @@ func TestFormatUpgradeMultipleLocksRollback(t *testing.T) {
require.EqualError(t, env.RepositoryWriter.CommitUpgrade(ctx), "no upgrade in progress")
// verify that we are back to the original version where we started from
require.Equal(t, format.FormatVersion1,
env.RepositoryWriter.ContentManager().ContentFormat().FormatVersion())
mp, err = env.RepositoryWriter.ContentManager().ContentFormat().GetMutableParameters()
require.NoError(t, err)
require.Equal(t, format.FormatVersion1, mp.Version)
}
func TestFormatUpgradeFailureToBackupFormatBlobOnLock(t *testing.T) {