From b85b7e29dff124f82bf24d868cca045047fcda6e Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 24 May 2026 20:14:02 +0000 Subject: [PATCH] feat(distributed): surface picked node ID via X-LocalAI-Node header Plumb the SmartRouter's per-request node decision up to the OpenAI inference handlers (chat, completions, embeddings) and attach it as the X-LocalAI-Node response header when the operator enabled --expose-node-header. Wiring: - pkg/model.Model gains a NodeID field plus mutex-guarded SetNodeID/NodeID accessors. The router stamps it on the *Model it returns from NewModelWithClient; the field stays empty for in-process loads. - core/services/nodes/model_router.go SetNodeID after constructing the Model so the in-process store carries the most-recent routing decision per modelID. - core/http/endpoints/openai/node_header.go centralizes the policy in maybeSetNodeHeader (no-op when the flag is off, the model is not loaded, or no node ID is recorded). chat, completion and embeddings handlers call it before writing the response. Best-effort caveat: the distributed LoadModel path overwrites the per modelID store entry on every routing decision, so under heavy concurrency the header reflects "a recent decision" rather than "the exact node that served this exact request". This is acceptable for observability and matches what operators already see in the cluster logs. Documented in the flag help text and in the distributed-mode feature doc. Assisted-by: Claude:claude-opus-4-7[1m] Signed-off-by: Ettore Di Giacinto --- core/http/endpoints/openai/chat.go | 10 ++++ core/http/endpoints/openai/completion.go | 10 ++++ core/http/endpoints/openai/embeddings.go | 5 ++ core/http/endpoints/openai/node_header.go | 58 +++++++++++++++++++++++ core/services/nodes/model_router.go | 5 ++ docs/content/features/distributed-mode.md | 1 + pkg/model/model.go | 25 ++++++++++ 7 files changed, 114 insertions(+) create mode 100644 core/http/endpoints/openai/node_header.go diff --git a/core/http/endpoints/openai/chat.go b/core/http/endpoints/openai/chat.go index db716e4b7..7247ad18c 100644 --- a/core/http/endpoints/openai/chat.go +++ b/core/http/endpoints/openai/chat.go @@ -325,6 +325,11 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator c.Response().Header().Set("Cache-Control", "no-cache") c.Response().Header().Set("Connection", "keep-alive") c.Response().Header().Set("X-Correlation-ID", id) + // Best-effort: surface the picked node ID when a previous request + // has already routed this model. For the very first request after + // a fresh start the model isn't loaded yet, so the header is + // omitted; warm-cache requests get accurate attribution. + maybeSetNodeHeader(c, startupOptions, ml, input.Model) mcpStreamMaxIterations := 10 if config.Agent.MaxIterations > 0 { @@ -935,6 +940,11 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator respData, _ := json.Marshal(resp) xlog.Debug("Response", "response", string(respData)) + // Attribute the response to a specific worker node when + // distributed mode is enabled and the operator opted in via + // --expose-node-header. No-op otherwise. + maybeSetNodeHeader(c, startupOptions, ml, input.Model) + // Return the prediction in the response body return c.JSON(200, resp) } // end MCP iteration loop diff --git a/core/http/endpoints/openai/completion.go b/core/http/endpoints/openai/completion.go index f81e13e6a..354e7e461 100644 --- a/core/http/endpoints/openai/completion.go +++ b/core/http/endpoints/openai/completion.go @@ -106,6 +106,11 @@ func CompletionEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, eva c.Response().Header().Set("Content-Type", "text/event-stream") c.Response().Header().Set("Cache-Control", "no-cache") c.Response().Header().Set("Connection", "keep-alive") + // Best-effort: surface the picked node ID when a previous request + // has already routed this model. For the very first request after + // a fresh start the model isn't loaded yet, so the header is + // omitted; warm-cache requests get accurate attribution. + maybeSetNodeHeader(c, appConfig, ml, input.Model) if len(config.PromptStrings) > 1 { return errors.New("cannot handle more than 1 `PromptStrings` when Streaming") @@ -274,6 +279,11 @@ func CompletionEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, eva jsonResult, _ := json.Marshal(resp) xlog.Debug("Response", "response", string(jsonResult)) + // Attribute the response to a specific worker node when distributed + // mode is enabled and the operator opted in via --expose-node-header. + // No-op otherwise. + maybeSetNodeHeader(c, appConfig, ml, input.Model) + // Return the prediction in the response body return c.JSON(200, resp) } diff --git a/core/http/endpoints/openai/embeddings.go b/core/http/endpoints/openai/embeddings.go index 517881f66..edf30b762 100644 --- a/core/http/endpoints/openai/embeddings.go +++ b/core/http/endpoints/openai/embeddings.go @@ -102,6 +102,11 @@ func EmbeddingsEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, app jsonResult, _ := json.Marshal(resp) xlog.Debug("Response", "response", string(jsonResult)) + // Attribute the response to a specific worker node when distributed + // mode is enabled and the operator opted in via --expose-node-header. + // No-op otherwise. + maybeSetNodeHeader(c, appConfig, ml, input.Model) + // Return the prediction in the response body return c.JSON(200, resp) } diff --git a/core/http/endpoints/openai/node_header.go b/core/http/endpoints/openai/node_header.go new file mode 100644 index 000000000..0fb32d770 --- /dev/null +++ b/core/http/endpoints/openai/node_header.go @@ -0,0 +1,58 @@ +package openai + +import ( + "github.com/labstack/echo/v4" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/pkg/model" +) + +// NodeHeaderName is the HTTP response header that, when --expose-node-header +// is enabled, carries the ID of the distributed-mode worker node that served +// the inference request. Off by default: node IDs reveal internal topology +// and shouldn't be exposed on a public endpoint. +const NodeHeaderName = "X-LocalAI-Node" + +// nodeIDContextKey is the Echo context key used to stash the picked node ID +// for the current request, so streaming handlers can attach the header +// before any chunk is written. The constant lives here to keep the surface +// minimal: it is private to the openai handlers package. +const nodeIDContextKey = "served-by-node-id" + +// MaybeSetNodeHeaderForTest is an exported shim around maybeSetNodeHeader +// to let external tests (package openai_test) exercise the header policy +// without exporting the implementation. Production callers must use the +// internal lowercase helper. +func MaybeSetNodeHeaderForTest(c echo.Context, appConfig *config.ApplicationConfig, ml *model.ModelLoader, modelID string) { + maybeSetNodeHeader(c, appConfig, ml, modelID) +} + +// maybeSetNodeHeader writes the X-LocalAI-Node response header when the +// operator has opted in via --expose-node-header / LOCALAI_EXPOSE_NODE_HEADER +// and the requested model's most-recent routing decision carries a node ID. +// +// Best-effort: the ModelLoader's distributed store keeps only the latest +// routing per modelID, so under high concurrency for the same model across +// different nodes, this header reflects "a recent decision" rather than +// "the exact node that served this exact request". That is acceptable for +// observability and load-balancer attribution: it is the same constraint +// the operator already sees in the distributed-mode logs. +// +// Callers should invoke this right before writing the first byte of the +// response (after computing the result for buffered handlers, or before +// streaming begins for SSE handlers) so Echo flushes it with the headers. +func maybeSetNodeHeader(c echo.Context, appConfig *config.ApplicationConfig, ml *model.ModelLoader, modelID string) { + if appConfig == nil || !appConfig.ExposeNodeHeader || ml == nil || modelID == "" { + return + } + m := ml.CheckIsLoaded(modelID) + if m == nil { + return + } + nodeID := m.NodeID() + if nodeID == "" { + return + } + c.Set(nodeIDContextKey, nodeID) + c.Response().Header().Set(NodeHeaderName, nodeID) +} diff --git a/core/services/nodes/model_router.go b/core/services/nodes/model_router.go index d834bcbc8..de424a92b 100644 --- a/core/services/nodes/model_router.go +++ b/core/services/nodes/model_router.go @@ -68,6 +68,11 @@ func (a *ModelRouterAdapter) Route(ctx context.Context, backend, modelID, modelN // by SmartRouter. Use NewModelWithClient so the wrapper is preserved when // the ModelLoader returns this model on subsequent requests. m := model.NewModelWithClient(modelID, result.Node.Address, result.Client) + // Stash the picked node ID so HTTP handlers can surface it via the + // optional X-LocalAI-Node response header. Best-effort: the in-process + // store keeps only the latest routing decision per modelID; see the + // nodeID field comment on Model. + m.SetNodeID(result.Node.ID) xlog.Info("Model routed to remote node", "model", modelName, "node", result.Node.Name, "address", result.Node.Address) return m, nil diff --git a/docs/content/features/distributed-mode.md b/docs/content/features/distributed-mode.md index 792539bce..ceea3a7d4 100644 --- a/docs/content/features/distributed-mode.md +++ b/docs/content/features/distributed-mode.md @@ -88,6 +88,7 @@ The frontend is a standard LocalAI instance with distributed mode enabled. These | `--auth-database-url` | `LOCALAI_AUTH_DATABASE_URL` | *(required)* | PostgreSQL connection URL | | `--backend-install-timeout` | `LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT` | `15m` | How long the frontend waits for a worker to acknowledge a backend install before considering the request stalled. Raise it when workers pull large backend images over slow links. If a worker takes longer than this, the operation shows as "still installing in background" in the admin UI and clears once the worker finishes. | | `--backend-upgrade-timeout` | `LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT` | `15m` | Same as the install timeout, applied to backend upgrades (force-reinstall). | +| `--expose-node-header` | `LOCALAI_EXPOSE_NODE_HEADER` | `false` | When enabled, inference responses (chat completions, completions, embeddings) carry an `X-LocalAI-Node` header with the ID of the worker node that served the request. Useful for debugging, observability and load-balancer attribution. Off by default: the node ID reveals internal cluster topology and should not be exposed on a public endpoint. Best-effort: the header reflects the most recent routing decision recorded for the model, which under heavy concurrency may not exactly match the node that served a specific in-flight request. | ### Optional: S3 Object Storage diff --git a/pkg/model/model.go b/pkg/model/model.go index 4a0e6af2a..13bf3b330 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -19,6 +19,14 @@ type Model struct { client grpc.Backend process *process.Process lastHealthCheck time.Time + // nodeID is the ID of the distributed-mode worker node that owns this + // model handle, when set. Empty for in-process models. Best-effort: + // because the distributed LoadModel path overwrites the per-modelID + // store entry on every routing decision, this value reflects the + // most-recently-routed node for the model, not necessarily the node + // that served a specific in-flight request. Used by the optional + // X-LocalAI-Node response header (--expose-node-header). + nodeID string sync.Mutex } @@ -40,6 +48,23 @@ func NewModelWithClient(ID, address string, client grpc.Backend) *Model { } } +// SetNodeID records the distributed-mode worker node that owns this model +// handle. Safe to call from any goroutine. +func (m *Model) SetNodeID(id string) { + m.Lock() + defer m.Unlock() + m.nodeID = id +} + +// NodeID returns the distributed-mode worker node ID associated with this +// model handle, or "" if unknown / in-process. See the nodeID field comment +// for the best-effort caveat. +func (m *Model) NodeID() string { + m.Lock() + defer m.Unlock() + return m.nodeID +} + func (m *Model) Process() *process.Process { return m.process }