mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-22 15:49:12 -04:00
fix(distributed): detach cold-load staging from the request context (#10438)
A model not yet loaded on a worker is staged lazily on the inference request path. Staging a multi-GB model takes minutes - far longer than any client keeps its HTTP request open - so a browser refresh, an ingress/LB idle-timeout, or a round-robined retry landing on another frontend replica cancels the request context and aborts the upload with "context canceled" mid-transfer. Large models then never finish staging, so they never load (observed in a 2-replica deployment: both frontends repeatedly failed to stage a 15.7 GB GGUF, each attempt dying at a different offset). Bind the cold load (staging + LoadModel + the per-model advisory lock) to context.WithoutCancel(ctx): it keeps the request's values (prefix chain) but drops cancellation/deadline. Each long step keeps its own bound (the file stager's resume budget, LoadModel's 5m timeout), and the advisory lock still de-dupes concurrent loaders across replicas. Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
@@ -359,8 +359,21 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
|
||||
}
|
||||
}
|
||||
|
||||
// Step 2: Model not loaded — schedule loading with distributed lock to prevent duplicates
|
||||
loadModel := func() (*RouteResult, error) {
|
||||
// Step 2: Model not loaded — schedule loading with distributed lock to prevent duplicates.
|
||||
//
|
||||
// Detach the cold-load from the caller's context. Staging a model can
|
||||
// transfer multiple GB to a worker, which takes far longer than any client
|
||||
// keeps its HTTP request open — a browser refresh, an ingress/LB idle
|
||||
// timeout, or a round-robined retry landing on another replica all cancel
|
||||
// the request context. If staging were bound to it, the multi-GB upload
|
||||
// aborts with "context canceled" mid-transfer and large models can never
|
||||
// finish staging (the model-load outage). WithoutCancel keeps the request's
|
||||
// values (prefix chain, etc.) but drops its cancellation/deadline. Each
|
||||
// long step still has its own bound (the file stager's resume budget,
|
||||
// LoadModel's 5m timeout), and the per-model advisory lock below de-dupes
|
||||
// concurrent loaders across replicas.
|
||||
loadCtx := context.WithoutCancel(ctx)
|
||||
loadModel := func(ctx context.Context) (*RouteResult, error) {
|
||||
// Re-check after acquiring lock — another request may have loaded it
|
||||
node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs, pref)
|
||||
if err == nil && node != nil {
|
||||
@@ -433,9 +446,9 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
|
||||
if r.db != nil {
|
||||
lockKey := advisorylock.KeyFromString("model-load:" + trackingKey)
|
||||
var result *RouteResult
|
||||
lockErr := advisorylock.WithLockCtx(ctx, r.db, lockKey, func() error {
|
||||
lockErr := advisorylock.WithLockCtx(loadCtx, r.db, lockKey, func() error {
|
||||
var err error
|
||||
result, err = loadModel()
|
||||
result, err = loadModel(loadCtx)
|
||||
return err
|
||||
})
|
||||
if lockErr != nil {
|
||||
@@ -444,7 +457,7 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
|
||||
return result, nil
|
||||
}
|
||||
// No DB (non-distributed) — proceed without lock
|
||||
return loadModel()
|
||||
return loadModel(loadCtx)
|
||||
}
|
||||
|
||||
// parseSelectorJSON decodes a JSON node selector string into a map.
|
||||
|
||||
80
core/services/nodes/router_staging_context_test.go
Normal file
80
core/services/nodes/router_staging_context_test.go
Normal file
@@ -0,0 +1,80 @@
|
||||
package nodes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/services/messaging"
|
||||
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
|
||||
)
|
||||
|
||||
// cancelOnStageStager simulates the triggering HTTP request being abandoned
|
||||
// (client disconnect, ingress idle-timeout) the moment a multi-GB file starts
|
||||
// staging. It cancels the request context and records whether the context the
|
||||
// stager itself received was cancelled as a result.
|
||||
type cancelOnStageStager struct {
|
||||
fakeFileStager
|
||||
cancelRequest context.CancelFunc
|
||||
staged bool
|
||||
ctxErrOnStage error
|
||||
}
|
||||
|
||||
func (s *cancelOnStageStager) EnsureRemote(ctx context.Context, _, _, key string) (string, error) {
|
||||
s.staged = true
|
||||
// Mid-transfer: the client gives up on the (minutes-long) request.
|
||||
if s.cancelRequest != nil {
|
||||
s.cancelRequest()
|
||||
}
|
||||
// A multi-GB upload must survive this. If staging were bound to the
|
||||
// request context, ctx is now cancelled and the real HTTP stager would
|
||||
// abort with "context canceled" — exactly the production outage.
|
||||
s.ctxErrOnStage = ctx.Err()
|
||||
return "/remote/" + key, nil
|
||||
}
|
||||
|
||||
var _ = Describe("Route cold-load staging context", func() {
|
||||
It("detaches staging from the request context so a client disconnect cannot abort a multi-GB transfer", func() {
|
||||
// A real model file so stageModelFiles actually calls the stager
|
||||
// (non-existent paths are skipped).
|
||||
tmp := GinkgoT().TempDir()
|
||||
modelFile := filepath.Join(tmp, "big.gguf")
|
||||
Expect(os.WriteFile(modelFile, []byte("weights"), 0o644)).To(Succeed())
|
||||
|
||||
reg := &fakeModelRouter{
|
||||
findAndLockErr: errors.New("not loaded"),
|
||||
findIdleNode: &BackendNode{ID: "n1", Name: "worker-1", Address: "10.0.0.1:50051"},
|
||||
}
|
||||
backend := &stubBackend{loadResult: &pb.Result{Success: true}}
|
||||
factory := &stubClientFactory{client: backend}
|
||||
unloader := &fakeUnloader{installReply: &messaging.BackendInstallReply{
|
||||
Success: true,
|
||||
Address: "10.0.0.1:9001",
|
||||
}}
|
||||
stager := &cancelOnStageStager{}
|
||||
|
||||
router := NewSmartRouter(reg, SmartRouterOptions{
|
||||
Unloader: unloader,
|
||||
ClientFactory: factory,
|
||||
FileStager: stager,
|
||||
// DB nil: no advisory lock, exercises the same detached load ctx.
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
stager.cancelRequest = cancel
|
||||
defer cancel()
|
||||
|
||||
result, err := router.Route(ctx, "big-model", filepath.Join("models", "big.gguf"), "llama-cpp",
|
||||
&pb.ModelOptions{Model: "big.gguf", ModelFile: modelFile}, false)
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(result).ToNot(BeNil())
|
||||
Expect(stager.staged).To(BeTrue(), "staging must have been attempted")
|
||||
Expect(stager.ctxErrOnStage).ToNot(HaveOccurred(),
|
||||
"staging context must survive cancellation of the triggering request")
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user