From f03aacf7e7e2188ba4fed710c80942f20dbd130b Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 22 May 2026 22:00:49 +0000 Subject: [PATCH] feat(distributed): forward backend install progress into galleryop OpStatus DistributedBackendManager.InstallBackend now passes the gallery op ID and a progress bridge into the adapter call. Each BackendInstallProgressEvent from the worker becomes a galleryop.ProgressCallback tick - which the existing backendHandler already turns into OpStatus.UpdateStatus, so the admin UI/SSE polling sees per-byte progress for distributed installs without any UI-side change. UpgradeBackend is intentionally left silent for now: its wire request (BackendUpgradeRequest) does not carry OpID, and rolling-update fallback is the rarer path. Will be picked up in a follow-up if the worker upgrade path also gets a progress channel. Signed-off-by: Ettore Di Giacinto --- core/services/nodes/managers_distributed.go | 18 +++++++- .../nodes/managers_distributed_test.go | 45 +++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index 1cb52e94b..980b15c2a 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -10,6 +10,7 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery" "github.com/mudler/LocalAI/core/services/galleryop" + "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/system" "github.com/mudler/xlog" @@ -417,7 +418,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall // Admin-driven backend install: not tied to a specific replica slot. // Pass replica 0 - the worker's processKey is "backend#0" when no // modelID is supplied, matching pre-PR4 behavior. - reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, "", nil) + reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, op.ID, bridgeProgressCb(progressCb)) if err != nil { return err } @@ -547,3 +548,18 @@ func summarizeRunningOnWorker(nodes []NodeOpStatus) string { } return strings.Join(names, ", ") } + +// bridgeProgressCb adapts a BackendInstallProgressEvent stream to the +// (file, current, total, percentage) callback shape that +// galleryop.ProgressCallback expects (and that backendHandler already +// translates into OpStatus.UpdateStatus). nil in -> nil out so callers +// that don't pass a progressCb skip subscription work on the adapter +// side, matching the reconciler-retry semantics. +func bridgeProgressCb(progressCb galleryop.ProgressCallback) func(messaging.BackendInstallProgressEvent) { + if progressCb == nil { + return nil + } + return func(ev messaging.BackendInstallProgressEvent) { + progressCb(ev.FileName, ev.Current, ev.Total, ev.Percentage) + } +} diff --git a/core/services/nodes/managers_distributed_test.go b/core/services/nodes/managers_distributed_test.go index f07d966cc..1aaada2b9 100644 --- a/core/services/nodes/managers_distributed_test.go +++ b/core/services/nodes/managers_distributed_test.go @@ -574,6 +574,51 @@ var _ = Describe("DistributedBackendManager", func() { Expect(rowsAfter).To(HaveLen(1), "upgrade rows must not be cleared by backend.list presence") }) }) + + Context("InstallBackend streams progress events to the caller's progressCb", func() { + It("invokes progressCb once per worker-published progress event", func() { + node := registerHealthyBackend("worker-prog", "10.0.0.7:50051") + + mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID), messaging.BackendInstallReply{Success: true, Address: "10.0.0.7:50051"}) + mc.scheduleProgressPublish(node.ID, "op-prog-1", []messaging.BackendInstallProgressEvent{ + {OpID: "op-prog-1", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "100 MB", Total: "1 GB", Percentage: 10}, + {OpID: "op-prog-1", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "1 GB", Total: "1 GB", Percentage: 100}, + }) + + type tick struct { + FileName, Current, Total string + Percentage float64 + } + var ( + pcCalls []tick + mu sync.Mutex + ) + progressCb := func(file, current, total string, pct float64) { + mu.Lock() + defer mu.Unlock() + pcCalls = append(pcCalls, tick{file, current, total, pct}) + } + + opVal := op("vllm") + opVal.ID = "op-prog-1" + Expect(mgr.InstallBackend(ctx, opVal, progressCb)).To(Succeed()) + + Eventually(func() int { + mu.Lock() + defer mu.Unlock() + return len(pcCalls) + }, "1s").Should(Equal(2)) + mu.Lock() + defer mu.Unlock() + // The adapter dispatches each progress event to its own goroutine + // (see unloader.go: `go onProgress(ev)`) so two events emitted back + // to back can land at the bridge in either order. Assert the set of + // percentages observed contains both ticks, rather than depending + // on goroutine scheduling for ordering. + pcts := []float64{pcCalls[0].Percentage, pcCalls[1].Percentage} + Expect(pcts).To(ConsistOf(10.0, 100.0)) + }) + }) }) Describe("UpgradeBackend", func() {