Skip to content

Commit a71f2cb

Browse files
committed
implement priority queue
Signed-off-by: Troy Connor <[email protected]>
1 parent 1e01bc7 commit a71f2cb

File tree

1 file changed

+70
-53
lines changed

1 file changed

+70
-53
lines changed

pkg/handler/enqueue_mapped.go

+70-53
Original file line numberDiff line numberDiff line change
@@ -82,23 +82,8 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Create(
8282
evt event.TypedCreateEvent[object],
8383
q workqueue.TypedRateLimitingInterface[request],
8484
) {
85-
wq, ok := q.(workqueue.TypedRateLimitingInterface[reconcile.Request])
86-
if ok {
87-
evnt, ok := any(e).(event.TypedCreateEvent[client.Object])
88-
if ok {
89-
priorityQueue, isPriorityQueue := wq.(priorityqueue.PriorityQueue[reconcile.Request])
90-
if isPriorityQueue {
91-
item := reconcile.Request{NamespacedName: types.NamespacedName{
92-
Name: evnt.Object.GetName(),
93-
Namespace: evnt.Object.GetNamespace(),
94-
}}
95-
addToQueueCreate(priorityQueue, evnt, item)
96-
return
97-
}
98-
reqs := map[request]empty{}
99-
e.mapAndEnqueue(ctx, q, evt.Object, reqs)
100-
}
101-
}
85+
reqs := map[request]empty{}
86+
e.mapAndEnqueueCreate(ctx, q, evt.Object, reqs)
10287
}
10388

10489
// Update implements EventHandler.
@@ -107,43 +92,9 @@ func (e *enqueueRequestsFromMapFunc[object, request]) Update(
10792
evt event.TypedUpdateEvent[object],
10893
q workqueue.TypedRateLimitingInterface[request],
10994
) {
110-
wq, ok := q.(workqueue.TypedRateLimitingInterface[reconcile.Request])
111-
if ok {
112-
evnt, ok := any(e).(event.TypedUpdateEvent[client.Object])
113-
if ok {
114-
switch {
115-
case !isNil(evt.ObjectNew):
116-
priorityQueue, isPriorityQueue := wq.(priorityqueue.PriorityQueue[reconcile.Request])
117-
if isPriorityQueue {
118-
item := reconcile.Request{NamespacedName: types.NamespacedName{
119-
Name: evnt.ObjectNew.GetName(),
120-
Namespace: evnt.ObjectNew.GetNamespace(),
121-
}}
122-
addToQueueUpdate(priorityQueue, evnt, item)
123-
return
124-
}
125-
reqs := map[request]empty{}
126-
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
127-
case !isNil(evt.ObjectOld):
128-
priorityQueue, isPriorityQueue := wq.(priorityqueue.PriorityQueue[reconcile.Request])
129-
if isPriorityQueue {
130-
item := reconcile.Request{NamespacedName: types.NamespacedName{
131-
Name: evnt.ObjectOld.GetName(),
132-
Namespace: evnt.ObjectOld.GetNamespace(),
133-
}}
134-
addToQueueUpdate(priorityQueue, evnt, item)
135-
return
136-
}
137-
reqs := map[request]empty{}
138-
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
139-
default:
140-
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
141-
}
142-
}
143-
}
14495
reqs := map[request]empty{}
145-
e.mapAndEnqueue(ctx, q, evt.ObjectOld, reqs)
146-
e.mapAndEnqueue(ctx, q, evt.ObjectNew, reqs)
96+
e.mapAndEnqueueUpdate(ctx, q, evt.ObjectOld, reqs)
97+
e.mapAndEnqueueUpdate(ctx, q, evt.ObjectNew, reqs)
14798
}
14899

149100
// Delete implements EventHandler.
@@ -170,8 +121,74 @@ func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueue(ctx context.
170121
for _, req := range e.toRequests(ctx, o) {
171122
_, ok := reqs[req]
172123
if !ok {
124+
reqs := map[request]empty{}
173125
q.Add(req)
174126
reqs[req] = empty{}
175127
}
176128
}
177129
}
130+
131+
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueueCreate(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) {
132+
for _, req := range e.toRequests(ctx, o) {
133+
_, ok := reqs[req]
134+
if !ok {
135+
wq, ok := q.(workqueue.TypedRateLimitingInterface[reconcile.Request])
136+
if ok {
137+
evnt, ok := any(e).(event.TypedCreateEvent[client.Object])
138+
if ok {
139+
priorityQueue, isPriorityQueue := wq.(priorityqueue.PriorityQueue[reconcile.Request])
140+
if isPriorityQueue {
141+
item := reconcile.Request{NamespacedName: types.NamespacedName{
142+
Name: evnt.Object.GetName(),
143+
Namespace: evnt.Object.GetNamespace(),
144+
}}
145+
addToQueueCreate(priorityQueue, evnt, item)
146+
continue
147+
}
148+
}
149+
reqs := map[request]empty{}
150+
q.Add(req)
151+
reqs[req] = empty{}
152+
}
153+
}
154+
}
155+
}
156+
157+
func (e *enqueueRequestsFromMapFunc[object, request]) mapAndEnqueueUpdate(ctx context.Context, q workqueue.TypedRateLimitingInterface[request], o object, reqs map[request]empty) {
158+
for _, req := range e.toRequests(ctx, o) {
159+
_, ok := reqs[req]
160+
if !ok {
161+
wq, ok := q.(workqueue.TypedRateLimitingInterface[reconcile.Request])
162+
if ok {
163+
evnt, ok := any(e).(event.TypedUpdateEvent[client.Object])
164+
if ok {
165+
priorityQueue, isPriorityQueue := wq.(priorityqueue.PriorityQueue[reconcile.Request])
166+
if !isPriorityQueue {
167+
reqs := map[request]empty{}
168+
q.Add(req)
169+
reqs[req] = empty{}
170+
continue
171+
}
172+
switch {
173+
case !isNil(evnt.ObjectNew):
174+
item := reconcile.Request{NamespacedName: types.NamespacedName{
175+
Name: evnt.ObjectNew.GetName(),
176+
Namespace: evnt.ObjectNew.GetNamespace(),
177+
}}
178+
addToQueueUpdate(priorityQueue, evnt, item)
179+
continue
180+
case !isNil(evnt.ObjectOld):
181+
item := reconcile.Request{NamespacedName: types.NamespacedName{
182+
Name: evnt.ObjectOld.GetName(),
183+
Namespace: evnt.ObjectOld.GetNamespace(),
184+
}}
185+
addToQueueUpdate(priorityQueue, evnt, item)
186+
continue
187+
default:
188+
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evnt)
189+
}
190+
}
191+
}
192+
}
193+
}
194+
}

0 commit comments

Comments
 (0)