Skip to content

Commit cca9e85

Browse files
committed
implement priority queue in EnqueueRequestForObject
Signed-off-by: Troy Connor <[email protected]>
1 parent ba53477 commit cca9e85

File tree

2 files changed

+162
-7
lines changed

2 files changed

+162
-7
lines changed

pkg/handler/enqueue.go

+41-6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"k8s.io/apimachinery/pkg/types"
2424
"k8s.io/client-go/util/workqueue"
2525
"sigs.k8s.io/controller-runtime/pkg/client"
26+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
2627
"sigs.k8s.io/controller-runtime/pkg/event"
2728
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
2829
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -52,25 +53,59 @@ func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.
5253
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
5354
return
5455
}
55-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
56+
57+
item := reconcile.Request{NamespacedName: types.NamespacedName{
5658
Name: evt.Object.GetName(),
5759
Namespace: evt.Object.GetNamespace(),
58-
}})
60+
}}
61+
62+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
63+
if !isPriorityQueue {
64+
q.Add(item)
65+
return
66+
}
67+
var priority int
68+
if isObjectUnchanged(evt) {
69+
priority = LowPriority
70+
}
71+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
5972
}
6073

6174
// Update implements EventHandler.
6275
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.TypedRateLimitingInterface[reconcile.Request]) {
6376
switch {
6477
case !isNil(evt.ObjectNew):
65-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
78+
item := reconcile.Request{NamespacedName: types.NamespacedName{
6679
Name: evt.ObjectNew.GetName(),
6780
Namespace: evt.ObjectNew.GetNamespace(),
68-
}})
81+
}}
82+
83+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
84+
if !isPriorityQueue {
85+
q.Add(item)
86+
return
87+
}
88+
var priority int
89+
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
90+
priority = LowPriority
91+
}
92+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
6993
case !isNil(evt.ObjectOld):
70-
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
94+
item := reconcile.Request{NamespacedName: types.NamespacedName{
7195
Name: evt.ObjectOld.GetName(),
7296
Namespace: evt.ObjectOld.GetNamespace(),
73-
}})
97+
}}
98+
99+
priorityQueue, isPriorityQueue := q.(priorityqueue.PriorityQueue[reconcile.Request])
100+
if !isPriorityQueue {
101+
q.Add(item)
102+
return
103+
}
104+
var priority int
105+
if evt.ObjectOld.GetResourceVersion() == evt.ObjectNew.GetResourceVersion() {
106+
priority = LowPriority
107+
}
108+
priorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: priority}, item)
74109
default:
75110
enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt)
76111
}

pkg/handler/eventhandler_test.go

+121-1
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,27 @@ var _ = Describe("Eventhandler", func() {
797797
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
798798
})
799799

800+
It("should lower the priority of a create request for an object that was created more than one minute in the past without the WithLowPriorityWrapper", func() {
801+
actualOpts := priorityqueue.AddOpts{}
802+
var actualRequests []reconcile.Request
803+
wq := &fakePriorityQueue{
804+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
805+
actualOpts = o
806+
actualRequests = items
807+
},
808+
}
809+
810+
h := &handler.EnqueueRequestForObject{}
811+
h.Create(ctx, event.CreateEvent{
812+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
813+
Name: "my-pod",
814+
}},
815+
}, wq)
816+
817+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
818+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
819+
})
820+
800821
It("should not lower the priority of a create request for an object that was created less than one minute in the past", func() {
801822
actualOpts := priorityqueue.AddOpts{}
802823
var actualRequests []reconcile.Request
@@ -819,6 +840,28 @@ var _ = Describe("Eventhandler", func() {
819840
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
820841
})
821842

843+
It("should not lower the priority of a create request for an object that was created less than one minute in the past without the WithLowPriority wrapperr", func() {
844+
actualOpts := priorityqueue.AddOpts{}
845+
var actualRequests []reconcile.Request
846+
wq := &fakePriorityQueue{
847+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
848+
actualOpts = o
849+
actualRequests = items
850+
},
851+
}
852+
853+
h := &handler.EnqueueRequestForObject{}
854+
h.Create(ctx, event.CreateEvent{
855+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
856+
Name: "my-pod",
857+
CreationTimestamp: metav1.Now(),
858+
}},
859+
}, wq)
860+
861+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
862+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
863+
})
864+
822865
It("should lower the priority of an update request with unchanged RV", func() {
823866
actualOpts := priorityqueue.AddOpts{}
824867
var actualRequests []reconcile.Request
@@ -843,6 +886,30 @@ var _ = Describe("Eventhandler", func() {
843886
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
844887
})
845888

889+
It("should lower the priority of an update request with unchanged RV without the WithLowPriority wrapper", func() {
890+
actualOpts := priorityqueue.AddOpts{}
891+
var actualRequests []reconcile.Request
892+
wq := &fakePriorityQueue{
893+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
894+
actualOpts = o
895+
actualRequests = items
896+
},
897+
}
898+
899+
h := &handler.EnqueueRequestForObject{}
900+
h.Update(ctx, event.UpdateEvent{
901+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
902+
Name: "my-pod",
903+
}},
904+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
905+
Name: "my-pod",
906+
}},
907+
}, wq)
908+
909+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{Priority: handler.LowPriority}))
910+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
911+
})
912+
846913
It("should not lower the priority of an update request with changed RV", func() {
847914
actualOpts := priorityqueue.AddOpts{}
848915
var actualRequests []reconcile.Request
@@ -868,6 +935,31 @@ var _ = Describe("Eventhandler", func() {
868935
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
869936
})
870937

938+
It("should not lower the priority of an update request with changed RV without the WithLowPriority wrapper", func() {
939+
actualOpts := priorityqueue.AddOpts{}
940+
var actualRequests []reconcile.Request
941+
wq := &fakePriorityQueue{
942+
addWithOpts: func(o priorityqueue.AddOpts, items ...reconcile.Request) {
943+
actualOpts = o
944+
actualRequests = items
945+
},
946+
}
947+
948+
h := &handler.EnqueueRequestForObject{}
949+
h.Update(ctx, event.UpdateEvent{
950+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
951+
Name: "my-pod",
952+
}},
953+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
954+
Name: "my-pod",
955+
ResourceVersion: "1",
956+
}},
957+
}, wq)
958+
959+
Expect(actualOpts).To(Equal(priorityqueue.AddOpts{}))
960+
Expect(actualRequests).To(Equal([]reconcile.Request{{NamespacedName: types.NamespacedName{Name: "my-pod"}}}))
961+
})
962+
871963
It("should have no effect on create if the workqueue is not a priorityqueue", func() {
872964
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
873965
h.Create(ctx, event.CreateEvent{
@@ -881,6 +973,19 @@ var _ = Describe("Eventhandler", func() {
881973
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
882974
})
883975

976+
It("should have no effect on create if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
977+
h := &handler.EnqueueRequestForObject{}
978+
h.Create(ctx, event.CreateEvent{
979+
Object: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
980+
Name: "my-pod",
981+
}},
982+
}, q)
983+
984+
Expect(q.Len()).To(Equal(1))
985+
item, _ := q.Get()
986+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
987+
})
988+
884989
It("should have no effect on Update if the workqueue is not a priorityqueue", func() {
885990
h := handler.WithLowPriorityWhenUnchanged(&handler.EnqueueRequestForObject{})
886991
h.Update(ctx, event.UpdateEvent{
@@ -896,8 +1001,23 @@ var _ = Describe("Eventhandler", func() {
8961001
item, _ := q.Get()
8971002
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
8981003
})
899-
})
9001004

1005+
It("should have no effect on Update if the workqueue is not a priorityqueue without the WithLowPriority wrapper", func() {
1006+
h := &handler.EnqueueRequestForObject{}
1007+
h.Update(ctx, event.UpdateEvent{
1008+
ObjectOld: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1009+
Name: "my-pod",
1010+
}},
1011+
ObjectNew: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
1012+
Name: "my-pod",
1013+
}},
1014+
}, q)
1015+
1016+
Expect(q.Len()).To(Equal(1))
1017+
item, _ := q.Get()
1018+
Expect(item).To(Equal(reconcile.Request{NamespacedName: types.NamespacedName{Name: "my-pod"}}))
1019+
})
1020+
})
9011021
})
9021022

9031023
type fakePriorityQueue struct {

0 commit comments

Comments
 (0)