Skip to content

Commit 4e82bce

Browse files
committed
implement priority queue
Signed-off-by: Troy Connor <[email protected]>
1 parent 8cb0a52 commit 4e82bce

File tree

1 file changed

+3
-54
lines changed

1 file changed

+3
-54
lines changed

pkg/handler/enqueue_mapped.go

+3-54
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,8 @@ package handler
1919
import (
2020
"context"
2121

22-
"k8s.io/apimachinery/pkg/types"
2322
"k8s.io/client-go/util/workqueue"
2423
"sigs.k8s.io/controller-runtime/pkg/client"
25-
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2624
"sigs.k8s.io/controller-runtime/pkg/event"
2725
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2826
)
@@ -48,7 +46,7 @@ type TypedMapFunc[object any, request comparable] func(context.Context, object)
4846
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
4947
// objects and both sets of Requests are enqueue.
5048
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
51-
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestsFromMapFunc(fn))
49+
return TypedEnqueueRequestsFromMapFunc(fn)
5250
}
5351

5452
// TypedEnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection
@@ -82,23 +80,8 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create(
8280
evt event.TypedCreateEvent[object],
8381
q workqueue.TypedRateLimitingInterface[request],
8482
) {
85-
wq, ok := q.(workqueue.TypedRateLimitingInterface[reconcile.Request])
86-
if ok {
87-
evnt, ok := any(e).(event.TypedCreateEvent[client.Object])
88-
if ok {
89-
priorityQueue, isPriorityQueue := wq.(priorityqueue.PriorityQueue[reconcile.Request])
90-
if isPriorityQueue {
91-
item := reconcile.Request{NamespacedName: types.NamespacedName{
92-
Name: evnt.Object.GetName(),
93-
Namespace: evnt.Object.GetNamespace(),
94-
}}
95-
addToQueueCreate(priorityQueue, evnt, item)
96-
return
97-
}
98-
reqs := map[request]empty{}
99-
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
100-
}
101-
}
83+
reqs := map[request]empty{}
84+
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
10285
}
10386

10487
// Update implements EventHandler.
@@ -107,40 +90,6 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update(
10790
evt event.TypedUpdateEvent[object],
10891
q workqueue.TypedRateLimitingInterface[request],
10992
) {
110-
wq, ok := q.(workqueue.TypedRateLimitingInterface[reconcile.Request])
111-
if ok {
112-
evnt, ok := any(e).(event.TypedUpdateEvent[client.Object])
113-
if ok {
114-
switch {
115-
case !isNil(evt.ObjectNew):
116-
priorityQueue, isPriorityQueue := wq.(priorityqueue.PriorityQueue[reconcile.Request])
117-
if isPriorityQueue {
118-
item := reconcile.Request{NamespacedName: types.NamespacedName{
119-
Name: evnt.ObjectNew.GetName(),
120-
Namespace: evnt.ObjectNew.GetNamespace(),
121-
}}
122-
addToQueueUpdate(priorityQueue, evnt, item)
123-
return
124-
}
125-
reqs := map[request]empty{}
126-
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
127-
case !isNil(evt.ObjectOld):
128-
priorityQueue, isPriorityQueue := wq.(priorityqueue.PriorityQueue[reconcile.Request])
129-
if isPriorityQueue {
130-
item := reconcile.Request{NamespacedName: types.NamespacedName{
131-
Name: evnt.ObjectOld.GetName(),
132-
Namespace: evnt.ObjectOld.GetNamespace(),
133-
}}
134-
addToQueueUpdate(priorityQueue, evnt, item)
135-
return
136-
}
137-
reqs := map[request]empty{}
138-
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
139-
default:
140-
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
141-
}
142-
}
143-
}
14493
reqs := map[request]empty{}
14594
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
14695
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)

0 commit comments

Comments
 (0)