diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index caefae50..78e8b393 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -232,7 +232,7 @@ pub fn deinit(self: *Client) void { self.network.drainPendingForShutdown(); } _ = self.processInbox(5_000) catch {}; - if (watchdog.read() > 30 * std.time.ns_per_s) { + if (watchdog.read() > 10 * std.time.ns_per_s) { lp.assert(false, "HttpClient.deinit: stuck draining cancellations", .{ .http_active = self.http_active, .ws_active = self.ws_active, @@ -336,6 +336,30 @@ pub fn abort(self: *Client) void { t.kill(); } + // WebSockets aren't in `self.transfers`; they live on owners, + // and the owner teardown path (`abortOwner`) normally kills + // them. If we reach here with WS conns still tracked in + // `in_use` it's because no owner aborted them — Client.deinit + // would otherwise hang on the wait loop. Snapshot the WS + // conns (kill mutates in_use) and tear each down. + var ws_snapshot: std.ArrayList(*WebSocket) = .empty; + defer ws_snapshot.deinit(self.allocator); + { + var node = self.in_use.first; + while (node) |n| : (node = n.next) { + const conn: *http.Connection = @fieldParentPtr("_worker_node", n); + if (conn._pool != .ws) continue; + const ws = switch (conn.transport) { + .websocket => |w| w, + else => continue, + }; + ws_snapshot.append(self.allocator, ws) catch @panic("OOM"); + } + } + for (ws_snapshot.items) |ws| { + ws.kill(); + } + // After the kill loop: // - self.queue is empty (queued transfers had no conn and // deinit'd synchronously). @@ -641,7 +665,7 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus { // Used by deinit to drain canceled completions until in_use empties. // Returns true if at least one message was processed. -fn processInbox(self: *Client, timeout_ms: u32) !bool { +pub fn processInbox(self: *Client, timeout_ms: u32) !bool { var processed = false; while (true) { const wait_ms: u32 = if (processed) 0 else timeout_ms; @@ -660,15 +684,13 @@ fn processInbox(self: *Client, timeout_ms: u32) !bool { .cdp_disconnect => { if (self.cdp_client) |cdp| cdp.on_disconnect(cdp.ctx); }, - .ws_open => |ws| ws.handleOpen() catch |err| { - log.err(.websocket, "ws_open dispatch", .{ .err = err }); + .ws_readable => |ws| ws.handleReadable() catch |err| { + log.err(.websocket, "ws_readable dispatch", .{ .err = err }); }, - .ws_message => |m| { - defer self.allocator.free(m.data); - m.ws.handleMessage(m.data, m.frame_type) catch |err| { - log.err(.websocket, "ws_message dispatch", .{ .err = err }); - }; + .ws_send_retry => |ws| ws.handleSendRetry() catch |err| { + log.err(.websocket, "ws_send_retry dispatch", .{ .err = err }); }, + .ws_disconnected => |ws| ws.handleNetDisconnected(), } } return processed; @@ -685,11 +707,17 @@ fn handleHttpCompletion(self: *Client, conn: *http.Connection, err: ?anyerror) v if (done) transfer.deinit(); }, .websocket => |ws| { - // ws_active gets decremented through the call to disconnected. + // CONNECT_ONLY=2: this is the upgrade-completion edge, + // not a teardown. On success, the WS hands the easy + // handle off to itself (worker-owned curl_ws_send/recv) + // and the conn is just metadata; on error, disconnect. if (err) |e| switch (e) { error.GotNothing, error.Canceled => ws.disconnected(null), else => ws.disconnected(e), - } else ws.disconnected(null); + } else ws.handshakeComplete() catch |perr| { + log.err(.websocket, "ws handshake complete", .{ .err = perr }); + ws.disconnected(perr); + }; }, .none => { // The owner disowned this conn before the terminal @@ -1642,32 +1670,32 @@ pub const InMessage = union(enum) { // CDP socket EOF / error / unregister ack from the network thread. cdp_disconnect, - // WebSocket open handshake completed. The network thread observed - // the upgrade headers and set `_ready_state = .open`; the worker - // dispatches the JS open event. - ws_open: *WebSocket, - // WebSocket text/binary frame fully assembled. `data` is - // heap-allocated from `Client.allocator` (copied off the libcurl - // buffer); the worker frees it after dispatching the JS message - // event. - ws_message: WsMessage, - // NOTE: there is no `ws_disconnect`/`ws_close` variant. The - // close-handshake completion (whether the server initiates or we - // do) flows through libcurl's natural termination — the WS conn - // completes, fires the normal HTTP-style completion, and - // handleHttpCompletion's `.websocket` arm dispatches via - // `ws.disconnected` on the worker. Avoids duplicating the path. + // Network thread observed the post-handshake socket as readable. + // The worker drains via curl_ws_recv until Again. Deduped by an + // atomic flag on the WebSocket so a slow worker doesn't pile up + // duplicate notifications (see WebSocket._pending_readable). + ws_readable: *WebSocket, + + // Worker self-push: curl_ws_send returned Again on a partially- + // sent message. Retry on next inbox tick. Carries no payload — + // the WebSocket owns its send queue + offset. + ws_send_retry: *WebSocket, + + // Ack from the network thread that it has stopped polling this + // WebSocket's socket (after submitWsUnregister or a socket-side + // error). No more ws_readable / ws_disconnected events will + // arrive for this WS after this fires. + ws_disconnected: *WebSocket, + + // NOTE: the WS open transition is not a separate inbox event — + // it's the success arm of `http_completion` for a `.websocket` + // transport (CONNECT_ONLY=2 mode: libcurl drives the upgrade + // and signals completion through the normal multi pathway). pub const HttpCompletion = struct { conn: *http.Connection, err: ?anyerror, }; - - pub const WsMessage = struct { - ws: *WebSocket, - data: []u8, - frame_type: http.WsFrameType, - }; }; pub const Inbox = struct { @@ -1704,14 +1732,13 @@ pub const Inbox = struct { fd.* = -1; } } - // Free any undrained heap-allocated payloads (cdp_data / - // ws_message bytes) before the pool (which owns the Item - // structs) goes away. + // Free any undrained heap-allocated payloads (cdp_data + // bytes) before the pool (which owns the Item structs) + // goes away. while (self.queue.popFirst()) |node| { const item: *Item = @fieldParentPtr("node", node); switch (item.msg) { .cdp_data => |bytes| self.allocator.free(bytes), - .ws_message => |msg| self.allocator.free(msg.data), else => {}, } } @@ -1746,7 +1773,9 @@ pub const Inbox = struct { if (self.queue.popFirst()) |node| return self.takeItem(node); } - if (timeout_ms == 0) return null; + if (timeout_ms == 0) { + return null; + } var fds = [_]posix.pollfd{ .{ .fd = self.wake_pipe[0], .events = posix.POLL.IN, .revents = 0 }, @@ -1835,3 +1864,42 @@ fn cdpNetDisconnect(ctx: *anyopaque) void { log.err(.http, "cdp_disconnect push failed", .{ .err = err }); }; } + +// ── WebSocket socket handoff ──────────────────────────────────────────────── +// +// After the handshake completes (CONNECT_ONLY=2 mode), the WebSocket +// hands its active socket to the network thread for POLLIN watching. +// Network-side handlers dedup readable notifications via the WS's +// atomic flag — see `WebSocket._pending_readable`. + +pub fn registerWebSocket(_: *Client, ws: *WebSocket, fd: posix.fd_t) void { + ws._http_client.network.submitWsRegister(fd, .{ + .ctx = ws, + .on_readable = wsNetReadable, + .on_disconnect = wsNetDisconnected, + }); +} + +pub fn unregisterWebSocket(_: *Client, ws: *WebSocket, fd: posix.fd_t) void { + ws._http_client.network.submitWsUnregister(fd); +} + +// Network-thread side of WS readable. Dedup via the WS's atomic +// flag so a slow worker doesn't pile up duplicate notifications. +fn wsNetReadable(ctx: *anyopaque) void { + const ws: *WebSocket = @ptrCast(@alignCast(ctx)); + // Already a pending notification in flight — nothing to do. + if (ws._pending_readable.swap(true, .acq_rel)) return; + ws._http_client.inbox.push(.{ .ws_readable = ws }) catch |err| { + log.err(.websocket, "ws_readable push failed", .{ .err = err }); + // Clear so we don't get stuck if push failed. + ws._pending_readable.store(false, .release); + }; +} + +fn wsNetDisconnected(ctx: *anyopaque) void { + const ws: *WebSocket = @ptrCast(@alignCast(ctx)); + ws._http_client.inbox.push(.{ .ws_disconnected = ws }) catch |err| { + log.err(.websocket, "ws_disconnected push failed", .{ .err = err }); + }; +} diff --git a/src/browser/tests/net/websocket.html b/src/browser/tests/net/websocket.html index 2c2e315b..34040405 100644 --- a/src/browser/tests/net/websocket.html +++ b/src/browser/tests/net/websocket.html @@ -1,7 +1,7 @@ - --> -