Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 79117b2

Browse files
committedFeb 19, 2025··
implement priority queue for handlers
Signed-off-by: Troy Connor <troy0820@users.noreply.github.com>
1 parent ba53477 commit 79117b2

File tree

5 files changed

+214
-9
lines changed

5 files changed

+214
-9
lines changed
 

‎pkg/handler/enqueue.go

+29-6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"k8s.io/apimachinery/pkg/types"
2424
"k8s.io/client-go/util/workqueue"
2525
"sigs.k8s.io/controller-runtime/pkg/client"
26+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2627
"sigs.k8s.io/controller-runtime/pkg/event"
2728
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
2829
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -52,25 +53,47 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
5253
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
5354
return
5455
}
55-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
56+
57+
item := reconcile.Request{NamespacedName: types.NamespacedName{
5658
Name: evt.Object.GetName(),
5759
Namespace: evt.Object.GetNamespace(),
58-
}})
60+
}}
61+
62+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
63+
if !isPriorityQueue {
64+
q.Add(item)
65+
return
66+
}
67+
addToPriorityQueueCreate(priorityQueue, evt, item)
5968
}
6069

6170
// Update implements EventHandler.
6271
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
6372
switch {
6473
case !isNil(evt.ObjectNew):
65-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
74+
item := reconcile.Request{NamespacedName: types.NamespacedName{
6675
Name: evt.ObjectNew.GetName(),
6776
Namespace: evt.ObjectNew.GetNamespace(),
68-
}})
77+
}}
78+
79+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
80+
if !isPriorityQueue {
81+
q.Add(item)
82+
return
83+
}
84+
addToPriorityQueueUpdate(priorityQueue, evt, item)
6985
case !isNil(evt.ObjectOld):
70-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
86+
item := reconcile.Request{NamespacedName: types.NamespacedName{
7187
Name: evt.ObjectOld.GetName(),
7288
Namespace: evt.ObjectOld.GetNamespace(),
73-
}})
89+
}}
90+
91+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
92+
if !isPriorityQueue {
93+
q.Add(item)
94+
return
95+
}
96+
addToPriorityQueueUpdate(priorityQueue, evt, item)
7497
default:
7598
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
7699
}

‎pkg/handler/enqueue_mapped.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type TypedMapFunc[object any, request comparable] func(context.Context, object)
4646
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
4747
// objects and both sets of Requests are enqueue.
4848
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
49-
return TypedEnqueueRequestsFromMapFunc(fn)
49+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestsFromMapFunc(fn))
5050
}
5151

5252
// TypedEnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection

‎pkg/handler/enqueue_owner.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type OwnerOption func(e enqueueRequestForOwnerInterface)
4848
//
4949
// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true.
5050
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler {
51-
return TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...)
51+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...))
5252
}
5353

5454
// TypedEnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created

‎pkg/handler/eventhandler.go

+62
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,27 @@ type TypedFuncs[object any, request comparable] struct {
111111
// Create implements EventHandler.
112112
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
113113
if h.CreateFunc != nil {
114+
h.CreateFunc = func(ctx context.Context, e event.TypedCreateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
115+
h.Create(ctx, e, workqueueWithCustomAddFunc[request]{
116+
TypedRateLimitingInterface: trli,
117+
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
118+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
119+
if !isPriorityQueue {
120+
q.Add(item)
121+
return
122+
}
123+
var priority int
124+
obj, ok := any(e).(event.TypedCreateEvent[client.Object])
125+
if !ok {
126+
return
127+
}
128+
if isObjectUnchanged(obj) {
129+
priority = LowPriority
130+
}
131+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
132+
},
133+
})
134+
}
114135
h.CreateFunc(ctx, e, q)
115136
}
116137
}
@@ -125,6 +146,27 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe
125146
// Update implements EventHandler.
126147
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
127148
if h.UpdateFunc != nil {
149+
h.UpdateFunc = func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
150+
h.Update(ctx, e, workqueueWithCustomAddFunc[request]{
151+
TypedRateLimitingInterface: trli,
152+
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
153+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
154+
if !isPriorityQueue {
155+
q.Add(item)
156+
return
157+
}
158+
var priority int
159+
obj, ok := any(tue).(event.TypedUpdateEvent[client.Object])
160+
if !ok {
161+
return
162+
}
163+
if obj.ObjectOld.GetResourceVersion() == obj.ObjectNew.GetResourceVersion() {
164+
priority = LowPriority
165+
}
166+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
167+
},
168+
})
169+
}
128170
h.UpdateFunc(ctx, e, q)
129171
}
130172
}
@@ -199,3 +241,23 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
199241
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
200242
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
201243
}
244+
245+
// addToPriorityQueueCreate adds the reconcile.Request to the priorityqueue in the handler
246+
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
247+
func addToPriorityQueueCreate[T client.Object](q priorityqueue.PriorityQueue[reconcile.Request], evt event.TypedCreateEvent[T], item reconcile.Request) {
248+
var priority int
249+
if isObjectUnchanged(evt) {
250+
priority = LowPriority
251+
}
252+
q.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
253+
}
254+
255+
// addToPriorityQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
256+
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
257+
func addToPriorityQueueUpdate[T client.Object](q priorityqueue.PriorityQueue[reconcile.Request], evt event.TypedUpdateEvent[T], item reconcile.Request) {
258+
var priority int
259+
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
260+
priority = LowPriority
261+
}
262+
q.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
263+
}

‎pkg/handler/eventhandler_test.go

+121-1
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,27 @@ var _ = Describe("Eventhandler", func() {
797797
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
798798
})
799799

800+
It("should lower the priority of a create request for an object that was created more than one minute in the past without the WithLowPriorityWrapper", func() {
801+
actualOpts := priorityqueue.AddOpts{}
802+
var actualRequests []reconcile.Request
803+
wq := &fakePriorityQueue{
804+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
805+
actualOpts = o
806+
actualRequests = items
807+
},
808+
}
809+
810+
h := &handler.EnqueueRequestForObject{}
811+
h.Create(ctx, event.CreateEvent{
812+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
813+
Name: "my-pod",
814+
}},
815+
}, wq)
816+
817+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
818+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
819+
})
820+
800821
It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
801822
actualOpts := priorityqueue.AddOpts{}
802823
var actualRequests []reconcile.Request
@@ -819,6 +840,28 @@ var _ = Describe("Eventhandler", func() {
819840
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
820841
})
821842

843+
It("should not lower the priority of a create request for an object that was created less than one minute in the past without the WithLowPriority wrapperr", func() {
844+
actualOpts := priorityqueue.AddOpts{}
845+
var actualRequests []reconcile.Request
846+
wq := &fakePriorityQueue{
847+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
848+
actualOpts = o
849+
actualRequests = items
850+
},
851+
}
852+
853+
h := &handler.EnqueueRequestForObject{}
854+
h.Create(ctx, event.CreateEvent{
855+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
856+
Name: "my-pod",
857+
CreationTimestamp: metav1.Now(),
858+
}},
859+
}, wq)
860+
861+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
862+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
863+
})
864+
822865
It("should lower the priority of an update request with unchanged RV", func() {
823866
actualOpts := priorityqueue.AddOpts{}
824867
var actualRequests []reconcile.Request
@@ -843,6 +886,30 @@ var _ = Describe("Eventhandler", func() {
843886
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
844887
})
845888

889+
It("should lower the priority of an update request with unchanged RV without the WithLowPriority wrapper", func() {
890+
actualOpts := priorityqueue.AddOpts{}
891+
var actualRequests []reconcile.Request
892+
wq := &fakePriorityQueue{
893+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
894+
actualOpts = o
895+
actualRequests = items
896+
},
897+
}
898+
899+
h := &handler.EnqueueRequestForObject{}
900+
h.Update(ctx, event.UpdateEvent{
901+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
902+
Name: "my-pod",
903+
}},
904+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
905+
Name: "my-pod",
906+
}},
907+
}, wq)
908+
909+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
910+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
911+
})
912+
846913
It("should not lower the priority of an update request with changed RV", func() {
847914
actualOpts := priorityqueue.AddOpts{}
848915
var actualRequests []reconcile.Request
@@ -868,6 +935,31 @@ var _ = Describe("Eventhandler", func() {
868935
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
869936
})
870937

938+
It("should not lower the priority of an update request with changed RV without the WithLowPriority wrapper", func() {
939+
actualOpts := priorityqueue.AddOpts{}
940+
var actualRequests []reconcile.Request
941+
wq := &fakePriorityQueue{
942+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
943+
actualOpts = o
944+
actualRequests = items
945+
},
946+
}
947+
948+
h := &handler.EnqueueRequestForObject{}
949+
h.Update(ctx, event.UpdateEvent{
950+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
951+
Name: "my-pod",
952+
}},
953+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
954+
Name: "my-pod",
955+
ResourceVersion: "1",
956+
}},
957+
}, wq)
958+
959+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
960+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
961+
})
962+
871963
It("should have no effect on create if the workqueue is not a priorityqueue", func() {
872964
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
873965
h.Create(ctx, event.CreateEvent{
@@ -881,6 +973,19 @@ var _ = Describe("Eventhandler", func() {
881973
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
882974
})
883975

976+
It("should have no effect on create if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
977+
h := &handler.EnqueueRequestForObject{}
978+
h.Create(ctx, event.CreateEvent{
979+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
980+
Name: "my-pod",
981+
}},
982+
}, q)
983+
984+
Expect(q.Len()).To(Equal(1))
985+
item, _ := q.Get()
986+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
987+
})
988+
884989
It("should have no effect on Update if the workqueue is not a priorityqueue", func() {
885990
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
886991
h.Update(ctx, event.UpdateEvent{
@@ -896,8 +1001,23 @@ var _ = Describe("Eventhandler", func() {
8961001
item, _ := q.Get()
8971002
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
8981003
})
899-
})
9001004

1005+
It("should have no effect on Update if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
1006+
h := &handler.EnqueueRequestForObject{}
1007+
h.Update(ctx, event.UpdateEvent{
1008+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1009+
Name: "my-pod",
1010+
}},
1011+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1012+
Name: "my-pod",
1013+
}},
1014+
}, q)
1015+
1016+
Expect(q.Len()).To(Equal(1))
1017+
item, _ := q.Get()
1018+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
1019+
})
1020+
})
9011021
})
9021022

9031023
type fakePriorityQueue struct {

0 commit comments

Comments
 (0)
Please sign in to comment.