use io_uring

This commit is contained in:
Josh Bleecher Snyder
2021-05-27 12:36:03 -07:00
parent 4f4dae32dd
commit fcdc9086a2
4 changed files with 302 additions and 5 deletions

107
net/uring/io_uring.c Normal file
View File

@@ -0,0 +1,107 @@
#include <arpa/inet.h> // debugging
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/ioctl.h>
#include <liburing.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/udp.h>
// TODO: use fixed buffers? https://unixism.net/loti/tutorial/fixed_buffers.html
typedef struct io_uring go_uring;
// Wait for a completion to be available, fetch the data
static int receive_into(int sock, struct io_uring *ring, char *ip, uint16_t *port) {
struct io_uring_cqe *cqe;
again:;
int ret = io_uring_wait_cqe(ring, &cqe);
if (ret == -EINTR) {
goto again;
}
// TODO: Delete perror, fprintf, etc.
// Encode in return value or similar.
if (ret < 0) {
perror("io_uring_wait_cqe");
return ret;
}
if (cqe->res < 0) {
fprintf(stderr, "recvmsg failed: %d.\n", cqe->res);
return cqe->res;
}
struct msghdr *mhdr = io_uring_cqe_get_data(cqe);
if (mhdr == NULL) {
fprintf(stderr, "received nop\n");
return -1;
}
int n = cqe->res;
struct sockaddr_in sa;
memcpy(&sa, mhdr->msg_name, mhdr->msg_namelen);
memcpy(ip, &sa.sin_addr, 4);
*port = ntohs(sa.sin_port);
free(mhdr->msg_iov);
free(mhdr->msg_name);
free(mhdr);
io_uring_cqe_seen(ring, cqe);
return n;
}
// submit a recvmsg request via liburing
// TODO: What recvfrom support arrives, maybe use that instead?
static int submit_recvmsg_request(int sock, struct io_uring *ring, char *buf, int buflen) {
struct msghdr *mhdr = malloc(sizeof(struct msghdr));
if (!mhdr) {
perror("malloc(msghdr)");
return 1;
}
struct iovec *iov = malloc(sizeof(struct iovec));
if (!iov) {
perror("malloc(iov)");
free(iov);
return 1;
}
char *sender = malloc(sizeof(struct sockaddr_in));
if (!sender) {
perror("malloc(sender)");
free(iov);
free(mhdr);
return 1;
}
memset(iov, 0, sizeof(*iov));
iov->iov_base = buf;
iov->iov_len = buflen;
memset(mhdr, 0, sizeof(*mhdr));
mhdr->msg_iov = iov;
mhdr->msg_iovlen = 1;
memset(sender, 0, sizeof(struct sockaddr_in));
mhdr->msg_name = sender;
mhdr->msg_namelen = sizeof(struct sockaddr_in);
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_recvmsg(sqe, sock, mhdr, 0);
io_uring_sqe_set_data(sqe, mhdr);
io_uring_submit(ring);
return 0;
}
static void submit_nop_request(struct io_uring *ring) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_nop(sqe);
io_uring_submit(ring);
}

149
net/uring/io_uring_linux.go Normal file
View File

@@ -0,0 +1,149 @@
package uring
// #cgo LDFLAGS: -luring
// #include "io_uring.c"
import "C"
import (
"errors"
"fmt"
"net"
"os"
"sync"
"time"
"unsafe"
"inet.af/netaddr"
)
// A UDPConn is a recv-only UDP fd manager.
// TODO: Support writes.
// TODO: support multiplexing multiple fds?
// May be more expensive than having multiple urings, and certainly more complicated.
// TODO: API review for performance.
// We'd like to enqueue a bunch of recv calls and deqeueue them later,
// but we have a problem with buffer management: We get our buffers just-in-time
// from wireguard-go, which means we have to make copies.
// That's OK for now, but later it could be a performance issue.
// For now, keep it simple and enqueue/dequeue in a single step.
// TODO: IPv6
type UDPConn struct {
ptr *C.go_uring
close sync.Once
conn *net.UDPConn
file *os.File // must keep file from being GC'd
fd C.int
local net.Addr
}
func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) {
// this is dumb
local := conn.LocalAddr().String()
ip, err := netaddr.ParseIPPort(local)
if err != nil {
return nil, fmt.Errorf("failed to parse UDPConn local addr %s as IP: %w", local, err)
}
if !ip.IP().Is4() {
return nil, fmt.Errorf("uring only supports udp4 (for now), got local addr %s", local)
}
// TODO: probe for system capabilities: https://unixism.net/loti/tutorial/probe_liburing.html
file, err := conn.File()
if err != nil {
return nil, err
}
r := new(C.go_uring)
const queue_depth = 8 // TODO: What value to use here?
C.io_uring_queue_init(queue_depth, r, 0)
c := &UDPConn{
ptr: r,
conn: conn,
file: file,
fd: C.int(file.Fd()),
local: conn.LocalAddr(),
}
return c, nil
}
func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
if u.fd == 0 {
return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn")
}
// TODO: eventually separate submitting the request and waiting for the response.
errno := C.submit_recvmsg_request(u.fd, u.ptr, (*C.char)(unsafe.Pointer(&buf[0])), C.int(len(buf)))
if errno < 0 {
return 0, netaddr.IPPort{}, fmt.Errorf("uring.UDPConn recv failed: %v", errno) // TODO: Improve errno
}
a := new([4]byte)
var port C.uint16_t
n := C.receive_into(u.fd, u.ptr, (*C.char)(unsafe.Pointer(a)), &port)
if n < 0 {
return 0, netaddr.IPPort{}, errors.New("something wrong")
}
ipp := netaddr.IPPortFrom(netaddr.IPFrom4(*a), uint16(port))
return int(n), ipp, nil
}
func (u *UDPConn) Close() error {
fmt.Println("CLOSE URING", u)
u.close.Do(func() {
// Send a nop to unblock any outstanding readers.
// Hope that we manage to close before any new readers appear.
// Not sure exactly how this is supposed to work reliably...
// I must be missing something.
//
// C.submit_nop_request(u.ptr)
//
// Update: this causes crashes, because of entirely predictable and predicted races.
// The mystery about how to safely unblock all outstanding io_uring_wait_cqe calls remains...
fmt.Println("io_uring_queue_exit", u.ptr)
C.io_uring_queue_exit(u.ptr)
fmt.Println("DONE io_uring_queue_exit", u.ptr)
u.ptr = nil
u.conn.Close()
u.conn = nil
u.file.Close()
u.file = nil
u.fd = 0
})
return nil
}
// Implement net.PacketConn, for convenience integrating with magicsock.
var _ net.PacketConn = (*UDPConn)(nil)
type udpAddr struct {
ipp netaddr.IPPort
}
func (u udpAddr) Network() string { return "udp4" } // TODO: ipv6
func (u udpAddr) String() string { return u.ipp.String() }
func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
n, ipp, err := c.ReadFromNetaddr(p)
if err != nil {
return 0, nil, err
}
return n, udpAddr{ipp: ipp}, err
}
func (c *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
return c.conn.WriteTo(p, addr)
}
// LocalAddr returns the local network address.
func (c *UDPConn) LocalAddr() net.Addr { return c.local }
func (c *UDPConn) SetDeadline(t time.Time) error {
panic("not implemented") // TODO: Implement
}
func (c *UDPConn) SetReadDeadline(t time.Time) error {
panic("not implemented") // TODO: Implement
}
func (c *UDPConn) SetWriteDeadline(t time.Time) error {
panic("not implemented") // TODO: Implement
}

29
udp.go Normal file
View File

@@ -0,0 +1,29 @@
package main
import (
"fmt"
"net"
"tailscale.com/net/uring"
)
func main() {
listen, err := net.ListenUDP("udp4", &net.UDPAddr{Port: 9999})
check(err)
fmt.Println("listening UDP on", listen.LocalAddr())
conn, err := uring.NewUDPConn(listen)
check(err)
for {
b := make([]byte, 2000)
n, ipp, err := conn.ReadFromNetaddr(b)
check(err)
fmt.Printf("received %q from %v\n", b[:n], ipp)
}
}
func check(err error) {
if err != nil {
panic(err)
}
}

View File

@@ -44,6 +44,7 @@
"tailscale.com/net/netns"
"tailscale.com/net/portmapper"
"tailscale.com/net/stun"
"tailscale.com/net/uring"
"tailscale.com/syncs"
"tailscale.com/tailcfg"
"tailscale.com/tstime"
@@ -2695,7 +2696,14 @@ func (c *Conn) bindSocket(rucPtr **RebindingUDPConn, network string, curPortFate
continue
}
// Success.
ruc.pconn = pconn
uring, err := uring.NewUDPConn(pconn.(*net.UDPConn))
if err != nil {
c.logf("uring not available: %v", err)
ruc.pconn = pconn
} else {
c.logf("using uring")
ruc.pconn = uring
}
if network == "udp4" {
health.SetUDP4Unbound(false)
}
@@ -2856,12 +2864,16 @@ func (c *RebindingUDPConn) ReadFromNetaddr(b []byte) (n int, ipp netaddr.IPPort,
// as long as pAddr itself doesn't escape.
// The non-*net.UDPConn case works, but it allocates.
var pAddr *net.UDPAddr
if udpConn, ok := pconn.(*net.UDPConn); ok {
n, pAddr, err = udpConn.ReadFromUDP(b)
} else {
switch pconn := pconn.(type) {
case *uring.UDPConn:
n, ipp, err = pconn.ReadFromNetaddr(b)
case *net.UDPConn:
n, pAddr, err = pconn.ReadFromUDP(b)
default:
var addr net.Addr
n, addr, err = pconn.ReadFrom(b)
if addr != nil {
var ok bool
pAddr, ok = addr.(*net.UDPAddr)
if !ok {
return 0, netaddr.IPPort{}, fmt.Errorf("RebindingUDPConn.ReadFromNetaddr: underlying connection returned address of type %T, want *netaddr.UDPAddr", addr)
@@ -2873,7 +2885,7 @@ func (c *RebindingUDPConn) ReadFromNetaddr(b []byte) (n int, ipp netaddr.IPPort,
if pconn != c.currentConn() {
continue
}
} else {
} else if pAddr != nil {
// Convert pAddr to a netaddr.IPPort.
// This prevents pAddr from escaping.
var ok bool