feat(distributed): write per-node OpStatus entries during install fan-out

DistributedBackendManager now accepts a nodeProgressSink and feeds it
two streams:

1. enqueueAndDrainBackendOp emits a per-node terminal entry on each
   status it appends to BackendOpResult (queued, success, error,
   running_on_worker). The opID is threaded through the function so
   the sink gets the right gallery op identity.

2. The install apply closure fans each BackendInstallProgressEvent
   into the sink as a downloading entry, alongside the legacy
   progressCb path so the aggregate single-bar view stays correct.

Production wiring passes the GalleryService (which implements
UpdateNodeProgress via Task 2) as the sink. Single-node tests pass
nil. DeleteBackend and UpgradeBackend pass an empty opID so the
sink path no-ops for ops that aren't gallery-tracked the same way
as Install.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-05-22 22:44:18 +00:00
parent f96df5eb85
commit a72649b486
4 changed files with 201 additions and 32 deletions

View File

@@ -17,9 +17,9 @@ import (
"github.com/mudler/LocalAI/core/services/jobs"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/storage"
"github.com/mudler/LocalAI/pkg/vram"
coreStartup "github.com/mudler/LocalAI/core/startup"
"github.com/mudler/LocalAI/internal"
"github.com/mudler/LocalAI/pkg/vram"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/sanitize"
@@ -200,7 +200,7 @@ func New(opts ...config.AppOption) (*Application, error) {
nodes.NewDistributedModelManager(options, application.modelLoader, distSvc.Unloader),
)
application.galleryService.SetBackendManager(
nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry),
nodes.NewDistributedBackendManager(options, application.modelLoader, distSvc.Unloader, distSvc.Registry, application.galleryService),
)
}
}

View File

@@ -49,6 +49,13 @@ func (d *DistributedModelManager) InstallModel(ctx context.Context, op *galleryo
return d.local.InstallModel(ctx, op, progressCb)
}
// nodeProgressSink is the narrow interface DistributedBackendManager uses to
// publish per-node progress without dragging in the full *GalleryService.
// nil means "no sink, skip per-node writes" (used by single-node tests).
type nodeProgressSink interface {
UpdateNodeProgress(opID, nodeID string, np galleryop.NodeProgress)
}
// DistributedBackendManager wraps a local BackendManager and adds NATS fan-out
// for backend deletion so worker nodes clean up stale files.
type DistributedBackendManager struct {
@@ -57,16 +64,20 @@ type DistributedBackendManager struct {
registry *NodeRegistry
backendGalleries []config.Gallery
systemState *system.SystemState
progressSink nodeProgressSink
}
// NewDistributedBackendManager creates a DistributedBackendManager.
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry) *DistributedBackendManager {
// progressSink may be nil to disable per-node OpStatus writes (single-node
// tests don't need it).
func NewDistributedBackendManager(appConfig *config.ApplicationConfig, ml *model.ModelLoader, adapter *RemoteUnloaderAdapter, registry *NodeRegistry, progressSink nodeProgressSink) *DistributedBackendManager {
return &DistributedBackendManager{
local: galleryop.NewLocalBackendManager(appConfig, ml),
adapter: adapter,
registry: registry,
backendGalleries: appConfig.BackendGalleries,
systemState: appConfig.SystemState,
progressSink: progressSink,
}
}
@@ -117,25 +128,48 @@ func (r BackendOpResult) Err() error {
// when the node returns.
// targetNodeIDs is an optional allowlist: when non-nil, only nodes whose ID is
// in the set are visited. Used by UpgradeBackend to avoid asking nodes that
// never had the backend installed to "upgrade" it such requests fail at the
// never had the backend installed to "upgrade" it - such requests fail at the
// gallery (no platform variant) and would otherwise leave a forever-retrying
// pending_backend_ops row. nil means "fan out to every node" (Install/Delete).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
//
// opID is the gallery operation identifier; when non-empty and progressSink is
// set, every per-node terminal status appended to BackendOpResult is also
// mirrored into the sink so the UI's per-node OpStatus.Nodes view stays in
// lockstep with the manager's view. opID may be empty for ops that aren't
// gallery-tracked (e.g. DeleteBackend's plain code path).
func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context, opID, op, backend string, galleriesJSON []byte, targetNodeIDs map[string]bool, apply func(node BackendNode) error) (BackendOpResult, error) {
allNodes, err := d.registry.List(ctx)
if err != nil {
return BackendOpResult{}, err
}
// emitNodeProgress is a small helper that funnels every NodeOpStatus we
// append to result.Nodes into the per-node OpStatus sink (when configured
// and opID is known). Keeping it inline avoids drift between the
// BackendOpResult view and the sink view - they're written from the same
// code path on the same terminal statuses.
emitNodeProgress := func(node BackendNode, status, errMsg string) {
if d.progressSink == nil || opID == "" {
return
}
d.progressSink.UpdateNodeProgress(opID, node.ID, galleryop.NodeProgress{
NodeID: node.ID,
NodeName: node.Name,
Status: status,
Error: errMsg,
})
}
result := BackendOpResult{Nodes: make([]NodeOpStatus, 0, len(allNodes))}
for _, node := range allNodes {
// Pending nodes haven't been approved yet no intent to apply.
// Pending nodes haven't been approved yet - no intent to apply.
if node.Status == StatusPending {
continue
}
// Backend lifecycle ops only make sense on backend-type workers.
// Agent workers don't subscribe to backend.install/delete/list, so
// enqueueing for them guarantees a forever-retrying row that the
// reconciler can never drain. Silently skip they aren't consumers.
// reconciler can never drain. Silently skip - they aren't consumers.
if node.NodeType != "" && node.NodeType != NodeTypeBackend {
continue
}
@@ -144,19 +178,23 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
}
if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil {
xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err)
errMsg := fmt.Sprintf("enqueue failed: %v", err)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error",
Error: fmt.Sprintf("enqueue failed: %v", err),
Error: errMsg,
})
emitNodeProgress(node, "error", errMsg)
continue
}
if node.Status != StatusHealthy {
// Intent is recorded; reconciler will retry when the node recovers.
errMsg := fmt.Sprintf("node %s, will retry when healthy", node.Status)
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "queued",
Error: fmt.Sprintf("node %s, will retry when healthy", node.Status),
Error: errMsg,
})
emitNodeProgress(node, "queued", errMsg)
continue
}
@@ -170,6 +208,7 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "success",
})
emitNodeProgress(node, "success", "")
continue
}
@@ -190,6 +229,7 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "running_on_worker", Error: errMsg,
})
emitNodeProgress(node, "running_on_worker", errMsg)
continue
}
@@ -203,6 +243,7 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context
result.Nodes = append(result.Nodes, NodeOpStatus{
NodeID: node.ID, NodeName: node.Name, Status: "error", Error: errMsg,
})
emitNodeProgress(node, "error", errMsg)
}
return result, nil
}
@@ -244,7 +285,11 @@ func (d *DistributedBackendManager) DeleteBackend(name string) error {
}
ctx := context.Background()
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
// Empty opID: plain DeleteBackend isn't gallery-tracked the same way as
// Install/Upgrade (no progress dialog), so we skip the per-node sink
// writes here. DeleteBackendDetailed is the HTTP path that surfaces
// per-node results in its own response.
result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendDelete, name, nil, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
@@ -267,7 +312,7 @@ func (d *DistributedBackendManager) DeleteBackendDetailed(ctx context.Context, n
if err := d.local.DeleteBackend(name); err != nil && !errors.Is(err, gallery.ErrBackendNotFound) {
return BackendOpResult{}, err
}
return d.enqueueAndDrainBackendOp(ctx, OpBackendDelete, name, nil, nil, func(node BackendNode) error {
return d.enqueueAndDrainBackendOp(ctx, "", OpBackendDelete, name, nil, nil, func(node BackendNode) error {
reply, err := d.adapter.DeleteBackend(node.ID, name)
if err != nil {
return err
@@ -414,11 +459,41 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
targetNodeIDs = map[string]bool{op.TargetNodeID: true}
}
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
result, err := d.enqueueAndDrainBackendOp(ctx, op.ID, OpBackendInstall, backendName, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// onProgress fans each BackendInstallProgressEvent into two
// observers: the legacy single-bar progressCb (kept so callers
// that only consume the aggregate view keep working) and the
// per-node sink (so OpStatus.Nodes gets a "downloading" tick
// per file/percentage with node attribution). Defined inside the
// loop so each node captures its own node.Name into the closure.
onProgress := func(ev messaging.BackendInstallProgressEvent) {
if progressCb != nil {
progressCb(ev.FileName, ev.Current, ev.Total, ev.Percentage)
}
if d.progressSink != nil && op.ID != "" {
d.progressSink.UpdateNodeProgress(op.ID, ev.NodeID, galleryop.NodeProgress{
NodeID: ev.NodeID,
NodeName: node.Name,
Status: "downloading",
FileName: ev.FileName,
Current: ev.Current,
Total: ev.Total,
Percentage: ev.Percentage,
Phase: ev.Phase,
})
}
}
// nil-callback shortcut: when there is nothing to deliver to,
// hand the adapter a nil onProgress so it skips the per-op NATS
// subscription. Matches the pre-Phase-4 bridgeProgressCb semantics.
var onProgressArg func(messaging.BackendInstallProgressEvent)
if progressCb != nil || d.progressSink != nil {
onProgressArg = onProgress
}
// Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 - the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior.
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, op.ID, bridgeProgressCb(progressCb))
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, op.ID, onProgressArg)
if err != nil {
return err
}
@@ -473,7 +548,11 @@ func (d *DistributedBackendManager) UpgradeBackend(ctx context.Context, name str
targetNodeIDs[n.NodeID] = true
}
result, err := d.enqueueAndDrainBackendOp(ctx, OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
// Empty opID: the caller (galleryop) doesn't thread an op ID into
// UpgradeBackend today, so we can't tag per-node sink writes with the
// right OpStatus key. Until the upgrade path takes a ManagementOp the
// way InstallBackend does, the sink stays no-op here.
result, err := d.enqueueAndDrainBackendOp(ctx, "", OpBackendUpgrade, name, galleriesJSON, targetNodeIDs, func(node BackendNode) error {
reply, err := d.adapter.UpgradeBackend(node.ID, name, string(galleriesJSON), "", "", "", 0)
if err != nil {
// Rolling-update fallback: an older worker doesn't know
@@ -548,18 +627,3 @@ func summarizeRunningOnWorker(nodes []NodeOpStatus) string {
}
return strings.Join(names, ", ")
}
// bridgeProgressCb adapts a BackendInstallProgressEvent stream to the
// (file, current, total, percentage) callback shape that
// galleryop.ProgressCallback expects (and that backendHandler already
// translates into OpStatus.UpdateStatus). nil in -> nil out so callers
// that don't pass a progressCb skip subscription work on the adapter
// side, matching the reconciler-retry semantics.
func bridgeProgressCb(progressCb galleryop.ProgressCallback) func(messaging.BackendInstallProgressEvent) {
if progressCb == nil {
return nil
}
return func(ev messaging.BackendInstallProgressEvent) {
progressCb(ev.FileName, ev.Current, ev.Total, ev.Percentage)
}
}

View File

@@ -13,6 +13,7 @@ import (
. "github.com/onsi/gomega"
"gorm.io/gorm"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
@@ -256,8 +257,43 @@ func (s *scriptedMessagingClient) SubscribeReply(_ string, _ func([]byte, func([
func (s *scriptedMessagingClient) IsConnected() bool { return true }
func (s *scriptedMessagingClient) Close() {}
// recordingNodeCall captures a single UpdateNodeProgress invocation so
// per-node OpStatus tests can assert on the sequence of writes the
// DistributedBackendManager fans out into the sink.
type recordingNodeCall struct {
OpID string
NodeID string
Progress galleryop.NodeProgress
}
// recordingProgressSink is a test-only nodeProgressSink that just records
// every call. Used by the per-node OpStatus specs below to assert the
// manager wrote the expected terminal and downloading entries.
type recordingProgressSink struct {
mu sync.Mutex
calls []recordingNodeCall
}
func (r *recordingProgressSink) UpdateNodeProgress(opID, nodeID string, np galleryop.NodeProgress) {
r.mu.Lock()
defer r.mu.Unlock()
r.calls = append(r.calls, recordingNodeCall{OpID: opID, NodeID: nodeID, Progress: np})
}
func (r *recordingProgressSink) callsFor(opID, nodeID string) []galleryop.NodeProgress {
r.mu.Lock()
defer r.mu.Unlock()
out := []galleryop.NodeProgress{}
for _, c := range r.calls {
if c.OpID == opID && c.NodeID == nodeID {
out = append(out, c.Progress)
}
}
return out
}
// fakeNoRespondersErr is the unscripted-subject default. It matches
// nats.ErrNoResponders by string only used when a test forgets to script
// nats.ErrNoResponders by string only - used when a test forgets to script
// a node so the failure is loud but doesn't tickle errors.Is(...) sentinel
// paths the test wasn't deliberately exercising. Tests that DO want the
// real sentinel (e.g. to drive the manager's NoResponders fallback) call
@@ -645,6 +681,75 @@ var _ = Describe("DistributedBackendManager", func() {
}, "200ms").Should(Equal(0))
})
})
Context("populates per-node OpStatus entries", func() {
var sink *recordingProgressSink
BeforeEach(func() {
// Reconstruct mgr with the recording sink so the new code
// path (per-node OpStatus writes) is exercised. The default
// mgr in the outer BeforeEach has progressSink=nil so the
// pre-existing specs keep verifying the no-sink behavior.
sink = &recordingProgressSink{}
appCfg := &config.ApplicationConfig{}
mgr = NewDistributedBackendManager(appCfg, nil, adapter, registry, sink)
// stubLocalBackendManager mirrors the production behaviour
// where the frontend node rarely has the backend installed
// locally - the NATS fan-out is what these specs verify.
mgr.local = stubLocalBackendManager{}
})
It("emits a success entry for each healthy node visited", func() {
node := registerHealthyBackend("worker-ok", "10.0.0.9:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID),
messaging.BackendInstallReply{Success: true, Address: "10.0.0.9:50051"})
opVal := op("vllm")
opVal.ID = "op-node-success"
Expect(mgr.InstallBackend(ctx, opVal, nil)).To(Succeed())
calls := sink.callsFor("op-node-success", node.ID)
Expect(calls).ToNot(BeEmpty())
Expect(calls[len(calls)-1].Status).To(Equal("success"))
Expect(calls[len(calls)-1].NodeName).To(Equal("worker-ok"))
})
It("emits a running_on_worker entry when NATS times out", func() {
node := registerHealthyBackend("worker-slow", "10.0.0.10:50051")
mc.scriptErr(messaging.SubjectNodeBackendInstall(node.ID), nats.ErrTimeout)
opVal := op("vllm")
opVal.ID = "op-node-slow"
// Soft failure: returns wrapped ErrWorkerStillInstalling.
_ = mgr.InstallBackend(ctx, opVal, nil)
calls := sink.callsFor("op-node-slow", node.ID)
Expect(calls).ToNot(BeEmpty())
Expect(calls[len(calls)-1].Status).To(Equal("running_on_worker"))
})
It("emits downloading entries from progress events", func() {
node := registerHealthyBackend("worker-dl", "10.0.0.11:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID),
messaging.BackendInstallReply{Success: true})
mc.scheduleProgressPublish(node.ID, "op-node-dl", []messaging.BackendInstallProgressEvent{
{OpID: "op-node-dl", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "1 GB", Total: "1 GB", Percentage: 100, Phase: "downloading"},
})
opVal := op("vllm")
opVal.ID = "op-node-dl"
Expect(mgr.InstallBackend(ctx, opVal, nil)).To(Succeed())
Eventually(func() bool {
for _, np := range sink.callsFor("op-node-dl", node.ID) {
if np.Status == "downloading" && np.Percentage == 100.0 {
return true
}
}
return false
}, "1s").Should(BeTrue())
})
})
})
Describe("UpgradeBackend", func() {

View File

@@ -253,7 +253,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry, nil)
err = distMgr.DeleteBackend("my-backend")
Expect(err).ToNot(HaveOccurred())
@@ -300,7 +300,7 @@ var _ = Describe("Model and Backend Managers", Label("Distributed"), func() {
appCfg.SystemState = ss
adapter := nodes.NewRemoteUnloaderAdapter(registry, infra.NC, 3*time.Minute, 15*time.Minute)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry)
distMgr := nodes.NewDistributedBackendManager(appCfg, ml, adapter, registry, nil)
// Should NOT return an error even though the backend doesn't exist locally
err = distMgr.DeleteBackend("remote-only-backend")