mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-12 02:38:19 -04:00
Compare commits
21 Commits
dependabot
...
v4.4.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f618636c71 | ||
|
|
892fc49949 | ||
|
|
228a6dfe79 | ||
|
|
51a92b6093 | ||
|
|
b5964d385d | ||
|
|
fba8c9c498 | ||
|
|
6b2badb837 | ||
|
|
8b8506d01a | ||
|
|
6910a0bb48 | ||
|
|
cffd03b522 | ||
|
|
bf448d3794 | ||
|
|
1d4a12f7c0 | ||
|
|
186d62801d | ||
|
|
da4ed05429 | ||
|
|
ec1eea4f45 | ||
|
|
b203b32e57 | ||
|
|
48a8ce98aa | ||
|
|
8344d1c865 | ||
|
|
d2e6b93369 | ||
|
|
e1ec03d33f | ||
|
|
9323f4b5ca |
2
Makefile
2
Makefile
@@ -180,7 +180,7 @@ osx-signed: build
|
||||
|
||||
## Run
|
||||
run: ## run local-ai
|
||||
CGO_LDFLAGS="$(CGO_LDFLAGS)" $(GOCMD) run ./
|
||||
CGO_LDFLAGS="$(CGO_LDFLAGS)" $(GOCMD) run ./cmd/local-ai
|
||||
|
||||
prepare-test: protogen-go build-mock-backend
|
||||
|
||||
|
||||
10
README.md
10
README.md
@@ -149,6 +149,16 @@ local-ai run https://gist.githubusercontent.com/.../phi-2.yaml
|
||||
local-ai run oci://localai/phi-2:latest
|
||||
```
|
||||
|
||||
To test a running LocalAI server from the terminal, open an interactive chat session from another shell. Inside the prompt, `/models` lists installed models and `/model <name>` switches between them.
|
||||
|
||||
```bash
|
||||
# Terminal 1
|
||||
local-ai run llama-3.2-1b-instruct:q4_k_m
|
||||
|
||||
# Terminal 2
|
||||
local-ai chat --model llama-3.2-1b-instruct:q4_k_m
|
||||
```
|
||||
|
||||
> **Automatic Backend Detection**: LocalAI automatically detects your GPU capabilities and downloads the appropriate backend. For advanced options, see [GPU Acceleration](https://localai.io/features/gpu-acceleration/).
|
||||
|
||||
For more details, see the [Getting Started guide](https://localai.io/basics/getting_started/).
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
# ds4 backend Makefile.
|
||||
#
|
||||
# Upstream pin lives below as DS4_VERSION?=c463029c205c2ec8d7ab6c0df4a3f52979091286
|
||||
# Upstream pin lives below as DS4_VERSION?=8384adf0f9fa0f3bb342dd925372de778b95b263
|
||||
# (.github/bump_deps.sh) can find and update it - matches the
|
||||
# llama-cpp / ik-llama-cpp / turboquant convention.
|
||||
|
||||
DS4_VERSION?=c463029c205c2ec8d7ab6c0df4a3f52979091286
|
||||
DS4_VERSION?=8384adf0f9fa0f3bb342dd925372de778b95b263
|
||||
DS4_REPO?=https://github.com/antirez/ds4
|
||||
|
||||
CURRENT_MAKEFILE_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
|
||||
IK_LLAMA_VERSION?=6b9de3dbaa21ae95ea80638e5ee836795cc48c93
|
||||
IK_LLAMA_VERSION?=e6f8112f3ba126eed3ff5b30cdd08085414a7516
|
||||
LLAMA_REPO?=https://github.com/ikawrakow/ik_llama.cpp
|
||||
|
||||
CMAKE_ARGS?=
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
|
||||
LLAMA_VERSION?=9e3b928fd8c9d14dbf15a8768b9fdd7e5c721d66
|
||||
LLAMA_VERSION?=039e20a2db9e87b2477c76cc04905f3e1acad77f
|
||||
LLAMA_REPO?=https://github.com/ggerganov/llama.cpp
|
||||
|
||||
CMAKE_ARGS?=
|
||||
|
||||
@@ -381,6 +381,15 @@ json parse_options(bool streaming, const backend::PredictOptions* predict, const
|
||||
});
|
||||
}
|
||||
|
||||
// for each video in the request, add the video data
|
||||
for (int i = 0; i < predict->videos_size(); i++) {
|
||||
data["video_data"].push_back(json
|
||||
{
|
||||
{"id", i},
|
||||
{"data", predict->videos(i)},
|
||||
});
|
||||
}
|
||||
|
||||
data["stop"] = predict->stopprompts();
|
||||
// data["n_probs"] = predict->nprobs();
|
||||
//TODO: images,
|
||||
@@ -1503,7 +1512,7 @@ public:
|
||||
msg_json["role"] = msg.role();
|
||||
|
||||
bool is_last_user_msg = (i == last_user_msg_idx);
|
||||
bool has_images_or_audio = (request->images_size() > 0 || request->audios_size() > 0);
|
||||
bool has_images_or_audio = (request->images_size() > 0 || request->audios_size() > 0 || request->videos_size() > 0);
|
||||
|
||||
// Handle content - can be string, null, or array
|
||||
// For multimodal content, we'll embed images/audio from separate fields
|
||||
@@ -1554,6 +1563,16 @@ public:
|
||||
content_array.push_back(audio_chunk);
|
||||
}
|
||||
}
|
||||
if (request->videos_size() > 0) {
|
||||
for (int j = 0; j < request->videos_size(); j++) {
|
||||
json video_chunk;
|
||||
video_chunk["type"] = "input_video";
|
||||
json input_video;
|
||||
input_video["data"] = request->videos(j);
|
||||
video_chunk["input_video"] = input_video;
|
||||
content_array.push_back(video_chunk);
|
||||
}
|
||||
}
|
||||
msg_json["content"] = content_array;
|
||||
} else {
|
||||
// Use content as-is (already array or not last user message)
|
||||
@@ -1588,6 +1607,16 @@ public:
|
||||
content_array.push_back(audio_chunk);
|
||||
}
|
||||
}
|
||||
if (request->videos_size() > 0) {
|
||||
for (int j = 0; j < request->videos_size(); j++) {
|
||||
json video_chunk;
|
||||
video_chunk["type"] = "input_video";
|
||||
json input_video;
|
||||
input_video["data"] = request->videos(j);
|
||||
video_chunk["input_video"] = input_video;
|
||||
content_array.push_back(video_chunk);
|
||||
}
|
||||
}
|
||||
msg_json["content"] = content_array;
|
||||
} else if (msg.role() == "tool") {
|
||||
// Tool role messages must have content field set, even if empty
|
||||
@@ -2039,6 +2068,16 @@ public:
|
||||
files.push_back(decoded_data);
|
||||
}
|
||||
}
|
||||
|
||||
const auto &video_data = data.find("video_data");
|
||||
if (video_data != data.end() && video_data->is_array())
|
||||
{
|
||||
for (const auto &video : *video_data)
|
||||
{
|
||||
auto decoded_data = base64_decode(video["data"].get<std::string>());
|
||||
files.push_back(decoded_data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const bool has_mtmd = ctx_server.impl->mctx != nullptr;
|
||||
@@ -2291,7 +2330,7 @@ public:
|
||||
}
|
||||
|
||||
bool is_last_user_msg = (i == last_user_msg_idx);
|
||||
bool has_images_or_audio = (request->images_size() > 0 || request->audios_size() > 0);
|
||||
bool has_images_or_audio = (request->images_size() > 0 || request->audios_size() > 0 || request->videos_size() > 0);
|
||||
|
||||
// Handle content - can be string, null, or array
|
||||
// For multimodal content, we'll embed images/audio from separate fields
|
||||
@@ -2344,6 +2383,16 @@ public:
|
||||
content_array.push_back(audio_chunk);
|
||||
}
|
||||
}
|
||||
if (request->videos_size() > 0) {
|
||||
for (int j = 0; j < request->videos_size(); j++) {
|
||||
json video_chunk;
|
||||
video_chunk["type"] = "input_video";
|
||||
json input_video;
|
||||
input_video["data"] = request->videos(j);
|
||||
video_chunk["input_video"] = input_video;
|
||||
content_array.push_back(video_chunk);
|
||||
}
|
||||
}
|
||||
msg_json["content"] = content_array;
|
||||
} else {
|
||||
// Use content as-is (already array or not last user message)
|
||||
@@ -2383,6 +2432,16 @@ public:
|
||||
content_array.push_back(audio_chunk);
|
||||
}
|
||||
}
|
||||
if (request->videos_size() > 0) {
|
||||
for (int j = 0; j < request->videos_size(); j++) {
|
||||
json video_chunk;
|
||||
video_chunk["type"] = "input_video";
|
||||
json input_video;
|
||||
input_video["data"] = request->videos(j);
|
||||
video_chunk["input_video"] = input_video;
|
||||
content_array.push_back(video_chunk);
|
||||
}
|
||||
}
|
||||
msg_json["content"] = content_array;
|
||||
SRV_INF("[CONTENT DEBUG] Predict: Message %d created content array with media\n", i);
|
||||
} else if (!msg.tool_calls().empty()) {
|
||||
@@ -2845,6 +2904,16 @@ public:
|
||||
files.push_back(decoded_data);
|
||||
}
|
||||
}
|
||||
|
||||
const auto &video_data = data.find("video_data");
|
||||
if (video_data != data.end() && video_data->is_array())
|
||||
{
|
||||
for (const auto &video : *video_data)
|
||||
{
|
||||
auto decoded_data = base64_decode(video["data"].get<std::string>());
|
||||
files.push_back(decoded_data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// process files
|
||||
|
||||
@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
|
||||
|
||||
# CrispASR version (release tag)
|
||||
CRISPASR_REPO?=https://github.com/CrispStrobe/CrispASR
|
||||
CRISPASR_VERSION?=f7838a306687f22c281d29c250f879a4ab3df2d7
|
||||
CRISPASR_VERSION?=c29f6653a516a3001d923944dad8892072cc7334
|
||||
SO_TARGET?=libgocrispasr.so
|
||||
|
||||
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF
|
||||
|
||||
@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
|
||||
|
||||
# stablediffusion.cpp (ggml)
|
||||
STABLEDIFFUSION_GGML_REPO?=https://github.com/leejet/stable-diffusion.cpp
|
||||
STABLEDIFFUSION_GGML_VERSION?=b3d56d0ba1bd437886079e339118e8e75bb79ee7
|
||||
STABLEDIFFUSION_GGML_VERSION?=19bdfe22d255d5b4dff39d449318b9bc5ea2317f
|
||||
|
||||
CMAKE_ARGS+=-DGGML_MAX_NAME=128
|
||||
|
||||
|
||||
@@ -8,7 +8,7 @@ JOBS?=$(shell nproc --ignore=1)
|
||||
|
||||
# whisper.cpp version
|
||||
WHISPER_REPO?=https://github.com/ggml-org/whisper.cpp
|
||||
WHISPER_CPP_VERSION?=a8ec021f2750a473ff4a8f3883bc9fdf5feafa84
|
||||
WHISPER_CPP_VERSION?=df7638d8229a243af8a4b5a8ae557e0d74e0a0ae
|
||||
SO_TARGET?=libgowhisper.so
|
||||
|
||||
CMAKE_ARGS+=-DBUILD_SHARED_LIBS=OFF
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
--extra-index-url https://download.pytorch.org/whl/cpu
|
||||
transformers==5.10.2
|
||||
transformers==4.48.3
|
||||
accelerate
|
||||
torch==2.4.1
|
||||
torchaudio==2.4.1
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
torch==2.4.1
|
||||
torchaudio==2.4.1
|
||||
transformers==5.10.2
|
||||
transformers==4.48.3
|
||||
accelerate
|
||||
coqui-tts
|
||||
@@ -1,6 +1,6 @@
|
||||
--extra-index-url https://download.pytorch.org/whl/rocm7.0
|
||||
torch==2.10.0+rocm7.0
|
||||
torchaudio==2.10.0+rocm7.0
|
||||
transformers==5.10.2
|
||||
transformers==4.48.3
|
||||
accelerate
|
||||
coqui-tts
|
||||
@@ -3,6 +3,6 @@ torch==2.8.0+xpu
|
||||
torchaudio==2.8.0+xpu
|
||||
optimum[openvino]
|
||||
setuptools
|
||||
transformers==5.10.2
|
||||
transformers==4.48.3
|
||||
accelerate
|
||||
coqui-tts
|
||||
@@ -1,4 +1,4 @@
|
||||
torch==2.7.1
|
||||
transformers==5.10.2
|
||||
transformers==4.48.3
|
||||
accelerate
|
||||
coqui-tts
|
||||
|
||||
@@ -26,7 +26,10 @@ from vllm.engine.arg_utils import AsyncEngineArgs
|
||||
from vllm.engine.async_llm_engine import AsyncLLMEngine
|
||||
from vllm.sampling_params import SamplingParams
|
||||
from vllm.utils import random_uuid
|
||||
from vllm.transformers_utils.tokenizer import get_tokenizer
|
||||
try:
|
||||
from vllm.tokenizers import get_tokenizer # vLLM >= 0.22
|
||||
except ImportError:
|
||||
from vllm.transformers_utils.tokenizer import get_tokenizer # vLLM < 0.22
|
||||
from vllm.multimodal.utils import fetch_image
|
||||
from vllm.assets.video import VideoAsset
|
||||
import base64
|
||||
|
||||
30
core/cli/chat/chat.go
Normal file
30
core/cli/chat/chat.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
Model string
|
||||
BaseURL string
|
||||
APIKey string
|
||||
In io.Reader
|
||||
Out io.Writer
|
||||
}
|
||||
|
||||
func Run(ctx context.Context, opts Options) error {
|
||||
if opts.In == nil {
|
||||
opts.In = strings.NewReader("")
|
||||
}
|
||||
if opts.Out == nil {
|
||||
opts.Out = io.Discard
|
||||
}
|
||||
|
||||
session, err := newChatSession(ctx, newLocalAIChatClient(opts.BaseURL, opts.APIKey), opts.Model)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return runTerminalChat(ctx, session, opts.In, opts.Out)
|
||||
}
|
||||
13
core/cli/chat/chat_suite_test.go
Normal file
13
core/cli/chat/chat_suite_test.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package chat
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
func TestChat(t *testing.T) {
|
||||
RegisterFailHandler(Fail)
|
||||
RunSpecs(t, "Chat Suite")
|
||||
}
|
||||
172
core/cli/chat/chat_test.go
Normal file
172
core/cli/chat/chat_test.go
Normal file
@@ -0,0 +1,172 @@
|
||||
package chat
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Run chat", func() {
|
||||
It("streams a single chat response", func() {
|
||||
var capturedModel string
|
||||
var capturedAuth string
|
||||
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/v1/models" {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
writeResponse(w, `{"object":"list","data":[{"id":"test-model","object":"model"}]}`)
|
||||
return
|
||||
}
|
||||
|
||||
Expect(r.URL.Path).To(Equal("/v1/chat/completions"))
|
||||
capturedAuth = r.Header.Get("Authorization")
|
||||
|
||||
var body struct {
|
||||
Model string `json:"model"`
|
||||
Messages []struct {
|
||||
Role string `json:"role"`
|
||||
Content string `json:"content"`
|
||||
} `json:"messages"`
|
||||
}
|
||||
Expect(json.NewDecoder(r.Body).Decode(&body)).To(Succeed())
|
||||
capturedModel = body.Model
|
||||
Expect(body.Messages).To(HaveLen(1))
|
||||
Expect(body.Messages[0].Role).To(Equal("user"))
|
||||
Expect(body.Messages[0].Content).To(Equal("hello"))
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
writeResponse(w, "data: {\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"}}]}\n\n")
|
||||
writeResponse(w, "data: {\"choices\":[{\"index\":0,\"delta\":{\"content\":\"!\"}}]}\n\n")
|
||||
writeResponse(w, "data: [DONE]\n\n")
|
||||
}))
|
||||
defer server.Close()
|
||||
|
||||
var out bytes.Buffer
|
||||
err := Run(GinkgoT().Context(), Options{
|
||||
Model: "test-model",
|
||||
BaseURL: server.URL + "/v1",
|
||||
APIKey: "secret",
|
||||
In: strings.NewReader("hello\n/exit\n"),
|
||||
Out: &out,
|
||||
})
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(capturedModel).To(Equal("test-model"))
|
||||
Expect(capturedAuth).To(Equal("Bearer secret"))
|
||||
Expect(out.String()).To(ContainSubstring("assistant: hi!"))
|
||||
Expect(out.String()).To(ContainSubstring("bye"))
|
||||
})
|
||||
|
||||
It("auto-selects the only available model", func() {
|
||||
server := chatTestServer([]string{"solo"}, nil)
|
||||
defer server.Close()
|
||||
|
||||
var out bytes.Buffer
|
||||
err := Run(GinkgoT().Context(), Options{
|
||||
BaseURL: server.URL + "/v1",
|
||||
In: strings.NewReader("/exit\n"),
|
||||
Out: &out,
|
||||
})
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(out.String()).To(ContainSubstring("LocalAI chat (solo)"))
|
||||
})
|
||||
|
||||
It("returns an actionable error when no models are installed", func() {
|
||||
server := chatTestServer(nil, nil)
|
||||
defer server.Close()
|
||||
|
||||
err := Run(GinkgoT().Context(), Options{
|
||||
BaseURL: server.URL + "/v1",
|
||||
In: strings.NewReader(""),
|
||||
})
|
||||
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("no chat models are installed"))
|
||||
Expect(err.Error()).To(ContainSubstring("local-ai models install <model>"))
|
||||
})
|
||||
|
||||
It("returns an actionable error when multiple models are available without a selection", func() {
|
||||
server := chatTestServer([]string{"alpha", "beta"}, nil)
|
||||
defer server.Close()
|
||||
|
||||
err := Run(GinkgoT().Context(), Options{
|
||||
BaseURL: server.URL + "/v1",
|
||||
In: strings.NewReader(""),
|
||||
})
|
||||
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(ContainSubstring("multiple models are available"))
|
||||
Expect(err.Error()).To(ContainSubstring("--model"))
|
||||
Expect(err.Error()).To(ContainSubstring("alpha"))
|
||||
Expect(err.Error()).To(ContainSubstring("beta"))
|
||||
})
|
||||
|
||||
It("lists and switches models inside the chat", func() {
|
||||
requestedModels := []string{}
|
||||
server := chatTestServer([]string{"alpha", "beta"}, func(model string) {
|
||||
requestedModels = append(requestedModels, model)
|
||||
})
|
||||
defer server.Close()
|
||||
|
||||
var out bytes.Buffer
|
||||
err := Run(GinkgoT().Context(), Options{
|
||||
Model: "alpha",
|
||||
BaseURL: server.URL + "/v1",
|
||||
In: strings.NewReader("/models\n/model beta\nhello\n/exit\n"),
|
||||
Out: &out,
|
||||
})
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(out.String()).To(ContainSubstring("* alpha"))
|
||||
Expect(out.String()).To(ContainSubstring(" beta"))
|
||||
Expect(out.String()).To(ContainSubstring("switched to beta; conversation cleared"))
|
||||
Expect(requestedModels).To(Equal([]string{"beta"}))
|
||||
})
|
||||
})
|
||||
|
||||
func chatTestServer(models []string, onChat func(model string)) *httptest.Server {
|
||||
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.URL.Path {
|
||||
case "/v1/models":
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
writeResponse(w, `{"object":"list","data":[`)
|
||||
for i, model := range models {
|
||||
if i > 0 {
|
||||
writeResponse(w, ",")
|
||||
}
|
||||
writeResponsef(w, `{"id":%q,"object":"model"}`, model)
|
||||
}
|
||||
writeResponse(w, `]}`)
|
||||
case "/v1/chat/completions":
|
||||
var body struct {
|
||||
Model string `json:"model"`
|
||||
}
|
||||
Expect(json.NewDecoder(r.Body).Decode(&body)).To(Succeed())
|
||||
if onChat != nil {
|
||||
onChat(body.Model)
|
||||
}
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
writeResponse(w, "data: {\"choices\":[{\"index\":0,\"delta\":{\"content\":\"ok\"}}]}\n\n")
|
||||
writeResponse(w, "data: [DONE]\n\n")
|
||||
default:
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
func writeResponse(w io.Writer, text string) {
|
||||
_, err := fmt.Fprint(w, text)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}
|
||||
|
||||
func writeResponsef(w io.Writer, format string, args ...any) {
|
||||
_, err := fmt.Fprintf(w, format, args...)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}
|
||||
114
core/cli/chat/client.go
Normal file
114
core/cli/chat/client.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
openai "github.com/sashabaranov/go-openai"
|
||||
)
|
||||
|
||||
type chatClient interface {
|
||||
ListModels(ctx context.Context) ([]string, error)
|
||||
StreamChat(ctx context.Context, model string, messages []chatMessage, out io.Writer) (string, error)
|
||||
}
|
||||
|
||||
type localAIChatClient struct {
|
||||
client *openai.Client
|
||||
}
|
||||
|
||||
func newLocalAIChatClient(baseURL string, apiKey string) *localAIChatClient {
|
||||
cfg := openai.DefaultConfig(apiKey)
|
||||
cfg.BaseURL = baseURL
|
||||
return &localAIChatClient{client: openai.NewClientWithConfig(cfg)}
|
||||
}
|
||||
|
||||
func (c *localAIChatClient) ListModels(ctx context.Context) ([]string, error) {
|
||||
resp, err := c.client.ListModels(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
models := make([]string, 0, len(resp.Models))
|
||||
for _, model := range resp.Models {
|
||||
if model.ID != "" {
|
||||
models = append(models, model.ID)
|
||||
}
|
||||
}
|
||||
sort.Strings(models)
|
||||
return models, nil
|
||||
}
|
||||
|
||||
func (c *localAIChatClient) StreamChat(ctx context.Context, model string, messages []chatMessage, out io.Writer) (string, error) {
|
||||
stream, err := c.client.CreateChatCompletionStream(ctx, openai.ChatCompletionRequest{
|
||||
Model: model,
|
||||
Messages: openAIChatMessages(messages),
|
||||
})
|
||||
if err != nil {
|
||||
return "", friendlyChatError(err, model)
|
||||
}
|
||||
defer func() {
|
||||
_ = stream.Close()
|
||||
}()
|
||||
|
||||
var answer strings.Builder
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if errors.Is(err, io.EOF) {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return answer.String(), friendlyChatError(err, model)
|
||||
}
|
||||
if len(resp.Choices) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
token := resp.Choices[0].Delta.Content
|
||||
if token == "" {
|
||||
continue
|
||||
}
|
||||
answer.WriteString(token)
|
||||
if _, err := fmt.Fprint(out, token); err != nil {
|
||||
return answer.String(), err
|
||||
}
|
||||
}
|
||||
|
||||
return answer.String(), nil
|
||||
}
|
||||
|
||||
func openAIChatMessages(messages []chatMessage) []openai.ChatCompletionMessage {
|
||||
converted := make([]openai.ChatCompletionMessage, len(messages))
|
||||
for i, message := range messages {
|
||||
converted[i] = openai.ChatCompletionMessage{
|
||||
Role: message.Role,
|
||||
Content: message.Content,
|
||||
}
|
||||
}
|
||||
return converted
|
||||
}
|
||||
|
||||
func friendlyChatError(err error, model string) error {
|
||||
var apiErr *openai.APIError
|
||||
if errors.As(err, &apiErr) {
|
||||
switch apiErr.HTTPStatusCode {
|
||||
case 404:
|
||||
return fmt.Errorf("model %q is not available. Run `local-ai models list`, install a model with `local-ai models install <model>`, or switch with `/model <name>`", model)
|
||||
case 403:
|
||||
return fmt.Errorf("model %q is disabled. Enable it from LocalAI settings or choose another model with `/model <name>`", model)
|
||||
}
|
||||
if apiErr.Message != "" {
|
||||
return errors.New(apiErr.Message)
|
||||
}
|
||||
}
|
||||
|
||||
msg := err.Error()
|
||||
if strings.Contains(msg, "model") && strings.Contains(msg, "not found") {
|
||||
return fmt.Errorf("model %q is not available. Run `local-ai models list`, install a model with `local-ai models install <model>`, or switch with `/model <name>`", model)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
17
core/cli/chat/models.go
Normal file
17
core/cli/chat/models.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package chat
|
||||
|
||||
import "strings"
|
||||
|
||||
func formatChatModelList(models []string, current string) string {
|
||||
var b strings.Builder
|
||||
for _, model := range models {
|
||||
prefix := " "
|
||||
if model == current {
|
||||
prefix = "* "
|
||||
}
|
||||
b.WriteString(prefix)
|
||||
b.WriteString(model)
|
||||
b.WriteByte('\n')
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
120
core/cli/chat/session.go
Normal file
120
core/cli/chat/session.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
chatRoleUser = "user"
|
||||
chatRoleAssistant = "assistant"
|
||||
)
|
||||
|
||||
type chatMessage struct {
|
||||
Role string
|
||||
Content string
|
||||
}
|
||||
|
||||
type chatSession struct {
|
||||
client chatClient
|
||||
model string
|
||||
models []string
|
||||
messages []chatMessage
|
||||
}
|
||||
|
||||
func newChatSession(ctx context.Context, client chatClient, requestedModel string) (*chatSession, error) {
|
||||
models, err := client.ListModels(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list models: %w", err)
|
||||
}
|
||||
|
||||
model, err := resolveChatModel(requestedModel, models)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &chatSession{
|
||||
client: client,
|
||||
model: model,
|
||||
models: models,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *chatSession) CurrentModel() string {
|
||||
return s.model
|
||||
}
|
||||
|
||||
func (s *chatSession) Models() []string {
|
||||
models := make([]string, len(s.models))
|
||||
copy(models, s.models)
|
||||
return models
|
||||
}
|
||||
|
||||
func (s *chatSession) Clear() {
|
||||
s.messages = nil
|
||||
}
|
||||
|
||||
func (s *chatSession) SwitchModel(model string) error {
|
||||
if !modelExists(s.models, model) {
|
||||
return fmt.Errorf("model %q is not available. Use /models to see installed models", model)
|
||||
}
|
||||
s.model = model
|
||||
s.Clear()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *chatSession) Send(ctx context.Context, prompt string, out io.Writer) error {
|
||||
s.messages = append(s.messages, chatMessage{
|
||||
Role: chatRoleUser,
|
||||
Content: prompt,
|
||||
})
|
||||
|
||||
answer, err := s.client.StreamChat(ctx, s.model, s.messages, out)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.messages = append(s.messages, chatMessage{
|
||||
Role: chatRoleAssistant,
|
||||
Content: answer,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func resolveChatModel(requested string, models []string) (string, error) {
|
||||
switch {
|
||||
case requested == "" && len(models) == 0:
|
||||
return "", errors.New(`no chat models are installed.
|
||||
|
||||
Install a model first, for example:
|
||||
local-ai models list
|
||||
local-ai models install <model>
|
||||
local-ai run
|
||||
|
||||
Then start a chat session:
|
||||
local-ai chat --model <model>`)
|
||||
case requested == "" && len(models) == 1:
|
||||
return models[0], nil
|
||||
case requested == "" && len(models) > 1:
|
||||
var b strings.Builder
|
||||
b.WriteString("multiple models are available; choose one with --model:\n")
|
||||
b.WriteString(formatChatModelList(models, ""))
|
||||
return "", errors.New(b.String())
|
||||
case !modelExists(models, requested):
|
||||
return "", fmt.Errorf("model %q is not available. Use `local-ai models list` and `local-ai models install <model>`, or pass an installed model with --model", requested)
|
||||
default:
|
||||
return requested, nil
|
||||
}
|
||||
}
|
||||
|
||||
func modelExists(models []string, name string) bool {
|
||||
for _, model := range models {
|
||||
if model == name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
56
core/cli/chat/session_test.go
Normal file
56
core/cli/chat/session_test.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package chat
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Chat session", func() {
|
||||
It("keeps model switching and message history out of the terminal adapter", func() {
|
||||
client := &fakeChatClient{
|
||||
models: []string{"alpha", "beta"},
|
||||
answer: "pong",
|
||||
}
|
||||
|
||||
session, err := newChatSession(context.Background(), client, "alpha")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(session.CurrentModel()).To(Equal("alpha"))
|
||||
|
||||
Expect(session.SwitchModel("beta")).To(Succeed())
|
||||
Expect(session.CurrentModel()).To(Equal("beta"))
|
||||
Expect(session.Send(context.Background(), "ping", io.Discard)).To(Succeed())
|
||||
|
||||
Expect(client.requests).To(HaveLen(1))
|
||||
Expect(client.requests[0].model).To(Equal("beta"))
|
||||
Expect(client.requests[0].messages).To(HaveLen(1))
|
||||
Expect(client.requests[0].messages[0].Content).To(Equal("ping"))
|
||||
})
|
||||
})
|
||||
|
||||
type fakeChatClient struct {
|
||||
models []string
|
||||
answer string
|
||||
requests []fakeChatRequest
|
||||
}
|
||||
|
||||
type fakeChatRequest struct {
|
||||
model string
|
||||
messages []chatMessage
|
||||
}
|
||||
|
||||
func (c *fakeChatClient) ListModels(context.Context) ([]string, error) {
|
||||
return c.models, nil
|
||||
}
|
||||
|
||||
func (c *fakeChatClient) StreamChat(_ context.Context, model string, messages []chatMessage, out io.Writer) (string, error) {
|
||||
copied := make([]chatMessage, len(messages))
|
||||
copy(copied, messages)
|
||||
c.requests = append(c.requests, fakeChatRequest{model: model, messages: copied})
|
||||
if _, err := io.WriteString(out, c.answer); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return c.answer, nil
|
||||
}
|
||||
93
core/cli/chat/terminal.go
Normal file
93
core/cli/chat/terminal.go
Normal file
@@ -0,0 +1,93 @@
|
||||
package chat
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func runTerminalChat(ctx context.Context, session *chatSession, in io.Reader, out io.Writer) error {
|
||||
scanner := bufio.NewScanner(in)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), 4*1024*1024)
|
||||
|
||||
if err := writeChat(out, "LocalAI chat (%s)\n", session.CurrentModel()); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := writeChat(out, "Type /exit to quit, /clear to reset the conversation, /models to list models.\n"); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for {
|
||||
if err := writeChat(out, "\n> "); err != nil {
|
||||
return err
|
||||
}
|
||||
if !scanner.Scan() {
|
||||
break
|
||||
}
|
||||
|
||||
prompt := strings.TrimSpace(scanner.Text())
|
||||
switch prompt {
|
||||
case "":
|
||||
continue
|
||||
case "/bye", "/exit", "/quit":
|
||||
return writeChat(out, "bye\n")
|
||||
case "/clear":
|
||||
session.Clear()
|
||||
if err := writeChat(out, "conversation cleared\n"); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
case "/models":
|
||||
if err := printChatModels(out, session.Models(), session.CurrentModel()); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if nextModel, ok := strings.CutPrefix(prompt, "/model "); ok {
|
||||
nextModel = strings.TrimSpace(nextModel)
|
||||
if nextModel == "" {
|
||||
if err := writeChat(out, "usage: /model <name>\n"); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := session.SwitchModel(nextModel); err != nil {
|
||||
if writeErr := writeChat(out, "%s\n", err); writeErr != nil {
|
||||
return writeErr
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := writeChat(out, "switched to %s; conversation cleared\n", session.CurrentModel()); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if err := writeChat(out, "assistant: "); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := session.Send(ctx, prompt, out); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := writeChat(out, "\n"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return scanner.Err()
|
||||
}
|
||||
|
||||
func printChatModels(out io.Writer, models []string, current string) error {
|
||||
if len(models) == 0 {
|
||||
return writeChat(out, "no models installed\n")
|
||||
}
|
||||
return writeChat(out, "%s", formatChatModelList(models, current))
|
||||
}
|
||||
|
||||
func writeChat(out io.Writer, format string, args ...any) error {
|
||||
_, err := fmt.Fprintf(out, format, args...)
|
||||
return err
|
||||
}
|
||||
25
core/cli/chat_cmd.go
Normal file
25
core/cli/chat_cmd.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
chatcli "github.com/mudler/LocalAI/core/cli/chat"
|
||||
cliContext "github.com/mudler/LocalAI/core/cli/context"
|
||||
)
|
||||
|
||||
type ChatCMD struct {
|
||||
Model string `short:"m" help:"Model name to use. Defaults to the only model returned by the server when exactly one is available"`
|
||||
Endpoint string `env:"LOCALAI_CHAT_ENDPOINT" default:"http://127.0.0.1:8080" help:"LocalAI server endpoint. The /v1 path is added automatically when omitted"`
|
||||
APIKey string `env:"LOCALAI_API_KEY,API_KEY" help:"API key to use when the LocalAI server requires authentication"`
|
||||
}
|
||||
|
||||
func (c *ChatCMD) Run(ctx *cliContext.Context) error {
|
||||
return chatcli.Run(context.Background(), chatcli.Options{
|
||||
Model: c.Model,
|
||||
BaseURL: chatAPIBaseURL(c.Endpoint),
|
||||
APIKey: c.APIKey,
|
||||
In: os.Stdin,
|
||||
Out: os.Stdout,
|
||||
})
|
||||
}
|
||||
27
core/cli/chat_cmd_test.go
Normal file
27
core/cli/chat_cmd_test.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Chat command wiring", func() {
|
||||
Describe("chatAPIBaseURL", func() {
|
||||
It("adds /v1 to a root endpoint", func() {
|
||||
Expect(chatAPIBaseURL("http://127.0.0.1:8080")).To(Equal("http://127.0.0.1:8080/v1"))
|
||||
})
|
||||
|
||||
It("keeps endpoints that already include /v1", func() {
|
||||
Expect(chatAPIBaseURL("http://127.0.0.1:8080/v1")).To(Equal("http://127.0.0.1:8080/v1"))
|
||||
Expect(chatAPIBaseURL("http://127.0.0.1:8080/v1/")).To(Equal("http://127.0.0.1:8080/v1"))
|
||||
})
|
||||
|
||||
It("adds a default http scheme", func() {
|
||||
Expect(chatAPIBaseURL("127.0.0.1:8080")).To(Equal("http://127.0.0.1:8080/v1"))
|
||||
})
|
||||
|
||||
It("preserves non-root paths before /v1", func() {
|
||||
Expect(chatAPIBaseURL("http://127.0.0.1:8080/localai")).To(Equal("http://127.0.0.1:8080/localai/v1"))
|
||||
})
|
||||
})
|
||||
})
|
||||
29
core/cli/chat_endpoint.go
Normal file
29
core/cli/chat_endpoint.go
Normal file
@@ -0,0 +1,29 @@
|
||||
package cli
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func chatAPIBaseURL(endpoint string) string {
|
||||
if !strings.Contains(endpoint, "://") {
|
||||
endpoint = "http://" + endpoint
|
||||
}
|
||||
|
||||
u, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
return strings.TrimRight(endpoint, "/") + "/v1"
|
||||
}
|
||||
|
||||
path := strings.TrimRight(u.Path, "/")
|
||||
if path == "" {
|
||||
u.Path = "/v1"
|
||||
} else if path != "/v1" && !strings.HasSuffix(path, "/v1") {
|
||||
u.Path = path + "/v1"
|
||||
} else {
|
||||
u.Path = path
|
||||
}
|
||||
u.RawQuery = ""
|
||||
u.Fragment = ""
|
||||
return u.String()
|
||||
}
|
||||
@@ -9,6 +9,7 @@ var CLI struct {
|
||||
cliContext.Context `embed:""`
|
||||
|
||||
Run RunCMD `cmd:"" help:"Run LocalAI, this the default command if no other command is specified. Run 'local-ai run --help' for more information" default:"withargs"`
|
||||
Chat ChatCMD `cmd:"" help:"Open an interactive chat session against a running LocalAI server"`
|
||||
Federated FederatedCLI `cmd:"" help:"Run LocalAI in federated mode"`
|
||||
Models ModelsCMD `cmd:"" help:"Manage LocalAI models and definitions"`
|
||||
Backends BackendsCMD `cmd:"" help:"Manage LocalAI backends and definitions"`
|
||||
|
||||
@@ -30,6 +30,8 @@ type RunCMD struct {
|
||||
ModelArgs []string `arg:"" optional:"" name:"models" help:"Model configuration URLs to load"`
|
||||
|
||||
ExternalBackends []string `env:"LOCALAI_EXTERNAL_BACKENDS,EXTERNAL_BACKENDS" help:"A list of external backends to load from gallery on boot" group:"backends"`
|
||||
WebRTCNAT1To1IPs []string `env:"LOCALAI_WEBRTC_NAT_1TO1_IPS,WEBRTC_NAT_1TO1_IPS" help:"IPs advertised as the host ICE candidates for /v1/realtime WebRTC instead of every local interface. Set to the reachable host/LAN IP when running under Docker host networking or NAT, where pion otherwise offers unreachable bridge addresses and the connection drops after ICE consent checks fail." group:"api"`
|
||||
WebRTCICEInterfaces []string `env:"LOCALAI_WEBRTC_ICE_INTERFACES,WEBRTC_ICE_INTERFACES" help:"Restrict /v1/realtime WebRTC ICE candidate gathering to these network interfaces (e.g. eth0), filtering out docker0/veth noise." group:"api"`
|
||||
BackendsPath string `env:"LOCALAI_BACKENDS_PATH,BACKENDS_PATH" type:"path" default:"${basepath}/backends" help:"Path containing backends used for inferencing" group:"backends"`
|
||||
BackendsSystemPath string `env:"LOCALAI_BACKENDS_SYSTEM_PATH,BACKEND_SYSTEM_PATH" type:"path" default:"/var/lib/local-ai/backends" help:"Path containing system backends used for inferencing" group:"backends"`
|
||||
ModelsPath string `env:"LOCALAI_MODELS_PATH,MODELS_PATH" type:"path" default:"${basepath}/models" help:"Path containing models used for inferencing" group:"storage"`
|
||||
@@ -225,6 +227,8 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
|
||||
config.WithApiKeys(r.APIKeys),
|
||||
config.WithModelsURL(append(r.Models, r.ModelArgs...)...),
|
||||
config.WithExternalBackends(r.ExternalBackends...),
|
||||
config.WithWebRTCNAT1To1IPs(r.WebRTCNAT1To1IPs...),
|
||||
config.WithWebRTCICEInterfaces(r.WebRTCICEInterfaces...),
|
||||
config.WithOpaqueErrors(r.OpaqueErrors),
|
||||
config.WithEnforcedPredownloadScans(!r.DisablePredownloadScan),
|
||||
config.WithSubtleKeyComparison(r.UseSubtleKeyComparison),
|
||||
@@ -652,12 +656,12 @@ func (r *RunCMD) Run(ctx *cliContext.Context) error {
|
||||
// waitForServerReady polls the given address until the HTTP server is
|
||||
// accepting connections or the context is cancelled.
|
||||
func waitForServerReady(address string, ctx context.Context) {
|
||||
// Ensure the address has a host component for dialing.
|
||||
// Echo accepts ":8080" but net.Dial needs a resolvable host.
|
||||
host, port, err := net.SplitHostPort(address)
|
||||
if err == nil && host == "" {
|
||||
address = "127.0.0.1:" + port
|
||||
}
|
||||
ticker := time.NewTicker(250 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
@@ -665,11 +669,17 @@ func waitForServerReady(address string, ctx context.Context) {
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
conn, err := net.DialTimeout("tcp", address, 500*time.Millisecond)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,10 +12,19 @@ import (
|
||||
)
|
||||
|
||||
type ApplicationConfig struct {
|
||||
Context context.Context
|
||||
ConfigFile string
|
||||
SystemState *system.SystemState
|
||||
ExternalBackends []string
|
||||
Context context.Context
|
||||
ConfigFile string
|
||||
SystemState *system.SystemState
|
||||
ExternalBackends []string
|
||||
|
||||
// WebRTCNAT1To1IPs, when set, are advertised as the host ICE candidates for
|
||||
// /v1/realtime WebRTC instead of every local interface address. Needed when
|
||||
// the routable address differs from what pion gathers — e.g. Docker host
|
||||
// networking (where pion also offers unreachable bridge IPs) or NAT.
|
||||
WebRTCNAT1To1IPs []string
|
||||
// WebRTCICEInterfaces, when set, restricts ICE candidate gathering to these
|
||||
// network interfaces (e.g. eth0), filtering out docker0/veth noise.
|
||||
WebRTCICEInterfaces []string
|
||||
UploadLimitMB, Threads, ContextSize int
|
||||
F16 bool
|
||||
Debug bool
|
||||
@@ -81,7 +90,6 @@ type ApplicationConfig struct {
|
||||
// file is mode 0600.
|
||||
MITMCADir string
|
||||
|
||||
|
||||
// PIIPatternOverrides applies persisted per-id deltas (action,
|
||||
// disabled) to the live redactor at startup. Loaded from
|
||||
// runtime_settings.json and applied right after pii.NewRedactor.
|
||||
@@ -116,11 +124,11 @@ type ApplicationConfig struct {
|
||||
// --require-backend-integrity / LOCALAI_REQUIRE_BACKEND_INTEGRITY.
|
||||
RequireBackendIntegrity bool
|
||||
|
||||
SingleBackend bool // Deprecated: use MaxActiveBackends = 1 instead
|
||||
MaxActiveBackends int // Maximum number of active backends (0 = unlimited, 1 = single backend mode)
|
||||
WatchDogIdle bool
|
||||
WatchDogBusy bool
|
||||
WatchDog bool
|
||||
SingleBackend bool // Deprecated: use MaxActiveBackends = 1 instead
|
||||
MaxActiveBackends int // Maximum number of active backends (0 = unlimited, 1 = single backend mode)
|
||||
WatchDogIdle bool
|
||||
WatchDogBusy bool
|
||||
WatchDog bool
|
||||
|
||||
// Memory Reclaimer settings (works with GPU if available, otherwise RAM)
|
||||
MemoryReclaimerEnabled bool // Enable memory threshold monitoring
|
||||
@@ -311,6 +319,18 @@ func WithExternalBackends(backends ...string) AppOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithWebRTCNAT1To1IPs(ips ...string) AppOption {
|
||||
return func(o *ApplicationConfig) {
|
||||
o.WebRTCNAT1To1IPs = ips
|
||||
}
|
||||
}
|
||||
|
||||
func WithWebRTCICEInterfaces(interfaces ...string) AppOption {
|
||||
return func(o *ApplicationConfig) {
|
||||
o.WebRTCICEInterfaces = interfaces
|
||||
}
|
||||
}
|
||||
|
||||
func WithMachineTag(tag string) AppOption {
|
||||
return func(o *ApplicationConfig) {
|
||||
o.MachineTag = tag
|
||||
@@ -702,7 +722,6 @@ func WithMITMCADir(dir string) AppOption {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func WithDynamicConfigDir(dynamicConfigsDir string) AppOption {
|
||||
return func(o *ApplicationConfig) {
|
||||
o.DynamicConfigsDir = dynamicConfigsDir
|
||||
|
||||
@@ -308,6 +308,41 @@ func DefaultRegistry() map[string]FieldMetaOverride {
|
||||
},
|
||||
Order: 64,
|
||||
},
|
||||
"pipeline.disable_thinking": {
|
||||
Section: "pipeline",
|
||||
Label: "Disable Thinking",
|
||||
Description: "Suppress reasoning/thinking output from the pipeline LLM (sets enable_thinking=false on the underlying model). Use for models that emit <think> blocks you don't want spoken or streamed back to the realtime client.",
|
||||
Component: "toggle",
|
||||
Order: 65,
|
||||
},
|
||||
"pipeline.streaming.llm": {
|
||||
Section: "pipeline",
|
||||
Label: "Stream LLM",
|
||||
Description: "Stream LLM tokens to the realtime client as they are generated instead of waiting for the full response. Emits incremental response.output_audio_transcript.delta / text deltas.",
|
||||
Component: "toggle",
|
||||
Order: 66,
|
||||
},
|
||||
"pipeline.streaming.tts": {
|
||||
Section: "pipeline",
|
||||
Label: "Stream TTS",
|
||||
Description: "Stream synthesized audio chunks to the realtime client as they are produced (requires a TTS backend that implements TTSStream). Falls back to unary synthesis otherwise.",
|
||||
Component: "toggle",
|
||||
Order: 67,
|
||||
},
|
||||
"pipeline.streaming.transcription": {
|
||||
Section: "pipeline",
|
||||
Label: "Stream Transcription",
|
||||
Description: "Stream partial transcription text to the realtime client as the STT backend produces it (requires a transcription backend that implements AudioTranscriptionStream). Falls back to unary transcription otherwise.",
|
||||
Component: "toggle",
|
||||
Order: 68,
|
||||
},
|
||||
"pipeline.streaming.clause_chunking": {
|
||||
Section: "pipeline",
|
||||
Label: "Clause Chunking",
|
||||
Description: "Split the streamed reply into speakable clauses and synthesize each as soon as it completes, instead of buffering the whole message before TTS — lower time-to-first-audio. Script-aware (handles CJK 。!? and Thai/Lao spaces), so it does not whitespace-split. Requires Stream LLM; off buffers the whole message.",
|
||||
Component: "toggle",
|
||||
Order: 69,
|
||||
},
|
||||
|
||||
// --- Functions ---
|
||||
"function.grammar.parallel_calls": {
|
||||
|
||||
@@ -499,6 +499,16 @@ type Pipeline struct {
|
||||
// the pipeline's LLM without editing the LLM model config. Overrides the LLM's
|
||||
// own reasoning_effort. Unset leaves the LLM model config in charge.
|
||||
ReasoningEffort string `yaml:"reasoning_effort,omitempty" json:"reasoning_effort,omitempty"`
|
||||
|
||||
// Streaming opts each pipeline stage into incremental delivery (LLM tokens,
|
||||
// TTS audio chunks, transcription text). Unset stages keep the blocking
|
||||
// unary path, so existing configs are unaffected.
|
||||
Streaming PipelineStreaming `yaml:"streaming,omitempty" json:"streaming,omitempty"`
|
||||
|
||||
// DisableThinking suppresses reasoning/thinking for the pipeline LLM (maps
|
||||
// to enable_thinking=false backend metadata) without editing the underlying
|
||||
// LLM model config. Unset leaves the LLM model config in charge.
|
||||
DisableThinking *bool `yaml:"disable_thinking,omitempty" json:"disable_thinking,omitempty"`
|
||||
}
|
||||
|
||||
// ApplyReasoningEffort resolves the effective reasoning effort — a per-request
|
||||
@@ -530,6 +540,41 @@ func (c *ModelConfig) ApplyReasoningEffort(requestEffort string) {
|
||||
}
|
||||
}
|
||||
|
||||
// @Description PipelineStreaming toggles incremental delivery per realtime stage.
|
||||
type PipelineStreaming struct {
|
||||
LLM *bool `yaml:"llm,omitempty" json:"llm,omitempty"`
|
||||
TTS *bool `yaml:"tts,omitempty" json:"tts,omitempty"`
|
||||
Transcription *bool `yaml:"transcription,omitempty" json:"transcription,omitempty"`
|
||||
// ClauseChunking splits the streamed LLM reply into speakable clauses and
|
||||
// synthesizes each as soon as it completes, instead of buffering the whole
|
||||
// message before TTS. Script-aware (CJK/Thai), so it does not rely on
|
||||
// whitespace sentence boundaries. Requires LLM streaming; unset buffers the
|
||||
// whole message (today's default).
|
||||
ClauseChunking *bool `yaml:"clause_chunking,omitempty" json:"clause_chunking,omitempty"`
|
||||
}
|
||||
|
||||
// StreamLLM reports whether LLM tokens should be streamed for this pipeline.
|
||||
func (p Pipeline) StreamLLM() bool { return p.Streaming.LLM != nil && *p.Streaming.LLM }
|
||||
|
||||
// StreamTTS reports whether TTS audio should be streamed for this pipeline.
|
||||
func (p Pipeline) StreamTTS() bool { return p.Streaming.TTS != nil && *p.Streaming.TTS }
|
||||
|
||||
// StreamTranscription reports whether transcription text should be streamed.
|
||||
func (p Pipeline) StreamTranscription() bool {
|
||||
return p.Streaming.Transcription != nil && *p.Streaming.Transcription
|
||||
}
|
||||
|
||||
// ChunkClauses reports whether the streamed reply should be split into
|
||||
// script-aware clauses and synthesized incrementally rather than buffered whole.
|
||||
func (p Pipeline) ChunkClauses() bool {
|
||||
return p.Streaming.ClauseChunking != nil && *p.Streaming.ClauseChunking
|
||||
}
|
||||
|
||||
// ThinkingDisabled reports whether the pipeline forces the LLM's thinking off.
|
||||
func (p Pipeline) ThinkingDisabled() bool {
|
||||
return p.DisableThinking != nil && *p.DisableThinking
|
||||
}
|
||||
|
||||
// @Description File configuration for model downloads
|
||||
type File struct {
|
||||
Filename string `yaml:"filename,omitempty" json:"filename,omitempty"`
|
||||
|
||||
57
core/config/pipeline_streaming_test.go
Normal file
57
core/config/pipeline_streaming_test.go
Normal file
@@ -0,0 +1,57 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
// The realtime pipeline can stream each stage (LLM tokens, TTS audio,
|
||||
// transcription text) and can disable model "thinking" for the LLM. These are
|
||||
// opt-in per pipeline; everything defaults to off so existing configs keep the
|
||||
// unary behaviour.
|
||||
var _ = Describe("Pipeline streaming config", func() {
|
||||
It("defaults every streaming + thinking helper to false when unset", func() {
|
||||
var p Pipeline
|
||||
Expect(p.StreamLLM()).To(BeFalse())
|
||||
Expect(p.StreamTTS()).To(BeFalse())
|
||||
Expect(p.StreamTranscription()).To(BeFalse())
|
||||
Expect(p.ChunkClauses()).To(BeFalse())
|
||||
Expect(p.ThinkingDisabled()).To(BeFalse())
|
||||
})
|
||||
|
||||
It("parses the nested streaming block and disable_thinking from YAML", func() {
|
||||
var c ModelConfig
|
||||
err := yaml.Unmarshal([]byte(`
|
||||
name: gpt-realtime
|
||||
pipeline:
|
||||
llm: my-llm
|
||||
tts: my-tts
|
||||
transcription: my-stt
|
||||
streaming:
|
||||
llm: true
|
||||
tts: true
|
||||
transcription: true
|
||||
clause_chunking: true
|
||||
disable_thinking: true
|
||||
`), &c)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(c.Pipeline.StreamLLM()).To(BeTrue())
|
||||
Expect(c.Pipeline.StreamTTS()).To(BeTrue())
|
||||
Expect(c.Pipeline.StreamTranscription()).To(BeTrue())
|
||||
Expect(c.Pipeline.ChunkClauses()).To(BeTrue())
|
||||
Expect(c.Pipeline.ThinkingDisabled()).To(BeTrue())
|
||||
})
|
||||
|
||||
It("treats an explicit false in the streaming block as disabled", func() {
|
||||
var c ModelConfig
|
||||
err := yaml.Unmarshal([]byte(`
|
||||
name: gpt-realtime
|
||||
pipeline:
|
||||
streaming:
|
||||
tts: false
|
||||
`), &c)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(c.Pipeline.StreamTTS()).To(BeFalse())
|
||||
})
|
||||
})
|
||||
@@ -103,7 +103,12 @@ func applyAutoparserOverride(
|
||||
// blocks like "<think></think>" that some models emit when reasoning
|
||||
// is disabled.
|
||||
if deltaReasoning == "" && deltaContent != "" {
|
||||
deltaReasoning, deltaContent = reason.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, reasoningConfig)
|
||||
// Complete-response extraction: only honor a prefilled <think> start
|
||||
// token when deltaContent actually closes the reasoning block. Without
|
||||
// it the model answered directly and the whole answer must stay in
|
||||
// content rather than be swallowed as unclosed reasoning. See
|
||||
// reason.ExtractReasoningComplete.
|
||||
deltaReasoning, deltaContent = reason.ExtractReasoningComplete(deltaContent, thinkingStartToken, reasoningConfig)
|
||||
}
|
||||
xlog.Debug("[ChatDeltas] non-SSE no-tools: overriding result with C++ autoparser deltas",
|
||||
"content_len", len(deltaContent), "reasoning_len", len(deltaReasoning))
|
||||
|
||||
@@ -186,6 +186,114 @@ var _ = Describe("applyAutoparserOverride", func() {
|
||||
Expect(result).To(Equal(existing))
|
||||
})
|
||||
})
|
||||
|
||||
// Regression tests for the prefilled-thinking-token path (thinkingStartToken
|
||||
// != ""). This is the configuration the gallery qwen3 family runs in: the
|
||||
// chat template injects <think> into the prompt, so DetectThinkingStartToken
|
||||
// returns "<think>" and the model's output begins *inside* a reasoning block
|
||||
// — it emits a closing </think> but no opening tag.
|
||||
//
|
||||
// The defensive Go-side fallback prepends the start token so the standard
|
||||
// extractor can pair it with the model's </think>. But on a *complete*
|
||||
// response that contains NO closing tag (the model answered directly with no
|
||||
// reasoning at all), prepending <think> manufactures an unclosed block that
|
||||
// swallows the entire answer into reasoning, leaving content empty. That is
|
||||
// the bug: short/direct answers (session names, JSON summaries) come back
|
||||
// with an empty content field.
|
||||
Context("autoparser delivered content with empty reasoning and a prefilled thinking token", func() {
|
||||
const startToken = "<think>"
|
||||
|
||||
It("keeps a tag-less direct answer as content instead of swallowing it as reasoning", func() {
|
||||
// Model answered directly: no <think>, no </think> anywhere.
|
||||
chatDeltas := []*pb.ChatDelta{
|
||||
{Content: "hello", ReasoningContent: ""},
|
||||
}
|
||||
|
||||
result := applyAutoparserOverride(chatDeltas, startToken, reason.Config{}, nil)
|
||||
|
||||
Expect(result).To(HaveLen(1))
|
||||
Expect(result[0].Message.Content).ToNot(BeNil())
|
||||
Expect(*(result[0].Message.Content.(*string))).To(Equal("hello"),
|
||||
"a complete answer with no closing reasoning tag must stay in content")
|
||||
Expect(result[0].Message.Reasoning).To(BeNil(),
|
||||
"no reasoning block was emitted, so Reasoning must not be set")
|
||||
})
|
||||
|
||||
It("keeps a tag-less JSON answer as content (the summary case)", func() {
|
||||
raw := `{"short":"Tests pass","long":"go test ./... succeeded."}`
|
||||
chatDeltas := []*pb.ChatDelta{
|
||||
{Content: raw, ReasoningContent: ""},
|
||||
}
|
||||
|
||||
result := applyAutoparserOverride(chatDeltas, startToken, reason.Config{}, nil)
|
||||
|
||||
Expect(result).To(HaveLen(1))
|
||||
Expect(*(result[0].Message.Content.(*string))).To(Equal(raw))
|
||||
Expect(result[0].Message.Reasoning).To(BeNil())
|
||||
})
|
||||
|
||||
It("still splits reasoning when the model emits the closing tag (prefill paired with </think>)", func() {
|
||||
// The legitimate prefill case: <think> was in the prompt, so the
|
||||
// output carries only the closing tag. The closing tag is the proof
|
||||
// that a reasoning block exists, so extraction must run.
|
||||
raw := "The user wants a greeting.\n</think>\n\nHello there!"
|
||||
chatDeltas := []*pb.ChatDelta{
|
||||
{Content: raw, ReasoningContent: ""},
|
||||
}
|
||||
|
||||
result := applyAutoparserOverride(chatDeltas, startToken, reason.Config{}, nil)
|
||||
|
||||
Expect(result).To(HaveLen(1))
|
||||
content := *(result[0].Message.Content.(*string))
|
||||
Expect(content).To(ContainSubstring("Hello there!"))
|
||||
Expect(content).ToNot(ContainSubstring("</think>"))
|
||||
Expect(content).ToNot(ContainSubstring("The user wants a greeting"))
|
||||
Expect(result[0].Message.Reasoning).ToNot(BeNil())
|
||||
Expect(*result[0].Message.Reasoning).To(ContainSubstring("The user wants a greeting"))
|
||||
})
|
||||
|
||||
It("still splits a fully-tagged <think>…</think> block with a prefill token set", func() {
|
||||
raw := "<think>Reasoning here.</think>Final answer."
|
||||
chatDeltas := []*pb.ChatDelta{
|
||||
{Content: raw, ReasoningContent: ""},
|
||||
}
|
||||
|
||||
result := applyAutoparserOverride(chatDeltas, startToken, reason.Config{}, nil)
|
||||
|
||||
Expect(result).To(HaveLen(1))
|
||||
Expect(*(result[0].Message.Content.(*string))).To(Equal("Final answer."))
|
||||
Expect(result[0].Message.Reasoning).ToNot(BeNil())
|
||||
Expect(*result[0].Message.Reasoning).To(ContainSubstring("Reasoning here"))
|
||||
})
|
||||
|
||||
// End-to-end regression for the real production failure: a request with
|
||||
// enable_thinking=false against a <think>-capable model (qwen3 family).
|
||||
//
|
||||
// In non-thinking mode the model emits no reasoning block, so llama.cpp's
|
||||
// autoparser correctly returns ChatDeltas with Content set and
|
||||
// ReasoningContent EMPTY (verified against stock llama-server: the same
|
||||
// model with chat_template_kwargs.enable_thinking=false returns
|
||||
// reasoning_content=null and content="hello"). But thinkingStartToken is
|
||||
// detected per-model from the enable_thinking=TRUE render
|
||||
// (grpc-server renders with enable_thinking=true; DetectThinkingStartToken
|
||||
// does not evaluate the jinja {% if enable_thinking %} conditional), so it
|
||||
// is "<think>" even for this non-thinking request. The old code prepended
|
||||
// it and swallowed the answer. This is the case that broke session
|
||||
// summaries and auto-titles and was NOT covered before.
|
||||
It("preserves content for a non-thinking-mode request (enable_thinking=false, empty reasoning_content)", func() {
|
||||
// What llama.cpp's autoparser actually returns in non-thinking mode.
|
||||
chatDeltas := []*pb.ChatDelta{
|
||||
{Content: `{"short":"Go tests passed for internal/session"}`, ReasoningContent: ""},
|
||||
}
|
||||
|
||||
result := applyAutoparserOverride(chatDeltas, startToken, reason.Config{}, nil)
|
||||
|
||||
Expect(result).To(HaveLen(1))
|
||||
Expect(*(result[0].Message.Content.(*string))).To(Equal(`{"short":"Go tests passed for internal/session"}`),
|
||||
"non-thinking-mode answers must reach the client intact, not be swallowed as reasoning")
|
||||
Expect(result[0].Message.Reasoning).To(BeNil())
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("mergeToolCallDeltas", func() {
|
||||
|
||||
@@ -2,8 +2,10 @@ package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/base64"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
@@ -235,6 +237,12 @@ type Model interface {
|
||||
Transcribe(ctx context.Context, audio, language string, translate bool, diarize bool, prompt string) (*schema.TranscriptionResult, error)
|
||||
Predict(ctx context.Context, messages schema.Messages, images, videos, audios []string, tokenCallback func(string, backend.TokenUsage) bool, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, logprobs *int, topLogprobs *int, logitBias map[string]float64) (func() (backend.LLMResponse, error), error)
|
||||
TTS(ctx context.Context, text, voice, language string) (string, *proto.Result, error)
|
||||
// TTSStream synthesizes speech incrementally, invoking onAudio with raw PCM
|
||||
// chunks (and the backend sample rate) as they are produced.
|
||||
TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error
|
||||
// TranscribeStream transcribes audio incrementally, invoking onDelta for each
|
||||
// transcript text fragment and returning the final aggregated result.
|
||||
TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error)
|
||||
PredictConfig() *config.ModelConfig
|
||||
}
|
||||
|
||||
@@ -1254,27 +1262,15 @@ func commitUtterance(ctx context.Context, utt []byte, session *Session, conv *Co
|
||||
// TODO: If we have a real any-to-any model then transcription is optional
|
||||
var transcript string
|
||||
if session.InputAudioTranscription != nil {
|
||||
tr, err := session.ModelInterface.Transcribe(ctx, f.Name(), session.InputAudioTranscription.Language, false, false, session.InputAudioTranscription.Prompt)
|
||||
// emitTranscription streams transcript deltas when
|
||||
// pipeline.streaming.transcription is set, otherwise emits a single
|
||||
// completed event; either way it returns the final transcript text.
|
||||
var err error
|
||||
transcript, err = emitTranscription(ctx, t, session, generateItemID(), f.Name())
|
||||
if err != nil {
|
||||
sendError(t, "transcription_failed", err.Error(), "", "event_TODO")
|
||||
return
|
||||
} else if tr == nil {
|
||||
sendError(t, "transcription_failed", "trancribe result is nil", "", "event_TODO")
|
||||
return
|
||||
}
|
||||
|
||||
transcript = tr.Text
|
||||
sendEvent(t, types.ConversationItemInputAudioTranscriptionCompletedEvent{
|
||||
ServerEventBase: types.ServerEventBase{
|
||||
EventID: "event_TODO",
|
||||
},
|
||||
|
||||
ItemID: generateItemID(),
|
||||
// ResponseID: "resp_TODO", // Not needed for transcription completed event
|
||||
// OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Transcript: transcript,
|
||||
})
|
||||
} else {
|
||||
sendNotImplemented(t, "any-to-any models")
|
||||
return
|
||||
@@ -1502,6 +1498,26 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
},
|
||||
})
|
||||
|
||||
// Streamed LLM path: when the pipeline opts into LLM streaming, stream the
|
||||
// transcript to the client as it is generated and synthesize the buffered
|
||||
// message once. Tool turns are supported only when the model uses its
|
||||
// tokenizer template: the C++ autoparser then delivers content and tool
|
||||
// calls via ChatDeltas (clearing the text stream), so the spoken transcript
|
||||
// never leaks tool-call tokens. Grammar-based function calling emits the
|
||||
// call as JSON in the token stream, so those turns keep the buffered path.
|
||||
if config != nil && session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamLLM() {
|
||||
canStream := len(tools) == 0 || config.TemplateConfig.UseTokenizerTemplate
|
||||
var respMods []types.Modality
|
||||
if overrides != nil {
|
||||
respMods = overrides.OutputModalities
|
||||
}
|
||||
if canStream && modalitiesContainAudio(resolveOutputModalities(session.OutputModalities, respMods)) {
|
||||
if streamLLMResponse(ctx, session, conv, t, responseID, conversationHistory, images, config, tools, toolChoice, toolTurn) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
predFunc, err := session.ModelInterface.Predict(ctx, conversationHistory, images, nil, nil, nil, tools, toolChoice, nil, nil, nil)
|
||||
if err != nil {
|
||||
sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", "") // item.Assistant.ID is unknown here
|
||||
@@ -1579,7 +1595,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
// ExtractReasoningWithConfig is a no-op when no tag pair matches,
|
||||
// so it's safe to apply unconditionally in the no-reasoning branch.
|
||||
if deltaReasoning == "" && deltaContent != "" {
|
||||
deltaReasoning, deltaContent = reasoning.ExtractReasoningWithConfig(deltaContent, thinkingStartToken, config.ReasoningConfig)
|
||||
deltaReasoning, deltaContent = reasoning.ExtractReasoningComplete(deltaContent, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig))
|
||||
}
|
||||
reasoningText = deltaReasoning
|
||||
responseWithoutReasoning = deltaContent
|
||||
@@ -1587,7 +1603,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
cleanedResponse = deltaContent
|
||||
toolCalls = deltaToolCalls
|
||||
} else {
|
||||
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningWithConfig(rawResponse, thinkingStartToken, config.ReasoningConfig)
|
||||
reasoningText, responseWithoutReasoning = reasoning.ExtractReasoningComplete(rawResponse, thinkingStartToken, spokenReasoningConfig(config.ReasoningConfig))
|
||||
textContent = functions.ParseTextContent(responseWithoutReasoning, config.FunctionsConfig)
|
||||
cleanedResponse = functions.CleanupLLMResult(responseWithoutReasoning, config.FunctionsConfig)
|
||||
toolCalls = functions.ParseFunctionCall(cleanedResponse, config.FunctionsConfig)
|
||||
@@ -1713,64 +1729,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
return
|
||||
}
|
||||
|
||||
audioFilePath, res, err := session.ModelInterface.TTS(ctx, finalSpeech, session.Voice, session.InputAudioTranscription.Language)
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
xlog.Debug("TTS cancelled (barge-in)")
|
||||
sendCancelledResponse()
|
||||
return
|
||||
}
|
||||
xlog.Error("TTS failed", "error", err)
|
||||
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
|
||||
return
|
||||
}
|
||||
if !res.Success {
|
||||
xlog.Error("TTS failed", "message", res.Message)
|
||||
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %s", res.Message), "", item.Assistant.ID)
|
||||
return
|
||||
}
|
||||
defer func() { _ = os.Remove(audioFilePath) }()
|
||||
|
||||
audioBytes, err := os.ReadFile(audioFilePath)
|
||||
if err != nil {
|
||||
xlog.Error("failed to read TTS file", "error", err)
|
||||
sendError(t, "tts_error", fmt.Sprintf("Failed to read TTS audio: %v", err), "", item.Assistant.ID)
|
||||
return
|
||||
}
|
||||
|
||||
// Parse WAV header to get raw PCM and the actual sample rate from the TTS backend.
|
||||
pcmData, ttsSampleRate := laudio.ParseWAV(audioBytes)
|
||||
if ttsSampleRate == 0 {
|
||||
ttsSampleRate = localSampleRate
|
||||
}
|
||||
xlog.Debug("TTS audio parsed", "raw_bytes", len(audioBytes), "pcm_bytes", len(pcmData), "sample_rate", ttsSampleRate)
|
||||
|
||||
// SendAudio (WebRTC) passes PCM at the TTS sample rate directly to the
|
||||
// Opus encoder, which resamples to 48kHz internally. This avoids a
|
||||
// lossy intermediate resample through 16kHz.
|
||||
// XXX: This is a noop in websocket mode; it's included in the JSON instead
|
||||
if err := t.SendAudio(ctx, pcmData, ttsSampleRate); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
xlog.Debug("Audio playback cancelled (barge-in)")
|
||||
sendCancelledResponse()
|
||||
return
|
||||
}
|
||||
xlog.Error("failed to send audio via transport", "error", err)
|
||||
}
|
||||
|
||||
// For WebSocket clients, resample to the session's output rate and
|
||||
// deliver audio as base64 in JSON events. WebRTC clients already
|
||||
// received audio over the RTP track, so skip the base64 payload.
|
||||
if !isWebRTC {
|
||||
wsPCM := pcmData
|
||||
if ttsSampleRate != session.OutputSampleRate {
|
||||
samples := sound.BytesToInt16sLE(pcmData)
|
||||
resampled := sound.ResampleInt16(samples, ttsSampleRate, session.OutputSampleRate)
|
||||
wsPCM = sound.Int16toBytesLE(resampled)
|
||||
}
|
||||
audioString = base64.StdEncoding.EncodeToString(wsPCM)
|
||||
}
|
||||
|
||||
// Transcript of the spoken reply (the audio's text).
|
||||
sendEvent(t, types.ResponseOutputAudioTranscriptDeltaEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
@@ -1788,15 +1747,26 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
Transcript: finalSpeech,
|
||||
})
|
||||
|
||||
// Synthesize and send the audio. With pipeline.streaming.tts enabled
|
||||
// emitSpeech forwards a response.output_audio.delta per backend PCM
|
||||
// chunk as it's produced; otherwise it sends the whole utterance as a
|
||||
// single delta. The returned PCM is stored (base64) on the item below.
|
||||
pcmAudio, err := emitSpeech(ctx, t, session, responseID, item.Assistant.ID, finalSpeech)
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
xlog.Debug("TTS cancelled (barge-in)")
|
||||
sendCancelledResponse()
|
||||
return
|
||||
}
|
||||
xlog.Error("TTS failed", "error", err)
|
||||
sendError(t, "tts_error", fmt.Sprintf("TTS generation failed: %v", err), "", item.Assistant.ID)
|
||||
return
|
||||
}
|
||||
if !isWebRTC {
|
||||
audioString = base64.StdEncoding.EncodeToString(pcmAudio)
|
||||
}
|
||||
|
||||
if !isWebRTC {
|
||||
sendEvent(t, types.ResponseOutputAudioDeltaEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: item.Assistant.ID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Delta: audioString,
|
||||
})
|
||||
sendEvent(t, types.ResponseOutputAudioDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
@@ -1849,17 +1819,27 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
})
|
||||
}
|
||||
|
||||
// Handle Tool Calls. Two paths:
|
||||
// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run
|
||||
// server-side; we append both the call and its output to conv.Items
|
||||
// and re-trigger a follow-up response so the model can speak the
|
||||
// result. The client only sees observability events.
|
||||
// - All other tools follow the standard OpenAI flow: emit
|
||||
// function_call_arguments.done and wait for the client to send
|
||||
// conversation.item.create back.
|
||||
xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(finalToolCalls))
|
||||
// Emit the parsed tool calls, the terminal response.done, and (for
|
||||
// server-side assistant tools) the follow-up response. Shared with the
|
||||
// streamed path so both finalize tool calls identically.
|
||||
emitToolCallItems(ctx, session, conv, t, responseID, finalToolCalls, finalSpeech != "", toolTurn)
|
||||
}
|
||||
|
||||
// emitToolCallItems emits the realtime function_call items for the parsed tool
|
||||
// calls, the terminal response.done, and — for server-side LocalAI Assistant
|
||||
// tools — re-triggers a follow-up response so the model can speak the result.
|
||||
// hasContent shifts the tool-call output index past the assistant content item
|
||||
// when the same turn also produced spoken/text content. Two tool paths:
|
||||
// - LocalAI Assistant tools (session.AssistantExecutor.IsTool) run server-side;
|
||||
// we append both the call and its output to conv.Items and re-trigger. The
|
||||
// client only sees observability events.
|
||||
// - All other tools follow the standard OpenAI flow: emit
|
||||
// function_call_arguments.done and wait for the client to send
|
||||
// conversation.item.create back.
|
||||
func emitToolCallItems(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, toolCalls []functions.FuncCallResults, hasContent bool, toolTurn int) {
|
||||
xlog.Debug("About to handle tool calls", "finalToolCallsCount", len(toolCalls))
|
||||
executedAssistantTool := false
|
||||
for i, tc := range finalToolCalls {
|
||||
for i, tc := range toolCalls {
|
||||
toolCallID := generateItemID()
|
||||
callID := "call_" + generateUniqueID() // OpenAI uses call_xyz
|
||||
|
||||
@@ -1879,7 +1859,7 @@ func triggerResponseAtTurn(ctx context.Context, session *Session, conv *Conversa
|
||||
conv.Lock.Unlock()
|
||||
|
||||
outputIndex := i
|
||||
if finalSpeech != "" {
|
||||
if hasContent {
|
||||
outputIndex++
|
||||
}
|
||||
|
||||
@@ -2005,8 +1985,11 @@ func generateItemID() string {
|
||||
}
|
||||
|
||||
func generateUniqueID() string {
|
||||
// Generate a unique ID string
|
||||
// For simplicity, use a counter or UUID
|
||||
// Implement as needed
|
||||
return "unique_id"
|
||||
// 16 random bytes, hex-encoded. Must be collision-free: session, item,
|
||||
// response and call IDs build on this, and the conversation tracks/removes
|
||||
// items by ID (e.g. cancel() in realtime_stream.go, conversation.item.retrieve).
|
||||
// A constant would make every ID alias and corrupt that bookkeeping.
|
||||
var b [16]byte
|
||||
_, _ = rand.Read(b[:])
|
||||
return hex.EncodeToString(b[:])
|
||||
}
|
||||
|
||||
200
core/http/endpoints/openai/realtime_chunker.go
Normal file
200
core/http/endpoints/openai/realtime_chunker.go
Normal file
@@ -0,0 +1,200 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/rivo/uniseg"
|
||||
)
|
||||
|
||||
// Default clause-chunker bounds (in runes). minRunes gates only sub-sentence
|
||||
// (clause-mark / Thai-space) cuts so we don't synthesize tiny choppy fragments;
|
||||
// full sentences always flush regardless of length. maxRunes caps an
|
||||
// unterminated run so a long punctuation-less span doesn't buffer unbounded.
|
||||
const (
|
||||
defaultClauseMinRunes = 12
|
||||
defaultClauseMaxRunes = 200
|
||||
)
|
||||
|
||||
// clauseChunker splits streamed LLM content into speakable clauses for
|
||||
// incremental TTS, in a SCRIPT-AWARE way so it works for languages without
|
||||
// whitespace word boundaries. It leans on UAX #29 sentence segmentation (which
|
||||
// natively terminates on CJK 。!? as well as Latin .!?), adds CJK clause
|
||||
// punctuation (,、;:) and Thai/Lao spaces as finer boundaries, and caps an
|
||||
// over-long unterminated run via UAX #14 line-break opportunities.
|
||||
//
|
||||
// Unlike the old ASCII .!?/newline segmenter (dropped in 076dcdbe), it does not
|
||||
// degrade to whole-message buffering for CJK (handled natively) or Thai/Lao
|
||||
// (handled via spaces, which Thai uses at clause/sentence boundaries). Scripts
|
||||
// that genuinely need a dictionary (Khmer/Myanmar) simply stay buffered until a
|
||||
// space or end-of-message — no worse than the buffered default.
|
||||
//
|
||||
// It is not safe for concurrent use; callers feed it from a single goroutine
|
||||
// (the LLM token callback).
|
||||
type clauseChunker struct {
|
||||
buf strings.Builder
|
||||
minRunes int
|
||||
maxRunes int
|
||||
}
|
||||
|
||||
func newClauseChunker(minRunes, maxRunes int) *clauseChunker {
|
||||
return &clauseChunker{minRunes: minRunes, maxRunes: maxRunes}
|
||||
}
|
||||
|
||||
// push appends streamed content and returns any clauses that are now complete —
|
||||
// "complete" meaning confirmed by following content, so we never speak a clause
|
||||
// that the next token might extend. Incomplete trailing text stays buffered.
|
||||
func (c *clauseChunker) push(text string) []string {
|
||||
c.buf.WriteString(text)
|
||||
return c.drain(false)
|
||||
}
|
||||
|
||||
// flush returns the remaining buffered clauses, treating end-of-input as a hard
|
||||
// boundary, and clears the buffer.
|
||||
func (c *clauseChunker) flush() []string {
|
||||
return c.drain(true)
|
||||
}
|
||||
|
||||
func (c *clauseChunker) drain(final bool) []string {
|
||||
s := c.buf.String()
|
||||
rest := s
|
||||
var out []string
|
||||
for rest != "" {
|
||||
end, ok := c.nextBoundary(rest, final)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if seg := strings.TrimSpace(rest[:end]); seg != "" {
|
||||
out = append(out, seg)
|
||||
}
|
||||
rest = rest[end:]
|
||||
}
|
||||
// Rewriting the builder reallocates and copies the whole buffer; skip it on
|
||||
// the common per-token call where no boundary was confirmed.
|
||||
if len(rest) != len(s) {
|
||||
c.buf.Reset()
|
||||
c.buf.WriteString(rest)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// nextBoundary returns the byte offset just past the first emittable clause in
|
||||
// s, or ok=false when more input is needed (final=false) and no boundary is
|
||||
// confirmed yet.
|
||||
func (c *clauseChunker) nextBoundary(s string, final bool) (int, bool) {
|
||||
if s == "" {
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// 1) UAX #29 sentence boundary. When the first sentence is followed by more
|
||||
// text it is a confirmed complete sentence (handles Latin .!? with
|
||||
// abbreviation/decimal guards, and CJK 。!? with no whitespace).
|
||||
sentence, rest, _ := uniseg.FirstSentenceInString(s, -1)
|
||||
if rest != "" {
|
||||
// Optionally cut finer inside the sentence at a clause boundary.
|
||||
if cut, ok := c.firstClauseCut(sentence); ok {
|
||||
return cut, true
|
||||
}
|
||||
return len(sentence), true
|
||||
}
|
||||
|
||||
// 2) Unterminated tail: look for a sub-sentence clause boundary (CJK
|
||||
// punctuation or a Thai/Lao space) confirmed by following content.
|
||||
if cut, ok := c.firstClauseCut(s); ok {
|
||||
return cut, true
|
||||
}
|
||||
|
||||
// 3) Over-long punctuation-less run: force a typographically legal break so
|
||||
// we don't buffer unbounded (e.g. a long CJK run with no punctuation).
|
||||
if !final && c.maxRunes > 0 && utf8.RuneCountInString(s) > c.maxRunes {
|
||||
if cut, ok := lineBreakCut(s, c.maxRunes); ok {
|
||||
return cut, true
|
||||
}
|
||||
}
|
||||
|
||||
// 4) End of input: emit whatever remains as the final clause.
|
||||
if final {
|
||||
return len(s), true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// firstClauseCut returns the byte offset just past the first sub-sentence clause
|
||||
// boundary in s — a CJK clause punctuation mark, or a space following a Thai/Lao
|
||||
// letter — provided the prefix is at least minRunes long and non-space content
|
||||
// follows. The boundary mark (and any trailing spaces) stay with the left clause.
|
||||
func (c *clauseChunker) firstClauseCut(s string) (int, bool) {
|
||||
var prev rune
|
||||
runes := 0
|
||||
for i, r := range s {
|
||||
boundary := isCJKClausePunct(r) || (unicode.IsSpace(r) && isThaiLao(prev))
|
||||
if boundary && runes+1 >= c.minRunes {
|
||||
end := i + utf8.RuneLen(r)
|
||||
for end < len(s) {
|
||||
nr, sz := utf8.DecodeRuneInString(s[end:])
|
||||
if !unicode.IsSpace(nr) {
|
||||
break
|
||||
}
|
||||
end += sz
|
||||
}
|
||||
if end < len(s) { // confirmed: real content follows the boundary
|
||||
return end, true
|
||||
}
|
||||
// Boundary sits at the end of the buffer with nothing after it yet —
|
||||
// wait for the next token to confirm it rather than emit early.
|
||||
return 0, false
|
||||
}
|
||||
prev = r
|
||||
runes++
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// lineBreakCut walks UAX #14 line segments and returns the byte offset of the
|
||||
// last legal break opportunity at or before maxRunes. Returns ok=false when the
|
||||
// run has no internal break opportunity (e.g. a space-less Thai run), leaving it
|
||||
// buffered.
|
||||
func lineBreakCut(s string, maxRunes int) (int, bool) {
|
||||
state := -1
|
||||
rest := s
|
||||
consumed := 0
|
||||
runes := 0
|
||||
for rest != "" {
|
||||
seg, rem, _, st := uniseg.FirstLineSegmentInString(rest, state)
|
||||
state = st
|
||||
runes += utf8.RuneCountInString(seg)
|
||||
consumed += len(seg)
|
||||
rest = rem
|
||||
if runes >= maxRunes {
|
||||
if consumed < len(s) {
|
||||
return consumed, true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
// isCJKClausePunct reports whether r is a CJK clause-level separator worth a
|
||||
// soft TTS break. Sentence terminators (。!?) are intentionally excluded — UAX
|
||||
// #29 sentence segmentation already handles those.
|
||||
func isCJKClausePunct(r rune) bool {
|
||||
switch r {
|
||||
case ',', // , fullwidth comma
|
||||
'、', // 、 ideographic comma
|
||||
';', // ; fullwidth semicolon
|
||||
':', // : fullwidth colon
|
||||
'・', // ・ katakana middle dot
|
||||
'・': // ・ halfwidth katakana middle dot
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// isThaiLao reports whether r is a Thai or Lao letter. Those scripts have no
|
||||
// inter-word spaces; an ASCII space inside such a run marks a clause/sentence
|
||||
// boundary, which is the only no-dictionary segmentation signal available.
|
||||
func isThaiLao(r rune) bool {
|
||||
return unicode.Is(unicode.Thai, r) || unicode.Is(unicode.Lao, r)
|
||||
}
|
||||
103
core/http/endpoints/openai/realtime_chunker_test.go
Normal file
103
core/http/endpoints/openai/realtime_chunker_test.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"unicode/utf8"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
// clauseChunker splits streamed LLM content into speakable clauses in a
|
||||
// script-aware way: UAX#29 sentences (Latin .!? and CJK 。!?), CJK clause
|
||||
// punctuation, and Thai/Lao spaces — never whitespace-splitting CJK.
|
||||
var _ = Describe("clauseChunker", func() {
|
||||
Context("Latin sentences", func() {
|
||||
It("emits a sentence only once following content confirms it is complete", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
Expect(c.push("Hello world. How are you?")).To(Equal([]string{"Hello world."}))
|
||||
// The trailing sentence is held until flush (the next token might extend it).
|
||||
Expect(c.flush()).To(Equal([]string{"How are you?"}))
|
||||
})
|
||||
|
||||
It("assembles a sentence across many small tokens", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
var got []string
|
||||
for _, tok := range []string{"Hello", " world.", " How", " are", " you?"} {
|
||||
got = append(got, c.push(tok)...)
|
||||
}
|
||||
got = append(got, c.flush()...)
|
||||
Expect(got).To(Equal([]string{"Hello world.", "How are you?"}))
|
||||
})
|
||||
|
||||
It("does not split decimals or abbreviations (UAX#29 SB6)", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
got := c.push("Pi is 3.14 and e is 2.72. Done")
|
||||
Expect(got).To(Equal([]string{"Pi is 3.14 and e is 2.72."}))
|
||||
Expect(c.flush()).To(Equal([]string{"Done"}))
|
||||
})
|
||||
})
|
||||
|
||||
Context("CJK (no whitespace)", func() {
|
||||
It("splits Chinese on the ideographic full stop", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
Expect(c.push("你好世界。今天天气很好。")).To(Equal([]string{"你好世界。"}))
|
||||
Expect(c.flush()).To(Equal([]string{"今天天气很好。"}))
|
||||
})
|
||||
|
||||
It("splits Japanese on the ideographic full stop", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
Expect(c.push("こんにちは。元気ですか。")).To(Equal([]string{"こんにちは。"}))
|
||||
Expect(c.flush()).To(Equal([]string{"元気ですか。"}))
|
||||
})
|
||||
|
||||
It("splits on CJK clause punctuation for lower latency", func() {
|
||||
c := newClauseChunker(2, 200) // small min so short test clauses cut
|
||||
Expect(c.push("你好,世界。再见")).To(Equal([]string{"你好,", "世界。"}))
|
||||
Expect(c.flush()).To(Equal([]string{"再见"}))
|
||||
})
|
||||
})
|
||||
|
||||
Context("Thai (spaces mark clauses, not words)", func() {
|
||||
It("splits a Thai run on the inter-clause space", func() {
|
||||
c := newClauseChunker(2, 200)
|
||||
Expect(c.push("สวัสดีครับ กินข้าวไหม")).To(Equal([]string{"สวัสดีครับ"}))
|
||||
Expect(c.flush()).To(Equal([]string{"กินข้าวไหม"}))
|
||||
})
|
||||
|
||||
It("never shatters a space-less Thai run into characters", func() {
|
||||
c := newClauseChunker(2, 200)
|
||||
Expect(c.push("สวัสดีครับ")).To(BeEmpty()) // held, no boundary
|
||||
Expect(c.flush()).To(Equal([]string{"สวัสดีครับ"}))
|
||||
})
|
||||
})
|
||||
|
||||
Context("length cap (UAX#14 fallback)", func() {
|
||||
It("force-breaks an over-long punctuation-less CJK run at legal points", func() {
|
||||
c := newClauseChunker(4, 10) // maxRunes = 10
|
||||
run := strings.Repeat("字", 25)
|
||||
got := c.push(run)
|
||||
got = append(got, c.flush()...)
|
||||
total := 0
|
||||
for _, seg := range got {
|
||||
n := utf8.RuneCountInString(seg)
|
||||
Expect(n).To(BeNumerically("<=", 10)) // never exceeds the cap
|
||||
total += n
|
||||
}
|
||||
Expect(total).To(Equal(25)) // nothing dropped
|
||||
Expect(len(got)).To(BeNumerically(">=", 3)) // 10 + 10 + 5
|
||||
})
|
||||
})
|
||||
|
||||
Context("buffer lifecycle", func() {
|
||||
It("flush clears the buffer so the chunker is reusable", func() {
|
||||
c := newClauseChunker(12, 200)
|
||||
// "First one." is confirmed by the following "Second", so push drains it;
|
||||
// only the unterminated tail remains for flush.
|
||||
Expect(c.push("First one. Second")).To(Equal([]string{"First one."}))
|
||||
Expect(c.flush()).To(Equal([]string{"Second"}))
|
||||
Expect(c.flush()).To(BeEmpty())
|
||||
Expect(c.push("Again. More")).To(Equal([]string{"Again."}))
|
||||
})
|
||||
})
|
||||
})
|
||||
138
core/http/endpoints/openai/realtime_doubles_test.go
Normal file
138
core/http/endpoints/openai/realtime_doubles_test.go
Normal file
@@ -0,0 +1,138 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/mudler/LocalAI/core/backend"
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/pkg/grpc/proto"
|
||||
)
|
||||
|
||||
// fakeTransport records the server events and audio sent to a realtime client
|
||||
// so streaming behaviour can be asserted without a real WebSocket/WebRTC peer.
|
||||
// It is not a *WebRTCTransport, so handler code takes the WebSocket path.
|
||||
type fakeTransport struct {
|
||||
events []types.ServerEvent
|
||||
audio []fakeAudioChunk
|
||||
}
|
||||
|
||||
type fakeAudioChunk struct {
|
||||
pcm []byte
|
||||
sampleRate int
|
||||
}
|
||||
|
||||
func (f *fakeTransport) SendEvent(e types.ServerEvent) error {
|
||||
f.events = append(f.events, e)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeTransport) ReadEvent() ([]byte, error) { return nil, nil }
|
||||
|
||||
func (f *fakeTransport) SendAudio(_ context.Context, pcm []byte, sampleRate int) error {
|
||||
f.audio = append(f.audio, fakeAudioChunk{pcm: pcm, sampleRate: sampleRate})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *fakeTransport) Close() error { return nil }
|
||||
|
||||
// countEvents returns how many recorded events have the given type.
|
||||
func (f *fakeTransport) countEvents(et types.ServerEventType) int {
|
||||
n := 0
|
||||
for _, e := range f.events {
|
||||
if e.ServerEventType() == et {
|
||||
n++
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// transcriptDeltaText concatenates the Delta of every recorded transcript
|
||||
// delta event — i.e. the text streamed to the client as it is generated.
|
||||
func (f *fakeTransport) transcriptDeltaText() string {
|
||||
var b strings.Builder
|
||||
for _, e := range f.events {
|
||||
if d, ok := e.(types.ResponseOutputAudioTranscriptDeltaEvent); ok {
|
||||
b.WriteString(d.Delta)
|
||||
}
|
||||
}
|
||||
return b.String()
|
||||
}
|
||||
|
||||
// fakeModel is a configurable Model double. TTSStream replays ttsStreamChunks
|
||||
// and TranscribeStream replays transcribeDeltas, so the handler's streaming
|
||||
// paths can be driven deterministically.
|
||||
type fakeModel struct {
|
||||
cfg *config.ModelConfig
|
||||
|
||||
ttsFile string
|
||||
ttsStreamChunks [][]byte
|
||||
ttsStreamRate int
|
||||
ttsStreamErr error
|
||||
|
||||
transcribeDeltas []string
|
||||
transcribeFinal *schema.TranscriptionResult
|
||||
|
||||
// Predict streaming: predictTokens are replayed through the token callback
|
||||
// (simulating streamed LLM output); predictResp/predictErr are returned by
|
||||
// the deferred predict function. predictChunkDeltas, when set, are delivered
|
||||
// per-token via TokenUsage.ChatDeltas to exercise the autoparser path.
|
||||
predictTokens []string
|
||||
predictChunkDeltas [][]*proto.ChatDelta
|
||||
predictResp backend.LLMResponse
|
||||
predictErr error
|
||||
}
|
||||
|
||||
func (m *fakeModel) VAD(context.Context, *schema.VADRequest) (*schema.VADResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) Transcribe(context.Context, string, string, bool, bool, string) (*schema.TranscriptionResult, error) {
|
||||
return m.transcribeFinal, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) Predict(_ context.Context, _ schema.Messages, _, _, _ []string, cb func(string, backend.TokenUsage) bool, _ []types.ToolUnion, _ *types.ToolChoiceUnion, _, _ *int, _ map[string]float64) (func() (backend.LLMResponse, error), error) {
|
||||
if m.predictErr != nil {
|
||||
return nil, m.predictErr
|
||||
}
|
||||
return func() (backend.LLMResponse, error) {
|
||||
for i, tok := range m.predictTokens {
|
||||
if cb == nil {
|
||||
continue
|
||||
}
|
||||
usage := backend.TokenUsage{}
|
||||
if i < len(m.predictChunkDeltas) {
|
||||
usage.ChatDeltas = m.predictChunkDeltas[i]
|
||||
}
|
||||
cb(tok, usage)
|
||||
}
|
||||
return m.predictResp, nil
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) TTS(context.Context, string, string, string) (string, *proto.Result, error) {
|
||||
return m.ttsFile, &proto.Result{Success: true}, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) TTSStream(_ context.Context, _, _, _ string, onAudio func(pcm []byte, sampleRate int) error) error {
|
||||
if m.ttsStreamErr != nil {
|
||||
return m.ttsStreamErr
|
||||
}
|
||||
for _, c := range m.ttsStreamChunks {
|
||||
if err := onAudio(c, m.ttsStreamRate); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) TranscribeStream(_ context.Context, _, _ string, _, _ bool, _ string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
|
||||
for _, d := range m.transcribeDeltas {
|
||||
onDelta(d)
|
||||
}
|
||||
return m.transcribeFinal, nil
|
||||
}
|
||||
|
||||
func (m *fakeModel) PredictConfig() *config.ModelConfig { return m.cfg }
|
||||
@@ -3,6 +3,7 @@ package openai
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@@ -87,6 +88,14 @@ func (m *transcriptOnlyModel) TTS(ctx context.Context, text, voice, language str
|
||||
return "", nil, fmt.Errorf("TTS not supported in transcript-only mode")
|
||||
}
|
||||
|
||||
func (m *transcriptOnlyModel) TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error {
|
||||
return fmt.Errorf("TTS not supported in transcript-only mode")
|
||||
}
|
||||
|
||||
func (m *transcriptOnlyModel) TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
|
||||
return transcribeStream(ctx, m.modelLoader, *m.TranscriptionConfig, m.appConfig, audio, language, translate, diarize, prompt, onDelta)
|
||||
}
|
||||
|
||||
func (m *transcriptOnlyModel) PredictConfig() *config.ModelConfig {
|
||||
return nil
|
||||
}
|
||||
@@ -321,10 +330,75 @@ func (m *wrappedModel) TTS(ctx context.Context, text, voice, language string) (s
|
||||
return backend.ModelTTS(ctx, text, voice, language, "", nil, m.modelLoader, m.appConfig, *m.TTSConfig)
|
||||
}
|
||||
|
||||
func (m *wrappedModel) TTSStream(ctx context.Context, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error {
|
||||
return ttsStream(ctx, m.modelLoader, m.appConfig, *m.TTSConfig, text, voice, language, onAudio)
|
||||
}
|
||||
|
||||
func (m *wrappedModel) TranscribeStream(ctx context.Context, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
|
||||
return transcribeStream(ctx, m.modelLoader, *m.TranscriptionConfig, m.appConfig, audio, language, translate, diarize, prompt, onDelta)
|
||||
}
|
||||
|
||||
func (m *wrappedModel) PredictConfig() *config.ModelConfig {
|
||||
return m.LLMConfig
|
||||
}
|
||||
|
||||
// wavStreamHeaderBytes is the size of the WAV header that backend.ModelTTSStream
|
||||
// emits as its first audio callback; the sample rate lives at byte offset 24.
|
||||
const wavStreamHeaderBytes = 44
|
||||
|
||||
// ttsStream adapts backend.ModelTTSStream (which emits a WAV stream: a 44-byte
|
||||
// header carrying the sample rate, then raw PCM) to the realtime onAudio
|
||||
// callback, which wants raw PCM plus the sample rate. The header is buffered
|
||||
// until complete, the sample rate is read from it, and subsequent bytes are
|
||||
// forwarded as PCM.
|
||||
func ttsStream(ctx context.Context, ml *model.ModelLoader, appConfig *config.ApplicationConfig, ttsConfig config.ModelConfig, text, voice, language string, onAudio func(pcm []byte, sampleRate int) error) error {
|
||||
var header []byte
|
||||
headerDone := false
|
||||
sampleRate := 0
|
||||
return backend.ModelTTSStream(ctx, text, voice, language, "", nil, ml, appConfig, ttsConfig, func(b []byte) error {
|
||||
if headerDone {
|
||||
if len(b) == 0 {
|
||||
return nil
|
||||
}
|
||||
return onAudio(b, sampleRate)
|
||||
}
|
||||
header = append(header, b...)
|
||||
if len(header) < wavStreamHeaderBytes {
|
||||
return nil
|
||||
}
|
||||
sampleRate = int(binary.LittleEndian.Uint32(header[24:28]))
|
||||
headerDone = true
|
||||
if len(header) > wavStreamHeaderBytes {
|
||||
return onAudio(header[wavStreamHeaderBytes:], sampleRate)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// transcribeStream adapts backend.ModelTranscriptionStream to the realtime
|
||||
// onDelta callback, returning the final aggregated transcription result.
|
||||
func transcribeStream(ctx context.Context, ml *model.ModelLoader, transcriptionConfig config.ModelConfig, appConfig *config.ApplicationConfig, audio, language string, translate, diarize bool, prompt string, onDelta func(text string)) (*schema.TranscriptionResult, error) {
|
||||
var final *schema.TranscriptionResult
|
||||
err := backend.ModelTranscriptionStream(ctx, backend.TranscriptionRequest{
|
||||
Audio: audio,
|
||||
Language: language,
|
||||
Translate: translate,
|
||||
Diarize: diarize,
|
||||
Prompt: prompt,
|
||||
}, ml, transcriptionConfig, appConfig, func(chunk backend.TranscriptionStreamChunk) {
|
||||
if chunk.Delta != "" {
|
||||
onDelta(chunk.Delta)
|
||||
}
|
||||
if chunk.Final != nil {
|
||||
final = chunk.Final
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return final, nil
|
||||
}
|
||||
|
||||
func newTranscriptionOnlyModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model.ModelLoader, appConfig *config.ApplicationConfig) (Model, *config.ModelConfig, error) {
|
||||
cfgVAD, err := cl.LoadModelConfigFileByName(pipeline.VAD, ml.ModelPath)
|
||||
if err != nil {
|
||||
@@ -454,8 +528,10 @@ func newModel(pipeline *config.Pipeline, cl *config.ModelConfigLoader, ml *model
|
||||
return nil, fmt.Errorf("failed to validate config: %w", err)
|
||||
}
|
||||
|
||||
// Let the pipeline set the LLM's reasoning effort (cfgLLM is a per-session copy).
|
||||
// Let the pipeline set the LLM's reasoning effort and force thinking off
|
||||
// (cfgLLM is a per-session copy). disable_thinking applies after the effort.
|
||||
applyPipelineReasoning(cfgLLM, *pipeline)
|
||||
applyPipelineThinking(cfgLLM, *pipeline)
|
||||
|
||||
cfgTTS, err := cl.LoadModelConfigFileByName(pipeline.TTS, ml.ModelPath)
|
||||
if err != nil {
|
||||
|
||||
102
core/http/endpoints/openai/realtime_speech.go
Normal file
102
core/http/endpoints/openai/realtime_speech.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
laudio "github.com/mudler/LocalAI/pkg/audio"
|
||||
"github.com/mudler/LocalAI/pkg/sound"
|
||||
)
|
||||
|
||||
// emitSpeech synthesizes text and sends the audio to the client. When the
|
||||
// pipeline opts into TTS streaming it forwards each PCM chunk as its own
|
||||
// response.output_audio.delta as soon as the backend produces it; otherwise it
|
||||
// synthesizes the whole utterance and sends it as a single delta.
|
||||
//
|
||||
// It deliberately does NOT emit transcript or audio-done events: the caller owns
|
||||
// those so a streamed reply can be split into several spoken segments that share
|
||||
// one response/item.
|
||||
//
|
||||
// It returns the PCM audio (at the session output rate) accumulated across all
|
||||
// chunks, which the caller base64-encodes onto the conversation item. For WebRTC
|
||||
// the audio goes over the RTP track instead, so the returned slice is empty.
|
||||
func emitSpeech(ctx context.Context, t Transport, session *Session, responseID, itemID, text string) ([]byte, error) {
|
||||
if text == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
_, isWebRTC := t.(*WebRTCTransport)
|
||||
|
||||
var wsAudio []byte // PCM at the session output rate, accumulated for the item record
|
||||
|
||||
// sendChunk hands one PCM buffer to the transport: WebRTC consumes the raw
|
||||
// PCM directly (it resamples internally); WebSocket gets base64 PCM at the
|
||||
// session output rate via a JSON delta event.
|
||||
sendChunk := func(pcm []byte, sampleRate int) error {
|
||||
if len(pcm) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := t.SendAudio(ctx, pcm, sampleRate); err != nil {
|
||||
return err
|
||||
}
|
||||
if isWebRTC {
|
||||
return nil
|
||||
}
|
||||
wsPCM := pcm
|
||||
if sampleRate != 0 && sampleRate != session.OutputSampleRate {
|
||||
samples := sound.BytesToInt16sLE(pcm)
|
||||
resampled := sound.ResampleInt16(samples, sampleRate, session.OutputSampleRate)
|
||||
wsPCM = sound.Int16toBytesLE(resampled)
|
||||
}
|
||||
wsAudio = append(wsAudio, wsPCM...)
|
||||
return t.SendEvent(types.ResponseOutputAudioDeltaEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Delta: base64.StdEncoding.EncodeToString(wsPCM),
|
||||
})
|
||||
}
|
||||
|
||||
language := ""
|
||||
if session.InputAudioTranscription != nil {
|
||||
language = session.InputAudioTranscription.Language
|
||||
}
|
||||
|
||||
if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTTS() {
|
||||
if err := session.ModelInterface.TTSStream(ctx, text, session.Voice, language, sendChunk); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return wsAudio, nil
|
||||
}
|
||||
|
||||
// Unary fallback: synthesize the whole utterance to a file, then emit once.
|
||||
audioFilePath, res, err := session.ModelInterface.TTS(ctx, text, session.Voice, language)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res != nil && !res.Success {
|
||||
return nil, fmt.Errorf("tts generation failed: %s", res.Message)
|
||||
}
|
||||
defer func() { _ = os.Remove(audioFilePath) }()
|
||||
|
||||
// filepath.Clean normalizes the backend-produced temp path before reading
|
||||
// (also keeps gosec G304 quiet — the path is backend-controlled, not user input).
|
||||
audioBytes, err := os.ReadFile(filepath.Clean(audioFilePath))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read tts audio: %w", err)
|
||||
}
|
||||
pcm, sampleRate := laudio.ParseWAV(audioBytes)
|
||||
if sampleRate == 0 {
|
||||
sampleRate = session.OutputSampleRate
|
||||
}
|
||||
if err := sendChunk(pcm, sampleRate); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return wsAudio, nil
|
||||
}
|
||||
70
core/http/endpoints/openai/realtime_speech_test.go
Normal file
70
core/http/endpoints/openai/realtime_speech_test.go
Normal file
@@ -0,0 +1,70 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
laudio "github.com/mudler/LocalAI/pkg/audio"
|
||||
)
|
||||
|
||||
// emitSpeech synthesizes a piece of text and forwards the audio to the client,
|
||||
// streaming a delta per TTS chunk when the pipeline opts in, or sending the
|
||||
// whole utterance as one delta otherwise.
|
||||
var _ = Describe("emitSpeech", func() {
|
||||
ttsOn := true
|
||||
|
||||
streamingSession := func(m Model) *Session {
|
||||
return &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: m,
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{TTS: &ttsOn}},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
It("streams one output_audio.delta per TTS chunk when streaming is enabled", func() {
|
||||
m := &fakeModel{
|
||||
ttsStreamChunks: [][]byte{{1, 2}, {3, 4}, {5, 6}},
|
||||
ttsStreamRate: 24000,
|
||||
}
|
||||
t := &fakeTransport{}
|
||||
|
||||
audio, err := emitSpeech(context.Background(), t, streamingSession(m), "resp1", "item1", "Hello there.")
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(3))
|
||||
// The returned audio is all chunks concatenated (session output rate).
|
||||
Expect(audio).To(Equal([]byte{1, 2, 3, 4, 5, 6}))
|
||||
})
|
||||
|
||||
It("sends a single output_audio.delta in unary mode", func() {
|
||||
// A minimal real WAV file for the unary TTS path to read + parse.
|
||||
f, err := os.CreateTemp("", "emit-*.wav")
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
defer func() { _ = os.Remove(f.Name()) }()
|
||||
pcm := make([]byte, 320) // 160 samples of silence
|
||||
hdr := laudio.NewWAVHeader(uint32(len(pcm)))
|
||||
Expect(hdr.Write(f)).To(Succeed())
|
||||
_, err = f.Write(pcm)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(f.Close()).To(Succeed())
|
||||
|
||||
session := &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: &fakeModel{ttsFile: f.Name()},
|
||||
ModelConfig: &config.ModelConfig{}, // streaming off
|
||||
}
|
||||
t := &fakeTransport{}
|
||||
|
||||
_, err = emitSpeech(context.Background(), t, session, "resp1", "item1", "Hello there.")
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1))
|
||||
})
|
||||
})
|
||||
315
core/http/endpoints/openai/realtime_stream.go
Normal file
315
core/http/endpoints/openai/realtime_stream.go
Normal file
@@ -0,0 +1,315 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
|
||||
"github.com/mudler/LocalAI/core/backend"
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
"github.com/mudler/LocalAI/pkg/functions"
|
||||
"github.com/mudler/LocalAI/pkg/reasoning"
|
||||
)
|
||||
|
||||
// transcriptStreamer turns streamed LLM tokens into the assistant's spoken
|
||||
// transcript: it strips reasoning incrementally and sends one
|
||||
// response.output_audio_transcript.delta per content fragment. It does NOT
|
||||
// synthesize audio — the caller buffers the full message and synthesizes it
|
||||
// once (streaming the audio chunks when the TTS backend supports TTSStream),
|
||||
// which works uniformly for streaming and non-streaming TTS and for languages
|
||||
// without sentence or word boundaries.
|
||||
type transcriptStreamer struct {
|
||||
ctx context.Context
|
||||
t Transport
|
||||
responseID string
|
||||
itemID string
|
||||
extractor *reasoning.ReasoningExtractor
|
||||
|
||||
// announce, if set, is invoked once just before the first transcript delta.
|
||||
// It lets the caller create the assistant item lazily, so a content-less
|
||||
// tool-call turn never emits a spurious empty assistant item.
|
||||
announce func()
|
||||
announced bool
|
||||
}
|
||||
|
||||
func newTranscriptStreamer(ctx context.Context, t Transport, responseID, itemID, thinkingStartToken string, reasoningCfg reasoning.Config) *transcriptStreamer {
|
||||
return &transcriptStreamer{
|
||||
ctx: ctx,
|
||||
t: t,
|
||||
responseID: responseID,
|
||||
itemID: itemID,
|
||||
extractor: reasoning.NewReasoningExtractor(thinkingStartToken, spokenReasoningConfig(reasoningCfg)),
|
||||
}
|
||||
}
|
||||
|
||||
// onToken handles one streamed unit of model output, sending a transcript delta
|
||||
// for the new content (reasoning stripped) and returning that content delta so
|
||||
// the caller can also feed it to the clause chunker. For plain-content models
|
||||
// the unit is the raw text token; for autoparser tool turns the backend clears
|
||||
// the text and delivers content via ChatDeltas, so the caller passes that
|
||||
// content here. Returns "" when the token produced no new spoken content.
|
||||
func (s *transcriptStreamer) onToken(token string) string {
|
||||
_, content := s.extractor.ProcessToken(token)
|
||||
if content == "" {
|
||||
return ""
|
||||
}
|
||||
if !s.announced {
|
||||
s.announced = true
|
||||
if s.announce != nil {
|
||||
s.announce()
|
||||
}
|
||||
}
|
||||
_ = s.t.SendEvent(types.ResponseOutputAudioTranscriptDeltaEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: s.responseID,
|
||||
ItemID: s.itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Delta: content,
|
||||
})
|
||||
return content
|
||||
}
|
||||
|
||||
// content returns the full transcript so far with reasoning stripped.
|
||||
func (s *transcriptStreamer) content() string {
|
||||
return s.extractor.CleanedContent()
|
||||
}
|
||||
|
||||
// streamLLMResponse drives a streamed realtime reply. It streams the assistant
|
||||
// transcript as the LLM generates, then synthesizes the whole buffered message
|
||||
// once (streaming the audio chunks when the TTS backend supports it, otherwise a
|
||||
// single unary delta). Tool calls parsed from the autoparser ChatDeltas are
|
||||
// emitted after the spoken content. The assistant content item is created lazily
|
||||
// on the first content delta, so a content-less tool-call turn emits only the
|
||||
// tool calls. It returns true when it has fully handled the response so the
|
||||
// caller can return; callers must only invoke it for an audio modality, and with
|
||||
// tools only when the model uses its tokenizer template (see triggerResponseAtTurn).
|
||||
func streamLLMResponse(ctx context.Context, session *Session, conv *Conversation, t Transport, responseID string, history schema.Messages, images []string, llmCfg *config.ModelConfig, tools []types.ToolUnion, toolChoice *types.ToolChoiceUnion, toolTurn int) bool {
|
||||
itemID := generateItemID()
|
||||
item := types.MessageItemUnion{
|
||||
Assistant: &types.MessageItemAssistant{
|
||||
ID: itemID,
|
||||
Status: types.ItemStatusInProgress,
|
||||
Content: []types.MessageContentOutput{{Type: types.MessageContentTypeOutputAudio}},
|
||||
},
|
||||
}
|
||||
|
||||
// announce creates the assistant content item lazily, just before the first
|
||||
// transcript delta — a tool-only turn never produces content, so it stays out
|
||||
// of the conversation and the client sees only the tool calls.
|
||||
announced := false
|
||||
announce := func() {
|
||||
announced = true
|
||||
conv.Lock.Lock()
|
||||
conv.Items = append(conv.Items, &item)
|
||||
conv.Lock.Unlock()
|
||||
sendEvent(t, types.ResponseOutputItemAddedEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
OutputIndex: 0,
|
||||
Item: item,
|
||||
})
|
||||
sendEvent(t, types.ResponseContentPartAddedEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Part: item.Assistant.Content[0],
|
||||
})
|
||||
}
|
||||
|
||||
cancel := func() {
|
||||
if announced {
|
||||
conv.Lock.Lock()
|
||||
for i := len(conv.Items) - 1; i >= 0; i-- {
|
||||
if conv.Items[i].Assistant != nil && conv.Items[i].Assistant.ID == itemID {
|
||||
conv.Items = append(conv.Items[:i], conv.Items[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
conv.Lock.Unlock()
|
||||
}
|
||||
sendEvent(t, types.ResponseDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
Response: types.Response{ID: responseID, Object: "realtime.response", Status: types.ResponseStatusCancelled},
|
||||
})
|
||||
}
|
||||
|
||||
var template string
|
||||
if llmCfg.TemplateConfig.UseTokenizerTemplate {
|
||||
template = llmCfg.GetModelTemplate()
|
||||
} else {
|
||||
template = llmCfg.TemplateConfig.Chat
|
||||
}
|
||||
thinkingStartToken := reasoning.DetectThinkingStartToken(template, &llmCfg.ReasoningConfig)
|
||||
|
||||
// The autoparser (tokenizer-template path) already delivers reasoning-free
|
||||
// content. Prefilling the thinking start token here would re-tag that clean
|
||||
// content as an unclosed reasoning block, leaving CleanedContent() empty —
|
||||
// no spoken reply, no TTS. Disable the prefill; closed tag pairs are still
|
||||
// stripped (PEG-fallback case, #9985).
|
||||
reasoningCfg := llmCfg.ReasoningConfig
|
||||
if llmCfg.TemplateConfig.UseTokenizerTemplate {
|
||||
disablePrefill := true
|
||||
reasoningCfg.DisableReasoningTagPrefill = &disablePrefill
|
||||
}
|
||||
|
||||
streamer := newTranscriptStreamer(ctx, t, responseID, itemID, thinkingStartToken, reasoningCfg)
|
||||
streamer.announce = announce
|
||||
|
||||
// Clause chunking (opt-in): synthesize each clause as soon as it completes
|
||||
// instead of buffering the whole reply. streamedAudio accumulates the PCM
|
||||
// across clauses for the conversation item record; ttsErr captures the first
|
||||
// synthesis failure so the token callback can stop the prediction. emitSpeech
|
||||
// runs synchronously here — the LLM keeps generating into the gRPC stream
|
||||
// while a clause is synthesized, so audio still starts mid-generation.
|
||||
var chunker *clauseChunker
|
||||
if session.ModelConfig != nil && session.ModelConfig.Pipeline.ChunkClauses() {
|
||||
chunker = newClauseChunker(defaultClauseMinRunes, defaultClauseMaxRunes)
|
||||
}
|
||||
var streamedAudio []byte
|
||||
var ttsErr error
|
||||
speakClause := func(clause string) error {
|
||||
a, err := emitSpeech(ctx, t, session, responseID, itemID, clause)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
streamedAudio = append(streamedAudio, a...)
|
||||
return nil
|
||||
}
|
||||
|
||||
// fail reports a mid-stream failure. A cancelled context means the client
|
||||
// interrupted (barge-in), so roll the turn back instead of erroring.
|
||||
fail := func(code, msg string, err error) bool {
|
||||
if ctx.Err() != nil {
|
||||
cancel()
|
||||
} else {
|
||||
sendError(t, code, fmt.Sprintf("%s: %v", msg, err), "", itemID)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
cb := func(token string, usage backend.TokenUsage) bool {
|
||||
if ctx.Err() != nil {
|
||||
return false
|
||||
}
|
||||
// Plain-content models stream text via the token; autoparser tool turns
|
||||
// clear the text and deliver content via ChatDeltas, so prefer the latter
|
||||
// when present. Either way only content reaches the transcript — tool-call
|
||||
// deltas are parsed from the final response below.
|
||||
text := token
|
||||
if len(usage.ChatDeltas) > 0 {
|
||||
text = functions.ContentFromChatDeltas(usage.ChatDeltas)
|
||||
}
|
||||
delta := streamer.onToken(text)
|
||||
if chunker != nil && delta != "" {
|
||||
for _, clause := range chunker.push(delta) {
|
||||
if ttsErr = speakClause(clause); ttsErr != nil {
|
||||
return false // stop the prediction; reported after predFunc returns
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
predFunc, err := session.ModelInterface.Predict(ctx, history, images, nil, nil, cb, tools, toolChoice, nil, nil, nil)
|
||||
if err != nil {
|
||||
sendError(t, "inference_failed", fmt.Sprintf("backend error: %v", err), "", itemID)
|
||||
return true
|
||||
}
|
||||
pred, err := predFunc()
|
||||
// A clause synthesis failed mid-stream (the callback stopped the prediction);
|
||||
// report it as a TTS error rather than a prediction error.
|
||||
if ttsErr != nil {
|
||||
return fail("tts_error", "TTS generation failed", ttsErr)
|
||||
}
|
||||
if err != nil {
|
||||
return fail("prediction_failed", "backend error", err)
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
cancel()
|
||||
return true
|
||||
}
|
||||
|
||||
content := streamer.content()
|
||||
toolCalls := functions.ToolCallsFromChatDeltas(pred.ChatDeltas)
|
||||
|
||||
// Finalize the spoken content item only when the turn produced content. A
|
||||
// tool-only turn skips this entirely (no empty assistant item).
|
||||
if content != "" {
|
||||
if !announced {
|
||||
announce()
|
||||
}
|
||||
|
||||
// Synthesize the audio. With clause chunking the completed clauses were
|
||||
// already spoken inside the token callback; flush the trailing clause(s)
|
||||
// the segmenter was still holding. Otherwise buffer the whole message and
|
||||
// synthesize it once. emitSpeech streams the audio chunks when the TTS
|
||||
// backend supports TTSStream, otherwise it sends a single unary delta.
|
||||
var audio []byte
|
||||
if chunker != nil {
|
||||
for _, clause := range chunker.flush() {
|
||||
if ttsErr = speakClause(clause); ttsErr != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
audio = streamedAudio
|
||||
} else {
|
||||
audio, ttsErr = emitSpeech(ctx, t, session, responseID, itemID, content)
|
||||
}
|
||||
if ttsErr != nil {
|
||||
return fail("tts_error", "TTS generation failed", ttsErr)
|
||||
}
|
||||
|
||||
_, isWebRTC := t.(*WebRTCTransport)
|
||||
|
||||
sendEvent(t, types.ResponseOutputAudioTranscriptDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Transcript: content,
|
||||
})
|
||||
if !isWebRTC {
|
||||
sendEvent(t, types.ResponseOutputAudioDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
})
|
||||
}
|
||||
|
||||
conv.Lock.Lock()
|
||||
item.Assistant.Status = types.ItemStatusCompleted
|
||||
item.Assistant.Content[0].Transcript = content
|
||||
if !isWebRTC {
|
||||
item.Assistant.Content[0].Audio = base64.StdEncoding.EncodeToString(audio)
|
||||
}
|
||||
conv.Lock.Unlock()
|
||||
|
||||
sendEvent(t, types.ResponseContentPartDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
ItemID: itemID,
|
||||
OutputIndex: 0,
|
||||
ContentIndex: 0,
|
||||
Part: item.Assistant.Content[0],
|
||||
})
|
||||
sendEvent(t, types.ResponseOutputItemDoneEvent{
|
||||
ServerEventBase: types.ServerEventBase{},
|
||||
ResponseID: responseID,
|
||||
OutputIndex: 0,
|
||||
Item: item,
|
||||
})
|
||||
}
|
||||
|
||||
// Emit any tool calls, the terminal response.done, and (for server-side
|
||||
// assistant tools) the follow-up turn — shared with the buffered path.
|
||||
emitToolCallItems(ctx, session, conv, t, responseID, toolCalls, content != "", toolTurn)
|
||||
return true
|
||||
}
|
||||
213
core/http/endpoints/openai/realtime_stream_test.go
Normal file
213
core/http/endpoints/openai/realtime_stream_test.go
Normal file
@@ -0,0 +1,213 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/backend"
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
"github.com/mudler/LocalAI/pkg/grpc/proto"
|
||||
"github.com/mudler/LocalAI/pkg/reasoning"
|
||||
)
|
||||
|
||||
// transcriptStreamer turns streamed LLM tokens into incremental transcript
|
||||
// deltas, stripping reasoning. Audio is synthesized once from the full message
|
||||
// by the caller, so there is no per-sentence segmentation.
|
||||
var _ = Describe("transcriptStreamer", func() {
|
||||
It("emits one transcript delta per content token", func() {
|
||||
t := &fakeTransport{}
|
||||
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "", reasoning.Config{})
|
||||
|
||||
for _, tok := range []string{"Hello", " world.", " Bye"} {
|
||||
s.onToken(tok)
|
||||
}
|
||||
|
||||
Expect(s.content()).To(Equal("Hello world. Bye"))
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3))
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Hello world. Bye"))
|
||||
})
|
||||
|
||||
It("strips leaked reasoning even when reasoning is disabled (disable_thinking safety net)", func() {
|
||||
// disable_thinking maps to DisableReasoning=true (enable_thinking=false to
|
||||
// the backend). If the model emits thinking anyway, the transcript must
|
||||
// still not leak it: stripping always runs for spoken output.
|
||||
disable := true
|
||||
t := &fakeTransport{}
|
||||
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "",
|
||||
reasoning.Config{DisableReasoning: &disable})
|
||||
|
||||
s.onToken("<think>secret plan</think>")
|
||||
s.onToken("The answer is 42.")
|
||||
|
||||
Expect(s.content()).To(Equal("The answer is 42."))
|
||||
Expect(s.content()).ToNot(ContainSubstring("secret plan"))
|
||||
Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret plan"))
|
||||
})
|
||||
|
||||
It("does not swallow autoparser content when the template has a thinking start token (tokenizer-template path)", func() {
|
||||
// Regression: with tag prefill on, the detected <think> token is
|
||||
// prepended to the autoparser's already-clean content, swallowing the
|
||||
// whole reply (empty transcript → no TTS). streamLLMResponse disables
|
||||
// the prefill for the tokenizer-template path.
|
||||
disablePrefill := true
|
||||
t := &fakeTransport{}
|
||||
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "<think>",
|
||||
reasoning.Config{DisableReasoningTagPrefill: &disablePrefill})
|
||||
|
||||
s.onToken("Hello")
|
||||
s.onToken(" there.")
|
||||
|
||||
Expect(s.content()).To(Equal("Hello there."))
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Hello there."))
|
||||
})
|
||||
|
||||
It("still strips embedded closed reasoning tags with prefill disabled (PEG-fallback safety, #9985)", func() {
|
||||
// Disabling prefill must not stop stripping closed <think>…</think>
|
||||
// pairs the PEG fallback can leave in autoparser content.
|
||||
disablePrefill := true
|
||||
t := &fakeTransport{}
|
||||
s := newTranscriptStreamer(context.Background(), t, "resp1", "item1", "<think>",
|
||||
reasoning.Config{DisableReasoningTagPrefill: &disablePrefill})
|
||||
|
||||
s.onToken("<think>secret</think>")
|
||||
s.onToken("The answer is 42.")
|
||||
|
||||
Expect(s.content()).To(Equal("The answer is 42."))
|
||||
Expect(t.transcriptDeltaText()).ToNot(ContainSubstring("secret"))
|
||||
})
|
||||
})
|
||||
|
||||
// streamLLMResponse drives a full streamed realtime turn: live transcript
|
||||
// deltas while the LLM generates, then the whole message is synthesized once.
|
||||
var _ = Describe("streamLLMResponse", func() {
|
||||
It("streams transcript deltas then synthesizes the whole message once", func() {
|
||||
on := true
|
||||
m := &fakeModel{
|
||||
predictTokens: []string{"Hello", " world.", " How are you?"},
|
||||
predictResp: backend.LLMResponse{Response: "Hello world. How are you?"},
|
||||
ttsStreamChunks: [][]byte{{9}},
|
||||
ttsStreamRate: 24000,
|
||||
}
|
||||
session := &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: m,
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}},
|
||||
},
|
||||
}
|
||||
conv := &Conversation{}
|
||||
t := &fakeTransport{}
|
||||
llmCfg := &config.ModelConfig{}
|
||||
|
||||
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
|
||||
|
||||
Expect(handled).To(BeTrue())
|
||||
// One live transcript delta per streamed token.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(3))
|
||||
// The whole message is synthesized ONCE (not per sentence): a single
|
||||
// emitSpeech replays the one TTS stream chunk.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(1))
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?"))
|
||||
})
|
||||
|
||||
It("synthesizes each clause as it completes when clause chunking is enabled", func() {
|
||||
on := true
|
||||
m := &fakeModel{
|
||||
predictTokens: []string{"Hello world.", " How are you?"},
|
||||
predictResp: backend.LLMResponse{Response: "Hello world. How are you?"},
|
||||
ttsStreamChunks: [][]byte{{9}},
|
||||
ttsStreamRate: 24000,
|
||||
}
|
||||
session := &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: m,
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on, ClauseChunking: &on}},
|
||||
},
|
||||
}
|
||||
conv := &Conversation{}
|
||||
t := &fakeTransport{}
|
||||
llmCfg := &config.ModelConfig{}
|
||||
|
||||
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
|
||||
|
||||
Expect(handled).To(BeTrue())
|
||||
// Two clauses ("Hello world." mid-stream, "How are you?" on flush) → two
|
||||
// emitSpeech calls → two audio deltas, vs one for whole-message buffering.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioDelta)).To(Equal(2))
|
||||
// The full transcript still streams verbatim.
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Hello world. How are you?"))
|
||||
// Exactly one terminal response.done.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1))
|
||||
})
|
||||
|
||||
It("streams content deltas and emits tool-call items (autoparser tool turn)", func() {
|
||||
on := true
|
||||
// Autoparser path: reply.Message is empty; content + tool calls arrive via
|
||||
// ChatDeltas. Chunk 1 carries content, chunk 2 carries the tool call.
|
||||
contentDelta := []*proto.ChatDelta{{Content: "Let me check."}}
|
||||
toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Paris"}`}}}}
|
||||
m := &fakeModel{
|
||||
predictTokens: []string{"", ""},
|
||||
predictChunkDeltas: [][]*proto.ChatDelta{contentDelta, toolDelta},
|
||||
predictResp: backend.LLMResponse{ChatDeltas: append(append([]*proto.ChatDelta{}, contentDelta...), toolDelta...)},
|
||||
ttsStreamChunks: [][]byte{{9}},
|
||||
ttsStreamRate: 24000,
|
||||
}
|
||||
session := &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: m,
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}},
|
||||
},
|
||||
}
|
||||
conv := &Conversation{}
|
||||
t := &fakeTransport{}
|
||||
llmCfg := &config.ModelConfig{}
|
||||
llmCfg.TemplateConfig.UseTokenizerTemplate = true
|
||||
|
||||
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
|
||||
|
||||
Expect(handled).To(BeTrue())
|
||||
// The spoken content was streamed live.
|
||||
Expect(t.transcriptDeltaText()).To(Equal("Let me check."))
|
||||
// The tool call is emitted as a function_call item.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1))
|
||||
// Exactly one terminal response.done.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1))
|
||||
})
|
||||
|
||||
It("emits only tool-call items for a content-less tool turn (no empty assistant item)", func() {
|
||||
on := true
|
||||
toolDelta := []*proto.ChatDelta{{ToolCalls: []*proto.ToolCallDelta{{Index: 0, Name: "get_weather", Arguments: `{"city":"Rome"}`}}}}
|
||||
m := &fakeModel{
|
||||
predictTokens: []string{""},
|
||||
predictChunkDeltas: [][]*proto.ChatDelta{toolDelta},
|
||||
predictResp: backend.LLMResponse{ChatDeltas: toolDelta},
|
||||
}
|
||||
session := &Session{
|
||||
OutputSampleRate: 24000,
|
||||
ModelInterface: m,
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{LLM: &on, TTS: &on}},
|
||||
},
|
||||
}
|
||||
conv := &Conversation{}
|
||||
t := &fakeTransport{}
|
||||
llmCfg := &config.ModelConfig{}
|
||||
llmCfg.TemplateConfig.UseTokenizerTemplate = true
|
||||
|
||||
handled := streamLLMResponse(context.Background(), session, conv, t, "resp1", nil, nil, llmCfg, nil, nil, 0)
|
||||
|
||||
Expect(handled).To(BeTrue())
|
||||
// No content → no transcript deltas and no spurious assistant content item.
|
||||
Expect(t.transcriptDeltaText()).To(Equal(""))
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseOutputAudioTranscriptDelta)).To(Equal(0))
|
||||
// The tool call is still emitted.
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseFunctionCallArgumentsDone)).To(Equal(1))
|
||||
Expect(t.countEvents(types.ServerEventTypeResponseDone)).To(Equal(1))
|
||||
})
|
||||
})
|
||||
33
core/http/endpoints/openai/realtime_thinking.go
Normal file
33
core/http/endpoints/openai/realtime_thinking.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/pkg/reasoning"
|
||||
)
|
||||
|
||||
// applyPipelineThinking forces the LLM's reasoning/thinking off when the realtime
|
||||
// pipeline sets disable_thinking, mapping to the enable_thinking=false backend
|
||||
// metadata via ReasoningConfig.DisableReasoning. The LLM config passed in is the
|
||||
// per-session copy returned by the config loader, so this does not affect other
|
||||
// users of the same model. When the pipeline does not set disable_thinking the
|
||||
// LLM config is left untouched.
|
||||
func applyPipelineThinking(llm *config.ModelConfig, pipeline config.Pipeline) {
|
||||
if llm == nil || !pipeline.ThinkingDisabled() {
|
||||
return
|
||||
}
|
||||
disable := true
|
||||
llm.ReasoningConfig.DisableReasoning = &disable
|
||||
}
|
||||
|
||||
// spokenReasoningConfig adapts a model's reasoning config for stripping reasoning
|
||||
// OUT of realtime spoken output. ReasoningConfig.DisableReasoning is overloaded:
|
||||
// the backend reads it as the "enable_thinking=false" hint (which pipeline
|
||||
// disable_thinking sets via applyPipelineThinking), but the reasoning extractor
|
||||
// reads it as "skip stripping, assume there is no reasoning". Honouring the latter
|
||||
// when extracting for speech would leak raw <think>…</think> whenever the model
|
||||
// ignores the suppression hint. Spoken output must never contain reasoning, so we
|
||||
// always strip: clear DisableReasoning while keeping custom tokens/tag pairs.
|
||||
func spokenReasoningConfig(cfg reasoning.Config) reasoning.Config {
|
||||
cfg.DisableReasoning = nil
|
||||
return cfg
|
||||
}
|
||||
50
core/http/endpoints/openai/realtime_thinking_test.go
Normal file
50
core/http/endpoints/openai/realtime_thinking_test.go
Normal file
@@ -0,0 +1,50 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/pkg/reasoning"
|
||||
)
|
||||
|
||||
// applyPipelineThinking lets a realtime pipeline force the LLM's thinking off
|
||||
// (enable_thinking=false metadata) without editing the LLM model config.
|
||||
var _ = Describe("applyPipelineThinking", func() {
|
||||
It("disables reasoning on the LLM config when the pipeline disables thinking", func() {
|
||||
disable := true
|
||||
llm := &config.ModelConfig{}
|
||||
applyPipelineThinking(llm, config.Pipeline{DisableThinking: &disable})
|
||||
Expect(llm.ReasoningConfig.DisableReasoning).ToNot(BeNil())
|
||||
Expect(*llm.ReasoningConfig.DisableReasoning).To(BeTrue())
|
||||
})
|
||||
|
||||
It("leaves the LLM config untouched when the pipeline does not set disable_thinking", func() {
|
||||
llm := &config.ModelConfig{}
|
||||
applyPipelineThinking(llm, config.Pipeline{})
|
||||
Expect(llm.ReasoningConfig.DisableReasoning).To(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
// spokenReasoningConfig clears DisableReasoning so realtime spoken output always
|
||||
// strips reasoning, even though disable_thinking sets DisableReasoning=true on the
|
||||
// LLM config (which the backend reads as enable_thinking=false).
|
||||
var _ = Describe("spokenReasoningConfig", func() {
|
||||
It("clears DisableReasoning so the extractor still strips leaked reasoning", func() {
|
||||
disable := true
|
||||
out := spokenReasoningConfig(reasoning.Config{DisableReasoning: &disable})
|
||||
Expect(out.DisableReasoning).To(BeNil())
|
||||
})
|
||||
|
||||
It("preserves the other reasoning settings", func() {
|
||||
disable := true
|
||||
out := spokenReasoningConfig(reasoning.Config{
|
||||
DisableReasoning: &disable,
|
||||
ThinkingStartTokens: []string{"<reason>"},
|
||||
TagPairs: []reasoning.TagPair{{Start: "<reason>", End: "</reason>"}},
|
||||
})
|
||||
Expect(out.ThinkingStartTokens).To(Equal([]string{"<reason>"}))
|
||||
Expect(out.TagPairs).To(HaveLen(1))
|
||||
Expect(out.TagPairs[0].Start).To(Equal("<reason>"))
|
||||
})
|
||||
})
|
||||
63
core/http/endpoints/openai/realtime_transcription.go
Normal file
63
core/http/endpoints/openai/realtime_transcription.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
)
|
||||
|
||||
// emitTranscription transcribes a committed utterance and emits the transcription
|
||||
// events for it, returning the final transcript text. With
|
||||
// pipeline.streaming.transcription enabled it streams each transcript fragment as
|
||||
// a conversation.item.input_audio_transcription.delta as the backend produces it,
|
||||
// then a completed event; otherwise it transcribes the whole utterance and emits
|
||||
// a single completed event. delta and completed events share itemID.
|
||||
func emitTranscription(ctx context.Context, t Transport, session *Session, itemID, audioPath string) (string, error) {
|
||||
cfg := session.InputAudioTranscription
|
||||
|
||||
if session.ModelConfig != nil && session.ModelConfig.Pipeline.StreamTranscription() {
|
||||
final, err := session.ModelInterface.TranscribeStream(ctx, audioPath, cfg.Language, false, false, cfg.Prompt, func(delta string) {
|
||||
_ = t.SendEvent(types.ConversationItemInputAudioTranscriptionDeltaEvent{
|
||||
ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
|
||||
ItemID: itemID,
|
||||
ContentIndex: 0,
|
||||
Delta: delta,
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
transcript := ""
|
||||
if final != nil {
|
||||
transcript = final.Text
|
||||
}
|
||||
if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{
|
||||
ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
|
||||
ItemID: itemID,
|
||||
ContentIndex: 0,
|
||||
Transcript: transcript,
|
||||
}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return transcript, nil
|
||||
}
|
||||
|
||||
// Unary fallback: transcribe the whole utterance, emit one completed event.
|
||||
tr, err := session.ModelInterface.Transcribe(ctx, audioPath, cfg.Language, false, false, cfg.Prompt)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if tr == nil {
|
||||
return "", fmt.Errorf("transcribe result is nil")
|
||||
}
|
||||
if err := t.SendEvent(types.ConversationItemInputAudioTranscriptionCompletedEvent{
|
||||
ServerEventBase: types.ServerEventBase{EventID: "event_TODO"},
|
||||
ItemID: itemID,
|
||||
ContentIndex: 0,
|
||||
Transcript: tr.Text,
|
||||
}); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return tr.Text, nil
|
||||
}
|
||||
54
core/http/endpoints/openai/realtime_transcription_test.go
Normal file
54
core/http/endpoints/openai/realtime_transcription_test.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/endpoints/openai/types"
|
||||
"github.com/mudler/LocalAI/core/schema"
|
||||
)
|
||||
|
||||
// emitTranscription transcribes a committed utterance, streaming transcript text
|
||||
// deltas when the pipeline opts in, and returns the final transcript text.
|
||||
var _ = Describe("emitTranscription", func() {
|
||||
It("streams transcription deltas then a completed event when streaming is enabled", func() {
|
||||
on := true
|
||||
session := &Session{
|
||||
InputAudioTranscription: &types.AudioTranscription{},
|
||||
ModelConfig: &config.ModelConfig{
|
||||
Pipeline: config.Pipeline{Streaming: config.PipelineStreaming{Transcription: &on}},
|
||||
},
|
||||
ModelInterface: &fakeModel{
|
||||
transcribeDeltas: []string{"Hel", "lo", " world"},
|
||||
transcribeFinal: &schema.TranscriptionResult{Text: "Hello world"},
|
||||
},
|
||||
}
|
||||
t := &fakeTransport{}
|
||||
|
||||
transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav")
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(transcript).To(Equal("Hello world"))
|
||||
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(3))
|
||||
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1))
|
||||
})
|
||||
|
||||
It("emits a single completed event with no deltas in unary mode", func() {
|
||||
session := &Session{
|
||||
InputAudioTranscription: &types.AudioTranscription{},
|
||||
ModelConfig: &config.ModelConfig{}, // streaming off
|
||||
ModelInterface: &fakeModel{transcribeFinal: &schema.TranscriptionResult{Text: "Hi"}},
|
||||
}
|
||||
t := &fakeTransport{}
|
||||
|
||||
transcript, err := emitTranscription(context.Background(), t, session, "item1", "/tmp/x.wav")
|
||||
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(transcript).To(Equal("Hi"))
|
||||
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionDelta)).To(Equal(0))
|
||||
Expect(t.countEvents(types.ServerEventTypeConversationItemInputAudioTranscriptionCompleted)).To(Equal(1))
|
||||
})
|
||||
})
|
||||
@@ -48,7 +48,8 @@ func RealtimeCalls(application *application.Application) echo.HandlerFunc {
|
||||
return c.JSON(http.StatusInternalServerError, map[string]string{"error": "codec registration failed"})
|
||||
}
|
||||
|
||||
api := webrtc.NewAPI(webrtc.WithMediaEngine(m))
|
||||
se := webRTCSettingEngine(application.ApplicationConfig())
|
||||
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithSettingEngine(se))
|
||||
|
||||
pc, err := api.NewPeerConnection(webrtc.Configuration{})
|
||||
if err != nil {
|
||||
|
||||
47
core/http/endpoints/openai/realtime_webrtc_ice.go
Normal file
47
core/http/endpoints/openai/realtime_webrtc_ice.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/xlog"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
// webRTCSettingEngine builds the pion SettingEngine for /v1/realtime WebRTC.
|
||||
//
|
||||
// With a default (empty) SettingEngine, pion gathers a host ICE candidate for
|
||||
// every local interface. Under Docker host networking that includes bridge
|
||||
// addresses (docker0/veth, 172.x) that a remote browser cannot route to; the
|
||||
// connection often establishes on a good pair and then drops once ICE consent
|
||||
// checks fail on the unreachable ones. The two opt-in knobs below let an
|
||||
// operator advertise only the reachable address.
|
||||
func webRTCSettingEngine(cfg *config.ApplicationConfig) webrtc.SettingEngine {
|
||||
s := webrtc.SettingEngine{}
|
||||
if cfg == nil {
|
||||
return s
|
||||
}
|
||||
if len(cfg.WebRTCNAT1To1IPs) > 0 {
|
||||
s.SetNAT1To1IPs(cfg.WebRTCNAT1To1IPs, webrtc.ICECandidateTypeHost)
|
||||
xlog.Debug("realtime webrtc: advertising NAT 1:1 host IPs", "ips", cfg.WebRTCNAT1To1IPs)
|
||||
}
|
||||
if filter := iceInterfaceFilter(cfg.WebRTCICEInterfaces); filter != nil {
|
||||
s.SetInterfaceFilter(filter)
|
||||
xlog.Debug("realtime webrtc: restricting ICE interfaces", "interfaces", cfg.WebRTCICEInterfaces)
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
// iceInterfaceFilter returns an interface allow-list predicate for pion, or nil
|
||||
// when no interfaces are configured (pion's default: gather from all).
|
||||
func iceInterfaceFilter(allowed []string) func(string) bool {
|
||||
if len(allowed) == 0 {
|
||||
return nil
|
||||
}
|
||||
set := make(map[string]struct{}, len(allowed))
|
||||
for _, name := range allowed {
|
||||
set[name] = struct{}{}
|
||||
}
|
||||
return func(iface string) bool {
|
||||
_, ok := set[iface]
|
||||
return ok
|
||||
}
|
||||
}
|
||||
39
core/http/endpoints/openai/realtime_webrtc_ice_test.go
Normal file
39
core/http/endpoints/openai/realtime_webrtc_ice_test.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package openai
|
||||
|
||||
import (
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("webRTC ICE settings", func() {
|
||||
Describe("iceInterfaceFilter", func() {
|
||||
It("returns nil when no interfaces are configured", func() {
|
||||
Expect(iceInterfaceFilter(nil)).To(BeNil())
|
||||
Expect(iceInterfaceFilter([]string{})).To(BeNil())
|
||||
})
|
||||
|
||||
It("admits only the configured interfaces", func() {
|
||||
f := iceInterfaceFilter([]string{"eth0", "wlan0"})
|
||||
Expect(f).NotTo(BeNil())
|
||||
Expect(f("eth0")).To(BeTrue())
|
||||
Expect(f("wlan0")).To(BeTrue())
|
||||
Expect(f("docker0")).To(BeFalse())
|
||||
Expect(f("veth123")).To(BeFalse())
|
||||
})
|
||||
})
|
||||
|
||||
Describe("webRTCSettingEngine", func() {
|
||||
It("does not panic on a nil config", func() {
|
||||
Expect(func() { webRTCSettingEngine(nil) }).NotTo(Panic())
|
||||
})
|
||||
|
||||
It("builds an engine with NAT 1:1 IPs and an interface filter configured", func() {
|
||||
cfg := &config.ApplicationConfig{
|
||||
WebRTCNAT1To1IPs: []string{"192.168.1.10"},
|
||||
WebRTCICEInterfaces: []string{"eth0"},
|
||||
}
|
||||
Expect(func() { webRTCSettingEngine(cfg) }).NotTo(Panic())
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -1356,7 +1356,7 @@ func handleOpenResponsesNonStream(c echo.Context, responseID string, createdAt i
|
||||
thinkingStartToken := reason.DetectThinkingStartToken(template, &cfg.ReasoningConfig)
|
||||
|
||||
// Extract reasoning from result before cleaning
|
||||
reasoningContent, cleanedResult := reason.ExtractReasoningWithConfig(result, thinkingStartToken, cfg.ReasoningConfig)
|
||||
reasoningContent, cleanedResult := reason.ExtractReasoningComplete(result, thinkingStartToken, cfg.ReasoningConfig)
|
||||
|
||||
// Parse tool calls if using functions
|
||||
var outputItems []schema.ORItemField
|
||||
@@ -1996,7 +1996,7 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
|
||||
finalCleanedResult = extractor.CleanedContent()
|
||||
}
|
||||
if finalReasoning == "" && finalCleanedResult == "" {
|
||||
finalReasoning, finalCleanedResult = reason.ExtractReasoningWithConfig(result, thinkingStartToken, cfg.ReasoningConfig)
|
||||
finalReasoning, finalCleanedResult = reason.ExtractReasoningComplete(result, thinkingStartToken, cfg.ReasoningConfig)
|
||||
}
|
||||
|
||||
// Close reasoning item if it exists and wasn't closed yet
|
||||
@@ -2493,7 +2493,7 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
|
||||
finalCleanedResult = extractor.CleanedContent()
|
||||
}
|
||||
if finalReasoning == "" && finalCleanedResult == "" {
|
||||
finalReasoning, finalCleanedResult = reason.ExtractReasoningWithConfig(result, thinkingStartToken, cfg.ReasoningConfig)
|
||||
finalReasoning, finalCleanedResult = reason.ExtractReasoningComplete(result, thinkingStartToken, cfg.ReasoningConfig)
|
||||
}
|
||||
|
||||
// Close reasoning item if it exists and wasn't closed yet
|
||||
|
||||
6
core/http/react-ui/src/hooks/useChat.js
vendored
6
core/http/react-ui/src/hooks/useChat.js
vendored
@@ -216,6 +216,12 @@ export function useChat(initialModel = '') {
|
||||
audio_url: { url: `data:${file.type};base64,${file.base64}` },
|
||||
})
|
||||
userFiles.push({ name: file.name, type: 'audio' })
|
||||
} else if (file.type?.startsWith('video/')) {
|
||||
messageContent.push({
|
||||
type: 'video_url',
|
||||
video_url: { url: `data:${file.type};base64,${file.base64}` },
|
||||
})
|
||||
userFiles.push({ name: file.name, type: 'video' })
|
||||
} else {
|
||||
// Text/PDF files - append to content
|
||||
if (file.textContent) {
|
||||
|
||||
@@ -265,7 +265,7 @@ function UserMessageContent({ content, files }) {
|
||||
<div className="chat-message-files">
|
||||
{files.map((f, i) => (
|
||||
<span key={i} className="chat-file-inline">
|
||||
<i className={`fas ${f.type === 'image' ? 'fa-image' : f.type === 'audio' ? 'fa-headphones' : 'fa-file'}`} />
|
||||
<i className={`fas ${f.type === 'image' ? 'fa-image' : f.type === 'audio' ? 'fa-headphones' : f.type === 'video' ? 'fa-film' : 'fa-file'}`} />
|
||||
{f.name}
|
||||
</span>
|
||||
))}
|
||||
@@ -274,6 +274,9 @@ function UserMessageContent({ content, files }) {
|
||||
{Array.isArray(content) && content.filter(c => c.type === 'image_url').map((img, i) => (
|
||||
<img key={i} src={img.image_url.url} alt="attached" className="chat-inline-image" />
|
||||
))}
|
||||
{Array.isArray(content) && content.filter(c => c.type === 'video_url').map((vid, i) => (
|
||||
<video key={i} src={vid.video_url.url} controls className="chat-inline-video" />
|
||||
))}
|
||||
</>
|
||||
)
|
||||
}
|
||||
@@ -711,7 +714,7 @@ export default function Chat() {
|
||||
for (const file of e.target.files) {
|
||||
const base64 = await fileToBase64(file)
|
||||
const entry = { name: file.name, type: file.type, base64 }
|
||||
if (!file.type.startsWith('image/') && !file.type.startsWith('audio/')) {
|
||||
if (!file.type.startsWith('image/') && !file.type.startsWith('audio/') && !file.type.startsWith('video/')) {
|
||||
entry.textContent = await file.text().catch(() => '')
|
||||
}
|
||||
newFiles.push(entry)
|
||||
@@ -1244,7 +1247,7 @@ export default function Chat() {
|
||||
<div className="chat-files">
|
||||
{files.map((f, i) => (
|
||||
<span key={i} className="chat-file-badge">
|
||||
<i className={`fas ${f.type?.startsWith('image/') ? 'fa-image' : f.type?.startsWith('audio/') ? 'fa-headphones' : 'fa-file'}`} />
|
||||
<i className={`fas ${f.type?.startsWith('image/') ? 'fa-image' : f.type?.startsWith('audio/') ? 'fa-headphones' : f.type?.startsWith('video/') ? 'fa-film' : 'fa-file'}`} />
|
||||
{f.name}
|
||||
<button onClick={() => setFiles(prev => prev.filter((_, idx) => idx !== i))}>
|
||||
<i className="fas fa-xmark" />
|
||||
@@ -1343,7 +1346,7 @@ export default function Chat() {
|
||||
ref={fileInputRef}
|
||||
type="file"
|
||||
multiple
|
||||
accept="image/*,audio/*,application/pdf,.txt,.md,.csv,.json"
|
||||
accept="image/*,audio/*,video/*,application/pdf,.txt,.md,.csv,.json"
|
||||
style={{ display: 'none' }}
|
||||
onChange={handleFileChange}
|
||||
/>
|
||||
|
||||
@@ -17,6 +17,24 @@ const STATUS_STYLES = {
|
||||
error: { icon: 'fa-solid fa-circle', color: 'var(--color-error)', bg: 'var(--color-error-light)' },
|
||||
}
|
||||
|
||||
// upsertAssistant merges a streamed transcript fragment into the assistant entry
|
||||
// identified by the server's item_id, or appends a new entry if none exists yet.
|
||||
// Keying by item_id (not a mutable index tracked across handler/updater
|
||||
// boundaries) makes streamed deltas idempotent and order-independent, so React's
|
||||
// batching of non-React data-channel events cannot produce a duplicate bubble.
|
||||
// mode 'append' adds to the running text; 'replace' sets the final transcript.
|
||||
function upsertAssistant(prev, itemId, text, mode) {
|
||||
// Only assistant entries carry an id, and the streaming entry is almost
|
||||
// always the newest — search from the tail so per-delta cost stays constant.
|
||||
const i = prev.findLastIndex(e => e.id === itemId)
|
||||
if (i === -1) {
|
||||
return [...prev, { role: 'assistant', id: itemId, text }]
|
||||
}
|
||||
const next = [...prev]
|
||||
next[i] = { ...next[i], text: mode === 'append' ? next[i].text + text : text }
|
||||
return next
|
||||
}
|
||||
|
||||
export default function Talk() {
|
||||
const { addToast } = useOutletContext()
|
||||
const navigate = useNavigate()
|
||||
@@ -34,7 +52,10 @@ export default function Talk() {
|
||||
|
||||
// Transcript
|
||||
const [transcript, setTranscript] = useState([])
|
||||
const streamingRef = useRef(null) // tracks the index of the in-progress assistant message
|
||||
// item_id of the assistant message currently streaming — used only to remove
|
||||
// its partial bubble when a response is cancelled (barge-in). The transcript
|
||||
// itself is keyed by item_id via upsertAssistant, not by this ref.
|
||||
const inProgressIdRef = useRef(null)
|
||||
|
||||
// Session settings
|
||||
const [instructions, setInstructions] = useState(
|
||||
@@ -227,39 +248,21 @@ export default function Talk() {
|
||||
break
|
||||
case 'conversation.item.input_audio_transcription.completed':
|
||||
if (event.transcript) {
|
||||
streamingRef.current = null
|
||||
setTranscript(prev => [...prev, { role: 'user', text: event.transcript }])
|
||||
}
|
||||
updateStatus('thinking', 'Generating response...')
|
||||
break
|
||||
case 'response.output_audio_transcript.delta':
|
||||
if (event.delta) {
|
||||
setTranscript(prev => {
|
||||
if (streamingRef.current !== null) {
|
||||
const updated = [...prev]
|
||||
updated[streamingRef.current] = {
|
||||
...updated[streamingRef.current],
|
||||
text: updated[streamingRef.current].text + event.delta,
|
||||
}
|
||||
return updated
|
||||
}
|
||||
streamingRef.current = prev.length
|
||||
return [...prev, { role: 'assistant', text: event.delta }]
|
||||
})
|
||||
inProgressIdRef.current = event.item_id
|
||||
setTranscript(prev => upsertAssistant(prev, event.item_id, event.delta, 'append'))
|
||||
}
|
||||
break
|
||||
case 'response.output_audio_transcript.done':
|
||||
if (event.transcript) {
|
||||
setTranscript(prev => {
|
||||
if (streamingRef.current !== null) {
|
||||
const updated = [...prev]
|
||||
updated[streamingRef.current] = { ...updated[streamingRef.current], text: event.transcript }
|
||||
return updated
|
||||
}
|
||||
return [...prev, { role: 'assistant', text: event.transcript }]
|
||||
})
|
||||
setTranscript(prev => upsertAssistant(prev, event.item_id, event.transcript, 'replace'))
|
||||
}
|
||||
streamingRef.current = null
|
||||
inProgressIdRef.current = null
|
||||
break
|
||||
case 'response.output_audio.delta':
|
||||
updateStatus('speaking', 'Speaking...')
|
||||
@@ -281,7 +284,7 @@ export default function Talk() {
|
||||
// Pretty-print JSON for readability; fall back to raw string.
|
||||
try { preview = JSON.stringify(JSON.parse(preview), null, 2) } catch (_) { /* keep raw */ }
|
||||
setTranscript(prev => [...prev, { role: 'tool_result', text: preview }])
|
||||
streamingRef.current = null // tool result ends the current assistant text run
|
||||
inProgressIdRef.current = null // tool result ends the current assistant text run
|
||||
}
|
||||
break
|
||||
}
|
||||
@@ -290,9 +293,20 @@ export default function Talk() {
|
||||
// conversation.item.create + response.create when it's done.
|
||||
handleFunctionCall(event)
|
||||
break
|
||||
case 'response.done':
|
||||
case 'response.done': {
|
||||
// A cancelled response (barge-in / interruption) leaves a partial,
|
||||
// incrementally-streamed assistant bubble behind. The server discards
|
||||
// the interrupted item from history; mirror that here (remove the
|
||||
// in-progress assistant entry by item_id) so the regenerated reply
|
||||
// doesn't show up as a second assistant message.
|
||||
if (event.response?.status === 'cancelled' && inProgressIdRef.current) {
|
||||
const id = inProgressIdRef.current
|
||||
inProgressIdRef.current = null
|
||||
setTranscript(prev => prev.filter(e => e.id !== id))
|
||||
}
|
||||
updateStatus('listening', 'Listening...')
|
||||
break
|
||||
}
|
||||
case 'error':
|
||||
hasErrorRef.current = true
|
||||
updateStatus('error', 'Error: ' + (event.error?.message || 'Unknown error'))
|
||||
@@ -789,7 +803,7 @@ export default function Talk() {
|
||||
const iconColor = isToolCall || isToolResult ? 'var(--color-text-secondary)'
|
||||
: isUser ? 'var(--color-primary)' : 'var(--color-accent)'
|
||||
return (
|
||||
<div key={i} style={{ display: 'flex', alignItems: 'flex-start', gap: 'var(--spacing-xs)' }}>
|
||||
<div key={entry.id || i} style={{ display: 'flex', alignItems: 'flex-start', gap: 'var(--spacing-xs)' }}>
|
||||
<i className={iconClass} style={{ color: iconColor, marginTop: 3, flexShrink: 0, fontSize: '0.75rem' }} />
|
||||
<p style={{
|
||||
margin: 0,
|
||||
|
||||
@@ -466,10 +466,11 @@ func (s *AgentPoolService) Chat(name, message string) (string, error) {
|
||||
s.collectAndCopyMetadata(metadata, chatUserID)
|
||||
}
|
||||
|
||||
content := s.appendLocalAGIKBCitations(response.Response, name, message, response.State)
|
||||
msg := map[string]any{
|
||||
"id": messageID + "-agent",
|
||||
"sender": "agent",
|
||||
"content": response.Response,
|
||||
"content": content,
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
}
|
||||
if len(metadata) > 0 {
|
||||
@@ -489,6 +490,79 @@ func (s *AgentPoolService) Chat(name, message string) (string, error) {
|
||||
return messageID, nil
|
||||
}
|
||||
|
||||
func (s *AgentPoolService) appendLocalAGIKBCitations(response, agentKey, message string, states []coreTypes.ActionState) string {
|
||||
if strings.TrimSpace(response) == "" {
|
||||
return response
|
||||
}
|
||||
|
||||
userID, collection := splitAgentKey(agentKey)
|
||||
cfg := s.localAGI.pool.GetConfig(agentKey)
|
||||
if cfg == nil || !cfg.EnableKnowledgeBase {
|
||||
return response
|
||||
}
|
||||
|
||||
citations := kbCitationsFromActionStates(states)
|
||||
if len(citations) == 0 && cfg.KBAutoSearch {
|
||||
maxResults := cfg.KnowledgeBaseResults
|
||||
if maxResults <= 0 {
|
||||
maxResults = 5
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
kbResult := agents.KBAutoSearchPrompt(ctx, s.apiURL, s.apiKey, collection, message, maxResults, userID)
|
||||
citations = kbResult.Citations
|
||||
}
|
||||
|
||||
return agents.AppendKBCitations(response, collection, userID, citations)
|
||||
}
|
||||
|
||||
func splitAgentKey(agentKey string) (userID, name string) {
|
||||
if uid, n, ok := strings.Cut(agentKey, ":"); ok {
|
||||
return uid, n
|
||||
}
|
||||
return "", agentKey
|
||||
}
|
||||
|
||||
func kbCitationsFromActionStates(states []coreTypes.ActionState) []agents.KBCitation {
|
||||
var citations []agents.KBCitation
|
||||
for _, state := range states {
|
||||
citations = append(citations, kbCitationsFromMetadata(state.Metadata)...)
|
||||
}
|
||||
return citations
|
||||
}
|
||||
|
||||
func kbCitationsFromMetadata(metadata map[string]any) []agents.KBCitation {
|
||||
if len(metadata) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
fileName := metadata["file_name"]
|
||||
source := metadata["source"]
|
||||
if fileName == nil && source == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
citation := agents.KBCitation{
|
||||
FileName: metadataString(fileName),
|
||||
EntryKey: metadataString(source),
|
||||
}
|
||||
if citation.FileName == "" && citation.EntryKey == "" {
|
||||
return nil
|
||||
}
|
||||
return []agents.KBCitation{citation}
|
||||
}
|
||||
|
||||
func metadataString(value any) string {
|
||||
switch v := value.(type) {
|
||||
case string:
|
||||
return v
|
||||
case fmt.Stringer:
|
||||
return v.String()
|
||||
default:
|
||||
return ""
|
||||
}
|
||||
}
|
||||
|
||||
// userOutputsDir returns the per-user outputs directory, creating it if needed.
|
||||
// If userID is empty, falls back to the shared outputs directory.
|
||||
func (s *AgentPoolService) userOutputsDir(userID string) string {
|
||||
|
||||
127
core/services/agents/citations.go
Normal file
127
core/services/agents/citations.go
Normal file
@@ -0,0 +1,127 @@
|
||||
package agents
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type kbCitationList struct {
|
||||
mu sync.Mutex
|
||||
citations []KBCitation
|
||||
}
|
||||
|
||||
func (l *kbCitationList) AddKBCitations(citations []KBCitation) {
|
||||
if len(citations) == 0 {
|
||||
return
|
||||
}
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
l.citations = append(l.citations, citations...)
|
||||
}
|
||||
|
||||
func (l *kbCitationList) Citations() []KBCitation {
|
||||
l.mu.Lock()
|
||||
defer l.mu.Unlock()
|
||||
out := make([]KBCitation, len(l.citations))
|
||||
copy(out, l.citations)
|
||||
return out
|
||||
}
|
||||
|
||||
// AppendKBCitations appends a markdown Sources block for KB citations.
|
||||
func AppendKBCitations(response, collection, userID string, citations []KBCitation) string {
|
||||
if strings.TrimSpace(response) == "" || len(citations) == 0 {
|
||||
return response
|
||||
}
|
||||
|
||||
var lines []string
|
||||
seen := make(map[string]struct{})
|
||||
for _, citation := range citations {
|
||||
key := strings.TrimSpace(citation.EntryKey)
|
||||
if key == "" {
|
||||
key = strings.TrimSpace(citation.FileName)
|
||||
}
|
||||
if key == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[key]; ok {
|
||||
continue
|
||||
}
|
||||
seen[key] = struct{}{}
|
||||
|
||||
displayName := kbCitationDisplayName(citation)
|
||||
if displayName == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
sourceURL := kbCitationRawFileURL(collection, citation.EntryKey, userID)
|
||||
number := len(lines) + 1
|
||||
if sourceURL == "" {
|
||||
lines = append(lines, fmt.Sprintf("[%d] %s", number, displayName))
|
||||
continue
|
||||
}
|
||||
lines = append(lines, fmt.Sprintf("[%d] [%s](%s)", number, escapeMarkdownLinkText(displayName), sourceURL))
|
||||
}
|
||||
|
||||
if len(lines) == 0 {
|
||||
return response
|
||||
}
|
||||
|
||||
var sb strings.Builder
|
||||
sb.WriteString(strings.TrimRight(response, "\n"))
|
||||
sb.WriteString("\n\nSources:\n")
|
||||
for _, line := range lines {
|
||||
sb.WriteString(line)
|
||||
sb.WriteString("\n")
|
||||
}
|
||||
return strings.TrimRight(sb.String(), "\n")
|
||||
}
|
||||
|
||||
func kbCitationDisplayName(citation KBCitation) string {
|
||||
if fileName := strings.TrimSpace(citation.FileName); fileName != "" {
|
||||
return fileName
|
||||
}
|
||||
|
||||
segments := strings.Split(strings.Trim(strings.TrimSpace(citation.EntryKey), "/"), "/")
|
||||
for i := len(segments) - 1; i >= 0; i-- {
|
||||
if segment := strings.TrimSpace(segments[i]); segment != "" {
|
||||
return segment
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func kbCitationRawFileURL(collection, entryKey, userID string) string {
|
||||
collection = strings.TrimSpace(collection)
|
||||
entryKey = strings.Trim(strings.TrimSpace(entryKey), "/")
|
||||
if collection == "" || entryKey == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
var escapedEntrySegments []string
|
||||
for _, segment := range strings.Split(entryKey, "/") {
|
||||
if segment == "" {
|
||||
continue
|
||||
}
|
||||
escapedEntrySegments = append(escapedEntrySegments, url.PathEscape(segment))
|
||||
}
|
||||
if len(escapedEntrySegments) == 0 {
|
||||
return ""
|
||||
}
|
||||
|
||||
sourceURL := "/api/agents/collections/" + url.PathEscape(collection) + "/entries-raw/" + strings.Join(escapedEntrySegments, "/")
|
||||
if userID != "" {
|
||||
query := url.Values{}
|
||||
query.Set("user_id", userID)
|
||||
sourceURL += "?" + query.Encode()
|
||||
}
|
||||
return sourceURL
|
||||
}
|
||||
|
||||
func escapeMarkdownLinkText(text string) string {
|
||||
text = strings.ReplaceAll(text, `\`, `\\`)
|
||||
text = strings.ReplaceAll(text, "[", `\[`)
|
||||
text = strings.ReplaceAll(text, "]", `\]`)
|
||||
return text
|
||||
}
|
||||
@@ -167,10 +167,12 @@ func ExecuteChatWithLLM(ctx context.Context, llm cogito.LLM, cfg *AgentConfig, m
|
||||
}
|
||||
}
|
||||
|
||||
kbCitations := &kbCitationList{}
|
||||
if cfg.EnableKnowledgeBase && (kbMode == KBModeAutoSearch || kbMode == KBModeBoth) {
|
||||
kbResults := KBAutoSearchPrompt(ctx, effectiveURL, effectiveKey, cfg.Name, message, cfg.KnowledgeBaseResults, userID)
|
||||
if kbResults != "" {
|
||||
fragment = fragment.AddMessage(cogito.SystemMessageRole, kbResults)
|
||||
kbResult := KBAutoSearchPrompt(ctx, effectiveURL, effectiveKey, cfg.Name, message, cfg.KnowledgeBaseResults, userID)
|
||||
if kbResult.Prompt != "" {
|
||||
fragment = fragment.AddMessage(cogito.SystemMessageRole, kbResult.Prompt)
|
||||
kbCitations.AddKBCitations(kbResult.Citations)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -197,7 +199,7 @@ func ExecuteChatWithLLM(ctx context.Context, llm cogito.LLM, cfg *AgentConfig, m
|
||||
}
|
||||
cogitoOpts = append(cogitoOpts, cogito.WithTools(
|
||||
cogito.NewToolDefinition(
|
||||
KBSearchMemoryTool{APIURL: effectiveURL, APIKey: effectiveKey, Collection: cfg.Name, MaxResults: kbResults, UserID: userID},
|
||||
KBSearchMemoryTool{APIURL: effectiveURL, APIKey: effectiveKey, Collection: cfg.Name, MaxResults: kbResults, UserID: userID, CitationCollector: kbCitations},
|
||||
KBSearchMemoryArgs{},
|
||||
"search_memory",
|
||||
"Search the knowledge base for relevant information",
|
||||
@@ -336,6 +338,8 @@ func ExecuteChatWithLLM(ctx context.Context, llm cogito.LLM, cfg *AgentConfig, m
|
||||
if cfg.StripThinkingTags && response != "" {
|
||||
response = stripThinkingTags(response)
|
||||
}
|
||||
responseForMemory := response
|
||||
response = AppendKBCitations(response, cfg.Name, userID, kbCitations.Citations())
|
||||
|
||||
// Save conversation to KB when long-term memory is enabled.
|
||||
// Use a detached context: the parent ctx may be cancelled (e.g. in distributed
|
||||
@@ -344,7 +348,7 @@ func ExecuteChatWithLLM(ctx context.Context, llm cogito.LLM, cfg *AgentConfig, m
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
saveConversationToKB(ctx, llm, effectiveURL, effectiveKey, cfg, message, response, userID)
|
||||
saveConversationToKB(ctx, llm, effectiveURL, effectiveKey, cfg, message, responseForMemory, userID)
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
@@ -2,6 +2,8 @@ package agents
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
@@ -36,6 +38,34 @@ func (m *mockLLM) CreateChatCompletion(ctx context.Context, req openai.ChatCompl
|
||||
}, cogito.LLMUsage{}, nil
|
||||
}
|
||||
|
||||
type toolCallingMockLLM struct {
|
||||
createResponses []openai.ChatCompletionResponse
|
||||
askResponse string
|
||||
callCount atomic.Int32
|
||||
}
|
||||
|
||||
func (m *toolCallingMockLLM) Ask(ctx context.Context, f cogito.Fragment) (cogito.Fragment, error) {
|
||||
m.callCount.Add(1)
|
||||
return f.AddMessage(cogito.AssistantMessageRole, m.askResponse), nil
|
||||
}
|
||||
|
||||
func (m *toolCallingMockLLM) CreateChatCompletion(ctx context.Context, req openai.ChatCompletionRequest) (cogito.LLMReply, cogito.LLMUsage, error) {
|
||||
idx := int(m.callCount.Add(1)) - 1
|
||||
if idx >= len(m.createResponses) {
|
||||
return cogito.LLMReply{
|
||||
ChatCompletionResponse: openai.ChatCompletionResponse{
|
||||
Choices: []openai.ChatCompletionChoice{{
|
||||
Message: openai.ChatCompletionMessage{
|
||||
Role: "assistant",
|
||||
Content: "No more tools needed.",
|
||||
},
|
||||
}},
|
||||
},
|
||||
}, cogito.LLMUsage{}, nil
|
||||
}
|
||||
return cogito.LLMReply{ChatCompletionResponse: m.createResponses[idx]}, cogito.LLMUsage{}, nil
|
||||
}
|
||||
|
||||
// statusCollector records status callbacks in a thread-safe way.
|
||||
type statusCollector struct {
|
||||
mu sync.Mutex
|
||||
@@ -73,6 +103,74 @@ var _ = DescribeTable("stripThinkingTags",
|
||||
Entry("adjacent tag pairs", "<thinking>a</thinking><thinking>b</thinking>", ""),
|
||||
)
|
||||
|
||||
var _ = DescribeTable("appendKBCitations",
|
||||
func(response, collection, userID string, citations []KBCitation, want string) {
|
||||
Expect(AppendKBCitations(response, collection, userID, citations)).To(Equal(want))
|
||||
},
|
||||
Entry("leaves responses without citations unchanged",
|
||||
"answer",
|
||||
"agent",
|
||||
"",
|
||||
nil,
|
||||
"answer",
|
||||
),
|
||||
Entry("leaves blank responses unchanged",
|
||||
"",
|
||||
"agent",
|
||||
"",
|
||||
[]KBCitation{{FileName: "source.pdf", EntryKey: "uuid/source.pdf"}},
|
||||
"",
|
||||
),
|
||||
Entry("appends clickable source links",
|
||||
"answer",
|
||||
"my-agent",
|
||||
"",
|
||||
[]KBCitation{{FileName: "new feature.pdf", EntryKey: "uuid/new feature.pdf"}},
|
||||
"answer\n\nSources:\n[1] [new feature.pdf](/api/agents/collections/my-agent/entries-raw/uuid/new%20feature.pdf)",
|
||||
),
|
||||
Entry("deduplicates citations by entry key",
|
||||
"answer",
|
||||
"agent",
|
||||
"",
|
||||
[]KBCitation{
|
||||
{FileName: "first.pdf", EntryKey: "uuid/shared.pdf"},
|
||||
{FileName: "second.pdf", EntryKey: "uuid/shared.pdf"},
|
||||
},
|
||||
"answer\n\nSources:\n[1] [first.pdf](/api/agents/collections/agent/entries-raw/uuid/shared.pdf)",
|
||||
),
|
||||
Entry("uses plain text when entry key is missing",
|
||||
"answer",
|
||||
"agent",
|
||||
"",
|
||||
[]KBCitation{{FileName: "source.pdf"}},
|
||||
"answer\n\nSources:\n[1] source.pdf",
|
||||
),
|
||||
Entry("uses entry basename when filename is missing",
|
||||
"answer",
|
||||
"agent",
|
||||
"",
|
||||
[]KBCitation{{EntryKey: "uuid/source.pdf"}},
|
||||
"answer\n\nSources:\n[1] [source.pdf](/api/agents/collections/agent/entries-raw/uuid/source.pdf)",
|
||||
),
|
||||
Entry("adds user id query when present",
|
||||
"answer",
|
||||
"agent",
|
||||
"user 1",
|
||||
[]KBCitation{{FileName: "source.pdf", EntryKey: "uuid/source.pdf"}},
|
||||
"answer\n\nSources:\n[1] [source.pdf](/api/agents/collections/agent/entries-raw/uuid/source.pdf?user_id=user+1)",
|
||||
),
|
||||
Entry("escapes collection, path segments, and markdown link text",
|
||||
"answer",
|
||||
"agent one",
|
||||
"",
|
||||
[]KBCitation{{FileName: "source [draft].pdf", EntryKey: "uuid/source [draft].pdf"}},
|
||||
`answer
|
||||
|
||||
Sources:
|
||||
[1] [source \[draft\].pdf](/api/agents/collections/agent%20one/entries-raw/uuid/source%20%5Bdraft%5D.pdf)`,
|
||||
),
|
||||
)
|
||||
|
||||
var _ = Describe("ExecuteChatWithLLM", func() {
|
||||
var (
|
||||
ctx context.Context
|
||||
@@ -184,6 +282,150 @@ var _ = Describe("ExecuteChatWithLLM", func() {
|
||||
})
|
||||
})
|
||||
|
||||
Context("knowledge base citations", func() {
|
||||
It("appends KB sources to the returned response and callback message", func() {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/api/agents/collections/kb-agent/search", func(w http.ResponseWriter, r *http.Request) {
|
||||
Expect(r.URL.Query().Get("user_id")).To(Equal("user-1"))
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{
|
||||
"results": [
|
||||
{
|
||||
"content": "KB content",
|
||||
"id": "result-1",
|
||||
"similarity": 0.99,
|
||||
"metadata": {
|
||||
"file_name": "new feature.pdf",
|
||||
"source": "uuid/new feature.pdf"
|
||||
}
|
||||
}
|
||||
],
|
||||
"count": 1
|
||||
}`))
|
||||
})
|
||||
server := httptest.NewServer(mux)
|
||||
defer server.Close()
|
||||
|
||||
var msgContent string
|
||||
cb.OnMessage = func(sender, content, messageID string) {
|
||||
msgContent = content
|
||||
}
|
||||
|
||||
llm := &mockLLM{response: "agent reply"}
|
||||
cfg := &AgentConfig{
|
||||
Name: "kb-agent",
|
||||
Model: "test-model",
|
||||
EnableKnowledgeBase: true,
|
||||
KBMode: KBModeAutoSearch,
|
||||
}
|
||||
|
||||
result, err := ExecuteChatWithLLM(ctx, llm, cfg, "hello", cb, ExecuteChatOpts{
|
||||
APIURL: server.URL,
|
||||
UserID: "user-1",
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(result).To(Equal("agent reply\n\nSources:\n[1] [new feature.pdf](/api/agents/collections/kb-agent/entries-raw/uuid/new%20feature.pdf?user_id=user-1)"))
|
||||
Expect(msgContent).To(Equal(result))
|
||||
})
|
||||
|
||||
It("collects citations from the search_memory tool", func() {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/api/agents/collections/kb-agent/search", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{
|
||||
"results": [
|
||||
{
|
||||
"content": "Tool KB content",
|
||||
"id": "result-1",
|
||||
"similarity": 0.99,
|
||||
"metadata": {
|
||||
"file_name": "tool source.pdf",
|
||||
"source": "uuid/tool source.pdf"
|
||||
}
|
||||
}
|
||||
],
|
||||
"count": 1
|
||||
}`))
|
||||
})
|
||||
server := httptest.NewServer(mux)
|
||||
defer server.Close()
|
||||
|
||||
collector := &kbCitationList{}
|
||||
tool := KBSearchMemoryTool{
|
||||
APIURL: server.URL,
|
||||
Collection: "kb-agent",
|
||||
CitationCollector: collector,
|
||||
}
|
||||
|
||||
result, _, err := tool.Run(KBSearchMemoryArgs{Query: "hello"})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(result).To(ContainSubstring("Tool KB content"))
|
||||
Expect(collector.Citations()).To(Equal([]KBCitation{{FileName: "tool source.pdf", EntryKey: "uuid/tool source.pdf"}}))
|
||||
})
|
||||
|
||||
It("appends KB sources found through tools-only search_memory calls", func() {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/api/agents/collections/kb-agent/search", func(w http.ResponseWriter, r *http.Request) {
|
||||
Expect(r.URL.Query().Get("user_id")).To(Equal("user-1"))
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_, _ = w.Write([]byte(`{
|
||||
"results": [
|
||||
{
|
||||
"content": "Tool KB content",
|
||||
"id": "result-1",
|
||||
"similarity": 0.99,
|
||||
"metadata": {
|
||||
"file_name": "tool source.pdf",
|
||||
"source": "uuid/tool source.pdf"
|
||||
}
|
||||
}
|
||||
],
|
||||
"count": 1
|
||||
}`))
|
||||
})
|
||||
server := httptest.NewServer(mux)
|
||||
defer server.Close()
|
||||
|
||||
llm := &toolCallingMockLLM{
|
||||
askResponse: "agent reply from tool context",
|
||||
createResponses: []openai.ChatCompletionResponse{
|
||||
{
|
||||
Choices: []openai.ChatCompletionChoice{
|
||||
{
|
||||
Message: openai.ChatCompletionMessage{
|
||||
Role: "assistant",
|
||||
ToolCalls: []openai.ToolCall{
|
||||
{
|
||||
ID: "call-1",
|
||||
Type: openai.ToolTypeFunction,
|
||||
Function: openai.FunctionCall{
|
||||
Name: "search_memory",
|
||||
Arguments: `{"query":"hello"}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
cfg := &AgentConfig{
|
||||
Name: "kb-agent",
|
||||
Model: "test-model",
|
||||
EnableKnowledgeBase: true,
|
||||
KBMode: KBModeTools,
|
||||
}
|
||||
|
||||
result, err := ExecuteChatWithLLM(ctx, llm, cfg, "hello", cb, ExecuteChatOpts{
|
||||
APIURL: server.URL,
|
||||
UserID: "user-1",
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(result).To(Equal("agent reply from tool context\n\nSources:\n[1] [tool source.pdf](/api/agents/collections/kb-agent/entries-raw/uuid/tool%20source.pdf?user_id=user-1)"))
|
||||
})
|
||||
})
|
||||
|
||||
Context("context cancellation", func() {
|
||||
It("returns an error when context is already cancelled", func() {
|
||||
cancelledCtx, cancel := context.WithCancel(ctx)
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -17,10 +18,19 @@ import (
|
||||
"github.com/mudler/LocalAI/pkg/httpclient"
|
||||
)
|
||||
|
||||
// Metadata keys populated by localrecall for every stored chunk. The original
|
||||
// upload file name lives under file_name (used for display); source holds the
|
||||
// collection entry key ("<uuid>/<filename>") used to build the raw-file URL.
|
||||
const (
|
||||
kbMetadataFileName = "file_name"
|
||||
kbMetadataSource = "source"
|
||||
)
|
||||
|
||||
// KBSearchResult represents a search result from the knowledge base.
|
||||
// Field names mirror the collection search endpoint's JSON response.
|
||||
type KBSearchResult struct {
|
||||
Content string `json:"content"`
|
||||
Score float64 `json:"score"`
|
||||
ID string `json:"id"`
|
||||
Similarity float64 `json:"similarity"`
|
||||
Metadata map[string]string `json:"metadata"`
|
||||
}
|
||||
@@ -31,22 +41,48 @@ type kbSearchResponse struct {
|
||||
Count int `json:"count"`
|
||||
}
|
||||
|
||||
// KBAutoSearchPrompt queries the knowledge base with the user's message
|
||||
// and returns a system prompt block with relevant results.
|
||||
// KBCitation is a single source document that a KB search drew from. Citations
|
||||
// travel alongside the prompt as structured data so the consumer (and UI) can
|
||||
// render clickable source links, independent of what the model writes inline.
|
||||
type KBCitation struct {
|
||||
// FileName is the original uploaded file name, for display (e.g. "report.pdf").
|
||||
FileName string `json:"file_name"`
|
||||
// EntryKey is the collection entry identifier ("<uuid>/<filename>"), used to
|
||||
// build the raw-file URL and as the de-duplication key.
|
||||
EntryKey string `json:"entry_key"`
|
||||
}
|
||||
|
||||
// KBSearchContext is the result of an auto-search against the knowledge base:
|
||||
// the system-prompt block to feed the model, plus the de-duplicated list of
|
||||
// source documents the results were drawn from.
|
||||
type KBSearchContext struct {
|
||||
Prompt string `json:"prompt"`
|
||||
Citations []KBCitation `json:"citations"`
|
||||
}
|
||||
|
||||
// KBCitationCollector receives source citations found during KB searches.
|
||||
type KBCitationCollector interface {
|
||||
AddKBCitations([]KBCitation)
|
||||
}
|
||||
|
||||
// KBAutoSearchPrompt queries the knowledge base with the user's message and
|
||||
// returns a KBSearchContext: a system prompt block with the relevant results
|
||||
// plus the de-duplicated source citations those results came from.
|
||||
// Uses LocalAI's collection search endpoint via the API.
|
||||
func KBAutoSearchPrompt(ctx context.Context, apiURL, apiKey, collection, query string, maxResults int, userID string) string {
|
||||
func KBAutoSearchPrompt(ctx context.Context, apiURL, apiKey, collection, query string, maxResults int, userID string) KBSearchContext {
|
||||
if collection == "" || query == "" {
|
||||
return ""
|
||||
return KBSearchContext{}
|
||||
}
|
||||
|
||||
if maxResults <= 0 {
|
||||
maxResults = 5
|
||||
}
|
||||
|
||||
// Call LocalAI's collection search API
|
||||
searchURL := strings.TrimRight(apiURL, "/") + "/api/agents/collections/" + collection + "/search"
|
||||
searchURL := strings.TrimRight(apiURL, "/") + "/api/agents/collections/" + url.PathEscape(collection) + "/search"
|
||||
if userID != "" {
|
||||
searchURL += "?user_id=" + userID
|
||||
query := url.Values{}
|
||||
query.Set("user_id", userID)
|
||||
searchURL += "?" + query.Encode()
|
||||
}
|
||||
reqBody, _ := json.Marshal(map[string]any{
|
||||
"query": query,
|
||||
@@ -56,7 +92,7 @@ func KBAutoSearchPrompt(ctx context.Context, apiURL, apiKey, collection, query s
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, searchURL, strings.NewReader(string(reqBody)))
|
||||
if err != nil {
|
||||
xlog.Warn("KB auto-search: failed to create request", "error", err)
|
||||
return ""
|
||||
return KBSearchContext{}
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if apiKey != "" {
|
||||
@@ -66,41 +102,70 @@ func KBAutoSearchPrompt(ctx context.Context, apiURL, apiKey, collection, query s
|
||||
resp, err := httpclient.New().Do(req)
|
||||
if err != nil {
|
||||
xlog.Warn("KB auto-search: request failed", "error", err)
|
||||
return ""
|
||||
return KBSearchContext{}
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
xlog.Warn("KB auto-search: non-200 response", "status", resp.StatusCode, "body", string(body))
|
||||
return ""
|
||||
return KBSearchContext{}
|
||||
}
|
||||
|
||||
var searchResp kbSearchResponse
|
||||
if err := json.NewDecoder(resp.Body).Decode(&searchResp); err != nil {
|
||||
xlog.Warn("KB auto-search: failed to decode response", "error", err)
|
||||
return ""
|
||||
return KBSearchContext{}
|
||||
}
|
||||
|
||||
if len(searchResp.Results) == 0 {
|
||||
return ""
|
||||
return KBSearchContext{}
|
||||
}
|
||||
|
||||
// Format results as a system prompt block (same format as LocalAGI)
|
||||
// Build the system prompt block, labelling each chunk with its source file
|
||||
// so the model can attribute inline, and collect the structured citations.
|
||||
var sb strings.Builder
|
||||
sb.WriteString("Given the user input you have the following in memory:\n")
|
||||
for i, r := range searchResp.Results {
|
||||
sb.WriteString(fmt.Sprintf("- %s", r.Content))
|
||||
if len(r.Metadata) > 0 {
|
||||
meta, _ := json.Marshal(r.Metadata)
|
||||
sb.WriteString(fmt.Sprintf(" (%s)", string(meta)))
|
||||
|
||||
var citations []KBCitation
|
||||
seen := make(map[string]struct{})
|
||||
|
||||
for _, r := range searchResp.Results {
|
||||
fileName := r.Metadata[kbMetadataFileName]
|
||||
source := r.Metadata[kbMetadataSource]
|
||||
|
||||
label := fileName
|
||||
if label == "" {
|
||||
label = "unknown"
|
||||
}
|
||||
if i < len(searchResp.Results)-1 {
|
||||
sb.WriteString("\n")
|
||||
sb.WriteString(fmt.Sprintf("[Source: %s]\n%s\n", label, r.Content))
|
||||
|
||||
// Citations are de-duplicated per source document: many chunks from the
|
||||
// same file share one source key, so a file is listed only once. Skip
|
||||
// results with no source key — they cannot be linked back to a document.
|
||||
dedupKey := source
|
||||
if dedupKey == "" {
|
||||
dedupKey = fileName
|
||||
}
|
||||
if dedupKey == "" {
|
||||
continue
|
||||
}
|
||||
if _, ok := seen[dedupKey]; ok {
|
||||
continue
|
||||
}
|
||||
seen[dedupKey] = struct{}{}
|
||||
citations = append(citations, KBCitation{
|
||||
FileName: fileName,
|
||||
EntryKey: source,
|
||||
})
|
||||
}
|
||||
|
||||
return sb.String()
|
||||
sb.WriteString("When answering, cite sources using [Source: filename].")
|
||||
|
||||
return KBSearchContext{
|
||||
Prompt: sb.String(),
|
||||
Citations: citations,
|
||||
}
|
||||
}
|
||||
|
||||
// KBSearchMemoryArgs defines the arguments for the search_memory tool.
|
||||
@@ -110,21 +175,25 @@ type KBSearchMemoryArgs struct {
|
||||
|
||||
// KBSearchMemoryTool implements the search_memory MCP tool.
|
||||
type KBSearchMemoryTool struct {
|
||||
APIURL string
|
||||
APIKey string
|
||||
Collection string
|
||||
MaxResults int
|
||||
UserID string
|
||||
APIURL string
|
||||
APIKey string
|
||||
Collection string
|
||||
MaxResults int
|
||||
UserID string
|
||||
CitationCollector KBCitationCollector
|
||||
}
|
||||
|
||||
func (t KBSearchMemoryTool) Run(args KBSearchMemoryArgs) (string, any, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
result := KBAutoSearchPrompt(ctx, t.APIURL, t.APIKey, t.Collection, args.Query, t.MaxResults, t.UserID)
|
||||
if result == "" {
|
||||
if result.Prompt == "" {
|
||||
return "No results found.", nil, nil
|
||||
}
|
||||
return result, nil, nil
|
||||
if t.CitationCollector != nil {
|
||||
t.CitationCollector.AddKBCitations(result.Citations)
|
||||
}
|
||||
return result.Prompt, nil, nil
|
||||
}
|
||||
|
||||
// KBAddMemoryArgs defines the arguments for the add_memory tool.
|
||||
@@ -156,9 +225,11 @@ func (t KBAddMemoryTool) Run(args KBAddMemoryArgs) (string, any, error) {
|
||||
|
||||
// KBStoreContent uploads text content to a collection via the multipart upload API.
|
||||
func KBStoreContent(ctx context.Context, apiURL, apiKey, collection, content, userID string) error {
|
||||
uploadURL := strings.TrimRight(apiURL, "/") + "/api/agents/collections/" + collection + "/upload"
|
||||
uploadURL := strings.TrimRight(apiURL, "/") + "/api/agents/collections/" + url.PathEscape(collection) + "/upload"
|
||||
if userID != "" {
|
||||
uploadURL += "?user_id=" + userID
|
||||
query := url.Values{}
|
||||
query.Set("user_id", userID)
|
||||
uploadURL += "?" + query.Encode()
|
||||
}
|
||||
|
||||
// Build multipart form with the text content as a file
|
||||
|
||||
@@ -157,3 +157,82 @@ func (c *InFlightTrackingClient) Rerank(ctx context.Context, in *pb.RerankReques
|
||||
res, err := c.Backend.Rerank(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) VAD(ctx context.Context, in *pb.VADRequest, opts ...ggrpc.CallOption) (*pb.VADResponse, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.VAD(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) Diarize(ctx context.Context, in *pb.DiarizeRequest, opts ...ggrpc.CallOption) (*pb.DiarizeResponse, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.Diarize(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) FaceVerify(ctx context.Context, in *pb.FaceVerifyRequest, opts ...ggrpc.CallOption) (*pb.FaceVerifyResponse, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.FaceVerify(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) FaceAnalyze(ctx context.Context, in *pb.FaceAnalyzeRequest, opts ...ggrpc.CallOption) (*pb.FaceAnalyzeResponse, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.FaceAnalyze(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) VoiceVerify(ctx context.Context, in *pb.VoiceVerifyRequest, opts ...ggrpc.CallOption) (*pb.VoiceVerifyResponse, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.VoiceVerify(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) VoiceAnalyze(ctx context.Context, in *pb.VoiceAnalyzeRequest, opts ...ggrpc.CallOption) (*pb.VoiceAnalyzeResponse, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.VoiceAnalyze(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) VoiceEmbed(ctx context.Context, in *pb.VoiceEmbedRequest, opts ...ggrpc.CallOption) (*pb.VoiceEmbedResponse, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.VoiceEmbed(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) TokenClassify(ctx context.Context, in *pb.TokenClassifyRequest, opts ...ggrpc.CallOption) (*pb.TokenClassifyResponse, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.TokenClassify(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) Score(ctx context.Context, in *pb.ScoreRequest, opts ...ggrpc.CallOption) (*pb.ScoreResponse, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.Score(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) AudioEncode(ctx context.Context, in *pb.AudioEncodeRequest, opts ...ggrpc.CallOption) (*pb.AudioEncodeResult, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.AudioEncode(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) AudioDecode(ctx context.Context, in *pb.AudioDecodeRequest, opts ...ggrpc.CallOption) (*pb.AudioDecodeResult, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.AudioDecode(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
func (c *InFlightTrackingClient) AudioTransform(ctx context.Context, in *pb.AudioTransformRequest, opts ...ggrpc.CallOption) (*pb.AudioTransformResult, error) {
|
||||
defer c.track(ctx)()
|
||||
res, err := c.Backend.AudioTransform(ctx, in, opts...)
|
||||
return res, c.reconcile(err)
|
||||
}
|
||||
|
||||
// AudioTransformStream, AudioToAudioStream and Forward are deliberately left as
|
||||
// embedded passthrough: they return a stream client and the inference spans the
|
||||
// stream's lifetime, not the constructor call. Wrapping the constructor with
|
||||
// track() would increment and immediately decrement (and fire onFirstComplete)
|
||||
// before any audio flows. Tracking those correctly needs the done() func tied to
|
||||
// stream close, which the current Backend interface doesn't surface here.
|
||||
|
||||
@@ -304,6 +304,105 @@ var _ = Describe("InFlightTrackingClient", func() {
|
||||
})
|
||||
})
|
||||
|
||||
Describe("non-LLM inference methods track in-flight", func() {
|
||||
// silero-vad and friends only ever expose a single non-Predict method.
|
||||
// If that method isn't wrapped, the load-time reservation released by
|
||||
// onFirstComplete never fires and in-flight is stuck at 1 forever.
|
||||
assertTracked := func(call func() error) {
|
||||
var firstFired int
|
||||
client.OnFirstComplete(func() { firstFired++ })
|
||||
err := call()
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(tracker.increments).To(Equal(1), "method must increment in-flight")
|
||||
Expect(tracker.decrements).To(Equal(1), "method must decrement in-flight")
|
||||
Expect(firstFired).To(Equal(1), "method must release the load-time reservation")
|
||||
}
|
||||
|
||||
It("VAD", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.VAD(context.Background(), &pb.VADRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("Diarize", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.Diarize(context.Background(), &pb.DiarizeRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("VoiceVerify", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.VoiceVerify(context.Background(), &pb.VoiceVerifyRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("VoiceAnalyze", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.VoiceAnalyze(context.Background(), &pb.VoiceAnalyzeRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("VoiceEmbed", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.VoiceEmbed(context.Background(), &pb.VoiceEmbedRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("FaceVerify", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.FaceVerify(context.Background(), &pb.FaceVerifyRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("FaceAnalyze", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.FaceAnalyze(context.Background(), &pb.FaceAnalyzeRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("TokenClassify", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.TokenClassify(context.Background(), &pb.TokenClassifyRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("Score", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.Score(context.Background(), &pb.ScoreRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("AudioEncode", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.AudioEncode(context.Background(), &pb.AudioEncodeRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("AudioDecode", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.AudioDecode(context.Background(), &pb.AudioDecodeRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
|
||||
It("AudioTransform", func() {
|
||||
assertTracked(func() error {
|
||||
_, err := client.AudioTransform(context.Background(), &pb.AudioTransformRequest{})
|
||||
return err
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Describe("stale model reload (self-heal)", func() {
|
||||
It("removes the replica when the backend reports the model is not loaded", func() {
|
||||
backend.predictErr = fmt.Errorf("parakeet-cpp: model not loaded")
|
||||
|
||||
@@ -429,7 +429,7 @@ name: my-model
|
||||
reasoning_effort: none # none | minimal | low | medium | high
|
||||
```
|
||||
|
||||
For [realtime pipelines]({{%relref "docs/features/openai-realtime" %}}), set it on the pipeline so it applies to the pipeline's LLM without editing that model's own config:
|
||||
For [realtime pipelines]({{%relref "features/openai-realtime" %}}), set it on the pipeline so it applies to the pipeline's LLM without editing that model's own config:
|
||||
|
||||
```yaml
|
||||
name: gpt-realtime
|
||||
|
||||
@@ -31,6 +31,43 @@ This configuration links the following components:
|
||||
|
||||
Make sure all referenced models (`silero-vad-ggml`, `whisper-large-turbo`, `qwen3-4b`, `tts-1`) are also installed or defined in your LocalAI instance.
|
||||
|
||||
### Streaming the pipeline
|
||||
|
||||
By default each stage runs to completion before the next begins: the whole utterance is transcribed, the full LLM reply is generated, then it is synthesized. Each stage can instead be streamed incrementally, which lowers the time-to-first-audio of a turn:
|
||||
|
||||
```yaml
|
||||
name: gpt-realtime
|
||||
pipeline:
|
||||
vad: silero-vad-ggml
|
||||
transcription: whisper-large-turbo
|
||||
llm: qwen3-4b
|
||||
tts: tts-1
|
||||
streaming:
|
||||
llm: true # stream LLM tokens as transcript deltas
|
||||
tts: true # emit audio deltas per synthesized chunk
|
||||
transcription: true # stream transcript text deltas of the user's speech
|
||||
clause_chunking: true # synthesize each clause as soon as it completes
|
||||
```
|
||||
|
||||
- **streaming.tts**: emit a `response.output_audio.delta` per audio chunk the TTS backend produces (requires a backend that supports streaming synthesis), instead of one delta for the whole utterance. Falls back to a single unary delta otherwise.
|
||||
- **streaming.transcription**: stream `conversation.item.input_audio_transcription.delta` events as the transcript is produced (requires a transcription backend that supports streaming).
|
||||
- **streaming.llm**: stream the LLM reply token-by-token as `response.output_audio_transcript.delta` events. The full reply is buffered and synthesized once it is complete — streamed as audio chunks when `streaming.tts` is enabled (and the TTS backend supports it), otherwise as a single unary delta. Reasoning/thinking is always stripped from the spoken transcript. Tool calls are supported while streaming when the LLM uses its tokenizer template (`use_tokenizer_template: true`): the backend's autoparser then delivers content and tool calls separately, so the spoken transcript never leaks tool-call tokens. Grammar-based function calling keeps the buffered path.
|
||||
- **streaming.clause_chunking**: instead of buffering the whole reply before TTS, split it into speakable clauses and synthesize each as soon as it completes, lowering the time-to-first-audio. The splitter is script-aware: it uses Unicode sentence segmentation (so it handles CJK `。!?` with no whitespace), CJK clause punctuation (`,、;:`), and Thai/Lao spaces — it does **not** rely on whitespace sentence boundaries, so it works for languages such as Chinese, Japanese and Thai where the old per-sentence approach degraded to whole-message buffering. Requires `streaming.llm`; scripts that genuinely need a dictionary (e.g. Khmer, Burmese) simply stay buffered until a space or end-of-message. Off by default.
|
||||
|
||||
All streaming flags are off by default, so existing pipelines are unaffected.
|
||||
|
||||
### Disabling thinking
|
||||
|
||||
For reasoning models, you can force the pipeline LLM's thinking off without editing the LLM model config:
|
||||
|
||||
```yaml
|
||||
pipeline:
|
||||
llm: qwen3-4b
|
||||
disable_thinking: true # maps to enable_thinking=false for the realtime LLM
|
||||
```
|
||||
|
||||
This is applied only to the realtime session's copy of the LLM config, so it does not affect other users of the same model. Leave it unset to use the LLM model config's own reasoning settings.
|
||||
|
||||
## Transports
|
||||
|
||||
The Realtime API supports two transports: **WebSocket** and **WebRTC**.
|
||||
@@ -74,6 +111,28 @@ EXTERNAL_GRPC_BACKENDS=opus:/path/to/backend/go/opus/opus
|
||||
|
||||
The opus backend is loaded automatically when a WebRTC session starts. It does not require any model configuration file — just the backend binary.
|
||||
|
||||
#### WebRTC behind Docker host networking or NAT
|
||||
|
||||
By default pion gathers a host ICE candidate for every local interface. Under
|
||||
Docker **host networking** that includes bridge addresses (`docker0`/`veth`,
|
||||
`172.x`) that a remote browser cannot route to: the call typically connects on a
|
||||
good candidate and then drops a few seconds later when ICE consent checks fail on
|
||||
the unreachable ones. Two settings let you advertise only the reachable address:
|
||||
|
||||
```bash
|
||||
# Advertise these IPs as the host ICE candidates (e.g. the host's LAN IP)
|
||||
LOCALAI_WEBRTC_NAT_1TO1_IPS=192.168.1.10
|
||||
|
||||
# ...or restrict ICE gathering to specific interfaces
|
||||
LOCALAI_WEBRTC_ICE_INTERFACES=eth0
|
||||
```
|
||||
|
||||
{{% notice tip %}}
|
||||
For a browser on another LAN machine talking to LocalAI in a host-networked
|
||||
container, set `LOCALAI_WEBRTC_NAT_1TO1_IPS` to the host's LAN IP. This is the
|
||||
most reliable fix for WebRTC connections that establish and then drop.
|
||||
{{% /notice %}}
|
||||
|
||||
## Protocol
|
||||
|
||||
The API follows the OpenAI Realtime API protocol for handling sessions, audio buffers, and conversation items.
|
||||
|
||||
@@ -20,7 +20,29 @@ With the CLI you can list the models with `local-ai models list` and install the
|
||||
You can also [run models manually]({{%relref "getting-started/models" %}}) by copying files into the `models` directory.
|
||||
{{% /notice %}}
|
||||
|
||||
You can test out the API endpoints using `curl`, few examples are listed below. The models we are referring here (`gpt-4`, `gpt-4-vision-preview`, `tts-1`, `whisper-1`) are examples - replace them with the model names you have installed.
|
||||
You can test chat models from the CLI without keeping a separate `curl` command around:
|
||||
|
||||
```bash
|
||||
# Terminal 1
|
||||
local-ai run
|
||||
|
||||
# Terminal 2
|
||||
local-ai chat --model gpt-4
|
||||
```
|
||||
|
||||
`local-ai chat` connects to a running LocalAI server, opens an interactive chat prompt, and exits when you type `/exit`, `/quit`, or `/bye`. Use `/models` to list installed models, `/model <name>` to switch models, and `/clear` to reset the current conversation. If the server exposes exactly one model, LocalAI uses that model automatically:
|
||||
|
||||
```bash
|
||||
# Terminal 1
|
||||
local-ai run llama-3.2-1b-instruct:q4_k_m
|
||||
|
||||
# Terminal 2
|
||||
local-ai chat
|
||||
```
|
||||
|
||||
When more than one model is configured, pass `--model` with the installed model name to avoid ambiguity. Use `--endpoint` to connect to a non-default server, for example `local-ai chat --endpoint http://127.0.0.1:8081 --model gpt-4`.
|
||||
|
||||
You can also test out the API endpoints using `curl`, few examples are listed below. The models we are referring here (`gpt-4`, `gpt-4-vision-preview`, `tts-1`, `whisper-1`) are examples - replace them with the model names you have installed.
|
||||
|
||||
### Text Generation
|
||||
|
||||
|
||||
@@ -118,6 +118,21 @@ For more information on VRAM management, see [VRAM and Memory Management]({{%rel
|
||||
|
||||
See [Authentication & Authorization]({{%relref "features/authentication" %}}) for full documentation.
|
||||
|
||||
## Chat Flags
|
||||
|
||||
Use `local-ai chat` to open an interactive terminal chat session against a running LocalAI server.
|
||||
|
||||
| Parameter | Default | Description | Environment Variable |
|
||||
|-----------|---------|-------------|----------------------|
|
||||
| `--endpoint` | `http://127.0.0.1:8080` | LocalAI server endpoint. The `/v1` path is added automatically when omitted. | `$LOCALAI_CHAT_ENDPOINT` |
|
||||
| `--model` | | Model name to use. If omitted, LocalAI uses the only model returned by the server when exactly one is available. | |
|
||||
| `--api-key` | | API key to use when the LocalAI server requires authentication. | `$LOCALAI_API_KEY`, `$API_KEY` |
|
||||
|
||||
- Inside the chat prompt:
|
||||
- Use `/models` to list installed models.
|
||||
- Use `/model <name>` to switch to a different model and clear the conversation.
|
||||
- Use `/clear` to reset the current conversation.
|
||||
|
||||
## P2P Flags
|
||||
|
||||
| Parameter | Default | Description | Environment Variable |
|
||||
@@ -181,4 +196,3 @@ export LOCALAI_F16=true
|
||||
|
||||
- See [Advanced Usage]({{%relref "advanced/advanced-usage" %}}) for configuration examples
|
||||
- See [VRAM and Memory Management]({{%relref "advanced/vram-management" %}}) for memory management options
|
||||
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
{
|
||||
"version": "v4.3.6"
|
||||
"version": "v4.4.0"
|
||||
}
|
||||
|
||||
2
go.mod
2
go.mod
@@ -465,7 +465,7 @@ require (
|
||||
github.com/quic-go/qpack v0.6.0 // indirect
|
||||
github.com/quic-go/quic-go v0.59.0 // indirect
|
||||
github.com/quic-go/webtransport-go v0.10.0 // indirect
|
||||
github.com/rivo/uniseg v0.4.7 // indirect
|
||||
github.com/rivo/uniseg v0.4.7
|
||||
github.com/shoenig/go-m1cpu v0.1.6 // indirect
|
||||
github.com/shopspring/decimal v1.4.0 // indirect
|
||||
github.com/sirupsen/logrus v1.9.4 // indirect
|
||||
|
||||
@@ -89,6 +89,35 @@ func ExtractReasoningWithConfig(content, thinkingStartToken string, config Confi
|
||||
return reasoning, cleanedContent
|
||||
}
|
||||
|
||||
// ExtractReasoningComplete extracts reasoning from a COMPLETE (non-streaming)
|
||||
// model response. It behaves like ExtractReasoningWithConfig except that it only
|
||||
// honors a prefilled thinking start token when the response actually contains
|
||||
// the matching closing tag.
|
||||
//
|
||||
// Rationale: when a chat template injects the start token into the prompt (so
|
||||
// DetectThinkingStartToken returns e.g. "<think>"), the model's output begins
|
||||
// inside a reasoning block and carries only the closing tag. The defensive
|
||||
// fallback prepends the start token so the extractor can pair it with that
|
||||
// close tag. But on a COMPLETE response with no closing tag, the model answered
|
||||
// directly with no reasoning at all — prepending the start token would
|
||||
// manufacture an unclosed block that swallows the entire answer into reasoning,
|
||||
// leaving content empty (breaking short/direct answers such as session names or
|
||||
// JSON summaries). Genuine reasoning tags already present in the content still
|
||||
// extract, because dropping the synthetic prefill does not affect them.
|
||||
//
|
||||
// Streaming callers must keep using ExtractReasoningWithConfig: mid-stream an
|
||||
// as-yet-unclosed block is legitimate and its tokens should surface as
|
||||
// reasoning deltas as they arrive.
|
||||
func ExtractReasoningComplete(content, thinkingStartToken string, config Config) (reasoning string, cleanedContent string) {
|
||||
startToken := thinkingStartToken
|
||||
if startToken != "" {
|
||||
if end := ClosingTokenForStart(startToken, &config); end == "" || !strings.Contains(content, end) {
|
||||
startToken = ""
|
||||
}
|
||||
}
|
||||
return ExtractReasoningWithConfig(content, startToken, config)
|
||||
}
|
||||
|
||||
// PrependThinkingTokenIfNeeded prepends the thinking start token to content if it was
|
||||
// detected in the prompt. This allows the standard extraction logic to work correctly
|
||||
// for models where the thinking token is already in the prompt.
|
||||
@@ -131,6 +160,48 @@ func PrependThinkingTokenIfNeeded(content string, startToken string) string {
|
||||
return startToken + content
|
||||
}
|
||||
|
||||
// defaultReasoningTagPairs are the built-in start/end reasoning tag pairs,
|
||||
// matching llama.cpp's chat-parser.cpp. Kept at package scope so that
|
||||
// ExtractReasoning and ClosingTokenForStart share a single source of truth.
|
||||
var defaultReasoningTagPairs = []TagPair{
|
||||
{Start: "<|START_THINKING|>", End: "<|END_THINKING|>"}, // Command-R models
|
||||
{Start: "<|inner_prefix|>", End: "<|inner_suffix|>"}, // Apertus models
|
||||
{Start: "<seed:think>", End: "</seed:think>"}, // Seed models
|
||||
{Start: "<think>", End: "</think>"}, // DeepSeek, Granite, ExaOne models
|
||||
{Start: "<|think|>", End: "<|end|><|begin|>assistant<|content|>"}, // Solar Open models (complex end)
|
||||
{Start: "<|channel>thought", End: "<channel|>"}, // Gemma 4 models
|
||||
{Start: "<thinking>", End: "</thinking>"}, // General thinking tag
|
||||
{Start: "[THINK]", End: "[/THINK]"}, // Magistral models
|
||||
}
|
||||
|
||||
// ClosingTokenForStart returns the closing reasoning tag that pairs with the
|
||||
// given start token, searching custom config TagPairs first then the built-in
|
||||
// defaults. Returns "" when startToken is empty or unrecognized.
|
||||
//
|
||||
// Used by the non-streaming autoparser fallback to decide whether a complete
|
||||
// response that began with a prefilled thinking token actually closed its
|
||||
// reasoning block: only then is synthesizing the start token (so the standard
|
||||
// extractor can pair it with the model's close tag) safe. A complete response
|
||||
// with no closing tag is a direct answer, not unclosed reasoning.
|
||||
func ClosingTokenForStart(startToken string, config *Config) string {
|
||||
if startToken == "" {
|
||||
return ""
|
||||
}
|
||||
if config != nil {
|
||||
for _, pair := range config.TagPairs {
|
||||
if pair.Start == startToken {
|
||||
return pair.End
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, pair := range defaultReasoningTagPairs {
|
||||
if pair.Start == startToken {
|
||||
return pair.End
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// ExtractReasoning extracts reasoning content from thinking tags and returns
|
||||
// both the extracted reasoning and the cleaned content (with tags removed).
|
||||
// It handles <thinking>...</thinking> and <think>...</think> tags.
|
||||
@@ -145,22 +216,7 @@ func ExtractReasoning(content string, config *Config) (reasoning string, cleaned
|
||||
var cleanedParts []string
|
||||
remaining := content
|
||||
|
||||
// Define default tag pairs to look for (matching llama.cpp's chat-parser.cpp)
|
||||
defaultTagPairs := []struct {
|
||||
start string
|
||||
end string
|
||||
}{
|
||||
{"<|START_THINKING|>", "<|END_THINKING|>"}, // Command-R models
|
||||
{"<|inner_prefix|>", "<|inner_suffix|>"}, // Apertus models
|
||||
{"<seed:think>", "</seed:think>"}, // Seed models
|
||||
{"<think>", "</think>"}, // DeepSeek, Granite, ExaOne models
|
||||
{"<|think|>", "<|end|><|begin|>assistant<|content|>"}, // Solar Open models (complex end)
|
||||
{"<|channel>thought", "<channel|>"}, // Gemma 4 models
|
||||
{"<thinking>", "</thinking>"}, // General thinking tag
|
||||
{"[THINK]", "[/THINK]"}, // Magistral models
|
||||
}
|
||||
|
||||
// Merge custom tag pairs with default tag pairs (custom pairs first for priority)
|
||||
// Merge custom tag pairs (highest priority) with the built-in defaults.
|
||||
var tagPairs []struct {
|
||||
start string
|
||||
end string
|
||||
@@ -175,9 +231,11 @@ func ExtractReasoning(content string, config *Config) (reasoning string, cleaned
|
||||
}
|
||||
}
|
||||
}
|
||||
// Add default tag pairs
|
||||
for _, pair := range defaultTagPairs {
|
||||
tagPairs = append(tagPairs, pair)
|
||||
for _, pair := range defaultReasoningTagPairs {
|
||||
tagPairs = append(tagPairs, struct {
|
||||
start string
|
||||
end string
|
||||
}{pair.Start, pair.End})
|
||||
}
|
||||
|
||||
// Track the last position we've processed
|
||||
|
||||
@@ -1175,6 +1175,55 @@ var _ = Describe("Custom Tokens and Tag Pairs Integration", func() {
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("ClosingTokenForStart", func() {
|
||||
It("returns the default closing tag for a known start token", func() {
|
||||
Expect(ClosingTokenForStart("<think>", nil)).To(Equal("</think>"))
|
||||
Expect(ClosingTokenForStart("<thinking>", nil)).To(Equal("</thinking>"))
|
||||
Expect(ClosingTokenForStart("[THINK]", nil)).To(Equal("[/THINK]"))
|
||||
})
|
||||
|
||||
It("returns empty for an empty or unknown start token", func() {
|
||||
Expect(ClosingTokenForStart("", nil)).To(BeEmpty())
|
||||
Expect(ClosingTokenForStart("<nope>", nil)).To(BeEmpty())
|
||||
})
|
||||
|
||||
It("prefers custom config tag pairs over the defaults", func() {
|
||||
cfg := &Config{TagPairs: []TagPair{{Start: "<think>", End: "<<END>>"}}}
|
||||
Expect(ClosingTokenForStart("<think>", cfg)).To(Equal("<<END>>"))
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("ExtractReasoningComplete", func() {
|
||||
const startToken = "<think>"
|
||||
|
||||
It("keeps a tag-less answer as content when a start token is prefilled but no close tag is present", func() {
|
||||
// The bug guard: prompt-prefilled <think>, model answered directly with
|
||||
// no reasoning. The synthetic prefill must not swallow it as reasoning.
|
||||
reasoning, content := ExtractReasoningComplete("hello", startToken, Config{})
|
||||
Expect(reasoning).To(BeEmpty())
|
||||
Expect(content).To(Equal("hello"))
|
||||
})
|
||||
|
||||
It("extracts reasoning when the model emits only the closing tag (legitimate prefill)", func() {
|
||||
reasoning, content := ExtractReasoningComplete("the rationale\n</think>\n\nthe answer", startToken, Config{})
|
||||
Expect(reasoning).To(ContainSubstring("the rationale"))
|
||||
Expect(content).To(ContainSubstring("the answer"))
|
||||
Expect(content).ToNot(ContainSubstring("</think>"))
|
||||
})
|
||||
|
||||
It("extracts a fully-tagged block regardless of the prefill token", func() {
|
||||
reasoning, content := ExtractReasoningComplete("<think>r</think>answer", startToken, Config{})
|
||||
Expect(reasoning).To(Equal("r"))
|
||||
Expect(content).To(Equal("answer"))
|
||||
})
|
||||
|
||||
It("behaves like ExtractReasoningWithConfig when no start token is prefilled", func() {
|
||||
reasoning, content := ExtractReasoningComplete("<think>r</think>answer", "", Config{})
|
||||
Expect(reasoning).To(Equal("r"))
|
||||
Expect(content).To(Equal("answer"))
|
||||
})
|
||||
})
|
||||
|
||||
// Helper function to create bool pointers for test configs
|
||||
func boolPtr(b bool) *bool {
|
||||
return &b
|
||||
|
||||
Reference in New Issue
Block a user