fix(galleryop): persist cancellation + periodically reap orphaned ops

Two distributed gaps surfaced when a replica was killed mid-upgrade on a live
cluster, leaving the backend stuck 'processing' in the UI forever:

1. CancelOperation flipped the in-memory status to cancelled and broadcast a
   NATS event but never persisted the terminal status. On the next replica
   restart the still-active row re-hydrated straight back into
   processingBackends and the UI spun again. It now calls store.Cancel(id) so
   the cancel survives a restart.

2. CleanStale (which marks abandoned active ops failed) only ran once on
   startup, so an op orphaned AFTER startup - its owning replica's foreground
   handler goroutine gone - was never reaped until the next restart. Add
   GalleryService.ReapStaleOperations and run it on a 15m ticker (CleanStale
   now returns the reaped count for observability).

Neither is covered by the OpCache self-evict fix: an orphaned op never reaches
Processed, so it would never self-evict.

Assisted-by: Claude:claude-opus-4-8 [Claude Code]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-06-07 23:10:53 +00:00
parent 7824105a31
commit 05c0a08e24
4 changed files with 178 additions and 12 deletions

View File

@@ -23,9 +23,9 @@ import (
"github.com/mudler/LocalAI/core/services/routing/pii"
"github.com/mudler/LocalAI/core/services/routing/router"
"github.com/mudler/LocalAI/core/services/storage"
"github.com/mudler/LocalAI/pkg/signals"
coreStartup "github.com/mudler/LocalAI/core/startup"
"github.com/mudler/LocalAI/internal"
"github.com/mudler/LocalAI/pkg/signals"
"github.com/mudler/LocalAI/pkg/vram"
"github.com/mudler/LocalAI/pkg/model"
@@ -308,10 +308,31 @@ func New(opts ...config.AppOption) (*Application, error) {
application.galleryService.SetNATSClient(distSvc.Nats)
if distSvc.DistStores != nil && distSvc.DistStores.Gallery != nil {
// Clean up stale in-progress operations from previous crashed instances
if err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
if _, err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil {
xlog.Warn("Failed to clean stale gallery operations", "error", err)
}
application.galleryService.SetGalleryStore(distSvc.DistStores.Gallery)
// Reap stale ops periodically, not just at boot: an op orphaned by
// a replica that died mid-install (its foreground handler goroutine
// gone) would otherwise linger "processing" in the UI until the next
// restart. 30m matches the install/upgrade ceiling so a genuinely
// slow op is never reaped out from under itself.
gsvc := application.galleryService
go func() {
ticker := time.NewTicker(15 * time.Minute)
defer ticker.Stop()
for {
select {
case <-options.Context.Done():
return
case <-ticker.C:
if _, err := gsvc.ReapStaleOperations(30 * time.Minute); err != nil {
xlog.Warn("Failed to reap stale gallery operations", "error", err)
}
}
}
}()
}
// Hydrate from the store first so the wildcard subscriber finds an
// already-populated statuses map for any operations still in flight

View File

@@ -180,18 +180,21 @@ func (s *GalleryStore) Cancel(id string) error {
return s.UpdateStatus(id, "cancelled", "")
}
// CleanStale marks abandoned in-progress operations as failed.
// Should be called on startup to recover from crashed instances that
// left records in pending/downloading/processing state.
func (s *GalleryStore) CleanStale(age time.Duration) error {
// CleanStale marks abandoned in-progress operations as failed and returns the
// number of rows reaped. Called on startup AND periodically to recover from
// crashed/restarted instances that left records in pending/downloading/
// processing state — an op orphaned after startup would otherwise linger
// "processing" until the next restart.
func (s *GalleryStore) CleanStale(age time.Duration) (int64, error) {
cutoff := time.Now().Add(-age)
return s.db.Model(&GalleryOperationRecord{}).
res := s.db.Model(&GalleryOperationRecord{}).
Where("updated_at < ? AND status IN ?", cutoff, activeStatuses).
Updates(map[string]any{
"status": "failed",
"error": "stale operation cleaned up on startup",
"error": "stale operation reaped (abandoned by a crashed or restarted instance)",
"updated_at": time.Now(),
}).Error
})
return res.RowsAffected, res.Error
}
// CleanOld removes operations older than the given duration.

View File

@@ -0,0 +1,106 @@
package galleryop_test
import (
"context"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/distributed"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/testutil"
)
// Reproduces "a cancelled/orphaned op resurrects as 'processing' after a pod
// restart". CancelOperation flipped the in-memory status to cancelled and
// broadcast a NATS event, but never persisted the terminal status to the
// gallery store. On the next replica restart the still-"pending" row hydrated
// straight back into processingBackends and the UI spun again. CancelOperation
// must persist the cancellation so it survives a restart.
var _ = Describe("GalleryService.CancelOperation persistence", func() {
It("persists the cancelled status to the gallery store", func() {
db := testutil.SetupTestDB()
store, err := distributed.NewGalleryStore(db)
Expect(err).ToNot(HaveOccurred())
// Seed an in-flight op as if a replica was mid-install.
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "op-cancel",
GalleryElementName: "llama-cpp-development",
OpType: "backend_install",
Status: "pending",
Progress: 0,
})).To(Succeed())
svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svc.SetGalleryStore(store)
// Make the op locally cancellable so CancelOperation proceeds.
svc.StoreCancellation("op-cancel", context.CancelFunc(func() {}))
Expect(svc.CancelOperation("op-cancel")).To(Succeed())
// The persisted row must now be terminal — otherwise it re-hydrates as
// pending on the next restart.
rec, err := store.Get("op-cancel")
Expect(err).ToNot(HaveOccurred())
Expect(rec.Status).To(Equal("cancelled"))
// And a fresh service hydrating from the store must NOT see it as active.
fresh := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
fresh.SetGalleryStore(store)
Expect(fresh.Hydrate()).To(Succeed())
Expect(fresh.GetStatus("op-cancel")).To(BeNil(),
"a cancelled op must not hydrate back as active after a restart")
})
})
// Reproduces "an op orphaned by a replica that died mid-flight stays 'pending'
// forever". CleanStale (which marks abandoned active ops failed) only ran once
// on startup, so an op orphaned AFTER startup was never reaped until the next
// restart. The service must reap stale ops on an interval, not just at boot.
var _ = Describe("GalleryService.ReapStaleOperations", func() {
It("marks abandoned active ops terminal once they pass the age cutoff", func() {
db := testutil.SetupTestDB()
store, err := distributed.NewGalleryStore(db)
Expect(err).ToNot(HaveOccurred())
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "orphan-op",
GalleryElementName: "llama-cpp-development",
OpType: "backend_install",
Status: "pending",
Progress: 0,
})).To(Succeed())
// Force the row's updated_at into the past so it is older than the cutoff.
Expect(db.Exec(
"UPDATE gallery_operations SET updated_at = ? WHERE id = ?",
time.Now().Add(-1*time.Hour), "orphan-op",
).Error).To(Succeed())
// A fresh, still-progressing op must NOT be reaped.
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "live-op",
GalleryElementName: "vllm-development",
OpType: "backend_install",
Status: "downloading",
Progress: 50,
})).To(Succeed())
svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svc.SetGalleryStore(store)
reaped, err := svc.ReapStaleOperations(30 * time.Minute)
Expect(err).ToNot(HaveOccurred())
Expect(reaped).To(Equal(int64(1)))
orphan, err := store.Get("orphan-op")
Expect(err).ToNot(HaveOccurred())
Expect(orphan.Status).To(Equal("failed"))
live, err := store.Get("live-op")
Expect(err).ToNot(HaveOccurred())
Expect(live.Status).To(Equal("downloading"), "a recently-updated op must not be reaped")
})
})

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
@@ -31,9 +32,9 @@ type GalleryService struct {
// natsClient is the wider MessagingClient (Publisher + subscribe methods)
// when wired by the distributed startup path; broadcastSubs holds the
// progress + cancel subscriptions opened by SubscribeBroadcasts.
natsClient messaging.MessagingClient
galleryStore *distributed.GalleryStore
broadcastSubs []messaging.Subscription
natsClient messaging.MessagingClient
galleryStore *distributed.GalleryStore
broadcastSubs []messaging.Subscription
// OnBackendOpCompleted is fired after every successful install/upgrade/delete
// on the backend channel. The Application wires this to UpgradeChecker.TriggerCheck
@@ -274,6 +275,29 @@ func (g *GalleryService) GetAllStatus() map[string]*OpStatus {
return g.statuses
}
// ReapStaleOperations marks abandoned in-progress operations (pending/
// downloading/processing) older than `age` as failed, so an op orphaned by a
// replica that died mid-flight does not linger as "processing" forever. The
// store's CleanStale runs once on startup; this exposes it for periodic
// invocation (a post-startup orphan is otherwise not reaped until the next
// restart). No-op when no gallery store is wired. Returns rows reaped.
func (g *GalleryService) ReapStaleOperations(age time.Duration) (int64, error) {
g.Lock()
store := g.galleryStore
g.Unlock()
if store == nil {
return 0, nil
}
n, err := store.CleanStale(age)
if err != nil {
return 0, err
}
if n > 0 {
xlog.Info("Reaped stale gallery operations", "count", n)
}
return n, nil
}
// CancelOperation cancels an in-progress operation by its ID.
//
// In distributed mode the UI's cancel click may land on a different replica
@@ -295,6 +319,7 @@ func (g *GalleryService) CancelOperation(id string) error {
}
nc := g.natsClient
store := g.galleryStore
if !localExists && nc == nil {
g.Unlock()
@@ -315,6 +340,17 @@ func (g *GalleryService) CancelOperation(id string) error {
}
g.Unlock()
// Persist the terminal status so the cancel survives a restart. Without
// this the row stays in its active state and re-hydrates straight back into
// processingBackends on the next replica boot — the UI spins again on an op
// the admin already cancelled. The peer that broadcasts wins the write; a
// no-op when standalone (store nil).
if store != nil {
if err := store.Cancel(id); err != nil {
xlog.Warn("Failed to persist gallery operation cancellation", "op_id", id, "error", err)
}
}
// I/O and user-provided callback after Unlock — the cancel-wildcard
// subscriber loops back into applyCancel on this same replica, which
// would otherwise deadlock on g.Mutex.