Skip to content

Commit 005a9b0

Browse files
fix(tracing): handle noop mcl (#12713)
1 parent 16ef1ac commit 005a9b0

File tree

10 files changed

+673
-11
lines changed

10 files changed

+673
-11
lines changed

entity-registry/src/main/java/com/linkedin/metadata/utils/SystemMetadataUtils.java

+21
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44

55
import com.datahub.util.RecordUtils;
66
import com.linkedin.data.template.SetMode;
7+
import com.linkedin.data.template.StringMap;
78
import com.linkedin.mxe.SystemMetadata;
89
import javax.annotation.Nullable;
910
import lombok.extern.slf4j.Slf4j;
1011

1112
@Slf4j
1213
public class SystemMetadataUtils {
14+
private static final String NO_OP_KEY = "isNoOp";
1315

1416
private SystemMetadataUtils() {}
1517

@@ -42,4 +44,23 @@ public static SystemMetadata parseSystemMetadata(String jsonSystemMetadata) {
4244
}
4345
return RecordUtils.toRecordTemplate(SystemMetadata.class, jsonSystemMetadata);
4446
}
47+
48+
public static boolean isNoOp(@Nullable SystemMetadata systemMetadata) {
49+
if (systemMetadata != null && systemMetadata.hasProperties()) {
50+
return Boolean.parseBoolean(systemMetadata.getProperties().getOrDefault(NO_OP_KEY, "false"));
51+
}
52+
53+
return false;
54+
}
55+
56+
@Nullable
57+
public static SystemMetadata setNoOp(@Nullable SystemMetadata systemMetadata, boolean isNoOp) {
58+
if (systemMetadata != null) {
59+
if (!systemMetadata.hasProperties()) {
60+
systemMetadata.setProperties(new StringMap());
61+
}
62+
systemMetadata.getProperties().put(NO_OP_KEY, String.valueOf(isNoOp));
63+
}
64+
return systemMetadata;
65+
}
4566
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
package com.linkedin.metadata.utils;
2+
3+
import static com.linkedin.metadata.Constants.DEFAULT_RUN_ID;
4+
import static org.testng.Assert.*;
5+
6+
import com.linkedin.data.template.StringMap;
7+
import com.linkedin.mxe.SystemMetadata;
8+
import org.testng.annotations.Test;
9+
10+
public class SystemMetadataUtilsTest {
11+
12+
@Test
13+
public void testCreateDefaultSystemMetadata() {
14+
SystemMetadata metadata = SystemMetadataUtils.createDefaultSystemMetadata();
15+
16+
assertNotNull(metadata);
17+
assertEquals(metadata.getRunId(), DEFAULT_RUN_ID);
18+
assertTrue(metadata.hasLastObserved());
19+
assertTrue(metadata.getLastObserved() > 0);
20+
}
21+
22+
@Test
23+
public void testCreateDefaultSystemMetadataWithRunId() {
24+
String customRunId = "custom-run-id";
25+
SystemMetadata metadata = SystemMetadataUtils.createDefaultSystemMetadata(customRunId);
26+
27+
assertNotNull(metadata);
28+
assertEquals(metadata.getRunId(), customRunId);
29+
assertTrue(metadata.hasLastObserved());
30+
assertTrue(metadata.getLastObserved() > 0);
31+
}
32+
33+
@Test
34+
public void testGenerateSystemMetadataIfEmpty() {
35+
// Test with null input
36+
SystemMetadata nullMetadata = SystemMetadataUtils.generateSystemMetadataIfEmpty(null);
37+
assertNotNull(nullMetadata);
38+
assertEquals(nullMetadata.getRunId(), DEFAULT_RUN_ID);
39+
assertTrue(nullMetadata.hasLastObserved());
40+
41+
// Test with existing metadata
42+
SystemMetadata existingMetadata =
43+
new SystemMetadata().setRunId("existing-run").setLastObserved(1234567890L);
44+
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(existingMetadata);
45+
46+
assertEquals(result.getRunId(), "existing-run");
47+
assertEquals(result.getLastObserved(), 1234567890L);
48+
}
49+
50+
@Test
51+
public void testParseSystemMetadata() {
52+
// Test null input
53+
SystemMetadata nullResult = SystemMetadataUtils.parseSystemMetadata(null);
54+
assertNotNull(nullResult);
55+
assertEquals(nullResult.getRunId(), DEFAULT_RUN_ID);
56+
57+
// Test empty string input
58+
SystemMetadata emptyResult = SystemMetadataUtils.parseSystemMetadata("");
59+
assertNotNull(emptyResult);
60+
assertEquals(emptyResult.getRunId(), DEFAULT_RUN_ID);
61+
62+
// Test valid JSON input
63+
String validJson = "{\"runId\":\"test-run\",\"lastObserved\":1234567890}";
64+
SystemMetadata jsonResult = SystemMetadataUtils.parseSystemMetadata(validJson);
65+
assertNotNull(jsonResult);
66+
assertEquals(jsonResult.getRunId(), "test-run");
67+
assertEquals(jsonResult.getLastObserved(), 1234567890L);
68+
}
69+
70+
@Test
71+
public void testIsNoOp() {
72+
// Test null metadata
73+
assertFalse(SystemMetadataUtils.isNoOp(null));
74+
75+
// Test metadata without properties
76+
SystemMetadata emptyMetadata = new SystemMetadata();
77+
assertFalse(SystemMetadataUtils.isNoOp(emptyMetadata));
78+
79+
// Test metadata with isNoOp=true
80+
SystemMetadata noOpMetadata = new SystemMetadata();
81+
StringMap properties = new StringMap();
82+
properties.put("isNoOp", "true");
83+
noOpMetadata.setProperties(properties);
84+
assertTrue(SystemMetadataUtils.isNoOp(noOpMetadata));
85+
86+
// Test metadata with isNoOp=false
87+
properties.put("isNoOp", "false");
88+
assertFalse(SystemMetadataUtils.isNoOp(noOpMetadata));
89+
}
90+
91+
@Test
92+
public void testSetNoOp() {
93+
// Test with null metadata
94+
assertNull(SystemMetadataUtils.setNoOp(null, true));
95+
96+
// Test setting noOp to true
97+
SystemMetadata metadata = new SystemMetadata();
98+
SystemMetadata result = SystemMetadataUtils.setNoOp(metadata, true);
99+
assertNotNull(result);
100+
assertTrue(result.hasProperties());
101+
assertNotNull(result.getProperties());
102+
assertEquals(result.getProperties().get("isNoOp"), "true");
103+
104+
// Test setting noOp to false
105+
result = SystemMetadataUtils.setNoOp(metadata, false);
106+
assertNotNull(result);
107+
assertTrue(result.hasProperties());
108+
assertNotNull(result.getProperties());
109+
assertEquals(result.getProperties().get("isNoOp"), "false");
110+
111+
// Test with existing properties
112+
StringMap existingProps = new StringMap();
113+
existingProps.put("otherKey", "value");
114+
metadata.setProperties(existingProps);
115+
result = SystemMetadataUtils.setNoOp(metadata, true);
116+
assertNotNull(result);
117+
assertEquals(result.getProperties().get("otherKey"), "value");
118+
assertEquals(result.getProperties().get("isNoOp"), "true");
119+
}
120+
121+
@Test
122+
public void testGenerateSystemMetadataIfEmpty_NullInput() {
123+
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(null);
124+
125+
assertNotNull(result);
126+
assertEquals(DEFAULT_RUN_ID, result.getRunId());
127+
assertNotNull(result.getLastObserved());
128+
assertTrue(result.getLastObserved() > 0);
129+
}
130+
131+
@Test
132+
public void testGenerateSystemMetadataIfEmpty_NoRunId() {
133+
SystemMetadata input = new SystemMetadata().setLastObserved(1234567890L);
134+
135+
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(input);
136+
137+
assertNotNull(result);
138+
assertEquals(DEFAULT_RUN_ID, result.getRunId());
139+
assertEquals(1234567890L, result.getLastObserved().longValue());
140+
}
141+
142+
@Test
143+
public void testGenerateSystemMetadataIfEmpty_NoLastObserved() {
144+
SystemMetadata input = new SystemMetadata().setRunId("custom-run-id");
145+
146+
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(input);
147+
148+
assertNotNull(result);
149+
assertEquals("custom-run-id", result.getRunId());
150+
assertNotNull(result.getLastObserved());
151+
assertTrue(result.getLastObserved() > 0);
152+
}
153+
154+
@Test
155+
public void testGenerateSystemMetadataIfEmpty_ZeroLastObserved() {
156+
SystemMetadata input = new SystemMetadata().setRunId("custom-run-id").setLastObserved(0L);
157+
158+
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(input);
159+
160+
assertNotNull(result);
161+
assertEquals("custom-run-id", result.getRunId());
162+
assertNotNull(result.getLastObserved());
163+
assertTrue(result.getLastObserved() > 0);
164+
}
165+
166+
@Test
167+
public void testGenerateSystemMetadataIfEmpty_AllFieldsPopulated() {
168+
SystemMetadata input =
169+
new SystemMetadata().setRunId("custom-run-id").setLastObserved(1234567890L);
170+
171+
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(input);
172+
173+
assertNotNull(result);
174+
assertEquals("custom-run-id", result.getRunId());
175+
assertEquals(1234567890L, result.getLastObserved().longValue());
176+
}
177+
}

metadata-io/build.gradle

+1
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ dependencies {
9797
testImplementation externalDependency.springBootTest
9898
testImplementation spec.product.pegasus.restliServer
9999
testImplementation externalDependency.ebeanTest
100+
testImplementation externalDependency.opentelemetrySdk
100101

101102
// logback >=1.3 required due to `testcontainers` only
102103
testImplementation 'ch.qos.logback:logback-classic:1.4.7'

metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
1111
import com.linkedin.metadata.entity.ebean.PartitionedStream;
1212
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
13+
import com.linkedin.metadata.utils.SystemMetadataUtils;
1314
import com.linkedin.metadata.utils.metrics.MetricUtils;
1415
import com.linkedin.mxe.SystemMetadata;
1516
import com.linkedin.util.Pair;
@@ -142,15 +143,19 @@ default Pair<Optional<EntityAspect>, Optional<EntityAspect>> saveLatestAspect(
142143
.equals(currentVersion0.getSystemMetadataVersion())) {
143144

144145
inserted = insertAspect(txContext, latestAspect.getDatabaseAspect().get(), targetVersion);
145-
146-
// add trace - overwrite if version incremented
147-
newAspect.setSystemMetadata(opContext.withTraceId(newAspect.getSystemMetadata(), true));
148146
}
149147

150148
// update version 0
151149
Optional<EntityAspect> updated = Optional.empty();
150+
boolean isNoOp =
151+
Objects.equals(currentVersion0.getRecordTemplate(), newAspect.getRecordTemplate());
152+
152153
if (!Objects.equals(currentVersion0.getSystemMetadata(), newAspect.getSystemMetadata())
153-
|| !Objects.equals(currentVersion0.getRecordTemplate(), newAspect.getRecordTemplate())) {
154+
|| !isNoOp) {
155+
// update no-op used for tracing
156+
SystemMetadataUtils.setNoOp(newAspect.getSystemMetadata(), isNoOp);
157+
// add trace - overwrite if version incremented
158+
newAspect.setSystemMetadata(opContext.withTraceId(newAspect.getSystemMetadata(), true));
154159
updated = updateAspect(txContext, newAspect);
155160
}
156161

metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import com.linkedin.metadata.utils.EntityApiUtils;
7777
import com.linkedin.metadata.utils.GenericRecordUtils;
7878
import com.linkedin.metadata.utils.PegasusUtils;
79+
import com.linkedin.metadata.utils.SystemMetadataUtils;
7980
import com.linkedin.metadata.utils.metrics.MetricUtils;
8081
import com.linkedin.mxe.MetadataAuditOperation;
8182
import com.linkedin.mxe.MetadataChangeLog;
@@ -2050,7 +2051,8 @@ public Optional<Pair<Future<?>, Boolean>> conditionallyProduceMCLAsync(
20502051
Urn entityUrn,
20512052
AuditStamp auditStamp,
20522053
AspectSpec aspectSpec) {
2053-
boolean isNoOp = Objects.equals(oldAspect, newAspect);
2054+
boolean isNoOp =
2055+
SystemMetadataUtils.isNoOp(newSystemMetadata) || Objects.equals(oldAspect, newAspect);
20542056
if (!isNoOp || alwaysEmitChangeLog || shouldAspectEmitChangeLog(aspectSpec)) {
20552057
log.info("Producing MCL for ingested aspect {}, urn {}", aspectSpec.getName(), entityUrn);
20562058

metadata-io/src/main/java/com/linkedin/metadata/trace/TraceServiceImpl.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import com.linkedin.metadata.run.AspectRowSummary;
1212
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
1313
import com.linkedin.metadata.systemmetadata.TraceService;
14+
import com.linkedin.metadata.utils.SystemMetadataUtils;
1415
import com.linkedin.mxe.FailedMetadataChangeProposal;
1516
import com.linkedin.mxe.SystemMetadata;
1617
import com.linkedin.util.Pair;
@@ -168,7 +169,12 @@ private Map<Urn, LinkedHashMap<String, TraceStorageStatus>> tracePrimaryInParall
168169
String aspectName = aspectEntry.getKey();
169170

170171
if (traceId.equals(systemTraceId)) {
171-
aspectStatuses.put(aspectName, TraceStorageStatus.ok(TraceWriteStatus.ACTIVE_STATE));
172+
if (SystemMetadataUtils.isNoOp(systemMetadata)) {
173+
aspectStatuses.put(aspectName, TraceStorageStatus.ok(TraceWriteStatus.NO_OP));
174+
} else {
175+
aspectStatuses.put(
176+
aspectName, TraceStorageStatus.ok(TraceWriteStatus.ACTIVE_STATE));
177+
}
172178
} else if (traceTimestampMillis <= extractTimestamp(systemTraceId, createdOnMillis)) {
173179
aspectStatuses.put(
174180
aspectName, TraceStorageStatus.ok(TraceWriteStatus.HISTORIC_STATE));
@@ -421,7 +427,9 @@ private static Map<String, TraceStatus> mergeStatus(
421427
storageEntry -> {
422428
String aspectName = storageEntry.getKey();
423429
TraceStorageStatus primaryStatus = storageEntry.getValue();
424-
TraceStorageStatus searchStatus = searchAspectStatus.get(aspectName);
430+
TraceStorageStatus searchStatus =
431+
searchAspectStatus.getOrDefault(
432+
aspectName, TraceStorageStatus.ok(TraceWriteStatus.PENDING));
425433
TraceStatus traceStatus =
426434
TraceStatus.builder()
427435
.primaryStorage(primaryStatus)
@@ -448,7 +456,7 @@ private static Map<String, TraceStatus> mergeStatus(
448456
}
449457

450458
private static boolean isSuccess(
451-
TraceStorageStatus primaryStatus, TraceStorageStatus searchStatus) {
459+
@Nonnull TraceStorageStatus primaryStatus, @Nonnull TraceStorageStatus searchStatus) {
452460
return !TraceWriteStatus.ERROR.equals(primaryStatus.getWriteStatus())
453461
&& !TraceWriteStatus.ERROR.equals(searchStatus.getWriteStatus());
454462
}

0 commit comments

Comments
 (0)