25
25
from datahub ._codegen .aspect import _Aspect
26
26
from datahub .configuration .source_common import EnvConfigMixin
27
27
from datahub .emitter .mcp import MetadataChangeProposalWrapper
28
- from datahub .emitter .mcp_builder import ProjectIdKey , gen_containers
28
+ from datahub .emitter .mcp_builder import ContainerKey , ProjectIdKey , gen_containers
29
29
from datahub .ingestion .api .common import PipelineContext
30
30
from datahub .ingestion .api .decorators import (
31
31
SupportStatus ,
41
41
from datahub .ingestion .source .mlflow import ContainerKeyWithId
42
42
from datahub .metadata ._schema_classes import MLHyperParamClass , MLMetricClass
43
43
from datahub .metadata ._urns .urn_defs import DataPlatformUrn
44
+ from datahub .metadata ._schema_classes import VersionPropertiesClass , MetadataAttributionClass
45
+ from datahub .metadata .urns import VersionSetUrn ,MlModelUrn
44
46
from datahub .metadata .com .linkedin .pegasus2avro .ml .metadata import (
45
47
MLTrainingRunProperties ,
46
48
)
54
56
DataProcessInstanceRunResultClass ,
55
57
DataProcessRunStatusClass ,
56
58
DatasetPropertiesClass ,
59
+ MLHyperParamClass ,
60
+ MLMetricClass ,
57
61
MLModelDeploymentPropertiesClass ,
58
62
MLModelGroupPropertiesClass ,
59
63
MLModelPropertiesClass ,
63
67
TimeStampClass ,
64
68
VersionTagClass ,
65
69
)
70
+ from datahub .metadata .urns import DataPlatformUrn
66
71
from datahub .utilities .str_enum import StrEnum
67
72
from datahub .utilities .time import datetime_to_ts_millis
68
73
@@ -155,6 +160,7 @@ def __init__(self, ctx: PipelineContext, config: VertexAIConfig):
155
160
self .client = aiplatform
156
161
self .endpoints : Optional [Dict [str , List [Endpoint ]]] = None
157
162
self .datasets : Optional [Dict [str , VertexAiResourceNoun ]] = None
163
+ self .experiments : Optional [List [Experiment ]] = None
158
164
159
165
def get_report (self ) -> SourceReport :
160
166
return self .report
@@ -173,9 +179,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
173
179
# Fetch and Ingest Training Jobs
174
180
yield from auto_workunit (self ._get_training_jobs_mcps ())
175
181
# Fetch and Ingest Experiments
176
- yield from self ._get_experiments_workunits ()
182
+ # yield from self._get_experiments_workunits()
177
183
# Fetch and Ingest Experiment Runs
178
- yield from auto_workunit (self ._get_experiment_runs_mcps ())
184
+ # yield from auto_workunit(self._get_experiment_runs_mcps())
179
185
180
186
def _get_experiments_workunits (self ) -> Iterable [MetadataWorkUnit ]:
181
187
# List all experiments
@@ -225,12 +231,22 @@ def _get_experiment_run_metrics(self, run: ExperimentRun) -> List[MLMetricClass]
225
231
MLMetricClass (name = k , value = str (v )) for k , v in run .get_metrics ().items ()
226
232
]
227
233
228
- def _get_create_time_from_run (self , run : ExperimentRun ) -> Optional [int ]:
234
+ def _get_run_create_time_duration (self , run : ExperimentRun ) -> ( Optional [int ], Optional [ int ]) :
229
235
executions = run .get_executions ()
230
236
if len (executions ) == 0 :
231
- return None
237
+ return None , None
232
238
min_create_time = min ([exec .create_time for exec in executions ])
233
- return int (min_create_time .timestamp () * 1000 )
239
+ max_upload_time = max ([exec .update_time for exec in executions ])
240
+ create_time_millis = int (min_create_time .timestamp () * 1000 )
241
+ duration = max_upload_time .timestamp () * 1000 - create_time_millis
242
+ return create_time_millis , duration
243
+
244
+ def _get_run_duration_millis (self , run : ExperimentRun ) -> Optional [int ]:
245
+ executions = run .get_executions ()
246
+ if len (executions ) == 0 :
247
+ return None
248
+ max_upload_time = max ([exec .update_time for exec in executions ])
249
+
234
250
235
251
def _get_run_result_status (self , status : str ) -> Union [str , RunResultTypeClass ]:
236
252
if status == "COMPLETE" :
@@ -268,7 +284,7 @@ def _gen_experiment_run_mcps(
268
284
)
269
285
run_urn = builder .make_data_process_instance_urn (run_name )
270
286
271
- created_time = self ._get_create_time_from_run (run )
287
+ created_time , duration = self ._get_run_create_time_duration (run )
272
288
created_actor = f"urn:li:platformResource:{ self .platform } "
273
289
274
290
aspects : List [_Aspect ] = list ()
@@ -298,8 +314,10 @@ def _gen_experiment_run_mcps(
298
314
)
299
315
)
300
316
317
+ state = run .get_state ()
301
318
run_result_type = self ._get_run_result_status (run .get_state ())
302
319
if isinstance (run_result_type , RunResultTypeClass ) and created_time is not None :
320
+
303
321
aspects .append (
304
322
DataProcessInstanceRunEventClass (
305
323
status = DataProcessRunStatusClass .STARTED ,
@@ -308,6 +326,7 @@ def _gen_experiment_run_mcps(
308
326
type = self ._get_run_result_status (run .get_state ()),
309
327
nativeResultType = self .platform ,
310
328
),
329
+ durationMillis = duration ,
311
330
)
312
331
)
313
332
@@ -729,6 +748,7 @@ def _gen_ml_model_mcps(
729
748
model_version_name = f"{ model_name } _{ model_version .version_id } "
730
749
model_urn = self ._make_ml_model_urn (model_version , model_name = model_name )
731
750
751
+
732
752
yield from MetadataChangeProposalWrapper .construct_many (
733
753
entityUrn = model_urn ,
734
754
aspects = [
@@ -768,9 +788,31 @@ def _gen_ml_model_mcps(
768
788
container = self ._get_project_container ().as_urn (),
769
789
),
770
790
SubTypesClass (typeNames = [MLTypes .MODEL ]),
791
+ VersionPropertiesClass (
792
+ version = VersionTagClass (
793
+ versionTag = str (model_version .version_id ),
794
+ metadataAttribution = MetadataAttributionClass (
795
+ time = int (model_version .version_create_time .timestamp () * 1000 ),
796
+ actor = "urn:li:corpuser:datahub" ,
797
+ ),
798
+ ),
799
+ versionSet = str (self ._get_version_set_urn (model )),
800
+ sortId = str (model_version .version_id ).zfill (10 ),
801
+ # aliases=[
802
+ # VersionTagClass(versionTag=alias) for alias in model_version.aliases
803
+ # ],
804
+ )
771
805
],
772
806
)
773
807
808
+ def _get_version_set_urn (self , model : Model ) -> VersionSetUrn :
809
+ guid_dict = {"platform" : self .platform , "name" : model .name }
810
+ version_set_urn = VersionSetUrn (
811
+ id = builder .datahub_guid (guid_dict ),
812
+ entity_type = MlModelUrn .ENTITY_TYPE ,
813
+ )
814
+ return version_set_urn
815
+
774
816
def _search_endpoint (self , model : Model ) -> List [Endpoint ]:
775
817
"""
776
818
Search for an endpoint associated with the model.
0 commit comments