@@ -3,9 +3,11 @@ package provider
3
3
import (
4
4
"context"
5
5
"fmt"
6
+ "reflect"
6
7
"time"
7
8
8
9
corev1 "k8s.io/api/core/v1"
10
+ apierrors "k8s.io/apimachinery/pkg/api/errors"
9
11
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
10
12
"k8s.io/apimachinery/pkg/util/wait"
11
13
"k8s.io/client-go/informers"
@@ -20,9 +22,12 @@ import (
20
22
)
21
23
22
24
const (
23
- controllerName = "service-with-loadbalancerclass -controller"
25
+ controllerName = "service-lbc -controller"
24
26
)
25
27
28
+ // loadbalancerClassServiceController starts a controller that reconcile type loadbalancer service with
29
+ // loadbalancerclass set to kube-vip.io/kube-vip-class.
30
+ // no need to add node controller since kube-vip-cp itself doesn't use node info to update loadbalancer
26
31
type loadbalancerClassServiceController struct {
27
32
kubeClient kubernetes.Interface
28
33
serviceInformer cache.SharedIndexInformer
@@ -60,12 +65,16 @@ func newLoadbalancerClassServiceController(
60
65
61
66
_ , _ = serviceInformer .AddEventHandler (cache.ResourceEventHandlerFuncs {
62
67
AddFunc : func (cur interface {}) {
63
- s := cur .(* corev1.Service ).DeepCopy ()
64
- c .enqueueService (s )
68
+ if svc , ok := cur .(* corev1.Service ); ok && wantsLoadBalancer (svc ) {
69
+ c .enqueueService (svc )
70
+ }
65
71
},
66
- UpdateFunc : func (old interface {}, new interface {}) {
67
- s := new .(* corev1.Service ).DeepCopy ()
68
- c .enqueueService (s )
72
+ UpdateFunc : func (old interface {}, cur interface {}) {
73
+ oldSvc , ok1 := old .(* corev1.Service )
74
+ curSvc , ok2 := cur .(* corev1.Service )
75
+ if ok1 && ok2 && wantsLoadBalancer (curSvc ) && (c .needsUpdate (oldSvc , curSvc ) || needsCleanup (curSvc )) {
76
+ c .enqueueService (curSvc )
77
+ }
69
78
},
70
79
// Delete is handled in the UpdateFunc
71
80
})
@@ -161,18 +170,17 @@ func (c *loadbalancerClassServiceController) syncService(key string) error {
161
170
svc , err := c .serviceLister .Services (namespace ).Get (name )
162
171
163
172
switch {
173
+ case apierrors .IsNotFound (err ):
174
+ // service absence in store means watcher caught the deletion, ensure LB info is cleaned
175
+ return nil
164
176
case err != nil :
165
177
utilruntime .HandleError (fmt .Errorf ("unable to retrieve service %v from store: %v" , key , err ))
166
178
return err
167
- case isLoadbalancerService ( svc ) && loadbalancerClassMatch ( svc ) :
179
+ default :
168
180
klog .Infof ("Reconcile service %s/%s, since loadbalancerClass match" , svc .Namespace , svc .Name )
169
181
if err = c .processServiceCreateOrUpdate (svc ); err != nil {
170
182
return err
171
183
}
172
- case isLoadbalancerService (svc ):
173
- klog .Infof ("Skip reconciling service %s/%s, since loadbalancerClass doesn't match" , svc .Namespace , svc .Name )
174
- default :
175
- // skip if it's not service type lb
176
184
}
177
185
178
186
return nil
@@ -183,27 +191,31 @@ func (c *loadbalancerClassServiceController) processServiceCreateOrUpdate(svc *c
183
191
defer func () {
184
192
klog .Infof ("Finished processing service %s/%s (%v)" , svc .Namespace , svc .Name , time .Since (startTime ))
185
193
}()
194
+
186
195
// if it's getting deleted, remove the finalizer
187
196
if ! svc .DeletionTimestamp .IsZero () {
188
197
if err := c .removeFinalizer (svc ); err != nil {
189
198
klog .Infof ("Error removing finalizer from service %s/%s" , svc .Namespace , svc .Name )
190
199
return err
191
200
}
192
- c .recorder .Eventf (svc , corev1 .EventTypeWarning , "LoadBalancerDeleted" , "loadbalancer is deleted " )
201
+ c .recorder .Event (svc , corev1 .EventTypeNormal , "LoadBalancerDeleted" , "Deleted load balancer " )
193
202
return nil
194
203
}
195
204
205
+ c .recorder .Event (svc , corev1 .EventTypeNormal , "EnsuringLoadBalancer" , "Ensuring load balancer" )
206
+
196
207
if err := c .addFinalizer (svc ); err != nil {
197
208
klog .Infof ("Error adding finalizer to service %s/%s" , svc .Namespace , svc .Name )
198
209
return err
199
210
}
200
211
201
- _ , err := syncLoadBalancer (context .Background (), c .kubeClient , svc , c .cmName , c .cmNamespace )
202
- if err != nil {
212
+ if _ , err := syncLoadBalancer (context .Background (), c .kubeClient , svc , c .cmName , c .cmNamespace ); err != nil {
203
213
c .recorder .Eventf (svc , corev1 .EventTypeWarning , "syncLoadBalancer" , "Error syncing load balancer: %v" , err )
204
214
return err
205
215
}
206
216
217
+ c .recorder .Event (svc , corev1 .EventTypeNormal , "EnsuredLoadBalancer" , "Ensured load balancer" )
218
+
207
219
return nil
208
220
}
209
221
@@ -237,12 +249,72 @@ func (c *loadbalancerClassServiceController) removeFinalizer(service *corev1.Ser
237
249
return err
238
250
}
239
251
240
- func loadbalancerClassMatch (svc * corev1.Service ) bool {
241
- return svc != nil && svc .Spec .LoadBalancerClass != nil && * svc .Spec .LoadBalancerClass == LoadbalancerClass
252
+ // needsUpdate checks if load balancer needs to be updated due to change in attributes.
253
+ func (c * loadbalancerClassServiceController ) needsUpdate (oldService * corev1.Service , newService * corev1.Service ) bool {
254
+ if wantsLoadBalancer (newService ) && ! reflect .DeepEqual (oldService .Spec .LoadBalancerSourceRanges , newService .Spec .LoadBalancerSourceRanges ) {
255
+ c .recorder .Eventf (newService , corev1 .EventTypeNormal , "LoadBalancerSourceRanges" , "%v -> %v" ,
256
+ oldService .Spec .LoadBalancerSourceRanges , newService .Spec .LoadBalancerSourceRanges )
257
+ return true
258
+ }
259
+
260
+ if ! portsEqualForLB (oldService , newService ) || oldService .Spec .SessionAffinity != newService .Spec .SessionAffinity {
261
+ return true
262
+ }
263
+
264
+ if ! reflect .DeepEqual (oldService .Spec .SessionAffinityConfig , newService .Spec .SessionAffinityConfig ) {
265
+ return true
266
+ }
267
+ if ! loadBalancerIPsAreEqual (oldService , newService ) {
268
+ c .recorder .Eventf (newService , corev1 .EventTypeNormal , "LoadbalancerIP" , "%v -> %v" ,
269
+ oldService .Spec .LoadBalancerIP , newService .Spec .LoadBalancerIP )
270
+ return true
271
+ }
272
+ if len (oldService .Spec .ExternalIPs ) != len (newService .Spec .ExternalIPs ) {
273
+ c .recorder .Eventf (newService , corev1 .EventTypeNormal , "ExternalIP" , "Count: %v -> %v" ,
274
+ len (oldService .Spec .ExternalIPs ), len (newService .Spec .ExternalIPs ))
275
+ return true
276
+ }
277
+ for i := range oldService .Spec .ExternalIPs {
278
+ if oldService .Spec .ExternalIPs [i ] != newService .Spec .ExternalIPs [i ] {
279
+ c .recorder .Eventf (newService , corev1 .EventTypeNormal , "ExternalIP" , "Added: %v" ,
280
+ newService .Spec .ExternalIPs [i ])
281
+ return true
282
+ }
283
+ }
284
+ if ! reflect .DeepEqual (oldService .Annotations , newService .Annotations ) {
285
+ return true
286
+ }
287
+ if oldService .UID != newService .UID {
288
+ c .recorder .Eventf (newService , corev1 .EventTypeNormal , "UID" , "%v -> %v" ,
289
+ oldService .UID , newService .UID )
290
+ return true
291
+ }
292
+ if oldService .Spec .ExternalTrafficPolicy != newService .Spec .ExternalTrafficPolicy {
293
+ c .recorder .Eventf (newService , corev1 .EventTypeNormal , "ExternalTrafficPolicy" , "%v -> %v" ,
294
+ oldService .Spec .ExternalTrafficPolicy , newService .Spec .ExternalTrafficPolicy )
295
+ return true
296
+ }
297
+ if oldService .Spec .HealthCheckNodePort != newService .Spec .HealthCheckNodePort {
298
+ c .recorder .Eventf (newService , corev1 .EventTypeNormal , "HealthCheckNodePort" , "%v -> %v" ,
299
+ oldService .Spec .HealthCheckNodePort , newService .Spec .HealthCheckNodePort )
300
+ return true
301
+ }
302
+
303
+ // User can upgrade (add another clusterIP or ipFamily) or can downgrade (remove secondary clusterIP or ipFamily),
304
+ // but CAN NOT change primary/secondary clusterIP || ipFamily UNLESS they are changing from/to/ON ExternalName
305
+ // so not care about order, only need check the length.
306
+ if len (oldService .Spec .IPFamilies ) != len (newService .Spec .IPFamilies ) {
307
+ c .recorder .Eventf (newService , corev1 .EventTypeNormal , "IPFamilies" , "Count: %v -> %v" ,
308
+ len (oldService .Spec .IPFamilies ), len (newService .Spec .IPFamilies ))
309
+ return true
310
+ }
311
+
312
+ return false
242
313
}
243
314
244
- func isLoadbalancerService (svc * corev1.Service ) bool {
245
- return svc != nil && svc .Spec .Type == corev1 .ServiceTypeLoadBalancer
315
+ // only return service that's service type loadbalancer and loadbalancerclass match
316
+ func wantsLoadBalancer (svc * corev1.Service ) bool {
317
+ return svc != nil && svc .Spec .Type == corev1 .ServiceTypeLoadBalancer && svc .Spec .LoadBalancerClass != nil && * svc .Spec .LoadBalancerClass == LoadbalancerClass
246
318
}
247
319
248
320
// removeString returns a newly created []string that contains all items from slice that
@@ -256,3 +328,77 @@ func removeString(slice []string, s string) []string {
256
328
}
257
329
return newSlice
258
330
}
331
+
332
+ // needsCleanup checks if load balancer needs to be cleaned up as indicated by finalizer.
333
+ func needsCleanup (service * corev1.Service ) bool {
334
+ if ! servicehelper .HasLBFinalizer (service ) {
335
+ return false
336
+ }
337
+
338
+ if ! service .ObjectMeta .DeletionTimestamp .IsZero () {
339
+ return true
340
+ }
341
+
342
+ return false
343
+ }
344
+
345
+ func loadBalancerIPsAreEqual (oldService , newService * corev1.Service ) bool {
346
+ return oldService .Spec .LoadBalancerIP == newService .Spec .LoadBalancerIP
347
+ }
348
+
349
+ func portsEqualForLB (x , y * corev1.Service ) bool {
350
+ xPorts := getPortsForLB (x )
351
+ yPorts := getPortsForLB (y )
352
+ return portSlicesEqualForLB (xPorts , yPorts )
353
+ }
354
+
355
+ func getPortsForLB (service * corev1.Service ) []* corev1.ServicePort {
356
+ ports := []* corev1.ServicePort {}
357
+ for i := range service .Spec .Ports {
358
+ sp := & service .Spec .Ports [i ]
359
+ ports = append (ports , sp )
360
+ }
361
+ return ports
362
+ }
363
+
364
+ func portSlicesEqualForLB (x , y []* corev1.ServicePort ) bool {
365
+ if len (x ) != len (y ) {
366
+ return false
367
+ }
368
+
369
+ for i := range x {
370
+ if ! portEqualForLB (x [i ], y [i ]) {
371
+ return false
372
+ }
373
+ }
374
+ return true
375
+ }
376
+
377
+ func portEqualForLB (x , y * corev1.ServicePort ) bool {
378
+ // TODO: Should we check name? (In theory, an LB could expose it)
379
+ if x .Name != y .Name {
380
+ return false
381
+ }
382
+
383
+ if x .Protocol != y .Protocol {
384
+ return false
385
+ }
386
+
387
+ if x .Port != y .Port {
388
+ return false
389
+ }
390
+
391
+ if x .NodePort != y .NodePort {
392
+ return false
393
+ }
394
+
395
+ if x .TargetPort != y .TargetPort {
396
+ return false
397
+ }
398
+
399
+ if ! reflect .DeepEqual (x .AppProtocol , y .AppProtocol ) {
400
+ return false
401
+ }
402
+
403
+ return true
404
+ }
0 commit comments