mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-28 02:17:00 -04:00
* feat(distributed): add SyncedMap cross-replica in-memory state component

Introduce core/services/syncstate.SyncedMap[K,V]: a thread-safe in-memory map that keeps itself consistent across frontend replicas via NATS, with an optional pluggable durable Store and hydrate-from-source convergence.

Several features keep process-local state surfaced to the API (finetune/quant jobs, agent tasks, model configs) and each hand-wired the same in-memory + NATS broadcast + read-through-store legs - or forgot to, reintroducing cross-replica staleness. SyncedMap makes that consistency a configuration choice:

- local writes mutate the map, write through the Store, then broadcast a delta;
- the apply path is memory-only and never re-publishes or re-writes the Store (structural echo-loop guard, mirroring galleryop.mergeStatus);
- on Start and on NATS reconnect the map re-hydrates from the source (Store, else Loader); an optional periodic Reconcile repairs silent drift;
- standalone mode (nil NATS client) is a strict in-memory no-op.

Reconnect re-hydrate is wired via a new *messaging.Client.OnReconnect callback, consumed through an optional type-assertion so MessagingClient stays minimal.

Adds messaging.SubjectSyncStateDelta and a reusable testutil.FakeBus (synchronous in-process MessagingClient with wildcard matching) for adopter tests. Component only; service migrations follow in subsequent commits.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(finetune): back jobs with SyncedMap for cross-replica consistency

FineTuneService kept jobs in a process-local map and, although it wrote them to Postgres, ListJobs/GetJob never read the store back and the wired natsClient was never used - so in distributed mode a job created on one replica was invisible to the others. Replace the map and the dead client with a syncstate.SyncedMap keyed by job ID, value *schema.FineTuneJob (the exact REST shape, so responses are unchanged).

- Add a Store adapter (core/services/finetune/syncstore.go) over FineTuneStore, plus FineTuneStore.ListAll (global hydrate; per-user List kept) and an idempotent Upsert (create-or-update; Create alone fails on dup key).
- Writes go through SyncedMap.Set/Delete (write-through + broadcast); reads use List/Get. The on-disk state.json path becomes the standalone Loader, keeping single-node restart recovery (stale->stopped / exporting->failed fixups).
- Fold SetNATSClient/SetFineTuneStore into NewFineTuneService; app.go passes the distributed NATS client + store when distributed, nil otherwise.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(agentpool): back agent tasks with SyncedMap for cross-replica consistency

AgentJobService.ListTasks read the process-local tasks map only, while ListJobs already read through the DB persister + dispatcher NATS - so in distributed mode a task created on one replica was invisible to the others. Back tasks with a syncstate.SyncedMap keyed by task ID (value schema.Task, the exact REST shape); jobs are left untouched.

- Store adapter (task_syncstore.go) over the existing JobPersister (LoadTasks/SaveTask/DeleteTask); reads svc.persister/userID live so a persister swap needs no rebuild. No new persister methods required.
- Task reads -> SyncedMap.List/Get; create/update -> Set (write-through + broadcast); delete -> Delete. The file persister now owns its own task set so the write-through path does not re-enter the SyncedMap lock (deadlock guard).
- The distributed NATS client is not available at construction (start() precedes initDistributed), so it is injected via SetTaskSyncNATS, which rebuilds the still-empty map before Start/hydrate. Wired at the main, restart, and per-user (UserServicesManager) distributed sites.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(quantization): back jobs with SyncedMap + durable QuantStore

QuantizationService kept jobs in a process-local map persisted only to a local state.json, so in distributed mode jobs were neither visible across replicas nor durable cluster-wide. Back jobs with a syncstate.SyncedMap keyed by job ID (value *schema.QuantizationJob, the exact REST shape).

- New distributed.QuantStore (GORM, table quantization_jobs) mirroring FineTuneStore: Create/Get/ListAll/Upsert(idempotent)/Delete, registered for AutoMigrate via distributed.InitStores (Stores.Quant).
- New adapter (quantization/syncstore.go) over QuantStore implementing syncstate.Store, with record<->schema conversion.
- Reads go through List/Get, writes through Set/Delete (write-through + broadcast); state.json is kept as the standalone Loader for single-node restart recovery (stale-job fixups preserved).
- app.go passes the distributed NATS client + QuantStore when distributed, nil otherwise; Start/Close lifecycle mirrors finetune.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* fix(syncstate): annotate gosec G118 false positive on lifeCtx

gosec flagged the WithCancel in Start as "cancellation function not called" because the returned cancel is stored on the struct rather than called/deferred in scope. It is invoked in Close (covered by tests), and lifeCtx must outlive Start to drive the reconnect/reconcile goroutines. Suppress the verified false positive with a justified #nosec G118.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* test(distributed): e2e two-replica SyncedMap sync over real NATS + Postgres

Adds the real-infrastructure counterpart to the fake-bus unit tests, in the existing distributed e2e suite (testcontainers NATS + PostgreSQL). Two SyncedMap instances stand in for two frontend replicas - each with its OWN NATS connection to a shared server and a SHARED Postgres store (the distributed-mode invariant) - and assert, over the wire:

- a create on replica A is observed by replica B;
- an update and a delete propagate A -> B (delete prunes, which a reload cannot);
- a late-joining replica recovers a job it never received a delta for, via store hydrate on Start (the at-most-once gap a fake bus cannot exercise);
- a local Set is written through to the shared Postgres store.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
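The write-through-versus-apply split described in the first commit can be illustrated with a small standalone sketch. This is not the syncstate implementation: `toyBus`, `toyMap`, and `delta` are hypothetical names invented for the illustration, the bus is a synchronous in-process stand-in for NATS, and the durable Store is reduced to a write counter. It only shows why a memory-only apply path yields exactly one broadcast and one store write per local Set.

```go
package main

import (
	"fmt"
	"sync"
)

// delta is a hypothetical wire message: an operation plus key and value.
type delta struct {
	Op    string // "set" or "delete"
	Key   string
	Value string
}

// toyBus is a synchronous in-process stand-in for NATS (an assumption of
// this sketch; it is not testutil.FakeBus).
type toyBus struct {
	subs      []func(delta)
	published int
}

func (b *toyBus) Subscribe(fn func(delta)) { b.subs = append(b.subs, fn) }

// Publish counts the message and delivers it to every subscriber,
// including the publisher itself.
func (b *toyBus) Publish(d delta) {
	b.published++
	for _, fn := range b.subs {
		fn(d)
	}
}

// toyMap is a hypothetical, heavily simplified replica-local map.
type toyMap struct {
	mu          sync.Mutex
	data        map[string]string
	bus         *toyBus
	storeWrites int // stands in for write-throughs to a durable store
}

func newToyMap(bus *toyBus) *toyMap {
	m := &toyMap{data: map[string]string{}, bus: bus}
	bus.Subscribe(m.apply)
	return m
}

// Set is the local write path: memory, then store, then one broadcast.
func (m *toyMap) Set(k, v string) {
	m.mu.Lock()
	m.data[k] = v
	m.storeWrites++
	m.mu.Unlock() // release first: the synchronous bus calls apply on this map too
	m.bus.Publish(delta{Op: "set", Key: k, Value: v})
}

// apply is the remote path: memory only. It never publishes and never
// touches the store, so an echoed delta cannot loop.
func (m *toyMap) apply(d delta) {
	m.mu.Lock()
	defer m.mu.Unlock()
	switch d.Op {
	case "set":
		m.data[d.Key] = d.Value
	case "delete":
		delete(m.data, d.Key)
	}
}

func (m *toyMap) Get(k string) (string, bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	v, ok := m.data[k]
	return v, ok
}

func main() {
	bus := &toyBus{}
	a, b := newToyMap(bus), newToyMap(bus)
	a.Set("1", "running")
	v, ok := b.Get("1")
	// prints: running true 1 1 0
	fmt.Println(v, ok, bus.published, a.storeWrites, b.storeWrites)
}
```

Because the guard is structural (the apply function simply has no publish or store call), it needs no message IDs or origin tags to stop echoes, which is the property the "echo-loop guard" spec in the test file below asserts against the real component.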
292 lines
8.8 KiB
Go
package syncstate_test

import (
	"context"
	"sync"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"

	"github.com/mudler/LocalAI/core/services/messaging"
	"github.com/mudler/LocalAI/core/services/syncstate"
	"github.com/mudler/LocalAI/core/services/testutil"
)

// job is a minimal JSON-serializable value stand-in for the real cross-replica
// records (finetune/quant/agent jobs) the component is built for.
type job struct {
	ID     string `json:"id"`
	Status string `json:"status"`
}

func jobKey(j *job) string { return j.ID }

const stateName = "test.jobs"

func deltaSubject() string { return messaging.SubjectSyncStateDelta(stateName) }

// fakeStore is an in-memory Store that records call counts so specs can assert
// the write-through-vs-apply split (local writes hit the Store; applied deltas
// must not).
type fakeStore struct {
	mu          sync.Mutex
	data        map[string]*job
	upsertCalls int
	deleteCalls int
	listCalls   int
}

func newFakeStore(seed ...*job) *fakeStore {
	s := &fakeStore{data: map[string]*job{}}
	for _, j := range seed {
		s.data[j.ID] = j
	}
	return s
}

func (s *fakeStore) List(_ context.Context) ([]*job, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.listCalls++
	out := make([]*job, 0, len(s.data))
	for _, j := range s.data {
		out = append(out, j)
	}
	return out, nil
}

func (s *fakeStore) Upsert(_ context.Context, j *job) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.upsertCalls++
	s.data[j.ID] = j
	return nil
}

func (s *fakeStore) Delete(_ context.Context, k string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.deleteCalls++
	delete(s.data, k)
	return nil
}

// add simulates a peer replica writing to the shared DB out-of-band (e.g. while
// this replica was partitioned), so a re-hydrate can be observed to pick it up.
func (s *fakeStore) add(j *job) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[j.ID] = j
}

func (s *fakeStore) counts() (upsert, del, list int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.upsertCalls, s.deleteCalls, s.listCalls
}

var _ = Describe("SyncedMap", func() {
	ctx := context.Background()

	Describe("cross-replica delta propagation", func() {
		var (
			bus  *testutil.FakeBus
			a, b *syncstate.SyncedMap[string, *job]
		)

		BeforeEach(func() {
			bus = testutil.NewFakeBus()
			a = syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus})
			b = syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus})
			Expect(a.Start(ctx)).To(Succeed())
			Expect(b.Start(ctx)).To(Succeed())
		})

		AfterEach(func() {
			Expect(a.Close()).To(Succeed())
			Expect(b.Close()).To(Succeed())
		})

		It("propagates a Set on A to B", func() {
			Expect(a.Set(ctx, &job{ID: "1", Status: "running"})).To(Succeed())

			got, ok := b.Get("1")
			Expect(ok).To(BeTrue(), "replica B should see the value A just set")
			Expect(got.Status).To(Equal("running"))
		})

		It("prunes a Delete on A from B", func() {
			Expect(a.Set(ctx, &job{ID: "1", Status: "running"})).To(Succeed())
			_, present := b.Get("1")
			Expect(present).To(BeTrue(), "precondition: B must have the value before the delete")

			Expect(a.Delete(ctx, "1")).To(Succeed())

			_, ok := b.Get("1")
			Expect(ok).To(BeFalse(), "a delete on A must remove the key from B")
		})
	})

	Describe("hydration", func() {
		It("hydrates on Start from a preloaded Store", func() {
			store := newFakeStore(&job{ID: "x", Status: "done"})
			m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Store: store})
			Expect(m.Start(ctx)).To(Succeed())

			got, ok := m.Get("x")
			Expect(ok).To(BeTrue(), "Start must populate the map from the Store")
			Expect(got.Status).To(Equal("done"))
		})

		It("uses the Loader when Store is nil", func() {
			m := syncstate.New(syncstate.Config[string, *job]{
				Name: stateName,
				Key:  jobKey,
				Loader: func(_ context.Context) ([]*job, error) {
					return []*job{{ID: "l", Status: "loaded"}}, nil
				},
			})
			Expect(m.Start(ctx)).To(Succeed())

			got, ok := m.Get("l")
			Expect(ok).To(BeTrue(), "Loader output must hydrate the map when there is no Store")
			Expect(got.Status).To(Equal("loaded"))
		})
	})

	Describe("echo-loop guard", func() {
		It("applies its own broadcast once and does not re-publish", func() {
			bus := testutil.NewFakeBus()
			a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus})
			b := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus})
			Expect(a.Start(ctx)).To(Succeed())
			Expect(b.Start(ctx)).To(Succeed())
			defer func() {
				Expect(a.Close()).To(Succeed())
				Expect(b.Close()).To(Succeed())
			}()

			Expect(a.Set(ctx, &job{ID: "e", Status: "running"})).To(Succeed())

			// One local write must produce exactly one broadcast: A and B both
			// receive it and apply to memory, but the apply path never re-publishes.
			Expect(bus.PublishCount(deltaSubject())).To(Equal(1),
				"the apply path must not re-broadcast, otherwise replicas storm")
			Expect(a.List()).To(HaveLen(1), "A must not double-store its own echo")
			_, ok := b.Get("e")
			Expect(ok).To(BeTrue())
		})
	})

	Describe("Store write-through vs apply", func() {
		It("writes the Store on local Set/Delete but not on an applied delta", func() {
			bus := testutil.NewFakeBus()
			storeA := newFakeStore()
			storeB := newFakeStore()
			a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: storeA})
			b := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: storeB})
			Expect(a.Start(ctx)).To(Succeed())
			Expect(b.Start(ctx)).To(Succeed())
			defer func() {
				Expect(a.Close()).To(Succeed())
				Expect(b.Close()).To(Succeed())
			}()

			Expect(a.Set(ctx, &job{ID: "w", Status: "running"})).To(Succeed())

			upA, _, _ := storeA.counts()
			upB, _, _ := storeB.counts()
			Expect(upA).To(Equal(1), "local Set must write through to its own Store")
			Expect(upB).To(Equal(0), "the apply path must never write the peer's Store")

			Expect(a.Delete(ctx, "w")).To(Succeed())
			_, delA, _ := storeA.counts()
			_, delB, _ := storeB.counts()
			Expect(delA).To(Equal(1), "local Delete must delete from its own Store")
			Expect(delB).To(Equal(0), "the apply path must never delete from the peer's Store")
		})
	})

	Describe("OnApply hook", func() {
		It("fires with the correct op and key on an applied delta", func() {
			bus := testutil.NewFakeBus()
			var (
				mu   sync.Mutex
				ops  []string
				keys []string
			)
			a := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus})
			b := syncstate.New(syncstate.Config[string, *job]{
				Name: stateName, Key: jobKey, Nats: bus,
				OnApply: func(op string, k string, _ *job) {
					mu.Lock()
					ops = append(ops, op)
					keys = append(keys, k)
					mu.Unlock()
				},
			})
			Expect(a.Start(ctx)).To(Succeed())
			Expect(b.Start(ctx)).To(Succeed())
			defer func() {
				Expect(a.Close()).To(Succeed())
				Expect(b.Close()).To(Succeed())
			}()

			Expect(a.Set(ctx, &job{ID: "o", Status: "running"})).To(Succeed())
			Expect(a.Delete(ctx, "o")).To(Succeed())

			mu.Lock()
			defer mu.Unlock()
			Expect(ops).To(Equal([]string{"set", "delete"}))
			Expect(keys).To(Equal([]string{"o", "o"}))
		})
	})

	Describe("standalone (nil Nats)", func() {
		It("works in-memory with no panic and nothing to broadcast", func() {
			m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey})
			Expect(m.Start(ctx)).To(Succeed())
			defer func() { Expect(m.Close()).To(Succeed()) }()

			Expect(func() {
				Expect(m.Set(ctx, &job{ID: "s", Status: "running"})).To(Succeed())
			}).ToNot(Panic())

			got, ok := m.Get("s")
			Expect(ok).To(BeTrue())
			Expect(got.Status).To(Equal("running"))
			Expect(m.List()).To(HaveLen(1))
			Expect(m.Snapshot()).To(HaveKey("s"))

			Expect(m.Delete(ctx, "s")).To(Succeed())
			_, ok = m.Get("s")
			Expect(ok).To(BeFalse())
		})
	})

	Describe("reconnect re-hydrate", func() {
		It("re-reads the source when the messaging client reconnects", func() {
			bus := testutil.NewFakeBus()
			store := newFakeStore(&job{ID: "init", Status: "running"})
			m := syncstate.New(syncstate.Config[string, *job]{Name: stateName, Key: jobKey, Nats: bus, Store: store})
			Expect(m.Start(ctx)).To(Succeed())
			defer func() { Expect(m.Close()).To(Succeed()) }()

			_, ok := m.Get("init")
			Expect(ok).To(BeTrue())

			// A peer writes to the shared DB while we are unaware (no delta seen).
			store.add(&job{ID: "late", Status: "running"})
			_, ok = m.Get("late")
			Expect(ok).To(BeFalse(), "the new row should not appear before a re-hydrate")

			bus.TriggerReconnect()

			_, ok = m.Get("late")
			Expect(ok).To(BeTrue(), "reconnect must re-hydrate from the source and pick up drift")
			_, _, list := store.counts()
			Expect(list).To(Equal(2), "exactly one Start hydrate plus one reconnect re-hydrate")
		})
	})
})