diff --git a/net/tstun/fake.go b/net/tstun/fake.go index 09d68b6ba..83c573a55 100644 --- a/net/tstun/fake.go +++ b/net/tstun/fake.go @@ -6,6 +6,7 @@ import ( "io" + "log" "os" "golang.zx2c4.com/wireguard/tun" @@ -35,11 +36,13 @@ func (t *fakeTUN) Close() error { } func (t *fakeTUN) Read(out []byte, offset int) (int, error) { + log.Println("TSTUN : FAKE") <-t.closechan return 0, io.EOF } func (t *fakeTUN) Write(b []byte, n int) (int, error) { + log.Println("FAKE : Write Called") select { case <-t.closechan: return 0, ErrClosed diff --git a/net/tstun/wrap.go b/net/tstun/wrap.go index 226ea3e10..e3fde0df0 100644 --- a/net/tstun/wrap.go +++ b/net/tstun/wrap.go @@ -8,9 +8,11 @@ import ( "errors" + "fmt" "io" "log" "os" + "runtime/debug" "sync" "sync/atomic" "time" @@ -141,6 +143,8 @@ type tunReadResult struct { } func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper { + fmt.Printf("Tunnel Type %T", tdev) + debug.PrintStack() tun := &Wrapper{ logf: logger.WithPrefix(logf, "tstun: "), tdev: tdev, @@ -276,8 +280,8 @@ func allowSendOnClosedChannel() { // This is needed because t.tdev.Read in general may block (it does on Windows), // so packets may be stuck in t.outbound if t.Read called t.tdev.Read directly. func (t *Wrapper) poll() { -<<<<<<< HEAD defer allowSendOnClosedChannel() // for send to t.outbound + log.Println("TSTUN : POLL Started with len ", len(t.bufferConsumed)) for range t.bufferConsumed { var n int var err error @@ -289,55 +293,20 @@ func (t *Wrapper) poll() { // We don't need this loop for correctness, // but wireguard-go will skip an empty read, // so we might as well avoid the send through t.outbound. + log.Println("TSTUN : BEFORE READ") for n == 0 && err == nil { + log.Println("TSTUN : BEFORE READ IN FOR") if t.isClosed() { -======= - for { - log.Println("TSTUN: POLL START") - select { - case <-t.closed: - return - case <-t.bufferConsumed: - // continue - } - - log.Println("TSTUN: AFTER FIRST SELECT") - - // Read may use memory in t.buffer before PacketStartOffset for mandatory headers. - // This is the rationale behind the tun.Wrapper.{Read,Write} interfaces - // and the reason t.buffer has size MaxMessageSize and not MaxContentSize. - n, err := t.tdev.Read(t.buffer[:], PacketStartOffset) - log.Println("TSTUN : readerr,", err) - if err != nil { - select { - case <-t.closed: ->>>>>>> Many logs + log.Println("TSTUN : BEFORE T CLOSED") return } -<<<<<<< HEAD n, err = t.tdev.Read(t.buffer[:], PacketStartOffset) -======= - continue - } - - // Wireguard will skip an empty read, - // so we might as well do it here to avoid the send through t.outbound. - if n == 0 { - t.bufferConsumed <- struct{}{} - continue - } - log.Println("TSTUN : after err check", err) - - select { - case <-t.closed: - return - case t.outbound <- t.buffer[PacketStartOffset : PacketStartOffset+n]: - log.Println("TSTUN : Outbound Sent To") - // continue ->>>>>>> Many logs } + log.Println("TSTUN : BEFORE OUTBOUND") t.outbound <- tunReadResult{data: t.buffer[PacketStartOffset : PacketStartOffset+n], err: err} + log.Println("TSTUN : sent to outbound") } + log.Println("TSTUN : POLL FINISHED") } var magicDNSIPPort = netaddr.MustParseIPPort("100.100.100.100:0") @@ -392,13 +361,16 @@ func (t *Wrapper) IdleDuration() time.Duration { } func (t *Wrapper) Read(buf []byte, offset int) (int, error) { -<<<<<<< HEAD + now := time.Now() res, ok := <-t.outbound + log.Println("TSTUN : outbound wait for channel read time, ", time.Since(now)) if !ok { // Wrapper is closed. + log.Println("TSTUN : EOF") return 0, io.EOF } if res.err != nil { + log.Println("TSTUN : err: ", res.err) return 0, res.err } defer allowSendOnClosedChannel() // for send to t.bufferConsumed @@ -411,42 +383,11 @@ func (t *Wrapper) Read(buf []byte, offset int) (int, error) { if !isInjectedPacket { // We are done with t.buffer. Let poll re-use it. t.bufferConsumed <- struct{}{} -======= - log.Println("TSTUN: WRAPPERREAD") - now := time.Now() - var n int - - wasInjectedPacket := false - - log.Println("TSTUN : Before Select") - select { - case <-t.closed: - log.Println("TSTUN : EOF") - return 0, io.EOF - case err := <-t.errors: - log.Println("TSTUN : ", err) - return 0, err - case pkt := <-t.outbound: - log.Println("TSTUN : t<-outbound") - n = copy(buf[offset:], pkt) - // t.buffer has a fixed location in memory, - // so this is the easiest way to tell when it has been consumed. - // &pkt[0] can be used because empty packets do not reach t.outbound. - if &pkt[0] == &t.buffer[PacketStartOffset] { - t.bufferConsumed <- struct{}{} - } else { - // If the packet is not from t.buffer, then it is an injected packet. - wasInjectedPacket = true - } ->>>>>>> Many logs } - log.Println("TSTUN : After SELECT, took ", time.Since(now)) - p := parsedPacketPool.Get().(*packet.Parsed) defer parsedPacketPool.Put(p) p.Decode(buf[offset : offset+n]) - log.Println("TSTUN : After DECODE, took ", time.Since(now)) if m, ok := t.destIPActivity.Load().(map[netaddr.IP]func()); ok { if fn := m[p.Dst.IP()]; fn != nil { @@ -454,22 +395,8 @@ func (t *Wrapper) Read(buf []byte, offset int) (int, error) { } } -<<<<<<< HEAD // Do not filter injected packets. if !isInjectedPacket && !t.disableFilter { -======= - log.Println("TSTUN : After DestIP, took ", time.Since(now)) - - // For injected packets, we return early to bypass filtering. - if wasInjectedPacket { - log.Println("TSTUN : WasInjectedPacket, took ", time.Since(now)) - t.noteActivity() - log.Println("TSTUN BODY: ", n) - return n, nil - } - - if !t.disableFilter { ->>>>>>> Many logs response := t.filterOut(p) if response != filter.Accept { // Wireguard considers read errors fatal; pretend nothing was read @@ -478,11 +405,13 @@ func (t *Wrapper) Read(buf []byte, offset int) (int, error) { } t.noteActivity() - log.Printf("TSTUN: Full read took %vs", time.Since(now)) + log.Printf("TSTUN : Read Completed in %v\n", time.Since(now).Seconds()) return n, nil } func (t *Wrapper) filterIn(buf []byte) filter.Response { + log.Println("TSTUN : Filter In called") + now := time.Now() p := parsedPacketPool.Get().(*packet.Parsed) defer parsedPacketPool.Put(p) p.Decode(buf) @@ -498,12 +427,14 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response { } } } + log.Println("TSTUN : Filter In After TSMP") if t.PreFilterIn != nil { if res := t.PreFilterIn(p, t); res.IsDrop() { return res } } + log.Println("TSTUN : Filter In After PreFilter") filt, _ := t.filter.Load().(*filter.Filter) @@ -512,6 +443,7 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response { } outcome := filt.RunIn(p, t.filterFlags) + log.Println("TSTUN : Filter In After Outcome") // Let peerapi through the filter; its ACLs are handled at L7, // not at the packet level. @@ -523,14 +455,17 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response { outcome = filter.Accept } } + log.Println("TSTUN : Filter In After Outcome check2 type : ", outcome.String()) if outcome != filter.Accept { + log.Println("TSTUN : Filter In After Outcome check3") // Tell them, via TSMP, we're dropping them due to the ACL. // Their host networking stack can translate this into ICMP // or whatnot as required. But notably, their GUI or tailscale CLI // can show them a rejection history with reasons. if p.IPVersion == 4 && p.IPProto == ipproto.TCP && p.TCPFlags&packet.TCPSyn != 0 && !t.disableTSMPRejected { + log.Println("TSTUN : Filter In After Outcome check4") rj := packet.TailscaleRejectedHeader{ IPSrc: p.Dst.IP(), IPDst: p.Src.IP(), @@ -544,18 +479,21 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response { } pkt := packet.Generate(rj, nil) t.InjectOutbound(pkt) + log.Println("TSTUN : FilterIn Inject took ,", time.Since(now)) // TODO(bradfitz): also send a TCP RST, after the TSMP message. } return filter.Drop } + log.Println("TSTUN : Filter In After Outcome check5") if t.PostFilterIn != nil { if res := t.PostFilterIn(p, t); res.IsDrop() { return res } } + log.Println("TSTUN : Filter In After Outcome check6") return filter.Accept } @@ -563,6 +501,7 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response { // Write accepts an incoming packet. The packet begins at buf[offset:], // like wireguard-go/tun.Device.Write. func (t *Wrapper) Write(buf []byte, offset int) (int, error) { + now := time.Now() if !t.disableFilter { if t.filterIn(buf[offset:]) != filter.Accept { // If we're not accepting the packet, lie to wireguard-go and pretend @@ -578,11 +517,16 @@ func (t *Wrapper) Write(buf []byte, offset int) (int, error) { // device/receive.go: _, err = device.tun.device.Write(....) // // TODO(bradfitz): fix upstream interface docs, implementation. + log.Println("TSTUN : Write completed early in ", time.Since(now)) return len(buf), nil } } + // Causes a data race ??? + // defer log.Println("TSTUN : Write completed in ", time.Since(now)) + // debug.PrintStack() t.noteActivity() + // log.Println("TSTUN : Write completed in ", time.Since(now)) return t.tdev.Write(buf, offset) } @@ -639,6 +583,7 @@ func (t *Wrapper) InjectInboundCopy(packet []byte) error { } func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingRequest) { + now := time.Now() pong := packet.TSMPPongReply{ Data: req.Data, } @@ -659,6 +604,7 @@ func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingReque } t.InjectOutbound(packet.Generate(pong, nil)) + log.Println("TSTUN : Inject outbound pong took ", time.Since(now)) } // InjectOutbound makes the Wrapper device behave as if a packet @@ -667,6 +613,8 @@ func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingReque // The injected packet will not pass through outbound filters. // Injecting an empty packet is a no-op. func (t *Wrapper) InjectOutbound(packet []byte) error { + log.Println("TSTUN: Inject Outbound") + now := time.Now() if len(packet) > MaxPacketSize { return errPacketTooBig } @@ -675,6 +623,7 @@ func (t *Wrapper) InjectOutbound(packet []byte) error { } defer allowSendOnClosedChannel() // for send to t.outbound t.outbound <- tunReadResult{data: packet} + log.Println("TSTUN : Inject took ", time.Since(now)) return nil } diff --git a/tstest/integration/integration_test.go b/tstest/integration/integration_test.go index a19862cc5..d003017e7 100644 --- a/tstest/integration/integration_test.go +++ b/tstest/integration/integration_test.go @@ -298,12 +298,12 @@ func TestTwoNodeConnectivity(t *testing.T) { // Create two nodes and hope that logs come out correctly n1 := newTestNode(t, env) n1SocksAddrCh := n1.socks5AddrChan() - d1 := n1.StartDaemon(t) + d1 := n1.StartDaemonPrefix(t, "Node1 ") defer d1.Kill() n2 := newTestNode(t, env) n2SocksAddrCh := n2.socks5AddrChan() - d2 := n2.StartDaemon(t) + d2 := n2.StartDaemonPrefix(t, "Node2 ") defer d2.Kill() n1Socks := n1.AwaitSocksAddr(t, n1SocksAddrCh) @@ -402,11 +402,11 @@ func TestTwoNodeConnectivity(t *testing.T) { } // Read the bytes in - // p := make([]byte, 1024) - // _, err = dialerConn.Read(p) - // if err != nil { - // return err - // } + p := make([]byte, 1024) + _, err = dialerConn.Read(p) + if err != nil { + return err + } t.Logf("Time taken for this run : %vs", time.Since(now).Seconds()) return nil }); err != nil { @@ -609,9 +609,29 @@ func (d *Daemon) MustCleanShutdown(t testing.TB) { } } +type PrefixedWriter struct { + prefix string + w io.Writer +} + +func (p *PrefixedWriter) Write(b []byte) (int, error) { + var buf []byte + buf = append(buf, p.prefix...) + buf = append(buf, b...) + _, err := p.w.Write(buf) + if err != nil { + return 0, err + } + return len(b), err +} + +func (n *testNode) StartDaemon(t testing.TB) *Daemon { + return n.StartDaemonPrefix(t, "") +} + // StartDaemon starts the node's tailscaled, failing if it fails to // start. -func (n *testNode) StartDaemon(t testing.TB) *Daemon { +func (n *testNode) StartDaemonPrefix(t testing.TB, prefix string) *Daemon { cmd := exec.Command(n.env.Binaries.Daemon, "--tun=userspace-networking", "--state="+n.stateFile, @@ -625,8 +645,8 @@ func (n *testNode) StartDaemon(t testing.TB) *Daemon { ) cmd.Stderr = &nodeOutputParser{n: n} if *verboseTailscaled { - cmd.Stdout = os.Stdout - cmd.Stderr = io.MultiWriter(cmd.Stderr, os.Stderr) + cmd.Stdout = &PrefixedWriter{prefix: prefix, w: os.Stdout} + cmd.Stderr = io.MultiWriter(cmd.Stderr, &PrefixedWriter{prefix: prefix, w: os.Stderr}) } if err := cmd.Start(); err != nil { t.Fatalf("starting tailscaled: %v", err) diff --git a/wgengine/magicsock/magicsock.go b/wgengine/magicsock/magicsock.go index ba3bc5a24..83de1465f 100644 --- a/wgengine/magicsock/magicsock.go +++ b/wgengine/magicsock/magicsock.go @@ -1632,6 +1632,7 @@ func (c *Conn) receiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) { // ok is whether this read should be reported up to wireguard-go (our // caller). func (c *Conn) receiveIP(b []byte, ipp netaddr.IPPort, cache *ippEndpointCache) (ep conn.Endpoint, ok bool) { + log.Println("MS : Received IP") if stun.Is(b) { c.stunReceiveFunc.Load().(func([]byte, netaddr.IPPort))(b, ipp) return nil, false @@ -1639,6 +1640,7 @@ func (c *Conn) receiveIP(b []byte, ipp netaddr.IPPort, cache *ippEndpointCache) if c.handleDiscoMessage(b, ipp) { return nil, false } + log.Println("MS : ReceivedIP called HandleDiscoMessage") if !c.havePrivateKey.Get() { // If we have no private key, we're logged out or // stopped. Don't try to pass these wireguard packets @@ -1685,6 +1687,7 @@ func (c *connBind) receiveDERP(b []byte) (n int, ep conn.Endpoint, err error) { } func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep conn.Endpoint) { + log.Println("MS : Process DERP READ RESULT") if dm.copyBuf == nil { return 0, nil } @@ -1699,8 +1702,10 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep con ipp := netaddr.IPPortFrom(derpMagicIPAddr, uint16(regionID)) if c.handleDiscoMessage(b[:n], ipp) { + log.Println("MS : c.HandleDiscoMessage") return 0, nil } + log.Println("MS : ProcessDerp HandleDiscoMessage") var ( didNoteRecvActivity bool @@ -1764,6 +1769,7 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep con ) func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey tailcfg.NodeKey, dstDisco tailcfg.DiscoKey, m disco.Message, logLevel discoLogLevel) (sent bool, err error) { + log.Println("MS: SENDDISCOMESSAGE") c.mu.Lock() if c.closed { c.mu.Unlock() @@ -1936,6 +1942,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) (isDiscoMsg bo switch dm := dm.(type) { case *disco.Ping: + log.Println("Pinger", de) c.handlePingLocked(dm, de, src, sender, peerNode) case *disco.Pong: log.Println("Ponger", de) @@ -1957,6 +1964,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) (isDiscoMsg bo go de.handleCallMeMaybe(dm) } } + log.Println("DISCO MESSAGE COMPLETE") return } @@ -2655,6 +2663,7 @@ func (c *Conn) listenPacket(network string, host netaddr.IP, port uint16) (net.P // If curPortFate is set to dropCurrentPort, no attempt is made to reuse // the current port. func (c *Conn) bindSocket(rucPtr **RebindingUDPConn, network string, curPortFate currentPortFate) error { + log.Println("MS: BINDSOCKET") var host netaddr.IP if inTest() && !c.simulatedNetwork { switch network { @@ -2842,6 +2851,7 @@ func (c *RebindingUDPConn) currentConn() net.PacketConn { // ReadFrom reads a packet from c into b. // It returns the number of bytes copied and the source address. func (c *RebindingUDPConn) ReadFrom(b []byte) (int, net.Addr, error) { + log.Println("MS : ReadFrom ", c.pconn.LocalAddr()) for { pconn := c.currentConn() n, addr, err := pconn.ReadFrom(b) @@ -2927,6 +2937,9 @@ func (c *RebindingUDPConn) closeLocked() error { func (c *RebindingUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { for { + log.Printf("MS : WRITETO %v", c.pconn.LocalAddr()) + log.Println(string(b)) + log.Println(b) c.mu.Lock() pconn := c.pconn c.mu.Unlock() @@ -2941,6 +2954,7 @@ func (c *RebindingUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) { continue } } + log.Println("MS : WRITETO complete ", n, err) return n, err } } @@ -2968,6 +2982,7 @@ func (c *blockForeverConn) ReadFrom(p []byte) (n int, addr net.Addr, err error) } func (c *blockForeverConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { + log.Println("DROP WRITE") // Silently drop writes. return len(p), nil } diff --git a/wgengine/netstack/netstack.go b/wgengine/netstack/netstack.go index 4bd00f041..c86641b43 100644 --- a/wgengine/netstack/netstack.go +++ b/wgengine/netstack/netstack.go @@ -399,6 +399,7 @@ func (ns *Impl) DialContextUDP(ctx context.Context, addr string) (*gonet.UDPConn } func (ns *Impl) injectOutbound() { + log.Println("NETSTACK Inject outbound") for { packetInfo, ok := ns.linkEP.ReadContext(context.Background()) if !ok { @@ -431,6 +432,7 @@ func (ns *Impl) isLocalIP(ip netaddr.IP) bool { } func (ns *Impl) injectInbound(p *packet.Parsed, t *tstun.Wrapper) filter.Response { + log.Println("NETSTACK: injectInbound") if ns.onlySubnets && ns.isLocalIP(p.Dst.IP()) { // In hybrid ("only subnets") mode, bail out early if // the traffic is destined for an actual Tailscale