Skip to content

Commit ba53477

Browse files
committed
Fix behavior of rate limit option in priorityqueue.AddWithOpts
Signed-off-by: zach593 <[email protected]>
1 parent 2e8ba92 commit ba53477

File tree

2 files changed

+74
-5
lines changed

2 files changed

+74
-5
lines changed

pkg/controller/priorityqueue/priorityqueue.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"k8s.io/client-go/util/workqueue"
1212
"k8s.io/utils/clock"
1313
"k8s.io/utils/ptr"
14+
1415
"sigs.k8s.io/controller-runtime/pkg/internal/metrics"
1516
)
1617

@@ -132,16 +133,17 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
132133
defer w.lock.Unlock()
133134

134135
for _, key := range items {
136+
after := o.After
135137
if o.RateLimited {
136-
after := w.rateLimiter.When(key)
137-
if o.After == 0 || after < o.After {
138-
o.After = after
138+
rlAfter := w.rateLimiter.When(key)
139+
if after == 0 || rlAfter < after {
140+
after = rlAfter
139141
}
140142
}
141143

142144
var readyAt *time.Time
143-
if o.After > 0 {
144-
readyAt = ptr.To(w.now().Add(o.After))
145+
if after > 0 {
146+
readyAt = ptr.To(w.now().Add(after))
145147
w.metrics.retry()
146148
}
147149
if _, ok := w.items[key]; !ok {

pkg/controller/priorityqueue/priorityqueue_test.go

+67
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
. "github.com/onsi/ginkgo/v2"
1212
. "github.com/onsi/gomega"
1313
"k8s.io/apimachinery/pkg/util/sets"
14+
"k8s.io/client-go/util/workqueue"
1415
)
1516

1617
var _ = Describe("Controllerworkqueue", func() {
@@ -438,6 +439,72 @@ var _ = Describe("Controllerworkqueue", func() {
438439
Expect(metrics.depth["test"]).To(Equal(0))
439440
metrics.mu.Unlock()
440441
})
442+
443+
It("When adding items with rateLimit, previous items' rateLimit should not affect subsequent items", func() {
444+
q, metrics := newQueue()
445+
defer q.ShutDown()
446+
447+
now := time.Now().Round(time.Second)
448+
nowLock := sync.Mutex{}
449+
tick := make(chan time.Time)
450+
451+
cwq := q.(*priorityqueue[string])
452+
cwq.rateLimiter = workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second)
453+
cwq.now = func() time.Time {
454+
nowLock.Lock()
455+
defer nowLock.Unlock()
456+
return now
457+
}
458+
cwq.tick = func(d time.Duration) <-chan time.Time {
459+
done := make(chan struct{})
460+
go func() {
461+
defer GinkgoRecover()
462+
defer close(done)
463+
464+
Expect(d).To(Or(Equal(5*time.Millisecond), Equal(635*time.Millisecond)))
465+
}()
466+
<-done
467+
return tick
468+
}
469+
470+
retrievedItem := make(chan struct{})
471+
retrievedSecondItem := make(chan struct{})
472+
473+
go func() {
474+
defer GinkgoRecover()
475+
first, _, _ := q.GetWithPriority()
476+
Expect(first).To(Equal("foo"))
477+
close(retrievedItem)
478+
479+
second, _, _ := q.GetWithPriority()
480+
Expect(second).To(Equal("bar"))
481+
close(retrievedSecondItem)
482+
}()
483+
484+
// after 7 calls, the next When("bar") call will return 640ms.
485+
for range 7 {
486+
cwq.rateLimiter.When("bar")
487+
}
488+
q.AddWithOpts(AddOpts{RateLimited: true}, "foo", "bar")
489+
490+
Consistently(retrievedItem).ShouldNot(BeClosed())
491+
nowLock.Lock()
492+
now = now.Add(5 * time.Millisecond)
493+
nowLock.Unlock()
494+
tick <- now
495+
Eventually(retrievedItem).Should(BeClosed())
496+
497+
Consistently(retrievedSecondItem).ShouldNot(BeClosed())
498+
nowLock.Lock()
499+
now = now.Add(635 * time.Millisecond)
500+
nowLock.Unlock()
501+
tick <- now
502+
Eventually(retrievedSecondItem).Should(BeClosed())
503+
504+
Expect(metrics.depth["test"]).To(Equal(0))
505+
Expect(metrics.adds["test"]).To(Equal(2))
506+
Expect(metrics.retries["test"]).To(Equal(2))
507+
})
441508
})
442509

443510
func BenchmarkAddGetDone(b *testing.B) {

0 commit comments

Comments
 (0)