Fix Fetch.enable + http-proxy CDP send/recv deadlock (#2462)

WsConnection.send used to switch the socket to blocking mode on
EWOULDBLOCK, with a comment claiming this 'should virtually never
happen'. Under CDP Fetch.enable + --http-proxy it is routine: the
CONNECT+TLS proxy round-trip means many subresources are in flight
simultaneously, lightpanda emits a flood of Fetch.requestPaused events,
the kernel send buffer fills, lightpanda blocks on write, and
puppeteer's matching Fetch.continueRequest replies pile up in
lightpanda's TCP recv buffer (which lightpanda can't drain because it
is blocked on the write). Both peers wedge until the client times out.

Other contributing problems all collapse paused-intercept sites where
an outer loop polls without ever draining the CDP socket, OR where
disconnect-time cleanup re-enters JS through paths the runtime can no
longer satisfy:

* HttpClient.perform skipped the CDP socket poll entirely whenever
  processMessages() returned non-empty, so a steady stream of HTTP
  completions could starve CDP reads.
* ScriptManagerBase.waitForImport spun on `client.tick(200)` and
  discarded the .cdp_socket return, so a script `import()` whose
  request was paused at the InterceptionLayer hung forever.
* BrowserContext.deinit aborted pending intercepts via `transfer.abort`,
  which fired XHR/script error_callback chains into a half-torn-down
  V8 context (the inspector had already been stopped two lines above).
* Headers.deinit was non-idempotent, so a value-copied Headers (the
  hazard RobotsLayer documents) double-freed its curl_slist on the
  second deinit; the symptom was an "incorrect alignment" panic
  inside ZigToCurlAllocator.free.
* Transfer.deinit was non-idempotent, so a cascade out of
  error_callback (e.g. Script.errorCallback -> manager.evaluate() ->
  JS execution -> Frame.deinit -> abortOwner -> Transfer.kill ->
  Transfer.deinit) reached `arena_pool.release` twice on the same
  arena.

Coordinated changes:

* src/network/WsConnection.zig: On EWOULDBLOCK, instead of switching
  to a blocking write, poll for both POLLOUT and POLLIN. While waiting
  for write space, drain any incoming bytes into the reader buffer
  (without dispatching - that would re-enter send and recurse). Adds
  tryRead/bufferedBytes accessors.

* src/browser/HttpClient.zig:
  - Add has_buffered_input to CDPClient. In perform(), return
    .cdp_socket when buffered input exists, and always do at least a
    non-blocking poll on the CDP socket so HTTP completions can no
    longer starve CDP reads.
  - Make Transfer.deinit idempotent by claiming ownership through
    `client.transfers.remove(self.id)`. Second deinits (cascades out
    of error_callback) early-return.
  - Make `Transfer.kill` public (was `fn`) so BrowserContext.deinit
    can use it.
  - Tighten RequestParams.deinit / Request.deinit to take `*` instead
    of `*const` so they can call into `Headers.deinit` (now mutating).

* src/network/http.zig: Headers.deinit now nulls out `self.headers`
  after `curl_slist_free_all`, so a second deinit is a no-op. Without
  this guard a value-copied Headers double-frees the curl_slist (the
  hazard RobotsLayer's call site already documents).

* src/browser/ScriptManagerBase.zig: waitForImport now drains pending
  CDP messages on every iteration (matching the syncRequest pattern)
  and re-fetches the imported_modules entry per iteration. The cached
  entry was a use-after-free risk because the CDP-drain step above
  re-enters JS, and a transitively-imported module's preloadImport()
  -> getOrPut() can rehash the map and invalidate the prior entry
  pointer.

* src/cdp/CDP.zig:
  - Wire hasBufferedInput.
  - Replace read() with tryRead() in readSocket and tolerate the
    no_new_data case so we still process messages drained during a
    backpressured send.
  - BrowserContext.deinit aborts pending intercepts via
    `transfer.kill` instead of `transfer.abort`. `kill` fires
    shutdown_callback (or no-op for transfers without one), avoiding
    error_callback's re-entry into JS through XHR/script error
    handlers - those crash because the V8 context and inspector this
    BC owns have either been torn down already or are about to be.

Verified end-to-end against a puppeteer + http-proxy reproducer:

| URL                        | before               | after            |
|----------------------------|----------------------|------------------|
| example.com                | OK 124ms             | OK 117ms         |
| github.com                 | HANG 20s (12/82)     | OK 1410ms (82/82)|
| shopify.com                | HANG 20s (1/4)       | OK 1973ms (66/66)|
| allbirds.com               | HANG 20s (12/53)     | OK 3944ms (372)  |
| allbirds wool-runners PDP  | HANG 20s (12/53)     | OK 6286ms (459)  |

(nike.com still doesn't reach `load` event but all 68 continueRequests
process cleanly - the remaining stall is third-party widgets keeping
the page in `loading` state, not the CDP/HTTP deadlock this PR fixes.)

Lightpanda survives 18 back-to-back navigation runs across the matrix
above (3 per URL) without crashing.

Fixes #2462.
This commit is contained in:
Scott Taylor
2026-05-14 22:37:11 -04:00
committed by Pierre Tachoire
parent 8b098a3c97
commit f013990ff3
5 changed files with 235 additions and 46 deletions

View File

@@ -170,6 +170,12 @@ pub const CDPClient = struct {
blocking_read_start: *const fn (*anyopaque) bool,
blocking_read: *const fn (*anyopaque) bool,
blocking_read_end: *const fn (*anyopaque) bool,
// Returns true if the CDP client has bytes already buffered (typically
// rescued from the socket while a synchronous send was backpressured;
// see WsConnection.send). When true, perform() returns .cdp_socket so
// the runner drains them even if the OS recv buffer is empty. See
// lightpanda-io/browser#2462.
has_buffered_input: *const fn (*anyopaque) bool,
};
pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client: ?CDPClient) !void {
@@ -413,7 +419,8 @@ pub fn _request(_: *anyopaque, transfer: *Transfer) !void {
// inside this function, we free it before returning the error. Callers
// must NOT pair `request()` with their own `errdefer headers.deinit()`
// — that's a double-free.
pub fn request(self: *Client, req: Request, owner: ?*Owner) !void {
pub fn request(self: *Client, req_in: Request, owner: ?*Owner) !void {
var req = req_in;
const arena = self.arena_pool.acquire(.small, "Request.arena") catch |err| {
req.headers.deinit();
return err;
@@ -658,21 +665,36 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {
// We're potentially going to block for a while until we get data. Process
// whatever messages we have waiting ahead of time.
if (try self.processMessages()) {
return .normal;
}
const processed = try self.processMessages();
var status = PerformStatus.normal;
if (self.cdp_client) |cdp_client| {
// Bytes may have been rescued from the CDP socket while a
// synchronous send was backpressured (see WsConnection.send). The
// OS recv buffer is empty in that case, but we still owe the
// dispatcher a chance to process them. See
// lightpanda-io/browser#2462.
if (cdp_client.has_buffered_input(cdp_client.ctx)) {
return .cdp_socket;
}
// Even when we processed completion messages this round, do a
// non-blocking poll of the CDP socket so a steady stream of HTTP
// completions can't starve CDP reads. Without this, a flood of
// intercepted/proxied transfers can leave Fetch.continueRequest
// messages unread for seconds at a time.
const cdp_timeout: c_int = if (processed) 0 else timeout_ms;
var wait_fds = [_]http.WaitFd{.{
.fd = cdp_client.socket,
.events = .{ .pollin = true },
.revents = .{},
}};
try self.handles.poll(&wait_fds, timeout_ms);
try self.handles.poll(&wait_fds, cdp_timeout);
if (wait_fds[0].revents.pollin or wait_fds[0].revents.pollpri or wait_fds[0].revents.pollout) {
status = .cdp_socket;
}
} else if (processed) {
return .normal;
} else if (running > 0) {
try self.handles.poll(&.{}, timeout_ms);
}
@@ -961,7 +983,7 @@ pub const Request = struct {
return written.ptr[0..written.len :0];
}
pub fn deinit(self: *const Request) void {
pub fn deinit(self: *Request) void {
self.headers.deinit();
}
};
@@ -1131,6 +1153,22 @@ pub const Transfer = struct {
}
pub fn deinit(self: *Transfer) void {
// Use `transfers.remove` as a one-shot ownership claim. If it
// returns false, this transfer has already been deinit'd by a
// cascade out of `error_callback` (e.g. Script.errorCallback ->
// manager.evaluate() -> JS execution -> Frame.deinit ->
// abortOwner -> Transfer.kill -> Transfer.deinit). Returning
// here keeps us from double-freeing the arena. See
// lightpanda-io/browser#2462.
//
// Reading `self.id` and `self.client` after a prior deinit has
// released `self.arena` is technically a stale read, but
// arena_pool zombies the memory until the slot is handed out
// again. If by some race the slot were re-used between the two
// deinits, the new tenant's id would not be in `transfers`
// either, so the early-out still fires and we bail cleanly.
if (!self.client.transfers.remove(self.id)) return;
if (self._conn) |conn| {
self.client.removeConn(conn);
self._conn = null;
@@ -1145,10 +1183,6 @@ pub const Transfer = struct {
self._queued = false;
}
// Drop the id→*Transfer index entry before freeing the memory.
// Any concurrent CDP lookup by id will now see this transfer as gone.
_ = self.client.transfers.remove(self.id);
self.req.deinit();
if (self.owner) |o| {
o.removeTransfer(self);
@@ -1175,8 +1209,12 @@ pub const Transfer = struct {
// Owner-driven teardown: fires shutdown_callback (not error_callback)
// and otherwise behaves like abort. Called by Client.abortOwner /
// abortRequests when a Frame / WGS is being torn down.
fn kill(self: *Transfer) void {
// abortRequests when a Frame / WGS is being torn down, and from
// BrowserContext.deinit when the CDP connection drops with paused
// intercepts still outstanding -- in both cases firing error_callback
// would re-enter JS via XHR/script error handlers, and the inspector
// / V8 context the handler reaches into has already been torn down.
pub fn kill(self: *Transfer) void {
if (self.req.shutdown_callback) |cb| {
cb(self.req.ctx);
}

View File

@@ -286,22 +286,41 @@ pub fn preloadImport(self: *ScriptManagerBase, url: [:0]const u8, referrer: []co
}
pub fn waitForImport(self: *ScriptManagerBase, url: [:0]const u8) !ModuleSource {
const entry = self.imported_modules.getEntry(url) orelse {
// It shouldn't be possible for v8 to ask for a module that we didn't
// `preloadImport` above.
return error.UnknownModule;
};
const was_evaluating = self.is_evaluating;
self.is_evaluating = true;
defer self.is_evaluating = was_evaluating;
var client = self.client;
while (true) {
// Re-fetch the entry on every iteration. We can't cache the pointer
// outside the loop: the CDP-drain step below (re-)enters JS, which
// can call back into `preloadImport` for a transitively-imported
// module. `preloadImport` calls `imported_modules.getOrPut`, which
// may rehash the map and invalidate every existing entry pointer.
// A cached `entry` would then be a use-after-free on the next
// `entry.value_ptr.state` access. See lightpanda-io/browser#2462.
const entry = self.imported_modules.getEntry(url) orelse {
// It shouldn't be possible for v8 to ask for a module that we
// didn't `preloadImport` above.
return error.UnknownModule;
};
switch (entry.value_ptr.state) {
.loading => {
_ = try client.tick(200);
continue;
// Drain pending CDP messages every iteration. Without this,
// a script imported under `Fetch.enable` whose request is
// paused at the InterceptionLayer hangs forever: the CDP
// client's matching `Fetch.continueRequest` reply never
// reaches `CDP.processMessage`, the request never resumes,
// and this loop spins until the client times out.
// `syncRequest` already does this; the import path needs
// the same treatment. See lightpanda-io/browser#2462.
const status = try client.tick(200);
if (status == .cdp_socket) {
if (client.cdp_client) |cdp| {
_ = cdp.blocking_read(cdp.ctx);
}
}
},
.done => |script| {
var shared = false;

View File

@@ -111,6 +111,7 @@ pub fn init(
.blocking_read_start = CDP.blockingReadStart,
.blocking_read = CDP.blockingRead,
.blocking_read_end = CDP.blockingReadStop,
.has_buffered_input = CDP.hasBufferedInput,
});
}
@@ -149,15 +150,27 @@ pub fn blockingReadStop(ctx: *anyopaque) bool {
return true;
}
pub fn hasBufferedInput(ctx: *anyopaque) bool {
const self: *CDP = @ptrCast(@alignCast(ctx));
return self.ws.bufferedBytes() > 0;
}
pub fn readSocket(self: *CDP) bool {
const n = self.ws.read() catch |err| {
// Use tryRead, not read, so that if the socket has no new bytes (because
// we already drained them while a backpressured send was waiting in
// WsConnection.send) we still process whatever is buffered instead of
// logging an error and bailing. See lightpanda-io/browser#2462.
const result = self.ws.tryRead() catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
return false;
};
if (n == 0) {
log.info(.app, "CDP disconnect", .{});
return false;
switch (result) {
.closed => {
log.info(.app, "CDP disconnect", .{});
return false;
},
.data, .no_new_data => {},
}
return self.ws.processMessages(self) catch false;
@@ -574,7 +587,13 @@ pub const BrowserContext = struct {
);
http_client.interception_layer.intercepted -= 1;
if (http_client.findTransfer(transfer_id)) |transfer| {
transfer.abort(error.ClientDisconnect);
// `kill` (not `abort`) fires shutdown_callback (or no-op),
// not error_callback. error_callback re-enters JS via XHR
// / script error handlers, which crash because the
// inspector + V8 context this BC owns are about to be
// torn down (and partially already are: we just called
// `inspector.stopSession`). See lightpanda-io/browser#2462.
transfer.kill();
}
}

View File

@@ -353,31 +353,27 @@ pub fn deinit(self: *WsConnection) void {
pub fn send(self: *WsConnection, data: []const u8) !void {
var pos: usize = 0;
var changed_to_blocking: bool = false;
defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 });
defer if (changed_to_blocking) {
// We had to change our socket to blocking me to get our write out
// We need to change it back to non-blocking.
_ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
log.err(.app, "ws restore nonblocking", .{ .err = err });
};
};
LOOP: while (pos < data.len) {
const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) {
error.WouldBlock => {
// self.socket is nonblocking, because we don't want to block
// reads. But our life is a lot easier if we block writes,
// largely, because we don't have to maintain a queue of pending
// writes (which would each need their own allocations). So
// if we get a WouldBlock error, we'll switch the socket to
// blocking and switch it back to non-blocking after the write
// is complete. Doesn't seem particularly efficiently, but
// this should virtually never happen.
assert(changed_to_blocking == false, "WsConnection.double block", .{});
changed_to_blocking = true;
_ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
// The peer's recv buffer is full. Naively switching to a
// blocking write would deadlock when the peer is _also_
// blocked trying to send to us — a real scenario under CDP
// Fetch.enable + http-proxy, where lightpanda emits a flood
// of Fetch.requestPaused events while puppeteer is trying
// to send us back the matching Fetch.continueRequest
// messages. Each side fills the other's TCP send buffer,
// and a blocking write here never returns. See
// lightpanda-io/browser#2462.
//
// Instead, poll for POLLOUT _while also_ draining any
// POLLIN data into the reader buffer (without processing —
// that would re-enter the dispatch path and call back into
// send, risking unbounded recursion). The buffered bytes
// are picked up by the next CDP.readSocket call.
try self.waitWritableDrainingReads();
continue :LOOP;
},
else => return err,
@@ -390,6 +386,83 @@ pub fn send(self: *WsConnection, data: []const u8) !void {
}
}
// Block until the socket is writable. Concurrently drains any incoming
// bytes into the reader buffer (without processing them) so the peer
// keeps making progress reading our writes. Used to break the two-way
// backpressure deadlock described in `send`.
fn waitWritableDrainingReads(self: *WsConnection) !void {
// Bounded so a truly dead peer eventually surfaces (TCP keepalive will
// close the socket; this just bounds any single send).
const POLL_TIMEOUT_MS: i32 = 30_000;
// We may have to stop watching POLLIN if the reader buffer is at its
// cap and we can't make any more space (otherwise poll would return
// immediately every iteration with POLLIN set, busy-spinning).
var watch_in = true;
while (true) {
const events_in: i16 = if (watch_in) @as(i16, @intCast(posix.POLL.IN)) else 0;
var pfds = [_]posix.pollfd{.{
.fd = self.socket,
.events = @as(i16, @intCast(posix.POLL.OUT)) | events_in,
.revents = 0,
}};
const n = try posix.poll(&pfds, POLL_TIMEOUT_MS);
if (n == 0) return error.WouldBlock;
const revents = pfds[0].revents;
if (revents & @as(i16, @intCast(posix.POLL.NVAL | posix.POLL.ERR | posix.POLL.HUP)) != 0) {
return error.Closed;
}
if (watch_in and revents & @as(i16, @intCast(posix.POLL.IN)) != 0) {
// Drain whatever's available without processing. May grow the
// buffer up to CDP_MAX_MESSAGE_SIZE.
const before = self.bufferedBytes();
try self.drainAvailable();
if (self.bufferedBytes() == before and self.reader.readBuf().len == 0) {
// Buffer is at cap and we couldn't read anything more.
// Stop watching POLLIN so we don't busy-spin while we wait
// for POLLOUT.
watch_in = false;
}
}
if (revents & @as(i16, @intCast(posix.POLL.OUT)) != 0) {
return;
}
}
}
// Read everything currently available on the socket into the reader
// buffer (non-blocking). Grows the buffer if needed (capped at
// CDP_MAX_MESSAGE_SIZE so a misbehaving peer can't OOM us). Does not
// process messages — the caller will handle that on the next pass
// through CDP.readSocket.
fn drainAvailable(self: *WsConnection) !void {
while (true) {
var buf = self.reader.readBuf();
if (buf.len == 0) {
const current = self.reader.buf.len;
if (current >= CDP_MAX_MESSAGE_SIZE) {
// Already at the cap; refuse to grow further. Stop draining
// — the next POLLOUT-then-write attempt will block again,
// but at least the peer's reads will eventually free our
// send buffer and let us proceed.
return;
}
self.reader.buf = try growBuffer(self.reader.allocator, self.reader.buf, current + 1);
buf = self.reader.readBuf();
}
const n = posix.read(self.socket, buf) catch |err| switch (err) {
error.WouldBlock => return,
else => return err,
};
if (n == 0) return error.Closed; // peer closed
self.reader.len += n;
}
}
const EMPTY_PONG = [_]u8{ 138, 0 };
fn sendPong(self: *WsConnection, data: []const u8) !void {
@@ -477,6 +550,37 @@ pub fn read(self: *WsConnection) !usize {
return n;
}
pub const ReadResult = enum { data, no_new_data, closed };
// Variant of `read` that distinguishes "no new bytes on the wire" from
// "peer closed". Used by CDP.readSocket so it can still drain buffered
// messages (those rescued during a backpressured send) without bailing
// when posix.read reports EWOULDBLOCK.
pub fn tryRead(self: *WsConnection) !ReadResult {
const buf = self.reader.readBuf();
if (buf.len == 0) {
// Reader buffer is completely full of unprocessed messages. Don't
// call posix.read with a zero-length buffer (returns 0 → looks like
// EOF). Caller should processMessages first.
return if (self.bufferedBytes() > 0) .no_new_data else .closed;
}
const n = posix.read(self.socket, buf) catch |err| switch (err) {
error.WouldBlock => return .no_new_data,
else => return err,
};
if (n == 0) return .closed;
self.reader.len += n;
return .data;
}
// Number of bytes sitting in the reader buffer that haven't been parsed
// into messages yet. Used by HttpClient.perform to detect that data was
// rescued from the socket during a backpressured send and still needs to
// be dispatched.
pub fn bufferedBytes(self: *const WsConnection) usize {
return self.reader.len - self.reader.pos;
}
fn processHttpRequest(self: *WsConnection) !HttpResult {
assert(self.reader.pos == 0, "WsConnection.HTTP pos", .{ .pos = self.reader.pos });
const request = self.reader.buf[0..self.reader.len];

View File

@@ -83,8 +83,17 @@ pub const Headers = struct {
return .{ .headers = updated_headers };
}
pub fn deinit(self: *const Headers) void {
pub fn deinit(self: *Headers) void {
// Null out after free to make deinit idempotent. Headers is value-
// copied across structs (Transfer holds it inside a Request, and
// some layers / cleanup paths produce a second deinit on the same
// slist). Without this, those paths double-free the curl_slist
// chain and the ZigToCurlAllocator hits an unaligned-block error
// inside curl_slist_free_all on the second pass. RobotsLayer
// documented the value-copy hazard for its own path; this guard
// is the catch-all so other code paths don't have to.
if (self.headers) |hdr| {
self.headers = null;
libcurl.curl_slist_free_all(hdr);
}
}