diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 197e73dca7141b..1dc32548e2eec4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -596,7 +596,15 @@ class TableauSourceReport(StaleEntityRemovalSourceReport): num_datasource_field_skipped_no_name: int = 0 num_csql_field_skipped_no_name: int = 0 num_table_field_skipped_no_name: int = 0 + # lineage + num_tables_with_upstream_lineage: int = 0 + num_upstream_table_lineage: int = 0 + num_upstream_fine_grained_lineage: int = 0 num_upstream_table_skipped_no_name: int = 0 + num_upstream_table_skipped_no_columns: int = 0 + num_upstream_table_failed_generate_reference: int = 0 + num_upstream_table_lineage_failed_parse_sql: int = 0 + num_upstream_fine_grained_lineage_failed_parse_sql: int = 0 @platform_name("Tableau") @@ -1311,7 +1319,7 @@ def _create_upstream_table_lineage( datasource: dict, browse_path: Optional[str], is_embedded_ds: bool = False, - ) -> Tuple: + ) -> Tuple[List[Upstream], List[FineGrainedLineage]]: upstream_tables: List[Upstream] = [] fine_grained_lineages: List[FineGrainedLineage] = [] table_id_to_urn = {} @@ -1472,6 +1480,7 @@ def get_upstream_tables( c.COLUMNS_CONNECTION ].get("totalCount") if not is_custom_sql and not num_tbl_cols: + self.report.num_upstream_table_skipped_no_columns += 1 logger.warning( f"Skipping upstream table with id {table[c.ID]}, no columns: {table}" ) @@ -1488,6 +1497,7 @@ def get_upstream_tables( table, default_schema_map=self.config.default_schema_map ) except Exception as e: + self.report.num_upstream_table_failed_generate_reference += 1 self.report.warning( title="Potentially Missing Lineage Issue", message="Failed to generate upstream reference", @@ -1659,15 +1669,7 @@ def get_upstream_fields_from_custom_sql( func_overridden_info=None, # Here we don't want to override any information from configuration ) - if parsed_result is None: - logger.info( - f"Failed to extract column level lineage from datasource {datasource_urn}" - ) - return [] - if parsed_result.debug_info.error: - logger.info( - f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.error}" - ) + if parsed_result is None or parsed_result.debug_info.error: return [] cll: List[ColumnLineageInfo] = ( @@ -2031,6 +2033,8 @@ def _create_lineage_to_upstream_tables( aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) + self.report.num_tables_with_upstream_lineage += 1 + self.report.num_upstream_table_lineage += len(upstream_tables) @staticmethod def _clean_tableau_query_parameters(query: str) -> str: @@ -2130,7 +2134,7 @@ def parse_custom_sql( f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}" ) - return create_lineage_sql_parsed_result( + parsed_result = create_lineage_sql_parsed_result( query=query, default_db=upstream_db, platform=platform, @@ -2140,6 +2144,21 @@ def parse_custom_sql( schema_aware=not self.config.sql_parsing_disable_schema_awareness, ) + assert parsed_result is not None + + if parsed_result.debug_info.table_error: + logger.warning( + f"Failed to extract table lineage from datasource {datasource_urn}: {parsed_result.debug_info.table_error}" + ) + self.report.num_upstream_table_lineage_failed_parse_sql += 1 + elif parsed_result.debug_info.column_error: + logger.warning( + f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.column_error}" + ) + self.report.num_upstream_fine_grained_lineage_failed_parse_sql += 1 + + return parsed_result + def _enrich_database_tables_with_parsed_schemas( self, parsing_result: SqlParsingResult ) -> None: @@ -2174,9 +2193,6 @@ def _create_lineage_from_unsupported_csql( ) if parsed_result is None: - logger.info( - f"Failed to extract table level lineage for datasource {csql_urn}" - ) return self._enrich_database_tables_with_parsed_schemas(parsed_result) @@ -2196,12 +2212,14 @@ def _create_lineage_from_unsupported_csql( upstreams=upstream_tables, fineGrainedLineages=fine_grained_lineages, ) - yield self.get_metadata_change_proposal( csql_urn, aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) + self.report.num_tables_with_upstream_lineage += 1 + self.report.num_upstream_table_lineage += len(upstream_tables) + self.report.num_upstream_fine_grained_lineage += len(fine_grained_lineages) def _get_schema_metadata_for_datasource( self, datasource_fields: List[dict] @@ -2352,6 +2370,11 @@ def emit_datasource( aspect_name=c.UPSTREAM_LINEAGE, aspect=upstream_lineage, ) + self.report.num_tables_with_upstream_lineage += 1 + self.report.num_upstream_table_lineage += len(upstream_tables) + self.report.num_upstream_fine_grained_lineage += len( + fine_grained_lineages + ) # Datasource Fields schema_metadata = self._get_schema_metadata_for_datasource(