From 8bba8a0a4bede87a7565fdc6c43af46d9e5ac0fc Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Tue, 14 Apr 2026 12:35:36 +0100 Subject: [PATCH] Refactor websocket callbacks --- src/browser/HttpClient.zig | 32 ++++++ src/browser/webapi/net/WebSocket.zig | 160 ++++++++++++++++++++++----- 2 files changed, 164 insertions(+), 28 deletions(-) diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 0832db1a..22860b55 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -83,6 +83,11 @@ dirty: std.DoublyLinkedList = .{}, // Whether we're currently inside a curl_multi_perform call. performing: bool = false, +// WebSockets with queued events to be drained from the worker thread. +// Populated by libcurl callbacks (currently same thread, future cross-thread). +ws_ready: std.ArrayList(*WebSocket) = .{}, +ws_ready_mutex: std.Thread.Mutex = .{}, + // Use to generate the next request ID next_request_id: u32 = 0, @@ -184,6 +189,7 @@ pub fn deinit(self: *Client) void { self.abort(); self.handles.deinit(); + self.ws_ready.deinit(self.allocator); self.transfer_pool.deinit(); var robots_iter = self.pending_robots_queue.iterator(); @@ -859,6 +865,11 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus { break :blk try self.handles.perform(); }; + // Drain queued WebSocket events. ws callbacks (called from libcurl during + // perform above) only buffer/queue — actual JS dispatch happens here, on + // the worker thread. + self.drainReadyWs(); + // Process dirty connections — return them to Network pool. while (self.dirty.popFirst()) |node| { const conn: *http.Connection = @fieldParentPtr("node", node); @@ -1148,6 +1159,27 @@ fn releaseConn(self: *Client, conn: *http.Connection) void { self.network.releaseConnection(conn); } +// Called from WebSocket libcurl callbacks (currently same worker thread, but +// the API is mutex-protected so it stays correct if libcurl moves off-thread). +pub fn addReadyWs(self: *Client, ws: *WebSocket) void { + self.ws_ready_mutex.lock(); + defer self.ws_ready_mutex.unlock(); + self.ws_ready.append(self.allocator, ws) catch {}; +} + +fn drainReadyWs(self: *Client) void { + self.ws_ready_mutex.lock(); + const items = self.ws_ready.toOwnedSlice(self.allocator) catch { + self.ws_ready_mutex.unlock(); + return; + }; + self.ws_ready_mutex.unlock(); + defer self.allocator.free(items); + for (items) |ws| { + ws.drainPending(); + } +} + fn ensureNoActiveConnection(self: *const Client) !void { if (self.http_active > 0 or self.ws_active > 0) { return error.InflightConnection; diff --git a/src/browser/webapi/net/WebSocket.zig b/src/browser/webapi/net/WebSocket.zig index 0b49b693..21c5a3c9 100644 --- a/src/browser/webapi/net/WebSocket.zig +++ b/src/browser/webapi/net/WebSocket.zig @@ -62,9 +62,30 @@ _req_headers: http.Headers, _send_queue: std.ArrayList(Message) = .empty, _send_offset: usize = 0, -// buffered incoming frame +// Single growing buffer for assembled ws frame bytes — both "currently +// assembling" and "completed but not yet dispatched" messages live here. +// Reset to zero length only by drainPending. Capacity grows to fit the +// largest message ever received, then stays put (arena allocation, never +// freed mid-life). Pending message data is referenced via _pending_messages +// offset/length pairs into items — slices stay valid until drain clears. _recv_buffer: std.ArrayList(u8) = .empty, +// Offset within _recv_buffer where the current in-flight frame began. +// Used to slice out the message when bytes_left reaches 0. +_assembling_start: usize = 0, + +// Events queued by libcurl callbacks; drained from the worker thread via +// drainPending. Callbacks must NEVER enter V8 directly (they can run from +// any thread driving curl_multi_perform), so all dispatch happens here. +_pending_messages: std.ArrayList(QueuedMessage) = .empty, +_pending_open: bool = false, +_pending_close: ?PendingClose = null, + +// Set while we're sitting in HttpClient.ws_ready. Doubles as the dedup +// flag and the marker that we hold one extra "pending events" ref so the +// WebSocket stays alive between queueing and drain. +_in_ready_list: bool = false, + // close info for event dispatch _close_code: u16 = 1000, _close_reason: []const u8 = "", @@ -85,6 +106,19 @@ pub const ReadyState = enum(u8) { closed = 3, }; +const QueuedMessage = struct { + offset: usize, + len: usize, + frame_type: http.WsFrameType, +}; + +const PendingClose = struct { + code: u16, + reason: []const u8, + was_clean: bool, + with_error: bool, +}; + pub const BinaryType = enum { blob, arraybuffer, @@ -202,6 +236,11 @@ pub fn kill(self: *WebSocket) void { } pub fn disconnected(self: *WebSocket, err_: ?anyerror) void { + if (self._ready_state == .closed) { + // already disconnected (e.g. close-handshake disconnected us, then + // libcurl reports the same connection completion). + return; + } const was_clean = self._ready_state == .closing and err_ == null; self._ready_state = .closed; @@ -211,24 +250,18 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void { log.info(.websocket, "disconnected", .{ .url = self._url, .reason = "closed" }); } - defer self.cleanup(); - - // Use 1006 (abnormal closure) if connection wasn't cleanly closed - const code = if (was_clean) self._close_code else 1006; - const reason = if (was_clean) self._close_reason else ""; - - // Spec requires error event before close on abnormal closure. - // Dispatch events before cleanup since cleanup releases the ref count - // which may free our event handler references. - if (!was_clean) { - self.dispatchErrorEvent() catch |err| { - log.err(.websocket, "error event dispatch failed", .{ .err = err }); - }; - } - - self.dispatchCloseEvent(code, reason, was_clean) catch |err| { - log.err(.websocket, "close event dispatch failed", .{ .err = err }); + // Queue events first (markReady acquires a "pending" ref), then cleanup + // (which releases the create-time ref). The pending ref keeps us alive + // until drainPending dispatches and releases it. + self._pending_close = .{ + .code = if (was_clean) self._close_code else 1006, + .reason = if (was_clean) self._close_reason else "", + .was_clean = was_clean, + .with_error = !was_clean, }; + self.markReady(); + + self.cleanup(); } fn cleanup(self: *WebSocket) void { @@ -352,8 +385,14 @@ pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void { if (self._ready_state == .connecting) { // Connection not yet established - fail it self._ready_state = .closed; + self._pending_close = .{ + .code = code, + .reason = try self._arena.dupe(u8, reason), + .was_clean = false, + .with_error = false, + }; + self.markReady(); self.cleanup(); - try self.dispatchCloseEvent(code, reason, false); return; } @@ -448,6 +487,53 @@ pub fn setOnClose(self: *WebSocket, cb_: ?js.Function) !void { } } +// Register self as having pending events to drain. Called from any thread +// that produces ws events (currently libcurl callbacks on the worker thread, +// future: Network thread). Acquires one extra ref to keep the WebSocket +// alive between queueing and the drainPending call. +fn markReady(self: *WebSocket) void { + if (self._in_ready_list) return; + self._in_ready_list = true; + self.acquireRef(); + self._http_client.addReadyWs(self); +} + +// Dispatches all queued events to JS. Must be called from the worker thread +// (the one that owns the V8 isolate). HttpClient calls this from its perform +// loop after curl_multi_perform. +pub fn drainPending(self: *WebSocket) void { + self._in_ready_list = false; + defer self.releaseRef(self._page._session); + + if (self._pending_open) { + self._pending_open = false; + self.dispatchOpenEvent() catch |err| { + log.err(.websocket, "open event fail", .{ .err = err }); + }; + } + + for (self._pending_messages.items) |msg| { + const data = self._recv_buffer.items[msg.offset..][0..msg.len]; + self.dispatchMessageEvent(data, msg.frame_type) catch |err| { + log.warn(.websocket, "message dispatch", .{ .err = err }); + }; + } + self._pending_messages.clearRetainingCapacity(); + self._recv_buffer.clearRetainingCapacity(); + + if (self._pending_close) |pc| { + self._pending_close = null; + if (pc.with_error) { + self.dispatchErrorEvent() catch |err| { + log.err(.websocket, "error event dispatch failed", .{ .err = err }); + }; + } + self.dispatchCloseEvent(pc.code, pc.reason, pc.was_clean) catch |err| { + log.err(.websocket, "close event dispatch failed", .{ .err = err }); + }; + } +} + fn dispatchOpenEvent(self: *WebSocket) !void { const frame = self._frame; const target = self.asEventTarget(); @@ -612,12 +698,14 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void { if (comptime IS_DEBUG) { log.debug(.websocket, "incoming message", .{ .url = self._url, .len = meta.len, .bytes_left = meta.bytes_left, .type = meta.frame_type }); } - // Start of new frame. Pre-allocate buffer - self._recv_buffer.clearRetainingCapacity(); + // Start of new frame. Record where it begins inside the shared buffer + // (the buffer is only cleared by drainPending, so messages from the + // same drain cycle live side-by-side). if (meta.len > self._http_client.max_response_size) { return error.MessageTooLarge; } - try self._recv_buffer.ensureTotalCapacity(self._arena, meta.len); + self._assembling_start = self._recv_buffer.items.len; + try self._recv_buffer.ensureTotalCapacity(self._arena, self._assembling_start + meta.len); } try self._recv_buffer.appendSlice(self._arena, data); @@ -627,11 +715,23 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void { return; } - const message = self._recv_buffer.items; + const start = self._assembling_start; + const len = self._recv_buffer.items.len - start; switch (meta.frame_type) { - .text, .binary => try self.dispatchMessageEvent(message, meta.frame_type), + .text, .binary => { + // Queue the message — actual JS dispatch happens in drainPending + // on the worker thread. Slice [start..start+len] of _recv_buffer + // stays valid until drain clears it. + try self._pending_messages.append(self._arena, .{ + .offset = start, + .len = len, + .frame_type = meta.frame_type, + }); + self.markReady(); + }, .close => { // Parse close frame: 2-byte code (big-endian) + optional reason + const message = self._recv_buffer.items[start..][0..len]; const received_code = if (message.len >= 2) @as(u16, message[0]) << 8 | message[1] else @@ -650,8 +750,13 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void { self._ready_state = .closing; try self.queueMessage(.close); } + // Close payload isn't a queued message — discard from buffer. + self._recv_buffer.shrinkRetainingCapacity(start); + }, + .ping, .pong, .cont => { + // Not dispatched as messages — discard from buffer. + self._recv_buffer.shrinkRetainingCapacity(start); }, - .ping, .pong, .cont => {}, } } @@ -681,9 +786,8 @@ fn receivedHeaderCallback(buffer: [*]const u8, header_count: usize, buf_len: usi self._ready_state = .open; log.info(.websocket, "connected", .{ .url = self._url }); - self.dispatchOpenEvent() catch |err| { - log.err(.websocket, "open event fail", .{ .err = err }); - }; + self._pending_open = true; + self.markReady(); return buf_len; }