Skip to content

Commit 4020201

Browse files
committed
trying cheaper sampling methods
1 parent 664b217 commit 4020201

File tree

1 file changed

+290
-7
lines changed
  • metadata-ingestion/src/datahub/ingestion/source/bigquery_v2

1 file changed

+290
-7
lines changed

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py

+290-7
Original file line numberDiff line numberDiff line change
@@ -391,11 +391,6 @@ def _get_external_table_partition_filters(
391391
None if partition filters could not be determined
392392
"""
393393
try:
394-
# Try sampling approach first - most efficient
395-
sample_filters = self._get_partitions_with_sampling(table, project, schema)
396-
if sample_filters:
397-
return sample_filters
398-
399394
# Step 1: Get partition columns from INFORMATION_SCHEMA
400395
partition_cols_with_types = self._get_partition_columns_from_info_schema(
401396
table, project, schema
@@ -418,15 +413,303 @@ def _get_external_table_partition_filters(
418413
f"Found {len(partition_cols_with_types)} partition columns: {list(partition_cols_with_types.keys())}"
419414
)
420415

421-
# Step 4: Find a valid combination of partition filters that returns data
422-
return self._find_valid_partition_combination(
416+
# Step 4: First try sampling approach (most efficient)
417+
sample_filters = self._get_partitions_with_sampling(table, project, schema)
418+
if sample_filters:
419+
return sample_filters
420+
421+
# Step 5: For time partitions in external tables, try recent dates
422+
date_columns = {
423+
col: data_type
424+
for col, data_type in partition_cols_with_types.items()
425+
if (
426+
col.lower()
427+
in {"date", "dt", "day", "created_date", "partition_date"}
428+
)
429+
or (data_type.upper() == "DATE")
430+
}
431+
432+
timestamp_columns = {
433+
col: data_type
434+
for col, data_type in partition_cols_with_types.items()
435+
if (
436+
col.lower()
437+
in {"timestamp", "time", "datetime", "created_at", "updated_at"}
438+
)
439+
or (data_type.upper() in {"TIMESTAMP", "DATETIME"})
440+
}
441+
442+
# Handle date columns first - try working backwards from current date
443+
if date_columns:
444+
filters = self._try_recent_date_partitions(
445+
table, project, schema, date_columns, current_time
446+
)
447+
if filters:
448+
return filters
449+
450+
# Next try timestamp columns similarly
451+
if timestamp_columns:
452+
filters = self._try_recent_timestamp_partitions(
453+
table, project, schema, timestamp_columns, current_time
454+
)
455+
if filters:
456+
return filters
457+
458+
# Step 6: For non-time partitions, try small-scale sampling for each column
459+
for col_name, data_type in partition_cols_with_types.items():
460+
if col_name in date_columns or col_name in timestamp_columns:
461+
continue # Already tried these
462+
463+
# Try to get a few distinct values with minimal scanning
464+
try:
465+
query = f"""
466+
SELECT DISTINCT {col_name}
467+
FROM `{project}.{schema}.{table.name}` TABLESAMPLE SYSTEM (0.5 PERCENT)
468+
WHERE {col_name} IS NOT NULL
469+
LIMIT 5
470+
"""
471+
472+
query_job = self.config.get_bigquery_client().query(query)
473+
values = [getattr(row, col_name) for row in query_job.result()]
474+
475+
# Try each value to find one that works
476+
for val in values:
477+
filter_str = self._create_partition_filter_from_value(
478+
col_name, val, data_type
479+
)
480+
481+
# Verify this value returns data with minimal scanning
482+
if self._verify_partition_has_data_with_limit(
483+
table, project, schema, [filter_str], limit=10
484+
):
485+
return [filter_str]
486+
except Exception as e:
487+
logger.warning(f"Error sampling values for column {col_name}: {e}")
488+
489+
# Last resort: fallback to regular combination finding with careful limits
490+
logger.warning(f"Using fallback approach for external table {table.name}")
491+
return self._find_valid_partition_combination_for_external(
423492
table, project, schema, partition_cols_with_types
424493
)
425494

426495
except Exception as e:
427496
logger.error(f"Error checking external table partitioning: {e}")
428497
return None
429498

499+
def _try_recent_date_partitions(
500+
self,
501+
table: BigqueryTable,
502+
project: str,
503+
schema: str,
504+
date_columns: Dict[str, str],
505+
current_time: datetime,
506+
) -> Optional[List[str]]:
507+
"""
508+
Try recent date values for external table date partitions.
509+
"""
510+
# Try dates in reverse chronological order
511+
test_dates = [
512+
current_time.date(), # Today
513+
(current_time - timedelta(days=1)).date(), # Yesterday
514+
(current_time - timedelta(days=7)).date(), # Last week
515+
(current_time - timedelta(days=30)).date(), # Last month
516+
(current_time - timedelta(days=90)).date(), # Last quarter
517+
(current_time - timedelta(days=365)).date(), # Last year
518+
]
519+
520+
for col_name, data_type in date_columns.items():
521+
for test_date in test_dates:
522+
filter_str = self._create_partition_filter_from_value(
523+
col_name, test_date, data_type
524+
)
525+
526+
# Check if this date filter returns data with minimal scanning
527+
if self._verify_partition_has_data_with_limit(
528+
table, project, schema, [filter_str], limit=10
529+
):
530+
logger.info(
531+
f"Found working date filter for external table: {filter_str}"
532+
)
533+
return [filter_str]
534+
535+
return None
536+
537+
def _try_recent_timestamp_partitions(
538+
self,
539+
table: BigqueryTable,
540+
project: str,
541+
schema: str,
542+
timestamp_columns: Dict[str, str],
543+
current_time: datetime,
544+
) -> Optional[List[str]]:
545+
"""
546+
Try recent timestamp values for external table timestamp partitions.
547+
"""
548+
# Create timestamp ranges - truncate to hour to increase matches
549+
current_hour = current_time.replace(minute=0, second=0, microsecond=0)
550+
551+
test_times = [
552+
current_hour, # This hour
553+
current_hour - timedelta(hours=1), # Last hour
554+
current_hour - timedelta(hours=24), # Yesterday same hour
555+
current_hour - timedelta(days=7), # Last week same hour
556+
]
557+
558+
for col_name, _data_type in timestamp_columns.items():
559+
for test_time in test_times:
560+
# For timestamps, we can do a range check instead of exact match to increase chances
561+
# of finding data in a specific time window
562+
filter_str = (
563+
f"`{col_name}` >= TIMESTAMP '{test_time.strftime('%Y-%m-%d %H:00:00')}' "
564+
+ f"AND `{col_name}` < TIMESTAMP '{(test_time + timedelta(hours=1)).strftime('%Y-%m-%d %H:00:00')}'"
565+
)
566+
567+
# Check if this filter returns data with minimal scanning
568+
if self._verify_partition_has_data_with_limit(
569+
table, project, schema, [filter_str], limit=10
570+
):
571+
logger.info(
572+
f"Found working timestamp filter for external table: {filter_str}"
573+
)
574+
return [filter_str]
575+
576+
return None
577+
578+
def _verify_partition_has_data_with_limit(
579+
self,
580+
table: BigqueryTable,
581+
project: str,
582+
schema: str,
583+
filters: List[str],
584+
limit: int = 10,
585+
) -> bool:
586+
"""
587+
Verify partition filters return data using a LIMIT clause to avoid full scans.
588+
"""
589+
if not filters:
590+
return False
591+
592+
where_clause = " AND ".join(filters)
593+
594+
# Use LIMIT 1 to minimize data scanned
595+
query = f"""
596+
SELECT 1
597+
FROM `{project}.{schema}.{table.name}`
598+
WHERE {where_clause}
599+
LIMIT {limit}
600+
"""
601+
602+
try:
603+
logger.debug(
604+
f"Verifying external table partition with minimal query: {query}"
605+
)
606+
query_job = self.config.get_bigquery_client().query(query)
607+
results = list(query_job.result())
608+
return len(results) > 0
609+
except Exception as e:
610+
logger.warning(f"Error verifying partition with limit: {e}")
611+
return False
612+
613+
def _find_valid_partition_combination_for_external(
    self,
    table: BigqueryTable,
    project: str,
    schema: str,
    partition_cols_with_types: Dict[str, str],
) -> Optional[List[str]]:
    """Find a usable partition filter for an external table with minimal scanning.

    Strategy, cheapest first:
      1. For each partition column, sample a few distinct values via
         TABLESAMPLE and return the first single-column filter that
         verifiably matches rows.
      2. As a last resort, read one value from the first non-time
         partition column and build an (unverified) filter from it.
      3. Return [] when nothing usable was found.

    Args:
        table: Table being profiled.
        project: BigQuery project id.
        schema: Dataset name.
        partition_cols_with_types: Partition column name -> BigQuery data type.

    Returns:
        A one-element list with a working filter, or [] if no usable
        partition filter could be determined.
    """
    # For external tables, be extra cautious to avoid large scans: look for
    # a single partition-column value that is known to match rows.
    for col_name, data_type in partition_cols_with_types.items():
        try:
            # Use TABLESAMPLE to minimize scanning
            query = f"""
            SELECT DISTINCT {col_name}
            FROM `{project}.{schema}.{table.name}` TABLESAMPLE SYSTEM (0.1 PERCENT)
            WHERE {col_name} IS NOT NULL
            LIMIT 5
            """

            query_job = self.config.get_bigquery_client().query(query)
            rows = list(query_job.result())

            # Try each sampled value until one verifiably returns data.
            for row in rows:
                val = getattr(row, col_name)
                filter_str = self._create_partition_filter_from_value(
                    col_name, val, data_type
                )

                if self._verify_partition_has_data_with_limit(
                    table, project, schema, [filter_str]
                ):
                    logger.info(
                        f"Found working filter for external table: {filter_str}"
                    )
                    # A single partition column filter is typically enough
                    # to bound the scan for profiling purposes.
                    return [filter_str]
        except Exception as e:
            logger.warning(
                f"Error finding valid partition for column {col_name}: {e}"
            )

    # NOTE: a previous version accumulated `individual_filters` and
    # re-verified their combination here, but every successful verification
    # above returns immediately, so that branch could never execute and has
    # been removed as dead code.

    # Last resort - return a dummy filter that will get at least some data
    logger.warning(f"Using catch-all approach for external table {table.name}")

    # Prefer a non-time column: a single discrete value from it is more
    # likely to identify a stable partition than an arbitrary timestamp.
    non_time_cols = [
        col
        for col in partition_cols_with_types.keys()
        if not any(
            time_word in col.lower()
            for time_word in ["date", "time", "day", "month", "year", "hour"]
        )
    ]

    if non_time_cols:
        col_name = non_time_cols[0]
        try:
            # Try to get any value at all
            query = f"""
            SELECT DISTINCT {col_name}
            FROM `{project}.{schema}.{table.name}`
            WHERE {col_name} IS NOT NULL
            LIMIT 1
            """

            query_job = self.config.get_bigquery_client().query(query)
            rows = list(query_job.result())

            if rows and hasattr(rows[0], col_name):
                val = getattr(rows[0], col_name)
                data_type = partition_cols_with_types[col_name]
                filter_str = self._create_partition_filter_from_value(
                    col_name, val, data_type
                )
                # Best effort: this filter is returned unverified.
                return [filter_str]
        except Exception as e:
            logger.warning(f"Error in last resort approach: {e}")

    # Empty list indicates no partitions found or usable
    return []
430713
def _create_partition_filter_from_value(
431714
self, col_name: str, val: Any, data_type: str
432715
) -> str:

0 commit comments

Comments
 (0)