Skip to content

Commit f06cac4

Browse files
authoredJan 20, 2025··
Merge pull request #3075 from alvaroaleman/logs
🌱 Add debug logging for the state of the priority queue
2 parents 5a8edf6 + 158f539 commit f06cac4

File tree

3 files changed

+74
-37
lines changed

3 files changed

+74
-37
lines changed
 

‎examples/priorityqueue/main.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"os"
2323
"time"
2424

25+
"go.uber.org/zap/zapcore"
2526
corev1 "k8s.io/api/core/v1"
2627
"k8s.io/utils/ptr"
2728
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -45,7 +46,9 @@ func main() {
4546
}
4647

4748
func run() error {
48-
log.SetLogger(zap.New())
49+
log.SetLogger(zap.New(func(o *zap.Options) {
50+
o.Level = zapcore.Level(-5)
51+
}))
4952

5053
// Setup a Manager
5154
mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{

‎pkg/controller/controller.go

+1
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
202202
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
203203
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
204204
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
205+
o.Log = mgr.GetLogger().WithValues("controller", controllerName)
205206
o.RateLimiter = rateLimiter
206207
})
207208
}

‎pkg/controller/priorityqueue/priorityqueue.go

+69-36
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sync/atomic"
66
"time"
77

8+
"github.com/go-logr/logr"
89
"github.com/google/btree"
910
"k8s.io/apimachinery/pkg/util/sets"
1011
"k8s.io/client-go/util/workqueue"
@@ -36,6 +37,7 @@ type Opts[T comparable] struct {
3637
// limiter with an initial delay of five milliseconds and a max delay of 1000 seconds.
3738
RateLimiter workqueue.TypedRateLimiter[T]
3839
MetricProvider workqueue.MetricsProvider
40+
Log logr.Logger
3941
}
4042

4143
// Opt allows to configure a PriorityQueue.
@@ -57,6 +59,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5759
}
5860

5961
pq := &priorityqueue[T]{
62+
log: opts.Log,
6063
items: map[T]*item[T]{},
6164
queue: btree.NewG(32, less[T]),
6265
becameReady: sets.Set[T]{},
@@ -75,6 +78,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
7578
}
7679

7780
go pq.spin()
81+
go pq.logState()
7882
if _, ok := pq.metrics.(noMetrics[T]); !ok {
7983
go pq.updateUnfinishedWorkLoop()
8084
}
@@ -83,6 +87,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
8387
}
8488

8589
type priorityqueue[T comparable] struct {
90+
log logr.Logger
8691
// lock has to be acquired for any access any of items, queue, addedCounter
8792
// or becameReady
8893
lock sync.Mutex
@@ -141,14 +146,14 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
141146
}
142147
if _, ok := w.items[key]; !ok {
143148
item := &item[T]{
144-
key: key,
145-
addedCounter: w.addedCounter,
146-
priority: o.Priority,
147-
readyAt: readyAt,
149+
Key: key,
150+
AddedCounter: w.addedCounter,
151+
Priority: o.Priority,
152+
ReadyAt: readyAt,
148153
}
149154
w.items[key] = item
150155
w.queue.ReplaceOrInsert(item)
151-
if item.readyAt == nil {
156+
if item.ReadyAt == nil {
152157
w.metrics.add(key)
153158
}
154159
w.addedCounter++
@@ -158,15 +163,15 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
158163
// The b-tree de-duplicates based on ordering and any change here
159164
// will affect the order - Just delete and re-add.
160165
item, _ := w.queue.Delete(w.items[key])
161-
if o.Priority > item.priority {
162-
item.priority = o.Priority
166+
if o.Priority > item.Priority {
167+
item.Priority = o.Priority
163168
}
164169

165-
if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) {
170+
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
166171
if readyAt == nil {
167172
w.metrics.add(key)
168173
}
169-
item.readyAt = readyAt
174+
item.ReadyAt = readyAt
170175
}
171176

172177
w.queue.ReplaceOrInsert(item)
@@ -210,14 +215,14 @@ func (w *priorityqueue[T]) spin() {
210215
// track what we want to delete and do it after we are done ascending.
211216
var toDelete []*item[T]
212217
w.queue.Ascend(func(item *item[T]) bool {
213-
if item.readyAt != nil {
214-
if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 {
218+
if item.ReadyAt != nil {
219+
if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
215220
nextReady = w.tick(readyAt)
216221
return false
217222
}
218-
if !w.becameReady.Has(item.key) {
219-
w.metrics.add(item.key)
220-
w.becameReady.Insert(item.key)
223+
if !w.becameReady.Has(item.Key) {
224+
w.metrics.add(item.Key)
225+
w.becameReady.Insert(item.Key)
221226
}
222227
}
223228

@@ -228,16 +233,16 @@ func (w *priorityqueue[T]) spin() {
228233
}
229234

230235
// Item is locked, we can not hand it out
231-
if w.locked.Has(item.key) {
236+
if w.locked.Has(item.Key) {
232237
return true
233238
}
234239

235-
w.metrics.get(item.key)
236-
w.locked.Insert(item.key)
240+
w.metrics.get(item.Key)
241+
w.locked.Insert(item.Key)
237242
w.waiters.Add(-1)
238-
delete(w.items, item.key)
243+
delete(w.items, item.Key)
239244
toDelete = append(toDelete, item)
240-
w.becameReady.Delete(item.key)
245+
w.becameReady.Delete(item.Key)
241246
w.get <- *item
242247

243248
return true
@@ -268,7 +273,7 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
268273
w.notifyItemOrWaiterAdded()
269274
item := <-w.get
270275

271-
return item.key, item.priority, w.shutdown.Load()
276+
return item.Key, item.Priority, w.shutdown.Load()
272277
}
273278

274279
func (w *priorityqueue[T]) Get() (item T, shutdown bool) {
@@ -316,7 +321,7 @@ func (w *priorityqueue[T]) Len() int {
316321

317322
var result int
318323
w.queue.Ascend(func(item *item[T]) bool {
319-
if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 {
324+
if item.ReadyAt == nil || item.ReadyAt.Compare(w.now()) <= 0 {
320325
result++
321326
return true
322327
}
@@ -326,36 +331,64 @@ func (w *priorityqueue[T]) Len() int {
326331
return result
327332
}
328333

334+
func (w *priorityqueue[T]) logState() {
335+
t := time.Tick(10 * time.Second)
336+
for {
337+
select {
338+
case <-w.done:
339+
return
340+
case <-t:
341+
}
342+
343+
// Log level may change at runtime, so keep the
344+
// loop going even if a given level is currently
345+
// not enabled.
346+
if !w.log.V(5).Enabled() {
347+
continue
348+
}
349+
w.lock.Lock()
350+
items := make([]*item[T], 0, len(w.items))
351+
w.queue.Ascend(func(item *item[T]) bool {
352+
items = append(items, item)
353+
return true
354+
})
355+
w.lock.Unlock()
356+
357+
w.log.V(5).Info("workqueue_items", "items", items)
358+
}
359+
}
360+
329361
func less[T comparable](a, b *item[T]) bool {
330-
if a.readyAt == nil && b.readyAt != nil {
362+
if a.ReadyAt == nil && b.ReadyAt != nil {
331363
return true
332364
}
333-
if b.readyAt == nil && a.readyAt != nil {
365+
if b.ReadyAt == nil && a.ReadyAt != nil {
334366
return false
335367
}
336-
if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) {
337-
return a.readyAt.Before(*b.readyAt)
368+
if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) {
369+
return a.ReadyAt.Before(*b.ReadyAt)
338370
}
339-
if a.priority != b.priority {
340-
return a.priority > b.priority
371+
if a.Priority != b.Priority {
372+
return a.Priority > b.Priority
341373
}
342374

343-
return a.addedCounter < b.addedCounter
375+
return a.AddedCounter < b.AddedCounter
344376
}
345377

346378
type item[T comparable] struct {
347-
key T
348-
addedCounter uint64
349-
priority int
350-
readyAt *time.Time
379+
Key T `json:"key"`
380+
AddedCounter uint64 `json:"addedCounter"`
381+
Priority int `json:"priority"`
382+
ReadyAt *time.Time `json:"readyAt,omitempty"`
351383
}
352384

353385
func (w *priorityqueue[T]) updateUnfinishedWorkLoop() {
354-
t := time.NewTicker(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182
355-
defer t.Stop()
356-
for range t.C {
357-
if w.shutdown.Load() {
386+
t := time.Tick(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182
387+
for {
388+
select {
389+
case <-w.done:
358390
return
391+
case <-t:
359392
}
360393
w.metrics.updateUnfinishedWork()
361394
}

0 commit comments

Comments
 (0)
Please sign in to comment.