overhaul error handling of peek/waitcompletion

This commit is contained in:
Josh Bleecher Snyder
2021-07-07 13:20:09 -07:00
parent e0d8dcf3eb
commit 1e3e5fd8e7
2 changed files with 95 additions and 67 deletions

View File

@@ -130,27 +130,31 @@ static int submit_readv_request(struct io_uring *ring, struct req *r, size_t idx
return 0;
}
static uint64_t completion(struct io_uring *ring, int block) {
struct completion_result {
int err;
int n;
size_t idx;
};
typedef struct completion_result go_completion_result;
static go_completion_result completion(struct io_uring *ring, int block) {
struct io_uring_cqe *cqe;
int ret;
struct completion_result res;
res.err = 0;
res.n = 0;
res.idx = 0;
if (block) {
ret = io_uring_wait_cqe(ring, &cqe);
res.err = io_uring_wait_cqe(ring, &cqe);
} else {
ret = io_uring_peek_cqe(ring, &cqe);
res.err = io_uring_peek_cqe(ring, &cqe);
}
if (ret < 0) {
return ret;
}
// TODO: We need to always return idx, otherwise we leak it!! Need to adjust all callers.
int n = cqe->res;
uint64_t nidx;
if (n < 0) {
// common error seen here: -101 (network unreachable)
nidx = n;
} else {
size_t idx = (size_t)io_uring_cqe_get_data(cqe);
nidx = packNIdx(n, idx);
if (res.err < 0) {
return res;
}
res.idx = (size_t)io_uring_cqe_get_data(cqe);
res.n = cqe->res;
io_uring_cqe_seen(ring, cqe);
return nidx;
return res;
}

View File

@@ -12,6 +12,7 @@
"os"
"reflect"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
@@ -36,7 +37,7 @@ type UDPConn struct {
close sync.Once
conn *net.UDPConn
file *os.File // must keep file from being GC'd
fd C.int
fd uintptr
local net.Addr
recvReqs [8]*C.goreq
sendReqs [8]*C.goreq
@@ -67,9 +68,9 @@ func NewUDPConn(pconn net.PacketConn) (*UDPConn, error) {
recvRing := new(C.go_uring)
sendRing := new(C.go_uring)
fd := C.int(file.Fd())
fd := file.Fd()
for _, r := range []*C.go_uring{recvRing, sendRing} {
ret := C.initialize(r, fd)
ret := C.initialize(r, C.int(fd))
if ret < 0 {
// TODO: free recvRing if sendRing initialize failed
return nil, fmt.Errorf("uring initialization failed: %d", ret)
@@ -131,16 +132,22 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
if u.fd == 0 {
return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn")
}
n, idx, err := waitCompletion(u.recvRing)
n, idx, err := waitCompletion(u.recvRing, &u.fd)
if err != nil {
return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr: %v", err)
// io_uring failed to run our syscall.
return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr io_uring could not run syscall: %w", err)
}
if n < 0 {
// io_uring ran our syscall, which failed.
// Best effort attempt not to leak idx.
u.submitRecvRequest(int(idx))
return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr syscall failed: %w", syscall.Errno(-n))
}
r := u.recvReqs[idx]
var ip netaddr.IP
var port uint16
// TODO: native go endianness conversion routines so we don't have to call ntohl, etc.
if u.is4 {
// TODO: native go endianness conversion routines so we don't have to call ntohl, etc.
ip = netaddr.IPFrom4(*(*[4]byte)((unsafe.Pointer)((&r.sa.sin_addr.s_addr))))
port = uint16(C.ntohs(r.sa.sin_port))
} else {
@@ -151,10 +158,11 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
rbuf := sliceOf(r.buf, n)
copy(buf, rbuf)
// Queue up a new request.
// TODO: Do this in a goroutine?
err = u.submitRecvRequest(int(idx))
if err != nil {
panic("how should we handle this?")
if err := u.submitRecvRequest(int(idx)); err != nil {
// Aggressively return this error.
// The error will bubble up and cause the entire conn to be closed down,
// so it doesn't matter that we lost a packet here.
return 0, netaddr.IPPort{}, err
}
return n, ipp, nil
}
@@ -223,9 +231,15 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
case idx = <-u.sendReqC:
default:
// No request available. Get one from the kernel.
_, idx, err = waitCompletion(u.sendRing)
n, idx, err = waitCompletion(u.sendRing, &u.fd)
if err != nil {
return 0, fmt.Errorf("some WriteTo failed, maybe long ago: %v", err)
// io_uring failed to issue the syscall.
return 0, fmt.Errorf("WriteTo io_uring call failed: %w", err)
}
if n < 0 {
// Past syscall failed.
u.sendReqC <- idx // don't leak idx
return 0, fmt.Errorf("previous WriteTo failed: %w", syscall.Errno(-n))
}
}
r := u.sendReqs[idx]
@@ -253,9 +267,7 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
C.size_t(idx), // user data
)
// Get an extra buffer, if available.
_, idx, err, ready := peekCompletion(u.sendRing)
if ready && err == nil {
// TODO: always put the buffer back??
if idx, ok := peekCompletion(u.sendRing); ok {
// Put the request buffer back in the usable queue.
// Should never block, by construction.
u.sendReqC <- idx
@@ -288,21 +300,21 @@ type file struct {
readRing *C.go_uring
close sync.Once
file *os.File // must keep file from being GC'd
fd C.int
fd uintptr
readReqs [1]*C.goreq // Whoops! The kernel apparently cannot handle more than 1 concurrent preadv calls on a tun device!
writeReqs [8]*C.goreq
writeReqC chan int // indices into reqs
}
func newFile(f *os.File) (*file, error) {
fd := C.int(f.Fd())
fd := f.Fd()
u := &file{
file: f,
fd: fd,
}
for _, ringPtr := range []**C.go_uring{&u.writeRing, &u.readRing} {
r := new(C.go_uring)
ret := C.initialize(r, fd)
ret := C.initialize(r, C.int(fd))
if ret < 0 {
// TODO: handle unwinding partial initialization
return nil, fmt.Errorf("uring initialization failed: %d", ret)
@@ -342,36 +354,39 @@ func (u *file) submitReadvRequest(idx int) error {
return nil
}
func unpackNIdx(nidx C.uint64_t) (n, idx int, err error) {
if int64(nidx) < 0 {
return 0, 0, fmt.Errorf("error %d", int64(nidx))
}
return int(uint32(nidx >> 32)), int(uint32(nidx)), nil
}
const (
noBlockForCompletion = 0
blockForCompletion = 1
)
func waitCompletion(ring *C.go_uring) (n, idx int, err error) {
// waitCompletion blocks until a completion on ring succeeds, or until *fd == 0.
// If *fd == 0, that indicates that the ring is no loner valid, in which case waitCompletion returns net.ErrClosed.
// Reads of *fd are atomic.
func waitCompletion(ring *C.go_uring, fd *uintptr) (n, idx int, err error) {
for {
nidx := C.completion(ring, blockForCompletion)
if syscall.Errno(-nidx) == syscall.EAGAIN {
// If we have exited, stop looping.
if atomic.LoadUintptr(fd) == 0 {
return 0, 0, net.ErrClosed
}
// TODO: is this racy?
r := C.completion(ring, blockForCompletion)
if syscall.Errno(-r.err) == syscall.EAGAIN {
continue
}
return unpackNIdx(nidx)
var err error
if r.err < 0 {
err = syscall.Errno(-r.err)
}
return int(r.n), int(r.idx), err
}
}
func peekCompletion(ring *C.go_uring) (n, idx int, err error, ready bool) {
nidx := C.completion(ring, noBlockForCompletion)
if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR {
return 0, 0, nil, false
// Nothing waiting for us.
func peekCompletion(ring *C.go_uring) (idx int, ok bool) {
r := C.completion(ring, noBlockForCompletion)
if r.err < 0 {
return 0, false
}
n, idx, err = unpackNIdx(nidx)
return n, idx, err, true
return int(r.idx), true
}
type fileReq struct {
@@ -382,20 +397,26 @@ type fileReq struct {
// Read data into buf[offset:].
// We are allowed to write junk into buf[offset-4:offset].
func (u *file) Read(buf []byte) (n int, err error) { // read a packet from the device (without any additional headers)
if u.fd == 0 {
if u.fd == 0 { // TODO: review all uses of u.fd for atomic read/write
return 0, errors.New("invalid uring.File")
}
n, idx, err := waitCompletion(u.readRing)
if err != nil || n < 4 {
return 0, fmt.Errorf("Read: %v", err)
n, idx, err := waitCompletion(u.readRing, &u.fd)
if err != nil {
return 0, fmt.Errorf("Read: io_uring failed to issue syscall: %w", err)
}
if n < 0 {
// Syscall failed.
u.submitReadvRequest(int(idx)) // best effort attempt not to leak idx
return 0, fmt.Errorf("Read: syscall failed: %w", syscall.Errno(-n))
}
// Success.
r := u.readReqs[idx]
rbuf := sliceOf(r.buf, n)
copy(buf, rbuf)
// Queue up a new request.
err = u.submitReadvRequest(int(idx))
if err != nil {
panic("how should we handle this?")
if err := u.submitReadvRequest(int(idx)); err != nil {
// Aggressively return this error.
return 0, err
}
return n, nil
}
@@ -410,10 +431,14 @@ func (u *file) Write(buf []byte) (int, error) {
case idx = <-u.writeReqC:
default:
// No request available. Get one from the kernel.
var err error
_, idx, err = waitCompletion(u.writeRing)
n, idx, err := waitCompletion(u.writeRing, &u.fd)
if err != nil {
return 0, fmt.Errorf("some write failed, maybe long ago: %v", err)
return 0, fmt.Errorf("Write io_uring call failed: %w", err)
}
if n < 0 {
// Past syscall failed.
u.writeReqC <- idx // don't leak idx
return 0, fmt.Errorf("previous Write failed: %w", syscall.Errno(-n))
}
}
r := u.writeReqs[idx]
@@ -422,9 +447,8 @@ func (u *file) Write(buf []byte) (int, error) {
copy(rbuf, buf)
C.submit_writev_request(u.writeRing, r, C.int(len(buf)), C.size_t(idx))
// Get an extra buffer, if available.
_, idx, err, ready := peekCompletion(u.writeRing)
if ready && err == nil {
// TODO: put the buffer back even on error?
idx, ok := peekCompletion(u.writeRing)
if ok {
// Put the request buffer back in the usable queue.
// Should never block, by construction.
u.writeReqC <- idx