
Commit 7930212

improvements
1 parent cec74f4 commit 7930212

3 files changed: +238 -74 lines

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran.py

+73 -60
@@ -99,45 +99,76 @@ def _extend_lineage(
         if destination_details is None:
             destination_details = self._get_destination_details(connector)
 
+        # Ensure platform is set to avoid URN creation issues
+        if not source_details.platform:
+            source_details.platform = self._detect_source_platform(connector)
+
+        if not destination_details.platform:
+            destination_details.platform = "snowflake"  # Default to snowflake
+
+        # Log the lineage information for debugging
+        logger.info(
+            f"Processing lineage for connector {connector.connector_id}: "
+            f"source_platform={source_details.platform}, "
+            f"destination_platform={destination_details.platform}, "
+            f"{len(connector.lineage)} table lineage entries"
+        )
+
         # Handle lineage truncation if needed
         if len(connector.lineage) >= MAX_TABLE_LINEAGE_PER_CONNECTOR:
             self._report_lineage_truncation(connector)
 
         # Process each table lineage entry
         for lineage in connector.lineage:
-            # Create source and destination URNs
-            source_urn = self._create_dataset_urn(
-                lineage.source_table,
-                source_details,
-                is_source=True,
-            )
-
-            dest_urn = self._create_dataset_urn(
-                lineage.destination_table,
-                destination_details,
-                is_source=False,
-            )
-
-            # Skip if either URN creation failed
-            if not source_urn or not dest_urn:
-                continue
+            try:
+                # Create source and destination URNs
+                source_urn = self._create_dataset_urn(
+                    lineage.source_table,
+                    source_details,
+                    is_source=True,
+                )
 
-            # Add URNs to lists (avoiding duplicates)
-            if source_urn not in input_dataset_urn_list:
-                input_dataset_urn_list.append(source_urn)
+                dest_urn = self._create_dataset_urn(
+                    lineage.destination_table,
+                    destination_details,
+                    is_source=False,
+                )
 
-            if dest_urn not in output_dataset_urn_list:
-                output_dataset_urn_list.append(dest_urn)
+                # Skip if either URN creation failed
+                if not source_urn or not dest_urn:
+                    logger.warning(
+                        f"Skipping lineage for {lineage.source_table} -> {lineage.destination_table}: "
+                        f"Failed to create URNs"
+                    )
+                    continue
+
+                # Add URNs to lists (avoiding duplicates)
+                if str(source_urn) not in [str(u) for u in input_dataset_urn_list]:
+                    input_dataset_urn_list.append(source_urn)
+
+                if str(dest_urn) not in [str(u) for u in output_dataset_urn_list]:
+                    output_dataset_urn_list.append(dest_urn)
+
+                # Create column lineage if enabled
+                if self.config.include_column_lineage:
+                    self._create_column_lineage(
+                        lineage=lineage,
+                        source_urn=source_urn,
+                        dest_urn=dest_urn,
+                        fine_grained_lineage=fine_grained_lineage,
+                    )
 
-            # Create column lineage if enabled
-            if self.config.include_column_lineage:
-                self._create_column_lineage(
-                    lineage=lineage,
-                    source_urn=source_urn,
-                    dest_urn=dest_urn,
-                    fine_grained_lineage=fine_grained_lineage,
+                logger.debug(f"Created lineage from {source_urn} to {dest_urn}")
+            except Exception as e:
+                logger.warning(
+                    f"Error creating lineage for table {lineage.source_table} -> {lineage.destination_table}: {e}"
                 )
 
+        # Log the lineage that was created for debugging
+        logger.info(
+            f"Created lineage with {len(input_dataset_urn_list)} input URNs and {len(output_dataset_urn_list)} output URNs"
+        )
+
         # Add URNs and lineage to the datajob
         datajob.inlets.extend(input_dataset_urn_list)
         datajob.outlets.extend(output_dataset_urn_list)
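
Note: the switch from `if source_urn not in input_dataset_urn_list` to comparing `str(...)` forms suggests the URN objects were not reliably comparable by value, so the loop now de-duplicates on the canonical string form. A minimal sketch of the same idea; the set-based variant is a suggestion rather than part of the commit, trading a little memory for linear-time behavior on connectors with many tables:

from typing import Iterable, List, Set


def dedupe_urns(urns: Iterable[object]) -> List[object]:
    """Keep the first occurrence of each URN, keyed on its canonical string form."""
    seen: Set[str] = set()
    unique: List[object] = []
    for urn in urns:
        key = str(urn)
        if key not in seen:
            seen.add(key)
            unique.append(urn)
    return unique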
@@ -150,22 +181,6 @@ def _extend_lineage(
             destination_details=destination_details,
         )
 
-        # Add source and destination platform information to properties
-        if source_details.platform:
-            lineage_properties["source.platform"] = source_details.platform
-        if destination_details.platform:
-            lineage_properties["destination.platform"] = destination_details.platform
-
-        # Add database information if available
-        if source_details.database:
-            lineage_properties["source.database"] = source_details.database
-        if destination_details.database:
-            lineage_properties["destination.database"] = destination_details.database
-
-        # Add environment information
-        lineage_properties["source.env"] = source_details.env or "PROD"
-        lineage_properties["destination.env"] = destination_details.env or "PROD"
-
         return lineage_properties
 
     def _get_source_details(self, connector: Connector) -> PlatformDetail:
@@ -241,17 +256,24 @@ def _create_dataset_urn(
             platform = details.platform
             if not platform:
                 platform = "snowflake" if not is_source else "external"
+                logger.info(
+                    f"Using default platform {platform} for {'source' if is_source else 'destination'} table {table_name}"
+                )
 
-            # Include database in the table name if available
-            full_table_name = (
-                f"{details.database.lower()}.{table_name}"
-                if details.database
-                else table_name
-            )
+            # Include database in the table name if available and ensure it's lowercase
+            database = details.database.lower() if details.database else ""
+            full_table_name = f"{database}.{table_name}" if database else table_name
 
             # Ensure environment is set
             env = details.env or "PROD"
 
+            # Log the URN creation details for debugging
+            logger.debug(
+                f"Creating {'source' if is_source else 'destination'} URN with: "
+                f"platform={platform}, table_name={full_table_name}, env={env}, "
+                f"platform_instance={details.platform_instance}"
+            )
+
             return DatasetUrn.create_from_ids(
                 platform_id=platform,
                 table_name=full_table_name,
@@ -260,19 +282,10 @@ def _create_dataset_urn(
             )
         except Exception as e:
             logger.warning(
-                f"Failed to create {'source' if is_source else 'destination'} URN: {e}"
+                f"Failed to create {'source' if is_source else 'destination'} URN for {table_name}: {e}"
            )
             return None
 
-    def _report_lineage_truncation(self, connector: Connector) -> None:
-        """Report warning about truncated lineage."""
-        self.report.warning(
-            title="Table lineage truncated",
-            message=f"The connector had more than {MAX_TABLE_LINEAGE_PER_CONNECTOR} table lineage entries. "
-            f"Only the most recent {MAX_TABLE_LINEAGE_PER_CONNECTOR} entries were ingested.",
-            context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
-        )
-
     def _create_column_lineage(
         self,
         lineage,
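
Taken together, the `_create_dataset_urn` changes amount to a small set of naming rules: default the platform ("external" for sources, "snowflake" for destinations), lowercase the database prefix, and fall back to the PROD environment. A standalone sketch of those rules under stated assumptions: the function name is mine, and the `datahub.utilities.urns.dataset_urn` import path is an assumption, since the diff only shows the call site of `DatasetUrn.create_from_ids`:

from typing import Optional

from datahub.utilities.urns.dataset_urn import DatasetUrn  # assumed import path


def build_dataset_urn(
    table_name: str,
    platform: Optional[str],
    database: Optional[str],
    env: Optional[str],
    is_source: bool,
) -> Optional[DatasetUrn]:
    """Apply the diff's naming rules before constructing the URN."""
    # Default platform: sources fall back to "external", destinations to "snowflake".
    platform = platform or ("external" if is_source else "snowflake")
    # Lowercase the database prefix when present.
    database = (database or "").lower()
    full_table_name = f"{database}.{table_name}" if database else table_name
    try:
        return DatasetUrn.create_from_ids(
            platform_id=platform,
            table_name=full_table_name,
            env=env or "PROD",
        )
    except Exception:
        return None


# e.g. build_dataset_urn("public.orders", None, "analytics", None, is_source=False)
# would yield urn:li:dataset:(urn:li:dataPlatform:snowflake,analytics.public.orders,PROD)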

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_api_client.py

+31 -5
@@ -206,16 +206,42 @@ def list_connector_schemas(self, connector_id: str) -> List[Dict]:
 
         try:
             response = self._make_request("GET", f"/connectors/{connector_id}/schemas")
+            # Log the raw response format for debugging
+            logger.debug(
+                f"Schema response format for connector {connector_id}: {type(response)}"
+            )
+
             schemas = response.get("data", {}).get("schemas", [])
+            logger.debug(f"Schemas format: {type(schemas)}, value: {schemas}")
 
-            # Ensure schemas is a list of dictionaries
-            if not isinstance(schemas, list):
-                logger.warning(
-                    f"Unexpected schema format for connector {connector_id}: {schemas}"
-                )
+            # Handle various schema response formats
+            if schemas is None:
                 schemas = []
+            elif isinstance(schemas, str):
+                # Some APIs return a JSON string that needs to be parsed
+                try:
+                    import json
+
+                    parsed = json.loads(schemas)
+                    if isinstance(parsed, list):
+                        schemas = parsed
+                    else:
+                        logger.warning(f"Parsed schema is not a list: {parsed}")
+                        schemas = []
+                except Exception as e:
+                    logger.warning(f"Failed to parse schema string: {e}")
+                    schemas = []
+            elif not isinstance(schemas, list):
+                logger.warning(f"Unexpected schema type: {type(schemas)}")
+                schemas = []
+
+            # Filter out non-dict entries
+            schemas = [s for s in schemas if isinstance(s, dict)]
 
             self._schema_cache[connector_id] = schemas
+            logger.info(
+                f"Retrieved {len(schemas)} schemas for connector {connector_id}"
+            )
             return schemas
         except Exception as e:
             logger.warning(f"Error fetching schemas for connector {connector_id}: {e}")

metadata-ingestion/src/datahub/ingestion/source/fivetran/fivetran_standard_api.py

+134 -9
@@ -3,7 +3,11 @@
 
 from datahub.configuration.common import AllowDenyPattern
 from datahub.ingestion.source.fivetran.config import FivetranSourceReport
-from datahub.ingestion.source.fivetran.data_classes import Connector
+from datahub.ingestion.source.fivetran.data_classes import (
+    ColumnLineage,
+    Connector,
+    TableLineage,
+)
 from datahub.ingestion.source.fivetran.fivetran_access import FivetranAccessInterface
 from datahub.ingestion.source.fivetran.fivetran_api_client import FivetranAPIClient
 from datahub.ingestion.source.fivetran.fivetran_query import (
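
For orientation, a sketch of what the two newly imported data classes plausibly look like, reconstructed only from the keyword arguments used later in this diff; the real definitions in data_classes.py may differ or carry extra fields:

from dataclasses import dataclass, field
from typing import List


@dataclass
class ColumnLineage:
    source_column: str
    destination_column: str


@dataclass
class TableLineage:
    source_table: str  # "schema.table" as reported by the source
    destination_table: str  # case-adjusted "SCHEMA.TABLE" on the warehouse side
    column_lineage: List[ColumnLineage] = field(default_factory=list)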
@@ -130,20 +134,141 @@ def _fill_connectors_lineage(self, connectors: List[Connector]) -> None:
         """
         for connector in connectors:
             try:
-                # Extract table lineage for this connector
-                lineage = self.api_client.extract_table_lineage(connector.connector_id)
+                logger.info(
+                    f"Extracting lineage for connector {connector.connector_id}"
+                )
+
+                # Get destination platform from connector properties
+                destination_platform = connector.additional_properties.get(
+                    "destination_platform", "snowflake"
+                )
+
+                # Try to get schema information with detailed logging
+                schemas = self.api_client.list_connector_schemas(connector.connector_id)
+                logger.info(
+                    f"Retrieved {len(schemas)} schemas for connector {connector.connector_id}"
+                )
+
+                lineage_list = []
+
+                # Process each schema
+                for schema in schemas:
+                    try:
+                        schema_name = schema.get("name", "")
+                        if not schema_name:
+                            logger.warning(f"Skipping schema with no name: {schema}")
+                            continue
+
+                        tables = schema.get("tables", [])
+                        if not isinstance(tables, list):
+                            logger.warning(
+                                f"Schema {schema_name} has non-list tables: {tables}"
+                            )
+                            continue
+
+                        # Log the number of tables found
+                        logger.info(
+                            f"Processing {len(tables)} tables in schema {schema_name}"
+                        )
+
+                        # Process each table in the schema
+                        for table in tables:
+                            try:
+                                if not isinstance(table, dict):
+                                    continue
+
+                                table_name = table.get("name", "")
+                                enabled = table.get("enabled", False)
+
+                                if not enabled or not table_name:
+                                    continue
 
-                # Check if we need to truncate the lineage
-                if len(lineage) > MAX_TABLE_LINEAGE_PER_CONNECTOR:
+                                # Create source and destination table identifiers
+                                source_table = f"{schema_name}.{table_name}"
+
+                                # Adjust case based on destination platform
+                                dest_schema = (
+                                    schema_name.upper()
+                                    if destination_platform != "bigquery"
+                                    else schema_name
+                                )
+                                dest_table = (
+                                    table_name.upper()
+                                    if destination_platform != "bigquery"
+                                    else table_name
+                                )
+                                destination_table = f"{dest_schema}.{dest_table}"
+
+                                # Process columns for lineage
+                                column_lineage = []
+                                columns = table.get("columns", [])
+
+                                if isinstance(columns, list):
+                                    for column in columns:
+                                        try:
+                                            if not isinstance(column, dict):
+                                                continue
+
+                                            col_name = column.get("name", "")
+                                            if not col_name:
+                                                continue
+
+                                            # Destination column name follows same case convention as table
+                                            dest_col_name = (
+                                                col_name.upper()
+                                                if destination_platform != "bigquery"
+                                                else col_name
+                                            )
+
+                                            column_lineage.append(
+                                                ColumnLineage(
+                                                    source_column=col_name,
+                                                    destination_column=dest_col_name,
+                                                )
+                                            )
+                                        except Exception as col_e:
+                                            logger.warning(
+                                                f"Error processing column in table {table_name}: {col_e}"
+                                            )
+
+                                # Add this table's lineage
+                                lineage_list.append(
+                                    TableLineage(
+                                        source_table=source_table,
+                                        destination_table=destination_table,
+                                        column_lineage=column_lineage,
+                                    )
+                                )
+
+                                logger.debug(
+                                    f"Added lineage: {source_table} -> {destination_table} with {len(column_lineage)} columns"
+                                )
+                            except Exception as table_e:
+                                logger.warning(
+                                    f"Error processing table {table.get('name', 'unknown')}: {table_e}"
+                                )
+                    except Exception as schema_e:
+                        logger.warning(
+                            f"Error processing schema {schema.get('name', 'unknown')}: {schema_e}"
+                        )
+
+                # Truncate if necessary
+                if len(lineage_list) > MAX_TABLE_LINEAGE_PER_CONNECTOR:
                     logger.warning(
-                        f"Connector {connector.connector_name} has {len(lineage)} tables, "
+                        f"Connector {connector.connector_name} has {len(lineage_list)} tables, "
                         f"truncating to {MAX_TABLE_LINEAGE_PER_CONNECTOR}"
                     )
-                    lineage = lineage[:MAX_TABLE_LINEAGE_PER_CONNECTOR]
+                    lineage_list = lineage_list[:MAX_TABLE_LINEAGE_PER_CONNECTOR]
+
+                connector.lineage = lineage_list
+
+                logger.info(
+                    f"Successfully extracted {len(lineage_list)} table lineages for connector {connector.connector_id}"
+                )
 
-                connector.lineage = lineage
             except Exception as e:
                 logger.error(
-                    f"Failed to extract lineage for connector {connector.connector_name}: {e}"
+                    f"Failed to extract lineage for connector {connector.connector_name}: {e}",
+                    exc_info=True,
                 )
                 connector.lineage = []
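
The core naming rule in this loop is the case convention: BigQuery preserves identifier case, while other destinations (Snowflake being the default) store unquoted identifiers uppercased. A minimal sketch of that rule as a standalone helper; the helper name is mine rather than the commit's:

def destination_identifier(schema_name: str, table_name: str, destination_platform: str) -> str:
    """Mirror the diff's convention: BigQuery keeps source casing, others uppercase."""
    if destination_platform == "bigquery":
        return f"{schema_name}.{table_name}"
    return f"{schema_name.upper()}.{table_name.upper()}"


# Example: the same source table landing in Snowflake vs. BigQuery.
assert destination_identifier("public", "orders", "snowflake") == "PUBLIC.ORDERS"
assert destination_identifier("public", "orders", "bigquery") == "public.orders"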
