Skip to content

Commit 3f3151a

Browse files
committed
removing timeouts
1 parent 3078e25 commit 3f3151a

File tree

1 file changed

+38
-13
lines changed
  • metadata-ingestion/src/datahub/ingestion/source/bigquery_v2

1 file changed

+38
-13
lines changed

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py

+38-13
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def _get_most_populated_partitions(
109109

110110
logger.debug(f"Executing combined partition query: {query}")
111111
query_job = self.config.get_bigquery_client().query(query)
112-
results = list(query_job.result(timeout=timeout))
112+
results = list(query_job.result())
113113

114114
if results:
115115
# Take the partition combination with the most records
@@ -184,7 +184,7 @@ def _get_most_populated_partitions(
184184
f"Executing query for partition column {col_name}: {query}"
185185
)
186186
query_job = self.config.get_bigquery_client().query(query)
187-
results = list(query_job.result(timeout=timeout))
187+
results = list(query_job.result())
188188

189189
if not results or results[0].val is None:
190190
logger.warning(
@@ -334,7 +334,7 @@ def _create_filter_for_partition_column(
334334

335335
try:
336336
query_job = self.config.get_bigquery_client().query(query)
337-
results = list(query_job.result(timeout=timeout))
337+
results = list(query_job.result())
338338

339339
if not results or results[0].val is None:
340340
logger.warning(
@@ -459,7 +459,7 @@ def _try_partition_combinations(
459459
schema: str,
460460
partition_cols: List[str],
461461
partition_cols_with_types: Dict[str, str],
462-
timeout: int = 300,
462+
timeout: int = 300, # This parameter will be ignored
463463
) -> Optional[List[str]]:
464464
"""
465465
Try to find combinations of partition values that return data.
@@ -470,7 +470,7 @@ def _try_partition_combinations(
470470
schema: BigQuery dataset name
471471
partition_cols: List of partition column names
472472
partition_cols_with_types: Dictionary of column names to data types
473-
timeout: Query timeout in seconds
473+
timeout: Parameter retained for API compatibility but not used
474474
475475
Returns:
476476
List of filter strings if found, None otherwise
@@ -494,10 +494,35 @@ def _try_partition_combinations(
494494
"""
495495

496496
logger.debug(f"Finding partition combinations: {query}")
497-
query_job = self.config.get_bigquery_client().query(query)
498-
results = list(query_job.result(timeout=timeout))
499497

498+
try:
499+
query_job = self.config.get_bigquery_client().query(query)
500+
# Call result() without a timeout parameter
501+
results = list(query_job.result())
502+
except Exception as e:
503+
logger.warning(f"Error querying partition combinations: {e}")
504+
try:
505+
# Try a simpler version of the query
506+
simplified_query = f"""
507+
SELECT {partition_cols_select}, COUNT(*) as record_count
508+
FROM `{project}.{schema}.{table.name}`
509+
WHERE {partition_cols[0]} IS NOT NULL
510+
GROUP BY {partition_cols_select}
511+
ORDER BY record_count DESC
512+
LIMIT 5
513+
"""
514+
logger.info(f"Trying simplified query: {simplified_query}")
515+
simplified_job = self.config.get_bigquery_client().query(
516+
simplified_query
517+
)
518+
results = list(simplified_job.result())
519+
except Exception as simplified_error:
520+
logger.warning(f"Simplified query also failed: {simplified_error}")
521+
return None
522+
523+
# If the query returned no results
500524
if not results:
525+
logger.warning("No partition combinations found")
501526
return None
502527

503528
# Try each combination, starting with the one with most records
@@ -683,7 +708,7 @@ def _try_column_with_different_orderings(
683708
logger.info(f"Executing fallback query with {order_by} ordering")
684709
try:
685710
query_job = self.config.get_bigquery_client().query(query)
686-
results = list(query_job.result(timeout=timeout))
711+
results = list(query_job.result())
687712

688713
if results:
689714
logger.info(f"Query returned {len(results)} potential values")
@@ -759,7 +784,7 @@ def _try_get_most_recent_partition(
759784
"""
760785

761786
query_job = self.config.get_bigquery_client().query(query)
762-
results = list(query_job.result(timeout=timeout))
787+
results = list(query_job.result())
763788

764789
if results and len(results) > 0:
765790
for col_name in partition_cols_with_types:
@@ -809,7 +834,7 @@ def _try_find_most_populated_partition(
809834

810835
try:
811836
query_job = self.config.get_bigquery_client().query(query)
812-
results = list(query_job.result(timeout=timeout))
837+
results = list(query_job.result())
813838

814839
if (
815840
results
@@ -960,7 +985,7 @@ def _verify_partition_has_data(
960985

961986
# Set a longer timeout for this operation
962987
query_job = self.config.get_bigquery_client().query(query)
963-
results = list(query_job.result(timeout=timeout))
988+
results = list(query_job.result())
964989

965990
if results and results[0].cnt > 0:
966991
logger.info(
@@ -982,7 +1007,7 @@ def _verify_partition_has_data(
9821007
LIMIT 1
9831008
"""
9841009
query_job = self.config.get_bigquery_client().query(simpler_query)
985-
results = list(query_job.result(timeout=timeout))
1010+
results = list(query_job.result())
9861011

9871012
return len(results) > 0
9881013
except Exception as simple_e:
@@ -1112,7 +1137,7 @@ def _get_required_partition_filters(
11121137
logger.debug(f"Executing query for partition value: {query}")
11131138

11141139
query_job = self.config.get_bigquery_client().query(query)
1115-
results = list(query_job.result(timeout=300))
1140+
results = list(query_job.result())
11161141

11171142
if not results or results[0].val is None:
11181143
logger.warning(

0 commit comments

Comments
 (0)