From 07f6c15a37ac9e005b99c7e5aed4f6ac533bc585 Mon Sep 17 00:00:00 2001 From: "LocalAI [bot]" <139863280+localai-bot@users.noreply.github.com> Date: Sun, 31 May 2026 00:09:55 +0200 Subject: [PATCH] feat(ds4): layer-split distributed inference (#10098) * feat(ds4): add standalone ds4-worker distributed worker binary Add worker_main.c, a minimal standalone worker that owns a slice of the model's transformer layers and serves activations over ds4's own TCP transport via ds4_dist_run(). It links the same engine objects the backend already builds (including ds4_distributed.o) and has NO gRPC/protobuf dependency, so it builds even on hosts lacking protobuf/grpc dev headers. Launched by `local-ai worker ds4-distributed`. Wire the ds4-worker CMake target (mirrors grpc-server's object/GPU/native handling) and have the Makefile copy + clean the binary alongside grpc-server. Ignore the built ds4-worker artifact. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * feat(ds4): package ds4-worker alongside grpc-server Copy the standalone ds4-worker binary into the backend package (Linux package.sh) and the Darwin OCI tar (ds4-darwin.sh: both the explicit copy and the otool dylib-bundling loop) so distributed workers ship with the backend. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * fix(ds4): tighten ds4-worker integer arg validation to match upstream Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * feat(ds4): wire grpc-server as distributed coordinator Add distributed COORDINATOR support to the ds4 backend's gRPC server. Distributed inference is an engine backend: when LoadModel receives 'ds4_role:coordinator', the process populates ds4_engine_options.distributed (role, layer slice, listen host/port) before ds4_engine_open, then the normal ds4_session_* generation path runs transparently once the worker route covers all layers. - New LoadModel options: ds4_role, ds4_layers (START:END or START:output), ds4_listen (host:port), ds4_route_timeout. - parse_layers_spec() maps the layer spec onto ds4_distributed_layers. - wait_route_ready() blocks generation until ds4_session_distributed_route_ready() reports full coverage (or timeout), gating both Predict and PredictStream; returns UNAVAILABLE on timeout/error. - No ds4_role => g_distributed stays false and wait_route_ready is a no-op, so single-node behavior is unchanged. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * fix(ds4): don't block Status during route wait; validate coordinator opts Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * feat(cli): add ds4-distributed worker exec helper Add the ds4WorkerArgs helper plus findDS4Backend/DS4Distributed.Run that resolve the ds4 backend via the gallery and exec the packaged ds4-worker binary. Unlike worker_llamacpp.go, ds4 bundles its own dynamic loader (lib/ld.so) for glibc compatibility, so when present we exec ds4-worker through that loader with LD_LIBRARY_PATH=/lib, mirroring backend/cpp/ds4/run.sh; otherwise we exec it directly. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * feat(cli): register the ds4-distributed worker subcommand Wire DS4Distributed into the Worker kong command tree so `local-ai worker ds4-distributed` is available. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * docs(ds4): document layer-split distributed inference Add a ds4 section to the distributed-mode feature docs (coordinator model YAML, manual worker command, layer-range semantics, the 'GGUF on every machine' requirement, coordinator-listens dial direction vs llama.cpp) and a terse Distributed mode section to the ds4 backend agent guide. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * test(ds4): opt-in hardware-gated distributed e2e spec Add a self-contained, opt-in Ginkgo spec to the backend e2e suite that spins a ds4 coordinator (via the packaged run.sh, loaded with ds4_role/ds4_layers/ds4_listen options) plus a ds4-worker process for the upper layers, then uses Eventually to assert a short successful Predict once the layer route forms, before tearing the worker down. Gated by BACKEND_TEST_DS4_DISTRIBUTED=1 (plus the existing BACKEND_BINARY + BACKEND_TEST_MODEL_FILE and optional layer/listen/accel knobs); compiles and skips cleanly with no env, hardware, or model. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * test(ds4): pass coordinator ctx to worker; lowercase error string Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * docs(ds4): note distributed transport is plaintext/unauthenticated Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * style(ds4): replace em dashes in distributed docs/agent/test per repo convention Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] * fix(ds4): link ds4-worker with the C++ driver for CUDA/Metal builds The ds4-worker target is built from worker_main.c (C), so CMake linked it with the C driver. The nvcc-built ds4_cuda.o (and Obj-C++ ds4_metal.o) reference the C++ runtime, so the CUDA/Metal builds failed with undefined libstdc++ symbols (std::__throw_length_error). The CPU build passed because ds4_cpu.o is pure C. Force LINKER_LANGUAGE CXX so libstdc++ is linked. Signed-off-by: Ettore Di Giacinto Assisted-by: Claude:claude-opus-4-8 [Claude Code] --------- Signed-off-by: Ettore Di Giacinto Co-authored-by: Ettore Di Giacinto --- .agents/ds4-backend.md | 28 +++ backend/cpp/ds4/.gitignore | 1 + backend/cpp/ds4/CMakeLists.txt | 33 ++++ backend/cpp/ds4/Makefile | 3 +- backend/cpp/ds4/grpc-server.cpp | 149 ++++++++++++++- backend/cpp/ds4/package.sh | 3 +- backend/cpp/ds4/worker_main.c | 126 +++++++++++++ core/cli/worker/worker.go | 1 + core/cli/worker/worker_ds4.go | 108 +++++++++++ core/cli/worker/worker_ds4_test.go | 28 +++ docs/content/features/distributed-mode.md | 66 +++++++ scripts/build/ds4-darwin.sh | 3 +- tests/e2e-backends/backend_test.go | 220 ++++++++++++++++++++++ 13 files changed, 764 insertions(+), 5 deletions(-) create mode 100644 backend/cpp/ds4/worker_main.c create mode 100644 core/cli/worker/worker_ds4.go create mode 100644 core/cli/worker/worker_ds4_test.go diff --git a/.agents/ds4-backend.md b/.agents/ds4-backend.md index 5ac70ee7f..3edc14b11 100644 --- a/.agents/ds4-backend.md +++ b/.agents/ds4-backend.md @@ -68,6 +68,34 @@ go test -count=1 -timeout=30m -v ./tests/e2e-backends/... CI does not load the model; the suite is opt-in via env vars. +## Distributed mode + +ds4 supports **layer-split** distributed inference (a model too big for one host, +split by transformer layer; the GGUF must be present on every machine, each loads +only its slice). Topology is **inverted** vs llama.cpp: the coordinator listens, +workers dial in. + +- **`ds4-worker` binary**: built and packaged next to `grpc-server` (`package.sh` + copies it into `package/`). Links the same engine objects plus `ds4_distributed.o`; + **no gRPC/protobuf dependency** (speaks ds4's own TCP transport), so it builds + even where `grpc-server` can't. Runs the worker serving loop (`ds4_dist_run`). +- **Coordinator wiring**: the ds4 `grpc-server` acts as coordinator when `LoadModel` + `ModelOptions.Options` (from model-YAML `options:`) carry: + - `ds4_role:coordinator` (enables distributed mode; absent → single-node, back-compat) + - `ds4_layers:0:19` (coordinator's own slice, inclusive; `N:output` includes the head) + - `ds4_listen:0.0.0.0:1234` (address workers dial into) + - `ds4_route_timeout:60` (optional; seconds Predict/PredictStream wait for the route + to form before returning gRPC `UNAVAILABLE`; default 60) +- **Worker CLI**: `local-ai worker ds4-distributed -- ` resolves the + ds4 backend and execs the packaged `ds4-worker` (raw passthrough), e.g. + `--role worker --model /models/ds4flash.gguf --layers 20:output --coordinator 1234`. + +Opt-in e2e in `tests/e2e-backends/backend_test.go`, gated by +`BACKEND_TEST_DS4_DISTRIBUTED=1` (plus `BACKEND_TEST_DS4_WORKER_BINARY`, +`BACKEND_TEST_DS4_WORKER_LAYERS`, `BACKEND_TEST_DS4_COORDINATOR_LAYERS`, +`BACKEND_TEST_DS4_LISTEN`). Design spec: +`docs/superpowers/specs/2026-05-30-ds4-distributed-inference-design.md`. + ## Importer `core/gallery/importers/ds4.go` (`DS4Importer`) auto-detects ds4 weights by diff --git a/backend/cpp/ds4/.gitignore b/backend/cpp/ds4/.gitignore index a9f016206..23b76c31f 100644 --- a/backend/cpp/ds4/.gitignore +++ b/backend/cpp/ds4/.gitignore @@ -2,6 +2,7 @@ ds4/ build/ package/ grpc-server +ds4-worker *.o backend.pb.cc backend.pb.h diff --git a/backend/cpp/ds4/CMakeLists.txt b/backend/cpp/ds4/CMakeLists.txt index 53ccb8c7a..bb61dd0b2 100644 --- a/backend/cpp/ds4/CMakeLists.txt +++ b/backend/cpp/ds4/CMakeLists.txt @@ -104,3 +104,36 @@ if(DS4_NATIVE) target_compile_options(${TARGET} PRIVATE -march=native) endif() endif() + +# ds4-worker: standalone distributed worker. Links the same ds4 engine objects +# (including ds4_distributed.o) but has NO gRPC/protobuf dependency - it speaks +# ds4's own TCP transport via ds4_dist_run(). Buildable wherever the engine +# objects build, even on hosts without protobuf/grpc dev headers. +add_executable(ds4-worker worker_main.c) +target_include_directories(ds4-worker PRIVATE ${DS4_DIR}) +foreach(obj ${DS4_OBJS}) + target_sources(ds4-worker PRIVATE ${obj}) + set_source_files_properties(${obj} PROPERTIES EXTERNAL_OBJECT TRUE GENERATED TRUE) +endforeach() +# worker_main.c is C, but the engine objects built by nvcc (ds4_cuda.o) and the +# Metal path (ds4_metal.o, Obj-C++) reference the C++ runtime (libstdc++). Force +# the C++ linker driver so those symbols resolve; the C driver would not link +# libstdc++ and the CUDA/Metal builds fail with undefined std:: references. +set_target_properties(ds4-worker PROPERTIES LINKER_LANGUAGE CXX) +target_link_libraries(ds4-worker PRIVATE Threads::Threads m) + +if(DS4_GPU STREQUAL "cuda") + target_link_libraries(ds4-worker PRIVATE CUDA::cudart CUDA::cublas) +elseif(DS4_GPU STREQUAL "metal") + target_link_libraries(ds4-worker PRIVATE ${FOUNDATION_LIB} ${METAL_LIB}) +elseif(DS4_GPU STREQUAL "cpu") + target_compile_definitions(ds4-worker PRIVATE DS4_NO_GPU) +endif() + +if(DS4_NATIVE) + if(APPLE) + target_compile_options(ds4-worker PRIVATE -mcpu=native) + else() + target_compile_options(ds4-worker PRIVATE -march=native) + endif() +endif() diff --git a/backend/cpp/ds4/Makefile b/backend/cpp/ds4/Makefile index 125367c4f..b9d3e1a46 100644 --- a/backend/cpp/ds4/Makefile +++ b/backend/cpp/ds4/Makefile @@ -66,6 +66,7 @@ grpc-server: ds4/ds4.o mkdir -p $(BUILD_DIR) cd $(BUILD_DIR) && cmake $(CMAKE_ARGS) $(CURRENT_MAKEFILE_DIR) && cmake --build . --config Release -j $(JOBS) cp $(BUILD_DIR)/grpc-server grpc-server + cp $(BUILD_DIR)/ds4-worker ds4-worker package: grpc-server bash package.sh @@ -74,7 +75,7 @@ test: @echo "ds4 backend: e2e coverage at tests/e2e-backends/ (BACKEND_BINARY mode)" clean: - rm -rf $(BUILD_DIR) grpc-server package + rm -rf $(BUILD_DIR) grpc-server ds4-worker package if [ -d ds4 ]; then $(MAKE) -C ds4 clean; fi purge: clean diff --git a/backend/cpp/ds4/grpc-server.cpp b/backend/cpp/ds4/grpc-server.cpp index c4f149b75..f6672a4a0 100644 --- a/backend/cpp/ds4/grpc-server.cpp +++ b/backend/cpp/ds4/grpc-server.cpp @@ -23,8 +23,11 @@ extern "C" { #include #include +#include #include +#include #include +#include #include #include #include @@ -51,6 +54,12 @@ ds4_session *g_session = nullptr; int g_ctx_size = 32768; std::string g_kv_cache_dir; // empty disables disk cache +// Distributed coordinator state. g_distributed is set true when LoadModel is +// given 'ds4_role:coordinator'; generation then waits for the worker route to +// form before running. Single-node behavior is unchanged when unset. +bool g_distributed = false; +int g_route_timeout_sec = 60; + std::atomic g_server{nullptr}; // Parse a "key:value" option string. Returns empty when no colon. @@ -60,6 +69,77 @@ static std::pair split_option(const std::string &opt) return {opt.substr(0, colon), opt.substr(colon + 1)}; } +// Parse a positive base-10 integer. Returns false (without throwing) on empty, +// trailing garbage, non-positive, or overflow - unlike std::stoi. +static bool parse_positive_int(const std::string &s, int *out) { + if (s.empty()) return false; + char *end = nullptr; + long v = std::strtol(s.c_str(), &end, 10); + if (!end || *end != '\0' || v <= 0 || v > INT_MAX) return false; + *out = static_cast(v); + return true; +} + +// Parse a ds4 layer spec "START:END" or "START:output" into the engine's +// distributed layer fields. Returns false on malformed input. +static bool parse_layers_spec(const std::string &spec, ds4_distributed_layers *out) { + auto colon = spec.find(':'); + if (colon == std::string::npos) return false; + std::string lhs = spec.substr(0, colon); + std::string rhs = spec.substr(colon + 1); + if (lhs.empty() || rhs.empty()) return false; + char *end = nullptr; + long start = std::strtol(lhs.c_str(), &end, 10); + if (!end || *end != '\0' || start < 0) return false; + out->start = static_cast(start); + out->has_output = false; + if (rhs == "output") { + out->has_output = true; + out->end = out->start; // engine treats has_output as "through final layer" + } else { + long e = std::strtol(rhs.c_str(), &end, 10); + if (!end || *end != '\0' || e < start) return false; + out->end = static_cast(e); + } + out->set = true; + return true; +} + +// When acting as a distributed coordinator, block until the worker route +// covers all layers (ds4_session_distributed_route_ready == 1) or the timeout +// elapses. Returns an empty string on success, or an error message to return +// to the client. No-op when not distributed. +// +// Takes the g_engine_mu lock by reference and RELEASES it during each poll +// sleep. The wait can span up to g_route_timeout_sec seconds while workers +// connect; holding g_engine_mu the whole time would block the Status/Health +// readiness probes (they also lock g_engine_mu), making LocalAI's loader treat +// a still-starting worker as hung. +static std::string wait_route_ready(std::unique_lock &lock) { + if (!g_distributed) return ""; + char err[256] = {0}; + const int deadline_polls = g_route_timeout_sec * 10; // 100ms per poll + for (int i = 0; i <= deadline_polls; ++i) { + int ready = ds4_session_distributed_route_ready(g_session, err, sizeof(err)); + if (ready == 1) return ""; + if (ready < 0) { + return std::string("ds4 distributed route error: ") + + (err[0] ? err : "unknown"); + } + // Release the lock while sleeping so Status/Health and other RPCs can + // interleave during worker startup. + lock.unlock(); + struct timespec ts = {0, 100L * 1000L * 1000L}; // 100ms + nanosleep(&ts, nullptr); + lock.lock(); + // A concurrent Free() may have torn down the engine while we slept. + if (!g_engine || !g_session) { + return "ds4: model unloaded while waiting for distributed route"; + } + } + return "ds4 distributed route incomplete: workers not connected (layers uncovered)"; +} + static void append_token_text(ds4_engine *engine, int token, std::string &out) { size_t len = 0; const char *text = ds4_token_text(engine, token, &len); @@ -377,6 +457,11 @@ public: backend::Result *result) override { std::lock_guard lock(g_engine_mu); + // Reset distributed state so a model swap (a second LoadModel without + // ds4_role) doesn't inherit a stale coordinator configuration. + g_distributed = false; + g_route_timeout_sec = 60; + if (g_engine) { if (g_session) { ds4_session_free(g_session); g_session = nullptr; } ds4_engine_close(g_engine); @@ -394,12 +479,23 @@ public: std::string mtp_path; int mtp_draft = 0; float mtp_margin = 3.0f; + std::string ds4_role, ds4_layers, ds4_listen; for (const auto &opt : request->options()) { auto [k, v] = split_option(opt); if (k == "mtp_path") mtp_path = v; else if (k == "mtp_draft") mtp_draft = std::stoi(v); else if (k == "mtp_margin") mtp_margin = std::stof(v); else if (k == "kv_cache_dir") g_kv_cache_dir = v; + else if (k == "ds4_role") ds4_role = v; + else if (k == "ds4_layers") ds4_layers = v; + else if (k == "ds4_listen") ds4_listen = v; + else if (k == "ds4_route_timeout") { + if (!parse_positive_int(v, &g_route_timeout_sec)) { + result->set_success(false); + result->set_message("ds4: ds4_route_timeout must be a positive integer"); + return GStatus::OK; + } + } } g_kv_cache.SetDir(g_kv_cache_dir); @@ -422,6 +518,49 @@ public: opt.backend = DS4_BACKEND_CUDA; #endif + // Coordinator wiring. 'ds4_role:coordinator' enables layer-split + // distributed inference: this process listens on ds4_listen and owns + // the ds4_layers slice; workers dial in (see `local-ai worker + // ds4-distributed`). Absent ds4_role => unchanged single-node path. + // Must be static: opt.distributed.listen_host is a const char* the + // engine retains past this call, so it cannot point at a local that + // goes out of scope (otherwise a future "simplify to local" refactor + // reintroduces a dangling pointer). + static std::string s_listen_host; + if (ds4_role == "coordinator") { + if (ds4_layers.empty() || ds4_listen.empty()) { + result->set_success(false); + result->set_message("ds4: ds4_role:coordinator requires ds4_layers and ds4_listen"); + return GStatus::OK; + } + // host:port for IPv4/hostname; IPv6 literals are unsupported (the + // first colon would split inside the address). + auto host_port = split_option(ds4_listen); // "host:port" -> {host, port} + if (host_port.second.empty()) { + result->set_success(false); + result->set_message("ds4: ds4_listen must be host:port"); + return GStatus::OK; + } + int listen_port = 0; + if (!parse_positive_int(host_port.second, &listen_port)) { + result->set_success(false); + result->set_message("ds4: ds4_listen port must be a positive integer"); + return GStatus::OK; + } + ds4_distributed_layers layers = {}; + if (!parse_layers_spec(ds4_layers, &layers)) { + result->set_success(false); + result->set_message("ds4: invalid ds4_layers (want START:END or START:output)"); + return GStatus::OK; + } + s_listen_host = host_port.first; + opt.distributed.role = DS4_DISTRIBUTED_COORDINATOR; + opt.distributed.layers = layers; + opt.distributed.listen_host = s_listen_host.c_str(); + opt.distributed.listen_port = listen_port; + g_distributed = true; + } + int rc = ds4_engine_open(&g_engine, &opt); if (rc != 0 || !g_engine) { result->set_success(false); @@ -458,10 +597,13 @@ public: GStatus Predict(ServerContext *, const backend::PredictOptions *request, backend::Reply *reply) override { - std::lock_guard lock(g_engine_mu); + std::unique_lock lock(g_engine_mu); if (!g_engine || !g_session) { return GStatus(StatusCode::FAILED_PRECONDITION, "ds4: model not loaded"); } + if (std::string route_err = wait_route_ready(lock); !route_err.empty()) { + return GStatus(StatusCode::UNAVAILABLE, route_err); + } ds4_tokens prompt = {}; build_prompt(g_engine, request, &prompt); int n_predict = request->tokens() > 0 ? request->tokens() : 256; @@ -554,10 +696,13 @@ public: GStatus PredictStream(ServerContext *, const backend::PredictOptions *request, ServerWriter *writer) override { - std::lock_guard lock(g_engine_mu); + std::unique_lock lock(g_engine_mu); if (!g_engine || !g_session) { return GStatus(StatusCode::FAILED_PRECONDITION, "ds4: model not loaded"); } + if (std::string route_err = wait_route_ready(lock); !route_err.empty()) { + return GStatus(StatusCode::UNAVAILABLE, route_err); + } ds4_tokens prompt = {}; build_prompt(g_engine, request, &prompt); int n_predict = request->tokens() > 0 ? request->tokens() : 256; diff --git a/backend/cpp/ds4/package.sh b/backend/cpp/ds4/package.sh index 46d9d3c3c..33f848f0f 100755 --- a/backend/cpp/ds4/package.sh +++ b/backend/cpp/ds4/package.sh @@ -5,7 +5,8 @@ REPO_ROOT="${CURDIR}/../../.." mkdir -p "$CURDIR/package/lib" cp -avf "$CURDIR/grpc-server" "$CURDIR/package/" -cp -rfv "$CURDIR/run.sh" "$CURDIR/package/" +cp -avf "$CURDIR/ds4-worker" "$CURDIR/package/" +cp -rfv "$CURDIR/run.sh" "$CURDIR/package/" UNAME_S=$(uname -s) if [ "$UNAME_S" = "Darwin" ]; then diff --git a/backend/cpp/ds4/worker_main.c b/backend/cpp/ds4/worker_main.c new file mode 100644 index 000000000..c668bc667 --- /dev/null +++ b/backend/cpp/ds4/worker_main.c @@ -0,0 +1,126 @@ +// ds4-worker: standalone distributed worker for the LocalAI ds4 backend. +// +// A ds4 distributed worker owns a slice of the model's transformer layers, +// dials the coordinator, and serves activations for its slice. It does NOT +// speak backend.proto - it speaks ds4's own TCP transport via ds4_dist_run(). +// This binary is intentionally minimal (no HTTP/web/kvstore/linenoise): it +// only needs the engine objects + ds4_distributed.o, which the backend already +// builds. It is launched by `local-ai worker ds4-distributed`. +// +// Usage: +// ds4-worker --role worker --model --layers 20:output \ +// --coordinator [--cpu|--cuda|--metal] [-c CTX] [-t N] + +#include +#include +#include +#include +#include + +#include "ds4.h" +#include "ds4_distributed.h" + +static const char *need_arg(int *i, int argc, char **argv, const char *flag) { + if (*i + 1 >= argc) { + fprintf(stderr, "ds4-worker: missing value for %s\n", flag); + exit(2); + } + return argv[++(*i)]; +} + +static int parse_int_arg(const char *s, const char *flag) { + char *end = NULL; + long v = strtol(s, &end, 10); + if (!s[0] || *end || v <= 0 || v > INT_MAX) { + fprintf(stderr, "ds4-worker: invalid value for %s: %s\n", flag, s); + exit(2); + } + return (int)v; +} + +static ds4_backend default_backend(void) { +#if defined(DS4_NO_GPU) + return DS4_BACKEND_CPU; +#elif defined(__APPLE__) + return DS4_BACKEND_METAL; +#else + return DS4_BACKEND_CUDA; +#endif +} + +int main(int argc, char **argv) { + signal(SIGPIPE, SIG_IGN); + + ds4_engine_options opt = {0}; + opt.backend = default_backend(); + int ctx_size = 32768; + + for (int i = 1; i < argc; i++) { + const char *arg = argv[i]; + if (!strcmp(arg, "-h") || !strcmp(arg, "--help")) { + fprintf(stdout, "ds4-worker: standalone ds4 distributed worker\n"); + ds4_dist_usage(stdout); + fprintf(stdout, " -m, --model PATH model GGUF (the worker loads only its --layers slice)\n"); + fprintf(stdout, " -c, --ctx N context size (default 32768)\n"); + fprintf(stdout, " -t, --threads N CPU threads\n"); + fprintf(stdout, " --cpu|--cuda|--metal backend override\n"); + return 0; + } + + char dist_err[256] = {0}; + ds4_dist_cli_parse_result dist_parse = + ds4_dist_parse_cli_arg(arg, &i, argc, argv, &opt.distributed, + dist_err, sizeof(dist_err)); + if (dist_parse == DS4_DIST_CLI_ERROR) { + fprintf(stderr, "ds4-worker: %s\n", + dist_err[0] ? dist_err : "invalid distributed option"); + return 2; + } + if (dist_parse == DS4_DIST_CLI_MATCHED) continue; + + if (!strcmp(arg, "-m") || !strcmp(arg, "--model")) { + opt.model_path = need_arg(&i, argc, argv, arg); + } else if (!strcmp(arg, "-c") || !strcmp(arg, "--ctx")) { + ctx_size = parse_int_arg(need_arg(&i, argc, argv, arg), arg); + } else if (!strcmp(arg, "-t") || !strcmp(arg, "--threads")) { + opt.n_threads = parse_int_arg(need_arg(&i, argc, argv, arg), arg); + } else if (!strcmp(arg, "--cpu")) { + opt.backend = DS4_BACKEND_CPU; + } else if (!strcmp(arg, "--cuda")) { + opt.backend = DS4_BACKEND_CUDA; + } else if (!strcmp(arg, "--metal")) { + opt.backend = DS4_BACKEND_METAL; + } else { + fprintf(stderr, "ds4-worker: unknown option: %s\n", arg); + return 2; + } + } + + if (opt.distributed.role != DS4_DISTRIBUTED_WORKER) { + fprintf(stderr, "ds4-worker: --role worker is required\n"); + return 2; + } + if (!opt.model_path) { + fprintf(stderr, "ds4-worker: --model is required\n"); + return 2; + } + + char prep_err[256] = {0}; + if (ds4_dist_prepare_engine_options(&opt.distributed, &opt, + prep_err, sizeof(prep_err)) != 0) { + fprintf(stderr, "ds4-worker: %s\n", prep_err); + return 2; + } + + ds4_engine *engine = NULL; + if (ds4_engine_open(&engine, &opt) != 0 || !engine) { + fprintf(stderr, "ds4-worker: failed to open engine\n"); + return 1; + } + + ds4_dist_generation_options gen = {0}; + gen.ctx_size = ctx_size; + int rc = ds4_dist_run(engine, &opt.distributed, &gen); + ds4_engine_close(engine); + return rc; +} diff --git a/core/cli/worker/worker.go b/core/cli/worker/worker.go index 95eb9e5b7..792c604f5 100644 --- a/core/cli/worker/worker.go +++ b/core/cli/worker/worker.go @@ -14,4 +14,5 @@ type Worker struct { LLamaCPP LLamaCPP `cmd:"" name:"llama-cpp-rpc" help:"Starts a llama.cpp worker in standalone mode"` MLXDistributed MLXDistributed `cmd:"" name:"mlx-distributed" help:"Starts an MLX distributed worker in standalone mode (requires --hostfile and --rank)"` VLLMDistributed VLLMDistributed `cmd:"" name:"vllm" help:"Starts a vLLM data-parallel follower process. Multi-node DP for a single model: head runs the existing vllm backend with engine_args.data_parallel_size>1, followers run this command."` + DS4Distributed DS4Distributed `cmd:"" name:"ds4-distributed" help:"Starts a ds4 distributed worker in standalone mode: owns a layer slice and dials the coordinator (pass ds4-worker args after --)"` } diff --git a/core/cli/worker/worker_ds4.go b/core/cli/worker/worker_ds4.go new file mode 100644 index 000000000..f2139dbd0 --- /dev/null +++ b/core/cli/worker/worker_ds4.go @@ -0,0 +1,108 @@ +package worker + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "syscall" + + cliContext "github.com/mudler/LocalAI/core/cli/context" + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/gallery" + "github.com/mudler/LocalAI/pkg/model" + "github.com/mudler/LocalAI/pkg/system" + "github.com/mudler/xlog" +) + +type DS4Distributed struct { + WorkerFlags `embed:""` + ExtraDS4Args string `name:"ds4-args" env:"LOCALAI_EXTRA_DS4_ARGS,EXTRA_DS4_ARGS" help:"Arguments passed to ds4-worker (e.g. '--role worker --model m.gguf --layers 20:output --coordinator HOST PORT')"` +} + +const ( + ds4WorkerBinaryName = "ds4-worker" + ds4GalleryName = "ds4" +) + +// ds4WorkerArgs builds the argv for syscall.Exec when launching ds4-worker +// directly: the binary path followed by the space-split extra args. An empty +// extra string yields a bare invocation. +func ds4WorkerArgs(binary, extra string) []string { + args := []string{binary} + args = append(args, strings.Fields(extra)...) + return args +} + +func findDS4Backend(galleries string, systemState *system.SystemState, requireIntegrity bool) (string, error) { + backends, err := gallery.ListSystemBackends(systemState) + if err != nil { + xlog.Warn("Failed listing system backends", "error", err) + return "", err + } + + backend, ok := backends.Get(ds4GalleryName) + if !ok { + ml := model.NewModelLoader(systemState) + var gals []config.Gallery + if err := json.Unmarshal([]byte(galleries), &gals); err != nil { + xlog.Error("failed loading galleries", "error", err) + return "", err + } + if err := gallery.InstallBackendFromGallery(context.Background(), gals, systemState, ml, ds4GalleryName, nil, true, requireIntegrity); err != nil { + xlog.Error("ds4 backend not found, failed to install it", "error", err) + return "", err + } + backends, err = gallery.ListSystemBackends(systemState) + if err != nil { + return "", err + } + backend, ok = backends.Get(ds4GalleryName) + if !ok { + return "", errors.New("ds4 backend not found after install") + } + } + + backendPath := filepath.Dir(backend.RunFile) + if backendPath == "" { + return "", errors.New("ds4 backend not found, install it first") + } + return filepath.Join(backendPath, ds4WorkerBinaryName), nil +} + +func (r *DS4Distributed) Run(ctx *cliContext.Context) error { + if r.ExtraDS4Args == "" && len(os.Args) < 4 { + return fmt.Errorf("usage: local-ai worker ds4-distributed -- --role worker --model --layers --coordinator ") + } + + systemState, err := system.GetSystemState( + system.WithBackendPath(r.BackendsPath), + system.WithBackendSystemPath(r.BackendsSystemPath), + ) + if err != nil { + return err + } + + worker, err := findDS4Backend(r.BackendGalleries, systemState, r.RequireBackendIntegrity) + if err != nil { + return err + } + + // ds4 bundles its own dynamic loader (lib/ld.so) for glibc compatibility, + // like backend/cpp/ds4/run.sh does for grpc-server. Launch ds4-worker via + // that loader when present; otherwise exec it directly. (This is a + // deliberate divergence from worker_llamacpp.go, which has no bundled loader.) + backendPath := filepath.Dir(worker) + env := os.Environ() + loader := filepath.Join(backendPath, "lib", "ld.so") + if _, statErr := os.Stat(loader); statErr == nil { + env = append(env, "LD_LIBRARY_PATH="+filepath.Join(backendPath, "lib")+":"+os.Getenv("LD_LIBRARY_PATH")) + args := append([]string{loader}, ds4WorkerArgs(worker, r.ExtraDS4Args)...) + return syscall.Exec(loader, args, env) + } + + return syscall.Exec(worker, ds4WorkerArgs(worker, r.ExtraDS4Args), env) +} diff --git a/core/cli/worker/worker_ds4_test.go b/core/cli/worker/worker_ds4_test.go new file mode 100644 index 000000000..2a0050e12 --- /dev/null +++ b/core/cli/worker/worker_ds4_test.go @@ -0,0 +1,28 @@ +package worker + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("ds4 worker CLI", func() { + It("uses the ds4 backend gallery name and worker binary name", func() { + Expect(ds4GalleryName).To(Equal("ds4")) + Expect(ds4WorkerBinaryName).To(Equal("ds4-worker")) + }) + + It("assembles direct exec args as [binary, extra-split...]", func() { + args := ds4WorkerArgs("/b/ds4-worker", "--role worker --model m.gguf --layers 20:output --coordinator 10.0.0.1 1234") + Expect(args).To(Equal([]string{ + "/b/ds4-worker", + "--role", "worker", + "--model", "m.gguf", + "--layers", "20:output", + "--coordinator", "10.0.0.1", "1234", + })) + }) + + It("drops empty extra args to a bare binary invocation", func() { + Expect(ds4WorkerArgs("/b/ds4-worker", "")).To(Equal([]string{"/b/ds4-worker"})) + }) +}) diff --git a/docs/content/features/distributed-mode.md b/docs/content/features/distributed-mode.md index c9b48d835..af5f74645 100644 --- a/docs/content/features/distributed-mode.md +++ b/docs/content/features/distributed-mode.md @@ -424,6 +424,72 @@ engine_args: - **Network reachability.** The head's `data_parallel_rpc_port` plus a range of ZMQ ports (typically `data_parallel_rpc_port..+N`) must be reachable from every follower. Open them in your firewall / security group. - **Topology must match exactly.** A mismatch in `--data-parallel-size` between head and any follower will hang the handshake. Check the head's vLLM logs for `waiting for N DP ranks` if startup stalls. +## ds4 Layer-Split Distributed Inference + +The ds4 backend (DeepSeek V4 Flash) supports **layer-parallel** distributed inference: a single model that is too large for one machine is split by transformer layer across several machines. Each machine must have the GGUF present locally, but loads **only its own slice** of the layers. This lets you run a model whose weights exceed any single host's memory. + +This is **not** routed through the SmartRouter: it is a model-internal split, configured manually (Phase 1). It is unrelated to the NATS/PostgreSQL distributed mode described above. + +### Topology + +ds4 uses a **coordinator/worker** split: + +- The **coordinator** owns tokenization, sampling, the prompt, and a low layer range (e.g. `0:19`). It is LocalAI's ds4 backend and **listens** on a host/port. Workers dial into it. +- One or more **workers** own higher layer ranges (e.g. `20:output`). Each worker loads only its slice and **dials the coordinator** to register the range it can serve. The last worker normally owns the output head. +- Activations flow through the connected slices and back to the coordinator. The route is "ready" only once the coordinator plus all connected workers cover every layer. + +This dial direction is the **inverse** of the llama.cpp RPC model, where the main server dials *out* to a list of `rpc-server` workers. With ds4 the **workers dial in** to the coordinator. + +### Coordinator setup + +The coordinator is a normal LocalAI ds4 model whose YAML carries distributed `options:`: + +```yaml +name: ds4flash +backend: ds4 +options: + - "ds4_role:coordinator" + - "ds4_layers:0:19" + - "ds4_listen:0.0.0.0:1234" +``` + +| Option | Meaning | +|--------|---------| +| `ds4_role:coordinator` | Enables distributed coordinator mode. Without `ds4_role`, the backend behaves as a normal single-node ds4 model. | +| `ds4_layers:0:19` | The coordinator's own layer slice (inclusive). | +| `ds4_listen:0.0.0.0:1234` | Address that workers dial into. | +| `ds4_route_timeout:60` | Optional. Seconds the coordinator waits for the worker route to form before returning an error on a request. Defaults to 60. | + +{{% notice warning %}} +Worker↔coordinator traffic is **plaintext and unauthenticated**: there is no TLS or auth on this channel. Bind `ds4_listen` to an address on a trusted/private network only; using `0.0.0.0` exposes the coordinator on every interface. Run the layer split exclusively over a network you control. +{{% /notice %}} + +Once the model is loaded, the coordinator serves requests exactly like a single-node ds4 model: generation goes through the ordinary inference path and is transparently routed across the layer slices. + +### Worker setup + +On each worker machine (with the GGUF present locally), start a worker pointed at the coordinator: + +```bash +local-ai worker ds4-distributed -- \ + --role worker \ + --model /models/ds4flash.gguf \ + --layers 20:output \ + --coordinator 1234 +``` + +`local-ai worker ds4-distributed` resolves the ds4 backend and execs the packaged `ds4-worker` binary, passing everything after `--` straight through. + +### Layer-range semantics + +- Ranges are **inclusive**: `0:19` is layers 0 through 19. +- `N:output` means layer N through the final layer **plus the output head**. The last worker normally owns the output head. +- The coordinator and all connected workers together **must cover every layer**. Until they do, the coordinator returns a gRPC `UNAVAILABLE` error on inference requests (so a worker that starts slightly after the coordinator is tolerated: once it connects and the route is complete, requests succeed). The wait is tunable via `ds4_route_timeout`. + +{{% notice note %}} +ds4 layer-split inference is **manual setup** in this release (Phase 1): you place the coordinator config and launch each worker yourself, and the layer ranges must be partitioned by hand so they cover the whole model. P2P auto-discovery of the coordinator is planned for a later phase. +{{% /notice %}} + ## Scaling **Adding worker capacity:** Start additional `worker` instances pointing to the same frontend. They self-register automatically: diff --git a/scripts/build/ds4-darwin.sh b/scripts/build/ds4-darwin.sh index 23017d9a1..2da359668 100755 --- a/scripts/build/ds4-darwin.sh +++ b/scripts/build/ds4-darwin.sh @@ -15,6 +15,7 @@ mkdir -p build/darwin/lib mkdir -p backend-images cp -rf backend/cpp/ds4/grpc-server build/darwin/ +cp -rf backend/cpp/ds4/ds4-worker build/darwin/ cp -rf backend/cpp/ds4/run.sh build/darwin/ # Apple Silicon: pick up Homebrew-installed protobuf utf8_validity if present. @@ -28,7 +29,7 @@ for file in $ADDITIONAL_LIBS; do done # Walk dylibs via otool -L and bundle anything that isn't a system framework. -for file in build/darwin/grpc-server; do +for file in build/darwin/grpc-server build/darwin/ds4-worker; do LIBS="$(otool -L "$file" | awk 'NR > 1 { system("echo " $1) } ' | xargs echo)" for lib in $LIBS; do if [[ "$lib" == *.dylib ]] && [[ -e "$lib" ]]; then diff --git a/tests/e2e-backends/backend_test.go b/tests/e2e-backends/backend_test.go index 95f95f666..4c7dac33c 100644 --- a/tests/e2e-backends/backend_test.go +++ b/tests/e2e-backends/backend_test.go @@ -1144,6 +1144,226 @@ var _ = Describe("Backend container", Ordered, func() { }) }) +// ─── ds4 layer-split distributed inference (opt-in, hardware-gated) ────────── +// +// ds4 distributes a single model across machines by transformer layer: a +// coordinator (LocalAI's ds4 backend) listens and owns a low layer slice; one +// or more `ds4-worker` processes dial in and own the higher slices (the last +// owns the output head). The route is "ready" only once coordinator + workers +// cover every layer; until then the coordinator returns gRPC UNAVAILABLE. +// +// This spec is entirely opt-in. It only runs when BACKEND_TEST_DS4_DISTRIBUTED=1 +// is set, AND the suite's normal BACKEND_BINARY (the packaged ds4 run.sh) and +// BACKEND_TEST_MODEL_FILE (the GGUF, present on this machine) are provided. With +// none of those set it compiles and SKIPs cleanly: no hardware, model, or +// network required. +// +// What it covers (single-host, two-process): coordinator option-loading +// (ds4_role/ds4_layers/ds4_listen) through the real gRPC LoadModel path, a +// real ds4-worker process spawned for the upper layers dialing the listen +// address, route formation, and a short successful Predict once the route is +// up. It then tears the worker down. +// +// What it does NOT cover: multi-host networking, >1 worker, failure/timeout +// paths, or the `local-ai worker ds4-distributed` CLI wrapper (that resolves +// the backend + execs ds4-worker and is unit-tested separately); here we exec +// the packaged ds4-worker binary directly so the e2e stays self-contained. +// +// Env vars (in addition to BACKEND_BINARY + BACKEND_TEST_MODEL_FILE): +// +// BACKEND_TEST_DS4_DISTRIBUTED Set to "1" to enable this spec. +// BACKEND_TEST_DS4_WORKER_BINARY Path to the packaged `ds4-worker` binary. +// Defaults to a `ds4-worker` sitting next to +// the BACKEND_BINARY run.sh. +// BACKEND_TEST_DS4_COORDINATOR_LAYERS Coordinator's own layer slice (default "0:19"). +// BACKEND_TEST_DS4_WORKER_LAYERS Worker's layer slice (default "20:output"). +// BACKEND_TEST_DS4_LISTEN Address workers dial into (default "127.0.0.1:"). +// BACKEND_TEST_DS4_WORKER_ACCEL Optional accel flag for the worker: +// "cpu" (default), "cuda", or "metal". +var _ = Describe("ds4 layer-split distributed inference", Ordered, func() { + var ( + workDir string + binaryDir string + modelFile string + listen string + + coordCmd *exec.Cmd + workerCmd *exec.Cmd + conn *grpc.ClientConn + client pb.BackendClient + ) + + BeforeAll(func() { + if os.Getenv("BACKEND_TEST_DS4_DISTRIBUTED") != "1" { + Skip("ds4 distributed spec is opt-in; set BACKEND_TEST_DS4_DISTRIBUTED=1 (plus BACKEND_BINARY and BACKEND_TEST_MODEL_FILE) to run it") + } + + binary := os.Getenv("BACKEND_BINARY") + Expect(binary).NotTo(BeEmpty(), + "ds4 distributed spec requires BACKEND_BINARY pointing at the packaged ds4 run.sh") + Expect(filepath.Base(binary)).To(Equal("run.sh"), + "BACKEND_BINARY must point at a run.sh produced by 'make -C backend/cpp/ds4 package'") + binaryDir = filepath.Dir(binary) + Expect(filepath.Join(binaryDir, "run.sh")).To(BeAnExistingFile()) + + modelFile = os.Getenv("BACKEND_TEST_MODEL_FILE") + Expect(modelFile).NotTo(BeEmpty(), + "ds4 distributed spec requires BACKEND_TEST_MODEL_FILE (GGUF present on this host)") + Expect(modelFile).To(BeAnExistingFile()) + + // Locate the ds4-worker binary the same way the suite locates the + // backend: next to the packaged run.sh, overridable via env. + workerBin := os.Getenv("BACKEND_TEST_DS4_WORKER_BINARY") + if workerBin == "" { + workerBin = filepath.Join(binaryDir, "ds4-worker") + } + Expect(workerBin).To(BeAnExistingFile(), + "ds4-worker binary not found (set BACKEND_TEST_DS4_WORKER_BINARY or package it next to run.sh)") + + coordLayers := os.Getenv("BACKEND_TEST_DS4_COORDINATOR_LAYERS") + if coordLayers == "" { + coordLayers = "0:19" + } + workerLayers := os.Getenv("BACKEND_TEST_DS4_WORKER_LAYERS") + if workerLayers == "" { + workerLayers = "20:output" + } + + listen = os.Getenv("BACKEND_TEST_DS4_LISTEN") + if listen == "" { + lp, err := freeport.GetFreePort() + Expect(err).NotTo(HaveOccurred()) + listen = fmt.Sprintf("127.0.0.1:%d", lp) + } + // The worker dials the listen host/port as two separate CLI args. + listenHost, listenPort, err := net.SplitHostPort(listen) + Expect(err).NotTo(HaveOccurred(), "BACKEND_TEST_DS4_LISTEN must be host:port, got %q", listen) + + workDir, err = os.MkdirTemp("", "ds4-dist-e2e-*") + Expect(err).NotTo(HaveOccurred()) + + Expect(os.Chmod(filepath.Join(binaryDir, "run.sh"), 0o755)).To(Succeed()) + _ = os.Chmod(workerBin, 0o755) + + // 1) Start the coordinator gRPC backend. + port, err := freeport.GetFreePort() + Expect(err).NotTo(HaveOccurred()) + coordAddr := fmt.Sprintf("127.0.0.1:%d", port) + + coordCmd = exec.Command(filepath.Join(binaryDir, "run.sh"), "--addr="+coordAddr) + coordCmd.Stdout = GinkgoWriter + coordCmd.Stderr = GinkgoWriter + Expect(coordCmd.Start()).To(Succeed()) + + Eventually(func() error { + c, derr := net.DialTimeout("tcp", coordAddr, 500*time.Millisecond) + if derr != nil { + return derr + } + _ = c.Close() + return nil + }, 30*time.Second, 200*time.Millisecond).Should(Succeed(), "coordinator backend did not start") + + conn, err = grpc.Dial(coordAddr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(50*1024*1024)), + ) + Expect(err).NotTo(HaveOccurred()) + client = pb.NewBackendClient(conn) + + // 2) Load the coordinator model with distributed options. This proves + // ds4_role/ds4_layers/ds4_listen option parsing through the real + // LoadModel path. + loadCtx, loadCancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer loadCancel() + ctxSize := envInt32("BACKEND_TEST_CTX_SIZE", 512) + res, err := client.LoadModel(loadCtx, &pb.ModelOptions{ + Model: modelFile, + ModelFile: modelFile, + ContextSize: ctxSize, + Threads: envInt32("BACKEND_TEST_THREADS", 4), + NGPULayers: 0, + MMap: true, + Options: []string{ + "ds4_role:coordinator", + "ds4_layers:" + coordLayers, + "ds4_listen:" + listen, + }, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(res.GetSuccess()).To(BeTrue(), "coordinator LoadModel failed: %s", res.GetMessage()) + + // 3) Spawn the ds4-worker for the upper layers, dialing the coordinator. + accel := strings.ToLower(strings.TrimSpace(os.Getenv("BACKEND_TEST_DS4_WORKER_ACCEL"))) + workerArgs := []string{ + "--role", "worker", + "--model", modelFile, + "--layers", workerLayers, + "--coordinator", listenHost, listenPort, + "-c", fmt.Sprintf("%d", ctxSize), + } + switch accel { + case "", "cpu": + workerArgs = append(workerArgs, "--cpu") + case "cuda": + workerArgs = append(workerArgs, "--cuda") + case "metal": + workerArgs = append(workerArgs, "--metal") + default: + Fail(fmt.Sprintf("unsupported BACKEND_TEST_DS4_WORKER_ACCEL=%q (want cpu|cuda|metal)", accel)) + } + workerCmd = exec.Command(workerBin, workerArgs...) + workerCmd.Stdout = GinkgoWriter + workerCmd.Stderr = GinkgoWriter + Expect(workerCmd.Start()).To(Succeed()) + }) + + AfterAll(func() { + if conn != nil { + _ = conn.Close() + } + if workerCmd != nil && workerCmd.Process != nil { + _ = workerCmd.Process.Kill() + _, _ = workerCmd.Process.Wait() + } + if coordCmd != nil && coordCmd.Process != nil { + _ = coordCmd.Process.Kill() + _, _ = coordCmd.Process.Wait() + } + if workDir != "" { + _ = os.RemoveAll(workDir) + } + }) + + It("forms the route and generates a short completion", func() { + // The coordinator returns UNAVAILABLE until the worker has connected and + // the layer range is fully covered. Eventually retries until the route + // is up (or the worker dies), then asserts non-empty content. + var last string + Eventually(func() error { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + res, err := client.Predict(ctx, &pb.PredictOptions{ + Prompt: defaultPrompt, + Tokens: 20, + Temperature: 0.1, + TopK: 40, + TopP: 0.9, + }) + if err != nil { + return err + } + last = string(res.GetMessage()) + if last == "" { + return fmt.Errorf("predict returned empty content") + } + return nil + }, 5*time.Minute, 2*time.Second).Should(Succeed(), + "coordinator never produced a completion once the worker route should have formed") + GinkgoWriter.Printf("ds4 distributed Predict: %q\n", last) + }) +}) + // extractImage runs `docker create` + `docker export` to materialise the image // rootfs into dest. Using export (not save) avoids dealing with layer tarballs. func extractImage(image, dest string) {