@@ -22,6 +22,7 @@ import (
22
22
"context"
23
23
"errors"
24
24
"fmt"
25
+ "io"
25
26
"strings"
26
27
"testing"
27
28
"time"
@@ -179,80 +180,113 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
179
180
// Tests the case where channel idleness is enabled by passing a small value for
180
181
// idle_timeout. Verifies that a READY channel with an ongoing RPC stays READY.
181
182
func (s ) TestChannelIdleness_Enabled_OngoingCall (t * testing.T ) {
182
- // Create a ClientConn with a short idle_timeout.
183
- r := manual .NewBuilderWithScheme ("whatever" )
184
- dopts := []grpc.DialOption {
185
- grpc .WithTransportCredentials (insecure .NewCredentials ()),
186
- grpc .WithResolvers (r ),
187
- grpc .WithIdleTimeout (defaultTestShortIdleTimeout ),
188
- grpc .WithDefaultServiceConfig (`{"loadBalancingConfig": [{"round_robin":{}}]}` ),
189
- }
190
- cc , err := grpc .Dial (r .Scheme ()+ ":///test.server" , dopts ... )
191
- if err != nil {
192
- t .Fatalf ("grpc.Dial() failed: %v" , err )
193
- }
194
- t .Cleanup (func () { cc .Close () })
195
-
196
- // Start a test backend which keeps a unary RPC call active by blocking on a
197
- // channel that is closed by the test later on. Also push an address update
198
- // via the resolver.
199
- blockCh := make (chan struct {})
200
- backend := & stubserver.StubServer {
201
- EmptyCallF : func (ctx context.Context , in * testpb.Empty ) (* testpb.Empty , error ) {
202
- <- blockCh
203
- return & testpb.Empty {}, nil
183
+ tests := []struct {
184
+ name string
185
+ makeRPC func (ctx context.Context , client testgrpc.TestServiceClient ) error
186
+ }{
187
+ {
188
+ name : "unary" ,
189
+ makeRPC : func (ctx context.Context , client testgrpc.TestServiceClient ) error {
190
+ if _ , err := client .EmptyCall (ctx , & testpb.Empty {}); err != nil {
191
+ return fmt .Errorf ("EmptyCall RPC failed: %v" , err )
192
+ }
193
+ return nil
194
+ },
195
+ },
196
+ {
197
+ name : "streaming" ,
198
+ makeRPC : func (ctx context.Context , client testgrpc.TestServiceClient ) error {
199
+ stream , err := client .FullDuplexCall (ctx )
200
+ if err != nil {
201
+ t .Fatalf ("FullDuplexCall RPC failed: %v" , err )
202
+ }
203
+ if _ , err := stream .Recv (); err != nil && err != io .EOF {
204
+ t .Fatalf ("stream.Recv() failed: %v" , err )
205
+ }
206
+ return nil
207
+ },
204
208
},
205
209
}
206
- if err := backend .StartServer (); err != nil {
207
- t .Fatalf ("Failed to start backend: %v" , err )
208
- }
209
- t .Cleanup (backend .Stop )
210
- r .UpdateState (resolver.State {Addresses : []resolver.Address {{Addr : backend .Address }}})
211
-
212
- // Verify that the ClientConn moves to READY.
213
- ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
214
- defer cancel ()
215
- testutils .AwaitState (ctx , t , cc , connectivity .Ready )
216
210
217
- // Spawn a goroutine which checks expected state transitions and idleness
218
- // channelz trace events. It eventually closes `blockCh`, thereby unblocking
219
- // the server RPC handler and the unary call below.
220
- errCh := make (chan error , 1 )
221
- go func () {
222
- defer close (blockCh )
223
- // Verify that the ClientConn stays in READY.
224
- sCtx , sCancel := context .WithTimeout (ctx , 3 * defaultTestShortIdleTimeout )
225
- defer sCancel ()
226
- testutils .AwaitNoStateChange (sCtx , t , cc , connectivity .Ready )
227
-
228
- // Verify that there are no idleness related channelz events.
229
- if err := channelzTraceEventNotFound (ctx , "entering idle mode" ); err != nil {
230
- errCh <- err
231
- return
232
- }
233
- if err := channelzTraceEventNotFound (ctx , "exiting idle mode" ); err != nil {
234
- errCh <- err
235
- return
236
- }
211
+ for _ , test := range tests {
212
+ t .Run (test .name , func (t * testing.T ) {
213
+ // Create a ClientConn with a short idle_timeout.
214
+ r := manual .NewBuilderWithScheme ("whatever" )
215
+ dopts := []grpc.DialOption {
216
+ grpc .WithTransportCredentials (insecure .NewCredentials ()),
217
+ grpc .WithResolvers (r ),
218
+ grpc .WithIdleTimeout (defaultTestShortIdleTimeout ),
219
+ grpc .WithDefaultServiceConfig (`{"loadBalancingConfig": [{"round_robin":{}}]}` ),
220
+ }
221
+ cc , err := grpc .Dial (r .Scheme ()+ ":///test.server" , dopts ... )
222
+ if err != nil {
223
+ t .Fatalf ("grpc.Dial() failed: %v" , err )
224
+ }
225
+ t .Cleanup (func () { cc .Close () })
226
+
227
+ // Start a test backend which keeps a unary RPC call active by blocking on a
228
+ // channel that is closed by the test later on. Also push an address update
229
+ // via the resolver.
230
+ blockCh := make (chan struct {})
231
+ backend := & stubserver.StubServer {
232
+ EmptyCallF : func (ctx context.Context , in * testpb.Empty ) (* testpb.Empty , error ) {
233
+ <- blockCh
234
+ return & testpb.Empty {}, nil
235
+ },
236
+ FullDuplexCallF : func (stream testgrpc.TestService_FullDuplexCallServer ) error {
237
+ <- blockCh
238
+ return nil
239
+ },
240
+ }
241
+ if err := backend .StartServer (); err != nil {
242
+ t .Fatalf ("Failed to start backend: %v" , err )
243
+ }
244
+ t .Cleanup (backend .Stop )
245
+ r .UpdateState (resolver.State {Addresses : []resolver.Address {{Addr : backend .Address }}})
246
+
247
+ // Verify that the ClientConn moves to READY.
248
+ ctx , cancel := context .WithTimeout (context .Background (), defaultTestTimeout )
249
+ defer cancel ()
250
+ testutils .AwaitState (ctx , t , cc , connectivity .Ready )
251
+
252
+ // Spawn a goroutine which checks expected state transitions and idleness
253
+ // channelz trace events.
254
+ errCh := make (chan error , 1 )
255
+ go func () {
256
+ defer close (blockCh )
257
+
258
+ // Verify that the ClientConn stays in READY.
259
+ sCtx , sCancel := context .WithTimeout (ctx , 3 * defaultTestShortIdleTimeout )
260
+ defer sCancel ()
261
+ if cc .WaitForStateChange (sCtx , connectivity .Ready ) {
262
+ errCh <- fmt .Errorf ("state changed from %q to %q when no state change was expected" , connectivity .Ready , cc .GetState ())
263
+ return
264
+ }
237
265
238
- // Unblock the unary RPC on the server.
239
- errCh <- nil
240
- }()
266
+ // Verify that there are no idleness related channelz events.
267
+ //
268
+ // TODO: Improve the checks here. If these log strings are
269
+ // changed in the code, these checks will continue to pass.
270
+ if err := channelzTraceEventNotFound (ctx , "entering idle mode" ); err != nil {
271
+ errCh <- err
272
+ return
273
+ }
274
+ errCh <- channelzTraceEventNotFound (ctx , "exiting idle mode" )
275
+ }()
241
276
242
- // Make a unary RPC that blocks on the server, thereby ensuring that the
243
- // count of active RPCs on the client is non-zero.
244
- client := testgrpc .NewTestServiceClient (cc )
245
- if _ , err := client .EmptyCall (ctx , & testpb.Empty {}); err != nil {
246
- t .Errorf ("EmptyCall RPC failed: %v" , err )
247
- }
277
+ if err := test .makeRPC (ctx , testgrpc .NewTestServiceClient (cc )); err != nil {
278
+ t .Fatalf ("%s rpc failed: %v" , test .name , err )
279
+ }
248
280
249
- select {
250
- case err := <- errCh :
251
- if err != nil {
252
- t .Fatal (err )
253
- }
254
- case <- ctx .Done ():
255
- t .Fatalf ("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE" )
281
+ select {
282
+ case err := <- errCh :
283
+ if err != nil {
284
+ t .Fatal (err )
285
+ }
286
+ case <- ctx .Done ():
287
+ t .Fatalf ("Timeout when trying to verify that an active RPC keeps channel from moving to IDLE" )
288
+ }
289
+ })
256
290
}
257
291
}
258
292
0 commit comments