Move Handles to Network

This commit is contained in:
Nikolay Govorov
2026-04-21 09:57:34 +01:00
parent 8bba8a0a4b
commit 3b4629b9e5
5 changed files with 599 additions and 444 deletions

View File

@@ -23,9 +23,11 @@ const builtin = @import("builtin");
const URL = @import("URL.zig");
const Notification = @import("../Notification.zig");
const CookieJar = @import("webapi/storage/Cookie.zig").Jar;
const WebSocket = @import("webapi/net/WebSocket.zig");
const http = @import("../network/http.zig");
const Network = @import("../network/Network.zig");
const Slot = @import("../network/Slot.zig");
const Robots = @import("../network/Robots.zig");
const Cache = @import("../network/cache/Cache.zig");
const timestamp = @import("../datetime.zig").timestamp;
@@ -71,36 +73,26 @@ http_active: usize = 0,
// 'networkAlmostIdle' Page.lifecycleEvent in CDP).
intercepted: usize = 0,
// Our curl multi handle.
handles: http.Handles,
// Connections currently in this client's curl_multi.
// In-flight connections (curl is driven by Network on main thread). The
// list holds `conn._worker_node` so it doesn't conflict with the
// Network-side `conn.node`. Used for abort enumeration.
in_use: std.DoublyLinkedList = .{},
// Connections that failed to be removed from curl_multi during perform.
dirty: std.DoublyLinkedList = .{},
// Whether we're currently inside a curl_multi_perform call.
performing: bool = false,
// WebSockets with queued events to be drained from the worker thread.
// Populated by libcurl callbacks (currently same thread, future cross-thread).
ws_ready: std.ArrayList(*WebSocket) = .{},
ws_ready_mutex: std.Thread.Mutex = .{},
// Slot the Network (main) thread uses to deliver completed transfers
// and any other cross-thread signals back to this worker.
slot: Slot,
// Use to generate the next request ID
next_request_id: u32 = 0,
// When handles has no more available easys, requests get queued.
// Transfers waiting for a free Connection from Network.getConnection.
queue: std.DoublyLinkedList = .{},
// Queue is for Transfers that have no connection. ready_queue is for connections
// that were initiated when performing == true and thus need to wait until
// performing == false before being added. I'm hoping this is temporary and that
// we can unify the two queues. But HTTP is being changed a lot right now, and
// I'm trying to minimize the surface area.
ready_queue: std.DoublyLinkedList = .{},
// The main app allocator
allocator: Allocator,
@@ -164,13 +156,13 @@ pub fn init(allocator: Allocator, network: *Network) !*Client {
const client = try allocator.create(Client);
errdefer allocator.destroy(client);
var handles = try http.Handles.init(network.config);
errdefer handles.deinit();
var slot = try Slot.init();
errdefer slot.deinit();
const http_proxy = network.config.httpProxy();
client.* = .{
.handles = handles,
.slot = slot,
.network = network,
.allocator = allocator,
.transfer_pool = transfer_pool,
@@ -187,7 +179,15 @@ pub fn init(allocator: Allocator, network: *Network) !*Client {
pub fn deinit(self: *Client) void {
self.abort();
self.handles.deinit();
// If Network has already stopped, drive its queues ourselves so the
// cancelations from abort() deliver before we close the slot.
if (self.network.shutdown.load(.acquire)) {
self.network.drainPendingForShutdown();
self.drainCompletions();
}
self.slot.deinit();
self.ws_ready.deinit(self.allocator);
self.transfer_pool.deinit();
@@ -230,19 +230,15 @@ pub fn setTlsVerify(self: *Client, verify: bool) !void {
// Remove inflight connections check on enable TLS b/c chromiumoxide calls
// the command during navigate and Curl seems to accept it...
self.tls_verify = verify;
var it = self.in_use.first;
while (it) |node| : (it = node.next) {
const conn: *http.Connection = @fieldParentPtr("node", node);
try conn.setTlsVerify(verify, self.use_proxy);
const conn: *http.Connection = @fieldParentPtr("_worker_node", node);
self.network.submitOp(conn, .{ .tls_verify = .{
.verify = verify,
.use_proxy = self.use_proxy,
} });
}
it = self.ready_queue.first;
while (it) |node| : (it = node.next) {
const conn: *http.Connection = @fieldParentPtr("node", node);
try conn.setTlsVerify(verify, self.use_proxy);
}
self.tls_verify = verify;
}
// Restrictive since it'll only work if there are no inflight requests. In some
@@ -280,8 +276,26 @@ pub fn abortFrame(self: *Client, frame_id: u32) void {
// Written this way so that both abort and abortFrame can share the same code
// but abort can avoid the frame_id check at comptime.
fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void {
abortConnections(self.in_use, abort_all, frame_id);
abortConnections(self.ready_queue, abort_all, frame_id);
{
var n = self.in_use.first;
while (n) |node| {
n = node.next;
const conn: *http.Connection = @fieldParentPtr("_worker_node", node);
switch (conn.transport) {
.http => |transfer| {
if ((comptime abort_all) or transfer.req.frame_id == frame_id) {
transfer.kill();
}
},
.websocket => |ws| {
if ((comptime abort_all) or ws._frame._frame_id == frame_id) {
ws.kill();
}
},
.none => unreachable,
}
}
}
{
var q = &self.queue;
@@ -300,16 +314,15 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void {
if (comptime abort_all) {
self.queue = .{};
self.ready_queue = .{};
}
if (comptime IS_DEBUG and abort_all) {
// Even after an abort_all, we could still have transfers, but, at the
// very least, they should all be flagged as aborted.
// After abort_all, http transfers are flagged aborted; WS conns
// linger in in_use until their canceled completion arrives.
var it = self.in_use.first;
var leftover: usize = 0;
while (it) |node| : (it = node.next) {
const conn: *http.Connection = @fieldParentPtr("node", node);
const conn: *http.Connection = @fieldParentPtr("_worker_node", node);
switch (conn.transport) {
.http => |transfer| std.debug.assert(transfer.aborted),
.websocket => {},
@@ -317,42 +330,46 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void {
}
leftover += 1;
}
std.debug.assert(self.http_active == leftover);
}
}
fn abortConnections(list: std.DoublyLinkedList, comptime abort_all: bool, frame_id: u32) void {
var n = list.first;
while (n) |node| {
n = node.next;
const conn: *http.Connection = @fieldParentPtr("node", node);
switch (conn.transport) {
.http => |transfer| {
if ((comptime abort_all) or transfer.req.frame_id == frame_id) {
transfer.kill();
}
},
.websocket => |ws| {
if ((comptime abort_all) or ws._frame._frame_id == frame_id) {
ws.kill();
}
},
.none => unreachable,
}
std.debug.assert(self.http_active + self.ws_active == leftover);
}
}
pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus {
try self.drainQueue();
self.drainCompletions();
self.drainReadyWs();
var pollfds: [2]posix.pollfd = undefined;
var count: usize = 0;
pollfds[count] = .{ .fd = self.slot.pollFd(), .events = posix.POLL.IN, .revents = 0 };
count += 1;
const cdp_idx: ?usize = if (self.cdp_client) |cdp_client| blk: {
pollfds[count] = .{ .fd = cdp_client.socket, .events = posix.POLL.IN, .revents = 0 };
const i = count;
count += 1;
break :blk i;
} else null;
_ = posix.poll(pollfds[0..count], @intCast(timeout_ms)) catch {};
try self.drainQueue();
self.drainCompletions();
self.drainReadyWs();
if (cdp_idx) |i| {
if (pollfds[i].revents != 0) return .cdp_socket;
}
return .normal;
}
fn drainQueue(self: *Client) !void {
while (self.queue.popFirst()) |queue_node| {
const conn = self.network.getConnection() orelse {
self.queue.prepend(queue_node);
break;
};
try self.makeRequest(conn, @fieldParentPtr("_node", queue_node));
}
return self.perform(@intCast(timeout_ms));
}
pub fn request(self: *Client, req: Request) !void {
@@ -701,14 +718,9 @@ fn waitForInterceptedResponse(self: *Client, transfer: *Transfer) !bool {
// cases, the interceptor is expected to call resume to continue the transfer
// or transfer.abort() to abort it.
fn process(self: *Client, transfer: *Transfer) !void {
// libcurl doesn't allow recursive calls, if we're in a `perform()` operation
// then we _have_ to queue this.
if (self.performing == false) {
if (self.network.getConnection()) |conn| {
return self.makeRequest(conn, transfer);
}
if (self.network.getConnection()) |conn| {
return self.makeRequest(conn, transfer);
}
self.queue.append(&transfer._node);
}
@@ -817,7 +829,6 @@ pub fn restoreOriginalProxy(self: *Client) !void {
fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyerror!void {
{
// Reset per-response state for retries (auth challenge, queue).
const auth = transfer._auth_challenge;
transfer.reset();
transfer._auth_challenge = auth;
@@ -826,22 +837,13 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer
errdefer {
transfer._conn = null;
transfer.deinit();
self.releaseConn(conn);
self.network.releaseConnection(conn);
}
try transfer.configureConn(conn);
}
// As soon as this is called, our "perform" loop is responsible for
// cleaning things up. That's why the above code is in a block. If anything
// fails BEFORE `curl_multi_add_handle` succeeds, the we still need to do
// cleanup. But if things fail after `curl_multi_add_handle`, we expect
// perform to pickup the failure and cleanup.
self.trackConn(conn) catch |err| {
transfer._conn = null;
transfer.deinit();
return err;
};
self.trackConn(conn);
if (transfer.req.start_callback) |cb| {
cb(Response.fromTransfer(transfer)) catch |err| {
@@ -849,7 +851,6 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer
return err;
};
}
_ = try self.perform(0);
}
pub const PerformStatus = enum {
@@ -857,65 +858,51 @@ pub const PerformStatus = enum {
normal,
};
fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {
const running = blk: {
self.performing = true;
defer self.performing = false;
break :blk try self.handles.perform();
fn httpCompletionCallback(conn: *http.Connection, _: ?anyerror) void {
const client = switch (conn.transport) {
.http => |t| t.client,
.websocket => |ws| ws._http_client,
.none => return,
};
// Drain queued WebSocket events. ws callbacks (called from libcurl during
// perform above) only buffer/queue — actual JS dispatch happens here, on
// the worker thread.
self.drainReadyWs();
// Process dirty connections — return them to Network pool.
while (self.dirty.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
self.handles.remove(conn) catch |err| {
log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" });
@panic("multi_remove_handle");
};
self.releaseConn(conn);
}
while (self.ready_queue.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
try self.trackConn(conn);
}
// We're potentially going to block for a while until we get data. Process
// whatever messages we have waiting ahead of time.
if (try self.processMessages()) {
return .normal;
}
var status = PerformStatus.normal;
if (self.cdp_client) |cdp_client| {
var wait_fds = [_]http.WaitFd{.{
.fd = cdp_client.socket,
.events = .{ .pollin = true },
.revents = .{},
}};
try self.handles.poll(&wait_fds, timeout_ms);
if (wait_fds[0].revents.pollin or wait_fds[0].revents.pollpri or wait_fds[0].revents.pollout) {
status = .cdp_socket;
}
} else if (running > 0) {
try self.handles.poll(&.{}, timeout_ms);
}
_ = try self.processMessages();
return status;
client.slot.push(&conn.node);
}
fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *Transfer) !bool {
fn drainCompletions(self: *Client) void {
var list = self.slot.drain();
while (list.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
const err = conn._completion_err;
conn._completion_err = null;
switch (conn.transport) {
.http => |transfer| {
const done = self.processOneMessage(conn, err, transfer) catch |e| blk: {
log.err(.http, "drain completions", .{ .err = e, .req = transfer });
transfer.requestFailed(e, true);
break :blk true;
};
if (done) transfer.deinit();
},
.websocket => |ws| {
if (err) |e| switch (e) {
error.GotNothing => ws.disconnected(null),
else => ws.disconnected(e),
} else {
ws.disconnected(null);
}
},
.none => {
log.err(.http, "drain none transport", .{ .err = err });
},
}
}
}
fn processOneMessage(self: *Client, conn: *http.Connection, maybe_err: ?anyerror, transfer: *Transfer) !bool {
// Detect auth challenge from response headers.
// Also check on RecvError: proxy may send 407 with headers before
// closing the connection (CONNECT tunnel not yet established).
if (msg.err == null or msg.err.? == error.RecvError) {
transfer.detectAuthChallenge(msg.conn);
if (maybe_err == null or maybe_err.? == error.RecvError) {
transfer.detectAuthChallenge(conn);
}
// In case of auth challenge
@@ -964,58 +951,40 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T
}
// Handle redirects: reuse the same connection to preserve TCP state.
if (msg.err == null) {
const status = try msg.conn.getResponseCode();
// Conn is already out of the multi (removed by Network.processCompletions).
if (maybe_err == null) {
const status = try conn.getResponseCode();
if (status >= 300 and status <= 399) {
try transfer.handleRedirect();
const conn = transfer._conn.?;
try self.handles.remove(conn);
transfer._conn = null;
transfer._detached_conn = conn; // signal orphan for processMessages cleanup
transfer.reset();
try transfer.configureConn(conn);
try self.handles.add(conn);
transfer._detached_conn = null;
transfer._conn = conn; // reattach after successful re-add
_ = try self.perform(0);
self.network.submitRequest(conn);
return false;
}
}
// Transfer is done (success or error). Caller (processMessages) owns deinit.
// Return true = done (caller will deinit), false = continues (redirect/auth).
// When the server closes the TLS onnection without a close_notify alert,
// BoringSSL reports RecvError. If we already received valid HTTP headers,
// this is a normal end-of-body (the connection closure signals the end
// of the response per HTTP/1.1 when there is no Content-Length).
// We must check this before endTransfer, which may reset the easy handle.
const is_conn_close_recv = blk: {
const err = msg.err orelse break :blk false;
const err = maybe_err orelse break :blk false;
if (err != error.RecvError) break :blk false;
const hdr = msg.conn.getResponseHeader("connection", 0) orelse break :blk true;
const hdr = conn.getResponseHeader("connection", 0) orelse break :blk true;
break :blk std.ascii.eqlIgnoreCase(hdr.value, "close");
};
// make sure the transfer can't be immediately aborted from a callback
// since we still need it here.
transfer._performing = true;
defer transfer._performing = false;
if (msg.err != null and !is_conn_close_recv) {
transfer.requestFailed(transfer._callback_error orelse msg.err.?, true);
if (maybe_err != null and !is_conn_close_recv) {
transfer.requestFailed(transfer._callback_error orelse maybe_err.?, true);
return true;
}
if (!transfer._header_done_called) {
// In case of request w/o data, we need to call the header done
// callback now.
const proceed = try transfer.headerDoneCallback(msg.conn);
const proceed = try transfer.headerDoneCallback(conn);
if (!proceed) {
transfer.requestFailed(error.Abort, true);
return true;
@@ -1059,108 +1028,38 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T
return true;
}
fn processMessages(self: *Client) !bool {
var processed = false;
while (try self.handles.readMessage()) |msg| {
switch (msg.conn.transport) {
.http => |transfer| {
const done = self.processOneMessage(msg, transfer) catch |err| blk: {
log.err(.http, "process_messages", .{ .err = err, .req = transfer });
transfer.requestFailed(err, true);
if (transfer._detached_conn) |c| {
// Conn was removed from handles during redirect reconfiguration
// but not re-added. Release it directly to avoid double-remove.
self.in_use.remove(&c.node);
self.http_active -= 1;
self.releaseConn(c);
transfer._detached_conn = null;
}
break :blk true;
};
if (done) {
transfer.deinit();
processed = true;
}
},
.websocket => |ws| {
// ws_active will be decremented through the call to disconnected
if (msg.err) |err| switch (err) {
error.GotNothing => ws.disconnected(null),
else => ws.disconnected(err),
} else {
// Clean close - no error
ws.disconnected(null);
}
processed = true;
},
.none => unreachable,
}
}
return processed;
}
pub fn trackConn(self: *Client, conn: *http.Connection) !void {
if (self.performing) {
conn.in_use = false;
self.ready_queue.append(&conn.node);
return;
}
self.in_use.append(&conn.node);
conn.in_use = true;
// Set private pointer so readMessage can find the Connection.
// Must be done each time since curl_easy_reset clears it when
// connections are returned to pool.
conn.setPrivate(conn) catch |err| {
self.in_use.remove(&conn.node);
conn.in_use = false;
self.releaseConn(conn);
return err;
};
self.handles.add(conn) catch |err| {
self.in_use.remove(&conn.node);
conn.in_use = false;
self.releaseConn(conn);
return err;
};
pub fn trackConn(self: *Client, conn: *http.Connection) void {
self.in_use.append(&conn._worker_node);
conn.on_complete = httpCompletionCallback;
switch (conn.transport) {
.http => self.http_active += 1,
.websocket => self.ws_active += 1,
else => unreachable,
}
self.network.submitRequest(conn);
}
pub fn removeConn(self: *Client, conn: *http.Connection) void {
if (conn.in_use == false) {
self.ready_queue.remove(&conn.node);
self.releaseConn(conn);
return;
}
self.in_use.remove(&conn.node);
conn.in_use = false;
// Completion cleanup: conn is already out of the multi. Release to pool.
pub fn finishConn(self: *Client, conn: *http.Connection) void {
self.in_use.remove(&conn._worker_node);
switch (conn.transport) {
.http => self.http_active -= 1,
.websocket => self.ws_active -= 1,
else => unreachable,
}
if (self.handles.remove(conn)) {
self.releaseConn(conn);
} else |_| {
// Can happen if we're in a perform() call, so we'll queue this
// for cleanup later.
self.dirty.append(&conn.node);
}
}
fn releaseConn(self: *Client, conn: *http.Connection) void {
self.network.releaseConnection(conn);
}
// Called from WebSocket libcurl callbacks (currently same worker thread, but
// the API is mutex-protected so it stays correct if libcurl moves off-thread).
// Abort path: asks main to remove the conn from the multi. The resulting
// Canceled completion will flow back through the slot and be finalized
// by drainCompletions → finishConn, so we do not touch in_use or the
// active counters here.
pub fn cancelConn(self: *Client, conn: *http.Connection) void {
self.network.submitRemove(conn);
}
// Called from WebSocket libcurl callbacks on the Network thread.
// The API is mutex-protected because the worker drains this queue.
pub fn addReadyWs(self: *Client, ws: *WebSocket) void {
self.ws_ready_mutex.lock();
defer self.ws_ready_mutex.unlock();
@@ -1336,10 +1235,6 @@ pub const Transfer = struct {
_notified_fail: bool = false,
_conn: ?*http.Connection = null,
// Set when conn is temporarily detached from transfer during redirect
// reconfiguration. Used by processMessages to release the orphaned conn
// if reconfiguration fails.
_detached_conn: ?*http.Connection = null,
_auth_challenge: ?http.AuthChallenge = null,
@@ -1371,14 +1266,14 @@ pub const Transfer = struct {
fn releaseConn(self: *Transfer) void {
if (self._conn) |conn| {
self.client.removeConn(conn);
self.client.finishConn(conn);
self._conn = null;
}
}
fn deinit(self: *Transfer) void {
if (self._conn) |conn| {
self.client.removeConn(conn);
self.client.finishConn(conn);
self._conn = null;
}
@@ -1389,15 +1284,16 @@ pub const Transfer = struct {
pub fn abort(self: *Transfer, err: anyerror) void {
self.requestFailed(err, true);
self.aborted = true;
if (self._performing or self.client.performing) {
// We're currently in a curl_multi_perform. We cannot call
// curl_multi_remove_handle from a curl callback. Instead, we flag
// this transfer and our callbacks will check for this flag.
self.aborted = true;
if (self._performing) return;
if (self._conn) |conn| {
self.client.cancelConn(conn);
return;
}
self.client.queue.remove(&self._node);
self.deinit();
}
@@ -1413,20 +1309,18 @@ pub const Transfer = struct {
cb(self.req.ctx);
}
if (self._performing or self.client.performing) {
// We're currently inside of a callback. This client, and libcurl
// generally don't expect a transfer to become deinitialized during
// a callback. We can flag the transfer as aborted (which is what
// we do when transfer.abort() is called in this condition) AND,
// since this "kill()"should prevent any future callbacks, the best
// we can do is null/noop them.
self.aborted = true;
self.req.start_callback = null;
self.req.shutdown_callback = null;
self.req.header_callback = Noop.headerCallback;
self.req.data_callback = Noop.dataCallback;
self.req.done_callback = Noop.doneCallback;
self.req.error_callback = Noop.errorCallback;
self.aborted = true;
self.req.start_callback = null;
self.req.shutdown_callback = null;
self.req.header_callback = Noop.headerCallback;
self.req.data_callback = Noop.dataCallback;
self.req.done_callback = Noop.doneCallback;
self.req.error_callback = Noop.errorCallback;
if (self._performing) return;
if (self._conn) |conn| {
self.client.cancelConn(conn);
return;
}

View File

@@ -28,6 +28,7 @@ const URL = @import("../../URL.zig");
const Page = @import("../../Page.zig");
const Frame = @import("../../Frame.zig");
const HttpClient = @import("../../HttpClient.zig");
const ArenaPool = @import("../../../ArenaPool.zig");
const Event = @import("../Event.zig");
const EventTarget = @import("../EventTarget.zig");
@@ -44,6 +45,8 @@ _rc: lp.RC(u8) = .{},
_frame: *Frame,
_proto: *EventTarget,
_arena: Allocator,
// Cached for use by deinit — `_frame._page` may be torn down by then.
_arena_pool: *ArenaPool,
// Connection state
_ready_state: ReadyState = .connecting,
@@ -74,9 +77,9 @@ _recv_buffer: std.ArrayList(u8) = .empty,
// Used to slice out the message when bytes_left reaches 0.
_assembling_start: usize = 0,
// Events queued by libcurl callbacks; drained from the worker thread via
// drainPending. Callbacks must NEVER enter V8 directly (they can run from
// any thread driving curl_multi_perform), so all dispatch happens here.
// Events queued by libcurl callbacks on the Network thread; drained from
// the worker thread via drainPending. Callbacks must NEVER enter V8 directly,
// so all dispatch happens here.
_pending_messages: std.ArrayList(QueuedMessage) = .empty,
_pending_open: bool = false,
_pending_close: ?PendingClose = null,
@@ -86,6 +89,10 @@ _pending_close: ?PendingClose = null,
// WebSocket stays alive between queueing and drain.
_in_ready_list: bool = false,
// Set while a cancel is in flight; holds an extra ref so callbacks
// can't deref `_conn` after free, and dedupes repeated cleanup(false).
_cancel_pending: bool = false,
// close info for event dispatch
_close_code: u16 = 1000,
_close_reason: []const u8 = "",
@@ -175,13 +182,14 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
._frame = frame,
._conn = conn,
._arena = arena,
._arena_pool = frame._session.browser.arena_pool,
._proto = undefined,
._url = resolved_url,
._req_headers = headers,
._http_client = http_client,
});
conn.transport = .{ .websocket = self };
try http_client.trackConn(conn);
http_client.trackConn(conn);
if (comptime IS_DEBUG) {
log.info(.websocket, "connecting", .{ .url = url });
@@ -196,7 +204,8 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
}
pub fn deinit(self: *WebSocket, page: *Page) void {
self.cleanup();
_ = page;
self.cleanup(false);
if (self._on_open) |func| {
func.release();
@@ -212,10 +221,9 @@ pub fn deinit(self: *WebSocket, page: *Page) void {
}
for (self._send_queue.items) |msg| {
msg.deinit(page);
msg.deinit(self._arena_pool);
}
page.releaseArena(self._arena);
self._arena_pool.release(self._arena);
}
pub fn releaseRef(self: *WebSocket, page: *Page) void {
@@ -232,15 +240,15 @@ fn asEventTarget(self: *WebSocket) *EventTarget {
// we're being aborted internally (e.g. frame shutting down)
pub fn kill(self: *WebSocket) void {
self.cleanup();
self._ready_state = .closed;
self.cleanup(false);
}
pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
if (self._ready_state == .closed) {
// already disconnected (e.g. close-handshake disconnected us, then
// libcurl reports the same connection completion).
return;
}
defer self.cleanup(true);
if (self._ready_state == .closed) return;
const was_clean = self._ready_state == .closing and err_ == null;
self._ready_state = .closed;
@@ -250,9 +258,6 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
log.info(.websocket, "disconnected", .{ .url = self._url, .reason = "closed" });
}
// Queue events first (markReady acquires a "pending" ref), then cleanup
// (which releases the create-time ref). The pending ref keeps us alive
// until drainPending dispatches and releases it.
self._pending_close = .{
.code = if (was_clean) self._close_code else 1006,
.reason = if (was_clean) self._close_reason else "",
@@ -260,18 +265,30 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
.with_error = !was_clean,
};
self.markReady();
self.cleanup();
}
fn cleanup(self: *WebSocket) void {
if (self._conn) |conn| {
self._http_client.removeConn(conn);
self._req_headers.deinit();
self._conn = null;
self.releaseRef(self._frame._page);
self._send_queue.clearRetainingCapacity();
// completed=true releases the conn to the pool and drops the create-time
// ref; called from disconnected(). completed=false begins a cancel and
// holds an extra ref until the canceled completion routes through
// disconnected → cleanup(true).
fn cleanup(self: *WebSocket, completed: bool) void {
const conn = self._conn orelse return;
if (!completed) {
if (self._cancel_pending) return;
self._cancel_pending = true;
self.acquireRef();
self._http_client.cancelConn(conn);
return;
}
self._http_client.finishConn(conn);
self._req_headers.deinit();
self._conn = null;
self.releaseRef(self._frame._page); // create-time
if (self._cancel_pending) {
self._cancel_pending = false;
self.releaseRef(self._frame._page); // pending-cancel
}
self._send_queue.clearRetainingCapacity();
}
fn queueMessage(self: *WebSocket, msg: Message) !void {
@@ -279,9 +296,10 @@ fn queueMessage(self: *WebSocket, msg: Message) !void {
try self._send_queue.append(self._arena, msg);
if (was_empty) {
// Unpause the send callback so libcurl will request data
// Unpause via Network — curl_easy_pause from this thread would
// race with curl_multi_perform.
if (self._conn) |conn| {
try conn.pause(.{ .cont = true });
self._http_client.network.submitOp(conn, .unpause);
}
}
}
@@ -392,7 +410,7 @@ pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void {
.with_error = false,
};
self.markReady();
self.cleanup();
self.cleanup(false);
return;
}
@@ -487,10 +505,9 @@ pub fn setOnClose(self: *WebSocket, cb_: ?js.Function) !void {
}
}
// Register self as having pending events to drain. Called from any thread
// that produces ws events (currently libcurl callbacks on the worker thread,
// future: Network thread). Acquires one extra ref to keep the WebSocket
// alive between queueing and the drainPending call.
// Register self as having pending events to drain. Called from the Network
// thread while libcurl is producing WS events. Acquires one extra ref to keep
// the WebSocket alive between queueing and the drainPending call.
fn markReady(self: *WebSocket) void {
if (self._in_ready_list) return;
self._in_ready_list = true;
@@ -499,11 +516,10 @@ fn markReady(self: *WebSocket) void {
}
// Dispatches all queued events to JS. Must be called from the worker thread
// (the one that owns the V8 isolate). HttpClient calls this from its perform
// loop after curl_multi_perform.
// (the one that owns the V8 isolate).
pub fn drainPending(self: *WebSocket) void {
self._in_ready_list = false;
defer self.releaseRef(self._page._session);
defer self.releaseRef(self._frame._page);
if (self._pending_open) {
self._pending_open = false;
@@ -662,7 +678,7 @@ fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: M
if (self._send_offset >= byte_msg.data.len) {
const removed = self._send_queue.orderedRemove(0);
removed.deinit(self._frame._page);
removed.deinit(self._arena_pool);
if (comptime IS_DEBUG) {
log.debug(.websocket, "send complete", .{ .url = self._url, .len = byte_msg.data.len, .queue = self._send_queue.items.len });
}
@@ -738,9 +754,10 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void {
1005; // No status code received
if (self._ready_state == .closing) {
// Client-initiated close: this is the server's response.
// Close handshake complete - disconnect.
self.disconnected(null);
// Client-initiated close server's response. Don't
// disconnect inline (UAF: conn still in multi). Curl
// will deliver normal completion when the server closes
// the socket per RFC 6455 §5.5.1.
} else {
// Server-initiated close: send reciprocal close frame per RFC 6455 §5.5.1
self._close_code = received_code;
@@ -823,9 +840,9 @@ const Message = union(enum) {
arena: Allocator,
data: []const u8,
};
fn deinit(self: Message, page: *Page) void {
fn deinit(self: Message, pool: *ArenaPool) void {
switch (self) {
.text, .binary => |msg| page.releaseArena(msg.arena),
.text, .binary => |msg| pool.release(msg.arena),
.close => {},
}
}

View File

@@ -80,8 +80,14 @@ shutdown: std.atomic.Value(bool) = .init(false),
// Currently, Network is used sparingly, and we only create it on demand.
// When Network becomes truly shared, it should become a regular field.
multi: ?*libcurl.CurlM = null,
// Workers push via submit*; main pops in drainQueue. `conn.node` is
// shared across pending_add/pending_remove (mutually exclusive);
// `conn._op_node` is independent and only ever lives in pending_ops.
submission_mutex: std.Thread.Mutex = .{},
submission_queue: std.DoublyLinkedList = .{},
pending_add: std.DoublyLinkedList = .{},
pending_remove: std.DoublyLinkedList = .{},
pending_ops: std.DoublyLinkedList = .{},
callbacks: [MAX_TICK_CALLBACKS]TickCallback = undefined,
callbacks_len: usize = 0,
@@ -223,8 +229,8 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network {
const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true });
// 0 is wakeup, 1 is listener, rest for curl fds
const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent());
// 0 is wakeup, 1 is listener.
const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS);
errdefer allocator.free(pollfds);
@memset(pollfds, .{ .fd = -1, .events = 0, .revents = 0 });
@@ -425,8 +431,7 @@ pub fn run(self: *Network) void {
libcurl.curl_multi_perform(multi, &running_handles) catch |err| {
lp.log.err(.app, "curl perform", .{ .err = err });
};
self.preparePollFds(multi);
self.processCompletions(multi);
}
// for ontick to work, you need to wake up periodically
@@ -438,16 +443,48 @@ pub fn run(self: *Network) void {
const curl_timeout = self.getCurlTimeout();
if (curl_timeout == 0) {
break :blk min_timeout;
break :blk 0;
}
break :blk @min(min_timeout, curl_timeout);
};
_ = posix.poll(self.pollfds, timeout) catch |err| {
lp.log.err(.app, "poll", .{ .err = err });
continue;
};
if (self.multi != null and running_handles > 0) {
const multi = self.multi.?;
var extra_fds: [PSEUDO_POLLFDS]libcurl.CurlWaitFd = undefined;
var extra_len: usize = 0;
const wake_idx = extra_len;
extra_fds[extra_len] = .{
.fd = poll_fd.fd,
.events = .{ .pollin = true },
.revents = .{},
};
extra_len += 1;
const listen_idx = if (listen_fd.fd >= 0) blk: {
const idx = extra_len;
extra_fds[extra_len] = .{
.fd = listen_fd.fd,
.events = .{ .pollin = true },
.revents = .{},
};
extra_len += 1;
break :blk idx;
} else null;
libcurl.curl_multi_poll(multi, extra_fds[0..extra_len], timeout, null) catch |err| {
lp.log.err(.app, "curl poll", .{ .err = err });
continue;
};
poll_fd.revents = if (extra_fds[wake_idx].revents.pollin) posix.POLL.IN else 0;
if (listen_idx) |idx| {
listen_fd.revents = if (extra_fds[idx].revents.pollin) posix.POLL.IN else 0;
}
} else {
_ = posix.poll(self.pollfds[0..PSEUDO_POLLFDS], timeout) catch |err| {
lp.log.err(.app, "poll", .{ .err = err });
continue;
};
}
// check wakeup pipe
if (poll_fd.revents != 0) {
@@ -476,7 +513,9 @@ pub fn run(self: *Network) void {
// Check if fireTicks submitted new requests (e.g. telemetry flush).
// If so, continue the loop to drain and send them before exiting.
self.submission_mutex.lock();
const has_pending = self.submission_queue.first != null;
const has_pending = self.pending_add.first != null or
self.pending_remove.first != null or
self.pending_ops.first != null;
self.submission_mutex.unlock();
if (!has_pending) break;
}
@@ -497,22 +536,161 @@ pub fn run(self: *Network) void {
}
pub fn submitRequest(self: *Network, conn: *http.Connection) void {
self.submission_mutex.lock();
self.submission_queue.append(&conn.node);
self.submission_mutex.unlock();
{
self.submission_mutex.lock();
defer self.submission_mutex.unlock();
lp.assert(conn._submission == .idle, "submitRequest: conn not idle", .{});
conn._submission = .pending_add;
self.pending_add.append(&conn.node);
}
self.wakeupPoll();
}
// Fired from the worker thread. If the conn is still in pending_add
// (never reached the multi), short-circuit: remove it from the list and
// deliver a Canceled completion synchronously via on_complete. Otherwise
// queue a remove for main to process.
pub fn submitRemove(self: *Network, conn: *http.Connection) void {
var local_cancel: bool = false;
{
self.submission_mutex.lock();
defer self.submission_mutex.unlock();
switch (conn._submission) {
.pending_add => {
self.pending_add.remove(&conn.node);
conn._submission = .idle;
self.removeFromOpsLocked(conn);
local_cancel = true;
},
.in_multi => {
conn._submission = .pending_remove;
self.pending_remove.append(&conn.node);
},
.idle, .pending_remove => {
lp.log.warn(.app, "submitRemove bad state", .{ .state = @tagName(conn._submission) });
return;
},
}
}
if (local_cancel) {
if (conn.on_complete) |cb| {
conn._completion_err = error.Canceled;
cb(conn, error.Canceled);
} else {
self.releaseConnection(conn);
}
return;
}
self.wakeupPoll();
}
// Fire-and-forget op queued for Network to execute on a conn that's
// currently driven by curl. Dropped if the conn isn't in the multi.
pub const Op = union(enum) {
unpause,
tls_verify: http.Connection.TlsVerifyOp,
};
pub fn submitOp(self: *Network, conn: *http.Connection, op: Op) void {
{
self.submission_mutex.lock();
defer self.submission_mutex.unlock();
switch (conn._submission) {
.pending_add, .in_multi => {},
.idle, .pending_remove => return,
}
switch (op) {
.unpause => conn._op_unpause = true,
.tls_verify => |t| conn._op_tls_verify = t,
}
if (!conn._op_in_list) {
conn._op_in_list = true;
self.pending_ops.append(&conn._op_node);
}
}
self.wakeupPoll();
}
// Caller holds submission_mutex. Called on every transition to .idle.
fn removeFromOpsLocked(self: *Network, conn: *http.Connection) void {
if (conn._op_in_list) {
self.pending_ops.remove(&conn._op_node);
conn._op_in_list = false;
}
conn._op_unpause = false;
conn._op_tls_verify = null;
}
fn wakeupPoll(self: *Network) void {
_ = posix.write(self.wakeup_pipe[1], &.{1}) catch {};
}
fn drainQueue(self: *Network) void {
self.submission_mutex.lock();
defer self.submission_mutex.unlock();
// add/remove are queued for execution outside the lock so that
// on_complete / releaseConnection can run unblocked. Ops execute
// *under* the lock — that's what keeps the conn alive (every path
// that releases a WS conn first transitions out of .in_multi here).
// pause/setopt only flip internal libcurl flags, no callbacks fire.
var to_add: std.DoublyLinkedList = .{};
var to_remove: std.DoublyLinkedList = .{};
{
self.submission_mutex.lock();
defer self.submission_mutex.unlock();
if (self.submission_queue.first == null) return;
while (self.pending_remove.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
lp.assert(conn._submission == .pending_remove, "drainQueue: conn not in pending_remove", .{});
conn._submission = .idle;
self.removeFromOpsLocked(conn);
to_remove.append(node);
}
while (self.pending_add.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
lp.assert(conn._submission == .pending_add, "drainQueue: conn not in pending_add", .{});
// `in_multi` is the target state; handleAdd may downgrade to
// idle on failure and release the conn.
conn._submission = .in_multi;
to_add.append(node);
}
while (self.pending_ops.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("_op_node", node);
conn._op_in_list = false;
// Conn raced out of multi between submitOp and now; drop ops.
if (conn._submission != .in_multi) {
conn._op_unpause = false;
conn._op_tls_verify = null;
continue;
}
if (conn._op_unpause) {
conn._op_unpause = false;
conn.pause(.{ .cont = true }) catch |err| {
lp.log.warn(.app, "curl pause", .{ .err = err });
};
}
if (conn._op_tls_verify) |t| {
conn._op_tls_verify = null;
conn.setTlsVerify(t.verify, t.use_proxy) catch |err| {
lp.log.warn(.app, "curl setTlsVerify", .{ .err = err });
};
}
}
}
// Process removes before adds: cancellations should take effect before
// we admit new transfers.
while (to_remove.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
self.handleRemove(conn);
}
while (to_add.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
self.handleAdd(conn);
}
}
// Assumes conn._submission has already been set to .in_multi. On failure
// we must roll back to .idle and release the conn.
fn handleAdd(self: *Network, conn: *http.Connection) void {
const multi = self.multi orelse blk: {
const m = libcurl.curl_multi_init() orelse {
lp.assert(false, "curl multi init failed", .{});
@@ -522,17 +700,44 @@ fn drainQueue(self: *Network) void {
break :blk m;
};
while (self.submission_queue.popFirst()) |node| {
const conn: *http.Connection = @fieldParentPtr("node", node);
conn.setPrivate(conn) catch |err| {
lp.log.err(.app, "curl set private", .{ .err = err });
self.releaseConnection(conn);
continue;
};
libcurl.curl_multi_add_handle(multi, conn._easy) catch |err| {
lp.log.err(.app, "curl multi add", .{ .err = err });
self.releaseConnection(conn);
};
conn.setPrivate(conn) catch |err| {
lp.log.err(.app, "curl set private", .{ .err = err });
self.handleAddFailure(conn, err);
return;
};
libcurl.curl_multi_add_handle(multi, conn._easy) catch |err| {
lp.log.err(.app, "curl multi add", .{ .err = err });
self.handleAddFailure(conn, err);
};
}
fn handleAddFailure(self: *Network, conn: *http.Connection, err: anyerror) void {
{
self.submission_mutex.lock();
defer self.submission_mutex.unlock();
conn._submission = .idle;
self.removeFromOpsLocked(conn);
}
if (conn.on_complete) |cb| {
conn._completion_err = err;
cb(conn, err);
} else {
self.releaseConnection(conn);
}
}
// Assumes conn._submission has already been set to .idle and the conn is
// not in any submission list. The conn may still be in the multi (normal
// cancel path).
fn handleRemove(self: *Network, conn: *http.Connection) void {
if (self.multi) |multi| {
_ = libcurl.curl_multi_remove_handle(multi, conn._easy) catch {};
}
if (conn.on_complete) |cb| {
conn._completion_err = error.Canceled;
cb(conn, error.Canceled);
} else {
self.releaseConnection(conn);
}
}
@@ -541,6 +746,15 @@ pub fn stop(self: *Network) void {
self.wakeupPoll();
}
// Caller guarantees Network.run is not executing. Used to drive late
// abort() cancelations through after Network.stop()+join().
pub fn drainPendingForShutdown(self: *Network) void {
self.drainQueue();
if (self.multi) |multi| {
self.processCompletions(multi);
}
}
fn acceptConnections(self: *Network) void {
if (self.shutdown.load(.acquire)) {
return;
@@ -571,17 +785,6 @@ fn acceptConnections(self: *Network) void {
}
}
fn preparePollFds(self: *Network, multi: *libcurl.CurlM) void {
const curl_fds = self.pollfds[PSEUDO_POLLFDS..];
@memset(curl_fds, .{ .fd = -1, .events = 0, .revents = 0 });
var fd_count: c_uint = 0;
const wait_fds: []libcurl.CurlWaitFd = @ptrCast(curl_fds);
libcurl.curl_multi_waitfds(multi, wait_fds, &fd_count) catch |err| {
lp.log.err(.app, "curl waitfds", .{ .err = err });
};
}
fn getCurlTimeout(self: *Network) i32 {
const multi = self.multi orelse return -1;
var timeout_ms: c_long = -1;
@@ -592,13 +795,12 @@ fn getCurlTimeout(self: *Network) i32 {
fn processCompletions(self: *Network, multi: *libcurl.CurlM) void {
var msgs_in_queue: c_int = 0;
while (libcurl.curl_multi_info_read(multi, &msgs_in_queue)) |msg| {
switch (msg.data) {
.done => |maybe_err| {
if (maybe_err) |err| {
lp.log.warn(.app, "curl transfer error", .{ .err = err });
}
},
const maybe_err: ?anyerror = switch (msg.data) {
.done => |e| e,
else => continue,
};
if (maybe_err) |err| {
lp.log.warn(.app, "curl transfer error", .{ .err = err });
}
const easy: *libcurl.Curl = msg.easy_handle;
@@ -608,19 +810,29 @@ fn processCompletions(self: *Network, multi: *libcurl.CurlM) void {
const conn: *http.Connection = @ptrCast(@alignCast(ptr));
libcurl.curl_multi_remove_handle(multi, easy) catch {};
self.releaseConnection(conn);
}
}
comptime {
if (@sizeOf(posix.pollfd) != @sizeOf(libcurl.CurlWaitFd)) {
@compileError("pollfd and CurlWaitFd size mismatch");
}
if (@offsetOf(posix.pollfd, "fd") != @offsetOf(libcurl.CurlWaitFd, "fd") or
@offsetOf(posix.pollfd, "events") != @offsetOf(libcurl.CurlWaitFd, "events") or
@offsetOf(posix.pollfd, "revents") != @offsetOf(libcurl.CurlWaitFd, "revents"))
{
@compileError("pollfd and CurlWaitFd layout mismatch");
// Transition submission state. Races with worker submitRemove:
// if the worker already queued a remove, we absorb it here and
// treat this as a normal completion (cancel-after-complete is
// effectively a no-op).
{
self.submission_mutex.lock();
defer self.submission_mutex.unlock();
switch (conn._submission) {
.in_multi => {},
.pending_remove => self.pending_remove.remove(&conn.node),
else => lp.assert(false, "completion bad state", .{ .state = @tagName(conn._submission) }),
}
conn._submission = .idle;
self.removeFromOpsLocked(conn);
}
if (conn.on_complete) |cb| {
conn._completion_err = maybe_err;
cb(conn, maybe_err);
} else {
self.releaseConnection(conn);
}
}
}

60
src/network/Slot.zig Normal file
View File

@@ -0,0 +1,60 @@
// A Slot couples a wakeup pipe with a thread-safe linked-list queue. One
// consumer polls the pipe fd and drains the queue; any thread may push
// nodes or wake the consumer. Used to deliver completed transfers (and
// other cross-thread signals) from the main/Network thread to a worker.
const std = @import("std");
const posix = std.posix;
const Slot = @This();
_pipe: [2]posix.fd_t,
_mutex: std.Thread.Mutex = .{},
_queue: std.DoublyLinkedList = .{},
pub fn init() !Slot {
const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true });
return .{ ._pipe = pipe };
}
pub fn deinit(self: *Slot) void {
for (&self._pipe) |*fd| {
if (fd.* >= 0) {
posix.close(fd.*);
fd.* = -1;
}
}
}
pub fn pollFd(self: *const Slot) posix.fd_t {
return self._pipe[0];
}
pub fn wake(self: *Slot) void {
_ = posix.write(self._pipe[1], &.{1}) catch {};
}
pub fn push(self: *Slot, node: *std.DoublyLinkedList.Node) void {
self._mutex.lock();
self._queue.append(node);
self._mutex.unlock();
self.wake();
}
// Consumer drains signal bytes first, then pops all queued nodes.
// Pipe-first ordering ensures pushes that race with drain are not lost:
// producer writes the queue before the byte, so any byte observed implies
// the queued node is visible on the next lock.
pub fn drain(self: *Slot) std.DoublyLinkedList {
var buf: [64]u8 = undefined;
while (true) {
_ = posix.read(self._pipe[0], &buf) catch break;
}
self._mutex.lock();
defer self._mutex.unlock();
var out: std.DoublyLinkedList = .{};
while (self._queue.popFirst()) |n| out.append(n);
return out;
}

View File

@@ -34,8 +34,6 @@ pub const readfunc_pause = libcurl.curl_readfunc_pause;
pub const writefunc_error = libcurl.curl_writefunc_error;
pub const WsFrameType = libcurl.WsFrameType;
const Error = libcurl.Error;
pub fn curl_version() [*c]const u8 {
return libcurl.curl_version();
}
@@ -271,10 +269,46 @@ fn opensocketCallback(
pub const Connection = struct {
_easy: *libcurl.Curl,
in_use: bool,
transport: Transport,
// Network-side node: the conn's submission state selects which list
// currently owns it (available pool / pending_add / pending_remove /
// Slot queue). Transitions of `_submission` and list membership must
// happen together under `Network.submission_mutex`.
node: std.DoublyLinkedList.Node = .{},
// Worker-side: HttpClient.in_use. Independent of `node`.
_worker_node: std.DoublyLinkedList.Node = .{},
// Tracks which submission list (if any) currently contains `node`.
// Guarded by Network.submission_mutex.
_submission: SubmissionState = .idle,
// If set, called after the easy handle is removed from the multi; the
// callback takes ownership of the conn and must eventually release it.
on_complete: ?*const fn (conn: *Connection, err: ?anyerror) void = null,
// Err stashed by on_complete for worker to pick up via Slot.
_completion_err: ?anyerror = null,
// Op channel guarded by Network.submission_mutex.
_op_node: std.DoublyLinkedList.Node = .{},
_op_in_list: bool = false,
_op_unpause: bool = false,
_op_tls_verify: ?TlsVerifyOp = null,
pub const TlsVerifyOp = struct { verify: bool, use_proxy: bool };
pub const SubmissionState = enum(u8) {
// Not in pending_add/pending_remove, not in the curl multi.
// Conn is either in the pool, freshly drawn, or in worker/slot
// queues.
idle,
pending_add,
in_multi,
pending_remove,
};
pub const Transport = union(enum) {
none, // used for cases that manage their own connection, e.g. telemetry
http: *@import("../browser/HttpClient.zig").Transfer,
@@ -288,7 +322,7 @@ pub const Connection = struct {
) !Connection {
const easy = libcurl.curl_easy_init() orelse return error.FailedToInitializeEasy;
var self = Connection{ ._easy = easy, .in_use = false, .transport = .none };
var self = Connection{ ._easy = easy, .transport = .none };
errdefer self.deinit();
try self.reset(config, ca_blob, ip_filter);
@@ -420,6 +454,10 @@ pub const Connection = struct {
) !void {
libcurl.curl_easy_reset(self._easy);
self.transport = .none;
self.on_complete = null;
self._completion_err = null;
// _submission and _op_* fields are owned by Network and cleared
// by removeFromOpsLocked on every transition to .idle.
// timeouts
try libcurl.curl_easy_setopt(self._easy, .timeout_ms, config.httpTimeout());
@@ -572,16 +610,6 @@ pub const Connection = struct {
}
}
pub fn request(self: *const Connection, http_headers: *const Config.HttpHeaders) !u16 {
var header_list = try Headers.init(http_headers.user_agent_header);
defer header_list.deinit();
try self.secretHeaders(&header_list, http_headers);
try self.setHeaders(&header_list);
try libcurl.curl_easy_perform(self._easy);
return self.getResponseCode();
}
pub fn wsStartFrame(self: *const Connection, frame_type: libcurl.WsFrameType, size: usize) !void {
try libcurl.curl_ws_start_frame(self._easy, frame_type, @intCast(size));
}
@@ -591,62 +619,6 @@ pub const Connection = struct {
}
};
pub const Handles = struct {
multi: *libcurl.CurlM,
pub fn init(config: *const Config) !Handles {
const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti;
errdefer libcurl.curl_multi_cleanup(multi) catch {};
try libcurl.curl_multi_setopt(multi, .max_host_connections, config.httpMaxHostOpen());
return .{ .multi = multi };
}
pub fn deinit(self: *Handles) void {
libcurl.curl_multi_cleanup(self.multi) catch {};
}
pub fn add(self: *Handles, conn: *const Connection) !void {
try libcurl.curl_multi_add_handle(self.multi, conn._easy);
}
pub fn remove(self: *Handles, conn: *const Connection) !void {
try libcurl.curl_multi_remove_handle(self.multi, conn._easy);
}
pub fn perform(self: *Handles) !c_int {
var running: c_int = undefined;
try libcurl.curl_multi_perform(self.multi, &running);
return running;
}
pub fn poll(self: *Handles, extra_fds: []libcurl.CurlWaitFd, timeout_ms: c_int) !void {
try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null);
}
pub const MultiMessage = struct {
conn: *Connection,
err: ?Error,
};
pub fn readMessage(self: *Handles) !?MultiMessage {
var messages_count: c_int = 0;
const msg = libcurl.curl_multi_info_read(self.multi, &messages_count) orelse return null;
return switch (msg.data) {
.done => |err| {
var private: *anyopaque = undefined;
try libcurl.curl_easy_getinfo(msg.easy_handle, .private, &private);
return .{
.conn = @ptrCast(@alignCast(private)),
.err = err,
};
},
else => unreachable,
};
}
};
fn debugCallback(_: *libcurl.Curl, msg_type: libcurl.CurlInfoType, raw: [*c]u8, len: usize, _: *anyopaque) c_int {
const data = raw[0..len];
switch (msg_type) {