fix(providers): change versioning check for Azure storage (#3520)

* Change the way the versioning check is done
* Add test to ensure check fails on non-versioned Azure Blob containers (buckets)
This commit is contained in:
Mike McKay-Dirden
2023-12-22 01:39:58 +01:00
committed by GitHub
parent 7a0a68ecc1
commit fc640a98e4
3 changed files with 68 additions and 22 deletions

View File

@@ -132,10 +132,13 @@ func (az *azPointInTimeStorage) listBlobVersions(ctx context.Context, prefix blo
}
for _, it := range page.Segment.BlobItems {
vm := az.getVersionedBlobMeta(it)
vm, err := az.getVersionedBlobMeta(it)
if err != nil {
return translateError(err)
}
if err := callback(vm); err != nil {
return err
if err := callback(*vm); err != nil {
return translateError(err)
}
}
}
@@ -181,21 +184,17 @@ func maybePointInTimeStore(ctx context.Context, s *azStorage, pointInTime *time.
return s, nil
}
// Versioning is needed for PIT. This check will fail if someone deleted the Kopia Repository file.
props, err := s.service.ServiceClient().
NewContainerClient(s.container).
NewBlobClient(s.getObjectNameString(format.KopiaRepositoryBlobID)).
GetProperties(ctx, nil)
if err != nil {
return nil, errors.Wrapf(err, "could not get determine if container '%s' supports versioning", s.container)
}
if props.VersionID == nil {
return nil, errors.Errorf("cannot create point-in-time view for non-versioned container '%s'", s.container)
}
return readonly.NewWrapper(&azPointInTimeStorage{
pit := &azPointInTimeStorage{
azStorage: *s,
pointInTime: *pointInTime,
}), nil
pointInTime: *pointInTime, // not used for the check
}
err := pit.getBlobVersions(ctx, format.KopiaRepositoryBlobID, func(vm versionMetadata) error {
return nil
})
if err != nil {
return nil, errors.Wrap(err, "versioning must be enabled and a Kopia repository must exist")
}
return readonly.NewWrapper(pit), nil
}

View File

@@ -4,6 +4,7 @@
"context"
azblobmodels "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/pkg/errors"
"github.com/kopia/kopia/repo/blob"
)
@@ -19,14 +20,18 @@ type versionMetadata struct {
type versionMetadataCallback func(versionMetadata) error
func (az *azPointInTimeStorage) getVersionedBlobMeta(it *azblobmodels.BlobItem) versionMetadata {
func (az *azPointInTimeStorage) getVersionedBlobMeta(it *azblobmodels.BlobItem) (*versionMetadata, error) {
if it.VersionID == nil {
return nil, errors.Errorf("versionID is nil. Versioning must be enabled on the container for PIT")
}
bm := az.getBlobMeta(it)
return versionMetadata{
return &versionMetadata{
Metadata: bm,
Version: *it.VersionID,
IsDeleteMarker: az.isAzureDeleteMarker(it),
}
}, nil
}
// getBlobVersions lists all the versions for the blob with the given prefix.

View File

@@ -19,6 +19,46 @@
"github.com/kopia/kopia/repo/format"
)
func TestGetBlobVersionsFailsWhenVersioningDisabled(t *testing.T) {
t.Parallel()
testutil.ProviderTest(t)
// must be with Versioning disabled
container := getEnvOrSkip(t, testContainerEnv)
storageAccount := getEnvOrSkip(t, testStorageAccountEnv)
storageKey := getEnvOrSkip(t, testStorageKeyEnv)
ctx := testlogging.Context(t)
data := make([]byte, 8)
rand.Read(data)
// use context that gets canceled after opening storage to ensure it's not used beyond New().
newctx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
prefix := fmt.Sprintf("test-%v-%x/", clock.Now().Unix(), data)
opts := &azure.Options{
Container: container,
StorageAccount: storageAccount,
StorageKey: storageKey,
Prefix: prefix,
}
st, err := azure.New(newctx, opts, false)
require.NoError(t, err)
t.Cleanup(func() {
st.Close(ctx)
})
// required for PIT versioning check
err = st.PutBlob(ctx, format.KopiaRepositoryBlobID, gather.FromSlice([]byte(nil)), blob.PutOptions{})
require.NoError(t, err)
pit := clock.Now()
opts.PointInTime = &pit
_, err = azure.New(ctx, opts, false)
require.Error(t, err)
}
func TestGetBlobVersions(t *testing.T) {
t.Parallel()
testutil.ProviderTest(t)
@@ -54,6 +94,8 @@ func TestGetBlobVersions(t *testing.T) {
// required for PIT versioning check
err = st.PutBlob(ctx, format.KopiaRepositoryBlobID, gather.FromSlice([]byte(nil)), blob.PutOptions{})
require.NoError(t, err)
err = st.DeleteBlob(ctx, format.KopiaRepositoryBlobID) // blob can be deleted and still work
require.NoError(t, err)
const (
originalData = "original"