
Commit adf82be

BQ improvements
1 parent 7930212 · commit adf82be

3 files changed: +122 -52

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py

+28 -9
@@ -530,10 +530,21 @@ def _generate_dataflow_from_connector(self, connector: Connector) -> DataFlow:
         properties["paused"] = str(connector.paused)
         properties["destination_id"] = connector.destination_id

-        # Get destination platform if available
-        if "destination_platform" in connector.additional_properties:
-            destination = connector.additional_properties.get("destination_platform")
-            description += f" to {destination}"
+        # Get destination platform
+        default_destination = (
+            "bigquery"
+            if (
+                hasattr(self.config, "fivetran_log_config")
+                and self.config.fivetran_log_config
+                and self.config.fivetran_log_config.destination_platform == "bigquery"
+            )
+            else "snowflake"
+        )
+
+        destination = connector.additional_properties.get(
+            "destination_platform", default_destination
+        )
+        description += f" to {destination}"

         return DataFlow(
             orchestrator=Constant.ORCHESTRATOR,
@@ -562,12 +573,20 @@ def _generate_datajob_from_connector(self, connector: Connector) -> DataJob:
         # Get source platform from connector type
         source_platform = self._detect_source_platform(connector)

-        # Get destination platform
-        destination_platform = "snowflake"  # Default
-        if "destination_platform" in connector.additional_properties:
-            destination_platform = connector.additional_properties.get(
-                "destination_platform"
+        # Get destination platform - use bigquery as default if we have bigquery config
+        default_destination = (
+            "bigquery"
+            if (
+                hasattr(self.config, "fivetran_log_config")
+                and self.config.fivetran_log_config
+                and self.config.fivetran_log_config.destination_platform == "bigquery"
             )
+            else "snowflake"
+        )
+
+        destination_platform = connector.additional_properties.get(
+            "destination_platform", default_destination
+        )

         # Create job description
         description = f"Fivetran data pipeline from {connector.connector_type} to {destination_platform}"
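
Both hunks in this file apply the same pattern: derive a default destination from the ingestion config, then let a value reported on the connector override it. A minimal standalone sketch of that resolution logic follows; the SimpleNamespace stand-ins and sample values are hypothetical, not the real DataHub config classes.

# Minimal sketch of the destination-resolution pattern above.
# SimpleNamespace stands in for the real config/connector objects;
# the sample values are hypothetical.
from types import SimpleNamespace

config = SimpleNamespace(
    fivetran_log_config=SimpleNamespace(destination_platform="bigquery")
)
additional_properties = {}  # connector reported no destination_platform

default_destination = (
    "bigquery"
    if (
        hasattr(config, "fivetran_log_config")
        and config.fivetran_log_config
        and config.fivetran_log_config.destination_platform == "bigquery"
    )
    else "snowflake"
)

destination = additional_properties.get("destination_platform", default_destination)
print(destination)  # -> "bigquery"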

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_api_client.py

+70 -30
@@ -206,45 +206,85 @@ def list_connector_schemas(self, connector_id: str) -> List[Dict]:

         try:
             response = self._make_request("GET", f"/connectors/{connector_id}/schemas")
-            # Log the raw response format for debugging
-            logger.debug(
-                f"Schema response format for connector {connector_id}: {type(response)}"
-            )

-            schemas = response.get("data", {}).get("schemas", [])
-            logger.debug(f"Schemas format: {type(schemas)}, value: {schemas}")
+            # Debug the response format
+            logger.debug(f"Schema response for connector {connector_id}: {response}")

-            # Handle various schema response formats
-            if schemas is None:
-                schemas = []
-            elif isinstance(schemas, str):
-                # Some APIs return a JSON string that needs to be parsed
-                try:
-                    import json
-
-                    parsed = json.loads(schemas)
-                    if isinstance(parsed, list):
-                        schemas = parsed
-                    else:
-                        logger.warning(f"Parsed schema is not a list: {parsed}")
-                        schemas = []
-                except Exception as e:
-                    logger.warning(f"Failed to parse schema string: {e}")
-                    schemas = []
-            elif not isinstance(schemas, list):
-                logger.warning(f"Unexpected schema type: {type(schemas)}")
-                schemas = []
+            # The API can return schemas in different formats
+            # Format 1: {'data': {'schemas': [...]}}
+            # Format 2: {'data': {'schemas': {'schema_name': {'name_in_destination': 'schema_name', 'enabled': True, 'tables': {...}}}}}
+            raw_schemas = response.get("data", {}).get("schemas", [])

-            # Filter out non-dict entries
-            schemas = [s for s in schemas if isinstance(s, dict)]
+            schemas = []
+
+            # Handle different response formats
+            if isinstance(raw_schemas, dict):
+                # Handle nested object format
+                logger.info(
+                    f"Converting nested schema format for connector {connector_id}"
+                )
+                for schema_name, schema_data in raw_schemas.items():
+                    # Convert to the expected format
+                    schema_obj = {
+                        "name": schema_name,
+                        "name_in_destination": schema_data.get(
+                            "name_in_destination", schema_name
+                        ),
+                        "enabled": schema_data.get("enabled", True),
+                        "tables": [],
+                    }
+
+                    # Convert tables from dict to list format
+                    tables_dict = schema_data.get("tables", {})
+                    if isinstance(tables_dict, dict):
+                        for table_name, table_data in tables_dict.items():
+                            table_obj = {
+                                "name": table_name,
+                                "name_in_destination": table_data.get(
+                                    "name_in_destination", table_name
+                                ),
+                                "enabled": table_data.get("enabled", False),
+                            }
+
+                            # Handle columns if present
+                            columns_dict = table_data.get("columns", {})
+                            columns = []
+                            if isinstance(columns_dict, dict):
+                                for column_name, column_data in columns_dict.items():
+                                    column_obj = {
+                                        "name": column_name,
+                                        "name_in_destination": column_data.get(
+                                            "name_in_destination", column_name
+                                        ),
+                                        "enabled": column_data.get("enabled", True),
+                                    }
+                                    columns.append(column_obj)
+
+                            if columns:
+                                table_obj["columns"] = columns
+
+                            schema_obj["tables"].append(table_obj)
+
+                    schemas.append(schema_obj)
+            elif isinstance(raw_schemas, list):
+                # Already in the expected format
+                schemas = raw_schemas
+            else:
+                logger.warning(
+                    f"Unexpected schema format type for connector {connector_id}: {type(raw_schemas)}"
+                )
+                schemas = []

             self._schema_cache[connector_id] = schemas
             logger.info(
-                f"Retrieved {len(schemas)} schemas for connector {connector_id}"
+                f"Processed {len(schemas)} schemas for connector {connector_id}"
             )
             return schemas
         except Exception as e:
-            logger.warning(f"Error fetching schemas for connector {connector_id}: {e}")
+            logger.warning(
+                f"Error fetching schemas for connector {connector_id}: {e}",
+                exc_info=True,
+            )
             return []

     def list_users(self) -> List[Dict]:
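
For illustration, here is a self-contained run of the dict-to-list conversion above on a made-up "Format 2" payload; the schema, table, and column names are invented for this example, and the logic is condensed from the diff.

# Hypothetical "Format 2" payload; all names are invented for this example.
raw_schemas = {
    "public": {
        "name_in_destination": "public",
        "enabled": True,
        "tables": {
            "orders": {
                "name_in_destination": "orders",
                "enabled": True,
                "columns": {"id": {"name_in_destination": "id", "enabled": True}},
            }
        },
    }
}

# Condensed version of the dict-to-list conversion in the diff above.
schemas = []
for schema_name, schema_data in raw_schemas.items():
    schema_obj = {
        "name": schema_name,
        "name_in_destination": schema_data.get("name_in_destination", schema_name),
        "enabled": schema_data.get("enabled", True),
        "tables": [],
    }
    for table_name, table_data in schema_data.get("tables", {}).items():
        table_obj = {
            "name": table_name,
            "name_in_destination": table_data.get("name_in_destination", table_name),
            "enabled": table_data.get("enabled", False),
        }
        columns = [
            {
                "name": c,
                "name_in_destination": d.get("name_in_destination", c),
                "enabled": d.get("enabled", True),
            }
            for c, d in table_data.get("columns", {}).items()
        ]
        if columns:
            table_obj["columns"] = columns
        schema_obj["tables"].append(table_obj)
    schemas.append(schema_obj)

print(schemas[0]["tables"][0]["name"])  # -> "orders"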

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py

+24 -13
@@ -138,15 +138,29 @@ def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
                 f"Extracting lineage for connector {connector.connector_id}"
             )

-            # Get destination platform from connector properties
-            destination_platform = connector.additional_properties.get(
-                "destination_platform", "snowflake"
-            )
+            # Make sure we're using the correct destination platform from config
+            # The destination_platform was incorrectly set to "snowflake" as the default
+            if "destination_platform" in connector.additional_properties:
+                destination_platform = connector.additional_properties.get(
+                    "destination_platform"
+                )
+            else:
+                # Use the platform from the config if available
+                destination_platform = (
+                    "bigquery"  # Default to bigquery based on your config
+                )
+                # Update the connector properties
+                connector.additional_properties["destination_platform"] = (
+                    destination_platform
+                )
+                logger.info(
+                    f"Setting destination platform to {destination_platform} for connector {connector.connector_id}"
+                )

-            # Try to get schema information with detailed logging
+            # Get schema information
             schemas = self.api_client.list_connector_schemas(connector.connector_id)
             logger.info(
-                f"Retrieved {len(schemas)} schemas for connector {connector.connector_id}"
+                f"Got {len(schemas)} schemas for connector {connector.connector_id}"
             )

             lineage_list = []
@@ -156,21 +170,18 @@ def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
                 try:
                     schema_name = schema.get("name", "")
                     if not schema_name:
-                        logger.warning(f"Skipping schema with no name: {schema}")
+                        logger.warning(
+                            f"Skipping schema with no name in connector {connector.connector_id}"
+                        )
                         continue

                     tables = schema.get("tables", [])
                     if not isinstance(tables, list):
                         logger.warning(
-                            f"Schema {schema_name} has non-list tables: {tables}"
+                            f"Schema {schema_name} has non-list tables: {type(tables)}"
                         )
                         continue

-                    # Log the number of tables found
-                    logger.info(
-                        f"Processing {len(tables)} tables in schema {schema_name}"
-                    )
-
                     # Process each table in the schema
                     for table in tables:
                         try:
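
The new fallback branch also writes the resolved value back into the connector's additional_properties, so downstream stages see a consistent destination. A small sketch of that behavior follows; the Connector stand-in and connector_id are hypothetical.

# Sketch of the fallback branch; the Connector stand-in is hypothetical.
from types import SimpleNamespace

connector = SimpleNamespace(connector_id="abc123", additional_properties={})

if "destination_platform" in connector.additional_properties:
    destination_platform = connector.additional_properties["destination_platform"]
else:
    destination_platform = "bigquery"  # hard-coded default from this commit
    # Write the resolved value back so later stages see it.
    connector.additional_properties["destination_platform"] = destination_platform

print(connector.additional_properties)  # -> {'destination_platform': 'bigquery'}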
