cmd/k8s-operator: fix Service reconcile triggers for default ProxyClass (#18983)

The e2e ingress test was very occasionally flaky. On looking at operator
logs from one failure, you can see the default ProxyClass was not ready
before the first reconcile loop for the exposed Service. The ProxyClass
became ready soon after, but no additional reconciles were triggered for
the exposed Service because we only triggered reconciles for Services
that explicitly named their ProxyClass.

This change adds additional list API calls for when it's the default
ProxyClass that's been updated in order to catch Services that use it by
default. It also adds indexes for the fields we need to search on to
ensure the list is efficient.

Fixes tailscale/corp#37533

Signed-off-by: Tom Proctor <tomhjp@users.noreply.github.com>
This commit is contained in:
Tom Proctor
2026-03-13 14:31:16 +00:00
committed by GitHub
parent dd480f0fb9
commit 621f71981c
5 changed files with 331 additions and 90 deletions

View File

@@ -5,7 +5,6 @@
import (
"context"
"encoding/json"
"fmt"
"net/http"
"testing"
@@ -14,10 +13,6 @@
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"
"tailscale.com/cmd/testwrapper/flakytest"
kube "tailscale.com/k8s-operator"
@@ -90,81 +85,20 @@ func TestIngress(t *testing.T) {
}
createAndCleanup(t, kubeClient, svc)
// TODO(tomhjp): Delete once we've reproduced the flake with this extra info.
t0 := time.Now()
watcherCtx, cancelWatcher := context.WithCancel(t.Context())
defer cancelWatcher()
go func() {
// client-go client for logs.
clientGoKubeClient, err := kubernetes.NewForConfig(restCfg)
if err != nil {
t.Logf("error creating client-go Kubernetes client: %v", err)
return
}
for {
select {
case <-watcherCtx.Done():
t.Logf("stopping watcher after %v", time.Since(t0))
return
case <-time.After(time.Minute):
t.Logf("dumping info after %v elapsed", time.Since(t0))
// Service itself.
svc := &corev1.Service{ObjectMeta: objectMeta("default", "test-ingress")}
err := get(watcherCtx, kubeClient, svc)
svcYaml, _ := yaml.Marshal(svc)
t.Logf("Service: %s, error: %v\n%s", svc.Name, err, string(svcYaml))
// Pods in tailscale namespace.
var pods corev1.PodList
if err := kubeClient.List(watcherCtx, &pods, client.InNamespace("tailscale")); err != nil {
t.Logf("error listing Pods in tailscale namespace: %v", err)
} else {
t.Logf("%d Pods", len(pods.Items))
for _, pod := range pods.Items {
podYaml, _ := yaml.Marshal(pod)
t.Logf("Pod: %s\n%s", pod.Name, string(podYaml))
logs := clientGoKubeClient.CoreV1().Pods("tailscale").GetLogs(pod.Name, &corev1.PodLogOptions{}).Do(watcherCtx)
logData, err := logs.Raw()
if err != nil {
t.Logf("error reading logs for Pod %s: %v", pod.Name, err)
continue
}
t.Logf("Logs for Pod %s:\n%s", pod.Name, string(logData))
}
}
// Tailscale status on the tailnet.
lc, err := tnClient.LocalClient()
if err != nil {
t.Logf("error getting tailnet local client: %v", err)
} else {
status, err := lc.Status(watcherCtx)
statusJSON, _ := json.MarshalIndent(status, "", " ")
t.Logf("Tailnet status: %s, error: %v", string(statusJSON), err)
}
}
}
}()
// TODO: instead of timing out only when test times out, cancel context after 60s or so.
if err := wait.PollUntilContextCancel(t.Context(), time.Millisecond*100, true, func(ctx context.Context) (done bool, err error) {
if time.Since(t0) > time.Minute {
t.Logf("%v elapsed waiting for Service default/test-ingress to become Ready", time.Since(t0))
}
if err := tstest.WaitFor(time.Minute, func() error {
maybeReadySvc := &corev1.Service{ObjectMeta: objectMeta("default", "test-ingress")}
if err := get(ctx, kubeClient, maybeReadySvc); err != nil {
return false, err
if err := get(t.Context(), kubeClient, maybeReadySvc); err != nil {
return err
}
isReady := kube.SvcIsReady(maybeReadySvc)
if isReady {
t.Log("Service is ready")
return nil
}
return isReady, nil
return fmt.Errorf("Service is not ready yet")
}); err != nil {
t.Fatalf("error waiting for the Service to become Ready: %v", err)
}
cancelWatcher()
var resp *http.Response
if err := tstest.WaitFor(time.Minute, func() error {

View File

@@ -56,6 +56,7 @@
"tailscale.com/internal/client/tailscale"
"tailscale.com/ipn"
"tailscale.com/ipn/store/mem"
tsoperator "tailscale.com/k8s-operator"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/tsnet"
)
@@ -438,7 +439,7 @@ func runTests(m *testing.M) (int, error) {
return 0, fmt.Errorf("failed to install %q via helm: %w", relName, err)
}
if err := applyDefaultProxyClass(ctx, kubeClient); err != nil {
if err := applyDefaultProxyClass(ctx, logger, kubeClient); err != nil {
return 0, fmt.Errorf("failed to apply default ProxyClass: %w", err)
}
@@ -537,7 +538,7 @@ func tagForRepo(dir string) (string, error) {
return tag, nil
}
func applyDefaultProxyClass(ctx context.Context, cl client.Client) error {
func applyDefaultProxyClass(ctx context.Context, logger *zap.SugaredLogger, cl client.Client) error {
pc := &tsapi.ProxyClass{
TypeMeta: metav1.TypeMeta{
APIVersion: tsapi.SchemeGroupVersion.String(),
@@ -565,6 +566,24 @@ func applyDefaultProxyClass(ctx context.Context, cl client.Client) error {
return fmt.Errorf("failed to apply default ProxyClass: %w", err)
}
// Wait for the ProxyClass to be marked ready.
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
for {
if err := cl.Get(ctx, client.ObjectKeyFromObject(pc), pc); err != nil {
return fmt.Errorf("failed to get default ProxyClass: %w", err)
}
if tsoperator.ProxyClassIsReady(pc) {
break
}
logger.Info("waiting for default ProxyClass to be ready...")
select {
case <-ctx.Done():
return fmt.Errorf("timeout waiting for default ProxyClass to be ready")
case <-time.After(time.Second):
}
}
return nil
}

View File

@@ -77,6 +77,12 @@
// Generate CRD API docs.
//go:generate go run github.com/elastic/crd-ref-docs --renderer=markdown --source-path=../../k8s-operator/apis/ --config=../../k8s-operator/api-docs-config.yaml --output-path=../../k8s-operator/api.md
const (
indexServiceProxyClass = ".metadata.annotations.service-proxy-class"
indexServiceExposed = ".metadata.annotations.service-expose"
indexServiceType = ".metadata.annotations.service-type"
)
func main() {
// Required to use our client API. We're fine with the instability since the
// client lives in the same repo as this code.
@@ -351,7 +357,12 @@ func runReconcilers(opts reconcilerOpts) {
svcChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("svc"))
// If a ProxyClass changes, enqueue all Services labeled with that
// ProxyClass's name.
proxyClassFilterForSvc := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForSvc(mgr.GetClient(), startlog))
proxyClassFilterForSvc := handler.EnqueueRequestsFromMapFunc(proxyClassHandlerForSvc(
mgr.GetClient(),
startlog,
opts.defaultProxyClass,
opts.proxyActAsDefaultLoadBalancer,
))
eventRecorder := mgr.GetEventRecorderFor("tailscale-operator")
ssr := &tailscaleSTSReconciler{
@@ -389,6 +400,18 @@ func runReconcilers(opts reconcilerOpts) {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), new(corev1.Service), indexServiceProxyClass, indexProxyClass); err != nil {
startlog.Fatalf("failed setting up ProxyClass indexer for Services: %v", err)
}
if opts.defaultProxyClass != "" {
// If a default ProxyClass is specified, we'll need to list all objects
// that could be affected. For L3 ingress, this is Services with the
// "tailscale.com/expose" annotation and LoadBalancer services (either
// with the loadBalancerClass "tailscale", or unset if we're the default).
if err := mgr.GetFieldIndexer().IndexField(context.Background(), new(corev1.Service), indexServiceExposed, indexExposed); err != nil {
startlog.Fatalf("failed setting up exposed indexer for Services: %v", err)
}
if err := mgr.GetFieldIndexer().IndexField(context.Background(), new(corev1.Service), indexServiceType, indexType); err != nil {
startlog.Fatalf("failed setting up type indexer for Services: %v", err)
}
}
ingressChildFilter := handler.EnqueueRequestsFromMapFunc(managedResourceHandlerForType("ingress"))
// If a ProxyClassChanges, enqueue all Ingresses labeled with that
@@ -910,10 +933,27 @@ func indexProxyClass(o client.Object) []string {
return []string{o.GetAnnotations()[LabelAnnotationProxyClass]}
}
func indexExposed(o client.Object) []string {
if o.GetAnnotations()[AnnotationExpose] != "true" {
return nil
}
return []string{o.GetAnnotations()[AnnotationExpose]}
}
func indexType(o client.Object) []string {
svc, ok := o.(*corev1.Service)
if !ok {
return nil
}
return []string{string(svc.Spec.Type)}
}
// proxyClassHandlerForSvc returns a handler that, for a given ProxyClass,
// returns a list of reconcile requests for all Services labeled with
// tailscale.com/proxy-class: <proxy class name>.
func proxyClassHandlerForSvc(cl client.Client, logger *zap.SugaredLogger) handler.MapFunc {
func proxyClassHandlerForSvc(cl client.Client, logger *zap.SugaredLogger, defaultProxyClass string, isDefaultLoadBalancer bool) handler.MapFunc {
return func(ctx context.Context, o client.Object) []reconcile.Request {
svcList := new(corev1.ServiceList)
labels := map[string]string{
@@ -932,13 +972,12 @@ func proxyClassHandlerForSvc(cl client.Client, logger *zap.SugaredLogger) handle
seenSvcs.Add(fmt.Sprintf("%s/%s", svc.Namespace, svc.Name))
}
svcAnnotationList := new(corev1.ServiceList)
if err := cl.List(ctx, svcAnnotationList, client.MatchingFields{indexServiceProxyClass: o.GetName()}); err != nil {
if err := cl.List(ctx, svcList, client.MatchingFields{indexServiceProxyClass: o.GetName()}); err != nil {
logger.Debugf("error listing Services for ProxyClass: %v", err)
return nil
}
for _, svc := range svcAnnotationList.Items {
for _, svc := range svcList.Items {
nsname := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name)
if seenSvcs.Contains(nsname) {
continue
@@ -948,6 +987,36 @@ func proxyClassHandlerForSvc(cl client.Client, logger *zap.SugaredLogger) handle
seenSvcs.Add(nsname)
}
if o.GetName() == defaultProxyClass {
// For the default ProxyClass, we also need to reconcile all exposed
// Services that don't have an explicit ProxyClass set.
for _, matcher := range []client.ListOption{
client.MatchingFields{indexServiceExposed: "true"},
client.MatchingFields{indexServiceType: string(corev1.ServiceTypeLoadBalancer)},
} {
if err := cl.List(ctx, svcList, matcher); err != nil {
logger.Debugf("error listing exposed Services for ProxyClass: %v", err)
return nil
}
for _, svc := range svcList.Items {
if hasProxyClassAnnotation(&svc) {
continue
}
if !shouldExpose(&svc, isDefaultLoadBalancer) {
continue
}
nsname := fmt.Sprintf("%s/%s", svc.Namespace, svc.Name)
if seenSvcs.Contains(nsname) {
continue
}
reqs = append(reqs, reconcile.Request{NamespacedName: client.ObjectKeyFromObject(&svc)})
seenSvcs.Add(nsname)
}
}
}
return reqs
}
}

View File

@@ -42,8 +42,6 @@
reasonProxyInvalid = "ProxyInvalid"
reasonProxyFailed = "ProxyFailed"
reasonProxyPending = "ProxyPending"
indexServiceProxyClass = ".metadata.annotations.service-proxy-class"
)
type ServiceReconciler struct {
@@ -97,7 +95,7 @@ func childResourceLabels(name, ns, typ string) map[string]string {
func (a *ServiceReconciler) isTailscaleService(svc *corev1.Service) bool {
targetIP := tailnetTargetAnnotation(svc)
targetFQDN := svc.Annotations[AnnotationTailnetTargetFQDN]
return a.shouldExpose(svc) || targetIP != "" || targetFQDN != ""
return shouldExpose(svc, a.isDefaultLoadBalancer) || targetIP != "" || targetFQDN != ""
}
func (a *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
@@ -164,7 +162,7 @@ func (a *ServiceReconciler) maybeCleanup(ctx context.Context, logger *zap.Sugare
}
proxyTyp := proxyTypeEgress
if a.shouldExpose(svc) {
if shouldExpose(svc, a.isDefaultLoadBalancer) {
proxyTyp = proxyTypeIngressService
}
@@ -275,16 +273,16 @@ func (a *ServiceReconciler) maybeProvision(ctx context.Context, logger *zap.Suga
LoginServer: a.ssr.loginServer,
}
sts.proxyType = proxyTypeEgress
if a.shouldExpose(svc) {
if shouldExpose(svc, a.isDefaultLoadBalancer) {
sts.proxyType = proxyTypeIngressService
}
a.mu.Lock()
if a.shouldExposeClusterIP(svc) {
if shouldExposeClusterIP(svc, a.isDefaultLoadBalancer) {
sts.ClusterTargetIP = svc.Spec.ClusterIP
a.managedIngressProxies.Add(svc.UID)
gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len()))
} else if a.shouldExposeDNSName(svc) {
} else if shouldExposeDNSName(svc) {
sts.ClusterTargetDNSName = svc.Spec.ExternalName
a.managedIngressProxies.Add(svc.UID)
gaugeIngressProxies.Set(int64(a.managedIngressProxies.Len()))
@@ -410,19 +408,19 @@ func validateService(svc *corev1.Service) []string {
return violations
}
func (a *ServiceReconciler) shouldExpose(svc *corev1.Service) bool {
return a.shouldExposeClusterIP(svc) || a.shouldExposeDNSName(svc)
func shouldExpose(svc *corev1.Service, isDefaultLoadBalancer bool) bool {
return shouldExposeClusterIP(svc, isDefaultLoadBalancer) || shouldExposeDNSName(svc)
}
func (a *ServiceReconciler) shouldExposeDNSName(svc *corev1.Service) bool {
func shouldExposeDNSName(svc *corev1.Service) bool {
return hasExposeAnnotation(svc) && svc.Spec.Type == corev1.ServiceTypeExternalName && svc.Spec.ExternalName != ""
}
func (a *ServiceReconciler) shouldExposeClusterIP(svc *corev1.Service) bool {
func shouldExposeClusterIP(svc *corev1.Service, isDefaultLoadBalancer bool) bool {
if svc.Spec.ClusterIP == "" {
return false
}
return isTailscaleLoadBalancerService(svc, a.isDefaultLoadBalancer) || hasExposeAnnotation(svc)
return isTailscaleLoadBalancerService(svc, isDefaultLoadBalancer) || hasExposeAnnotation(svc)
}
func isTailscaleLoadBalancerService(svc *corev1.Service, isDefaultLoadBalancer bool) bool {

View File

@@ -0,0 +1,221 @@
// Copyright (c) Tailscale Inc & contributors
// SPDX-License-Identifier: BSD-3-Clause
//go:build !plan9
package main
import (
"context"
"slices"
"testing"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
"tailscale.com/kube/kubetypes"
"tailscale.com/tstest"
)
func TestService_DefaultProxyClassInitiallyNotReady(t *testing.T) {
pc := &tsapi.ProxyClass{
ObjectMeta: metav1.ObjectMeta{Name: "custom-metadata"},
Spec: tsapi.ProxyClassSpec{
TailscaleConfig: &tsapi.TailscaleConfig{
AcceptRoutes: true,
},
StatefulSet: &tsapi.StatefulSet{
Labels: tsapi.Labels{"foo": "bar"},
Annotations: map[string]string{"bar.io/foo": "some-val"},
Pod: &tsapi.Pod{Annotations: map[string]string{"foo.io/bar": "some-val"}},
},
},
}
fc := fake.NewClientBuilder().
WithScheme(tsapi.GlobalScheme).
WithObjects(pc).
WithStatusSubresource(pc).
Build()
ft := &fakeTSClient{}
zl := zap.Must(zap.NewDevelopment())
clock := tstest.NewClock(tstest.ClockOpts{})
sr := &ServiceReconciler{
Client: fc,
ssr: &tailscaleSTSReconciler{
Client: fc,
tsClient: ft,
defaultTags: []string{"tag:k8s"},
operatorNamespace: "operator-ns",
proxyImage: "tailscale/tailscale",
},
defaultProxyClass: "custom-metadata",
logger: zl.Sugar(),
clock: clock,
}
// 1. A new tailscale LoadBalancer Service is created but the default
// ProxyClass is not ready yet.
mustCreate(t, fc, &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
// The apiserver is supposed to set the UID, but the fake client
// doesn't. So, set it explicitly because other code later depends
// on it being set.
UID: types.UID("1234-UID"),
},
Spec: corev1.ServiceSpec{
ClusterIP: "10.20.30.40",
Type: corev1.ServiceTypeLoadBalancer,
LoadBalancerClass: new("tailscale"),
},
})
expectReconciled(t, sr, "default", "test")
labels := map[string]string{
kubetypes.LabelManaged: "true",
LabelParentName: "test",
LabelParentNamespace: "operator-ns",
LabelParentType: "svc",
}
s, err := getSingleObject[corev1.Secret](context.Background(), fc, "operator-ns", labels)
if err != nil {
t.Fatalf("finding Secret for %q: %v", "test", err)
}
if s != nil {
t.Fatalf("expected no Secret to be created when default ProxyClass is not ready, but found one: %v", s)
}
// 2. ProxyClass is set to Ready, the Service can become ready now.
mustUpdateStatus(t, fc, "", "custom-metadata", func(pc *tsapi.ProxyClass) {
pc.Status = tsapi.ProxyClassStatus{
Conditions: []metav1.Condition{{
Status: metav1.ConditionTrue,
Type: string(tsapi.ProxyClassReady),
ObservedGeneration: pc.Generation,
}},
}
})
expectReconciled(t, sr, "default", "test")
fullName, shortName := findGenName(t, fc, "default", "test", "svc")
opts := configOpts{
replicas: new(int32(1)),
stsName: shortName,
secretName: fullName,
namespace: "default",
parentType: "svc",
hostname: "default-test",
clusterTargetIP: "10.20.30.40",
app: kubetypes.AppIngressProxy,
proxyClass: pc.Name,
}
expectEqual(t, fc, expectedSecret(t, fc, opts))
expectEqual(t, fc, expectedHeadlessService(shortName, "svc"))
expectEqual(t, fc, expectedSTS(t, fc, opts), removeResourceReqs)
}
func TestProxyClassHandlerForSvc(t *testing.T) {
svc := func(name string, annotations, labels map[string]string) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "default",
Annotations: annotations,
Labels: labels,
},
Spec: corev1.ServiceSpec{
ClusterIP: "1.2.3.4",
},
}
}
lbSvc := func(name string, annotations map[string]string, class *string) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: "foo",
Annotations: annotations,
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeLoadBalancer,
LoadBalancerClass: class,
ClusterIP: "1.2.3.4",
},
}
}
const (
defaultPCName = "default-proxyclass"
otherPCName = "other-proxyclass"
unreferencedPCName = "unreferenced-proxyclass"
)
fc := fake.NewClientBuilder().
WithScheme(tsapi.GlobalScheme).
WithIndex(&corev1.Service{}, indexServiceProxyClass, indexProxyClass).
WithIndex(&corev1.Service{}, indexServiceExposed, indexExposed).
WithIndex(&corev1.Service{}, indexServiceType, indexType).
WithObjects(
svc("not-exposed", nil, nil),
svc("exposed-default", map[string]string{AnnotationExpose: "true"}, nil),
svc("exposed-other", map[string]string{AnnotationExpose: "true", LabelAnnotationProxyClass: otherPCName}, nil),
svc("annotated", map[string]string{LabelAnnotationProxyClass: defaultPCName}, nil),
svc("labelled", nil, map[string]string{LabelAnnotationProxyClass: defaultPCName}),
lbSvc("lb-svc", nil, new("tailscale")),
lbSvc("lb-svc-no-class", nil, nil),
lbSvc("lb-svc-other-class", nil, new("other")),
lbSvc("lb-svc-other-pc", map[string]string{LabelAnnotationProxyClass: otherPCName}, nil),
).
Build()
zl := zap.Must(zap.NewDevelopment())
mapFunc := proxyClassHandlerForSvc(fc, zl.Sugar(), defaultPCName, true)
for _, tc := range []struct {
name string
proxyClassName string
expected []reconcile.Request
}{
{
name: "default_ProxyClass",
proxyClassName: defaultPCName,
expected: []reconcile.Request{
{NamespacedName: types.NamespacedName{Namespace: "default", Name: "exposed-default"}},
{NamespacedName: types.NamespacedName{Namespace: "default", Name: "annotated"}},
{NamespacedName: types.NamespacedName{Namespace: "default", Name: "labelled"}},
{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "lb-svc"}},
{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "lb-svc-no-class"}},
},
},
{
name: "other_ProxyClass",
proxyClassName: otherPCName,
expected: []reconcile.Request{
{NamespacedName: types.NamespacedName{Namespace: "default", Name: "exposed-other"}},
{NamespacedName: types.NamespacedName{Namespace: "foo", Name: "lb-svc-other-pc"}},
},
},
{
name: "unreferenced_ProxyClass",
proxyClassName: unreferencedPCName,
expected: nil,
},
} {
t.Run(tc.name, func(t *testing.T) {
reqs := mapFunc(t.Context(), &tsapi.ProxyClass{
ObjectMeta: metav1.ObjectMeta{
Name: tc.proxyClassName,
},
})
if len(reqs) != len(tc.expected) {
t.Fatalf("expected %d requests, got %d: %v", len(tc.expected), len(reqs), reqs)
}
for _, expected := range tc.expected {
if !slices.Contains(reqs, expected) {
t.Errorf("expected request for Service %q not found in results: %v", expected.Name, reqs)
}
}
})
}
}