use syncs, start reworking file

This commit is contained in:
Josh Bleecher Snyder
2021-07-08 17:02:42 -07:00
parent 0132d52e9a
commit 61cde40000
3 changed files with 45 additions and 19 deletions

View File

@@ -15,13 +15,24 @@
"golang.zx2c4.com/wireguard/device"
)
// A File is a write-only file fd manager.
// TODO: Support reads
// TODO: all the todos from UDPConn
// A file is a file handle that uses io_uring for reads and writes.
// It is intended for use with TUN fds, and thus only supports
// reading from and writing to file offset 0.
type file struct {
// We have two urings so that we don't have to demux completion events.
// writeRing is the uring for pwritev calls.
writeRing *C.go_uring
readRing *C.go_uring
close sync.Once
// readRing is the uring for preadv calls.
readRing *C.go_uring
// close ensures that file closes occur exactly once.
close sync.Once
// closed is an atomic variable that indicates whether the connection has been closed.
// TODO: Make an atomic bool type that we can use here.
closed uint32
file *os.File // must keep file from being GC'd
fd uintptr
readReqs [1]*C.goreq // Whoops! The kernel apparently cannot handle more than 1 concurrent preadv calls on a tun device!

View File

@@ -17,12 +17,13 @@
"golang.zx2c4.com/wireguard/device"
"inet.af/netaddr"
"tailscale.com/syncs"
"tailscale.com/util/endian"
)
const bufferSize = device.MaxSegmentSize
// A UDPConn is a recv-only UDP fd manager.
// A UDPConn is a UDP connection that uses io_uring to send and receive packets.
type UDPConn struct {
// We have two urings so that we don't have to demux completion events.
@@ -33,9 +34,8 @@ type UDPConn struct {
// close ensures that connection closes occur exactly once.
close sync.Once
// closed is an atomic variable that indicates whether the connection has been closed.
// TODO: Make an atomic bool type that we can use here.
closed uint32
// closed indicates whether the connection has been closed.
closed syncs.AtomicBool
// shutdown is a sequence of funcs to be called when the UDPConn closes.
shutdown []func()
@@ -61,7 +61,7 @@ type UDPConn struct {
is4 bool
// refcount counts the number of outstanding read/write requests.
// It is accessed atomically. refcount is used for graceful shutdown.
// refcount is used for graceful shutdown.
// The pattern (very roughly) is:
//
// func readOrWrite() {
@@ -83,7 +83,7 @@ type UDPConn struct {
// (The obvious alternative is to use a sync.RWMutex, but that has a chicken-and-egg problem.
// Reads/writes must take an rlock, but Close cannot take a wlock under all the rlocks are released,
// but Close cannot issue cancellations to release the rlocks without first taking a wlock.)
refcount int32
refcount syncs.AtomicInt32
}
func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) {
@@ -181,9 +181,9 @@ func (u *UDPConn) recvReqInKernel(idx int) *int32 {
func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
// The docs for the u.refcount field document this prologue.
atomic.AddInt32(&u.refcount, 1)
defer atomic.AddInt32(&u.refcount, -1)
if atomic.LoadUint32(&u.closed) != 0 {
u.refcount.Add(1)
defer u.refcount.Add(-1)
if u.closed.Get() {
return 0, netaddr.IPPort{}, net.ErrClosed
}
@@ -238,9 +238,9 @@ func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
// The docs for the u.refcount field document this prologue.
atomic.AddInt32(&u.refcount, 1)
defer atomic.AddInt32(&u.refcount, -1)
if atomic.LoadUint32(&u.closed) != 0 {
u.refcount.Add(1)
defer u.refcount.Add(-1)
if u.closed.Get() {
return 0, net.ErrClosed
}
@@ -298,7 +298,7 @@ func (u *UDPConn) Close() error {
// Announce to readers and writers that we are closing down.
// Busy loop until all reads and writes are unblocked.
// See the docs for u.refcount.
atomic.StoreUint32(&u.closed, 1)
u.closed.Set(true)
for {
// Request that the kernel cancel all submitted reads. (Writes don't block indefinitely.)
for idx := range u.recvReqs {
@@ -306,7 +306,7 @@ func (u *UDPConn) Close() error {
C.submit_cancel_request(u.recvRing, C.size_t(idx))
}
}
if atomic.LoadInt32(&u.refcount) == 0 {
if u.refcount.Get() == 0 {
break
}
time.Sleep(time.Millisecond)

View File

@@ -94,6 +94,21 @@ func (b *AtomicUint32) Get() uint32 {
return atomic.LoadUint32((*uint32)(b))
}
// AtomicInt32 is an atomic int32.
type AtomicInt32 int32
func (b *AtomicInt32) Set(v int32) {
atomic.StoreInt32((*int32)(b), v)
}
func (b *AtomicInt32) Get() int32 {
return atomic.LoadInt32((*int32)(b))
}
func (b *AtomicInt32) Add(v int32) {
atomic.AddInt32((*int32)(b), v)
}
// Semaphore is a counting semaphore.
//
// Use NewSemaphore to create one.