mirror of
https://github.com/lightpanda-io/browser.git
synced 2026-06-11 01:25:53 -04:00
Refactor websocket callbacks
This commit is contained in:
@@ -83,6 +83,11 @@ dirty: std.DoublyLinkedList = .{},
|
||||
// Whether we're currently inside a curl_multi_perform call.
|
||||
performing: bool = false,
|
||||
|
||||
// WebSockets with queued events to be drained from the worker thread.
|
||||
// Populated by libcurl callbacks (currently same thread, future cross-thread).
|
||||
ws_ready: std.ArrayList(*WebSocket) = .{},
|
||||
ws_ready_mutex: std.Thread.Mutex = .{},
|
||||
|
||||
// Use to generate the next request ID
|
||||
next_request_id: u32 = 0,
|
||||
|
||||
@@ -184,6 +189,7 @@ pub fn deinit(self: *Client) void {
|
||||
self.abort();
|
||||
self.handles.deinit();
|
||||
|
||||
self.ws_ready.deinit(self.allocator);
|
||||
self.transfer_pool.deinit();
|
||||
|
||||
var robots_iter = self.pending_robots_queue.iterator();
|
||||
@@ -859,6 +865,11 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {
|
||||
break :blk try self.handles.perform();
|
||||
};
|
||||
|
||||
// Drain queued WebSocket events. ws callbacks (called from libcurl during
|
||||
// perform above) only buffer/queue — actual JS dispatch happens here, on
|
||||
// the worker thread.
|
||||
self.drainReadyWs();
|
||||
|
||||
// Process dirty connections — return them to Network pool.
|
||||
while (self.dirty.popFirst()) |node| {
|
||||
const conn: *http.Connection = @fieldParentPtr("node", node);
|
||||
@@ -1148,6 +1159,27 @@ fn releaseConn(self: *Client, conn: *http.Connection) void {
|
||||
self.network.releaseConnection(conn);
|
||||
}
|
||||
|
||||
// Called from WebSocket libcurl callbacks (currently same worker thread, but
|
||||
// the API is mutex-protected so it stays correct if libcurl moves off-thread).
|
||||
pub fn addReadyWs(self: *Client, ws: *WebSocket) void {
|
||||
self.ws_ready_mutex.lock();
|
||||
defer self.ws_ready_mutex.unlock();
|
||||
self.ws_ready.append(self.allocator, ws) catch {};
|
||||
}
|
||||
|
||||
fn drainReadyWs(self: *Client) void {
|
||||
self.ws_ready_mutex.lock();
|
||||
const items = self.ws_ready.toOwnedSlice(self.allocator) catch {
|
||||
self.ws_ready_mutex.unlock();
|
||||
return;
|
||||
};
|
||||
self.ws_ready_mutex.unlock();
|
||||
defer self.allocator.free(items);
|
||||
for (items) |ws| {
|
||||
ws.drainPending();
|
||||
}
|
||||
}
|
||||
|
||||
fn ensureNoActiveConnection(self: *const Client) !void {
|
||||
if (self.http_active > 0 or self.ws_active > 0) {
|
||||
return error.InflightConnection;
|
||||
|
||||
@@ -62,9 +62,30 @@ _req_headers: http.Headers,
|
||||
_send_queue: std.ArrayList(Message) = .empty,
|
||||
_send_offset: usize = 0,
|
||||
|
||||
// buffered incoming frame
|
||||
// Single growing buffer for assembled ws frame bytes — both "currently
|
||||
// assembling" and "completed but not yet dispatched" messages live here.
|
||||
// Reset to zero length only by drainPending. Capacity grows to fit the
|
||||
// largest message ever received, then stays put (arena allocation, never
|
||||
// freed mid-life). Pending message data is referenced via _pending_messages
|
||||
// offset/length pairs into items — slices stay valid until drain clears.
|
||||
_recv_buffer: std.ArrayList(u8) = .empty,
|
||||
|
||||
// Offset within _recv_buffer where the current in-flight frame began.
|
||||
// Used to slice out the message when bytes_left reaches 0.
|
||||
_assembling_start: usize = 0,
|
||||
|
||||
// Events queued by libcurl callbacks; drained from the worker thread via
|
||||
// drainPending. Callbacks must NEVER enter V8 directly (they can run from
|
||||
// any thread driving curl_multi_perform), so all dispatch happens here.
|
||||
_pending_messages: std.ArrayList(QueuedMessage) = .empty,
|
||||
_pending_open: bool = false,
|
||||
_pending_close: ?PendingClose = null,
|
||||
|
||||
// Set while we're sitting in HttpClient.ws_ready. Doubles as the dedup
|
||||
// flag and the marker that we hold one extra "pending events" ref so the
|
||||
// WebSocket stays alive between queueing and drain.
|
||||
_in_ready_list: bool = false,
|
||||
|
||||
// close info for event dispatch
|
||||
_close_code: u16 = 1000,
|
||||
_close_reason: []const u8 = "",
|
||||
@@ -85,6 +106,19 @@ pub const ReadyState = enum(u8) {
|
||||
closed = 3,
|
||||
};
|
||||
|
||||
const QueuedMessage = struct {
|
||||
offset: usize,
|
||||
len: usize,
|
||||
frame_type: http.WsFrameType,
|
||||
};
|
||||
|
||||
const PendingClose = struct {
|
||||
code: u16,
|
||||
reason: []const u8,
|
||||
was_clean: bool,
|
||||
with_error: bool,
|
||||
};
|
||||
|
||||
pub const BinaryType = enum {
|
||||
blob,
|
||||
arraybuffer,
|
||||
@@ -202,6 +236,11 @@ pub fn kill(self: *WebSocket) void {
|
||||
}
|
||||
|
||||
pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
|
||||
if (self._ready_state == .closed) {
|
||||
// already disconnected (e.g. close-handshake disconnected us, then
|
||||
// libcurl reports the same connection completion).
|
||||
return;
|
||||
}
|
||||
const was_clean = self._ready_state == .closing and err_ == null;
|
||||
self._ready_state = .closed;
|
||||
|
||||
@@ -211,24 +250,18 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
|
||||
log.info(.websocket, "disconnected", .{ .url = self._url, .reason = "closed" });
|
||||
}
|
||||
|
||||
defer self.cleanup();
|
||||
|
||||
// Use 1006 (abnormal closure) if connection wasn't cleanly closed
|
||||
const code = if (was_clean) self._close_code else 1006;
|
||||
const reason = if (was_clean) self._close_reason else "";
|
||||
|
||||
// Spec requires error event before close on abnormal closure.
|
||||
// Dispatch events before cleanup since cleanup releases the ref count
|
||||
// which may free our event handler references.
|
||||
if (!was_clean) {
|
||||
self.dispatchErrorEvent() catch |err| {
|
||||
log.err(.websocket, "error event dispatch failed", .{ .err = err });
|
||||
};
|
||||
}
|
||||
|
||||
self.dispatchCloseEvent(code, reason, was_clean) catch |err| {
|
||||
log.err(.websocket, "close event dispatch failed", .{ .err = err });
|
||||
// Queue events first (markReady acquires a "pending" ref), then cleanup
|
||||
// (which releases the create-time ref). The pending ref keeps us alive
|
||||
// until drainPending dispatches and releases it.
|
||||
self._pending_close = .{
|
||||
.code = if (was_clean) self._close_code else 1006,
|
||||
.reason = if (was_clean) self._close_reason else "",
|
||||
.was_clean = was_clean,
|
||||
.with_error = !was_clean,
|
||||
};
|
||||
self.markReady();
|
||||
|
||||
self.cleanup();
|
||||
}
|
||||
|
||||
fn cleanup(self: *WebSocket) void {
|
||||
@@ -352,8 +385,14 @@ pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void {
|
||||
if (self._ready_state == .connecting) {
|
||||
// Connection not yet established - fail it
|
||||
self._ready_state = .closed;
|
||||
self._pending_close = .{
|
||||
.code = code,
|
||||
.reason = try self._arena.dupe(u8, reason),
|
||||
.was_clean = false,
|
||||
.with_error = false,
|
||||
};
|
||||
self.markReady();
|
||||
self.cleanup();
|
||||
try self.dispatchCloseEvent(code, reason, false);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -448,6 +487,53 @@ pub fn setOnClose(self: *WebSocket, cb_: ?js.Function) !void {
|
||||
}
|
||||
}
|
||||
|
||||
// Register self as having pending events to drain. Called from any thread
|
||||
// that produces ws events (currently libcurl callbacks on the worker thread,
|
||||
// future: Network thread). Acquires one extra ref to keep the WebSocket
|
||||
// alive between queueing and the drainPending call.
|
||||
fn markReady(self: *WebSocket) void {
|
||||
if (self._in_ready_list) return;
|
||||
self._in_ready_list = true;
|
||||
self.acquireRef();
|
||||
self._http_client.addReadyWs(self);
|
||||
}
|
||||
|
||||
// Dispatches all queued events to JS. Must be called from the worker thread
|
||||
// (the one that owns the V8 isolate). HttpClient calls this from its perform
|
||||
// loop after curl_multi_perform.
|
||||
pub fn drainPending(self: *WebSocket) void {
|
||||
self._in_ready_list = false;
|
||||
defer self.releaseRef(self._page._session);
|
||||
|
||||
if (self._pending_open) {
|
||||
self._pending_open = false;
|
||||
self.dispatchOpenEvent() catch |err| {
|
||||
log.err(.websocket, "open event fail", .{ .err = err });
|
||||
};
|
||||
}
|
||||
|
||||
for (self._pending_messages.items) |msg| {
|
||||
const data = self._recv_buffer.items[msg.offset..][0..msg.len];
|
||||
self.dispatchMessageEvent(data, msg.frame_type) catch |err| {
|
||||
log.warn(.websocket, "message dispatch", .{ .err = err });
|
||||
};
|
||||
}
|
||||
self._pending_messages.clearRetainingCapacity();
|
||||
self._recv_buffer.clearRetainingCapacity();
|
||||
|
||||
if (self._pending_close) |pc| {
|
||||
self._pending_close = null;
|
||||
if (pc.with_error) {
|
||||
self.dispatchErrorEvent() catch |err| {
|
||||
log.err(.websocket, "error event dispatch failed", .{ .err = err });
|
||||
};
|
||||
}
|
||||
self.dispatchCloseEvent(pc.code, pc.reason, pc.was_clean) catch |err| {
|
||||
log.err(.websocket, "close event dispatch failed", .{ .err = err });
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
fn dispatchOpenEvent(self: *WebSocket) !void {
|
||||
const frame = self._frame;
|
||||
const target = self.asEventTarget();
|
||||
@@ -612,12 +698,14 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void {
|
||||
if (comptime IS_DEBUG) {
|
||||
log.debug(.websocket, "incoming message", .{ .url = self._url, .len = meta.len, .bytes_left = meta.bytes_left, .type = meta.frame_type });
|
||||
}
|
||||
// Start of new frame. Pre-allocate buffer
|
||||
self._recv_buffer.clearRetainingCapacity();
|
||||
// Start of new frame. Record where it begins inside the shared buffer
|
||||
// (the buffer is only cleared by drainPending, so messages from the
|
||||
// same drain cycle live side-by-side).
|
||||
if (meta.len > self._http_client.max_response_size) {
|
||||
return error.MessageTooLarge;
|
||||
}
|
||||
try self._recv_buffer.ensureTotalCapacity(self._arena, meta.len);
|
||||
self._assembling_start = self._recv_buffer.items.len;
|
||||
try self._recv_buffer.ensureTotalCapacity(self._arena, self._assembling_start + meta.len);
|
||||
}
|
||||
|
||||
try self._recv_buffer.appendSlice(self._arena, data);
|
||||
@@ -627,11 +715,23 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void {
|
||||
return;
|
||||
}
|
||||
|
||||
const message = self._recv_buffer.items;
|
||||
const start = self._assembling_start;
|
||||
const len = self._recv_buffer.items.len - start;
|
||||
switch (meta.frame_type) {
|
||||
.text, .binary => try self.dispatchMessageEvent(message, meta.frame_type),
|
||||
.text, .binary => {
|
||||
// Queue the message — actual JS dispatch happens in drainPending
|
||||
// on the worker thread. Slice [start..start+len] of _recv_buffer
|
||||
// stays valid until drain clears it.
|
||||
try self._pending_messages.append(self._arena, .{
|
||||
.offset = start,
|
||||
.len = len,
|
||||
.frame_type = meta.frame_type,
|
||||
});
|
||||
self.markReady();
|
||||
},
|
||||
.close => {
|
||||
// Parse close frame: 2-byte code (big-endian) + optional reason
|
||||
const message = self._recv_buffer.items[start..][0..len];
|
||||
const received_code = if (message.len >= 2)
|
||||
@as(u16, message[0]) << 8 | message[1]
|
||||
else
|
||||
@@ -650,8 +750,13 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void {
|
||||
self._ready_state = .closing;
|
||||
try self.queueMessage(.close);
|
||||
}
|
||||
// Close payload isn't a queued message — discard from buffer.
|
||||
self._recv_buffer.shrinkRetainingCapacity(start);
|
||||
},
|
||||
.ping, .pong, .cont => {
|
||||
// Not dispatched as messages — discard from buffer.
|
||||
self._recv_buffer.shrinkRetainingCapacity(start);
|
||||
},
|
||||
.ping, .pong, .cont => {},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -681,9 +786,8 @@ fn receivedHeaderCallback(buffer: [*]const u8, header_count: usize, buf_len: usi
|
||||
self._ready_state = .open;
|
||||
log.info(.websocket, "connected", .{ .url = self._url });
|
||||
|
||||
self.dispatchOpenEvent() catch |err| {
|
||||
log.err(.websocket, "open event fail", .{ .err = err });
|
||||
};
|
||||
self._pending_open = true;
|
||||
self.markReady();
|
||||
return buf_len;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user