Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: Added changes to make ServerImpl.internalClose() thread-safe #11924

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions core/src/main/java/io/grpc/internal/AbstractServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ protected interface Sink {
private final StatsTraceContext statsTraceCtx;
private boolean outboundClosed;
private boolean headersSent;
private boolean closeCalled;

protected AbstractServerStream(
WritableBufferAllocator bufferAllocator, StatsTraceContext statsTraceCtx) {
Expand Down Expand Up @@ -120,6 +121,7 @@ public final void deliverFrame(

@Override
public final void close(Status status, Metadata trailers) {
Preconditions.checkState(!closeCalled, "call already closed");
Preconditions.checkNotNull(status, "status");
Preconditions.checkNotNull(trailers, "trailers");
if (!outboundClosed) {
Expand All @@ -130,6 +132,7 @@ public final void close(Status status, Metadata trailers) {
// closedStatus is only set from here, and is read from a place that has happen-after
// guarantees with respect to here.
transportState().setClosedStatus(status);
closeCalled = true;
abstractServerStreamSink().writeTrailers(trailers, headersSent, status);
}
}
Expand Down
13 changes: 8 additions & 5 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -779,8 +779,8 @@ static final class JumpToApplicationThreadServerStreamListener implements Server
// Only accessed from callExecutor.
private ServerStreamListener listener;

public JumpToApplicationThreadServerStreamListener(Executor executor,
Executor cancelExecutor, ServerStream stream, Context.CancellableContext context, Tag tag) {
public JumpToApplicationThreadServerStreamListener(Executor executor, Executor cancelExecutor,
ServerStream stream, Context.CancellableContext context, Tag tag) {
this.callExecutor = executor;
this.cancelExecutor = cancelExecutor;
this.stream = stream;
Expand Down Expand Up @@ -808,10 +808,13 @@ void setListener(ServerStreamListener listener) {
/**
* Like {@link ServerCall#close(Status, Metadata)}, but thread-safe for internal use.
*/
private void internalClose(Throwable t) {
// TODO(ejona86): this is not thread-safe :)
void internalClose(Throwable t) {
String description = "Application error processing RPC";
stream.close(Status.UNKNOWN.withDescription(description).withCause(t), new Metadata());
Metadata metadata = Status.trailersFromThrowable(t);
if (metadata == null) {
metadata = new Metadata();
}
stream.close(Status.UNKNOWN.withDescription(description).withCause(t), metadata);
}

@Override
Expand Down
20 changes: 19 additions & 1 deletion core/src/test/java/io/grpc/internal/ServerImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
import io.grpc.StringMarshaller;
import io.grpc.internal.ServerImpl.JumpToApplicationThreadServerStreamListener;
import io.grpc.internal.ServerImplBuilder.ClientTransportServersBuilder;
import io.grpc.internal.SingleMessageProducer;
import io.grpc.internal.testing.TestServerStreamTracer;
import io.grpc.util.MutableHandlerRegistry;
import io.perfmark.PerfMark;
Expand Down Expand Up @@ -1535,6 +1534,25 @@ public void channelz_transport_membershp() throws Exception {
assertTrue(after.end);
}

@Test
public void testInternalClose_withNullMetadata() {
JumpToApplicationThreadServerStreamListener listener
= new JumpToApplicationThreadServerStreamListener(
executor.getScheduledExecutorService(),
executor.getScheduledExecutorService(),
stream,
Context.ROOT.withCancellation(),
PerfMark.createTag());
Throwable throwableMock = mock(Throwable.class);
// Stub Status.trailersFromThrowable to return null, simulating the case where metadata is null
when(Status.trailersFromThrowable(throwableMock)).thenReturn(null);
listener.internalClose(throwableMock);
// Capture the arguments passed to stream.close()
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(stream).close(statusCaptor.capture(), metadataCaptor.capture());
}

private void createAndStartServer() throws IOException {
createServer();
server.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,7 @@ public void messageProducerOnlyProducesRequestedMessages() throws Exception {
verifyMessageCountAndClose(serverStreamCreation.listener.messageQueue, 1);
}

@SuppressWarnings("MissingFail")
@Test
public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
server.start(serverListener);
Expand All @@ -1597,10 +1598,14 @@ public void interactionsAfterServerStreamCloseAreNoops() throws Exception {
assertNotNull(clientStreamListener.status.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNotNull(clientStreamListener.trailers.get(TIMEOUT_MS, TimeUnit.MILLISECONDS));

// Ensure that for a closed ServerStream, interactions are noops
server.stream.writeHeaders(new Metadata(), true);
server.stream.writeMessage(methodDescriptor.streamResponse("response"));
server.stream.close(Status.INTERNAL, new Metadata());
try {
// Ensure that for a closed ServerStream, interactions are noops
server.stream.writeHeaders(new Metadata(), true);
server.stream.writeMessage(methodDescriptor.streamResponse("response"));
server.stream.close(Status.INTERNAL, new Metadata());
} catch (Exception exception) {
assertTrue(exception.getMessage().contains("call already closed"));
}

// Make sure new streams still work properly
doPingPong(serverListener);
Expand Down