diff --git a/blob/gcs/gcs_storage.go b/blob/gcs/gcs_storage.go index 6a3725eeb..052e10c86 100644 --- a/blob/gcs/gcs_storage.go +++ b/blob/gcs/gcs_storage.go @@ -6,6 +6,8 @@ "errors" "fmt" "io/ioutil" + "log" + "time" "github.com/kopia/kopia/internal/throttle" @@ -23,6 +25,10 @@ const ( gcsStorageType = "gcs" + + retryMaxAttempts = 10 + retryInitialSleepAmount = 1 * time.Second + retryMaxSleepAmount = 32 * time.Second ) type gcsStorage struct { @@ -37,23 +43,73 @@ type gcsStorage struct { } func (gcs *gcsStorage) BlockSize(b string) (int64, error) { - oh := gcs.bucket.Object(gcs.getObjectNameString(b)) - a, err := oh.Attrs(context.Background()) + attempt := func() (interface{}, error) { + oh := gcs.bucket.Object(gcs.getObjectNameString(b)) + a, err := oh.Attrs(context.Background()) + if err != nil { + return 0, err + } + + return a.Size, nil + } + + v, err := exponentialBackoff(fmt.Sprintf("BlockSize(%q)", b), attempt) if err != nil { return 0, translateError(err) } - return a.Size, nil + return v.(int64), nil } func (gcs *gcsStorage) GetBlock(b string) ([]byte, error) { - reader, err := gcs.bucket.Object(gcs.getObjectNameString(b)).NewReader(gcs.ctx) + attempt := func() (interface{}, error) { + reader, err := gcs.bucket.Object(gcs.getObjectNameString(b)).NewReader(gcs.ctx) + if err != nil { + return nil, err + } + defer reader.Close() + + v, err := ioutil.ReadAll(reader) + return v, err + } + + v, err := exponentialBackoff(fmt.Sprintf("GetBlock(%q)", b), attempt) if err != nil { return nil, translateError(err) } - defer reader.Close() - return ioutil.ReadAll(reader) + return v.([]byte), nil +} + +func exponentialBackoff(desc string, attempt func() (interface{}, error)) (interface{}, error) { + sleepAmount := retryInitialSleepAmount + for i := 0; i < retryMaxAttempts; i++ { + v, err := attempt() + if !isRetriableError(err) { + return v, err + } + log.Printf("got error %v when %v (#%v), sleeping for %v before retrying", err, desc, i, sleepAmount) + time.Sleep(sleepAmount) + sleepAmount *= 2 + if sleepAmount > retryMaxSleepAmount { + sleepAmount = retryMaxSleepAmount + } + } + + return nil, fmt.Errorf("unable to complete %v despite %v retries", desc, retryMaxAttempts) +} + +func isRetriableError(err error) bool { + switch err { + case nil: + return false + case storage.ErrObjectNotExist: + return false + case storage.ErrBucketNotExist: + return false + default: + return true + } } func translateError(err error) error { @@ -62,32 +118,39 @@ func translateError(err error) error { return nil case storage.ErrObjectNotExist: return blob.ErrBlockNotFound - case storage.ErrObjectNotExist: + case storage.ErrBucketNotExist: return blob.ErrBlockNotFound default: - return err + return fmt.Errorf("unexpected GCS error: %v", err) } } func (gcs *gcsStorage) PutBlock(b string, data []byte, options blob.PutOptions) error { - writer := gcs.bucket.Object(gcs.getObjectNameString(b)).NewWriter(gcs.ctx) - throttledWriter, err := gcs.uploadThrottler.AddWriter(writer) - if err != nil { - return err - } - n, err := throttledWriter.Write(data) - if err != nil { - return translateError(err) - } - if n != len(data) { - return writer.CloseWithError(errors.New("truncated write")) + attempt := func() (interface{}, error) { + writer := gcs.bucket.Object(gcs.getObjectNameString(b)).NewWriter(gcs.ctx) + n, err := writer.Write(data) + if err != nil { + return nil, err + } + if n != len(data) { + writer.CloseWithError(errors.New("truncated write")) + return nil, fmt.Errorf("truncated write %v of %v bytes", n, len(data)) + } + + return nil, writer.Close() } - return translateError(writer.Close()) + _, err := exponentialBackoff(fmt.Sprintf("PutBlock(%q)", b), attempt) + return translateError(err) } func (gcs *gcsStorage) DeleteBlock(b string) error { - return translateError(gcs.bucket.Object(gcs.getObjectNameString(b)).Delete(gcs.ctx)) + attempt := func() (interface{}, error) { + return nil, gcs.bucket.Object(gcs.getObjectNameString(b)).Delete(gcs.ctx) + } + + _, err := exponentialBackoff(fmt.Sprintf("DeleteBlock(%q)", b), attempt) + return translateError(err) } func (gcs *gcsStorage) getObjectNameString(b string) string {