From 8478d34cca4cf3e324ccc10ed58a11172318bf64 Mon Sep 17 00:00:00 2001 From: Josh Bleecher Snyder Date: Thu, 8 Jul 2021 13:47:07 -0700 Subject: [PATCH] doc and improve shutdown refcounting --- net/uring/io_uring_linux.go | 60 +++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 22 deletions(-) diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index d99a1bc42..dad042631 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -68,9 +68,31 @@ type UDPConn struct { sendReqC chan int // is4 indicates whether the conn is an IPv4 connection. is4 bool - // reads counts the number of outstanding read requests. - // It is accessed atomically. - reads int32 + + // refcount counts the number of outstanding read/write requests. + // It is accessed atomically. refcount is used for graceful shutdown. + // The pattern (very roughly) is: + // + // func readOrWrite() { + // refcount++ + // defer refcount-- + // if closed { + // return + // } + // // ... + // } + // + // Close sets closed to true and polls until refcount hits zero. + // Once refcount hits zero, there are no ongoing reads or writes. + // Any future reads or writes will exit immediately (because closed is true), + // so resources used by reads and writes may be freed. + // The polling is unfortunate, but it occurs only during Close, is fast, + // and avoids ugly sequencing issues around canceling outstanding io_uring submissions. + // + // (The obvious alternative is to use a sync.RWMutex, but that has a chicken-and-egg problem. + // Reads/writes must take an rlock, but Close cannot take a wlock under all the rlocks are released, + // but Close cannot issue cancellations to release the rlocks without first taking a wlock.) + refcount int32 } func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) { @@ -178,14 +200,13 @@ func sliceOf(ptr *C.char, n int) []byte { } func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { - // Important: register that there is a read before checking whether the conn is closed. - // Close assumes that once it has set u.closed to non-zero there are no "hidden" reads outstanding, - // as there could be if we did this in the other order. - atomic.AddInt32(&u.reads, 1) - defer atomic.AddInt32(&u.reads, -1) + // The docs for the u.refcount field document this prologue. + atomic.AddInt32(&u.refcount, 1) + defer atomic.AddInt32(&u.refcount, -1) if atomic.LoadUint32(&u.closed) != 0 { return 0, netaddr.IPPort{}, net.ErrClosed } + n, idx, err := waitCompletion(u.recvRing) if err != nil { if errors.Is(err, syscall.ECANCELED) { @@ -232,27 +253,18 @@ func (u *UDPConn) Close() error { // It is now not possible for u.reads to reach zero without // all reads being unblocked. - // Busy loop until all reads are unblocked. - // This is unpleasant, but I don't know of another way that - // doesn't introduce significant synchronization overhead. - // (The obvious alternative is to use a sync.RWMutex, - // but that has a chicken-and-egg problem: Reads must take an rlock, - // but we cannot take a wlock under all the rlocks are released, - // but we cannot issue cancellations to release the rlocks without - // first taking a wlock.) - BusyLoop: + // Busy loop until all reads and writes are unblocked. + // See the docs for u.refcount. for { for idx := range u.recvReqs { if atomic.LoadInt32(u.recvReqInKernel(idx)) != 0 { C.submit_cancel_request(u.recvRing, C.size_t(idx)) } } - reads := atomic.LoadInt32(&u.reads) - if reads > 0 { - time.Sleep(time.Millisecond) - } else { - break BusyLoop + if atomic.LoadInt32(&u.refcount) == 0 { + break } + time.Sleep(time.Millisecond) } // TODO: block until no one else uses our rings. // (Or is that unnecessary now?) @@ -287,9 +299,13 @@ func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { } func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { + // The docs for the u.refcount field document this prologue. + atomic.AddInt32(&u.refcount, 1) + defer atomic.AddInt32(&u.refcount, -1) if atomic.LoadUint32(&u.closed) != 0 { return 0, net.ErrClosed } + udpAddr, ok := addr.(*net.UDPAddr) if !ok { return 0, fmt.Errorf("cannot WriteTo net.Addr of type %T", addr)