diff --git a/core/http/endpoints/openresponses/responses.go b/core/http/endpoints/openresponses/responses.go index 916380d01..f8d741508 100644 --- a/core/http/endpoints/openresponses/responses.go +++ b/core/http/endpoints/openresponses/responses.go @@ -3,6 +3,7 @@ package openresponses import ( "context" "encoding/json" + "errors" "fmt" "time" @@ -10,6 +11,7 @@ import ( "github.com/labstack/echo/v4" "github.com/mudler/LocalAI/core/backend" "github.com/mudler/LocalAI/core/config" + "github.com/mudler/LocalAI/core/http/auth" mcpTools "github.com/mudler/LocalAI/core/http/endpoints/mcp" openaiEndpoint "github.com/mudler/LocalAI/core/http/endpoints/openai" "github.com/mudler/LocalAI/core/http/middleware" @@ -246,8 +248,11 @@ func ResponsesEndpoint(cl *config.ModelConfigLoader, ml *model.ModelLoader, eval // Create cancellable context for background execution bgCtx, bgCancel := context.WithCancel(context.Background()) - // Store the background response + // Store the background response and stamp its owner before the ID + // is returned to the client, so later GET/cancel/resume can verify + // the caller owns it. store.StoreBackground(responseID, input, queuedResponse, bgCancel, input.Stream) + store.SetOwner(responseID, ownerFromContext(c)) // Start background processing goroutine go func() { @@ -1587,6 +1592,7 @@ func handleOpenResponsesNonStream(c echo.Context, responseID string, createdAt i if shouldStore { store := GetGlobalStore() store.Store(responseID, input, response) + store.SetOwner(responseID, ownerFromContext(c)) } return c.JSON(200, response) @@ -2322,6 +2328,7 @@ func handleOpenResponsesStream(c echo.Context, responseID string, createdAt int6 if shouldStore { store := GetGlobalStore() store.Store(responseID, input, responseCompleted) + store.SetOwner(responseID, ownerFromContext(c)) } // Send [DONE] @@ -2966,6 +2973,18 @@ func convertORToolsToOpenAIFormat(orTools []schema.ORFunctionTool) []functions.T return result } +// ownerFromContext returns the identity (user ID) of the authenticated +// caller, or empty string when no authentication was performed (single-key / +// no-auth deployments). It is the value stamped on a response at creation and +// compared on read/cancel/resume to prevent one caller from accessing +// another's response by guessing its ID. +func ownerFromContext(c echo.Context) string { + if u := auth.GetUser(c); u != nil { + return u.ID + } + return "" +} + // GetResponseEndpoint returns a handler for GET /responses/:id // This endpoint is used for polling background responses or resuming streaming // @Summary Get a response by ID @@ -2991,6 +3010,12 @@ func GetResponseEndpoint() func(c echo.Context) error { return sendOpenResponsesError(c, 404, "not_found", fmt.Sprintf("response not found: %s", responseID), "id") } + // Enforce response ownership. Return 404 (not 403) on mismatch so the + // existence of another caller's response is not leaked. + if !accessAllowed(stored, ownerFromContext(c)) { + return sendOpenResponsesError(c, 404, "not_found", fmt.Sprintf("response not found: %s", responseID), "id") + } + // Check if streaming resume is requested streamParam := c.QueryParam("stream") if streamParam == "true" { @@ -3022,16 +3047,21 @@ func GetResponseEndpoint() func(c echo.Context) error { // handleStreamResume handles resuming a streaming response from a specific sequence number func handleStreamResume(c echo.Context, store *ResponseStore, responseID string, stored *StoredResponse, startingAfter int) error { + // Fetch buffered events before committing to an SSE response so an + // offset-lost gap can be reported as a clean HTTP status rather than a + // silently truncated event stream. + events, err := store.GetEventsAfter(responseID, startingAfter) + if err != nil { + if errors.Is(err, ErrOffsetLost) { + return sendOpenResponsesError(c, 409, "invalid_request_error", fmt.Sprintf("starting_after=%d is older than the oldest retained event; the resume buffer evicted those events and the stream cannot be resumed from that point", startingAfter), "starting_after") + } + return sendOpenResponsesError(c, 500, "server_error", fmt.Sprintf("failed to get events: %v", err), "") + } + c.Response().Header().Set("Content-Type", "text/event-stream") c.Response().Header().Set("Cache-Control", "no-cache") c.Response().Header().Set("Connection", "keep-alive") - // Get buffered events after the starting point - events, err := store.GetEventsAfter(responseID, startingAfter) - if err != nil { - return sendOpenResponsesError(c, 500, "server_error", fmt.Sprintf("failed to get events: %v", err), "") - } - // Send all buffered events for _, event := range events { fmt.Fprintf(c.Response().Writer, "event: %s\ndata: %s\n\n", event.EventType, string(event.Data)) @@ -3126,6 +3156,17 @@ func CancelResponseEndpoint() func(c echo.Context) error { } store := GetGlobalStore() + + // Look up first so ownership can be checked before any mutation. + stored, err := store.Get(responseID) + if err != nil { + return sendOpenResponsesError(c, 404, "not_found", fmt.Sprintf("response not found: %s", responseID), "id") + } + // Return 404 (not 403) on owner mismatch so existence is not leaked. + if !accessAllowed(stored, ownerFromContext(c)) { + return sendOpenResponsesError(c, 404, "not_found", fmt.Sprintf("response not found: %s", responseID), "id") + } + response, err := store.Cancel(responseID) if err != nil { return sendOpenResponsesError(c, 404, "not_found", fmt.Sprintf("response not found: %s", responseID), "id") diff --git a/core/http/endpoints/openresponses/store.go b/core/http/endpoints/openresponses/store.go index bea5b7413..ab52261e5 100644 --- a/core/http/endpoints/openresponses/store.go +++ b/core/http/endpoints/openresponses/store.go @@ -3,6 +3,7 @@ package openresponses import ( "context" "encoding/json" + "errors" "fmt" "sync" "time" @@ -11,6 +12,30 @@ import ( "github.com/mudler/xlog" ) +const ( + // defaultMaxStreamEvents bounds how many resume-buffer events a single + // background response retains. Without a cap, a long-running or abandoned + // background generation grows StreamEvents without limit and can exhaust + // process memory. When the cap is exceeded the oldest events are evicted + // from the front (see AppendEvent). Mirrors llama.cpp's byte-capped slot + // ring used for resumable /slots state. + defaultMaxStreamEvents = 8192 + + // defaultMaxStreamBytes caps the total serialized size of retained + // resume-buffer events, evicting oldest-first when exceeded. This guards + // against a handful of very large events defeating the count cap. 0 + // disables the byte cap (count cap still applies). + defaultMaxStreamBytes = 64 << 20 // 64 MiB +) + +// ErrOffsetLost is returned by GetEventsAfter when the requested +// starting_after sequence number is older than the oldest event still +// retained in the resume buffer (i.e. the events between the requested +// offset and the current watermark were evicted by the cap). Callers should +// surface this to clients as a distinct error instead of silently returning +// a truncated stream that omits the dropped events. +var ErrOffsetLost = errors.New("resume offset lost: requested events were evicted from the buffer") + // ResponseStore provides thread-safe storage for Open Responses API responses type ResponseStore struct { mu sync.RWMutex @@ -18,6 +43,12 @@ type ResponseStore struct { ttl time.Duration // Time-to-live for stored responses (0 = no expiration) cleanupCtx context.Context cleanupCancel context.CancelFunc + + // maxStreamEvents / maxStreamBytes bound the per-response resume buffer. + // Set once at construction from the default constants; tests may lower + // them. A value <= 0 disables that particular cap. + maxStreamEvents int + maxStreamBytes int } // StreamedEvent represents a buffered SSE event for streaming resume @@ -35,6 +66,12 @@ type StoredResponse struct { StoredAt time.Time ExpiresAt *time.Time // nil if no expiration + // Owner is the identity (user ID) that created this response. It is set + // once at creation and never mutated, so it can be read without holding + // mu. Empty means "no owner" (single-key / no-auth deployments), in which + // case ownership checks are skipped for backward compatibility. + Owner string + // Background execution support CancelFunc context.CancelFunc // For cancellation of background tasks StreamEvents []StreamedEvent // Buffered events for streaming resume @@ -42,6 +79,14 @@ type StoredResponse struct { IsBackground bool // Was created with background=true EventsChan chan struct{} // Signals new events for live subscribers mu sync.RWMutex // Protect concurrent access to this response + + // streamBytes tracks the total serialized size of the events currently + // retained in StreamEvents, used to enforce the byte cap. droppedThrough + // is the highest sequence number evicted from the front of the buffer + // (-1 = nothing evicted); it is the watermark GetEventsAfter compares + // against to detect a lost resume offset. Both are guarded by mu. + streamBytes int + droppedThrough int } var getGlobalStore = sync.OnceValue(func() *ResponseStore { @@ -81,8 +126,10 @@ func (s *ResponseStore) SetTTL(ttl time.Duration) { // If ttl is 0, responses are stored indefinitely func NewResponseStore(ttl time.Duration) *ResponseStore { store := &ResponseStore{ - responses: make(map[string]*StoredResponse), - ttl: ttl, + responses: make(map[string]*StoredResponse), + ttl: ttl, + maxStreamEvents: defaultMaxStreamEvents, + maxStreamBytes: defaultMaxStreamBytes, } // Start cleanup goroutine if TTL is set @@ -109,11 +156,12 @@ func (s *ResponseStore) Store(responseID string, request *schema.OpenResponsesRe } stored := &StoredResponse{ - Request: request, - Response: response, - Items: items, - StoredAt: time.Now(), - ExpiresAt: nil, + Request: request, + Response: response, + Items: items, + StoredAt: time.Now(), + ExpiresAt: nil, + droppedThrough: -1, } // Set expiration if TTL is configured @@ -256,16 +304,17 @@ func (s *ResponseStore) StoreBackground(responseID string, request *schema.OpenR } stored := &StoredResponse{ - Request: request, - Response: response, - Items: items, - StoredAt: time.Now(), - ExpiresAt: nil, - CancelFunc: cancelFunc, - StreamEvents: []StreamedEvent{}, - StreamEnabled: streamEnabled, - IsBackground: true, - EventsChan: make(chan struct{}, 100), // Buffered channel for event notifications + Request: request, + Response: response, + Items: items, + StoredAt: time.Now(), + ExpiresAt: nil, + CancelFunc: cancelFunc, + StreamEvents: []StreamedEvent{}, + StreamEnabled: streamEnabled, + IsBackground: true, + EventsChan: make(chan struct{}, 100), // Buffered channel for event notifications + droppedThrough: -1, } // Set expiration if TTL is configured @@ -349,6 +398,25 @@ func (s *ResponseStore) AppendEvent(responseID string, event *schema.ORStreamEve EventType: event.Type, Data: data, }) + stored.streamBytes += len(data) + + // Evict oldest events from the front once either cap is exceeded. The + // byte cap never evicts the only remaining event (a single oversized + // event is still served once). Each eviction advances droppedThrough so + // a later resume below the watermark is reported as ErrOffsetLost rather + // than silently skipping the dropped events. + for (s.maxStreamEvents > 0 && len(stored.StreamEvents) > s.maxStreamEvents) || + (s.maxStreamBytes > 0 && stored.streamBytes > s.maxStreamBytes && len(stored.StreamEvents) > 1) { + evicted := stored.StreamEvents[0] + stored.streamBytes -= len(evicted.Data) + if evicted.SequenceNumber > stored.droppedThrough { + stored.droppedThrough = evicted.SequenceNumber + } + // Release the evicted payload so it can be GC'd even though the + // backing array element is still owned by the slice until reuse. + stored.StreamEvents[0].Data = nil + stored.StreamEvents = stored.StreamEvents[1:] + } stored.mu.Unlock() // Notify any subscribers of new event @@ -374,6 +442,14 @@ func (s *ResponseStore) GetEventsAfter(responseID string, startingAfter int) ([] stored.mu.RLock() defer stored.mu.RUnlock() + // If the requested offset is older than the watermark, the events the + // client expects next (those in (startingAfter, droppedThrough]) were + // evicted by the cap. Signal the gap rather than returning a stream that + // silently skips them. + if startingAfter < stored.droppedThrough { + return nil, ErrOffsetLost + } + var result []StreamedEvent for _, event := range stored.StreamEvents { if event.SequenceNumber > startingAfter { @@ -447,3 +523,30 @@ func (s *ResponseStore) IsStreamEnabled(responseID string) (bool, error) { return stored.StreamEnabled, nil } + +// SetOwner records the identity that owns a stored response. It is called +// once, right after the response is stored and before its ID is handed back +// to any client, so no lock on the stored response is required. A no-op for +// an empty owner or unknown response ID. +func (s *ResponseStore) SetOwner(responseID, owner string) { + if owner == "" { + return + } + + s.mu.RLock() + stored, exists := s.responses[responseID] + s.mu.RUnlock() + if !exists { + return + } + + stored.Owner = owner +} + +// accessAllowed reports whether a caller identified by callerID may read or +// mutate the given stored response. An empty owner (single-key / no-auth +// deployments) is accessible by anyone, preserving backward compatibility; +// otherwise the caller identity must match the recorded owner. +func accessAllowed(stored *StoredResponse, callerID string) bool { + return stored.Owner == "" || stored.Owner == callerID +} diff --git a/core/http/endpoints/openresponses/store_test.go b/core/http/endpoints/openresponses/store_test.go index 360e32df4..d59db2d38 100644 --- a/core/http/endpoints/openresponses/store_test.go +++ b/core/http/endpoints/openresponses/store_test.go @@ -585,6 +585,86 @@ var _ = Describe("ResponseStore", func() { Expect(enabled2).To(BeFalse()) }) + It("should bound the resume buffer and evict oldest events past the cap", func() { + // Lower the caps so the test stays fast; production defaults are + // large. Same-package access to the unexported fields is fine. + store.maxStreamEvents = 5 + store.maxStreamBytes = 0 // count cap only for this test + + responseID := "resp_buffer_cap" + request := &schema.OpenResponsesRequest{Model: "test"} + response := &schema.ORResponseResource{ + ID: responseID, + Object: "response", + Status: schema.ORStatusInProgress, + } + + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + store.StoreBackground(responseID, request, response, cancel, true) + + // Append well past the cap. + const total = 20 + for i := range total { + err := store.AppendEvent(responseID, &schema.ORStreamEvent{ + Type: "response.output_text.delta", + SequenceNumber: i, + }) + Expect(err).ToNot(HaveOccurred()) + } + + stored, err := store.Get(responseID) + Expect(err).ToNot(HaveOccurred()) + + // (a) Buffer length stays bounded by the cap. + Expect(len(stored.StreamEvents)).To(Equal(5)) + + // (b) Oldest events were evicted: only the last 5 sequence numbers + // remain (15..19). + Expect(stored.StreamEvents[0].SequenceNumber).To(Equal(15)) + Expect(stored.StreamEvents[len(stored.StreamEvents)-1].SequenceNumber).To(Equal(19)) + + // Asking for events after the last retained seq still works. + retained, err := store.GetEventsAfter(responseID, 14) + Expect(err).ToNot(HaveOccurred()) + Expect(retained).To(HaveLen(5)) + + // (c) Asking below the dropped watermark returns ErrOffsetLost. + _, err = store.GetEventsAfter(responseID, 0) + Expect(err).To(MatchError(ErrOffsetLost)) + + _, err = store.GetEventsAfter(responseID, -1) + Expect(err).To(MatchError(ErrOffsetLost)) + }) + + It("should record and enforce response ownership", func() { + responseID := "resp_owner_test" + request := &schema.OpenResponsesRequest{Model: "test"} + response := &schema.ORResponseResource{ID: responseID, Object: "response", Status: schema.ORStatusCompleted} + + store.Store(responseID, request, response) + store.SetOwner(responseID, "userA") + + stored, err := store.Get(responseID) + Expect(err).ToNot(HaveOccurred()) + Expect(stored.Owner).To(Equal("userA")) + + // Owner matches -> allowed; different identity -> denied. + Expect(accessAllowed(stored, "userA")).To(BeTrue()) + Expect(accessAllowed(stored, "userB")).To(BeFalse()) + + // Backward compatibility: a response with no owner is accessible + // by any caller (single-key / no-auth deployments). + noOwnerID := "resp_no_owner" + store.Store(noOwnerID, request, &schema.ORResponseResource{ID: noOwnerID, Object: "response"}) + noOwner, err := store.Get(noOwnerID) + Expect(err).ToNot(HaveOccurred()) + Expect(noOwner.Owner).To(BeEmpty()) + Expect(accessAllowed(noOwner, "anyone")).To(BeTrue()) + Expect(accessAllowed(noOwner, "")).To(BeTrue()) + }) + It("should notify subscribers of new events", func() { responseID := "resp_events_chan" request := &schema.OpenResponsesRequest{Model: "test"}