From 569d9bbd9e31a81d072094f806fc07c2c5bf64fc Mon Sep 17 00:00:00 2001 From: "LocalAI [bot]" <139863280+localai-bot@users.noreply.github.com> Date: Mon, 22 Jun 2026 09:28:07 +0200 Subject: [PATCH] fix(distributed): broadcast file-staging progress across replicas (#10440) File-staging progress lived only in the SmartRouter's in-memory StagingTracker on the replica performing the transfer. In a multi-replica deployment behind a round-robin load balancer, a /api/operations poll that lands on any other replica saw no staging row, so the progress ("processing file ... Total ... Current ...") flickered in and out as polls rotated between frontends. Mirror the pattern already used for gallery-install progress: the origin replica broadcasts staging ticks over NATS (SubjectStagingProgress, a new staging..progress subject), and peers merge them via ApplyRemote (SubscribeBroadcasts on the wildcard). Byte-level ticks are leading-edge debounced (~1/s); Start/FileComplete/Complete always publish. A locally-owned op stays authoritative so the origin's own echo and stray peer events can't clobber it, and mirrored remote ops expire after a TTL so a missed Done event can't leave a phantom row. The UI read path (StagingTracker.GetAll) is unchanged. Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto Co-authored-by: Ettore Di Giacinto --- core/application/distributed.go | 9 + core/services/messaging/subjects.go | 16 ++ core/services/nodes/staging_progress.go | 217 +++++++++++++++--- .../nodes/staging_progress_broadcast_test.go | 109 +++++++++ 4 files changed, 317 insertions(+), 34 deletions(-) create mode 100644 core/services/nodes/staging_progress_broadcast_test.go diff --git a/core/application/distributed.go b/core/application/distributed.go index 00c39422d..3235e4304 100644 --- a/core/application/distributed.go +++ b/core/application/distributed.go @@ -357,6 +357,15 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade Pressure: pressure, }) + // Wire staging-progress broadcasting so file-staging shows up on every + // replica, not just the one performing the transfer. Without this, a + // /api/operations poll that round-robins onto a peer sees no staging row and + // the progress flickers. The origin publishes; peers mirror via the wildcard. + router.StagingTracker().SetPublisher(natsClient) + if _, err := router.StagingTracker().SubscribeBroadcasts(natsClient); err != nil { + xlog.Warn("Failed to subscribe to staging progress broadcasts", "error", err) + } + // Create ReplicaReconciler for auto-scaling model replicas. Adapter + // RegistrationToken feed the state-reconciliation passes: pending op // drain uses the adapter, and model health probes use the token to auth diff --git a/core/services/messaging/subjects.go b/core/services/messaging/subjects.go index 1bee20f44..7d099460c 100644 --- a/core/services/messaging/subjects.go +++ b/core/services/messaging/subjects.go @@ -64,6 +64,22 @@ func SubjectGalleryProgress(opID string) string { return subjectGalleryPrefix + sanitizeSubjectToken(opID) + ".progress" } +// SubjectStagingProgress returns the NATS subject a frontend replica publishes +// file-staging progress on. Staging progress is otherwise per-process state +// (the SmartRouter's in-memory StagingTracker), so without this broadcast a +// /api/operations poll that round-robins onto a replica that did not originate +// the staging op sees nothing - the progress row flickers in multi-replica +// deployments. Peers subscribe to the wildcard and merge. +func SubjectStagingProgress(modelID string) string { + return subjectStagingPrefix + sanitizeSubjectToken(modelID) + ".progress" +} + +const subjectStagingPrefix = "staging." + +// SubjectStagingProgressWildcard matches every replica's staging-progress +// broadcasts so a peer can mirror staging ops it did not originate. +const SubjectStagingProgressWildcard = "staging.*.progress" + // SubjectGalleryOpStart and SubjectGalleryOpEnd are broadcast subjects for the // in-memory OpCache lifecycle. Frontend replicas publish to these when an // admin admits a new install/delete (Start) and when an operation is diff --git a/core/services/nodes/staging_progress.go b/core/services/nodes/staging_progress.go index 3d066c0fa..0a6ddc50e 100644 --- a/core/services/nodes/staging_progress.go +++ b/core/services/nodes/staging_progress.go @@ -5,58 +5,138 @@ import ( "fmt" "sync" "time" + + "github.com/mudler/LocalAI/core/services/messaging" ) // StagingStatus represents the current progress of a model staging operation. type StagingStatus struct { - ModelID string `json:"model_id"` - NodeName string `json:"node_name"` - FileName string `json:"file_name"` - BytesSent int64 `json:"bytes_sent"` - TotalBytes int64 `json:"total_bytes"` - Progress float64 `json:"progress"` // 0-100 overall progress - Speed string `json:"speed"` - FileIndex int `json:"file_index"` - TotalFiles int `json:"total_files"` - Message string `json:"message"` + ModelID string `json:"model_id"` + NodeName string `json:"node_name"` + FileName string `json:"file_name"` + BytesSent int64 `json:"bytes_sent"` + TotalBytes int64 `json:"total_bytes"` + Progress float64 `json:"progress"` // 0-100 overall progress + Speed string `json:"speed"` + FileIndex int `json:"file_index"` + TotalFiles int `json:"total_files"` + Message string `json:"message"` StartedAt time.Time `json:"started_at"` } +const ( + // stagingBroadcastInterval bounds how often byte-level UpdateFile ticks are + // re-broadcast to peers (leading-edge debounce). State transitions (Start, + // FileComplete, Complete) always publish so peers never miss them. + stagingBroadcastInterval = time.Second + // stagingRemoteTTL drops a mirrored (remote) op whose last update is older + // than this. NATS pub/sub is fire-and-forget, so a missed Done event would + // otherwise leave a phantom staging row on a peer forever; a live op + // refreshes its mirror at least every stagingBroadcastInterval. + stagingRemoteTTL = 60 * time.Second +) + +// stagingEntry wraps a StagingStatus with the bookkeeping needed to keep peer +// replicas consistent: whether this op is mirrored from a peer (remote) vs. +// owned locally, when it was last updated (for remote-mirror expiry), and when +// its byte progress was last broadcast (for debounce). +type stagingEntry struct { + status StagingStatus + remote bool + updatedAt time.Time + lastPub time.Time +} + // StagingTracker tracks active file staging operations in-memory. // Used by SmartRouter to publish progress and by /api/operations to surface it. +// +// In distributed mode each frontend replica runs its own tracker. The replica +// performing a transfer owns the op locally and broadcasts progress over NATS +// (SetPublisher); peers mirror it via ApplyRemote (SubscribeBroadcasts) so a +// /api/operations poll that round-robins onto any replica surfaces the op. type StagingTracker struct { - mu sync.RWMutex - active map[string]*StagingStatus + mu sync.RWMutex + active map[string]*stagingEntry + publisher messaging.Publisher +} + +// StagingProgressEvent is the wire payload a frontend replica broadcasts on +// SubjectStagingProgress so peer replicas can mirror a staging op they did not +// originate. Done signals the op finished (peers drop their mirrored copy). +type StagingProgressEvent struct { + ModelID string `json:"model_id"` + Status *StagingStatus `json:"status,omitempty"` + Done bool `json:"done"` } // NewStagingTracker creates a new tracker. func NewStagingTracker() *StagingTracker { return &StagingTracker{ - active: make(map[string]*StagingStatus), + active: make(map[string]*stagingEntry), } } +// SetPublisher wires the NATS publisher used to broadcast staging progress to +// peer replicas. No-op publisher (nil) keeps the tracker standalone. +func (t *StagingTracker) SetPublisher(p messaging.Publisher) { + t.mu.Lock() + defer t.mu.Unlock() + t.publisher = p +} + +// SubscribeBroadcasts subscribes to peer replicas' staging-progress broadcasts +// and mirrors them into this tracker, so /api/operations on any replica surfaces +// staging ops it did not originate. Returns the subscription for cleanup. +func (t *StagingTracker) SubscribeBroadcasts(nc messaging.MessagingClient) (messaging.Subscription, error) { + return messaging.SubscribeJSON(nc, messaging.SubjectStagingProgressWildcard, func(evt StagingProgressEvent) { + if evt.ModelID == "" { + return + } + t.ApplyRemote(evt) + }) +} + +// publishStaging emits an event to the per-model staging subject. The publisher +// is captured by the caller under the lock and passed in, so publishing happens +// outside the lock (a slow NATS link must not stall the staging copy loop). +func publishStaging(p messaging.Publisher, evt StagingProgressEvent) { + if p == nil { + return + } + _ = p.Publish(messaging.SubjectStagingProgress(evt.ModelID), evt) +} + // Start registers a new staging operation for the given model. func (t *StagingTracker) Start(modelID, nodeName string, totalFiles int) { t.mu.Lock() - defer t.mu.Unlock() - t.active[modelID] = &StagingStatus{ - ModelID: modelID, - NodeName: nodeName, - TotalFiles: totalFiles, - StartedAt: time.Now(), - Message: "Preparing to stage model files", + e := &stagingEntry{ + status: StagingStatus{ + ModelID: modelID, + NodeName: nodeName, + TotalFiles: totalFiles, + StartedAt: time.Now(), + Message: "Preparing to stage model files", + }, + updatedAt: time.Now(), + // lastPub stays zero so the first UpdateFile tick always broadcasts. } + t.active[modelID] = e + pub := t.publisher + snap := e.status + t.mu.Unlock() + + publishStaging(pub, StagingProgressEvent{ModelID: modelID, Status: &snap}) } // UpdateFile updates the tracker with current file transfer progress. func (t *StagingTracker) UpdateFile(modelID, fileName string, fileIndex int, bytesSent, totalBytes int64, speed string) { t.mu.Lock() - defer t.mu.Unlock() - s, ok := t.active[modelID] + e, ok := t.active[modelID] if !ok { + t.mu.Unlock() return } + s := &e.status s.FileName = fileName s.FileIndex = fileIndex s.BytesSent = bytesSent @@ -79,52 +159,121 @@ func (t *StagingTracker) UpdateFile(modelID, fileName string, fileIndex int, byt } else { s.Message = fmt.Sprintf("Staging %s", fileName) } + + e.updatedAt = time.Now() + // Leading-edge debounce: byte ticks fire many times per second; only + // re-broadcast at most once per stagingBroadcastInterval. + var pub messaging.Publisher + var snap StagingStatus + if time.Since(e.lastPub) >= stagingBroadcastInterval { + e.lastPub = time.Now() + pub = t.publisher + snap = e.status + } + t.mu.Unlock() + + if pub != nil { + publishStaging(pub, StagingProgressEvent{ModelID: modelID, Status: &snap}) + } } // FileComplete marks a single file as done within a staging operation. func (t *StagingTracker) FileComplete(modelID string, fileIndex, totalFiles int) { t.mu.Lock() - defer t.mu.Unlock() - s, ok := t.active[modelID] + e, ok := t.active[modelID] if !ok { + t.mu.Unlock() return } + s := &e.status if totalFiles > 0 { s.Progress = float64(fileIndex) / float64(totalFiles) * 100 } s.BytesSent = 0 s.TotalBytes = 0 s.Speed = "" + e.updatedAt = time.Now() + e.lastPub = time.Now() + pub := t.publisher + snap := e.status + t.mu.Unlock() + + // Always broadcast a per-file completion so peers' progress bars advance. + publishStaging(pub, StagingProgressEvent{ModelID: modelID, Status: &snap}) } // Complete removes a staging operation (it's done). func (t *StagingTracker) Complete(modelID string) { t.mu.Lock() - defer t.mu.Unlock() + _, ok := t.active[modelID] delete(t.active, modelID) + pub := t.publisher + t.mu.Unlock() + + if ok { + // Tell peers to drop their mirrored copy. + publishStaging(pub, StagingProgressEvent{ModelID: modelID, Done: true}) + } } -// GetAll returns a snapshot of all active staging operations. +// ApplyRemote merges a peer replica's staging broadcast into this tracker. It +// never re-broadcasts (no echo loop). A locally-owned op is authoritative: a +// remote event for the same model is ignored, so the origin replica receiving +// its own broadcast (and any stray peer event) cannot clobber or delete it. +func (t *StagingTracker) ApplyRemote(evt StagingProgressEvent) { + t.mu.Lock() + defer t.mu.Unlock() + + if existing, ok := t.active[evt.ModelID]; ok && !existing.remote { + // We own this op locally — ignore peer chatter about it. + return + } + if evt.Done { + delete(t.active, evt.ModelID) + return + } + if evt.Status == nil { + return + } + t.active[evt.ModelID] = &stagingEntry{ + status: *evt.Status, + remote: true, + updatedAt: time.Now(), + } +} + +// GetAll returns a snapshot of all active staging operations. Stale remote +// mirrors (a peer op whose Done event was missed) are pruned here so they don't +// linger in the UI. func (t *StagingTracker) GetAll() map[string]StagingStatus { - t.mu.RLock() - defer t.mu.RUnlock() + t.mu.Lock() + defer t.mu.Unlock() + now := time.Now() result := make(map[string]StagingStatus, len(t.active)) - for k, v := range t.active { - result[k] = *v + for k, e := range t.active { + if e.remote && now.Sub(e.updatedAt) > stagingRemoteTTL { + delete(t.active, k) + continue + } + result[k] = e.status } return result } -// Get returns the status of a specific staging operation, or nil if not active. +// Get returns the status of a specific staging operation, or nil if not active +// (or a stale remote mirror). func (t *StagingTracker) Get(modelID string) *StagingStatus { t.mu.RLock() defer t.mu.RUnlock() - s, ok := t.active[modelID] + e, ok := t.active[modelID] if !ok { return nil } - copy := *s - return © + if e.remote && time.Since(e.updatedAt) > stagingRemoteTTL { + return nil + } + s := e.status + return &s } // StagingProgressCallback is called by file stagers to report byte-level progress. diff --git a/core/services/nodes/staging_progress_broadcast_test.go b/core/services/nodes/staging_progress_broadcast_test.go new file mode 100644 index 000000000..0f0f0db1e --- /dev/null +++ b/core/services/nodes/staging_progress_broadcast_test.go @@ -0,0 +1,109 @@ +package nodes + +import ( + "encoding/json" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/messaging" +) + +// decodeStagingEvents extracts every StagingProgressEvent the fake messaging +// client captured, in publish order. +func decodeStagingEvents(mc *fakeMessagingClient) []StagingProgressEvent { + mc.mu.Lock() + defer mc.mu.Unlock() + var out []StagingProgressEvent + for _, p := range mc.published { + var evt StagingProgressEvent + if err := json.Unmarshal(p.Data, &evt); err != nil { + continue + } + if evt.ModelID == "" { + continue + } + out = append(out, evt) + } + return out +} + +var _ = Describe("StagingTracker cross-replica broadcast", func() { + Context("when a publisher is wired (distributed mode)", func() { + It("broadcasts staging progress so a peer replica surfaces an op it did not originate", func() { + mc := &fakeMessagingClient{} + origin := NewStagingTracker() + origin.SetPublisher(mc) + + origin.Start("model-x", "worker-1", 1) + origin.UpdateFile("model-x", "weights.gguf", 1, 5<<30, 10<<30, "100 MiB/s") + + events := decodeStagingEvents(mc) + Expect(events).ToNot(BeEmpty(), "writes must be broadcast over NATS") + Expect(mc.published[0].Subject).To(Equal(messaging.SubjectStagingProgress("model-x"))) + + // A peer replica that never ran the op merges the broadcast. + peer := NewStagingTracker() + for _, evt := range events { + peer.ApplyRemote(evt) + } + + all := peer.GetAll() + Expect(all).To(HaveKey("model-x")) + Expect(all["model-x"].NodeName).To(Equal("worker-1")) + Expect(all["model-x"].FileName).To(Equal("weights.gguf")) + Expect(all["model-x"].TotalBytes).To(Equal(int64(10 << 30))) + }) + + It("removes the op from the peer when the origin completes it", func() { + mc := &fakeMessagingClient{} + origin := NewStagingTracker() + origin.SetPublisher(mc) + + origin.Start("model-x", "worker-1", 1) + origin.Complete("model-x") + + peer := NewStagingTracker() + for _, evt := range decodeStagingEvents(mc) { + peer.ApplyRemote(evt) + } + Expect(peer.GetAll()).ToNot(HaveKey("model-x")) + }) + + It("does not let a peer broadcast clobber an op this replica is itself running", func() { + local := NewStagingTracker() + local.Start("model-x", "worker-local", 2) + local.UpdateFile("model-x", "weights.gguf", 1, 9<<30, 10<<30, "") + + // A stray/older remote event for the SAME modelID must not overwrite + // the authoritative local state, nor delete it. + local.ApplyRemote(StagingProgressEvent{ + ModelID: "model-x", + Status: &StagingStatus{ModelID: "model-x", NodeName: "worker-other", FileName: "stale.gguf"}, + }) + local.ApplyRemote(StagingProgressEvent{ModelID: "model-x", Done: true}) + + all := local.GetAll() + Expect(all).To(HaveKey("model-x")) + Expect(all["model-x"].NodeName).To(Equal("worker-local")) + Expect(all["model-x"].FileName).To(Equal("weights.gguf")) + }) + }) + + Context("when no publisher is wired (standalone mode)", func() { + It("does not broadcast", func() { + mc := &fakeMessagingClient{} + t := NewStagingTracker() + t.Start("model-x", "worker-1", 1) + t.UpdateFile("model-x", "weights.gguf", 1, 1<<30, 10<<30, "") + Expect(mc.published).To(BeEmpty()) + }) + }) +}) + +var _ = Describe("SubjectStagingProgress", func() { + It("namespaces by model id and matches the wildcard prefix", func() { + Expect(messaging.SubjectStagingProgress("model-x")).To(Equal("staging.model-x.progress")) + Expect(messaging.SubjectStagingProgressWildcard).To(Equal("staging.*.progress")) + }) +})