fix(distributed): sync gallery OpCache + caches across frontend replicas (#9983)

When the LocalAI frontend deployment is scaled past one replica, the UI's
/api/operations poll round-robins between pods. Each pod kept the OpCache
(galleryID->jobID), OpStatus map, and the post-install in-memory caches
(ModelConfigLoader, UpgradeChecker) purely in-process. Reads never
consulted PostgreSQL or NATS even though writes already published to PG.
Symptoms:

- A user installing a model on replica A saw the operation card flicker
  in and out as the load balancer alternated.
- The Models page re-fetched the whole gallery on every flicker because
  useEffect([operations.length]) re-fires when the count changes.
- A chat completion that landed on replica B after the install completed
  on replica A failed to find the new model — B's ModelConfigLoader was
  still the old one because nothing told it to reload from disk.
- The UpgradeChecker 6-hour cache stayed stale on peer replicas after a
  backend upgrade, so /api/backends/upgrades kept surfacing an upgrade
  that had already shipped.

Mirror the jobs Dispatcher pattern for gallery ops:

- OpCache learns SetMessagingClient/SetGalleryStore + a Start(ctx) that
  hydrates from PostgreSQL and subscribes to gallery.opcache.{start,end}.
  Set/SetBackend now upsert cache_key + is_backend_op on the gallery_
  operations row and broadcast OpCacheEvent so peers merge it in. The
  hydrate path uses a new GalleryStore.ListActive() (status in {pending,
  downloading, processing} and updated within 30 min).
- GalleryService.SubscribeBroadcasts wires a SubjectGalleryProgress-
  Wildcard subscriber that calls a new lock-light mergeStatus into the
  local statuses map, plus a SubjectGalleryCancelWildcard subscriber that
  runs the locally-registered cancel func. Hydrate() restores active rows
  from PostgreSQL on startup so a freshly-started replica is not
  observably empty mid-install. CancelOperation tolerates the cancel func
  living on a different replica and publishes anyway.
- modelHandler and backendHandler publish on the new
  SubjectCacheInvalidateModels / SubjectCacheInvalidateBackends after
  a successful install/delete/upgrade. SubscribeBroadcasts wires peers
  to refresh: OnModelsChanged (re-runs LoadModelConfigsFromPath) and
  OnBackendOpCompleted (re-triggers UpgradeChecker). The originating
  replica reloads inline so it never enters the broadcast handler.
- OpStatus.Error (an error interface) flat-marshalled to "{}" over JSON,
  so a failed install replicated to a peer arrived with a nil error and
  the UI's failure banner never appeared. Add MarshalJSON/UnmarshalJSON
  via an opStatusWire shim that round-trips Error as a string.
- UpdateStatus and CancelOperation now drop the mutex before publishing
  to NATS or persisting to PostgreSQL. The wildcard subscriber's
  mergeStatus loops back into the same service on the publishing replica
  and would deadlock otherwise; this also prevents future PG round-trips
  from stalling concurrent readers on every progress tick.

Tests cover the OpStatus error round-trip, OpCache propagation through a
shared in-memory bus, OpCache PostgreSQL hydration (active-only),
GalleryService progress + cancel broadcast, Nodes preservation across a
peer's bare progress tick, GalleryService hydration from PG, and the
two cache-invalidation broadcasts (models + backends). 44 specs total
in galleryop; routes/operations specs and jobs/agents suites still pass.


Assisted-by: claude-code:claude-opus-4-7

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
LocalAI [bot]
2026-05-25 17:28:14 +02:00
committed by GitHub
parent b02e3ffe61
commit 8d6548c0b9
9 changed files with 1187 additions and 38 deletions

View File

@@ -15,6 +15,7 @@ import (
"github.com/mudler/LocalAI/core/http/auth"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/jobs"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/monitoring"
"github.com/mudler/LocalAI/core/services/nodes"
"github.com/mudler/LocalAI/core/services/routing/admission"
@@ -312,6 +313,30 @@ func New(opts ...config.AppOption) (*Application, error) {
}
application.galleryService.SetGalleryStore(distSvc.DistStores.Gallery)
}
// Hydrate from the store first so the wildcard subscriber finds an
// already-populated statuses map for any operations still in flight
// on a peer replica.
if err := application.galleryService.Hydrate(); err != nil {
xlog.Warn("Gallery service hydrate failed", "error", err)
}
// Bind cache-invalidation handler before SubscribeBroadcasts so the
// first inbound event is already routed. Peer replicas install a
// model and broadcast on SubjectCacheInvalidateModels; this
// callback re-runs LoadModelConfigsFromPath so a subsequent chat
// completion that load-balances onto this replica finds the new
// config. The originating replica reloads inline in modelHandler
// and never enters this path.
gs := application.galleryService
sys := options.SystemState
cfgLoaderOpts := options.ToConfigLoaderOptions()
gs.OnModelsChanged = func(_ messaging.CacheInvalidateEvent) {
if err := application.ModelConfigLoader().LoadModelConfigsFromPath(sys.Model.ModelsPath, cfgLoaderOpts...); err != nil {
xlog.Warn("Failed to reload model configs after peer invalidation", "error", err)
}
}
if err := application.galleryService.SubscribeBroadcasts(); err != nil {
xlog.Warn("Gallery service subscribe failed", "error", err)
}
// Wire distributed model/backend managers so delete propagates to workers
application.galleryService.SetModelManager(
nodes.NewDistributedModelManager(options, application.modelLoader, distSvc.Unloader),

View File

@@ -367,6 +367,20 @@ func API(application *application.Application) (*echo.Echo, error) {
var opcache *galleryop.OpCache
if !application.ApplicationConfig().DisableWebUI {
opcache = galleryop.NewOpCache(application.GalleryService())
// In distributed mode, wire the NATS client + gallery store so this
// replica's OpCache stays in sync with peers — without this the
// /api/operations endpoint returns whatever this single replica
// happened to admit, and a load-balanced UI poll alternates between
// "operation visible" and "operation gone" between replicas.
if d := application.Distributed(); d != nil {
opcache.SetMessagingClient(d.Nats)
if d.DistStores != nil && d.DistStores.Gallery != nil {
opcache.SetGalleryStore(d.DistStores.Gallery)
}
if err := opcache.Start(application.ApplicationConfig().Context); err != nil {
xlog.Warn("OpCache distributed subscribe failed; running standalone", "error", err)
}
}
}
mcpMw := auth.RequireFeature(application.AuthDB(), auth.FeatureMCP)

View File

@@ -6,16 +6,25 @@ import (
"github.com/google/uuid"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
// GalleryOperationRecord tracks model/backend download operations in PostgreSQL.
//
// CacheKey and IsBackendOp mirror the in-memory OpCache held by each frontend
// replica. They are written when a request first lands so a freshly-started
// (or freshly-routed-to) replica can rebuild its OpCache from this table
// instead of returning an empty `/api/operations` payload while the real
// operation is still in flight on a peer.
type GalleryOperationRecord struct {
ID string `gorm:"primaryKey;size:36" json:"id"`
UserID string `gorm:"index;size:36" json:"user_id,omitempty"`
GalleryElementName string `gorm:"size:255" json:"gallery_element_name"`
OpType string `gorm:"size:32" json:"op_type"` // "model_install", "model_delete", "backend_install"
Status string `gorm:"size:32;default:pending" json:"status"` // pending, downloading, processing, completed, failed, cancelled
Progress float64 `json:"progress"` // 0.0 to 1.0
CacheKey string `gorm:"index;size:512" json:"cache_key,omitempty"` // OpCache key (galleryID or node:<id>:<backend>)
IsBackendOp bool `json:"is_backend_op"` // true if installed via SetBackend
OpType string `gorm:"size:32" json:"op_type"` // "model_install", "model_delete", "backend_install"
Status string `gorm:"size:32;default:pending" json:"status"` // pending, downloading, processing, completed, failed, cancelled
Progress float64 `json:"progress"` // 0.0 to 1.0
Message string `gorm:"type:text" json:"message,omitempty"`
Error string `gorm:"type:text" json:"error,omitempty"`
FileName string `gorm:"size:512" json:"file_name,omitempty"`
@@ -27,6 +36,12 @@ type GalleryOperationRecord struct {
UpdatedAt time.Time `json:"updated_at"`
}
// activeStatuses lists the gallery_operations.status values that represent an
// operation a replica should still surface via /api/operations. Hydration and
// the dedup lookup share this set so the two paths never disagree about what
// "still active" means.
var activeStatuses = []string{"pending", "downloading", "processing"}
func (GalleryOperationRecord) TableName() string { return "gallery_operations" }
// GalleryStore manages gallery operation state in PostgreSQL.
@@ -42,14 +57,26 @@ func NewGalleryStore(db *gorm.DB) (*GalleryStore, error) {
return &GalleryStore{db: db}, nil
}
// Create stores a new gallery operation.
// Create stores a new gallery operation. Tolerates a row already existing
// for this ID — OpCache.Set may have written a placeholder row via
// UpsertCacheKey before the galleryop service goroutine called Create, and
// in that case we want to fill in the descriptive columns (gallery element
// name, op type, status) rather than fail with a primary-key conflict.
// CacheKey and IsBackendOp are intentionally not in DoUpdates so the
// placeholder's values win.
func (s *GalleryStore) Create(op *GalleryOperationRecord) error {
if op.ID == "" {
op.ID = uuid.New().String()
}
op.CreatedAt = time.Now()
op.UpdatedAt = op.CreatedAt
return s.db.Create(op).Error
return s.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{
"gallery_element_name", "op_type", "status",
"frontend_id", "user_id", "cancellable", "updated_at",
}),
}).Create(op).Error
}
// UpdateProgress updates progress for an operation.
@@ -93,6 +120,47 @@ func (s *GalleryStore) List(status string) ([]GalleryOperationRecord, error) {
return ops, q.Find(&ops).Error
}
// ListActive returns operations still considered in-flight — used by replicas
// to rehydrate their in-memory OpCache + statuses on startup. Stale records
// (older than 30 minutes without an update) are excluded so a crashed peer's
// orphaned rows never resurrect on a healthy replica; the existing CleanStale
// reaper eventually marks them failed.
func (s *GalleryStore) ListActive() ([]GalleryOperationRecord, error) {
var ops []GalleryOperationRecord
staleCutoff := time.Now().Add(-30 * time.Minute)
err := s.db.Where("status IN ? AND updated_at > ?", activeStatuses, staleCutoff).
Order("created_at DESC").Find(&ops).Error
return ops, err
}
// UpsertCacheKey records the in-memory OpCache key + IsBackendOp flag on the
// gallery_operations row, creating the row if it does not exist yet.
//
// Why upsert: OpCache.Set is called by the HTTP admission handler before the
// galleryop service goroutine processes the operation and calls Create. If
// OpCache wrote with a plain Updates() those columns would silently be lost
// in the window between the two, so peer replicas hydrating in that window
// would still rebuild an empty OpCache. Upsert closes that window.
func (s *GalleryStore) UpsertCacheKey(id, cacheKey string, isBackend bool) error {
now := time.Now()
rec := GalleryOperationRecord{
ID: id,
CacheKey: cacheKey,
IsBackendOp: isBackend,
Status: "pending",
CreatedAt: now,
UpdatedAt: now,
}
return s.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.Assignments(map[string]any{
"cache_key": cacheKey,
"is_backend_op": isBackend,
"updated_at": now,
}),
}).Create(&rec).Error
}
// FindDuplicate checks if another instance is already downloading the same element.
// Only considers records updated within the last 30 minutes as active — older
// in-progress records are assumed to be stale (crashed instance).
@@ -100,7 +168,7 @@ func (s *GalleryStore) FindDuplicate(elementName string) (*GalleryOperationRecor
var op GalleryOperationRecord
staleCutoff := time.Now().Add(-30 * time.Minute)
err := s.db.Where("gallery_element_name = ? AND status IN ? AND updated_at > ?", elementName,
[]string{"pending", "downloading", "processing"}, staleCutoff).First(&op).Error
activeStatuses, staleCutoff).First(&op).Error
if err != nil {
return nil, err
}
@@ -118,8 +186,7 @@ func (s *GalleryStore) Cancel(id string) error {
func (s *GalleryStore) CleanStale(age time.Duration) error {
cutoff := time.Now().Add(-age)
return s.db.Model(&GalleryOperationRecord{}).
Where("updated_at < ? AND status IN ?", cutoff,
[]string{"pending", "downloading", "processing"}).
Where("updated_at < ? AND status IN ?", cutoff, activeStatuses).
Updates(map[string]any{
"status": "failed",
"error": "stale operation cleaned up on startup",

View File

@@ -9,6 +9,7 @@ import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/downloader"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
@@ -114,6 +115,21 @@ func (g *GalleryService) backendHandler(op *ManagementOp[gallery.GalleryBackend,
return err
}
// Tell peer replicas that the backend set has changed. UpgradeChecker
// caches upgrade-available bits for 6 hours, so without this peers would
// keep advertising an upgrade for a backend that already moved.
opName := "install"
switch {
case op.Delete:
opName = "delete"
case op.Upgrade:
opName = "upgrade"
}
g.publishCacheInvalidate(messaging.SubjectCacheInvalidateBackends, messaging.CacheInvalidateEvent{
Element: op.GalleryElementName,
Op: opName,
})
g.UpdateStatus(op.ID,
&OpStatus{
Deletion: op.Delete,

View File

@@ -0,0 +1,458 @@
package galleryop_test
import (
"context"
"encoding/json"
"errors"
"strings"
"sync"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/distributed"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/testutil"
)
// fakeBus is an in-memory MessagingClient that delivers each published
// message synchronously to every registered subscriber whose subject filter
// matches, including NATS-style wildcard subjects (`*` matches one token).
//
// Synchronous delivery keeps the specs deterministic: the moment Publish
// returns, every subscriber's handler has run, so the spec body can read
// the resulting state without polling.
type fakeBus struct {
mu sync.Mutex
subs []fakeBusSub
}
type fakeBusSub struct {
subject string
handler func([]byte)
}
func newFakeBus() *fakeBus { return &fakeBus{} }
func subjectMatches(filter, subject string) bool {
if filter == subject {
return true
}
fp := strings.Split(filter, ".")
sp := strings.Split(subject, ".")
if len(fp) != len(sp) {
return false
}
for i := range fp {
if fp[i] == "*" {
continue
}
if fp[i] != sp[i] {
return false
}
}
return true
}
func (b *fakeBus) Publish(subject string, data any) error {
payload, err := json.Marshal(data)
if err != nil {
return err
}
b.mu.Lock()
subs := append([]fakeBusSub(nil), b.subs...)
b.mu.Unlock()
for _, s := range subs {
if subjectMatches(s.subject, subject) {
s.handler(payload)
}
}
return nil
}
type fakeBusSubscription struct {
bus *fakeBus
subRef fakeBusSub
}
func (s *fakeBusSubscription) Unsubscribe() error {
s.bus.mu.Lock()
defer s.bus.mu.Unlock()
for i, candidate := range s.bus.subs {
if candidate.subject == s.subRef.subject {
s.bus.subs = append(s.bus.subs[:i], s.bus.subs[i+1:]...)
return nil
}
}
return nil
}
func (b *fakeBus) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) {
sub := fakeBusSub{subject: subject, handler: handler}
b.mu.Lock()
b.subs = append(b.subs, sub)
b.mu.Unlock()
return &fakeBusSubscription{bus: b, subRef: sub}, nil
}
func (b *fakeBus) QueueSubscribe(subject, _ string, handler func([]byte)) (messaging.Subscription, error) {
return b.Subscribe(subject, handler)
}
func (b *fakeBus) QueueSubscribeReply(string, string, func([]byte, func([]byte))) (messaging.Subscription, error) {
return &fakeBusSubscription{bus: b}, nil
}
func (b *fakeBus) SubscribeReply(string, func([]byte, func([]byte))) (messaging.Subscription, error) {
return &fakeBusSubscription{bus: b}, nil
}
func (b *fakeBus) Request(string, []byte, time.Duration) ([]byte, error) {
return nil, nil
}
func (b *fakeBus) IsConnected() bool { return true }
func (b *fakeBus) Close() {}
var _ = Describe("OpStatus JSON wire format", func() {
It("round-trips a non-nil Error through Marshal/Unmarshal as a string", func() {
original := &galleryop.OpStatus{
Progress: 42.0,
Message: "downloading",
GalleryElementName: "vllm",
Error: errors.New("disk full"),
Processed: true,
}
raw, err := json.Marshal(original)
Expect(err).ToNot(HaveOccurred())
var got galleryop.OpStatus
Expect(json.Unmarshal(raw, &got)).To(Succeed())
Expect(got.Error).ToNot(BeNil(), "the error must survive the round-trip — peer replicas need to surface the failure")
Expect(got.Error.Error()).To(Equal("disk full"))
Expect(got.Progress).To(Equal(42.0))
Expect(got.GalleryElementName).To(Equal("vllm"))
Expect(got.Processed).To(BeTrue())
})
It("emits no error field when Error is nil", func() {
original := &galleryop.OpStatus{Progress: 10.0}
raw, err := json.Marshal(original)
Expect(err).ToNot(HaveOccurred())
Expect(string(raw)).ToNot(ContainSubstring(`"error":`),
"omitempty must keep nil errors out of the wire payload")
var got galleryop.OpStatus
Expect(json.Unmarshal(raw, &got)).To(Succeed())
Expect(got.Error).To(BeNil())
})
})
var _ = Describe("OpCache distributed sync", func() {
var (
svcA *galleryop.GalleryService
svcB *galleryop.GalleryService
opA *galleryop.OpCache
opB *galleryop.OpCache
bus *fakeBus
)
BeforeEach(func() {
bus = newFakeBus()
svcA = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svcB = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
opA = galleryop.NewOpCache(svcA)
opB = galleryop.NewOpCache(svcB)
opA.SetMessagingClient(bus)
opB.SetMessagingClient(bus)
Expect(opA.Start(context.Background())).To(Succeed())
Expect(opB.Start(context.Background())).To(Succeed())
})
AfterEach(func() {
opA.Close()
opB.Close()
})
It("propagates a Set on replica A to replica B's OpCache", func() {
opA.Set("llama-3-8b", "job-uuid-1")
Expect(opB.Exists("llama-3-8b")).To(BeTrue(),
"replica B should see the operation that replica A admitted")
Expect(opB.Get("llama-3-8b")).To(Equal("job-uuid-1"))
Expect(opB.IsBackendOp("llama-3-8b")).To(BeFalse())
})
It("propagates SetBackend with the backend-op flag set", func() {
opA.SetBackend("official@vllm", "job-uuid-2")
Expect(opB.Exists("official@vllm")).To(BeTrue())
Expect(opB.IsBackendOp("official@vllm")).To(BeTrue(),
"peer must learn that this op is a backend install, not a model install")
})
It("propagates DeleteUUID across replicas", func() {
opA.Set("llama-3-8b", "job-uuid-3")
Expect(opB.Exists("llama-3-8b")).To(BeTrue())
opA.DeleteUUID("job-uuid-3")
Expect(opA.Exists("llama-3-8b")).To(BeFalse())
Expect(opB.Exists("llama-3-8b")).To(BeFalse(),
"a dismissed operation must clear from peer replicas too")
})
It("does not double-write on echo (replica A's broadcast received by replica A)", func() {
// We can't directly observe self-receive but we can confirm the
// resulting state is the same single value, not corrupted or doubled.
opA.Set("model-x", "job-uuid-4")
Expect(opA.Map()).To(HaveLen(1))
Expect(opA.Get("model-x")).To(Equal("job-uuid-4"))
})
})
var _ = Describe("OpCache PostgreSQL hydration", func() {
It("rebuilds the OpCache from active gallery_operations rows on Start", func() {
db := testutil.SetupTestDB()
store, err := distributed.NewGalleryStore(db)
Expect(err).ToNot(HaveOccurred())
// Seed: two active model installs (one with backend flag, one without)
// and one completed op that must NOT resurrect.
Expect(store.UpsertCacheKey("job-A", "llama-3-8b", false)).To(Succeed())
Expect(store.UpsertCacheKey("job-B", "official@vllm", true)).To(Succeed())
Expect(store.UpsertCacheKey("job-C", "old-stale", false)).To(Succeed())
Expect(store.UpdateStatus("job-C", "completed", "")).To(Succeed())
svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
cache := galleryop.NewOpCache(svc)
cache.SetGalleryStore(store)
Expect(cache.Start(context.Background())).To(Succeed())
Expect(cache.Exists("llama-3-8b")).To(BeTrue())
Expect(cache.IsBackendOp("llama-3-8b")).To(BeFalse())
Expect(cache.Get("llama-3-8b")).To(Equal("job-A"))
Expect(cache.Exists("official@vllm")).To(BeTrue())
Expect(cache.IsBackendOp("official@vllm")).To(BeTrue())
Expect(cache.Exists("old-stale")).To(BeFalse(),
"completed operations must not resurrect on hydration")
})
})
var _ = Describe("GalleryService broadcast sync", func() {
var (
svcA *galleryop.GalleryService
svcB *galleryop.GalleryService
bus *fakeBus
)
BeforeEach(func() {
bus = newFakeBus()
svcA = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svcB = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svcA.SetNATSClient(bus)
svcB.SetNATSClient(bus)
Expect(svcA.SubscribeBroadcasts()).To(Succeed())
Expect(svcB.SubscribeBroadcasts()).To(Succeed())
})
AfterEach(func() {
svcA.CloseBroadcasts()
svcB.CloseBroadcasts()
})
It("delivers an UpdateStatus on A into B's statuses map via the wildcard subscriber", func() {
svcA.UpdateStatus("op-1", &galleryop.OpStatus{
Progress: 50.0,
Message: "halfway",
GalleryElementName: "llama-3-8b",
})
st := svcB.GetStatus("op-1")
Expect(st).ToNot(BeNil(),
"replica B must see the progress its peer just published")
Expect(st.Progress).To(Equal(50.0))
Expect(st.Message).To(Equal("halfway"))
Expect(st.GalleryElementName).To(Equal("llama-3-8b"))
})
It("preserves a peer's accumulated Nodes when a tick arrives with empty Nodes", func() {
// Seed B with a multi-node breakdown via UpdateNodeProgress.
svcB.UpdateNodeProgress("op-2", "n1", galleryop.NodeProgress{
NodeID: "n1", NodeName: "worker-a", Status: galleryop.NodeStatusDownloading, Percentage: 20.0,
})
svcB.UpdateNodeProgress("op-2", "n2", galleryop.NodeProgress{
NodeID: "n2", NodeName: "worker-b", Status: galleryop.NodeStatusDownloading, Percentage: 30.0,
})
// A publishes a single-bar progress tick (no Nodes) — must not wipe B's Nodes.
svcA.UpdateStatus("op-2", &galleryop.OpStatus{Progress: 25.0, Message: "downloading"})
st := svcB.GetStatus("op-2")
Expect(st.Nodes).To(HaveLen(2),
"merged tick must carry forward existing per-node breakdown")
})
It("preserves the error message through a peer's wildcard broadcast", func() {
svcA.UpdateStatus("op-3", &galleryop.OpStatus{
Processed: true,
Error: errors.New("oci pull failed"),
})
st := svcB.GetStatus("op-3")
Expect(st).ToNot(BeNil())
Expect(st.Error).ToNot(BeNil(),
"a failed op's error must survive the broadcast hop")
Expect(st.Error.Error()).To(Equal("oci pull failed"))
Expect(st.Processed).To(BeTrue())
})
It("runs the local cancel func when a peer publishes a cancel event", func() {
var cancelCalled bool
ctx, cancel := context.WithCancel(context.Background())
svcB.StoreCancellation("op-4", func() {
cancelCalled = true
cancel()
})
_ = ctx
// A side fires CancelOperation; the cancel func lives on B and must run.
Expect(svcA.CancelOperation("op-4")).To(Succeed())
Expect(cancelCalled).To(BeTrue(),
"the replica holding the cancel func must run it when a peer requests cancellation")
st := svcB.GetStatus("op-4")
Expect(st).ToNot(BeNil())
Expect(st.Cancelled).To(BeTrue())
})
})
var _ = Describe("GalleryService cache invalidation broadcasts", func() {
var (
svcA *galleryop.GalleryService
svcB *galleryop.GalleryService
bus *fakeBus
)
BeforeEach(func() {
bus = newFakeBus()
svcA = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svcB = galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svcA.SetNATSClient(bus)
svcB.SetNATSClient(bus)
})
AfterEach(func() {
svcA.CloseBroadcasts()
svcB.CloseBroadcasts()
})
It("delivers SubjectCacheInvalidateModels to peer's OnModelsChanged callback", func() {
var (
mu sync.Mutex
seen []messaging.CacheInvalidateEvent
)
svcB.OnModelsChanged = func(evt messaging.CacheInvalidateEvent) {
mu.Lock()
seen = append(seen, evt)
mu.Unlock()
}
Expect(svcA.SubscribeBroadcasts()).To(Succeed())
Expect(svcB.SubscribeBroadcasts()).To(Succeed())
Expect(bus.Publish(messaging.SubjectCacheInvalidateModels, messaging.CacheInvalidateEvent{
Element: "llama-3-8b", Op: "install",
})).To(Succeed())
mu.Lock()
defer mu.Unlock()
// Both replicas subscribed; both callbacks fire (svcA's is nil-callback so no-op).
Expect(seen).To(ContainElement(messaging.CacheInvalidateEvent{
Element: "llama-3-8b", Op: "install",
}))
})
It("delivers SubjectCacheInvalidateBackends to peer's OnBackendOpCompleted callback", func() {
done := make(chan struct{}, 1)
svcB.OnBackendOpCompleted = func() {
select {
case done <- struct{}{}:
default:
}
}
Expect(svcA.SubscribeBroadcasts()).To(Succeed())
Expect(svcB.SubscribeBroadcasts()).To(Succeed())
Expect(bus.Publish(messaging.SubjectCacheInvalidateBackends, messaging.CacheInvalidateEvent{
Element: "vllm", Op: "upgrade",
})).To(Succeed())
Eventually(done, "2s", "10ms").Should(Receive(),
"peer must fire its UpgradeChecker hook when any replica completes a backend op")
})
It("survives a nil OnModelsChanged callback (subscriber set but no handler)", func() {
Expect(svcA.SubscribeBroadcasts()).To(Succeed())
Expect(svcB.SubscribeBroadcasts()).To(Succeed())
// No callbacks registered on either side — just ensure publish does not panic.
Expect(bus.Publish(messaging.SubjectCacheInvalidateModels, messaging.CacheInvalidateEvent{
Element: "x", Op: "install",
})).To(Succeed())
})
})
var _ = Describe("GalleryService PostgreSQL hydration", func() {
It("rebuilds the in-memory statuses map from active rows", func() {
db := testutil.SetupTestDB()
store, err := distributed.NewGalleryStore(db)
Expect(err).ToNot(HaveOccurred())
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "active-op",
GalleryElementName: "llama-3-8b",
OpType: "model_install",
Status: "downloading",
Progress: 65.0,
Message: "fetching shards",
})).To(Succeed())
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "failed-op",
GalleryElementName: "broken",
OpType: "backend_install",
Status: "downloading",
Progress: 10.0,
Error: "oci pull failed",
})).To(Succeed())
Expect(store.Create(&distributed.GalleryOperationRecord{
ID: "completed-op",
GalleryElementName: "done",
OpType: "model_install",
Status: "completed",
Progress: 100.0,
})).To(Succeed())
svc := galleryop.NewGalleryService(&config.ApplicationConfig{}, nil)
svc.SetGalleryStore(store)
Expect(svc.Hydrate()).To(Succeed())
active := svc.GetStatus("active-op")
Expect(active).ToNot(BeNil(), "in-flight op must be hydrated")
Expect(active.Progress).To(Equal(65.0))
Expect(active.Message).To(Equal("fetching shards"))
Expect(active.GalleryElementName).To(Equal("llama-3-8b"))
failed := svc.GetStatus("failed-op")
Expect(failed).ToNot(BeNil())
Expect(failed.Error).ToNot(BeNil(),
"the persisted error message must be reconstructed as an error value")
Expect(failed.Error.Error()).To(Equal("oci pull failed"))
Expect(svc.GetStatus("completed-op")).To(BeNil(),
"completed ops are filtered out of ListActive and must not hydrate")
})
})

View File

@@ -9,6 +9,7 @@ import (
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/LocalAI/pkg/utils"
@@ -111,6 +112,19 @@ func (g *GalleryService) modelHandler(op *ManagementOp[gallery.GalleryModel, gal
return err
}
// Tell peer replicas to refresh their own ModelConfigLoader. The local
// LoadModelConfigsFromPath above already covered THIS replica; without
// this broadcast a chat completion routed by the load balancer to a peer
// would fail to find a model just installed.
op2 := "install"
if op.Delete {
op2 = "delete"
}
g.publishCacheInvalidate(messaging.SubjectCacheInvalidateModels, messaging.CacheInvalidateEvent{
Element: op.GalleryElementName,
Op: op2,
})
g.UpdateStatus(op.ID,
&OpStatus{
Deletion: op.Delete,

View File

@@ -2,10 +2,16 @@ package galleryop
import (
"context"
"encoding/json"
"errors"
"strings"
"sync"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/services/distributed"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/xsync"
"github.com/mudler/xlog"
)
type ManagementOp[T any, E any] struct {
@@ -44,7 +50,7 @@ type ManagementOp[T any, E any] struct {
type OpStatus struct {
Deletion bool `json:"deletion"` // Deletion is true if the operation is a deletion
FileName string `json:"file_name"`
Error error `json:"error"`
Error error `json:"-"` // see MarshalJSON: serialized to "error" as a string
Processed bool `json:"processed"`
Message string `json:"message"`
Progress float64 `json:"progress"`
@@ -62,6 +68,99 @@ type OpStatus struct {
Nodes []NodeProgress `json:"nodes,omitempty"`
}
// opStatusWire is the JSON shape used when an OpStatus crosses a process
// boundary (NATS broadcast). The Error field on OpStatus is an `error`
// interface, which json.Marshal flattens to `{}` because the concrete error
// type usually has no exported fields — so a failed install replicated to a
// peer frontend would arrive with a nil error and the UI would never surface
// the failure. opStatusWire serializes the error as its Error() string and
// reconstructs it on read.
type opStatusWire struct {
Deletion bool `json:"deletion"`
FileName string `json:"file_name"`
ErrorMessage string `json:"error,omitempty"`
Processed bool `json:"processed"`
Message string `json:"message"`
Progress float64 `json:"progress"`
TotalFileSize string `json:"file_size"`
DownloadedFileSize string `json:"downloaded_size"`
GalleryElementName string `json:"gallery_element_name"`
Cancelled bool `json:"cancelled"`
Cancellable bool `json:"cancellable"`
Nodes []NodeProgress `json:"nodes,omitempty"`
}
func (o OpStatus) MarshalJSON() ([]byte, error) {
w := opStatusWire{
Deletion: o.Deletion,
FileName: o.FileName,
Processed: o.Processed,
Message: o.Message,
Progress: o.Progress,
TotalFileSize: o.TotalFileSize,
DownloadedFileSize: o.DownloadedFileSize,
GalleryElementName: o.GalleryElementName,
Cancelled: o.Cancelled,
Cancellable: o.Cancellable,
Nodes: o.Nodes,
}
if o.Error != nil {
w.ErrorMessage = o.Error.Error()
}
return json.Marshal(w)
}
func (o *OpStatus) UnmarshalJSON(data []byte) error {
var w opStatusWire
if err := json.Unmarshal(data, &w); err != nil {
return err
}
o.Deletion = w.Deletion
o.FileName = w.FileName
o.Processed = w.Processed
o.Message = w.Message
o.Progress = w.Progress
o.TotalFileSize = w.TotalFileSize
o.DownloadedFileSize = w.DownloadedFileSize
o.GalleryElementName = w.GalleryElementName
o.Cancelled = w.Cancelled
o.Cancellable = w.Cancellable
o.Nodes = w.Nodes
if w.ErrorMessage != "" {
o.Error = errors.New(w.ErrorMessage)
} else {
o.Error = nil
}
return nil
}
// OpCacheEvent is the NATS payload broadcast by frontend replicas when an
// admin operation is admitted (SubjectGalleryOpStart) or dismissed
// (SubjectGalleryOpEnd). Peers merge these into their local OpCache so a
// load-balanced /api/operations poll never returns an empty list while a
// peer is mid-install.
type OpCacheEvent struct {
JobID string `json:"job_id"`
CacheKey string `json:"cache_key"`
IsBackend bool `json:"is_backend"`
}
// GalleryProgressEvent is the NATS payload for an OpStatus broadcast. It
// wraps OpStatus with the opID/JobID so subscribers reading the wildcard
// subject don't need to parse it back out of the NATS subject string.
type GalleryProgressEvent struct {
JobID string `json:"job_id"`
Status *OpStatus `json:"status"`
}
// GalleryCancelEvent is the NATS payload for a gallery cancellation. The
// local cancellation func may live on a different frontend replica than the
// one that received the UI cancel button click; the broadcast subscriber
// runs the cancel func on whichever replica registered it.
type GalleryCancelEvent struct {
JobID string `json:"id"`
}
// NodeStatus values shared between NodeProgress (per-node tick) and the
// NodeOpStatus surfaced by DistributedBackendManager's fan-out. Defined
// as exported constants so producers (the manager, the progress bridge)
@@ -98,6 +197,12 @@ type OpCache struct {
status *xsync.SyncedMap[string, string]
backendOps *xsync.SyncedMap[string, bool] // Tracks which operations are backend operations
galleryService *GalleryService
// Distributed sync (nil when standalone).
mu sync.RWMutex
nats messaging.MessagingClient
store *distributed.GalleryStore
subs []messaging.Subscription
}
func NewOpCache(galleryService *GalleryService) *OpCache {
@@ -108,14 +213,159 @@ func NewOpCache(galleryService *GalleryService) *OpCache {
}
}
// SetMessagingClient enables cross-replica OpCache sync. Once set, Set/
// SetBackend/DeleteUUID publish OpCacheEvent messages that peer OpCaches
// merge into their local maps. Call Start after this to subscribe.
func (m *OpCache) SetMessagingClient(nc messaging.MessagingClient) {
m.mu.Lock()
defer m.mu.Unlock()
m.nats = nc
}
// SetGalleryStore enables PostgreSQL-backed OpCache persistence.
// Set/SetBackend upsert the cache_key + is_backend_op columns; Start
// hydrates the in-memory maps from active rows so a freshly-started
// replica does not return an empty /api/operations payload while a peer
// is mid-install.
func (m *OpCache) SetGalleryStore(s *distributed.GalleryStore) {
m.mu.Lock()
defer m.mu.Unlock()
m.store = s
}
// Start hydrates the in-memory maps from PostgreSQL (if a store was wired)
// and subscribes to the broadcast subjects (if NATS was wired). It returns
// the first subscribe error; hydration errors are logged but non-fatal so
// the frontend still comes up.
//
// Safe to call exactly once after SetMessagingClient / SetGalleryStore. The
// ctx parameter is reserved for future cancellation — current subscriptions
// live for the lifetime of the OpCache and are released by Close.
func (m *OpCache) Start(_ context.Context) error {
m.mu.RLock()
store := m.store
nc := m.nats
m.mu.RUnlock()
if store != nil {
if err := m.hydrateFromStore(store); err != nil {
xlog.Warn("OpCache hydrate failed; starting empty", "error", err)
}
}
if nc == nil {
return nil
}
startSub, err := messaging.SubscribeJSON(nc, messaging.SubjectGalleryOpStart, func(evt OpCacheEvent) {
m.applyStart(evt)
})
if err != nil {
return err
}
endSub, err := messaging.SubscribeJSON(nc, messaging.SubjectGalleryOpEnd, func(evt OpCacheEvent) {
m.applyEnd(evt)
})
if err != nil {
if uerr := startSub.Unsubscribe(); uerr != nil {
xlog.Warn("failed to unsubscribe partial OpCache subscription", "error", uerr)
}
return err
}
m.mu.Lock()
m.subs = append(m.subs, startSub, endSub)
m.mu.Unlock()
return nil
}
// Close drops all NATS subscriptions. Safe to call multiple times.
func (m *OpCache) Close() {
m.mu.Lock()
subs := m.subs
m.subs = nil
m.mu.Unlock()
for _, s := range subs {
if err := s.Unsubscribe(); err != nil {
xlog.Warn("OpCache unsubscribe failed", "error", err)
}
}
}
func (m *OpCache) hydrateFromStore(store *distributed.GalleryStore) error {
ops, err := store.ListActive()
if err != nil {
return err
}
for _, op := range ops {
if op.CacheKey == "" {
continue
}
m.status.Set(op.CacheKey, op.ID)
if op.IsBackendOp {
m.backendOps.Set(op.CacheKey, true)
}
}
return nil
}
// applyStart merges an inbound OpStart event into the local maps. Idempotent:
// receiving our own broadcast is a harmless re-assignment of the same value.
func (m *OpCache) applyStart(evt OpCacheEvent) {
if evt.CacheKey == "" || evt.JobID == "" {
return
}
m.status.Set(evt.CacheKey, evt.JobID)
if evt.IsBackend {
m.backendOps.Set(evt.CacheKey, true)
}
}
// applyEnd removes any entries whose jobID matches the event. Idempotent.
func (m *OpCache) applyEnd(evt OpCacheEvent) {
if evt.JobID == "" {
return
}
for _, k := range m.status.Keys() {
if m.status.Get(k) == evt.JobID {
m.status.Delete(k)
m.backendOps.Delete(k)
}
}
}
func (m *OpCache) Set(key string, value string) {
m.status.Set(key, value)
m.persistAndBroadcastStart(key, value, false)
}
// SetBackend sets a key-value pair and marks it as a backend operation
func (m *OpCache) SetBackend(key string, value string) {
m.status.Set(key, value)
m.backendOps.Set(key, true)
m.persistAndBroadcastStart(key, value, true)
}
func (m *OpCache) persistAndBroadcastStart(key, value string, isBackend bool) {
m.mu.RLock()
store := m.store
nc := m.nats
m.mu.RUnlock()
if store != nil {
if err := store.UpsertCacheKey(value, key, isBackend); err != nil {
xlog.Warn("OpCache failed to persist cache key", "job_id", value, "error", err)
}
}
if nc != nil {
if err := nc.Publish(messaging.SubjectGalleryOpStart, OpCacheEvent{
JobID: value,
CacheKey: key,
IsBackend: isBackend,
}); err != nil {
xlog.Warn("OpCache failed to broadcast start", "job_id", value, "error", err)
}
}
}
// IsBackendOp returns true if the given key is a backend operation
@@ -128,10 +378,23 @@ func (m *OpCache) Get(key string) string {
}
func (m *OpCache) DeleteUUID(uuid string) {
deleted := false
for _, k := range m.status.Keys() {
if m.status.Get(k) == uuid {
m.status.Delete(k)
m.backendOps.Delete(k) // Also clean up the backend flag
deleted = true
}
}
if !deleted {
return
}
m.mu.RLock()
nc := m.nats
m.mu.RUnlock()
if nc != nil {
if err := nc.Publish(messaging.SubjectGalleryOpEnd, OpCacheEvent{JobID: uuid}); err != nil {
xlog.Warn("OpCache failed to broadcast end", "job_id", uuid, "error", err)
}
}
}

View File

@@ -2,6 +2,7 @@ package galleryop
import (
"context"
"errors"
"fmt"
"sync"
@@ -11,6 +12,7 @@ import (
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
)
type GalleryService struct {
@@ -25,16 +27,32 @@ type GalleryService struct {
statuses map[string]*OpStatus
cancellations map[string]context.CancelFunc
// Distributed mode (nil when not in distributed mode)
natsClient messaging.Publisher
galleryStore *distributed.GalleryStore
// Distributed mode (nil when not in distributed mode).
// natsClient is the wider MessagingClient (Publisher + subscribe methods)
// when wired by the distributed startup path; broadcastSubs holds the
// progress + cancel subscriptions opened by SubscribeBroadcasts.
natsClient messaging.MessagingClient
galleryStore *distributed.GalleryStore
broadcastSubs []messaging.Subscription
// OnBackendOpCompleted is fired after every successful install/upgrade/delete
// on the backend channel. The Application wires this to UpgradeChecker.TriggerCheck
// so `/api/backends/upgrades` stops surfacing a backend as upgradeable the moment
// the worker finishes — previously the cache only refreshed on the 6-hour tick,
// making manual upgrades look like they failed even when they hadn't.
//
// In distributed mode the same hook also fires on peer replicas via the
// SubjectCacheInvalidateBackends subscriber, so every replica's
// UpgradeChecker stays in sync.
OnBackendOpCompleted func()
// OnModelsChanged is fired on peer replicas when SOMEONE else publishes
// SubjectCacheInvalidateModels. The Application wires this to
// ModelConfigLoader.LoadModelConfigsFromPath so a chat completion that
// load-balances onto this replica can find the just-installed model.
// The originating replica reloads inline (models.go) so it does not need
// the hook.
OnModelsChanged func(messaging.CacheInvalidateEvent)
}
func NewGalleryService(appConfig *config.ApplicationConfig, ml *model.ModelLoader) *GalleryService {
@@ -75,7 +93,10 @@ func (g *GalleryService) BackendManager() BackendManager {
}
// SetNATSClient sets the NATS client for distributed progress publishing.
func (g *GalleryService) SetNATSClient(nc messaging.Publisher) {
// Accepting the wider MessagingClient (vs. plain Publisher) lets
// SubscribeBroadcasts wire the wildcard subscriptions that keep peer
// replicas' statuses + cancellations in sync.
func (g *GalleryService) SetNATSClient(nc messaging.MessagingClient) {
g.Lock()
defer g.Unlock()
g.natsClient = nc
@@ -109,7 +130,6 @@ func (g *GalleryService) DeleteBackend(name string) error {
func (g *GalleryService) UpdateStatus(s string, op *OpStatus) {
g.Lock()
defer g.Unlock()
// Preserve any per-node entries already accumulated by UpdateNodeProgress:
// the legacy progressCb path (used by the Phase 2 install bridge) calls
// UpdateStatus with a fresh *OpStatus on every tick, which would otherwise
@@ -123,9 +143,15 @@ func (g *GalleryService) UpdateStatus(s string, op *OpStatus) {
}
}
g.statuses[s] = op
store := g.galleryStore
nc := g.natsClient
g.Unlock()
// Persist to PostgreSQL in distributed mode
if g.galleryStore != nil {
// I/O happens after Unlock. The NATS broadcast loops back into our own
// wildcard subscriber (mergeStatus), which would deadlock on this mutex
// if we still held it. Holding the lock across a PostgreSQL round-trip
// would also stall every concurrent reader on each progress tick.
if store != nil && op != nil {
if op.Processed {
status, errMsg := "completed", ""
if op.Error != nil {
@@ -135,18 +161,64 @@ func (g *GalleryService) UpdateStatus(s string, op *OpStatus) {
if op.Cancelled {
status = "cancelled"
}
g.galleryStore.UpdateStatus(s, status, errMsg)
if err := store.UpdateStatus(s, status, errMsg); err != nil {
xlog.Warn("Failed to persist gallery operation status", "op_id", s, "error", err)
}
} else {
g.galleryStore.UpdateProgress(s, op.Progress, op.Message, op.DownloadedFileSize)
if err := store.UpdateProgress(s, op.Progress, op.Message, op.DownloadedFileSize); err != nil {
xlog.Warn("Failed to persist gallery operation progress", "op_id", s, "error", err)
}
}
}
// Publish progress to NATS in distributed mode
if g.natsClient != nil {
g.natsClient.Publish(messaging.SubjectGalleryProgress(s), op)
// Publish progress to NATS in distributed mode. The payload wraps the
// OpStatus with the opID so peer replicas reading the wildcard subject
// don't need to parse it back out of the NATS subject string.
if nc != nil {
if err := nc.Publish(messaging.SubjectGalleryProgress(s), GalleryProgressEvent{
JobID: s,
Status: op,
}); err != nil {
xlog.Warn("Failed to broadcast gallery progress", "op_id", s, "error", err)
}
}
}
// publishCacheInvalidate broadcasts a cache invalidation event so peer
// replicas refresh whatever in-memory state mirrors disk. No-op when
// natsClient is not wired (standalone mode).
func (g *GalleryService) publishCacheInvalidate(subject string, evt messaging.CacheInvalidateEvent) {
g.Lock()
nc := g.natsClient
g.Unlock()
if nc == nil {
return
}
if err := nc.Publish(subject, evt); err != nil {
xlog.Warn("Failed to broadcast cache invalidation", "subject", subject, "error", err)
}
}
// mergeStatus is the broadcast-side merge: it updates the in-memory map from
// a peer's GalleryProgressEvent without re-publishing to NATS or re-writing
// to PostgreSQL. UpdateStatus is the local-write entry point and does both;
// mergeStatus is what the wildcard subscriber calls. Splitting them avoids
// an echo loop (replica publishes → its own subscriber receives → mergeStatus
// silently re-applies → no second publish).
func (g *GalleryService) mergeStatus(opID string, op *OpStatus) {
if op == nil {
return
}
g.Lock()
defer g.Unlock()
if len(op.Nodes) == 0 {
if prev := g.statuses[opID]; prev != nil && len(prev.Nodes) > 0 {
op.Nodes = prev.Nodes
}
}
g.statuses[opID] = op
}
// UpdateNodeProgress merges a per-node progress tick into OpStatus.Nodes,
// keyed by nodeID, and mirrors the latest values into the aggregate
// Progress / FileName / DownloadedFileSize / TotalFileSize / Message
@@ -202,36 +274,38 @@ func (g *GalleryService) GetAllStatus() map[string]*OpStatus {
return g.statuses
}
// CancelOperation cancels an in-progress operation by its ID
// CancelOperation cancels an in-progress operation by its ID.
//
// In distributed mode the UI's cancel click may land on a different replica
// than the one running the operation. We still publish the cancel event in
// that case — the peer holding the cancellation func picks it up via the
// SubjectGalleryCancelWildcard subscriber and runs it locally. The caller
// gets a non-error reply so the UI shows the cancel as accepted.
func (g *GalleryService) CancelOperation(id string) error {
g.Lock()
defer g.Unlock()
// Check if operation is already cancelled
if status, ok := g.statuses[id]; ok && status.Cancelled {
g.Unlock()
return fmt.Errorf("operation %q is already cancelled", id)
}
cancelFunc, exists := g.cancellations[id]
if !exists {
cancelFunc, localExists := g.cancellations[id]
if localExists {
delete(g.cancellations, id)
}
nc := g.natsClient
if !localExists && nc == nil {
g.Unlock()
return fmt.Errorf("operation %q not found or already completed", id)
}
// Cancel the operation
cancelFunc()
// Publish cancellation to NATS in distributed mode
if g.natsClient != nil {
g.natsClient.Publish(messaging.SubjectGalleryCancel(id), map[string]string{"id": id})
}
// Update status to reflect cancellation
if status, ok := g.statuses[id]; ok {
status.Cancelled = true
status.Processed = true
status.Message = "cancelled"
} else {
// Create status for queued operations that haven't started yet
g.statuses[id] = &OpStatus{
Cancelled: true,
Processed: true,
@@ -239,13 +313,59 @@ func (g *GalleryService) CancelOperation(id string) error {
Cancellable: false,
}
}
g.Unlock()
// Clean up cancellation function
delete(g.cancellations, id)
// I/O and user-provided callback after Unlock — the cancel-wildcard
// subscriber loops back into applyCancel on this same replica, which
// would otherwise deadlock on g.Mutex.
if cancelFunc != nil {
cancelFunc()
}
if nc != nil {
if err := nc.Publish(messaging.SubjectGalleryCancel(id), GalleryCancelEvent{JobID: id}); err != nil {
xlog.Warn("Failed to broadcast gallery cancel", "op_id", id, "error", err)
}
}
return nil
}
// applyCancel is the broadcast-side counterpart to CancelOperation. The
// wildcard subscriber calls it when a peer publishes a cancel event:
// run the local cancel func if we have one (no echo via NATS), and reflect
// the cancellation in the local statuses map. Idempotent: a replica that
// already cancelled this op locally treats the inbound event as a no-op.
func (g *GalleryService) applyCancel(id string) {
g.Lock()
cancelFunc, hasCancel := g.cancellations[id]
if hasCancel {
delete(g.cancellations, id)
}
if status, ok := g.statuses[id]; ok {
if status.Cancelled {
g.Unlock()
return
}
status.Cancelled = true
status.Processed = true
status.Message = "cancelled"
} else {
g.statuses[id] = &OpStatus{
Cancelled: true,
Processed: true,
Message: "cancelled",
Cancellable: false,
}
}
g.Unlock()
// Invoke the cancel func after Unlock so a callback that touches
// GalleryService doesn't re-enter the mutex.
if hasCancel {
cancelFunc()
}
}
// storeCancellation stores a cancellation function for an operation
func (g *GalleryService) storeCancellation(id string, cancelFunc context.CancelFunc) {
g.Lock()
@@ -345,3 +465,147 @@ func (g *GalleryService) Start(c context.Context, cl *config.ModelConfigLoader,
return nil
}
// SubscribeBroadcasts opens the wildcard subscriptions that keep this
// replica's in-memory statuses + cancellation state in sync with peers.
// Returns an error if the progress subscription fails; cancel-sub failures
// are not fatal but are logged.
//
// Hydrate should be called before this so the freshly-started replica has
// the pre-existing operations before live updates start flowing.
func (g *GalleryService) SubscribeBroadcasts() error {
g.Lock()
nc := g.natsClient
g.Unlock()
if nc == nil {
return nil
}
progressSub, err := messaging.SubscribeJSON(nc, messaging.SubjectGalleryProgressWildcard, func(evt GalleryProgressEvent) {
if evt.JobID == "" || evt.Status == nil {
return
}
g.mergeStatus(evt.JobID, evt.Status)
})
if err != nil {
return fmt.Errorf("subscribing to gallery progress wildcard: %w", err)
}
cancelSub, err := messaging.SubscribeJSON(nc, messaging.SubjectGalleryCancelWildcard, func(evt GalleryCancelEvent) {
if evt.JobID == "" {
return
}
g.applyCancel(evt.JobID)
})
if err != nil {
if uerr := progressSub.Unsubscribe(); uerr != nil {
xlog.Warn("failed to unsubscribe partial gallery progress sub", "error", uerr)
}
return fmt.Errorf("subscribing to gallery cancel wildcard: %w", err)
}
modelsSub, err := messaging.SubscribeJSON(nc, messaging.SubjectCacheInvalidateModels, func(evt messaging.CacheInvalidateEvent) {
g.Lock()
cb := g.OnModelsChanged
g.Unlock()
if cb != nil {
cb(evt)
}
})
if err != nil {
if uerr := progressSub.Unsubscribe(); uerr != nil {
xlog.Warn("failed to unsubscribe partial gallery progress sub", "error", uerr)
}
if uerr := cancelSub.Unsubscribe(); uerr != nil {
xlog.Warn("failed to unsubscribe partial gallery cancel sub", "error", uerr)
}
return fmt.Errorf("subscribing to models invalidation: %w", err)
}
backendsSub, err := messaging.SubscribeJSON(nc, messaging.SubjectCacheInvalidateBackends, func(_ messaging.CacheInvalidateEvent) {
g.Lock()
cb := g.OnBackendOpCompleted
g.Unlock()
if cb != nil {
// Run off-goroutine so a slow UpgradeChecker doesn't stall the
// NATS receive loop. Matches the local fire-after-install path.
go cb()
}
})
if err != nil {
if uerr := progressSub.Unsubscribe(); uerr != nil {
xlog.Warn("failed to unsubscribe partial gallery progress sub", "error", uerr)
}
if uerr := cancelSub.Unsubscribe(); uerr != nil {
xlog.Warn("failed to unsubscribe partial gallery cancel sub", "error", uerr)
}
if uerr := modelsSub.Unsubscribe(); uerr != nil {
xlog.Warn("failed to unsubscribe partial models sub", "error", uerr)
}
return fmt.Errorf("subscribing to backends invalidation: %w", err)
}
g.Lock()
g.broadcastSubs = append(g.broadcastSubs, progressSub, cancelSub, modelsSub, backendsSub)
g.Unlock()
return nil
}
// CloseBroadcasts drops the wildcard subscriptions. Safe to call multiple times.
func (g *GalleryService) CloseBroadcasts() {
g.Lock()
subs := g.broadcastSubs
g.broadcastSubs = nil
g.Unlock()
for _, s := range subs {
if err := s.Unsubscribe(); err != nil {
xlog.Warn("GalleryService unsubscribe failed", "error", err)
}
}
}
// Hydrate loads still-active operations from the GalleryStore into the
// in-memory statuses map so a freshly-started replica does not return an
// empty /api/operations payload while a peer is mid-install. Idempotent.
// No-op when no store is wired.
//
// The reconstructed OpStatus carries no Error type — the DB stores the
// message as a string and Hydrate surfaces it via errors.New so the UI's
// "operation failed" banner survives a frontend restart.
func (g *GalleryService) Hydrate() error {
g.Lock()
store := g.galleryStore
g.Unlock()
if store == nil {
return nil
}
ops, err := store.ListActive()
if err != nil {
return fmt.Errorf("listing active gallery ops: %w", err)
}
g.Lock()
defer g.Unlock()
for _, op := range ops {
// Skip rows that already have an in-memory status — the live
// broadcast subscriber will fill any gaps with fresher data.
if _, ok := g.statuses[op.ID]; ok {
continue
}
st := &OpStatus{
Message: op.Message,
Progress: op.Progress,
FileName: op.FileName,
TotalFileSize: op.TotalFileSize,
DownloadedFileSize: op.DownloadedFileSize,
GalleryElementName: op.GalleryElementName,
Cancellable: op.Cancellable,
Deletion: op.OpType == "model_delete",
}
if op.Error != "" {
st.Error = errors.New(op.Error)
}
g.statuses[op.ID] = st
}
xlog.Info("Hydrated gallery service statuses from store", "count", len(ops))
return nil
}

View File

@@ -64,6 +64,16 @@ func SubjectGalleryProgress(opID string) string {
return subjectGalleryPrefix + sanitizeSubjectToken(opID) + ".progress"
}
// SubjectGalleryOpStart and SubjectGalleryOpEnd are broadcast subjects for the
// in-memory OpCache lifecycle. Frontend replicas publish to these when an
// admin admits a new install/delete (Start) and when an operation is
// dismissed (End), so peer replicas can keep their OpCache in sync without
// hitting PostgreSQL on every UI poll.
const (
SubjectGalleryOpStart = "gallery.opcache.start"
SubjectGalleryOpEnd = "gallery.opcache.end"
)
// Control Signals (Pub/Sub — targeted cancellation)
const (
subjectJobCancelPrefix = "jobs."
@@ -321,8 +331,26 @@ func SubjectNodeFilesListDir(nodeID string) string {
// Cache Invalidation (Pub/Sub — broadcast to all instances)
const (
SubjectCacheInvalidateSkills = "cache.invalidate.skills"
// SubjectCacheInvalidateModels is broadcast by the replica that completed
// a model install/delete. Peers subscribe and re-run
// ModelConfigLoader.LoadModelConfigsFromPath so a chat completion routed
// to a different replica can find the newly installed model.
SubjectCacheInvalidateModels = "cache.invalidate.models"
// SubjectCacheInvalidateBackends is broadcast after a backend
// install/upgrade/delete. Peers retrigger their UpgradeChecker so the
// 6-hour upgrade-available cache flips to fresh on every replica, not
// just the one that handled the request.
SubjectCacheInvalidateBackends = "cache.invalidate.backends"
)
// CacheInvalidateEvent is the payload for cache invalidation broadcasts.
// Element names a specific model/backend when known; empty means "the whole
// set was touched, do a full reload."
type CacheInvalidateEvent struct {
Element string `json:"element,omitempty"`
Op string `json:"op,omitempty"` // "install" | "delete" | "upgrade"
}
// SubjectCacheInvalidateCollection returns the NATS subject for collection cache invalidation.
func SubjectCacheInvalidateCollection(name string) string {
return "cache.invalidate.collections." + sanitizeSubjectToken(name)