Skip to content

Commit 721405e

Browse files
authored
Merge pull request #360 from jacobwolfaws/master
Change node-level idempotency to volume-target
2 parents b02afc9 + 265c7a9 commit 721405e

File tree

2 files changed

+104
-19
lines changed

2 files changed

+104
-19
lines changed

pkg/driver/node.go

+13-10
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ var (
3737
nodeCaps = []csi.NodeServiceCapability_RPC_Type{}
3838
)
3939

40-
// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID
41-
const VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress"
40+
// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given rpcKey
41+
const VolumeOperationAlreadyExists = "An operation with the given volume=%q and target=%q is already in progress"
4242

4343
type nodeService struct {
4444
metadata cloud.MetadataService
@@ -120,12 +120,14 @@ func (d *nodeService) NodePublishVolume(ctx context.Context, req *csi.NodePublis
120120
return nil, status.Error(codes.InvalidArgument, "Volume capability not supported")
121121
}
122122

123-
if ok := d.inFlight.Insert(volumeID); !ok {
124-
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
123+
rpcKey := fmt.Sprintf("%s-%s", volumeID, target)
124+
125+
if ok := d.inFlight.Insert(rpcKey); !ok {
126+
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID, target)
125127
}
126128
defer func() {
127-
klog.V(4).InfoS("NodePublishVolume: volume operation finished", "volumeId", volumeID)
128-
d.inFlight.Delete(volumeID)
129+
klog.V(4).InfoS("NodePublishVolume: volume operation finished", "rpcKey", rpcKey)
130+
d.inFlight.Delete(rpcKey)
129131
}()
130132

131133
mountOptions := []string{}
@@ -182,12 +184,13 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
182184
return nil, status.Error(codes.InvalidArgument, "Target path not provided")
183185
}
184186

185-
if ok := d.inFlight.Insert(volumeID); !ok {
186-
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
187+
rpcKey := fmt.Sprintf("%s-%s", volumeID, target)
188+
if ok := d.inFlight.Insert(rpcKey); !ok {
189+
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID, target)
187190
}
188191
defer func() {
189-
klog.V(4).InfoS("NodeUnpublishVolume: volume operation finished", "volumeId", volumeID)
190-
d.inFlight.Delete(volumeID)
192+
klog.V(4).InfoS("NodeUnpublishVolume: volume operation finished", "rpcKey", rpcKey)
193+
d.inFlight.Delete(rpcKey)
191194
}()
192195

193196
// Check if the target is mounted before unmounting

pkg/driver/node_test.go

+91-9
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,11 @@ var (
4040
func TestNodePublishVolume(t *testing.T) {
4141

4242
var (
43-
dnsname = "fs-0a2d0632b5ff567e9.fsx.us-west-2.amazonaws.com"
44-
mountname = "random"
45-
targetPath = "/target/path"
46-
stdVolCap = &csi.VolumeCapability{
43+
dnsname = "fs-0a2d0632b5ff567e9.fsx.us-west-2.amazonaws.com"
44+
mountname = "random"
45+
targetPath = "/target/path"
46+
targetPathAlt = "/target/alt_path"
47+
stdVolCap = &csi.VolumeCapability{
4748
AccessType: &csi.VolumeCapability_Mount{
4849
Mount: &csi.VolumeCapability_MountVolume{},
4950
},
@@ -438,7 +439,7 @@ func TestNodePublishVolume(t *testing.T) {
438439
},
439440
},
440441
{
441-
name: "fail another operation in-flight on given volumeId",
442+
name: "fail another operation in-flight on given volumeId-targetPath",
442443
testFunc: func(t *testing.T) {
443444
mockCtl := gomock.NewController(t)
444445
defer mockCtl.Finish()
@@ -462,11 +463,56 @@ func TestNodePublishVolume(t *testing.T) {
462463
TargetPath: targetPath,
463464
}
464465

465-
awsDriver.inFlight.Insert(volumeID)
466+
rpcKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
467+
468+
awsDriver.inFlight.Insert(rpcKey)
466469
_, err := awsDriver.NodePublishVolume(context.TODO(), req)
467470
expectErr(t, err, codes.Aborted)
468471
},
469472
},
473+
{
474+
name: "success: operation in-flight with different volumeId-targetPath",
475+
testFunc: func(t *testing.T) {
476+
mockCtl := gomock.NewController(t)
477+
defer mockCtl.Finish()
478+
479+
mockMetadata := cloudMock.NewMockMetadataService(mockCtl)
480+
mockMounter := driverMocks.NewMockMounter(mockCtl)
481+
482+
awsDriver := &nodeService{
483+
metadata: mockMetadata,
484+
mounter: mockMounter,
485+
inFlight: internal.NewInFlight(),
486+
}
487+
488+
source := dnsname + "@tcp:/" + mountname
489+
490+
ctx := context.Background()
491+
req := &csi.NodePublishVolumeRequest{
492+
VolumeId: "volumeId",
493+
VolumeContext: map[string]string{
494+
volumeContextDnsName: dnsname,
495+
volumeContextMountName: mountname,
496+
},
497+
VolumeCapability: stdVolCap,
498+
TargetPath: targetPath,
499+
}
500+
501+
rpcKeyAlt := fmt.Sprintf("%s-%s", volumeID, targetPathAlt)
502+
503+
awsDriver.inFlight.Insert(rpcKeyAlt)
504+
505+
mockMounter.EXPECT().MakeDir(gomock.Eq(targetPath)).Return(nil)
506+
mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(true, nil)
507+
mockMounter.EXPECT().Mount(gomock.Eq(source), gomock.Eq(targetPath), gomock.Eq("lustre"), gomock.Any()).Return(nil)
508+
_, err := awsDriver.NodePublishVolume(ctx, req)
509+
if err != nil {
510+
t.Fatalf("NodePublishVolume is failed: %v", err)
511+
}
512+
513+
mockCtl.Finish()
514+
},
515+
},
470516
}
471517

472518
for _, tc := range testCases {
@@ -477,7 +523,8 @@ func TestNodePublishVolume(t *testing.T) {
477523
func TestNodeUnpublishVolume(t *testing.T) {
478524

479525
var (
480-
targetPath = "/target/path"
526+
targetPath = "/target/path"
527+
targetPathAlt = "/target/alt_path"
481528
)
482529

483530
testCases := []struct {
@@ -601,7 +648,7 @@ func TestNodeUnpublishVolume(t *testing.T) {
601648
},
602649
},
603650
{
604-
name: "fail another operation in-flight on given volumeId",
651+
name: "fail another operation in-flight on given volumeId-targetPath",
605652
testFunc: func(t *testing.T) {
606653
mockCtl := gomock.NewController(t)
607654
defer mockCtl.Finish()
@@ -620,11 +667,46 @@ func TestNodeUnpublishVolume(t *testing.T) {
620667
TargetPath: targetPath,
621668
}
622669

623-
awsDriver.inFlight.Insert(volumeID)
670+
rpcKey := fmt.Sprintf("%s-%s", volumeID, targetPath)
671+
672+
awsDriver.inFlight.Insert(rpcKey)
624673
_, err := awsDriver.NodeUnpublishVolume(context.TODO(), req)
625674
expectErr(t, err, codes.Aborted)
626675
},
627676
},
677+
{
678+
name: "success: operation in-flight with different volumeId-targetPath",
679+
testFunc: func(t *testing.T) {
680+
mockCtl := gomock.NewController(t)
681+
defer mockCtl.Finish()
682+
683+
mockMetadata := cloudMock.NewMockMetadataService(mockCtl)
684+
mockMounter := driverMocks.NewMockMounter(mockCtl)
685+
686+
awsDriver := &nodeService{
687+
metadata: mockMetadata,
688+
mounter: mockMounter,
689+
inFlight: internal.NewInFlight(),
690+
}
691+
692+
ctx := context.Background()
693+
req := &csi.NodeUnpublishVolumeRequest{
694+
VolumeId: "volumeId",
695+
TargetPath: targetPath,
696+
}
697+
698+
rpcKeyAlt := fmt.Sprintf("%s-%s", volumeID, targetPathAlt)
699+
awsDriver.inFlight.Insert(rpcKeyAlt)
700+
701+
mockMounter.EXPECT().IsLikelyNotMountPoint(gomock.Eq(targetPath)).Return(false, nil)
702+
mockMounter.EXPECT().Unmount(gomock.Eq(targetPath)).Return(nil)
703+
704+
_, err := awsDriver.NodeUnpublishVolume(ctx, req)
705+
if err != nil {
706+
t.Fatalf("NodeUnpublishVolume is failed: %v", err)
707+
}
708+
},
709+
},
628710
}
629711
for _, tc := range testCases {
630712
t.Run(tc.name, tc.testFunc)

0 commit comments

Comments
 (0)