Skip to content

Commit 09ba17e

Browse files
committed
fix(graph-edge): fix graph edge delete exception
1 parent eef9759 commit 09ba17e

File tree

5 files changed

+268
-39
lines changed

5 files changed

+268
-39
lines changed

.github/workflows/build-and-test.yml

+2
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ jobs:
7575
path: |
7676
~/.cache/uv
7777
key: ${{ runner.os }}-uv-${{ hashFiles('**/requirements.txt') }}
78+
- name: Install dependencies
79+
run: ./metadata-ingestion/scripts/install_deps.sh
7880
- name: Set up JDK 17
7981
uses: actions/setup-java@v4
8082
with:

entity-registry/src/main/java/com/linkedin/metadata/models/extractor/FieldExtractor.java

+24-5
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import java.util.Optional;
1515
import java.util.function.Function;
1616
import java.util.stream.Collectors;
17-
import javax.annotation.Nonnull;
17+
import javax.annotation.Nullable;
1818

1919
/** Extracts fields from a RecordTemplate based on the appropriate {@link FieldSpec}. */
2020
public class FieldExtractor {
@@ -30,15 +30,34 @@ private static long getNumArrayWildcards(PathSpec pathSpec) {
3030

3131
// Extract the value of each field in the field specs from the input record
3232
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
33-
@Nonnull RecordTemplate record, List<T> fieldSpecs) {
34-
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH);
33+
@Nullable RecordTemplate record, List<T> fieldSpecs) {
34+
return extractFields(record, fieldSpecs, false);
3535
}
3636

3737
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
38-
@Nonnull RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
38+
@Nullable RecordTemplate record, List<T> fieldSpecs, boolean requiredFieldExtract) {
39+
return extractFields(record, fieldSpecs, MAX_VALUE_LENGTH, requiredFieldExtract);
40+
}
41+
42+
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
43+
@Nullable RecordTemplate record, List<T> fieldSpecs, int maxValueLength) {
44+
return extractFields(record, fieldSpecs, maxValueLength, false);
45+
}
46+
47+
public static <T extends FieldSpec> Map<T, List<Object>> extractFields(
48+
@Nullable RecordTemplate record,
49+
List<T> fieldSpecs,
50+
int maxValueLength,
51+
boolean requiredFieldExtract) {
3952
final Map<T, List<Object>> extractedFields = new HashMap<>();
4053
for (T fieldSpec : fieldSpecs) {
41-
Optional<Object> value = RecordUtils.getFieldValue(record, fieldSpec.getPath());
54+
if (requiredFieldExtract && record == null) {
55+
throw new IllegalArgumentException(
56+
"Field extraction is required and the RecordTemplate is null");
57+
}
58+
Optional<Object> value =
59+
Optional.ofNullable(record)
60+
.flatMap(maybeRecord -> RecordUtils.getFieldValue(maybeRecord, fieldSpec.getPath()));
4261
if (!value.isPresent()) {
4362
extractedFields.put(fieldSpec, Collections.emptyList());
4463
} else {

metadata-io/src/main/java/com/linkedin/metadata/service/UpdateGraphIndicesService.java

+36-34
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,10 @@ private void handleDeleteChangeEvent(
190190
urn.getEntityType(), event.getAspectName()));
191191
}
192192

193-
RecordTemplate aspect = event.getRecordTemplate();
193+
final RecordTemplate aspect =
194+
event.getPreviousRecordTemplate() != null
195+
? event.getPreviousRecordTemplate()
196+
: event.getRecordTemplate();
194197
Boolean isDeletingKey = event.getAspectName().equals(entitySpec.getKeyAspectName());
195198

196199
if (!aspectSpec.isTimeseries()) {
@@ -280,45 +283,37 @@ private Pair<List<Edge>, HashMap<Urn, Set<String>>> getEdgesAndRelationshipTypes
280283
@Nonnull final RecordTemplate aspect,
281284
@Nonnull final MetadataChangeLog event,
282285
final boolean isNewAspectVersion) {
283-
final List<Edge> edgesToAdd = new ArrayList<>();
284-
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded = new HashMap<>();
286+
final List<Edge> edges = new ArrayList<>();
287+
final HashMap<Urn, Set<String>> urnToRelationshipTypes = new HashMap<>();
285288

286289
// we need to manually set schemaField <-> schemaField edges for fineGrainedLineage and
287290
// inputFields
288291
// since @Relationship only links between the parent entity urn and something else.
289292
if (aspectSpec.getName().equals(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
290293
UpstreamLineage upstreamLineage = new UpstreamLineage(aspect.data());
291294
updateFineGrainedEdgesAndRelationships(
292-
urn,
293-
upstreamLineage.getFineGrainedLineages(),
294-
edgesToAdd,
295-
urnToRelationshipTypesBeingAdded);
295+
urn, upstreamLineage.getFineGrainedLineages(), edges, urnToRelationshipTypes);
296296
} else if (aspectSpec.getName().equals(Constants.INPUT_FIELDS_ASPECT_NAME)) {
297297
final InputFields inputFields = new InputFields(aspect.data());
298-
updateInputFieldEdgesAndRelationships(
299-
urn, inputFields, edgesToAdd, urnToRelationshipTypesBeingAdded);
298+
updateInputFieldEdgesAndRelationships(urn, inputFields, edges, urnToRelationshipTypes);
300299
} else if (aspectSpec.getName().equals(Constants.DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)) {
301300
DataJobInputOutput dataJobInputOutput = new DataJobInputOutput(aspect.data());
302301
updateFineGrainedEdgesAndRelationships(
303-
urn,
304-
dataJobInputOutput.getFineGrainedLineages(),
305-
edgesToAdd,
306-
urnToRelationshipTypesBeingAdded);
302+
urn, dataJobInputOutput.getFineGrainedLineages(), edges, urnToRelationshipTypes);
307303
}
308304

309305
Map<RelationshipFieldSpec, List<Object>> extractedFields =
310-
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs());
306+
FieldExtractor.extractFields(aspect, aspectSpec.getRelationshipFieldSpecs(), true);
311307

312308
for (Map.Entry<RelationshipFieldSpec, List<Object>> entry : extractedFields.entrySet()) {
313-
Set<String> relationshipTypes =
314-
urnToRelationshipTypesBeingAdded.getOrDefault(urn, new HashSet<>());
309+
Set<String> relationshipTypes = urnToRelationshipTypes.getOrDefault(urn, new HashSet<>());
315310
relationshipTypes.add(entry.getKey().getRelationshipName());
316-
urnToRelationshipTypesBeingAdded.put(urn, relationshipTypes);
311+
urnToRelationshipTypes.put(urn, relationshipTypes);
317312
final List<Edge> newEdges =
318313
GraphIndexUtils.extractGraphEdges(entry, aspect, urn, event, isNewAspectVersion);
319-
edgesToAdd.addAll(newEdges);
314+
edges.addAll(newEdges);
320315
}
321-
return Pair.of(edgesToAdd, urnToRelationshipTypesBeingAdded);
316+
return Pair.of(edges, urnToRelationshipTypes);
322317
}
323318

324319
/** Process snapshot and update graph index */
@@ -433,29 +428,36 @@ private void deleteGraphData(
433428
@Nonnull final OperationContext opContext,
434429
@Nonnull final Urn urn,
435430
@Nonnull final AspectSpec aspectSpec,
436-
@Nonnull final RecordTemplate aspect,
431+
@Nullable final RecordTemplate aspect,
437432
@Nonnull final Boolean isKeyAspect,
438433
@Nonnull final MetadataChangeLog event) {
439434
if (isKeyAspect) {
440435
graphService.removeNode(opContext, urn);
441436
return;
442437
}
443438

444-
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
445-
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);
446-
447-
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingAdded =
448-
edgeAndRelationTypes.getSecond();
449-
if (!urnToRelationshipTypesBeingAdded.isEmpty()) {
450-
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingAdded.entrySet()) {
451-
graphService.removeEdgesFromNode(
452-
opContext,
453-
entry.getKey(),
454-
new ArrayList<>(entry.getValue()),
455-
createRelationshipFilter(
456-
new Filter().setOr(new ConjunctiveCriterionArray()),
457-
RelationshipDirection.OUTGOING));
439+
if (aspect != null) {
440+
Pair<List<Edge>, HashMap<Urn, Set<String>>> edgeAndRelationTypes =
441+
getEdgesAndRelationshipTypesFromAspect(urn, aspectSpec, aspect, event, true);
442+
443+
final HashMap<Urn, Set<String>> urnToRelationshipTypesBeingRemoved =
444+
edgeAndRelationTypes.getSecond();
445+
if (!urnToRelationshipTypesBeingRemoved.isEmpty()) {
446+
for (Map.Entry<Urn, Set<String>> entry : urnToRelationshipTypesBeingRemoved.entrySet()) {
447+
graphService.removeEdgesFromNode(
448+
opContext,
449+
entry.getKey(),
450+
new ArrayList<>(entry.getValue()),
451+
createRelationshipFilter(
452+
new Filter().setOr(new ConjunctiveCriterionArray()),
453+
RelationshipDirection.OUTGOING));
454+
}
458455
}
456+
} else {
457+
log.warn(
458+
"Insufficient information to perform graph delete. Missing deleted aspect {} for entity {}",
459+
aspectSpec.getName(),
460+
urn);
459461
}
460462
}
461463
}

metadata-io/src/test/java/com/linkedin/metadata/service/UpdateGraphIndicesServiceTest.java

+122
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
package com.linkedin.metadata.service;
22

3+
import static com.linkedin.metadata.Constants.CONTAINER_ENTITY_NAME;
4+
import static com.linkedin.metadata.search.utils.QueryUtils.createRelationshipFilter;
5+
import static com.linkedin.metadata.utils.CriterionUtils.buildCriterion;
36
import static org.mockito.ArgumentMatchers.any;
7+
import static org.mockito.ArgumentMatchers.eq;
8+
import static org.mockito.ArgumentMatchers.nullable;
49
import static org.mockito.Mockito.mock;
510
import static org.mockito.Mockito.reset;
611
import static org.mockito.Mockito.times;
712
import static org.mockito.Mockito.verify;
813
import static org.mockito.Mockito.verifyNoInteractions;
914
import static org.testng.Assert.assertEquals;
1015

16+
import com.google.common.collect.ImmutableList;
1117
import com.linkedin.common.Status;
1218
import com.linkedin.common.urn.Urn;
1319
import com.linkedin.common.urn.UrnUtils;
20+
import com.linkedin.container.Container;
1421
import com.linkedin.dataset.DatasetProperties;
1522
import com.linkedin.events.metadata.ChangeType;
1623
import com.linkedin.metadata.Constants;
@@ -21,6 +28,14 @@
2128
import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService;
2229
import com.linkedin.metadata.models.registry.EntityRegistry;
2330
import com.linkedin.metadata.models.registry.LineageRegistry;
31+
import com.linkedin.metadata.query.filter.Condition;
32+
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
33+
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
34+
import com.linkedin.metadata.query.filter.Criterion;
35+
import com.linkedin.metadata.query.filter.CriterionArray;
36+
import com.linkedin.metadata.query.filter.Filter;
37+
import com.linkedin.metadata.query.filter.RelationshipDirection;
38+
import com.linkedin.metadata.query.filter.RelationshipFilter;
2439
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
2540
import com.linkedin.metadata.search.elasticsearch.update.ESBulkProcessor;
2641
import com.linkedin.metadata.utils.GenericRecordUtils;
@@ -29,6 +44,8 @@
2944
import com.linkedin.mxe.MetadataChangeLog;
3045
import io.datahubproject.metadata.context.OperationContext;
3146
import io.datahubproject.test.metadata.context.TestOperationContexts;
47+
import java.util.List;
48+
import javax.annotation.Nonnull;
3249
import org.mockito.ArgumentCaptor;
3350
import org.opensearch.index.query.QueryBuilder;
3451
import org.opensearch.script.Script;
@@ -180,4 +197,109 @@ public void testStatusNoOpEvent() {
180197

181198
verifyNoInteractions(mockWriteDAO);
182199
}
200+
201+
@Test
202+
public void testMissingAspectGraphDelete() {
203+
// Test deleting a null aspect
204+
test.handleChangeEvent(
205+
TEST_OP_CONTEXT,
206+
new MetadataChangeLog()
207+
.setChangeType(ChangeType.DELETE)
208+
.setEntityType(TEST_URN.getEntityType())
209+
.setEntityUrn(TEST_URN)
210+
.setAspectName(Constants.CONTAINER_ASPECT_NAME));
211+
212+
// For missing aspects, verify no writes
213+
verifyNoInteractions(mockWriteDAO);
214+
}
215+
216+
@Test
217+
public void testNodeGraphDelete() {
218+
Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");
219+
220+
// Test deleting container entity
221+
test.handleChangeEvent(
222+
TEST_OP_CONTEXT,
223+
new MetadataChangeLog()
224+
.setChangeType(ChangeType.DELETE)
225+
.setEntityType(CONTAINER_ENTITY_NAME)
226+
.setEntityUrn(containerUrn)
227+
.setAspectName(Constants.CONTAINER_KEY_ASPECT_NAME));
228+
229+
// Delete all outgoing edges of this entity
230+
verify(mockWriteDAO, times(1))
231+
.deleteByQuery(
232+
eq(TEST_OP_CONTEXT),
233+
nullable(String.class),
234+
eq(createUrnFilter(containerUrn)),
235+
nullable(String.class),
236+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
237+
eq(List.of()),
238+
eq(new RelationshipFilter().setDirection(RelationshipDirection.OUTGOING)));
239+
240+
// Delete all incoming edges of this entity
241+
verify(mockWriteDAO, times(1))
242+
.deleteByQuery(
243+
eq(TEST_OP_CONTEXT),
244+
nullable(String.class),
245+
eq(createUrnFilter(containerUrn)),
246+
nullable(String.class),
247+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
248+
eq(List.of()),
249+
eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)));
250+
251+
// Delete all edges where this entity is a lifecycle owner
252+
verify(mockWriteDAO, times(1))
253+
.deleteByQuery(
254+
eq(TEST_OP_CONTEXT),
255+
nullable(String.class),
256+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
257+
nullable(String.class),
258+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
259+
eq(List.of()),
260+
eq(new RelationshipFilter().setDirection(RelationshipDirection.INCOMING)),
261+
eq(containerUrn.toString()));
262+
}
263+
264+
@Test
265+
public void testContainerDelete() {
266+
Urn containerUrn = UrnUtils.getUrn("urn:li:container:foo");
267+
268+
// Test deleting a container aspect
269+
test.handleChangeEvent(
270+
TEST_OP_CONTEXT,
271+
new MetadataChangeLog()
272+
.setChangeType(ChangeType.DELETE)
273+
.setEntityType(TEST_URN.getEntityType())
274+
.setEntityUrn(TEST_URN)
275+
.setAspectName(Constants.CONTAINER_ASPECT_NAME)
276+
.setPreviousAspectValue(
277+
GenericRecordUtils.serializeAspect(new Container().setContainer(containerUrn))));
278+
279+
// For container aspects, verify a single delete of the entity's outgoing IsPartOf edges
280+
verify(mockWriteDAO, times(1))
281+
.deleteByQuery(
282+
eq(TEST_OP_CONTEXT),
283+
nullable(String.class),
284+
eq(createUrnFilter(TEST_URN)),
285+
nullable(String.class),
286+
eq(new Filter().setOr(new ConjunctiveCriterionArray())),
287+
eq(List.of("IsPartOf")),
288+
eq(
289+
createRelationshipFilter(
290+
new Filter().setOr(new ConjunctiveCriterionArray()),
291+
RelationshipDirection.OUTGOING)));
292+
}
293+
294+
private static Filter createUrnFilter(@Nonnull final Urn urn) {
295+
Filter filter = new Filter();
296+
CriterionArray criterionArray = new CriterionArray();
297+
Criterion criterion = buildCriterion("urn", Condition.EQUAL, urn.toString());
298+
criterionArray.add(criterion);
299+
filter.setOr(
300+
new ConjunctiveCriterionArray(
301+
ImmutableList.of(new ConjunctiveCriterion().setAnd(criterionArray))));
302+
303+
return filter;
304+
}
183305
}

0 commit comments

Comments
 (0)