diff --git a/core/application/startup.go b/core/application/startup.go index 8268112bc..9fb6519aa 100644 --- a/core/application/startup.go +++ b/core/application/startup.go @@ -15,6 +15,7 @@ import ( "github.com/mudler/LocalAI/core/http/auth" "github.com/mudler/LocalAI/core/services/galleryop" "github.com/mudler/LocalAI/core/services/jobs" + "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/LocalAI/core/services/monitoring" "github.com/mudler/LocalAI/core/services/nodes" "github.com/mudler/LocalAI/core/services/routing/admission" @@ -312,6 +313,30 @@ func New(opts ...config.AppOption) (*Application, error) { } application.galleryService.SetGalleryStore(distSvc.DistStores.Gallery) } + // Hydrate from the store first so the wildcard subscriber finds an + // already-populated statuses map for any operations still in flight + // on a peer replica. + if err := application.galleryService.Hydrate(); err != nil { + xlog.Warn("Gallery service hydrate failed", "error", err) + } + // Bind cache-invalidation handler before SubscribeBroadcasts so the + // first inbound event is already routed. Peer replicas install a + // model and broadcast on SubjectCacheInvalidateModels; this + // callback re-runs LoadModelConfigsFromPath so a subsequent chat + // completion that load-balances onto this replica finds the new + // config. The originating replica reloads inline in modelHandler + // and never enters this path. + gs := application.galleryService + sys := options.SystemState + cfgLoaderOpts := options.ToConfigLoaderOptions() + gs.OnModelsChanged = func(_ messaging.CacheInvalidateEvent) { + if err := application.ModelConfigLoader().LoadModelConfigsFromPath(sys.Model.ModelsPath, cfgLoaderOpts...); err != nil { + xlog.Warn("Failed to reload model configs after peer invalidation", "error", err) + } + } + if err := application.galleryService.SubscribeBroadcasts(); err != nil { + xlog.Warn("Gallery service subscribe failed", "error", err) + } // Wire distributed model/backend managers so delete propagates to workers application.galleryService.SetModelManager( nodes.NewDistributedModelManager(options, application.modelLoader, distSvc.Unloader), diff --git a/core/http/app.go b/core/http/app.go index 4aa7aa46f..79a1067b3 100644 --- a/core/http/app.go +++ b/core/http/app.go @@ -367,6 +367,20 @@ func API(application *application.Application) (*echo.Echo, error) { var opcache *galleryop.OpCache if !application.ApplicationConfig().DisableWebUI { opcache = galleryop.NewOpCache(application.GalleryService()) + // In distributed mode, wire the NATS client + gallery store so this + // replica's OpCache stays in sync with peers — without this the + // /api/operations endpoint returns whatever this single replica + // happened to admit, and a load-balanced UI poll alternates between + // "operation visible" and "operation gone" between replicas. + if d := application.Distributed(); d != nil { + opcache.SetMessagingClient(d.Nats) + if d.DistStores != nil && d.DistStores.Gallery != nil { + opcache.SetGalleryStore(d.DistStores.Gallery) + } + if err := opcache.Start(application.ApplicationConfig().Context); err != nil { + xlog.Warn("OpCache distributed subscribe failed; running standalone", "error", err) + } + } } mcpMw := auth.RequireFeature(application.AuthDB(), auth.FeatureMCP) diff --git a/core/services/distributed/gallery.go b/core/services/distributed/gallery.go index 27ed48d61..95149e9c4 100644 --- a/core/services/distributed/gallery.go +++ b/core/services/distributed/gallery.go @@ -6,16 +6,25 @@ import ( "github.com/google/uuid" "gorm.io/gorm" + "gorm.io/gorm/clause" ) // GalleryOperationRecord tracks model/backend download operations in PostgreSQL. +// +// CacheKey and IsBackendOp mirror the in-memory OpCache held by each frontend +// replica. They are written when a request first lands so a freshly-started +// (or freshly-routed-to) replica can rebuild its OpCache from this table +// instead of returning an empty `/api/operations` payload while the real +// operation is still in flight on a peer. type GalleryOperationRecord struct { ID string `gorm:"primaryKey;size:36" json:"id"` UserID string `gorm:"index;size:36" json:"user_id,omitempty"` GalleryElementName string `gorm:"size:255" json:"gallery_element_name"` - OpType string `gorm:"size:32" json:"op_type"` // "model_install", "model_delete", "backend_install" - Status string `gorm:"size:32;default:pending" json:"status"` // pending, downloading, processing, completed, failed, cancelled - Progress float64 `json:"progress"` // 0.0 to 1.0 + CacheKey string `gorm:"index;size:512" json:"cache_key,omitempty"` // OpCache key (galleryID or node::) + IsBackendOp bool `json:"is_backend_op"` // true if installed via SetBackend + OpType string `gorm:"size:32" json:"op_type"` // "model_install", "model_delete", "backend_install" + Status string `gorm:"size:32;default:pending" json:"status"` // pending, downloading, processing, completed, failed, cancelled + Progress float64 `json:"progress"` // 0.0 to 1.0 Message string `gorm:"type:text" json:"message,omitempty"` Error string `gorm:"type:text" json:"error,omitempty"` FileName string `gorm:"size:512" json:"file_name,omitempty"` @@ -27,6 +36,12 @@ type GalleryOperationRecord struct { UpdatedAt time.Time `json:"updated_at"` } +// activeStatuses lists the gallery_operations.status values that represent an +// operation a replica should still surface via /api/operations. Hydration and +// the dedup lookup share this set so the two paths never disagree about what +// "still active" means. +var activeStatuses = []string{"pending", "downloading", "processing"} + func (GalleryOperationRecord) TableName() string { return "gallery_operations" } // GalleryStore manages gallery operation state in PostgreSQL. @@ -42,14 +57,26 @@ func NewGalleryStore(db *gorm.DB) (*GalleryStore, error) { return &GalleryStore{db: db}, nil } -// Create stores a new gallery operation. +// Create stores a new gallery operation. Tolerates a row already existing +// for this ID — OpCache.Set may have written a placeholder row via +// UpsertCacheKey before the galleryop service goroutine called Create, and +// in that case we want to fill in the descriptive columns (gallery element +// name, op type, status) rather than fail with a primary-key conflict. +// CacheKey and IsBackendOp are intentionally not in DoUpdates so the +// placeholder's values win. func (s *GalleryStore) Create(op *GalleryOperationRecord) error { if op.ID == "" { op.ID = uuid.New().String() } op.CreatedAt = time.Now() op.UpdatedAt = op.CreatedAt - return s.db.Create(op).Error + return s.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.AssignmentColumns([]string{ + "gallery_element_name", "op_type", "status", + "frontend_id", "user_id", "cancellable", "updated_at", + }), + }).Create(op).Error } // UpdateProgress updates progress for an operation. @@ -93,6 +120,47 @@ func (s *GalleryStore) List(status string) ([]GalleryOperationRecord, error) { return ops, q.Find(&ops).Error } +// ListActive returns operations still considered in-flight — used by replicas +// to rehydrate their in-memory OpCache + statuses on startup. Stale records +// (older than 30 minutes without an update) are excluded so a crashed peer's +// orphaned rows never resurrect on a healthy replica; the existing CleanStale +// reaper eventually marks them failed. +func (s *GalleryStore) ListActive() ([]GalleryOperationRecord, error) { + var ops []GalleryOperationRecord + staleCutoff := time.Now().Add(-30 * time.Minute) + err := s.db.Where("status IN ? AND updated_at > ?", activeStatuses, staleCutoff). + Order("created_at DESC").Find(&ops).Error + return ops, err +} + +// UpsertCacheKey records the in-memory OpCache key + IsBackendOp flag on the +// gallery_operations row, creating the row if it does not exist yet. +// +// Why upsert: OpCache.Set is called by the HTTP admission handler before the +// galleryop service goroutine processes the operation and calls Create. If +// OpCache wrote with a plain Updates() those columns would silently be lost +// in the window between the two, so peer replicas hydrating in that window +// would still rebuild an empty OpCache. Upsert closes that window. +func (s *GalleryStore) UpsertCacheKey(id, cacheKey string, isBackend bool) error { + now := time.Now() + rec := GalleryOperationRecord{ + ID: id, + CacheKey: cacheKey, + IsBackendOp: isBackend, + Status: "pending", + CreatedAt: now, + UpdatedAt: now, + } + return s.db.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.Assignments(map[string]any{ + "cache_key": cacheKey, + "is_backend_op": isBackend, + "updated_at": now, + }), + }).Create(&rec).Error +} + // FindDuplicate checks if another instance is already downloading the same element. // Only considers records updated within the last 30 minutes as active — older // in-progress records are assumed to be stale (crashed instance). @@ -100,7 +168,7 @@ func (s *GalleryStore) FindDuplicate(elementName string) (*GalleryOperationRecor var op GalleryOperationRecord staleCutoff := time.Now().Add(-30 * time.Minute) err := s.db.Where("gallery_element_name = ? AND status IN ? AND updated_at > ?", elementName, - []string{"pending", "downloading", "processing"}, staleCutoff).First(&op).Error + activeStatuses, staleCutoff).First(&op).Error if err != nil { return nil, err } @@ -118,8 +186,7 @@ func (s *GalleryStore) Cancel(id string) error { func (s *GalleryStore) CleanStale(age time.Duration) error { cutoff := time.Now().Add(-age) return s.db.Model(&GalleryOperationRecord{}). - Where("updated_at < ? AND status IN ?", cutoff, - []string{"pending", "downloading", "processing"}). + Where("updated_at < ? AND status IN ?", cutoff, activeStatuses). Updates(map[string]any{ "status": "failed", "error": "stale operation cleaned up on startup", diff --git a/core/services/galleryop/backends.go b/core/services/galleryop/backends.go index 01dc30112..1ba4e2cca 100644 --- a/core/services/galleryop/backends.go +++ b/core/services/galleryop/backends.go @@ -9,6 +9,7 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery" + "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/LocalAI/pkg/downloader" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/system" @@ -114,6 +115,21 @@ func (g *GalleryService) backendHandler(op *ManagementOp[gallery.GalleryBackend, return err } + // Tell peer replicas that the backend set has changed. UpgradeChecker + // caches upgrade-available bits for 6 hours, so without this peers would + // keep advertising an upgrade for a backend that already moved. + opName := "install" + switch { + case op.Delete: + opName = "delete" + case op.Upgrade: + opName = "upgrade" + } + g.publishCacheInvalidate(messaging.SubjectCacheInvalidateBackends, messaging.CacheInvalidateEvent{ + Element: op.GalleryElementName, + Op: opName, + }) + g.UpdateStatus(op.ID, &OpStatus{ Deletion: op.Delete, diff --git a/core/services/galleryop/distributed_sync_test.go b/core/services/galleryop/distributed_sync_test.go new file mode 100644 index 000000000..7c1087de8 --- /dev/null +++ b/core/services/galleryop/distributed_sync_test.go @@ -0,0 +1,458 @@ +package galleryop_test + +import ( + "context" + "encoding/json" + "errors" + "strings" + "sync" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/galleryop" + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/testutil" +) + +// fakeBus is an in-memory MessagingClient that delivers each published +// message synchronously to every registered subscriber whose subject filter +// matches, including NATS-style wildcard subjects (`*` matches one token). +// +// Synchronous delivery keeps the specs deterministic: the moment Publish +// returns, every subscriber's handler has run, so the spec body can read +// the resulting state without polling. +type fakeBus struct { + mu sync.Mutex + subs []fakeBusSub +} + +type fakeBusSub struct { + subject string + handler func([]byte) +} + +func newFakeBus() *fakeBus { return &fakeBus{} } + +func subjectMatches(filter, subject string) bool { + if filter == subject { + return true + } + fp := strings.Split(filter, ".") + sp := strings.Split(subject, ".") + if len(fp) != len(sp) { + return false + } + for i := range fp { + if fp[i] == "*" { + continue + } + if fp[i] != sp[i] { + return false + } + } + return true +} + +func (b *fakeBus) Publish(subject string, data any) error { + payload, err := json.Marshal(data) + if err != nil { + return err + } + b.mu.Lock() + subs := append([]fakeBusSub(nil), b.subs...) + b.mu.Unlock() + for _, s := range subs { + if subjectMatches(s.subject, subject) { + s.handler(payload) + } + } + return nil +} + +type fakeBusSubscription struct { + bus *fakeBus + subRef fakeBusSub +} + +func (s *fakeBusSubscription) Unsubscribe() error { + s.bus.mu.Lock() + defer s.bus.mu.Unlock() + for i, candidate := range s.bus.subs { + if candidate.subject == s.subRef.subject { + s.bus.subs = append(s.bus.subs[:i], s.bus.subs[i+1:]...) + return nil + } + } + return nil +} + +func (b *fakeBus) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) { + sub := fakeBusSub{subject: subject, handler: handler} + b.mu.Lock() + b.subs = append(b.subs, sub) + b.mu.Unlock() + return &fakeBusSubscription{bus: b, subRef: sub}, nil +} + +func (b *fakeBus) QueueSubscribe(subject, _ string, handler func([]byte)) (messaging.Subscription, error) { + return b.Subscribe(subject, handler) +} + +func (b *fakeBus) QueueSubscribeReply(string, string, func([]byte, func([]byte))) (messaging.Subscription, error) { + return &fakeBusSubscription{bus: b}, nil +} + +func (b *fakeBus) SubscribeReply(string, func([]byte, func([]byte))) (messaging.Subscription, error) { + return &fakeBusSubscription{bus: b}, nil +} + +func (b *fakeBus) Request(string, []byte, time.Duration) ([]byte, error) { + return nil, nil +} + +func (b *fakeBus) IsConnected() bool { return true } +func (b *fakeBus) Close() {} + +var _ = Describe("OpStatus JSON wire format", func() { + It("round-trips a non-nil Error through Marshal/Unmarshal as a string", func() { + original := &galleryop.OpStatus{ + Progress: 42.0, + Message: "downloading", + GalleryElementName: "vllm", + Error: errors.New("disk full"), + Processed: true, + } + raw, err := json.Marshal(original) + Expect(err).ToNot(HaveOccurred()) + + var got galleryop.OpStatus + Expect(json.Unmarshal(raw, &got)).To(Succeed()) + Expect(got.Error).ToNot(BeNil(), "the error must survive the round-trip — peer replicas need to surface the failure") + Expect(got.Error.Error()).To(Equal("disk full")) + Expect(got.Progress).To(Equal(42.0)) + Expect(got.GalleryElementName).To(Equal("vllm")) + Expect(got.Processed).To(BeTrue()) + }) + + It("emits no error field when Error is nil", func() { + original := &galleryop.OpStatus{Progress: 10.0} + raw, err := json.Marshal(original) + Expect(err).ToNot(HaveOccurred()) + Expect(string(raw)).ToNot(ContainSubstring(`"error":`), + "omitempty must keep nil errors out of the wire payload") + + var got galleryop.OpStatus + Expect(json.Unmarshal(raw, &got)).To(Succeed()) + Expect(got.Error).To(BeNil()) + }) +}) + +var _ = Describe("OpCache distributed sync", func() { + var ( + svcA *galleryop.GalleryService + svcB *galleryop.GalleryService + opA *galleryop.OpCache + opB *galleryop.OpCache + bus *fakeBus + ) + + BeforeEach(func() { + bus = newFakeBus() + svcA = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + svcB = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + opA = galleryop.NewOpCache(svcA) + opB = galleryop.NewOpCache(svcB) + opA.SetMessagingClient(bus) + opB.SetMessagingClient(bus) + Expect(opA.Start(context.Background())).To(Succeed()) + Expect(opB.Start(context.Background())).To(Succeed()) + }) + + AfterEach(func() { + opA.Close() + opB.Close() + }) + + It("propagates a Set on replica A to replica B's OpCache", func() { + opA.Set("llama-3-8b", "job-uuid-1") + + Expect(opB.Exists("llama-3-8b")).To(BeTrue(), + "replica B should see the operation that replica A admitted") + Expect(opB.Get("llama-3-8b")).To(Equal("job-uuid-1")) + Expect(opB.IsBackendOp("llama-3-8b")).To(BeFalse()) + }) + + It("propagates SetBackend with the backend-op flag set", func() { + opA.SetBackend("official@vllm", "job-uuid-2") + + Expect(opB.Exists("official@vllm")).To(BeTrue()) + Expect(opB.IsBackendOp("official@vllm")).To(BeTrue(), + "peer must learn that this op is a backend install, not a model install") + }) + + It("propagates DeleteUUID across replicas", func() { + opA.Set("llama-3-8b", "job-uuid-3") + Expect(opB.Exists("llama-3-8b")).To(BeTrue()) + + opA.DeleteUUID("job-uuid-3") + + Expect(opA.Exists("llama-3-8b")).To(BeFalse()) + Expect(opB.Exists("llama-3-8b")).To(BeFalse(), + "a dismissed operation must clear from peer replicas too") + }) + + It("does not double-write on echo (replica A's broadcast received by replica A)", func() { + // We can't directly observe self-receive but we can confirm the + // resulting state is the same single value, not corrupted or doubled. + opA.Set("model-x", "job-uuid-4") + Expect(opA.Map()).To(HaveLen(1)) + Expect(opA.Get("model-x")).To(Equal("job-uuid-4")) + }) +}) + +var _ = Describe("OpCache PostgreSQL hydration", func() { + It("rebuilds the OpCache from active gallery_operations rows on Start", func() { + db := testutil.SetupTestDB() + store, err := distributed.NewGalleryStore(db) + Expect(err).ToNot(HaveOccurred()) + + // Seed: two active model installs (one with backend flag, one without) + // and one completed op that must NOT resurrect. + Expect(store.UpsertCacheKey("job-A", "llama-3-8b", false)).To(Succeed()) + Expect(store.UpsertCacheKey("job-B", "official@vllm", true)).To(Succeed()) + Expect(store.UpsertCacheKey("job-C", "old-stale", false)).To(Succeed()) + Expect(store.UpdateStatus("job-C", "completed", "")).To(Succeed()) + + svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + cache := galleryop.NewOpCache(svc) + cache.SetGalleryStore(store) + Expect(cache.Start(context.Background())).To(Succeed()) + + Expect(cache.Exists("llama-3-8b")).To(BeTrue()) + Expect(cache.IsBackendOp("llama-3-8b")).To(BeFalse()) + Expect(cache.Get("llama-3-8b")).To(Equal("job-A")) + + Expect(cache.Exists("official@vllm")).To(BeTrue()) + Expect(cache.IsBackendOp("official@vllm")).To(BeTrue()) + + Expect(cache.Exists("old-stale")).To(BeFalse(), + "completed operations must not resurrect on hydration") + }) +}) + +var _ = Describe("GalleryService broadcast sync", func() { + var ( + svcA *galleryop.GalleryService + svcB *galleryop.GalleryService + bus *fakeBus + ) + + BeforeEach(func() { + bus = newFakeBus() + svcA = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + svcB = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + svcA.SetNATSClient(bus) + svcB.SetNATSClient(bus) + Expect(svcA.SubscribeBroadcasts()).To(Succeed()) + Expect(svcB.SubscribeBroadcasts()).To(Succeed()) + }) + + AfterEach(func() { + svcA.CloseBroadcasts() + svcB.CloseBroadcasts() + }) + + It("delivers an UpdateStatus on A into B's statuses map via the wildcard subscriber", func() { + svcA.UpdateStatus("op-1", &galleryop.OpStatus{ + Progress: 50.0, + Message: "halfway", + GalleryElementName: "llama-3-8b", + }) + + st := svcB.GetStatus("op-1") + Expect(st).ToNot(BeNil(), + "replica B must see the progress its peer just published") + Expect(st.Progress).To(Equal(50.0)) + Expect(st.Message).To(Equal("halfway")) + Expect(st.GalleryElementName).To(Equal("llama-3-8b")) + }) + + It("preserves a peer's accumulated Nodes when a tick arrives with empty Nodes", func() { + // Seed B with a multi-node breakdown via UpdateNodeProgress. + svcB.UpdateNodeProgress("op-2", "n1", galleryop.NodeProgress{ + NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 20.0, + }) + svcB.UpdateNodeProgress("op-2", "n2", galleryop.NodeProgress{ + NodeID: "n2", NodeName: "worker-b", Status: galleryop.NodeStatusDownloading, Percentage: 30.0, + }) + + // A publishes a single-bar progress tick (no Nodes) — must not wipe B's Nodes. + svcA.UpdateStatus("op-2", &galleryop.OpStatus{Progress: 25.0, Message: "downloading"}) + + st := svcB.GetStatus("op-2") + Expect(st.Nodes).To(HaveLen(2), + "merged tick must carry forward existing per-node breakdown") + }) + + It("preserves the error message through a peer's wildcard broadcast", func() { + svcA.UpdateStatus("op-3", &galleryop.OpStatus{ + Processed: true, + Error: errors.New("oci pull failed"), + }) + + st := svcB.GetStatus("op-3") + Expect(st).ToNot(BeNil()) + Expect(st.Error).ToNot(BeNil(), + "a failed op's error must survive the broadcast hop") + Expect(st.Error.Error()).To(Equal("oci pull failed")) + Expect(st.Processed).To(BeTrue()) + }) + + It("runs the local cancel func when a peer publishes a cancel event", func() { + var cancelCalled bool + ctx, cancel := context.WithCancel(context.Background()) + svcB.StoreCancellation("op-4", func() { + cancelCalled = true + cancel() + }) + _ = ctx + + // A side fires CancelOperation; the cancel func lives on B and must run. + Expect(svcA.CancelOperation("op-4")).To(Succeed()) + + Expect(cancelCalled).To(BeTrue(), + "the replica holding the cancel func must run it when a peer requests cancellation") + st := svcB.GetStatus("op-4") + Expect(st).ToNot(BeNil()) + Expect(st.Cancelled).To(BeTrue()) + }) +}) + +var _ = Describe("GalleryService cache invalidation broadcasts", func() { + var ( + svcA *galleryop.GalleryService + svcB *galleryop.GalleryService + bus *fakeBus + ) + + BeforeEach(func() { + bus = newFakeBus() + svcA = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + svcB = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + svcA.SetNATSClient(bus) + svcB.SetNATSClient(bus) + }) + + AfterEach(func() { + svcA.CloseBroadcasts() + svcB.CloseBroadcasts() + }) + + It("delivers SubjectCacheInvalidateModels to peer's OnModelsChanged callback", func() { + var ( + mu sync.Mutex + seen []messaging.CacheInvalidateEvent + ) + svcB.OnModelsChanged = func(evt messaging.CacheInvalidateEvent) { + mu.Lock() + seen = append(seen, evt) + mu.Unlock() + } + Expect(svcA.SubscribeBroadcasts()).To(Succeed()) + Expect(svcB.SubscribeBroadcasts()).To(Succeed()) + + Expect(bus.Publish(messaging.SubjectCacheInvalidateModels, messaging.CacheInvalidateEvent{ + Element: "llama-3-8b", Op: "install", + })).To(Succeed()) + + mu.Lock() + defer mu.Unlock() + // Both replicas subscribed; both callbacks fire (svcA's is nil-callback so no-op). + Expect(seen).To(ContainElement(messaging.CacheInvalidateEvent{ + Element: "llama-3-8b", Op: "install", + })) + }) + + It("delivers SubjectCacheInvalidateBackends to peer's OnBackendOpCompleted callback", func() { + done := make(chan struct{}, 1) + svcB.OnBackendOpCompleted = func() { + select { + case done <- struct{}{}: + default: + } + } + Expect(svcA.SubscribeBroadcasts()).To(Succeed()) + Expect(svcB.SubscribeBroadcasts()).To(Succeed()) + + Expect(bus.Publish(messaging.SubjectCacheInvalidateBackends, messaging.CacheInvalidateEvent{ + Element: "vllm", Op: "upgrade", + })).To(Succeed()) + + Eventually(done, "2s", "10ms").Should(Receive(), + "peer must fire its UpgradeChecker hook when any replica completes a backend op") + }) + + It("survives a nil OnModelsChanged callback (subscriber set but no handler)", func() { + Expect(svcA.SubscribeBroadcasts()).To(Succeed()) + Expect(svcB.SubscribeBroadcasts()).To(Succeed()) + // No callbacks registered on either side — just ensure publish does not panic. + Expect(bus.Publish(messaging.SubjectCacheInvalidateModels, messaging.CacheInvalidateEvent{ + Element: "x", Op: "install", + })).To(Succeed()) + }) +}) + +var _ = Describe("GalleryService PostgreSQL hydration", func() { + It("rebuilds the in-memory statuses map from active rows", func() { + db := testutil.SetupTestDB() + store, err := distributed.NewGalleryStore(db) + Expect(err).ToNot(HaveOccurred()) + + Expect(store.Create(&distributed.GalleryOperationRecord{ + ID: "active-op", + GalleryElementName: "llama-3-8b", + OpType: "model_install", + Status: "downloading", + Progress: 65.0, + Message: "fetching shards", + })).To(Succeed()) + Expect(store.Create(&distributed.GalleryOperationRecord{ + ID: "failed-op", + GalleryElementName: "broken", + OpType: "backend_install", + Status: "downloading", + Progress: 10.0, + Error: "oci pull failed", + })).To(Succeed()) + Expect(store.Create(&distributed.GalleryOperationRecord{ + ID: "completed-op", + GalleryElementName: "done", + OpType: "model_install", + Status: "completed", + Progress: 100.0, + })).To(Succeed()) + + svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil) + svc.SetGalleryStore(store) + Expect(svc.Hydrate()).To(Succeed()) + + active := svc.GetStatus("active-op") + Expect(active).ToNot(BeNil(), "in-flight op must be hydrated") + Expect(active.Progress).To(Equal(65.0)) + Expect(active.Message).To(Equal("fetching shards")) + Expect(active.GalleryElementName).To(Equal("llama-3-8b")) + + failed := svc.GetStatus("failed-op") + Expect(failed).ToNot(BeNil()) + Expect(failed.Error).ToNot(BeNil(), + "the persisted error message must be reconstructed as an error value") + Expect(failed.Error.Error()).To(Equal("oci pull failed")) + + Expect(svc.GetStatus("completed-op")).To(BeNil(), + "completed ops are filtered out of ListActive and must not hydrate") + }) +}) diff --git a/core/services/galleryop/models.go b/core/services/galleryop/models.go index 2d7406f2f..3d50a330e 100644 --- a/core/services/galleryop/models.go +++ b/core/services/galleryop/models.go @@ -9,6 +9,7 @@ import ( "github.com/mudler/LocalAI/core/config" "github.com/mudler/LocalAI/core/gallery" + "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/system" "github.com/mudler/LocalAI/pkg/utils" @@ -111,6 +112,19 @@ func (g *GalleryService) modelHandler(op *ManagementOp[gallery.GalleryModel, gal return err } + // Tell peer replicas to refresh their own ModelConfigLoader. The local + // LoadModelConfigsFromPath above already covered THIS replica; without + // this broadcast a chat completion routed by the load balancer to a peer + // would fail to find a model just installed. + op2 := "install" + if op.Delete { + op2 = "delete" + } + g.publishCacheInvalidate(messaging.SubjectCacheInvalidateModels, messaging.CacheInvalidateEvent{ + Element: op.GalleryElementName, + Op: op2, + }) + g.UpdateStatus(op.ID, &OpStatus{ Deletion: op.Delete, diff --git a/core/services/galleryop/operation.go b/core/services/galleryop/operation.go index 8dd7d3180..c88be5149 100644 --- a/core/services/galleryop/operation.go +++ b/core/services/galleryop/operation.go @@ -2,10 +2,16 @@ package galleryop import ( "context" + "encoding/json" + "errors" "strings" + "sync" "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/services/distributed" + "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/LocalAI/pkg/xsync" + "github.com/mudler/xlog" ) type ManagementOp[T any, E any] struct { @@ -44,7 +50,7 @@ type ManagementOp[T any, E any] struct { type OpStatus struct { Deletion bool `json:"deletion"` // Deletion is true if the operation is a deletion FileName string `json:"file_name"` - Error error `json:"error"` + Error error `json:"-"` // see MarshalJSON: serialized to "error" as a string Processed bool `json:"processed"` Message string `json:"message"` Progress float64 `json:"progress"` @@ -62,6 +68,99 @@ type OpStatus struct { Nodes []NodeProgress `json:"nodes,omitempty"` } +// opStatusWire is the JSON shape used when an OpStatus crosses a process +// boundary (NATS broadcast). The Error field on OpStatus is an `error` +// interface, which json.Marshal flattens to `{}` because the concrete error +// type usually has no exported fields — so a failed install replicated to a +// peer frontend would arrive with a nil error and the UI would never surface +// the failure. opStatusWire serializes the error as its Error() string and +// reconstructs it on read. +type opStatusWire struct { + Deletion bool `json:"deletion"` + FileName string `json:"file_name"` + ErrorMessage string `json:"error,omitempty"` + Processed bool `json:"processed"` + Message string `json:"message"` + Progress float64 `json:"progress"` + TotalFileSize string `json:"file_size"` + DownloadedFileSize string `json:"downloaded_size"` + GalleryElementName string `json:"gallery_element_name"` + Cancelled bool `json:"cancelled"` + Cancellable bool `json:"cancellable"` + Nodes []NodeProgress `json:"nodes,omitempty"` +} + +func (o OpStatus) MarshalJSON() ([]byte, error) { + w := opStatusWire{ + Deletion: o.Deletion, + FileName: o.FileName, + Processed: o.Processed, + Message: o.Message, + Progress: o.Progress, + TotalFileSize: o.TotalFileSize, + DownloadedFileSize: o.DownloadedFileSize, + GalleryElementName: o.GalleryElementName, + Cancelled: o.Cancelled, + Cancellable: o.Cancellable, + Nodes: o.Nodes, + } + if o.Error != nil { + w.ErrorMessage = o.Error.Error() + } + return json.Marshal(w) +} + +func (o *OpStatus) UnmarshalJSON(data []byte) error { + var w opStatusWire + if err := json.Unmarshal(data, &w); err != nil { + return err + } + o.Deletion = w.Deletion + o.FileName = w.FileName + o.Processed = w.Processed + o.Message = w.Message + o.Progress = w.Progress + o.TotalFileSize = w.TotalFileSize + o.DownloadedFileSize = w.DownloadedFileSize + o.GalleryElementName = w.GalleryElementName + o.Cancelled = w.Cancelled + o.Cancellable = w.Cancellable + o.Nodes = w.Nodes + if w.ErrorMessage != "" { + o.Error = errors.New(w.ErrorMessage) + } else { + o.Error = nil + } + return nil +} + +// OpCacheEvent is the NATS payload broadcast by frontend replicas when an +// admin operation is admitted (SubjectGalleryOpStart) or dismissed +// (SubjectGalleryOpEnd). Peers merge these into their local OpCache so a +// load-balanced /api/operations poll never returns an empty list while a +// peer is mid-install. +type OpCacheEvent struct { + JobID string `json:"job_id"` + CacheKey string `json:"cache_key"` + IsBackend bool `json:"is_backend"` +} + +// GalleryProgressEvent is the NATS payload for an OpStatus broadcast. It +// wraps OpStatus with the opID/JobID so subscribers reading the wildcard +// subject don't need to parse it back out of the NATS subject string. +type GalleryProgressEvent struct { + JobID string `json:"job_id"` + Status *OpStatus `json:"status"` +} + +// GalleryCancelEvent is the NATS payload for a gallery cancellation. The +// local cancellation func may live on a different frontend replica than the +// one that received the UI cancel button click; the broadcast subscriber +// runs the cancel func on whichever replica registered it. +type GalleryCancelEvent struct { + JobID string `json:"id"` +} + // NodeStatus values shared between NodeProgress (per-node tick) and the // NodeOpStatus surfaced by DistributedBackendManager's fan-out. Defined // as exported constants so producers (the manager, the progress bridge) @@ -98,6 +197,12 @@ type OpCache struct { status *xsync.SyncedMap[string, string] backendOps *xsync.SyncedMap[string, bool] // Tracks which operations are backend operations galleryService *GalleryService + + // Distributed sync (nil when standalone). + mu sync.RWMutex + nats messaging.MessagingClient + store *distributed.GalleryStore + subs []messaging.Subscription } func NewOpCache(galleryService *GalleryService) *OpCache { @@ -108,14 +213,159 @@ func NewOpCache(galleryService *GalleryService) *OpCache { } } +// SetMessagingClient enables cross-replica OpCache sync. Once set, Set/ +// SetBackend/DeleteUUID publish OpCacheEvent messages that peer OpCaches +// merge into their local maps. Call Start after this to subscribe. +func (m *OpCache) SetMessagingClient(nc messaging.MessagingClient) { + m.mu.Lock() + defer m.mu.Unlock() + m.nats = nc +} + +// SetGalleryStore enables PostgreSQL-backed OpCache persistence. +// Set/SetBackend upsert the cache_key + is_backend_op columns; Start +// hydrates the in-memory maps from active rows so a freshly-started +// replica does not return an empty /api/operations payload while a peer +// is mid-install. +func (m *OpCache) SetGalleryStore(s *distributed.GalleryStore) { + m.mu.Lock() + defer m.mu.Unlock() + m.store = s +} + +// Start hydrates the in-memory maps from PostgreSQL (if a store was wired) +// and subscribes to the broadcast subjects (if NATS was wired). It returns +// the first subscribe error; hydration errors are logged but non-fatal so +// the frontend still comes up. +// +// Safe to call exactly once after SetMessagingClient / SetGalleryStore. The +// ctx parameter is reserved for future cancellation — current subscriptions +// live for the lifetime of the OpCache and are released by Close. +func (m *OpCache) Start(_ context.Context) error { + m.mu.RLock() + store := m.store + nc := m.nats + m.mu.RUnlock() + + if store != nil { + if err := m.hydrateFromStore(store); err != nil { + xlog.Warn("OpCache hydrate failed; starting empty", "error", err) + } + } + + if nc == nil { + return nil + } + + startSub, err := messaging.SubscribeJSON(nc, messaging.SubjectGalleryOpStart, func(evt OpCacheEvent) { + m.applyStart(evt) + }) + if err != nil { + return err + } + endSub, err := messaging.SubscribeJSON(nc, messaging.SubjectGalleryOpEnd, func(evt OpCacheEvent) { + m.applyEnd(evt) + }) + if err != nil { + if uerr := startSub.Unsubscribe(); uerr != nil { + xlog.Warn("failed to unsubscribe partial OpCache subscription", "error", uerr) + } + return err + } + + m.mu.Lock() + m.subs = append(m.subs, startSub, endSub) + m.mu.Unlock() + return nil +} + +// Close drops all NATS subscriptions. Safe to call multiple times. +func (m *OpCache) Close() { + m.mu.Lock() + subs := m.subs + m.subs = nil + m.mu.Unlock() + for _, s := range subs { + if err := s.Unsubscribe(); err != nil { + xlog.Warn("OpCache unsubscribe failed", "error", err) + } + } +} + +func (m *OpCache) hydrateFromStore(store *distributed.GalleryStore) error { + ops, err := store.ListActive() + if err != nil { + return err + } + for _, op := range ops { + if op.CacheKey == "" { + continue + } + m.status.Set(op.CacheKey, op.ID) + if op.IsBackendOp { + m.backendOps.Set(op.CacheKey, true) + } + } + return nil +} + +// applyStart merges an inbound OpStart event into the local maps. Idempotent: +// receiving our own broadcast is a harmless re-assignment of the same value. +func (m *OpCache) applyStart(evt OpCacheEvent) { + if evt.CacheKey == "" || evt.JobID == "" { + return + } + m.status.Set(evt.CacheKey, evt.JobID) + if evt.IsBackend { + m.backendOps.Set(evt.CacheKey, true) + } +} + +// applyEnd removes any entries whose jobID matches the event. Idempotent. +func (m *OpCache) applyEnd(evt OpCacheEvent) { + if evt.JobID == "" { + return + } + for _, k := range m.status.Keys() { + if m.status.Get(k) == evt.JobID { + m.status.Delete(k) + m.backendOps.Delete(k) + } + } +} + func (m *OpCache) Set(key string, value string) { m.status.Set(key, value) + m.persistAndBroadcastStart(key, value, false) } // SetBackend sets a key-value pair and marks it as a backend operation func (m *OpCache) SetBackend(key string, value string) { m.status.Set(key, value) m.backendOps.Set(key, true) + m.persistAndBroadcastStart(key, value, true) +} + +func (m *OpCache) persistAndBroadcastStart(key, value string, isBackend bool) { + m.mu.RLock() + store := m.store + nc := m.nats + m.mu.RUnlock() + + if store != nil { + if err := store.UpsertCacheKey(value, key, isBackend); err != nil { + xlog.Warn("OpCache failed to persist cache key", "job_id", value, "error", err) + } + } + if nc != nil { + if err := nc.Publish(messaging.SubjectGalleryOpStart, OpCacheEvent{ + JobID: value, + CacheKey: key, + IsBackend: isBackend, + }); err != nil { + xlog.Warn("OpCache failed to broadcast start", "job_id", value, "error", err) + } + } } // IsBackendOp returns true if the given key is a backend operation @@ -128,10 +378,23 @@ func (m *OpCache) Get(key string) string { } func (m *OpCache) DeleteUUID(uuid string) { + deleted := false for _, k := range m.status.Keys() { if m.status.Get(k) == uuid { m.status.Delete(k) m.backendOps.Delete(k) // Also clean up the backend flag + deleted = true + } + } + if !deleted { + return + } + m.mu.RLock() + nc := m.nats + m.mu.RUnlock() + if nc != nil { + if err := nc.Publish(messaging.SubjectGalleryOpEnd, OpCacheEvent{JobID: uuid}); err != nil { + xlog.Warn("OpCache failed to broadcast end", "job_id", uuid, "error", err) } } } diff --git a/core/services/galleryop/service.go b/core/services/galleryop/service.go index 799e5144d..e98639557 100644 --- a/core/services/galleryop/service.go +++ b/core/services/galleryop/service.go @@ -2,6 +2,7 @@ package galleryop import ( "context" + "errors" "fmt" "sync" @@ -11,6 +12,7 @@ import ( "github.com/mudler/LocalAI/core/services/messaging" "github.com/mudler/LocalAI/pkg/model" "github.com/mudler/LocalAI/pkg/system" + "github.com/mudler/xlog" ) type GalleryService struct { @@ -25,16 +27,32 @@ type GalleryService struct { statuses map[string]*OpStatus cancellations map[string]context.CancelFunc - // Distributed mode (nil when not in distributed mode) - natsClient messaging.Publisher - galleryStore *distributed.GalleryStore + // Distributed mode (nil when not in distributed mode). + // natsClient is the wider MessagingClient (Publisher + subscribe methods) + // when wired by the distributed startup path; broadcastSubs holds the + // progress + cancel subscriptions opened by SubscribeBroadcasts. + natsClient messaging.MessagingClient + galleryStore *distributed.GalleryStore + broadcastSubs []messaging.Subscription // OnBackendOpCompleted is fired after every successful install/upgrade/delete // on the backend channel. The Application wires this to UpgradeChecker.TriggerCheck // so `/api/backends/upgrades` stops surfacing a backend as upgradeable the moment // the worker finishes — previously the cache only refreshed on the 6-hour tick, // making manual upgrades look like they failed even when they hadn't. + // + // In distributed mode the same hook also fires on peer replicas via the + // SubjectCacheInvalidateBackends subscriber, so every replica's + // UpgradeChecker stays in sync. OnBackendOpCompleted func() + + // OnModelsChanged is fired on peer replicas when SOMEONE else publishes + // SubjectCacheInvalidateModels. The Application wires this to + // ModelConfigLoader.LoadModelConfigsFromPath so a chat completion that + // load-balances onto this replica can find the just-installed model. + // The originating replica reloads inline (models.go) so it does not need + // the hook. + OnModelsChanged func(messaging.CacheInvalidateEvent) } func NewGalleryService(appConfig *config.ApplicationConfig, ml *model.ModelLoader) *GalleryService { @@ -75,7 +93,10 @@ func (g *GalleryService) BackendManager() BackendManager { } // SetNATSClient sets the NATS client for distributed progress publishing. -func (g *GalleryService) SetNATSClient(nc messaging.Publisher) { +// Accepting the wider MessagingClient (vs. plain Publisher) lets +// SubscribeBroadcasts wire the wildcard subscriptions that keep peer +// replicas' statuses + cancellations in sync. +func (g *GalleryService) SetNATSClient(nc messaging.MessagingClient) { g.Lock() defer g.Unlock() g.natsClient = nc @@ -109,7 +130,6 @@ func (g *GalleryService) DeleteBackend(name string) error { func (g *GalleryService) UpdateStatus(s string, op *OpStatus) { g.Lock() - defer g.Unlock() // Preserve any per-node entries already accumulated by UpdateNodeProgress: // the legacy progressCb path (used by the Phase 2 install bridge) calls // UpdateStatus with a fresh *OpStatus on every tick, which would otherwise @@ -123,9 +143,15 @@ func (g *GalleryService) UpdateStatus(s string, op *OpStatus) { } } g.statuses[s] = op + store := g.galleryStore + nc := g.natsClient + g.Unlock() - // Persist to PostgreSQL in distributed mode - if g.galleryStore != nil { + // I/O happens after Unlock. The NATS broadcast loops back into our own + // wildcard subscriber (mergeStatus), which would deadlock on this mutex + // if we still held it. Holding the lock across a PostgreSQL round-trip + // would also stall every concurrent reader on each progress tick. + if store != nil && op != nil { if op.Processed { status, errMsg := "completed", "" if op.Error != nil { @@ -135,18 +161,64 @@ func (g *GalleryService) UpdateStatus(s string, op *OpStatus) { if op.Cancelled { status = "cancelled" } - g.galleryStore.UpdateStatus(s, status, errMsg) + if err := store.UpdateStatus(s, status, errMsg); err != nil { + xlog.Warn("Failed to persist gallery operation status", "op_id", s, "error", err) + } } else { - g.galleryStore.UpdateProgress(s, op.Progress, op.Message, op.DownloadedFileSize) + if err := store.UpdateProgress(s, op.Progress, op.Message, op.DownloadedFileSize); err != nil { + xlog.Warn("Failed to persist gallery operation progress", "op_id", s, "error", err) + } } } - // Publish progress to NATS in distributed mode - if g.natsClient != nil { - g.natsClient.Publish(messaging.SubjectGalleryProgress(s), op) + // Publish progress to NATS in distributed mode. The payload wraps the + // OpStatus with the opID so peer replicas reading the wildcard subject + // don't need to parse it back out of the NATS subject string. + if nc != nil { + if err := nc.Publish(messaging.SubjectGalleryProgress(s), GalleryProgressEvent{ + JobID: s, + Status: op, + }); err != nil { + xlog.Warn("Failed to broadcast gallery progress", "op_id", s, "error", err) + } } } +// publishCacheInvalidate broadcasts a cache invalidation event so peer +// replicas refresh whatever in-memory state mirrors disk. No-op when +// natsClient is not wired (standalone mode). +func (g *GalleryService) publishCacheInvalidate(subject string, evt messaging.CacheInvalidateEvent) { + g.Lock() + nc := g.natsClient + g.Unlock() + if nc == nil { + return + } + if err := nc.Publish(subject, evt); err != nil { + xlog.Warn("Failed to broadcast cache invalidation", "subject", subject, "error", err) + } +} + +// mergeStatus is the broadcast-side merge: it updates the in-memory map from +// a peer's GalleryProgressEvent without re-publishing to NATS or re-writing +// to PostgreSQL. UpdateStatus is the local-write entry point and does both; +// mergeStatus is what the wildcard subscriber calls. Splitting them avoids +// an echo loop (replica publishes → its own subscriber receives → mergeStatus +// silently re-applies → no second publish). +func (g *GalleryService) mergeStatus(opID string, op *OpStatus) { + if op == nil { + return + } + g.Lock() + defer g.Unlock() + if len(op.Nodes) == 0 { + if prev := g.statuses[opID]; prev != nil && len(prev.Nodes) > 0 { + op.Nodes = prev.Nodes + } + } + g.statuses[opID] = op +} + // UpdateNodeProgress merges a per-node progress tick into OpStatus.Nodes, // keyed by nodeID, and mirrors the latest values into the aggregate // Progress / FileName / DownloadedFileSize / TotalFileSize / Message @@ -202,36 +274,38 @@ func (g *GalleryService) GetAllStatus() map[string]*OpStatus { return g.statuses } -// CancelOperation cancels an in-progress operation by its ID +// CancelOperation cancels an in-progress operation by its ID. +// +// In distributed mode the UI's cancel click may land on a different replica +// than the one running the operation. We still publish the cancel event in +// that case — the peer holding the cancellation func picks it up via the +// SubjectGalleryCancelWildcard subscriber and runs it locally. The caller +// gets a non-error reply so the UI shows the cancel as accepted. func (g *GalleryService) CancelOperation(id string) error { g.Lock() - defer g.Unlock() - // Check if operation is already cancelled if status, ok := g.statuses[id]; ok && status.Cancelled { + g.Unlock() return fmt.Errorf("operation %q is already cancelled", id) } - cancelFunc, exists := g.cancellations[id] - if !exists { + cancelFunc, localExists := g.cancellations[id] + if localExists { + delete(g.cancellations, id) + } + + nc := g.natsClient + + if !localExists && nc == nil { + g.Unlock() return fmt.Errorf("operation %q not found or already completed", id) } - // Cancel the operation - cancelFunc() - - // Publish cancellation to NATS in distributed mode - if g.natsClient != nil { - g.natsClient.Publish(messaging.SubjectGalleryCancel(id), map[string]string{"id": id}) - } - - // Update status to reflect cancellation if status, ok := g.statuses[id]; ok { status.Cancelled = true status.Processed = true status.Message = "cancelled" } else { - // Create status for queued operations that haven't started yet g.statuses[id] = &OpStatus{ Cancelled: true, Processed: true, @@ -239,13 +313,59 @@ func (g *GalleryService) CancelOperation(id string) error { Cancellable: false, } } + g.Unlock() - // Clean up cancellation function - delete(g.cancellations, id) + // I/O and user-provided callback after Unlock — the cancel-wildcard + // subscriber loops back into applyCancel on this same replica, which + // would otherwise deadlock on g.Mutex. + if cancelFunc != nil { + cancelFunc() + } + if nc != nil { + if err := nc.Publish(messaging.SubjectGalleryCancel(id), GalleryCancelEvent{JobID: id}); err != nil { + xlog.Warn("Failed to broadcast gallery cancel", "op_id", id, "error", err) + } + } return nil } +// applyCancel is the broadcast-side counterpart to CancelOperation. The +// wildcard subscriber calls it when a peer publishes a cancel event: +// run the local cancel func if we have one (no echo via NATS), and reflect +// the cancellation in the local statuses map. Idempotent: a replica that +// already cancelled this op locally treats the inbound event as a no-op. +func (g *GalleryService) applyCancel(id string) { + g.Lock() + cancelFunc, hasCancel := g.cancellations[id] + if hasCancel { + delete(g.cancellations, id) + } + if status, ok := g.statuses[id]; ok { + if status.Cancelled { + g.Unlock() + return + } + status.Cancelled = true + status.Processed = true + status.Message = "cancelled" + } else { + g.statuses[id] = &OpStatus{ + Cancelled: true, + Processed: true, + Message: "cancelled", + Cancellable: false, + } + } + g.Unlock() + + // Invoke the cancel func after Unlock so a callback that touches + // GalleryService doesn't re-enter the mutex. + if hasCancel { + cancelFunc() + } +} + // storeCancellation stores a cancellation function for an operation func (g *GalleryService) storeCancellation(id string, cancelFunc context.CancelFunc) { g.Lock() @@ -345,3 +465,147 @@ func (g *GalleryService) Start(c context.Context, cl *config.ModelConfigLoader, return nil } + +// SubscribeBroadcasts opens the wildcard subscriptions that keep this +// replica's in-memory statuses + cancellation state in sync with peers. +// Returns an error if the progress subscription fails; cancel-sub failures +// are not fatal but are logged. +// +// Hydrate should be called before this so the freshly-started replica has +// the pre-existing operations before live updates start flowing. +func (g *GalleryService) SubscribeBroadcasts() error { + g.Lock() + nc := g.natsClient + g.Unlock() + if nc == nil { + return nil + } + + progressSub, err := messaging.SubscribeJSON(nc, messaging.SubjectGalleryProgressWildcard, func(evt GalleryProgressEvent) { + if evt.JobID == "" || evt.Status == nil { + return + } + g.mergeStatus(evt.JobID, evt.Status) + }) + if err != nil { + return fmt.Errorf("subscribing to gallery progress wildcard: %w", err) + } + + cancelSub, err := messaging.SubscribeJSON(nc, messaging.SubjectGalleryCancelWildcard, func(evt GalleryCancelEvent) { + if evt.JobID == "" { + return + } + g.applyCancel(evt.JobID) + }) + if err != nil { + if uerr := progressSub.Unsubscribe(); uerr != nil { + xlog.Warn("failed to unsubscribe partial gallery progress sub", "error", uerr) + } + return fmt.Errorf("subscribing to gallery cancel wildcard: %w", err) + } + + modelsSub, err := messaging.SubscribeJSON(nc, messaging.SubjectCacheInvalidateModels, func(evt messaging.CacheInvalidateEvent) { + g.Lock() + cb := g.OnModelsChanged + g.Unlock() + if cb != nil { + cb(evt) + } + }) + if err != nil { + if uerr := progressSub.Unsubscribe(); uerr != nil { + xlog.Warn("failed to unsubscribe partial gallery progress sub", "error", uerr) + } + if uerr := cancelSub.Unsubscribe(); uerr != nil { + xlog.Warn("failed to unsubscribe partial gallery cancel sub", "error", uerr) + } + return fmt.Errorf("subscribing to models invalidation: %w", err) + } + + backendsSub, err := messaging.SubscribeJSON(nc, messaging.SubjectCacheInvalidateBackends, func(_ messaging.CacheInvalidateEvent) { + g.Lock() + cb := g.OnBackendOpCompleted + g.Unlock() + if cb != nil { + // Run off-goroutine so a slow UpgradeChecker doesn't stall the + // NATS receive loop. Matches the local fire-after-install path. + go cb() + } + }) + if err != nil { + if uerr := progressSub.Unsubscribe(); uerr != nil { + xlog.Warn("failed to unsubscribe partial gallery progress sub", "error", uerr) + } + if uerr := cancelSub.Unsubscribe(); uerr != nil { + xlog.Warn("failed to unsubscribe partial gallery cancel sub", "error", uerr) + } + if uerr := modelsSub.Unsubscribe(); uerr != nil { + xlog.Warn("failed to unsubscribe partial models sub", "error", uerr) + } + return fmt.Errorf("subscribing to backends invalidation: %w", err) + } + + g.Lock() + g.broadcastSubs = append(g.broadcastSubs, progressSub, cancelSub, modelsSub, backendsSub) + g.Unlock() + return nil +} + +// CloseBroadcasts drops the wildcard subscriptions. Safe to call multiple times. +func (g *GalleryService) CloseBroadcasts() { + g.Lock() + subs := g.broadcastSubs + g.broadcastSubs = nil + g.Unlock() + for _, s := range subs { + if err := s.Unsubscribe(); err != nil { + xlog.Warn("GalleryService unsubscribe failed", "error", err) + } + } +} + +// Hydrate loads still-active operations from the GalleryStore into the +// in-memory statuses map so a freshly-started replica does not return an +// empty /api/operations payload while a peer is mid-install. Idempotent. +// No-op when no store is wired. +// +// The reconstructed OpStatus carries no Error type — the DB stores the +// message as a string and Hydrate surfaces it via errors.New so the UI's +// "operation failed" banner survives a frontend restart. +func (g *GalleryService) Hydrate() error { + g.Lock() + store := g.galleryStore + g.Unlock() + if store == nil { + return nil + } + ops, err := store.ListActive() + if err != nil { + return fmt.Errorf("listing active gallery ops: %w", err) + } + g.Lock() + defer g.Unlock() + for _, op := range ops { + // Skip rows that already have an in-memory status — the live + // broadcast subscriber will fill any gaps with fresher data. + if _, ok := g.statuses[op.ID]; ok { + continue + } + st := &OpStatus{ + Message: op.Message, + Progress: op.Progress, + FileName: op.FileName, + TotalFileSize: op.TotalFileSize, + DownloadedFileSize: op.DownloadedFileSize, + GalleryElementName: op.GalleryElementName, + Cancellable: op.Cancellable, + Deletion: op.OpType == "model_delete", + } + if op.Error != "" { + st.Error = errors.New(op.Error) + } + g.statuses[op.ID] = st + } + xlog.Info("Hydrated gallery service statuses from store", "count", len(ops)) + return nil +} diff --git a/core/services/messaging/subjects.go b/core/services/messaging/subjects.go index 6cf7a9969..28af9e8d9 100644 --- a/core/services/messaging/subjects.go +++ b/core/services/messaging/subjects.go @@ -64,6 +64,16 @@ func SubjectGalleryProgress(opID string) string { return subjectGalleryPrefix + sanitizeSubjectToken(opID) + ".progress" } +// SubjectGalleryOpStart and SubjectGalleryOpEnd are broadcast subjects for the +// in-memory OpCache lifecycle. Frontend replicas publish to these when an +// admin admits a new install/delete (Start) and when an operation is +// dismissed (End), so peer replicas can keep their OpCache in sync without +// hitting PostgreSQL on every UI poll. +const ( + SubjectGalleryOpStart = "gallery.opcache.start" + SubjectGalleryOpEnd = "gallery.opcache.end" +) + // Control Signals (Pub/Sub — targeted cancellation) const ( subjectJobCancelPrefix = "jobs." @@ -321,8 +331,26 @@ func SubjectNodeFilesListDir(nodeID string) string { // Cache Invalidation (Pub/Sub — broadcast to all instances) const ( SubjectCacheInvalidateSkills = "cache.invalidate.skills" + // SubjectCacheInvalidateModels is broadcast by the replica that completed + // a model install/delete. Peers subscribe and re-run + // ModelConfigLoader.LoadModelConfigsFromPath so a chat completion routed + // to a different replica can find the newly installed model. + SubjectCacheInvalidateModels = "cache.invalidate.models" + // SubjectCacheInvalidateBackends is broadcast after a backend + // install/upgrade/delete. Peers retrigger their UpgradeChecker so the + // 6-hour upgrade-available cache flips to fresh on every replica, not + // just the one that handled the request. + SubjectCacheInvalidateBackends = "cache.invalidate.backends" ) +// CacheInvalidateEvent is the payload for cache invalidation broadcasts. +// Element names a specific model/backend when known; empty means "the whole +// set was touched, do a full reload." +type CacheInvalidateEvent struct { + Element string `json:"element,omitempty"` + Op string `json:"op,omitempty"` // "install" | "delete" | "upgrade" +} + // SubjectCacheInvalidateCollection returns the NATS subject for collection cache invalidation. func SubjectCacheInvalidateCollection(name string) string { return "cache.invalidate.collections." + sanitizeSubjectToken(name)