From 875c1477837d12e1f8a083eaad50fb0df9020fe4 Mon Sep 17 00:00:00 2001 From: Karl Seguin Date: Tue, 19 May 2026 13:58:48 +0800 Subject: [PATCH] Main/Network reads CDP socket Previously, the CDP socket was added to the worker's multi and fully owned by the worker. While this is simple, it introduced some issues: 1 - Cannot detect a disconnected client during JS processing ( for(;;) ) 2 - A blocked worker can cause back-pressure that blocks the client. This can cause a deadlock if the worker is blocked waiting for a CDP message In addition to these 2 problems, there was 1 other serious CDP-related issue: arbitrary CDP messages could be processed during JavaScript callback. For example, a Worker calls importScripts while request interception is enabled, this requires us to tick the HttpClient waiting for the interception response. But, a client could sent Target.closeTarget, which we'd process and delete the frame..all while importScripts is still blocked. Assuming importScripts unblocks everything is a big UAF since the frame (and its workers) were cleared from closeTarget. The CDP socket is now read from the network (main) thread and an OTP-style mailbox is used. The network thread posts message to the Worker's inbox and signals it to wakeup. This solves #1 and #2. It doesn't directly solve the reentrancy issue, but it provides the foundation. Specifically, in introduces a queue for of CDP message and more control over when/how that queue is processed. At "safe points" (Runner.tick, HttpClient.tick), any message can be processed. But, when inside a JavaScript callback, we can process only non- destructive/mutating message. Specifically, we can process only messages related to request interception. --- src/Inbox.zig | 322 ++++++++++++++++++++++++++++++ src/Server.zig | 114 ++++++----- src/browser/Browser.zig | 5 +- src/browser/HttpClient.zig | 268 +++++++++++++++++++------ src/browser/Runner.zig | 92 ++++----- src/browser/ScriptManagerBase.zig | 2 +- src/cdp/CDP.zig | 235 ++++++++++++++-------- src/cdp/Connection.zig | 138 ++++++++++--- src/main_legacy_test.zig | 2 +- src/network/Network.zig | 293 +++++++++++++++++++++++++-- src/network/WS.zig | 27 ++- src/network/http.zig | 7 + src/sys/libcurl.zig | 9 + 13 files changed, 1207 insertions(+), 307 deletions(-) create mode 100644 src/Inbox.zig diff --git a/src/Inbox.zig b/src/Inbox.zig new file mode 100644 index 00000000..ce9c7d78 --- /dev/null +++ b/src/Inbox.zig @@ -0,0 +1,322 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// Thread-safe FIFO of Messages. Producer pushes from one thread, +// consumer pops from another. No wake mechanism is bundled — callers +// arrange that themselves (e.g. curl_multi_wakeup on the consumer's +// curl multi handle). +// +// Backed by a DoublyLinkedList so that pop is O(1) and the +// allowlist-during-sync-wait drain can cherry-pick messages out of +// the middle in O(1) given a node pointer. + +const std = @import("std"); + +const CDP = @import("cdp/CDP.zig"); + +const ArenaPool = @import("ArenaPool.zig"); + +const Allocator = std.mem.Allocator; +const DoublyLinkedList = std.DoublyLinkedList; + +const Inbox = @This(); + +mutex: std.Thread.Mutex = .{}, +queue: DoublyLinkedList = .{}, + +pub fn deinit(self: *Inbox, arena_pool: *ArenaPool) void { + self.mutex.lock(); + defer self.mutex.unlock(); + while (self.queue.popFirst()) |node| { + const msg: *Message = @fieldParentPtr("node", node); + msg.deinit(arena_pool); + } +} + +pub fn push(self: *Inbox, arena: Allocator, payload: Message.Payload) void { + const msg = arena.create(Message) catch |err| switch (err) { + error.OutOfMemory => @panic("OOM"), + }; + + msg.* = .{ .payload = payload, .arena = arena }; + self.mutex.lock(); + defer self.mutex.unlock(); + self.queue.append(&msg.node); +} + +pub fn pop(self: *Inbox) ?*Message { + self.mutex.lock(); + defer self.mutex.unlock(); + const node = self.queue.popFirst() orelse return null; + return @fieldParentPtr("node", node); +} + +// Cherry-pick the first message for which `predicate(msg)` returns +// true, removing it from the queue. Walks the queue in FIFO order; +// non-matching messages stay in place. Used to dispatch only the +// safe subset of messages during sync-wait paths (the allowlist), +// while leaving unsafe ones to be drained at the next safe point. +pub fn popIf(self: *Inbox, predicate: *const fn (*Message) bool) ?*Message { + self.mutex.lock(); + defer self.mutex.unlock(); + var it = self.queue.first; + while (it) |node| : (it = node.next) { + const msg: *Message = @fieldParentPtr("node", node); + if (predicate(msg)) { + self.queue.remove(node); + return msg; + } + } + return null; +} + +pub const Message = struct { + arena: Allocator, + payload: Payload, + node: DoublyLinkedList.Node = .{}, + + pub const Payload = union(enum) { + // A CDP text/binary frame, parsed on the Network thread. `raw` + // is the original JSON bytes (owned). `arena` holds any + // auxiliary allocations from parseFromSliceLeaky (typically + // empty for unescaped messages, but slices in `input` may + // reference it). `input` is the parsed view; its string + // slices reference `raw` or `arena`. Both must outlive the + // consumer's use of `input`. + cdp: Cdp, + + // WS ping frame body (≤125 bytes per spec). Consumer is + // expected to echo via pong on its thread. + ping: []u8, + + // Peer-initiated close frame. Consumer is expected to send a + // close reply and tear the connection down. The peer's close + // body is dropped — historically we always reply CLOSE_NORMAL + // (status 1000) regardless of what the peer sent. + close: void, + + // No allocation; conveys "no more messages will arrive on + // this inbox" plus an optional reason. The Network thread + // pushes this on peer EOF, fatal WS framing error, or + // (now) JSON parse failure. + disconnect: ?anyerror, + }; + + pub const Cdp = struct { + raw: []u8, + input: CDP.InputMessage, + }; + + pub fn deinit(self: *const Message, pool: *ArenaPool) void { + pool.release(self.arena); + } +}; + +const testing = @import("testing.zig"); +test "Inbox: push then pop returns FIFO order" { + const arena_pool = &testing.test_app.arena_pool; + + var inbox = Inbox{}; + defer inbox.deinit(&testing.test_app.arena_pool); + + { + const arena = try arena_pool.acquire(.tiny, "inbox test"); + inbox.push(arena, .{ .ping = try arena.dupe(u8, "first") }); + } + + { + const arena = try arena_pool.acquire(.tiny, "inbox test"); + inbox.push(arena, .{ .ping = try arena.dupe(u8, "second") }); + } + + { + const arena = try arena_pool.acquire(.tiny, "inbox test"); + inbox.push(arena, .{ .disconnect = null }); + } + + { + const m = inbox.pop().?; + defer m.deinit(arena_pool); + try testing.expectEqual("first", m.payload.ping); + } + { + const m = inbox.pop().?; + defer m.deinit(arena_pool); + try testing.expectEqual("second", m.payload.ping); + } + { + const m = inbox.pop().?; + defer m.deinit(arena_pool); + try testing.expectEqual(@as(?anyerror, null), m.payload.disconnect); + } + try testing.expect(inbox.pop() == null); +} + +test "Inbox: deinit frees remaining items" { + const arena_pool = &testing.test_app.arena_pool; + + var inbox = Inbox{}; + { + const arena = try arena_pool.acquire(.tiny, "inbox test"); + inbox.push(arena, .{ .ping = try arena.dupe(u8, "leftover") }); + } + { + const arena = try arena_pool.acquire(.tiny, "inbox test"); + inbox.push(arena, .{ .disconnect = error.PeerClosed }); + } + + inbox.deinit(&testing.test_app.arena_pool); + // Memory leaks would be caught by the test runner. +} + +fn testAlwaysTrue(_: *Message) bool { + return true; +} + +fn testAlwaysFalse(_: *Message) bool { + return false; +} + +fn testIsPing(msg: *Message) bool { + return msg.payload == .ping; +} + +test "Inbox: popIf on empty queue returns null" { + var inbox = Inbox{}; + defer inbox.deinit(&testing.test_app.arena_pool); + try testing.expect(inbox.popIf(testAlwaysTrue) == null); +} + +test "Inbox: popIf with no match leaves queue intact" { + const arena_pool = &testing.test_app.arena_pool; + var inbox = Inbox{}; + defer inbox.deinit(arena_pool); + + { + const arena = try arena_pool.acquire(.tiny, "popif test"); + inbox.push(arena, .{ .ping = try arena.dupe(u8, "first") }); + } + { + const arena = try arena_pool.acquire(.tiny, "popif test"); + inbox.push(arena, .{ .ping = try arena.dupe(u8, "second") }); + } + + try testing.expect(inbox.popIf(testAlwaysFalse) == null); + + // Original FIFO order preserved. + { + const m = inbox.pop().?; + defer m.deinit(arena_pool); + try testing.expectEqual("first", m.payload.ping); + } + { + const m = inbox.pop().?; + defer m.deinit(arena_pool); + try testing.expectEqual("second", m.payload.ping); + } + try testing.expect(inbox.pop() == null); +} + +test "Inbox: popIf with always-true predicate behaves like pop" { + const arena_pool = &testing.test_app.arena_pool; + var inbox = Inbox{}; + defer inbox.deinit(arena_pool); + + { + const arena = try arena_pool.acquire(.tiny, "popif test"); + inbox.push(arena, .{ .ping = try arena.dupe(u8, "a") }); + } + { + const arena = try arena_pool.acquire(.tiny, "popif test"); + inbox.push(arena, .{ .ping = try arena.dupe(u8, "b") }); + } + + { + const m = inbox.popIf(testAlwaysTrue).?; + defer m.deinit(arena_pool); + try testing.expectEqual("a", m.payload.ping); + } + { + const m = inbox.popIf(testAlwaysTrue).?; + defer m.deinit(arena_pool); + try testing.expectEqual("b", m.payload.ping); + } + try testing.expect(inbox.popIf(testAlwaysTrue) == null); +} + +test "Inbox: popIf cherry-picks middle, preserves order of remainder" { + const arena_pool = &testing.test_app.arena_pool; + var inbox = Inbox{}; + defer inbox.deinit(arena_pool); + + { + const arena = try arena_pool.acquire(.tiny, "popif test"); + inbox.push(arena, .{ .disconnect = null }); + } + { + const arena = try arena_pool.acquire(.tiny, "popif test"); + inbox.push(arena, .{ .ping = try arena.dupe(u8, "middle") }); + } + { + const arena = try arena_pool.acquire(.tiny, "popif test"); + inbox.push(arena, .{ .disconnect = error.PeerClosed }); + } + + // testIsPing skips the disconnect at the head and picks the middle. + { + const m = inbox.popIf(testIsPing).?; + defer m.deinit(arena_pool); + try testing.expectEqual("middle", m.payload.ping); + } + + // Remaining two disconnects pop in original order. + { + const m = inbox.pop().?; + defer m.deinit(arena_pool); + try testing.expect(m.payload.disconnect == null); + } + { + const m = inbox.pop().?; + defer m.deinit(arena_pool); + try testing.expect(m.payload.disconnect.? == error.PeerClosed); + } + try testing.expect(inbox.pop() == null); +} + +test "Inbox: popIf picks first match in FIFO order" { + const arena_pool = &testing.test_app.arena_pool; + var inbox = Inbox{}; + defer inbox.deinit(arena_pool); + + { + const arena = try arena_pool.acquire(.tiny, "popif test"); + inbox.push(arena, .{ .ping = try arena.dupe(u8, "first") }); + } + { + const arena = try arena_pool.acquire(.tiny, "popif test"); + inbox.push(arena, .{ .disconnect = null }); + } + { + const arena = try arena_pool.acquire(.tiny, "popif test"); + inbox.push(arena, .{ .ping = try arena.dupe(u8, "second") }); + } + + const m = inbox.popIf(testIsPing).?; + defer m.deinit(arena_pool); + try testing.expectEqual("first", m.payload.ping); +} diff --git a/src/Server.zig b/src/Server.zig index 66ab5477..0c2d7a85 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -72,7 +72,9 @@ pub fn shutdown(self: *Server) void { for (self.cdps.items) |cdp| { if (cdp.conn.state == .live) { cdp.browser.env.terminate(); - cdp.conn.sendClose(); + // We use to send a nice WS close frame here but (a) it isn't + // strictly required and (b) we'd have to protect against an interleaved + // write from the worker thread. } cdp.conn.shutdown(); } @@ -166,13 +168,16 @@ fn handleConnection(self: *Server, socket: posix.socket_t) void { defer _ = self.active_threads.fetchSub(1, .monotonic); defer posix.close(socket); - // CDP is HUGE (> 512KB) because Connection has a large read buffer. - // V8 crashes if this is on the stack (likely related to its size). - const cdp = self.allocCDP() catch |err| { - log.err(.app, "CDP alloc", .{ .err = err }); - return; + const cdp = blk: { + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + break :blk self.cdp_pool.create() catch @panic("OOM"); }; - defer self.releaseCDP(cdp); + defer { + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + self.cdp_pool.destroy(cdp); + } cdp.init(self.app, socket, self.json_version_response) catch |err| { log.err(.app, "CDP init", .{ .err = err }); @@ -185,18 +190,63 @@ fn handleConnection(self: *Server, socket: posix.socket_t) void { log.info(.app, "client connected", .{ .ip = client_address }); } - self.registerCDP(cdp); - defer self.unregisterCDP(cdp); + { + // track the connection + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + self.cdps.append(self.app.allocator, cdp) catch {}; + } + + defer { + // untrack the connection + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + for (self.cdps.items, 0..) |c, i| { + if (c == cdp) { + _ = self.cdps.swapRemove(i); + break; + } + } + } const upgraded = cdp.conn.handshake() catch |err| { log.err(.app, "CDP handshake", .{ .err = err }); return; }; + if (!upgraded) { return; } - self.markLive(cdp); + { + // Transition from .handshake state to .live + // Lock needed even though the main thread hasn't seen this yet because + // shutdown could access this from the sighandler thread. + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + cdp.conn.state = .live; + } + + // Hand the read side of the CDP socket over to the Network thread. + // From here until the matching unregisterCdp, the worker must NOT + // read from the socket directly — bytes arrive via the inbox. + // unregisterCdp is synchronous, so by the time it returns Network + // is guaranteed to be done with this link. + // + // cdp_link_active gates HttpClient.perform's block in + // curl_multi_poll: with it false (tests, pre-handshake), perform + // skips the poll when there's no in-flight curl work — sleeping + // would just eat the timeout waiting for a wakeup that won't + // come. We set it true *after* registerCdp so Network is already + // accepting wakeups by the time the worker might poll, and clear + // it *after* unregisterCdp returns (Network is guaranteed done + // with us by then). + self.app.network.registerCdp(&cdp.link); + cdp.browser.http_client.cdp_link_active = true; + defer { + self.app.network.unregisterCdp(&cdp.link); + cdp.browser.http_client.cdp_link_active = false; + } // Check shutdown after markLive so that a concurrent shutdown either // sees us as .live and terminates us, or we observe the stop signal @@ -214,44 +264,6 @@ fn handleConnection(self: *Server, socket: posix.socket_t) void { } } -fn allocCDP(self: *Server) !*CDP { - self.cdp_mutex.lock(); - defer self.cdp_mutex.unlock(); - return self.cdp_pool.create(); -} - -fn releaseCDP(self: *Server, cdp: *CDP) void { - self.cdp_mutex.lock(); - defer self.cdp_mutex.unlock(); - self.cdp_pool.destroy(cdp); -} - -fn registerCDP(self: *Server, cdp: *CDP) void { - self.cdp_mutex.lock(); - defer self.cdp_mutex.unlock(); - self.cdps.append(self.app.allocator, cdp) catch {}; -} - -fn unregisterCDP(self: *Server, cdp: *CDP) void { - self.cdp_mutex.lock(); - defer self.cdp_mutex.unlock(); - for (self.cdps.items, 0..) |c, i| { - if (c == cdp) { - _ = self.cdps.swapRemove(i); - break; - } - } -} - -fn markLive(self: *Server, cdp: *CDP) void { - self.cdp_mutex.lock(); - defer self.cdp_mutex.unlock(); - cdp.conn.state = .live; -} - -// Utils -// -------- - fn buildJSONVersionResponse(app: *const App, port: u16) ![]const u8 { const host = app.config.advertiseHost(); if (std.mem.eql(u8, host, "0.0.0.0")) { @@ -283,9 +295,6 @@ fn buildJSONVersionResponse(app: *const App, port: u16) ![]const u8 { return try std.fmt.allocPrint(app.allocator, response_format, .{ body_len, host, port }); } -pub const timestamp = @import("datetime.zig").timestamp; -pub const milliTimestamp = @import("datetime.zig").milliTimestamp; - const testing = @import("testing.zig"); test "server: buildJSONVersionResponse" { const res = try buildJSONVersionResponse(testing.test_app, testing.test_app.config.port()); @@ -384,6 +393,9 @@ test "Client: http valid handshake" { } test "Client: read invalid websocket message" { + const filter: testing.LogFilter = .init(&.{.cdp}); + defer filter.deinit(); + // 131 = 128 (fin) | 3 where 3 isn't a valid type try assertWebSocketError( 1002, diff --git a/src/browser/Browser.zig b/src/browser/Browser.zig index 99f6fa1c..cf14fb8f 100644 --- a/src/browser/Browser.zig +++ b/src/browser/Browser.zig @@ -23,6 +23,7 @@ const Allocator = std.mem.Allocator; const js = @import("js/js.zig"); const App = @import("../App.zig"); const HttpClient = @import("HttpClient.zig"); +const CDP = @import("../cdp/CDP.zig"); const ArenaPool = App.ArenaPool; @@ -69,7 +70,7 @@ pub fn nextFrameId(self: *Browser) u32 { return id; } -pub fn init(self: *Browser, app: *App, opts: InitOpts, cdp_client: ?HttpClient.CDPClient) !void { +pub fn init(self: *Browser, app: *App, opts: InitOpts, cdp: ?*CDP) !void { const allocator = app.allocator; var env = try js.Env.init(app, opts.env); @@ -84,7 +85,7 @@ pub fn init(self: *Browser, app: *App, opts: InitOpts, cdp_client: ?HttpClient.C .http_client = undefined, .page_pool = std.heap.MemoryPool(Page).init(allocator), }; - try self.http_client.init(allocator, &app.network, cdp_client); + try self.http_client.init(allocator, &app.network, cdp); } pub fn deinit(self: *Browser) void { diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index de2b75fb..76eab06a 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -31,6 +31,8 @@ const http = @import("../network/http.zig"); const Robots = @import("../network/Robots.zig"); const Network = @import("../network/Network.zig"); +const CDP = @import("../cdp/CDP.zig"); +const Inbox = @import("../Inbox.zig"); const CachedResponse = @import("../network/cache/Cache.zig").CachedResponse; pub const CacheLayer = @import("../network/layer/CacheLayer.zig"); @@ -127,7 +129,29 @@ obey_robots: bool, user_agent_override: ?[:0]const u8 = null, user_agent_header_override: ?[:0]const u8 = null, -cdp_client: ?CDPClient = null, +// The CDP layer we dispatch inbox messages to. Set in CDP.init for +// `serve` mode; null in all other modes. Since this is set early, BEFORE the +// CDP socket is registered with the network thread, we also have the +// `cdp_link_active` boolean. +cdp: ?*CDP = null, + +// True iff a producer (Server.handleConnection, after the worker +// handshake completes) has registered the CDP socket with the Network +// thread and Network will fire curl_multi_wakeup on our multi handle +// when it pushes to the inbox. perform uses this — NOT `cdp != null` +// — to decide whether to block in poll without any in-flight curl +// work. cdp is set in CDP.init, well before the link is wired; tests +// and the pre-handshake window have a cdp but no producer, so polling +// there would just eat the timeout waiting for a wakeup that's never +// coming. +cdp_link_active: bool = false, + +// CDP messages parsed off the WS socket by the Network thread land +// here. perform drains the inbox at each safe point and dispatches +// via cdp.onMessage / onPing / onClose / onDisconnect. Always present +// even in non-CDP mode — the empty-queue drain is one mutex lock plus +// a linked-list head check, cheaper than nullability everywhere. +inbox: Inbox, max_response_size: usize, @@ -155,25 +179,7 @@ fn layerWith(self: anytype, next: Layer) Layer { return self.layer(); } -// libcurl can monitor arbitrary sockets, this lets us use libcurl to poll -// both HTTP data as well as messages from an CDP connection. -// Furthermore, we have some tension between blocking scripts and request -// interception. For non-blocking scripts, because nothing blocks, we can -// just queue the scripts until we receive a response to the interception -// notification. But for blocking scripts (which block the parser), it's hard -// to return control back to the CDP loop. So the `read` function pointer is -// used by the Client to have the CDP client read more data from the socket, -// specifically when we're waiting for a request interception response to -// a blocking script. -pub const CDPClient = struct { - ctx: *anyopaque, - socket: posix.socket_t, - blocking_read_start: *const fn (*anyopaque) bool, - blocking_read: *const fn (*anyopaque) bool, - blocking_read_end: *const fn (*anyopaque) bool, -}; - -pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client: ?CDPClient) !void { +pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP) !void { var handles = try http.Handles.init(network.config); errdefer handles.deinit(); @@ -183,7 +189,8 @@ pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client: .handles = handles, .network = network, .allocator = allocator, - .cdp_client = cdp_client, + .cdp = cdp, + .inbox = .{}, .use_proxy = http_proxy != null, .http_proxy = http_proxy, @@ -228,6 +235,7 @@ pub fn deinit(self: *Client) void { self.robots_layer.deinit(self.allocator); self.transfers.deinit(self.allocator); + self.inbox.deinit(self.arena_pool); } // Look up a live transfer by its id. Returns null if the transfer has been @@ -379,14 +387,29 @@ pub fn abortRequests(_: *Client, owner: *Owner) void { // owner itself is freed, no orphan transfer points at it. } -pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus { +// What CDP messages drainInbox is allowed to dispatch this tick. +// .all — outer event loop (Runner.tick). Safe to dispatch +// everything; the JS stack is empty. +// .sync_wait — reachable from inside a JS callback (syncRequest, +// waitForImport). The JS callstack above us holds +// refs to page / session / V8 state; dispatching a +// command that frees that state would UAF on unwind. +// Cherry-pick only Fetch interception responses +const DrainMode = enum { all, sync_wait }; + +pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void { try self.drainQueue(); - const status = try self.perform(@intCast(timeout_ms)); + try self.perform(@intCast(timeout_ms)); // perform/processMessages just released a batch of connections back to // the pool. Drain again so queued transfers can use them this tick // instead of waiting for the next runner iteration. try self.drainQueue(); - return status; + // Dispatch CDP messages here, not inside perform: perform recurses + // via processOneMessage's redirect path (perform → processMessages + // → processOneMessage → perform), and dispatching CDP from that + // nested call would fire CDP handlers mid-redirect, defeating the + // "safe points only" guarantee. + try self.drainInbox(mode); } fn drainQueue(self: *Client) !void { @@ -526,17 +549,7 @@ pub fn syncRequest(self: *Client, allocator: Allocator, req: Request) !SyncRespo try self.request(r, null); while (sync_ctx.completion == .in_progress) { - const status = try self.tick(200); - log.debug(.http, "sync request tick", .{ .status = status }); - switch (status) { - .cdp_socket => { - const cdp = self.cdp_client.?; - if (cdp.blocking_read(cdp.ctx) == false) { - return error.ClientDisconnected; - } - }, - .normal => continue, - } + try self.tick(200, .sync_wait); } switch (sync_ctx.completion) { @@ -631,12 +644,7 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer _ = try self.handles.perform(); } -pub const PerformStatus = enum { - cdp_socket, - normal, -}; - -fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus { +fn perform(self: *Client, timeout_ms: c_int) anyerror!void { const running = blk: { self.performing = true; defer self.performing = false; @@ -659,29 +667,82 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus { try self.trackConn(conn); } - // We're potentially going to block for a while until we get data. Process - // whatever messages we have waiting ahead of time. - if (try self.processMessages()) { - return .normal; - } + // Process completions queued from the curl_multi_perform above before + // we potentially block. + _ = try self.processMessages(); - var status = PerformStatus.normal; - if (self.cdp_client) |cdp_client| { - var wait_fds = [_]http.WaitFd{.{ - .fd = cdp_client.socket, - .events = .{ .pollin = true }, - .revents = .{}, - }}; - try self.handles.poll(&wait_fds, timeout_ms); - if (wait_fds[0].revents.pollin or wait_fds[0].revents.pollpri or wait_fds[0].revents.pollout) { - status = .cdp_socket; - } - } else if (running > 0) { + // Poll for HTTP I/O. The Network thread will call curl_multi_wakeup + // on our multi handle whenever it pushes to our inbox, so we drop + // out of poll promptly even when we have no curl handles in flight + // — but ONLY if a producer is actually wired up. `cdp_link_active` + // is set by Server.handleConnection once network.registerCdp has + // returned; in tests (which never register) and during the + // pre-handshake window the flag stays false and we don't waste a + // poll timeout waiting for a wakeup that won't arrive. + if (running > 0 or self.cdp_link_active) { + // when cdp_link_active == true, the network thread will unblock this + // by calling wakup on our multi. try self.handles.poll(&.{}, timeout_ms); } _ = try self.processMessages(); - return status; +} + +// Drain any CDP messages the Network thread pushed into our inbox +// and dispatch them via the cdp_client callbacks. Returns +// error.ClientDisconnected if the inbox surfaced a disconnect message, +// so the worker loop can tear down the connection. Called from tick +// only — NOT from perform, because perform recurses through +// processOneMessage's redirect path. +fn drainInbox(self: *Client, mode: DrainMode) !void { + const cdp = self.cdp orelse return; + while (true) { + const msg = switch (mode) { + .all => self.inbox.pop(), + .sync_wait => self.inbox.popIf(allowDuringSyncWait), + } orelse return; + + defer msg.deinit(self.arena_pool); + + switch (msg.payload) { + .cdp => |*c| cdp.onMessage(c) catch |err| { + // A single malformed/failed dispatch shouldn't poison + // the rest of the batch — log and continue. + log.warn(.cdp, "CDP dispatch", .{ .err = err }); + }, + .ping => |body| cdp.onPing(body), + .close => { + cdp.onClose(); + cdp.onDisconnect(null); + return error.ClientDisconnected; + }, + .disconnect => |err| { + cdp.onDisconnect(err); + return error.ClientDisconnected; + }, + } + } +} + +// Predicate for Inbox.popIf during sync_wait drains. Always allows +// ping/close/disconnect (control frames must be observed). CDP data +// messages are filtered: only the four Fetch interception methods +// are safe to dispatch from inside a JS callback (they mutate +// transfer state via InterceptionLayer; they don't touch page / +// session / V8 state). The check is exact on the parsed `method` +// field — no substring matching against raw JSON. +fn allowDuringSyncWait(msg: *Inbox.Message) bool { + return switch (msg.payload) { + .ping, .close, .disconnect => true, + .cdp => |c| isFetchInterceptionMethod(c.input.method), + }; +} + +fn isFetchInterceptionMethod(method: []const u8) bool { + return std.mem.eql(u8, method, "Fetch.continueRequest") or + std.mem.eql(u8, method, "Fetch.failRequest") or + std.mem.eql(u8, method, "Fetch.fulfillRequest") or + std.mem.eql(u8, method, "Fetch.continueWithAuth"); } fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *Transfer) !bool { @@ -1670,3 +1731,92 @@ pub const Owner = struct { self.websockets.remove(&ws._owner_node); } }; + +const testing = @import("../testing.zig"); + +test "HttpClient: isFetchInterceptionMethod matches the four Fetch methods" { + try testing.expect(isFetchInterceptionMethod("Fetch.continueRequest")); + try testing.expect(isFetchInterceptionMethod("Fetch.failRequest")); + try testing.expect(isFetchInterceptionMethod("Fetch.fulfillRequest")); + try testing.expect(isFetchInterceptionMethod("Fetch.continueWithAuth")); +} + +test "HttpClient: isFetchInterceptionMethod rejects unrelated methods" { + try testing.expect(!isFetchInterceptionMethod("")); + try testing.expect(!isFetchInterceptionMethod("Fetch.enable")); + try testing.expect(!isFetchInterceptionMethod("Fetch.disable")); + try testing.expect(!isFetchInterceptionMethod("Page.navigate")); + try testing.expect(!isFetchInterceptionMethod("Runtime.evaluate")); + // strict-equality check: a prefix of a valid name must not match + try testing.expect(!isFetchInterceptionMethod("Fetch.continueReq")); + // trailing space, etc. + try testing.expect(!isFetchInterceptionMethod("Fetch.continueRequest ")); +} + +test "HttpClient: allowDuringSyncWait allows ping/close/disconnect" { + var ping_msg = Inbox.Message{ + .arena = testing.allocator, + .payload = .{ .ping = "" }, + }; + try testing.expect(allowDuringSyncWait(&ping_msg)); + + var close_msg = Inbox.Message{ + .arena = testing.allocator, + .payload = .close, + }; + try testing.expect(allowDuringSyncWait(&close_msg)); + + var disconnect_msg = Inbox.Message{ + .arena = testing.allocator, + .payload = .{ .disconnect = null }, + }; + try testing.expect(allowDuringSyncWait(&disconnect_msg)); + + var disconnect_err_msg = Inbox.Message{ + .arena = testing.allocator, + .payload = .{ .disconnect = error.PeerClosed }, + }; + try testing.expect(allowDuringSyncWait(&disconnect_err_msg)); +} + +test "HttpClient: allowDuringSyncWait allows only Fetch interception CDP methods" { + var raw_buf: [16]u8 = undefined; + + inline for ([_][]const u8{ + "Fetch.continueRequest", + "Fetch.failRequest", + "Fetch.fulfillRequest", + "Fetch.continueWithAuth", + }) |method| { + var msg = Inbox.Message{ + .arena = testing.allocator, + .payload = .{ .cdp = .{ + .raw = &raw_buf, + .input = .{ .method = method }, + } }, + }; + try testing.expect(allowDuringSyncWait(&msg)); + } +} + +test "HttpClient: allowDuringSyncWait denies non-Fetch CDP methods" { + var raw_buf: [16]u8 = undefined; + + inline for ([_][]const u8{ + "Page.navigate", + "Runtime.evaluate", + "Target.createTarget", + "Fetch.enable", + "Fetch.disable", + "", + }) |method| { + var msg = Inbox.Message{ + .arena = testing.allocator, + .payload = .{ .cdp = .{ + .raw = &raw_buf, + .input = .{ .method = method }, + } }, + }; + try testing.expect(!allowDuringSyncWait(&msg)); + } +} diff --git a/src/browser/Runner.zig b/src/browser/Runner.zig index 34c768f5..eef8f4db 100644 --- a/src/browser/Runner.zig +++ b/src/browser/Runner.zig @@ -53,19 +53,18 @@ pub const WaitOpts = struct { ms: u32, until: lp.Config.WaitUntil = .done, }; + pub fn wait(self: *Runner, opts: WaitOpts) !void { - _ = try self._wait(false, opts); + return self._wait(false, opts); } -pub const CDPWaitResult = enum { - done, - cdp_socket, -}; -pub fn waitCDP(self: *Runner, opts: WaitOpts) !CDPWaitResult { +pub fn waitCDP(self: *Runner, opts: WaitOpts) !void { return self._wait(true, opts); } -fn _wait(self: *Runner, comptime is_cdp: bool, opts: WaitOpts) !CDPWaitResult { +// Wait until either a parse-state / load goal is reached or `opts.ms` +// elapses. Returns as soon as _tick reports .done. +fn _wait(self: *Runner, comptime is_cdp: bool, opts: WaitOpts) !void { const session = self.session; const browser = session.browser; @@ -105,13 +104,26 @@ fn _wait(self: *Runner, comptime is_cdp: bool, opts: WaitOpts) !CDPWaitResult { const next_ms = switch (tick_result) { .ok => |next_ms| next_ms, - .done => return .done, - .cdp_socket => if (comptime is_cdp) return .cdp_socket else unreachable, + .done => done_blk: { + if (comptime is_cdp == false) { + return; + } + + // is_cdp keeps the loop alive past .done so the worker + // can observe CDP commands. We have nothing useful to do here + // but we can ask the http_client to wait for CDP messages. + const elapsed: u32 = @intCast(timer.read() / std.time.ns_per_ms); + if (elapsed >= opts.ms) { + return; + } + try self.http_client.tick(@min(opts.ms - elapsed, 200), .all); + break :done_blk 0; + }, }; const ms_elapsed: u32 = @intCast(timer.read() / std.time.ns_per_ms); if (ms_elapsed >= opts.ms) { - return .done; + return; } if (next_ms > 0) { std.Thread.sleep(std.time.ns_per_ms * next_ms); @@ -129,23 +141,10 @@ pub const TickResult = union(enum) { ok: u32, }; pub fn tick(self: *Runner, opts: TickOpts) !TickResult { - return switch (try self._tick(false, opts)) { - .ok => |ms| .{ .ok = ms }, - .done => .done, - .cdp_socket => unreachable, - }; + return self._tick(false, opts); } -pub const CDPTickResult = union(enum) { - done, - cdp_socket, - ok: u32, -}; -pub fn tickCDP(self: *Runner, opts: TickOpts) !CDPTickResult { - return self._tick(true, opts); -} - -fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult { +fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !TickResult { // Refresh self.frame from session. In case of pending page, we want to // take its state while loading. If we use only the current frame, we will // return a .done result immediately. @@ -156,19 +155,14 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult { switch (frame._parse_state) { .pre, .raw, .text, .image => { // The main frame hasn't started/finished navigating. - // There's no JS to run, and no reason to run the scheduler. + // There's no JS to run, and no reason to run the scheduler + // — unless we're the CDP worker, in which case we want + // http_client.tick to drain the inbox. if (http_client.http_active == 0 and (comptime is_cdp) == false) { // haven't started navigating, I guess. return .done; } - - // Either we have active http connections, or we're in CDP - // mode with an extra socket. Either way, we're waiting - // for http traffic - const http_result = try http_client.tick(@intCast(opts.ms)); - if ((comptime is_cdp) and http_result == .cdp_socket) { - return .cdp_socket; - } + try http_client.tick(@intCast(opts.ms), .all); return .{ .ok = 0 }; }, .html, .complete => { @@ -212,11 +206,7 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult { }, } - if (http_active == 0 and http_client.ws_active == 0 and http_client.queue.first == null and http_client.ready_queue.first == null and (comptime is_cdp == false)) { - // we don't need to consider http_client.intercepted here - // because is_cdp is false, and that can only be - // the case when interception isn't possible. - // + if (http_active == 0 and http_client.ws_active == 0 and http_client.queue.first == null and http_client.ready_queue.first == null and (comptime is_cdp) == false) { // ready_queue is also part of the check: makeRequest now // wraps its handles.perform() in a performing=true window, // and any synchronous libcurl callback that ends up @@ -224,9 +214,10 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult { // a WebSocket) will append to ready_queue. Without this // check we could observe it non-empty after // http_client.tick returns. - // we don't need to consider http_client.intercepted here - // because is_cdp is false, and that can only be - // the case when interception isn't possible. + // + // intercepted is only non-zero in serve mode, and + // serve mode implies cdp_client != null — so if we got + // here, intercepted == 0. if (comptime IS_DEBUG) { std.debug.assert(http_client.interception_layer.intercepted == 0); } @@ -246,10 +237,9 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult { } // We're here because we either have active HTTP - // connections, or is_cdp == false (aka, there's - // an cdp_socket registered with the http client). - // We should continue to run tasks, so we minimize how long - // we'll poll for network I/O. + // connections, or there's a CDP client whose inbox we have + // to drain via http_client.tick. We should continue to run + // tasks, so we minimize how long we'll poll for network I/O. var ms_to_wait = @min(opts.ms, browser.msToNextMacrotask() orelse 200); if (ms_to_wait > 10 and browser.hasBackgroundTasks()) { // if we have background tasks, we don't want to wait too @@ -257,10 +247,7 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult { // to the top of the loop and run macrotasks. ms_to_wait = 10; } - const http_result = try http_client.tick(@intCast(@min(opts.ms, ms_to_wait))); - if ((comptime is_cdp) and http_result == .cdp_socket) { - return .cdp_socket; - } + try http_client.tick(@intCast(@min(opts.ms, ms_to_wait)), .all); return .{ .ok = 0 }; }, .err => |err| { @@ -269,10 +256,7 @@ fn _tick(self: *Runner, comptime is_cdp: bool, opts: TickOpts) !CDPTickResult { }, .raw_done => { if (comptime is_cdp) { - const http_result = try http_client.tick(@intCast(opts.ms)); - if (http_result == .cdp_socket) { - return .cdp_socket; - } + try http_client.tick(@intCast(opts.ms), .all); return .{ .ok = 0 }; } return .done; diff --git a/src/browser/ScriptManagerBase.zig b/src/browser/ScriptManagerBase.zig index fbaf2c6e..531d6c4d 100644 --- a/src/browser/ScriptManagerBase.zig +++ b/src/browser/ScriptManagerBase.zig @@ -300,7 +300,7 @@ pub fn waitForImport(self: *ScriptManagerBase, url: [:0]const u8) !ModuleSource while (true) { switch (entry.value_ptr.state) { .loading => { - _ = try client.tick(200); + _ = try client.tick(200, .sync_wait); continue; }, .done => |script| { diff --git a/src/cdp/CDP.zig b/src/cdp/CDP.zig index 62947555..292e0205 100644 --- a/src/cdp/CDP.zig +++ b/src/cdp/CDP.zig @@ -20,7 +20,10 @@ const std = @import("std"); const lp = @import("lightpanda"); const App = @import("../App.zig"); +const Inbox = @import("../Inbox.zig"); +const Network = @import("../network/Network.zig"); const Notification = @import("../Notification.zig"); +const WS = @import("../network/WS.zig"); const js = @import("../browser/js/js.zig"); const Browser = @import("../browser/Browser.zig"); const Session = @import("../browser/Session.zig"); @@ -56,6 +59,12 @@ conn: Connection, browser: Browser, allocator: Allocator, +// Network-thread read-side handle for the CDP socket. Populated in +// init; Server.handleConnection calls network.registerCdp(&cdp.link) +// after the worker-side handshake completes, and unregisterCdp before +// teardown. +link: Network.CdpLink, + // when true, any target creation must be attached. target_auto_attach: bool = false, @@ -88,6 +97,7 @@ pub fn init( self.* = .{ .app = app, + .link = undefined, .conn = undefined, .browser = undefined, .allocator = allocator, @@ -98,16 +108,18 @@ pub fn init( .browser_context_arena = std.heap.ArenaAllocator.init(allocator), }; - try self.conn.init(socket, self.app.allocator, json_version_response); + try self.browser.init(app, .{ .env = .{ .with_inspector = true } }, self); + const http_client = &self.browser.http_client; + + try self.conn.init(allocator, socket, json_version_response, &http_client.inbox, &app.arena_pool); errdefer self.conn.deinit(); - try self.browser.init(app, .{ .env = .{ .with_inspector = true } }, .{ - .ctx = self, + self.link = .{ + .cdp = self, + .state = .live, .socket = socket, - .blocking_read_start = CDP.blockingReadStart, - .blocking_read = CDP.blockingRead, - .blocking_read_end = CDP.blockingReadStop, - }); + .handles = http_client.handles, + }; } pub fn deinit(self: *CDP) void { @@ -121,101 +133,151 @@ pub fn deinit(self: *CDP) void { self.browser_context_arena.deinit(); self.conn.deinit(); } - -pub fn blockingReadStart(ctx: *anyopaque) bool { - const self: *CDP = @ptrCast(@alignCast(ctx)); - self.conn.setBlocking(true) catch |err| { - log.warn(.app, "CDP blockingReadStart", .{ .err = err }); - return false; - }; - return true; -} - -pub fn blockingRead(ctx: *anyopaque) bool { - const self: *CDP = @ptrCast(@alignCast(ctx)); - return self.readSocket(); -} - -pub fn blockingReadStop(ctx: *anyopaque) bool { - const self: *CDP = @ptrCast(@alignCast(ctx)); - self.conn.setBlocking(false) catch |err| { - log.warn(.app, "CDP blockingReadStop", .{ .err = err }); - return false; - }; - return true; -} - -pub fn readSocket(self: *CDP) bool { - const n = self.conn.read() catch |err| { - log.warn(.app, "CDP read", .{ .err = err }); - return false; - }; - - if (n == 0) { - log.info(.app, "CDP disconnect", .{}); - return false; +// Called by Network when readable bytes arrive on the CDP socket. +// Feeds them through the WS framer and pushes each parsed frame into +// the worker's inbox. Returns false if a close frame was seen (or a +// fatal frame error) so Network drops the link from its poll set. +// +// One network read can carry more bytes than the reader's current +// free space — large CDP messages (Page.addScriptToEvaluateOnNewDocument, +// Runtime evaluation results, etc.) routinely exceed 16 KB, and a +// single read can contain multiple messages, or part of messages. We loop: feed +// what fits, run processMessages (which extracts complete frames, compacts the +// reader, and grows the buffer if it sees a frame header larger than current +// capacity), repeat until the chunk is drained. +pub fn onData(self: *CDP, data: []const u8) anyerror!bool { + var remaining = data; + while (remaining.len > 0) { + const n = self.conn.feedBytes(remaining); + remaining = remaining[n..]; + if ((try self.conn.processMessages()) == false) { + return false; + } } - - return self.conn.processMessages(self) catch false; -} - -pub fn sendJSON(self: *CDP, message: anytype) !void { - try self.conn.sendJSON(message, .{ .emit_null_optional_fields = false }); -} - -pub fn handleMessage(self: *CDP, msg: []const u8) bool { - // if there's an error, it's already been logged - self.processMessage(msg) catch return false; return true; } +// Called by Network when it drops the link unsolicited (peer EOF, read +// error, poll HUP/ERR). Push a disconnect into the inbox so the +// worker's drainInbox surfaces error.ClientDisconnected. +pub fn onLinkDisconnect(self: *CDP, err: ?anyerror) void { + const arena = self.browser.arena_pool.acquire(.tiny, "cdp disconnect") catch |e| switch (e) { + error.OutOfMemory => @panic("OOM"), + }; + + self.browser.http_client.inbox.push(arena, .{ .disconnect = err }); +} + +// Called in the Worker to dispatch a single CDP message bubbled up by +// HttpClient.drainInbox. The Network thread already parsed the JSON +// when it pushed the message to the inbox, so we skip straight to +// dispatchParsed without re-parsing. `c.raw` and `c.arena` outlive +// this call (they're owned by the inbox Message which drainInbox +// frees right after we return), so `c.input`'s string slices stay +// valid for the duration of dispatch. +pub fn onMessage(self: *CDP, c: *Inbox.Message.Cdp) anyerror!void { + const arena = &self.message_arena; + defer _ = arena.reset(.{ .retain_with_limit = 1024 * 16 }); + return self.dispatchParsed(arena.allocator(), .{ .cdp = self }, c.raw, c.input); +} + +// Parse + dispatch a raw JSON CDP message. Used by tests (which +// don't go through the Network thread / inbox pipeline) and by any +// caller that has bytes rather than a pre-parsed InputMessage. pub fn processMessage(self: *CDP, msg: []const u8) !void { const arena = &self.message_arena; defer _ = arena.reset(.{ .retain_with_limit = 1024 * 16 }); return self.dispatch(arena.allocator(), .{ .cdp = self }, msg); } -// @newhttp -// A bit hacky right now. The main server loop doesn't unblock for -// scheduled task. So we run this directly in order to process any -// timeouts (or http events) which are ready to be processed. -pub fn pageWait(self: *CDP, ms: u32) !Session.Runner.CDPWaitResult { +// Called in the worker when a PING message is received +pub fn onPing(self: *CDP, body: []const u8) void { + self.conn.sendPong(body) catch |err| { + log.warn(.app, "CDP pong", .{ .err = err }); + }; +} + +// Called in the Worker when a peer-initiated close with CLOSE_NORMAL. The worker +// loop tears down immediately after; drainInbox returns +// error.ClientDisconnected once we return. +pub fn onClose(self: *CDP) void { + self.conn.send(&WS.CLOSE_NORMAL) catch |err| { + log.warn(.app, "CDP close reply", .{ .err = err }); + }; +} + +// Called in the Worker when the peer disconnected (peer EOF, fatal frame error, +// or right after a peer close was replied to). drainInbox returns +// error.ClientDisconnected, which the worker loop catches to tear down. +// +// If `err` is a recognized WS framing error, send the matching close +// frame back to the peer before tearing down — that's how clients +// observe protocol violations (close code 1002 / 1009 / etc.). +pub fn onDisconnect(self: *CDP, err: ?anyerror) void { + if (err) |e| { + if (WS.errorReply(e)) |close_frame| { + self.conn.send(close_frame) catch {}; + } + log.warn(.cdp, "CDP disconnect", .{ .err = e }); + } else { + log.info(.cdp, "CDP disconnect", .{}); + } +} + +pub fn sendJSON(self: *CDP, message: anytype) !void { + try self.conn.sendJSON(message, .{ .emit_null_optional_fields = false }); +} + +pub fn tick(self: *CDP) !bool { + // Liveness is enforced by TCP keepalive configured in + // Server.configureSocket; the wakeup lets V8 run or terminate. + const wait_ms: u32 = 1000; // 1s + + self.pageWait(wait_ms) catch |wait_err| switch (wait_err) { + error.NoPage => { + // No active page yet (or a teardown is in flight). Fall + // back to ticking the http client directly so CDP messages + // still get dispatched. + self.browser.http_client.tick(wait_ms, .all) catch |err| switch (err) { + error.ClientDisconnected => return false, + else => { + log.err(.app, "http tick", .{ .err = err }); + return false; + }, + }; + }, + error.ClientDisconnected => return false, + else => return wait_err, + }; + return true; +} + +fn pageWait(self: *CDP, ms: u32) !void { const session = &(self.browser.session orelse return error.NoPage); var runner = try session.runner(.{}); return runner.waitCDP(.{ .ms = ms }); } -pub fn tick(self: *CDP) !bool { - // Liveness is enforced by TCP keepalive configured in - // Network.acceptConnections; the wakeup lets V8 run or terminate. - const wait_ms: u32 = 1000; // 1s - - const result = self.pageWait(wait_ms) catch |wait_err| switch (wait_err) { - error.NoPage => { - const status = self.browser.http_client.tick(wait_ms) catch |err| { - log.err(.app, "http tick", .{ .err = err }); - return false; - }; - return status != .cdp_socket or self.readSocket(); - }, - else => return wait_err, - }; - - if (result == .cdp_socket) { - return self.readSocket(); - } - - return true; -} - -// Called from above, in processMessage which handles client messages -// but can also be called internally. For example, Target.sendMessageToTarget -// calls back into dispatch to capture the response. +// Parse-then-dispatch entry point. Used by: +// - CDP.processMessage (tests, and any other caller that hands us +// raw JSON bytes). +// - Target.sendMessageToTarget (a CDP command that wraps another +// CDP message as a string parameter and forwards it through the +// dispatch table). +// The normal Network-thread path doesn't go through here — it parses +// once on the Network thread and reaches dispatchParsed directly via +// onMessage. pub fn dispatch(self: *CDP, arena: Allocator, sender: Command.Sender, str: []const u8) !void { const input = json.parseFromSliceLeaky(InputMessage, arena, str, .{ .ignore_unknown_fields = true, }) catch return error.InvalidJSON; + return self.dispatchParsed(arena, sender, str, input); +} +// Dispatch a pre-parsed CDP message. The caller is responsible for +// keeping `str` and the backing storage for `input`'s string slices +// alive for the duration of the call. +fn dispatchParsed(self: *CDP, arena: Allocator, sender: Command.Sender, str: []const u8, input: InputMessage) !void { var command = Command{ .input = .{ .json = str, @@ -1150,8 +1212,11 @@ pub const Command = struct { }; // When we parse a JSON message from the client, this is the structure -// we always expect -const InputMessage = struct { +// we always expect. Parsed on the Network thread inside +// Connection.handleMessage; the slices reference the raw JSON bytes +// (or arena allocations for fields that needed unescaping). Both +// outlive the InputMessage for the inbox message's lifetime. +pub const InputMessage = struct { id: ?i64 = null, method: []const u8, params: ?InputParams = null, @@ -1162,7 +1227,7 @@ const InputMessage = struct { // capture the raw json object (including the opening and closing braces). // Then, when we're processing the message, and we know what type it is, we // can parse it (in Disaptch(T).params). -const InputParams = struct { +pub const InputParams = struct { raw: []const u8, pub fn jsonParse( diff --git a/src/cdp/Connection.zig b/src/cdp/Connection.zig index a6a2bcbd..05811414 100644 --- a/src/cdp/Connection.zig +++ b/src/cdp/Connection.zig @@ -20,8 +20,12 @@ const std = @import("std"); const lp = @import("lightpanda"); const builtin = @import("builtin"); +const CDP = @import("CDP.zig"); + +const Inbox = @import("../Inbox.zig"); const Config = @import("../Config.zig"); const WS = @import("../network/WS.zig"); +const ArenaPool = @import("../ArenaPool.zig"); const log = lp.log; const posix = std.posix; @@ -32,6 +36,9 @@ pub const Connection = @This(); pub const State = enum { handshaking, live }; +// reference to http_client.inbox +inbox: *Inbox, +arena_pool: *ArenaPool, socket: posix.socket_t, socket_flags: usize, state: State = .handshaking, @@ -41,9 +48,11 @@ json_version_response: []const u8, pub fn init( self: *Connection, - socket: posix.socket_t, allocator: Allocator, + socket: posix.socket_t, json_version_response: []const u8, + inbox: *Inbox, + arena_pool: *ArenaPool, ) !void { const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0); const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true })); @@ -52,7 +61,9 @@ pub fn init( } self.* = .{ + .inbox = inbox, .socket = socket, + .arena_pool = arena_pool, .socket_flags = socket_flags, .reader = try .init(allocator), .send_arena = ArenaAllocator.init(allocator), @@ -104,7 +115,7 @@ pub fn send(self: *Connection, data: []const u8) !void { } } -fn sendPong(self: *Connection, data: []const u8) !void { +pub fn sendPong(self: *Connection, data: []const u8) !void { if (data.len == 0) { return self.send(&WS.EMPTY_PONG); } @@ -157,15 +168,15 @@ pub fn handshake(self: *Connection) !bool { }}; const n = try posix.poll(&pfds, 5000); if (n == 0) { - log.info(.app, "CDP handshake timeout", .{}); + log.info(.cdp, "CDP handshake timeout", .{}); return false; } const read_bytes = self.read() catch |err| { - log.warn(.app, "CDP read", .{ .err = err }); + log.warn(.cdp, "CDP read", .{ .err = err }); return false; }; if (read_bytes == 0) { - log.info(.app, "CDP disconnect", .{}); + log.info(.cdp, "CDP disconnect", .{}); return false; } const result = self.processHttpRequest() catch return false; @@ -183,17 +194,28 @@ pub fn read(self: *Connection) !usize { return n; } -// Append pre-read bytes (from the network thread) to the reader. -// Used post-handshake when the network thread owns socket reads and -// hands bytes back via the HttpClient inbox. Returns BufferTooSmall -// if the reader's free space can't hold this chunk — caller is -// expected to chunk reads to fit (Network reads in 16 KB chunks -// which matches the reader's initial capacity). -pub fn feedBytes(self: *Connection, data: []const u8) !void { +// Append as many bytes as fit into the reader's free space. Returns +// the number of bytes copied. Used post-handshake when the network +// thread owns socket reads. +// +// Why partial: a single network read can carry more bytes than the +// reader's current free space (e.g. one large pending frame plus the +// start of another). The caller is expected to loop: +// +// while (remaining.len > 0) { +// const n = conn.feedBytes(remaining); +// remaining = remaining[n..]; +// _ = try conn.processMessages(); // extracts frames + compacts +// // processMessages also grows the reader buffer if it sees a +// // frame header bigger than the current capacity, so the next +// // feedBytes call has somewhere to land. +// } +pub fn feedBytes(self: *Connection, data: []const u8) usize { const dst = self.reader.readBuf(); - if (data.len > dst.len) return error.BufferTooSmall; - @memcpy(dst[0..data.len], data); - self.reader.len += data.len; + const n = @min(data.len, dst.len); + @memcpy(dst[0..n], data[0..n]); + self.reader.len += n; + return n; } fn processHttpRequest(self: *Connection) !HttpResult { @@ -282,30 +304,34 @@ const empty_json_list_response = "Content-Type: application/json; charset=UTF-8\r\n\r\n" ++ "[]"; -pub fn processMessages(self: *Connection, handler: anytype) !bool { +// Framing-only iteration over received bytes. processMessages no +// longer auto-replies pong/close or sends close-on-error — the Network +// thread runs this loop and is read-only on the socket. +// +// Returns false if a close frame was seen (caller should drop the +// link) or the handler asked to stop; true if the loop exited because +// there were no more complete frames buffered. +pub fn processMessages(self: *Connection) !bool { var reader = &self.reader; while (true) { - const msg = (reader.next() catch |err| { - if (WS.errorReply(err)) |error_reply| { - self.send(error_reply) catch {}; - } - return err; - }) orelse break; + const msg = (try reader.next()) orelse break; - switch (msg.type) { - .pong => {}, - .ping => try self.sendPong(msg.data), - .close => { - self.send(&WS.CLOSE_NORMAL) catch {}; - return false; + const keep = switch (msg.type) { + .pong => true, + .ping, .text, .binary => try self.handleMessage(msg), + .close => blk: { + _ = try self.handleMessage(msg); + break :blk false; }, - .text, .binary => if (handler.handleMessage(msg.data) == false) { - return false; - }, - } + }; + if (msg.cleanup_fragment) { reader.cleanup(); } + + if (!keep) { + return false; + } } // We might have read part of the next message. Our reader potentially @@ -314,6 +340,54 @@ pub fn processMessages(self: *Connection, handler: anytype) !bool { return true; } +fn handleMessage(self: *Connection, msg: WS.Message) !bool { + switch (msg.type) { + .text, .binary => return self.pushCdp(msg.data), + .ping => { + const arena = try self.arena_pool.acquire(.tiny, "cdp ping"); + errdefer self.arena_pool.release(arena); + self.inbox.push(arena, .{ .ping = try arena.dupe(u8, msg.data) }); + return true; + }, + .close => { + const arena = try self.arena_pool.acquire(.tiny, "cdp close"); + self.inbox.push(arena, .close); + return true; + }, + .pong => unreachable, // processMessages skips pong + } +} + +// Parse a CDP JSON frame on the Network thread and push it onto the +// inbox already-parsed. The consumer's allowlist check works on +// `input.method` directly (no substring matching against raw JSON), +// and the worker doesn't re-parse on dispatch. On parse failure we +// push `.disconnect(error.InvalidJSON)` so the worker tears down — +// treated the same way as a fatal WS framing error. +fn pushCdp(self: *Connection, bytes: []const u8) !bool { + // TODO: is it worth trying to pad this for the cost overhead of parsing? + const arena = try self.arena_pool.acquire(bytes.len, "cdp data"); + errdefer self.arena_pool.release(arena); + + const raw = try arena.dupe(u8, bytes); + + const input = std.json.parseFromSliceLeaky( + CDP.InputMessage, + arena, + raw, + .{ .ignore_unknown_fields = true }, + ) catch { + self.inbox.push(arena, .{ .disconnect = error.InvalidJSON }); + return false; + }; + + self.inbox.push(arena, .{ .cdp = .{ + .raw = raw, + .input = input, + } }); + return true; +} + pub fn upgrade(self: *Connection, request: []u8) !void { // our caller already confirmed that we have a trailing \r\n\r\n const request_line_end = std.mem.indexOfScalar(u8, request, '\r') orelse unreachable; diff --git a/src/main_legacy_test.zig b/src/main_legacy_test.zig index 6274a31c..bd77b5fa 100644 --- a/src/main_legacy_test.zig +++ b/src/main_legacy_test.zig @@ -103,7 +103,7 @@ pub fn run(allocator: Allocator, file: []const u8, session: *lp.Session) !void { try frame.navigate(url, .{}); var runner = try session.runner(.{}); - try runner.wait(.{ .ms = 2000 }); + try runner.wait(false, .{ .ms = 2000 }); ls.local.eval("testing.assertOk()", "testing.assertOk()") catch |err| { const caught = try_catch.caughtOrError(allocator, err); diff --git a/src/network/Network.zig b/src/network/Network.zig index 8d8234ab..968e9425 100644 --- a/src/network/Network.zig +++ b/src/network/Network.zig @@ -20,12 +20,10 @@ const std = @import("std"); const lp = @import("lightpanda"); const builtin = @import("builtin"); -const log = lp.log; -const net = std.net; -const posix = std.posix; -const Allocator = std.mem.Allocator; - +const App = @import("../App.zig"); const Config = @import("../Config.zig"); + +const CDP = @import("../cdp/CDP.zig"); const libcurl = @import("../sys/libcurl.zig"); const http = @import("http.zig"); @@ -36,7 +34,12 @@ const WebBotAuth = @import("WebBotAuth.zig"); const Cache = @import("cache/Cache.zig"); const FsCache = @import("cache/FsCache.zig"); -const App = @import("../App.zig"); +const log = lp.log; +const net = std.net; +const posix = std.posix; +const Allocator = std.mem.Allocator; +const DoublyLinkedList = std.DoublyLinkedList; + const Network = @This(); const Listener = struct { @@ -45,6 +48,34 @@ const Listener = struct { onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void, }; +// Read side of a CDP WebSocket, registered with the Network thread so +// bytes are read off the socket from here and dispatched into the CDP +// layer via direct method calls on `cdp`. Network never sends on the +// socket — the worker is the sole writer. After registerCdp returns, +// the worker must not call posix.read on this socket directly. +// unregisterCdp is synchronous: it blocks until Network confirms the +// link has been dropped from its poll set and won't touch it again. +pub const CdpLink = struct { + cdp: *CDP, + state: State, + socket: posix.socket_t, + // The worker's HttpClient.Handles (by value — it's one pointer + // wide). Network calls handles.wakeup() to unblock the worker + // from curl_multi_poll whenever it pushes to the worker's inbox. + handles: http.Handles, + node: DoublyLinkedList.Node = .{}, + + pub const State = enum { + live, + // Worker called unregisterCdp; Network will drop the link on + // its next loop iteration and signal cdp_unregister. + unregistering, + // Network has dropped the link from its poll set. The worker + // can safely free anything the link's callbacks closed over. + removed, + }; +}; + // Number of fixed pollfds entries (wakeup pipe + listener). const PSEUDO_POLLFDS = 2; @@ -53,14 +84,14 @@ const MAX_TICK_CALLBACKS = 16; allocator: Allocator, app: *App, +cache: ?Cache, config: *const Config, ca_blob: ?http.Blob, robot_store: RobotStore, web_bot_auth: ?WebBotAuth, -cache: ?Cache, connections: []http.Connection, -available: std.DoublyLinkedList = .{}, +available: DoublyLinkedList = .{}, conn_mutex: std.Thread.Mutex = .{}, ws_pool: std.heap.MemoryPool(http.Connection), @@ -82,12 +113,37 @@ shutdown: std.atomic.Value(bool) = .init(false), // When Network becomes truly shared, it should become a regular field. multi: ?*libcurl.CurlM = null, submission_mutex: std.Thread.Mutex = .{}, -submission_queue: std.DoublyLinkedList = .{}, +submission_queue: DoublyLinkedList = .{}, callbacks: [MAX_TICK_CALLBACKS]TickCallback = undefined, callbacks_len: usize = 0, callbacks_mutex: std.Thread.Mutex = .{}, +// Registered CDP read endpoints. Producer-side (the worker doing +// register/unregister) and consumer-side (this thread's run loop) are +// serialized by cdp_mutex. cdp_unregister signals when a link +// transitions to .removed so unregisterCdp can return. +cdp_links: DoublyLinkedList = .{}, +cdp_mutex: std.Thread.Mutex = .{}, +cdp_unregister: std.Thread.Condition = .{}, +// Per-iteration snapshot of CdpLinks whose sockets are in pollfds. +// Sized at maxConnections at init time so we never allocate inside +// run(). Parallel to pollfds[cdp_start..cdp_start + cdp_poll_count]. +// Persists across iterations; only rebuilt when `cdp_dirty` is set. +cdp_poll_snapshot: []?*CdpLink, +cdp_poll_count: usize = 0, + +// Set whenever the cdp_links list changes (register / unregister / +// natural drop). prepareCdpPollFds rebuilds the snapshot only when +// this is true; idle iterations skip the rebuild. Network run() ticks +// hundreds of times per second, and the link set is stable between +// connection lifecycle events, so the steady-state cost of the CDP +// poll prep is one mutex acquire + one bool read. +cdp_dirty: bool = false, + +// Location in pollfds where cdp sockets start +cdp_start: usize, + /// Optional IP filter for blocking requests to private/internal networks (--block-private-networks). ip_filter: ?*IpFilter = null, @@ -224,10 +280,25 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network { const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); - // 0 is wakeup, 1 is listener, rest for curl fds - const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent()); + // IMPORTANT: This is a bit messy, and it exists specifically because + // self.multi is optional. self.multi is optional so that, when telemetry is + // disabled, we don't need the overhead of a multi. If self.multi wasn't + // optional, then we wouldn't need to use posix.poll, we could use + // curl_multi_poll. This is to do in a follow up. + + // The structure is: 0 is wakeup, 1 is listener, rest for curl fds: + // [0] wakeup pipe + // [1] listener + // [PSEUDO_POLLFDS .. + httpMaxConcurrent] curl multi fds + // [.. + maxConnections] CDP socket fds + const max_cdp = config.maxConnections(); + const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent() + max_cdp); errdefer allocator.free(pollfds); + const cdp_poll_snapshot = try allocator.alloc(?*CdpLink, max_cdp); + errdefer allocator.free(cdp_poll_snapshot); + @memset(cdp_poll_snapshot, null); + @memset(pollfds, .{ .fd = -1, .events = 0, .revents = 0 }); pollfds[0] = .{ .fd = pipe[0], .events = posix.POLL.IN, .revents = 0 }; @@ -258,7 +329,7 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network { const connections = try allocator.alloc(http.Connection, count); errdefer allocator.free(connections); - var available: std.DoublyLinkedList = .{}; + var available: DoublyLinkedList = .{}; for (0..count) |i| { connections[i] = try http.Connection.init(ca_blob, config, ip_filter); available.append(&connections[i].node); @@ -292,15 +363,17 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network { .pollfds = pollfds, .wakeup_pipe = pipe, + .cdp_poll_snapshot = cdp_poll_snapshot, + .cdp_start = PSEUDO_POLLFDS + config.httpMaxConcurrent(), .available = available, .connections = connections, .app = app, + .cache = cache, .robot_store = RobotStore.init(allocator), .web_bot_auth = web_bot_auth, - .cache = cache, .ws_pool = .init(allocator), .ws_max = config.wsMaxConcurrent(), @@ -322,6 +395,7 @@ pub fn deinit(self: *Network) void { } self.allocator.free(self.pollfds); + self.allocator.free(self.cdp_poll_snapshot); if (self.ca_blob) |ca_blob| { const data: [*]u8 = @ptrCast(ca_blob.data); @@ -420,6 +494,195 @@ pub fn fireTicks(self: *Network) void { } } +// Hand a CDP WebSocket's read side over to the main network thread. The caller +// owns the link and must keep it alive until unregisterCdp is called. +// The caller must not read from the socket. +pub fn registerCdp(self: *Network, link: *CdpLink) void { + self.cdp_mutex.lock(); + self.cdp_links.append(&link.node); + self.cdp_dirty = true; + self.cdp_mutex.unlock(); + self.wakeupPoll(); +} + +// Synchronous teardown. Blocks the caller until this thread has +// dropped the link from its poll set and won't invoke any of the +// link's callbacks. Safe to call after Network has already dropped +// the link unsolicited (state == .removed) — returns immediately in +// that case. +pub fn unregisterCdp(self: *Network, link: *CdpLink) void { + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + if (link.state == .live) { + link.state = .unregistering; + self.cdp_dirty = true; + self.wakeupPoll(); + } + + while (link.state != .removed) { + // condition variable, waiting for a signal + self.cdp_unregister.wait(&self.cdp_mutex); + } +} + +// Drop a link from the poll set. Caller must hold cdp_mutex. +// - on_disconnect is fired iff `notify` is true. Set notify=false +// when the consumer already knows the link is dead (e.g. close +// frame just went through on_bytes; the .close message in the +// inbox is enough to wake the worker). +// - The worker is woken via curl_multi_wakeup either way. +fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void { + self.cdp_links.remove(&link.node); + link.state = .removed; + self.cdp_dirty = true; + if (notify) { + // notify=true means the worker hasn't been told yet — push the + // disconnect into the inbox and break it out of curl_multi_poll. + // notify=false paths have already woken the worker (close frame + // case) or are about to be unblocked via cdp_unregister.broadcast + // (unregister case); no extra wakeup needed. + link.cdp.onLinkDisconnect(err); + link.handles.wakeup() catch |e| { + lp.log.warn(.cdp, "CDP link wakeup", .{ .err = e }); + }; + } +} + +// Build the CDP portion of pollfds and snapshot the matching *CdpLink +// pointers so we can correlate revents after poll() returns. Called +// before poll, under cdp_mutex. +fn prepareCdpPollFds(self: *Network) void { + const cdp_start = self.cdp_start; + + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + + // Idle fast-path: link set unchanged since last rebuild, so the + // snapshot + pollfds entries from the previous iteration are still + // correct. Kernel will overwrite `revents` in the next poll() call. + if (!self.cdp_dirty) { + return; + } + self.cdp_dirty = false; + + @memset(self.pollfds[cdp_start..], .{ .fd = -1, .events = 0, .revents = 0 }); + + var i: usize = 0; + var it = self.cdp_links.first; + while (it) |node| : (it = node.next) { + lp.assert(i < self.cdp_poll_snapshot.len, "CDP poll snapshot overflow", .{ .i = i, .len = self.cdp_poll_snapshot.len }); + const link: *CdpLink = @fieldParentPtr("node", node); + if (link.state != .live) { + // Will be handled in processCdpEvents; don't poll its fd. + continue; + } + + self.pollfds[cdp_start + i] = .{ + .fd = link.socket, + .events = posix.POLL.IN, + .revents = 0, + }; + self.cdp_poll_snapshot[i] = link; + i += 1; + } + self.cdp_poll_count = i; +} + +// Per-iteration CDP handling: process pending unregistrations, then +// process revents on each polled link. Called after poll(). +fn processCdpEvents(self: *Network) void { + var any_removed = false; + const cdp_start = self.cdp_start; + + self.cdp_mutex.lock(); + defer self.cdp_mutex.unlock(); + + // First pass: pending unregister requests. + var it = self.cdp_links.first; + while (it) |node| { + const next = node.next; + const link: *CdpLink = @fieldParentPtr("node", node); + if (link.state == .unregistering) { + self.dropCdp(link, null, false); + any_removed = true; + } + it = next; + } + + // Second pass: revents on the snapshot. Skip links the first pass + // (or a prior natural drop) has already removed. + for (self.cdp_poll_snapshot[0..self.cdp_poll_count], 0..) |link_opt, i| { + const link = link_opt orelse continue; + if (link.state != .live) { + continue; + } + const pfd = self.pollfds[cdp_start + i]; + if (pfd.revents == 0) { + continue; + } + + const fatal_events: i16 = comptime @intCast(posix.POLL.HUP | posix.POLL.ERR | posix.POLL.NVAL); + if (pfd.revents & fatal_events != 0) { + self.dropCdp(link, null, true); + any_removed = true; + continue; + } + + if (pfd.revents & posix.POLL.IN == 0) { + continue; + } + + var buf: [16 * 1024]u8 = undefined; + const n = posix.read(link.socket, &buf) catch |err| switch (err) { + error.WouldBlock => continue, + else => { + lp.log.warn(.cdp, "CDP read", .{ .err = err }); + self.dropCdp(link, err, true); + any_removed = true; + continue; + }, + }; + + if (n == 0) { + // peer EOF + self.dropCdp(link, null, true); + any_removed = true; + continue; + } + + const keep = link.cdp.onData(buf[0..n]) catch |err| { + // Fatal frame/feed error. Whatever messages on_bytes + // managed to push are still in the inbox; the failing + // frame was NOT pushed, and the worker has no way to + // know it should exit. Drop with notify=true so + // on_disconnect surfaces a .disconnect into the inbox. + // dropCdp wakes the worker. + lp.log.warn(.cdp, "CDP onData", .{ .err = err }); + self.dropCdp(link, err, true); + any_removed = true; + continue; + }; + + // on_bytes succeeded — wake the worker so it observes anything + // new in the inbox (data / ping / close). + link.handles.wakeup() catch |err| { + lp.log.warn(.cdp, "CDP link wakeup", .{ .err = err }); + }; + + if (!keep) { + // Close frame: the handler already pushed .close. Worker's + // drainInbox will call on_disconnect itself after replying, + // so we drop without re-notifying. + self.dropCdp(link, null, false); + any_removed = true; + } + } + + if (any_removed) { + self.cdp_unregister.broadcast(); + } +} + pub fn run(self: *Network) void { var drain_buf: [64]u8 = undefined; var running_handles: c_int = 0; @@ -450,6 +713,8 @@ pub fn run(self: *Network) void { self.preparePollFds(multi); } + self.prepareCdpPollFds(); + // for ontick to work, you need to wake up periodically const timeout = blk: { const min_timeout = 250; // 250ms @@ -491,6 +756,8 @@ pub fn run(self: *Network) void { self.processCompletions(multi); } + self.processCdpEvents(); + self.fireTicks(); if (self.shutdown.load(.acquire) and running_handles == 0) { diff --git a/src/network/WS.zig b/src/network/WS.zig index 43c77816..097179a7 100644 --- a/src/network/WS.zig +++ b/src/network/WS.zig @@ -309,17 +309,26 @@ pub fn Reader(comptime EXPECT_MASK: bool, MAX_MESSAGE_SIZE: usize) type { }; } -pub fn errorReply(err: NextError) ?[]const u8 { +// Map a reader error (or any error that flowed up out of one) to the +// matching server→client close frame. Takes anyerror so callers that +// hold the error in a wider type (e.g. ?anyerror across an inbox) +// don't need to narrow it first; unrecognized errors return null. +pub fn errorReply(err: anyerror) ?[]const u8 { return switch (err) { error.TooLarge => &CLOSE_TOO_BIG, - error.Masked => &CLOSE_PROTOCOL_ERROR, - error.NotMasked => &CLOSE_PROTOCOL_ERROR, - error.ReservedFlags => &CLOSE_PROTOCOL_ERROR, - error.InvalidMessageType => &CLOSE_PROTOCOL_ERROR, - error.ControlTooLarge => &CLOSE_PROTOCOL_ERROR, - error.InvalidContinuation => &CLOSE_PROTOCOL_ERROR, - error.NestedFragmentation => &CLOSE_PROTOCOL_ERROR, - error.OutOfMemory => null, + error.Masked, + error.NotMasked, + error.ReservedFlags, + error.InvalidMessageType, + error.ControlTooLarge, + error.InvalidContinuation, + error.NestedFragmentation, + // Strictly an application-level (CDP) error, but 1002 + // "protocol error" is the closest fit and gives the peer a + // cleaner signal than a bare TCP FIN. + error.InvalidJSON, + => &CLOSE_PROTOCOL_ERROR, + else => null, }; } diff --git a/src/network/http.zig b/src/network/http.zig index e97e779f..ce00ee51 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -625,6 +625,13 @@ pub const Handles = struct { try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null); } + // Thread-safe wake of a poll() in progress on this multi. Used by + // the Network thread to nudge the worker out of curl_multi_poll + // when it pushes work onto the worker's inbox. + pub fn wakeup(self: *Handles) !void { + try libcurl.curl_multi_wakeup(self.multi); + } + pub const MultiMessage = struct { conn: *Connection, err: ?Error, diff --git a/src/sys/libcurl.zig b/src/sys/libcurl.zig index b621a3a3..2804db00 100644 --- a/src/sys/libcurl.zig +++ b/src/sys/libcurl.zig @@ -862,6 +862,15 @@ pub fn curl_multi_poll( try errorMCheck(c.curl_multi_poll(multi, raw_fds, @intCast(extra_fds.len), timeout_ms, numfds)); } +// Thread-safe wakeup for a curl_multi_poll sleeping on another thread. +// Per libcurl docs: "This function can be called from any thread, and it +// wakes up a sleeping curl_multi_poll call that is currently (or will be) +// waiting." Safe to call concurrently with curl_multi_perform on the same +// handle — the wake byte is delivered via an internal self-pipe. +pub fn curl_multi_wakeup(multi: *CurlM) ErrorMulti!void { + try errorMCheck(c.curl_multi_wakeup(multi)); +} + pub fn curl_multi_waitfds(multi: *CurlM, ufds: []CurlWaitFd, fd_count: *c_uint) ErrorMulti!void { const raw_fds: [*c]c.curl_waitfd = if (ufds.len == 0) null else @ptrCast(ufds.ptr); try errorMCheck(c.curl_multi_waitfds(multi, raw_fds, @intCast(ufds.len), fd_count));