From 522fa9306ed250c4da15af4f4bfa490984d2b96d Mon Sep 17 00:00:00 2001 From: Josh Bleecher Snyder Date: Wed, 2 Jun 2021 17:07:24 -0700 Subject: [PATCH] they work with extra junk thanks, dave no thanks, kernel devs write no work use writev --- file.go | 41 +++++++++++++++++++++++++++++++++++++ net/uring/io_uring.c | 33 ++++++++++++++++++++++------- net/uring/io_uring_linux.go | 38 ++++++++++++++++++---------------- 3 files changed, 87 insertions(+), 25 deletions(-) create mode 100644 file.go diff --git a/file.go b/file.go new file mode 100644 index 000000000..5804c73c5 --- /dev/null +++ b/file.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + "os" + "syscall" + "time" + + "tailscale.com/net/uring" +) + +func main() { + // f, err := os.Create("junk") + // check(err) + // _, err = f.Write([]byte("CMON\n")) + // check(err) + + fd, err := syscall.Open("trash", syscall.O_RDWR|syscall.O_CREAT|syscall.O_TRUNC, 0644) + check(err) + n, err := syscall.Write(int(fd), []byte("part two\n")) + check(err) + fmt.Println("N", n) + + ff := os.NewFile(uintptr(fd), "trash") + uf, err := uring.NewFile(ff) + check(err) + for i := 0; i < 1; i++ { + s := fmt.Sprintf("i can count to %d\x00\n", i) + n, err := uf.Write([]byte(s), n) + check(err) + fmt.Println("wrote", n, "bytes") + time.Sleep(3 * time.Second) + } + syscall.Close(fd) +} + +func check(err error) { + if err != nil { + panic(err) + } +} diff --git a/net/uring/io_uring.c b/net/uring/io_uring.c index 3817cdf8e..6a0121ae2 100644 --- a/net/uring/io_uring.c +++ b/net/uring/io_uring.c @@ -1,4 +1,5 @@ #include // debugging +#include #include #include #include @@ -25,11 +26,18 @@ static int initialize(struct io_uring *ring, int fd) { memset(&params, 0, sizeof(params)); params.flags |= IORING_SETUP_SQPOLL; params.sq_thread_idle = 1000; // 1s - io_uring_queue_init_params(16, ring, &params); // 16: size of ring int ret; + ret = io_uring_queue_init_params(16, ring, &params); // 16: size of ring + if (ret < 0) { + return ret; + } ret = 
io_uring_register_files(ring, &fd, 1); // TODO: Do we need to unregister files on close, or is Closing the uring enough? - return ret; + perror("io_uring_queue_init"); + if (ret < 0) { + return ret; + } + return 0; } // packNIdx packs a returned n (usually number of bytes) and a index into a request array into a 63-bit uint64. @@ -130,13 +138,22 @@ again:; } // submit a write request via liburing -static int submit_write_request(struct io_uring *ring, char *buf, int buflen, size_t idx) { - fprintf(stderr, "submit_write_request buf %p buflen %d idx %lu\n", buf, buflen, idx); +static int submit_write_request(struct io_uring *ring, int fd, char *buf, int buflen, size_t idx, struct iovec *iov) { + // fprintf(stderr, "submit_write_request to fd %d buf %p %s buflen %d idx %lu\n", fd, buf, buf, buflen, idx); + // errno= 0; + // perror("before bonus write"); + // int x = write(fd, buf, buflen); + // fprintf(stderr, "plain write returned %d\n", x); + // perror("submit_write_request bonus write"); + iov->iov_base = buf; + iov->iov_len = buflen; + struct io_uring_sqe *sqe = io_uring_get_sqe(ring); - io_uring_prep_write(sqe, 0, buf, buflen, 0); // use the 0th file in the list of registered fds + io_uring_prep_writev(sqe, 0, iov, 1, 0); // use the 0th file in the list of registered fds io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE); io_uring_sqe_set_data(sqe, (void *)(idx)); - io_uring_submit(ring); + int submitted = io_uring_submit(ring); + // fprintf(stderr, "submitted %d sqes\n", submitted); return 0; } @@ -144,15 +161,17 @@ static int submit_write_request(struct io_uring *ring, char *buf, int buflen, si static uint64_t peek_file_completion(struct io_uring *ring) { struct io_uring_cqe *cqe; int ret = io_uring_peek_cqe(ring, &cqe); + // perror("on entry, peek_file_completion io_uring_wait_cqe"); if ((-ret == EAGAIN) || (-ret == EINTR)) { return ret; } // TODO: Delete perror, fprintf, etc. // Encode in return value or similar. 
if (ret < 0) { - perror("peek_file_completion io_uring_wait_cqe"); + perror("on failure, peek_file_completion io_uring_wait_cqe"); return ret; } + errno = 0; int n = cqe->res; if (n < 0) { // TODO: This leaks a buffer!!! diff --git a/net/uring/io_uring_linux.go b/net/uring/io_uring_linux.go index 8dc9095f4..fb0a6a303 100644 --- a/net/uring/io_uring_linux.go +++ b/net/uring/io_uring_linux.go @@ -192,22 +192,23 @@ type File struct { close sync.Once file *os.File // must keep file from being GC'd fd C.int - reqs [8]udpReq + reqs [8]fileReq reqC chan int // indices into reqs } func NewFile(file *os.File) (*File, error) { r := new(C.go_uring) - d, err := syscall.Dup(int(file.Fd())) - if err != nil { - return nil, err - } - err = syscall.SetNonblock(d, false) - if err != nil { - return nil, err - } - fd := C.int(d) + // d, err := syscall.Dup(int(file.Fd())) + // if err != nil { + // return nil, err + // } + // err = syscall.SetNonblock(d, false) + // if err != nil { + // return nil, err + // } + // fd := C.int(d) // fmt.Println("INIT NEW FILE WITH FD", int(file.Fd()), "DUP'd to", d) + fd := C.int(file.Fd()) ret := C.initialize(r, fd) if ret < 0 { return nil, fmt.Errorf("uring initialization failed: %d", ret) @@ -232,11 +233,12 @@ func unpackNIdx(nidx C.uint64_t) (n, idx int, err error) { } type fileReq struct { + iov C.go_iovec buf [device.MaxSegmentSize]byte } func (u *File) Write(buf []byte) (int, error) { - fmt.Println("WRITE ", len(buf), "BYTES") + // fmt.Println("WRITE ", len(buf), "BYTES") if u.fd == 0 { return 0, errors.New("invalid uring.FileConn") } @@ -244,9 +246,9 @@ func (u *File) Write(buf []byte) (int, error) { var idx int select { case idx = <-u.reqC: - fmt.Println("REQ AVAIL") + // fmt.Println("REQ AVAIL") default: - fmt.Println("NO REQ AVAIL??? wait for one...") + // fmt.Println("NO REQ AVAIL??? wait for one...") // No request available. Get one from the kernel. 
nidx := C.get_file_completion(u.ptr) var err error @@ -258,16 +260,16 @@ func (u *File) Write(buf []byte) (int, error) { r := &u.reqs[idx] // Do the write. copy(r.buf[:], buf) - fmt.Println("SUBMIT WRITE REQUEST") - C.submit_write_request(u.ptr, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(buf)), C.size_t(idx)) + // fmt.Println("SUBMIT WRITE REQUEST") + C.submit_write_request(u.ptr, u.fd, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(buf)), C.size_t(idx), &r.iov) // Get an extra buffer, if available. nidx := C.peek_file_completion(u.ptr) if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR { // Nothing waiting for us. - fmt.Println("PEEK: ignore EAGAIN/EINTR") + // fmt.Println("PEEK: ignore EAGAIN/EINTR") } else { - n, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?) - fmt.Println("PEEK RESULT:", n, idx, err) + _, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?) + // fmt.Println("PEEK RESULT:", n, idx, err) if err == nil { // Put the request buffer back in the usable queue. // Should never block, by construction.