diff --git a/core/services/messaging/client.go b/core/services/messaging/client.go index 31257f1fd..e01c7d9ca 100644 --- a/core/services/messaging/client.go +++ b/core/services/messaging/client.go @@ -22,6 +22,14 @@ const subscribeConfirmTimeout = 5 * time.Second type Client struct { conn *nats.Conn mu sync.RWMutex + + // reconnectCbs are invoked after the underlying connection is + // re-established. nats.go transparently resubscribes existing + // subscriptions on reconnect, but it cannot know that a consumer kept + // derived in-memory state (e.g. syncstate.SyncedMap) that may have drifted + // while the link was down — these callbacks let such consumers re-hydrate. + cbMu sync.Mutex + reconnectCbs []func() } // New creates a new NATS client with auto-reconnect. @@ -31,6 +39,10 @@ func New(url string, opts ...Option) (*Client, error) { o(&cfg) } + // Allocate the client up front so the reconnect handler closure can reach + // it; conn is populated after nats.Connect succeeds below. + c := &Client{} + natsOpts := []nats.Option{ nats.RetryOnFailedConnect(true), nats.MaxReconnects(-1), @@ -41,6 +53,7 @@ func New(url string, opts ...Option) (*Client, error) { }), nats.ReconnectHandler(func(_ *nats.Conn) { xlog.Info("NATS reconnected") + c.runReconnectCallbacks() }), nats.ClosedHandler(func(_ *nats.Conn) { xlog.Info("NATS connection closed") @@ -103,7 +116,33 @@ func New(url string, opts ...Option) (*Client, error) { return nil, fmt.Errorf("connecting to NATS at %s: %w", sanitize.URL(url), err) } - return &Client{conn: nc}, nil + c.conn = nc + return c, nil +} + +// OnReconnect registers a callback invoked after the NATS connection is +// re-established. It is consumed via an optional interface type-assertion +// (interface{ OnReconnect(func()) }) rather than being added to MessagingClient, +// so the messaging abstraction stays minimal and standalone/test clients are not +// forced to implement reconnect semantics. A nil callback is ignored. +func (c *Client) OnReconnect(cb func()) { + if cb == nil { + return + } + c.cbMu.Lock() + c.reconnectCbs = append(c.reconnectCbs, cb) + c.cbMu.Unlock() +} + +// runReconnectCallbacks invokes registered reconnect callbacks. It copies the +// slice under the lock so a callback that (re)registers cannot deadlock. +func (c *Client) runReconnectCallbacks() { + c.cbMu.Lock() + cbs := append([]func(){}, c.reconnectCbs...) + c.cbMu.Unlock() + for _, cb := range cbs { + cb() + } } // Publish marshals data as JSON and publishes it to the given subject. diff --git a/core/services/messaging/subjects.go b/core/services/messaging/subjects.go index 7d099460c..d2b11f535 100644 --- a/core/services/messaging/subjects.go +++ b/core/services/messaging/subjects.go @@ -380,6 +380,20 @@ func SubjectCacheInvalidateCollection(name string) string { return "cache.invalidate.collections." + sanitizeSubjectToken(name) } +// SyncedMap State Sync (Pub/Sub — broadcast to all frontends) +// +// The reusable syncstate.SyncedMap component publishes a {op,key,value} delta on +// this subject whenever a replica mutates a piece of cross-replica in-memory +// state. Peers subscribe and apply the delta to their own map, so a round-robin +// API request that lands on a replica which did not originate the change still +// sees it. Convergence on (re)connect is done by re-hydrating from the durable +// source, so no request/reply snapshot subject is needed here. +func SubjectSyncStateDelta(name string) string { + return subjectSyncStatePrefix + sanitizeSubjectToken(name) + ".delta" +} + +const subjectSyncStatePrefix = "state." + // Prefix-Cache Routing Sync (Pub/Sub - broadcast to all frontends) // // Frontends share prefix-cache observations so a request routed to any replica diff --git a/core/services/syncstate/syncstate.go b/core/services/syncstate/syncstate.go new file mode 100644 index 000000000..8c3e673c3 --- /dev/null +++ b/core/services/syncstate/syncstate.go @@ -0,0 +1,286 @@ +// Package syncstate provides SyncedMap, a reusable cross-replica in-memory map. +// +// LocalAI in distributed mode runs multiple frontend replicas behind a +// round-robin load balancer. Several features keep process-local in-memory state +// that is surfaced to the HTTP/UI API; without cross-replica sync a poll that +// lands on a replica which did not originate a change sees stale or missing data. +// SyncedMap collapses the three legs each feature otherwise hand-wires - an +// in-memory map, a NATS broadcast/apply path, and optional durable read-through - +// into one well-tested component so cross-replica consistency is a configuration +// choice rather than a bespoke re-implementation. +package syncstate + +import ( + "context" + "sync" + "time" + + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/xlog" +) + +// Op values carried on the wire and passed to OnApply. +const ( + opSet = "set" + opDelete = "delete" +) + +// Store is optional durable backing for a SyncedMap. In distributed mode it is a +// single shared DB, so the apply path (a delta received from a peer) updates +// memory only and never re-writes the Store. +type Store[K comparable, V any] interface { + List(ctx context.Context) ([]V, error) + Upsert(ctx context.Context, v V) error + Delete(ctx context.Context, k K) error +} + +// Config configures a SyncedMap. +type Config[K comparable, V any] struct { + Name string // subject namespace, e.g. "finetune.jobs" + Key func(V) K // extract the key from a value + Nats messaging.MessagingClient // nil => standalone: in-memory only, no broadcast/subscribe + Store Store[K, V] // optional read-through persistence + Loader func(ctx context.Context) ([]V, error) // source when there is no Store (e.g. disk reload) + OnApply func(op string, k K, v V) // optional hook after an applied change (e.g. ShutdownModel) + Reconcile time.Duration // optional periodic re-hydrate; 0 = off +} + +// delta is the JSON wire envelope broadcast on every local mutation. Value is +// omitempty so a delete carries only op+key. +type delta[K comparable, V any] struct { + Op string `json:"op"` + Key K `json:"key"` + Value V `json:"value,omitempty"` +} + +// SyncedMap is a cross-replica in-memory map. A local write (Set/Delete) updates +// memory, the optional durable Store, then broadcasts a delta to peers. A peer's +// delta updates memory only and fires OnApply - it never re-broadcasts and never +// writes the Store. That structural split is the echo-loop guard (same pattern as +// galleryop.mergeStatus / OpCache.applyStart): receiving your own broadcast just +// re-applies an idempotent value to memory, so there is no storm and no +// double-write. +type SyncedMap[K comparable, V any] struct { + cfg Config[K, V] + + mu sync.RWMutex + data map[K]V + + sub Subscription + + // lifeCtx outlives Start's argument: a reconnect callback or reconcile tick + // can fire long after Start returns, so they must not be tied to a ctx the + // caller may cancel. Close cancels it. + lifeCtx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +// Subscription is the subset of messaging.Subscription the component holds onto. +type Subscription = messaging.Subscription + +// New constructs a SyncedMap. Call Start to hydrate and begin syncing. +func New[K comparable, V any](cfg Config[K, V]) *SyncedMap[K, V] { + return &SyncedMap[K, V]{cfg: cfg, data: make(map[K]V)} +} + +func (m *SyncedMap[K, V]) subject() string { + return messaging.SubjectSyncStateDelta(m.cfg.Name) +} + +// Start hydrates from the source, subscribes for peer deltas, registers a +// reconnect re-hydrate (when the client supports it), and starts the optional +// reconcile ticker. +func (m *SyncedMap[K, V]) Start(ctx context.Context) error { + if err := m.hydrate(ctx); err != nil { + return err + } + + m.lifeCtx, m.cancel = context.WithCancel(context.Background()) + + if m.cfg.Nats != nil { + sub, err := messaging.SubscribeJSON(m.cfg.Nats, m.subject(), m.apply) + if err != nil { + return err + } + m.sub = sub + + // nats.go transparently resubscribes on reconnect, but it cannot know we + // kept derived in-memory state that may have drifted while the link was + // down, so re-hydrate from the durable source. Detected via an optional + // interface so MessagingClient itself stays minimal; standalone/test + // clients without the method simply fall back to the reconcile ticker. + if r, ok := m.cfg.Nats.(interface{ OnReconnect(func()) }); ok { + r.OnReconnect(func() { + if err := m.hydrate(m.lifeCtx); err != nil { + xlog.Warn("syncstate: reconnect re-hydrate failed", "name", m.cfg.Name, "error", err) + } + }) + } + } + + if m.cfg.Reconcile > 0 { + m.wg.Add(1) + go m.reconcileLoop() + } + return nil +} + +// Close unsubscribes and stops the reconcile ticker. +func (m *SyncedMap[K, V]) Close() error { + if m.cancel != nil { + m.cancel() + } + m.wg.Wait() + if m.sub != nil { + return m.sub.Unsubscribe() + } + return nil +} + +// Set updates the value locally, writes through the Store, then broadcasts. +// Per the data-flow contract the Store write happens under the lock so memory and +// durable state move together; the broadcast is best-effort after unlocking. +func (m *SyncedMap[K, V]) Set(ctx context.Context, v V) error { + k := m.cfg.Key(v) + m.mu.Lock() + m.data[k] = v + if m.cfg.Store != nil { + if err := m.cfg.Store.Upsert(ctx, v); err != nil { + m.mu.Unlock() + return err + } + } + m.mu.Unlock() + m.publish(opSet, k, v) + return nil +} + +// Delete removes the key locally, deletes it from the Store, then broadcasts. +func (m *SyncedMap[K, V]) Delete(ctx context.Context, k K) error { + m.mu.Lock() + delete(m.data, k) + if m.cfg.Store != nil { + if err := m.cfg.Store.Delete(ctx, k); err != nil { + m.mu.Unlock() + return err + } + } + m.mu.Unlock() + var zero V + m.publish(opDelete, k, zero) + return nil +} + +// Get returns the value for k and whether it was present. +func (m *SyncedMap[K, V]) Get(k K) (V, bool) { + m.mu.RLock() + defer m.mu.RUnlock() + v, ok := m.data[k] + return v, ok +} + +// List returns a snapshot slice of all values. +func (m *SyncedMap[K, V]) List() []V { + m.mu.RLock() + defer m.mu.RUnlock() + out := make([]V, 0, len(m.data)) + for _, v := range m.data { + out = append(out, v) + } + return out +} + +// Snapshot returns a copy of the underlying map. +func (m *SyncedMap[K, V]) Snapshot() map[K]V { + m.mu.RLock() + defer m.mu.RUnlock() + out := make(map[K]V, len(m.data)) + for k, v := range m.data { + out[k] = v + } + return out +} + +// publish broadcasts a delta. Standalone (nil Nats) is a strict no-op. +func (m *SyncedMap[K, V]) publish(op string, k K, v V) { + if m.cfg.Nats == nil { + return + } + if err := m.cfg.Nats.Publish(m.subject(), delta[K, V]{Op: op, Key: k, Value: v}); err != nil { + xlog.Warn("syncstate: failed to broadcast delta", "name", m.cfg.Name, "op", op, "error", err) + } +} + +// apply handles a peer's delta: memory-only update plus OnApply. It deliberately +// never writes the Store nor re-publishes - that is the echo-loop guard. +func (m *SyncedMap[K, V]) apply(d delta[K, V]) { + switch d.Op { + case opSet: + m.mu.Lock() + m.data[d.Key] = d.Value + m.mu.Unlock() + case opDelete: + m.mu.Lock() + delete(m.data, d.Key) + m.mu.Unlock() + default: + xlog.Warn("syncstate: ignoring delta with unknown op", "name", m.cfg.Name, "op", d.Op) + return + } + if m.cfg.OnApply != nil { + m.cfg.OnApply(d.Op, d.Key, d.Value) + } +} + +// hydrate replaces the whole map from the durable source: Store if present, else +// Loader. With neither, a late joiner starts empty and catches up via deltas +// (acceptable only for ephemeral state). +func (m *SyncedMap[K, V]) hydrate(ctx context.Context) error { + var ( + vals []V + err error + ) + switch { + case m.cfg.Store != nil: + vals, err = m.cfg.Store.List(ctx) + case m.cfg.Loader != nil: + vals, err = m.cfg.Loader(ctx) + default: + return nil + } + if err != nil { + return err + } + m.replaceAll(vals) + return nil +} + +// replaceAll atomically swaps the map contents for the given values, keyed via +// cfg.Key. +func (m *SyncedMap[K, V]) replaceAll(vals []V) { + next := make(map[K]V, len(vals)) + for _, v := range vals { + next[m.cfg.Key(v)] = v + } + m.mu.Lock() + m.data = next + m.mu.Unlock() +} + +// reconcileLoop periodically re-hydrates to repair silent drift (missed deltas). +func (m *SyncedMap[K, V]) reconcileLoop() { + defer m.wg.Done() + t := time.NewTicker(m.cfg.Reconcile) + defer t.Stop() + for { + select { + case <-m.lifeCtx.Done(): + return + case <-t.C: + if err := m.hydrate(m.lifeCtx); err != nil { + xlog.Warn("syncstate: reconcile re-hydrate failed", "name", m.cfg.Name, "error", err) + } + } + } +} diff --git a/core/services/syncstate/syncstate_suite_test.go b/core/services/syncstate/syncstate_suite_test.go new file mode 100644 index 000000000..b4f025c9e --- /dev/null +++ b/core/services/syncstate/syncstate_suite_test.go @@ -0,0 +1,13 @@ +package syncstate_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestSyncstate(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Syncstate Suite") +} diff --git a/core/services/syncstate/syncstate_test.go b/core/services/syncstate/syncstate_test.go new file mode 100644 index 000000000..1e31db41b --- /dev/null +++ b/core/services/syncstate/syncstate_test.go @@ -0,0 +1,291 @@ +package syncstate_test + +import ( + "context" + "sync" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/mudler/LocalAI/core/services/messaging" + "github.com/mudler/LocalAI/core/services/syncstate" + "github.com/mudler/LocalAI/core/services/testutil" +) + +// job is a minimal JSON-serializable value stand-in for the real cross-replica +// records (finetune/quant/agent jobs) the component is built for. +type job struct { + ID string `json:"id"` + Status string `json:"status"` +} + +func jobKey(j *job) string { return j.ID } + +const stateName = "test.jobs" + +func deltaSubject() string { return messaging.SubjectSyncStateDelta(stateName) } + +// fakeStore is an in-memory Store that records call counts so specs can assert +// the write-through-vs-apply split (local writes hit the Store; applied deltas +// must not). +type fakeStore struct { + mu sync.Mutex + data map[string]*job + upsertCalls int + deleteCalls int + listCalls int +} + +func newFakeStore(seed ...*job) *fakeStore { + s := &fakeStore{data: map[string]*job{}} + for _, j := range seed { + s.data[j.ID] = j + } + return s +} + +func (s *fakeStore) List(_ context.Context) ([]*job, error) { + s.mu.Lock() + defer s.mu.Unlock() + s.listCalls++ + out := make([]*job, 0, len(s.data)) + for _, j := range s.data { + out = append(out, j) + } + return out, nil +} + +func (s *fakeStore) Upsert(_ context.Context, j *job) error { + s.mu.Lock() + defer s.mu.Unlock() + s.upsertCalls++ + s.data[j.ID] = j + return nil +} + +func (s *fakeStore) Delete(_ context.Context, k string) error { + s.mu.Lock() + defer s.mu.Unlock() + s.deleteCalls++ + delete(s.data, k) + return nil +} + +// add simulates a peer replica writing to the shared DB out-of-band (e.g. while +// this replica was partitioned), so a re-hydrate can be observed to pick it up. +func (s *fakeStore) add(j *job) { + s.mu.Lock() + defer s.mu.Unlock() + s.data[j.ID] = j +} + +func (s *fakeStore) counts() (upsert, del, list int) { + s.mu.Lock() + defer s.mu.Unlock() + return s.upsertCalls, s.deleteCalls, s.listCalls +} + +var _ = Describe("SyncedMap", func() { + ctx := context.Background() + + Describe("cross-replica delta propagation", func() { + var ( + bus *testutil.FakeBus + a, b *syncstate.SyncedMap[string, *job] + ) + + BeforeEach(func() { + bus = testutil.NewFakeBus() + a = syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus}) + b = syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus}) + Expect(a.Start(ctx)).To(Succeed()) + Expect(b.Start(ctx)).To(Succeed()) + }) + + AfterEach(func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }) + + It("propagates a Set on A to B", func() { + Expect(a.Set(ctx, &job{ID: "1", Status: "running"})).To(Succeed()) + + got, ok := b.Get("1") + Expect(ok).To(BeTrue(), "replica B should see the value A just set") + Expect(got.Status).To(Equal("running")) + }) + + It("prunes a Delete on A from B", func() { + Expect(a.Set(ctx, &job{ID: "1", Status: "running"})).To(Succeed()) + _, present := b.Get("1") + Expect(present).To(BeTrue(), "precondition: B must have the value before the delete") + + Expect(a.Delete(ctx, "1")).To(Succeed()) + + _, ok := b.Get("1") + Expect(ok).To(BeFalse(), "a delete on A must remove the key from B") + }) + }) + + Describe("hydration", func() { + It("hydrates on Start from a preloaded Store", func() { + store := newFakeStore(&job{ID: "x", Status: "done"}) + m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Store: store}) + Expect(m.Start(ctx)).To(Succeed()) + + got, ok := m.Get("x") + Expect(ok).To(BeTrue(), "Start must populate the map from the Store") + Expect(got.Status).To(Equal("done")) + }) + + It("uses the Loader when Store is nil", func() { + m := syncstate.New(syncstate.Config[string, *job]{ + Name: stateName, + Key: jobKey, + Loader: func(_ context.Context) ([]*job, error) { + return []*job{{ID: "l", Status: "loaded"}}, nil + }, + }) + Expect(m.Start(ctx)).To(Succeed()) + + got, ok := m.Get("l") + Expect(ok).To(BeTrue(), "Loader output must hydrate the map when there is no Store") + Expect(got.Status).To(Equal("loaded")) + }) + }) + + Describe("echo-loop guard", func() { + It("applies its own broadcast once and does not re-publish", func() { + bus := testutil.NewFakeBus() + a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus}) + b := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus}) + Expect(a.Start(ctx)).To(Succeed()) + Expect(b.Start(ctx)).To(Succeed()) + defer func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }() + + Expect(a.Set(ctx, &job{ID: "e", Status: "running"})).To(Succeed()) + + // One local write must produce exactly one broadcast: A and B both + // receive it and apply to memory, but the apply path never re-publishes. + Expect(bus.PublishCount(deltaSubject())).To(Equal(1), + "the apply path must not re-broadcast, otherwise replicas storm") + Expect(a.List()).To(HaveLen(1), "A must not double-store its own echo") + _, ok := b.Get("e") + Expect(ok).To(BeTrue()) + }) + }) + + Describe("Store write-through vs apply", func() { + It("writes the Store on local Set/Delete but not on an applied delta", func() { + bus := testutil.NewFakeBus() + storeA := newFakeStore() + storeB := newFakeStore() + a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: storeA}) + b := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: storeB}) + Expect(a.Start(ctx)).To(Succeed()) + Expect(b.Start(ctx)).To(Succeed()) + defer func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }() + + Expect(a.Set(ctx, &job{ID: "w", Status: "running"})).To(Succeed()) + + upA, _, _ := storeA.counts() + upB, _, _ := storeB.counts() + Expect(upA).To(Equal(1), "local Set must write through to its own Store") + Expect(upB).To(Equal(0), "the apply path must never write the peer's Store") + + Expect(a.Delete(ctx, "w")).To(Succeed()) + _, delA, _ := storeA.counts() + _, delB, _ := storeB.counts() + Expect(delA).To(Equal(1), "local Delete must delete from its own Store") + Expect(delB).To(Equal(0), "the apply path must never delete from the peer's Store") + }) + }) + + Describe("OnApply hook", func() { + It("fires with the correct op and key on an applied delta", func() { + bus := testutil.NewFakeBus() + var ( + mu sync.Mutex + ops []string + keys []string + ) + a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus}) + b := syncstate.New(syncstate.Config[string, *job]{ + Name: stateName, Key: jobKey, Nats: bus, + OnApply: func(op string, k string, _ *job) { + mu.Lock() + ops = append(ops, op) + keys = append(keys, k) + mu.Unlock() + }, + }) + Expect(a.Start(ctx)).To(Succeed()) + Expect(b.Start(ctx)).To(Succeed()) + defer func() { + Expect(a.Close()).To(Succeed()) + Expect(b.Close()).To(Succeed()) + }() + + Expect(a.Set(ctx, &job{ID: "o", Status: "running"})).To(Succeed()) + Expect(a.Delete(ctx, "o")).To(Succeed()) + + mu.Lock() + defer mu.Unlock() + Expect(ops).To(Equal([]string{"set", "delete"})) + Expect(keys).To(Equal([]string{"o", "o"})) + }) + }) + + Describe("standalone (nil Nats)", func() { + It("works in-memory with no panic and nothing to broadcast", func() { + m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey}) + Expect(m.Start(ctx)).To(Succeed()) + defer func() { Expect(m.Close()).To(Succeed()) }() + + Expect(func() { + Expect(m.Set(ctx, &job{ID: "s", Status: "running"})).To(Succeed()) + }).ToNot(Panic()) + + got, ok := m.Get("s") + Expect(ok).To(BeTrue()) + Expect(got.Status).To(Equal("running")) + Expect(m.List()).To(HaveLen(1)) + Expect(m.Snapshot()).To(HaveKey("s")) + + Expect(m.Delete(ctx, "s")).To(Succeed()) + _, ok = m.Get("s") + Expect(ok).To(BeFalse()) + }) + }) + + Describe("reconnect re-hydrate", func() { + It("re-reads the source when the messaging client reconnects", func() { + bus := testutil.NewFakeBus() + store := newFakeStore(&job{ID: "init", Status: "running"}) + m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: store}) + Expect(m.Start(ctx)).To(Succeed()) + defer func() { Expect(m.Close()).To(Succeed()) }() + + _, ok := m.Get("init") + Expect(ok).To(BeTrue()) + + // A peer writes to the shared DB while we are unaware (no delta seen). + store.add(&job{ID: "late", Status: "running"}) + _, ok = m.Get("late") + Expect(ok).To(BeFalse(), "the new row should not appear before a re-hydrate") + + bus.TriggerReconnect() + + _, ok = m.Get("late") + Expect(ok).To(BeTrue(), "reconnect must re-hydrate from the source and pick up drift") + _, _, list := store.counts() + Expect(list).To(Equal(2), "exactly one Start hydrate plus one reconnect re-hydrate") + }) + }) +}) diff --git a/core/services/testutil/fakebus.go b/core/services/testutil/fakebus.go new file mode 100644 index 000000000..7452d810f --- /dev/null +++ b/core/services/testutil/fakebus.go @@ -0,0 +1,160 @@ +package testutil + +import ( + "encoding/json" + "strings" + "sync" + "time" + + "github.com/mudler/LocalAI/core/services/messaging" +) + +// FakeBus is an in-memory messaging.MessagingClient that delivers each published +// message synchronously to every registered subscriber whose subject filter +// matches, including NATS-style wildcard subjects (`*` matches exactly one +// token). +// +// Synchronous delivery keeps specs deterministic: the moment Publish returns, +// every matching subscriber's handler has already run, so the spec body can read +// the resulting state without polling. It is the shared test double for every +// cross-replica-sync adopter (gallery, syncstate, ...) so they exercise the same +// delivery semantics. It deliberately depends only on the standard library and +// the messaging package — no test framework — so it is importable anywhere. +type FakeBus struct { + mu sync.Mutex + subs []fakeBusSub + // publishCounts records how many messages were published per subject, so a + // spec can assert the echo-loop guard (an applied delta must not re-publish). + publishCounts map[string]int + + // reconnectCbs back the optional OnReconnect/TriggerReconnect pair, letting a + // spec exercise the component's reconnect re-hydrate path without a real + // NATS server. + reconnectCbs []func() +} + +type fakeBusSub struct { + subject string + handler func([]byte) +} + +// NewFakeBus returns a ready-to-use in-memory bus. +func NewFakeBus() *FakeBus { + return &FakeBus{publishCounts: map[string]int{}} +} + +// subjectMatches reports whether a subscription filter matches a concrete +// subject, honoring the single-token `*` wildcard used by NATS. +func subjectMatches(filter, subject string) bool { + if filter == subject { + return true + } + fp := strings.Split(filter, ".") + sp := strings.Split(subject, ".") + if len(fp) != len(sp) { + return false + } + for i := range fp { + if fp[i] == "*" { + continue + } + if fp[i] != sp[i] { + return false + } + } + return true +} + +// Publish marshals data as JSON and delivers it synchronously to every matching +// subscriber. +func (b *FakeBus) Publish(subject string, data any) error { + payload, err := json.Marshal(data) + if err != nil { + return err + } + b.mu.Lock() + b.publishCounts[subject]++ + subs := append([]fakeBusSub(nil), b.subs...) + b.mu.Unlock() + for _, s := range subs { + if subjectMatches(s.subject, subject) { + s.handler(payload) + } + } + return nil +} + +// PublishCount returns how many messages were published on the exact subject. +func (b *FakeBus) PublishCount(subject string) int { + b.mu.Lock() + defer b.mu.Unlock() + return b.publishCounts[subject] +} + +type fakeBusSubscription struct { + bus *FakeBus + subRef fakeBusSub +} + +func (s *fakeBusSubscription) Unsubscribe() error { + s.bus.mu.Lock() + defer s.bus.mu.Unlock() + for i, candidate := range s.bus.subs { + if candidate.subject == s.subRef.subject { + s.bus.subs = append(s.bus.subs[:i], s.bus.subs[i+1:]...) + return nil + } + } + return nil +} + +func (b *FakeBus) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) { + sub := fakeBusSub{subject: subject, handler: handler} + b.mu.Lock() + b.subs = append(b.subs, sub) + b.mu.Unlock() + return &fakeBusSubscription{bus: b, subRef: sub}, nil +} + +func (b *FakeBus) QueueSubscribe(subject, _ string, handler func([]byte)) (messaging.Subscription, error) { + return b.Subscribe(subject, handler) +} + +func (b *FakeBus) QueueSubscribeReply(string, string, func([]byte, func([]byte))) (messaging.Subscription, error) { + return &fakeBusSubscription{bus: b}, nil +} + +func (b *FakeBus) SubscribeReply(string, func([]byte, func([]byte))) (messaging.Subscription, error) { + return &fakeBusSubscription{bus: b}, nil +} + +func (b *FakeBus) Request(string, []byte, time.Duration) ([]byte, error) { + return nil, nil +} + +func (b *FakeBus) IsConnected() bool { return true } +func (b *FakeBus) Close() {} + +// OnReconnect mirrors *messaging.Client.OnReconnect so a spec can drive the +// component's reconnect re-hydrate path. The component detects this method via an +// optional interface assertion; implementing it here keeps the fake a faithful +// stand-in for the concrete client. +func (b *FakeBus) OnReconnect(cb func()) { + if cb == nil { + return + } + b.mu.Lock() + b.reconnectCbs = append(b.reconnectCbs, cb) + b.mu.Unlock() +} + +// TriggerReconnect runs every registered reconnect callback, simulating a NATS +// reconnect event. +func (b *FakeBus) TriggerReconnect() { + b.mu.Lock() + cbs := append([]func(){}, b.reconnectCbs...) + b.mu.Unlock() + for _, cb := range cbs { + cb() + } +}