mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-23 16:20:01 -04:00
feat(distributed): introduce galleryop.ErrWorkerStillInstalling sentinel
When the NATS request-reply for backend.install (or .upgrade) times out the worker is almost always still pulling the OCI image. Wrap the timeout in a typed sentinel so the manager above can distinguish "worker hung" from "worker still working" and leave the pending_backend_ops row in place for the reconciler to confirm via backend.list. Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
13
core/services/galleryop/errors.go
Normal file
13
core/services/galleryop/errors.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package galleryop
|
||||
|
||||
import "errors"
|
||||
|
||||
// ErrWorkerStillInstalling indicates a distributed backend install
|
||||
// timed out at the NATS round-trip layer but the worker is most likely
|
||||
// still pulling the OCI image in the background. Producers
|
||||
// (DistributedBackendManager) wrap this when the round-trip times out;
|
||||
// consumers (backendHandler) use errors.Is(err, ErrWorkerStillInstalling)
|
||||
// to surface a yellow "in progress" OpStatus instead of a red error,
|
||||
// leaving the pending_backend_ops row in place for the reconciler to
|
||||
// confirm via backend.list.
|
||||
var ErrWorkerStillInstalling = errors.New("worker did not reply in time; install may still be running in the background")
|
||||
@@ -2,9 +2,14 @@ package nodes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
|
||||
"github.com/mudler/LocalAI/core/services/galleryop"
|
||||
"github.com/mudler/LocalAI/core/services/messaging"
|
||||
"github.com/mudler/xlog"
|
||||
)
|
||||
@@ -108,7 +113,7 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
|
||||
subject := messaging.SubjectNodeBackendInstall(nodeID)
|
||||
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
|
||||
|
||||
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
||||
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
||||
Backend: backendType,
|
||||
ModelID: modelID,
|
||||
BackendGalleries: galleriesJSON,
|
||||
@@ -117,6 +122,11 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
|
||||
Alias: alias,
|
||||
ReplicaIndex: int32(replicaIndex),
|
||||
}, a.installTimeout)
|
||||
if err != nil && isNATSTimeout(err) {
|
||||
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
|
||||
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
|
||||
}
|
||||
return reply, err
|
||||
}
|
||||
|
||||
// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
|
||||
@@ -132,7 +142,7 @@ func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSO
|
||||
subject := messaging.SubjectNodeBackendUpgrade(nodeID)
|
||||
xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)
|
||||
|
||||
return messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
|
||||
reply, err := messaging.RequestJSON[messaging.BackendUpgradeRequest, messaging.BackendUpgradeReply](a.nats, subject, messaging.BackendUpgradeRequest{
|
||||
Backend: backendType,
|
||||
BackendGalleries: galleriesJSON,
|
||||
URI: uri,
|
||||
@@ -140,6 +150,11 @@ func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSO
|
||||
Alias: alias,
|
||||
ReplicaIndex: int32(replicaIndex),
|
||||
}, a.upgradeTimeout)
|
||||
if err != nil && isNATSTimeout(err) {
|
||||
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
|
||||
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
|
||||
}
|
||||
return reply, err
|
||||
}
|
||||
|
||||
// installWithForceFallback is the rolling-update fallback used by
|
||||
@@ -152,7 +167,7 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga
|
||||
subject := messaging.SubjectNodeBackendInstall(nodeID)
|
||||
xlog.Warn("Falling back to legacy backend.install Force=true (old worker)", "nodeID", nodeID, "backend", backendType)
|
||||
|
||||
return messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
||||
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
|
||||
Backend: backendType,
|
||||
BackendGalleries: galleriesJSON,
|
||||
URI: uri,
|
||||
@@ -161,6 +176,11 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga
|
||||
ReplicaIndex: int32(replicaIndex),
|
||||
Force: true,
|
||||
}, a.upgradeTimeout)
|
||||
if err != nil && isNATSTimeout(err) {
|
||||
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
|
||||
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)
|
||||
}
|
||||
return reply, err
|
||||
}
|
||||
|
||||
// ListBackends queries a worker node for its installed backends via NATS request-reply.
|
||||
@@ -239,3 +259,14 @@ func (a *RemoteUnloaderAdapter) StopNode(nodeID string) error {
|
||||
subject := messaging.SubjectNodeStop(nodeID)
|
||||
return a.nats.Publish(subject, nil)
|
||||
}
|
||||
|
||||
// isNATSTimeout returns true if err looks like a NATS request-reply timeout.
|
||||
// nats.ErrTimeout is the canonical sentinel; context.DeadlineExceeded can
|
||||
// also surface depending on the client's path; we accept both, plus a
|
||||
// string-match fallback for clients that return a bare error.
|
||||
func isNATSTimeout(err error) bool {
|
||||
if errors.Is(err, nats.ErrTimeout) || errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
return err != nil && strings.Contains(err.Error(), "nats: timeout")
|
||||
}
|
||||
|
||||
@@ -3,13 +3,16 @@ package nodes
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/services/galleryop"
|
||||
"github.com/mudler/LocalAI/core/services/messaging"
|
||||
)
|
||||
|
||||
@@ -286,3 +289,27 @@ var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() {
|
||||
Expect(mc.calls[0].Timeout).To(Equal(11 * time.Minute))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("RemoteUnloaderAdapter NATS timeout handling", func() {
|
||||
It("wraps nats.ErrTimeout from InstallBackend in galleryop.ErrWorkerStillInstalling", func() {
|
||||
mc := newScriptedMessagingClient()
|
||||
mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrTimeout)
|
||||
adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)
|
||||
|
||||
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
|
||||
"expected wrapped ErrWorkerStillInstalling, got %v", err)
|
||||
})
|
||||
|
||||
It("does NOT wrap non-timeout errors", func() {
|
||||
mc := newScriptedMessagingClient()
|
||||
mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrNoResponders)
|
||||
adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)
|
||||
|
||||
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0)
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeFalse())
|
||||
Expect(errors.Is(err, nats.ErrNoResponders)).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user