@@ -208,22 +208,28 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]:
         dpis = []
         start = 0
         while True:
-            job_query_result = self.ctx.graph.execute_graphql(
-                DATA_PROCESS_INSTANCES_QUERY,
-                {"dataJobUrn": job_urn, "start": start, "count": batch_size},
-            )
-            job_data = job_query_result.get("dataJob")
-            if not job_data:
-                raise ValueError(f"Error getting job {job_urn}")
-
-            runs_data = job_data.get("runs")
-            if not runs_data:
-                raise ValueError(f"Error getting runs for {job_urn}")
-
-            runs = runs_data.get("runs")
-            dpis.extend(runs)
-            start += batch_size
-            if len(runs) < batch_size:
+            try:
+                job_query_result = self.ctx.graph.execute_graphql(
+                    DATA_PROCESS_INSTANCES_QUERY,
+                    {"dataJobUrn": job_urn, "start": start, "count": batch_size},
+                )
+                job_data = job_query_result.get("dataJob")
+                if not job_data:
+                    logger.error(f"Error getting job {job_urn}")
+                    break
+
+                runs_data = job_data.get("runs")
+                if not runs_data:
+                    logger.error(f"Error getting runs for {job_urn}")
+                    break
+
+                runs = runs_data.get("runs")
+                dpis.extend(runs)
+                start += batch_size
+                if len(runs) < batch_size:
+                    break
+            except Exception as e:
+                logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}")
                 break
         return dpis

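This hunk replaces the hard `raise ValueError` calls with logged errors plus a `break`, and wraps the whole page fetch in a `try`/`except`, so one failing job no longer aborts the entire cleanup run; whatever pages were already collected are still returned. A minimal standalone sketch of the same fetch-until-short-page loop, with a hypothetical `fetch_page` callable standing in for the GraphQL call:

import logging
from typing import Callable, List

logger = logging.getLogger(__name__)

def fetch_all(
    fetch_page: Callable[[int, int], List[dict]], batch_size: int
) -> List[dict]:
    # Accumulate pages until a short page or a failure; never raise.
    items: List[dict] = []
    start = 0
    while True:
        try:
            page = fetch_page(start, batch_size)
        except Exception as e:
            logger.error(f"Exception while fetching page at offset {start}: {e}")
            break  # keep whatever was already collected
        if not page:
            break  # missing/empty payload: log-and-stop, like the hunk above
        items.extend(page)
        start += batch_size
        if len(page) < batch_size:
            break  # a short page means the server ran out of results
    return items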
@@ -243,8 +249,12 @@ def keep_last_n_dpi(
             futures[future] = dpi

         for future in as_completed(futures):
-            deleted_count_last_n += 1
-            futures[future]["deleted"] = True
+            try:
+                future.result()
+                deleted_count_last_n += 1
+                futures[future]["deleted"] = True
+            except Exception as e:
+                logger.error(f"Exception while deleting DPI: {e}")

             if deleted_count_last_n % self.config.batch_size == 0:
                 logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
@@ -279,7 +289,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
         dpis = self.fetch_dpis(job.urn, self.config.batch_size)
         dpis.sort(
             key=lambda x: x["created"]["time"]
-            if x["created"] and x["created"]["time"]
+            if "created" in x and "time" in x["created"]
             else 0,
             reverse=True,
         )
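The old sort key was not merely imprecise, it could raise: in a conditional expression the condition evaluates first, so `x["created"]` threw `KeyError` for any DPI lacking a `created` field. Membership tests never index a missing key. A quick illustration with made-up records:

dpis = [
    {"created": {"time": 1_700_000_000_000}},
    {"created": {}},  # "time" missing
    {},               # "created" missing -- the old key raised KeyError here
]
dpis.sort(
    key=lambda x: x["created"]["time"]
    if "created" in x and "time" in x["created"]
    else 0,
    reverse=True,
)
# Records without a usable timestamp sort as 0, i.e. to the end.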
@@ -314,15 +324,23 @@ def remove_old_dpis(
             if dpi.get("deleted"):
                 continue

-            if dpi["created"]["time"] < retention_time * 1000:
+            if (
+                "created" not in dpi
+                or "time" not in dpi["created"]
+                or dpi["created"]["time"] < retention_time * 1000
+            ):
                 future = executor.submit(
                     self.delete_entity, dpi["urn"], "dataprocessInstance"
                 )
                 futures[future] = dpi

         for future in as_completed(futures):
-            deleted_count_retention += 1
-            futures[future]["deleted"] = True
+            try:
+                future.result()
+                deleted_count_retention += 1
+                futures[future]["deleted"] = True
+            except Exception as e:
+                logger.error(f"Exception while deleting DPI: {e}")

             if deleted_count_retention % self.config.batch_size == 0:
                 logger.info(
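Besides adopting the same `future.result()` accounting as `keep_last_n_dpi`, the rewritten retention check short-circuits to true when `created` or `time` is missing, which means DPIs without a timestamp are now treated as expired and deleted rather than crashing the loop. A sketch of the predicate, assuming (as the `* 1000` conversion implies) that `retention_time` is a cutoff epoch timestamp in seconds while DPI timestamps are epoch milliseconds:

def is_expired(dpi: dict, retention_time: int) -> bool:
    # Mirrors the new condition: a missing timestamp counts as expired.
    return (
        "created" not in dpi
        or "time" not in dpi["created"]
        or dpi["created"]["time"] < retention_time * 1000
    )

assert is_expired({}, retention_time=0)               # no "created": deleted
assert is_expired({"created": {}}, retention_time=0)  # no "time": deleted
assert not is_expired({"created": {"time": 2_000}}, retention_time=1)  # 2000 ms is past the 1 s cutoff: kept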
@@ -378,8 +396,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             dataFlows[flow.urn] = flow

         scroll_id: Optional[str] = None
+        previous_scroll_id: Optional[str] = None
+
         dataJobs: Dict[str, List[DataJobEntity]] = defaultdict(list)
         deleted_jobs: int = 0
+
         while True:
             result = self.ctx.graph.execute_graphql(
                 DATAJOB_QUERY,
@@ -426,9 +447,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 else:
                     dataJobs[datajob_entity.flow_urn].append(datajob_entity)

-            if not scroll_id:
+            if not scroll_id or previous_scroll_id == scroll_id:
                 break

+            previous_scroll_id = scroll_id
+
         logger.info(f"Deleted {deleted_jobs} DataJobs")
         # Delete empty dataflows if needed
         if self.config.delete_empty_data_flows:
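The new `previous_scroll_id` guards against a backend that keeps handing back the same scroll token: without it, a non-advancing token would re-fetch the same page forever. The loop now exits when the token is empty or stops changing. A compact sketch of the guard, with a hypothetical `fetch_page` returning `(results, next_token)`:

from typing import Callable, List, Optional, Tuple

def scroll_all(
    fetch_page: Callable[[Optional[str]], Tuple[List[dict], Optional[str]]]
) -> List[dict]:
    items: List[dict] = []
    scroll_id: Optional[str] = None
    previous_scroll_id: Optional[str] = None
    while True:
        page, scroll_id = fetch_page(scroll_id)
        items.extend(page)
        if not scroll_id or previous_scroll_id == scroll_id:
            break  # exhausted, or the token stopped advancing
        previous_scroll_id = scroll_id
    return items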
@@ -443,4 +466,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             if deleted_jobs % self.config.batch_size == 0:
                 logger.info(f"Deleted {deleted_data_flows} DataFlows")
         logger.info(f"Deleted {deleted_data_flows} DataFlows")
+
         return []