Mirror of https://github.com/mudler/LocalAI.git (synced 2026-04-30 20:15:35 -04:00)
* fix(streaming): dedupe content, recover reasoning, unique tool IDs
When tool calls are discovered only during final parsing (after the
streaming token callback returns), processTools' default switch branch
used to emit the full accumulated content alongside the tool_call args
chunk. Clients that accumulate delta.content per the OpenAI streaming
contract end up showing every narration line twice. Three related bugs
in the same flush path:
1. Content duplication: the args chunk carried Content:textContentToReturn
even though the text had already been streamed token-by-token via
the token callback, so delta.content was both the running total and
bundled with tool_calls in one delta (two spec violations).
2. Reasoning drop: when the C++ autoparser surfaces reasoning only as
a final aggregate (no incremental tokens), the callback never emits
it and the flush branch didn't either, silently losing it.
3. tool_call ID collision: empty ss.ID fell back to the request id, so
multiple empty-ID calls in the same turn all shared the same id,
breaking tool_result matching by tool_call_id.
Extracted the block into buildDeferredToolCallChunks (pure function,
unit-testable) and added 19 Ginkgo specs covering streamed vs.
not-streamed content/reasoning, single vs. multi call, and
incremental-vs-deferred emission. Every case asserts the invariant
that no delta carries both non-empty Content/Reasoning and non-empty
ToolCalls.
Fix summary:
- emit reasoning in its own leading chunk when !reasoningAlreadyStreamed
- emit role+content in their own chunks when !contentAlreadyStreamed
- drop Content from the tool_call args chunk
- fallback to fmt.Sprintf("%s-%d", id, i) for empty ss.ID so calls stay
uniquely addressable
Reproduced live against qwen3.6-35b-a3b-apex served by LocalAI with
the C++ autoparser; the full-content replay chunk that preceded each
tool_calls block is gone after the fix.
Assisted-by: Claude:claude-opus-4-7 go vet
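A minimal sketch of the helper this commit describes, under stated assumptions: the name and parameter order mirror the call site in chat.go below (request id, model, created timestamp, the parsed calls, lastEmittedCount, the already-streamed flags, and the final content/reasoning aggregates), while the exact chunk layout and the coordination with chunks already emitted incrementally are defined by the helper's own file and its Ginkgo specs, not by this sketch. It only illustrates the invariant: reasoning, role/content, and tool_calls each travel in their own delta, and empty tool-call IDs fall back to a per-call unique value.

// Hypothetical sketch, not the shipped implementation.
package openai

import (
	"fmt"

	"github.com/mudler/LocalAI/core/schema"
	"github.com/mudler/LocalAI/pkg/functions"
)

func buildDeferredToolCallChunks(
	id, model string, created int,
	calls []functions.FuncCallResults, lastEmittedCount int,
	contentAlreadyStreamed bool, textContent string,
	reasoningAlreadyStreamed bool, reasoning string,
) []schema.OpenAIResponse {
	_ = lastEmittedCount // the real helper uses this to coordinate with chunks already emitted incrementally

	chunk := func(delta *schema.Message) schema.OpenAIResponse {
		return schema.OpenAIResponse{
			ID: id, Created: created, Model: model,
			Choices: []schema.Choice{{Delta: delta, Index: 0}},
			Object:  "chat.completion.chunk",
		}
	}

	var out []schema.OpenAIResponse
	// Reasoning that only surfaced as a final aggregate gets one leading chunk.
	if !reasoningAlreadyStreamed && reasoning != "" {
		out = append(out, chunk(&schema.Message{Reasoning: &reasoning}))
	}
	// Role and content are replayed only when they were never streamed token-by-token.
	if !contentAlreadyStreamed {
		out = append(out, chunk(&schema.Message{Role: "assistant"}))
		if textContent != "" {
			out = append(out, chunk(&schema.Message{Content: &textContent}))
		}
	}
	// The args chunks carry tool_calls only; no Content rides along with them.
	for i, call := range calls {
		callID := call.ID
		if callID == "" {
			callID = fmt.Sprintf("%s-%d", id, i) // keep multiple empty-ID calls uniquely addressable
		}
		out = append(out, chunk(&schema.Message{
			ToolCalls: []schema.ToolCall{{
				Index: i,
				ID:    callID,
				Type:  "function",
				FunctionCall: schema.FunctionCall{
					Name:      call.Name,
					Arguments: call.Arguments,
				},
			}},
		}))
	}
	return out
}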
* fix(streaming): dedupe reasoning in the noActionToRun final chunk
extractor.Reasoning() returns only the Go-side extractor's lastReasoning
accumulator (pkg/reasoning/extractor.go:129). ChatDelta reasoning
coming through ProcessChatDeltaReasoning lives in a separate
accumulator (cdLastStrippedReasoning) that Reasoning() does not
expose. The "reasoning != \"\" && extractor.Reasoning() == \"\"" guard
therefore fires exactly when the autoparser streamed reasoning
incrementally via the callback — producing a duplicate final delivery.
Replace both guard sites in the noActionToRun branch with the
sentReasoning flag introduced in the previous commit. Extract the
closing-chunk logic into buildNoActionFinalChunks so the refactor is
testable; the helper mirrors buildDeferredToolCallChunks.
Add Ginkgo coverage for both the content-streamed and
content-not-streamed paths: reasoning is dropped when it was streamed,
delivered once when it arrived only as a final aggregate, and omitted
when empty. Metadata invariants carried over from the sibling helper.
Assisted-by: Claude:claude-opus-4-7 go vet
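A hedged sketch of the sibling helper, in the same assumed package as the sketch above: the parameter order follows the call site below and the behavior follows the commit's description (reasoning dropped when already streamed, delivered once when it arrived only as a final aggregate, omitted when empty), while the exact shape of the closing chunk and where usage is attached are assumptions.

// Hypothetical sketch; the real chunk shapes are pinned down by the Ginkgo specs.
func buildNoActionFinalChunks(
	id, model string, created int,
	contentAlreadyStreamed, reasoningAlreadyStreamed bool,
	content, reasoning string, usage schema.OpenAIUsage,
) []schema.OpenAIResponse {
	chunk := func(delta *schema.Message) schema.OpenAIResponse {
		return schema.OpenAIResponse{
			ID: id, Created: created, Model: model,
			Choices: []schema.Choice{{Delta: delta, Index: 0}},
			Object:  "chat.completion.chunk",
			Usage:   usage,
		}
	}

	var out []schema.OpenAIResponse
	// Reasoning already streamed via the token callback is not replayed; an
	// aggregate-only value is delivered exactly once; empty reasoning is omitted.
	if !reasoningAlreadyStreamed && reasoning != "" {
		out = append(out, chunk(&schema.Message{Reasoning: &reasoning}))
	}
	// The computed answer is emitted here only when no content was streamed.
	if !contentAlreadyStreamed && content != "" {
		out = append(out, chunk(&schema.Message{Role: "assistant", Content: &content}))
	}
	return out
}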
* fix(streaming): detect noActionToRun anywhere in functionResults
The previous condition only looked at functionResults[0].Name, which
misbehaved when a real tool call followed a noAction sentinel — the
noAction shadowed the real call and the whole turn was treated as a
question to answer, silently dropping the tool call. The mirror case,
[realCall, noActionCall], fell into the default branch and emitted the
noAction entry as if it were a real tool_call.
Replace with hasRealCall, which scans the slice and returns true as
soon as it finds a non-noAction entry. noActionToRun now matches the
semantic intent: "every entry is the noAction sentinel (or the slice
is empty)".
Note: this does not change incremental emission, where noAction
entries may still be forwarded as tool_call chunks by the XML/JSON
iterative parsers. That is a separate layer (functions.Parse*) and
addressing it requires threading noAction through the parser APIs —
out of scope for this change.
Assisted-by: Claude:claude-opus-4-7 go vet
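The predicate itself is small; a sketch consistent with the description above and with the call noActionToRun := !hasRealCall(functionResults, noAction) in chat.go below (the name comes from that call site; the body is an assumption):

func hasRealCall(results []functions.FuncCallResults, noAction string) bool {
	for _, r := range results {
		if r.Name != noAction {
			return true // at least one entry is a real tool call, not the sentinel
		}
	}
	return false // every entry is the noAction sentinel, or the slice is empty
}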
1263 lines · 44 KiB · Go
package openai

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/google/uuid"
	"github.com/labstack/echo/v4"
	"github.com/mudler/LocalAI/core/backend"
	"github.com/mudler/LocalAI/core/config"
	mcpTools "github.com/mudler/LocalAI/core/http/endpoints/mcp"
	"github.com/mudler/LocalAI/core/http/middleware"
	"github.com/mudler/LocalAI/core/schema"
	"github.com/mudler/LocalAI/pkg/functions"
	reason "github.com/mudler/LocalAI/pkg/reasoning"

	"github.com/mudler/LocalAI/core/templates"
	pb "github.com/mudler/LocalAI/pkg/grpc/proto"
	"github.com/mudler/LocalAI/pkg/model"

	"github.com/mudler/xlog"
)

// mergeToolCallDeltas merges streaming tool call deltas into complete tool calls.
// In SSE streaming, a single tool call arrives as multiple chunks sharing the same Index:
// the first chunk carries the ID, Type, and Name; subsequent chunks append to Arguments.
func mergeToolCallDeltas(existing []schema.ToolCall, deltas []schema.ToolCall) []schema.ToolCall {
	byIndex := make(map[int]int, len(existing)) // tool call Index -> position in slice
	for i, tc := range existing {
		byIndex[tc.Index] = i
	}
	for _, d := range deltas {
		pos, found := byIndex[d.Index]
		if !found {
			byIndex[d.Index] = len(existing)
			existing = append(existing, d)
			continue
		}
		// Merge into existing entry
		tc := &existing[pos]
		if d.ID != "" {
			tc.ID = d.ID
		}
		if d.Type != "" {
			tc.Type = d.Type
		}
		if d.FunctionCall.Name != "" {
			tc.FunctionCall.Name = d.FunctionCall.Name
		}
		tc.FunctionCall.Arguments += d.FunctionCall.Arguments
	}
	return existing
}

// ChatEndpoint is the OpenAI Completion API endpoint https://platform.openai.com/docs/api-reference/chat/create
// @Summary Generate a chat completions for a given prompt and model.
// @Tags inference
// @Param request body schema.OpenAIRequest true "query params"
// @Success 200 {object} schema.OpenAIResponse "Response"
// @Router /v1/chat/completions [post]
func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator *templates.Evaluator, startupOptions *config.ApplicationConfig, natsClient mcpTools.MCPNATSClient) echo.HandlerFunc {
	process := func(s string, req *schema.OpenAIRequest, config *config.ModelConfig, loader *model.ModelLoader, responses chan schema.OpenAIResponse, extraUsage bool, id string, created int) error {
		initialMessage := schema.OpenAIResponse{
			ID:      id,
			Created: created,
			Model:   req.Model, // we have to return what the user sent here, due to OpenAI spec.
			Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0, FinishReason: nil}},
			Object:  "chat.completion.chunk",
		}
		responses <- initialMessage

		// Detect if thinking token is already in prompt or template
		// When UseTokenizerTemplate is enabled, predInput is empty, so we check the template
		var template string
		if config.TemplateConfig.UseTokenizerTemplate {
			template = config.GetModelTemplate()
		} else {
			template = s
		}
		thinkingStartToken := reason.DetectThinkingStartToken(template, &config.ReasoningConfig)
		extractor := reason.NewReasoningExtractor(thinkingStartToken, config.ReasoningConfig)

		_, _, _, err := ComputeChoices(req, s, config, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, tokenUsage backend.TokenUsage) bool {
			var reasoningDelta, contentDelta string

			// Always keep the Go-side extractor in sync with raw tokens so it
			// can serve as fallback for backends without an autoparser (e.g. vLLM).
			goReasoning, goContent := extractor.ProcessToken(s)

			// When C++ autoparser chat deltas are available, prefer them — they
			// handle model-specific formats (Gemma 4, etc.) without Go-side tags.
			// Otherwise fall back to Go-side extraction.
			if tokenUsage.HasChatDeltaContent() {
				rawReasoning, cd := tokenUsage.ChatDeltaReasoningAndContent()
				contentDelta = cd
				reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
			} else {
				reasoningDelta = goReasoning
				contentDelta = goContent
			}

			usage := schema.OpenAIUsage{
				PromptTokens:     tokenUsage.Prompt,
				CompletionTokens: tokenUsage.Completion,
				TotalTokens:      tokenUsage.Prompt + tokenUsage.Completion,
			}
			if extraUsage {
				usage.TimingTokenGeneration = tokenUsage.TimingTokenGeneration
				usage.TimingPromptProcessing = tokenUsage.TimingPromptProcessing
			}

			delta := &schema.Message{}
			if contentDelta != "" {
				delta.Content = &contentDelta
			}
			if reasoningDelta != "" {
				delta.Reasoning = &reasoningDelta
			}

			resp := schema.OpenAIResponse{
				ID:      id,
				Created: created,
				Model:   req.Model, // we have to return what the user sent here, due to OpenAI spec.
				Choices: []schema.Choice{{Delta: delta, Index: 0, FinishReason: nil}},
				Object:  "chat.completion.chunk",
				Usage:   usage,
			}

			responses <- resp
			return true
		})
		close(responses)
		return err
	}
	processTools := func(noAction string, prompt string, req *schema.OpenAIRequest, config *config.ModelConfig, loader *model.ModelLoader, responses chan schema.OpenAIResponse, extraUsage bool, id string, created int, textContentToReturn *string) error {
		// Detect if thinking token is already in prompt or template
		var template string
		if config.TemplateConfig.UseTokenizerTemplate {
			template = config.GetModelTemplate()
		} else {
			template = prompt
		}
		thinkingStartToken := reason.DetectThinkingStartToken(template, &config.ReasoningConfig)
		extractor := reason.NewReasoningExtractor(thinkingStartToken, config.ReasoningConfig)

		result := ""
		lastEmittedCount := 0
		sentInitialRole := false
		sentReasoning := false
		hasChatDeltaToolCalls := false
		hasChatDeltaContent := false

		_, tokenUsage, chatDeltas, err := ComputeChoices(req, prompt, config, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, usage backend.TokenUsage) bool {
			result += s

			// Track whether ChatDeltas from the C++ autoparser contain
			// tool calls or content, so the retry decision can account for them.
			for _, d := range usage.ChatDeltas {
				if len(d.ToolCalls) > 0 {
					hasChatDeltaToolCalls = true
				}
				if d.Content != "" {
					hasChatDeltaContent = true
				}
			}

			var reasoningDelta, contentDelta string

			goReasoning, goContent := extractor.ProcessToken(s)

			if usage.HasChatDeltaContent() {
				rawReasoning, cd := usage.ChatDeltaReasoningAndContent()
				contentDelta = cd
				reasoningDelta = extractor.ProcessChatDeltaReasoning(rawReasoning)
			} else {
				reasoningDelta = goReasoning
				contentDelta = goContent
			}

			// Emit reasoning deltas in their own SSE chunks before any tool-call chunks
			// (OpenAI spec: reasoning and tool_calls never share a delta)
			if reasoningDelta != "" {
				responses <- schema.OpenAIResponse{
					ID:      id,
					Created: created,
					Model:   req.Model,
					Choices: []schema.Choice{{
						Delta: &schema.Message{Reasoning: &reasoningDelta},
						Index: 0,
					}},
					Object: "chat.completion.chunk",
				}
				sentReasoning = true
			}

			// Stream content deltas (cleaned of reasoning tags) while no tool calls
			// have been detected. Once the incremental parser finds tool calls,
			// content stops — per OpenAI spec, content and tool_calls don't mix.
			if lastEmittedCount == 0 && contentDelta != "" {
				if !sentInitialRole {
					responses <- schema.OpenAIResponse{
						ID: id, Created: created, Model: req.Model,
						Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0}},
						Object:  "chat.completion.chunk",
					}
					sentInitialRole = true
				}
				responses <- schema.OpenAIResponse{
					ID: id, Created: created, Model: req.Model,
					Choices: []schema.Choice{{
						Delta: &schema.Message{Content: &contentDelta},
						Index: 0,
					}},
					Object: "chat.completion.chunk",
				}
			}

			// Try incremental XML parsing for streaming support using iterative parser
			// This allows emitting partial tool calls as they're being generated
			cleanedResult := functions.CleanupLLMResult(result, config.FunctionsConfig)

			// Determine XML format from config
			var xmlFormat *functions.XMLToolCallFormat
			if config.FunctionsConfig.XMLFormat != nil {
				xmlFormat = config.FunctionsConfig.XMLFormat
			} else if config.FunctionsConfig.XMLFormatPreset != "" {
				xmlFormat = functions.GetXMLFormatPreset(config.FunctionsConfig.XMLFormatPreset)
			}

			// Use iterative parser for streaming (partial parsing enabled)
			// Try XML parsing first
			partialResults, parseErr := functions.ParseXMLIterative(cleanedResult, xmlFormat, true)
			if parseErr == nil && len(partialResults) > 0 {
				// Emit new XML tool calls that weren't emitted before
				if len(partialResults) > lastEmittedCount {
					for i := lastEmittedCount; i < len(partialResults); i++ {
						toolCall := partialResults[i]
						initialMessage := schema.OpenAIResponse{
							ID:      id,
							Created: created,
							Model:   req.Model,
							Choices: []schema.Choice{{
								Delta: &schema.Message{
									Role: "assistant",
									ToolCalls: []schema.ToolCall{
										{
											Index: i,
											ID:    id,
											Type:  "function",
											FunctionCall: schema.FunctionCall{
												Name: toolCall.Name,
											},
										},
									},
								},
								Index:        0,
								FinishReason: nil,
							}},
							Object: "chat.completion.chunk",
						}
						select {
						case responses <- initialMessage:
						default:
						}
					}
					lastEmittedCount = len(partialResults)
				}
			} else {
				// Try JSON tool call parsing for streaming.
				// Only emit NEW tool calls (same guard as XML parser above).
				jsonResults, jsonErr := functions.ParseJSONIterative(cleanedResult, true)
				if jsonErr == nil && len(jsonResults) > lastEmittedCount {
					for i := lastEmittedCount; i < len(jsonResults); i++ {
						jsonObj := jsonResults[i]
						name, ok := jsonObj["name"].(string)
						if !ok || name == "" {
							continue
						}
						args := "{}"
						if argsVal, ok := jsonObj["arguments"]; ok {
							if argsStr, ok := argsVal.(string); ok {
								args = argsStr
							} else {
								argsBytes, _ := json.Marshal(argsVal)
								args = string(argsBytes)
							}
						}
						initialMessage := schema.OpenAIResponse{
							ID:      id,
							Created: created,
							Model:   req.Model,
							Choices: []schema.Choice{{
								Delta: &schema.Message{
									Role: "assistant",
									ToolCalls: []schema.ToolCall{
										{
											Index: i,
											ID:    id,
											Type:  "function",
											FunctionCall: schema.FunctionCall{
												Name:      name,
												Arguments: args,
											},
										},
									},
								},
								Index:        0,
								FinishReason: nil,
							}},
							Object: "chat.completion.chunk",
						}
						responses <- initialMessage
					}
					lastEmittedCount = len(jsonResults)
				}
			}
			return true
		},
			func(attempt int) bool {
				// After streaming completes: check if we got actionable content
				cleaned := extractor.CleanedContent()
				// Check for tool calls from chat deltas (will be re-checked after ComputeChoices,
				// but we need to know here whether to retry).
				// Also check ChatDelta flags — when the C++ autoparser is active,
				// tool calls and content are delivered via ChatDeltas while the
				// raw message is cleared. Without this check, we'd retry
				// unnecessarily, losing valid results and concatenating output.
				hasToolCalls := lastEmittedCount > 0 || hasChatDeltaToolCalls
				hasContent := cleaned != "" || hasChatDeltaContent
				if !hasContent && !hasToolCalls {
					xlog.Warn("Streaming: backend produced only reasoning, retrying",
						"reasoning_len", len(extractor.Reasoning()), "attempt", attempt+1)
					extractor.ResetAndSuppressReasoning()
					result = ""
					lastEmittedCount = 0
					sentInitialRole = false
					hasChatDeltaToolCalls = false
					hasChatDeltaContent = false
					return true
				}
				return false
			},
		)
		if err != nil {
			return err
		}
		// Try using pre-parsed tool calls from C++ autoparser (chat deltas)
		var functionResults []functions.FuncCallResults
		var reasoning string

		if deltaToolCalls := functions.ToolCallsFromChatDeltas(chatDeltas); len(deltaToolCalls) > 0 {
			xlog.Debug("[ChatDeltas] Using pre-parsed tool calls from C++ autoparser", "count", len(deltaToolCalls))
			functionResults = deltaToolCalls
			// Use content/reasoning from deltas too
			*textContentToReturn = functions.ContentFromChatDeltas(chatDeltas)
			reasoning = functions.ReasoningFromChatDeltas(chatDeltas)
		} else {
			// Fallback: parse tool calls from raw text (no chat deltas from backend)
			xlog.Debug("[ChatDeltas] no pre-parsed tool calls, falling back to Go-side text parsing")
			reasoning = extractor.Reasoning()
			cleanedResult := extractor.CleanedContent()
			*textContentToReturn = functions.ParseTextContent(cleanedResult, config.FunctionsConfig)
			cleanedResult = functions.CleanupLLMResult(cleanedResult, config.FunctionsConfig)
			functionResults = functions.ParseFunctionCall(cleanedResult, config.FunctionsConfig)
		}
		xlog.Debug("[ChatDeltas] final tool call decision", "tool_calls", len(functionResults), "text_content", *textContentToReturn)
		// noAction is a sentinel "just answer" pseudo-function — not a real
		// tool call. Scan the whole slice rather than only index 0 so we
		// don't drop a real tool call that happens to follow a noAction
		// entry, and so the default branch isn't entered with only noAction
		// entries to emit as tool_calls.
		noActionToRun := !hasRealCall(functionResults, noAction)

		switch {
		case noActionToRun:
			usage := schema.OpenAIUsage{
				PromptTokens:     tokenUsage.Prompt,
				CompletionTokens: tokenUsage.Completion,
				TotalTokens:      tokenUsage.Prompt + tokenUsage.Completion,
			}
			if extraUsage {
				usage.TimingTokenGeneration = tokenUsage.TimingTokenGeneration
				usage.TimingPromptProcessing = tokenUsage.TimingPromptProcessing
			}

			var result string
			if !sentInitialRole {
				var hqErr error
				result, hqErr = handleQuestion(config, functionResults, extractor.CleanedContent(), prompt)
				if hqErr != nil {
					xlog.Error("error handling question", "error", hqErr)
					return hqErr
				}
			}
			for _, chunk := range buildNoActionFinalChunks(
				id, req.Model, created,
				sentInitialRole, sentReasoning,
				result, reasoning, usage,
			) {
				responses <- chunk
			}

		default:
			for _, chunk := range buildDeferredToolCallChunks(
				id, req.Model, created,
				functionResults, lastEmittedCount,
				sentInitialRole, *textContentToReturn,
				sentReasoning, reasoning,
			) {
				responses <- chunk
			}
		}

		close(responses)
		return err
	}

	return func(c echo.Context) error {
		var textContentToReturn string
		id := uuid.New().String()
		created := int(time.Now().Unix())

		input, ok := c.Get(middleware.CONTEXT_LOCALS_KEY_LOCALAI_REQUEST).(*schema.OpenAIRequest)
		if !ok || input.Model == "" {
			return echo.ErrBadRequest
		}

		extraUsage := c.Request().Header.Get("Extra-Usage") != ""

		config, ok := c.Get(middleware.CONTEXT_LOCALS_KEY_MODEL_CONFIG).(*config.ModelConfig)
		if !ok || config == nil {
			return echo.ErrBadRequest
		}

		xlog.Debug("Chat endpoint configuration read", "config", config)

		funcs := input.Functions
		shouldUseFn := len(input.Functions) > 0 && config.ShouldUseFunctions()
		strictMode := false

		// MCP tool injection: when mcp_servers is set in metadata and model has MCP config
		var mcpExecutor mcpTools.ToolExecutor
		mcpServers := mcpTools.MCPServersFromMetadata(input.Metadata)

		// MCP prompt and resource injection (extracted before tool injection)
		mcpPromptName, mcpPromptArgs := mcpTools.MCPPromptFromMetadata(input.Metadata)
		mcpResourceURIs := mcpTools.MCPResourcesFromMetadata(input.Metadata)

		if (len(mcpServers) > 0 || mcpPromptName != "" || len(mcpResourceURIs) > 0) && (config.MCP.Servers != "" || config.MCP.Stdio != "") {
			remote, stdio, mcpErr := config.MCP.MCPConfigFromYAML()
			if mcpErr == nil {
				mcpExecutor = mcpTools.NewToolExecutor(c.Request().Context(), natsClient, config.Name, remote, stdio, mcpServers)

				// Prompt and resource injection (pre-processing step — resolves locally regardless of distributed mode)
				namedSessions, sessErr := mcpTools.NamedSessionsFromMCPConfig(config.Name, remote, stdio, mcpServers)
				if sessErr == nil && len(namedSessions) > 0 {
					mcpCtx, _ := mcpTools.InjectMCPContext(c.Request().Context(), namedSessions, mcpPromptName, mcpPromptArgs, mcpResourceURIs)
					if mcpCtx != nil {
						input.Messages = append(mcpCtx.PromptMessages, input.Messages...)
						mcpTools.AppendResourceSuffix(input.Messages, mcpCtx.ResourceSuffix)
					}
				}

				// Tool injection via executor
				if mcpExecutor.HasTools() {
					mcpFuncs, discErr := mcpExecutor.DiscoverTools(c.Request().Context())
					if discErr == nil {
						for _, fn := range mcpFuncs {
							funcs = append(funcs, fn)
							input.Tools = append(input.Tools, functions.Tool{Type: "function", Function: fn})
						}
						shouldUseFn = len(funcs) > 0 && config.ShouldUseFunctions()
						xlog.Debug("MCP tools injected", "count", len(mcpFuncs), "total_funcs", len(funcs))
					} else {
						xlog.Error("Failed to discover MCP tools", "error", discErr)
					}
				}
			} else {
				xlog.Error("Failed to parse MCP config", "error", mcpErr)
			}
		}

		xlog.Debug("Tool call routing decision",
			"shouldUseFn", shouldUseFn,
			"len(input.Functions)", len(input.Functions),
			"len(input.Tools)", len(input.Tools),
			"config.ShouldUseFunctions()", config.ShouldUseFunctions(),
			"config.FunctionToCall()", config.FunctionToCall(),
		)

		for _, f := range input.Functions {
			if f.Strict {
				strictMode = true
				break
			}
		}

		// Allow the user to set custom actions via config file
		// to be "embedded" in each model
		noActionName := "answer"
		noActionDescription := "use this action to answer without performing any action"

		if config.FunctionsConfig.NoActionFunctionName != "" {
			noActionName = config.FunctionsConfig.NoActionFunctionName
		}
		if config.FunctionsConfig.NoActionDescriptionName != "" {
			noActionDescription = config.FunctionsConfig.NoActionDescriptionName
		}

		// If we are using a response format, we need to generate a grammar for it
		if config.ResponseFormatMap != nil {
			d := schema.ChatCompletionResponseFormat{}
			dat, err := json.Marshal(config.ResponseFormatMap)
			if err != nil {
				return err
			}
			err = json.Unmarshal(dat, &d)
			if err != nil {
				return err
			}

			switch d.Type {
			case "json_object":
				input.Grammar = functions.JSONBNF
			case "json_schema":
				d := schema.JsonSchemaRequest{}
				dat, err := json.Marshal(config.ResponseFormatMap)
				if err != nil {
					return err
				}
				err = json.Unmarshal(dat, &d)
				if err != nil {
					return err
				}
				fs := &functions.JSONFunctionStructure{
					AnyOf: []functions.Item{d.JsonSchema.Schema},
				}
				g, err := fs.Grammar(config.FunctionsConfig.GrammarOptions()...)
				if err == nil {
					input.Grammar = g
				} else {
					xlog.Error("Failed generating grammar", "error", err)
				}
			}
		}

		config.Grammar = input.Grammar

		if shouldUseFn {
			xlog.Debug("Response needs to process functions")
		}

		switch {
		// Generates grammar with internal's LocalAI engine
		case (!config.FunctionsConfig.GrammarConfig.NoGrammar || strictMode) && shouldUseFn:
			noActionGrammar := functions.Function{
				Name:        noActionName,
				Description: noActionDescription,
				Parameters: map[string]any{
					"properties": map[string]any{
						"message": map[string]any{
							"type":        "string",
							"description": "The message to reply the user with",
						}},
				},
			}

			// Append the no action function
			if !config.FunctionsConfig.DisableNoAction && !strictMode {
				funcs = append(funcs, noActionGrammar)
			}

			// Force picking one of the functions by the request
			if config.FunctionToCall() != "" {
				funcs = funcs.Select(config.FunctionToCall())
			}

			// Update input grammar or json_schema based on use_llama_grammar option
			jsStruct := funcs.ToJSONStructure(config.FunctionsConfig.FunctionNameKey, config.FunctionsConfig.FunctionNameKey)
			g, err := jsStruct.Grammar(config.FunctionsConfig.GrammarOptions()...)
			if err == nil {
				config.Grammar = g
			} else {
				xlog.Error("Failed generating grammar", "error", err)
			}
		case input.JSONFunctionGrammarObject != nil:
			g, err := input.JSONFunctionGrammarObject.Grammar(config.FunctionsConfig.GrammarOptions()...)
			if err == nil {
				config.Grammar = g
			} else {
				xlog.Error("Failed generating grammar", "error", err)
			}

		default:
			// Force picking one of the functions by the request
			if config.FunctionToCall() != "" {
				funcs = funcs.Select(config.FunctionToCall())
			}
		}

		// process functions if we have any defined or if we have a function call string

		// functions are not supported in stream mode (yet?)
		toStream := input.Stream

		xlog.Debug("Parameters", "config", config)

		var predInput string

		// If we are using the tokenizer template, we don't need to process the messages
		// unless we are processing functions
		if !config.TemplateConfig.UseTokenizerTemplate {
			predInput = evaluator.TemplateMessages(*input, input.Messages, config, funcs, shouldUseFn)

			xlog.Debug("Prompt (after templating)", "prompt", predInput)
			if config.Grammar != "" {
				xlog.Debug("Grammar", "grammar", config.Grammar)
			}
		}

		switch {
		case toStream:

			xlog.Debug("Stream request received")
			c.Response().Header().Set("Content-Type", "text/event-stream")
			c.Response().Header().Set("Cache-Control", "no-cache")
			c.Response().Header().Set("Connection", "keep-alive")
			c.Response().Header().Set("X-Correlation-ID", id)

			mcpStreamMaxIterations := 10
			if config.Agent.MaxIterations > 0 {
				mcpStreamMaxIterations = config.Agent.MaxIterations
			}
			hasMCPToolsStream := mcpExecutor != nil && mcpExecutor.HasTools()

			for mcpStreamIter := 0; mcpStreamIter <= mcpStreamMaxIterations; mcpStreamIter++ {
				// Re-template on MCP iterations
				if mcpStreamIter > 0 && !config.TemplateConfig.UseTokenizerTemplate {
					predInput = evaluator.TemplateMessages(*input, input.Messages, config, funcs, shouldUseFn)
					xlog.Debug("MCP stream re-templating", "iteration", mcpStreamIter)
				}

				responses := make(chan schema.OpenAIResponse)
				ended := make(chan error, 1)

				go func() {
					if !shouldUseFn {
						ended <- process(predInput, input, config, ml, responses, extraUsage, id, created)
					} else {
						ended <- processTools(noActionName, predInput, input, config, ml, responses, extraUsage, id, created, &textContentToReturn)
					}
				}()

				usage := &schema.OpenAIUsage{}
				toolsCalled := false
				var collectedToolCalls []schema.ToolCall
				var collectedContent string

			LOOP:
				for {
					select {
					case <-input.Context.Done():
						// Context was cancelled (client disconnected or request cancelled)
						xlog.Debug("Request context cancelled, stopping stream")
						input.Cancel()
						break LOOP
					case ev := <-responses:
						if len(ev.Choices) == 0 {
							xlog.Debug("No choices in the response, skipping")
							continue
						}
						usage = &ev.Usage // Copy a pointer to the latest usage chunk so that the stop message can reference it
						if len(ev.Choices[0].Delta.ToolCalls) > 0 {
							toolsCalled = true
							// Collect and merge tool call deltas for MCP execution
							if hasMCPToolsStream {
								collectedToolCalls = mergeToolCallDeltas(collectedToolCalls, ev.Choices[0].Delta.ToolCalls)
							}
						}
						// Collect content for MCP conversation history and automatic tool parsing fallback
						if (hasMCPToolsStream || config.FunctionsConfig.AutomaticToolParsingFallback) && ev.Choices[0].Delta != nil && ev.Choices[0].Delta.Content != nil {
							if s, ok := ev.Choices[0].Delta.Content.(string); ok {
								collectedContent += s
							} else if sp, ok := ev.Choices[0].Delta.Content.(*string); ok && sp != nil {
								collectedContent += *sp
							}
						}
						respData, err := json.Marshal(ev)
						if err != nil {
							xlog.Debug("Failed to marshal response", "error", err)
							input.Cancel()
							continue
						}
						xlog.Debug("Sending chunk", "chunk", string(respData))
						_, err = fmt.Fprintf(c.Response().Writer, "data: %s\n\n", string(respData))
						if err != nil {
							xlog.Debug("Sending chunk failed", "error", err)
							input.Cancel()
							return err
						}
						c.Response().Flush()
					case err := <-ended:
						if err == nil {
							break LOOP
						}
						xlog.Error("Stream ended with error", "error", err)

						errorResp := schema.ErrorResponse{
							Error: &schema.APIError{
								Message: err.Error(),
								Type:    "server_error",
								Code:    "server_error",
							},
						}
						respData, marshalErr := json.Marshal(errorResp)
						if marshalErr != nil {
							xlog.Error("Failed to marshal error response", "error", marshalErr)
							fmt.Fprintf(c.Response().Writer, "data: {\"error\":{\"message\":\"Internal error\",\"type\":\"server_error\"}}\n\n")
						} else {
							fmt.Fprintf(c.Response().Writer, "data: %s\n\n", respData)
						}
						fmt.Fprintf(c.Response().Writer, "data: [DONE]\n\n")
						c.Response().Flush()

						return nil
					}
				}

				// Drain responses channel to unblock the background goroutine if it's
				// still trying to send (e.g., after client disconnect). The goroutine
				// calls close(responses) when done, which terminates the drain.
				if input.Context.Err() != nil {
					go func() { for range responses {} }()
					<-ended
				}

				// MCP streaming tool execution: if we collected MCP tool calls, execute and loop
				if hasMCPToolsStream && toolsCalled && len(collectedToolCalls) > 0 {
					var hasMCPCalls bool
					for _, tc := range collectedToolCalls {
						if mcpExecutor != nil && mcpExecutor.IsTool(tc.FunctionCall.Name) {
							hasMCPCalls = true
							break
						}
					}
					if hasMCPCalls {
						// Append assistant message with tool_calls
						assistantMsg := schema.Message{
							Role:      "assistant",
							Content:   collectedContent,
							ToolCalls: collectedToolCalls,
						}
						input.Messages = append(input.Messages, assistantMsg)

						// Execute MCP tool calls and stream results as tool_result events
						for _, tc := range collectedToolCalls {
							if mcpExecutor == nil || !mcpExecutor.IsTool(tc.FunctionCall.Name) {
								continue
							}
							xlog.Debug("Executing MCP tool (stream)", "tool", tc.FunctionCall.Name, "iteration", mcpStreamIter)
							toolResult, toolErr := mcpExecutor.ExecuteTool(c.Request().Context(), tc.FunctionCall.Name, tc.FunctionCall.Arguments)
							if toolErr != nil {
								xlog.Error("MCP tool execution failed", "tool", tc.FunctionCall.Name, "error", toolErr)
								toolResult = fmt.Sprintf("Error: %v", toolErr)
							}
							input.Messages = append(input.Messages, schema.Message{
								Role:          "tool",
								Content:       toolResult,
								StringContent: toolResult,
								ToolCallID:    tc.ID,
								Name:          tc.FunctionCall.Name,
							})

							// Stream tool result event to client
							mcpEvent := map[string]any{
								"type":   "mcp_tool_result",
								"name":   tc.FunctionCall.Name,
								"result": toolResult,
							}
							if mcpEventData, err := json.Marshal(mcpEvent); err == nil {
								fmt.Fprintf(c.Response().Writer, "data: %s\n\n", mcpEventData)
								c.Response().Flush()
							}
						}

						xlog.Debug("MCP streaming tools executed, re-running inference", "iteration", mcpStreamIter)
						continue // next MCP stream iteration
					}
				}

				// Automatic tool parsing fallback for streaming: when no tools were
				// requested but the model emitted tool call markup, parse and emit them.
				if !shouldUseFn && config.FunctionsConfig.AutomaticToolParsingFallback && collectedContent != "" && !toolsCalled {
					parsed := functions.ParseFunctionCall(collectedContent, config.FunctionsConfig)
					for i, fc := range parsed {
						toolCallID := fc.ID
						if toolCallID == "" {
							toolCallID = id
						}
						toolCallMsg := schema.OpenAIResponse{
							ID:      id,
							Created: created,
							Model:   input.Model,
							Choices: []schema.Choice{{
								Delta: &schema.Message{
									Role: "assistant",
									ToolCalls: []schema.ToolCall{{
										Index: i,
										ID:    toolCallID,
										Type:  "function",
										FunctionCall: schema.FunctionCall{
											Name:      fc.Name,
											Arguments: fc.Arguments,
										},
									}},
								},
								Index: 0,
							}},
							Object: "chat.completion.chunk",
						}
						respData, _ := json.Marshal(toolCallMsg)
						fmt.Fprintf(c.Response().Writer, "data: %s\n\n", respData)
						c.Response().Flush()
						toolsCalled = true
					}
				}

				// No MCP tools to execute, send final stop message
				finishReason := FinishReasonStop
				if toolsCalled && len(input.Tools) > 0 {
					finishReason = FinishReasonToolCalls
				} else if toolsCalled {
					finishReason = FinishReasonFunctionCall
				}

				resp := &schema.OpenAIResponse{
					ID:      id,
					Created: created,
					Model:   input.Model, // we have to return what the user sent here, due to OpenAI spec.
					Choices: []schema.Choice{
						{
							FinishReason: &finishReason,
							Index:        0,
							Delta:        &schema.Message{},
						}},
					Object: "chat.completion.chunk",
					Usage:  *usage,
				}
				respData, _ := json.Marshal(resp)

				fmt.Fprintf(c.Response().Writer, "data: %s\n\n", respData)
				fmt.Fprintf(c.Response().Writer, "data: [DONE]\n\n")
				c.Response().Flush()
				xlog.Debug("Stream ended")
				return nil
			} // end MCP stream iteration loop

			// Safety fallback
			fmt.Fprintf(c.Response().Writer, "data: [DONE]\n\n")
			c.Response().Flush()
			return nil

		// no streaming mode
		default:
			mcpMaxIterations := 10
			if config.Agent.MaxIterations > 0 {
				mcpMaxIterations = config.Agent.MaxIterations
			}
			hasMCPTools := mcpExecutor != nil && mcpExecutor.HasTools()

			for mcpIteration := 0; mcpIteration <= mcpMaxIterations; mcpIteration++ {
				// Re-template on each MCP iteration since messages may have changed
				if mcpIteration > 0 && !config.TemplateConfig.UseTokenizerTemplate {
					predInput = evaluator.TemplateMessages(*input, input.Messages, config, funcs, shouldUseFn)
					xlog.Debug("MCP re-templating", "iteration", mcpIteration, "prompt_len", len(predInput))
				}

				// Detect if thinking token is already in prompt or template
				var template string
				if config.TemplateConfig.UseTokenizerTemplate {
					template = config.GetModelTemplate() // TODO: this should be the parsed jinja template. But for now this is the best we can do.
				} else {
					template = predInput
				}
				thinkingStartToken := reason.DetectThinkingStartToken(template, &config.ReasoningConfig)

				xlog.Debug("Thinking start token", "thinkingStartToken", thinkingStartToken, "template", template)

				// When shouldUseFn, the callback just stores the raw text — tool parsing
				// is deferred to after ComputeChoices so we can check chat deltas first
				// and avoid redundant Go-side parsing.
				var cbRawResult, cbReasoning string

				tokenCallback := func(s string, c *[]schema.Choice) {
					reasoning, s := reason.ExtractReasoningWithConfig(s, thinkingStartToken, config.ReasoningConfig)

					if !shouldUseFn {
						stopReason := FinishReasonStop
						message := &schema.Message{Role: "assistant", Content: &s}
						if reasoning != "" {
							message.Reasoning = &reasoning
						}
						*c = append(*c, schema.Choice{FinishReason: &stopReason, Index: 0, Message: message})
						return
					}

					// Store raw text for deferred tool parsing
					cbRawResult = s
					cbReasoning = reasoning
				}

				var result []schema.Choice
				var tokenUsage backend.TokenUsage
				var err error

				var chatDeltas []*pb.ChatDelta
				result, tokenUsage, chatDeltas, err = ComputeChoices(
					input,
					predInput,
					config,
					cl,
					startupOptions,
					ml,
					tokenCallback,
					nil,
					func(attempt int) bool {
						if !shouldUseFn {
							return false
						}
						// Retry when backend produced only reasoning and no content/tool calls.
						// Full tool parsing is deferred until after ComputeChoices returns
						// (when chat deltas are available), but we can detect the empty case here.
						if cbRawResult == "" && textContentToReturn == "" {
							xlog.Warn("Backend produced reasoning without actionable content, retrying",
								"reasoning_len", len(cbReasoning), "attempt", attempt+1)
							cbRawResult = ""
							cbReasoning = ""
							textContentToReturn = ""
							return true
						}
						return false
					},
				)
				if err != nil {
					return err
				}

				// For non-tool requests: prefer C++ autoparser chat deltas over
				// Go-side tag extraction (which can mangle output when thinkingStartToken
				// differs from the model's actual reasoning tags, e.g. Gemma 4).
				if !shouldUseFn && len(chatDeltas) > 0 {
					deltaContent := functions.ContentFromChatDeltas(chatDeltas)
					deltaReasoning := functions.ReasoningFromChatDeltas(chatDeltas)
					if deltaContent != "" || deltaReasoning != "" {
						xlog.Debug("[ChatDeltas] non-SSE no-tools: overriding result with C++ autoparser deltas",
							"content_len", len(deltaContent), "reasoning_len", len(deltaReasoning))
						stopReason := FinishReasonStop
						message := &schema.Message{Role: "assistant", Content: &deltaContent}
						if deltaReasoning != "" {
							message.Reasoning = &deltaReasoning
						}
						newChoice := schema.Choice{FinishReason: &stopReason, Index: 0, Message: message}
						// Preserve logprobs from the original result
						if len(result) > 0 && result[0].Logprobs != nil {
							newChoice.Logprobs = result[0].Logprobs
						}
						result = []schema.Choice{newChoice}
					}
				}

				// Tool parsing is deferred here (only when shouldUseFn) so chat deltas are available
				if shouldUseFn {
					var funcResults []functions.FuncCallResults

					// Try pre-parsed tool calls from C++ autoparser first
					if deltaToolCalls := functions.ToolCallsFromChatDeltas(chatDeltas); len(deltaToolCalls) > 0 {
						xlog.Debug("[ChatDeltas] non-SSE: using C++ autoparser tool calls, skipping Go-side parsing", "count", len(deltaToolCalls))
						funcResults = deltaToolCalls
						textContentToReturn = functions.ContentFromChatDeltas(chatDeltas)
						cbReasoning = functions.ReasoningFromChatDeltas(chatDeltas)
					} else if deltaContent := functions.ContentFromChatDeltas(chatDeltas); len(chatDeltas) > 0 && deltaContent != "" {
						// ChatDeltas have content but no tool calls — model answered without using tools.
						// This happens with thinking models (e.g. Gemma 4) where the Go-side reasoning
						// extraction misclassifies clean content as reasoning, leaving cbRawResult empty.
						xlog.Debug("[ChatDeltas] non-SSE: using C++ autoparser content (no tool calls)", "content_len", len(deltaContent))
						textContentToReturn = deltaContent
						cbReasoning = functions.ReasoningFromChatDeltas(chatDeltas)
					} else {
						// Fallback: parse tool calls from raw text
						xlog.Debug("[ChatDeltas] non-SSE: no chat deltas, falling back to Go-side text parsing")
						textContentToReturn = functions.ParseTextContent(cbRawResult, config.FunctionsConfig)
						cbRawResult = functions.CleanupLLMResult(cbRawResult, config.FunctionsConfig)
						funcResults = functions.ParseFunctionCall(cbRawResult, config.FunctionsConfig)
					}

					// Content-based tool call fallback: if no tool calls were found,
					// try parsing the raw result — ParseFunctionCall handles detection internally.
					if len(funcResults) == 0 {
						contentFuncResults := functions.ParseFunctionCall(cbRawResult, config.FunctionsConfig)
						if len(contentFuncResults) > 0 {
							funcResults = contentFuncResults
							textContentToReturn = functions.StripToolCallMarkup(cbRawResult)
						}
					}

					noActionsToRun := len(funcResults) > 0 && funcResults[0].Name == noActionName || len(funcResults) == 0

					switch {
					case noActionsToRun:
						// Use textContentToReturn if available (e.g. from ChatDeltas),
						// otherwise fall back to cbRawResult for legacy Go-side parsing.
						questionInput := cbRawResult
						if textContentToReturn != "" {
							questionInput = textContentToReturn
						}
						qResult, qErr := handleQuestion(config, funcResults, questionInput, predInput)
						if qErr != nil {
							xlog.Error("error handling question", "error", qErr)
						}

						stopReason := FinishReasonStop
						message := &schema.Message{Role: "assistant", Content: &qResult}
						if cbReasoning != "" {
							message.Reasoning = &cbReasoning
						}
						result = append(result, schema.Choice{
							FinishReason: &stopReason,
							Message:      message,
						})
					default:
						toolCallsReason := FinishReasonToolCalls
						toolChoice := schema.Choice{
							FinishReason: &toolCallsReason,
							Message: &schema.Message{
								Role: "assistant",
							},
						}
						if cbReasoning != "" {
							toolChoice.Message.Reasoning = &cbReasoning
						}

						for _, ss := range funcResults {
							name, args := ss.Name, ss.Arguments
							toolCallID := ss.ID
							if toolCallID == "" {
								toolCallID = id
							}
							if len(input.Tools) > 0 {
								toolChoice.Message.Content = textContentToReturn
								toolChoice.Message.ToolCalls = append(toolChoice.Message.ToolCalls,
									schema.ToolCall{
										ID:   toolCallID,
										Type: "function",
										FunctionCall: schema.FunctionCall{
											Name:      name,
											Arguments: args,
										},
									},
								)
							} else {
								// Deprecated function_call format
								functionCallReason := FinishReasonFunctionCall
								message := &schema.Message{
									Role:    "assistant",
									Content: &textContentToReturn,
									FunctionCall: map[string]any{
										"name":      name,
										"arguments": args,
									},
								}
								if cbReasoning != "" {
									message.Reasoning = &cbReasoning
								}
								result = append(result, schema.Choice{
									FinishReason: &functionCallReason,
									Message:      message,
								})
							}
						}

						if len(input.Tools) > 0 {
							result = append(result, toolChoice)
						}
					}
				}

				// Automatic tool parsing fallback: when no tools/functions were in the
				// request but the model emitted tool call markup, parse and surface them.
				if !shouldUseFn && config.FunctionsConfig.AutomaticToolParsingFallback && len(result) > 0 {
					for i, choice := range result {
						if choice.Message == nil || choice.Message.Content == nil {
							continue
						}
						contentStr, ok := choice.Message.Content.(string)
						if !ok || contentStr == "" {
							continue
						}
						parsed := functions.ParseFunctionCall(contentStr, config.FunctionsConfig)
						if len(parsed) == 0 {
							continue
						}
						stripped := functions.StripToolCallMarkup(contentStr)
						toolCallsReason := FinishReasonToolCalls
						result[i].FinishReason = &toolCallsReason
						if stripped != "" {
							result[i].Message.Content = &stripped
						} else {
							result[i].Message.Content = nil
						}
						for _, fc := range parsed {
							toolCallID := fc.ID
							if toolCallID == "" {
								toolCallID = id
							}
							result[i].Message.ToolCalls = append(result[i].Message.ToolCalls,
								schema.ToolCall{
									ID:   toolCallID,
									Type: "function",
									FunctionCall: schema.FunctionCall{
										Name:      fc.Name,
										Arguments: fc.Arguments,
									},
								},
							)
						}
					}
				}

				// MCP server-side tool execution loop:
				// If we have MCP tools and the model returned tool_calls, execute MCP tools
				// and re-run inference with the results appended to the conversation.
				if hasMCPTools && len(result) > 0 {
					var mcpCallsExecuted bool
					for _, choice := range result {
						if choice.Message == nil || len(choice.Message.ToolCalls) == 0 {
							continue
						}
						// Check if any tool calls are MCP tools
						var hasMCPCalls bool
						for _, tc := range choice.Message.ToolCalls {
							if mcpExecutor != nil && mcpExecutor.IsTool(tc.FunctionCall.Name) {
								hasMCPCalls = true
								break
							}
						}
						if !hasMCPCalls {
							continue
						}

						// Append assistant message with tool_calls to conversation
						assistantContent := ""
						if choice.Message.Content != nil {
							if s, ok := choice.Message.Content.(string); ok {
								assistantContent = s
							} else if sp, ok := choice.Message.Content.(*string); ok && sp != nil {
								assistantContent = *sp
							}
						}
						assistantMsg := schema.Message{
							Role:      "assistant",
							Content:   assistantContent,
							ToolCalls: choice.Message.ToolCalls,
						}
						input.Messages = append(input.Messages, assistantMsg)

						// Execute each MCP tool call and append results
						for _, tc := range choice.Message.ToolCalls {
							if mcpExecutor == nil || !mcpExecutor.IsTool(tc.FunctionCall.Name) {
								continue
							}
							xlog.Debug("Executing MCP tool", "tool", tc.FunctionCall.Name, "arguments", tc.FunctionCall.Arguments, "iteration", mcpIteration)
							toolResult, toolErr := mcpExecutor.ExecuteTool(c.Request().Context(), tc.FunctionCall.Name, tc.FunctionCall.Arguments)
							if toolErr != nil {
								xlog.Error("MCP tool execution failed", "tool", tc.FunctionCall.Name, "error", toolErr)
								toolResult = fmt.Sprintf("Error: %v", toolErr)
							}
							input.Messages = append(input.Messages, schema.Message{
								Role:          "tool",
								Content:       toolResult,
								StringContent: toolResult,
								ToolCallID:    tc.ID,
								Name:          tc.FunctionCall.Name,
							})
							mcpCallsExecuted = true
						}
					}

					if mcpCallsExecuted {
						xlog.Debug("MCP tools executed, re-running inference", "iteration", mcpIteration, "messages", len(input.Messages))
						continue // next MCP iteration
					}
				}

				// No MCP tools to execute (or no MCP tools configured), return response
				usage := schema.OpenAIUsage{
					PromptTokens:     tokenUsage.Prompt,
					CompletionTokens: tokenUsage.Completion,
					TotalTokens:      tokenUsage.Prompt + tokenUsage.Completion,
				}
				if extraUsage {
					usage.TimingTokenGeneration = tokenUsage.TimingTokenGeneration
					usage.TimingPromptProcessing = tokenUsage.TimingPromptProcessing
				}

				resp := &schema.OpenAIResponse{
					ID:      id,
					Created: created,
					Model:   input.Model, // we have to return what the user sent here, due to OpenAI spec.
					Choices: result,
					Object:  "chat.completion",
					Usage:   usage,
				}
				respData, _ := json.Marshal(resp)
				xlog.Debug("Response", "response", string(respData))

				// Return the prediction in the response body
				return c.JSON(200, resp)
			} // end MCP iteration loop

			// Should not reach here, but safety fallback
			return fmt.Errorf("MCP iteration limit reached")
		}
	}
}

func handleQuestion(config *config.ModelConfig, funcResults []functions.FuncCallResults, result, prompt string) (string, error) {

	if len(funcResults) == 0 && result != "" {
		xlog.Debug("nothing function results but we had a message from the LLM")

		return result, nil
	}

	xlog.Debug("nothing to do, computing a reply")
	arg := ""
	if len(funcResults) > 0 {
		arg = funcResults[0].Arguments
	}
	// If there is a message that the LLM already sends as part of the JSON reply, use it
	arguments := map[string]any{}
	if err := json.Unmarshal([]byte(arg), &arguments); err != nil {
		xlog.Debug("handleQuestion: function result did not contain a valid JSON object")
	}
	m, exists := arguments["message"]
	if exists {
		switch message := m.(type) {
		case string:
			if message != "" {
				xlog.Debug("Reply received from LLM", "message", message)
				message = backend.Finetune(*config, prompt, message)
				xlog.Debug("Reply received from LLM(finetuned)", "message", message)

				return message, nil
			}
		}
	}

	xlog.Debug("No action received from LLM, without a message, computing a reply")

	return "", nil
}