NextTickNode is owned by Transfer

This commit is contained in:
Muki Kiboigo
2026-05-25 10:29:26 -07:00
parent f4de603cf5
commit 00ccb5ed52
2 changed files with 22 additions and 38 deletions

View File

@@ -185,12 +185,11 @@ fn layerWith(self: anytype, next: Layer) Layer {
pub const NextTickNode = struct {
pub const Run =
*const fn (*Transfer, *anyopaque) anyerror!void;
*const fn (*Transfer, *anyopaque) void;
node: std.DoublyLinkedList.Node = .{},
ctx: *anyopaque,
run: Run,
transfer_id: u32,
};
pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP) !void {
@@ -244,13 +243,6 @@ pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp: ?*CDP)
pub fn deinit(self: *Client) void {
self.abort();
// Drain Next Tick Queue
while (self.next_tick_queue.popFirst()) |node| {
const n: *NextTickNode = @fieldParentPtr("node", node);
self.allocator.destroy(n);
}
self.next_tick_count = 0;
self.handles.deinit();
self.clearUserAgentOverride();
@@ -439,24 +431,16 @@ pub fn tick(self: *Client, timeout_ms: u32, mode: DrainMode) !void {
try self.drainInbox(mode);
}
pub fn runNextTick(self: *Client, transfer_id: u32, ctx: *anyopaque, run: NextTickNode.Run) !void {
const node = try self.allocator.create(NextTickNode);
node.* = .{ .ctx = ctx, .run = run, .transfer_id = transfer_id };
pub fn runNextTick(self: *Client, transfer: *Transfer, ctx: *anyopaque, run: NextTickNode.Run) !void {
transfer._next_tick_node = .{ .ctx = ctx, .run = run };
self.next_tick_count += 1;
self.next_tick_queue.append(&node.node);
self.next_tick_queue.append(&transfer._next_tick_node.?.node);
}
fn cancelNextTick(self: *Client, transfer_id: u32) void {
var it = self.next_tick_queue.first;
while (it) |node| {
it = node.next;
const n: *NextTickNode = @fieldParentPtr("node", node);
if (n.transfer_id == transfer_id) {
self.next_tick_queue.remove(node);
self.next_tick_count -= 1;
self.allocator.destroy(n);
}
fn cancelNextTick(self: *Client, transfer: *Transfer) void {
if (transfer._next_tick_node) |*ntn| {
self.next_tick_queue.remove(&ntn.node);
}
}
@@ -466,12 +450,13 @@ fn drainNextTickQueue(self: *Client) !void {
while (queue.popFirst()) |node| {
const n: *NextTickNode = @fieldParentPtr("node", node);
defer self.allocator.destroy(n);
defer self.next_tick_count -= 1;
if (self.findTransfer(n.transfer_id)) |t| {
try n.run(t, n.ctx);
}
const transfer: *Transfer = @fieldParentPtr(
"_next_tick_node",
@as(*?NextTickNode, @ptrCast(n)),
);
n.run(transfer, n.ctx);
}
}
@@ -1313,6 +1298,9 @@ pub const Transfer = struct {
// for when a Transfer is queued in the client.queue
_node: std.DoublyLinkedList.Node = .{},
// for when a Transfer is queued for the next tick.
_next_tick_node: ?NextTickNode = null,
pub const State = union(enum) {
// Pre-commit. Only valid inside the request flow (Client.request
// or a re-entry like continueTransfer / unpark) before any commit
@@ -1393,7 +1381,7 @@ pub const Transfer = struct {
// Any concurrent CDP lookup by id will now see this transfer as gone.
_ = self.client.transfers.remove(self.id);
self.client.cancelNextTick(self.id);
self.client.cancelNextTick(self);
self.req.deinit();
if (self.owner) |o| {

View File

@@ -73,19 +73,15 @@ fn request(ptr: *anyopaque, transfer: *Transfer) anyerror!void {
&.{ .transfer = transfer },
);
const CacheServeCtx = struct {
cached: CachedResponse,
};
const ctx = try arena.create(CachedResponse);
ctx.* = cached;
const ctx = try arena.create(CacheServeCtx);
ctx.* = .{ .cached = cached };
try transfer.client.runNextTick(transfer.id, ctx, struct {
fn run(t: *Transfer, ctx_ptr: *anyopaque) anyerror!void {
try transfer.client.runNextTick(transfer, ctx, struct {
fn run(t: *Transfer, ctx_ptr: *anyopaque) void {
defer t.deinit();
const c: *CacheServeCtx = @ptrCast(@alignCast(ctx_ptr));
serveFromCache(&t.req, &c.cached) catch |err| {
const c: *CachedResponse = @ptrCast(@alignCast(ctx_ptr));
serveFromCache(&t.req, c) catch |err| {
t.req.error_callback(t.req.ctx, err);
};
}