Commit 664b217

updating logic ordering
1 parent 3f3151a commit 664b217

File tree

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py

1 file changed, +175 -89 lines changed
@@ -391,6 +391,11 @@ def _get_external_table_partition_filters(
             None if partition filters could not be determined
         """
         try:
+            # Try sampling approach first - most efficient
+            sample_filters = self._get_partitions_with_sampling(table, project, schema)
+            if sample_filters:
+                return sample_filters
+
             # Step 1: Get partition columns from INFORMATION_SCHEMA
             partition_cols_with_types = self._get_partition_columns_from_info_schema(
                 table, project, schema
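
The hunk above is the heart of the "updating logic ordering" change: the cheap sampling probe now runs before the INFORMATION_SCHEMA lookup. A minimal, generic sketch of that cheapest-first ordering follows; the names are placeholders for illustration, not taken from the profiler:

from typing import Callable, List, Optional

def resolve_partition_filters(
    strategies: List[Callable[[], Optional[List[str]]]],
) -> Optional[List[str]]:
    """Run each strategy in order, cheapest first, and return the first non-empty result."""
    for strategy in strategies:
        filters = strategy()
        if filters:
            return filters
    return None

if __name__ == "__main__":
    cheap_sampling_probe = lambda: None  # pretend the sample found nothing
    schema_based_fallback = lambda: ["`event_date` = DATE '2024-01-01'"]
    print(resolve_partition_filters([cheap_sampling_probe, schema_based_fallback]))
    # ["`event_date` = DATE '2024-01-01'"]
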
@@ -459,7 +464,7 @@ def _try_partition_combinations(
         schema: str,
         partition_cols: List[str],
         partition_cols_with_types: Dict[str, str],
-        timeout: int = 300,  # This parameter will be ignored
+        timeout: int = 300,
     ) -> Optional[List[str]]:
         """
         Try to find combinations of partition values that return data.
@@ -860,6 +865,152 @@ def _try_find_most_populated_partition(
         logger.warning("All fallback approaches failed to find valid partition values")
         return None
 
+    def _verify_partition_has_data(
+        self,
+        table: BigqueryTable,
+        project: str,
+        schema: str,
+        filters: List[str],
+        timeout: int = 300,  # Increased from 120
+    ) -> bool:
+        """
+        Verify that the partition filters actually return data.
+
+        Args:
+            table: BigqueryTable instance
+            project: BigQuery project ID
+            schema: BigQuery dataset name
+            filters: List of partition filter strings
+            timeout: Query timeout in seconds
+
+        Returns:
+            True if data exists, False otherwise
+        """
+        if not filters:
+            return False
+
+        # Build WHERE clause from filters
+        where_clause = " AND ".join(filters)
+
+        # Run a simple count query to check if data exists
+        query = f"""SELECT COUNT(*) as cnt
+            FROM `{project}.{schema}.{table.name}`
+            WHERE {where_clause}
+            LIMIT 1000"""  # Limit to avoid expensive full table scans
+
+        try:
+            logger.debug(f"Verifying partition data with query: {query}")
+
+            # Set a longer timeout for this operation
+            query_job = self.config.get_bigquery_client().query(query)
+            results = list(query_job.result())
+
+            if results and results[0].cnt > 0:
+                logger.info(
+                    f"Verified partition filters return {results[0].cnt} rows: {where_clause}"
+                )
+                return True
+            else:
+                logger.warning(f"Partition verification found no data: {where_clause}")
+                return False
+        except Exception as e:
+            logger.warning(f"Error verifying partition data: {e}", exc_info=True)
+
+            # Try with a simpler query as fallback
+            try:
+                simpler_query = f"""
+                    SELECT 1
+                    FROM `{project}.{schema}.{table.name}`
+                    WHERE {where_clause}
+                    LIMIT 1
+                """
+                query_job = self.config.get_bigquery_client().query(simpler_query)
+                results = list(query_job.result())
+
+                return len(results) > 0
+            except Exception as simple_e:
+                logger.warning(f"Simple verification also failed: {simple_e}")
+                return False
+
+    def _get_partitions_with_sampling(
+        self,
+        table: BigqueryTable,
+        project: str,
+        schema: str,
+    ) -> Optional[List[str]]:
+        """
+        Get partition filters using sampling to avoid full table scans.
+
+        Args:
+            table: BigqueryTable instance
+            project: BigQuery project ID
+            schema: BigQuery dataset name
+
+        Returns:
+            List of partition filter strings, or None if unable to build filters
+        """
+        try:
+            # First get partition columns
+            partition_cols_with_types = self._get_partition_columns_from_info_schema(
+                table, project, schema
+            )
+
+            if not partition_cols_with_types:
+                partition_cols_with_types = self._get_partition_columns_from_ddl(
+                    table, project, schema
+                )
+
+            if not partition_cols_with_types:
+                return None
+
+            logger.info(
+                f"Using sampling to find partition values for {len(partition_cols_with_types)} columns"
+            )
+
+            # Use TABLESAMPLE to get a small sample of data
+            sample_query = f"""
+                SELECT *
+                FROM `{project}.{schema}.{table.name}` TABLESAMPLE SYSTEM (1 PERCENT)
+                LIMIT 100
+            """
+
+            query_job = self.config.get_bigquery_client().query(sample_query)
+            results = list(query_job.result())
+
+            if not results:
+                logger.info("Sample query returned no results")
+                return None
+
+            # Extract values for partition columns
+            filters = []
+            for col_name, data_type in partition_cols_with_types.items():
+                for row in results:
+                    if hasattr(row, col_name) and getattr(row, col_name) is not None:
+                        val = getattr(row, col_name)
+                        filter_str = self._create_partition_filter_from_value(
+                            col_name, val, data_type
+                        )
+                        filters.append(filter_str)
+                        logger.info(
+                            f"Found partition value from sample: {col_name}={val}"
+                        )
+                        break
+
+            # Verify the filters return data
+            if filters and self._verify_partition_has_data(
+                table, project, schema, filters
+            ):
+                logger.info(
+                    f"Successfully created partition filters from sample: {filters}"
+                )
+                return filters
+
+            return None
+
+        except Exception as e:
+            logger.warning(f"Error getting partition filters with sampling: {e}")
+            return None
+
     def _find_valid_partition_combination(
         self,
         table: BigqueryTable,
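
The sampling path above hands each sampled value to self._create_partition_filter_from_value, which is not part of this diff. As a rough, hypothetical illustration only (not DataHub's actual helper), such a function might quote values by BigQuery data type like this:

import datetime

def make_partition_filter(col_name: str, value, data_type: str) -> str:
    """Build a WHERE-clause fragment for one partition column (illustrative sketch)."""
    data_type = data_type.upper()
    if data_type == "DATE":
        return f"`{col_name}` = DATE '{value}'"
    if data_type == "TIMESTAMP":
        return f"`{col_name}` = TIMESTAMP '{value}'"
    if data_type == "DATETIME":
        return f"`{col_name}` = DATETIME '{value}'"
    if data_type in ("INT64", "INTEGER", "NUMERIC"):
        return f"`{col_name}` = {value}"
    # Fall back to a quoted string literal for STRING and anything unrecognized.
    return f"`{col_name}` = '{value}'"

print(make_partition_filter("event_date", datetime.date(2024, 1, 1), "DATE"))
# `event_date` = DATE '2024-01-01'
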
@@ -947,74 +1098,6 @@ def _find_valid_partition_combination(
             table, project, schema, partition_cols_with_types, timeout
         )
 
-    def _verify_partition_has_data(
-        self,
-        table: BigqueryTable,
-        project: str,
-        schema: str,
-        filters: List[str],
-        timeout: int = 300,  # Increased from 120
-    ) -> bool:
-        """
-        Verify that the partition filters actually return data.
-
-        Args:
-            table: BigqueryTable instance
-            project: BigQuery project ID
-            schema: BigQuery dataset name
-            filters: List of partition filter strings
-            timeout: Query timeout in seconds
-
-        Returns:
-            True if data exists, False otherwise
-        """
-        if not filters:
-            return False
-
-        # Build WHERE clause from filters
-        where_clause = " AND ".join(filters)
-
-        # Run a simple count query to check if data exists
-        query = f"""SELECT COUNT(*) as cnt
-            FROM `{project}.{schema}.{table.name}`
-            WHERE {where_clause}
-            LIMIT 1000"""  # Limit to avoid expensive full table scans
-
-        try:
-            logger.debug(f"Verifying partition data with query: {query}")
-
-            # Set a longer timeout for this operation
-            query_job = self.config.get_bigquery_client().query(query)
-            results = list(query_job.result())
-
-            if results and results[0].cnt > 0:
-                logger.info(
-                    f"Verified partition filters return {results[0].cnt} rows: {where_clause}"
-                )
-                return True
-            else:
-                logger.warning(f"Partition verification found no data: {where_clause}")
-                return False
-        except Exception as e:
-            logger.warning(f"Error verifying partition data: {e}", exc_info=True)
-
-            # Try with a simpler query as fallback
-            try:
-                simpler_query = f"""
-                    SELECT 1
-                    FROM `{project}.{schema}.{table.name}`
-                    WHERE {where_clause}
-                    LIMIT 1
-                """
-                query_job = self.config.get_bigquery_client().query(simpler_query)
-                results = list(query_job.result())
-
-                return len(results) > 0
-            except Exception as simple_e:
-                logger.warning(f"Simple verification also failed: {simple_e}")
-                return False
-
-    # Add this method to improve detection of partition columns from INFORMATION_SCHEMA if not found in partition_info
     def _get_required_partition_filters(
         self,
         table: BigqueryTable,
@@ -1036,6 +1119,11 @@ def _get_required_partition_filters(
         current_time = datetime.now(timezone.utc)
         partition_filters = []
 
+        # First try sampling approach as it's most efficient
+        sample_filters = self._get_partitions_with_sampling(table, project, schema)
+        if sample_filters:
+            return sample_filters
+
         # Get required partition columns from table info
         required_partition_columns = set()
 
@@ -1053,10 +1141,10 @@ def _get_required_partition_filters(
         if not required_partition_columns:
             try:
                 query = f"""SELECT column_name
-                FROM `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
-                WHERE table_name = '{table.name}' AND is_partitioning_column = 'YES'"""
+                    FROM `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
+                    WHERE table_name = '{table.name}' AND is_partitioning_column = 'YES'"""
                 query_job = self.config.get_bigquery_client().query(query)
-                results = list(query_job)
+                results = list(query_job.result())
                 required_partition_columns = {row.column_name for row in results}
                 logger.debug(
                     f"Found partition columns from schema: {required_partition_columns}"
@@ -1076,14 +1164,14 @@ def _get_required_partition_filters(
 
         logger.debug(f"Required partition columns: {required_partition_columns}")
 
-        # Get column data types to handle casting correctly
+        # Get column data types for the partition columns
         column_data_types = {}
         try:
             query = f"""SELECT column_name, data_type
-            FROM `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
-            WHERE table_name = '{table.name}'"""
+                FROM `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
+                WHERE table_name = '{table.name}'"""
             query_job = self.config.get_bigquery_client().query(query)
-            results = list(query_job)
+            results = list(query_job.result())
             column_data_types = {row.column_name: row.data_type for row in results}
         except Exception as e:
             logger.error(f"Error fetching column data types: {e}")
@@ -1131,9 +1219,9 @@ def _get_required_partition_filters(
                     HAVING record_count > 0
                     ORDER BY {col_name} DESC
                     LIMIT 1
-                )
-                SELECT val, record_count
-                FROM PartitionStats"""
+                    )
+                    SELECT val, record_count
+                    FROM PartitionStats"""
                 logger.debug(f"Executing query for partition value: {query}")
 
                 query_job = self.config.get_bigquery_client().query(query)
@@ -1175,16 +1263,14 @@ def get_batch_kwargs(
             "table_name": bq_table.name,
         }
 
-        # Different handling path for external tables vs native tables
+        # For external tables, add specific handling
         if bq_table.external:
-            logger.info(f"Processing external table: {bq_table.name}")
-            partition_filters = self._get_external_table_partition_filters(
-                bq_table, db_name, schema_name, datetime.now(timezone.utc)
-            )
-        else:
-            partition_filters = self._get_required_partition_filters(
-                bq_table, db_name, schema_name
-            )
+            base_kwargs["is_external"] = "true"
+            # Add any specific external table options needed
+
+        partition_filters = self._get_required_partition_filters(
+            bq_table, db_name, schema_name
+        )
 
         if partition_filters is None:
             logger.warning(
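
This hunk tags external tables with is_external and routes every table through _get_required_partition_filters; what downstream code does with partition_filters is outside the diff. One plausible continuation, sketched under the assumption of a custom_sql-style batch kwarg (the key name and kwargs shape are illustrative, not taken from this commit):

from typing import Dict, List

def with_partition_filters(
    base_kwargs: Dict[str, str], project: str, schema: str, table: str, filters: List[str]
) -> Dict[str, str]:
    """Fold partition filters into a copy of the batch kwargs as a custom SQL statement."""
    if not filters:
        return base_kwargs
    where_clause = " AND ".join(filters)
    return {
        **base_kwargs,
        "custom_sql": f"SELECT * FROM `{project}.{schema}.{table}` WHERE {where_clause}",
    }

print(with_partition_filters(
    {"table_name": "events", "is_external": "true"},
    "my-project", "my_dataset", "events",
    ["`event_date` = DATE '2024-01-01'"],
))
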
@@ -1193,7 +1279,7 @@ def get_batch_kwargs(
             )
             return base_kwargs
 
-        # If no partition filters needed, return base kwargs
+        # If no partition filters needed (e.g. some external tables), return base kwargs
         if not partition_filters:
            return base_kwargs
 