From 3e6e5a2eee79b33a90aaaa0fb777ed467ceccddc Mon Sep 17 00:00:00 2001 From: Josh Bleecher Snyder Date: Thu, 8 Jul 2021 12:35:34 -0700 Subject: [PATCH] incorporate recvOut into recvReqs --- net/uring/io_uring.c | 3 +++ net/uring/io_uring_linux.go | 43 +++++++++++++++++++------------------ 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/net/uring/io_uring.c b/net/uring/io_uring.c index a87f109a6..2a3c2aa2f 100644 --- a/net/uring/io_uring.c +++ b/net/uring/io_uring.c @@ -40,6 +40,9 @@ struct req { struct iovec iov; struct sockaddr_in sa; struct sockaddr_in6 sa6; + // in_kernel indicates (by being non-zero) whether this request is sitting in the kernel + // It is accessed atomically. + int32_t in_kernel; char *buf; }; diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index 8227c8434..8810483da 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -33,20 +33,25 @@ // For now, keep it simple and enqueue/dequeue in a single step. type UDPConn struct { // We have two urings so that we don't have to demux completion events. + // recvRing is the uring for recvmsg calls. recvRing *C.go_uring // sendRing is the uring for sendmsg calls. sendRing *C.go_uring + // close ensures that connection closes occur exactly once. close sync.Once // closed is an atomic variable that indicates whether the connection has been closed. // TODO: Make an atomic bool type that we can use here. - closed uint32 - local net.Addr + closed uint32 + + // local is the local address of this UDPConn. + local net.Addr + + // recvReqs is an array of re-usable UDP recvmsg requests. + // They bounce back and forth between us and the kernel. + // The array length is tied to the size of the uring. recvReqs [8]*C.goreq - // recvOut tracks which indices into recvReqs are outstanding in the kernel. - // Its elements are accessed atomically. - recvOut [8]int32 sendReqs [8]*C.goreq sendReqC chan int // indices into sendReqs is4 bool @@ -123,10 +128,14 @@ func (u *UDPConn) submitRecvRequest(idx int) error { if errno < 0 { return fmt.Errorf("uring.submitRecvRequest failed: %w", syscall.Errno(-errno)) // TODO: Improve } - atomic.AddInt32(&u.recvOut[idx], 1) // TODO: CAS? + atomic.AddInt32(u.recvReqInKernel(idx), 1) // TODO: CAS? return nil } +func (u *UDPConn) recvReqInKernel(idx int) *int32 { + return (*int32)(unsafe.Pointer(&u.recvReqs[idx].in_kernel)) +} + // TODO: replace with unsafe.Slice once we are using Go 1.17. func sliceOf(ptr *C.char, n int) []byte { @@ -150,12 +159,12 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { n, idx, err := waitCompletion(u.recvRing) if err != nil { if errors.Is(err, syscall.ECANCELED) { - atomic.AddInt32(&u.recvOut[idx], -1) + atomic.AddInt32(u.recvReqInKernel(idx), -1) } // io_uring failed to run our syscall. return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr io_uring could not run syscall: %w", err) } - atomic.AddInt32(&u.recvOut[idx], -1) + atomic.AddInt32(u.recvReqInKernel(idx), -1) if n < 0 { // io_uring ran our syscall, which failed. // Best effort attempt not to leak idx. @@ -203,8 +212,8 @@ func (u *UDPConn) Close() error { // first taking a wlock.) BusyLoop: for { - for idx := range u.recvOut { - if atomic.LoadInt32(&u.recvOut[idx]) != 0 { + for idx := range u.recvReqs { + if atomic.LoadInt32(u.recvReqInKernel(idx)) != 0 { C.submit_cancel_request(u.recvRing, C.size_t(idx)) } } @@ -311,17 +320,9 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { // LocalAddr returns the local network address. func (c *UDPConn) LocalAddr() net.Addr { return c.local } -func (c *UDPConn) SetDeadline(t time.Time) error { - panic("not implemented") // TODO: Implement -} - -func (c *UDPConn) SetReadDeadline(t time.Time) error { - panic("not implemented") // TODO: Implement -} - -func (c *UDPConn) SetWriteDeadline(t time.Time) error { - panic("not implemented") // TODO: Implement -} +func (c *UDPConn) SetDeadline(t time.Time) error { panic("not implemented") } +func (c *UDPConn) SetReadDeadline(t time.Time) error { panic("not implemented") } +func (c *UDPConn) SetWriteDeadline(t time.Time) error { panic("not implemented") } // Files!