diff --git a/net/uring/io_uring.c b/net/uring/io_uring.c index 73fdbfa4c..e3eb73e06 100644 --- a/net/uring/io_uring.c +++ b/net/uring/io_uring.c @@ -20,7 +20,7 @@ typedef struct iovec go_iovec; typedef struct sockaddr_in go_sockaddr_in; // Wait for a completion to be available, fetch the data -static int receive_into(struct io_uring *ring) { +static int receive_into(struct io_uring *ring, size_t *idxptr) { struct io_uring_cqe *cqe; again:; @@ -38,9 +38,10 @@ again:; fprintf(stderr, "recvmsg failed: %d.\n", cqe->res); return cqe->res; } - struct msghdr *mhdr = io_uring_cqe_get_data(cqe); - if (mhdr == NULL) { + *idxptr = (size_t)(io_uring_cqe_get_data(cqe)); + if (*idxptr < 0) { fprintf(stderr, "received nop\n"); + io_uring_cqe_seen(ring, cqe); return -1; } int n = cqe->res; @@ -58,7 +59,7 @@ static uint16_t port(struct sockaddr_in *sa) { // submit a recvmsg request via liburing // TODO: What recvfrom support arrives, maybe use that instead? -static int submit_recvmsg_request(int sock, struct io_uring *ring, struct msghdr *mhdr, struct iovec *iov, struct sockaddr_in *sender, char *buf, int buflen) { +static int submit_recvmsg_request(int sock, struct io_uring *ring, struct msghdr *mhdr, struct iovec *iov, struct sockaddr_in *sender, char *buf, int buflen, size_t idx) { iov->iov_base = buf; iov->iov_len = buflen; @@ -70,7 +71,7 @@ static int submit_recvmsg_request(int sock, struct io_uring *ring, struct msghdr struct io_uring_sqe *sqe = io_uring_get_sqe(ring); io_uring_prep_recvmsg(sqe, sock, mhdr, 0); - io_uring_sqe_set_data(sqe, mhdr); + io_uring_sqe_set_data(sqe, (void *)(idx)); io_uring_submit(ring); return 0; @@ -79,5 +80,6 @@ static int submit_recvmsg_request(int sock, struct io_uring *ring, struct msghdr static void submit_nop_request(struct io_uring *ring) { struct io_uring_sqe *sqe = io_uring_get_sqe(ring); io_uring_prep_nop(sqe); + io_uring_sqe_set_data(sqe, (void *)(-1)); io_uring_submit(ring); } diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index ebe35f360..7eeb2f5c6 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -36,7 +36,7 @@ type UDPConn struct { file *os.File // must keep file from being GC'd fd C.int local net.Addr - reqs [1]req + reqs [8]req } func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) { @@ -65,8 +65,8 @@ func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) { fd: C.int(file.Fd()), local: conn.LocalAddr(), } - for i := range u.reqs[:1] { - if err := u.submitRequest(&u.reqs[i]); err != nil { + for i := range u.reqs { + if err := u.submitRequest(i); err != nil { u.Close() // TODO: will this crash? return nil, err } @@ -81,9 +81,10 @@ type req struct { buf [device.MaxSegmentSize]byte } -func (u *UDPConn) submitRequest(r *req) error { - // TODO: eventually separate submitting the request and waiting for the response. - errno := C.submit_recvmsg_request(u.fd, u.ptr, &r.mhdr, &r.iov, &r.sa, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(r.buf))) +func (u *UDPConn) submitRequest(idx int) error { + r := &u.reqs[idx] + // TODO: make a C struct instead of a Go struct, and pass that in, to simplify call sites. + errno := C.submit_recvmsg_request(u.fd, u.ptr, &r.mhdr, &r.iov, &r.sa, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(r.buf)), C.size_t(idx)) if errno < 0 { return fmt.Errorf("uring.submitRequest failed: %v", errno) // TODO: Improve } @@ -94,12 +95,12 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { if u.fd == 0 { return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn") } - r := &u.reqs[0] - n := C.receive_into(u.ptr) + var idx C.size_t + n := C.receive_into(u.ptr, &idx) if n < 0 { return 0, netaddr.IPPort{}, errors.New("something wrong") } - // TODO: this is broken :((( + r := &u.reqs[int(idx)] ip := C.ip(&r.sa) var ip4 [4]byte binary.BigEndian.PutUint32(ip4[:], uint32(ip)) @@ -107,7 +108,7 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { ipp := netaddr.IPPortFrom(netaddr.IPFrom4(ip4), uint16(port)) copy(buf, r.buf[:n]) // Queue up a new request. - err := u.submitRequest(r) + err := u.submitRequest(int(idx)) if err != nil { panic("how should we handle this?") } @@ -115,7 +116,7 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { } func (u *UDPConn) Close() error { - fmt.Println("CLOSE URING", u) + // fmt.Println("CLOSE URING", u) u.close.Do(func() { // Send a nop to unblock any outstanding readers. // Hope that we manage to close before any new readers appear. @@ -126,9 +127,9 @@ func (u *UDPConn) Close() error { // // Update: this causes crashes, because of entirely predictable and predicted races. // The mystery about how to safely unblock all outstanding io_uring_wait_cqe calls remains... - fmt.Println("io_uring_queue_exit", u.ptr) + // fmt.Println("io_uring_queue_exit", u.ptr) C.io_uring_queue_exit(u.ptr) - fmt.Println("DONE io_uring_queue_exit", u.ptr) + // fmt.Println("DONE io_uring_queue_exit", u.ptr) u.ptr = nil u.conn.Close() u.conn = nil