@@ -35,11 +35,13 @@ import (
35
35
"k8s.io/utils/clock"
36
36
"k8s.io/utils/ptr"
37
37
ctrl "sigs.k8s.io/controller-runtime"
38
+ "sigs.k8s.io/controller-runtime/pkg/builder"
38
39
"sigs.k8s.io/controller-runtime/pkg/client"
39
40
"sigs.k8s.io/controller-runtime/pkg/controller"
40
41
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
41
42
"sigs.k8s.io/controller-runtime/pkg/event"
42
43
"sigs.k8s.io/controller-runtime/pkg/handler"
44
+ "sigs.k8s.io/controller-runtime/pkg/predicate"
43
45
"sigs.k8s.io/controller-runtime/pkg/reconcile"
44
46
"sigs.k8s.io/controller-runtime/pkg/source"
45
47
@@ -79,6 +81,9 @@ type ClusterQueueReconciler struct {
79
81
clock clock.Clock
80
82
}
81
83
84
+ var _ reconcile.Reconciler = (* ClusterQueueReconciler )(nil )
85
+ var _ predicate.TypedPredicate [* kueue.ClusterQueue ] = (* ClusterQueueReconciler )(nil )
86
+
82
87
type ClusterQueueReconcilerOptions struct {
83
88
Watchers []ClusterQueueUpdateWatcher
84
89
ReportResourceMetrics bool
@@ -311,87 +316,66 @@ func (r *ClusterQueueReconciler) NotifyAdmissionCheckUpdate(oldAc, newAc *kueue.
311
316
// Event handlers return true to signal the controller to reconcile the
312
317
// ClusterQueue associated with the event.
313
318
314
- func (r * ClusterQueueReconciler ) Create (e event.CreateEvent ) bool {
315
- cq , match := e .Object .(* kueue.ClusterQueue )
316
- if ! match {
317
- // No need to interact with the cache for other objects.
318
- return true
319
- }
320
- defer r .notifyWatchers (nil , cq )
319
+ func (r * ClusterQueueReconciler ) Create (e event.TypedCreateEvent [* kueue.ClusterQueue ]) bool {
320
+ defer r .notifyWatchers (nil , e .Object )
321
321
322
- log := r .log .WithValues ("clusterQueue" , klog .KObj (cq ))
322
+ log := r .log .WithValues ("clusterQueue" , klog .KObj (e . Object ))
323
323
log .V (2 ).Info ("ClusterQueue create event" )
324
324
ctx := ctrl .LoggerInto (context .Background (), log )
325
- if err := r .cache .AddClusterQueue (ctx , cq ); err != nil {
325
+ if err := r .cache .AddClusterQueue (ctx , e . Object ); err != nil {
326
326
log .Error (err , "Failed to add clusterQueue to cache" )
327
327
}
328
328
329
- if err := r .qManager .AddClusterQueue (ctx , cq ); err != nil {
329
+ if err := r .qManager .AddClusterQueue (ctx , e . Object ); err != nil {
330
330
log .Error (err , "Failed to add clusterQueue to queue manager" )
331
331
}
332
332
333
333
if r .reportResourceMetrics {
334
- recordResourceMetrics (cq )
334
+ recordResourceMetrics (e . Object )
335
335
}
336
336
337
337
return true
338
338
}
339
339
340
- func (r * ClusterQueueReconciler ) Delete (e event.DeleteEvent ) bool {
341
- cq , match := e .Object .(* kueue.ClusterQueue )
342
- if ! match {
343
- // No need to interact with the cache for other objects.
344
- return true
345
- }
346
- defer r .notifyWatchers (cq , nil )
340
+ func (r * ClusterQueueReconciler ) Delete (e event.TypedDeleteEvent [* kueue.ClusterQueue ]) bool {
341
+ defer r .notifyWatchers (e .Object , nil )
347
342
348
- r .log .V (2 ).Info ("ClusterQueue delete event" , "clusterQueue" , klog .KObj (cq ))
349
- r .cache .DeleteClusterQueue (cq )
350
- r .qManager .DeleteClusterQueue (cq )
351
- r .qManager .DeleteSnapshot (cq )
343
+ r .log .V (2 ).Info ("ClusterQueue delete event" , "clusterQueue" , klog .KObj (e . Object ))
344
+ r .cache .DeleteClusterQueue (e . Object )
345
+ r .qManager .DeleteClusterQueue (e . Object )
346
+ r .qManager .DeleteSnapshot (e . Object )
352
347
353
- metrics .ClearClusterQueueResourceMetrics (cq .Name )
354
- r .log .V (2 ).Info ("Cleared resource metrics for deleted ClusterQueue." , "clusterQueue" , klog .KObj (cq ))
348
+ metrics .ClearClusterQueueResourceMetrics (e . Object .Name )
349
+ r .log .V (2 ).Info ("Cleared resource metrics for deleted ClusterQueue." , "clusterQueue" , klog .KObj (e . Object ))
355
350
356
351
return true
357
352
}
358
353
359
- func (r * ClusterQueueReconciler ) Update (e event.UpdateEvent ) bool {
360
- oldCq , match := e .ObjectOld .(* kueue.ClusterQueue )
361
- if ! match {
362
- // No need to interact with the cache for other objects.
363
- return true
364
- }
365
- newCq , match := e .ObjectNew .(* kueue.ClusterQueue )
366
- if ! match {
367
- // No need to interact with the cache for other objects.
368
- return true
369
- }
370
-
371
- log := r .log .WithValues ("clusterQueue" , klog .KObj (newCq ))
354
+ func (r * ClusterQueueReconciler ) Update (e event.TypedUpdateEvent [* kueue.ClusterQueue ]) bool {
355
+ log := r .log .WithValues ("clusterQueue" , klog .KObj (e .ObjectNew ))
372
356
log .V (2 ).Info ("ClusterQueue update event" )
373
357
374
- if newCq .DeletionTimestamp != nil {
358
+ if e . ObjectNew .DeletionTimestamp != nil {
375
359
return true
376
360
}
377
- defer r .notifyWatchers (oldCq , newCq )
378
- specUpdated := ! equality .Semantic .DeepEqual (oldCq . Spec , newCq .Spec )
361
+ defer r .notifyWatchers (e . ObjectOld , e . ObjectNew )
362
+ specUpdated := ! equality .Semantic .DeepEqual (e . ObjectOld . Spec , e . ObjectNew .Spec )
379
363
380
- if err := r .cache .UpdateClusterQueue (newCq ); err != nil {
364
+ if err := r .cache .UpdateClusterQueue (e . ObjectNew ); err != nil {
381
365
log .Error (err , "Failed to update clusterQueue in cache" )
382
366
}
383
- if err := r .qManager .UpdateClusterQueue (context .Background (), newCq , specUpdated ); err != nil {
367
+ if err := r .qManager .UpdateClusterQueue (context .Background (), e . ObjectNew , specUpdated ); err != nil {
384
368
log .Error (err , "Failed to update clusterQueue in queue manager" )
385
369
}
386
370
387
371
if r .reportResourceMetrics {
388
- updateResourceMetrics (oldCq , newCq )
372
+ updateResourceMetrics (e . ObjectOld , e . ObjectNew )
389
373
}
390
374
return true
391
375
}
392
376
393
- func (r * ClusterQueueReconciler ) Generic (e event.GenericEvent ) bool {
394
- r .log .V (2 ).Info ("Got generic event" , "obj " , klog .KObj (e .Object ), "kind" , e . Object . GetObjectKind (). GroupVersionKind ( ))
377
+ func (r * ClusterQueueReconciler ) Generic (e event.TypedGenericEvent [ * kueue. ClusterQueue ] ) bool {
378
+ r .log .V (3 ).Info ("Got ClusterQueue generic event" , "clusterQueue " , klog .KObj (e .Object ))
395
379
return true
396
380
}
397
381
@@ -595,13 +579,18 @@ func (r *ClusterQueueReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.
595
579
snapHandler := cqSnapshotHandler {
596
580
queueVisibilityUpdateInterval : r .queueVisibilityUpdateInterval ,
597
581
}
598
- return ctrl .NewControllerManagedBy (mgr ).
599
- For (& kueue.ClusterQueue {}).
582
+ return builder.TypedControllerManagedBy [reconcile.Request ](mgr ).
583
+ Named ("clusterqueue_controller" ).
584
+ WatchesRawSource (source .TypedKind (
585
+ mgr .GetCache (),
586
+ & kueue.ClusterQueue {},
587
+ & handler.TypedEnqueueRequestForObject [* kueue.ClusterQueue ]{},
588
+ r ,
589
+ )).
600
590
WithOptions (controller.Options {NeedLeaderElection : ptr .To (false )}).
601
591
Watches (& corev1.Namespace {}, & nsHandler ).
602
592
WatchesRawSource (source .Channel (r .snapUpdateCh , & snapHandler )).
603
593
WatchesRawSource (source .Channel (r .nonCQObjectUpdateCh , & nonCQObjectHandler {})).
604
- WithEventFilter (r ).
605
594
Complete (WithLeadingManager (mgr , r , & kueue.ClusterQueue {}, cfg ))
606
595
}
607
596
0 commit comments