feat(models): update mlflow-related mappers #12263

Merged · 19 commits · Jan 13, 2025
File: GmsGraphQLEngine.java
@@ -56,6 +56,7 @@
import com.linkedin.datahub.graphql.generated.DataJobInputOutput;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.DataQualityContract;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.DatasetStatsSummary;
@@ -346,6 +347,7 @@
import com.linkedin.datahub.graphql.types.datajob.DataJobType;
import com.linkedin.datahub.graphql.types.dataplatform.DataPlatformType;
import com.linkedin.datahub.graphql.types.dataplatforminstance.DataPlatformInstanceType;
import com.linkedin.datahub.graphql.types.dataprocessinst.DataProcessInstanceType;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceRunEventMapper;
import com.linkedin.datahub.graphql.types.dataproduct.DataProductType;
import com.linkedin.datahub.graphql.types.dataset.DatasetType;
@@ -530,6 +532,7 @@ public class GmsGraphQLEngine {
private final FormType formType;
private final IncidentType incidentType;
private final RestrictedType restrictedType;
private final DataProcessInstanceType dataProcessInstanceType;

private final int graphQLQueryComplexityLimit;
private final int graphQLQueryDepthLimit;
@@ -649,6 +652,7 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
this.formType = new FormType(entityClient);
this.incidentType = new IncidentType(entityClient);
this.restrictedType = new RestrictedType(entityClient, restrictedService);
this.dataProcessInstanceType = new DataProcessInstanceType(entityClient, featureFlags);

this.graphQLQueryComplexityLimit = args.graphQLQueryComplexityLimit;
this.graphQLQueryDepthLimit = args.graphQLQueryDepthLimit;
@@ -699,7 +703,8 @@ public GmsGraphQLEngine(final GmsGraphQLEngineArgs args) {
formType,
incidentType,
restrictedType,
businessAttributeType));
businessAttributeType,
dataProcessInstanceType));
this.loadableTypes = new ArrayList<>(entityTypes);
// Extend loadable types with types from the plugins
// This allows us to offer search and browse capabilities out of the box for
@@ -1024,6 +1029,7 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("tag", getResolver(tagType))
.dataFetcher("dataFlow", getResolver(dataFlowType))
.dataFetcher("dataJob", getResolver(dataJobType))
.dataFetcher("dataProcessInstance", getResolver(dataProcessInstanceType))
.dataFetcher("glossaryTerm", getResolver(glossaryTermType))
.dataFetcher("glossaryNode", getResolver(glossaryNodeType))
.dataFetcher("domain", getResolver((domainType)))
@@ -3058,6 +3064,35 @@ private void configureDataProcessInstanceResolvers(final RuntimeWiring.Builder b
"DataProcessInstance",
typeWiring ->
typeWiring
.dataFetcher(
"dataPlatformInstance",
new LoadableTypeResolver<>(
dataPlatformInstanceType,
(env) -> {
final DataProcessInstance dataProcessInstance = env.getSource();
return dataProcessInstance.getDataPlatformInstance() != null
? dataProcessInstance.getDataPlatformInstance().getUrn()
: null;
}))
.dataFetcher(
"platform",
new LoadableTypeResolver<>(
dataPlatformType,
(env) -> {
final DataProcessInstance dataProcessInstance = env.getSource();
return dataProcessInstance.getPlatform() != null
? dataProcessInstance.getPlatform().getUrn()
: null;
}))
.dataFetcher("parentContainers", new ParentContainersResolver(entityClient))
.dataFetcher(
"container",
new LoadableTypeResolver<>(
containerType,
(env) -> {
final DataProcessInstance dpi = env.getSource();
return dpi.getContainer() != null ? dpi.getContainer().getUrn() : null;
}))
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient))
.dataFetcher(
"lineage",
…
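
Each keyProvider above only hands a URN to the batch loader when the edge is actually present on the source entity. A minimal standalone sketch of that pattern (hypothetical snippet, not part of the PR; it reuses the generated builder shown in the mapper diff below and assumes the usual generated POJO setters):

import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.EntityType;

public class KeyProviderSketch {
  public static void main(String[] args) {
    DataProcessInstance dpi = new DataProcessInstance();
    dpi.setPlatform(
        DataPlatform.builder()
            .setUrn("urn:li:dataPlatform:mlflow")
            .setType(EntityType.DATA_PLATFORM)
            .build());
    // Mirrors the "platform" keyProvider: extract the URN, or null when the edge is unset.
    String key = dpi.getPlatform() != null ? dpi.getPlatform().getUrn() : null;
    System.out.println(key); // urn:li:dataPlatform:mlflow
  }
}
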
File: DataPlatformInstanceAspectMapper.java
@@ -1,6 +1,7 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
@@ -28,6 +29,11 @@ public DataPlatformInstance apply(
result.setType(EntityType.DATA_PLATFORM_INSTANCE);
result.setUrn(input.getInstance().toString());
}
result.setPlatform(
DataPlatform.builder()
.setUrn(input.getPlatform().toString())
.setType(EntityType.DATA_PLATFORM)
.build());
return result;
}
}
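
With this change the mapper always carries the platform URN through, even when no instance URN is set. A hedged usage sketch (assuming this mapper follows the codebase's usual static map(context, input) convention and the aspect's generated setPlatform setter; the mlflow URN is illustrative):

import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.types.common.mappers.DataPlatformInstanceAspectMapper;

public class DataPlatformInstanceAspectMapperExample {
  public static void main(String[] args) {
    // The GMS aspect; fully qualified to avoid clashing with the generated GraphQL class.
    com.linkedin.common.DataPlatformInstance aspect =
        new com.linkedin.common.DataPlatformInstance();
    aspect.setPlatform(UrnUtils.getUrn("urn:li:dataPlatform:mlflow"));

    DataPlatformInstance mapped = DataPlatformInstanceAspectMapper.map(null, aspect);
    // The GraphQL object now exposes a typed platform reference.
    System.out.println(mapped.getPlatform().getUrn()); // urn:li:dataPlatform:mlflow
  }
}
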
File: TimeStampToAuditStampMapper.java
@@ -0,0 +1,24 @@
package com.linkedin.datahub.graphql.types.common.mappers;

import com.linkedin.common.TimeStamp;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.AuditStamp;
import javax.annotation.Nullable;

public class TimeStampToAuditStampMapper {

public static final TimeStampToAuditStampMapper INSTANCE = new TimeStampToAuditStampMapper();

public static AuditStamp map(
@Nullable final QueryContext context, @Nullable final TimeStamp input) {
if (input == null) {
return null;
}
final AuditStamp result = new AuditStamp();
result.setTime(input.getTime());
if (input.hasActor()) {
result.setActor(input.getActor().toString());
}
return result;
}
}
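
The new helper is null-safe on both the input and its optional actor. A quick sketch of the expected behavior (hypothetical standalone example; the epoch value and actor URN are illustrative, and the TimeStamp setters are the generated pegasus ones):

import com.linkedin.common.TimeStamp;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.generated.AuditStamp;
import com.linkedin.datahub.graphql.types.common.mappers.TimeStampToAuditStampMapper;

public class TimeStampToAuditStampMapperExample {
  public static void main(String[] args) {
    TimeStamp input = new TimeStamp();
    input.setTime(1736700000000L);
    input.setActor(UrnUtils.getUrn("urn:li:corpuser:datahub"));

    AuditStamp stamp = TimeStampToAuditStampMapper.map(null, input);
    System.out.println(stamp.getTime() + " by " + stamp.getActor());

    // A null input short-circuits to null rather than throwing.
    System.out.println(TimeStampToAuditStampMapper.map(null, null)); // null
  }
}
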
File: UrnToEntityMapper.java
@@ -18,6 +18,7 @@
import com.linkedin.datahub.graphql.generated.DataJob;
import com.linkedin.datahub.graphql.generated.DataPlatform;
import com.linkedin.datahub.graphql.generated.DataPlatformInstance;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.DataProduct;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.Domain;
@@ -225,6 +226,11 @@ public Entity apply(@Nullable QueryContext context, Urn input) {
((BusinessAttribute) partialEntity).setUrn(input.toString());
((BusinessAttribute) partialEntity).setType(EntityType.BUSINESS_ATTRIBUTE);
}
if (input.getEntityType().equals(DATA_PROCESS_INSTANCE_ENTITY_NAME)) {
partialEntity = new DataProcessInstance();
((DataProcessInstance) partialEntity).setUrn(input.toString());
((DataProcessInstance) partialEntity).setType(EntityType.DATA_PROCESS_INSTANCE);
}
return partialEntity;
}
}
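
This gives relationship and lineage responses a typed stub for data process instance URNs instead of dropping them. A hedged sketch (assuming the mapper's usual static map(context, urn) entry point; the run URN is a made-up example):

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;

public class UrnToEntityMapperExample {
  public static void main(String[] args) {
    Urn urn = UrnUtils.getUrn("urn:li:dataProcessInstance:mlflow-run-123");
    Entity partial = UrnToEntityMapper.map(null, urn);
    // Only urn and type are populated here; hydration happens in DataProcessInstanceType.batchLoad.
    DataProcessInstance dpi = (DataProcessInstance) partial;
    System.out.println(dpi.getUrn() + " -> " + dpi.getType());
  }
}
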
File: DataProcessInstanceType.java
@@ -0,0 +1,102 @@
package com.linkedin.datahub.graphql.types.dataprocessinst;

import static com.linkedin.metadata.Constants.*;

import com.google.common.collect.ImmutableSet;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import graphql.execution.DataFetcherResult;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class DataProcessInstanceType
implements com.linkedin.datahub.graphql.types.EntityType<DataProcessInstance, String> {

public static final Set<String> ASPECTS_TO_FETCH =
ImmutableSet.of(
DATA_PROCESS_INSTANCE_KEY_ASPECT_NAME,
DATA_PLATFORM_INSTANCE_ASPECT_NAME,
DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME,
DATA_PROCESS_INSTANCE_INPUT_ASPECT_NAME,
DATA_PROCESS_INSTANCE_OUTPUT_ASPECT_NAME,
DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME,
TEST_RESULTS_ASPECT_NAME,
DATA_PROCESS_INSTANCE_RELATIONSHIPS_ASPECT_NAME,
ML_TRAINING_RUN_PROPERTIES_ASPECT_NAME,
SUB_TYPES_ASPECT_NAME,
CONTAINER_ASPECT_NAME);

private final EntityClient _entityClient;
private final FeatureFlags _featureFlags;

@Override
public EntityType type() {
return EntityType.DATA_PROCESS_INSTANCE;
}

@Override
public Function<Entity, String> getKeyProvider() {
return Entity::getUrn;
}

@Override
public Class<DataProcessInstance> objectClass() {
return DataProcessInstance.class;
}

@Override
public List<DataFetcherResult<DataProcessInstance>> batchLoad(
@Nonnull List<String> urns, @Nonnull QueryContext context) throws Exception {
final List<Urn> dataProcessInstanceUrns =
urns.stream().map(UrnUtils::getUrn).collect(Collectors.toList());

try {
Map<Urn, EntityResponse> entities = new HashMap<>();
if (_featureFlags.isDataProcessInstanceEntityEnabled()) {
entities =
_entityClient.batchGetV2(
context.getOperationContext(),
DATA_PROCESS_INSTANCE_ENTITY_NAME,
new HashSet<>(dataProcessInstanceUrns),
ASPECTS_TO_FETCH);
}

final List<EntityResponse> gmsResults = new ArrayList<>();
for (Urn urn : dataProcessInstanceUrns) {
if (_featureFlags.isDataProcessInstanceEntityEnabled()) {
gmsResults.add(entities.getOrDefault(urn, null));
}
}

return gmsResults.stream()
.map(
gmsResult ->
gmsResult == null
? null
: DataFetcherResult.<DataProcessInstance>newResult()
.data(DataProcessInstanceMapper.map(context, gmsResult))
.build())
.collect(Collectors.toList());

} catch (Exception e) {
throw new RuntimeException("Failed to load Data Process Instance entity", e);
}
}
}
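
Note the feature-flag gate in batchLoad: with isDataProcessInstanceEntityEnabled() off, no batchGetV2 call is made and the method returns an empty list. A hedged test-style sketch of that behavior (assuming Mockito is available, as it is elsewhere in this codebase):

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
import com.linkedin.datahub.graphql.types.dataprocessinst.DataProcessInstanceType;
import com.linkedin.entity.client.EntityClient;
import java.util.List;

public class DataProcessInstanceTypeFlagSketch {
  public static void main(String[] args) throws Exception {
    EntityClient entityClient = mock(EntityClient.class);
    FeatureFlags flags = mock(FeatureFlags.class);
    when(flags.isDataProcessInstanceEntityEnabled()).thenReturn(false);

    DataProcessInstanceType type = new DataProcessInstanceType(entityClient, flags);
    // With the flag disabled, entityClient is never called and nothing is hydrated.
    System.out.println(
        type.batchLoad(List.of("urn:li:dataProcessInstance:run-1"), mock(QueryContext.class)));
    // prints [] — callers see no DPI data until the flag is enabled
  }
}
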