mirror of
https://github.com/kopia/kopia.git
synced 2025-12-23 22:57:50 -05:00
feat(providers): GCS immutability (#4134)
- Allow blob `Put/ExtendBlobRetention` - PITR support - PITR/versioning tests
This commit is contained in:
committed by
GitHub
parent
083b448926
commit
1bceb7155e
@@ -4,6 +4,7 @@
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/alecthomas/kingpin/v2"
|
||||
"github.com/pkg/errors"
|
||||
@@ -26,11 +27,32 @@ func (c *storageGCSFlags) Setup(_ StorageProviderServices, cmd *kingpin.CmdClaus
|
||||
cmd.Flag("embed-credentials", "Embed GCS credentials JSON in Kopia configuration").BoolVar(&c.embedCredentials)
|
||||
|
||||
commonThrottlingFlags(cmd, &c.options.Limits)
|
||||
|
||||
var pointInTimeStr string
|
||||
|
||||
pitPreAction := func(_ *kingpin.ParseContext) error {
|
||||
if pointInTimeStr != "" {
|
||||
t, err := time.Parse(time.RFC3339, pointInTimeStr)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "invalid point-in-time argument")
|
||||
}
|
||||
|
||||
c.options.PointInTime = &t
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
cmd.Flag("point-in-time", "Use a point-in-time view of the storage repository when supported").PlaceHolder(time.RFC3339).PreAction(pitPreAction).StringVar(&pointInTimeStr)
|
||||
}
|
||||
|
||||
func (c *storageGCSFlags) Connect(ctx context.Context, isCreate bool, formatVersion int) (blob.Storage, error) {
|
||||
_ = formatVersion
|
||||
|
||||
if isCreate && c.options.PointInTime != nil && !c.options.PointInTime.IsZero() {
|
||||
return nil, errors.New("Cannot specify a 'point-in-time' option when creating a repository")
|
||||
}
|
||||
|
||||
if c.embedCredentials {
|
||||
data, err := os.ReadFile(c.options.ServiceAccountCredentialsFile)
|
||||
if err != nil {
|
||||
|
||||
125
repo/blob/gcs/gcs_immu_test.go
Normal file
125
repo/blob/gcs/gcs_immu_test.go
Normal file
@@ -0,0 +1,125 @@
|
||||
package gcs_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
gcsclient "cloud.google.com/go/storage"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/api/option"
|
||||
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/gcs"
|
||||
)
|
||||
|
||||
// TestGoogleStorageImmutabilityProtection runs through the behavior of Google immutability protection.
|
||||
func TestGoogleStorageImmutabilityProtection(t *testing.T) {
|
||||
t.Parallel()
|
||||
testutil.ProviderTest(t)
|
||||
|
||||
opts := bucketOpts{
|
||||
projectID: os.Getenv(testBucketProjectID),
|
||||
bucket: os.Getenv(testImmutableBucketEnv),
|
||||
credentialsFile: os.Getenv(testBucketCredentialsFile),
|
||||
isLockedBucket: true,
|
||||
}
|
||||
createBucket(t, opts)
|
||||
validateBucket(t, opts)
|
||||
|
||||
data := make([]byte, 8)
|
||||
rand.Read(data)
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
|
||||
// use context that gets canceled after opening storage to ensure it's not used beyond New().
|
||||
newctx, cancel := context.WithCancel(ctx)
|
||||
prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data)
|
||||
st, err := gcs.New(newctx, &gcs.Options{
|
||||
BucketName: opts.bucket,
|
||||
ServiceAccountCredentialsFile: opts.credentialsFile,
|
||||
Prefix: prefix,
|
||||
}, false)
|
||||
|
||||
cancel()
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
st.Close(ctx)
|
||||
})
|
||||
|
||||
const (
|
||||
blobName = "sExample"
|
||||
dummyBlob = blob.ID(blobName)
|
||||
)
|
||||
|
||||
blobNameFullPath := prefix + blobName
|
||||
|
||||
putOpts := blob.PutOptions{
|
||||
RetentionPeriod: 3 * time.Second,
|
||||
}
|
||||
err = st.PutBlob(ctx, dummyBlob, gather.FromSlice([]byte("x")), putOpts)
|
||||
require.NoError(t, err)
|
||||
cli := getGoogleCLI(t, opts.credentialsFile)
|
||||
|
||||
count := getBlobCount(ctx, t, st, dummyBlob[:1])
|
||||
require.Equal(t, 1, count)
|
||||
|
||||
attrs, err := cli.Bucket(opts.bucket).Object(blobNameFullPath).Attrs(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
blobRetention := attrs.RetentionExpirationTime
|
||||
if !blobRetention.After(attrs.Created) {
|
||||
t.Fatalf("blob retention period not in the future enough: %v (created at %v)", blobRetention, attrs.Created)
|
||||
}
|
||||
|
||||
extendOpts := blob.ExtendOptions{
|
||||
RetentionPeriod: 10 * time.Second,
|
||||
}
|
||||
err = st.ExtendBlobRetention(ctx, dummyBlob, extendOpts)
|
||||
require.NoError(t, err)
|
||||
|
||||
attrs, err = cli.Bucket(opts.bucket).Object(blobNameFullPath).Attrs(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
extendedRetention := attrs.RetentionExpirationTime
|
||||
if !extendedRetention.After(blobRetention) {
|
||||
t.Fatalf("blob retention period not extended. was %v, now %v", blobRetention, extendedRetention)
|
||||
}
|
||||
|
||||
updAttrs := gcsclient.ObjectAttrsToUpdate{
|
||||
Retention: &gcsclient.ObjectRetention{
|
||||
Mode: "Unlocked",
|
||||
RetainUntil: clock.Now().Add(10 * time.Minute),
|
||||
},
|
||||
}
|
||||
_, err = cli.Bucket(opts.bucket).Object(blobNameFullPath).OverrideUnlockedRetention(true).Update(ctx, updAttrs)
|
||||
require.Error(t, err)
|
||||
require.ErrorContains(t, err, "Its retention mode cannot be changed and its retention period cannot be shortened.")
|
||||
|
||||
err = st.DeleteBlob(ctx, dummyBlob)
|
||||
require.NoError(t, err)
|
||||
|
||||
count = getBlobCount(ctx, t, st, dummyBlob[:1])
|
||||
require.Equal(t, 0, count)
|
||||
}
|
||||
|
||||
// getGoogleCLI returns a separate client to verify things the Storage interface doesn't support.
|
||||
func getGoogleCLI(t *testing.T, credentialsFile string) *gcsclient.Client {
|
||||
t.Helper()
|
||||
|
||||
ctx := context.Background()
|
||||
cli, err := gcsclient.NewClient(ctx, option.WithCredentialsFile(credentialsFile))
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create GCS client: %v", err)
|
||||
}
|
||||
|
||||
return cli
|
||||
}
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"time"
|
||||
|
||||
"github.com/kopia/kopia/repo/blob/throttling"
|
||||
)
|
||||
@@ -24,4 +25,7 @@ type Options struct {
|
||||
ReadOnly bool `json:"readOnly,omitempty"`
|
||||
|
||||
throttling.Limits
|
||||
|
||||
// PointInTime specifies a view of the (versioned) store at that time
|
||||
PointInTime *time.Time `json:"pointInTime,omitempty"`
|
||||
}
|
||||
|
||||
140
repo/blob/gcs/gcs_pit.go
Normal file
140
repo/blob/gcs/gcs_pit.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package gcs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/readonly"
|
||||
)
|
||||
|
||||
type gcsPointInTimeStorage struct {
|
||||
gcsStorage
|
||||
|
||||
pointInTime time.Time
|
||||
}
|
||||
|
||||
func (gcs *gcsPointInTimeStorage) ListBlobs(ctx context.Context, blobIDPrefix blob.ID, cb func(bm blob.Metadata) error) error {
|
||||
var (
|
||||
previousID blob.ID
|
||||
vs []versionMetadata
|
||||
)
|
||||
|
||||
err := gcs.listBlobVersions(ctx, blobIDPrefix, func(vm versionMetadata) error {
|
||||
if vm.BlobID != previousID {
|
||||
// different blob, process previous one
|
||||
if v, found := newestAtUnlessDeleted(vs, gcs.pointInTime); found {
|
||||
if err := cb(v.Metadata); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
previousID = vm.BlobID
|
||||
vs = vs[:0] // reset for next blob to reuse the slice storage whenever possible and avoid unnecessary allocations.
|
||||
}
|
||||
|
||||
vs = append(vs, vm)
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not list blob versions at time %s", gcs.pointInTime)
|
||||
}
|
||||
|
||||
// process last blob
|
||||
if v, found := newestAtUnlessDeleted(vs, gcs.pointInTime); found {
|
||||
if err := cb(v.Metadata); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gcs *gcsPointInTimeStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error {
|
||||
// getVersionedMetadata returns the specific blob version at time t
|
||||
m, err := gcs.getVersionedMetadata(ctx, b)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "getting metadata")
|
||||
}
|
||||
|
||||
return gcs.getBlobWithVersion(ctx, b, m.Version, offset, length, output)
|
||||
}
|
||||
|
||||
func (gcs *gcsPointInTimeStorage) GetMetadata(ctx context.Context, b blob.ID) (blob.Metadata, error) {
|
||||
bm, err := gcs.getVersionedMetadata(ctx, b)
|
||||
|
||||
return bm.Metadata, err
|
||||
}
|
||||
|
||||
func (gcs *gcsPointInTimeStorage) getVersionedMetadata(ctx context.Context, b blob.ID) (versionMetadata, error) {
|
||||
var vml []versionMetadata
|
||||
|
||||
if err := gcs.getBlobVersions(ctx, b, func(m versionMetadata) error {
|
||||
// only include versions older than s.pointInTime
|
||||
if !m.Timestamp.After(gcs.pointInTime) {
|
||||
vml = append(vml, m)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return versionMetadata{}, errors.Wrapf(err, "could not get version metadata for blob %s", b)
|
||||
}
|
||||
|
||||
if v, found := newestAtUnlessDeleted(vml, gcs.pointInTime); found {
|
||||
return v, nil
|
||||
}
|
||||
|
||||
return versionMetadata{}, blob.ErrBlobNotFound
|
||||
}
|
||||
|
||||
// newestAtUnlessDeleted returns the last version in the list older than the PIT.
|
||||
// Google sorts in ascending order so return the last element in the list.
|
||||
func newestAtUnlessDeleted(vx []versionMetadata, t time.Time) (v versionMetadata, found bool) {
|
||||
vs := getOlderThan(vx, t)
|
||||
|
||||
if len(vs) == 0 {
|
||||
return versionMetadata{}, false
|
||||
}
|
||||
|
||||
v = vs[len(vs)-1]
|
||||
|
||||
return v, !v.IsDeleteMarker
|
||||
}
|
||||
|
||||
// Removes versions that are newer than t. The filtering is done in place
|
||||
// and uses the same slice storage as vs. Assumes entries in vs are in ascending
|
||||
// timestamp order like Azure and unlike S3, which assumes descending.
|
||||
func getOlderThan(vs []versionMetadata, t time.Time) []versionMetadata {
|
||||
for i := range vs {
|
||||
if vs[i].Timestamp.After(t) {
|
||||
return vs[:i]
|
||||
}
|
||||
}
|
||||
|
||||
return vs
|
||||
}
|
||||
|
||||
// maybePointInTimeStore wraps s with a point-in-time store when s is versioned
|
||||
// and a point-in-time value is specified. Otherwise s is returned.
|
||||
func maybePointInTimeStore(ctx context.Context, gcs *gcsStorage, pointInTime *time.Time) (blob.Storage, error) {
|
||||
if pit := gcs.Options.PointInTime; pit == nil || pit.IsZero() {
|
||||
return gcs, nil
|
||||
}
|
||||
|
||||
attrs, err := gcs.bucket.Attrs(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "could not get determine if bucket '%s' supports versioning", gcs.BucketName)
|
||||
}
|
||||
|
||||
if !attrs.VersioningEnabled {
|
||||
return nil, errors.Errorf("cannot create point-in-time view for non-versioned bucket '%s'", gcs.BucketName)
|
||||
}
|
||||
|
||||
return readonly.NewWrapper(&gcsPointInTimeStorage{
|
||||
gcsStorage: *gcs,
|
||||
pointInTime: *pointInTime,
|
||||
}), nil
|
||||
}
|
||||
@@ -7,6 +7,8 @@
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
gcsclient "cloud.google.com/go/storage"
|
||||
"github.com/pkg/errors"
|
||||
@@ -26,6 +28,7 @@
|
||||
const (
|
||||
gcsStorageType = "gcs"
|
||||
writerChunkSize = 1 << 20
|
||||
latestVersionID = ""
|
||||
|
||||
timeMapKey = "Kopia-Mtime" // case is important, first letter must be capitalized.
|
||||
)
|
||||
@@ -39,12 +42,28 @@ type gcsStorage struct {
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) GetBlob(ctx context.Context, b blob.ID, offset, length int64, output blob.OutputBuffer) error {
|
||||
return gcs.getBlobWithVersion(ctx, b, latestVersionID, offset, length, output)
|
||||
}
|
||||
|
||||
// getBlobWithVersion returns full or partial contents of a blob with given ID and version.
|
||||
func (gcs *gcsStorage) getBlobWithVersion(ctx context.Context, b blob.ID, version string, offset, length int64, output blob.OutputBuffer) error {
|
||||
if offset < 0 {
|
||||
return blob.ErrInvalidRange
|
||||
}
|
||||
|
||||
obj := gcs.bucket.Object(gcs.getObjectNameString(b))
|
||||
|
||||
if version != "" {
|
||||
gen, err := strconv.ParseInt(version, 10, 64)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to parse blob version")
|
||||
}
|
||||
|
||||
obj = obj.Generation(gen)
|
||||
}
|
||||
|
||||
attempt := func() error {
|
||||
reader, err := gcs.bucket.Object(gcs.getObjectNameString(b)).NewRangeReader(ctx, offset, length)
|
||||
reader, err := obj.NewRangeReader(ctx, offset, length)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "NewRangeReader")
|
||||
}
|
||||
@@ -62,13 +81,20 @@ func (gcs *gcsStorage) GetBlob(ctx context.Context, b blob.ID, offset, length in
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) GetMetadata(ctx context.Context, b blob.ID) (blob.Metadata, error) {
|
||||
attrs, err := gcs.bucket.Object(gcs.getObjectNameString(b)).Attrs(ctx)
|
||||
objName := gcs.getObjectNameString(b)
|
||||
obj := gcs.bucket.Object(objName)
|
||||
|
||||
attrs, err := obj.Attrs(ctx)
|
||||
if err != nil {
|
||||
return blob.Metadata{}, errors.Wrap(translateError(err), "Attrs")
|
||||
}
|
||||
|
||||
return gcs.getBlobMeta(attrs), nil
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) getBlobMeta(attrs *gcsclient.ObjectAttrs) blob.Metadata {
|
||||
bm := blob.Metadata{
|
||||
BlobID: b,
|
||||
BlobID: gcs.toBlobID(attrs.Name),
|
||||
Length: attrs.Size,
|
||||
Timestamp: attrs.Created,
|
||||
}
|
||||
@@ -77,7 +103,7 @@ func (gcs *gcsStorage) GetMetadata(ctx context.Context, b blob.ID) (blob.Metadat
|
||||
bm.Timestamp = t
|
||||
}
|
||||
|
||||
return bm, nil
|
||||
return bm
|
||||
}
|
||||
|
||||
func translateError(err error) error {
|
||||
@@ -103,10 +129,6 @@ func translateError(err error) error {
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes, opts blob.PutOptions) error {
|
||||
if opts.HasRetentionOptions() {
|
||||
return errors.Wrap(blob.ErrUnsupportedPutBlobOption, "blob-retention")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
obj := gcs.bucket.Object(gcs.getObjectNameString(b))
|
||||
@@ -121,6 +143,14 @@ func (gcs *gcsStorage) PutBlob(ctx context.Context, b blob.ID, data blob.Bytes,
|
||||
writer.ContentType = "application/x-kopia"
|
||||
writer.ObjectAttrs.Metadata = timestampmeta.ToMap(opts.SetModTime, timeMapKey)
|
||||
|
||||
if opts.RetentionPeriod != 0 {
|
||||
retainUntilDate := clock.Now().Add(opts.RetentionPeriod).UTC()
|
||||
writer.ObjectAttrs.Retention = &gcsclient.ObjectRetention{
|
||||
Mode: string(blob.Locked),
|
||||
RetainUntil: retainUntilDate,
|
||||
}
|
||||
}
|
||||
|
||||
err := iocopy.JustCopy(writer, data.Reader())
|
||||
if err != nil {
|
||||
// cancel context before closing the writer causes it to abandon the upload.
|
||||
@@ -154,6 +184,22 @@ func (gcs *gcsStorage) DeleteBlob(ctx context.Context, b blob.ID) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) ExtendBlobRetention(ctx context.Context, b blob.ID, opts blob.ExtendOptions) error {
|
||||
retainUntilDate := clock.Now().Add(opts.RetentionPeriod).UTC().Truncate(time.Second)
|
||||
|
||||
r := &gcsclient.ObjectRetention{
|
||||
Mode: string(blob.Locked),
|
||||
RetainUntil: retainUntilDate,
|
||||
}
|
||||
|
||||
_, err := gcs.bucket.Object(gcs.getObjectNameString(b)).Update(ctx, gcsclient.ObjectAttrsToUpdate{Retention: r})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to extend retention period to "+retainUntilDate.String())
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) getObjectNameString(blobID blob.ID) string {
|
||||
return gcs.Prefix + string(blobID)
|
||||
}
|
||||
@@ -165,15 +211,7 @@ func (gcs *gcsStorage) ListBlobs(ctx context.Context, prefix blob.ID, callback f
|
||||
|
||||
oa, err := lst.Next()
|
||||
for err == nil {
|
||||
bm := blob.Metadata{
|
||||
BlobID: blob.ID(oa.Name[len(gcs.Prefix):]),
|
||||
Length: oa.Size,
|
||||
Timestamp: oa.Created,
|
||||
}
|
||||
|
||||
if t, ok := timestampmeta.FromValue(oa.Metadata[timeMapKey]); ok {
|
||||
bm.Timestamp = t
|
||||
}
|
||||
bm := gcs.getBlobMeta(oa)
|
||||
|
||||
if cberr := callback(bm); cberr != nil {
|
||||
return cberr
|
||||
@@ -204,6 +242,10 @@ func (gcs *gcsStorage) Close(ctx context.Context) error {
|
||||
return errors.Wrap(gcs.storageClient.Close(), "error closing GCS storage")
|
||||
}
|
||||
|
||||
func (gcs *gcsStorage) toBlobID(blobName string) blob.ID {
|
||||
return blob.ID(blobName[len(gcs.Prefix):])
|
||||
}
|
||||
|
||||
func tokenSourceFromCredentialsFile(ctx context.Context, fn string, scopes ...string) (oauth2.TokenSource, error) {
|
||||
data, err := os.ReadFile(fn) //nolint:gosec
|
||||
if err != nil {
|
||||
@@ -263,12 +305,17 @@ func New(ctx context.Context, opt *Options, isCreate bool) (blob.Storage, error)
|
||||
return nil, errors.New("bucket name must be specified")
|
||||
}
|
||||
|
||||
gcs := &gcsStorage{
|
||||
st := &gcsStorage{
|
||||
Options: *opt,
|
||||
storageClient: cli,
|
||||
bucket: cli.Bucket(opt.BucketName),
|
||||
}
|
||||
|
||||
gcs, err := maybePointInTimeStore(ctx, st, opt.PointInTime)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// verify GCS connection is functional by listing blobs in a bucket, which will fail if the bucket
|
||||
// does not exist. We list with a prefix that will not exist, to avoid iterating through any objects.
|
||||
nonExistentPrefix := fmt.Sprintf("kopia-gcs-storage-initializing-%v", clock.Now().UnixNano())
|
||||
|
||||
@@ -7,10 +7,13 @@
|
||||
"encoding/base64"
|
||||
"io"
|
||||
"os"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
gcsclient "cloud.google.com/go/storage"
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/api/option"
|
||||
|
||||
"github.com/kopia/kopia/internal/blobtesting"
|
||||
"github.com/kopia/kopia/internal/providervalidation"
|
||||
@@ -20,6 +23,72 @@
|
||||
"github.com/kopia/kopia/repo/blob/gcs"
|
||||
)
|
||||
|
||||
// Environment variables used to configure the GCS provider tests.
const (
	testBucketEnv                 = "KOPIA_GCS_TEST_BUCKET"
	testBucketProjectID           = "KOPIA_GCS_TEST_PROJECT_ID"
	testBucketCredentialsFile     = "KOPIA_GCS_CREDENTIALS_FILE"
	testBucketCredentialsJSONGzip = "KOPIA_GCS_CREDENTIALS_JSON_GZIP"
	testImmutableBucketEnv        = "KOPIA_GCS_TEST_IMMUTABLE_BUCKET"
)

// bucketOpts describes how a test bucket should be created and validated.
type bucketOpts struct {
	bucket          string
	credentialsFile string
	projectID       string
	// isLockedBucket requests versioning + object retention on the bucket.
	isLockedBucket bool
}
|
||||
|
||||
func createBucket(t *testing.T, opts bucketOpts) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
|
||||
cli, err := gcsclient.NewClient(ctx, option.WithCredentialsFile(opts.credentialsFile))
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create GCS client: %v", err)
|
||||
}
|
||||
|
||||
attrs := &gcsclient.BucketAttrs{}
|
||||
|
||||
bucketHandle := cli.Bucket(opts.bucket)
|
||||
if opts.isLockedBucket {
|
||||
attrs.VersioningEnabled = true
|
||||
bucketHandle = bucketHandle.SetObjectRetention(true)
|
||||
}
|
||||
|
||||
err = bucketHandle.Create(ctx, opts.projectID, attrs)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if strings.Contains(err.Error(), "The requested bucket name is not available") {
|
||||
return
|
||||
}
|
||||
|
||||
if strings.Contains(err.Error(), "Your previous request to create the named bucket succeeded and you already own it") {
|
||||
return
|
||||
}
|
||||
|
||||
t.Fatalf("issue creating bucket: %v", err)
|
||||
}
|
||||
|
||||
func validateBucket(t *testing.T, opts bucketOpts) {
|
||||
t.Helper()
|
||||
ctx := context.Background()
|
||||
|
||||
cli, err := gcsclient.NewClient(ctx, option.WithCredentialsFile(opts.credentialsFile))
|
||||
if err != nil {
|
||||
t.Fatalf("unable to create GCS client: %v", err)
|
||||
}
|
||||
|
||||
attrs, err := cli.Bucket(opts.bucket).Attrs(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
if opts.isLockedBucket {
|
||||
require.True(t, attrs.VersioningEnabled)
|
||||
require.Equal(t, "Enabled", attrs.ObjectRetentionMode)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupOldData(t *testing.T) {
|
||||
t.Parallel()
|
||||
testutil.ProviderTest(t)
|
||||
@@ -59,16 +128,13 @@ func TestGCSStorageInvalid(t *testing.T) {
|
||||
t.Parallel()
|
||||
testutil.ProviderTest(t)
|
||||
|
||||
bucket := os.Getenv("KOPIA_GCS_TEST_BUCKET")
|
||||
if bucket == "" {
|
||||
t.Skip("KOPIA_GCS_TEST_BUCKET not provided")
|
||||
}
|
||||
bucket := os.Getenv(testBucketEnv)
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
|
||||
if _, err := gcs.New(ctx, &gcs.Options{
|
||||
BucketName: bucket + "-no-such-bucket",
|
||||
ServiceAccountCredentialsFile: os.Getenv("KOPIA_GCS_CREDENTIALS_FILE"),
|
||||
ServiceAccountCredentialsFile: os.Getenv(testBucketCredentialsFile),
|
||||
}, false); err == nil {
|
||||
t.Fatalf("unexpected success connecting to GCS, wanted error")
|
||||
}
|
||||
@@ -88,12 +154,12 @@ func gunzip(d []byte) ([]byte, error) {
|
||||
func mustGetOptionsOrSkip(t *testing.T, prefix string) *gcs.Options {
|
||||
t.Helper()
|
||||
|
||||
bucket := os.Getenv("KOPIA_GCS_TEST_BUCKET")
|
||||
bucket := os.Getenv(testBucketEnv)
|
||||
if bucket == "" {
|
||||
t.Skip("KOPIA_GCS_TEST_BUCKET not provided")
|
||||
}
|
||||
|
||||
credDataGZ, err := base64.StdEncoding.DecodeString(os.Getenv("KOPIA_GCS_CREDENTIALS_JSON_GZIP"))
|
||||
credDataGZ, err := base64.StdEncoding.DecodeString(os.Getenv(testBucketCredentialsJSONGzip))
|
||||
if err != nil {
|
||||
t.Skip("skipping test because GCS credentials file can't be decoded")
|
||||
}
|
||||
@@ -109,3 +175,17 @@ func mustGetOptionsOrSkip(t *testing.T, prefix string) *gcs.Options {
|
||||
Prefix: prefix,
|
||||
}
|
||||
}
|
||||
|
||||
func getBlobCount(ctx context.Context, t *testing.T, st blob.Storage, prefix blob.ID) int {
|
||||
t.Helper()
|
||||
|
||||
var count int
|
||||
|
||||
err := st.ListBlobs(ctx, prefix, func(bm blob.Metadata) error {
|
||||
count++
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
return count
|
||||
}
|
||||
|
||||
98
repo/blob/gcs/gcs_versioned.go
Normal file
98
repo/blob/gcs/gcs_versioned.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package gcs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
|
||||
"cloud.google.com/go/storage"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/api/iterator"
|
||||
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
)
|
||||
|
||||
// versionMetadata has metadata for a single BLOB version.
|
||||
type versionMetadata struct {
|
||||
blob.Metadata
|
||||
|
||||
// Versioning related information
|
||||
IsDeleteMarker bool
|
||||
Version string
|
||||
}
|
||||
|
||||
// versionMetadataCallback is called when processing the metadata for each blob version.
|
||||
type versionMetadataCallback func(versionMetadata) error
|
||||
|
||||
// getBlobVersions lists all the versions for the blob with the given ID.
|
||||
func (gcs *gcsPointInTimeStorage) getBlobVersions(ctx context.Context, prefix blob.ID, callback versionMetadataCallback) error {
|
||||
var foundBlobs bool
|
||||
|
||||
if err := gcs.list(ctx, prefix, true, func(vm versionMetadata) error {
|
||||
foundBlobs = true
|
||||
|
||||
return callback(vm)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !foundBlobs {
|
||||
return blob.ErrBlobNotFound
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// listBlobVersions lists all versions for all the blobs with the given blob ID prefix.
|
||||
func (gcs *gcsPointInTimeStorage) listBlobVersions(ctx context.Context, prefix blob.ID, callback versionMetadataCallback) error {
|
||||
return gcs.list(ctx, prefix, false, callback)
|
||||
}
|
||||
|
||||
func (gcs *gcsPointInTimeStorage) list(ctx context.Context, prefix blob.ID, onlyMatching bool, callback versionMetadataCallback) error {
|
||||
query := storage.Query{
|
||||
Prefix: gcs.getObjectNameString(prefix),
|
||||
// Versions true to output all generations of objects
|
||||
Versions: true,
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
defer cancel()
|
||||
|
||||
it := gcs.bucket.Objects(ctx, &query)
|
||||
|
||||
for {
|
||||
attrs, err := it.Next()
|
||||
if errors.Is(err, iterator.Done) {
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "could not list objects with prefix %q", query.Prefix)
|
||||
}
|
||||
|
||||
if onlyMatching && attrs.Name != query.Prefix {
|
||||
return nil
|
||||
}
|
||||
|
||||
om := gcs.getVersionMetadata(attrs)
|
||||
|
||||
if errCallback := callback(om); errCallback != nil {
|
||||
return errors.Wrapf(errCallback, "callback failed for %q", attrs.Name)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (gcs *gcsPointInTimeStorage) getVersionMetadata(oi *storage.ObjectAttrs) versionMetadata {
|
||||
bm := gcs.getBlobMeta(oi)
|
||||
|
||||
return versionMetadata{
|
||||
Metadata: bm,
|
||||
// Google marks all previous versions as logically deleted, so we should only consider
|
||||
// a version deleted if the deletion occurred before the PIT. Unlike Azure/S3 there is no dedicated
|
||||
// delete marker version (if a 1 version blob is deleted there is still 1 version).
|
||||
IsDeleteMarker: !oi.Deleted.IsZero() && oi.Deleted.Before(*gcs.PointInTime),
|
||||
Version: strconv.FormatInt(oi.Generation, 10),
|
||||
}
|
||||
}
|
||||
258
repo/blob/gcs/gcs_versioned_test.go
Normal file
258
repo/blob/gcs/gcs_versioned_test.go
Normal file
@@ -0,0 +1,258 @@
|
||||
package gcs_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/kopia/kopia/internal/clock"
|
||||
"github.com/kopia/kopia/internal/gather"
|
||||
"github.com/kopia/kopia/internal/testlogging"
|
||||
"github.com/kopia/kopia/internal/testutil"
|
||||
"github.com/kopia/kopia/repo/blob"
|
||||
"github.com/kopia/kopia/repo/blob/gcs"
|
||||
)
|
||||
|
||||
func TestGetBlobVersionsFailsWhenVersioningDisabled(t *testing.T) {
|
||||
t.Parallel()
|
||||
testutil.ProviderTest(t)
|
||||
|
||||
// must be with Versioning disabled.
|
||||
bucket := os.Getenv(testBucketEnv)
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
data := make([]byte, 8)
|
||||
rand.Read(data)
|
||||
// use context that gets canceled after opening storage to ensure it's not used beyond New().
|
||||
newctx, cancel := context.WithCancel(ctx)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data)
|
||||
opts := &gcs.Options{
|
||||
BucketName: bucket,
|
||||
ServiceAccountCredentialsFile: os.Getenv(testBucketCredentialsFile),
|
||||
Prefix: prefix,
|
||||
}
|
||||
st, err := gcs.New(newctx, opts, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
st.Close(ctx)
|
||||
})
|
||||
|
||||
pit := clock.Now()
|
||||
opts.PointInTime = &pit
|
||||
_, err = gcs.New(ctx, opts, false)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestGetBlobVersions(t *testing.T) {
|
||||
t.Parallel()
|
||||
testutil.ProviderTest(t)
|
||||
|
||||
// must be with Versioning enabled.
|
||||
bOpts := bucketOpts{
|
||||
projectID: os.Getenv(testBucketProjectID),
|
||||
bucket: os.Getenv(testImmutableBucketEnv),
|
||||
credentialsFile: os.Getenv(testBucketCredentialsFile),
|
||||
isLockedBucket: true,
|
||||
}
|
||||
|
||||
createBucket(t, bOpts)
|
||||
validateBucket(t, bOpts)
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
data := make([]byte, 8)
|
||||
rand.Read(data)
|
||||
// use context that gets canceled after opening storage to ensure it's not used beyond New().
|
||||
newctx, cancel := context.WithCancel(ctx)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data)
|
||||
opts := &gcs.Options{
|
||||
BucketName: bOpts.bucket,
|
||||
ServiceAccountCredentialsFile: bOpts.credentialsFile,
|
||||
Prefix: prefix,
|
||||
}
|
||||
st, err := gcs.New(newctx, opts, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
st.Close(ctx)
|
||||
})
|
||||
|
||||
const (
|
||||
originalData = "original"
|
||||
updatedData = "some update"
|
||||
latestData = "latest version"
|
||||
)
|
||||
|
||||
dataBlobs := []string{originalData, updatedData, latestData}
|
||||
|
||||
const blobName = "TestGetBlobVersions"
|
||||
blobID := blob.ID(blobName)
|
||||
dataTimestamps, err := putBlobs(ctx, st, blobID, dataBlobs)
|
||||
require.NoError(t, err)
|
||||
|
||||
pastPIT := dataTimestamps[0].Add(-1 * time.Second)
|
||||
futurePIT := dataTimestamps[2].Add(1 * time.Second)
|
||||
|
||||
for _, tt := range []struct {
|
||||
testName string
|
||||
pointInTime *time.Time
|
||||
expectedBlobData string
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
testName: "unset PIT",
|
||||
pointInTime: nil,
|
||||
expectedBlobData: latestData,
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
testName: "set in the future",
|
||||
pointInTime: &futurePIT,
|
||||
expectedBlobData: latestData,
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
testName: "set in the past",
|
||||
pointInTime: &pastPIT,
|
||||
expectedBlobData: "",
|
||||
expectedError: blob.ErrBlobNotFound,
|
||||
},
|
||||
{
|
||||
testName: "original data",
|
||||
pointInTime: &dataTimestamps[0],
|
||||
expectedBlobData: originalData,
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
testName: "updated data",
|
||||
pointInTime: &dataTimestamps[1],
|
||||
expectedBlobData: updatedData,
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
testName: "latest data",
|
||||
pointInTime: &dataTimestamps[2],
|
||||
expectedBlobData: latestData,
|
||||
expectedError: nil,
|
||||
},
|
||||
} {
|
||||
t.Run(tt.testName, func(t *testing.T) {
|
||||
opts.PointInTime = tt.pointInTime
|
||||
st, err = gcs.New(ctx, opts, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
var tmp gather.WriteBuffer
|
||||
err = st.GetBlob(ctx, blobID, 0, -1, &tmp)
|
||||
require.ErrorIs(t, err, tt.expectedError)
|
||||
require.Equal(t, tt.expectedBlobData, string(tmp.ToByteSlice()))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetBlobVersionsWithDeletion(t *testing.T) {
|
||||
t.Parallel()
|
||||
testutil.ProviderTest(t)
|
||||
|
||||
// must be with Versioning enabled.
|
||||
bOpts := bucketOpts{
|
||||
projectID: os.Getenv(testBucketProjectID),
|
||||
bucket: os.Getenv(testImmutableBucketEnv),
|
||||
credentialsFile: os.Getenv(testBucketCredentialsFile),
|
||||
isLockedBucket: true,
|
||||
}
|
||||
|
||||
createBucket(t, bOpts)
|
||||
validateBucket(t, bOpts)
|
||||
|
||||
ctx := testlogging.Context(t)
|
||||
data := make([]byte, 8)
|
||||
rand.Read(data)
|
||||
// use context that gets canceled after opening storage to ensure it's not used beyond New().
|
||||
newctx, cancel := context.WithCancel(ctx)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data)
|
||||
opts := &gcs.Options{
|
||||
BucketName: bOpts.bucket,
|
||||
ServiceAccountCredentialsFile: bOpts.credentialsFile,
|
||||
Prefix: prefix,
|
||||
}
|
||||
st, err := gcs.New(newctx, opts, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Cleanup(func() {
|
||||
st.Close(ctx)
|
||||
})
|
||||
|
||||
const (
|
||||
originalData = "original"
|
||||
updatedData = "some update"
|
||||
)
|
||||
|
||||
dataBlobs := []string{originalData, updatedData}
|
||||
|
||||
const blobName = "TestGetBlobVersionsWithDeletion"
|
||||
blobID := blob.ID(blobName)
|
||||
dataTimestamps, err := putBlobs(ctx, st, blobID, dataBlobs)
|
||||
require.NoError(t, err)
|
||||
|
||||
count := getBlobCount(ctx, t, st, blobID)
|
||||
require.Equal(t, 1, count)
|
||||
|
||||
err = st.DeleteBlob(ctx, blobID)
|
||||
require.NoError(t, err)
|
||||
|
||||
// blob no longer found.
|
||||
count = getBlobCount(ctx, t, st, blobID)
|
||||
require.Equal(t, 0, count)
|
||||
|
||||
opts.PointInTime = &dataTimestamps[1]
|
||||
st, err = gcs.New(ctx, opts, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
// blob visible again with PIT set.
|
||||
count = getBlobCount(ctx, t, st, blobID)
|
||||
require.Equal(t, 1, count)
|
||||
|
||||
var tmp gather.WriteBuffer
|
||||
err = st.GetBlob(ctx, blobID, 0, -1, &tmp)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, updatedData, string(tmp.ToByteSlice()))
|
||||
|
||||
opts.PointInTime = &dataTimestamps[0]
|
||||
st, err = gcs.New(ctx, opts, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = st.GetBlob(ctx, blobID, 0, -1, &tmp)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, originalData, string(tmp.ToByteSlice()))
|
||||
}
|
||||
|
||||
func putBlobs(ctx context.Context, cli blob.Storage, blobID blob.ID, blobs []string) ([]time.Time, error) {
|
||||
var putTimes []time.Time
|
||||
|
||||
for _, b := range blobs {
|
||||
if err := cli.PutBlob(ctx, blobID, gather.FromSlice([]byte(b)), blob.PutOptions{}); err != nil {
|
||||
return nil, errors.Wrap(err, "putting blob")
|
||||
}
|
||||
|
||||
m, err := cli.GetMetadata(ctx, blobID)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "getting metadata")
|
||||
}
|
||||
|
||||
putTimes = append(putTimes, m.Timestamp)
|
||||
}
|
||||
|
||||
return putTimes, nil
|
||||
}
|
||||
Reference in New Issue
Block a user