From e8e75aadb6e46d413ce6918816394818af1ef20f Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 22 May 2026 21:46:33 +0000 Subject: [PATCH] feat(distributed): RemoteUnloaderAdapter subscribes to install progress InstallBackend gains opID + onProgress parameters. When both are set, the adapter subscribes to nodes..backend.install..progress BEFORE publishing the install request, decodes each message into the caller's onProgress callback in a goroutine (so a slow callback never stalls the NATS reader thread), and unsubscribes after RequestJSON returns. When onProgress is nil OR opID is empty (the reconciler retry path), subscription is skipped entirely - silent installs cost nothing extra. Subscribe failure is logged at Warn and the install proceeds without progress streaming; the NATS round-trip still owns terminal status. Signed-off-by: Ettore Di Giacinto --- core/services/nodes/managers_distributed.go | 2 +- .../nodes/managers_distributed_test.go | 74 +++++++++++++++++-- core/services/nodes/reconciler.go | 8 +- core/services/nodes/router.go | 2 +- core/services/nodes/router_test.go | 2 +- core/services/nodes/unloader.go | 43 ++++++++++- core/services/nodes/unloader_test.go | 49 +++++++++++- tests/e2e/distributed/node_lifecycle_test.go | 4 +- 8 files changed, 162 insertions(+), 22 deletions(-) diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index 765d4b5a6..1cb52e94b 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -417,7 +417,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall // Admin-driven backend install: not tied to a specific replica slot. // Pass replica 0 - the worker's processKey is "backend#0" when no // modelID is supplied, matching pre-PR4 behavior. - reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0) + reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, "", nil) if err != nil { return err } diff --git a/core/services/nodes/managers_distributed_test.go b/core/services/nodes/managers_distributed_test.go index 352e06188..f07d966cc 100644 --- a/core/services/nodes/managers_distributed_test.go +++ b/core/services/nodes/managers_distributed_test.go @@ -23,12 +23,14 @@ import ( // (or error). Used so each fan-out request can simulate a different worker // outcome without spinning up real NATS. type scriptedMessagingClient struct { - mu sync.Mutex - replies map[string][]byte - errs map[string]error - calls []requestCall - matchedReplies map[string][]matchedReply - publishes []progressPublishCall + mu sync.Mutex + replies map[string][]byte + errs map[string]error + calls []requestCall + matchedReplies map[string][]matchedReply + publishes []progressPublishCall + scheduledProgressPublishes []scheduledProgressPublish + subscribes []string } // progressPublishCall records a single Publish invocation. The progress @@ -42,6 +44,16 @@ type progressPublishCall struct { Event messaging.BackendInstallProgressEvent } +// scheduledProgressPublish queues a batch of BackendInstallProgressEvent +// values to be delivered the next time Subscribe is called with the matching +// subject. This lets master-side tests assert that the adapter installs its +// handler BEFORE publishing the install request, by scripting events to be +// delivered as soon as the subscription appears. +type scheduledProgressPublish struct { + subject string + events []messaging.BackendInstallProgressEvent +} + // matchedReply lets a test script a canned reply that only fires when the // inbound request matches a predicate. Used by scriptReplyMatching to // distinguish "install Force=true" (the fallback) from "install Force=false" @@ -181,7 +193,55 @@ func (s *scriptedMessagingClient) publishCalls(subject string) []messaging.Backe } return out } -func (s *scriptedMessagingClient) Subscribe(_ string, _ func([]byte)) (messaging.Subscription, error) { + +// scheduleProgressPublish queues a set of BackendInstallProgressEvent values +// to be delivered on the next Subscribe call matching the per-op progress +// subject. A short delay before delivery gives the subscriber time to install +// its message handler before the events arrive. +func (s *scriptedMessagingClient) scheduleProgressPublish(nodeID, opID string, events []messaging.BackendInstallProgressEvent) { + s.mu.Lock() + defer s.mu.Unlock() + s.scheduledProgressPublishes = append(s.scheduledProgressPublishes, scheduledProgressPublish{ + subject: messaging.SubjectNodeBackendInstallProgress(nodeID, opID), + events: events, + }) +} + +// subscribeCalls returns the subjects on which Subscribe was invoked. +// Used to confirm the master skipped subscription when onProgress was nil. +func (s *scriptedMessagingClient) subscribeCalls() []string { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]string, len(s.subscribes)) + copy(out, s.subscribes) + return out +} + +func (s *scriptedMessagingClient) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) { + s.mu.Lock() + s.subscribes = append(s.subscribes, subject) + matched := []scheduledProgressPublish{} + remaining := s.scheduledProgressPublishes[:0] + for _, sp := range s.scheduledProgressPublishes { + if sp.subject == subject { + matched = append(matched, sp) + } else { + remaining = append(remaining, sp) + } + } + s.scheduledProgressPublishes = remaining + s.mu.Unlock() + + go func() { + time.Sleep(20 * time.Millisecond) + for _, sp := range matched { + for _, ev := range sp.events { + raw, _ := json.Marshal(ev) + handler(raw) + } + } + }() + return &fakeSubscription{}, nil } func (s *scriptedMessagingClient) QueueSubscribe(_ string, _ string, _ func([]byte)) (messaging.Subscription, error) { diff --git a/core/services/nodes/reconciler.go b/core/services/nodes/reconciler.go index 72e1c441c..9606c3f52 100644 --- a/core/services/nodes/reconciler.go +++ b/core/services/nodes/reconciler.go @@ -68,9 +68,9 @@ type ModelScheduler interface { // ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler. type ReplicaReconcilerOptions struct { - Registry *NodeRegistry + Registry *NodeRegistry Scheduler ModelScheduler - Unloader NodeCommandSender + Unloader NodeCommandSender // Adapter is the NATS sender used to retry pending backend ops. When nil, // the state-reconciler pending-drain pass is a no-op (single-node mode). Adapter *RemoteUnloaderAdapter @@ -78,7 +78,7 @@ type ReplicaReconcilerOptions struct { // addresses. Matches the worker's token so HealthCheck auth succeeds. RegistrationToken string // Prober overrides the default gRPC health probe (used by tests). - Prober ModelProber + Prober ModelProber DB *gorm.DB Interval time.Duration // default 30s ScaleDownDelay time.Duration // default 5m @@ -191,7 +191,7 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) { // Pending-op drain for admin install — not a per-replica load. // Replica 0 is the conventional admin slot. Install is idempotent: // the worker short-circuits if the backend is already running. - reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0) + reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, "", nil) if err != nil { applyErr = err } else if !reply.Success { diff --git a/core/services/nodes/router.go b/core/services/nodes/router.go index 2d9101f6d..ca95b1653 100644 --- a/core/services/nodes/router.go +++ b/core/services/nodes/router.go @@ -688,7 +688,7 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex) v, err, _ := r.installFlight.Do(key, func() (any, error) { - reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex) + reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil) if err != nil { return "", err } diff --git a/core/services/nodes/router_test.go b/core/services/nodes/router_test.go index 966df2f33..5f944bce8 100644 --- a/core/services/nodes/router_test.go +++ b/core/services/nodes/router_test.go @@ -330,7 +330,7 @@ type upgradeCall struct { replica int } -func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int) (*messaging.BackendInstallReply, error) { +func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int, _ string, _ func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error) { // installHook intentionally runs OUTSIDE the mutex: the hook may block // on a channel and we don't want to serialize concurrent callers, // which would defeat the singleflight-overlap test. diff --git a/core/services/nodes/unloader.go b/core/services/nodes/unloader.go index 8bc8d5808..69002c4cd 100644 --- a/core/services/nodes/unloader.go +++ b/core/services/nodes/unloader.go @@ -2,6 +2,7 @@ package nodes import ( "context" + "encoding/json" "errors" "fmt" "strings" @@ -33,7 +34,7 @@ type backendStopRequest struct { // nats.ErrNoResponders for old workers that don't subscribe to the new // backend.upgrade subject. type NodeCommandSender interface { - InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) + InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, opID string, onProgress func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error) UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error) DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error) ListBackends(nodeID string) (*messaging.BackendListReply, error) @@ -116,9 +117,39 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error { // For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead - // it lives on a different NATS subject so it cannot head-of-line-block // routine load traffic on the same worker. -func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) { +func (a *RemoteUnloaderAdapter) InstallBackend( + nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, + replicaIndex int, + opID string, + onProgress func(messaging.BackendInstallProgressEvent), +) (*messaging.BackendInstallReply, error) { subject := messaging.SubjectNodeBackendInstall(nodeID) - xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex) + xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "opID", opID) + + // Subscribe to the per-op progress subject BEFORE publishing the install + // request so we don't miss early events. When onProgress is nil OR opID + // is empty (the reconciler-driven retry path), skip subscription entirely: + // silent installs cost nothing extra. + var sub messaging.Subscription + if onProgress != nil && opID != "" { + progressSubject := messaging.SubjectNodeBackendInstallProgress(nodeID, opID) + s, subErr := a.nats.Subscribe(progressSubject, func(raw []byte) { + var ev messaging.BackendInstallProgressEvent + if err := json.Unmarshal(raw, &ev); err != nil { + xlog.Debug("malformed install progress event", "subject", progressSubject, "error", err) + return + } + // Goroutine guard: a slow onProgress callback must not stall + // the NATS reader thread. + go onProgress(ev) + }) + if subErr != nil { + xlog.Warn("Failed to subscribe to install progress subject; proceeding without progress streaming", + "subject", progressSubject, "error", subErr) + } else { + sub = s + } + } reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{ Backend: backendType, @@ -128,7 +159,13 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal Name: name, Alias: alias, ReplicaIndex: int32(replicaIndex), + OpID: opID, }, a.installTimeout) + + if sub != nil { + _ = sub.Unsubscribe() + } + if err != nil && isNATSTimeout(err) { return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v", galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err) diff --git a/core/services/nodes/unloader_test.go b/core/services/nodes/unloader_test.go index e894b1acd..980dd4dcf 100644 --- a/core/services/nodes/unloader_test.go +++ b/core/services/nodes/unloader_test.go @@ -270,7 +270,7 @@ var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() { mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"}) adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute) - _, err := adapter.InstallBackend("n1", "llama-cpp", "", "[]", "", "", "", 0) + _, err := adapter.InstallBackend("n1", "llama-cpp", "", "[]", "", "", "", 0, "", nil) Expect(err).ToNot(HaveOccurred()) Expect(mc.calls).To(HaveLen(1)) @@ -296,7 +296,7 @@ var _ = Describe("RemoteUnloaderAdapter NATS timeout handling", func() { mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrTimeout) adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second) - _, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0) + _, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil) Expect(err).To(HaveOccurred()) Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(), "expected wrapped ErrWorkerStillInstalling, got %v", err) @@ -307,9 +307,52 @@ var _ = Describe("RemoteUnloaderAdapter NATS timeout handling", func() { mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrNoResponders) adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second) - _, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0) + _, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil) Expect(err).To(HaveOccurred()) Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeFalse()) Expect(errors.Is(err, nats.ErrNoResponders)).To(BeTrue()) }) }) + +var _ = Describe("RemoteUnloaderAdapter install progress streaming", func() { + It("forwards BackendInstallProgressEvent values into the onProgress callback when the worker publishes them", func() { + mc := newScriptedMessagingClient() + mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"}) + mc.scheduleProgressPublish("n1", "op-abc", []messaging.BackendInstallProgressEvent{ + {OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "100 MB", Total: "1 GB", Percentage: 10}, + {OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "500 MB", Total: "1 GB", Percentage: 50}, + }) + + adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second) + var ( + received []messaging.BackendInstallProgressEvent + mu sync.Mutex + ) + onProgress := func(ev messaging.BackendInstallProgressEvent) { + mu.Lock() + defer mu.Unlock() + received = append(received, ev) + } + + _, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "op-abc", onProgress) + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() int { + mu.Lock() + defer mu.Unlock() + return len(received) + }, "1s").Should(Equal(2)) + }) + + It("does NOT subscribe when onProgress is nil (reconciler retry path)", func() { + mc := newScriptedMessagingClient() + mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true}) + + adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second) + _, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil) + Expect(err).ToNot(HaveOccurred()) + + Expect(mc.subscribeCalls()).To(BeEmpty(), + "reconciler-driven retries must not subscribe to the per-op progress subject") + }) +}) diff --git a/tests/e2e/distributed/node_lifecycle_test.go b/tests/e2e/distributed/node_lifecycle_test.go index d5f4cc1f5..e0539e912 100644 --- a/tests/e2e/distributed/node_lifecycle_test.go +++ b/tests/e2e/distributed/node_lifecycle_test.go @@ -58,7 +58,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f FlushNATS(infra.NC) adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) - installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0) + installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0, "", nil) Expect(err).ToNot(HaveOccurred()) Expect(installReply.Success).To(BeTrue()) }) @@ -79,7 +79,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f FlushNATS(infra.NC) adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) - installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0) + installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0, "", nil) Expect(err).ToNot(HaveOccurred()) Expect(installReply.Success).To(BeFalse()) Expect(installReply.Error).To(ContainSubstring("backend not found"))