15
15
16
16
import java .io .IOException ;
17
17
import java .net .InetSocketAddress ;
18
+ import java .util .Queue ;
19
+ import java .util .concurrent .ConcurrentLinkedQueue ;
18
20
import java .util .concurrent .CountDownLatch ;
19
21
import java .util .concurrent .TimeUnit ;
20
22
import java .util .concurrent .atomic .AtomicReference ;
33
35
import org .eclipse .jetty .http .HttpURI ;
34
36
import org .eclipse .jetty .http .HttpVersion ;
35
37
import org .eclipse .jetty .http .MetaData ;
38
+ import org .eclipse .jetty .http2 .CloseState ;
36
39
import org .eclipse .jetty .http2 .ErrorCode ;
40
+ import org .eclipse .jetty .http2 .HTTP2Session ;
37
41
import org .eclipse .jetty .http2 .api .Session ;
38
42
import org .eclipse .jetty .http2 .api .Stream ;
39
43
import org .eclipse .jetty .http2 .client .HTTP2Client ;
44
+ import org .eclipse .jetty .http2 .frames .DataFrame ;
40
45
import org .eclipse .jetty .http2 .frames .HeadersFrame ;
41
46
import org .eclipse .jetty .http2 .frames .ResetFrame ;
42
47
import org .eclipse .jetty .http2 .server .HTTP2CServerConnectionFactory ;
43
48
import org .eclipse .jetty .io .EofException ;
44
49
import org .eclipse .jetty .server .HttpConfiguration ;
45
50
import org .eclipse .jetty .server .Server ;
46
51
import org .eclipse .jetty .server .ServerConnector ;
52
+ import org .eclipse .jetty .util .Callback ;
47
53
import org .eclipse .jetty .util .FuturePromise ;
48
54
import org .eclipse .jetty .util .component .LifeCycle ;
55
+ import org .eclipse .jetty .util .thread .QueuedThreadPool ;
49
56
import org .junit .jupiter .api .AfterEach ;
50
57
import org .junit .jupiter .params .ParameterizedTest ;
51
58
import org .junit .jupiter .params .provider .ValueSource ;
52
59
60
+ import static java .nio .charset .StandardCharsets .UTF_8 ;
53
61
import static org .awaitility .Awaitility .await ;
54
62
import static org .hamcrest .Matchers .instanceOf ;
55
63
import static org .hamcrest .Matchers .nullValue ;
@@ -64,15 +72,20 @@ public class Http2AsyncIOServletTest
64
72
65
73
private void start (HttpServlet httpServlet ) throws Exception
66
74
{
67
- server = new Server ();
75
+ QueuedThreadPool serverThreads = new QueuedThreadPool ();
76
+ serverThreads .setName ("server" );
77
+ server = new Server (serverThreads );
68
78
connector = new ServerConnector (server , 1 , 1 , new HTTP2CServerConnectionFactory (httpConfig ));
69
79
server .addConnector (connector );
70
80
ServletContextHandler servletContextHandler = new ServletContextHandler ("/" );
71
81
servletContextHandler .addServlet (new ServletHolder (httpServlet ), "/*" );
72
82
server .setHandler (servletContextHandler );
73
83
server .start ();
74
84
85
+ QueuedThreadPool clientThreads = new QueuedThreadPool ();
86
+ clientThreads .setName ("client" );
75
87
client = new HTTP2Client ();
88
+ client .setExecutor (clientThreads );
76
89
client .start ();
77
90
}
78
91
@@ -218,4 +231,206 @@ public void onStartAsync(AsyncEvent event)
218
231
219
232
assertTrue (errorLatch .await (5 , TimeUnit .SECONDS ));
220
233
}
234
+
235
+ @ ParameterizedTest
236
+ @ ValueSource (booleans = {false , true })
237
+ public void testSessionCloseWithPendingRequestThenReset (boolean useReaderWriter ) throws Exception
238
+ {
239
+ // Disable output aggregation for Servlets, so each byte is echoed back.
240
+ httpConfig .setOutputAggregationSize (0 );
241
+ CountDownLatch serverFailureLatch = new CountDownLatch (1 );
242
+ start (new HttpServlet ()
243
+ {
244
+ @ Override
245
+ protected void service (HttpServletRequest request , HttpServletResponse response ) throws IOException
246
+ {
247
+ try
248
+ {
249
+ if (useReaderWriter )
250
+ request .getReader ().transferTo (response .getWriter ());
251
+ else
252
+ request .getInputStream ().transferTo (response .getOutputStream ());
253
+ }
254
+ catch (Throwable x )
255
+ {
256
+ serverFailureLatch .countDown ();
257
+ throw x ;
258
+ }
259
+ }
260
+ });
261
+
262
+ HTTP2Session session = (HTTP2Session )client .connect (new InetSocketAddress ("localhost" , connector .getLocalPort ()), new Session .Listener () {})
263
+ .get (5 , TimeUnit .SECONDS );
264
+ Queue <Stream .Data > dataList = new ConcurrentLinkedQueue <>();
265
+ MetaData .Request request = new MetaData .Request ("POST" , HttpURI .from ("/" ), HttpVersion .HTTP_2 , HttpFields .EMPTY );
266
+ Stream stream = session .newStream (new HeadersFrame (request , null , false ), new Stream .Listener ()
267
+ {
268
+ @ Override
269
+ public void onDataAvailable (Stream stream )
270
+ {
271
+ while (true )
272
+ {
273
+ Stream .Data data = stream .readData ();
274
+ if (data == null )
275
+ {
276
+ stream .demand ();
277
+ return ;
278
+ }
279
+ dataList .offer (data );
280
+ if (data .frame ().isEndStream ())
281
+ return ;
282
+ }
283
+ }
284
+ }).get (5 , TimeUnit .SECONDS );
285
+
286
+ stream .data (new DataFrame (stream .getId (), UTF_8 .encode ("Hello Jetty" ), false ))
287
+ .get (5 , TimeUnit .SECONDS );
288
+ stream .demand ();
289
+
290
+ await ().atMost (5 , TimeUnit .SECONDS ).until (() -> !dataList .isEmpty ());
291
+
292
+ // Initiates graceful close, waits for the streams to finish as per specification.
293
+ session .close (ErrorCode .NO_ERROR .code , "client_close" , Callback .NOOP );
294
+
295
+ // Finish the pending stream, either by resetting or sending the last frame.
296
+ stream .reset (new ResetFrame (stream .getId (), ErrorCode .CANCEL_STREAM_ERROR .code ));
297
+
298
+ // The server should see the effects of the reset.
299
+ assertTrue (serverFailureLatch .await (5 , TimeUnit .SECONDS ));
300
+ // The session must eventually be closed.
301
+ await ().atMost (5 , TimeUnit .SECONDS ).until (() -> session .getCloseState () == CloseState .CLOSED );
302
+ // The endPoint must eventually be closed.
303
+ await ().atMost (5 , TimeUnit .SECONDS ).until (() -> !session .getEndPoint ().isOpen ());
304
+
305
+ // Cleanup.
306
+ dataList .forEach (Stream .Data ::release );
307
+ }
308
+
309
+ @ ParameterizedTest
310
+ @ ValueSource (booleans = {false , true })
311
+ public void testSessionCloseWithPendingRequestServerIdleTimeout (boolean useReaderWriter ) throws Exception
312
+ {
313
+ // Disable output aggregation for Servlets, so each byte is echoed back.
314
+ httpConfig .setOutputAggregationSize (0 );
315
+ CountDownLatch serverFailureLatch = new CountDownLatch (1 );
316
+ start (new HttpServlet ()
317
+ {
318
+ @ Override
319
+ protected void service (HttpServletRequest request , HttpServletResponse response ) throws IOException
320
+ {
321
+ try
322
+ {
323
+ if (useReaderWriter )
324
+ request .getReader ().transferTo (response .getWriter ());
325
+ else
326
+ request .getInputStream ().transferTo (response .getOutputStream ());
327
+ }
328
+ catch (Throwable x )
329
+ {
330
+ serverFailureLatch .countDown ();
331
+ throw x ;
332
+ }
333
+ }
334
+ });
335
+ long idleTimeout = 1000 ;
336
+ connector .setIdleTimeout (idleTimeout );
337
+
338
+ HTTP2Session session = (HTTP2Session )client .connect (new InetSocketAddress ("localhost" , connector .getLocalPort ()), new Session .Listener () {})
339
+ .get (5 , TimeUnit .SECONDS );
340
+ Queue <Stream .Data > dataList = new ConcurrentLinkedQueue <>();
341
+ MetaData .Request request = new MetaData .Request ("POST" , HttpURI .from ("/" ), HttpVersion .HTTP_2 , HttpFields .EMPTY );
342
+ Stream stream = session .newStream (new HeadersFrame (request , null , false ), new Stream .Listener ()
343
+ {
344
+ @ Override
345
+ public void onDataAvailable (Stream stream )
346
+ {
347
+ while (true )
348
+ {
349
+ Stream .Data data = stream .readData ();
350
+ if (data == null )
351
+ {
352
+ stream .demand ();
353
+ return ;
354
+ }
355
+ dataList .offer (data );
356
+ if (data .frame ().isEndStream ())
357
+ return ;
358
+ }
359
+ }
360
+ }).get (5 , TimeUnit .SECONDS );
361
+
362
+ stream .data (new DataFrame (stream .getId (), UTF_8 .encode ("Hello Jetty" ), false ))
363
+ .get (5 , TimeUnit .SECONDS );
364
+ stream .demand ();
365
+
366
+ await ().atMost (5 , TimeUnit .SECONDS ).until (() -> !dataList .isEmpty ());
367
+
368
+ // Initiates graceful close, waits for the streams to finish as per specification.
369
+ session .close (ErrorCode .NO_ERROR .code , "client_close" , Callback .NOOP );
370
+
371
+ // Do not finish the streams, the server must idle timeout.
372
+ assertTrue (serverFailureLatch .await (2 * idleTimeout , TimeUnit .SECONDS ));
373
+ // The session must eventually be closed.
374
+ await ().atMost (5 , TimeUnit .SECONDS ).until (() -> session .getCloseState () == CloseState .CLOSED );
375
+ // The endPoint must eventually be closed.
376
+ await ().atMost (5 , TimeUnit .SECONDS ).until (() -> !session .getEndPoint ().isOpen ());
377
+
378
+ // Cleanup.
379
+ dataList .forEach (Stream .Data ::release );
380
+ }
381
+
382
+ @ ParameterizedTest
383
+ @ ValueSource (booleans = {false , true })
384
+ public void testSessionCloseWithPendingRequestThenClientDisconnectThenServerIdleTimeout (boolean useReaderWriter ) throws Exception
385
+ {
386
+ AtomicReference <Thread > serverThreadRef = new AtomicReference <>();
387
+ CountDownLatch serverFailureLatch = new CountDownLatch (1 );
388
+ start (new HttpServlet ()
389
+ {
390
+ @ Override
391
+ protected void service (HttpServletRequest request , HttpServletResponse response ) throws IOException
392
+ {
393
+ try
394
+ {
395
+ serverThreadRef .set (Thread .currentThread ());
396
+ if (useReaderWriter )
397
+ request .getReader ().transferTo (response .getWriter ());
398
+ else
399
+ request .getInputStream ().transferTo (response .getOutputStream ());
400
+ }
401
+ catch (Throwable x )
402
+ {
403
+ serverFailureLatch .countDown ();
404
+ throw x ;
405
+ }
406
+ }
407
+ });
408
+ long idleTimeout = 1000 ;
409
+ connector .setIdleTimeout (idleTimeout );
410
+
411
+ HTTP2Session session = (HTTP2Session )client .connect (new InetSocketAddress ("localhost" , connector .getLocalPort ()), new Session .Listener () {})
412
+ .get (5 , TimeUnit .SECONDS );
413
+ MetaData .Request request = new MetaData .Request ("POST" , HttpURI .from ("/" ), HttpVersion .HTTP_2 , HttpFields .EMPTY );
414
+ Stream stream = session .newStream (new HeadersFrame (request , null , false ), new Stream .Listener () {}).get (5 , TimeUnit .SECONDS );
415
+
416
+ stream .data (new DataFrame (stream .getId (), UTF_8 .encode ("Hello Jetty" ), false ))
417
+ .get (5 , TimeUnit .SECONDS );
418
+ stream .demand ();
419
+
420
+ await ().atMost (5 , TimeUnit .SECONDS ).until (() ->
421
+ {
422
+ Thread serverThread = serverThreadRef .get ();
423
+ return serverThread != null && serverThread .getState () == Thread .State .WAITING ;
424
+ });
425
+
426
+ // Initiates graceful close, then immediately disconnect.
427
+ session .close (ErrorCode .NO_ERROR .code , "client_close" , Callback .from (session ::disconnect ));
428
+
429
+ // Do not finish the streams, the server must idle timeout.
430
+ assertTrue (serverFailureLatch .await (2 * idleTimeout , TimeUnit .SECONDS ));
431
+ // The session must eventually be closed.
432
+ await ().atMost (5 , TimeUnit .SECONDS ).until (() -> session .getCloseState () == CloseState .CLOSED );
433
+ // The endPoint must eventually be closed.
434
+ await ().atMost (5 , TimeUnit .SECONDS ).until (() -> !session .getEndPoint ().isOpen ());
435
+ }
221
436
}
0 commit comments