diff --git a/net/uring/io_uring.c b/net/uring/io_uring.c
new file mode 100644
index 000000000..164daa0cd
--- /dev/null
+++ b/net/uring/io_uring.c
@@ -0,0 +1,107 @@
+#include <arpa/inet.h> // debugging
+#include <errno.h>
+#include <liburing.h>
+#include <netinet/in.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/socket.h>
+#include <sys/uio.h>
+// NOTE(review): the header names above were missing from the extracted patch and
+// are reconstructed from the symbols this file uses; confirm against the original.
+
+// TODO: use fixed buffers? https://unixism.net/loti/tutorial/fixed_buffers.html
+
+typedef struct io_uring go_uring;
+
+// receive_into blocks until a completion is available on ring. For a successful
+// recvmsg it stores the sender's IPv4 address (4 bytes) in ip and the sender's
+// port, in host byte order, in *port, and returns the number of bytes received.
+// On failure it returns a negative errno value, or -1 when the completion
+// carries no msghdr (e.g. a nop). sock is currently unused.
+static int receive_into(int sock, struct io_uring *ring, char *ip, uint16_t *port) {
+    struct io_uring_cqe *cqe;
+    int ret;
+
+    // Retry when the wait is interrupted by a signal.
+    do {
+        ret = io_uring_wait_cqe(ring, &cqe);
+    } while (ret == -EINTR);
+    if (ret < 0) {
+        return ret; // no completion was obtained, so there is nothing to release
+    }
+
+    // Copy what is needed out of the CQE, then mark it seen on every path;
+    // otherwise a failed completion would stay at the head of the queue and
+    // be returned again by the next wait.
+    int n = cqe->res;
+    struct msghdr *mhdr = io_uring_cqe_get_data(cqe);
+    io_uring_cqe_seen(ring, cqe);
+    if (mhdr == NULL) {
+        return n < 0 ? n : -1;
+    }
+    if (n >= 0) {
+        struct sockaddr_in sa;
+        size_t len = mhdr->msg_namelen < sizeof(sa) ? mhdr->msg_namelen : sizeof(sa);
+        memset(&sa, 0, sizeof(sa));
+        memcpy(&sa, mhdr->msg_name, len); // never copy more than sa can hold
+        memcpy(ip, &sa.sin_addr, 4);
+        *port = ntohs(sa.sin_port);
+    }
+    // The request owns these allocations; free them on success and on failure.
+    free(mhdr->msg_iov);
+    free(mhdr->msg_name);
+    free(mhdr);
+    return n;
+}
+
+// submit a recvmsg request via liburing
+// TODO: When recvfrom support arrives, maybe use that instead?
+// The request receives into buf (buflen bytes) on sock. Returns 0 on success or a
+// negative errno value on failure. The msghdr, iovec and sender address allocated
+// here belong to the request; receive_into frees them after the request completes.
+static int submit_recvmsg_request(int sock, struct io_uring *ring, char *buf, int buflen) {
+    struct msghdr *mhdr = calloc(1, sizeof(*mhdr));
+    struct iovec *iov = calloc(1, sizeof(*iov));
+    struct sockaddr_in *sender = calloc(1, sizeof(*sender));
+    int ret = -ENOMEM;
+    if (!mhdr || !iov || !sender) {
+        goto fail;
+    }
+    iov->iov_base = buf;
+    iov->iov_len = buflen;
+    mhdr->msg_iov = iov;
+    mhdr->msg_iovlen = 1;
+    mhdr->msg_name = sender;
+    mhdr->msg_namelen = sizeof(*sender);
+    // io_uring_get_sqe returns NULL when the submission queue is full.
+    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+    if (!sqe) {
+        ret = -EBUSY;
+        goto fail;
+    }
+    io_uring_prep_recvmsg(sqe, sock, mhdr, 0);
+    io_uring_sqe_set_data(sqe, mhdr);
+    // If submission fails the SQE stays queued and still points at mhdr, so the
+    // allocations must not be freed here.
+    ret = io_uring_submit(ring);
+    return ret < 0 ? ret : 0;
+
+fail:
+    free(sender);
+    free(iov);
+    free(mhdr);
+    return ret;
+}
+
+// Queues and submits a no-op whose user data is NULL.
+static void submit_nop_request(struct io_uring *ring) {
+    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
+    if (!sqe) {
+        return; // submission queue full
+    }
+    io_uring_prep_nop(sqe);
+    io_uring_sqe_set_data(sqe, NULL);
+    io_uring_submit(ring);
+}
diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go
new file mode 100644
index 000000000..29c401d02
--- /dev/null
+++ b/net/uring/io_uring_linux.go
@@ -0,0 +1,149 @@
+package uring
+
+// #cgo LDFLAGS: -luring
+// #include "io_uring.c"
+import "C"
+
+import (
+	"errors"
+	"fmt"
+	"net"
+	"os"
+	"sync"
+	"time"
+	"unsafe"
+
+	"inet.af/netaddr"
+)
+
+// A UDPConn is a recv-only UDP fd manager.
+// TODO: Support writes.
+// TODO: support multiplexing multiple fds?
+// May be more expensive than having multiple urings, and certainly more complicated.
+// TODO: API review for performance.
+// We'd like to enqueue a bunch of recv calls and dequeue them later,
+// but we have a problem with buffer management: We get our buffers just-in-time
+// from wireguard-go, which means we have to make copies.
+// That's OK for now, but later it could be a performance issue.
+// For now, keep it simple and enqueue/dequeue in a single step.
+// TODO: IPv6
+type UDPConn struct {
+	ptr   *C.go_uring  // ring used for receives; nil after Close
+	close sync.Once    // ensures Close tears down only once
+	conn  *net.UDPConn // underlying socket; WriteTo goes through it
+	file  *os.File     // must keep file from being GC'd
+	fd    C.int        // descriptor of file; 0 after Close
+	local net.Addr     // conn's local address, captured at creation
+}
+
+// NewUDPConn returns a UDPConn that receives from conn through a new io_uring.
+// Only udp4 sockets are accepted. If an error is returned, conn is left open.
+func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) {
+	local := conn.LocalAddr().String() // this is dumb: the address goes through a string just to test for IPv4
+	ip, err := netaddr.ParseIPPort(local)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse UDPConn local addr %s as IP: %w", local, err)
+	}
+	if !ip.IP().Is4() {
+		return nil, fmt.Errorf("uring only supports udp4 (for now), got local addr %s", local)
+	}
+	// TODO: probe for system capabilities: https://unixism.net/loti/tutorial/probe_liburing.html
+	file, err := conn.File()
+	if err != nil {
+		return nil, err
+	}
+	const queueDepth = 8 // TODO: What value to use here?
+	r := new(C.go_uring)
+	if ret := C.io_uring_queue_init(queueDepth, r, 0); ret < 0 {
+		file.Close() // don't leak the duplicated descriptor
+		return nil, fmt.Errorf("io_uring_queue_init failed: errno %d", -ret)
+	}
+	return &UDPConn{
+		ptr:   r,
+		conn:  conn,
+		file:  file,
+		fd:    C.int(file.Fd()),
+		local: conn.LocalAddr(),
+	}, nil
+}
+
+// ReadFromNetaddr submits one recvmsg into buf, blocks until a completion
+// arrives, and returns the number of bytes received and the sender's address.
+func (c *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
+	if c.fd == 0 {
+		return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn")
+	}
+	var p *C.char // stays nil for an empty buf, where &buf[0] would panic
+	if len(buf) > 0 {
+		p = (*C.char)(unsafe.Pointer(&buf[0]))
+	}
+	// TODO: eventually separate submitting the request and waiting for the response.
+	// NOTE(review): the ring still references buf after this call returns; cgo's pointer rules forbid that.
+	if ret := C.submit_recvmsg_request(c.fd, c.ptr, p, C.int(len(buf))); ret != 0 {
+		return 0, netaddr.IPPort{}, fmt.Errorf("uring.UDPConn recv failed: %v", ret) // TODO: Improve errno
+	}
+	var a [4]byte
+	var port C.uint16_t
+	n := C.receive_into(c.fd, c.ptr, (*C.char)(unsafe.Pointer(&a)), &port)
+	if n < 0 {
+		return 0, netaddr.IPPort{}, fmt.Errorf("uring.UDPConn receive failed: %v", n)
+	}
+	return int(n), netaddr.IPPortFrom(netaddr.IPFrom4(a), uint16(port)), nil
+}
+
+// Close tears down the ring, then closes the underlying conn and file.
+// Only the first call does any work and reports an error; later calls return nil.
+func (c *UDPConn) Close() error {
+	var err error
+	c.close.Do(func() {
+		// TODO: blocked readers are not woken here. Submitting a nop to unblock them
+		// (C.submit_nop_request) caused crashes from entirely predictable races; how to
+		// safely unblock all outstanding io_uring_wait_cqe calls remains a mystery.
+		C.io_uring_queue_exit(c.ptr)
+		c.ptr = nil
+		err = c.conn.Close()
+		c.conn = nil
+		if ferr := c.file.Close(); err == nil {
+			err = ferr
+		}
+		c.file = nil
+		c.fd = 0
+	})
+	return err
+}
+
+// Implement net.PacketConn, for convenience integrating with magicsock.
+var _ net.PacketConn = (*UDPConn)(nil)
+
+// udpAddr adapts a netaddr.IPPort to the net.Addr interface.
+type udpAddr struct{ ipp netaddr.IPPort }
+
+func (u udpAddr) Network() string { return "udp4" } // TODO: ipv6
+func (u udpAddr) String() string  { return u.ipp.String() }
+
+// ReadFrom implements net.PacketConn on top of ReadFromNetaddr.
+func (c *UDPConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
+	n, ipp, err := c.ReadFromNetaddr(p)
+	if err != nil {
+		return 0, nil, err
+	}
+	return n, udpAddr{ipp: ipp}, nil
+}
+
+// WriteTo writes p to addr using the underlying *net.UDPConn, not the ring.
+func (c *UDPConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { return c.conn.WriteTo(p, addr) }
+
+// LocalAddr returns the local network address.
+func (c *UDPConn) LocalAddr() net.Addr { return c.local }
+
+// SetDeadline is required by net.PacketConn but is not implemented yet;
+// it panics when called.
+func (c *UDPConn) SetDeadline(t time.Time) error { panic("not implemented") } // TODO: Implement
+
+// SetReadDeadline is required by net.PacketConn but is not implemented yet;
+// it panics when called.
+func (c *UDPConn) SetReadDeadline(t time.Time) error { panic("not implemented") } // TODO: Implement
+
+// SetWriteDeadline is required by net.PacketConn but is not implemented yet;
+// it panics when called.
+func (c *UDPConn) SetWriteDeadline(t time.Time) error { panic("not implemented") } // TODO: Implement
diff --git a/udp.go b/udp.go
new file mode 100644
index 000000000..315f0d616
--- /dev/null
+++ b/udp.go
@@ -0,0 +1,33 @@
+// This program listens on UDP port 9999 through a uring.UDPConn and prints
+// every datagram it receives together with the sender's address.
+package main
+
+import (
+	"fmt"
+	"net"
+
+	"tailscale.com/net/uring"
+)
+
+func main() {
+	listen, err := net.ListenUDP("udp4", &net.UDPAddr{Port: 9999})
+	check(err)
+	fmt.Println("listening UDP on", listen.LocalAddr())
+
+	conn, err := uring.NewUDPConn(listen)
+	check(err)
+	// One buffer is enough: each datagram is printed before the next read starts.
+	b := make([]byte, 2000)
+	for {
+		n, ipp, err := conn.ReadFromNetaddr(b)
+		check(err)
+		fmt.Printf("received %q from %v\n", b[:n], ipp)
+	}
+}
+
+// check panics if err is non-nil.
+func check(err error) {
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go
index 2117f725c..e5bc28b44 100644
--- a/wgengine/magicsock/magicsock.go
+++ b/wgengine/magicsock/magicsock.go
@@ -44,6 +44,7 @@
 	"tailscale.com/net/netns"
 	"tailscale.com/net/portmapper"
 	"tailscale.com/net/stun"
+	"tailscale.com/net/uring"
 	"tailscale.com/syncs"
 	"tailscale.com/tailcfg"
 	"tailscale.com/tstime"
@@ -2695,7 +2696,17 @@ func (c *Conn) bindSocket(rucPtr **RebindingUDPConn, network string, curPortFate
 			continue
 		}
 		// Success.
 		ruc.pconn = pconn
+		// Prefer io_uring for receives, but only when pconn really is a
+		// *net.UDPConn and the ring can be set up; otherwise keep pconn as is.
+		if udpConn, ok := pconn.(*net.UDPConn); ok {
+			if uc, err := uring.NewUDPConn(udpConn); err != nil {
+				c.logf("uring not available: %v", err)
+			} else {
+				c.logf("using uring")
+				ruc.pconn = uc
+			}
+		}
 		if network == "udp4" {
 			health.SetUDP4Unbound(false)
 		}
@@ -2856,12 +2867,17 @@ func (c *RebindingUDPConn) ReadFromNetaddr(b []byte) (n int, ipp netaddr.IPPort,
 		// as long as pAddr itself doesn't escape.
 		// The non-*net.UDPConn case works, but it allocates.
 		var pAddr *net.UDPAddr
-		if udpConn, ok := pconn.(*net.UDPConn); ok {
-			n, pAddr, err = udpConn.ReadFromUDP(b)
-		} else {
+		switch pconn := pconn.(type) {
+		case *uring.UDPConn:
+			// Already yields a netaddr.IPPort; pAddr stays nil and the conversion below is skipped.
+			n, ipp, err = pconn.ReadFromNetaddr(b)
+		case *net.UDPConn:
+			n, pAddr, err = pconn.ReadFromUDP(b)
+		default:
 			var addr net.Addr
 			n, addr, err = pconn.ReadFrom(b)
 			if addr != nil {
+				var ok bool
 				pAddr, ok = addr.(*net.UDPAddr)
 				if !ok {
 					return 0, netaddr.IPPort{}, fmt.Errorf("RebindingUDPConn.ReadFromNetaddr: underlying connection returned address of type %T, want *netaddr.UDPAddr", addr)
@@ -2873,7 +2889,7 @@ func (c *RebindingUDPConn) ReadFromNetaddr(b []byte) (n int, ipp netaddr.IPPort,
 			if pconn != c.currentConn() {
 				continue
 			}
-		} else {
+		} else if pAddr != nil {
 			// Convert pAddr to a netaddr.IPPort.
 			// This prevents pAddr from escaping.
 			var ok bool