Main/Network reads CDP socket

Previously, the CDP socket was added to the worker's multi and fully owned
by the worker. While this is simple, it introduced some issues:

1 - Cannot detect a disconnected client during JS processing ( for(;;) )

2 - A blocked worker can cause back-pressure that blocks the client. This can
    cause a deadlock if the worker is blocked waiting for a CDP message

In addition to these 2 problems, there was 1 other serious CDP-related issue:
arbitrary CDP messages could be processed during a JavaScript callback. For
example, a Worker calls importScripts while request interception is enabled;
this requires us to tick the HttpClient while waiting for the interception
response. But a client could send Target.closeTarget, which we'd process and
delete the frame... all while importScripts is still blocked. Assuming
importScripts eventually unblocks, everything after that is a big UAF, since
the frame (and its workers) were cleared by closeTarget.

The CDP socket is now read from the network (main) thread and an OTP-style
mailbox is used. The network thread posts messages to the Worker's inbox and
signals it to wake up. This solves #1 and #2. It doesn't directly solve the
reentrancy issue, but it provides the foundation. Specifically, it introduces
a queue of CDP messages and more control over when/how that queue is
processed. At "safe points" (Runner.tick, HttpClient.tick), any message can
be processed. But, when inside a JavaScript callback, we can process only
non-destructive/mutating messages. Specifically, we can process only messages
related to request interception.
This commit is contained in:
Karl Seguin
2026-05-19 13:58:48 +08:00
parent 8ef6084fdb
commit 875c147783
13 changed files with 1207 additions and 307 deletions

322
src/Inbox.zig Normal file
View File

@@ -0,0 +1,322 @@
// Copyright (C) 2023-2026 Lightpanda (Selecy SAS)
//
// Francis Bouvier <francis@lightpanda.io>
// Pierre Tachoire <pierre@lightpanda.io>
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
// Thread-safe FIFO of Messages. Producer pushes from one thread,
// consumer pops from another. No wake mechanism is bundled — callers
// arrange that themselves (e.g. curl_multi_wakeup on the consumer's
// curl multi handle).
//
// Backed by a DoublyLinkedList so that pop is O(1) and the
// allowlist-during-sync-wait drain can cherry-pick messages out of
// the middle in O(1) given a node pointer.
const std = @import("std");
const CDP = @import("cdp/CDP.zig");
const ArenaPool = @import("ArenaPool.zig");
const Allocator = std.mem.Allocator;
const DoublyLinkedList = std.DoublyLinkedList;
const Inbox = @This();
mutex: std.Thread.Mutex = .{},
queue: DoublyLinkedList = .{},
// Empties the queue, releasing each remaining message back to
// `arena_pool`. The mutex is held for the whole sweep.
pub fn deinit(self: *Inbox, arena_pool: *ArenaPool) void {
    self.mutex.lock();
    defer self.mutex.unlock();

    var pending = self.queue.popFirst();
    while (pending) |link| : (pending = self.queue.popFirst()) {
        // `link` is the `node` field embedded in a Message; recover the
        // owning Message from it.
        const leftover: *Message = @fieldParentPtr("node", link);
        leftover.deinit(arena_pool);
    }
}
// Appends a new message carrying `payload` to the tail of the queue.
// The Message is allocated out of `arena` and remembers that arena so
// Message.deinit can later release it. Panics on allocation failure.
// Does not wake the consumer.
pub fn push(self: *Inbox, arena: Allocator, payload: Message.Payload) void {
    const slot = arena.create(Message) catch |err| switch (err) {
        error.OutOfMemory => @panic("OOM"),
    };
    slot.* = .{ .arena = arena, .payload = payload };

    // Only the list mutation needs the lock; the message is fully
    // initialized before it becomes visible to the consumer.
    self.mutex.lock();
    defer self.mutex.unlock();
    self.queue.append(&slot.node);
}
// Removes and returns the oldest queued message, or null when the
// queue is empty. The caller is responsible for calling deinit on the
// returned message.
pub fn pop(self: *Inbox) ?*Message {
    self.mutex.lock();
    defer self.mutex.unlock();

    if (self.queue.popFirst()) |head| {
        const msg: *Message = @fieldParentPtr("node", head);
        return msg;
    }
    return null;
}
// Removes and returns the first message, scanning from the head, that
// `predicate` accepts. Messages the predicate rejects are left where
// they are, so their relative order is unchanged. Returns null if no
// message matches (including when the queue is empty).
//
// This is what lets sync-wait paths dispatch only the allowlisted
// subset of messages while the unsafe ones wait for the next safe
// point. Note that `predicate` is invoked while the inbox mutex is
// held.
pub fn popIf(self: *Inbox, predicate: *const fn (*Message) bool) ?*Message {
    self.mutex.lock();
    defer self.mutex.unlock();

    var cursor = self.queue.first;
    while (cursor) |link| {
        const candidate: *Message = @fieldParentPtr("node", link);
        if (!predicate(candidate)) {
            cursor = link.next;
            continue;
        }
        self.queue.remove(link);
        return candidate;
    }
    return null;
}
// One entry in the Inbox queue.
pub const Message = struct {
    // Released back to the pool by deinit. For messages created by
    // Inbox.push this is also the arena the Message itself was
    // allocated from.
    arena: Allocator,
    payload: Payload,
    // Intrusive list link; Inbox recovers the Message from it with
    // @fieldParentPtr.
    node: DoublyLinkedList.Node = .{},

    // A parsed CDP frame: the raw bytes plus the parsed view over them.
    pub const Cdp = struct {
        raw: []u8,
        input: CDP.InputMessage,
    };

    pub const Payload = union(enum) {
        // A CDP text/binary frame, parsed on the Network thread.
        // `raw` is the original JSON bytes (owned). The message's
        // `arena` holds any auxiliary allocations made by
        // parseFromSliceLeaky (typically none for unescaped messages,
        // but slices in `input` may reference it). `input` is the
        // parsed view; its string slices point into `raw` or the
        // arena, so both must outlive the consumer's use of `input`.
        cdp: Cdp,

        // Body of a WS ping frame (≤125 bytes per spec). The consumer
        // is expected to answer with a pong from its own thread.
        ping: []u8,

        // Peer-initiated close frame. The consumer is expected to
        // send a close reply and tear the connection down. The peer's
        // close body is dropped — historically we always reply
        // CLOSE_NORMAL (status 1000) regardless of what the peer sent.
        close: void,

        // Carries no allocation; signals "no more messages will
        // arrive on this inbox", with an optional reason. The Network
        // thread pushes this on peer EOF, a fatal WS framing error,
        // or (now) a JSON parse failure.
        disconnect: ?anyerror,
    };

    // Hands the message's arena back to `pool`.
    pub fn deinit(self: *const Message, pool: *ArenaPool) void {
        const owner = self.arena;
        pool.release(owner);
    }
};
const testing = @import("testing.zig");
test "Inbox: push then pop returns FIFO order" {
    const arena_pool = &testing.test_app.arena_pool;
    var inbox = Inbox{};
    defer inbox.deinit(arena_pool);

    // Queue two pings followed by a disconnect, each in its own arena.
    inline for (.{ "first", "second" }) |body| {
        const arena = try arena_pool.acquire(.tiny, "inbox test");
        inbox.push(arena, .{ .ping = try arena.dupe(u8, body) });
    }
    {
        const arena = try arena_pool.acquire(.tiny, "inbox test");
        inbox.push(arena, .{ .disconnect = null });
    }

    // They must come back out in exactly the order they went in.
    inline for (.{ "first", "second" }) |expected| {
        const msg = inbox.pop().?;
        defer msg.deinit(arena_pool);
        try testing.expectEqual(expected, msg.payload.ping);
    }
    {
        const msg = inbox.pop().?;
        defer msg.deinit(arena_pool);
        try testing.expectEqual(@as(?anyerror, null), msg.payload.disconnect);
    }

    try testing.expect(inbox.pop() == null);
}
test "Inbox: deinit frees remaining items" {
    const arena_pool = &testing.test_app.arena_pool;
    var inbox = Inbox{};

    const ping_arena = try arena_pool.acquire(.tiny, "inbox test");
    inbox.push(ping_arena, .{ .ping = try ping_arena.dupe(u8, "leftover") });

    const disconnect_arena = try arena_pool.acquire(.tiny, "inbox test");
    inbox.push(disconnect_arena, .{ .disconnect = error.PeerClosed });

    // Nothing was popped, so deinit alone has to release both messages.
    // Memory leaks would be caught by the test runner.
    inbox.deinit(arena_pool);
}
// popIf predicate that accepts every message.
fn testAlwaysTrue(_: *Message) bool {
    return true;
}

// popIf predicate that rejects every message.
fn testAlwaysFalse(_: *Message) bool {
    return false;
}

// popIf predicate that accepts only ping messages.
fn testIsPing(msg: *Message) bool {
    return switch (msg.payload) {
        .ping => true,
        else => false,
    };
}
test "Inbox: popIf on empty queue returns null" {
    var inbox = Inbox{};
    defer inbox.deinit(&testing.test_app.arena_pool);

    // Even a predicate that accepts everything has nothing to pick.
    const picked = inbox.popIf(testAlwaysTrue);
    try testing.expect(picked == null);
}
test "Inbox: popIf with no match leaves queue intact" {
    const arena_pool = &testing.test_app.arena_pool;
    var inbox = Inbox{};
    defer inbox.deinit(arena_pool);

    inline for (.{ "first", "second" }) |body| {
        const arena = try arena_pool.acquire(.tiny, "popif test");
        inbox.push(arena, .{ .ping = try arena.dupe(u8, body) });
    }

    // The predicate rejects everything, so nothing may be removed.
    const picked = inbox.popIf(testAlwaysFalse);
    try testing.expect(picked == null);

    // Both messages are still queued, in the order they were pushed.
    inline for (.{ "first", "second" }) |expected| {
        const msg = inbox.pop().?;
        defer msg.deinit(arena_pool);
        try testing.expectEqual(expected, msg.payload.ping);
    }
    try testing.expect(inbox.pop() == null);
}
test "Inbox: popIf with always-true predicate behaves like pop" {
    const arena_pool = &testing.test_app.arena_pool;
    var inbox = Inbox{};
    defer inbox.deinit(arena_pool);

    inline for (.{ "a", "b" }) |body| {
        const arena = try arena_pool.acquire(.tiny, "popif test");
        inbox.push(arena, .{ .ping = try arena.dupe(u8, body) });
    }

    // An accept-all predicate always takes the head, i.e. plain FIFO.
    inline for (.{ "a", "b" }) |expected| {
        const msg = inbox.popIf(testAlwaysTrue).?;
        defer msg.deinit(arena_pool);
        try testing.expectEqual(expected, msg.payload.ping);
    }
    try testing.expect(inbox.popIf(testAlwaysTrue) == null);
}
test "Inbox: popIf cherry-picks middle, preserves order of remainder" {
    const arena_pool = &testing.test_app.arena_pool;
    var inbox = Inbox{};
    defer inbox.deinit(arena_pool);

    // Queue layout: disconnect(null), ping("middle"), disconnect(PeerClosed).
    const head_arena = try arena_pool.acquire(.tiny, "popif test");
    inbox.push(head_arena, .{ .disconnect = null });

    const middle_arena = try arena_pool.acquire(.tiny, "popif test");
    inbox.push(middle_arena, .{ .ping = try middle_arena.dupe(u8, "middle") });

    const tail_arena = try arena_pool.acquire(.tiny, "popif test");
    inbox.push(tail_arena, .{ .disconnect = error.PeerClosed });

    // testIsPing steps over the disconnect at the head and takes the ping.
    {
        const picked = inbox.popIf(testIsPing).?;
        defer picked.deinit(arena_pool);
        try testing.expectEqual("middle", picked.payload.ping);
    }

    // The two disconnects that were skipped come out in their original order.
    {
        const first_left = inbox.pop().?;
        defer first_left.deinit(arena_pool);
        try testing.expect(first_left.payload.disconnect == null);
    }
    {
        const second_left = inbox.pop().?;
        defer second_left.deinit(arena_pool);
        try testing.expect(second_left.payload.disconnect.? == error.PeerClosed);
    }
    try testing.expect(inbox.pop() == null);
}
test "Inbox: popIf picks first match in FIFO order" {
    const arena_pool = &testing.test_app.arena_pool;
    var inbox = Inbox{};
    defer inbox.deinit(arena_pool);

    // Queue layout: ping("first"), disconnect(null), ping("second").
    const first_arena = try arena_pool.acquire(.tiny, "popif test");
    inbox.push(first_arena, .{ .ping = try first_arena.dupe(u8, "first") });

    const disconnect_arena = try arena_pool.acquire(.tiny, "popif test");
    inbox.push(disconnect_arena, .{ .disconnect = null });

    const second_arena = try arena_pool.acquire(.tiny, "popif test");
    inbox.push(second_arena, .{ .ping = try second_arena.dupe(u8, "second") });

    // Two pings match; the one nearest the head must win.
    const picked = inbox.popIf(testIsPing).?;
    defer picked.deinit(arena_pool);
    try testing.expectEqual("first", picked.payload.ping);
}

View File

@@ -72,7 +72,9 @@ pub fn shutdown(self: *Server) void {
for (self.cdps.items) |cdp| {
if (cdp.conn.state == .live) {
cdp.browser.env.terminate();
cdp.conn.sendClose();
// We used to send a nice WS close frame here but (a) it isn't
// strictly required and (b) we'd have to protect against an interleaved
// write from the worker thread.
}
cdp.conn.shutdown();
}
@@ -166,13 +168,16 @@ fn handleConnection(self: *Server, socket: posix.socket_t) void {
defer _ = self.active_threads.fetchSub(1, .monotonic);
defer posix.close(socket);
// CDP is HUGE (> 512KB) because Connection has a large read buffer.
// V8 crashes if this is on the stack (likely related to its size).
const cdp = self.allocCDP() catch |err| {
log.err(.app, "CDP alloc", .{ .err = err });
return;
const cdp = blk: {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
break :blk self.cdp_pool.create() catch @panic("OOM");
};
defer self.releaseCDP(cdp);
defer {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
self.cdp_pool.destroy(cdp);
}
cdp.init(self.app, socket, self.json_version_response) catch |err| {
log.err(.app, "CDP init", .{ .err = err });
@@ -185,18 +190,63 @@ fn handleConnection(self: *Server, socket: posix.socket_t) void {
log.info(.app, "client connected", .{ .ip = client_address });
}
self.registerCDP(cdp);
defer self.unregisterCDP(cdp);
{
// track the connection
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
self.cdps.append(self.app.allocator, cdp) catch {};
}
defer {
// untrack the connection
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
for (self.cdps.items, 0..) |c, i| {
if (c == cdp) {
_ = self.cdps.swapRemove(i);
break;
}
}
}
const upgraded = cdp.conn.handshake() catch |err| {
log.err(.app, "CDP handshake", .{ .err = err });
return;
};
if (!upgraded) {
return;
}
self.markLive(cdp);
{
// Transition from .handshake state to .live
// Lock needed even though the main thread hasn't seen this yet because
// shutdown could access this from the sighandler thread.
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
cdp.conn.state = .live;
}
// Hand the read side of the CDP socket over to the Network thread.
// From here until the matching unregisterCdp, the worker must NOT
// read from the socket directly — bytes arrive via the inbox.
// unregisterCdp is synchronous, so by the time it returns Network
// is guaranteed to be done with this link.
//
// cdp_link_active gates HttpClient.perform's block in
// curl_multi_poll: with it false (tests, pre-handshake), perform
// skips the poll when there's no in-flight curl work — sleeping
// would just eat the timeout waiting for a wakeup that won't
// come. We set it true *after* registerCdp so Network is already
// accepting wakeups by the time the worker might poll, and clear
// it *after* unregisterCdp returns (Network is guaranteed done
// with us by then).
self.app.network.registerCdp(&cdp.link);
cdp.browser.http_client.cdp_link_active = true;
defer {
self.app.network.unregisterCdp(&cdp.link);
cdp.browser.http_client.cdp_link_active = false;
}
// Check shutdown after markLive so that a concurrent shutdown either
// sees us as .live and terminates us, or we observe the stop signal
@@ -214,44 +264,6 @@ fn handleConnection(self: *Server, socket: posix.socket_t) void {
}
}
fn allocCDP(self: *Server) !*CDP {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
return self.cdp_pool.create();
}
fn releaseCDP(self: *Server, cdp: *CDP) void {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
self.cdp_pool.destroy(cdp);
}
fn registerCDP(self: *Server, cdp: *CDP) void {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
self.cdps.append(self.app.allocator, cdp) catch {};
}
fn unregisterCDP(self: *Server, cdp: *CDP) void {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
for (self.cdps.items, 0..) |c, i| {
if (c == cdp) {
_ = self.cdps.swapRemove(i);
break;
}
}
}
fn markLive(self: *Server, cdp: *CDP) void {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
cdp.conn.state = .live;
}
// Utils
// --------
fn buildJSONVersionResponse(app: *const App, port: u16) ![]const u8 {
const host = app.config.advertiseHost();
if (std.mem.eql(u8, host, "0.0.0.0")) {
@@ -283,9 +295,6 @@ fn buildJSONVersionResponse(app: *const App, port: u16) ![]const u8 {
return try std.fmt.allocPrint(app.allocator, response_format, .{ body_len, host, port });
}
pub const timestamp = @import("datetime.zig").timestamp;
pub const milliTimestamp = @import("datetime.zig").milliTimestamp;
const testing = @import("testing.zig");
test "server: buildJSONVersionResponse" {
const res = try buildJSONVersionResponse(testing.test_app, testing.test_app.config.port());
@@ -384,6 +393,9 @@ test "Client: http valid handshake" {
}
test "Client: read invalid websocket message" {
const filter: testing.LogFilter = .init(&.{.cdp});
defer filter.deinit();
// 131 = 128 (fin) | 3 where 3 isn't a valid type
try assertWebSocketError(
1002,

View File

@@ -23,6 +23,7 @@ const Allocator = std.mem.Allocator;
const js = @import("js/js.zig");
const App = @import("../App.zig");
const HttpClient = @import("HttpClient.zig");
const CDP = @import("../cdp/CDP.zig");
const ArenaPool = App.ArenaPool;
@@ -69,7 +70,7 @@ pub fn nextFrameId(self: *Browser) u32 {
return id;
}
pub fn init(self: *Browser, app: *App, opts: InitOpts, cdp_client: ?HttpClient.CDPClient) !void {
pub fn init(self: *Browser, app: *App, opts: InitOpts, cdp: ?*CDP) !void {
const allocator = app.allocator;
var env = try js.Env.init(app, opts.env);
@@ -84,7 +85,7 @@ pub fn init(self: *Browser, app: *App, opts: InitOpts, cdp_client: ?HttpClient.C
.http_client = undefined,
.page_pool = std.heap.MemoryPool(Page).init(allocator),
};
try self.http_client.init(allocator, &app.network, cdp_client);
try self.http_client.init(allocator, &app.network, cdp);
}
pub fn deinit(self: *Browser) void {

View File

@@ -31,6 +31,8 @@ const http = @import("../network/http.zig");
const Robots = @import("../network/Robots.zig");
const Network = @import("../network/Network.zig");
const CDP = @import("../cdp/CDP.zig");
const Inbox = @import("../Inbox.zig");
const CachedResponse = @import("../network/cache/Cache.zig").CachedResponse;
pub const CacheLayer = @import("../network/layer/CacheLayer.zig");
@@ -127,7 +129,29 @@ obey_robots: bool,
user_agent_override: ?[:0]const u8 = null,
user_agent_header_override: ?[:0]const u8 = null,
cdp_client: ?CDPClient = null,
// The CDP layer we dispatch inbox messages to. Set in CDP.init for
// `serve` mode; null in all other modes. Since this is set early, BEFORE the
// CDP socket is registered with the network thread, we also have the
// `cdp_link_active` boolean.
cdp: ?*CDP = null,
// True iff a producer (Server.handleConnection, after the worker
// handshake completes) has registered the CDP socket with the Network
// thread and Network will fire curl_multi_wakeup on our multi handle
// when it pushes to the inbox. perform uses this — NOT `cdp != null`
// — to decide whether to block in poll without any in-flight curl
// work. cdp is set in CDP.init, well before the link is wired; tests
// and the pre-handshake window have a cdp but no producer, so polling
// there would just eat the timeout waiting for a wakeup that's never
// coming.
cdp_link_active: bool = false,
// CDP messages parsed off the WS socket by the Network thread land
// here. perform drains the inbox at each safe point and dispatches
// via cdp.onMessage / onPing / onClose / onDisconnect. Always present
// even in non-CDP mode — the empty-queue drain is one mutex lock plus
// a linked-list head check, cheaper than nullability everywhere.
inbox: Inbox,
max_response_size: usize,
@@ -155,25 +179,7 @@ fn layerWith(self: anytype, next: Layer) Layer {
return self.layer();
}
// libcurl can monitor arbitrary sockets, this lets us use libcurl to poll
// both HTTP data as well as messages from an CDP connection.
// Furthermore, we have some tension between blocking scripts and request
// interception. For non-blocking scripts, because nothing blocks, we can
// just queue the scripts until we receive a response to the interception
// notification. But for blocking scripts (which block the parser), it's hard
// to return control back to the CDP loop. So the `read` function pointer is
// used by the Client to have the CDP client read more data from the socket,
// specifically when we're waiting for a request interception response to
// a blocking script.
pub const CDPClient = struct {
ctx: *anyopaque,
socket: posix.socket_t,
blocking_read_start: *const fn (*anyopaque) bool,
blocking_read: *const fn (*anyopaque) bool,
blocking_read_end: *const fn (*anyopaque) bool,
};
pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client: ?CDPClient) !void {
pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP) !void {
var handles = try http.Handles.init(network.config);
errdefer handles.deinit();
@@ -183,7 +189,8 @@ pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client:
.handles = handles,
.network = network,
.allocator = allocator,
.cdp_client = cdp_client,
.cdp = cdp,
.inbox = .{},
.use_proxy = http_proxy != null,
.http_proxy = http_proxy,
@@ -228,6 +235,7 @@ pub fn deinit(self: *Client) void {
self.robots_layer.deinit(self.allocator);
self.transfers.deinit(self.allocator);
self.inbox.deinit(self.arena_pool);
}
// Look up a live transfer by its id. Returns null if the transfer has been
@@ -379,14 +387,29 @@ pub fn abortRequests(_: *Client, owner: *Owner) void {
// owner itself is freed, no orphan transfer points at it.
}
pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus {
// Selects which CDP messages drainInbox is allowed to dispatch on a
// given tick.
const DrainMode = enum {
    // Outer event loop (Runner.tick). The JS stack is empty, so it is
    // safe to dispatch everything.
    all,

    // Reachable from inside a JS callback (syncRequest,
    // waitForImport). The JS callstack above us holds refs to page /
    // session / V8 state; dispatching a command that frees that state
    // would UAF on unwind. Only Fetch interception responses are
    // cherry-picked.
    sync_wait,
};
pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void {
try self.drainQueue();
const status = try self.perform(@intCast(timeout_ms));
try self.perform(@intCast(timeout_ms));
// perform/processMessages just released a batch of connections back to
// the pool. Drain again so queued transfers can use them this tick
// instead of waiting for the next runner iteration.
try self.drainQueue();
return status;
// Dispatch CDP messages here, not inside perform: perform recurses
// via processOneMessage's redirect path (perform → processMessages
// → processOneMessage → perform), and dispatching CDP from that
// nested call would fire CDP handlers mid-redirect, defeating the
// "safe points only" guarantee.
try self.drainInbox(mode);
}
fn drainQueue(self: *Client) !void {
@@ -526,17 +549,7 @@ pub fn syncRequest(self: *Client, allocator: Allocator, req: Request) !SyncRespo
try self.request(r, null);
while (sync_ctx.completion == .in_progress) {
const status = try self.tick(200);
log.debug(.http, "sync request tick", .{ .status = status });
switch (status) {
.cdp_socket => {
const cdp = self.cdp_client.?;
if (cdp.blocking_read(cdp.ctx) == false) {
return error.ClientDisconnected;
}
},
.normal => continue,
}
try self.tick(200, .sync_wait);
}
switch (sync_ctx.completion) {
@@ -631,12 +644,7 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer
_ = try self.handles.perform();
}
pub const PerformStatus = enum {
cdp_socket,
normal,
};
fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {
fn perform(self: *Client, timeout_ms: c_int) anyerror!void {
const running = blk: {
self.performing = true;
defer self.performing = false;
@@ -659,29 +667,82 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {
try self.trackConn(conn);
}
// We're potentially going to block for a while until we get data. Process
// whatever messages we have waiting ahead of time.
if (try self.processMessages()) {
return .normal;
}
// Process completions queued from the curl_multi_perform above before
// we potentially block.
_ = try self.processMessages();
var status = PerformStatus.normal;
if (self.cdp_client) |cdp_client| {
var wait_fds = [_]http.WaitFd{.{
.fd = cdp_client.socket,
.events = .{ .pollin = true },
.revents = .{},
}};
try self.handles.poll(&wait_fds, timeout_ms);
if (wait_fds[0].revents.pollin or wait_fds[0].revents.pollpri or wait_fds[0].revents.pollout) {
status = .cdp_socket;
}
} else if (running > 0) {
// Poll for HTTP I/O. The Network thread will call curl_multi_wakeup
// on our multi handle whenever it pushes to our inbox, so we drop
// out of poll promptly even when we have no curl handles in flight
// — but ONLY if a producer is actually wired up. `cdp_link_active`
// is set by Server.handleConnection once network.registerCdp has
// returned; in tests (which never register) and during the
// pre-handshake window the flag stays false and we don't waste a
// poll timeout waiting for a wakeup that won't arrive.
if (running > 0 or self.cdp_link_active) {
// when cdp_link_active == true, the network thread will unblock this
// by calling wakeup on our multi.
try self.handles.poll(&.{}, timeout_ms);
}
_ = try self.processMessages();
return status;
}
// Dispatches messages the Network thread queued in our inbox to the
// CDP layer (`self.cdp`). With `.all`, messages are taken in FIFO
// order; with `.sync_wait`, only messages accepted by
// allowDuringSyncWait are taken and the rest stay queued. Does nothing
// when no CDP layer is attached.
//
// A close or disconnect message ends the drain with
// error.ClientDisconnected so the worker loop can tear down the
// connection. Called from tick only — NOT from perform, because
// perform recurses through processOneMessage's redirect path.
fn drainInbox(self: *Client, mode: DrainMode) !void {
    const cdp = self.cdp orelse return;

    while (true) {
        const next: ?*Inbox.Message = switch (mode) {
            .all => self.inbox.pop(),
            .sync_wait => self.inbox.popIf(allowDuringSyncWait),
        };
        const msg = next orelse return;
        // Release the message once this iteration is done with it,
        // including on the early error returns below.
        defer msg.deinit(self.arena_pool);

        switch (msg.payload) {
            .cdp => |*request| {
                cdp.onMessage(request) catch |err| {
                    // A single malformed/failed dispatch shouldn't
                    // poison the rest of the batch — log and continue.
                    log.warn(.cdp, "CDP dispatch", .{ .err = err });
                };
            },
            .ping => |body| cdp.onPing(body),
            .close => {
                cdp.onClose();
                cdp.onDisconnect(null);
                return error.ClientDisconnected;
            },
            .disconnect => |reason| {
                cdp.onDisconnect(reason);
                return error.ClientDisconnected;
            },
        }
    }
}
// Inbox.popIf predicate used for sync_wait drains. Control messages
// (ping / close / disconnect) are always let through, since they must
// be observed. A CDP data message is let through only if its parsed
// `method` is one of the four Fetch interception methods: those are
// safe to dispatch from inside a JS callback because they mutate
// transfer state via InterceptionLayer and don't touch page / session
// / V8 state. The comparison is exact on the parsed `method` field —
// no substring matching against raw JSON.
fn allowDuringSyncWait(msg: *Inbox.Message) bool {
    switch (msg.payload) {
        .cdp => |frame| return isFetchInterceptionMethod(frame.input.method),
        .ping, .close, .disconnect => return true,
    }
}
// Reports whether `method` is exactly one of the four CDP Fetch
// interception methods. The match is byte-for-byte: prefixes, trailing
// whitespace and other Fetch.* methods are all rejected.
fn isFetchInterceptionMethod(method: []const u8) bool {
    const interception_methods = [_][]const u8{
        "Fetch.continueRequest",
        "Fetch.failRequest",
        "Fetch.fulfillRequest",
        "Fetch.continueWithAuth",
    };
    for (interception_methods) |name| {
        if (std.mem.eql(u8, method, name)) return true;
    }
    return false;
}
fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *Transfer) !bool {
@@ -1670,3 +1731,92 @@ pub const Owner = struct {
self.websockets.remove(&ws._owner_node);
}
};
const testing = @import("../testing.zig");
test "HttpClient: isFetchInterceptionMethod matches the four Fetch methods" {
    inline for (.{
        "Fetch.continueRequest",
        "Fetch.failRequest",
        "Fetch.fulfillRequest",
        "Fetch.continueWithAuth",
    }) |name| {
        try testing.expect(isFetchInterceptionMethod(name));
    }
}
test "HttpClient: isFetchInterceptionMethod rejects unrelated methods" {
    inline for (.{
        "",
        "Fetch.enable",
        "Fetch.disable",
        "Page.navigate",
        "Runtime.evaluate",
        // strict-equality check: a prefix of a valid name must not match
        "Fetch.continueReq",
        // trailing space, etc.
        "Fetch.continueRequest ",
    }) |name| {
        try testing.expect(!isFetchInterceptionMethod(name));
    }
}
test "HttpClient: allowDuringSyncWait allows ping/close/disconnect" {
    // Every control payload, including both flavours of disconnect,
    // must pass the sync-wait filter.
    const control_payloads = [_]Inbox.Message.Payload{
        .{ .ping = "" },
        .close,
        .{ .disconnect = null },
        .{ .disconnect = error.PeerClosed },
    };
    for (control_payloads) |payload| {
        var msg = Inbox.Message{
            .arena = testing.allocator,
            .payload = payload,
        };
        try testing.expect(allowDuringSyncWait(&msg));
    }
}
test "HttpClient: allowDuringSyncWait allows only Fetch interception CDP methods" {
    // The predicate only looks at the parsed method, so the raw bytes
    // can stay uninitialized.
    var raw_buf: [16]u8 = undefined;
    const allowed_methods = [_][]const u8{
        "Fetch.continueRequest",
        "Fetch.failRequest",
        "Fetch.fulfillRequest",
        "Fetch.continueWithAuth",
    };
    inline for (allowed_methods) |method| {
        const frame: Inbox.Message.Cdp = .{
            .raw = &raw_buf,
            .input = .{ .method = method },
        };
        var msg = Inbox.Message{
            .arena = testing.allocator,
            .payload = .{ .cdp = frame },
        };
        try testing.expect(allowDuringSyncWait(&msg));
    }
}
test "HttpClient: allowDuringSyncWait denies non-Fetch CDP methods" {
    // The predicate only looks at the parsed method, so the raw bytes
    // can stay uninitialized.
    var raw_buf: [16]u8 = undefined;
    const denied_methods = [_][]const u8{
        "Page.navigate",
        "Runtime.evaluate",
        "Target.createTarget",
        "Fetch.enable",
        "Fetch.disable",
        "",
    };
    inline for (denied_methods) |method| {
        const frame: Inbox.Message.Cdp = .{
            .raw = &raw_buf,
            .input = .{ .method = method },
        };
        var msg = Inbox.Message{
            .arena = testing.allocator,
            .payload = .{ .cdp = frame },
        };
        try testing.expect(!allowDuringSyncWait(&msg));
    }
}

View File

@@ -53,19 +53,18 @@ pub const WaitOpts = struct {
ms: u32,
until: lp.Config.WaitUntil = .done,
};
pub fn wait(self: *Runner, opts: WaitOpts) !void {
_ = try self._wait(false, opts);
return self._wait(false, opts);
}
pub const CDPWaitResult = enum {
done,
cdp_socket,
};
pub fn waitCDP(self: *Runner, opts: WaitOpts) !CDPWaitResult {
pub fn waitCDP(self: *Runner, opts: WaitOpts) !void {
return self._wait(true, opts);
}
fn _wait(self: *Runner, comptime is_cdp: bool, opts: WaitOpts) !CDPWaitResult {
// Wait until either a parse-state / load goal is reached or `opts.ms`
// elapses. Returns as soon as _tick reports .done.
fn _wait(self: *Runner, comptime is_cdp: bool, opts: WaitOpts) !void {
const session = self.session;
const browser = session.browser;
@@ -105,13 +104,26 @@ fn _wait(self: *Runner, comptime is_cdp: bool, opts: WaitOpts) !CDPWaitResult {
const next_ms = switch (tick_result) {
.ok => |next_ms| next_ms,
.done => return .done,
.cdp_socket => if (comptime is_cdp) return .cdp_socket else unreachable,
.done => done_blk: {
if (comptime is_cdp == false) {
return;
}
// is_cdp keeps the loop alive past .done so the worker
// can observe CDP commands. We have nothing useful to do here
// but we can ask the http_client to wait for CDP messages.
const elapsed: u32 = @intCast(timer.read() / std.time.ns_per_ms);
if (elapsed >= opts.ms) {
return;
}
try self.http_client.tick(@min(opts.ms - elapsed, 200), .all);
break :done_blk 0;
},
};
const ms_elapsed: u32 = @intCast(timer.read() / std.time.ns_per_ms);
if (ms_elapsed >= opts.ms) {
return .done;
return;
}
if (next_ms > 0) {
std.Thread.sleep(std.time.ns_per_ms * next_ms);
@@ -129,23 +141,10 @@ pub const TickResult = union(enum) {
ok: u32,
};
pub fn tick(self: *Runner, opts: TickOpts) !TickResult {
return switch (try self._tick(false, opts)) {
.ok => |ms| .{ .ok = ms },
.done => .done,
.cdp_socket => unreachable,
};
return self._tick(false, opts);
}
pub const CDPTickResult = union(enum) {
done,
cdp_socket,
ok: u32,
};
pub fn tickCDP(self: *Runner, opts: TickOpts) !CDPTickResult {
return self._tick(true, opts);
}
fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult {
fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !TickResult {
// Refresh self.frame from session. In case of pending page, we want to
// take its state while loading. If we use only the current frame, we will
// return a .done result immediately.
@@ -156,19 +155,14 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult {
switch (frame._parse_state) {
.pre, .raw, .text, .image => {
// The main frame hasn't started/finished navigating.
// There's no JS to run, and no reason to run the scheduler.
// There's no JS to run, and no reason to run the scheduler
// — unless we're the CDP worker, in which case we want
// http_client.tick to drain the inbox.
if (http_client.http_active == 0 and (comptime is_cdp) == false) {
// haven't started navigating, I guess.
return .done;
}
// Either we have active http connections, or we're in CDP
// mode with an extra socket. Either way, we're waiting
// for http traffic
const http_result = try http_client.tick(@intCast(opts.ms));
if ((comptime is_cdp) and http_result == .cdp_socket) {
return .cdp_socket;
}
try http_client.tick(@intCast(opts.ms), .all);
return .{ .ok = 0 };
},
.html, .complete => {
@@ -212,11 +206,7 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult {
},
}
if (http_active == 0 and http_client.ws_active == 0 and http_client.queue.first == null and http_client.ready_queue.first == null and (comptime is_cdp == false)) {
// we don't need to consider http_client.intercepted here
// because is_cdp is false, and that can only be
// the case when interception isn't possible.
//
if (http_active == 0 and http_client.ws_active == 0 and http_client.queue.first == null and http_client.ready_queue.first == null and (comptime is_cdp) == false) {
// ready_queue is also part of the check: makeRequest now
// wraps its handles.perform() in a performing=true window,
// and any synchronous libcurl callback that ends up
@@ -224,9 +214,10 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult {
// a WebSocket) will append to ready_queue. Without this
// check we could observe it non-empty after
// http_client.tick returns.
// we don't need to consider http_client.intercepted here
// because is_cdp is false, and that can only be
// the case when interception isn't possible.
//
// intercepted is only non-zero in serve mode, and
// serve mode implies cdp_client != null — so if we got
// here, intercepted == 0.
if (comptime IS_DEBUG) {
std.debug.assert(http_client.interception_layer.intercepted == 0);
}
@@ -246,10 +237,9 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult {
}
// We're here because we either have active HTTP
// connections, or is_cdp == false (aka, there's
// an cdp_socket registered with the http client).
// We should continue to run tasks, so we minimize how long
// we'll poll for network I/O.
// connections, or there's a CDP client whose inbox we have
// to drain via http_client.tick. We should continue to run
// tasks, so we minimize how long we'll poll for network I/O.
var ms_to_wait = @min(opts.ms, browser.msToNextMacrotask() orelse 200);
if (ms_to_wait > 10 and browser.hasBackgroundTasks()) {
// if we have background tasks, we don't want to wait too
@@ -257,10 +247,7 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult {
// to the top of the loop and run macrotasks.
ms_to_wait = 10;
}
const http_result = try http_client.tick(@intCast(@min(opts.ms, ms_to_wait)));
if ((comptime is_cdp) and http_result == .cdp_socket) {
return .cdp_socket;
}
try http_client.tick(@intCast(@min(opts.ms, ms_to_wait)), .all);
return .{ .ok = 0 };
},
.err => |err| {
@@ -269,10 +256,7 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult {
},
.raw_done => {
if (comptime is_cdp) {
const http_result = try http_client.tick(@intCast(opts.ms));
if (http_result == .cdp_socket) {
return .cdp_socket;
}
try http_client.tick(@intCast(opts.ms), .all);
return .{ .ok = 0 };
}
return .done;

View File

@@ -300,7 +300,7 @@ pub fn waitForImport(self: *ScriptManagerBase, url: [:0]const u8) !ModuleSource
while (true) {
switch (entry.value_ptr.state) {
.loading => {
_ = try client.tick(200);
_ = try client.tick(200, .sync_wait);
continue;
},
.done => |script| {

View File

@@ -20,7 +20,10 @@ const std = @import("std");
const lp = @import("lightpanda");
const App = @import("../App.zig");
const Inbox = @import("../Inbox.zig");
const Network = @import("../network/Network.zig");
const Notification = @import("../Notification.zig");
const WS = @import("../network/WS.zig");
const js = @import("../browser/js/js.zig");
const Browser = @import("../browser/Browser.zig");
const Session = @import("../browser/Session.zig");
@@ -56,6 +59,12 @@ conn: Connection,
browser: Browser,
allocator: Allocator,
// Network-thread read-side handle for the CDP socket. Populated in
// init; Server.handleConnection calls network.registerCdp(&cdp.link)
// after the worker-side handshake completes, and unregisterCdp before
// teardown.
link: Network.CdpLink,
// when true, any target creation must be attached.
target_auto_attach: bool = false,
@@ -88,6 +97,7 @@ pub fn init(
self.* = .{
.app = app,
.link = undefined,
.conn = undefined,
.browser = undefined,
.allocator = allocator,
@@ -98,16 +108,18 @@ pub fn init(
.browser_context_arena = std.heap.ArenaAllocator.init(allocator),
};
try self.conn.init(socket, self.app.allocator, json_version_response);
try self.browser.init(app, .{ .env = .{ .with_inspector = true } }, self);
const http_client = &self.browser.http_client;
try self.conn.init(allocator, socket, json_version_response, &http_client.inbox, &app.arena_pool);
errdefer self.conn.deinit();
try self.browser.init(app, .{ .env = .{ .with_inspector = true } }, .{
.ctx = self,
self.link = .{
.cdp = self,
.state = .live,
.socket = socket,
.blocking_read_start = CDP.blockingReadStart,
.blocking_read = CDP.blockingRead,
.blocking_read_end = CDP.blockingReadStop,
});
.handles = http_client.handles,
};
}
pub fn deinit(self: *CDP) void {
@@ -121,101 +133,151 @@ pub fn deinit(self: *CDP) void {
self.browser_context_arena.deinit();
self.conn.deinit();
}
pub fn blockingReadStart(ctx: *anyopaque) bool {
const self: *CDP = @ptrCast(@alignCast(ctx));
self.conn.setBlocking(true) catch |err| {
log.warn(.app, "CDP blockingReadStart", .{ .err = err });
return false;
};
return true;
}
pub fn blockingRead(ctx: *anyopaque) bool {
const self: *CDP = @ptrCast(@alignCast(ctx));
return self.readSocket();
}
pub fn blockingReadStop(ctx: *anyopaque) bool {
const self: *CDP = @ptrCast(@alignCast(ctx));
self.conn.setBlocking(false) catch |err| {
log.warn(.app, "CDP blockingReadStop", .{ .err = err });
return false;
};
return true;
}
pub fn readSocket(self: *CDP) bool {
const n = self.conn.read() catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
return false;
};
if (n == 0) {
log.info(.app, "CDP disconnect", .{});
return false;
// Called by Network when readable bytes arrive on the CDP socket.
// Feeds them through the WS framer and pushes each parsed frame into
// the worker's inbox. Returns false if a close frame was seen (or a
// fatal frame error) so Network drops the link from its poll set.
//
// One network read can carry more bytes than the reader's current
// free space — large CDP messages (Page.addScriptToEvaluateOnNewDocument,
// Runtime evaluation results, etc.) routinely exceed 16 KB, and a
// single read can contain multiple messages, or part of messages. We loop: feed
// what fits, run processMessages (which extracts complete frames, compacts the
// reader, and grows the buffer if it sees a frame header larger than current
// capacity), repeat until the chunk is drained.
pub fn onData(self: *CDP, data: []const u8) anyerror!bool {
var remaining = data;
while (remaining.len > 0) {
const n = self.conn.feedBytes(remaining);
remaining = remaining[n..];
if ((try self.conn.processMessages()) == false) {
return false;
}
}
return self.conn.processMessages(self) catch false;
}
pub fn sendJSON(self: *CDP, message: anytype) !void {
try self.conn.sendJSON(message, .{ .emit_null_optional_fields = false });
}
pub fn handleMessage(self: *CDP, msg: []const u8) bool {
// if there's an error, it's already been logged
self.processMessage(msg) catch return false;
return true;
}
// Network-side hook, invoked when Network drops the link on its own
// initiative (peer EOF, read error, poll HUP/ERR). Queues a `.disconnect`
// message, carrying the optional cause, on the worker's inbox so that the
// worker's drainInbox surfaces error.ClientDisconnected.
pub fn onLinkDisconnect(self: *CDP, err: ?anyerror) void {
    const browser = &self.browser;
    const arena = browser.arena_pool.acquire(.tiny, "cdp disconnect") catch |acquire_err| switch (acquire_err) {
        // Without an arena there is nothing we can queue.
        error.OutOfMemory => @panic("OOM"),
    };
    browser.http_client.inbox.push(arena, .{ .disconnect = err });
}
// Worker-side entry point for a single CDP message bubbled up by
// HttpClient.drainInbox. The JSON was already parsed on the Network
// thread when the message was pushed to the inbox, so this goes straight
// to dispatchParsed without parsing again. `c.raw` and `c.arena` are
// owned by the inbox Message, which drainInbox frees right after we
// return, so the string slices inside `c.input` stay valid for the whole
// dispatch.
pub fn onMessage(self: *CDP, c: *Inbox.Message.Cdp) anyerror!void {
    // Scratch memory for this one dispatch; recycled on the way out.
    defer _ = self.message_arena.reset(.{ .retain_with_limit = 1024 * 16 });
    const scratch = self.message_arena.allocator();
    return self.dispatchParsed(scratch, .{ .cdp = self }, c.raw, c.input);
}
// Parses and dispatches a raw JSON CDP message. Intended for tests (which
// bypass the Network thread / inbox pipeline) and for any caller holding
// bytes rather than a pre-parsed InputMessage.
pub fn processMessage(self: *CDP, msg: []const u8) !void {
    // Scratch memory for this one dispatch; recycled on the way out.
    defer _ = self.message_arena.reset(.{ .retain_with_limit = 1024 * 16 });
    const scratch = self.message_arena.allocator();
    return self.dispatch(scratch, .{ .cdp = self }, msg);
}
// @newhttp
// A bit hacky right now. The main server loop doesn't unblock for
// scheduled task. So we run this directly in order to process any
// timeouts (or http events) which are ready to be processed.
pub fn pageWait(self: *CDP, ms: u32) !Session.Runner.CDPWaitResult {
// Worker-side handler for a received PING: replies with a PONG carrying
// the same body. A failed send is logged and otherwise ignored.
pub fn onPing(self: *CDP, body: []const u8) void {
    if (self.conn.sendPong(body)) |_| {} else |err| {
        log.warn(.app, "CDP pong", .{ .err = err });
    }
}
// Worker-side handler for a peer-initiated close: replies with
// CLOSE_NORMAL. A failed send is logged and otherwise ignored. The worker
// loop tears down immediately after; drainInbox returns
// error.ClientDisconnected once we return.
pub fn onClose(self: *CDP) void {
    if (self.conn.send(&WS.CLOSE_NORMAL)) |_| {} else |err| {
        log.warn(.app, "CDP close reply", .{ .err = err });
    }
}
// Worker-side handler for a disconnect (peer EOF, fatal frame error, or
// right after a peer close was replied to). drainInbox returns
// error.ClientDisconnected, which the worker loop catches to tear down.
//
// When `err` maps to a WS close frame (per WS.errorReply), that frame is
// sent to the peer first, best-effort; that is how clients observe
// protocol violations (close code 1002 / 1009 / etc.).
pub fn onDisconnect(self: *CDP, err: ?anyerror) void {
    const cause = err orelse {
        // Clean disconnect: nothing to report to the peer.
        log.info(.cdp, "CDP disconnect", .{});
        return;
    };

    if (WS.errorReply(cause)) |close_frame| {
        // Best-effort: the peer may already be gone.
        self.conn.send(close_frame) catch {};
    }
    log.warn(.cdp, "CDP disconnect", .{ .err = cause });
}
// Serializes `message` as JSON and sends it on the CDP connection.
// Optional fields that are null are omitted from the output.
pub fn sendJSON(self: *CDP, message: anytype) !void {
    return self.conn.sendJSON(message, .{ .emit_null_optional_fields = false });
}
// Runs one worker iteration, waiting up to one second. Returns true to
// keep going, and false when the client disconnected or the fallback
// http tick failed. Any other error from pageWait is propagated.
pub fn tick(self: *CDP) !bool {
    // Liveness is enforced by TCP keepalive configured in
    // Server.configureSocket; the wakeup lets V8 run or terminate.
    const wait_ms: u32 = 1000; // 1s

    self.pageWait(wait_ms) catch |wait_err| {
        switch (wait_err) {
            error.ClientDisconnected => return false,
            error.NoPage => {},
            else => return wait_err,
        }

        // No active page yet (or a teardown is in flight). Fall back to
        // ticking the http client directly so CDP messages still get
        // dispatched.
        self.browser.http_client.tick(wait_ms, .all) catch |err| {
            if (err != error.ClientDisconnected) {
                log.err(.app, "http tick", .{ .err = err });
            }
            return false;
        };
    };
    return true;
}
// Waits up to `ms` milliseconds on the current session's runner in CDP
// mode. Fails with error.NoPage when the browser has no session.
fn pageWait(self: *CDP, ms: u32) !void {
    if (self.browser.session) |*session| {
        var runner = try session.runner(.{});
        return runner.waitCDP(.{ .ms = ms });
    }
    return error.NoPage;
}
pub fn tick(self: *CDP) !bool {
// Liveness is enforced by TCP keepalive configured in
// Network.acceptConnections; the wakeup lets V8 run or terminate.
const wait_ms: u32 = 1000; // 1s
const result = self.pageWait(wait_ms) catch |wait_err| switch (wait_err) {
error.NoPage => {
const status = self.browser.http_client.tick(wait_ms) catch |err| {
log.err(.app, "http tick", .{ .err = err });
return false;
};
return status != .cdp_socket or self.readSocket();
},
else => return wait_err,
};
if (result == .cdp_socket) {
return self.readSocket();
}
return true;
}
// Called from above, in processMessage which handles client messages
// but can also be called internally. For example, Target.sendMessageToTarget
// calls back into dispatch to capture the response.
// Parse-then-dispatch entry point. Callers:
// - CDP.processMessage (tests, and anything else that holds raw JSON
// bytes).
// - Target.sendMessageToTarget (a CDP command that wraps another CDP
// message as a string parameter and forwards it through the dispatch
// table).
// The regular Network-thread path does not come through here: it parses
// once on the Network thread and reaches dispatchParsed directly via
// onMessage.
//
// Any parse failure is reported as error.InvalidJSON.
pub fn dispatch(self: *CDP, arena: Allocator, sender: Command.Sender, str: []const u8) !void {
    const parsed = json.parseFromSliceLeaky(InputMessage, arena, str, .{
        .ignore_unknown_fields = true,
    });
    const input = parsed catch return error.InvalidJSON;
    return self.dispatchParsed(arena, sender, str, input);
}
// Dispatch a pre-parsed CDP message. The caller is responsible for
// keeping `str` and the backing storage for `input`'s string slices
// alive for the duration of the call.
fn dispatchParsed(self: *CDP, arena: Allocator, sender: Command.Sender, str: []const u8, input: InputMessage) !void {
var command = Command{
.input = .{
.json = str,
@@ -1150,8 +1212,11 @@ pub const Command = struct {
};
// When we parse a JSON message from the client, this is the structure
// we always expect
const InputMessage = struct {
// we always expect. Parsed on the Network thread inside
// Connection.handleMessage; the slices reference the raw JSON bytes
// (or arena allocations for fields that needed unescaping). Both
// outlive the InputMessage for the inbox message's lifetime.
pub const InputMessage = struct {
id: ?i64 = null,
method: []const u8,
params: ?InputParams = null,
@@ -1162,7 +1227,7 @@ const InputMessage = struct {
// capture the raw json object (including the opening and closing braces).
// Then, when we're processing the message, and we know what type it is, we
// can parse it (in Dispatch(T).params).
const InputParams = struct {
pub const InputParams = struct {
raw: []const u8,
pub fn jsonParse(

View File

@@ -20,8 +20,12 @@ const std = @import("std");
const lp = @import("lightpanda");
const builtin = @import("builtin");
const CDP = @import("CDP.zig");
const Inbox = @import("../Inbox.zig");
const Config = @import("../Config.zig");
const WS = @import("../network/WS.zig");
const ArenaPool = @import("../ArenaPool.zig");
const log = lp.log;
const posix = std.posix;
@@ -32,6 +36,9 @@ pub const Connection = @This();
pub const State = enum { handshaking, live };
// reference to http_client.inbox
inbox: *Inbox,
arena_pool: *ArenaPool,
socket: posix.socket_t,
socket_flags: usize,
state: State = .handshaking,
@@ -41,9 +48,11 @@ json_version_response: []const u8,
pub fn init(
self: *Connection,
socket: posix.socket_t,
allocator: Allocator,
socket: posix.socket_t,
json_version_response: []const u8,
inbox: *Inbox,
arena_pool: *ArenaPool,
) !void {
const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0);
const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true }));
@@ -52,7 +61,9 @@ pub fn init(
}
self.* = .{
.inbox = inbox,
.socket = socket,
.arena_pool = arena_pool,
.socket_flags = socket_flags,
.reader = try .init(allocator),
.send_arena = ArenaAllocator.init(allocator),
@@ -104,7 +115,7 @@ pub fn send(self: *Connection, data: []const u8) !void {
}
}
fn sendPong(self: *Connection, data: []const u8) !void {
pub fn sendPong(self: *Connection, data: []const u8) !void {
if (data.len == 0) {
return self.send(&WS.EMPTY_PONG);
}
@@ -157,15 +168,15 @@ pub fn handshake(self: *Connection) !bool {
}};
const n = try posix.poll(&pfds, 5000);
if (n == 0) {
log.info(.app, "CDP handshake timeout", .{});
log.info(.cdp, "CDP handshake timeout", .{});
return false;
}
const read_bytes = self.read() catch |err| {
log.warn(.app, "CDP read", .{ .err = err });
log.warn(.cdp, "CDP read", .{ .err = err });
return false;
};
if (read_bytes == 0) {
log.info(.app, "CDP disconnect", .{});
log.info(.cdp, "CDP disconnect", .{});
return false;
}
const result = self.processHttpRequest() catch return false;
@@ -183,17 +194,28 @@ pub fn read(self: *Connection) !usize {
return n;
}
// Append pre-read bytes (from the network thread) to the reader.
// Used post-handshake when the network thread owns socket reads and
// hands bytes back via the HttpClient inbox. Returns BufferTooSmall
// if the reader's free space can't hold this chunk — caller is
// expected to chunk reads to fit (Network reads in 16 KB chunks
// which matches the reader's initial capacity).
pub fn feedBytes(self: *Connection, data: []const u8) !void {
// Append as many bytes as fit into the reader's free space. Returns
// the number of bytes copied. Used post-handshake when the network
// thread owns socket reads.
//
// Why partial: a single network read can carry more bytes than the
// reader's current free space (e.g. one large pending frame plus the
// start of another). The caller is expected to loop:
//
// while (remaining.len > 0) {
// const n = conn.feedBytes(remaining);
// remaining = remaining[n..];
// _ = try conn.processMessages(); // extracts frames + compacts
// // processMessages also grows the reader buffer if it sees a
// // frame header bigger than the current capacity, so the next
// // feedBytes call has somewhere to land.
// }
pub fn feedBytes(self: *Connection, data: []const u8) usize {
const dst = self.reader.readBuf();
if (data.len > dst.len) return error.BufferTooSmall;
@memcpy(dst[0..data.len], data);
self.reader.len += data.len;
const n = @min(data.len, dst.len);
@memcpy(dst[0..n], data[0..n]);
self.reader.len += n;
return n;
}
fn processHttpRequest(self: *Connection) !HttpResult {
@@ -282,30 +304,34 @@ const empty_json_list_response =
"Content-Type: application/json; charset=UTF-8\r\n\r\n" ++
"[]";
pub fn processMessages(self: *Connection, handler: anytype) !bool {
// Framing-only iteration over received bytes. processMessages no
// longer auto-replies pong/close or sends close-on-error — the Network
// thread runs this loop and is read-only on the socket.
//
// Returns false if a close frame was seen (caller should drop the
// link) or the handler asked to stop; true if the loop exited because
// there were no more complete frames buffered.
pub fn processMessages(self: *Connection) !bool {
var reader = &self.reader;
while (true) {
const msg = (reader.next() catch |err| {
if (WS.errorReply(err)) |error_reply| {
self.send(error_reply) catch {};
}
return err;
}) orelse break;
const msg = (try reader.next()) orelse break;
switch (msg.type) {
.pong => {},
.ping => try self.sendPong(msg.data),
.close => {
self.send(&WS.CLOSE_NORMAL) catch {};
return false;
const keep = switch (msg.type) {
.pong => true,
.ping, .text, .binary => try self.handleMessage(msg),
.close => blk: {
_ = try self.handleMessage(msg);
break :blk false;
},
.text, .binary => if (handler.handleMessage(msg.data) == false) {
return false;
},
}
};
if (msg.cleanup_fragment) {
reader.cleanup();
}
if (!keep) {
return false;
}
}
// We might have read part of the next message. Our reader potentially
@@ -314,6 +340,54 @@ pub fn processMessages(self: *Connection, handler: anytype) !bool {
return true;
}
// Turns one WebSocket frame into an inbox message. Text and binary frames
// go through pushCdp; ping and close frames each get their own tiny arena
// and are pushed as `.ping` (with a copy of the payload) or `.close`.
// Returns whatever pushCdp returns for data frames, and true otherwise.
// Pong frames must never reach this function.
fn handleMessage(self: *Connection, msg: WS.Message) !bool {
    switch (msg.type) {
        .pong => unreachable, // processMessages skips pong
        .text, .binary => return self.pushCdp(msg.data),
        .close => {
            const arena = try self.arena_pool.acquire(.tiny, "cdp close");
            self.inbox.push(arena, .close);
        },
        .ping => {
            const arena = try self.arena_pool.acquire(.tiny, "cdp ping");
            errdefer self.arena_pool.release(arena);

            // msg.data points into the reader's buffer, so copy it into
            // the arena that travels with the inbox message.
            const payload = try arena.dupe(u8, msg.data);
            self.inbox.push(arena, .{ .ping = payload });
        },
    }
    return true;
}
// Parses a CDP JSON frame on the Network thread and pushes it onto the
// inbox in already-parsed form. The consumer's allowlist check can then
// work on `input.method` directly (no substring matching against raw
// JSON), and the worker does not re-parse on dispatch.
//
// Returns true once a `.cdp` message has been pushed. On a parse failure
// a `.disconnect` carrying error.InvalidJSON is pushed instead and false
// is returned, so the worker tears down exactly as it would for a fatal
// WS framing error.
fn pushCdp(self: *Connection, bytes: []const u8) !bool {
    // TODO: is it worth trying to pad this for the cost overhead of parsing?
    const arena = try self.arena_pool.acquire(bytes.len, "cdp data");
    errdefer self.arena_pool.release(arena);

    // The parsed message keeps slices into `raw`, so both live in the
    // arena that is handed to the inbox.
    const raw = try arena.dupe(u8, bytes);
    const parsed = std.json.parseFromSliceLeaky(
        CDP.InputMessage,
        arena,
        raw,
        .{ .ignore_unknown_fields = true },
    );

    if (parsed) |input| {
        self.inbox.push(arena, .{ .cdp = .{
            .raw = raw,
            .input = input,
        } });
        return true;
    } else |_| {
        self.inbox.push(arena, .{ .disconnect = error.InvalidJSON });
        return false;
    }
}
pub fn upgrade(self: *Connection, request: []u8) !void {
// our caller already confirmed that we have a trailing \r\n\r\n
const request_line_end = std.mem.indexOfScalar(u8, request, '\r') orelse unreachable;

View File

@@ -103,7 +103,7 @@ pub fn run(allocator: Allocator, file: []const u8, session: *lp.Session) !void {
try frame.navigate(url, .{});
var runner = try session.runner(.{});
try runner.wait(.{ .ms = 2000 });
try runner.wait(false, .{ .ms = 2000 });
ls.local.eval("testing.assertOk()", "testing.assertOk()") catch |err| {
const caught = try_catch.caughtOrError(allocator, err);

View File

@@ -20,12 +20,10 @@ const std = @import("std");
const lp = @import("lightpanda");
const builtin = @import("builtin");
const log = lp.log;
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;
const App = @import("../App.zig");
const Config = @import("../Config.zig");
const CDP = @import("../cdp/CDP.zig");
const libcurl = @import("../sys/libcurl.zig");
const http = @import("http.zig");
@@ -36,7 +34,12 @@ const WebBotAuth = @import("WebBotAuth.zig");
const Cache = @import("cache/Cache.zig");
const FsCache = @import("cache/FsCache.zig");
const App = @import("../App.zig");
const log = lp.log;
const net = std.net;
const posix = std.posix;
const Allocator = std.mem.Allocator;
const DoublyLinkedList = std.DoublyLinkedList;
const Network = @This();
const Listener = struct {
@@ -45,6 +48,34 @@ const Listener = struct {
onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void,
};
// Read side of a CDP WebSocket, registered with the Network thread so
// bytes are read off the socket from here and dispatched into the CDP
// layer via direct method calls on `cdp`. Network never sends on the
// socket — the worker is the sole writer. After registerCdp returns,
// the worker must not call posix.read on this socket directly.
// unregisterCdp is synchronous: it blocks until Network confirms the
// link has been dropped from its poll set and won't touch it again.
pub const CdpLink = struct {
// Target of Network's direct calls: onData for bytes read off the
// socket, onLinkDisconnect when Network drops the link unsolicited.
cdp: *CDP,
// Lifecycle state (see State). Read and written with
// Network.cdp_mutex held.
state: State,
// The CDP socket fd that Network polls for POLL.IN and reads from.
socket: posix.socket_t,
// The worker's HttpClient.Handles (by value — it's one pointer
// wide). Network calls handles.wakeup() to unblock the worker
// from curl_multi_poll whenever it pushes to the worker's inbox.
handles: http.Handles,
// Intrusive list node linking this CdpLink into Network.cdp_links.
node: DoublyLinkedList.Node = .{},
pub const State = enum {
// Registered and eligible for polling.
live,
// Worker called unregisterCdp; Network will drop the link on
// its next loop iteration and signal cdp_unregister.
unregistering,
// Network has dropped the link from its poll set and will not
// call into link.cdp again; unregisterCdp returns once it
// observes this state.
removed,
};
};
// Number of fixed pollfds entries (wakeup pipe + listener).
const PSEUDO_POLLFDS = 2;
@@ -53,14 +84,14 @@ const MAX_TICK_CALLBACKS = 16;
allocator: Allocator,
app: *App,
cache: ?Cache,
config: *const Config,
ca_blob: ?http.Blob,
robot_store: RobotStore,
web_bot_auth: ?WebBotAuth,
cache: ?Cache,
connections: []http.Connection,
available: std.DoublyLinkedList = .{},
available: DoublyLinkedList = .{},
conn_mutex: std.Thread.Mutex = .{},
ws_pool: std.heap.MemoryPool(http.Connection),
@@ -82,12 +113,37 @@ shutdown: std.atomic.Value(bool) = .init(false),
// When Network becomes truly shared, it should become a regular field.
multi: ?*libcurl.CurlM = null,
submission_mutex: std.Thread.Mutex = .{},
submission_queue: std.DoublyLinkedList = .{},
submission_queue: DoublyLinkedList = .{},
callbacks: [MAX_TICK_CALLBACKS]TickCallback = undefined,
callbacks_len: usize = 0,
callbacks_mutex: std.Thread.Mutex = .{},
// Registered CDP read endpoints. Producer-side (the worker doing
// register/unregister) and consumer-side (this thread's run loop) are
// serialized by cdp_mutex. cdp_unregister signals when a link
// transitions to .removed so unregisterCdp can return.
cdp_links: DoublyLinkedList = .{},
cdp_mutex: std.Thread.Mutex = .{},
cdp_unregister: std.Thread.Condition = .{},
// Per-iteration snapshot of CdpLinks whose sockets are in pollfds.
// Sized at maxConnections at init time so we never allocate inside
// run(). Parallel to pollfds[cdp_start..cdp_start + cdp_poll_count].
// Persists across iterations; only rebuilt when `cdp_dirty` is set.
cdp_poll_snapshot: []?*CdpLink,
cdp_poll_count: usize = 0,
// Set whenever the cdp_links list changes (register / unregister /
// natural drop). prepareCdpPollFds rebuilds the snapshot only when
// this is true; idle iterations skip the rebuild. Network run() ticks
// hundreds of times per second, and the link set is stable between
// connection lifecycle events, so the steady-state cost of the CDP
// poll prep is one mutex acquire + one bool read.
cdp_dirty: bool = false,
// Location in pollfds where cdp sockets start
cdp_start: usize,
/// Optional IP filter for blocking requests to private/internal networks (--block-private-networks).
ip_filter: ?*IpFilter = null,
@@ -224,10 +280,25 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network {
const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true });
// 0 is wakeup, 1 is listener, rest for curl fds
const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent());
// IMPORTANT: This is a bit messy, and it exists specifically because
// self.multi is optional. self.multi is optional so that, when telemetry is
// disabled, we don't need the overhead of a multi. If self.multi wasn't
// optional, then we wouldn't need to use posix.poll, we could use
// curl_multi_poll. This is to do in a follow up.
// The structure is: 0 is wakeup, 1 is listener, rest for curl fds:
// [0] wakeup pipe
// [1] listener
// [PSEUDO_POLLFDS .. + httpMaxConcurrent] curl multi fds
// [.. + maxConnections] CDP socket fds
const max_cdp = config.maxConnections();
const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent() + max_cdp);
errdefer allocator.free(pollfds);
const cdp_poll_snapshot = try allocator.alloc(?*CdpLink, max_cdp);
errdefer allocator.free(cdp_poll_snapshot);
@memset(cdp_poll_snapshot, null);
@memset(pollfds, .{ .fd = -1, .events = 0, .revents = 0 });
pollfds[0] = .{ .fd = pipe[0], .events = posix.POLL.IN, .revents = 0 };
@@ -258,7 +329,7 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network {
const connections = try allocator.alloc(http.Connection, count);
errdefer allocator.free(connections);
var available: std.DoublyLinkedList = .{};
var available: DoublyLinkedList = .{};
for (0..count) |i| {
connections[i] = try http.Connection.init(ca_blob, config, ip_filter);
available.append(&connections[i].node);
@@ -292,15 +363,17 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network {
.pollfds = pollfds,
.wakeup_pipe = pipe,
.cdp_poll_snapshot = cdp_poll_snapshot,
.cdp_start = PSEUDO_POLLFDS + config.httpMaxConcurrent(),
.available = available,
.connections = connections,
.app = app,
.cache = cache,
.robot_store = RobotStore.init(allocator),
.web_bot_auth = web_bot_auth,
.cache = cache,
.ws_pool = .init(allocator),
.ws_max = config.wsMaxConcurrent(),
@@ -322,6 +395,7 @@ pub fn deinit(self: *Network) void {
}
self.allocator.free(self.pollfds);
self.allocator.free(self.cdp_poll_snapshot);
if (self.ca_blob) |ca_blob| {
const data: [*]u8 = @ptrCast(ca_blob.data);
@@ -420,6 +494,195 @@ pub fn fireTicks(self: *Network) void {
}
}
// Hands the read side of a CDP WebSocket over to the main network thread.
// The caller keeps ownership of `link` and must keep it alive until
// unregisterCdp is called. From this point on the caller must not read
// from the socket.
pub fn registerCdp(self: *Network, link: *CdpLink) void {
    {
        self.cdp_mutex.lock();
        defer self.cdp_mutex.unlock();

        // Mark the poll snapshot stale so the next prepareCdpPollFds
        // picks up the new link.
        self.cdp_dirty = true;
        self.cdp_links.append(&link.node);
    }

    // Kick the network thread out of poll() so it notices the change.
    self.wakeupPoll();
}
// Synchronous teardown. Blocks the caller until the Network thread has
// dropped the link from its poll set (link.state == .removed), after
// which Network no longer calls into link.cdp. Safe to call after
// Network has already dropped the link unsolicited (state == .removed)
// — returns immediately in that case.
//
// NOTE(review): assumes `link` was previously passed to registerCdp. The
// only transition to .removed is dropCdp, which processCdpEvents reaches
// through cdp_links / the poll snapshot; a .live link that was never
// registered would presumably wait here forever — confirm against callers.
pub fn unregisterCdp(self: *Network, link: *CdpLink) void {
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
if (link.state == .live) {
// Ask the Network thread to drop the link, mark the poll snapshot
// stale, and wake it out of poll() so it acts promptly.
link.state = .unregistering;
self.cdp_dirty = true;
self.wakeupPoll();
}
while (link.state != .removed) {
// Releases cdp_mutex while blocked and re-acquires it before
// returning; processCdpEvents broadcasts cdp_unregister after it
// removes links. The loop re-checks the state after every wakeup.
self.cdp_unregister.wait(&self.cdp_mutex);
}
}
// Drop a link from the poll set. Caller must hold cdp_mutex.
// Unlinks the node from cdp_links, sets the state to .removed and marks
// the poll snapshot dirty. It does NOT broadcast cdp_unregister; the
// caller (processCdpEvents) does that once per iteration.
// - link.cdp.onLinkDisconnect(err) is called iff `notify` is true. Pass
// notify=false when the consumer already knows the link is dead
// (e.g. a close frame just went through onData; the .close message
// in the inbox is enough to wake the worker).
// - The worker is woken via link.handles.wakeup() only when `notify`
// is true.
fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void {
self.cdp_links.remove(&link.node);
link.state = .removed;
self.cdp_dirty = true;
if (notify) {
// notify=true means the worker hasn't been told yet — push the
// disconnect into the inbox and break it out of curl_multi_poll.
// notify=false paths have already woken the worker (close frame
// case) or are about to be unblocked via cdp_unregister.broadcast
// (unregister case); no extra wakeup needed.
link.cdp.onLinkDisconnect(err);
link.handles.wakeup() catch |e| {
lp.log.warn(.cdp, "CDP link wakeup", .{ .err = e });
};
}
}
// Builds the CDP portion of pollfds and snapshots the matching *CdpLink
// pointers so revents can be correlated with links after poll() returns.
// Called before poll(); acquires cdp_mutex itself.
//
// The rebuild only happens when `cdp_dirty` is set. On rebuild, every CDP
// pollfd slot is reset to fd = -1, then one slot (and the parallel
// cdp_poll_snapshot entry) is filled per `.live` link, in list order.
// cdp_poll_count is set to the number of slots filled.
fn prepareCdpPollFds(self: *Network) void {
    const cdp_start = self.cdp_start;

    self.cdp_mutex.lock();
    defer self.cdp_mutex.unlock();

    // Idle fast-path: link set unchanged since last rebuild, so the
    // snapshot + pollfds entries from the previous iteration are still
    // correct. Kernel will overwrite `revents` in the next poll() call.
    if (!self.cdp_dirty) {
        return;
    }
    self.cdp_dirty = false;

    @memset(self.pollfds[cdp_start..], .{ .fd = -1, .events = 0, .revents = 0 });

    var i: usize = 0;
    var it = self.cdp_links.first;
    while (it) |node| : (it = node.next) {
        const link: *CdpLink = @fieldParentPtr("node", node);
        if (link.state != .live) {
            // Will be handled in processCdpEvents; don't poll its fd.
            continue;
        }

        // Checked only for links that actually take a slot: a non-live
        // link never occupies the snapshot, so it must not trip the
        // overflow check when the snapshot is exactly full.
        lp.assert(i < self.cdp_poll_snapshot.len, "CDP poll snapshot overflow", .{ .i = i, .len = self.cdp_poll_snapshot.len });

        self.pollfds[cdp_start + i] = .{
            .fd = link.socket,
            .events = posix.POLL.IN,
            .revents = 0,
        };
        self.cdp_poll_snapshot[i] = link;
        i += 1;
    }
    self.cdp_poll_count = i;
}
// Per-iteration CDP handling: process pending unregistrations, then
// process revents on each polled link. Called after poll().
// Holds cdp_mutex for the whole call, including the socket read and the
// link.cdp.onData call. If any link was removed, cdp_unregister is
// broadcast once at the end so blocked unregisterCdp callers can return.
fn processCdpEvents(self: *Network) void {
var any_removed = false;
const cdp_start = self.cdp_start;
self.cdp_mutex.lock();
defer self.cdp_mutex.unlock();
// First pass: pending unregister requests.
var it = self.cdp_links.first;
while (it) |node| {
// Capture the successor first: dropCdp unlinks `node` from the list.
const next = node.next;
const link: *CdpLink = @fieldParentPtr("node", node);
if (link.state == .unregistering) {
self.dropCdp(link, null, false);
any_removed = true;
}
it = next;
}
// Second pass: revents on the snapshot. Skip links the first pass
// (or a prior natural drop) has already removed.
for (self.cdp_poll_snapshot[0..self.cdp_poll_count], 0..) |link_opt, i| {
const link = link_opt orelse continue;
if (link.state != .live) {
continue;
}
// Snapshot index i is parallel to pollfds[cdp_start + i].
const pfd = self.pollfds[cdp_start + i];
if (pfd.revents == 0) {
continue;
}
// NOTE(review): fatal events are checked before POLL.IN, so when
// HUP/ERR is reported together with readable data, the pending
// bytes are never read — confirm this is intended.
const fatal_events: i16 = comptime @intCast(posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL);
if (pfd.revents & fatal_events != 0) {
self.dropCdp(link, null, true);
any_removed = true;
continue;
}
if (pfd.revents & posix.POLL.IN == 0) {
continue;
}
// One read of at most 16 KB per link per iteration.
var buf: [16 * 1024]u8 = undefined;
const n = posix.read(link.socket, &buf) catch |err| switch (err) {
error.WouldBlock => continue,
else => {
lp.log.warn(.cdp, "CDP read", .{ .err = err });
self.dropCdp(link, err, true);
any_removed = true;
continue;
},
};
if (n == 0) {
// peer EOF
self.dropCdp(link, null, true);
any_removed = true;
continue;
}
const keep = link.cdp.onData(buf[0..n]) catch |err| {
// Fatal frame/feed error. Whatever messages onData
// managed to push are still in the inbox; the failing
// frame was NOT pushed, and the worker has no way to
// know it should exit. Drop with notify=true so
// onLinkDisconnect pushes a .disconnect into the inbox.
// dropCdp wakes the worker.
lp.log.warn(.cdp, "CDP onData", .{ .err = err });
self.dropCdp(link, err, true);
any_removed = true;
continue;
};
// onData succeeded — wake the worker so it observes anything
// new in the inbox (data / ping / close).
link.handles.wakeup() catch |err| {
lp.log.warn(.cdp, "CDP link wakeup", .{ .err = err });
};
if (!keep) {
// onData returned false: the connection layer already pushed a
// terminal message (.close for a close frame, or .disconnect for
// invalid JSON), and the worker was woken just above, so we drop
// without notifying again.
self.dropCdp(link, null, false);
any_removed = true;
}
}
if (any_removed) {
self.cdp_unregister.broadcast();
}
}
pub fn run(self: *Network) void {
var drain_buf: [64]u8 = undefined;
var running_handles: c_int = 0;
@@ -450,6 +713,8 @@ pub fn run(self: *Network) void {
self.preparePollFds(multi);
}
self.prepareCdpPollFds();
// for ontick to work, you need to wake up periodically
const timeout = blk: {
const min_timeout = 250; // 250ms
@@ -491,6 +756,8 @@ pub fn run(self: *Network) void {
self.processCompletions(multi);
}
self.processCdpEvents();
self.fireTicks();
if (self.shutdown.load(.acquire) and running_handles == 0) {

View File

@@ -309,17 +309,26 @@ pub fn Reader(comptime EXPECT_MASK: bool, MAX_MESSAGE_SIZE: usize) type {
};
}
pub fn errorReply(err: NextError) ?[]const u8 {
// Map a reader error (or any error that flowed up out of one) to the
// matching server→client close frame. Takes anyerror so callers that
// hold the error in a wider type (e.g. ?anyerror across an inbox)
// don't need to narrow it first; unrecognized errors return null.
pub fn errorReply(err: anyerror) ?[]const u8 {
return switch (err) {
error.TooLarge => &CLOSE_TOO_BIG,
error.Masked => &CLOSE_PROTOCOL_ERROR,
error.NotMasked => &CLOSE_PROTOCOL_ERROR,
error.ReservedFlags => &CLOSE_PROTOCOL_ERROR,
error.InvalidMessageType => &CLOSE_PROTOCOL_ERROR,
error.ControlTooLarge => &CLOSE_PROTOCOL_ERROR,
error.InvalidContinuation => &CLOSE_PROTOCOL_ERROR,
error.NestedFragmentation => &CLOSE_PROTOCOL_ERROR,
error.OutOfMemory => null,
error.Masked,
error.NotMasked,
error.ReservedFlags,
error.InvalidMessageType,
error.ControlTooLarge,
error.InvalidContinuation,
error.NestedFragmentation,
// Strictly an application-level (CDP) error, but 1002
// "protocol error" is the closest fit and gives the peer a
// cleaner signal than a bare TCP FIN.
error.InvalidJSON,
=> &CLOSE_PROTOCOL_ERROR,
else => null,
};
}

View File

@@ -625,6 +625,13 @@ pub const Handles = struct {
try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null);
}
// Interrupt a poll() that is in progress on this multi handle. Meant to
// be called from the Network thread after it has pushed work onto the
// worker's inbox, so the worker leaves curl_multi_poll and sees it.
// Forwards to libcurl.curl_multi_wakeup and returns its error unchanged.
pub fn wakeup(self: *Handles) !void {
    return libcurl.curl_multi_wakeup(self.multi);
}
pub const MultiMessage = struct {
conn: *Connection,
err: ?Error,

View File

@@ -862,6 +862,15 @@ pub fn curl_multi_poll(
try errorMCheck(c.curl_multi_poll(multi, raw_fds, @intCast(extra_fds.len), timeout_ms, numfds));
}
// Wake a curl_multi_poll that is sleeping on another thread.
// libcurl documents this call as usable from any thread: it wakes a
// curl_multi_poll that is currently waiting or is about to wait.
// NOTE(review): the wake is presumably delivered through an internal
// wakeup descriptor, making it safe alongside curl_multi_perform on the
// same handle — confirm against the libcurl version in use.
// Any non-OK CURLMcode is translated to an ErrorMulti by errorMCheck.
pub fn curl_multi_wakeup(multi: *CurlM) ErrorMulti!void {
    const code = c.curl_multi_wakeup(multi);
    return errorMCheck(code);
}
pub fn curl_multi_waitfds(multi: *CurlM, ufds: []CurlWaitFd, fd_count: *c_uint) ErrorMulti!void {
const raw_fds: [*c]c.curl_waitfd = if (ufds.len == 0) null else @ptrCast(ufds.ptr);
try errorMCheck(c.curl_multi_waitfds(multi, raw_fds, @intCast(ufds.len), fd_count));