From ceee9cfb102b6fba7f5cc4e1d68a0cb786659d2e Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Tue, 28 Apr 2026 21:44:31 +0100 Subject: [PATCH] Move Handles stuff to Network --- src/browser/HttpClient.zig | 58 ++++------- src/browser/webapi/net/WebSocket.zig | 6 +- src/cdp/CDP.zig | 2 +- src/network/Network.zig | 140 +++++++++++++++++++++++++-- src/network/http.zig | 75 -------------- 5 files changed, 159 insertions(+), 122 deletions(-) diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 1e93be03..7c95ba50 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -72,14 +72,11 @@ ws_active: usize = 0, http_active: usize = 0, // Our curl multi handle. -handles: http.Handles, +handle: Network.Handle, // Connections currently in this client's curl_multi. in_use: std.DoublyLinkedList = .{}, -// Connections that failed to be removed from curl_multi during perform. -dirty: std.DoublyLinkedList = .{}, - // Whether we're currently inside a curl_multi_perform call. 
performing: bool = false, @@ -186,13 +183,13 @@ fn layerWith(self: anytype, next: Layer) Layer { } pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP) !void { - var handles = try http.Handles.init(network.config); - errdefer handles.deinit(); + var handle = try network.getHandle(); + errdefer handle.deinit(); const http_proxy = network.config.httpProxy(); self.* = Client{ - .handles = handles, + .handle = handle, .network = network, .allocator = allocator, .cdp = cdp, @@ -235,7 +232,7 @@ pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP) pub fn deinit(self: *Client) void { self.abort(); - self.handles.deinit(); + self.handle.deinit(); self.ws_ready.deinit(self.allocator); self.clearUserAgentOverride(); @@ -291,13 +288,13 @@ pub fn setTlsVerify(self: *Client, verify: bool) !void { var it = self.in_use.first; while (it) |node| : (it = node.next) { const conn: *http.Connection = @fieldParentPtr("node", node); - try conn.setTlsVerify(verify, self.use_proxy); + try self.handle.submitTlsVerify(conn, verify, self.use_proxy); } it = self.ready_queue.first; while (it) |node| : (it = node.next) { const conn: *http.Connection = @fieldParentPtr("node", node); - try conn.setTlsVerify(verify, self.use_proxy); + try self.handle.submitTlsVerify(conn, verify, self.use_proxy); } self.tls_verify = verify; @@ -354,7 +351,7 @@ pub fn abort(self: *Client) void { std.debug.assert(self.queue.first == null); std.debug.assert(self.in_use.first == null); std.debug.assert(self.ready_queue.first == null); - std.debug.assert(self.dirty.first == null); + std.debug.assert(self.handle.dirty.first == null); } } @@ -422,7 +419,7 @@ pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void { fn drainQueue(self: *Client) !void { while (self.queue.popFirst()) |queue_node| { const transfer: *Transfer = @fieldParentPtr("_node", queue_node); - const conn = self.network.getConnection() orelse { + const conn = self.handle.getConnection() 
orelse { self.queue.prepend(queue_node); return; }; @@ -584,7 +581,7 @@ fn process(self: *Client, transfer: *Transfer) !void { // libcurl doesn't allow recursive calls, if we're in a `perform()` operation // then we _have_ to queue this. if (self.performing == false) { - if (self.network.getConnection()) |conn| { + if (self.handle.getConnection()) |conn| { return self.makeRequest(conn, transfer); } } @@ -657,7 +654,7 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer // execution). self.performing = true; defer self.performing = false; - _ = try self.handles.perform(); + _ = try self.handle.perform(); } fn perform(self: *Client, timeout_ms: c_int) anyerror!void { @@ -665,7 +662,7 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!void { self.performing = true; defer self.performing = false; - break :blk try self.handles.perform(); + break :blk try self.handle.perform(); }; // Drain queued WebSocket events. ws callbacks (called from libcurl during @@ -674,14 +671,7 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!void { self.drainReadyWs(); // Process dirty connections — return them to Network pool. - while (self.dirty.popFirst()) |node| { - const conn: *http.Connection = @fieldParentPtr("node", node); - self.handles.remove(conn) catch |err| { - log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" }); - @panic("multi_remove_handle"); - }; - self.releaseConn(conn); - } + self.handle.drainDirty(); while (self.ready_queue.popFirst()) |node| { const conn: *http.Connection = @fieldParentPtr("node", node); @@ -708,7 +698,7 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!void { if (running > 0 or self.cdp_link_active) { // when cdp_link_active == true, the network thread will unblock this // by calling wakup on our multi. 
- try self.handles.poll(&.{}, timeout_ms); + try self.handle.poll(&.{}, timeout_ms); } _ = try self.processMessages(); @@ -776,7 +766,7 @@ fn isFetchInterceptionMethod(method: []const u8) bool { std.mem.eql(u8, method, "Fetch.continueWithAuth"); } -fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *Transfer) !bool { +fn processOneMessage(self: *Client, msg: Network.Handle.MultiMessage, transfer: *Transfer) !bool { // State at entry: .inflight = conn (multi just delivered a completion). if (msg.err == null or msg.err.? == error.RecvError) { transfer.detectAuthChallenge(msg.conn); @@ -816,7 +806,7 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T const conn = transfer._conn.?; - try self.handles.remove(conn); + try self.handle.remove(conn); // Conn temporarily out of multi during reconfigure. // _detached_conn lets processMessages release it if any of // the steps below throw. State stays .inflight; _conn stays set @@ -824,7 +814,7 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T transfer.reset(); try transfer.configureConn(conn); - try self.handles.add(conn); + try self.handle.add(conn); transfer._detached_conn = null; _ = try self.perform(0); @@ -893,7 +883,7 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T fn processMessages(self: *Client) !bool { var processed = false; - while (try self.handles.readMessage()) |msg| { + while (try self.handle.readMessage()) |msg| { switch (msg.conn.transport) { .http => |transfer| { const done = self.processOneMessage(msg, transfer) catch |err| blk: { @@ -950,7 +940,7 @@ pub fn trackConn(self: *Client, conn: *http.Connection) !void { self.releaseConn(conn); return err; }; - self.handles.add(conn) catch |err| { + self.handle.add(conn) catch |err| { self.in_use.remove(&conn.node); conn.in_use = false; self.releaseConn(conn); @@ -978,17 +968,11 @@ pub fn removeConn(self: *Client, conn: *http.Connection) 
void { .websocket => self.ws_active -= 1, else => unreachable, } - if (self.handles.remove(conn)) { - self.releaseConn(conn); - } else |_| { - // Can happen if we're in a perform() call, so we'll queue this - // for cleanup later. - self.dirty.append(&conn.node); - } + self.handle.cancelConn(conn); } fn releaseConn(self: *Client, conn: *http.Connection) void { - self.network.releaseConnection(conn); + self.handle.releaseConnection(conn); } // Called from WebSocket libcurl callbacks (currently same worker thread, but diff --git a/src/browser/webapi/net/WebSocket.zig b/src/browser/webapi/net/WebSocket.zig index 3eeac07f..d543ee9e 100644 --- a/src/browser/webapi/net/WebSocket.zig +++ b/src/browser/webapi/net/WebSocket.zig @@ -152,11 +152,11 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket const resolved_url = try URL.resolve(arena, frame.base(), url, .{ .always_dupe = true, .encoding = frame.charset }); const http_client = &frame._session.browser.http_client; - const conn = http_client.network.newConnection() orelse { + const conn = http_client.handle.newConnection() orelse { return error.NoFreeConnection; }; - errdefer http_client.network.releaseConnection(conn); + errdefer http_client.handle.releaseConnection(conn); try conn.setURL(resolved_url); try conn.setConnectOnly(false); @@ -285,7 +285,7 @@ fn queueMessage(self: *WebSocket, msg: Message) !void { if (was_empty) { // Unpause the send callback so libcurl will request data if (self._conn) |conn| { - try conn.pause(.{ .cont = true }); + try self._http_client.handle.submitUnpause(conn); } } } diff --git a/src/cdp/CDP.zig b/src/cdp/CDP.zig index d1a0fb91..fbe31986 100644 --- a/src/cdp/CDP.zig +++ b/src/cdp/CDP.zig @@ -118,7 +118,7 @@ pub fn init( .cdp = self, .state = .live, .socket = socket, - .handles = http_client.handles, + .handle = http_client.handle, }; } diff --git a/src/network/Network.zig b/src/network/Network.zig index 1e51f042..77a80753 100644 --- 
a/src/network/Network.zig +++ b/src/network/Network.zig @@ -48,6 +48,8 @@ const Listener = struct { onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void, }; +const Error = libcurl.Error; + // Read side of a CDP WebSocket, registered with the Network thread so // bytes are read off the socket from here and dispatched into the CDP // layer via direct method calls on `cdp`. Network never sends on the @@ -59,10 +61,10 @@ pub const CdpLink = struct { cdp: *CDP, state: State, socket: posix.socket_t, - // The worker's HttpClient.Handles (by value — it's one pointer - // wide). Network calls handles.wakeup() to unblock the worker - // from curl_multi_poll whenever it pushes to the worker's inbox. - handles: http.Handles, + // The worker's Handle, copied by value; the copy shares `multi`. + // Network calls handle.wakeup() to unblock the worker from + // curl_multi_poll whenever it pushes to the worker's inbox. + handle: Handle, node: DoublyLinkedList.Node = .{}, pub const State = enum { @@ -424,6 +426,10 @@ pub fn deinit(self: *Network) void { globalDeinit(); } +pub fn getHandle(self: *Network) !Handle { + return try Handle.init(self); +} + pub fn bind( self: *Network, address: *net.Address, @@ -542,7 +548,7 @@ fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void { // case) or are about to be unblocked via cdp_unregister.broadcast // (unregister case); no extra wakeup needed. link.cdp.onLinkDisconnect(err); - link.handles.wakeup() catch |e| { + link.handle.wakeup() catch |e| { lp.log.warn(.cdp, "CDP link wakeup", .{ .err = e }); }; } @@ -665,7 +671,7 @@ fn processCdpEvents(self: *Network) void { // on_bytes succeeded — wake the worker so it observes anything // new in the inbox (data / ping / close). 
- link.handles.wakeup() catch |err| { + link.handle.wakeup() catch |err| { lp.log.warn(.cdp, "CDP link wakeup", .{ .err = err }); }; @@ -983,6 +989,128 @@ pub fn newConnection(self: *Network) ?*http.Connection { return conn; } +pub const Handle = struct { + multi: *libcurl.CurlM, + network: *Network, + + // Conns whose remove was deferred because we were inside + // curl_multi_perform. Drained by drainDirty before the next perform. + dirty: std.DoublyLinkedList = .{}, + + pub fn init(network: *Network) !Handle { + const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti; + errdefer libcurl.curl_multi_cleanup(multi) catch {}; + + try libcurl.curl_multi_setopt(multi, .max_host_connections, network.config.httpMaxHostOpen()); + + return .{ .network = network, .multi = multi }; + } + + pub fn deinit(self: *Handle) void { + libcurl.curl_multi_cleanup(self.multi) catch {}; + } + + pub fn add(self: *Handle, conn: *const http.Connection) !void { + try libcurl.curl_multi_add_handle(self.multi, conn._easy); + } + + pub fn remove(self: *Handle, conn: *const http.Connection) !void { + try libcurl.curl_multi_remove_handle(self.multi, conn._easy); + } + + pub fn perform(self: *Handle) !c_int { + var running: c_int = undefined; + try libcurl.curl_multi_perform(self.multi, &running); + return running; + } + + pub fn poll(self: *Handle, extra_fds: []libcurl.CurlWaitFd, timeout_ms: c_int) !void { + try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null); + } + + // Thread-safe wake of a poll() in progress on this multi. Used by + // the Network thread to nudge the worker out of curl_multi_poll + // when it pushes work onto the worker's inbox. 
+ pub fn wakeup(self: *Handle) !void { + try libcurl.curl_multi_wakeup(self.multi); + } + + pub const MultiMessage = struct { + conn: *http.Connection, + err: ?Error, + }; + + pub fn readMessage(self: *Handle) !?MultiMessage { + var messages_count: c_int = 0; + const msg = libcurl.curl_multi_info_read(self.multi, &messages_count) orelse return null; + return switch (msg.data) { + .done => |err| { + var private: *anyopaque = undefined; + try libcurl.curl_easy_getinfo(msg.easy_handle, .private, &private); + return .{ + .conn = @ptrCast(@alignCast(private)), + .err = err, + }; + }, + else => unreachable, + }; + } + + // connection pool delegates ---------------------------------------- + + pub fn getConnection(self: *Handle) ?*http.Connection { + return self.network.getConnection(); + } + + pub fn newConnection(self: *Handle) ?*http.Connection { + return self.network.newConnection(); + } + + pub fn releaseConnection(self: *Handle, conn: *http.Connection) void { + self.network.releaseConnection(conn); + } + + // Cancel an active conn: remove from multi and return to pool. If + // we're inside curl_multi_perform (libcurl forbids remove from a + // callback), defer the remove via the dirty queue and let + // drainDirty pick it up before the next perform. + pub fn cancelConn(self: *Handle, conn: *http.Connection) void { + if (self.remove(conn)) { + self.releaseConnection(conn); + } else |_| { + self.dirty.append(&conn.node); + } + } + + pub fn drainDirty(self: *Handle) void { + while (self.dirty.popFirst()) |node| { + const conn: *http.Connection = @fieldParentPtr("node", node); + self.remove(conn) catch |err| { + log.fatal(.http, "multi remove handle", .{ .err = err, .src = "drainDirty" }); + @panic("multi_remove_handle"); + }; + self.releaseConnection(conn); + } + } + + // per-conn ops on active conns ------------------------------------- + // + // These wrap libcurl mutators that today run synchronously on the + // worker thread. 
The seam exists so the same call sites stay valid + // once the network thread owns the multi and these become + // cross-thread submits. + + pub fn submitTlsVerify(self: *Handle, conn: *http.Connection, verify: bool, use_proxy: bool) !void { + _ = self; + try conn.setTlsVerify(verify, use_proxy); + } + + pub fn submitUnpause(self: *Handle, conn: *http.Connection) !void { + _ = self; + try conn.pause(.{ .cont = true }); + } +}; + // Wraps lines @ 64 columns. A PEM is basically a base64 encoded DER (which is // what Zig has), with lines wrapped at 64 characters and with a basic header // and footer diff --git a/src/network/http.zig b/src/network/http.zig index ce00ee51..e9bb84dd 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -34,8 +34,6 @@ pub const readfunc_pause = libcurl.curl_readfunc_pause; pub const writefunc_error = libcurl.curl_writefunc_error; pub const WsFrameType = libcurl.WsFrameType; -const Error = libcurl.Error; - pub fn curl_version() [*c]const u8 { return libcurl.curl_version(); } @@ -572,16 +570,6 @@ pub const Connection = struct { } } - pub fn request(self: *const Connection, http_headers: *const Config.HttpHeaders) !u16 { - var header_list = try Headers.init(http_headers.user_agent_header); - defer header_list.deinit(); - try self.secretHeaders(&header_list, http_headers); - try self.setHeaders(&header_list); - - try libcurl.curl_easy_perform(self._easy); - return self.getResponseCode(); - } - pub fn wsStartFrame(self: *const Connection, frame_type: libcurl.WsFrameType, size: usize) !void { try libcurl.curl_ws_start_frame(self._easy, frame_type, @intCast(size)); } @@ -591,69 +579,6 @@ pub const Connection = struct { } }; -pub const Handles = struct { - multi: *libcurl.CurlM, - - pub fn init(config: *const Config) !Handles { - const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti; - errdefer libcurl.curl_multi_cleanup(multi) catch {}; - - try libcurl.curl_multi_setopt(multi, .max_host_connections, 
config.httpMaxHostOpen()); - - return .{ .multi = multi }; - } - - pub fn deinit(self: *Handles) void { - libcurl.curl_multi_cleanup(self.multi) catch {}; - } - - pub fn add(self: *Handles, conn: *const Connection) !void { - try libcurl.curl_multi_add_handle(self.multi, conn._easy); - } - - pub fn remove(self: *Handles, conn: *const Connection) !void { - try libcurl.curl_multi_remove_handle(self.multi, conn._easy); - } - - pub fn perform(self: *Handles) !c_int { - var running: c_int = undefined; - try libcurl.curl_multi_perform(self.multi, &running); - return running; - } - - pub fn poll(self: *Handles, extra_fds: []libcurl.CurlWaitFd, timeout_ms: c_int) !void { - try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null); - } - - // Thread-safe wake of a poll() in progress on this multi. Used by - // the Network thread to nudge the worker out of curl_multi_poll - // when it pushes work onto the worker's inbox. - pub fn wakeup(self: *Handles) !void { - try libcurl.curl_multi_wakeup(self.multi); - } - - pub const MultiMessage = struct { - conn: *Connection, - err: ?Error, - }; - - pub fn readMessage(self: *Handles) !?MultiMessage { - var messages_count: c_int = 0; - const msg = libcurl.curl_multi_info_read(self.multi, &messages_count) orelse return null; - return switch (msg.data) { - .done => |err| { - var private: *anyopaque = undefined; - try libcurl.curl_easy_getinfo(msg.easy_handle, .private, &private); - return .{ - .conn = @ptrCast(@alignCast(private)), - .err = err, - }; - }, - else => unreachable, - }; - } -}; - fn debugCallback(_: *libcurl.Curl, msg_type: libcurl.CurlInfoType, raw: [*c]u8, len: usize, _: *anyopaque) c_int { const data = raw[0..len]; switch (msg_type) {