From 4f898820570ec3b2e05da4622a61bd13d2e81252 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 22 May 2026 20:08:45 +0000 Subject: [PATCH] feat(distributed): introduce galleryop.ErrWorkerStillInstalling sentinel When the NATS request-reply for backend.install (or .upgrade) times out, the worker is almost always still pulling the OCI image. Wrap the timeout in a typed sentinel so the manager above can distinguish "worker hung" from "worker still working" and leave the pending_backend_ops row in place for the reconciler to confirm via backend.list. Signed-off-by: Ettore Di Giacinto --- core/services/galleryop/errors.go | 13 ++++++++++ core/services/nodes/unloader.go | 37 +++++++++++++++++++++++++--- core/services/nodes/unloader_test.go | 27 ++++++++++++++++++++ 3 files changed, 74 insertions(+), 3 deletions(-) create mode 100644 core/services/galleryop/errors.go diff --git a/core/services/galleryop/errors.go b/core/services/galleryop/errors.go new file mode 100644 index 000000000..064d90236 --- /dev/null +++ b/core/services/galleryop/errors.go @@ -0,0 +1,13 @@ +package galleryop + +import "errors" + +// ErrWorkerStillInstalling indicates a distributed backend install +// timed out at the NATS round-trip layer but the worker is most likely +// still pulling the OCI image in the background. Producers +// (DistributedBackendManager) wrap this when the round-trip times out; +// consumers (backendHandler) use errors.Is(err, ErrWorkerStillInstalling) +// to surface a yellow "in progress" OpStatus instead of a red error, +// leaving the pending_backend_ops row in place for the reconciler to +// confirm via backend.list. 
+var ErrWorkerStillInstalling = errors.New("worker did not reply in time; install may still be running in the background") diff --git a/core/services/nodes/unloader.go b/core/services/nodes/unloader.go index 909cb4b2f..dee967a9c 100644 --- a/core/services/nodes/unloader.go +++ b/core/services/nodes/unloader.go @@ -2,9 +2,14 @@ package nodes import ( "context" + "errors" "fmt" + "strings" "time" + "github.com/nats-io/nats.go" + + "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/xlog" ) @@ -108,7 +113,7 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal subject := messaging.SubjectNodeBackendInstall(nodeID) xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex) - return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{ + reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{ Backend: backendType, ModelID: modelID, BackendGalleries: galleriesJSON, @@ -117,6 +122,11 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal Alias: alias, ReplicaIndex: int32(replicaIndex), }, a.installTimeout) + if err != nil && isNATSTimeout(err) { + return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v", + galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err) + } + return reply, err } // UpgradeBackend sends a backend.upgrade request-reply to a worker node. 
@@ -132,7 +142,7 @@ func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSO subject := messaging.SubjectNodeBackendUpgrade(nodeID) xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex) - return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{ + reply, err := messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{ Backend: backendType, BackendGalleries: galleriesJSON, URI: uri, @@ -140,6 +150,11 @@ func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSO Alias: alias, ReplicaIndex: int32(replicaIndex), }, a.upgradeTimeout) + if err != nil && isNATSTimeout(err) { + return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v", + galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err) + } + return reply, err } // installWithForceFallback is the rolling-update fallback used by @@ -152,7 +167,7 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga subject := messaging.SubjectNodeBackendInstall(nodeID) xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType) - return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{ + reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{ Backend: backendType, BackendGalleries: galleriesJSON, URI: uri, @@ -161,6 +176,11 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga ReplicaIndex: int32(replicaIndex), Force: true, }, a.upgradeTimeout) + if err != nil && isNATSTimeout(err) { + return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v", + 
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err) + } + return reply, err } // ListBackends queries a worker node for its installed backends via NATS request-reply. @@ -239,3 +259,14 @@ func (a *RemoteUnloaderAdapter) StopNode(nodeID string) error { subject := messaging.SubjectNodeStop(nodeID) return a.nats.Publish(subject, nil) } + +// isNATSTimeout returns true if err looks like a NATS request-reply timeout. +// nats.ErrTimeout is the canonical sentinel; context.DeadlineExceeded can +// also surface depending on the client's path; we accept both, plus a +// string-match fallback for clients that return a bare error. +func isNATSTimeout(err error) bool { + if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) { + return true + } + return err != nil && strings.Contains(err.Error(), "nats: timeout") +} diff --git a/core/services/nodes/unloader_test.go b/core/services/nodes/unloader_test.go index dd4dc5ea9..e894b1acd 100644 --- a/core/services/nodes/unloader_test.go +++ b/core/services/nodes/unloader_test.go @@ -3,13 +3,16 @@ package nodes import ( "context" "encoding/json" + "errors" "fmt" "sync" "time" + "github.com/nats-io/nats.go" . "github.com/onsi/ginkgo/v2" . 
"github.com/onsi/gomega" + "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/core/services/messaging" ) @@ -286,3 +289,27 @@ var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() { Expect(mc.calls[0].Timeout).To(Equal(11 * time.Minute)) }) }) + +var _ = Describe("RemoteUnloaderAdapter NATS timeout handling", func() { + It("wraps nats.ErrTimeout from InstallBackend in galleryop.ErrWorkerStillInstalling", func() { + mc := newScriptedMessagingClient() + mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrTimeout) + adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second) + + _, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0) + Expect(err).To(HaveOccurred()) + Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(), + "expected wrapped ErrWorkerStillInstalling, got %v", err) + }) + + It("does NOT wrap non-timeout errors", func() { + mc := newScriptedMessagingClient() + mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrNoResponders) + adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second) + + _, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0) + Expect(err).To(HaveOccurred()) + Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeFalse()) + Expect(errors.Is(err, nats.ErrNoResponders)).To(BeTrue()) + }) +})