mirror of
https://github.com/mudler/LocalAI.git
synced 2026-05-24 08:38:02 -04:00
feat(messaging): add BackendInstallProgressEvent wire type and subject
New NATS subject nodes.<nodeID>.backend.install.<opID>.progress lets the worker publish transient progress events (file, current/total bytes, percentage, phase) while a long-running install pulls its OCI image. BackendInstallRequest gains an optional OpID field so the worker knows which subject to publish on. Transient pub/sub (not JetStream): the install reply remains ground truth for success/failure; dropped progress events are tolerable. Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
23
core/services/messaging/backend_install_progress.go
Normal file
23
core/services/messaging/backend_install_progress.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package messaging
|
||||
|
||||
// BackendInstallProgressEvent is the wire payload published by a worker to
|
||||
// nodes.<nodeID>.backend.install.<opID>.progress while a long-running install
|
||||
// is in flight. Transient: dropped events are acceptable, the master relies
|
||||
// on BackendInstallReply for ground truth on success/failure.
|
||||
type BackendInstallProgressEvent struct {
|
||||
OpID string `json:"op_id"`
|
||||
NodeID string `json:"node_id"`
|
||||
Backend string `json:"backend"`
|
||||
FileName string `json:"file_name,omitempty"`
|
||||
Current string `json:"current,omitempty"` // human-readable size, e.g. "412 MB"
|
||||
Total string `json:"total,omitempty"` // human-readable size, e.g. "2.1 GB"
|
||||
Percentage float64 `json:"percentage"`
|
||||
Phase string `json:"phase,omitempty"` // "resolving" | "downloading" | "extracting" | "starting"
|
||||
}
|
||||
|
||||
// SubjectNodeBackendInstallProgress returns the NATS subject for transient
|
||||
// progress events emitted by a worker during a single backend.install run.
|
||||
// Per-op so multiple concurrent installs on the same node never alias.
|
||||
func SubjectNodeBackendInstallProgress(nodeID, opID string) string {
|
||||
return subjectNodePrefix + sanitizeSubjectToken(nodeID) + ".backend.install." + sanitizeSubjectToken(opID) + ".progress"
|
||||
}
|
||||
51
core/services/messaging/backend_install_progress_test.go
Normal file
51
core/services/messaging/backend_install_progress_test.go
Normal file
@@ -0,0 +1,51 @@
|
||||
package messaging_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/services/messaging"
|
||||
)
|
||||
|
||||
var _ = Describe("BackendInstallProgress", func() {
|
||||
Context("SubjectNodeBackendInstallProgress", func() {
|
||||
It("composes the per-op progress subject", func() {
|
||||
Expect(messaging.SubjectNodeBackendInstallProgress("node-abc", "op-123")).
|
||||
To(Equal("nodes.node-abc.backend.install.op-123.progress"))
|
||||
})
|
||||
|
||||
It("sanitizes NATS-reserved characters in node and op tokens", func() {
|
||||
// '.' is the NATS hierarchy delimiter, '*' and '>' are wildcards,
|
||||
// and whitespace must be stripped — sanitizeSubjectToken replaces
|
||||
// all of them with '-'. The resulting subject must still parse as
|
||||
// exactly six hierarchy segments: nodes/<node>/backend/install/<op>/progress.
|
||||
subj := messaging.SubjectNodeBackendInstallProgress("a.b c", "x.y z")
|
||||
Expect(subj).ToNot(ContainSubstring(" "))
|
||||
Expect(strings.Count(subj, ".")).To(Equal(5))
|
||||
})
|
||||
})
|
||||
|
||||
Context("BackendInstallProgressEvent", func() {
|
||||
It("JSON round-trips with all known fields", func() {
|
||||
ev := messaging.BackendInstallProgressEvent{
|
||||
OpID: "op-123",
|
||||
NodeID: "node-abc",
|
||||
Backend: "vllm",
|
||||
FileName: "vllm-cpu.tar.zst",
|
||||
Current: "412 MB",
|
||||
Total: "2.1 GB",
|
||||
Percentage: 19.6,
|
||||
Phase: "downloading",
|
||||
}
|
||||
raw, err := json.Marshal(ev)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
var got messaging.BackendInstallProgressEvent
|
||||
Expect(json.Unmarshal(raw, &got)).To(Succeed())
|
||||
Expect(got).To(Equal(ev))
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -144,6 +144,12 @@ type BackendInstallRequest struct {
|
||||
// worker still works (the master's install fallback path also uses this
|
||||
// when backend.upgrade returns nats.ErrNoResponders).
|
||||
Force bool `json:"force,omitempty"`
|
||||
// OpID identifies the admin-side operation. When non-empty the worker
|
||||
// publishes BackendInstallProgressEvent values to
|
||||
// SubjectNodeBackendInstallProgress(nodeID, OpID) while the install is
|
||||
// running, debounced to roughly 250ms. Empty means the caller is a
|
||||
// reconciler-driven retry that does not need progress streamed.
|
||||
OpID string `json:"op_id,omitempty"`
|
||||
}
|
||||
|
||||
// BackendInstallReply is the response from a backend.install NATS request.
|
||||
|
||||
Reference in New Issue
Block a user