diff --git a/cmd/k8s-operator/e2e/ingress_test.go b/cmd/k8s-operator/e2e/ingress_test.go index 95fbbab9d..a136d2ad3 100644 --- a/cmd/k8s-operator/e2e/ingress_test.go +++ b/cmd/k8s-operator/e2e/ingress_test.go @@ -5,7 +5,6 @@ import ( "context" - "encoding/json" "fmt" "net/http" "testing" @@ -14,10 +13,6 @@ appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/yaml" "tailscale.com/cmd/testwrapper/flakytest" kube "tailscale.com/k8s-operator" @@ -90,81 +85,20 @@ func TestIngress(t *testing.T) { } createAndCleanup(t, kubeClient, svc) - // TODO(tomhjp): Delete once we've reproduced the flake with this extra info. - t0 := time.Now() - watcherCtx, cancelWatcher := context.WithCancel(t.Context()) - defer cancelWatcher() - go func() { - // client-go client for logs. - clientGoKubeClient, err := kubernetes.NewForConfig(restCfg) - if err != nil { - t.Logf("error creating client-go Kubernetes client: %v", err) - return - } - - for { - select { - case <-watcherCtx.Done(): - t.Logf("stopping watcher after %v", time.Since(t0)) - return - case <-time.After(time.Minute): - t.Logf("dumping info after %v elapsed", time.Since(t0)) - // Service itself. - svc := &corev1.Service{ObjectMeta: objectMeta("default", "test-ingress")} - err := get(watcherCtx, kubeClient, svc) - svcYaml, _ := yaml.Marshal(svc) - t.Logf("Service: %s, error: %v\n%s", svc.Name, err, string(svcYaml)) - - // Pods in tailscale namespace. - var pods corev1.PodList - if err := kubeClient.List(watcherCtx, &pods, client.InNamespace("tailscale")); err != nil { - t.Logf("error listing Pods in tailscale namespace: %v", err) - } else { - t.Logf("%d Pods", len(pods.Items)) - for _, pod := range pods.Items { - podYaml, _ := yaml.Marshal(pod) - t.Logf("Pod: %s\n%s", pod.Name, string(podYaml)) - logs := clientGoKubeClient.CoreV1().Pods("tailscale").GetLogs(pod.Name, &corev1.PodLogOptions{}).Do(watcherCtx) - logData, err := logs.Raw() - if err != nil { - t.Logf("error reading logs for Pod %s: %v", pod.Name, err) - continue - } - t.Logf("Logs for Pod %s:\n%s", pod.Name, string(logData)) - } - } - - // Tailscale status on the tailnet. - lc, err := tnClient.LocalClient() - if err != nil { - t.Logf("error getting tailnet local client: %v", err) - } else { - status, err := lc.Status(watcherCtx) - statusJSON, _ := json.MarshalIndent(status, "", " ") - t.Logf("Tailnet status: %s, error: %v", string(statusJSON), err) - } - } - } - }() - - // TODO: instead of timing out only when test times out, cancel context after 60s or so. - if err := wait.PollUntilContextCancel(t.Context(), time.Millisecond*100, true, func(ctx context.Context) (done bool, err error) { - if time.Since(t0) > time.Minute { - t.Logf("%v elapsed waiting for Service default/test-ingress to become Ready", time.Since(t0)) - } + if err := tstest.WaitFor(time.Minute, func() error { maybeReadySvc := &corev1.Service{ObjectMeta: objectMeta("default", "test-ingress")} - if err := get(ctx, kubeClient, maybeReadySvc); err != nil { - return false, err + if err := get(t.Context(), kubeClient, maybeReadySvc); err != nil { + return err } isReady := kube.SvcIsReady(maybeReadySvc) if isReady { t.Log("Service is ready") + return nil } - return isReady, nil + return fmt.Errorf("Service is not ready yet") }); err != nil { t.Fatalf("error waiting for the Service to become Ready: %v", err) } - cancelWatcher() var resp *http.Response if err := tstest.WaitFor(time.Minute, func() error { diff --git a/cmd/k8s-operator/e2e/setup.go b/cmd/k8s-operator/e2e/setup.go index c4fd45d3e..e3d7ed89b 100644 --- a/cmd/k8s-operator/e2e/setup.go +++ b/cmd/k8s-operator/e2e/setup.go @@ -56,6 +56,7 @@ "tailscale.com/internal/client/tailscale" "tailscale.com/ipn" "tailscale.com/ipn/store/mem" + tsoperator "tailscale.com/k8s-operator" tsapi "tailscale.com/k8s-operator/apis/v1alpha1" "tailscale.com/tsnet" ) @@ -438,7 +439,7 @@ func runTests(m *testing.M) (int, error) { return 0, fmt.Errorf("failed to install %q via helm: %w", relName, err) } - if err := applyDefaultProxyClass(ctx, kubeClient); err != nil { + if err := applyDefaultProxyClass(ctx, logger, kubeClient); err != nil { return 0, fmt.Errorf("failed to apply default ProxyClass: %w", err) } @@ -537,7 +538,7 @@ func tagForRepo(dir string) (string, error) { return tag, nil } -func applyDefaultProxyClass(ctx context.Context, cl client.Client) error { +func applyDefaultProxyClass(ctx context.Context, logger *zap.SugaredLogger, cl client.Client) error { pc := &tsapi.ProxyClass{ TypeMeta: metav1.TypeMeta{ APIVersion: tsapi.SchemeGroupVersion.String(), @@ -565,6 +566,24 @@ func applyDefaultProxyClass(ctx context.Context, cl client.Client) error { return fmt.Errorf("failed to apply default ProxyClass: %w", err) } + // Wait for the ProxyClass to be marked ready. + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + for { + if err := cl.Get(ctx, client.ObjectKeyFromObject(pc), pc); err != nil { + return fmt.Errorf("failed to get default ProxyClass: %w", err) + } + if tsoperator.ProxyClassIsReady(pc) { + break + } + logger.Info("waiting for default ProxyClass to be ready...") + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for default ProxyClass to be ready") + case <-time.After(time.Second): + } + } + return nil } diff --git a/cmd/k8s-operator/operator.go b/cmd/k8s-operator/operator.go index 1060c6f3d..d353c5333 100644 --- a/cmd/k8s-operator/operator.go +++ b/cmd/k8s-operator/operator.go @@ -77,6 +77,12 @@ // Generate CRD API docs. //go:generate go run github.com/elastic/crd-ref-docs --renderer=markdown --source-path=../../k8s-operator/apis/ --config=../../k8s-operator/api-docs-config.yaml --output-path=../../k8s-operator/api.md +const ( + indexServiceProxyClass = ".metadata.annotations.service-proxy-class" + indexServiceExposed = ".metadata.annotations.service-expose" + indexServiceType = ".metadata.annotations.service-type" +) + func main() { // Required to use our client API. We're fine with the instability since the // client lives in the same repo as this code. @@ -351,7 +357,12 @@ func runReconcilers(opts reconcilerOpts) { svcChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("svc")) // If a ProxyClass changes, enqueue all Services labeled with that // ProxyClass's name. - proxyClassFilterForSvc := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForSvc(mgr.GetClient(), startlog)) + proxyClassFilterForSvc := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForSvc( + mgr.GetClient(), + startlog, + opts.defaultProxyClass, + opts.proxyActAsDefaultLoadBalancer, + )) eventRecorder := mgr.GetEventRecorderFor("tailscale-operator") ssr := &tailscaleSTSReconciler{ @@ -389,6 +400,18 @@ func runReconcilers(opts reconcilerOpts) { if err := mgr.GetFieldIndexer().IndexField(context.Background(), new(corev1.Service), indexServiceProxyClass, indexProxyClass); err != nil { startlog.Fatalf("failed setting up ProxyClass indexer for Services: %v", err) } + if opts.defaultProxyClass != "" { + // If a default ProxyClass is specified, we'll need to list all objects + // that could be affected. For L3 ingress, this is Services with the + // "tailscale.com/expose" annotation and LoadBalancer services (either + // with the loadBalancerClass "tailscale", or unset if we're the default). + if err := mgr.GetFieldIndexer().IndexField(context.Background(), new(corev1.Service), indexServiceExposed, indexExposed); err != nil { + startlog.Fatalf("failed setting up exposed indexer for Services: %v", err) + } + if err := mgr.GetFieldIndexer().IndexField(context.Background(), new(corev1.Service), indexServiceType, indexType); err != nil { + startlog.Fatalf("failed setting up type indexer for Services: %v", err) + } + } ingressChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("ingress")) // If a ProxyClassChanges, enqueue all Ingresses labeled with that @@ -910,10 +933,27 @@ func indexProxyClass(o client.Object) []string { return []string{o.GetAnnotations()[LabelAnnotationProxyClass]} } +func indexExposed(o client.Object) []string { + if o.GetAnnotations()[AnnotationExpose] != "true" { + return nil + } + + return []string{o.GetAnnotations()[AnnotationExpose]} +} + +func indexType(o client.Object) []string { + svc, ok := o.(*corev1.Service) + if !ok { + return nil + } + + return []string{string(svc.Spec.Type)} +} + // proxyClassHandlerForSvc returns a handler that, for a given ProxyClass, // returns a list of reconcile requests for all Services labeled with // tailscale.com/proxy-class: . -func proxyClassHandlerForSvc(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc { +func proxyClassHandlerForSvc(cl client.Client, logger *zap.SugaredLogger, defaultProxyClass string, isDefaultLoadBalancer bool) handler.MapFunc { return func(ctx context.Context, o client.Object) []reconcile.Request { svcList := new(corev1.ServiceList) labels := map[string]string{ @@ -932,13 +972,12 @@ func proxyClassHandlerForSvc(cl client.Client, logger *zap.SugaredLogger) handle seenSvcs.Add(fmt.Sprintf("%s/%s", svc.Namespace, svc.Name)) } - svcAnnotationList := new(corev1.ServiceList) - if err := cl.List(ctx, svcAnnotationList, client.MatchingFields{indexServiceProxyClass: o.GetName()}); err != nil { + if err := cl.List(ctx, svcList, client.MatchingFields{indexServiceProxyClass: o.GetName()}); err != nil { logger.Debugf("error listing Services for ProxyClass: %v", err) return nil } - for _, svc := range svcAnnotationList.Items { + for _, svc := range svcList.Items { nsname := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name) if seenSvcs.Contains(nsname) { continue @@ -948,6 +987,36 @@ func proxyClassHandlerForSvc(cl client.Client, logger *zap.SugaredLogger) handle seenSvcs.Add(nsname) } + if o.GetName() == defaultProxyClass { + // For the default ProxyClass, we also need to reconcile all exposed + // Services that don't have an explicit ProxyClass set. + for _, matcher := range []client.ListOption{ + client.MatchingFields{indexServiceExposed: "true"}, + client.MatchingFields{indexServiceType: string(corev1.ServiceTypeLoadBalancer)}, + } { + if err := cl.List(ctx, svcList, matcher); err != nil { + logger.Debugf("error listing exposed Services for ProxyClass: %v", err) + return nil + } + + for _, svc := range svcList.Items { + if hasProxyClassAnnotation(&svc) { + continue + } + if !shouldExpose(&svc, isDefaultLoadBalancer) { + continue + } + nsname := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name) + if seenSvcs.Contains(nsname) { + continue + } + + reqs = append(reqs, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&svc)}) + seenSvcs.Add(nsname) + } + } + } + return reqs } } diff --git a/cmd/k8s-operator/svc.go b/cmd/k8s-operator/svc.go index 31be22aa1..6f12148c8 100644 --- a/cmd/k8s-operator/svc.go +++ b/cmd/k8s-operator/svc.go @@ -42,8 +42,6 @@ reasonProxyInvalid = "ProxyInvalid" reasonProxyFailed = "ProxyFailed" reasonProxyPending = "ProxyPending" - - indexServiceProxyClass = ".metadata.annotations.service-proxy-class" ) type ServiceReconciler struct { @@ -97,7 +95,7 @@ func childResourceLabels(name, ns, typ string) map[string]string { func (a *ServiceReconciler) isTailscaleService(svc *corev1.Service) bool { targetIP := tailnetTargetAnnotation(svc) targetFQDN := svc.Annotations[AnnotationTailnetTargetFQDN] - return a.shouldExpose(svc) || targetIP != "" || targetFQDN != "" + return shouldExpose(svc, a.isDefaultLoadBalancer) || targetIP != "" || targetFQDN != "" } func (a *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) { @@ -164,7 +162,7 @@ func (a *ServiceReconciler) maybeCleanup(ctx context.Context, logger *zap.Sugare } proxyTyp := proxyTypeEgress - if a.shouldExpose(svc) { + if shouldExpose(svc, a.isDefaultLoadBalancer) { proxyTyp = proxyTypeIngressService } @@ -275,16 +273,16 @@ func (a *ServiceReconciler) maybeProvision(ctx context.Context, logger *zap.Suga LoginServer: a.ssr.loginServer, } sts.proxyType = proxyTypeEgress - if a.shouldExpose(svc) { + if shouldExpose(svc, a.isDefaultLoadBalancer) { sts.proxyType = proxyTypeIngressService } a.mu.Lock() - if a.shouldExposeClusterIP(svc) { + if shouldExposeClusterIP(svc, a.isDefaultLoadBalancer) { sts.ClusterTargetIP = svc.Spec.ClusterIP a.managedIngressProxies.Add(svc.UID) gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len())) - } else if a.shouldExposeDNSName(svc) { + } else if shouldExposeDNSName(svc) { sts.ClusterTargetDNSName = svc.Spec.ExternalName a.managedIngressProxies.Add(svc.UID) gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len())) @@ -410,19 +408,19 @@ func validateService(svc *corev1.Service) []string { return violations } -func (a *ServiceReconciler) shouldExpose(svc *corev1.Service) bool { - return a.shouldExposeClusterIP(svc) || a.shouldExposeDNSName(svc) +func shouldExpose(svc *corev1.Service, isDefaultLoadBalancer bool) bool { + return shouldExposeClusterIP(svc, isDefaultLoadBalancer) || shouldExposeDNSName(svc) } -func (a *ServiceReconciler) shouldExposeDNSName(svc *corev1.Service) bool { +func shouldExposeDNSName(svc *corev1.Service) bool { return hasExposeAnnotation(svc) && svc.Spec.Type == corev1.ServiceTypeExternalName && svc.Spec.ExternalName != "" } -func (a *ServiceReconciler) shouldExposeClusterIP(svc *corev1.Service) bool { +func shouldExposeClusterIP(svc *corev1.Service, isDefaultLoadBalancer bool) bool { if svc.Spec.ClusterIP == "" { return false } - return isTailscaleLoadBalancerService(svc, a.isDefaultLoadBalancer) || hasExposeAnnotation(svc) + return isTailscaleLoadBalancerService(svc, isDefaultLoadBalancer) || hasExposeAnnotation(svc) } func isTailscaleLoadBalancerService(svc *corev1.Service, isDefaultLoadBalancer bool) bool { diff --git a/cmd/k8s-operator/svc_test.go b/cmd/k8s-operator/svc_test.go new file mode 100644 index 000000000..3a6ea044d --- /dev/null +++ b/cmd/k8s-operator/svc_test.go @@ -0,0 +1,221 @@ +// Copyright (c) Tailscale Inc & contributors +// SPDX-License-Identifier: BSD-3-Clause + +//go:build !plan9 + +package main + +import ( + "context" + "slices" + "testing" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + tsapi "tailscale.com/k8s-operator/apis/v1alpha1" + "tailscale.com/kube/kubetypes" + "tailscale.com/tstest" +) + +func TestService_DefaultProxyClassInitiallyNotReady(t *testing.T) { + pc := &tsapi.ProxyClass{ + ObjectMeta: metav1.ObjectMeta{Name: "custom-metadata"}, + Spec: tsapi.ProxyClassSpec{ + TailscaleConfig: &tsapi.TailscaleConfig{ + AcceptRoutes: true, + }, + StatefulSet: &tsapi.StatefulSet{ + Labels: tsapi.Labels{"foo": "bar"}, + Annotations: map[string]string{"bar.io/foo": "some-val"}, + Pod: &tsapi.Pod{Annotations: map[string]string{"foo.io/bar": "some-val"}}, + }, + }, + } + fc := fake.NewClientBuilder(). + WithScheme(tsapi.GlobalScheme). + WithObjects(pc). + WithStatusSubresource(pc). + Build() + ft := &fakeTSClient{} + zl := zap.Must(zap.NewDevelopment()) + clock := tstest.NewClock(tstest.ClockOpts{}) + sr := &ServiceReconciler{ + Client: fc, + ssr: &tailscaleSTSReconciler{ + Client: fc, + tsClient: ft, + defaultTags: []string{"tag:k8s"}, + operatorNamespace: "operator-ns", + proxyImage: "tailscale/tailscale", + }, + defaultProxyClass: "custom-metadata", + logger: zl.Sugar(), + clock: clock, + } + + // 1. A new tailscale LoadBalancer Service is created but the default + // ProxyClass is not ready yet. + mustCreate(t, fc, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "default", + // The apiserver is supposed to set the UID, but the fake client + // doesn't. So, set it explicitly because other code later depends + // on it being set. + UID: types.UID("1234-UID"), + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.20.30.40", + Type: corev1.ServiceTypeLoadBalancer, + LoadBalancerClass: new("tailscale"), + }, + }) + expectReconciled(t, sr, "default", "test") + labels := map[string]string{ + kubetypes.LabelManaged: "true", + LabelParentName: "test", + LabelParentNamespace: "operator-ns", + LabelParentType: "svc", + } + s, err := getSingleObject[corev1.Secret](context.Background(), fc, "operator-ns", labels) + if err != nil { + t.Fatalf("finding Secret for %q: %v", "test", err) + } + if s != nil { + t.Fatalf("expected no Secret to be created when default ProxyClass is not ready, but found one: %v", s) + } + + // 2. ProxyClass is set to Ready, the Service can become ready now. + mustUpdateStatus(t, fc, "", "custom-metadata", func(pc *tsapi.ProxyClass) { + pc.Status = tsapi.ProxyClassStatus{ + Conditions: []metav1.Condition{{ + Status: metav1.ConditionTrue, + Type: string(tsapi.ProxyClassReady), + ObservedGeneration: pc.Generation, + }}, + } + }) + expectReconciled(t, sr, "default", "test") + fullName, shortName := findGenName(t, fc, "default", "test", "svc") + opts := configOpts{ + replicas: new(int32(1)), + stsName: shortName, + secretName: fullName, + namespace: "default", + parentType: "svc", + hostname: "default-test", + clusterTargetIP: "10.20.30.40", + app: kubetypes.AppIngressProxy, + proxyClass: pc.Name, + } + expectEqual(t, fc, expectedSecret(t, fc, opts)) + expectEqual(t, fc, expectedHeadlessService(shortName, "svc")) + expectEqual(t, fc, expectedSTS(t, fc, opts), removeResourceReqs) +} + +func TestProxyClassHandlerForSvc(t *testing.T) { + svc := func(name string, annotations, labels map[string]string) *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + Annotations: annotations, + Labels: labels, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "1.2.3.4", + }, + } + } + lbSvc := func(name string, annotations map[string]string, class *string) *corev1.Service { + return &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "foo", + Annotations: annotations, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeLoadBalancer, + LoadBalancerClass: class, + ClusterIP: "1.2.3.4", + }, + } + } + + const ( + defaultPCName = "default-proxyclass" + otherPCName = "other-proxyclass" + unreferencedPCName = "unreferenced-proxyclass" + ) + fc := fake.NewClientBuilder(). + WithScheme(tsapi.GlobalScheme). + WithIndex(&corev1.Service{}, indexServiceProxyClass, indexProxyClass). + WithIndex(&corev1.Service{}, indexServiceExposed, indexExposed). + WithIndex(&corev1.Service{}, indexServiceType, indexType). + WithObjects( + svc("not-exposed", nil, nil), + svc("exposed-default", map[string]string{AnnotationExpose: "true"}, nil), + svc("exposed-other", map[string]string{AnnotationExpose: "true", LabelAnnotationProxyClass: otherPCName}, nil), + svc("annotated", map[string]string{LabelAnnotationProxyClass: defaultPCName}, nil), + svc("labelled", nil, map[string]string{LabelAnnotationProxyClass: defaultPCName}), + lbSvc("lb-svc", nil, new("tailscale")), + lbSvc("lb-svc-no-class", nil, nil), + lbSvc("lb-svc-other-class", nil, new("other")), + lbSvc("lb-svc-other-pc", map[string]string{LabelAnnotationProxyClass: otherPCName}, nil), + ). + Build() + + zl := zap.Must(zap.NewDevelopment()) + mapFunc := proxyClassHandlerForSvc(fc, zl.Sugar(), defaultPCName, true) + + for _, tc := range []struct { + name string + proxyClassName string + expected []reconcile.Request + }{ + { + name: "default_ProxyClass", + proxyClassName: defaultPCName, + expected: []reconcile.Request{ + {NamespacedName: types.NamespacedName{Namespace: "default", Name: "exposed-default"}}, + {NamespacedName: types.NamespacedName{Namespace: "default", Name: "annotated"}}, + {NamespacedName: types.NamespacedName{Namespace: "default", Name: "labelled"}}, + {NamespacedName: types.NamespacedName{Namespace: "foo", Name: "lb-svc"}}, + {NamespacedName: types.NamespacedName{Namespace: "foo", Name: "lb-svc-no-class"}}, + }, + }, + { + name: "other_ProxyClass", + proxyClassName: otherPCName, + expected: []reconcile.Request{ + {NamespacedName: types.NamespacedName{Namespace: "default", Name: "exposed-other"}}, + {NamespacedName: types.NamespacedName{Namespace: "foo", Name: "lb-svc-other-pc"}}, + }, + }, + { + name: "unreferenced_ProxyClass", + proxyClassName: unreferencedPCName, + expected: nil, + }, + } { + t.Run(tc.name, func(t *testing.T) { + reqs := mapFunc(t.Context(), &tsapi.ProxyClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: tc.proxyClassName, + }, + }) + if len(reqs) != len(tc.expected) { + t.Fatalf("expected %d requests, got %d: %v", len(tc.expected), len(reqs), reqs) + } + for _, expected := range tc.expected { + if !slices.Contains(reqs, expected) { + t.Errorf("expected request for Service %q not found in results: %v", expected.Name, reqs) + } + } + }) + } +}