mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-28 18:37:43 -04:00
fix(oci): retry layer downloads on transient network errors (#10579)
Installing large backend images (e.g. vLLM/vLLM-omni, several GiB) over the Web UI could fail with "failed to download layer 0: unexpected EOF" when a single connection to the registry dropped mid-stream. The whole install then failed with no recovery, and since the download is not resumable, retrying from the UI restarted from zero and usually hit the same blip again - so users saw it as a consistent, size-correlated failure (issue #10577). The registry transport already retries manifest/digest fetches via defaultRetryPredicate (GetImage/GetImageDigest), but the per-layer data stream in DownloadOCIImageTar bypassed it entirely: layer.Compressed() + xio.Copy ran exactly once. Extract the per-layer copy into downloadLayerToFile, which retries on the same transient errors (unexpected EOF, EOF, EPIPE, ECONNRESET, connection refused) with exponential backoff, truncating any partial data before each retry. Non-retryable errors and context cancellation still fail fast. Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
@@ -63,6 +63,72 @@ var defaultRetryPredicate = func(err error) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// layerDownloadRetries is the number of additional attempts made when a layer
|
||||
// download fails with a transient/retryable network error.
|
||||
var layerDownloadRetries = 3
|
||||
|
||||
// layerRetryBackoff returns the wait before retry attempt n (1-indexed). It is a
|
||||
// variable so tests can eliminate the wait.
|
||||
var layerRetryBackoff = func(attempt int) time.Duration {
|
||||
d := defaultRetryBackoff.Duration
|
||||
for i := 1; i < attempt; i++ {
|
||||
d = time.Duration(float64(d) * defaultRetryBackoff.Factor)
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// downloadLayerToFile streams a single compressed layer into dst, retrying on
|
||||
// transient network errors (unexpected EOF, connection reset, ...). Large
|
||||
// backend images (e.g. vLLM) are several GiB and a single dropped connection
|
||||
// mid-stream previously failed the whole install with "unexpected EOF" and no
|
||||
// recovery. The registry transport already retries manifest fetches via
|
||||
// defaultRetryPredicate (see GetImage/GetImageDigest); this extends the same
|
||||
// behaviour to the layer data stream. See issue #10577.
|
||||
func downloadLayerToFile(ctx context.Context, layer v1.Layer, dst *os.File, progress *progressWriter) error {
|
||||
var lastErr error
|
||||
for attempt := 0; attempt <= layerDownloadRetries; attempt++ {
|
||||
if attempt > 0 {
|
||||
// Discard any partial data from the previous failed attempt.
|
||||
if _, err := dst.Seek(0, io.SeekStart); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := dst.Truncate(0); err != nil {
|
||||
return err
|
||||
}
|
||||
if progress != nil {
|
||||
progress.written = 0
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(layerRetryBackoff(attempt)):
|
||||
}
|
||||
}
|
||||
|
||||
var w io.Writer = dst
|
||||
if progress != nil {
|
||||
w = io.MultiWriter(dst, progress)
|
||||
}
|
||||
|
||||
var reader io.ReadCloser
|
||||
reader, lastErr = layer.Compressed()
|
||||
if lastErr == nil {
|
||||
_, lastErr = xio.Copy(ctx, w, reader)
|
||||
_ = reader.Close()
|
||||
}
|
||||
if lastErr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop early on context cancellation or non-retryable errors.
|
||||
if ctx.Err() != nil || !defaultRetryPredicate(lastErr) {
|
||||
return lastErr
|
||||
}
|
||||
logs.Warn.Printf("layer download failed (attempt %d/%d), retrying: %v", attempt+1, layerDownloadRetries+1, lastErr)
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
type progressWriter struct {
|
||||
written int64
|
||||
total int64
|
||||
@@ -304,23 +370,17 @@ func DownloadOCIImageTar(ctx context.Context, img v1.Image, imageRef string, tar
|
||||
}
|
||||
|
||||
// Create progress writer for this layer
|
||||
var writer io.Writer = file
|
||||
var progress *progressWriter
|
||||
if downloadStatus != nil {
|
||||
writer = io.MultiWriter(file, &progressWriter{
|
||||
progress = &progressWriter{
|
||||
total: totalCompressedSize,
|
||||
fileName: fmt.Sprintf("Downloading %d/%d %s", i+1, len(layers), imageName),
|
||||
downloadStatus: downloadStatus,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Download the compressed layer
|
||||
layerReader, err := layer.Compressed()
|
||||
if err != nil {
|
||||
file.Close()
|
||||
return fmt.Errorf("failed to get compressed layer: %v", err)
|
||||
}
|
||||
|
||||
_, err = xio.Copy(ctx, writer, layerReader)
|
||||
// Download the compressed layer, retrying on transient network errors.
|
||||
err = downloadLayerToFile(ctx, layer, file, progress)
|
||||
file.Close()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to download layer %d: %v", i, err)
|
||||
|
||||
123
pkg/oci/layer_internal_test.go
Normal file
123
pkg/oci/layer_internal_test.go
Normal file
@@ -0,0 +1,123 @@
|
||||
package oci
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
v1 "github.com/google/go-containerregistry/pkg/v1"
|
||||
"github.com/google/go-containerregistry/pkg/v1/types"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// failingReader imitates a layer download whose connection drops mid-stream:
// it hands out the bytes of prefix and, once they are used up, answers every
// further Read with err.
type failingReader struct {
	prefix []byte // bytes served before the failure
	off    int    // number of prefix bytes already handed out
	err    error  // error returned once prefix is exhausted
}

// Read copies the next unread part of prefix into p. After the whole prefix
// has been consumed it returns 0 together with the configured error.
func (r *failingReader) Read(p []byte) (int, error) {
	if r.off >= len(r.prefix) {
		return 0, r.err
	}
	copied := copy(p, r.prefix[r.off:])
	r.off += copied
	return copied, nil
}
|
||||
|
||||
// fakeLayer is a minimal v1.Layer whose Compressed() fails failUntil times with
|
||||
// err (after emitting a partial prefix) before finally returning data in full.
|
||||
type fakeLayer struct {
|
||||
data []byte
|
||||
failUntil int
|
||||
err error
|
||||
calls int
|
||||
}
|
||||
|
||||
func (f *fakeLayer) Digest() (v1.Hash, error) { return v1.Hash{}, nil }
|
||||
func (f *fakeLayer) DiffID() (v1.Hash, error) { return v1.Hash{}, nil }
|
||||
func (f *fakeLayer) Size() (int64, error) { return int64(len(f.data)), nil }
|
||||
func (f *fakeLayer) MediaType() (types.MediaType, error) { return types.DockerLayer, nil }
|
||||
func (f *fakeLayer) Uncompressed() (io.ReadCloser, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
}
|
||||
|
||||
func (f *fakeLayer) Compressed() (io.ReadCloser, error) {
|
||||
f.calls++
|
||||
if f.calls <= f.failUntil {
|
||||
return io.NopCloser(&failingReader{prefix: []byte("partial-garbage"), err: f.err}), nil
|
||||
}
|
||||
return io.NopCloser(bytes.NewReader(f.data)), nil
|
||||
}
|
||||
|
||||
var _ = Describe("downloadLayerToFile", func() {
|
||||
var (
|
||||
dst *os.File
|
||||
restoreWait func()
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
var err error
|
||||
dst, err = os.CreateTemp("", "layer-retry-*.tar.gz")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
// Eliminate the real backoff sleep so the test is fast.
|
||||
prev := layerRetryBackoff
|
||||
layerRetryBackoff = func(int) time.Duration { return 0 }
|
||||
restoreWait = func() { layerRetryBackoff = prev }
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
restoreWait()
|
||||
_ = dst.Close()
|
||||
_ = os.Remove(dst.Name())
|
||||
})
|
||||
|
||||
It("retries on unexpected EOF and writes the complete layer", func() {
|
||||
layer := &fakeLayer{
|
||||
data: []byte("the-real-layer-contents"),
|
||||
failUntil: 2,
|
||||
err: io.ErrUnexpectedEOF,
|
||||
}
|
||||
|
||||
err := downloadLayerToFile(context.Background(), layer, dst, nil)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(layer.calls).To(Equal(3))
|
||||
|
||||
got, err := os.ReadFile(dst.Name())
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
// The partial data from the two failed attempts must have been
|
||||
// discarded, leaving exactly the real contents.
|
||||
Expect(string(got)).To(Equal("the-real-layer-contents"))
|
||||
})
|
||||
|
||||
It("does not retry on a non-retryable error", func() {
|
||||
layer := &fakeLayer{
|
||||
data: []byte("never-reached"),
|
||||
failUntil: 1,
|
||||
err: errors.New("permission denied"),
|
||||
}
|
||||
|
||||
err := downloadLayerToFile(context.Background(), layer, dst, nil)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(layer.calls).To(Equal(1))
|
||||
})
|
||||
|
||||
It("gives up after exhausting retries on a persistent transient error", func() {
|
||||
layer := &fakeLayer{
|
||||
data: []byte("unreachable"),
|
||||
failUntil: 1000,
|
||||
err: io.ErrUnexpectedEOF,
|
||||
}
|
||||
|
||||
err := downloadLayerToFile(context.Background(), layer, dst, nil)
|
||||
Expect(err).To(MatchError(io.ErrUnexpectedEOF))
|
||||
Expect(layer.calls).To(Equal(layerDownloadRetries + 1))
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user