mirror of
https://github.com/mudler/LocalAI.git
synced 2026-06-28 02:17:00 -04:00
fix(openresponses): bound resume-stream buffer and enforce response ownership (#10569)
The background=true resumable-stream path had two latent issues. 1. Unbounded resume buffer. AppendEvent grew StreamEvents without limit, so a long-running or abandoned background generation could consume process memory without bound. The store now caps the buffer (event count and total bytes, mirroring llama.cpp's byte-capped slot ring), evicting oldest events from the front and advancing a droppedThrough watermark. GetEventsAfter returns ErrOffsetLost when the requested starting_after is below the watermark, and handleStreamResume surfaces that as HTTP 409 before committing to the SSE response, so a resuming client gets a clear error instead of a silently truncated stream. 2. Missing ownership check (IDOR). GET /responses/:id, its stream resume, and /cancel looked up responses purely by ID, letting any caller who knows or guesses an ID read or cancel another caller's response. Responses now carry the creating caller's identity (auth.GetUser), stamped at creation and compared on read/cancel/resume; a mismatch returns 404 (not 403) so existence is not leaked. Backward compatible: responses with no owner (single-key / no-auth deployments) remain accessible. Assisted-by: Claude:claude-opus-4-8 [Claude Code] Signed-off-by: Ettore Di Giacinto <mudler@localai.io> Co-authored-by: Ettore Di Giacinto <mudler@localai.io>
This commit is contained in:
@@ -3,6 +3,7 @@ package openresponses
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@@ -10,6 +11,7 @@ import (
|
||||
"github.com/labstack/echo/v4"
|
||||
"github.com/mudler/LocalAI/core/backend"
|
||||
"github.com/mudler/LocalAI/core/config"
|
||||
"github.com/mudler/LocalAI/core/http/auth"
|
||||
mcpTools "github.com/mudler/LocalAI/core/http/endpoints/mcp"
|
||||
openaiEndpoint "github.com/mudler/LocalAI/core/http/endpoints/openai"
|
||||
"github.com/mudler/LocalAI/core/http/middleware"
|
||||
@@ -246,8 +248,11 @@ func ResponsesEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, eval
|
||||
// Create cancellable context for background execution
|
||||
bgCtx, bgCancel := context.WithCancel(context.Background())
|
||||
|
||||
// Store the background response
|
||||
// Store the background response and stamp its owner before the ID
|
||||
// is returned to the client, so later GET/cancel/resume can verify
|
||||
// the caller owns it.
|
||||
store.StoreBackground(responseID, input, queuedResponse, bgCancel, input.Stream)
|
||||
store.SetOwner(responseID, ownerFromContext(c))
|
||||
|
||||
// Start background processing goroutine
|
||||
go func() {
|
||||
@@ -1587,6 +1592,7 @@ func handleOpenResponsesNonStream(c echo.Context, responseID string, createdAt i
|
||||
if shouldStore {
|
||||
store := GetGlobalStore()
|
||||
store.Store(responseID, input, response)
|
||||
store.SetOwner(responseID, ownerFromContext(c))
|
||||
}
|
||||
|
||||
return c.JSON(200, response)
|
||||
@@ -2322,6 +2328,7 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6
|
||||
if shouldStore {
|
||||
store := GetGlobalStore()
|
||||
store.Store(responseID, input, responseCompleted)
|
||||
store.SetOwner(responseID, ownerFromContext(c))
|
||||
}
|
||||
|
||||
// Send [DONE]
|
||||
@@ -2966,6 +2973,18 @@ func convertORToolsToOpenAIFormat(orTools []schema.ORFunctionTool) []functions.T
|
||||
return result
|
||||
}
|
||||
|
||||
// ownerFromContext returns the identity (user ID) of the authenticated
|
||||
// caller, or empty string when no authentication was performed (single-key /
|
||||
// no-auth deployments). It is the value stamped on a response at creation and
|
||||
// compared on read/cancel/resume to prevent one caller from accessing
|
||||
// another's response by guessing its ID.
|
||||
func ownerFromContext(c echo.Context) string {
|
||||
if u := auth.GetUser(c); u != nil {
|
||||
return u.ID
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// GetResponseEndpoint returns a handler for GET /responses/:id
|
||||
// This endpoint is used for polling background responses or resuming streaming
|
||||
// @Summary Get a response by ID
|
||||
@@ -2991,6 +3010,12 @@ func GetResponseEndpoint() func(c echo.Context) error {
|
||||
return sendOpenResponsesError(c, 404, "not_found", fmt.Sprintf("response not found: %s", responseID), "id")
|
||||
}
|
||||
|
||||
// Enforce response ownership. Return 404 (not 403) on mismatch so the
|
||||
// existence of another caller's response is not leaked.
|
||||
if !accessAllowed(stored, ownerFromContext(c)) {
|
||||
return sendOpenResponsesError(c, 404, "not_found", fmt.Sprintf("response not found: %s", responseID), "id")
|
||||
}
|
||||
|
||||
// Check if streaming resume is requested
|
||||
streamParam := c.QueryParam("stream")
|
||||
if streamParam == "true" {
|
||||
@@ -3022,16 +3047,21 @@ func GetResponseEndpoint() func(c echo.Context) error {
|
||||
|
||||
// handleStreamResume handles resuming a streaming response from a specific sequence number
|
||||
func handleStreamResume(c echo.Context, store *ResponseStore, responseID string, stored *StoredResponse, startingAfter int) error {
|
||||
// Fetch buffered events before committing to an SSE response so an
|
||||
// offset-lost gap can be reported as a clean HTTP status rather than a
|
||||
// silently truncated event stream.
|
||||
events, err := store.GetEventsAfter(responseID, startingAfter)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrOffsetLost) {
|
||||
return sendOpenResponsesError(c, 409, "invalid_request_error", fmt.Sprintf("starting_after=%d is older than the oldest retained event; the resume buffer evicted those events and the stream cannot be resumed from that point", startingAfter), "starting_after")
|
||||
}
|
||||
return sendOpenResponsesError(c, 500, "server_error", fmt.Sprintf("failed to get events: %v", err), "")
|
||||
}
|
||||
|
||||
c.Response().Header().Set("Content-Type", "text/event-stream")
|
||||
c.Response().Header().Set("Cache-Control", "no-cache")
|
||||
c.Response().Header().Set("Connection", "keep-alive")
|
||||
|
||||
// Get buffered events after the starting point
|
||||
events, err := store.GetEventsAfter(responseID, startingAfter)
|
||||
if err != nil {
|
||||
return sendOpenResponsesError(c, 500, "server_error", fmt.Sprintf("failed to get events: %v", err), "")
|
||||
}
|
||||
|
||||
// Send all buffered events
|
||||
for _, event := range events {
|
||||
fmt.Fprintf(c.Response().Writer, "event: %s\ndata: %s\n\n", event.EventType, string(event.Data))
|
||||
@@ -3126,6 +3156,17 @@ func CancelResponseEndpoint() func(c echo.Context) error {
|
||||
}
|
||||
|
||||
store := GetGlobalStore()
|
||||
|
||||
// Look up first so ownership can be checked before any mutation.
|
||||
stored, err := store.Get(responseID)
|
||||
if err != nil {
|
||||
return sendOpenResponsesError(c, 404, "not_found", fmt.Sprintf("response not found: %s", responseID), "id")
|
||||
}
|
||||
// Return 404 (not 403) on owner mismatch so existence is not leaked.
|
||||
if !accessAllowed(stored, ownerFromContext(c)) {
|
||||
return sendOpenResponsesError(c, 404, "not_found", fmt.Sprintf("response not found: %s", responseID), "id")
|
||||
}
|
||||
|
||||
response, err := store.Cancel(responseID)
|
||||
if err != nil {
|
||||
return sendOpenResponsesError(c, 404, "not_found", fmt.Sprintf("response not found: %s", responseID), "id")
|
||||
|
||||
@@ -3,6 +3,7 @@ package openresponses
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -11,6 +12,30 @@ import (
|
||||
"github.com/mudler/xlog"
|
||||
)
|
||||
|
||||
const (
|
||||
// defaultMaxStreamEvents bounds how many resume-buffer events a single
|
||||
// background response retains. Without a cap, a long-running or abandoned
|
||||
// background generation grows StreamEvents without limit and can exhaust
|
||||
// process memory. When the cap is exceeded the oldest events are evicted
|
||||
// from the front (see AppendEvent). Mirrors llama.cpp's byte-capped slot
|
||||
// ring used for resumable /slots state.
|
||||
defaultMaxStreamEvents = 8192
|
||||
|
||||
// defaultMaxStreamBytes caps the total serialized size of retained
|
||||
// resume-buffer events, evicting oldest-first when exceeded. This guards
|
||||
// against a handful of very large events defeating the count cap. 0
|
||||
// disables the byte cap (count cap still applies).
|
||||
defaultMaxStreamBytes = 64 << 20 // 64 MiB
|
||||
)
|
||||
|
||||
// ErrOffsetLost is returned by GetEventsAfter when the requested
|
||||
// starting_after sequence number is older than the oldest event still
|
||||
// retained in the resume buffer (i.e. the events between the requested
|
||||
// offset and the current watermark were evicted by the cap). Callers should
|
||||
// surface this to clients as a distinct error instead of silently returning
|
||||
// a truncated stream that omits the dropped events.
|
||||
var ErrOffsetLost = errors.New("resume offset lost: requested events were evicted from the buffer")
|
||||
|
||||
// ResponseStore provides thread-safe storage for Open Responses API responses
|
||||
type ResponseStore struct {
|
||||
mu sync.RWMutex
|
||||
@@ -18,6 +43,12 @@ type ResponseStore struct {
|
||||
ttl time.Duration // Time-to-live for stored responses (0 = no expiration)
|
||||
cleanupCtx context.Context
|
||||
cleanupCancel context.CancelFunc
|
||||
|
||||
// maxStreamEvents / maxStreamBytes bound the per-response resume buffer.
|
||||
// Set once at construction from the default constants; tests may lower
|
||||
// them. A value <= 0 disables that particular cap.
|
||||
maxStreamEvents int
|
||||
maxStreamBytes int
|
||||
}
|
||||
|
||||
// StreamedEvent represents a buffered SSE event for streaming resume
|
||||
@@ -35,6 +66,12 @@ type StoredResponse struct {
|
||||
StoredAt time.Time
|
||||
ExpiresAt *time.Time // nil if no expiration
|
||||
|
||||
// Owner is the identity (user ID) that created this response. It is set
|
||||
// once at creation and never mutated, so it can be read without holding
|
||||
// mu. Empty means "no owner" (single-key / no-auth deployments), in which
|
||||
// case ownership checks are skipped for backward compatibility.
|
||||
Owner string
|
||||
|
||||
// Background execution support
|
||||
CancelFunc context.CancelFunc // For cancellation of background tasks
|
||||
StreamEvents []StreamedEvent // Buffered events for streaming resume
|
||||
@@ -42,6 +79,14 @@ type StoredResponse struct {
|
||||
IsBackground bool // Was created with background=true
|
||||
EventsChan chan struct{} // Signals new events for live subscribers
|
||||
mu sync.RWMutex // Protect concurrent access to this response
|
||||
|
||||
// streamBytes tracks the total serialized size of the events currently
|
||||
// retained in StreamEvents, used to enforce the byte cap. droppedThrough
|
||||
// is the highest sequence number evicted from the front of the buffer
|
||||
// (-1 = nothing evicted); it is the watermark GetEventsAfter compares
|
||||
// against to detect a lost resume offset. Both are guarded by mu.
|
||||
streamBytes int
|
||||
droppedThrough int
|
||||
}
|
||||
|
||||
var getGlobalStore = sync.OnceValue(func() *ResponseStore {
|
||||
@@ -81,8 +126,10 @@ func (s *ResponseStore) SetTTL(ttl time.Duration) {
|
||||
// If ttl is 0, responses are stored indefinitely
|
||||
func NewResponseStore(ttl time.Duration) *ResponseStore {
|
||||
store := &ResponseStore{
|
||||
responses: make(map[string]*StoredResponse),
|
||||
ttl: ttl,
|
||||
responses: make(map[string]*StoredResponse),
|
||||
ttl: ttl,
|
||||
maxStreamEvents: defaultMaxStreamEvents,
|
||||
maxStreamBytes: defaultMaxStreamBytes,
|
||||
}
|
||||
|
||||
// Start cleanup goroutine if TTL is set
|
||||
@@ -109,11 +156,12 @@ func (s *ResponseStore) Store(responseID string, request *schema.OpenResponsesRe
|
||||
}
|
||||
|
||||
stored := &StoredResponse{
|
||||
Request: request,
|
||||
Response: response,
|
||||
Items: items,
|
||||
StoredAt: time.Now(),
|
||||
ExpiresAt: nil,
|
||||
Request: request,
|
||||
Response: response,
|
||||
Items: items,
|
||||
StoredAt: time.Now(),
|
||||
ExpiresAt: nil,
|
||||
droppedThrough: -1,
|
||||
}
|
||||
|
||||
// Set expiration if TTL is configured
|
||||
@@ -256,16 +304,17 @@ func (s *ResponseStore) StoreBackground(responseID string, request *schema.OpenR
|
||||
}
|
||||
|
||||
stored := &StoredResponse{
|
||||
Request: request,
|
||||
Response: response,
|
||||
Items: items,
|
||||
StoredAt: time.Now(),
|
||||
ExpiresAt: nil,
|
||||
CancelFunc: cancelFunc,
|
||||
StreamEvents: []StreamedEvent{},
|
||||
StreamEnabled: streamEnabled,
|
||||
IsBackground: true,
|
||||
EventsChan: make(chan struct{}, 100), // Buffered channel for event notifications
|
||||
Request: request,
|
||||
Response: response,
|
||||
Items: items,
|
||||
StoredAt: time.Now(),
|
||||
ExpiresAt: nil,
|
||||
CancelFunc: cancelFunc,
|
||||
StreamEvents: []StreamedEvent{},
|
||||
StreamEnabled: streamEnabled,
|
||||
IsBackground: true,
|
||||
EventsChan: make(chan struct{}, 100), // Buffered channel for event notifications
|
||||
droppedThrough: -1,
|
||||
}
|
||||
|
||||
// Set expiration if TTL is configured
|
||||
@@ -349,6 +398,25 @@ func (s *ResponseStore) AppendEvent(responseID string, event *schema.ORStreamEve
|
||||
EventType: event.Type,
|
||||
Data: data,
|
||||
})
|
||||
stored.streamBytes += len(data)
|
||||
|
||||
// Evict oldest events from the front once either cap is exceeded. The
|
||||
// byte cap never evicts the only remaining event (a single oversized
|
||||
// event is still served once). Each eviction advances droppedThrough so
|
||||
// a later resume below the watermark is reported as ErrOffsetLost rather
|
||||
// than silently skipping the dropped events.
|
||||
for (s.maxStreamEvents > 0 && len(stored.StreamEvents) > s.maxStreamEvents) ||
|
||||
(s.maxStreamBytes > 0 && stored.streamBytes > s.maxStreamBytes && len(stored.StreamEvents) > 1) {
|
||||
evicted := stored.StreamEvents[0]
|
||||
stored.streamBytes -= len(evicted.Data)
|
||||
if evicted.SequenceNumber > stored.droppedThrough {
|
||||
stored.droppedThrough = evicted.SequenceNumber
|
||||
}
|
||||
// Release the evicted payload so it can be GC'd even though the
|
||||
// backing array element is still owned by the slice until reuse.
|
||||
stored.StreamEvents[0].Data = nil
|
||||
stored.StreamEvents = stored.StreamEvents[1:]
|
||||
}
|
||||
stored.mu.Unlock()
|
||||
|
||||
// Notify any subscribers of new event
|
||||
@@ -374,6 +442,14 @@ func (s *ResponseStore) GetEventsAfter(responseID string, startingAfter int) ([]
|
||||
stored.mu.RLock()
|
||||
defer stored.mu.RUnlock()
|
||||
|
||||
// If the requested offset is older than the watermark, the events the
|
||||
// client expects next (those in (startingAfter, droppedThrough]) were
|
||||
// evicted by the cap. Signal the gap rather than returning a stream that
|
||||
// silently skips them.
|
||||
if startingAfter < stored.droppedThrough {
|
||||
return nil, ErrOffsetLost
|
||||
}
|
||||
|
||||
var result []StreamedEvent
|
||||
for _, event := range stored.StreamEvents {
|
||||
if event.SequenceNumber > startingAfter {
|
||||
@@ -447,3 +523,30 @@ func (s *ResponseStore) IsStreamEnabled(responseID string) (bool, error) {
|
||||
|
||||
return stored.StreamEnabled, nil
|
||||
}
|
||||
|
||||
// SetOwner records the identity that owns a stored response. It is called
|
||||
// once, right after the response is stored and before its ID is handed back
|
||||
// to any client, so no lock on the stored response is required. A no-op for
|
||||
// an empty owner or unknown response ID.
|
||||
func (s *ResponseStore) SetOwner(responseID, owner string) {
|
||||
if owner == "" {
|
||||
return
|
||||
}
|
||||
|
||||
s.mu.RLock()
|
||||
stored, exists := s.responses[responseID]
|
||||
s.mu.RUnlock()
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
|
||||
stored.Owner = owner
|
||||
}
|
||||
|
||||
// accessAllowed reports whether a caller identified by callerID may read or
|
||||
// mutate the given stored response. An empty owner (single-key / no-auth
|
||||
// deployments) is accessible by anyone, preserving backward compatibility;
|
||||
// otherwise the caller identity must match the recorded owner.
|
||||
func accessAllowed(stored *StoredResponse, callerID string) bool {
|
||||
return stored.Owner == "" || stored.Owner == callerID
|
||||
}
|
||||
|
||||
@@ -585,6 +585,86 @@ var _ = Describe("ResponseStore", func() {
|
||||
Expect(enabled2).To(BeFalse())
|
||||
})
|
||||
|
||||
It("should bound the resume buffer and evict oldest events past the cap", func() {
|
||||
// Lower the caps so the test stays fast; production defaults are
|
||||
// large. Same-package access to the unexported fields is fine.
|
||||
store.maxStreamEvents = 5
|
||||
store.maxStreamBytes = 0 // count cap only for this test
|
||||
|
||||
responseID := "resp_buffer_cap"
|
||||
request := &schema.OpenResponsesRequest{Model: "test"}
|
||||
response := &schema.ORResponseResource{
|
||||
ID: responseID,
|
||||
Object: "response",
|
||||
Status: schema.ORStatusInProgress,
|
||||
}
|
||||
|
||||
_, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
store.StoreBackground(responseID, request, response, cancel, true)
|
||||
|
||||
// Append well past the cap.
|
||||
const total = 20
|
||||
for i := range total {
|
||||
err := store.AppendEvent(responseID, &schema.ORStreamEvent{
|
||||
Type: "response.output_text.delta",
|
||||
SequenceNumber: i,
|
||||
})
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
}
|
||||
|
||||
stored, err := store.Get(responseID)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
|
||||
// (a) Buffer length stays bounded by the cap.
|
||||
Expect(len(stored.StreamEvents)).To(Equal(5))
|
||||
|
||||
// (b) Oldest events were evicted: only the last 5 sequence numbers
|
||||
// remain (15..19).
|
||||
Expect(stored.StreamEvents[0].SequenceNumber).To(Equal(15))
|
||||
Expect(stored.StreamEvents[len(stored.StreamEvents)-1].SequenceNumber).To(Equal(19))
|
||||
|
||||
// Asking for events after the last retained seq still works.
|
||||
retained, err := store.GetEventsAfter(responseID, 14)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(retained).To(HaveLen(5))
|
||||
|
||||
// (c) Asking below the dropped watermark returns ErrOffsetLost.
|
||||
_, err = store.GetEventsAfter(responseID, 0)
|
||||
Expect(err).To(MatchError(ErrOffsetLost))
|
||||
|
||||
_, err = store.GetEventsAfter(responseID, -1)
|
||||
Expect(err).To(MatchError(ErrOffsetLost))
|
||||
})
|
||||
|
||||
It("should record and enforce response ownership", func() {
|
||||
responseID := "resp_owner_test"
|
||||
request := &schema.OpenResponsesRequest{Model: "test"}
|
||||
response := &schema.ORResponseResource{ID: responseID, Object: "response", Status: schema.ORStatusCompleted}
|
||||
|
||||
store.Store(responseID, request, response)
|
||||
store.SetOwner(responseID, "userA")
|
||||
|
||||
stored, err := store.Get(responseID)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(stored.Owner).To(Equal("userA"))
|
||||
|
||||
// Owner matches -> allowed; different identity -> denied.
|
||||
Expect(accessAllowed(stored, "userA")).To(BeTrue())
|
||||
Expect(accessAllowed(stored, "userB")).To(BeFalse())
|
||||
|
||||
// Backward compatibility: a response with no owner is accessible
|
||||
// by any caller (single-key / no-auth deployments).
|
||||
noOwnerID := "resp_no_owner"
|
||||
store.Store(noOwnerID, request, &schema.ORResponseResource{ID: noOwnerID, Object: "response"})
|
||||
noOwner, err := store.Get(noOwnerID)
|
||||
Expect(err).ToNot(HaveOccurred())
|
||||
Expect(noOwner.Owner).To(BeEmpty())
|
||||
Expect(accessAllowed(noOwner, "anyone")).To(BeTrue())
|
||||
Expect(accessAllowed(noOwner, "")).To(BeTrue())
|
||||
})
|
||||
|
||||
It("should notify subscribers of new events", func() {
|
||||
responseID := "resp_events_chan"
|
||||
request := &schema.OpenResponsesRequest{Model: "test"}
|
||||
|
||||
Reference in New Issue
Block a user