diff --git a/core/services/advisorylock/advisorylock.go b/core/services/advisorylock/advisorylock.go index 1ef0acffc..a37403c31 100644 --- a/core/services/advisorylock/advisorylock.go +++ b/core/services/advisorylock/advisorylock.go @@ -6,10 +6,39 @@ import ( "hash/fnv" "strings" "sync" + "time" "gorm.io/gorm" ) +// advisoryLockWaitBackstop bounds, server-side, how long we will wait to +// acquire a blocking advisory lock when the caller's context carries no +// deadline (e.g. a startup schema migration using context.Background()). It +// only exists so such a caller cannot hang forever behind a holder whose +// session never releases the lock; it is far longer than any legitimate +// guarded section. A var (not const) so tests can shrink it. +var advisoryLockWaitBackstop = 30 * time.Minute + +// advisoryLockTimeoutMargin is added to a context's remaining budget when +// deriving the server-side lock_timeout, so the Go context's own (cleaner) +// cancellation fires first and the server bound is only ever a backstop. +const advisoryLockTimeoutMargin = 30 * time.Second + +// advisoryLockWaitBudget returns the server-side lock_timeout to use for a +// blocking acquire: the caller context's remaining time plus a margin (so the +// Go context still governs), or the backstop when the context has no deadline. +// Never returns zero - "wait forever" must not be possible. +func advisoryLockWaitBudget(ctx context.Context) time.Duration { + if dl, ok := ctx.Deadline(); ok { + budget := time.Until(dl) + advisoryLockTimeoutMargin + if budget < time.Second { + budget = time.Second + } + return budget + } + return advisoryLockWaitBackstop +} + // localLocks holds one buffered channel (capacity 1) per lock key, used as an // in-process mutex for non-PostgreSQL dialects (SQLite). A SQLite auth DB is // effectively single-process, so serializing guarded sections within this @@ -130,16 +159,23 @@ func WithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) e } defer conn.Close() - // Neutralize any deployment-wide lock_timeout on this dedicated connection. + // Override any deployment-wide lock_timeout on this dedicated connection. // Operators commonly set a short global lock_timeout (on the role or // database) to bound ordinary row-lock waits. Applied to the blocking // pg_advisory_lock below, it aborts the wait with SQLSTATE 55P03 and turns // LocalAI's intentional cross-replica "wait your turn, then re-check" // coordination into a hard error for the caller (e.g. a chat request that - // just wanted to reuse a model another replica is loading). Let the Go - // context be the single source of truth for how long we wait instead. - if _, err := conn.ExecContext(ctx, "SET lock_timeout = 0"); err != nil { - return fmt.Errorf("advisorylock: disabling lock_timeout: %w", err) + // just wanted to reuse a model another replica is loading). + // + // We do NOT disable it outright (lock_timeout = 0 would wait forever, which + // is unsafe for the schema-migration callers that pass context.Background()). + // Instead we set a bound derived from the caller's context: its remaining + // budget plus a margin so the Go context's cancellation wins with a clean + // error, or a finite backstop when the context has no deadline. + waitBudget := advisoryLockWaitBudget(ctx) + if _, err := conn.ExecContext(ctx, + fmt.Sprintf("SET lock_timeout = %d", waitBudget.Milliseconds())); err != nil { + return fmt.Errorf("advisorylock: setting lock_timeout: %w", err) } // Restore the session default before this pooled connection is reused. defer func() { _, _ = conn.ExecContext(context.Background(), "RESET lock_timeout") }() diff --git a/core/services/advisorylock/advisorylock_test.go b/core/services/advisorylock/advisorylock_test.go index e37034916..a77df56c4 100644 --- a/core/services/advisorylock/advisorylock_test.go +++ b/core/services/advisorylock/advisorylock_test.go @@ -205,6 +205,40 @@ var _ = Describe("AdvisoryLock", func() { <-released }) + It("bounds a deadline-less waiter with the backstop instead of waiting forever", func() { + const lockKey int64 = 704 + + // A caller with no context deadline (e.g. startup schema migration + // passing context.Background()) must not hang forever if the holder + // never releases. Shrink the backstop so the test is fast. + origBackstop := advisoryLockWaitBackstop + advisoryLockWaitBackstop = 500 * time.Millisecond + DeferCleanup(func() { advisoryLockWaitBackstop = origBackstop }) + + holding := make(chan struct{}) + release := make(chan struct{}) + go func() { + defer GinkgoRecover() + _ = WithLockCtx(context.Background(), db, lockKey, func() error { + close(holding) + <-release // hold until the test releases us + return nil + }) + }() + defer close(release) + + <-holding + + start := time.Now() + err := WithLockCtx(context.Background(), db, lockKey, func() error { + Fail("waiter should not have acquired the still-held lock") + return nil + }) + Expect(err).To(HaveOccurred(), "deadline-less waiter should give up at the backstop, not hang") + Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second), + "backstop must cap the wait well under the test timeout") + }) + It("serializes concurrent WithLockCtx on same key", func() { const lockKey int64 = 702