diff --git a/cmd/containerboot/kube.go b/cmd/containerboot/kube.go index 4943bddba..73f5819b4 100644 --- a/cmd/containerboot/kube.go +++ b/cmd/containerboot/kube.go @@ -14,9 +14,12 @@ "net/http" "net/netip" "os" + "path/filepath" "strings" "time" + "github.com/fsnotify/fsnotify" + "tailscale.com/client/local" "tailscale.com/ipn" "tailscale.com/kube/egressservices" "tailscale.com/kube/ingressservices" @@ -26,9 +29,11 @@ "tailscale.com/tailcfg" "tailscale.com/types/logger" "tailscale.com/util/backoff" - "tailscale.com/util/set" ) +const fieldManager = "tailscale-container" +const kubeletMountedConfigLn = "..data" + // kubeClient is a wrapper around Tailscale's internal kube client that knows how to talk to the kube API server. We use // this rather than any of the upstream Kubernetes client libaries to avoid extra imports. type kubeClient struct { @@ -46,7 +51,7 @@ func newKubeClient(root string, stateSecret string) (*kubeClient, error) { var err error kc, err := kubeclient.New("tailscale-container") if err != nil { - return nil, fmt.Errorf("Error creating kube client: %w", err) + return nil, fmt.Errorf("error creating kube client: %w", err) } if (root != "/") || os.Getenv("TS_KUBERNETES_READ_API_SERVER_ADDRESS_FROM_ENV") == "true" { // Derive the API server address from the environment variables @@ -63,7 +68,7 @@ func (kc *kubeClient) storeDeviceID(ctx context.Context, deviceID tailcfg.Stable kubetypes.KeyDeviceID: []byte(deviceID), }, } - return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, "tailscale-container") + return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, fieldManager) } // storeDeviceEndpoints writes device's tailnet IPs and MagicDNS name to fields 'device_ips', 'device_fqdn' of client's @@ -84,7 +89,7 @@ func (kc *kubeClient) storeDeviceEndpoints(ctx context.Context, fqdn string, add kubetypes.KeyDeviceIPs: deviceIPs, }, } - return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, "tailscale-container") + return 
kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, fieldManager) } // storeHTTPSEndpoint writes an HTTPS endpoint exposed by this device via 'tailscale serve' to the client's state @@ -96,7 +101,7 @@ func (kc *kubeClient) storeHTTPSEndpoint(ctx context.Context, ep string) error { kubetypes.KeyHTTPSEndpoint: []byte(ep), }, } - return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, "tailscale-container") + return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, fieldManager) } // deleteAuthKey deletes the 'authkey' field of the given kube @@ -122,7 +127,7 @@ func (kc *kubeClient) deleteAuthKey(ctx context.Context) error { // resetContainerbootState resets state from previous runs of containerboot to // ensure the operator doesn't use stale state when a Pod is first recreated. -func (kc *kubeClient) resetContainerbootState(ctx context.Context, podUID string) error { +func (kc *kubeClient) resetContainerbootState(ctx context.Context, podUID string, tailscaledConfigAuthkey string) error { existingSecret, err := kc.GetSecret(ctx, kc.stateSecret) switch { case kubeclient.IsNotFoundErr(err): @@ -131,32 +136,135 @@ func (kc *kubeClient) resetContainerbootState(ctx context.Context, podUID string case err != nil: return fmt.Errorf("failed to read state Secret %q to reset state: %w", kc.stateSecret, err) } + s := &kubeapi.Secret{ Data: map[string][]byte{ kubetypes.KeyCapVer: fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion), + + // TODO(tomhjp): Perhaps shouldn't clear device ID and use a different signal, as this could leak tailnet devices. 
+ kubetypes.KeyDeviceID: nil, + kubetypes.KeyDeviceFQDN: nil, + kubetypes.KeyDeviceIPs: nil, + kubetypes.KeyHTTPSEndpoint: nil, + egressservices.KeyEgressServices: nil, + ingressservices.IngressConfigKey: nil, }, } if podUID != "" { s.Data[kubetypes.KeyPodUID] = []byte(podUID) } - toClear := set.SetOf([]string{ - kubetypes.KeyDeviceID, - kubetypes.KeyDeviceFQDN, - kubetypes.KeyDeviceIPs, - kubetypes.KeyHTTPSEndpoint, - egressservices.KeyEgressServices, - ingressservices.IngressConfigKey, - }) - for key := range existingSecret.Data { - if toClear.Contains(key) { - // It's fine to leave the key in place as a debugging breadcrumb, - // it should get a new value soon. - s.Data[key] = nil - } + // Only clear reissue_authkey if the operator has actioned it. + brokenAuthkey, ok := existingSecret.Data[kubetypes.KeyReissueAuthkey] + if ok && tailscaledConfigAuthkey != "" && string(brokenAuthkey) != tailscaledConfigAuthkey { + s.Data[kubetypes.KeyReissueAuthkey] = nil } - return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, "tailscale-container") + return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, fieldManager) +} + +func (kc *kubeClient) setAndWaitForAuthKeyReissue(ctx context.Context, client *local.Client, cfg *settings, tailscaledConfigAuthKey string) error { + err := client.DisconnectControl(ctx) + if err != nil { + return fmt.Errorf("error disconnecting from control: %w", err) + } + + err = kc.setReissueAuthKey(ctx, tailscaledConfigAuthKey) + if err != nil { + return fmt.Errorf("failed to set reissue_authkey in Kubernetes Secret: %w", err) + } + + err = kc.waitForAuthKeyReissue(ctx, cfg.TailscaledConfigFilePath, tailscaledConfigAuthKey, 10*time.Minute) + if err != nil { + return fmt.Errorf("failed to receive new auth key: %w", err) + } + + return nil +} + +func (kc *kubeClient) setReissueAuthKey(ctx context.Context, authKey string) error { + s := &kubeapi.Secret{ + Data: map[string][]byte{ + kubetypes.KeyReissueAuthkey: []byte(authKey), + }, + } + + 
log.Printf("Requesting a new auth key from operator") + return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, fieldManager) +} + +func (kc *kubeClient) waitForAuthKeyReissue(ctx context.Context, configPath string, oldAuthKey string, maxWait time.Duration) error { + log.Printf("Waiting for operator to provide new auth key (max wait: %v)", maxWait) + + ctx, cancel := context.WithTimeout(ctx, maxWait) + defer cancel() + + tailscaledCfgDir := filepath.Dir(configPath) + toWatch := filepath.Join(tailscaledCfgDir, kubeletMountedConfigLn) + + var ( + pollTicker <-chan time.Time + eventChan <-chan fsnotify.Event + ) + + pollInterval := 5 * time.Second + + // Try to use fsnotify for faster notification + if w, err := fsnotify.NewWatcher(); err != nil { + log.Printf("auth key reissue: fsnotify unavailable, using polling: %v", err) + } else if err := w.Add(tailscaledCfgDir); err != nil { + w.Close() + log.Printf("auth key reissue: fsnotify watch failed, using polling: %v", err) + } else { + defer w.Close() + log.Printf("auth key reissue: watching for config changes via fsnotify") + eventChan = w.Events + } + + // still keep polling if using fsnotify, for logging and in case fsnotify fails + pt := time.NewTicker(pollInterval) + defer pt.Stop() + pollTicker = pt.C + + start := time.Now() + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for auth key reissue after %v", maxWait) + case <-pollTicker: // Waits for polling tick, continues when received + case event := <-eventChan: + if event.Name != toWatch { + continue + } + } + + newAuthKey := authkeyFromTailscaledConfig(configPath) + if newAuthKey != "" && newAuthKey != oldAuthKey { + log.Printf("New auth key received from operator after %v", time.Since(start).Round(time.Second)) + + if err := kc.clearReissueAuthKeyRequest(ctx); err != nil { + log.Printf("Warning: failed to clear reissue request: %v", err) + } + + return nil + } + + if eventChan == nil && pollTicker != nil { + log.Printf("Waiting 
for new auth key from operator (%v elapsed)", time.Since(start).Round(time.Second)) + } + } +} + +// clearReissueAuthKeyRequest removes the reissue_authkey marker from the Secret +// to signal to the operator that we've successfully received the new key. +func (kc *kubeClient) clearReissueAuthKeyRequest(ctx context.Context) error { + s := &kubeapi.Secret{ + Data: map[string][]byte{ + kubetypes.KeyReissueAuthkey: nil, + }, + } + return kc.StrategicMergePatchSecret(ctx, kc.stateSecret, s, fieldManager) } // waitForConsistentState waits for tailscaled to finish writing state if it diff --git a/cmd/containerboot/kube_test.go b/cmd/containerboot/kube_test.go index bc80e9cdf..6acaa60e1 100644 --- a/cmd/containerboot/kube_test.go +++ b/cmd/containerboot/kube_test.go @@ -248,25 +248,42 @@ func TestResetContainerbootState(t *testing.T) { capver := fmt.Appendf(nil, "%d", tailcfg.CurrentCapabilityVersion) for name, tc := range map[string]struct { podUID string + authkey string initial map[string][]byte expected map[string][]byte }{ "empty_initial": { podUID: "1234", + authkey: "new-authkey", initial: map[string][]byte{}, expected: map[string][]byte{ kubetypes.KeyCapVer: capver, kubetypes.KeyPodUID: []byte("1234"), + // Cleared keys. + kubetypes.KeyDeviceID: nil, + kubetypes.KeyDeviceFQDN: nil, + kubetypes.KeyDeviceIPs: nil, + kubetypes.KeyHTTPSEndpoint: nil, + egressservices.KeyEgressServices: nil, + ingressservices.IngressConfigKey: nil, }, }, "empty_initial_no_pod_uid": { initial: map[string][]byte{}, expected: map[string][]byte{ kubetypes.KeyCapVer: capver, + // Cleared keys. 
+ kubetypes.KeyDeviceID: nil, + kubetypes.KeyDeviceFQDN: nil, + kubetypes.KeyDeviceIPs: nil, + kubetypes.KeyHTTPSEndpoint: nil, + egressservices.KeyEgressServices: nil, + ingressservices.IngressConfigKey: nil, }, }, "only_relevant_keys_updated": { - podUID: "1234", + podUID: "1234", + authkey: "new-authkey", initial: map[string][]byte{ kubetypes.KeyCapVer: []byte("1"), kubetypes.KeyPodUID: []byte("5678"), @@ -295,6 +312,57 @@ func TestResetContainerbootState(t *testing.T) { // Tailscaled keys not included in patch. }, }, + "new_authkey_issued": { + initial: map[string][]byte{ + kubetypes.KeyReissueAuthkey: []byte("old-authkey"), + }, + authkey: "new-authkey", + expected: map[string][]byte{ + kubetypes.KeyCapVer: capver, + kubetypes.KeyReissueAuthkey: nil, + // Cleared keys. + kubetypes.KeyDeviceID: nil, + kubetypes.KeyDeviceFQDN: nil, + kubetypes.KeyDeviceIPs: nil, + kubetypes.KeyHTTPSEndpoint: nil, + egressservices.KeyEgressServices: nil, + ingressservices.IngressConfigKey: nil, + }, + }, + "authkey_not_yet_updated": { + initial: map[string][]byte{ + kubetypes.KeyReissueAuthkey: []byte("old-authkey"), + }, + authkey: "old-authkey", + expected: map[string][]byte{ + kubetypes.KeyCapVer: capver, + // reissue_authkey not cleared. + // Cleared keys. + kubetypes.KeyDeviceID: nil, + kubetypes.KeyDeviceFQDN: nil, + kubetypes.KeyDeviceIPs: nil, + kubetypes.KeyHTTPSEndpoint: nil, + egressservices.KeyEgressServices: nil, + ingressservices.IngressConfigKey: nil, + }, + }, + "authkey_deleted_from_config": { + initial: map[string][]byte{ + kubetypes.KeyReissueAuthkey: []byte("old-authkey"), + }, + authkey: "", + expected: map[string][]byte{ + kubetypes.KeyCapVer: capver, + // reissue_authkey not cleared. + // Cleared keys. 
+ kubetypes.KeyDeviceID: nil, + kubetypes.KeyDeviceFQDN: nil, + kubetypes.KeyDeviceIPs: nil, + kubetypes.KeyHTTPSEndpoint: nil, + egressservices.KeyEgressServices: nil, + ingressservices.IngressConfigKey: nil, + }, + }, } { t.Run(name, func(t *testing.T) { var actual map[string][]byte @@ -309,7 +377,7 @@ func TestResetContainerbootState(t *testing.T) { return nil }, }} - if err := kc.resetContainerbootState(context.Background(), tc.podUID); err != nil { + if err := kc.resetContainerbootState(context.Background(), tc.podUID, tc.authkey); err != nil { t.Fatalf("resetContainerbootState() error = %v", err) } if diff := cmp.Diff(tc.expected, actual); diff != "" { diff --git a/cmd/containerboot/main.go b/cmd/containerboot/main.go index ba47111fd..76c6e910a 100644 --- a/cmd/containerboot/main.go +++ b/cmd/containerboot/main.go @@ -137,7 +137,9 @@ "golang.org/x/sys/unix" "tailscale.com/client/tailscale" + "tailscale.com/health" "tailscale.com/ipn" + "tailscale.com/ipn/conffile" kubeutils "tailscale.com/k8s-operator" healthz "tailscale.com/kube/health" "tailscale.com/kube/kubetypes" @@ -206,6 +208,11 @@ func run() error { bootCtx, cancel := context.WithTimeout(ctx, 60*time.Second) defer cancel() + var tailscaledConfigAuthkey string + if isOneStepConfig(cfg) { + tailscaledConfigAuthkey = authkeyFromTailscaledConfig(cfg.TailscaledConfigFilePath) + } + var kc *kubeClient if cfg.KubeSecret != "" { kc, err = newKubeClient(cfg.Root, cfg.KubeSecret) @@ -219,7 +226,7 @@ func run() error { // hasKubeStateStore because although we know we're in kube, that // doesn't guarantee the state store is properly configured. 
if hasKubeStateStore(cfg) { - if err := kc.resetContainerbootState(bootCtx, cfg.PodUID); err != nil { + if err := kc.resetContainerbootState(bootCtx, cfg.PodUID, tailscaledConfigAuthkey); err != nil { return fmt.Errorf("error clearing previous state from Secret: %w", err) } } @@ -299,7 +306,7 @@ func run() error { } } - w, err := client.WatchIPNBus(bootCtx, ipn.NotifyInitialNetMap|ipn.NotifyInitialPrefs|ipn.NotifyInitialState) + w, err := client.WatchIPNBus(bootCtx, ipn.NotifyInitialNetMap|ipn.NotifyInitialPrefs|ipn.NotifyInitialState|ipn.NotifyInitialHealthState) if err != nil { return fmt.Errorf("failed to watch tailscaled for updates: %w", err) } @@ -365,8 +372,23 @@ func run() error { if isOneStepConfig(cfg) { // This could happen if this is the first time tailscaled was run for this // device and the auth key was not passed via the configfile. - return fmt.Errorf("invalid state: tailscaled daemon started with a config file, but tailscale is not logged in: ensure you pass a valid auth key in the config file.") + if hasKubeStateStore(cfg) { + log.Printf("Auth key missing or invalid (NeedsLogin state), disconnecting from control and requesting new key from operator") + + err := kc.setAndWaitForAuthKeyReissue(bootCtx, client, cfg, tailscaledConfigAuthkey) + if err != nil { + return fmt.Errorf("failed to get a reissued authkey: %w", err) + } + + log.Printf("Successfully received new auth key, restarting to apply configuration") + + // we don't return an error here since we have handled the reissue gracefully. 
+ return nil + } + + return errors.New("invalid state: tailscaled daemon started with a config file, but tailscale is not logged in: ensure you pass a valid auth key in the config file") } + if err := authTailscale(); err != nil { return fmt.Errorf("failed to auth tailscale: %w", err) } @@ -384,6 +406,27 @@ func run() error { log.Printf("tailscaled in state %q, waiting", *n.State) } } + + if n.Health != nil { + // This can happen if the config has an auth key but it's invalid, + // for example if it was single-use and already got used, but the + // device state was lost. + if _, ok := n.Health.Warnings[health.LoginStateWarnable.Code]; ok { + if isOneStepConfig(cfg) && hasKubeStateStore(cfg) { + log.Printf("Auth key failed to authenticate (may be expired or single-use), disconnecting from control and requesting new key from operator") + + err := kc.setAndWaitForAuthKeyReissue(bootCtx, client, cfg, tailscaledConfigAuthkey) + if err != nil { + return fmt.Errorf("failed to get a reissued authkey: %w", err) + } + + // we don't return an error here since we have handled the reissue gracefully. + log.Printf("Successfully received new auth key, restarting to apply configuration") + + return nil + } + } + } } w.Close() @@ -409,9 +452,9 @@ func run() error { // We were told to only auth once, so any secret-bound // authkey is no longer needed. We don't strictly need to // wipe it, but it's good hygiene. - log.Printf("Deleting authkey from kube secret") + log.Printf("Deleting authkey from Kubernetes Secret") if err := kc.deleteAuthKey(ctx); err != nil { - return fmt.Errorf("deleting authkey from kube secret: %w", err) + return fmt.Errorf("deleting authkey from Kubernetes Secret: %w", err) } } @@ -422,8 +465,10 @@ func run() error { // If tailscaled config was read from a mounted file, watch the file for updates and reload. 
cfgWatchErrChan := make(chan error) + cfgWatchCtx, cfgWatchCancel := context.WithCancel(ctx) + defer cfgWatchCancel() if cfg.TailscaledConfigFilePath != "" { - go watchTailscaledConfigChanges(ctx, cfg.TailscaledConfigFilePath, client, cfgWatchErrChan) + go watchTailscaledConfigChanges(cfgWatchCtx, cfg.TailscaledConfigFilePath, client, cfgWatchErrChan) } var ( @@ -523,6 +568,7 @@ func run() error { case err := <-cfgWatchErrChan: return fmt.Errorf("failed to watch tailscaled config: %w", err) case n := <-notifyChan: + // TODO: (ChaosInTheCRD) Add node removed check when supported by ipn if n.State != nil && *n.State != ipn.Running { // Something's gone wrong and we've left the authenticated state. // Our container image never recovered gracefully from this, and the @@ -979,3 +1025,11 @@ func serviceIPsFromNetMap(nm *netmap.NetworkMap, fqdn dnsname.FQDN) []netip.Pref return prefixes } + +func authkeyFromTailscaledConfig(path string) string { + if cfg, err := conffile.Load(path); err == nil && cfg.Parsed.AuthKey != nil { + return *cfg.Parsed.AuthKey + } + + return "" +} diff --git a/cmd/containerboot/main_test.go b/cmd/containerboot/main_test.go index 365cf2184..5ea402f66 100644 --- a/cmd/containerboot/main_test.go +++ b/cmd/containerboot/main_test.go @@ -32,6 +32,7 @@ "github.com/google/go-cmp/cmp" "golang.org/x/sys/unix" + "tailscale.com/health" "tailscale.com/ipn" "tailscale.com/kube/egressservices" "tailscale.com/kube/kubeclient" @@ -41,6 +42,8 @@ "tailscale.com/types/netmap" ) +const configFileAuthKey = "some-auth-key" + func TestContainerBoot(t *testing.T) { boot := filepath.Join(t.TempDir(), "containerboot") if err := exec.Command("go", "build", "-ldflags", "-X main.testSleepDuration=1ms", "-o", boot, "tailscale.com/cmd/containerboot").Run(); err != nil { @@ -77,6 +80,10 @@ type phase struct { // phase (simulates our fake tailscaled doing it). 
UpdateKubeSecret map[string]string + // Update files with these paths/contents at the beginning of the phase + // (simulates the operator updating mounted config files). + UpdateFiles map[string]string + // WantFiles files that should exist in the container and their // contents. WantFiles map[string]string @@ -781,6 +788,127 @@ type testCase struct { }, } }, + "sets_reissue_authkey_if_needs_login": func(env *testEnv) testCase { + newAuthKey := "new-reissued-auth-key" + return testCase{ + Env: map[string]string{ + "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR": filepath.Join(env.d, "etc/tailscaled/"), + "KUBERNETES_SERVICE_HOST": env.kube.Host, + "KUBERNETES_SERVICE_PORT_HTTPS": env.kube.Port, + }, + Phases: []phase{ + { + UpdateFiles: map[string]string{ + "etc/tailscaled/..data": "", + }, + WantCmds: []string{ + "/usr/bin/tailscaled --socket=/tmp/tailscaled.sock --state=kube:tailscale --statedir=/tmp --tun=userspace-networking --config=/etc/tailscaled/cap-95.hujson", + }, + WantKubeSecret: map[string]string{ + kubetypes.KeyCapVer: capver, + }, + }, { + Notify: &ipn.Notify{ + State: new(ipn.NeedsLogin), + }, + WantKubeSecret: map[string]string{ + kubetypes.KeyCapVer: capver, + kubetypes.KeyReissueAuthkey: configFileAuthKey, + }, + WantLog: "watching for config changes via fsnotify", + }, { + UpdateFiles: map[string]string{ + "etc/tailscaled/cap-95.hujson": fmt.Sprintf(`{"Version":"alpha0","AuthKey":"%s"}`, newAuthKey), + "etc/tailscaled/..data": "updated", + }, + WantKubeSecret: map[string]string{ + kubetypes.KeyCapVer: capver, + }, + WantExitCode: new(0), + WantLog: "Successfully received new auth key, restarting to apply configuration", + }, + }, + } + }, + "sets_reissue_authkey_if_auth_fails": func(env *testEnv) testCase { + newAuthKey := "new-reissued-auth-key" + return testCase{ + Env: map[string]string{ + "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR": filepath.Join(env.d, "etc/tailscaled/"), + "KUBERNETES_SERVICE_HOST": env.kube.Host, + "KUBERNETES_SERVICE_PORT_HTTPS": 
env.kube.Port, + }, + Phases: []phase{ + { + UpdateFiles: map[string]string{ + "etc/tailscaled/..data": "", + }, + WantCmds: []string{ + "/usr/bin/tailscaled --socket=/tmp/tailscaled.sock --state=kube:tailscale --statedir=/tmp --tun=userspace-networking --config=/etc/tailscaled/cap-95.hujson", + }, + WantKubeSecret: map[string]string{ + kubetypes.KeyCapVer: capver, + }, + }, { + Notify: &ipn.Notify{ + Health: &health.State{ + Warnings: map[health.WarnableCode]health.UnhealthyState{ + health.LoginStateWarnable.Code: {}, + }, + }, + }, + WantKubeSecret: map[string]string{ + kubetypes.KeyCapVer: capver, + kubetypes.KeyReissueAuthkey: configFileAuthKey, + }, + WantLog: "watching for config changes via fsnotify", + }, { + UpdateFiles: map[string]string{ + "etc/tailscaled/cap-95.hujson": fmt.Sprintf(`{"Version":"alpha0","AuthKey":"%s"}`, newAuthKey), + "etc/tailscaled/..data": "updated", + }, + WantKubeSecret: map[string]string{ + kubetypes.KeyCapVer: capver, + }, + WantExitCode: new(0), + WantLog: "Successfully received new auth key, restarting to apply configuration", + }, + }, + } + }, + "clears_reissue_authkey_on_change": func(env *testEnv) testCase { + return testCase{ + Env: map[string]string{ + "TS_EXPERIMENTAL_VERSIONED_CONFIG_DIR": filepath.Join(env.d, "etc/tailscaled/"), + "KUBERNETES_SERVICE_HOST": env.kube.Host, + "KUBERNETES_SERVICE_PORT_HTTPS": env.kube.Port, + }, + KubeSecret: map[string]string{ + kubetypes.KeyReissueAuthkey: "some-older-authkey", + "foo": "bar", // Check not everything is cleared. 
+ }, + Phases: []phase{ + { + WantCmds: []string{ + "/usr/bin/tailscaled --socket=/tmp/tailscaled.sock --state=kube:tailscale --statedir=/tmp --tun=userspace-networking --config=/etc/tailscaled/cap-95.hujson", + }, + WantKubeSecret: map[string]string{ + kubetypes.KeyCapVer: capver, + "foo": "bar", + }, + }, { + Notify: runningNotify, + WantKubeSecret: map[string]string{ + kubetypes.KeyCapVer: capver, + "foo": "bar", + kubetypes.KeyDeviceFQDN: "test-node.test.ts.net.", + kubetypes.KeyDeviceID: "myID", + kubetypes.KeyDeviceIPs: `["100.64.0.1"]`, + }, + }, + }, + } + }, "metrics_enabled": func(env *testEnv) testCase { return testCase{ Env: map[string]string{ @@ -1134,19 +1262,22 @@ type testCase struct { for k, v := range p.UpdateKubeSecret { env.kube.SetSecret(k, v) } + for path, content := range p.UpdateFiles { + fullPath := filepath.Join(env.d, path) + if err := os.WriteFile(fullPath, []byte(content), 0700); err != nil { + t.Fatalf("phase %d: updating file %q: %v", i, path, err) + } + // Explicitly update mtime to ensure fsnotify detects the change. + // Without this, file operations can be buffered and fsnotify events may not trigger. + now := time.Now() + if err := os.Chtimes(fullPath, now, now); err != nil { + t.Fatalf("phase %d: updating mtime for %q: %v", i, path, err) + } + } env.lapi.Notify(p.Notify) if p.Signal != nil { cmd.Process.Signal(*p.Signal) } - if p.WantLog != "" { - err := tstest.WaitFor(2*time.Second, func() error { - waitLogLine(t, time.Second, cbOut, p.WantLog) - return nil - }) - if err != nil { - t.Fatal(err) - } - } if p.WantExitCode != nil { state, err := cmd.Process.Wait() @@ -1156,14 +1287,19 @@ type testCase struct { if state.ExitCode() != *p.WantExitCode { t.Fatalf("phase %d: want exit code %d, got %d", i, *p.WantExitCode, state.ExitCode()) } - - // Early test return, we don't expect the successful startup log message. - return } - wantCmds = append(wantCmds, p.WantCmds...) 
- waitArgs(t, 2*time.Second, env.d, env.argFile, strings.Join(wantCmds, "\n")) - err := tstest.WaitFor(2*time.Second, func() error { + if p.WantLog != "" { + err := tstest.WaitFor(5*time.Second, func() error { + waitLogLine(t, 5*time.Second, cbOut, p.WantLog) + return nil + }) + if err != nil { + t.Fatal(err) + } + } + + err := tstest.WaitFor(5*time.Second, func() error { if p.WantKubeSecret != nil { got := env.kube.Secret() if diff := cmp.Diff(got, p.WantKubeSecret); diff != "" { @@ -1180,6 +1316,16 @@ type testCase struct { if err != nil { t.Fatalf("test: %q phase %d: %v", name, i, err) } + + // if we provide a wanted exit code, we expect that the process is finished, + // so should return from the test. + if p.WantExitCode != nil { + return + } + + wantCmds = append(wantCmds, p.WantCmds...) + waitArgs(t, 2*time.Second, env.d, env.argFile, strings.Join(wantCmds, "\n")) + err = tstest.WaitFor(2*time.Second, func() error { for path, want := range p.WantFiles { gotBs, err := os.ReadFile(filepath.Join(env.d, path)) @@ -1393,6 +1539,13 @@ func (lc *localAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) { default: panic(fmt.Sprintf("unsupported method %q", r.Method)) } + // In the localAPI ServeHTTP method + case "/localapi/v0/disconnect-control": + if r.Method != "POST" { + panic(fmt.Sprintf("unsupported method %q", r.Method)) + } + w.WriteHeader(http.StatusOK) + return default: panic(fmt.Sprintf("unsupported path %q", r.URL.Path)) } @@ -1591,7 +1744,11 @@ func (k *kubeServer) serveSecret(w http.ResponseWriter, r *http.Request) { panic(fmt.Sprintf("json decode failed: %v. 
Body:\n\n%s", err, string(bs))) } for key, val := range req.Data { - k.secret[key] = string(val) + if val == nil { + delete(k.secret, key) + } else { + k.secret[key] = string(val) + } } default: panic(fmt.Sprintf("unknown content type %q", r.Header.Get("Content-Type"))) @@ -1659,7 +1816,7 @@ func newTestEnv(t *testing.T) testEnv { kube.Start(t) t.Cleanup(kube.Close) - tailscaledConf := &ipn.ConfigVAlpha{AuthKey: new("foo"), Version: "alpha0"} + tailscaledConf := &ipn.ConfigVAlpha{AuthKey: new(configFileAuthKey), Version: "alpha0"} serveConf := ipn.ServeConfig{TCP: map[uint16]*ipn.TCPPortHandler{80: {HTTP: true}}} serveConfWithServices := ipn.ServeConfig{ TCP: map[uint16]*ipn.TCPPortHandler{80: {HTTP: true}}, diff --git a/cmd/k8s-operator/operator.go b/cmd/k8s-operator/operator.go index ef55d2748..1060c6f3d 100644 --- a/cmd/k8s-operator/operator.go +++ b/cmd/k8s-operator/operator.go @@ -20,6 +20,7 @@ "github.com/go-logr/zapr" "go.uber.org/zap" "go.uber.org/zap/zapcore" + "golang.org/x/time/rate" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" @@ -723,6 +724,8 @@ func runReconcilers(opts reconcilerOpts) { tsFirewallMode: opts.proxyFirewallMode, defaultProxyClass: opts.defaultProxyClass, loginServer: opts.tsServer.ControlURL, + authKeyRateLimits: make(map[string]*rate.Limiter), + authKeyReissuing: make(map[string]bool), }) if err != nil { startlog.Fatalf("could not create ProxyGroup reconciler: %v", err) diff --git a/cmd/k8s-operator/proxygroup.go b/cmd/k8s-operator/proxygroup.go index 538933f14..4d5a795d7 100644 --- a/cmd/k8s-operator/proxygroup.go +++ b/cmd/k8s-operator/proxygroup.go @@ -16,10 +16,12 @@ "sort" "strings" "sync" + "time" dockerref "github.com/distribution/reference" "go.uber.org/zap" xslices "golang.org/x/exp/slices" + "golang.org/x/time/rate" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -94,10 +96,12 @@ type ProxyGroupReconciler struct { defaultProxyClass string 
loginServer string - mu sync.Mutex // protects following - egressProxyGroups set.Slice[types.UID] // for egress proxygroups gauge - ingressProxyGroups set.Slice[types.UID] // for ingress proxygroups gauge - apiServerProxyGroups set.Slice[types.UID] // for kube-apiserver proxygroups gauge + mu sync.Mutex // protects following + egressProxyGroups set.Slice[types.UID] // for egress proxygroups gauge + ingressProxyGroups set.Slice[types.UID] // for ingress proxygroups gauge + apiServerProxyGroups set.Slice[types.UID] // for kube-apiserver proxygroups gauge + authKeyRateLimits map[string]*rate.Limiter // per-ProxyGroup rate limiters for auth key re-issuance. + authKeyReissuing map[string]bool } func (r *ProxyGroupReconciler) logger(name string) *zap.SugaredLogger { @@ -294,7 +298,7 @@ func (r *ProxyGroupReconciler) validate(ctx context.Context, pg *tsapi.ProxyGrou func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, tailscaleClient tsClient, loginUrl string, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (map[string][]netip.AddrPort, *notReadyReason, error) { logger := r.logger(pg.Name) r.mu.Lock() - r.ensureAddedToGaugeForProxyGroup(pg) + r.ensureStateAddedForProxyGroup(pg) r.mu.Unlock() svcToNodePorts := make(map[string]uint16) @@ -629,13 +633,13 @@ func (r *ProxyGroupReconciler) cleanupDanglingResources(ctx context.Context, tai } for _, m := range metadata { - if m.ordinal+1 <= int(pgReplicas(pg)) { + if m.ordinal+1 <= pgReplicas(pg) { continue } // Dangling resource, delete the config + state Secrets, as well as // deleting the device from the tailnet. 
- if err := r.deleteTailnetDevice(ctx, tailscaleClient, m.tsID, logger); err != nil { + if err := r.ensureDeviceDeleted(ctx, tailscaleClient, m.tsID, logger); err != nil { return err } if err := r.Delete(ctx, m.stateSecret); err != nil && !apierrors.IsNotFound(err) { @@ -687,7 +691,7 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, tailscaleClient } for _, m := range metadata { - if err := r.deleteTailnetDevice(ctx, tailscaleClient, m.tsID, logger); err != nil { + if err := r.ensureDeviceDeleted(ctx, tailscaleClient, m.tsID, logger); err != nil { return false, err } } @@ -703,12 +707,12 @@ func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, tailscaleClient logger.Infof("cleaned up ProxyGroup resources") r.mu.Lock() - r.ensureRemovedFromGaugeForProxyGroup(pg) + r.ensureStateRemovedForProxyGroup(pg) r.mu.Unlock() return true, nil } -func (r *ProxyGroupReconciler) deleteTailnetDevice(ctx context.Context, tailscaleClient tsClient, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error { +func (r *ProxyGroupReconciler) ensureDeviceDeleted(ctx context.Context, tailscaleClient tsClient, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error { logger.Debugf("deleting device %s from control", string(id)) if err := tailscaleClient.DeleteDevice(ctx, string(id)); err != nil { if errResp, ok := errors.AsType[tailscale.ErrResponse](err); ok && errResp.Status == http.StatusNotFound { @@ -734,6 +738,7 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated( logger := r.logger(pg.Name) endpoints = make(map[string][]netip.AddrPort, pgReplicas(pg)) // keyed by Service name. 
for i := range pgReplicas(pg) { + logger = logger.With("Pod", fmt.Sprintf("%s-%d", pg.Name, i)) cfgSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: pgConfigSecretName(pg.Name, i), @@ -751,38 +756,9 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated( return nil, err } - var authKey *string - if existingCfgSecret == nil { - logger.Debugf("Creating authkey for new ProxyGroup proxy") - tags := pg.Spec.Tags.Stringify() - if len(tags) == 0 { - tags = r.defaultTags - } - key, err := newAuthKey(ctx, tailscaleClient, tags) - if err != nil { - return nil, err - } - authKey = &key - } - - if authKey == nil { - // Get state Secret to check if it's already authed. - stateSecret := &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: pgStateSecretName(pg.Name, i), - Namespace: r.tsNamespace, - }, - } - if err = r.Get(ctx, client.ObjectKeyFromObject(stateSecret), stateSecret); err != nil && !apierrors.IsNotFound(err) { - return nil, err - } - - if shouldRetainAuthKey(stateSecret) && existingCfgSecret != nil { - authKey, err = authKeyFromSecret(existingCfgSecret) - if err != nil { - return nil, fmt.Errorf("error retrieving auth key from existing config Secret: %w", err) - } - } + authKey, err := r.getAuthKey(ctx, tailscaleClient, pg, existingCfgSecret, i, logger) + if err != nil { + return nil, err } nodePortSvcName := pgNodePortServiceName(pg.Name, i) @@ -918,11 +894,137 @@ func (r *ProxyGroupReconciler) ensureConfigSecretsCreated( return nil, err } } + } return endpoints, nil } +// getAuthKey returns an auth key for the proxy, or nil if none is needed. +// A new key is created if the config Secret doesn't exist yet, or if the +// proxy has requested a reissue via its state Secret. An existing key is +// retained while the device hasn't authed or a reissue is in progress. 
+func (r *ProxyGroupReconciler) getAuthKey(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, existingCfgSecret *corev1.Secret, ordinal int32, logger *zap.SugaredLogger) (*string, error) { + // Get state Secret to check if it's already authed or has requested + // a fresh auth key. + stateSecret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: pgStateSecretName(pg.Name, ordinal), + Namespace: r.tsNamespace, + }, + } + if err := r.Get(ctx, client.ObjectKeyFromObject(stateSecret), stateSecret); err != nil && !apierrors.IsNotFound(err) { + return nil, err + } + + var createAuthKey bool + var cfgAuthKey *string + if existingCfgSecret == nil { + createAuthKey = true + } else { + var err error + cfgAuthKey, err = authKeyFromSecret(existingCfgSecret) + if err != nil { + return nil, fmt.Errorf("error retrieving auth key from existing config Secret: %w", err) + } + } + + if !createAuthKey { + var err error + createAuthKey, err = r.shouldReissueAuthKey(ctx, tailscaleClient, pg, stateSecret, cfgAuthKey) + if err != nil { + return nil, err + } + } + + var authKey *string + if createAuthKey { + logger.Debugf("creating auth key for ProxyGroup proxy %q", stateSecret.Name) + + tags := pg.Spec.Tags.Stringify() + if len(tags) == 0 { + tags = r.defaultTags + } + key, err := newAuthKey(ctx, tailscaleClient, tags) + if err != nil { + return nil, err + } + authKey = &key + } else { + // Retain auth key if the device hasn't authed yet, or if a + // reissue is in progress (device_id is stale during reissue). + _, reissueRequested := stateSecret.Data[kubetypes.KeyReissueAuthkey] + if !deviceAuthed(stateSecret) || reissueRequested { + authKey = cfgAuthKey + } + } + + return authKey, nil +} + +// shouldReissueAuthKey returns true if the proxy needs a new auth key. It +// tracks in-flight reissues via authKeyReissuing to avoid duplicate API calls +// across reconciles. 
+func (r *ProxyGroupReconciler) shouldReissueAuthKey(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, stateSecret *corev1.Secret, cfgAuthKey *string) (shouldReissue bool, err error) { + r.mu.Lock() + reissuing := r.authKeyReissuing[stateSecret.Name] + r.mu.Unlock() + + if reissuing { + // Check if reissue is complete by seeing if request was cleared + _, requestStillPresent := stateSecret.Data[kubetypes.KeyReissueAuthkey] + if !requestStillPresent { + // Containerboot cleared the request, reissue is complete + r.mu.Lock() + r.authKeyReissuing[stateSecret.Name] = false + r.mu.Unlock() + r.log.Debugf("auth key reissue completed for %q", stateSecret.Name) + return false, nil + } + + // Reissue still in-flight; waiting for containerboot to pick up new key + r.log.Debugf("auth key already in process of re-issuance, waiting for secret to be updated") + return false, nil + } + + defer func() { + r.mu.Lock() + r.authKeyReissuing[stateSecret.Name] = shouldReissue + r.mu.Unlock() + }() + + brokenAuthkey, ok := stateSecret.Data[kubetypes.KeyReissueAuthkey] + if !ok { + // reissue hasn't been requested since the key in the secret hasn't been populated + return false, nil + } + + empty := cfgAuthKey == nil || *cfgAuthKey == "" + broken := cfgAuthKey != nil && *cfgAuthKey == string(brokenAuthkey) + + // A new key has been written but the proxy hasn't picked it up yet. 
+ if !empty && !broken { + return false, nil + } + + lim := r.authKeyRateLimits[pg.Name] + if !lim.Allow() { + r.log.Debugf("auth key re-issuance rate limit exceeded, limit: %.2f, burst: %d, tokens: %.2f", + lim.Limit(), lim.Burst(), lim.Tokens()) + return false, fmt.Errorf("auth key re-issuance rate limit exceeded for ProxyGroup %q, will retry with backoff", pg.Name) + } + + r.log.Infof("Proxy failing to auth; attempting cleanup and new key") + if tsID := stateSecret.Data[kubetypes.KeyDeviceID]; len(tsID) > 0 { + id := tailcfg.StableNodeID(tsID) + if err := r.ensureDeviceDeleted(ctx, tailscaleClient, id, r.log); err != nil { + return false, err + } + } + + return true, nil +} + type FindStaticEndpointErr struct { msg string } @@ -1016,9 +1118,9 @@ func getStaticEndpointAddress(a *corev1.NodeAddress, port uint16) *netip.AddrPor return new(netip.AddrPortFrom(addr, port)) } -// ensureAddedToGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup -// is created. r.mu must be held. -func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGroup) { +// ensureStateAddedForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup +// is created, and initialises per-ProxyGroup rate limits on re-issuing auth keys. r.mu must be held. 
+func (r *ProxyGroupReconciler) ensureStateAddedForProxyGroup(pg *tsapi.ProxyGroup) { switch pg.Spec.Type { case tsapi.ProxyGroupTypeEgress: r.egressProxyGroups.Add(pg.UID) @@ -1030,11 +1132,24 @@ func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGr gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len())) gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len())) gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len())) + + if _, ok := r.authKeyRateLimits[pg.Name]; !ok { + // Allow every replica to have its auth key re-issued quickly the first + // time, but with an overall limit of 1 every 30s after a burst. + r.authKeyRateLimits[pg.Name] = rate.NewLimiter(rate.Every(30*time.Second), int(pgReplicas(pg))) + } + + for i := range pgReplicas(pg) { + rep := pgStateSecretName(pg.Name, i) + if _, ok := r.authKeyReissuing[rep]; !ok { + r.authKeyReissuing[rep] = false + } + } } -// ensureRemovedFromGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource type is updated when the -// ProxyGroup is deleted. r.mu must be held. -func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.ProxyGroup) { +// ensureStateRemovedForProxyGroup ensures the gauge metric for the ProxyGroup resource type is updated when the +// ProxyGroup is deleted, and deletes the per-ProxyGroup rate limiter to free memory. r.mu must be held. 
+func (r *ProxyGroupReconciler) ensureStateRemovedForProxyGroup(pg *tsapi.ProxyGroup) { switch pg.Spec.Type { case tsapi.ProxyGroupTypeEgress: r.egressProxyGroups.Remove(pg.UID) @@ -1046,6 +1161,7 @@ func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.Pro gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len())) gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len())) gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len())) + delete(r.authKeyRateLimits, pg.Name) } func pgTailscaledConfig(pg *tsapi.ProxyGroup, loginServer string, pc *tsapi.ProxyClass, idx int32, authKey *string, staticEndpoints []netip.AddrPort, oldAdvertiseServices []string) (tailscaledConfigs, error) { @@ -1106,7 +1222,7 @@ func getNodeMetadata(ctx context.Context, pg *tsapi.ProxyGroup, cl client.Client return nil, fmt.Errorf("failed to list state Secrets: %w", err) } for _, secret := range secrets.Items { - var ordinal int + var ordinal int32 if _, err := fmt.Sscanf(secret.Name, pg.Name+"-%d", &ordinal); err != nil { return nil, fmt.Errorf("unexpected secret %s was labelled as owned by the ProxyGroup %s: %w", secret.Name, pg.Name, err) } @@ -1213,7 +1329,7 @@ func (r *ProxyGroupReconciler) getClientAndLoginURL(ctx context.Context, tailnet } type nodeMetadata struct { - ordinal int + ordinal int32 stateSecret *corev1.Secret podUID string // or empty if the Pod no longer exists. 
tsID tailcfg.StableNodeID diff --git a/cmd/k8s-operator/proxygroup_test.go b/cmd/k8s-operator/proxygroup_test.go index 9b3ee0e0f..1a50ee1f0 100644 --- a/cmd/k8s-operator/proxygroup_test.go +++ b/cmd/k8s-operator/proxygroup_test.go @@ -6,15 +6,19 @@ package main import ( + "context" "encoding/json" "fmt" "net/netip" + "reflect" "slices" + "strings" "testing" "time" "github.com/google/go-cmp/cmp" "go.uber.org/zap" + "golang.org/x/time/rate" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" @@ -28,7 +32,6 @@ "sigs.k8s.io/controller-runtime/pkg/client/fake" "tailscale.com/client/tailscale" "tailscale.com/ipn" - kube "tailscale.com/k8s-operator" tsoperator "tailscale.com/k8s-operator" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" "tailscale.com/kube/k8s-proxy/conf" @@ -637,10 +640,12 @@ type reconcile struct { tsFirewallMode: "auto", defaultProxyClass: "default-pc", - Client: fc, - tsClient: tsClient, - recorder: fr, - clock: cl, + Client: fc, + tsClient: tsClient, + recorder: fr, + clock: cl, + authKeyRateLimits: make(map[string]*rate.Limiter), + authKeyReissuing: make(map[string]bool), } for i, r := range tt.reconciles { @@ -780,11 +785,13 @@ type reconcile struct { tsFirewallMode: "auto", defaultProxyClass: "default-pc", - Client: fc, - tsClient: tsClient, - recorder: fr, - log: zl.Sugar().With("TestName", tt.name).With("Reconcile", "cleanup"), - clock: cl, + Client: fc, + tsClient: tsClient, + recorder: fr, + log: zl.Sugar().With("TestName", tt.name).With("Reconcile", "cleanup"), + clock: cl, + authKeyRateLimits: make(map[string]*rate.Limiter), + authKeyReissuing: make(map[string]bool), } if err := fc.Delete(t.Context(), pg); err != nil { @@ -841,12 +848,15 @@ func TestProxyGroup(t *testing.T) { tsFirewallMode: "auto", defaultProxyClass: "default-pc", - Client: fc, - tsClient: tsClient, - recorder: fr, - log: zl.Sugar(), - clock: cl, + Client: fc, + tsClient: tsClient, + recorder: fr, + log: zl.Sugar(), + clock: cl, + 
authKeyRateLimits: make(map[string]*rate.Limiter), + authKeyReissuing: make(map[string]bool), } + crd := &apiextensionsv1.CustomResourceDefinition{ObjectMeta: metav1.ObjectMeta{Name: serviceMonitorCRD}} opts := configOpts{ proxyType: "proxygroup", @@ -863,7 +873,7 @@ func TestProxyGroup(t *testing.T) { tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, metav1.ConditionFalse, reasonProxyGroupCreating, "the ProxyGroup's ProxyClass \"default-pc\" is not yet in a ready state, waiting...", 1, cl, zl.Sugar()) expectEqual(t, fc, pg) expectProxyGroupResources(t, fc, pg, false, pc) - if kube.ProxyGroupAvailable(pg) { + if tsoperator.ProxyGroupAvailable(pg) { t.Fatal("expected ProxyGroup to not be available") } }) @@ -891,7 +901,7 @@ func TestProxyGroup(t *testing.T) { tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupAvailable, metav1.ConditionFalse, reasonProxyGroupCreating, "0/2 ProxyGroup pods running", 0, cl, zl.Sugar()) expectEqual(t, fc, pg) expectProxyGroupResources(t, fc, pg, true, pc) - if kube.ProxyGroupAvailable(pg) { + if tsoperator.ProxyGroupAvailable(pg) { t.Fatal("expected ProxyGroup to not be available") } if expected := 1; reconciler.egressProxyGroups.Len() != expected { @@ -935,7 +945,7 @@ func TestProxyGroup(t *testing.T) { tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupAvailable, metav1.ConditionTrue, reasonProxyGroupAvailable, "2/2 ProxyGroup pods running", 0, cl, zl.Sugar()) expectEqual(t, fc, pg) expectProxyGroupResources(t, fc, pg, true, pc) - if !kube.ProxyGroupAvailable(pg) { + if !tsoperator.ProxyGroupAvailable(pg) { t.Fatal("expected ProxyGroup to be available") } }) @@ -1045,12 +1055,14 @@ func TestProxyGroupTypes(t *testing.T) { zl, _ := zap.NewDevelopment() reconciler := &ProxyGroupReconciler{ - tsNamespace: tsNamespace, - tsProxyImage: testProxyImage, - Client: fc, - log: zl.Sugar(), - tsClient: &fakeTSClient{}, - clock: tstest.NewClock(tstest.ClockOpts{}), + tsNamespace: tsNamespace, + tsProxyImage: testProxyImage, + 
Client: fc, + log: zl.Sugar(), + tsClient: &fakeTSClient{}, + clock: tstest.NewClock(tstest.ClockOpts{}), + authKeyRateLimits: make(map[string]*rate.Limiter), + authKeyReissuing: make(map[string]bool), } t.Run("egress_type", func(t *testing.T) { @@ -1285,12 +1297,14 @@ func TestKubeAPIServerStatusConditionFlow(t *testing.T) { WithStatusSubresource(pg). Build() r := &ProxyGroupReconciler{ - tsNamespace: tsNamespace, - tsProxyImage: testProxyImage, - Client: fc, - log: zap.Must(zap.NewDevelopment()).Sugar(), - tsClient: &fakeTSClient{}, - clock: tstest.NewClock(tstest.ClockOpts{}), + tsNamespace: tsNamespace, + tsProxyImage: testProxyImage, + Client: fc, + log: zap.Must(zap.NewDevelopment()).Sugar(), + tsClient: &fakeTSClient{}, + clock: tstest.NewClock(tstest.ClockOpts{}), + authKeyRateLimits: make(map[string]*rate.Limiter), + authKeyReissuing: make(map[string]bool), } expectReconciled(t, r, "", pg.Name) @@ -1338,12 +1352,14 @@ func TestKubeAPIServerType_DoesNotOverwriteServicesConfig(t *testing.T) { Build() reconciler := &ProxyGroupReconciler{ - tsNamespace: tsNamespace, - tsProxyImage: testProxyImage, - Client: fc, - log: zap.Must(zap.NewDevelopment()).Sugar(), - tsClient: &fakeTSClient{}, - clock: tstest.NewClock(tstest.ClockOpts{}), + tsNamespace: tsNamespace, + tsProxyImage: testProxyImage, + Client: fc, + log: zap.Must(zap.NewDevelopment()).Sugar(), + tsClient: &fakeTSClient{}, + clock: tstest.NewClock(tstest.ClockOpts{}), + authKeyRateLimits: make(map[string]*rate.Limiter), + authKeyReissuing: make(map[string]bool), } pg := &tsapi.ProxyGroup{ @@ -1367,7 +1383,7 @@ func TestKubeAPIServerType_DoesNotOverwriteServicesConfig(t *testing.T) { cfg := conf.VersionedConfig{ Version: "v1alpha1", ConfigV1Alpha1: &conf.ConfigV1Alpha1{ - AuthKey: new("secret-authkey"), + AuthKey: new("new-authkey"), State: new(fmt.Sprintf("kube:%s", pgPodName(pg.Name, 0))), App: new(kubetypes.AppProxyGroupKubeAPIServer), LogLevel: new("debug"), @@ -1423,12 +1439,14 @@ func 
TestIngressAdvertiseServicesConfigPreserved(t *testing.T) { WithStatusSubresource(&tsapi.ProxyGroup{}). Build() reconciler := &ProxyGroupReconciler{ - tsNamespace: tsNamespace, - tsProxyImage: testProxyImage, - Client: fc, - log: zap.Must(zap.NewDevelopment()).Sugar(), - tsClient: &fakeTSClient{}, - clock: tstest.NewClock(tstest.ClockOpts{}), + tsNamespace: tsNamespace, + tsProxyImage: testProxyImage, + Client: fc, + log: zap.Must(zap.NewDevelopment()).Sugar(), + tsClient: &fakeTSClient{}, + clock: tstest.NewClock(tstest.ClockOpts{}), + authKeyRateLimits: make(map[string]*rate.Limiter), + authKeyReissuing: make(map[string]bool), } existingServices := []string{"svc1", "svc2"} @@ -1653,6 +1671,197 @@ type unwrapper interface { } } +func TestProxyGroupGetAuthKey(t *testing.T) { + pg := &tsapi.ProxyGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Finalizers: []string{"tailscale.com/finalizer"}, + }, + Spec: tsapi.ProxyGroupSpec{ + Type: tsapi.ProxyGroupTypeEgress, + Replicas: new(int32(1)), + }, + } + tsClient := &fakeTSClient{} + + // Variables to reference in test cases. + existingAuthKey := new("existing-auth-key") + newAuthKey := new("new-authkey") + configWith := func(authKey *string) map[string][]byte { + value := []byte("{}") + if authKey != nil { + value = fmt.Appendf(nil, `{"AuthKey": "%s"}`, *authKey) + } + return map[string][]byte{ + tsoperator.TailscaledConfigFileName(pgMinCapabilityVersion): value, + } + } + + initTest := func() (*ProxyGroupReconciler, client.WithWatch) { + fc := fake.NewClientBuilder(). + WithScheme(tsapi.GlobalScheme). + WithObjects(pg). + WithStatusSubresource(pg). 
+ Build() + zl, _ := zap.NewDevelopment() + fr := record.NewFakeRecorder(1) + cl := tstest.NewClock(tstest.ClockOpts{}) + reconciler := &ProxyGroupReconciler{ + tsNamespace: tsNamespace, + tsProxyImage: testProxyImage, + defaultTags: []string{"tag:test-tag"}, + tsFirewallMode: "auto", + + Client: fc, + tsClient: tsClient, + recorder: fr, + log: zl.Sugar(), + clock: cl, + authKeyRateLimits: make(map[string]*rate.Limiter), + authKeyReissuing: make(map[string]bool), + } + reconciler.ensureStateAddedForProxyGroup(pg) + + return reconciler, fc + } + + // Config Secret: exists or not, has key or not. + // State Secret: has device ID or not, requested reissue or not. + for name, tc := range map[string]struct { + configData map[string][]byte + stateData map[string][]byte + expectedAuthKey *string + expectReissue bool + }{ + "no_secrets_needs_new": { + expectedAuthKey: newAuthKey, // New ProxyGroup or manually cleared Pod. + }, + "no_config_secret_state_authed_ok": { + stateData: map[string][]byte{ + kubetypes.KeyDeviceID: []byte("nodeid-0"), + }, + expectedAuthKey: newAuthKey, // Always create an auth key if we're creating the config Secret. + }, + "config_secret_without_key_state_authed_with_reissue_needs_new": { + configData: configWith(nil), + stateData: map[string][]byte{ + kubetypes.KeyDeviceID: []byte("nodeid-0"), + kubetypes.KeyReissueAuthkey: []byte(""), + }, + expectedAuthKey: newAuthKey, + expectReissue: true, // Device is authed but reissue was requested. + }, + "config_secret_with_key_state_with_reissue_stale_ok": { + configData: configWith(existingAuthKey), + stateData: map[string][]byte{ + kubetypes.KeyReissueAuthkey: []byte("some-older-authkey"), + }, + expectedAuthKey: existingAuthKey, // Config's auth key is different from the one marked for reissue. 
+ }, + "config_secret_with_key_state_with_reissue_existing_key_needs_new": { + configData: configWith(existingAuthKey), + stateData: map[string][]byte{ + kubetypes.KeyDeviceID: []byte("nodeid-0"), + kubetypes.KeyReissueAuthkey: []byte(*existingAuthKey), + }, + expectedAuthKey: newAuthKey, + expectReissue: true, // Current config's auth key is marked for reissue. + }, + "config_secret_without_key_no_state_ok": { + configData: configWith(nil), + expectedAuthKey: nil, // Proxy will set reissue_authkey and then next reconcile will reissue. + }, + "config_secret_without_key_state_authed_ok": { + configData: configWith(nil), + stateData: map[string][]byte{ + kubetypes.KeyDeviceID: []byte("nodeid-0"), + }, + expectedAuthKey: nil, // Device is already authed. + }, + "config_secret_with_key_state_authed_ok": { + configData: configWith(existingAuthKey), + stateData: map[string][]byte{ + kubetypes.KeyDeviceID: []byte("nodeid-0"), + }, + expectedAuthKey: nil, // Auth key getting removed because device is authed. + }, + "config_secret_with_key_no_state_keeps_existing": { + configData: configWith(existingAuthKey), + expectedAuthKey: existingAuthKey, // No state, waiting for containerboot to try the auth key. + }, + } { + t.Run(name, func(t *testing.T) { + tsClient.deleted = tsClient.deleted[:0] // Reset deleted devices for each test case. 
+ reconciler, fc := initTest() + var cfgSecret *corev1.Secret + if tc.configData != nil { + cfgSecret = &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: pgConfigSecretName(pg.Name, 0), + Namespace: tsNamespace, + }, + Data: tc.configData, + } + } + if tc.stateData != nil { + mustCreate(t, fc, &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: pgStateSecretName(pg.Name, 0), + Namespace: tsNamespace, + }, + Data: tc.stateData, + }) + } + + authKey, err := reconciler.getAuthKey(t.Context(), tsClient, pg, cfgSecret, 0, reconciler.log.With("TestName", t.Name())) + if err != nil { + t.Fatalf("unexpected error getting auth key: %v", err) + } + if !reflect.DeepEqual(authKey, tc.expectedAuthKey) { + deref := func(s *string) string { + if s == nil { + return "" + } + return *s + } + t.Errorf("expected auth key %v, got %v", deref(tc.expectedAuthKey), deref(authKey)) + } + + // Use the device deletion as a proxy for the fact the new auth key + // was due to a reissue. + switch { + case tc.expectReissue && len(tsClient.deleted) != 1: + t.Errorf("expected 1 deleted device, got %v", tsClient.deleted) + case !tc.expectReissue && len(tsClient.deleted) != 0: + t.Errorf("expected no deleted devices, got %v", tsClient.deleted) + } + + if tc.expectReissue { + // Trigger the rate limit in a tight loop. Up to 100 iterations + // to allow for CI that is extremely slow, but should happen on + // first try for any reasonable machine. + stateSecretName := pgStateSecretName(pg.Name, 0) + for range 100 { + // NOTE: (ChaosInTheCRD) we added some protection here to avoid + // trying to reissue when already reissuing. This overrides it. 
+ reconciler.mu.Lock() + reconciler.authKeyReissuing[stateSecretName] = false + reconciler.mu.Unlock() + _, err := reconciler.getAuthKey(context.Background(), tsClient, pg, cfgSecret, 0, + reconciler.log.With("TestName", t.Name())) + if err != nil { + if !strings.Contains(err.Error(), "rate limit exceeded") { + t.Fatalf("unexpected error getting auth key: %v", err) + } + return // Expected rate limit error. + } + } + t.Fatal("expected rate limit error, but got none") + } + }) + } +} + func proxyClassesForLEStagingTest() (*tsapi.ProxyClass, *tsapi.ProxyClass, *tsapi.ProxyClass) { pcLEStaging := &tsapi.ProxyClass{ ObjectMeta: metav1.ObjectMeta{ @@ -1903,6 +2112,8 @@ type proxyGroupLETestCase struct { tsClient: &fakeTSClient{}, log: zl.Sugar(), clock: cl, + authKeyRateLimits: make(map[string]*rate.Limiter), + authKeyReissuing: make(map[string]bool), } expectReconciled(t, reconciler, "", pg.Name) diff --git a/cmd/k8s-operator/sts.go b/cmd/k8s-operator/sts.go index 5f33a9490..519f81fe0 100644 --- a/cmd/k8s-operator/sts.go +++ b/cmd/k8s-operator/sts.go @@ -1111,7 +1111,7 @@ func tailscaledConfig(stsC *tailscaleSTSConfig, loginUrl string, newAuthkey stri if newAuthkey != "" { conf.AuthKey = &newAuthkey - } else if shouldRetainAuthKey(oldSecret) { + } else if !deviceAuthed(oldSecret) { key, err := authKeyFromSecret(oldSecret) if err != nil { return nil, fmt.Errorf("error retrieving auth key from Secret: %w", err) @@ -1164,6 +1164,8 @@ func latestConfigFromSecret(s *corev1.Secret) (*ipn.ConfigVAlpha, error) { return conf, nil } +// authKeyFromSecret returns the auth key from the latest config version if +// found, or else nil. 
func authKeyFromSecret(s *corev1.Secret) (key *string, err error) { conf, err := latestConfigFromSecret(s) if err != nil { @@ -1180,13 +1182,13 @@ func authKeyFromSecret(s *corev1.Secret) (key *string, err error) { return key, nil } -// shouldRetainAuthKey returns true if the state stored in a proxy's state Secret suggests that auth key should be -// retained (because the proxy has not yet successfully authenticated). -func shouldRetainAuthKey(s *corev1.Secret) bool { +// deviceAuthed returns true if the state stored in a proxy's state Secret +// suggests that the proxy has successfully authenticated. +func deviceAuthed(s *corev1.Secret) bool { if s == nil { - return false // nothing to retain here + return false // No state Secret means no device state. } - return len(s.Data["device_id"]) == 0 // proxy has not authed yet + return len(s.Data["device_id"]) > 0 } func shouldAcceptRoutes(pc *tsapi.ProxyClass) bool { diff --git a/cmd/k8s-operator/testutils_test.go b/cmd/k8s-operator/testutils_test.go index d418f0128..36b608ef6 100644 --- a/cmd/k8s-operator/testutils_test.go +++ b/cmd/k8s-operator/testutils_test.go @@ -529,7 +529,7 @@ func expectedSecret(t *testing.T, cl client.Client, opts configOpts) *corev1.Sec AcceptDNS: "false", Hostname: &opts.hostname, Locked: "false", - AuthKey: new("secret-authkey"), + AuthKey: new("new-authkey"), AcceptRoutes: "false", AppConnector: &ipn.AppConnectorPrefs{Advertise: false}, NoStatefulFiltering: "true", @@ -859,7 +859,7 @@ func (c *fakeTSClient) CreateKey(ctx context.Context, caps tailscale.KeyCapabili Created: time.Now(), Capabilities: caps, } - return "secret-authkey", k, nil + return "new-authkey", k, nil } func (c *fakeTSClient) Device(ctx context.Context, deviceID string, fields *tailscale.DeviceFieldsOpts) (*tailscale.Device, error) { diff --git a/cmd/k8s-operator/tsrecorder_test.go b/cmd/k8s-operator/tsrecorder_test.go index 0e1641243..d3ebc3bd5 100644 --- a/cmd/k8s-operator/tsrecorder_test.go +++ 
b/cmd/k8s-operator/tsrecorder_test.go @@ -284,7 +284,7 @@ func expectRecorderResources(t *testing.T, fc client.WithWatch, tsr *tsapi.Recor } for replica := range replicas { - auth := tsrAuthSecret(tsr, tsNamespace, "secret-authkey", replica) + auth := tsrAuthSecret(tsr, tsNamespace, "new-authkey", replica) state := tsrStateSecret(tsr, tsNamespace, replica) if shouldExist { diff --git a/kube/kubetypes/types.go b/kube/kubetypes/types.go index 187f54f34..9f1b29064 100644 --- a/kube/kubetypes/types.go +++ b/kube/kubetypes/types.go @@ -38,17 +38,17 @@ // Keys that containerboot writes to state file that can be used to determine its state. // fields set in Tailscale state Secret. These are mostly used by the Tailscale Kubernetes operator to determine // the state of this tailscale device. - KeyDeviceID string = "device_id" // node stable ID of the device - KeyDeviceFQDN string = "device_fqdn" // device's tailnet hostname - KeyDeviceIPs string = "device_ips" // device's tailnet IPs - KeyPodUID string = "pod_uid" // Pod UID - // KeyCapVer contains Tailscale capability version of this proxy instance. - KeyCapVer string = "tailscale_capver" + KeyDeviceID = "device_id" // node stable ID of the device + KeyDeviceFQDN = "device_fqdn" // device's tailnet hostname + KeyDeviceIPs = "device_ips" // device's tailnet IPs + KeyPodUID = "pod_uid" // Pod UID + KeyCapVer = "tailscale_capver" // tailcfg.CurrentCapabilityVersion of this proxy instance. + KeyReissueAuthkey = "reissue_authkey" // Proxies will set this to the authkey that failed, or "no-authkey", if they can't log in. // KeyHTTPSEndpoint is a name of a field that can be set to the value of any HTTPS endpoint currently exposed by // this device to the tailnet. This is used by the Kubernetes operator Ingress proxy to communicate to the operator // that cluster workloads behind the Ingress can now be accessed via the given DNS name over HTTPS. 
- KeyHTTPSEndpoint string = "https_endpoint" - ValueNoHTTPS string = "no-https" + KeyHTTPSEndpoint = "https_endpoint" + ValueNoHTTPS = "no-https" // Pod's IPv4 address header key as returned by containerboot health check endpoint. PodIPv4Header string = "Pod-IPv4"