feat(middleware): Model routing, PII filtering, Cloud model proxies (#9802)

Add a routing middleware stack and a cloud-proxy backend.

* cloud-proxy: a Go gRPC backend that forwards OpenAI- and
  Anthropic-shaped chat requests to upstream providers, with an
  optional translate mode (OpenAI request -> Anthropic /v1/messages
  -> OpenAI response) and full tool-calling support.

* routing: admission control, content-aware model routing
  (embedding cache + classifier + rerank + Arch-Router score),
  PII detection/redaction (regex + NER) with streaming filter and
  OpenAI/Anthropic adapters, and a per-user/per-key billing recorder
  backed by GORM or in-memory storage.

* middleware: UsageMiddleware records usage via the billing recorder,
  plus admission, route-model, usage-stamp and trace middlewares.

* observability: BackendTrace ring buffer stores full request bodies
  (capped), MITM proxy emits structured trace events, and router
  classifier decisions surface at /api/router/decide.

* gallery: Arch-Router-1.5B (Q4_K_M and Q8_0).

* UI: cloud-proxy model-editor fields, classifier system-prompt and
  score-normalization config, and a Traces page rendering request
  bodies.

Assisted-by: claude-code:claude-opus-4-7 [Read] [Edit] [Bash]

Signed-off-by: Richard Palethorpe <io@richiejp.com>
This commit is contained in:
Richard Palethorpe
2026-05-25 08:28:27 +01:00
committed by GitHub
parent 1dcd1ae915
commit 6a80e23733
229 changed files with 26339 additions and 1030 deletions

View File

@@ -34,6 +34,7 @@
#include <regex>
#include <algorithm>
#include <atomic>
#include <cmath>
#include <cstdlib>
#include <fstream>
#include <iterator>
@@ -121,6 +122,40 @@ static std::string base64_encode_bytes(const unsigned char* data, size_t len) {
bool loaded_model; // TODO: add a mutex for this, but happens only once loading the model
// Score bypasses the slot loop (see the comment on Score below) so it
// must not run concurrently with any slot-loop RPC. These counters
// are a defence-in-depth tripwire — ModelConfig.Validate already
// rejects llama-cpp configs that mix score with chat/completion/
// embeddings, so a healthy deployment never trips them. seq_cst is
// load-bearing for the increment-then-check pattern below.
static std::atomic<int> slot_loop_inflight{0};
static std::atomic<int> score_inflight{0};
// Increment-then-check, not check-then-increment: two simultaneous
// racers both observe the other's increment and both abort cleanly.
// Reversed, both could see zero and proceed.
struct conflict_guard {
std::atomic<int>& self;
conflict_guard(const char* rpc, std::atomic<int>& self_, std::atomic<int>& other, const char* other_name)
: self(self_) {
self.fetch_add(1, std::memory_order_seq_cst);
int o = other.load(std::memory_order_seq_cst);
if (o > 0) {
fprintf(stderr,
"FATAL: %s called with %s=%d. The llama-cpp backend cannot "
"service Score and slot-loop RPCs concurrently — Score "
"bypasses the slot loop and races the llama_context. Bind "
"Score-using features to a model dedicated to scoring "
"(known_usecases: [score] with no chat/completion/embeddings).\n",
rpc, other_name, o);
std::abort();
}
}
~conflict_guard() {
self.fetch_sub(1, std::memory_order_seq_cst);
}
};
static std::function<void(int)> shutdown_handler;
static std::atomic_flag is_terminating = ATOMIC_FLAG_INIT;
@@ -1446,6 +1481,7 @@ public:
if (params_base.model.path.empty()) {
return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "Model not loaded");
}
conflict_guard guard("PredictStream", slot_loop_inflight, score_inflight, "score_inflight");
json data = parse_options(true, request, params_base, ctx_server.get_llama_context());
@@ -2205,6 +2241,7 @@ public:
if (params_base.model.path.empty()) {
return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "Model not loaded");
}
conflict_guard guard("Predict", slot_loop_inflight, score_inflight, "score_inflight");
json data = parse_options(true, request, params_base, ctx_server.get_llama_context());
data["stream"] = false;
@@ -2963,6 +3000,7 @@ public:
if (params_base.model.path.empty()) {
return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "Model not loaded");
}
conflict_guard guard("Embedding", slot_loop_inflight, score_inflight, "score_inflight");
json body = parse_options(false, request, params_base, ctx_server.get_llama_context());
body["stream"] = false;
@@ -3070,6 +3108,8 @@ public:
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "\"documents\" must be a non-empty string array");
}
conflict_guard guard("Rerank", slot_loop_inflight, score_inflight, "score_inflight");
// Create and queue the task
auto rd = ctx_server.get_response_reader();
{
@@ -3142,12 +3182,218 @@ public:
return grpc::Status::OK;
}
// Score returns the model's joint log-probability of each candidate
// continuation given a shared prompt.
//
// WHY bypass the slot/task queue: upstream server_context exposes
// get_llama_context as "main thread only" and the slot loop's
// update_slots() owns the context whenever a task is in flight.
// No public synchronization primitive is available — so Score is
// unsafe to call concurrently with active generation through this
// backend. In practice routing-classifier calls happen before the
// request is routed to a generation backend, so the model used
// for Score is typically idle. Concurrent Score calls are
// serialised by a local mutex; KV-cache state is isolated behind
// a dedicated sequence ID cleared between candidates.
//
// A patch to server-context.cpp that adds SERVER_TASK_TYPE_SCORE
// and routes scoring through the slot loop would be the correct
// long-term fix; tracked as a follow-up.
//
// Perf TODO (measured: ~450 ms warm for 3 candidates on Arch-
// Router-1.5B Q4_K_M + Intel SYCL): the current loop re-decodes
// `prompt + candidate` from scratch for every candidate, throwing
// away the prompt's KV cache between iterations. A smarter
// version would:
// 1. Decode just the prompt once into score_seq_id.
// 2. Snapshot/cp that sequence (llama_memory_seq_cp) into a
// per-candidate sequence id.
// 3. For each candidate, decode only its tokens onto the copy
// (continuing from the saved prompt state), read logits.
// 4. llama_memory_seq_rm the copy.
// Estimated speedup: 3-candidate calls 450 ms -> ~150-200 ms,
// 6-candidate calls 630 ms -> ~220 ms. Single source-file change,
// no proto / Go-side changes needed. Worth doing once routing is
// wired into the middleware and Score is on the hot path of every
// chat request.
grpc::Status Score(ServerContext* context, const backend::ScoreRequest* request, backend::ScoreResponse* response) override {
auto auth = checkAuth(context);
if (!auth.ok()) return auth;
if (params_base.model.path.empty()) {
return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "Model not loaded");
}
if (request->candidates_size() == 0) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "candidates must be non-empty");
}
// Tripwire against the slot loop. Acquired before score_mutex
// so it fires even when this Score is queued behind another.
conflict_guard guard("Score", score_inflight, slot_loop_inflight, "slot_loop_inflight");
// Serialise concurrent Score calls. The slot loop is still
// free to race with us — see the class comment above.
static std::mutex score_mutex;
std::lock_guard<std::mutex> score_lock(score_mutex);
llama_context * lctx = ctx_server.get_llama_context();
if (lctx == nullptr) {
return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "llama context unavailable (sleeping?)");
}
const llama_vocab * vocab = ctx_server.impl->vocab;
const int32_t n_vocab = llama_vocab_n_tokens(vocab);
const int32_t n_ctx = llama_n_ctx(lctx);
llama_memory_t mem = llama_get_memory(lctx);
// The KV-cache is sized to seq_to_stream.size() at load
// (typically equal to n_slots, often 1). Sequence IDs must
// be in [0, n_seq_max), so we can't pick a high-value
// "private" ID — we have to share with the slot. We clear
// the cache before AND after each candidate to keep
// scoring isolated from whatever state the slot held, and
// the static mutex above guarantees no other Score call is
// racing in the meantime. The slot loop is still free to
// race (see comment on this method) — Score must not run
// concurrently with generation through this backend.
const llama_seq_id score_seq_id = 0;
llama_memory_seq_rm(mem, score_seq_id, -1, -1);
// Tokenize the shared prompt once with add_special=true so
// BOS is prepended when the model requires it. parse_special
// keeps chat-template markers in the prompt intact.
const std::string prompt = request->prompt();
std::vector<llama_token> prompt_tokens = common_tokenize(vocab, prompt, /*add_special=*/true, /*parse_special=*/true);
const int32_t prompt_len = (int32_t) prompt_tokens.size();
for (int ci = 0; ci < request->candidates_size(); ci++) {
const std::string & candidate_text = request->candidates(ci);
// Re-tokenize prompt + candidate as a single string. BPE
// merges across the boundary can shift the tokenization
// versus tokenize(prompt) ++ tokenize(candidate), so we
// find the divergence point against prompt_tokens.
std::vector<llama_token> full_tokens = common_tokenize(vocab, prompt + candidate_text, /*add_special=*/true, /*parse_special=*/true);
int32_t divergence = prompt_len;
const int32_t min_len = std::min<int32_t>(prompt_len, (int32_t) full_tokens.size());
for (int32_t i = 0; i < min_len; i++) {
if (prompt_tokens[i] != full_tokens[i]) {
divergence = i;
break;
}
}
const int32_t cand_len = (int32_t) full_tokens.size() - divergence;
backend::CandidateScore * cs = response->add_candidates();
cs->set_num_tokens(cand_len);
if (cand_len <= 0) {
cs->set_log_prob(0.0);
if (request->length_normalize()) {
cs->set_length_normalized_log_prob(0.0);
}
continue;
}
if (divergence < 1) {
// Need at least one prior token (typically BOS) to
// predict the first candidate token's logit. Tokeniser
// models without BOS + an empty prompt fall in here.
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
"Score: prompt produced no leading tokens; need at least one (e.g. BOS) to predict candidate");
}
if ((int32_t) full_tokens.size() > n_ctx) {
return grpc::Status(grpc::StatusCode::OUT_OF_RANGE,
"Score: prompt+candidate exceeds context size (got " +
std::to_string(full_tokens.size()) + ", n_ctx=" + std::to_string(n_ctx) + ")");
}
// Build a batch covering the entire prompt+candidate. We
// need logits at (divergence-1) onward — those are the
// predictions for each candidate token.
llama_batch batch = llama_batch_init((int32_t) full_tokens.size(), 0, 1);
for (int32_t i = 0; i < (int32_t) full_tokens.size(); i++) {
batch.token[i] = full_tokens[i];
batch.pos[i] = i;
batch.n_seq_id[i] = 1;
batch.seq_id[i][0] = score_seq_id;
// logits[i] is "do we want the prediction *for the
// next token*, computed from this position?"
// We want predictions for candidate tokens at
// positions divergence .. full_tokens.size()-1, which
// come from logits at positions (divergence-1) ..
// (full_tokens.size()-2).
bool need_logit = (i >= divergence - 1) && (i < (int32_t) full_tokens.size() - 1);
batch.logits[i] = need_logit ? 1 : 0;
}
batch.n_tokens = (int32_t) full_tokens.size();
// Decode the batch. If decode fails (e.g. KV slot
// exhaustion), surface as INTERNAL — the caller will
// typically fall back to a sampling-based classifier.
int decode_err = llama_decode(lctx, batch);
if (decode_err != 0) {
llama_batch_free(batch);
llama_memory_seq_rm(mem, score_seq_id, -1, -1);
return grpc::Status(grpc::StatusCode::INTERNAL,
"llama_decode failed during Score: " + std::to_string(decode_err));
}
// Sum log-probabilities of the actual candidate tokens.
double total_log_prob = 0.0;
for (int32_t k = 0; k < cand_len; k++) {
// The k-th candidate token sits at full_tokens index
// (divergence + k). Its predicting logit is at batch
// position (divergence + k - 1).
int32_t logit_pos = divergence + k - 1;
const float * logits = llama_get_logits_ith(lctx, logit_pos);
if (logits == nullptr) {
llama_batch_free(batch);
llama_memory_seq_rm(mem, score_seq_id, -1, -1);
return grpc::Status(grpc::StatusCode::INTERNAL,
"llama_get_logits_ith returned null at position " + std::to_string(logit_pos));
}
llama_token target_token = full_tokens[divergence + k];
// Compute log_softmax(logits)[target_token] with the
// max-subtraction stability trick.
float max_logit = logits[0];
for (int32_t v = 1; v < n_vocab; v++) {
if (logits[v] > max_logit) max_logit = logits[v];
}
double sum_exp = 0.0;
for (int32_t v = 0; v < n_vocab; v++) {
sum_exp += std::exp((double)(logits[v] - max_logit));
}
double token_log_prob = (double)(logits[target_token] - max_logit) - std::log(sum_exp);
total_log_prob += token_log_prob;
if (request->include_token_logprobs()) {
backend::TokenLogProb * tlp = cs->add_tokens();
std::string piece = common_token_to_piece(lctx, target_token);
tlp->set_token(piece);
tlp->set_log_prob(token_log_prob);
}
}
cs->set_log_prob(total_log_prob);
if (request->length_normalize() && cand_len > 0) {
cs->set_length_normalized_log_prob(total_log_prob / (double) cand_len);
}
llama_batch_free(batch);
// Drop this candidate's KV-cache contribution so the next
// candidate starts from a clean state. Without this, the
// next decode would conflict at positions 0..N-1 for our
// sequence ID.
llama_memory_seq_rm(mem, score_seq_id, -1, -1);
}
return grpc::Status::OK;
}
grpc::Status TokenizeString(ServerContext* context, const backend::PredictOptions* request, backend::TokenizationResponse* response) override {
auto auth = checkAuth(context);
if (!auth.ok()) return auth;
if (params_base.model.path.empty()) {
return grpc::Status(grpc::StatusCode::FAILED_PRECONDITION, "Model not loaded");
}
conflict_guard guard("TokenizeString", slot_loop_inflight, score_inflight, "score_inflight");
json body = parse_options(false, request, params_base, ctx_server.get_llama_context());
body["stream"] = false;
@@ -3169,6 +3415,8 @@ public:
grpc::Status GetMetrics(ServerContext* /*context*/, const backend::MetricsRequest* /*request*/, backend::MetricsResponse* response) override {
conflict_guard guard("GetMetrics", slot_loop_inflight, score_inflight, "score_inflight");
// request slots data using task queue
auto rd = ctx_server.get_response_reader();
int task_id = rd.queue_tasks.get_new_id();