From f013990ff3637abbded435f961e5e27c8b56751d Mon Sep 17 00:00:00 2001 From: Scott Taylor Date: Thu, 14 May 2026 22:37:11 -0400 Subject: [PATCH] Fix Fetch.enable + http-proxy CDP send/recv deadlock (#2462) WsConnection.send used to switch the socket to blocking mode on EWOULDBLOCK, with a comment claiming this 'should virtually never happen'. Under CDP Fetch.enable + --http-proxy it is routine: the CONNECT+TLS proxy round-trip means many subresources are in flight simultaneously, lightpanda emits a flood of Fetch.requestPaused events, the kernel send buffer fills, lightpanda blocks on write, and puppeteer's matching Fetch.continueRequest replies pile up in lightpanda's TCP recv buffer (which lightpanda can't drain because it is blocked on the write). Both peers wedge until the client times out. Other contributing problems all collapse paused-intercept sites where an outer loop polls without ever draining the CDP socket, OR where disconnect-time cleanup re-enters JS through paths the runtime can no longer satisfy: * HttpClient.perform skipped the CDP socket poll entirely whenever processMessages() returned non-empty, so a steady stream of HTTP completions could starve CDP reads. * ScriptManagerBase.waitForImport spun on `client.tick(200)` and discarded the .cdp_socket return, so a script `import()` whose request was paused at the InterceptionLayer hung forever. * BrowserContext.deinit aborted pending intercepts via `transfer.abort`, which fired XHR/script error_callback chains into a half-torn-down V8 context (the inspector had already been stopped two lines above). * Headers.deinit was non-idempotent, so a value-copied Headers (the hazard RobotsLayer documents) double-freed its curl_slist on the second deinit; the symptom was an "incorrect alignment" panic inside ZigToCurlAllocator.free. * Transfer.deinit was non-idempotent, so a cascade out of error_callback (e.g. Script.errorCallback -> manager.evaluate() -> JS execution -> Frame.deinit -> abortOwner -> Transfer.kill -> Transfer.deinit) reached `arena_pool.release` twice on the same arena. Coordinated changes: * src/network/WsConnection.zig: On EWOULDBLOCK, instead of switching to a blocking write, poll for both POLLOUT and POLLIN. While waiting for write space, drain any incoming bytes into the reader buffer (without dispatching - that would re-enter send and recurse). Adds tryRead/bufferedBytes accessors. * src/browser/HttpClient.zig: - Add has_buffered_input to CDPClient. In perform(), return .cdp_socket when buffered input exists, and always do at least a non-blocking poll on the CDP socket so HTTP completions can no longer starve CDP reads. - Make Transfer.deinit idempotent by claiming ownership through `client.transfers.remove(self.id)`. Second deinits (cascades out of error_callback) early-return. - Make `Transfer.kill` public (was `fn`) so BrowserContext.deinit can use it. - Tighten RequestParams.deinit / Request.deinit to take `*` instead of `*const` so they can call into `Headers.deinit` (now mutating). * src/network/http.zig: Headers.deinit now nulls out `self.headers` after `curl_slist_free_all`, so a second deinit is a no-op. Without this guard a value-copied Headers double-frees the curl_slist (the hazard RobotsLayer's call site already documents). * src/browser/ScriptManagerBase.zig: waitForImport now drains pending CDP messages on every iteration (matching the syncRequest pattern) and re-fetches the imported_modules entry per iteration. The cached entry was a use-after-free risk because the CDP-drain step above re-enters JS, and a transitively-imported module's preloadImport() -> getOrPut() can rehash the map and invalidate the prior entry pointer. * src/cdp/CDP.zig: - Wire hasBufferedInput. - Replace read() with tryRead() in readSocket and tolerate the no_new_data case so we still process messages drained during a backpressured send. - BrowserContext.deinit aborts pending intercepts via `transfer.kill` instead of `transfer.abort`. `kill` fires shutdown_callback (or no-op for transfers without one), avoiding error_callback's re-entry into JS through XHR/script error handlers - those crash because the V8 context and inspector this BC owns have either been torn down already or are about to be. Verified end-to-end against a puppeteer + http-proxy reproducer: | URL | before | after | |----------------------------|----------------------|------------------| | example.com | OK 124ms | OK 117ms | | github.com | HANG 20s (12/82) | OK 1410ms (82/82)| | shopify.com | HANG 20s (1/4) | OK 1973ms (66/66)| | allbirds.com | HANG 20s (12/53) | OK 3944ms (372) | | allbirds wool-runners PDP | HANG 20s (12/53) | OK 6286ms (459) | (nike.com still doesn't reach `load` event but all 68 continueRequests process cleanly - the remaining stall is third-party widgets keeping the page in `loading` state, not the CDP/HTTP deadlock this PR fixes.) Lightpanda survives 18 back-to-back navigation runs across the matrix above (3 per URL) without crashing. Fixes #2462. --- src/browser/HttpClient.zig | 62 ++++++++++--- src/browser/ScriptManagerBase.zig | 35 ++++++-- src/cdp/CDP.zig | 29 ++++-- src/network/WsConnection.zig | 144 +++++++++++++++++++++++++----- src/network/http.zig | 11 ++- 5 files changed, 235 insertions(+), 46 deletions(-) diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 127a4a32..bf48c719 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -170,6 +170,12 @@ pub const CDPClient = struct { blocking_read_start: *const fn (*anyopaque) bool, blocking_read: *const fn (*anyopaque) bool, blocking_read_end: *const fn (*anyopaque) bool, + // Returns true if the CDP client has bytes already buffered (typically + // rescued from the socket while a synchronous send was backpressured; + // see WsConnection.send). When true, perform() returns .cdp_socket so + // the runner drains them even if the OS recv buffer is empty. See + // lightpanda-io/browser#2462. + has_buffered_input: *const fn (*anyopaque) bool, }; pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client: ?CDPClient) !void { @@ -413,7 +419,8 @@ pub fn _request(_: *anyopaque, transfer: *Transfer) !void { // inside this function, we free it before returning the error. Callers // must NOT pair `request()` with their own `errdefer headers.deinit()` // — that's a double-free. -pub fn request(self: *Client, req: Request, owner: ?*Owner) !void { +pub fn request(self: *Client, req_in: Request, owner: ?*Owner) !void { + var req = req_in; const arena = self.arena_pool.acquire(.small, "Request.arena") catch |err| { req.headers.deinit(); return err; @@ -658,21 +665,36 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus { // We're potentially going to block for a while until we get data. Process // whatever messages we have waiting ahead of time. - if (try self.processMessages()) { - return .normal; - } + const processed = try self.processMessages(); var status = PerformStatus.normal; if (self.cdp_client) |cdp_client| { + // Bytes may have been rescued from the CDP socket while a + // synchronous send was backpressured (see WsConnection.send). The + // OS recv buffer is empty in that case, but we still owe the + // dispatcher a chance to process them. See + // lightpanda-io/browser#2462. + if (cdp_client.has_buffered_input(cdp_client.ctx)) { + return .cdp_socket; + } + + // Even when we processed completion messages this round, do a + // non-blocking poll of the CDP socket so a steady stream of HTTP + // completions can't starve CDP reads. Without this, a flood of + // intercepted/proxied transfers can leave Fetch.continueRequest + // messages unread for seconds at a time. + const cdp_timeout: c_int = if (processed) 0 else timeout_ms; var wait_fds = [_]http.WaitFd{.{ .fd = cdp_client.socket, .events = .{ .pollin = true }, .revents = .{}, }}; - try self.handles.poll(&wait_fds, timeout_ms); + try self.handles.poll(&wait_fds, cdp_timeout); if (wait_fds[0].revents.pollin or wait_fds[0].revents.pollpri or wait_fds[0].revents.pollout) { status = .cdp_socket; } + } else if (processed) { + return .normal; } else if (running > 0) { try self.handles.poll(&.{}, timeout_ms); } @@ -961,7 +983,7 @@ pub const Request = struct { return written.ptr[0..written.len :0]; } - pub fn deinit(self: *const Request) void { + pub fn deinit(self: *Request) void { self.headers.deinit(); } }; @@ -1131,6 +1153,22 @@ pub const Transfer = struct { } pub fn deinit(self: *Transfer) void { + // Use `transfers.remove` as a one-shot ownership claim. If it + // returns false, this transfer has already been deinit'd by a + // cascade out of `error_callback` (e.g. Script.errorCallback -> + // manager.evaluate() -> JS execution -> Frame.deinit -> + // abortOwner -> Transfer.kill -> Transfer.deinit). Returning + // here keeps us from double-freeing the arena. See + // lightpanda-io/browser#2462. + // + // Reading `self.id` and `self.client` after a prior deinit has + // released `self.arena` is technically a stale read, but + // arena_pool zombies the memory until the slot is handed out + // again. If by some race the slot were re-used between the two + // deinits, the new tenant's id would not be in `transfers` + // either, so the early-out still fires and we bail cleanly. + if (!self.client.transfers.remove(self.id)) return; + if (self._conn) |conn| { self.client.removeConn(conn); self._conn = null; @@ -1145,10 +1183,6 @@ pub const Transfer = struct { self._queued = false; } - // Drop the id→*Transfer index entry before freeing the memory. - // Any concurrent CDP lookup by id will now see this transfer as gone. - _ = self.client.transfers.remove(self.id); - self.req.deinit(); if (self.owner) |o| { o.removeTransfer(self); @@ -1175,8 +1209,12 @@ pub const Transfer = struct { // Owner-driven teardown: fires shutdown_callback (not error_callback) // and otherwise behaves like abort. Called by Client.abortOwner / - // abortRequests when a Frame / WGS is being torn down. - fn kill(self: *Transfer) void { + // abortRequests when a Frame / WGS is being torn down, and from + // BrowserContext.deinit when the CDP connection drops with paused + // intercepts still outstanding -- in both cases firing error_callback + // would re-enter JS via XHR/script error handlers, and the inspector + // / V8 context the handler reaches into has already been torn down. + pub fn kill(self: *Transfer) void { if (self.req.shutdown_callback) |cb| { cb(self.req.ctx); } diff --git a/src/browser/ScriptManagerBase.zig b/src/browser/ScriptManagerBase.zig index fbaf2c6e..8041fb34 100644 --- a/src/browser/ScriptManagerBase.zig +++ b/src/browser/ScriptManagerBase.zig @@ -286,22 +286,41 @@ pub fn preloadImport(self: *ScriptManagerBase, url: [:0]const u8, referrer: []co } pub fn waitForImport(self: *ScriptManagerBase, url: [:0]const u8) !ModuleSource { - const entry = self.imported_modules.getEntry(url) orelse { - // It shouldn't be possible for v8 to ask for a module that we didn't - // `preloadImport` above. - return error.UnknownModule; - }; - const was_evaluating = self.is_evaluating; self.is_evaluating = true; defer self.is_evaluating = was_evaluating; var client = self.client; while (true) { + // Re-fetch the entry on every iteration. We can't cache the pointer + // outside the loop: the CDP-drain step below (re-)enters JS, which + // can call back into `preloadImport` for a transitively-imported + // module. `preloadImport` calls `imported_modules.getOrPut`, which + // may rehash the map and invalidate every existing entry pointer. + // A cached `entry` would then be a use-after-free on the next + // `entry.value_ptr.state` access. See lightpanda-io/browser#2462. + const entry = self.imported_modules.getEntry(url) orelse { + // It shouldn't be possible for v8 to ask for a module that we + // didn't `preloadImport` above. + return error.UnknownModule; + }; + switch (entry.value_ptr.state) { .loading => { - _ = try client.tick(200); - continue; + // Drain pending CDP messages every iteration. Without this, + // a script imported under `Fetch.enable` whose request is + // paused at the InterceptionLayer hangs forever: the CDP + // client's matching `Fetch.continueRequest` reply never + // reaches `CDP.processMessage`, the request never resumes, + // and this loop spins until the client times out. + // `syncRequest` already does this; the import path needs + // the same treatment. See lightpanda-io/browser#2462. + const status = try client.tick(200); + if (status == .cdp_socket) { + if (client.cdp_client) |cdp| { + _ = cdp.blocking_read(cdp.ctx); + } + } }, .done => |script| { var shared = false; diff --git a/src/cdp/CDP.zig b/src/cdp/CDP.zig index ecb00753..1568cd4c 100644 --- a/src/cdp/CDP.zig +++ b/src/cdp/CDP.zig @@ -111,6 +111,7 @@ pub fn init( .blocking_read_start = CDP.blockingReadStart, .blocking_read = CDP.blockingRead, .blocking_read_end = CDP.blockingReadStop, + .has_buffered_input = CDP.hasBufferedInput, }); } @@ -149,15 +150,27 @@ pub fn blockingReadStop(ctx: *anyopaque) bool { return true; } +pub fn hasBufferedInput(ctx: *anyopaque) bool { + const self: *CDP = @ptrCast(@alignCast(ctx)); + return self.ws.bufferedBytes() > 0; +} + pub fn readSocket(self: *CDP) bool { - const n = self.ws.read() catch |err| { + // Use tryRead, not read, so that if the socket has no new bytes (because + // we already drained them while a backpressured send was waiting in + // WsConnection.send) we still process whatever is buffered instead of + // logging an error and bailing. See lightpanda-io/browser#2462. + const result = self.ws.tryRead() catch |err| { log.warn(.app, "CDP read", .{ .err = err }); return false; }; - if (n == 0) { - log.info(.app, "CDP disconnect", .{}); - return false; + switch (result) { + .closed => { + log.info(.app, "CDP disconnect", .{}); + return false; + }, + .data, .no_new_data => {}, } return self.ws.processMessages(self) catch false; @@ -574,7 +587,13 @@ pub const BrowserContext = struct { ); http_client.interception_layer.intercepted -= 1; if (http_client.findTransfer(transfer_id)) |transfer| { - transfer.abort(error.ClientDisconnect); + // `kill` (not `abort`) fires shutdown_callback (or no-op), + // not error_callback. error_callback re-enters JS via XHR + // / script error handlers, which crash because the + // inspector + V8 context this BC owns are about to be + // torn down (and partially already are: we just called + // `inspector.stopSession`). See lightpanda-io/browser#2462. + transfer.kill(); } } diff --git a/src/network/WsConnection.zig b/src/network/WsConnection.zig index d4598904..aa9f4b3d 100644 --- a/src/network/WsConnection.zig +++ b/src/network/WsConnection.zig @@ -353,31 +353,27 @@ pub fn deinit(self: *WsConnection) void { pub fn send(self: *WsConnection, data: []const u8) !void { var pos: usize = 0; - var changed_to_blocking: bool = false; defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 }); - defer if (changed_to_blocking) { - // We had to change our socket to blocking me to get our write out - // We need to change it back to non-blocking. - _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| { - log.err(.app, "ws restore nonblocking", .{ .err = err }); - }; - }; - LOOP: while (pos < data.len) { const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) { error.WouldBlock => { - // self.socket is nonblocking, because we don't want to block - // reads. But our life is a lot easier if we block writes, - // largely, because we don't have to maintain a queue of pending - // writes (which would each need their own allocations). So - // if we get a WouldBlock error, we'll switch the socket to - // blocking and switch it back to non-blocking after the write - // is complete. Doesn't seem particularly efficiently, but - // this should virtually never happen. - assert(changed_to_blocking == false, "WsConnection.double block", .{}); - changed_to_blocking = true; - _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))); + // The peer's recv buffer is full. Naively switching to a + // blocking write would deadlock when the peer is _also_ + // blocked trying to send to us — a real scenario under CDP + // Fetch.enable + http-proxy, where lightpanda emits a flood + // of Fetch.requestPaused events while puppeteer is trying + // to send us back the matching Fetch.continueRequest + // messages. Each side fills the other's TCP send buffer, + // and a blocking write here never returns. See + // lightpanda-io/browser#2462. + // + // Instead, poll for POLLOUT _while also_ draining any + // POLLIN data into the reader buffer (without processing — + // that would re-enter the dispatch path and call back into + // send, risking unbounded recursion). The buffered bytes + // are picked up by the next CDP.readSocket call. + try self.waitWritableDrainingReads(); continue :LOOP; }, else => return err, @@ -390,6 +386,83 @@ pub fn send(self: *WsConnection, data: []const u8) !void { } } +// Block until the socket is writable. Concurrently drains any incoming +// bytes into the reader buffer (without processing them) so the peer +// keeps making progress reading our writes. Used to break the two-way +// backpressure deadlock described in `send`. +fn waitWritableDrainingReads(self: *WsConnection) !void { + // Bounded so a truly dead peer eventually surfaces (TCP keepalive will + // close the socket; this just bounds any single send). + const POLL_TIMEOUT_MS: i32 = 30_000; + + // We may have to stop watching POLLIN if the reader buffer is at its + // cap and we can't make any more space (otherwise poll would return + // immediately every iteration with POLLIN set, busy-spinning). + var watch_in = true; + + while (true) { + const events_in: i16 = if (watch_in) @as(i16, @intCast(posix.POLL.IN)) else 0; + var pfds = [_]posix.pollfd{.{ + .fd = self.socket, + .events = @as(i16, @intCast(posix.POLL.OUT)) | events_in, + .revents = 0, + }}; + const n = try posix.poll(&pfds, POLL_TIMEOUT_MS); + if (n == 0) return error.WouldBlock; + + const revents = pfds[0].revents; + if (revents & @as(i16, @intCast(posix.POLL.NVAL | posix.POLL.ERR | posix.POLL.HUP)) != 0) { + return error.Closed; + } + + if (watch_in and revents & @as(i16, @intCast(posix.POLL.IN)) != 0) { + // Drain whatever's available without processing. May grow the + // buffer up to CDP_MAX_MESSAGE_SIZE. + const before = self.bufferedBytes(); + try self.drainAvailable(); + if (self.bufferedBytes() == before and self.reader.readBuf().len == 0) { + // Buffer is at cap and we couldn't read anything more. + // Stop watching POLLIN so we don't busy-spin while we wait + // for POLLOUT. + watch_in = false; + } + } + + if (revents & @as(i16, @intCast(posix.POLL.OUT)) != 0) { + return; + } + } +} + +// Read everything currently available on the socket into the reader +// buffer (non-blocking). Grows the buffer if needed (capped at +// CDP_MAX_MESSAGE_SIZE so a misbehaving peer can't OOM us). Does not +// process messages — the caller will handle that on the next pass +// through CDP.readSocket. +fn drainAvailable(self: *WsConnection) !void { + while (true) { + var buf = self.reader.readBuf(); + if (buf.len == 0) { + const current = self.reader.buf.len; + if (current >= CDP_MAX_MESSAGE_SIZE) { + // Already at the cap; refuse to grow further. Stop draining + // — the next POLLOUT-then-write attempt will block again, + // but at least the peer's reads will eventually free our + // send buffer and let us proceed. + return; + } + self.reader.buf = try growBuffer(self.reader.allocator, self.reader.buf, current + 1); + buf = self.reader.readBuf(); + } + const n = posix.read(self.socket, buf) catch |err| switch (err) { + error.WouldBlock => return, + else => return err, + }; + if (n == 0) return error.Closed; // peer closed + self.reader.len += n; + } +} + const EMPTY_PONG = [_]u8{ 138, 0 }; fn sendPong(self: *WsConnection, data: []const u8) !void { @@ -477,6 +550,37 @@ pub fn read(self: *WsConnection) !usize { return n; } +pub const ReadResult = enum { data, no_new_data, closed }; + +// Variant of `read` that distinguishes "no new bytes on the wire" from +// "peer closed". Used by CDP.readSocket so it can still drain buffered +// messages (those rescued during a backpressured send) without bailing +// when posix.read reports EWOULDBLOCK. +pub fn tryRead(self: *WsConnection) !ReadResult { + const buf = self.reader.readBuf(); + if (buf.len == 0) { + // Reader buffer is completely full of unprocessed messages. Don't + // call posix.read with a zero-length buffer (returns 0 → looks like + // EOF). Caller should processMessages first. + return if (self.bufferedBytes() > 0) .no_new_data else .closed; + } + const n = posix.read(self.socket, buf) catch |err| switch (err) { + error.WouldBlock => return .no_new_data, + else => return err, + }; + if (n == 0) return .closed; + self.reader.len += n; + return .data; +} + +// Number of bytes sitting in the reader buffer that haven't been parsed +// into messages yet. Used by HttpClient.perform to detect that data was +// rescued from the socket during a backpressured send and still needs to +// be dispatched. +pub fn bufferedBytes(self: *const WsConnection) usize { + return self.reader.len - self.reader.pos; +} + fn processHttpRequest(self: *WsConnection) !HttpResult { assert(self.reader.pos == 0, "WsConnection.HTTP pos", .{ .pos = self.reader.pos }); const request = self.reader.buf[0..self.reader.len]; diff --git a/src/network/http.zig b/src/network/http.zig index e97e779f..21a5b6f5 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -83,8 +83,17 @@ pub const Headers = struct { return .{ .headers = updated_headers }; } - pub fn deinit(self: *const Headers) void { + pub fn deinit(self: *Headers) void { + // Null out after free to make deinit idempotent. Headers is value- + // copied across structs (Transfer holds it inside a Request, and + // some layers / cleanup paths produce a second deinit on the same + // slist). Without this, those paths double-free the curl_slist + // chain and the ZigToCurlAllocator hits an unaligned-block error + // inside curl_slist_free_all on the second pass. RobotsLayer + // documented the value-copy hazard for its own path; this guard + // is the catch-all so other code paths don't have to. if (self.headers) |hdr| { + self.headers = null; libcurl.curl_slist_free_all(hdr); } }