feat(distributed): add SyncedMap cross-replica in-memory state component

Introduce core/services/syncstate.SyncedMap[K,V]: a thread-safe in-memory map
that keeps itself consistent across frontend replicas via NATS, with an optional
pluggable durable Store and hydrate-from-source convergence.

Several features keep process-local state surfaced to the API (finetune/quant
jobs, agent tasks, model configs) and each hand-wired the same in-memory + NATS
broadcast + read-through-store legs - or forgot to, reintroducing cross-replica
staleness. SyncedMap makes that consistency a configuration choice:

- local writes mutate the map, write through the Store, then broadcast a delta;
- the apply path is memory-only and never re-publishes or re-writes the Store
  (structural echo-loop guard, mirroring galleryop.mergeStatus);
- on Start and on NATS reconnect the map re-hydrates from the source (Store, else
  Loader); an optional periodic Reconcile repairs silent drift;
- standalone mode (nil NATS client) is a strict in-memory no-op.

Reconnect re-hydrate is wired via a new *messaging.Client.OnReconnect callback,
consumed through an optional type-assertion so MessagingClient stays minimal.
Adds messaging.SubjectSyncStateDelta and a reusable testutil.FakeBus (synchronous
in-process MessagingClient with wildcard matching) for adopter tests.

Component only; service migrations follow in subsequent commits.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]
This commit is contained in:
Ettore Di Giacinto
2026-06-26 23:49:41 +00:00
parent 64150ca7ab
commit e4e3fde68b
6 changed files with 804 additions and 1 deletions

View File

@@ -22,6 +22,14 @@ const subscribeConfirmTimeout = 5 * time.Second
type Client struct {
conn *nats.Conn
mu sync.RWMutex
// reconnectCbs are invoked after the underlying connection is
// re-established. nats.go transparently resubscribes existing
// subscriptions on reconnect, but it cannot know that a consumer kept
// derived in-memory state (e.g. syncstate.SyncedMap) that may have drifted
// while the link was down — these callbacks let such consumers re-hydrate.
cbMu sync.Mutex
reconnectCbs []func()
}
// New creates a new NATS client with auto-reconnect.
@@ -31,6 +39,10 @@ func New(url string, opts ...Option) (*Client, error) {
o(&cfg)
}
// Allocate the client up front so the reconnect handler closure can reach
// it; conn is populated after nats.Connect succeeds below.
c := &Client{}
natsOpts := []nats.Option{
nats.RetryOnFailedConnect(true),
nats.MaxReconnects(-1),
@@ -41,6 +53,7 @@ func New(url string, opts ...Option) (*Client, error) {
}),
nats.ReconnectHandler(func(_ *nats.Conn) {
xlog.Info("NATS reconnected")
c.runReconnectCallbacks()
}),
nats.ClosedHandler(func(_ *nats.Conn) {
xlog.Info("NATS connection closed")
@@ -103,7 +116,33 @@ func New(url string, opts ...Option) (*Client, error) {
return nil, fmt.Errorf("connecting to NATS at %s: %w", sanitize.URL(url), err)
}
return &Client{conn: nc}, nil
c.conn = nc
return c, nil
}
// OnReconnect registers a callback invoked after the NATS connection is
// re-established. It is consumed via an optional interface type-assertion
// (interface{ OnReconnect(func()) }) rather than being added to MessagingClient,
// so the messaging abstraction stays minimal and standalone/test clients are not
// forced to implement reconnect semantics. A nil callback is ignored.
func (c *Client) OnReconnect(cb func()) {
if cb == nil {
return
}
c.cbMu.Lock()
c.reconnectCbs = append(c.reconnectCbs, cb)
c.cbMu.Unlock()
}
// runReconnectCallbacks invokes registered reconnect callbacks. It copies the
// slice under the lock so a callback that (re)registers cannot deadlock.
func (c *Client) runReconnectCallbacks() {
c.cbMu.Lock()
cbs := append([]func(){}, c.reconnectCbs...)
c.cbMu.Unlock()
for _, cb := range cbs {
cb()
}
}
// Publish marshals data as JSON and publishes it to the given subject.

View File

@@ -380,6 +380,20 @@ func SubjectCacheInvalidateCollection(name string) string {
return "cache.invalidate.collections." + sanitizeSubjectToken(name)
}
// SyncedMap State Sync (Pub/Sub — broadcast to all frontends)
//
// The reusable syncstate.SyncedMap component publishes a {op,key,value} delta on
// this subject whenever a replica mutates a piece of cross-replica in-memory
// state. Peers subscribe and apply the delta to their own map, so a round-robin
// API request that lands on a replica which did not originate the change still
// sees it. Convergence on (re)connect is done by re-hydrating from the durable
// source, so no request/reply snapshot subject is needed here.
func SubjectSyncStateDelta(name string) string {
return subjectSyncStatePrefix + sanitizeSubjectToken(name) + ".delta"
}
const subjectSyncStatePrefix = "state."
// Prefix-Cache Routing Sync (Pub/Sub - broadcast to all frontends)
//
// Frontends share prefix-cache observations so a request routed to any replica

View File

@@ -0,0 +1,286 @@
// Package syncstate provides SyncedMap, a reusable cross-replica in-memory map.
//
// LocalAI in distributed mode runs multiple frontend replicas behind a
// round-robin load balancer. Several features keep process-local in-memory state
// that is surfaced to the HTTP/UI API; without cross-replica sync a poll that
// lands on a replica which did not originate a change sees stale or missing data.
// SyncedMap collapses the three legs each feature otherwise hand-wires - an
// in-memory map, a NATS broadcast/apply path, and optional durable read-through -
// into one well-tested component so cross-replica consistency is a configuration
// choice rather than a bespoke re-implementation.
package syncstate
import (
"context"
"sync"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/xlog"
)
// Op values carried on the wire and passed to OnApply.
const (
opSet = "set"
opDelete = "delete"
)
// Store is optional durable backing for a SyncedMap. In distributed mode it is a
// single shared DB, so the apply path (a delta received from a peer) updates
// memory only and never re-writes the Store.
type Store[K comparable, V any] interface {
List(ctx context.Context) ([]V, error)
Upsert(ctx context.Context, v V) error
Delete(ctx context.Context, k K) error
}
// Config configures a SyncedMap.
type Config[K comparable, V any] struct {
Name string // subject namespace, e.g. "finetune.jobs"
Key func(V) K // extract the key from a value
Nats messaging.MessagingClient // nil => standalone: in-memory only, no broadcast/subscribe
Store Store[K, V] // optional read-through persistence
Loader func(ctx context.Context) ([]V, error) // source when there is no Store (e.g. disk reload)
OnApply func(op string, k K, v V) // optional hook after an applied change (e.g. ShutdownModel)
Reconcile time.Duration // optional periodic re-hydrate; 0 = off
}
// delta is the JSON wire envelope broadcast on every local mutation. Value is
// omitempty so a delete carries only op+key.
type delta[K comparable, V any] struct {
Op string `json:"op"`
Key K `json:"key"`
Value V `json:"value,omitempty"`
}
// SyncedMap is a cross-replica in-memory map. A local write (Set/Delete) updates
// memory, the optional durable Store, then broadcasts a delta to peers. A peer's
// delta updates memory only and fires OnApply - it never re-broadcasts and never
// writes the Store. That structural split is the echo-loop guard (same pattern as
// galleryop.mergeStatus / OpCache.applyStart): receiving your own broadcast just
// re-applies an idempotent value to memory, so there is no storm and no
// double-write.
type SyncedMap[K comparable, V any] struct {
cfg Config[K, V]
mu sync.RWMutex
data map[K]V
sub Subscription
// lifeCtx outlives Start's argument: a reconnect callback or reconcile tick
// can fire long after Start returns, so they must not be tied to a ctx the
// caller may cancel. Close cancels it.
lifeCtx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}
// Subscription is the subset of messaging.Subscription the component holds onto.
type Subscription = messaging.Subscription
// New constructs a SyncedMap. Call Start to hydrate and begin syncing.
func New[K comparable, V any](cfg Config[K, V]) *SyncedMap[K, V] {
return &SyncedMap[K, V]{cfg: cfg, data: make(map[K]V)}
}
func (m *SyncedMap[K, V]) subject() string {
return messaging.SubjectSyncStateDelta(m.cfg.Name)
}
// Start hydrates from the source, subscribes for peer deltas, registers a
// reconnect re-hydrate (when the client supports it), and starts the optional
// reconcile ticker.
func (m *SyncedMap[K, V]) Start(ctx context.Context) error {
if err := m.hydrate(ctx); err != nil {
return err
}
m.lifeCtx, m.cancel = context.WithCancel(context.Background())
if m.cfg.Nats != nil {
sub, err := messaging.SubscribeJSON(m.cfg.Nats, m.subject(), m.apply)
if err != nil {
return err
}
m.sub = sub
// nats.go transparently resubscribes on reconnect, but it cannot know we
// kept derived in-memory state that may have drifted while the link was
// down, so re-hydrate from the durable source. Detected via an optional
// interface so MessagingClient itself stays minimal; standalone/test
// clients without the method simply fall back to the reconcile ticker.
if r, ok := m.cfg.Nats.(interface{ OnReconnect(func()) }); ok {
r.OnReconnect(func() {
if err := m.hydrate(m.lifeCtx); err != nil {
xlog.Warn("syncstate: reconnect re-hydrate failed", "name", m.cfg.Name, "error", err)
}
})
}
}
if m.cfg.Reconcile > 0 {
m.wg.Add(1)
go m.reconcileLoop()
}
return nil
}
// Close unsubscribes and stops the reconcile ticker.
func (m *SyncedMap[K, V]) Close() error {
if m.cancel != nil {
m.cancel()
}
m.wg.Wait()
if m.sub != nil {
return m.sub.Unsubscribe()
}
return nil
}
// Set updates the value locally, writes through the Store, then broadcasts.
// Per the data-flow contract the Store write happens under the lock so memory and
// durable state move together; the broadcast is best-effort after unlocking.
func (m *SyncedMap[K, V]) Set(ctx context.Context, v V) error {
k := m.cfg.Key(v)
m.mu.Lock()
m.data[k] = v
if m.cfg.Store != nil {
if err := m.cfg.Store.Upsert(ctx, v); err != nil {
m.mu.Unlock()
return err
}
}
m.mu.Unlock()
m.publish(opSet, k, v)
return nil
}
// Delete removes the key locally, deletes it from the Store, then broadcasts.
func (m *SyncedMap[K, V]) Delete(ctx context.Context, k K) error {
m.mu.Lock()
delete(m.data, k)
if m.cfg.Store != nil {
if err := m.cfg.Store.Delete(ctx, k); err != nil {
m.mu.Unlock()
return err
}
}
m.mu.Unlock()
var zero V
m.publish(opDelete, k, zero)
return nil
}
// Get returns the value for k and whether it was present.
func (m *SyncedMap[K, V]) Get(k K) (V, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
v, ok := m.data[k]
return v, ok
}
// List returns a snapshot slice of all values.
func (m *SyncedMap[K, V]) List() []V {
m.mu.RLock()
defer m.mu.RUnlock()
out := make([]V, 0, len(m.data))
for _, v := range m.data {
out = append(out, v)
}
return out
}
// Snapshot returns a copy of the underlying map.
func (m *SyncedMap[K, V]) Snapshot() map[K]V {
m.mu.RLock()
defer m.mu.RUnlock()
out := make(map[K]V, len(m.data))
for k, v := range m.data {
out[k] = v
}
return out
}
// publish broadcasts a delta. Standalone (nil Nats) is a strict no-op.
func (m *SyncedMap[K, V]) publish(op string, k K, v V) {
if m.cfg.Nats == nil {
return
}
if err := m.cfg.Nats.Publish(m.subject(), delta[K, V]{Op: op, Key: k, Value: v}); err != nil {
xlog.Warn("syncstate: failed to broadcast delta", "name", m.cfg.Name, "op", op, "error", err)
}
}
// apply handles a peer's delta: memory-only update plus OnApply. It deliberately
// never writes the Store nor re-publishes - that is the echo-loop guard.
func (m *SyncedMap[K, V]) apply(d delta[K, V]) {
switch d.Op {
case opSet:
m.mu.Lock()
m.data[d.Key] = d.Value
m.mu.Unlock()
case opDelete:
m.mu.Lock()
delete(m.data, d.Key)
m.mu.Unlock()
default:
xlog.Warn("syncstate: ignoring delta with unknown op", "name", m.cfg.Name, "op", d.Op)
return
}
if m.cfg.OnApply != nil {
m.cfg.OnApply(d.Op, d.Key, d.Value)
}
}
// hydrate replaces the whole map from the durable source: Store if present, else
// Loader. With neither, a late joiner starts empty and catches up via deltas
// (acceptable only for ephemeral state).
func (m *SyncedMap[K, V]) hydrate(ctx context.Context) error {
var (
vals []V
err error
)
switch {
case m.cfg.Store != nil:
vals, err = m.cfg.Store.List(ctx)
case m.cfg.Loader != nil:
vals, err = m.cfg.Loader(ctx)
default:
return nil
}
if err != nil {
return err
}
m.replaceAll(vals)
return nil
}
// replaceAll atomically swaps the map contents for the given values, keyed via
// cfg.Key.
func (m *SyncedMap[K, V]) replaceAll(vals []V) {
next := make(map[K]V, len(vals))
for _, v := range vals {
next[m.cfg.Key(v)] = v
}
m.mu.Lock()
m.data = next
m.mu.Unlock()
}
// reconcileLoop periodically re-hydrates to repair silent drift (missed deltas).
func (m *SyncedMap[K, V]) reconcileLoop() {
defer m.wg.Done()
t := time.NewTicker(m.cfg.Reconcile)
defer t.Stop()
for {
select {
case <-m.lifeCtx.Done():
return
case <-t.C:
if err := m.hydrate(m.lifeCtx); err != nil {
xlog.Warn("syncstate: reconcile re-hydrate failed", "name", m.cfg.Name, "error", err)
}
}
}
}

View File

@@ -0,0 +1,13 @@
package syncstate_test
import (
"testing"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
func TestSyncstate(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Syncstate Suite")
}

View File

@@ -0,0 +1,291 @@
package syncstate_test
import (
"context"
"sync"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"github.com/mudler/LocalAI/core/services/messaging"
"github.com/mudler/LocalAI/core/services/syncstate"
"github.com/mudler/LocalAI/core/services/testutil"
)
// job is a minimal JSON-serializable value stand-in for the real cross-replica
// records (finetune/quant/agent jobs) the component is built for.
type job struct {
ID string `json:"id"`
Status string `json:"status"`
}
func jobKey(j *job) string { return j.ID }
const stateName = "test.jobs"
func deltaSubject() string { return messaging.SubjectSyncStateDelta(stateName) }
// fakeStore is an in-memory Store that records call counts so specs can assert
// the write-through-vs-apply split (local writes hit the Store; applied deltas
// must not).
type fakeStore struct {
mu sync.Mutex
data map[string]*job
upsertCalls int
deleteCalls int
listCalls int
}
func newFakeStore(seed ...*job) *fakeStore {
s := &fakeStore{data: map[string]*job{}}
for _, j := range seed {
s.data[j.ID] = j
}
return s
}
func (s *fakeStore) List(_ context.Context) ([]*job, error) {
s.mu.Lock()
defer s.mu.Unlock()
s.listCalls++
out := make([]*job, 0, len(s.data))
for _, j := range s.data {
out = append(out, j)
}
return out, nil
}
func (s *fakeStore) Upsert(_ context.Context, j *job) error {
s.mu.Lock()
defer s.mu.Unlock()
s.upsertCalls++
s.data[j.ID] = j
return nil
}
func (s *fakeStore) Delete(_ context.Context, k string) error {
s.mu.Lock()
defer s.mu.Unlock()
s.deleteCalls++
delete(s.data, k)
return nil
}
// add simulates a peer replica writing to the shared DB out-of-band (e.g. while
// this replica was partitioned), so a re-hydrate can be observed to pick it up.
func (s *fakeStore) add(j *job) {
s.mu.Lock()
defer s.mu.Unlock()
s.data[j.ID] = j
}
func (s *fakeStore) counts() (upsert, del, list int) {
s.mu.Lock()
defer s.mu.Unlock()
return s.upsertCalls, s.deleteCalls, s.listCalls
}
var _ = Describe("SyncedMap", func() {
ctx := context.Background()
Describe("cross-replica delta propagation", func() {
var (
bus *testutil.FakeBus
a, b *syncstate.SyncedMap[string, *job]
)
BeforeEach(func() {
bus = testutil.NewFakeBus()
a = syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus})
b = syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus})
Expect(a.Start(ctx)).To(Succeed())
Expect(b.Start(ctx)).To(Succeed())
})
AfterEach(func() {
Expect(a.Close()).To(Succeed())
Expect(b.Close()).To(Succeed())
})
It("propagates a Set on A to B", func() {
Expect(a.Set(ctx, &job{ID: "1", Status: "running"})).To(Succeed())
got, ok := b.Get("1")
Expect(ok).To(BeTrue(), "replica B should see the value A just set")
Expect(got.Status).To(Equal("running"))
})
It("prunes a Delete on A from B", func() {
Expect(a.Set(ctx, &job{ID: "1", Status: "running"})).To(Succeed())
_, present := b.Get("1")
Expect(present).To(BeTrue(), "precondition: B must have the value before the delete")
Expect(a.Delete(ctx, "1")).To(Succeed())
_, ok := b.Get("1")
Expect(ok).To(BeFalse(), "a delete on A must remove the key from B")
})
})
Describe("hydration", func() {
It("hydrates on Start from a preloaded Store", func() {
store := newFakeStore(&job{ID: "x", Status: "done"})
m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Store: store})
Expect(m.Start(ctx)).To(Succeed())
got, ok := m.Get("x")
Expect(ok).To(BeTrue(), "Start must populate the map from the Store")
Expect(got.Status).To(Equal("done"))
})
It("uses the Loader when Store is nil", func() {
m := syncstate.New(syncstate.Config[string, *job]{
Name: stateName,
Key: jobKey,
Loader: func(_ context.Context) ([]*job, error) {
return []*job{{ID: "l", Status: "loaded"}}, nil
},
})
Expect(m.Start(ctx)).To(Succeed())
got, ok := m.Get("l")
Expect(ok).To(BeTrue(), "Loader output must hydrate the map when there is no Store")
Expect(got.Status).To(Equal("loaded"))
})
})
Describe("echo-loop guard", func() {
It("applies its own broadcast once and does not re-publish", func() {
bus := testutil.NewFakeBus()
a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus})
b := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus})
Expect(a.Start(ctx)).To(Succeed())
Expect(b.Start(ctx)).To(Succeed())
defer func() {
Expect(a.Close()).To(Succeed())
Expect(b.Close()).To(Succeed())
}()
Expect(a.Set(ctx, &job{ID: "e", Status: "running"})).To(Succeed())
// One local write must produce exactly one broadcast: A and B both
// receive it and apply to memory, but the apply path never re-publishes.
Expect(bus.PublishCount(deltaSubject())).To(Equal(1),
"the apply path must not re-broadcast, otherwise replicas storm")
Expect(a.List()).To(HaveLen(1), "A must not double-store its own echo")
_, ok := b.Get("e")
Expect(ok).To(BeTrue())
})
})
Describe("Store write-through vs apply", func() {
It("writes the Store on local Set/Delete but not on an applied delta", func() {
bus := testutil.NewFakeBus()
storeA := newFakeStore()
storeB := newFakeStore()
a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: storeA})
b := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: storeB})
Expect(a.Start(ctx)).To(Succeed())
Expect(b.Start(ctx)).To(Succeed())
defer func() {
Expect(a.Close()).To(Succeed())
Expect(b.Close()).To(Succeed())
}()
Expect(a.Set(ctx, &job{ID: "w", Status: "running"})).To(Succeed())
upA, _, _ := storeA.counts()
upB, _, _ := storeB.counts()
Expect(upA).To(Equal(1), "local Set must write through to its own Store")
Expect(upB).To(Equal(0), "the apply path must never write the peer's Store")
Expect(a.Delete(ctx, "w")).To(Succeed())
_, delA, _ := storeA.counts()
_, delB, _ := storeB.counts()
Expect(delA).To(Equal(1), "local Delete must delete from its own Store")
Expect(delB).To(Equal(0), "the apply path must never delete from the peer's Store")
})
})
Describe("OnApply hook", func() {
It("fires with the correct op and key on an applied delta", func() {
bus := testutil.NewFakeBus()
var (
mu sync.Mutex
ops []string
keys []string
)
a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus})
b := syncstate.New(syncstate.Config[string, *job]{
Name: stateName, Key: jobKey, Nats: bus,
OnApply: func(op string, k string, _ *job) {
mu.Lock()
ops = append(ops, op)
keys = append(keys, k)
mu.Unlock()
},
})
Expect(a.Start(ctx)).To(Succeed())
Expect(b.Start(ctx)).To(Succeed())
defer func() {
Expect(a.Close()).To(Succeed())
Expect(b.Close()).To(Succeed())
}()
Expect(a.Set(ctx, &job{ID: "o", Status: "running"})).To(Succeed())
Expect(a.Delete(ctx, "o")).To(Succeed())
mu.Lock()
defer mu.Unlock()
Expect(ops).To(Equal([]string{"set", "delete"}))
Expect(keys).To(Equal([]string{"o", "o"}))
})
})
Describe("standalone (nil Nats)", func() {
It("works in-memory with no panic and nothing to broadcast", func() {
m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey})
Expect(m.Start(ctx)).To(Succeed())
defer func() { Expect(m.Close()).To(Succeed()) }()
Expect(func() {
Expect(m.Set(ctx, &job{ID: "s", Status: "running"})).To(Succeed())
}).ToNot(Panic())
got, ok := m.Get("s")
Expect(ok).To(BeTrue())
Expect(got.Status).To(Equal("running"))
Expect(m.List()).To(HaveLen(1))
Expect(m.Snapshot()).To(HaveKey("s"))
Expect(m.Delete(ctx, "s")).To(Succeed())
_, ok = m.Get("s")
Expect(ok).To(BeFalse())
})
})
Describe("reconnect re-hydrate", func() {
It("re-reads the source when the messaging client reconnects", func() {
bus := testutil.NewFakeBus()
store := newFakeStore(&job{ID: "init", Status: "running"})
m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: store})
Expect(m.Start(ctx)).To(Succeed())
defer func() { Expect(m.Close()).To(Succeed()) }()
_, ok := m.Get("init")
Expect(ok).To(BeTrue())
// A peer writes to the shared DB while we are unaware (no delta seen).
store.add(&job{ID: "late", Status: "running"})
_, ok = m.Get("late")
Expect(ok).To(BeFalse(), "the new row should not appear before a re-hydrate")
bus.TriggerReconnect()
_, ok = m.Get("late")
Expect(ok).To(BeTrue(), "reconnect must re-hydrate from the source and pick up drift")
_, _, list := store.counts()
Expect(list).To(Equal(2), "exactly one Start hydrate plus one reconnect re-hydrate")
})
})
})

View File

@@ -0,0 +1,160 @@
package testutil
import (
"encoding/json"
"strings"
"sync"
"time"
"github.com/mudler/LocalAI/core/services/messaging"
)
// FakeBus is an in-memory messaging.MessagingClient that delivers each published
// message synchronously to every registered subscriber whose subject filter
// matches, including NATS-style wildcard subjects (`*` matches exactly one
// token).
//
// Synchronous delivery keeps specs deterministic: the moment Publish returns,
// every matching subscriber's handler has already run, so the spec body can read
// the resulting state without polling. It is the shared test double for every
// cross-replica-sync adopter (gallery, syncstate, ...) so they exercise the same
// delivery semantics. It deliberately depends only on the standard library and
// the messaging package — no test framework — so it is importable anywhere.
type FakeBus struct {
mu sync.Mutex
subs []fakeBusSub
// publishCounts records how many messages were published per subject, so a
// spec can assert the echo-loop guard (an applied delta must not re-publish).
publishCounts map[string]int
// reconnectCbs back the optional OnReconnect/TriggerReconnect pair, letting a
// spec exercise the component's reconnect re-hydrate path without a real
// NATS server.
reconnectCbs []func()
}
type fakeBusSub struct {
subject string
handler func([]byte)
}
// NewFakeBus returns a ready-to-use in-memory bus.
func NewFakeBus() *FakeBus {
return &FakeBus{publishCounts: map[string]int{}}
}
// subjectMatches reports whether a subscription filter matches a concrete
// subject, honoring the single-token `*` wildcard used by NATS.
func subjectMatches(filter, subject string) bool {
if filter == subject {
return true
}
fp := strings.Split(filter, ".")
sp := strings.Split(subject, ".")
if len(fp) != len(sp) {
return false
}
for i := range fp {
if fp[i] == "*" {
continue
}
if fp[i] != sp[i] {
return false
}
}
return true
}
// Publish marshals data as JSON and delivers it synchronously to every matching
// subscriber.
func (b *FakeBus) Publish(subject string, data any) error {
payload, err := json.Marshal(data)
if err != nil {
return err
}
b.mu.Lock()
b.publishCounts[subject]++
subs := append([]fakeBusSub(nil), b.subs...)
b.mu.Unlock()
for _, s := range subs {
if subjectMatches(s.subject, subject) {
s.handler(payload)
}
}
return nil
}
// PublishCount returns how many messages were published on the exact subject.
func (b *FakeBus) PublishCount(subject string) int {
b.mu.Lock()
defer b.mu.Unlock()
return b.publishCounts[subject]
}
type fakeBusSubscription struct {
bus *FakeBus
subRef fakeBusSub
}
func (s *fakeBusSubscription) Unsubscribe() error {
s.bus.mu.Lock()
defer s.bus.mu.Unlock()
for i, candidate := range s.bus.subs {
if candidate.subject == s.subRef.subject {
s.bus.subs = append(s.bus.subs[:i], s.bus.subs[i+1:]...)
return nil
}
}
return nil
}
func (b *FakeBus) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) {
sub := fakeBusSub{subject: subject, handler: handler}
b.mu.Lock()
b.subs = append(b.subs, sub)
b.mu.Unlock()
return &fakeBusSubscription{bus: b, subRef: sub}, nil
}
func (b *FakeBus) QueueSubscribe(subject, _ string, handler func([]byte)) (messaging.Subscription, error) {
return b.Subscribe(subject, handler)
}
func (b *FakeBus) QueueSubscribeReply(string, string, func([]byte, func([]byte))) (messaging.Subscription, error) {
return &fakeBusSubscription{bus: b}, nil
}
func (b *FakeBus) SubscribeReply(string, func([]byte, func([]byte))) (messaging.Subscription, error) {
return &fakeBusSubscription{bus: b}, nil
}
func (b *FakeBus) Request(string, []byte, time.Duration) ([]byte, error) {
return nil, nil
}
func (b *FakeBus) IsConnected() bool { return true }
func (b *FakeBus) Close() {}
// OnReconnect mirrors *messaging.Client.OnReconnect so a spec can drive the
// component's reconnect re-hydrate path. The component detects this method via an
// optional interface assertion; implementing it here keeps the fake a faithful
// stand-in for the concrete client.
func (b *FakeBus) OnReconnect(cb func()) {
if cb == nil {
return
}
b.mu.Lock()
b.reconnectCbs = append(b.reconnectCbs, cb)
b.mu.Unlock()
}
// TriggerReconnect runs every registered reconnect callback, simulating a NATS
// reconnect event.
func (b *FakeBus) TriggerReconnect() {
b.mu.Lock()
cbs := append([]func(){}, b.reconnectCbs...)
b.mu.Unlock()
for _, cb := range cbs {
cb()
}
}