diff --git a/src/Server.zig b/src/Server.zig index 50f81d8a..ca4afc09 100644 --- a/src/Server.zig +++ b/src/Server.zig @@ -39,6 +39,7 @@ json_version_response: []const u8, // Thread management active_threads: std.atomic.Value(u32) = .init(0), clients: std.ArrayList(*Client) = .{}, +pending_handshakes: std.ArrayList(*Net.WsConnection) = .{}, client_mutex: std.Thread.Mutex = .{}, clients_pool: std.heap.MemoryPool(Client), @@ -70,12 +71,16 @@ pub fn shutdown(self: *Server) void { for (self.clients.items) |client| { client.stop(); } + for (self.pending_handshakes.items) |ws| { + ws.shutdown(); + } } pub fn deinit(self: *Server) void { self.shutdown(); self.joinThreads(); self.clients.deinit(self.allocator); + self.pending_handshakes.deinit(self.allocator); self.clients_pool.deinit(); self.allocator.free(self.json_version_response); self.allocator.destroy(self); @@ -124,6 +129,35 @@ fn handleConnection(self: *Server, socket: posix.socket_t) void { setTcpKeepalive(socket); + var ws = Net.WsConnection.init(socket, self.allocator, self.json_version_response) catch |err| { + log.err(.app, "CDP ws init", .{ .err = err }); + return; + }; + var ws_owned = true; + defer if (ws_owned) ws.deinit(); + + if (log.enabled(.app, .info)) { + const client_address = ws.getAddress() catch null; + log.info(.app, "client connected", .{ .ip = client_address }); + } + + const http = HttpClient.init(self.allocator, &self.app.network) catch |err| { + log.err(.app, "CDP http init", .{ .err = err }); + return; + }; + var http_owned = true; + defer if (http_owned) http.deinit(); + + self.registerHandshake(&ws); + const handshake_result = ws.handshake(); + self.unregisterHandshake(&ws); + + const upgraded = handshake_result catch |err| { + log.err(.app, "CDP handshake", .{ .err = err }); + return; + }; + if (!upgraded) return; + // Client is HUGE (> 512KB) because it has a large read buffer. // V8 crashes if this is on the stack (likely related to its size). const client = self.getClient() catch |err| { @@ -132,16 +166,15 @@ fn handleConnection(self: *Server, socket: posix.socket_t) void { }; defer self.releaseClient(client); - client.* = Client.init( - socket, - self.allocator, - self.app, - self.json_version_response, - ) catch |err| { - log.err(.app, "CDP client init", .{ .err = err }); + client.* = Client.init(self.allocator, self.app, ws, http); + ws_owned = false; + http_owned = false; + defer client.deinit(); + + client.cdp = CDP.init(client) catch |err| { + log.err(.app, "CDP init", .{ .err = err }); return; }; - defer client.deinit(); self.registerClient(client); defer self.unregisterClient(client); @@ -185,6 +218,23 @@ fn unregisterClient(self: *Server, client: *Client) void { } } +fn registerHandshake(self: *Server, ws: *Net.WsConnection) void { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + self.pending_handshakes.append(self.allocator, ws) catch {}; +} + +fn unregisterHandshake(self: *Server, ws: *Net.WsConnection) void { + self.client_mutex.lock(); + defer self.client_mutex.unlock(); + for (self.pending_handshakes.items, 0..) |w, i| { + if (w == ws) { + _ = self.pending_handshakes.swapRemove(i); + break; + } + } +} + fn spawnWorker(self: *Server, socket: posix.socket_t) !void { if (self.app.shutdown()) { return error.ShuttingDown; @@ -227,62 +277,33 @@ fn joinThreads(self: *Server) void { } } -// Handle exactly one TCP connection. +// Handle exactly one upgraded CDP websocket connection. pub const Client = struct { - // The client is initially serving HTTP requests but, under normal circumstances - // should eventually be upgraded to a websocket connections - mode: union(enum) { - http: void, - cdp: CDP, - }, - allocator: Allocator, app: *App, http: *HttpClient, ws: Net.WsConnection, + cdp: ?CDP = null, - pub fn init( - socket: posix.socket_t, - allocator: Allocator, - app: *App, - json_version_response: []const u8, - ) !Client { - var ws = try Net.WsConnection.init(socket, allocator, json_version_response); - errdefer ws.deinit(); - - if (log.enabled(.app, .info)) { - const client_address = ws.getAddress() catch null; - log.info(.app, "client connected", .{ .ip = client_address }); - } - - const http = try HttpClient.init(allocator, &app.network); - errdefer http.deinit(); - + pub fn init(allocator: Allocator, app: *App, ws: Net.WsConnection, http: *HttpClient) Client { return .{ .allocator = allocator, .app = app, .http = http, .ws = ws, - .mode = .{ .http = {} }, }; } fn stop(self: *Client) void { - switch (self.mode) { - .http => {}, - .cdp => |*cdp| { - cdp.browser.env.terminate(); - self.ws.sendClose(); - }, + if (self.cdp) |*cdp| { + cdp.browser.env.terminate(); + self.ws.sendClose(); } self.ws.shutdown(); } pub fn deinit(self: *Client) void { - switch (self.mode) { - .cdp => |*cdp| cdp.deinit(), - .http => {}, - } + if (self.cdp) |*cdp| cdp.deinit(); self.ws.deinit(); self.http.deinit(); } @@ -298,13 +319,13 @@ pub const Client = struct { }; defer http.cdp_client = null; - self.httpLoop(http) catch |err| { + self.wsLoop(http) catch |err| { log.err(.app, "CDP client loop", .{ .err = err }); }; } - fn httpLoop(self: *Client, http: *HttpClient) !void { - lp.assert(self.mode == .http, "Client.httpLoop invalid mode", .{}); + fn wsLoop(self: *Client, http: *HttpClient) !void { + var cdp = &self.cdp.?; // Liveness is enforced by TCP keepalive configured in // Server.setTcpKeepalive; the kernel closes dead sockets, which @@ -313,24 +334,6 @@ pub const Client = struct { // i32-max because HttpClient.tick narrows to c_int. const wait_ms: u32 = std.math.maxInt(i32); - while (true) { - const status = http.tick(wait_ms) catch |err| { - log.err(.app, "http tick", .{ .err = err }); - return; - }; - if (status != .cdp_socket) continue; - - if (self.readSocket() == false) { - return; - } - - if (self.mode == .cdp) { - break; - } - } - - var cdp = &self.mode.cdp; - while (true) { const result = cdp.pageWait(wait_ms) catch |wait_err| switch (wait_err) { error.NoPage => { @@ -392,106 +395,7 @@ pub const Client = struct { return false; } - return self.processData() catch false; - } - - fn processData(self: *Client) !bool { - switch (self.mode) { - .cdp => |*cdp| return self.processWebsocketMessage(cdp), - .http => return self.processHTTPRequest(), - } - } - - fn processHTTPRequest(self: *Client) !bool { - lp.assert(self.ws.reader.pos == 0, "Client.HTTP pos", .{ .pos = self.ws.reader.pos }); - const request = self.ws.reader.buf[0..self.ws.reader.len]; - - if (request.len > Config.CDP_MAX_HTTP_REQUEST_SIZE) { - self.writeHTTPErrorResponse(413, "Request too large"); - return error.RequestTooLarge; - } - - // we're only expecting [body-less] GET requests. - if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) { - // we need more data, put any more data here - return true; - } - - // the next incoming data can go to the front of our buffer - defer self.ws.reader.len = 0; - return self.handleHTTPRequest(request) catch |err| { - switch (err) { - error.NotFound => self.writeHTTPErrorResponse(404, "Not found"), - error.InvalidRequest => self.writeHTTPErrorResponse(400, "Invalid request"), - error.InvalidProtocol => self.writeHTTPErrorResponse(400, "Invalid HTTP protocol"), - error.MissingHeaders => self.writeHTTPErrorResponse(400, "Missing required header"), - error.InvalidUpgradeHeader => self.writeHTTPErrorResponse(400, "Unsupported upgrade type"), - error.InvalidVersionHeader => self.writeHTTPErrorResponse(400, "Invalid websocket version"), - error.InvalidConnectionHeader => self.writeHTTPErrorResponse(400, "Invalid connection header"), - else => { - log.err(.app, "server 500", .{ .err = err, .req = request[0..@min(100, request.len)] }); - self.writeHTTPErrorResponse(500, "Internal Server Error"); - }, - } - return err; - }; - } - - fn handleHTTPRequest(self: *Client, request: []u8) !bool { - if (request.len < 18) { - // 18 is [generously] the smallest acceptable HTTP request - return error.InvalidRequest; - } - - if (std.mem.eql(u8, request[0..4], "GET ") == false) { - return error.NotFound; - } - - const url_end = std.mem.indexOfScalarPos(u8, request, 4, ' ') orelse { - return error.InvalidRequest; - }; - - const url = request[4..url_end]; - - if (std.mem.eql(u8, url, "/")) { - try self.upgradeConnection(request); - return true; - } - - if (std.mem.eql(u8, url, "/json/version") or std.mem.eql(u8, url, "/json/version/")) { - try self.ws.send(self.ws.json_version_response); - // Chromedp (a Go driver) does an http request to /json/version - // then to / (websocket upgrade) using a different connection. - // Since we only allow 1 connection at a time, the 2nd one (the - // websocket upgrade) blocks until the first one times out. - // We can avoid that by closing the connection. json_version_response - // has a Connection: Close header too. - self.ws.shutdown(); - return false; - } - - if (std.mem.eql(u8, url, "/json/list") or std.mem.eql(u8, url, "/json/list/") or - std.mem.eql(u8, url, "/json") or std.mem.eql(u8, url, "/json/")) - { - try self.ws.send(empty_json_list_response); - self.ws.shutdown(); - return false; - } - - return error.NotFound; - } - - fn upgradeConnection(self: *Client, request: []u8) !void { - try self.ws.upgrade(request); - self.mode = .{ .cdp = try CDP.init(self) }; - } - - fn writeHTTPErrorResponse(self: *Client, comptime status: u16, comptime body: []const u8) void { - self.ws.sendHttpError(status, body); - } - - fn processWebsocketMessage(self: *Client, cdp: *CDP) !bool { - return self.ws.processMessages(cdp); + return self.ws.processMessages(&self.cdp.?) catch false; } pub fn sendAllocator(self: *Client) Allocator { @@ -544,13 +448,6 @@ fn buildJSONVersionResponse( return try std.fmt.allocPrint(app.allocator, response_format, .{ body_len, host, port }); } -const empty_json_list_response = - "HTTP/1.1 200 OK\r\n" ++ - "Content-Length: 2\r\n" ++ - "Connection: Close\r\n" ++ - "Content-Type: application/json; charset=UTF-8\r\n\r\n" ++ - "[]"; - pub const timestamp = @import("datetime.zig").timestamp; pub const milliTimestamp = @import("datetime.zig").milliTimestamp; diff --git a/src/cdp/testing.zig b/src/cdp/testing.zig index 1838ef39..fa86ff4e 100644 --- a/src/cdp/testing.zig +++ b/src/cdp/testing.zig @@ -22,6 +22,8 @@ const posix = std.posix; const CDP = @import("CDP.zig"); const Server = @import("../Server.zig"); +const Net = @import("../network/websocket.zig"); +const HttpClient = @import("../browser/HttpClient.zig"); const base = @import("../testing.zig"); pub const allocator = base.allocator; @@ -315,7 +317,9 @@ pub fn context() !TestContext { try posix.setsockopt(pair[1], posix.SOL.SOCKET, posix.SO.RCVBUF, &std.mem.toBytes(@as(c_int, 32_768))); try posix.setsockopt(pair[1], posix.SOL.SOCKET, posix.SO.SNDBUF, &std.mem.toBytes(@as(c_int, 32_768))); - const client = try Server.Client.init(pair[1], base.arena_allocator, base.test_app, "json-version"); + const ws = try Net.WsConnection.init(pair[1], base.arena_allocator, "json-version"); + const http = try HttpClient.init(base.arena_allocator, &base.test_app.network); + const client = Server.Client.init(base.arena_allocator, base.test_app, ws, http); return .{ .client = client, diff --git a/src/network/websocket.zig b/src/network/websocket.zig index 4ecb5daa..880e7c21 100644 --- a/src/network/websocket.zig +++ b/src/network/websocket.zig @@ -24,7 +24,8 @@ const ArenaAllocator = std.heap.ArenaAllocator; const log = @import("lightpanda").log; const assert = @import("lightpanda").assert; -const CDP_MAX_MESSAGE_SIZE = @import("../Config.zig").CDP_MAX_MESSAGE_SIZE; +const Config = @import("../Config.zig"); +const CDP_MAX_MESSAGE_SIZE = Config.CDP_MAX_MESSAGE_SIZE; const Fragments = struct { type: Message.Type, @@ -427,12 +428,135 @@ pub const WsConnection = struct { return self.send(framed); } + pub const HttpResult = enum { more, upgraded, close }; + + pub fn handshake(self: *WsConnection) !bool { + // Liveness is enforced by TCP keepalive configured in + // Server.setTcpKeepalive; a dead peer surfaces as a poll error or + // EOF from read(). The poll blocks for ~24 days rather than tracking + // an app-level timeout. Capped at i32-max because posix.poll narrows + // to c_int. + const wait_ms: i32 = std.math.maxInt(i32); + while (true) { + var pfds = [_]posix.pollfd{.{ + .fd = self.socket, + .events = posix.POLL.IN, + .revents = 0, + }}; + const n = try posix.poll(&pfds, wait_ms); + if (n == 0) { + log.info(.app, "CDP timeout", .{}); + return false; + } + const read_bytes = self.read() catch |err| { + log.warn(.app, "CDP read", .{ .err = err }); + return false; + }; + if (read_bytes == 0) { + log.info(.app, "CDP disconnect", .{}); + return false; + } + const result = self.processHttpRequest() catch return false; + switch (result) { + .more => continue, + .upgraded => return true, + .close => return false, + } + } + } + pub fn read(self: *WsConnection) !usize { const n = try posix.read(self.socket, self.reader.readBuf()); self.reader.len += n; return n; } + fn processHttpRequest(self: *WsConnection) !HttpResult { + assert(self.reader.pos == 0, "WsConnection.HTTP pos", .{ .pos = self.reader.pos }); + const request = self.reader.buf[0..self.reader.len]; + + if (request.len > Config.CDP_MAX_HTTP_REQUEST_SIZE) { + self.sendHttpError(413, "Request too large"); + return error.RequestTooLarge; + } + + // we're only expecting [body-less] GET requests. + if (std.mem.endsWith(u8, request, "\r\n\r\n") == false) { + // we need more data, put any more data here + return .more; + } + + // the next incoming data can go to the front of our buffer + defer self.reader.len = 0; + return self.handleHttpRequest(request) catch |err| { + switch (err) { + error.NotFound => self.sendHttpError(404, "Not found"), + error.InvalidRequest => self.sendHttpError(400, "Invalid request"), + error.InvalidProtocol => self.sendHttpError(400, "Invalid HTTP protocol"), + error.MissingHeaders => self.sendHttpError(400, "Missing required header"), + error.InvalidUpgradeHeader => self.sendHttpError(400, "Unsupported upgrade type"), + error.InvalidVersionHeader => self.sendHttpError(400, "Invalid websocket version"), + error.InvalidConnectionHeader => self.sendHttpError(400, "Invalid connection header"), + else => { + log.err(.app, "server 500", .{ .err = err, .req = request[0..@min(100, request.len)] }); + self.sendHttpError(500, "Internal Server Error"); + }, + } + return err; + }; + } + + fn handleHttpRequest(self: *WsConnection, request: []u8) !HttpResult { + if (request.len < 18) { + // 18 is [generously] the smallest acceptable HTTP request + return error.InvalidRequest; + } + + if (std.mem.eql(u8, request[0..4], "GET ") == false) { + return error.NotFound; + } + + const url_end = std.mem.indexOfScalarPos(u8, request, 4, ' ') orelse { + return error.InvalidRequest; + }; + + const url = request[4..url_end]; + + if (std.mem.eql(u8, url, "/")) { + try self.upgrade(request); + return .upgraded; + } + + if (std.mem.eql(u8, url, "/json/version") or std.mem.eql(u8, url, "/json/version/")) { + try self.send(self.json_version_response); + // Chromedp (a Go driver) does an http request to /json/version + // then to / (websocket upgrade) using a different connection. + // Since we only allow 1 connection at a time, the 2nd one (the + // websocket upgrade) blocks until the first one times out. + // We can avoid that by closing the connection. json_version_response + // has a Connection: Close header too. + self.shutdown(); + return .close; + } + + if (std.mem.eql(u8, url, "/json/list") or std.mem.eql(u8, url, "/json/list/") or + std.mem.eql(u8, url, "/json") or std.mem.eql(u8, url, "/json/")) + { + try self.send(empty_json_list_response); + self.shutdown(); + return .close; + } + + return error.NotFound; + } + + const empty_json_list_response = + "HTTP/1.1 200 OK\r\n" ++ + "Content-Length: 2\r\n" ++ + "Connection: Close\r\n" ++ + "Content-Type: application/json; charset=UTF-8\r\n\r\n" ++ + "[]"; + pub fn processMessages(self: *WsConnection, handler: anytype) !bool { var reader = &self.reader; while (true) {