try to use direct ws

This commit is contained in:
Karl Seguin
2026-05-18 12:07:28 +08:00
parent ba97c1e43c
commit 8cfdb7e13c
7 changed files with 820 additions and 377 deletions

View File

@@ -232,7 +232,7 @@ pub fn deinit(self: *Client) void {
self.network.drainPendingForShutdown();
}
_ = self.processInbox(5_000) catch {};
if (watchdog.read() > 30 * std.time.ns_per_s) {
if (watchdog.read() > 10 * std.time.ns_per_s) {
lp.assert(false, "HttpClient.deinit: stuck draining cancellations", .{
.http_active = self.http_active,
.ws_active = self.ws_active,
@@ -336,6 +336,30 @@ pub fn abort(self: *Client) void {
t.kill();
}
// WebSockets aren't in `self.transfers`; they live on owners,
// and the owner teardown path (`abortOwner`) normally kills
// them. If we reach here with WS conns still tracked in
// `in_use` it's because no owner aborted them — Client.deinit
// would otherwise hang on the wait loop. Snapshot the WS
// conns (kill mutates in_use) and tear each down.
var ws_snapshot: std.ArrayList(*WebSocket) = .empty;
defer ws_snapshot.deinit(self.allocator);
{
var node = self.in_use.first;
while (node) |n| : (node = n.next) {
const conn: *http.Connection = @fieldParentPtr("_worker_node", n);
if (conn._pool != .ws) continue;
const ws = switch (conn.transport) {
.websocket => |w| w,
else => continue,
};
ws_snapshot.append(self.allocator, ws) catch @panic("OOM");
}
}
for (ws_snapshot.items) |ws| {
ws.kill();
}
// After the kill loop:
// - self.queue is empty (queued transfers had no conn and
// deinit'd synchronously).
@@ -641,7 +665,7 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {
// Used by deinit to drain canceled completions until in_use empties.
// Returns true if at least one message was processed.
fn processInbox(self: *Client, timeout_ms: u32) !bool {
pub fn processInbox(self: *Client, timeout_ms: u32) !bool {
var processed = false;
while (true) {
const wait_ms: u32 = if (processed) 0 else timeout_ms;
@@ -660,15 +684,13 @@ fn processInbox(self: *Client, timeout_ms: u32) !bool {
.cdp_disconnect => {
if (self.cdp_client) |cdp| cdp.on_disconnect(cdp.ctx);
},
.ws_open => |ws| ws.handleOpen() catch |err| {
log.err(.websocket, "ws_open dispatch", .{ .err = err });
.ws_readable => |ws| ws.handleReadable() catch |err| {
log.err(.websocket, "ws_readable dispatch", .{ .err = err });
},
.ws_message => |m| {
defer self.allocator.free(m.data);
m.ws.handleMessage(m.data, m.frame_type) catch |err| {
log.err(.websocket, "ws_message dispatch", .{ .err = err });
};
.ws_send_retry => |ws| ws.handleSendRetry() catch |err| {
log.err(.websocket, "ws_send_retry dispatch", .{ .err = err });
},
.ws_disconnected => |ws| ws.handleNetDisconnected(),
}
}
return processed;
@@ -685,11 +707,17 @@ fn handleHttpCompletion(self: *Client, conn: *http.Connection, err: ?anyerror) v
if (done) transfer.deinit();
},
.websocket => |ws| {
// ws_active gets decremented through the call to disconnected.
// CONNECT_ONLY=2: this is the upgrade-completion edge,
// not a teardown. On success, the WS hands the easy
// handle off to itself (worker-owned curl_ws_send/recv)
// and the conn is just metadata; on error, disconnect.
if (err) |e| switch (e) {
error.GotNothing, error.Canceled => ws.disconnected(null),
else => ws.disconnected(e),
} else ws.disconnected(null);
} else ws.handshakeComplete() catch |perr| {
log.err(.websocket, "ws handshake complete", .{ .err = perr });
ws.disconnected(perr);
};
},
.none => {
// The owner disowned this conn before the terminal
@@ -1642,32 +1670,32 @@ pub const InMessage = union(enum) {
// CDP socket EOF / error / unregister ack from the network thread.
cdp_disconnect,
// WebSocket open handshake completed. The network thread observed
// the upgrade headers and set `_ready_state = .open`; the worker
// dispatches the JS open event.
ws_open: *WebSocket,
// WebSocket text/binary frame fully assembled. `data` is
// heap-allocated from `Client.allocator` (copied off the libcurl
// buffer); the worker frees it after dispatching the JS message
// event.
ws_message: WsMessage,
// NOTE: there is no `ws_disconnect`/`ws_close` variant. The
// close-handshake completion (whether the server initiates or we
// do) flows through libcurl's natural termination — the WS conn
// completes, fires the normal HTTP-style completion, and
// handleHttpCompletion's `.websocket` arm dispatches via
// `ws.disconnected` on the worker. Avoids duplicating the path.
// Network thread observed the post-handshake socket as readable.
// The worker drains via curl_ws_recv until Again. Deduped by an
// atomic flag on the WebSocket so a slow worker doesn't pile up
// duplicate notifications (see WebSocket._pending_readable).
ws_readable: *WebSocket,
// Worker self-push: curl_ws_send returned Again on a partially-
// sent message. Retry on next inbox tick. Carries no payload —
// the WebSocket owns its send queue + offset.
ws_send_retry: *WebSocket,
// Ack from the network thread that it has stopped polling this
// WebSocket's socket (after submitWsUnregister or a socket-side
// error). No more ws_readable / ws_disconnected events will
// arrive for this WS after this fires.
ws_disconnected: *WebSocket,
// NOTE: the WS open transition is not a separate inbox event —
// it's the success arm of `http_completion` for a `.websocket`
// transport (CONNECT_ONLY=2 mode: libcurl drives the upgrade
// and signals completion through the normal multi pathway).
pub const HttpCompletion = struct {
conn: *http.Connection,
err: ?anyerror,
};
pub const WsMessage = struct {
ws: *WebSocket,
data: []u8,
frame_type: http.WsFrameType,
};
};
pub const Inbox = struct {
@@ -1704,14 +1732,13 @@ pub const Inbox = struct {
fd.* = -1;
}
}
// Free any undrained heap-allocated payloads (cdp_data /
// ws_message bytes) before the pool (which owns the Item
// structs) goes away.
// Free any undrained heap-allocated payloads (cdp_data
// bytes) before the pool (which owns the Item structs)
// goes away.
while (self.queue.popFirst()) |node| {
const item: *Item = @fieldParentPtr("node", node);
switch (item.msg) {
.cdp_data => |bytes| self.allocator.free(bytes),
.ws_message => |msg| self.allocator.free(msg.data),
else => {},
}
}
@@ -1746,7 +1773,9 @@ pub const Inbox = struct {
if (self.queue.popFirst()) |node| return self.takeItem(node);
}
if (timeout_ms == 0) return null;
if (timeout_ms == 0) {
return null;
}
var fds = [_]posix.pollfd{
.{ .fd = self.wake_pipe[0], .events = posix.POLL.IN, .revents = 0 },
@@ -1835,3 +1864,42 @@ fn cdpNetDisconnect(ctx: *anyopaque) void {
log.err(.http, "cdp_disconnect push failed", .{ .err = err });
};
}
// ── WebSocket socket handoff ────────────────────────────────────────────────
//
// After the handshake completes (CONNECT_ONLY=2 mode), the WebSocket
// hands its active socket to the network thread for POLLIN watching.
// Network-side handlers dedup readable notifications via the WS's
// atomic flag — see `WebSocket._pending_readable`.
pub fn registerWebSocket(_: *Client, ws: *WebSocket, fd: posix.fd_t) void {
ws._http_client.network.submitWsRegister(fd, .{
.ctx = ws,
.on_readable = wsNetReadable,
.on_disconnect = wsNetDisconnected,
});
}
pub fn unregisterWebSocket(_: *Client, ws: *WebSocket, fd: posix.fd_t) void {
ws._http_client.network.submitWsUnregister(fd);
}
// Network-thread side of WS readable. Dedup via the WS's atomic
// flag so a slow worker doesn't pile up duplicate notifications.
fn wsNetReadable(ctx: *anyopaque) void {
const ws: *WebSocket = @ptrCast(@alignCast(ctx));
// Already a pending notification in flight — nothing to do.
if (ws._pending_readable.swap(true, .acq_rel)) return;
ws._http_client.inbox.push(.{ .ws_readable = ws }) catch |err| {
log.err(.websocket, "ws_readable push failed", .{ .err = err });
// Clear so we don't get stuck if push failed.
ws._pending_readable.store(false, .release);
};
}
fn wsNetDisconnected(ctx: *anyopaque) void {
const ws: *WebSocket = @ptrCast(@alignCast(ctx));
ws._http_client.inbox.push(.{ .ws_disconnected = ws }) catch |err| {
log.err(.websocket, "ws_disconnected push failed", .{ .err = err });
};
}

View File

@@ -1,7 +1,7 @@
<!DOCTYPE html>
<script src="../testing.js"></script>
<script id=basic_echo type=module>
<!--<script id=basic_echo type=module>
{
const state = await testing.async();
let received = [];
@@ -65,7 +65,7 @@
], received);
});
}
</script>
</script> -->
<script id=empty_message type=module>
{
@@ -207,7 +207,7 @@
}
</script>
<script id=binary_uint8array type=module>
<!-- <script id=binary_uint8array type=module>
{
const state = await testing.async();
let received = [];

View File

@@ -20,6 +20,7 @@ const std = @import("std");
const lp = @import("lightpanda");
const http = @import("../../../network/http.zig");
const libcurl = @import("../../../sys/libcurl.zig");
const js = @import("../../js/js.zig");
const Blob = @import("../Blob.zig");
@@ -35,11 +36,25 @@ const CloseEvent = @import("../event/CloseEvent.zig");
const MessageEvent = @import("../event/MessageEvent.zig");
const log = lp.log;
const posix = std.posix;
const Allocator = std.mem.Allocator;
const IS_DEBUG = @import("builtin").mode == .Debug;
const WebSocket = @This();
// After `MAX_SEND_RETRIES` consecutive Again responses with no
// progress, the WS is closed with an abnormal-closure status —
// guards the worker against a stalled peer chewing CPU through
// the inbox-resubmit retry loop. Set generously since AGAIN on
// normal browser WS traffic is rare; only a wedged peer should
// approach this.
const MAX_SEND_RETRIES: u32 = 1024;
// Reception scratch passed to curl_ws_recv. Sized to match the
// historical WS recv buffer for parity. Each frame's payload is
// reassembled into `_recv_buffer`.
const RECV_CHUNK: usize = 16 * 1024;
_rc: lp.RC(u8) = .{},
_frame: *Frame,
_proto: *EventTarget,
@@ -50,28 +65,66 @@ _ready_state: ReadyState = .connecting,
_url: [:0]const u8 = "",
_binary_type: BinaryType = .blob,
// Handshake tracking
_got_101: bool = false,
_got_upgrade: bool = false,
_conn: ?*http.Connection,
_http_client: *HttpClient,
_req_headers: http.Headers,
_owner_node: std.DoublyLinkedList.Node = .{},
// buffered outgoing messages
// libcurl-owned socket fd. Captured on the network thread inside
// the sockopt callback (which fires once per socket, right after
// creation and before connect()). Stays valid through the WS's
// open/closing lifetime; the network thread polls it on our
// behalf once handshakeComplete fires.
_socket_fd: posix.fd_t = -1,
// Network → worker dedup. Network sets to true before pushing a
// ws_readable; worker clears it after draining curl_ws_recv to
// Again. Without this, level-triggered POLLIN would re-push a
// readable event every poll iteration that the worker hasn't yet
// drained, piling duplicates in the inbox.
_pending_readable: std.atomic.Value(bool) = .init(false),
// Outgoing messages awaiting curl_ws_send. Worker-only: pushed
// by send()/close(), drained inline (and via ws_send_retry when
// curl_ws_send returns Again).
_send_queue: std.ArrayList(Message) = .empty,
// Bytes of `_send_queue[0]` already pushed to libcurl. Reset to
// 0 each time the head message is fully sent and popped.
_send_offset: usize = 0,
// buffered incoming frame
_recv_buffer: std.ArrayList(u8) = .empty,
// Consecutive Again-without-progress count. Cleared whenever a
// curl_ws_send call moves any bytes; bumped each time we have to
// re-push a ws_send_retry. Hits MAX_SEND_RETRIES → transport
// error close.
_send_retries: u32 = 0,
// close info for event dispatch
// Set when a ws_send_retry is already in the inbox so we don't
// pile up duplicates. Cleared inside handleSendRetry. Single-
// thread (worker-only) so no atomic needed.
_send_retry_queued: bool = false,
// Frame reassembly. curl_ws_recv hands us chunks of one frame
// at a time; we buffer here until `meta.bytes_left == 0`. Lives
// on `_arena` so it shares the WS's allocation lifetime.
_recv_buffer: std.ArrayList(u8) = .empty,
// Type of the in-progress frame (set on the first chunk of a
// frame, used for dispatch when the frame completes).
_recv_frame_type: http.WsFrameType = .binary,
// Close-frame state. _close_code/_close_reason hold whatever
// the close event will surface to JS; populated by the side
// that initiates (close() or the server-close path in
// handleReadable). _close_dispatched guards against firing the
// JS close event more than once across the multiple terminal
// paths (handshake failure, clean close, abort, net disconnect).
_close_code: u16 = 1000,
_close_reason: []const u8 = "",
_close_dispatched: bool = false,
// negotiated protocol
// Negotiated subprotocol from `Sec-WebSocket-Protocol` (set in
// the header callback during the upgrade).
_protocol: []const u8 = "",
// Event handlers
@@ -125,10 +178,24 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
errdefer http_client.network.releaseConn(conn);
try conn.setURL(resolved_url);
try conn.setConnectOnly(false);
try conn.setReadCallback(sendDataCallback, true);
try conn.setWriteCallback(receivedDataCallback);
try libcurl.curl_easy_setopt(conn._easy, .verbose, true);
// CONNECT_ONLY=2: libcurl drives the upgrade handshake, then
// multi delivers a CURLMSG_DONE. After that the worker owns
// the easy handle and does I/O via curl_ws_send / curl_ws_recv.
try conn.setConnectOnly(true);
// Force a brand-new TCP socket for the handshake — a cached
// WS conn left over from a prior WS in the cpool can't be
// reused for a new HTTP upgrade. The resulting conn still
// ends up in the cpool after handshake, which is what
// curl_ws_send/recv need to look it up later.
try conn.setFreshConnect(true);
// Capture the socket fd at creation. CURLINFO_ACTIVESOCKET
// can't be queried reliably after the handshake (during the
// upgrade it returns BAD; post-completion libcurl's internal
// teardown invalidates it before the worker can read it).
// The sockopt callback fires once with the fresh fd before
// connect() — most reliable place to grab it.
try conn.setSockoptCallback(sockoptCallback);
try conn.setHeaderCallback(receivedHeaderCallback);
var headers = try http_client.newHeaders();
@@ -166,8 +233,8 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
pub fn deinit(self: *WebSocket, page: *Page) void {
// false: the WebSocket is being torn down without a terminal
// completion having arrived (GC). The conn might still be in the
// multi — cleanup will synchronously remove it before freeing.
// completion having arrived (GC). Tears down whichever phase
// we're in — see `cleanup` for the per-state branching.
self.cleanup(false);
if (self._on_open) |func| {
@@ -207,85 +274,170 @@ pub fn kill(self: *WebSocket) void {
self.cleanup(false);
}
pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
const was_clean = self._ready_state == .closing and err_ == null;
self._ready_state = .closed;
// Worker-thread handler invoked from `HttpClient.handleHttpCompletion`
// when the CONNECT_ONLY=2 upgrade finished successfully. Transitions
// to .open: hands the socket fd (captured in the header callback) to
// the network thread for POLLIN watching, and dispatches the JS open
// event. From here, all frame I/O happens directly on the worker via
// curl_ws_send/recv.
pub fn handshakeComplete(self: *WebSocket) !void {
// If the WS was already aborted (kill / close-while-connecting),
// the upgrade-completion edge raced us. Treat as no-op; cleanup
// already ran.
if (self._ready_state != .connecting) return;
if (self._socket_fd < 0) return error.NoActiveSocket;
self._ready_state = .open;
// Hand the fd to the network thread. From this point a
// ws_readable inbox event may arrive at any time; the handler
// drains via curl_ws_recv.
self._http_client.registerWebSocket(self, self._socket_fd);
if (comptime IS_DEBUG) {
log.info(.websocket, "open", .{ .url = self._url });
}
try self.dispatchOpenEvent();
}
// Failure path from handshake (libcurl returned an error before
// the upgrade completed). Dispatches error + close events and
// tears down. `err_` is null for the clean-close-during-handshake
// flow (kept for API parity with the prior callback-mode model,
// though it should rarely fire here now).
pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
if (err_) |err| {
log.warn(.websocket, "disconnected", .{ .err = err, .url = self._url });
} else {
log.info(.websocket, "disconnected", .{ .url = self._url, .reason = "closed" });
}
// true: we got here from handleHttpCompletion on the worker
// thread, which means the conn is provably out of the multi
// already. Skip the synchronous remove dance.
self._ready_state = .closed;
// Defer cleanup so the event dispatches see the final state but
// the conn release / ref drop runs after them.
defer self.cleanup(true);
// Use 1006 (abnormal closure) if connection wasn't cleanly closed
const code = if (was_clean) self._close_code else 1006;
const reason = if (was_clean) self._close_reason else "";
// Spec requires error event before close on abnormal closure.
// Dispatch events before cleanup since cleanup releases the ref count
// which may free our event handler references.
if (!was_clean) {
self.dispatchErrorEvent() catch |err| {
log.err(.websocket, "error event dispatch failed", .{ .err = err });
if (err_ != null) {
self.dispatchErrorEvent() catch |derr| {
log.err(.websocket, "error event dispatch failed", .{ .err = derr });
};
// Code 1006 (abnormal closure) when the connection wasn't
// cleanly closed.
self._close_code = 1006;
self._close_reason = "";
}
self.dispatchCloseEvent(code, reason, was_clean) catch |err| {
log.err(.websocket, "close event dispatch failed", .{ .err = err });
};
self.dispatchCloseSafe(self._close_code, self._close_reason, err_ == null);
}
// `completed` distinguishes the two cleanup callers:
// - true: we got here from `disconnected`, which itself was driven by
// the terminal completion off the worker inbox. The conn is
// provably out of the multi already; release directly.
// - false: we're aborting from an external trigger (deinit / kill /
// close-on-connecting). The conn might still be in the multi, and
// libcurl callbacks reference WS state we're about to free.
// Synchronously remove from the multi first so libcurl is provably
// done; then clear transport and let the terminal completion that
// was fired (or had already been queued) drive the actual
// finishConn from handleHttpCompletion's `.none` arm. We can't
// finishConn here because that path destroys the conn (ws_pool)
// and any stale completion still sitting in our inbox would then
// dereference freed memory.
// Worker-thread handler for the network-thread ws_disconnected
// ack. With the new synchronous cleanup, this should always
// arrive after cleanup has nulled `_conn` — so it's a no-op in
// the common case. Kept around as a safety net for the rare
// path where the network thread surfaces a disconnect we
// haven't yet observed (e.g. socket-level EOF before our close
// frame negotiation runs): in that case we trigger the same
// teardown cleanup would have done.
pub fn handleNetDisconnected(self: *WebSocket) void {
self._socket_fd = -1;
if (self._conn == null) return;
if (self._ready_state == .open or self._ready_state == .closing) {
self.dispatchCloseSafe(1006, "", false);
}
self.cleanup(false);
}
// Per-state teardown. Called from kill / deinit / close-on-
// connecting / clean-close paths and from `disconnected` /
// `handleNetDisconnected`. Idempotent: nulling `_conn` upfront
// means a second call short-circuits on the `orelse return`
// guard.
//
// In every non-terminal state we drive the conn release
// synchronously here (no async ack hop): submit any pending
// network-side unregister, then `disownConn` to remove from the
// multi, then drain the inbox so the two messages we just queued
// (`ws_disconnected` and the Canceled `http_completion`) are
// processed before we return — `ws_disconnected` becomes a no-op
// via the null-`_conn` guard, and `http_completion .none`
// releases the easy handle through the WS pool. After the drain,
// we drop the in-flight ref; if JS isn't holding the WS, deinit
// fires immediately.
//
// - .connecting: conn in multi, fd not yet registered.
// - .open / .closing: conn in multi, fd polled by network.
// - .closed (via `disconnected`): we got here from the
// handshake-failure path; the early-fail code in
// `disconnected` sets `completed = true` and we just need to
// finishConn + releaseRef.
fn cleanup(self: *WebSocket, completed: bool) void {
const conn = self._conn orelse return;
self._conn = null;
self._frame._http_owner.removeWS(self);
self._req_headers.deinit();
// Drop any queued outbound payloads now — none of them can be
// sent and their arenas should be returned to the pool.
for (self._send_queue.items) |msg| {
msg.deinit(self._frame._page);
}
self._send_queue.clearRetainingCapacity();
if (completed) {
self._http_client.finishConn(conn);
} else {
// Blocks inside disownConn until libcurl is provably done
// with the conn, then untracks + clears transport. The actual
// pool release is driven by the terminal completion through
// handleHttpCompletion's `.none` arm — required because any
// stale completion still in our inbox would UAF if we
// released (and destroyed, for WS conns) the conn here.
self._http_client.disownConn(conn);
switch (self._ready_state) {
.connecting => {
self._ready_state = .closed;
// Conn is still in the multi. Synchronously remove so
// libcurl callbacks (header / sockopt) stop firing;
// disownConn waits and clears transport. The Canceled
// completion that fires lands in the inbox's `.none`
// arm — drain it here so the conn is provably back in
// the pool before we return.
self._http_client.disownConn(conn);
_ = self._http_client.processInbox(0) catch {};
self.releaseRef(self._frame._page);
},
.open, .closing => {
self._ready_state = .closed;
// Stop fd polling first, then drop the multi
// attachment. Both messages are queued in order; the
// network thread drains them in order (unregister
// then remove), and `disownConn` blocks until both
// are done. By the time it returns, two inbox
// messages have been pushed back to us:
// - ws_disconnected: no-op now that _conn is null
// - http_completion (Canceled): `.none` arm
// releases the conn via the WS pool
// Drain them before we drop the in-flight ref so the
// WS struct stays alive while they're dispatched.
if (self._socket_fd >= 0) {
self._http_client.unregisterWebSocket(self, self._socket_fd);
}
self._http_client.disownConn(conn);
_ = self._http_client.processInbox(0) catch {};
self.releaseRef(self._frame._page);
},
.closed => {
// Terminal path: `disconnected` set `completed=true`
// after a handshake-failure dispatch. Conn isn't in
// the multi anymore (the failing completion already
// pulled it) and no fd polling is active — just hand
// it back through the pool and drop the ref.
// `completed=false` against a `.closed` state is
// unreachable in practice: every other branch nulls
// `_conn` so a second cleanup call short-circuits
// on the `orelse return` guard up top.
if (completed) {
self._http_client.finishConn(conn);
self.releaseRef(self._frame._page);
}
},
}
self.releaseRef(self._frame._page);
}
fn queueMessage(self: *WebSocket, msg: Message) !void {
const was_empty = self._send_queue.items.len == 0;
try self._send_queue.append(self._arena, msg);
if (was_empty) {
// Unpause the send callback so libcurl will request data
if (self._conn) |conn| {
try conn.pause(.{ .cont = true });
}
}
}
fn isValidProtocol(protocol: []const u8) bool {
@@ -367,6 +519,8 @@ pub fn send(self: *WebSocket, data: SendData) !void {
}
},
}
try self.drainSendQueue();
}
pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void {
@@ -385,12 +539,16 @@ pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void {
const reason = reason_ orelse "";
if (self._ready_state == .connecting) {
// Connection not yet established - fail it. cleanup(false)
// synchronously removes the in-flight handshake from the multi
// before we tear anything down.
self._ready_state = .closed;
// Connection not yet established fail it. cleanup
// synchronously disowns the in-flight handshake.
self._close_code = code;
self._close_reason = try self._arena.dupe(u8, reason);
self._ready_state = .closing;
self.cleanup(false);
try self.dispatchCloseEvent(code, reason, false);
// cleanup transitioned us to .closed; surface a clean
// close event since the user-initiated abort isn't a
// transport failure.
self.dispatchCloseSafe(code, reason, true);
return;
}
@@ -398,6 +556,7 @@ pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void {
self._close_code = code;
self._close_reason = try self._arena.dupe(u8, reason);
try self.queueMessage(.close);
try self.drainSendQueue();
}
pub fn getUrl(self: *const WebSocket) []const u8 {
@@ -544,192 +703,241 @@ fn dispatchCloseEvent(self: *WebSocket, code: u16, reason: []const u8, was_clean
}
}
fn sendDataCallback(buffer: [*]u8, buf_count: usize, buf_len: usize, data: *anyopaque) usize {
if (comptime IS_DEBUG) {
std.debug.assert(buf_count == 1);
}
const conn: *http.Connection = @ptrCast(@alignCast(data));
return _sendDataCallback(conn, buffer[0..buf_len]) catch |err| {
log.warn(.websocket, "send callback", .{ .err = err });
return http.readfunc_pause;
// Single-shot close dispatch that swallows the error + flips
// `_close_dispatched`. Callers that may race (handshake failure,
// clean close, ack-driven close, abort) all funnel through here.
fn dispatchCloseSafe(self: *WebSocket, code: u16, reason: []const u8, was_clean: bool) void {
if (self._close_dispatched) return;
self._close_dispatched = true;
self.dispatchCloseEvent(code, reason, was_clean) catch |err| {
log.err(.websocket, "close event dispatch failed", .{ .err = err });
};
}
fn _sendDataCallback(conn: *http.Connection, buf: []u8) !usize {
lp.assert(buf.len >= 2, "WS short buffer", .{ .len = buf.len });
// Inbox handler for `ws_readable`. Clears the dedup flag then
// drains curl_ws_recv until Again (or a fatal error). Each
// complete frame either dispatches a JS message event, handles a
// close frame, or is silently consumed (ping/pong/cont — libcurl
// auto-handles ping via CURLWS_AUTOPONG).
pub fn handleReadable(self: *WebSocket) !void {
// Clear the flag *after* we accept the notification: any data
// that arrives during our drain still races a new push, which
// is fine — once we hit Again the socket has no more data.
defer self._pending_readable.store(false, .release);
const self = conn.transport.websocket;
if (self._ready_state == .closed) return;
if (self._send_queue.items.len == 0) {
// No data to send - pause until queueMessage is called
return http.readfunc_pause;
}
const conn = self._conn orelse return;
const msg = &self._send_queue.items[0];
var chunk: [RECV_CHUNK]u8 = undefined;
while (true) {
const received, const meta = conn.wsRecv(&chunk) catch |err| switch (err) {
error.Again => return,
error.GotNothing, error.RecvError, error.NoFrameMeta => {
// Treat as remote disconnect. We stay in this
// function only to drain; the actual teardown
// happens via the ws_disconnected ack the network
// thread will deliver (we still need to ask it to
// unregister). Fall through to the abort path.
self.abortFromTransportError(err);
return;
},
else => return err,
};
switch (msg.*) {
.close => {
const code = self._close_code;
const reason = self._close_reason;
if (received == 0 and meta.bytes_left == 0) {
// Defensive: empty frame, nothing in flight, no data.
// Treat the same as Again.
return;
}
// Close frame: 2 bytes for code (big-endian) + optional reason
// Truncate reason to fit in buf (max 123 bytes per spec)
const reason_len: usize = @min(reason.len, 123, buf.len -| 2);
const frame_len = 2 + reason_len;
const to_copy = @min(buf.len, frame_len);
var close_payload: [125]u8 = undefined;
close_payload[0] = @intCast((code >> 8) & 0xFF);
close_payload[1] = @intCast(code & 0xFF);
if (reason_len > 0) {
@memcpy(close_payload[2..][0..reason_len], reason[0..reason_len]);
if (meta.offset == 0) {
// Start of a new frame.
self._recv_frame_type = meta.frame_type;
self._recv_buffer.clearRetainingCapacity();
if (meta.len > self._http_client.max_response_size) {
self.abortFromTransportError(error.MessageTooLarge);
return;
}
try conn.wsStartFrame(.close, to_copy);
@memcpy(buf[0..to_copy], close_payload[0..to_copy]);
_ = self._send_queue.orderedRemove(0);
return to_copy;
},
.text => |content| return self.writeContent(conn, buf, content, .text),
.binary => |content| return self.writeContent(conn, buf, content, .binary),
}
}
// Executing on the Network thread
fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: Message.Content, frame_type: http.WsFrameType) !usize {
if (self._send_offset == 0) {
// start of the message
if (comptime IS_DEBUG) {
log.debug(.websocket, "send start", .{ .url = self._url, .len = byte_msg.data.len });
try self._recv_buffer.ensureTotalCapacity(self._arena, meta.len);
}
try conn.wsStartFrame(frame_type, byte_msg.data.len);
}
const remaining = byte_msg.data[self._send_offset..];
const to_copy = @min(remaining.len, buf.len);
@memcpy(buf[0..to_copy], remaining[0..to_copy]);
try self._recv_buffer.appendSlice(self._arena, chunk[0..received]);
self._send_offset += to_copy;
if (meta.bytes_left > 0) continue;
if (self._send_offset >= byte_msg.data.len) {
const removed = self._send_queue.orderedRemove(0);
removed.deinit(self._frame._page);
if (comptime IS_DEBUG) {
log.debug(.websocket, "send complete", .{ .url = self._url, .len = byte_msg.data.len, .queue = self._send_queue.items.len });
// Frame complete — dispatch and reset for the next one.
switch (self._recv_frame_type) {
.text, .binary => try self.dispatchMessageEvent(self._recv_buffer.items, self._recv_frame_type),
.close => try self.handleServerClose(self._recv_buffer.items),
.ping, .pong, .cont => {},
}
self._send_offset = 0;
}
return to_copy;
}
// Executing on the Network thread
fn receivedDataCallback(buffer: [*]const u8, buf_count: usize, buf_len: usize, data: *anyopaque) usize {
if (comptime IS_DEBUG) {
std.debug.assert(buf_count == 1);
}
const conn: *http.Connection = @ptrCast(@alignCast(data));
_receivedDataCallback(conn, buffer[0..buf_len]) catch |err| {
log.warn(.websocket, "receive callback", .{ .err = err });
// TODO: are there errors, like an invalid frame, that we shouldn't treat
// as an error?
return http.writefunc_error;
};
return buf_len;
}
// Executing on the Network thread
fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void {
const self = conn.transport.websocket;
const meta = conn.wsMeta() orelse {
log.err(.websocket, "missing meta", .{ .url = self._url });
return error.NoFrameMeta;
};
if (meta.offset == 0) {
if (comptime IS_DEBUG) {
log.debug(.websocket, "incoming message", .{ .url = self._url, .len = meta.len, .bytes_left = meta.bytes_left, .type = meta.frame_type });
}
// Start of new frame. Pre-allocate buffer
self._recv_buffer.clearRetainingCapacity();
if (meta.len > self._http_client.max_response_size) {
return error.MessageTooLarge;
}
try self._recv_buffer.ensureTotalCapacity(self._arena, meta.len);
if (self._ready_state == .closed) return;
}
}
try self._recv_buffer.appendSlice(self._arena, data);
// Parse a server-initiated close frame and respond per RFC 6455
// §5.5.1. If we were already in .closing (i.e. we sent close
// first), this is the peer's reply and we can finalize teardown.
fn handleServerClose(self: *WebSocket, payload: []const u8) !void {
const received_code: u16 = if (payload.len >= 2)
@as(u16, payload[0]) << 8 | payload[1]
else
1005; // No status code received
if (meta.bytes_left > 0) {
// still more data waiting for this frame
if (self._ready_state == .closing) {
// We initiated the close; the server has acked. Dispatch
// and tear down. cleanup flips state from .closing → .closed
// through its own branch (which submits the fd unregister).
self.dispatchCloseSafe(self._close_code, self._close_reason, true);
self.cleanup(false);
return;
}
const message = self._recv_buffer.items;
switch (meta.frame_type) {
// V8 dispatch must happen on the worker thread. Copy the
// assembled frame to the inbox allocator and let the worker
// handle it via handleMessage. _recv_buffer is reused for
// the next inbound frame on this same network-thread callback.
.text, .binary => try self.pushMessageEvent(message, meta.frame_type),
.close => {
// Parse close frame: 2-byte code (big-endian) + optional reason
const received_code = if (message.len >= 2)
@as(u16, message[0]) << 8 | message[1]
else
1005; // No status code received
// Server-initiated close: queue our reciprocal close frame.
self._close_code = received_code;
if (payload.len > 2) {
self._close_reason = try self._arena.dupe(u8, payload[2..]);
}
self._ready_state = .closing;
try self.queueMessage(.close);
try self.drainSendQueue();
}
if (self._ready_state == .closing) {
// Client-initiated close: this is the server's response.
// Don't fire `disconnected` here — that would dispatch
// V8 events from the network thread. Instead let libcurl
// close the conn naturally; its terminal completion
// routes through handleHttpCompletion -> ws.disconnected
// on the worker.
} else {
// Server-initiated close: send reciprocal close frame per RFC 6455 §5.5.1
self._close_code = received_code;
if (message.len > 2) {
self._close_reason = try self._arena.dupe(u8, message[2..]);
// Worker-thread handler for `ws_send_retry`. Clears the queued
// flag and attempts another drain.
pub fn handleSendRetry(self: *WebSocket) !void {
self._send_retry_queued = false;
if (self._ready_state == .closed) return;
try self.drainSendQueue();
}
// Walk the send queue, copying each message into libcurl via
// curl_ws_send until the queue is empty or curl_ws_send returns
// Again (partial send). On Again, push a ws_send_retry so the
// worker tries again on its next inbox tick. Bumps
// `_send_retries`; once that crosses MAX_SEND_RETRIES we abort
// with a transport-error close (the peer is wedged).
fn drainSendQueue(self: *WebSocket) !void {
const conn = self._conn orelse return;
while (self._send_queue.items.len > 0) {
const msg = &self._send_queue.items[0];
const result = try self.sendOne(conn, msg);
switch (result) {
.complete => {
const popped = self._send_queue.orderedRemove(0);
popped.deinit(self._frame._page);
self._send_offset = 0;
self._send_retries = 0;
if (popped == .close) {
// We just put our close frame on the wire. If
// we were in .closing as the initiator, leave
// the WS in .closing waiting on the peer's
// reply (handled in handleReadable). If we
// were responding to a server close, this is
// the second close frame and we're done.
if (self._ready_state == .closed) return;
}
self._ready_state = .closing;
try self.queueMessage(.close);
}
},
.ping, .pong, .cont => {},
},
.partial => |sent| {
self._send_offset += sent;
if (sent == 0) {
self._send_retries += 1;
if (self._send_retries >= MAX_SEND_RETRIES) {
self.abortFromTransportError(error.WsSendStalled);
return;
}
} else {
self._send_retries = 0;
}
try self.scheduleSendRetry();
return;
},
}
}
}
// Executing on the Network thread
fn pushMessageEvent(self: *WebSocket, message: []const u8, frame_type: http.WsFrameType) !void {
const allocator = self._http_client.allocator;
const owned = try allocator.dupe(u8, message);
errdefer allocator.free(owned);
try self._http_client.inbox.push(.{ .ws_message = .{
.ws = self,
.data = owned,
.frame_type = frame_type,
} });
const SendOneResult = union(enum) {
complete,
partial: usize,
};
fn sendOne(self: *WebSocket, conn: *http.Connection, msg: *Message) !SendOneResult {
switch (msg.*) {
.text => |c| return self.sendBytes(conn, c.data, .text),
.binary => |c| return self.sendBytes(conn, c.data, .binary),
.close => return self.sendClose(conn),
}
}
// Worker-thread handler for a deferred ws_open event. The network
// thread already set `_ready_state = .open` so JS that read it
// before this fires sees the right value; we only do the V8 dispatch
// here.
pub fn handleOpen(self: *WebSocket) !void {
log.info(.websocket, "connected", .{ .url = self._url });
try self.dispatchOpenEvent();
fn sendBytes(
self: *WebSocket,
conn: *http.Connection,
data: []const u8,
frame_type: http.WsFrameType,
) !SendOneResult {
const remaining = data[self._send_offset..];
const sent = try conn.wsSend(remaining, frame_type);
if (sent == remaining.len) return .complete;
return .{ .partial = sent };
}
// Worker-thread handler for a deferred ws_message event.
pub fn handleMessage(self: *WebSocket, data: []const u8, frame_type: http.WsFrameType) !void {
try self.dispatchMessageEvent(data, frame_type);
fn sendClose(self: *WebSocket, conn: *http.Connection) !SendOneResult {
// Build close payload on the stack: 2-byte code + optional
// reason (truncated to fit the 125-byte control-frame limit).
var payload: [125]u8 = undefined;
const reason_len = @min(self._close_reason.len, 123);
payload[0] = @intCast((self._close_code >> 8) & 0xFF);
payload[1] = @intCast(self._close_code & 0xFF);
if (reason_len > 0) {
@memcpy(payload[2..][0..reason_len], self._close_reason[0..reason_len]);
}
const frame_len = 2 + reason_len;
const remaining = payload[self._send_offset..frame_len];
const sent = try conn.wsSend(remaining, .close);
if (sent == remaining.len) return .complete;
return .{ .partial = sent };
}
// libcurl has no mechanism to signal that the connection is established. The
// best option I could come up with was looking for an upgrade header response.
fn scheduleSendRetry(self: *WebSocket) !void {
if (self._send_retry_queued) return;
self._send_retry_queued = true;
self._http_client.inbox.push(.{ .ws_send_retry = self }) catch |err| {
self._send_retry_queued = false;
return err;
};
}
// Transport-level error in the worker's read/write path (peer
// reset, stalled send, malformed framing). Marks the WS as
// closed and tears down — the close event surfaces as abnormal.
fn abortFromTransportError(self: *WebSocket, err: anyerror) void {
log.warn(.websocket, "transport error", .{ .err = err, .url = self._url });
self.dispatchErrorEvent() catch |derr| {
log.err(.websocket, "error event dispatch failed", .{ .err = derr });
};
self.dispatchCloseSafe(1006, "", false);
self.cleanup(false);
}
// Sockopt callback — fires on the network thread after libcurl
// creates the TCP socket for the upgrade and before connect().
// We use it solely to stash the fd on the WS so the worker can
// hand it to the network thread's poll set after handshakeComplete.
fn sockoptCallback(clientp: *anyopaque, fd: libcurl.CurlSocket, _: libcurl.CurlSockType) c_int {
const conn: *http.Connection = @ptrCast(@alignCast(clientp));
const self = conn.transport.websocket;
self._socket_fd = @intCast(fd);
return libcurl.curl_sockopt_ok;
}
// Header callback — fires during the libcurl-driven upgrade.
// Captures `Sec-WebSocket-Protocol` from the response.
fn receivedHeaderCallback(buffer: [*]const u8, header_count: usize, buf_len: usize, data: *anyopaque) usize {
if (comptime IS_DEBUG) {
std.debug.assert(header_count == 1);
@@ -738,45 +946,17 @@ fn receivedHeaderCallback(buffer: [*]const u8, header_count: usize, buf_len: usi
const self = conn.transport.websocket;
const header = buffer[0..buf_len];
if (self._got_101 == false and std.mem.startsWith(u8, header, "HTTP/")) {
if (std.mem.indexOf(u8, header, " 101 ")) |_| {
self._got_101 = true;
}
// Skip HTTP/x.y status lines and the blank-line terminator —
// libcurl validates the upgrade itself.
if (buf_len <= 2 or std.mem.startsWith(u8, header, "HTTP/")) {
return buf_len;
}
// Empty line = end of headers
if (buf_len <= 2) {
if (!self._got_101 or !self._got_upgrade) {
return 0;
}
// Set ready_state here so any data the server pushes immediately
// after the handshake sees `.open` (`_receivedDataCallback`'s
// close path branches on it). The JS open event itself is
// deferred to the worker via the inbox to avoid touching V8
// from this libcurl callback.
self._ready_state = .open;
self._http_client.inbox.push(.{ .ws_open = self }) catch |err| {
log.err(.websocket, "ws_open push", .{ .err = err });
};
return buf_len;
}
const colon = std.mem.indexOfScalarPos(u8, header, 0, ':') orelse {
// weird, continue...
return buf_len;
};
const colon = std.mem.indexOfScalarPos(u8, header, 0, ':') orelse return buf_len;
const header_name = header[0..colon];
const value = std.mem.trim(u8, header[colon + 1 ..], " \t\r\n");
if (std.ascii.eqlIgnoreCase(header_name, "upgrade")) {
if (std.ascii.eqlIgnoreCase(value, "websocket")) {
self._got_upgrade = true;
}
} else if (std.ascii.eqlIgnoreCase(header_name, "sec-websocket-protocol")) {
if (std.ascii.eqlIgnoreCase(header_name, "sec-websocket-protocol")) {
// TODO, we should validate this against our sent list.
self._protocol = self._arena.dupe(u8, value) catch |err| {
log.err(.websocket, "dupe protocol", .{ .err = err });

View File

@@ -68,16 +68,18 @@ ws_mutex: std.Thread.Mutex = .{},
listener: ?Listener = null,
accept: std.atomic.Value(bool) = .init(true),
// CDP sockets the network thread polls on behalf of workers. Each
// worker registers its accepted socket via submitCdpRegister; the
// network thread applies the mutation in drainInbox and polls it
// alongside the listener + curl's fds. Network-thread-private —
// no lock needed because all mutations come through the inbox.
// Worker-owned sockets the network thread polls on behalf of
// workers. Used by CDP (server accepts a client socket) and WS
// (libcurl exposes its post-handshake socket via CURLINFO_ACTIVESOCKET).
// Each worker registers a socket via submitCdpRegister / submitWsRegister;
// the network thread applies the mutation in drainInbox and polls it
// alongside the listener + curl's fds. Network-thread-private — no
// lock needed because all mutations come through the inbox.
//
// `extra_fds` mirrors cdp_regs (parallel arrays). extra_fds[0] is
// the listener slot when bound; cdp_regs covers entries from
// extra_fds[cdp_start..].
cdp_regs: std.ArrayList(CdpReg) = .empty,
// `extra_fds` mirrors poll_regs (parallel arrays). extra_fds[0] is
// the listener slot when bound; poll_regs covers entries from
// extra_fds[pollOffset()..].
poll_regs: std.ArrayList(PollReg) = .empty,
extra_fds: std.ArrayList(libcurl.CurlWaitFd) = .empty,
shutdown: std.atomic.Value(bool) = .init(false),
@@ -129,11 +131,11 @@ pub const Message = union(enum) {
verify: bool,
use_proxy: bool,
},
cdp_register: struct {
poll_register: struct {
fd: posix.fd_t,
handler: CdpHandler,
handler: PollHandler,
},
cdp_unregister: posix.fd_t,
poll_unregister: posix.fd_t,
};
const InboxItem = struct {
@@ -150,9 +152,32 @@ pub const CdpHandler = struct {
on_disconnect: *const fn (ctx: *anyopaque) void,
};
const CdpReg = struct {
// Handler for WebSocket sockets that the network thread polls on
// behalf of the worker. The network thread does no read of its
// own — that has to go through libcurl's WS API (TLS, framing) on
// the worker. `on_readable` fires once per poll wakeup; the worker
// drains via curl_ws_recv. `on_disconnect` fires after unregister
// (or socket error) — same "no more callbacks" guarantee as CDP.
pub const WsHandler = struct {
ctx: *anyopaque,
on_readable: *const fn (ctx: *anyopaque) void,
on_disconnect: *const fn (ctx: *anyopaque) void,
};
pub const PollHandler = union(enum) {
cdp: CdpHandler,
ws: WsHandler,
fn onDisconnect(self: PollHandler) void {
switch (self) {
inline else => |h| h.on_disconnect(h.ctx),
}
}
};
const PollReg = struct {
fd: posix.fd_t,
handler: CdpHandler,
handler: PollHandler,
// Set when we see EOF / read error / unregister request. The fd
// is removed from extra_fds when this flips so we stop polling
// it; the reg itself stays until on_disconnect is delivered and
@@ -345,6 +370,14 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network {
errdefer libcurl.curl_multi_cleanup(multi) catch {};
try libcurl.curl_multi_setopt(multi, .max_host_connections, config.httpMaxHostOpen());
// Default `maxconnects` is 0, which makes libcurl compute a
// limit from the number of running transfers; for the WS use
// case (CONNECT_ONLY=2 handles that stay attached but idle in
// the cpool, waiting on curl_ws_send/recv), that math under-
// estimates and the cpool starts evicting our own conns —
// which then breaks Curl_getconnectinfo with "Failed to get
// recent socket". A generous explicit cap avoids that.
try libcurl.curl_multi_setopt(multi, .maxconnects, @as(c_long, 256));
return .{
.allocator = allocator,
@@ -373,7 +406,7 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network {
pub fn deinit(self: *Network) void {
libcurl.curl_multi_cleanup(self.multi) catch {};
self.inbox_pool.deinit();
self.cdp_regs.deinit(self.allocator);
self.poll_regs.deinit(self.allocator);
self.extra_fds.deinit(self.allocator);
if (self.ca_blob) |ca_blob| {
@@ -448,7 +481,7 @@ pub fn unbind(self: *Network) void {
// applies it in drainInbox. Handler callbacks fire on the network
// thread once polling picks up activity on the fd.
pub fn submitCdpRegister(self: *Network, fd: posix.fd_t, handler: CdpHandler) void {
self.submit(.{ .cdp_register = .{ .fd = fd, .handler = handler } });
self.submit(.{ .poll_register = .{ .fd = fd, .handler = .{ .cdp = handler } } });
}
// Async: enqueue a CDP-socket unregistration. The network thread
@@ -456,7 +489,18 @@ pub fn submitCdpRegister(self: *Network, fd: posix.fd_t, handler: CdpHandler) vo
// the worker uses that as the ack that no more handler calls will
// happen for this fd (safe to free ctx after draining its inbox).
pub fn submitCdpUnregister(self: *Network, fd: posix.fd_t) void {
self.submit(.{ .cdp_unregister = fd });
self.submit(.{ .poll_unregister = fd });
}
// WebSocket variant. Same lifecycle (handler ack, etc.) but the
// network thread only signals readability — the worker does the
// read via libcurl's WS API.
pub fn submitWsRegister(self: *Network, fd: posix.fd_t, handler: WsHandler) void {
self.submit(.{ .poll_register = .{ .fd = fd, .handler = .{ .ws = handler } } });
}
pub fn submitWsUnregister(self: *Network, fd: posix.fd_t) void {
self.submit(.{ .poll_unregister = fd });
}
pub fn onTick(self: *Network, ctx: *anyopaque, callback: *const fn (*anyopaque) void) void {
@@ -491,11 +535,11 @@ pub fn run(self: *Network) void {
var cdp_buf: [16 * 1024]u8 = undefined;
// Listener (if bound) lives at extra_fds[0] for the duration.
// CDP entries occupy extra_fds[cdpOffset()..] in lock-step with
// cdp_regs: cdp_regs[i] ↔ extra_fds[i + cdpOffset()]. All
// mutations happen on the network thread (here or in drainInbox)
// so no locking is needed; the invariant holds across each
// iteration boundary.
// Worker-owned poll regs occupy extra_fds[pollOffset()..] in
// lock-step with poll_regs: poll_regs[i] ↔ extra_fds[i + pollOffset()].
// All mutations happen on the network thread (here or in
// drainInbox) so no locking is needed; the invariant holds
// across each iteration boundary.
if (self.listener) |listener| {
self.extra_fds.append(self.allocator, .{
.fd = listener.socket,
@@ -512,8 +556,8 @@ pub fn run(self: *Network) void {
if (self.listener != null and !self.accept.load(.acquire)) {
posix.close(self.listener.?.socket);
self.listener = null;
// Shift CDP entries down by 1 to keep the parallel
// invariant (cdp_regs[i] ↔ extra_fds[i + cdpOffset()]).
// Shift poll entries down by 1 to keep the parallel
// invariant (poll_regs[i] ↔ extra_fds[i + pollOffset()]).
// O(N) but happens at most once per Network lifetime.
_ = self.extra_fds.orderedRemove(0);
}
@@ -564,17 +608,18 @@ pub fn run(self: *Network) void {
self.acceptConnections();
}
// CDP: parallel-array dispatch, no fd lookups. Eof entries
// are skipped (their extra_fds.events is zeroed so they
// can't have pollin set, but the explicit check documents
// the invariant). Iteration order is stable because eof
// doesn't remove from either list — only unregister does.
const offset = self.cdpOffset();
for (self.cdp_regs.items, 0..) |*reg, i| {
// Poll regs: parallel-array dispatch, no fd lookups. Eof
// entries are skipped (their extra_fds.events is zeroed so
// they can't have pollin set, but the explicit check
// documents the invariant). Iteration order is stable
// because eof doesn't remove from either list — only
// unregister does.
const offset = self.pollOffset();
for (self.poll_regs.items, 0..) |*reg, i| {
if (reg.eof) continue;
const fd_state = self.extra_fds.items[i + offset];
if (!fd_state.revents.pollin) continue;
self.dispatchCdpRead(reg, i + offset, &cdp_buf);
self.dispatchPollRead(reg, i + offset, &cdp_buf);
}
libcurl.curl_multi_perform(self.multi, &running_handles) catch |err| {
@@ -707,8 +752,8 @@ fn drainInbox(self: *Network) void {
lp.log.warn(.app, "curl setTlsVerify", .{ .err = err });
};
},
.cdp_register => |r| self.handleCdpRegister(r.fd, r.handler),
.cdp_unregister => |fd| self.handleCdpUnregister(fd),
.poll_register => |r| self.handlePollRegister(r.fd, r.handler),
.poll_unregister => |fd| self.handlePollUnregister(fd),
}
}
@@ -762,13 +807,13 @@ fn fireOnComplete(self: *Network, conn: *http.Connection, err: ?anyerror) void {
}
}
// Append a CDP registration. Both lists grow together so
// extra_fds[cdp_start + i] always matches cdp_regs[i] for non-eof
// entries — see the run loop's dispatch indexing.
fn handleCdpRegister(self: *Network, fd: posix.fd_t, handler: CdpHandler) void {
self.cdp_regs.append(self.allocator, .{ .fd = fd, .handler = handler }) catch |err| {
lp.log.err(.app, "cdp register OOM", .{ .err = err });
handler.on_disconnect(handler.ctx);
// Append a poll registration. Both lists grow together so
// extra_fds[pollOffset() + i] always matches poll_regs[i] for
// non-eof entries — see the run loop's dispatch indexing.
fn handlePollRegister(self: *Network, fd: posix.fd_t, handler: PollHandler) void {
self.poll_regs.append(self.allocator, .{ .fd = fd, .handler = handler }) catch |err| {
lp.log.err(.app, "poll register OOM", .{ .err = err });
handler.onDisconnect();
return;
};
self.extra_fds.append(self.allocator, .{
@@ -776,9 +821,9 @@ fn handleCdpRegister(self: *Network, fd: posix.fd_t, handler: CdpHandler) void {
.events = .{ .pollin = true },
.revents = .{},
}) catch |err| {
lp.log.err(.app, "cdp register OOM extra_fds", .{ .err = err });
_ = self.cdp_regs.pop();
handler.on_disconnect(handler.ctx);
lp.log.err(.app, "poll register OOM extra_fds", .{ .err = err });
_ = self.poll_regs.pop();
handler.onDisconnect();
return;
};
}
@@ -788,49 +833,54 @@ fn handleCdpRegister(self: *Network, fd: posix.fd_t, handler: CdpHandler) void {
// is waiting for — unless we already fired it on EOF.
//
// swapRemove on both lists at the same logical index preserves the
// parallel invariant: cdp_regs[i] ↔ extra_fds[i + offset]. Both
// parallel invariant: poll_regs[i] ↔ extra_fds[i + offset]. Both
// lists move their last entry to position i.
fn handleCdpUnregister(self: *Network, fd: posix.fd_t) void {
const offset = self.cdpOffset();
for (self.cdp_regs.items, 0..) |*reg, i| {
fn handlePollUnregister(self: *Network, fd: posix.fd_t) void {
const offset = self.pollOffset();
for (self.poll_regs.items, 0..) |*reg, i| {
if (reg.fd != fd) continue;
const already_eof = reg.eof;
const handler = reg.handler;
_ = self.cdp_regs.swapRemove(i);
_ = self.poll_regs.swapRemove(i);
_ = self.extra_fds.swapRemove(i + offset);
if (!already_eof) handler.on_disconnect(handler.ctx);
if (!already_eof) handler.onDisconnect();
return;
}
}
fn cdpOffset(self: *Network) usize {
fn pollOffset(self: *Network) usize {
return if (self.listener != null) 1 else 0;
}
// Read from a readable CDP socket and dispatch to its handler.
// On EOF / error, marks the reg eof'd and fires on_disconnect;
// the reg+extra_fds slot stay parallel until the worker's
// unregister fully removes them.
fn dispatchCdpRead(self: *Network, reg: *CdpReg, fd_idx: usize, buf: []u8) void {
const n = posix.read(reg.fd, buf) catch |err| {
lp.log.warn(.app, "cdp read", .{ .err = err });
self.markCdpEof(reg, fd_idx);
return;
};
if (n == 0) {
self.markCdpEof(reg, fd_idx);
return;
// Read from a readable poll socket and dispatch to its handler.
// CDP regs read here in the network thread; WS regs hand the
// readable edge to the worker (libcurl owns the read). On EOF /
// error for CDP, marks the reg eof'd and fires on_disconnect.
fn dispatchPollRead(self: *Network, reg: *PollReg, fd_idx: usize, buf: []u8) void {
switch (reg.handler) {
.cdp => |h| {
const n = posix.read(reg.fd, buf) catch |err| {
lp.log.warn(.app, "cdp read", .{ .err = err });
self.markPollEof(reg, fd_idx);
return;
};
if (n == 0) {
self.markPollEof(reg, fd_idx);
return;
}
h.on_data(h.ctx, buf[0..n]);
},
.ws => |h| h.on_readable(h.ctx),
}
reg.handler.on_data(reg.handler.ctx, buf[0..n]);
}
// EOF / error path. Reg stays in cdp_regs (and its extra_fds slot)
// EOF / error path. Reg stays in poll_regs (and its extra_fds slot)
// to preserve the parallel invariant; zeroing events stops poll
// from waking us on this fd. Worker's unregister will fully remove.
fn markCdpEof(self: *Network, reg: *CdpReg, fd_idx: usize) void {
fn markPollEof(self: *Network, reg: *PollReg, fd_idx: usize) void {
reg.eof = true;
self.extra_fds.items[fd_idx].events = .{};
reg.handler.on_disconnect(reg.handler.ctx);
reg.handler.onDisconnect();
}
pub fn stop(self: *Network) void {
@@ -890,10 +940,27 @@ fn processCompletions(self: *Network) void {
lp.assert(false, "curl getinfo private", .{});
const conn: *http.Connection = @ptrCast(@alignCast(ptr));
libcurl.curl_multi_remove_handle(self.multi, easy) catch |err| {
lp.assert(false, "curl multi remove (post-completion)", .{ .err = err });
// For CONNECT_ONLY=2 transports (WebSocket) we need the
// socket fd post-completion, but `curl_multi_remove_handle`
// invalidates CURLINFO_ACTIVESOCKET. Capture it here while
// the handle is still in the multi. Only meaningful on
// success; failures don't reach a usable socket.
// WebSocket transports stay attached to the multi after the
// CONNECT_ONLY=2 upgrade completes: curl_ws_send/recv look
// the conn up through the multi's connection pool, and
// detaching here would break those calls (CURLE_UNSUPPORTED_PROTOCOL
// from easy_connection's "Failed to get recent socket" branch).
// The worker removes the handle via disownConn during teardown.
const keep_attached = switch (conn.transport) {
.websocket => maybe_err == null,
else => false,
};
conn._in_multi = false;
if (!keep_attached) {
libcurl.curl_multi_remove_handle(self.multi, easy) catch |err| {
lp.assert(false, "curl multi remove (post-completion)", .{ .err = err });
};
conn._in_multi = false;
}
self.fireOnComplete(conn, maybe_err);
}
}
@@ -907,17 +974,21 @@ pub fn getConn(self: *Network) ?*http.Connection {
return @fieldParentPtr("node", node);
}
// Called from a Worker Thread
// Called from a Worker Thread. Routes by the conn's
// allocation origin (`_pool`), not its current `transport` —
// abort paths clear transport to `.none` before release, so the
// pool tag is the only reliable signal of where the conn came
// from.
pub fn releaseConn(self: *Network, conn: *http.Connection) void {
switch (conn.transport) {
.websocket => {
switch (conn._pool) {
.ws => {
conn.deinit();
self.ws_mutex.lock();
defer self.ws_mutex.unlock();
self.ws_pool.destroy(conn);
self.ws_count -= 1;
},
else => {
.http => {
conn.reset(self.config, self.ca_blob, self.ip_filter) catch |err| {
lp.assert(false, "couldn't reset curl easy", .{ .err = err });
};
@@ -952,6 +1023,7 @@ pub fn newWSConn(self: *Network) ?*http.Connection {
return null;
};
conn._pool = .ws;
return conn;
}

View File

@@ -274,6 +274,13 @@ pub const Connection = struct {
in_use: bool,
transport: Transport,
// Which pool this conn was allocated from. Routing in
// `Network.releaseConn` keys off this rather than the current
// `transport`, because abort paths (`disownConn`) clear
// transport to `.none` before the conn is released. Set by
// the pool-specific allocator (`newWSConn` flips it to `.ws`).
_pool: Pool = .http,
// Network-thread node. Lives in either the available pool or — once
// the network thread has handed the conn back — `Handle._completion_queue`.
// Mailbox messages reference the conn by pointer, so this node is *not*
@@ -309,6 +316,8 @@ pub const Connection = struct {
websocket: *@import("../browser/webapi/net/WebSocket.zig"),
};
pub const Pool = enum { http, ws };
pub fn init(
ca_blob: ?libcurl.CurlBlob,
config: *const Config,
@@ -405,6 +414,18 @@ pub const Connection = struct {
try libcurl.curl_easy_setopt(self._easy, .connect_only, value);
}
// Force this easy handle to open a new TCP connection
// instead of pulling one from libcurl's multi-level cpool.
// Used by WebSocket: a cached WS connection (post-upgrade)
// can't be reused for a fresh handshake — libcurl would try
// to send HTTP Upgrade on a socket that's already in WS
// protocol mode and fail. Leaves the resulting conn eligible
// for cpool placement after handshake (which curl_ws_send /
// curl_ws_recv rely on to look up the conn post-completion).
pub fn setFreshConnect(self: *const Connection, fresh: bool) !void {
try libcurl.curl_easy_setopt(self._easy, .fresh_connect, @as(c_long, if (fresh) 1 else 0));
}
pub fn setWriteCallback(
self: *Connection,
comptime data_cb: libcurl.CurlWriteFunction,
@@ -433,6 +454,18 @@ pub const Connection = struct {
try libcurl.curl_easy_setopt(self._easy, .header_function, data_cb);
}
// Register a sockopt callback. Fires once libcurl has created
// (or pulled from cache) the socket for the upcoming transfer,
// before connect(). Most reliable place to capture the fd —
// see WebSocket's use for the why.
pub fn setSockoptCallback(
self: *Connection,
comptime data_cb: libcurl.CurlSockoptFunction,
) !void {
try libcurl.curl_easy_setopt(self._easy, .sockopt_data, self);
try libcurl.curl_easy_setopt(self._easy, .sockopt_function, data_cb);
}
pub fn pause(
self: *Connection,
flags: libcurl.CurlPauseFlags,
@@ -617,6 +650,59 @@ pub const Connection = struct {
pub fn wsMeta(self: *const Connection) ?libcurl.WsFrameMeta {
return libcurl.curl_ws_meta(self._easy);
}
// Send a WebSocket frame using libcurl's CONNECT_ONLY=2 mode.
// Always issues at least one curl_ws_send so the frame header
// goes on the wire even for a zero-length payload (empty
// text/binary frame). After that, loops while libcurl wants
// more bytes (CURLE_OK with a partial `sent`); returns once
// the whole buffer is consumed or libcurl reports CURLE_AGAIN
// (socket would block). On Again the caller is expected to
// retry later from `buf[returned..]`.
pub fn wsSend(
self: *const Connection,
buf: []const u8,
frame_type: libcurl.WsFrameType,
) !usize {
var total: usize = 0;
while (true) {
var sent: usize = 0;
libcurl.curl_ws_send(self._easy, buf[total..], &sent, 0, frame_type) catch |err| {
if (err == error.Again) {
total += sent;
break;
}
return err;
};
total += sent;
if (total >= buf.len) break;
// Guard against an infinite loop if libcurl ever
// returns OK with sent=0 on a non-empty buffer.
if (sent == 0) break;
}
return total;
}
// Receive bytes from a CONNECT_ONLY=2 WebSocket into `buf`.
// Returns the byte count and the frame metadata for the
// in-progress frame. Propagates error.Again when no bytes are
// currently available.
pub fn wsRecv(self: *const Connection, buf: []u8) !struct { usize, libcurl.WsFrameMeta } {
var received: usize = 0;
var meta: ?libcurl.WsFrameMeta = null;
try libcurl.curl_ws_recv(self._easy, buf, &received, &meta);
return .{ received, meta orelse return error.NoFrameMeta };
}
// Returns the underlying socket fd. Only meaningful after the
// CONNECT_ONLY=2 handshake has completed — libcurl reports
// CURL_SOCKET_BAD before that.
pub fn getActiveSocket(self: *const Connection) !posix.fd_t {
var s: libcurl.CurlSocket = libcurl.CURL_SOCKET_BAD;
try libcurl.curl_easy_getinfo(self._easy, .active_socket, &s);
if (s == libcurl.CURL_SOCKET_BAD) return error.NoActiveSocket;
return @intCast(s);
}
};
fn debugCallback(_: *libcurl.Curl, msg_type: libcurl.CurlInfoType, raw: [*c]u8, len: usize, _: *anyopaque) c_int {

View File

@@ -43,6 +43,12 @@ pub const curl_writefunc_error: usize = c.CURL_WRITEFUNC_ERROR;
pub const curl_readfunc_pause: usize = c.CURL_READFUNC_PAUSE;
pub const CurlReadFunction = fn ([*]u8, usize, usize, *anyopaque) usize;
// Return values for CURLOPT_SOCKOPTFUNCTION.
pub const curl_sockopt_ok: c_int = c.CURL_SOCKOPT_OK;
pub const curl_sockopt_error: c_int = c.CURL_SOCKOPT_ERROR;
pub const curl_sockopt_already_connected: c_int = c.CURL_SOCKOPT_ALREADY_CONNECTED;
pub const CurlSockoptFunction = fn (clientp: *anyopaque, fd: CurlSocket, purpose: CurlSockType) c_int;
pub const CurlSockType = enum(c.curlsocktype) {
ipcxn = c.CURLSOCKTYPE_IPCXN,
accept = c.CURLSOCKTYPE_ACCEPT,
@@ -226,13 +232,18 @@ pub const CurlOption = enum(c.CURLoption) {
read_data = c.CURLOPT_READDATA,
read_function = c.CURLOPT_READFUNCTION,
connect_only = c.CURLOPT_CONNECT_ONLY,
forbid_reuse = c.CURLOPT_FORBID_REUSE,
fresh_connect = c.CURLOPT_FRESH_CONNECT,
upload = c.CURLOPT_UPLOAD,
opensocket_function = c.CURLOPT_OPENSOCKETFUNCTION,
opensocket_data = c.CURLOPT_OPENSOCKETDATA,
sockopt_function = c.CURLOPT_SOCKOPTFUNCTION,
sockopt_data = c.CURLOPT_SOCKOPTDATA,
};
pub const CurlMOption = enum(c.CURLMoption) {
max_host_connections = c.CURLMOPT_MAX_HOST_CONNECTIONS,
maxconnects = c.CURLMOPT_MAXCONNECTS,
};
pub const CurlInfo = enum(c.CURLINFO) {
@@ -241,6 +252,7 @@ pub const CurlInfo = enum(c.CURLINFO) {
redirect_count = c.CURLINFO_REDIRECT_COUNT,
response_code = c.CURLINFO_RESPONSE_CODE,
connect_code = c.CURLINFO_HTTP_CONNECTCODE,
active_socket = c.CURLINFO_ACTIVESOCKET,
};
pub const Error = error{
@@ -615,6 +627,8 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype
.follow_location,
.post_field_size,
.connect_only,
.forbid_reuse,
.fresh_connect,
=> blk: {
const n: c_long = switch (@typeInfo(@TypeOf(value))) {
.comptime_int, .int => @intCast(value),
@@ -660,6 +674,7 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype
.read_data,
.write_data,
.opensocket_data,
.sockopt_data,
=> blk: {
const ptr: ?*anyopaque = switch (@typeInfo(@TypeOf(value))) {
.null => null,
@@ -697,6 +712,20 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype
break :blk c.curl_easy_setopt(easy, opt, cb);
},
.sockopt_function => blk: {
const cb: c.curl_sockopt_callback = switch (@typeInfo(@TypeOf(value))) {
.null => null,
.@"fn" => struct {
fn cb(clientp: ?*anyopaque, fd: c.curl_socket_t, purpose: c.curlsocktype) callconv(.c) c_int {
const u = clientp orelse return c.CURL_SOCKOPT_ERROR;
return value(u, fd, @enumFromInt(purpose));
}
}.cb,
else => @compileError("expected Zig function or null for " ++ @tagName(option) ++ ", got " ++ @typeName(@TypeOf(value))),
};
break :blk c.curl_easy_setopt(easy, opt, cb);
},
.header_function => blk: {
const cb: c.curl_write_callback = switch (@typeInfo(@TypeOf(value))) {
.null => null,
@@ -769,6 +798,10 @@ pub fn curl_easy_getinfo(easy: *Curl, comptime info: CurlInfo, out: anytype) Err
const p: **anyopaque = out;
break :blk c.curl_easy_getinfo(easy, inf, p);
},
.active_socket => blk: {
const p: *CurlSocket = out;
break :blk c.curl_easy_getinfo(easy, inf, p);
},
};
try errorCheck(code);
}
@@ -829,7 +862,9 @@ pub fn curl_multi_cleanup(multi: *CurlM) ErrorMulti!void {
pub fn curl_multi_setopt(multi: *CurlM, comptime option: CurlMOption, value: anytype) ErrorMulti!void {
const opt: c.CURLMoption = @intFromEnum(option);
const code = switch (option) {
.max_host_connections => blk: {
.max_host_connections,
.maxconnects,
=> blk: {
const n: c_long = switch (@typeInfo(@TypeOf(value))) {
.comptime_int, .int => @intCast(value),
else => @compileError("expected integer for " ++ @tagName(option) ++ ", got " ++ @typeName(@TypeOf(value))),

View File

@@ -453,7 +453,9 @@ fn runWebApiTest(test_file: [:0]const u8, timeout_ms: u32) !void {
return error.TestTimedOut;
}
wait_ms -= @intCast(ms_elapsed);
std.Thread.sleep(std.time.ns_per_ms * sleep_ms);
if (sleep_ms > 0) {
std.Thread.sleep(std.time.ns_per_ms * sleep_ms);
}
}
}