
Commit 62bafec

improved sql queries

1 parent bc18e1b commit 62bafec

File tree: 1 file changed, +108 −89 lines

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py (+108 −89)
@@ -1529,46 +1529,29 @@ def _fetch_schema_info(
         self, table: BigqueryTable, project: str, schema: str, metadata: Dict[str, Any]
     ) -> Dict[str, Any]:
         """Fetch schema information from INFORMATION_SCHEMA."""
-        if not hasattr(self, "_column_name_mapping"):
-            self._detect_bq_schema_version()
-
-        # Get column name mappings
-        row_count_col = self._column_name_mapping.get("row_count", "row_count")
-        size_bytes_col = self._column_name_mapping.get("size_bytes", "size_bytes")
-
-        # Use explicit column selection and aliases to avoid issues
-        combined_query = f"""
-        SELECT
-            c.column_name,
-            c.data_type,
-            c.is_partitioning_column,
-            c.clustering_ordinal_position,
-            t.{row_count_col} as row_count,
-            t.{size_bytes_col} as size_bytes,
-            t.ddl,
-            t.creation_time,
-            t.last_modified_time,
-            t.table_type
-        FROM
-            `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS` c
-        JOIN
-            `{project}.{schema}.INFORMATION_SCHEMA.TABLES` t
-        ON
-            c.table_name = t.table_name
-        WHERE
-            c.table_name = '{table.name}'
-            AND (c.is_partitioning_column = 'YES' OR c.clustering_ordinal_position IS NOT NULL)
-        """
-
         try:
-            schema_results = self._execute_cached_query(
-                combined_query,
-                f"schema_info_{project}_{schema}_{table.name}",
+            # Get partition and clustering info from COLUMNS
+            columns_query = f"""
+            SELECT
+                column_name,
+                data_type,
+                is_partitioning_column,
+                clustering_ordinal_position
+            FROM
+                `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
+            WHERE
+                table_name = '{table.name}'
+                AND (is_partitioning_column = 'YES' OR clustering_ordinal_position IS NOT NULL)
+            """
+
+            columns_results = self._execute_cached_query(
+                columns_query,
+                f"columns_info_{project}_{schema}_{table.name}",
                 timeout=45,
             )

-            # Process results for partition and clustering columns
-            for row in schema_results:
+            # Process partition and clustering columns
+            for row in columns_results:
                 # Update partition columns
                 if row.is_partitioning_column == "YES":
                     metadata["partition_columns"][row.column_name] = row.data_type
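The split into a dedicated `columns_query` keeps the COLUMNS scan small, but `table.name` is still interpolated straight into the SQL string. As a point of comparison, here is a minimal standalone sketch of the same lookup using BigQuery query parameters; the client setup and the `my-project`/`my_dataset`/`my_table` names are placeholders, not part of this commit, and `_execute_cached_query` above is the profiler's own wrapper:

```python
from google.cloud import bigquery

# Placeholder identifiers; the table path itself cannot be parameterized,
# so only the filter value is bound with a query parameter instead of an f-string.
PROJECT, DATASET, TABLE = "my-project", "my_dataset", "my_table"

client = bigquery.Client(project=PROJECT)

columns_sql = f"""
SELECT column_name, data_type, is_partitioning_column, clustering_ordinal_position
FROM `{PROJECT}.{DATASET}.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = @table_name
  AND (is_partitioning_column = 'YES' OR clustering_ordinal_position IS NOT NULL)
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[bigquery.ScalarQueryParameter("table_name", "STRING", TABLE)]
)

for row in client.query(columns_sql, job_config=job_config).result():
    print(row.column_name, row.data_type, row.clustering_ordinal_position)
```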
@@ -1580,41 +1563,63 @@ def _fetch_schema_info(
                         "data_type": row.data_type,
                     }

-                # Update table metadata from first row (all rows have same values)
-                # Use hasattr with default values to handle missing attributes safely
-                if (
-                    hasattr(row, "row_count")
-                    and row.row_count
-                    and not metadata["row_count"]
-                ):
-                    metadata["row_count"] = row.row_count
+            # Get table metadata from TABLES
+            table_query = f"""
+            SELECT
+                ddl,
+                creation_time,
+                last_modified_time,
+                table_type
+            FROM
+                `{project}.{schema}.INFORMATION_SCHEMA.TABLES`
+            WHERE
+                table_name = '{table.name}'
+            """

-            if (
-                hasattr(row, "size_bytes")
-                and row.size_bytes
-                and not metadata["size_bytes"]
-            ):
-                metadata["size_bytes"] = row.size_bytes
+            table_results = self._execute_cached_query(
+                table_query,
+                f"table_info_{project}_{schema}_{table.name}",
+                timeout=30,
+            )

-                if hasattr(row, "ddl") and row.ddl and not metadata["ddl"]:
+            if table_results and len(table_results) > 0:
+                row = table_results[0]
+                if hasattr(row, "ddl") and row.ddl:
                     metadata["ddl"] = row.ddl
-
-                if (
-                    hasattr(row, "creation_time")
-                    and row.creation_time
-                    and not metadata.get("creation_time")
-                ):
+                if hasattr(row, "creation_time") and row.creation_time:
                     metadata["creation_time"] = row.creation_time
+                if hasattr(row, "last_modified_time") and row.last_modified_time:
+                    metadata["last_modified_time"] = row.last_modified_time
+                if hasattr(row, "table_type") and row.table_type:
+                    metadata["table_type"] = row.table_type
+
+            # Get storage statistics separately
+            storage_query = f"""
+            SELECT
+                total_rows,
+                total_logical_bytes
+            FROM
+                `{project}.{schema}.INFORMATION_SCHEMA.TABLE_STORAGE`
+            WHERE
+                table_name = '{table.name}'
+            """
+
+            storage_results = self._execute_cached_query(
+                storage_query,
+                f"storage_info_{project}_{schema}_{table.name}",
+                timeout=30,
+            )

+            if storage_results and len(storage_results) > 0:
+                row = storage_results[0]
+                if hasattr(row, "total_rows") and row.total_rows is not None:
+                    metadata["row_count"] = row.total_rows
                 if (
-                    hasattr(row, "last_modified_time")
-                    and row.last_modified_time
-                    and not metadata.get("last_modified_time")
+                    hasattr(row, "total_logical_bytes")
+                    and row.total_logical_bytes is not None
                 ):
-                    metadata["last_modified_time"] = row.last_modified_time
+                    metadata["size_bytes"] = row.total_logical_bytes

-                if hasattr(row, "table_type") and row.table_type:
-                    metadata["table_type"] = row.table_type
         except Exception as e:
             logger.warning(f"Error fetching schema information: {e}")

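Note that the hasattr guards above differ subtly: the TABLES fields use truthiness (`and row.ddl`), so empty values are skipped, while the TABLE_STORAGE fields compare against None, so a legitimate `total_rows` of 0 is still recorded. A hypothetical helper (not in this commit) that expresses the storage branch with `getattr` defaults, assuming result rows raise AttributeError for missing fields as google-cloud-bigquery Row objects do:

```python
from typing import Any, Dict

def apply_storage_stats(row: Any, metadata: Dict[str, Any]) -> None:
    # getattr with a None default collapses each hasattr/None pair into one lookup.
    total_rows = getattr(row, "total_rows", None)
    if total_rows is not None:
        metadata["row_count"] = total_rows

    total_logical_bytes = getattr(row, "total_logical_bytes", None)
    if total_logical_bytes is not None:
        metadata["size_bytes"] = total_logical_bytes
```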
@@ -1623,46 +1628,60 @@ def _fetch_schema_info(
     def _fetch_table_stats(
         self, project: str, schema: str, table_name: str, metadata: Dict[str, Any]
     ) -> Dict[str, Any]:
-        """Fetch additional table stats if needed."""
+        """Fetch additional table stats from the correct INFORMATION_SCHEMA view."""
         if not metadata.get("row_count") or not metadata.get("size_bytes"):
-            if not hasattr(self, "_column_name_mapping"):
-                self._detect_bq_schema_version()
-
-            # Get column name mappings
-            row_count_col = self._column_name_mapping.get("row_count", "row_count")
-            size_bytes_col = self._column_name_mapping.get("size_bytes", "size_bytes")
-
-            # Use explicit column selection with aliases
-            stats_query = f"""
-            SELECT
-                {row_count_col} as row_count,
-                {size_bytes_col} as size_bytes,
-                creation_time,
-                last_modified_time
-            FROM
-                `{project}.{schema}.INFORMATION_SCHEMA.TABLES`
-            WHERE
-                table_name = '{table_name}'
-            """
-
             try:
+                # Use TABLE_STORAGE which has the row count and size information
+                stats_query = f"""
+                SELECT
+                    total_rows,
+                    total_logical_bytes,
+                    total_physical_bytes
+                FROM
+                    `{project}.{schema}.INFORMATION_SCHEMA.TABLE_STORAGE`
+                WHERE
+                    table_name = '{table_name}'
+                """
+
                 stats_results = self._execute_cached_query(
                     stats_query, f"table_stats_{project}_{schema}_{table_name}"
                 )

-                if stats_results:
+                if stats_results and len(stats_results) > 0:
                     row = stats_results[0]
-                    if hasattr(row, "row_count") and row.row_count is not None:
-                        metadata["row_count"] = row.row_count
-                    if hasattr(row, "size_bytes") and row.size_bytes is not None:
-                        metadata["size_bytes"] = row.size_bytes
+                    if hasattr(row, "total_rows") and row.total_rows is not None:
+                        metadata["row_count"] = row.total_rows
+                    if (
+                        hasattr(row, "total_logical_bytes")
+                        and row.total_logical_bytes is not None
+                    ):
+                        metadata["size_bytes"] = row.total_logical_bytes
+
+                # Get creation time and last modified time from TABLES view
+                time_query = f"""
+                SELECT
+                    creation_time,
+                    last_modified_time
+                FROM
+                    `{project}.{schema}.INFORMATION_SCHEMA.TABLES`
+                WHERE
+                    table_name = '{table_name}'
+                """
+
+                time_results = self._execute_cached_query(
+                    time_query, f"table_time_{project}_{schema}_{table_name}"
+                )
+
+                if time_results and len(time_results) > 0:
+                    row = time_results[0]
                     if hasattr(row, "creation_time") and row.creation_time is not None:
                         metadata["creation_time"] = row.creation_time
                     if (
                         hasattr(row, "last_modified_time")
                         and row.last_modified_time is not None
                     ):
                         metadata["last_modified_time"] = row.last_modified_time
+
             except Exception as e:
                 logger.warning(
                     f"Error fetching table stats: {e}, continuing with available metadata"

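For verifying the new storage statistics outside the profiler, a minimal sketch follows. It assumes placeholder names throughout and uses the region-qualified form of TABLE_STORAGE (filtering the dataset via the `table_schema` column), which is how the view is commonly addressed; none of this is part of the commit:

```python
from google.cloud import bigquery

# Placeholder names; adjust REGION to wherever the dataset lives.
PROJECT, REGION, DATASET, TABLE = "my-project", "region-us", "my_dataset", "my_table"

client = bigquery.Client(project=PROJECT)

storage_sql = f"""
SELECT total_rows, total_logical_bytes, total_physical_bytes
FROM `{PROJECT}.{REGION}.INFORMATION_SCHEMA.TABLE_STORAGE`
WHERE table_schema = @dataset
  AND table_name = @table
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("dataset", "STRING", DATASET),
        bigquery.ScalarQueryParameter("table", "STRING", TABLE),
    ]
)

for row in client.query(storage_sql, job_config=job_config).result():
    print(row.total_rows, row.total_logical_bytes, row.total_physical_bytes)
```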