feat(repository): Implement retention time extension on S3 buckets using Object Locks (#2179)

* Implement ability to extend retention time on S3 buckets using Object Locks
    * Move object-lock extension to maintenance.Params.
    * Use a default function for unsupported extensions instead of duplicating code
    * Fix potential lockup during object-lock extension
    * Fix race condition.  Add more code coverage
    * rebase to V3
* Add checks to prevent user from setting Retention Period  < Full Maintenance Interval

---------

Co-authored-by: Ashlie Martinez <ashmrtnz@alcion.ai>
This commit is contained in:
PhracturedBlue
2023-07-03 16:20:02 -07:00
committed by GitHub
parent a368a41973
commit 42aad38540
30 changed files with 768 additions and 43 deletions

View File

@@ -66,6 +66,12 @@ func (c *commandMaintenanceInfo) run(ctx context.Context, rep repo.DirectReposit
c.out.printStdout(" max age of logs: %v\n", cl.MaxAge)
c.out.printStdout(" max total size: %v\n", units.BytesString(cl.MaxTotalSize))
if p.ExtendObjectLocks {
c.out.printStdout("Object Lock Extension: enabled\n")
} else {
c.out.printStdout("Object Lock Extension: disabled\n")
}
c.out.printStdout("Recent Maintenance Runs:\n")
for run, timings := range s.Runs {

View File

@@ -23,6 +23,8 @@ type commandMaintenanceSet struct {
maxRetainedLogCount int
maxRetainedLogAge time.Duration
maxTotalRetainedLogSizeMB int64
extendObjectLocks []bool // optional boolean
}
func (c *commandMaintenanceSet) setup(svc appServices, parent commandParent) {
@@ -51,6 +53,7 @@ func (c *commandMaintenanceSet) setup(svc appServices, parent commandParent) {
cmd.Flag("max-retained-log-count", "Set maximum number of log sessions to retain").IntVar(&c.maxRetainedLogCount)
cmd.Flag("max-retained-log-age", "Set maximum age of log sessions to retain").DurationVar(&c.maxRetainedLogAge)
cmd.Flag("max-retained-log-size-mb", "Set maximum total size of log sessions").Int64Var(&c.maxTotalRetainedLogSizeMB)
cmd.Flag("extend-object-locks", "Extend retention period of locked objects as part of full maintenance.").BoolListVar(&c.extendObjectLocks)
cmd.Action(svc.directRepositoryWriteAction(c.run))
}
@@ -121,6 +124,22 @@ func (c *commandMaintenanceSet) setMaintenanceEnabledAndIntervalFromFlags(ctx co
}
}
func (c *commandMaintenanceSet) setMaintenanceObjectLockExtendFromFlags(ctx context.Context, p *maintenance.Params, changed *bool) {
// we use lists to distinguish between flag not set
// Zero elements == not set, more than zero - flag set, in which case we pick the last value
if len(c.extendObjectLocks) > 0 {
lastVal := c.extendObjectLocks[len(c.extendObjectLocks)-1]
p.ExtendObjectLocks = lastVal
*changed = true
if lastVal {
log(ctx).Info("Object Lock extension maintenance enabled.")
} else {
log(ctx).Info("Object Lock extension maintenance disabled.")
}
}
}
func (c *commandMaintenanceSet) run(ctx context.Context, rep repo.DirectRepositoryWriter) error {
p, err := maintenance.GetParams(ctx, rep)
if err != nil {
@@ -138,6 +157,7 @@ func (c *commandMaintenanceSet) run(ctx context.Context, rep repo.DirectReposito
c.setMaintenanceEnabledAndIntervalFromFlags(ctx, &p.QuickCycle, "quick", c.maintenanceSetEnableQuick, c.maintenanceSetQuickFrequency, &changedParams)
c.setMaintenanceEnabledAndIntervalFromFlags(ctx, &p.FullCycle, "full", c.maintenanceSetEnableFull, c.maintenanceSetFullFrequency, &changedParams)
c.setLogCleanupParametersFromFlags(ctx, p, &changedParams)
c.setMaintenanceObjectLockExtendFromFlags(ctx, p, &changedParams)
if pauseDuration := c.maintenanceSetPauseQuick; pauseDuration != -1 {
s.NextQuickMaintenanceTime = rep.Time().Add(pauseDuration)
@@ -157,6 +177,15 @@ func (c *commandMaintenanceSet) run(ctx context.Context, rep repo.DirectReposito
return errors.Errorf("no changes specified")
}
blobCfg, err := rep.FormatManager().BlobCfgBlob()
if err != nil {
return errors.Wrap(err, "blob configuration")
}
if err = maintenance.CheckExtendRetention(ctx, blobCfg, p); err != nil {
return errors.Wrap(err, "unable to apply maintenance changes")
}
if changedSchedule {
if err := maintenance.SetSchedule(ctx, rep, s); err != nil {
return errors.Wrap(err, "unable to set schedule")

View File

@@ -0,0 +1,83 @@
package cli_test
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/cli"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/tests/testenv"
)
func TestMaintenanceSetExtendObjectLocks(t *testing.T) {
t.Parallel()
e := testenv.NewCLITest(t, testenv.RepoFormatNotImportant, testenv.NewInProcRunner(t))
defer e.RunAndExpectSuccess(t, "repo", "disconnect")
var mi cli.MaintenanceInfo
e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir)
testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "maintenance", "info", "--json"), &mi)
require.False(t, mi.ExtendObjectLocks, "ExtendOjectLocks should not default to enabled.")
e.RunAndExpectSuccess(t, "maintenance", "set", "--extend-object-locks", "true")
testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "maintenance", "info", "--json"), &mi)
require.True(t, mi.ExtendObjectLocks, "ExtendOjectLocks should be enabled.")
e.RunAndExpectSuccess(t, "maintenance", "set", "--extend-object-locks", "false")
testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "maintenance", "info", "--json"), &mi)
require.False(t, mi.ExtendObjectLocks, "ExtendOjectLocks should be disabled.")
}
func (s *formatSpecificTestSuite) TestInvalidExtendRetainOptions(t *testing.T) {
var mi cli.MaintenanceInfo
var rs cli.RepositoryStatus
e := s.setupInMemoryRepo(t)
// set retention
e.RunAndExpectSuccess(t, "repository", "set-parameters", "--retention-mode", blob.Compliance.String(),
"--retention-period", "48h")
e.RunAndExpectSuccess(t, "maintenance", "set", "--full-interval", "24h01m")
// Cannot enable extend object locks when retention_period-full_maintenance_interval < 24h
e.RunAndExpectFailure(t, "maintenance", "set", "--extend-object-locks", "true")
testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "maintenance", "info", "--json"), &mi)
require.False(t, mi.ExtendObjectLocks, "ExtendOjectLocks should be disabled.")
// Enable extend object locks when retention_period-full_maintenance_interval > 24h
e.RunAndExpectSuccess(t, "maintenance", "set", "--full-interval", "23h59m")
e.RunAndExpectSuccess(t, "maintenance", "set", "--extend-object-locks", "true")
// Cannot change full_maintenance_interval when retention_period-full_maintenance_interval < 24h
e.RunAndExpectFailure(t, "maintenance", "set", "--full-interval", "24h01m")
testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "maintenance", "info", "--json"), &mi)
require.True(t, mi.ExtendObjectLocks, "ExtendOjectLocks should be enabled.")
require.True(t, mi.FullCycle.Interval == 86340000000000, "maintenance-interval should be unchanged.")
// Cannot change retention_period when retention_period-full_maintenance_interval < 24h
e.RunAndExpectFailure(t, "repository", "set-parameters", "--retention-period", "47h")
testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "repo", "status", "--json"), &rs)
require.True(t, rs.BlobRetention.RetentionPeriod == 172800000000000, "retention-interval should be unchanged.")
// Can change retention_period when retention_period-full_maintenance_interval > 24h
e.RunAndExpectSuccess(t, "repository", "set-parameters", "--retention-period", "49h")
testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "repo", "status", "--json"), &rs)
require.True(t, rs.BlobRetention.RetentionPeriod == 176400000000000, "retention-interval should be unchanged.")
}

View File

@@ -12,6 +12,7 @@
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/maintenance"
)
type commandRepositorySetParameters struct {
@@ -230,6 +231,17 @@ func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.Direc
return errors.Errorf("no changes")
}
if blobcfg.IsRetentionEnabled() {
p, err := maintenance.GetParams(ctx, rep)
if err != nil {
return errors.Wrap(err, "unable to get current maintenance parameters")
}
if err := maintenance.CheckExtendRetention(ctx, blobcfg, p); err != nil {
return errors.Wrap(err, "unable to apply maintenance changes")
}
}
if err := updateRepositoryParameters(ctx, upgradeToEpochManager, mp, rep, blobcfg, requiredFeatures); err != nil {
return errors.Wrap(err, "error updating repository parameters")
}

View File

@@ -285,6 +285,10 @@ func (s *eventuallyConsistentStorage) FlushCaches(ctx context.Context) error {
return s.realStorage.FlushCaches(ctx)
}
func (s *eventuallyConsistentStorage) ExtendBlobRetention(ctx context.Context, b blob.ID, opts blob.ExtendOptions) error {
return s.realStorage.ExtendBlobRetention(ctx, b, opts)
}
// NewEventuallyConsistentStorage returns an eventually-consistent storage wrapper on top
// of provided storage.
func NewEventuallyConsistentStorage(st blob.Storage, listSettleTime time.Duration, timeNow func() time.Time) blob.Storage {

View File

@@ -123,4 +123,9 @@ func (s *FaultyStorage) FlushCaches(ctx context.Context) error {
return s.base.FlushCaches(ctx)
}
// ExtendBlobRetention implements blob.Storage.
func (s *FaultyStorage) ExtendBlobRetention(ctx context.Context, b blob.ID, opts blob.ExtendOptions) error {
return s.base.ExtendBlobRetention(ctx, b, opts)
}
var _ blob.Storage = (*FaultyStorage)(nil)

View File

@@ -18,6 +18,7 @@
type DataMap map[blob.ID][]byte
type mapStorage struct {
blob.UnsupportedBlobRetention
// +checklocks:mutex
data DataMap
// +checklocks:mutex

View File

@@ -187,6 +187,23 @@ func (s *objectLockingMap) DeleteBlob(ctx context.Context, id blob.ID) error {
return nil
}
// ExtendBlobRetention will alter the retention time on a blob if it exists.
func (s *objectLockingMap) ExtendBlobRetention(ctx context.Context, id blob.ID, opts blob.ExtendOptions) error {
s.mutex.Lock()
defer s.mutex.Unlock()
e, err := s.getLatestByID(id)
if err != nil {
return blob.ErrBlobNotFound
}
if !e.retentionTime.IsZero() {
e.retentionTime = e.mtime.Add(opts.RetentionPeriod)
}
return nil
}
// ListBlobs will return the list of all the objects except the ones which have
// a delete-marker as their latest version.
func (s *objectLockingMap) ListBlobs(ctx context.Context, prefix blob.ID, callback func(blob.Metadata) error) error {

View File

@@ -128,6 +128,18 @@ func VerifyStorage(ctx context.Context, t *testing.T, r blob.Storage, opts blob.
}
})
t.Run("ExtendBlobRetention", func(t *testing.T) {
err := r.ExtendBlobRetention(ctx, blocks[0].blk, blob.ExtendOptions{
RetentionMode: opts.RetentionMode,
RetentionPeriod: opts.RetentionPeriod,
})
if opts.RetentionMode != "" && err != nil {
t.Fatalf("No error expected during extend retention: %v", err)
} else if opts.RetentionMode == "" && err == nil {
t.Fatal("No error found when expected during extend retention")
}
})
t.Run("DeleteBlobsAndList", func(t *testing.T) {
require.NoError(t, r.DeleteBlob(ctx, blocks[0].blk))
require.NoError(t, r.DeleteBlob(ctx, blocks[0].blk))

View File

@@ -27,6 +27,7 @@
type azStorage struct {
Options
blob.UnsupportedBlobRetention
service *azblob.Client
container string

View File

@@ -26,6 +26,7 @@
type b2Storage struct {
Options
blob.UnsupportedBlobRetention
cli *backblaze.B2
bucket *backblaze.Bucket

View File

@@ -34,6 +34,7 @@
type fsStorage struct {
sharded.Storage
blob.UnsupportedBlobRetention
}
type fsImpl struct {
@@ -365,7 +366,7 @@ func New(ctx context.Context, opts *Options, isCreate bool) (blob.Storage, error
}
return &fsStorage{
sharded.New(&fsImpl{*opts, osi}, opts.Path, opts.Options, isCreate),
Storage: sharded.New(&fsImpl{*opts, osi}, opts.Path, opts.Options, isCreate),
}, nil
}

View File

@@ -32,6 +32,7 @@
type gcsStorage struct {
Options
blob.UnsupportedBlobRetention
storageClient *gcsclient.Client
bucket *gcsclient.BucketHandle

View File

@@ -45,6 +45,7 @@
type gdriveStorage struct {
Options
blob.UnsupportedBlobRetention
client *drive.FilesService
about *drive.AboutService

View File

@@ -211,6 +211,26 @@ func (s *loggingStorage) FlushCaches(ctx context.Context) error {
return err
}
func (s *loggingStorage) ExtendBlobRetention(ctx context.Context, b blob.ID, opts blob.ExtendOptions) error {
ctx, span := tracer.Start(ctx, "ExtendBlobRetention")
defer span.End()
s.beginConcurrency()
defer s.endConcurrency()
timer := timetrack.StartTimer()
err := s.base.ExtendBlobRetention(ctx, b, opts)
dt := timer.Elapsed()
s.logger.Debugw(s.prefix+"ExtendBlobRetention",
"blobID", b,
"error", err,
"duration", dt,
)
//nolint:wrapcheck
return err
}
func (s *loggingStorage) translateError(err error) interface{} {
if err == nil {
return nil

View File

@@ -15,6 +15,7 @@
// readonlyStorage prevents all mutations on the underlying storage.
type readonlyStorage struct {
base blob.Storage
blob.UnsupportedBlobRetention
}
func (s readonlyStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) {

View File

@@ -231,6 +231,25 @@ func (s *s3Storage) DeleteBlob(ctx context.Context, b blob.ID) error {
return err
}
func (s *s3Storage) ExtendBlobRetention(ctx context.Context, b blob.ID, opts blob.ExtendOptions) error {
retentionMode := minio.RetentionMode(opts.RetentionMode)
if !retentionMode.IsValid() {
return errors.Errorf("invalid retention mode: %q", opts.RetentionMode)
}
retainUntilDate := clock.Now().Add(opts.RetentionPeriod).UTC()
err := s.cli.PutObjectRetention(ctx, s.BucketName, s.getObjectNameString(b), minio.PutObjectRetentionOptions{
RetainUntilDate: &retainUntilDate,
Mode: &retentionMode,
})
if err != nil {
return errors.Wrap(err, "unable to extend retention period")
}
return nil
}
func (s *s3Storage) getObjectNameString(b blob.ID) string {
return s.Prefix + string(b)
}

View File

@@ -39,6 +39,7 @@
// sftpStorage implements blob.Storage on top of sftp.
type sftpStorage struct {
sharded.Storage
blob.UnsupportedBlobRetention
}
type sftpImpl struct {
@@ -540,7 +541,7 @@ func New(ctx context.Context, opts *Options, isCreate bool) (blob.Storage, error
}
r := &sftpStorage{
sharded.New(impl, opts.Path, opts.Options, isCreate),
Storage: sharded.New(impl, opts.Path, opts.Options, isCreate),
}
impl.rec = connection.NewReconnector(impl)

View File

@@ -40,6 +40,10 @@
// implementation that does not support the intended functionality.
var ErrNotAVolume = errors.New("unsupported method, storage is not a volume")
// ErrUnsupportedObjectLock is returned when attempting to use an Object Lock specific
// function on a storage implementation that does not have the intended functionality.
var ErrUnsupportedObjectLock = errors.New("object locking unsupported")
// Bytes encapsulates a sequence of bytes, possibly stored in a non-contiguous buffers,
// which can be written sequentially or treated as a io.Reader.
type Bytes interface {
@@ -128,6 +132,20 @@ type PutOptions struct {
GetModTime *time.Time // if != nil, populate the value pointed at with the actual modification time
}
// ExtendOptions represents retention options for extending object locks.
type ExtendOptions struct {
RetentionMode RetentionMode
RetentionPeriod time.Duration
}
// UnsupportedBlobRetention provides a default implementation for ExtendBlobRetention.
type UnsupportedBlobRetention struct{}
// ExtendBlobRetention provides a common implementation for unsupported blob retention storage.
func (s *UnsupportedBlobRetention) ExtendBlobRetention(context.Context, ID, ExtendOptions) error {
return ErrUnsupportedObjectLock
}
// HasRetentionOptions returns true when blob-retention settings have been
// specified, otherwise returns false.
func (o PutOptions) HasRetentionOptions() bool {
@@ -161,6 +179,9 @@ type Storage interface {
// FlushCaches flushes any local caches associated with storage.
FlushCaches(ctx context.Context) error
// ExtendBlobRetention extends the retention time of a blob (when blob retention is enabled)
ExtendBlobRetention(ctx context.Context, blobID ID, opts ExtendOptions) error
}
// ID is a string that represents blob identifier.

View File

@@ -0,0 +1,138 @@
package blob_test
import (
"io"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/repotesting"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/object"
)
var testHMACSecret = []byte{1, 2, 3}
var testMasterKey = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) {
// set up fake clock which is initially synchronized to wall clock time
// and moved at the same speed but which can be moved forward.
ta := faketime.NewClockTimeWithOffset(0)
ctx, env := repotesting.NewEnvironment(t, s.formatVersion, repotesting.Options{
OpenOptions: func(o *repo.Options) {
o.TimeNowFunc = ta.NowFunc()
},
NewRepositoryOptions: func(nro *repo.NewRepositoryOptions) {
nro.BlockFormat.Encryption = encryption.DefaultAlgorithm
nro.BlockFormat.MasterKey = testMasterKey
nro.BlockFormat.Hash = "HMAC-SHA256"
nro.BlockFormat.HMACSecret = testHMACSecret
nro.RetentionMode = blob.Governance
nro.RetentionPeriod = time.Hour * 24
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
env.RepositoryWriter.Flush(ctx)
blobsBefore, err := blob.ListAllBlobs(ctx, env.RepositoryWriter.BlobStorage(), "")
if err != nil {
t.Fatal(err)
}
if got, want := len(blobsBefore), 4; got != want {
t.Fatalf("unexpected number of blobs after writing: %v", blobsBefore)
}
lastBlobIdx := len(blobsBefore) - 1
st := env.RootStorage().(cache.Storage)
// Verify that file is locked
_, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour)
assert.EqualErrorf(t, err, "cannot alter object before retention period expires", "Altering locked object should fail")
ta.Advance(7 * 24 * time.Hour)
// Verify that file is unlocked
_, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour)
if err != nil {
t.Fatalf("Altering expired object failed")
}
// Relock blob
err = env.RepositoryWriter.BlobStorage().ExtendBlobRetention(ctx, blobsBefore[lastBlobIdx].BlobID, blob.ExtendOptions{
RetentionMode: blob.Governance,
RetentionPeriod: 2 * time.Hour,
})
if err != nil {
t.Fatalf("Extending Retention time failed, got err: %v", err)
}
// Verify Lock period
ta.Advance(1 * time.Hour)
_, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour)
assert.EqualErrorf(t, err, "cannot alter object before retention period expires", "Altering locked object should fail")
ta.Advance(2 * time.Hour)
_, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour)
if err != nil {
t.Fatalf("Altering expired object failed")
}
}
func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing.T) {
// set up fake clock which is initially synchronized to wall clock time
// and moved at the same speed but which can be moved forward.
ta := faketime.NewClockTimeWithOffset(0)
ctx, env := repotesting.NewEnvironment(t, s.formatVersion, repotesting.Options{
OpenOptions: func(o *repo.Options) {
o.TimeNowFunc = ta.NowFunc()
},
NewRepositoryOptions: func(nro *repo.NewRepositoryOptions) {
nro.BlockFormat.Encryption = encryption.DefaultAlgorithm
nro.BlockFormat.MasterKey = testMasterKey
nro.BlockFormat.Hash = "HMAC-SHA256"
nro.BlockFormat.HMACSecret = testHMACSecret
// Ensure retention is disabled to trigger ExtendBlobRetention unsupported
nro.RetentionPeriod = 0
nro.RetentionMode = ""
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
env.RepositoryWriter.Flush(ctx)
blobsBefore, err := blob.ListAllBlobs(ctx, env.RepositoryWriter.BlobStorage(), "")
if err != nil {
t.Fatal(err)
}
if got, want := len(blobsBefore), 4; got != want {
t.Fatalf("unexpected number of blobs after writing: %v", blobsBefore)
}
lastBlobIdx := len(blobsBefore) - 1
// Extend retention time
err = env.RepositoryWriter.BlobStorage().ExtendBlobRetention(ctx, blobsBefore[lastBlobIdx].BlobID, blob.ExtendOptions{
RetentionMode: blob.Governance,
RetentionPeriod: 2 * time.Hour,
})
assert.EqualErrorf(t, err, "object locking unsupported", "Storage should not support ExtendBlobRetention")
}

View File

@@ -18,24 +18,26 @@ type blobMetrics struct {
uploadedBytes *metrics.Counter
listBlobItems *metrics.Counter
getBlobPartialDuration *metrics.Distribution[time.Duration]
getBlobFullDuration *metrics.Distribution[time.Duration]
putBlobDuration *metrics.Distribution[time.Duration]
getCapacityDuration *metrics.Distribution[time.Duration]
getMetadataDuration *metrics.Distribution[time.Duration]
deleteBlobDuration *metrics.Distribution[time.Duration]
listBlobsDuration *metrics.Distribution[time.Duration]
closeDuration *metrics.Distribution[time.Duration]
flushCachesDuration *metrics.Distribution[time.Duration]
getBlobPartialDuration *metrics.Distribution[time.Duration]
getBlobFullDuration *metrics.Distribution[time.Duration]
putBlobDuration *metrics.Distribution[time.Duration]
getCapacityDuration *metrics.Distribution[time.Duration]
getMetadataDuration *metrics.Distribution[time.Duration]
deleteBlobDuration *metrics.Distribution[time.Duration]
extendBlobRetentionDuration *metrics.Distribution[time.Duration]
listBlobsDuration *metrics.Distribution[time.Duration]
closeDuration *metrics.Distribution[time.Duration]
flushCachesDuration *metrics.Distribution[time.Duration]
getBlobErrors *metrics.Counter
getCapacityErrors *metrics.Counter
getMetadataErrors *metrics.Counter
putBlobErrors *metrics.Counter
deleteBlobErrors *metrics.Counter
listBlobsErrors *metrics.Counter
closeErrors *metrics.Counter
flushCachesErrors *metrics.Counter
getBlobErrors *metrics.Counter
getCapacityErrors *metrics.Counter
getMetadataErrors *metrics.Counter
putBlobErrors *metrics.Counter
deleteBlobErrors *metrics.Counter
extendBlobRetentionErrors *metrics.Counter
listBlobsErrors *metrics.Counter
closeErrors *metrics.Counter
flushCachesErrors *metrics.Counter
}
func (s *blobMetrics) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error {
@@ -121,6 +123,21 @@ func (s *blobMetrics) DeleteBlob(ctx context.Context, id blob.ID) error {
return err
}
func (s *blobMetrics) ExtendBlobRetention(ctx context.Context, id blob.ID, opts blob.ExtendOptions) error {
timer := timetrack.StartTimer()
err := s.base.ExtendBlobRetention(ctx, id, opts)
dt := timer.Elapsed()
s.extendBlobRetentionDuration.Observe(dt)
if err != nil {
s.extendBlobRetentionErrors.Add(1)
}
//nolint:wrapcheck
return err
}
func (s *blobMetrics) ListBlobs(ctx context.Context, prefix blob.ID, callback func(blob.Metadata) error) error {
timer := timetrack.StartTimer()
cnt := int64(0)

24
repo/blob/suite_test.go Normal file
View File

@@ -0,0 +1,24 @@
package blob_test
import (
"testing"
"github.com/kopia/kopia/internal/testutil"
"github.com/kopia/kopia/repo/format"
)
type formatSpecificTestSuite struct {
formatVersion format.Version
}
func TestFormatV1(t *testing.T) {
testutil.RunAllTestsWithParam(t, &formatSpecificTestSuite{format.FormatVersion1})
}
func TestFormatV2(t *testing.T) {
testutil.RunAllTestsWithParam(t, &formatSpecificTestSuite{format.FormatVersion2})
}
func TestFormatV3(t *testing.T) {
testutil.RunAllTestsWithParam(t, &formatSpecificTestSuite{format.FormatVersion3})
}

View File

@@ -14,11 +14,12 @@
// operations supported.
const (
operationGetBlob = "GetBlob"
operationGetMetadata = "GetMetadata"
operationListBlobs = "ListBlobs"
operationPutBlob = "PutBlob"
operationDeleteBlob = "DeleteBlob"
operationGetBlob = "GetBlob"
operationGetMetadata = "GetMetadata"
operationListBlobs = "ListBlobs"
operationPutBlob = "PutBlob"
operationDeleteBlob = "DeleteBlob"
operationExtendBlobRetention = "ExtendBlobRetention"
)
// Throttler implements throttling policy by blocking before certain operations are
@@ -104,6 +105,13 @@ func (s *throttlingStorage) DeleteBlob(ctx context.Context, id blob.ID) error {
return s.Storage.DeleteBlob(ctx, id) //nolint:wrapcheck
}
func (s *throttlingStorage) ExtendBlobRetention(ctx context.Context, id blob.ID, opts blob.ExtendOptions) error {
s.throttler.BeforeOperation(ctx, operationExtendBlobRetention)
defer s.throttler.AfterOperation(ctx, operationExtendBlobRetention)
return s.Storage.ExtendBlobRetention(ctx, id, opts) //nolint:wrapcheck
}
// NewWrapper returns a Storage wrapper that adds retry loop around all operations of the underlying storage.
func NewWrapper(wrapped blob.Storage, throttler Throttler) blob.Storage {
return &throttlingStorage{wrapped, throttler}

View File

@@ -36,6 +36,7 @@
// may be accessed using WebDAV or File interchangeably.
type davStorage struct {
sharded.Storage
blob.UnsupportedBlobRetention
}
type davStorageImpl struct {
@@ -277,7 +278,7 @@ func New(ctx context.Context, opts *Options, isCreate bool) (blob.Storage, error
}
s := retrying.NewWrapper(&davStorage{
sharded.New(&davStorageImpl{
Storage: sharded.New(&davStorageImpl{
Options: *opts,
cli: cli,
}, "", opts.Options, isCreate),

22
repo/locking_storage.go Normal file
View File

@@ -0,0 +1,22 @@
package repo
import (
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/format"
)
// GetLockingStoragePrefixes Return all prefixes that may be maintained by Object Locking.
func GetLockingStoragePrefixes() []string {
var prefixes []string
// collect prefixes that need to be locked on put
for _, prefix := range content.PackBlobIDPrefixes {
prefixes = append(prefixes, string(prefix))
}
prefixes = append(prefixes, indexblob.V0IndexBlobPrefix, epoch.EpochManagerIndexUberPrefix, format.KopiaRepositoryBlobID,
format.KopiaBlobCfgBlobID)
return prefixes
}

View File

@@ -0,0 +1,140 @@
package maintenance
import (
"context"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/format"
)
const parallelBlobRetainCPUMultiplier = 2
const minRetentionMaintenanceDiff = time.Duration(24) * time.Hour
// ExtendBlobRetentionTimeOptions provides options for extending blob retention algorithm.
type ExtendBlobRetentionTimeOptions struct {
Parallel int
DryRun bool
}
// ExtendBlobRetentionTime extends the retention time of all relevant blobs managed by storage engine with Object Locking enabled.
func ExtendBlobRetentionTime(ctx context.Context, rep repo.DirectRepositoryWriter, opt ExtendBlobRetentionTimeOptions) (int, error) {
const extendQueueSize = 100
var (
wg sync.WaitGroup
prefixes []blob.ID
cnt = new(uint32)
toExtend = new(uint32)
failedCnt = new(uint32)
)
if opt.Parallel == 0 {
opt.Parallel = runtime.NumCPU() * parallelBlobRetainCPUMultiplier
}
blobCfg, err := rep.FormatManager().BlobCfgBlob()
if err != nil {
return 0, errors.Wrap(err, "blob configuration")
}
if !blobCfg.IsRetentionEnabled() {
// Blob retention is disabled
log(ctx).Info("Object lock retention is disabled.")
return 0, nil
}
extend := make(chan blob.Metadata, extendQueueSize)
extendOpts := blob.ExtendOptions{
RetentionMode: blobCfg.RetentionMode,
RetentionPeriod: blobCfg.RetentionPeriod,
}
if !opt.DryRun {
// start goroutines to extend blob retention as they come.
for i := 0; i < opt.Parallel; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for bm := range extend {
if err1 := rep.BlobStorage().ExtendBlobRetention(ctx, bm.BlobID, extendOpts); err1 != nil {
log(ctx).Errorf("Failed to extend blob %v: %v", bm.BlobID, err1)
atomic.AddUint32(failedCnt, 1)
continue
}
curCnt := atomic.AddUint32(cnt, 1)
if curCnt%100 == 0 {
log(ctx).Infof(" extended %v blobs", curCnt)
}
}
}()
}
}
// Convert prefixes from string to BlobID.
for _, pfx := range repo.GetLockingStoragePrefixes() {
prefixes = append(prefixes, blob.ID(pfx))
}
// iterate all relevant (active, extendable) blobs and count them + optionally send to the channel to be extended
log(ctx).Infof("Extending retention time for blobs...")
err = blob.IterateAllPrefixesInParallel(ctx, opt.Parallel, rep.BlobStorage(), prefixes, func(bm blob.Metadata) error {
if !opt.DryRun {
extend <- bm
}
atomic.AddUint32(toExtend, 1)
return nil
})
close(extend)
log(ctx).Infof("Found %v blobs to extend", *toExtend)
// wait for all extend workers to finish.
wg.Wait()
if *failedCnt > 0 {
return 0, errors.Errorf("Failed to extend %v blobs", *failedCnt)
}
if err != nil {
return 0, errors.Wrap(err, "error iterating packs")
}
if opt.DryRun {
return int(*toExtend), nil
}
log(ctx).Infof("Extended total %v blobs", *cnt)
return int(*cnt), nil
}
// CheckExtendRetention verifies if extension can be enabled due to maintenance and blob parameters.
func CheckExtendRetention(ctx context.Context, blobCfg format.BlobStorageConfiguration, p *Params) error {
if !p.ExtendObjectLocks {
return nil
}
if !p.FullCycle.Enabled {
log(ctx).Warn("Object Lock extension will not function because Full-Maintenance is disabled")
}
if blobCfg.RetentionPeriod > 0 && blobCfg.RetentionPeriod-p.FullCycle.Interval < minRetentionMaintenanceDiff {
return errors.Errorf("The repo RetentionPeriod must be %v greater than the Full Maintenance interval %v %v", minRetentionMaintenanceDiff, blobCfg.RetentionPeriod, p.FullCycle.Interval)
}
return nil
}

View File

@@ -0,0 +1,123 @@
package maintenance_test
import (
"io"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/faketime"
"github.com/kopia/kopia/internal/repotesting"
"github.com/kopia/kopia/repo"
"github.com/kopia/kopia/repo/blob"
"github.com/kopia/kopia/repo/encryption"
"github.com/kopia/kopia/repo/maintenance"
"github.com/kopia/kopia/repo/object"
)
const blockFormatHash = "HMAC-SHA256"
func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) {
// set up fake clock which is initially synchronized to wall clock time
// and moved at the same speed but which can be moved forward.
ta := faketime.NewClockTimeWithOffset(0)
ctx, env := repotesting.NewEnvironment(t, s.formatVersion, repotesting.Options{
OpenOptions: func(o *repo.Options) {
o.TimeNowFunc = ta.NowFunc()
},
NewRepositoryOptions: func(nro *repo.NewRepositoryOptions) {
nro.BlockFormat.Encryption = encryption.DefaultAlgorithm
nro.BlockFormat.MasterKey = testMasterKey
nro.BlockFormat.Hash = blockFormatHash
nro.BlockFormat.HMACSecret = testHMACSecret
nro.RetentionMode = blob.Governance
nro.RetentionPeriod = time.Hour * 24
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
env.RepositoryWriter.Flush(ctx)
blobsBefore, err := blob.ListAllBlobs(ctx, env.RepositoryWriter.BlobStorage(), "")
if err != nil {
t.Fatal(err)
}
if got, want := len(blobsBefore), 4; got != want {
t.Fatalf("unexpected number of blobs after writing: %v", blobsBefore)
}
lastBlobIdx := len(blobsBefore) - 1
st := env.RootStorage().(cache.Storage)
ta.Advance(7 * 24 * time.Hour)
if _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour); err != nil {
t.Fatalf("Altering expired object failed")
}
// extend retention time of all blobs
if _, err = maintenance.ExtendBlobRetentionTime(ctx, env.RepositoryWriter, maintenance.ExtendBlobRetentionTimeOptions{}); err != nil {
t.Fatal(err)
}
_, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour)
assert.EqualErrorf(t, err, "cannot alter object before retention period expires", "Altering locked object should fail")
}
func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing.T) {
// set up fake clock which is initially synchronized to wall clock time
// and moved at the same speed but which can be moved forward.
ta := faketime.NewClockTimeWithOffset(0)
ctx, env := repotesting.NewEnvironment(t, s.formatVersion, repotesting.Options{
OpenOptions: func(o *repo.Options) {
o.TimeNowFunc = ta.NowFunc()
},
NewRepositoryOptions: func(nro *repo.NewRepositoryOptions) {
nro.BlockFormat.Encryption = encryption.DefaultAlgorithm
nro.BlockFormat.MasterKey = testMasterKey
nro.BlockFormat.Hash = blockFormatHash
nro.BlockFormat.HMACSecret = testHMACSecret
},
})
w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{})
io.WriteString(w, "hello world!")
w.Result()
w.Close()
env.RepositoryWriter.Flush(ctx)
blobsBefore, err := blob.ListAllBlobs(ctx, env.RepositoryWriter.BlobStorage(), "")
if err != nil {
t.Fatal(err)
}
if got, want := len(blobsBefore), 4; got != want {
t.Fatalf("unexpected number of blobs after writing: %v", blobsBefore)
}
lastBlobIdx := len(blobsBefore) - 1
st := env.RootStorage().(cache.Storage)
ta.Advance(7 * 24 * time.Hour)
if _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour); err != nil {
t.Fatalf("Altering expired object failed")
}
// extend retention time of all blobs
if _, err = maintenance.ExtendBlobRetentionTime(ctx, env.RepositoryWriter, maintenance.ExtendBlobRetentionTimeOptions{}); err != nil {
t.Fatal(err)
}
if _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour); err != nil {
t.Fatalf("Altering expired object failed")
}
}

View File

@@ -23,6 +23,8 @@ type Params struct {
FullCycle CycleParams `json:"full"`
LogRetention LogRetentionOptions `json:"logRetention"`
ExtendObjectLocks bool `json:"extendObjectLocks"`
}
func (p *Params) isOwnedByByThisUser(rep repo.Repository) bool {
@@ -41,6 +43,11 @@ func DefaultParams() Params {
Interval: 1 * time.Hour,
},
LogRetention: defaultLogRetention(),
// Don't attempt to extend object locks by default. This option may not be
// supported by all storage providers or blob implementations (currently
// supported by S3 backend) and may cause data to be kept longer than
// desired if the retention period is relatively long.
ExtendObjectLocks: false,
}
}

View File

@@ -36,15 +36,16 @@
// Task IDs.
const (
TaskSnapshotGarbageCollection = "snapshot-gc"
TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs"
TaskDeleteOrphanedBlobsFull = "full-delete-blobs"
TaskRewriteContentsQuick = "quick-rewrite-contents"
TaskRewriteContentsFull = "full-rewrite-contents"
TaskDropDeletedContentsFull = "full-drop-deleted-content"
TaskIndexCompaction = "index-compaction"
TaskCleanupLogs = "cleanup-logs"
TaskCleanupEpochManager = "cleanup-epoch-manager"
TaskSnapshotGarbageCollection = "snapshot-gc"
TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs"
TaskDeleteOrphanedBlobsFull = "full-delete-blobs"
TaskRewriteContentsQuick = "quick-rewrite-contents"
TaskRewriteContentsFull = "full-rewrite-contents"
TaskDropDeletedContentsFull = "full-drop-deleted-content"
TaskIndexCompaction = "index-compaction"
TaskExtendBlobRetentionTimeFull = "extend-blob-retention-time"
TaskCleanupLogs = "cleanup-logs"
TaskCleanupEpochManager = "cleanup-epoch-manager"
)
// shouldRun returns Mode if repository is due for periodic maintenance.
@@ -400,6 +401,13 @@ func runTaskDeleteOrphanedBlobsQuick(ctx context.Context, runParams RunParameter
})
}
func runTaskExtendBlobRetentionTimeFull(ctx context.Context, runParams RunParameters, s *Schedule) error {
return ReportRun(ctx, runParams.rep, TaskExtendBlobRetentionTimeFull, s, func() error {
_, err := ExtendBlobRetentionTime(ctx, runParams.rep, ExtendBlobRetentionTimeOptions{})
return err
})
}
func runFullMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error {
s, err := GetSchedule(ctx, runParams.rep)
if err != nil {
@@ -431,6 +439,15 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf
notDeletingOrphanedBlobs(ctx, s, safety)
}
// extend retention-time on supported storage.
if runParams.Params.ExtendObjectLocks {
if err := runTaskExtendBlobRetentionTimeFull(ctx, runParams, s); err != nil {
return errors.Wrap(err, "error extending object lock retention time")
}
} else {
log(ctx).Debug("Extending object lock retention-period is disabled.")
}
if err := runTaskCleanupLogs(ctx, runParams, s); err != nil {
return errors.Wrap(err, "error cleaning up logs")
}

View File

@@ -14,7 +14,6 @@
"github.com/kopia/kopia/internal/cache"
"github.com/kopia/kopia/internal/cacheprot"
"github.com/kopia/kopia/internal/epoch"
"github.com/kopia/kopia/internal/feature"
"github.com/kopia/kopia/internal/metrics"
"github.com/kopia/kopia/internal/retry"
@@ -25,7 +24,6 @@
"github.com/kopia/kopia/repo/blob/storagemetrics"
"github.com/kopia/kopia/repo/blob/throttling"
"github.com/kopia/kopia/repo/content"
"github.com/kopia/kopia/repo/content/indexblob"
"github.com/kopia/kopia/repo/format"
"github.com/kopia/kopia/repo/logging"
"github.com/kopia/kopia/repo/manifest"
@@ -394,13 +392,7 @@ func handleMissingRequiredFeatures(ctx context.Context, fmgr *format.Manager, ig
func wrapLockingStorage(st blob.Storage, r format.BlobStorageConfiguration) blob.Storage {
// collect prefixes that need to be locked on put
var prefixes []string
for _, prefix := range content.PackBlobIDPrefixes {
prefixes = append(prefixes, string(prefix))
}
prefixes = append(prefixes, indexblob.V0IndexBlobPrefix, epoch.EpochManagerIndexUberPrefix, format.KopiaRepositoryBlobID,
format.KopiaBlobCfgBlobID)
prefixes := GetLockingStoragePrefixes()
return beforeop.NewWrapper(st, nil, nil, nil, func(ctx context.Context, id blob.ID, opts *blob.PutOptions) error {
for _, prefix := range prefixes {