diff --git a/core/http/endpoints/openai/chat.go b/core/http/endpoints/openai/chat.go index db716e4b7..7247ad18c 100644 --- a/core/http/endpoints/openai/chat.go +++ b/core/http/endpoints/openai/chat.go @@ -325,6 +325,11 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator c.Response().Header().Set("Cache-Control", "no-cache") c.Response().Header().Set("Connection", "keep-alive") c.Response().Header().Set("X-Correlation-ID", id) + // Best-effort: surface the picked node ID when a previous request + // has already routed this model. For the very first request after + // a fresh start the model isn't loaded yet, so the header is + // omitted; warm-cache requests get accurate attribution. + maybeSetNodeHeader(c, startupOptions, ml, input.Model) mcpStreamMaxIterations := 10 if config.Agent.MaxIterations > 0 { @@ -935,6 +940,11 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator respData, _ := json.Marshal(resp) xlog.Debug("Response", "response", string(respData)) + // Attribute the response to a specific worker node when + // distributed mode is enabled and the operator opted in via + // --expose-node-header. No-op otherwise. + maybeSetNodeHeader(c, startupOptions, ml, input.Model) + // Return the prediction in the response body return c.JSON(200, resp) } // end MCP iteration loop diff --git a/core/http/endpoints/openai/completion.go b/core/http/endpoints/openai/completion.go index f81e13e6a..354e7e461 100644 --- a/core/http/endpoints/openai/completion.go +++ b/core/http/endpoints/openai/completion.go @@ -106,6 +106,11 @@ func CompletionEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, eva c.Response().Header().Set("Content-Type", "text/event-stream") c.Response().Header().Set("Cache-Control", "no-cache") c.Response().Header().Set("Connection", "keep-alive") + // Best-effort: surface the picked node ID when a previous request + // has already routed this model. For the very first request after + // a fresh start the model isn't loaded yet, so the header is + // omitted; warm-cache requests get accurate attribution. + maybeSetNodeHeader(c, appConfig, ml, input.Model) if len(config.PromptStrings) > 1 { return errors.New("cannot handle more than 1 `PromptStrings` when Streaming") @@ -274,6 +279,11 @@ func CompletionEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, eva jsonResult, _ := json.Marshal(resp) xlog.Debug("Response", "response", string(jsonResult)) + // Attribute the response to a specific worker node when distributed + // mode is enabled and the operator opted in via --expose-node-header. + // No-op otherwise. + maybeSetNodeHeader(c, appConfig, ml, input.Model) + // Return the prediction in the response body return c.JSON(200, resp) } diff --git a/core/http/endpoints/openai/embeddings.go b/core/http/endpoints/openai/embeddings.go index 517881f66..edf30b762 100644 --- a/core/http/endpoints/openai/embeddings.go +++ b/core/http/endpoints/openai/embeddings.go @@ -102,6 +102,11 @@ func EmbeddingsEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, app jsonResult, _ := json.Marshal(resp) xlog.Debug("Response", "response", string(jsonResult)) + // Attribute the response to a specific worker node when distributed + // mode is enabled and the operator opted in via --expose-node-header. + // No-op otherwise. + maybeSetNodeHeader(c, appConfig, ml, input.Model) + // Return the prediction in the response body return c.JSON(200, resp) } diff --git a/core/http/endpoints/openai/node_header.go b/core/http/endpoints/openai/node_header.go new file mode 100644 index 000000000..0fb32d770 --- /dev/null +++ b/core/http/endpoints/openai/node_header.go @@ -0,0 +1,58 @@ +package openai + +import ( + "github.com/labstack/echo/v4" + + "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/pkg/model" +) + +// NodeHeaderName is the HTTP response header that, when --expose-node-header +// is enabled, carries the ID of the distributed-mode worker node that served +// the inference request. Off by default: node IDs reveal internal topology +// and shouldn't be exposed on a public endpoint. +const NodeHeaderName = "X-LocalAI-Node" + +// nodeIDContextKey is the Echo context key used to stash the picked node ID +// for the current request, so streaming handlers can attach the header +// before any chunk is written. The constant lives here to keep the surface +// minimal: it is private to the openai handlers package. +const nodeIDContextKey = "served-by-node-id" + +// MaybeSetNodeHeaderForTest is an exported shim around maybeSetNodeHeader +// to let external tests (package openai_test) exercise the header policy +// without exporting the implementation. Production callers must use the +// internal lowercase helper. +func MaybeSetNodeHeaderForTest(c echo.Context, appConfig *config.ApplicationConfig, ml *model.ModelLoader, modelID string) { + maybeSetNodeHeader(c, appConfig, ml, modelID) +} + +// maybeSetNodeHeader writes the X-LocalAI-Node response header when the +// operator has opted in via --expose-node-header / LOCALAI_EXPOSE_NODE_HEADER +// and the requested model's most-recent routing decision carries a node ID. +// +// Best-effort: the ModelLoader's distributed store keeps only the latest +// routing per modelID, so under high concurrency for the same model across +// different nodes, this header reflects "a recent decision" rather than +// "the exact node that served this exact request". That is acceptable for +// observability and load-balancer attribution: it is the same constraint +// the operator already sees in the distributed-mode logs. +// +// Callers should invoke this right before writing the first byte of the +// response (after computing the result for buffered handlers, or before +// streaming begins for SSE handlers) so Echo flushes it with the headers. +func maybeSetNodeHeader(c echo.Context, appConfig *config.ApplicationConfig, ml *model.ModelLoader, modelID string) { + if appConfig == nil || !appConfig.ExposeNodeHeader || ml == nil || modelID == "" { + return + } + m := ml.CheckIsLoaded(modelID) + if m == nil { + return + } + nodeID := m.NodeID() + if nodeID == "" { + return + } + c.Set(nodeIDContextKey, nodeID) + c.Response().Header().Set(NodeHeaderName, nodeID) +} diff --git a/core/services/nodes/model_router.go b/core/services/nodes/model_router.go index d834bcbc8..de424a92b 100644 --- a/core/services/nodes/model_router.go +++ b/core/services/nodes/model_router.go @@ -68,6 +68,11 @@ func (a *ModelRouterAdapter) Route(ctx context.Context, backend, modelID, modelN // by SmartRouter. Use NewModelWithClient so the wrapper is preserved when // the ModelLoader returns this model on subsequent requests. m := model.NewModelWithClient(modelID, result.Node.Address, result.Client) + // Stash the picked node ID so HTTP handlers can surface it via the + // optional X-LocalAI-Node response header. Best-effort: the in-process + // store keeps only the latest routing decision per modelID; see the + // nodeID field comment on Model. + m.SetNodeID(result.Node.ID) xlog.Info("Model routed to remote node", "model", modelName, "node", result.Node.Name, "address", result.Node.Address) return m, nil diff --git a/docs/content/features/distributed-mode.md b/docs/content/features/distributed-mode.md index 792539bce..ceea3a7d4 100644 --- a/docs/content/features/distributed-mode.md +++ b/docs/content/features/distributed-mode.md @@ -88,6 +88,7 @@ The frontend is a standard LocalAI instance with distributed mode enabled. These | `--auth-database-url` | `LOCALAI_AUTH_DATABASE_URL` | *(required)* | PostgreSQL connection URL | | `--backend-install-timeout` | `LOCALAI_NATS_BACKEND_INSTALL_TIMEOUT` | `15m` | How long the frontend waits for a worker to acknowledge a backend install before considering the request stalled. Raise it when workers pull large backend images over slow links. If a worker takes longer than this, the operation shows as "still installing in background" in the admin UI and clears once the worker finishes. | | `--backend-upgrade-timeout` | `LOCALAI_NATS_BACKEND_UPGRADE_TIMEOUT` | `15m` | Same as the install timeout, applied to backend upgrades (force-reinstall). | +| `--expose-node-header` | `LOCALAI_EXPOSE_NODE_HEADER` | `false` | When enabled, inference responses (chat completions, completions, embeddings) carry an `X-LocalAI-Node` header with the ID of the worker node that served the request. Useful for debugging, observability and load-balancer attribution. Off by default: the node ID reveals internal cluster topology and should not be exposed on a public endpoint. Best-effort: the header reflects the most recent routing decision recorded for the model, which under heavy concurrency may not exactly match the node that served a specific in-flight request. | ### Optional: S3 Object Storage diff --git a/pkg/model/model.go b/pkg/model/model.go index 4a0e6af2a..13bf3b330 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -19,6 +19,14 @@ type Model struct { client grpc.Backend process *process.Process lastHealthCheck time.Time + // nodeID is the ID of the distributed-mode worker node that owns this + // model handle, when set. Empty for in-process models. Best-effort: + // because the distributed LoadModel path overwrites the per-modelID + // store entry on every routing decision, this value reflects the + // most-recently-routed node for the model, not necessarily the node + // that served a specific in-flight request. Used by the optional + // X-LocalAI-Node response header (--expose-node-header). + nodeID string sync.Mutex } @@ -40,6 +48,23 @@ func NewModelWithClient(ID, address string, client grpc.Backend) *Model { } } +// SetNodeID records the distributed-mode worker node that owns this model +// handle. Safe to call from any goroutine. +func (m *Model) SetNodeID(id string) { + m.Lock() + defer m.Unlock() + m.nodeID = id +} + +// NodeID returns the distributed-mode worker node ID associated with this +// model handle, or "" if unknown / in-process. See the nodeID field comment +// for the best-effort caveat. +func (m *Model) NodeID() string { + m.Lock() + defer m.Unlock() + return m.nodeID +} + func (m *Model) Process() *process.Process { return m.process }