Skip to content

Commit d4e8a82

Browse files
committed
fix(entity-service): handle no-op system-metadata batches
* revert previous changes * disable legacy locking * handle no-op when no change to system metadata
1 parent 46aa962 commit d4e8a82

File tree

4 files changed

+41
-37
lines changed

4 files changed

+41
-37
lines changed

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

+36-32
Original file line numberDiff line numberDiff line change
@@ -1051,7 +1051,8 @@ This condition is specifically for an older conditional write ingestAspectIfNotP
10511051
}
10521052
} else {
10531053
MetricUtils.counter(EntityServiceImpl.class, "batch_empty_transaction").inc();
1054-
log.warn("Empty transaction detected. {}", inputBatch);
1054+
// This includes no-op batches. i.e. patch removing non-existent items
1055+
log.debug("Empty transaction detected. {}", inputBatch);
10551056
}
10561057

10571058
return upsertResults;
@@ -2533,9 +2534,6 @@ private UpdateAspectResult ingestAspectToLocalDB(
25332534
.setLastRunId(writeItem.getSystemMetadata().getRunId(GetMode.NULL), SetMode.IGNORE_NULL);
25342535

25352536
// 2. Compare the latest existing and new.
2536-
final RecordTemplate databaseValue =
2537-
databaseAspect == null ? null : databaseAspect.getRecordTemplate();
2538-
25392537
final EntityAspect.EntitySystemAspect previousBatchAspect =
25402538
(EntityAspect.EntitySystemAspect) writeItem.getPreviousSystemAspect();
25412539
final RecordTemplate previousValue =
@@ -2544,43 +2542,49 @@ private UpdateAspectResult ingestAspectToLocalDB(
25442542
// 3. If there is no difference between existing and new, we just update
25452543
// the lastObserved in system metadata. RunId should stay as the original runId
25462544
if (previousValue != null
2547-
&& DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
2545+
&& DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
25482546

25492547
SystemMetadata latestSystemMetadata = previousBatchAspect.getSystemMetadata();
25502548
latestSystemMetadata.setLastObserved(writeItem.getSystemMetadata().getLastObserved());
25512549
latestSystemMetadata.setLastRunId(
25522550
writeItem.getSystemMetadata().getLastRunId(GetMode.NULL), SetMode.IGNORE_NULL);
25532551

2554-
previousBatchAspect
2555-
.getEntityAspect()
2556-
.setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));
2557-
2558-
log.info(
2559-
"Ingesting aspect with name {}, urn {}",
2560-
previousBatchAspect.getAspectName(),
2561-
previousBatchAspect.getUrn());
2562-
aspectDao.saveAspect(txContext, previousBatchAspect.getEntityAspect(), false);
2563-
2564-
// metrics
2565-
aspectDao.incrementWriteMetrics(
2566-
previousBatchAspect.getAspectName(),
2567-
1,
2568-
previousBatchAspect.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
2569-
2570-
return UpdateAspectResult.builder()
2571-
.urn(writeItem.getUrn())
2572-
.oldValue(previousValue)
2573-
.newValue(previousValue)
2574-
.oldSystemMetadata(previousBatchAspect.getSystemMetadata())
2575-
.newSystemMetadata(latestSystemMetadata)
2576-
.operation(MetadataAuditOperation.UPDATE)
2577-
.auditStamp(writeItem.getAuditStamp())
2578-
.maxVersion(0)
2579-
.build();
2552+
if (!latestSystemMetadata.equals(previousBatchAspect.getSystemMetadata())) {
2553+
2554+
previousBatchAspect
2555+
.getEntityAspect()
2556+
.setSystemMetadata(RecordUtils.toJsonString(latestSystemMetadata));
2557+
2558+
log.debug(
2559+
"Update aspect with name {}, urn {}",
2560+
previousBatchAspect.getAspectName(),
2561+
previousBatchAspect.getUrn());
2562+
aspectDao.saveAspect(txContext, previousBatchAspect.getEntityAspect(), false);
2563+
2564+
// metrics
2565+
aspectDao.incrementWriteMetrics(
2566+
previousBatchAspect.getAspectName(),
2567+
1,
2568+
previousBatchAspect.getMetadataRaw().getBytes(StandardCharsets.UTF_8).length);
2569+
2570+
return UpdateAspectResult.builder()
2571+
.urn(writeItem.getUrn())
2572+
.oldValue(previousValue)
2573+
.newValue(previousValue)
2574+
.oldSystemMetadata(previousBatchAspect.getSystemMetadata())
2575+
.newSystemMetadata(latestSystemMetadata)
2576+
.operation(MetadataAuditOperation.UPDATE)
2577+
.auditStamp(writeItem.getAuditStamp())
2578+
.maxVersion(0)
2579+
.build();
2580+
} else {
2581+
MetricUtils.counter(EntityServiceImpl.class, "batch_with_noop_sysmetadata").inc();
2582+
return null;
2583+
}
25802584
}
25812585

25822586
// 4. Save the newValue as the latest version
2583-
if (!DataTemplateUtil.areEqual(databaseValue, writeItem.getRecordTemplate())) {
2587+
if (!DataTemplateUtil.areEqual(previousValue, writeItem.getRecordTemplate())) {
25842588
log.debug(
25852589
"Ingesting aspect with name {}, urn {}", writeItem.getAspectName(), writeItem.getUrn());
25862590
String newValueStr = EntityApiUtils.toJsonAspect(writeItem.getRecordTemplate());

metadata-io/src/main/java/com/linkedin/metadata/entity/ebean/EbeanAspectDao.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public long saveLatestAspect(
191191
newTime,
192192
newSystemMetadata,
193193
ASPECT_LATEST_VERSION,
194-
oldAspectMetadata == null);
194+
ASPECT_LATEST_VERSION.equals(nextVersion));
195195

196196
return largestVersion;
197197
}

metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ public void testBatchPatchWithTrailingNoOp() throws Exception {
538538
opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
539539
assertEquals(
540540
envelopedAspect.getSystemMetadata().getVersion(),
541-
"3",
541+
"2",
542542
"Expected version 3. 1 - Initial, + 1 add, 1 remove");
543543
assertEquals(
544544
new GlobalTags(envelopedAspect.getValue().data())
@@ -653,7 +653,7 @@ public void testBatchPatchAdd() throws Exception {
653653
EnvelopedAspect envelopedAspect =
654654
_entityServiceImpl.getLatestEnvelopedAspect(
655655
opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
656-
assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "4", "Expected version 4");
656+
assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 4");
657657
assertEquals(
658658
new GlobalTags(envelopedAspect.getValue().data())
659659
.getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),
@@ -741,7 +741,7 @@ public void testBatchPatchAddDuplicate() throws Exception {
741741
EnvelopedAspect envelopedAspect =
742742
_entityServiceImpl.getLatestEnvelopedAspect(
743743
opContext, DATASET_ENTITY_NAME, entityUrn, GLOBAL_TAGS_ASPECT_NAME);
744-
assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "3", "Expected version 3");
744+
assertEquals(envelopedAspect.getSystemMetadata().getVersion(), "2", "Expected version 2");
745745
assertEquals(
746746
new GlobalTags(envelopedAspect.getValue().data())
747747
.getTags().stream().map(TagAssociation::getTag).collect(Collectors.toSet()),

metadata-service/configuration/src/main/resources/application.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ ebean:
159159
autoCreateDdl: ${EBEAN_AUTOCREATE:false}
160160
postgresUseIamAuth: ${EBEAN_POSTGRES_USE_AWS_IAM_AUTH:false}
161161
locking:
162-
enabled: ${EBEAN_LOCKING_ENABLED:true}
162+
enabled: ${EBEAN_LOCKING_ENABLED:false}
163163
durationSeconds: ${EBEAN_LOCKING_DURATION_SECONDS:60}
164164
maximumLocks: ${EBEAN_LOCKING_MAXIMUM_LOCKS:20000}
165165

0 commit comments

Comments
 (0)