Skip to content

Commit 6474af1

Browse files
committed
fix(ingest/mode): Handle 204 response and invalid json
1 parent ecf6c8c commit 6474af1

File tree

2 files changed

+136
-16
lines changed

2 files changed

+136
-16
lines changed

metadata-ingestion/src/datahub/ingestion/source/mode.py

+10
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import logging
33
import re
44
import time
5+
from json import JSONDecodeError
6+
57
from dataclasses import dataclass
68
from datetime import datetime, timezone
79
from functools import lru_cache
@@ -15,6 +17,7 @@
1517
import yaml
1618
from liquid import Template, Undefined
1719
from pydantic import Field, validator
20+
from requests import Response
1821
from requests.adapters import HTTPAdapter, Retry
1922
from requests.exceptions import ConnectionError
2023
from requests.models import HTTPBasicAuth, HTTPError
@@ -1466,10 +1469,13 @@ def _get_request_json(self, url: str) -> Dict:
14661469

14671470
@r.wraps
14681471
def get_request():
1472+
response: Optional[Response] = None
14691473
try:
14701474
response = self.session.get(
14711475
url, timeout=self.config.api_options.timeout
14721476
)
1477+
if response.status_code == 204:
1478+
return {}
14731479
return response.json()
14741480
except HTTPError as http_error:
14751481
error_response = http_error.response
@@ -1481,6 +1487,10 @@ def get_request():
14811487
raise HTTPError429
14821488

14831489
raise http_error
1490+
except JSONDecodeError as json_error:
1491+
raise HTTPError(
1492+
f"{json_error.__class__.__name__}: {json_error}"
1493+
) from json_error
14841494

14851495
return get_request()
14861496

metadata-ingestion/tests/integration/mode/test_mode.py

+126-16
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import json
22
import pathlib
3+
from typing import Sequence
34
from unittest.mock import patch
45

6+
import pytest
57
from freezegun import freeze_time
68
from requests.models import HTTPError
79

810
from datahub.configuration.common import PipelineExecutionError
11+
from datahub.ingestion.api.source import StructuredLogEntry
912
from datahub.ingestion.run.pipeline import Pipeline
1013
from tests.test_helpers import mce_helpers
1114

@@ -28,13 +31,14 @@
2831
"https://app.mode.com/api/acryl/reports/24f66e1701b6/queries": "dataset_queries_24f66e1701b6.json",
2932
}
3033

31-
RESPONSE_ERROR_LIST = ["https://app.mode.com/api/acryl/spaces/75737b70402e/reports"]
34+
ERROR_URL = "https://app.mode.com/api/acryl/spaces/75737b70402e/reports"
3235

3336
test_resources_dir = pathlib.Path(__file__).parent
3437

3538

3639
class MockResponse:
3740
def __init__(self, error_list, status_code):
41+
self.text = None
3842
self.json_data = None
3943
self.error_list = error_list
4044
self.status_code = status_code
@@ -49,6 +53,14 @@ def mount(self, prefix, adaptor):
4953
return self
5054

5155
def get(self, url, timeout=40):
56+
if self.error_list is not None and self.url in self.error_list:
57+
http_error_msg = "{} Client Error: {} for url: {}".format(
58+
400,
59+
"Simulate error",
60+
self.url,
61+
)
62+
raise HTTPError(http_error_msg, response=self)
63+
5264
self.url = url
5365
self.timeout = timeout
5466
response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}"
@@ -57,29 +69,46 @@ def get(self, url, timeout=40):
5769
self.json_data = data
5870
return self
5971

60-
def raise_for_status(self):
61-
if self.error_list is not None and self.url in self.error_list:
62-
http_error_msg = "{} Client Error: {} for url: {}".format(
63-
400,
64-
"Simulate error",
65-
self.url,
66-
)
67-
raise HTTPError(http_error_msg, response=self)
72+
73+
class MockResponseJson(MockResponse):
74+
def __init__(
75+
self,
76+
status_code=200,
77+
*,
78+
json_empty_list: Sequence[str] = (),
79+
json_error_list: Sequence[str] = (),
80+
):
81+
super().__init__(None, status_code)
82+
self.json_empty_list = json_empty_list
83+
self.json_error_list = json_error_list
84+
85+
def json(self):
86+
if self.url in self.json_empty_list:
87+
return json.loads("") # Shouldn't be called
88+
if self.url in self.json_error_list:
89+
return json.loads("{")
90+
return super().json()
91+
92+
def get(self, url, timeout=40):
93+
response = super().get(url, timeout)
94+
if self.url in self.json_empty_list:
95+
response.status_code = 204
96+
return response
6897

6998

70-
def mocked_requests_sucess(*args, **kwargs):
99+
def mocked_requests_success(*args, **kwargs):
71100
return MockResponse(None, 200)
72101

73102

74103
def mocked_requests_failure(*args, **kwargs):
75-
return MockResponse(RESPONSE_ERROR_LIST, 200)
104+
return MockResponse([ERROR_URL], 200)
76105

77106

78107
@freeze_time(FROZEN_TIME)
79108
def test_mode_ingest_success(pytestconfig, tmp_path):
80109
with patch(
81110
"datahub.ingestion.source.mode.requests.Session",
82-
side_effect=mocked_requests_sucess,
111+
side_effect=mocked_requests_success,
83112
):
84113
pipeline = Pipeline.create(
85114
{
@@ -142,8 +171,89 @@ def test_mode_ingest_failure(pytestconfig, tmp_path):
142171
}
143172
)
144173
pipeline.run()
145-
try:
174+
with pytest.raises(PipelineExecutionError) as exec_error:
146175
pipeline.raise_from_status()
147-
except PipelineExecutionError as exec_error:
148-
assert exec_error.args[0] == "Source reported errors"
149-
assert len(exec_error.args[1].failures) == 1
176+
assert exec_error.value.args[0] == "Source reported errors"
177+
assert len(exec_error.value.args[1].failures) == 1
178+
error_dict: StructuredLogEntry
179+
_level, error_dict = exec_error.value.args[1].failures[0]
180+
error = next(iter(error_dict.context))
181+
assert "Simulate error" in error
182+
assert ERROR_URL in error
183+
184+
185+
@freeze_time(FROZEN_TIME)
186+
def test_mode_ingest_json_empty(pytestconfig, tmp_path):
187+
with patch(
188+
"datahub.ingestion.source.mode.requests.Session",
189+
side_effect=lambda *args, **kwargs: MockResponseJson(
190+
json_empty_list=["https://app.mode.com/api/modeuser"]
191+
),
192+
):
193+
global test_resources_dir
194+
test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"
195+
196+
pipeline = Pipeline.create(
197+
{
198+
"run_id": "mode-test",
199+
"source": {
200+
"type": "mode",
201+
"config": {
202+
"token": "xxxx",
203+
"password": "xxxx",
204+
"connect_uri": "https://app.mode.com/",
205+
"workspace": "acryl",
206+
},
207+
},
208+
"sink": {
209+
"type": "file",
210+
"config": {
211+
"filename": f"{tmp_path}/mode_mces.json",
212+
},
213+
},
214+
}
215+
)
216+
pipeline.run()
217+
pipeline.raise_from_status(raise_warnings=True)
218+
219+
220+
@freeze_time(FROZEN_TIME)
221+
def test_mode_ingest_json_failure(pytestconfig, tmp_path):
222+
with patch(
223+
"datahub.ingestion.source.mode.requests.Session",
224+
side_effect=lambda *args, **kwargs: MockResponseJson(
225+
json_error_list=["https://app.mode.com/api/modeuser"]
226+
),
227+
):
228+
global test_resources_dir
229+
test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"
230+
231+
pipeline = Pipeline.create(
232+
{
233+
"run_id": "mode-test",
234+
"source": {
235+
"type": "mode",
236+
"config": {
237+
"token": "xxxx",
238+
"password": "xxxx",
239+
"connect_uri": "https://app.mode.com/",
240+
"workspace": "acryl",
241+
},
242+
},
243+
"sink": {
244+
"type": "file",
245+
"config": {
246+
"filename": f"{tmp_path}/mode_mces.json",
247+
},
248+
},
249+
}
250+
)
251+
pipeline.run()
252+
pipeline.raise_from_status(raise_warnings=False)
253+
with pytest.raises(PipelineExecutionError) as exec_error:
254+
pipeline.raise_from_status(raise_warnings=True)
255+
assert len(exec_error.value.args[1].warnings) > 0
256+
error_dict: StructuredLogEntry
257+
_level, error_dict = exec_error.value.args[1].warnings[0]
258+
error = next(iter(error_dict.context))
259+
assert "JSONDecodeError" in error

0 commit comments

Comments
 (0)