Files
LocalAI/core/services/cloudproxy/proxy.go
Richard Palethorpe 6a80e23733 feat(middleware): Model routing, PII filtering, Cloud model proxies (#9802)
Add a routing middleware stack and a cloud-proxy backend.

* cloud-proxy: a Go gRPC backend that forwards OpenAI- and
  Anthropic-shaped chat requests to upstream providers, with an
  optional translate mode (OpenAI request -> Anthropic /v1/messages
  -> OpenAI response) and full tool-calling support.

* routing: admission control, content-aware model routing
  (embedding cache + classifier + rerank + Arch-Router score),
  PII detection/redaction (regex + NER) with streaming filter and
  OpenAI/Anthropic adapters, and a per-user/per-key billing recorder
  backed by GORM or in-memory storage.

* middleware: UsageMiddleware records usage via the billing recorder,
  plus admission, route-model, usage-stamp and trace middlewares.

* observability: BackendTrace ring buffer stores full request bodies
  (capped), MITM proxy emits structured trace events, and router
  classifier decisions surface at /api/router/decide.

* gallery: Arch-Router-1.5B (Q4_K_M and Q8_0).

* UI: cloud-proxy model-editor fields, classifier system-prompt and
  score-normalization config, and a Traces page rendering request
  bodies.

Assisted-by: claude-code:claude-opus-4-7 [Read] [Edit] [Bash]

Signed-off-by: Richard Palethorpe <io@richiejp.com>
2026-05-25 09:28:27 +02:00

126 lines
3.4 KiB
Go

// Package cloudproxy stitches the cloud-proxy gRPC backend to the
// HTTP edge: model rewrite, body shaping, and SSE-aware PII filtering
// on the response. The outbound HTTP request itself lives inside the
// cloud-proxy backend binary (backend/go/cloud-proxy), not here — this
// package is the core-side glue.
package cloudproxy
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"github.com/labstack/echo/v4"
"github.com/mudler/LocalAI/core/services/cloudproxy/ssewire"
"github.com/mudler/LocalAI/core/services/routing/pii"
"github.com/mudler/xlog"
)
// rewriteModel returns body with its top-level "model" field set to
// upstreamModel, adding the field when it is absent.
//
// When upstreamModel is empty the body is returned untouched and is not
// parsed at all. Otherwise body must be a JSON object: anything else
// (invalid JSON, an array, a scalar, or a literal null) yields an error.
// The result is re-marshalled, so key order and whitespace may differ
// from the input, but every field other than "model" keeps the value it
// was sent with.
func rewriteModel(body []byte, upstreamModel string) ([]byte, error) {
	if upstreamModel == "" {
		return body, nil
	}

	// Decode a single level only. Holding each value as raw JSON means the
	// fields we do not touch are never converted to Go values, so large
	// integers are not rounded through float64 on the way back out.
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(body, &fields); err != nil {
		return nil, fmt.Errorf("cloudproxy: parse request body: %w", err)
	}
	if fields == nil {
		// A literal JSON null unmarshals without error but leaves the map
		// nil; assigning into it below would panic.
		return nil, fmt.Errorf("cloudproxy: parse request body: expected a JSON object, got null")
	}

	model, err := json.Marshal(upstreamModel)
	if err != nil {
		return nil, fmt.Errorf("cloudproxy: encode upstream model: %w", err)
	}
	fields["model"] = model
	return json.Marshal(fields)
}
// streaming reports whether the JSON request body sets the top-level
// "stream" field to true. A body that fails to decode (invalid JSON, or
// a "stream" value that is not a boolean) is treated as non-streaming.
func streaming(body []byte) bool {
	type streamFlag struct {
		Stream bool `json:"stream"`
	}

	var flag streamFlag
	decodeErr := json.Unmarshal(body, &flag)
	// Only trust the decoded flag when the whole body decoded cleanly.
	return decodeErr == nil && flag.Stream
}
// passthroughError relays an upstream error response to the client with
// the given status code and, when non-empty, the given Content-Type.
//
// At most 1 MiB of the upstream body is forwarded; anything beyond that
// is not read. Read and write failures are deliberately ignored (whatever
// was read before a failure is still written), so the function always
// returns nil.
func passthroughError(c echo.Context, statusCode int, contentType string, body io.Reader) error {
	const errBodyLimit = 1 << 20 // 1 MiB cap on the relayed error body

	// Best effort: a read error still leaves us with the bytes read so far.
	capped := io.LimitReader(body, errBodyLimit)
	payload, _ := io.ReadAll(capped)

	resp := c.Response()
	if contentType != "" {
		resp.Header().Set("Content-Type", contentType)
	}
	resp.WriteHeader(statusCode)

	// The status line is already committed, so a failed write has no
	// useful recovery; the error is dropped.
	_, _ = resp.Writer.Write(payload)
	return nil
}
// forwardBuffered copies a non-streaming upstream response to the client.
// It sets Content-Type when contentType is non-empty, writes statusCode,
// then copies body to the response writer. The returned error is the one
// reported by io.Copy, or nil when the copy completes.
func forwardBuffered(c echo.Context, statusCode int, contentType string, body io.Reader) error {
	resp := c.Response()
	if contentType != "" {
		resp.Header().Set("Content-Type", contentType)
	}
	resp.WriteHeader(statusCode)

	if _, copyErr := io.Copy(resp.Writer, body); copyErr != nil {
		return copyErr
	}
	return nil
}
// forwardStream relays an upstream SSE body to the client event by
// event, passing each event's data line through the PII filter when one
// is supplied. provider selects the SSE dialect handed to the ssewire
// helpers (openai vs anthropic); it comes from cfg.Proxy.Provider on the
// cloud-proxy backend.
//
// The response is always sent as 200 text/event-stream and flushed after
// every event. With a nil filter, events are forwarded verbatim. The
// function always returns nil: a failed write to the client ends the
// relay early, and an upstream read error is only logged at debug level.
func forwardStream(c echo.Context, body io.Reader, provider string, filter *pii.StreamFilter) error {
	resp := c.Response()
	hdr := resp.Header()
	hdr.Set("Content-Type", "text/event-stream")
	hdr.Set("Cache-Control", "no-cache")
	hdr.Set("Connection", "keep-alive")
	resp.WriteHeader(http.StatusOK)

	dialect := ssewire.Provider(provider)

	// send writes one chunk to the client and flushes it immediately so
	// events are not held back in a buffer.
	send := func(chunk string) error {
		if _, werr := fmt.Fprint(resp.Writer, chunk); werr != nil {
			return werr
		}
		resp.Flush()
		return nil
	}

	// drainFilter emits whatever text the filter is still holding, wrapped
	// in a synthesized event for the dialect. It is a no-op without a
	// filter, with nothing held, or when no event could be synthesized.
	drainFilter := func() {
		if filter == nil {
			return
		}
		held := filter.Drain()
		if held == "" {
			return
		}
		synth := ssewire.SynthResidualEvent(dialect, held)
		if synth == "" {
			return
		}
		// Best effort: a write failure here is ignored.
		_ = send(synth)
	}

	events := ssewire.NewScanner(body)
	for events.Scan() {
		ev := events.Event()

		if ssewire.IsTerminalMarker(ev.DataLine, dialect) {
			// Held-back text must reach the client before the terminal
			// marker does. The marker is forwarded untouched, and the loop
			// keeps reading in case more events follow.
			drainFilter()
			_ = send(ev.Raw)
			continue
		}

		payload := ev.Raw
		if filter != nil && ev.DataLine != "" {
			rewritten, drop := ssewire.RewritePayload(ev.DataLine, dialect, filter)
			if drop {
				continue
			}
			if rewritten != ev.DataLine {
				// Replace only the first occurrence of the data line so
				// any "event:"/"id:" preamble in the raw event is kept.
				// NOTE(review): assumes the data line's text does not
				// also occur earlier in ev.Raw — confirm against ssewire.
				payload = strings.Replace(ev.Raw, ev.DataLine, rewritten, 1)
			}
		}

		if send(payload) != nil {
			// The write to the client failed (presumably a disconnect);
			// stop relaying without draining and report success.
			return nil
		}
	}

	if err := events.Err(); err != nil && err != io.EOF {
		xlog.Debug("cloudproxy: stream read error", "error", err)
	}

	// Upstream ended, possibly without a terminal marker: flush whatever
	// the filter still holds.
	drainFilter()
	return nil
}