Signed-off-by: Simeng He <simeng@tailscale.com>
This commit is contained in:
Simeng He
2021-07-07 13:35:30 -04:00
parent fcbabd35b2
commit ccde87bee5
5 changed files with 88 additions and 99 deletions

View File

@@ -6,6 +6,7 @@
import (
"io"
"log"
"os"
"golang.zx2c4.com/wireguard/tun"
@@ -35,11 +36,13 @@ func (t *fakeTUN) Close() error {
}
func (t *fakeTUN) Read(out []byte, offset int) (int, error) {
log.Println("TSTUN : FAKE")
<-t.closechan
return 0, io.EOF
}
func (t *fakeTUN) Write(b []byte, n int) (int, error) {
log.Println("FAKE : Write Called")
select {
case <-t.closechan:
return 0, ErrClosed

View File

@@ -8,9 +8,11 @@
import (
"errors"
"fmt"
"io"
"log"
"os"
"runtime/debug"
"sync"
"sync/atomic"
"time"
@@ -141,6 +143,8 @@ type tunReadResult struct {
}
func Wrap(logf logger.Logf, tdev tun.Device) *Wrapper {
fmt.Printf("Tunnel Type %T", tdev)
debug.PrintStack()
tun := &Wrapper{
logf: logger.WithPrefix(logf, "tstun: "),
tdev: tdev,
@@ -276,8 +280,8 @@ func allowSendOnClosedChannel() {
// This is needed because t.tdev.Read in general may block (it does on Windows),
// so packets may be stuck in t.outbound if t.Read called t.tdev.Read directly.
func (t *Wrapper) poll() {
<<<<<<< HEAD
defer allowSendOnClosedChannel() // for send to t.outbound
log.Println("TSTUN : POLL Started with len ", len(t.bufferConsumed))
for range t.bufferConsumed {
var n int
var err error
@@ -289,55 +293,20 @@ func (t *Wrapper) poll() {
// We don't need this loop for correctness,
// but wireguard-go will skip an empty read,
// so we might as well avoid the send through t.outbound.
log.Println("TSTUN : BEFORE READ")
for n == 0 && err == nil {
log.Println("TSTUN : BEFORE READ IN FOR")
if t.isClosed() {
=======
for {
log.Println("TSTUN: POLL START")
select {
case <-t.closed:
return
case <-t.bufferConsumed:
// continue
}
log.Println("TSTUN: AFTER FIRST SELECT")
// Read may use memory in t.buffer before PacketStartOffset for mandatory headers.
// This is the rationale behind the tun.Wrapper.{Read,Write} interfaces
// and the reason t.buffer has size MaxMessageSize and not MaxContentSize.
n, err := t.tdev.Read(t.buffer[:], PacketStartOffset)
log.Println("TSTUN : readerr,", err)
if err != nil {
select {
case <-t.closed:
>>>>>>> Many logs
log.Println("TSTUN : BEFORE T CLOSED")
return
}
<<<<<<< HEAD
n, err = t.tdev.Read(t.buffer[:], PacketStartOffset)
=======
continue
}
// Wireguard will skip an empty read,
// so we might as well do it here to avoid the send through t.outbound.
if n == 0 {
t.bufferConsumed <- struct{}{}
continue
}
log.Println("TSTUN : after err check", err)
select {
case <-t.closed:
return
case t.outbound <- t.buffer[PacketStartOffset : PacketStartOffset+n]:
log.Println("TSTUN : Outbound Sent To")
// continue
>>>>>>> Many logs
}
log.Println("TSTUN : BEFORE OUTBOUND")
t.outbound <- tunReadResult{data: t.buffer[PacketStartOffset : PacketStartOffset+n], err: err}
log.Println("TSTUN : sent to outbound")
}
log.Println("TSTUN : POLL FINISHED")
}
var magicDNSIPPort = netaddr.MustParseIPPort("100.100.100.100:0")
@@ -392,13 +361,16 @@ func (t *Wrapper) IdleDuration() time.Duration {
}
func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
<<<<<<< HEAD
now := time.Now()
res, ok := <-t.outbound
log.Println("TSTUN : outbound wait for channel read time, ", time.Since(now))
if !ok {
// Wrapper is closed.
log.Println("TSTUN : EOF")
return 0, io.EOF
}
if res.err != nil {
log.Println("TSTUN : err: ", res.err)
return 0, res.err
}
defer allowSendOnClosedChannel() // for send to t.bufferConsumed
@@ -411,42 +383,11 @@ func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
if !isInjectedPacket {
// We are done with t.buffer. Let poll re-use it.
t.bufferConsumed <- struct{}{}
=======
log.Println("TSTUN: WRAPPERREAD")
now := time.Now()
var n int
wasInjectedPacket := false
log.Println("TSTUN : Before Select")
select {
case <-t.closed:
log.Println("TSTUN : EOF")
return 0, io.EOF
case err := <-t.errors:
log.Println("TSTUN : ", err)
return 0, err
case pkt := <-t.outbound:
log.Println("TSTUN : t<-outbound")
n = copy(buf[offset:], pkt)
// t.buffer has a fixed location in memory,
// so this is the easiest way to tell when it has been consumed.
// &pkt[0] can be used because empty packets do not reach t.outbound.
if &pkt[0] == &t.buffer[PacketStartOffset] {
t.bufferConsumed <- struct{}{}
} else {
// If the packet is not from t.buffer, then it is an injected packet.
wasInjectedPacket = true
}
>>>>>>> Many logs
}
log.Println("TSTUN : After SELECT, took ", time.Since(now))
p := parsedPacketPool.Get().(*packet.Parsed)
defer parsedPacketPool.Put(p)
p.Decode(buf[offset : offset+n])
log.Println("TSTUN : After DECODE, took ", time.Since(now))
if m, ok := t.destIPActivity.Load().(map[netaddr.IP]func()); ok {
if fn := m[p.Dst.IP()]; fn != nil {
@@ -454,22 +395,8 @@ func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
}
}
<<<<<<< HEAD
// Do not filter injected packets.
if !isInjectedPacket && !t.disableFilter {
=======
log.Println("TSTUN : After DestIP, took ", time.Since(now))
// For injected packets, we return early to bypass filtering.
if wasInjectedPacket {
log.Println("TSTUN : WasInjectedPacket, took ", time.Since(now))
t.noteActivity()
log.Println("TSTUN BODY: ", n)
return n, nil
}
if !t.disableFilter {
>>>>>>> Many logs
response := t.filterOut(p)
if response != filter.Accept {
// Wireguard considers read errors fatal; pretend nothing was read
@@ -478,11 +405,13 @@ func (t *Wrapper) Read(buf []byte, offset int) (int, error) {
}
t.noteActivity()
log.Printf("TSTUN: Full read took %vs", time.Since(now))
log.Printf("TSTUN : Read Completed in %v\n", time.Since(now).Seconds())
return n, nil
}
func (t *Wrapper) filterIn(buf []byte) filter.Response {
log.Println("TSTUN : Filter In called")
now := time.Now()
p := parsedPacketPool.Get().(*packet.Parsed)
defer parsedPacketPool.Put(p)
p.Decode(buf)
@@ -498,12 +427,14 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
}
}
}
log.Println("TSTUN : Filter In After TSMP")
if t.PreFilterIn != nil {
if res := t.PreFilterIn(p, t); res.IsDrop() {
return res
}
}
log.Println("TSTUN : Filter In After PreFilter")
filt, _ := t.filter.Load().(*filter.Filter)
@@ -512,6 +443,7 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
}
outcome := filt.RunIn(p, t.filterFlags)
log.Println("TSTUN : Filter In After Outcome")
// Let peerapi through the filter; its ACLs are handled at L7,
// not at the packet level.
@@ -523,14 +455,17 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
outcome = filter.Accept
}
}
log.Println("TSTUN : Filter In After Outcome check2 type : ", outcome.String())
if outcome != filter.Accept {
log.Println("TSTUN : Filter In After Outcome check3")
// Tell them, via TSMP, we're dropping them due to the ACL.
// Their host networking stack can translate this into ICMP
// or whatnot as required. But notably, their GUI or tailscale CLI
// can show them a rejection history with reasons.
if p.IPVersion == 4 && p.IPProto == ipproto.TCP && p.TCPFlags&packet.TCPSyn != 0 && !t.disableTSMPRejected {
log.Println("TSTUN : Filter In After Outcome check4")
rj := packet.TailscaleRejectedHeader{
IPSrc: p.Dst.IP(),
IPDst: p.Src.IP(),
@@ -544,18 +479,21 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
}
pkt := packet.Generate(rj, nil)
t.InjectOutbound(pkt)
log.Println("TSTUN : FilterIn Inject took ,", time.Since(now))
// TODO(bradfitz): also send a TCP RST, after the TSMP message.
}
return filter.Drop
}
log.Println("TSTUN : Filter In After Outcome check5")
if t.PostFilterIn != nil {
if res := t.PostFilterIn(p, t); res.IsDrop() {
return res
}
}
log.Println("TSTUN : Filter In After Outcome check6")
return filter.Accept
}
@@ -563,6 +501,7 @@ func (t *Wrapper) filterIn(buf []byte) filter.Response {
// Write accepts an incoming packet. The packet begins at buf[offset:],
// like wireguard-go/tun.Device.Write.
func (t *Wrapper) Write(buf []byte, offset int) (int, error) {
now := time.Now()
if !t.disableFilter {
if t.filterIn(buf[offset:]) != filter.Accept {
// If we're not accepting the packet, lie to wireguard-go and pretend
@@ -578,11 +517,16 @@ func (t *Wrapper) Write(buf []byte, offset int) (int, error) {
// device/receive.go: _, err = device.tun.device.Write(....)
//
// TODO(bradfitz): fix upstream interface docs, implementation.
log.Println("TSTUN : Write completed early in ", time.Since(now))
return len(buf), nil
}
}
// Causes a data race ???
// defer log.Println("TSTUN : Write completed in ", time.Since(now))
// debug.PrintStack()
t.noteActivity()
// log.Println("TSTUN : Write completed in ", time.Since(now))
return t.tdev.Write(buf, offset)
}
@@ -639,6 +583,7 @@ func (t *Wrapper) InjectInboundCopy(packet []byte) error {
}
func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingRequest) {
now := time.Now()
pong := packet.TSMPPongReply{
Data: req.Data,
}
@@ -659,6 +604,7 @@ func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingReque
}
t.InjectOutbound(packet.Generate(pong, nil))
log.Println("TSTUN : Inject outbound pong took ", time.Since(now))
}
// InjectOutbound makes the Wrapper device behave as if a packet
@@ -667,6 +613,8 @@ func (t *Wrapper) injectOutboundPong(pp *packet.Parsed, req packet.TSMPPingReque
// The injected packet will not pass through outbound filters.
// Injecting an empty packet is a no-op.
func (t *Wrapper) InjectOutbound(packet []byte) error {
log.Println("TSTUN: Inject Outbound")
now := time.Now()
if len(packet) > MaxPacketSize {
return errPacketTooBig
}
@@ -675,6 +623,7 @@ func (t *Wrapper) InjectOutbound(packet []byte) error {
}
defer allowSendOnClosedChannel() // for send to t.outbound
t.outbound <- tunReadResult{data: packet}
log.Println("TSTUN : Inject took ", time.Since(now))
return nil
}

View File

@@ -298,12 +298,12 @@ func TestTwoNodeConnectivity(t *testing.T) {
// Create two nodes and hope that logs come out correctly
n1 := newTestNode(t, env)
n1SocksAddrCh := n1.socks5AddrChan()
d1 := n1.StartDaemon(t)
d1 := n1.StartDaemonPrefix(t, "Node1 ")
defer d1.Kill()
n2 := newTestNode(t, env)
n2SocksAddrCh := n2.socks5AddrChan()
d2 := n2.StartDaemon(t)
d2 := n2.StartDaemonPrefix(t, "Node2 ")
defer d2.Kill()
n1Socks := n1.AwaitSocksAddr(t, n1SocksAddrCh)
@@ -402,11 +402,11 @@ func TestTwoNodeConnectivity(t *testing.T) {
}
// Read the bytes in
// p := make([]byte, 1024)
// _, err = dialerConn.Read(p)
// if err != nil {
// return err
// }
p := make([]byte, 1024)
_, err = dialerConn.Read(p)
if err != nil {
return err
}
t.Logf("Time taken for this run : %vs", time.Since(now).Seconds())
return nil
}); err != nil {
@@ -609,9 +609,29 @@ func (d *Daemon) MustCleanShutdown(t testing.TB) {
}
}
type PrefixedWriter struct {
prefix string
w io.Writer
}
func (p *PrefixedWriter) Write(b []byte) (int, error) {
var buf []byte
buf = append(buf, p.prefix...)
buf = append(buf, b...)
_, err := p.w.Write(buf)
if err != nil {
return 0, err
}
return len(b), err
}
func (n *testNode) StartDaemon(t testing.TB) *Daemon {
return n.StartDaemonPrefix(t, "")
}
// StartDaemon starts the node's tailscaled, failing if it fails to
// start.
func (n *testNode) StartDaemon(t testing.TB) *Daemon {
func (n *testNode) StartDaemonPrefix(t testing.TB, prefix string) *Daemon {
cmd := exec.Command(n.env.Binaries.Daemon,
"--tun=userspace-networking",
"--state="+n.stateFile,
@@ -625,8 +645,8 @@ func (n *testNode) StartDaemon(t testing.TB) *Daemon {
)
cmd.Stderr = &nodeOutputParser{n: n}
if *verboseTailscaled {
cmd.Stdout = os.Stdout
cmd.Stderr = io.MultiWriter(cmd.Stderr, os.Stderr)
cmd.Stdout = &PrefixedWriter{prefix: prefix, w: os.Stdout}
cmd.Stderr = io.MultiWriter(cmd.Stderr, &PrefixedWriter{prefix: prefix, w: os.Stderr})
}
if err := cmd.Start(); err != nil {
t.Fatalf("starting tailscaled: %v", err)

View File

@@ -1632,6 +1632,7 @@ func (c *Conn) receiveIPv4(b []byte) (n int, ep conn.Endpoint, err error) {
// ok is whether this read should be reported up to wireguard-go (our
// caller).
func (c *Conn) receiveIP(b []byte, ipp netaddr.IPPort, cache *ippEndpointCache) (ep conn.Endpoint, ok bool) {
log.Println("MS : Received IP")
if stun.Is(b) {
c.stunReceiveFunc.Load().(func([]byte, netaddr.IPPort))(b, ipp)
return nil, false
@@ -1639,6 +1640,7 @@ func (c *Conn) receiveIP(b []byte, ipp netaddr.IPPort, cache *ippEndpointCache)
if c.handleDiscoMessage(b, ipp) {
return nil, false
}
log.Println("MS : ReceivedIP called HandleDiscoMessage")
if !c.havePrivateKey.Get() {
// If we have no private key, we're logged out or
// stopped. Don't try to pass these wireguard packets
@@ -1685,6 +1687,7 @@ func (c *connBind) receiveDERP(b []byte) (n int, ep conn.Endpoint, err error) {
}
func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep conn.Endpoint) {
log.Println("MS : Process DERP READ RESULT")
if dm.copyBuf == nil {
return 0, nil
}
@@ -1699,8 +1702,10 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep con
ipp := netaddr.IPPortFrom(derpMagicIPAddr, uint16(regionID))
if c.handleDiscoMessage(b[:n], ipp) {
log.Println("MS : c.HandleDiscoMessage")
return 0, nil
}
log.Println("MS : ProcessDerp HandleDiscoMessage")
var (
didNoteRecvActivity bool
@@ -1764,6 +1769,7 @@ func (c *Conn) processDERPReadResult(dm derpReadResult, b []byte) (n int, ep con
)
func (c *Conn) sendDiscoMessage(dst netaddr.IPPort, dstKey tailcfg.NodeKey, dstDisco tailcfg.DiscoKey, m disco.Message, logLevel discoLogLevel) (sent bool, err error) {
log.Println("MS: SENDDISCOMESSAGE")
c.mu.Lock()
if c.closed {
c.mu.Unlock()
@@ -1936,6 +1942,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) (isDiscoMsg bo
switch dm := dm.(type) {
case *disco.Ping:
log.Println("Pinger", de)
c.handlePingLocked(dm, de, src, sender, peerNode)
case *disco.Pong:
log.Println("Ponger", de)
@@ -1957,6 +1964,7 @@ func (c *Conn) handleDiscoMessage(msg []byte, src netaddr.IPPort) (isDiscoMsg bo
go de.handleCallMeMaybe(dm)
}
}
log.Println("DISCO MESSAGE COMPLETE")
return
}
@@ -2655,6 +2663,7 @@ func (c *Conn) listenPacket(network string, host netaddr.IP, port uint16) (net.P
// If curPortFate is set to dropCurrentPort, no attempt is made to reuse
// the current port.
func (c *Conn) bindSocket(rucPtr **RebindingUDPConn, network string, curPortFate currentPortFate) error {
log.Println("MS: BINDSOCKET")
var host netaddr.IP
if inTest() && !c.simulatedNetwork {
switch network {
@@ -2842,6 +2851,7 @@ func (c *RebindingUDPConn) currentConn() net.PacketConn {
// ReadFrom reads a packet from c into b.
// It returns the number of bytes copied and the source address.
func (c *RebindingUDPConn) ReadFrom(b []byte) (int, net.Addr, error) {
log.Println("MS : ReadFrom ", c.pconn.LocalAddr())
for {
pconn := c.currentConn()
n, addr, err := pconn.ReadFrom(b)
@@ -2927,6 +2937,9 @@ func (c *RebindingUDPConn) closeLocked() error {
func (c *RebindingUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
for {
log.Printf("MS : WRITETO %v", c.pconn.LocalAddr())
log.Println(string(b))
log.Println(b)
c.mu.Lock()
pconn := c.pconn
c.mu.Unlock()
@@ -2941,6 +2954,7 @@ func (c *RebindingUDPConn) WriteTo(b []byte, addr net.Addr) (int, error) {
continue
}
}
log.Println("MS : WRITETO complete ", n, err)
return n, err
}
}
@@ -2968,6 +2982,7 @@ func (c *blockForeverConn) ReadFrom(p []byte) (n int, addr net.Addr, err error)
}
func (c *blockForeverConn) WriteTo(p []byte, addr net.Addr) (n int, err error) {
log.Println("DROP WRITE")
// Silently drop writes.
return len(p), nil
}

View File

@@ -399,6 +399,7 @@ func (ns *Impl) DialContextUDP(ctx context.Context, addr string) (*gonet.UDPConn
}
func (ns *Impl) injectOutbound() {
log.Println("NETSTACK Inject outbound")
for {
packetInfo, ok := ns.linkEP.ReadContext(context.Background())
if !ok {
@@ -431,6 +432,7 @@ func (ns *Impl) isLocalIP(ip netaddr.IP) bool {
}
func (ns *Impl) injectInbound(p *packet.Parsed, t *tstun.Wrapper) filter.Response {
log.Println("NETSTACK: injectInbound")
if ns.onlySubnets && ns.isLocalIP(p.Dst.IP()) {
// In hybrid ("only subnets") mode, bail out early if
// the traffic is destined for an actual Tailscale