diff --git a/.github/actions/install/action.yml b/.github/actions/install/action.yml index 2f7ad2e2..f5bb72d3 100644 --- a/.github/actions/install/action.yml +++ b/.github/actions/install/action.yml @@ -13,7 +13,7 @@ inputs: zig-v8: description: 'zig v8 version to install' required: false - default: 'v0.3.7' + default: 'v0.3.8' v8: description: 'v8 version to install' required: false diff --git a/Dockerfile b/Dockerfile index b266f916..ed90e014 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 @@ FROM debian:stable-slim ARG MINISIG=0.12 ARG ZIG_MINISIG=RWSGOq2NVecA2UPNdBUZykf1CCb147pkmdtYxgb3Ti+JO/wCYvhbAb/U ARG V8=14.0.365.4 -ARG ZIG_V8=v0.3.7 +ARG ZIG_V8=v0.3.8 ARG TARGETPLATFORM RUN apt-get update -yq && \ diff --git a/build.zig b/build.zig index 81c50c9e..9421ef92 100644 --- a/build.zig +++ b/build.zig @@ -495,6 +495,7 @@ fn buildCurl( .CURL_DISABLE_SMTP = true, .CURL_DISABLE_TELNET = true, .CURL_DISABLE_TFTP = true, + .CURL_DISABLE_WEBSOCKETS = false, // Enable WebSocket support .ssize_t = null, ._FILE_OFFSET_BITS = 64, diff --git a/build.zig.zon b/build.zig.zon index 3c1ff21e..e244734a 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -5,8 +5,8 @@ .minimum_zig_version = "0.15.2", .dependencies = .{ .v8 = .{ - .url = "https://github.com/lightpanda-io/zig-v8-fork/archive/refs/tags/v0.3.7.tar.gz", - .hash = "v8-0.0.0-xddH67uBBAD95hWsPQz3Ni1PlZjdywtPXrGUAp8rSKco", + .url = "https://github.com/lightpanda-io/zig-v8-fork/archive/refs/tags/v0.3.8.tar.gz", + .hash = "v8-0.0.0-xddH6weEBAAdY3uxkNWqYpG7bX_h1Oj3UYBIkbxEyNCl", }, // .v8 = .{ .path = "../zig-v8-fork" }, .brotli = .{ diff --git a/src/Config.zig b/src/Config.zig index ec3a8f81..41f80994 100644 --- a/src/Config.zig +++ b/src/Config.zig @@ -129,6 +129,13 @@ pub fn httpMaxResponseSize(self: *const Config) ?usize { }; } +pub fn wsMaxConcurrent(self: *const Config) u8 { + return switch (self.mode) { + inline .serve, .fetch, .mcp => |opts| opts.common.ws_max_concurrent orelse 8, + else => unreachable, + }; +} + pub fn logLevel(self: *const Config) ?log.Level { return switch (self.mode) { inline .serve, .fetch, .mcp, .agent => |opts| opts.common.log_level, @@ -295,6 +302,7 @@ pub const Common = struct { http_timeout: ?u31 = null, http_connect_timeout: ?u31 = null, http_max_response_size: ?usize = null, + ws_max_concurrent: ?u8 = null, tls_verify_host: bool = true, log_level: ?log.Level = null, log_format: ?log.Format = null, @@ -395,6 +403,10 @@ pub fn printUsageAndExit(self: *const Config, success: bool) void { \\ (e.g. XHR, fetch, script loading, ...). \\ Defaults to no limit. \\ + \\--ws-max-concurrent + \\ The maximum number of concurrent WebSocket connections. + \\ Defaults to 8. + \\ \\--log-level The log level: debug, info, warn, error or fatal. \\ Defaults to ++ (if (builtin.mode == .Debug) " info." else "warn.") ++ @@ -1110,6 +1122,19 @@ fn parseCommonArg( return true; } + if (std.mem.eql(u8, "--ws-max-concurrent", opt) or std.mem.eql(u8, "--ws_max_concurrent", opt)) { + const str = args.next() orelse { + log.fatal(.app, "missing argument value", .{ .arg = opt }); + return error.InvalidArgument; + }; + + common.ws_max_concurrent = std.fmt.parseInt(u8, str, 10) catch |err| { + log.fatal(.app, "invalid argument value", .{ .arg = opt, .err = err }); + return error.InvalidArgument; + }; + return true; + } + if (std.mem.eql(u8, "--log-level", opt) or std.mem.eql(u8, "--log_level", opt)) { const str = args.next() orelse { log.fatal(.app, "missing argument value", .{ .arg = opt }); diff --git a/src/Notification.zig b/src/Notification.zig index 6adbfd81..e7bf1e33 100644 --- a/src/Notification.zig +++ b/src/Notification.zig @@ -83,6 +83,7 @@ const EventListeners = struct { http_request_auth_required: List = .{}, http_response_data: List = .{}, http_response_header_done: List = .{}, + javascript_dialog_opening: List = .{}, }; const Events = union(enum) { @@ -102,6 +103,7 @@ const Events = union(enum) { http_request_done: *const RequestDone, http_response_data: *const ResponseData, http_response_header_done: *const ResponseHeaderDone, + javascript_dialog_opening: *const JavascriptDialogOpening, }; const EventType = std.meta.FieldEnum(Events); @@ -185,6 +187,12 @@ pub const RequestFail = struct { err: anyerror, }; +pub const JavascriptDialogOpening = struct { + url: [:0]const u8, + message: []const u8, + dialog_type: []const u8, +}; + pub fn init(allocator: Allocator) !*Notification { const notification = try allocator.create(Notification); errdefer allocator.destroy(notification); diff --git a/src/TestWSServer.zig b/src/TestWSServer.zig new file mode 100644 index 00000000..6d28acf3 --- /dev/null +++ b/src/TestWSServer.zig @@ -0,0 +1,366 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const posix = std.posix; + +const TestWSServer = @This(); + +shutdown: std.atomic.Value(bool), +listener: ?posix.socket_t, + +pub fn init() TestWSServer { + return .{ + .shutdown = .init(true), + .listener = null, + }; +} + +pub fn stop(self: *TestWSServer) void { + self.shutdown.store(true, .release); + if (self.listener) |socket| { + switch (@import("builtin").target.os.tag) { + .linux => std.posix.shutdown(socket, .recv) catch {}, + else => std.posix.close(socket), + } + } +} + +pub fn run(self: *TestWSServer, wg: *std.Thread.WaitGroup) void { + self.runImpl(wg) catch |err| { + std.debug.print("WebSocket echo server error: {}\n", .{err}); + }; +} + +fn runImpl(self: *TestWSServer, wg: *std.Thread.WaitGroup) !void { + const socket = try posix.socket(posix.AF.INET, posix.SOCK.STREAM, 0); + errdefer posix.close(socket); + + const addr = std.net.Address.initIp4(.{ 127, 0, 0, 1 }, 9584); + + try posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1))); + try posix.bind(socket, &addr.any, addr.getOsSockLen()); + try posix.listen(socket, 8); + + self.listener = socket; + self.shutdown.store(false, .release); + wg.finish(); + + while (!self.shutdown.load(.acquire)) { + var client_addr: posix.sockaddr = undefined; + var addr_len: posix.socklen_t = @sizeOf(posix.sockaddr); + + const client = posix.accept(socket, &client_addr, &addr_len, 0) catch |err| { + if (self.shutdown.load(.acquire)) return; + std.debug.print("[WS Server] Accept error: {}\n", .{err}); + continue; + }; + + const thread = std.Thread.spawn(.{}, handleClient, .{client}) catch |err| { + std.debug.print("[WS Server] Thread spawn error: {}\n", .{err}); + posix.close(client); + continue; + }; + thread.detach(); + } +} + +fn handleClient(client: posix.socket_t) void { + defer posix.close(client); + + var buf: [4096]u8 = undefined; + const n = posix.read(client, &buf) catch return; + + const request = buf[0..n]; + + // Find Sec-WebSocket-Key + const key_header = "Sec-WebSocket-Key: "; + const key_start = std.mem.indexOf(u8, request, key_header) orelse return; + const key_line_start = key_start + key_header.len; + const key_end = std.mem.indexOfScalarPos(u8, request, key_line_start, '\r') orelse return; + const key = request[key_line_start..key_end]; + + // Compute accept key + var hasher = std.crypto.hash.Sha1.init(.{}); + hasher.update(key); + hasher.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); + var hash: [20]u8 = undefined; + hasher.final(&hash); + + var accept_key: [28]u8 = undefined; + _ = std.base64.standard.Encoder.encode(&accept_key, &hash); + + // Send upgrade response + var resp_buf: [256]u8 = undefined; + const resp = std.fmt.bufPrint(&resp_buf, "HTTP/1.1 101 Switching Protocols\r\n" ++ + "Upgrade: websocket\r\n" ++ + "Connection: Upgrade\r\n" ++ + "Sec-WebSocket-Accept: {s}\r\n\r\n", .{accept_key}) catch return; + _ = posix.write(client, resp) catch return; + + // Message loop with larger buffer for big messages + var msg_buf: [128 * 1024]u8 = undefined; + var recv_buf = RecvBuffer{ .buf = &msg_buf }; + + while (true) { + const frame = recv_buf.readFrame(client) orelse break; + + // Close frame - echo it back before closing + if (frame.opcode == 8) { + sendFrame(client, 8, "", frame.payload) catch {}; + break; + } + + // Handle commands or echo + if (frame.opcode == 1) { // Text + handleTextMessage(client, frame.payload) catch break; + } else if (frame.opcode == 2) { // Binary + handleBinaryMessage(client, frame.payload) catch break; + } + } +} + +const Frame = struct { + opcode: u8, + payload: []u8, +}; + +const RecvBuffer = struct { + buf: []u8, + start: usize = 0, + end: usize = 0, + + fn available(self: *RecvBuffer) []u8 { + return self.buf[self.start..self.end]; + } + + fn consume(self: *RecvBuffer, n: usize) void { + self.start += n; + if (self.start >= self.end) { + self.start = 0; + self.end = 0; + } + } + + fn ensureBytes(self: *RecvBuffer, client: posix.socket_t, needed: usize) bool { + while (self.end - self.start < needed) { + // Compact buffer if needed + if (self.end >= self.buf.len - 1024) { + const avail = self.end - self.start; + std.mem.copyForwards(u8, self.buf[0..avail], self.buf[self.start..self.end]); + self.start = 0; + self.end = avail; + } + + const n = posix.read(client, self.buf[self.end..]) catch return false; + if (n == 0) return false; + self.end += n; + } + return true; + } + + fn readFrame(self: *RecvBuffer, client: posix.socket_t) ?Frame { + // Need at least 2 bytes for basic header + if (!self.ensureBytes(client, 2)) return null; + + const data = self.available(); + const opcode = data[0] & 0x0F; + const masked = (data[1] & 0x80) != 0; + var payload_len: usize = data[1] & 0x7F; + var header_size: usize = 2; + + // Extended payload length + if (payload_len == 126) { + if (!self.ensureBytes(client, 4)) return null; + const d = self.available(); + payload_len = @as(usize, d[2]) << 8 | d[3]; + header_size = 4; + } else if (payload_len == 127) { + if (!self.ensureBytes(client, 10)) return null; + const d = self.available(); + payload_len = @as(usize, d[2]) << 56 | + @as(usize, d[3]) << 48 | + @as(usize, d[4]) << 40 | + @as(usize, d[5]) << 32 | + @as(usize, d[6]) << 24 | + @as(usize, d[7]) << 16 | + @as(usize, d[8]) << 8 | + d[9]; + header_size = 10; + } + + const mask_size: usize = if (masked) 4 else 0; + const total_frame_size = header_size + mask_size + payload_len; + + if (!self.ensureBytes(client, total_frame_size)) return null; + + const frame_data = self.available(); + + // Get mask key if present + var mask_key: [4]u8 = undefined; + if (masked) { + @memcpy(&mask_key, frame_data[header_size..][0..4]); + } + + // Get payload and unmask + const payload_start = header_size + mask_size; + const payload = frame_data[payload_start..][0..payload_len]; + + if (masked) { + for (payload, 0..) |*b, i| { + b.* ^= mask_key[i % 4]; + } + } + + self.consume(total_frame_size); + + return .{ .opcode = opcode, .payload = payload }; + } +}; + +fn handleTextMessage(client: posix.socket_t, payload: []const u8) !void { + // Command: force-close - close socket immediately without close frame + if (std.mem.eql(u8, payload, "force-close")) { + return error.ForceClose; + } + + // Command: send-large:N - send a message of N bytes + if (std.mem.startsWith(u8, payload, "send-large:")) { + const size_str = payload["send-large:".len..]; + const size = std.fmt.parseInt(usize, size_str, 10) catch return error.InvalidCommand; + try sendLargeMessage(client, size); + return; + } + + // Command: close:CODE:REASON - send close frame with specific code/reason + if (std.mem.startsWith(u8, payload, "close:")) { + const rest = payload["close:".len..]; + if (std.mem.indexOf(u8, rest, ":")) |sep| { + const code = std.fmt.parseInt(u16, rest[0..sep], 10) catch 1000; + const reason = rest[sep + 1 ..]; + try sendCloseFrame(client, code, reason); + } + return; + } + + // Default: echo with "echo-" prefix + const prefix = "echo-"; + try sendFrame(client, 1, prefix, payload); +} + +fn handleBinaryMessage(client: posix.socket_t, payload: []const u8) !void { + // Echo binary data back with byte 0xEE prepended as marker + const marker = [_]u8{0xEE}; + try sendFrame(client, 2, &marker, payload); +} + +fn sendFrame(client: posix.socket_t, opcode: u8, prefix: []const u8, payload: []const u8) !void { + const total_len = prefix.len + payload.len; + + // Build header + var header: [10]u8 = undefined; + var header_len: usize = 2; + + header[0] = 0x80 | opcode; // FIN + opcode + + if (total_len <= 125) { + header[1] = @intCast(total_len); + } else if (total_len <= 65535) { + header[1] = 126; + header[2] = @intCast((total_len >> 8) & 0xFF); + header[3] = @intCast(total_len & 0xFF); + header_len = 4; + } else { + header[1] = 127; + header[2] = @intCast((total_len >> 56) & 0xFF); + header[3] = @intCast((total_len >> 48) & 0xFF); + header[4] = @intCast((total_len >> 40) & 0xFF); + header[5] = @intCast((total_len >> 32) & 0xFF); + header[6] = @intCast((total_len >> 24) & 0xFF); + header[7] = @intCast((total_len >> 16) & 0xFF); + header[8] = @intCast((total_len >> 8) & 0xFF); + header[9] = @intCast(total_len & 0xFF); + header_len = 10; + } + + _ = try posix.write(client, header[0..header_len]); + if (prefix.len > 0) { + _ = try posix.write(client, prefix); + } + if (payload.len > 0) { + _ = try posix.write(client, payload); + } +} + +fn sendLargeMessage(client: posix.socket_t, size: usize) !void { + // Build header + var header: [10]u8 = undefined; + var header_len: usize = 2; + + header[0] = 0x81; // FIN + text + + if (size <= 125) { + header[1] = @intCast(size); + } else if (size <= 65535) { + header[1] = 126; + header[2] = @intCast((size >> 8) & 0xFF); + header[3] = @intCast(size & 0xFF); + header_len = 4; + } else { + header[1] = 127; + header[2] = @intCast((size >> 56) & 0xFF); + header[3] = @intCast((size >> 48) & 0xFF); + header[4] = @intCast((size >> 40) & 0xFF); + header[5] = @intCast((size >> 32) & 0xFF); + header[6] = @intCast((size >> 24) & 0xFF); + header[7] = @intCast((size >> 16) & 0xFF); + header[8] = @intCast((size >> 8) & 0xFF); + header[9] = @intCast(size & 0xFF); + header_len = 10; + } + + _ = try posix.write(client, header[0..header_len]); + + // Send payload in chunks - pattern of 'A'-'Z' repeating + var sent: usize = 0; + var chunk: [4096]u8 = undefined; + while (sent < size) { + const to_send = @min(chunk.len, size - sent); + for (chunk[0..to_send], 0..) |*b, i| { + b.* = @intCast('A' + ((sent + i) % 26)); + } + _ = try posix.write(client, chunk[0..to_send]); + sent += to_send; + } +} + +fn sendCloseFrame(client: posix.socket_t, code: u16, reason: []const u8) !void { + const reason_len = @min(reason.len, 123); // Max 123 bytes for reason + const payload_len = 2 + reason_len; + + var frame: [129]u8 = undefined; // 2 header + 2 code + 123 reason + 2 padding + frame[0] = 0x88; // FIN + close + frame[1] = @intCast(payload_len); + frame[2] = @intCast((code >> 8) & 0xFF); + frame[3] = @intCast(code & 0xFF); + if (reason_len > 0) { + @memcpy(frame[4..][0..reason_len], reason[0..reason_len]); + } + + _ = try posix.write(client, frame[0 .. 4 + reason_len]); +} diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 6247e323..4683e38e 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -28,6 +28,7 @@ const URL = @import("URL.zig"); const Config = @import("../Config.zig"); const Notification = @import("../Notification.zig"); const CookieJar = @import("webapi/storage/Cookie.zig").Jar; +const WebSocket = @import("webapi/net/WebSocket.zig"); const http = @import("../network/http.zig"); const Network = @import("../network/Network.zig"); @@ -116,6 +117,8 @@ obey_robots: bool, cdp_client: ?CDPClient = null, +max_response_size: usize, + // libcurl can monitor arbitrary sockets, this lets us use libcurl to poll // both HTTP data as well as messages from an CDP connection. // Furthermore, we have some tension between blocking scripts and request @@ -156,6 +159,7 @@ pub fn init(allocator: Allocator, network: *Network) !*Client { .http_proxy = http_proxy, .tls_verify = network.config.tlsVerifyHost(), .obey_robots = network.config.obeyRobots(), + .max_response_size = network.config.httpMaxResponseSize() orelse std.math.maxInt(u32), }; return client; @@ -224,16 +228,18 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { while (n) |node| { n = node.next; const conn: *http.Connection = @fieldParentPtr("node", node); - var transfer = Transfer.fromConnection(conn) catch |err| { - // Let's cleanup what we can - self.removeConn(conn); - log.err(.http, "get private info", .{ .err = err, .source = "abort" }); - continue; - }; - if (comptime abort_all) { - transfer.kill(); - } else if (transfer.req.frame_id == frame_id) { - transfer.kill(); + switch (conn.transport) { + .http => |transfer| { + if ((comptime abort_all) or transfer.req.frame_id == frame_id) { + transfer.kill(); + } + }, + .websocket => |ws| { + if ((comptime abort_all) or ws._page._frame_id == frame_id) { + ws.kill(); + } + }, + .none => unreachable, } } } @@ -264,7 +270,11 @@ fn _abort(self: *Client, comptime abort_all: bool, frame_id: u32) void { var leftover: usize = 0; while (it) |node| : (it = node.next) { const conn: *http.Connection = @fieldParentPtr("node", node); - std.debug.assert((Transfer.fromConnection(conn) catch unreachable).aborted); + switch (conn.transport) { + .http => |transfer| std.debug.assert(transfer.aborted), + .websocket => {}, + .none => {}, + } leftover += 1; } std.debug.assert(self.active == leftover); @@ -706,7 +716,6 @@ fn makeTransfer(self: *Client, req: Request) !*Transfer { .url = req.url, .req = req, .client = self, - .max_response_size = self.network.config.httpMaxResponseSize(), }; return transfer; } @@ -764,15 +773,11 @@ fn makeRequest(self: *Client, conn: *http.Connection, transfer: *Transfer) anyer // fails BEFORE `curl_multi_add_handle` succeeds, the we still need to do // cleanup. But if things fail after `curl_multi_add_handle`, we expect // perfom to pickup the failure and cleanup. - self.in_use.append(&conn.node); - self.handles.add(conn) catch |err| { + self.trackConn(conn) catch |err| { transfer._conn = null; transfer.deinit(); - self.in_use.remove(&conn.node); - self.releaseConn(conn); return err; }; - self.active += 1; if (transfer.req.start_callback) |cb| { cb(Response.fromTransfer(transfer)) catch |err| { @@ -836,7 +841,7 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T // Also check on RecvError: proxy may send 407 with headers before // closing the connection (CONNECT tunnel not yet established). if (msg.err == null or msg.err.? == error.RecvError) { - transfer.detectAuthChallenge(&msg.conn); + transfer.detectAuthChallenge(msg.conn); } // In case of auth challenge @@ -938,7 +943,7 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T if (!transfer._header_done_called) { // In case of request w/o data, we need to call the header done // callback now. - const proceed = try transfer.headerDoneCallback(&msg.conn); + const proceed = try transfer.headerDoneCallback(msg.conn); if (!proceed) { transfer.requestFailed(error.Abort, true); return true; @@ -984,30 +989,63 @@ fn processOneMessage(self: *Client, msg: http.Handles.MultiMessage, transfer: *T fn processMessages(self: *Client) !bool { var processed = false; - while (self.handles.readMessage()) |msg| { - const transfer = try Transfer.fromConnection(&msg.conn); - const done = self.processOneMessage(msg, transfer) catch |err| blk: { - log.err(.http, "process_messages", .{ .err = err, .req = transfer }); - transfer.requestFailed(err, true); - if (transfer._detached_conn) |c| { - // Conn was removed from handles during redirect reconfiguration - // but not re-added. Release it directly to avoid double-remove. - self.in_use.remove(&c.node); - self.active -= 1; - self.releaseConn(c); - transfer._detached_conn = null; - } - break :blk true; - }; - if (done) { - transfer.deinit(); - processed = true; + while (try self.handles.readMessage()) |msg| { + switch (msg.conn.transport) { + .http => |transfer| { + const done = self.processOneMessage(msg, transfer) catch |err| blk: { + log.err(.http, "process_messages", .{ .err = err, .req = transfer }); + transfer.requestFailed(err, true); + if (transfer._detached_conn) |c| { + // Conn was removed from handles during redirect reconfiguration + // but not re-added. Release it directly to avoid double-remove. + self.in_use.remove(&c.node); + self.active -= 1; + self.releaseConn(c); + transfer._detached_conn = null; + } + break :blk true; + }; + if (done) { + transfer.deinit(); + processed = true; + } + }, + .websocket => |ws| { + if (msg.err) |err| switch (err) { + error.GotNothing => ws.disconnected(null), + else => ws.disconnected(err), + } else { + // Clean close - no error + ws.disconnected(null); + } + + processed = true; + }, + .none => unreachable, } } return processed; } -fn removeConn(self: *Client, conn: *http.Connection) void { +pub fn trackConn(self: *Client, conn: *http.Connection) !void { + self.in_use.append(&conn.node); + // Set private pointer so readMessage can find the Connection. + // Must be done each time since curl_easy_reset clears it when + // connections are returned to pool. + conn.setPrivate(conn) catch |err| { + self.in_use.remove(&conn.node); + self.releaseConn(conn); + return err; + }; + self.handles.add(conn) catch |err| { + self.in_use.remove(&conn.node); + self.releaseConn(conn); + return err; + }; + self.active += 1; +} + +pub fn removeConn(self: *Client, conn: *http.Connection) void { self.in_use.remove(&conn.node); self.active -= 1; if (self.handles.remove(conn)) { @@ -1040,7 +1078,6 @@ pub const Request = struct { resource_type: ResourceType, credentials: ?[:0]const u8 = null, notification: *Notification, - max_response_size: ?usize = null, // This is only relevant for intercepted requests. If a request is flagged // as blocking AND is intercepted, then it'll be up to us to wait until @@ -1168,8 +1205,6 @@ pub const Transfer = struct { aborted: bool = false, - max_response_size: ?usize = null, - // We'll store the response header here response_header: ?ResponseHead = null, @@ -1300,7 +1335,7 @@ pub const Transfer = struct { const req = &self.req; // Set callbacks and per-client settings on the pooled connection. - try conn.setCallbacks(Transfer.dataCallback); + try conn.setWriteCallback(Transfer.dataCallback); try conn.setFollowLocation(false); try conn.setProxy(client.http_proxy); try conn.setTlsVerify(client.tls_verify, client.use_proxy); @@ -1328,7 +1363,7 @@ pub const Transfer = struct { try conn.setCookies(@ptrCast(cookies.ptr)); } - try conn.setPrivate(self); + conn.transport = .{ .http = self }; // add credentials if (req.credentials) |creds| { @@ -1528,11 +1563,9 @@ pub const Transfer = struct { } } - if (transfer.max_response_size) |max_size| { - if (transfer.getContentLength()) |cl| { - if (cl > max_size) { - return error.ResponseTooLarge; - } + if (transfer.getContentLength()) |cl| { + if (cl > transfer.client.max_response_size) { + return error.ResponseTooLarge; } } @@ -1605,10 +1638,7 @@ pub const Transfer = struct { } const conn: *http.Connection = @ptrCast(@alignCast(data)); - var transfer = fromConnection(conn) catch |err| { - log.err(.http, "get private info", .{ .err = err, .source = "body callback" }); - return http.writefunc_error; - }; + var transfer = conn.transport.http; if (!transfer._first_data_received) { transfer._first_data_received = true; @@ -1625,11 +1655,9 @@ pub const Transfer = struct { // Pre-size buffer from Content-Length. if (transfer.getContentLength()) |cl| { - if (transfer.max_response_size) |max_size| { - if (cl > max_size) { - transfer._callback_error = error.ResponseTooLarge; - return http.writefunc_error; - } + if (cl > transfer.client.max_response_size) { + transfer._callback_error = error.ResponseTooLarge; + return http.writefunc_error; } transfer._stream_buffer.ensureTotalCapacity(transfer.arena.allocator(), cl) catch {}; } @@ -1638,11 +1666,9 @@ pub const Transfer = struct { if (transfer._skip_body) return @intCast(chunk_len); transfer.bytes_received += chunk_len; - if (transfer.max_response_size) |max_size| { - if (transfer.bytes_received > max_size) { - transfer._callback_error = error.ResponseTooLarge; - return http.writefunc_error; - } + if (transfer.bytes_received > transfer.client.max_response_size) { + transfer._callback_error = error.ResponseTooLarge; + return http.writefunc_error; } const chunk = buffer[0..chunk_len]; @@ -1671,11 +1697,6 @@ pub const Transfer = struct { return .{ .list = .{ .list = self.response_header.?._injected_headers } }; } - fn fromConnection(conn: *const http.Connection) !*Transfer { - const private = try conn.getPrivate(); - return @ptrCast(@alignCast(private)); - } - pub fn fulfill(transfer: *Transfer, status: u16, headers: []const http.Header, body: ?[]const u8) !void { if (transfer._conn != null) { // should never happen, should have been intercepted/paused, and then diff --git a/src/browser/Page.zig b/src/browser/Page.zig index b265a45d..16a05806 100644 --- a/src/browser/Page.zig +++ b/src/browser/Page.zig @@ -3438,10 +3438,7 @@ pub fn handleClick(self: *Page, target: *Node) !void { pub fn triggerKeyboard(self: *Page, keyboard_event: *KeyboardEvent) !void { const event = keyboard_event.asEvent(); - const element = self.window._document._active_element orelse { - _ = event.releaseRef(self._session); - return; - }; + const element = self.window._document._active_element orelse return; if (comptime IS_DEBUG) { log.debug(.page, "page keydown", .{ diff --git a/src/browser/js/bridge.zig b/src/browser/js/bridge.zig index 0a51327e..8fbdc315 100644 --- a/src/browser/js/bridge.zig +++ b/src/browser/js/bridge.zig @@ -829,6 +829,8 @@ pub const JsApis = flattenTypes(&.{ @import("../webapi/net/URLSearchParams.zig"), @import("../webapi/net/XMLHttpRequest.zig"), @import("../webapi/net/XMLHttpRequestEventTarget.zig"), + @import("../webapi/net/WebSocket.zig"), + @import("../webapi/event/CloseEvent.zig"), @import("../webapi/streams/ReadableStream.zig"), @import("../webapi/streams/ReadableStreamDefaultReader.zig"), @import("../webapi/streams/ReadableStreamDefaultController.zig"), diff --git a/src/browser/tests/net/websocket.html b/src/browser/tests/net/websocket.html new file mode 100644 index 00000000..9ca32120 --- /dev/null +++ b/src/browser/tests/net/websocket.html @@ -0,0 +1,563 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/browser/webapi/Event.zig b/src/browser/webapi/Event.zig index b48bc059..b573bfc7 100644 --- a/src/browser/webapi/Event.zig +++ b/src/browser/webapi/Event.zig @@ -80,6 +80,7 @@ pub const Type = union(enum) { promise_rejection_event: *@import("event/PromiseRejectionEvent.zig"), submit_event: *@import("event/SubmitEvent.zig"), form_data_event: *@import("event/FormDataEvent.zig"), + close_event: *@import("event/CloseEvent.zig"), }; pub const Options = struct { @@ -171,6 +172,7 @@ pub fn is(self: *Event, comptime T: type) ?*T { .promise_rejection_event => |e| return if (T == @import("event/PromiseRejectionEvent.zig")) e else null, .submit_event => |e| return if (T == @import("event/SubmitEvent.zig")) e else null, .form_data_event => |e| return if (T == @import("event/FormDataEvent.zig")) e else null, + .close_event => |e| return if (T == @import("event/CloseEvent.zig")) e else null, .ui_event => |e| { if (T == @import("event/UIEvent.zig")) { return e; diff --git a/src/browser/webapi/EventTarget.zig b/src/browser/webapi/EventTarget.zig index 704efeb3..60dfbf11 100644 --- a/src/browser/webapi/EventTarget.zig +++ b/src/browser/webapi/EventTarget.zig @@ -45,6 +45,7 @@ pub const Type = union(enum) { visual_viewport: *@import("VisualViewport.zig"), file_reader: *@import("FileReader.zig"), font_face_set: *@import("css/FontFaceSet.zig"), + websocket: *@import("net/WebSocket.zig"), }; pub fn init(page: *Page) !*EventTarget { @@ -141,6 +142,7 @@ pub fn format(self: *EventTarget, writer: *std.Io.Writer) !void { .visual_viewport => writer.writeAll(""), .file_reader => writer.writeAll(""), .font_face_set => writer.writeAll(""), + .websocket => writer.writeAll(""), }; } @@ -160,6 +162,7 @@ pub fn toString(self: *EventTarget) []const u8 { .visual_viewport => return "[object VisualViewport]", .file_reader => return "[object FileReader]", .font_face_set => return "[object FontFaceSet]", + .websocket => return "[object WebSocket]", }; } diff --git a/src/browser/webapi/MessagePort.zig b/src/browser/webapi/MessagePort.zig index 51d208b0..a7bb9bfc 100644 --- a/src/browser/webapi/MessagePort.zig +++ b/src/browser/webapi/MessagePort.zig @@ -125,7 +125,7 @@ const PostMessageCallback = struct { const target = self.port.asEventTarget(); if (page._event_manager.hasDirectListeners(target, "message", self.port._on_message)) { const event = (MessageEvent.initTrusted(comptime .wrap("message"), .{ - .data = self.message, + .data = .{ .value = self.message }, .origin = "", .source = null, }, page) catch |err| { diff --git a/src/browser/webapi/Window.zig b/src/browser/webapi/Window.zig index fb3ec8f8..418037fd 100644 --- a/src/browser/webapi/Window.zig +++ b/src/browser/webapi/Window.zig @@ -791,7 +791,7 @@ const PostMessageCallback = struct { const event_target = window.asEventTarget(); if (page._event_manager.hasDirectListeners(event_target, "message", window._on_message)) { const event = (try MessageEvent.initTrusted(comptime .wrap("message"), .{ - .data = self.message, + .data = .{ .value = self.message }, .origin = self.origin, .source = self.source, .bubbles = false, @@ -903,15 +903,31 @@ pub const JsApi = struct { pub const opener = bridge.property(null, .{ .template = false }); pub const alert = bridge.function(struct { - fn alert(_: *const Window, _: ?[]const u8) void {} - }.alert, .{ .noop = true }); + fn alert(_: *const Window, message: ?[]const u8, page: *Page) void { + page._session.notification.dispatch(.javascript_dialog_opening, &.{ + .url = page.url, + .message = message orelse "", + .dialog_type = "alert", + }); + } + }.alert, .{}); pub const confirm = bridge.function(struct { - fn confirm(_: *const Window, _: ?[]const u8) bool { + fn confirm(_: *const Window, message: ?[]const u8, page: *Page) bool { + page._session.notification.dispatch(.javascript_dialog_opening, &.{ + .url = page.url, + .message = message orelse "", + .dialog_type = "confirm", + }); return false; } }.confirm, .{}); pub const prompt = bridge.function(struct { - fn prompt(_: *const Window, _: ?[]const u8, _: ?[]const u8) ?[]const u8 { + fn prompt(_: *const Window, message: ?[]const u8, _: ?[]const u8, page: *Page) ?[]const u8 { + page._session.notification.dispatch(.javascript_dialog_opening, &.{ + .url = page.url, + .message = message orelse "", + .dialog_type = "prompt", + }); return null; } }.prompt, .{}); diff --git a/src/browser/webapi/collections/ChildNodes.zig b/src/browser/webapi/collections/ChildNodes.zig index 9c2bde91..df3e7ee1 100644 --- a/src/browser/webapi/collections/ChildNodes.zig +++ b/src/browser/webapi/collections/ChildNodes.zig @@ -126,8 +126,12 @@ fn versionCheck(self: *ChildNodes, page: *Page) bool { } const NodeList = @import("NodeList.zig"); -pub fn runtimeGenericWrap(self: *ChildNodes, page: *Page) !*NodeList { - return page._factory.create(NodeList{ ._data = .{ .child_nodes = self } }); +pub fn runtimeGenericWrap(self: *ChildNodes, _: *const Page) !*NodeList { + const nl = try self._arena.create(NodeList); + nl.* = .{ + ._data = .{ .child_nodes = self }, + }; + return nl; } const Iterator = struct { diff --git a/src/browser/webapi/event/CloseEvent.zig b/src/browser/webapi/event/CloseEvent.zig new file mode 100644 index 00000000..aa9f1d2b --- /dev/null +++ b/src/browser/webapi/event/CloseEvent.zig @@ -0,0 +1,102 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const String = @import("../../../string.zig").String; + +const Page = @import("../../Page.zig"); +const Session = @import("../../Session.zig"); +const Event = @import("../Event.zig"); +const Allocator = std.mem.Allocator; + +const CloseEvent = @This(); +_proto: *Event, +_code: u16 = 1000, +_reason: []const u8 = "", +_was_clean: bool = true, + +const CloseEventOptions = struct { + code: u16 = 1000, + reason: []const u8 = "", + wasClean: bool = true, +}; + +const Options = Event.inheritOptions(CloseEvent, CloseEventOptions); + +pub fn init(typ: []const u8, _opts: ?Options, page: *Page) !*CloseEvent { + const arena = try page.getArena(.{ .debug = "CloseEvent" }); + errdefer page.releaseArena(arena); + const type_string = try String.init(arena, typ, .{}); + return initWithTrusted(arena, type_string, _opts, false, page); +} + +pub fn initTrusted(typ: String, _opts: ?Options, page: *Page) !*CloseEvent { + const arena = try page.getArena(.{ .debug = "CloseEvent.trusted" }); + errdefer page.releaseArena(arena); + return initWithTrusted(arena, typ, _opts, true, page); +} + +fn initWithTrusted(arena: Allocator, typ: String, _opts: ?Options, trusted: bool, page: *Page) !*CloseEvent { + const opts = _opts orelse Options{}; + + const event = try page._factory.event( + arena, + typ, + CloseEvent{ + ._proto = undefined, + ._code = opts.code, + ._reason = if (opts.reason.len > 0) try arena.dupe(u8, opts.reason) else "", + ._was_clean = opts.wasClean, + }, + ); + + Event.populatePrototypes(event, opts, trusted); + return event; +} + +pub fn asEvent(self: *CloseEvent) *Event { + return self._proto; +} + +pub fn getCode(self: *const CloseEvent) u16 { + return self._code; +} + +pub fn getReason(self: *const CloseEvent) []const u8 { + return self._reason; +} + +pub fn getWasClean(self: *const CloseEvent) bool { + return self._was_clean; +} + +pub const JsApi = struct { + const js = @import("../../js/js.zig"); + pub const bridge = js.Bridge(CloseEvent); + + pub const Meta = struct { + pub const name = "CloseEvent"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const constructor = bridge.constructor(CloseEvent.init, .{}); + pub const code = bridge.accessor(CloseEvent.getCode, null, .{}); + pub const reason = bridge.accessor(CloseEvent.getReason, null, .{}); + pub const wasClean = bridge.accessor(CloseEvent.getWasClean, null, .{}); +}; diff --git a/src/browser/webapi/event/MessageEvent.zig b/src/browser/webapi/event/MessageEvent.zig index 03530400..dfd813d5 100644 --- a/src/browser/webapi/event/MessageEvent.zig +++ b/src/browser/webapi/event/MessageEvent.zig @@ -30,16 +30,23 @@ const Allocator = std.mem.Allocator; const MessageEvent = @This(); _proto: *Event, -_data: ?js.Value.Temp = null, +_data: ?Data = null, _origin: []const u8 = "", _source: ?*Window = null, const MessageEventOptions = struct { - data: ?js.Value.Temp = null, + data: ?Data = null, origin: ?[]const u8 = null, source: ?*Window = null, }; +pub const Data = union(enum) { + value: js.Value.Temp, + string: []const u8, + arraybuffer: js.ArrayBuffer, + blob: *@import("../Blob.zig"), +}; + const Options = Event.inheritOptions(MessageEvent, MessageEventOptions); pub fn init(typ: []const u8, opts_: ?Options, page: *Page) !*MessageEvent { @@ -75,7 +82,11 @@ fn initWithTrusted(arena: Allocator, typ: String, opts_: ?Options, trusted: bool pub fn deinit(self: *MessageEvent, session: *Session) void { if (self._data) |d| { - d.release(); + switch (d) { + .value => |js_val| js_val.release(), + .blob => |blob| blob.releaseRef(session), + .string, .arraybuffer => {}, + } } self._proto.deinit(session); } @@ -92,7 +103,7 @@ pub fn asEvent(self: *MessageEvent) *Event { return self._proto; } -pub fn getData(self: *const MessageEvent) ?js.Value.Temp { +pub fn getData(self: *const MessageEvent) ?Data { return self._data; } diff --git a/src/browser/webapi/net/WebSocket.zig b/src/browser/webapi/net/WebSocket.zig new file mode 100644 index 00000000..1244a61e --- /dev/null +++ b/src/browser/webapi/net/WebSocket.zig @@ -0,0 +1,727 @@ +// Copyright (C) 2023-2026 Lightpanda (Selecy SAS) +// +// Francis Bouvier +// Pierre Tachoire +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +const std = @import("std"); +const lp = @import("lightpanda"); + +const log = @import("../../../log.zig"); +const http = @import("../../../network/http.zig"); + +const js = @import("../../js/js.zig"); +const Blob = @import("../Blob.zig"); +const URL = @import("../../URL.zig"); +const Page = @import("../../Page.zig"); +const Session = @import("../../Session.zig"); +const HttpClient = @import("../../HttpClient.zig"); + +const Event = @import("../Event.zig"); +const EventTarget = @import("../EventTarget.zig"); +const MessageEvent = @import("../event/MessageEvent.zig"); +const CloseEvent = @import("../event/CloseEvent.zig"); + +const Allocator = std.mem.Allocator; +const IS_DEBUG = @import("builtin").mode == .Debug; + +const WebSocket = @This(); +_rc: lp.RC(u8) = .{}, +_page: *Page, +_proto: *EventTarget, +_arena: Allocator, + +// Connection state +_ready_state: ReadyState = .connecting, +_url: [:0]const u8 = "", +_binary_type: BinaryType = .blob, + +// Handshake tracking +_got_101: bool = false, +_got_upgrade: bool = false, + +_conn: ?*http.Connection, +_http_client: *HttpClient, + +// buffered outgoing messages +_send_queue: std.ArrayList(Message) = .empty, +_send_offset: usize = 0, + +// buffered incoming frame +_recv_buffer: std.ArrayList(u8) = .empty, + +// close info for event dispatch +_close_code: u16 = 1000, +_close_reason: []const u8 = "", + +// Event handlers +_on_open: ?js.Function.Temp = null, +_on_message: ?js.Function.Temp = null, +_on_error: ?js.Function.Temp = null, +_on_close: ?js.Function.Temp = null, + +pub const ReadyState = enum(u8) { + connecting = 0, + open = 1, + closing = 2, + closed = 3, +}; + +pub const BinaryType = enum { + blob, + arraybuffer, +}; + +pub fn init(url: []const u8, protocols_: ?[]const u8, page: *Page) !*WebSocket { + if (protocols_) |protocols| { + if (protocols.len > 0) { + log.warn(.not_implemented, "WS protocols", .{ .protocols = protocols }); + } + } + + { + if (url.len < 6) { + return error.SyntaxError; + } + const normalized_start = std.ascii.lowerString(&page.buf, url[0..6]); + if (!std.mem.startsWith(u8, normalized_start, "ws://") and !std.mem.startsWith(u8, normalized_start, "wss://")) { + return error.SyntaxError; + } + // Fragments are not allowed in WebSocket URLs + if (std.mem.indexOfScalar(u8, url, '#') != null) { + return error.SyntaxError; + } + } + + const arena = try page.getArena(.{ .debug = "WebSocket" }); + errdefer page.releaseArena(arena); + + const resolved_url = try URL.resolve(arena, page.base(), url, .{ .always_dupe = true, .encode = true }); + + const http_client = page._session.browser.http_client; + const conn = http_client.network.newConnection() orelse { + return error.NoFreeConnection; + }; + + errdefer http_client.network.releaseConnection(conn); + + try conn.setURL(resolved_url); + try conn.setConnectOnly(false); + + try conn.setReadCallback(sendDataCallback, true); + try conn.setWriteCallback(receivedDataCallback); + try conn.setHeaderCallback(receivedHeaderCallback); + + const self = try page._factory.eventTargetWithAllocator(arena, WebSocket{ + ._page = page, + ._conn = conn, + ._arena = arena, + ._proto = undefined, + ._url = resolved_url, + ._http_client = http_client, + }); + conn.transport = .{ .websocket = self }; + try http_client.trackConn(conn); + + if (comptime IS_DEBUG) { + log.info(.websocket, "connecting", .{ .url = url }); + } + + // Unlike an XHR object where we only selectively reference the instance + // while the request is actually inflight, WS connection is "inflight" from + // the moment it's created. + self.acquireRef(); + + return self; +} + +pub fn deinit(self: *WebSocket, session: *Session) void { + self.cleanup(); + + if (self._on_open) |func| { + func.release(); + } + if (self._on_message) |func| { + func.release(); + } + if (self._on_error) |func| { + func.release(); + } + if (self._on_close) |func| { + func.release(); + } + + for (self._send_queue.items) |msg| { + msg.deinit(session); + } + + session.releaseArena(self._arena); +} + +// we're being aborted internally (e.g. page shutting down) +pub fn kill(self: *WebSocket) void { + self.cleanup(); +} + +pub fn disconnected(self: *WebSocket, err_: ?anyerror) void { + const was_clean = self._ready_state == .closing and err_ == null; + self._ready_state = .closed; + + if (err_) |err| { + log.warn(.websocket, "disconnected", .{ .err = err, .url = self._url }); + } else { + log.info(.websocket, "disconnected", .{ .url = self._url, .reason = "closed" }); + } + + self.cleanup(); + + // Use 1006 (abnormal closure) if connection wasn't cleanly closed + const code = if (was_clean) self._close_code else 1006; + const reason = if (was_clean) self._close_reason else ""; + + // Spec requires error event before close on abnormal closure + if (!was_clean) { + self.dispatchErrorEvent() catch |err| { + log.err(.websocket, "error event dispatch failed", .{ .err = err }); + }; + } + + self.dispatchCloseEvent(code, reason, was_clean) catch |err| { + log.err(.websocket, "close event dispatch failed", .{ .err = err }); + }; +} + +fn cleanup(self: *WebSocket) void { + if (self._conn) |conn| { + self._http_client.removeConn(conn); + self._conn = null; + self.releaseRef(self._page._session); + self._send_queue.clearRetainingCapacity(); + } +} + +pub fn releaseRef(self: *WebSocket, session: *Session) void { + self._rc.release(self, session); +} + +pub fn acquireRef(self: *WebSocket) void { + self._rc.acquire(); +} + +fn asEventTarget(self: *WebSocket) *EventTarget { + return self._proto; +} + +fn queueMessage(self: *WebSocket, msg: Message) !void { + const was_empty = self._send_queue.items.len == 0; + try self._send_queue.append(self._arena, msg); + + if (was_empty) { + // Unpause the send callback so libcurl will request data + if (self._conn) |conn| { + try conn.pause(.{ .cont = true }); + } + } +} + +/// WebSocket send() accepts string, Blob, ArrayBuffer, or TypedArray +const SendData = union(enum) { + blob: *Blob, + js_val: js.Value, +}; + +/// Union for extracting bytes from ArrayBuffer/TypedArray +const BinaryData = union(enum) { + int8: []i8, + uint8: []u8, + int16: []i16, + uint16: []u16, + int32: []i32, + uint32: []u32, + int64: []i64, + uint64: []u64, + + fn asBuffer(self: BinaryData) []u8 { + return switch (self) { + .int8 => |b| @as([*]u8, @ptrCast(b.ptr))[0..b.len], + .uint8 => |b| b, + .int16 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 2], + .uint16 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 2], + .int32 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 4], + .uint32 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 4], + .int64 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 8], + .uint64 => |b| @as([*]u8, @ptrCast(b.ptr))[0 .. b.len * 8], + }; + } +}; + +pub fn send(self: *WebSocket, data: SendData) !void { + if (self._ready_state != .open) { + return error.InvalidStateError; + } + + // Get a dedicated arena for this message + const arena = try self._page._session.getArena(.{ .debug = "WebSocket message" }); + errdefer self._page._session.releaseArena(arena); + + switch (data) { + .blob => |blob| { + try self.queueMessage(.{ .binary = .{ + .arena = arena, + .data = try arena.dupe(u8, blob._slice), + } }); + }, + .js_val => |js_val| { + if (js_val.isString()) |str| { + try self.queueMessage(.{ .text = .{ + .arena = arena, + .data = try str.toSliceWithAlloc(arena), + } }); + } else { + const binary = try js_val.toZig(BinaryData); + try self.queueMessage(.{ .binary = .{ + .arena = arena, + .data = try arena.dupe(u8, binary.asBuffer()), + } }); + } + }, + } +} + +pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void { + if (self._ready_state == .closing or self._ready_state == .closed) { + return; + } + + // Validate close code per spec: must be 1000 or in range 3000-4999 + if (code_) |code| { + if (code != 1000 and (code < 3000 or code > 4999)) { + return error.InvalidAccessError; + } + } + + const code = code_ orelse 1000; + const reason = reason_ orelse ""; + + if (self._ready_state == .connecting) { + // Connection not yet established - fail it + self._ready_state = .closed; + self.cleanup(); + try self.dispatchCloseEvent(code, reason, false); + return; + } + + self._ready_state = .closing; + self._close_code = code; + self._close_reason = try self._arena.dupe(u8, reason); + try self.queueMessage(.close); +} + +pub fn getUrl(self: *const WebSocket) []const u8 { + return self._url; +} + +pub fn getReadyState(self: *const WebSocket) u16 { + return @intFromEnum(self._ready_state); +} + +pub fn getBufferedAmount(self: *const WebSocket) u32 { + var buffered: u32 = 0; + for (self._send_queue.items) |msg| { + switch (msg) { + .text, .binary => |byte_msg| buffered += @intCast(byte_msg.data.len), + .close => buffered += @intCast(2 + self._close_reason.len), + } + } + return buffered; +} + +pub fn getBinaryType(self: *const WebSocket) []const u8 { + return @tagName(self._binary_type); +} + +pub fn setBinaryType(self: *WebSocket, value: []const u8) void { + if (std.meta.stringToEnum(BinaryType, value)) |bt| { + self._binary_type = bt; + } +} + +pub fn getOnOpen(self: *const WebSocket) ?js.Function.Temp { + return self._on_open; +} + +pub fn setOnOpen(self: *WebSocket, cb_: ?js.Function) !void { + if (self._on_open) |old| old.release(); + if (cb_) |cb| { + self._on_open = try cb.tempWithThis(self); + } else { + self._on_open = null; + } +} + +pub fn getOnMessage(self: *const WebSocket) ?js.Function.Temp { + return self._on_message; +} + +pub fn setOnMessage(self: *WebSocket, cb_: ?js.Function) !void { + if (self._on_message) |old| old.release(); + if (cb_) |cb| { + self._on_message = try cb.tempWithThis(self); + } else { + self._on_message = null; + } +} + +pub fn getOnError(self: *const WebSocket) ?js.Function.Temp { + return self._on_error; +} + +pub fn setOnError(self: *WebSocket, cb_: ?js.Function) !void { + if (self._on_error) |old| old.release(); + if (cb_) |cb| { + self._on_error = try cb.tempWithThis(self); + } else { + self._on_error = null; + } +} + +pub fn getOnClose(self: *const WebSocket) ?js.Function.Temp { + return self._on_close; +} + +pub fn setOnClose(self: *WebSocket, cb_: ?js.Function) !void { + if (self._on_close) |old| old.release(); + if (cb_) |cb| { + self._on_close = try cb.tempWithThis(self); + } else { + self._on_close = null; + } +} + +fn dispatchOpenEvent(self: *WebSocket) !void { + const page = self._page; + const target = self.asEventTarget(); + + if (page._event_manager.hasDirectListeners(target, "open", self._on_open)) { + const event = try Event.initTrusted(comptime .wrap("open"), .{}, page); + try page._event_manager.dispatchDirect(target, event, self._on_open, .{ .context = "WebSocket open" }); + } +} + +fn dispatchMessageEvent(self: *WebSocket, data: []const u8, frame_type: http.WsFrameType) !void { + const page = self._page; + const target = self.asEventTarget(); + + if (page._event_manager.hasDirectListeners(target, "message", self._on_message)) { + const msg_data: MessageEvent.Data = if (frame_type == .binary) + switch (self._binary_type) { + .arraybuffer => .{ .arraybuffer = .{ .values = data } }, + .blob => blk: { + const blob = try Blob.init(&.{data}, .{}, page); + blob.acquireRef(); + break :blk .{ .blob = blob }; + }, + } + else + .{ .string = data }; + + const event = try MessageEvent.initTrusted(comptime .wrap("message"), .{ + .data = msg_data, + .origin = "", + }, page); + try page._event_manager.dispatchDirect(target, event.asEvent(), self._on_message, .{ .context = "WebSocket message" }); + } +} + +fn dispatchErrorEvent(self: *WebSocket) !void { + const page = self._page; + const target = self.asEventTarget(); + + if (page._event_manager.hasDirectListeners(target, "error", self._on_error)) { + const event = try Event.initTrusted(comptime .wrap("error"), .{}, page); + try page._event_manager.dispatchDirect(target, event, self._on_error, .{ .context = "WebSocket error" }); + } +} + +fn dispatchCloseEvent(self: *WebSocket, code: u16, reason: []const u8, was_clean: bool) !void { + const page = self._page; + const target = self.asEventTarget(); + + if (page._event_manager.hasDirectListeners(target, "close", self._on_close)) { + const event = try CloseEvent.initTrusted(comptime .wrap("close"), .{ + .code = code, + .reason = reason, + .wasClean = was_clean, + }, page); + try page._event_manager.dispatchDirect(target, event.asEvent(), self._on_close, .{ .context = "WebSocket close" }); + } +} + +fn sendDataCallback(buffer: [*]u8, buf_count: usize, buf_len: usize, data: *anyopaque) usize { + if (comptime IS_DEBUG) { + std.debug.assert(buf_count == 1); + } + const conn: *http.Connection = @ptrCast(@alignCast(data)); + return _sendDataCallback(conn, buffer[0..buf_len]) catch |err| { + log.warn(.websocket, "send callback", .{ .err = err }); + return http.readfunc_pause; + }; +} + +fn _sendDataCallback(conn: *http.Connection, buf: []u8) !usize { + lp.assert(buf.len >= 2, "WS short buffer", .{ .len = buf.len }); + + const self = conn.transport.websocket; + + if (self._send_queue.items.len == 0) { + // No data to send - pause until queueMessage is called + return http.readfunc_pause; + } + + const msg = &self._send_queue.items[0]; + + switch (msg.*) { + .close => { + const code = self._close_code; + const reason = self._close_reason; + + // Close frame: 2 bytes for code (big-endian) + optional reason + // Truncate reason to fit in buf (max 123 bytes per spec) + const reason_len: usize = @min(reason.len, 123, buf.len -| 2); + const frame_len = 2 + reason_len; + const to_copy = @min(buf.len, frame_len); + + var close_payload: [125]u8 = undefined; + close_payload[0] = @intCast((code >> 8) & 0xFF); + close_payload[1] = @intCast(code & 0xFF); + if (reason_len > 0) { + @memcpy(close_payload[2..][0..reason_len], reason[0..reason_len]); + } + + try conn.wsStartFrame(.close, to_copy); + @memcpy(buf[0..to_copy], close_payload[0..to_copy]); + + _ = self._send_queue.orderedRemove(0); + return to_copy; + }, + .text => |content| return self.writeContent(conn, buf, content, .text), + .binary => |content| return self.writeContent(conn, buf, content, .binary), + } +} + +fn writeContent(self: *WebSocket, conn: *http.Connection, buf: []u8, byte_msg: Message.Content, frame_type: http.WsFrameType) !usize { + if (self._send_offset == 0) { + // start of the message + if (comptime IS_DEBUG) { + log.debug(.websocket, "send start", .{ .url = self._url, .len = byte_msg.data.len }); + } + try conn.wsStartFrame(frame_type, byte_msg.data.len); + } + + const remaining = byte_msg.data[self._send_offset..]; + const to_copy = @min(remaining.len, buf.len); + @memcpy(buf[0..to_copy], remaining[0..to_copy]); + + self._send_offset += to_copy; + + if (self._send_offset >= byte_msg.data.len) { + const removed = self._send_queue.orderedRemove(0); + removed.deinit(self._page._session); + if (comptime IS_DEBUG) { + log.debug(.websocket, "send complete", .{ .url = self._url, .len = byte_msg.data.len, .queue = self._send_queue.items.len }); + } + self._send_offset = 0; + } + + return to_copy; +} + +fn receivedDataCallback(buffer: [*]const u8, buf_count: usize, buf_len: usize, data: *anyopaque) usize { + if (comptime IS_DEBUG) { + std.debug.assert(buf_count == 1); + } + const conn: *http.Connection = @ptrCast(@alignCast(data)); + _receivedDataCallback(conn, buffer[0..buf_len]) catch |err| { + log.warn(.websocket, "receive callback", .{ .err = err }); + // TODO: are there errors, like an invalid frame, that we shouldn't treat + // as an error? + return http.writefunc_error; + }; + + return buf_len; +} + +fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void { + const self = conn.transport.websocket; + const meta = conn.wsMeta() orelse { + log.err(.websocket, "missing meta", .{ .url = self._url }); + return error.NoFrameMeta; + }; + + if (meta.offset == 0) { + if (comptime IS_DEBUG) { + log.debug(.websocket, "incoming message", .{ .url = self._url, .len = meta.len, .bytes_left = meta.bytes_left, .type = meta.frame_type }); + } + // Start of new frame. Pre-allocate buffer + self._recv_buffer.clearRetainingCapacity(); + if (meta.len > self._http_client.max_response_size) { + return error.MessageTooLarge; + } + try self._recv_buffer.ensureTotalCapacity(self._arena, meta.len); + } + + try self._recv_buffer.appendSlice(self._arena, data); + + if (meta.bytes_left > 0) { + // still more data waiting for this frame + return; + } + + const message = self._recv_buffer.items; + switch (meta.frame_type) { + .text, .binary => try self.dispatchMessageEvent(message, meta.frame_type), + .close => { + // Parse close frame: 2-byte code (big-endian) + optional reason + const received_code = if (message.len >= 2) + @as(u16, message[0]) << 8 | message[1] + else + 1005; // No status code received + + if (self._ready_state == .closing) { + // Client-initiated close: this is the server's response. + // Close handshake complete - disconnect. + self.disconnected(null); + } else { + // Server-initiated close: send reciprocal close frame per RFC 6455 ยง5.5.1 + self._close_code = received_code; + if (message.len > 2) { + self._close_reason = try self._arena.dupe(u8, message[2..]); + } + self._ready_state = .closing; + try self.queueMessage(.close); + } + }, + .ping, .pong, .cont => {}, + } +} + +// libcurl has no mechanism to signal that the connection is established. The +// best option I could come up with was looking for an upgrade header response. +fn receivedHeaderCallback(buffer: [*]const u8, header_count: usize, buf_len: usize, data: *anyopaque) usize { + if (comptime IS_DEBUG) { + std.debug.assert(header_count == 1); + } + const conn: *http.Connection = @ptrCast(@alignCast(data)); + const self = conn.transport.websocket; + const header = buffer[0..buf_len]; + + if (self._got_101 == false and std.mem.startsWith(u8, header, "HTTP/")) { + if (std.mem.indexOf(u8, header, " 101 ")) |_| { + self._got_101 = true; + } + return buf_len; + } + + // Empty line = end of headers + if (buf_len <= 2) { + if (!self._got_101 or !self._got_upgrade) { + return 0; + } + + self._ready_state = .open; + log.info(.websocket, "connected", .{ .url = self._url }); + + self.dispatchOpenEvent() catch |err| { + log.err(.websocket, "open event fail", .{ .err = err }); + }; + return buf_len; + } + + if (self._got_upgrade) { + // dont' care about headers once we've gotten the upgrade header + return buf_len; + } + + const colon = std.mem.indexOfScalarPos(u8, header, 0, ':') orelse { + // weird, continue... + return buf_len; + }; + + if (std.ascii.eqlIgnoreCase(header[0..colon], "upgrade") == false) { + return buf_len; + } + + const value = std.mem.trim(u8, header[colon + 1 ..], " \t\r\n"); + if (std.ascii.eqlIgnoreCase(value, "websocket")) { + self._got_upgrade = true; + } + + return buf_len; +} + +const Message = union(enum) { + close, + text: Content, + binary: Content, + + const Content = struct { + arena: Allocator, + data: []const u8, + }; + fn deinit(self: Message, session: *Session) void { + switch (self) { + .text, .binary => |msg| session.releaseArena(msg.arena), + .close => {}, + } + } +}; + +pub const JsApi = struct { + pub const bridge = js.Bridge(WebSocket); + + pub const Meta = struct { + pub const name = "WebSocket"; + pub const prototype_chain = bridge.prototypeChain(); + pub var class_id: bridge.ClassId = undefined; + }; + + pub const constructor = bridge.constructor(WebSocket.init, .{ .dom_exception = true }); + + pub const CONNECTING = bridge.property(@intFromEnum(ReadyState.connecting), .{ .template = true }); + pub const OPEN = bridge.property(@intFromEnum(ReadyState.open), .{ .template = true }); + pub const CLOSING = bridge.property(@intFromEnum(ReadyState.closing), .{ .template = true }); + pub const CLOSED = bridge.property(@intFromEnum(ReadyState.closed), .{ .template = true }); + + pub const url = bridge.accessor(WebSocket.getUrl, null, .{}); + pub const readyState = bridge.accessor(WebSocket.getReadyState, null, .{}); + pub const bufferedAmount = bridge.accessor(WebSocket.getBufferedAmount, null, .{}); + pub const binaryType = bridge.accessor(WebSocket.getBinaryType, WebSocket.setBinaryType, .{}); + + pub const protocol = bridge.property("", .{ .template = false }); + pub const extensions = bridge.property("", .{ .template = false }); + + pub const onopen = bridge.accessor(WebSocket.getOnOpen, WebSocket.setOnOpen, .{}); + pub const onmessage = bridge.accessor(WebSocket.getOnMessage, WebSocket.setOnMessage, .{}); + pub const onerror = bridge.accessor(WebSocket.getOnError, WebSocket.setOnError, .{}); + pub const onclose = bridge.accessor(WebSocket.getOnClose, WebSocket.setOnClose, .{}); + + pub const send = bridge.function(WebSocket.send, .{ .dom_exception = true }); + pub const close = bridge.function(WebSocket.close, .{}); +}; + +const testing = @import("../../../testing.zig"); +test "WebApi: WebSocket" { + try testing.htmlRunner("net/websocket.html", .{}); +} diff --git a/src/browser/webapi/storage/Cookie.zig b/src/browser/webapi/storage/Cookie.zig index 649a04cc..b34fbffe 100644 --- a/src/browser/webapi/storage/Cookie.zig +++ b/src/browser/webapi/storage/Cookie.zig @@ -273,6 +273,12 @@ pub fn parseDomain(arena: Allocator, url_: ?[:0]const u8, explicit_domain: ?[]co // can't set a cookie for a TLD return error.InvalidDomain; } + + // Can't set a cookie for a public suffix (e.g. co.uk, com.au). + if (public_suffix_list(owned_domain[1..])) { + return error.InvalidDomain; + } + if (encoded_host) |host| { if (std.mem.endsWith(u8, host, owned_domain[1..]) == false) { return error.InvalidDomain; @@ -283,7 +289,10 @@ pub fn parseDomain(arena: Allocator, url_: ?[:0]const u8, explicit_domain: ?[]co } } - return encoded_host orelse return error.InvalidDomain; // default-domain + if (encoded_host) |host| { + if (host.len > 0) return host; + } + return error.InvalidDomain; } pub fn percentEncode(arena: Allocator, part: []const u8, comptime isValidChar: fn (u8) bool) ![]u8 { @@ -353,6 +362,9 @@ pub fn appliesTo(self: *const Cookie, url: *const PreparedUri, same_site: bool, } { + if (self.domain.len == 0) { + return false; + } if (self.domain[0] == '.') { // When a Set-Cookie header has a Domain attribute // Then we will _always_ prefix it with a dot, extending its @@ -1027,6 +1039,15 @@ test "Cookie: parse domain" { try expectError(error.InvalidDomain, "http://lightpanda.io/", "b;domain=other.lightpanda.io"); try expectError(error.InvalidDomain, "http://lightpanda.io/", "b;domain=other.lightpanda.com"); try expectError(error.InvalidDomain, "http://lightpanda.io/", "b;domain=other.example.com"); + + // Public suffixes should be rejected (test PSL entries: "gov.uk", "api.gov.uk") + try expectError(error.InvalidDomain, "http://example.gov.uk/", "b;domain=gov.uk"); + try expectError(error.InvalidDomain, "http://example.gov.uk/", "b;domain=.gov.uk"); + try expectError(error.InvalidDomain, "http://test.api.gov.uk/", "b;domain=api.gov.uk"); + + // Subdomains of public suffixes should still be accepted + try expectAttribute(.{ .domain = ".example.gov.uk" }, "http://example.gov.uk/", "b;domain=example.gov.uk"); + try expectAttribute(.{ .domain = ".example.gov.uk" }, "http://sub.example.gov.uk/", "b;domain=example.gov.uk"); } test "Cookie: parse limit" { @@ -1079,3 +1100,28 @@ fn expectAttribute(expected: anytype, url_: ?[:0]const u8, set_cookie: []const u fn expectError(expected: anyerror, url: ?[:0]const u8, set_cookie: []const u8) !void { try testing.expectError(expected, Cookie.parse(testing.allocator, url orelse test_url, set_cookie)); } + +test "Cookie: appliesTo with empty domain" { + const cookie = Cookie{ + .arena = std.heap.ArenaAllocator.init(testing.allocator), + .name = "test", + .value = "value", + .domain = "", + .path = "/", + .expires = null, + }; + defer cookie.deinit(); + + const target = PreparedUri{ + .host = "example.com", + .path = "/", + .secure = false, + }; + + try testing.expectEqual(false, cookie.appliesTo(&target, true, true, true)); +} + +test "Cookie: parse rejects URL with empty host" { + try testing.expectError(error.InvalidDomain, Cookie.parse(testing.allocator, "http:///path", "name=value")); + try testing.expectError(error.InvalidDomain, Cookie.parse(testing.allocator, "http://", "name=value")); +} diff --git a/src/cdp/CDP.zig b/src/cdp/CDP.zig index 09fa8324..89efcef1 100644 --- a/src/cdp/CDP.zig +++ b/src/cdp/CDP.zig @@ -431,6 +431,7 @@ pub const BrowserContext = struct { try notification.register(.page_frame_created, self, onPageFrameCreated); try notification.register(.page_dom_content_loaded, self, onPageDOMContentLoaded); try notification.register(.page_loaded, self, onPageLoaded); + try notification.register(.javascript_dialog_opening, self, onJavascriptDialogOpening); } pub fn deinit(self: *BrowserContext) void { @@ -641,6 +642,11 @@ pub const BrowserContext = struct { return @import("domains/page.zig").pageLoaded(self, msg); } + pub fn onJavascriptDialogOpening(ctx: *anyopaque, msg: *const Notification.JavascriptDialogOpening) !void { + const self: *BrowserContext = @ptrCast(@alignCast(ctx)); + return @import("domains/page.zig").javascriptDialogOpening(self, msg); + } + pub fn onHttpResponseHeadersDone(ctx: *anyopaque, msg: *const Notification.ResponseHeaderDone) !void { const self: *BrowserContext = @ptrCast(@alignCast(ctx)); defer self.resetNotificationArena(); diff --git a/src/cdp/domains/page.zig b/src/cdp/domains/page.zig index cf3cdd7d..c306ead7 100644 --- a/src/cdp/domains/page.zig +++ b/src/cdp/domains/page.zig @@ -48,6 +48,7 @@ pub fn processMessage(cmd: *CDP.Command) !void { close, captureScreenshot, getLayoutMetrics, + handleJavaScriptDialog, }, cmd.input.action) orelse return error.UnknownMethod; switch (action) { @@ -63,6 +64,7 @@ pub fn processMessage(cmd: *CDP.Command) !void { .close => return close(cmd), .captureScreenshot => return captureScreenshot(cmd), .getLayoutMetrics => return getLayoutMetrics(cmd), + .handleJavaScriptDialog => return handleJavaScriptDialog(cmd), } } @@ -642,6 +644,32 @@ fn sendPageLifecycle(bc: *CDP.BrowserContext, name: []const u8, timestamp: u64, }, .{ .session_id = session_id }); } +// https://chromedevtools.github.io/devtools-protocol/tot/Page/#method-handleJavaScriptDialog +fn handleJavaScriptDialog(cmd: *CDP.Command) !void { + // Dialogs auto-dismiss in headless mode. By the time the CDP client + // sends this command, the dialog has already returned and there is + // no pending dialog to accept or dismiss. + _ = try cmd.params(struct { + accept: bool, + promptText: ?[]const u8 = null, + }); + return cmd.sendError(-32000, "No dialog is showing", .{}); +} + +// https://chromedevtools.github.io/devtools-protocol/tot/Page/#event-javascriptDialogOpening +pub fn javascriptDialogOpening(bc: anytype, event: *const Notification.JavascriptDialogOpening) !void { + const session_id = bc.session_id orelse return; + var cdp = bc.cdp; + + try cdp.sendEvent("Page.javascriptDialogOpening", .{ + .url = event.url, + .message = event.message, + .type = event.dialog_type, + .hasBrowserHandler = false, + .defaultPrompt = "", + }, .{ .session_id = session_id }); +} + const LifecycleEvent = struct { frameId: []const u8, loaderId: ?[]const u8, diff --git a/src/datetime.zig b/src/datetime.zig index 04f1a6d5..3b9d5951 100644 --- a/src/datetime.zig +++ b/src/datetime.zig @@ -335,7 +335,7 @@ pub const DateTime = struct { } const tm = try parser.time(false); - if (parser.consumeIf(' ') == false) { + if (parser.unconsumed() == 0 or parser.consumeIf(' ') == false) { return error.InvalidTime; } @@ -1496,6 +1496,9 @@ test "DateTime: parse RFC822" { try testing.expectError(error.InvalidTime, DateTime.parse("Wed, 01 Jan 20 20:1a:22 X", .rfc822)); try testing.expectError(error.InvalidTime, DateTime.parse("Wed, 01 Jan 20 20:1a:22 ZZ", .rfc822)); + // Missing timezone - input ends exactly at time boundary (was causing index out of bounds) + try testing.expectError(error.InvalidTime, DateTime.parse("Wed, 01 Jan 2020 10:10:10", .rfc822)); + { const dt = try DateTime.parse("31 Dec 68 23:59 Z", .rfc822); try testing.expectEqual(3124223940000000, dt.micros); diff --git a/src/lightpanda.zig b/src/lightpanda.zig index 37ec9c44..3e07d530 100644 --- a/src/lightpanda.zig +++ b/src/lightpanda.zig @@ -250,9 +250,7 @@ pub fn RC(comptime T: type) type { } pub fn release(self: *@This(), value: anytype, session: *Session) void { - if (comptime IS_DEBUG) { - std.debug.assert(self._refs > 0); - } + assert(self._refs > 0, "release overflow", .{ .type = @typeName(@TypeOf(value)) }); const refs = self._refs - 1; self._refs = refs; diff --git a/src/log.zig b/src/log.zig index 3e1016c5..84ff1049 100644 --- a/src/log.zig +++ b/src/log.zig @@ -40,6 +40,7 @@ pub const Scope = enum { unknown_prop, mcp, cache, + websocket, }; const Opts = struct { diff --git a/src/network/Network.zig b/src/network/Network.zig index ab11e5ce..1fb8c8fb 100644 --- a/src/network/Network.zig +++ b/src/network/Network.zig @@ -61,6 +61,11 @@ connections: []http.Connection, available: std.DoublyLinkedList = .{}, conn_mutex: std.Thread.Mutex = .{}, +ws_pool: std.heap.MemoryPool(http.Connection), +ws_count: usize = 0, +ws_max: u8, +ws_mutex: std.Thread.Mutex = .{}, + pollfds: []posix.pollfd, listener: ?Listener = null, @@ -268,9 +273,13 @@ pub fn init(allocator: Allocator, app: *App, config: *const Config) !Network { .connections = connections, .app = app, + .robot_store = RobotStore.init(allocator), .web_bot_auth = web_bot_auth, .cache = cache, + + .ws_pool = .init(allocator), + .ws_max = config.wsMaxConcurrent(), }; } @@ -298,6 +307,8 @@ pub fn deinit(self: *Network) void { } self.allocator.free(self.connections); + self.ws_pool.deinit(); + self.robot_store.deinit(); if (self.web_bot_auth) |wba| { wba.deinit(self.allocator); @@ -592,18 +603,50 @@ pub fn getConnection(self: *Network) ?*http.Connection { } pub fn releaseConnection(self: *Network, conn: *http.Connection) void { - conn.reset(self.config, self.ca_blob) catch |err| { - lp.assert(false, "couldn't reset curl easy", .{ .err = err }); - }; - - self.conn_mutex.lock(); - defer self.conn_mutex.unlock(); - - self.available.append(&conn.node); + switch (conn.transport) { + .websocket => { + conn.deinit(); + self.ws_mutex.lock(); + defer self.ws_mutex.unlock(); + self.ws_pool.destroy(conn); + self.ws_count -= 1; + }, + else => { + conn.reset(self.config, self.ca_blob) catch |err| { + lp.assert(false, "couldn't reset curl easy", .{ .err = err }); + }; + self.conn_mutex.lock(); + defer self.conn_mutex.unlock(); + self.available.append(&conn.node); + }, + } } -pub fn newConnection(self: *Network) !http.Connection { - return http.Connection.init(self.ca_blob, self.config); +pub fn newConnection(self: *Network) ?*http.Connection { + const conn = blk: { + self.ws_mutex.lock(); + defer self.ws_mutex.unlock(); + + if (self.ws_count >= self.ws_max) { + return null; + } + + const c = self.ws_pool.create() catch return null; + self.ws_count += 1; + break :blk c; + }; + + // don't do this under lock + conn.* = http.Connection.init(self.ca_blob, self.config) catch { + self.ws_mutex.lock(); + defer self.ws_mutex.unlock(); + self.ws_pool.destroy(conn); + self.ws_count -= 1; + + return null; + }; + + return conn; } // Wraps lines @ 64 columns. A PEM is basically a base64 encoded DER (which is diff --git a/src/network/http.zig b/src/network/http.zig index 6dc217ea..e7a7fab4 100644 --- a/src/network/http.zig +++ b/src/network/http.zig @@ -28,7 +28,9 @@ pub const ENABLE_DEBUG = false; pub const Blob = libcurl.CurlBlob; pub const WaitFd = libcurl.CurlWaitFd; +pub const readfunc_pause = libcurl.curl_readfunc_pause; pub const writefunc_error = libcurl.curl_writefunc_error; +pub const WsFrameType = libcurl.WsFrameType; const Error = libcurl.Error; @@ -222,15 +224,19 @@ pub const ResponseHead = struct { pub const Connection = struct { _easy: *libcurl.Curl, + transport: Transport, node: std.DoublyLinkedList.Node = .{}, - pub fn init( - ca_blob: ?libcurl.CurlBlob, - config: *const Config, - ) !Connection { + pub const Transport = union(enum) { + none, // used for cases that manage their own connection, e.g. telemetry + http: *@import("../browser/HttpClient.zig").Transfer, + websocket: *@import("../browser/webapi/net/WebSocket.zig"), + }; + + pub fn init(ca_blob: ?libcurl.CurlBlob, config: *const Config) !Connection { const easy = libcurl.curl_easy_init() orelse return error.FailedToInitializeEasy; - const self = Connection{ ._easy = easy }; + var self = Connection{ ._easy = easy, .transport = .none }; errdefer self.deinit(); try self.reset(config, ca_blob); @@ -310,7 +316,12 @@ pub const Connection = struct { try libcurl.curl_easy_setopt(self._easy, .user_pwd, creds.ptr); } - pub fn setCallbacks( + pub fn setConnectOnly(self: *const Connection, connect_only: bool) !void { + const value: c_long = if (connect_only) 2 else 0; + try libcurl.curl_easy_setopt(self._easy, .connect_only, value); + } + + pub fn setWriteCallback( self: *Connection, comptime data_cb: libcurl.CurlWriteFunction, ) !void { @@ -318,12 +329,40 @@ pub const Connection = struct { try libcurl.curl_easy_setopt(self._easy, .write_function, data_cb); } + pub fn setReadCallback( + self: *Connection, + comptime data_cb: libcurl.CurlReadFunction, + upload: bool, + ) !void { + try libcurl.curl_easy_setopt(self._easy, .read_data, self); + try libcurl.curl_easy_setopt(self._easy, .read_function, data_cb); + if (upload) { + try libcurl.curl_easy_setopt(self._easy, .upload, true); + } + } + + pub fn setHeaderCallback( + self: *Connection, + comptime data_cb: libcurl.CurlHeaderFunction, + ) !void { + try libcurl.curl_easy_setopt(self._easy, .header_data, self); + try libcurl.curl_easy_setopt(self._easy, .header_function, data_cb); + } + + pub fn pause( + self: *Connection, + flags: libcurl.CurlPauseFlags, + ) !void { + try libcurl.curl_easy_pause(self._easy, flags); + } + pub fn reset( - self: *const Connection, + self: *Connection, config: *const Config, ca_blob: ?libcurl.CurlBlob, ) !void { libcurl.curl_easy_reset(self._easy); + self.transport = .none; // timeouts try libcurl.curl_easy_setopt(self._easy, .timeout_ms, config.httpTimeout()); @@ -460,12 +499,6 @@ pub const Connection = struct { }; } - pub fn getPrivate(self: *const Connection) !*anyopaque { - var private: *anyopaque = undefined; - try libcurl.curl_easy_getinfo(self._easy, .private, &private); - return private; - } - // These are headers that may not be send to the users for inteception. pub fn secretHeaders(_: *const Connection, headers: *Headers, http_headers: *const Config.HttpHeaders) !void { if (http_headers.proxy_bearer_header) |hdr| { @@ -482,6 +515,14 @@ pub const Connection = struct { try libcurl.curl_easy_perform(self._easy); return self.getResponseCode(); } + + pub fn wsStartFrame(self: *const Connection, frame_type: libcurl.WsFrameType, size: usize) !void { + try libcurl.curl_ws_start_frame(self._easy, frame_type, @intCast(size)); + } + + pub fn wsMeta(self: *const Connection) ?libcurl.WsFrameMeta { + return libcurl.curl_ws_meta(self._easy); + } }; pub const Handles = struct { @@ -519,17 +560,21 @@ pub const Handles = struct { } pub const MultiMessage = struct { - conn: Connection, + conn: *Connection, err: ?Error, }; - pub fn readMessage(self: *Handles) ?MultiMessage { + pub fn readMessage(self: *Handles) !?MultiMessage { var messages_count: c_int = 0; const msg = libcurl.curl_multi_info_read(self.multi, &messages_count) orelse return null; return switch (msg.data) { - .done => |err| .{ - .conn = .{ ._easy = msg.easy_handle }, - .err = err, + .done => |err| { + var private: *anyopaque = undefined; + try libcurl.curl_easy_getinfo(msg.easy_handle, .private, &private); + return .{ + .conn = @ptrCast(@alignCast(private)), + .err = err, + }; }, else => unreachable, }; diff --git a/src/sys/libcurl.zig b/src/sys/libcurl.zig index 0e2defe3..31587823 100644 --- a/src/sys/libcurl.zig +++ b/src/sys/libcurl.zig @@ -40,6 +40,8 @@ pub const CurlDebugFunction = fn (*Curl, CurlInfoType, [*c]u8, usize, *anyopaque pub const CurlHeaderFunction = fn ([*]const u8, usize, usize, *anyopaque) usize; pub const CurlWriteFunction = fn ([*]const u8, usize, usize, *anyopaque) usize; pub const curl_writefunc_error: usize = c.CURL_WRITEFUNC_ERROR; +pub const curl_readfunc_pause: usize = c.CURL_READFUNC_PAUSE; +pub const CurlReadFunction = fn ([*]u8, usize, usize, *anyopaque) usize; pub const FreeCallback = fn (ptr: ?*anyopaque) void; pub const StrdupCallback = fn (str: [*:0]const u8) ?[*:0]u8; @@ -98,6 +100,23 @@ pub const CurlWaitFd = extern struct { revents: CurlWaitEvents, }; +pub const CurlPauseFlags = packed struct(c_short) { + recv: bool = false, + send: bool = false, + all: bool = false, + cont: bool = false, + _reserved: u12 = 0, + + pub fn to_c(self: @This()) c_int { + var flags: c_int = 0; + if (self.recv) flags |= c.CURLPAUSE_RECV; + if (self.send) flags |= c.CURLPAUSE_SEND; + if (self.all) flags |= c.CURLPAUSE_ALL; + if (self.cont) flags |= c.CURLPAUSE_CONT; + return flags; + } +}; + comptime { const debug_cb_check: c.curl_debug_callback = struct { fn cb(handle: ?*Curl, msg_type: c.curl_infotype, raw: [*c]u8, len: usize, user: ?*anyopaque) callconv(.c) c_int { @@ -167,6 +186,10 @@ pub const CurlOption = enum(c.CURLoption) { header_function = c.CURLOPT_HEADERFUNCTION, write_data = c.CURLOPT_WRITEDATA, write_function = c.CURLOPT_WRITEFUNCTION, + read_data = c.CURLOPT_READDATA, + read_function = c.CURLOPT_READFUNCTION, + connect_only = c.CURLOPT_CONNECT_ONLY, + upload = c.CURLOPT_UPLOAD, }; pub const CurlMOption = enum(c.CURLMoption) { @@ -530,6 +553,7 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype const code = switch (option) { .verbose, .post, + .upload, .http_get, .ssl_verify_host, .ssl_verify_peer, @@ -551,6 +575,7 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype .max_redirs, .follow_location, .post_field_size, + .connect_only, => blk: { const n: c_long = switch (@typeInfo(@TypeOf(value))) { .comptime_int, .int => @intCast(value), @@ -593,6 +618,7 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype .private, .header_data, + .read_data, .write_data, => blk: { const ptr: ?*anyopaque = switch (@typeInfo(@TypeOf(value))) { @@ -631,6 +657,22 @@ pub fn curl_easy_setopt(easy: *Curl, comptime option: CurlOption, value: anytype break :blk c.curl_easy_setopt(easy, opt, cb); }, + .read_function => blk: { + const cb: c.curl_write_callback = switch (@typeInfo(@TypeOf(value))) { + .null => null, + .@"fn" => |info| struct { + fn cb(buffer: [*c]u8, count: usize, len: usize, user: ?*anyopaque) callconv(.c) usize { + const user_arg = if (@typeInfo(info.params[3].type.?) == .optional) + user + else + user orelse unreachable; + return value(@ptrCast(buffer), count, len, user_arg); + } + }.cb, + else => @compileError("expected Zig function or null for " ++ @tagName(option) ++ ", got " ++ @typeName(@TypeOf(value))), + }; + break :blk c.curl_easy_setopt(easy, opt, cb); + }, .write_function => blk: { const cb: c.curl_write_callback = switch (@typeInfo(@TypeOf(value))) { .null => null, @@ -677,6 +719,10 @@ pub fn curl_easy_getinfo(easy: *Curl, comptime info: CurlInfo, out: anytype) Err try errorCheck(code); } +pub fn curl_easy_pause(easy: *Curl, flags: CurlPauseFlags) Error!void { + try errorCheck(c.curl_easy_pause(easy, flags.to_c())); +} + pub fn curl_easy_header( easy: *Curl, name: [*:0]const u8, @@ -804,3 +850,79 @@ pub fn curl_slist_free_all(list: ?*CurlSList) void { c.curl_slist_free_all(ptr); } } + +// WebSocket support (requires libcurl 7.86.0+) +pub const WsFrameType = enum { + text, + binary, + cont, + close, + ping, + pong, + + fn toInt(self: WsFrameType) c_uint { + return switch (self) { + .text => c.CURLWS_TEXT, + .binary => c.CURLWS_BINARY, + .cont => c.CURLWS_CONT, + .close => c.CURLWS_CLOSE, + .ping => c.CURLWS_PING, + .pong => c.CURLWS_PONG, + }; + } + + fn fromFlags(flags: c_int) WsFrameType { + const f: c_uint = @bitCast(flags); + if (f & c.CURLWS_TEXT != 0) return .text; + if (f & c.CURLWS_BINARY != 0) return .binary; + if (f & c.CURLWS_CLOSE != 0) return .close; + if (f & c.CURLWS_PING != 0) return .ping; + if (f & c.CURLWS_PONG != 0) return .pong; + if (f & c.CURLWS_CONT != 0) return .cont; + return .binary; // default fallback + } +}; + +pub const WsFrameMeta = struct { + frame_type: WsFrameType, + offset: usize, + bytes_left: usize, + len: usize, + + fn from(frame: *const c.curl_ws_frame) WsFrameMeta { + return .{ + .frame_type = WsFrameType.fromFlags(frame.flags), + .offset = @intCast(frame.offset), + .bytes_left = @intCast(frame.bytesleft), + .len = if (frame.len < 0) + std.math.maxInt(usize) + else + @intCast(frame.len), + }; + } +}; + +pub fn curl_ws_send(easy: *Curl, buffer: []const u8, sent: *usize, fragsize: CurlOffT, frame_type: WsFrameType) Error!void { + try errorCheck(c.curl_ws_send(easy, buffer.ptr, buffer.len, sent, fragsize, frame_type.toInt())); +} + +pub fn curl_ws_recv(easy: *Curl, buffer: []u8, recv: *usize, meta: *?WsFrameMeta) Error!void { + var c_meta: [*c]const c.curl_ws_frame = null; + const code = c.curl_ws_recv(easy, buffer.ptr, buffer.len, recv, &c_meta); + if (c_meta) |m| { + meta.* = WsFrameMeta.from(m); + } else { + meta.* = null; + } + try errorCheck(code); +} + +pub fn curl_ws_meta(easy: *Curl) ?WsFrameMeta { + const ptr = c.curl_ws_meta(easy); + if (ptr == null) return null; + return WsFrameMeta.from(ptr); +} + +pub fn curl_ws_start_frame(easy: *Curl, frame_type: WsFrameType, size: CurlOffT) Error!void { + try errorCheck(c.curl_ws_start_frame(easy, frame_type.toInt(), size)); +} diff --git a/src/testing.zig b/src/testing.zig index 8ff59751..e2ec937e 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -436,19 +436,17 @@ fn runWebApiTest(test_file: [:0]const u8) !void { if (js_val.isTrue()) { return; } - switch (try runner.tick(.{ .ms = 20 })) { - .done => return error.TestNeverSignaledCompletion, - .ok => |next_ms| { - const ms_elapsed = timer.lap() / 1_000_000; - if (ms_elapsed >= wait_ms) { - return error.TestTimedOut; - } - wait_ms -= @intCast(ms_elapsed); - if (next_ms > 0) { - std.Thread.sleep(std.time.ns_per_ms * next_ms); - } - }, + const sleep_ms: usize = switch (try runner.tick(.{ .ms = 20 })) { + .done => 20, + .ok => |next_ms| @min(next_ms, 20), + }; + + const ms_elapsed = timer.lap() / 1_000_000; + if (ms_elapsed >= wait_ms) { + return error.TestTimedOut; } + wait_ms -= @intCast(ms_elapsed); + std.Thread.sleep(std.time.ns_per_ms * sleep_ms); } } @@ -476,12 +474,15 @@ pub fn pageTest(comptime test_file: []const u8, opts: PageTestOpts) !*Page { const log = @import("log.zig"); const TestHTTPServer = @import("TestHTTPServer.zig"); +const TestWSServer = @import("TestWSServer.zig"); const Server = @import("Server.zig"); var test_cdp_server: ?*Server = null; var test_cdp_server_thread: ?std.Thread = null; var test_http_server: ?TestHTTPServer = null; var test_http_server_thread: ?std.Thread = null; +var test_ws_server: ?TestWSServer = null; +var test_ws_server_thread: ?std.Thread = null; var test_config: Config = undefined; @@ -495,6 +496,7 @@ test "tests:beforeAll" { .common = .{ .tls_verify_host = false, .user_agent_suffix = "internal-tester", + .ws_max_concurrent = 50, }, } }); @@ -514,13 +516,16 @@ test "tests:beforeAll" { test_session = try test_browser.newSession(test_notification); var wg: std.Thread.WaitGroup = .{}; - wg.startMany(2); + wg.startMany(3); test_cdp_server_thread = try std.Thread.spawn(.{}, serveCDP, .{&wg}); test_http_server = TestHTTPServer.init(testHTTPHandler); test_http_server_thread = try std.Thread.spawn(.{}, TestHTTPServer.run, .{ &test_http_server.?, &wg }); + test_ws_server = TestWSServer.init(); + test_ws_server_thread = try std.Thread.spawn(.{}, TestWSServer.run, .{ &test_ws_server.?, &wg }); + // need to wait for the servers to be listening, else tests will fail because // they aren't able to connect. wg.wait(); @@ -545,6 +550,13 @@ test "tests:afterAll" { server.deinit(); } + if (test_ws_server) |*server| { + server.stop(); + } + if (test_ws_server_thread) |thread| { + thread.join(); + } + @import("root").v8_peak_memory = test_browser.env.isolate.getHeapStatistics().total_physical_size; test_notification.deinit();