Compare commits

..

3 Commits

Author SHA1 Message Date
Ettore Di Giacinto
9d896a37b2 Merge branch 'master' into fix/distributed-model-load-lock-timeout 2026-06-30 22:34:00 +02:00
Ettore Di Giacinto
2972165e53 fix(distributed): bound advisory-lock wait instead of disabling lock_timeout
Setting lock_timeout = 0 to override a deployment's short global lock_timeout
meant "wait forever" server-side. Safe for SmartRouter.Route (its loadCtx now
carries the model-load ceiling) but unsafe for the schema-migration callers
that pass context.Background(): a holder whose session never releases would
hang them indefinitely.

Derive the server-side lock_timeout from the caller's context instead: its
remaining budget plus a margin (so the Go context's cancellation still wins
with a clean error and the server bound is only a backstop), or a finite
30m backstop when the context has no deadline. Never zero - "wait forever"
is no longer possible, while a deployment's hostile short lock_timeout is
still overridden so legitimate cross-replica waits don't fail with 55P03.

Added a spec proving a deadline-less waiter gives up at the (shrunk) backstop
rather than hanging.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]
2026-06-30 07:23:12 +00:00
Ettore Di Giacinto
a73516f9b4 fix(distributed): don't let a dead worker pin the model-load advisory lock
In distributed mode a chat request could fail with:

  failed to route model with internal loader: routing model ...:
  loading model ...: advisorylock: acquiring lock <id>:
  ERROR: canceling statement due to lock timeout (SQLSTATE 55P03)

Root cause is two independent defects in the cross-replica model-load path:

1. SmartRouter.Route holds a per-model PostgreSQL advisory lock for the whole
   cold-load sequence, which includes installBackendOnNode -> InstallBackend,
   a NATS request-reply with a 15m deadline (DefaultBackendInstallTimeout) that
   ignored ctx. When the chosen worker died mid-install, the holder sat on the
   lock for up to 15m. The detached loadCtx (WithoutCancel) had no deadline, so
   nothing capped the hold.

2. The acquiring statement, pg_advisory_lock(), is subject to any deployment
   global lock_timeout. A common operator setting (e.g. 10s) aborts the wait
   with SQLSTATE 55P03, so every other replica's request for that model hard
   -errored instead of waiting for the in-progress load and reusing it. For the
   ~15m window the model was effectively unroutable.

Fixes:

- advisorylock.WithLockCtx (postgres): SET lock_timeout = 0 on its dedicated
  connection (RESET before it returns to the pool) so the Go context, not a
  deployment-wide GUC, governs how long we wait. Waiters now block and then
  re-check, reusing the model another replica just loaded.

- SmartRouter: bound the detached loadCtx with a single ModelLoadCeiling so the
  lock is always released in bounded time even if a sub-step wedges. Default is
  the configured backend.install deadline + 10m (staging + LoadModel margin),
  so a legitimately slow load is never cut.

- installBackendOnNode: use singleflight.DoChan + select on ctx.Done() so the
  install wait honors cancellation; the ceiling can then actually free a caller
  pinned behind a dead worker. The shared install still coalesces via
  singleflight.

Reproduced both defects as failing tests first (a real 55P03 against a
testcontainer with a short lock_timeout; a wedged install that blocks Route)
and confirmed green.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]
2026-06-29 22:51:17 +00:00
17 changed files with 236 additions and 303 deletions

View File

@@ -1,5 +1,5 @@
IK_LLAMA_VERSION?=29431b31c89e79c10f8736e8f2742485ba1713d6
IK_LLAMA_VERSION?=f74a6fb87b315b2c3154166e075360e15021a61d
LLAMA_REPO?=https://github.com/ikawrakow/ik_llama.cpp
CMAKE_ARGS?=

View File

@@ -1,5 +1,5 @@
LLAMA_VERSION?=0eca4d490e591d4e93058d07540cf47278a72577
LLAMA_VERSION?=6f4f53f2b7da54fcdbbecaaa734337c337ad6176
LLAMA_REPO?=https://github.com/ggerganov/llama.cpp
CMAKE_ARGS?=

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# CrispASR version (release tag)
CRISPASR_REPO?=https://github.com/CrispStrobe/CrispASR
CRISPASR_VERSION?=8fd9db8fec8cb5e929d23d3267ed5817794feb1a
CRISPASR_VERSION?=3b93758f9725d400eca82976f895e4cec3f31260
SO_TARGET?=libgocrispasr.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# stablediffusion.cpp (ggml)
STABLEDIFFUSION_GGML_REPO?=https://github.com/leejet/stable-diffusion.cpp
STABLEDIFFUSION_GGML_VERSION?=484baa41e5e006c52dcd4addc38c830b9489745f
STABLEDIFFUSION_GGML_VERSION?=3b6c9ca97cfcda8e68e719e6670d06379fcbe943
CMAKE_ARGS+=-DGGML_MAX_NAME=128

View File

@@ -798,7 +798,6 @@ void sd_img_gen_params_set_seed(sd_img_gen_params_t *params, int64_t seed) {
int gen_image(sd_img_gen_params_t *p, int steps, char *dst, float cfg_scale, char *src_image, float strength, char *mask_image, char* ref_images[], int ref_images_count) {
sd_image_t* results;
int num_results_out = 0;
std::vector<int> skip_layers = {7, 8, 9};
@@ -995,14 +994,10 @@ int gen_image(sd_img_gen_params_t *p, int steps, char *dst, float cfg_scale, cha
sd_ctx_params_to_str(&ctx_params),
sd_img_gen_params_to_str(p));
bool gen_ok = generate_image(sd_c, p, &results, &num_results_out);
results = generate_image(sd_c, p);
std::free(p);
if (!gen_ok || num_results_out == 0) {
results = NULL;
}
if (results == NULL) {
fprintf (stderr, "NO results\n");
if (input_image_buffer) free(input_image_buffer);

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# whisper.cpp version
WHISPER_REPO?=https://github.com/ggml-org/whisper.cpp
WHISPER_CPP_VERSION?=0874de3e8e8e48361dba85c7fe6d176f008bf158
WHISPER_CPP_VERSION?=0ae02cdb2c7317b50991367c165736ce42ed96ac
SO_TARGET?=libgowhisper.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -104,7 +104,7 @@ if [ "$(uname -s)" = "Darwin" ]; then
# can rewrite it. Darwin therefore follows vllm-metal and can lag the Linux
# vllm pin (requirements-cublas13-after.txt, bumped independently against
# vllm/vllm) until vllm-metal supports a newer vLLM.
VLLM_METAL_VERSION="v0.3.0.dev20260630095652"
VLLM_METAL_VERSION="v0.3.0.dev20260628073537"
# The coupled vLLM source version is whatever this vllm-metal release builds
# against -- it declares it in its own installer as `vllm_v=`. Derive it from

View File

@@ -3,8 +3,8 @@
# on a cu130 host. Pull the cu130-flavoured wheel from vLLM's per-tag index
# instead — the cublas13 case in install.sh adds --index-strategy=unsafe-best-match
# so uv consults this index alongside PyPI.
--extra-index-url https://wheels.vllm.ai/0.24.0/cu130
--extra-index-url https://wheels.vllm.ai/0.23.0/cu130
# VERSION COUPLING: darwin/Apple-Silicon builds use vllm-metal (see install.sh),
# which pins this exact vLLM version. Bumping vllm here means coordinating with a
# vllm-metal release that supports the new version, or macOS/Metal builds break.
vllm==0.24.0
vllm==0.23.0

View File

@@ -356,6 +356,12 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
PrefixConfig: prefixCfg,
Pressure: pressure,
SharedModels: cfg.Distributed.SharedModels,
// Cap how long a cold load may hold the per-model advisory lock: the
// configured backend.install deadline plus a margin for file staging and
// the remote LoadModel. Derived from the install timeout so raising it
// (for slow links pulling multi-GB images) widens the ceiling too,
// instead of letting the static default cut a legitimately slow load.
ModelLoadCeiling: cfg.Distributed.BackendInstallTimeoutOrDefault() + 10*time.Minute,
})
// Wire staging-progress broadcasting so file-staging shows up on every

View File

@@ -6,10 +6,39 @@ import (
"hash/fnv"
"strings"
"sync"
"time"
"gorm.io/gorm"
)
// advisoryLockWaitBackstop bounds, server-side, how long we will wait to
// acquire a blocking advisory lock when the caller's context carries no
// deadline (e.g. a startup schema migration using context.Background()). It
// only exists so such a caller cannot hang forever behind a holder whose
// session never releases the lock; it is far longer than any legitimate
// guarded section. A var (not const) so tests can shrink it.
var advisoryLockWaitBackstop = 30 * time.Minute
// advisoryLockTimeoutMargin is added to a context's remaining budget when
// deriving the server-side lock_timeout, so the Go context's own (cleaner)
// cancellation fires first and the server bound is only ever a backstop.
const advisoryLockTimeoutMargin = 30 * time.Second
// advisoryLockWaitBudget returns the server-side lock_timeout to use for a
// blocking acquire: the caller context's remaining time plus a margin (so the
// Go context still governs), or the backstop when the context has no deadline.
// Never returns zero - "wait forever" must not be possible.
func advisoryLockWaitBudget(ctx context.Context) time.Duration {
if dl, ok := ctx.Deadline(); ok {
budget := time.Until(dl) + advisoryLockTimeoutMargin
if budget < time.Second {
budget = time.Second
}
return budget
}
return advisoryLockWaitBackstop
}
// localLocks holds one buffered channel (capacity 1) per lock key, used as an
// in-process mutex for non-PostgreSQL dialects (SQLite). A SQLite auth DB is
// effectively single-process, so serializing guarded sections within this
@@ -130,6 +159,27 @@ func WithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) e
}
defer conn.Close()
// Override any deployment-wide lock_timeout on this dedicated connection.
// Operators commonly set a short global lock_timeout (on the role or
// database) to bound ordinary row-lock waits. Applied to the blocking
// pg_advisory_lock below, it aborts the wait with SQLSTATE 55P03 and turns
// LocalAI's intentional cross-replica "wait your turn, then re-check"
// coordination into a hard error for the caller (e.g. a chat request that
// just wanted to reuse a model another replica is loading).
//
// We do NOT disable it outright (lock_timeout = 0 would wait forever, which
// is unsafe for the schema-migration callers that pass context.Background()).
// Instead we set a bound derived from the caller's context: its remaining
// budget plus a margin so the Go context's cancellation wins with a clean
// error, or a finite backstop when the context has no deadline.
waitBudget := advisoryLockWaitBudget(ctx)
if _, err := conn.ExecContext(ctx,
fmt.Sprintf("SET lock_timeout = %d", waitBudget.Milliseconds())); err != nil {
return fmt.Errorf("advisorylock: setting lock_timeout: %w", err)
}
// Restore the session default before this pooled connection is reused.
defer func() { _, _ = conn.ExecContext(context.Background(), "RESET lock_timeout") }()
if _, err := conn.ExecContext(ctx, "SELECT pg_advisory_lock($1)", key); err != nil {
return fmt.Errorf("advisorylock: acquiring lock %d: %w", key, err)
}

View File

@@ -158,6 +158,87 @@ var _ = Describe("AdvisoryLock", func() {
Expect(err).To(HaveOccurred())
})
It("waits out a short server-side lock_timeout instead of failing with 55P03", func() {
const lockKey int64 = 703
// Reproduce the production deployment that triggered this: a short
// global lock_timeout set on the database. Without the fix, a waiter
// blocked on pg_advisory_lock() is aborted by the server after this
// window and surfaces SQLSTATE 55P03 ("canceling statement due to
// lock timeout") to the caller instead of waiting for its turn.
Expect(db.Exec("ALTER DATABASE testdb SET lock_timeout = '300ms'").Error).ToNot(HaveOccurred())
sqlDB, err := db.DB()
Expect(err).ToNot(HaveOccurred())
// Drop pooled connections so subsequent ones reconnect and inherit
// the new database-level lock_timeout default.
sqlDB.SetMaxIdleConns(0)
holding := make(chan struct{})
released := make(chan struct{})
go func() {
defer GinkgoRecover()
herr := WithLockCtx(context.Background(), db, lockKey, func() error {
close(holding)
// Hold well past the 300ms server lock_timeout.
time.Sleep(1 * time.Second)
return nil
})
Expect(herr).ToNot(HaveOccurred())
close(released)
}()
<-holding // ensure the holder owns the lock before we contend
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
executed := false
start := time.Now()
werr := WithLockCtx(ctx, db, lockKey, func() error {
executed = true
return nil
})
Expect(werr).ToNot(HaveOccurred(),
"waiter should wait out the in-progress hold, not fail with lock_timeout (55P03)")
Expect(executed).To(BeTrue())
Expect(time.Since(start)).To(BeNumerically(">=", 400*time.Millisecond),
"waiter should have actually waited for the holder to release")
<-released
})
It("bounds a deadline-less waiter with the backstop instead of waiting forever", func() {
const lockKey int64 = 704
// A caller with no context deadline (e.g. startup schema migration
// passing context.Background()) must not hang forever if the holder
// never releases. Shrink the backstop so the test is fast.
origBackstop := advisoryLockWaitBackstop
advisoryLockWaitBackstop = 500 * time.Millisecond
DeferCleanup(func() { advisoryLockWaitBackstop = origBackstop })
holding := make(chan struct{})
release := make(chan struct{})
go func() {
defer GinkgoRecover()
_ = WithLockCtx(context.Background(), db, lockKey, func() error {
close(holding)
<-release // hold until the test releases us
return nil
})
}()
defer close(release)
<-holding
start := time.Now()
err := WithLockCtx(context.Background(), db, lockKey, func() error {
Fail("waiter should not have acquired the still-held lock")
return nil
})
Expect(err).To(HaveOccurred(), "deadline-less waiter should give up at the backstop, not hang")
Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second),
"backstop must cap the wait well under the test timeout")
})
It("serializes concurrent WithLockCtx on same key", func() {
const lockKey int64 = 702

View File

@@ -68,6 +68,13 @@ type SmartRouterOptions struct {
// the absolute model paths untouched so the worker loads them directly from
// the shared volume (#10556). See config.DistributedConfig.SharedModels.
SharedModels bool
// ModelLoadCeiling is the hard upper bound on how long a single cold-load
// attempt (node selection -> backend install -> file staging -> LoadModel)
// may run while holding the per-model advisory lock. It backstops every
// sub-step's own timeout so a wedged worker can never pin the lock - and
// every other replica's request for that model - indefinitely. Zero selects
// defaultModelLoadCeiling.
ModelLoadCeiling time.Duration
}
// SmartRouter routes inference requests to the best available backend node.
@@ -101,8 +108,18 @@ type SmartRouter struct {
// sharedModels skips file staging when all nodes mount the same models
// directory at the same path (see SmartRouterOptions.SharedModels).
sharedModels bool
// modelLoadCeiling bounds how long a cold load may hold the per-model
// advisory lock (see SmartRouterOptions.ModelLoadCeiling).
modelLoadCeiling time.Duration
}
// defaultModelLoadCeiling is the fallback hold ceiling for a cold model load.
// It must comfortably exceed the slowest legitimate load - a multi-GB backend
// install (DefaultBackendInstallTimeout, 15m) plus staging and the remote
// LoadModel (5m) - so it never cuts a real load short; it only ever fires when
// a step is genuinely wedged (e.g. a worker that died mid-install).
const defaultModelLoadCeiling = 25 * time.Minute
// probeCacheTTL is how long a successful gRPC HealthCheck on a backend is
// trusted before the next request re-probes. Matches healthCheckTTL in
// pkg/model/model.go so the single-process and distributed paths share a
@@ -117,6 +134,10 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter
if factory == nil {
factory = &tokenClientFactory{token: opts.AuthToken}
}
ceiling := opts.ModelLoadCeiling
if ceiling <= 0 {
ceiling = defaultModelLoadCeiling
}
return &SmartRouter{
registry: registry,
unloader: opts.Unloader,
@@ -131,6 +152,7 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter
prefixConfig: opts.PrefixConfig,
pressure: opts.Pressure,
sharedModels: opts.SharedModels,
modelLoadCeiling: ceiling,
}
}
@@ -383,11 +405,19 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
// the request context. If staging were bound to it, the multi-GB upload
// aborts with "context canceled" mid-transfer and large models can never
// finish staging (the model-load outage). WithoutCancel keeps the request's
// values (prefix chain, etc.) but drops its cancellation/deadline. Each
// long step still has its own bound (the file stager's resume budget,
// LoadModel's 5m timeout), and the per-model advisory lock below de-dupes
// concurrent loaders across replicas.
loadCtx := context.WithoutCancel(ctx)
// values (prefix chain, etc.) but drops its cancellation/deadline.
//
// Detaching from the caller is necessary, but it must not be unbounded: the
// load runs while holding the per-model advisory lock, and a worker that
// dies mid-install (its backend.install never replies) would otherwise pin
// that lock (and every other replica's request for the same model) until
// the NATS install deadline alone expires. Re-impose a single hard ceiling
// over the whole sequence so the lock is always released in bounded time,
// even if a sub-step wedges. Each long step still has its own (tighter)
// bound; this only backstops them. The per-model advisory lock below
// de-dupes concurrent loaders across replicas.
loadCtx, cancelLoad := context.WithTimeout(context.WithoutCancel(ctx), r.modelLoadCeiling)
defer cancelLoad()
loadModel := func(ctx context.Context) (*RouteResult, error) {
// Re-check after acquiring lock — another request may have loaded it
node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs, pref)
@@ -916,7 +946,14 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
}
key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex)
v, err, _ := r.installFlight.Do(key, func() (any, error) {
// DoChan rather than Do so this wait honors ctx cancellation. InstallBackend
// blocks for its full NATS deadline (15m by default) when a worker accepts
// the request but never replies (e.g. it died mid-install). Without ctx
// awareness the caller (holding the per-model advisory lock) would sit there
// the whole time; here a cancelled ctx (typically the model-load ceiling)
// frees the caller promptly. The shared install keeps running in the
// background and still coalesces other callers via singleflight.
resCh := r.installFlight.DoChan(key, func() (any, error) {
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil)
if err != nil {
return "", err
@@ -931,10 +968,15 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
}
return addr, nil
})
if err != nil {
return "", err
select {
case <-ctx.Done():
return "", ctx.Err()
case res := <-resCh:
if res.Err != nil {
return "", res.Err
}
return res.Val.(string), nil
}
return v.(string), nil
}
func (r *SmartRouter) buildClientForAddr(node *BackendNode, addr string, parallel bool) grpc.Backend {

View File

@@ -493,6 +493,44 @@ var _ = Describe("SmartRouter", func() {
Expect(result.Node.ID).To(Equal("n3"))
})
})
Context("worker wedges mid-install (dead node holding the lock)", func() {
It("aborts the load at the ModelLoadCeiling instead of blocking forever", func() {
// Simulate the production incident: the chosen worker accepts the
// backend.install but never replies (it died), so InstallBackend
// would otherwise block for its full NATS deadline (15m by
// default) while pinning the per-model advisory lock. Route must
// give up at the ceiling so the lock is released promptly.
reg.findAndLockErr = errors.New("not found")
reg.findIdleNode = &BackendNode{ID: "n4", Name: "dead-node", Address: "10.0.0.4:50051"}
block := make(chan struct{})
defer close(block) // let the background install goroutine drain at test end
unloader.installHook = func() { <-block }
router := NewSmartRouter(reg, SmartRouterOptions{
Unloader: unloader,
ClientFactory: factory,
ModelLoadCeiling: 200 * time.Millisecond,
})
done := make(chan error, 1)
start := time.Now()
go func() {
defer GinkgoRecover()
_, err := router.Route(context.Background(), "wedged-model",
"models/wedged.gguf", "llama-cpp",
&pb.ModelOptions{Model: "models/wedged.gguf"}, false)
done <- err
}()
var routeErr error
Eventually(done, 5*time.Second).Should(Receive(&routeErr),
"Route must not block on a wedged install past the ceiling")
Expect(routeErr).To(HaveOccurred())
Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second))
})
})
})
Describe("scheduleNewModel (mock-based, via Route)", func() {

View File

@@ -1,3 +1,3 @@
{
"version": "v4.5.6"
"version": "v4.5.5"
}

View File

@@ -1,105 +0,0 @@
package grpc
import (
"log"
"os"
"runtime"
"strings"
"time"
)
// Backend worker processes (the per-model gRPC servers LocalAI spawns) are
// deliberately placed in their own process group by the process manager so
// LocalAI's graceful shutdown can signal the whole group. That graceful path
// (SIGTERM -> grace -> SIGKILL, driven by pkg/signals + pkg/model) only runs
// when LocalAI itself receives a catchable signal and lives long enough to run
// its handlers. If LocalAI is SIGKILLed (e.g. a supervising process's
// graceful-shutdown grace period elapses first), that teardown never runs and
// this backend would be reparented to init and linger, holding VRAM and its
// listen port.
//
// The watcher below is a best-effort backstop for exactly that case: it does
// NOT replace the graceful teardown, it only covers the "parent vanished
// without cleaning up" path. It works by detecting reparenting: when the
// process that spawned this backend dies, the kernel reparents us to the
// nearest sub-reaper or to init (PID 1), so getppid() stops matching the value
// we captured at startup. This getppid() approach is portable across
// Linux/macOS (unlike Linux-only PR_SET_PDEATHSIG), which is why it's used
// here rather than a kernel parent-death signal.
const (
// EnvBackendParentWatch toggles the parent-death watcher. It is enabled by
// default; set it to a falsey value ("false", "0", "no", "off") to disable
// (e.g. when running a backend standalone for debugging under a shell whose
// lifetime shouldn't govern the backend).
EnvBackendParentWatch = "LOCALAI_BACKEND_PARENT_WATCH"
// EnvBackendParentWatchInterval overrides the poll interval as a Go
// duration string (e.g. "500ms"). Defaults to defaultParentWatchInterval.
EnvBackendParentWatchInterval = "LOCALAI_BACKEND_PARENT_WATCH_INTERVAL"
defaultParentWatchInterval = 2 * time.Second
)
// parentWatchEnabled reports whether the watcher should run in this process.
func parentWatchEnabled() bool {
switch strings.ToLower(strings.TrimSpace(os.Getenv(EnvBackendParentWatch))) {
case "false", "0", "no", "off":
return false
}
// Windows does not reparent orphans to a well-known init PID, so the
// getppid() heuristic used here doesn't apply there.
return runtime.GOOS != "windows"
}
// parentWatchInterval returns the configured poll interval, or the default.
func parentWatchInterval() time.Duration {
if v := os.Getenv(EnvBackendParentWatchInterval); v != "" {
if d, err := time.ParseDuration(v); err == nil && d > 0 {
return d
}
}
return defaultParentWatchInterval
}
// parentDied reports whether this process has been reparented away from the
// parent it had when the watcher started. Reparenting is the standard POSIX
// signal that the original parent (here, the LocalAI process that spawned this
// backend) has exited: the orphan is handed to the nearest sub-reaper or to
// init (PID 1), so getppid() no longer matches the value captured at startup.
func parentDied(origPPID int) bool {
ppid := os.Getppid()
return ppid != origPPID || ppid == 1
}
// watchParentDeath polls until parentDied reports the original parent is gone,
// then invokes onDeath. It blocks, so run it in its own goroutine.
func watchParentDeath(origPPID int, interval time.Duration, onDeath func()) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
if parentDied(origPPID) {
onDeath()
return
}
}
}
// startParentDeathWatcher installs the best-effort safety net described above
// on the calling backend process. It is a no-op when disabled or on platforms
// where the mechanism doesn't apply. This is a backstop alongside — never a
// replacement for — LocalAI's graceful SIGTERM->grace->SIGKILL teardown.
func startParentDeathWatcher() {
if !parentWatchEnabled() {
return
}
origPPID := os.Getppid()
// A parent of 1 at startup means we were already orphaned (or launched
// directly under init) — there's no original parent to watch for.
if origPPID <= 1 {
return
}
interval := parentWatchInterval()
go watchParentDeath(origPPID, interval, func() {
log.Printf("backend parent process (pid %d) exited without stopping this backend; self-terminating to avoid orphaning", origPPID)
os.Exit(1)
})
}

View File

@@ -1,168 +0,0 @@
//go:build !windows
package grpc
import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"syscall"
"testing"
"time"
)
// These env vars drive the helper roles this test binary re-executes itself as
// (see the init() dispatcher). They are only set for the spawned child/
// grandchild processes, never for the normal `go test` invocation.
const (
envRole = "LOCALAI_PARENTWATCH_TEST_ROLE"
envReady = "LOCALAI_PARENTWATCH_TEST_READY" // grandchild writes its PID here once the watcher is armed
envExited = "LOCALAI_PARENTWATCH_TEST_EXITED" // grandchild writes here when it detects reparenting
)
// init dispatches the helper roles when this test binary is re-executed with a
// role set. It runs before the testing/Ginkgo machinery, and is a no-op during
// a normal test run (role unset).
func init() {
switch os.Getenv(envRole) {
case "middle":
runMiddleRole()
case "grandchild":
runGrandchildRole()
}
}
// childEnv returns the current environment with the parentwatch test role set
// to the given value (replacing any inherited role), leaving the ready/exited
// file paths inherited.
func childEnv(role string) []string {
out := make([]string, 0, len(os.Environ())+1)
for _, kv := range os.Environ() {
if len(kv) > len(envRole) && kv[:len(envRole)+1] == envRole+"=" {
continue
}
out = append(out, kv)
}
return append(out, envRole+"="+role)
}
// runGrandchildRole arms the REAL watchParentDeath against its current parent
// (the "middle" process), signals readiness, then blocks. When middle exits and
// we are reparented, the watcher fires and we record it before exiting.
func runGrandchildRole() {
exitedFile := os.Getenv(envExited)
readyFile := os.Getenv(envReady)
origPPID := os.Getppid()
go watchParentDeath(origPPID, 50*time.Millisecond, func() {
_ = os.WriteFile(exitedFile, []byte("1"), 0o644)
os.Exit(7)
})
// Safety valve: never linger if something goes wrong with the test.
go func() {
time.Sleep(30 * time.Second)
os.Exit(2)
}()
// Signal readiness only after the watcher captured origPPID, so middle
// won't exit before we've recorded it as our original parent.
_ = os.WriteFile(readyFile, []byte(strconv.Itoa(os.Getpid())), 0o644)
select {} // block until the watcher terminates us
}
// runMiddleRole spawns the grandchild (which arms the watcher against us),
// waits until it is ready, then exits — orphaning the grandchild so it gets
// reparented, which is what the watcher must detect.
func runMiddleRole() {
readyFile := os.Getenv(envReady)
self, err := os.Executable()
if err != nil {
os.Exit(3)
}
cmd := exec.Command(self)
cmd.Env = childEnv("grandchild")
// Own process group, mirroring how real backends are spawned, and discard
// std streams so the grandchild doesn't keep any parent pipe open.
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if err := cmd.Start(); err != nil {
os.Exit(4)
}
if !waitForFile(readyFile, 10*time.Second) {
os.Exit(5)
}
os.Exit(0) // orphan the grandchild
}
func waitForFile(path string, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if _, err := os.Stat(path); err == nil {
return true
}
time.Sleep(20 * time.Millisecond)
}
return false
}
// TestParentDeathWatcherDetectsReparent builds a genuine two-level process tree
// (test -> middle -> grandchild), lets the middle process die, and asserts the
// grandchild's watchParentDeath detects the reparenting and self-terminates.
func TestParentDeathWatcherDetectsReparent(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("parent-death watcher is not supported on windows")
}
dir := t.TempDir()
readyFile := filepath.Join(dir, "ready")
exitedFile := filepath.Join(dir, "exited")
self, err := os.Executable()
if err != nil {
t.Fatalf("cannot resolve test executable: %v", err)
}
middle := exec.Command(self)
middle.Env = append(childEnv("middle"),
envReady+"="+readyFile,
envExited+"="+exitedFile,
)
// Discard the helpers' output; keep the test log clean.
middle.Stdout = nil
middle.Stderr = nil
if err := middle.Start(); err != nil {
t.Fatalf("failed to start middle helper: %v", err)
}
// Wait only for the middle process; the grandchild is intentionally left
// orphaned. No pipes are shared, so this returns as soon as middle exits.
if err := middle.Wait(); err != nil {
t.Fatalf("middle helper exited with error: %v", err)
}
// The grandchild must have armed the watcher (and thus captured middle as
// its parent) before middle exited.
if _, err := os.Stat(readyFile); err != nil {
t.Fatalf("grandchild never signaled readiness: %v", err)
}
// Best-effort cleanup in case the watcher somehow doesn't fire.
t.Cleanup(func() {
if b, err := os.ReadFile(readyFile); err == nil {
if pid, err := strconv.Atoi(string(b)); err == nil {
_ = syscall.Kill(pid, syscall.SIGKILL)
}
}
})
// Now that middle is gone, the grandchild has been reparented; the watcher
// must notice and write the exited marker.
if !waitForFile(exitedFile, 10*time.Second) {
t.Fatalf("watcher did not detect parent death within timeout")
}
}

View File

@@ -939,9 +939,6 @@ func StartServer(address string, model AIModel) error {
s := grpc.NewServer(serverOpts()...)
pb.RegisterBackendServer(s, &server{llm: model})
log.Printf("gRPC Server listening at %v", lis.Addr())
// Safety net: self-terminate if the LocalAI process that spawned this
// backend dies without running its graceful teardown (see parentwatch.go).
startParentDeathWatcher()
if err := s.Serve(lis); err != nil {
return err
}
@@ -957,9 +954,6 @@ func RunServer(address string, model AIModel) (func() error, error) {
s := grpc.NewServer(serverOpts()...)
pb.RegisterBackendServer(s, &server{llm: model})
log.Printf("gRPC Server listening at %v", lis.Addr())
// Safety net: self-terminate if the LocalAI process that spawned this
// backend dies without running its graceful teardown (see parentwatch.go).
startParentDeathWatcher()
if err = s.Serve(lis); err != nil {
return func() error {
return lis.Close()