From 1bceb7155ede754dea05b731579a1e9ef46e56a1 Mon Sep 17 00:00:00 2001
From: Mike McKay-Dirden <93532247+KastenMike@users.noreply.github.com>
Date: Fri, 27 Sep 2024 06:46:25 +0200
Subject: [PATCH] feat(providers): GCS immutability (#4134)

- Allow blob `Put/ExtendBlobRetention`
- PITR support
- PITR/versioning tests
---
 cli/storage_gcs.go                  |  22 +++
 repo/blob/gcs/gcs_immu_test.go      | 125 ++++++++++++++
 repo/blob/gcs/gcs_options.go        |   4 +
 repo/blob/gcs/gcs_pit.go            | 140 +++++++++++++++
 repo/blob/gcs/gcs_storage.go        |  83 +++++++--
 repo/blob/gcs/gcs_storage_test.go   |  94 +++++++++-
 repo/blob/gcs/gcs_versioned.go      |  98 +++++++++++
 repo/blob/gcs/gcs_versioned_test.go | 258 ++++++++++++++++++++++++++++
 8 files changed, 799 insertions(+), 25 deletions(-)
 create mode 100644 repo/blob/gcs/gcs_immu_test.go
 create mode 100644 repo/blob/gcs/gcs_pit.go
 create mode 100644 repo/blob/gcs/gcs_versioned.go
 create mode 100644 repo/blob/gcs/gcs_versioned_test.go

diff --git a/cli/storage_gcs.go b/cli/storage_gcs.go
index 738aef565..707407aad 100644
--- a/cli/storage_gcs.go
+++ b/cli/storage_gcs.go
@@ -4,6 +4,7 @@
 	"context"
 	"encoding/json"
 	"os"
+	"time"
 
 	"github.com/alecthomas/kingpin/v2"
 	"github.com/pkg/errors"
@@ -26,11 +27,32 @@ func (c *storageGCSFlags) Setup(_ StorageProviderServices, cmd *kingpin.CmdClaus
 	cmd.Flag("embed-credentials", "Embed GCS credentials JSON in Kopia configuration").BoolVar(&c.embedCredentials)
 
 	commonThrottlingFlags(cmd, &c.options.Limits)
+
+	var pointInTimeStr string
+
+	pitPreAction := func(_ *kingpin.ParseContext) error {
+		if pointInTimeStr != "" {
+			t, err := time.Parse(time.RFC3339, pointInTimeStr)
+			if err != nil {
+				return errors.Wrap(err, "invalid point-in-time argument")
+			}
+
+			c.options.PointInTime = &t
+		}
+
+		return nil
+	}
+
+	cmd.Flag("point-in-time", "Use a point-in-time view of the storage repository when supported").PlaceHolder(time.RFC3339).PreAction(pitPreAction).StringVar(&pointInTimeStr)
 }
 
 func (c *storageGCSFlags) Connect(ctx context.Context, isCreate bool, formatVersion int) (blob.Storage, error) {
 	_ = formatVersion
 
+	if isCreate && c.options.PointInTime != nil && !c.options.PointInTime.IsZero() {
+		return nil, errors.New("cannot specify a 'point-in-time' option when creating a repository")
+	}
+
 	if c.embedCredentials {
 		data, err := os.ReadFile(c.options.ServiceAccountCredentialsFile)
 		if err != nil {
diff --git a/repo/blob/gcs/gcs_immu_test.go b/repo/blob/gcs/gcs_immu_test.go
new file mode 100644
index 000000000..a545260fb
--- /dev/null
+++ b/repo/blob/gcs/gcs_immu_test.go
@@ -0,0 +1,125 @@
+package gcs_test
+
+import (
+	"context"
+	"crypto/rand"
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	gcsclient "cloud.google.com/go/storage"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/api/option"
+
+	"github.com/kopia/kopia/internal/clock"
+	"github.com/kopia/kopia/internal/gather"
+	"github.com/kopia/kopia/internal/testlogging"
+	"github.com/kopia/kopia/internal/testutil"
+	"github.com/kopia/kopia/repo/blob"
+	"github.com/kopia/kopia/repo/blob/gcs"
+)
+
+// TestGoogleStorageImmutabilityProtection runs through the behavior of Google immutability protection.
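+// It requires a versioned bucket with object retention enabled; see the
+// KOPIA_GCS_TEST_IMMUTABLE_BUCKET and credential environment variables
+// defined in gcs_storage_test.go. The retention periods used here are
+// deliberately short so the blob can be deleted at the end of the test.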
+func TestGoogleStorageImmutabilityProtection(t *testing.T) {
+	t.Parallel()
+	testutil.ProviderTest(t)
+
+	opts := bucketOpts{
+		projectID:       os.Getenv(testBucketProjectID),
+		bucket:          os.Getenv(testImmutableBucketEnv),
+		credentialsFile: os.Getenv(testBucketCredentialsFile),
+		isLockedBucket:  true,
+	}
+	createBucket(t, opts)
+	validateBucket(t, opts)
+
+	data := make([]byte, 8)
+	rand.Read(data)
+
+	ctx := testlogging.Context(t)
+
+	// use context that gets canceled after opening storage to ensure it's not used beyond New().
+	newctx, cancel := context.WithCancel(ctx)
+	prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data)
+	st, err := gcs.New(newctx, &gcs.Options{
+		BucketName:                    opts.bucket,
+		ServiceAccountCredentialsFile: opts.credentialsFile,
+		Prefix:                        prefix,
+	}, false)
+
+	cancel()
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		st.Close(ctx)
+	})
+
+	const (
+		blobName  = "sExample"
+		dummyBlob = blob.ID(blobName)
+	)
+
+	blobNameFullPath := prefix + blobName
+
+	putOpts := blob.PutOptions{
+		RetentionPeriod: 3 * time.Second,
+	}
+	err = st.PutBlob(ctx, dummyBlob, gather.FromSlice([]byte("x")), putOpts)
+	require.NoError(t, err)
+	cli := getGoogleCLI(t, opts.credentialsFile)
+
+	count := getBlobCount(ctx, t, st, dummyBlob[:1])
+	require.Equal(t, 1, count)
+
+	attrs, err := cli.Bucket(opts.bucket).Object(blobNameFullPath).Attrs(ctx)
+	require.NoError(t, err)
+
+	blobRetention := attrs.RetentionExpirationTime
+	if !blobRetention.After(attrs.Created) {
+		t.Fatalf("blob retention period not in the future: %v (created at %v)", blobRetention, attrs.Created)
+	}
+
+	extendOpts := blob.ExtendOptions{
+		RetentionPeriod: 10 * time.Second,
+	}
+	err = st.ExtendBlobRetention(ctx, dummyBlob, extendOpts)
+	require.NoError(t, err)
+
+	attrs, err = cli.Bucket(opts.bucket).Object(blobNameFullPath).Attrs(ctx)
+	require.NoError(t, err)
+
+	extendedRetention := attrs.RetentionExpirationTime
+	if !extendedRetention.After(blobRetention) {
+		t.Fatalf("blob retention period not extended. was %v, now %v", blobRetention, extendedRetention)
+	}
+
+	updAttrs := gcsclient.ObjectAttrsToUpdate{
+		Retention: &gcsclient.ObjectRetention{
+			Mode:        "Unlocked",
+			RetainUntil: clock.Now().Add(10 * time.Minute),
+		},
+	}
+	_, err = cli.Bucket(opts.bucket).Object(blobNameFullPath).OverrideUnlockedRetention(true).Update(ctx, updAttrs)
+	require.Error(t, err)
+	require.ErrorContains(t, err, "Its retention mode cannot be changed and its retention period cannot be shortened.")
+
+	err = st.DeleteBlob(ctx, dummyBlob)
+	require.NoError(t, err)
+
+	count = getBlobCount(ctx, t, st, dummyBlob[:1])
+	require.Equal(t, 0, count)
+}
+
+// getGoogleCLI returns a separate client to verify things the Storage interface doesn't support.
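+// It uses the cloud.google.com/go/storage API directly so tests can inspect
+// object attributes (e.g. retention settings and generations) that kopia's
+// blob.Storage interface does not expose.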
+func getGoogleCLI(t *testing.T, credentialsFile string) *gcsclient.Client {
+	t.Helper()
+
+	ctx := context.Background()
+	cli, err := gcsclient.NewClient(ctx, option.WithCredentialsFile(credentialsFile))
+	if err != nil {
+		t.Fatalf("unable to create GCS client: %v", err)
+	}
+
+	return cli
+}
diff --git a/repo/blob/gcs/gcs_options.go b/repo/blob/gcs/gcs_options.go
index 86336b610..53db319be 100644
--- a/repo/blob/gcs/gcs_options.go
+++ b/repo/blob/gcs/gcs_options.go
@@ -2,6 +2,7 @@
 
 import (
 	"encoding/json"
+	"time"
 
 	"github.com/kopia/kopia/repo/blob/throttling"
 )
@@ -24,4 +25,7 @@ type Options struct {
 	ReadOnly bool `json:"readOnly,omitempty"`
 
 	throttling.Limits
+
+	// PointInTime specifies a view of the (versioned) store as it was at the given time.
+	PointInTime *time.Time `json:"pointInTime,omitempty"`
 }
diff --git a/repo/blob/gcs/gcs_pit.go b/repo/blob/gcs/gcs_pit.go
new file mode 100644
index 000000000..6a552744e
--- /dev/null
+++ b/repo/blob/gcs/gcs_pit.go
@@ -0,0 +1,140 @@
+package gcs
+
+import (
+	"context"
+	"time"
+
+	"github.com/pkg/errors"
+
+	"github.com/kopia/kopia/repo/blob"
+	"github.com/kopia/kopia/repo/blob/readonly"
+)
+
+type gcsPointInTimeStorage struct {
+	gcsStorage
+
+	pointInTime time.Time
+}
+
+func (gcs *gcsPointInTimeStorage) ListBlobs(ctx context.Context, blobIDPrefix blob.ID, cb func(bm blob.Metadata) error) error {
+	var (
+		previousID blob.ID
+		vs         []versionMetadata
+	)
+
+	err := gcs.listBlobVersions(ctx, blobIDPrefix, func(vm versionMetadata) error {
+		if vm.BlobID != previousID {
+			// different blob, process previous one
+			if v, found := newestAtUnlessDeleted(vs, gcs.pointInTime); found {
+				if err := cb(v.Metadata); err != nil {
+					return err
+				}
+			}
+
+			previousID = vm.BlobID
+			vs = vs[:0] // reset for next blob to reuse the slice storage whenever possible and avoid unnecessary allocations.
+		}
+
+		vs = append(vs, vm)
+
+		return nil
+	})
+	if err != nil {
+		return errors.Wrapf(err, "could not list blob versions at time %s", gcs.pointInTime)
+	}
+
+	// process last blob
+	if v, found := newestAtUnlessDeleted(vs, gcs.pointInTime); found {
+		if err := cb(v.Metadata); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (gcs *gcsPointInTimeStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error {
+	// getVersionedMetadata returns the metadata for the blob version visible at gcs.pointInTime.
+	m, err := gcs.getVersionedMetadata(ctx, b)
+	if err != nil {
+		return errors.Wrap(err, "getting metadata")
+	}
+
+	return gcs.getBlobWithVersion(ctx, b, m.Version, offset, length, output)
+}
+
+func (gcs *gcsPointInTimeStorage) GetMetadata(ctx context.Context, b blob.ID) (blob.Metadata, error) {
+	bm, err := gcs.getVersionedMetadata(ctx, b)
+
+	return bm.Metadata, err
+}
+
+func (gcs *gcsPointInTimeStorage) getVersionedMetadata(ctx context.Context, b blob.ID) (versionMetadata, error) {
+	var vml []versionMetadata
+
+	if err := gcs.getBlobVersions(ctx, b, func(m versionMetadata) error {
+		// only include versions older than gcs.pointInTime
+		if !m.Timestamp.After(gcs.pointInTime) {
+			vml = append(vml, m)
+		}
+
+		return nil
+	}); err != nil {
+		return versionMetadata{}, errors.Wrapf(err, "could not get version metadata for blob %s", b)
+	}
+
+	if v, found := newestAtUnlessDeleted(vml, gcs.pointInTime); found {
+		return v, nil
+	}
+
+	return versionMetadata{}, blob.ErrBlobNotFound
+}
+
+// newestAtUnlessDeleted returns the last version in the list older than the PIT.
+// Google sorts in ascending order so return the last element in the list.
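+// For example (illustrative timestamps): for ascending versions written at
+// 10:00, 10:05 and 10:10 with t = 10:07, the 10:05 version is returned;
+// found is false when that version is a delete marker, so the blob is
+// treated as absent at t.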
+func newestAtUnlessDeleted(vx []versionMetadata, t time.Time) (v versionMetadata, found bool) {
+	vs := getOlderThan(vx, t)
+
+	if len(vs) == 0 {
+		return versionMetadata{}, false
+	}
+
+	v = vs[len(vs)-1]
+
+	return v, !v.IsDeleteMarker
+}
+
+// Removes versions that are newer than t. The filtering is done in place
+// and uses the same slice storage as vs. Assumes entries in vs are in ascending
+// timestamp order (as with Azure, and unlike S3, which returns them in descending order).
+func getOlderThan(vs []versionMetadata, t time.Time) []versionMetadata {
+	for i := range vs {
+		if vs[i].Timestamp.After(t) {
+			return vs[:i]
+		}
+	}
+
+	return vs
+}
+
+// maybePointInTimeStore wraps gcs with a point-in-time store when the bucket is versioned
+// and a point-in-time value is specified. Otherwise gcs is returned unchanged.
+func maybePointInTimeStore(ctx context.Context, gcs *gcsStorage, pointInTime *time.Time) (blob.Storage, error) {
+	if pit := gcs.Options.PointInTime; pit == nil || pit.IsZero() {
+		return gcs, nil
+	}
+
+	attrs, err := gcs.bucket.Attrs(ctx)
+	if err != nil {
+		return nil, errors.Wrapf(err, "could not determine if bucket '%s' supports versioning", gcs.BucketName)
+	}
+
+	if !attrs.VersioningEnabled {
+		return nil, errors.Errorf("cannot create point-in-time view for non-versioned bucket '%s'", gcs.BucketName)
+	}
+
+	return readonly.NewWrapper(&gcsPointInTimeStorage{
+		gcsStorage:  *gcs,
+		pointInTime: *pointInTime,
+	}), nil
+}
diff --git a/repo/blob/gcs/gcs_storage.go b/repo/blob/gcs/gcs_storage.go
index d3dbfb921..5c0aec3ff 100644
--- a/repo/blob/gcs/gcs_storage.go
+++ b/repo/blob/gcs/gcs_storage.go
@@ -7,6 +7,8 @@
 	"fmt"
 	"net/http"
 	"os"
+	"strconv"
+	"time"
 
 	gcsclient "cloud.google.com/go/storage"
 	"github.com/pkg/errors"
@@ -26,6 +28,7 @@ const (
 	gcsStorageType = "gcs"
 
 	writerChunkSize = 1 << 20
+	latestVersionID = ""
 
 	timeMapKey = "Kopia-Mtime" // case is important, first letter must be capitalized.
 )
@@ -39,12 +42,28 @@ type gcsStorage struct {
 }
 
 func (gcs *gcsStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error {
+	return gcs.getBlobWithVersion(ctx, b, latestVersionID, offset, length, output)
+}
+
+// getBlobWithVersion returns full or partial contents of a blob with the given ID and version.
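+// An empty version (latestVersionID) reads the current live object; any other
+// value must be a decimal GCS generation number. Sketch of a versioned read
+// (the blob ID and generation below are hypothetical):
+//
+//	var out gather.WriteBuffer
+//	err := gcs.getBlobWithVersion(ctx, "p0123", "1695798385000000", 0, -1, &out)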
+func (gcs *gcsStorage) getBlobWithVersion(ctx context.Context, b blob.ID, version string, offset, length int64, output blob.OutputBuffer) error {
 	if offset < 0 {
 		return blob.ErrInvalidRange
 	}
 
+	obj := gcs.bucket.Object(gcs.getObjectNameString(b))
+
+	if version != "" {
+		gen, err := strconv.ParseInt(version, 10, 64)
+		if err != nil {
+			return errors.Wrap(err, "failed to parse blob version")
+		}
+
+		obj = obj.Generation(gen)
+	}
+
 	attempt := func() error {
-		reader, err := gcs.bucket.Object(gcs.getObjectNameString(b)).NewRangeReader(ctx, offset, length)
+		reader, err := obj.NewRangeReader(ctx, offset, length)
 		if err != nil {
 			return errors.Wrap(err, "NewRangeReader")
 		}
@@ -62,13 +81,20 @@ func (gcs *gcsStorage) GetBlob(ctx context.Context, b blob.ID, offset, length in
 }
 
 func (gcs *gcsStorage) GetMetadata(ctx context.Context, b blob.ID) (blob.Metadata, error) {
-	attrs, err := gcs.bucket.Object(gcs.getObjectNameString(b)).Attrs(ctx)
+	objName := gcs.getObjectNameString(b)
+	obj := gcs.bucket.Object(objName)
+
+	attrs, err := obj.Attrs(ctx)
 	if err != nil {
 		return blob.Metadata{}, errors.Wrap(translateError(err), "Attrs")
 	}
 
+	return gcs.getBlobMeta(attrs), nil
+}
+
+func (gcs *gcsStorage) getBlobMeta(attrs *gcsclient.ObjectAttrs) blob.Metadata {
 	bm := blob.Metadata{
-		BlobID:    b,
+		BlobID:    gcs.toBlobID(attrs.Name),
 		Length:    attrs.Size,
 		Timestamp: attrs.Created,
 	}
@@ -77,7 +103,7 @@ func (gcs *gcsStorage) GetMetadat
 		bm.Timestamp = t
 	}
 
-	return bm, nil
+	return bm
 }
 
 func translateError(err error) error {
@@ -103,10 +129,6 @@
 }
 
 func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) error {
-	if opts.HasRetentionOptions() {
-		return errors.Wrap(blob.ErrUnsupportedPutBlobOption, "blob-retention")
-	}
-
 	ctx, cancel := context.WithCancel(ctx)
 
 	obj := gcs.bucket.Object(gcs.getObjectNameString(b))
@@ -121,6 +143,14 @@ func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes,
 	writer.ContentType = "application/x-kopia"
 	writer.ObjectAttrs.Metadata = timestampmeta.ToMap(opts.SetModTime, timeMapKey)
 
+	if opts.RetentionPeriod != 0 {
+		retainUntilDate := clock.Now().Add(opts.RetentionPeriod).UTC()
+		writer.ObjectAttrs.Retention = &gcsclient.ObjectRetention{
+			Mode:        string(blob.Locked),
+			RetainUntil: retainUntilDate,
+		}
+	}
+
 	err := iocopy.JustCopy(writer, data.Reader())
 	if err != nil {
 		// canceling the context before closing the writer causes it to abandon the upload.
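+		// (closing the writer without canceling would instead attempt to
+		// finalize the partially-written object.)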
@@ -154,6 +184,22 @@ func (gcs *gcsStorage) DeleteBlob(ctx context.Context, b blob.ID) error {
 	return err
 }
 
+func (gcs *gcsStorage) ExtendBlobRetention(ctx context.Context, b blob.ID, opts blob.ExtendOptions) error {
+	retainUntilDate := clock.Now().Add(opts.RetentionPeriod).UTC().Truncate(time.Second)
+
+	r := &gcsclient.ObjectRetention{
+		Mode:        string(blob.Locked),
+		RetainUntil: retainUntilDate,
+	}
+
+	_, err := gcs.bucket.Object(gcs.getObjectNameString(b)).Update(ctx, gcsclient.ObjectAttrsToUpdate{Retention: r})
+	if err != nil {
+		return errors.Wrap(err, "unable to extend retention period to "+retainUntilDate.String())
+	}
+
+	return nil
+}
+
 func (gcs *gcsStorage) getObjectNameString(blobID blob.ID) string {
 	return gcs.Prefix + string(blobID)
 }
@@ -165,15 +211,7 @@ func (gcs *gcsStorage) ListBlobs(ctx context.Context, prefix blob.ID, callback f
 	oa, err := lst.Next()
 
 	for err == nil {
-		bm := blob.Metadata{
-			BlobID:    blob.ID(oa.Name[len(gcs.Prefix):]),
-			Length:    oa.Size,
-			Timestamp: oa.Created,
-		}
-
-		if t, ok := timestampmeta.FromValue(oa.Metadata[timeMapKey]); ok {
-			bm.Timestamp = t
-		}
+		bm := gcs.getBlobMeta(oa)
 
 		if cberr := callback(bm); cberr != nil {
 			return cberr
@@ -204,6 +242,10 @@ func (gcs *gcsStorage) Close(ctx context.Context) error {
 	return errors.Wrap(gcs.storageClient.Close(), "error closing GCS storage")
 }
 
+func (gcs *gcsStorage) toBlobID(blobName string) blob.ID {
+	return blob.ID(blobName[len(gcs.Prefix):])
+}
+
 func tokenSourceFromCredentialsFile(ctx context.Context, fn string, scopes ...string) (oauth2.TokenSource, error) {
 	data, err := os.ReadFile(fn) //nolint:gosec
 	if err != nil {
@@ -263,12 +305,17 @@ func New(ctx context.Context, opt *Options, isCreate bool) (blob.Storage, error)
 		return nil, errors.New("bucket name must be specified")
 	}
 
-	gcs := &gcsStorage{
+	st := &gcsStorage{
 		Options:       *opt,
 		storageClient: cli,
 		bucket:        cli.Bucket(opt.BucketName),
 	}
 
+	gcs, err := maybePointInTimeStore(ctx, st, opt.PointInTime)
+	if err != nil {
+		return nil, err
+	}
+
 	// verify GCS connection is functional by listing blobs in a bucket, which will fail if the bucket
 	// does not exist. We list with a prefix that will not exist, to avoid iterating through any objects.
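+	// Note: when a point-in-time view was requested, gcs is the read-only
+	// wrapper returned by maybePointInTimeStore above, so this check also
+	// exercises the versioned listing path.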
 	nonExistentPrefix := fmt.Sprintf("kopia-gcs-storage-initializing-%v", clock.Now().UnixNano())
diff --git a/repo/blob/gcs/gcs_storage_test.go b/repo/blob/gcs/gcs_storage_test.go
index 512cd4138..d56b3cd9a 100644
--- a/repo/blob/gcs/gcs_storage_test.go
+++ b/repo/blob/gcs/gcs_storage_test.go
@@ -7,10 +7,13 @@
 	"encoding/base64"
 	"io"
 	"os"
+	"strings"
 	"testing"
 
+	gcsclient "cloud.google.com/go/storage"
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/api/option"
 
 	"github.com/kopia/kopia/internal/blobtesting"
 	"github.com/kopia/kopia/internal/providervalidation"
@@ -20,6 +23,72 @@
 	"github.com/kopia/kopia/repo/blob/gcs"
 )
 
+const (
+	testBucketEnv                 = "KOPIA_GCS_TEST_BUCKET"
+	testBucketProjectID           = "KOPIA_GCS_TEST_PROJECT_ID"
+	testBucketCredentialsFile     = "KOPIA_GCS_CREDENTIALS_FILE"
+	testBucketCredentialsJSONGzip = "KOPIA_GCS_CREDENTIALS_JSON_GZIP"
+	testImmutableBucketEnv        = "KOPIA_GCS_TEST_IMMUTABLE_BUCKET"
+)
+
+type bucketOpts struct {
+	bucket          string
+	credentialsFile string
+	projectID       string
+	isLockedBucket  bool
+}
+
+func createBucket(t *testing.T, opts bucketOpts) {
+	t.Helper()
+	ctx := context.Background()
+
+	cli, err := gcsclient.NewClient(ctx, option.WithCredentialsFile(opts.credentialsFile))
+	if err != nil {
+		t.Fatalf("unable to create GCS client: %v", err)
+	}
+
+	attrs := &gcsclient.BucketAttrs{}
+
+	bucketHandle := cli.Bucket(opts.bucket)
+	if opts.isLockedBucket {
+		attrs.VersioningEnabled = true
+		bucketHandle = bucketHandle.SetObjectRetention(true)
+	}
+
+	err = bucketHandle.Create(ctx, opts.projectID, attrs)
+	if err == nil {
+		return
+	}
+
+	if strings.Contains(err.Error(), "The requested bucket name is not available") {
+		return
+	}
+
+	if strings.Contains(err.Error(), "Your previous request to create the named bucket succeeded and you already own it") {
+		return
+	}
+
+	t.Fatalf("issue creating bucket: %v", err)
+}
+
+func validateBucket(t *testing.T, opts bucketOpts) {
+	t.Helper()
+	ctx := context.Background()
+
+	cli, err := gcsclient.NewClient(ctx, option.WithCredentialsFile(opts.credentialsFile))
+	if err != nil {
+		t.Fatalf("unable to create GCS client: %v", err)
+	}
+
+	attrs, err := cli.Bucket(opts.bucket).Attrs(ctx)
+	require.NoError(t, err)
+
+	if opts.isLockedBucket {
+		require.True(t, attrs.VersioningEnabled)
+		require.Equal(t, "Enabled", attrs.ObjectRetentionMode)
+	}
+}
+
 func TestCleanupOldData(t *testing.T) {
 	t.Parallel()
 	testutil.ProviderTest(t)
@@ -59,16 +128,13 @@ func TestGCSStorageInvalid(t *testing.T) {
 	t.Parallel()
 	testutil.ProviderTest(t)
 
-	bucket := os.Getenv("KOPIA_GCS_TEST_BUCKET")
-	if bucket == "" {
-		t.Skip("KOPIA_GCS_TEST_BUCKET not provided")
-	}
+	bucket := os.Getenv(testBucketEnv)
 
 	ctx := testlogging.Context(t)
 
 	if _, err := gcs.New(ctx, &gcs.Options{
 		BucketName:                    bucket + "-no-such-bucket",
-		ServiceAccountCredentialsFile: os.Getenv("KOPIA_GCS_CREDENTIALS_FILE"),
+		ServiceAccountCredentialsFile: os.Getenv(testBucketCredentialsFile),
 	}, false); err == nil {
 		t.Fatalf("unexpected success connecting to GCS, wanted error")
 	}
@@ -88,12 +154,12 @@ func gunzip(d []byte) ([]byte, error) {
 func mustGetOptionsOrSkip(t *testing.T, prefix string) *gcs.Options {
 	t.Helper()
 
-	bucket := os.Getenv("KOPIA_GCS_TEST_BUCKET")
+	bucket := os.Getenv(testBucketEnv)
 	if bucket == "" {
 		t.Skip("KOPIA_GCS_TEST_BUCKET not provided")
 	}
 
-	credDataGZ, err := base64.StdEncoding.DecodeString(os.Getenv("KOPIA_GCS_CREDENTIALS_JSON_GZIP"))
+	credDataGZ, err := base64.StdEncoding.DecodeString(os.Getenv(testBucketCredentialsJSONGzip))
 	if err != nil {
 		t.Skip("skipping test because GCS credentials file can't be decoded")
 	}
@@ -109,3 +175,17 @@ func mustGetOptionsOrSkip(t *testing.T, prefix string) *gcs.Options {
 		Prefix: prefix,
 	}
 }
+
+func getBlobCount(ctx context.Context, t *testing.T, st blob.Storage, prefix blob.ID) int {
+	t.Helper()
+
+	var count int
+
+	err := st.ListBlobs(ctx, prefix, func(bm blob.Metadata) error {
+		count++
+		return nil
+	})
+	require.NoError(t, err)
+
+	return count
+}
diff --git a/repo/blob/gcs/gcs_versioned.go b/repo/blob/gcs/gcs_versioned.go
new file mode 100644
index 000000000..2142fda9f
--- /dev/null
+++ b/repo/blob/gcs/gcs_versioned.go
@@ -0,0 +1,98 @@
+package gcs
+
+import (
+	"context"
+	"strconv"
+
+	"cloud.google.com/go/storage"
+	"github.com/pkg/errors"
+	"google.golang.org/api/iterator"
+
+	"github.com/kopia/kopia/repo/blob"
+)
+
+// versionMetadata has metadata for a single BLOB version.
+type versionMetadata struct {
+	blob.Metadata
+
+	// Versioning-related information.
+	IsDeleteMarker bool
+	Version        string
+}
+
+// versionMetadataCallback is called when processing the metadata for each blob version.
+type versionMetadataCallback func(versionMetadata) error
+
+// getBlobVersions lists all the versions for the blob with the given ID.
+func (gcs *gcsPointInTimeStorage) getBlobVersions(ctx context.Context, prefix blob.ID, callback versionMetadataCallback) error {
+	var foundBlobs bool
+
+	if err := gcs.list(ctx, prefix, true, func(vm versionMetadata) error {
+		foundBlobs = true
+
+		return callback(vm)
+	}); err != nil {
+		return err
+	}
+
+	if !foundBlobs {
+		return blob.ErrBlobNotFound
+	}
+
+	return nil
+}
+
+// listBlobVersions lists all versions for all the blobs with the given blob ID prefix.
+func (gcs *gcsPointInTimeStorage) listBlobVersions(ctx context.Context, prefix blob.ID, callback versionMetadataCallback) error {
+	return gcs.list(ctx, prefix, false, callback)
+}
+
+func (gcs *gcsPointInTimeStorage) list(ctx context.Context, prefix blob.ID, onlyMatching bool, callback versionMetadataCallback) error {
+	query := storage.Query{
+		Prefix: gcs.getObjectNameString(prefix),
+		// Versions true to output all generations of objects
+		Versions: true,
+	}
+
+	ctx, cancel := context.WithCancel(ctx)
+
+	defer cancel()
+
+	it := gcs.bucket.Objects(ctx, &query)
+
+	for {
+		attrs, err := it.Next()
+		if errors.Is(err, iterator.Done) {
+			break
+		}
+
+		if err != nil {
+			return errors.Wrapf(err, "could not list objects with prefix %q", query.Prefix)
+		}
+
+		if onlyMatching && attrs.Name != query.Prefix {
+			return nil
+		}
+
+		om := gcs.getVersionMetadata(attrs)
+
+		if errCallback := callback(om); errCallback != nil {
+			return errors.Wrapf(errCallback, "callback failed for %q", attrs.Name)
+		}
+	}
+
+	return nil
+}
+
+func (gcs *gcsPointInTimeStorage) getVersionMetadata(oi *storage.ObjectAttrs) versionMetadata {
+	bm := gcs.getBlobMeta(oi)
+
+	return versionMetadata{
+		Metadata: bm,
+		// Google marks all previous versions as logically deleted, so we should only consider
+		// a version deleted if the deletion occurred before the PIT. Unlike Azure/S3 there is no dedicated
+		// delete marker version (if a blob with a single version is deleted, there is still one version).
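+		// (Overwriting an object makes the previous generation noncurrent and
+		// sets its Deleted timestamp; the live generation has a zero Deleted time.)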
+		IsDeleteMarker: !oi.Deleted.IsZero() && oi.Deleted.Before(*gcs.PointInTime),
+		Version:        strconv.FormatInt(oi.Generation, 10),
+	}
+}
diff --git a/repo/blob/gcs/gcs_versioned_test.go b/repo/blob/gcs/gcs_versioned_test.go
new file mode 100644
index 000000000..5b7be2d15
--- /dev/null
+++ b/repo/blob/gcs/gcs_versioned_test.go
@@ -0,0 +1,258 @@
+package gcs_test
+
+import (
+	"context"
+	"crypto/rand"
+	"fmt"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
+
+	"github.com/kopia/kopia/internal/clock"
+	"github.com/kopia/kopia/internal/gather"
+	"github.com/kopia/kopia/internal/testlogging"
+	"github.com/kopia/kopia/internal/testutil"
+	"github.com/kopia/kopia/repo/blob"
+	"github.com/kopia/kopia/repo/blob/gcs"
+)
+
+func TestGetBlobVersionsFailsWhenVersioningDisabled(t *testing.T) {
+	t.Parallel()
+	testutil.ProviderTest(t)
+
+	// the test bucket must have versioning disabled.
+	bucket := os.Getenv(testBucketEnv)
+
+	ctx := testlogging.Context(t)
+	data := make([]byte, 8)
+	rand.Read(data)
+	// use context that gets canceled after opening storage to ensure it's not used beyond New().
+	newctx, cancel := context.WithCancel(ctx)
+	t.Cleanup(cancel)
+
+	prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data)
+	opts := &gcs.Options{
+		BucketName:                    bucket,
+		ServiceAccountCredentialsFile: os.Getenv(testBucketCredentialsFile),
+		Prefix:                        prefix,
+	}
+	st, err := gcs.New(newctx, opts, false)
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		st.Close(ctx)
+	})
+
+	pit := clock.Now()
+	opts.PointInTime = &pit
+	_, err = gcs.New(ctx, opts, false)
+	require.Error(t, err)
+}
+
+func TestGetBlobVersions(t *testing.T) {
+	t.Parallel()
+	testutil.ProviderTest(t)
+
+	// the test bucket must have versioning enabled.
+	bOpts := bucketOpts{
+		projectID:       os.Getenv(testBucketProjectID),
+		bucket:          os.Getenv(testImmutableBucketEnv),
+		credentialsFile: os.Getenv(testBucketCredentialsFile),
+		isLockedBucket:  true,
+	}
+
+	createBucket(t, bOpts)
+	validateBucket(t, bOpts)
+
+	ctx := testlogging.Context(t)
+	data := make([]byte, 8)
+	rand.Read(data)
+	// use context that gets canceled after opening storage to ensure it's not used beyond New().
+	newctx, cancel := context.WithCancel(ctx)
+	t.Cleanup(cancel)
+
+	prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data)
+	opts := &gcs.Options{
+		BucketName:                    bOpts.bucket,
+		ServiceAccountCredentialsFile: bOpts.credentialsFile,
+		Prefix:                        prefix,
+	}
+	st, err := gcs.New(newctx, opts, false)
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		st.Close(ctx)
+	})
+
+	const (
+		originalData = "original"
+		updatedData  = "some update"
+		latestData   = "latest version"
+	)
+
+	dataBlobs := []string{originalData, updatedData, latestData}
+
+	const blobName = "TestGetBlobVersions"
+	blobID := blob.ID(blobName)
+	dataTimestamps, err := putBlobs(ctx, st, blobID, dataBlobs)
+	require.NoError(t, err)
+
+	pastPIT := dataTimestamps[0].Add(-1 * time.Second)
+	futurePIT := dataTimestamps[2].Add(1 * time.Second)
+
+	for _, tt := range []struct {
+		testName         string
+		pointInTime      *time.Time
+		expectedBlobData string
+		expectedError    error
+	}{
+		{
+			testName:         "unset PIT",
+			pointInTime:      nil,
+			expectedBlobData: latestData,
+			expectedError:    nil,
+		},
+		{
+			testName:         "set in the future",
+			pointInTime:      &futurePIT,
+			expectedBlobData: latestData,
+			expectedError:    nil,
+		},
+		{
+			testName:         "set in the past",
+			pointInTime:      &pastPIT,
+			expectedBlobData: "",
+			expectedError:    blob.ErrBlobNotFound,
+		},
+		{
+			testName:         "original data",
+			pointInTime:      &dataTimestamps[0],
+			expectedBlobData: originalData,
+			expectedError:    nil,
+		},
+		{
+			testName:         "updated data",
+			pointInTime:      &dataTimestamps[1],
+			expectedBlobData: updatedData,
+			expectedError:    nil,
+		},
+		{
+			testName:         "latest data",
+			pointInTime:      &dataTimestamps[2],
+			expectedBlobData: latestData,
+			expectedError:    nil,
+		},
+	} {
+		t.Run(tt.testName, func(t *testing.T) {
+			opts.PointInTime = tt.pointInTime
+			st, err = gcs.New(ctx, opts, false)
+			require.NoError(t, err)
+
+			var tmp gather.WriteBuffer
+			err = st.GetBlob(ctx, blobID, 0, -1, &tmp)
+			require.ErrorIs(t, err, tt.expectedError)
+			require.Equal(t, tt.expectedBlobData, string(tmp.ToByteSlice()))
+		})
+	}
+}
+
+func TestGetBlobVersionsWithDeletion(t *testing.T) {
+	t.Parallel()
+	testutil.ProviderTest(t)
+
+	// the test bucket must have versioning enabled.
+	bOpts := bucketOpts{
+		projectID:       os.Getenv(testBucketProjectID),
+		bucket:          os.Getenv(testImmutableBucketEnv),
+		credentialsFile: os.Getenv(testBucketCredentialsFile),
+		isLockedBucket:  true,
+	}
+
+	createBucket(t, bOpts)
+	validateBucket(t, bOpts)
+
+	ctx := testlogging.Context(t)
+	data := make([]byte, 8)
+	rand.Read(data)
+	// use context that gets canceled after opening storage to ensure it's not used beyond New().
+	newctx, cancel := context.WithCancel(ctx)
+	t.Cleanup(cancel)
+
+	prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data)
+	opts := &gcs.Options{
+		BucketName:                    bOpts.bucket,
+		ServiceAccountCredentialsFile: bOpts.credentialsFile,
+		Prefix:                        prefix,
+	}
+	st, err := gcs.New(newctx, opts, false)
+	require.NoError(t, err)
+
+	t.Cleanup(func() {
+		st.Close(ctx)
+	})
+
+	const (
+		originalData = "original"
+		updatedData  = "some update"
+	)
+
+	dataBlobs := []string{originalData, updatedData}
+
+	const blobName = "TestGetBlobVersionsWithDeletion"
+	blobID := blob.ID(blobName)
+	dataTimestamps, err := putBlobs(ctx, st, blobID, dataBlobs)
+	require.NoError(t, err)
+
+	count := getBlobCount(ctx, t, st, blobID)
+	require.Equal(t, 1, count)
+
+	err = st.DeleteBlob(ctx, blobID)
+	require.NoError(t, err)
+
+	// blob no longer found.
+	count = getBlobCount(ctx, t, st, blobID)
+	require.Equal(t, 0, count)
+
+	opts.PointInTime = &dataTimestamps[1]
+	st, err = gcs.New(ctx, opts, false)
+	require.NoError(t, err)
+
+	// blob visible again with PIT set.
+	count = getBlobCount(ctx, t, st, blobID)
+	require.Equal(t, 1, count)
+
+	var tmp gather.WriteBuffer
+	err = st.GetBlob(ctx, blobID, 0, -1, &tmp)
+	require.NoError(t, err)
+	require.Equal(t, updatedData, string(tmp.ToByteSlice()))
+
+	opts.PointInTime = &dataTimestamps[0]
+	st, err = gcs.New(ctx, opts, false)
+	require.NoError(t, err)
+
+	err = st.GetBlob(ctx, blobID, 0, -1, &tmp)
+	require.NoError(t, err)
+	require.Equal(t, originalData, string(tmp.ToByteSlice()))
+}
+
+func putBlobs(ctx context.Context, cli blob.Storage, blobID blob.ID, blobs []string) ([]time.Time, error) {
+	var putTimes []time.Time
+
+	for _, b := range blobs {
+		if err := cli.PutBlob(ctx, blobID, gather.FromSlice([]byte(b)), blob.PutOptions{}); err != nil {
+			return nil, errors.Wrap(err, "putting blob")
+		}
+
+		m, err := cli.GetMetadata(ctx, blobID)
+		if err != nil {
+			return nil, errors.Wrap(err, "getting metadata")
+		}
+
+		putTimes = append(putTimes, m.Timestamp)
+	}
+
+	return putTimes, nil
+}