Skip to content

Commit 17ec197

Browse files
authored
Merge branch 'master' into peter/column-level-lineage
2 parents 6a0b886 + f507e2c commit 17ec197

File tree

78 files changed

+5285
-530
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

78 files changed

+5285
-530
lines changed

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/incident/IncidentUtils.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.linkedin.common.AuditStamp;
77
import com.linkedin.common.urn.Urn;
88
import com.linkedin.common.urn.UrnUtils;
9+
import com.linkedin.data.template.SetMode;
910
import com.linkedin.datahub.graphql.QueryContext;
1011
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
1112
import com.linkedin.datahub.graphql.generated.IncidentPriority;
@@ -92,12 +93,13 @@ public static IncidentStatus mapIncidentStatus(
9293

9394
IncidentStatus status = new IncidentStatus();
9495
status.setState(IncidentState.valueOf(input.getState().toString()));
95-
if (input.getStage() != null) {
96-
status.setStage(IncidentStage.valueOf(input.getStage().toString()));
97-
}
96+
status.setStage(
97+
input.getStage() == null ? null : IncidentStage.valueOf(input.getStage().toString()),
98+
SetMode.REMOVE_IF_NULL);
9899
if (input.getMessage() != null) {
99100
status.setMessage(input.getMessage());
100101
}
102+
status.setLastUpdated(auditStamp);
101103
return status;
102104
}
103105

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/incident/UpdateIncidentResolver.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.linkedin.common.UrnArray;
1313
import com.linkedin.common.urn.Urn;
1414
import com.linkedin.common.urn.UrnUtils;
15+
import com.linkedin.data.template.SetMode;
1516
import com.linkedin.datahub.graphql.QueryContext;
1617
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
1718
import com.linkedin.datahub.graphql.exception.AuthorizationException;
@@ -116,18 +117,19 @@ private void updateIncidentInfo(
116117
if (input.getDescription() != null) {
117118
info.setDescription(input.getDescription());
118119
}
119-
if (input.getPriority() != null) {
120-
info.setPriority(IncidentUtils.mapIncidentPriority(input.getPriority()));
121-
}
122-
if (input.getAssigneeUrns() != null) {
123-
info.setAssignees(IncidentUtils.mapIncidentAssignees(input.getAssigneeUrns(), actorStamp));
124-
}
125120
if (input.getStatus() != null) {
126121
info.setStatus(IncidentUtils.mapIncidentStatus(input.getStatus(), actorStamp));
127122
}
128123
if (input.getResourceUrns() != null && !input.getResourceUrns().isEmpty()) {
129124
info.setEntities(new UrnArray(IncidentUtils.stringsToUrns(input.getResourceUrns())));
130125
}
126+
127+
info.setPriority(
128+
IncidentUtils.mapIncidentPriority(input.getPriority()), SetMode.REMOVE_IF_NULL);
129+
130+
info.setAssignees(
131+
IncidentUtils.mapIncidentAssignees(input.getAssigneeUrns(), actorStamp),
132+
SetMode.REMOVE_IF_NULL);
131133
}
132134

133135
private boolean isAuthorizedToUpdateIncident(final Urn resourceUrn, final QueryContext context) {

datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/types/glossary/GlossaryNodeType.java

+49-1
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,22 @@
1111
import com.linkedin.common.urn.Urn;
1212
import com.linkedin.common.urn.UrnUtils;
1313
import com.linkedin.datahub.graphql.QueryContext;
14+
import com.linkedin.datahub.graphql.generated.AutoCompleteResults;
1415
import com.linkedin.datahub.graphql.generated.Entity;
1516
import com.linkedin.datahub.graphql.generated.EntityType;
17+
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
1618
import com.linkedin.datahub.graphql.generated.GlossaryNode;
19+
import com.linkedin.datahub.graphql.generated.SearchResults;
20+
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
21+
import com.linkedin.datahub.graphql.types.SearchableEntityType;
1722
import com.linkedin.datahub.graphql.types.glossary.mappers.GlossaryNodeMapper;
23+
import com.linkedin.datahub.graphql.types.mappers.AutoCompleteResultsMapper;
24+
import com.linkedin.datahub.graphql.types.mappers.UrnSearchResultsMapper;
1825
import com.linkedin.entity.EntityResponse;
1926
import com.linkedin.entity.client.EntityClient;
27+
import com.linkedin.metadata.query.AutoCompleteResult;
28+
import com.linkedin.metadata.query.filter.Filter;
29+
import com.linkedin.metadata.search.SearchResult;
2030
import graphql.execution.DataFetcherResult;
2131
import java.util.ArrayList;
2232
import java.util.HashSet;
@@ -25,9 +35,12 @@
2535
import java.util.Set;
2636
import java.util.function.Function;
2737
import java.util.stream.Collectors;
38+
import javax.annotation.Nonnull;
39+
import javax.annotation.Nullable;
2840

2941
public class GlossaryNodeType
30-
implements com.linkedin.datahub.graphql.types.EntityType<GlossaryNode, String> {
42+
implements SearchableEntityType<GlossaryNode, String>,
43+
com.linkedin.datahub.graphql.types.EntityType<GlossaryNode, String> {
3144

3245
static final Set<String> ASPECTS_TO_RESOLVE =
3346
ImmutableSet.of(
@@ -89,4 +102,39 @@ public List<DataFetcherResult<GlossaryNode>> batchLoad(
89102
throw new RuntimeException("Failed to batch load GlossaryNodes", e);
90103
}
91104
}
105+
106+
@Override
107+
public SearchResults search(
108+
@Nonnull String query,
109+
@Nullable List<FacetFilterInput> filters,
110+
int start,
111+
int count,
112+
@Nonnull final QueryContext context)
113+
throws Exception {
114+
final Map<String, String> facetFilters =
115+
ResolverUtils.buildFacetFilters(filters, ImmutableSet.of());
116+
final SearchResult searchResult =
117+
_entityClient.search(
118+
context.getOperationContext().withSearchFlags(flags -> flags.setFulltext(true)),
119+
GLOSSARY_NODE_ENTITY_NAME,
120+
query,
121+
facetFilters,
122+
start,
123+
count);
124+
return UrnSearchResultsMapper.map(context, searchResult);
125+
}
126+
127+
@Override
128+
public AutoCompleteResults autoComplete(
129+
@Nonnull String query,
130+
@Nullable String field,
131+
@Nullable Filter filters,
132+
int limit,
133+
@Nonnull final QueryContext context)
134+
throws Exception {
135+
final AutoCompleteResult result =
136+
_entityClient.autoComplete(
137+
context.getOperationContext(), GLOSSARY_NODE_ENTITY_NAME, query, filters, limit);
138+
return AutoCompleteResultsMapper.map(context, result);
139+
}
92140
}

datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/incident/UpdateIncidentResolverTest.java

+2-22
Original file line numberDiff line numberDiff line change
@@ -53,17 +53,7 @@ public void testGetSuccessAllFields() throws Exception {
5353
existingInfo.setEntities(
5454
new UrnArray(ImmutableList.of(UrnUtils.getUrn("urn:li:dataset:(test,test,test)"))));
5555
existingInfo.setStatus(
56-
new IncidentStatus()
57-
.setState(IncidentState.ACTIVE)
58-
.setStage(IncidentStage.INVESTIGATION)
59-
.setMessage("Message"));
60-
existingInfo.setAssignees(
61-
new IncidentAssigneeArray(
62-
ImmutableList.of(
63-
new IncidentAssignee()
64-
.setActor(UrnUtils.getUrn("urn:li:corpuser:test"))
65-
.setAssignedAt(new AuditStamp()))));
66-
existingInfo.setPriority(0);
56+
new IncidentStatus().setState(IncidentState.ACTIVE).setMessage("Message"));
6757
existingInfo.setSource(new IncidentSource().setType(IncidentSourceType.MANUAL));
6858

6959
EntityService mockEntityService = Mockito.mock(EntityService.class);
@@ -198,17 +188,7 @@ public void testGetSuccessRequiredFields() throws Exception {
198188
expectedInfo.setEntities(
199189
new UrnArray(ImmutableList.of(UrnUtils.getUrn("urn:li:dataset:(test,test,test)"))));
200190
expectedInfo.setStatus(
201-
new IncidentStatus()
202-
.setState(IncidentState.ACTIVE)
203-
.setStage(IncidentStage.INVESTIGATION)
204-
.setMessage("Message"));
205-
expectedInfo.setAssignees(
206-
new IncidentAssigneeArray(
207-
ImmutableList.of(
208-
new IncidentAssignee()
209-
.setActor(UrnUtils.getUrn("urn:li:corpuser:test"))
210-
.setAssignedAt(new AuditStamp()))));
211-
expectedInfo.setPriority(0);
191+
new IncidentStatus().setState(IncidentState.ACTIVE).setMessage("Message"));
212192
expectedInfo.setSource(new IncidentSource().setType(IncidentSourceType.MANUAL));
213193

214194
// Verify entity client

datahub-upgrade/build.gradle

+4-1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,9 @@ dependencies {
8888
testImplementation 'uk.org.webcompere:system-stubs-testng:2.1.7'
8989
testRuntimeOnly externalDependency.logbackClassic
9090

91+
testImplementation externalDependency.h2
92+
testImplementation testFixtures(project(':metadata-io'))
93+
9194
constraints {
9295
implementation(implementation externalDependency.parquetHadoop) {
9396
because("CVE-2022-42003")
@@ -137,7 +140,7 @@ task runRestoreIndices(type: Exec) {
137140
"-jar",
138141
"-Dkafka.schemaRegistry.url=http://localhost:8080/schema-registry/api",
139142
"-Dserver.port=8083",
140-
bootJar.getArchiveFile().get(), "-u", "RestoreIndices", "-a", "batchSize=100"
143+
bootJar.getArchiveFile().get(), "-u", "RestoreIndices", "-a", "batchSize=100", "-a", "createDefaultAspects=true"
141144
}
142145

143146
task runRestoreIndicesUrn(type: Exec) {

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/RestoreIndices.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class RestoreIndices implements Upgrade {
2626
public static final String URN_ARG_NAME = "urn";
2727
public static final String URN_LIKE_ARG_NAME = "urnLike";
2828
public static final String URN_BASED_PAGINATION_ARG_NAME = "urnBasedPagination";
29+
public static final String CREATE_DEFAULT_ASPECTS_ARG_NAME = "createDefaultAspects";
2930

3031
public static final String STARTING_OFFSET_ARG_NAME = "startingOffset";
3132
public static final String LAST_URN_ARG_NAME = "lastUrn";

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/restoreindices/SendMAEStep.java

+21-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
44

5+
import com.google.common.annotations.VisibleForTesting;
56
import com.linkedin.datahub.upgrade.UpgradeContext;
67
import com.linkedin.datahub.upgrade.UpgradeStep;
78
import com.linkedin.datahub.upgrade.UpgradeStepResult;
@@ -80,9 +81,11 @@ private List<RestoreIndicesResult> iterateFutures(List<Future<RestoreIndicesResu
8081
if (future.isDone()) {
8182
try {
8283
result.add(future.get());
83-
futures.remove(future);
8484
} catch (InterruptedException | ExecutionException e) {
8585
log.error("Error iterating futures", e);
86+
result.add(null); // add null to indicate failure
87+
} finally {
88+
futures.remove(future);
8689
}
8790
}
8891
}
@@ -101,6 +104,7 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
101104
result.batchDelayMs = getBatchDelayMs(context.parsedArgs());
102105
result.start = getStartingOffset(context.parsedArgs());
103106
result.urnBasedPagination = getUrnBasedPagination(context.parsedArgs());
107+
result.createDefaultAspects = getCreateDefaultAspects(context.parsedArgs());
104108
if (containsKey(context.parsedArgs(), RestoreIndices.ASPECT_NAME_ARG_NAME)) {
105109
result.aspectName = context.parsedArgs().get(RestoreIndices.ASPECT_NAME_ARG_NAME).get();
106110
context.report().addLine(String.format("aspect is %s", result.aspectName));
@@ -151,7 +155,8 @@ private RestoreIndicesArgs getArgs(UpgradeContext context) {
151155
return result;
152156
}
153157

154-
private int getRowCount(RestoreIndicesArgs args) {
158+
@VisibleForTesting
159+
int getRowCount(RestoreIndicesArgs args) {
155160
ExpressionList<EbeanAspectV2> countExp =
156161
_server
157162
.find(EbeanAspectV2.class)
@@ -233,6 +238,10 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
233238
while (futures.size() > 0) {
234239
List<RestoreIndicesResult> tmpResults = iterateFutures(futures);
235240
for (RestoreIndicesResult tmpResult : tmpResults) {
241+
if (tmpResult == null) {
242+
// return error if there was an error processing a future
243+
return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
244+
}
236245
reportStats(context, finalJobResult, tmpResult, rowCount, startTime);
237246
}
238247
}
@@ -315,6 +324,16 @@ private long getBatchDelayMs(final Map<String, Optional<String>> parsedArgs) {
315324
return resolvedBatchDelayMs;
316325
}
317326

327+
private boolean getCreateDefaultAspects(final Map<String, Optional<String>> parsedArgs) {
328+
Boolean createDefaultAspects = null;
329+
if (containsKey(parsedArgs, RestoreIndices.CREATE_DEFAULT_ASPECTS_ARG_NAME)) {
330+
createDefaultAspects =
331+
Boolean.parseBoolean(
332+
parsedArgs.get(RestoreIndices.CREATE_DEFAULT_ASPECTS_ARG_NAME).get());
333+
}
334+
return createDefaultAspects != null ? createDefaultAspects : false;
335+
}
336+
318337
private int getThreadCount(final Map<String, Optional<String>> parsedArgs) {
319338
return getInt(parsedArgs, DEFAULT_THREADS, RestoreIndices.NUM_THREADS_ARG_NAME);
320339
}

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/AbstractMCLStep.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
113113
List<Pair<Future<?>, SystemAspect>> futures;
114114
futures =
115115
EntityUtils.toSystemAspectFromEbeanAspects(
116-
opContext.getRetrieverContext(), batch.collect(Collectors.toList()))
116+
opContext.getRetrieverContext(),
117+
batch.collect(Collectors.toList()),
118+
false)
117119
.stream()
118120
.map(
119121
systemAspect -> {

datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/schemafield/GenerateSchemaFieldsFromSchemaMetadataStep.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,8 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
175175
ebeanAspectV2 ->
176176
EntityUtils.toSystemAspectFromEbeanAspects(
177177
opContext.getRetrieverContext(),
178-
Set.of(ebeanAspectV2))
178+
Set.of(ebeanAspectV2),
179+
true)
179180
.stream())
180181
.map(
181182
systemAspect ->

0 commit comments

Comments
 (0)