tun writes...not working yet (but why not??)

This commit is contained in:
Josh Bleecher Snyder
2021-06-02 15:30:02 -07:00
parent a03ee93e21
commit 7fd5e31070
3 changed files with 240 additions and 27 deletions

View File

@@ -12,12 +12,16 @@
"os"
"sync"
"sync/atomic"
"syscall"
"time"
"golang.org/x/net/ipv6"
"golang.zx2c4.com/wireguard/device"
"golang.zx2c4.com/wireguard/tun"
wgtun "golang.zx2c4.com/wireguard/tun" // import twice to work around shadowing (TODO: fix properly)
"inet.af/netaddr"
"tailscale.com/net/packet"
"tailscale.com/net/uring"
"tailscale.com/types/ipproto"
"tailscale.com/types/logger"
"tailscale.com/wgengine/filter"
@@ -62,6 +66,9 @@ type Wrapper struct {
// tdev is the underlying Wrapper device.
tdev tun.Device
// uring performs writes to the underlying Wrapper device.
ring *uring.File
closeOnce sync.Once
lastActivityAtomic int64 // unix seconds of last send or receive
@@ -165,6 +172,13 @@ func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper {
// The buffer starts out consumed.
tun.bufferConsumed <- struct{}{}
f := tdev.(*wgtun.NativeTun).File()
ring, err := uring.NewFile(f)
if err != nil {
panic(err) // TODO: just log? wat?
} else {
tun.ring = ring
}
return tun
}
@@ -519,7 +533,35 @@ func (t *Wrapper) Write(buf []byte, offset int) (int, error) {
}
t.noteActivity()
return t.tdev.Write(buf, offset)
return t.write(buf, offset)
}
func (t *Wrapper) write(buf []byte, offset int) (int, error) {
if t.ring == nil {
return t.tdev.Write(buf, offset)
}
// below copied from wireguard-go NativeTUN.Write
// reserve space for header
buf = buf[offset-4:]
// add packet information header
buf[0] = 0x00
buf[1] = 0x00
if buf[4]>>4 == ipv6.Version {
buf[2] = 0x86
buf[3] = 0xdd
} else {
buf[2] = 0x08
buf[3] = 0x00
}
n, err := t.ring.Write(buf)
if errors.Is(err, syscall.EBADFD) {
err = os.ErrClosed
}
return n, err
}
func (t *Wrapper) GetFilter() *filter.Filter {

View File

@@ -32,8 +32,15 @@ static int initialize(struct io_uring *ring, int fd) {
return ret;
}
// packNIdx packs a returned n (usually number of bytes) and a index into a request array into a 63-bit uint64.
static uint64_t packNIdx(int n, size_t idx) {
uint64_t idx64 = idx & 0xFFFFFFFF; // truncate to 32 bits, just to be careful (should never be larger than 8)
uint64_t n64 = n & 0x7FFFFFFF; // truncate to 31 bits, just to be careful (should never be larger than 65544, max UDP write + IP header)
return (n64 << 32) | idx64;
}
// Wait for a completion to be available, fetch the data
static uint64_t receive_into(struct io_uring *ring) {
static uint64_t receive_into_udp(struct io_uring *ring) {
struct io_uring_cqe *cqe;
again:;
@@ -47,22 +54,16 @@ again:;
perror("io_uring_wait_cqe");
return ret;
}
if (cqe->res < 0) {
fprintf(stderr, "recvmsg failed: %d.\n", cqe->res);
return cqe->res;
int n = cqe->res;
if (n < 0) {
// TODO: this leaks a buffer!!!!
fprintf(stderr, "recvmsg failed: %d.\n", n);
return n;
}
size_t idxptr = (size_t)(io_uring_cqe_get_data(cqe));
uint32_t idxptr32 = (uint32_t)(idxptr);
uint64_t idxptr64 = (uint64_t)(idxptr32);
uint64_t n = cqe->res;
uint32_t n32 = (uint32_t)n;
uint64_t n64 = (uint64_t)n;
size_t idx = (size_t)io_uring_cqe_get_data(cqe);
uint64_t nidx = packNIdx(n, idx);
io_uring_cqe_seen(ring, cqe);
if (idxptr < 0) {
fprintf(stderr, "received nop\n");
return -1;
}
return (n64 << 32) | idxptr64;
return nidx;
}
static uint32_t ip(struct sockaddr_in *sa) {
@@ -90,7 +91,6 @@ static int submit_recvmsg_request(struct io_uring *ring, struct msghdr *mhdr, st
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
io_uring_sqe_set_data(sqe, (void *)(idx));
io_uring_submit(ring);
return 0;
}
@@ -100,3 +100,67 @@ static void submit_nop_request(struct io_uring *ring) {
io_uring_sqe_set_data(sqe, (void *)(-1));
io_uring_submit(ring);
}
// Wait for a completion to be available, fetch the data
// TODO: unify with receive_into_udp
static uint64_t get_file_completion(struct io_uring *ring) {
struct io_uring_cqe *cqe;
again:;
int ret = io_uring_wait_cqe(ring, &cqe);
if (ret == -EINTR) {
goto again;
}
// TODO: Delete perror, fprintf, etc.
// Encode in return value or similar.
if (ret < 0) {
perror("get_file_completion io_uring_wait_cqe");
return ret;
}
int n = cqe->res;
if (n < 0) {
// TODO: This leaks a buffer!!!
fprintf(stderr, "get_file_completion write failed: %d.\n", n);
return n;
}
size_t idx = (size_t)io_uring_cqe_get_data(cqe);
uint64_t nidx = packNIdx(n, idx);
io_uring_cqe_seen(ring, cqe);
return nidx;
}
// submit a write request via liburing
static int submit_write_request(struct io_uring *ring, char *buf, int buflen, size_t idx) {
fprintf(stderr, "submit_write_request buf %p buflen %d idx %lu\n", buf, buflen, idx);
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_write(sqe, 0, buf, buflen, 0); // use the 0th file in the list of registered fds
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
io_uring_sqe_set_data(sqe, (void *)(idx));
io_uring_submit(ring);
return 0;
}
// TODO: unify with get_file_completion
static uint64_t peek_file_completion(struct io_uring *ring) {
struct io_uring_cqe *cqe;
int ret = io_uring_peek_cqe(ring, &cqe);
if ((-ret == EAGAIN) || (-ret == EINTR)) {
return ret;
}
// TODO: Delete perror, fprintf, etc.
// Encode in return value or similar.
if (ret < 0) {
perror("peek_file_completion io_uring_wait_cqe");
return ret;
}
int n = cqe->res;
if (n < 0) {
// TODO: This leaks a buffer!!!
fprintf(stderr, "peek_file_completion write failed: %d.\n", n);
return n;
}
size_t idx = (size_t)io_uring_cqe_get_data(cqe);
uint64_t nidx = packNIdx(n, idx);
io_uring_cqe_seen(ring, cqe);
return nidx;
}

View File

@@ -11,6 +11,7 @@
"net"
"os"
"sync"
"syscall"
"time"
"unsafe"
@@ -36,7 +37,7 @@ type UDPConn struct {
file *os.File // must keep file from being GC'd
fd C.int
local net.Addr
reqs [8]req
reqs [8]udpReq
}
func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) {
@@ -77,7 +78,7 @@ func NewUDPConn(conn *net.UDPConn) (*UDPConn, error) {
return u, nil
}
type req struct {
type udpReq struct {
mhdr C.go_msghdr
iov C.go_iovec
sa C.go_sockaddr_in
@@ -98,13 +99,12 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
if u.fd == 0 {
return 0, netaddr.IPPort{}, errors.New("invalid uring.UDPConn")
}
nidx := C.receive_into(u.ptr)
if int64(nidx) < 0 {
return 0, netaddr.IPPort{}, fmt.Errorf("something wrong, errno %v", int64(nidx))
nidx := C.receive_into_udp(u.ptr)
n, idx, err := unpackNIdx(nidx)
if err != nil {
return 0, netaddr.IPPort{}, fmt.Errorf("ReadFromNetaddr: %v", err)
}
idx := uint32(nidx)
n := uint32(nidx >> 32)
r := &u.reqs[int(idx)]
r := &u.reqs[idx]
ip := C.ip(&r.sa)
var ip4 [4]byte
binary.BigEndian.PutUint32(ip4[:], uint32(ip))
@@ -112,11 +112,11 @@ func (u *UDPConn) ReadFromNetaddr(buf []byte) (int, netaddr.IPPort, error) {
ipp := netaddr.IPPortFrom(netaddr.IPFrom4(ip4), uint16(port))
copy(buf, r.buf[:n])
// Queue up a new request.
err := u.submitRequest(int(idx))
err = u.submitRequest(int(idx))
if err != nil {
panic("how should we handle this?")
}
return int(n), ipp, nil
return n, ipp, nil
}
func (u *UDPConn) Close() error {
@@ -181,3 +181,110 @@ func (c *UDPConn) SetReadDeadline(t time.Time) error {
func (c *UDPConn) SetWriteDeadline(t time.Time) error {
panic("not implemented") // TODO: Implement
}
// Files!
// A File is a write-only file fd manager.
// TODO: Support reads
// TODO: all the todos from UDPConn
type File struct {
ptr *C.go_uring
close sync.Once
file *os.File // must keep file from being GC'd
fd C.int
reqs [8]udpReq
reqC chan int // indices into reqs
}
func NewFile(file *os.File) (*File, error) {
r := new(C.go_uring)
d, err := syscall.Dup(int(file.Fd()))
if err != nil {
return nil, err
}
err = syscall.SetNonblock(d, false)
if err != nil {
return nil, err
}
fd := C.int(d)
// fmt.Println("INIT NEW FILE WITH FD", int(file.Fd()), "DUP'd to", d)
ret := C.initialize(r, fd)
if ret < 0 {
return nil, fmt.Errorf("uring initialization failed: %d", ret)
}
u := &File{
ptr: r,
file: file,
fd: fd,
}
u.reqC = make(chan int, len(u.reqs))
for i := range u.reqs {
u.reqC <- i
}
return u, nil
}
func unpackNIdx(nidx C.uint64_t) (n, idx int, err error) {
if int64(nidx) < 0 {
return 0, 0, fmt.Errorf("error %d", int64(nidx))
}
return int(uint32(nidx >> 32)), int(uint32(nidx)), nil
}
type fileReq struct {
buf [device.MaxSegmentSize]byte
}
func (u *File) Write(buf []byte) (int, error) {
fmt.Println("WRITE ", len(buf), "BYTES")
if u.fd == 0 {
return 0, errors.New("invalid uring.FileConn")
}
// If we need a buffer, get a buffer, potentially blocking.
var idx int
select {
case idx = <-u.reqC:
fmt.Println("REQ AVAIL")
default:
fmt.Println("NO REQ AVAIL??? wait for one...")
// No request available. Get one from the kernel.
nidx := C.get_file_completion(u.ptr)
var err error
_, idx, err = unpackNIdx(nidx)
if err != nil {
return 0, fmt.Errorf("some write failed, maybe long ago: %v", err)
}
}
r := &u.reqs[idx]
// Do the write.
copy(r.buf[:], buf)
fmt.Println("SUBMIT WRITE REQUEST")
C.submit_write_request(u.ptr, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(buf)), C.size_t(idx))
// Get an extra buffer, if available.
nidx := C.peek_file_completion(u.ptr)
if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR {
// Nothing waiting for us.
fmt.Println("PEEK: ignore EAGAIN/EINTR")
} else {
n, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?)
fmt.Println("PEEK RESULT:", n, idx, err)
if err == nil {
// Put the request buffer back in the usable queue.
// Should never block, by construction.
u.reqC <- idx
}
}
return len(buf), nil
}
// TODO: the TODOs from UDPConn.Close
func (u *File) Close() error {
u.close.Do(func() {
C.io_uring_queue_exit(u.ptr)
u.ptr = nil
u.file.Close()
u.file = nil
u.fd = 0
})
return nil
}