diff --git a/core/http/endpoints/openai/chat.go b/core/http/endpoints/openai/chat.go index cf2f05663..04bd28b36 100644 --- a/core/http/endpoints/openai/chat.go +++ b/core/http/endpoints/openai/chat.go @@ -174,8 +174,78 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator result := "" lastEmittedCount := 0 + + // Track accumulated content for incremental reasoning and content extraction (mirrors process()) + accumulatedContent := "" + lastEmittedReasoning := "" + lastEmittedCleanedContent := "" + sentInitialRole := false + _, tokenUsage, chatDeltas, err := ComputeChoices(req, prompt, config, cl, startupOptions, loader, func(s string, c *[]schema.Choice) {}, func(s string, usage backend.TokenUsage) bool { result += s + accumulatedContent += s + + // Incremental reasoning extraction — emit reasoning deltas in their own SSE chunks + // before any tool-call chunks (OpenAI spec: reasoning and tool_calls never share a delta) + currentReasoning, cleanedContent := reason.ExtractReasoningWithConfig(accumulatedContent, thinkingStartToken, config.ReasoningConfig) + + var reasoningDelta *string + if currentReasoning != lastEmittedReasoning { + if len(currentReasoning) > len(lastEmittedReasoning) && strings.HasPrefix(currentReasoning, lastEmittedReasoning) { + newReasoning := currentReasoning[len(lastEmittedReasoning):] + reasoningDelta = &newReasoning + lastEmittedReasoning = currentReasoning + } else if currentReasoning != "" { + reasoningDelta = &currentReasoning + lastEmittedReasoning = currentReasoning + } + } + + if reasoningDelta != nil && *reasoningDelta != "" { + responses <- schema.OpenAIResponse{ + ID: id, + Created: created, + Model: req.Model, + Choices: []schema.Choice{{ + Delta: &schema.Message{Reasoning: reasoningDelta}, + Index: 0, + }}, + Object: "chat.completion.chunk", + } + } + + // Stream content deltas (cleaned of reasoning tags) while no tool calls + // have been detected.
Once the incremental parser finds tool calls, + // content stops — per OpenAI spec, content and tool_calls don't mix. + if lastEmittedCount == 0 && cleanedContent != "" { + var deltaContent string + if len(cleanedContent) > len(lastEmittedCleanedContent) && strings.HasPrefix(cleanedContent, lastEmittedCleanedContent) { + deltaContent = cleanedContent[len(lastEmittedCleanedContent):] + lastEmittedCleanedContent = cleanedContent + } else if cleanedContent != lastEmittedCleanedContent { + deltaContent = cleanedContent + lastEmittedCleanedContent = cleanedContent + } + if deltaContent != "" { + if !sentInitialRole { + responses <- schema.OpenAIResponse{ + ID: id, Created: created, Model: req.Model, + Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0}}, + Object: "chat.completion.chunk", + } + sentInitialRole = true + } + responses <- schema.OpenAIResponse{ + ID: id, Created: created, Model: req.Model, + Choices: []schema.Choice{{ + Delta: &schema.Message{Content: &deltaContent}, + Index: 0, + }}, + Object: "chat.completion.chunk", + } + } + } + // Try incremental XML parsing for streaming support using iterative parser // This allows emitting partial tool calls as they're being generated cleanedResult := functions.CleanupLLMResult(result, config.FunctionsConfig) @@ -306,20 +376,6 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator switch { case noActionToRun: - initialMessage := schema.OpenAIResponse{ - ID: id, - Created: created, - Model: req.Model, // we have to return what the user sent here, due to OpenAI spec. 
- Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0, FinishReason: nil}}, - Object: "chat.completion.chunk", - } - responses <- initialMessage - - result, err := handleQuestion(config, functionResults, result, prompt) - if err != nil { - xlog.Error("error handling question", "error", err) - return err - } usage := schema.OpenAIUsage{ PromptTokens: tokenUsage.Prompt, CompletionTokens: tokenUsage.Completion, @@ -330,25 +386,43 @@ func ChatEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, evaluator usage.TimingPromptProcessing = tokenUsage.TimingPromptProcessing } - var deltaReasoning *string - if reasoning != "" { - deltaReasoning = &reasoning - } - delta := &schema.Message{Content: &result} - if deltaReasoning != nil { - delta.Reasoning = deltaReasoning - } + if sentInitialRole { + // Content was already streamed during the callback — just emit usage. + delta := &schema.Message{} + if reasoning != "" && lastEmittedReasoning == "" { + delta.Reasoning = &reasoning + } + responses <- schema.OpenAIResponse{ + ID: id, Created: created, Model: req.Model, + Choices: []schema.Choice{{Delta: delta, Index: 0}}, + Object: "chat.completion.chunk", + Usage: usage, + } + } else { + // Content was NOT streamed — send everything at once (fallback). + responses <- schema.OpenAIResponse{ + ID: id, Created: created, Model: req.Model, + Choices: []schema.Choice{{Delta: &schema.Message{Role: "assistant"}, Index: 0}}, + Object: "chat.completion.chunk", + } - resp := schema.OpenAIResponse{ - ID: id, - Created: created, - Model: req.Model, // we have to return what the user sent here, due to OpenAI spec. 
- Choices: []schema.Choice{{Delta: delta, Index: 0, FinishReason: nil}}, - Object: "chat.completion.chunk", - Usage: usage, - } + result, err := handleQuestion(config, functionResults, result, prompt) + if err != nil { + xlog.Error("error handling question", "error", err) + return err + } - responses <- resp + delta := &schema.Message{Content: &result} + if reasoning != "" { + delta.Reasoning = &reasoning + } + responses <- schema.OpenAIResponse{ + ID: id, Created: created, Model: req.Model, + Choices: []schema.Choice{{Delta: delta, Index: 0}}, + Object: "chat.completion.chunk", + Usage: usage, + } + } default: for i, ss := range functionResults { diff --git a/core/http/endpoints/openresponses/responses.go b/core/http/endpoints/openresponses/responses.go index dd51e1a36..9b0ae2a23 100644 --- a/core/http/endpoints/openresponses/responses.go +++ b/core/http/endpoints/openresponses/responses.go @@ -1737,6 +1737,16 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6 for mcpStreamIter := 0; mcpStreamIter <= mcpStreamMaxIterations; mcpStreamIter++ { if mcpStreamIter > 0 { + // Reset reasoning and tool-call state for re-inference so reasoning + // extraction runs again on subsequent iterations + inToolCallMode = false + accumulatedContent = "" + lastEmittedReasoning = "" + lastEmittedCleanedContent = "" + currentMessageID = "" + lastEmittedToolCallCount = 0 + currentReasoningID = "" + predInput = evaluator.TemplateMessages(*openAIReq, openAIReq.Messages, cfg, funcs, shouldUseFn) xlog.Debug("Open Responses stream MCP re-templating", "iteration", mcpStreamIter) images = images[:0] diff --git a/core/http/react-ui/src/pages/AgentChat.jsx b/core/http/react-ui/src/pages/AgentChat.jsx index 757ae74b9..908e2d9af 100644 --- a/core/http/react-ui/src/pages/AgentChat.jsx +++ b/core/http/react-ui/src/pages/AgentChat.jsx @@ -104,6 +104,9 @@ export default function AgentChat() { const [editingName, setEditingName] = useState(null) const [editName, 
setEditName] = useState('') const [chatSearch, setChatSearch] = useState('') + const [streamContent, setStreamContent] = useState('') + const [streamReasoning, setStreamReasoning] = useState('') + const [streamToolCalls, setStreamToolCalls] = useState([]) const messagesEndRef = useRef(null) const messagesRef = useRef(null) const textareaRef = useRef(null) @@ -150,8 +153,41 @@ export default function AgentChat() { const data = JSON.parse(e.data) if (data.status === 'processing') { setProcessingChatId(activeIdRef.current) + setStreamContent('') + setStreamReasoning('') + setStreamToolCalls([]) } else if (data.status === 'completed') { setProcessingChatId(null) + setStreamContent('') + setStreamReasoning('') + setStreamToolCalls([]) + } + } catch (_err) { + // ignore + } + }) + + es.addEventListener('stream_event', (e) => { + try { + const data = JSON.parse(e.data) + if (data.type === 'reasoning') { + setStreamReasoning(prev => prev + (data.content || '')) + } else if (data.type === 'content') { + setStreamContent(prev => prev + (data.content || '')) + } else if (data.type === 'tool_call') { + const name = data.tool_name || '' + const args = data.tool_args || '' + setStreamToolCalls(prev => { + if (name) { + return [...prev, { name, args }] + } + if (prev.length === 0) return prev + const updated = [...prev] + updated[updated.length - 1] = { ...updated[updated.length - 1], args: updated[updated.length - 1].args + args } + return updated + }) + } else if (data.type === 'done') { + // Content will be finalized by json_message event } } catch (_err) { // ignore @@ -192,7 +228,7 @@ export default function AgentChat() { // Auto-scroll to bottom useEffect(() => { messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }) - }, [messages]) + }, [messages, streamContent, streamReasoning, streamToolCalls]) // Highlight code blocks useEffect(() => { @@ -537,7 +573,50 @@ export default function AgentChat() { flushSystem('end') return elements })()} - {processing && ( + 
{processing && (streamReasoning || streamContent || streamToolCalls.length > 0) && ( +
+
+ +
+
+ {streamReasoning && ( +
+ + + {streamContent ? 'Thinking' : 'Thinking...'} + + +
+
+
+
+
+
+ )} + {streamToolCalls.length > 0 && !streamContent && ( +
+ {streamToolCalls.map((tc, idx) => ( +
+ + + {tc.name} + + calling... +
+ ))} +
+ )} + {streamContent && ( +
+ + +
+ )} +
+
+ )} + {processing && !streamReasoning && !streamContent && streamToolCalls.length === 0 && (
diff --git a/core/http/react-ui/src/pages/AgentJobDetails.jsx b/core/http/react-ui/src/pages/AgentJobDetails.jsx index b8fcedd39..8729c5243 100644 --- a/core/http/react-ui/src/pages/AgentJobDetails.jsx +++ b/core/http/react-ui/src/pages/AgentJobDetails.jsx @@ -8,6 +8,9 @@ const traceColors = { tool_call: { bg: 'rgba(139,92,246,0.1)', border: 'rgba(139,92,246,0.3)', icon: 'fa-wrench', color: 'var(--color-accent)' }, tool_result: { bg: 'rgba(34,197,94,0.1)', border: 'rgba(34,197,94,0.3)', icon: 'fa-check', color: 'var(--color-success)' }, status: { bg: 'rgba(245,158,11,0.1)', border: 'rgba(245,158,11,0.3)', icon: 'fa-info-circle', color: 'var(--color-warning)' }, + stream_reasoning: { bg: 'rgba(99,102,241,0.06)', border: 'rgba(99,102,241,0.2)', icon: 'fa-lightbulb', color: 'var(--color-primary)' }, + stream_content: { bg: 'rgba(59,130,246,0.08)', border: 'rgba(59,130,246,0.25)', icon: 'fa-pen-nib', color: 'var(--color-info, #3b82f6)' }, + stream_tool_call: { bg: 'rgba(139,92,246,0.06)', border: 'rgba(139,92,246,0.2)', icon: 'fa-bolt', color: 'var(--color-accent)' }, } function TraceCard({ trace, index }) { diff --git a/core/services/agent_jobs.go b/core/services/agent_jobs.go index 8cf9777e8..10fa92ca7 100644 --- a/core/services/agent_jobs.go +++ b/core/services/agent_jobs.go @@ -887,6 +887,35 @@ func (s *AgentJobService) executeJobInternal(job schema.Job, task schema.Task, c job.Traces = append(job.Traces, trace) s.jobs.Set(job.ID, job) }), + cogito.WithStreamCallback(func(ev cogito.StreamEvent) { + switch ev.Type { + case cogito.StreamEventReasoning: + trace := schema.JobTrace{ + Type: "stream_reasoning", + Content: ev.Content, + Timestamp: time.Now(), + } + job.Traces = append(job.Traces, trace) + s.jobs.Set(job.ID, job) + case cogito.StreamEventContent: + trace := schema.JobTrace{ + Type: "stream_content", + Content: ev.Content, + Timestamp: time.Now(), + } + job.Traces = append(job.Traces, trace) + s.jobs.Set(job.ID, job) + case cogito.StreamEventToolCall: + 
trace := schema.JobTrace{ + Type: "stream_tool_call", + Content: ev.ToolArgs, + ToolName: ev.ToolName, + Timestamp: time.Now(), + } + job.Traces = append(job.Traces, trace) + s.jobs.Set(job.ID, job) + } + }), ) // Execute tools diff --git a/go.mod b/go.mod index ba23fea78..e3eddb077 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/mholt/archiver/v3 v3.5.1 github.com/microcosm-cc/bluemonday v1.0.27 github.com/modelcontextprotocol/go-sdk v1.4.0 - github.com/mudler/cogito v0.9.5-0.20260313170202-42271c7e1a6b + github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b github.com/mudler/edgevpn v0.31.1 github.com/mudler/go-processmanager v0.1.0 github.com/mudler/memory v0.0.0-20251216220809-d1256471a6c2 @@ -128,7 +128,7 @@ require ( github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/labstack/gommon v0.4.2 // indirect github.com/mschoch/smat v0.2.0 // indirect - github.com/mudler/LocalAGI v0.0.0-20260314222828-e38f13ab8cec + github.com/mudler/LocalAGI v0.0.0-20260315223407-da286065e126 github.com/mudler/localrecall v0.5.9-0.20260314221856-96d63875cc47 // indirect github.com/mudler/skillserver v0.0.5-0.20260221145827-0639a82c8f49 github.com/olekukonko/tablewriter v0.0.5 // indirect diff --git a/go.sum b/go.sum index b477239d2..5902da1e0 100644 --- a/go.sum +++ b/go.sum @@ -654,10 +654,10 @@ github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= -github.com/mudler/LocalAGI v0.0.0-20260314222828-e38f13ab8cec h1:Y6JYhfJidFktfmQC00SwHtQVh0lr0O52qihgTKddSNU= -github.com/mudler/LocalAGI v0.0.0-20260314222828-e38f13ab8cec/go.mod h1:yf+IlZzQCGgKPGFn5yclzA2Dxxhy75K3YDubkjCub04= -github.com/mudler/cogito v0.9.5-0.20260313170202-42271c7e1a6b 
h1:Hs2Byjnukgkwm5Vw7z5aSZEjznPHaxjr2vLc5Uu1fHQ= -github.com/mudler/cogito v0.9.5-0.20260313170202-42271c7e1a6b/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4= +github.com/mudler/LocalAGI v0.0.0-20260315223407-da286065e126 h1:7owcRhvwMP5BDDPsoK8TLnOBY4khcGJXutMY7pe9lhc= +github.com/mudler/LocalAGI v0.0.0-20260315223407-da286065e126/go.mod h1:w8kG2r/TlADJ4SnYemPNirW1pdHsqM/RAdCPk9r5Ll0= +github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b h1:A74T2Lauvg61KodYqsjTYDY05kPLcW+efVZjd23dghU= +github.com/mudler/cogito v0.9.5-0.20260315222927-63abdec7189b/go.mod h1:6sfja3lcu2nWRzEc0wwqGNu/eCG3EWgij+8s7xyUeQ4= github.com/mudler/edgevpn v0.31.1 h1:7qegiDWd0kAg6ljhNHxqvp8hbo/6BbzSdbb7/2WZfiY= github.com/mudler/edgevpn v0.31.1/go.mod h1:ftV5B0nKFzm4R8vR80UYnCb2nf7lxCRgAALxUEEgCf8= github.com/mudler/go-piper v0.0.0-20241023091659-2494246fd9fc h1:RxwneJl1VgvikiX28EkpdAyL4yQVnJMrbquKospjHyA=