keep 8 requests in the queue at all times

This commit is contained in:
Josh Bleecher Snyder
2021-06-01 21:30:39 -07:00
parent 38a872d2c1
commit bb78cf81b6
2 changed files with 21 additions and 18 deletions

View File

@@ -20,7 +20,7 @@ typedef struct iovec go_iovec;
typedef struct sockaddr_in go_sockaddr_in;
// Wait for a completion to be available, fetch the data
static int receive_into(struct io_uring *ring) {
static int receive_into(struct io_uring *ring, size_t *idxptr) {
struct io_uring_cqe *cqe;
again:;
@@ -38,9 +38,10 @@ again:;
fprintf(stderr, "recvmsg failed: %d.\n", cqe->res);
return cqe->res;
}
struct msghdr *mhdr = io_uring_cqe_get_data(cqe);
if (mhdr == NULL) {
*idxptr = (size_t)(io_uring_cqe_get_data(cqe));
if (*idxptr < 0) {
fprintf(stderr, "received nop\n");
io_uring_cqe_seen(ring, cqe);
return -1;
}
int n = cqe->res;
@@ -58,7 +59,7 @@ static uint16_t port(struct sockaddr_in *sa) {
// submit a recvmsg request via liburing
// TODO: What recvfrom support arrives, maybe use that instead?
static int submit_recvmsg_request(int sock, struct io_uring *ring, struct msghdr *mhdr, struct iovec *iov, struct sockaddr_in *sender, char *buf, int buflen) {
static int submit_recvmsg_request(int sock, struct io_uring *ring, struct msghdr *mhdr, struct iovec *iov, struct sockaddr_in *sender, char *buf, int buflen, size_t idx) {
iov->iov_base = buf;
iov->iov_len = buflen;
@@ -70,7 +71,7 @@ static int submit_recvmsg_request(int sock, struct io_uring *ring, struct msghdr
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_recvmsg(sqe, sock, mhdr, 0);
io_uring_sqe_set_data(sqe, mhdr);
io_uring_sqe_set_data(sqe, (void *)(idx));
io_uring_submit(ring);
return 0;
@@ -79,5 +80,6 @@ static int submit_recvmsg_request(int sock, struct io_uring *ring, struct msghdr
static void submit_nop_request(struct io_uring *ring) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_nop(sqe);
io_uring_sqe_set_data(sqe, (void *)(-1));
io_uring_submit(ring);
}

View File

@@ -36,7 +36,7 @@ type UDPConn struct {
file *os.File // must keep file from being GC'd
fd C.int
local net.Addr
reqs [1]req
reqs [8]req
}
func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) {
@@ -65,8 +65,8 @@ func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) {
fd: C.int(file.Fd()),
local: conn.LocalAddr(),
}
for i := range u.reqs[:1] {
if err := u.submitRequest(&u.reqs[i]); err != nil {
for i := range u.reqs {
if err := u.submitRequest(i); err != nil {
u.Close() // TODO: will this crash?
return nil, err
}
@@ -81,9 +81,10 @@ type req struct {
buf [device.MaxSegmentSize]byte
}
func (u *UDPConn) submitRequest(r *req) error {
// TODO: eventually separate submitting the request and waiting for the response.
errno := C.submit_recvmsg_request(u.fd, u.ptr, &r.mhdr, &r.iov, &r.sa, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(r.buf)))
func (u *UDPConn) submitRequest(idx int) error {
r := &u.reqs[idx]
// TODO: make a C struct instead of a Go struct, and pass that in, to simplify call sites.
errno := C.submit_recvmsg_request(u.fd, u.ptr, &r.mhdr, &r.iov, &r.sa, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(r.buf)), C.size_t(idx))
if errno < 0 {
return fmt.Errorf("uring.submitRequest failed: %v", errno) // TODO: Improve
}
@@ -94,12 +95,12 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
if u.fd == 0 {
return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn")
}
r := &u.reqs[0]
n := C.receive_into(u.ptr)
var idx C.size_t
n := C.receive_into(u.ptr, &idx)
if n < 0 {
return 0, netaddr.IPPort{}, errors.New("something wrong")
}
// TODO: this is broken :(((
r := &u.reqs[int(idx)]
ip := C.ip(&r.sa)
var ip4 [4]byte
binary.BigEndian.PutUint32(ip4[:], uint32(ip))
@@ -107,7 +108,7 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
ipp := netaddr.IPPortFrom(netaddr.IPFrom4(ip4), uint16(port))
copy(buf, r.buf[:n])
// Queue up a new request.
err := u.submitRequest(r)
err := u.submitRequest(int(idx))
if err != nil {
panic("how should we handle this?")
}
@@ -115,7 +116,7 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
}
func (u *UDPConn) Close() error {
fmt.Println("CLOSE URING", u)
// fmt.Println("CLOSE URING", u)
u.close.Do(func() {
// Send a nop to unblock any outstanding readers.
// Hope that we manage to close before any new readers appear.
@@ -126,9 +127,9 @@ func (u *UDPConn) Close() error {
//
// Update: this causes crashes, because of entirely predictable and predicted races.
// The mystery about how to safely unblock all outstanding io_uring_wait_cqe calls remains...
fmt.Println("io_uring_queue_exit", u.ptr)
// fmt.Println("io_uring_queue_exit", u.ptr)
C.io_uring_queue_exit(u.ptr)
fmt.Println("DONE io_uring_queue_exit", u.ptr)
// fmt.Println("DONE io_uring_queue_exit", u.ptr)
u.ptr = nil
u.conn.Close()
u.conn = nil