Skip to content

Commit f4b33b5

Browse files
fix(ingest/mode): Handle 204 response and invalid json (#12156)
Co-authored-by: Aseem Bansal <[email protected]>
1 parent 48736a0 commit f4b33b5

File tree

2 files changed

+151
-36
lines changed

2 files changed

+151
-36
lines changed

metadata-ingestion/src/datahub/ingestion/source/mode.py

+26-20
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from dataclasses import dataclass
66
from datetime import datetime, timezone
77
from functools import lru_cache
8+
from json import JSONDecodeError
89
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
910

1011
import dateutil.parser as dp
@@ -193,6 +194,9 @@ class HTTPError429(HTTPError):
193194
pass
194195

195196

197+
# Exception types caught together by the request-handling `except` clauses:
# an HTTP error, or a response body that could not be decoded as JSON.
ModeRequestError = (HTTPError, JSONDecodeError)
198+
199+
196200
@dataclass
197201
class ModeSourceReport(StaleEntityRemovalSourceReport):
198202
filtered_spaces: LossyList[str] = dataclasses.field(default_factory=LossyList)
@@ -328,11 +332,11 @@ def __init__(self, ctx: PipelineContext, config: ModeConfig):
328332
# Test the connection
329333
try:
330334
self._get_request_json(f"{self.config.connect_uri}/api/verify")
331-
except HTTPError as http_error:
335+
except ModeRequestError as e:
332336
self.report.report_failure(
333337
title="Failed to Connect",
334338
message="Unable to verify connection to mode.",
335-
context=f"Error: {str(http_error)}",
339+
context=f"Error: {str(e)}",
336340
)
337341

338342
self.workspace_uri = f"{self.config.connect_uri}/api/{self.config.workspace}"
@@ -521,11 +525,11 @@ def _get_creator(self, href: str) -> Optional[str]:
521525
if self.config.owner_username_instead_of_email
522526
else user_json.get("email")
523527
)
524-
except HTTPError as http_error:
528+
except ModeRequestError as e:
525529
self.report.report_warning(
526530
title="Failed to retrieve Mode creator",
527531
message=f"Unable to retrieve user for {href}",
528-
context=f"Reason: {str(http_error)}",
532+
context=f"Reason: {str(e)}",
529533
)
530534
return user
531535

@@ -571,11 +575,11 @@ def _get_space_name_and_tokens(self) -> dict:
571575
logging.debug(f"Skipping space {space_name} due to space pattern")
572576
continue
573577
space_info[s.get("token", "")] = s.get("name", "")
574-
except HTTPError as http_error:
578+
except ModeRequestError as e:
575579
self.report.report_failure(
576580
title="Failed to Retrieve Spaces",
577581
message="Unable to retrieve spaces / collections for workspace.",
578-
context=f"Workspace: {self.workspace_uri}, Error: {str(http_error)}",
582+
context=f"Workspace: {self.workspace_uri}, Error: {str(e)}",
579583
)
580584

581585
return space_info
@@ -721,11 +725,11 @@ def _get_data_sources(self) -> List[dict]:
721725
try:
722726
ds_json = self._get_request_json(f"{self.workspace_uri}/data_sources")
723727
data_sources = ds_json.get("_embedded", {}).get("data_sources", [])
724-
except HTTPError as http_error:
728+
except ModeRequestError as e:
725729
self.report.report_failure(
726730
title="Failed to retrieve Data Sources",
727731
message="Unable to retrieve data sources from Mode.",
728-
context=f"Error: {str(http_error)}",
732+
context=f"Error: {str(e)}",
729733
)
730734

731735
return data_sources
@@ -812,11 +816,11 @@ def _get_definition(self, definition_name):
812816
if definition.get("name", "") == definition_name:
813817
return definition.get("source", "")
814818

815-
except HTTPError as http_error:
819+
except ModeRequestError as e:
816820
self.report.report_failure(
817821
title="Failed to Retrieve Definition",
818822
message="Unable to retrieve definition from Mode.",
819-
context=f"Definition Name: {definition_name}, Error: {str(http_error)}",
823+
context=f"Definition Name: {definition_name}, Error: {str(e)}",
820824
)
821825
return None
822826

@@ -1382,11 +1386,11 @@ def _get_reports(self, space_token: str) -> List[dict]:
13821386
f"{self.workspace_uri}/spaces/{space_token}/reports"
13831387
)
13841388
reports = reports_json.get("_embedded", {}).get("reports", {})
1385-
except HTTPError as http_error:
1389+
except ModeRequestError as e:
13861390
self.report.report_failure(
13871391
title="Failed to Retrieve Reports for Space",
13881392
message="Unable to retrieve reports for space token.",
1389-
context=f"Space Token: {space_token}, Error: {str(http_error)}",
1393+
context=f"Space Token: {space_token}, Error: {str(e)}",
13901394
)
13911395
return reports
13921396

@@ -1400,11 +1404,11 @@ def _get_datasets(self, space_token: str) -> List[dict]:
14001404
url = f"{self.workspace_uri}/spaces/{space_token}/datasets"
14011405
datasets_json = self._get_request_json(url)
14021406
datasets = datasets_json.get("_embedded", {}).get("reports", [])
1403-
except HTTPError as http_error:
1407+
except ModeRequestError as e:
14041408
self.report.report_failure(
14051409
title="Failed to Retrieve Datasets for Space",
14061410
message=f"Unable to retrieve datasets for space token {space_token}.",
1407-
context=f"Error: {str(http_error)}",
1411+
context=f"Error: {str(e)}",
14081412
)
14091413
return datasets
14101414

@@ -1416,11 +1420,11 @@ def _get_queries(self, report_token: str) -> list:
14161420
f"{self.workspace_uri}/reports/{report_token}/queries"
14171421
)
14181422
queries = queries_json.get("_embedded", {}).get("queries", {})
1419-
except HTTPError as http_error:
1423+
except ModeRequestError as e:
14201424
self.report.report_failure(
14211425
title="Failed to Retrieve Queries",
14221426
message="Unable to retrieve queries for report token.",
1423-
context=f"Report Token: {report_token}, Error: {str(http_error)}",
1427+
context=f"Report Token: {report_token}, Error: {str(e)}",
14241428
)
14251429
return queries
14261430

@@ -1433,11 +1437,11 @@ def _get_last_query_run(
14331437
f"{self.workspace_uri}/reports/{report_token}/runs/{report_run_id}/query_runs{query_run_id}"
14341438
)
14351439
queries = queries_json.get("_embedded", {}).get("queries", {})
1436-
except HTTPError as http_error:
1440+
except ModeRequestError as e:
14371441
self.report.report_failure(
14381442
title="Failed to Retrieve Queries for Report",
14391443
message="Unable to retrieve queries for report token.",
1440-
context=f"Report Token:{report_token}, Error: {str(http_error)}",
1444+
context=f"Report Token:{report_token}, Error: {str(e)}",
14411445
)
14421446
return {}
14431447
return queries
@@ -1451,13 +1455,13 @@ def _get_charts(self, report_token: str, query_token: str) -> list:
14511455
f"/queries/{query_token}/charts"
14521456
)
14531457
charts = charts_json.get("_embedded", {}).get("charts", {})
1454-
except HTTPError as http_error:
1458+
except ModeRequestError as e:
14551459
self.report.report_failure(
14561460
title="Failed to Retrieve Charts",
14571461
message="Unable to retrieve charts from Mode.",
14581462
context=f"Report Token: {report_token}, "
14591463
f"Query token: {query_token}, "
1460-
f"Error: {str(http_error)}",
1464+
f"Error: {str(e)}",
14611465
)
14621466
return charts
14631467

@@ -1477,6 +1481,8 @@ def get_request():
14771481
response = self.session.get(
14781482
url, timeout=self.config.api_options.timeout
14791483
)
1484+
if response.status_code == 204: # No content, don't parse json
1485+
return {}
14801486
return response.json()
14811487
except HTTPError as http_error:
14821488
error_response = http_error.response

metadata-ingestion/tests/integration/mode/test_mode.py

+125-16
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
import json
22
import pathlib
3+
from typing import Sequence
34
from unittest.mock import patch
45

6+
import pytest
57
from freezegun import freeze_time
68
from requests.models import HTTPError
79

810
from datahub.configuration.common import PipelineExecutionError
11+
from datahub.ingestion.api.source import StructuredLogEntry
912
from datahub.ingestion.run.pipeline import Pipeline
1013
from tests.test_helpers import mce_helpers
1114

@@ -28,7 +31,7 @@
2831
"https://app.mode.com/api/acryl/reports/24f66e1701b6/queries": "dataset_queries_24f66e1701b6.json",
2932
}
3033

31-
RESPONSE_ERROR_LIST = ["https://app.mode.com/api/acryl/spaces/75737b70402e/reports"]
34+
ERROR_URL = "https://app.mode.com/api/acryl/spaces/75737b70402e/reports"
3235

3336
test_resources_dir = pathlib.Path(__file__).parent
3437

@@ -49,6 +52,14 @@ def mount(self, prefix, adaptor):
4952
return self
5053

5154
def get(self, url, timeout=40):
55+
if self.error_list is not None and self.url in self.error_list:
56+
http_error_msg = "{} Client Error: {} for url: {}".format(
57+
400,
58+
"Simulate error",
59+
self.url,
60+
)
61+
raise HTTPError(http_error_msg, response=self)
62+
5263
self.url = url
5364
self.timeout = timeout
5465
response_json_path = f"{test_resources_dir}/setup/{JSON_RESPONSE_MAP.get(url)}"
@@ -57,29 +68,46 @@ def get(self, url, timeout=40):
5768
self.json_data = data
5869
return self
5970

60-
def raise_for_status(self):
61-
if self.error_list is not None and self.url in self.error_list:
62-
http_error_msg = "{} Client Error: {} for url: {}".format(
63-
400,
64-
"Simulate error",
65-
self.url,
66-
)
67-
raise HTTPError(http_error_msg, response=self)
71+
72+
class MockResponseJson(MockResponse):
    """MockResponse variant that can simulate empty (204) or malformed JSON bodies.

    Args:
        status_code: Status code reported for every URL that is not listed in
            ``json_empty_list``.
        json_empty_list: URLs answered with status 204. Calling ``json()`` for
            one of them raises ``JSONDecodeError``, because the code under test
            is expected not to parse a 204 body.
        json_error_list: URLs whose ``json()`` call raises ``JSONDecodeError``
            (the body parsed is the truncated document ``"{"``).
    """

    def __init__(
        self,
        status_code: int = 200,
        *,
        json_empty_list: Sequence[str] = (),
        json_error_list: Sequence[str] = (),
    ):
        super().__init__(None, status_code)
        # Kept so get() can restore it: the object returned by get() is reused
        # across requests, and a 204 set for one URL must not leak into the next.
        self.default_status_code = status_code
        self.json_empty_list = json_empty_list
        self.json_error_list = json_error_list

    def json(self):
        """Return the parent's JSON payload, or raise for the configured URLs."""
        if self.url in self.json_empty_list:
            return json.loads("")  # Shouldn't be called
        if self.url in self.json_error_list:
            return json.loads("{")
        return super().json()

    def get(self, url, timeout=40):
        """Perform the parent's ``get`` and set the status code for this URL."""
        response = super().get(url, timeout)
        if self.url in self.json_empty_list:
            response.status_code = 204
        else:
            # Reset explicitly; otherwise a previous 204 would stick to every
            # later request made through the same mock.
            response.status_code = self.default_status_code
        return response
6896

6997

70-
def mocked_requests_sucess(*args, **kwargs):
98+
def mocked_requests_success(*args, **kwargs):
    """Session factory for ``patch(..., side_effect=...)``.

    Ignores whatever arguments it is called with and returns a fresh
    ``MockResponse`` built with ``None`` and status code 200.
    """
    session = MockResponse(None, 200)
    return session
72100

73101

74102
def mocked_requests_failure(*args, **kwargs):
    """Session factory for ``patch(..., side_effect=...)``.

    Ignores whatever arguments it is called with and returns a fresh
    ``MockResponse`` built with ``[ERROR_URL]`` and status code 200.
    """
    session = MockResponse([ERROR_URL], 200)
    return session
76104

77105

78106
@freeze_time(FROZEN_TIME)
79107
def test_mode_ingest_success(pytestconfig, tmp_path):
80108
with patch(
81109
"datahub.ingestion.source.mode.requests.Session",
82-
side_effect=mocked_requests_sucess,
110+
side_effect=mocked_requests_success,
83111
):
84112
pipeline = Pipeline.create(
85113
{
@@ -142,8 +170,89 @@ def test_mode_ingest_failure(pytestconfig, tmp_path):
142170
}
143171
)
144172
pipeline.run()
145-
try:
173+
with pytest.raises(PipelineExecutionError) as exec_error:
146174
pipeline.raise_from_status()
147-
except PipelineExecutionError as exec_error:
148-
assert exec_error.args[0] == "Source reported errors"
149-
assert len(exec_error.args[1].failures) == 1
175+
assert exec_error.value.args[0] == "Source reported errors"
176+
assert len(exec_error.value.args[1].failures) == 1
177+
error_dict: StructuredLogEntry
178+
_level, error_dict = exec_error.value.args[1].failures[0]
179+
error = next(iter(error_dict.context))
180+
assert "Simulate error" in error
181+
assert ERROR_URL in error
182+
183+
184+
@freeze_time(FROZEN_TIME)
def test_mode_ingest_json_empty(pytestconfig, tmp_path):
    """Run the Mode pipeline with the user endpoint answering 204.

    The session mock marks ``https://app.mode.com/api/modeuser`` as an
    empty-body URL. The test passes only if
    ``raise_from_status(raise_warnings=True)`` does not raise.
    """
    global test_resources_dir

    def empty_body_session(*args, **kwargs):
        return MockResponseJson(json_empty_list=["https://app.mode.com/api/modeuser"])

    source_section = {
        "type": "mode",
        "config": {
            "token": "xxxx",
            "password": "xxxx",
            "connect_uri": "https://app.mode.com/",
            "workspace": "acryl",
        },
    }
    sink_section = {
        "type": "file",
        "config": {
            "filename": f"{tmp_path}/mode_mces.json",
        },
    }
    recipe = {
        "run_id": "mode-test",
        "source": source_section,
        "sink": sink_section,
    }

    with patch(
        "datahub.ingestion.source.mode.requests.Session",
        side_effect=empty_body_session,
    ):
        # The mock reads its canned JSON files relative to this directory.
        test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"

        pipeline = Pipeline.create(recipe)
        pipeline.run()
        pipeline.raise_from_status(raise_warnings=True)
217+
218+
219+
@freeze_time(FROZEN_TIME)
def test_mode_ingest_json_failure(pytestconfig, tmp_path):
    """Run the Mode pipeline with the user endpoint returning malformed JSON.

    The session mock marks ``https://app.mode.com/api/modeuser`` as a
    JSON-error URL. The run must not raise while warnings are tolerated, must
    raise once warnings are escalated, and the first recorded warning must
    carry the JSON decoder's message.
    """
    global test_resources_dir

    def broken_json_session(*args, **kwargs):
        return MockResponseJson(json_error_list=["https://app.mode.com/api/modeuser"])

    source_section = {
        "type": "mode",
        "config": {
            "token": "xxxx",
            "password": "xxxx",
            "connect_uri": "https://app.mode.com/",
            "workspace": "acryl",
        },
    }
    sink_section = {
        "type": "file",
        "config": {
            "filename": f"{tmp_path}/mode_mces.json",
        },
    }
    recipe = {
        "run_id": "mode-test",
        "source": source_section,
        "sink": sink_section,
    }

    with patch(
        "datahub.ingestion.source.mode.requests.Session",
        side_effect=broken_json_session,
    ):
        # The mock reads its canned JSON files relative to this directory.
        test_resources_dir = pytestconfig.rootpath / "tests/integration/mode"

        pipeline = Pipeline.create(recipe)
        pipeline.run()

        # With warnings tolerated this call is expected to return normally.
        pipeline.raise_from_status(raise_warnings=False)
        with pytest.raises(PipelineExecutionError) as exec_error:
            pipeline.raise_from_status(raise_warnings=True)

        # Checked after the raises-block so the assertions actually execute.
        source_report = exec_error.value.args[1]
        assert len(source_report.warnings) > 0
        entry: StructuredLogEntry
        _severity, entry = source_report.warnings[0]
        first_context = next(iter(entry.context))
        assert "Expecting property name enclosed in double quotes" in first_context

0 commit comments

Comments
 (0)