This commit is contained in:
Josh Bleecher Snyder
2021-07-06 18:40:45 -07:00
parent c1bc58defc
commit e0d8dcf3eb
2 changed files with 54 additions and 69 deletions

View File

@@ -109,33 +109,6 @@ static void submit_nop_request(struct io_uring *ring) {
io_uring_submit(ring);
}
// Wait for a completion to be available, fetch the data
static uint64_t wait_completion(struct io_uring *ring) {
struct io_uring_cqe *cqe;
again:;
int ret = io_uring_wait_cqe(ring, &cqe);
if (ret == -EINTR) {
goto again;
}
// TODO: Delete perror, fprintf, etc.
// Encode in return value or similar.
if (ret < 0) {
perror("wait_completion io_uring_wait_cqe");
return ret;
}
int n = cqe->res;
if (n < 0) {
// TODO: This leaks a buffer!!!
fprintf(stderr, "wait_completion failed: %d.\n", n);
return n;
}
size_t idx = (size_t)io_uring_cqe_get_data(cqe);
uint64_t nidx = packNIdx(n, idx);
io_uring_cqe_seen(ring, cqe);
return nidx;
}
// submit a writev request via liburing
static int submit_writev_request(struct io_uring *ring, struct req *r, int buflen, size_t idx) {
r->iov.iov_len = buflen;
@@ -157,27 +130,27 @@ static int submit_readv_request(struct io_uring *ring, struct req *r, size_t idx
return 0;
}
static uint64_t peek_completion(struct io_uring *ring) {
static uint64_t completion(struct io_uring *ring, int block) {
struct io_uring_cqe *cqe;
int ret = io_uring_peek_cqe(ring, &cqe);
if ((-ret == EAGAIN) || (-ret == EINTR)) {
return ret;
int ret;
if (block) {
ret = io_uring_wait_cqe(ring, &cqe);
} else {
ret = io_uring_peek_cqe(ring, &cqe);
}
// TODO: Delete perror, fprintf, etc.
// Encode in return value or similar.
if (ret < 0) {
perror("on failure, peek_file_completion io_uring_wait_cqe");
return ret;
}
errno = 0;
// TODO: We need to always return idx, otherwise we leak it!! Need to adjust all callers.
int n = cqe->res;
uint64_t nidx;
if (n < 0) {
// TODO: This leaks a buffer!!!
fprintf(stderr, "peek_file_completion write failed: %d.\n", n);
return n;
// common error seen here: -101 (network unreachable)
nidx = n;
} else {
size_t idx = (size_t)io_uring_cqe_get_data(cqe);
nidx = packNIdx(n, idx);
}
size_t idx = (size_t)io_uring_cqe_get_data(cqe);
uint64_t nidx = packNIdx(n, idx);
io_uring_cqe_seen(ring, cqe);
return nidx;
}

View File

@@ -131,8 +131,7 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
if u.fd == 0 {
return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn")
}
nidx := C.wait_completion(u.recvRing)
n, idx, err := unpackNIdx(nidx)
n, idx, err := waitCompletion(u.recvRing)
if err != nil {
return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr: %v", err)
}
@@ -224,9 +223,7 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
case idx = <-u.sendReqC:
default:
// No request available. Get one from the kernel.
nidx := C.wait_completion(u.sendRing)
var err error
_, idx, err = unpackNIdx(nidx)
_, idx, err = waitCompletion(u.sendRing)
if err != nil {
return 0, fmt.Errorf("some WriteTo failed, maybe long ago: %v", err)
}
@@ -256,16 +253,12 @@ func (u *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
C.size_t(idx), // user data
)
// Get an extra buffer, if available.
nidx := C.peek_completion(u.sendRing)
if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR {
// Nothing waiting for us.
} else {
_, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?)
if err == nil {
// Put the request buffer back in the usable queue.
// Should never block, by construction.
u.sendReqC <- idx
}
_, idx, err, ready := peekCompletion(u.sendRing)
if ready && err == nil {
// TODO: always put the buffer back??
// Put the request buffer back in the usable queue.
// Should never block, by construction.
u.sendReqC <- idx
}
return len(p), nil
}
@@ -356,6 +349,31 @@ func unpackNIdx(nidx C.uint64_t) (n, idx int, err error) {
return int(uint32(nidx >> 32)), int(uint32(nidx)), nil
}
const (
noBlockForCompletion = 0
blockForCompletion = 1
)
func waitCompletion(ring *C.go_uring) (n, idx int, err error) {
for {
nidx := C.completion(ring, blockForCompletion)
if syscall.Errno(-nidx) == syscall.EAGAIN {
continue
}
return unpackNIdx(nidx)
}
}
func peekCompletion(ring *C.go_uring) (n, idx int, err error, ready bool) {
nidx := C.completion(ring, noBlockForCompletion)
if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR {
return 0, 0, nil, false
// Nothing waiting for us.
}
n, idx, err = unpackNIdx(nidx)
return n, idx, err, true
}
type fileReq struct {
iov C.go_iovec
buf [device.MaxSegmentSize]byte
@@ -367,8 +385,7 @@ func (u *file) Read(buf []byte) (n int, err error) { // read a packet from the d
if u.fd == 0 {
return 0, errors.New("invalid uring.File")
}
nidx := C.wait_completion(u.readRing)
n, idx, err := unpackNIdx(nidx)
n, idx, err := waitCompletion(u.readRing)
if err != nil || n < 4 {
return 0, fmt.Errorf("Read: %v", err)
}
@@ -393,9 +410,8 @@ func (u *file) Write(buf []byte) (int, error) {
case idx = <-u.writeReqC:
default:
// No request available. Get one from the kernel.
nidx := C.wait_completion(u.writeRing)
var err error
_, idx, err = unpackNIdx(nidx)
_, idx, err = waitCompletion(u.writeRing)
if err != nil {
return 0, fmt.Errorf("some write failed, maybe long ago: %v", err)
}
@@ -406,16 +422,12 @@ func (u *file) Write(buf []byte) (int, error) {
copy(rbuf, buf)
C.submit_writev_request(u.writeRing, r, C.int(len(buf)), C.size_t(idx))
// Get an extra buffer, if available.
nidx := C.peek_completion(u.writeRing)
if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR {
// Nothing waiting for us.
} else {
_, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?)
if err == nil {
// Put the request buffer back in the usable queue.
// Should never block, by construction.
u.writeReqC <- idx
}
_, idx, err, ready := peekCompletion(u.writeRing)
if ready && err == nil {
// TODO: put the buffer back even on error?
// Put the request buffer back in the usable queue.
// Should never block, by construction.
u.writeReqC <- idx
}
return len(buf), nil
}