doc and improve shutdown refcounting

commit 8478d34cca (parent f329d69fb4)
Author: Josh Bleecher Snyder
Date:   2021-07-08 13:47:07 -07:00


@@ -68,9 +68,31 @@ type UDPConn struct {
sendReqC chan int
// is4 indicates whether the conn is an IPv4 connection.
is4 bool
// reads counts the number of outstanding read requests.
// It is accessed atomically.
reads int32
// refcount counts the number of outstanding read/write requests.
// It is accessed atomically. refcount is used for graceful shutdown.
// The pattern (very roughly) is:
//
// func readOrWrite() {
// refcount++
// defer refcount--
// if closed {
// return
// }
// // ...
// }
//
// Close sets closed to true and polls until refcount hits zero.
// Once refcount hits zero, there are no ongoing reads or writes.
// Any future reads or writes will exit immediately (because closed is true),
// so resources used by reads and writes may be freed.
// The polling is unfortunate, but it occurs only during Close, is fast,
// and avoids ugly sequencing issues around canceling outstanding io_uring submissions.
//
// (The obvious alternative is to use a sync.RWMutex, but that has a chicken-and-egg problem.
// Reads/writes must take an rlock, but Close cannot take a wlock until all the rlocks are released,
// but Close cannot issue cancellations to release the rlocks without first taking a wlock.)
refcount int32
}
func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) {
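
A standalone sketch of the pattern described in that doc comment (hypothetical names throughout; the real UDPConn additionally cancels in-kernel submissions while polling):

package example

import (
	"errors"
	"sync/atomic"
	"time"
)

type conn struct {
	closed   uint32 // set to 1 by Close; read atomically
	refcount int32  // outstanding reads/writes; updated atomically
}

func (c *conn) readOrWrite() error {
	// Register the operation before checking closed, so that Close,
	// once it has set closed, can never miss an in-flight operation.
	atomic.AddInt32(&c.refcount, 1)
	defer atomic.AddInt32(&c.refcount, -1)
	if atomic.LoadUint32(&c.closed) != 0 {
		return errors.New("use of closed connection")
	}
	// ... do the actual (possibly blocking) work ...
	return nil
}

func (c *conn) Close() error {
	atomic.StoreUint32(&c.closed, 1)
	// Poll until all outstanding operations drain.
	for atomic.LoadInt32(&c.refcount) != 0 {
		time.Sleep(time.Millisecond)
	}
	// Nothing is in flight, and any future readOrWrite bails out on
	// the closed check, so per-conn resources may be freed safely.
	return nil
}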
@@ -178,14 +200,13 @@ func sliceOf(ptr *C.char, n int) []byte {
}
func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
// Important: register that there is a read before checking whether the conn is closed.
// Close assumes that once it has set u.closed to non-zero there are no "hidden" reads outstanding,
// as there could be if we did this in the other order.
atomic.AddInt32(&u.reads, 1)
defer atomic.AddInt32(&u.reads, -1)
// See the doc comment on the u.refcount field for an explanation of this prologue.
atomic.AddInt32(&u.refcount, 1)
defer atomic.AddInt32(&u.refcount, -1)
if atomic.LoadUint32(&u.closed) != 0 {
return 0, netaddr.IPPort{}, net.ErrClosed
}
n, idx, err := waitCompletion(u.recvRing)
if err != nil {
if errors.Is(err, syscall.ECANCELED) {
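
To see why the refcount increment must precede the closed check, consider the reverse (buggy) order under one possible interleaving (illustration only, not code from this package):

	reader                                  Close
	------                                  -----
	load closed -> 0, proceed
	                                        store closed = 1
	                                        load refcount -> 0; assume quiesced
	                                        free read/write resources
	refcount++ (registered too late)
	read using freed resources

With the increment first, Close either observes the nonzero refcount and keeps polling, or the reader observes closed != 0 and returns net.ErrClosed before touching any resources.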
@@ -232,27 +253,18 @@ func (u *UDPConn) Close() error {
// It is now not possible for u.reads to reach zero without
// all reads being unblocked.
// Busy loop until all reads are unblocked.
// This is unpleasant, but I don't know of another way that
// doesn't introduce significant synchronization overhead.
// (The obvious alternative is to use a sync.RWMutex,
// but that has a chicken-and-egg problem: Reads must take an rlock,
// but we cannot take a wlock until all the rlocks are released,
// but we cannot issue cancellations to release the rlocks without
// first taking a wlock.)
BusyLoop:
// Busy loop until all reads and writes are unblocked.
// See the docs for u.refcount.
for {
for idx := range u.recvReqs {
if atomic.LoadInt32(u.recvReqInKernel(idx)) != 0 {
C.submit_cancel_request(u.recvRing, C.size_t(idx))
}
}
reads := atomic.LoadInt32(&u.reads)
if reads > 0 {
time.Sleep(time.Millisecond)
} else {
break BusyLoop
if atomic.LoadInt32(&u.refcount) == 0 {
break
}
time.Sleep(time.Millisecond)
}
// TODO: block until no one else uses our rings.
// (Or is that unnecessary now?)
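
For contrast, a sketch of the sync.RWMutex alternative rejected in the comments above, showing the chicken-and-egg deadlock (hypothetical shape; submitCancellations stands in for the C.submit_cancel_request calls):

package example

import "sync"

type rwConn struct {
	mu sync.RWMutex
}

func (c *rwConn) readOrWrite() {
	c.mu.RLock() // held across a read that may block in the kernel
	defer c.mu.RUnlock()
	// ... blocks until a completion or a cancellation arrives ...
}

func (c *rwConn) Close() {
	c.mu.Lock() // waits for every rlock to be released...
	defer c.mu.Unlock()
	// ...but the cancellations that would unblock readers (and thereby
	// release their rlocks) are only issued here, under the wlock.
	// Readers wait on Close, Close waits on readers: deadlock.
	submitCancellations()
}

func submitCancellations() { /* stand-in for io_uring cancellation submissions */ }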
@@ -287,9 +299,13 @@ func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
}
func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
// See the doc comment on the u.refcount field for an explanation of this prologue.
atomic.AddInt32(&u.refcount, 1)
defer atomic.AddInt32(&u.refcount, -1)
if atomic.LoadUint32(&u.closed) != 0 {
return 0, net.ErrClosed
}
udpAddr, ok := addr.(*net.UDPAddr)
if !ok {
return 0, fmt.Errorf("cannot WriteTo net.Addr of type %T", addr)