@@ -855,6 +855,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
855
855
856
856
if (inputBatch .containsDuplicateAspects ()) {
857
857
log .warn (String .format ("Batch contains duplicates: %s" , inputBatch ));
858
+ MetricUtils .counter (EntityServiceImpl .class , "batch_with_duplicate" ).inc ();
858
859
}
859
860
860
861
return aspectDao
@@ -928,13 +929,15 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
928
929
929
930
// No changes, return
930
931
if (changeMCPs .isEmpty ()) {
932
+ MetricUtils .counter (EntityServiceImpl .class , "batch_empty" ).inc ();
931
933
return Collections .<UpdateAspectResult >emptyList ();
932
934
}
933
935
934
936
// do final pre-commit checks with previous aspect value
935
937
ValidationExceptionCollection exceptions =
936
938
AspectsBatch .validatePreCommit (changeMCPs , opContext .getRetrieverContext ().get ());
937
939
if (!exceptions .isEmpty ()) {
940
+ MetricUtils .counter (EntityServiceImpl .class , "batch_validation_exception" ).inc ();
938
941
throw new ValidationException (collectMetrics (exceptions ).toString ());
939
942
}
940
943
@@ -972,10 +975,13 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
972
975
*/
973
976
if (overwrite || databaseAspect == null ) {
974
977
result =
975
- ingestAspectToLocalDB (txContext , writeItem , databaseSystemAspect )
976
- .toBuilder ()
977
- .request (writeItem )
978
- .build ();
978
+ Optional .ofNullable (
979
+ ingestAspectToLocalDB (
980
+ txContext , writeItem , databaseSystemAspect ))
981
+ .map (
982
+ optResult ->
983
+ optResult .toBuilder ().request (writeItem ).build ())
984
+ .orElse (null );
979
985
980
986
} else {
981
987
RecordTemplate oldValue = databaseSystemAspect .getRecordTemplate ();
@@ -996,49 +1002,56 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
996
1002
997
1003
return result ;
998
1004
})
1005
+ .filter (Objects ::nonNull )
999
1006
.collect (Collectors .toList ());
1000
1007
1001
- // commit upserts prior to retention or kafka send, if supported by impl
1002
- if (txContext != null ) {
1003
- txContext .commitAndContinue ();
1004
- }
1005
- long took = TimeUnit .NANOSECONDS .toMillis (ingestToLocalDBTimer .stop ());
1006
- if (took > DB_TIMER_LOG_THRESHOLD_MS ) {
1007
- log .info ("Ingestion of aspects batch to database took {} ms" , took );
1008
- }
1009
-
1010
- // Retention optimization and tx
1011
- if (retentionService != null ) {
1012
- List <RetentionService .RetentionContext > retentionBatch =
1013
- upsertResults .stream ()
1014
- // Only consider retention when there was a previous version
1015
- .filter (
1016
- result ->
1017
- batchAspects .containsKey (result .getUrn ().toString ())
1018
- && batchAspects
1019
- .get (result .getUrn ().toString ())
1020
- .containsKey (result .getRequest ().getAspectName ()))
1021
- .filter (
1022
- result -> {
1023
- RecordTemplate oldAspect = result .getOldValue ();
1024
- RecordTemplate newAspect = result .getNewValue ();
1025
- // Apply retention policies if there was an update to existing aspect
1026
- // value
1027
- return oldAspect != newAspect
1028
- && oldAspect != null
1029
- && retentionService != null ;
1030
- })
1031
- .map (
1032
- result ->
1033
- RetentionService .RetentionContext .builder ()
1034
- .urn (result .getUrn ())
1035
- .aspectName (result .getRequest ().getAspectName ())
1036
- .maxVersion (Optional .of (result .getMaxVersion ()))
1037
- .build ())
1038
- .collect (Collectors .toList ());
1039
- retentionService .applyRetentionWithPolicyDefaults (opContext , retentionBatch );
1008
+ if (!upsertResults .isEmpty ()) {
1009
+ // commit upserts prior to retention or kafka send, if supported by impl
1010
+ if (txContext != null ) {
1011
+ txContext .commitAndContinue ();
1012
+ }
1013
+ log .info (
1014
+ "Ingestion of aspects batch to database took {} in {} ms" ,
1015
+ upsertResults .size (),
1016
+ TimeUnit .NANOSECONDS .toMillis (ingestToLocalDBTimer .stop ()));
1017
+
1018
+ // Retention optimization and tx
1019
+ if (retentionService != null ) {
1020
+ List <RetentionService .RetentionContext > retentionBatch =
1021
+ upsertResults .stream ()
1022
+ // Only consider retention when there was a previous version
1023
+ .filter (
1024
+ result ->
1025
+ batchAspects .containsKey (result .getUrn ().toString ())
1026
+ && batchAspects
1027
+ .get (result .getUrn ().toString ())
1028
+ .containsKey (result .getRequest ().getAspectName ()))
1029
+ .filter (
1030
+ result -> {
1031
+ RecordTemplate oldAspect = result .getOldValue ();
1032
+ RecordTemplate newAspect = result .getNewValue ();
1033
+ // Apply retention policies if there was an update to existing
1034
+ // aspect
1035
+ // value
1036
+ return oldAspect != newAspect
1037
+ && oldAspect != null
1038
+ && retentionService != null ;
1039
+ })
1040
+ .map (
1041
+ result ->
1042
+ RetentionService .RetentionContext .builder ()
1043
+ .urn (result .getUrn ())
1044
+ .aspectName (result .getRequest ().getAspectName ())
1045
+ .maxVersion (Optional .of (result .getMaxVersion ()))
1046
+ .build ())
1047
+ .collect (Collectors .toList ());
1048
+ retentionService .applyRetentionWithPolicyDefaults (opContext , retentionBatch );
1049
+ } else {
1050
+ log .warn ("Retention service is missing!" );
1051
+ }
1040
1052
} else {
1041
- log .warn ("Retention service is missing!" );
1053
+ MetricUtils .counter (EntityServiceImpl .class , "batch_empty_transaction" ).inc ();
1054
+ log .warn ("Empty transaction detected. {}" , inputBatch );
1042
1055
}
1043
1056
1044
1057
return upsertResults ;
@@ -2506,7 +2519,7 @@ private Map<EntityAspectIdentifier, EnvelopedAspect> getEnvelopedAspects(
2506
2519
* @param databaseAspect The aspect as it exists in the database.
2507
2520
* @return result object
2508
2521
*/
2509
- @ Nonnull
2522
+ @ Nullable
2510
2523
private UpdateAspectResult ingestAspectToLocalDB (
2511
2524
@ Nullable TransactionContext txContext ,
2512
2525
@ Nonnull final ChangeMCP writeItem ,
@@ -2520,6 +2533,9 @@ private UpdateAspectResult ingestAspectToLocalDB(
2520
2533
.setLastRunId (writeItem .getSystemMetadata ().getRunId (GetMode .NULL ), SetMode .IGNORE_NULL );
2521
2534
2522
2535
// 2. Compare the latest existing and new.
2536
+ final RecordTemplate databaseValue =
2537
+ databaseAspect == null ? null : databaseAspect .getRecordTemplate ();
2538
+
2523
2539
final EntityAspect .EntitySystemAspect previousBatchAspect =
2524
2540
(EntityAspect .EntitySystemAspect ) writeItem .getPreviousSystemAspect ();
2525
2541
final RecordTemplate previousValue =
@@ -2528,7 +2544,7 @@ private UpdateAspectResult ingestAspectToLocalDB(
2528
2544
// 3. If there is no difference between existing and new, we just update
2529
2545
// the lastObserved in system metadata. RunId should stay as the original runId
2530
2546
if (previousValue != null
2531
- && DataTemplateUtil .areEqual (previousValue , writeItem .getRecordTemplate ())) {
2547
+ && DataTemplateUtil .areEqual (databaseValue , writeItem .getRecordTemplate ())) {
2532
2548
2533
2549
SystemMetadata latestSystemMetadata = previousBatchAspect .getSystemMetadata ();
2534
2550
latestSystemMetadata .setLastObserved (writeItem .getSystemMetadata ().getLastObserved ());
@@ -2564,45 +2580,49 @@ private UpdateAspectResult ingestAspectToLocalDB(
2564
2580
}
2565
2581
2566
2582
// 4. Save the newValue as the latest version
2567
- log .debug (
2568
- "Ingesting aspect with name {}, urn {}" , writeItem .getAspectName (), writeItem .getUrn ());
2569
- String newValueStr = EntityApiUtils .toJsonAspect (writeItem .getRecordTemplate ());
2570
- long versionOfOld =
2571
- aspectDao .saveLatestAspect (
2572
- txContext ,
2573
- writeItem .getUrn ().toString (),
2574
- writeItem .getAspectName (),
2575
- previousBatchAspect == null ? null : EntityApiUtils .toJsonAspect (previousValue ),
2576
- previousBatchAspect == null ? null : previousBatchAspect .getCreatedBy (),
2577
- previousBatchAspect == null
2578
- ? null
2579
- : previousBatchAspect .getEntityAspect ().getCreatedFor (),
2580
- previousBatchAspect == null ? null : previousBatchAspect .getCreatedOn (),
2581
- previousBatchAspect == null ? null : previousBatchAspect .getSystemMetadataRaw (),
2582
- newValueStr ,
2583
- writeItem .getAuditStamp ().getActor ().toString (),
2584
- writeItem .getAuditStamp ().hasImpersonator ()
2585
- ? writeItem .getAuditStamp ().getImpersonator ().toString ()
2586
- : null ,
2587
- new Timestamp (writeItem .getAuditStamp ().getTime ()),
2588
- EntityApiUtils .toJsonAspect (writeItem .getSystemMetadata ()),
2589
- writeItem .getNextAspectVersion ());
2590
-
2591
- // metrics
2592
- aspectDao .incrementWriteMetrics (
2593
- writeItem .getAspectName (), 1 , newValueStr .getBytes (StandardCharsets .UTF_8 ).length );
2594
-
2595
- return UpdateAspectResult .builder ()
2596
- .urn (writeItem .getUrn ())
2597
- .oldValue (previousValue )
2598
- .newValue (writeItem .getRecordTemplate ())
2599
- .oldSystemMetadata (
2600
- previousBatchAspect == null ? null : previousBatchAspect .getSystemMetadata ())
2601
- .newSystemMetadata (writeItem .getSystemMetadata ())
2602
- .operation (MetadataAuditOperation .UPDATE )
2603
- .auditStamp (writeItem .getAuditStamp ())
2604
- .maxVersion (versionOfOld )
2605
- .build ();
2583
+ if (!DataTemplateUtil .areEqual (databaseValue , writeItem .getRecordTemplate ())) {
2584
+ log .debug (
2585
+ "Ingesting aspect with name {}, urn {}" , writeItem .getAspectName (), writeItem .getUrn ());
2586
+ String newValueStr = EntityApiUtils .toJsonAspect (writeItem .getRecordTemplate ());
2587
+ long versionOfOld =
2588
+ aspectDao .saveLatestAspect (
2589
+ txContext ,
2590
+ writeItem .getUrn ().toString (),
2591
+ writeItem .getAspectName (),
2592
+ previousBatchAspect == null ? null : EntityApiUtils .toJsonAspect (previousValue ),
2593
+ previousBatchAspect == null ? null : previousBatchAspect .getCreatedBy (),
2594
+ previousBatchAspect == null
2595
+ ? null
2596
+ : previousBatchAspect .getEntityAspect ().getCreatedFor (),
2597
+ previousBatchAspect == null ? null : previousBatchAspect .getCreatedOn (),
2598
+ previousBatchAspect == null ? null : previousBatchAspect .getSystemMetadataRaw (),
2599
+ newValueStr ,
2600
+ writeItem .getAuditStamp ().getActor ().toString (),
2601
+ writeItem .getAuditStamp ().hasImpersonator ()
2602
+ ? writeItem .getAuditStamp ().getImpersonator ().toString ()
2603
+ : null ,
2604
+ new Timestamp (writeItem .getAuditStamp ().getTime ()),
2605
+ EntityApiUtils .toJsonAspect (writeItem .getSystemMetadata ()),
2606
+ writeItem .getNextAspectVersion ());
2607
+
2608
+ // metrics
2609
+ aspectDao .incrementWriteMetrics (
2610
+ writeItem .getAspectName (), 1 , newValueStr .getBytes (StandardCharsets .UTF_8 ).length );
2611
+
2612
+ return UpdateAspectResult .builder ()
2613
+ .urn (writeItem .getUrn ())
2614
+ .oldValue (previousValue )
2615
+ .newValue (writeItem .getRecordTemplate ())
2616
+ .oldSystemMetadata (
2617
+ previousBatchAspect == null ? null : previousBatchAspect .getSystemMetadata ())
2618
+ .newSystemMetadata (writeItem .getSystemMetadata ())
2619
+ .operation (MetadataAuditOperation .UPDATE )
2620
+ .auditStamp (writeItem .getAuditStamp ())
2621
+ .maxVersion (versionOfOld )
2622
+ .build ();
2623
+ }
2624
+
2625
+ return null ;
2606
2626
}
2607
2627
2608
2628
private static boolean shouldAspectEmitChangeLog (@ Nonnull final AspectSpec aspectSpec ) {
0 commit comments