Skip to content

Commit 4679d16

Browse files
committed
Merged branch 'jetty-12.0.x' into 'jetty-12.1.x'.
Signed-off-by: Simone Bordet <[email protected]>
2 parents 4f815f6 + 5f121a7 commit 4679d16

File tree

7 files changed

+475
-22
lines changed

7 files changed

+475
-22
lines changed

jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ public void onClose(Throwable cause)
145145
if (LOG.isDebugEnabled())
146146
LOG.debug("HTTP2 Close {} ", this);
147147
super.onClose(cause);
148-
149148
LifeCycle.stop(strategy);
150149
}
151150

@@ -375,7 +374,8 @@ else if (filled == 0)
375374
{
376375
shutdown = true;
377376
session.onShutdown();
378-
return null;
377+
// The onShutDown() call above may have produced a task.
378+
return pollTask();
379379
}
380380
}
381381
}

jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -615,13 +615,17 @@ private void onFailure(FailureFrame frame, Callback callback)
615615
failure = frame.getFailure();
616616
flowControlLength = drain();
617617
}
618-
close();
619-
boolean removed = session.removeStream(this);
620618
session.dataConsumed(this, flowControlLength);
621-
if (removed)
622-
notifyFailure(this, frame, callback);
623-
else
624-
callback.succeeded();
619+
close();
620+
621+
notifyFailure(this, frame, new Nested(callback)
622+
{
623+
@Override
624+
public void completed()
625+
{
626+
session.removeStream(HTTP2Stream.this);
627+
}
628+
});
625629
}
626630

627631
private int drain()

jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java

+4-6
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.eclipse.jetty.util.Callback;
5555
import org.eclipse.jetty.util.Promise;
5656
import org.eclipse.jetty.util.StringUtil;
57+
import org.eclipse.jetty.util.thread.ThreadPool;
5758
import org.slf4j.Logger;
5859
import org.slf4j.LoggerFactory;
5960

@@ -202,12 +203,9 @@ public void onStreamFailure(Stream stream, Throwable failure, Callback callback)
202203
if (channel != null)
203204
{
204205
Runnable task = channel.onFailure(failure, callback);
205-
if (task != null)
206-
{
207-
// We must dispatch to another thread because the task
208-
// may call application code that performs blocking I/O.
209-
offerTask(task, true);
210-
}
206+
// The task may unblock a blocked read or write, so it cannot be
207+
// queued, because there may be no threads available to run it.
208+
ThreadPool.executeImmediately(getExecutor(), task);
211209
}
212210
else
213211
{

jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HttpStreamOverHTTP2.java

+25-5
Original file line numberDiff line numberDiff line change
@@ -591,12 +591,32 @@ public Runnable onFailure(Throwable failure, Callback callback)
591591
{
592592
boolean remote = failure instanceof EOFException;
593593
Runnable runnable = remote ? _httpChannel.onRemoteFailure(new EofException(failure)) : _httpChannel.onFailure(failure);
594-
return () ->
594+
595+
class FailureTask implements Runnable
595596
{
596-
if (runnable != null)
597-
runnable.run();
598-
callback.succeeded();
599-
};
597+
@Override
598+
public void run()
599+
{
600+
try
601+
{
602+
if (runnable != null)
603+
runnable.run();
604+
callback.succeeded();
605+
}
606+
catch (Throwable x)
607+
{
608+
callback.failed(x);
609+
}
610+
}
611+
612+
@Override
613+
public String toString()
614+
{
615+
return "%s[%s]".formatted(getClass().getSimpleName(), runnable);
616+
}
617+
}
618+
619+
return new FailureTask();
600620
}
601621

602622
@Override

jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,8 @@ else if (filled == -1)
8989
}
9090
catch (IOException e)
9191
{
92-
LOG.debug("Unable to shutdown output", e);
92+
if (LOG.isDebugEnabled())
93+
LOG.debug("Unable to shutdown input", e);
9394
shutdownInput();
9495
filled = -1;
9596
}

jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http2AsyncIOServletTest.java

+216-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
import java.io.IOException;
1717
import java.net.InetSocketAddress;
18+
import java.util.Queue;
19+
import java.util.concurrent.ConcurrentLinkedQueue;
1820
import java.util.concurrent.CountDownLatch;
1921
import java.util.concurrent.TimeUnit;
2022
import java.util.concurrent.atomic.AtomicReference;
@@ -33,23 +35,29 @@
3335
import org.eclipse.jetty.http.HttpURI;
3436
import org.eclipse.jetty.http.HttpVersion;
3537
import org.eclipse.jetty.http.MetaData;
38+
import org.eclipse.jetty.http2.CloseState;
3639
import org.eclipse.jetty.http2.ErrorCode;
40+
import org.eclipse.jetty.http2.HTTP2Session;
3741
import org.eclipse.jetty.http2.api.Session;
3842
import org.eclipse.jetty.http2.api.Stream;
3943
import org.eclipse.jetty.http2.client.HTTP2Client;
44+
import org.eclipse.jetty.http2.frames.DataFrame;
4045
import org.eclipse.jetty.http2.frames.HeadersFrame;
4146
import org.eclipse.jetty.http2.frames.ResetFrame;
4247
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
4348
import org.eclipse.jetty.io.EofException;
4449
import org.eclipse.jetty.server.HttpConfiguration;
4550
import org.eclipse.jetty.server.Server;
4651
import org.eclipse.jetty.server.ServerConnector;
52+
import org.eclipse.jetty.util.Callback;
4753
import org.eclipse.jetty.util.FuturePromise;
4854
import org.eclipse.jetty.util.component.LifeCycle;
55+
import org.eclipse.jetty.util.thread.QueuedThreadPool;
4956
import org.junit.jupiter.api.AfterEach;
5057
import org.junit.jupiter.params.ParameterizedTest;
5158
import org.junit.jupiter.params.provider.ValueSource;
5259

60+
import static java.nio.charset.StandardCharsets.UTF_8;
5361
import static org.awaitility.Awaitility.await;
5462
import static org.hamcrest.Matchers.instanceOf;
5563
import static org.hamcrest.Matchers.nullValue;
@@ -64,15 +72,20 @@ public class Http2AsyncIOServletTest
6472

6573
private void start(HttpServlet httpServlet) throws Exception
6674
{
67-
server = new Server();
75+
QueuedThreadPool serverThreads = new QueuedThreadPool();
76+
serverThreads.setName("server");
77+
server = new Server(serverThreads);
6878
connector = new ServerConnector(server, 1, 1, new HTTP2CServerConnectionFactory(httpConfig));
6979
server.addConnector(connector);
7080
ServletContextHandler servletContextHandler = new ServletContextHandler("/");
7181
servletContextHandler.addServlet(new ServletHolder(httpServlet), "/*");
7282
server.setHandler(servletContextHandler);
7383
server.start();
7484

85+
QueuedThreadPool clientThreads = new QueuedThreadPool();
86+
clientThreads.setName("client");
7587
client = new HTTP2Client();
88+
client.setExecutor(clientThreads);
7689
client.start();
7790
}
7891

@@ -218,4 +231,206 @@ public void onStartAsync(AsyncEvent event)
218231

219232
assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
220233
}
234+
235+
@ParameterizedTest
236+
@ValueSource(booleans = {false, true})
237+
public void testSessionCloseWithPendingRequestThenReset(boolean useReaderWriter) throws Exception
238+
{
239+
// Disable output aggregation for Servlets, so each byte is echoed back.
240+
httpConfig.setOutputAggregationSize(0);
241+
CountDownLatch serverFailureLatch = new CountDownLatch(1);
242+
start(new HttpServlet()
243+
{
244+
@Override
245+
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
246+
{
247+
try
248+
{
249+
if (useReaderWriter)
250+
request.getReader().transferTo(response.getWriter());
251+
else
252+
request.getInputStream().transferTo(response.getOutputStream());
253+
}
254+
catch (Throwable x)
255+
{
256+
serverFailureLatch.countDown();
257+
throw x;
258+
}
259+
}
260+
});
261+
262+
HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
263+
.get(5, TimeUnit.SECONDS);
264+
Queue<Stream.Data> dataList = new ConcurrentLinkedQueue<>();
265+
MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
266+
Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener()
267+
{
268+
@Override
269+
public void onDataAvailable(Stream stream)
270+
{
271+
while (true)
272+
{
273+
Stream.Data data = stream.readData();
274+
if (data == null)
275+
{
276+
stream.demand();
277+
return;
278+
}
279+
dataList.offer(data);
280+
if (data.frame().isEndStream())
281+
return;
282+
}
283+
}
284+
}).get(5, TimeUnit.SECONDS);
285+
286+
stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false))
287+
.get(5, TimeUnit.SECONDS);
288+
stream.demand();
289+
290+
await().atMost(5, TimeUnit.SECONDS).until(() -> !dataList.isEmpty());
291+
292+
// Initiates graceful close, waits for the streams to finish as per specification.
293+
session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.NOOP);
294+
295+
// Finish the pending stream, either by resetting or sending the last frame.
296+
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code));
297+
298+
// The server should see the effects of the reset.
299+
assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
300+
// The session must eventually be closed.
301+
await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED);
302+
// The endPoint must eventually be closed.
303+
await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen());
304+
305+
// Cleanup.
306+
dataList.forEach(Stream.Data::release);
307+
}
308+
309+
@ParameterizedTest
310+
@ValueSource(booleans = {false, true})
311+
public void testSessionCloseWithPendingRequestServerIdleTimeout(boolean useReaderWriter) throws Exception
312+
{
313+
// Disable output aggregation for Servlets, so each byte is echoed back.
314+
httpConfig.setOutputAggregationSize(0);
315+
CountDownLatch serverFailureLatch = new CountDownLatch(1);
316+
start(new HttpServlet()
317+
{
318+
@Override
319+
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
320+
{
321+
try
322+
{
323+
if (useReaderWriter)
324+
request.getReader().transferTo(response.getWriter());
325+
else
326+
request.getInputStream().transferTo(response.getOutputStream());
327+
}
328+
catch (Throwable x)
329+
{
330+
serverFailureLatch.countDown();
331+
throw x;
332+
}
333+
}
334+
});
335+
long idleTimeout = 1000;
336+
connector.setIdleTimeout(idleTimeout);
337+
338+
HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
339+
.get(5, TimeUnit.SECONDS);
340+
Queue<Stream.Data> dataList = new ConcurrentLinkedQueue<>();
341+
MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
342+
Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener()
343+
{
344+
@Override
345+
public void onDataAvailable(Stream stream)
346+
{
347+
while (true)
348+
{
349+
Stream.Data data = stream.readData();
350+
if (data == null)
351+
{
352+
stream.demand();
353+
return;
354+
}
355+
dataList.offer(data);
356+
if (data.frame().isEndStream())
357+
return;
358+
}
359+
}
360+
}).get(5, TimeUnit.SECONDS);
361+
362+
stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false))
363+
.get(5, TimeUnit.SECONDS);
364+
stream.demand();
365+
366+
await().atMost(5, TimeUnit.SECONDS).until(() -> !dataList.isEmpty());
367+
368+
// Initiates graceful close, waits for the streams to finish as per specification.
369+
session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.NOOP);
370+
371+
// Do not finish the streams, the server must idle timeout.
372+
assertTrue(serverFailureLatch.await(2 * idleTimeout, TimeUnit.SECONDS));
373+
// The session must eventually be closed.
374+
await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED);
375+
// The endPoint must eventually be closed.
376+
await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen());
377+
378+
// Cleanup.
379+
dataList.forEach(Stream.Data::release);
380+
}
381+
382+
@ParameterizedTest
383+
@ValueSource(booleans = {false, true})
384+
public void testSessionCloseWithPendingRequestThenClientDisconnectThenServerIdleTimeout(boolean useReaderWriter) throws Exception
385+
{
386+
AtomicReference<Thread> serverThreadRef = new AtomicReference<>();
387+
CountDownLatch serverFailureLatch = new CountDownLatch(1);
388+
start(new HttpServlet()
389+
{
390+
@Override
391+
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
392+
{
393+
try
394+
{
395+
serverThreadRef.set(Thread.currentThread());
396+
if (useReaderWriter)
397+
request.getReader().transferTo(response.getWriter());
398+
else
399+
request.getInputStream().transferTo(response.getOutputStream());
400+
}
401+
catch (Throwable x)
402+
{
403+
serverFailureLatch.countDown();
404+
throw x;
405+
}
406+
}
407+
});
408+
long idleTimeout = 1000;
409+
connector.setIdleTimeout(idleTimeout);
410+
411+
HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
412+
.get(5, TimeUnit.SECONDS);
413+
MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
414+
Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener() {}).get(5, TimeUnit.SECONDS);
415+
416+
stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false))
417+
.get(5, TimeUnit.SECONDS);
418+
stream.demand();
419+
420+
await().atMost(5, TimeUnit.SECONDS).until(() ->
421+
{
422+
Thread serverThread = serverThreadRef.get();
423+
return serverThread != null && serverThread.getState() == Thread.State.WAITING;
424+
});
425+
426+
// Initiates graceful close, then immediately disconnect.
427+
session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.from(session::disconnect));
428+
429+
// Do not finish the streams, the server must idle timeout.
430+
assertTrue(serverFailureLatch.await(2 * idleTimeout, TimeUnit.SECONDS));
431+
// The session must eventually be closed.
432+
await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED);
433+
// The endPoint must eventually be closed.
434+
await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen());
435+
}
221436
}

0 commit comments

Comments
 (0)