From f9b47c6eabdfab6653988fb32da6b6b71e991dea Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 22 May 2026 19:56:43 +0000 Subject: [PATCH] feat(distributed): inject NATS install/upgrade timeouts into RemoteUnloaderAdapter Removes the hardcoded 3m / 15m literals from RemoteUnloaderAdapter and threads in DistributedConfig.BackendInstallTimeoutOrDefault() and BackendUpgradeTimeoutOrDefault() at construction. Install now defaults to 15m (was 3m); cold OCI image pulls on Jetson Wi-Fi routinely blew past the old ceiling. Scripted messaging client captures the timeout so tests can assert the configured value actually reaches the NATS request. Signed-off-by: Ettore Di Giacinto --- core/application/distributed.go | 7 ++- .../nodes/managers_distributed_test.go | 6 +-- core/services/nodes/unloader.go | 43 ++++++++++++------- core/services/nodes/unloader_test.go | 35 +++++++++++++-- core/services/nodes/unloader_upgrade_test.go | 6 ++- .../distributed/distributed_full_flow_test.go | 4 +- tests/e2e/distributed/managers_test.go | 7 +-- tests/e2e/distributed/node_lifecycle_test.go | 13 +++--- tests/e2e/distributed/router_tracking_test.go | 3 +- 9 files changed, 86 insertions(+), 38 deletions(-) diff --git a/core/application/distributed.go b/core/application/distributed.go index 7662992df..a82ca1931 100644 --- a/core/application/distributed.go +++ b/core/application/distributed.go @@ -233,7 +233,12 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade xlog.Info("File stager initialized (HTTP direct transfer)") } // Create RemoteUnloaderAdapter — needed by SmartRouter and startup.go - remoteUnloader := nodes.NewRemoteUnloaderAdapter(registry, natsClient) + remoteUnloader := nodes.NewRemoteUnloaderAdapter( + registry, + natsClient, + cfg.Distributed.BackendInstallTimeoutOrDefault(), + cfg.Distributed.BackendUpgradeTimeoutOrDefault(), + ) // All dependencies ready — build SmartRouter with all options at once var conflictResolver 
nodes.ConcurrencyConflictResolver diff --git a/core/services/nodes/managers_distributed_test.go b/core/services/nodes/managers_distributed_test.go index 793ed5659..10df15214 100644 --- a/core/services/nodes/managers_distributed_test.go +++ b/core/services/nodes/managers_distributed_test.go @@ -98,10 +98,10 @@ func (s *scriptedMessagingClient) scriptReplyMatching(subject string, pred func( }) } -func (s *scriptedMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) { +func (s *scriptedMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) { s.mu.Lock() defer s.mu.Unlock() - s.calls = append(s.calls, requestCall{Subject: subject, Data: data}) + s.calls = append(s.calls, requestCall{Subject: subject, Data: data, Timeout: timeout}) // Predicate-matched replies take precedence over flat scriptReply. if matchers, ok := s.matchedReplies[subject]; ok { @@ -204,7 +204,7 @@ var _ = Describe("DistributedBackendManager", func() { Expect(err).ToNot(HaveOccurred()) mc = newScriptedMessagingClient() - adapter = NewRemoteUnloaderAdapter(nil, mc) + adapter = NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute) mgr = &DistributedBackendManager{ local: stubLocalBackendManager{}, adapter: adapter, diff --git a/core/services/nodes/unloader.go b/core/services/nodes/unloader.go index 611a34dee..909cb4b2f 100644 --- a/core/services/nodes/unloader.go +++ b/core/services/nodes/unloader.go @@ -43,15 +43,23 @@ type NodeCommandSender interface { // This mirrors the local ModelLoader's startProcess()/deleteProcess() but // over NATS for remote nodes. type RemoteUnloaderAdapter struct { - registry ModelLocator - nats messaging.MessagingClient + registry ModelLocator + nats messaging.MessagingClient + installTimeout time.Duration + upgradeTimeout time.Duration } -// NewRemoteUnloaderAdapter creates a new adapter. 
-func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient) *RemoteUnloaderAdapter { +// NewRemoteUnloaderAdapter creates a new adapter. installTimeout and +// upgradeTimeout govern the NATS request-reply deadlines for backend.install +// and backend.upgrade respectively. Use +// DistributedConfig.BackendInstallTimeoutOrDefault() / +// BackendUpgradeTimeoutOrDefault() at construction. +func NewRemoteUnloaderAdapter(registry ModelLocator, nats messaging.MessagingClient, installTimeout, upgradeTimeout time.Duration) *RemoteUnloaderAdapter { return &RemoteUnloaderAdapter{ - registry: registry, - nats: nats, + registry: registry, + nats: nats, + installTimeout: installTimeout, + upgradeTimeout: upgradeTimeout, } } @@ -87,11 +95,13 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error { // is on disk, the worker just spawns a process; only a missing binary // triggers a full gallery pull. // -// Timeout: 3 minutes. Most calls return in under 2 seconds (process already -// running). The 3-minute ceiling covers the cold-binary spawn-after-download -// case while still failing fast enough to surface real worker hangs. +// Timeout: configured via DistributedConfig.BackendInstallTimeoutOrDefault +// (default 15m). Most calls return in under 2 seconds (process already +// running). The default 15-minute ceiling covers the cold-binary spawn-after-download +// case on slow links (Jetson Wi-Fi, multi-GB CUDA images) while still +// failing fast enough to surface real worker hangs. // -// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead — +// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead - // it lives on a different NATS subject so it cannot head-of-line-block // routine load traffic on the same worker. 
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) { @@ -106,17 +116,18 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal Name: name, Alias: alias, ReplicaIndex: int32(replicaIndex), - }, 3*time.Minute) + }, a.installTimeout) } // UpgradeBackend sends a backend.upgrade request-reply to a worker node. // The worker stops every live process for this backend, force-reinstalls // from the gallery (overwriting the on-disk artifact), and replies. The // next routine InstallBackend call spawns a fresh process with the new -// binary — upgrade itself does not start a process. +// binary - upgrade itself does not start a process. // -// Timeout: 15 minutes. Real-world worst case observed: 8–10 minutes for -// large CUDA-l4t backend images on Jetson over WiFi. +// Timeout: configured via DistributedConfig.BackendUpgradeTimeoutOrDefault +// (default 15m). Real-world worst case observed: 8-10 minutes for large +// CUDA-l4t backend images on Jetson over WiFi. 
func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) { subject := messaging.SubjectNodeBackendUpgrade(nodeID) xlog.Info("Sending NATS backend.upgrade", "nodeID", nodeID, "backend", backendType, "replica", replicaIndex) @@ -128,7 +139,7 @@ func (a *RemoteUnloaderAdapter) UpgradeBackend(nodeID, backendType, galleriesJSO Name: name, Alias: alias, ReplicaIndex: int32(replicaIndex), - }, 15*time.Minute) + }, a.upgradeTimeout) } // installWithForceFallback is the rolling-update fallback used by @@ -149,7 +160,7 @@ func (a *RemoteUnloaderAdapter) installWithForceFallback(nodeID, backendType, ga Alias: alias, ReplicaIndex: int32(replicaIndex), Force: true, - }, 15*time.Minute) + }, a.upgradeTimeout) } // ListBackends queries a worker node for its installed backends via NATS request-reply. diff --git a/core/services/nodes/unloader_test.go b/core/services/nodes/unloader_test.go index de8f3b501..dd4dc5ea9 100644 --- a/core/services/nodes/unloader_test.go +++ b/core/services/nodes/unloader_test.go @@ -60,6 +60,7 @@ type publishCall struct { type requestCall struct { Subject string Data []byte + Timeout time.Duration } func (f *fakeMessagingClient) Publish(subject string, data any) error { @@ -93,10 +94,10 @@ func (f *fakeMessagingClient) SubscribeReply(_ string, _ func(data []byte, reply return &fakeSubscription{}, nil } -func (f *fakeMessagingClient) Request(subject string, data []byte, _ time.Duration) ([]byte, error) { +func (f *fakeMessagingClient) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) { f.mu.Lock() defer f.mu.Unlock() - f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data}) + f.requestCalls = append(f.requestCalls, requestCall{Subject: subject, Data: data, Timeout: timeout}) return f.requestReply, f.requestErr } @@ -119,7 +120,7 @@ var _ = Describe("RemoteUnloaderAdapter", func() { 
BeforeEach(func() { locator = &fakeModelLocator{} mc = &fakeMessagingClient{} - adapter = NewRemoteUnloaderAdapter(locator, mc) + adapter = NewRemoteUnloaderAdapter(locator, mc, 3*time.Minute, 15*time.Minute) }) Describe("UnloadRemoteModel", func() { @@ -154,7 +155,7 @@ var _ = Describe("RemoteUnloaderAdapter", func() { } // Use a messaging client that fails the first Publish call only. failOnce := &failOnceMessagingClient{inner: mc, failOn: 0} - adapter = NewRemoteUnloaderAdapter(locator, failOnce) + adapter = NewRemoteUnloaderAdapter(locator, failOnce, 3*time.Minute, 15*time.Minute) Expect(adapter.UnloadRemoteModel("llama")).To(Succeed()) @@ -259,3 +260,29 @@ func (f *failOnceMessagingClient) Request(subject string, data []byte, timeout t func (f *failOnceMessagingClient) IsConnected() bool { return true } func (f *failOnceMessagingClient) Close() {} + +var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() { + It("passes the configured install timeout to the messaging client", func() { + mc := newScriptedMessagingClient() + mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"}) + adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute) + + _, err := adapter.InstallBackend("n1", "llama-cpp", "", "[]", "", "", "", 0) + Expect(err).ToNot(HaveOccurred()) + + Expect(mc.calls).To(HaveLen(1)) + Expect(mc.calls[0].Timeout).To(Equal(7 * time.Minute)) + }) + + It("passes the configured upgrade timeout to the messaging client", func() { + mc := newScriptedMessagingClient() + mc.scriptReply(messaging.SubjectNodeBackendUpgrade("n1"), messaging.BackendUpgradeReply{Success: true}) + adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute) + + _, err := adapter.UpgradeBackend("n1", "llama-cpp", "[]", "", "", "", 0) + Expect(err).ToNot(HaveOccurred()) + + Expect(mc.calls).To(HaveLen(1)) + Expect(mc.calls[0].Timeout).To(Equal(11 * time.Minute)) + }) +}) 
diff --git a/core/services/nodes/unloader_upgrade_test.go b/core/services/nodes/unloader_upgrade_test.go index 2261dd02e..48059e48c 100644 --- a/core/services/nodes/unloader_upgrade_test.go +++ b/core/services/nodes/unloader_upgrade_test.go @@ -1,6 +1,8 @@ package nodes import ( + "time" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -15,7 +17,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() { mc.scriptReply(messaging.SubjectNodeBackendUpgrade(nodeID), messaging.BackendUpgradeReply{Success: true}) - adapter := NewRemoteUnloaderAdapter(nil, mc) + adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute) reply, err := adapter.UpgradeBackend(nodeID, "llama-cpp", `[{"name":"x"}]`, "", "", "", 0) Expect(err).ToNot(HaveOccurred()) Expect(reply.Success).To(BeTrue()) @@ -24,7 +26,7 @@ var _ = Describe("RemoteUnloaderAdapter.UpgradeBackend", func() { It("returns the underlying error when the subject has no responders", func() { mc := newScriptedMessagingClient() // unscripted subject => fakeNoRespondersErr by harness convention - adapter := NewRemoteUnloaderAdapter(nil, mc) + adapter := NewRemoteUnloaderAdapter(nil, mc, 3*time.Minute, 15*time.Minute) _, err := adapter.UpgradeBackend("missing-node", "llama-cpp", "", "", "", "", 0) Expect(err).To(HaveOccurred()) }) diff --git a/tests/e2e/distributed/distributed_full_flow_test.go b/tests/e2e/distributed/distributed_full_flow_test.go index 5215c5617..cc3d123b9 100644 --- a/tests/e2e/distributed/distributed_full_flow_test.go +++ b/tests/e2e/distributed/distributed_full_flow_test.go @@ -225,7 +225,7 @@ var _ = Describe("Full Distributed Inference Flow", Label("Distributed"), func() // newTestSmartRouter creates a SmartRouter with NATS wired up and a mock // backend.install handler that always replies success for all registered nodes. 
newTestSmartRouter := func(reg *nodes.NodeRegistry, extraOpts ...nodes.SmartRouterOptions) *nodes.SmartRouter { - unloader := nodes.NewRemoteUnloaderAdapter(reg, infra.NC) + unloader := nodes.NewRemoteUnloaderAdapter(reg, infra.NC, 3*time.Minute, 15*time.Minute) opts := nodes.SmartRouterOptions{ Unloader: unloader, @@ -395,7 +395,7 @@ var _ = Describe("Full Distributed Inference Flow", Label("Distributed"), func() Expect(err).ToNot(HaveOccurred()) // Create RemoteUnloaderAdapter and unload model - unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) err = unloader.UnloadRemoteModel("old-model") Expect(err).ToNot(HaveOccurred()) diff --git a/tests/e2e/distributed/managers_test.go b/tests/e2e/distributed/managers_test.go index 702669bf2..6651062f9 100644 --- a/tests/e2e/distributed/managers_test.go +++ b/tests/e2e/distributed/managers_test.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" "sync/atomic" + "time" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/services/galleryop" @@ -175,7 +176,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() { appCfg := config.NewApplicationConfig() appCfg.SystemState = ss - adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) distMgr := nodes.NewDistributedModelManager(appCfg, ml, adapter) err = distMgr.DeleteModel("big-model") @@ -251,7 +252,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() { appCfg := config.NewApplicationConfig() appCfg.SystemState = ss - adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry) err = distMgr.DeleteBackend("my-backend") @@ -298,7 
+299,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() { appCfg := config.NewApplicationConfig() appCfg.SystemState = ss - adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry) // Should NOT return an error even though the backend doesn't exist locally diff --git a/tests/e2e/distributed/node_lifecycle_test.go b/tests/e2e/distributed/node_lifecycle_test.go index ea69e2a2d..d5f4cc1f5 100644 --- a/tests/e2e/distributed/node_lifecycle_test.go +++ b/tests/e2e/distributed/node_lifecycle_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "sync/atomic" + "time" "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/LocalAI/core/services/nodes" @@ -56,7 +57,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f FlushNATS(infra.NC) - adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0) Expect(err).ToNot(HaveOccurred()) Expect(installReply.Success).To(BeTrue()) @@ -77,7 +78,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f FlushNATS(infra.NC) - adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0) Expect(err).ToNot(HaveOccurred()) Expect(installReply.Success).To(BeFalse()) @@ -103,7 +104,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f FlushNATS(infra.NC) // Frontend calls UnloadRemoteModel (triggered by UI "Stop" or WatchDog) - adapter := 
nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) Expect(adapter.UnloadRemoteModel("whisper-large")).To(Succeed()) Eventually(func() int32 { return stopReceived.Load() }, "5s").Should(Equal(int32(1))) @@ -133,14 +134,14 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f FlushNATS(infra.NC) - adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) adapter.UnloadRemoteModel("shared-model") Eventually(func() int32 { return count.Load() }, "5s").Should(Equal(int32(2))) }) It("should be no-op for models not on any node", func() { - adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) Expect(adapter.UnloadRemoteModel("nonexistent-model")).To(Succeed()) }) }) @@ -161,7 +162,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f FlushNATS(infra.NC) - adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) Expect(adapter.StopNode(node.ID)).To(Succeed()) Eventually(func() int32 { return stopped.Load() }, "5s").Should(Equal(int32(1))) diff --git a/tests/e2e/distributed/router_tracking_test.go b/tests/e2e/distributed/router_tracking_test.go index 0862bef5e..fe7cced3d 100644 --- a/tests/e2e/distributed/router_tracking_test.go +++ b/tests/e2e/distributed/router_tracking_test.go @@ -3,6 +3,7 @@ package distributed_test import ( "context" "encoding/json" + "time" "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/LocalAI/core/services/nodes" @@ -78,7 +79,7 @@ var _ = Describe("SmartRouter trackingKey", Label("Distributed"), func() { Expect(registry.Register(context.Background(), node, 
true)).To(Succeed()) nodeID = node.ID - unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC) + unloader := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) router = nodes.NewSmartRouter(registry, nodes.SmartRouterOptions{ Unloader: unloader, })