feat(distributed): surface picked node ID via X-LocalAI-Node header

Plumb the SmartRouter's per-request node decision up to the OpenAI
inference handlers (chat, completions, embeddings) and attach it as the
X-LocalAI-Node response header when the operator enabled
--expose-node-header.

Wiring:

- pkg/model.Model gains a NodeID field plus mutex-guarded
  SetNodeID/NodeID accessors. The router stamps it on the *Model it
  returns from NewModelWithClient; the field stays empty for in-process
  loads.
- core/services/nodes/model_router.go SetNodeID after constructing the
  Model so the in-process store carries the most-recent routing
  decision per modelID.
- core/http/endpoints/openai/node_header.go centralizes the policy in
  maybeSetNodeHeader (no-op when the flag is off, the model is not
  loaded, or no node ID is recorded). chat, completion and embeddings
  handlers call it before writing the response.

Best-effort caveat: the distributed LoadModel path overwrites the per
modelID store entry on every routing decision, so under heavy
concurrency the header reflects "a recent decision" rather than "the
exact node that served this exact request". This is acceptable for
observability and matches what operators already see in the cluster
logs. Documented in the flag help text and in the distributed-mode
feature doc.

Assisted-by: Claude:claude-opus-4-7[1m]
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-05-24 20:14:02 +00:00
parent 17791fb741
commit b85b7e29df
7 changed files with 114 additions and 0 deletions

View File

@@ -325,6 +325,11 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
c.Response().Header().Set("Cache-Control", "no-cache")
c.Response().Header().Set("Connection", "keep-alive")
c.Response().Header().Set("X-Correlation-ID", id)
// Best-effort: surface the picked node ID when a previous request
// has already routed this model. For the very first request after
// a fresh start the model isn't loaded yet, so the header is
// omitted; warm-cache requests get accurate attribution.
maybeSetNodeHeader(c, startupOptions, ml, input.Model)
mcpStreamMaxIterations := 10
if config.Agent.MaxIterations > 0 {
@@ -935,6 +940,11 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator
respData, _ := json.Marshal(resp)
xlog.Debug("Response", "response", string(respData))
// Attribute the response to a specific worker node when
// distributed mode is enabled and the operator opted in via
// --expose-node-header. No-op otherwise.
maybeSetNodeHeader(c, startupOptions, ml, input.Model)
// Return the prediction in the response body
return c.JSON(200, resp)
} // end MCP iteration loop

View File

@@ -106,6 +106,11 @@ func CompletionEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, eva
c.Response().Header().Set("Content-Type", "text/event-stream")
c.Response().Header().Set("Cache-Control", "no-cache")
c.Response().Header().Set("Connection", "keep-alive")
// Best-effort: surface the picked node ID when a previous request
// has already routed this model. For the very first request after
// a fresh start the model isn't loaded yet, so the header is
// omitted; warm-cache requests get accurate attribution.
maybeSetNodeHeader(c, appConfig, ml, input.Model)
if len(config.PromptStrings) > 1 {
return errors.New("cannot handle more than 1 `PromptStrings` when Streaming")
@@ -274,6 +279,11 @@ func CompletionEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, eva
jsonResult, _ := json.Marshal(resp)
xlog.Debug("Response", "response", string(jsonResult))
// Attribute the response to a specific worker node when distributed
// mode is enabled and the operator opted in via --expose-node-header.
// No-op otherwise.
maybeSetNodeHeader(c, appConfig, ml, input.Model)
// Return the prediction in the response body
return c.JSON(200, resp)
}

View File

@@ -102,6 +102,11 @@ func EmbeddingsEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, app
jsonResult, _ := json.Marshal(resp)
xlog.Debug("Response", "response", string(jsonResult))
// Attribute the response to a specific worker node when distributed
// mode is enabled and the operator opted in via --expose-node-header.
// No-op otherwise.
maybeSetNodeHeader(c, appConfig, ml, input.Model)
// Return the prediction in the response body
return c.JSON(200, resp)
}

View File

@@ -0,0 +1,58 @@
package openai
import (
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/pkg/model"
)
// NodeHeaderName is the HTTP response header that, when --expose-node-header
// is enabled, carries the ID of the distributed-mode worker node that served
// the inference request. Off by default: node IDs reveal internal topology
// and shouldn't be exposed on a public endpoint.
const NodeHeaderName = "X-LocalAI-Node"
// nodeIDContextKey is the Echo context key used to stash the picked node ID
// for the current request, so streaming handlers can attach the header
// before any chunk is written. The constant lives here to keep the surface
// minimal: it is private to the openai handlers package.
const nodeIDContextKey = "served-by-node-id"
// MaybeSetNodeHeaderForTest is an exported shim around maybeSetNodeHeader
// to let external tests (package openai_test) exercise the header policy
// without exporting the implementation. Production callers must use the
// internal lowercase helper.
func MaybeSetNodeHeaderForTest(c echo.Context, appConfig *config.ApplicationConfig, ml *model.ModelLoader, modelID string) {
maybeSetNodeHeader(c, appConfig, ml, modelID)
}
// maybeSetNodeHeader writes the X-LocalAI-Node response header when the
// operator has opted in via --expose-node-header / LOCALAI_EXPOSE_NODE_HEADER
// and the requested model's most-recent routing decision carries a node ID.
//
// Best-effort: the ModelLoader's distributed store keeps only the latest
// routing per modelID, so under high concurrency for the same model across
// different nodes, this header reflects "a recent decision" rather than
// "the exact node that served this exact request". That is acceptable for
// observability and load-balancer attribution: it is the same constraint
// the operator already sees in the distributed-mode logs.
//
// Callers should invoke this right before writing the first byte of the
// response (after computing the result for buffered handlers, or before
// streaming begins for SSE handlers) so Echo flushes it with the headers.
func maybeSetNodeHeader(c echo.Context, appConfig *config.ApplicationConfig, ml *model.ModelLoader, modelID string) {
if appConfig == nil || !appConfig.ExposeNodeHeader || ml == nil || modelID == "" {
return
}
m := ml.CheckIsLoaded(modelID)
if m == nil {
return
}
nodeID := m.NodeID()
if nodeID == "" {
return
}
c.Set(nodeIDContextKey, nodeID)
c.Response().Header().Set(NodeHeaderName, nodeID)
}

View File

@@ -68,6 +68,11 @@ func (a *ModelRouterAdapter) Route(ctx context.Context, backend, modelID, modelN
// by SmartRouter. Use NewModelWithClient so the wrapper is preserved when
// the ModelLoader returns this model on subsequent requests.
m := model.NewModelWithClient(modelID, result.Node.Address, result.Client)
// Stash the picked node ID so HTTP handlers can surface it via the
// optional X-LocalAI-Node response header. Best-effort: the in-process
// store keeps only the latest routing decision per modelID; see the
// nodeID field comment on Model.
m.SetNodeID(result.Node.ID)
xlog.Info("Model routed to remote node", "model", modelName, "node", result.Node.Name, "address", result.Node.Address)
return m, nil

View File

@@ -88,6 +88,7 @@ The frontend is a standard LocalAI instance with distributed mode enabled. These
| `--auth-database-url` | `LOCALAI_AUTH_DATABASE_URL` | *(required)* | PostgreSQL connection URL |
| `--backend-install-timeout` | `LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT` | `15m` | How long the frontend waits for a worker to acknowledge a backend install before considering the request stalled. Raise it when workers pull large backend images over slow links. If a worker takes longer than this, the operation shows as "still installing in background" in the admin UI and clears once the worker finishes. |
| `--backend-upgrade-timeout` | `LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT` | `15m` | Same as the install timeout, applied to backend upgrades (force-reinstall). |
| `--expose-node-header` | `LOCALAI_EXPOSE_NODE_HEADER` | `false` | When enabled, inference responses (chat completions, completions, embeddings) carry an `X-LocalAI-Node` header with the ID of the worker node that served the request. Useful for debugging, observability and load-balancer attribution. Off by default: the node ID reveals internal cluster topology and should not be exposed on a public endpoint. Best-effort: the header reflects the most recent routing decision recorded for the model, which under heavy concurrency may not exactly match the node that served a specific in-flight request. |
### Optional: S3 Object Storage

View File

@@ -19,6 +19,14 @@ type Model struct {
client grpc.Backend
process *process.Process
lastHealthCheck time.Time
// nodeID is the ID of the distributed-mode worker node that owns this
// model handle, when set. Empty for in-process models. Best-effort:
// because the distributed LoadModel path overwrites the per-modelID
// store entry on every routing decision, this value reflects the
// most-recently-routed node for the model, not necessarily the node
// that served a specific in-flight request. Used by the optional
// X-LocalAI-Node response header (--expose-node-header).
nodeID string
sync.Mutex
}
@@ -40,6 +48,23 @@ func NewModelWithClient(ID, address string, client grpc.Backend) *Model {
}
}
// SetNodeID records the distributed-mode worker node that owns this model
// handle. Safe to call from any goroutine.
func (m *Model) SetNodeID(id string) {
m.Lock()
defer m.Unlock()
m.nodeID = id
}
// NodeID returns the distributed-mode worker node ID associated with this
// model handle, or "" if unknown / in-process. See the nodeID field comment
// for the best-effort caveat.
func (m *Model) NodeID() string {
m.Lock()
defer m.Unlock()
return m.nodeID
}
func (m *Model) Process() *process.Process {
return m.process
}