improved GCS exponential backoff

This commit is contained in:
Jarek Kowalski
2017-08-12 16:28:01 -07:00
parent 60654e125b
commit 9d1fb5032d

View File

@@ -6,6 +6,8 @@
"errors"
"fmt"
"io/ioutil"
"log"
"time"
"github.com/kopia/kopia/internal/throttle"
@@ -23,6 +25,10 @@
const (
gcsStorageType = "gcs"
retryMaxAttempts = 10
retryInitialSleepAmount = 1 * time.Second
retryMaxSleepAmount = 32 * time.Second
)
type gcsStorage struct {
@@ -37,23 +43,73 @@ type gcsStorage struct {
}
func (gcs *gcsStorage) BlockSize(b string) (int64, error) {
oh := gcs.bucket.Object(gcs.getObjectNameString(b))
a, err := oh.Attrs(context.Background())
attempt := func() (interface{}, error) {
oh := gcs.bucket.Object(gcs.getObjectNameString(b))
a, err := oh.Attrs(context.Background())
if err != nil {
return 0, err
}
return a.Size, nil
}
v, err := exponentialBackoff(fmt.Sprintf("BlockSize(%q)", b), attempt)
if err != nil {
return 0, translateError(err)
}
return a.Size, nil
return v.(int64), nil
}
func (gcs *gcsStorage) GetBlock(b string) ([]byte, error) {
reader, err := gcs.bucket.Object(gcs.getObjectNameString(b)).NewReader(gcs.ctx)
attempt := func() (interface{}, error) {
reader, err := gcs.bucket.Object(gcs.getObjectNameString(b)).NewReader(gcs.ctx)
if err != nil {
return nil, err
}
defer reader.Close()
v, err := ioutil.ReadAll(reader)
return v, err
}
v, err := exponentialBackoff(fmt.Sprintf("GetBlock(%q)", b), attempt)
if err != nil {
return nil, translateError(err)
}
defer reader.Close()
return ioutil.ReadAll(reader)
return v.([]byte), nil
}
func exponentialBackoff(desc string, attempt func() (interface{}, error)) (interface{}, error) {
sleepAmount := retryInitialSleepAmount
for i := 0; i < retryMaxAttempts; i++ {
v, err := attempt()
if !isRetriableError(err) {
return v, err
}
log.Printf("got error %v when %v (#%v), sleeping for %v before retrying", err, desc, i, sleepAmount)
time.Sleep(sleepAmount)
sleepAmount *= 2
if sleepAmount > retryMaxSleepAmount {
sleepAmount = retryMaxSleepAmount
}
}
return nil, fmt.Errorf("unable to complete %v despite %v retries", desc, retryMaxAttempts)
}
func isRetriableError(err error) bool {
switch err {
case nil:
return false
case storage.ErrObjectNotExist:
return false
case storage.ErrBucketNotExist:
return false
default:
return true
}
}
func translateError(err error) error {
@@ -62,32 +118,39 @@ func translateError(err error) error {
return nil
case storage.ErrObjectNotExist:
return blob.ErrBlockNotFound
case storage.ErrObjectNotExist:
case storage.ErrBucketNotExist:
return blob.ErrBlockNotFound
default:
return err
return fmt.Errorf("unexpected GCS error: %v", err)
}
}
func (gcs *gcsStorage) PutBlock(b string, data []byte, options blob.PutOptions) error {
writer := gcs.bucket.Object(gcs.getObjectNameString(b)).NewWriter(gcs.ctx)
throttledWriter, err := gcs.uploadThrottler.AddWriter(writer)
if err != nil {
return err
}
n, err := throttledWriter.Write(data)
if err != nil {
return translateError(err)
}
if n != len(data) {
return writer.CloseWithError(errors.New("truncated write"))
attempt := func() (interface{}, error) {
writer := gcs.bucket.Object(gcs.getObjectNameString(b)).NewWriter(gcs.ctx)
n, err := writer.Write(data)
if err != nil {
return nil, err
}
if n != len(data) {
writer.CloseWithError(errors.New("truncated write"))
return nil, fmt.Errorf("truncated write %v of %v bytes", n, len(data))
}
return nil, writer.Close()
}
return translateError(writer.Close())
_, err := exponentialBackoff(fmt.Sprintf("PutBlock(%q)", b), attempt)
return translateError(err)
}
func (gcs *gcsStorage) DeleteBlock(b string) error {
return translateError(gcs.bucket.Object(gcs.getObjectNameString(b)).Delete(gcs.ctx))
attempt := func() (interface{}, error) {
return nil, gcs.bucket.Object(gcs.getObjectNameString(b)).Delete(gcs.ctx)
}
_, err := exponentialBackoff(fmt.Sprintf("DeleteBlock(%q)", b), attempt)
return translateError(err)
}
func (gcs *gcsStorage) getObjectNameString(b string) string {