From a73516f9b488aa174b6ea4208c85c67e0cc0b292 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Mon, 29 Jun 2026 22:51:17 +0000 Subject: [PATCH] fix(distributed): don't let a dead worker pin the model-load advisory lock In distributed mode a chat request could fail with: failed to route model with internal loader: routing model ...: loading model ...: advisorylock: acquiring lock : ERROR: canceling statement due to lock timeout (SQLSTATE 55P03) Root cause is two independent defects in the cross-replica model-load path: 1. SmartRouter.Route holds a per-model PostgreSQL advisory lock for the whole cold-load sequence, which includes installBackendOnNode -> InstallBackend, a NATS request-reply with a 15m deadline (DefaultBackendInstallTimeout) that ignored ctx. When the chosen worker died mid-install, the holder sat on the lock for up to 15m. The detached loadCtx (WithoutCancel) had no deadline, so nothing capped the hold. 2. The acquiring statement, pg_advisory_lock(), is subject to any deployment global lock_timeout. A common operator setting (e.g. 10s) aborts the wait with SQLSTATE 55P03, so every other replica's request for that model hard -errored instead of waiting for the in-progress load and reusing it. For the ~15m window the model was effectively unroutable. Fixes: - advisorylock.WithLockCtx (postgres): SET lock_timeout = 0 on its dedicated connection (RESET before it returns to the pool) so the Go context, not a deployment-wide GUC, governs how long we wait. Waiters now block and then re-check, reusing the model another replica just loaded. - SmartRouter: bound the detached loadCtx with a single ModelLoadCeiling so the lock is always released in bounded time even if a sub-step wedges. Default is the configured backend.install deadline + 10m (staging + LoadModel margin), so a legitimately slow load is never cut. - installBackendOnNode: use singleflight.DoChan + select on ctx.Done() so the install wait honors cancellation; the ceiling can then actually free a caller pinned behind a dead worker. The shared install still coalesces via singleflight. Reproduced both defects as failing tests first (a real 55P03 against a testcontainer with a short lock_timeout; a wedged install that blocks Route) and confirmed green. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] --- core/application/distributed.go | 6 ++ core/services/advisorylock/advisorylock.go | 14 +++++ .../advisorylock/advisorylock_test.go | 47 +++++++++++++++ core/services/nodes/router.go | 60 ++++++++++++++++--- core/services/nodes/router_test.go | 38 ++++++++++++ 5 files changed, 156 insertions(+), 9 deletions(-) diff --git a/core/application/distributed.go b/core/application/distributed.go index a1efe0304..40cc9fff3 100644 --- a/core/application/distributed.go +++ b/core/application/distributed.go @@ -356,6 +356,12 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade PrefixConfig: prefixCfg, Pressure: pressure, SharedModels: cfg.Distributed.SharedModels, + // Cap how long a cold load may hold the per-model advisory lock: the + // configured backend.install deadline plus a margin for file staging and + // the remote LoadModel. Derived from the install timeout so raising it + // (for slow links pulling multi-GB images) widens the ceiling too, + // instead of letting the static default cut a legitimately slow load. + ModelLoadCeiling: cfg.Distributed.BackendInstallTimeoutOrDefault() + 10*time.Minute, }) // Wire staging-progress broadcasting so file-staging shows up on every diff --git a/core/services/advisorylock/advisorylock.go b/core/services/advisorylock/advisorylock.go index f51a6357e..1ef0acffc 100644 --- a/core/services/advisorylock/advisorylock.go +++ b/core/services/advisorylock/advisorylock.go @@ -130,6 +130,20 @@ func WithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) e } defer conn.Close() + // Neutralize any deployment-wide lock_timeout on this dedicated connection. + // Operators commonly set a short global lock_timeout (on the role or + // database) to bound ordinary row-lock waits. Applied to the blocking + // pg_advisory_lock below, it aborts the wait with SQLSTATE 55P03 and turns + // LocalAI's intentional cross-replica "wait your turn, then re-check" + // coordination into a hard error for the caller (e.g. a chat request that + // just wanted to reuse a model another replica is loading). Let the Go + // context be the single source of truth for how long we wait instead. + if _, err := conn.ExecContext(ctx, "SET lock_timeout = 0"); err != nil { + return fmt.Errorf("advisorylock: disabling lock_timeout: %w", err) + } + // Restore the session default before this pooled connection is reused. + defer func() { _, _ = conn.ExecContext(context.Background(), "RESET lock_timeout") }() + if _, err := conn.ExecContext(ctx, "SELECT pg_advisory_lock($1)", key); err != nil { return fmt.Errorf("advisorylock: acquiring lock %d: %w", key, err) } diff --git a/core/services/advisorylock/advisorylock_test.go b/core/services/advisorylock/advisorylock_test.go index 709f77c29..e37034916 100644 --- a/core/services/advisorylock/advisorylock_test.go +++ b/core/services/advisorylock/advisorylock_test.go @@ -158,6 +158,53 @@ var _ = Describe("AdvisoryLock", func() { Expect(err).To(HaveOccurred()) }) + It("waits out a short server-side lock_timeout instead of failing with 55P03", func() { + const lockKey int64 = 703 + + // Reproduce the production deployment that triggered this: a short + // global lock_timeout set on the database. Without the fix, a waiter + // blocked on pg_advisory_lock() is aborted by the server after this + // window and surfaces SQLSTATE 55P03 ("canceling statement due to + // lock timeout") to the caller instead of waiting for its turn. + Expect(db.Exec("ALTER DATABASE testdb SET lock_timeout = '300ms'").Error).ToNot(HaveOccurred()) + sqlDB, err := db.DB() + Expect(err).ToNot(HaveOccurred()) + // Drop pooled connections so subsequent ones reconnect and inherit + // the new database-level lock_timeout default. + sqlDB.SetMaxIdleConns(0) + + holding := make(chan struct{}) + released := make(chan struct{}) + go func() { + defer GinkgoRecover() + herr := WithLockCtx(context.Background(), db, lockKey, func() error { + close(holding) + // Hold well past the 300ms server lock_timeout. + time.Sleep(1 * time.Second) + return nil + }) + Expect(herr).ToNot(HaveOccurred()) + close(released) + }() + + <-holding // ensure the holder owns the lock before we contend + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + executed := false + start := time.Now() + werr := WithLockCtx(ctx, db, lockKey, func() error { + executed = true + return nil + }) + Expect(werr).ToNot(HaveOccurred(), + "waiter should wait out the in-progress hold, not fail with lock_timeout (55P03)") + Expect(executed).To(BeTrue()) + Expect(time.Since(start)).To(BeNumerically(">=", 400*time.Millisecond), + "waiter should have actually waited for the holder to release") + <-released + }) + It("serializes concurrent WithLockCtx on same key", func() { const lockKey int64 = 702 diff --git a/core/services/nodes/router.go b/core/services/nodes/router.go index 664672e39..670c9ca21 100644 --- a/core/services/nodes/router.go +++ b/core/services/nodes/router.go @@ -68,6 +68,13 @@ type SmartRouterOptions struct { // the absolute model paths untouched so the worker loads them directly from // the shared volume (#10556). See config.DistributedConfig.SharedModels. SharedModels bool + // ModelLoadCeiling is the hard upper bound on how long a single cold-load + // attempt (node selection -> backend install -> file staging -> LoadModel) + // may run while holding the per-model advisory lock. It backstops every + // sub-step's own timeout so a wedged worker can never pin the lock - and + // every other replica's request for that model - indefinitely. Zero selects + // defaultModelLoadCeiling. + ModelLoadCeiling time.Duration } // SmartRouter routes inference requests to the best available backend node. @@ -101,8 +108,18 @@ type SmartRouter struct { // sharedModels skips file staging when all nodes mount the same models // directory at the same path (see SmartRouterOptions.SharedModels). sharedModels bool + // modelLoadCeiling bounds how long a cold load may hold the per-model + // advisory lock (see SmartRouterOptions.ModelLoadCeiling). + modelLoadCeiling time.Duration } +// defaultModelLoadCeiling is the fallback hold ceiling for a cold model load. +// It must comfortably exceed the slowest legitimate load - a multi-GB backend +// install (DefaultBackendInstallTimeout, 15m) plus staging and the remote +// LoadModel (5m) - so it never cuts a real load short; it only ever fires when +// a step is genuinely wedged (e.g. a worker that died mid-install). +const defaultModelLoadCeiling = 25 * time.Minute + // probeCacheTTL is how long a successful gRPC HealthCheck on a backend is // trusted before the next request re-probes. Matches healthCheckTTL in // pkg/model/model.go so the single-process and distributed paths share a @@ -117,6 +134,10 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter if factory == nil { factory = &tokenClientFactory{token: opts.AuthToken} } + ceiling := opts.ModelLoadCeiling + if ceiling <= 0 { + ceiling = defaultModelLoadCeiling + } return &SmartRouter{ registry: registry, unloader: opts.Unloader, @@ -131,6 +152,7 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter prefixConfig: opts.PrefixConfig, pressure: opts.Pressure, sharedModels: opts.SharedModels, + modelLoadCeiling: ceiling, } } @@ -383,11 +405,19 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType // the request context. If staging were bound to it, the multi-GB upload // aborts with "context canceled" mid-transfer and large models can never // finish staging (the model-load outage). WithoutCancel keeps the request's - // values (prefix chain, etc.) but drops its cancellation/deadline. Each - // long step still has its own bound (the file stager's resume budget, - // LoadModel's 5m timeout), and the per-model advisory lock below de-dupes - // concurrent loaders across replicas. - loadCtx := context.WithoutCancel(ctx) + // values (prefix chain, etc.) but drops its cancellation/deadline. + // + // Detaching from the caller is necessary, but it must not be unbounded: the + // load runs while holding the per-model advisory lock, and a worker that + // dies mid-install (its backend.install never replies) would otherwise pin + // that lock (and every other replica's request for the same model) until + // the NATS install deadline alone expires. Re-impose a single hard ceiling + // over the whole sequence so the lock is always released in bounded time, + // even if a sub-step wedges. Each long step still has its own (tighter) + // bound; this only backstops them. The per-model advisory lock below + // de-dupes concurrent loaders across replicas. + loadCtx, cancelLoad := context.WithTimeout(context.WithoutCancel(ctx), r.modelLoadCeiling) + defer cancelLoad() loadModel := func(ctx context.Context) (*RouteResult, error) { // Re-check after acquiring lock — another request may have loaded it node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs, pref) @@ -916,7 +946,14 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod } key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex) - v, err, _ := r.installFlight.Do(key, func() (any, error) { + // DoChan rather than Do so this wait honors ctx cancellation. InstallBackend + // blocks for its full NATS deadline (15m by default) when a worker accepts + // the request but never replies (e.g. it died mid-install). Without ctx + // awareness the caller (holding the per-model advisory lock) would sit there + // the whole time; here a cancelled ctx (typically the model-load ceiling) + // frees the caller promptly. The shared install keeps running in the + // background and still coalesces other callers via singleflight. + resCh := r.installFlight.DoChan(key, func() (any, error) { reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil) if err != nil { return "", err @@ -931,10 +968,15 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod } return addr, nil }) - if err != nil { - return "", err + select { + case <-ctx.Done(): + return "", ctx.Err() + case res := <-resCh: + if res.Err != nil { + return "", res.Err + } + return res.Val.(string), nil } - return v.(string), nil } func (r *SmartRouter) buildClientForAddr(node *BackendNode, addr string, parallel bool) grpc.Backend { diff --git a/core/services/nodes/router_test.go b/core/services/nodes/router_test.go index e97dae122..81696ac4d 100644 --- a/core/services/nodes/router_test.go +++ b/core/services/nodes/router_test.go @@ -493,6 +493,44 @@ var _ = Describe("SmartRouter", func() { Expect(result.Node.ID).To(Equal("n3")) }) }) + + Context("worker wedges mid-install (dead node holding the lock)", func() { + It("aborts the load at the ModelLoadCeiling instead of blocking forever", func() { + // Simulate the production incident: the chosen worker accepts the + // backend.install but never replies (it died), so InstallBackend + // would otherwise block for its full NATS deadline (15m by + // default) while pinning the per-model advisory lock. Route must + // give up at the ceiling so the lock is released promptly. + reg.findAndLockErr = errors.New("not found") + reg.findIdleNode = &BackendNode{ID: "n4", Name: "dead-node", Address: "10.0.0.4:50051"} + + block := make(chan struct{}) + defer close(block) // let the background install goroutine drain at test end + unloader.installHook = func() { <-block } + + router := NewSmartRouter(reg, SmartRouterOptions{ + Unloader: unloader, + ClientFactory: factory, + ModelLoadCeiling: 200 * time.Millisecond, + }) + + done := make(chan error, 1) + start := time.Now() + go func() { + defer GinkgoRecover() + _, err := router.Route(context.Background(), "wedged-model", + "models/wedged.gguf", "llama-cpp", + &pb.ModelOptions{Model: "models/wedged.gguf"}, false) + done <- err + }() + + var routeErr error + Eventually(done, 5*time.Second).Should(Receive(&routeErr), + "Route must not block on a wedged install past the ceiling") + Expect(routeErr).To(HaveOccurred()) + Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second)) + }) + }) }) Describe("scheduleNewModel (mock-based, via Route)", func() {