21
21
import org .opensearch .cluster .node .DiscoveryNode ;
22
22
import org .opensearch .cluster .service .ClusterService ;
23
23
import org .opensearch .common .settings .Settings ;
24
+ import org .opensearch .common .util .concurrent .ThreadContext ;
24
25
import org .opensearch .core .common .transport .TransportAddress ;
25
26
import org .opensearch .core .transport .TransportResponse ;
26
27
import org .opensearch .extensions .ExtensionsManager ;
@@ -108,8 +109,7 @@ public void setup() {
108
109
);
109
110
}
110
111
111
- private void testSendRequestDecorate (Version remoteNodeVersion ) {
112
- boolean useJDKSerialization = remoteNodeVersion .before (ConfigConstants .FIRST_CUSTOM_SERIALIZATION_SUPPORTED_OS_VERSION );
112
+ private void testSendRequestDecorate (DiscoveryNode localNode , DiscoveryNode otherNode , boolean shouldUseJDKSerialization ) {
113
113
ClusterName clusterName = ClusterName .DEFAULT ;
114
114
when (clusterService .getClusterName ()).thenReturn (clusterName );
115
115
@@ -143,17 +143,7 @@ private void testSendRequestDecorate(Version remoteNodeVersion) {
143
143
@ SuppressWarnings ("unchecked" )
144
144
TransportResponseHandler <TransportResponse > handler = mock (TransportResponseHandler .class );
145
145
146
- InetAddress localAddress = null ;
147
- try {
148
- localAddress = InetAddress .getByName ("0.0.0.0" );
149
- } catch (final UnknownHostException uhe ) {
150
- throw new RuntimeException (uhe );
151
- }
152
-
153
- DiscoveryNode localNode = new DiscoveryNode ("local-node" , new TransportAddress (localAddress , 1234 ), Version .CURRENT );
154
146
Connection connection1 = transportService .getConnection (localNode );
155
-
156
- DiscoveryNode otherNode = new DiscoveryNode ("remote-node" , new TransportAddress (localAddress , 4321 ), remoteNodeVersion );
157
147
Connection connection2 = transportService .getConnection (otherNode );
158
148
159
149
// from thread context inside sendRequestDecorate
@@ -189,7 +179,7 @@ public <T extends TransportResponse> void sendRequest(
189
179
TransportResponseHandler <T > handler
190
180
) {
191
181
String serializedUserHeader = threadPool .getThreadContext ().getHeader (ConfigConstants .OPENDISTRO_SECURITY_USER_HEADER );
192
- assertEquals (serializedUserHeader , Base64Helper .serializeObject (user , useJDKSerialization ));
182
+ assertEquals (serializedUserHeader , Base64Helper .serializeObject (user , shouldUseJDKSerialization ));
193
183
}
194
184
};
195
185
// isSameNodeRequest = false
@@ -201,17 +191,49 @@ public <T extends TransportResponse> void sendRequest(
201
191
assertEquals (threadPool .getThreadContext ().getHeader (ConfigConstants .OPENDISTRO_SECURITY_USER_HEADER ), null );
202
192
}
203
193
194
+ /**
195
+ * Tests the scenario when remote node is on same OS version
196
+ */
204
197
@ Test
205
198
public void testSendRequestDecorate () {
206
- testSendRequestDecorate (Version .CURRENT );
199
+ DiscoveryNode localNode = new DiscoveryNode ("local-node" , new TransportAddress (getLocalAddress (), 1234 ), Version .CURRENT );
200
+ DiscoveryNode otherNode = new DiscoveryNode ("other-node" , new TransportAddress (getLocalAddress (), 3456 ), Version .CURRENT );
201
+ testSendRequestDecorate (localNode , otherNode , true );
207
202
}
208
203
209
204
/**
210
- * Tests the scenario when remote node does not implement custom serialization protocol and uses JDK serialization
205
+ * Tests the scenarios for mixed node versions
211
206
*/
212
207
@ Test
213
- public void testSendRequestDecorateWhenRemoteNodeUsesJDKSerde () {
214
- testSendRequestDecorate (Version .V_2_0_0 );
208
+ public void testSendRequestDecorateWithMixedNodeVersions () {
209
+
210
+ // local on latest version, remote on 2.11.0 - should use custom
211
+
212
+ try (ThreadContext .StoredContext ignore = threadPool .getThreadContext ().stashContext ()) {
213
+ DiscoveryNode localNode = new DiscoveryNode ("local-node" , new TransportAddress (getLocalAddress (), 1234 ), Version .CURRENT );
214
+ DiscoveryNode otherNode = new DiscoveryNode (
215
+ "other-node" ,
216
+ new TransportAddress (getLocalAddress (), 3456 ),
217
+ ConfigConstants .FIRST_CUSTOM_SERIALIZATION_SUPPORTED_OS_VERSION
218
+ );
219
+ testSendRequestDecorate (localNode , otherNode , false );
220
+ }
221
+
222
+ // remote node is on a version > 2.11.1 while local node is on version 2.11.1 - should use JDK
223
+ try (ThreadContext .StoredContext ignore = threadPool .getThreadContext ().stashContext ()) {
224
+ DiscoveryNode localNode = new DiscoveryNode ("local-node" , new TransportAddress (getLocalAddress (), 1234 ), Version .CURRENT );
225
+ DiscoveryNode otherNode = new DiscoveryNode ("other-node" , new TransportAddress (getLocalAddress (), 3456 ), Version .V_2_11_1 );
226
+ testSendRequestDecorate (localNode , otherNode , true );
227
+ }
228
+
229
+ }
230
+
231
+ private static InetAddress getLocalAddress () {
232
+ try {
233
+ return InetAddress .getByName ("0.0.0.0" );
234
+ } catch (final UnknownHostException uhe ) {
235
+ throw new RuntimeException (uhe );
236
+ }
215
237
}
216
238
217
239
}
0 commit comments