Skip to content

Commit 4e80d67

Browse files
committed
fix(graph-edge): fix graph edge delete exception
1 parent eef9759 commit 4e80d67

File tree

4 files changed

+259
-23
lines changed

4 files changed

+259
-23
lines changed

entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java

+25-6
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import java.util.Optional;
1515
import java.util.function.Function;
1616
import java.util.stream.Collectors;
17-
import javax.annotation.Nonnull;
17+
import javax.annotation.Nullable;
1818

1919
/** Extracts fields from a RecordTemplate based on the appropriate {@link FieldSpec}. */
2020
public class FieldExtractor {
@@ -30,16 +30,35 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {
3030

3131
// Extract the value of each field in the field specs from the input record
3232
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
33-
@Nonnull RecordTemplate record, List<T> fieldSpecs) {
34-
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH);
33+
@Nullable RecordTemplate record, List<T> fieldSpecs) {
34+
return extractFields(record, fieldSpecs, false);
3535
}
3636

3737
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
38-
@Nonnull RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
38+
@Nullable RecordTemplate record, List<T> fieldSpecs, boolean requiredFieldExtract) {
39+
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH, requiredFieldExtract);
40+
}
41+
42+
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
43+
@Nullable RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
44+
return extractFields(record, fieldSpecs, maxValueLength, false);
45+
}
46+
47+
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
48+
@Nullable RecordTemplate record,
49+
List<T> fieldSpecs,
50+
int maxValueLength,
51+
boolean requiredFieldExtract) {
3952
final Map<T, List<Object>> extractedFields = new HashMap<>();
4053
for (T fieldSpec : fieldSpecs) {
41-
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
42-
if (!value.isPresent()) {
54+
if (requiredFieldExtract && record == null) {
55+
throw new IllegalArgumentException(
56+
"Field extraction is required and the RecordTemplate is null");
57+
}
58+
Optional<Object> value =
59+
Optional.ofNullable(record)
60+
.flatMap(maybeRecord -> RecordUtils.getFieldValue(maybeRecord, fieldSpec.getPath()));
61+
if (value.isEmpty()) {
4362
extractedFields.put(fieldSpec, Collections.emptyList());
4463
} else {
4564
long numArrayWildcards = getNumArrayWildcards(fieldSpec.getPath());

metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java

+28-17
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,10 @@ private void handleDeleteChangeEvent(
190190
urn.getEntityType(), event.getAspectName()));
191191
}
192192

193-
RecordTemplate aspect = event.getRecordTemplate();
193+
final RecordTemplate aspect =
194+
event.getPreviousRecordTemplate() != null
195+
? event.getPreviousRecordTemplate()
196+
: event.getRecordTemplate();
194197
Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName());
195198

196199
if (!aspectSpec.isTimeseries()) {
@@ -307,7 +310,8 @@ private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypes
307310
}
308311

309312
Map<RelationshipFieldSpec, List<Object>> extractedFields =
310-
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs());
313+
FieldExtractor.extractFields(
314+
aspect, aspectSpec.getRelationshipFieldSpecs(), true);
311315

312316
for (Map.Entry<RelationshipFieldSpec, List<Object>> entry : extractedFields.entrySet()) {
313317
Set<String> relationshipTypes =
@@ -433,29 +437,36 @@ private void deleteGraphData(
433437
@Nonnull final OperationContext opContext,
434438
@Nonnull final Urn urn,
435439
@Nonnull final AspectSpec aspectSpec,
436-
@Nonnull final RecordTemplate aspect,
440+
@Nullable final RecordTemplate aspect,
437441
@Nonnull final Boolean isKeyAspect,
438442
@Nonnull final MetadataChangeLog event) {
439443
if (isKeyAspect) {
440444
graphService.removeNode(opContext, urn);
441445
return;
442446
}
443447

444-
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
445-
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);
446-
447-
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
448-
edgeAndRelationTypes.getSecond();
449-
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
450-
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
451-
graphService.removeEdgesFromNode(
452-
opContext,
453-
entry.getKey(),
454-
new ArrayList<>(entry.getValue()),
455-
createRelationshipFilter(
456-
new Filter().setOr(new ConjunctiveCriterionArray()),
457-
RelationshipDirection.OUTGOING));
448+
if (aspect != null) {
449+
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
450+
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);
451+
452+
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
453+
edgeAndRelationTypes.getSecond();
454+
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
455+
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
456+
graphService.removeEdgesFromNode(
457+
opContext,
458+
entry.getKey(),
459+
new ArrayList<>(entry.getValue()),
460+
createRelationshipFilter(
461+
new Filter().setOr(new ConjunctiveCriterionArray()),
462+
RelationshipDirection.OUTGOING));
463+
}
458464
}
465+
} else {
466+
log.warn(
467+
"Insufficient information to perform graph delete. Missing deleted aspect {} for entity {}",
468+
aspectSpec.getName(),
469+
urn);
459470
}
460471
}
461472
}

metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java

+122
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
package com.linkedin.metadata.service;
22

3+
import static com.linkedin.metadata.Constants.CONTAINER_ENTITY_NAME;
4+
import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter;
5+
import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion;
36
import static org.mockito.ArgumentMatchers.any;
7+
import static org.mockito.ArgumentMatchers.eq;
8+
import static org.mockito.ArgumentMatchers.nullable;
49
import static org.mockito.Mockito.mock;
510
import static org.mockito.Mockito.reset;
611
import static org.mockito.Mockito.times;
712
import static org.mockito.Mockito.verify;
813
import static org.mockito.Mockito.verifyNoInteractions;
914
import static org.testng.Assert.assertEquals;
1015

16+
import com.google.common.collect.ImmutableList;
1117
import com.linkedin.common.Status;
1218
import com.linkedin.common.urn.Urn;
1319
import com.linkedin.common.urn.UrnUtils;
20+
import com.linkedin.container.Container;
1421
import com.linkedin.dataset.DatasetProperties;
1522
import com.linkedin.events.metadata.ChangeType;
1623
import com.linkedin.metadata.Constants;
@@ -21,6 +28,14 @@
2128
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
2229
import com.linkedin.metadata.models.registry.EntityRegistry;
2330
import com.linkedin.metadata.models.registry.LineageRegistry;
31+
import com.linkedin.metadata.query.filter.Condition;
32+
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
33+
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
34+
import com.linkedin.metadata.query.filter.Criterion;
35+
import com.linkedin.metadata.query.filter.CriterionArray;
36+
import com.linkedin.metadata.query.filter.Filter;
37+
import com.linkedin.metadata.query.filter.RelationshipDirection;
38+
import com.linkedin.metadata.query.filter.RelationshipFilter;
2439
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
2540
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
2641
import com.linkedin.metadata.utils.GenericRecordUtils;
@@ -29,6 +44,8 @@
2944
import com.linkedin.mxe.MetadataChangeLog;
3045
import io.datahubproject.metadata.context.OperationContext;
3146
import io.datahubproject.test.metadata.context.TestOperationContexts;
47+
import java.util.List;
48+
import javax.annotation.Nonnull;
3249
import org.mockito.ArgumentCaptor;
3350
import org.opensearch.index.query.QueryBuilder;
3451
import org.opensearch.script.Script;
@@ -180,4 +197,109 @@ public void testStatusNoOpEvent() {
180197

181198
verifyNoInteractions(mockWriteDAO);
182199
}
200+
201+
@Test
202+
public void testMissingAspectGraphDelete() {
203+
// Test deleting a null aspect
204+
test.handleChangeEvent(
205+
TEST_OP_CONTEXT,
206+
new MetadataChangeLog()
207+
.setChangeType(ChangeType.DELETE)
208+
.setEntityType(TEST_URN.getEntityType())
209+
.setEntityUrn(TEST_URN)
210+
.setAspectName(Constants.CONTAINER_ASPECT_NAME));
211+
212+
// For missing aspects, verify no writes
213+
verifyNoInteractions(mockWriteDAO);
214+
}
215+
216+
@Test
217+
public void testNodeGraphDelete() {
218+
Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");
219+
220+
// Test deleting container entity
221+
test.handleChangeEvent(
222+
TEST_OP_CONTEXT,
223+
new MetadataChangeLog()
224+
.setChangeType(ChangeType.DELETE)
225+
.setEntityType(CONTAINER_ENTITY_NAME)
226+
.setEntityUrn(containerUrn)
227+
.setAspectName(Constants.CONTAINER_KEY_ASPECT_NAME));
228+
229+
// Delete all outgoing edges of this entity
230+
verify(mockWriteDAO, times(1))
231+
.deleteByQuery(
232+
eq(TEST_OP_CONTEXT),
233+
nullable(String.class),
234+
eq(createUrnFilter(containerUrn)),
235+
nullable(String.class),
236+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
237+
eq(List.of()),
238+
eq(new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING)));
239+
240+
// Delete all incoming edges of this entity
241+
verify(mockWriteDAO, times(1))
242+
.deleteByQuery(
243+
eq(TEST_OP_CONTEXT),
244+
nullable(String.class),
245+
eq(createUrnFilter(containerUrn)),
246+
nullable(String.class),
247+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
248+
eq(List.of()),
249+
eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)));
250+
251+
// Delete all edges where this entity is a lifecycle owner
252+
verify(mockWriteDAO, times(1))
253+
.deleteByQuery(
254+
eq(TEST_OP_CONTEXT),
255+
nullable(String.class),
256+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
257+
nullable(String.class),
258+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
259+
eq(List.of()),
260+
eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)),
261+
eq(containerUrn.toString()));
262+
}
263+
264+
@Test
265+
public void testContainerDelete() {
266+
Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");
267+
268+
// Test deleting a container aspect
269+
test.handleChangeEvent(
270+
TEST_OP_CONTEXT,
271+
new MetadataChangeLog()
272+
.setChangeType(ChangeType.DELETE)
273+
.setEntityType(TEST_URN.getEntityType())
274+
.setEntityUrn(TEST_URN)
275+
.setAspectName(Constants.CONTAINER_ASPECT_NAME)
276+
.setPreviousAspectValue(
277+
GenericRecordUtils.serializeAspect(new Container().setContainer(containerUrn))));
278+
279+
// For container aspects, verify that only edges are removed in both cases
280+
verify(mockWriteDAO, times(1))
281+
.deleteByQuery(
282+
eq(TEST_OP_CONTEXT),
283+
nullable(String.class),
284+
eq(createUrnFilter(TEST_URN)),
285+
nullable(String.class),
286+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
287+
eq(List.of("IsPartOf")),
288+
eq(
289+
createRelationshipFilter(
290+
new Filter().setOr(new ConjunctiveCriterionArray()),
291+
RelationshipDirection.OUTGOING)));
292+
}
293+
294+
private static Filter createUrnFilter(@Nonnull final Urn urn) {
295+
Filter filter = new Filter();
296+
CriterionArray criterionArray = new CriterionArray();
297+
Criterion criterion = buildCriterion("urn", Condition.EQUAL, urn.toString());
298+
criterionArray.add(criterion);
299+
filter.setOr(
300+
new ConjunctiveCriterionArray(
301+
ImmutableList.of(new ConjunctiveCriterion().setAnd(criterionArray))));
302+
303+
return filter;
304+
}
183305
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.linkedin.metadata.service;
2+
3+
import static com.linkedin.metadata.Constants.CONTAINER_ASPECT_NAME;
4+
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
5+
import static org.mockito.ArgumentMatchers.nullable;
6+
import static org.mockito.Mockito.eq;
7+
import static org.mockito.Mockito.verify;
8+
9+
import com.linkedin.common.urn.Urn;
10+
import com.linkedin.common.urn.UrnUtils;
11+
import com.linkedin.data.template.RecordTemplate;
12+
import com.linkedin.events.metadata.ChangeType;
13+
import com.linkedin.metadata.models.AspectSpec;
14+
import com.linkedin.metadata.models.EntitySpec;
15+
import com.linkedin.metadata.search.EntitySearchService;
16+
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
17+
import com.linkedin.metadata.search.transformer.SearchDocumentTransformer;
18+
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
19+
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
20+
import com.linkedin.metadata.utils.SystemMetadataUtils;
21+
import com.linkedin.mxe.MetadataChangeLog;
22+
import io.datahubproject.metadata.context.OperationContext;
23+
import io.datahubproject.test.metadata.context.TestOperationContexts;
24+
import org.mockito.Mock;
25+
import org.mockito.MockitoAnnotations;
26+
import org.testng.annotations.BeforeMethod;
27+
import org.testng.annotations.Test;
28+
29+
public class UpdateIndicesServiceTest {
30+
31+
@Mock private UpdateGraphIndicesService updateGraphIndicesService;
32+
@Mock private EntitySearchService entitySearchService;
33+
@Mock private TimeseriesAspectService timeseriesAspectService;
34+
@Mock private SystemMetadataService systemMetadataService;
35+
@Mock private SearchDocumentTransformer searchDocumentTransformer;
36+
@Mock private EntityIndexBuilders entityIndexBuilders;
37+
38+
private OperationContext operationContext;
39+
private UpdateIndicesService updateIndicesService;
40+
41+
@BeforeMethod
42+
public void setup() {
43+
MockitoAnnotations.openMocks(this);
44+
operationContext = TestOperationContexts.systemContextNoSearchAuthorization();
45+
updateIndicesService =
46+
new UpdateIndicesService(
47+
updateGraphIndicesService,
48+
entitySearchService,
49+
timeseriesAspectService,
50+
systemMetadataService,
51+
searchDocumentTransformer,
52+
entityIndexBuilders,
53+
"MD5");
54+
}
55+
56+
@Test
57+
public void testContainerHandleDeleteEvent() throws Exception {
58+
Urn urn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)");
59+
EntitySpec entitySpec = operationContext.getEntityRegistry().getEntitySpec(DATASET_ENTITY_NAME);
60+
AspectSpec aspectSpec = entitySpec.getAspectSpec(CONTAINER_ASPECT_NAME);
61+
62+
// Create test data
63+
MetadataChangeLog event = new MetadataChangeLog();
64+
event.setChangeType(ChangeType.DELETE);
65+
event.setEntityUrn(urn);
66+
event.setAspectName(CONTAINER_ASPECT_NAME);
67+
event.setEntityType(urn.getEntityType());
68+
event.setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata());
69+
70+
// Execute Delete
71+
updateIndicesService.handleChangeEvent(operationContext, event);
72+
73+
// Verify
74+
verify(systemMetadataService).deleteAspect(urn.toString(), CONTAINER_ASPECT_NAME);
75+
verify(searchDocumentTransformer)
76+
.transformAspect(
77+
eq(operationContext),
78+
eq(urn),
79+
nullable(RecordTemplate.class),
80+
eq(aspectSpec),
81+
eq(true));
82+
verify(updateGraphIndicesService).handleChangeEvent(operationContext, event);
83+
}
84+
}

0 commit comments

Comments
 (0)