From a72649b48630fe4c1d93c0c643cf4cfde5693872 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 22 May 2026 22:44:18 +0000 Subject: [PATCH] feat(distributed): write per-node OpStatus entries during install fan-out DistributedBackendManager now accepts a nodeProgressSink and feeds it two streams: 1. enqueueAndDrainBackendOp mirrors into the sink every per-node status it appends to BackendOpResult (queued, success, error, running_on_worker). The opID is threaded through the function so the sink gets the right gallery op identity. 2. The install apply closure fans each BackendInstallProgressEvent into the sink as a downloading entry, alongside the legacy progressCb path so the aggregate single-bar view stays correct. Production wiring passes the GalleryService (which implements UpdateNodeProgress via Task 2) as the sink. Single-node tests pass nil. DeleteBackend, DeleteBackendDetailed and UpgradeBackend pass an empty opID, so the sink path is a no-op for ops that aren't gallery-tracked the same way as Install. 
Signed-off-by: Ettore Di Giacinto --- core/application/startup.go | 4 +- core/services/nodes/managers_distributed.go | 118 ++++++++++++++---- .../nodes/managers_distributed_test.go | 107 +++++++++++++++- tests/e2e/distributed/managers_test.go | 4 +- 4 files changed, 201 insertions(+), 32 deletions(-) diff --git a/core/application/startup.go b/core/application/startup.go index 1ddeabb0d..183713f86 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -17,9 +17,9 @@ import ( "github.com/mudler/LocalAI/core/services/jobs" "github.com/mudler/LocalAI/core/services/nodes" "github.com/mudler/LocalAI/core/services/storage" - "github.com/mudler/LocalAI/pkg/vram" coreStartup "github.com/mudler/LocalAI/core/startup" "github.com/mudler/LocalAI/internal" + "github.com/mudler/LocalAI/pkg/vram" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/sanitize" @@ -200,7 +200,7 @@ func New(opts ...config.AppOption) (*Application, error) { nodes.NewDistributedModelManager(options, application.modelLoader, distSvc.Unloader), ) application.galleryService.SetBackendManager( - nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry), + nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry, application.galleryService), ) } } diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index 980b15c2a..931223aff 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -49,6 +49,13 @@ func (d *DistributedModelManager) InstallModel(ctx context.Context, op *galleryo return d.local.InstallModel(ctx, op, progressCb) } +// nodeProgressSink is the narrow interface DistributedBackendManager uses to +// publish per-node progress without dragging in the full *GalleryService. +// nil means "no sink, skip per-node writes" (used by single-node tests). 
+type nodeProgressSink interface { + UpdateNodeProgress(opID, nodeID string, np galleryop.NodeProgress) +} + // DistributedBackendManager wraps a local BackendManager and adds NATS fan-out // for backend deletion so worker nodes clean up stale files. type DistributedBackendManager struct { @@ -57,16 +64,20 @@ type DistributedBackendManager struct { registry *NodeRegistry backendGalleries []config.Gallery systemState *system.SystemState + progressSink nodeProgressSink } // NewDistributedBackendManager creates a DistributedBackendManager. -func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry) *DistributedBackendManager { +// progressSink may be nil to disable per-node OpStatus writes (single-node +// tests don't need it). +func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry, progressSink nodeProgressSink) *DistributedBackendManager { return &DistributedBackendManager{ local: galleryop.NewLocalBackendManager(appConfig, ml), adapter: adapter, registry: registry, backendGalleries: appConfig.BackendGalleries, systemState: appConfig.SystemState, + progressSink: progressSink, } } @@ -117,25 +128,48 @@ func (r BackendOpResult) Err() error { // when the node returns. // targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is // in the set are visited. Used by UpgradeBackend to avoid asking nodes that -// never had the backend installed to "upgrade" it — such requests fail at the +// never had the backend installed to "upgrade" it - such requests fail at the // gallery (no platform variant) and would otherwise leave a forever-retrying // pending_backend_ops row. nil means "fan out to every node" (Install/Delete). 
-func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) { +// +// opID is the gallery operation identifier; when non-empty and progressSink is +// set, every per-node terminal status appended to BackendOpResult is also +// mirrored into the sink so the UI's per-node OpStatus.Nodes view stays in +// lockstep with the manager's view. opID may be empty for ops that aren't +// gallery-tracked (e.g. DeleteBackend's plain code path). +func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, opID, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) { allNodes, err := d.registry.List(ctx) if err != nil { return BackendOpResult{}, err } + // emitNodeProgress is a small helper that funnels every NodeOpStatus we + // append to result.Nodes into the per-node OpStatus sink (when configured + // and opID is known). Keeping it inline avoids drift between the + // BackendOpResult view and the sink view - they're written from the same + // code path on the same terminal statuses. + emitNodeProgress := func(node BackendNode, status, errMsg string) { + if d.progressSink == nil || opID == "" { + return + } + d.progressSink.UpdateNodeProgress(opID, node.ID, galleryop.NodeProgress{ + NodeID: node.ID, + NodeName: node.Name, + Status: status, + Error: errMsg, + }) + } + result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))} for _, node := range allNodes { - // Pending nodes haven't been approved yet — no intent to apply. + // Pending nodes haven't been approved yet - no intent to apply. if node.Status == StatusPending { continue } // Backend lifecycle ops only make sense on backend-type workers. 
// Agent workers don't subscribe to backend.install/delete/list, so // enqueueing for them guarantees a forever-retrying row that the - // reconciler can never drain. Silently skip — they aren't consumers. + // reconciler can never drain. Silently skip - they aren't consumers. if node.NodeType != "" && node.NodeType != NodeTypeBackend { continue } @@ -144,19 +178,23 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context } if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil { xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err) + errMsg := fmt.Sprintf("enqueue failed: %v", err) result.Nodes = append(result.Nodes, NodeOpStatus{ NodeID: node.ID, NodeName: node.Name, Status: "error", - Error: fmt.Sprintf("enqueue failed: %v", err), + Error: errMsg, }) + emitNodeProgress(node, "error", errMsg) continue } if node.Status != StatusHealthy { // Intent is recorded; reconciler will retry when the node recovers. 
+ errMsg := fmt.Sprintf("node %s, will retry when healthy", node.Status) result.Nodes = append(result.Nodes, NodeOpStatus{ NodeID: node.ID, NodeName: node.Name, Status: "queued", - Error: fmt.Sprintf("node %s, will retry when healthy", node.Status), + Error: errMsg, }) + emitNodeProgress(node, "queued", errMsg) continue } @@ -170,6 +208,7 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context result.Nodes = append(result.Nodes, NodeOpStatus{ NodeID: node.ID, NodeName: node.Name, Status: "success", }) + emitNodeProgress(node, "success", "") continue } @@ -190,6 +229,7 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context result.Nodes = append(result.Nodes, NodeOpStatus{ NodeID: node.ID, NodeName: node.Name, Status: "running_on_worker", Error: errMsg, }) + emitNodeProgress(node, "running_on_worker", errMsg) continue } @@ -203,6 +243,7 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context result.Nodes = append(result.Nodes, NodeOpStatus{ NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg, }) + emitNodeProgress(node, "error", errMsg) } return result, nil } @@ -244,7 +285,11 @@ func (d *DistributedBackendManager) DeleteBackend(name string) error { } ctx := context.Background() - result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error { + // Empty opID: plain DeleteBackend isn't gallery-tracked the same way as + // Install/Upgrade (no progress dialog), so we skip the per-node sink + // writes here. DeleteBackendDetailed is the HTTP path that surfaces + // per-node results in its own response. 
+ result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendDelete, name, nil, nil, func(node BackendNode) error { reply, err := d.adapter.DeleteBackend(node.ID, name) if err != nil { return err @@ -267,7 +312,7 @@ func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, n if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) { return BackendOpResult{}, err } - return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error { + return d.enqueueAndDrainBackendOp(ctx, "", OpBackendDelete, name, nil, nil, func(node BackendNode) error { reply, err := d.adapter.DeleteBackend(node.ID, name) if err != nil { return err @@ -414,11 +459,41 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall targetNodeIDs = map[string]bool{op.TargetNodeID: true} } - result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error { + result, err := d.enqueueAndDrainBackendOp(ctx, op.ID, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error { + // onProgress fans each BackendInstallProgressEvent into two + // observers: the legacy single-bar progressCb (kept so callers + // that only consume the aggregate view keep working) and the + // per-node sink (so OpStatus.Nodes gets a "downloading" tick + // per file/percentage with node attribution). Defined inside the + // loop so each node captures its own node.Name into the closure. 
+ onProgress := func(ev messaging.BackendInstallProgressEvent) { + if progressCb != nil { + progressCb(ev.FileName, ev.Current, ev.Total, ev.Percentage) + } + if d.progressSink != nil && op.ID != "" { + d.progressSink.UpdateNodeProgress(op.ID, ev.NodeID, galleryop.NodeProgress{ + NodeID: ev.NodeID, + NodeName: node.Name, + Status: "downloading", + FileName: ev.FileName, + Current: ev.Current, + Total: ev.Total, + Percentage: ev.Percentage, + Phase: ev.Phase, + }) + } + } + // nil-callback shortcut: when there is nothing to deliver to, + // hand the adapter a nil onProgress so it skips the per-op NATS + // subscription. Matches the pre-Phase-4 bridgeProgressCb semantics. + var onProgressArg func(messaging.BackendInstallProgressEvent) + if progressCb != nil || d.progressSink != nil { + onProgressArg = onProgress + } // Admin-driven backend install: not tied to a specific replica slot. // Pass replica 0 - the worker's processKey is "backend#0" when no // modelID is supplied, matching pre-PR4 behavior. - reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, op.ID, bridgeProgressCb(progressCb)) + reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, op.ID, onProgressArg) if err != nil { return err } @@ -473,7 +548,11 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str targetNodeIDs[n.NodeID] = true } - result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error { + // Empty opID: the caller (galleryop) doesn't thread an op ID into + // UpgradeBackend today, so we can't tag per-node sink writes with the + // right OpStatus key. Until the upgrade path takes a ManagementOp the + // way InstallBackend does, the sink stays no-op here. 
+ result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error { reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0) if err != nil { // Rolling-update fallback: an older worker doesn't know @@ -548,18 +627,3 @@ func summarizeRunningOnWorker(nodes []NodeOpStatus) string { } return strings.Join(names, ", ") } - -// bridgeProgressCb adapts a BackendInstallProgressEvent stream to the -// (file, current, total, percentage) callback shape that -// galleryop.ProgressCallback expects (and that backendHandler already -// translates into OpStatus.UpdateStatus). nil in -> nil out so callers -// that don't pass a progressCb skip subscription work on the adapter -// side, matching the reconciler-retry semantics. -func bridgeProgressCb(progressCb galleryop.ProgressCallback) func(messaging.BackendInstallProgressEvent) { - if progressCb == nil { - return nil - } - return func(ev messaging.BackendInstallProgressEvent) { - progressCb(ev.FileName, ev.Current, ev.Total, ev.Percentage) - } -} diff --git a/core/services/nodes/managers_distributed_test.go b/core/services/nodes/managers_distributed_test.go index 478c8c786..4569c4ff7 100644 --- a/core/services/nodes/managers_distributed_test.go +++ b/core/services/nodes/managers_distributed_test.go @@ -13,6 +13,7 @@ import ( . 
"github.com/onsi/gomega" "gorm.io/gorm" + "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery" "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/core/services/messaging" @@ -256,8 +257,43 @@ func (s *scriptedMessagingClient) SubscribeReply(_ string, _ func([]byte, func([ func (s *scriptedMessagingClient) IsConnected() bool { return true } func (s *scriptedMessagingClient) Close() {} +// recordingNodeCall captures a single UpdateNodeProgress invocation so +// per-node OpStatus tests can assert on the sequence of writes the +// DistributedBackendManager fans out into the sink. +type recordingNodeCall struct { + OpID string + NodeID string + Progress galleryop.NodeProgress +} + +// recordingProgressSink is a test-only nodeProgressSink that just records +// every call. Used by the per-node OpStatus specs below to assert the +// manager wrote the expected terminal and downloading entries. +type recordingProgressSink struct { + mu sync.Mutex + calls []recordingNodeCall +} + +func (r *recordingProgressSink) UpdateNodeProgress(opID, nodeID string, np galleryop.NodeProgress) { + r.mu.Lock() + defer r.mu.Unlock() + r.calls = append(r.calls, recordingNodeCall{OpID: opID, NodeID: nodeID, Progress: np}) +} + +func (r *recordingProgressSink) callsFor(opID, nodeID string) []galleryop.NodeProgress { + r.mu.Lock() + defer r.mu.Unlock() + out := []galleryop.NodeProgress{} + for _, c := range r.calls { + if c.OpID == opID && c.NodeID == nodeID { + out = append(out, c.Progress) + } + } + return out +} + // fakeNoRespondersErr is the unscripted-subject default. It matches -// nats.ErrNoResponders by string only — used when a test forgets to script +// nats.ErrNoResponders by string only - used when a test forgets to script // a node so the failure is loud but doesn't tickle errors.Is(...) sentinel // paths the test wasn't deliberately exercising. Tests that DO want the // real sentinel (e.g. 
to drive the manager's NoResponders fallback) call @@ -645,6 +681,75 @@ var _ = Describe("DistributedBackendManager", func() { }, "200ms").Should(Equal(0)) }) }) + + Context("populates per-node OpStatus entries", func() { + var sink *recordingProgressSink + + BeforeEach(func() { + // Reconstruct mgr with the recording sink so the new code + // path (per-node OpStatus writes) is exercised. The default + // mgr in the outer BeforeEach has progressSink=nil so the + // pre-existing specs keep verifying the no-sink behavior. + sink = &recordingProgressSink{} + appCfg := &config.ApplicationConfig{} + mgr = NewDistributedBackendManager(appCfg, nil, adapter, registry, sink) + // stubLocalBackendManager mirrors the production behaviour + // where the frontend node rarely has the backend installed + // locally - the NATS fan-out is what these specs verify. + mgr.local = stubLocalBackendManager{} + }) + + It("emits a success entry for each healthy node visited", func() { + node := registerHealthyBackend("worker-ok", "10.0.0.9:50051") + mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID), + messaging.BackendInstallReply{Success: true, Address: "10.0.0.9:50051"}) + + opVal := op("vllm") + opVal.ID = "op-node-success" + Expect(mgr.InstallBackend(ctx, opVal, nil)).To(Succeed()) + + calls := sink.callsFor("op-node-success", node.ID) + Expect(calls).ToNot(BeEmpty()) + Expect(calls[len(calls)-1].Status).To(Equal("success")) + Expect(calls[len(calls)-1].NodeName).To(Equal("worker-ok")) + }) + + It("emits a running_on_worker entry when NATS times out", func() { + node := registerHealthyBackend("worker-slow", "10.0.0.10:50051") + mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout) + + opVal := op("vllm") + opVal.ID = "op-node-slow" + // Soft failure: returns wrapped ErrWorkerStillInstalling. 
+ _ = mgr.InstallBackend(ctx, opVal, nil) + + calls := sink.callsFor("op-node-slow", node.ID) + Expect(calls).ToNot(BeEmpty()) + Expect(calls[len(calls)-1].Status).To(Equal("running_on_worker")) + }) + + It("emits downloading entries from progress events", func() { + node := registerHealthyBackend("worker-dl", "10.0.0.11:50051") + mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID), + messaging.BackendInstallReply{Success: true}) + mc.scheduleProgressPublish(node.ID, "op-node-dl", []messaging.BackendInstallProgressEvent{ + {OpID: "op-node-dl", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "1 GB", Total: "1 GB", Percentage: 100, Phase: "downloading"}, + }) + + opVal := op("vllm") + opVal.ID = "op-node-dl" + Expect(mgr.InstallBackend(ctx, opVal, nil)).To(Succeed()) + + Eventually(func() bool { + for _, np := range sink.callsFor("op-node-dl", node.ID) { + if np.Status == "downloading" && np.Percentage == 100.0 { + return true + } + } + return false + }, "1s").Should(BeTrue()) + }) + }) }) Describe("UpgradeBackend", func() { diff --git a/tests/e2e/distributed/managers_test.go b/tests/e2e/distributed/managers_test.go index 6651062f9..b4f51ef95 100644 --- a/tests/e2e/distributed/managers_test.go +++ b/tests/e2e/distributed/managers_test.go @@ -253,7 +253,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() { appCfg.SystemState = ss adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) - distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry) + distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry, nil) err = distMgr.DeleteBackend("my-backend") Expect(err).ToNot(HaveOccurred()) @@ -300,7 +300,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() { appCfg.SystemState = ss adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute) - distMgr := 
nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry) + distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry, nil) // Should NOT return an error even though the backend doesn't exist locally err = distMgr.DeleteBackend("remote-only-backend")