Skip to content

Commit 94c3f9d

Browse files
lorbangregwsbordet
authored
Restore thread starvation tests (jetty#12395)
For jetty#12214 restore ee9 and ee10 thread starvation tests * Deprecated ContentSourceCompletableFuture and added protection to method to match invocation type * more demand invocables with invocation type * Deprecated the CF APIs and replaced with explicit getXxx onXxx methods * replace FutureCallback and FuturePromise usages with Blocker.* instead * Converted new API on FormFields to use Promise * Modified SerializedInvoker to take into account the task InvocationType. * Added ThreadStarvationTest for all transports, to check that also HTTP/2 and HTTP/3 do not starve in case of non-blocking reads. * Improved Core ThreadStarvationTest * Fixed AsyncServletLongPollTest in ee9 and ee10 to match the current behavior in case of close. * Removed Invocable.InvocableCompletableFuture * Fixed SerializedInvoker. * Improved ThreadStarvationTest. * refined more DemandTask implementations * Added InvocableType.runWithoutBlocking (name is WIP) * replace Thread.sleep() with awaitility --------- Signed-off-by: Ludovic Orban <[email protected]> Signed-off-by: Simone Bordet <[email protected]> Co-authored-by: gregw <[email protected]> Co-authored-by: Simone Bordet <[email protected]>
1 parent 17e5820 commit 94c3f9d

File tree

51 files changed

+2705
-846
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+2705
-846
lines changed

documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/migration/ServletToHandlerDocs.java

+19-20
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.util.Locale;
2121
import java.util.Map;
2222
import java.util.Set;
23-
import java.util.concurrent.CompletableFuture;
2423
import java.util.function.Supplier;
2524

2625
import org.eclipse.jetty.http.HttpCookie;
@@ -36,8 +35,8 @@
3635
import org.eclipse.jetty.server.Response;
3736
import org.eclipse.jetty.server.Session;
3837
import org.eclipse.jetty.util.Callback;
39-
import org.eclipse.jetty.util.CompletableTask;
4038
import org.eclipse.jetty.util.Fields;
39+
import org.eclipse.jetty.util.Promise;
4140

4241
import static java.nio.charset.StandardCharsets.UTF_8;
4342

@@ -211,18 +210,19 @@ public boolean handle(Request request, Response response, Callback callback) thr
211210
{
212211
// Non-blocking read the request content as a String.
213212
// Use with caution as the request content may be large.
214-
CompletableFuture<String> completable = Content.Source.asStringAsync(request, UTF_8);
215-
216-
completable.whenComplete((requestContent, failure) ->
213+
Content.Source.asString(request, UTF_8, new Promise<>()
217214
{
218-
if (failure == null)
215+
@Override
216+
public void succeeded(String result)
219217
{
220218
// Process the request content here.
221219

222220
// Implicitly respond with status code 200 and no content.
223221
callback.succeeded();
224222
}
225-
else
223+
224+
@Override
225+
public void failed(Throwable failure)
226226
{
227227
// Implicitly respond with status code 500.
228228
callback.failed(failure);
@@ -243,18 +243,19 @@ public boolean handle(Request request, Response response, Callback callback) thr
243243
{
244244
// Non-blocking read the request content as a ByteBuffer.
245245
// Use with caution as the request content may be large.
246-
CompletableFuture<ByteBuffer> completable = Content.Source.asByteBufferAsync(request);
247-
248-
completable.whenComplete((requestContent, failure) ->
246+
Content.Source.asByteBuffer(request, new Promise<>()
249247
{
250-
if (failure == null)
248+
@Override
249+
public void succeeded(ByteBuffer result)
251250
{
252251
// Process the request content here.
253252

254253
// Implicitly respond with status code 200 and no content.
255254
callback.succeeded();
256255
}
257-
else
256+
257+
@Override
258+
public void failed(Throwable failure)
258259
{
259260
// Implicitly respond with status code 500.
260261
callback.failed(failure);
@@ -303,7 +304,8 @@ public class RequestContentAPIsSource extends Handler.Abstract
303304
@Override
304305
public boolean handle(Request request, Response response, Callback callback) throws Exception
305306
{
306-
CompletableTask<Void> reader = new CompletableTask<>()
307+
// When the read is complete, complete the Handler callback.
308+
Promise.Task<Void> reader = new Promise.Task<>(callback::succeeded, callback::failed)
307309
{
308310
@Override
309311
public void run()
@@ -326,7 +328,7 @@ public void run()
326328
if (Content.Chunk.isFailure(chunk))
327329
{
328330
Throwable failure = chunk.getFailure();
329-
completeExceptionally(failure);
331+
failed(failure);
330332
return;
331333
}
332334

@@ -343,7 +345,7 @@ public void run()
343345
// If the last chunk is read, complete normally.
344346
if (chunk.isLast())
345347
{
346-
complete(null);
348+
succeeded(null);
347349
return;
348350
}
349351

@@ -353,10 +355,7 @@ public void run()
353355
};
354356

355357
// Initiate the read of the request content.
356-
reader.start();
357-
358-
// When the read is complete, complete the Handler callback.
359-
callback.completeWith(reader);
358+
reader.run();
360359

361360
return true;
362361
}
@@ -440,7 +439,7 @@ public boolean handle(Request request, Response response, Callback callback) thr
440439
// Replaces:
441440
// - servletResponse.encodeRedirectURL(location)
442441
// - servletResponse.sendRedirect(location)
443-
String location = Request.toRedirectURI(request, "/redirect");
442+
String location = Response.toRedirectURI(request, "/redirect");
444443
Response.sendRedirect(request, response, callback, location);
445444

446445
// Sends an error response.

jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/Response.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.eclipse.jetty.http.HttpFields;
2222
import org.eclipse.jetty.http.HttpVersion;
2323
import org.eclipse.jetty.io.Content;
24+
import org.eclipse.jetty.util.thread.Invocable;
2425

2526
/**
2627
* <p>{@link Response} represents an HTTP response and offers methods to retrieve status code, HTTP version
@@ -187,8 +188,8 @@ interface AsyncContentListener extends ContentSourceListener
187188
@Override
188189
default void onContentSource(Response response, Content.Source contentSource)
189190
{
191+
Runnable demandCallback = Invocable.from(Invocable.InvocationType.NON_BLOCKING, () -> onContentSource(response, contentSource));
190192
Content.Chunk chunk = contentSource.read();
191-
Runnable demandCallback = () -> onContentSource(response, contentSource);
192193
if (chunk == null)
193194
{
194195
contentSource.demand(demandCallback);

jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import java.net.URI;
1717
import java.util.List;
18+
import java.util.concurrent.Executor;
1819
import java.util.concurrent.atomic.AtomicReference;
1920

2021
import org.eclipse.jetty.client.ContentDecoder;
@@ -67,15 +68,17 @@ public abstract class HttpReceiver
6768
{
6869
private static final Logger LOG = LoggerFactory.getLogger(HttpReceiver.class);
6970

70-
private final SerializedInvoker invoker = new SerializedInvoker(HttpReceiver.class);
7171
private final HttpChannel channel;
72+
private final SerializedInvoker invoker;
7273
private ResponseState responseState = ResponseState.IDLE;
7374
private NotifiableContentSource contentSource;
7475
private Throwable failure;
7576

7677
protected HttpReceiver(HttpChannel channel)
7778
{
7879
this.channel = channel;
80+
Executor executor = channel.getHttpDestination().getHttpClient().getExecutor();
81+
invoker = new SerializedInvoker(HttpReceiver.class.getSimpleName(), executor);
7982
}
8083

8184
/**

jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpRequest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -369,11 +369,11 @@ public Request accept(String... accepts)
369369
StringBuilder result = new StringBuilder();
370370
for (String accept : accepts)
371371
{
372-
if (result.length() > 0)
372+
if (!result.isEmpty())
373373
result.append(", ");
374374
result.append(accept);
375375
}
376-
if (result.length() > 0)
376+
if (!result.isEmpty())
377377
headers.put(HttpHeader.ACCEPT, result.toString());
378378
return this;
379379
}
@@ -859,7 +859,7 @@ private void extractParams(String query)
859859
if (parts.length > 0)
860860
{
861861
String name = urlDecode(parts[0]);
862-
if (name.trim().length() == 0)
862+
if (name.trim().isEmpty())
863863
continue;
864864
param(name, parts.length < 2 ? "" : urlDecode(parts[1]), true);
865865
}

jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/ResponseListeners.java

+22-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package org.eclipse.jetty.client.transport;
1515

1616
import java.nio.ByteBuffer;
17+
import java.nio.channels.ReadPendingException;
1718
import java.util.ArrayList;
1819
import java.util.Iterator;
1920
import java.util.List;
@@ -28,6 +29,7 @@
2829
import org.eclipse.jetty.io.content.ByteBufferContentSource;
2930
import org.eclipse.jetty.util.ExceptionUtil;
3031
import org.eclipse.jetty.util.thread.AutoLock;
32+
import org.eclipse.jetty.util.thread.Invocable;
3133
import org.slf4j.Logger;
3234
import org.slf4j.LoggerFactory;
3335

@@ -231,7 +233,7 @@ private static void consume(Content.Source contentSource)
231233
if (chunk != null)
232234
chunk.release();
233235
if (chunk == null || !chunk.isLast())
234-
contentSource.demand(() -> consume(contentSource));
236+
contentSource.demand(Invocable.from(Invocable.InvocationType.NON_BLOCKING, () -> consume(contentSource)));
235237
}
236238

237239
private static void notifyContentSource(Response.ContentSourceListener listener, Response response, Content.Source contentSource)
@@ -490,7 +492,7 @@ private void onDemandCallback()
490492
{
491493
// Retry the demand on spurious wakeup to avoid passing
492494
// a null chunk to the demultiplexer's ContentSources.
493-
originalContentSource.demand(this::onDemandCallback);
495+
originalContentSource.demand(Invocable.from(getInvocationType(), this::onDemandCallback));
494496
return;
495497
}
496498
// Demultiplexer content sources are invoked sequentially to be consistent with other listeners,
@@ -502,6 +504,16 @@ private void onDemandCallback()
502504
chunk.release();
503505
}
504506

507+
private Invocable.InvocationType getInvocationType()
508+
{
509+
Invocable.InvocationType invocationType = Invocable.InvocationType.NON_BLOCKING;
510+
for (ContentSource contentSource : contentSources)
511+
{
512+
invocationType = Invocable.combine(invocationType, contentSource.getInvocationType());
513+
}
514+
return invocationType;
515+
}
516+
505517
private void registerFailure(ContentSource contentSource, Throwable failure)
506518
{
507519
boolean processFail = false;
@@ -524,7 +536,7 @@ else if (counters.total() == listeners.size())
524536
if (processFail)
525537
originalContentSource.fail(failure);
526538
else if (processDemand)
527-
originalContentSource.demand(this::onDemandCallback);
539+
originalContentSource.demand(Invocable.from(getInvocationType(), this::onDemandCallback));
528540

529541
if (LOG.isDebugEnabled())
530542
LOG.debug("Registered failure on {}; {}", contentSource, counters);
@@ -547,7 +559,7 @@ private void registerDemand(ContentSource contentSource)
547559
}
548560
}
549561
if (processDemand)
550-
originalContentSource.demand(this::onDemandCallback);
562+
originalContentSource.demand(Invocable.from(getInvocationType(), this::onDemandCallback));
551563

552564
if (LOG.isDebugEnabled())
553565
LOG.debug("Registered demand on {}; {}", contentSource, counters);
@@ -641,6 +653,11 @@ private void onDemandCallback()
641653
}
642654
}
643655

656+
private Invocable.InvocationType getInvocationType()
657+
{
658+
return Invocable.getInvocationType(demandCallbackRef.get());
659+
}
660+
644661
@Override
645662
public Content.Chunk read()
646663
{
@@ -663,7 +680,7 @@ public Content.Chunk read()
663680
public void demand(Runnable demandCallback)
664681
{
665682
if (!demandCallbackRef.compareAndSet(null, Objects.requireNonNull(demandCallback)))
666-
throw new IllegalStateException();
683+
throw new ReadPendingException();
667684
Content.Chunk currentChunk = this.chunk;
668685
if (LOG.isDebugEnabled())
669686
LOG.debug("Content source #{} demand while current chunk is {}", index, currentChunk);

jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -412,7 +412,20 @@ public void close()
412412
{
413413
Runnable task = stream.getHttpChannel().onClose();
414414
if (task != null)
415-
task.run();
415+
{
416+
ThreadPool.executeImmediately(getExecutor(), () ->
417+
{
418+
try
419+
{
420+
task.run();
421+
}
422+
finally
423+
{
424+
super.close();
425+
}
426+
});
427+
return;
428+
}
416429
}
417430
super.close();
418431
}

jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPart.java

+24-8
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.eclipse.jetty.util.StringUtil;
4646
import org.eclipse.jetty.util.Utf8StringBuilder;
4747
import org.eclipse.jetty.util.thread.AutoLock;
48+
import org.eclipse.jetty.util.thread.Invocable;
4849
import org.eclipse.jetty.util.thread.SerializedInvoker;
4950
import org.slf4j.Logger;
5051
import org.slf4j.LoggerFactory;
@@ -823,14 +824,8 @@ public void demand(Runnable demandCallback)
823824
}
824825
if (part != null)
825826
{
826-
part.getContentSource().demand(() ->
827-
{
828-
try (AutoLock ignoredAgain = lock.lock())
829-
{
830-
this.demand = null;
831-
}
832-
demandCallback.run();
833-
});
827+
// Inner class used instead of lambda for clarity in stack traces.
828+
part.getContentSource().demand(new DemandTask(demandCallback));
834829
}
835830
else if (invoke)
836831
{
@@ -887,6 +882,27 @@ private enum State
887882
{
888883
FIRST, MIDDLE, HEADERS, CONTENT, COMPLETE
889884
}
885+
886+
private class DemandTask extends Invocable.Task.Abstract
887+
{
888+
private final Runnable demandCallback;
889+
890+
private DemandTask(Runnable demandCallback)
891+
{
892+
super(Invocable.getInvocationType(demandCallback));
893+
this.demandCallback = demandCallback;
894+
}
895+
896+
@Override
897+
public void run()
898+
{
899+
try (AutoLock ignoredAgain = lock.lock())
900+
{
901+
AbstractContentSource.this.demand = null;
902+
}
903+
demandCallback.run();
904+
}
905+
}
890906
}
891907

892908
/**

jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/MultiPartConfig.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@
1717

1818
import org.eclipse.jetty.io.Content;
1919
import org.eclipse.jetty.util.Attributes;
20+
import org.eclipse.jetty.util.Promise;
2021

2122
import static org.eclipse.jetty.http.ComplianceViolation.Listener.NOOP;
2223

2324
/**
2425
* The Configuration needed to parse multipart/form-data.
25-
* @see MultiPartFormData#from(Content.Source, Attributes, String, MultiPartConfig)
26+
* @see MultiPartFormData#getParts(Content.Source, Attributes, String, MultiPartConfig)
27+
* @see MultiPartFormData#onParts(Content.Source, Attributes, String, MultiPartConfig, Promise.Invocable)
2628
*/
2729
public class MultiPartConfig
2830
{

0 commit comments

Comments
 (0)