Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(platform): add support for via nodes #9733

Merged
merged 5 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,8 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
"scrollAcrossEntities",
new ScrollAcrossEntitiesResolver(this.entityClient, this.viewService))
.dataFetcher(
"searchAcrossLineage", new SearchAcrossLineageResolver(this.entityClient))
"searchAcrossLineage",
new SearchAcrossLineageResolver(this.entityClient, this.entityRegistry))
.dataFetcher(
"scrollAcrossLineage", new ScrollAcrossLineageResolver(this.entityClient))
.dataFetcher(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ public static CompletableFuture<List<Entity>> batchLoadEntitiesOfSameType(
.filter(entity -> entities.get(0).getClass().isAssignableFrom(entity.objectClass()))
.collect(Collectors.toList()));

final DataLoader loader = dataLoaderRegistry.getDataLoader(filteredEntity.name());
List keyList = new ArrayList();
final DataLoader<Object, Entity> loader =
dataLoaderRegistry.getDataLoader(filteredEntity.name());
List<Object> keyList = new ArrayList();
for (Entity entity : entities) {
keyList.add(filteredEntity.getKeyProvider().apply(entity));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.datahub.graphql.resolvers.search.SearchUtils.*;
import static com.linkedin.metadata.Constants.QUERY_ENTITY_NAME;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
Expand All @@ -14,31 +16,63 @@
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.UrnSearchAcrossLineageResultsMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.LineageSearchResult;
import com.linkedin.r2.RemoteInvocationException;
import graphql.VisibleForTesting;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/** Resolver responsible for resolving 'searchAcrossEntities' field of the Query type */
@Slf4j
@RequiredArgsConstructor
public class SearchAcrossLineageResolver
implements DataFetcher<CompletableFuture<SearchAcrossLineageResults>> {

private static final int DEFAULT_START = 0;
private static final int DEFAULT_COUNT = 10;

private static final Set<String> TRANSIENT_ENTITIES = ImmutableSet.of(QUERY_ENTITY_NAME);

private final EntityClient _entityClient;

private final EntityRegistry _entityRegistry;

@VisibleForTesting final Set<String> _allEntities;
private final List<String> _allowedEntities;

public SearchAcrossLineageResolver(EntityClient entityClient, EntityRegistry entityRegistry) {
this._entityClient = entityClient;
this._entityRegistry = entityRegistry;
this._allEntities =
entityRegistry.getEntitySpecs().values().stream()
.map(EntitySpec::getName)
.collect(Collectors.toSet());

this._allowedEntities =
this._allEntities.stream()
.filter(e -> !TRANSIENT_ENTITIES.contains(e))
.collect(Collectors.toList());
}

private List<String> getEntityNamesFromInput(List<EntityType> inputTypes) {
if (inputTypes != null && !inputTypes.isEmpty()) {
return inputTypes.stream().map(EntityTypeMapper::getName).collect(Collectors.toList());
} else {
return this._allowedEntities;
}
}

@Override
public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment environment)
throws URISyntaxException {
Expand All @@ -50,12 +84,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment

final LineageDirection lineageDirection = input.getDirection();

List<EntityType> entityTypes =
(input.getTypes() == null || input.getTypes().isEmpty())
? SEARCHABLE_ENTITY_TYPES
: input.getTypes();
List<String> entityNames =
entityTypes.stream().map(EntityTypeMapper::getName).collect(Collectors.toList());
List<String> entityNames = getEntityNamesFromInput(input.getTypes());

// escape forward slash since it is a reserved character in Elasticsearch
final String sanitizedQuery =
Expand Down Expand Up @@ -99,8 +128,7 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
} else {
searchFlags = new SearchFlags().setFulltext(true).setSkipHighlighting(true);
}

return UrnSearchAcrossLineageResultsMapper.map(
LineageSearchResult salResults =
_entityClient.searchAcrossLineage(
urn,
resolvedDirection,
Expand All @@ -114,7 +142,9 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
startTimeMillis,
endTimeMillis,
searchFlags,
ResolverUtils.getAuthentication(environment)));
getAuthentication(environment));

return UrnSearchAcrossLineageResultsMapper.map(salResults);
} catch (RemoteInvocationException e) {
log.error(
"Failed to execute search across relationships: source urn {}, direction {}, entity types {}, query {}, filters: {}, start: {}, count: {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.datahub.graphql.generated.GroupingCriterion;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import javax.annotation.Nonnull;

public class GroupingCriterionInputMapper
implements ModelMapper<GroupingCriterion, com.linkedin.metadata.query.GroupingCriterion> {

public static final GroupingCriterionInputMapper INSTANCE = new GroupingCriterionInputMapper();

public static com.linkedin.metadata.query.GroupingCriterion map(
@Nonnull final GroupingCriterion groupingCriterion) {
return INSTANCE.apply(groupingCriterion);
}

@Override
public com.linkedin.metadata.query.GroupingCriterion apply(GroupingCriterion input) {
return new com.linkedin.metadata.query.GroupingCriterion()
.setRawEntityType(input.getRawEntityType())
.setGroupingEntityType(input.getGroupingEntityType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.linkedin.datahub.graphql.generated.SearchFlags;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import com.linkedin.metadata.query.GroupingCriterionArray;
import com.linkedin.metadata.query.GroupingSpec;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;

/**
Expand Down Expand Up @@ -42,6 +45,15 @@ public com.linkedin.metadata.query.SearchFlags apply(@Nonnull final SearchFlags
if (searchFlags.getGetSuggestions() != null) {
result.setGetSuggestions(searchFlags.getGetSuggestions());
}
if (searchFlags.getGroupingSpec() != null) {
result.setGroupingSpec(
new GroupingSpec()
.setGroupingCriteria(
new GroupingCriterionArray(
searchFlags.getGroupingSpec().getGroupingCriteria().stream()
.map(GroupingCriterionInputMapper::map)
.collect(Collectors.toList()))));
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.linkedin.datahub.graphql.generated.MLPrimaryKey;
import com.linkedin.datahub.graphql.generated.Notebook;
import com.linkedin.datahub.graphql.generated.OwnershipTypeEntity;
import com.linkedin.datahub.graphql.generated.QueryEntity;
import com.linkedin.datahub.graphql.generated.Role;
import com.linkedin.datahub.graphql.generated.SchemaFieldEntity;
import com.linkedin.datahub.graphql.generated.StructuredPropertyEntity;
Expand Down Expand Up @@ -198,6 +199,11 @@ public Entity apply(Urn input) {
((StructuredPropertyEntity) partialEntity).setUrn(input.toString());
((StructuredPropertyEntity) partialEntity).setType(EntityType.STRUCTURED_PROPERTY);
}
if (input.getEntityType().equals(QUERY_ENTITY_NAME)) {
partialEntity = new QueryEntity();
((QueryEntity) partialEntity).setUrn(input.toString());
((QueryEntity) partialEntity).setType(EntityType.QUERY);
}
return partialEntity;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private SearchAcrossLineageResult mapResult(LineageSearchEntity searchEntity) {
.setMatchedFields(getMatchedFieldEntry(searchEntity.getMatchedFields()))
.setPaths(searchEntity.getPaths().stream().map(this::mapPath).collect(Collectors.toList()))
.setDegree(searchEntity.getDegree())
.setDegrees(searchEntity.getDegrees().stream().collect(Collectors.toList()))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class QueryType
implements com.linkedin.datahub.graphql.types.EntityType<QueryEntity, String> {
Expand Down Expand Up @@ -50,6 +52,7 @@ public List<DataFetcherResult<QueryEntity>> batchLoad(
final List<Urn> viewUrns = urns.stream().map(UrnUtils::getUrn).collect(Collectors.toList());

try {
log.debug("Fetching query entities: {}", viewUrns);
final Map<Urn, EntityResponse> entities =
_entityClient.batchGetV2(
QUERY_ENTITY_NAME,
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -10948,6 +10948,11 @@ enum QuerySource {
The query was provided manually, e.g. from the UI.
"""
MANUAL

"""
The query was extracted by the system, e.g. from a dashboard.
"""
SYSTEM
}

"""
Expand Down
38 changes: 38 additions & 0 deletions datahub-graphql-core/src/main/resources/search.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ input SearchFlags {
Whether to request for search suggestions on the _entityName virtualized field
"""
getSuggestions: Boolean

"""
Additional grouping specifications to apply to the search results
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add maybe note:
Notice: This API is experimental and subject to change.

"""
groupingSpec: GroupingSpec
}

"""
Expand Down Expand Up @@ -278,6 +283,7 @@ input ScrollAcrossEntitiesInput {
searchFlags: SearchFlags
}


"""
Input arguments for a search query over the results of a multi-hop graph query
"""
Expand Down Expand Up @@ -669,6 +675,12 @@ type SearchAcrossLineageResult {
Degree of relationship (number of hops to get to entity)
"""
degree: Int!

"""
Degrees of relationship (for entities discoverable at multiple degrees)
"""
degrees: [Int!]

}

"""
Expand Down Expand Up @@ -1303,4 +1315,30 @@ input SortCriterion {
The order in which we will be sorting
"""
sortOrder: SortOrder!
}

"""
A grouping specification for search results
"""
input GroupingSpec {

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can we copy the PDL comment here

groupingCriteria: [GroupingCriterion!]!

}

"""
A single grouping criterion for grouping search results
"""
input GroupingCriterion {

"""
The raw entity type that needs to be grouped
"""
rawEntityType: String!
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


"""
The grouping entity type
"""
groupingEntityType: String!

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResult;
import com.linkedin.datahub.graphql.generated.SearchAcrossLineageResults;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.search.AggregationMetadataArray;
import com.linkedin.metadata.search.LineageSearchEntity;
Expand All @@ -22,6 +24,7 @@
import com.linkedin.metadata.search.MatchedFieldArray;
import com.linkedin.metadata.search.SearchResultMetadata;
import graphql.schema.DataFetchingEnvironment;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import org.testng.annotations.BeforeMethod;
Expand All @@ -43,13 +46,28 @@ public class SearchAcrossLineageResolverTest {
private Authentication _authentication;
private SearchAcrossLineageResolver _resolver;

private EntityRegistry _entityRegistry;

@BeforeMethod
public void setupTest() {
_entityClient = mock(EntityClient.class);
_dataFetchingEnvironment = mock(DataFetchingEnvironment.class);
_authentication = mock(Authentication.class);

_resolver = new SearchAcrossLineageResolver(_entityClient);
_entityRegistry = mock(EntityRegistry.class);
_resolver = new SearchAcrossLineageResolver(_entityClient, _entityRegistry);
}

@Test
public void testAllEntitiesInitialization() {
InputStream inputStream = ClassLoader.getSystemResourceAsStream("entity-registry.yml");
EntityRegistry entityRegistry = new ConfigEntityRegistry(inputStream);
SearchAcrossLineageResolver resolver =
new SearchAcrossLineageResolver(_entityClient, entityRegistry);
assertTrue(resolver._allEntities.contains("dataset"));
assertTrue(resolver._allEntities.contains("dataFlow"));
// Test for case sensitivity
assertFalse(resolver._allEntities.contains("dataflow"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public void testDefaultSearchFlags() throws Exception {
.setSkipAggregates(false)
.setSkipHighlighting(true) // empty/wildcard
.setMaxAggValues(20)
.setSkipCache(false));
.setSkipCache(false)
.setConvertSchemaFieldsToDatasets(true));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this flag intentional

}

@Test
Expand Down Expand Up @@ -82,7 +83,8 @@ public void testOverrideSearchFlags() throws Exception {
.setSkipAggregates(true)
.setSkipHighlighting(true)
.setMaxAggValues(10)
.setSkipCache(true));
.setSkipCache(true)
.setConvertSchemaFieldsToDatasets(true));
}

@Test
Expand Down Expand Up @@ -112,7 +114,8 @@ public void testNonWildCardSearchFlags() throws Exception {
.setSkipAggregates(false)
.setSkipHighlighting(false) // empty/wildcard
.setMaxAggValues(20)
.setSkipCache(false));
.setSkipCache(false)
.setConvertSchemaFieldsToDatasets(true));
}

private EntityClient initMockSearchEntityClient() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.EntityService;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ReindexDataJobViaNodesCLLConfig {

@Bean
public ReindexDataJobViaNodesCLL _reindexDataJobViaNodesCLL(EntityService<?> entityService) {
return new ReindexDataJobViaNodesCLL(entityService);
}
}
Loading
Loading