From 7fd5e310705159dbf14757cba7190ee5dd317a7e Mon Sep 17 00:00:00 2001 From: Josh Bleecher Snyder Date: Wed, 2 Jun 2021 15:30:02 -0700 Subject: [PATCH] tun writes...not working yet (but why not??) --- net/tstun/wrap.go | 44 ++++++++++++- net/uring/io_uring.c | 96 ++++++++++++++++++++++----- net/uring/io_uring_linux.go | 127 +++++++++++++++++++++++++++++++++--- 3 files changed, 240 insertions(+), 27 deletions(-) diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go index 5bebf16ca..1da686a19 100644 --- a/net/tstun/wrap.go +++ b/net/tstun/wrap.go @@ -12,12 +12,16 @@ "os" "sync" "sync/atomic" + "syscall" "time" + "golang.org/x/net/ipv6" "golang.zx2c4.com/wireguard/device" "golang.zx2c4.com/wireguard/tun" + wgtun "golang.zx2c4.com/wireguard/tun" // import twice to work around shadowing (TODO: fix properly) "inet.af/netaddr" "tailscale.com/net/packet" + "tailscale.com/net/uring" "tailscale.com/types/ipproto" "tailscale.com/types/logger" "tailscale.com/wgengine/filter" @@ -62,6 +66,9 @@ type Wrapper struct { // tdev is the underlying Wrapper device. tdev tun.Device + // uring performs writes to the underlying Wrapper device. + ring *uring.File + closeOnce sync.Once lastActivityAtomic int64 // unix seconds of last send or receive @@ -165,6 +172,13 @@ func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper { // The buffer starts out consumed. tun.bufferConsumed <- struct{}{} + f := tdev.(*wgtun.NativeTun).File() + ring, err := uring.NewFile(f) + if err != nil { + panic(err) // TODO: just log? wat? + } else { + tun.ring = ring + } return tun } @@ -519,7 +533,35 @@ func (t *Wrapper) Write(buf []byte, offset int) (int, error) { } t.noteActivity() - return t.tdev.Write(buf, offset) + return t.write(buf, offset) +} + +func (t *Wrapper) write(buf []byte, offset int) (int, error) { + if t.ring == nil { + return t.tdev.Write(buf, offset) + } + + // below copied from wireguard-go NativeTUN.Write + + // reserve space for header + buf = buf[offset-4:] + + // add packet information header + buf[0] = 0x00 + buf[1] = 0x00 + if buf[4]>>4 == ipv6.Version { + buf[2] = 0x86 + buf[3] = 0xdd + } else { + buf[2] = 0x08 + buf[3] = 0x00 + } + + n, err := t.ring.Write(buf) + if errors.Is(err, syscall.EBADFD) { + err = os.ErrClosed + } + return n, err } func (t *Wrapper) GetFilter() *filter.Filter { diff --git a/net/uring/io_uring.c b/net/uring/io_uring.c index 13eec5094..3817cdf8e 100644 --- a/net/uring/io_uring.c +++ b/net/uring/io_uring.c @@ -32,8 +32,15 @@ static int initialize(struct io_uring *ring, int fd) { return ret; } +// packNIdx packs a returned n (usually number of bytes) and a index into a request array into a 63-bit uint64. +static uint64_t packNIdx(int n, size_t idx) { + uint64_t idx64 = idx & 0xFFFFFFFF; // truncate to 32 bits, just to be careful (should never be larger than 8) + uint64_t n64 = n & 0x7FFFFFFF; // truncate to 31 bits, just to be careful (should never be larger than 65544, max UDP write + IP header) + return (n64 << 32) | idx64; +} + // Wait for a completion to be available, fetch the data -static uint64_t receive_into(struct io_uring *ring) { +static uint64_t receive_into_udp(struct io_uring *ring) { struct io_uring_cqe *cqe; again:; @@ -47,22 +54,16 @@ again:; perror("io_uring_wait_cqe"); return ret; } - if (cqe->res < 0) { - fprintf(stderr, "recvmsg failed: %d.\n", cqe->res); - return cqe->res; + int n = cqe->res; + if (n < 0) { + // TODO: this leaks a buffer!!!! + fprintf(stderr, "recvmsg failed: %d.\n", n); + return n; } - size_t idxptr = (size_t)(io_uring_cqe_get_data(cqe)); - uint32_t idxptr32 = (uint32_t)(idxptr); - uint64_t idxptr64 = (uint64_t)(idxptr32); - uint64_t n = cqe->res; - uint32_t n32 = (uint32_t)n; - uint64_t n64 = (uint64_t)n; + size_t idx = (size_t)io_uring_cqe_get_data(cqe); + uint64_t nidx = packNIdx(n, idx); io_uring_cqe_seen(ring, cqe); - if (idxptr < 0) { - fprintf(stderr, "received nop\n"); - return -1; - } - return (n64 << 32) | idxptr64; + return nidx; } static uint32_t ip(struct sockaddr_in *sa) { @@ -90,7 +91,6 @@ static int submit_recvmsg_request(struct io_uring *ring, struct msghdr *mhdr, st io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); io_uring_sqe_set_data(sqe, (void *)(idx)); io_uring_submit(ring); - return 0; } @@ -100,3 +100,67 @@ static void submit_nop_request(struct io_uring *ring) { io_uring_sqe_set_data(sqe, (void *)(-1)); io_uring_submit(ring); } + +// Wait for a completion to be available, fetch the data +// TODO: unify with receive_into_udp +static uint64_t get_file_completion(struct io_uring *ring) { + struct io_uring_cqe *cqe; +again:; + + int ret = io_uring_wait_cqe(ring, &cqe); + if (ret == -EINTR) { + goto again; + } + // TODO: Delete perror, fprintf, etc. + // Encode in return value or similar. + if (ret < 0) { + perror("get_file_completion io_uring_wait_cqe"); + return ret; + } + int n = cqe->res; + if (n < 0) { + // TODO: This leaks a buffer!!! + fprintf(stderr, "get_file_completion write failed: %d.\n", n); + return n; + } + size_t idx = (size_t)io_uring_cqe_get_data(cqe); + uint64_t nidx = packNIdx(n, idx); + io_uring_cqe_seen(ring, cqe); + return nidx; +} + +// submit a write request via liburing +static int submit_write_request(struct io_uring *ring, char *buf, int buflen, size_t idx) { + fprintf(stderr, "submit_write_request buf %p buflen %d idx %lu\n", buf, buflen, idx); + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); + io_uring_prep_write(sqe, 0, buf, buflen, 0); // use the 0th file in the list of registered fds + io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); + io_uring_sqe_set_data(sqe, (void *)(idx)); + io_uring_submit(ring); + return 0; +} + +// TODO: unify with get_file_completion +static uint64_t peek_file_completion(struct io_uring *ring) { + struct io_uring_cqe *cqe; + int ret = io_uring_peek_cqe(ring, &cqe); + if ((-ret == EAGAIN) || (-ret == EINTR)) { + return ret; + } + // TODO: Delete perror, fprintf, etc. + // Encode in return value or similar. + if (ret < 0) { + perror("peek_file_completion io_uring_wait_cqe"); + return ret; + } + int n = cqe->res; + if (n < 0) { + // TODO: This leaks a buffer!!! + fprintf(stderr, "peek_file_completion write failed: %d.\n", n); + return n; + } + size_t idx = (size_t)io_uring_cqe_get_data(cqe); + uint64_t nidx = packNIdx(n, idx); + io_uring_cqe_seen(ring, cqe); + return nidx; +} diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index 003b02164..8dc9095f4 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -11,6 +11,7 @@ "net" "os" "sync" + "syscall" "time" "unsafe" @@ -36,7 +37,7 @@ type UDPConn struct { file *os.File // must keep file from being GC'd fd C.int local net.Addr - reqs [8]req + reqs [8]udpReq } func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) { @@ -77,7 +78,7 @@ func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) { return u, nil } -type req struct { +type udpReq struct { mhdr C.go_msghdr iov C.go_iovec sa C.go_sockaddr_in @@ -98,13 +99,12 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { if u.fd == 0 { return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn") } - nidx := C.receive_into(u.ptr) - if int64(nidx) < 0 { - return 0, netaddr.IPPort{}, fmt.Errorf("something wrong, errno %v", int64(nidx)) + nidx := C.receive_into_udp(u.ptr) + n, idx, err := unpackNIdx(nidx) + if err != nil { + return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr: %v", err) } - idx := uint32(nidx) - n := uint32(nidx >> 32) - r := &u.reqs[int(idx)] + r := &u.reqs[idx] ip := C.ip(&r.sa) var ip4 [4]byte binary.BigEndian.PutUint32(ip4[:], uint32(ip)) @@ -112,11 +112,11 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) { ipp := netaddr.IPPortFrom(netaddr.IPFrom4(ip4), uint16(port)) copy(buf, r.buf[:n]) // Queue up a new request. - err := u.submitRequest(int(idx)) + err = u.submitRequest(int(idx)) if err != nil { panic("how should we handle this?") } - return int(n), ipp, nil + return n, ipp, nil } func (u *UDPConn) Close() error { @@ -181,3 +181,110 @@ func (c *UDPConn) SetReadDeadline(t time.Time) error { func (c *UDPConn) SetWriteDeadline(t time.Time) error { panic("not implemented") // TODO: Implement } + +// Files! + +// A File is a write-only file fd manager. +// TODO: Support reads +// TODO: all the todos from UDPConn +type File struct { + ptr *C.go_uring + close sync.Once + file *os.File // must keep file from being GC'd + fd C.int + reqs [8]udpReq + reqC chan int // indices into reqs +} + +func NewFile(file *os.File) (*File, error) { + r := new(C.go_uring) + d, err := syscall.Dup(int(file.Fd())) + if err != nil { + return nil, err + } + err = syscall.SetNonblock(d, false) + if err != nil { + return nil, err + } + fd := C.int(d) + // fmt.Println("INIT NEW FILE WITH FD", int(file.Fd()), "DUP'd to", d) + ret := C.initialize(r, fd) + if ret < 0 { + return nil, fmt.Errorf("uring initialization failed: %d", ret) + } + u := &File{ + ptr: r, + file: file, + fd: fd, + } + u.reqC = make(chan int, len(u.reqs)) + for i := range u.reqs { + u.reqC <- i + } + return u, nil +} + +func unpackNIdx(nidx C.uint64_t) (n, idx int, err error) { + if int64(nidx) < 0 { + return 0, 0, fmt.Errorf("error %d", int64(nidx)) + } + return int(uint32(nidx >> 32)), int(uint32(nidx)), nil +} + +type fileReq struct { + buf [device.MaxSegmentSize]byte +} + +func (u *File) Write(buf []byte) (int, error) { + fmt.Println("WRITE ", len(buf), "BYTES") + if u.fd == 0 { + return 0, errors.New("invalid uring.FileConn") + } + // If we need a buffer, get a buffer, potentially blocking. + var idx int + select { + case idx = <-u.reqC: + fmt.Println("REQ AVAIL") + default: + fmt.Println("NO REQ AVAIL??? wait for one...") + // No request available. Get one from the kernel. + nidx := C.get_file_completion(u.ptr) + var err error + _, idx, err = unpackNIdx(nidx) + if err != nil { + return 0, fmt.Errorf("some write failed, maybe long ago: %v", err) + } + } + r := &u.reqs[idx] + // Do the write. + copy(r.buf[:], buf) + fmt.Println("SUBMIT WRITE REQUEST") + C.submit_write_request(u.ptr, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(buf)), C.size_t(idx)) + // Get an extra buffer, if available. + nidx := C.peek_file_completion(u.ptr) + if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR { + // Nothing waiting for us. + fmt.Println("PEEK: ignore EAGAIN/EINTR") + } else { + n, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?) + fmt.Println("PEEK RESULT:", n, idx, err) + if err == nil { + // Put the request buffer back in the usable queue. + // Should never block, by construction. + u.reqC <- idx + } + } + return len(buf), nil +} + +// TODO: the TODOs from UDPConn.Close +func (u *File) Close() error { + u.close.Do(func() { + C.io_uring_queue_exit(u.ptr) + u.ptr = nil + u.file.Close() + u.file = nil + u.fd = 0 + }) + return nil +}