Skip to content

Commit 7c2ea0c

Browse files
committed
fix connector
1 parent 2f4a8a6 commit 7c2ea0c

File tree

1 file changed

+8
-19
lines changed
  • metadata-ingestion/src/datahub/ingestion/source

1 file changed

+8
-19
lines changed

metadata-ingestion/src/datahub/ingestion/source/mlflow.py

+8-19
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
VersionTagClass,
3434
DataProcessInstanceRunEventClass,
3535
DataProcessInstancePropertiesClass,
36-
DataProcessInstanceRelationshipsClass,
3736
ContainerPropertiesClass,
3837
TimeStampClass,
3938
DataProcessRunStatusClass,
@@ -44,7 +43,7 @@
4443
MLTrainingRunPropertiesClass,
4544
DataProcessInstanceRunResultClass,
4645
)
47-
from datahub.metadata.urns import DatasetUrn, DataPlatformUrn, MlModelUrn, MlModelGroupUrn, DataProcessInstanceUrn, DataPlatformInstanceUrn
46+
from datahub.metadata.urns import DataPlatformUrn
4847
from datahub.api.entities.dataprocess.dataprocess_instance import (
4948
DataProcessInstance,
5049
)
@@ -161,7 +160,7 @@ def get_report(self) -> SourceReport:
161160
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
162161
yield from self._get_tags_workunits()
163162
yield from self._get_ml_model_workunits()
164-
# yield from self._get_experiment_workunits()
163+
yield from self._get_experiment_workunits()
165164

166165
def _get_tags_workunits(self) -> Iterable[MetadataWorkUnit]:
167166
for stage_info in self.registered_model_stages_info:
@@ -208,20 +207,17 @@ def _get_experiment_custom_properties(self, experiment):
208207
return experiment_custom_props
209208

210209
def _get_experiment_container_workunit(self, experiment: Experiment) -> List[MetadataWorkUnit]:
211-
experiment = Container(
210+
experiment_container = Container(
212211
key=ContainerKeyWithId(
213212
platform=str(DataPlatformUrn.create_from_id("mlflow")),
214-
id=experiment.name
213+
id=experiment.name,
215214
),
216215
subtype="ML Experiment",
217216
name=experiment.name,
218217
description=experiment.tags.get('mlflow.note.content')
219-
) # TODO: urn should be experiment id
220-
221-
# print("experiment.key.id:", experiment.key.id) # this should be same as container key as urn
222-
# print("experiment.key.as_urn(): ", experiment.key.as_urn())
218+
) # TODO: this currently generates a GUID-based urn; should it be derived from the experiment's id instead?
223219

224-
workunits = [mcp.as_workunit() for mcp in experiment.generate_mcp()]
220+
workunits = [mcp.as_workunit() for mcp in experiment_container.generate_mcp()]
225221
return workunits
226222

227223

@@ -253,15 +249,7 @@ def _get_run_workunits(self, experiment: Experiment, run: Run) -> List[MetadataW
253249
data_process_instance = DataProcessInstance.from_container(
254250
container_key=experiment_key,
255251
id=run.info.run_name
256-
)
257-
258-
# TODO: urn should be run id
259-
# print("dpi id", run.info.run_name)
260-
# print("experiment_key.id:", experiment_key.id)
261-
# print("run id", run.info.run_id)
262-
# print("data_proceess_instance.urn:", str(data_process_instance.urn))
263-
# print("--------------------")
264-
252+
) # TODO: this currently generates a GUID-based urn; should it be derived from run.info.run_id instead?
265253
workunits = []
266254

267255
run_custom_props = self._get_run_custom_properties(run)
@@ -290,6 +278,7 @@ def _get_run_workunits(self, experiment: Experiment, run: Run) -> List[MetadataW
290278
hyperParams=hyperparams,
291279
trainingMetrics=metrics,
292280
outputUrls=[run.info.artifact_uri],
281+
id=run.info.run_id,
293282
)
294283
).as_workunit()
295284
)

0 commit comments

Comments
 (0)