mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-29 19:06:43 -04:00
Compare commits
1 Commits
master
...
fix/distri
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a73516f9b4 |
@@ -356,6 +356,12 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
|
||||
PrefixConfig: prefixCfg,
|
||||
Pressure: pressure,
|
||||
SharedModels: cfg.Distributed.SharedModels,
|
||||
// Cap how long a cold load may hold the per-model advisory lock: the
|
||||
// configured backend.install deadline plus a margin for file staging and
|
||||
// the remote LoadModel. Derived from the install timeout so raising it
|
||||
// (for slow links pulling multi-GB images) widens the ceiling too,
|
||||
// instead of letting the static default cut a legitimately slow load.
|
||||
ModelLoadCeiling: cfg.Distributed.BackendInstallTimeoutOrDefault() + 10*time.Minute,
|
||||
})
|
||||
|
||||
// Wire staging-progress broadcasting so file-staging shows up on every
|
||||
|
||||
@@ -130,6 +130,20 @@ func WithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) e
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Neutralize any deployment-wide lock_timeout on this dedicated connection.
|
||||
// Operators commonly set a short global lock_timeout (on the role or
|
||||
// database) to bound ordinary row-lock waits. Applied to the blocking
|
||||
// pg_advisory_lock below, it aborts the wait with SQLSTATE 55P03 and turns
|
||||
// LocalAI's intentional cross-replica "wait your turn, then re-check"
|
||||
// coordination into a hard error for the caller (e.g. a chat request that
|
||||
// just wanted to reuse a model another replica is loading). Let the Go
|
||||
// context be the single source of truth for how long we wait instead.
|
||||
if _, err := conn.ExecContext(ctx, "SET lock_timeout = 0"); err != nil {
|
||||
return fmt.Errorf("advisorylock: disabling lock_timeout: %w", err)
|
||||
}
|
||||
// Restore the session default before this pooled connection is reused.
|
||||
defer func() { _, _ = conn.ExecContext(context.Background(), "RESET lock_timeout") }()
|
||||
|
||||
if _, err := conn.ExecContext(ctx, "SELECT pg_advisory_lock($1)", key); err != nil {
|
||||
return fmt.Errorf("advisorylock: acquiring lock %d: %w", key, err)
|
||||
}
|
||||
|
||||
@@ -158,6 +158,53 @@ var _ = Describe("AdvisoryLock", func() {
|
||||
Expect(err).To(HaveOccurred())
|
||||
})
|
||||
|
||||
It("waits out a short server-side lock_timeout instead of failing with 55P03", func() {
|
||||
const lockKey int64 = 703
|
||||
|
||||
// Reproduce the production deployment that triggered this: a short
|
||||
// global lock_timeout set on the database. Without the fix, a waiter
|
||||
// blocked on pg_advisory_lock() is aborted by the server after this
|
||||
// window and surfaces SQLSTATE 55P03 ("canceling statement due to
|
||||
// lock timeout") to the caller instead of waiting for its turn.
|
||||
Expect(db.Exec("ALTER DATABASE testdb SET lock_timeout = '300ms'").Error).ToNot(HaveOccurred())
|
||||
sqlDB, err := db.DB()
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
// Drop pooled connections so subsequent ones reconnect and inherit
|
||||
// the new database-level lock_timeout default.
|
||||
sqlDB.SetMaxIdleConns(0)
|
||||
|
||||
holding := make(chan struct{})
|
||||
released := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
herr := WithLockCtx(context.Background(), db, lockKey, func() error {
|
||||
close(holding)
|
||||
// Hold well past the 300ms server lock_timeout.
|
||||
time.Sleep(1 * time.Second)
|
||||
return nil
|
||||
})
|
||||
Expect(herr).ToNot(HaveOccurred())
|
||||
close(released)
|
||||
}()
|
||||
|
||||
<-holding // ensure the holder owns the lock before we contend
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
executed := false
|
||||
start := time.Now()
|
||||
werr := WithLockCtx(ctx, db, lockKey, func() error {
|
||||
executed = true
|
||||
return nil
|
||||
})
|
||||
Expect(werr).ToNot(HaveOccurred(),
|
||||
"waiter should wait out the in-progress hold, not fail with lock_timeout (55P03)")
|
||||
Expect(executed).To(BeTrue())
|
||||
Expect(time.Since(start)).To(BeNumerically(">=", 400*time.Millisecond),
|
||||
"waiter should have actually waited for the holder to release")
|
||||
<-released
|
||||
})
|
||||
|
||||
It("serializes concurrent WithLockCtx on same key", func() {
|
||||
const lockKey int64 = 702
|
||||
|
||||
|
||||
@@ -68,6 +68,13 @@ type SmartRouterOptions struct {
|
||||
// the absolute model paths untouched so the worker loads them directly from
|
||||
// the shared volume (#10556). See config.DistributedConfig.SharedModels.
|
||||
SharedModels bool
|
||||
// ModelLoadCeiling is the hard upper bound on how long a single cold-load
|
||||
// attempt (node selection -> backend install -> file staging -> LoadModel)
|
||||
// may run while holding the per-model advisory lock. It backstops every
|
||||
// sub-step's own timeout so a wedged worker can never pin the lock - and
|
||||
// every other replica's request for that model - indefinitely. Zero selects
|
||||
// defaultModelLoadCeiling.
|
||||
ModelLoadCeiling time.Duration
|
||||
}
|
||||
|
||||
// SmartRouter routes inference requests to the best available backend node.
|
||||
@@ -101,8 +108,18 @@ type SmartRouter struct {
|
||||
// sharedModels skips file staging when all nodes mount the same models
|
||||
// directory at the same path (see SmartRouterOptions.SharedModels).
|
||||
sharedModels bool
|
||||
// modelLoadCeiling bounds how long a cold load may hold the per-model
|
||||
// advisory lock (see SmartRouterOptions.ModelLoadCeiling).
|
||||
modelLoadCeiling time.Duration
|
||||
}
|
||||
|
||||
// defaultModelLoadCeiling is the fallback hold ceiling for a cold model load.
|
||||
// It must comfortably exceed the slowest legitimate load - a multi-GB backend
|
||||
// install (DefaultBackendInstallTimeout, 15m) plus staging and the remote
|
||||
// LoadModel (5m) - so it never cuts a real load short; it only ever fires when
|
||||
// a step is genuinely wedged (e.g. a worker that died mid-install).
|
||||
const defaultModelLoadCeiling = 25 * time.Minute
|
||||
|
||||
// probeCacheTTL is how long a successful gRPC HealthCheck on a backend is
|
||||
// trusted before the next request re-probes. Matches healthCheckTTL in
|
||||
// pkg/model/model.go so the single-process and distributed paths share a
|
||||
@@ -117,6 +134,10 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter
|
||||
if factory == nil {
|
||||
factory = &tokenClientFactory{token: opts.AuthToken}
|
||||
}
|
||||
ceiling := opts.ModelLoadCeiling
|
||||
if ceiling <= 0 {
|
||||
ceiling = defaultModelLoadCeiling
|
||||
}
|
||||
return &SmartRouter{
|
||||
registry: registry,
|
||||
unloader: opts.Unloader,
|
||||
@@ -131,6 +152,7 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter
|
||||
prefixConfig: opts.PrefixConfig,
|
||||
pressure: opts.Pressure,
|
||||
sharedModels: opts.SharedModels,
|
||||
modelLoadCeiling: ceiling,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -383,11 +405,19 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
|
||||
// the request context. If staging were bound to it, the multi-GB upload
|
||||
// aborts with "context canceled" mid-transfer and large models can never
|
||||
// finish staging (the model-load outage). WithoutCancel keeps the request's
|
||||
// values (prefix chain, etc.) but drops its cancellation/deadline. Each
|
||||
// long step still has its own bound (the file stager's resume budget,
|
||||
// LoadModel's 5m timeout), and the per-model advisory lock below de-dupes
|
||||
// concurrent loaders across replicas.
|
||||
loadCtx := context.WithoutCancel(ctx)
|
||||
// values (prefix chain, etc.) but drops its cancellation/deadline.
|
||||
//
|
||||
// Detaching from the caller is necessary, but it must not be unbounded: the
|
||||
// load runs while holding the per-model advisory lock, and a worker that
|
||||
// dies mid-install (its backend.install never replies) would otherwise pin
|
||||
// that lock (and every other replica's request for the same model) until
|
||||
// the NATS install deadline alone expires. Re-impose a single hard ceiling
|
||||
// over the whole sequence so the lock is always released in bounded time,
|
||||
// even if a sub-step wedges. Each long step still has its own (tighter)
|
||||
// bound; this only backstops them. The per-model advisory lock below
|
||||
// de-dupes concurrent loaders across replicas.
|
||||
loadCtx, cancelLoad := context.WithTimeout(context.WithoutCancel(ctx), r.modelLoadCeiling)
|
||||
defer cancelLoad()
|
||||
loadModel := func(ctx context.Context) (*RouteResult, error) {
|
||||
// Re-check after acquiring lock — another request may have loaded it
|
||||
node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs, pref)
|
||||
@@ -916,7 +946,14 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
|
||||
}
|
||||
|
||||
key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex)
|
||||
v, err, _ := r.installFlight.Do(key, func() (any, error) {
|
||||
// DoChan rather than Do so this wait honors ctx cancellation. InstallBackend
|
||||
// blocks for its full NATS deadline (15m by default) when a worker accepts
|
||||
// the request but never replies (e.g. it died mid-install). Without ctx
|
||||
// awareness the caller (holding the per-model advisory lock) would sit there
|
||||
// the whole time; here a cancelled ctx (typically the model-load ceiling)
|
||||
// frees the caller promptly. The shared install keeps running in the
|
||||
// background and still coalesces other callers via singleflight.
|
||||
resCh := r.installFlight.DoChan(key, func() (any, error) {
|
||||
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
@@ -931,10 +968,15 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
|
||||
}
|
||||
return addr, nil
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return "", ctx.Err()
|
||||
case res := <-resCh:
|
||||
if res.Err != nil {
|
||||
return "", res.Err
|
||||
}
|
||||
return res.Val.(string), nil
|
||||
}
|
||||
return v.(string), nil
|
||||
}
|
||||
|
||||
func (r *SmartRouter) buildClientForAddr(node *BackendNode, addr string, parallel bool) grpc.Backend {
|
||||
|
||||
@@ -493,6 +493,44 @@ var _ = Describe("SmartRouter", func() {
|
||||
Expect(result.Node.ID).To(Equal("n3"))
|
||||
})
|
||||
})
|
||||
|
||||
Context("worker wedges mid-install (dead node holding the lock)", func() {
|
||||
It("aborts the load at the ModelLoadCeiling instead of blocking forever", func() {
|
||||
// Simulate the production incident: the chosen worker accepts the
|
||||
// backend.install but never replies (it died), so InstallBackend
|
||||
// would otherwise block for its full NATS deadline (15m by
|
||||
// default) while pinning the per-model advisory lock. Route must
|
||||
// give up at the ceiling so the lock is released promptly.
|
||||
reg.findAndLockErr = errors.New("not found")
|
||||
reg.findIdleNode = &BackendNode{ID: "n4", Name: "dead-node", Address: "10.0.0.4:50051"}
|
||||
|
||||
block := make(chan struct{})
|
||||
defer close(block) // let the background install goroutine drain at test end
|
||||
unloader.installHook = func() { <-block }
|
||||
|
||||
router := NewSmartRouter(reg, SmartRouterOptions{
|
||||
Unloader: unloader,
|
||||
ClientFactory: factory,
|
||||
ModelLoadCeiling: 200 * time.Millisecond,
|
||||
})
|
||||
|
||||
done := make(chan error, 1)
|
||||
start := time.Now()
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
_, err := router.Route(context.Background(), "wedged-model",
|
||||
"models/wedged.gguf", "llama-cpp",
|
||||
&pb.ModelOptions{Model: "models/wedged.gguf"}, false)
|
||||
done <- err
|
||||
}()
|
||||
|
||||
var routeErr error
|
||||
Eventually(done, 5*time.Second).Should(Receive(&routeErr),
|
||||
"Route must not block on a wedged install past the ceiling")
|
||||
Expect(routeErr).To(HaveOccurred())
|
||||
Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Describe("scheduleNewModel (mock-based, via Route)", func() {
|
||||
|
||||
Reference in New Issue
Block a user