Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #12272 - Potential deadlock with Vaadin. #12506

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ public void onClose(Throwable cause)
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Close {} ", this);
super.onClose(cause);

LifeCycle.stop(strategy);
}

Expand Down Expand Up @@ -375,7 +374,8 @@ else if (filled == 0)
{
shutdown = true;
session.onShutdown();
return null;
// The onShutDown() call above may have produced a task.
return pollTask();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,13 +615,17 @@ private void onFailure(FailureFrame frame, Callback callback)
failure = frame.getFailure();
flowControlLength = drain();
}
close();
boolean removed = session.removeStream(this);
session.dataConsumed(this, flowControlLength);
if (removed)
notifyFailure(this, frame, callback);
else
callback.succeeded();
close();

notifyFailure(this, frame, new Nested(callback)
{
@Override
public void completed()
{
session.removeStream(HTTP2Stream.this);
}
});
}

private int drain()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -202,12 +203,9 @@ public void onStreamFailure(Stream stream, Throwable failure, Callback callback)
if (channel != null)
{
Runnable task = channel.onFailure(failure, callback);
if (task != null)
{
// We must dispatch to another thread because the task
// may call application code that performs blocking I/O.
offerTask(task, true);
}
// The task may unblock a blocked read or write, so it cannot be
// queued, because there may be no threads available to run it.
ThreadPool.executeImmediately(getExecutor(), task);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -591,12 +591,32 @@ public Runnable onFailure(Throwable failure, Callback callback)
{
boolean remote = failure instanceof EOFException;
Runnable runnable = remote ? _httpChannel.onRemoteFailure(new EofException(failure)) : _httpChannel.onFailure(failure);
return () ->

class FailureTask implements Runnable
{
if (runnable != null)
runnable.run();
callback.succeeded();
};
@Override
public void run()
{
try
{
if (runnable != null)
runnable.run();
callback.succeeded();
}
catch (Throwable x)
{
callback.failed(x);
}
}

@Override
public String toString()
{
return "%s[%s]".formatted(getClass().getSimpleName(), runnable);
}
}

return new FailureTask();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ else if (filled == -1)
}
catch (IOException e)
{
LOG.debug("Unable to shutdown output", e);
if (LOG.isDebugEnabled())
LOG.debug("Unable to shutdown input", e);
shutdownInput();
filled = -1;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -33,23 +35,29 @@
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.CloseState;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.client.HTTP2Client;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
Expand All @@ -64,15 +72,20 @@ public class Http2AsyncIOServletTest

private void start(HttpServlet httpServlet) throws Exception
{
server = new Server();
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerConnector(server, 1, 1, new HTTP2CServerConnectionFactory(httpConfig));
server.addConnector(connector);
ServletContextHandler servletContextHandler = new ServletContextHandler("/");
servletContextHandler.addServlet(new ServletHolder(httpServlet), "/*");
server.setHandler(servletContextHandler);
server.start();

QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HTTP2Client();
client.setExecutor(clientThreads);
client.start();
}

Expand Down Expand Up @@ -218,4 +231,206 @@ public void onStartAsync(AsyncEvent event)

assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testSessionCloseWithPendingRequestThenReset(boolean useReaderWriter) throws Exception
{
// Disable output aggregation for Servlets, so each byte is echoed back.
httpConfig.setOutputAggregationSize(0);
CountDownLatch serverFailureLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
if (useReaderWriter)
request.getReader().transferTo(response.getWriter());
else
request.getInputStream().transferTo(response.getOutputStream());
}
catch (Throwable x)
{
serverFailureLatch.countDown();
throw x;
}
}
});

HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, TimeUnit.SECONDS);
Queue<Stream.Data> dataList = new ConcurrentLinkedQueue<>();
MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
while (true)
{
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand();
return;
}
dataList.offer(data);
if (data.frame().isEndStream())
return;
}
}
}).get(5, TimeUnit.SECONDS);

stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false))
.get(5, TimeUnit.SECONDS);
stream.demand();

await().atMost(5, TimeUnit.SECONDS).until(() -> !dataList.isEmpty());

// Initiates graceful close, waits for the streams to finish as per specification.
session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.NOOP);

// Finish the pending stream, either by resetting or sending the last frame.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code));

// The server should see the effects of the reset.
assertTrue(serverFailureLatch.await(5, TimeUnit.SECONDS));
// The session must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED);
// The endPoint must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen());

// Cleanup.
dataList.forEach(Stream.Data::release);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup could be done right after awaiting !dataList.isEmpty()

}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testSessionCloseWithPendingRequestServerIdleTimeout(boolean useReaderWriter) throws Exception
{
// Disable output aggregation for Servlets, so each byte is echoed back.
httpConfig.setOutputAggregationSize(0);
CountDownLatch serverFailureLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
if (useReaderWriter)
request.getReader().transferTo(response.getWriter());
else
request.getInputStream().transferTo(response.getOutputStream());
}
catch (Throwable x)
{
serverFailureLatch.countDown();
throw x;
}
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);

HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, TimeUnit.SECONDS);
Queue<Stream.Data> dataList = new ConcurrentLinkedQueue<>();
MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener()
{
@Override
public void onDataAvailable(Stream stream)
{
while (true)
{
Stream.Data data = stream.readData();
if (data == null)
{
stream.demand();
return;
}
dataList.offer(data);
if (data.frame().isEndStream())
return;
}
}
}).get(5, TimeUnit.SECONDS);

stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false))
.get(5, TimeUnit.SECONDS);
stream.demand();

await().atMost(5, TimeUnit.SECONDS).until(() -> !dataList.isEmpty());

// Initiates graceful close, waits for the streams to finish as per specification.
session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.NOOP);

// Do not finish the streams, the server must idle timeout.
assertTrue(serverFailureLatch.await(2 * idleTimeout, TimeUnit.SECONDS));
// The session must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED);
// The endPoint must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen());

// Cleanup.
dataList.forEach(Stream.Data::release);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testSessionCloseWithPendingRequestThenClientDisconnectThenServerIdleTimeout(boolean useReaderWriter) throws Exception
{
AtomicReference<Thread> serverThreadRef = new AtomicReference<>();
CountDownLatch serverFailureLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException
{
try
{
serverThreadRef.set(Thread.currentThread());
if (useReaderWriter)
request.getReader().transferTo(response.getWriter());
else
request.getInputStream().transferTo(response.getOutputStream());
}
catch (Throwable x)
{
serverFailureLatch.countDown();
throw x;
}
}
});
long idleTimeout = 1000;
connector.setIdleTimeout(idleTimeout);

HTTP2Session session = (HTTP2Session)client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, TimeUnit.SECONDS);
MetaData.Request request = new MetaData.Request("POST", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
Stream stream = session.newStream(new HeadersFrame(request, null, false), new Stream.Listener() {}).get(5, TimeUnit.SECONDS);

stream.data(new DataFrame(stream.getId(), UTF_8.encode("Hello Jetty"), false))
.get(5, TimeUnit.SECONDS);
stream.demand();

await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread serverThread = serverThreadRef.get();
return serverThread != null && serverThread.getState() == Thread.State.WAITING;
});

// Initiates graceful close, then immediately disconnect.
session.close(ErrorCode.NO_ERROR.code, "client_close", Callback.from(session::disconnect));

// Do not finish the streams, the server must idle timeout.
assertTrue(serverFailureLatch.await(2 * idleTimeout, TimeUnit.SECONDS));
// The session must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> session.getCloseState() == CloseState.CLOSED);
// The endPoint must eventually be closed.
await().atMost(5, TimeUnit.SECONDS).until(() -> !session.getEndPoint().isOpen());
}
}
Loading