diff --git a/net/uring/io_uring.c b/net/uring/io_uring.c index 68b4ffd00..aa5254137 100644 --- a/net/uring/io_uring.c +++ b/net/uring/io_uring.c @@ -109,33 +109,6 @@ static void submit_nop_request(struct io_uring *ring) { io_uring_submit(ring); } -// Wait for a completion to be available, fetch the data -static uint64_t wait_completion(struct io_uring *ring) { - struct io_uring_cqe *cqe; -again:; - - int ret = io_uring_wait_cqe(ring, &cqe); - if (ret == -EINTR) { - goto again; - } - // TODO: Delete perror, fprintf, etc. - // Encode in return value or similar. - if (ret < 0) { - perror("wait_completion io_uring_wait_cqe"); - return ret; - } - int n = cqe->res; - if (n < 0) { - // TODO: This leaks a buffer!!! - fprintf(stderr, "wait_completion failed: %d.\n", n); - return n; - } - size_t idx = (size_t)io_uring_cqe_get_data(cqe); - uint64_t nidx = packNIdx(n, idx); - io_uring_cqe_seen(ring, cqe); - return nidx; -} - // submit a writev request via liburing static int submit_writev_request(struct io_uring *ring, struct req *r, int buflen, size_t idx) { r->iov.iov_len = buflen; @@ -157,27 +130,27 @@ static int submit_readv_request(struct io_uring *ring, struct req *r, size_t idx return 0; } -static uint64_t peek_completion(struct io_uring *ring) { +static uint64_t completion(struct io_uring *ring, int block) { struct io_uring_cqe *cqe; - int ret = io_uring_peek_cqe(ring, &cqe); - if ((-ret == EAGAIN) || (-ret == EINTR)) { - return ret; + int ret; + if (block) { + ret = io_uring_wait_cqe(ring, &cqe); + } else { + ret = io_uring_peek_cqe(ring, &cqe); } - // TODO: Delete perror, fprintf, etc. - // Encode in return value or similar. if (ret < 0) { - perror("on failure, peek_file_completion io_uring_wait_cqe"); return ret; } - errno = 0; + // TODO: We need to always return idx, otherwise we leak it!! Need to adjust all callers. int n = cqe->res; + uint64_t nidx; if (n < 0) { - // TODO: This leaks a buffer!!! - fprintf(stderr, "peek_file_completion write failed: %d.\n", n); - return n; + // common error seen here: -101 (network unreachable) + nidx = n; + } else { + size_t idx = (size_t)io_uring_cqe_get_data(cqe); + nidx = packNIdx(n, idx); } - size_t idx = (size_t)io_uring_cqe_get_data(cqe); - uint64_t nidx = packNIdx(n, idx); io_uring_cqe_seen(ring, cqe); return nidx; } diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index e330b5a05..e974f44ee 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -131,8 +131,7 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { if u.fd == 0 { return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn") } - nidx := C.wait_completion(u.recvRing) - n, idx, err := unpackNIdx(nidx) + n, idx, err := waitCompletion(u.recvRing) if err != nil { return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr: %v", err) } @@ -224,9 +223,7 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { case idx = <-u.sendReqC: default: // No request available. Get one from the kernel. - nidx := C.wait_completion(u.sendRing) - var err error - _, idx, err = unpackNIdx(nidx) + _, idx, err = waitCompletion(u.sendRing) if err != nil { return 0, fmt.Errorf("some WriteTo failed, maybe long ago: %v", err) } @@ -256,16 +253,12 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { C.size_t(idx), // user data ) // Get an extra buffer, if available. - nidx := C.peek_completion(u.sendRing) - if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR { - // Nothing waiting for us. - } else { - _, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?) - if err == nil { - // Put the request buffer back in the usable queue. - // Should never block, by construction. - u.sendReqC <- idx - } + _, idx, err, ready := peekCompletion(u.sendRing) + if ready && err == nil { + // TODO: always put the buffer back?? + // Put the request buffer back in the usable queue. + // Should never block, by construction. + u.sendReqC <- idx } return len(p), nil } @@ -356,6 +349,31 @@ func unpackNIdx(nidx C.uint64_t) (n, idx int, err error) { return int(uint32(nidx >> 32)), int(uint32(nidx)), nil } +const ( + noBlockForCompletion = 0 + blockForCompletion = 1 +) + +func waitCompletion(ring *C.go_uring) (n, idx int, err error) { + for { + nidx := C.completion(ring, blockForCompletion) + if syscall.Errno(-nidx) == syscall.EAGAIN { + continue + } + return unpackNIdx(nidx) + } +} + +func peekCompletion(ring *C.go_uring) (n, idx int, err error, ready bool) { + nidx := C.completion(ring, noBlockForCompletion) + if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR { + return 0, 0, nil, false + // Nothing waiting for us. + } + n, idx, err = unpackNIdx(nidx) + return n, idx, err, true +} + type fileReq struct { iov C.go_iovec buf [device.MaxSegmentSize]byte @@ -367,8 +385,7 @@ func (u *file) Read(buf []byte) (n int, err error) { // read a packet from the d if u.fd == 0 { return 0, errors.New("invalid uring.File") } - nidx := C.wait_completion(u.readRing) - n, idx, err := unpackNIdx(nidx) + n, idx, err := waitCompletion(u.readRing) if err != nil || n < 4 { return 0, fmt.Errorf("Read: %v", err) } @@ -393,9 +410,8 @@ func (u *file) Write(buf []byte) (int, error) { case idx = <-u.writeReqC: default: // No request available. Get one from the kernel. - nidx := C.wait_completion(u.writeRing) var err error - _, idx, err = unpackNIdx(nidx) + _, idx, err = waitCompletion(u.writeRing) if err != nil { return 0, fmt.Errorf("some write failed, maybe long ago: %v", err) } @@ -406,16 +422,12 @@ func (u *file) Write(buf []byte) (int, error) { copy(rbuf, buf) C.submit_writev_request(u.writeRing, r, C.int(len(buf)), C.size_t(idx)) // Get an extra buffer, if available. - nidx := C.peek_completion(u.writeRing) - if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR { - // Nothing waiting for us. - } else { - _, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?) - if err == nil { - // Put the request buffer back in the usable queue. - // Should never block, by construction. - u.writeReqC <- idx - } + _, idx, err, ready := peekCompletion(u.writeRing) + if ready && err == nil { + // TODO: put the buffer back even on error? + // Put the request buffer back in the usable queue. + // Should never block, by construction. + u.writeReqC <- idx } return len(buf), nil }