mirror of
https://github.com/mudler/LocalAI.git
synced 2026-07-01 11:56:57 -04:00
* feat(distributed): add SyncedMap cross-replica in-memory state component Introduce core/services/syncstate.SyncedMap[K,V]: a thread-safe in-memory map that keeps itself consistent across frontend replicas via NATS, with an optional pluggable durable Store and hydrate-from-source convergence. Several features keep process-local state surfaced to the API (finetune/quant jobs, agent tasks, model configs) and each hand-wired the same in-memory + NATS broadcast + read-through-store legs - or forgot to, reintroducing cross-replica staleness. SyncedMap makes that consistency a configuration choice: - local writes mutate the map, write through the Store, then broadcast a delta; - the apply path is memory-only and never re-publishes or re-writes the Store (structural echo-loop guard, mirroring galleryop.mergeStatus); - on Start and on NATS reconnect the map re-hydrates from the source (Store, else Loader); an optional periodic Reconcile repairs silent drift; - standalone mode (nil NATS client) is a strict in-memory no-op. Reconnect re-hydrate is wired via a new *messaging.Client.OnReconnect callback, consumed through an optional type-assertion so MessagingClient stays minimal. Adds messaging.SubjectSyncStateDelta and a reusable testutil.FakeBus (synchronous in-process MessagingClient with wildcard matching) for adopter tests. Component only; service migrations follow in subsequent commits. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Assisted-by: Claude:claude-opus-4-8 [Claude Code] * refactor(finetune): back jobs with SyncedMap for cross-replica consistency FineTuneService kept jobs in a process-local map and, although it wrote them to Postgres, ListJobs/GetJob never read the store back and the wired natsClient was never used - so in distributed mode a job created on one replica was invisible to the others. Replace the map and the dead client with a syncstate.SyncedMap keyed by job ID, value *schema.FineTuneJob (the exact REST shape, so responses are unchanged). - Add a Store adapter (core/services/finetune/syncstore.go) over FineTuneStore, plus FineTuneStore.ListAll (global hydrate; per-user List kept) and an idempotent Upsert (create-or-update; Create alone fails on dup key). - Writes go through SyncedMap.Set/Delete (write-through + broadcast); reads use List/Get. The on-disk state.json path becomes the standalone Loader, keeping single-node restart recovery (stale->stopped / exporting->failed fixups). - Fold SetNATSClient/SetFineTuneStore into NewFineTuneService; app.go passes the distributed NATS client + store when distributed, nil otherwise. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Assisted-by: Claude:claude-opus-4-8 [Claude Code] * refactor(agentpool): back agent tasks with SyncedMap for cross-replica consistency AgentJobService.ListTasks read the process-local tasks map only, while ListJobs already read through the DB persister + dispatcher NATS - so in distributed mode a task created on one replica was invisible to the others. Back tasks with a syncstate.SyncedMap keyed by task ID (value schema.Task, the exact REST shape); jobs are left untouched. - Store adapter (task_syncstore.go) over the existing JobPersister (LoadTasks/SaveTask/DeleteTask); reads svc.persister/userID live so a persister swap needs no rebuild. No new persister methods required. - Task reads -> SyncedMap.List/Get; create/update -> Set (write-through + broadcast); delete -> Delete. The file persister now owns its own task set so the write-through path does not re-enter the SyncedMap lock (deadlock guard). - The distributed NATS client is not available at construction (start() precedes initDistributed), so it is injected via SetTaskSyncNATS, which rebuilds the still-empty map before Start/hydrate. Wired at the main, restart, and per-user (UserServicesManager) distributed sites. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Assisted-by: Claude:claude-opus-4-8 [Claude Code] * refactor(quantization): back jobs with SyncedMap + durable QuantStore QuantizationService kept jobs in a process-local map persisted only to a local state.json, so in distributed mode jobs were neither visible across replicas nor durable cluster-wide. Back jobs with a syncstate.SyncedMap keyed by job ID (value *schema.QuantizationJob, the exact REST shape). - New distributed.QuantStore (GORM, table quantization_jobs) mirroring FineTuneStore: Create/Get/ListAll/Upsert(idempotent)/Delete, registered for AutoMigrate via distributed.InitStores (Stores.Quant). - New adapter (quantization/syncstore.go) over QuantStore implementing syncstate.Store, with record<->schema conversion. - Reads go through List/Get, writes through Set/Delete (write-through + broadcast); state.json is kept as the standalone Loader for single-node restart recovery (stale-job fixups preserved). - app.go passes the distributed NATS client + QuantStore when distributed, nil otherwise; Start/Close lifecycle mirrors finetune. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Assisted-by: Claude:claude-opus-4-8 [Claude Code] * fix(syncstate): annotate gosec G118 false positive on lifeCtx gosec flagged the WithCancel in Start as "cancellation function not called" because the returned cancel is stored on the struct rather than called/deferred in scope. It is invoked in Close (covered by tests), and lifeCtx must outlive Start to drive the reconnect/reconcile goroutines. Suppress the verified false positive with a justified #nosec G118. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Assisted-by: Claude:claude-opus-4-8 [Claude Code] * test(distributed): e2e two-replica SyncedMap sync over real NATS + Postgres Adds the real-infrastructure counterpart to the fake-bus unit tests, in the existing distributed e2e suite (testcontainers NATS + PostgreSQL). Two SyncedMap instances stand in for two frontend replicas - each with its OWN NATS connection to a shared server and a SHARED Postgres store (the distributed-mode invariant) - and assert, over the wire: - a create on replica A is observed by replica B; - an update and a delete propagate A -> B (delete prunes, which a reload cannot); - a late-joining replica recovers a job it never received a delta for, via store hydrate on Start (the at-most-once gap a fake bus cannot exercise); - a local Set is written through to the shared Postgres store. Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Assisted-by: Claude:claude-opus-4-8 [Claude Code] --------- Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
161 lines
4.4 KiB
Go
161 lines
4.4 KiB
Go
package testutil
|
|
|
|
import (
|
|
"encoding/json"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/mudler/LocalAI/core/services/messaging"
|
|
)
|
|
|
|
// FakeBus is an in-memory messaging.MessagingClient that delivers each published
|
|
// message synchronously to every registered subscriber whose subject filter
|
|
// matches, including NATS-style wildcard subjects (`*` matches exactly one
|
|
// token).
|
|
//
|
|
// Synchronous delivery keeps specs deterministic: the moment Publish returns,
|
|
// every matching subscriber's handler has already run, so the spec body can read
|
|
// the resulting state without polling. It is the shared test double for every
|
|
// cross-replica-sync adopter (gallery, syncstate, ...) so they exercise the same
|
|
// delivery semantics. It deliberately depends only on the standard library and
|
|
// the messaging package — no test framework — so it is importable anywhere.
|
|
type FakeBus struct {
|
|
mu sync.Mutex
|
|
subs []fakeBusSub
|
|
// publishCounts records how many messages were published per subject, so a
|
|
// spec can assert the echo-loop guard (an applied delta must not re-publish).
|
|
publishCounts map[string]int
|
|
|
|
// reconnectCbs back the optional OnReconnect/TriggerReconnect pair, letting a
|
|
// spec exercise the component's reconnect re-hydrate path without a real
|
|
// NATS server.
|
|
reconnectCbs []func()
|
|
}
|
|
|
|
type fakeBusSub struct {
|
|
subject string
|
|
handler func([]byte)
|
|
}
|
|
|
|
// NewFakeBus returns a ready-to-use in-memory bus.
|
|
func NewFakeBus() *FakeBus {
|
|
return &FakeBus{publishCounts: map[string]int{}}
|
|
}
|
|
|
|
// subjectMatches reports whether a subscription filter matches a concrete
|
|
// subject, honoring the single-token `*` wildcard used by NATS.
|
|
func subjectMatches(filter, subject string) bool {
|
|
if filter == subject {
|
|
return true
|
|
}
|
|
fp := strings.Split(filter, ".")
|
|
sp := strings.Split(subject, ".")
|
|
if len(fp) != len(sp) {
|
|
return false
|
|
}
|
|
for i := range fp {
|
|
if fp[i] == "*" {
|
|
continue
|
|
}
|
|
if fp[i] != sp[i] {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
// Publish marshals data as JSON and delivers it synchronously to every matching
|
|
// subscriber.
|
|
func (b *FakeBus) Publish(subject string, data any) error {
|
|
payload, err := json.Marshal(data)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b.mu.Lock()
|
|
b.publishCounts[subject]++
|
|
subs := append([]fakeBusSub(nil), b.subs...)
|
|
b.mu.Unlock()
|
|
for _, s := range subs {
|
|
if subjectMatches(s.subject, subject) {
|
|
s.handler(payload)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PublishCount returns how many messages were published on the exact subject.
|
|
func (b *FakeBus) PublishCount(subject string) int {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.publishCounts[subject]
|
|
}
|
|
|
|
type fakeBusSubscription struct {
|
|
bus *FakeBus
|
|
subRef fakeBusSub
|
|
}
|
|
|
|
func (s *fakeBusSubscription) Unsubscribe() error {
|
|
s.bus.mu.Lock()
|
|
defer s.bus.mu.Unlock()
|
|
for i, candidate := range s.bus.subs {
|
|
if candidate.subject == s.subRef.subject {
|
|
s.bus.subs = append(s.bus.subs[:i], s.bus.subs[i+1:]...)
|
|
return nil
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *FakeBus) Subscribe(subject string, handler func([]byte)) (messaging.Subscription, error) {
|
|
sub := fakeBusSub{subject: subject, handler: handler}
|
|
b.mu.Lock()
|
|
b.subs = append(b.subs, sub)
|
|
b.mu.Unlock()
|
|
return &fakeBusSubscription{bus: b, subRef: sub}, nil
|
|
}
|
|
|
|
func (b *FakeBus) QueueSubscribe(subject, _ string, handler func([]byte)) (messaging.Subscription, error) {
|
|
return b.Subscribe(subject, handler)
|
|
}
|
|
|
|
func (b *FakeBus) QueueSubscribeReply(string, string, func([]byte, func([]byte))) (messaging.Subscription, error) {
|
|
return &fakeBusSubscription{bus: b}, nil
|
|
}
|
|
|
|
func (b *FakeBus) SubscribeReply(string, func([]byte, func([]byte))) (messaging.Subscription, error) {
|
|
return &fakeBusSubscription{bus: b}, nil
|
|
}
|
|
|
|
func (b *FakeBus) Request(string, []byte, time.Duration) ([]byte, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (b *FakeBus) IsConnected() bool { return true }
|
|
func (b *FakeBus) Close() {}
|
|
|
|
// OnReconnect mirrors *messaging.Client.OnReconnect so a spec can drive the
|
|
// component's reconnect re-hydrate path. The component detects this method via an
|
|
// optional interface assertion; implementing it here keeps the fake a faithful
|
|
// stand-in for the concrete client.
|
|
func (b *FakeBus) OnReconnect(cb func()) {
|
|
if cb == nil {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
b.reconnectCbs = append(b.reconnectCbs, cb)
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
// TriggerReconnect runs every registered reconnect callback, simulating a NATS
|
|
// reconnect event.
|
|
func (b *FakeBus) TriggerReconnect() {
|
|
b.mu.Lock()
|
|
cbs := append([]func(){}, b.reconnectCbs...)
|
|
b.mu.Unlock()
|
|
for _, cb := range cbs {
|
|
cb()
|
|
}
|
|
}
|