Skip to content

Commit 534e7d1

Browse files
sgomezvillamor, hsheth2
authored and committed
feat(tableau): adds more reporting metrics to better understand lineage construction in tableau ingestion (datahub-project#12008)
Co-authored-by: Harshal Sheth <[email protected]>
1 parent 0c24480 commit 534e7d1

File tree

1 file changed

+38
-15
lines changed
  • metadata-ingestion/src/datahub/ingestion/source/tableau

1 file changed

+38
-15
lines changed

metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py

+38 -15
Original file line number | Diff line number | Diff line change
@@ -596,7 +596,15 @@ class TableauSourceReport(StaleEntityRemovalSourceReport):
596596
num_datasource_field_skipped_no_name: int = 0
597597
num_csql_field_skipped_no_name: int = 0
598598
num_table_field_skipped_no_name: int = 0
599+
# lineage
600+
num_tables_with_upstream_lineage: int = 0
601+
num_upstream_table_lineage: int = 0
602+
num_upstream_fine_grained_lineage: int = 0
599603
num_upstream_table_skipped_no_name: int = 0
604+
num_upstream_table_skipped_no_columns: int = 0
605+
num_upstream_table_failed_generate_reference: int = 0
606+
num_upstream_table_lineage_failed_parse_sql: int = 0
607+
num_upstream_fine_grained_lineage_failed_parse_sql: int = 0
600608

601609

602610
@platform_name("Tableau")
@@ -1311,7 +1319,7 @@ def _create_upstream_table_lineage(
13111319
datasource: dict,
13121320
browse_path: Optional[str],
13131321
is_embedded_ds: bool = False,
1314-
) -> Tuple:
1322+
) -> Tuple[List[Upstream], List[FineGrainedLineage]]:
13151323
upstream_tables: List[Upstream] = []
13161324
fine_grained_lineages: List[FineGrainedLineage] = []
13171325
table_id_to_urn = {}
@@ -1472,6 +1480,7 @@ def get_upstream_tables(
14721480
c.COLUMNS_CONNECTION
14731481
].get("totalCount")
14741482
if not is_custom_sql and not num_tbl_cols:
1483+
self.report.num_upstream_table_skipped_no_columns += 1
14751484
logger.warning(
14761485
f"Skipping upstream table with id {table[c.ID]}, no columns: {table}"
14771486
)
@@ -1488,6 +1497,7 @@ def get_upstream_tables(
14881497
table, default_schema_map=self.config.default_schema_map
14891498
)
14901499
except Exception as e:
1500+
self.report.num_upstream_table_failed_generate_reference += 1
14911501
self.report.warning(
14921502
title="Potentially Missing Lineage Issue",
14931503
message="Failed to generate upstream reference",
@@ -1659,15 +1669,7 @@ def get_upstream_fields_from_custom_sql(
16591669
func_overridden_info=None, # Here we don't want to override any information from configuration
16601670
)
16611671

1662-
if parsed_result is None:
1663-
logger.info(
1664-
f"Failed to extract column level lineage from datasource {datasource_urn}"
1665-
)
1666-
return []
1667-
if parsed_result.debug_info.error:
1668-
logger.info(
1669-
f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.error}"
1670-
)
1672+
if parsed_result is None or parsed_result.debug_info.error:
16711673
return []
16721674

16731675
cll: List[ColumnLineageInfo] = (
@@ -2031,6 +2033,8 @@ def _create_lineage_to_upstream_tables(
20312033
aspect_name=c.UPSTREAM_LINEAGE,
20322034
aspect=upstream_lineage,
20332035
)
2036+
self.report.num_tables_with_upstream_lineage += 1
2037+
self.report.num_upstream_table_lineage += len(upstream_tables)
20342038

20352039
@staticmethod
20362040
def _clean_tableau_query_parameters(query: str) -> str:
@@ -2130,7 +2134,7 @@ def parse_custom_sql(
21302134
f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}"
21312135
)
21322136

2133-
return create_lineage_sql_parsed_result(
2137+
parsed_result = create_lineage_sql_parsed_result(
21342138
query=query,
21352139
default_db=upstream_db,
21362140
platform=platform,
@@ -2140,6 +2144,21 @@ def parse_custom_sql(
21402144
schema_aware=not self.config.sql_parsing_disable_schema_awareness,
21412145
)
21422146

2147+
assert parsed_result is not None
2148+
2149+
if parsed_result.debug_info.table_error:
2150+
logger.warning(
2151+
f"Failed to extract table lineage from datasource {datasource_urn}: {parsed_result.debug_info.table_error}"
2152+
)
2153+
self.report.num_upstream_table_lineage_failed_parse_sql += 1
2154+
elif parsed_result.debug_info.column_error:
2155+
logger.warning(
2156+
f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.column_error}"
2157+
)
2158+
self.report.num_upstream_fine_grained_lineage_failed_parse_sql += 1
2159+
2160+
return parsed_result
2161+
21432162
def _enrich_database_tables_with_parsed_schemas(
21442163
self, parsing_result: SqlParsingResult
21452164
) -> None:
@@ -2174,9 +2193,6 @@ def _create_lineage_from_unsupported_csql(
21742193
)
21752194

21762195
if parsed_result is None:
2177-
logger.info(
2178-
f"Failed to extract table level lineage for datasource {csql_urn}"
2179-
)
21802196
return
21812197

21822198
self._enrich_database_tables_with_parsed_schemas(parsed_result)
@@ -2196,12 +2212,14 @@ def _create_lineage_from_unsupported_csql(
21962212
upstreams=upstream_tables,
21972213
fineGrainedLineages=fine_grained_lineages,
21982214
)
2199-
22002215
yield self.get_metadata_change_proposal(
22012216
csql_urn,
22022217
aspect_name=c.UPSTREAM_LINEAGE,
22032218
aspect=upstream_lineage,
22042219
)
2220+
self.report.num_tables_with_upstream_lineage += 1
2221+
self.report.num_upstream_table_lineage += len(upstream_tables)
2222+
self.report.num_upstream_fine_grained_lineage += len(fine_grained_lineages)
22052223

22062224
def _get_schema_metadata_for_datasource(
22072225
self, datasource_fields: List[dict]
@@ -2352,6 +2370,11 @@ def emit_datasource(
23522370
aspect_name=c.UPSTREAM_LINEAGE,
23532371
aspect=upstream_lineage,
23542372
)
2373+
self.report.num_tables_with_upstream_lineage += 1
2374+
self.report.num_upstream_table_lineage += len(upstream_tables)
2375+
self.report.num_upstream_fine_grained_lineage += len(
2376+
fine_grained_lineages
2377+
)
23552378

23562379
# Datasource Fields
23572380
schema_metadata = self._get_schema_metadata_for_datasource(

0 commit comments

Comments
 (0)