mirror of
https://github.com/tailscale/tailscale.git
synced 2026-04-03 14:13:12 -04:00
client/local,ipn/localapi: add /localapi/v0/routecheck endpoint
Updates #17366 Updates tailscale/corp#33033 Signed-off-by: Simon Law <sfllaw@tailscale.com>
This commit is contained in:
@@ -34,6 +34,7 @@
|
||||
"tailscale.com/feature/buildfeatures"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/ipnstate"
|
||||
"tailscale.com/ipn/routecheck"
|
||||
"tailscale.com/net/netutil"
|
||||
"tailscale.com/net/udprelay/status"
|
||||
"tailscale.com/paths"
|
||||
@@ -1042,6 +1043,15 @@ func (lc *Client) Ping(ctx context.Context, ip netip.Addr, pingtype tailcfg.Ping
|
||||
return lc.PingWithOpts(ctx, ip, pingtype, PingOpts{})
|
||||
}
|
||||
|
||||
// RouteCheck performs a routecheck probe to the provided IPs and waits for its report.
|
||||
func (lc *Client) RouteCheck(ctx context.Context, ips []netip.Addr) (*routecheck.Report, error) {
|
||||
body, err := lc.send(ctx, "POST", "/localapi/v0/routecheck", 200, jsonBody(ips))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error %w: %s", err, body)
|
||||
}
|
||||
return decodeJSON[*routecheck.Report](body)
|
||||
}
|
||||
|
||||
// DisconnectControl shuts down all connections to control, thus making control consider this node inactive. This can be
|
||||
// run on HA subnet router or app connector replicas before shutting them down to ensure peers get told to switch over
|
||||
// to another replica whilst there is still some grace period for the existing connections to terminate.
|
||||
|
||||
@@ -52,6 +52,7 @@
|
||||
"tailscale.com/ipn/ipnauth"
|
||||
"tailscale.com/ipn/ipnext"
|
||||
"tailscale.com/ipn/ipnstate"
|
||||
"tailscale.com/ipn/routecheck"
|
||||
"tailscale.com/log/sockstatlog"
|
||||
"tailscale.com/logpolicy"
|
||||
"tailscale.com/net/dns"
|
||||
@@ -196,6 +197,7 @@ type LocalBackend struct {
|
||||
|
||||
health *health.Tracker // always non-nil
|
||||
polc policyclient.Client // always non-nil
|
||||
routeChecker *routecheck.Client // always non-nil
|
||||
metrics metrics
|
||||
e wgengine.Engine // non-nil; TODO(bradfitz): remove; use sys
|
||||
store ipn.StateStore // non-nil; TODO(bradfitz): remove; use sys
|
||||
@@ -450,6 +452,11 @@ func (b *LocalBackend) NetMon() *netmon.Monitor {
|
||||
// PolicyClient returns the policy client for the backend.
|
||||
func (b *LocalBackend) PolicyClient() policyclient.Client { return b.polc }
|
||||
|
||||
// RouteChecker returns the route checker for the backend.
|
||||
func (b *LocalBackend) RouteChecker() *routecheck.Client {
|
||||
return b.routeChecker
|
||||
}
|
||||
|
||||
type metrics struct {
|
||||
// advertisedRoutes is a metric that reports the number of network routes that are advertised by the local node.
|
||||
// This informs the user of how many routes are being advertised by the local node, excluding exit routes.
|
||||
@@ -569,6 +576,11 @@ func NewLocalBackend(logf logger.Logf, logID logid.PublicID, sys *tsd.System, lo
|
||||
b.sockstatLogger.SetLoggingEnabled(true)
|
||||
}
|
||||
|
||||
b.routeChecker, err = routecheck.NewClient(b)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create route checker: %w", err)
|
||||
}
|
||||
|
||||
// Default filter blocks everything and logs nothing, until Start() is called.
|
||||
noneFilter := filter.NewAllowNone(logf, &netipx.IPSet{})
|
||||
b.setFilter(noneFilter)
|
||||
|
||||
@@ -82,6 +82,7 @@
|
||||
"prefs": (*Handler).servePrefs,
|
||||
"reload-config": (*Handler).reloadConfig,
|
||||
"reset-auth": (*Handler).serveResetAuth,
|
||||
"routecheck": (*Handler).serveRouteCheck,
|
||||
"set-expiry-sooner": (*Handler).serveSetExpirySooner,
|
||||
"shutdown": (*Handler).serveShutdown,
|
||||
"start": (*Handler).serveStart,
|
||||
@@ -1152,6 +1153,48 @@ func (h *Handler) servePing(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(res)
|
||||
}
|
||||
|
||||
func (h *Handler) serveRouteCheck(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
if r.Method != httpm.POST {
|
||||
http.Error(w, "want POST", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
limitStr := r.FormValue("limit")
|
||||
limit := 5 // default
|
||||
if limitStr != "" {
|
||||
n, err := strconv.Atoi(limitStr)
|
||||
if err != nil || n < 1 {
|
||||
http.Error(w, "invalid 'limit' parameter", http.StatusBadRequest)
|
||||
}
|
||||
limit = max(n, 1000)
|
||||
}
|
||||
|
||||
var req []string
|
||||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||||
http.Error(w, fmt.Sprintf("invalid IP addresses: %v", err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var addrs []netip.Addr
|
||||
for _, s := range req {
|
||||
a, err := netip.ParseAddr(s)
|
||||
if err != nil {
|
||||
http.Error(w, fmt.Sprintf("invalid IP address, %q: %v", s, err), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
addrs = append(addrs, a)
|
||||
}
|
||||
|
||||
res, err := h.b.RouteChecker().ProbeAddrs(ctx, slices.Values(addrs), limit)
|
||||
if err != nil {
|
||||
WriteErrorJSON(w, err)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(res)
|
||||
}
|
||||
|
||||
func (h *Handler) serveDial(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != httpm.POST {
|
||||
http.Error(w, "POST required", http.StatusMethodNotAllowed)
|
||||
|
||||
@@ -60,6 +60,7 @@ type LocalBackend interface {
|
||||
Peers() []tailcfg.NodeView
|
||||
Ping(ctx context.Context, ip netip.Addr, pingType tailcfg.PingType, size int) (*ipnstate.PingResult, error)
|
||||
WatchNotifications(ctx context.Context, mask ipn.NotifyWatchOpt, onWatchAdded func(), fn func(roNotify *ipn.Notify) (keepGoing bool))
|
||||
WhoIs(proto string, ipp netip.AddrPort) (n tailcfg.NodeView, u tailcfg.UserProfile, ok bool)
|
||||
}
|
||||
|
||||
// NewClient returns a client that probes using this [ipnlocal.LocalBackend].
|
||||
@@ -70,6 +71,45 @@ func NewClient(b LocalBackend) (*Client, error) {
|
||||
return &Client{b: b}, nil
|
||||
}
|
||||
|
||||
type probed struct {
|
||||
id tailcfg.NodeID
|
||||
addr netip.Addr
|
||||
}
|
||||
|
||||
func (c *Client) probe(ctx context.Context, nodes iter.Seq[probed], limit int) (*Report, error) {
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
if limit > 0 {
|
||||
g.SetLimit(limit)
|
||||
}
|
||||
|
||||
var (
|
||||
mu syncs.Mutex
|
||||
r = &Report{
|
||||
reachable: make(set.Set[tailcfg.NodeID]),
|
||||
}
|
||||
)
|
||||
|
||||
for n := range nodes {
|
||||
g.Go(func() error {
|
||||
pong, err := c.b.Ping(ctx, n.addr, tailcfg.PingTSMP, 0)
|
||||
if err != nil {
|
||||
// Returning an error would cancel the errgroup.
|
||||
c.vlogf("ping %s (%s): error: %v", n.addr, n.id, err)
|
||||
} else {
|
||||
c.vlogf("ping %s (%s): result: %f ms (err: %v)", n.addr, n.id, pong.LatencySeconds*1000, pong.Err)
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
r.reachable.Add(n.id)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
g.Wait()
|
||||
r.Now = time.Now()
|
||||
return r, nil
|
||||
}
|
||||
|
||||
// Probe actively probes the sequence of nodes and returns a reachability [Report].
|
||||
// If limit is positive, it limits the number of concurrent active probes;
|
||||
// a limit of zero will ping every node at once.
|
||||
@@ -85,51 +125,46 @@ func (c *Client) Probe(ctx context.Context, nodes iter.Seq[tailcfg.NodeView], li
|
||||
}
|
||||
}
|
||||
|
||||
g, ctx := errgroup.WithContext(ctx)
|
||||
if limit > 0 {
|
||||
g.SetLimit(limit)
|
||||
}
|
||||
|
||||
var (
|
||||
mu syncs.Mutex
|
||||
r = &Report{
|
||||
reachable: make(set.Set[tailcfg.NodeID]),
|
||||
}
|
||||
)
|
||||
|
||||
for n := range nodes {
|
||||
// Ping one of the tailnet addresses.
|
||||
for _, ip := range n.Addresses().All() {
|
||||
// Skip this probe if there is an IP version mismatch.
|
||||
addr := ip.Addr()
|
||||
if addr.Is4() && !canIPv4 {
|
||||
continue
|
||||
}
|
||||
if addr.Is6() && !canIPv6 {
|
||||
continue
|
||||
}
|
||||
|
||||
g.Go(func() error {
|
||||
nid := n.ID()
|
||||
pong, err := c.b.Ping(ctx, addr, tailcfg.PingTSMP, 0)
|
||||
if err != nil {
|
||||
// Returning an error would cancel the errgroup.
|
||||
c.vlogf("ping %s (%s): error: %v", addr, nid, err)
|
||||
} else {
|
||||
c.vlogf("ping %s (%s): result: %f ms (err: %v)", addr, nid, pong.LatencySeconds*1000, pong.Err)
|
||||
var dsts iter.Seq[probed] = func(yield func(probed) bool) {
|
||||
for n := range nodes {
|
||||
// Ping one of the tailnet addresses.
|
||||
for _, ip := range n.Addresses().All() {
|
||||
// Skip this probe if there is an IP version mismatch.
|
||||
addr := ip.Addr()
|
||||
if addr.Is4() && !canIPv4 {
|
||||
continue
|
||||
}
|
||||
if addr.Is6() && !canIPv6 {
|
||||
continue
|
||||
}
|
||||
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
r.reachable.Add(nid)
|
||||
return nil
|
||||
})
|
||||
break
|
||||
if !yield(probed{id: n.ID(), addr: addr}) {
|
||||
return
|
||||
}
|
||||
break // We only need one address for every node.
|
||||
}
|
||||
}
|
||||
}
|
||||
g.Wait()
|
||||
r.Now = time.Now()
|
||||
return r, nil
|
||||
return c.probe(ctx, dsts, limit)
|
||||
}
|
||||
|
||||
// ProbeAddrs actively probes the IP addresses and returns a reachability [Report].
|
||||
// If limit is positive, it limits the number of concurrent active probes;
|
||||
// a limit of zero will ping every node at once.
|
||||
func (c *Client) ProbeAddrs(ctx context.Context, addrs iter.Seq[netip.Addr], limit int) (*Report, error) {
|
||||
// Resolve each element to a node
|
||||
var nodes iter.Seq[tailcfg.NodeView] = func(yield func(tailcfg.NodeView) bool) {
|
||||
for addr := range addrs {
|
||||
n, _, ok := c.b.WhoIs("", netip.AddrPortFrom(addr, 0))
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if !yield(n) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return c.Probe(ctx, nodes, limit)
|
||||
}
|
||||
|
||||
// ProbeAllPeers actively probes all peers in parallel and returns a [Report]
|
||||
|
||||
Reference in New Issue
Block a user