from datahub.api.entities.dataset.dataset import Dataset
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import get_default_graph
- from datahub.metadata.urns import DatasetUrn, DataPlatformUrn, MlModelGroupUrn, MlModelUrn
+ from datahub.metadata.urns import (
+     DatasetUrn,
+     DataPlatformUrn,
+     MlModelGroupUrn,
+     MlModelUrn,
+ )
from datahub.emitter.mcp_builder import ContainerKey


ORCHESTRATOR_MLFLOW = "mlflow"
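# Orchestrator names identify the system that ran the pipeline (MLflow here; an Airflow variant is used further down).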
@@ -133,7 +138,9 @@ def generate_pipeline(
    yield from experiment.generate_mcp()

-     model_group_urn = MlModelGroupUrn(platform="mlflow", name="airline_forecast_models")
+     model_group_urn = MlModelGroupUrn(
+         platform="mlflow", name="airline_forecast_models"
+     )
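+     # This group URN collects the individual forecast models registered below.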
    current_time = int(time.time() * 1000)
    model_group_info = models.MLModelGroupPropertiesClass(
        description="ML models for airline passenger forecasting",
@@ -153,8 +160,20 @@ def generate_pipeline(
    print("model_group_urn: ", model_group_urn)

-     model_aliases = ["challenger", "champion", "production", "experimental", "deprecated"]
-     model_tags = ["stage:production", "stage:development", "team:data_science", "team:ml_engineering", "team:analytics"]
+     model_aliases = [
+         "challenger",
+         "champion",
+         "production",
+         "experimental",
+         "deprecated",
+     ]
+     model_tags = [
+         "stage:production",
+         "stage:development",
+         "team:data_science",
+         "team:ml_engineering",
+         "team:analytics",
+     ]
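+     # Each model below picks one or two of these aliases at random.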

    model_dict = {
        "arima_model_1": "ARIMA model for airline passenger forecasting",
@@ -166,21 +185,45 @@ def generate_pipeline(
    # Generate run timestamps within the last month
    end_time = int(time.time() * 1000)  # Current timestamp in milliseconds
-     start_time = end_time - (30 * 24 * 60 * 60 * 1000)  # 30 days ago in milliseconds
+     start_time = end_time - (
+         30 * 24 * 60 * 60 * 1000
+     )  # 30 days ago in milliseconds
    run_timestamps = [
        start_time + (i * 5 * 24 * 60 * 60 * 1000)  # 5-day intervals
        for i in range(5)
    ]
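    # Five synthetic runs, spaced five days apart across the last 30 days.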

    run_dict = {
-         "run_1": {"start_time": run_timestamps[0], "duration": 45, "result": InstanceRunResult.SUCCESS},
-         "run_2": {"start_time": run_timestamps[1], "duration": 60, "result": InstanceRunResult.FAILURE},
-         "run_3": {"start_time": run_timestamps[2], "duration": 55, "result": InstanceRunResult.SUCCESS},
-         "run_4": {"start_time": run_timestamps[3], "duration": 70, "result": InstanceRunResult.SUCCESS},
-         "run_5": {"start_time": run_timestamps[4], "duration": 50, "result": InstanceRunResult.FAILURE},
+         "run_1": {
+             "start_time": run_timestamps[0],
+             "duration": 45,
+             "result": InstanceRunResult.SUCCESS,
+         },
+         "run_2": {
+             "start_time": run_timestamps[1],
+             "duration": 60,
+             "result": InstanceRunResult.FAILURE,
+         },
+         "run_3": {
+             "start_time": run_timestamps[2],
+             "duration": 55,
+             "result": InstanceRunResult.SUCCESS,
+         },
+         "run_4": {
+             "start_time": run_timestamps[3],
+             "duration": 70,
+             "result": InstanceRunResult.SUCCESS,
+         },
+         "run_5": {
+             "start_time": run_timestamps[4],
+             "duration": 50,
+             "result": InstanceRunResult.FAILURE,
+         },
    }
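    # "duration" is in minutes; it is converted to milliseconds for each run's end event below.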

-     for i, (model_name, model_description) in enumerate(model_dict.items(), start=1):
+     for i, (model_name, model_description) in enumerate(
+         model_dict.items(), start=1
+     ):
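+         # One training run (DataProcessInstance) is created per model in model_dict.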
        run_id = f"run_{i}"
        data_process_instance = DataProcessInstance.from_container(
            container_key=experiment.key, id=run_id
@@ -206,34 +249,36 @@ def generate_pipeline(
            models.MLMetricClass(
                name="accuracy",
                value=str(random.uniform(0.7, 0.99)),
-                 description="Test accuracy"
+                 description="Test accuracy",
            ),
            models.MLMetricClass(
                name="f1_score",
                value=str(random.uniform(0.7, 0.99)),
-                 description="Test F1 score"
-             )
+                 description="Test F1 score",
+             ),
        ]
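        # Metric values are random placeholders rather than real evaluation results.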
        hyper_params = [
            models.MLHyperParamClass(
                name="n_estimators",
                value=str(random.randint(50, 200)),
-                 description="Number of trees"
+                 description="Number of trees",
            ),
            models.MLHyperParamClass(
                name="max_depth",
                value=str(random.randint(5, 15)),
-                 description="Maximum tree depth"
-             )
+                 description="Maximum tree depth",
+             ),
        ]
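        # Hyperparameter values are likewise randomized placeholders.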

        # DPI properties
        created_at = int(time.time() * 1000)
        print(start_time)
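        # AuditStampClass timestamps are epoch milliseconds; createdAt below is stored in seconds.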
        dpi_props = models.DataProcessInstancePropertiesClass(
            name=f"Training {run_id}",
-             created=models.AuditStampClass(time=created_at, actor="urn:li:corpuser:datahub"),
-             createdAt=int(created_at / 1000),
+             created=models.AuditStampClass(
+                 time=created_at, actor="urn:li:corpuser:datahub"
+             ),
+             createdAt=int(created_at / 1000),
            createdBy="jane_doe",
            loggedModels=["sklearn"],
            artifactsLocation="s3://mlflow/artifacts",
@@ -261,7 +306,9 @@ def generate_pipeline(
        duration_minutes = run_dict[run_id]["duration"]
        end_time_millis = start_time_millis + duration_minutes * 60000
        result = run_dict[run_id]["result"]
-         result_type = "SUCCESS" if result == InstanceRunResult.SUCCESS else "FAILURE"
+         result_type = (
+             "SUCCESS" if result == InstanceRunResult.SUCCESS else "FAILURE"
+         )
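+         # The plain string presumably mirrors InstanceRunResult for the end event emitted below.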

        yield from data_process_instance.start_event_mcp(
            start_timestamp_millis=start_time_millis
@@ -275,7 +322,12 @@ def generate_pipeline(

        print("data_process_instance.urn: ", data_process_instance.urn)
        print("start Time:", start_time_millis)
-         print("start Time:", time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time_millis / 1000)))
+         print(
+             "start Time:",
+             time.strftime(
+                 "%Y-%m-%d %H:%M:%S", time.localtime(start_time_millis / 1000)
+             ),
+         )
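+         # start_time_millis is in epoch milliseconds, hence the division by 1000 before localtime().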

        # Model
        selected_aliases = random.sample(model_aliases, k=random.randint(1, 2))
@@ -309,4 +361,4 @@ def generate_pipeline(
    for mcp in generate_pipeline(
        "airline_forecast_pipeline_airflow", orchestrator=ORCHESTRATOR_AIRFLOW
    ):
-         graph.emit(mcp)
+         graph.emit(mcp)
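+         # Each MCP is emitted through the DataHub client; graph likely comes from get_default_graph() above.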