From a922c2b56883a4ab66cbf3af4e69817023a4462c Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Thu, 13 Sep 2018 20:11:41 -0700 Subject: [PATCH] s3: fixed off-by-one error in s3_storage.GetBlock(), added more tests added progress reporting to s3 provider --- repo/internal/storagetesting/asserts.go | 26 +++++++++++++++ repo/storage/s3/s3_storage.go | 42 +++++++++++++++++++++++-- 2 files changed, 65 insertions(+), 3 deletions(-) diff --git a/repo/internal/storagetesting/asserts.go b/repo/internal/storagetesting/asserts.go index bf722a4a4..e30e51721 100644 --- a/repo/internal/storagetesting/asserts.go +++ b/repo/internal/storagetesting/asserts.go @@ -23,6 +23,32 @@ func AssertGetBlock(ctx context.Context, t *testing.T, s storage.Storage, block if !bytes.Equal(b, expected) { t.Errorf(errorPrefix()+"GetBlock(%v) returned %x, but expected %x", block, b, expected) } + + half := int64(len(expected) / 2) + if half == 0 { + return + } + + b, err = s.GetBlock(ctx, block, 0, half) + if err != nil { + t.Errorf(errorPrefix()+"GetBlock(%v) returned error %v, expected data: %v", block, err, expected) + return + } + + if !bytes.Equal(b, expected[0:half]) { + t.Errorf(errorPrefix()+"GetBlock(%v) returned %x, but expected %x", block, b, expected[0:half]) + } + + b, err = s.GetBlock(ctx, block, half, int64(len(expected))-half) + if err != nil { + t.Errorf(errorPrefix()+"GetBlock(%v) returned error %v, expected data: %v", block, err, expected) + return + } + + if !bytes.Equal(b, expected[len(expected)-int(half):]) { + t.Errorf(errorPrefix()+"GetBlock(%v) returned %x, but expected %x", block, b, expected[len(expected)-int(half):]) + } + } // AssertGetBlockNotFound asserts that GetBlock() for specified storage block returns ErrBlockNotFound. diff --git a/repo/storage/s3/s3_storage.go b/repo/storage/s3/s3_storage.go index 336a77ffb..974190f86 100644 --- a/repo/storage/s3/s3_storage.go +++ b/repo/storage/s3/s3_storage.go @@ -34,7 +34,7 @@ func (s *s3Storage) GetBlock(ctx context.Context, b string, offset, length int64 attempt := func() (interface{}, error) { var opt minio.GetObjectOptions if length > 0 { - if err := opt.SetRange(offset, offset+length); err != nil { + if err := opt.SetRange(offset, offset+length-1); err != nil { return nil, fmt.Errorf("unable to set range: %v", err) } } @@ -81,6 +81,9 @@ func isRetriableError(err error) bool { func translateError(err error) error { if me, ok := err.(minio.ErrorResponse); ok { + if me.StatusCode == 200 { + return nil + } if me.StatusCode == 404 { return storage.ErrBlockNotFound } @@ -100,11 +103,18 @@ func (s *s3Storage) PutBlock(ctx context.Context, b string, data []byte) error { return err } - n, err := s.cli.PutObject(s.BucketName, s.getObjectNameString(b), throttled, -1, minio.PutObjectOptions{}) + progressCallback := storage.ProgressCallback(ctx) + n, err := s.cli.PutObject(s.BucketName, s.getObjectNameString(b), throttled, -1, minio.PutObjectOptions{ + ContentType: "application/x-kopia", + Progress: newProgressReader(progressCallback, b, int64(len(data))), + }) if err == io.EOF && n == 0 { // special case empty stream - _, err = s.cli.PutObject(s.BucketName, s.getObjectNameString(b), bytes.NewBuffer(nil), 0, minio.PutObjectOptions{}) + _, err = s.cli.PutObject(s.BucketName, s.getObjectNameString(b), bytes.NewBuffer(nil), 0, minio.PutObjectOptions{ + ContentType: "application/x-kopia", + }) } + return translateError(err) } @@ -157,6 +167,32 @@ func (s *s3Storage) String() string { return fmt.Sprintf("s3://%v/%v", s.BucketName, s.Prefix) } +type progressReader struct { + cb storage.ProgressFunc + blockID string + completed int64 + totalLength int64 + lastReported int64 +} + +func (r *progressReader) Read(b []byte) (int, error) { + r.completed += int64(len(b)) + if r.completed >= r.lastReported+1000000 || r.completed == r.totalLength { + r.cb(r.blockID, r.completed, r.totalLength) + r.lastReported = r.completed + } + return len(b), nil +} + +func newProgressReader(cb storage.ProgressFunc, blockID string, totalLength int64) io.Reader { + if cb == nil { + return nil + } + + return &progressReader{cb: cb, blockID: blockID, totalLength: totalLength} + +} + func toBandwidth(bytesPerSecond int) iothrottler.Bandwidth { if bytesPerSecond <= 0 { return iothrottler.Unlimited