From 2972165e53a65b899c7a29e3bbd8d12bdfa584e9 Mon Sep 17 00:00:00 2001
From: Ettore Di Giacinto
Date: Tue, 30 Jun 2026 07:23:12 +0000
Subject: [PATCH] fix(distributed): bound advisory-lock wait instead of disabling lock_timeout

Setting lock_timeout = 0 to override a deployment's short global
lock_timeout meant "wait forever" server-side. Safe for
SmartRouter.Route (its loadCtx now carries the model-load ceiling) but
unsafe for the schema-migration callers that pass
context.Background(): a holder whose session never releases would hang
them indefinitely.

Derive the server-side lock_timeout from the caller's context instead:
its remaining budget plus a margin (so the Go context's cancellation
still wins with a clean error and the server bound is only a
backstop), or a finite 30m backstop when the context has no deadline.
Never zero - "wait forever" is no longer possible, while a
deployment's hostile short lock_timeout is still overridden so
legitimate cross-replica waits don't fail with 55P03.

Added a spec proving a deadline-less waiter gives up at the (shrunk)
backstop rather than hanging.

Signed-off-by: Ettore Di Giacinto
Assisted-by: Claude:claude-opus-4-8 [Claude Code]
---
 core/services/advisorylock/advisorylock.go    | 46 +++++++++++++++++--
 .../advisorylock/advisorylock_test.go         | 34 ++++++++++++++
 2 files changed, 75 insertions(+), 5 deletions(-)

diff --git a/core/services/advisorylock/advisorylock.go b/core/services/advisorylock/advisorylock.go
index 1ef0acffc..a37403c31 100644
--- a/core/services/advisorylock/advisorylock.go
+++ b/core/services/advisorylock/advisorylock.go
@@ -6,10 +6,39 @@ import (
 	"hash/fnv"
 	"strings"
 	"sync"
+	"time"

 	"gorm.io/gorm"
 )

+// advisoryLockWaitBackstop bounds, server-side, how long we will wait to
+// acquire a blocking advisory lock when the caller's context carries no
+// deadline (e.g. a startup schema migration using context.Background()). It
+// only exists so such a caller cannot hang forever behind a holder whose
+// session never releases the lock; it is far longer than any legitimate
+// guarded section. A var (not const) so tests can shrink it.
+var advisoryLockWaitBackstop = 30 * time.Minute
+
+// advisoryLockTimeoutMargin is added to a context's remaining budget when
+// deriving the server-side lock_timeout, so the Go context's own (cleaner)
+// cancellation fires first and the server bound is only ever a backstop.
+const advisoryLockTimeoutMargin = 30 * time.Second
+
+// advisoryLockWaitBudget returns the server-side lock_timeout to use for a
+// blocking acquire: the caller context's remaining time plus a margin (so the
+// Go context still governs), or the backstop when the context has no deadline.
+// Never returns zero - "wait forever" must not be possible.
+func advisoryLockWaitBudget(ctx context.Context) time.Duration {
+	if dl, ok := ctx.Deadline(); ok {
+		budget := time.Until(dl) + advisoryLockTimeoutMargin
+		if budget < time.Second {
+			budget = time.Second
+		}
+		return budget
+	}
+	return advisoryLockWaitBackstop
+}
+
 // localLocks holds one buffered channel (capacity 1) per lock key, used as an
 // in-process mutex for non-PostgreSQL dialects (SQLite). A SQLite auth DB is
 // effectively single-process, so serializing guarded sections within this
@@ -130,16 +159,23 @@ func WithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) e
 	}
 	defer conn.Close()

-	// Neutralize any deployment-wide lock_timeout on this dedicated connection.
+	// Override any deployment-wide lock_timeout on this dedicated connection.
 	// Operators commonly set a short global lock_timeout (on the role or
 	// database) to bound ordinary row-lock waits. Applied to the blocking
 	// pg_advisory_lock below, it aborts the wait with SQLSTATE 55P03 and turns
 	// LocalAI's intentional cross-replica "wait your turn, then re-check"
 	// coordination into a hard error for the caller (e.g. a chat request that
-	// just wanted to reuse a model another replica is loading). Let the Go
-	// context be the single source of truth for how long we wait instead.
-	if _, err := conn.ExecContext(ctx, "SET lock_timeout = 0"); err != nil {
-		return fmt.Errorf("advisorylock: disabling lock_timeout: %w", err)
+	// just wanted to reuse a model another replica is loading).
+	//
+	// We do NOT disable it outright (lock_timeout = 0 would wait forever, which
+	// is unsafe for the schema-migration callers that pass context.Background()).
+	// Instead we set a bound derived from the caller's context: its remaining
+	// budget plus a margin so the Go context's cancellation wins with a clean
+	// error, or a finite backstop when the context has no deadline.
+	waitBudget := advisoryLockWaitBudget(ctx)
+	if _, err := conn.ExecContext(ctx,
+		fmt.Sprintf("SET lock_timeout = %d", waitBudget.Milliseconds())); err != nil {
+		return fmt.Errorf("advisorylock: setting lock_timeout: %w", err)
 	}
 	// Restore the session default before this pooled connection is reused.
 	defer func() { _, _ = conn.ExecContext(context.Background(), "RESET lock_timeout") }()
diff --git a/core/services/advisorylock/advisorylock_test.go b/core/services/advisorylock/advisorylock_test.go
index e37034916..a77df56c4 100644
--- a/core/services/advisorylock/advisorylock_test.go
+++ b/core/services/advisorylock/advisorylock_test.go
@@ -205,6 +205,40 @@ var _ = Describe("AdvisoryLock", func() {
 		<-released
 	})

+	It("bounds a deadline-less waiter with the backstop instead of waiting forever", func() {
+		const lockKey int64 = 704
+
+		// A caller with no context deadline (e.g. startup schema migration
+		// passing context.Background()) must not hang forever if the holder
+		// never releases. Shrink the backstop so the test is fast.
+		origBackstop := advisoryLockWaitBackstop
+		advisoryLockWaitBackstop = 500 * time.Millisecond
+		DeferCleanup(func() { advisoryLockWaitBackstop = origBackstop })
+
+		holding := make(chan struct{})
+		release := make(chan struct{})
+		go func() {
+			defer GinkgoRecover()
+			_ = WithLockCtx(context.Background(), db, lockKey, func() error {
+				close(holding)
+				<-release // hold until the test releases us
+				return nil
+			})
+		}()
+		defer close(release)
+
+		<-holding
+
+		start := time.Now()
+		err := WithLockCtx(context.Background(), db, lockKey, func() error {
+			Fail("waiter should not have acquired the still-held lock")
+			return nil
+		})
+		Expect(err).To(HaveOccurred(), "deadline-less waiter should give up at the backstop, not hang")
+		Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second),
+			"backstop must cap the wait well under the test timeout")
+	})
+
 	It("serializes concurrent WithLockCtx on same key", func() {
 		const lockKey int64 = 702