39
39
StatefulIngestionConfigBase ,
40
40
StatefulIngestionSourceBase ,
41
41
)
42
- from datahub .metadata .com .linkedin .pegasus2avro .common import Siblings
43
42
from datahub .metadata .schema_classes import (
44
43
AuditStampClass ,
45
44
ContainerClass ,
63
62
TagAssociationClass ,
64
63
TagPropertiesClass ,
65
64
TimeStampClass ,
65
+ UpstreamClass ,
66
+ UpstreamLineageClass ,
66
67
VersionPropertiesClass ,
67
68
VersionTagClass ,
68
69
_Aspect ,
69
70
)
70
- from datahub .metadata .urns import DataPlatformUrn , DatasetUrn , MlModelUrn , VersionSetUrn
71
+ from datahub .metadata .urns import DataPlatformUrn , MlModelUrn , VersionSetUrn
71
72
from datahub .sdk .container import Container
72
73
from datahub .sdk .dataset import Dataset
73
74
@@ -207,14 +208,13 @@ def _create_workunit(self, urn: str, aspect: _Aspect) -> MetadataWorkUnit:
207
208
def _get_experiment_workunits (self ) -> Iterable [MetadataWorkUnit ]:
208
209
experiments = self ._get_mlflow_experiments ()
209
210
for experiment in experiments :
210
- if experiment .name == "lineage_platform_dataset_lineage_experiment" :
211
- yield from self ._get_experiment_container_workunit (experiment )
211
+ yield from self ._get_experiment_container_workunit (experiment )
212
212
213
- runs = self ._get_mlflow_runs_from_experiment (experiment )
214
- if runs :
215
- for run in runs :
216
- yield from self ._get_run_workunits (experiment , run )
217
- yield from self ._get_dataset_input_workunits (run )
213
+ runs = self ._get_mlflow_runs_from_experiment (experiment )
214
+ if runs :
215
+ for run in runs :
216
+ yield from self ._get_run_workunits (experiment , run )
217
+ yield from self ._get_dataset_input_workunits (run )
218
218
219
219
def _get_experiment_custom_properties (self , experiment ):
220
220
experiment_custom_props = getattr (experiment , "tags" , {}) or {}
@@ -271,7 +271,6 @@ def _get_dataset_schema(self, schema: str) -> Optional[List[Tuple[str, str]]]:
271
271
print ("Failed to parse schema JSON" )
272
272
return None
273
273
274
- # Check for mlflow_colspec and extract field information
275
274
if "mlflow_colspec" in schema_dict :
276
275
try :
277
276
return [
@@ -281,12 +280,9 @@ def _get_dataset_schema(self, schema: str) -> Optional[List[Tuple[str, str]]]:
281
280
except (KeyError , TypeError ):
282
281
return None
283
282
284
- # If we reach here, schema doesn't have the expected structure
285
283
return None
286
284
287
285
def _get_dataset_platform_from_source_type (self , source_type ):
288
- # source_type_to_platform = {}
289
- # TODO: add ingestion config for this
290
286
if source_type == "gs" :
291
287
return "gcs"
292
288
return source_type
@@ -318,40 +314,36 @@ def _get_dataset_input_workunits(self, run: Run) -> Iterable[MetadataWorkUnit]:
318
314
dataset_reference_urns .append (str (local_dataset_reference .urn ))
319
315
320
316
else :
321
- # workaround for setting siblings
322
- hosted_dataset_reference_urn = DatasetUrn .create_from_ids (
323
- platform_id = self .platform , table_name = dataset .name , env = "PROD"
324
- )
325
317
hosted_dataset = Dataset (
326
318
platform = self ._get_dataset_platform_from_source_type (source_type ),
327
319
name = dataset .name ,
328
320
schema = formatted_schema ,
329
321
custom_properties = dataset_tags ,
330
- extra_aspects = [
331
- Siblings (
332
- primary = True , siblings = [str (hosted_dataset_reference_urn )]
333
- )
334
- ],
335
322
)
336
- # create dataset reference
337
323
hosted_dataset_reference = Dataset (
338
324
platform = self .platform ,
339
325
name = dataset .name ,
340
326
schema = formatted_schema ,
341
327
custom_properties = dataset_tags ,
342
- extra_aspects = [
343
- Siblings (primary = False , siblings = [str (hosted_dataset .urn )])
344
- ],
328
+ upstreams = UpstreamLineageClass (
329
+ upstreams = [
330
+ UpstreamClass (dataset = str (hosted_dataset .urn ), type = "COPY" )
331
+ ]
332
+ ),
345
333
)
346
334
dataset_reference_urns .append (str (hosted_dataset_reference .urn ))
347
335
348
336
yield from hosted_dataset .as_workunits ()
349
337
yield from hosted_dataset_reference .as_workunits ()
350
338
351
339
if dataset_reference_urns :
340
+ input_edges = [
341
+ EdgeClass (destinationUrn = dataset_referece_urn )
342
+ for dataset_referece_urn in dataset_reference_urns
343
+ ]
352
344
yield MetadataChangeProposalWrapper (
353
345
entityUrn = str (run_urn ),
354
- aspect = DataProcessInstanceInputClass (inputs = dataset_reference_urns ),
346
+ aspect = DataProcessInstanceInputClass (inputs = [], inputEdges = input_edges ),
355
347
).as_workunit ()
356
348
357
349
def _get_run_workunits (
@@ -405,12 +397,7 @@ def _get_run_workunits(
405
397
model_version_urn = self ._make_ml_model_urn (model_versions [0 ])
406
398
yield MetadataChangeProposalWrapper (
407
399
entityUrn = str (data_process_instance .urn ),
408
- aspect = DataProcessInstanceOutputClass (
409
- outputs = [],
410
- outputEdges = [
411
- EdgeClass (destinationUrn = model_version_urn ),
412
- ],
413
- ),
400
+ aspect = DataProcessInstanceOutputClass (outputs = [model_version_urn ]),
414
401
).as_workunit ()
415
402
416
403
metrics = self ._get_run_metrics (run )
0 commit comments