Skip to content

Commit 43bd8e8

Browse files
committed
Add a service controller to handle loadbalancerclass case
Signed-off-by: lubronzhan <[email protected]>
1 parent 6d726a8 commit 43bd8e8

6 files changed

+97
-44
lines changed

README.md

+8-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ The `kube-vip-cloud-provider` will only implement the `loadBalancer` functionali
1919
- Setting the special IP `0.0.0.0` for DHCP workflow.
2020
- Support single stack IPv6 or IPv4
2121
- Support for dualstack via the annotation: `kube-vip.io/loadbalancerIPs: 192.168.10.10,2001:db8::1`
22-
- Support ascending and descending search order by setting search-order=desc
22+
- Support ascending and descending search order when allocating IP from pool or range by setting search-order=desc
23+
- Support loadbalancerClass `kube-vip.io/kube-vip-class`
2324

2425
## Installing the `kube-vip-cloud-provider`
2526

@@ -134,6 +135,12 @@ address in each of both IP families for the pool.
134135

135136
Set the CIDR to `0.0.0.0/32`, that will make the controller to give all _LoadBalancers_ the IP `0.0.0.0`.
136137

138+
139+
## LoadbalancerClass support
140+
141+
If users only want kube-vip-cloud-provider to allocate ip for specific set of services, they can pass `KUBEVIP_ENABLE_LOADBALANCERCLASS: true` as an environment variable to kube-vip-cloud-provider. kube-vip-cloud-provider will only allocate ip to service with `spec.loadBalancerClass: kube-vip.io/kube-vip-class`.
142+
143+
137144
## Debugging
138145

139146
The logs for the cloud-provider controller can be viewed with the following command:

pkg/provider/config.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
v1 "k8s.io/api/core/v1"
77
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
8+
"k8s.io/client-go/kubernetes"
89
)
910

1011
// Services functions - once the service data is taken from the configMap, these functions will interact with the data
@@ -45,12 +46,12 @@ import (
4546
// return
4647
// }
4748

48-
func (k *kubevipLoadBalancerManager) GetConfigMap(ctx context.Context, cm, nm string) (*v1.ConfigMap, error) {
49+
func getConfigMap(ctx context.Context, kubeClient kubernetes.Interface, cm, nm string) (*v1.ConfigMap, error) {
4950
// Attempt to retrieve the config map
50-
return k.kubeClient.CoreV1().ConfigMaps(nm).Get(ctx, cm, metav1.GetOptions{})
51+
return kubeClient.CoreV1().ConfigMaps(nm).Get(ctx, cm, metav1.GetOptions{})
5152
}
5253

53-
func (k *kubevipLoadBalancerManager) CreateConfigMap(ctx context.Context, cm, nm string) (*v1.ConfigMap, error) {
54+
func createConfigMap(ctx context.Context, kubeClient kubernetes.Interface, cm, nm string) (*v1.ConfigMap, error) {
5455
// Create new configuration map in the correct namespace
5556
newConfigMap := v1.ConfigMap{
5657
ObjectMeta: metav1.ObjectMeta{
@@ -59,7 +60,7 @@ func (k *kubevipLoadBalancerManager) CreateConfigMap(ctx context.Context, cm, nm
5960
},
6061
}
6162
// Return results of configMap create
62-
return k.kubeClient.CoreV1().ConfigMaps(nm).Create(ctx, &newConfigMap, metav1.CreateOptions{})
63+
return kubeClient.CoreV1().ConfigMaps(nm).Create(ctx, &newConfigMap, metav1.CreateOptions{})
6364
}
6465

6566
// func (k *kubevipLoadBalancerManager) UpdateConfigMap(ctx context.Context, cm *v1.ConfigMap, s *kubevipServices) (*v1.ConfigMap, error) {

pkg/provider/loadBalancer.go

+15-15
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ func newLoadBalancer(kubeClient kubernetes.Interface, ns, cm string) cloudprovid
4444
}
4545

4646
func (k *kubevipLoadBalancerManager) EnsureLoadBalancer(ctx context.Context, _ string, service *v1.Service, _ []*v1.Node) (lbs *v1.LoadBalancerStatus, err error) {
47-
return k.syncLoadBalancer(ctx, service)
47+
return syncLoadBalancer(ctx, k.kubeClient, service, k.cloudConfigMap, k.namespace)
4848
}
4949

5050
func (k *kubevipLoadBalancerManager) UpdateLoadBalancer(ctx context.Context, _ string, service *v1.Service, _ []*v1.Node) (err error) {
51-
_, err = k.syncLoadBalancer(ctx, service)
51+
_, err = syncLoadBalancer(ctx, k.kubeClient, service, k.cloudConfigMap, k.namespace)
5252
return err
5353
}
5454

@@ -87,7 +87,7 @@ func (k *kubevipLoadBalancerManager) deleteLoadBalancer(ctx context.Context, ser
8787
// 2b. Get the network configuration for this service (namespace) / (CIDR/Range)
8888
// 2c. Between the two find a free address
8989

90-
func (k *kubevipLoadBalancerManager) syncLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) {
90+
func syncLoadBalancer(ctx context.Context, kubeClient kubernetes.Interface, service *v1.Service, cmName, cmNamespace string) (*v1.LoadBalancerStatus, error) {
9191
// This function reconciles the load balancer state
9292
klog.Infof("syncing service '%s' (%s)", service.Name, service.UID)
9393

@@ -97,7 +97,7 @@ func (k *kubevipLoadBalancerManager) syncLoadBalancer(ctx context.Context, servi
9797
klog.Warningf("service.Spec.LoadBalancerIP is defined but annotations '%s' is not, assume it's a legacy service, updates its annotations", loadbalancerIPsAnnotations)
9898
// assume it's legacy service, need to update the annotation.
9999
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
100-
recentService, getErr := k.kubeClient.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
100+
recentService, getErr := kubeClient.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
101101
if getErr != nil {
102102
return getErr
103103
}
@@ -109,7 +109,7 @@ func (k *kubevipLoadBalancerManager) syncLoadBalancer(ctx context.Context, servi
109109
delete(recentService.Labels, legacyIpamAddressLabelKey)
110110

111111
// Update the actual service with the annotations
112-
_, updateErr := k.kubeClient.CoreV1().Services(recentService.Namespace).Update(ctx, recentService, metav1.UpdateOptions{})
112+
_, updateErr := kubeClient.CoreV1().Services(recentService.Namespace).Update(ctx, recentService, metav1.UpdateOptions{})
113113
return updateErr
114114
})
115115
if err != nil {
@@ -125,7 +125,7 @@ func (k *kubevipLoadBalancerManager) syncLoadBalancer(ctx context.Context, servi
125125
if service.Labels == nil || service.Labels[implementationLabelKey] != implementationLabelValue {
126126
klog.Infof("service '%s/%s' created with pre-defined ip '%s'", service.Namespace, service.Name, v)
127127
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
128-
recentService, getErr := k.kubeClient.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
128+
recentService, getErr := kubeClient.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
129129
if getErr != nil {
130130
return getErr
131131
}
@@ -135,7 +135,7 @@ func (k *kubevipLoadBalancerManager) syncLoadBalancer(ctx context.Context, servi
135135
}
136136
recentService.Labels[implementationLabelKey] = implementationLabelValue
137137
// Update the actual service with the annotations
138-
_, updateErr := k.kubeClient.CoreV1().Services(recentService.Namespace).Update(ctx, recentService, metav1.UpdateOptions{})
138+
_, updateErr := kubeClient.CoreV1().Services(recentService.Namespace).Update(ctx, recentService, metav1.UpdateOptions{})
139139
return updateErr
140140
})
141141
if err != nil {
@@ -146,31 +146,31 @@ func (k *kubevipLoadBalancerManager) syncLoadBalancer(ctx context.Context, servi
146146
}
147147

148148
// Get the clound controller configuration map
149-
controllerCM, err := k.GetConfigMap(ctx, k.cloudConfigMap, k.namespace)
149+
controllerCM, err := getConfigMap(ctx, kubeClient, cmName, cmNamespace)
150150
if err != nil {
151-
klog.Errorf("Unable to retrieve kube-vip ipam config from configMap [%s] in %s", k.cloudConfigMap, k.namespace)
151+
klog.Errorf("Unable to retrieve kube-vip ipam config from configMap [%s] in %s", cmName, cmNamespace)
152152
// TODO - determine best course of action, create one if it doesn't exist
153-
controllerCM, err = k.CreateConfigMap(ctx, k.cloudConfigMap, k.namespace)
153+
controllerCM, err = createConfigMap(ctx, kubeClient, cmName, cmNamespace)
154154
if err != nil {
155155
return nil, err
156156
}
157157
}
158158

159159
// Get ip pool from configmap and determine if it is namespace specific or global
160-
pool, global, err := discoverPool(controllerCM, service.Namespace, k.cloudConfigMap)
160+
pool, global, err := discoverPool(controllerCM, service.Namespace, cmName)
161161
if err != nil {
162162
return nil, err
163163
}
164164

165165
// Get all services in this namespace or globally, that have the correct label
166166
var svcs *v1.ServiceList
167167
if global {
168-
svcs, err = k.kubeClient.CoreV1().Services("").List(ctx, metav1.ListOptions{LabelSelector: getKubevipImplementationLabel()})
168+
svcs, err = kubeClient.CoreV1().Services("").List(ctx, metav1.ListOptions{LabelSelector: getKubevipImplementationLabel()})
169169
if err != nil {
170170
return &service.Status.LoadBalancer, err
171171
}
172172
} else {
173-
svcs, err = k.kubeClient.CoreV1().Services(service.Namespace).List(ctx, metav1.ListOptions{LabelSelector: getKubevipImplementationLabel()})
173+
svcs, err = kubeClient.CoreV1().Services(service.Namespace).List(ctx, metav1.ListOptions{LabelSelector: getKubevipImplementationLabel()})
174174
if err != nil {
175175
return &service.Status.LoadBalancer, err
176176
}
@@ -201,7 +201,7 @@ func (k *kubevipLoadBalancerManager) syncLoadBalancer(ctx context.Context, servi
201201

202202
// Update the services with this new address
203203
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
204-
recentService, getErr := k.kubeClient.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
204+
recentService, getErr := kubeClient.CoreV1().Services(service.Namespace).Get(ctx, service.Name, metav1.GetOptions{})
205205
if getErr != nil {
206206
return getErr
207207
}
@@ -226,7 +226,7 @@ func (k *kubevipLoadBalancerManager) syncLoadBalancer(ctx context.Context, servi
226226
recentService.Spec.LoadBalancerIP = strings.Split(loadBalancerIPs, ",")[0]
227227

228228
// Update the actual service with the address and the labels
229-
_, updateErr := k.kubeClient.CoreV1().Services(recentService.Namespace).Update(ctx, recentService, metav1.UpdateOptions{})
229+
_, updateErr := kubeClient.CoreV1().Services(recentService.Namespace).Update(ctx, recentService, metav1.UpdateOptions{})
230230
return updateErr
231231
})
232232
if retryErr != nil {

pkg/provider/loadBalancer_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -888,7 +888,7 @@ func Test_syncLoadBalancer(t *testing.T) {
888888
}
889889
}
890890

891-
_, err = mgr.syncLoadBalancer(context.Background(), &tt.originalService) // #nosec G601
891+
_, err = syncLoadBalancer(context.Background(), mgr.kubeClient, &tt.originalService, cm, ns) // #nosec G601
892892
if err != nil {
893893
t.Error(err)
894894
}

pkg/controller/service.go pkg/provider/loadbalancerclass.go

+38-18
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1-
package controller
1+
package provider
22

33
import (
4+
"context"
45
"fmt"
56
"time"
67

78
corev1 "k8s.io/api/core/v1"
9+
v1 "k8s.io/api/core/v1"
810
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
911
"k8s.io/apimachinery/pkg/util/wait"
1012
"k8s.io/client-go/informers"
13+
"k8s.io/client-go/kubernetes"
1114
"k8s.io/client-go/kubernetes/scheme"
1215
corelisters "k8s.io/client-go/listers/core/v1"
1316
"k8s.io/client-go/tools/cache"
@@ -20,46 +23,56 @@ const (
2023
controllerName = "service-with-loadbalancerclass-controller"
2124
)
2225

23-
type Controller struct {
26+
type loadbalancerClassServiceController struct {
27+
kubeClient kubernetes.Interface
2428
serviceInformer cache.SharedIndexInformer
2529
serviceLister corelisters.ServiceLister
2630
serviceListerSynced cache.InformerSynced
2731

2832
recorder record.EventRecorder
2933
workqueue workqueue.RateLimitingInterface
34+
35+
cmName string
36+
cmNamespace string
3037
}
3138

32-
func NewController(
39+
func newLoadbalancerClassServiceController(
3340
sharedInformer informers.SharedInformerFactory,
34-
) *Controller {
41+
kubeClient kubernetes.Interface,
42+
cmName, cmNamespace string,
43+
) *loadbalancerClassServiceController {
3544
eventBroadcaster := record.NewBroadcaster()
3645
eventBroadcaster.StartLogging(klog.Infof)
3746
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerName})
3847
serviceInformer := sharedInformer.Core().V1().Services().Informer()
39-
c := &Controller{
48+
c := &loadbalancerClassServiceController{
4049
serviceInformer: serviceInformer,
4150
serviceLister: sharedInformer.Core().V1().Services().Lister(),
4251
serviceListerSynced: serviceInformer.HasSynced,
52+
kubeClient: kubeClient,
4353

4454
recorder: recorder,
4555
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nodes"),
56+
57+
cmName: cmName,
58+
cmNamespace: cmNamespace,
4659
}
4760

48-
serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
61+
_, _ = serviceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
4962
AddFunc: func(cur interface{}) {
50-
node := cur.(*corev1.Node).DeepCopy()
51-
c.enqueueService(node)
63+
s := cur.(*corev1.Node).DeepCopy()
64+
c.enqueueService(s)
5265
},
5366
UpdateFunc: func(old interface{}, new interface{}) {
54-
c.enqueueService(old)
67+
c.enqueueService(new)
5568
},
5669
// DeleteFunc: ,
5770
})
5871

5972
return c
6073
}
6174

62-
func (c *Controller) enqueueService(obj interface{}) {
75+
func (c *loadbalancerClassServiceController) enqueueService(obj interface{}) {
6376
var key string
6477
var err error
6578
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
@@ -70,17 +83,17 @@ func (c *Controller) enqueueService(obj interface{}) {
7083
}
7184

7285
// Run starts the worker to process node updates
73-
func (c *Controller) Run(stopCh <-chan struct{}) {
86+
func (c *loadbalancerClassServiceController) Run(stopCh <-chan struct{}) {
7487
defer utilruntime.HandleCrash()
7588
defer c.workqueue.ShutDown()
7689

7790
klog.V(4).Info("Waiting cache to be synced.")
7891

79-
if !cache.WaitForNamedCacheSync("node", stopCh, c.serviceListerSynced) {
92+
if !cache.WaitForNamedCacheSync("service", stopCh, c.serviceListerSynced) {
8093
return
8194
}
8295

83-
klog.V(4).Info("Starting node workers.")
96+
klog.V(4).Info("Starting service workers for loadbalancerclass.")
8497
go wait.Until(c.runWorker, time.Second, stopCh)
8598

8699
<-stopCh
@@ -89,14 +102,14 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
89102
// runWorker is a long-running function that will continually call the
90103
// processNextWorkItem function in order to read and process a message on the
91104
// workqueue.
92-
func (c *Controller) runWorker() {
105+
func (c *loadbalancerClassServiceController) runWorker() {
93106
for c.processNextWorkItem() {
94107
}
95108
}
96109

97110
// processNextWorkItem will read a single work item off the workqueue and
98111
// attempt to process it, by calling the syncHandler.
99-
func (c *Controller) processNextWorkItem() bool {
112+
func (c *loadbalancerClassServiceController) processNextWorkItem() bool {
100113
obj, shutdown := c.workqueue.Get()
101114
if shutdown {
102115
return false
@@ -138,7 +151,7 @@ func (c *Controller) processNextWorkItem() bool {
138151
// syncService will sync the Service with the given key if it has had its expectations fulfilled,
139152
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
140153
// invoked concurrently with the same key.
141-
func (c *Controller) syncService(key string) error {
154+
func (c *loadbalancerClassServiceController) syncService(key string) error {
142155
startTime := time.Now()
143156
defer func() {
144157
klog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
@@ -154,15 +167,22 @@ func (c *Controller) syncService(key string) error {
154167
switch {
155168
case err != nil:
156169
utilruntime.HandleError(fmt.Errorf("unable to retrieve service %v from store: %v", key, err))
170+
case loadbalancerClassMatch(svc):
171+
klog.V(4).Infof("Skip reoconciling service %s/%s, since loadbalancerClass doesn't match", svc.Namespace, svc.Name)
157172
default:
158173
err = c.processServiceCreateOrUpdate(svc)
159174
}
160175

161176
return err
162177
}
163178

164-
func (c *Controller) processServiceCreateOrUpdate(svc *corev1.Service) error {
179+
func (c *loadbalancerClassServiceController) processServiceCreateOrUpdate(svc *corev1.Service) error {
165180
// ctx := context.Background()
181+
_, err := syncLoadBalancer(context.Background(), c.kubeClient, svc, c.cmName, c.cmNamespace)
182+
c.recorder.Eventf(svc, v1.EventTypeWarning, "syncLoadBalancer", "Error syncing load balancer: %v", err)
183+
return err
184+
}
166185

167-
return nil
186+
func loadbalancerClassMatch(svc *corev1.Service) bool {
187+
return svc != nil && svc.Spec.LoadBalancerClass != nil && *svc.Spec.LoadBalancerClass == LoadbalancerClass
168188
}

0 commit comments

Comments
 (0)