they work with extra junk

thanks, dave
no thanks, kernel devs

write no work
use writev
This commit is contained in:
Josh Bleecher Snyder
2021-06-02 17:07:24 -07:00
parent 7fd5e31070
commit 522fa9306e
3 changed files with 87 additions and 25 deletions

41
file.go Normal file
View File

@@ -0,0 +1,41 @@
package main
import (
"fmt"
"os"
"syscall"
"time"
"tailscale.com/net/uring"
)
func main() {
// f, err := os.Create("junk")
// check(err)
// _, err = f.Write([]byte("CMON\n"))
// check(err)
fd, err := syscall.Open("trash", syscall.O_RDWR|syscall.O_CREAT|syscall.O_TRUNC, 0644)
check(err)
n, err := syscall.Write(int(fd), []byte("part two\n"))
check(err)
fmt.Println("N", n)
ff := os.NewFile(uintptr(fd), "trash")
uf, err := uring.NewFile(ff)
check(err)
for i := 0; i < 1; i++ {
s := fmt.Sprintf("i can count to %d\x00\n", i)
n, err := uf.Write([]byte(s), n)
check(err)
fmt.Println("wrote", n, "bytes")
time.Sleep(3 * time.Second)
}
syscall.Close(fd)
}
func check(err error) {
if err != nil {
panic(err)
}
}

View File

@@ -1,4 +1,5 @@
#include <arpa/inet.h> // debugging
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
@@ -25,11 +26,18 @@ static int initialize(struct io_uring *ring, int fd) {
memset(&params, 0, sizeof(params));
params.flags |= IORING_SETUP_SQPOLL;
params.sq_thread_idle = 1000; // 1s
io_uring_queue_init_params(16, ring, &params); // 16: size of ring
int ret;
ret = io_uring_queue_init_params(16, ring, &params); // 16: size of ring
if (ret < 0) {
return ret;
}
ret = io_uring_register_files(ring, &fd, 1);
// TODO: Do we need to unregister files on close, or is Closing the uring enough?
return ret;
perror("io_uring_queue_init");
if (ret < 0) {
return ret;
}
return 0;
}
// packNIdx packs a returned n (usually number of bytes) and a index into a request array into a 63-bit uint64.
@@ -130,13 +138,22 @@ again:;
}
// submit a write request via liburing
static int submit_write_request(struct io_uring *ring, char *buf, int buflen, size_t idx) {
fprintf(stderr, "submit_write_request buf %p buflen %d idx %lu\n", buf, buflen, idx);
static int submit_write_request(struct io_uring *ring, int fd, char *buf, int buflen, size_t idx, struct iovec *iov) {
// fprintf(stderr, "submit_write_request to fd %d buf %p %s buflen %d idx %lu\n", fd, buf, buf, buflen, idx);
// errno= 0;
// perror("before bonus write");
// int x = write(fd, buf, buflen);
// fprintf(stderr, "plain write returned %d\n", x);
// perror("submit_write_request bonus write");
iov->iov_base = buf;
iov->iov_len = buflen;
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_write(sqe, 0, buf, buflen, 0); // use the 0th file in the list of registered fds
io_uring_prep_writev(sqe, 0, iov, 1, 0); // use the 0th file in the list of registered fds
io_uring_sqe_set_flags(sqe, IOSQE_FIXED_FILE);
io_uring_sqe_set_data(sqe, (void *)(idx));
io_uring_submit(ring);
int submitted = io_uring_submit(ring);
// fprintf(stderr, "submitted %d sqes\n", submitted);
return 0;
}
@@ -144,15 +161,17 @@ static int submit_write_request(struct io_uring *ring, char *buf, int buflen, si
static uint64_t peek_file_completion(struct io_uring *ring) {
struct io_uring_cqe *cqe;
int ret = io_uring_peek_cqe(ring, &cqe);
// perror("on entry, peek_file_completion io_uring_wait_cqe");
if ((-ret == EAGAIN) || (-ret == EINTR)) {
return ret;
}
// TODO: Delete perror, fprintf, etc.
// Encode in return value or similar.
if (ret < 0) {
perror("peek_file_completion io_uring_wait_cqe");
perror("on failure, peek_file_completion io_uring_wait_cqe");
return ret;
}
errno = 0;
int n = cqe->res;
if (n < 0) {
// TODO: This leaks a buffer!!!

View File

@@ -192,22 +192,23 @@ type File struct {
close sync.Once
file *os.File // must keep file from being GC'd
fd C.int
reqs [8]udpReq
reqs [8]fileReq
reqC chan int // indices into reqs
}
func NewFile(file *os.File) (*File, error) {
r := new(C.go_uring)
d, err := syscall.Dup(int(file.Fd()))
if err != nil {
return nil, err
}
err = syscall.SetNonblock(d, false)
if err != nil {
return nil, err
}
fd := C.int(d)
// d, err := syscall.Dup(int(file.Fd()))
// if err != nil {
// return nil, err
// }
// err = syscall.SetNonblock(d, false)
// if err != nil {
// return nil, err
// }
// fd := C.int(d)
// fmt.Println("INIT NEW FILE WITH FD", int(file.Fd()), "DUP'd to", d)
fd := C.int(file.Fd())
ret := C.initialize(r, fd)
if ret < 0 {
return nil, fmt.Errorf("uring initialization failed: %d", ret)
@@ -232,11 +233,12 @@ func unpackNIdx(nidx C.uint64_t) (n, idx int, err error) {
}
type fileReq struct {
iov C.go_iovec
buf [device.MaxSegmentSize]byte
}
func (u *File) Write(buf []byte) (int, error) {
fmt.Println("WRITE ", len(buf), "BYTES")
// fmt.Println("WRITE ", len(buf), "BYTES")
if u.fd == 0 {
return 0, errors.New("invalid uring.FileConn")
}
@@ -244,9 +246,9 @@ func (u *File) Write(buf []byte) (int, error) {
var idx int
select {
case idx = <-u.reqC:
fmt.Println("REQ AVAIL")
// fmt.Println("REQ AVAIL")
default:
fmt.Println("NO REQ AVAIL??? wait for one...")
// fmt.Println("NO REQ AVAIL??? wait for one...")
// No request available. Get one from the kernel.
nidx := C.get_file_completion(u.ptr)
var err error
@@ -258,16 +260,16 @@ func (u *File) Write(buf []byte) (int, error) {
r := &u.reqs[idx]
// Do the write.
copy(r.buf[:], buf)
fmt.Println("SUBMIT WRITE REQUEST")
C.submit_write_request(u.ptr, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(buf)), C.size_t(idx))
// fmt.Println("SUBMIT WRITE REQUEST")
C.submit_write_request(u.ptr, u.fd, (*C.char)(unsafe.Pointer(&r.buf[0])), C.int(len(buf)), C.size_t(idx), &r.iov)
// Get an extra buffer, if available.
nidx := C.peek_file_completion(u.ptr)
if syscall.Errno(-nidx) == syscall.EAGAIN || syscall.Errno(-nidx) == syscall.EINTR {
// Nothing waiting for us.
fmt.Println("PEEK: ignore EAGAIN/EINTR")
// fmt.Println("PEEK: ignore EAGAIN/EINTR")
} else {
n, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?)
fmt.Println("PEEK RESULT:", n, idx, err)
_, idx, err := unpackNIdx(nidx) // ignore errors here, this is best-effort only (TODO: right?)
// fmt.Println("PEEK RESULT:", n, idx, err)
if err == nil {
// Put the request buffer back in the usable queue.
// Should never block, by construction.