From ab3eaf69ce0d6dddc2402f03a626bdd1fb0131da Mon Sep 17 00:00:00 2001 From: Simeng He Date: Fri, 25 Jun 2021 16:07:28 -0400 Subject: [PATCH] bad news --- net/isoping/isoping.go | 131 +++++++++++++++++++++++++++++++++--- net/isoping/isoping_test.go | 2 + 2 files changed, 125 insertions(+), 8 deletions(-) diff --git a/net/isoping/isoping.go b/net/isoping/isoping.go index bd351de6c..b5005ded0 100644 --- a/net/isoping/isoping.go +++ b/net/isoping/isoping.go @@ -171,7 +171,8 @@ func (srv *Isoping) initVars() { srv.minCycleRxdiff = 0 srv.nextCycle = 0 srv.now = srv.Ustime() - srv.nextSend = 0 + srv.nextSend = srv.now + uint32(srv.usecPerPkt) + srv.numLost = 0 srv.nextTxackIndex = 0 srv.Tx = Packet{} srv.Rx = Packet{} @@ -194,7 +195,8 @@ func (srv *Isoping) generateInitialPacket() (*bytes.Buffer, error) { srv.Tx.Usec_per_pkt = uint32(srv.usecPerPkt) srv.Tx.Clockdiff = 0 if srv.startRtxtime > 0 { - srv.Rx.Clockdiff = srv.startRtxtime - srv.startRxtime + srv.Tx.Clockdiff = srv.startRtxtime - srv.startRxtime + log.Println("SETCLOCKDIFF to", srv.Rx.Clockdiff) } srv.Tx.Num_lost = srv.numLost srv.Tx.First_ack = uint32(srv.nextTxackIndex) @@ -214,11 +216,45 @@ func (srv *Isoping) Start(address ...string) { } srv.initVars() } +func (srv *Isoping) MainTest() { + for { + srv.initTimer() + } +} + +func (srv *Isoping) initTimer() { + tv := time.Duration(0) + if DIFF(srv.nextSend, srv.now) < 0 { + log.Printf("Set to %v us\n", tv.Microseconds()) + } else { + tv = time.Microsecond * time.Duration(DIFF(srv.nextSend, srv.now)) + log.Printf("Set to %v us\n", tv.Microseconds()) + } + log.Println(tv) + + // emulate the select with timeout of tv. + if srv.RemoteAddr != nil { + deadline := time.Now().Add(time.Duration(tv.Microseconds()) * time.Microsecond) + log.Println("TIMEOUT DURATION : ", deadline) + err := srv.Conn.SetReadDeadline(deadline) + if err != nil { + log.Println(err) + return + } + } else { + // Reset the timeout, we will have no timeout in this case. + err := srv.Conn.SetReadDeadline(time.Time{}) + if err != nil { + log.Println(err) + return + } + } + srv.now = srv.Ustime() +} func (srv *Isoping) MainLoop() { for { - log.Println("Action") - srv.now = srv.Ustime() + srv.initTimer() log.Printf("%d - %d = %d\n", srv.now, srv.nextSend, DIFF(srv.now, srv.nextSend)) if srv.RemoteAddr != nil && DIFF(srv.now, srv.nextSend) >= 0 { // Check if it is a server or not @@ -256,8 +292,10 @@ func (srv *Isoping) MainLoop() { p := make([]byte, binary.Size(srv.Rx)) got, rxaddr, err := srv.Conn.ReadFromUDP(p) if err != nil { - log.Println(err) + // log.Println("READ error : ", err) continue + } else { + log.Println("SUCC") } srv.RxAddr = rxaddr log.Println("AFTER READ") @@ -265,7 +303,7 @@ func (srv *Isoping) MainLoop() { buffer := bytes.NewBuffer(p) err = binary.Read(buffer, binary.BigEndian, &srv.Rx) if err != nil { - log.Println(err) + log.Println("BINARY READ err: ", err) continue } @@ -335,7 +373,7 @@ func (srv *Isoping) MainLoop() { rtt := clockdiff + int32(srv.Rx.Clockdiff) // Casting issue here may exist - // offset := DIFF(uint32(clockdiff), uint32(rtt/2)) + offset := DIFF(uint32(clockdiff), uint32(rtt/2)) if srv.Rx.Clockdiff == 0 { srv.lastPrint = srv.now - uint32(srv.usecPerPrint) + 1 } else { @@ -354,7 +392,84 @@ func (srv *Isoping) MainLoop() { srv.latRxVarSum += srv.latRx * srv.latRx } okToPrint := !srv.Quiet && DIFF(srv.now, srv.lastPrint) >= srv.usecPerPrint - log.Println("OKTOPRINT : ", okToPrint) + log.Printf("OKTOPRINT : %v, QUIET : %v\n", okToPrint, srv.Quiet) + log.Printf("%v - %v = %v\n", srv.now, srv.lastPrint, DIFF(srv.now, srv.lastPrint)) + log.Printf("usecPerPrint : %v", srv.usecPerPrint) + // Print the information. + if okToPrint { + ackinfo := srv.LastAckInfo + msRx := float64((rxdiff + rtt/2) / 1000.0) + min := float64((rtt / 2) / 1000.0) + rxNumLost := int64(srv.Rx.Num_lost) + nextTxId := srv.nextTxId - 1 + numLost := int64(srv.numLost) + nextRxId := int64(srv.nextRxId - 1) + log.Printf("%12s %6.1f ms rx (min=%.1f) loss: %v/%v tx %v/%v rx\n", + ackinfo, msRx, min, rxNumLost, nextTxId, numLost, nextRxId) + srv.lastPrint = srv.now + } + + if rxdiff < srv.minCycleRxdiff { + srv.minCycleRxdiff = rxdiff + } + + if DIFF(srv.now, srv.nextCycle) >= 0 { + if srv.minCycleRxdiff > 0 { + log.Printf("clock skew: sliding start by %v usec\n", srv.minCycleRxdiff) + srv.startRxtime += uint32(srv.minCycleRxdiff) + } + srv.minCycleRxdiff = 0x7fffffff + srv.nextCycle += uint32(srv.minCycleRxdiff) + } + log.Println("SCHEDULING") + + srv.Tx.Acks[srv.nextTxackIndex].Id = id + srv.Tx.Acks[srv.nextTxackIndex].Rxtime = rxtime + srv.nextTxackIndex = (srv.nextTxackIndex + 1) % len(srv.Tx.Acks) + + first_ack := uint32(srv.Rx.First_ack) + log.Println("FOR START") + for i := uint32(0); i < uint32(len(srv.Rx.Acks)); i++ { + var acki uint32 = (first_ack + i) % uint32(len(srv.Rx.Acks)) + var ackid uint32 = srv.Rx.Acks[acki].Id + if ackid == 0 { + continue + } + if DIFF(ackid, srv.nextRxackId) >= 0 { + log.Println("EXPECTED AN ACK") + + startTxTime := srv.nextSend - srv.nextTxId*uint32(srv.usecPerPkt) + txtime := startTxTime + ackid*uint32(srv.usecPerPkt) + rrxtime := srv.Rx.Acks[acki].Rxtime + rxtime := rrxtime + uint32(offset) + + txdiff := DIFF(rxtime, txtime) + if srv.usecPerPkt <= 0 && len(srv.LastAckInfo) > 0 { + log.Printf("%12s\n", srv.LastAckInfo) + } + if len(srv.LastAckInfo) == 0 { + //populate it + } + srv.nextRxackId = ackid + 1 + srv.latTxCount++ + srv.latTx = int64(txdiff) + if srv.latTxMin > srv.latTx { + srv.latTxMin = srv.latTx + } + + if srv.latTxMax < srv.latTx { + srv.latTxMax = srv.latTx + } + + srv.latTxSum += srv.latTx + srv.latTxVarSum += srv.latTx * srv.latTx + } + } + srv.lastRxtime = rxtime } } + +func (srv *Isoping) printResult() { + log.Printf("\n---\n") +} diff --git a/net/isoping/isoping_test.go b/net/isoping/isoping_test.go index ae3829da4..fee0121df 100644 --- a/net/isoping/isoping_test.go +++ b/net/isoping/isoping_test.go @@ -93,6 +93,7 @@ func TestMainLoop(t *testing.T) { server.Start() defer server.Conn.Close() server.MainLoop() + // server.MainTest() } func TestStartClient(t *testing.T) { @@ -100,4 +101,5 @@ func TestStartClient(t *testing.T) { client.Start("[::]:4948") defer client.Conn.Close() client.MainLoop() + // client.MainTest() }