mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-27 18:06:58 -04:00
* feat(distributed): add SyncedMap cross-replica in-memory state component

Introduce core/services/syncstate.SyncedMap[K,V]: a thread-safe in-memory map that keeps itself consistent across frontend replicas via NATS, with an optional pluggable durable Store and hydrate-from-source convergence.

Several features keep process-local state surfaced to the API (finetune/quant jobs, agent tasks, model configs) and each hand-wired the same in-memory + NATS broadcast + read-through-store legs - or forgot to, reintroducing cross-replica staleness. SyncedMap makes that consistency a configuration choice:

- local writes mutate the map, write through the Store, then broadcast a delta;
- the apply path is memory-only and never re-publishes or re-writes the Store (structural echo-loop guard, mirroring galleryop.mergeStatus);
- on Start and on NATS reconnect the map re-hydrates from the source (Store, else Loader); an optional periodic Reconcile repairs silent drift;
- standalone mode (nil NATS client) is a strict in-memory no-op.

Reconnect re-hydrate is wired via a new *messaging.Client.OnReconnect callback, consumed through an optional type-assertion so MessagingClient stays minimal.

Adds messaging.SubjectSyncStateDelta and a reusable testutil.FakeBus (synchronous in-process MessagingClient with wildcard matching) for adopter tests.

Component only; service migrations follow in subsequent commits.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(finetune): back jobs with SyncedMap for cross-replica consistency

FineTuneService kept jobs in a process-local map and, although it wrote them to Postgres, ListJobs/GetJob never read the store back and the wired natsClient was never used - so in distributed mode a job created on one replica was invisible to the others. Replace the map and the dead client with a syncstate.SyncedMap keyed by job ID, value *schema.FineTuneJob (the exact REST shape, so responses are unchanged).

- Add a Store adapter (core/services/finetune/syncstore.go) over FineTuneStore, plus FineTuneStore.ListAll (global hydrate; per-user List kept) and an idempotent Upsert (create-or-update; Create alone fails on dup key).
- Writes go through SyncedMap.Set/Delete (write-through + broadcast); reads use List/Get. The on-disk state.json path becomes the standalone Loader, keeping single-node restart recovery (stale->stopped / exporting->failed fixups).
- Fold SetNATSClient/SetFineTuneStore into NewFineTuneService; app.go passes the distributed NATS client + store when distributed, nil otherwise.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(agentpool): back agent tasks with SyncedMap for cross-replica consistency

AgentJobService.ListTasks read the process-local tasks map only, while ListJobs already read through the DB persister + dispatcher NATS - so in distributed mode a task created on one replica was invisible to the others. Back tasks with a syncstate.SyncedMap keyed by task ID (value schema.Task, the exact REST shape); jobs are left untouched.

- Store adapter (task_syncstore.go) over the existing JobPersister (LoadTasks/SaveTask/DeleteTask); reads svc.persister/userID live so a persister swap needs no rebuild. No new persister methods required.
- Task reads -> SyncedMap.List/Get; create/update -> Set (write-through + broadcast); delete -> Delete. The file persister now owns its own task set so the write-through path does not re-enter the SyncedMap lock (deadlock guard).
- The distributed NATS client is not available at construction (start() precedes initDistributed), so it is injected via SetTaskSyncNATS, which rebuilds the still-empty map before Start/hydrate. Wired at the main, restart, and per-user (UserServicesManager) distributed sites.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* refactor(quantization): back jobs with SyncedMap + durable QuantStore

QuantizationService kept jobs in a process-local map persisted only to a local state.json, so in distributed mode jobs were neither visible across replicas nor durable cluster-wide. Back jobs with a syncstate.SyncedMap keyed by job ID (value *schema.QuantizationJob, the exact REST shape).

- New distributed.QuantStore (GORM, table quantization_jobs) mirroring FineTuneStore: Create/Get/ListAll/Upsert(idempotent)/Delete, registered for AutoMigrate via distributed.InitStores (Stores.Quant).
- New adapter (quantization/syncstore.go) over QuantStore implementing syncstate.Store, with record<->schema conversion.
- Reads go through List/Get, writes through Set/Delete (write-through + broadcast); state.json is kept as the standalone Loader for single-node restart recovery (stale-job fixups preserved).
- app.go passes the distributed NATS client + QuantStore when distributed, nil otherwise; Start/Close lifecycle mirrors finetune.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* fix(syncstate): annotate gosec G118 false positive on lifeCtx

gosec flagged the WithCancel in Start as "cancellation function not called" because the returned cancel is stored on the struct rather than called/deferred in scope. It is invoked in Close (covered by tests), and lifeCtx must outlive Start to drive the reconnect/reconcile goroutines. Suppress the verified false positive with a justified #nosec G118.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* test(distributed): e2e two-replica SyncedMap sync over real NATS + Postgres

Adds the real-infrastructure counterpart to the fake-bus unit tests, in the existing distributed e2e suite (testcontainers NATS + PostgreSQL). Two SyncedMap instances stand in for two frontend replicas - each with its OWN NATS connection to a shared server and a SHARED Postgres store (the distributed-mode invariant) - and assert, over the wire:

- a create on replica A is observed by replica B;
- an update and a delete propagate A -> B (delete prunes, which a reload cannot);
- a late-joining replica recovers a job it never received a delta for, via store hydrate on Start (the at-most-once gap a fake bus cannot exercise);
- a local Set is written through to the shared Postgres store.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
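The write path and apply path described in the first commit can be illustrated with a small sketch. This is not the syncstate.SyncedMap implementation, which is not shown here: the names sketchMap, memStore and delta are hypothetical, the map is string-keyed rather than generic, and the NATS broadcast is replaced by a plain function call. It only shows why a memory-only apply path cannot produce an echo loop, and how a late joiner converges by hydrating from the shared store.

```go
package main

import (
	"fmt"
	"sync"
)

// delta is a hypothetical wire message; the real payload format is an assumption.
type delta struct {
	Key     string
	Value   string
	Deleted bool
}

// memStore stands in for the shared durable Store (Postgres in the commit text).
type memStore struct {
	mu   sync.Mutex
	rows map[string]string
}

func (s *memStore) put(k, v string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.rows[k] = v
}

// all returns a copy of the stored rows so callers never share the inner map.
func (s *memStore) all() map[string]string {
	s.mu.Lock()
	defer s.mu.Unlock()
	out := make(map[string]string, len(s.rows))
	for k, v := range s.rows {
		out[k] = v
	}
	return out
}

// sketchMap is a hypothetical, non-generic stand-in for SyncedMap.
type sketchMap struct {
	mu      sync.RWMutex
	data    map[string]string
	store   *memStore   // nil means no durable store
	publish func(delta) // nil means standalone mode (no broadcast)
}

func newSketchMap(store *memStore, publish func(delta)) *sketchMap {
	return &sketchMap{data: map[string]string{}, store: store, publish: publish}
}

// Set is the local write path: mutate memory, write through, then broadcast.
func (m *sketchMap) Set(k, v string) {
	m.mu.Lock()
	m.data[k] = v
	m.mu.Unlock()
	if m.store != nil {
		m.store.put(k, v)
	}
	if m.publish != nil {
		m.publish(delta{Key: k, Value: v})
	}
}

// Apply is the remote path: memory only. It never writes the store and never
// publishes, so a received delta cannot bounce back to its sender.
func (m *sketchMap) Apply(d delta) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if d.Deleted {
		delete(m.data, d.Key)
		return
	}
	m.data[d.Key] = d.Value
}

// Hydrate replaces the in-memory contents with the rows held by the store.
func (m *sketchMap) Hydrate() {
	if m.store == nil {
		return
	}
	rows := m.store.all()
	m.mu.Lock()
	m.data = rows
	m.mu.Unlock()
}

func (m *sketchMap) Get(k string) (string, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	v, ok := m.data[k]
	return v, ok
}

func main() {
	store := &memStore{rows: map[string]string{}}
	broadcasts := 0
	var b *sketchMap
	a := newSketchMap(store, func(d delta) { broadcasts++; b.Apply(d) })
	b = newSketchMap(store, func(d delta) { broadcasts++; a.Apply(d) })

	a.Set("job-1", "running")
	v, ok := b.Get("job-1")
	fmt.Println(v, ok, broadcasts)

	// A late joiner saw no delta, so it recovers the job from the store.
	late := newSketchMap(store, nil)
	late.Hydrate()
	v, ok = late.Get("job-1")
	fmt.Println(v, ok)
}
```

In this sketch a single Set yields exactly one broadcast even though both replicas publish to each other, because Apply has no outgoing edge.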
338 lines
12 KiB
Go
package messaging

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/mudler/LocalAI/pkg/sanitize"
	"github.com/mudler/xlog"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nkeys"
)

// subscribeConfirmTimeout bounds the server round-trip used to detect whether a
// subscription was rejected (e.g. by JWT permissions) before returning to the caller.
const subscribeConfirmTimeout = 5 * time.Second

// Client wraps a NATS connection and provides helpers for pub/sub and queue subscriptions.
type Client struct {
	conn *nats.Conn
	mu   sync.RWMutex

	// reconnectCbs are invoked after the underlying connection is
	// re-established. nats.go transparently resubscribes existing
	// subscriptions on reconnect, but it cannot know that a consumer kept
	// derived in-memory state (e.g. syncstate.SyncedMap) that may have drifted
	// while the link was down; these callbacks let such consumers re-hydrate.
	cbMu         sync.Mutex
	reconnectCbs []func()
}

// New creates a new NATS client with auto-reconnect.
func New(url string, opts ...Option) (*Client, error) {
	var cfg connectConfig
	for _, o := range opts {
		o(&cfg)
	}

	// Allocate the client up front so the reconnect handler closure can reach
	// it; conn is populated after nats.Connect succeeds below.
	c := &Client{}

	natsOpts := []nats.Option{
		nats.RetryOnFailedConnect(true),
		nats.MaxReconnects(-1),
		nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
			if err != nil {
				xlog.Warn("NATS disconnected", "error", err)
			}
		}),
		nats.ReconnectHandler(func(_ *nats.Conn) {
			xlog.Info("NATS reconnected")
			c.runReconnectCallbacks()
		}),
		nats.ClosedHandler(func(_ *nats.Conn) {
			xlog.Info("NATS connection closed")
		}),
		// Surface async errors (notably permission violations) that NATS would
		// otherwise deliver silently. A subscription the server rejects for a
		// JWT permission means the worker never receives those messages, so make
		// it loud rather than letting the feature fail invisibly.
		nats.ErrorHandler(func(_ *nats.Conn, sub *nats.Subscription, err error) {
			subject := ""
			if sub != nil {
				subject = sub.Subject
			}
			if errors.Is(err, nats.ErrPermissionViolation) {
				xlog.Error("NATS permission violation — check JWT pub/sub allow lists", "subject", subject, "error", err)
				return
			}
			xlog.Warn("NATS async error", "subject", subject, "error", err)
		}),
	}
	switch {
	case cfg.jwtProvider != nil:
		// Fetch creds on every (re)connect so a refresh loop can rotate the JWT
		// before expiry; the server expiring the old JWT triggers a reconnect
		// that transparently picks up the new one.
		natsOpts = append(natsOpts, nats.UserJWT(
			func() (string, error) {
				jwt, _ := cfg.jwtProvider()
				if jwt == "" {
					return "", fmt.Errorf("no NATS user JWT available")
				}
				return jwt, nil
			},
			func(nonce []byte) ([]byte, error) {
				_, seed := cfg.jwtProvider()
				kp, err := nkeys.FromSeed([]byte(seed))
				if err != nil {
					return nil, fmt.Errorf("loading NATS user seed: %w", err)
				}
				defer kp.Wipe()
				return kp.Sign(nonce)
			},
		))
	case cfg.userJWT != "" && cfg.userSeed != "":
		natsOpts = append(natsOpts, nats.UserJWTAndSeed(cfg.userJWT, cfg.userSeed))
	}
	if cfg.tls.Enabled() {
		if err := cfg.tls.Validate(); err != nil {
			return nil, err
		}
		tlsOpts, err := cfg.tls.natsOptions()
		if err != nil {
			return nil, err
		}
		natsOpts = append(natsOpts, tlsOpts...)
	}

	nc, err := nats.Connect(url, natsOpts...)
	if err != nil {
		return nil, fmt.Errorf("connecting to NATS at %s: %w", sanitize.URL(url), err)
	}

	c.conn = nc
	return c, nil
}

// OnReconnect registers a callback invoked after the NATS connection is
// re-established. It is consumed via an optional interface type-assertion
// (interface{ OnReconnect(func()) }) rather than being added to MessagingClient,
// so the messaging abstraction stays minimal and standalone/test clients are not
// forced to implement reconnect semantics. A nil callback is ignored.
func (c *Client) OnReconnect(cb func()) {
	if cb == nil {
		return
	}
	c.cbMu.Lock()
	c.reconnectCbs = append(c.reconnectCbs, cb)
	c.cbMu.Unlock()
}

// runReconnectCallbacks invokes registered reconnect callbacks. It copies the
// slice under the lock so a callback that (re)registers cannot deadlock.
func (c *Client) runReconnectCallbacks() {
	c.cbMu.Lock()
	cbs := append([]func(){}, c.reconnectCbs...)
	c.cbMu.Unlock()
	for _, cb := range cbs {
		cb()
	}
}

// Publish marshals data as JSON and publishes it to the given subject.
func (c *Client) Publish(subject string, data any) error {
	payload, err := json.Marshal(data)
	if err != nil {
		return fmt.Errorf("marshalling message for %s: %w", subject, err)
	}
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.conn.Publish(subject, payload)
}

// Subscribe creates a subscription on the given subject. All subscribers receive every message.
func (c *Client) Subscribe(subject string, handler func([]byte)) (Subscription, error) {
	return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
		return conn.Subscribe(subject, func(msg *nats.Msg) {
			handler(msg.Data)
		})
	})
}

// QueueSubscribe creates a queue subscription. Within the same queue group,
// only one subscriber receives each message (load-balanced).
func (c *Client) QueueSubscribe(subject, queue string, handler func([]byte)) (Subscription, error) {
	return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
		return conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
			handler(msg.Data)
		})
	})
}

// confirmSubscription creates a subscription via mk and forces a server
// round-trip so that a permissions violation (which NATS otherwise reports
// only asynchronously) is returned to the caller synchronously. The server
// emits the "-ERR Permissions Violation" for a rejected SUB before the PONG
// that satisfies the flush, so by the time FlushTimeout returns the violation
// is recorded as the connection's last error. Without this, a worker whose JWT
// lacks a subject gets a non-nil subscription that never receives a message,
// turning a permission misconfiguration into a silent failure.
func (c *Client) confirmSubscription(subject string, mk func(*nats.Conn) (*nats.Subscription, error)) (Subscription, error) {
	c.mu.RLock()
	conn := c.conn
	c.mu.RUnlock()
	if conn == nil {
		return nil, fmt.Errorf("subscribe to %s: nil NATS connection", subject)
	}

	sub, err := mk(conn)
	if err != nil {
		return nil, err
	}

	// A failed flush here means we could not round-trip to the server (not yet
	// connected, reconnecting, slow link). RetryOnFailedConnect intentionally
	// buffers subscriptions across that gap, so do NOT fail: keep the
	// subscription and let it replay on (re)connect; a later permission
	// violation is still logged by the async error handler in New.
	if err := conn.FlushTimeout(subscribeConfirmTimeout); err != nil {
		xlog.Debug("Could not confirm NATS subscription (will replay on connect)", "subject", subject, "error", err)
		return sub, nil
	}
	// Flush succeeded, so any permission violation for this SUB has already been
	// recorded as the connection's last error (the server emits it before the
	// PONG). LastError is per-connection; match the exact quoted subject the
	// server echoes ("Subscription to \"<subject>\"") so a stale violation for
	// another subject can't be mis-attributed here.
	if lerr := conn.LastError(); lerr != nil &&
		errors.Is(lerr, nats.ErrPermissionViolation) &&
		strings.Contains(lerr.Error(), `Subscription to "`+subject+`"`) {
		_ = sub.Unsubscribe()
		return nil, fmt.Errorf("subscription to %s denied by NATS server (check JWT sub allow list): %w", subject, lerr)
	}
	return sub, nil
}

// Request sends a request and waits for a reply (request-reply pattern).
// Returns the raw reply data.
func (c *Client) Request(subject string, data []byte, timeout time.Duration) ([]byte, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	msg, err := c.conn.Request(subject, data, timeout)
	if err != nil {
		return nil, fmt.Errorf("request to %s: %w", subject, err)
	}
	return msg.Data, nil
}

// SubscribeReply creates a subscription that supports replying to requests.
// The handler receives the raw request data and the reply subject.
func (c *Client) SubscribeReply(subject string, handler func(data []byte, reply func([]byte))) (Subscription, error) {
	return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
		return conn.Subscribe(subject, func(msg *nats.Msg) {
			handler(msg.Data, func(replyData []byte) {
				if msg.Reply != "" {
					if err := msg.Respond(replyData); err != nil {
						xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
					}
				}
			})
		})
	})
}

// QueueSubscribeReply creates a queue subscription that supports replying to requests.
// Load-balanced across subscribers in the same queue group, with request-reply support.
func (c *Client) QueueSubscribeReply(subject, queue string, handler func(data []byte, reply func([]byte))) (Subscription, error) {
	return c.confirmSubscription(subject, func(conn *nats.Conn) (*nats.Subscription, error) {
		return conn.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
			handler(msg.Data, func(replyData []byte) {
				if msg.Reply != "" {
					if err := msg.Respond(replyData); err != nil {
						xlog.Warn("Failed to send NATS reply", "subject", subject, "error", err)
					}
				}
			})
		})
	})
}

// SubscribeJSON creates a subscription that automatically unmarshals JSON messages.
// Invalid JSON messages are logged and skipped.
func SubscribeJSON[T any](c MessagingClient, subject string, handler func(T)) (Subscription, error) {
	return c.Subscribe(subject, func(data []byte) {
		var evt T
		if err := json.Unmarshal(data, &evt); err != nil {
			xlog.Warn("Failed to unmarshal NATS message", "subject", subject, "error", err)
			return
		}
		handler(evt)
	})
}

// QueueSubscribeJSON creates a queue subscription that automatically unmarshals JSON messages.
// Invalid JSON messages are logged and skipped.
func QueueSubscribeJSON[T any](c MessagingClient, subject, queue string, handler func(T)) (Subscription, error) {
	return c.QueueSubscribe(subject, queue, func(data []byte) {
		var evt T
		if err := json.Unmarshal(data, &evt); err != nil {
			xlog.Warn("Failed to unmarshal NATS message", "subject", subject, "error", err)
			return
		}
		handler(evt)
	})
}

// RequestJSON sends a JSON request-reply via NATS, marshaling the request and
// unmarshaling the reply. This eliminates the repeated marshal/request/unmarshal
// boilerplate across all NATS request-reply call sites.
func RequestJSON[Req, Reply any](c MessagingClient, subject string, req Req, timeout time.Duration) (*Reply, error) {
	data, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("marshaling request: %w", err)
	}
	replyData, err := c.Request(subject, data, timeout)
	if err != nil {
		return nil, fmt.Errorf("NATS request to %s: %w", subject, err)
	}
	var reply Reply
	if err := json.Unmarshal(replyData, &reply); err != nil {
		return nil, fmt.Errorf("unmarshaling reply from %s: %w", subject, err)
	}
	return &reply, nil
}

// Conn returns the underlying NATS connection for advanced usage.
//
// Deprecated: Prefer using the MessagingClient interface methods (Publish, Subscribe, etc.)
// instead of accessing the raw NATS connection. This method couples callers to the
// concrete Client type and bypasses the abstraction layer.
func (c *Client) Conn() *nats.Conn {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.conn
}

// IsConnected returns true if the client is currently connected to a NATS server.
func (c *Client) IsConnected() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.conn != nil && c.conn.IsConnected()
}

// Close drains and closes the NATS connection, waiting for in-flight messages.
func (c *Client) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.conn != nil {
		c.conn.Drain()
		c.conn.FlushTimeout(5 * time.Second)
	}
}
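
The OnReconnect doc comment says the method is consumed through an optional interface type-assertion rather than through MessagingClient. The sketch below shows one way a consumer might do that; it is an illustration under stated assumptions, not the code syncstate actually uses. The names reconnectNotifier, fakeClient, plainClient and wireRehydrate are hypothetical, and fakeClient only imitates the snapshot-then-invoke behaviour of runReconnectCallbacks.

```go
package main

import (
	"fmt"
	"sync"
)

// reconnectNotifier is the optional capability probed at wiring time.
type reconnectNotifier interface {
	OnReconnect(func())
}

// fakeClient is a hypothetical client that lets the demo simulate reconnects.
type fakeClient struct {
	mu  sync.Mutex
	cbs []func()
}

func (c *fakeClient) OnReconnect(cb func()) {
	if cb == nil {
		return
	}
	c.mu.Lock()
	c.cbs = append(c.cbs, cb)
	c.mu.Unlock()
}

// simulateReconnect snapshots the callbacks under the lock and runs them
// outside it, so a callback may register further callbacks without deadlock.
func (c *fakeClient) simulateReconnect() {
	c.mu.Lock()
	cbs := append([]func(){}, c.cbs...)
	c.mu.Unlock()
	for _, cb := range cbs {
		cb()
	}
}

// plainClient is a hypothetical client with no reconnect support.
type plainClient struct{}

// wireRehydrate registers rehydrate when the client offers OnReconnect and
// reports whether it did. A nil client (standalone mode) simply returns false.
func wireRehydrate(client any, rehydrate func()) bool {
	n, ok := client.(reconnectNotifier)
	if !ok {
		return false
	}
	n.OnReconnect(rehydrate)
	return true
}

func main() {
	hydrations := 0
	rehydrate := func() { hydrations++ }

	fc := &fakeClient{}
	fmt.Println(wireRehydrate(fc, rehydrate))            // client supports reconnect hooks
	fmt.Println(wireRehydrate(plainClient{}, rehydrate)) // client does not
	fmt.Println(wireRehydrate(nil, rehydrate))           // standalone: no client at all

	fc.simulateReconnect()
	fc.simulateReconnect()
	fmt.Println(hydrations)
}
```

Probing for the capability keeps the core client interface small: clients that cannot reconnect, including test fakes, need no stub method.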