Files
LocalAI/core/backend/transcript.go
LocalAI [bot] 2be07f61da feat(whisper): honor client cancellation via ggml abort_callback (#9710)
* refactor(transcription): propagate request ctx through ModelTranscription*

Replaces context.Background() with the HTTP request ctx so client
disconnects start cancelling the gRPC call. No backend-side abort wiring
yet — that comes in a later commit. Pure plumbing.

Assisted-by: Claude:claude-haiku-4-5
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(cli): pass ctx to backend.ModelTranscription

Follow-up to e65d3e1f which threaded ctx through ModelTranscription
but missed the CLI caller. CLI commands have no request-scoped ctx,
so context.Background() is correct here.

Assisted-by: Claude:claude-haiku-4-5
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(audio): propagate request ctx into TTS, sound-gen, audio-transform

Same ctx-plumbing pattern applied to the rest of the audio path. CLI
callers use context.Background() since there is no request scope; HTTP
callers use c.Request().Context().

Assisted-by: Claude:claude-haiku-4-5
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(backend): propagate request ctx into biometric, detection, rerank, diarization paths

Replaces remaining context.Background() sites in core/backend with the
caller's ctx. After this commit, every core/backend/*.go entry point
threads the request ctx end-to-end to the gRPC client.

Assisted-by: Claude:claude-haiku-4-5
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* refactor(grpc): plumb ctx through AIModel.AudioTranscription{,Stream}

Adds context.Context as first parameter to the AIModel interface methods
that wrap whisper-style transcription. Server-side gRPC handler now
forwards the per-RPC ctx (server-streaming uses stream.Context()).
Whisper, Voxtral, vibevoice-cpp, and sherpa-onnx accept the parameter;
none uses it yet — the actual cancellation primitive lands in the next
commit so this is pure plumbing.

Assisted-by: Claude:claude-sonnet-4-6
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(whisper): add abort_callback hook in the C++ bridge

Installs a std::atomic<int> flag, wires it into
whisper_full_params.abort_callback, and exposes a set_abort(int) C
symbol so Go can flip the flag from a goroutine watching the request
context. transcribe() now distinguishes abort (return 2) from real
whisper_full failure (return 1).

Assisted-by: Claude:claude-haiku-4-5
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(whisper): register set_abort symbol in the purego loader

Adds the Go-side binding for the new C export so the next commit can
call CppSetAbort(1) from a watcher goroutine on ctx.Done().

Assisted-by: Claude:claude-haiku-4-5
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* feat(whisper): honor ctx cancellation and return codes.Canceled

A watcher goroutine watches ctx.Done() during AudioTranscription and
calls CppSetAbort(1) on cancel. whisper_full sees abort_callback return
true at the next compute graph step, returns non-zero, and the bridge
returns 2 -> AudioTranscription maps that to codes.Canceled.

Adds an opt-in test (gated on WHISPER_MODEL_PATH / WHISPER_AUDIO_PATH)
that asserts cancellation latency under 5s and proves the abort flag
resets cleanly so the next transcription succeeds.

Assisted-by: Claude:claude-sonnet-4-6
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(whisper): join the cancel watcher goroutine before returning

Follow-up to 85edf9d2. The previous commit used `defer close(done)` and
called the watcher "joined synchronously" — but close() only signals,
it does not block until the goroutine exits. That left a window where
a late CppSetAbort(1) from a cancelled call could land on the next
call, after its C-side g_abort reset but before whisper_full() began
polling the abort callback, corrupting the second transcription.

Switch to a sync.WaitGroup join so wg.Wait() blocks until the watcher
has actually returned from its select.

Assisted-by: Claude:claude-sonnet-4-6
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(whisper): short-circuit pre-cancelled ctx in AudioTranscription

If ctx is already Done() at entry, return codes.Canceled immediately
instead of running the full transcription. The C-side g_abort reset
happens at the start of transcribe() and would otherwise overwrite a
watcher-set abort flag from an already-cancelled ctx, producing a
spurious successful transcription on a request the client has already
abandoned.

Assisted-by: Claude:claude-haiku-4-5
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* fix(tests/distributed): update testLLM mock for new AudioTranscription signature

Phase B (93c48e19) added context.Context to AIModel.AudioTranscription
but missed the testLLM mock in tests/e2e/distributed. CI golangci-lint
caught it: *testLLM did not implement grpc.AIModel because the method
signature lacked the ctx parameter, which broke the distributed test
suite compilation and cascaded through every backend-build job that
runs `go build ./...`.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

* test(whisper): port cancellation test to Ginkgo/Gomega

Project policy (.agents/coding-style.md, enforced by golangci-lint
forbidigo) is that all Go tests must use Ginkgo v2 + Gomega — no
stdlib testing patterns (t.Skip, t.Fatalf, etc.). Convert the
cancellation test to a Describe/It block with Skip(...) for env
gating and Expect/HaveOccurred for assertions.

Same coverage: cancel mid-flight returns codes.Canceled within 5s and
a follow-up transcription succeeds, proving the C-side g_abort flag
resets cleanly.

Assisted-by: Claude:claude-opus-4-7
Signed-off-by: Ettore Di Giacinto <mudler@localai.io>

---------

Signed-off-by: Ettore Di Giacinto <mudler@localai.io>
Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
2026-05-08 01:44:47 +02:00

211 lines
6.5 KiB
Go

package backend
import (
"context"
"fmt"
"maps"
"time"
"github.com/mudler/LocalAI/core/config"
"github.com/mudler/LocalAI/core/schema"
"github.com/mudler/LocalAI/core/trace"
grpcPkg "github.com/mudler/LocalAI/pkg/grpc"
"github.com/mudler/LocalAI/pkg/grpc/proto"
"github.com/mudler/LocalAI/pkg/model"
)
// TranscriptionRequest groups the parameters accepted by ModelTranscription.
// Use this so callers don't have to pass long positional arg lists when they
// only care about a subset of fields.
type TranscriptionRequest struct {
Audio string
Language string
Translate bool
Diarize bool
Prompt string
Temperature float32
TimestampGranularities []string
}
func (r *TranscriptionRequest) toProto(threads uint32) *proto.TranscriptRequest {
return &proto.TranscriptRequest{
Dst: r.Audio,
Language: r.Language,
Translate: r.Translate,
Diarize: r.Diarize,
Threads: threads,
Prompt: r.Prompt,
Temperature: r.Temperature,
TimestampGranularities: r.TimestampGranularities,
}
}
func loadTranscriptionModel(ml *model.ModelLoader, modelConfig config.ModelConfig, appConfig *config.ApplicationConfig) (grpcPkg.Backend, error) {
if modelConfig.Backend == "" {
modelConfig.Backend = model.WhisperBackend
}
opts := ModelOptions(modelConfig, appConfig)
transcriptionModel, err := ml.Load(opts...)
if err != nil {
recordModelLoadFailure(appConfig, modelConfig.Name, modelConfig.Backend, err, nil)
return nil, err
}
if transcriptionModel == nil {
return nil, fmt.Errorf("could not load transcription model")
}
return transcriptionModel, nil
}
func ModelTranscription(ctx context.Context, audio, language string, translate, diarize bool, prompt string, ml *model.ModelLoader, modelConfig config.ModelConfig, appConfig *config.ApplicationConfig) (*schema.TranscriptionResult, error) {
return ModelTranscriptionWithOptions(ctx, TranscriptionRequest{
Audio: audio,
Language: language,
Translate: translate,
Diarize: diarize,
Prompt: prompt,
}, ml, modelConfig, appConfig)
}
func ModelTranscriptionWithOptions(ctx context.Context, req TranscriptionRequest, ml *model.ModelLoader, modelConfig config.ModelConfig, appConfig *config.ApplicationConfig) (*schema.TranscriptionResult, error) {
transcriptionModel, err := loadTranscriptionModel(ml, modelConfig, appConfig)
if err != nil {
return nil, err
}
var startTime time.Time
var audioSnippet map[string]any
if appConfig.EnableTracing {
trace.InitBackendTracingIfEnabled(appConfig.TracingMaxItems)
startTime = time.Now()
// Capture audio before the backend call — the backend may delete the file.
audioSnippet = trace.AudioSnippet(req.Audio)
}
r, err := transcriptionModel.AudioTranscription(ctx, req.toProto(uint32(*modelConfig.Threads)))
if err != nil {
if appConfig.EnableTracing {
errData := map[string]any{
"audio_file": req.Audio,
"language": req.Language,
"translate": req.Translate,
"diarize": req.Diarize,
"prompt": req.Prompt,
}
if audioSnippet != nil {
maps.Copy(errData, audioSnippet)
}
trace.RecordBackendTrace(trace.BackendTrace{
Timestamp: startTime,
Duration: time.Since(startTime),
Type: trace.BackendTraceTranscription,
ModelName: modelConfig.Name,
Backend: modelConfig.Backend,
Summary: trace.TruncateString(req.Audio, 200),
Error: err.Error(),
Data: errData,
})
}
return nil, err
}
tr := transcriptResultFromProto(r)
if appConfig.EnableTracing {
data := map[string]any{
"audio_file": req.Audio,
"language": req.Language,
"translate": req.Translate,
"diarize": req.Diarize,
"prompt": req.Prompt,
"result_text": tr.Text,
"segments_count": len(tr.Segments),
}
if audioSnippet != nil {
maps.Copy(data, audioSnippet)
}
trace.RecordBackendTrace(trace.BackendTrace{
Timestamp: startTime,
Duration: time.Since(startTime),
Type: trace.BackendTraceTranscription,
ModelName: modelConfig.Name,
Backend: modelConfig.Backend,
Summary: trace.TruncateString(req.Audio+" -> "+tr.Text, 200),
Data: data,
})
}
return tr, err
}
// TranscriptionStreamChunk is a streaming event emitted by
// ModelTranscriptionStream. Either Delta carries an incremental text fragment,
// or Final carries the completed transcription as the very last event.
type TranscriptionStreamChunk struct {
Delta string
Final *schema.TranscriptionResult
}
// ModelTranscriptionStream runs the gRPC streaming transcription RPC and
// invokes onChunk for each event the backend produces. Backends that don't
// support real streaming should still emit one terminal event with Final set,
// which the HTTP layer turns into a single delta + done SSE pair.
func ModelTranscriptionStream(ctx context.Context, req TranscriptionRequest, ml *model.ModelLoader, modelConfig config.ModelConfig, appConfig *config.ApplicationConfig, onChunk func(TranscriptionStreamChunk)) error {
transcriptionModel, err := loadTranscriptionModel(ml, modelConfig, appConfig)
if err != nil {
return err
}
pbReq := req.toProto(uint32(*modelConfig.Threads))
pbReq.Stream = true
return transcriptionModel.AudioTranscriptionStream(ctx, pbReq, func(chunk *proto.TranscriptStreamResponse) {
if chunk == nil {
return
}
out := TranscriptionStreamChunk{Delta: chunk.Delta}
if chunk.FinalResult != nil {
out.Final = transcriptResultFromProto(chunk.FinalResult)
}
onChunk(out)
})
}
func transcriptResultFromProto(r *proto.TranscriptResult) *schema.TranscriptionResult {
if r == nil {
return &schema.TranscriptionResult{}
}
tr := &schema.TranscriptionResult{
Text: r.Text,
Language: r.Language,
Duration: float64(r.Duration),
}
for _, s := range r.Segments {
var tks []int
for _, t := range s.Tokens {
tks = append(tks, int(t))
}
var words []schema.TranscriptionWord
for _, w := range s.Words {
var word = schema.TranscriptionWord{
Start: time.Duration(w.Start),
End: time.Duration(w.End),
Text: w.Text,
}
words = append(words, word)
tr.Words = append(tr.Words, word)
}
tr.Segments = append(tr.Segments,
schema.TranscriptionSegment{
Text: s.Text,
Id: int(s.Id),
Start: time.Duration(s.Start),
End: time.Duration(s.End),
Tokens: tks,
Speaker: s.Speaker,
Words: words,
})
}
return tr
}