Skip to content

Commit a3bd1c0

Browse files
committed
implement priority queue for handlers
Signed-off-by: Troy Connor <[email protected]>
1 parent ba53477 commit a3bd1c0

9 files changed

+270
-44
lines changed

pkg/handler/enqueue.go

+24-6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"k8s.io/apimachinery/pkg/types"
2424
"k8s.io/client-go/util/workqueue"
2525
"sigs.k8s.io/controller-runtime/pkg/client"
26+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2627
"sigs.k8s.io/controller-runtime/pkg/event"
2728
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
2829
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -52,25 +53,42 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
5253
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
5354
return
5455
}
55-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
56+
57+
item := reconcile.Request{NamespacedName: types.NamespacedName{
5658
Name: evt.Object.GetName(),
5759
Namespace: evt.Object.GetNamespace(),
58-
}})
60+
}}
61+
62+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
63+
if !isPriorityQueue {
64+
q.Add(item)
65+
return
66+
}
67+
addToQueueCreate(priorityQueue, evt, item)
5968
}
6069

6170
// Update implements EventHandler.
6271
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
6372
switch {
6473
case !isNil(evt.ObjectNew):
65-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
74+
item := reconcile.Request{NamespacedName: types.NamespacedName{
6675
Name: evt.ObjectNew.GetName(),
6776
Namespace: evt.ObjectNew.GetNamespace(),
68-
}})
77+
}}
78+
79+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
80+
if !isPriorityQueue {
81+
q.Add(item)
82+
return
83+
}
84+
addToQueueUpdate(priorityQueue, evt, item)
6985
case !isNil(evt.ObjectOld):
70-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
86+
item := reconcile.Request{NamespacedName: types.NamespacedName{
7187
Name: evt.ObjectOld.GetName(),
7288
Namespace: evt.ObjectOld.GetNamespace(),
73-
}})
89+
}}
90+
91+
addToQueueUpdate(q, evt, item)
7492
default:
7593
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
7694
}

pkg/handler/enqueue_mapped.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type TypedMapFunc[object any, request comparable] func(context.Context, object)
4646
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
4747
// objects and both sets of Requests are enqueue.
4848
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
49-
return TypedEnqueueRequestsFromMapFunc(fn)
49+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestsFromMapFunc(fn))
5050
}
5151

5252
// TypedEnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection

pkg/handler/enqueue_owner.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type OwnerOption func(e enqueueRequestForOwnerInterface)
4848
//
4949
// - a handler.enqueueRequestForOwner EventHandler with an OwnerType of ReplicaSet and OnlyControllerOwner set to true.
5050
func EnqueueRequestForOwner(scheme *runtime.Scheme, mapper meta.RESTMapper, ownerType client.Object, opts ...OwnerOption) EventHandler {
51-
return TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...)
51+
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestForOwner[client.Object](scheme, mapper, ownerType, opts...))
5252
}
5353

5454
// TypedEnqueueRequestForOwner enqueues Requests for the Owners of an object. E.g. the object that created

pkg/handler/eventhandler.go

+105-17
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"time"
2222

23+
"k8s.io/apimachinery/pkg/types"
2324
"k8s.io/client-go/util/workqueue"
2425
"sigs.k8s.io/controller-runtime/pkg/client"
2526
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
@@ -82,10 +83,78 @@ type TypedEventHandler[object any, request comparable] interface {
8283
Generic(context.Context, event.TypedGenericEvent[object], workqueue.TypedRateLimitingInterface[request])
8384
}
8485

85-
var _ EventHandler = Funcs{}
86+
var _ EventHandler = &Funcs{}
8687

8788
// Funcs implements eventhandler.
88-
type Funcs = TypedFuncs[client.Object, reconcile.Request]
89+
type Funcs = funcs[client.Object, reconcile.Request]
90+
91+
// funcs implements eventhandler
92+
type funcs[object client.Object, request reconcile.Request] struct{}
93+
94+
func (f *funcs[T, R]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
95+
if isNil(evt.Object) {
96+
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
97+
return
98+
}
99+
100+
item := reconcile.Request{NamespacedName: types.NamespacedName{
101+
Name: evt.Object.GetName(),
102+
Namespace: evt.Object.GetNamespace(),
103+
}}
104+
105+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
106+
if !isPriorityQueue {
107+
q.Add(item)
108+
return
109+
}
110+
addToQueueCreate(priorityQueue, evt, item)
111+
}
112+
113+
// Update implements EventHandler.
114+
func (f *funcs[T, R]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
115+
switch {
116+
case !isNil(evt.ObjectNew):
117+
item := reconcile.Request{NamespacedName: types.NamespacedName{
118+
Name: evt.ObjectNew.GetName(),
119+
Namespace: evt.ObjectNew.GetNamespace(),
120+
}}
121+
122+
addToQueueUpdate(q, evt, item)
123+
case !isNil(evt.ObjectOld):
124+
item := reconcile.Request{NamespacedName: types.NamespacedName{
125+
Name: evt.ObjectOld.GetName(),
126+
Namespace: evt.ObjectOld.GetNamespace(),
127+
}}
128+
129+
addToQueueUpdate(q, evt, item)
130+
default:
131+
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
132+
}
133+
}
134+
135+
// Delete implements EventHandler.
136+
func (f *funcs[T, R]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
137+
if isNil(evt.Object) {
138+
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
139+
return
140+
}
141+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
142+
Name: evt.Object.GetName(),
143+
Namespace: evt.Object.GetNamespace(),
144+
}})
145+
}
146+
147+
// Generic implements EventHandler.
148+
func (f *funcs[T, R]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
149+
if isNil(evt.Object) {
150+
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
151+
return
152+
}
153+
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
154+
Name: evt.Object.GetName(),
155+
Namespace: evt.Object.GetNamespace(),
156+
}})
157+
}
89158

90159
// TypedFuncs implements eventhandler.
91160
//
@@ -154,28 +223,15 @@ func WithLowPriorityWhenUnchanged[object client.Object, request comparable](u Ty
154223
q.Add(item)
155224
return
156225
}
157-
var priority int
158-
if isObjectUnchanged(tce) {
159-
priority = LowPriority
160-
}
161-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
226+
addToQueueCreate(priorityQueue, tce, item)
162227
},
163228
})
164229
},
165230
UpdateFunc: func(ctx context.Context, tue event.TypedUpdateEvent[object], trli workqueue.TypedRateLimitingInterface[request]) {
166231
u.Update(ctx, tue, workqueueWithCustomAddFunc[request]{
167232
TypedRateLimitingInterface: trli,
168233
addFunc: func(item request, q workqueue.TypedRateLimitingInterface[request]) {
169-
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
170-
if !isPriorityQueue {
171-
q.Add(item)
172-
return
173-
}
174-
var priority int
175-
if tue.ObjectOld.GetResourceVersion() == tue.ObjectNew.GetResourceVersion() {
176-
priority = LowPriority
177-
}
178-
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
234+
addToQueueUpdate(q, tue, item)
179235
},
180236
})
181237
},
@@ -199,3 +255,35 @@ func (w workqueueWithCustomAddFunc[request]) Add(item request) {
199255
func isObjectUnchanged[object client.Object](e event.TypedCreateEvent[object]) bool {
200256
return e.Object.GetCreationTimestamp().Time.Before(time.Now().Add(-time.Minute))
201257
}
258+
259+
// addToQueueCreate adds the reconcile.Request to the priorityqueue in the handler
260+
// for Create requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
261+
func addToQueueCreate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedCreateEvent[T], item request) {
262+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
263+
if !isPriorityQueue {
264+
q.Add(item)
265+
return
266+
}
267+
268+
var priority int
269+
if isObjectUnchanged(evt) {
270+
priority = LowPriority
271+
}
272+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
273+
}
274+
275+
// addToQueueUpdate adds the reconcile.Request to the priorityqueue in the handler
276+
// for Update requests if and only if the workqueue being used is of type priorityqueue.PriorityQueue[reconcile.Request]
277+
func addToQueueUpdate[T client.Object, request comparable](q workqueue.TypedRateLimitingInterface[request], evt event.TypedUpdateEvent[T], item request) {
278+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
279+
if !isPriorityQueue {
280+
q.Add(item)
281+
return
282+
}
283+
284+
var priority int
285+
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
286+
priority = LowPriority
287+
}
288+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
289+
}

0 commit comments

Comments
 (0)