feat(galleryop): UpdateNodeProgress merges per-node ticks by NodeID

GalleryService.UpdateNodeProgress(opID, nodeID, np) merges a NodeProgress
into OpStatus.Nodes (keyed by NodeID, no duplicates) and mirrors the
latest tick into the aggregate Progress / FileName /
DownloadedFileSize / TotalFileSize fields so the legacy single-bar
OperationsBar view keeps working unchanged alongside the new per-node
breakdown.

Concurrent-safe via the existing g.Mutex.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-05-22 22:35:11 +00:00
parent e14d9ae8e3
commit f96df5eb85
2 changed files with 92 additions and 0 deletions

View File

@@ -6,6 +6,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/galleryop"
)
@@ -40,3 +41,53 @@ var _ = Describe("OpStatus.Nodes", func() {
Expect(got.Nodes[0]).To(Equal(os.Nodes[0]))
})
})
var _ = Describe("GalleryService.UpdateNodeProgress", func() {
var svc *galleryop.GalleryService
BeforeEach(func() {
// UpdateNodeProgress + GetStatus only touch the in-memory statuses
// map. A zero-value ApplicationConfig is enough to get past the
// LocalModelManager / LocalBackendManager constructors.
svc = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
})
It("creates a node entry on first call", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{
NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 12.0,
})
st := svc.GetStatus("op1")
Expect(st).ToNot(BeNil())
Expect(st.Nodes).To(HaveLen(1))
Expect(st.Nodes[0].NodeID).To(Equal("n1"))
Expect(st.Nodes[0].Percentage).To(Equal(12.0))
})
It("merges subsequent updates into the same NodeID entry, not appending", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 12.0})
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 48.0, FileName: "vllm.tar"})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(1))
Expect(st.Nodes[0].Percentage).To(Equal(48.0))
Expect(st.Nodes[0].FileName).To(Equal("vllm.tar"))
})
It("appends a new entry for a different NodeID", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 12.0})
svc.UpdateNodeProgress("op1", "n2", galleryop.NodeProgress{NodeID: "n2", NodeName: "worker-b", Status: "queued"})
st := svc.GetStatus("op1")
Expect(st.Nodes).To(HaveLen(2))
})
It("mirrors the latest tick into the aggregate OpStatus fields", func() {
svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{
NodeID: "n1", NodeName: "worker-a", Status: "downloading",
Percentage: 33.0, FileName: "vllm.tar", Current: "330 MB", Total: "1 GB",
})
st := svc.GetStatus("op1")
Expect(st.Progress).To(Equal(33.0))
Expect(st.FileName).To(Equal("vllm.tar"))
Expect(st.DownloadedFileSize).To(Equal("330 MB"))
Expect(st.TotalFileSize).To(Equal("1 GB"))
})
})

View File

@@ -135,6 +135,47 @@ func (g *GalleryService) UpdateStatus(s string, op *OpStatus) {
}
}
// UpdateNodeProgress merges a per-node progress tick into OpStatus.Nodes,
// keyed by nodeID, and mirrors the latest values into the aggregate
// Progress / FileName / DownloadedFileSize / TotalFileSize / Message
// fields so the legacy single-bar OperationsBar view keeps working
// unchanged alongside the new per-node breakdown.
//
// We deliberately do NOT delegate the aggregate mirror to UpdateStatus
// here: UpdateStatus overwrites the entire OpStatus, which would clobber
// the Nodes slice we just merged into. Doing the merge + mirror under a
// single lock keeps both views consistent and concurrent-safe.
func (g *GalleryService) UpdateNodeProgress(opID, nodeID string, np NodeProgress) {
g.Lock()
defer g.Unlock()
status := g.statuses[opID]
if status == nil {
status = &OpStatus{}
g.statuses[opID] = status
}
merged := false
for i := range status.Nodes {
if status.Nodes[i].NodeID == nodeID {
status.Nodes[i] = np
merged = true
break
}
}
if !merged {
status.Nodes = append(status.Nodes, np)
}
// Mirror the latest tick into the legacy aggregate fields so the
// existing single-bar UI keeps rendering meaningful progress.
status.FileName = np.FileName
status.Progress = np.Percentage
status.DownloadedFileSize = np.Current
status.TotalFileSize = np.Total
if np.Phase != "" {
status.Message = np.Phase
}
}
func (g *GalleryService) GetStatus(s string) *OpStatus {
g.Lock()
defer g.Unlock()