feat(grpc): request cancellation for Go backends via the Cancellable capability

The llama.cpp C++ backend aborts generation when its gRPC context is
cancelled (grpc-server.cpp polls context->IsCancelled() in the result
loops), but Go backends served by pkg/grpc never observed context
cancellation: a disconnected client left the generation running to
completion. Add an optional Cancellable capability; the server registers
context.AfterFunc on the request/stream context (after the Locking block
so queued requests cannot abort the current owner) covering both rich
and legacy paths. dllm implements it: measured cancel latency ~10ms vs
~10s of orphaned generation, and follow-up requests no longer queue
behind cancelled ones (~220ms vs ~9s in the e2e proof).

Assisted-by: Claude Code (Fable 5)
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
Ettore Di Giacinto
2026-06-11 17:50:04 +00:00
parent eb61e1d770
commit ad6d1dbc8b
8 changed files with 349 additions and 17 deletions

View File

@@ -56,7 +56,7 @@ header comment has the full list):
| PredictStreamRich | `dllm_capi_generate_stream`; per committed diffusion block → UTF-8 holdback → parser.Feed → one Reply per non-empty delta batch (channel closed by the CALLER, per `pkg/grpc/interface.go`) |
| Predict / PredictStream | Legacy paths, delegate to the rich pair (legacy stream INVERTS channel ownership: the impl closes) |
| TokenizeString | `dllm_capi_tokenize_json` (C side prepends BOS per `vocab.add_bos`) |
| Cancel | `dllm_capi_cancel`; currently INERT in practice - the gRPC server does not hand the request/stream context to backends, so client disconnects never reach it (plumbing is future work) |
| Cancel | `dllm_capi_cancel`, exposed as the `grpc.Cancellable` capability (`pkg/grpc/interface.go`): the gRPC server arms it via `context.AfterFunc` on the Predict/PredictStream context, so client disconnects/timeouts abort the in-flight generate - llama.cpp `IsCancelled()` parity for Go backends |
`n_threads` and `ctx_len` are accepted-but-ignored by the engine at the
current pin (the context bound comes from GGUF `n_ctx_train`); they are sent
@@ -102,8 +102,8 @@ output is NOT gemma4-parsed (plain content, like any non-autoparsing backend).
| Layer | Gate | What |
|---|---|---|
| `backend/go/dllm/*_test.go` (renderer/parser/wiring) | none - run in plain `go test ./backend/go/dllm/...` | Ginkgo specs over a fake `generator` seam; canonical renderer fixtures from transformers' `test_modeling_diffusion_gemma.py`, parser tables from the vLLM gemma4 parsers |
| `backend/go/dllm/dllm_test.go` C-ABI smoke | `DLLM_TEST_LIBRARY` + `DLLM_TEST_TINY_MODEL` (dllm.cpp's `tests/fixtures/tiny_with_vocab.gguf`); Skips when unset | Drives the real `libdllm.so`: ABI check, load, tokenize `[2,18]`, deterministic generate, cancel |
| `tests/e2e-backends/dllm_test.go` | `BACKEND_TEST_DLLM=1` + `BACKEND_BINARY` (packaged run.sh) + `BACKEND_TEST_MODEL_FILE` (tiny fixture) | Templated chat round trip (Messages + UseTokenizerTemplate) over the real gRPC binary, non-streaming + streaming |
| `backend/go/dllm/dllm_test.go` C-ABI smoke | `DLLM_TEST_LIBRARY` + `DLLM_TEST_TINY_MODEL` (dllm.cpp's `tests/fixtures/tiny_with_vocab.gguf`); Skips when unset | Drives the real `libdllm.so`: ABI check, load, tokenize `[2,18]`, deterministic generate, cancel (incl. mid-stream `Dllm.Cancel` aborting a deliberately slow `eb_max_steps:256` run in ~10ms) |
| `tests/e2e-backends/dllm_test.go` | `BACKEND_TEST_DLLM=1` + `BACKEND_BINARY` (packaged run.sh) + `BACKEND_TEST_MODEL_FILE` (tiny fixture) | Templated chat round trip (Messages + UseTokenizerTemplate) over the real gRPC binary, non-streaming + streaming; plus client-context cancellation mid-stream (proves the `Cancellable` server plumbing end to end) |
| Real-model e2e | `BACKEND_TEST_DLLM_REAL_MODEL_FILE` (26B BF16, ~50 GB) + `BACKEND_TEST_DLLM_REAL_GPU_LAYERS` | CUDA-13-class hardware only |
Tool-call e2e is deliberately absent from the tiny-model spec: the fixture has
@@ -123,8 +123,11 @@ no ldd walk yet).
## Known limitations
- **Cancel is unwired**: nothing calls `Dllm.Cancel` on client disconnect
until the gRPC server plumbs the request context through to backends.
- **Cancel granularity**: the C-ABI cancel flag is per-ctx and resets on
every generate entry, so a Cancel racing a NEW generate can be lost, and
with requests queued on the worker it aborts whichever generate is
currently running (acceptable: the server de-registers the hook on normal
completion, one process serves one model).
- **Throughput**: ~0.15 tok/s on the 26B at default settings (GB10) - every
denoise step recomputes the full prompt+canvas. The upstream prefix-KV
cache (dllm.cpp P3) is the fix; `kv_cache:on` errors until it lands

View File

@@ -21,12 +21,18 @@ import (
"sync"
"unicode/utf8"
grpc "github.com/mudler/LocalAI/pkg/grpc"
"github.com/mudler/LocalAI/pkg/grpc/base"
"github.com/mudler/LocalAI/pkg/grpc/grpcerrors"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
"github.com/mudler/xlog"
)
// The gRPC server cancels in-flight generations on client disconnect only
// for backends advertising the Cancellable capability; keep Dllm pinned to
// it so a signature drift fails the build, not the disconnect path.
var _ grpc.Cancellable = (*Dllm)(nil)
// generator is the seam between the backend wiring and the dllm.cpp C-ABI:
// the real implementation (capiGenerator) wraps the cGenerate/cTokenizeJSON
// family, while tests substitute a fake to exercise prompt construction,
@@ -181,18 +187,29 @@ func (d *Dllm) Free() error {
return nil
}
// Cancel requests cancellation of the in-flight generate. It deliberately
// bypasses the worker queue: dllm_capi_cancel is the one call the C-ABI
// allows from any goroutine mid-generate (it only flips an atomic).
// Cancel requests cancellation of the in-flight generate (the
// grpc.Cancellable capability). The gRPC server arms it via
// context.AfterFunc on the request/stream context, so a client
// disconnect or timeout aborts the generation server-side - the same
// semantics the llama.cpp C++ backend gets from polling IsCancelled().
// It deliberately bypasses the worker queue: dllm_capi_cancel is the one
// call the C-ABI allows from any goroutine mid-generate (it only flips
// an atomic).
//
// LIMITATION: nothing invokes this on client disconnect today. The gRPC
// server (pkg/grpc/server.go) does not hand the request/stream context to
// Predict/PredictStreamRich, so a dropped HTTP client cannot reach the
// backend until that plumbing exists; the method is here so future server
// wiring (or an admin RPC) has something to call. Note dllm_capi.h's
// cancel-reset race: each generate resets the flag on entry, so a caller
// racing a new generate should re-issue Cancel.
// Note dllm_capi.h's cancel-reset race: each generate resets the flag on
// entry, so a Cancel racing a NEW generate on the same ctx can be lost
// (and, with requests queued on the worker, it aborts whichever generate
// is currently running). The single-flag granularity is acceptable here
// because the server de-registers the hook on normal completion and one
// backend process serves one model.
func (d *Dllm) Cancel() {
// RLock so a server-side AfterFunc firing in the window between a
// request finishing and a model unload cannot touch a freed C ctx
// (Free holds the write lock while tearing gen down). cancel() is the
// one C call that is safe concurrently with an in-flight generate, so
// taking a read lock here cannot deadlock against request holders.
d.genMu.RLock()
defer d.genMu.RUnlock()
if d.gen != nil {
d.gen.cancel()
}

View File

@@ -768,4 +768,40 @@ var _ = Describe("Dllm backend (real tiny model)", func() {
}
Expect(streamed).ToNot(BeEmpty())
})
It("aborts an in-flight generation promptly on Cancel", func() {
d := &Dllm{}
// eb_max_steps inflates the per-block denoise loop so the full run
// takes ~10s on the tiny fixture (vs ~40ms at engine defaults; 16
// blocks, first block after ~0.7s) - long enough that a prompt
// post-cancel return is distinguishable from the generation simply
// finishing.
Expect(d.Load(&pb.ModelOptions{
ModelFile: os.Getenv("DLLM_TEST_TINY_MODEL"),
Options: []string{"eb_max_steps:256"},
})).To(Succeed())
DeferCleanup(func() { Expect(d.Free()).To(Succeed()) })
ch := make(chan *pb.Reply, 64)
errCh := make(chan error, 1)
go func() {
defer GinkgoRecover()
errCh <- d.PredictStreamRich(&pb.PredictOptions{Prompt: "hello", Tokens: 256, Seed: 7}, ch)
}()
// Cancel only once the first block proves the generate is in
// flight: the C side resets the cancel flag on generate entry, so
// an earlier Cancel would be swallowed (dllm_capi.h race note).
Eventually(ch, "60s").Should(Receive())
cancelAt := time.Now()
d.Cancel()
// Uncancelled, ~10s of generation remain; the cancelled call must
// come back in milliseconds (the flag is checked per denoise step).
var genErr error
Eventually(errCh, "5s").Should(Receive(&genErr))
latency := time.Since(cancelAt)
Expect(genErr).To(MatchError(ContainSubstring("cancelled")))
GinkgoWriter.Printf("dllm cancel: PredictStreamRich returned %v after Cancel\n", latency)
})
})

View File

@@ -670,6 +670,7 @@ This backend is **experimental**, and the engine does not yet have a prompt-KV p
- [📖 Text generation (GPT)]({{%relref "features/text-generation" %}})
- [🔥 OpenAI functions]({{%relref "features/openai-functions" %}}) - tool calls are parsed natively by the backend (gemma4 `<|tool_call>` markers), not by LocalAI's grammar/regex fallback
- Reasoning - opt-in thinking streams as `reasoning_content` (see below)
- Request cancellation - disconnecting the client (or a request timeout) aborts the in-flight generation server-side, so an abandoned slow run does not keep the GPU busy
#### Supported platforms

158
pkg/grpc/cancel_test.go Normal file
View File

@@ -0,0 +1,158 @@
package grpc
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
"github.com/mudler/LocalAI/pkg/grpc/base"
pb "github.com/mudler/LocalAI/pkg/grpc/proto"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var errGenCancelled = errors.New("generation cancelled")
// cancellableBackend implements AIModel + AIModelRich + Cancellable. Its
// rich predict paths optionally block until Cancel fires (blockUntilCancel),
// which lets the specs prove the server's context.AfterFunc plumbing: a
// cancelled request context must reach Cancel and unblock the generation.
type cancellableBackend struct {
base.SingleThread
blockUntilCancel bool
started chan struct{} // closed when a predict call is in flight
startOnce sync.Once
cancelled chan struct{} // closed by Cancel
cancelOnce sync.Once
cancelCalls atomic.Int32
}
func newCancellableBackend(blockUntilCancel bool) *cancellableBackend {
return &cancellableBackend{
blockUntilCancel: blockUntilCancel,
started: make(chan struct{}),
cancelled: make(chan struct{}),
}
}
func (c *cancellableBackend) Cancel() {
c.cancelCalls.Add(1)
c.cancelOnce.Do(func() { close(c.cancelled) })
}
func (c *cancellableBackend) run() error {
c.startOnce.Do(func() { close(c.started) })
if !c.blockUntilCancel {
return nil
}
select {
case <-c.cancelled:
return errGenCancelled
case <-time.After(30 * time.Second):
// Backstop so a regression (Cancel never wired) fails the spec
// instead of hanging the suite.
return errors.New("cancellableBackend: Cancel never fired")
}
}
func (c *cancellableBackend) PredictRich(*pb.PredictOptions) (*pb.Reply, error) {
if err := c.run(); err != nil {
return nil, err
}
return &pb.Reply{Message: []byte("done")}, nil
}
func (c *cancellableBackend) PredictStreamRich(_ *pb.PredictOptions, out chan<- *pb.Reply) error {
out <- &pb.Reply{Message: []byte("first")}
return c.run()
}
func (c *cancellableBackend) Predict(*pb.PredictOptions) (string, error) {
return "", errors.New("cancellableBackend: legacy Predict should not have been called")
}
func (c *cancellableBackend) PredictStream(*pb.PredictOptions, chan string) error {
return errors.New("cancellableBackend: legacy PredictStream should not have been called")
}
var _ AIModelRich = (*cancellableBackend)(nil)
var _ Cancellable = (*cancellableBackend)(nil)
var _ = Describe("Cancellable capability", func() {
It("PredictStream: cancelling the request context fires Cancel and ends the stream with the backend's error", func() {
backend := newCancellableBackend(true)
addr := "test://cancel-stream"
Provide(addr, backend)
c := NewClient(addr, true, nil, false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 1)
go func() {
defer GinkgoRecover()
errCh <- c.PredictStream(ctx, &pb.PredictOptions{}, func(*pb.Reply) {})
}()
// Only cancel once the generation is provably in flight; cancelling
// earlier would race the AfterFunc registration in the server.
Eventually(backend.started, "5s").Should(BeClosed())
cancel()
var err error
Eventually(errCh, "5s").Should(Receive(&err))
Expect(err).To(MatchError(errGenCancelled))
Expect(backend.cancelCalls.Load()).To(BeNumerically(">=", 1))
})
It("Predict: cancelling the request context fires Cancel and unblocks the call", func() {
backend := newCancellableBackend(true)
addr := "test://cancel-predict"
Provide(addr, backend)
c := NewClient(addr, true, nil, false)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errCh := make(chan error, 1)
go func() {
defer GinkgoRecover()
_, err := c.Predict(ctx, &pb.PredictOptions{})
errCh <- err
}()
Eventually(backend.started, "5s").Should(BeClosed())
cancel()
var err error
Eventually(errCh, "5s").Should(Receive(&err))
Expect(err).To(MatchError(errGenCancelled))
Expect(backend.cancelCalls.Load()).To(BeNumerically(">=", 1))
})
It("does not call Cancel when the request completes normally", func() {
backend := newCancellableBackend(false)
addr := "test://cancel-clean"
Provide(addr, backend)
c := NewClient(addr, true, nil, false)
ctx, cancel := context.WithCancel(context.Background())
var replies []*pb.Reply
err := c.PredictStream(ctx, &pb.PredictOptions{}, func(r *pb.Reply) {
replies = append(replies, r)
})
Expect(err).ToNot(HaveOccurred())
Expect(replies).To(HaveLen(1))
// Cancelling AFTER completion must not reach the backend: the
// deferred AfterFunc stop de-registered the hook, so a shared or
// reused context cannot abort someone else's later generation.
cancel()
Consistently(backend.cancelCalls.Load, "200ms").Should(BeZero())
})
})

View File

@@ -72,6 +72,19 @@ func newReply(s string) *pb.Reply {
return &pb.Reply{Message: []byte(s)}
}
// Cancellable is an optional capability: backends that can abort an
// in-flight generation implement it. The server calls Cancel when the
// request's gRPC context is cancelled (client disconnect/timeout),
// giving Go backends the same semantics the llama.cpp C++ backend gets
// from polling context->IsCancelled() in its result loops.
//
// Cancel may be invoked from an arbitrary goroutine while the
// generation is running, so implementations must make it safe to call
// concurrently with Predict/PredictStream (and their rich variants).
type Cancellable interface {
Cancel()
}
// AIModelRich is an optional extension to AIModel for backends that
// can produce a full *pb.Reply — including tool-call deltas and
// usage tokens — rather than just a content string. The gRPC server

View File

@@ -63,11 +63,32 @@ func (s *server) LoadModel(ctx context.Context, in *pb.ModelOptions) (*pb.Result
return &pb.Result{Message: "Loading succeeded", Success: true}, nil
}
// cancelOnDone arms the optional Cancellable capability: when ctx is
// cancelled (client disconnect/timeout) the backend's Cancel fires so it
// can abort the in-flight generation - the Go-backend equivalent of the
// llama.cpp C++ server polling context->IsCancelled() in its result loops.
// Callers MUST defer the returned stop so a normally-completed request
// de-registers the hook before returning; otherwise a later cancellation
// of the same ctx would abort an unrelated in-flight generation.
//
// Arm it AFTER the Locking() block: for serialized backends a request
// queued on the lock is not generating yet, and cancelling it must not
// abort whichever request currently owns the backend.
func (s *server) cancelOnDone(ctx context.Context) (stop func() bool) {
if c, ok := s.llm.(Cancellable); ok {
return context.AfterFunc(ctx, c.Cancel)
}
return func() bool { return false }
}
func (s *server) Predict(ctx context.Context, in *pb.PredictOptions) (*pb.Reply, error) {
if s.llm.Locking() {
s.llm.Lock()
defer s.llm.Unlock()
}
// One registration covers both the rich and the legacy branch below.
stop := s.cancelOnDone(ctx)
defer stop()
if rich, ok := s.llm.(AIModelRich); ok {
return rich.PredictRich(in)
}
@@ -275,6 +296,10 @@ func (s *server) PredictStream(in *pb.PredictOptions, stream pb.Backend_PredictS
defer s.llm.Unlock()
}
// One registration covers both the rich and the legacy branch below.
stop := s.cancelOnDone(stream.Context())
defer stop()
if rich, ok := s.llm.(AIModelRich); ok {
replyChan := make(chan *pb.Reply)
done := make(chan bool)

View File

@@ -61,8 +61,9 @@ import (
// DeferCleanup the moment each resource exists, so a failure anywhere in
// setup (port-wait timeout, dial error, LoadModel failure) still reaps the
// spawned server - critical for the real-model spec, where a failed load
// would otherwise leak a ~50GB process.
func startDllmBackend(modelFile string, gpuLayers int32) pb.BackendClient {
// would otherwise leak a ~50GB process. options are extra ModelOptions
// "key:value" entries (eb_* sampler knobs etc.).
func startDllmBackend(modelFile string, gpuLayers int32, options ...string) pb.BackendClient {
GinkgoHelper()
binary := os.Getenv("BACKEND_BINARY")
@@ -116,6 +117,7 @@ func startDllmBackend(modelFile string, gpuLayers int32) pb.BackendClient {
ContextSize: envInt32("BACKEND_TEST_CTX_SIZE", 512),
Threads: envInt32("BACKEND_TEST_THREADS", 4),
NGPULayers: gpuLayers,
Options: options,
})
Expect(err).NotTo(HaveOccurred())
Expect(res.GetSuccess()).To(BeTrue(), "dllm LoadModel failed: %s", res.GetMessage())
@@ -201,6 +203,83 @@ var _ = Describe("dllm templated chat-completion (tiny model)", Ordered, func()
})
})
var _ = Describe("dllm request cancellation (tiny model)", Ordered, func() {
var client pb.BackendClient
BeforeAll(func() {
if os.Getenv("BACKEND_TEST_DLLM") != "1" {
Skip("dllm cancellation spec is opt-in; set BACKEND_TEST_DLLM=1 (plus BACKEND_BINARY and BACKEND_TEST_MODEL_FILE) to run it")
}
modelFile := os.Getenv("BACKEND_TEST_MODEL_FILE")
Expect(modelFile).NotTo(BeEmpty(),
"dllm cancellation spec requires BACKEND_TEST_MODEL_FILE (dllm.cpp's tests/fixtures/tiny_with_vocab.gguf)")
// eb_max_steps inflates the per-block denoise loop: a 256-token run
// takes ~10s on the tiny fixture (vs ~40ms at engine defaults), so a
// cancelled request is clearly distinguishable from one that simply
// finished. A dedicated backend process keeps the chat specs fast.
client = startDllmBackend(modelFile, 0, "eb_max_steps:256")
})
// This is the end-to-end proof of the Cancellable plumbing
// (pkg/grpc/server.go arming backend.Cancel via context.AfterFunc on
// the stream context): a client disconnect mid-stream must abort the
// server-side generation, not just orphan it.
It("aborts the in-flight generation when the client context is cancelled mid-stream", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Raw-prompt mode, not the templated chat request: the templated
// render can hit an end-of-turn token after the first block and
// finish before the cancel lands, which would silently turn this
// into a no-op spec. The raw "hello" run is probed deterministic
// with seed 7: 16 blocks, the eb_max_steps cap hit on every one,
// ~10s total if left to finish.
req := &pb.PredictOptions{Prompt: "hello", Tokens: 256, Seed: 7}
stream, err := client.PredictStream(ctx, req)
Expect(err).NotTo(HaveOccurred())
// First chunk received = the generate is provably in flight (the C
// side resets the cancel flag on generate entry, so cancelling
// before it starts would be swallowed).
_, err = stream.Recv()
Expect(err).NotTo(HaveOccurred())
cancel()
// Client side: the stream must end promptly, not after the
// remaining ~9s of generation (the first chunk arrives after one
// ~0.7s block, so plenty of generation is provably outstanding).
recvDone := make(chan error, 1)
go func() {
defer GinkgoRecover()
for {
if _, rerr := stream.Recv(); rerr != nil {
recvDone <- rerr
return
}
}
}()
var rerr error
Eventually(recvDone, "5s").Should(Receive(&rerr))
Expect(rerr).NotTo(Equal(io.EOF), "stream completed normally despite the cancelled context")
// Server side: prove the generation actually aborted. dllm
// serializes every C call through one worker goroutine, so if the
// orphaned generation were still grinding, this follow-up would
// queue behind its remaining ~9s instead of completing in ~1s
// (16 tokens = one block at eb_max_steps:256).
followCtx, followCancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer followCancel()
start := time.Now()
res, err := client.Predict(followCtx, dllmChatRequest())
elapsed := time.Since(start)
Expect(err).NotTo(HaveOccurred())
Expect(string(res.GetMessage())).NotTo(BeEmpty())
Expect(elapsed).To(BeNumerically("<", 5*time.Second),
"follow-up request queued behind the cancelled generation - server-side Cancel did not reach the backend")
GinkgoWriter.Printf("dllm cancel e2e: follow-up completed in %v after mid-stream cancellation\n", elapsed)
})
})
var _ = Describe("dllm templated chat-completion (real model)", Ordered, func() {
var client pb.BackendClient