diff --git a/net/uring/io_uring.c b/net/uring/io_uring.c index aa5254137..2dcb8caf2 100644 --- a/net/uring/io_uring.c +++ b/net/uring/io_uring.c @@ -130,27 +130,31 @@ static int submit_readv_request(struct io_uring *ring, struct req *r, size_t idx return 0; } -static uint64_t completion(struct io_uring *ring, int block) { + +struct completion_result { + int err; + int n; + size_t idx; +}; + +typedef struct completion_result go_completion_result; + +static go_completion_result completion(struct io_uring *ring, int block) { struct io_uring_cqe *cqe; - int ret; + struct completion_result res; + res.err = 0; + res.n = 0; + res.idx = 0; if (block) { - ret = io_uring_wait_cqe(ring, &cqe); + res.err = io_uring_wait_cqe(ring, &cqe); } else { - ret = io_uring_peek_cqe(ring, &cqe); + res.err = io_uring_peek_cqe(ring, &cqe); } - if (ret < 0) { - return ret; - } - // TODO: We need to always return idx, otherwise we leak it!! Need to adjust all callers. - int n = cqe->res; - uint64_t nidx; - if (n < 0) { - // common error seen here: -101 (network unreachable) - nidx = n; - } else { - size_t idx = (size_t)io_uring_cqe_get_data(cqe); - nidx = packNIdx(n, idx); + if (res.err < 0) { + return res; } + res.idx = (size_t)io_uring_cqe_get_data(cqe); + res.n = cqe->res; io_uring_cqe_seen(ring, cqe); - return nidx; + return res; } diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index e974f44ee..7eaa40729 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -12,6 +12,7 @@ "os" "reflect" "sync" + "sync/atomic" "syscall" "time" "unsafe" @@ -36,7 +37,7 @@ type UDPConn struct { close sync.Once conn *net.UDPConn file *os.File // must keep file from being GC'd - fd C.int + fd uintptr local net.Addr recvReqs [8]*C.goreq sendReqs [8]*C.goreq @@ -67,9 +68,9 @@ func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) { recvRing := new(C.go_uring) sendRing := new(C.go_uring) - fd := C.int(file.Fd()) + fd := file.Fd() for _, r := range []*C.go_uring{recvRing, sendRing} { - ret := C.initialize(r, fd) + ret := C.initialize(r, C.int(fd)) if ret < 0 { // TODO: free recvRing if sendRing initialize failed return nil, fmt.Errorf("uring initialization failed: %d", ret) @@ -131,16 +132,22 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { if u.fd == 0 { return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn") } - n, idx, err := waitCompletion(u.recvRing) + n, idx, err := waitCompletion(u.recvRing, &u.fd) if err != nil { - return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr: %v", err) + // io_uring failed to run our syscall. + return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr io_uring could not run syscall: %w", err) + } + if n < 0 { + // io_uring ran our syscall, which failed. + // Best effort attempt not to leak idx. + u.submitRecvRequest(int(idx)) + return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr syscall failed: %w", syscall.Errno(-n)) } r := u.recvReqs[idx] - var ip netaddr.IP var port uint16 + // TODO: native go endianness conversion routines so we don't have to call ntohl, etc. if u.is4 { - // TODO: native go endianness conversion routines so we don't have to call ntohl, etc. ip = netaddr.IPFrom4(*(*[4]byte)((unsafe.Pointer)((&r.sa.sin_addr.s_addr)))) port = uint16(C.ntohs(r.sa.sin_port)) } else { @@ -151,10 +158,11 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { rbuf := sliceOf(r.buf, n) copy(buf, rbuf) // Queue up a new request. - // TODO: Do this in a goroutine? - err = u.submitRecvRequest(int(idx)) - if err != nil { - panic("how should we handle this?") + if err := u.submitRecvRequest(int(idx)); err != nil { + // Aggressively return this error. + // The error will bubble up and cause the entire conn to be closed down, + // so it doesn't matter that we lost a packet here. + return 0, netaddr.IPPort{}, err } return n, ipp, nil } @@ -223,9 +231,15 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { case idx = <-u.sendReqC: default: // No request available. Get one from the kernel. - _, idx, err = waitCompletion(u.sendRing) + n, idx, err = waitCompletion(u.sendRing, &u.fd) if err != nil { - return 0, fmt.Errorf("some WriteTo failed, maybe long ago: %v", err) + // io_uring failed to issue the syscall. + return 0, fmt.Errorf("WriteTo io_uring call failed: %w", err) + } + if n < 0 { + // Past syscall failed. + u.sendReqC <- idx // don't leak idx + return 0, fmt.Errorf("previous WriteTo failed: %w", syscall.Errno(-n)) } } r := u.sendReqs[idx] @@ -253,9 +267,7 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { C.size_t(idx), // user data ) // Get an extra buffer, if available. - _, idx, err, ready := peekCompletion(u.sendRing) - if ready && err == nil { - // TODO: always put the buffer back?? + if idx, ok := peekCompletion(u.sendRing); ok { // Put the request buffer back in the usable queue. // Should never block, by construction. u.sendReqC <- idx @@ -288,21 +300,21 @@ type file struct { readRing *C.go_uring close sync.Once file *os.File // must keep file from being GC'd - fd C.int + fd uintptr readReqs [1]*C.goreq // Whoops! The kernel apparently cannot handle more than 1 concurrent preadv calls on a tun device! writeReqs [8]*C.goreq writeReqC chan int // indices into reqs } func newFile(f *os.File) (*file, error) { - fd := C.int(f.Fd()) + fd := f.Fd() u := &file{ file: f, fd: fd, } for _, ringPtr := range []**C.go_uring{&u.writeRing, &u.readRing} { r := new(C.go_uring) - ret := C.initialize(r, fd) + ret := C.initialize(r, C.int(fd)) if ret < 0 { // TODO: handle unwinding partial initialization return nil, fmt.Errorf("uring initialization failed: %d", ret) @@ -342,36 +354,39 @@ func (u *file) submitReadvRequest(idx int) error { return nil } -func unpackNIdx(nidx C.uint64_t) (n, idx int, err error) { - if int64(nidx) < 0 { - return 0, 0, fmt.Errorf("error %d", int64(nidx)) - } - return int(uint32(nidx >> 32)), int(uint32(nidx)), nil -} - const ( noBlockForCompletion = 0 blockForCompletion = 1 ) -func waitCompletion(ring *C.go_uring) (n, idx int, err error) { +// waitCompletion blocks until a completion on ring succeeds, or until *fd == 0. +// If *fd == 0, that indicates that the ring is no loner valid, in which case waitCompletion returns net.ErrClosed. +// Reads of *fd are atomic. +func waitCompletion(ring *C.go_uring, fd *uintptr) (n, idx int, err error) { for { - nidx := C.completion(ring, blockForCompletion) - if syscall.Errno(-nidx) == syscall.EAGAIN { + // If we have exited, stop looping. + if atomic.LoadUintptr(fd) == 0 { + return 0, 0, net.ErrClosed + } + // TODO: is this racy? + r := C.completion(ring, blockForCompletion) + if syscall.Errno(-r.err) == syscall.EAGAIN { continue } - return unpackNIdx(nidx) + var err error + if r.err < 0 { + err = syscall.Errno(-r.err) + } + return int(r.n), int(r.idx), err } } -func peekCompletion(ring *C.go_uring) (n, idx int, err error, ready bool) { - nidx := C.completion(ring, noBlockForCompletion) - if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR { - return 0, 0, nil, false - // Nothing waiting for us. +func peekCompletion(ring *C.go_uring) (idx int, ok bool) { + r := C.completion(ring, noBlockForCompletion) + if r.err < 0 { + return 0, false } - n, idx, err = unpackNIdx(nidx) - return n, idx, err, true + return int(r.idx), true } type fileReq struct { @@ -382,20 +397,26 @@ type fileReq struct { // Read data into buf[offset:]. // We are allowed to write junk into buf[offset-4:offset]. func (u *file) Read(buf []byte) (n int, err error) { // read a packet from the device (without any additional headers) - if u.fd == 0 { + if u.fd == 0 { // TODO: review all uses of u.fd for atomic read/write return 0, errors.New("invalid uring.File") } - n, idx, err := waitCompletion(u.readRing) - if err != nil || n < 4 { - return 0, fmt.Errorf("Read: %v", err) + n, idx, err := waitCompletion(u.readRing, &u.fd) + if err != nil { + return 0, fmt.Errorf("Read: io_uring failed to issue syscall: %w", err) } + if n < 0 { + // Syscall failed. + u.submitReadvRequest(int(idx)) // best effort attempt not to leak idx + return 0, fmt.Errorf("Read: syscall failed: %w", syscall.Errno(-n)) + } + // Success. r := u.readReqs[idx] rbuf := sliceOf(r.buf, n) copy(buf, rbuf) // Queue up a new request. - err = u.submitReadvRequest(int(idx)) - if err != nil { - panic("how should we handle this?") + if err := u.submitReadvRequest(int(idx)); err != nil { + // Aggressively return this error. + return 0, err } return n, nil } @@ -410,10 +431,14 @@ func (u *file) Write(buf []byte) (int, error) { case idx = <-u.writeReqC: default: // No request available. Get one from the kernel. - var err error - _, idx, err = waitCompletion(u.writeRing) + n, idx, err := waitCompletion(u.writeRing, &u.fd) if err != nil { - return 0, fmt.Errorf("some write failed, maybe long ago: %v", err) + return 0, fmt.Errorf("Write io_uring call failed: %w", err) + } + if n < 0 { + // Past syscall failed. + u.writeReqC <- idx // don't leak idx + return 0, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n)) } } r := u.writeReqs[idx] @@ -422,9 +447,8 @@ func (u *file) Write(buf []byte) (int, error) { copy(rbuf, buf) C.submit_writev_request(u.writeRing, r, C.int(len(buf)), C.size_t(idx)) // Get an extra buffer, if available. - _, idx, err, ready := peekCompletion(u.writeRing) - if ready && err == nil { - // TODO: put the buffer back even on error? + idx, ok := peekCompletion(u.writeRing) + if ok { // Put the request buffer back in the usable queue. // Should never block, by construction. u.writeReqC <- idx