s3: fixed off-by-one error in s3_storage.GetBlock(), added more tests

added progress reporting to s3 provider
This commit is contained in:
Jarek Kowalski
2018-09-13 20:11:41 -07:00
parent 69782d2abe
commit a922c2b568
2 changed files with 65 additions and 3 deletions

View File

@@ -23,6 +23,32 @@ func AssertGetBlock(ctx context.Context, t *testing.T, s storage.Storage, block
if !bytes.Equal(b, expected) {
t.Errorf(errorPrefix()+"GetBlock(%v) returned %x, but expected %x", block, b, expected)
}
half := int64(len(expected) / 2)
if half == 0 {
return
}
b, err = s.GetBlock(ctx, block, 0, half)
if err != nil {
t.Errorf(errorPrefix()+"GetBlock(%v) returned error %v, expected data: %v", block, err, expected)
return
}
if !bytes.Equal(b, expected[0:half]) {
t.Errorf(errorPrefix()+"GetBlock(%v) returned %x, but expected %x", block, b, expected[0:half])
}
b, err = s.GetBlock(ctx, block, half, int64(len(expected))-half)
if err != nil {
t.Errorf(errorPrefix()+"GetBlock(%v) returned error %v, expected data: %v", block, err, expected)
return
}
if !bytes.Equal(b, expected[len(expected)-int(half):]) {
t.Errorf(errorPrefix()+"GetBlock(%v) returned %x, but expected %x", block, b, expected[len(expected)-int(half):])
}
}
// AssertGetBlockNotFound asserts that GetBlock() for specified storage block returns ErrBlockNotFound.

View File

@@ -34,7 +34,7 @@ func (s *s3Storage) GetBlock(ctx context.Context, b string, offset, length int64
attempt := func() (interface{}, error) {
var opt minio.GetObjectOptions
if length > 0 {
if err := opt.SetRange(offset, offset+length); err != nil {
if err := opt.SetRange(offset, offset+length-1); err != nil {
return nil, fmt.Errorf("unable to set range: %v", err)
}
}
@@ -81,6 +81,9 @@ func isRetriableError(err error) bool {
func translateError(err error) error {
if me, ok := err.(minio.ErrorResponse); ok {
if me.StatusCode == 200 {
return nil
}
if me.StatusCode == 404 {
return storage.ErrBlockNotFound
}
@@ -100,11 +103,18 @@ func (s *s3Storage) PutBlock(ctx context.Context, b string, data []byte) error {
return err
}
n, err := s.cli.PutObject(s.BucketName, s.getObjectNameString(b), throttled, -1, minio.PutObjectOptions{})
progressCallback := storage.ProgressCallback(ctx)
n, err := s.cli.PutObject(s.BucketName, s.getObjectNameString(b), throttled, -1, minio.PutObjectOptions{
ContentType: "application/x-kopia",
Progress: newProgressReader(progressCallback, b, int64(len(data))),
})
if err == io.EOF && n == 0 {
// special case empty stream
_, err = s.cli.PutObject(s.BucketName, s.getObjectNameString(b), bytes.NewBuffer(nil), 0, minio.PutObjectOptions{})
_, err = s.cli.PutObject(s.BucketName, s.getObjectNameString(b), bytes.NewBuffer(nil), 0, minio.PutObjectOptions{
ContentType: "application/x-kopia",
})
}
return translateError(err)
}
@@ -157,6 +167,32 @@ func (s *s3Storage) String() string {
return fmt.Sprintf("s3://%v/%v", s.BucketName, s.Prefix)
}
type progressReader struct {
cb storage.ProgressFunc
blockID string
completed int64
totalLength int64
lastReported int64
}
func (r *progressReader) Read(b []byte) (int, error) {
r.completed += int64(len(b))
if r.completed >= r.lastReported+1000000 || r.completed == r.totalLength {
r.cb(r.blockID, r.completed, r.totalLength)
r.lastReported = r.completed
}
return len(b), nil
}
func newProgressReader(cb storage.ProgressFunc, blockID string, totalLength int64) io.Reader {
if cb == nil {
return nil
}
return &progressReader{cb: cb, blockID: blockID, totalLength: totalLength}
}
func toBandwidth(bytesPerSecond int) iothrottler.Bandwidth {
if bytesPerSecond <= 0 {
return iothrottler.Unlimited