Skip to content

Commit d82613a

Browse files
authored
xds: Fix cluster selection races when updating config selector
Listener2.onResult() doesn't require running in the sync context, so when called from the sync context it is guaranteed not to do its processing immediately (instead, it schedules work into the sync context). The code was doing an update dance: 1) update service config to add new cluster, 2) update config selector to use new cluster, 3) update service config to remove old clusters. But the onResult() wasn't being processed immediately, so the actual execution order was 2, 1, 3 which has a small window where RPCs will fail. But onResult2() does run immediately. And since ca4819a, updateBalancingState() updates the picker immediately. cleanUpRoutes() was also racy because it updated the routingConfig before swapping to the new config selector, so RPCs could fail saying there was no route instead of the useful error message. Even with the opposite order, some RPCs may be executing the while loop of selectConfig(), trying to acquire a cluster. The code unreffed the clusters before updating the routingConfig, so those RPCs could go into a tight loop until the routingConfig was updated. Also, once the routingConfig was updated to EMPTY those RPCs would similarly see the wrong error message. To give the correct error message, selectConfig() must fail such RPCs directly, and once it can do that there's no need to stop using the config selector in error cases. This has the benefit of fewer moving parts and more consistent threading among cases. The added test was able to detect the race 2% of the time. The slower the code/machine, the more reliable the test failed. ca4819a along with this commit reduced it to 0 failures in 1000 runs. Discovered when investigating b/394850611
1 parent ca4819a commit d82613a

File tree

3 files changed

+177
-84
lines changed

3 files changed

+177
-84
lines changed

xds/src/main/java/io/grpc/xds/XdsNameResolver.java

+24-29
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ final class XdsNameResolver extends NameResolver {
132132
// NamedFilterConfig.filterStateKey -> filter_instance.
133133
private final HashMap<String, Filter> activeFilters = new HashMap<>();
134134

135-
private volatile RoutingConfig routingConfig = RoutingConfig.EMPTY;
135+
private volatile RoutingConfig routingConfig;
136136
private Listener2 listener;
137137
private ObjectPool<XdsClient> xdsClientPool;
138138
private XdsClient xdsClient;
@@ -306,7 +306,7 @@ private void updateResolutionResult() {
306306

307307
if (logger.isLoggable(XdsLogLevel.INFO)) {
308308
logger.log(
309-
XdsLogLevel.INFO, "Generated service config:\n{0}", new Gson().toJson(rawServiceConfig));
309+
XdsLogLevel.INFO, "Generated service config: {0}", new Gson().toJson(rawServiceConfig));
310310
}
311311
ConfigOrError parsedServiceConfig = serviceConfigParser.parseServiceConfig(rawServiceConfig);
312312
Attributes attrs =
@@ -320,7 +320,7 @@ private void updateResolutionResult() {
320320
.setAttributes(attrs)
321321
.setServiceConfig(parsedServiceConfig)
322322
.build();
323-
listener.onResult(result);
323+
listener.onResult2(result);
324324
receivedConfig = true;
325325
}
326326

@@ -395,6 +395,9 @@ public Result selectConfig(PickSubchannelArgs args) {
395395
String path = "/" + args.getMethodDescriptor().getFullMethodName();
396396
do {
397397
routingCfg = routingConfig;
398+
if (routingCfg.errorStatus != null) {
399+
return Result.forError(routingCfg.errorStatus);
400+
}
398401
selectedRoute = null;
399402
for (RouteData route : routingCfg.routes) {
400403
if (RoutingUtils.matchRoute(route.routeMatch, path, headers, random)) {
@@ -626,19 +629,6 @@ private static String prefixedClusterSpecifierPluginName(String pluginName) {
626629
return "cluster_specifier_plugin:" + pluginName;
627630
}
628631

629-
private static final class FailingConfigSelector extends InternalConfigSelector {
630-
private final Result result;
631-
632-
public FailingConfigSelector(Status error) {
633-
this.result = Result.forError(error);
634-
}
635-
636-
@Override
637-
public Result selectConfig(PickSubchannelArgs args) {
638-
return result;
639-
}
640-
}
641-
642632
private class ResolveState implements ResourceWatcher<XdsListenerResource.LdsUpdate> {
643633
private final ConfigOrError emptyServiceConfig =
644634
serviceConfigParser.parseServiceConfig(Collections.<String, Object>emptyMap());
@@ -835,13 +825,13 @@ private void updateRoutes(List<VirtualHost> virtualHosts, long httpMaxStreamDura
835825
}
836826
}
837827
// Update service config to include newly added clusters.
838-
if (shouldUpdateResult) {
828+
if (shouldUpdateResult && routingConfig != null) {
839829
updateResolutionResult();
830+
shouldUpdateResult = false;
840831
}
841832
// Make newly added clusters selectable by config selector and deleted clusters no longer
842833
// selectable.
843834
routingConfig = new RoutingConfig(httpMaxStreamDurationNano, routesData.build());
844-
shouldUpdateResult = false;
845835
for (String cluster : deletedClusters) {
846836
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
847837
if (count == 0) {
@@ -893,6 +883,9 @@ private ClientInterceptor createFilters(
893883
}
894884

895885
private void cleanUpRoutes(String error) {
886+
String errorWithNodeId =
887+
error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId();
888+
routingConfig = new RoutingConfig(Status.UNAVAILABLE.withDescription(errorWithNodeId));
896889
if (existingClusters != null) {
897890
for (String cluster : existingClusters) {
898891
int count = clusterRefs.get(cluster).refCount.decrementAndGet();
@@ -902,17 +895,12 @@ private void cleanUpRoutes(String error) {
902895
}
903896
existingClusters = null;
904897
}
905-
routingConfig = RoutingConfig.EMPTY;
898+
906899
// Without addresses the default LB (normally pick_first) should become TRANSIENT_FAILURE, and
907-
// the config selector handles the error message itself. Once the LB API allows providing
908-
// failure information for addresses yet still providing a service config, the config seector
909-
// could be avoided.
910-
String errorWithNodeId =
911-
error + ", xDS node ID: " + xdsClient.getBootstrapInfo().node().getId();
912-
listener.onResult(ResolutionResult.newBuilder()
900+
// the config selector handles the error message itself.
901+
listener.onResult2(ResolutionResult.newBuilder()
913902
.setAttributes(Attributes.newBuilder()
914-
.set(InternalConfigSelector.KEY,
915-
new FailingConfigSelector(Status.UNAVAILABLE.withDescription(errorWithNodeId)))
903+
.set(InternalConfigSelector.KEY, configSelector)
916904
.build())
917905
.setServiceConfig(emptyServiceConfig)
918906
.build());
@@ -983,12 +971,19 @@ public void onResourceDoesNotExist(final String resourceName) {
983971
private static class RoutingConfig {
984972
private final long fallbackTimeoutNano;
985973
final ImmutableList<RouteData> routes;
986-
987-
private static final RoutingConfig EMPTY = new RoutingConfig(0, ImmutableList.of());
974+
final Status errorStatus;
988975

989976
private RoutingConfig(long fallbackTimeoutNano, ImmutableList<RouteData> routes) {
990977
this.fallbackTimeoutNano = fallbackTimeoutNano;
991978
this.routes = checkNotNull(routes, "routes");
979+
this.errorStatus = null;
980+
}
981+
982+
private RoutingConfig(Status errorStatus) {
983+
this.fallbackTimeoutNano = 0;
984+
this.routes = null;
985+
this.errorStatus = checkNotNull(errorStatus, "errorStatus");
986+
checkArgument(!errorStatus.isOk(), "errorStatus should not be okay");
992987
}
993988
}
994989

xds/src/test/java/io/grpc/xds/FakeControlPlaneXdsIntegrationTest.java

+108-10
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,12 @@
1919

2020
import static com.google.common.truth.Truth.assertThat;
2121
import static io.grpc.xds.DataPlaneRule.ENDPOINT_HOST_NAME;
22+
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_CDS;
23+
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_EDS;
2224
import static org.junit.Assert.assertEquals;
2325

2426
import com.github.xds.type.v3.TypedStruct;
27+
import com.google.common.collect.ImmutableMap;
2528
import com.google.protobuf.Any;
2629
import com.google.protobuf.Struct;
2730
import com.google.protobuf.Value;
@@ -36,11 +39,17 @@
3639
import io.envoyproxy.envoy.config.endpoint.v3.Endpoint;
3740
import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint;
3841
import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints;
42+
import io.envoyproxy.envoy.config.route.v3.Route;
43+
import io.envoyproxy.envoy.config.route.v3.RouteAction;
44+
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
45+
import io.envoyproxy.envoy.config.route.v3.RouteMatch;
46+
import io.envoyproxy.envoy.config.route.v3.VirtualHost;
3947
import io.envoyproxy.envoy.extensions.load_balancing_policies.wrr_locality.v3.WrrLocality;
4048
import io.grpc.CallOptions;
4149
import io.grpc.Channel;
4250
import io.grpc.ClientCall;
4351
import io.grpc.ClientInterceptor;
52+
import io.grpc.ClientStreamTracer;
4453
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
4554
import io.grpc.ForwardingClientCallListener;
4655
import io.grpc.LoadBalancerRegistry;
@@ -89,8 +98,7 @@ public void pingPong() throws Exception {
8998
ManagedChannel channel = dataPlane.getManagedChannel();
9099
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
91100
channel);
92-
SimpleRequest request = SimpleRequest.newBuilder()
93-
.build();
101+
SimpleRequest request = SimpleRequest.getDefaultInstance();
94102
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
95103
.setResponseMessage("Hi, xDS! Authority= test-server")
96104
.build();
@@ -104,8 +112,7 @@ public void pingPong_edsEndpoint_authorityOverride() throws Exception {
104112
ManagedChannel channel = dataPlane.getManagedChannel();
105113
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
106114
channel);
107-
SimpleRequest request = SimpleRequest.newBuilder()
108-
.build();
115+
SimpleRequest request = SimpleRequest.getDefaultInstance();
109116
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
110117
.setResponseMessage("Hi, xDS! Authority= " + ENDPOINT_HOST_NAME)
111118
.build();
@@ -145,8 +152,7 @@ public void pingPong_metadataLoadBalancer() throws Exception {
145152
// We add an interceptor to catch the response headers from the server.
146153
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
147154
dataPlane.getManagedChannel()).withInterceptors(responseHeaderInterceptor);
148-
SimpleRequest request = SimpleRequest.newBuilder()
149-
.build();
155+
SimpleRequest request = SimpleRequest.getDefaultInstance();
150156
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
151157
.setResponseMessage("Hi, xDS! Authority= test-server")
152158
.build();
@@ -160,6 +166,100 @@ public void pingPong_metadataLoadBalancer() throws Exception {
160166
}
161167
}
162168

169+
// Try to trigger "UNAVAILABLE: CDS encountered error: unable to find available subchannel for
170+
// cluster cluster:cluster1" race, if XdsNameResolver updates its ConfigSelector before
171+
// cluster_manager config.
172+
@Test
173+
public void changeClusterForRoute() throws Exception {
174+
// Start with route to cluster0
175+
InetSocketAddress edsInetSocketAddress
176+
= (InetSocketAddress) dataPlane.getServer().getListenSockets().get(0);
177+
controlPlane.getService().setXdsConfig(
178+
ADS_TYPE_URL_EDS,
179+
ImmutableMap.of(
180+
"eds-service-0",
181+
ControlPlaneRule.buildClusterLoadAssignment(
182+
edsInetSocketAddress.getHostName(), "", edsInetSocketAddress.getPort(),
183+
"eds-service-0"),
184+
"eds-service-1",
185+
ControlPlaneRule.buildClusterLoadAssignment(
186+
edsInetSocketAddress.getHostName(), "", edsInetSocketAddress.getPort(),
187+
"eds-service-1")));
188+
controlPlane.getService().setXdsConfig(
189+
ADS_TYPE_URL_CDS,
190+
ImmutableMap.of(
191+
"cluster0",
192+
ControlPlaneRule.buildCluster("cluster0", "eds-service-0"),
193+
"cluster1",
194+
ControlPlaneRule.buildCluster("cluster1", "eds-service-1")));
195+
controlPlane.setRdsConfig(RouteConfiguration.newBuilder()
196+
.setName("route-config.googleapis.com")
197+
.addVirtualHosts(VirtualHost.newBuilder()
198+
.addDomains("test-server")
199+
.addRoutes(Route.newBuilder()
200+
.setMatch(RouteMatch.newBuilder().setPrefix("/").build())
201+
.setRoute(RouteAction.newBuilder().setCluster("cluster0").build())
202+
.build())
203+
.build())
204+
.build());
205+
206+
class ClusterClientStreamTracer extends ClientStreamTracer {
207+
boolean usedCluster1;
208+
209+
@Override
210+
public void addOptionalLabel(String key, String value) {
211+
if ("grpc.lb.backend_service".equals(key)) {
212+
usedCluster1 = "cluster1".equals(value);
213+
}
214+
}
215+
}
216+
217+
ClusterClientStreamTracer tracer = new ClusterClientStreamTracer();
218+
ClientStreamTracer.Factory tracerFactory = new ClientStreamTracer.Factory() {
219+
@Override
220+
public ClientStreamTracer newClientStreamTracer(
221+
ClientStreamTracer.StreamInfo info, Metadata headers) {
222+
return tracer;
223+
}
224+
};
225+
ClientInterceptor tracerInterceptor = new ClientInterceptor() {
226+
@Override
227+
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
228+
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
229+
return next.newCall(method, callOptions.withStreamTracerFactory(tracerFactory));
230+
}
231+
};
232+
SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc
233+
.newBlockingStub(dataPlane.getManagedChannel())
234+
.withInterceptors(tracerInterceptor);
235+
SimpleRequest request = SimpleRequest.getDefaultInstance();
236+
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
237+
.setResponseMessage("Hi, xDS! Authority= test-server")
238+
.build();
239+
assertThat(stub.unaryRpc(request)).isEqualTo(goldenResponse);
240+
assertThat(tracer.usedCluster1).isFalse();
241+
242+
// Check for errors when swapping route to cluster1
243+
controlPlane.setRdsConfig(RouteConfiguration.newBuilder()
244+
.setName("route-config.googleapis.com")
245+
.addVirtualHosts(VirtualHost.newBuilder()
246+
.addDomains("test-server")
247+
.addRoutes(Route.newBuilder()
248+
.setMatch(RouteMatch.newBuilder().setPrefix("/").build())
249+
.setRoute(RouteAction.newBuilder().setCluster("cluster1").build())
250+
.build())
251+
.build())
252+
.build());
253+
254+
for (int j = 0; j < 10; j++) {
255+
stub.unaryRpc(request);
256+
if (tracer.usedCluster1) {
257+
break;
258+
}
259+
}
260+
assertThat(tracer.usedCluster1).isTrue();
261+
}
262+
163263
// Captures response headers from the server.
164264
private static class ResponseHeaderClientInterceptor implements ClientInterceptor {
165265
Metadata reponseHeaders;
@@ -199,8 +299,7 @@ public void pingPong_ringHash() {
199299
ManagedChannel channel = dataPlane.getManagedChannel();
200300
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
201301
channel);
202-
SimpleRequest request = SimpleRequest.newBuilder()
203-
.build();
302+
SimpleRequest request = SimpleRequest.getDefaultInstance();
204303
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
205304
.setResponseMessage("Hi, xDS! Authority= test-server")
206305
.build();
@@ -231,8 +330,7 @@ public void pingPong_logicalDns_authorityOverride() {
231330
ManagedChannel channel = dataPlane.getManagedChannel();
232331
SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub = SimpleServiceGrpc.newBlockingStub(
233332
channel);
234-
SimpleRequest request = SimpleRequest.newBuilder()
235-
.build();
333+
SimpleRequest request = SimpleRequest.getDefaultInstance();
236334
SimpleResponse goldenResponse = SimpleResponse.newBuilder()
237335
.setResponseMessage("Hi, xDS! Authority= localhost:" + serverAddress.getPort())
238336
.build();

0 commit comments

Comments
 (0)