Skip to content

Commit 17de393

Browse files
fix(lineage) Support views and sorting in impact analysis (#12769)
1 parent 3f6b8c1 commit 17de393

File tree

7 files changed

+389
-8
lines changed

7 files changed

+389
-8
lines changed

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/GmsGraphQLEngine.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -1026,7 +1026,8 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
10261026
new ScrollAcrossEntitiesResolver(this.entityClient, this.viewService))
10271027
.dataFetcher(
10281028
"searchAcrossLineage",
1029-
new SearchAcrossLineageResolver(this.entityClient, this.entityRegistry))
1029+
new SearchAcrossLineageResolver(
1030+
this.entityClient, this.entityRegistry, this.viewService))
10301031
.dataFetcher(
10311032
"scrollAcrossLineage", new ScrollAcrossLineageResolver(this.entityClient))
10321033
.dataFetcher(

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/search/SearchAcrossLineageResolver.java

+26-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.google.common.collect.ImmutableSet;
88
import com.linkedin.common.urn.Urn;
9+
import com.linkedin.common.urn.UrnUtils;
910
import com.linkedin.datahub.graphql.QueryContext;
1011
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
1112
import com.linkedin.datahub.graphql.generated.AndFilterInput;
@@ -25,8 +26,12 @@
2526
import com.linkedin.metadata.query.LineageFlags;
2627
import com.linkedin.metadata.query.SearchFlags;
2728
import com.linkedin.metadata.query.filter.Filter;
29+
import com.linkedin.metadata.query.filter.SortCriterion;
2830
import com.linkedin.metadata.search.LineageSearchResult;
31+
import com.linkedin.metadata.service.ViewService;
32+
import com.linkedin.metadata.utils.elasticsearch.FilterUtils;
2933
import com.linkedin.r2.RemoteInvocationException;
34+
import com.linkedin.view.DataHubViewInfo;
3035
import graphql.VisibleForTesting;
3136
import graphql.schema.DataFetcher;
3237
import graphql.schema.DataFetchingEnvironment;
@@ -53,10 +58,13 @@ public class SearchAcrossLineageResolver
5358

5459
private final EntityRegistry _entityRegistry;
5560

61+
private final ViewService _viewService;
62+
5663
@VisibleForTesting final Set<String> _allEntities;
5764
private final List<String> _allowedEntities;
5865

59-
public SearchAcrossLineageResolver(EntityClient entityClient, EntityRegistry entityRegistry) {
66+
public SearchAcrossLineageResolver(
67+
EntityClient entityClient, EntityRegistry entityRegistry, final ViewService viewService) {
6068
this._entityClient = entityClient;
6169
this._entityRegistry = entityRegistry;
6270
this._allEntities =
@@ -68,6 +76,8 @@ public SearchAcrossLineageResolver(EntityClient entityClient, EntityRegistry ent
6876
this._allEntities.stream()
6977
.filter(e -> !TRANSIENT_ENTITIES.contains(e))
7078
.collect(Collectors.toList());
79+
80+
this._viewService = viewService;
7181
}
7282

7383
private List<String> getEntityNamesFromInput(List<EntityType> inputTypes) {
@@ -127,6 +137,13 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
127137
com.linkedin.metadata.graph.LineageDirection.valueOf(lineageDirection.toString());
128138
return GraphQLConcurrencyUtils.supplyAsync(
129139
() -> {
140+
final DataHubViewInfo maybeResolvedView =
141+
(input.getViewUrn() != null)
142+
? resolveView(
143+
context.getOperationContext(),
144+
_viewService,
145+
UrnUtils.getUrn(input.getViewUrn()))
146+
: null;
130147
try {
131148
log.debug(
132149
"Executing search across relationships: source urn {}, direction {}, entity types {}, query {}, filters: {}, start: {}, count: {}",
@@ -138,8 +155,13 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
138155
start,
139156
count);
140157

141-
final Filter filter =
158+
final Filter baseFilter =
142159
ResolverUtils.buildFilter(input.getFilters(), input.getOrFilters());
160+
Filter filter =
161+
maybeResolvedView != null
162+
? FilterUtils.combineFilters(
163+
baseFilter, maybeResolvedView.getDefinition().getFilter())
164+
: baseFilter;
143165
final SearchFlags searchFlags;
144166
com.linkedin.datahub.graphql.generated.SearchFlags inputFlags = input.getSearchFlags();
145167
if (inputFlags != null) {
@@ -150,6 +172,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
150172
} else {
151173
searchFlags = new SearchFlags().setFulltext(true).setSkipHighlighting(true);
152174
}
175+
List<SortCriterion> sortCriteria = SearchUtils.getSortCriteria(input.getSortInput());
153176
LineageSearchResult salResults =
154177
_entityClient.searchAcrossLineage(
155178
context
@@ -162,7 +185,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
162185
sanitizedQuery,
163186
maxHops,
164187
filter,
165-
null,
188+
sortCriteria,
166189
start,
167190
count);
168191

datahub-graphql-core/src/main/resources/search.graphql

+10
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,16 @@ input SearchAcrossLineageInput {
417417
Flags controlling the lineage query
418418
"""
419419
lineageFlags: LineageFlags
420+
421+
"""
422+
Optional - A View to apply when generating results
423+
"""
424+
viewUrn: String
425+
426+
"""
427+
Optional - Information on how to sort this search result
428+
"""
429+
sortInput: SearchSortInput
420430
}
421431

422432
"""

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/search/ScrollAcrossLineageResolverTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.linkedin.metadata.search.LineageSearchEntityArray;
3232
import com.linkedin.metadata.search.MatchedFieldArray;
3333
import com.linkedin.metadata.search.SearchResultMetadata;
34+
import com.linkedin.metadata.service.ViewService;
3435
import graphql.schema.DataFetchingEnvironment;
3536
import io.datahubproject.metadata.context.OperationContext;
3637
import java.io.InputStream;
@@ -76,7 +77,7 @@ public void testAllEntitiesInitialization() {
7677
InputStream inputStream = ClassLoader.getSystemResourceAsStream("entity-registry.yml");
7778
EntityRegistry entityRegistry = new ConfigEntityRegistry(inputStream);
7879
SearchAcrossLineageResolver resolver =
79-
new SearchAcrossLineageResolver(_entityClient, entityRegistry);
80+
new SearchAcrossLineageResolver(_entityClient, entityRegistry, mock(ViewService.class));
8081
assertTrue(resolver._allEntities.contains("dataset"));
8182
assertTrue(resolver._allEntities.contains("dataFlow"));
8283
// Test for case sensitivity

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/search/SearchAcrossLineageResolverTest.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.linkedin.metadata.search.LineageSearchResult;
2323
import com.linkedin.metadata.search.MatchedFieldArray;
2424
import com.linkedin.metadata.search.SearchResultMetadata;
25+
import com.linkedin.metadata.service.ViewService;
2526
import graphql.schema.DataFetchingEnvironment;
2627
import io.datahubproject.metadata.context.OperationContext;
2728
import java.io.InputStream;
@@ -48,6 +49,7 @@ public class SearchAcrossLineageResolverTest {
4849
private SearchAcrossLineageResolver _resolver;
4950

5051
private EntityRegistry _entityRegistry;
52+
private ViewService _viewService;
5153

5254
@BeforeMethod
5355
public void setupTest() {
@@ -56,15 +58,16 @@ public void setupTest() {
5658
_authentication = mock(Authentication.class);
5759

5860
_entityRegistry = mock(EntityRegistry.class);
59-
_resolver = new SearchAcrossLineageResolver(_entityClient, _entityRegistry);
61+
_viewService = mock(ViewService.class);
62+
_resolver = new SearchAcrossLineageResolver(_entityClient, _entityRegistry, _viewService);
6063
}
6164

6265
@Test
6366
public void testAllEntitiesInitialization() {
6467
InputStream inputStream = ClassLoader.getSystemResourceAsStream("entity-registry.yml");
6568
EntityRegistry entityRegistry = new ConfigEntityRegistry(inputStream);
6669
SearchAcrossLineageResolver resolver =
67-
new SearchAcrossLineageResolver(_entityClient, entityRegistry);
70+
new SearchAcrossLineageResolver(_entityClient, entityRegistry, _viewService);
6871
assertTrue(resolver._allEntities.contains("dataset"));
6972
assertTrue(resolver._allEntities.contains("dataFlow"));
7073
// Test for case sensitivity
@@ -115,7 +118,7 @@ public void testSearchAcrossLineage() throws Exception {
115118
eq(QUERY),
116119
eq(null),
117120
any(),
118-
eq(null),
121+
eq(Collections.emptyList()),
119122
eq(START),
120123
eq(COUNT)))
121124
.thenReturn(lineageSearchResult);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package com.linkedin.metadata.utils.elasticsearch;
2+
3+
import com.google.common.collect.ImmutableList;
4+
import com.linkedin.data.template.StringArray;
5+
import com.linkedin.metadata.query.filter.Condition;
6+
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
7+
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
8+
import com.linkedin.metadata.query.filter.Criterion;
9+
import com.linkedin.metadata.query.filter.CriterionArray;
10+
import com.linkedin.metadata.query.filter.Filter;
11+
import com.linkedin.metadata.utils.CriterionUtils;
12+
import java.util.ArrayList;
13+
import java.util.Collections;
14+
import java.util.List;
15+
import javax.annotation.Nonnull;
16+
import javax.annotation.Nullable;
17+
18+
public class FilterUtils {
19+
20+
/**
21+
* Combines two {@link Filter} instances in a conjunction and returns a new instance of {@link
22+
* Filter} in disjunctive normal form.
23+
*
24+
* @param baseFilter the filter to apply the view to
25+
* @param viewFilter the view filter, null if it doesn't exist
26+
* @return a new instance of {@link Filter} representing the applied view.
27+
*/
28+
@Nonnull
29+
public static Filter combineFilters(
30+
@Nullable final Filter baseFilter, @Nonnull final Filter viewFilter) {
31+
final Filter finalBaseFilter =
32+
baseFilter == null
33+
? new Filter().setOr(new ConjunctiveCriterionArray(Collections.emptyList()))
34+
: baseFilter;
35+
36+
// Join the filter conditions in Disjunctive Normal Form.
37+
return combineFiltersInConjunction(finalBaseFilter, viewFilter);
38+
}
39+
40+
/**
41+
* Joins two filters in conjunction by reducing to Disjunctive Normal Form.
42+
*
43+
* @param filter1 the first filter in the pair
44+
* @param filter2 the second filter in the pair
45+
* <p>This method supports either Filter format, where the "or" field is used, instead of
46+
* criteria. If the criteria filter is used, then it will be converted into an "OR" before
47+
* returning the new filter.
48+
* @return the result of joining the 2 filters in a conjunction (AND)
49+
* <p>How does it work? It basically cross-products the conjunctions inside of each Filter
50+
* clause.
51+
* <p>Example Inputs: filter1 -> { or: [ { and: [ { field: tags, condition: EQUAL, values:
52+
* ["urn:li:tag:tag"] } ] }, { and: [ { field: glossaryTerms, condition: EQUAL, values:
53+
* ["urn:li:glossaryTerm:term"] } ] } ] } filter2 -> { or: [ { and: [ { field: domain,
54+
* condition: EQUAL, values: ["urn:li:domain:domain"] }, ] }, { and: [ { field: glossaryTerms,
55+
* condition: EQUAL, values: ["urn:li:glossaryTerm:term2"] } ] } ] } Example Output: { or: [ {
56+
* and: [ { field: tags, condition: EQUAL, values: ["urn:li:tag:tag"] }, { field: domain,
57+
* condition: EQUAL, values: ["urn:li:domain:domain"] } ] }, { and: [ { field: tags,
58+
* condition: EQUAL, values: ["urn:li:tag:tag"] }, { field: glossaryTerms, condition: EQUAL,
59+
* values: ["urn:li:glosaryTerm:term2"] } ] }, { and: [ { field: glossaryTerm, condition:
60+
* EQUAL, values: ["urn:li:glossaryTerm:term"] }, { field: domain, condition: EQUAL, values:
61+
* ["urn:li:domain:domain"] } ] }, { and: [ { field: glossaryTerm, condition: EQUAL, values:
62+
* ["urn:li:glossaryTerm:term"] }, { field: glossaryTerms, condition: EQUAL, values:
63+
* ["urn:li:glosaryTerm:term2"] } ] }, ] }
64+
*/
65+
@Nonnull
66+
private static Filter combineFiltersInConjunction(
67+
@Nonnull final Filter filter1, @Nonnull final Filter filter2) {
68+
69+
final Filter finalFilter1 = convertToV2Filter(filter1);
70+
final Filter finalFilter2 = convertToV2Filter(filter2);
71+
72+
// If either filter is empty, simply return the other filter.
73+
if (!finalFilter1.hasOr() || finalFilter1.getOr().size() == 0) {
74+
return finalFilter2;
75+
}
76+
if (!finalFilter2.hasOr() || finalFilter2.getOr().size() == 0) {
77+
return finalFilter1;
78+
}
79+
80+
// Iterate through the base filter, then cross-product with filter 2 conditions.
81+
final Filter result = new Filter();
82+
final List<ConjunctiveCriterion> newDisjunction = new ArrayList<>();
83+
for (ConjunctiveCriterion conjunction1 : finalFilter1.getOr()) {
84+
for (ConjunctiveCriterion conjunction2 : finalFilter2.getOr()) {
85+
final List<Criterion> joinedCriterion = new ArrayList<>(conjunction1.getAnd());
86+
joinedCriterion.addAll(conjunction2.getAnd());
87+
ConjunctiveCriterion newConjunction =
88+
new ConjunctiveCriterion().setAnd(new CriterionArray(joinedCriterion));
89+
newDisjunction.add(newConjunction);
90+
}
91+
}
92+
result.setOr(new ConjunctiveCriterionArray(newDisjunction));
93+
return result;
94+
}
95+
96+
@Nonnull
97+
private static Filter convertToV2Filter(@Nonnull Filter filter) {
98+
if (filter.hasOr()) {
99+
return filter;
100+
} else if (filter.hasCriteria()) {
101+
// Convert criteria to an OR
102+
return new Filter()
103+
.setOr(
104+
new ConjunctiveCriterionArray(
105+
ImmutableList.of(new ConjunctiveCriterion().setAnd(filter.getCriteria()))));
106+
}
107+
throw new IllegalArgumentException(
108+
String.format(
109+
"Illegal filter provided! Neither 'or' nor 'criteria' fields were populated for filter %s",
110+
filter));
111+
}
112+
113+
@Nonnull
114+
public static Filter createValuesFilter(
115+
@Nonnull final String fieldName, @Nonnull final List<String> values) {
116+
Filter filter = new Filter();
117+
CriterionArray criterionArray = new CriterionArray();
118+
119+
StringArray valuesArray = new StringArray();
120+
valuesArray.addAll(values);
121+
Criterion criterion = CriterionUtils.buildCriterion(fieldName, Condition.EQUAL, valuesArray);
122+
123+
criterionArray.add(criterion);
124+
filter.setOr(
125+
new ConjunctiveCriterionArray(
126+
ImmutableList.of(new ConjunctiveCriterion().setAnd(criterionArray))));
127+
128+
return filter;
129+
}
130+
}

0 commit comments

Comments
 (0)