Skip to content

Commit 2f2e7d8

Browse files
authored
Merge pull request #3167 from alvaroaleman/reuse
✨ Controller: Retain the priority
2 parents 85e014c + b32e654 commit 2f2e7d8

File tree

2 files changed

+152
-15
lines changed

2 files changed

+152
-15
lines changed

pkg/internal/controller/controller.go

+36-8
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"k8s.io/apimachinery/pkg/util/uuid"
3232
"k8s.io/client-go/util/workqueue"
3333

34+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
3435
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
3536
logf "sigs.k8s.io/controller-runtime/pkg/log"
3637
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -60,7 +61,7 @@ type Controller[request comparable] struct {
6061

6162
// Queue is an listeningQueue that listens for events from Informers and adds object keys to
6263
// the Queue for processing
63-
Queue workqueue.TypedRateLimitingInterface[request]
64+
Queue priorityqueue.PriorityQueue[request]
6465

6566
// mu is used to synchronize Controller setup
6667
mu sync.Mutex
@@ -157,7 +158,12 @@ func (c *Controller[request]) Start(ctx context.Context) error {
157158
// Set the internal context.
158159
c.ctx = ctx
159160

160-
c.Queue = c.NewQueue(c.Name, c.RateLimiter)
161+
queue := c.NewQueue(c.Name, c.RateLimiter)
162+
if priorityQueue, isPriorityQueue := queue.(priorityqueue.PriorityQueue[request]); isPriorityQueue {
163+
c.Queue = priorityQueue
164+
} else {
165+
c.Queue = &priorityQueueWrapper[request]{TypedRateLimitingInterface: queue}
166+
}
161167
go func() {
162168
<-ctx.Done()
163169
c.Queue.ShutDown()
@@ -268,7 +274,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
268274
// processNextWorkItem will read a single work item off the workqueue and
269275
// attempt to process it, by calling the reconcileHandler.
270276
func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool {
271-
obj, shutdown := c.Queue.Get()
277+
obj, priority, shutdown := c.Queue.GetWithPriority()
272278
if shutdown {
273279
// Stop working
274280
return false
@@ -285,7 +291,7 @@ func (c *Controller[request]) processNextWorkItem(ctx context.Context) bool {
285291
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(1)
286292
defer ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Add(-1)
287293

288-
c.reconcileHandler(ctx, obj)
294+
c.reconcileHandler(ctx, obj, priority)
289295
return true
290296
}
291297

@@ -308,7 +314,7 @@ func (c *Controller[request]) initMetrics() {
308314
ctrlmetrics.ActiveWorkers.WithLabelValues(c.Name).Set(0)
309315
}
310316

311-
func (c *Controller[request]) reconcileHandler(ctx context.Context, req request) {
317+
func (c *Controller[request]) reconcileHandler(ctx context.Context, req request, priority int) {
312318
// Update metrics after processing each item
313319
reconcileStartTS := time.Now()
314320
defer func() {
@@ -331,7 +337,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request)
331337
if errors.Is(err, reconcile.TerminalError(nil)) {
332338
ctrlmetrics.TerminalReconcileErrors.WithLabelValues(c.Name).Inc()
333339
} else {
334-
c.Queue.AddRateLimited(req)
340+
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req)
335341
}
336342
ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
337343
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
@@ -346,11 +352,11 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request)
346352
// We need to drive to stable reconcile loops before queuing due
347353
// to result.RequestAfter
348354
c.Queue.Forget(req)
349-
c.Queue.AddAfter(req, result.RequeueAfter)
355+
c.Queue.AddWithOpts(priorityqueue.AddOpts{After: result.RequeueAfter, Priority: priority}, req)
350356
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeueAfter).Inc()
351357
case result.Requeue: //nolint: staticcheck // We have to handle it until it is removed
352358
log.V(5).Info("Reconcile done, requeueing")
353-
c.Queue.AddRateLimited(req)
359+
c.Queue.AddWithOpts(priorityqueue.AddOpts{RateLimited: true, Priority: priority}, req)
354360
ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelRequeue).Inc()
355361
default:
356362
log.V(5).Info("Reconcile successful")
@@ -388,3 +394,25 @@ type reconcileIDKey struct{}
388394
func addReconcileID(ctx context.Context, reconcileID types.UID) context.Context {
389395
return context.WithValue(ctx, reconcileIDKey{}, reconcileID)
390396
}
397+
398+
type priorityQueueWrapper[request comparable] struct {
399+
workqueue.TypedRateLimitingInterface[request]
400+
}
401+
402+
func (p *priorityQueueWrapper[request]) AddWithOpts(opts priorityqueue.AddOpts, items ...request) {
403+
for _, item := range items {
404+
switch {
405+
case opts.RateLimited:
406+
p.TypedRateLimitingInterface.AddRateLimited(item)
407+
case opts.After > 0:
408+
p.TypedRateLimitingInterface.AddAfter(item, opts.After)
409+
default:
410+
p.TypedRateLimitingInterface.Add(item)
411+
}
412+
}
413+
}
414+
415+
func (p *priorityQueueWrapper[request]) GetWithPriority() (request, int, bool) {
416+
item, shutdown := p.TypedRateLimitingInterface.Get()
417+
return item, 0, shutdown
418+
}

pkg/internal/controller/controller_test.go

+116-7
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
3939
"sigs.k8s.io/controller-runtime/pkg/client"
4040
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
41+
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
4142
"sigs.k8s.io/controller-runtime/pkg/event"
4243
"sigs.k8s.io/controller-runtime/pkg/handler"
4344
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
@@ -345,9 +346,10 @@ var _ = Describe("controller", func() {
345346
})
346347

347348
It("should check for correct TypedSyncingSource if custom types are used", func() {
348-
queue := &controllertest.TypedQueue[TestRequest]{
349-
TypedInterface: workqueue.NewTyped[TestRequest](),
350-
}
349+
queue := &priorityQueueWrapper[TestRequest]{
350+
TypedRateLimitingInterface: &controllertest.TypedQueue[TestRequest]{
351+
TypedInterface: workqueue.NewTyped[TestRequest](),
352+
}}
351353
ctrl := &Controller[TestRequest]{
352354
NewQueue: func(string, workqueue.TypedRateLimiter[TestRequest]) workqueue.TypedRateLimitingInterface[TestRequest] {
353355
return queue
@@ -400,10 +402,6 @@ var _ = Describe("controller", func() {
400402
Eventually(func() int { return queue.NumRequeues(request) }).Should(Equal(0))
401403
})
402404

403-
PIt("should forget an item if it is not a Request and continue processing items", func() {
404-
// TODO(community): write this test
405-
})
406-
407405
It("should requeue a Request if there is an error and continue processing items", func() {
408406
ctx, cancel := context.WithCancel(context.Background())
409407
defer cancel()
@@ -523,6 +521,37 @@ var _ = Describe("controller", func() {
523521
Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
524522
})
525523

524+
It("should retain the priority when the reconciler requests a requeue", func() {
525+
q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")}
526+
ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
527+
return q
528+
}
529+
530+
ctx, cancel := context.WithCancel(context.Background())
531+
defer cancel()
532+
go func() {
533+
defer GinkgoRecover()
534+
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
535+
}()
536+
537+
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)
538+
539+
By("Invoking Reconciler which will request a requeue")
540+
fakeReconcile.AddResult(reconcile.Result{Requeue: true}, nil)
541+
Expect(<-reconciled).To(Equal(request))
542+
Eventually(func() []priorityQueueAddition {
543+
q.lock.Lock()
544+
defer q.lock.Unlock()
545+
return q.added
546+
}).Should(Equal([]priorityQueueAddition{{
547+
AddOpts: priorityqueue.AddOpts{
548+
RateLimited: true,
549+
Priority: 10,
550+
},
551+
items: []reconcile.Request{request},
552+
}}))
553+
})
554+
526555
It("should requeue a Request after a duration (but not rate-limitted) if the Result sets RequeueAfter (regardless of Requeue)", func() {
527556
dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)}
528557
ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
@@ -555,6 +584,37 @@ var _ = Describe("controller", func() {
555584
Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
556585
})
557586

587+
It("should retain the priority with RequeAfter", func() {
588+
q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")}
589+
ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
590+
return q
591+
}
592+
593+
ctx, cancel := context.WithCancel(context.Background())
594+
defer cancel()
595+
go func() {
596+
defer GinkgoRecover()
597+
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
598+
}()
599+
600+
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)
601+
602+
By("Invoking Reconciler which will ask for RequeueAfter")
603+
fakeReconcile.AddResult(reconcile.Result{RequeueAfter: time.Millisecond * 100}, nil)
604+
Expect(<-reconciled).To(Equal(request))
605+
Eventually(func() []priorityQueueAddition {
606+
q.lock.Lock()
607+
defer q.lock.Unlock()
608+
return q.added
609+
}).Should(Equal([]priorityQueueAddition{{
610+
AddOpts: priorityqueue.AddOpts{
611+
After: time.Millisecond * 100,
612+
Priority: 10,
613+
},
614+
items: []reconcile.Request{request},
615+
}}))
616+
})
617+
558618
It("should perform error behavior if error is not nil, regardless of RequeueAfter", func() {
559619
dq := &DelegatingQueue{TypedRateLimitingInterface: ctrl.NewQueue("controller1", nil)}
560620
ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
@@ -586,6 +646,37 @@ var _ = Describe("controller", func() {
586646
Eventually(func() int { return dq.NumRequeues(request) }).Should(Equal(0))
587647
})
588648

649+
It("should retain the priority when there was an error", func() {
650+
q := &fakePriorityQueue{PriorityQueue: priorityqueue.New[reconcile.Request]("controller1")}
651+
ctrl.NewQueue = func(string, workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
652+
return q
653+
}
654+
655+
ctx, cancel := context.WithCancel(context.Background())
656+
defer cancel()
657+
go func() {
658+
defer GinkgoRecover()
659+
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())
660+
}()
661+
662+
q.PriorityQueue.AddWithOpts(priorityqueue.AddOpts{Priority: 10}, request)
663+
664+
By("Invoking Reconciler which will return an error")
665+
fakeReconcile.AddResult(reconcile.Result{}, errors.New("oups, I did it again"))
666+
Expect(<-reconciled).To(Equal(request))
667+
Eventually(func() []priorityQueueAddition {
668+
q.lock.Lock()
669+
defer q.lock.Unlock()
670+
return q.added
671+
}).Should(Equal([]priorityQueueAddition{{
672+
AddOpts: priorityqueue.AddOpts{
673+
RateLimited: true,
674+
Priority: 10,
675+
},
676+
items: []reconcile.Request{request},
677+
}}))
678+
})
679+
589680
PIt("should return if the queue is shutdown", func() {
590681
// TODO(community): write this test
591682
})
@@ -977,3 +1068,21 @@ func (t *bisignallingSource[T]) WaitForSync(ctx context.Context) error {
9771068
return ctx.Err()
9781069
}
9791070
}
1071+
1072+
type priorityQueueAddition struct {
1073+
priorityqueue.AddOpts
1074+
items []reconcile.Request
1075+
}
1076+
1077+
type fakePriorityQueue struct {
1078+
priorityqueue.PriorityQueue[reconcile.Request]
1079+
1080+
lock sync.Mutex
1081+
added []priorityQueueAddition
1082+
}
1083+
1084+
func (f *fakePriorityQueue) AddWithOpts(o priorityqueue.AddOpts, items ...reconcile.Request) {
1085+
f.lock.Lock()
1086+
defer f.lock.Unlock()
1087+
f.added = append(f.added, priorityQueueAddition{AddOpts: o, items: items})
1088+
}

0 commit comments

Comments
 (0)