Skip to content

Commit f80bc5d

Browse files
authored
🐛Implement priorityqueue as default on handlers if using priorityqueue interface (#3111)
* implement priority queue for handlers Signed-off-by: Troy Connor <[email protected]> * check object before placing in priorityqueue Signed-off-by: Troy Connor <[email protected]> * implement priority queue Signed-off-by: Troy Connor <[email protected]> --------- Signed-off-by: Troy Connor <[email protected]>
1 parent 0354f47 commit f80bc5d

File tree

4 files changed

+171
-30
lines changed

4 files changed

+171
-30
lines changed

pkg/handler/enqueue.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,32 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
5252
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
5353
return
5454
}
55-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
55+
56+
item := reconcile.Request{NamespacedName: types.NamespacedName{
5657
Name: evt.Object.GetName(),
5758
Namespace: evt.Object.GetNamespace(),
58-
}})
59+
}}
60+
61+
addToQueueCreate(q, evt, item)
5962
}
6063

6164
// Update implements EventHandler.
6265
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
6366
switch {
6467
case !isNil(evt.ObjectNew):
65-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
68+
item := reconcile.Request{NamespacedName: types.NamespacedName{
6669
Name: evt.ObjectNew.GetName(),
6770
Namespace: evt.ObjectNew.GetNamespace(),
68-
}})
71+
}}
72+
73+
addToQueueUpdate(q, evt, item)
6974
case !isNil(evt.ObjectOld):
70-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
75+
item := reconcile.Request{NamespacedName: types.NamespacedName{
7176
Name: evt.ObjectOld.GetName(),
7277
Namespace: evt.ObjectOld.GetNamespace(),
73-
}})
78+
}}
79+
80+
addToQueueUpdate(q, evt, item)
7481
default:
7582
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
7683
}

pkg/handler/enqueue_owner.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type OwnerOption func(e enqueueRequestForOwnerInterface)
4848
//
4949
// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true.
5050
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler {
51-
return TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...)
51+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...))
5252
}
5353

5454
// TypedEnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created

pkg/handler/eventhandler.go

+35-21
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ type EventHandler = TypedEventHandler[client.Object, reconcile.Request]
6565
//
6666
// Unless you are implementing your own TypedEventHandler, you can ignore the functions on the TypedEventHandler interface.
6767
// Most users shouldn't need to implement their own TypedEventHandler.
68-
//
68+
6969
// TypedEventHandler is experimental and subject to future change.
7070
type TypedEventHandler[object any, request comparable] interface {
7171
// Create is called in response to a create event - e.g. Pod Creation.
@@ -149,33 +149,15 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
149149
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
150150
TypedRateLimitingInterface: trli,
151151
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
152-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
153-
if !isPriorityQueue {
154-
q.Add(item)
155-
return
156-
}
157-
var priority int
158-
if isObjectUnchanged(tce) {
159-
priority = LowPriority
160-
}
161-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
152+
addToQueueCreate(q, tce, item)
162153
},
163154
})
164155
},
165156
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
166157
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
167158
TypedRateLimitingInterface: trli,
168159
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
169-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
170-
if !isPriorityQueue {
171-
q.Add(item)
172-
return
173-
}
174-
var priority int
175-
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
176-
priority = LowPriority
177-
}
178-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
160+
addToQueueUpdate(q, tue, item)
179161
},
180162
})
181163
},
@@ -199,3 +181,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
199181
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
200182
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
201183
}
184+
185+
// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
186+
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
187+
func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) {
188+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
189+
if !isPriorityQueue {
190+
q.Add(item)
191+
return
192+
}
193+
194+
var priority int
195+
if isObjectUnchanged(evt) {
196+
priority = LowPriority
197+
}
198+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
199+
}
200+
201+
// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
202+
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
203+
func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) {
204+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
205+
if !isPriorityQueue {
206+
q.Add(item)
207+
return
208+
}
209+
210+
var priority int
211+
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
212+
priority = LowPriority
213+
}
214+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
215+
}

pkg/handler/eventhandler_test.go

+122-2
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ var _ = Describe("Eventhandler", func() {
659659
})
660660

661661
Describe("Funcs", func() {
662-
failingFuncs := handler.Funcs{
662+
failingFuncs := handler.TypedFuncs[client.Object, reconcile.Request]{
663663
CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
664664
defer GinkgoRecover()
665665
Fail("Did not expect CreateEvent to be called.")
@@ -797,6 +797,27 @@ var _ = Describe("Eventhandler", func() {
797797
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
798798
})
799799

800+
It("should lower the priority of a create request for an object that was created more than one minute in the past without the WithLowPriorityWrapper", func() {
801+
actualOpts := priorityqueue.AddOpts{}
802+
var actualRequests []reconcile.Request
803+
wq := &fakePriorityQueue{
804+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
805+
actualOpts = o
806+
actualRequests = items
807+
},
808+
}
809+
810+
h := &handler.EnqueueRequestForObject{}
811+
h.Create(ctx, event.CreateEvent{
812+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
813+
Name: "my-pod",
814+
}},
815+
}, wq)
816+
817+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
818+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
819+
})
820+
800821
It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
801822
actualOpts := priorityqueue.AddOpts{}
802823
var actualRequests []reconcile.Request
@@ -819,6 +840,28 @@ var _ = Describe("Eventhandler", func() {
819840
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
820841
})
821842

843+
It("should not lower the priority of a create request for an object that was created less than one minute in the past without the WithLowPriority wrapperr", func() {
844+
actualOpts := priorityqueue.AddOpts{}
845+
var actualRequests []reconcile.Request
846+
wq := &fakePriorityQueue{
847+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
848+
actualOpts = o
849+
actualRequests = items
850+
},
851+
}
852+
853+
h := &handler.EnqueueRequestForObject{}
854+
h.Create(ctx, event.CreateEvent{
855+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
856+
Name: "my-pod",
857+
CreationTimestamp: metav1.Now(),
858+
}},
859+
}, wq)
860+
861+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
862+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
863+
})
864+
822865
It("should lower the priority of an update request with unchanged RV", func() {
823866
actualOpts := priorityqueue.AddOpts{}
824867
var actualRequests []reconcile.Request
@@ -843,6 +886,30 @@ var _ = Describe("Eventhandler", func() {
843886
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
844887
})
845888

889+
It("should lower the priority of an update request with unchanged RV without the WithLowPriority wrapper", func() {
890+
actualOpts := priorityqueue.AddOpts{}
891+
var actualRequests []reconcile.Request
892+
wq := &fakePriorityQueue{
893+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
894+
actualOpts = o
895+
actualRequests = items
896+
},
897+
}
898+
899+
h := &handler.EnqueueRequestForObject{}
900+
h.Update(ctx, event.UpdateEvent{
901+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
902+
Name: "my-pod",
903+
}},
904+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
905+
Name: "my-pod",
906+
}},
907+
}, wq)
908+
909+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
910+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
911+
})
912+
846913
It("should not lower the priority of an update request with changed RV", func() {
847914
actualOpts := priorityqueue.AddOpts{}
848915
var actualRequests []reconcile.Request
@@ -868,6 +935,31 @@ var _ = Describe("Eventhandler", func() {
868935
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
869936
})
870937

938+
It("should not lower the priority of an update request with changed RV without the WithLowPriority wrapper", func() {
939+
actualOpts := priorityqueue.AddOpts{}
940+
var actualRequests []reconcile.Request
941+
wq := &fakePriorityQueue{
942+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
943+
actualOpts = o
944+
actualRequests = items
945+
},
946+
}
947+
948+
h := &handler.EnqueueRequestForObject{}
949+
h.Update(ctx, event.UpdateEvent{
950+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
951+
Name: "my-pod",
952+
}},
953+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
954+
Name: "my-pod",
955+
ResourceVersion: "1",
956+
}},
957+
}, wq)
958+
959+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
960+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
961+
})
962+
871963
It("should have no effect on create if the workqueue is not a priorityqueue", func() {
872964
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
873965
h.Create(ctx, event.CreateEvent{
@@ -881,6 +973,19 @@ var _ = Describe("Eventhandler", func() {
881973
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
882974
})
883975

976+
It("should have no effect on create if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
977+
h := &handler.EnqueueRequestForObject{}
978+
h.Create(ctx, event.CreateEvent{
979+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
980+
Name: "my-pod",
981+
}},
982+
}, q)
983+
984+
Expect(q.Len()).To(Equal(1))
985+
item, _ := q.Get()
986+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
987+
})
988+
884989
It("should have no effect on Update if the workqueue is not a priorityqueue", func() {
885990
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
886991
h.Update(ctx, event.UpdateEvent{
@@ -896,8 +1001,23 @@ var _ = Describe("Eventhandler", func() {
8961001
item, _ := q.Get()
8971002
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
8981003
})
899-
})
9001004

1005+
It("should have no effect on Update if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
1006+
h := &handler.EnqueueRequestForObject{}
1007+
h.Update(ctx, event.UpdateEvent{
1008+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1009+
Name: "my-pod",
1010+
}},
1011+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1012+
Name: "my-pod",
1013+
}},
1014+
}, q)
1015+
1016+
Expect(q.Len()).To(Equal(1))
1017+
item, _ := q.Get()
1018+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
1019+
})
1020+
})
9011021
})
9021022

9031023
type fakePriorityQueue struct {

0 commit comments

Comments
 (0)