diff --git a/core/services/galleryop/node_progress_test.go b/core/services/galleryop/node_progress_test.go index e2478c5e2..34c9386e0 100644 --- a/core/services/galleryop/node_progress_test.go +++ b/core/services/galleryop/node_progress_test.go @@ -6,6 +6,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/services/galleryop" ) @@ -40,3 +41,53 @@ var _ = Describe("OpStatus.Nodes", func() { Expect(got.Nodes[0]).To(Equal(os.Nodes[0])) }) }) + +var _ = Describe("GalleryService.UpdateNodeProgress", func() { + var svc *galleryop.GalleryService + + BeforeEach(func() { + // UpdateNodeProgress + GetStatus only touch the in-memory statuses + // map. A zero-value ApplicationConfig is enough to get past the + // LocalModelManager / LocalBackendManager constructors. + svc = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + }) + + It("creates a node entry on first call", func() { + svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{ + NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 12.0, + }) + st := svc.GetStatus("op1") + Expect(st).ToNot(BeNil()) + Expect(st.Nodes).To(HaveLen(1)) + Expect(st.Nodes[0].NodeID).To(Equal("n1")) + Expect(st.Nodes[0].Percentage).To(Equal(12.0)) + }) + + It("merges subsequent updates into the same NodeID entry, not appending", func() { + svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 12.0}) + svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 48.0, FileName: "vllm.tar"}) + st := svc.GetStatus("op1") + Expect(st.Nodes).To(HaveLen(1)) + Expect(st.Nodes[0].Percentage).To(Equal(48.0)) + Expect(st.Nodes[0].FileName).To(Equal("vllm.tar")) + }) + + It("appends a new entry for a different NodeID", func() { + svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{NodeID: "n1", NodeName: "worker-a", Status: "downloading", Percentage: 12.0}) + svc.UpdateNodeProgress("op1", "n2", galleryop.NodeProgress{NodeID: "n2", NodeName: "worker-b", Status: "queued"}) + st := svc.GetStatus("op1") + Expect(st.Nodes).To(HaveLen(2)) + }) + + It("mirrors the latest tick into the aggregate OpStatus fields", func() { + svc.UpdateNodeProgress("op1", "n1", galleryop.NodeProgress{ + NodeID: "n1", NodeName: "worker-a", Status: "downloading", + Percentage: 33.0, FileName: "vllm.tar", Current: "330 MB", Total: "1 GB", + }) + st := svc.GetStatus("op1") + Expect(st.Progress).To(Equal(33.0)) + Expect(st.FileName).To(Equal("vllm.tar")) + Expect(st.DownloadedFileSize).To(Equal("330 MB")) + Expect(st.TotalFileSize).To(Equal("1 GB")) + }) +}) diff --git a/core/services/galleryop/service.go b/core/services/galleryop/service.go index b6e1510c8..bd5e24b2e 100644 --- a/core/services/galleryop/service.go +++ b/core/services/galleryop/service.go @@ -135,6 +135,47 @@ func (g *GalleryService) UpdateStatus(s string, op *OpStatus) { } } +// UpdateNodeProgress merges a per-node progress tick into OpStatus.Nodes, +// keyed by nodeID, and mirrors the latest values into the aggregate +// Progress / FileName / DownloadedFileSize / TotalFileSize / Message +// fields so the legacy single-bar OperationsBar view keeps working +// unchanged alongside the new per-node breakdown. +// +// We deliberately do NOT delegate the aggregate mirror to UpdateStatus +// here: UpdateStatus overwrites the entire OpStatus, which would clobber +// the Nodes slice we just merged into. Doing the merge + mirror under a +// single lock keeps both views consistent and concurrent-safe. +func (g *GalleryService) UpdateNodeProgress(opID, nodeID string, np NodeProgress) { + g.Lock() + defer g.Unlock() + status := g.statuses[opID] + if status == nil { + status = &OpStatus{} + g.statuses[opID] = status + } + merged := false + for i := range status.Nodes { + if status.Nodes[i].NodeID == nodeID { + status.Nodes[i] = np + merged = true + break + } + } + if !merged { + status.Nodes = append(status.Nodes, np) + } + + // Mirror the latest tick into the legacy aggregate fields so the + // existing single-bar UI keeps rendering meaningful progress. + status.FileName = np.FileName + status.Progress = np.Percentage + status.DownloadedFileSize = np.Current + status.TotalFileSize = np.Total + if np.Phase != "" { + status.Message = np.Phase + } +} + func (g *GalleryService) GetStatus(s string) *OpStatus { g.Lock() defer g.Unlock()