Skip to content

Commit 3347258

Browse files
committed
Adding DataProcessInputOutput Class to fix DPI lineage
1 parent 4e1ff8d commit 3347258

File tree

3 files changed

+151
-75
lines changed

3 files changed

+151
-75
lines changed

metadata-ingestion/src/datahub/ingestion/source/vertexai.py

+37-3
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,13 @@
4747
ContainerClass,
4848
DataPlatformInstanceClass,
4949
DataProcessInstanceInputClass,
50+
DataProcessInstanceOutputClass,
5051
DataProcessInstancePropertiesClass,
5152
DataProcessInstanceRunEventClass,
5253
DataProcessInstanceRunResultClass,
5354
DataProcessRunStatusClass,
5455
DatasetPropertiesClass,
56+
EdgeClass,
5557
MetadataAttributionClass,
5658
MLHyperParamClass,
5759
MLMetricClass,
@@ -306,6 +308,12 @@ def _gen_experiment_run_mcps(
306308
if isinstance(run_result_type, RunResultTypeClass)
307309
and created_time is not None
308310
else None,
311+
DataProcessInstanceOutputClass(
312+
outputs=[],
313+
outputEdges=[
314+
EdgeClass(destinationUrn=experiment_key.as_urn()),
315+
],
316+
),
309317
],
310318
)
311319

@@ -434,6 +442,17 @@ def _gen_training_job_mcps(
434442
if job_meta.input_dataset
435443
else None
436444
)
445+
# If Training Job has Output Model
446+
model_urn = (
447+
self._make_ml_model_urn(
448+
model_version=job_meta.output_model_version,
449+
model_name=self._make_vertexai_model_name(
450+
entity_id=job_meta.output_model.name
451+
),
452+
)
453+
if job_meta.output_model and job_meta.output_model_version
454+
else None
455+
)
437456

438457
yield from MetadataChangeProposalWrapper.construct_many(
439458
job_urn,
@@ -455,9 +474,22 @@ def _gen_training_job_mcps(
455474
SubTypesClass(typeNames=[MLAssetSubTypes.VERTEX_TRAINING_JOB]),
456475
ContainerClass(container=self._get_project_container().as_urn()),
457476
DataPlatformInstanceClass(platform=str(DataPlatformUrn(self.platform))),
458-
DataProcessInstanceInputClass(inputs=[dataset_urn])
477+
DataProcessInstanceInputClass(
478+
inputs=[],
479+
inputEdges=[
480+
EdgeClass(destinationUrn=dataset_urn),
481+
],
482+
)
459483
if dataset_urn
460484
else None,
485+
DataProcessInstanceOutputClass(
486+
outputs=[],
487+
outputEdges=[
488+
EdgeClass(destinationUrn=model_urn),
489+
],
490+
)
491+
if model_urn
492+
else None,
461493
],
462494
)
463495

@@ -593,7 +625,7 @@ def _get_input_dataset_mcps(
593625
ContainerClass(container=self._get_project_container().as_urn()),
594626
DataPlatformInstanceClass(
595627
platform=str(DataPlatformUrn(self.platform))
596-
),
628+
)
597629
],
598630
)
599631

@@ -770,7 +802,9 @@ def _gen_ml_model_mcps(
770802
versionTag=str(model_version.version_id),
771803
metadataAttribution=(
772804
MetadataAttributionClass(
773-
time=int(model_version.version_create_time.timestamp() * 1000),
805+
time=int(
806+
model_version.version_create_time.timestamp() * 1000
807+
),
774808
actor="urn:li:corpuser:datahub",
775809
)
776810
if model_version.version_create_time

0 commit comments

Comments
 (0)