@@ -290,62 +290,6 @@ def create_model(
290
290
logger .info (f"Created model: { model_urn } " )
291
291
return str (model_urn )
292
292
293
- def create_training_job (
294
- self ,
295
- run_id : str ,
296
- properties : Optional [models .DataProcessInstancePropertiesClass ] = None ,
297
- training_run_properties : Optional [models .MLTrainingRunPropertiesClass ] = None ,
298
- run_result : Optional [str ] = None ,
299
- start_timestamp : Optional [int ] = None ,
300
- end_timestamp : Optional [int ] = None ,
301
- ** kwargs : Any ,
302
- ) -> str :
303
- """Create a training job with properties and events."""
304
- dpi_urn = f"urn:li:dataProcessInstance:{ run_id } "
305
-
306
- # Create basic properties and aspects
307
- aspects = [
308
- (
309
- properties
310
- or self ._create_properties_class (
311
- models .DataProcessInstancePropertiesClass , kwargs
312
- )
313
- ),
314
- models .SubTypesClass (typeNames = ["ML Training Run" ]),
315
- ]
316
-
317
- # Add training run properties if provided
318
- if training_run_properties :
319
- aspects .append (training_run_properties )
320
-
321
- # Handle run events
322
- current_time = int (time .time () * 1000 )
323
- start_ts = start_timestamp or current_time
324
- end_ts = end_timestamp or current_time
325
-
326
- # Create events
327
- aspects .append (
328
- self ._create_run_event (
329
- status = DataProcessRunStatusClass .STARTED , timestamp = start_ts
330
- )
331
- )
332
-
333
- if run_result :
334
- aspects .append (
335
- self ._create_run_event (
336
- status = DataProcessRunStatusClass .COMPLETE ,
337
- timestamp = end_ts ,
338
- result = run_result ,
339
- duration_millis = end_ts - start_ts ,
340
- )
341
- )
342
-
343
- # Create and emit MCPs
344
- mcps = [self ._create_mcp (dpi_urn , aspect ) for aspect in aspects ]
345
- self ._emit_mcps (mcps )
346
- logger .info (f"Created training job: { dpi_urn } " )
347
- return dpi_urn
348
-
349
293
def create_experiment (
350
294
self ,
351
295
experiment_id : str ,
@@ -438,48 +382,28 @@ def create_dataset(self, name: str, platform: str, **kwargs: Any) -> str:
438
382
raise ValueError (f"Failed to create dataset URN for { name } " )
439
383
return dataset .urn
440
384
441
- def _add_process_to_model (self , model_urn : str , process_urn : str ) -> None :
442
- """Add a DataProcessInstance to a model while preserving existing properties."""
385
+ def add_run_to_model (self , model_urn : str , run_urn : str ) -> None :
386
+ """Add a run to a model while preserving existing properties."""
443
387
self ._update_entity_properties (
444
388
entity_urn = model_urn ,
445
389
aspect_type = models .MLModelPropertiesClass ,
446
- updates = {"trainingJobs" : process_urn },
390
+ updates = {"trainingJobs" : run_urn },
447
391
entity_type = "mlModel" ,
448
392
skip_properties = ["trainingJobs" ],
449
393
)
450
-
451
- def add_run_to_model (self , model_urn : str , run_urn : str ) -> None :
452
- """Add a run to a model while preserving existing properties."""
453
- self ._add_process_to_model (model_urn , run_urn )
454
394
logger .info (f"Added run { run_urn } to model { model_urn } " )
455
395
456
- def add_job_to_model (self , model_urn : str , job_urn : str ) -> None :
457
- """Add a job to a model while preserving existing properties."""
458
- self ._add_process_to_model (model_urn , job_urn )
459
- logger .info (f"Added training job { job_urn } to model { model_urn } " )
460
-
461
- def _add_process_to_model_group (
462
- self , model_group_urn : str , process_urn : str
463
- ) -> None :
464
- """Add DatapProcessInstance to a model group while preserving existing properties."""
396
+ def add_run_to_model_group (self , model_group_urn : str , run_urn : str ) -> None :
397
+ """Add a run to a model group while preserving existing properties."""
465
398
self ._update_entity_properties (
466
399
entity_urn = model_group_urn ,
467
400
aspect_type = models .MLModelGroupPropertiesClass ,
468
- updates = {"trainingJobs" : process_urn },
401
+ updates = {"trainingJobs" : run_urn },
469
402
entity_type = "mlModelGroup" ,
470
403
skip_properties = ["trainingJobs" ],
471
404
)
472
-
473
- def add_run_to_model_group (self , model_group_urn : str , run_urn : str ) -> None :
474
- """Add a run to a model group while preserving existing properties."""
475
- self ._add_process_to_model_group (model_group_urn , run_urn )
476
405
logger .info (f"Added run { run_urn } to model group { model_group_urn } " )
477
406
478
- def add_job_to_model_group (self , model_group_urn : str , job_urn : str ) -> None :
479
- """Add a job to a model group while preserving existing properties."""
480
- self ._add_process_to_model_group (model_group_urn , job_urn )
481
- logger .info (f"Added job { job_urn } to model group { model_group_urn } " )
482
-
483
407
def add_model_to_model_group (self , model_urn : str , group_urn : str ) -> None :
484
408
"""Add a model to a group while preserving existing properties"""
485
409
self ._update_entity_properties (
@@ -499,9 +423,7 @@ def add_run_to_experiment(self, run_urn: str, experiment_urn: str) -> None:
499
423
self ._emit_mcps (mcp )
500
424
logger .info (f"Added run { run_urn } to experiment { experiment_urn } " )
501
425
502
- def _add_input_datasets_to_process (
503
- self , run_urn : str , dataset_urns : List [str ]
504
- ) -> None :
426
+ def add_input_datasets_to_run (self , run_urn : str , dataset_urns : List [str ]) -> None :
505
427
"""Add input datasets to a run"""
506
428
mcp = self ._create_mcp (
507
429
entity_urn = run_urn ,
@@ -510,15 +432,8 @@ def _add_input_datasets_to_process(
510
432
aspect = DataProcessInstanceInput (inputs = dataset_urns ),
511
433
)
512
434
self ._emit_mcps (mcp )
513
-
514
- def add_input_datasets_to_run (self , run_urn : str , dataset_urns : List [str ]) -> None :
515
- self ._add_input_datasets_to_process (run_urn , dataset_urns )
516
435
logger .info (f"Added input datasets to run { run_urn } " )
517
436
518
- def add_input_datasets_to_job (self , job_urn : str , dataset_urns : List [str ]) -> None :
519
- self ._add_input_datasets_to_process (job_urn , dataset_urns )
520
- logger .info (f"Added input datasets to training job { job_urn } " )
521
-
522
437
def add_output_datasets_to_run (self , run_urn : str , dataset_urns : List [str ]) -> None :
523
438
"""Add output datasets to a run"""
524
439
mcp = self ._create_mcp (
0 commit comments