14
14
ChangeTypeClass ,
15
15
DataProcessInstanceRunResultClass ,
16
16
DataProcessRunStatusClass ,
17
+ EdgeClass ,
17
18
)
18
19
from datahub .metadata .urns import (
19
20
ContainerUrn ,
@@ -255,7 +256,7 @@ def create_model(
255
256
version_props = {
256
257
"version" : version_tag ,
257
258
"versionSet" : str (version_set_urn ),
258
- "sortId" : "AAAAAAAA" ,
259
+ "sortId" : str ( version_tag ). zfill ( 10 ) ,
259
260
}
260
261
261
262
# Add alias if provided
@@ -266,22 +267,10 @@ def create_model(
266
267
models .VersionPropertiesClass , version_props
267
268
)
268
269
269
- # Create version set properties
270
- version_set_properties = models .VersionSetPropertiesClass (
271
- latest = str (model_urn ),
272
- versioningScheme = "ALPHANUMERIC_GENERATED_BY_DATAHUB" ,
273
- )
274
-
275
270
mcps = [
276
271
self ._create_mcp (
277
272
str (model_urn ), properties , "mlModel" , "mlModelProperties"
278
273
),
279
- self ._create_mcp (
280
- str (version_set_urn ),
281
- version_set_properties ,
282
- "versionSet" ,
283
- "versionSetProperties" ,
284
- ),
285
274
self ._create_mcp (
286
275
str (model_urn ), version_properties , "mlModel" , "versionProperties"
287
276
),
@@ -429,7 +418,13 @@ def add_input_datasets_to_run(self, run_urn: str, dataset_urns: List[str]) -> No
429
418
entity_urn = run_urn ,
430
419
entity_type = "dataProcessInstance" ,
431
420
aspect_name = "dataProcessInstanceInput" ,
432
- aspect = DataProcessInstanceInput (inputs = dataset_urns ),
421
+ aspect = DataProcessInstanceInput (
422
+ inputs = [],
423
+ inputEdges = [
424
+ EdgeClass (destinationUrn = str (dataset_urn ))
425
+ for dataset_urn in dataset_urns
426
+ ],
427
+ ),
433
428
)
434
429
self ._emit_mcps (mcp )
435
430
logger .info (f"Added input datasets to run { run_urn } " )
@@ -440,7 +435,13 @@ def add_output_datasets_to_run(self, run_urn: str, dataset_urns: List[str]) -> N
440
435
entity_urn = run_urn ,
441
436
entity_type = "dataProcessInstance" ,
442
437
aspect_name = "dataProcessInstanceOutput" ,
443
- aspect = DataProcessInstanceOutput (outputs = dataset_urns ),
438
+ aspect = DataProcessInstanceOutput (
439
+ outputEdges = [
440
+ EdgeClass (destinationUrn = str (dataset_urn ))
441
+ for dataset_urn in dataset_urns
442
+ ],
443
+ outputs = [],
444
+ ),
444
445
)
445
446
self ._emit_mcps (mcp )
446
447
logger .info (f"Added output datasets to run { run_urn } " )
0 commit comments