From 3b4629b9e5d635067b7840ebe66fcf9b50ee047d Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Tue, 21 Apr 2026 09:57:34 +0100 Subject: [PATCH] Move Handles to Network --- src/browser/HttpClient.zig | 444 ++++++++++----------------- src/browser/webapi/net/WebSocket.zig | 101 +++--- src/network/Network.zig | 326 ++++++++++++++++---- src/network/Slot.zig | 60 ++++ src/network/http.zig | 112 +++---- 5 files changed, 599 insertions(+), 444 deletions(-) create mode 100644 src/network/Slot.zig diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 22860b55..5119a9cd 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -23,9 +23,11 @@ const builtin = @import("builtin"); const URL = @import("URL.zig"); const Notification = @import("../Notification.zig"); const CookieJar = @import("webapi/storage/Cookie.zig").Jar; +const WebSocket = @import("webapi/net/WebSocket.zig"); const http = @import("../network/http.zig"); const Network = @import("../network/Network.zig"); +const Slot = @import("../network/Slot.zig"); const Robots = @import("../network/Robots.zig"); const Cache = @import("../network/cache/Cache.zig"); const timestamp = @import("../datetime.zig").timestamp; @@ -71,36 +73,26 @@ http_active: usize = 0, // 'networkAlmostIdle' Page.lifecycleEvent in CDP). intercepted: usize = 0, -// Our curl multi handle. -handles: http.Handles, - -// Connections currently in this client's curl_multi. +// In-flight connections (curl is driven by Network on main thread). The +// list holds `conn._worker_node` so it doesn't conflict with the +// Network-side `conn.node`. Used for abort enumeration. in_use: std.DoublyLinkedList = .{}, -// Connections that failed to be removed from curl_multi during perform. -dirty: std.DoublyLinkedList = .{}, - -// Whether we're currently inside a curl_multi_perform call. -performing: bool = false, - // WebSockets with queued events to be drained from the worker thread. // Populated by libcurl callbacks (currently same thread, future cross-thread). ws_ready: std.ArrayList(*WebSocket) = .{}, ws_ready_mutex: std.Thread.Mutex = .{}, +// Slot the Network (main) thread uses to deliver completed transfers +// and any other cross-thread signals back to this worker. +slot: Slot, + // Use to generate the next request ID next_request_id: u32 = 0, -// When handles has no more available easys, requests get queued. +// Transfers waiting for a free Connection from Network.getConnection. queue: std.DoublyLinkedList = .{}, -// Queue is for Transfers that have no connection. ready_queue is for connections -// that were initiated when performing == true and thus need to wait until -// performing == false before being added. I'm hoping this is temporary and that -// we can unify the two queues. But HTTP is being changed a lot right now, and -// I'm trying to minimize the surface area. -ready_queue: std.DoublyLinkedList = .{}, - // The main app allocator allocator: Allocator, @@ -164,13 +156,13 @@ pub fn init(allocator: Allocator, network: *Network) !*Client { const client = try allocator.create(Client); errdefer allocator.destroy(client); - var handles = try http.Handles.init(network.config); - errdefer handles.deinit(); + var slot = try Slot.init(); + errdefer slot.deinit(); const http_proxy = network.config.httpProxy(); client.* = .{ - .handles = handles, + .slot = slot, .network = network, .allocator = allocator, .transfer_pool = transfer_pool, @@ -187,7 +179,15 @@ pub fn init(allocator: Allocator, network: *Network) !*Client { pub fn deinit(self: *Client) void { self.abort(); - self.handles.deinit(); + + // If Network has already stopped, drive its queues ourselves so the + // cancelations from abort() deliver before we close the slot. + if (self.network.shutdown.load(.acquire)) { + self.network.drainPendingForShutdown(); + self.drainCompletions(); + } + + self.slot.deinit(); self.ws_ready.deinit(self.allocator); self.transfer_pool.deinit(); @@ -230,19 +230,15 @@ pub fn setTlsVerify(self: *Client, verify: bool) !void { // Remove inflight connections check on enable TLS b/c chromiumoxide calls // the command during navigate and Curl seems to accept it... + self.tls_verify = verify; var it = self.in_use.first; while (it) |node| : (it = node.next) { - const conn: *http.Connection = @fieldParentPtr("node", node); - try conn.setTlsVerify(verify, self.use_proxy); + const conn: *http.Connection = @fieldParentPtr("_worker_node", node); + self.network.submitOp(conn, .{ .tls_verify = .{ + .verify = verify, + .use_proxy = self.use_proxy, + } }); } - - it = self.ready_queue.first; - while (it) |node| : (it = node.next) { - const conn: *http.Connection = @fieldParentPtr("node", node); - try conn.setTlsVerify(verify, self.use_proxy); - } - - self.tls_verify = verify; } // Restrictive since it'll only work if there are no inflight requests. In some @@ -280,8 +276,26 @@ pub fn abortFrame(self: *Client, frame_id: u32) void { // Written this way so that both abort and abortFrame can share the same code // but abort can avoid the frame_id check at comptime. fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { - abortConnections(self.in_use, abort_all, frame_id); - abortConnections(self.ready_queue, abort_all, frame_id); + { + var n = self.in_use.first; + while (n) |node| { + n = node.next; + const conn: *http.Connection = @fieldParentPtr("_worker_node", node); + switch (conn.transport) { + .http => |transfer| { + if ((comptime abort_all) or transfer.req.frame_id == frame_id) { + transfer.kill(); + } + }, + .websocket => |ws| { + if ((comptime abort_all) or ws._frame._frame_id == frame_id) { + ws.kill(); + } + }, + .none => unreachable, + } + } + } { var q = &self.queue; @@ -300,16 +314,15 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { if (comptime abort_all) { self.queue = .{}; - self.ready_queue = .{}; } if (comptime IS_DEBUG and abort_all) { - // Even after an abort_all, we could still have transfers, but, at the - // very least, they should all be flagged as aborted. + // After abort_all, http transfers are flagged aborted; WS conns + // linger in in_use until their canceled completion arrives. var it = self.in_use.first; var leftover: usize = 0; while (it) |node| : (it = node.next) { - const conn: *http.Connection = @fieldParentPtr("node", node); + const conn: *http.Connection = @fieldParentPtr("_worker_node", node); switch (conn.transport) { .http => |transfer| std.debug.assert(transfer.aborted), .websocket => {}, @@ -317,42 +330,46 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { } leftover += 1; } - std.debug.assert(self.http_active == leftover); - } -} - -fn abortConnections(list: std.DoublyLinkedList, comptime abort_all: bool, frame_id: u32) void { - var n = list.first; - while (n) |node| { - n = node.next; - const conn: *http.Connection = @fieldParentPtr("node", node); - switch (conn.transport) { - .http => |transfer| { - if ((comptime abort_all) or transfer.req.frame_id == frame_id) { - transfer.kill(); - } - }, - .websocket => |ws| { - if ((comptime abort_all) or ws._frame._frame_id == frame_id) { - ws.kill(); - } - }, - .none => unreachable, - } + std.debug.assert(self.http_active + self.ws_active == leftover); } } pub fn tick(self: *Client, timeout_ms: u32) !PerformStatus { + try self.drainQueue(); + self.drainCompletions(); + self.drainReadyWs(); + + var pollfds: [2]posix.pollfd = undefined; + var count: usize = 0; + pollfds[count] = .{ .fd = self.slot.pollFd(), .events = posix.POLL.IN, .revents = 0 }; + count += 1; + const cdp_idx: ?usize = if (self.cdp_client) |cdp_client| blk: { + pollfds[count] = .{ .fd = cdp_client.socket, .events = posix.POLL.IN, .revents = 0 }; + const i = count; + count += 1; + break :blk i; + } else null; + + _ = posix.poll(pollfds[0..count], @intCast(timeout_ms)) catch {}; + + try self.drainQueue(); + self.drainCompletions(); + self.drainReadyWs(); + + if (cdp_idx) |i| { + if (pollfds[i].revents != 0) return .cdp_socket; + } + return .normal; +} + +fn drainQueue(self: *Client) !void { while (self.queue.popFirst()) |queue_node| { const conn = self.network.getConnection() orelse { self.queue.prepend(queue_node); break; }; - try self.makeRequest(conn, @fieldParentPtr("_node", queue_node)); } - - return self.perform(@intCast(timeout_ms)); } pub fn request(self: *Client, req: Request) !void { @@ -701,14 +718,9 @@ fn waitForInterceptedResponse(self: *Client, transfer: *Transfer) !bool { // cases, the interceptor is expected to call resume to continue the transfer // or transfer.abort() to abort it. fn process(self: *Client, transfer: *Transfer) !void { - // libcurl doesn't allow recursive calls, if we're in a `perform()` operation - // then we _have_ to queue this. - if (self.performing == false) { - if (self.network.getConnection()) |conn| { - return self.makeRequest(conn, transfer); - } + if (self.network.getConnection()) |conn| { + return self.makeRequest(conn, transfer); } - self.queue.append(&transfer._node); } @@ -817,7 +829,6 @@ pub fn restoreOriginalProxy(self: *Client) !void { fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyerror!void { { - // Reset per-response state for retries (auth challenge, queue). const auth = transfer._auth_challenge; transfer.reset(); transfer._auth_challenge = auth; @@ -826,22 +837,13 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer errdefer { transfer._conn = null; transfer.deinit(); - self.releaseConn(conn); + self.network.releaseConnection(conn); } try transfer.configureConn(conn); } - // As soon as this is called, our "perform" loop is responsible for - // cleaning things up. That's why the above code is in a block. If anything - // fails BEFORE `curl_multi_add_handle` succeeds, the we still need to do - // cleanup. But if things fail after `curl_multi_add_handle`, we expect - // perform to pickup the failure and cleanup. - self.trackConn(conn) catch |err| { - transfer._conn = null; - transfer.deinit(); - return err; - }; + self.trackConn(conn); if (transfer.req.start_callback) |cb| { cb(Response.fromTransfer(transfer)) catch |err| { @@ -849,7 +851,6 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer return err; }; } - _ = try self.perform(0); } pub const PerformStatus = enum { @@ -857,65 +858,51 @@ pub const PerformStatus = enum { normal, }; -fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus { - const running = blk: { - self.performing = true; - defer self.performing = false; - - break :blk try self.handles.perform(); +fn httpCompletionCallback(conn: *http.Connection, _: ?anyerror) void { + const client = switch (conn.transport) { + .http => |t| t.client, + .websocket => |ws| ws._http_client, + .none => return, }; - - // Drain queued WebSocket events. ws callbacks (called from libcurl during - // perform above) only buffer/queue — actual JS dispatch happens here, on - // the worker thread. - self.drainReadyWs(); - - // Process dirty connections — return them to Network pool. - while (self.dirty.popFirst()) |node| { - const conn: *http.Connection = @fieldParentPtr("node", node); - self.handles.remove(conn) catch |err| { - log.fatal(.http, "multi remove handle", .{ .err = err, .src = "perform" }); - @panic("multi_remove_handle"); - }; - self.releaseConn(conn); - } - - while (self.ready_queue.popFirst()) |node| { - const conn: *http.Connection = @fieldParentPtr("node", node); - try self.trackConn(conn); - } - - // We're potentially going to block for a while until we get data. Process - // whatever messages we have waiting ahead of time. - if (try self.processMessages()) { - return .normal; - } - - var status = PerformStatus.normal; - if (self.cdp_client) |cdp_client| { - var wait_fds = [_]http.WaitFd{.{ - .fd = cdp_client.socket, - .events = .{ .pollin = true }, - .revents = .{}, - }}; - try self.handles.poll(&wait_fds, timeout_ms); - if (wait_fds[0].revents.pollin or wait_fds[0].revents.pollpri or wait_fds[0].revents.pollout) { - status = .cdp_socket; - } - } else if (running > 0) { - try self.handles.poll(&.{}, timeout_ms); - } - - _ = try self.processMessages(); - return status; + client.slot.push(&conn.node); } -fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *Transfer) !bool { +fn drainCompletions(self: *Client) void { + var list = self.slot.drain(); + while (list.popFirst()) |node| { + const conn: *http.Connection = @fieldParentPtr("node", node); + const err = conn._completion_err; + conn._completion_err = null; + switch (conn.transport) { + .http => |transfer| { + const done = self.processOneMessage(conn, err, transfer) catch |e| blk: { + log.err(.http, "drain completions", .{ .err = e, .req = transfer }); + transfer.requestFailed(e, true); + break :blk true; + }; + if (done) transfer.deinit(); + }, + .websocket => |ws| { + if (err) |e| switch (e) { + error.GotNothing => ws.disconnected(null), + else => ws.disconnected(e), + } else { + ws.disconnected(null); + } + }, + .none => { + log.err(.http, "drain none transport", .{ .err = err }); + }, + } + } +} + +fn processOneMessage(self: *Client, conn: *http.Connection, maybe_err: ?anyerror, transfer: *Transfer) !bool { // Detect auth challenge from response headers. // Also check on RecvError: proxy may send 407 with headers before // closing the connection (CONNECT tunnel not yet established). - if (msg.err == null or msg.err.? == error.RecvError) { - transfer.detectAuthChallenge(msg.conn); + if (maybe_err == null or maybe_err.? == error.RecvError) { + transfer.detectAuthChallenge(conn); } // In case of auth challenge @@ -964,58 +951,40 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T } // Handle redirects: reuse the same connection to preserve TCP state. - if (msg.err == null) { - const status = try msg.conn.getResponseCode(); + // Conn is already out of the multi (removed by Network.processCompletions). + if (maybe_err == null) { + const status = try conn.getResponseCode(); if (status >= 300 and status <= 399) { try transfer.handleRedirect(); - - const conn = transfer._conn.?; - - try self.handles.remove(conn); - transfer._conn = null; - transfer._detached_conn = conn; // signal orphan for processMessages cleanup - transfer.reset(); try transfer.configureConn(conn); - try self.handles.add(conn); - transfer._detached_conn = null; - transfer._conn = conn; // reattach after successful re-add - - _ = try self.perform(0); - + self.network.submitRequest(conn); return false; } } - // Transfer is done (success or error). Caller (processMessages) owns deinit. - // Return true = done (caller will deinit), false = continues (redirect/auth). - // When the server closes the TLS onnection without a close_notify alert, // BoringSSL reports RecvError. If we already received valid HTTP headers, // this is a normal end-of-body (the connection closure signals the end // of the response per HTTP/1.1 when there is no Content-Length). // We must check this before endTransfer, which may reset the easy handle. const is_conn_close_recv = blk: { - const err = msg.err orelse break :blk false; + const err = maybe_err orelse break :blk false; if (err != error.RecvError) break :blk false; - const hdr = msg.conn.getResponseHeader("connection", 0) orelse break :blk true; + const hdr = conn.getResponseHeader("connection", 0) orelse break :blk true; break :blk std.ascii.eqlIgnoreCase(hdr.value, "close"); }; - // make sure the transfer can't be immediately aborted from a callback - // since we still need it here. transfer._performing = true; defer transfer._performing = false; - if (msg.err != null and !is_conn_close_recv) { - transfer.requestFailed(transfer._callback_error orelse msg.err.?, true); + if (maybe_err != null and !is_conn_close_recv) { + transfer.requestFailed(transfer._callback_error orelse maybe_err.?, true); return true; } if (!transfer._header_done_called) { - // In case of request w/o data, we need to call the header done - // callback now. - const proceed = try transfer.headerDoneCallback(msg.conn); + const proceed = try transfer.headerDoneCallback(conn); if (!proceed) { transfer.requestFailed(error.Abort, true); return true; @@ -1059,108 +1028,38 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T return true; } -fn processMessages(self: *Client) !bool { - var processed = false; - while (try self.handles.readMessage()) |msg| { - switch (msg.conn.transport) { - .http => |transfer| { - const done = self.processOneMessage(msg, transfer) catch |err| blk: { - log.err(.http, "process_messages", .{ .err = err, .req = transfer }); - transfer.requestFailed(err, true); - if (transfer._detached_conn) |c| { - // Conn was removed from handles during redirect reconfiguration - // but not re-added. Release it directly to avoid double-remove. - self.in_use.remove(&c.node); - self.http_active -= 1; - self.releaseConn(c); - transfer._detached_conn = null; - } - break :blk true; - }; - if (done) { - transfer.deinit(); - processed = true; - } - }, - .websocket => |ws| { - // ws_active will be decremented through the call to disconnected - if (msg.err) |err| switch (err) { - error.GotNothing => ws.disconnected(null), - else => ws.disconnected(err), - } else { - // Clean close - no error - ws.disconnected(null); - } - - processed = true; - }, - .none => unreachable, - } - } - return processed; -} - -pub fn trackConn(self: *Client, conn: *http.Connection) !void { - if (self.performing) { - conn.in_use = false; - self.ready_queue.append(&conn.node); - return; - } - - self.in_use.append(&conn.node); - conn.in_use = true; - // Set private pointer so readMessage can find the Connection. - // Must be done each time since curl_easy_reset clears it when - // connections are returned to pool. - conn.setPrivate(conn) catch |err| { - self.in_use.remove(&conn.node); - conn.in_use = false; - self.releaseConn(conn); - return err; - }; - self.handles.add(conn) catch |err| { - self.in_use.remove(&conn.node); - conn.in_use = false; - self.releaseConn(conn); - return err; - }; - +pub fn trackConn(self: *Client, conn: *http.Connection) void { + self.in_use.append(&conn._worker_node); + conn.on_complete = httpCompletionCallback; switch (conn.transport) { .http => self.http_active += 1, .websocket => self.ws_active += 1, else => unreachable, } + self.network.submitRequest(conn); } -pub fn removeConn(self: *Client, conn: *http.Connection) void { - if (conn.in_use == false) { - self.ready_queue.remove(&conn.node); - self.releaseConn(conn); - return; - } - - self.in_use.remove(&conn.node); - conn.in_use = false; +// Completion cleanup: conn is already out of the multi. Release to pool. +pub fn finishConn(self: *Client, conn: *http.Connection) void { + self.in_use.remove(&conn._worker_node); switch (conn.transport) { .http => self.http_active -= 1, .websocket => self.ws_active -= 1, else => unreachable, } - if (self.handles.remove(conn)) { - self.releaseConn(conn); - } else |_| { - // Can happen if we're in a perform() call, so we'll queue this - // for cleanup later. - self.dirty.append(&conn.node); - } -} - -fn releaseConn(self: *Client, conn: *http.Connection) void { self.network.releaseConnection(conn); } -// Called from WebSocket libcurl callbacks (currently same worker thread, but -// the API is mutex-protected so it stays correct if libcurl moves off-thread). +// Abort path: asks main to remove the conn from the multi. The resulting +// Canceled completion will flow back through the slot and be finalized +// by drainCompletions → finishConn, so we do not touch in_use or the +// active counters here. +pub fn cancelConn(self: *Client, conn: *http.Connection) void { + self.network.submitRemove(conn); +} + +// Called from WebSocket libcurl callbacks on the Network thread. +// The API is mutex-protected because the worker drains this queue. pub fn addReadyWs(self: *Client, ws: *WebSocket) void { self.ws_ready_mutex.lock(); defer self.ws_ready_mutex.unlock(); @@ -1336,10 +1235,6 @@ pub const Transfer = struct { _notified_fail: bool = false, _conn: ?*http.Connection = null, - // Set when conn is temporarily detached from transfer during redirect - // reconfiguration. Used by processMessages to release the orphaned conn - // if reconfiguration fails. - _detached_conn: ?*http.Connection = null, _auth_challenge: ?http.AuthChallenge = null, @@ -1371,14 +1266,14 @@ pub const Transfer = struct { fn releaseConn(self: *Transfer) void { if (self._conn) |conn| { - self.client.removeConn(conn); + self.client.finishConn(conn); self._conn = null; } } fn deinit(self: *Transfer) void { if (self._conn) |conn| { - self.client.removeConn(conn); + self.client.finishConn(conn); self._conn = null; } @@ -1389,15 +1284,16 @@ pub const Transfer = struct { pub fn abort(self: *Transfer, err: anyerror) void { self.requestFailed(err, true); + self.aborted = true; - if (self._performing or self.client.performing) { - // We're currently in a curl_multi_perform. We cannot call - // curl_multi_remove_handle from a curl callback. Instead, we flag - // this transfer and our callbacks will check for this flag. - self.aborted = true; + if (self._performing) return; + + if (self._conn) |conn| { + self.client.cancelConn(conn); return; } + self.client.queue.remove(&self._node); self.deinit(); } @@ -1413,20 +1309,18 @@ pub const Transfer = struct { cb(self.req.ctx); } - if (self._performing or self.client.performing) { - // We're currently inside of a callback. This client, and libcurl - // generally don't expect a transfer to become deinitialized during - // a callback. We can flag the transfer as aborted (which is what - // we do when transfer.abort() is called in this condition) AND, - // since this "kill()"should prevent any future callbacks, the best - // we can do is null/noop them. - self.aborted = true; - self.req.start_callback = null; - self.req.shutdown_callback = null; - self.req.header_callback = Noop.headerCallback; - self.req.data_callback = Noop.dataCallback; - self.req.done_callback = Noop.doneCallback; - self.req.error_callback = Noop.errorCallback; + self.aborted = true; + self.req.start_callback = null; + self.req.shutdown_callback = null; + self.req.header_callback = Noop.headerCallback; + self.req.data_callback = Noop.dataCallback; + self.req.done_callback = Noop.doneCallback; + self.req.error_callback = Noop.errorCallback; + + if (self._performing) return; + + if (self._conn) |conn| { + self.client.cancelConn(conn); return; } diff --git a/src/browser/webapi/net/WebSocket.zig b/src/browser/webapi/net/WebSocket.zig index 21c5a3c9..3a397207 100644 --- a/src/browser/webapi/net/WebSocket.zig +++ b/src/browser/webapi/net/WebSocket.zig @@ -28,6 +28,7 @@ const URL = @import("../../URL.zig"); const Page = @import("../../Page.zig"); const Frame = @import("../../Frame.zig"); const HttpClient = @import("../../HttpClient.zig"); +const ArenaPool = @import("../../../ArenaPool.zig"); const Event = @import("../Event.zig"); const EventTarget = @import("../EventTarget.zig"); @@ -44,6 +45,8 @@ _rc: lp.RC(u8) = .{}, _frame: *Frame, _proto: *EventTarget, _arena: Allocator, +// Cached for use by deinit — `_frame._page` may be torn down by then. +_arena_pool: *ArenaPool, // Connection state _ready_state: ReadyState = .connecting, @@ -74,9 +77,9 @@ _recv_buffer: std.ArrayList(u8) = .empty, // Used to slice out the message when bytes_left reaches 0. _assembling_start: usize = 0, -// Events queued by libcurl callbacks; drained from the worker thread via -// drainPending. Callbacks must NEVER enter V8 directly (they can run from -// any thread driving curl_multi_perform), so all dispatch happens here. +// Events queued by libcurl callbacks on the Network thread; drained from +// the worker thread via drainPending. Callbacks must NEVER enter V8 directly, +// so all dispatch happens here. _pending_messages: std.ArrayList(QueuedMessage) = .empty, _pending_open: bool = false, _pending_close: ?PendingClose = null, @@ -86,6 +89,10 @@ _pending_close: ?PendingClose = null, // WebSocket stays alive between queueing and drain. _in_ready_list: bool = false, +// Set while a cancel is in flight; holds an extra ref so callbacks +// can't deref `_conn` after free, and dedupes repeated cleanup(false). +_cancel_pending: bool = false, + // close info for event dispatch _close_code: u16 = 1000, _close_reason: []const u8 = "", @@ -175,13 +182,14 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket ._frame = frame, ._conn = conn, ._arena = arena, + ._arena_pool = frame._session.browser.arena_pool, ._proto = undefined, ._url = resolved_url, ._req_headers = headers, ._http_client = http_client, }); conn.transport = .{ .websocket = self }; - try http_client.trackConn(conn); + http_client.trackConn(conn); if (comptime IS_DEBUG) { log.info(.websocket, "connecting", .{ .url = url }); @@ -196,7 +204,8 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket } pub fn deinit(self: *WebSocket, page: *Page) void { - self.cleanup(); + _ = page; + self.cleanup(false); if (self._on_open) |func| { func.release(); @@ -212,10 +221,9 @@ pub fn deinit(self: *WebSocket, page: *Page) void { } for (self._send_queue.items) |msg| { - msg.deinit(page); + msg.deinit(self._arena_pool); } - - page.releaseArena(self._arena); + self._arena_pool.release(self._arena); } pub fn releaseRef(self: *WebSocket, page: *Page) void { @@ -232,15 +240,15 @@ fn asEventTarget(self: *WebSocket) *EventTarget { // we're being aborted internally (e.g. frame shutting down) pub fn kill(self: *WebSocket) void { - self.cleanup(); + self._ready_state = .closed; + self.cleanup(false); } pub fn disconnected(self: *WebSocket, err_: ?anyerror) void { - if (self._ready_state == .closed) { - // already disconnected (e.g. close-handshake disconnected us, then - // libcurl reports the same connection completion). - return; - } + defer self.cleanup(true); + + if (self._ready_state == .closed) return; + const was_clean = self._ready_state == .closing and err_ == null; self._ready_state = .closed; @@ -250,9 +258,6 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void { log.info(.websocket, "disconnected", .{ .url = self._url, .reason = "closed" }); } - // Queue events first (markReady acquires a "pending" ref), then cleanup - // (which releases the create-time ref). The pending ref keeps us alive - // until drainPending dispatches and releases it. self._pending_close = .{ .code = if (was_clean) self._close_code else 1006, .reason = if (was_clean) self._close_reason else "", @@ -260,18 +265,30 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void { .with_error = !was_clean, }; self.markReady(); - - self.cleanup(); } -fn cleanup(self: *WebSocket) void { - if (self._conn) |conn| { - self._http_client.removeConn(conn); - self._req_headers.deinit(); - self._conn = null; - self.releaseRef(self._frame._page); - self._send_queue.clearRetainingCapacity(); +// completed=true releases the conn to the pool and drops the create-time +// ref; called from disconnected(). completed=false begins a cancel and +// holds an extra ref until the canceled completion routes through +// disconnected → cleanup(true). +fn cleanup(self: *WebSocket, completed: bool) void { + const conn = self._conn orelse return; + if (!completed) { + if (self._cancel_pending) return; + self._cancel_pending = true; + self.acquireRef(); + self._http_client.cancelConn(conn); + return; } + self._http_client.finishConn(conn); + self._req_headers.deinit(); + self._conn = null; + self.releaseRef(self._frame._page); // create-time + if (self._cancel_pending) { + self._cancel_pending = false; + self.releaseRef(self._frame._page); // pending-cancel + } + self._send_queue.clearRetainingCapacity(); } fn queueMessage(self: *WebSocket, msg: Message) !void { @@ -279,9 +296,10 @@ fn queueMessage(self: *WebSocket, msg: Message) !void { try self._send_queue.append(self._arena, msg); if (was_empty) { - // Unpause the send callback so libcurl will request data + // Unpause via Network — curl_easy_pause from this thread would + // race with curl_multi_perform. if (self._conn) |conn| { - try conn.pause(.{ .cont = true }); + self._http_client.network.submitOp(conn, .unpause); } } } @@ -392,7 +410,7 @@ pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void { .with_error = false, }; self.markReady(); - self.cleanup(); + self.cleanup(false); return; } @@ -487,10 +505,9 @@ pub fn setOnClose(self: *WebSocket, cb_: ?js.Function) !void { } } -// Register self as having pending events to drain. Called from any thread -// that produces ws events (currently libcurl callbacks on the worker thread, -// future: Network thread). Acquires one extra ref to keep the WebSocket -// alive between queueing and the drainPending call. +// Register self as having pending events to drain. Called from the Network +// thread while libcurl is producing WS events. Acquires one extra ref to keep +// the WebSocket alive between queueing and the drainPending call. fn markReady(self: *WebSocket) void { if (self._in_ready_list) return; self._in_ready_list = true; @@ -499,11 +516,10 @@ fn markReady(self: *WebSocket) void { } // Dispatches all queued events to JS. Must be called from the worker thread -// (the one that owns the V8 isolate). HttpClient calls this from its perform -// loop after curl_multi_perform. +// (the one that owns the V8 isolate). pub fn drainPending(self: *WebSocket) void { self._in_ready_list = false; - defer self.releaseRef(self._page._session); + defer self.releaseRef(self._frame._page); if (self._pending_open) { self._pending_open = false; @@ -662,7 +678,7 @@ fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: M if (self._send_offset >= byte_msg.data.len) { const removed = self._send_queue.orderedRemove(0); - removed.deinit(self._frame._page); + removed.deinit(self._arena_pool); if (comptime IS_DEBUG) { log.debug(.websocket, "send complete", .{ .url = self._url, .len = byte_msg.data.len, .queue = self._send_queue.items.len }); } @@ -738,9 +754,10 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void { 1005; // No status code received if (self._ready_state == .closing) { - // Client-initiated close: this is the server's response. - // Close handshake complete - disconnect. - self.disconnected(null); + // Client-initiated close — server's response. Don't + // disconnect inline (UAF: conn still in multi). Curl + // will deliver normal completion when the server closes + // the socket per RFC 6455 §5.5.1. } else { // Server-initiated close: send reciprocal close frame per RFC 6455 §5.5.1 self._close_code = received_code; @@ -823,9 +840,9 @@ const Message = union(enum) { arena: Allocator, data: []const u8, }; - fn deinit(self: Message, page: *Page) void { + fn deinit(self: Message, pool: *ArenaPool) void { switch (self) { - .text, .binary => |msg| page.releaseArena(msg.arena), + .text, .binary => |msg| pool.release(msg.arena), .close => {}, } } diff --git a/src/network/Network.zig b/src/network/Network.zig index 42756783..cf7567cd 100644 --- a/src/network/Network.zig +++ b/src/network/Network.zig @@ -80,8 +80,14 @@ shutdown: std.atomic.Value(bool) = .init(false), // Currently, Network is used sparingly, and we only create it on demand. // When Network becomes truly shared, it should become a regular field. multi: ?*libcurl.CurlM = null, + +// Workers push via submit*; main pops in drainQueue. `conn.node` is +// shared across pending_add/pending_remove (mutually exclusive); +// `conn._op_node` is independent and only ever lives in pending_ops. submission_mutex: std.Thread.Mutex = .{}, -submission_queue: std.DoublyLinkedList = .{}, +pending_add: std.DoublyLinkedList = .{}, +pending_remove: std.DoublyLinkedList = .{}, +pending_ops: std.DoublyLinkedList = .{}, callbacks: [MAX_TICK_CALLBACKS]TickCallback = undefined, callbacks_len: usize = 0, @@ -223,8 +229,8 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network { const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); - // 0 is wakeup, 1 is listener, rest for curl fds - const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS + config.httpMaxConcurrent()); + // 0 is wakeup, 1 is listener. + const pollfds = try allocator.alloc(posix.pollfd, PSEUDO_POLLFDS); errdefer allocator.free(pollfds); @memset(pollfds, .{ .fd = -1, .events = 0, .revents = 0 }); @@ -425,8 +431,7 @@ pub fn run(self: *Network) void { libcurl.curl_multi_perform(multi, &running_handles) catch |err| { lp.log.err(.app, "curl perform", .{ .err = err }); }; - - self.preparePollFds(multi); + self.processCompletions(multi); } // for ontick to work, you need to wake up periodically @@ -438,16 +443,48 @@ pub fn run(self: *Network) void { const curl_timeout = self.getCurlTimeout(); if (curl_timeout == 0) { - break :blk min_timeout; + break :blk 0; } break :blk @min(min_timeout, curl_timeout); }; - _ = posix.poll(self.pollfds, timeout) catch |err| { - lp.log.err(.app, "poll", .{ .err = err }); - continue; - }; + if (self.multi != null and running_handles > 0) { + const multi = self.multi.?; + var extra_fds: [PSEUDO_POLLFDS]libcurl.CurlWaitFd = undefined; + var extra_len: usize = 0; + const wake_idx = extra_len; + extra_fds[extra_len] = .{ + .fd = poll_fd.fd, + .events = .{ .pollin = true }, + .revents = .{}, + }; + extra_len += 1; + const listen_idx = if (listen_fd.fd >= 0) blk: { + const idx = extra_len; + extra_fds[extra_len] = .{ + .fd = listen_fd.fd, + .events = .{ .pollin = true }, + .revents = .{}, + }; + extra_len += 1; + break :blk idx; + } else null; + + libcurl.curl_multi_poll(multi, extra_fds[0..extra_len], timeout, null) catch |err| { + lp.log.err(.app, "curl poll", .{ .err = err }); + continue; + }; + poll_fd.revents = if (extra_fds[wake_idx].revents.pollin) posix.POLL.IN else 0; + if (listen_idx) |idx| { + listen_fd.revents = if (extra_fds[idx].revents.pollin) posix.POLL.IN else 0; + } + } else { + _ = posix.poll(self.pollfds[0..PSEUDO_POLLFDS], timeout) catch |err| { + lp.log.err(.app, "poll", .{ .err = err }); + continue; + }; + } // check wakeup pipe if (poll_fd.revents != 0) { @@ -476,7 +513,9 @@ pub fn run(self: *Network) void { // Check if fireTicks submitted new requests (e.g. telemetry flush). // If so, continue the loop to drain and send them before exiting. self.submission_mutex.lock(); - const has_pending = self.submission_queue.first != null; + const has_pending = self.pending_add.first != null or + self.pending_remove.first != null or + self.pending_ops.first != null; self.submission_mutex.unlock(); if (!has_pending) break; } @@ -497,22 +536,161 @@ pub fn run(self: *Network) void { } pub fn submitRequest(self: *Network, conn: *http.Connection) void { - self.submission_mutex.lock(); - self.submission_queue.append(&conn.node); - self.submission_mutex.unlock(); + { + self.submission_mutex.lock(); + defer self.submission_mutex.unlock(); + lp.assert(conn._submission == .idle, "submitRequest: conn not idle", .{}); + conn._submission = .pending_add; + self.pending_add.append(&conn.node); + } self.wakeupPoll(); } +// Fired from the worker thread. If the conn is still in pending_add +// (never reached the multi), short-circuit: remove it from the list and +// deliver a Canceled completion synchronously via on_complete. Otherwise +// queue a remove for main to process. +pub fn submitRemove(self: *Network, conn: *http.Connection) void { + var local_cancel: bool = false; + { + self.submission_mutex.lock(); + defer self.submission_mutex.unlock(); + switch (conn._submission) { + .pending_add => { + self.pending_add.remove(&conn.node); + conn._submission = .idle; + self.removeFromOpsLocked(conn); + local_cancel = true; + }, + .in_multi => { + conn._submission = .pending_remove; + self.pending_remove.append(&conn.node); + }, + .idle, .pending_remove => { + lp.log.warn(.app, "submitRemove bad state", .{ .state = @tagName(conn._submission) }); + return; + }, + } + } + if (local_cancel) { + if (conn.on_complete) |cb| { + conn._completion_err = error.Canceled; + cb(conn, error.Canceled); + } else { + self.releaseConnection(conn); + } + return; + } + self.wakeupPoll(); +} + +// Fire-and-forget op queued for Network to execute on a conn that's +// currently driven by curl. Dropped if the conn isn't in the multi. +pub const Op = union(enum) { + unpause, + tls_verify: http.Connection.TlsVerifyOp, +}; + +pub fn submitOp(self: *Network, conn: *http.Connection, op: Op) void { + { + self.submission_mutex.lock(); + defer self.submission_mutex.unlock(); + switch (conn._submission) { + .pending_add, .in_multi => {}, + .idle, .pending_remove => return, + } + switch (op) { + .unpause => conn._op_unpause = true, + .tls_verify => |t| conn._op_tls_verify = t, + } + if (!conn._op_in_list) { + conn._op_in_list = true; + self.pending_ops.append(&conn._op_node); + } + } + self.wakeupPoll(); +} + +// Caller holds submission_mutex. Called on every transition to .idle. +fn removeFromOpsLocked(self: *Network, conn: *http.Connection) void { + if (conn._op_in_list) { + self.pending_ops.remove(&conn._op_node); + conn._op_in_list = false; + } + conn._op_unpause = false; + conn._op_tls_verify = null; +} + fn wakeupPoll(self: *Network) void { _ = posix.write(self.wakeup_pipe[1], &.{1}) catch {}; } fn drainQueue(self: *Network) void { - self.submission_mutex.lock(); - defer self.submission_mutex.unlock(); + // add/remove are queued for execution outside the lock so that + // on_complete / releaseConnection can run unblocked. Ops execute + // *under* the lock — that's what keeps the conn alive (every path + // that releases a WS conn first transitions out of .in_multi here). + // pause/setopt only flip internal libcurl flags, no callbacks fire. + var to_add: std.DoublyLinkedList = .{}; + var to_remove: std.DoublyLinkedList = .{}; + { + self.submission_mutex.lock(); + defer self.submission_mutex.unlock(); - if (self.submission_queue.first == null) return; + while (self.pending_remove.popFirst()) |node| { + const conn: *http.Connection = @fieldParentPtr("node", node); + lp.assert(conn._submission == .pending_remove, "drainQueue: conn not in pending_remove", .{}); + conn._submission = .idle; + self.removeFromOpsLocked(conn); + to_remove.append(node); + } + while (self.pending_add.popFirst()) |node| { + const conn: *http.Connection = @fieldParentPtr("node", node); + lp.assert(conn._submission == .pending_add, "drainQueue: conn not in pending_add", .{}); + // `in_multi` is the target state; handleAdd may downgrade to + // idle on failure and release the conn. + conn._submission = .in_multi; + to_add.append(node); + } + while (self.pending_ops.popFirst()) |node| { + const conn: *http.Connection = @fieldParentPtr("_op_node", node); + conn._op_in_list = false; + // Conn raced out of multi between submitOp and now; drop ops. + if (conn._submission != .in_multi) { + conn._op_unpause = false; + conn._op_tls_verify = null; + continue; + } + if (conn._op_unpause) { + conn._op_unpause = false; + conn.pause(.{ .cont = true }) catch |err| { + lp.log.warn(.app, "curl pause", .{ .err = err }); + }; + } + if (conn._op_tls_verify) |t| { + conn._op_tls_verify = null; + conn.setTlsVerify(t.verify, t.use_proxy) catch |err| { + lp.log.warn(.app, "curl setTlsVerify", .{ .err = err }); + }; + } + } + } + // Process removes before adds: cancellations should take effect before + // we admit new transfers. + while (to_remove.popFirst()) |node| { + const conn: *http.Connection = @fieldParentPtr("node", node); + self.handleRemove(conn); + } + while (to_add.popFirst()) |node| { + const conn: *http.Connection = @fieldParentPtr("node", node); + self.handleAdd(conn); + } +} + +// Assumes conn._submission has already been set to .in_multi. On failure +// we must roll back to .idle and release the conn. +fn handleAdd(self: *Network, conn: *http.Connection) void { const multi = self.multi orelse blk: { const m = libcurl.curl_multi_init() orelse { lp.assert(false, "curl multi init failed", .{}); @@ -522,17 +700,44 @@ fn drainQueue(self: *Network) void { break :blk m; }; - while (self.submission_queue.popFirst()) |node| { - const conn: *http.Connection = @fieldParentPtr("node", node); - conn.setPrivate(conn) catch |err| { - lp.log.err(.app, "curl set private", .{ .err = err }); - self.releaseConnection(conn); - continue; - }; - libcurl.curl_multi_add_handle(multi, conn._easy) catch |err| { - lp.log.err(.app, "curl multi add", .{ .err = err }); - self.releaseConnection(conn); - }; + conn.setPrivate(conn) catch |err| { + lp.log.err(.app, "curl set private", .{ .err = err }); + self.handleAddFailure(conn, err); + return; + }; + libcurl.curl_multi_add_handle(multi, conn._easy) catch |err| { + lp.log.err(.app, "curl multi add", .{ .err = err }); + self.handleAddFailure(conn, err); + }; +} + +fn handleAddFailure(self: *Network, conn: *http.Connection, err: anyerror) void { + { + self.submission_mutex.lock(); + defer self.submission_mutex.unlock(); + conn._submission = .idle; + self.removeFromOpsLocked(conn); + } + if (conn.on_complete) |cb| { + conn._completion_err = err; + cb(conn, err); + } else { + self.releaseConnection(conn); + } +} + +// Assumes conn._submission has already been set to .idle and the conn is +// not in any submission list. The conn may still be in the multi (normal +// cancel path). +fn handleRemove(self: *Network, conn: *http.Connection) void { + if (self.multi) |multi| { + _ = libcurl.curl_multi_remove_handle(multi, conn._easy) catch {}; + } + if (conn.on_complete) |cb| { + conn._completion_err = error.Canceled; + cb(conn, error.Canceled); + } else { + self.releaseConnection(conn); } } @@ -541,6 +746,15 @@ pub fn stop(self: *Network) void { self.wakeupPoll(); } +// Caller guarantees Network.run is not executing. Used to drive late +// abort() cancelations through after Network.stop()+join(). +pub fn drainPendingForShutdown(self: *Network) void { + self.drainQueue(); + if (self.multi) |multi| { + self.processCompletions(multi); + } +} + fn acceptConnections(self: *Network) void { if (self.shutdown.load(.acquire)) { return; @@ -571,17 +785,6 @@ fn acceptConnections(self: *Network) void { } } -fn preparePollFds(self: *Network, multi: *libcurl.CurlM) void { - const curl_fds = self.pollfds[PSEUDO_POLLFDS..]; - @memset(curl_fds, .{ .fd = -1, .events = 0, .revents = 0 }); - - var fd_count: c_uint = 0; - const wait_fds: []libcurl.CurlWaitFd = @ptrCast(curl_fds); - libcurl.curl_multi_waitfds(multi, wait_fds, &fd_count) catch |err| { - lp.log.err(.app, "curl waitfds", .{ .err = err }); - }; -} - fn getCurlTimeout(self: *Network) i32 { const multi = self.multi orelse return -1; var timeout_ms: c_long = -1; @@ -592,13 +795,12 @@ fn getCurlTimeout(self: *Network) i32 { fn processCompletions(self: *Network, multi: *libcurl.CurlM) void { var msgs_in_queue: c_int = 0; while (libcurl.curl_multi_info_read(multi, &msgs_in_queue)) |msg| { - switch (msg.data) { - .done => |maybe_err| { - if (maybe_err) |err| { - lp.log.warn(.app, "curl transfer error", .{ .err = err }); - } - }, + const maybe_err: ?anyerror = switch (msg.data) { + .done => |e| e, else => continue, + }; + if (maybe_err) |err| { + lp.log.warn(.app, "curl transfer error", .{ .err = err }); } const easy: *libcurl.Curl = msg.easy_handle; @@ -608,19 +810,29 @@ fn processCompletions(self: *Network, multi: *libcurl.CurlM) void { const conn: *http.Connection = @ptrCast(@alignCast(ptr)); libcurl.curl_multi_remove_handle(multi, easy) catch {}; - self.releaseConnection(conn); - } -} -comptime { - if (@sizeOf(posix.pollfd) != @sizeOf(libcurl.CurlWaitFd)) { - @compileError("pollfd and CurlWaitFd size mismatch"); - } - if (@offsetOf(posix.pollfd, "fd") != @offsetOf(libcurl.CurlWaitFd, "fd") or - @offsetOf(posix.pollfd, "events") != @offsetOf(libcurl.CurlWaitFd, "events") or - @offsetOf(posix.pollfd, "revents") != @offsetOf(libcurl.CurlWaitFd, "revents")) - { - @compileError("pollfd and CurlWaitFd layout mismatch"); + // Transition submission state. Races with worker submitRemove: + // if the worker already queued a remove, we absorb it here and + // treat this as a normal completion (cancel-after-complete is + // effectively a no-op). + { + self.submission_mutex.lock(); + defer self.submission_mutex.unlock(); + switch (conn._submission) { + .in_multi => {}, + .pending_remove => self.pending_remove.remove(&conn.node), + else => lp.assert(false, "completion bad state", .{ .state = @tagName(conn._submission) }), + } + conn._submission = .idle; + self.removeFromOpsLocked(conn); + } + + if (conn.on_complete) |cb| { + conn._completion_err = maybe_err; + cb(conn, maybe_err); + } else { + self.releaseConnection(conn); + } } } diff --git a/src/network/Slot.zig b/src/network/Slot.zig new file mode 100644 index 00000000..af82df4d --- /dev/null +++ b/src/network/Slot.zig @@ -0,0 +1,60 @@ +// A Slot couples a wakeup pipe with a thread-safe linked-list queue. One +// consumer polls the pipe fd and drains the queue; any thread may push +// nodes or wake the consumer. Used to deliver completed transfers (and +// other cross-thread signals) from the main/Network thread to a worker. + +const std = @import("std"); +const posix = std.posix; + +const Slot = @This(); + +_pipe: [2]posix.fd_t, +_mutex: std.Thread.Mutex = .{}, +_queue: std.DoublyLinkedList = .{}, + +pub fn init() !Slot { + const pipe = try posix.pipe2(.{ .NONBLOCK = true, .CLOEXEC = true }); + return .{ ._pipe = pipe }; +} + +pub fn deinit(self: *Slot) void { + for (&self._pipe) |*fd| { + if (fd.* >= 0) { + posix.close(fd.*); + fd.* = -1; + } + } +} + +pub fn pollFd(self: *const Slot) posix.fd_t { + return self._pipe[0]; +} + +pub fn wake(self: *Slot) void { + _ = posix.write(self._pipe[1], &.{1}) catch {}; +} + +pub fn push(self: *Slot, node: *std.DoublyLinkedList.Node) void { + self._mutex.lock(); + self._queue.append(node); + self._mutex.unlock(); + self.wake(); +} + +// Consumer drains signal bytes first, then pops all queued nodes. +// Pipe-first ordering ensures pushes that race with drain are not lost: +// producer writes the queue before the byte, so any byte observed implies +// the queued node is visible on the next lock. +pub fn drain(self: *Slot) std.DoublyLinkedList { + var buf: [64]u8 = undefined; + while (true) { + _ = posix.read(self._pipe[0], &buf) catch break; + } + + self._mutex.lock(); + defer self._mutex.unlock(); + + var out: std.DoublyLinkedList = .{}; + while (self._queue.popFirst()) |n| out.append(n); + return out; +} diff --git a/src/network/http.zig b/src/network/http.zig index e97e779f..ee591b59 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -34,8 +34,6 @@ pub const readfunc_pause = libcurl.curl_readfunc_pause; pub const writefunc_error = libcurl.curl_writefunc_error; pub const WsFrameType = libcurl.WsFrameType; -const Error = libcurl.Error; - pub fn curl_version() [*c]const u8 { return libcurl.curl_version(); } @@ -271,10 +269,46 @@ fn opensocketCallback( pub const Connection = struct { _easy: *libcurl.Curl, - in_use: bool, transport: Transport, + + // Network-side node: the conn's submission state selects which list + // currently owns it (available pool / pending_add / pending_remove / + // Slot queue). Transitions of `_submission` and list membership must + // happen together under `Network.submission_mutex`. node: std.DoublyLinkedList.Node = .{}, + // Worker-side: HttpClient.in_use. Independent of `node`. + _worker_node: std.DoublyLinkedList.Node = .{}, + + // Tracks which submission list (if any) currently contains `node`. + // Guarded by Network.submission_mutex. + _submission: SubmissionState = .idle, + + // If set, called after the easy handle is removed from the multi; the + // callback takes ownership of the conn and must eventually release it. + on_complete: ?*const fn (conn: *Connection, err: ?anyerror) void = null, + + // Err stashed by on_complete for worker to pick up via Slot. + _completion_err: ?anyerror = null, + + // Op channel guarded by Network.submission_mutex. + _op_node: std.DoublyLinkedList.Node = .{}, + _op_in_list: bool = false, + _op_unpause: bool = false, + _op_tls_verify: ?TlsVerifyOp = null, + + pub const TlsVerifyOp = struct { verify: bool, use_proxy: bool }; + + pub const SubmissionState = enum(u8) { + // Not in pending_add/pending_remove, not in the curl multi. + // Conn is either in the pool, freshly drawn, or in worker/slot + // queues. + idle, + pending_add, + in_multi, + pending_remove, + }; + pub const Transport = union(enum) { none, // used for cases that manage their own connection, e.g. telemetry http: *@import("../browser/HttpClient.zig").Transfer, @@ -288,7 +322,7 @@ pub const Connection = struct { ) !Connection { const easy = libcurl.curl_easy_init() orelse return error.FailedToInitializeEasy; - var self = Connection{ ._easy = easy, .in_use = false, .transport = .none }; + var self = Connection{ ._easy = easy, .transport = .none }; errdefer self.deinit(); try self.reset(config, ca_blob, ip_filter); @@ -420,6 +454,10 @@ pub const Connection = struct { ) !void { libcurl.curl_easy_reset(self._easy); self.transport = .none; + self.on_complete = null; + self._completion_err = null; + // _submission and _op_* fields are owned by Network and cleared + // by removeFromOpsLocked on every transition to .idle. // timeouts try libcurl.curl_easy_setopt(self._easy, .timeout_ms, config.httpTimeout()); @@ -572,16 +610,6 @@ pub const Connection = struct { } } - pub fn request(self: *const Connection, http_headers: *const Config.HttpHeaders) !u16 { - var header_list = try Headers.init(http_headers.user_agent_header); - defer header_list.deinit(); - try self.secretHeaders(&header_list, http_headers); - try self.setHeaders(&header_list); - - try libcurl.curl_easy_perform(self._easy); - return self.getResponseCode(); - } - pub fn wsStartFrame(self: *const Connection, frame_type: libcurl.WsFrameType, size: usize) !void { try libcurl.curl_ws_start_frame(self._easy, frame_type, @intCast(size)); } @@ -591,62 +619,6 @@ pub const Connection = struct { } }; -pub const Handles = struct { - multi: *libcurl.CurlM, - - pub fn init(config: *const Config) !Handles { - const multi = libcurl.curl_multi_init() orelse return error.FailedToInitializeMulti; - errdefer libcurl.curl_multi_cleanup(multi) catch {}; - - try libcurl.curl_multi_setopt(multi, .max_host_connections, config.httpMaxHostOpen()); - - return .{ .multi = multi }; - } - - pub fn deinit(self: *Handles) void { - libcurl.curl_multi_cleanup(self.multi) catch {}; - } - - pub fn add(self: *Handles, conn: *const Connection) !void { - try libcurl.curl_multi_add_handle(self.multi, conn._easy); - } - - pub fn remove(self: *Handles, conn: *const Connection) !void { - try libcurl.curl_multi_remove_handle(self.multi, conn._easy); - } - - pub fn perform(self: *Handles) !c_int { - var running: c_int = undefined; - try libcurl.curl_multi_perform(self.multi, &running); - return running; - } - - pub fn poll(self: *Handles, extra_fds: []libcurl.CurlWaitFd, timeout_ms: c_int) !void { - try libcurl.curl_multi_poll(self.multi, extra_fds, timeout_ms, null); - } - - pub const MultiMessage = struct { - conn: *Connection, - err: ?Error, - }; - - pub fn readMessage(self: *Handles) !?MultiMessage { - var messages_count: c_int = 0; - const msg = libcurl.curl_multi_info_read(self.multi, &messages_count) orelse return null; - return switch (msg.data) { - .done => |err| { - var private: *anyopaque = undefined; - try libcurl.curl_easy_getinfo(msg.easy_handle, .private, &private); - return .{ - .conn = @ptrCast(@alignCast(private)), - .err = err, - }; - }, - else => unreachable, - }; - } -}; - fn debugCallback(_: *libcurl.Curl, msg_type: libcurl.CurlInfoType, raw: [*c]u8, len: usize, _: *anyopaque) c_int { const data = raw[0..len]; switch (msg_type) {