diff --git a/core/http/endpoints/openai/realtime_transport_webrtc.go b/core/http/endpoints/openai/realtime_transport_webrtc.go index b687654bd..9ddec5edb 100644 --- a/core/http/endpoints/openai/realtime_transport_webrtc.go +++ b/core/http/endpoints/openai/realtime_transport_webrtc.go @@ -113,8 +113,13 @@ func (t *WebRTCTransport) sendLoop() { return } if err := t.dc.SendText(string(data)); err != nil { - xlog.Error("data channel send failed", "error", err) - return + // Drop just this event and keep the loop alive: a single + // failed send (e.g. an event over the negotiated SCTP + // max-message-size) must not tear down the session and + // silently drop every subsequent event. A genuinely dead + // transport is handled by the <-t.closed case. + xlog.Error("data channel send failed, dropping event", "error", err) + continue } case <-t.closed: // Drain any remaining queued events before exiting @@ -122,7 +127,8 @@ func (t *WebRTCTransport) sendLoop() { select { case data := <-t.outEvents: if err := t.dc.SendText(string(data)); err != nil { - return + xlog.Error("data channel send failed while draining, dropping event", "error", err) + continue } default: return diff --git a/core/http/endpoints/openai/realtime_webrtc.go b/core/http/endpoints/openai/realtime_webrtc.go index 0ac982c19..26edf94ea 100644 --- a/core/http/endpoints/openai/realtime_webrtc.go +++ b/core/http/endpoints/openai/realtime_webrtc.go @@ -128,10 +128,13 @@ func RealtimeCalls(application *application.Application) echo.HandlerFunc { handleIncomingAudioTrack(track, transport) }) - // Set the remote SDP (client's offer) + // Set the remote SDP (client's offer). Raise the data-channel + // max-message-size the browser advertised so pion permits the larger + // realtime events some turns produce (e.g. tool calls), which would + // otherwise be dropped on send. See realtime_webrtc_sctp.go. 
// realtimeDataChannelMaxMessageSize is the SCTP max-message-size, in bytes,
// that LocalAI honors for the "oai-events" data channel.
//
// Browsers advertise a conservative max-message-size in their SDP offer (Chrome
// uses 262144 = 256 KiB). pion enforces the remote's advertised value on send,
// so a single realtime event larger than it cannot be sent: the SendText fails,
// the event is dropped, and the turn silently yields no response. Some turns
// legitimately produce a single JSON event above 256 KiB (notably tool calls
// with sizeable schemas or results). Browsers advertise this value
// conservatively but their SCTP stacks reassemble much larger messages, so we
// raise the value honored for our own server-generated events.
const realtimeDataChannelMaxMessageSize = 16 * 1024 * 1024 // 16 MiB

// maxMessageSizeAttrPrefix is the SDP attribute name including its ':'
// separator; the advertised size follows it as a run of decimal digits.
const maxMessageSizeAttrPrefix = "a=max-message-size:"

// maxMessageSizeAttrRe matches one max-message-size attribute together with its
// numeric value. It is compiled once at package scope.
var maxMessageSizeAttrRe = regexp.MustCompile(regexp.QuoteMeta(maxMessageSizeAttrPrefix) + `\d+`)

// raiseDataChannelMaxMessageSize rewrites every SCTP max-message-size attribute
// in an SDP offer whose advertised value is below
// realtimeDataChannelMaxMessageSize so that it carries that limit instead,
// letting pion permit larger outbound realtime events.
//
// The rewrite only ever raises the limit:
//   - a value of 0 is left untouched, because RFC 8841 defines it as "the
//     endpoint handles messages of any size" and replacing it would impose a cap;
//   - a value already at or above realtimeDataChannelMaxMessageSize is left
//     untouched, so a peer that advertises more is never lowered.
//
// Offers that don't carry the attribute are returned unchanged.
func raiseDataChannelMaxMessageSize(sdp string) string {
	// Built once per call rather than once per match.
	raised := fmt.Sprintf("%s%d", maxMessageSizeAttrPrefix, realtimeDataChannelMaxMessageSize)

	return maxMessageSizeAttrRe.ReplaceAllStringFunc(sdp, func(attr string) string {
		// The regexp guarantees attr is the prefix followed by >= 1 ASCII digit.
		if shouldRaiseMaxMessageSize(attr[len(maxMessageSizeAttrPrefix):]) {
			return raised
		}
		return attr
	})
}

// shouldRaiseMaxMessageSize reports whether digits, a non-empty run of ASCII
// decimal digits, encodes a value strictly between 0 and
// realtimeDataChannelMaxMessageSize.
func shouldRaiseMaxMessageSize(digits string) bool {
	var advertised uint64
	for i := 0; i < len(digits); i++ {
		advertised = advertised*10 + uint64(digits[i]-'0')
		// Stop as soon as the limit is reached: the answer is already known,
		// and bailing out here keeps arbitrarily long digit runs from
		// overflowing the accumulator.
		if advertised >= realtimeDataChannelMaxMessageSize {
			return false
		}
	}
	// Zero means "no limit" and must not be replaced by a finite cap.
	return advertised > 0
}