diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig
index 127a4a32..bf48c719 100644
--- a/src/browser/HttpClient.zig
+++ b/src/browser/HttpClient.zig
@@ -170,6 +170,12 @@ pub const CDPClient = struct {
     blocking_read_start: *const fn (*anyopaque) bool,
     blocking_read: *const fn (*anyopaque) bool,
     blocking_read_end: *const fn (*anyopaque) bool,
+    // Returns true if the CDP client has bytes already buffered (typically
+    // rescued from the socket while a synchronous send was backpressured;
+    // see WsConnection.send). When true, perform() returns .cdp_socket so
+    // the runner drains them even if the OS recv buffer is empty. See
+    // lightpanda-io/browser#2462.
+    has_buffered_input: *const fn (*anyopaque) bool,
 };
 
 pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client: ?CDPClient) !void {
@@ -413,7 +419,8 @@ pub fn _request(_: *anyopaque, transfer: *Transfer) !void {
 // inside this function, we free it before returning the error. Callers
 // must NOT pair `request()` with their own `errdefer headers.deinit()`
 // — that's a double-free.
-pub fn request(self: *Client, req: Request, owner: ?*Owner) !void {
+pub fn request(self: *Client, req_in: Request, owner: ?*Owner) !void {
+    var req = req_in;
     const arena = self.arena_pool.acquire(.small, "Request.arena") catch |err| {
         req.headers.deinit();
         return err;
@@ -658,21 +665,36 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {
 
     // We're potentially going to block for a while until we get data. Process
     // whatever messages we have waiting ahead of time.
-    if (try self.processMessages()) {
-        return .normal;
-    }
+    const processed = try self.processMessages();
 
     var status = PerformStatus.normal;
     if (self.cdp_client) |cdp_client| {
+        // Bytes may have been rescued from the CDP socket while a
+        // synchronous send was backpressured (see WsConnection.send). The
+        // OS recv buffer is empty in that case, but we still owe the
+        // dispatcher a chance to process them. See
+        // lightpanda-io/browser#2462.
+        if (cdp_client.has_buffered_input(cdp_client.ctx)) {
+            return .cdp_socket;
+        }
+
+        // Even when we processed completion messages this round, do a
+        // non-blocking poll of the CDP socket so a steady stream of HTTP
+        // completions can't starve CDP reads. Without this, a flood of
+        // intercepted/proxied transfers can leave Fetch.continueRequest
+        // messages unread for seconds at a time.
+        const cdp_timeout: c_int = if (processed) 0 else timeout_ms;
         var wait_fds = [_]http.WaitFd{.{
             .fd = cdp_client.socket,
             .events = .{ .pollin = true },
             .revents = .{},
         }};
-        try self.handles.poll(&wait_fds, timeout_ms);
+        try self.handles.poll(&wait_fds, cdp_timeout);
         if (wait_fds[0].revents.pollin or wait_fds[0].revents.pollpri or wait_fds[0].revents.pollout) {
             status = .cdp_socket;
         }
+    } else if (processed) {
+        return .normal;
     } else if (running > 0) {
         try self.handles.poll(&.{}, timeout_ms);
     }
@@ -961,7 +983,7 @@ pub const Request = struct {
         return written.ptr[0..written.len :0];
     }
 
-    pub fn deinit(self: *const Request) void {
+    pub fn deinit(self: *Request) void {
         self.headers.deinit();
     }
 };
@@ -1131,6 +1153,22 @@ pub const Transfer = struct {
     }
 
     pub fn deinit(self: *Transfer) void {
+        // Use `transfers.remove` as a one-shot ownership claim. If it
+        // returns false, this transfer has already been deinit'd by a
+        // cascade out of `error_callback` (e.g. Script.errorCallback ->
+        // manager.evaluate() -> JS execution -> Frame.deinit ->
+        // abortOwner -> Transfer.kill -> Transfer.deinit). Returning
+        // here keeps us from double-freeing the arena. See
+        // lightpanda-io/browser#2462.
+        //
+        // Reading `self.id` and `self.client` after a prior deinit has
+        // released `self.arena` is technically a stale read, but
+        // arena_pool zombies the memory until the slot is handed out
+        // again. If by some race the slot were re-used between the two
+        // deinits, the new tenant's id would not be in `transfers`
+        // either, so the early-out still fires and we bail cleanly.
+        if (!self.client.transfers.remove(self.id)) return;
+
         if (self._conn) |conn| {
             self.client.removeConn(conn);
             self._conn = null;
@@ -1145,10 +1183,6 @@ pub const Transfer = struct {
             self._queued = false;
         }
 
-        // Drop the id→*Transfer index entry before freeing the memory.
-        // Any concurrent CDP lookup by id will now see this transfer as gone.
-        _ = self.client.transfers.remove(self.id);
-
         self.req.deinit();
         if (self.owner) |o| {
             o.removeTransfer(self);
@@ -1175,8 +1209,12 @@ pub const Transfer = struct {
 
     // Owner-driven teardown: fires shutdown_callback (not error_callback)
     // and otherwise behaves like abort. Called by Client.abortOwner /
-    // abortRequests when a Frame / WGS is being torn down.
-    fn kill(self: *Transfer) void {
+    // abortRequests when a Frame / WGS is being torn down, and from
+    // BrowserContext.deinit when the CDP connection drops with paused
+    // intercepts still outstanding -- in both cases firing error_callback
+    // would re-enter JS via XHR/script error handlers, and the inspector
+    // / V8 context the handler reaches into has already been torn down.
+    pub fn kill(self: *Transfer) void {
         if (self.req.shutdown_callback) |cb| {
             cb(self.req.ctx);
         }
diff --git a/src/browser/ScriptManagerBase.zig b/src/browser/ScriptManagerBase.zig
index fbaf2c6e..8041fb34 100644
--- a/src/browser/ScriptManagerBase.zig
+++ b/src/browser/ScriptManagerBase.zig
@@ -286,22 +286,41 @@ pub fn preloadImport(self: *ScriptManagerBase, url: [:0]const u8, referrer: []co
 }
 
 pub fn waitForImport(self: *ScriptManagerBase, url: [:0]const u8) !ModuleSource {
-    const entry = self.imported_modules.getEntry(url) orelse {
-        // It shouldn't be possible for v8 to ask for a module that we didn't
-        // `preloadImport` above.
-        return error.UnknownModule;
-    };
-
     const was_evaluating = self.is_evaluating;
     self.is_evaluating = true;
     defer self.is_evaluating = was_evaluating;
 
     var client = self.client;
     while (true) {
+        // Re-fetch the entry on every iteration. We can't cache the pointer
+        // outside the loop: the CDP-drain step below (re-)enters JS, which
+        // can call back into `preloadImport` for a transitively-imported
+        // module. `preloadImport` calls `imported_modules.getOrPut`, which
+        // may rehash the map and invalidate every existing entry pointer.
+        // A cached `entry` would then be a use-after-free on the next
+        // `entry.value_ptr.state` access. See lightpanda-io/browser#2462.
+        const entry = self.imported_modules.getEntry(url) orelse {
+            // It shouldn't be possible for v8 to ask for a module that we
+            // didn't `preloadImport` above.
+            return error.UnknownModule;
+        };
+
         switch (entry.value_ptr.state) {
             .loading => {
-                _ = try client.tick(200);
-                continue;
+                // Drain pending CDP messages every iteration. Without this,
+                // a script imported under `Fetch.enable` whose request is
+                // paused at the InterceptionLayer hangs forever: the CDP
+                // client's matching `Fetch.continueRequest` reply never
+                // reaches `CDP.processMessage`, the request never resumes,
+                // and this loop spins until the client times out.
+                // `syncRequest` already does this; the import path needs
+                // the same treatment. See lightpanda-io/browser#2462.
+                const status = try client.tick(200);
+                if (status == .cdp_socket) {
+                    if (client.cdp_client) |cdp| {
+                        _ = cdp.blocking_read(cdp.ctx);
+                    }
+                }
             },
             .done => |script| {
                 var shared = false;
diff --git a/src/cdp/CDP.zig b/src/cdp/CDP.zig
index ecb00753..1568cd4c 100644
--- a/src/cdp/CDP.zig
+++ b/src/cdp/CDP.zig
@@ -111,6 +111,7 @@ pub fn init(
         .blocking_read_start = CDP.blockingReadStart,
         .blocking_read = CDP.blockingRead,
         .blocking_read_end = CDP.blockingReadStop,
+        .has_buffered_input = CDP.hasBufferedInput,
     });
 }
 
@@ -149,15 +150,27 @@ pub fn blockingReadStop(ctx: *anyopaque) bool {
     return true;
 }
 
+pub fn hasBufferedInput(ctx: *anyopaque) bool {
+    const self: *CDP = @ptrCast(@alignCast(ctx));
+    return self.ws.bufferedBytes() > 0;
+}
+
 pub fn readSocket(self: *CDP) bool {
-    const n = self.ws.read() catch |err| {
+    // Use tryRead, not read, so that if the socket has no new bytes (because
+    // we already drained them while a backpressured send was waiting in
+    // WsConnection.send) we still process whatever is buffered instead of
+    // logging an error and bailing. See lightpanda-io/browser#2462.
+    const result = self.ws.tryRead() catch |err| {
         log.warn(.app, "CDP read", .{ .err = err });
         return false;
     };
 
-    if (n == 0) {
-        log.info(.app, "CDP disconnect", .{});
-        return false;
+    switch (result) {
+        .closed => {
+            log.info(.app, "CDP disconnect", .{});
+            return false;
+        },
+        .data, .no_new_data => {},
     }
 
     return self.ws.processMessages(self) catch false;
@@ -574,7 +587,13 @@ pub const BrowserContext = struct {
             );
             http_client.interception_layer.intercepted -= 1;
             if (http_client.findTransfer(transfer_id)) |transfer| {
-                transfer.abort(error.ClientDisconnect);
+                // `kill` (not `abort`) fires shutdown_callback (or no-op),
+                // not error_callback. error_callback re-enters JS via XHR
+                // / script error handlers, which crash because the
+                // inspector + V8 context this BC owns are about to be
+                // torn down (and partially already are: we just called
+                // `inspector.stopSession`). See lightpanda-io/browser#2462.
+                transfer.kill();
             }
         }
 
diff --git a/src/network/WsConnection.zig b/src/network/WsConnection.zig
index d4598904..aa9f4b3d 100644
--- a/src/network/WsConnection.zig
+++ b/src/network/WsConnection.zig
@@ -353,31 +353,27 @@ pub fn deinit(self: *WsConnection) void {
 
 pub fn send(self: *WsConnection, data: []const u8) !void {
     var pos: usize = 0;
-    var changed_to_blocking: bool = false;
     defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 });
 
-    defer if (changed_to_blocking) {
-        // We had to change our socket to blocking me to get our write out
-        // We need to change it back to non-blocking.
-        _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
-            log.err(.app, "ws restore nonblocking", .{ .err = err });
-        };
-    };
-
     LOOP: while (pos < data.len) {
         const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) {
             error.WouldBlock => {
-                // self.socket is nonblocking, because we don't want to block
-                // reads. But our life is a lot easier if we block writes,
-                // largely, because we don't have to maintain a queue of pending
-                // writes (which would each need their own allocations). So
-                // if we get a WouldBlock error, we'll switch the socket to
-                // blocking and switch it back to non-blocking after the write
-                // is complete. Doesn't seem particularly efficiently, but
-                // this should virtually never happen.
-                assert(changed_to_blocking == false, "WsConnection.double block", .{});
-                changed_to_blocking = true;
-                _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
+                // The peer's recv buffer is full. Naively switching to a
+                // blocking write would deadlock when the peer is _also_
+                // blocked trying to send to us — a real scenario under CDP
+                // Fetch.enable + http-proxy, where lightpanda emits a flood
+                // of Fetch.requestPaused events while puppeteer is trying
+                // to send us back the matching Fetch.continueRequest
+                // messages. Each side fills the other's TCP send buffer,
+                // and a blocking write here never returns. See
+                // lightpanda-io/browser#2462.
+                //
+                // Instead, poll for POLLOUT _while also_ draining any
+                // POLLIN data into the reader buffer (without processing —
+                // that would re-enter the dispatch path and call back into
+                // send, risking unbounded recursion). The buffered bytes
+                // are picked up by the next CDP.readSocket call.
+                try self.waitWritableDrainingReads();
                 continue :LOOP;
             },
             else => return err,
@@ -390,6 +386,83 @@ pub fn send(self: *WsConnection, data: []const u8) !void {
     }
 }
 
+// Block until the socket is writable. Concurrently drains any incoming
+// bytes into the reader buffer (without processing them) so the peer
+// keeps making progress reading our writes. Used to break the two-way
+// backpressure deadlock described in `send`.
+fn waitWritableDrainingReads(self: *WsConnection) !void {
+    // Bounded so a truly dead peer eventually surfaces (TCP keepalive will
+    // close the socket; this just bounds any single send).
+    const POLL_TIMEOUT_MS: i32 = 30_000;
+
+    // We may have to stop watching POLLIN if the reader buffer is at its
+    // cap and we can't make any more space (otherwise poll would return
+    // immediately every iteration with POLLIN set, busy-spinning).
+    var watch_in = true;
+
+    while (true) {
+        const events_in: i16 = if (watch_in) @as(i16, @intCast(posix.POLL.IN)) else 0;
+        var pfds = [_]posix.pollfd{.{
+            .fd = self.socket,
+            .events = @as(i16, @intCast(posix.POLL.OUT)) | events_in,
+            .revents = 0,
+        }};
+        const n = try posix.poll(&pfds, POLL_TIMEOUT_MS);
+        if (n == 0) return error.WouldBlock;
+
+        const revents = pfds[0].revents;
+        if (revents & @as(i16, @intCast(posix.POLL.NVAL | posix.POLL.ERR | posix.POLL.HUP)) != 0) {
+            return error.Closed;
+        }
+
+        if (watch_in and revents & @as(i16, @intCast(posix.POLL.IN)) != 0) {
+            // Drain whatever's available without processing. May grow the
+            // buffer up to CDP_MAX_MESSAGE_SIZE.
+            const before = self.bufferedBytes();
+            try self.drainAvailable();
+            if (self.bufferedBytes() == before and self.reader.readBuf().len == 0) {
+                // Buffer is at cap and we couldn't read anything more.
+                // Stop watching POLLIN so we don't busy-spin while we wait
+                // for POLLOUT.
+                watch_in = false;
+            }
+        }
+
+        if (revents & @as(i16, @intCast(posix.POLL.OUT)) != 0) {
+            return;
+        }
+    }
+}
+
+// Read everything currently available on the socket into the reader
+// buffer (non-blocking). Grows the buffer if needed (capped at
+// CDP_MAX_MESSAGE_SIZE so a misbehaving peer can't OOM us). Does not
+// process messages — the caller will handle that on the next pass
+// through CDP.readSocket.
+fn drainAvailable(self: *WsConnection) !void {
+    while (true) {
+        var buf = self.reader.readBuf();
+        if (buf.len == 0) {
+            const current = self.reader.buf.len;
+            if (current >= CDP_MAX_MESSAGE_SIZE) {
+                // Already at the cap; refuse to grow further. Stop draining
+                // — the next POLLOUT-then-write attempt will block again,
+                // but at least the peer's reads will eventually free our
+                // send buffer and let us proceed.
+                return;
+            }
+            self.reader.buf = try growBuffer(self.reader.allocator, self.reader.buf, current + 1);
+            buf = self.reader.readBuf();
+        }
+        const n = posix.read(self.socket, buf) catch |err| switch (err) {
+            error.WouldBlock => return,
+            else => return err,
+        };
+        if (n == 0) return error.Closed; // peer closed
+        self.reader.len += n;
+    }
+}
+
 const EMPTY_PONG = [_]u8{ 138, 0 };
 
 fn sendPong(self: *WsConnection, data: []const u8) !void {
@@ -477,6 +550,37 @@ pub fn read(self: *WsConnection) !usize {
     return n;
 }
 
+pub const ReadResult = enum { data, no_new_data, closed };
+
+// Variant of `read` that distinguishes "no new bytes on the wire" from
+// "peer closed". Used by CDP.readSocket so it can still drain buffered
+// messages (those rescued during a backpressured send) without bailing
+// when posix.read reports EWOULDBLOCK.
+pub fn tryRead(self: *WsConnection) !ReadResult {
+    const buf = self.reader.readBuf();
+    if (buf.len == 0) {
+        // Reader buffer is completely full of unprocessed messages. Don't
+        // call posix.read with a zero-length buffer (returns 0 → looks like
+        // EOF). Caller should processMessages first.
+        return if (self.bufferedBytes() > 0) .no_new_data else .closed;
+    }
+    const n = posix.read(self.socket, buf) catch |err| switch (err) {
+        error.WouldBlock => return .no_new_data,
+        else => return err,
+    };
+    if (n == 0) return .closed;
+    self.reader.len += n;
+    return .data;
+}
+
+// Number of bytes sitting in the reader buffer that haven't been parsed
+// into messages yet. Used by HttpClient.perform to detect that data was
+// rescued from the socket during a backpressured send and still needs to
+// be dispatched.
+pub fn bufferedBytes(self: *const WsConnection) usize {
+    return self.reader.len - self.reader.pos;
+}
+
 fn processHttpRequest(self: *WsConnection) !HttpResult {
     assert(self.reader.pos == 0, "WsConnection.HTTP pos", .{ .pos = self.reader.pos });
     const request = self.reader.buf[0..self.reader.len];
diff --git a/src/network/http.zig b/src/network/http.zig
index e97e779f..21a5b6f5 100644
--- a/src/network/http.zig
+++ b/src/network/http.zig
@@ -83,8 +83,17 @@ pub const Headers = struct {
         return .{ .headers = updated_headers };
     }
 
-    pub fn deinit(self: *const Headers) void {
+    pub fn deinit(self: *Headers) void {
+        // Null out before freeing to make deinit idempotent. Headers is value-
+        // copied across structs (Transfer holds it inside a Request, and
+        // some layers / cleanup paths produce a second deinit on the same
+        // slist). Without this, those paths double-free the curl_slist
+        // chain and the ZigToCurlAllocator hits an unaligned-block error
+        // inside curl_slist_free_all on the second pass. RobotsLayer
+        // documented the value-copy hazard for its own path; this guard
+        // is the catch-all so other code paths don't have to.
         if (self.headers) |hdr| {
+            self.headers = null;
             libcurl.curl_slist_free_all(hdr);
         }
     }