From 42aad38540476558bf4ec36e4a99787bbf2c51b5 Mon Sep 17 00:00:00 2001 From: PhracturedBlue Date: Mon, 3 Jul 2023 16:20:02 -0700 Subject: [PATCH] feat(repository): Implement retention time extension on S3 buckets using Object Locks (#2179) * Implement ability to extend retention time on S3 buckets using Object Locks * Move object-lock extension to maintenance.Params. * Use a default function for unsupported extensions instead of duplicating code * Fix potential lockup during object-lock extension * Fix race condition. Add more code coverage * rebase to V3 * Add checks to prevent user from setting Retention Period < Full Maintenance Interval --------- Co-authored-by: Ashlie Martinez --- cli/command_maintenance_info.go | 6 + cli/command_maintenance_set.go | 29 ++++ cli/command_maintenance_set_test.go | 83 +++++++++++ cli/command_repository_set_parameters.go | 12 ++ internal/blobtesting/eventually_consistent.go | 4 + internal/blobtesting/faulty.go | 5 + internal/blobtesting/map.go | 1 + internal/blobtesting/object_locking_map.go | 17 +++ internal/blobtesting/verify.go | 12 ++ repo/blob/azure/azure_storage.go | 1 + repo/blob/b2/b2_storage.go | 1 + repo/blob/filesystem/filesystem_storage.go | 3 +- repo/blob/gcs/gcs_storage.go | 1 + repo/blob/gdrive/gdrive_storage.go | 1 + repo/blob/logging/logging_storage.go | 20 +++ repo/blob/readonly/readonly_storage.go | 1 + repo/blob/s3/s3_storage.go | 19 +++ repo/blob/sftp/sftp_storage.go | 3 +- repo/blob/storage.go | 21 +++ repo/blob/storage_extend_test.go | 138 +++++++++++++++++ repo/blob/storagemetrics/storage_metrics.go | 51 ++++--- repo/blob/suite_test.go | 24 +++ repo/blob/throttling/throttling_storage.go | 18 ++- repo/blob/webdav/webdav_storage.go | 3 +- repo/locking_storage.go | 22 +++ repo/maintenance/blob_retain.go | 140 ++++++++++++++++++ repo/maintenance/blob_retain_test.go | 123 +++++++++++++++ repo/maintenance/maintenance_params.go | 7 + repo/maintenance/maintenance_run.go | 35 +++-- repo/open.go | 10 +- 30 files changed, 768 insertions(+), 43 deletions(-) create mode 100644 cli/command_maintenance_set_test.go create mode 100644 repo/blob/storage_extend_test.go create mode 100644 repo/blob/suite_test.go create mode 100644 repo/locking_storage.go create mode 100644 repo/maintenance/blob_retain.go create mode 100644 repo/maintenance/blob_retain_test.go diff --git a/cli/command_maintenance_info.go b/cli/command_maintenance_info.go index f5d157c25..60908e2f7 100644 --- a/cli/command_maintenance_info.go +++ b/cli/command_maintenance_info.go @@ -66,6 +66,12 @@ func (c *commandMaintenanceInfo) run(ctx context.Context, rep repo.DirectReposit c.out.printStdout(" max age of logs: %v\n", cl.MaxAge) c.out.printStdout(" max total size: %v\n", units.BytesString(cl.MaxTotalSize)) + if p.ExtendObjectLocks { + c.out.printStdout("Object Lock Extension: enabled\n") + } else { + c.out.printStdout("Object Lock Extension: disabled\n") + } + c.out.printStdout("Recent Maintenance Runs:\n") for run, timings := range s.Runs { diff --git a/cli/command_maintenance_set.go b/cli/command_maintenance_set.go index c08905c1f..d2f5bd9d3 100644 --- a/cli/command_maintenance_set.go +++ b/cli/command_maintenance_set.go @@ -23,6 +23,8 @@ type commandMaintenanceSet struct { maxRetainedLogCount int maxRetainedLogAge time.Duration maxTotalRetainedLogSizeMB int64 + + extendObjectLocks []bool // optional boolean } func (c *commandMaintenanceSet) setup(svc appServices, parent commandParent) { @@ -51,6 +53,7 @@ func (c *commandMaintenanceSet) setup(svc appServices, parent commandParent) { cmd.Flag("max-retained-log-count", "Set maximum number of log sessions to retain").IntVar(&c.maxRetainedLogCount) cmd.Flag("max-retained-log-age", "Set maximum age of log sessions to retain").DurationVar(&c.maxRetainedLogAge) cmd.Flag("max-retained-log-size-mb", "Set maximum total size of log sessions").Int64Var(&c.maxTotalRetainedLogSizeMB) + cmd.Flag("extend-object-locks", "Extend retention period of locked objects as part of full maintenance.").BoolListVar(&c.extendObjectLocks) cmd.Action(svc.directRepositoryWriteAction(c.run)) } @@ -121,6 +124,22 @@ func (c *commandMaintenanceSet) setMaintenanceEnabledAndIntervalFromFlags(ctx co } } +func (c *commandMaintenanceSet) setMaintenanceObjectLockExtendFromFlags(ctx context.Context, p *maintenance.Params, changed *bool) { + // we use lists to distinguish between flag not set + // Zero elements == not set, more than zero - flag set, in which case we pick the last value + if len(c.extendObjectLocks) > 0 { + lastVal := c.extendObjectLocks[len(c.extendObjectLocks)-1] + p.ExtendObjectLocks = lastVal + *changed = true + + if lastVal { + log(ctx).Info("Object Lock extension maintenance enabled.") + } else { + log(ctx).Info("Object Lock extension maintenance disabled.") + } + } +} + func (c *commandMaintenanceSet) run(ctx context.Context, rep repo.DirectRepositoryWriter) error { p, err := maintenance.GetParams(ctx, rep) if err != nil { @@ -138,6 +157,7 @@ func (c *commandMaintenanceSet) run(ctx context.Context, rep repo.DirectReposito c.setMaintenanceEnabledAndIntervalFromFlags(ctx, &p.QuickCycle, "quick", c.maintenanceSetEnableQuick, c.maintenanceSetQuickFrequency, &changedParams) c.setMaintenanceEnabledAndIntervalFromFlags(ctx, &p.FullCycle, "full", c.maintenanceSetEnableFull, c.maintenanceSetFullFrequency, &changedParams) c.setLogCleanupParametersFromFlags(ctx, p, &changedParams) + c.setMaintenanceObjectLockExtendFromFlags(ctx, p, &changedParams) if pauseDuration := c.maintenanceSetPauseQuick; pauseDuration != -1 { s.NextQuickMaintenanceTime = rep.Time().Add(pauseDuration) @@ -157,6 +177,15 @@ func (c *commandMaintenanceSet) run(ctx context.Context, rep repo.DirectReposito return errors.Errorf("no changes specified") } + blobCfg, err := rep.FormatManager().BlobCfgBlob() + if err != nil { + return errors.Wrap(err, "blob configuration") + } + + if err = maintenance.CheckExtendRetention(ctx, blobCfg, p); err != nil { + return errors.Wrap(err, "unable to apply maintenance changes") + } + if changedSchedule { if err := maintenance.SetSchedule(ctx, rep, s); err != nil { return errors.Wrap(err, "unable to set schedule") diff --git a/cli/command_maintenance_set_test.go b/cli/command_maintenance_set_test.go new file mode 100644 index 000000000..41dd22a60 --- /dev/null +++ b/cli/command_maintenance_set_test.go @@ -0,0 +1,83 @@ +package cli_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kopia/kopia/cli" + "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/tests/testenv" +) + +func TestMaintenanceSetExtendObjectLocks(t *testing.T) { + t.Parallel() + + e := testenv.NewCLITest(t, testenv.RepoFormatNotImportant, testenv.NewInProcRunner(t)) + defer e.RunAndExpectSuccess(t, "repo", "disconnect") + + var mi cli.MaintenanceInfo + + e.RunAndExpectSuccess(t, "repo", "create", "filesystem", "--path", e.RepoDir) + + testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "maintenance", "info", "--json"), &mi) + + require.False(t, mi.ExtendObjectLocks, "ExtendOjectLocks should not default to enabled.") + + e.RunAndExpectSuccess(t, "maintenance", "set", "--extend-object-locks", "true") + + testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "maintenance", "info", "--json"), &mi) + + require.True(t, mi.ExtendObjectLocks, "ExtendOjectLocks should be enabled.") + + e.RunAndExpectSuccess(t, "maintenance", "set", "--extend-object-locks", "false") + + testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "maintenance", "info", "--json"), &mi) + + require.False(t, mi.ExtendObjectLocks, "ExtendOjectLocks should be disabled.") +} + +func (s *formatSpecificTestSuite) TestInvalidExtendRetainOptions(t *testing.T) { + var mi cli.MaintenanceInfo + + var rs cli.RepositoryStatus + + e := s.setupInMemoryRepo(t) + + // set retention + e.RunAndExpectSuccess(t, "repository", "set-parameters", "--retention-mode", blob.Compliance.String(), + "--retention-period", "48h") + + e.RunAndExpectSuccess(t, "maintenance", "set", "--full-interval", "24h01m") + + // Cannot enable extend object locks when retention_period-full_maintenance_interval < 24h + e.RunAndExpectFailure(t, "maintenance", "set", "--extend-object-locks", "true") + + testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "maintenance", "info", "--json"), &mi) + + require.False(t, mi.ExtendObjectLocks, "ExtendOjectLocks should be disabled.") + + // Enable extend object locks when retention_period-full_maintenance_interval > 24h + e.RunAndExpectSuccess(t, "maintenance", "set", "--full-interval", "23h59m") + + e.RunAndExpectSuccess(t, "maintenance", "set", "--extend-object-locks", "true") + + // Cannot change full_maintenance_interval when retention_period-full_maintenance_interval < 24h + e.RunAndExpectFailure(t, "maintenance", "set", "--full-interval", "24h01m") + + testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "maintenance", "info", "--json"), &mi) + + require.True(t, mi.ExtendObjectLocks, "ExtendOjectLocks should be enabled.") + require.True(t, mi.FullCycle.Interval == 86340000000000, "maintenance-interval should be unchanged.") + + // Cannot change retention_period when retention_period-full_maintenance_interval < 24h + e.RunAndExpectFailure(t, "repository", "set-parameters", "--retention-period", "47h") + testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "repo", "status", "--json"), &rs) + require.True(t, rs.BlobRetention.RetentionPeriod == 172800000000000, "retention-interval should be unchanged.") + + // Can change retention_period when retention_period-full_maintenance_interval > 24h + e.RunAndExpectSuccess(t, "repository", "set-parameters", "--retention-period", "49h") + testutil.MustParseJSONLines(t, e.RunAndExpectSuccess(t, "repo", "status", "--json"), &rs) + require.True(t, rs.BlobRetention.RetentionPeriod == 176400000000000, "retention-interval should be unchanged.") +} diff --git a/cli/command_repository_set_parameters.go b/cli/command_repository_set_parameters.go index d898a892c..d9b76ea2e 100644 --- a/cli/command_repository_set_parameters.go +++ b/cli/command_repository_set_parameters.go @@ -12,6 +12,7 @@ "github.com/kopia/kopia/repo" "github.com/kopia/kopia/repo/blob" "github.com/kopia/kopia/repo/format" + "github.com/kopia/kopia/repo/maintenance" ) type commandRepositorySetParameters struct { @@ -230,6 +231,17 @@ func (c *commandRepositorySetParameters) run(ctx context.Context, rep repo.Direc return errors.Errorf("no changes") } + if blobcfg.IsRetentionEnabled() { + p, err := maintenance.GetParams(ctx, rep) + if err != nil { + return errors.Wrap(err, "unable to get current maintenance parameters") + } + + if err := maintenance.CheckExtendRetention(ctx, blobcfg, p); err != nil { + return errors.Wrap(err, "unable to apply maintenance changes") + } + } + if err := updateRepositoryParameters(ctx, upgradeToEpochManager, mp, rep, blobcfg, requiredFeatures); err != nil { return errors.Wrap(err, "error updating repository parameters") } diff --git a/internal/blobtesting/eventually_consistent.go b/internal/blobtesting/eventually_consistent.go index db6553c3f..82e82cde2 100644 --- a/internal/blobtesting/eventually_consistent.go +++ b/internal/blobtesting/eventually_consistent.go @@ -285,6 +285,10 @@ func (s *eventuallyConsistentStorage) FlushCaches(ctx context.Context) error { return s.realStorage.FlushCaches(ctx) } +func (s *eventuallyConsistentStorage) ExtendBlobRetention(ctx context.Context, b blob.ID, opts blob.ExtendOptions) error { + return s.realStorage.ExtendBlobRetention(ctx, b, opts) +} + // NewEventuallyConsistentStorage returns an eventually-consistent storage wrapper on top // of provided storage. func NewEventuallyConsistentStorage(st blob.Storage, listSettleTime time.Duration, timeNow func() time.Time) blob.Storage { diff --git a/internal/blobtesting/faulty.go b/internal/blobtesting/faulty.go index 620724e42..f5cc99c05 100644 --- a/internal/blobtesting/faulty.go +++ b/internal/blobtesting/faulty.go @@ -123,4 +123,9 @@ func (s *FaultyStorage) FlushCaches(ctx context.Context) error { return s.base.FlushCaches(ctx) } +// ExtendBlobRetention implements blob.Storage. +func (s *FaultyStorage) ExtendBlobRetention(ctx context.Context, b blob.ID, opts blob.ExtendOptions) error { + return s.base.ExtendBlobRetention(ctx, b, opts) +} + var _ blob.Storage = (*FaultyStorage)(nil) diff --git a/internal/blobtesting/map.go b/internal/blobtesting/map.go index 78a4820e2..810e95a33 100644 --- a/internal/blobtesting/map.go +++ b/internal/blobtesting/map.go @@ -18,6 +18,7 @@ type DataMap map[blob.ID][]byte type mapStorage struct { + blob.UnsupportedBlobRetention // +checklocks:mutex data DataMap // +checklocks:mutex diff --git a/internal/blobtesting/object_locking_map.go b/internal/blobtesting/object_locking_map.go index 5954f6c70..9276d2fb9 100644 --- a/internal/blobtesting/object_locking_map.go +++ b/internal/blobtesting/object_locking_map.go @@ -187,6 +187,23 @@ func (s *objectLockingMap) DeleteBlob(ctx context.Context, id blob.ID) error { return nil } +// ExtendBlobRetention will alter the retention time on a blob if it exists. +func (s *objectLockingMap) ExtendBlobRetention(ctx context.Context, id blob.ID, opts blob.ExtendOptions) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + e, err := s.getLatestByID(id) + if err != nil { + return blob.ErrBlobNotFound + } + + if !e.retentionTime.IsZero() { + e.retentionTime = e.mtime.Add(opts.RetentionPeriod) + } + + return nil +} + // ListBlobs will return the list of all the objects except the ones which have // a delete-marker as their latest version. func (s *objectLockingMap) ListBlobs(ctx context.Context, prefix blob.ID, callback func(blob.Metadata) error) error { diff --git a/internal/blobtesting/verify.go b/internal/blobtesting/verify.go index fba682de5..87d44b560 100644 --- a/internal/blobtesting/verify.go +++ b/internal/blobtesting/verify.go @@ -128,6 +128,18 @@ func VerifyStorage(ctx context.Context, t *testing.T, r blob.Storage, opts blob. } }) + t.Run("ExtendBlobRetention", func(t *testing.T) { + err := r.ExtendBlobRetention(ctx, blocks[0].blk, blob.ExtendOptions{ + RetentionMode: opts.RetentionMode, + RetentionPeriod: opts.RetentionPeriod, + }) + if opts.RetentionMode != "" && err != nil { + t.Fatalf("No error expected during extend retention: %v", err) + } else if opts.RetentionMode == "" && err == nil { + t.Fatal("No error found when expected during extend retention") + } + }) + t.Run("DeleteBlobsAndList", func(t *testing.T) { require.NoError(t, r.DeleteBlob(ctx, blocks[0].blk)) require.NoError(t, r.DeleteBlob(ctx, blocks[0].blk)) diff --git a/repo/blob/azure/azure_storage.go b/repo/blob/azure/azure_storage.go index 14a907ee2..2f55ed60f 100644 --- a/repo/blob/azure/azure_storage.go +++ b/repo/blob/azure/azure_storage.go @@ -27,6 +27,7 @@ type azStorage struct { Options + blob.UnsupportedBlobRetention service *azblob.Client container string diff --git a/repo/blob/b2/b2_storage.go b/repo/blob/b2/b2_storage.go index e2fb64941..fb45e08c8 100644 --- a/repo/blob/b2/b2_storage.go +++ b/repo/blob/b2/b2_storage.go @@ -26,6 +26,7 @@ type b2Storage struct { Options + blob.UnsupportedBlobRetention cli *backblaze.B2 bucket *backblaze.Bucket diff --git a/repo/blob/filesystem/filesystem_storage.go b/repo/blob/filesystem/filesystem_storage.go index 8030b58de..1a9f5e207 100644 --- a/repo/blob/filesystem/filesystem_storage.go +++ b/repo/blob/filesystem/filesystem_storage.go @@ -34,6 +34,7 @@ type fsStorage struct { sharded.Storage + blob.UnsupportedBlobRetention } type fsImpl struct { @@ -365,7 +366,7 @@ func New(ctx context.Context, opts *Options, isCreate bool) (blob.Storage, error } return &fsStorage{ - sharded.New(&fsImpl{*opts, osi}, opts.Path, opts.Options, isCreate), + Storage: sharded.New(&fsImpl{*opts, osi}, opts.Path, opts.Options, isCreate), }, nil } diff --git a/repo/blob/gcs/gcs_storage.go b/repo/blob/gcs/gcs_storage.go index d058a4bc9..bad430c95 100644 --- a/repo/blob/gcs/gcs_storage.go +++ b/repo/blob/gcs/gcs_storage.go @@ -32,6 +32,7 @@ type gcsStorage struct { Options + blob.UnsupportedBlobRetention storageClient *gcsclient.Client bucket *gcsclient.BucketHandle diff --git a/repo/blob/gdrive/gdrive_storage.go b/repo/blob/gdrive/gdrive_storage.go index 2762e6d1d..ef53176c7 100644 --- a/repo/blob/gdrive/gdrive_storage.go +++ b/repo/blob/gdrive/gdrive_storage.go @@ -45,6 +45,7 @@ type gdriveStorage struct { Options + blob.UnsupportedBlobRetention client *drive.FilesService about *drive.AboutService diff --git a/repo/blob/logging/logging_storage.go b/repo/blob/logging/logging_storage.go index 2f8817e4b..e4180e2f4 100644 --- a/repo/blob/logging/logging_storage.go +++ b/repo/blob/logging/logging_storage.go @@ -211,6 +211,26 @@ func (s *loggingStorage) FlushCaches(ctx context.Context) error { return err } +func (s *loggingStorage) ExtendBlobRetention(ctx context.Context, b blob.ID, opts blob.ExtendOptions) error { + ctx, span := tracer.Start(ctx, "ExtendBlobRetention") + defer span.End() + + s.beginConcurrency() + defer s.endConcurrency() + + timer := timetrack.StartTimer() + err := s.base.ExtendBlobRetention(ctx, b, opts) + dt := timer.Elapsed() + + s.logger.Debugw(s.prefix+"ExtendBlobRetention", + "blobID", b, + "error", err, + "duration", dt, + ) + //nolint:wrapcheck + return err +} + func (s *loggingStorage) translateError(err error) interface{} { if err == nil { return nil diff --git a/repo/blob/readonly/readonly_storage.go b/repo/blob/readonly/readonly_storage.go index bbd8eee24..6171011d0 100644 --- a/repo/blob/readonly/readonly_storage.go +++ b/repo/blob/readonly/readonly_storage.go @@ -15,6 +15,7 @@ // readonlyStorage prevents all mutations on the underlying storage. type readonlyStorage struct { base blob.Storage + blob.UnsupportedBlobRetention } func (s readonlyStorage) GetCapacity(ctx context.Context) (blob.Capacity, error) { diff --git a/repo/blob/s3/s3_storage.go b/repo/blob/s3/s3_storage.go index 6cd5b6775..af2a58221 100644 --- a/repo/blob/s3/s3_storage.go +++ b/repo/blob/s3/s3_storage.go @@ -231,6 +231,25 @@ func (s *s3Storage) DeleteBlob(ctx context.Context, b blob.ID) error { return err } +func (s *s3Storage) ExtendBlobRetention(ctx context.Context, b blob.ID, opts blob.ExtendOptions) error { + retentionMode := minio.RetentionMode(opts.RetentionMode) + if !retentionMode.IsValid() { + return errors.Errorf("invalid retention mode: %q", opts.RetentionMode) + } + + retainUntilDate := clock.Now().Add(opts.RetentionPeriod).UTC() + + err := s.cli.PutObjectRetention(ctx, s.BucketName, s.getObjectNameString(b), minio.PutObjectRetentionOptions{ + RetainUntilDate: &retainUntilDate, + Mode: &retentionMode, + }) + if err != nil { + return errors.Wrap(err, "unable to extend retention period") + } + + return nil +} + func (s *s3Storage) getObjectNameString(b blob.ID) string { return s.Prefix + string(b) } diff --git a/repo/blob/sftp/sftp_storage.go b/repo/blob/sftp/sftp_storage.go index 14c39ab73..bb310f2cb 100644 --- a/repo/blob/sftp/sftp_storage.go +++ b/repo/blob/sftp/sftp_storage.go @@ -39,6 +39,7 @@ // sftpStorage implements blob.Storage on top of sftp. type sftpStorage struct { sharded.Storage + blob.UnsupportedBlobRetention } type sftpImpl struct { @@ -540,7 +541,7 @@ func New(ctx context.Context, opts *Options, isCreate bool) (blob.Storage, error } r := &sftpStorage{ - sharded.New(impl, opts.Path, opts.Options, isCreate), + Storage: sharded.New(impl, opts.Path, opts.Options, isCreate), } impl.rec = connection.NewReconnector(impl) diff --git a/repo/blob/storage.go b/repo/blob/storage.go index dd9a0d51a..2a19a6ad5 100644 --- a/repo/blob/storage.go +++ b/repo/blob/storage.go @@ -40,6 +40,10 @@ // implementation that does not support the intended functionality. var ErrNotAVolume = errors.New("unsupported method, storage is not a volume") +// ErrUnsupportedObjectLock is returned when attempting to use an Object Lock specific +// function on a storage implementation that does not have the intended functionality. +var ErrUnsupportedObjectLock = errors.New("object locking unsupported") + // Bytes encapsulates a sequence of bytes, possibly stored in a non-contiguous buffers, // which can be written sequentially or treated as a io.Reader. type Bytes interface { @@ -128,6 +132,20 @@ type PutOptions struct { GetModTime *time.Time // if != nil, populate the value pointed at with the actual modification time } +// ExtendOptions represents retention options for extending object locks. +type ExtendOptions struct { + RetentionMode RetentionMode + RetentionPeriod time.Duration +} + +// UnsupportedBlobRetention provides a default implementation for ExtendBlobRetention. +type UnsupportedBlobRetention struct{} + +// ExtendBlobRetention provides a common implementation for unsupported blob retention storage. +func (s *UnsupportedBlobRetention) ExtendBlobRetention(context.Context, ID, ExtendOptions) error { + return ErrUnsupportedObjectLock +} + // HasRetentionOptions returns true when blob-retention settings have been // specified, otherwise returns false. func (o PutOptions) HasRetentionOptions() bool { @@ -161,6 +179,9 @@ type Storage interface { // FlushCaches flushes any local caches associated with storage. FlushCaches(ctx context.Context) error + + // ExtendBlobRetention extends the retention time of a blob (when blob retention is enabled) + ExtendBlobRetention(ctx context.Context, blobID ID, opts ExtendOptions) error } // ID is a string that represents blob identifier. diff --git a/repo/blob/storage_extend_test.go b/repo/blob/storage_extend_test.go new file mode 100644 index 000000000..1022c18a0 --- /dev/null +++ b/repo/blob/storage_extend_test.go @@ -0,0 +1,138 @@ +package blob_test + +import ( + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/kopia/kopia/internal/cache" + "github.com/kopia/kopia/internal/faketime" + "github.com/kopia/kopia/internal/repotesting" + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/object" +) + +var testHMACSecret = []byte{1, 2, 3} + +var testMasterKey = []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + +func (s *formatSpecificTestSuite) TestExtendBlobRetention(t *testing.T) { + // set up fake clock which is initially synchronized to wall clock time + // and moved at the same speed but which can be moved forward. + ta := faketime.NewClockTimeWithOffset(0) + + ctx, env := repotesting.NewEnvironment(t, s.formatVersion, repotesting.Options{ + OpenOptions: func(o *repo.Options) { + o.TimeNowFunc = ta.NowFunc() + }, + NewRepositoryOptions: func(nro *repo.NewRepositoryOptions) { + nro.BlockFormat.Encryption = encryption.DefaultAlgorithm + nro.BlockFormat.MasterKey = testMasterKey + nro.BlockFormat.Hash = "HMAC-SHA256" + nro.BlockFormat.HMACSecret = testHMACSecret + nro.RetentionMode = blob.Governance + nro.RetentionPeriod = time.Hour * 24 + }, + }) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + io.WriteString(w, "hello world!") + w.Result() + w.Close() + + env.RepositoryWriter.Flush(ctx) + + blobsBefore, err := blob.ListAllBlobs(ctx, env.RepositoryWriter.BlobStorage(), "") + if err != nil { + t.Fatal(err) + } + + if got, want := len(blobsBefore), 4; got != want { + t.Fatalf("unexpected number of blobs after writing: %v", blobsBefore) + } + + lastBlobIdx := len(blobsBefore) - 1 + st := env.RootStorage().(cache.Storage) + + // Verify that file is locked + _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour) + assert.EqualErrorf(t, err, "cannot alter object before retention period expires", "Altering locked object should fail") + + ta.Advance(7 * 24 * time.Hour) + + // Verify that file is unlocked + _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour) + if err != nil { + t.Fatalf("Altering expired object failed") + } + + // Relock blob + err = env.RepositoryWriter.BlobStorage().ExtendBlobRetention(ctx, blobsBefore[lastBlobIdx].BlobID, blob.ExtendOptions{ + RetentionMode: blob.Governance, + RetentionPeriod: 2 * time.Hour, + }) + if err != nil { + t.Fatalf("Extending Retention time failed, got err: %v", err) + } + + // Verify Lock period + ta.Advance(1 * time.Hour) + + _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour) + assert.EqualErrorf(t, err, "cannot alter object before retention period expires", "Altering locked object should fail") + + ta.Advance(2 * time.Hour) + + _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour) + if err != nil { + t.Fatalf("Altering expired object failed") + } +} + +func (s *formatSpecificTestSuite) TestExtendBlobRetentionUnsupported(t *testing.T) { + // set up fake clock which is initially synchronized to wall clock time + // and moved at the same speed but which can be moved forward. + ta := faketime.NewClockTimeWithOffset(0) + + ctx, env := repotesting.NewEnvironment(t, s.formatVersion, repotesting.Options{ + OpenOptions: func(o *repo.Options) { + o.TimeNowFunc = ta.NowFunc() + }, + NewRepositoryOptions: func(nro *repo.NewRepositoryOptions) { + nro.BlockFormat.Encryption = encryption.DefaultAlgorithm + nro.BlockFormat.MasterKey = testMasterKey + nro.BlockFormat.Hash = "HMAC-SHA256" + nro.BlockFormat.HMACSecret = testHMACSecret + // Ensure retention is disabled to trigger ExtendBlobRetention unsupported + nro.RetentionPeriod = 0 + nro.RetentionMode = "" + }, + }) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + io.WriteString(w, "hello world!") + w.Result() + w.Close() + + env.RepositoryWriter.Flush(ctx) + + blobsBefore, err := blob.ListAllBlobs(ctx, env.RepositoryWriter.BlobStorage(), "") + if err != nil { + t.Fatal(err) + } + + if got, want := len(blobsBefore), 4; got != want { + t.Fatalf("unexpected number of blobs after writing: %v", blobsBefore) + } + + lastBlobIdx := len(blobsBefore) - 1 + + // Extend retention time + err = env.RepositoryWriter.BlobStorage().ExtendBlobRetention(ctx, blobsBefore[lastBlobIdx].BlobID, blob.ExtendOptions{ + RetentionMode: blob.Governance, + RetentionPeriod: 2 * time.Hour, + }) + assert.EqualErrorf(t, err, "object locking unsupported", "Storage should not support ExtendBlobRetention") +} diff --git a/repo/blob/storagemetrics/storage_metrics.go b/repo/blob/storagemetrics/storage_metrics.go index 6fe1de017..68b3d656e 100644 --- a/repo/blob/storagemetrics/storage_metrics.go +++ b/repo/blob/storagemetrics/storage_metrics.go @@ -18,24 +18,26 @@ type blobMetrics struct { uploadedBytes *metrics.Counter listBlobItems *metrics.Counter - getBlobPartialDuration *metrics.Distribution[time.Duration] - getBlobFullDuration *metrics.Distribution[time.Duration] - putBlobDuration *metrics.Distribution[time.Duration] - getCapacityDuration *metrics.Distribution[time.Duration] - getMetadataDuration *metrics.Distribution[time.Duration] - deleteBlobDuration *metrics.Distribution[time.Duration] - listBlobsDuration *metrics.Distribution[time.Duration] - closeDuration *metrics.Distribution[time.Duration] - flushCachesDuration *metrics.Distribution[time.Duration] + getBlobPartialDuration *metrics.Distribution[time.Duration] + getBlobFullDuration *metrics.Distribution[time.Duration] + putBlobDuration *metrics.Distribution[time.Duration] + getCapacityDuration *metrics.Distribution[time.Duration] + getMetadataDuration *metrics.Distribution[time.Duration] + deleteBlobDuration *metrics.Distribution[time.Duration] + extendBlobRetentionDuration *metrics.Distribution[time.Duration] + listBlobsDuration *metrics.Distribution[time.Duration] + closeDuration *metrics.Distribution[time.Duration] + flushCachesDuration *metrics.Distribution[time.Duration] - getBlobErrors *metrics.Counter - getCapacityErrors *metrics.Counter - getMetadataErrors *metrics.Counter - putBlobErrors *metrics.Counter - deleteBlobErrors *metrics.Counter - listBlobsErrors *metrics.Counter - closeErrors *metrics.Counter - flushCachesErrors *metrics.Counter + getBlobErrors *metrics.Counter + getCapacityErrors *metrics.Counter + getMetadataErrors *metrics.Counter + putBlobErrors *metrics.Counter + deleteBlobErrors *metrics.Counter + extendBlobRetentionErrors *metrics.Counter + listBlobsErrors *metrics.Counter + closeErrors *metrics.Counter + flushCachesErrors *metrics.Counter } func (s *blobMetrics) GetBlob(ctx context.Context, id blob.ID, offset, length int64, output blob.OutputBuffer) error { @@ -121,6 +123,21 @@ func (s *blobMetrics) DeleteBlob(ctx context.Context, id blob.ID) error { return err } +func (s *blobMetrics) ExtendBlobRetention(ctx context.Context, id blob.ID, opts blob.ExtendOptions) error { + timer := timetrack.StartTimer() + err := s.base.ExtendBlobRetention(ctx, id, opts) + dt := timer.Elapsed() + + s.extendBlobRetentionDuration.Observe(dt) + + if err != nil { + s.extendBlobRetentionErrors.Add(1) + } + + //nolint:wrapcheck + return err +} + func (s *blobMetrics) ListBlobs(ctx context.Context, prefix blob.ID, callback func(blob.Metadata) error) error { timer := timetrack.StartTimer() cnt := int64(0) diff --git a/repo/blob/suite_test.go b/repo/blob/suite_test.go new file mode 100644 index 000000000..78b158a91 --- /dev/null +++ b/repo/blob/suite_test.go @@ -0,0 +1,24 @@ +package blob_test + +import ( + "testing" + + "github.com/kopia/kopia/internal/testutil" + "github.com/kopia/kopia/repo/format" +) + +type formatSpecificTestSuite struct { + formatVersion format.Version +} + +func TestFormatV1(t *testing.T) { + testutil.RunAllTestsWithParam(t, &formatSpecificTestSuite{format.FormatVersion1}) +} + +func TestFormatV2(t *testing.T) { + testutil.RunAllTestsWithParam(t, &formatSpecificTestSuite{format.FormatVersion2}) +} + +func TestFormatV3(t *testing.T) { + testutil.RunAllTestsWithParam(t, &formatSpecificTestSuite{format.FormatVersion3}) +} diff --git a/repo/blob/throttling/throttling_storage.go b/repo/blob/throttling/throttling_storage.go index 3662bbaa9..5c5f32726 100644 --- a/repo/blob/throttling/throttling_storage.go +++ b/repo/blob/throttling/throttling_storage.go @@ -14,11 +14,12 @@ // operations supported. const ( - operationGetBlob = "GetBlob" - operationGetMetadata = "GetMetadata" - operationListBlobs = "ListBlobs" - operationPutBlob = "PutBlob" - operationDeleteBlob = "DeleteBlob" + operationGetBlob = "GetBlob" + operationGetMetadata = "GetMetadata" + operationListBlobs = "ListBlobs" + operationPutBlob = "PutBlob" + operationDeleteBlob = "DeleteBlob" + operationExtendBlobRetention = "ExtendBlobRetention" ) // Throttler implements throttling policy by blocking before certain operations are @@ -104,6 +105,13 @@ func (s *throttlingStorage) DeleteBlob(ctx context.Context, id blob.ID) error { return s.Storage.DeleteBlob(ctx, id) //nolint:wrapcheck } +func (s *throttlingStorage) ExtendBlobRetention(ctx context.Context, id blob.ID, opts blob.ExtendOptions) error { + s.throttler.BeforeOperation(ctx, operationExtendBlobRetention) + defer s.throttler.AfterOperation(ctx, operationExtendBlobRetention) + + return s.Storage.ExtendBlobRetention(ctx, id, opts) //nolint:wrapcheck +} + // NewWrapper returns a Storage wrapper that adds retry loop around all operations of the underlying storage. func NewWrapper(wrapped blob.Storage, throttler Throttler) blob.Storage { return &throttlingStorage{wrapped, throttler} diff --git a/repo/blob/webdav/webdav_storage.go b/repo/blob/webdav/webdav_storage.go index d5af61bd8..a8b503be5 100644 --- a/repo/blob/webdav/webdav_storage.go +++ b/repo/blob/webdav/webdav_storage.go @@ -36,6 +36,7 @@ // may be accessed using WebDAV or File interchangeably. type davStorage struct { sharded.Storage + blob.UnsupportedBlobRetention } type davStorageImpl struct { @@ -277,7 +278,7 @@ func New(ctx context.Context, opts *Options, isCreate bool) (blob.Storage, error } s := retrying.NewWrapper(&davStorage{ - sharded.New(&davStorageImpl{ + Storage: sharded.New(&davStorageImpl{ Options: *opts, cli: cli, }, "", opts.Options, isCreate), diff --git a/repo/locking_storage.go b/repo/locking_storage.go new file mode 100644 index 000000000..3500a71a3 --- /dev/null +++ b/repo/locking_storage.go @@ -0,0 +1,22 @@ +package repo + +import ( + "github.com/kopia/kopia/internal/epoch" + "github.com/kopia/kopia/repo/content" + "github.com/kopia/kopia/repo/content/indexblob" + "github.com/kopia/kopia/repo/format" +) + +// GetLockingStoragePrefixes Return all prefixes that may be maintained by Object Locking. +func GetLockingStoragePrefixes() []string { + var prefixes []string + // collect prefixes that need to be locked on put + for _, prefix := range content.PackBlobIDPrefixes { + prefixes = append(prefixes, string(prefix)) + } + + prefixes = append(prefixes, indexblob.V0IndexBlobPrefix, epoch.EpochManagerIndexUberPrefix, format.KopiaRepositoryBlobID, + format.KopiaBlobCfgBlobID) + + return prefixes +} diff --git a/repo/maintenance/blob_retain.go b/repo/maintenance/blob_retain.go new file mode 100644 index 000000000..e195f63c0 --- /dev/null +++ b/repo/maintenance/blob_retain.go @@ -0,0 +1,140 @@ +package maintenance + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "time" + + "github.com/pkg/errors" + + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/format" +) + +const parallelBlobRetainCPUMultiplier = 2 + +const minRetentionMaintenanceDiff = time.Duration(24) * time.Hour + +// ExtendBlobRetentionTimeOptions provides options for extending blob retention algorithm. +type ExtendBlobRetentionTimeOptions struct { + Parallel int + DryRun bool +} + +// ExtendBlobRetentionTime extends the retention time of all relevant blobs managed by storage engine with Object Locking enabled. +func ExtendBlobRetentionTime(ctx context.Context, rep repo.DirectRepositoryWriter, opt ExtendBlobRetentionTimeOptions) (int, error) { + const extendQueueSize = 100 + + var ( + wg sync.WaitGroup + prefixes []blob.ID + cnt = new(uint32) + toExtend = new(uint32) + failedCnt = new(uint32) + ) + + if opt.Parallel == 0 { + opt.Parallel = runtime.NumCPU() * parallelBlobRetainCPUMultiplier + } + + blobCfg, err := rep.FormatManager().BlobCfgBlob() + if err != nil { + return 0, errors.Wrap(err, "blob configuration") + } + + if !blobCfg.IsRetentionEnabled() { + // Blob retention is disabled + log(ctx).Info("Object lock retention is disabled.") + return 0, nil + } + + extend := make(chan blob.Metadata, extendQueueSize) + extendOpts := blob.ExtendOptions{ + RetentionMode: blobCfg.RetentionMode, + RetentionPeriod: blobCfg.RetentionPeriod, + } + + if !opt.DryRun { + // start goroutines to extend blob retention as they come. + for i := 0; i < opt.Parallel; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + for bm := range extend { + if err1 := rep.BlobStorage().ExtendBlobRetention(ctx, bm.BlobID, extendOpts); err1 != nil { + log(ctx).Errorf("Failed to extend blob %v: %v", bm.BlobID, err1) + atomic.AddUint32(failedCnt, 1) + + continue + } + + curCnt := atomic.AddUint32(cnt, 1) + if curCnt%100 == 0 { + log(ctx).Infof(" extended %v blobs", curCnt) + } + } + }() + } + } + + // Convert prefixes from string to BlobID. + for _, pfx := range repo.GetLockingStoragePrefixes() { + prefixes = append(prefixes, blob.ID(pfx)) + } + + // iterate all relevant (active, extendable) blobs and count them + optionally send to the channel to be extended + log(ctx).Infof("Extending retention time for blobs...") + + err = blob.IterateAllPrefixesInParallel(ctx, opt.Parallel, rep.BlobStorage(), prefixes, func(bm blob.Metadata) error { + if !opt.DryRun { + extend <- bm + } + + atomic.AddUint32(toExtend, 1) + return nil + }) + + close(extend) + log(ctx).Infof("Found %v blobs to extend", *toExtend) + + // wait for all extend workers to finish. + wg.Wait() + + if *failedCnt > 0 { + return 0, errors.Errorf("Failed to extend %v blobs", *failedCnt) + } + + if err != nil { + return 0, errors.Wrap(err, "error iterating packs") + } + + if opt.DryRun { + return int(*toExtend), nil + } + + log(ctx).Infof("Extended total %v blobs", *cnt) + + return int(*cnt), nil +} + +// CheckExtendRetention verifies if extension can be enabled due to maintenance and blob parameters. +func CheckExtendRetention(ctx context.Context, blobCfg format.BlobStorageConfiguration, p *Params) error { + if !p.ExtendObjectLocks { + return nil + } + + if !p.FullCycle.Enabled { + log(ctx).Warn("Object Lock extension will not function because Full-Maintenance is disabled") + } + + if blobCfg.RetentionPeriod > 0 && blobCfg.RetentionPeriod-p.FullCycle.Interval < minRetentionMaintenanceDiff { + return errors.Errorf("The repo RetentionPeriod must be %v greater than the Full Maintenance interval %v %v", minRetentionMaintenanceDiff, blobCfg.RetentionPeriod, p.FullCycle.Interval) + } + + return nil +} diff --git a/repo/maintenance/blob_retain_test.go b/repo/maintenance/blob_retain_test.go new file mode 100644 index 000000000..9dcc4653d --- /dev/null +++ b/repo/maintenance/blob_retain_test.go @@ -0,0 +1,123 @@ +package maintenance_test + +import ( + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/kopia/kopia/internal/cache" + "github.com/kopia/kopia/internal/faketime" + "github.com/kopia/kopia/internal/repotesting" + "github.com/kopia/kopia/repo" + "github.com/kopia/kopia/repo/blob" + "github.com/kopia/kopia/repo/encryption" + "github.com/kopia/kopia/repo/maintenance" + "github.com/kopia/kopia/repo/object" +) + +const blockFormatHash = "HMAC-SHA256" + +func (s *formatSpecificTestSuite) TestExtendBlobRetentionTime(t *testing.T) { + // set up fake clock which is initially synchronized to wall clock time + // and moved at the same speed but which can be moved forward. + ta := faketime.NewClockTimeWithOffset(0) + + ctx, env := repotesting.NewEnvironment(t, s.formatVersion, repotesting.Options{ + OpenOptions: func(o *repo.Options) { + o.TimeNowFunc = ta.NowFunc() + }, + NewRepositoryOptions: func(nro *repo.NewRepositoryOptions) { + nro.BlockFormat.Encryption = encryption.DefaultAlgorithm + nro.BlockFormat.MasterKey = testMasterKey + nro.BlockFormat.Hash = blockFormatHash + nro.BlockFormat.HMACSecret = testHMACSecret + nro.RetentionMode = blob.Governance + nro.RetentionPeriod = time.Hour * 24 + }, + }) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + io.WriteString(w, "hello world!") + w.Result() + w.Close() + + env.RepositoryWriter.Flush(ctx) + + blobsBefore, err := blob.ListAllBlobs(ctx, env.RepositoryWriter.BlobStorage(), "") + if err != nil { + t.Fatal(err) + } + + if got, want := len(blobsBefore), 4; got != want { + t.Fatalf("unexpected number of blobs after writing: %v", blobsBefore) + } + + lastBlobIdx := len(blobsBefore) - 1 + st := env.RootStorage().(cache.Storage) + + ta.Advance(7 * 24 * time.Hour) + + if _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour); err != nil { + t.Fatalf("Altering expired object failed") + } + + // extend retention time of all blobs + if _, err = maintenance.ExtendBlobRetentionTime(ctx, env.RepositoryWriter, maintenance.ExtendBlobRetentionTimeOptions{}); err != nil { + t.Fatal(err) + } + + _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour) + assert.EqualErrorf(t, err, "cannot alter object before retention period expires", "Altering locked object should fail") +} + +func (s *formatSpecificTestSuite) TestExtendBlobRetentionTimeDisabled(t *testing.T) { + // set up fake clock which is initially synchronized to wall clock time + // and moved at the same speed but which can be moved forward. + ta := faketime.NewClockTimeWithOffset(0) + + ctx, env := repotesting.NewEnvironment(t, s.formatVersion, repotesting.Options{ + OpenOptions: func(o *repo.Options) { + o.TimeNowFunc = ta.NowFunc() + }, + NewRepositoryOptions: func(nro *repo.NewRepositoryOptions) { + nro.BlockFormat.Encryption = encryption.DefaultAlgorithm + nro.BlockFormat.MasterKey = testMasterKey + nro.BlockFormat.Hash = blockFormatHash + nro.BlockFormat.HMACSecret = testHMACSecret + }, + }) + w := env.RepositoryWriter.NewObjectWriter(ctx, object.WriterOptions{}) + io.WriteString(w, "hello world!") + w.Result() + w.Close() + + env.RepositoryWriter.Flush(ctx) + + blobsBefore, err := blob.ListAllBlobs(ctx, env.RepositoryWriter.BlobStorage(), "") + if err != nil { + t.Fatal(err) + } + + if got, want := len(blobsBefore), 4; got != want { + t.Fatalf("unexpected number of blobs after writing: %v", blobsBefore) + } + + lastBlobIdx := len(blobsBefore) - 1 + st := env.RootStorage().(cache.Storage) + + ta.Advance(7 * 24 * time.Hour) + + if _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour); err != nil { + t.Fatalf("Altering expired object failed") + } + + // extend retention time of all blobs + if _, err = maintenance.ExtendBlobRetentionTime(ctx, env.RepositoryWriter, maintenance.ExtendBlobRetentionTimeOptions{}); err != nil { + t.Fatal(err) + } + + if _, err = st.TouchBlob(ctx, blobsBefore[lastBlobIdx].BlobID, time.Hour); err != nil { + t.Fatalf("Altering expired object failed") + } +} diff --git a/repo/maintenance/maintenance_params.go b/repo/maintenance/maintenance_params.go index f70e7f648..3cbbb208b 100644 --- a/repo/maintenance/maintenance_params.go +++ b/repo/maintenance/maintenance_params.go @@ -23,6 +23,8 @@ type Params struct { FullCycle CycleParams `json:"full"` LogRetention LogRetentionOptions `json:"logRetention"` + + ExtendObjectLocks bool `json:"extendObjectLocks"` } func (p *Params) isOwnedByByThisUser(rep repo.Repository) bool { @@ -41,6 +43,11 @@ func DefaultParams() Params { Interval: 1 * time.Hour, }, LogRetention: defaultLogRetention(), + // Don't attempt to extend object locks by default. This option may not be + // supported by all storage providers or blob implementations (currently + // supported by S3 backend) and may cause data to be kept longer than + // desired if the retention period is relatively long. + ExtendObjectLocks: false, } } diff --git a/repo/maintenance/maintenance_run.go b/repo/maintenance/maintenance_run.go index 1e08cb1c6..455e0da0a 100644 --- a/repo/maintenance/maintenance_run.go +++ b/repo/maintenance/maintenance_run.go @@ -36,15 +36,16 @@ // Task IDs. const ( - TaskSnapshotGarbageCollection = "snapshot-gc" - TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs" - TaskDeleteOrphanedBlobsFull = "full-delete-blobs" - TaskRewriteContentsQuick = "quick-rewrite-contents" - TaskRewriteContentsFull = "full-rewrite-contents" - TaskDropDeletedContentsFull = "full-drop-deleted-content" - TaskIndexCompaction = "index-compaction" - TaskCleanupLogs = "cleanup-logs" - TaskCleanupEpochManager = "cleanup-epoch-manager" + TaskSnapshotGarbageCollection = "snapshot-gc" + TaskDeleteOrphanedBlobsQuick = "quick-delete-blobs" + TaskDeleteOrphanedBlobsFull = "full-delete-blobs" + TaskRewriteContentsQuick = "quick-rewrite-contents" + TaskRewriteContentsFull = "full-rewrite-contents" + TaskDropDeletedContentsFull = "full-drop-deleted-content" + TaskIndexCompaction = "index-compaction" + TaskExtendBlobRetentionTimeFull = "extend-blob-retention-time" + TaskCleanupLogs = "cleanup-logs" + TaskCleanupEpochManager = "cleanup-epoch-manager" ) // shouldRun returns Mode if repository is due for periodic maintenance. @@ -400,6 +401,13 @@ func runTaskDeleteOrphanedBlobsQuick(ctx context.Context, runParams RunParameter }) } +func runTaskExtendBlobRetentionTimeFull(ctx context.Context, runParams RunParameters, s *Schedule) error { + return ReportRun(ctx, runParams.rep, TaskExtendBlobRetentionTimeFull, s, func() error { + _, err := ExtendBlobRetentionTime(ctx, runParams.rep, ExtendBlobRetentionTimeOptions{}) + return err + }) +} + func runFullMaintenance(ctx context.Context, runParams RunParameters, safety SafetyParameters) error { s, err := GetSchedule(ctx, runParams.rep) if err != nil { @@ -431,6 +439,15 @@ func runFullMaintenance(ctx context.Context, runParams RunParameters, safety Saf notDeletingOrphanedBlobs(ctx, s, safety) } + // extend retention-time on supported storage. + if runParams.Params.ExtendObjectLocks { + if err := runTaskExtendBlobRetentionTimeFull(ctx, runParams, s); err != nil { + return errors.Wrap(err, "error extending object lock retention time") + } + } else { + log(ctx).Debug("Extending object lock retention-period is disabled.") + } + if err := runTaskCleanupLogs(ctx, runParams, s); err != nil { return errors.Wrap(err, "error cleaning up logs") } diff --git a/repo/open.go b/repo/open.go index dd7c5a206..9aa44c1e2 100644 --- a/repo/open.go +++ b/repo/open.go @@ -14,7 +14,6 @@ "github.com/kopia/kopia/internal/cache" "github.com/kopia/kopia/internal/cacheprot" - "github.com/kopia/kopia/internal/epoch" "github.com/kopia/kopia/internal/feature" "github.com/kopia/kopia/internal/metrics" "github.com/kopia/kopia/internal/retry" @@ -25,7 +24,6 @@ "github.com/kopia/kopia/repo/blob/storagemetrics" "github.com/kopia/kopia/repo/blob/throttling" "github.com/kopia/kopia/repo/content" - "github.com/kopia/kopia/repo/content/indexblob" "github.com/kopia/kopia/repo/format" "github.com/kopia/kopia/repo/logging" "github.com/kopia/kopia/repo/manifest" @@ -394,13 +392,7 @@ func handleMissingRequiredFeatures(ctx context.Context, fmgr *format.Manager, ig func wrapLockingStorage(st blob.Storage, r format.BlobStorageConfiguration) blob.Storage { // collect prefixes that need to be locked on put - var prefixes []string - for _, prefix := range content.PackBlobIDPrefixes { - prefixes = append(prefixes, string(prefix)) - } - - prefixes = append(prefixes, indexblob.V0IndexBlobPrefix, epoch.EpochManagerIndexUberPrefix, format.KopiaRepositoryBlobID, - format.KopiaBlobCfgBlobID) + prefixes := GetLockingStoragePrefixes() return beforeop.NewWrapper(st, nil, nil, nil, func(ctx context.Context, id blob.ID, opts *blob.PutOptions) error { for _, prefix := range prefixes {