feat(distributed): forward backend install progress into galleryop OpStatus

DistributedBackendManager.InstallBackend now passes the gallery op ID
and a progress bridge into the adapter call. Each
BackendInstallProgressEvent from the worker becomes a
galleryop.ProgressCallback tick - which the existing backendHandler
already turns into OpStatus.UpdateStatus, so the admin UI/SSE polling
sees per-byte progress for distributed installs without any UI-side
change.

UpgradeBackend is intentionally left silent for now: its wire request
(BackendUpgradeRequest) does not carry OpID, and rolling-update
fallback is the rarer path. Will be picked up in a follow-up if the
worker upgrade path also gets a progress channel.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-05-22 22:00:49 +00:00
parent e8e75aadb6
commit f03aacf7e7
2 changed files with 62 additions and 1 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
@@ -417,7 +418,7 @@ func (d *DistributedBackendManager) InstallBackend(ctx context.Context, op *gall
// Admin-driven backend install: not tied to a specific replica slot.
// Pass replica 0 - the worker's processKey is "backend#0" when no
// modelID is supplied, matching pre-PR4 behavior.
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, "", nil)
reply, err := d.adapter.InstallBackend(node.ID, backendName, "", string(galleriesJSON), op.ExternalURI, op.ExternalName, op.ExternalAlias, 0, op.ID, bridgeProgressCb(progressCb))
if err != nil {
return err
}
@@ -547,3 +548,18 @@ func summarizeRunningOnWorker(nodes []NodeOpStatus) string {
}
return strings.Join(names, ", ")
}
// bridgeProgressCb adapts a BackendInstallProgressEvent stream to the
// (file, current, total, percentage) callback shape that
// galleryop.ProgressCallback expects (and that backendHandler already
// translates into OpStatus.UpdateStatus). nil in -> nil out so callers
// that don't pass a progressCb skip subscription work on the adapter
// side, matching the reconciler-retry semantics.
func bridgeProgressCb(progressCb galleryop.ProgressCallback) func(messaging.BackendInstallProgressEvent) {
if progressCb == nil {
return nil
}
return func(ev messaging.BackendInstallProgressEvent) {
progressCb(ev.FileName, ev.Current, ev.Total, ev.Percentage)
}
}

View File

@@ -574,6 +574,51 @@ var _ = Describe("DistributedBackendManager", func() {
Expect(rowsAfter).To(HaveLen(1), "upgrade rows must not be cleared by backend.list presence")
})
})
Context("InstallBackend streams progress events to the caller's progressCb", func() {
It("invokes progressCb once per worker-published progress event", func() {
node := registerHealthyBackend("worker-prog", "10.0.0.7:50051")
mc.scriptReply(messaging.SubjectNodeBackendInstall(node.ID), messaging.BackendInstallReply{Success: true, Address: "10.0.0.7:50051"})
mc.scheduleProgressPublish(node.ID, "op-prog-1", []messaging.BackendInstallProgressEvent{
{OpID: "op-prog-1", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "100 MB", Total: "1 GB", Percentage: 10},
{OpID: "op-prog-1", NodeID: node.ID, Backend: "vllm", FileName: "vllm.tar", Current: "1 GB", Total: "1 GB", Percentage: 100},
})
type tick struct {
FileName, Current, Total string
Percentage float64
}
var (
pcCalls []tick
mu sync.Mutex
)
progressCb := func(file, current, total string, pct float64) {
mu.Lock()
defer mu.Unlock()
pcCalls = append(pcCalls, tick{file, current, total, pct})
}
opVal := op("vllm")
opVal.ID = "op-prog-1"
Expect(mgr.InstallBackend(ctx, opVal, progressCb)).To(Succeed())
Eventually(func() int {
mu.Lock()
defer mu.Unlock()
return len(pcCalls)
}, "1s").Should(Equal(2))
mu.Lock()
defer mu.Unlock()
// The adapter dispatches each progress event to its own goroutine
// (see unloader.go: `go onProgress(ev)`) so two events emitted back
// to back can land at the bridge in either order. Assert the set of
// percentages observed contains both ticks, rather than depending
// on goroutine scheduling for ordering.
pcts := []float64{pcCalls[0].Percentage, pcCalls[1].Percentage}
Expect(pcts).To(ConsistOf(10.0, 100.0))
})
})
})
Describe("UpgradeBackend", func() {