Move Handles stuff to Network

This commit is contained in:
Nikolay Govorov
2026-04-28 21:44:31 +01:00
parent 5327f00473
commit ceee9cfb10
5 changed files with 159 additions and 122 deletions

View File

@@ -72,14 +72,11 @@ ws_active: usize = 0,
http_active: usize = 0,
// Our curl multi handle.
handles: http.Handles,
handle: Network.Handle,
// Connections currently in this client's curl_multi.
in_use: std.DoublyLinkedList = .{},
// Connections that failed to be removed from curl_multi during perform.
dirty: std.DoublyLinkedList = .{},
// Whether we're currently inside a curl_multi_perform call.
performing: bool = false,
@@ -186,13 +183,13 @@ fn layerWith(self: anytype, next: Layer) Layer {
}
pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP) !void {
var handles = try http.Handles.init(network.config);
errdefer handles.deinit();
var handle = try network.getHandle();
errdefer handle.deinit();
const http_proxy = network.config.httpProxy();
self.* = Client{
.handles = handles,
.handle = handle,
.network = network,
.allocator = allocator,
.cdp = cdp,
@@ -235,7 +232,7 @@ pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP)
pub fn deinit(self: *Client) void {
self.abort();
self.handles.deinit();
self.handle.deinit();
self.ws_ready.deinit(self.allocator);
self.clearUserAgentOverride();
@@ -291,13 +288,13 @@ pub fn setTlsVerify(self: *Client, verify: bool) !void {
var it = self.in_use.first;
while (it) |node| : (it = node.next) {
const conn: *http.Connection = @fieldParentPtr("node", node);
try conn.setTlsVerify(verify, self.use_proxy);
try self.handle.submitTlsVerify(conn, verify, self.use_proxy);
}
it = self.ready_queue.first;
while (it) |node| : (it = node.next) {
const conn: *http.Connection = @fieldParentPtr("node", node);
try conn.setTlsVerify(verify, self.use_proxy);
try self.handle.submitTlsVerify(conn, verify, self.use_proxy);
}
self.tls_verify = verify;
@@ -354,7 +351,7 @@ pub fn abort(self: *Client) void {
std.debug.assert(self.queue.first == null);
std.debug.assert(self.in_use.first == null);
std.debug.assert(self.ready_queue.first == null);
std.debug.assert(self.dirty.first == null);
std.debug.assert(self.handle.dirty.first == null);
}
}
@@ -422,7 +419,7 @@ pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void {
fn drainQueue(self: *Client) !void {
while (self.queue.popFirst()) |queue_node| {
const transfer: *Transfer = @fieldParentPtr("_node", queue_node);
const conn = self.network.getConnection() orelse {
const conn = self.handle.getConnection() orelse {
self.queue.prepend(queue_node);
return;
};
@@ -584,7 +581,7 @@ fn process(self: *Client, transfer: *Transfer) !void {
// libcurl doesn't allow recursive calls, if we're in a `perform()` operation
// then we _have_ to queue this.
if (self.performing == false) {
if (self.network.getConnection()) |conn| {
if (self.handle.getConnection()) |conn| {
return self.makeRequest(conn, transfer);
}
}
@@ -657,7 +654,7 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer
// execution).
self.performing = true;
defer self.performing = false;
_ = try self.handles.perform();
_ = try self.handle.perform();
}
fn perform(self: *Client, timeout_ms: c_int) anyerror!void {
@@ -665,7 +662,7 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!void {
self.performing = true;
defer self.performing = false;
break :blk try self.handles.perform();
break :blk try self.handle.perform();
};
// Drain queued WebSocket events. ws callbacks (called from libcurl during
@@ -674,14 +671,7 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!void {
self.drainReadyWs();
// Process dirty connections — return them to Network pool.
while (self.dirty.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
self.handles.remove(conn) catch |err| {
log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" });
@panic("multi_remove_handle");
};
self.releaseConn(conn);
}
self.handle.drainDirty();
while (self.ready_queue.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
@@ -708,7 +698,7 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!void {
if (running > 0 or self.cdp_link_active) {
// when cdp_link_active == true, the network thread will unblock this
// by calling wakup on our multi.
try self.handles.poll(&.{}, timeout_ms);
try self.handle.poll(&.{}, timeout_ms);
}
_ = try self.processMessages();
@@ -776,7 +766,7 @@ fn isFetchInterceptionMethod(method: []const u8) bool {
std.mem.eql(u8, method, "Fetch.continueWithAuth");
}
fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *Transfer) !bool {
fn processOneMessage(self: *Client, msg: Network.Handle.MultiMessage, transfer: *Transfer) !bool {
// State at entry: .inflight = conn (multi just delivered a completion).
if (msg.err == null or msg.err.? == error.RecvError) {
transfer.detectAuthChallenge(msg.conn);
@@ -816,7 +806,7 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T
const conn = transfer._conn.?;
try self.handles.remove(conn);
try self.handle.remove(conn);
// Conn temporarily out of multi during reconfigure.
// _detached_conn lets processMessages release it if any of
// the steps below throw. State stays .inflight; _conn stays set
@@ -824,7 +814,7 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T
transfer.reset();
try transfer.configureConn(conn);
try self.handles.add(conn);
try self.handle.add(conn);
transfer._detached_conn = null;
_ = try self.perform(0);
@@ -893,7 +883,7 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T
fn processMessages(self: *Client) !bool {
var processed = false;
while (try self.handles.readMessage()) |msg| {
while (try self.handle.readMessage()) |msg| {
switch (msg.conn.transport) {
.http => |transfer| {
const done = self.processOneMessage(msg, transfer) catch |err| blk: {
@@ -950,7 +940,7 @@ pub fn trackConn(self: *Client, conn: *http.Connection) !void {
self.releaseConn(conn);
return err;
};
self.handles.add(conn) catch |err| {
self.handle.add(conn) catch |err| {
self.in_use.remove(&conn.node);
conn.in_use = false;
self.releaseConn(conn);
@@ -978,17 +968,11 @@ pub fn removeConn(self: *Client, conn: *http.Connection) void {
.websocket => self.ws_active -= 1,
else => unreachable,
}
if (self.handles.remove(conn)) {
self.releaseConn(conn);
} else |_| {
// Can happen if we're in a perform() call, so we'll queue this
// for cleanup later.
self.dirty.append(&conn.node);
}
self.handle.cancelConn(conn);
}
fn releaseConn(self: *Client, conn: *http.Connection) void {
self.network.releaseConnection(conn);
self.handle.releaseConnection(conn);
}
// Called from WebSocket libcurl callbacks (currently same worker thread, but

View File

@@ -152,11 +152,11 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
const resolved_url = try URL.resolve(arena, frame.base(), url, .{ .always_dupe = true, .encoding = frame.charset });
const http_client = &frame._session.browser.http_client;
const conn = http_client.network.newConnection() orelse {
const conn = http_client.handle.newConnection() orelse {
return error.NoFreeConnection;
};
errdefer http_client.network.releaseConnection(conn);
errdefer http_client.handle.releaseConnection(conn);
try conn.setURL(resolved_url);
try conn.setConnectOnly(false);
@@ -285,7 +285,7 @@ fn queueMessage(self: *WebSocket, msg: Message) !void {
if (was_empty) {
// Unpause the send callback so libcurl will request data
if (self._conn) |conn| {
try conn.pause(.{ .cont = true });
try self._http_client.handle.submitUnpause(conn);
}
}
}

View File

@@ -118,7 +118,7 @@ pub fn init(
.cdp = self,
.state = .live,
.socket = socket,
.handles = http_client.handles,
.handle = http_client.handle,
};
}

View File

@@ -48,6 +48,8 @@ const Listener = struct {
onAccept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void,
};
const Error = libcurl.Error;
// Read side of a CDP WebSocket, registered with the Network thread so
// bytes are read off the socket from here and dispatched into the CDP
// layer via direct method calls on `cdp`. Network never sends on the
@@ -59,10 +61,10 @@ pub const CdpLink = struct {
cdp: *CDP,
state: State,
socket: posix.socket_t,
// The worker's HttpClient.Handles (by value — it's one pointer
// wide). Network calls handles.wakeup() to unblock the worker
// from curl_multi_poll whenever it pushes to the worker's inbox.
handles: http.Handles,
// The worker's Handle (by value — it's two pointers wide).
// Network calls handle.wakeup() to unblock the worker from
// curl_multi_poll whenever it pushes to the worker's inbox.
handle: Handle,
node: DoublyLinkedList.Node = .{},
pub const State = enum {
@@ -424,6 +426,10 @@ pub fn deinit(self: *Network) void {
globalDeinit();
}
pub fn getHandle(self: *Network) !Handle {
return try Handle.init(self);
}
pub fn bind(
self: *Network,
address: *net.Address,
@@ -542,7 +548,7 @@ fn dropCdp(self: *Network, link: *CdpLink, err: ?anyerror, notify: bool) void {
// case) or are about to be unblocked via cdp_unregister.broadcast
// (unregister case); no extra wakeup needed.
link.cdp.onLinkDisconnect(err);
link.handles.wakeup() catch |e| {
link.handle.wakeup() catch |e| {
lp.log.warn(.cdp, "CDP link wakeup", .{ .err = e });
};
}
@@ -665,7 +671,7 @@ fn processCdpEvents(self: *Network) void {
// on_bytes succeeded — wake the worker so it observes anything
// new in the inbox (data / ping / close).
link.handles.wakeup() catch |err| {
link.handle.wakeup() catch |err| {
lp.log.warn(.cdp, "CDP link wakeup", .{ .err = err });
};
@@ -983,6 +989,128 @@ pub fn newConnection(self: *Network) ?*http.Connection {
return conn;
}
pub const Handle = struct {
multi: *libcurl.CurlM,
network: *Network,
// Conns whose remove was deferred because we were inside
// curl_multi_perform. Drained by drainDirty before the next perform.
dirty: std.DoublyLinkedList = .{},
pub fn init(network: *Network) !Handle {
const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti;
errdefer libcurl.curl_multi_cleanup(multi) catch {};
try libcurl.curl_multi_setopt(multi, .max_host_connections, network.config.httpMaxHostOpen());
return .{ .network = network, .multi = multi };
}
pub fn deinit(self: *Handle) void {
libcurl.curl_multi_cleanup(self.multi) catch {};
}
pub fn add(self: *Handle, conn: *const http.Connection) !void {
try libcurl.curl_multi_add_handle(self.multi, conn._easy);
}
pub fn remove(self: *Handle, conn: *const http.Connection) !void {
try libcurl.curl_multi_remove_handle(self.multi, conn._easy);
}
pub fn perform(self: *Handle) !c_int {
var running: c_int = undefined;
try libcurl.curl_multi_perform(self.multi, &running);
return running;
}
pub fn poll(self: *Handle, extra_fds: []libcurl.CurlWaitFd, timeout_ms: c_int) !void {
try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null);
}
// Thread-safe wake of a poll() in progress on this multi. Used by
// the Network thread to nudge the worker out of curl_multi_poll
// when it pushes work onto the worker's inbox.
pub fn wakeup(self: *Handle) !void {
try libcurl.curl_multi_wakeup(self.multi);
}
pub const MultiMessage = struct {
conn: *http.Connection,
err: ?Error,
};
pub fn readMessage(self: *Handle) !?MultiMessage {
var messages_count: c_int = 0;
const msg = libcurl.curl_multi_info_read(self.multi, &messages_count) orelse return null;
return switch (msg.data) {
.done => |err| {
var private: *anyopaque = undefined;
try libcurl.curl_easy_getinfo(msg.easy_handle, .private, &private);
return .{
.conn = @ptrCast(@alignCast(private)),
.err = err,
};
},
else => unreachable,
};
}
// connection pool delegates ----------------------------------------
pub fn getConnection(self: *Handle) ?*http.Connection {
return self.network.getConnection();
}
pub fn newConnection(self: *Handle) ?*http.Connection {
return self.network.newConnection();
}
pub fn releaseConnection(self: *Handle, conn: *http.Connection) void {
self.network.releaseConnection(conn);
}
// Cancel an active conn: remove from multi and return to pool. If
// we're inside curl_multi_perform (libcurl forbids remove from a
// callback), defer the remove via the dirty queue and let
// drainDirty pick it up before the next perform.
pub fn cancelConn(self: *Handle, conn: *http.Connection) void {
if (self.remove(conn)) {
self.releaseConnection(conn);
} else |_| {
self.dirty.append(&conn.node);
}
}
pub fn drainDirty(self: *Handle) void {
while (self.dirty.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
self.remove(conn) catch |err| {
log.fatal(.http, "multi remove handle", .{ .err = err, .src = "drainDirty" });
@panic("multi_remove_handle");
};
self.releaseConnection(conn);
}
}
// per-conn ops on active conns -------------------------------------
//
// These wrap libcurl mutators that today run synchronously on the
// worker thread. The seam exists so the same call sites stay valid
// once the network thread owns the multi and these become
// cross-thread submits.
pub fn submitTlsVerify(self: *Handle, conn: *http.Connection, verify: bool, use_proxy: bool) !void {
_ = self;
try conn.setTlsVerify(verify, use_proxy);
}
pub fn submitUnpause(self: *Handle, conn: *http.Connection) !void {
_ = self;
try conn.pause(.{ .cont = true });
}
};
// Wraps lines @ 64 columns. A PEM is basically a base64 encoded DER (which is
// what Zig has), with lines wrapped at 64 characters and with a basic header
// and footer

View File

@@ -34,8 +34,6 @@ pub const readfunc_pause = libcurl.curl_readfunc_pause;
pub const writefunc_error = libcurl.curl_writefunc_error;
pub const WsFrameType = libcurl.WsFrameType;
const Error = libcurl.Error;
pub fn curl_version() [*c]const u8 {
return libcurl.curl_version();
}
@@ -572,16 +570,6 @@ pub const Connection = struct {
}
}
pub fn request(self: *const Connection, http_headers: *const Config.HttpHeaders) !u16 {
var header_list = try Headers.init(http_headers.user_agent_header);
defer header_list.deinit();
try self.secretHeaders(&header_list, http_headers);
try self.setHeaders(&header_list);
try libcurl.curl_easy_perform(self._easy);
return self.getResponseCode();
}
pub fn wsStartFrame(self: *const Connection, frame_type: libcurl.WsFrameType, size: usize) !void {
try libcurl.curl_ws_start_frame(self._easy, frame_type, @intCast(size));
}
@@ -591,69 +579,6 @@ pub const Connection = struct {
}
};
pub const Handles = struct {
multi: *libcurl.CurlM,
pub fn init(config: *const Config) !Handles {
const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti;
errdefer libcurl.curl_multi_cleanup(multi) catch {};
try libcurl.curl_multi_setopt(multi, .max_host_connections, config.httpMaxHostOpen());
return .{ .multi = multi };
}
pub fn deinit(self: *Handles) void {
libcurl.curl_multi_cleanup(self.multi) catch {};
}
pub fn add(self: *Handles, conn: *const Connection) !void {
try libcurl.curl_multi_add_handle(self.multi, conn._easy);
}
pub fn remove(self: *Handles, conn: *const Connection) !void {
try libcurl.curl_multi_remove_handle(self.multi, conn._easy);
}
pub fn perform(self: *Handles) !c_int {
var running: c_int = undefined;
try libcurl.curl_multi_perform(self.multi, &running);
return running;
}
pub fn poll(self: *Handles, extra_fds: []libcurl.CurlWaitFd, timeout_ms: c_int) !void {
try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null);
}
// Thread-safe wake of a poll() in progress on this multi. Used by
// the Network thread to nudge the worker out of curl_multi_poll
// when it pushes work onto the worker's inbox.
pub fn wakeup(self: *Handles) !void {
try libcurl.curl_multi_wakeup(self.multi);
}
pub const MultiMessage = struct {
conn: *Connection,
err: ?Error,
};
pub fn readMessage(self: *Handles) !?MultiMessage {
var messages_count: c_int = 0;
const msg = libcurl.curl_multi_info_read(self.multi, &messages_count) orelse return null;
return switch (msg.data) {
.done => |err| {
var private: *anyopaque = undefined;
try libcurl.curl_easy_getinfo(msg.easy_handle, .private, &private);
return .{
.conn = @ptrCast(@alignCast(private)),
.err = err,
};
},
else => unreachable,
};
}
};
fn debugCallback(_: *libcurl.Curl, msg_type: libcurl.CurlInfoType, raw: [*c]u8, len: usize, _: *anyopaque) c_int {
const data = raw[0..len];
switch (msg_type) {