diff --git a/core/services/nodes/router.go b/core/services/nodes/router.go index ccbf48f43..f26fea2b9 100644 --- a/core/services/nodes/router.go +++ b/core/services/nodes/router.go @@ -359,8 +359,21 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType } } - // Step 2: Model not loaded — schedule loading with distributed lock to prevent duplicates - loadModel := func() (*RouteResult, error) { + // Step 2: Model not loaded — schedule loading with distributed lock to prevent duplicates. + // + // Detach the cold-load from the caller's context. Staging a model can + // transfer multiple GB to a worker, which takes far longer than any client + // keeps its HTTP request open — a browser refresh, an ingress/LB idle + // timeout, or a round-robined retry landing on another replica all cancel + // the request context. If staging were bound to it, the multi-GB upload + // aborts with "context canceled" mid-transfer and large models can never + // finish staging (the model-load outage). WithoutCancel keeps the request's + // values (prefix chain, etc.) but drops its cancellation/deadline. Each + // long step still has its own bound (the file stager's resume budget, + // LoadModel's 5m timeout), and the per-model advisory lock below de-dupes + // concurrent loaders across replicas. + loadCtx := context.WithoutCancel(ctx) + loadModel := func(ctx context.Context) (*RouteResult, error) { // Re-check after acquiring lock — another request may have loaded it node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs, pref) if err == nil && node != nil { @@ -433,9 +446,9 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType if r.db != nil { lockKey := advisorylock.KeyFromString("model-load:" + trackingKey) var result *RouteResult - lockErr := advisorylock.WithLockCtx(ctx, r.db, lockKey, func() error { + lockErr := advisorylock.WithLockCtx(loadCtx, r.db, lockKey, func() error { var err error - result, err = loadModel() + result, err = loadModel(loadCtx) return err }) if lockErr != nil { @@ -444,7 +457,7 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType return result, nil } // No DB (non-distributed) — proceed without lock - return loadModel() + return loadModel(loadCtx) } // parseSelectorJSON decodes a JSON node selector string into a map. diff --git a/core/services/nodes/router_staging_context_test.go b/core/services/nodes/router_staging_context_test.go new file mode 100644 index 000000000..6fa892689 --- /dev/null +++ b/core/services/nodes/router_staging_context_test.go @@ -0,0 +1,80 @@ +package nodes + +import ( + "context" + "errors" + "os" + "path/filepath" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/messaging" + pb "github.com/mudler/LocalAI/pkg/grpc/proto" +) + +// cancelOnStageStager simulates the triggering HTTP request being abandoned +// (client disconnect, ingress idle-timeout) the moment a multi-GB file starts +// staging. It cancels the request context and records whether the context the +// stager itself received was cancelled as a result. +type cancelOnStageStager struct { + fakeFileStager + cancelRequest context.CancelFunc + staged bool + ctxErrOnStage error +} + +func (s *cancelOnStageStager) EnsureRemote(ctx context.Context, _, _, key string) (string, error) { + s.staged = true + // Mid-transfer: the client gives up on the (minutes-long) request. + if s.cancelRequest != nil { + s.cancelRequest() + } + // A multi-GB upload must survive this. If staging were bound to the + // request context, ctx is now cancelled and the real HTTP stager would + // abort with "context canceled" — exactly the production outage. + s.ctxErrOnStage = ctx.Err() + return "/remote/" + key, nil +} + +var _ = Describe("Route cold-load staging context", func() { + It("detaches staging from the request context so a client disconnect cannot abort a multi-GB transfer", func() { + // A real model file so stageModelFiles actually calls the stager + // (non-existent paths are skipped). + tmp := GinkgoT().TempDir() + modelFile := filepath.Join(tmp, "big.gguf") + Expect(os.WriteFile(modelFile, []byte("weights"), 0o644)).To(Succeed()) + + reg := &fakeModelRouter{ + findAndLockErr: errors.New("not loaded"), + findIdleNode: &BackendNode{ID: "n1", Name: "worker-1", Address: "10.0.0.1:50051"}, + } + backend := &stubBackend{loadResult: &pb.Result{Success: true}} + factory := &stubClientFactory{client: backend} + unloader := &fakeUnloader{installReply: &messaging.BackendInstallReply{ + Success: true, + Address: "10.0.0.1:9001", + }} + stager := &cancelOnStageStager{} + + router := NewSmartRouter(reg, SmartRouterOptions{ + Unloader: unloader, + ClientFactory: factory, + FileStager: stager, + // DB nil: no advisory lock, exercises the same detached load ctx. + }) + + ctx, cancel := context.WithCancel(context.Background()) + stager.cancelRequest = cancel + defer cancel() + + result, err := router.Route(ctx, "big-model", filepath.Join("models", "big.gguf"), "llama-cpp", + &pb.ModelOptions{Model: "big.gguf", ModelFile: modelFile}, false) + + Expect(err).ToNot(HaveOccurred()) + Expect(result).ToNot(BeNil()) + Expect(stager.staged).To(BeTrue(), "staging must have been attempted") + Expect(stager.ctxErrOnStage).ToNot(HaveOccurred(), + "staging context must survive cancellation of the triggering request") + }) +})