From 58f29496e5e3e60386a3a23426f6552f5548acf2 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Fri, 22 May 2026 21:20:13 +0000 Subject: [PATCH] feat(messaging): add BackendInstallProgressEvent wire type and subject New NATS subject nodes.<nodeID>.backend.install.<opID>.progress lets the worker publish transient progress events (file, current/total bytes, percentage, phase) while a long-running install pulls its OCI image. BackendInstallRequest gains an optional OpID field so the worker knows which subject to publish on. Transient pub/sub (not JetStream): the install reply remains ground truth for success/failure; dropped progress events are tolerable. Signed-off-by: Ettore Di Giacinto --- .../messaging/backend_install_progress.go | 23 +++++++++ .../backend_install_progress_test.go | 51 +++++++++++++++++++ core/services/messaging/subjects.go | 6 +++ 3 files changed, 80 insertions(+) create mode 100644 core/services/messaging/backend_install_progress.go create mode 100644 core/services/messaging/backend_install_progress_test.go diff --git a/core/services/messaging/backend_install_progress.go b/core/services/messaging/backend_install_progress.go new file mode 100644 index 000000000..825112ee8 --- /dev/null +++ b/core/services/messaging/backend_install_progress.go @@ -0,0 +1,23 @@ +package messaging + +// BackendInstallProgressEvent is the wire payload published by a worker to +// nodes.<nodeID>.backend.install.<opID>.progress while a long-running install +// is in flight. Transient: dropped events are acceptable, the master relies +// on BackendInstallReply for ground truth on success/failure. +type BackendInstallProgressEvent struct { + OpID string `json:"op_id"` + NodeID string `json:"node_id"` + Backend string `json:"backend"` + FileName string `json:"file_name,omitempty"` + Current string `json:"current,omitempty"` // human-readable size, e.g. "412 MB" + Total string `json:"total,omitempty"` // human-readable size, e.g. 
"2.1 GB" + Percentage float64 `json:"percentage"` + Phase string `json:"phase,omitempty"` // "resolving" | "downloading" | "extracting" | "starting" +} + +// SubjectNodeBackendInstallProgress returns the NATS subject for transient +// progress events emitted by a worker during a single backend.install run. +// Per-op so multiple concurrent installs on the same node never alias. +func SubjectNodeBackendInstallProgress(nodeID, opID string) string { + return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".backend.install." + sanitizeSubjectToken(opID) + ".progress" +} diff --git a/core/services/messaging/backend_install_progress_test.go b/core/services/messaging/backend_install_progress_test.go new file mode 100644 index 000000000..0decace3d --- /dev/null +++ b/core/services/messaging/backend_install_progress_test.go @@ -0,0 +1,51 @@ +package messaging_test + +import ( + "encoding/json" + "strings" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/messaging" +) + +var _ = Describe("BackendInstallProgress", func() { + Context("SubjectNodeBackendInstallProgress", func() { + It("composes the per-op progress subject", func() { + Expect(messaging.SubjectNodeBackendInstallProgress("node-abc", "op-123")). + To(Equal("nodes.node-abc.backend.install.op-123.progress")) + }) + + It("sanitizes NATS-reserved characters in node and op tokens", func() { + // '.' is the NATS hierarchy delimiter, '*' and '>' are wildcards, + // and whitespace must be stripped — sanitizeSubjectToken replaces + // all of them with '-'. The resulting subject must still parse as + // exactly six hierarchy segments: nodes/<nodeID>/backend/install/<opID>/progress. 
+ subj := messaging.SubjectNodeBackendInstallProgress("a.b c", "x.y z") + Expect(subj).ToNot(ContainSubstring(" ")) + Expect(strings.Count(subj, ".")).To(Equal(5)) + }) + }) + + Context("BackendInstallProgressEvent", func() { + It("JSON round-trips with all known fields", func() { + ev := messaging.BackendInstallProgressEvent{ + OpID: "op-123", + NodeID: "node-abc", + Backend: "vllm", + FileName: "vllm-cpu.tar.zst", + Current: "412 MB", + Total: "2.1 GB", + Percentage: 19.6, + Phase: "downloading", + } + raw, err := json.Marshal(ev) + Expect(err).ToNot(HaveOccurred()) + + var got messaging.BackendInstallProgressEvent + Expect(json.Unmarshal(raw, &got)).To(Succeed()) + Expect(got).To(Equal(ev)) + }) + }) +}) diff --git a/core/services/messaging/subjects.go b/core/services/messaging/subjects.go index 25080caee..6cf7a9969 100644 --- a/core/services/messaging/subjects.go +++ b/core/services/messaging/subjects.go @@ -144,6 +144,12 @@ type BackendInstallRequest struct { // worker still works (the master's install fallback path also uses this // when backend.upgrade returns nats.ErrNoResponders). Force bool `json:"force,omitempty"` + // OpID identifies the admin-side operation. When non-empty the worker + // publishes BackendInstallProgressEvent values to + // SubjectNodeBackendInstallProgress(nodeID, OpID) while the install is + // running, debounced to roughly 250ms. Empty means the caller is a + // reconciler-driven retry that does not need progress streamed. + OpID string `json:"op_id,omitempty"` } // BackendInstallReply is the response from a backend.install NATS request.