diff --git a/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java b/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java index ae5b3c5b1c9..fd2a1d2a069 100644 --- a/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java +++ b/xds/src/main/java/io/grpc/xds/EnvoyServerProtoData.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.protobuf.util.Durations; +import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CommonTlsContext; import io.grpc.Internal; import io.grpc.xds.client.EnvoyProtoData; @@ -248,13 +249,17 @@ abstract static class Listener { @Nullable abstract FilterChain defaultFilterChain(); + @Nullable + abstract Protocol protocol(); + static Listener create( String name, @Nullable String address, ImmutableList filterChains, - @Nullable FilterChain defaultFilterChain) { + @Nullable FilterChain defaultFilterChain, + @Nullable Protocol protocol) { return new AutoValue_EnvoyServerProtoData_Listener(name, address, filterChains, - defaultFilterChain); + defaultFilterChain, protocol); } } diff --git a/xds/src/main/java/io/grpc/xds/XdsListenerResource.java b/xds/src/main/java/io/grpc/xds/XdsListenerResource.java index ec0cbbf243f..041b659b4c3 100644 --- a/xds/src/main/java/io/grpc/xds/XdsListenerResource.java +++ b/xds/src/main/java/io/grpc/xds/XdsListenerResource.java @@ -162,13 +162,16 @@ static EnvoyServerProtoData.Listener parseServerSideListener( } String address = null; + SocketAddress socketAddress = null; if (proto.getAddress().hasSocketAddress()) { - SocketAddress socketAddress = proto.getAddress().getSocketAddress(); + socketAddress = proto.getAddress().getSocketAddress(); address = socketAddress.getAddress(); + if (address.isEmpty()) { + throw new ResourceInvalidException("Invalid address: Empty address is not allowed."); + } switch (socketAddress.getPortSpecifierCase()) { case NAMED_PORT: - address = address + ":" + socketAddress.getNamedPort(); - break; + throw new ResourceInvalidException("NAMED_PORT is not supported in gRPC."); case PORT_VALUE: address = address + ":" + socketAddress.getPortValue(); break; @@ -209,8 +212,8 @@ static EnvoyServerProtoData.Listener parseServerSideListener( null, certProviderInstances, args); } - return EnvoyServerProtoData.Listener.create( - proto.getName(), address, filterChains.build(), defaultFilterChain); + return EnvoyServerProtoData.Listener.create(proto.getName(), address, filterChains.build(), + defaultFilterChain, socketAddress == null ? null : socketAddress.getProtocol()); } @VisibleForTesting diff --git a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java index 5c1b3105c45..4cb5f6cb003 100644 --- a/xds/src/main/java/io/grpc/xds/XdsNameResolver.java +++ b/xds/src/main/java/io/grpc/xds/XdsNameResolver.java @@ -676,6 +676,14 @@ public void onUpdate(StatusOr updateOrStatus) { // Process Route XdsConfig update = updateOrStatus.getValue(); HttpConnectionManager httpConnectionManager = update.getListener().httpConnectionManager(); + if (httpConnectionManager == null) { + String error = "API Listener: httpConnectionManager does not exist."; + logger.log(XdsLogLevel.INFO, error); + updateActiveFilters(null); + cleanUpRoutes(updateOrStatus.getStatus()); + return; + } + VirtualHost virtualHost = update.getVirtualHost(); ImmutableList filterConfigs = httpConnectionManager.httpFilterConfigs(); long streamDurationNano = httpConnectionManager.httpMaxStreamDurationNano(); diff --git a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java index e0185974861..4049ee8fbb7 100644 --- a/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java +++ b/xds/src/main/java/io/grpc/xds/XdsServerWrapper.java @@ -24,7 +24,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.net.HostAndPort; +import com.google.common.net.InetAddresses; import com.google.common.util.concurrent.SettableFuture; +import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol; import io.grpc.Attributes; import io.grpc.InternalServerInterceptors; import io.grpc.Metadata; @@ -57,6 +60,7 @@ import io.grpc.xds.client.XdsClient.ResourceWatcher; import io.grpc.xds.internal.security.SslContextProviderSupplier; import java.io.IOException; +import java.net.InetAddress; import java.net.SocketAddress; import java.util.ArrayList; import java.util.HashMap; @@ -383,7 +387,21 @@ public void onChanged(final LdsUpdate update) { return; } logger.log(Level.FINEST, "Received Lds update {0}", update); - checkNotNull(update.listener(), "update"); + if (update.listener() == null) { + onResourceDoesNotExist("Non-API"); + return; + } + + String ldsAddress = update.listener().address(); + if (ldsAddress == null || update.listener().protocol() != Protocol.TCP + || !ipAddressesMatch(ldsAddress)) { + handleConfigNotFoundOrMismatch( + Status.UNKNOWN.withDescription( + String.format( + "Listener address mismatch: expected %s, but got %s.", + listenerAddress, ldsAddress)).asException()); + return; + } if (!pendingRds.isEmpty()) { // filter chain state has not yet been applied to filterChainSelectorManager and there // are two sets of sslContextProviderSuppliers, so we release the old ones. @@ -432,6 +450,20 @@ public void onChanged(final LdsUpdate update) { } } + private boolean ipAddressesMatch(String ldsAddress) { + HostAndPort ldsAddressHnP = HostAndPort.fromString(ldsAddress); + HostAndPort listenerAddressHnP = HostAndPort.fromString(listenerAddress); + + InetAddress listenerIp = InetAddresses.forString(listenerAddressHnP.getHost()); + InetAddress ldsIp = InetAddresses.forString(ldsAddressHnP.getHost()); + if (!ldsAddressHnP.hasPort() || !listenerAddressHnP.hasPort() + || ldsAddressHnP.getPort() != listenerAddressHnP.getPort()) { + return false; + } + + return listenerIp.equals(ldsIp); + } + @Override public void onResourceDoesNotExist(final String resourceName) { if (stopped) { @@ -440,7 +472,7 @@ public void onResourceDoesNotExist(final String resourceName) { StatusException statusException = Status.UNAVAILABLE.withDescription( String.format("Listener %s unavailable, xDS node ID: %s", resourceName, xdsClient.getBootstrapInfo().node().getId())).asException(); - handleConfigNotFound(statusException); + handleConfigNotFoundOrMismatch(statusException); } @Override @@ -673,7 +705,7 @@ public Listener interceptCall(ServerCall call, }; } - private void handleConfigNotFound(StatusException exception) { + private void handleConfigNotFoundOrMismatch(StatusException exception) { cleanUpRouteDiscoveryStates(); shutdownActiveFilters(); List toRelease = getSuppliersInUse(); diff --git a/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java b/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java index 39761912ea5..8c10627d153 100644 --- a/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java +++ b/xds/src/test/java/io/grpc/xds/ControlPlaneRule.java @@ -366,10 +366,14 @@ static Listener buildServerListener() { .setFilterChainMatch(filterChainMatch) .addFilters(filter) .build(); + Address address = Address.newBuilder() + .setSocketAddress(SocketAddress.newBuilder().setAddress("0.0.0.0").setPortValue(0)) + .build(); return Listener.newBuilder() .setName(SERVER_LISTENER_TEMPLATE_NO_REPLACEMENT) .setTrafficDirection(TrafficDirection.INBOUND) .addFilterChains(filterChain) + .setAddress(address) .build(); } } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java index f3f4d74eb2f..ff97afe6916 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientWrapperForServerSdsTestMisc.java @@ -32,6 +32,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.SettableFuture; +import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol; import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.Status; @@ -165,9 +166,10 @@ public void run() { EnvoyServerProtoData.Listener tcpListener = EnvoyServerProtoData.Listener.create( "listener1", - "10.1.2.3", + "0.0.0.0:7000", ImmutableList.of(), - null); + null, + Protocol.TCP); LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(tcpListener); xdsClient.ldsWatcher.onChanged(listenerUpdate); verify(listener, timeout(5000)).onServing(); diff --git a/xds/src/test/java/io/grpc/xds/XdsSecurityClientServerTest.java b/xds/src/test/java/io/grpc/xds/XdsSecurityClientServerTest.java index cd3ef293369..271e7191e4c 100644 --- a/xds/src/test/java/io/grpc/xds/XdsSecurityClientServerTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsSecurityClientServerTest.java @@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.SettableFuture; +import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.CertificateValidationContext; import io.grpc.Attributes; import io.grpc.EquivalentAddressGroup; @@ -488,7 +489,7 @@ public void mtlsClientServer_changeServerContext_expectException() DownstreamTlsContext downstreamTlsContext = CommonTlsContextTestsUtil.buildDownstreamTlsContext( "cert-instance-name2", true, true); - EnvoyServerProtoData.Listener listener = buildListener("listener1", "0.0.0.0", + EnvoyServerProtoData.Listener listener = buildListener("listener1", "0.0.0.0:0", downstreamTlsContext, tlsContextManagerForServer); xdsClient.deliverLdsUpdate(LdsUpdate.forTcpListener(listener)); @@ -592,7 +593,7 @@ private void buildServer( tlsContextManagerForServer = new TlsContextManagerImpl(bootstrapInfoForServer); XdsServerWrapper xdsServer = (XdsServerWrapper) builder.build(); SettableFuture startFuture = startServerAsync(xdsServer); - EnvoyServerProtoData.Listener listener = buildListener("listener1", "10.1.2.3", + EnvoyServerProtoData.Listener listener = buildListener("listener1", "0.0.0.0:0", downstreamTlsContext, tlsContextManagerForServer); LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(listener); xdsClient.deliverLdsUpdate(listenerUpdate); @@ -633,7 +634,7 @@ static EnvoyServerProtoData.Listener buildListener( "filter-chain-foo", filterChainMatch, httpConnectionManager, tlsContext, tlsContextManager); EnvoyServerProtoData.Listener listener = EnvoyServerProtoData.Listener.create( - name, address, ImmutableList.of(defaultFilterChain), null); + name, address, ImmutableList.of(defaultFilterChain), null, Protocol.TCP); return listener; } diff --git a/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java b/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java index d28c7d7c607..2c168c65869 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerBuilderTest.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.xds.XdsServerTestHelper.buildTestListener; import static org.junit.Assert.fail; import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; @@ -26,6 +27,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.SettableFuture; import io.grpc.BindableService; import io.grpc.InsecureServerCredentials; @@ -33,6 +35,7 @@ import io.grpc.Status; import io.grpc.StatusException; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.XdsServerTestHelper.FakeXdsClient; import io.grpc.xds.XdsServerTestHelper.FakeXdsClientPoolFactory; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; @@ -221,10 +224,13 @@ public void xdsServer_startError() buildServer(mockXdsServingStatusListener); Future future = startServerAsync(); // create port conflict for start to fail - XdsServerTestHelper.generateListenerUpdate( - xdsClient, + EnvoyServerProtoData.Listener listener = buildTestListener( + "listener1", "0.0.0.0:" + port, ImmutableList.of(), CommonTlsContextTestsUtil.buildTestInternalDownstreamTlsContext("CERT1", "VA1"), - tlsContextManager); + null, tlsContextManager); + LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(listener); + xdsClient.deliverLdsUpdate(listenerUpdate); + Throwable exception = future.get(5, TimeUnit.SECONDS); assertThat(exception).isInstanceOf(IOException.class); assertThat(exception).hasMessageThat().contains("Failed to bind"); diff --git a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java index ec9f4c54c31..94353cc886d 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.SettableFuture; +import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol; import io.grpc.InsecureChannelCredentials; import io.grpc.MetricRecorder; import io.grpc.internal.ObjectPool; @@ -74,7 +75,7 @@ public class XdsServerTestHelper { static void generateListenerUpdate(FakeXdsClient xdsClient, EnvoyServerProtoData.DownstreamTlsContext tlsContext, TlsContextManager tlsContextManager) { - EnvoyServerProtoData.Listener listener = buildTestListener("listener1", "10.1.2.3", + EnvoyServerProtoData.Listener listener = buildTestListener("listener1", "0.0.0.0:0", ImmutableList.of(), tlsContext, null, tlsContextManager); LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(listener); xdsClient.deliverLdsUpdate(listenerUpdate); @@ -85,7 +86,8 @@ static void generateListenerUpdate( EnvoyServerProtoData.DownstreamTlsContext tlsContext, EnvoyServerProtoData.DownstreamTlsContext tlsContextForDefaultFilterChain, TlsContextManager tlsContextManager) { - EnvoyServerProtoData.Listener listener = buildTestListener("listener1", "10.1.2.3", sourcePorts, + EnvoyServerProtoData.Listener listener = buildTestListener( + "listener1", "0.0.0.0:7000", sourcePorts, tlsContext, tlsContextForDefaultFilterChain, tlsContextManager); LdsUpdate listenerUpdate = LdsUpdate.forTcpListener(listener); xdsClient.deliverLdsUpdate(listenerUpdate); @@ -128,9 +130,8 @@ static EnvoyServerProtoData.Listener buildTestListener( EnvoyServerProtoData.FilterChain defaultFilterChain = EnvoyServerProtoData.FilterChain.create( "filter-chain-bar", defaultFilterChainMatch, httpConnectionManager, tlsContextForDefaultFilterChain, tlsContextManager); - EnvoyServerProtoData.Listener listener = - EnvoyServerProtoData.Listener.create( - name, address, ImmutableList.of(filterChain1), defaultFilterChain); + EnvoyServerProtoData.Listener listener = EnvoyServerProtoData.Listener.create( + name, address, ImmutableList.of(filterChain1), defaultFilterChain, Protocol.TCP); return listener; } @@ -297,8 +298,8 @@ void deliverLdsUpdate(LdsUpdate ldsUpdate) { void deliverLdsUpdate( List filterChains, @Nullable FilterChain defaultFilterChain) { - deliverLdsUpdate(LdsUpdate.forTcpListener(Listener.create( - "listener", "0.0.0.0:1", ImmutableList.copyOf(filterChains), defaultFilterChain))); + deliverLdsUpdate(LdsUpdate.forTcpListener(Listener.create("listener", "0.0.0.0:1", + ImmutableList.copyOf(filterChains), defaultFilterChain, Protocol.TCP))); } void deliverLdsUpdate(FilterChain filterChain, @Nullable FilterChain defaultFilterChain) { diff --git a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java index b866e10c559..7035bff4901 100644 --- a/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java +++ b/xds/src/test/java/io/grpc/xds/XdsServerWrapperTest.java @@ -37,6 +37,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.net.InetAddresses; import com.google.common.util.concurrent.SettableFuture; +import io.envoyproxy.envoy.config.core.v3.SocketAddress.Protocol; import io.grpc.Attributes; import io.grpc.InsecureChannelCredentials; import io.grpc.Metadata; @@ -54,6 +55,7 @@ import io.grpc.xds.EnvoyServerProtoData.CidrRange; import io.grpc.xds.EnvoyServerProtoData.FilterChain; import io.grpc.xds.EnvoyServerProtoData.FilterChainMatch; +import io.grpc.xds.EnvoyServerProtoData.Listener; import io.grpc.xds.Filter.FilterConfig; import io.grpc.xds.Filter.NamedFilterConfig; import io.grpc.xds.FilterChainMatchingProtocolNegotiators.FilterChainMatchingHandler.FilterChainSelector; @@ -61,6 +63,7 @@ import io.grpc.xds.VirtualHost.Route; import io.grpc.xds.VirtualHost.Route.RouteMatch; import io.grpc.xds.VirtualHost.Route.RouteMatch.PathMatcher; +import io.grpc.xds.XdsListenerResource.LdsUpdate; import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.XdsServerBuilder.XdsServingStatusListener; import io.grpc.xds.XdsServerTestHelper.FakeXdsClient; @@ -538,6 +541,46 @@ public void run() { verify(mockServer).start(); } + @Test + public void onChanged_listenerAddressMismatch() + throws ExecutionException, InterruptedException, TimeoutException { + + when(mockBuilder.build()).thenReturn(mockServer); + xdsServerWrapper = new XdsServerWrapper("10.1.2.3:1", mockBuilder, listener, + selectorManager, new FakeXdsClientPoolFactory(xdsClient), + filterRegistry, executor.getScheduledExecutorService()); + + final SettableFuture start = SettableFuture.create(); + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + try { + start.set(xdsServerWrapper.start()); + } catch (Exception ex) { + start.setException(ex); + } + } + }); + String ldsResource = xdsClient.ldsResource.get(5, TimeUnit.SECONDS); + assertThat(ldsResource).isEqualTo("grpc/server?udpa.resource.listening_address=10.1.2.3:1"); + + VirtualHost virtualHost = + VirtualHost.create( + "virtual-host", Collections.singletonList("auth"), new ArrayList(), + ImmutableMap.of()); + HttpConnectionManager httpConnectionManager = HttpConnectionManager.forVirtualHosts( + 0L, Collections.singletonList(virtualHost), new ArrayList()); + EnvoyServerProtoData.FilterChain filterChain = EnvoyServerProtoData.FilterChain.create( + "filter-chain-foo", createMatch(), httpConnectionManager, createTls(), + mock(TlsContextManager.class)); + + LdsUpdate listenerUpdate = LdsUpdate.forTcpListener( + Listener.create("listener", "20.3.4.5:1", + ImmutableList.copyOf(Collections.singletonList(filterChain)), null, Protocol.TCP)); + xdsClient.deliverLdsUpdate(listenerUpdate); + verify(listener, timeout(10000)).onNotServing(any()); + } + @Test public void discoverState_rds() throws Exception { final SettableFuture start = SettableFuture.create();