Compare commits

..

2 Commits

Author SHA1 Message Date
Ettore Di Giacinto
94e3e06b8b fix(process): extend parent-death backstop to C++ and Python backends
The Go parent-death watcher (pkg/grpc/parentwatch.go, commit 772b435d5)
only protects backends that route through pkg/grpc. C++ and Python
backends don't, so the originally-reported case — the llama.cpp gRPC
worker surviving a non-graceful LocalAI death — was still uncovered.

Extend the same best-effort backstop to both languages, reusing the
exact mechanism and semantics:

- capture getppid() at startup, skip if already orphaned (<=1)
- a background thread polls getppid() and self-exits on reparenting
  (getppid() != orig || == 1), portable across Linux/macOS, no-op on
  Windows
- same env vars: LOCALAI_BACKEND_PARENT_WATCH (default on; falsy
  false/0/no/off disable) and LOCALAI_BACKEND_PARENT_WATCH_INTERVAL
  (default 2s; accepts Go-style durations like 500ms/2s/1m)

C++: implemented in backend/cpp/llama-cpp (the reported, most-used C++
backend) as a dependency-free header parent_watch.h, wired into
grpc-server.cpp's main() and copied at build time via prepare.sh. C++
backends have no shared server scaffolding, so other C++ backends
(ds4, ik-llama-cpp, privacy-filter, ...) are not yet covered and would
each need the same one-line include+call as follow-ups.

Python: implemented once in the shared common/parent_watch.py and armed
from common/grpc_auth.py's get_auth_interceptors() — the single helper
every one of the 35 Python backends invokes while building its gRPC
server — so all Python backends (and future ones) are covered with no
per-backend edits and no duplicated implementation.

Tests (real process-tree reparent detection, mirroring the Go test):
- backend/cpp/llama-cpp/parent_watch_test.cpp (via run-unit-tests.sh)
- backend/python/common/parent_watch_test.py (python -m unittest)

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-07-02 07:31:31 +00:00
Ettore Di Giacinto
d31bf94731 fix(grpc): self-terminate backend workers when LocalAI dies non-gracefully
Symptom: a backend model-worker subprocess (the per-model gRPC server LocalAI
spawns) can be orphaned and linger — holding VRAM and its listen port — if the
LocalAI process is killed non-gracefully (e.g. a supervisor's graceful-shutdown
grace period elapses and LocalAI is SIGKILLed) before its own teardown runs.

Root cause: LocalAI's graceful teardown (pkg/signals/handler.go installs the
SIGINT/SIGTERM handler; core/cli/run.go registers app.Shutdown ->
ModelLoader.StopAllGRPC -> process.Stop in pkg/model/process.go) only runs when
LocalAI receives a catchable signal and survives long enough to run its
handlers. Backends are spawned via github.com/mudler/go-processmanager v0.1.1,
whose getSysProcAttr() sets Setpgid:true (own process group, so the group can be
signalled) but never PR_SET_PDEATHSIG/Pdeathsig, and exposes no Config field or
option for a caller to inject/extend SysProcAttr. LocalAI fully delegates
spawning to that library (it never builds the exec.Cmd itself), so it cannot set
a kernel parent-death signal at the spawn site. If LocalAI is SIGKILLed, nothing
tells the backend to exit and it is reparented to init.

Fix: add a best-effort, backend-side safety net at the one shared choke point
every out-of-process Go backend routes through — grpc.StartServer / RunServer in
pkg/grpc. On startup it captures getppid() and polls; when the process is
reparented (getppid changes / becomes 1 — the standard POSIX signal the original
parent died) it logs and self-terminates. getppid() reparent detection is
portable (Linux + macOS), unlike Linux-only PR_SET_PDEATHSIG. Toggle via
LOCALAI_BACKEND_PARENT_WATCH (default on; off on Windows) and
LOCALAI_BACKEND_PARENT_WATCH_INTERVAL. This is strictly a backstop alongside the
existing graceful SIGTERM->grace->SIGKILL teardown, which is unchanged.

Scope/limitations: covers Go-based backends (everything using pkg/grpc). The
C++ backends (e.g. llama-cpp) and Python backends do not route through
pkg/grpc and are not covered by this mechanism — they would each need an
equivalent parent-death check (follow-up). The fully general fix is for
go-processmanager to expose SysProcAttr injection so LocalAI can set Pdeathsig
at spawn for every backend regardless of language (suggested upstream follow-up;
out of scope for this LocalAI-only PR).

Test: pkg/grpc/parentwatch_test.go builds a real test -> middle -> grandchild
process tree, lets the middle process exit to orphan the grandchild running the
real watchParentDeath, and asserts it detects the reparent and self-terminates.
Unix-only (build-tagged), runs in CI (Linux).

Co-Authored-By: Claude Sonnet 5 <noreply@anthropic.com>
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
2026-07-02 07:31:31 +00:00
41 changed files with 1007 additions and 509 deletions

View File

@@ -171,17 +171,6 @@ RUN if [ "${BUILD_TYPE}" = "hipblas" ]; then \
ln -s /opt/rocm-**/lib/llvm/lib/libomp.so /usr/lib/libomp.so \
; fi
# ROCm's bundled libdrm_amdgpu is built with a hardcoded fallback lookup path
# for the ASIC ID table (/opt/amdgpu/share/libdrm/amdgpu.ids), which only exists
# if AMD's full amdgpu graphics/DKMS stack is installed. This compute-only image
# doesn't have it, so hipblas/rocBLAS log "No such file or directory" on every
# model load and can fail to identify the GPU. Point it at the equivalent file
# Ubuntu's libdrm-common package already ships.
RUN if [ "${BUILD_TYPE}" = "hipblas" ] && [ -f /usr/share/libdrm/amdgpu.ids ] && [ ! -e /opt/amdgpu/share/libdrm/amdgpu.ids ]; then \
mkdir -p /opt/amdgpu/share/libdrm && \
ln -s /usr/share/libdrm/amdgpu.ids /opt/amdgpu/share/libdrm/amdgpu.ids \
; fi
RUN expr "${BUILD_TYPE}" = intel && echo "intel" > /run/localai/capability || echo "not intel"
# Cuda

View File

@@ -1,5 +1,5 @@
IK_LLAMA_VERSION?=068b173649f2fd8dc96b35ada5a0b76d8985105d
IK_LLAMA_VERSION?=29431b31c89e79c10f8736e8f2742485ba1713d6
LLAMA_REPO?=https://github.com/ikawrakow/ik_llama.cpp
CMAKE_ARGS?=

View File

@@ -101,4 +101,13 @@ if(LLAMA_GRPC_BUILD_TESTS)
target_link_libraries(message_content_test PRIVATE ${_LLAMA_COMMON_TARGET})
target_compile_features(message_content_test PRIVATE cxx_std_17)
add_test(NAME message_content_test COMMAND message_content_test)
# Parent-death watcher test (parent_watch.h) — standard library only, but
# needs a threading runtime for std::thread.
find_package(Threads REQUIRED)
add_executable(parent_watch_test parent_watch_test.cpp parent_watch.h)
target_include_directories(parent_watch_test PRIVATE ${CMAKE_CURRENT_SOURCE_DIR})
target_link_libraries(parent_watch_test PRIVATE Threads::Threads)
target_compile_features(parent_watch_test PRIVATE cxx_std_17)
add_test(NAME parent_watch_test COMMAND parent_watch_test)
endif()

View File

@@ -1,5 +1,5 @@
LLAMA_VERSION?=4fc4ec5541b243957ae5099edb67372f8f3b550e
LLAMA_VERSION?=0eca4d490e591d4e93058d07540cf47278a72577
LLAMA_REPO?=https://github.com/ggerganov/llama.cpp
CMAKE_ARGS?=

View File

@@ -75,6 +75,8 @@
#include <windows.h>
#endif
#include "parent_watch.h" // best-effort parent-death backstop (see header)
using grpc::Server;
using grpc::ServerBuilder;
@@ -3442,6 +3444,10 @@ int main(int argc, char** argv) {
}
}
// Best-effort backstop: self-terminate if the LocalAI process that spawned
// us dies without cleaning us up (see parent_watch.h).
llama_grpc::start_parent_death_watcher();
server_context ctx_server;
BackendServiceImpl service(ctx_server);

View File

@@ -0,0 +1,179 @@
// Parent-death watcher (best-effort backstop) for the llama.cpp gRPC backend.
//
// LocalAI spawns this backend as a child process and, on a clean shutdown,
// tears it down itself (SIGTERM -> grace -> SIGKILL). That graceful path only
// runs when LocalAI receives a catchable signal and lives long enough to run
// its handlers. If LocalAI is SIGKILLed (e.g. a supervising process's grace
// period elapses first), that teardown never runs and this backend would be
// reparented to init and linger, holding VRAM and its listen port.
//
// The watcher here is a best-effort backstop for exactly that case: it does
// NOT replace the graceful teardown, it only covers the "parent vanished
// without cleaning up" path. It detects reparenting: when the process that
// spawned this backend dies, the kernel reparents us to the nearest sub-reaper
// or to init (PID 1), so getppid() stops matching the value captured at
// startup. This getppid() approach is portable across Linux/macOS (unlike the
// Linux-only PR_SET_PDEATHSIG), which is why it is used here, mirroring the Go
// backends' pkg/grpc/parentwatch.go. It is disabled on Windows, which has no
// equivalent orphan-reparenting semantics.
//
// This header is intentionally dependency-free (C++ standard library only) so
// it can be exercised by a standalone unit test (parent_watch_test.cpp) without
// building the full llama.cpp + gRPC backend.
#ifndef LLAMA_GRPC_PARENT_WATCH_H
#define LLAMA_GRPC_PARENT_WATCH_H
#include <algorithm>
#include <cctype>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <string>
#include <thread>
#if !defined(_WIN32)
#include <unistd.h> // getppid(2), _exit(2)
#endif
namespace llama_grpc {
// Env var names are shared verbatim with the Go and Python backends for
// consistency across languages.
inline const char *kEnvParentWatch() { return "LOCALAI_BACKEND_PARENT_WATCH"; }
inline const char *kEnvParentWatchInterval() { return "LOCALAI_BACKEND_PARENT_WATCH_INTERVAL"; }
// Default poll interval in milliseconds. Matches the Go side's 2 * time.Second.
inline long parent_watch_default_interval_ms() { return 2000; }
namespace detail {
inline std::string trim_lower(const std::string &in, bool lower) {
size_t a = in.find_first_not_of(" \t\r\n");
size_t b = in.find_last_not_of(" \t\r\n");
if (a == std::string::npos) {
return "";
}
std::string s = in.substr(a, b - a + 1);
if (lower) {
std::transform(s.begin(), s.end(), s.begin(),
[](unsigned char c) { return std::tolower(c); });
}
return s;
}
} // namespace detail
// parent_watch_enabled reports whether the watcher should run. Enabled by
// default; a falsey value ("false"/"0"/"no"/"off", case-insensitive) disables
// it, matching the Go implementation's exact semantics.
inline bool parent_watch_enabled() {
#if defined(_WIN32)
return false;
#else
const char *v = std::getenv(kEnvParentWatch());
if (v == nullptr || v[0] == '\0') {
return true;
}
const std::string s = detail::trim_lower(v, true);
return !(s == "false" || s == "0" || s == "no" || s == "off");
#endif
}
// parent_watch_interval_ms returns the poll interval in milliseconds. Accepts
// Go-style duration strings ("500ms", "2s", "1m") for cross-language parity, or
// a bare number interpreted as seconds. Defaults to
// parent_watch_default_interval_ms().
inline long parent_watch_interval_ms() {
const long def = parent_watch_default_interval_ms();
const char *v = std::getenv(kEnvParentWatchInterval());
if (v == nullptr || v[0] == '\0') {
return def;
}
const std::string s = detail::trim_lower(v, false);
if (s.empty()) {
return def;
}
size_t i = 0;
while (i < s.size() && (std::isdigit((unsigned char)s[i]) || s[i] == '.')) {
i++;
}
if (i == 0) {
return def;
}
double num = 0.0;
try {
num = std::stod(s.substr(0, i));
} catch (...) {
return def;
}
const std::string unit = s.substr(i);
long ms;
if (unit == "ms") {
ms = (long)num;
} else if (unit == "s" || unit.empty()) {
ms = (long)(num * 1000.0);
} else if (unit == "m") {
ms = (long)(num * 60000.0);
} else {
return def; // unrecognized unit
}
return ms > 0 ? ms : def;
}
#if !defined(_WIN32)
// parent_died reports whether this process has been reparented away from the
// parent it had when the watcher started. Reparenting is the standard POSIX
// signal that the original parent (here, the LocalAI process that spawned this
// backend) has exited: the orphan is handed to the nearest sub-reaper or to
// init (PID 1), so getppid() no longer matches the value captured at startup.
inline bool parent_died(pid_t orig_ppid) {
const pid_t ppid = getppid();
return ppid != orig_ppid || ppid == 1;
}
// watch_parent_death polls until parent_died reports the original parent is
// gone, then invokes on_death. It blocks, so run it on its own thread.
inline void watch_parent_death(pid_t orig_ppid, long interval_ms,
const std::function<void()> &on_death) {
for (;;) {
std::this_thread::sleep_for(std::chrono::milliseconds(interval_ms));
if (parent_died(orig_ppid)) {
on_death();
return;
}
}
}
#endif
// start_parent_death_watcher installs the best-effort safety net described in
// the file header on the calling backend process. It is a no-op when disabled,
// on Windows, or when the process is already orphaned at startup
// (getppid() <= 1). This is a backstop alongside — never a replacement for —
// LocalAI's graceful teardown.
inline void start_parent_death_watcher() {
#if !defined(_WIN32)
if (!parent_watch_enabled()) {
return;
}
const pid_t orig_ppid = getppid();
// A parent of 1 (or less) at startup means we were already orphaned (or
// launched directly under init) — there is no original parent to watch for.
if (orig_ppid <= 1) {
return;
}
const long interval_ms = parent_watch_interval_ms();
std::thread([orig_ppid, interval_ms]() {
watch_parent_death(orig_ppid, interval_ms, [orig_ppid]() {
fprintf(stderr,
"backend parent process (pid %d) exited without stopping "
"this backend; self-terminating to avoid orphaning\n",
(int)orig_ppid);
fflush(stderr);
_exit(1);
});
}).detach();
#endif
}
} // namespace llama_grpc
#endif // LLAMA_GRPC_PARENT_WATCH_H

View File

@@ -0,0 +1,197 @@
// Unit tests for the parent-death watcher (parent_watch.h).
//
// Build & run standalone (C++ standard library only, no nlohmann/json needed):
// g++ -std=c++17 -pthread parent_watch_test.cpp -o t && ./t
//
// The core test (TestDetectsReparent) builds a genuine two-level process tree
// (test -> middle -> grandchild), lets the middle process die, and asserts the
// grandchild's watch_parent_death detects the reparenting and self-terminates —
// mirroring the Go test in pkg/grpc/parentwatch_test.go, but with fork(2).
//
// On Windows this file compiles to a no-op success (the watcher is unsupported
// there), matching parent_watch.h's platform gating.
#include <cstdio>
#include <cstdlib>
#include <string>
#include "parent_watch.h"
static int failures = 0;
static void check(bool ok, const std::string &name) {
if (!ok) {
failures++;
fprintf(stderr, "FAIL: %s\n", name.c_str());
} else {
fprintf(stderr, "ok: %s\n", name.c_str());
}
}
// Env-parsing tests are platform-independent and always run.
static void test_env_parsing() {
using namespace llama_grpc;
// Interval: default when unset.
unsetenv("LOCALAI_BACKEND_PARENT_WATCH_INTERVAL");
check(parent_watch_interval_ms() == 2000, "interval default 2000ms");
setenv("LOCALAI_BACKEND_PARENT_WATCH_INTERVAL", "500ms", 1);
check(parent_watch_interval_ms() == 500, "interval 500ms");
setenv("LOCALAI_BACKEND_PARENT_WATCH_INTERVAL", "2s", 1);
check(parent_watch_interval_ms() == 2000, "interval 2s");
setenv("LOCALAI_BACKEND_PARENT_WATCH_INTERVAL", "1m", 1);
check(parent_watch_interval_ms() == 60000, "interval 1m");
setenv("LOCALAI_BACKEND_PARENT_WATCH_INTERVAL", "3", 1); // bare number -> seconds
check(parent_watch_interval_ms() == 3000, "interval bare 3 -> 3000ms");
setenv("LOCALAI_BACKEND_PARENT_WATCH_INTERVAL", "garbage", 1);
check(parent_watch_interval_ms() == 2000, "interval garbage -> default");
unsetenv("LOCALAI_BACKEND_PARENT_WATCH_INTERVAL");
#if !defined(_WIN32)
// Enabled semantics (POSIX only; always false on Windows).
unsetenv("LOCALAI_BACKEND_PARENT_WATCH");
check(parent_watch_enabled(), "enabled by default");
for (const char *falsey : {"false", "0", "no", "off", "OFF", " False "}) {
setenv("LOCALAI_BACKEND_PARENT_WATCH", falsey, 1);
check(!parent_watch_enabled(), std::string("disabled by '") + falsey + "'");
}
setenv("LOCALAI_BACKEND_PARENT_WATCH", "true", 1);
check(parent_watch_enabled(), "enabled by 'true'");
setenv("LOCALAI_BACKEND_PARENT_WATCH", "1", 1);
check(parent_watch_enabled(), "enabled by '1'");
unsetenv("LOCALAI_BACKEND_PARENT_WATCH");
#endif
}
#if !defined(_WIN32)
#include <atomic>
#include <ctime>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
static bool file_exists(const std::string &p) {
struct stat st;
return ::stat(p.c_str(), &st) == 0;
}
static bool wait_for_file(const std::string &p, int timeout_ms) {
int waited = 0;
while (waited < timeout_ms) {
if (file_exists(p)) {
return true;
}
usleep(20 * 1000);
waited += 20;
}
return false;
}
static void write_file(const std::string &p, const std::string &content) {
FILE *f = fopen(p.c_str(), "w");
if (f) {
fwrite(content.data(), 1, content.size(), f);
fclose(f);
}
}
// Builds test -> middle -> grandchild via fork(2). The grandchild arms the REAL
// watch_parent_death against middle; middle exits, orphaning the grandchild;
// the watcher must detect the reparenting and self-terminate.
static void test_detects_reparent() {
char tmpl[] = "/tmp/parentwatch_test_XXXXXX";
char *dir = mkdtemp(tmpl);
if (dir == nullptr) {
check(false, "mkdtemp");
return;
}
const std::string ready_file = std::string(dir) + "/ready";
const std::string exited_file = std::string(dir) + "/exited";
pid_t middle = fork();
if (middle < 0) {
check(false, "fork middle");
return;
}
if (middle == 0) {
// ---- middle process ----
pid_t grandchild = fork();
if (grandchild < 0) {
_exit(4);
}
if (grandchild == 0) {
// ---- grandchild process ----
pid_t orig_ppid = getppid(); // == middle
std::thread([&]() {
llama_grpc::watch_parent_death(orig_ppid, 50 /*ms*/, [&]() {
write_file(exited_file, "1");
_exit(7);
});
}).detach();
// Safety valve: never linger if something goes wrong.
std::thread([]() {
usleep(30 * 1000 * 1000);
_exit(2);
}).detach();
// Signal readiness only after the watcher captured orig_ppid.
write_file(ready_file, std::to_string(getpid()));
for (;;) {
pause();
}
}
// middle: wait until grandchild is ready, then exit to orphan it.
if (!wait_for_file(ready_file, 10000)) {
_exit(5);
}
_exit(0);
}
// ---- test (top) process ----
int status = 0;
waitpid(middle, &status, 0); // reap middle only; grandchild is orphaned
check(file_exists(ready_file), "grandchild signaled readiness");
bool detected = wait_for_file(exited_file, 10000);
check(detected, "watcher detected parent death and self-terminated");
// Best-effort cleanup: kill the grandchild if it somehow survived.
if (file_exists(ready_file)) {
FILE *f = fopen(ready_file.c_str(), "r");
if (f) {
int pid = 0;
if (fscanf(f, "%d", &pid) == 1 && pid > 1) {
kill(pid, SIGKILL);
}
fclose(f);
}
}
unlink(ready_file.c_str());
unlink(exited_file.c_str());
rmdir(dir);
}
#endif // !_WIN32
int main() {
test_env_parsing();
#if !defined(_WIN32)
test_detects_reparent();
#endif
if (failures == 0) {
fprintf(stderr, "\nAll parent_watch tests passed.\n");
return 0;
}
fprintf(stderr, "\n%d parent_watch test(s) failed.\n", failures);
return 1;
}

View File

@@ -22,6 +22,10 @@ cp -r grpc-server.cpp llama.cpp/tools/grpc-server/
# unit test (compiled only when -DLLAMA_GRPC_BUILD_TESTS=ON).
cp -r message_content.h llama.cpp/tools/grpc-server/
cp -r message_content_test.cpp llama.cpp/tools/grpc-server/
# Parent-death watcher (included by grpc-server.cpp) and its standalone unit
# test (run via backend/cpp/run-unit-tests.sh; also buildable under ctest).
cp -r parent_watch.h llama.cpp/tools/grpc-server/
cp -r parent_watch_test.cpp llama.cpp/tools/grpc-server/
cp -rfv llama.cpp/vendor/nlohmann/json.hpp llama.cpp/tools/grpc-server/
cp -rfv llama.cpp/vendor/cpp-httplib/httplib.h llama.cpp/tools/grpc-server/

View File

@@ -54,7 +54,7 @@ for test_src in "${tests[@]}"; do
name="$(basename "$test_src" .cpp)"
bin="$(mktemp -d)/$name"
echo "==> $test_src"
if ! "$CXX" -std=c++17 -Wall -Wextra \
if ! "$CXX" -std=c++17 -Wall -Wextra -pthread \
-I"$JSON_INC" -I"$(dirname "$test_src")" \
"$test_src" -o "$bin"; then
echo "COMPILE FAILED: $test_src" >&2

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# CrispASR version (release tag)
CRISPASR_REPO?=https://github.com/CrispStrobe/CrispASR
CRISPASR_VERSION?=fcbc8718e654995e3bd2d0c98bcb8e55e297d23c
CRISPASR_VERSION?=8fd9db8fec8cb5e929d23d3267ed5817794feb1a
SO_TARGET?=libgocrispasr.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -1,6 +1,6 @@
# parakeet-cpp backend Makefile.
#
# Upstream pin lives below as PARAKEET_VERSION?=e8acc6172a94e20a952cf1843decace5d771a94b
# Upstream pin lives below as PARAKEET_VERSION?=f469a57270a1cc4554acb15febf60e56619673b9
# (.github/bump_deps.sh) can find and update it - matches the
# whisper.cpp / ds4 / vibevoice-cpp convention.
#
@@ -15,7 +15,7 @@
# That's what the L0 smoke test uses. The default target below does the
# proper clone-at-pin + cmake build so CI doesn't need a side-checkout.
PARAKEET_VERSION?=e8acc6172a94e20a952cf1843decace5d771a94b
PARAKEET_VERSION?=f469a57270a1cc4554acb15febf60e56619673b9
PARAKEET_REPO?=https://github.com/mudler/parakeet.cpp
GOCMD?=go

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# stablediffusion.cpp (ggml)
STABLEDIFFUSION_GGML_REPO?=https://github.com/leejet/stable-diffusion.cpp
STABLEDIFFUSION_GGML_VERSION?=3590aa8d626e671a1b1dc84506ea2932a243a480
STABLEDIFFUSION_GGML_VERSION?=484baa41e5e006c52dcd4addc38c830b9489745f
CMAKE_ARGS+=-DGGML_MAX_NAME=128

View File

@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
# whisper.cpp version
WHISPER_REPO?=https://github.com/ggml-org/whisper.cpp
WHISPER_CPP_VERSION?=6fc7c33b4c3a2cec83e4b65abd5e96a890480375
WHISPER_CPP_VERSION?=0874de3e8e8e48361dba85c7fe6d176f008bf158
SO_TARGET?=libgowhisper.so
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF

View File

@@ -11,6 +11,8 @@ import os
import grpc
from parent_watch import start_parent_death_watcher
class _AbortHandler(grpc.RpcMethodHandler):
"""A method handler that immediately aborts with UNAUTHENTICATED."""
@@ -70,6 +72,13 @@ def get_auth_interceptors(*, aio: bool = False):
Returns an empty list when LOCALAI_GRPC_AUTH_TOKEN is not set.
"""
# Arm the best-effort parent-death backstop here: this is the single helper
# every LocalAI Python backend invokes exactly once while building its gRPC
# server (mirroring how the Go watcher arms in pkg/grpc's shared serve path).
# start_parent_death_watcher() is idempotent and a no-op when disabled or on
# unsupported platforms — see parent_watch.py.
start_parent_death_watcher()
token = os.environ.get("LOCALAI_GRPC_AUTH_TOKEN", "")
if not token:
return []

View File

@@ -0,0 +1,149 @@
"""Parent-death watcher (best-effort backstop) for LocalAI Python backends.
LocalAI spawns each backend as a child process and, on a clean shutdown, tears
it down itself (SIGTERM -> grace -> SIGKILL). That graceful path only runs when
LocalAI receives a catchable signal and lives long enough to run its handlers.
If LocalAI is SIGKILLed (e.g. a supervising process's grace period elapses
first), that teardown never runs and this backend would be reparented to init
and linger, holding GPU/VRAM and its listen port.
The watcher here is a best-effort backstop for exactly that case: it does NOT
replace the graceful teardown, it only covers the "parent vanished without
cleaning up" path. It detects reparenting: when the process that spawned this
backend dies, the kernel reparents us to the nearest sub-reaper or to init
(PID 1), so os.getppid() stops matching the value captured at startup. This
getppid() approach is portable across Linux/macOS (unlike the Linux-only
PR_SET_PDEATHSIG), which is why it is used here, mirroring the Go backends'
pkg/grpc/parentwatch.go and the C++ backends' parent_watch.h. It is disabled on
Windows, which has no equivalent orphan-reparenting semantics.
Env vars (shared verbatim across the Go, C++ and Python backends):
LOCALAI_BACKEND_PARENT_WATCH enabled by default; a falsey value
("false"/"0"/"no"/"off", case-insensitive)
disables it.
LOCALAI_BACKEND_PARENT_WATCH_INTERVAL poll interval as a Go-style duration
string ("500ms", "2s", "1m") or a bare
number of seconds. Defaults to 2s.
"""
import os
import sys
import threading
ENV_PARENT_WATCH = "LOCALAI_BACKEND_PARENT_WATCH"
ENV_PARENT_WATCH_INTERVAL = "LOCALAI_BACKEND_PARENT_WATCH_INTERVAL"
_DEFAULT_INTERVAL_SECONDS = 2.0
# Guard so repeated calls (e.g. get_auth_interceptors invoked more than once)
# only ever arm a single watcher thread per process.
_started = False
_started_lock = threading.Lock()
def _enabled():
"""Report whether the watcher should run in this process."""
# Windows does not reparent orphans to a well-known init PID, so the
# getppid() heuristic used here doesn't apply there.
if os.name == "nt" or sys.platform.startswith("win"):
return False
val = os.environ.get(ENV_PARENT_WATCH, "").strip().lower()
if val in ("false", "0", "no", "off"):
return False
return True
def _interval_seconds():
"""Return the configured poll interval in seconds, or the default.
Accepts Go-style duration strings ("500ms", "2s", "1m") for cross-language
parity, or a bare number interpreted as seconds.
"""
raw = os.environ.get(ENV_PARENT_WATCH_INTERVAL, "").strip()
if not raw:
return _DEFAULT_INTERVAL_SECONDS
# Split numeric prefix from unit suffix.
i = 0
while i < len(raw) and (raw[i].isdigit() or raw[i] == "." or (i == 0 and raw[i] in "+-")):
i += 1
if i == 0:
return _DEFAULT_INTERVAL_SECONDS
try:
num = float(raw[:i])
except ValueError:
return _DEFAULT_INTERVAL_SECONDS
unit = raw[i:].lower()
if unit == "ms":
seconds = num / 1000.0
elif unit in ("s", ""):
seconds = num
elif unit == "m":
seconds = num * 60.0
else:
return _DEFAULT_INTERVAL_SECONDS
return seconds if seconds > 0 else _DEFAULT_INTERVAL_SECONDS
def _parent_died(orig_ppid):
"""Report whether this process has been reparented away from orig_ppid.
Reparenting is the standard POSIX signal that the original parent (here, the
LocalAI process that spawned this backend) has exited: the orphan is handed
to the nearest sub-reaper or to init (PID 1), so os.getppid() no longer
matches the value captured at startup.
"""
ppid = os.getppid()
return ppid != orig_ppid or ppid == 1
def _watch(orig_ppid, interval, on_death):
"""Poll until _parent_died reports the original parent is gone, then call
on_death. Blocks, so run it on its own (daemon) thread."""
import time
while True:
time.sleep(interval)
if _parent_died(orig_ppid):
on_death()
return
def start_parent_death_watcher():
"""Install the best-effort safety net described in this module's docstring.
No-op when disabled, on Windows, when already orphaned at startup
(os.getppid() <= 1), or if already started. This is a backstop alongside —
never a replacement for — LocalAI's graceful teardown.
"""
global _started
if not _enabled():
return
with _started_lock:
if _started:
return
orig_ppid = os.getppid()
# A parent of 1 (or less) at startup means we were already orphaned (or
# launched directly under init) — there is no original parent to watch.
if orig_ppid <= 1:
return
interval = _interval_seconds()
def on_death():
print(
"backend parent process (pid {}) exited without stopping this "
"backend; self-terminating to avoid orphaning".format(orig_ppid),
file=sys.stderr,
flush=True,
)
# Immediate, non-cleanup exit: this is a shutdown safety net and the
# normal graceful path is already gone.
os._exit(1)
thread = threading.Thread(
target=_watch,
args=(orig_ppid, interval, on_death),
name="parent-death-watcher",
daemon=True,
)
thread.start()
_started = True

View File

@@ -0,0 +1,150 @@
"""Unit tests for the parent-death watcher (parent_watch.py).
Run standalone (Python standard library only, no backend venv needed):
python3 -m unittest parent_watch_test
The core test (test_detects_reparent) builds a genuine two-level process tree
(test -> middle -> grandchild) with os.fork, lets the middle process die, and
asserts the grandchild's parent_watch._watch detects the reparenting and
self-terminates — mirroring the Go test in pkg/grpc/parentwatch_test.go and the
C++ test in backend/cpp/llama-cpp/parent_watch_test.cpp.
"""
import os
import sys
import tempfile
import threading
import time
import unittest
import parent_watch
class TestParentWatchEnvParsing(unittest.TestCase):
def setUp(self):
self._saved = {
k: os.environ.get(k)
for k in (parent_watch.ENV_PARENT_WATCH, parent_watch.ENV_PARENT_WATCH_INTERVAL)
}
for k in self._saved:
os.environ.pop(k, None)
def tearDown(self):
for k, v in self._saved.items():
if v is None:
os.environ.pop(k, None)
else:
os.environ[k] = v
def test_interval_default(self):
self.assertEqual(parent_watch._interval_seconds(), 2.0)
def test_interval_units(self):
cases = {"500ms": 0.5, "2s": 2.0, "1m": 60.0, "3": 3.0, "0.5s": 0.5}
for raw, expected in cases.items():
os.environ[parent_watch.ENV_PARENT_WATCH_INTERVAL] = raw
self.assertAlmostEqual(parent_watch._interval_seconds(), expected, msg=raw)
def test_interval_garbage_falls_back(self):
os.environ[parent_watch.ENV_PARENT_WATCH_INTERVAL] = "garbage"
self.assertEqual(parent_watch._interval_seconds(), 2.0)
@unittest.skipIf(os.name == "nt" or sys.platform.startswith("win"), "POSIX only")
def test_enabled_default(self):
self.assertTrue(parent_watch._enabled())
@unittest.skipIf(os.name == "nt" or sys.platform.startswith("win"), "POSIX only")
def test_disabled_by_falsey(self):
for val in ("false", "0", "no", "off", "OFF", " False "):
os.environ[parent_watch.ENV_PARENT_WATCH] = val
self.assertFalse(parent_watch._enabled(), msg=val)
@unittest.skipIf(os.name == "nt" or sys.platform.startswith("win"), "POSIX only")
def test_enabled_by_truthy(self):
for val in ("true", "1", "yes", "on"):
os.environ[parent_watch.ENV_PARENT_WATCH] = val
self.assertTrue(parent_watch._enabled(), msg=val)
@unittest.skipIf(os.name == "nt" or sys.platform.startswith("win"), "fork/reparent is POSIX only")
class TestParentWatchReparent(unittest.TestCase):
def _wait_for_file(self, path, timeout=10.0):
deadline = time.time() + timeout
while time.time() < deadline:
if os.path.exists(path):
return True
time.sleep(0.02)
return False
def test_detects_reparent(self):
tmpdir = tempfile.mkdtemp(prefix="parentwatch_test_")
ready_file = os.path.join(tmpdir, "ready")
exited_file = os.path.join(tmpdir, "exited")
middle = os.fork()
if middle == 0:
# ---- middle process ----
grandchild = os.fork()
if grandchild == 0:
# ---- grandchild process: arm the REAL watcher against middle ----
orig_ppid = os.getppid()
def on_death():
with open(exited_file, "w") as f:
f.write("1")
os._exit(7)
threading.Thread(
target=parent_watch._watch,
args=(orig_ppid, 0.05, on_death),
daemon=True,
).start()
# Safety valve: never linger if something goes wrong.
def bail():
time.sleep(30)
os._exit(2)
threading.Thread(target=bail, daemon=True).start()
# Signal readiness only after the watcher captured orig_ppid.
with open(ready_file, "w") as f:
f.write(str(os.getpid()))
while True:
time.sleep(1)
else:
# middle: wait until grandchild is ready, then exit to orphan it.
if not self._wait_for_file(ready_file):
os._exit(5)
os._exit(0)
# ---- test (top) process ----
os.waitpid(middle, 0) # reap middle only; grandchild is orphaned
self.assertTrue(os.path.exists(ready_file), "grandchild never signaled readiness")
self.assertTrue(
self._wait_for_file(exited_file),
"watcher did not detect parent death within timeout",
)
# Best-effort cleanup: kill the grandchild if it somehow survived.
try:
with open(ready_file) as f:
pid = int(f.read().strip())
if pid > 1:
os.kill(pid, 9)
except (OSError, ValueError):
pass
for p in (ready_file, exited_file):
try:
os.remove(p)
except OSError:
pass
try:
os.rmdir(tmpdir)
except OSError:
pass
if __name__ == "__main__":
unittest.main()

View File

@@ -748,12 +748,7 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
# When (A) native streaming ran cleanly, per-delta yields above already
# delivered everything — do NOT extract again on the full text or we'd
# duplicate content/tool_calls into the final chunk.
# NOTE: `native_streaming` is a capability flag ("streaming parser is
# available"), not a state flag ("streaming actually ran"). For
# non-streaming requests it is still True but the per-delta loop was
# never entered, so we MUST still run extract_tool_calls here. Hence
# the explicit `streaming and …` guard on both branches.
if has_tool_parser and not (streaming and native_streaming and not native_streaming_error):
if has_tool_parser and not (native_streaming and not native_streaming_error):
try:
tp = tp_instance
if tp is None:
@@ -775,7 +770,7 @@ class BackendServicer(backend_pb2_grpc.BackendServicer):
))
except Exception as e:
print(f"Tool parser error: {e}", file=sys.stderr)
elif streaming and native_streaming and not native_streaming_error:
elif native_streaming and not native_streaming_error:
# Per-delta path already emitted content + tool_calls; the final
# chat_delta should carry only metadata (token counts, logprobs).
content = ""

View File

@@ -104,7 +104,7 @@ if [ "$(uname -s)" = "Darwin" ]; then
# can rewrite it. Darwin therefore follows vllm-metal and can lag the Linux
# vllm pin (requirements-cublas13-after.txt, bumped independently against
# vllm/vllm) until vllm-metal supports a newer vLLM.
VLLM_METAL_VERSION="v0.3.0.dev20260701132215"
VLLM_METAL_VERSION="v0.3.0.dev20260630095652"
# The coupled vLLM source version is whatever this vllm-metal release builds
# against -- it declares it in its own installer as `vllm_v=`. Derive it from

View File

@@ -356,12 +356,6 @@ func initDistributed(cfg *config.ApplicationConfig, authDB *gorm.DB, configLoade
PrefixConfig: prefixCfg,
Pressure: pressure,
SharedModels: cfg.Distributed.SharedModels,
// Cap how long a cold load may hold the per-model advisory lock: the
// configured backend.install deadline plus a margin for file staging and
// the remote LoadModel. Derived from the install timeout so raising it
// (for slow links pulling multi-GB images) widens the ceiling too,
// instead of letting the static default cut a legitimately slow load.
ModelLoadCeiling: cfg.Distributed.BackendInstallTimeoutOrDefault() + 10*time.Minute,
})
// Wire staging-progress broadcasting so file-staging shows up on every

View File

@@ -369,7 +369,7 @@ func New(opts ...config.AppOption) (*Application, error) {
}
for _, backend := range options.ExternalBackends {
if err := galleryop.InstallExternalBackend(options.Context, options.BackendGalleries, options.SystemState, application.ModelLoader(), nil, backend, "", "", false, options.RequireBackendIntegrity); err != nil {
if err := galleryop.InstallExternalBackend(options.Context, options.BackendGalleries, options.SystemState, application.ModelLoader(), nil, backend, "", "", options.RequireBackendIntegrity); err != nil {
xlog.Error("error installing external backend", "error", err)
}
}

View File

@@ -127,7 +127,7 @@ func (bi *BackendsInstall) Run(ctx *cliContext.Context) error {
}
modelLoader := model.NewModelLoader(systemState)
err = galleryop.InstallExternalBackend(context.Background(), galleries, systemState, modelLoader, progressCallback, bi.BackendArgs, bi.Name, bi.Alias, false, bi.RequireBackendIntegrity)
err = galleryop.InstallExternalBackend(context.Background(), galleries, systemState, modelLoader, progressCallback, bi.BackendArgs, bi.Name, bi.Alias, bi.RequireBackendIntegrity)
if err != nil {
return err
}

View File

@@ -65,10 +65,6 @@ type BackendEndpointService struct {
type GalleryBackend struct {
ID string `json:"id"`
// Force reinstalls the backend even when it is already installed and
// runnable. Off by default so apply stays idempotent for supervising
// apps that ensure their backend on every boot.
Force bool `json:"force"`
}
func CreateBackendEndpointService(galleries []config.Gallery, systemState *system.SystemState, backendApplier *galleryop.GalleryService, upgradeChecker UpgradeInfoProvider) BackendEndpointService {
@@ -107,9 +103,7 @@ func (mgs *BackendEndpointService) GetAllStatusEndpoint() echo.HandlerFunc {
}
}
// ApplyBackendEndpoint installs a new backend to a LocalAI instance. The op is
// idempotent: an already-installed, runnable backend is left alone unless the
// request sets "force": true (explicit reinstall).
// ApplyBackendEndpoint installs a new backend to a LocalAI instance
// @Summary Install backends to LocalAI.
// @Tags backends
// @Param request body GalleryBackend true "query params"
@@ -143,7 +137,6 @@ func (mgs *BackendEndpointService) ApplyBackendEndpoint(systemState *system.Syst
ID: uuid.String(),
GalleryElementName: input.ID,
Galleries: mgs.galleries,
Force: input.Force,
}
return c.JSON(200, schema.BackendResponse{ID: uuid.String(), StatusURL: fmt.Sprintf("%sbackends/jobs/%s", middleware.BaseURL(c), uuid.String())})

View File

@@ -1,87 +0,0 @@
package localai_test
import (
"net/http"
"net/http/httptest"
"os"
"strings"
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
. "github.com/mudler/LocalAI/core/http/endpoints/localai"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// POST /backends/apply must be idempotent by default: supervising apps call it
// on every boot to ensure a backend exists, and forcing a reinstall there
// re-downloads the whole artifact each time. Reinstall stays available behind
// the explicit force flag.
var _ = Describe("POST /backends/apply force plumbing", func() {
var (
app *echo.Echo
gs *galleryop.GalleryService
tmpDir string
received chan galleryop.ManagementOp[gallery.GalleryBackend, any]
)
BeforeEach(func() {
app = echo.New()
var err error
tmpDir, err = os.MkdirTemp("", "backends-apply-test-*")
Expect(err).NotTo(HaveOccurred())
systemState, err := system.GetSystemState(system.WithBackendPath(tmpDir))
Expect(err).NotTo(HaveOccurred())
appConfig := &config.ApplicationConfig{SystemState: systemState}
// The service is deliberately not started: the test reads the op off
// the (unbuffered) channel itself.
gs = galleryop.NewGalleryService(appConfig, model.NewModelLoader(systemState))
svc := CreateBackendEndpointService(nil, systemState, gs, nil)
app.POST("/backends/apply", svc.ApplyBackendEndpoint(systemState))
received = make(chan galleryop.ManagementOp[gallery.GalleryBackend, any], 1)
go func() {
op := <-gs.BackendGalleryChannel
received <- op
}()
})
AfterEach(func() {
Expect(os.RemoveAll(tmpDir)).To(Succeed())
})
apply := func(body string) *httptest.ResponseRecorder {
req := httptest.NewRequest(http.MethodPost, "/backends/apply", strings.NewReader(body))
req.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
rec := httptest.NewRecorder()
app.ServeHTTP(rec, req)
return rec
}
It("enqueues a non-forced op by default", func() {
rec := apply(`{"id":"llama-cpp"}`)
Expect(rec.Code).To(Equal(http.StatusOK))
var op galleryop.ManagementOp[gallery.GalleryBackend, any]
Eventually(received).Should(Receive(&op))
Expect(op.GalleryElementName).To(Equal("llama-cpp"))
Expect(op.Force).To(BeFalse())
})
It("enqueues a forced op when the request sets force", func() {
rec := apply(`{"id":"llama-cpp","force":true}`)
Expect(rec.Code).To(Equal(http.StatusOK))
var op galleryop.ManagementOp[gallery.GalleryBackend, any]
Eventually(received).Should(Receive(&op))
Expect(op.GalleryElementName).To(Equal("llama-cpp"))
Expect(op.Force).To(BeTrue())
})
})

View File

@@ -1243,9 +1243,6 @@ func RegisterUIAPIRoutes(app *echo.Echo, cl *config.ModelConfigLoader, ml *model
Galleries: appConfig.BackendGalleries,
Context: ctx,
CancelFunc: cancelFunc,
// The React UI's "Reinstall backend" action reuses this route, so
// the op must force even when the backend is already installed.
Force: true,
}
// Store cancellation function immediately so queued operations can be cancelled
galleryService.StoreCancellation(uid, cancelFunc)

View File

@@ -6,39 +6,10 @@ import (
"hash/fnv"
"strings"
"sync"
"time"
"gorm.io/gorm"
)
// advisoryLockWaitBackstop bounds, server-side, how long we will wait to
// acquire a blocking advisory lock when the caller's context carries no
// deadline (e.g. a startup schema migration using context.Background()). It
// only exists so such a caller cannot hang forever behind a holder whose
// session never releases the lock; it is far longer than any legitimate
// guarded section. A var (not const) so tests can shrink it.
var advisoryLockWaitBackstop = 30 * time.Minute
// advisoryLockTimeoutMargin is added to a context's remaining budget when
// deriving the server-side lock_timeout, so the Go context's own (cleaner)
// cancellation fires first and the server bound is only ever a backstop.
const advisoryLockTimeoutMargin = 30 * time.Second
// advisoryLockWaitBudget returns the server-side lock_timeout to use for a
// blocking acquire: the caller context's remaining time plus a margin (so the
// Go context still governs), or the backstop when the context has no deadline.
// Never returns zero - "wait forever" must not be possible.
func advisoryLockWaitBudget(ctx context.Context) time.Duration {
if dl, ok := ctx.Deadline(); ok {
budget := time.Until(dl) + advisoryLockTimeoutMargin
if budget < time.Second {
budget = time.Second
}
return budget
}
return advisoryLockWaitBackstop
}
// localLocks holds one buffered channel (capacity 1) per lock key, used as an
// in-process mutex for non-PostgreSQL dialects (SQLite). A SQLite auth DB is
// effectively single-process, so serializing guarded sections within this
@@ -159,27 +130,6 @@ func WithLockCtx(ctx context.Context, db *gorm.DB, key int64, fn func() error) e
}
defer conn.Close()
// Override any deployment-wide lock_timeout on this dedicated connection.
// Operators commonly set a short global lock_timeout (on the role or
// database) to bound ordinary row-lock waits. Applied to the blocking
// pg_advisory_lock below, it aborts the wait with SQLSTATE 55P03 and turns
// LocalAI's intentional cross-replica "wait your turn, then re-check"
// coordination into a hard error for the caller (e.g. a chat request that
// just wanted to reuse a model another replica is loading).
//
// We do NOT disable it outright (lock_timeout = 0 would wait forever, which
// is unsafe for the schema-migration callers that pass context.Background()).
// Instead we set a bound derived from the caller's context: its remaining
// budget plus a margin so the Go context's cancellation wins with a clean
// error, or a finite backstop when the context has no deadline.
waitBudget := advisoryLockWaitBudget(ctx)
if _, err := conn.ExecContext(ctx,
fmt.Sprintf("SET lock_timeout = %d", waitBudget.Milliseconds())); err != nil {
return fmt.Errorf("advisorylock: setting lock_timeout: %w", err)
}
// Restore the session default before this pooled connection is reused.
defer func() { _, _ = conn.ExecContext(context.Background(), "RESET lock_timeout") }()
if _, err := conn.ExecContext(ctx, "SELECT pg_advisory_lock($1)", key); err != nil {
return fmt.Errorf("advisorylock: acquiring lock %d: %w", key, err)
}

View File

@@ -158,87 +158,6 @@ var _ = Describe("AdvisoryLock", func() {
Expect(err).To(HaveOccurred())
})
It("waits out a short server-side lock_timeout instead of failing with 55P03", func() {
const lockKey int64 = 703
// Reproduce the production deployment that triggered this: a short
// global lock_timeout set on the database. Without the fix, a waiter
// blocked on pg_advisory_lock() is aborted by the server after this
// window and surfaces SQLSTATE 55P03 ("canceling statement due to
// lock timeout") to the caller instead of waiting for its turn.
Expect(db.Exec("ALTER DATABASE testdb SET lock_timeout = '300ms'").Error).ToNot(HaveOccurred())
sqlDB, err := db.DB()
Expect(err).ToNot(HaveOccurred())
// Drop pooled connections so subsequent ones reconnect and inherit
// the new database-level lock_timeout default.
sqlDB.SetMaxIdleConns(0)
holding := make(chan struct{})
released := make(chan struct{})
go func() {
defer GinkgoRecover()
herr := WithLockCtx(context.Background(), db, lockKey, func() error {
close(holding)
// Hold well past the 300ms server lock_timeout.
time.Sleep(1 * time.Second)
return nil
})
Expect(herr).ToNot(HaveOccurred())
close(released)
}()
<-holding // ensure the holder owns the lock before we contend
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
executed := false
start := time.Now()
werr := WithLockCtx(ctx, db, lockKey, func() error {
executed = true
return nil
})
Expect(werr).ToNot(HaveOccurred(),
"waiter should wait out the in-progress hold, not fail with lock_timeout (55P03)")
Expect(executed).To(BeTrue())
Expect(time.Since(start)).To(BeNumerically(">=", 400*time.Millisecond),
"waiter should have actually waited for the holder to release")
<-released
})
It("bounds a deadline-less waiter with the backstop instead of waiting forever", func() {
const lockKey int64 = 704
// A caller with no context deadline (e.g. startup schema migration
// passing context.Background()) must not hang forever if the holder
// never releases. Shrink the backstop so the test is fast.
origBackstop := advisoryLockWaitBackstop
advisoryLockWaitBackstop = 500 * time.Millisecond
DeferCleanup(func() { advisoryLockWaitBackstop = origBackstop })
holding := make(chan struct{})
release := make(chan struct{})
go func() {
defer GinkgoRecover()
_ = WithLockCtx(context.Background(), db, lockKey, func() error {
close(holding)
<-release // hold until the test releases us
return nil
})
}()
defer close(release)
<-holding
start := time.Now()
err := WithLockCtx(context.Background(), db, lockKey, func() error {
Fail("waiter should not have acquired the still-held lock")
return nil
})
Expect(err).To(HaveOccurred(), "deadline-less waiter should give up at the backstop, not hang")
Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second),
"backstop must cap the wait well under the test timeout")
})
It("serializes concurrent WithLockCtx on same key", func() {
const lockKey int64 = 702

View File

@@ -1,114 +0,0 @@
package galleryop_test
import (
"context"
"os"
"path/filepath"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/core/services/galleryop"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"gopkg.in/yaml.v3"
)
// The install op must be idempotent unless Force is set: API clients call
// POST /backends/apply on every boot to make sure the backend exists, and an
// unconditional force here re-downloads the whole backend artifact each time.
// Reinstall is an explicit, opted-in action.
var _ = Describe("LocalBackendManager force semantics", func() {
var (
backendsDir string
srcDir string
mgr *galleryop.LocalBackendManager
systemState *system.SystemState
ml *model.ModelLoader
)
const installedRunSh = "#!/bin/sh\necho installed\n"
const galleryRunSh = "#!/bin/sh\necho from-gallery\n"
installedRunShPath := func() string {
return filepath.Join(backendsDir, "test-backend", "run.sh")
}
BeforeEach(func() {
var err error
backendsDir, err = os.MkdirTemp("", "force-backends-*")
Expect(err).NotTo(HaveOccurred())
srcDir, err = os.MkdirTemp("", "force-src-*")
Expect(err).NotTo(HaveOccurred())
// The gallery serves test-backend from a plain directory (offline).
// The gallery yaml itself must live under the backends path: file://
// galleries outside the trusted root are rejected by the downloader.
Expect(os.WriteFile(filepath.Join(srcDir, "run.sh"), []byte(galleryRunSh), 0o755)).To(Succeed())
entries := []map[string]any{{"name": "test-backend", "uri": srcDir}}
data, err := yaml.Marshal(entries)
Expect(err).NotTo(HaveOccurred())
galleryYAML := filepath.Join(backendsDir, "gallery.yaml")
Expect(os.WriteFile(galleryYAML, data, 0o644)).To(Succeed())
// test-backend is already installed, with content that differs from
// the gallery's so a reinstall is observable.
Expect(os.MkdirAll(filepath.Join(backendsDir, "test-backend"), 0o755)).To(Succeed())
Expect(os.WriteFile(installedRunShPath(), []byte(installedRunSh), 0o755)).To(Succeed())
systemState, err = system.GetSystemState(system.WithBackendPath(backendsDir))
Expect(err).NotTo(HaveOccurred())
appConfig := &config.ApplicationConfig{
SystemState: systemState,
BackendGalleries: []config.Gallery{{Name: "test", URL: "file://" + galleryYAML}},
}
ml = model.NewModelLoader(systemState)
mgr = galleryop.NewLocalBackendManager(appConfig, ml)
})
AfterEach(func() {
Expect(os.RemoveAll(backendsDir)).To(Succeed())
Expect(os.RemoveAll(srcDir)).To(Succeed())
})
It("skips an already-installed backend when Force is not set", func() {
op := &galleryop.ManagementOp[gallery.GalleryBackend, any]{
ID: "op-1",
GalleryElementName: "test-backend",
}
Expect(mgr.InstallBackend(context.Background(), op, nil)).To(Succeed())
content, err := os.ReadFile(installedRunShPath())
Expect(err).NotTo(HaveOccurred())
Expect(string(content)).To(Equal(installedRunSh), "install without Force must not overwrite an installed backend")
})
It("reinstalls an already-installed backend when Force is set", func() {
op := &galleryop.ManagementOp[gallery.GalleryBackend, any]{
ID: "op-2",
GalleryElementName: "test-backend",
Force: true,
}
Expect(mgr.InstallBackend(context.Background(), op, nil)).To(Succeed())
content, err := os.ReadFile(installedRunShPath())
Expect(err).NotTo(HaveOccurred())
Expect(string(content)).To(Equal(galleryRunSh), "install with Force must overwrite the installed backend")
})
// The LOCALAI_EXTERNAL_BACKENDS boot loop goes through
// InstallExternalBackend's gallery-name path on EVERY startup; it must not
// force, or each boot re-downloads every listed backend.
It("skips an already-installed backend on the non-forced external gallery-name path", func() {
err := galleryop.InstallExternalBackend(context.Background(),
[]config.Gallery{{Name: "test", URL: "file://" + filepath.Join(backendsDir, "gallery.yaml")}},
systemState, ml, nil, "test-backend", "", "", false, false)
Expect(err).NotTo(HaveOccurred())
content, err := os.ReadFile(installedRunShPath())
Expect(err).NotTo(HaveOccurred())
Expect(string(content)).To(Equal(installedRunSh), "non-forced external install must not overwrite an installed backend")
})
})

View File

@@ -144,12 +144,7 @@ func (g *GalleryService) backendHandler(op *ManagementOp[gallery.GalleryBackend,
// InstallExternalBackend installs a backend from an external source (OCI image, URL, or path).
// This method contains the logic to detect the input type and call the appropriate installation function.
// It can be used by both CLI and Web UI for installing backends from external sources.
//
// force applies only to the gallery-name fallback: a URI install (dir/OCI/file)
// always writes, but a bare gallery name is an "ensure installed" — the
// LOCALAI_EXTERNAL_BACKENDS boot loop runs it on every start and must not
// re-download an installed, runnable backend.
func InstallExternalBackend(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState, modelLoader *model.ModelLoader, downloadStatus func(string, string, string, float64), backend, name, alias string, force, requireIntegrity bool) error {
func InstallExternalBackend(ctx context.Context, galleries []config.Gallery, systemState *system.SystemState, modelLoader *model.ModelLoader, downloadStatus func(string, string, string, float64), backend, name, alias string, requireIntegrity bool) error {
uri := downloader.URI(backend)
switch {
case uri.LooksLikeDir():
@@ -207,7 +202,7 @@ func InstallExternalBackend(ctx context.Context, galleries []config.Gallery, sys
if name != "" || alias != "" {
return fmt.Errorf("specifying a name or alias is not supported for gallery backends")
}
err := gallery.InstallBackendFromGallery(ctx, galleries, systemState, modelLoader, backend, downloadStatus, force, requireIntegrity)
err := gallery.InstallBackendFromGallery(ctx, galleries, systemState, modelLoader, backend, downloadStatus, true, requireIntegrity)
if err != nil {
return fmt.Errorf("error installing backend %s: %w", backend, err)
}

View File

@@ -70,7 +70,6 @@ var _ = Describe("InstallExternalBackend", func() {
"test-backend", // gallery name
"custom-name", // name should not be allowed
"",
false, // force
false,
)
Expect(err).To(HaveOccurred())
@@ -87,7 +86,6 @@ var _ = Describe("InstallExternalBackend", func() {
"non-existent-backend",
"",
"",
false, // force
false,
)
Expect(err).To(HaveOccurred())
@@ -105,7 +103,6 @@ var _ = Describe("InstallExternalBackend", func() {
"oci://quay.io/mudler/tests:localai-backend-test",
"", // name is required for OCI images
"",
false, // force
false,
)
Expect(err).To(HaveOccurred())
@@ -139,7 +136,6 @@ var _ = Describe("InstallExternalBackend", func() {
testBackendPath,
"", // name should be inferred as "source-backend"
"",
false, // force
false,
)
// The function should at least attempt to install with the inferred name
@@ -159,7 +155,6 @@ var _ = Describe("InstallExternalBackend", func() {
testBackendPath,
"custom-backend-name",
"",
false, // force
false,
)
// The function should use the provided name
@@ -178,7 +173,6 @@ var _ = Describe("InstallExternalBackend", func() {
testBackendPath,
"custom-backend-name",
"custom-alias",
false, // force
false,
)
// The function should accept alias for directory paths

View File

@@ -110,13 +110,10 @@ func (b *LocalBackendManager) CheckUpgrades(ctx context.Context) (map[string]gal
func (b *LocalBackendManager) InstallBackend(ctx context.Context, op *ManagementOp[gallery.GalleryBackend, any], progressCb ProgressCallback) error {
if op.ExternalURI != "" {
return InstallExternalBackend(ctx, b.backendGalleries, b.systemState, b.modelLoader,
progressCb, op.ExternalURI, op.ExternalName, op.ExternalAlias, op.Force, b.requireBackendIntegrity)
progressCb, op.ExternalURI, op.ExternalName, op.ExternalAlias, b.requireBackendIntegrity)
}
// op.Force distinguishes an explicit reinstall from an idempotent
// "make sure it's installed" op; the latter must not re-download an
// already-runnable backend (supervisors apply on every boot).
return gallery.InstallBackendFromGallery(ctx, b.backendGalleries, b.systemState,
b.modelLoader, op.GalleryElementName, progressCb, op.Force, b.requireBackendIntegrity)
b.modelLoader, op.GalleryElementName, progressCb, true, b.requireBackendIntegrity)
}
func (b *LocalBackendManager) IsDistributed() bool { return false }

View File

@@ -45,13 +45,6 @@ type ManagementOp[T any, E any] struct {
// Upgrade is true if this is an upgrade operation (not a fresh install)
Upgrade bool
// Force reinstalls a backend even when it is already installed and
// runnable. Without it a backend install op is idempotent — API clients
// that ensure a backend exists on every boot must not trigger a full
// artifact re-download each time. The UI's explicit "Reinstall backend"
// action sets it.
Force bool
}
type OpStatus struct {

View File

@@ -68,13 +68,6 @@ type SmartRouterOptions struct {
// the absolute model paths untouched so the worker loads them directly from
// the shared volume (#10556). See config.DistributedConfig.SharedModels.
SharedModels bool
// ModelLoadCeiling is the hard upper bound on how long a single cold-load
// attempt (node selection -> backend install -> file staging -> LoadModel)
// may run while holding the per-model advisory lock. It backstops every
// sub-step's own timeout so a wedged worker can never pin the lock - and
// every other replica's request for that model - indefinitely. Zero selects
// defaultModelLoadCeiling.
ModelLoadCeiling time.Duration
}
// SmartRouter routes inference requests to the best available backend node.
@@ -108,18 +101,8 @@ type SmartRouter struct {
// sharedModels skips file staging when all nodes mount the same models
// directory at the same path (see SmartRouterOptions.SharedModels).
sharedModels bool
// modelLoadCeiling bounds how long a cold load may hold the per-model
// advisory lock (see SmartRouterOptions.ModelLoadCeiling).
modelLoadCeiling time.Duration
}
// defaultModelLoadCeiling is the fallback hold ceiling for a cold model load.
// It must comfortably exceed the slowest legitimate load - a multi-GB backend
// install (DefaultBackendInstallTimeout, 15m) plus staging and the remote
// LoadModel (5m) - so it never cuts a real load short; it only ever fires when
// a step is genuinely wedged (e.g. a worker that died mid-install).
const defaultModelLoadCeiling = 25 * time.Minute
// probeCacheTTL is how long a successful gRPC HealthCheck on a backend is
// trusted before the next request re-probes. Matches healthCheckTTL in
// pkg/model/model.go so the single-process and distributed paths share a
@@ -134,10 +117,6 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter
if factory == nil {
factory = &tokenClientFactory{token: opts.AuthToken}
}
ceiling := opts.ModelLoadCeiling
if ceiling <= 0 {
ceiling = defaultModelLoadCeiling
}
return &SmartRouter{
registry: registry,
unloader: opts.Unloader,
@@ -152,7 +131,6 @@ func NewSmartRouter(registry ModelRouter, opts SmartRouterOptions) *SmartRouter
prefixConfig: opts.PrefixConfig,
pressure: opts.Pressure,
sharedModels: opts.SharedModels,
modelLoadCeiling: ceiling,
}
}
@@ -405,19 +383,11 @@ func (r *SmartRouter) Route(ctx context.Context, modelID, modelName, backendType
// the request context. If staging were bound to it, the multi-GB upload
// aborts with "context canceled" mid-transfer and large models can never
// finish staging (the model-load outage). WithoutCancel keeps the request's
// values (prefix chain, etc.) but drops its cancellation/deadline.
//
// Detaching from the caller is necessary, but it must not be unbounded: the
// load runs while holding the per-model advisory lock, and a worker that
// dies mid-install (its backend.install never replies) would otherwise pin
// that lock (and every other replica's request for the same model) until
// the NATS install deadline alone expires. Re-impose a single hard ceiling
// over the whole sequence so the lock is always released in bounded time,
// even if a sub-step wedges. Each long step still has its own (tighter)
// bound; this only backstops them. The per-model advisory lock below
// de-dupes concurrent loaders across replicas.
loadCtx, cancelLoad := context.WithTimeout(context.WithoutCancel(ctx), r.modelLoadCeiling)
defer cancelLoad()
// values (prefix chain, etc.) but drops its cancellation/deadline. Each
// long step still has its own bound (the file stager's resume budget,
// LoadModel's 5m timeout), and the per-model advisory lock below de-dupes
// concurrent loaders across replicas.
loadCtx := context.WithoutCancel(ctx)
loadModel := func(ctx context.Context) (*RouteResult, error) {
// Re-check after acquiring lock — another request may have loaded it
node, nm, err := r.registry.FindAndLockNodeWithModel(ctx, trackingKey, candidateNodeIDs, pref)
@@ -946,14 +916,7 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
}
key := fmt.Sprintf("%s|%s|%s|%d", node.ID, backendType, modelID, replicaIndex)
// DoChan rather than Do so this wait honors ctx cancellation. InstallBackend
// blocks for its full NATS deadline (15m by default) when a worker accepts
// the request but never replies (e.g. it died mid-install). Without ctx
// awareness the caller (holding the per-model advisory lock) would sit there
// the whole time; here a cancelled ctx (typically the model-load ceiling)
// frees the caller promptly. The shared install keeps running in the
// background and still coalesces other callers via singleflight.
resCh := r.installFlight.DoChan(key, func() (any, error) {
v, err, _ := r.installFlight.Do(key, func() (any, error) {
reply, err := r.unloader.InstallBackend(node.ID, backendType, modelID, r.galleriesJSON, "", "", "", replicaIndex, "", nil)
if err != nil {
return "", err
@@ -968,15 +931,10 @@ func (r *SmartRouter) installBackendOnNode(ctx context.Context, node *BackendNod
}
return addr, nil
})
select {
case <-ctx.Done():
return "", ctx.Err()
case res := <-resCh:
if res.Err != nil {
return "", res.Err
}
return res.Val.(string), nil
if err != nil {
return "", err
}
return v.(string), nil
}
func (r *SmartRouter) buildClientForAddr(node *BackendNode, addr string, parallel bool) grpc.Backend {

View File

@@ -493,44 +493,6 @@ var _ = Describe("SmartRouter", func() {
Expect(result.Node.ID).To(Equal("n3"))
})
})
Context("worker wedges mid-install (dead node holding the lock)", func() {
It("aborts the load at the ModelLoadCeiling instead of blocking forever", func() {
// Simulate the production incident: the chosen worker accepts the
// backend.install but never replies (it died), so InstallBackend
// would otherwise block for its full NATS deadline (15m by
// default) while pinning the per-model advisory lock. Route must
// give up at the ceiling so the lock is released promptly.
reg.findAndLockErr = errors.New("not found")
reg.findIdleNode = &BackendNode{ID: "n4", Name: "dead-node", Address: "10.0.0.4:50051"}
block := make(chan struct{})
defer close(block) // let the background install goroutine drain at test end
unloader.installHook = func() { <-block }
router := NewSmartRouter(reg, SmartRouterOptions{
Unloader: unloader,
ClientFactory: factory,
ModelLoadCeiling: 200 * time.Millisecond,
})
done := make(chan error, 1)
start := time.Now()
go func() {
defer GinkgoRecover()
_, err := router.Route(context.Background(), "wedged-model",
"models/wedged.gguf", "llama-cpp",
&pb.ModelOptions{Model: "models/wedged.gguf"}, false)
done <- err
}()
var routeErr error
Eventually(done, 5*time.Second).Should(Receive(&routeErr),
"Route must not block on a wedged install past the ceiling")
Expect(routeErr).To(HaveOccurred())
Expect(time.Since(start)).To(BeNumerically("<", 5*time.Second))
})
})
})
Describe("scheduleNewModel (mock-based, via Route)", func() {

View File

@@ -134,7 +134,7 @@ func (s *backendSupervisor) installBackend(req messaging.BackendInstallRequest,
if req.URI != "" {
xlog.Info("Installing backend from external URI", "backend", req.Backend, "uri", req.URI, "force", force)
if err := galleryop.InstallExternalBackend(
context.Background(), galleries, s.systemState, s.ml, downloadCb, req.URI, req.Name, req.Alias, force, s.cfg.RequireBackendIntegrity,
context.Background(), galleries, s.systemState, s.ml, downloadCb, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
); err != nil {
return "", fmt.Errorf("installing backend from gallery: %w", err)
}
@@ -201,7 +201,7 @@ func (s *backendSupervisor) upgradeBackend(req messaging.BackendUpgradeRequest)
if req.URI != "" {
xlog.Info("Upgrading backend from external URI", "backend", req.Backend, "uri", req.URI)
if err := galleryop.InstallExternalBackend(
context.Background(), galleries, s.systemState, s.ml, downloadCb, req.URI, req.Name, req.Alias, true, s.cfg.RequireBackendIntegrity,
context.Background(), galleries, s.systemState, s.ml, downloadCb, req.URI, req.Name, req.Alias, s.cfg.RequireBackendIntegrity,
); err != nil {
return fmt.Errorf("upgrading backend from external URI: %w", err)
}

View File

@@ -1758,8 +1758,8 @@
use_tokenizer_template: true
files:
- filename: llama-cpp/models/Qwopus3.5-9B-Coder-MTP-GGUF/Qwopus3.5-9B-Coder-MTP-Q4_K_M.gguf
sha256: f6fc5d193045796d9e1870cbc40f827fe55f53f70593c3f5c1968b82b9331991
uri: https://huggingface.co/Jackrong/Qwopus3.5-9B-Coder-MTP-GGUF/resolve/main/Qwopus3.5-9B-Coder-MTP-Q4_K_M.gguf
sha256: 9ea3ecd122a5165b8b81655f29eaf09d71daf841503e4c4212bdfadb36ab3712
- filename: llama-cpp/mmproj/Qwopus3.5-9B-Coder-MTP-GGUF/Qwopus3.5-9B-Coder-MTP-mmproj.gguf
sha256: f48daca405a1c768a9514e392c3955dcc4a9d66a5cf64cf45e064092b5f20ee4
uri: https://huggingface.co/Jackrong/Qwopus3.5-9B-Coder-MTP-GGUF/resolve/main/Qwopus3.5-9B-Coder-MTP-mmproj.gguf

105
pkg/grpc/parentwatch.go Normal file
View File

@@ -0,0 +1,105 @@
package grpc
import (
"log"
"os"
"runtime"
"strings"
"time"
)
// Backend worker processes (the per-model gRPC servers LocalAI spawns) are
// deliberately placed in their own process group by the process manager so
// LocalAI's graceful shutdown can signal the whole group. That graceful path
// (SIGTERM -> grace -> SIGKILL, driven by pkg/signals + pkg/model) only runs
// when LocalAI itself receives a catchable signal and lives long enough to run
// its handlers. If LocalAI is SIGKILLed (e.g. a supervising process's
// graceful-shutdown grace period elapses first), that teardown never runs and
// this backend would be reparented to init and linger, holding VRAM and its
// listen port.
//
// The watcher below is a best-effort backstop for exactly that case: it does
// NOT replace the graceful teardown, it only covers the "parent vanished
// without cleaning up" path. It works by detecting reparenting: when the
// process that spawned this backend dies, the kernel reparents us to the
// nearest sub-reaper or to init (PID 1), so getppid() stops matching the value
// we captured at startup. This getppid() approach is portable across
// Linux/macOS (unlike Linux-only PR_SET_PDEATHSIG), which is why it's used
// here rather than a kernel parent-death signal.
const (
// EnvBackendParentWatch toggles the parent-death watcher. It is enabled by
// default; set it to a falsey value ("false", "0", "no", "off") to disable
// (e.g. when running a backend standalone for debugging under a shell whose
// lifetime shouldn't govern the backend).
EnvBackendParentWatch = "LOCALAI_BACKEND_PARENT_WATCH"
// EnvBackendParentWatchInterval overrides the poll interval as a Go
// duration string (e.g. "500ms"). Defaults to defaultParentWatchInterval.
EnvBackendParentWatchInterval = "LOCALAI_BACKEND_PARENT_WATCH_INTERVAL"
defaultParentWatchInterval = 2 * time.Second
)
// parentWatchEnabled reports whether the watcher should run in this process.
func parentWatchEnabled() bool {
switch strings.ToLower(strings.TrimSpace(os.Getenv(EnvBackendParentWatch))) {
case "false", "0", "no", "off":
return false
}
// Windows does not reparent orphans to a well-known init PID, so the
// getppid() heuristic used here doesn't apply there.
return runtime.GOOS != "windows"
}
// parentWatchInterval returns the configured poll interval, or the default.
func parentWatchInterval() time.Duration {
if v := os.Getenv(EnvBackendParentWatchInterval); v != "" {
if d, err := time.ParseDuration(v); err == nil && d > 0 {
return d
}
}
return defaultParentWatchInterval
}
// parentDied reports whether this process has been reparented away from the
// parent it had when the watcher started. Reparenting is the standard POSIX
// signal that the original parent (here, the LocalAI process that spawned this
// backend) has exited: the orphan is handed to the nearest sub-reaper or to
// init (PID 1), so getppid() no longer matches the value captured at startup.
func parentDied(origPPID int) bool {
ppid := os.Getppid()
return ppid != origPPID || ppid == 1
}
// watchParentDeath polls until parentDied reports the original parent is gone,
// then invokes onDeath. It blocks, so run it in its own goroutine.
func watchParentDeath(origPPID int, interval time.Duration, onDeath func()) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
if parentDied(origPPID) {
onDeath()
return
}
}
}
// startParentDeathWatcher installs the best-effort safety net described above
// on the calling backend process. It is a no-op when disabled or on platforms
// where the mechanism doesn't apply. This is a backstop alongside — never a
// replacement for — LocalAI's graceful SIGTERM->grace->SIGKILL teardown.
func startParentDeathWatcher() {
if !parentWatchEnabled() {
return
}
origPPID := os.Getppid()
// A parent of 1 at startup means we were already orphaned (or launched
// directly under init) — there's no original parent to watch for.
if origPPID <= 1 {
return
}
interval := parentWatchInterval()
go watchParentDeath(origPPID, interval, func() {
log.Printf("backend parent process (pid %d) exited without stopping this backend; self-terminating to avoid orphaning", origPPID)
os.Exit(1)
})
}

View File

@@ -0,0 +1,163 @@
//go:build !windows
package grpc
import (
"os"
"os/exec"
"path/filepath"
"runtime"
"strconv"
"syscall"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
// These env vars drive the helper roles this test binary re-executes itself as
// (see the init() dispatcher). They are only set for the spawned child/
// grandchild processes, never for the normal `go test` invocation.
const (
envRole = "LOCALAI_PARENTWATCH_TEST_ROLE"
envReady = "LOCALAI_PARENTWATCH_TEST_READY" // grandchild writes its PID here once the watcher is armed
envExited = "LOCALAI_PARENTWATCH_TEST_EXITED" // grandchild writes here when it detects reparenting
)
// init dispatches the helper roles when this test binary is re-executed with a
// role set. It runs before the testing/Ginkgo machinery, and is a no-op during
// a normal test run (role unset).
func init() {
switch os.Getenv(envRole) {
case "middle":
runMiddleRole()
case "grandchild":
runGrandchildRole()
}
}
// childEnv returns the current environment with the parentwatch test role set
// to the given value (replacing any inherited role), leaving the ready/exited
// file paths inherited.
func childEnv(role string) []string {
out := make([]string, 0, len(os.Environ())+1)
for _, kv := range os.Environ() {
if len(kv) > len(envRole) && kv[:len(envRole)+1] == envRole+"=" {
continue
}
out = append(out, kv)
}
return append(out, envRole+"="+role)
}
// runGrandchildRole arms the REAL watchParentDeath against its current parent
// (the "middle" process), signals readiness, then blocks. When middle exits and
// we are reparented, the watcher fires and we record it before exiting.
func runGrandchildRole() {
exitedFile := os.Getenv(envExited)
readyFile := os.Getenv(envReady)
origPPID := os.Getppid()
go watchParentDeath(origPPID, 50*time.Millisecond, func() {
_ = os.WriteFile(exitedFile, []byte("1"), 0o644)
os.Exit(7)
})
// Safety valve: never linger if something goes wrong with the test.
go func() {
time.Sleep(30 * time.Second)
os.Exit(2)
}()
// Signal readiness only after the watcher captured origPPID, so middle
// won't exit before we've recorded it as our original parent.
_ = os.WriteFile(readyFile, []byte(strconv.Itoa(os.Getpid())), 0o644)
select {} // block until the watcher terminates us
}
// runMiddleRole spawns the grandchild (which arms the watcher against us),
// waits until it is ready, then exits — orphaning the grandchild so it gets
// reparented, which is what the watcher must detect.
func runMiddleRole() {
readyFile := os.Getenv(envReady)
self, err := os.Executable()
if err != nil {
os.Exit(3)
}
cmd := exec.Command(self)
cmd.Env = childEnv("grandchild")
// Own process group, mirroring how real backends are spawned, and discard
// std streams so the grandchild doesn't keep any parent pipe open.
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if err := cmd.Start(); err != nil {
os.Exit(4)
}
if !waitForFile(readyFile, 10*time.Second) {
os.Exit(5)
}
os.Exit(0) // orphan the grandchild
}
func waitForFile(path string, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
if _, err := os.Stat(path); err == nil {
return true
}
time.Sleep(20 * time.Millisecond)
}
return false
}
// This spec builds a genuine two-level process tree (test -> middle ->
// grandchild), lets the middle process die, and asserts the grandchild's
// watchParentDeath detects the reparenting and self-terminates.
var _ = Describe("watchParentDeath", func() {
It("detects reparenting and self-terminates the orphaned process", func() {
if runtime.GOOS == "windows" {
Skip("parent-death watcher is not supported on windows")
}
dir := GinkgoT().TempDir()
readyFile := filepath.Join(dir, "ready")
exitedFile := filepath.Join(dir, "exited")
self, err := os.Executable()
Expect(err).NotTo(HaveOccurred(), "cannot resolve test executable")
middle := exec.Command(self)
middle.Env = append(childEnv("middle"),
envReady+"="+readyFile,
envExited+"="+exitedFile,
)
// Discard the helpers' output; keep the test log clean.
middle.Stdout = nil
middle.Stderr = nil
Expect(middle.Start()).To(Succeed(), "failed to start middle helper")
// Wait only for the middle process; the grandchild is intentionally left
// orphaned. No pipes are shared, so this returns as soon as middle exits.
Expect(middle.Wait()).To(Succeed(), "middle helper exited with error")
// The grandchild must have armed the watcher (and thus captured middle as
// its parent) before middle exited.
_, err = os.Stat(readyFile)
Expect(err).NotTo(HaveOccurred(), "grandchild never signaled readiness")
// Best-effort cleanup in case the watcher somehow doesn't fire.
DeferCleanup(func() {
if b, err := os.ReadFile(readyFile); err == nil {
if pid, err := strconv.Atoi(string(b)); err == nil {
_ = syscall.Kill(pid, syscall.SIGKILL)
}
}
})
// Now that middle is gone, the grandchild has been reparented; the watcher
// must notice and write the exited marker.
Expect(waitForFile(exitedFile, 10*time.Second)).To(BeTrue(), "watcher did not detect parent death within timeout")
})
})

View File

@@ -939,6 +939,9 @@ func StartServer(address string, model AIModel) error {
s := grpc.NewServer(serverOpts()...)
pb.RegisterBackendServer(s, &server{llm: model})
log.Printf("gRPC Server listening at %v", lis.Addr())
// Safety net: self-terminate if the LocalAI process that spawned this
// backend dies without running its graceful teardown (see parentwatch.go).
startParentDeathWatcher()
if err := s.Serve(lis); err != nil {
return err
}
@@ -954,6 +957,9 @@ func RunServer(address string, model AIModel) (func() error, error) {
s := grpc.NewServer(serverOpts()...)
pb.RegisterBackendServer(s, &server{llm: model})
log.Printf("gRPC Server listening at %v", lis.Addr())
// Safety net: self-terminate if the LocalAI process that spawned this
// backend dies without running its graceful teardown (see parentwatch.go).
startParentDeathWatcher()
if err = s.Serve(lis); err != nil {
return func() error {
return lis.Close()

View File

@@ -3605,10 +3605,6 @@ const docTemplate = `{
"localai.GalleryBackend": {
"type": "object",
"properties": {
"force": {
"description": "Force reinstalls the backend even when it is already installed and\nrunnable. Off by default so apply stays idempotent for supervising\napps that ensure their backend on every boot.",
"type": "boolean"
},
"id": {
"type": "string"
}

View File

@@ -3602,10 +3602,6 @@
"localai.GalleryBackend": {
"type": "object",
"properties": {
"force": {
"description": "Force reinstalls the backend even when it is already installed and\nrunnable. Off by default so apply stays idempotent for supervising\napps that ensure their backend on every boot.",
"type": "boolean"
},
"id": {
"type": "string"
}

View File

@@ -303,12 +303,6 @@ definitions:
type: object
localai.GalleryBackend:
properties:
force:
description: |-
Force reinstalls the backend even when it is already installed and
runnable. Off by default so apply stays idempotent for supervising
apps that ensure their backend on every boot.
type: boolean
id:
type: string
type: object