Skip to content

Commit 632638c

Browse files
committed
Further null checks
1 parent c47b846 commit 632638c

File tree

2 files changed

+49
-2
lines changed

2 files changed

+49
-2
lines changed

metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,6 @@ def keep_last_n_dpi(
262262
self.report.report_failure(
263263
f"Exception while deleting DPI: {e}", exc=e
264264
)
265-
266265
if deleted_count_last_n % self.config.batch_size == 0:
267266
logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
268267
if self.config.delay:
@@ -296,7 +295,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
296295
dpis = self.fetch_dpis(job.urn, self.config.batch_size)
297296
dpis.sort(
298297
key=lambda x: x["created"]["time"]
299-
if "created" in x and "time" in x["created"]
298+
if x.get("created") and x["created"].get("time")
300299
else 0,
301300
reverse=True,
302301
)

metadata-ingestion/tests/unit/test_gc.py

+48
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,54 @@ def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis
8484
self.cleanup.delete_dpi_from_datajobs(job)
8585
self.assertEqual(10, self.report.num_aspects_removed)
8686

87+
@patch(
88+
"datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
89+
)
90+
def test_delete_dpi_from_datajobs_without_dpi_null_created_time(
91+
self, mock_fetch_dpis
92+
):
93+
job = DataJobEntity(
94+
urn="urn:li:dataJob:1",
95+
flow_urn="urn:li:dataFlow:1",
96+
lastIngested=int(datetime.now(timezone.utc).timestamp()),
97+
jobId="job1",
98+
dataPlatformInstance="urn:li:dataPlatformInstance:1",
99+
total_runs=10,
100+
)
101+
mock_fetch_dpis.return_value = [
102+
{"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
103+
] + [
104+
{
105+
"urn": "urn:li:dataprocessInstance:11",
106+
"created": {"time": None},
107+
}
108+
]
109+
self.cleanup.delete_dpi_from_datajobs(job)
110+
self.assertEqual(11, self.report.num_aspects_removed)
111+
112+
@patch(
113+
"datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
114+
)
115+
def test_delete_dpi_from_datajobs_without_dpi_without_time(self, mock_fetch_dpis):
116+
job = DataJobEntity(
117+
urn="urn:li:dataJob:1",
118+
flow_urn="urn:li:dataFlow:1",
119+
lastIngested=int(datetime.now(timezone.utc).timestamp()),
120+
jobId="job1",
121+
dataPlatformInstance="urn:li:dataPlatformInstance:1",
122+
total_runs=10,
123+
)
124+
mock_fetch_dpis.return_value = [
125+
{"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
126+
] + [
127+
{
128+
"urn": "urn:li:dataprocessInstance:11",
129+
"created": None,
130+
}
131+
]
132+
self.cleanup.delete_dpi_from_datajobs(job)
133+
self.assertEqual(11, self.report.num_aspects_removed)
134+
87135
def test_fetch_dpis(self):
88136
assert self.cleanup.ctx.graph
89137
self.cleanup.ctx.graph = MagicMock()

0 commit comments

Comments
 (0)