From 9a312a4177dfcb11be807d5790a90e8c764c0c2f Mon Sep 17 00:00:00 2001 From: Nikolay Govorov Date: Tue, 14 Apr 2026 07:10:09 +0100 Subject: [PATCH] Refactor server/client/cdp structure --- src/Server.zig | 523 ++++--------- src/browser/Browser.zig | 11 +- src/browser/Frame.zig | 4 +- src/browser/HttpClient.zig | 24 +- src/browser/Runner.zig | 2 +- src/browser/webapi/Worker.zig | 2 +- src/browser/webapi/WorkerGlobalScope.zig | 4 +- src/browser/webapi/net/Fetch.zig | 2 +- src/browser/webapi/net/WebSocket.zig | 2 +- src/browser/webapi/net/XMLHttpRequest.zig | 2 +- src/cdp/CDP.zig | 121 +++- src/cdp/domains/emulation.zig | 2 +- src/cdp/domains/fetch.zig | 8 +- src/cdp/testing.zig | 33 +- src/lightpanda.zig | 19 +- src/main.zig | 9 +- src/main_legacy_test.zig | 6 +- src/mcp/Server.zig | 15 +- src/network/Network.zig | 40 +- .../{websocket.zig => WsConnection.zig} | 685 +++++++++++------- src/testing.zig | 7 +- 21 files changed, 724 insertions(+), 797 deletions(-) rename src/network/{websocket.zig => WsConnection.zig} (51%) diff --git a/src/Server.zig b/src/Server.zig index d99347dc..0b5d8ef5 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -20,10 +20,10 @@ const std = @import("std"); const lp = @import("lightpanda"); const App = @import("App.zig"); -const Config = @import("Config.zig"); const CDP = @import("cdp/CDP.zig"); -const Net = @import("network/websocket.zig"); -const HttpClient = @import("browser/HttpClient.zig"); +const Config = @import("Config.zig"); +const CDPClient = @import("./browser/HttpClient.zig").CDPClient; +const WsConnection = @import("network/WsConnection.zig"); const log = lp.log; const net = std.net; @@ -33,29 +33,29 @@ const Allocator = std.mem.Allocator; const Server = @This(); app: *App, -allocator: Allocator, json_version_response: []const u8, // Thread management active_threads: std.atomic.Value(u32) = .init(0), -clients: std.ArrayList(*Client) = .{}, -client_mutex: std.Thread.Mutex = .{}, -clients_pool: std.heap.MemoryPool(Client), +pending: std.ArrayList(*CDP) = .{}, + +conns: std.ArrayList(*CDP) = .{}, +conns_mutex: std.Thread.Mutex = .{}, +conns_pool: std.heap.MemoryPool(CDP), pub fn init(app: *App, address: net.Address) !*Server { - const allocator = app.allocator; const json_version_response = try buildJSONVersionResponse(app); - errdefer allocator.free(json_version_response); + errdefer app.allocator.free(json_version_response); - const self = try allocator.create(Server); - errdefer allocator.destroy(self); + const self = try app.allocator.create(Server); + errdefer app.allocator.destroy(self); self.* = .{ .app = app, - .allocator = allocator, + .conns_pool = .init(app.allocator), .json_version_response = json_version_response, - .clients_pool = std.heap.MemoryPool(Client).init(allocator), }; + errdefer self.conns_pool.deinit(); var bound_address = address; try self.app.network.bind(&bound_address, self, onAccept); @@ -65,21 +65,34 @@ pub fn init(app: *App, address: net.Address) !*Server { } pub fn shutdown(self: *Server) void { - self.client_mutex.lock(); - defer self.client_mutex.unlock(); + self.conns_mutex.lock(); + defer self.conns_mutex.unlock(); - for (self.clients.items) |client| { - client.stop(); + self.app.network.unbind(); + + for (self.conns.items) |cdp| { + cdp.browser.env.terminate(); + cdp.ws.sendClose(); + cdp.ws.shutdown(); + } + + for (self.pending.items) |conn| { + conn.ws.shutdown(); } } pub fn deinit(self: *Server) void { self.shutdown(); - self.joinThreads(); - self.clients.deinit(self.allocator); - self.clients_pool.deinit(); - self.allocator.free(self.json_version_response); - self.allocator.destroy(self); + + while (self.active_threads.load(.monotonic) > 0) { + std.Thread.sleep(10 * std.time.ns_per_ms); + } + + self.conns.deinit(self.app.allocator); + self.pending.deinit(self.app.allocator); + self.conns_pool.deinit(); + self.app.allocator.free(self.json_version_response); + self.app.allocator.destroy(self); } fn onAccept(ctx: *anyopaque, socket: posix.socket_t) void { @@ -90,102 +103,6 @@ fn onAccept(ctx: *anyopaque, socket: posix.socket_t) void { }; } -// Liveness is enforced at the TCP layer via keepalive probes sent by the -// kernel. This is transparent to CDP clients — unlike a WebSocket ping, which -// go-rod panics on and chromedp logs as "malformed". Tunables in Config.zig. -fn setTcpKeepalive(socket: posix.socket_t) void { - posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &std.mem.toBytes(@as(c_int, 1))) catch |err| { - log.warn(.app, "SO_KEEPALIVE", .{ .err = err }); - return; - }; - - const option = switch (@import("builtin").os.tag) { - .macos, .ios => posix.TCP.KEEPALIVE, - else => posix.TCP.KEEPIDLE, - }; - - posix.setsockopt(socket, posix.IPPROTO.TCP, option, &std.mem.toBytes(Config.CDP_KEEPALIVE_IDLE_S)) catch |err| { - log.warn(.app, "TCP_KEEPIDLE", .{ .err = err }); - }; - - if (@hasDecl(posix.TCP, "KEEPINTVL")) { - posix.setsockopt(socket, posix.IPPROTO.TCP, posix.TCP.KEEPINTVL, &std.mem.toBytes(Config.CDP_KEEPALIVE_INTVL_S)) catch |err| { - log.warn(.app, "TCP_KEEPINTVL", .{ .err = err }); - }; - } - if (@hasDecl(posix.TCP, "KEEPCNT")) { - posix.setsockopt(socket, posix.IPPROTO.TCP, posix.TCP.KEEPCNT, &std.mem.toBytes(Config.CDP_KEEPALIVE_CNT)) catch |err| { - log.warn(.app, "TCP_KEEPCNT", .{ .err = err }); - }; - } -} - -fn handleConnection(self: *Server, socket: posix.socket_t) void { - defer posix.close(socket); - - setTcpKeepalive(socket); - - // Client is HUGE (> 512KB) because it has a large read buffer. - // V8 crashes if this is on the stack (likely related to its size). - const client = self.getClient() catch |err| { - log.err(.app, "CDP client create", .{ .err = err }); - return; - }; - defer self.releaseClient(client); - - client.* = Client.init( - socket, - self.allocator, - self.app, - self.json_version_response, - ) catch |err| { - log.err(.app, "CDP client init", .{ .err = err }); - return; - }; - defer client.deinit(); - - self.registerClient(client); - defer self.unregisterClient(client); - - // Check shutdown after registering to avoid missing the stop signal. - // If deinit() already iterated over clients, this client won't receive stop() - // and would block joinThreads() indefinitely. - if (self.app.shutdown()) { - return; - } - - client.start(); -} - -fn getClient(self: *Server) !*Client { - self.client_mutex.lock(); - defer self.client_mutex.unlock(); - return self.clients_pool.create(); -} - -fn releaseClient(self: *Server, client: *Client) void { - self.client_mutex.lock(); - defer self.client_mutex.unlock(); - self.clients_pool.destroy(client); -} - -fn registerClient(self: *Server, client: *Client) void { - self.client_mutex.lock(); - defer self.client_mutex.unlock(); - self.clients.append(self.allocator, client) catch {}; -} - -fn unregisterClient(self: *Server, client: *Client) void { - self.client_mutex.lock(); - defer self.client_mutex.unlock(); - for (self.clients.items, 0..) |c, i| { - if (c == client) { - _ = self.clients.swapRemove(i); - break; - } - } -} - fn spawnWorker(self: *Server, socket: posix.socket_t) !void { if (self.app.shutdown()) { return error.ShuttingDown; @@ -213,300 +130,109 @@ fn spawnWorker(self: *Server, socket: posix.socket_t) !void { } errdefer _ = self.active_threads.fetchSub(1, .monotonic); - const thread = try std.Thread.spawn(.{}, runWorker, .{ self, socket }); + const thread = try std.Thread.spawn(.{}, handleConnection, .{ self, socket }); thread.detach(); } -fn runWorker(self: *Server, socket: posix.socket_t) void { +fn handleConnection(self: *Server, socket: posix.socket_t) void { defer _ = self.active_threads.fetchSub(1, .monotonic); - handleConnection(self, socket); -} + defer posix.close(socket); -fn joinThreads(self: *Server) void { - while (self.active_threads.load(.monotonic) > 0) { - std.Thread.sleep(10 * std.time.ns_per_ms); + // CDP is HUGE (> 512KB) because WsConnection has a large read buffer. + // V8 crashes if this is on the stack (likely related to its size). + const cdp = self.allocConn() catch |err| { + log.err(.app, "CDP alloc", .{ .err = err }); + return; + }; + defer self.releaseConn(cdp); + + cdp.init(self.app, socket, self.json_version_response) catch |err| { + log.err(.app, "CDP init", .{ .err = err }); + return; + }; + defer cdp.deinit(); + + if (log.enabled(.app, .info)) { + const client_address = cdp.ws.getAddress() catch null; + log.info(.app, "client connected", .{ .ip = client_address }); + } + + self.registerHandshake(cdp); + const handshake_result = cdp.ws.handshake(); + self.unregisterHandshake(cdp); + + const upgraded = handshake_result catch |err| { + log.err(.app, "CDP handshake", .{ .err = err }); + return; + }; + if (!upgraded) return; + + self.registerConn(cdp); + defer self.unregisterConn(cdp); + + // Check shutdown after registering to avoid missing the stop signal. + // If shutdown() already iterated over conns, this conn won't be terminated + // and would block deinit() indefinitely. + if (self.app.shutdown()) { + return; + } + + while (true) { + const next = cdp.tick() catch |err| { + log.err(.app, "cdp tick", .{ .err = err }); + return; + }; + if (!next) break; } } -// Handle exactly one TCP connection. -pub const Client = struct { - // The client is initially serving HTTP requests but, under normal circumstances - // should eventually be upgraded to a websocket connections - mode: union(enum) { - http: void, - cdp: CDP, - }, +fn registerHandshake(self: *Server, conn: *CDP) void { + self.conns_mutex.lock(); + defer self.conns_mutex.unlock(); - allocator: Allocator, - app: *App, - http: *HttpClient, - ws: Net.WsConnection, + self.pending.append(self.app.allocator, conn) catch {}; +} - pub fn init( - socket: posix.socket_t, - allocator: Allocator, - app: *App, - json_version_response: []const u8, - ) !Client { - var ws = try Net.WsConnection.init(socket, allocator, json_version_response); - errdefer ws.deinit(); +fn unregisterHandshake(self: *Server, conn: *CDP) void { + self.conns_mutex.lock(); + defer self.conns_mutex.unlock(); - if (log.enabled(.app, .info)) { - const client_address = ws.getAddress() catch null; - log.info(.app, "client connected", .{ .ip = client_address }); - } - - const http = try HttpClient.init(allocator, &app.network); - errdefer http.deinit(); - - return .{ - .allocator = allocator, - .app = app, - .http = http, - .ws = ws, - .mode = .{ .http = {} }, - }; - } - - fn stop(self: *Client) void { - switch (self.mode) { - .http => {}, - .cdp => |*cdp| { - cdp.browser.env.terminate(); - self.ws.sendClose(); - }, - } - self.ws.shutdown(); - } - - pub fn deinit(self: *Client) void { - switch (self.mode) { - .cdp => |*cdp| cdp.deinit(), - .http => {}, - } - self.ws.deinit(); - self.http.deinit(); - } - - fn start(self: *Client) void { - const http = self.http; - http.cdp_client = .{ - .socket = self.ws.socket, - .ctx = self, - .blocking_read_start = Client.blockingReadStart, - .blocking_read = Client.blockingRead, - .blocking_read_end = Client.blockingReadStop, - }; - defer http.cdp_client = null; - - self.httpLoop(http) catch |err| { - log.err(.app, "CDP client loop", .{ .err = err }); - }; - } - - fn httpLoop(self: *Client, http: *HttpClient) !void { - lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{}); - - // Liveness is enforced by TCP keepalive configured in - // Server.setTcpKeepalive; the kernel closes dead sockets, which - // surfaces as EOF/error from readSocket. The loop blocks for ~24 days - // on each poll rather than tracking app-level timeouts. Capped at - // i32-max because HttpClient.tick narrows to c_int. - const wait_ms: u32 = std.math.maxInt(i32); - - while (true) { - const status = http.tick(wait_ms) catch |err| { - log.err(.app, "http tick", .{ .err = err }); - return; - }; - if (status != .cdp_socket) continue; - - if (self.readSocket() == false) { - return; - } - - if (self.mode == .cdp) { - break; - } - } - - var cdp = &self.mode.cdp; - - while (true) { - const result = cdp.pageWait(wait_ms) catch |wait_err| switch (wait_err) { - error.NoPage => { - const status = http.tick(wait_ms) catch |err| { - log.err(.app, "http tick", .{ .err = err }); - return; - }; - if (status != .cdp_socket) continue; - if (self.readSocket() == false) { - return; - } - continue; - }, - else => return wait_err, - }; - - switch (result) { - .cdp_socket => { - if (self.readSocket() == false) { - return; - } - }, - .done => {}, - } + for (self.pending.items, 0..) |w, i| { + if (w == conn) { + _ = self.pending.swapRemove(i); + break; } } +} - fn blockingReadStart(ctx: *anyopaque) bool { - const self: *Client = @ptrCast(@alignCast(ctx)); - self.ws.setBlocking(true) catch |err| { - log.warn(.app, "CDP blockingReadStart", .{ .err = err }); - return false; - }; - return true; - } +fn allocConn(self: *Server) !*CDP { + self.conns_mutex.lock(); + defer self.conns_mutex.unlock(); + return self.conns_pool.create(); +} - fn blockingRead(ctx: *anyopaque) bool { - const self: *Client = @ptrCast(@alignCast(ctx)); - return self.readSocket(); - } +fn releaseConn(self: *Server, conn: *CDP) void { + self.conns_mutex.lock(); + defer self.conns_mutex.unlock(); + self.conns_pool.destroy(conn); +} - fn blockingReadStop(ctx: *anyopaque) bool { - const self: *Client = @ptrCast(@alignCast(ctx)); - self.ws.setBlocking(false) catch |err| { - log.warn(.app, "CDP blockingReadStop", .{ .err = err }); - return false; - }; - return true; - } +fn registerConn(self: *Server, conn: *CDP) void { + self.conns_mutex.lock(); + defer self.conns_mutex.unlock(); + self.conns.append(self.app.allocator, conn) catch {}; +} - fn readSocket(self: *Client) bool { - const n = self.ws.read() catch |err| { - log.warn(.app, "CDP read", .{ .err = err }); - return false; - }; - - if (n == 0) { - log.info(.app, "CDP disconnect", .{}); - return false; - } - - return self.processData() catch false; - } - - fn processData(self: *Client) !bool { - switch (self.mode) { - .cdp => |*cdp| return self.processWebsocketMessage(cdp), - .http => return self.processHTTPRequest(), +fn unregisterConn(self: *Server, conn: *CDP) void { + self.conns_mutex.lock(); + defer self.conns_mutex.unlock(); + for (self.conns.items, 0..) |c, i| { + if (c == conn) { + _ = self.conns.swapRemove(i); + break; } } - - fn processHTTPRequest(self: *Client) !bool { - lp.assert(self.ws.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.ws.reader.pos }); - const request = self.ws.reader.buf[0..self.ws.reader.len]; - - if (request.len > Config.CDP_MAX_HTTP_REQUEST_SIZE) { - self.writeHTTPErrorResponse(413, "Request too large"); - return error.RequestTooLarge; - } - - // we're only expecting [body-less] GET requests. - if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) { - // we need more data, put any more data here - return true; - } - - // the next incoming data can go to the front of our buffer - defer self.ws.reader.len = 0; - return self.handleHTTPRequest(request) catch |err| { - switch (err) { - error.NotFound => self.writeHTTPErrorResponse(404, "Not found"), - error.InvalidRequest => self.writeHTTPErrorResponse(400, "Invalid request"), - error.InvalidProtocol => self.writeHTTPErrorResponse(400, "Invalid HTTP protocol"), - error.MissingHeaders => self.writeHTTPErrorResponse(400, "Missing required header"), - error.InvalidUpgradeHeader => self.writeHTTPErrorResponse(400, "Unsupported upgrade type"), - error.InvalidVersionHeader => self.writeHTTPErrorResponse(400, "Invalid websocket version"), - error.InvalidConnectionHeader => self.writeHTTPErrorResponse(400, "Invalid connection header"), - else => { - log.err(.app, "server 500", .{ .err = err, .req = request[0..@min(100, request.len)] }); - self.writeHTTPErrorResponse(500, "Internal Server Error"); - }, - } - return err; - }; - } - - fn handleHTTPRequest(self: *Client, request: []u8) !bool { - if (request.len < 18) { - // 18 is [generously] the smallest acceptable HTTP request - return error.InvalidRequest; - } - - if (std.mem.eql(u8, request[0..4], "GET ") == false) { - return error.NotFound; - } - - const url_end = std.mem.indexOfScalarPos(u8, request, 4, ' ') orelse { - return error.InvalidRequest; - }; - - const url = request[4..url_end]; - - if (std.mem.eql(u8, url, "/")) { - try self.upgradeConnection(request); - return true; - } - - if (std.mem.eql(u8, url, "/json/version") or std.mem.eql(u8, url, "/json/version/")) { - try self.ws.send(self.ws.json_version_response); - // Chromedp (a Go driver) does an http request to /json/version - // then to / (websocket upgrade) using a different connection. - // Since we only allow 1 connection at a time, the 2nd one (the - // websocket upgrade) blocks until the first one times out. - // We can avoid that by closing the connection. json_version_response - // has a Connection: Close header too. - self.ws.shutdown(); - return false; - } - - if (std.mem.eql(u8, url, "/json/list") or std.mem.eql(u8, url, "/json/list/") or - std.mem.eql(u8, url, "/json") or std.mem.eql(u8, url, "/json/")) - { - try self.ws.send(empty_json_list_response); - self.ws.shutdown(); - return false; - } - - return error.NotFound; - } - - fn upgradeConnection(self: *Client, request: []u8) !void { - try self.ws.upgrade(request); - self.mode = .{ .cdp = try CDP.init(self) }; - } - - fn writeHTTPErrorResponse(self: *Client, comptime status: u16, comptime body: []const u8) void { - self.ws.sendHttpError(status, body); - } - - fn processWebsocketMessage(self: *Client, cdp: *CDP) !bool { - return self.ws.processMessages(cdp); - } - - pub fn sendAllocator(self: *Client) Allocator { - return self.ws.send_arena.allocator(); - } - - pub fn sendJSON(self: *Client, message: anytype, opts: std.json.Stringify.Options) !void { - return self.ws.sendJSON(message, opts); - } - - pub fn sendJSONRaw(self: *Client, buf: std.ArrayList(u8)) !void { - return self.ws.sendJSONRaw(buf); - } -}; +} // Utils // -------- @@ -545,13 +271,6 @@ fn buildJSONVersionResponse( return try std.fmt.allocPrint(app.allocator, response_format, .{ body_len, host, port }); } -const empty_json_list_response = - "HTTP/1.1 200 OK\r\n" ++ - "Content-Length: 2\r\n" ++ - "Connection: Close\r\n" ++ - "Content-Type: application/json; charset=UTF-8\r\n\r\n" ++ - "[]"; - pub const timestamp = @import("datetime.zig").timestamp; pub const milliTimestamp = @import("datetime.zig").milliTimestamp; @@ -901,7 +620,7 @@ fn createTestClient() !TestClient { const TestClient = struct { stream: std.net.Stream, buf: [1024]u8 = undefined, - reader: Net.Reader(false), + reader: WsConnection.Reader(false), fn deinit(self: *TestClient) void { self.stream.close(); @@ -968,7 +687,7 @@ const TestClient = struct { "Sec-Websocket-Accept: flzHu2DevQ2dSCSVqKSii5e9C2o=\r\n\r\n", res); } - fn readWebsocketMessage(self: *TestClient) !?Net.Message { + fn readWebsocketMessage(self: *TestClient) !?WsConnection.Message { while (true) { const n = try self.stream.read(self.reader.readBuf()); if (n == 0) { diff --git a/src/browser/Browser.zig b/src/browser/Browser.zig index b1490fcb..c29c796e 100644 --- a/src/browser/Browser.zig +++ b/src/browser/Browser.zig @@ -40,37 +40,38 @@ app: *App, session: ?Session, allocator: Allocator, arena_pool: *ArenaPool, -http_client: *HttpClient, +http_client: HttpClient, // used by sessions to allocate pages. page_pool: std.heap.MemoryPool(Page), const InitOpts = struct { env: js.Env.InitOpts = .{}, - http_client: *HttpClient, }; -pub fn init(app: *App, opts: InitOpts) !Browser { +pub fn init(self: *Browser, app: *App, opts: InitOpts, cdp_client: ?HttpClient.CDPClient) !void { const allocator = app.allocator; var env = try js.Env.init(app, opts.env); errdefer env.deinit(); - return .{ + self.* = .{ .app = app, .env = env, .session = null, .allocator = allocator, .arena_pool = &app.arena_pool, - .http_client = opts.http_client, + .http_client = undefined, .page_pool = std.heap.MemoryPool(Page).init(allocator), }; + try self.http_client.init(allocator, &app.network, cdp_client); } pub fn deinit(self: *Browser) void { self.closeSession(); self.env.deinit(); self.page_pool.deinit(); + self.http_client.deinit(); } pub fn newSession(self: *Browser, notification: *Notification) !*Session { diff --git a/src/browser/Frame.zig b/src/browser/Frame.zig index 2c63e3af..8f2b52a9 100644 --- a/src/browser/Frame.zig +++ b/src/browser/Frame.zig @@ -325,7 +325,7 @@ pub fn init(self: *Frame, frame_id: u32, page: *Page, parent: ?*Frame) !void { errdefer self._style_manager.deinit(); const browser = session.browser; - self._script_manager = ScriptManager.init(browser.allocator, browser.http_client, self); + self._script_manager = ScriptManager.init(browser.allocator, &browser.http_client, self); errdefer self._script_manager.deinit(); self.js = try browser.env.createContext(self, .{ @@ -605,7 +605,7 @@ pub fn navigate(self: *Frame, request_url: [:0]const u8, opts: NavigateOpts) !vo return; } - var http_client = session.browser.http_client; + const http_client = &session.browser.http_client; self.url = try self.arena.dupeZ(u8, request_url); self.origin = try URL.getOrigin(self.arena, self.url); diff --git a/src/browser/HttpClient.zig b/src/browser/HttpClient.zig index 3423bdcd..fe619020 100644 --- a/src/browser/HttpClient.zig +++ b/src/browser/HttpClient.zig @@ -166,23 +166,21 @@ pub const CDPClient = struct { blocking_read_end: *const fn (*anyopaque) bool, }; -pub fn init(allocator: Allocator, network: *Network) !*Client { +pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client: ?CDPClient) !void { var transfer_pool = std.heap.MemoryPool(Transfer).init(allocator); errdefer transfer_pool.deinit(); - const client = try allocator.create(Client); - errdefer allocator.destroy(client); - var handles = try http.Handles.init(network.config); errdefer handles.deinit(); const http_proxy = network.config.httpProxy(); - client.* = .{ + self.* = Client{ .handles = handles, .network = network, .allocator = allocator, .transfer_pool = transfer_pool, + .cdp_client = cdp_client, .use_proxy = http_proxy != null, .http_proxy = http_proxy, @@ -197,25 +195,23 @@ pub fn init(allocator: Allocator, network: *Network) !*Client { .entry_layer = undefined, }; - var next = client.layer(); + var next = self.layer(); if (network.config.obeyRobots()) { - next = layerWith(&client.robots_layer, next); + next = layerWith(&self.robots_layer, next); } if (network.config.httpCacheDir() != null) { - next = layerWith(&client.cache_layer, next); + next = layerWith(&self.cache_layer, next); } - next = layerWith(&client.interception_layer, next); + next = layerWith(&self.interception_layer, next); if (network.config.webBotAuth() != null) { - next = layerWith(&client.web_bot_auth_layer, next); + next = layerWith(&self.web_bot_auth_layer, next); } - client.entry_layer = next; - - return client; + self.entry_layer = next; } pub fn deinit(self: *Client) void { @@ -226,8 +222,6 @@ pub fn deinit(self: *Client) void { self.clearUserAgentOverride(); self.robots_layer.deinit(self.allocator); - - self.allocator.destroy(self); } pub fn layer(self: *Client) Layer { diff --git a/src/browser/Runner.zig b/src/browser/Runner.zig index d7a66e64..009235bb 100644 --- a/src/browser/Runner.zig +++ b/src/browser/Runner.zig @@ -45,7 +45,7 @@ pub fn init(session: *Session, _: Opts) !Runner { return .{ .frame = frame, .session = session, - .http_client = session.browser.http_client, + .http_client = &session.browser.http_client, }; } diff --git a/src/browser/webapi/Worker.zig b/src/browser/webapi/Worker.zig index b1ad43bd..5a271793 100644 --- a/src/browser/webapi/Worker.zig +++ b/src/browser/webapi/Worker.zig @@ -92,7 +92,7 @@ pub fn init(url: []const u8, exec: *Execution) !*Worker { return self; } - const http_client = session.browser.http_client; + const http_client = &session.browser.http_client; http_client.request(.{ .ctx = self, .params = .{ diff --git a/src/browser/webapi/WorkerGlobalScope.zig b/src/browser/webapi/WorkerGlobalScope.zig index 1d5e203b..1034f0e6 100644 --- a/src/browser/webapi/WorkerGlobalScope.zig +++ b/src/browser/webapi/WorkerGlobalScope.zig @@ -130,7 +130,7 @@ pub fn init(worker: *Worker, url: [:0]const u8) !*WorkerGlobalScope { self._script_manager = ScriptManagerBase.init( arena, - session.browser.http_client, + &session.browser.http_client, .{ .worker = self }, ); @@ -361,7 +361,7 @@ fn importScript(self: *WorkerGlobalScope, arena: Allocator, url: [:0]const u8) ! const resolved_url = try URL.resolve(arena, self.url, url, .{}); - const http_client = session.browser.http_client; + const http_client = &session.browser.http_client; var headers = try http_client.newHeaders(); try self.headersForRequest(&headers); diff --git a/src/browser/webapi/net/Fetch.zig b/src/browser/webapi/net/Fetch.zig index e6c71a9f..6fe47ad7 100644 --- a/src/browser/webapi/net/Fetch.zig +++ b/src/browser/webapi/net/Fetch.zig @@ -77,7 +77,7 @@ pub fn init(input: Input, options: ?InitOpts, exec: *const Execution) !js.Promis }; const session = exec.context.page.session; - const http_client = session.browser.http_client; + const http_client = &session.browser.http_client; var headers = try http_client.newHeaders(); if (request._headers) |h| { try h.populateHttpHeader(exec.call_arena, &headers); diff --git a/src/browser/webapi/net/WebSocket.zig b/src/browser/webapi/net/WebSocket.zig index 0b49b693..4677aed8 100644 --- a/src/browser/webapi/net/WebSocket.zig +++ b/src/browser/webapi/net/WebSocket.zig @@ -115,7 +115,7 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket const resolved_url = try URL.resolve(arena, frame.base(), url, .{ .always_dupe = true, .encoding = frame.charset }); - const http_client = frame._session.browser.http_client; + const http_client = &frame._session.browser.http_client; const conn = http_client.network.newConnection() orelse { return error.NoFreeConnection; }; diff --git a/src/browser/webapi/net/XMLHttpRequest.zig b/src/browser/webapi/net/XMLHttpRequest.zig index 31a72cd0..3c5031f9 100644 --- a/src/browser/webapi/net/XMLHttpRequest.zig +++ b/src/browser/webapi/net/XMLHttpRequest.zig @@ -242,7 +242,7 @@ pub fn send(self: *XMLHttpRequest, body_: ?[]const u8) !void { } const session = exec.context.page.session; - const http_client = session.browser.http_client; + const http_client = &session.browser.http_client; var headers = try http_client.newHeaders(); // Only add cookies for same-origin or when withCredentials is true diff --git a/src/cdp/CDP.zig b/src/cdp/CDP.zig index 94b99edf..a56c5f30 100644 --- a/src/cdp/CDP.zig +++ b/src/cdp/CDP.zig @@ -19,8 +19,8 @@ const std = @import("std"); const lp = @import("lightpanda"); +const App = @import("../App.zig"); const Notification = @import("../Notification.zig"); -const Client = @import("../Server.zig").Client; const js = @import("../browser/js/js.zig"); const Browser = @import("../browser/Browser.zig"); const Session = @import("../browser/Session.zig"); @@ -29,12 +29,15 @@ const Mime = @import("../browser/Mime.zig"); const Element = @import("../browser/webapi/Element.zig"); const Label = @import("../browser/webapi/element/html/Label.zig"); const Request = @import("../browser/HttpClient.zig").Request; +const CDPClient = @import("../browser/HttpClient.zig").CDPClient; +const WsConnection = @import("../network/WsConnection.zig"); const Incrementing = @import("id.zig").Incrementing; const InterceptState = @import("domains/fetch.zig").InterceptState; const log = lp.log; const json = std.json; +const posix = std.posix; const Allocator = std.mem.Allocator; pub const URL_BASE = "chrome://newtab/"; @@ -48,10 +51,10 @@ const BrowserContextIdGen = Incrementing(u32, "BID"); // Generic so that we can inject mocks into it. const CDP = @This(); -// Used for sending message to the client and closing on error -client: *Client, - allocator: Allocator, +app: *App, + +ws: WsConnection, // The active browser browser: Browser, @@ -79,18 +82,18 @@ frame_arena: std.heap.ArenaAllocator, // (or altogether eliminate) our use of this. browser_context_arena: std.heap.ArenaAllocator, -pub fn init(client: *Client) !CDP { - const app = client.app; +pub fn init( + self: *CDP, + app: *App, + socket: posix.socket_t, + json_version_response: []const u8, +) !void { const allocator = app.allocator; - const browser = try Browser.init(app, .{ - .env = .{ .with_inspector = true }, - .http_client = client.http, - }); - errdefer browser.deinit(); - return .{ - .client = client, - .browser = browser, + self.* = .{ + .app = app, + .ws = undefined, + .browser = undefined, .allocator = allocator, .browser_context = null, .frame_arena = std.heap.ArenaAllocator.init(allocator), @@ -98,6 +101,17 @@ pub fn init(client: *Client) !CDP { .notification_arena = std.heap.ArenaAllocator.init(allocator), .browser_context_arena = std.heap.ArenaAllocator.init(allocator), }; + + try self.ws.init(socket, self.app.allocator, json_version_response); + errdefer self.ws.deinit(); + + try self.browser.init(app, .{ .env = .{ .with_inspector = true } }, .{ + .ctx = self, + .socket = socket, + .blocking_read_start = CDP.blockingReadStart, + .blocking_read = CDP.blockingRead, + .blocking_read_end = CDP.blockingReadStop, + }); } pub fn deinit(self: *CDP) void { @@ -109,6 +123,48 @@ pub fn deinit(self: *CDP) void { self.message_arena.deinit(); self.notification_arena.deinit(); self.browser_context_arena.deinit(); + self.ws.deinit(); +} + +pub fn blockingReadStart(ctx: *anyopaque) bool { + const self: *CDP = @ptrCast(@alignCast(ctx)); + self.ws.setBlocking(true) catch |err| { + log.warn(.app, "CDP blockingReadStart", .{ .err = err }); + return false; + }; + return true; +} + +pub fn blockingRead(ctx: *anyopaque) bool { + const self: *CDP = @ptrCast(@alignCast(ctx)); + return self.readSocket(); +} + +pub fn blockingReadStop(ctx: *anyopaque) bool { + const self: *CDP = @ptrCast(@alignCast(ctx)); + self.ws.setBlocking(false) catch |err| { + log.warn(.app, "CDP blockingReadStop", .{ .err = err }); + return false; + }; + return true; +} + +pub fn readSocket(self: *CDP) bool { + const n = self.ws.read() catch |err| { + log.warn(.app, "CDP read", .{ .err = err }); + return false; + }; + + if (n == 0) { + log.info(.app, "CDP disconnect", .{}); + return false; + } + + return self.ws.processMessages(self) catch false; +} + +pub fn sendJSON(self: *CDP, message: anytype) !void { + try self.ws.sendJSON(message, .{ .emit_null_optional_fields = false }); } pub fn handleMessage(self: *CDP, msg: []const u8) bool { @@ -133,6 +189,29 @@ pub fn pageWait(self: *CDP, ms: u32) !Session.Runner.CDPWaitResult { return runner.waitCDP(.{ .ms = ms }); } +pub fn tick(self: *CDP) !bool { + // Liveness is enforced by TCP keepalive configured in + // Network.acceptConnections; the wakeup lets V8 run or terminate. + const wait_ms: u32 = 1000; // 1s + + const result = self.pageWait(wait_ms) catch |wait_err| switch (wait_err) { + error.NoPage => { + const status = self.browser.http_client.tick(wait_ms) catch |err| { + log.err(.app, "http tick", .{ .err = err }); + return false; + }; + return status != .cdp_socket or self.readSocket(); + }, + else => return wait_err, + }; + + if (result == .cdp_socket) { + return self.readSocket(); + } + + return true; +} + // Called from above, in processMessage which handles client messages // but can also be called internally. For example, Target.sendMessageToTarget // calls back into dispatch to capture the response. @@ -303,12 +382,6 @@ pub fn sendEvent(self: *CDP, method: []const u8, p: anytype, opts: SendEventOpts }); } -pub fn sendJSON(self: *CDP, message: anytype) !void { - return self.client.sendJSON(message, .{ - .emit_null_optional_fields = false, - }); -} - pub const BrowserContext = struct { const Node = @import("Node.zig"); const AXNode = @import("AXNode.zig"); @@ -414,7 +487,7 @@ pub const BrowserContext = struct { errdefer notification.deinit(); const session = try cdp.browser.newSession(notification); - if (cdp.client.app.config.cookieFile()) |cookie_path| { + if (cdp.app.config.cookieFile()) |cookie_path| { lp.cookies.loadFromFile(session, cookie_path); } @@ -470,7 +543,7 @@ pub const BrowserContext = struct { // abort all intercepted requests before closing the session/page // since some of these might callback into the page/scriptmanager - const http_client = browser.http_client; + const http_client = &browser.http_client; for (self.intercept_state.pendingIntercepts()) |intercept| { defer { lp.assert( @@ -804,7 +877,7 @@ pub const BrowserContext = struct { }; const cdp = self.cdp; - const allocator = cdp.client.sendAllocator(); + const allocator = cdp.ws.send_arena.allocator(); const field = ",\"sessionId\":\""; @@ -830,7 +903,7 @@ pub const BrowserContext = struct { std.debug.assert(buf.items.len == message_len); } - try cdp.client.sendJSONRaw(buf); + try cdp.ws.sendJSONRaw(buf); } }; diff --git a/src/cdp/domains/emulation.zig b/src/cdp/domains/emulation.zig index e9d0a96b..4738e9ac 100644 --- a/src/cdp/domains/emulation.zig +++ b/src/cdp/domains/emulation.zig @@ -98,7 +98,7 @@ pub fn setUserAgentOverride(cmd: *CDP.Command) !void { }; const bc = cmd.browser_context orelse return error.BrowserContextNotLoaded; - const http_client = cmd.cdp.browser.http_client; + const http_client = &cmd.cdp.browser.http_client; try http_client.setUserAgentOverride(ua); bc.user_agent_changed = true; diff --git a/src/cdp/domains/fetch.zig b/src/cdp/domains/fetch.zig index edff1761..8eb3ab25 100644 --- a/src/cdp/domains/fetch.zig +++ b/src/cdp/domains/fetch.zig @@ -286,7 +286,7 @@ fn continueRequest(cmd: *CDP.Command) !void { } // todo: replace. - const client = bc.cdp.browser.http_client; + const client = &bc.cdp.browser.http_client; try client.interception_layer.continueRequest(client, request); return cmd.sendResult(null, .{}); } @@ -321,7 +321,7 @@ fn continueWithAuth(cmd: *CDP.Command) !void { .response = params.authChallengeResponse.response, }); - const client = bc.cdp.browser.http_client; + const client = &bc.cdp.browser.http_client; if (params.authChallengeResponse.response != .ProvideCredentials) { transfer.abortAuthChallenge(); @@ -385,7 +385,7 @@ fn fulfillRequest(cmd: *CDP.Command) !void { body = buf; } - const client = bc.cdp.browser.http_client; + const client = &bc.cdp.browser.http_client; try client.interception_layer.fulfillRequest(client, request, params.responseCode, params.responseHeaders orelse &.{}, body); return cmd.sendResult(null, .{}); } @@ -403,7 +403,7 @@ fn failRequest(cmd: *CDP.Command) !void { const pending = intercept_state.remove(request_id) orelse return error.RequestNotFound; const request = pending.request; - const client = bc.cdp.browser.http_client; + const client = &bc.cdp.browser.http_client; defer client.interception_layer.abortRequest(client, request); log.info(.cdp, "request intercept", .{ diff --git a/src/cdp/testing.zig b/src/cdp/testing.zig index 04489d98..db22e736 100644 --- a/src/cdp/testing.zig +++ b/src/cdp/testing.zig @@ -22,6 +22,8 @@ const posix = std.posix; const CDP = @import("CDP.zig"); const Server = @import("../Server.zig"); +const Net = @import("../network/WsConnection.zig"); +const HttpClient = @import("../browser/HttpClient.zig"); const base = @import("../testing.zig"); pub const allocator = base.allocator; @@ -37,26 +39,25 @@ pub const LogFilter = base.LogFilter; const TestContext = struct { read_at: usize = 0, read_buf: [1024 * 32]u8 = undefined, - cdp_: ?CDP = null, - client: Server.Client, + cdp_: CDP = undefined, + cdp_initialized: bool = false, + cdp_socket: posix.socket_t, socket: posix.socket_t, received: std.ArrayList(json.Value) = .empty, received_raw: std.ArrayList([]const u8) = .empty, pub fn deinit(self: *TestContext) void { - if (self.cdp_) |*c| { - c.deinit(); - } - self.client.deinit(); + if (self.cdp_initialized) self.cdp_.deinit(); posix.close(self.socket); base.reset(); } pub fn cdp(self: *TestContext) *CDP { - if (self.cdp_ == null) { - self.cdp_ = CDP.init(&self.client) catch |err| @panic(@errorName(err)); + if (!self.cdp_initialized) { + self.cdp_.init(base.test_app, self.cdp_socket, "json-version") catch |err| @panic(@errorName(err)); + self.cdp_initialized = true; } - return &self.cdp_.?; + return &self.cdp_; } const BrowserContextOpts = struct { @@ -202,12 +203,10 @@ const TestContext = struct { return; } - if (self.cdp_) |*cdp__| { - if (cdp__.browser_context) |*bc| { - if (bc.session.hasPage()) { - var runner = try bc.session.runner(.{}); - _ = try runner.tick(.{ .ms = 1000 }); - } + if (self.cdp_.browser_context) |*bc| { + if (bc.session.hasPage()) { + var runner = try bc.session.runner(.{}); + _ = try runner.tick(.{ .ms = 1000 }); } } std.Thread.sleep(5 * std.time.ns_per_ms); @@ -315,10 +314,8 @@ pub fn context() !TestContext { try posix.setsockopt(pair[1], posix.SOL.SOCKET, posix.SO.RCVBUF, &std.mem.toBytes(@as(c_int, 32_768))); try posix.setsockopt(pair[1], posix.SOL.SOCKET, posix.SO.SNDBUF, &std.mem.toBytes(@as(c_int, 32_768))); - const client = try Server.Client.init(pair[1], base.arena_allocator, base.test_app, "json-version"); - return .{ - .client = client, + .cdp_socket = pair[1], .socket = pair[0], }; } diff --git a/src/lightpanda.zig b/src/lightpanda.zig index f50e7f5d..95601503 100644 --- a/src/lightpanda.zig +++ b/src/lightpanda.zig @@ -248,29 +248,26 @@ noinline fn assertionFailure(comptime ctx: []const u8, args: anytype) noreturn { // Reference counting helper pub fn RC(comptime T: type) type { return struct { - _refs: T = 0, + _refs: std.atomic.Value(T) = .init(0), pub fn init(refs: T) @This() { - return .{ ._refs = refs }; + return .{ ._refs = .init(refs) }; } pub fn acquire(self: *@This()) void { - self._refs += 1; + _ = self._refs.fetchAdd(1, .monotonic); } pub fn release(self: *@This(), value: anytype, page: *Page) void { - assert(self._refs > 0, "release overflow", .{ .type = @typeName(@TypeOf(value)) }); - - const refs = self._refs - 1; - self._refs = refs; - if (refs > 0) { - return; + const prev = self._refs.fetchSub(1, .acq_rel); + assert(prev > 0, "release overflow", .{ .type = @typeName(@TypeOf(value)) }); + if (prev == 1) { + value.deinit(page); } - value.deinit(page); } pub fn format(self: @This(), writer: *std.Io.Writer) !void { - return writer.print("{d}", .{self._refs}); + return writer.print("{d}", .{self._refs.load(.monotonic)}); } }; } diff --git a/src/main.zig b/src/main.zig index 0a955839..450e8247 100644 --- a/src/main.zig +++ b/src/main.zig @@ -210,13 +210,8 @@ const FetchTerminator = struct { fn fetchThread(app: *App, ft: *FetchTerminator, url: [:0]const u8, fetch_opts: lp.FetchOpts) void { defer app.network.stop(); - const http_client = lp.HttpClient.init(app.allocator, &app.network) catch |err| { - log.fatal(.app, "http client init error", .{ .err = err }); - return; - }; - defer http_client.deinit(); - - var browser = lp.Browser.init(app, .{ .http_client = http_client }) catch |err| { + var browser: lp.Browser = undefined; + browser.init(app, .{}, null) catch |err| { log.fatal(.app, "browser init error", .{ .err = err }); return; }; diff --git a/src/main_legacy_test.zig b/src/main_legacy_test.zig index 8e02f4ff..6274a31c 100644 --- a/src/main_legacy_test.zig +++ b/src/main_legacy_test.zig @@ -44,10 +44,8 @@ pub fn main() !void { var test_arena = std.heap.ArenaAllocator.init(allocator); defer test_arena.deinit(); - const http_client = try lp.HttpClient.init(allocator, &app.network); - defer http_client.deinit(); - - var browser = try lp.Browser.init(app, .{ .http_client = http_client }); + var browser: lp.Browser = undefined; + try browser.init(app, .{}, null); defer browser.deinit(); const notification = try lp.Notification.init(allocator); diff --git a/src/mcp/Server.zig b/src/mcp/Server.zig index 59668398..9a5a285e 100644 --- a/src/mcp/Server.zig +++ b/src/mcp/Server.zig @@ -3,7 +3,6 @@ const std = @import("std"); const lp = @import("lightpanda"); const App = @import("../App.zig"); -const HttpClient = @import("../browser/HttpClient.zig"); const testing = @import("../testing.zig"); const protocol = @import("protocol.zig"); const router = @import("router.zig"); @@ -14,7 +13,6 @@ const Self = @This(); allocator: std.mem.Allocator, app: *App, -http_client: *HttpClient, notification: *lp.Notification, browser: lp.Browser, session: *lp.Session, @@ -25,30 +23,26 @@ mutex: std.Thread.Mutex = .{}, aw: std.io.Writer.Allocating, pub fn init(allocator: std.mem.Allocator, app: *App, writer: *std.io.Writer) !*Self { - const http_client = try HttpClient.init(allocator, &app.network); - errdefer http_client.deinit(); - const notification = try lp.Notification.init(allocator); errdefer notification.deinit(); const self = try allocator.create(Self); errdefer allocator.destroy(self); - var browser = try lp.Browser.init(app, .{ .http_client = http_client }); - errdefer browser.deinit(); - self.* = .{ .allocator = allocator, .app = app, .writer = writer, - .browser = browser, + .browser = undefined, .aw = .init(allocator), - .http_client = http_client, .notification = notification, .session = undefined, .node_registry = CDPNode.Registry.init(allocator), }; + try self.browser.init(app, .{}, null); + errdefer self.browser.deinit(); + self.session = try self.browser.newSession(self.notification); if (app.config.cookieFile()) |cookie_path| { @@ -67,7 +61,6 @@ pub fn deinit(self: *Self) void { self.aw.deinit(); self.browser.deinit(); self.notification.deinit(); - self.http_client.deinit(); self.allocator.destroy(self); } diff --git a/src/network/Network.zig b/src/network/Network.zig index 4057641c..991eada5 100644 --- a/src/network/Network.zig +++ b/src/network/Network.zig @@ -70,6 +70,7 @@ ws_mutex: std.Thread.Mutex = .{}, pollfds: []posix.pollfd, listener: ?Listener = null, +accept: std.atomic.Value(bool) = .init(true), // Wakeup pipe: workers write to [1], main thread polls [0] wakeup_pipe: [2]posix.fd_t = .{ -1, -1 }, @@ -355,6 +356,10 @@ pub fn bind( ctx: *anyopaque, on_accept: *const fn (ctx: *anyopaque, socket: posix.socket_t) void, ) !void { + if (self.listener != null) return error.TooManyListeners; + + self.accept.store(true, .release); + const flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK; const listener = try posix.socket(address.any.family, flags, posix.IPPROTO.TCP); errdefer posix.close(listener); @@ -374,8 +379,6 @@ pub fn bind( try posix.getsockname(listener, @ptrCast(&bound), &bound_len); address.* = net.Address.initPosix(@ptrCast(@alignCast(&bound))); - if (self.listener != null) return error.TooManyListeners; - self.listener = .{ .socket = listener, .ctx = ctx, @@ -388,6 +391,11 @@ pub fn bind( }; } +pub fn unbind(self: *Network) void { + self.accept.store(false, .release); + self.wakeupPoll(); +} + pub fn onTick(self: *Network, ctx: *anyopaque, callback: *const fn (*anyopaque) void) void { self.callbacks_mutex.lock(); defer self.callbacks_mutex.unlock(); @@ -424,6 +432,12 @@ pub fn run(self: *Network) void { // telemetry, but we stop accepting new connections. It is the responsibility // of external code to terminate its requests upon shutdown. while (true) { + if (self.listener != null and !self.accept.load(.acquire)) { + posix.close(self.listener.?.socket); + self.listener = null; + self.pollfds[1] = .{ .fd = -1, .events = 0, .revents = 0 }; + } + self.drainQueue(); if (self.multi) |multi| { @@ -574,6 +588,28 @@ fn acceptConnections(self: *Network) void { } }; + // Liveness is enforced at the TCP layer via keepalive probes sent by the + // kernel. This is transparent to CDP clients — unlike a WebSocket ping, which + // go-rod panics on and chromedp logs as "malformed". Tunables in Config.zig. + posix.setsockopt(socket, posix.SOL.SOCKET, posix.SO.KEEPALIVE, &std.mem.toBytes(@as(c_int, 1))) catch |err| { + log.warn(.app, "SO_KEEPALIVE", .{ .err = err }); + return; + }; + + const option = switch (@import("builtin").os.tag) { + .macos, .ios => posix.TCP.KEEPALIVE, + else => posix.TCP.KEEPIDLE, + }; + posix.setsockopt(socket, posix.IPPROTO.TCP, option, &std.mem.toBytes(Config.CDP_KEEPALIVE_IDLE_S)) catch |err| { + log.warn(.app, "TCP_KEEPIDLE", .{ .err = err }); + }; + posix.setsockopt(socket, posix.IPPROTO.TCP, posix.TCP.KEEPINTVL, &std.mem.toBytes(Config.CDP_KEEPALIVE_INTVL_S)) catch |err| { + log.warn(.app, "TCP_KEEPINTVL", .{ .err = err }); + }; + posix.setsockopt(socket, posix.IPPROTO.TCP, posix.TCP.KEEPCNT, &std.mem.toBytes(Config.CDP_KEEPALIVE_CNT)) catch |err| { + log.warn(.app, "TCP_KEEPCNT", .{ .err = err }); + }; + listener.onAccept(listener.ctx, socket); } } diff --git a/src/network/websocket.zig b/src/network/WsConnection.zig similarity index 51% rename from src/network/websocket.zig rename to src/network/WsConnection.zig index 4ecb5daa..d4598904 100644 --- a/src/network/websocket.zig +++ b/src/network/WsConnection.zig @@ -24,7 +24,8 @@ const ArenaAllocator = std.heap.ArenaAllocator; const log = @import("lightpanda").log; const assert = @import("lightpanda").assert; -const CDP_MAX_MESSAGE_SIZE = @import("../Config.zig").CDP_MAX_MESSAGE_SIZE; +const Config = @import("../Config.zig"); +const CDP_MAX_MESSAGE_SIZE = Config.CDP_MAX_MESSAGE_SIZE; const Fragments = struct { type: Message.Type, @@ -305,301 +306,429 @@ pub fn Reader(comptime EXPECT_MASK: bool) type { }; } -pub const WsConnection = struct { - // CLOSE, 2 length, code - const CLOSE_NORMAL = [_]u8{ 136, 2, 3, 232 }; // code: 1000 - const CLOSE_GOING_AWAY = [_]u8{ 136, 2, 3, 233 }; // code: 1001 - const CLOSE_TOO_BIG = [_]u8{ 136, 2, 3, 241 }; // 1009 - const CLOSE_PROTOCOL_ERROR = [_]u8{ 136, 2, 3, 234 }; //code: 1002 - // "private-use" close codes must be from 4000-49999 - const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000 +pub const WsConnection = @This(); +// CLOSE, 2 length, code +const CLOSE_NORMAL = [_]u8{ 136, 2, 3, 232 }; // code: 1000 +const CLOSE_GOING_AWAY = [_]u8{ 136, 2, 3, 233 }; // code: 1001 +const CLOSE_TOO_BIG = [_]u8{ 136, 2, 3, 241 }; // 1009 +const CLOSE_PROTOCOL_ERROR = [_]u8{ 136, 2, 3, 234 }; //code: 1002 +// "private-use" close codes must be from 4000-49999 +const CLOSE_TIMEOUT = [_]u8{ 136, 2, 15, 160 }; // code: 4000 + +socket: posix.socket_t, +socket_flags: usize, +reader: Reader(true), +send_arena: ArenaAllocator, +json_version_response: []const u8, + +pub fn init( + self: *WsConnection, socket: posix.socket_t, - socket_flags: usize, - reader: Reader(true), - send_arena: ArenaAllocator, + allocator: Allocator, json_version_response: []const u8, +) !void { + const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0); + const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true })); + if (builtin.is_test == false) { + assert(socket_flags & nonblocking == nonblocking, "WsConnection.init blocking", .{}); + } - pub fn init(socket: posix.socket_t, allocator: Allocator, json_version_response: []const u8) !WsConnection { - const socket_flags = try posix.fcntl(socket, posix.F.GETFL, 0); - const nonblocking = @as(u32, @bitCast(posix.O{ .NONBLOCK = true })); - if (builtin.is_test == false) { - assert(socket_flags & nonblocking == nonblocking, "WsConnection.init blocking", .{}); - } + var reader = try Reader(true).init(allocator); + errdefer reader.deinit(); - var reader = try Reader(true).init(allocator); - errdefer reader.deinit(); + self.* = .{ + .socket = socket, + .socket_flags = socket_flags, + .reader = reader, + .send_arena = ArenaAllocator.init(allocator), + .json_version_response = json_version_response, + }; +} - return .{ - .socket = socket, - .socket_flags = socket_flags, - .reader = reader, - .send_arena = ArenaAllocator.init(allocator), - .json_version_response = json_version_response, +pub fn deinit(self: *WsConnection) void { + self.reader.deinit(); + self.send_arena.deinit(); +} + +pub fn send(self: *WsConnection, data: []const u8) !void { + var pos: usize = 0; + var changed_to_blocking: bool = false; + defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 }); + + defer if (changed_to_blocking) { + // We had to change our socket to blocking me to get our write out + // We need to change it back to non-blocking. + _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| { + log.err(.app, "ws restore nonblocking", .{ .err = err }); }; - } + }; - pub fn deinit(self: *WsConnection) void { - self.reader.deinit(); - self.send_arena.deinit(); - } - - pub fn send(self: *WsConnection, data: []const u8) !void { - var pos: usize = 0; - var changed_to_blocking: bool = false; - defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 }); - - defer if (changed_to_blocking) { - // We had to change our socket to blocking me to get our write out - // We need to change it back to non-blocking. - _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| { - log.err(.app, "ws restore nonblocking", .{ .err = err }); - }; + LOOP: while (pos < data.len) { + const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) { + error.WouldBlock => { + // self.socket is nonblocking, because we don't want to block + // reads. But our life is a lot easier if we block writes, + // largely, because we don't have to maintain a queue of pending + // writes (which would each need their own allocations). So + // if we get a WouldBlock error, we'll switch the socket to + // blocking and switch it back to non-blocking after the write + // is complete. Doesn't seem particularly efficiently, but + // this should virtually never happen. + assert(changed_to_blocking == false, "WsConnection.double block", .{}); + changed_to_blocking = true; + _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))); + continue :LOOP; + }, + else => return err, }; - LOOP: while (pos < data.len) { - const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) { - error.WouldBlock => { - // self.socket is nonblocking, because we don't want to block - // reads. But our life is a lot easier if we block writes, - // largely, because we don't have to maintain a queue of pending - // writes (which would each need their own allocations). So - // if we get a WouldBlock error, we'll switch the socket to - // blocking and switch it back to non-blocking after the write - // is complete. Doesn't seem particularly efficiently, but - // this should virtually never happen. - assert(changed_to_blocking == false, "WsConnection.double block", .{}); - changed_to_blocking = true; - _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))); - continue :LOOP; - }, - else => return err, - }; - - if (written == 0) { - return error.Closed; - } - pos += written; + if (written == 0) { + return error.Closed; } + pos += written; } +} - const EMPTY_PONG = [_]u8{ 138, 0 }; +const EMPTY_PONG = [_]u8{ 138, 0 }; - fn sendPong(self: *WsConnection, data: []const u8) !void { - if (data.len == 0) { - return self.send(&EMPTY_PONG); +fn sendPong(self: *WsConnection, data: []const u8) !void { + if (data.len == 0) { + return self.send(&EMPTY_PONG); + } + var header_buf: [10]u8 = undefined; + const header = websocketHeader(&header_buf, .pong, data.len); + + const allocator = self.send_arena.allocator(); + const framed = try allocator.alloc(u8, header.len + data.len); + @memcpy(framed[0..header.len], header); + @memcpy(framed[header.len..], data); + return self.send(framed); +} + +// called by CDP +// Websocket frames have a variable length header. For server-client, +// it could be anywhere from 2 to 10 bytes. Our IO.Loop doesn't have +// writev, so we need to get creative. We'll JSON serialize to a +// buffer, where the first 10 bytes are reserved. We can then backfill +// the header and send the slice. +pub fn sendJSON(self: *WsConnection, message: anytype, opts: std.json.Stringify.Options) !void { + const allocator = self.send_arena.allocator(); + + var aw = try std.Io.Writer.Allocating.initCapacity(allocator, 512); + + // reserve space for the maximum possible header + try aw.writer.writeAll(&[_]u8{0} ** 10); + try std.json.Stringify.value(message, opts, &aw.writer); + const framed = fillWebsocketHeader(aw.toArrayList()); + return self.send(framed); +} + +pub fn sendJSONRaw( + self: *WsConnection, + buf: std.ArrayList(u8), +) !void { + // Dangerous API!. We assume the caller has reserved the first 10 + // bytes in `buf`. + const framed = fillWebsocketHeader(buf); + return self.send(framed); +} + +pub const HttpResult = enum { more, upgraded, close }; + +pub fn handshake(self: *WsConnection) !bool { + // Liveness is enforced by TCP keepalive configured in + // Server.setTcpKeepalive; a dead peer surfaces as a poll error or + // EOF from read(). The poll blocks for ~24 days rather than tracking + // an app-level timeout. Capped at i32-max because posix.poll narrows + // to c_int. + const wait_ms: i32 = std.math.maxInt(i32); + while (true) { + var pfds = [_]posix.pollfd{.{ + .fd = self.socket, + .events = posix.POLL.IN, + .revents = 0, + }}; + const n = try posix.poll(&pfds, wait_ms); + if (n == 0) { + log.info(.app, "CDP timeout", .{}); + return false; } - var header_buf: [10]u8 = undefined; - const header = websocketHeader(&header_buf, .pong, data.len); - - const allocator = self.send_arena.allocator(); - const framed = try allocator.alloc(u8, header.len + data.len); - @memcpy(framed[0..header.len], header); - @memcpy(framed[header.len..], data); - return self.send(framed); - } - - // called by CDP - // Websocket frames have a variable length header. For server-client, - // it could be anywhere from 2 to 10 bytes. Our IO.Loop doesn't have - // writev, so we need to get creative. We'll JSON serialize to a - // buffer, where the first 10 bytes are reserved. We can then backfill - // the header and send the slice. - pub fn sendJSON(self: *WsConnection, message: anytype, opts: std.json.Stringify.Options) !void { - const allocator = self.send_arena.allocator(); - - var aw = try std.Io.Writer.Allocating.initCapacity(allocator, 512); - - // reserve space for the maximum possible header - try aw.writer.writeAll(&.{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }); - try std.json.Stringify.value(message, opts, &aw.writer); - const framed = fillWebsocketHeader(aw.toArrayList()); - return self.send(framed); - } - - pub fn sendJSONRaw( - self: *WsConnection, - buf: std.ArrayList(u8), - ) !void { - // Dangerous API!. We assume the caller has reserved the first 10 - // bytes in `buf`. - const framed = fillWebsocketHeader(buf); - return self.send(framed); - } - - pub fn read(self: *WsConnection) !usize { - const n = try posix.read(self.socket, self.reader.readBuf()); - self.reader.len += n; - return n; - } - - pub fn processMessages(self: *WsConnection, handler: anytype) !bool { - var reader = &self.reader; - while (true) { - const msg = reader.next() catch |err| { - switch (err) { - error.TooLarge => self.send(&CLOSE_TOO_BIG) catch {}, - error.NotMasked => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, - error.ReservedFlags => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, - error.InvalidMessageType => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, - error.ControlTooLarge => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, - error.InvalidContinuation => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, - error.NestedFragmentation => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, - error.OutOfMemory => {}, // don't borther trying to send an error in this case - } - return err; - } orelse break; - - switch (msg.type) { - .pong => {}, - .ping => try self.sendPong(msg.data), - .close => { - self.send(&CLOSE_NORMAL) catch {}; - return false; - }, - .text, .binary => if (handler.handleMessage(msg.data) == false) { - return false; - }, - } - if (msg.cleanup_fragment) { - reader.cleanup(); - } - } - - // We might have read part of the next message. Our reader potentially - // has to move data around in its buffer to make space. - reader.compact(); - return true; - } - - pub fn upgrade(self: *WsConnection, request: []u8) !void { - // our caller already confirmed that we have a trailing \r\n\r\n - const request_line_end = std.mem.indexOfScalar(u8, request, '\r') orelse unreachable; - const request_line = request[0..request_line_end]; - - if (!std.ascii.endsWithIgnoreCase(request_line, "http/1.1")) { - return error.InvalidProtocol; - } - - // we need to extract the sec-websocket-key value - var key: []const u8 = ""; - - // we need to make sure that we got all the necessary headers + values - var required_headers: u8 = 0; - - // can't std.mem.split because it forces the iterated value to be const - // (we could @constCast...) - - var buf = request[request_line_end + 2 ..]; - - while (buf.len > 4) { - const index = std.mem.indexOfScalar(u8, buf, '\r') orelse unreachable; - const separator = std.mem.indexOfScalar(u8, buf[0..index], ':') orelse return error.InvalidRequest; - - const name = std.mem.trim(u8, toLower(buf[0..separator]), &std.ascii.whitespace); - const value = std.mem.trim(u8, buf[(separator + 1)..index], &std.ascii.whitespace); - - if (std.mem.eql(u8, name, "upgrade")) { - if (!std.ascii.eqlIgnoreCase("websocket", value)) { - return error.InvalidUpgradeHeader; - } - required_headers |= 1; - } else if (std.mem.eql(u8, name, "sec-websocket-version")) { - if (value.len != 2 or value[0] != '1' or value[1] != '3') { - return error.InvalidVersionHeader; - } - required_headers |= 2; - } else if (std.mem.eql(u8, name, "connection")) { - // find if connection header has upgrade in it, example header: - // Connection: keep-alive, Upgrade - if (std.ascii.indexOfIgnoreCase(value, "upgrade") == null) { - return error.InvalidConnectionHeader; - } - required_headers |= 4; - } else if (std.mem.eql(u8, name, "sec-websocket-key")) { - key = value; - required_headers |= 8; - } - - const next = index + 2; - buf = buf[next..]; - } - - if (required_headers != 15) { - return error.MissingHeaders; - } - - // our caller has already made sure this request ended in \r\n\r\n - // so it isn't something we need to check again - - const alloc = self.send_arena.allocator(); - - const response = blk: { - // Response to an upgrade request is always this, with - // the Sec-Websocket-Accept value a spacial sha1 hash of the - // request "sec-websocket-version" and a magic value. - - const template = - "HTTP/1.1 101 Switching Protocols\r\n" ++ - "Upgrade: websocket\r\n" ++ - "Connection: upgrade\r\n" ++ - "Sec-Websocket-Accept: 0000000000000000000000000000\r\n\r\n"; - - // The response will be sent via the IO Loop and thus has to have its - // own lifetime. - const res = try alloc.dupe(u8, template); - - // magic response - const key_pos = res.len - 32; - var h: [20]u8 = undefined; - var hasher = std.crypto.hash.Sha1.init(.{}); - hasher.update(key); - // websocket spec always used this value - hasher.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); - hasher.final(&h); - - _ = std.base64.standard.Encoder.encode(res[key_pos .. key_pos + 28], h[0..]); - - break :blk res; + const read_bytes = self.read() catch |err| { + log.warn(.app, "CDP read", .{ .err = err }); + return false; }; - - return self.send(response); - } - - pub fn sendHttpError(self: *WsConnection, comptime status: u16, comptime body: []const u8) void { - const response = std.fmt.comptimePrint( - "HTTP/1.1 {d} \r\nConnection: Close\r\nContent-Length: {d}\r\n\r\n{s}", - .{ status, body.len, body }, - ); - - // we're going to close this connection anyways, swallowing any - // error seems safe - self.send(response) catch {}; - } - - pub fn getAddress(self: *WsConnection) !std.net.Address { - var address: std.net.Address = undefined; - var socklen: posix.socklen_t = @sizeOf(std.net.Address); - try posix.getpeername(self.socket, &address.any, &socklen); - return address; - } - - pub fn sendClose(self: *WsConnection) void { - self.send(&CLOSE_GOING_AWAY) catch {}; - } - - pub fn shutdown(self: *WsConnection) void { - posix.shutdown(self.socket, .recv) catch {}; - } - - pub fn setBlocking(self: *WsConnection, blocking: bool) !void { - if (blocking) { - _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))); - } else { - _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags); + if (read_bytes == 0) { + log.info(.app, "CDP disconnect", .{}); + return false; + } + const result = self.processHttpRequest() catch return false; + switch (result) { + .more => continue, + .upgraded => return true, + .close => return false, } } -}; +} + +pub fn read(self: *WsConnection) !usize { + const n = try posix.read(self.socket, self.reader.readBuf()); + self.reader.len += n; + return n; +} + +fn processHttpRequest(self: *WsConnection) !HttpResult { + assert(self.reader.pos == 0, "WsConnection.HTTP pos", .{ .pos = self.reader.pos }); + const request = self.reader.buf[0..self.reader.len]; + + if (request.len > Config.CDP_MAX_HTTP_REQUEST_SIZE) { + self.sendHttpError(413, "Request too large"); + return error.RequestTooLarge; + } + + // we're only expecting [body-less] GET requests. + if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) { + // we need more data, put any more data here + return .more; + } + + // the next incoming data can go to the front of our buffer + defer self.reader.len = 0; + return self.handleHttpRequest(request) catch |err| { + switch (err) { + error.NotFound => self.sendHttpError(404, "Not found"), + error.InvalidRequest => self.sendHttpError(400, "Invalid request"), + error.InvalidProtocol => self.sendHttpError(400, "Invalid HTTP protocol"), + error.MissingHeaders => self.sendHttpError(400, "Missing required header"), + error.InvalidUpgradeHeader => self.sendHttpError(400, "Unsupported upgrade type"), + error.InvalidVersionHeader => self.sendHttpError(400, "Invalid websocket version"), + error.InvalidConnectionHeader => self.sendHttpError(400, "Invalid connection header"), + else => { + log.err(.app, "server 500", .{ .err = err, .req = request[0..@min(100, request.len)] }); + self.sendHttpError(500, "Internal Server Error"); + }, + } + return err; + }; +} + +fn handleHttpRequest(self: *WsConnection, request: []u8) !HttpResult { + if (request.len < 18) { + // 18 is [generously] the smallest acceptable HTTP request + return error.InvalidRequest; + } + + if (std.mem.eql(u8, request[0..4], "GET ") == false) { + return error.NotFound; + } + + const url_end = std.mem.indexOfScalarPos(u8, request, 4, ' ') orelse { + return error.InvalidRequest; + }; + + const url = request[4..url_end]; + + if (std.mem.eql(u8, url, "/")) { + try self.upgrade(request); + return .upgraded; + } + + if (std.mem.eql(u8, url, "/json/version") or std.mem.eql(u8, url, "/json/version/")) { + try self.send(self.json_version_response); + // Chromedp (a Go driver) does an http request to /json/version + // then to / (websocket upgrade) using a different connection. + // Since we only allow 1 connection at a time, the 2nd one (the + // websocket upgrade) blocks until the first one times out. + // We can avoid that by closing the connection. json_version_response + // has a Connection: Close header too. + self.shutdown(); + return .close; + } + + if (std.mem.eql(u8, url, "/json/list") or std.mem.eql(u8, url, "/json/list/") or + std.mem.eql(u8, url, "/json") or std.mem.eql(u8, url, "/json/")) + { + try self.send(empty_json_list_response); + self.shutdown(); + return .close; + } + + return error.NotFound; +} + +const empty_json_list_response = + "HTTP/1.1 200 OK\r\n" ++ + "Content-Length: 2\r\n" ++ + "Connection: Close\r\n" ++ + "Content-Type: application/json; charset=UTF-8\r\n\r\n" ++ + "[]"; + +pub fn processMessages(self: *WsConnection, handler: anytype) !bool { + var reader = &self.reader; + while (true) { + const msg = reader.next() catch |err| { + switch (err) { + error.TooLarge => self.send(&CLOSE_TOO_BIG) catch {}, + error.NotMasked => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, + error.ReservedFlags => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, + error.InvalidMessageType => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, + error.ControlTooLarge => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, + error.InvalidContinuation => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, + error.NestedFragmentation => self.send(&CLOSE_PROTOCOL_ERROR) catch {}, + error.OutOfMemory => {}, // don't borther trying to send an error in this case + } + return err; + } orelse break; + + switch (msg.type) { + .pong => {}, + .ping => try self.sendPong(msg.data), + .close => { + self.send(&CLOSE_NORMAL) catch {}; + return false; + }, + .text, .binary => if (handler.handleMessage(msg.data) == false) { + return false; + }, + } + if (msg.cleanup_fragment) { + reader.cleanup(); + } + } + + // We might have read part of the next message. Our reader potentially + // has to move data around in its buffer to make space. + reader.compact(); + return true; +} + +pub fn upgrade(self: *WsConnection, request: []u8) !void { + // our caller already confirmed that we have a trailing \r\n\r\n + const request_line_end = std.mem.indexOfScalar(u8, request, '\r') orelse unreachable; + const request_line = request[0..request_line_end]; + + if (!std.ascii.endsWithIgnoreCase(request_line, "http/1.1")) { + return error.InvalidProtocol; + } + + // we need to extract the sec-websocket-key value + var key: []const u8 = ""; + + // we need to make sure that we got all the necessary headers + values + var required_headers: u8 = 0; + + // can't std.mem.split because it forces the iterated value to be const + // (we could @constCast...) + + var buf = request[request_line_end + 2 ..]; + + while (buf.len > 4) { + const index = std.mem.indexOfScalar(u8, buf, '\r') orelse unreachable; + const separator = std.mem.indexOfScalar(u8, buf[0..index], ':') orelse return error.InvalidRequest; + + const name = std.mem.trim(u8, toLower(buf[0..separator]), &std.ascii.whitespace); + const value = std.mem.trim(u8, buf[(separator + 1)..index], &std.ascii.whitespace); + + if (std.mem.eql(u8, name, "upgrade")) { + if (!std.ascii.eqlIgnoreCase("websocket", value)) { + return error.InvalidUpgradeHeader; + } + required_headers |= 1; + } else if (std.mem.eql(u8, name, "sec-websocket-version")) { + if (value.len != 2 or value[0] != '1' or value[1] != '3') { + return error.InvalidVersionHeader; + } + required_headers |= 2; + } else if (std.mem.eql(u8, name, "connection")) { + // find if connection header has upgrade in it, example header: + // Connection: keep-alive, Upgrade + if (std.ascii.indexOfIgnoreCase(value, "upgrade") == null) { + return error.InvalidConnectionHeader; + } + required_headers |= 4; + } else if (std.mem.eql(u8, name, "sec-websocket-key")) { + key = value; + required_headers |= 8; + } + + const next = index + 2; + buf = buf[next..]; + } + + if (required_headers != 15) { + return error.MissingHeaders; + } + + // our caller has already made sure this request ended in \r\n\r\n + // so it isn't something we need to check again + + const alloc = self.send_arena.allocator(); + + const response = blk: { + // Response to an upgrade request is always this, with + // the Sec-Websocket-Accept value a spacial sha1 hash of the + // request "sec-websocket-version" and a magic value. + + const template = + "HTTP/1.1 101 Switching Protocols\r\n" ++ + "Upgrade: websocket\r\n" ++ + "Connection: upgrade\r\n" ++ + "Sec-Websocket-Accept: 0000000000000000000000000000\r\n\r\n"; + + // The response will be sent via the IO Loop and thus has to have its + // own lifetime. + const res = try alloc.dupe(u8, template); + + // magic response + const key_pos = res.len - 32; + var h: [20]u8 = undefined; + var hasher = std.crypto.hash.Sha1.init(.{}); + hasher.update(key); + // websocket spec always used this value + hasher.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); + hasher.final(&h); + + _ = std.base64.standard.Encoder.encode(res[key_pos .. key_pos + 28], h[0..]); + + break :blk res; + }; + + return self.send(response); +} + +pub fn sendHttpError(self: *WsConnection, comptime status: u16, comptime body: []const u8) void { + const response = std.fmt.comptimePrint( + "HTTP/1.1 {d} \r\nConnection: Close\r\nContent-Length: {d}\r\n\r\n{s}", + .{ status, body.len, body }, + ); + + // we're going to close this connection anyways, swallowing any + // error seems safe + self.send(response) catch {}; +} + +pub fn getAddress(self: *WsConnection) !std.net.Address { + var address: std.net.Address = undefined; + var socklen: posix.socklen_t = @sizeOf(std.net.Address); + try posix.getpeername(self.socket, &address.any, &socklen); + return address; +} + +pub fn sendClose(self: *WsConnection) void { + self.send(&CLOSE_GOING_AWAY) catch {}; +} + +pub fn shutdown(self: *WsConnection) void { + posix.shutdown(self.socket, .recv) catch {}; +} + +pub fn setBlocking(self: *WsConnection, blocking: bool) !void { + if (blocking) { + _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true }))); + } else { + _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags); + } +} fn fillWebsocketHeader(buf: std.ArrayList(u8)) []const u8 { // can't use buf[0..10] here, because the header length diff --git a/src/testing.zig b/src/testing.zig index 1ea704b0..549d1349 100644 --- a/src/testing.zig +++ b/src/testing.zig @@ -333,7 +333,6 @@ fn isJsonValue(a: std.json.Value, b: std.json.Value) bool { } pub var test_app: *App = undefined; -pub var test_http: *HttpClient = undefined; pub var test_browser: Browser = undefined; pub var test_notification: *Notification = undefined; pub var test_session: *Session = undefined; @@ -499,10 +498,7 @@ test "tests:beforeAll" { test_app = try App.init(test_allocator, &test_config); errdefer test_app.deinit(); - test_http = try HttpClient.init(test_allocator, &test_app.network); - errdefer test_http.deinit(); - - test_browser = try Browser.init(test_app, .{ .http_client = test_http }); + try test_browser.init(test_app, .{}, null); errdefer test_browser.deinit(); // Create notification for testing @@ -557,7 +553,6 @@ test "tests:afterAll" { test_notification.deinit(); test_browser.deinit(); - test_http.deinit(); test_app.deinit(); test_config.deinit(@import("root").tracking_allocator); }