Commit 18379ea

Update profiler.py
1 parent b177908 commit 18379ea

File tree: 1 file changed (+83 -36 lines)
  • metadata-ingestion/src/datahub/ingestion/source/bigquery_v2

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py (+83 -36)
@@ -47,6 +47,8 @@ def __init__(
         self._queried_tables: Set[str] = set()
         # Cache for successful partition filters
         self._successful_filters_cache: Dict[str, List[str]] = {}
+        # Detect BigQuery schema version and set up column mappings
+        self._detect_bq_schema_version()
 
     def _execute_cached_query(
         self,
@@ -64,10 +66,8 @@ def _execute_cached_query(
 
         while retries <= max_retries:
             try:
-                # Apply query modifier to adjust column names based on BigQuery version
-                modified_query = self._adjust_query_for_bq_version(query)
-
-                def execute_query(query_to_execute=modified_query):
+                # Define a function to execute the query to avoid lambda binding issues
+                def execute_query(query_to_execute=query):
                     return list(
                         self.config.get_bigquery_client()
                         .query(query_to_execute)
@@ -88,9 +88,7 @@ def execute_query(query_to_execute=modified_query):
                 )
                 retries += 1
                 if retries > max_retries:
-                    logger.warning(
-                        f"Final timeout for query: {modified_query[:200]}..."
-                    )
+                    logger.warning(f"Final timeout for query: {query[:200]}...")
                     return []
                 # Increase timeout for retries
                 timeout = min(timeout * 2, 300)  # Max 5 minutes
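The retry loop above doubles the timeout on each attempt and caps it at five minutes. A minimal standalone sketch of that backoff pattern; the `run_with_retries` helper and its use of `TimeoutError` are illustrative, not part of the commit:

    import logging

    logger = logging.getLogger(__name__)

    def run_with_retries(execute, timeout: float = 30.0, max_retries: int = 3):
        """Call `execute(timeout)`, doubling the timeout after each failure."""
        retries = 0
        while retries <= max_retries:
            try:
                return execute(timeout)
            except TimeoutError:
                retries += 1
                if retries > max_retries:
                    logger.warning("Final timeout; returning empty result.")
                    return []
                timeout = min(timeout * 2, 300)  # cap at 5 minutes, as in the diff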
@@ -154,37 +152,69 @@ def _adjust_query_for_bq_version(self, query: str) -> str:
 
         return modified_query
 
-    def _detect_bq_schema_version(self):
+    def _detect_bq_schema_version(self) -> None:
         """
         Detect which version of INFORMATION_SCHEMA we're working with.
-        Set self._bq_uses_new_schema to True if we have the newer schema version.
+        Sets self._column_name_mapping with the appropriate column mappings.
         """
         try:
-            # Try to execute a simple query to detect the schema version
-            detect_query = """
-                SELECT column_name
-                FROM INFORMATION_SCHEMA.COLUMNS
-                WHERE table_name = 'TABLES'
-                AND table_schema = 'INFORMATION_SCHEMA'
+            # Try to execute a simple test query that works on all versions
+            # This just gets us a small amount of data to check column names
+            test_query = """
+                SELECT *
+                FROM INFORMATION_SCHEMA.TABLES
+                LIMIT 1
             """
 
-            results = list(
-                self.config.get_bigquery_client().query(detect_query).result()
-            )
+            results = list(self.config.get_bigquery_client().query(test_query).result())
+            if not results:
+                # If no results, default to old schema (safer choice)
+                self._column_name_mapping = {
+                    "row_count": "row_count",
+                    "size_bytes": "size_bytes",
+                }
+                logger.info(
+                    "No test results returned. Defaulting to old schema column names."
+                )
+                return
 
-            # Look for the new column names in the results
-            column_names = [row.column_name for row in results]
-            self._bq_uses_new_schema = "total_rows" in column_names
+            # Check what column names are available in the result
+            row = results[0]
+
+            # Initialize mapping dict
+            self._column_name_mapping = {}
+
+            # Check for row count column
+            if hasattr(row, "total_rows"):
+                self._column_name_mapping["row_count"] = "total_rows"
+            elif hasattr(row, "row_count"):
+                self._column_name_mapping["row_count"] = "row_count"
+            else:
+                # Default
+                self._column_name_mapping["row_count"] = "row_count"
+
+            # Check for size bytes column
+            if hasattr(row, "total_logical_bytes"):
+                self._column_name_mapping["size_bytes"] = "total_logical_bytes"
+            elif hasattr(row, "size_bytes"):
+                self._column_name_mapping["size_bytes"] = "size_bytes"
+            else:
+                # Default
+                self._column_name_mapping["size_bytes"] = "size_bytes"
 
             logger.info(
-                f"Detected BigQuery INFORMATION_SCHEMA version. Uses new schema: {self._bq_uses_new_schema}"
+                f"Detected BigQuery INFORMATION_SCHEMA column mapping: {self._column_name_mapping}"
             )
+
         except Exception as e:
-            # If detection fails, default to the new schema (safer option)
+            # If detection fails, default to old schema as it's more common
+            self._column_name_mapping = {
+                "row_count": "row_count",
+                "size_bytes": "size_bytes",
+            }
             logger.warning(
-                f"Could not detect BigQuery schema version: {e}. Defaulting to new schema."
+                f"Error detecting BigQuery schema version: {e}. Defaulting to old schema column names."
             )
-            self._bq_uses_new_schema = True
 
     @staticmethod
     def get_partition_range_from_partition_id(
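The new `_detect_bq_schema_version` replaces the old boolean `_bq_uses_new_schema` flag with an explicit column-name mapping: it pulls one row from `INFORMATION_SCHEMA.TABLES` and checks which attributes exist on it. A condensed sketch of the same probe, assuming a `google.cloud.bigquery` client; `detect_column_mapping` is a hypothetical helper, not a name from the commit:

    from typing import Dict
    from google.cloud import bigquery

    def detect_column_mapping(client: bigquery.Client) -> Dict[str, str]:
        """Map logical stat names to whichever physical columns this schema exposes."""
        # Old-schema names are the fallback, mirroring the commit's defaults.
        mapping = {"row_count": "row_count", "size_bytes": "size_bytes"}
        rows = list(client.query("SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 1").result())
        if rows:
            row = rows[0]
            # bigquery.table.Row raises AttributeError for missing fields,
            # so hasattr() doubles as a column-presence check.
            if hasattr(row, "total_rows"):
                mapping["row_count"] = "total_rows"
            if hasattr(row, "total_logical_bytes"):
                mapping["size_bytes"] = "total_logical_bytes"
        return mapping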
@@ -1499,15 +1529,22 @@ def _fetch_schema_info(
         self, table: BigqueryTable, project: str, schema: str, metadata: Dict[str, Any]
     ) -> Dict[str, Any]:
         """Fetch schema information from INFORMATION_SCHEMA."""
-        # Use column aliases to handle both schema versions
+        if not hasattr(self, "_column_name_mapping"):
+            self._detect_bq_schema_version()
+
+        # Get column name mappings
+        row_count_col = self._column_name_mapping.get("row_count", "row_count")
+        size_bytes_col = self._column_name_mapping.get("size_bytes", "size_bytes")
+
+        # Use explicit column selection and aliases to avoid issues
         combined_query = f"""
         SELECT
             c.column_name,
             c.data_type,
             c.is_partitioning_column,
             c.clustering_ordinal_position,
-            t.row_count,
-            t.size_bytes,
+            t.{row_count_col} as row_count,
+            t.{size_bytes_col} as size_bytes,
             t.ddl,
             t.creation_time,
             t.last_modified_time,
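Because the version-specific column is aliased back to a stable name, downstream code can always read `row.row_count` and `row.size_bytes` regardless of which schema version the project exposes. For example, under the new-schema mapping the f-string fragment renders as:

    row_count_col = "total_rows"            # new-schema mapping
    size_bytes_col = "total_logical_bytes"
    fragment = f"t.{row_count_col} as row_count, t.{size_bytes_col} as size_bytes"
    # fragment == "t.total_rows as row_count, t.total_logical_bytes as size_bytes"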
@@ -1544,7 +1581,7 @@ def _fetch_schema_info(
                 }
 
                 # Update table metadata from first row (all rows have same values)
-                # Use getattr with default values to handle missing attributes safely
+                # Use hasattr checks to handle missing attributes safely
                 if (
                     hasattr(row, "row_count")
                     and row.row_count
@@ -1588,11 +1625,18 @@ def _fetch_table_stats(
     ) -> Dict[str, Any]:
         """Fetch additional table stats if needed."""
         if not metadata.get("row_count") or not metadata.get("size_bytes"):
-            # Use basic column names - our _adjust_query_for_bq_version function will handle the conversion
+            if not hasattr(self, "_column_name_mapping"):
+                self._detect_bq_schema_version()
+
+            # Get column name mappings
+            row_count_col = self._column_name_mapping.get("row_count", "row_count")
+            size_bytes_col = self._column_name_mapping.get("size_bytes", "size_bytes")
+
+            # Use explicit column selection with aliases
             stats_query = f"""
             SELECT
-                row_count,
-                size_bytes,
+                {row_count_col} as row_count,
+                {size_bytes_col} as size_bytes,
                 creation_time,
                 last_modified_time
             FROM
@@ -1608,13 +1652,16 @@ def _fetch_table_stats(
 
             if stats_results:
                 row = stats_results[0]
-                if hasattr(row, "row_count") and row.row_count:
+                if hasattr(row, "row_count") and row.row_count is not None:
                     metadata["row_count"] = row.row_count
-                if hasattr(row, "size_bytes") and row.size_bytes:
+                if hasattr(row, "size_bytes") and row.size_bytes is not None:
                     metadata["size_bytes"] = row.size_bytes
-                if hasattr(row, "creation_time") and row.creation_time:
+                if hasattr(row, "creation_time") and row.creation_time is not None:
                     metadata["creation_time"] = row.creation_time
-                if hasattr(row, "last_modified_time") and row.last_modified_time:
+                if (
+                    hasattr(row, "last_modified_time")
+                    and row.last_modified_time is not None
+                ):
                     metadata["last_modified_time"] = row.last_modified_time
         except Exception as e:
             logger.warning(
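Switching from truthiness checks to `is not None` matters for zero-valued stats: an empty table reports `row_count == 0`, which is falsy, so the old checks would silently skip a legitimate value. A tiny illustration:

    row_count = 0  # an empty but existing table

    if row_count:                # old check: 0 is falsy, value is dropped
        print("recorded (old)")
    if row_count is not None:    # new check: 0 is recorded correctly
        print("recorded (new)")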
