Skip to content

Commit 532991b

Browse files
committed
🌱 Add debug logging for the state of the priority queue
This debug logging prints the state of the workqueue in order to allow debugging it. It will continously check at runtime if debug logging is enabled and do nothing if not, making it very cheap if unused. Sample output piped through `jq` for readability: ``` { "level": "debug", "ts": "2025-01-19T12:00:43-05:00", "msg": "workqueue_state", "controller": "configmap", "items": [ { "key": { "Namespace": "kube-system", "Name": "kubeadm-config" }, "addedCounter": 1, "priority": -100 } ] } ```
1 parent 8f7e114 commit 532991b

File tree

3 files changed

+74
-37
lines changed

3 files changed

+74
-37
lines changed

examples/priorityqueue/main.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"os"
2323
"time"
2424

25+
"go.uber.org/zap/zapcore"
2526
corev1 "k8s.io/api/core/v1"
2627
"k8s.io/utils/ptr"
2728
"sigs.k8s.io/controller-runtime/pkg/builder"
@@ -45,7 +46,9 @@ func main() {
4546
}
4647

4748
func run() error {
48-
log.SetLogger(zap.New())
49+
log.SetLogger(zap.New(func(o *zap.Options) {
50+
o.Level = zapcore.DebugLevel
51+
}))
4952

5053
// Setup a Manager
5154
mgr, err := manager.New(kubeconfig.GetConfigOrDie(), manager.Options{

pkg/controller/controller.go

+1
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ func NewTypedUnmanaged[request comparable](name string, mgr manager.Manager, opt
202202
options.NewQueue = func(controllerName string, rateLimiter workqueue.TypedRateLimiter[request]) workqueue.TypedRateLimitingInterface[request] {
203203
if ptr.Deref(mgr.GetControllerOptions().UsePriorityQueue, false) {
204204
return priorityqueue.New(controllerName, func(o *priorityqueue.Opts[request]) {
205+
o.Log = mgr.GetLogger().WithValues("controller", controllerName)
205206
o.RateLimiter = rateLimiter
206207
})
207208
}

pkg/controller/priorityqueue/priorityqueue.go

+69-36
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sync/atomic"
66
"time"
77

8+
"github.com/go-logr/logr"
89
"github.com/google/btree"
910
"k8s.io/apimachinery/pkg/util/sets"
1011
"k8s.io/client-go/util/workqueue"
@@ -36,6 +37,7 @@ type Opts[T comparable] struct {
3637
// limiter with an initial delay of five milliseconds and a max delay of 1000 seconds.
3738
RateLimiter workqueue.TypedRateLimiter[T]
3839
MetricProvider workqueue.MetricsProvider
40+
Log logr.Logger
3941
}
4042

4143
// Opt allows to configure a PriorityQueue.
@@ -57,6 +59,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
5759
}
5860

5961
pq := &priorityqueue[T]{
62+
log: opts.Log,
6063
items: map[T]*item[T]{},
6164
queue: btree.NewG(32, less[T]),
6265
becameReady: sets.Set[T]{},
@@ -75,6 +78,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
7578
}
7679

7780
go pq.spin()
81+
go pq.logState()
7882
if _, ok := pq.metrics.(noMetrics[T]); !ok {
7983
go pq.updateUnfinishedWorkLoop()
8084
}
@@ -83,6 +87,7 @@ func New[T comparable](name string, o ...Opt[T]) PriorityQueue[T] {
8387
}
8488

8589
type priorityqueue[T comparable] struct {
90+
log logr.Logger
8691
// lock has to be acquired for any access any of items, queue, addedCounter
8792
// or becameReady
8893
lock sync.Mutex
@@ -141,14 +146,14 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
141146
}
142147
if _, ok := w.items[key]; !ok {
143148
item := &item[T]{
144-
key: key,
145-
addedCounter: w.addedCounter,
146-
priority: o.Priority,
147-
readyAt: readyAt,
149+
Key: key,
150+
AddedCounter: w.addedCounter,
151+
Priority: o.Priority,
152+
ReadyAt: readyAt,
148153
}
149154
w.items[key] = item
150155
w.queue.ReplaceOrInsert(item)
151-
if item.readyAt == nil {
156+
if item.ReadyAt == nil {
152157
w.metrics.add(key)
153158
}
154159
w.addedCounter++
@@ -158,15 +163,15 @@ func (w *priorityqueue[T]) AddWithOpts(o AddOpts, items ...T) {
158163
// The b-tree de-duplicates based on ordering and any change here
159164
// will affect the order - Just delete and re-add.
160165
item, _ := w.queue.Delete(w.items[key])
161-
if o.Priority > item.priority {
162-
item.priority = o.Priority
166+
if o.Priority > item.Priority {
167+
item.Priority = o.Priority
163168
}
164169

165-
if item.readyAt != nil && (readyAt == nil || readyAt.Before(*item.readyAt)) {
170+
if item.ReadyAt != nil && (readyAt == nil || readyAt.Before(*item.ReadyAt)) {
166171
if readyAt == nil {
167172
w.metrics.add(key)
168173
}
169-
item.readyAt = readyAt
174+
item.ReadyAt = readyAt
170175
}
171176

172177
w.queue.ReplaceOrInsert(item)
@@ -210,14 +215,14 @@ func (w *priorityqueue[T]) spin() {
210215
// track what we want to delete and do it after we are done ascending.
211216
var toDelete []*item[T]
212217
w.queue.Ascend(func(item *item[T]) bool {
213-
if item.readyAt != nil {
214-
if readyAt := item.readyAt.Sub(w.now()); readyAt > 0 {
218+
if item.ReadyAt != nil {
219+
if readyAt := item.ReadyAt.Sub(w.now()); readyAt > 0 {
215220
nextReady = w.tick(readyAt)
216221
return false
217222
}
218-
if !w.becameReady.Has(item.key) {
219-
w.metrics.add(item.key)
220-
w.becameReady.Insert(item.key)
223+
if !w.becameReady.Has(item.Key) {
224+
w.metrics.add(item.Key)
225+
w.becameReady.Insert(item.Key)
221226
}
222227
}
223228

@@ -228,16 +233,16 @@ func (w *priorityqueue[T]) spin() {
228233
}
229234

230235
// Item is locked, we can not hand it out
231-
if w.locked.Has(item.key) {
236+
if w.locked.Has(item.Key) {
232237
return true
233238
}
234239

235-
w.metrics.get(item.key)
236-
w.locked.Insert(item.key)
240+
w.metrics.get(item.Key)
241+
w.locked.Insert(item.Key)
237242
w.waiters.Add(-1)
238-
delete(w.items, item.key)
243+
delete(w.items, item.Key)
239244
toDelete = append(toDelete, item)
240-
w.becameReady.Delete(item.key)
245+
w.becameReady.Delete(item.Key)
241246
w.get <- *item
242247

243248
return true
@@ -268,7 +273,7 @@ func (w *priorityqueue[T]) GetWithPriority() (_ T, priority int, shutdown bool)
268273
w.notifyItemOrWaiterAdded()
269274
item := <-w.get
270275

271-
return item.key, item.priority, w.shutdown.Load()
276+
return item.Key, item.Priority, w.shutdown.Load()
272277
}
273278

274279
func (w *priorityqueue[T]) Get() (item T, shutdown bool) {
@@ -316,7 +321,7 @@ func (w *priorityqueue[T]) Len() int {
316321

317322
var result int
318323
w.queue.Ascend(func(item *item[T]) bool {
319-
if item.readyAt == nil || item.readyAt.Compare(w.now()) <= 0 {
324+
if item.ReadyAt == nil || item.ReadyAt.Compare(w.now()) <= 0 {
320325
result++
321326
return true
322327
}
@@ -326,36 +331,64 @@ func (w *priorityqueue[T]) Len() int {
326331
return result
327332
}
328333

334+
func (w *priorityqueue[T]) logState() {
335+
t := time.Tick(10 * time.Second)
336+
for {
337+
select {
338+
case <-w.done:
339+
return
340+
case <-t:
341+
}
342+
343+
// Log level may change at runtime, so keep the
344+
// loop going even if a given level is currently
345+
// not enabled.
346+
if !w.log.V(1).Enabled() {
347+
continue
348+
}
349+
items := make([]*item[T], 0, len(w.items))
350+
w.lock.Lock()
351+
w.queue.Ascend(func(item *item[T]) bool {
352+
items = append(items, item)
353+
return true
354+
})
355+
w.lock.Unlock()
356+
357+
w.log.V(1).Info("workqueue_state", "items", items)
358+
}
359+
}
360+
329361
func less[T comparable](a, b *item[T]) bool {
330-
if a.readyAt == nil && b.readyAt != nil {
362+
if a.ReadyAt == nil && b.ReadyAt != nil {
331363
return true
332364
}
333-
if b.readyAt == nil && a.readyAt != nil {
365+
if b.ReadyAt == nil && a.ReadyAt != nil {
334366
return false
335367
}
336-
if a.readyAt != nil && b.readyAt != nil && !a.readyAt.Equal(*b.readyAt) {
337-
return a.readyAt.Before(*b.readyAt)
368+
if a.ReadyAt != nil && b.ReadyAt != nil && !a.ReadyAt.Equal(*b.ReadyAt) {
369+
return a.ReadyAt.Before(*b.ReadyAt)
338370
}
339-
if a.priority != b.priority {
340-
return a.priority > b.priority
371+
if a.Priority != b.Priority {
372+
return a.Priority > b.Priority
341373
}
342374

343-
return a.addedCounter < b.addedCounter
375+
return a.AddedCounter < b.AddedCounter
344376
}
345377

346378
type item[T comparable] struct {
347-
key T
348-
addedCounter uint64
349-
priority int
350-
readyAt *time.Time
379+
Key T `json:"key"`
380+
AddedCounter uint64 `json:"addedCounter"`
381+
Priority int `json:"priority"`
382+
ReadyAt *time.Time `json:"readyAt,omitempty"`
351383
}
352384

353385
func (w *priorityqueue[T]) updateUnfinishedWorkLoop() {
354-
t := time.NewTicker(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182
355-
defer t.Stop()
356-
for range t.C {
357-
if w.shutdown.Load() {
386+
t := time.Tick(500 * time.Millisecond) // borrowed from workqueue: https://github.com/kubernetes/kubernetes/blob/67a807bf142c7a2a5ecfdb2a5d24b4cdea4cc79c/staging/src/k8s.io/client-go/util/workqueue/queue.go#L182
387+
for {
388+
select {
389+
case <-w.done:
358390
return
391+
case <-t:
359392
}
360393
w.metrics.updateUnfinishedWork()
361394
}

0 commit comments

Comments
 (0)