Skip to content

Commit 8d804ba

Browse files
committed
implement priority queue for handlers
Signed-off-by: Troy Connor <[email protected]>
1 parent ba53477 commit 8d804ba

9 files changed

+253
-48
lines changed

pkg/handler/enqueue.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,32 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
5252
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
5353
return
5454
}
55-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
55+
56+
item := reconcile.Request{NamespacedName: types.NamespacedName{
5657
Name: evt.Object.GetName(),
5758
Namespace: evt.Object.GetNamespace(),
58-
}})
59+
}}
60+
61+
addToQueueCreate(q, evt, item)
5962
}
6063

6164
// Update implements EventHandler.
6265
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
6366
switch {
6467
case !isNil(evt.ObjectNew):
65-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
68+
item := reconcile.Request{NamespacedName: types.NamespacedName{
6669
Name: evt.ObjectNew.GetName(),
6770
Namespace: evt.ObjectNew.GetNamespace(),
68-
}})
71+
}}
72+
73+
addToQueueUpdate(q, evt, item)
6974
case !isNil(evt.ObjectOld):
70-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
75+
item := reconcile.Request{NamespacedName: types.NamespacedName{
7176
Name: evt.ObjectOld.GetName(),
7277
Namespace: evt.ObjectOld.GetNamespace(),
73-
}})
78+
}}
79+
80+
addToQueueUpdate(q, evt, item)
7481
default:
7582
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
7683
}

pkg/handler/enqueue_mapped.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type TypedMapFunc[object any, request comparable] func(context.Context, object)
4646
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
4747
// objects and both sets of Requests are enqueue.
4848
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
49-
return TypedEnqueueRequestsFromMapFunc(fn)
49+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestsFromMapFunc(fn))
5050
}
5151

5252
// TypedEnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection

pkg/handler/enqueue_owner.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type OwnerOption func(e enqueueRequestForOwnerInterface)
4848
//
4949
// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true.
5050
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler {
51-
return TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...)
51+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...))
5252
}
5353

5454
// TypedEnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created

pkg/handler/eventhandler.go

+99-21
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"time"
2222

23+
"k8s.io/apimachinery/pkg/types"
2324
"k8s.io/client-go/util/workqueue"
2425
"sigs.k8s.io/controller-runtime/pkg/client"
2526
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
@@ -83,9 +84,72 @@ type TypedEventHandler[object any, request comparable] interface {
8384
}
8485

8586
var _ EventHandler = Funcs{}
87+
var _ EventHandler = TypedFuncs[client.Object, reconcile.Request]{}
8688

8789
// Funcs implements eventhandler.
88-
type Funcs = TypedFuncs[client.Object, reconcile.Request]
90+
type Funcs = funcs[client.Object, reconcile.Request]
91+
92+
// funcs implements eventhandler
93+
type funcs[object client.Object, request reconcile.Request] struct{}
94+
95+
func (f funcs[T, R]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
96+
if isNil(evt.Object) {
97+
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
98+
return
99+
}
100+
101+
item := reconcile.Request{NamespacedName: types.NamespacedName{
102+
Name: evt.Object.GetName(),
103+
Namespace: evt.Object.GetNamespace(),
104+
}}
105+
addToQueueCreate(q, evt, item)
106+
}
107+
108+
// Update implements EventHandler.
109+
func (f funcs[T, R]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
110+
switch {
111+
case !isNil(evt.ObjectNew):
112+
item := reconcile.Request{NamespacedName: types.NamespacedName{
113+
Name: evt.ObjectNew.GetName(),
114+
Namespace: evt.ObjectNew.GetNamespace(),
115+
}}
116+
117+
addToQueueUpdate(q, evt, item)
118+
case !isNil(evt.ObjectOld):
119+
item := reconcile.Request{NamespacedName: types.NamespacedName{
120+
Name: evt.ObjectOld.GetName(),
121+
Namespace: evt.ObjectOld.GetNamespace(),
122+
}}
123+
124+
addToQueueUpdate(q, evt, item)
125+
default:
126+
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
127+
}
128+
}
129+
130+
// Delete implements EventHandler.
131+
func (f funcs[T, R]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
132+
if isNil(evt.Object) {
133+
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
134+
return
135+
}
136+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
137+
Name: evt.Object.GetName(),
138+
Namespace: evt.Object.GetNamespace(),
139+
}})
140+
}
141+
142+
// Generic implements EventHandler.
143+
func (f funcs[T, R]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
144+
if isNil(evt.Object) {
145+
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
146+
return
147+
}
148+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
149+
Name: evt.Object.GetName(),
150+
Namespace: evt.Object.GetNamespace(),
151+
}})
152+
}
89153

90154
// TypedFuncs implements eventhandler.
91155
//
@@ -149,33 +213,15 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
149213
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
150214
TypedRateLimitingInterface: trli,
151215
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
152-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
153-
if !isPriorityQueue {
154-
q.Add(item)
155-
return
156-
}
157-
var priority int
158-
if isObjectUnchanged(tce) {
159-
priority = LowPriority
160-
}
161-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
216+
addToQueueCreate(q, tce, item)
162217
},
163218
})
164219
},
165220
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
166221
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
167222
TypedRateLimitingInterface: trli,
168223
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
169-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
170-
if !isPriorityQueue {
171-
q.Add(item)
172-
return
173-
}
174-
var priority int
175-
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
176-
priority = LowPriority
177-
}
178-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
224+
addToQueueUpdate(q, tue, item)
179225
},
180226
})
181227
},
@@ -199,3 +245,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
199245
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
200246
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
201247
}
248+
249+
// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
250+
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
251+
func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) {
252+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
253+
if !isPriorityQueue {
254+
q.Add(item)
255+
return
256+
}
257+
258+
var priority int
259+
if isObjectUnchanged(evt) {
260+
priority = LowPriority
261+
}
262+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
263+
}
264+
265+
// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
266+
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
267+
func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) {
268+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
269+
if !isPriorityQueue {
270+
q.Add(item)
271+
return
272+
}
273+
274+
var priority int
275+
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
276+
priority = LowPriority
277+
}
278+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
279+
}

pkg/handler/eventhandler_test.go

+122-2
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ var _ = Describe("Eventhandler", func() {
659659
})
660660

661661
Describe("Funcs", func() {
662-
failingFuncs := handler.Funcs{
662+
failingFuncs := handler.TypedFuncs[client.Object, reconcile.Request]{
663663
CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
664664
defer GinkgoRecover()
665665
Fail("Did not expect CreateEvent to be called.")
@@ -797,6 +797,27 @@ var _ = Describe("Eventhandler", func() {
797797
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
798798
})
799799

800+
It("should lower the priority of a create request for an object that was created more than one minute in the past without the WithLowPriorityWrapper", func() {
801+
actualOpts := priorityqueue.AddOpts{}
802+
var actualRequests []reconcile.Request
803+
wq := &fakePriorityQueue{
804+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
805+
actualOpts = o
806+
actualRequests = items
807+
},
808+
}
809+
810+
h := &handler.EnqueueRequestForObject{}
811+
h.Create(ctx, event.CreateEvent{
812+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
813+
Name: "my-pod",
814+
}},
815+
}, wq)
816+
817+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
818+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
819+
})
820+
800821
It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
801822
actualOpts := priorityqueue.AddOpts{}
802823
var actualRequests []reconcile.Request
@@ -819,6 +840,28 @@ var _ = Describe("Eventhandler", func() {
819840
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
820841
})
821842

843+
It("should not lower the priority of a create request for an object that was created less than one minute in the past without the WithLowPriority wrapperr", func() {
844+
actualOpts := priorityqueue.AddOpts{}
845+
var actualRequests []reconcile.Request
846+
wq := &fakePriorityQueue{
847+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
848+
actualOpts = o
849+
actualRequests = items
850+
},
851+
}
852+
853+
h := &handler.EnqueueRequestForObject{}
854+
h.Create(ctx, event.CreateEvent{
855+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
856+
Name: "my-pod",
857+
CreationTimestamp: metav1.Now(),
858+
}},
859+
}, wq)
860+
861+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
862+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
863+
})
864+
822865
It("should lower the priority of an update request with unchanged RV", func() {
823866
actualOpts := priorityqueue.AddOpts{}
824867
var actualRequests []reconcile.Request
@@ -843,6 +886,30 @@ var _ = Describe("Eventhandler", func() {
843886
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
844887
})
845888

889+
It("should lower the priority of an update request with unchanged RV without the WithLowPriority wrapper", func() {
890+
actualOpts := priorityqueue.AddOpts{}
891+
var actualRequests []reconcile.Request
892+
wq := &fakePriorityQueue{
893+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
894+
actualOpts = o
895+
actualRequests = items
896+
},
897+
}
898+
899+
h := &handler.EnqueueRequestForObject{}
900+
h.Update(ctx, event.UpdateEvent{
901+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
902+
Name: "my-pod",
903+
}},
904+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
905+
Name: "my-pod",
906+
}},
907+
}, wq)
908+
909+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
910+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
911+
})
912+
846913
It("should not lower the priority of an update request with changed RV", func() {
847914
actualOpts := priorityqueue.AddOpts{}
848915
var actualRequests []reconcile.Request
@@ -868,6 +935,31 @@ var _ = Describe("Eventhandler", func() {
868935
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
869936
})
870937

938+
It("should not lower the priority of an update request with changed RV without the WithLowPriority wrapper", func() {
939+
actualOpts := priorityqueue.AddOpts{}
940+
var actualRequests []reconcile.Request
941+
wq := &fakePriorityQueue{
942+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
943+
actualOpts = o
944+
actualRequests = items
945+
},
946+
}
947+
948+
h := &handler.EnqueueRequestForObject{}
949+
h.Update(ctx, event.UpdateEvent{
950+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
951+
Name: "my-pod",
952+
}},
953+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
954+
Name: "my-pod",
955+
ResourceVersion: "1",
956+
}},
957+
}, wq)
958+
959+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
960+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
961+
})
962+
871963
It("should have no effect on create if the workqueue is not a priorityqueue", func() {
872964
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
873965
h.Create(ctx, event.CreateEvent{
@@ -881,6 +973,19 @@ var _ = Describe("Eventhandler", func() {
881973
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
882974
})
883975

976+
It("should have no effect on create if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
977+
h := &handler.EnqueueRequestForObject{}
978+
h.Create(ctx, event.CreateEvent{
979+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
980+
Name: "my-pod",
981+
}},
982+
}, q)
983+
984+
Expect(q.Len()).To(Equal(1))
985+
item, _ := q.Get()
986+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
987+
})
988+
884989
It("should have no effect on Update if the workqueue is not a priorityqueue", func() {
885990
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
886991
h.Update(ctx, event.UpdateEvent{
@@ -896,8 +1001,23 @@ var _ = Describe("Eventhandler", func() {
8961001
item, _ := q.Get()
8971002
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
8981003
})
899-
})
9001004

1005+
It("should have no effect on Update if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
1006+
h := &handler.EnqueueRequestForObject{}
1007+
h.Update(ctx, event.UpdateEvent{
1008+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1009+
Name: "my-pod",
1010+
}},
1011+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1012+
Name: "my-pod",
1013+
}},
1014+
}, q)
1015+
1016+
Expect(q.Len()).To(Equal(1))
1017+
item, _ := q.Get()
1018+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
1019+
})
1020+
})
9011021
})
9021022

9031023
type fakePriorityQueue struct {

pkg/internal/controller/controller_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ var _ = Describe("controller", func() {
266266

267267
ins := source.Channel(
268268
ch,
269-
handler.Funcs{
269+
handler.TypedFuncs[client.Object, reconcile.Request]{
270270
GenericFunc: func(ctx context.Context, evt event.GenericEvent, q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
271271
defer GinkgoRecover()
272272
close(processed)

0 commit comments

Comments
 (0)