Skip to content

Commit bb682c5

Browse files
committed
fix(graph-edge): fix graph edge delete exception
1 parent eef9759 commit bb682c5

File tree

4 files changed

+264
-27
lines changed

4 files changed

+264
-27
lines changed

entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java

+25-6
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import java.util.Optional;
1515
import java.util.function.Function;
1616
import java.util.stream.Collectors;
17-
import javax.annotation.Nonnull;
17+
import javax.annotation.Nullable;
1818

1919
/** Extracts fields from a RecordTemplate based on the appropriate {@link FieldSpec}. */
2020
public class FieldExtractor {
@@ -30,16 +30,35 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {
3030

3131
// Extract the value of each field in the field specs from the input record
3232
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
33-
@Nonnull RecordTemplate record, List<T> fieldSpecs) {
34-
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH);
33+
@Nullable RecordTemplate record, List<T> fieldSpecs) {
34+
return extractFields(record, fieldSpecs, false);
3535
}
3636

3737
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
38-
@Nonnull RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
38+
@Nullable RecordTemplate record, List<T> fieldSpecs, boolean requiredFieldExtract) {
39+
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH, requiredFieldExtract);
40+
}
41+
42+
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
43+
@Nullable RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
44+
return extractFields(record, fieldSpecs, maxValueLength, false);
45+
}
46+
47+
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
48+
@Nullable RecordTemplate record,
49+
List<T> fieldSpecs,
50+
int maxValueLength,
51+
boolean requiredFieldExtract) {
3952
final Map<T, List<Object>> extractedFields = new HashMap<>();
4053
for (T fieldSpec : fieldSpecs) {
41-
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
42-
if (!value.isPresent()) {
54+
if (requiredFieldExtract && record == null) {
55+
throw new IllegalArgumentException(
56+
"Field extraction is required and the RecordTemplate is null");
57+
}
58+
Optional<Object> value =
59+
Optional.ofNullable(record)
60+
.flatMap(maybeRecord -> RecordUtils.getFieldValue(maybeRecord, fieldSpec.getPath()));
61+
if (value.isEmpty()) {
4362
extractedFields.put(fieldSpec, Collections.emptyList());
4463
} else {
4564
long numArrayWildcards = getNumArrayWildcards(fieldSpec.getPath());

metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java

+33-21
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,10 @@ private void handleDeleteChangeEvent(
190190
urn.getEntityType(), event.getAspectName()));
191191
}
192192

193-
RecordTemplate aspect = event.getRecordTemplate();
193+
final RecordTemplate aspect =
194+
event.getPreviousRecordTemplate() != null
195+
? event.getPreviousRecordTemplate()
196+
: event.getRecordTemplate();
194197
Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName());
195198

196199
if (!aspectSpec.isTimeseries()) {
@@ -279,7 +282,8 @@ private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypes
279282
@Nonnull final AspectSpec aspectSpec,
280283
@Nonnull final RecordTemplate aspect,
281284
@Nonnull final MetadataChangeLog event,
282-
final boolean isNewAspectVersion) {
285+
final boolean isNewAspectVersion,
286+
final boolean requiredFieldExtract) {
283287
final List<Edge> edgesToAdd = new ArrayList<>();
284288
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = new HashMap<>();
285289

@@ -307,7 +311,8 @@ private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypes
307311
}
308312

309313
Map<RelationshipFieldSpec, List<Object>> extractedFields =
310-
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs());
314+
FieldExtractor.extractFields(
315+
aspect, aspectSpec.getRelationshipFieldSpecs(), requiredFieldExtract);
311316

312317
for (Map.Entry<RelationshipFieldSpec, List<Object>> entry : extractedFields.entrySet()) {
313318
Set<String> relationshipTypes =
@@ -329,7 +334,7 @@ private void updateGraphService(
329334
@Nonnull final RecordTemplate aspect,
330335
@Nonnull final MetadataChangeLog event) {
331336
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
332-
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);
337+
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true, false);
333338

334339
final List<Edge> edgesToAdd = edgeAndRelationTypes.getFirst();
335340
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
@@ -359,7 +364,7 @@ private void updateGraphServiceDiff(
359364
Pair<List<Edge>, HashMap<Urn, Set<String>>> oldEdgeAndRelationTypes = null;
360365
if (oldAspect != null) {
361366
oldEdgeAndRelationTypes =
362-
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, oldAspect, event, false);
367+
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, oldAspect, event, false, false);
363368
}
364369

365370
final List<Edge> oldEdges =
@@ -369,7 +374,7 @@ private void updateGraphServiceDiff(
369374
final Set<Edge> oldEdgeSet = new HashSet<>(oldEdges);
370375

371376
Pair<List<Edge>, HashMap<Urn, Set<String>>> newEdgeAndRelationTypes =
372-
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, newAspect, event, true);
377+
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, newAspect, event, true, false);
373378

374379
final List<Edge> newEdges = newEdgeAndRelationTypes.getFirst();
375380
final Set<Edge> newEdgeSet = new HashSet<>(newEdges);
@@ -433,29 +438,36 @@ private void deleteGraphData(
433438
@Nonnull final OperationContext opContext,
434439
@Nonnull final Urn urn,
435440
@Nonnull final AspectSpec aspectSpec,
436-
@Nonnull final RecordTemplate aspect,
441+
@Nullable final RecordTemplate aspect,
437442
@Nonnull final Boolean isKeyAspect,
438443
@Nonnull final MetadataChangeLog event) {
439444
if (isKeyAspect) {
440445
graphService.removeNode(opContext, urn);
441446
return;
442447
}
443448

444-
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
445-
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);
446-
447-
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
448-
edgeAndRelationTypes.getSecond();
449-
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
450-
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
451-
graphService.removeEdgesFromNode(
452-
opContext,
453-
entry.getKey(),
454-
new ArrayList<>(entry.getValue()),
455-
createRelationshipFilter(
456-
new Filter().setOr(new ConjunctiveCriterionArray()),
457-
RelationshipDirection.OUTGOING));
449+
if (aspect != null) {
450+
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
451+
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true, true);
452+
453+
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
454+
edgeAndRelationTypes.getSecond();
455+
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
456+
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
457+
graphService.removeEdgesFromNode(
458+
opContext,
459+
entry.getKey(),
460+
new ArrayList<>(entry.getValue()),
461+
createRelationshipFilter(
462+
new Filter().setOr(new ConjunctiveCriterionArray()),
463+
RelationshipDirection.OUTGOING));
464+
}
458465
}
466+
} else {
467+
log.warn(
468+
"Insufficient information to perform graph delete. Missing deleted aspect {} for entity {}",
469+
aspectSpec.getName(),
470+
urn);
459471
}
460472
}
461473
}

metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java

+122
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
package com.linkedin.metadata.service;
22

3+
import static com.linkedin.metadata.Constants.CONTAINER_ENTITY_NAME;
4+
import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter;
5+
import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion;
36
import static org.mockito.ArgumentMatchers.any;
7+
import static org.mockito.ArgumentMatchers.eq;
8+
import static org.mockito.ArgumentMatchers.nullable;
49
import static org.mockito.Mockito.mock;
510
import static org.mockito.Mockito.reset;
611
import static org.mockito.Mockito.times;
712
import static org.mockito.Mockito.verify;
813
import static org.mockito.Mockito.verifyNoInteractions;
914
import static org.testng.Assert.assertEquals;
1015

16+
import com.google.common.collect.ImmutableList;
1117
import com.linkedin.common.Status;
1218
import com.linkedin.common.urn.Urn;
1319
import com.linkedin.common.urn.UrnUtils;
20+
import com.linkedin.container.Container;
1421
import com.linkedin.dataset.DatasetProperties;
1522
import com.linkedin.events.metadata.ChangeType;
1623
import com.linkedin.metadata.Constants;
@@ -21,6 +28,14 @@
2128
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
2229
import com.linkedin.metadata.models.registry.EntityRegistry;
2330
import com.linkedin.metadata.models.registry.LineageRegistry;
31+
import com.linkedin.metadata.query.filter.Condition;
32+
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
33+
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
34+
import com.linkedin.metadata.query.filter.Criterion;
35+
import com.linkedin.metadata.query.filter.CriterionArray;
36+
import com.linkedin.metadata.query.filter.Filter;
37+
import com.linkedin.metadata.query.filter.RelationshipDirection;
38+
import com.linkedin.metadata.query.filter.RelationshipFilter;
2439
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
2540
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
2641
import com.linkedin.metadata.utils.GenericRecordUtils;
@@ -29,6 +44,8 @@
2944
import com.linkedin.mxe.MetadataChangeLog;
3045
import io.datahubproject.metadata.context.OperationContext;
3146
import io.datahubproject.test.metadata.context.TestOperationContexts;
47+
import java.util.List;
48+
import javax.annotation.Nonnull;
3249
import org.mockito.ArgumentCaptor;
3350
import org.opensearch.index.query.QueryBuilder;
3451
import org.opensearch.script.Script;
@@ -180,4 +197,109 @@ public void testStatusNoOpEvent() {
180197

181198
verifyNoInteractions(mockWriteDAO);
182199
}
200+
201+
@Test
202+
public void testMissingAspectGraphDelete() {
203+
// Test deleting a null aspect
204+
test.handleChangeEvent(
205+
TEST_OP_CONTEXT,
206+
new MetadataChangeLog()
207+
.setChangeType(ChangeType.DELETE)
208+
.setEntityType(TEST_URN.getEntityType())
209+
.setEntityUrn(TEST_URN)
210+
.setAspectName(Constants.CONTAINER_ASPECT_NAME));
211+
212+
// For missing aspects, verify no writes
213+
verifyNoInteractions(mockWriteDAO);
214+
}
215+
216+
@Test
217+
public void testNodeGraphDelete() {
218+
Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");
219+
220+
// Test deleting container entity
221+
test.handleChangeEvent(
222+
TEST_OP_CONTEXT,
223+
new MetadataChangeLog()
224+
.setChangeType(ChangeType.DELETE)
225+
.setEntityType(CONTAINER_ENTITY_NAME)
226+
.setEntityUrn(containerUrn)
227+
.setAspectName(Constants.CONTAINER_KEY_ASPECT_NAME));
228+
229+
// Delete all outgoing edges of this entity
230+
verify(mockWriteDAO, times(1))
231+
.deleteByQuery(
232+
eq(TEST_OP_CONTEXT),
233+
nullable(String.class),
234+
eq(createUrnFilter(containerUrn)),
235+
nullable(String.class),
236+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
237+
eq(List.of()),
238+
eq(new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING)));
239+
240+
// Delete all incoming edges of this entity
241+
verify(mockWriteDAO, times(1))
242+
.deleteByQuery(
243+
eq(TEST_OP_CONTEXT),
244+
nullable(String.class),
245+
eq(createUrnFilter(containerUrn)),
246+
nullable(String.class),
247+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
248+
eq(List.of()),
249+
eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)));
250+
251+
// Delete all edges where this entity is a lifecycle owner
252+
verify(mockWriteDAO, times(1))
253+
.deleteByQuery(
254+
eq(TEST_OP_CONTEXT),
255+
nullable(String.class),
256+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
257+
nullable(String.class),
258+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
259+
eq(List.of()),
260+
eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)),
261+
eq(containerUrn.toString()));
262+
}
263+
264+
@Test
265+
public void testContainerDelete() {
266+
Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");
267+
268+
// Test deleting a container aspect
269+
test.handleChangeEvent(
270+
TEST_OP_CONTEXT,
271+
new MetadataChangeLog()
272+
.setChangeType(ChangeType.DELETE)
273+
.setEntityType(TEST_URN.getEntityType())
274+
.setEntityUrn(TEST_URN)
275+
.setAspectName(Constants.CONTAINER_ASPECT_NAME)
276+
.setPreviousAspectValue(
277+
GenericRecordUtils.serializeAspect(new Container().setContainer(containerUrn))));
278+
279+
// For container aspects, verify that only edges are removed in both cases
280+
verify(mockWriteDAO, times(1))
281+
.deleteByQuery(
282+
eq(TEST_OP_CONTEXT),
283+
nullable(String.class),
284+
eq(createUrnFilter(TEST_URN)),
285+
nullable(String.class),
286+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
287+
eq(List.of("IsPartOf")),
288+
eq(
289+
createRelationshipFilter(
290+
new Filter().setOr(new ConjunctiveCriterionArray()),
291+
RelationshipDirection.OUTGOING)));
292+
}
293+
294+
private static Filter createUrnFilter(@Nonnull final Urn urn) {
295+
Filter filter = new Filter();
296+
CriterionArray criterionArray = new CriterionArray();
297+
Criterion criterion = buildCriterion("urn", Condition.EQUAL, urn.toString());
298+
criterionArray.add(criterion);
299+
filter.setOr(
300+
new ConjunctiveCriterionArray(
301+
ImmutableList.of(new ConjunctiveCriterion().setAnd(criterionArray))));
302+
303+
return filter;
304+
}
183305
}

0 commit comments

Comments
 (0)