Skip to content

Commit 8e54b33

Browse files
committed
refactor mcp.construct_many for aspects
1 parent ba6585b commit 8e54b33

File tree

1 file changed

+120
-143
lines changed
  • metadata-ingestion/src/datahub/ingestion/source

1 file changed

+120
-143
lines changed

metadata-ingestion/src/datahub/ingestion/source/vertexai.py

+120-143
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
from pydantic.fields import Field
2121

2222
import datahub.emitter.mce_builder as builder
23-
from datahub._codegen.aspect import _Aspect
2423
from datahub.configuration.source_common import EnvConfigMixin
2524
from datahub.emitter.mcp import MetadataChangeProposalWrapper
2625
from datahub.emitter.mcp_builder import ProjectIdKey, gen_containers
@@ -275,44 +274,43 @@ def _gen_training_job_mcps(
275274
)
276275
created_actor = "urn:li:corpuser:datahub"
277276

278-
aspects: List[_Aspect] = list()
279-
aspects.append(
280-
DataProcessInstancePropertiesClass(
281-
name=job_id,
282-
created=AuditStampClass(
283-
time=created_time,
284-
actor=created_actor,
285-
),
286-
externalUrl=self._make_job_external_url(job),
287-
customProperties={
288-
"displayName": job.display_name,
289-
"jobType": job.__class__.__name__,
290-
},
291-
)
292-
)
293-
aspects.append(
294-
MLTrainingRunProperties(
295-
externalUrl=self._make_job_external_url(job), id=job.name
296-
)
297-
)
298-
aspects.append(SubTypesClass(typeNames=[MLTypes.TRAINING_JOB]))
299-
aspects.append(ContainerClass(container=self._get_project_container().as_urn()))
300-
301277
# If Training job has Input Dataset
302-
if job_meta.input_dataset:
303-
dataset_urn = builder.make_dataset_urn(
278+
dataset_urn = (
279+
builder.make_dataset_urn(
304280
platform=self.platform,
305281
name=self._make_vertexai_dataset_name(
306282
entity_id=job_meta.input_dataset.name
307283
),
308284
env=self.config.env,
309285
)
310-
aspects.append(
311-
DataProcessInstanceInputClass(inputs=[dataset_urn]),
312-
)
286+
if job_meta.input_dataset
287+
else None
288+
)
313289

314290
yield from MetadataChangeProposalWrapper.construct_many(
315-
job_urn, aspects=aspects
291+
job_urn,
292+
aspects=[
293+
DataProcessInstancePropertiesClass(
294+
name=job_id,
295+
created=AuditStampClass(
296+
time=created_time,
297+
actor=created_actor,
298+
),
299+
externalUrl=self._make_job_external_url(job),
300+
customProperties={
301+
"displayName": job.display_name,
302+
"jobType": job.__class__.__name__,
303+
},
304+
),
305+
MLTrainingRunProperties(
306+
externalUrl=self._make_job_external_url(job), id=job.name
307+
),
308+
SubTypesClass(typeNames=[MLTypes.TRAINING_JOB]),
309+
ContainerClass(container=self._get_project_container().as_urn()),
310+
DataProcessInstanceInputClass(inputs=[dataset_urn])
311+
if dataset_urn
312+
else None,
313+
],
316314
)
317315

318316
def _gen_ml_group_mcps(
@@ -324,31 +322,28 @@ def _gen_ml_group_mcps(
324322
"""
325323
ml_model_group_urn = self._make_ml_model_group_urn(model)
326324

327-
aspects: List[_Aspect] = list()
328-
aspects.append(
329-
MLModelGroupPropertiesClass(
330-
name=self._make_vertexai_model_group_name(model.name),
331-
description=model.description,
332-
created=(
333-
TimeStampClass(time=datetime_to_ts_millis(model.create_time))
334-
if model.create_time
335-
else None
336-
),
337-
lastModified=(
338-
TimeStampClass(time=datetime_to_ts_millis(model.update_time))
339-
if model.update_time
340-
else None
341-
),
342-
customProperties={"displayName": model.display_name},
343-
)
344-
)
345-
346-
# TODO add following when metadata model for mlgroup is updated (these aspects not supported currently)
347-
# aspects.append(SubTypesClass(typeNames=[MLTypes.MODEL_GROUP]))
348-
# aspects.append(ContainerClass(container=self._get_project_container().as_urn()))
349-
350325
yield from MetadataChangeProposalWrapper.construct_many(
351-
ml_model_group_urn, aspects=aspects
326+
ml_model_group_urn,
327+
aspects=[
328+
MLModelGroupPropertiesClass(
329+
name=self._make_vertexai_model_group_name(model.name),
330+
description=model.description,
331+
created=(
332+
TimeStampClass(time=datetime_to_ts_millis(model.create_time))
333+
if model.create_time
334+
else None
335+
),
336+
lastModified=(
337+
TimeStampClass(time=datetime_to_ts_millis(model.update_time))
338+
if model.update_time
339+
else None
340+
),
341+
customProperties={"displayName": model.display_name},
342+
),
343+
# TODO add following when metadata model for mlgroup is updated (these aspects not supported currently)
344+
# SubTypesClass(typeNames=[MLTypes.MODEL_GROUP]),
345+
# ContainerClass(container=self._get_project_container().as_urn())
346+
],
352347
)
353348

354349
def _make_ml_model_group_urn(self, model: Model) -> str:
@@ -421,32 +416,26 @@ def _get_input_dataset_mcps(
421416
env=self.config.env,
422417
)
423418

424-
# Create aspects for the dataset
425-
aspects: List[_Aspect] = list()
426-
aspects.append(
427-
DatasetPropertiesClass(
428-
name=self._make_vertexai_dataset_name(ds.name),
429-
created=(
430-
TimeStampClass(time=datetime_to_ts_millis(ds.create_time))
431-
if ds.create_time
432-
else None
433-
),
434-
description=f"Dataset: {ds.display_name}",
435-
customProperties={
436-
"displayName": ds.display_name,
437-
"resourceName": ds.resource_name,
438-
},
439-
qualifiedName=ds.resource_name,
440-
)
441-
)
442-
443-
aspects.append(SubTypesClass(typeNames=[MLTypes.DATASET]))
444-
# Create a container for Project as parent of the dataset
445-
aspects.append(
446-
ContainerClass(container=self._get_project_container().as_urn())
447-
)
448419
yield from MetadataChangeProposalWrapper.construct_many(
449-
dataset_urn, aspects=aspects
420+
dataset_urn,
421+
aspects=[
422+
DatasetPropertiesClass(
423+
name=self._make_vertexai_dataset_name(ds.name),
424+
created=(
425+
TimeStampClass(time=datetime_to_ts_millis(ds.create_time))
426+
if ds.create_time
427+
else None
428+
),
429+
description=f"Dataset: {ds.display_name}",
430+
customProperties={
431+
"displayName": ds.display_name,
432+
"resourceName": ds.resource_name,
433+
},
434+
qualifiedName=ds.resource_name,
435+
),
436+
SubTypesClass(typeNames=[MLTypes.DATASET]),
437+
ContainerClass(container=self._get_project_container().as_urn()),
438+
],
450439
)
451440

452441
def _get_training_job_metadata(
@@ -521,26 +510,21 @@ def _gen_endpoints_mcps(
521510
env=self.config.env,
522511
)
523512

524-
aspects: List[_Aspect] = list()
525-
aspects.append(
526-
MLModelDeploymentPropertiesClass(
527-
description=model.description,
528-
createdAt=datetime_to_ts_millis(endpoint.create_time),
529-
version=VersionTagClass(
530-
versionTag=str(model_version.version_id)
531-
),
532-
customProperties={"displayName": endpoint.display_name},
533-
)
534-
)
535-
536-
# TODO add followings when metadata for MLModelDeployment is updated (these aspects not supported currently)
537-
# aspects.append(
538-
# ContainerClass(container=self._get_project_container().as_urn())
539-
# )
540-
# aspects.append(SubTypesClass(typeNames=[MLTypes.ENDPOINT]))
541-
542513
yield from MetadataChangeProposalWrapper.construct_many(
543-
endpoint_urn, aspects=aspects
514+
endpoint_urn,
515+
aspects=[
516+
MLModelDeploymentPropertiesClass(
517+
description=model.description,
518+
createdAt=datetime_to_ts_millis(endpoint.create_time),
519+
version=VersionTagClass(
520+
versionTag=str(model_version.version_id)
521+
),
522+
customProperties={"displayName": endpoint.display_name},
523+
),
524+
# TODO add followings when metadata for MLModelDeployment is updated (these aspects not supported currently)
525+
# ContainerClass(container=self._get_project_container().as_urn()),
526+
# SubTypesClass(typeNames=[MLTypes.ENDPOINT])
527+
],
544528
)
545529

546530
def _gen_ml_model_mcps(
@@ -580,52 +564,45 @@ def _gen_ml_model_mcps(
580564
model_version_name = f"{model_name}_{model_version.version_id}"
581565
model_urn = self._make_ml_model_urn(model_version, model_name=model_name)
582566

583-
# Create aspects for ML Model
584-
aspects: List[_Aspect] = list()
585-
586-
aspects.append(
587-
MLModelPropertiesClass(
588-
name=model_version_name,
589-
description=model_version.version_description,
590-
customProperties={
591-
"displayName": f"{model_version.model_display_name}",
592-
"versionId": f"{model_version.version_id}",
593-
"resourceName": model.resource_name,
594-
},
595-
created=(
596-
TimeStampClass(
597-
datetime_to_ts_millis(model_version.version_create_time)
598-
)
599-
if model_version.version_create_time
600-
else None
601-
),
602-
lastModified=(
603-
TimeStampClass(
604-
datetime_to_ts_millis(model_version.version_update_time)
605-
)
606-
if model_version.version_update_time
607-
else None
608-
),
609-
version=VersionTagClass(versionTag=str(model_version.version_id)),
610-
groups=[model_group_urn], # link model version to model group
611-
trainingJobs=(
612-
[training_job_urn] if training_job_urn else None
613-
), # link to training job
614-
deployments=endpoint_urns,
615-
externalUrl=self._make_model_version_external_url(model),
616-
type="ML Model",
617-
)
618-
)
619-
620-
# TODO Add a container for Project as parent of the dataset
621-
# aspects.append(
622-
# ContainerClass(
623-
# container=self._get_project_container().as_urn(),
624-
# )
625-
# )
626-
627567
yield from MetadataChangeProposalWrapper.construct_many(
628-
entityUrn=model_urn, aspects=aspects
568+
entityUrn=model_urn,
569+
aspects=[
570+
MLModelPropertiesClass(
571+
name=model_version_name,
572+
description=model_version.version_description,
573+
customProperties={
574+
"displayName": f"{model_version.model_display_name}",
575+
"versionId": f"{model_version.version_id}",
576+
"resourceName": model.resource_name,
577+
},
578+
created=(
579+
TimeStampClass(
580+
datetime_to_ts_millis(model_version.version_create_time)
581+
)
582+
if model_version.version_create_time
583+
else None
584+
),
585+
lastModified=(
586+
TimeStampClass(
587+
datetime_to_ts_millis(model_version.version_update_time)
588+
)
589+
if model_version.version_update_time
590+
else None
591+
),
592+
version=VersionTagClass(versionTag=str(model_version.version_id)),
593+
groups=[model_group_urn], # link model version to model group
594+
trainingJobs=(
595+
[training_job_urn] if training_job_urn else None
596+
), # link to training job
597+
deployments=endpoint_urns,
598+
externalUrl=self._make_model_version_external_url(model),
599+
type="ML Model",
600+
),
601+
# TODO Add a container for Project as parent of the dataset
602+
# ContainerClass(
603+
# container=self._get_project_container().as_urn(),
604+
# )
605+
],
629606
)
630607

631608
def _search_endpoint(self, model: Model) -> List[Endpoint]:

0 commit comments

Comments
 (0)