@@ -54,7 +54,8 @@ public static Map<String, JsonNode> transform(
       @Nonnull final Urn urn,
       @Nonnull final RecordTemplate timeseriesAspect,
       @Nonnull final AspectSpec aspectSpec,
-      @Nullable final SystemMetadata systemMetadata)
+      @Nullable final SystemMetadata systemMetadata,
+      @Nonnull final String idHashAlgo)
       throws JsonProcessingException {
     ObjectNode commonDocument = getCommonDocument(urn, timeseriesAspect, systemMetadata);
     Map<String, JsonNode> finalDocuments = new HashMap<>();
@@ -74,7 +75,7 @@ public static Map<String, JsonNode> transform(
     final Map<TimeseriesFieldSpec, List<Object>> timeseriesFieldValueMap =
         FieldExtractor.extractFields(timeseriesAspect, aspectSpec.getTimeseriesFieldSpecs());
     timeseriesFieldValueMap.forEach((k, v) -> setTimeseriesField(document, k, v));
-    finalDocuments.put(getDocId(document, null), document);
+    finalDocuments.put(getDocId(document, null, idHashAlgo), document);

     // Create new rows for the member collection fields.
     final Map<TimeseriesFieldCollectionSpec, List<Object>> timeseriesFieldCollectionValueMap =
@@ -83,7 +84,7 @@ public static Map<String, JsonNode> transform(
     timeseriesFieldCollectionValueMap.forEach(
         (key, values) ->
             finalDocuments.putAll(
-                getTimeseriesFieldCollectionDocuments(key, values, commonDocument)));
+                getTimeseriesFieldCollectionDocuments(key, values, commonDocument, idHashAlgo)));
     return finalDocuments;
   }

@@ -216,12 +217,13 @@ private static void setTimeseriesField(
   private static Map<String, JsonNode> getTimeseriesFieldCollectionDocuments(
       final TimeseriesFieldCollectionSpec fieldSpec,
       final List<Object> values,
-      final ObjectNode commonDocument) {
+      final ObjectNode commonDocument,
+      @Nonnull final String idHashAlgo) {
     return values.stream()
         .map(value -> getTimeseriesFieldCollectionDocument(fieldSpec, value, commonDocument))
         .collect(
            Collectors.toMap(
-                keyDocPair -> getDocId(keyDocPair.getSecond(), keyDocPair.getFirst()),
+                keyDocPair -> getDocId(keyDocPair.getSecond(), keyDocPair.getFirst(), idHashAlgo),
                Pair::getSecond));
   }

@@ -257,9 +259,9 @@ private static Pair<String, ObjectNode> getTimeseriesFieldCollectionDocument(
         finalDocument);
   }

-  private static String getDocId(@Nonnull JsonNode document, String collectionId)
+  private static String getDocId(
+      @Nonnull JsonNode document, String collectionId, @Nonnull String idHashAlgo)
       throws IllegalArgumentException {
-    String hashAlgo = System.getenv("ELASTIC_ID_HASH_ALGO");
     String docId = document.get(MappingsBuilder.TIMESTAMP_MILLIS_FIELD).toString();
     JsonNode eventGranularity = document.get(MappingsBuilder.EVENT_GRANULARITY);
     if (eventGranularity != null) {
@@ -278,9 +280,9 @@ private static String getDocId(@Nonnull JsonNode document, String collectionId)
       docId += partitionSpec.toString();
     }

-    if (hashAlgo.equalsIgnoreCase("SHA-256")) {
+    if (idHashAlgo.equalsIgnoreCase("SHA-256")) {
       return DigestUtils.sha256Hex(docId);
-    } else if (hashAlgo.equalsIgnoreCase("MD5")) {
+    } else if (idHashAlgo.equalsIgnoreCase("MD5")) {
       return DigestUtils.md5Hex(docId);
     }
     throw new IllegalArgumentException("Hash function not handled !");
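
With this change, getDocId no longer reads ELASTIC_ID_HASH_ALGO from the environment on every call; the caller resolves the algorithm once and threads it down through transform(). A minimal caller sketch, assuming the enclosing class is TimeseriesAspectTransformer; the environment lookup and MD5 fallback shown here are illustrative assumptions about the surrounding wiring, not part of this diff:

    // Hypothetical wiring: resolve the algorithm once at startup, then pass it
    // explicitly instead of relying on a hidden env lookup inside getDocId().
    String idHashAlgo =
        Optional.ofNullable(System.getenv("ELASTIC_ID_HASH_ALGO")).orElse("MD5");
    Map<String, JsonNode> documents =
        TimeseriesAspectTransformer.transform(
            urn, timeseriesAspect, aspectSpec, systemMetadata, idHashAlgo);
    // Each key in the returned map is a SHA-256 or MD5 hex digest chosen by the
    // caller; any other algorithm value makes getDocId throw IllegalArgumentException.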