feat(distributed): inject NATS install/upgrade timeouts into RemoteUnloaderAdapter

Removes the hardcoded 3m / 15m literals from RemoteUnloaderAdapter and
threads in DistributedConfig.BackendInstallTimeoutOrDefault() and
BackendUpgradeTimeoutOrDefault() at construction. Install now defaults
to 15m (was 3m); cold OCI image pulls on Jetson Wi-Fi routinely blew
past the old ceiling. Scripted messaging client captures the timeout
so tests can assert the configured value actually reaches the NATS
request.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-05-22 19:56:43 +00:00
parent 4306b730ed
commit f9b47c6eab
9 changed files with 86 additions and 38 deletions

View File

@@ -233,7 +233,12 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
xlog.Info("File stager initialized (HTTP direct transfer)")
}
// Create RemoteUnloaderAdapter — needed by SmartRouter and startup.go
remoteUnloader := nodes.NewRemoteUnloaderAdapter(registry, natsClient)
remoteUnloader := nodes.NewRemoteUnloaderAdapter(
registry,
natsClient,
cfg.Distributed.BackendInstallTimeoutOrDefault(),
cfg.Distributed.BackendUpgradeTimeoutOrDefault(),
)
// All dependencies ready — build SmartRouter with all options at once
var conflictResolver nodes.ConcurrencyConflictResolver

View File

@@ -98,10 +98,10 @@ func (s *scriptedMessagingClient) scriptReplyMatching(subject string, pred func(
})
}
func (s *scriptedMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) {
func (s *scriptedMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.calls = append(s.calls, requestCall{Subject: subject, Data: data})
s.calls = append(s.calls, requestCall{Subject: subject, Data: data, Timeout: timeout})
// Predicate-matched replies take precedence over flat scriptReply.
if matchers, ok := s.matchedReplies[subject]; ok {
@@ -204,7 +204,7 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(err).ToNot(HaveOccurred())
mc = newScriptedMessagingClient()
adapter = NewRemoteUnloaderAdapter(nil, mc)
adapter = NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
mgr = &DistributedBackendManager{
local: stubLocalBackendManager{},
adapter: adapter,

View File

@@ -43,15 +43,23 @@ type NodeCommandSender interface {
// This mirrors the local ModelLoader's startProcess()/deleteProcess() but
// over NATS for remote nodes.
type RemoteUnloaderAdapter struct {
registry ModelLocator
nats messaging.MessagingClient
registry ModelLocator
nats messaging.MessagingClient
installTimeout time.Duration
upgradeTimeout time.Duration
}
// NewRemoteUnloaderAdapter creates a new adapter.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient) *RemoteUnloaderAdapter {
// NewRemoteUnloaderAdapter creates a new adapter. installTimeout and
// upgradeTimeout govern the NATS request-reply deadlines for backend.install
// and backend.upgrade respectively. Use
// DistributedConfig.BackendInstallTimeoutOrDefault() /
// BackendUpgradeTimeoutOrDefault() at construction.
func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient, installTimeout, upgradeTimeout time.Duration) *RemoteUnloaderAdapter {
return &RemoteUnloaderAdapter{
registry: registry,
nats: nats,
registry: registry,
nats: nats,
installTimeout: installTimeout,
upgradeTimeout: upgradeTimeout,
}
}
@@ -87,11 +95,13 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
// is on disk, the worker just spawns a process; only a missing binary
// triggers a full gallery pull.
//
// Timeout: 3 minutes. Most calls return in under 2 seconds (process already
// running). The 3-minute ceiling covers the cold-binary spawn-after-download
// case while still failing fast enough to surface real worker hangs.
// Timeout: configured via DistributedConfig.BackendInstallTimeoutOrDefault
// (default 15m). Most calls return in under 2 seconds (process already
// running). The 15-minute ceiling covers the cold-binary spawn-after-download
// case on slow links (Jetson Wi-Fi, multi-GB CUDA images) while still
// failing fast enough to surface real worker hangs.
//
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead -
// it lives on a different NATS subject so it cannot head-of-line-block
// routine load traffic on the same worker.
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
@@ -106,17 +116,18 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
}, 3*time.Minute)
}, a.installTimeout)
}
// UpgradeBackend sends a backend.upgrade request-reply to a worker node.
// The worker stops every live process for this backend, force-reinstalls
// from the gallery (overwriting the on-disk artifact), and replies. The
// next routine InstallBackend call spawns a fresh process with the new
// binary upgrade itself does not start a process.
// binary - upgrade itself does not start a process.
//
// Timeout: 15 minutes. Real-world worst case observed: 810 minutes for
// large CUDA-l4t backend images on Jetson over WiFi.
// Timeout: configured via DistributedConfig.BackendUpgradeTimeoutOrDefault
// (default 15m). Real-world worst case observed: 8-10 minutes for large
// CUDA-l4t backend images on Jetson over WiFi.
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) {
subject := messaging.SubjectNodeBackendUpgrade(nodeID)
xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex)
@@ -128,7 +139,7 @@ func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSO
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
}, 15*time.Minute)
}, a.upgradeTimeout)
}
// installWithForceFallback is the rolling-update fallback used by
@@ -149,7 +160,7 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga
Alias: alias,
ReplicaIndex: int32(replicaIndex),
Force: true,
}, 15*time.Minute)
}, a.upgradeTimeout)
}
// ListBackends queries a worker node for its installed backends via NATS request-reply.

View File

@@ -60,6 +60,7 @@ type publishCall struct {
type requestCall struct {
Subject string
Data []byte
Timeout time.Duration
}
func (f *fakeMessagingClient) Publish(subject string, data any) error {
@@ -93,10 +94,10 @@ func (f *fakeMessagingClient) SubscribeReply(_ string, _ func(data []byte, reply
return &fakeSubscription{}, nil
}
func (f *fakeMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) {
func (f *fakeMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
f.mu.Lock()
defer f.mu.Unlock()
f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data})
f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data, Timeout: timeout})
return f.requestReply, f.requestErr
}
@@ -119,7 +120,7 @@ var _ = Describe("RemoteUnloaderAdapter", func() {
BeforeEach(func() {
locator = &fakeModelLocator{}
mc = &fakeMessagingClient{}
adapter = NewRemoteUnloaderAdapter(locator, mc)
adapter = NewRemoteUnloaderAdapter(locator, mc, 3*time.Minute, 15*time.Minute)
})
Describe("UnloadRemoteModel", func() {
@@ -154,7 +155,7 @@ var _ = Describe("RemoteUnloaderAdapter", func() {
}
// Use a messaging client that fails the first Publish call only.
failOnce := &failOnceMessagingClient{inner: mc, failOn: 0}
adapter = NewRemoteUnloaderAdapter(locator, failOnce)
adapter = NewRemoteUnloaderAdapter(locator, failOnce, 3*time.Minute, 15*time.Minute)
Expect(adapter.UnloadRemoteModel("llama")).To(Succeed())
@@ -259,3 +260,29 @@ func (f *failOnceMessagingClient) Request(subject string, data []byte, timeout t
func (f *failOnceMessagingClient) IsConnected() bool { return true }
func (f *failOnceMessagingClient) Close() {}
var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() {
It("passes the configured install timeout to the messaging client", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)
_, err := adapter.InstallBackend("n1", "llama-cpp", "", "[]", "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(mc.calls).To(HaveLen(1))
Expect(mc.calls[0].Timeout).To(Equal(7 * time.Minute))
})
It("passes the configured upgrade timeout to the messaging client", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendUpgrade("n1"), messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)
_, err := adapter.UpgradeBackend("n1", "llama-cpp", "[]", "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(mc.calls).To(HaveLen(1))
Expect(mc.calls[0].Timeout).To(Equal(11 * time.Minute))
})
})

View File

@@ -1,6 +1,8 @@
package nodes
import (
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
@@ -15,7 +17,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
mc.scriptReply(messaging.SubjectNodeBackendUpgrade(nodeID),
messaging.BackendUpgradeReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc)
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(reply.Success).To(BeTrue())
@@ -24,7 +26,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() {
It("returns the underlying error when the subject has no responders", func() {
mc := newScriptedMessagingClient() // unscripted subject => fakeNoRespondersErr by harness convention
adapter := NewRemoteUnloaderAdapter(nil, mc)
adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute)
_, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0)
Expect(err).To(HaveOccurred())
})

View File

@@ -225,7 +225,7 @@ var _ = Describe("Full Distributed Inference Flow", Label("Distributed"), func()
// newTestSmartRouter creates a SmartRouter with NATS wired up and a mock
// backend.install handler that always replies success for all registered nodes.
newTestSmartRouter := func(reg *nodes.NodeRegistry, extraOpts ...nodes.SmartRouterOptions) *nodes.SmartRouter {
unloader := nodes.NewRemoteUnloaderAdapter(reg, infra.NC)
unloader := nodes.NewRemoteUnloaderAdapter(reg, infra.NC, 3*time.Minute, 15*time.Minute)
opts := nodes.SmartRouterOptions{
Unloader: unloader,
@@ -395,7 +395,7 @@ var _ = Describe("Full Distributed Inference Flow", Label("Distributed"), func()
Expect(err).ToNot(HaveOccurred())
// Create RemoteUnloaderAdapter and unload model
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
err = unloader.UnloadRemoteModel("old-model")
Expect(err).ToNot(HaveOccurred())

View File

@@ -6,6 +6,7 @@ import (
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
@@ -175,7 +176,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedModelManager(appCfg, ml, adapter)
err = distMgr.DeleteModel("big-model")
@@ -251,7 +252,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry)
err = distMgr.DeleteBackend("my-backend")
@@ -298,7 +299,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg := config.NewApplicationConfig()
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry)
// Should NOT return an error even though the backend doesn't exist locally

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"sync/atomic"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
@@ -56,7 +57,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeTrue())
@@ -77,7 +78,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeFalse())
@@ -103,7 +104,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
// Frontend calls UnloadRemoteModel (triggered by UI "Stop" or WatchDog)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
Expect(adapter.UnloadRemoteModel("whisper-large")).To(Succeed())
Eventually(func() int32 { return stopReceived.Load() }, "5s").Should(Equal(int32(1)))
@@ -133,14 +134,14 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
adapter.UnloadRemoteModel("shared-model")
Eventually(func() int32 { return count.Load() }, "5s").Should(Equal(int32(2)))
})
It("should be no-op for models not on any node", func() {
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
Expect(adapter.UnloadRemoteModel("nonexistent-model")).To(Succeed())
})
})
@@ -161,7 +162,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
Expect(adapter.StopNode(node.ID)).To(Succeed())
Eventually(func() int32 { return stopped.Load() }, "5s").Should(Equal(int32(1)))

View File

@@ -3,6 +3,7 @@ package distributed_test
import (
"context"
"encoding/json"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/nodes"
@@ -78,7 +79,7 @@ var _ = Describe("SmartRouter trackingKey", Label("Distributed"), func() {
Expect(registry.Register(context.Background(), node, true)).To(Succeed())
nodeID = node.ID
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC)
unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
router = nodes.NewSmartRouter(registry, nodes.SmartRouterOptions{
Unloader: unloader,
})