Skip to content

Commit 05d5779

Browse files
Add ControllerModifyVolume Support in external-resizer
1 parent 908ae11 commit 05d5779

28 files changed

+2186
-211
lines changed

README.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@ Various external-resizer releases come with different alpha / beta features.
2222

2323
The following table reflects the head of this branch.
2424

25-
| Feature | Status | Default | Description |
26-
| ----------------- | ------- | ------- | ----------------------------------------------------------------------------------------------------------------------------- |
27-
| VolumeExpansion | Beta | On | [Support for expanding CSI volumes](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#csi-volume-expansion). |
28-
| ReadWriteOncePod | Alpha | Off | [Single pod access mode for PersistentVolumes](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes). |
25+
| Feature | Status | Default | Description |
26+
| ---------------------- | ------- | ------- | ----------------------------------------------------------------------------------------------------------------------------- |
27+
| VolumeExpansion | Beta | On | [Support for expanding CSI volumes](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#csi-volume-expansion). |
28+
| ReadWriteOncePod | Alpha | Off | [Single pod access mode for PersistentVolumes](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes). |
29+
| VolumeAttributesClass | Alpha | Off | [Volume Attributes Classes](https://kubernetes.io/docs/concepts/storage/volume-attributes-classes). |
2930

3031
## Usage
3132

cmd/csi-resizer/main.go

+27-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ import (
3535

3636
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
3737
"github.com/kubernetes-csi/external-resizer/pkg/controller"
38+
"github.com/kubernetes-csi/external-resizer/pkg/features"
39+
"github.com/kubernetes-csi/external-resizer/pkg/modifier"
40+
"github.com/kubernetes-csi/external-resizer/pkg/modifycontroller"
3841
"github.com/kubernetes-csi/external-resizer/pkg/resizer"
3942
"github.com/kubernetes-csi/external-resizer/pkg/util"
4043
csitrans "k8s.io/csi-translation-lib"
@@ -179,6 +182,17 @@ func main() {
179182
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
180183
}
181184

185+
csiModifier, err := modifier.NewModifierFromClient(
186+
csiClient,
187+
*timeout,
188+
kubeClient,
189+
informerFactory,
190+
driverName)
191+
if err != nil {
192+
klog.ErrorS(err, "Failed to create CSI modifier")
193+
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
194+
}
195+
182196
// Start HTTP server for metrics + leader election healthz
183197
if addr != "" {
184198
metricsManager.RegisterToServer(mux, *metricsPath)
@@ -197,10 +211,21 @@ func main() {
197211
rc := controller.NewResizeController(resizerName, csiResizer, kubeClient, *resyncPeriod, informerFactory,
198212
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax),
199213
*handleVolumeInUseError)
214+
modifierName := csiModifier.Name()
215+
var mc modifycontroller.ModifyController
216+
// Add modify controller only if the feature gate is enabled
217+
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass) {
218+
mc = modifycontroller.NewModifyController(modifierName, csiModifier, kubeClient, *resyncPeriod, informerFactory,
219+
workqueue.NewItemExponentialFailureRateLimiter(*retryIntervalStart, *retryIntervalMax))
220+
}
221+
200222
run := func(ctx context.Context) {
201223
informerFactory.Start(wait.NeverStop)
202-
rc.Run(*workers, ctx)
203-
224+
go rc.Run(*workers, ctx)
225+
if utilfeature.DefaultFeatureGate.Enabled(features.VolumeAttributesClass) {
226+
go mc.Run(*workers, ctx)
227+
}
228+
<-ctx.Done()
204229
}
205230

206231
if !*enableLeaderElection {

deploy/kubernetes/rbac.yaml

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ rules:
4242
- apiGroups: [""]
4343
resources: ["events"]
4444
verbs: ["list", "watch", "create", "update", "patch"]
45+
# only required if enabling the alpha volume modify feature
46+
- apiGroups: ["storage.k8s.io"]
47+
resources: ["volumeattributesclasses"]
48+
verbs: ["get", "list", "watch"]
4549

4650
---
4751
kind: ClusterRoleBinding

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.21
44

55
require (
66
github.com/container-storage-interface/spec v1.9.0
7+
github.com/google/go-cmp v0.6.0
78
github.com/google/gofuzz v1.2.0 // indirect
89
github.com/imdario/mergo v0.3.12 // indirect
910
github.com/kubernetes-csi/csi-lib-utils v0.17.0
@@ -37,7 +38,6 @@ require (
3738
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
3839
github.com/golang/protobuf v1.5.3 // indirect
3940
github.com/google/gnostic-models v0.6.8 // indirect
40-
github.com/google/go-cmp v0.6.0 // indirect
4141
github.com/google/uuid v1.3.1 // indirect
4242
github.com/inconshreveable/mousetrap v1.1.0 // indirect
4343
github.com/josharian/intern v1.0.0 // indirect

pkg/controller/controller.go

+39-56
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func NewResizeController(
132132
}
133133

134134
func (ctrl *resizeController) addPVC(obj interface{}) {
135-
objKey, err := getObjectKey(obj)
135+
objKey, err := util.GetObjectKey(obj)
136136
if err != nil {
137137
return
138138
}
@@ -228,25 +228,13 @@ func (ctrl *resizeController) updatePVC(oldObj, newObj interface{}) {
228228
}
229229

230230
func (ctrl *resizeController) deletePVC(obj interface{}) {
231-
objKey, err := getObjectKey(obj)
231+
objKey, err := util.GetObjectKey(obj)
232232
if err != nil {
233233
return
234234
}
235235
ctrl.claimQueue.Forget(objKey)
236236
}
237237

238-
func getObjectKey(obj interface{}) (string, error) {
239-
if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
240-
obj = unknown.Obj
241-
}
242-
objKey, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
243-
if err != nil {
244-
klog.ErrorS(err, "Failed to get key from object")
245-
return "", err
246-
}
247-
return objKey, nil
248-
}
249-
250238
// Run starts the controller.
251239
func (ctrl *resizeController) Run(
252240
workers int, ctx context.Context) {
@@ -292,7 +280,7 @@ func (ctrl *resizeController) syncPVCs() {
292280

293281
// syncPVC checks if a pvc requests resizing, and execute the resize operation if requested.
294282
func (ctrl *resizeController) syncPVC(key string) error {
295-
klog.V(4).InfoS("Started PVC processing", "key", key)
283+
klog.V(4).InfoS("Started PVC processing for resize controller", "key", key)
296284

297285
namespace, name, err := cache.SplitMetaNamespaceKey(key)
298286
if err != nil {
@@ -502,12 +490,16 @@ func (ctrl *resizeController) markPVCAsFSResizeRequired(pvc *v1.PersistentVolume
502490
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
503491
[]v1.PersistentVolumeClaimCondition{pvcCondition})
504492

505-
_, err := ctrl.patchClaim(pvc, newPVC, true /* addResourceVersionCheck */)
506-
493+
updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, true /* addResourceVersionCheck */)
507494
if err != nil {
508495
return fmt.Errorf("Mark PVC %q as file system resize required failed: %v", klog.KObj(pvc), err)
509496
}
510497

498+
err = ctrl.claims.Update(updatedPVC)
499+
if err != nil {
500+
return fmt.Errorf("error updating PVC %s in local cache: %v", klog.KObj(newPVC), err)
501+
}
502+
511503
klog.V(4).InfoS("Mark PVC as file system resize required", "PVC", klog.KObj(pvc))
512504
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal,
513505
util.FileSystemResizeRequired, "Require file system resize of volume on node")
@@ -527,7 +519,14 @@ func (ctrl *resizeController) markPVCResizeInProgress(pvc *v1.PersistentVolumeCl
527519
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(newPVC.Status.Conditions,
528520
[]v1.PersistentVolumeClaimCondition{progressCondition})
529521

530-
updatedPVC, err := ctrl.patchClaim(pvc, newPVC, true /* addResourceVersionCheck */)
522+
updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, true /* addResourceVersionCheck */)
523+
if err != nil {
524+
return updatedPVC, fmt.Errorf("Mark PVC %q as resize as in progress failed: %v", klog.KObj(pvc), err)
525+
}
526+
err = ctrl.claims.Update(updatedPVC)
527+
if err != nil {
528+
return updatedPVC, fmt.Errorf("error updating PVC %s in local cache: %v", klog.KObj(newPVC), err)
529+
}
531530
return updatedPVC, err
532531
}
533532

@@ -538,38 +537,22 @@ func (ctrl *resizeController) markPVCResizeFinished(
538537
newPVC.Status.Capacity[v1.ResourceStorage] = newSize
539538
newPVC.Status.Conditions = util.MergeResizeConditionsOfPVC(pvc.Status.Conditions, []v1.PersistentVolumeClaimCondition{})
540539

541-
_, err := ctrl.patchClaim(pvc, newPVC, true /* addResourceVersionCheck */)
540+
updatedPVC, err := util.PatchClaim(ctrl.kubeClient, pvc, newPVC, true /* addResourceVersionCheck */)
542541
if err != nil {
543542
return fmt.Errorf("Mark PVC %q as resize finished failed: %v", klog.KObj(pvc), err)
544543
}
545544

545+
err = ctrl.claims.Update(updatedPVC)
546+
if err != nil {
547+
return fmt.Errorf("error updating PVC %s in local cache: %v", klog.KObj(newPVC), err)
548+
}
549+
546550
klog.V(4).InfoS("Resize PVC finished", "PVC", klog.KObj(pvc))
547551
ctrl.eventRecorder.Eventf(pvc, v1.EventTypeNormal, util.VolumeResizeSuccess, "Resize volume succeeded")
548552

549553
return nil
550554
}
551555

552-
// Patches a given PVC with changes from newPVC. If addResourceVersionCheck is true
553-
// then a version check is added to the patch to ensure that we are not patching
554-
// old(and possibly outdated) PVC objects.
555-
func (ctrl *resizeController) patchClaim(oldPVC, newPVC *v1.PersistentVolumeClaim, addResourceVersionCheck bool) (*v1.PersistentVolumeClaim, error) {
556-
patchBytes, err := util.GetPVCPatchData(oldPVC, newPVC, addResourceVersionCheck)
557-
if err != nil {
558-
return oldPVC, fmt.Errorf("can't patch status of PVC %s as generate path data failed: %v", klog.KObj(oldPVC), err)
559-
}
560-
updatedClaim, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(oldPVC.Namespace).
561-
Patch(context.TODO(), oldPVC.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
562-
if updateErr != nil {
563-
return oldPVC, fmt.Errorf("can't patch status of PVC %s with %v", klog.KObj(oldPVC), updateErr)
564-
}
565-
err = ctrl.claims.Update(updatedClaim)
566-
if err != nil {
567-
return oldPVC, fmt.Errorf("error updating PVC %s in local cache: %v", klog.KObj(newPVC), err)
568-
}
569-
570-
return updatedClaim, nil
571-
}
572-
573556
func (ctrl *resizeController) deletePreResizeCapAnnotation(pv *v1.PersistentVolume) error {
574557
// if the pv does not have a resize annotation skip the entire process
575558
if !metav1.HasAnnotation(pv.ObjectMeta, util.AnnPreResizeCapacity) {
@@ -608,22 +591,6 @@ func (ctrl *resizeController) updatePVCapacity(
608591
return updatedPV, nil
609592
}
610593

611-
func (ctrl *resizeController) patchPersistentVolume(oldPV, newPV *v1.PersistentVolume) (*v1.PersistentVolume, error) {
612-
patchBytes, err := util.GetPatchData(oldPV, newPV)
613-
if err != nil {
614-
return nil, fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", newPV.Name, err)
615-
}
616-
updatedPV, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), newPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
617-
if updateErr != nil {
618-
return nil, fmt.Errorf("update capacity of PV %s failed: %v", newPV.Name, updateErr)
619-
}
620-
err = ctrl.volumes.Update(updatedPV)
621-
if err != nil {
622-
return nil, fmt.Errorf("error updating PV %s in local cache: %v", newPV.Name, err)
623-
}
624-
return updatedPV, nil
625-
}
626-
627594
func parsePod(obj interface{}) *v1.Pod {
628595
if obj == nil {
629596
return nil
@@ -644,6 +611,22 @@ func parsePod(obj interface{}) *v1.Pod {
644611
return pod
645612
}
646613

614+
func (ctrl *resizeController) patchPersistentVolume(oldPV, newPV *v1.PersistentVolume) (*v1.PersistentVolume, error) {
615+
patchBytes, err := util.GetPatchData(oldPV, newPV)
616+
if err != nil {
617+
return nil, fmt.Errorf("can't update capacity of PV %s as generate path data failed: %v", newPV.Name, err)
618+
}
619+
updatedPV, updateErr := ctrl.kubeClient.CoreV1().PersistentVolumes().Patch(context.TODO(), newPV.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
620+
if updateErr != nil {
621+
return nil, fmt.Errorf("update capacity of PV %s failed: %v", newPV.Name, updateErr)
622+
}
623+
err = ctrl.volumes.Update(updatedPV)
624+
if err != nil {
625+
return nil, fmt.Errorf("error updating PV %s in local cache: %v", newPV.Name, err)
626+
}
627+
return updatedPV, nil
628+
}
629+
647630
func inUseError(err error) bool {
648631
st, ok := status.FromError(err)
649632
if !ok {

pkg/controller/controller_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ func TestController(t *testing.T) {
207207
disableVolumeInUseErrorHandler: true,
208208
},
209209
} {
210-
client := csi.NewMockClient("mock", test.NodeResize, true, true, true)
210+
client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true)
211211
driverName, _ := client.GetDriverName(context.TODO())
212212

213213
var expectedCap resource.Quantity
@@ -343,7 +343,7 @@ func TestResizePVC(t *testing.T) {
343343
expectFailure: true,
344344
},
345345
} {
346-
client := csi.NewMockClient("mock", test.NodeResize, true, true, true)
346+
client := csi.NewMockClient("mock", test.NodeResize, true, false, true, true)
347347
if test.expansionFailure {
348348
client.SetExpansionFailed()
349349
}

pkg/controller/expand_and_recover.go

+1-27
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ import (
2020
"fmt"
2121

2222
"github.com/kubernetes-csi/external-resizer/pkg/util"
23-
"google.golang.org/grpc/codes"
24-
"google.golang.org/grpc/status"
2523
v1 "k8s.io/api/core/v1"
2624
"k8s.io/apimachinery/pkg/api/resource"
2725
"k8s.io/klog/v2"
@@ -167,7 +165,7 @@ func (ctrl *resizeController) callResizeOnPlugin(
167165
if inUseError(err) {
168166
ctrl.usedPVCs.addPVCWithInUseError(pvc)
169167
}
170-
if isFinalError(err) {
168+
if util.IsFinalError(err) {
171169
var markExpansionFailedError error
172170
pvc, markExpansionFailedError = ctrl.markControllerExpansionFailed(pvc)
173171
if markExpansionFailedError != nil {
@@ -205,27 +203,3 @@ func (ctrl *resizeController) pvCanBeExpanded(pv *v1.PersistentVolume, pvc *v1.P
205203
}
206204
return true
207205
}
208-
209-
func isFinalError(err error) bool {
210-
// Sources:
211-
// https://github.com/grpc/grpc/blob/master/doc/statuscodes.md
212-
// https://github.com/container-storage-interface/spec/blob/master/spec.md
213-
st, ok := status.FromError(err)
214-
if !ok {
215-
// This is not gRPC error. The operation must have failed before gRPC
216-
// method was called, otherwise we would get gRPC error.
217-
// We don't know if any previous volume operation is in progress, be on the safe side.
218-
return false
219-
}
220-
switch st.Code() {
221-
case codes.Canceled, // gRPC: Client Application cancelled the request
222-
codes.DeadlineExceeded, // gRPC: Timeout
223-
codes.Unavailable, // gRPC: Server shutting down, TCP connection broken - previous volume operation may be still in progress.
224-
codes.ResourceExhausted, // gRPC: Server temporarily out of resources - previous volume operation may be still in progress.
225-
codes.Aborted: // CSI: Operation pending for volume
226-
return false
227-
}
228-
// All other errors mean that operation either did not
229-
// even start or failed. It is for sure not in progress.
230-
return true
231-
}

0 commit comments

Comments
 (0)