Skip to content

Commit 2b4e75d

Browse files
committed
implement priority queue on enqueueRequestFromMapFunc handler
Signed-off-by: Troy Connor <[email protected]>
1 parent 753b14a commit 2b4e75d

File tree

1 file changed

+45
-3
lines changed

1 file changed

+45
-3
lines changed

pkg/handler/enqueue_mapped.go

+45-3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"k8s.io/client-go/util/workqueue"
2323
"sigs.k8s.io/controller-runtime/pkg/client"
24+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2425
"sigs.k8s.io/controller-runtime/pkg/event"
2526
"sigs.k8s.io/controller-runtime/pkg/reconcile"
2627
)
@@ -81,7 +82,12 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create(
8182
q workqueue.TypedRateLimitingInterface[request],
8283
) {
8384
reqs := map[request]empty{}
84-
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
85+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
86+
if !isPriorityQueue {
87+
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
88+
return
89+
}
90+
e.mapAndEnqueuePriorityCreate(ctx, priorityQueue, evt.Object, reqs)
8591
}
8692

8793
// Update implements EventHandler.
@@ -90,9 +96,14 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update(
9096
evt event.TypedUpdateEvent[object],
9197
q workqueue.TypedRateLimitingInterface[request],
9298
) {
99+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[request])
93100
reqs := map[request]empty{}
94-
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
95-
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
101+
if !isPriorityQueue {
102+
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
103+
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
104+
return
105+
}
106+
e.mapAndEnqueuePriorityUpdate(ctx, priorityQueue, evt.ObjectOld, evt.ObjectNew, reqs)
96107
}
97108

98109
// Delete implements EventHandler.
@@ -124,3 +135,34 @@ func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.
124135
}
125136
}
126137
}
138+
139+
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueuePriorityCreate(ctx context.Context, q priorityqueue.PriorityQueue[request], o object, reqs map[request]empty) {
140+
for _, req := range e.toRequests(ctx, o) {
141+
var priority int
142+
obj := any(o).(client.Object)
143+
evtObj := event.TypedCreateEvent[client.Object]{
144+
Object: obj,
145+
}
146+
if isObjectUnchanged(evtObj) {
147+
priority = LowPriority
148+
}
149+
150+
q.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, req)
151+
reqs[req] = empty{}
152+
}
153+
}
154+
155+
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueuePriorityUpdate(ctx context.Context, q priorityqueue.PriorityQueue[request], oldobj object, newobj object, reqs map[request]empty) {
156+
list := e.toRequests(ctx, oldobj)
157+
list = append(list, e.toRequests(ctx, newobj)...)
158+
for _, req := range list {
159+
var priority int
160+
objnew := any(oldobj).(client.Object)
161+
objold := any(newobj).(client.Object)
162+
if objnew.GetResourceVersion() == objold.GetResourceVersion() {
163+
priority = LowPriority
164+
}
165+
q.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, req)
166+
reqs[req] = empty{}
167+
}
168+
}

0 commit comments

Comments
 (0)