Skip to content

Commit 1d0e50b

Browse files
committed
implement priority queue
Signed-off-by: Troy Connor <[email protected]>
1 parent 3086f3d commit 1d0e50b

File tree

3 files changed

+5
-48
lines changed

3 files changed

+5
-48
lines changed

pkg/handler/enqueue_mapped.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type TypedMapFunc[object any, request comparable] func(context.Context, object)
4646
// For UpdateEvents which contain both a new and old object, the transformation function is run on both
4747
// objects and both sets of Requests are enqueue.
4848
func EnqueueRequestsFromMapFunc(fn MapFunc) EventHandler {
49-
return WithLowPriorityWhenUnchanged(TypedEnqueueRequestsFromMapFunc(fn))
49+
return TypedEnqueueRequestsFromMapFunc(fn)
5050
}
5151

5252
// TypedEnqueueRequestsFromMapFunc enqueues Requests by running a transformation function that outputs a collection

pkg/handler/eventhandler.go

+2-45
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,8 @@ package handler
1818

1919
import (
2020
"context"
21-
"reflect"
2221
"time"
2322

24-
"k8s.io/apimachinery/pkg/types"
2523
"k8s.io/client-go/util/workqueue"
2624
"sigs.k8s.io/controller-runtime/pkg/client"
2725
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
@@ -113,22 +111,7 @@ type TypedFuncs[object any, request comparable] struct {
113111
// Create implements EventHandler.
114112
func (h TypedFuncs[object, request]) Create(ctx context.Context, e event.TypedCreateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
115113
if h.CreateFunc != nil {
116-
if reflect.TypeFor[request]() != reflect.TypeOf(reconcile.Request{}) || !reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) {
117-
h.CreateFunc(ctx, e, q)
118-
}
119-
120-
wq, ok := q.(workqueue.TypedRateLimitingInterface[reconcile.Request])
121-
if ok {
122-
evt, ok := any(e).(event.TypedCreateEvent[client.Object])
123-
if ok {
124-
item := reconcile.Request{NamespacedName: types.NamespacedName{
125-
Name: evt.Object.GetName(),
126-
Namespace: evt.Object.GetNamespace(),
127-
}}
128-
addToQueueCreate(wq, evt, item)
129-
}
130-
h.CreateFunc(ctx, e, q)
131-
}
114+
h.CreateFunc(ctx, e, q)
132115
}
133116
}
134117

@@ -142,33 +125,7 @@ func (h TypedFuncs[object, request]) Delete(ctx context.Context, e event.TypedDe
142125
// Update implements EventHandler.
143126
func (h TypedFuncs[object, request]) Update(ctx context.Context, e event.TypedUpdateEvent[object], q workqueue.TypedRateLimitingInterface[request]) {
144127
if h.UpdateFunc != nil {
145-
if reflect.TypeFor[request]() != reflect.TypeOf(reconcile.Request{}) || !reflect.TypeFor[object]().Implements(reflect.TypeFor[client.Object]()) {
146-
h.UpdateFunc(ctx, e, q)
147-
}
148-
149-
wq, ok := q.(workqueue.TypedRateLimitingInterface[reconcile.Request])
150-
if ok {
151-
evt, ok := any(e).(event.TypedUpdateEvent[client.Object])
152-
if ok {
153-
switch {
154-
case !isNil(evt.ObjectNew):
155-
item := reconcile.Request{NamespacedName: types.NamespacedName{
156-
Name: evt.ObjectNew.GetName(),
157-
Namespace: evt.ObjectNew.GetNamespace(),
158-
}}
159-
addToQueueUpdate(wq, evt, item)
160-
case !isNil(evt.ObjectOld):
161-
item := reconcile.Request{NamespacedName: types.NamespacedName{
162-
Name: evt.ObjectOld.GetName(),
163-
Namespace: evt.ObjectOld.GetNamespace(),
164-
}}
165-
addToQueueUpdate(wq, evt, item)
166-
default:
167-
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
168-
}
169-
}
170-
h.UpdateFunc(ctx, e, q)
171-
}
128+
h.UpdateFunc(ctx, e, q)
172129
}
173130
}
174131

pkg/internal/source/internal_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,15 @@ var _ = Describe("Internal", func() {
101101
})
102102

103103
It("should used Predicates to filter CreateEvents", func() {
104-
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
104+
instance = internal.NewEventHandler(ctx, &controllertest.Queue{TypedInterface: workqueue.NewTyped[reconcile.Request]()}, setfuncs, []predicate.Predicate{
105105
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return false }},
106106
})
107107
set = false
108108
instance.OnAdd(pod)
109109
Expect(set).To(BeFalse())
110110

111111
set = false
112-
instance = internal.NewEventHandler(ctx, &controllertest.Queue{}, setfuncs, []predicate.Predicate{
112+
instance = internal.NewEventHandler(ctx, &controllertest.Queue{TypedInterface: workqueue.NewTyped[reconcile.Request]()}, setfuncs, []predicate.Predicate{
113113
predicate.Funcs{CreateFunc: func(event.CreateEvent) bool { return true }},
114114
})
115115
instance.OnAdd(pod)

0 commit comments

Comments
 (0)