|
| 1 | +package watchdog |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "strings" |
| 7 | + |
| 8 | + "github.com/pkg/errors" |
| 9 | + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" |
| 10 | + k8serrors "k8s.io/apimachinery/pkg/api/errors" |
| 11 | + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
| 12 | + "k8s.io/apimachinery/pkg/util/sets" |
| 13 | + ctrl "sigs.k8s.io/controller-runtime" |
| 14 | + "sigs.k8s.io/controller-runtime/pkg/builder" |
| 15 | + "sigs.k8s.io/controller-runtime/pkg/client" |
| 16 | + "sigs.k8s.io/controller-runtime/pkg/event" |
| 17 | + "sigs.k8s.io/controller-runtime/pkg/handler" |
| 18 | + "sigs.k8s.io/controller-runtime/pkg/manager" |
| 19 | + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" |
| 20 | + "sigs.k8s.io/controller-runtime/pkg/predicate" |
| 21 | + "sigs.k8s.io/controller-runtime/pkg/reconcile" |
| 22 | + "sigs.k8s.io/controller-runtime/pkg/source" |
| 23 | +) |
| 24 | + |
| 25 | +type resourceManager struct { |
| 26 | + cancelFn context.CancelFunc |
| 27 | + watchedVersions sets.Set[string] |
| 28 | +} |
| 29 | + |
| 30 | +type watchMap map[string]resourceManager |
| 31 | + |
| 32 | +type CRDWatcher struct { |
| 33 | + Client client.Client |
| 34 | + watchMap watchMap |
| 35 | + requeue chan event.GenericEvent |
| 36 | +} |
| 37 | + |
| 38 | +func (c *CRDWatcher) keyFunction(group, kind string) string { |
| 39 | + return fmt.Sprintf("%s-%s", group, kind) |
| 40 | +} |
| 41 | + |
| 42 | +func (c *CRDWatcher) register(ctx context.Context, group string, versions []string, kind string) error { |
| 43 | + mgr, _ := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ |
| 44 | + Scheme: c.Client.Scheme(), |
| 45 | + Metrics: metricsserver.Options{ |
| 46 | + BindAddress: "0", |
| 47 | + }, |
| 48 | + }) |
| 49 | + |
| 50 | + watchedVersions := sets.New[string]() |
| 51 | + |
| 52 | + for _, v := range versions { |
| 53 | + watchedVersions.Insert(v) |
| 54 | + |
| 55 | + gvk := metav1.GroupVersionKind{ |
| 56 | + Group: group, |
| 57 | + Version: v, |
| 58 | + Kind: kind, |
| 59 | + } |
| 60 | + //nolint:contextcheck |
| 61 | + if err := (&NamespacedWatcher{Client: c.Client}).SetupWithManager(mgr, gvk); err != nil { |
| 62 | + return err |
| 63 | + } |
| 64 | + } |
| 65 | + |
| 66 | + scopedCtx, scopedCancelFn := context.WithCancel(ctx) |
| 67 | + |
| 68 | + go func() { |
| 69 | + if err := mgr.Start(scopedCtx); err != nil { |
| 70 | + scopedCancelFn() |
| 71 | + } |
| 72 | + }() |
| 73 | + |
| 74 | + c.watchMap[c.keyFunction(group, kind)] = resourceManager{ |
| 75 | + cancelFn: scopedCancelFn, |
| 76 | + watchedVersions: watchedVersions, |
| 77 | + } |
| 78 | + |
| 79 | + return nil |
| 80 | +} |
| 81 | + |
| 82 | +func (c *CRDWatcher) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { |
| 83 | + crd := apiextensionsv1.CustomResourceDefinition{} |
| 84 | + if err := c.Client.Get(ctx, request.NamespacedName, &crd); err != nil { |
| 85 | + if k8serrors.IsNotFound(err) { |
| 86 | + return reconcile.Result{}, nil |
| 87 | + } |
| 88 | + |
| 89 | + return reconcile.Result{}, err |
| 90 | + } |
| 91 | + |
| 92 | + key := c.keyFunction(crd.Spec.Group, crd.Spec.Names.Kind) |
| 93 | + |
| 94 | + resourceMgr, found := c.watchMap[key] |
| 95 | + if !found && crd.DeletionTimestamp != nil { |
| 96 | + return reconcile.Result{}, nil |
| 97 | + } |
| 98 | + |
| 99 | + if !found && crd.DeletionTimestamp == nil { |
| 100 | + versions := make([]string, 0, len(crd.Spec.Versions)) |
| 101 | + |
| 102 | + for _, v := range crd.Spec.Versions { |
| 103 | + versions = append(versions, v.Name) |
| 104 | + } |
| 105 | + |
| 106 | + if err := c.register(ctx, crd.Spec.Group, versions, crd.Spec.Names.Kind); err != nil { |
| 107 | + return reconcile.Result{}, err |
| 108 | + } |
| 109 | + |
| 110 | + resourceMgr = c.watchMap[key] |
| 111 | + } |
| 112 | + |
| 113 | + if crd.DeletionTimestamp != nil { |
| 114 | + resourceMgr.cancelFn() |
| 115 | + delete(c.watchMap, key) |
| 116 | + |
| 117 | + return reconcile.Result{}, nil |
| 118 | + } |
| 119 | + |
| 120 | + for _, v := range crd.Spec.Versions { |
| 121 | + if !resourceMgr.watchedVersions.Has(v.Name) { |
| 122 | + resourceMgr.cancelFn() |
| 123 | + delete(c.watchMap, key) |
| 124 | + |
| 125 | + return reconcile.Result{Requeue: true}, nil |
| 126 | + } |
| 127 | + } |
| 128 | + |
| 129 | + return reconcile.Result{}, nil |
| 130 | +} |
| 131 | + |
| 132 | +func (c *CRDWatcher) SetupWithManager(ctx context.Context, mgr manager.Manager) error { |
| 133 | + c.watchMap = make(map[string]resourceManager) |
| 134 | + c.requeue = make(chan event.GenericEvent) |
| 135 | + |
| 136 | + apis, err := API(mgr.GetConfig()) |
| 137 | + if err != nil { |
| 138 | + return err |
| 139 | + } |
| 140 | + |
| 141 | + bundleGroupAndKind := map[string]sets.Set[string]{} |
| 142 | + |
| 143 | + for _, api := range apis { |
| 144 | + slashedName := fmt.Sprintf("%s/%s", api.Group, api.Kind) |
| 145 | + |
| 146 | + if _, ok := bundleGroupAndKind[slashedName]; !ok { |
| 147 | + bundleGroupAndKind[slashedName] = sets.Set[string]{} |
| 148 | + } |
| 149 | + |
| 150 | + bundleGroupAndKind[slashedName].Insert(api.Version) |
| 151 | + } |
| 152 | + |
| 153 | + for group, versions := range bundleGroupAndKind { |
| 154 | + parts := strings.Split(group, "/") |
| 155 | + |
| 156 | + apiGroup, apiKind := parts[0], parts[1] |
| 157 | + |
| 158 | + if registerErr := c.register(ctx, apiGroup, versions.UnsortedList(), apiKind); registerErr != nil { |
| 159 | + return errors.Wrap(err, "cannot register watcher prior to start-up") |
| 160 | + } |
| 161 | + } |
| 162 | + |
| 163 | + return ctrl.NewControllerManagedBy(mgr). |
| 164 | + WatchesRawSource(&source.Channel{Source: c.requeue}, &handler.EnqueueRequestForObject{}). |
| 165 | + For(&apiextensionsv1.CustomResourceDefinition{}, builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool { |
| 166 | + crd := object.(*apiextensionsv1.CustomResourceDefinition) |
| 167 | + |
| 168 | + return crd.Spec.Scope == apiextensionsv1.NamespaceScoped |
| 169 | + }))). |
| 170 | + Complete(c) |
| 171 | +} |
0 commit comments