incorporate recvOut into recvReqs

This commit is contained in:
Josh Bleecher Snyder
2021-07-08 12:35:34 -07:00
parent 6ec3378f7b
commit 3e6e5a2eee
2 changed files with 25 additions and 21 deletions

View File

@@ -40,6 +40,9 @@ struct req {
struct iovec iov;
struct sockaddr_in sa;
struct sockaddr_in6 sa6;
// in_kernel indicates (by being non-zero) whether this request is sitting in the kernel
// It is accessed atomically.
int32_t in_kernel;
char *buf;
};

View File

@@ -33,20 +33,25 @@
// For now, keep it simple and enqueue/dequeue in a single step.
type UDPConn struct {
// We have two urings so that we don't have to demux completion events.
// recvRing is the uring for recvmsg calls.
recvRing *C.go_uring
// sendRing is the uring for sendmsg calls.
sendRing *C.go_uring
// close ensures that connection closes occur exactly once.
close sync.Once
// closed is an atomic variable that indicates whether the connection has been closed.
// TODO: Make an atomic bool type that we can use here.
closed uint32
local net.Addr
closed uint32
// local is the local address of this UDPConn.
local net.Addr
// recvReqs is an array of re-usable UDP recvmsg requests.
// They bounce back and forth between us and the kernel.
// The array length is tied to the size of the uring.
recvReqs [8]*C.goreq
// recvOut tracks which indices into recvReqs are outstanding in the kernel.
// Its elements are accessed atomically.
recvOut [8]int32
sendReqs [8]*C.goreq
sendReqC chan int // indices into sendReqs
is4 bool
@@ -123,10 +128,14 @@ func (u *UDPConn) submitRecvRequest(idx int) error {
if errno < 0 {
return fmt.Errorf("uring.submitRecvRequest failed: %w", syscall.Errno(-errno)) // TODO: Improve
}
atomic.AddInt32(&u.recvOut[idx], 1) // TODO: CAS?
atomic.AddInt32(u.recvReqInKernel(idx), 1) // TODO: CAS?
return nil
}
func (u *UDPConn) recvReqInKernel(idx int) *int32 {
return (*int32)(unsafe.Pointer(&u.recvReqs[idx].in_kernel))
}
// TODO: replace with unsafe.Slice once we are using Go 1.17.
func sliceOf(ptr *C.char, n int) []byte {
@@ -150,12 +159,12 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
n, idx, err := waitCompletion(u.recvRing)
if err != nil {
if errors.Is(err, syscall.ECANCELED) {
atomic.AddInt32(&u.recvOut[idx], -1)
atomic.AddInt32(u.recvReqInKernel(idx), -1)
}
// io_uring failed to run our syscall.
return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr io_uring could not run syscall: %w", err)
}
atomic.AddInt32(&u.recvOut[idx], -1)
atomic.AddInt32(u.recvReqInKernel(idx), -1)
if n < 0 {
// io_uring ran our syscall, which failed.
// Best effort attempt not to leak idx.
@@ -203,8 +212,8 @@ func (u *UDPConn) Close() error {
// first taking a wlock.)
BusyLoop:
for {
for idx := range u.recvOut {
if atomic.LoadInt32(&u.recvOut[idx]) != 0 {
for idx := range u.recvReqs {
if atomic.LoadInt32(u.recvReqInKernel(idx)) != 0 {
C.submit_cancel_request(u.recvRing, C.size_t(idx))
}
}
@@ -311,17 +320,9 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
// LocalAddr returns the local network address.
func (c *UDPConn) LocalAddr() net.Addr { return c.local }
func (c *UDPConn) SetDeadline(t time.Time) error {
panic("not implemented") // TODO: Implement
}
func (c *UDPConn) SetReadDeadline(t time.Time) error {
panic("not implemented") // TODO: Implement
}
func (c *UDPConn) SetWriteDeadline(t time.Time) error {
panic("not implemented") // TODO: Implement
}
func (c *UDPConn) SetDeadline(t time.Time) error { panic("not implemented") }
func (c *UDPConn) SetReadDeadline(t time.Time) error { panic("not implemented") }
func (c *UDPConn) SetWriteDeadline(t time.Time) error { panic("not implemented") }
// Files!