feat(distributed): RemoteUnloaderAdapter subscribes to install progress

InstallBackend gains opID + onProgress parameters. When both are set,
the adapter subscribes to nodes.<nodeID>.backend.install.<opID>.progress
BEFORE publishing the install request, decodes each message into the
caller's onProgress callback in a goroutine (so a slow callback never
stalls the NATS reader thread), and unsubscribes after RequestJSON
returns.

When onProgress is nil OR opID is empty (the reconciler retry path),
subscription is skipped entirely - silent installs cost nothing extra.

Subscribe failure is logged at Warn and the install proceeds without
progress streaming; the NATS round-trip still owns terminal status.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-05-22 21:46:33 +00:00
parent 352ebe241d
commit e8e75aadb6
8 changed files with 162 additions and 22 deletions

View File

@@ -417,7 +417,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
// Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 - the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior.
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0)
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, "", nil)
if err != nil {
return err
}

View File

@@ -23,12 +23,14 @@ import (
// (or error). Used so each fan-out request can simulate a different worker
// outcome without spinning up real NATS.
type scriptedMessagingClient struct {
mu sync.Mutex
replies map[string][]byte
errs map[string]error
calls []requestCall
matchedReplies map[string][]matchedReply
publishes []progressPublishCall
mu sync.Mutex
replies map[string][]byte
errs map[string]error
calls []requestCall
matchedReplies map[string][]matchedReply
publishes []progressPublishCall
scheduledProgressPublishes []scheduledProgressPublish
subscribes []string
}
// progressPublishCall records a single Publish invocation. The progress
@@ -42,6 +44,16 @@ type progressPublishCall struct {
Event messaging.BackendInstallProgressEvent
}
// scheduledProgressPublish queues a batch of BackendInstallProgressEvent
// values to be delivered the next time Subscribe is called with the matching
// subject. This lets master-side tests assert that the adapter installs its
// handler BEFORE publishing the install request, by scripting events to be
// delivered as soon as the subscription appears.
type scheduledProgressPublish struct {
subject string
events []messaging.BackendInstallProgressEvent
}
// matchedReply lets a test script a canned reply that only fires when the
// inbound request matches a predicate. Used by scriptReplyMatching to
// distinguish "install Force=true" (the fallback) from "install Force=false"
@@ -181,7 +193,55 @@ func (s *scriptedMessagingClient) publishCalls(subject string) []messaging.Backe
}
return out
}
func (s *scriptedMessagingClient) Subscribe(_ string, _ func([]byte)) (messaging.Subscription, error) {
// scheduleProgressPublish queues a set of BackendInstallProgressEvent values
// to be delivered on the next Subscribe call matching the per-op progress
// subject. A short delay before delivery gives the subscriber time to install
// its message handler before the events arrive.
func (s *scriptedMessagingClient) scheduleProgressPublish(nodeID, opID string, events []messaging.BackendInstallProgressEvent) {
s.mu.Lock()
defer s.mu.Unlock()
s.scheduledProgressPublishes = append(s.scheduledProgressPublishes, scheduledProgressPublish{
subject: messaging.SubjectNodeBackendInstallProgress(nodeID, opID),
events: events,
})
}
// subscribeCalls returns the subjects on which Subscribe was invoked.
// Used to confirm the master skipped subscription when onProgress was nil.
func (s *scriptedMessagingClient) subscribeCalls() []string {
s.mu.Lock()
defer s.mu.Unlock()
out := make([]string, len(s.subscribes))
copy(out, s.subscribes)
return out
}
func (s *scriptedMessagingClient) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) {
s.mu.Lock()
s.subscribes = append(s.subscribes, subject)
matched := []scheduledProgressPublish{}
remaining := s.scheduledProgressPublishes[:0]
for _, sp := range s.scheduledProgressPublishes {
if sp.subject == subject {
matched = append(matched, sp)
} else {
remaining = append(remaining, sp)
}
}
s.scheduledProgressPublishes = remaining
s.mu.Unlock()
go func() {
time.Sleep(20 * time.Millisecond)
for _, sp := range matched {
for _, ev := range sp.events {
raw, _ := json.Marshal(ev)
handler(raw)
}
}
}()
return &fakeSubscription{}, nil
}
func (s *scriptedMessagingClient) QueueSubscribe(_ string, _ string, _ func([]byte)) (messaging.Subscription, error) {

View File

@@ -68,9 +68,9 @@ type ModelScheduler interface {
// ReplicaReconcilerOptions holds configuration for creating a ReplicaReconciler.
type ReplicaReconcilerOptions struct {
Registry *NodeRegistry
Registry *NodeRegistry
Scheduler ModelScheduler
Unloader NodeCommandSender
Unloader NodeCommandSender
// Adapter is the NATS sender used to retry pending backend ops. When nil,
// the state-reconciler pending-drain pass is a no-op (single-node mode).
Adapter *RemoteUnloaderAdapter
@@ -78,7 +78,7 @@ type ReplicaReconcilerOptions struct {
// addresses. Matches the worker's token so HealthCheck auth succeeds.
RegistrationToken string
// Prober overrides the default gRPC health probe (used by tests).
Prober ModelProber
Prober ModelProber
DB *gorm.DB
Interval time.Duration // default 30s
ScaleDownDelay time.Duration // default 5m
@@ -191,7 +191,7 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) {
// Pending-op drain for admin install — not a per-replica load.
// Replica 0 is the conventional admin slot. Install is idempotent:
// the worker short-circuits if the backend is already running.
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0)
reply, err := rc.adapter.InstallBackend(op.NodeID, op.Backend, "", string(op.Galleries), "", "", "", 0, "", nil)
if err != nil {
applyErr = err
} else if !reply.Success {

View File

@@ -688,7 +688,7 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex)
v, err, _ := r.installFlight.Do(key, func() (any, error) {
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex)
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil)
if err != nil {
return "", err
}

View File

@@ -330,7 +330,7 @@ type upgradeCall struct {
replica int
}
func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int) (*messaging.BackendInstallReply, error) {
func (f *fakeUnloader) InstallBackend(nodeID, backend, modelID, _, _, _, _ string, replica int, _ string, _ func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error) {
// installHook intentionally runs OUTSIDE the mutex: the hook may block
// on a channel and we don't want to serialize concurrent callers,
// which would defeat the singleflight-overlap test.

View File

@@ -2,6 +2,7 @@ package nodes
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
@@ -33,7 +34,7 @@ type backendStopRequest struct {
// nats.ErrNoResponders for old workers that don't subscribe to the new
// backend.upgrade subject.
type NodeCommandSender interface {
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error)
InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int, opID string, onProgress func(messaging.BackendInstallProgressEvent)) (*messaging.BackendInstallReply, error)
UpgradeBackend(nodeID, backendType, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendUpgradeReply, error)
DeleteBackend(nodeID, backendName string) (*messaging.BackendDeleteReply, error)
ListBackends(nodeID string) (*messaging.BackendListReply, error)
@@ -116,9 +117,39 @@ func (a *RemoteUnloaderAdapter) UnloadRemoteModel(modelName string) error {
// For force-reinstall (admin-driven Upgrade), use UpgradeBackend instead -
// it lives on a different NATS subject so it cannot head-of-line-block
// routine load traffic on the same worker.
func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, galleriesJSON, uri, name, alias string, replicaIndex int) (*messaging.BackendInstallReply, error) {
func (a *RemoteUnloaderAdapter) InstallBackend(
nodeID, backendType, modelID, galleriesJSON, uri, name, alias string,
replicaIndex int,
opID string,
onProgress func(messaging.BackendInstallProgressEvent),
) (*messaging.BackendInstallReply, error) {
subject := messaging.SubjectNodeBackendInstall(nodeID)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex)
xlog.Info("Sending NATS backend.install", "nodeID", nodeID, "backend", backendType, "modelID", modelID, "replica", replicaIndex, "opID", opID)
// Subscribe to the per-op progress subject BEFORE publishing the install
// request so we don't miss early events. When onProgress is nil OR opID
// is empty (the reconciler-driven retry path), skip subscription entirely:
// silent installs cost nothing extra.
var sub messaging.Subscription
if onProgress != nil && opID != "" {
progressSubject := messaging.SubjectNodeBackendInstallProgress(nodeID, opID)
s, subErr := a.nats.Subscribe(progressSubject, func(raw []byte) {
var ev messaging.BackendInstallProgressEvent
if err := json.Unmarshal(raw, &ev); err != nil {
xlog.Debug("malformed install progress event", "subject", progressSubject, "error", err)
return
}
// Goroutine guard: a slow onProgress callback must not stall
// the NATS reader thread.
go onProgress(ev)
})
if subErr != nil {
xlog.Warn("Failed to subscribe to install progress subject; proceeding without progress streaming",
"subject", progressSubject, "error", subErr)
} else {
sub = s
}
}
reply, err := messaging.RequestJSON[messaging.BackendInstallRequest, messaging.BackendInstallReply](a.nats, subject, messaging.BackendInstallRequest{
Backend: backendType,
@@ -128,7 +159,13 @@ func (a *RemoteUnloaderAdapter) InstallBackend(nodeID, backendType, modelID, gal
Name: name,
Alias: alias,
ReplicaIndex: int32(replicaIndex),
OpID: opID,
}, a.installTimeout)
if sub != nil {
_ = sub.Unsubscribe()
}
if err != nil && isNATSTimeout(err) {
return nil, fmt.Errorf("%w (subject=%s nodeID=%s backend=%s): %v",
galleryop.ErrWorkerStillInstalling, subject, nodeID, backendType, err)

View File

@@ -270,7 +270,7 @@ var _ = Describe("RemoteUnloaderAdapter timeout configuration", func() {
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
adapter := NewRemoteUnloaderAdapter(nil, mc, 7*time.Minute, 11*time.Minute)
_, err := adapter.InstallBackend("n1", "llama-cpp", "", "[]", "", "", "", 0)
_, err := adapter.InstallBackend("n1", "llama-cpp", "", "[]", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(mc.calls).To(HaveLen(1))
@@ -296,7 +296,7 @@ var _ = Describe("RemoteUnloaderAdapter NATS timeout handling", func() {
mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrTimeout)
adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeTrue(),
"expected wrapped ErrWorkerStillInstalling, got %v", err)
@@ -307,9 +307,52 @@ var _ = Describe("RemoteUnloaderAdapter NATS timeout handling", func() {
mc.scriptErr(messaging.SubjectNodeBackendInstall("n1"), nats.ErrNoResponders)
adapter := NewRemoteUnloaderAdapter(nil, mc, 100*time.Millisecond, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, galleryop.ErrWorkerStillInstalling)).To(BeFalse())
Expect(errors.Is(err, nats.ErrNoResponders)).To(BeTrue())
})
})
var _ = Describe("RemoteUnloaderAdapter install progress streaming", func() {
It("forwards BackendInstallProgressEvent values into the onProgress callback when the worker publishes them", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true, Address: "127.0.0.1:0"})
mc.scheduleProgressPublish("n1", "op-abc", []messaging.BackendInstallProgressEvent{
{OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "100 MB", Total: "1 GB", Percentage: 10},
{OpID: "op-abc", NodeID: "n1", Backend: "vllm", FileName: "vllm.tar.zst", Current: "500 MB", Total: "1 GB", Percentage: 50},
})
adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second)
var (
received []messaging.BackendInstallProgressEvent
mu sync.Mutex
)
onProgress := func(ev messaging.BackendInstallProgressEvent) {
mu.Lock()
defer mu.Unlock()
received = append(received, ev)
}
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "op-abc", onProgress)
Expect(err).ToNot(HaveOccurred())
Eventually(func() int {
mu.Lock()
defer mu.Unlock()
return len(received)
}, "1s").Should(Equal(2))
})
It("does NOT subscribe when onProgress is nil (reconciler retry path)", func() {
mc := newScriptedMessagingClient()
mc.scriptReply(messaging.SubjectNodeBackendInstall("n1"), messaging.BackendInstallReply{Success: true})
adapter := NewRemoteUnloaderAdapter(nil, mc, 1*time.Second, 1*time.Second)
_, err := adapter.InstallBackend("n1", "vllm", "", "[]", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(mc.subscribeCalls()).To(BeEmpty(),
"reconciler-driven retries must not subscribe to the per-op progress subject")
})
})

View File

@@ -58,7 +58,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0)
installReply, err := adapter.InstallBackend(node.ID, "llama-cpp", "", "", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeTrue())
})
@@ -79,7 +79,7 @@ var _ = Describe("Node Backend Lifecycle (NATS-driven)", Label("Distributed"), f
FlushNATS(infra.NC)
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0)
installReply, err := adapter.InstallBackend(node.ID, "nonexistent", "", "", "", "", "", 0, "", nil)
Expect(err).ToNot(HaveOccurred())
Expect(installReply.Success).To(BeFalse())
Expect(installReply.Error).To(ContainSubstring("backend not found"))