Skip to content

Commit 70bec48

Browse files
authored
fix(ingest/gc): Additional dataprocess cleanup fixes (#12049)
1 parent 1d5ddf0 commit 70bec48

File tree

2 files changed: +68 −11 lines changed

2 files changed: +68 −11 lines changed

metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py

+20 −11
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]:
207207
assert self.ctx.graph
208208
dpis = []
209209
start = 0
210+
# This graphql endpoint doesn't support scrolling and therefore after 10k DPIs it causes performance issues on ES
211+
# Therefore, we are limiting the max DPIs to 9000
212+
max_item = 9000
210213
while True:
211214
try:
212215
job_query_result = self.ctx.graph.execute_graphql(
@@ -226,10 +229,12 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]:
226229
runs = runs_data.get("runs")
227230
dpis.extend(runs)
228231
start += batch_size
229-
if len(runs) < batch_size:
232+
if len(runs) < batch_size or start >= max_item:
230233
break
231234
except Exception as e:
232-
logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}")
235+
self.report.failure(
236+
f"Exception while fetching DPIs for job {job_urn}:", exc=e
237+
)
233238
break
234239
return dpis
235240

@@ -254,8 +259,9 @@ def keep_last_n_dpi(
254259
deleted_count_last_n += 1
255260
futures[future]["deleted"] = True
256261
except Exception as e:
257-
logger.error(f"Exception while deleting DPI: {e}")
258-
262+
self.report.report_failure(
263+
f"Exception while deleting DPI: {e}", exc=e
264+
)
259265
if deleted_count_last_n % self.config.batch_size == 0:
260266
logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
261267
if self.config.delay:
@@ -289,7 +295,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
289295
dpis = self.fetch_dpis(job.urn, self.config.batch_size)
290296
dpis.sort(
291297
key=lambda x: x["created"]["time"]
292-
if "created" in x and "time" in x["created"]
298+
if x.get("created") and x["created"].get("time")
293299
else 0,
294300
reverse=True,
295301
)
@@ -325,8 +331,8 @@ def remove_old_dpis(
325331
continue
326332

327333
if (
328-
"created" not in dpi
329-
or "time" not in dpi["created"]
334+
not dpi.get("created")
335+
or not dpi["created"].get("time")
330336
or dpi["created"]["time"] < retention_time * 1000
331337
):
332338
future = executor.submit(
@@ -340,7 +346,7 @@ def remove_old_dpis(
340346
deleted_count_retention += 1
341347
futures[future]["deleted"] = True
342348
except Exception as e:
343-
logger.error(f"Exception while deleting DPI: {e}")
349+
self.report.report_failure(f"Exception while deleting DPI: {e}", exc=e)
344350

345351
if deleted_count_retention % self.config.batch_size == 0:
346352
logger.info(
@@ -351,9 +357,12 @@ def remove_old_dpis(
351357
logger.info(f"Sleeping for {self.config.delay} seconds")
352358
time.sleep(self.config.delay)
353359

354-
logger.info(
355-
f"Deleted {deleted_count_retention} DPIs from {job.urn} due to retention"
356-
)
360+
if deleted_count_retention > 0:
361+
logger.info(
362+
f"Deleted {deleted_count_retention} DPIs from {job.urn} due to retention"
363+
)
364+
else:
365+
logger.debug(f"No DPIs to delete from {job.urn} due to retention")
357366

358367
def get_data_flows(self) -> Iterable[DataFlowEntity]:
359368
assert self.ctx.graph

metadata-ingestion/tests/unit/test_gc.py

+48
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,54 @@ def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis
8484
self.cleanup.delete_dpi_from_datajobs(job)
8585
self.assertEqual(10, self.report.num_aspects_removed)
8686

87+
@patch(
88+
"datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
89+
)
90+
def test_delete_dpi_from_datajobs_without_dpi_null_created_time(
91+
self, mock_fetch_dpis
92+
):
93+
job = DataJobEntity(
94+
urn="urn:li:dataJob:1",
95+
flow_urn="urn:li:dataFlow:1",
96+
lastIngested=int(datetime.now(timezone.utc).timestamp()),
97+
jobId="job1",
98+
dataPlatformInstance="urn:li:dataPlatformInstance:1",
99+
total_runs=10,
100+
)
101+
mock_fetch_dpis.return_value = [
102+
{"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
103+
] + [
104+
{
105+
"urn": "urn:li:dataprocessInstance:11",
106+
"created": {"time": None},
107+
}
108+
]
109+
self.cleanup.delete_dpi_from_datajobs(job)
110+
self.assertEqual(11, self.report.num_aspects_removed)
111+
112+
@patch(
113+
"datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
114+
)
115+
def test_delete_dpi_from_datajobs_without_dpi_without_time(self, mock_fetch_dpis):
116+
job = DataJobEntity(
117+
urn="urn:li:dataJob:1",
118+
flow_urn="urn:li:dataFlow:1",
119+
lastIngested=int(datetime.now(timezone.utc).timestamp()),
120+
jobId="job1",
121+
dataPlatformInstance="urn:li:dataPlatformInstance:1",
122+
total_runs=10,
123+
)
124+
mock_fetch_dpis.return_value = [
125+
{"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
126+
] + [
127+
{
128+
"urn": "urn:li:dataprocessInstance:11",
129+
"created": None,
130+
}
131+
]
132+
self.cleanup.delete_dpi_from_datajobs(job)
133+
self.assertEqual(11, self.report.num_aspects_removed)
134+
87135
def test_fetch_dpis(self):
88136
assert self.cleanup.ctx.graph
89137
self.cleanup.ctx.graph = MagicMock()

0 commit comments

Comments (0)