Skip to content

Commit 7aa17da

Browse files
committed
implement priority queue for handlers
Signed-off-by: Troy Connor <[email protected]>
1 parent ba53477 commit 7aa17da

7 files changed

+216
-38
lines changed

pkg/handler/enqueue.go

+13-6
Original file line numberDiff line numberDiff line change
@@ -52,25 +52,32 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
5252
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
5353
return
5454
}
55-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
55+
56+
item := reconcile.Request{NamespacedName: types.NamespacedName{
5657
Name: evt.Object.GetName(),
5758
Namespace: evt.Object.GetNamespace(),
58-
}})
59+
}}
60+
61+
addToQueueCreate(q, evt, item)
5962
}
6063

6164
// Update implements EventHandler.
6265
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
6366
switch {
6467
case !isNil(evt.ObjectNew):
65-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
68+
item := reconcile.Request{NamespacedName: types.NamespacedName{
6669
Name: evt.ObjectNew.GetName(),
6770
Namespace: evt.ObjectNew.GetNamespace(),
68-
}})
71+
}}
72+
73+
addToQueueUpdate(q, evt, item)
6974
case !isNil(evt.ObjectOld):
70-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
75+
item := reconcile.Request{NamespacedName: types.NamespacedName{
7176
Name: evt.ObjectOld.GetName(),
7277
Namespace: evt.ObjectOld.GetNamespace(),
73-
}})
78+
}}
79+
80+
addToQueueUpdate(q, evt, item)
7481
default:
7582
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
7683
}

pkg/handler/enqueue_mapped.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type TypedMapFunc[object any, request comparable] func(context.Context, object)
4646
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
4747
// objects and both sets of Requests are enqueue.
4848
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
49-
return TypedEnqueueRequestsFromMapFunc(fn)
49+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestsFromMapFunc(fn))
5050
}
5151

5252
// TypedEnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection

pkg/handler/enqueue_owner.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type OwnerOption func(e enqueueRequestForOwnerInterface)
4848
//
4949
// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true.
5050
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler {
51-
return TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...)
51+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...))
5252
}
5353

5454
// TypedEnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created

pkg/handler/eventhandler.go

+74-23
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ package handler
1818

1919
import (
2020
"context"
21+
"reflect"
2122
"time"
2223

24+
"k8s.io/apimachinery/pkg/types"
2325
"k8s.io/client-go/util/workqueue"
2426
"sigs.k8s.io/controller-runtime/pkg/client"
2527
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
@@ -65,7 +67,7 @@ type EventHandler = TypedEventHandler[client.Object, reconcile.Request]
6567
//
6668
// Unless you are implementing your own TypedEventHandler, you can ignore the functions on the TypedEventHandler interface.
6769
// Most users shouldn't need to implement their own TypedEventHandler.
68-
//
70+
6971
// TypedEventHandler is experimental and subject to future change.
7072
type TypedEventHandler[object any, request comparable] interface {
7173
// Create is called in response to a create event - e.g. Pod Creation.
@@ -111,7 +113,19 @@ type TypedFuncs[object any, request comparable] struct {
111113
// Create implements EventHandler.
112114
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
113115
if h.CreateFunc != nil {
114-
h.CreateFunc(ctx, e, q)
116+
if reflect.TypeFor[request]() != reflect.TypeOf(reconcile.Request{}) || !reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) {
117+
h.CreateFunc(ctx, e, q)
118+
}
119+
120+
wq, ok := q.(workqueue.TypedRateLimitingInterface[reconcile.Request])
121+
if ok {
122+
evt := any(e).(event.TypedCreateEvent[client.Object])
123+
item := reconcile.Request{NamespacedName: types.NamespacedName{
124+
Name: evt.Object.GetName(),
125+
Namespace: evt.Object.GetNamespace(),
126+
}}
127+
addToQueueCreate(wq, evt, item)
128+
}
115129
}
116130
}
117131

@@ -125,7 +139,30 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe
125139
// Update implements EventHandler.
126140
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
127141
if h.UpdateFunc != nil {
128-
h.UpdateFunc(ctx, e, q)
142+
if reflect.TypeFor[request]() != reflect.TypeOf(reconcile.Request{}) || !reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) {
143+
h.UpdateFunc(ctx, e, q)
144+
}
145+
146+
wq, ok := q.(workqueue.TypedRateLimitingInterface[reconcile.Request])
147+
if ok {
148+
evt := any(e).(event.TypedUpdateEvent[client.Object])
149+
switch {
150+
case !isNil(evt.ObjectNew):
151+
item := reconcile.Request{NamespacedName: types.NamespacedName{
152+
Name: evt.ObjectNew.GetName(),
153+
Namespace: evt.ObjectNew.GetNamespace(),
154+
}}
155+
addToQueueUpdate(wq, evt, item)
156+
case !isNil(evt.ObjectOld):
157+
item := reconcile.Request{NamespacedName: types.NamespacedName{
158+
Name: evt.ObjectOld.GetName(),
159+
Namespace: evt.ObjectOld.GetNamespace(),
160+
}}
161+
addToQueueUpdate(wq, evt, item)
162+
default:
163+
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
164+
}
165+
}
129166
}
130167
}
131168

@@ -149,33 +186,15 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
149186
u.Create(ctx, tce, workqueueWithCustomAddFunc[request]{
150187
TypedRateLimitingInterface: trli,
151188
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
152-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
153-
if !isPriorityQueue {
154-
q.Add(item)
155-
return
156-
}
157-
var priority int
158-
if isObjectUnchanged(tce) {
159-
priority = LowPriority
160-
}
161-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
189+
addToQueueCreate(q, tce, item)
162190
},
163191
})
164192
},
165193
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
166194
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
167195
TypedRateLimitingInterface: trli,
168196
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
169-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
170-
if !isPriorityQueue {
171-
q.Add(item)
172-
return
173-
}
174-
var priority int
175-
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
176-
priority = LowPriority
177-
}
178-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
197+
addToQueueUpdate(q, tue, item)
179198
},
180199
})
181200
},
@@ -199,3 +218,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
199218
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
200219
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
201220
}
221+
222+
// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
223+
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
224+
func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) {
225+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
226+
if !isPriorityQueue {
227+
q.Add(item)
228+
return
229+
}
230+
231+
var priority int
232+
if isObjectUnchanged(evt) {
233+
priority = LowPriority
234+
}
235+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
236+
}
237+
238+
// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
239+
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
240+
func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) {
241+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
242+
if !isPriorityQueue {
243+
q.Add(item)
244+
return
245+
}
246+
247+
var priority int
248+
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
249+
priority = LowPriority
250+
}
251+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
252+
}

pkg/handler/eventhandler_test.go

+122-2
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ var _ = Describe("Eventhandler", func() {
659659
})
660660

661661
Describe("Funcs", func() {
662-
failingFuncs := handler.Funcs{
662+
failingFuncs := handler.TypedFuncs[client.Object, reconcile.Request]{
663663
CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
664664
defer GinkgoRecover()
665665
Fail("Did not expect CreateEvent to be called.")
@@ -797,6 +797,27 @@ var _ = Describe("Eventhandler", func() {
797797
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
798798
})
799799

800+
It("should lower the priority of a create request for an object that was created more than one minute in the past without the WithLowPriorityWrapper", func() {
801+
actualOpts := priorityqueue.AddOpts{}
802+
var actualRequests []reconcile.Request
803+
wq := &fakePriorityQueue{
804+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
805+
actualOpts = o
806+
actualRequests = items
807+
},
808+
}
809+
810+
h := &handler.EnqueueRequestForObject{}
811+
h.Create(ctx, event.CreateEvent{
812+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
813+
Name: "my-pod",
814+
}},
815+
}, wq)
816+
817+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
818+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
819+
})
820+
800821
It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
801822
actualOpts := priorityqueue.AddOpts{}
802823
var actualRequests []reconcile.Request
@@ -819,6 +840,28 @@ var _ = Describe("Eventhandler", func() {
819840
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
820841
})
821842

843+
It("should not lower the priority of a create request for an object that was created less than one minute in the past without the WithLowPriority wrapperr", func() {
844+
actualOpts := priorityqueue.AddOpts{}
845+
var actualRequests []reconcile.Request
846+
wq := &fakePriorityQueue{
847+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
848+
actualOpts = o
849+
actualRequests = items
850+
},
851+
}
852+
853+
h := &handler.EnqueueRequestForObject{}
854+
h.Create(ctx, event.CreateEvent{
855+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
856+
Name: "my-pod",
857+
CreationTimestamp: metav1.Now(),
858+
}},
859+
}, wq)
860+
861+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
862+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
863+
})
864+
822865
It("should lower the priority of an update request with unchanged RV", func() {
823866
actualOpts := priorityqueue.AddOpts{}
824867
var actualRequests []reconcile.Request
@@ -843,6 +886,30 @@ var _ = Describe("Eventhandler", func() {
843886
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
844887
})
845888

889+
It("should lower the priority of an update request with unchanged RV without the WithLowPriority wrapper", func() {
890+
actualOpts := priorityqueue.AddOpts{}
891+
var actualRequests []reconcile.Request
892+
wq := &fakePriorityQueue{
893+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
894+
actualOpts = o
895+
actualRequests = items
896+
},
897+
}
898+
899+
h := &handler.EnqueueRequestForObject{}
900+
h.Update(ctx, event.UpdateEvent{
901+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
902+
Name: "my-pod",
903+
}},
904+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
905+
Name: "my-pod",
906+
}},
907+
}, wq)
908+
909+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
910+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
911+
})
912+
846913
It("should not lower the priority of an update request with changed RV", func() {
847914
actualOpts := priorityqueue.AddOpts{}
848915
var actualRequests []reconcile.Request
@@ -868,6 +935,31 @@ var _ = Describe("Eventhandler", func() {
868935
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
869936
})
870937

938+
It("should not lower the priority of an update request with changed RV without the WithLowPriority wrapper", func() {
939+
actualOpts := priorityqueue.AddOpts{}
940+
var actualRequests []reconcile.Request
941+
wq := &fakePriorityQueue{
942+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
943+
actualOpts = o
944+
actualRequests = items
945+
},
946+
}
947+
948+
h := &handler.EnqueueRequestForObject{}
949+
h.Update(ctx, event.UpdateEvent{
950+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
951+
Name: "my-pod",
952+
}},
953+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
954+
Name: "my-pod",
955+
ResourceVersion: "1",
956+
}},
957+
}, wq)
958+
959+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
960+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
961+
})
962+
871963
It("should have no effect on create if the workqueue is not a priorityqueue", func() {
872964
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
873965
h.Create(ctx, event.CreateEvent{
@@ -881,6 +973,19 @@ var _ = Describe("Eventhandler", func() {
881973
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
882974
})
883975

976+
It("should have no effect on create if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
977+
h := &handler.EnqueueRequestForObject{}
978+
h.Create(ctx, event.CreateEvent{
979+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
980+
Name: "my-pod",
981+
}},
982+
}, q)
983+
984+
Expect(q.Len()).To(Equal(1))
985+
item, _ := q.Get()
986+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
987+
})
988+
884989
It("should have no effect on Update if the workqueue is not a priorityqueue", func() {
885990
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
886991
h.Update(ctx, event.UpdateEvent{
@@ -896,8 +1001,23 @@ var _ = Describe("Eventhandler", func() {
8961001
item, _ := q.Get()
8971002
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
8981003
})
899-
})
9001004

1005+
It("should have no effect on Update if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
1006+
h := &handler.EnqueueRequestForObject{}
1007+
h.Update(ctx, event.UpdateEvent{
1008+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1009+
Name: "my-pod",
1010+
}},
1011+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1012+
Name: "my-pod",
1013+
}},
1014+
}, q)
1015+
1016+
Expect(q.Len()).To(Equal(1))
1017+
item, _ := q.Get()
1018+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
1019+
})
1020+
})
9011021
})
9021022

9031023
type fakePriorityQueue struct {

pkg/internal/source/internal_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ var _ = Describe("Internal", func() {
6262
},
6363
}
6464

65-
setfuncs = &handler.Funcs{
65+
setfuncs = &handler.TypedFuncs[client.Object, reconcile.Request]{
6666
CreateFunc: func(context.Context, event.CreateEvent, workqueue.TypedRateLimitingInterface[reconcile.Request]) {
6767
set = true
6868
},

0 commit comments

Comments
 (0)