Skip to content

Commit e95d9df

Browse files
committed
Update profiler.py
1 parent d462564 commit e95d9df

File tree

1 file changed

+71
-21
lines changed
  • metadata-ingestion/src/datahub/ingestion/source/bigquery_v2

1 file changed

+71
-21
lines changed

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py

+71-21
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,7 @@ def _try_column_specific_strategies(
545545
project: BigQuery project ID
546546
schema: Dataset name
547547
partition_cols_with_types: Dictionary of column names to data types
548-
current_time: Current UTC datetime
548+
current_time: Current UTC datetime (used only for context)
549549
550550
Returns:
551551
List of filter strings if found, None otherwise
@@ -566,35 +566,85 @@ def _try_column_specific_strategies(
566566

567567
# Try date columns first - most commonly used for partitioning
568568
if date_cols:
569-
col_name = date_cols[0] # Start with first date column
570-
data_type = partition_cols_with_types[col_name]
569+
for col_name in date_cols:
570+
data_type = partition_cols_with_types[col_name]
571571

572-
# Try today, yesterday, last week (most common scenarios)
573-
for days_ago in [0, 1, 7, 30]:
574-
test_date = (current_time - timedelta(days=days_ago)).date()
575-
filter_str = self._create_partition_filter_from_value(
576-
col_name, test_date, data_type
572+
# Find actual date values with data instead of assuming current date
573+
query = f"""
574+
WITH PartitionStats AS (
575+
SELECT {col_name} as val, COUNT(*) as record_count
576+
FROM `{project}.{schema}.{table.name}`
577+
WHERE {col_name} IS NOT NULL
578+
GROUP BY {col_name}
579+
HAVING record_count > 0
580+
ORDER BY record_count DESC
581+
LIMIT 5
577582
)
583+
SELECT val, record_count
584+
FROM PartitionStats
585+
"""
578586

579-
if self._quick_verify_filters(table, project, schema, [filter_str]):
580-
logger.info(f"Found valid recent date filter: {filter_str}")
581-
return [filter_str]
587+
try:
588+
query_job = self.config.get_bigquery_client().query(query)
589+
results = list(query_job.result())
590+
591+
for result in results:
592+
if result.val is not None:
593+
filter_str = self._create_partition_filter_from_value(
594+
col_name, result.val, data_type
595+
)
596+
597+
if self._quick_verify_filters(
598+
table, project, schema, [filter_str]
599+
):
600+
logger.info(
601+
f"Found valid date filter: {filter_str} with {result.record_count} rows"
602+
)
603+
return [filter_str]
604+
except Exception as e:
605+
logger.warning(f"Error finding date partitions for {col_name}: {e}")
582606

583607
# Try time columns next
584608
if time_cols:
585-
col_name = time_cols[0]
586-
data_type = partition_cols_with_types[col_name]
609+
for col_name in time_cols:
610+
data_type = partition_cols_with_types[col_name]
587611

588-
# Try current hour, day, etc.
589-
for hours_ago in [0, 1, 24, 168]: # Now, 1hr ago, 1 day ago, 1 week ago
590-
test_time = current_time - timedelta(hours=hours_ago)
591-
filter_str = self._create_partition_filter_from_value(
592-
col_name, test_time, data_type
612+
# Find actual timestamp values with data instead of assuming current time
613+
query = f"""
614+
WITH PartitionStats AS (
615+
SELECT {col_name} as val, COUNT(*) as record_count
616+
FROM `{project}.{schema}.{table.name}`
617+
WHERE {col_name} IS NOT NULL
618+
GROUP BY {col_name}
619+
HAVING record_count > 0
620+
ORDER BY record_count DESC
621+
LIMIT 5
593622
)
623+
SELECT val, record_count
624+
FROM PartitionStats
625+
"""
594626

595-
if self._quick_verify_filters(table, project, schema, [filter_str]):
596-
logger.info(f"Found valid recent time filter: {filter_str}")
597-
return [filter_str]
627+
try:
628+
query_job = self.config.get_bigquery_client().query(query)
629+
results = list(query_job.result())
630+
631+
for result in results:
632+
if result.val is not None:
633+
filter_str = self._create_partition_filter_from_value(
634+
col_name, result.val, data_type
635+
)
636+
637+
if self._quick_verify_filters(
638+
table, project, schema, [filter_str]
639+
):
640+
logger.info(
641+
f"Found valid timestamp filter: {filter_str} with {result.record_count} rows"
642+
)
643+
return [filter_str]
644+
except Exception as e:
645+
logger.warning(
646+
f"Error finding timestamp partitions for {col_name}: {e}"
647+
)
598648

599649
return None
600650

0 commit comments

Comments
 (0)