Skip to content

Commit 2ad5bf5

Browse files
david-leifkersleeperdeep
authored andcommitted
fix(entity-service): no-op batches (datahub-project#12047)
1 parent 3059b56 commit 2ad5bf5

File tree

3 files changed

+201
-87
lines changed

3 files changed

+201
-87
lines changed

li-utils/src/main/java/com/linkedin/metadata/Constants.java

+1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class Constants {
5151
// App sources
5252
public static final String UI_SOURCE = "ui";
5353
public static final String SYSTEM_UPDATE_SOURCE = "systemUpdate";
54+
public static final String METADATA_TESTS_SOURCE = "metadataTests";
5455

5556
/** Entities */
5657
public static final String CORP_USER_ENTITY_NAME = "corpuser";

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

+104-84
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,7 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
855855

856856
if (inputBatch.containsDuplicateAspects()) {
857857
log.warn(String.format("Batch contains duplicates: %s", inputBatch));
858+
MetricUtils.counter(EntityServiceImpl.class, "batch_with_duplicate").inc();
858859
}
859860

860861
return aspectDao
@@ -928,13 +929,15 @@ private List<UpdateAspectResult> ingestAspectsToLocalDB(
928929

929930
// No changes, return
930931
if (changeMCPs.isEmpty()) {
932+
MetricUtils.counter(EntityServiceImpl.class, "batch_empty").inc();
931933
return Collections.<UpdateAspectResult>emptyList();
932934
}
933935

934936
// do final pre-commit checks with previous aspect value
935937
ValidationExceptionCollection exceptions =
936938
AspectsBatch.validatePreCommit(changeMCPs, opContext.getRetrieverContext().get());
937939
if (!exceptions.isEmpty()) {
940+
MetricUtils.counter(EntityServiceImpl.class, "batch_validation_exception").inc();
938941
throw new ValidationException(collectMetrics(exceptions).toString());
939942
}
940943

@@ -972,10 +975,13 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
972975
*/
973976
if (overwrite || databaseAspect == null) {
974977
result =
975-
ingestAspectToLocalDB(txContext, writeItem, databaseSystemAspect)
976-
.toBuilder()
977-
.request(writeItem)
978-
.build();
978+
Optional.ofNullable(
979+
ingestAspectToLocalDB(
980+
txContext, writeItem, databaseSystemAspect))
981+
.map(
982+
optResult ->
983+
optResult.toBuilder().request(writeItem).build())
984+
.orElse(null);
979985

980986
} else {
981987
RecordTemplate oldValue = databaseSystemAspect.getRecordTemplate();
@@ -996,49 +1002,56 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
9961002

9971003
return result;
9981004
})
1005+
.filter(Objects::nonNull)
9991006
.collect(Collectors.toList());
10001007

1001-
// commit upserts prior to retention or kafka send, if supported by impl
1002-
if (txContext != null) {
1003-
txContext.commitAndContinue();
1004-
}
1005-
long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop());
1006-
if (took > DB_TIMER_LOG_THRESHOLD_MS) {
1007-
log.info("Ingestion of aspects batch to database took {} ms", took);
1008-
}
1008+
if (!upsertResults.isEmpty()) {
1009+
// commit upserts prior to retention or kafka send, if supported by impl
1010+
if (txContext != null) {
1011+
txContext.commitAndContinue();
1012+
}
1013+
long took = TimeUnit.NANOSECONDS.toMillis(ingestToLocalDBTimer.stop());
1014+
if (took > DB_TIMER_LOG_THRESHOLD_MS) {
1015+
log.info("Ingestion of aspects batch to database took {} ms", took);
1016+
}
10091017

1010-
// Retention optimization and tx
1011-
if (retentionService != null) {
1012-
List<RetentionService.RetentionContext> retentionBatch =
1013-
upsertResults.stream()
1014-
// Only consider retention when there was a previous version
1015-
.filter(
1016-
result ->
1017-
batchAspects.containsKey(result.getUrn().toString())
1018-
&& batchAspects
1019-
.get(result.getUrn().toString())
1020-
.containsKey(result.getRequest().getAspectName()))
1021-
.filter(
1022-
result -> {
1023-
RecordTemplate oldAspect = result.getOldValue();
1024-
RecordTemplate newAspect = result.getNewValue();
1025-
// Apply retention policies if there was an update to existing aspect
1026-
// value
1027-
return oldAspect != newAspect
1028-
&& oldAspect != null
1029-
&& retentionService != null;
1030-
})
1031-
.map(
1032-
result ->
1033-
RetentionService.RetentionContext.builder()
1034-
.urn(result.getUrn())
1035-
.aspectName(result.getRequest().getAspectName())
1036-
.maxVersion(Optional.of(result.getMaxVersion()))
1037-
.build())
1038-
.collect(Collectors.toList());
1039-
retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch);
1018+
// Retention optimization and tx
1019+
if (retentionService != null) {
1020+
List<RetentionService.RetentionContext> retentionBatch =
1021+
upsertResults.stream()
1022+
// Only consider retention when there was a previous version
1023+
.filter(
1024+
result ->
1025+
batchAspects.containsKey(result.getUrn().toString())
1026+
&& batchAspects
1027+
.get(result.getUrn().toString())
1028+
.containsKey(result.getRequest().getAspectName()))
1029+
.filter(
1030+
result -> {
1031+
RecordTemplate oldAspect = result.getOldValue();
1032+
RecordTemplate newAspect = result.getNewValue();
1033+
// Apply retention policies if there was an update to existing
1034+
// aspect
1035+
// value
1036+
return oldAspect != newAspect
1037+
&& oldAspect != null
1038+
&& retentionService != null;
1039+
})
1040+
.map(
1041+
result ->
1042+
RetentionService.RetentionContext.builder()
1043+
.urn(result.getUrn())
1044+
.aspectName(result.getRequest().getAspectName())
1045+
.maxVersion(Optional.of(result.getMaxVersion()))
1046+
.build())
1047+
.collect(Collectors.toList());
1048+
retentionService.applyRetentionWithPolicyDefaults(opContext, retentionBatch);
1049+
} else {
1050+
log.warn("Retention service is missing!");
1051+
}
10401052
} else {
1041-
log.warn("Retention service is missing!");
1053+
MetricUtils.counter(EntityServiceImpl.class, "batch_empty_transaction").inc();
1054+
log.warn("Empty transaction detected. {}", inputBatch);
10421055
}
10431056

10441057
return upsertResults;
@@ -2506,7 +2519,7 @@ private Map<EntityAspectIdentifier, EnvelopedAspect> getEnvelopedAspects(
25062519
* @param databaseAspect The aspect as it exists in the database.
25072520
* @return result object
25082521
*/
2509-
@Nonnull
2522+
@Nullable
25102523
private UpdateAspectResult ingestAspectToLocalDB(
25112524
@Nullable TransactionContext txContext,
25122525
@Nonnull final ChangeMCP writeItem,
@@ -2520,6 +2533,9 @@ private UpdateAspectResult ingestAspectToLocalDB(
25202533
.setLastRunId(writeItem.getSystemMetadata().getRunId(GetMode.NULL), SetMode.IGNORE_NULL);
25212534

25222535
// 2. Compare the latest existing and new.
2536+
final RecordTemplate databaseValue =
2537+
databaseAspect == null ? null : databaseAspect.getRecordTemplate();
2538+
25232539
final EntityAspect.EntitySystemAspect previousBatchAspect =
25242540
(EntityAspect.EntitySystemAspect) writeItem.getPreviousSystemAspect();
25252541
final RecordTemplate previousValue =
@@ -2528,7 +2544,7 @@ private UpdateAspectResult ingestAspectToLocalDB(
25282544
// 3. If there is no difference between existing and new, we just update
25292545
// the lastObserved in system metadata. RunId should stay as the original runId
25302546
if (previousValue != null
2531-
&& DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
2547+
&& DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
25322548

25332549
SystemMetadata latestSystemMetadata = previousBatchAspect.getSystemMetadata();
25342550
latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved());
@@ -2564,45 +2580,49 @@ private UpdateAspectResult ingestAspectToLocalDB(
25642580
}
25652581

25662582
// 4. Save the newValue as the latest version
2567-
log.debug(
2568-
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
2569-
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
2570-
long versionOfOld =
2571-
aspectDao.saveLatestAspect(
2572-
txContext,
2573-
writeItem.getUrn().toString(),
2574-
writeItem.getAspectName(),
2575-
previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue),
2576-
previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(),
2577-
previousBatchAspect == null
2578-
? null
2579-
: previousBatchAspect.getEntityAspect().getCreatedFor(),
2580-
previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(),
2581-
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(),
2582-
newValueStr,
2583-
writeItem.getAuditStamp().getActor().toString(),
2584-
writeItem.getAuditStamp().hasImpersonator()
2585-
? writeItem.getAuditStamp().getImpersonator().toString()
2586-
: null,
2587-
new Timestamp(writeItem.getAuditStamp().getTime()),
2588-
EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()),
2589-
writeItem.getNextAspectVersion());
2590-
2591-
// metrics
2592-
aspectDao.incrementWriteMetrics(
2593-
writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);
2594-
2595-
return UpdateAspectResult.builder()
2596-
.urn(writeItem.getUrn())
2597-
.oldValue(previousValue)
2598-
.newValue(writeItem.getRecordTemplate())
2599-
.oldSystemMetadata(
2600-
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata())
2601-
.newSystemMetadata(writeItem.getSystemMetadata())
2602-
.operation(MetadataAuditOperation.UPDATE)
2603-
.auditStamp(writeItem.getAuditStamp())
2604-
.maxVersion(versionOfOld)
2605-
.build();
2583+
if (!DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
2584+
log.debug(
2585+
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
2586+
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());
2587+
long versionOfOld =
2588+
aspectDao.saveLatestAspect(
2589+
txContext,
2590+
writeItem.getUrn().toString(),
2591+
writeItem.getAspectName(),
2592+
previousBatchAspect == null ? null : EntityApiUtils.toJsonAspect(previousValue),
2593+
previousBatchAspect == null ? null : previousBatchAspect.getCreatedBy(),
2594+
previousBatchAspect == null
2595+
? null
2596+
: previousBatchAspect.getEntityAspect().getCreatedFor(),
2597+
previousBatchAspect == null ? null : previousBatchAspect.getCreatedOn(),
2598+
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadataRaw(),
2599+
newValueStr,
2600+
writeItem.getAuditStamp().getActor().toString(),
2601+
writeItem.getAuditStamp().hasImpersonator()
2602+
? writeItem.getAuditStamp().getImpersonator().toString()
2603+
: null,
2604+
new Timestamp(writeItem.getAuditStamp().getTime()),
2605+
EntityApiUtils.toJsonAspect(writeItem.getSystemMetadata()),
2606+
writeItem.getNextAspectVersion());
2607+
2608+
// metrics
2609+
aspectDao.incrementWriteMetrics(
2610+
writeItem.getAspectName(), 1, newValueStr.getBytes(StandardCharsets.UTF_8).length);
2611+
2612+
return UpdateAspectResult.builder()
2613+
.urn(writeItem.getUrn())
2614+
.oldValue(previousValue)
2615+
.newValue(writeItem.getRecordTemplate())
2616+
.oldSystemMetadata(
2617+
previousBatchAspect == null ? null : previousBatchAspect.getSystemMetadata())
2618+
.newSystemMetadata(writeItem.getSystemMetadata())
2619+
.operation(MetadataAuditOperation.UPDATE)
2620+
.auditStamp(writeItem.getAuditStamp())
2621+
.maxVersion(versionOfOld)
2622+
.build();
2623+
}
2624+
2625+
return null;
26062626
}
26072627

26082628
private static boolean shouldAspectEmitChangeLog(@Nonnull final AspectSpec aspectSpec) {

0 commit comments

Comments
 (0)