Files
LocalAI/core/services/nodes/probe_cache.go
LocalAI [bot] 8bbe89a537 fix(distributed): route per request across loaded replicas + cache probeHealth (#9968)
* refactor(distributed): extract PickBestReplica from FindAndLockNodeWithModel

Lifts the replica-selection policy (in_flight ASC, last_used ASC,
available_vram DESC) out of the SQL ORDER BY into a pure Go function in
the new replicapicker.go. The SQL clause keeps its FOR UPDATE atomicity
and remains the production path used by SmartRouter; PickBestReplica is
the canonical implementation that the future per-frontend rotating
replica cache (TODO referenced from pkg/model) will call against an
in-memory snapshot without paying a DB round-trip per inference.

A new registry_test mirror spec seeds a multi-tier scenario and asserts
both layers pick the same replica, so any future tweak to either side
fails the test until the other side is updated.

No behavior change.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7 [Claude Code]

* fix(distributed): route per inference request and cache probeHealth

Two related fixes that together restore load balancing across loaded
replicas of the same model.

1. ModelLoader.Load and LoadModel bypass the local *Model cache when
   modelRouter is set. The cached *Model wraps an InFlightTrackingClient
   bound to a single (nodeID, replicaIndex) — reusing it pinned every
   subsequent request to whichever node won the very first pick, so
   FindAndLockNodeWithModel's round-robin never got a chance to run
   even after the reconciler scaled the model out to a second node. In
   distributed mode SmartRouter.Route now runs per request, and
   PickBestReplica picks the least-loaded replica each time.

   SmartRouter has its own coalescing (advisory DB lock for first-time
   loads + singleflight on backend.install RPC) so concurrent first
   requests for a not-yet-loaded model still produce a single worker
   side install.

2. SmartRouter.probeHealth memoizes successful gRPC HealthCheck results
   in a new probeCache (probe_cache.go) with a 30s TTL. With per-request
   routing every inference call hits probeHealth, and llama.cpp-style
   backends serialize HealthCheck behind active Predict — so a burst of
   incoming requests stalled on the probe to a node already mid-stream,
   tripping the 2s timeout and falling through to the install path.
   singleflight collapses N concurrent first-time probes for the same
   (node, addr) into one round-trip, failed probes invalidate the entry
   so the staleness-recovery path still triggers, and the TTL matches
   pkg/model/model.go's healthCheckTTL so the single-process and
   distributed paths share a staleness budget. The background
   HealthMonitor still reaps actually-dead backends within ~45s.

The bypass introduces one short FindAndLockNodeWithModel transaction per
inference. A TODO in pkg/model/loader.go documents the future per modelID
rotating-replica cache that would reuse PickBestReplica against an
in-memory snapshot and skip the DB round-trip for hot paths.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-7 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-24 08:15:27 +00:00

95 lines
3.0 KiB
Go

package nodes
import (
"sync"
"time"
"golang.org/x/sync/singleflight"
)
// probeCache memoizes recent successful gRPC HealthCheck results for
// (nodeID, addr) tuples so SmartRouter.probeHealth doesn't pay a round-trip
// on every inference request.
//
// Why this exists: with per-request routing (see pkg/model/loader.go), every
// inference call goes through SmartRouter.Route, which probes the backend
// before returning a client. Many gRPC backends (notably llama.cpp's server)
// serialize HealthCheck against active Predict on a shared goroutine, so a
// burst of new requests can stall behind a single long-running stream —
// exactly the "queue stalling" symptom observed in distributed clusters.
//
// The background HealthMonitor (perModelHealthCheck) is still the cluster-wide
// source of truth that reaps actually-dead backends within ~45s; this cache
// only saves the per-request hot path from re-asking when nothing has changed.
//
// TTL matches healthCheckTTL in pkg/model/model.go so the single-process
// IsRecentlyHealthy path and this distributed-mode path share the same
// staleness budget.
type probeCache struct {
ttl time.Duration
mu sync.Mutex
seen map[string]time.Time // key → last successful probe
flight singleflight.Group // coalesces concurrent probes for the same key
}
// newProbeCache returns a probeCache with the given TTL. Zero TTL disables
// caching: every call to DoOrCached invokes the probe.
func newProbeCache(ttl time.Duration) *probeCache {
return &probeCache{
ttl: ttl,
seen: make(map[string]time.Time),
}
}
// IsFresh reports whether key was successfully probed within TTL.
func (c *probeCache) IsFresh(key string) bool {
if c.ttl <= 0 {
return false
}
c.mu.Lock()
defer c.mu.Unlock()
last, ok := c.seen[key]
return ok && time.Since(last) < c.ttl
}
// markFresh records key as successfully probed at the current time.
func (c *probeCache) markFresh(key string) {
c.mu.Lock()
defer c.mu.Unlock()
c.seen[key] = time.Now()
}
// Invalidate drops any cached freshness for key. Used after a probe failure
// (or any other signal that the backend may not be alive) so the next call
// will re-probe instead of trusting stale state.
func (c *probeCache) Invalidate(key string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.seen, key)
}
// DoOrCached returns true if key is fresh; otherwise it runs probe (coalescing
// concurrent callers via singleflight) and caches a successful result. Failed
// probes invalidate the cache, so a transient miss doesn't pin every
// subsequent request to a re-probe.
func (c *probeCache) DoOrCached(key string, probe func() bool) bool {
if c.IsFresh(key) {
return true
}
v, _, _ := c.flight.Do(key, func() (any, error) {
// Double-check after potentially waiting: another caller in this
// flight may have just populated the cache.
if c.IsFresh(key) {
return true, nil
}
ok := probe()
if ok {
c.markFresh(key)
} else {
c.Invalidate(key)
}
return ok, nil
})
return v.(bool)
}