Skip to content

Commit 48f41b2

Browse files
committed
Updates for missed partition tables
1 parent 64e48a4 commit 48f41b2

File tree

2 files changed

+122
-52
lines changed

2 files changed

+122
-52
lines changed

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/profiler.py

+121-52
Original file line numberDiff line numberDiff line change
@@ -84,32 +84,69 @@ def _get_external_table_partition_filters(
8484
None if partition filters could not be determined
8585
"""
8686
try:
87+
# For external tables, we need to check specifically for partitioning columns
88+
# and also look at the DDL if available to detect hive-style partitioning
89+
90+
# First, try to get partition columns directly from INFORMATION_SCHEMA
8791
query = f"""SELECT column_name, data_type
8892
FROM `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
8993
WHERE table_name = '{table.name}' AND is_partitioning_column = 'YES'"""
9094
query_job = self.config.get_bigquery_client().query(query)
9195
results = list(query_job)
9296

93-
if results:
94-
# For external tables, also capture data type
95-
partition_cols_with_types = {
96-
row.column_name: row.data_type for row in results
97-
}
98-
required_partition_columns = set(partition_cols_with_types.keys())
97+
partition_cols_with_types = {
98+
row.column_name: row.data_type for row in results
99+
}
100+
101+
# If we didn't find any partition columns through INFORMATION_SCHEMA,
102+
# check the DDL for external table declarations that have partition info
103+
if not partition_cols_with_types and table.ddl:
104+
# Very simple DDL parsing to look for PARTITION BY statements
105+
if "PARTITION BY" in table.ddl.upper():
106+
ddl_lines = table.ddl.upper().split("\n")
107+
for line in ddl_lines:
108+
if "PARTITION BY" in line:
109+
# Look for column names mentioned in the PARTITION BY clause
110+
# This is a basic extraction and may need enhancement for complex DDLs
111+
parts = (
112+
line.split("PARTITION BY")[1]
113+
.split("OPTIONS")[0]
114+
.strip()
115+
)
116+
potential_cols = [
117+
col.strip(", `()") for col in parts.split()
118+
]
119+
120+
# Get all columns to check data types for potential partition columns
121+
all_cols_query = f"""SELECT column_name, data_type
122+
FROM `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
123+
WHERE table_name = '{table.name}'"""
124+
all_cols_job = self.config.get_bigquery_client().query(
125+
all_cols_query
126+
)
127+
all_cols_results = list(all_cols_job)
128+
all_cols_dict = {
129+
row.column_name.upper(): row.data_type
130+
for row in all_cols_results
131+
}
132+
133+
# Add potential partition columns with their types
134+
for col in potential_cols:
135+
if col in all_cols_dict:
136+
partition_cols_with_types[col] = all_cols_dict[col]
137+
138+
partition_filters = []
139+
140+
# Process all identified partition columns
141+
for col_name, data_type in partition_cols_with_types.items():
99142
logger.debug(
100-
f"Found external partition columns: {required_partition_columns}"
143+
f"Processing external table partition column: {col_name} with type {data_type}"
101144
)
102145

103-
# For tables with single DATE/TIMESTAMP partition, we can use current date/time
104-
if len(required_partition_columns) == 1:
105-
col_name = list(required_partition_columns)[0]
106-
col_type = partition_cols_with_types[col_name].upper()
107-
108-
# For temporal partitions, find the latest non-empty partition
109-
if col_type in ("DATE", "TIMESTAMP", "DATETIME"):
110-
# Query to find latest non-empty partition for temporal columns
111-
temporal_query = f"""WITH PartitionStats AS (
112-
SELECT {col_name} as partition_value,
146+
# For each partition column, we need to find a valid value
147+
query = f"""
148+
WITH PartitionStats AS (
149+
SELECT {col_name} as val,
113150
COUNT(*) as record_count
114151
FROM `{project}.{schema}.{table.name}`
115152
WHERE {col_name} IS NOT NULL
@@ -118,41 +155,57 @@ def _get_external_table_partition_filters(
118155
ORDER BY {col_name} DESC
119156
LIMIT 1
120157
)
121-
SELECT partition_value, record_count
158+
SELECT val, record_count
122159
FROM PartitionStats"""
123160

124-
query_job = self.config.get_bigquery_client().query(
125-
temporal_query
126-
)
127-
temporal_results = list(query_job)
161+
try:
162+
query_job = self.config.get_bigquery_client().query(query)
163+
results = list(query_job.result(timeout=30))
128164

129-
if not temporal_results:
130-
logger.warning(
131-
f"No non-empty partitions found for {col_name}"
132-
)
133-
return None
165+
if not results or results[0].val is None:
166+
logger.warning(
167+
f"No non-empty partition values found for column {col_name}"
168+
)
169+
continue
134170

135-
partition_value = temporal_results[0].partition_value
136-
record_count = temporal_results[0].record_count
171+
val = results[0].val
172+
record_count = results[0].record_count
173+
logger.info(
174+
f"Selected external partition {col_name}={val} with {record_count} records"
175+
)
137176

138-
if col_type == "DATE":
139-
filter_value = (
140-
f"DATE '{partition_value.strftime('%Y-%m-%d')}'"
177+
# Format the filter based on the data type
178+
data_type_upper = data_type.upper() if data_type else ""
179+
if data_type_upper in ("STRING", "VARCHAR"):
180+
partition_filters.append(f"`{col_name}` = '{val}'")
181+
elif data_type_upper == "DATE":
182+
partition_filters.append(f"`{col_name}` = DATE '{val}'")
183+
elif data_type_upper in ("TIMESTAMP", "DATETIME"):
184+
if isinstance(val, datetime):
185+
partition_filters.append(
186+
f"`{col_name}` = TIMESTAMP '{val.strftime('%Y-%m-%d %H:%M:%S')}'"
187+
)
188+
else:
189+
partition_filters.append(
190+
f"`{col_name}` = TIMESTAMP '{val}'"
141191
)
142-
else: # TIMESTAMP or DATETIME
143-
filter_value = f"TIMESTAMP '{partition_value.strftime('%Y-%m-%d %H:%M:%S')}'"
192+
else:
193+
# Default to numeric or other type
194+
partition_filters.append(f"`{col_name}` = {val}")
144195

145-
logger.info(
146-
f"Selected temporal partition {col_name}={partition_value} "
147-
f"with {record_count} records"
148-
)
149-
return [f"`{col_name}` = {filter_value}"]
196+
except Exception as e:
197+
logger.warning(
198+
f"Error determining value for partition column {col_name}: {e}"
199+
)
200+
continue
201+
202+
return partition_filters
150203

151-
return [] # No partitions found (valid for external tables)
152204
except Exception as e:
153205
logger.error(f"Error checking external table partitioning: {e}")
154206
return None
155207

208+
# Add this method to improve detection of partition columns from INFORMATION_SCHEMA if not found in partition_info
156209
def _get_required_partition_filters(
157210
self,
158211
table: BigqueryTable,
@@ -187,15 +240,30 @@ def _get_required_partition_filters(
187240
col.name for col in table.partition_info.columns if col
188241
)
189242

190-
# If no partition columns found, check for external table partitioning
243+
# If no partition columns found from partition_info, query INFORMATION_SCHEMA
244+
if not required_partition_columns:
245+
try:
246+
query = f"""SELECT column_name
247+
FROM `{project}.{schema}.INFORMATION_SCHEMA.COLUMNS`
248+
WHERE table_name = '{table.name}' AND is_partitioning_column = 'YES'"""
249+
query_job = self.config.get_bigquery_client().query(query)
250+
results = list(query_job)
251+
required_partition_columns = {row.column_name for row in results}
252+
logger.debug(
253+
f"Found partition columns from schema: {required_partition_columns}"
254+
)
255+
except Exception as e:
256+
logger.error(f"Error querying partition columns: {e}")
257+
258+
# If still no partition columns found, check for external table partitioning
191259
if not required_partition_columns:
192260
logger.debug(f"No partition columns found for table {table.name}")
193261
if table.external:
194262
return self._get_external_table_partition_filters(
195263
table, project, schema, current_time
196264
)
197265
else:
198-
return None # Internal table without partitions (unexpected)
266+
return None
199267

200268
logger.debug(f"Required partition columns: {required_partition_columns}")
201269

@@ -245,8 +313,7 @@ def _get_required_partition_filters(
245313
for col_name in other_columns:
246314
try:
247315
# Query to get latest non-empty partition
248-
query = f"""
249-
WITH PartitionStats AS (
316+
query = f"""WITH PartitionStats AS (
250317
SELECT {col_name} as val,
251318
COUNT(*) as record_count
252319
FROM `{project}.{schema}.{table.name}`
@@ -299,14 +366,16 @@ def get_batch_kwargs(
299366
"table_name": bq_table.name,
300367
}
301368

302-
# For external tables, add specific handling
369+
# Different handling path for external tables vs native tables
303370
if bq_table.external:
304-
base_kwargs["is_external"] = "true"
305-
# Add any specific external table options needed
306-
307-
partition_filters = self._get_required_partition_filters(
308-
bq_table, db_name, schema_name
309-
)
371+
logger.info(f"Processing external table: {bq_table.name}")
372+
partition_filters = self._get_external_table_partition_filters(
373+
bq_table, db_name, schema_name, datetime.now(timezone.utc)
374+
)
375+
else:
376+
partition_filters = self._get_required_partition_filters(
377+
bq_table, db_name, schema_name
378+
)
310379

311380
if partition_filters is None:
312381
logger.warning(
@@ -315,7 +384,7 @@ def get_batch_kwargs(
315384
)
316385
return base_kwargs
317386

318-
# If no partition filters needed (e.g. some external tables), return base kwargs
387+
# If no partition filters needed, return base kwargs
319388
if not partition_filters:
320389
return base_kwargs
321390

metadata-ingestion/tests/unit/bigquery/test_bigquery_profiler.py

+1
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ def test_get_partition_filters_for_single_day_partition():
119119
timestamp_str = filters[0].split("'")[1]
120120
datetime.strptime(timestamp_str.split("+")[0], "%Y-%m-%d %H:%M:%S.%f")
121121

122+
122123
def test_get_partition_filters_for_external_table_with_partitions():
123124
"""Test handling of external table with partitions."""
124125
profiler = BigqueryProfiler(config=BigQueryV2Config(), report=BigQueryV2Report())

0 commit comments

Comments (0)