diff --git a/net/uring/io_uring.c b/net/uring/io_uring.c index cd0ac987c..a87f109a6 100644 --- a/net/uring/io_uring.c +++ b/net/uring/io_uring.c @@ -102,6 +102,12 @@ static void submit_nop_request(struct io_uring *ring) { io_uring_submit(ring); } +static void submit_cancel_request(struct io_uring *ring, size_t idx) { + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + io_uring_prep_cancel(sqe, (void *)(idx), 0); + io_uring_submit(ring); +} + // submit a writev request via liburing static int submit_writev_request(struct io_uring *ring, struct req *r, int buflen, size_t idx) { r->iov.iov_len = buflen; diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index 7eaa40729..91a43a3ef 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -37,12 +37,21 @@ type UDPConn struct { close sync.Once conn *net.UDPConn file *os.File // must keep file from being GC'd + // fd is the underlying fd associated with this connection. + // It is set to zero when the connection closes. + // It is accessed atomically. fd uintptr local net.Addr recvReqs [8]*C.goreq + // recvOut tracks which indices into recvReqs are outstanding in the kernel. + // Its elements are accessed atomically. + recvOut [8]int32 sendReqs [8]*C.goreq sendReqC chan int // indices into sendReqs is4 bool + // reads counts the number of outstanding read requests. + // It is accessed atomically. + reads int32 } func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) { @@ -112,8 +121,9 @@ func (u *UDPConn) submitRecvRequest(idx int) error { // TODO: make a C struct instead of a Go struct, and pass that in, to simplify call sites. errno := C.submit_recvmsg_request(u.recvRing, u.recvReqs[idx], C.size_t(idx)) if errno < 0 { - return fmt.Errorf("uring.submitRecvRequest failed: %v", errno) // TODO: Improve + return fmt.Errorf("uring.submitRecvRequest failed: %w", syscall.Errno(-errno)) // TODO: Improve } + atomic.AddInt32(&u.recvOut[idx], 1) // TODO: CAS? 
return nil } @@ -129,20 +139,33 @@ func sliceOf(ptr *C.char, n int) []byte { } func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { - if u.fd == 0 { - return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn") + // Important: register that there is a read before checking whether the conn is closed. + // Close assumes that once it has set u.fd to zero there are no "hidden" reads outstanding, + // as there could be if we did this in the other order. + atomic.AddInt32(&u.reads, 1) + defer atomic.AddInt32(&u.reads, -1) + if atomic.LoadUintptr(&u.fd) == 0 { + return 0, netaddr.IPPort{}, net.ErrClosed } n, idx, err := waitCompletion(u.recvRing, &u.fd) if err != nil { + if errors.Is(err, syscall.ECANCELED) { + atomic.AddInt32(&u.recvOut[idx], -1) + } // io_uring failed to run our syscall. return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr io_uring could not run syscall: %w", err) } + atomic.AddInt32(&u.recvOut[idx], -1) if n < 0 { // io_uring ran our syscall, which failed. // Best effort attempt not to leak idx. u.submitRecvRequest(int(idx)) return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr syscall failed: %w", syscall.Errno(-n)) } + // Received nop. + if idx == -1 { + return 0, netaddr.IPPort{}, nil + } r := u.recvReqs[idx] var ip netaddr.IP var port uint16 @@ -169,30 +192,48 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { func (u *UDPConn) Close() error { u.close.Do(func() { + // Announce to readers and writers that we are closing down. + atomic.StoreUintptr(&u.fd, 0) + // It is now not possible for u.reads to reach zero without + // all reads being unblocked. + + // Busy loop until all reads are unblocked. + // This is unpleasant, but I don't know of another way that + // doesn't introduce significant synchronization overhead. 
+ // (The obvious alternative is to use a sync.RWMutex, + // but that has a chicken-and-egg problem: Reads must take an rlock, + // but we cannot take a wlock until all the rlocks are released, + // but we cannot issue cancellations to release the rlocks without + // first taking a wlock.) + BusyLoop: + for { + for idx := range u.recvOut { + if atomic.LoadInt32(&u.recvOut[idx]) != 0 { + C.submit_cancel_request(u.recvRing, C.size_t(idx)) + } + } + reads := atomic.LoadInt32(&u.reads) + if reads > 0 { + time.Sleep(time.Millisecond) + } else { + break BusyLoop + } + } u.conn.Close() u.conn = nil - // Send a nop to unblock any outstanding readers. - // Hope that we manage to close before any new readers appear. - // Not sure exactly how this is supposed to work reliably... - // I must be missing something. - // - // C.submit_nop_request(u.ptr) - // - // Update: this causes crashes, because of entirely predictable and predicted races. - // The mystery about how to safely unblock all outstanding io_uring_wait_cqe calls remains... - C.io_uring_queue_exit(u.recvRing) - C.io_uring_queue_exit(u.sendRing) - u.recvRing = nil - u.sendRing = nil u.file.Close() u.file = nil - u.fd = 0 + // TODO: block until no one else uses our rings. + // (Or is that unnecessary now?) 
+ C.io_uring_queue_exit(u.recvRing) + C.io_uring_queue_exit(u.sendRing) // Free buffers - for _, reqs := range []*[8]*C.goreq{&u.recvReqs, &u.sendReqs} { - for _, r := range reqs { - C.freeReq(r) - } + for _, r := range u.recvReqs { + C.freeReq(r) + } + for _, r := range u.sendReqs { + C.freeReq(r) } }) return nil @@ -218,8 +259,8 @@ func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { } func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { - if u.fd == 0 { - return 0, errors.New("invalid uring.UDPConn") + if atomic.LoadUintptr(&u.fd) == 0 { + return 0, net.ErrClosed } udpAddr, ok := addr.(*net.UDPAddr) if !ok { @@ -456,17 +497,15 @@ func (u *file) Write(buf []byte) (int, error) { return len(buf), nil } -// TODO: the TODOs from UDPConn.Close func (u *file) Close() error { u.close.Do(func() { + atomic.StoreUintptr(&u.fd, 0) u.file.Close() - // TODO: require kernel 5.5, send an abort SQE, handle aborts gracefully + u.file = nil + // TODO: bring the shutdown logic from UDPConn.Close here? + // Or is closing the file above enough, unlike for UDP? C.io_uring_queue_exit(u.readRing) C.io_uring_queue_exit(u.writeRing) - u.readRing = nil - u.writeRing = nil - u.file = nil - u.fd = 0 // Free buffers for _, r := range u.readReqs {