feat(ds4): layer-split distributed inference (#10098)

* feat(ds4): add standalone ds4-worker distributed worker binary

Add worker_main.c, a minimal standalone worker that owns a slice of the
model's transformer layers and serves activations over ds4's own TCP
transport via ds4_dist_run(). It links the same engine objects the
backend already builds (including ds4_distributed.o) and has NO
gRPC/protobuf dependency, so it builds even on hosts lacking protobuf/grpc
dev headers. Launched by `local-ai worker ds4-distributed`.

Wire the ds4-worker CMake target (mirrors grpc-server's object/GPU/native
handling) and have the Makefile copy + clean the binary alongside
grpc-server. Ignore the built ds4-worker artifact.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* feat(ds4): package ds4-worker alongside grpc-server

Copy the standalone ds4-worker binary into the backend package (Linux
package.sh) and the Darwin OCI tar (ds4-darwin.sh: both the explicit copy
and the otool dylib-bundling loop) so distributed workers ship with the
backend.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* fix(ds4): tighten ds4-worker integer arg validation to match upstream

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* feat(ds4): wire grpc-server as distributed coordinator

Add distributed COORDINATOR support to the ds4 backend's gRPC server.
Distributed inference is an engine backend: when LoadModel receives
'ds4_role:coordinator', the process populates ds4_engine_options.distributed
(role, layer slice, listen host/port) before ds4_engine_open, then the normal
ds4_session_* generation path runs transparently once the worker route covers
all layers.

- New LoadModel options: ds4_role, ds4_layers (START:END or START:output),
  ds4_listen (host:port), ds4_route_timeout.
- parse_layers_spec() maps the layer spec onto ds4_distributed_layers.
- wait_route_ready() blocks generation until
  ds4_session_distributed_route_ready() reports full coverage (or timeout),
  gating both Predict and PredictStream; returns UNAVAILABLE on timeout/error.
- No ds4_role => g_distributed stays false and wait_route_ready is a no-op,
  so single-node behavior is unchanged.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* fix(ds4): don't block Status during route wait; validate coordinator opts

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* feat(cli): add ds4-distributed worker exec helper

Add the ds4WorkerArgs helper plus findDS4Backend/DS4Distributed.Run that
resolve the ds4 backend via the gallery and exec the packaged ds4-worker
binary. Unlike worker_llamacpp.go, ds4 bundles its own dynamic loader
(lib/ld.so) for glibc compatibility, so when present we exec ds4-worker
through that loader with LD_LIBRARY_PATH=<backend>/lib, mirroring
backend/cpp/ds4/run.sh; otherwise we exec it directly.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* feat(cli): register the ds4-distributed worker subcommand

Wire DS4Distributed into the Worker kong command tree so
`local-ai worker ds4-distributed` is available.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* docs(ds4): document layer-split distributed inference

Add a ds4 section to the distributed-mode feature docs (coordinator
model YAML, manual worker command, layer-range semantics, the
'GGUF on every machine' requirement, coordinator-listens dial
direction vs llama.cpp) and a terse Distributed mode section to the
ds4 backend agent guide.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* test(ds4): opt-in hardware-gated distributed e2e spec

Add a self-contained, opt-in Ginkgo spec to the backend e2e suite that
spins a ds4 coordinator (via the packaged run.sh, loaded with
ds4_role/ds4_layers/ds4_listen options) plus a ds4-worker process for
the upper layers, then uses Eventually to assert a short successful
Predict once the layer route forms, before tearing the worker down.

Gated by BACKEND_TEST_DS4_DISTRIBUTED=1 (plus the existing
BACKEND_BINARY + BACKEND_TEST_MODEL_FILE and optional layer/listen/accel
knobs); compiles and skips cleanly with no env, hardware, or model.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* test(ds4): pass coordinator ctx to worker; lowercase error string

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* docs(ds4): note distributed transport is plaintext/unauthenticated

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* style(ds4): replace em dashes in distributed docs/agent/test per repo convention

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

* fix(ds4): link ds4-worker with the C++ driver for CUDA/Metal builds

The ds4-worker target is built from worker_main.c (C), so CMake linked it
with the C driver. The nvcc-built ds4_cuda.o (and Obj-C++ ds4_metal.o)
reference the C++ runtime, so the CUDA/Metal builds failed with undefined
libstdc++ symbols (std::__throw_length_error). The CPU build passed because
ds4_cpu.o is pure C. Force LINKER_LANGUAGE CXX so libstdc++ is linked.

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Assisted-by: Claude:claude-opus-4-8 [Claude Code]

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
LocalAI [bot]
2026-05-31 00:09:55 +02:00
committed by GitHub
parent a44bdb29d4
commit 07f6c15a37
13 changed files with 764 additions and 5 deletions

View File

@@ -68,6 +68,34 @@ go test -count=1 -timeout=30m -v ./tests/e2e-backends/...
CI does not load the model; the suite is opt-in via env vars.
## Distributed mode
ds4 supports **layer-split** distributed inference (a model too big for one host,
split by transformer layer; the GGUF must be present on every machine, each loads
only its slice). Topology is **inverted** vs llama.cpp: the coordinator listens,
workers dial in.
- **`ds4-worker` binary**: built and packaged next to `grpc-server` (`package.sh`
copies it into `package/`). Links the same engine objects plus `ds4_distributed.o`;
**no gRPC/protobuf dependency** (speaks ds4's own TCP transport), so it builds
even where `grpc-server` can't. Runs the worker serving loop (`ds4_dist_run`).
- **Coordinator wiring**: the ds4 `grpc-server` acts as coordinator when `LoadModel`
`ModelOptions.Options` (from model-YAML `options:`) carry:
- `ds4_role:coordinator` (enables distributed mode; absent → single-node, back-compat)
- `ds4_layers:0:19` (coordinator's own slice, inclusive; `N:output` includes the head)
- `ds4_listen:0.0.0.0:1234` (address workers dial into)
- `ds4_route_timeout:60` (optional; seconds Predict/PredictStream wait for the route
to form before returning gRPC `UNAVAILABLE`; default 60)
- **Worker CLI**: `local-ai worker ds4-distributed -- <ds4-worker args>` resolves the
ds4 backend and execs the packaged `ds4-worker` (raw passthrough), e.g.
`--role worker --model /models/ds4flash.gguf --layers 20:output --coordinator <host> 1234`.
Opt-in e2e in `tests/e2e-backends/backend_test.go`, gated by
`BACKEND_TEST_DS4_DISTRIBUTED=1` (plus `BACKEND_TEST_DS4_WORKER_BINARY`,
`BACKEND_TEST_DS4_WORKER_LAYERS`, `BACKEND_TEST_DS4_COORDINATOR_LAYERS`,
`BACKEND_TEST_DS4_LISTEN`). Design spec:
`docs/superpowers/specs/2026-05-30-ds4-distributed-inference-design.md`.
## Importer
`core/gallery/importers/ds4.go` (`DS4Importer`) auto-detects ds4 weights by

View File

@@ -2,6 +2,7 @@ ds4/
build/
package/
grpc-server
ds4-worker
*.o
backend.pb.cc
backend.pb.h

View File

@@ -104,3 +104,36 @@ if(DS4_NATIVE)
target_compile_options(${TARGET} PRIVATE -march=native)
endif()
endif()
# ds4-worker: standalone distributed worker. Links the same ds4 engine objects
# (including ds4_distributed.o) but has NO gRPC/protobuf dependency - it speaks
# ds4's own TCP transport via ds4_dist_run(). Buildable wherever the engine
# objects build, even on hosts without protobuf/grpc dev headers.
add_executable(ds4-worker worker_main.c)
target_include_directories(ds4-worker PRIVATE ${DS4_DIR})
foreach(obj ${DS4_OBJS})
target_sources(ds4-worker PRIVATE ${obj})
set_source_files_properties(${obj} PROPERTIES EXTERNAL_OBJECT TRUE GENERATED TRUE)
endforeach()
# worker_main.c is C, but the engine objects built by nvcc (ds4_cuda.o) and the
# Metal path (ds4_metal.o, Obj-C++) reference the C++ runtime (libstdc++). Force
# the C++ linker driver so those symbols resolve; the C driver would not link
# libstdc++ and the CUDA/Metal builds fail with undefined std:: references.
set_target_properties(ds4-worker PROPERTIES LINKER_LANGUAGE CXX)
target_link_libraries(ds4-worker PRIVATE Threads::Threads m)
if(DS4_GPU STREQUAL "cuda")
target_link_libraries(ds4-worker PRIVATE CUDA::cudart CUDA::cublas)
elseif(DS4_GPU STREQUAL "metal")
target_link_libraries(ds4-worker PRIVATE ${FOUNDATION_LIB} ${METAL_LIB})
elseif(DS4_GPU STREQUAL "cpu")
target_compile_definitions(ds4-worker PRIVATE DS4_NO_GPU)
endif()
if(DS4_NATIVE)
if(APPLE)
target_compile_options(ds4-worker PRIVATE -mcpu=native)
else()
target_compile_options(ds4-worker PRIVATE -march=native)
endif()
endif()

View File

@@ -66,6 +66,7 @@ grpc-server: ds4/ds4.o
mkdir -p $(BUILD_DIR)
cd $(BUILD_DIR) && cmake $(CMAKE_ARGS) $(CURRENT_MAKEFILE_DIR) && cmake --build . --config Release -j $(JOBS)
cp $(BUILD_DIR)/grpc-server grpc-server
cp $(BUILD_DIR)/ds4-worker ds4-worker
package: grpc-server
bash package.sh
@@ -74,7 +75,7 @@ test:
@echo "ds4 backend: e2e coverage at tests/e2e-backends/ (BACKEND_BINARY mode)"
clean:
rm -rf $(BUILD_DIR) grpc-server package
rm -rf $(BUILD_DIR) grpc-server ds4-worker package
if [ -d ds4 ]; then $(MAKE) -C ds4 clean; fi
purge: clean

View File

@@ -23,8 +23,11 @@ extern "C" {
#include <atomic>
#include <chrono>
#include <climits>
#include <csignal>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <memory>
#include <mutex>
@@ -51,6 +54,12 @@ ds4_session *g_session = nullptr;
int g_ctx_size = 32768;
std::string g_kv_cache_dir; // empty disables disk cache
// Distributed coordinator state. g_distributed is set true when LoadModel is
// given 'ds4_role:coordinator'; generation then waits for the worker route to
// form before running. Single-node behavior is unchanged when unset.
bool g_distributed = false;
int g_route_timeout_sec = 60;
std::atomic<Server *> g_server{nullptr};
// Parse a "key:value" option string. Returns empty when no colon.
@@ -60,6 +69,77 @@ static std::pair<std::string, std::string> split_option(const std::string &opt)
return {opt.substr(0, colon), opt.substr(colon + 1)};
}
// Parse a positive base-10 integer. Returns false (without throwing) on empty,
// trailing garbage, non-positive, or overflow - unlike std::stoi.
static bool parse_positive_int(const std::string &s, int *out) {
if (s.empty()) return false;
char *end = nullptr;
long v = std::strtol(s.c_str(), &end, 10);
if (!end || *end != '\0' || v <= 0 || v > INT_MAX) return false;
*out = static_cast<int>(v);
return true;
}
// Parse a ds4 layer spec "START:END" or "START:output" into the engine's
// distributed layer fields. Returns false on malformed input.
static bool parse_layers_spec(const std::string &spec, ds4_distributed_layers *out) {
auto colon = spec.find(':');
if (colon == std::string::npos) return false;
std::string lhs = spec.substr(0, colon);
std::string rhs = spec.substr(colon + 1);
if (lhs.empty() || rhs.empty()) return false;
char *end = nullptr;
long start = std::strtol(lhs.c_str(), &end, 10);
if (!end || *end != '\0' || start < 0) return false;
out->start = static_cast<uint32_t>(start);
out->has_output = false;
if (rhs == "output") {
out->has_output = true;
out->end = out->start; // engine treats has_output as "through final layer"
} else {
long e = std::strtol(rhs.c_str(), &end, 10);
if (!end || *end != '\0' || e < start) return false;
out->end = static_cast<uint32_t>(e);
}
out->set = true;
return true;
}
// When acting as a distributed coordinator, block until the worker route
// covers all layers (ds4_session_distributed_route_ready == 1) or the timeout
// elapses. Returns an empty string on success, or an error message to return
// to the client. No-op when not distributed.
//
// Takes the g_engine_mu lock by reference and RELEASES it during each poll
// sleep. The wait can span up to g_route_timeout_sec seconds while workers
// connect; holding g_engine_mu the whole time would block the Status/Health
// readiness probes (they also lock g_engine_mu), making LocalAI's loader treat
// a still-starting worker as hung.
static std::string wait_route_ready(std::unique_lock<std::mutex> &lock) {
if (!g_distributed) return "";
char err[256] = {0};
const int deadline_polls = g_route_timeout_sec * 10; // 100ms per poll
for (int i = 0; i <= deadline_polls; ++i) {
int ready = ds4_session_distributed_route_ready(g_session, err, sizeof(err));
if (ready == 1) return "";
if (ready < 0) {
return std::string("ds4 distributed route error: ") +
(err[0] ? err : "unknown");
}
// Release the lock while sleeping so Status/Health and other RPCs can
// interleave during worker startup.
lock.unlock();
struct timespec ts = {0, 100L * 1000L * 1000L}; // 100ms
nanosleep(&ts, nullptr);
lock.lock();
// A concurrent Free() may have torn down the engine while we slept.
if (!g_engine || !g_session) {
return "ds4: model unloaded while waiting for distributed route";
}
}
return "ds4 distributed route incomplete: workers not connected (layers uncovered)";
}
static void append_token_text(ds4_engine *engine, int token, std::string &out) {
size_t len = 0;
const char *text = ds4_token_text(engine, token, &len);
@@ -377,6 +457,11 @@ public:
backend::Result *result) override {
std::lock_guard<std::mutex> lock(g_engine_mu);
// Reset distributed state so a model swap (a second LoadModel without
// ds4_role) doesn't inherit a stale coordinator configuration.
g_distributed = false;
g_route_timeout_sec = 60;
if (g_engine) {
if (g_session) { ds4_session_free(g_session); g_session = nullptr; }
ds4_engine_close(g_engine);
@@ -394,12 +479,23 @@ public:
std::string mtp_path;
int mtp_draft = 0;
float mtp_margin = 3.0f;
std::string ds4_role, ds4_layers, ds4_listen;
for (const auto &opt : request->options()) {
auto [k, v] = split_option(opt);
if (k == "mtp_path") mtp_path = v;
else if (k == "mtp_draft") mtp_draft = std::stoi(v);
else if (k == "mtp_margin") mtp_margin = std::stof(v);
else if (k == "kv_cache_dir") g_kv_cache_dir = v;
else if (k == "ds4_role") ds4_role = v;
else if (k == "ds4_layers") ds4_layers = v;
else if (k == "ds4_listen") ds4_listen = v;
else if (k == "ds4_route_timeout") {
if (!parse_positive_int(v, &g_route_timeout_sec)) {
result->set_success(false);
result->set_message("ds4: ds4_route_timeout must be a positive integer");
return GStatus::OK;
}
}
}
g_kv_cache.SetDir(g_kv_cache_dir);
@@ -422,6 +518,49 @@ public:
opt.backend = DS4_BACKEND_CUDA;
#endif
// Coordinator wiring. 'ds4_role:coordinator' enables layer-split
// distributed inference: this process listens on ds4_listen and owns
// the ds4_layers slice; workers dial in (see `local-ai worker
// ds4-distributed`). Absent ds4_role => unchanged single-node path.
// Must be static: opt.distributed.listen_host is a const char* the
// engine retains past this call, so it cannot point at a local that
// goes out of scope (otherwise a future "simplify to local" refactor
// reintroduces a dangling pointer).
static std::string s_listen_host;
if (ds4_role == "coordinator") {
if (ds4_layers.empty() || ds4_listen.empty()) {
result->set_success(false);
result->set_message("ds4: ds4_role:coordinator requires ds4_layers and ds4_listen");
return GStatus::OK;
}
// host:port for IPv4/hostname; IPv6 literals are unsupported (the
// first colon would split inside the address).
auto host_port = split_option(ds4_listen); // "host:port" -> {host, port}
if (host_port.second.empty()) {
result->set_success(false);
result->set_message("ds4: ds4_listen must be host:port");
return GStatus::OK;
}
int listen_port = 0;
if (!parse_positive_int(host_port.second, &listen_port)) {
result->set_success(false);
result->set_message("ds4: ds4_listen port must be a positive integer");
return GStatus::OK;
}
ds4_distributed_layers layers = {};
if (!parse_layers_spec(ds4_layers, &layers)) {
result->set_success(false);
result->set_message("ds4: invalid ds4_layers (want START:END or START:output)");
return GStatus::OK;
}
s_listen_host = host_port.first;
opt.distributed.role = DS4_DISTRIBUTED_COORDINATOR;
opt.distributed.layers = layers;
opt.distributed.listen_host = s_listen_host.c_str();
opt.distributed.listen_port = listen_port;
g_distributed = true;
}
int rc = ds4_engine_open(&g_engine, &opt);
if (rc != 0 || !g_engine) {
result->set_success(false);
@@ -458,10 +597,13 @@ public:
GStatus Predict(ServerContext *, const backend::PredictOptions *request,
backend::Reply *reply) override {
std::lock_guard<std::mutex> lock(g_engine_mu);
std::unique_lock<std::mutex> lock(g_engine_mu);
if (!g_engine || !g_session) {
return GStatus(StatusCode::FAILED_PRECONDITION, "ds4: model not loaded");
}
if (std::string route_err = wait_route_ready(lock); !route_err.empty()) {
return GStatus(StatusCode::UNAVAILABLE, route_err);
}
ds4_tokens prompt = {};
build_prompt(g_engine, request, &prompt);
int n_predict = request->tokens() > 0 ? request->tokens() : 256;
@@ -554,10 +696,13 @@ public:
GStatus PredictStream(ServerContext *, const backend::PredictOptions *request,
ServerWriter<backend::Reply> *writer) override {
std::lock_guard<std::mutex> lock(g_engine_mu);
std::unique_lock<std::mutex> lock(g_engine_mu);
if (!g_engine || !g_session) {
return GStatus(StatusCode::FAILED_PRECONDITION, "ds4: model not loaded");
}
if (std::string route_err = wait_route_ready(lock); !route_err.empty()) {
return GStatus(StatusCode::UNAVAILABLE, route_err);
}
ds4_tokens prompt = {};
build_prompt(g_engine, request, &prompt);
int n_predict = request->tokens() > 0 ? request->tokens() : 256;

View File

@@ -5,7 +5,8 @@ REPO_ROOT="${CURDIR}/../../.."
mkdir -p "$CURDIR/package/lib"
cp -avf "$CURDIR/grpc-server" "$CURDIR/package/"
cp -rfv "$CURDIR/run.sh" "$CURDIR/package/"
cp -avf "$CURDIR/ds4-worker" "$CURDIR/package/"
cp -rfv "$CURDIR/run.sh" "$CURDIR/package/"
UNAME_S=$(uname -s)
if [ "$UNAME_S" = "Darwin" ]; then

View File

@@ -0,0 +1,126 @@
// ds4-worker: standalone distributed worker for the LocalAI ds4 backend.
//
// A ds4 distributed worker owns a slice of the model's transformer layers,
// dials the coordinator, and serves activations for its slice. It does NOT
// speak backend.proto - it speaks ds4's own TCP transport via ds4_dist_run().
// This binary is intentionally minimal (no HTTP/web/kvstore/linenoise): it
// only needs the engine objects + ds4_distributed.o, which the backend already
// builds. It is launched by `local-ai worker ds4-distributed`.
//
// Usage:
// ds4-worker --role worker --model <gguf> --layers 20:output \
// --coordinator <host> <port> [--cpu|--cuda|--metal] [-c CTX] [-t N]
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <limits.h>
#include "ds4.h"
#include "ds4_distributed.h"
static const char *need_arg(int *i, int argc, char **argv, const char *flag) {
if (*i + 1 >= argc) {
fprintf(stderr, "ds4-worker: missing value for %s\n", flag);
exit(2);
}
return argv[++(*i)];
}
static int parse_int_arg(const char *s, const char *flag) {
char *end = NULL;
long v = strtol(s, &end, 10);
if (!s[0] || *end || v <= 0 || v > INT_MAX) {
fprintf(stderr, "ds4-worker: invalid value for %s: %s\n", flag, s);
exit(2);
}
return (int)v;
}
static ds4_backend default_backend(void) {
#if defined(DS4_NO_GPU)
return DS4_BACKEND_CPU;
#elif defined(__APPLE__)
return DS4_BACKEND_METAL;
#else
return DS4_BACKEND_CUDA;
#endif
}
int main(int argc, char **argv) {
signal(SIGPIPE, SIG_IGN);
ds4_engine_options opt = {0};
opt.backend = default_backend();
int ctx_size = 32768;
for (int i = 1; i < argc; i++) {
const char *arg = argv[i];
if (!strcmp(arg, "-h") || !strcmp(arg, "--help")) {
fprintf(stdout, "ds4-worker: standalone ds4 distributed worker\n");
ds4_dist_usage(stdout);
fprintf(stdout, " -m, --model PATH model GGUF (the worker loads only its --layers slice)\n");
fprintf(stdout, " -c, --ctx N context size (default 32768)\n");
fprintf(stdout, " -t, --threads N CPU threads\n");
fprintf(stdout, " --cpu|--cuda|--metal backend override\n");
return 0;
}
char dist_err[256] = {0};
ds4_dist_cli_parse_result dist_parse =
ds4_dist_parse_cli_arg(arg, &i, argc, argv, &opt.distributed,
dist_err, sizeof(dist_err));
if (dist_parse == DS4_DIST_CLI_ERROR) {
fprintf(stderr, "ds4-worker: %s\n",
dist_err[0] ? dist_err : "invalid distributed option");
return 2;
}
if (dist_parse == DS4_DIST_CLI_MATCHED) continue;
if (!strcmp(arg, "-m") || !strcmp(arg, "--model")) {
opt.model_path = need_arg(&i, argc, argv, arg);
} else if (!strcmp(arg, "-c") || !strcmp(arg, "--ctx")) {
ctx_size = parse_int_arg(need_arg(&i, argc, argv, arg), arg);
} else if (!strcmp(arg, "-t") || !strcmp(arg, "--threads")) {
opt.n_threads = parse_int_arg(need_arg(&i, argc, argv, arg), arg);
} else if (!strcmp(arg, "--cpu")) {
opt.backend = DS4_BACKEND_CPU;
} else if (!strcmp(arg, "--cuda")) {
opt.backend = DS4_BACKEND_CUDA;
} else if (!strcmp(arg, "--metal")) {
opt.backend = DS4_BACKEND_METAL;
} else {
fprintf(stderr, "ds4-worker: unknown option: %s\n", arg);
return 2;
}
}
if (opt.distributed.role != DS4_DISTRIBUTED_WORKER) {
fprintf(stderr, "ds4-worker: --role worker is required\n");
return 2;
}
if (!opt.model_path) {
fprintf(stderr, "ds4-worker: --model is required\n");
return 2;
}
char prep_err[256] = {0};
if (ds4_dist_prepare_engine_options(&opt.distributed, &opt,
prep_err, sizeof(prep_err)) != 0) {
fprintf(stderr, "ds4-worker: %s\n", prep_err);
return 2;
}
ds4_engine *engine = NULL;
if (ds4_engine_open(&engine, &opt) != 0 || !engine) {
fprintf(stderr, "ds4-worker: failed to open engine\n");
return 1;
}
ds4_dist_generation_options gen = {0};
gen.ctx_size = ctx_size;
int rc = ds4_dist_run(engine, &opt.distributed, &gen);
ds4_engine_close(engine);
return rc;
}

View File

@@ -14,4 +14,5 @@ type Worker struct {
LLamaCPP LLamaCPP `cmd:"" name:"llama-cpp-rpc" help:"Starts a llama.cpp worker in standalone mode"`
MLXDistributed MLXDistributed `cmd:"" name:"mlx-distributed" help:"Starts an MLX distributed worker in standalone mode (requires --hostfile and --rank)"`
VLLMDistributed VLLMDistributed `cmd:"" name:"vllm" help:"Starts a vLLM data-parallel follower process. Multi-node DP for a single model: head runs the existing vllm backend with engine_args.data_parallel_size>1, followers run this command."`
DS4Distributed DS4Distributed `cmd:"" name:"ds4-distributed" help:"Starts a ds4 distributed worker in standalone mode: owns a layer slice and dials the coordinator (pass ds4-worker args after --)"`
}

View File

@@ -0,0 +1,108 @@
package worker
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
"strings"
"syscall"
cliContext "github.com/mudler/LocalAI/core/cli/context"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/gallery"
"github.com/mudler/LocalAI/pkg/model"
"github.com/mudler/LocalAI/pkg/system"
"github.com/mudler/xlog"
)
type DS4Distributed struct {
WorkerFlags `embed:""`
ExtraDS4Args string `name:"ds4-args" env:"LOCALAI_EXTRA_DS4_ARGS,EXTRA_DS4_ARGS" help:"Arguments passed to ds4-worker (e.g. '--role worker --model m.gguf --layers 20:output --coordinator HOST PORT')"`
}
const (
ds4WorkerBinaryName = "ds4-worker"
ds4GalleryName = "ds4"
)
// ds4WorkerArgs builds the argv for syscall.Exec when launching ds4-worker
// directly: the binary path followed by the space-split extra args. An empty
// extra string yields a bare invocation.
func ds4WorkerArgs(binary, extra string) []string {
args := []string{binary}
args = append(args, strings.Fields(extra)...)
return args
}
func findDS4Backend(galleries string, systemState *system.SystemState, requireIntegrity bool) (string, error) {
backends, err := gallery.ListSystemBackends(systemState)
if err != nil {
xlog.Warn("Failed listing system backends", "error", err)
return "", err
}
backend, ok := backends.Get(ds4GalleryName)
if !ok {
ml := model.NewModelLoader(systemState)
var gals []config.Gallery
if err := json.Unmarshal([]byte(galleries), &gals); err != nil {
xlog.Error("failed loading galleries", "error", err)
return "", err
}
if err := gallery.InstallBackendFromGallery(context.Background(), gals, systemState, ml, ds4GalleryName, nil, true, requireIntegrity); err != nil {
xlog.Error("ds4 backend not found, failed to install it", "error", err)
return "", err
}
backends, err = gallery.ListSystemBackends(systemState)
if err != nil {
return "", err
}
backend, ok = backends.Get(ds4GalleryName)
if !ok {
return "", errors.New("ds4 backend not found after install")
}
}
backendPath := filepath.Dir(backend.RunFile)
if backendPath == "" {
return "", errors.New("ds4 backend not found, install it first")
}
return filepath.Join(backendPath, ds4WorkerBinaryName), nil
}
func (r *DS4Distributed) Run(ctx *cliContext.Context) error {
if r.ExtraDS4Args == "" && len(os.Args) < 4 {
return fmt.Errorf("usage: local-ai worker ds4-distributed -- --role worker --model <gguf> --layers <START:END|START:output> --coordinator <host> <port>")
}
systemState, err := system.GetSystemState(
system.WithBackendPath(r.BackendsPath),
system.WithBackendSystemPath(r.BackendsSystemPath),
)
if err != nil {
return err
}
worker, err := findDS4Backend(r.BackendGalleries, systemState, r.RequireBackendIntegrity)
if err != nil {
return err
}
// ds4 bundles its own dynamic loader (lib/ld.so) for glibc compatibility,
// like backend/cpp/ds4/run.sh does for grpc-server. Launch ds4-worker via
// that loader when present; otherwise exec it directly. (This is a
// deliberate divergence from worker_llamacpp.go, which has no bundled loader.)
backendPath := filepath.Dir(worker)
env := os.Environ()
loader := filepath.Join(backendPath, "lib", "ld.so")
if _, statErr := os.Stat(loader); statErr == nil {
env = append(env, "LD_LIBRARY_PATH="+filepath.Join(backendPath, "lib")+":"+os.Getenv("LD_LIBRARY_PATH"))
args := append([]string{loader}, ds4WorkerArgs(worker, r.ExtraDS4Args)...)
return syscall.Exec(loader, args, env)
}
return syscall.Exec(worker, ds4WorkerArgs(worker, r.ExtraDS4Args), env)
}

View File

@@ -0,0 +1,28 @@
package worker
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("ds4 worker CLI", func() {
It("uses the ds4 backend gallery name and worker binary name", func() {
Expect(ds4GalleryName).To(Equal("ds4"))
Expect(ds4WorkerBinaryName).To(Equal("ds4-worker"))
})
It("assembles direct exec args as [binary, extra-split...]", func() {
args := ds4WorkerArgs("/b/ds4-worker", "--role worker --model m.gguf --layers 20:output --coordinator 10.0.0.1 1234")
Expect(args).To(Equal([]string{
"/b/ds4-worker",
"--role", "worker",
"--model", "m.gguf",
"--layers", "20:output",
"--coordinator", "10.0.0.1", "1234",
}))
})
It("drops empty extra args to a bare binary invocation", func() {
Expect(ds4WorkerArgs("/b/ds4-worker", "")).To(Equal([]string{"/b/ds4-worker"}))
})
})

View File

@@ -424,6 +424,72 @@ engine_args:
- **Network reachability.** The head's `data_parallel_rpc_port` plus a range of ZMQ ports (typically `data_parallel_rpc_port..+N`) must be reachable from every follower. Open them in your firewall / security group.
- **Topology must match exactly.** A mismatch in `--data-parallel-size` between head and any follower will hang the handshake. Check the head's vLLM logs for `waiting for N DP ranks` if startup stalls.
## ds4 Layer-Split Distributed Inference
The ds4 backend (DeepSeek V4 Flash) supports **layer-parallel** distributed inference: a single model that is too large for one machine is split by transformer layer across several machines. Each machine must have the GGUF present locally, but loads **only its own slice** of the layers. This lets you run a model whose weights exceed any single host's memory.
This is **not** routed through the SmartRouter: it is a model-internal split, configured manually (Phase 1). It is unrelated to the NATS/PostgreSQL distributed mode described above.
### Topology
ds4 uses a **coordinator/worker** split:
- The **coordinator** owns tokenization, sampling, the prompt, and a low layer range (e.g. `0:19`). It is LocalAI's ds4 backend and **listens** on a host/port. Workers dial into it.
- One or more **workers** own higher layer ranges (e.g. `20:output`). Each worker loads only its slice and **dials the coordinator** to register the range it can serve. The last worker normally owns the output head.
- Activations flow through the connected slices and back to the coordinator. The route is "ready" only once the coordinator plus all connected workers cover every layer.
This dial direction is the **inverse** of the llama.cpp RPC model, where the main server dials *out* to a list of `rpc-server` workers. With ds4 the **workers dial in** to the coordinator.
### Coordinator setup
The coordinator is a normal LocalAI ds4 model whose YAML carries distributed `options:`:
```yaml
name: ds4flash
backend: ds4
options:
- "ds4_role:coordinator"
- "ds4_layers:0:19"
- "ds4_listen:0.0.0.0:1234"
```
| Option | Meaning |
|--------|---------|
| `ds4_role:coordinator` | Enables distributed coordinator mode. Without `ds4_role`, the backend behaves as a normal single-node ds4 model. |
| `ds4_layers:0:19` | The coordinator's own layer slice (inclusive). |
| `ds4_listen:0.0.0.0:1234` | Address that workers dial into. |
| `ds4_route_timeout:60` | Optional. Seconds the coordinator waits for the worker route to form before returning an error on a request. Defaults to 60. |
{{% notice warning %}}
Worker↔coordinator traffic is **plaintext and unauthenticated**: there is no TLS or auth on this channel. Bind `ds4_listen` to an address on a trusted/private network only; using `0.0.0.0` exposes the coordinator on every interface. Run the layer split exclusively over a network you control.
{{% /notice %}}
Once the model is loaded, the coordinator serves requests exactly like a single-node ds4 model: generation goes through the ordinary inference path and is transparently routed across the layer slices.
### Worker setup
On each worker machine (with the GGUF present locally), start a worker pointed at the coordinator:
```bash
local-ai worker ds4-distributed -- \
--role worker \
--model /models/ds4flash.gguf \
--layers 20:output \
--coordinator <coordinator-host> 1234
```
`local-ai worker ds4-distributed` resolves the ds4 backend and execs the packaged `ds4-worker` binary, passing everything after `--` straight through.
### Layer-range semantics
- Ranges are **inclusive**: `0:19` is layers 0 through 19.
- `N:output` means layer N through the final layer **plus the output head**. The last worker normally owns the output head.
- The coordinator and all connected workers together **must cover every layer**. Until they do, the coordinator returns a gRPC `UNAVAILABLE` error on inference requests (so a worker that starts slightly after the coordinator is tolerated: once it connects and the route is complete, requests succeed). The wait is tunable via `ds4_route_timeout`.
{{% notice note %}}
ds4 layer-split inference is **manual setup** in this release (Phase 1): you place the coordinator config and launch each worker yourself, and the layer ranges must be partitioned by hand so they cover the whole model. P2P auto-discovery of the coordinator is planned for a later phase.
{{% /notice %}}
## Scaling
**Adding worker capacity:** Start additional `worker` instances pointing to the same frontend. They self-register automatically:

View File

@@ -15,6 +15,7 @@ mkdir -p build/darwin/lib
mkdir -p backend-images
cp -rf backend/cpp/ds4/grpc-server build/darwin/
cp -rf backend/cpp/ds4/ds4-worker build/darwin/
cp -rf backend/cpp/ds4/run.sh build/darwin/
# Apple Silicon: pick up Homebrew-installed protobuf utf8_validity if present.
@@ -28,7 +29,7 @@ for file in $ADDITIONAL_LIBS; do
done
# Walk dylibs via otool -L and bundle anything that isn't a system framework.
for file in build/darwin/grpc-server; do
for file in build/darwin/grpc-server build/darwin/ds4-worker; do
LIBS="$(otool -L "$file" | awk 'NR > 1 { system("echo " $1) } ' | xargs echo)"
for lib in $LIBS; do
if [[ "$lib" == *.dylib ]] && [[ -e "$lib" ]]; then

View File

@@ -1144,6 +1144,226 @@ var _ = Describe("Backend container", Ordered, func() {
})
})
// ─── ds4 layer-split distributed inference (opt-in, hardware-gated) ──────────
//
// ds4 distributes a single model across machines by transformer layer: a
// coordinator (LocalAI's ds4 backend) listens and owns a low layer slice; one
// or more `ds4-worker` processes dial in and own the higher slices (the last
// owns the output head). The route is "ready" only once coordinator + workers
// cover every layer; until then the coordinator returns gRPC UNAVAILABLE.
//
// This spec is entirely opt-in. It only runs when BACKEND_TEST_DS4_DISTRIBUTED=1
// is set, AND the suite's normal BACKEND_BINARY (the packaged ds4 run.sh) and
// BACKEND_TEST_MODEL_FILE (the GGUF, present on this machine) are provided. With
// none of those set it compiles and SKIPs cleanly: no hardware, model, or
// network required.
//
// What it covers (single-host, two-process): coordinator option-loading
// (ds4_role/ds4_layers/ds4_listen) through the real gRPC LoadModel path, a
// real ds4-worker process spawned for the upper layers dialing the listen
// address, route formation, and a short successful Predict once the route is
// up. It then tears the worker down.
//
// What it does NOT cover: multi-host networking, >1 worker, failure/timeout
// paths, or the `local-ai worker ds4-distributed` CLI wrapper (that resolves
// the backend + execs ds4-worker and is unit-tested separately); here we exec
// the packaged ds4-worker binary directly so the e2e stays self-contained.
//
// Env vars (in addition to BACKEND_BINARY + BACKEND_TEST_MODEL_FILE):
//
// BACKEND_TEST_DS4_DISTRIBUTED Set to "1" to enable this spec.
// BACKEND_TEST_DS4_WORKER_BINARY Path to the packaged `ds4-worker` binary.
// Defaults to a `ds4-worker` sitting next to
// the BACKEND_BINARY run.sh.
// BACKEND_TEST_DS4_COORDINATOR_LAYERS Coordinator's own layer slice (default "0:19").
// BACKEND_TEST_DS4_WORKER_LAYERS Worker's layer slice (default "20:output").
// BACKEND_TEST_DS4_LISTEN Address workers dial into (default "127.0.0.1:<free port>").
// BACKEND_TEST_DS4_WORKER_ACCEL Optional accel flag for the worker:
// "cpu" (default), "cuda", or "metal".
var _ = Describe("ds4 layer-split distributed inference", Ordered, func() {
var (
workDir string
binaryDir string
modelFile string
listen string
coordCmd *exec.Cmd
workerCmd *exec.Cmd
conn *grpc.ClientConn
client pb.BackendClient
)
BeforeAll(func() {
if os.Getenv("BACKEND_TEST_DS4_DISTRIBUTED") != "1" {
Skip("ds4 distributed spec is opt-in; set BACKEND_TEST_DS4_DISTRIBUTED=1 (plus BACKEND_BINARY and BACKEND_TEST_MODEL_FILE) to run it")
}
binary := os.Getenv("BACKEND_BINARY")
Expect(binary).NotTo(BeEmpty(),
"ds4 distributed spec requires BACKEND_BINARY pointing at the packaged ds4 run.sh")
Expect(filepath.Base(binary)).To(Equal("run.sh"),
"BACKEND_BINARY must point at a run.sh produced by 'make -C backend/cpp/ds4 package'")
binaryDir = filepath.Dir(binary)
Expect(filepath.Join(binaryDir, "run.sh")).To(BeAnExistingFile())
modelFile = os.Getenv("BACKEND_TEST_MODEL_FILE")
Expect(modelFile).NotTo(BeEmpty(),
"ds4 distributed spec requires BACKEND_TEST_MODEL_FILE (GGUF present on this host)")
Expect(modelFile).To(BeAnExistingFile())
// Locate the ds4-worker binary the same way the suite locates the
// backend: next to the packaged run.sh, overridable via env.
workerBin := os.Getenv("BACKEND_TEST_DS4_WORKER_BINARY")
if workerBin == "" {
workerBin = filepath.Join(binaryDir, "ds4-worker")
}
Expect(workerBin).To(BeAnExistingFile(),
"ds4-worker binary not found (set BACKEND_TEST_DS4_WORKER_BINARY or package it next to run.sh)")
coordLayers := os.Getenv("BACKEND_TEST_DS4_COORDINATOR_LAYERS")
if coordLayers == "" {
coordLayers = "0:19"
}
workerLayers := os.Getenv("BACKEND_TEST_DS4_WORKER_LAYERS")
if workerLayers == "" {
workerLayers = "20:output"
}
listen = os.Getenv("BACKEND_TEST_DS4_LISTEN")
if listen == "" {
lp, err := freeport.GetFreePort()
Expect(err).NotTo(HaveOccurred())
listen = fmt.Sprintf("127.0.0.1:%d", lp)
}
// The worker dials the listen host/port as two separate CLI args.
listenHost, listenPort, err := net.SplitHostPort(listen)
Expect(err).NotTo(HaveOccurred(), "BACKEND_TEST_DS4_LISTEN must be host:port, got %q", listen)
workDir, err = os.MkdirTemp("", "ds4-dist-e2e-*")
Expect(err).NotTo(HaveOccurred())
Expect(os.Chmod(filepath.Join(binaryDir, "run.sh"), 0o755)).To(Succeed())
_ = os.Chmod(workerBin, 0o755)
// 1) Start the coordinator gRPC backend.
port, err := freeport.GetFreePort()
Expect(err).NotTo(HaveOccurred())
coordAddr := fmt.Sprintf("127.0.0.1:%d", port)
coordCmd = exec.Command(filepath.Join(binaryDir, "run.sh"), "--addr="+coordAddr)
coordCmd.Stdout = GinkgoWriter
coordCmd.Stderr = GinkgoWriter
Expect(coordCmd.Start()).To(Succeed())
Eventually(func() error {
c, derr := net.DialTimeout("tcp", coordAddr, 500*time.Millisecond)
if derr != nil {
return derr
}
_ = c.Close()
return nil
}, 30*time.Second, 200*time.Millisecond).Should(Succeed(), "coordinator backend did not start")
conn, err = grpc.Dial(coordAddr,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(50*1024*1024)),
)
Expect(err).NotTo(HaveOccurred())
client = pb.NewBackendClient(conn)
// 2) Load the coordinator model with distributed options. This proves
// ds4_role/ds4_layers/ds4_listen option parsing through the real
// LoadModel path.
loadCtx, loadCancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer loadCancel()
ctxSize := envInt32("BACKEND_TEST_CTX_SIZE", 512)
res, err := client.LoadModel(loadCtx, &pb.ModelOptions{
Model: modelFile,
ModelFile: modelFile,
ContextSize: ctxSize,
Threads: envInt32("BACKEND_TEST_THREADS", 4),
NGPULayers: 0,
MMap: true,
Options: []string{
"ds4_role:coordinator",
"ds4_layers:" + coordLayers,
"ds4_listen:" + listen,
},
})
Expect(err).NotTo(HaveOccurred())
Expect(res.GetSuccess()).To(BeTrue(), "coordinator LoadModel failed: %s", res.GetMessage())
// 3) Spawn the ds4-worker for the upper layers, dialing the coordinator.
accel := strings.ToLower(strings.TrimSpace(os.Getenv("BACKEND_TEST_DS4_WORKER_ACCEL")))
workerArgs := []string{
"--role", "worker",
"--model", modelFile,
"--layers", workerLayers,
"--coordinator", listenHost, listenPort,
"-c", fmt.Sprintf("%d", ctxSize),
}
switch accel {
case "", "cpu":
workerArgs = append(workerArgs, "--cpu")
case "cuda":
workerArgs = append(workerArgs, "--cuda")
case "metal":
workerArgs = append(workerArgs, "--metal")
default:
Fail(fmt.Sprintf("unsupported BACKEND_TEST_DS4_WORKER_ACCEL=%q (want cpu|cuda|metal)", accel))
}
workerCmd = exec.Command(workerBin, workerArgs...)
workerCmd.Stdout = GinkgoWriter
workerCmd.Stderr = GinkgoWriter
Expect(workerCmd.Start()).To(Succeed())
})
AfterAll(func() {
if conn != nil {
_ = conn.Close()
}
if workerCmd != nil && workerCmd.Process != nil {
_ = workerCmd.Process.Kill()
_, _ = workerCmd.Process.Wait()
}
if coordCmd != nil && coordCmd.Process != nil {
_ = coordCmd.Process.Kill()
_, _ = coordCmd.Process.Wait()
}
if workDir != "" {
_ = os.RemoveAll(workDir)
}
})
It("forms the route and generates a short completion", func() {
// The coordinator returns UNAVAILABLE until the worker has connected and
// the layer range is fully covered. Eventually retries until the route
// is up (or the worker dies), then asserts non-empty content.
var last string
Eventually(func() error {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
res, err := client.Predict(ctx, &pb.PredictOptions{
Prompt: defaultPrompt,
Tokens: 20,
Temperature: 0.1,
TopK: 40,
TopP: 0.9,
})
if err != nil {
return err
}
last = string(res.GetMessage())
if last == "" {
return fmt.Errorf("predict returned empty content")
}
return nil
}, 5*time.Minute, 2*time.Second).Should(Succeed(),
"coordinator never produced a completion once the worker route should have formed")
GinkgoWriter.Printf("ds4 distributed Predict: %q\n", last)
})
})
// extractImage runs `docker create` + `docker export` to materialise the image
// rootfs into dest. Using export (not save) avoids dealing with layer tarballs.
func extractImage(image, dest string) {