Files
kopia/internal/blobtesting/verify.go
Jarek Kowalski 7401684e71 blob: replaced blob.Storage.SetTime() method with blob.PutOptions.SetTime (#1595)
* sharded: plumbed through blob.PutOptions

* blob: removed blob.Storage.SetTime() method

This was only used for `kopia repo sync-to` and got replaced with
an equivalent blob.PutOptions.SetTime, which when set to non-zero time
will attempt to set the modification time on a file.

Since some providers don't support changing modification time, we
are able to emulate it using per-blob metadata (on B2, Azure and GCS),
sadly S3 is still unsupported, because it does not support returning
metadata in list results.

Also added PutOptions.GetTime, which when set to a non-nil pointer, will
populate the provided variable with actual time that got assigned
to the blob.

Added tests that verify that each provider supports GetTime
and SetTime according to this spec.

* blob: additional test coverage for filesystem storage

* blob: added PutBlobAndGetMetadata() helper and used where appropriate

* fixed test failures

* pr feedback

* Update repo/blob/azure/azure_storage.go

Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>

* Update repo/blob/filesystem/filesystem_storage.go

Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>

* Update repo/blob/filesystem/filesystem_storage.go

Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>

* blobtesting: fixed object_locking_map.go

* blobtesting: removed SetTime from ObjectLockingMap

Co-authored-by: Shikhar Mall <mall.shikhar.in@gmail.com>
2021-12-18 14:00:20 -08:00

220 lines
5.8 KiB
Go

package blobtesting
import (
"bytes"
"context"
"fmt"
"os"
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"github.com/kopia/kopia/internal/gather"
"github.com/kopia/kopia/internal/providervalidation"
"github.com/kopia/kopia/repo/blob"
)
// VerifyStorage verifies the behavior of the specified storage.
//
// It runs a fixed sequence of subtests against r using five hard-coded blob IDs:
// not-found checks, concurrent puts, gets, listing (with and without a prefix),
// overwrites, delete-then-list, and finally puts that exercise
// blob.PutOptions.SetModTime and blob.PutOptions.GetModTime.
//
// opts is only used for the initial puts in the "AddBlobs" phase; later phases
// pass their own blob.PutOptions. The first phase asserts that none of the test
// blobs exist yet in r.
// nolint:gocyclo,thelper
func VerifyStorage(ctx context.Context, t *testing.T, r blob.Storage, opts blob.PutOptions) {
	blocks := []struct {
		blk      blob.ID
		contents []byte
	}{
		{blk: "abcdbbf4f0507d054ed5a80a5b65086f602b", contents: []byte{}},
		{blk: "zxce0e35630770c54668a8cfb4e414c6bf8f", contents: []byte{1}},
		{blk: "abff4585856ebf0748fd989e1dd623a8963d", contents: bytes.Repeat([]byte{1}, 1000)},
		{blk: "abgc3dca496d510f492c858a2df1eb824e62", contents: bytes.Repeat([]byte{1}, 10000)},
		{blk: "kopia.repository", contents: bytes.Repeat([]byte{2}, 100)},
	}

	// First verify that blocks don't exist.
	t.Run("VerifyBlobsNotFound", func(t *testing.T) {
		for _, b := range blocks {
			b := b

			t.Run(string(b.blk), func(t *testing.T) {
				t.Parallel()

				AssertGetBlobNotFound(ctx, t, r, b.blk)
				AssertGetMetadataNotFound(ctx, t, r, b.blk)
			})
		}
	})

	// Deleting a blob that does not exist must either succeed or report blob.ErrBlobNotFound.
	if err := r.DeleteBlob(ctx, "no-such-blob"); err != nil && !errors.Is(err, blob.ErrBlobNotFound) {
		t.Errorf("invalid error when deleting non-existent blob: %v", err)
	}

	// Each blob is written this many times in parallel; the count is doubled
	// when the CI environment variable is non-empty.
	initialAddConcurrency := 2
	if os.Getenv("CI") != "" {
		initialAddConcurrency = 4
	}

	// Now add blocks.
	t.Run("AddBlobs", func(t *testing.T) {
		for _, b := range blocks {
			for i := 0; i < initialAddConcurrency; i++ {
				b := b

				t.Run(fmt.Sprintf("%v-%v", b.blk, i), func(t *testing.T) {
					t.Parallel()

					if err := r.PutBlob(ctx, b.blk, gather.FromSlice(b.contents), opts); err != nil {
						t.Fatalf("can't put blob: %v", err)
					}
				})
			}
		}
	})

	t.Run("GetBlobs", func(t *testing.T) {
		for _, b := range blocks {
			b := b

			t.Run(string(b.blk), func(t *testing.T) {
				t.Parallel()

				AssertGetBlob(ctx, t, r, b.blk, b.contents)
			})
		}
	})

	t.Run("ListBlobs", func(t *testing.T) {
		errExpected := errors.New("expected error")

		// An error returned by the ListBlobs callback must be propagated to the caller.
		t.Run("ListErrorNoPrefix", func(t *testing.T) {
			t.Parallel()
			require.ErrorIs(t, r.ListBlobs(ctx, "", func(bm blob.Metadata) error {
				return errExpected
			}), errExpected)
		})

		t.Run("ListErrorWithPrefix", func(t *testing.T) {
			t.Parallel()
			require.ErrorIs(t, r.ListBlobs(ctx, "ab", func(bm blob.Metadata) error {
				return errExpected
			}), errExpected)
		})

		t.Run("ListNoPrefix", func(t *testing.T) {
			t.Parallel()
			AssertListResults(ctx, t, r, "", blocks[0].blk, blocks[1].blk, blocks[2].blk, blocks[3].blk, blocks[4].blk)
		})

		// Only the IDs starting with "ab" are passed as expected results.
		t.Run("ListWithPrefix", func(t *testing.T) {
			t.Parallel()
			AssertListResults(ctx, t, r, "ab", blocks[0].blk, blocks[2].blk, blocks[3].blk)
		})
	})

	t.Run("OverwriteBlobs", func(t *testing.T) {
		for _, b := range blocks {
			b := b

			t.Run(string(b.blk), func(t *testing.T) {
				t.Parallel()

				require.NoErrorf(t, r.PutBlob(ctx, b.blk, gather.FromSlice(b.contents), blob.PutOptions{}), "can't put blob: %v", b)
				AssertGetBlob(ctx, t, r, b.blk, b.contents)
			})
		}
	})

	t.Run("DeleteBlobsAndList", func(t *testing.T) {
		// The same blob is deleted twice: the second delete must not fail either.
		require.NoError(t, r.DeleteBlob(ctx, blocks[0].blk))
		require.NoError(t, r.DeleteBlob(ctx, blocks[0].blk))

		AssertListResults(ctx, t, r, "ab", blocks[2].blk, blocks[3].blk)
		AssertListResults(ctx, t, r, "", blocks[1].blk, blocks[2].blk, blocks[3].blk, blocks[4].blk)
	})

	t.Run("PutBlobsWithSetTime", func(t *testing.T) {
		for _, b := range blocks {
			b := b

			t.Run(string(b.blk), func(t *testing.T) {
				t.Parallel()

				inTime := time.Date(2020, 1, 2, 12, 30, 40, 0, time.UTC)

				err := r.PutBlob(ctx, b.blk, gather.FromSlice(b.contents), blob.PutOptions{
					SetModTime: inTime,
				})
				if errors.Is(err, blob.ErrSetTimeUnsupported) {
					t.Skip("setting time unsupported")
				}

				// Any other error is a real failure. Without this check it would be
				// overwritten by the GetMetadata call below and silently ignored.
				require.NoError(t, err)

				bm, err := r.GetMetadata(ctx, b.blk)
				require.NoError(t, err)

				AssertTimestampsCloseEnough(t, bm.BlobID, bm.Timestamp, inTime)

				// The blob ID is used as the list prefix and exactly one entry is expected back.
				all, err := blob.ListAllBlobs(ctx, r, b.blk)
				require.NoError(t, err)
				require.Len(t, all, 1)

				AssertTimestampsCloseEnough(t, all[0].BlobID, all[0].Timestamp, inTime)
			})
		}
	})

	t.Run("PutBlobsWithGetTime", func(t *testing.T) {
		for _, b := range blocks {
			b := b

			t.Run(string(b.blk), func(t *testing.T) {
				t.Parallel()

				var outTime time.Time

				require.NoError(t, r.PutBlob(ctx, b.blk, gather.FromSlice(b.contents), blob.PutOptions{
					GetModTime: &outTime,
				}))

				// PutBlob is expected to have filled in outTime through the GetModTime pointer.
				require.False(t, outTime.IsZero(), "modification time was not returned")

				bm, err := r.GetMetadata(ctx, b.blk)
				require.NoError(t, err)

				AssertTimestampsCloseEnough(t, bm.BlobID, bm.Timestamp, outTime)

				all, err := blob.ListAllBlobs(ctx, r, b.blk)
				require.NoError(t, err)
				require.Len(t, all, 1)

				AssertTimestampsCloseEnough(t, all[0].BlobID, all[0].Timestamp, outTime)
			})
		}
	})
}
// AssertConnectionInfoRoundTrips verifies that the ConnectionInfo returned by a given storage can be used to create
// equivalent storage.
//
// It opens a second storage from s.ConnectionInfo() using blob.NewStorage, requires that the
// second storage reports an equal ConnectionInfo, and closes the second storage before returning.
// The test fails immediately if the second storage cannot be opened, reports different
// connection info, or fails to close.
// nolint:thelper
func AssertConnectionInfoRoundTrips(ctx context.Context, t *testing.T, s blob.Storage) {
	ci := s.ConnectionInfo()

	s2, err := blob.NewStorage(ctx, ci, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Close s2 from a defer so it is released even when the equality check below
	// fails; a failing require assertion stops the test goroutine and would
	// otherwise skip the Close call entirely.
	defer func() {
		require.NoError(t, s2.Close(ctx))
	}()

	ci2 := s2.ConnectionInfo()
	require.Equal(t, ci, ci2)
}
// TestValidationOptions is the set of options used when running provider validation from tests.
// nolint:gomnd
var TestValidationOptions = providervalidation.Options{
	// Time-based settings.
	ConcurrencyTestDuration: 15 * time.Second,
	MaxClockDrift:           3 * time.Minute,

	// Size setting.
	MaxBlobLength: 10e6,

	// All Num*Workers fields are set to the same value.
	NumGetBlobWorkers:     3,
	NumGetMetadataWorkers: 3,
	NumListBlobsWorkers:   3,
	NumPutBlobWorkers:     3,
}