diff --git a/core/application/startup.go b/core/application/startup.go index be559479f..8ddf5a1f6 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -23,9 +23,9 @@ import ( "github.com/mudler/LocalAI/core/services/routing/pii" "github.com/mudler/LocalAI/core/services/routing/router" "github.com/mudler/LocalAI/core/services/storage" - "github.com/mudler/LocalAI/pkg/signals" coreStartup "github.com/mudler/LocalAI/core/startup" "github.com/mudler/LocalAI/internal" + "github.com/mudler/LocalAI/pkg/signals" "github.com/mudler/LocalAI/pkg/vram" "github.com/mudler/LocalAI/pkg/model" @@ -308,10 +308,31 @@ func New(opts ...config.AppOption) (*Application, error) { application.galleryService.SetNATSClient(distSvc.Nats) if distSvc.DistStores != nil && distSvc.DistStores.Gallery != nil { // Clean up stale in-progress operations from previous crashed instances - if err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil { + if _, err := distSvc.DistStores.Gallery.CleanStale(30 * time.Minute); err != nil { xlog.Warn("Failed to clean stale gallery operations", "error", err) } application.galleryService.SetGalleryStore(distSvc.DistStores.Gallery) + + // Reap stale ops periodically, not just at boot: an op orphaned by + // a replica that died mid-install (its foreground handler goroutine + // gone) would otherwise linger "processing" in the UI until the next + // restart. 30m matches the install/upgrade ceiling so a genuinely + // slow op is never reaped out from under itself. + gsvc := application.galleryService + go func() { + ticker := time.NewTicker(15 * time.Minute) + defer ticker.Stop() + for { + select { + case <-options.Context.Done(): + return + case <-ticker.C: + if _, err := gsvc.ReapStaleOperations(30 * time.Minute); err != nil { + xlog.Warn("Failed to reap stale gallery operations", "error", err) + } + } + } + }() } // Hydrate from the store first so the wildcard subscriber finds an // already-populated statuses map for any operations still in flight diff --git a/core/services/distributed/gallery.go b/core/services/distributed/gallery.go index 95149e9c4..7b1239e5a 100644 --- a/core/services/distributed/gallery.go +++ b/core/services/distributed/gallery.go @@ -180,18 +180,21 @@ func (s *GalleryStore) Cancel(id string) error { return s.UpdateStatus(id, "cancelled", "") } -// CleanStale marks abandoned in-progress operations as failed. -// Should be called on startup to recover from crashed instances that -// left records in pending/downloading/processing state. -func (s *GalleryStore) CleanStale(age time.Duration) error { +// CleanStale marks abandoned in-progress operations as failed and returns the +// number of rows reaped. Called on startup AND periodically to recover from +// crashed/restarted instances that left records in pending/downloading/ +// processing state — an op orphaned after startup would otherwise linger +// "processing" until the next restart. +func (s *GalleryStore) CleanStale(age time.Duration) (int64, error) { cutoff := time.Now().Add(-age) - return s.db.Model(&GalleryOperationRecord{}). + res := s.db.Model(&GalleryOperationRecord{}). Where("updated_at < ? AND status IN ?", cutoff, activeStatuses). Updates(map[string]any{ "status": "failed", - "error": "stale operation cleaned up on startup", + "error": "stale operation reaped (abandoned by a crashed or restarted instance)", "updated_at": time.Now(), - }).Error + }) + return res.RowsAffected, res.Error } // CleanOld removes operations older than the given duration. diff --git a/core/services/galleryop/cancel_persist_test.go b/core/services/galleryop/cancel_persist_test.go new file mode 100644 index 000000000..c75a7bd85 --- /dev/null +++ b/core/services/galleryop/cancel_persist_test.go @@ -0,0 +1,106 @@ +package galleryop_test + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/galleryop" + "github.com/mudler/LocalAI/core/services/testutil" +) + +// Reproduces "a cancelled/orphaned op resurrects as 'processing' after a pod +// restart". CancelOperation flipped the in-memory status to cancelled and +// broadcast a NATS event, but never persisted the terminal status to the +// gallery store. On the next replica restart the still-"pending" row hydrated +// straight back into processingBackends and the UI spun again. CancelOperation +// must persist the cancellation so it survives a restart. +var _ = Describe("GalleryService.CancelOperation persistence", func() { + It("persists the cancelled status to the gallery store", func() { + db := testutil.SetupTestDB() + store, err := distributed.NewGalleryStore(db) + Expect(err).ToNot(HaveOccurred()) + + // Seed an in-flight op as if a replica was mid-install. + Expect(store.Create(&distributed.GalleryOperationRecord{ + ID: "op-cancel", + GalleryElementName: "llama-cpp-development", + OpType: "backend_install", + Status: "pending", + Progress: 0, + })).To(Succeed()) + + svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + svc.SetGalleryStore(store) + // Make the op locally cancellable so CancelOperation proceeds. + svc.StoreCancellation("op-cancel", context.CancelFunc(func() {})) + + Expect(svc.CancelOperation("op-cancel")).To(Succeed()) + + // The persisted row must now be terminal — otherwise it re-hydrates as + // pending on the next restart. + rec, err := store.Get("op-cancel") + Expect(err).ToNot(HaveOccurred()) + Expect(rec.Status).To(Equal("cancelled")) + + // And a fresh service hydrating from the store must NOT see it as active. + fresh := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + fresh.SetGalleryStore(store) + Expect(fresh.Hydrate()).To(Succeed()) + Expect(fresh.GetStatus("op-cancel")).To(BeNil(), + "a cancelled op must not hydrate back as active after a restart") + }) +}) + +// Reproduces "an op orphaned by a replica that died mid-flight stays 'pending' +// forever". CleanStale (which marks abandoned active ops failed) only ran once +// on startup, so an op orphaned AFTER startup was never reaped until the next +// restart. The service must reap stale ops on an interval, not just at boot. +var _ = Describe("GalleryService.ReapStaleOperations", func() { + It("marks abandoned active ops terminal once they pass the age cutoff", func() { + db := testutil.SetupTestDB() + store, err := distributed.NewGalleryStore(db) + Expect(err).ToNot(HaveOccurred()) + + Expect(store.Create(&distributed.GalleryOperationRecord{ + ID: "orphan-op", + GalleryElementName: "llama-cpp-development", + OpType: "backend_install", + Status: "pending", + Progress: 0, + })).To(Succeed()) + // Force the row's updated_at into the past so it is older than the cutoff. + Expect(db.Exec( + "UPDATE gallery_operations SET updated_at = ? WHERE id = ?", + time.Now().Add(-1*time.Hour), "orphan-op", + ).Error).To(Succeed()) + + // A fresh, still-progressing op must NOT be reaped. + Expect(store.Create(&distributed.GalleryOperationRecord{ + ID: "live-op", + GalleryElementName: "vllm-development", + OpType: "backend_install", + Status: "downloading", + Progress: 50, + })).To(Succeed()) + + svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + svc.SetGalleryStore(store) + + reaped, err := svc.ReapStaleOperations(30 * time.Minute) + Expect(err).ToNot(HaveOccurred()) + Expect(reaped).To(Equal(int64(1))) + + orphan, err := store.Get("orphan-op") + Expect(err).ToNot(HaveOccurred()) + Expect(orphan.Status).To(Equal("failed")) + + live, err := store.Get("live-op") + Expect(err).ToNot(HaveOccurred()) + Expect(live.Status).To(Equal("downloading"), "a recently-updated op must not be reaped") + }) +}) diff --git a/core/services/galleryop/service.go b/core/services/galleryop/service.go index e98639557..df0352e99 100644 --- a/core/services/galleryop/service.go +++ b/core/services/galleryop/service.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery" @@ -31,9 +32,9 @@ type GalleryService struct { // natsClient is the wider MessagingClient (Publisher + subscribe methods) // when wired by the distributed startup path; broadcastSubs holds the // progress + cancel subscriptions opened by SubscribeBroadcasts. - natsClient messaging.MessagingClient - galleryStore *distributed.GalleryStore - broadcastSubs []messaging.Subscription + natsClient messaging.MessagingClient + galleryStore *distributed.GalleryStore + broadcastSubs []messaging.Subscription // OnBackendOpCompleted is fired after every successful install/upgrade/delete // on the backend channel. The Application wires this to UpgradeChecker.TriggerCheck @@ -274,6 +275,29 @@ func (g *GalleryService) GetAllStatus() map[string]*OpStatus { return g.statuses } +// ReapStaleOperations marks abandoned in-progress operations (pending/ +// downloading/processing) older than `age` as failed, so an op orphaned by a +// replica that died mid-flight does not linger as "processing" forever. The +// store's CleanStale runs once on startup; this exposes it for periodic +// invocation (a post-startup orphan is otherwise not reaped until the next +// restart). No-op when no gallery store is wired. Returns rows reaped. +func (g *GalleryService) ReapStaleOperations(age time.Duration) (int64, error) { + g.Lock() + store := g.galleryStore + g.Unlock() + if store == nil { + return 0, nil + } + n, err := store.CleanStale(age) + if err != nil { + return 0, err + } + if n > 0 { + xlog.Info("Reaped stale gallery operations", "count", n) + } + return n, nil +} + // CancelOperation cancels an in-progress operation by its ID. // // In distributed mode the UI's cancel click may land on a different replica @@ -295,6 +319,7 @@ func (g *GalleryService) CancelOperation(id string) error { } nc := g.natsClient + store := g.galleryStore if !localExists && nc == nil { g.Unlock() @@ -315,6 +340,17 @@ func (g *GalleryService) CancelOperation(id string) error { } g.Unlock() + // Persist the terminal status so the cancel survives a restart. Without + // this the row stays in its active state and re-hydrates straight back into + // processingBackends on the next replica boot — the UI spins again on an op + // the admin already cancelled. The peer that broadcasts wins the write; a + // no-op when standalone (store nil). + if store != nil { + if err := store.Cancel(id); err != nil { + xlog.Warn("Failed to persist gallery operation cancellation", "op_id", id, "error", err) + } + } + // I/O and user-provided callback after Unlock — the cancel-wildcard // subscriber loops back into applyCancel on this same replica, which // would otherwise deadlock on g.Mutex.