Skip to content

Commit 15e1d6f

Browse files
committed
added timeout config to ensure queries don't hang overall ingestion pipeline
1 parent cffc6d4 commit 15e1d6f

File tree

1 file changed

+9
-4
lines changed
  • metadata-ingestion/src/datahub/ingestion/source

1 file changed

+9
-4
lines changed

metadata-ingestion/src/datahub/ingestion/source/superset.py

+9-4
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ class SupersetConfig(
189189
provider: str = Field(default="db", description="Superset provider.")
190190
options: Dict = Field(default={}, description="")
191191

192+
timeout: int = Field(default=10, description="Timeout of single API call to superset.")
193+
192194
# TODO: Check and remove this if no longer needed.
193195
# Config database_alias is removed from sql sources.
194196
database_alias: Dict[str, str] = Field(
@@ -293,13 +295,14 @@ def login(self) -> requests.Session:
293295
}
294296
)
295297

296-
# Test the connection
297298
test_response = requests_session.get(
298-
f"{self.config.connect_uri}/api/v1/dashboard/"
299+
f"{self.config.connect_uri}/api/v1/dashboard/",
300+
timeout=self.config.timeout,
299301
)
300302
if test_response.status_code == 200:
301-
pass
302-
# TODO(Gabe): how should we message about this error?
303+
# throw an error and terminate ingestion,
304+
# cannot proceed without access token
305+
logger.error(f"Failed to log in to Superset with status: {test_response.status_code}")
303306
return requests_session
304307

305308
def paginate_entity_api_results(self, entity_type, page_size=100):
@@ -310,6 +313,7 @@ def paginate_entity_api_results(self, entity_type, page_size=100):
310313
response = self.session.get(
311314
f"{self.config.connect_uri}/api/v1/{entity_type}",
312315
params={"q": f"(page:{current_page},page_size:{page_size})"},
316+
timeout=self.config.timeout,
313317
)
314318

315319
if response.status_code != 200:
@@ -347,6 +351,7 @@ def build_owner_urn(self, data: Dict[str, Any]) -> List[str]:
347351
def get_dataset_info(self, dataset_id: int) -> dict:
348352
dataset_response = self.session.get(
349353
f"{self.config.connect_uri}/api/v1/dataset/{dataset_id}",
354+
timeout=self.config.timeout,
350355
)
351356
if dataset_response.status_code != 200:
352357
logger.warning(f"Failed to get dataset info: {dataset_response.text}")

0 commit comments

Comments
 (0)