Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix cnsunregister API to not use CNS metadata #3192

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pkg/common/unittestcommon/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,11 @@ func (c *FakeK8SOrchestrator) GetPVNameFromCSIVolumeID(volumeID string) (string,
return "", false
}

// GetPVCNameFromCSIVolumeID retrieves the pvc name from volumeID.
func (c *FakeK8SOrchestrator) GetPVCNameFromCSIVolumeID(volumeID string) (string, bool) {
return "", false
}

// InitializeCSINodes creates CSINode instances for each K8s node with the appropriate topology keys.
func (c *FakeK8SOrchestrator) InitializeCSINodes(ctx context.Context) error {
return nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/csi/service/common/commonco/coagnostic.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type COCommonInterface interface {
// GetPVNameFromCSIVolumeID retrieves the pv name from the volumeID.
// This method will not return pv name in case of in-tree migrated volumes
GetPVNameFromCSIVolumeID(volumeID string) (string, bool)
// GetPVCNameFromCSIVolumeID returns PV claim name for the volume ID
GetPVCNameFromCSIVolumeID(volumeID string) (string, bool)
// InitializeCSINodes creates CSINode instances for each K8s node with the appropriate topology keys.
InitializeCSINodes(ctx context.Context) error
// StartZonesInformer starts a dynamic informer which listens on Zones CR in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ func (m *volumeIDToPvcMap) remove(volumeHandle string) {
}

// Returns the namespaced pvc name corresponding to volumeHandle.
func (m *volumeIDToPvcMap) get(volumeHandle string) string {
func (m *volumeIDToPvcMap) get(volumeHandle string) (string, bool) {
m.RLock()
defer m.RUnlock()
return m.items[volumeHandle]
pvcname, found := m.items[volumeHandle]
return pvcname, found
}

// Map of the volumeName which refers to the PVName, to the list of node names in the cluster.
Expand Down Expand Up @@ -1792,3 +1793,8 @@ func (c *K8sOrchestrator) CreateConfigMap(ctx context.Context, name string, name
func (c *K8sOrchestrator) GetPVNameFromCSIVolumeID(volumeID string) (string, bool) {
return c.volumeIDToNameMap.get(volumeID)
}

// GetPVCNameFromCSIVolumeID retrieves the pvc name from volumeID using volumeIDToPvcMap.
func (c *K8sOrchestrator) GetPVCNameFromCSIVolumeID(volumeID string) (string, bool) {
return c.volumeIDToPvcMap.get(volumeID)
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
func (c *K8sOrchestrator) getPVCAnnotations(ctx context.Context, volumeID string) (map[string]string, error) {
log := logger.GetLogger(ctx)
log.Debugf("Getting annotations on pvc corresponding to volume: %s", volumeID)
if pvc := c.volumeIDToPvcMap.get(volumeID); pvc != "" {
if pvc, _ := c.volumeIDToPvcMap.get(volumeID); pvc != "" {
parts := strings.Split(pvc, "/")
pvcNamespace := parts[0]
pvcName := parts[1]
Expand Down Expand Up @@ -67,7 +67,7 @@ func (c *K8sOrchestrator) getPVCAnnotations(ctx context.Context, volumeID string
func (c *K8sOrchestrator) updatePVCAnnotations(ctx context.Context,
volumeID string, annotations map[string]string) error {
log := logger.GetLogger(ctx)
if pvc := c.volumeIDToPvcMap.get(volumeID); pvc != "" {
if pvc, _ := c.volumeIDToPvcMap.get(volumeID); pvc != "" {
parts := strings.Split(pvc, "/")
pvcNamespace := parts[0]
pvcName := parts[1]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cnsunregistervolume
import (
"context"
"fmt"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -55,7 +56,6 @@ import (

const (
defaultMaxWorkerThreadsForUnregisterVolume = 40
metadata = "VOLUME_METADATA"
)

var (
Expand Down Expand Up @@ -211,57 +211,23 @@ func (r *ReconcileCnsUnregisterVolume) Reconcile(ctx context.Context,
// 5. Invoke CNS DeleteVolume API with deleteDisk set to false.
// 6. Set the CnsUnregisterVolumeStatus.Unregistered to true.

// TODO - Add validations whether the volume is not in use in a TKC or by a VM service VM.

queryFilter := cnstypes.CnsQueryFilter{
VolumeIds: []cnstypes.CnsVolumeId{{Id: instance.Spec.VolumeID}},
}
querySelection := cnstypes.CnsQuerySelection{
Names: []string{
metadata,
},
}

queryResult, err := r.volumeManager.QueryVolumeAsync(ctx, queryFilter, &querySelection)
if err != nil {
msg := fmt.Sprintf("Unable to query volume %q . Error: %+v", instance.Spec.VolumeID, err)
log.Error(msg)
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}
if len(queryResult.Volumes) == 0 {
msg := fmt.Sprintf("Volume: %q not found while querying CNS. It may have already been unregistered.",
instance.Spec.VolumeID)
err = setInstanceSuccess(ctx, r, instance, msg)
if err != nil {
msg := fmt.Sprintf("Failed to update CnsUnregistered instance with error: %+v", err)
log.Error(msg)
setInstanceError(ctx, r, instance, msg)
return reconcile.Result{RequeueAfter: timeout}, nil
}
backOffDurationMapMutex.Lock()
delete(backOffDuration, instance.Name)
backOffDurationMapMutex.Unlock()
log.Info(msg)
return reconcile.Result{}, nil
}

cnsVol := queryResult.Volumes[0]

var pvName, pvcName, pvcNamespace string
for _, entity := range cnsVol.Metadata.EntityMetadata {
if k8sEntityMetadata, ok := entity.(*cnstypes.CnsKubernetesEntityMetadata); ok {
entityType := k8sEntityMetadata.EntityType

if entityType == string(cnstypes.CnsKubernetesEntityTypePV) {
pvName = entity.(*cnstypes.CnsKubernetesEntityMetadata).EntityName
}

if entityType == string(cnstypes.CnsKubernetesEntityTypePVC) {
pvcName = entity.(*cnstypes.CnsKubernetesEntityMetadata).EntityName
pvcNamespace = entity.(*cnstypes.CnsKubernetesEntityMetadata).Namespace
}
pvName, pvfound := commonco.ContainerOrchestratorUtility.GetPVNameFromCSIVolumeID(instance.Spec.VolumeID)
if pvfound {
log.Infof("found PV: %q for the volumd Id: %q", pvName, instance.Spec.VolumeID)
pvcNamewithNamespace, pvcfound := commonco.ContainerOrchestratorUtility.
GetPVCNameFromCSIVolumeID(instance.Spec.VolumeID)
if pvcfound {
parts := strings.Split(pvcNamewithNamespace, "/")
pvcNamespace = parts[0]
pvcName = parts[1]
log.Infof("found PVC: %q in the namespace:%q for the volumd Id: %q", pvcName, pvcNamespace,
instance.Spec.VolumeID)
} else {
log.Infof("cound not find PVC for the volume Id: %q", instance.Spec.VolumeID)
}
} else {
log.Infof("cound not find PV for the volume Id: %q", instance.Spec.VolumeID)
}

k8sclient, err := k8s.NewClient(ctx)
Expand All @@ -271,8 +237,9 @@ func (r *ReconcileCnsUnregisterVolume) Reconcile(ctx context.Context,
setInstanceError(ctx, r, instance, "Failed to init K8S client for volume unregistration")
return reconcile.Result{RequeueAfter: timeout}, nil
}

err = validateVolumeNotInUse(ctx, cnsVol, pvcName, pvcNamespace, k8sclient)
// TODO - Add validations whether the volume is not in use in a TKC
// validateVolumeNotInUse does not check if detached PVC/PV in TKC, having reference to supervisor PVC/PV
err = validateVolumeNotInUse(ctx, instance.Spec.VolumeID, pvcName, pvcNamespace, k8sclient)
if err != nil {
log.Error(err)
setInstanceError(ctx, r, instance, err.Error())
Expand Down Expand Up @@ -301,9 +268,6 @@ func (r *ReconcileCnsUnregisterVolume) Reconcile(ctx context.Context,
}
log.Infof("Updated ReclaimPolicy on PV %q to %q", pvName, v1.PersistentVolumeReclaimRetain)
}
} else {
log.Infof("CNS metadata for volume %s has missing pvName."+
"PV may have already been deleted. Continuing with other operations..", instance.Spec.VolumeID)
}

// Delete PVC.
Expand All @@ -322,9 +286,6 @@ func (r *ReconcileCnsUnregisterVolume) Reconcile(ctx context.Context,
} else {
log.Infof("Deleted PVC %q in namespace %q", pvcName, pvcNamespace)
}
} else {
log.Infof("CNS metadata for volume %s has missing pvcName or namespace."+
"PVC may have already been deleted. Continuing with other operations..", instance.Spec.VolumeID)
}

if pvName != "" {
Expand Down Expand Up @@ -377,7 +338,7 @@ func (r *ReconcileCnsUnregisterVolume) Reconcile(ctx context.Context,

// validateVolumeNotInUse validates whether the volume to be unregistered is not in use by
// either PodVM, TKG cluster or Volume service VM.
func validateVolumeNotInUse(ctx context.Context, cnsVol cnstypes.CnsVolume, pvcName string,
func validateVolumeNotInUse(ctx context.Context, volumdId string, pvcName string,
pvcNamespace string, k8sClient clientset.Interface) error {

log := logger.GetLogger(ctx)
Expand All @@ -394,10 +355,10 @@ func validateVolumeNotInUse(ctx context.Context, cnsVol cnstypes.CnsVolume, pvcN
for _, podVol := range pod.Spec.Volumes {
if podVol.PersistentVolumeClaim != nil &&
podVol.PersistentVolumeClaim.ClaimName == pvcName {
log.Debugf("Volume %s is in use by pod %s in namespace %s", cnsVol.VolumeId.Id,
log.Debugf("Volume %s is in use by pod %s in namespace %s", volumdId,
pod.Name, pvcNamespace)
return fmt.Errorf("cannot unregister the volume %s as it's in use by pod %s in namespace %s",
cnsVol.VolumeId.Id, pod.Name, pvcNamespace)
volumdId, pod.Name, pvcNamespace)
}
}
}
Expand All @@ -406,14 +367,6 @@ func validateVolumeNotInUse(ctx context.Context, cnsVol cnstypes.CnsVolume, pvcN
// For volumes created from TKGs Cluster, CNS metadata will have two entries for containerClusterArray.
// One for clusterFlavor: "WORKLOAD" & clusterDistribution "SupervisorCluster",
// another for clusterFlavor: "GUEST_CLUSTER" & clusterDistribution: "TKGService".
for _, containerCluster := range cnsVol.Metadata.ContainerClusterArray {
if containerCluster.ClusterFlavor == "GUEST_CLUSTER" {
log.Debugf("Volume %s is in use by guest cluster with CNS clusterId %s", cnsVol.VolumeId.Id,
containerCluster.ClusterId)
return fmt.Errorf("cannot unregister the volume %s as it's in use by guest cluster with CNS clusterId %s",
cnsVol.VolumeId.Id, containerCluster.ClusterId)
}
}

// Check if the Supervisor volume is not used by a volume service VM.
// If the volume is specified in the VirtualMachine's spec, then it intends
Expand Down Expand Up @@ -444,10 +397,10 @@ func validateVolumeNotInUse(ctx context.Context, cnsVol cnstypes.CnsVolume, pvcN
for _, vmVol := range vmInstance.Spec.Volumes {
if vmVol.PersistentVolumeClaim != nil &&
vmVol.PersistentVolumeClaim.ClaimName == pvcName {
log.Debugf("Volume %s is in use by VirtualMachine %s in namespace %s", cnsVol.VolumeId.Id,
log.Debugf("Volume %s is in use by VirtualMachine %s in namespace %s", volumdId,
vmInstance.Name, pvcNamespace)
return fmt.Errorf("cannot unregister the volume %s as it's in use by VirtualMachine %s in namespace %s",
cnsVol.VolumeId.Id, vmInstance.Name, pvcNamespace)
volumdId, vmInstance.Name, pvcNamespace)
}
}
}
Expand Down