From 61cde40000ebf66536a8a4a7485ea849d6781f11 Mon Sep 17 00:00:00 2001 From: Josh Bleecher Snyder Date: Thu, 8 Jul 2021 17:02:42 -0700 Subject: [PATCH] use syncs, start reworking file --- net/uring/file_linux.go | 21 ++++++++++++++++----- net/uring/udp_linux.go | 28 ++++++++++++++-------------- syncs/syncs.go | 15 +++++++++++++++ 3 files changed, 45 insertions(+), 19 deletions(-) diff --git a/net/uring/file_linux.go b/net/uring/file_linux.go index 45274b130..87e05567a 100644 --- a/net/uring/file_linux.go +++ b/net/uring/file_linux.go @@ -15,13 +15,24 @@ "golang.zx2c4.com/wireguard/device" ) -// A File is a write-only file fd manager. -// TODO: Support reads -// TODO: all the todos from UDPConn +// A file is a file handle that uses io_uring for reads and writes. +// It is intended for use with TUN fds, and thus only supports +// reading from and writing to file offset 0. type file struct { + // We have two urings so that we don't have to demux completion events. + + // writeRing is the uring for pwritev calls. writeRing *C.go_uring - readRing *C.go_uring - close sync.Once + // readRing is the uring for preadv calls. + readRing *C.go_uring + + // close ensures that file closes occur exactly once. + close sync.Once + + // closed is an atomic variable that indicates whether the connection has been closed. + // TODO: Make an atomic bool type that we can use here. + closed uint32 + file *os.File // must keep file from being GC'd fd uintptr readReqs [1]*C.goreq // Whoops! The kernel apparently cannot handle more than 1 concurrent preadv calls on a tun device! diff --git a/net/uring/udp_linux.go b/net/uring/udp_linux.go index 20ea27427..b8ed6fecc 100644 --- a/net/uring/udp_linux.go +++ b/net/uring/udp_linux.go @@ -17,12 +17,13 @@ "golang.zx2c4.com/wireguard/device" "inet.af/netaddr" + "tailscale.com/syncs" "tailscale.com/util/endian" ) const bufferSize = device.MaxSegmentSize -// A UDPConn is a recv-only UDP fd manager. +// A UDPConn is a UDP connection that uses io_uring to send and receive packets. type UDPConn struct { // We have two urings so that we don't have to demux completion events. @@ -33,9 +34,8 @@ type UDPConn struct { // close ensures that connection closes occur exactly once. close sync.Once - // closed is an atomic variable that indicates whether the connection has been closed. - // TODO: Make an atomic bool type that we can use here. - closed uint32 + // closed indicates whether the connection has been closed. + closed syncs.AtomicBool // shutdown is a sequence of funcs to be called when the UDPConn closes. shutdown []func() @@ -61,7 +61,7 @@ type UDPConn struct { is4 bool // refcount counts the number of outstanding read/write requests. - // It is accessed atomically. refcount is used for graceful shutdown. + // refcount is used for graceful shutdown. // The pattern (very roughly) is: // // func readOrWrite() { @@ -83,7 +83,7 @@ type UDPConn struct { // (The obvious alternative is to use a sync.RWMutex, but that has a chicken-and-egg problem. // Reads/writes must take an rlock, but Close cannot take a wlock under all the rlocks are released, // but Close cannot issue cancellations to release the rlocks without first taking a wlock.) - refcount int32 + refcount syncs.AtomicInt32 } func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) { @@ -181,9 +181,9 @@ func (u *UDPConn) recvReqInKernel(idx int) *int32 { func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { // The docs for the u.refcount field document this prologue. - atomic.AddInt32(&u.refcount, 1) - defer atomic.AddInt32(&u.refcount, -1) - if atomic.LoadUint32(&u.closed) != 0 { + u.refcount.Add(1) + defer u.refcount.Add(-1) + if u.closed.Get() { return 0, netaddr.IPPort{}, net.ErrClosed } @@ -238,9 +238,9 @@ func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) { func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { // The docs for the u.refcount field document this prologue. - atomic.AddInt32(&u.refcount, 1) - defer atomic.AddInt32(&u.refcount, -1) - if atomic.LoadUint32(&u.closed) != 0 { + u.refcount.Add(1) + defer u.refcount.Add(-1) + if u.closed.Get() { return 0, net.ErrClosed } @@ -298,7 +298,7 @@ func (u *UDPConn) Close() error { // Announce to readers and writers that we are closing down. // Busy loop until all reads and writes are unblocked. // See the docs for u.refcount. - atomic.StoreUint32(&u.closed, 1) + u.closed.Set(true) for { // Request that the kernel cancel all submitted reads. (Writes don't block indefinitely.) for idx := range u.recvReqs { @@ -306,7 +306,7 @@ func (u *UDPConn) Close() error { C.submit_cancel_request(u.recvRing, C.size_t(idx)) } } - if atomic.LoadInt32(&u.refcount) == 0 { + if u.refcount.Get() == 0 { break } time.Sleep(time.Millisecond) diff --git a/syncs/syncs.go b/syncs/syncs.go index 46861af63..ed04fa06c 100644 --- a/syncs/syncs.go +++ b/syncs/syncs.go @@ -94,6 +94,21 @@ func (b *AtomicUint32) Get() uint32 { return atomic.LoadUint32((*uint32)(b)) } +// AtomicInt32 is an atomic int32. +type AtomicInt32 int32 + +func (b *AtomicInt32) Set(v int32) { + atomic.StoreInt32((*int32)(b), v) +} + +func (b *AtomicInt32) Get() int32 { + return atomic.LoadInt32((*int32)(b)) +} + +func (b *AtomicInt32) Add(v int32) { + atomic.AddInt32((*int32)(b), v) +} + // Semaphore is a counting semaphore. // // Use NewSemaphore to create one.