21
21
import org .opensearch .cluster .node .DiscoveryNode ;
22
22
import org .opensearch .cluster .service .ClusterService ;
23
23
import org .opensearch .common .settings .Settings ;
24
+ import org .opensearch .common .util .concurrent .ThreadContext ;
24
25
import org .opensearch .core .common .transport .TransportAddress ;
25
26
import org .opensearch .core .transport .TransportResponse ;
26
27
import org .opensearch .extensions .ExtensionsManager ;
@@ -108,8 +109,7 @@ public void setup() {
108
109
);
109
110
}
110
111
111
- private void testSendRequestDecorate (Version remoteNodeVersion ) {
112
- boolean useJDKSerialization = remoteNodeVersion .before (ConfigConstants .FIRST_CUSTOM_SERIALIZATION_SUPPORTED_OS_VERSION );
112
+ private void testSendRequestDecorate (DiscoveryNode localNode , DiscoveryNode otherNode , boolean shouldUseJDKSerialization ) {
113
113
ClusterName clusterName = ClusterName .DEFAULT ;
114
114
when (clusterService .getClusterName ()).thenReturn (clusterName );
115
115
@@ -143,17 +143,7 @@ private void testSendRequestDecorate(Version remoteNodeVersion) {
143
143
@ SuppressWarnings ("unchecked" )
144
144
TransportResponseHandler <TransportResponse > handler = mock (TransportResponseHandler .class );
145
145
146
- InetAddress localAddress = null ;
147
- try {
148
- localAddress = InetAddress .getByName ("0.0.0.0" );
149
- } catch (final UnknownHostException uhe ) {
150
- throw new RuntimeException (uhe );
151
- }
152
-
153
- DiscoveryNode localNode = new DiscoveryNode ("local-node" , new TransportAddress (localAddress , 1234 ), Version .CURRENT );
154
146
Connection connection1 = transportService .getConnection (localNode );
155
-
156
- DiscoveryNode otherNode = new DiscoveryNode ("remote-node" , new TransportAddress (localAddress , 4321 ), remoteNodeVersion );
157
147
Connection connection2 = transportService .getConnection (otherNode );
158
148
159
149
// from thread context inside sendRequestDecorate
@@ -189,7 +179,7 @@ public <T extends TransportResponse> void sendRequest(
189
179
TransportResponseHandler <T > handler
190
180
) {
191
181
String serializedUserHeader = threadPool .getThreadContext ().getHeader (ConfigConstants .OPENDISTRO_SECURITY_USER_HEADER );
192
- assertEquals (serializedUserHeader , Base64Helper .serializeObject (user , useJDKSerialization ));
182
+ assertEquals (serializedUserHeader , Base64Helper .serializeObject (user , shouldUseJDKSerialization ));
193
183
}
194
184
};
195
185
// isSameNodeRequest = false
@@ -201,17 +191,48 @@ public <T extends TransportResponse> void sendRequest(
201
191
assertEquals (threadPool .getThreadContext ().getHeader (ConfigConstants .OPENDISTRO_SECURITY_USER_HEADER ), null );
202
192
}
203
193
194
+
195
+ /**
196
+ * Tests the scenario when remote node is on same OS version
197
+ */
204
198
@ Test
205
199
public void testSendRequestDecorate () {
206
- testSendRequestDecorate (Version .CURRENT );
200
+ DiscoveryNode localNode = new DiscoveryNode ("local-node" , new TransportAddress (getLocalAddress (), 1234 ), Version .CURRENT );
201
+ DiscoveryNode otherNode = new DiscoveryNode ("other-node" , new TransportAddress (getLocalAddress () ,3456 ), Version .CURRENT );
202
+ testSendRequestDecorate (localNode , otherNode , true );
207
203
}
208
204
209
205
/**
210
- * Tests the scenario when remote node does not implement custom serialization protocol and uses JDK serialization
206
+ * Tests the scenarios for mixed node versions
211
207
*/
212
208
@ Test
213
- public void testSendRequestDecorateWhenRemoteNodeUsesJDKSerde () {
214
- testSendRequestDecorate (Version .V_2_0_0 );
209
+ public void testSendRequestDecorateWithMixedNodeVersions () {
210
+
211
+ // local on latest version, remote on 2.11.0 - should use custom
212
+
213
+ try (ThreadContext .StoredContext ignore = threadPool .getThreadContext ().stashContext ()) {
214
+ DiscoveryNode localNode = new DiscoveryNode ("local-node" , new TransportAddress (getLocalAddress (), 1234 ), Version .CURRENT );
215
+ DiscoveryNode otherNode = new DiscoveryNode ("other-node" , new TransportAddress (getLocalAddress (), 3456 ), ConfigConstants .FIRST_CUSTOM_SERIALIZATION_SUPPORTED_OS_VERSION );
216
+ testSendRequestDecorate (localNode , otherNode , false );
217
+ }
218
+
219
+ // remote node is on a version > 2.11.1 while local node is on version 2.11.1 - should use JDK
220
+ try (ThreadContext .StoredContext ignore = threadPool .getThreadContext ().stashContext ()) {
221
+ DiscoveryNode localNode = new DiscoveryNode ("local-node" , new TransportAddress (getLocalAddress (), 1234 ), Version .CURRENT );
222
+ DiscoveryNode otherNode = new DiscoveryNode ("other-node" , new TransportAddress (getLocalAddress (), 3456 ), Version .V_2_11_1 );
223
+ testSendRequestDecorate (localNode , otherNode , true );
224
+ }
225
+
215
226
}
216
227
228
+
229
+ private static InetAddress getLocalAddress () {
230
+ try {
231
+ return InetAddress .getByName ("0.0.0.0" );
232
+ } catch (final UnknownHostException uhe ) {
233
+ throw new RuntimeException (uhe );
234
+ }
235
+ }
236
+
237
+
217
238
}
0 commit comments