Skip to content

Commit 792722a

Browse files
authored
Merge branch 'master' into container-hierarchy-changes
2 parents a8936a7 + 4811de1 commit 792722a

15 files changed

114,877 additions and 92 deletions (lines changed)

metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py

+107 -30
Original file line number | Diff line number | Diff line change
@@ -485,6 +485,18 @@ class TableauConfig(
485485
description="Configuration settings for ingesting Tableau groups and their capabilities as custom properties.",
486486
)
487487

488+
ingest_hidden_assets: bool = Field(
489+
True,
490+
description="When enabled, hidden views and dashboards are ingested into Datahub. "
491+
"If a dashboard or view is hidden in Tableau the luid is blank. Default of this config field is True.",
492+
)
493+
494+
tags_for_hidden_assets: List[str] = Field(
495+
default=[],
496+
description="Tags to be added to hidden dashboards and views. If a dashboard or view is hidden in Tableau the luid is blank. "
497+
"This can only be used with ingest_tags enabled as it will overwrite tags entered from the UI.",
498+
)
499+
488500
# pre=True because we want to make some decisions before pydantic initializes the configuration with default values
489501
@root_validator(pre=True)
490502
def projects_backward_compatibility(cls, values: Dict) -> Dict:
@@ -510,6 +522,20 @@ def projects_backward_compatibility(cls, values: Dict) -> Dict:
510522

511523
return values
512524

525+
@root_validator()
526+
def validate_config_values(cls, values: Dict) -> Dict:
527+
tags_for_hidden_assets = values.get("tags_for_hidden_assets")
528+
ingest_tags = values.get("ingest_tags")
529+
if (
530+
not ingest_tags
531+
and tags_for_hidden_assets
532+
and len(tags_for_hidden_assets) > 0
533+
):
534+
raise ValueError(
535+
"tags_for_hidden_assets is only allowed with ingest_tags enabled. Be aware that this will overwrite tags entered from the UI."
536+
)
537+
return values
538+
513539

514540
class WorkbookKey(ContainerKey):
515541
workbook_id: str
@@ -596,7 +622,16 @@ class TableauSourceReport(StaleEntityRemovalSourceReport):
596622
num_datasource_field_skipped_no_name: int = 0
597623
num_csql_field_skipped_no_name: int = 0
598624
num_table_field_skipped_no_name: int = 0
625+
# lineage
626+
num_tables_with_upstream_lineage: int = 0
627+
num_upstream_table_lineage: int = 0
628+
num_upstream_fine_grained_lineage: int = 0
599629
num_upstream_table_skipped_no_name: int = 0
630+
num_upstream_table_skipped_no_columns: int = 0
631+
num_upstream_table_failed_generate_reference: int = 0
632+
num_upstream_table_lineage_failed_parse_sql: int = 0
633+
num_upstream_fine_grained_lineage_failed_parse_sql: int = 0
634+
num_hidden_assets_skipped: int = 0
600635

601636

602637
@platform_name("Tableau")
@@ -1043,6 +1078,11 @@ def get_data_platform_instance(self) -> DataPlatformInstanceClass:
10431078
),
10441079
)
10451080

1081+
def _is_hidden_view(self, dashboard_or_view: Dict) -> bool:
1082+
# LUID is blank if the view is hidden in the workbook.
1083+
# More info here: https://help.tableau.com/current/api/metadata_api/en-us/reference/view.doc.html
1084+
return not dashboard_or_view.get(c.LUID)
1085+
10461086
def get_connection_object_page(
10471087
self,
10481088
query: str,
@@ -1311,7 +1351,7 @@ def _create_upstream_table_lineage(
13111351
datasource: dict,
13121352
browse_path: Optional[str],
13131353
is_embedded_ds: bool = False,
1314-
) -> Tuple:
1354+
) -> Tuple[List[Upstream], List[FineGrainedLineage]]:
13151355
upstream_tables: List[Upstream] = []
13161356
fine_grained_lineages: List[FineGrainedLineage] = []
13171357
table_id_to_urn = {}
@@ -1472,6 +1512,7 @@ def get_upstream_tables(
14721512
c.COLUMNS_CONNECTION
14731513
].get("totalCount")
14741514
if not is_custom_sql and not num_tbl_cols:
1515+
self.report.num_upstream_table_skipped_no_columns += 1
14751516
logger.warning(
14761517
f"Skipping upstream table with id {table[c.ID]}, no columns: {table}"
14771518
)
@@ -1488,6 +1529,7 @@ def get_upstream_tables(
14881529
table, default_schema_map=self.config.default_schema_map
14891530
)
14901531
except Exception as e:
1532+
self.report.num_upstream_table_failed_generate_reference += 1
14911533
self.report.warning(
14921534
title="Potentially Missing Lineage Issue",
14931535
message="Failed to generate upstream reference",
@@ -1659,15 +1701,7 @@ def get_upstream_fields_from_custom_sql(
16591701
func_overridden_info=None, # Here we don't want to override any information from configuration
16601702
)
16611703

1662-
if parsed_result is None:
1663-
logger.info(
1664-
f"Failed to extract column level lineage from datasource {datasource_urn}"
1665-
)
1666-
return []
1667-
if parsed_result.debug_info.error:
1668-
logger.info(
1669-
f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.error}"
1670-
)
1704+
if parsed_result is None or parsed_result.debug_info.error:
16711705
return []
16721706

16731707
cll: List[ColumnLineageInfo] = (
@@ -2031,6 +2065,8 @@ def _create_lineage_to_upstream_tables(
20312065
aspect_name=c.UPSTREAM_LINEAGE,
20322066
aspect=upstream_lineage,
20332067
)
2068+
self.report.num_tables_with_upstream_lineage += 1
2069+
self.report.num_upstream_table_lineage += len(upstream_tables)
20342070

20352071
@staticmethod
20362072
def _clean_tableau_query_parameters(query: str) -> str:
@@ -2130,7 +2166,7 @@ def parse_custom_sql(
21302166
f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}"
21312167
)
21322168

2133-
return create_lineage_sql_parsed_result(
2169+
parsed_result = create_lineage_sql_parsed_result(
21342170
query=query,
21352171
default_db=upstream_db,
21362172
platform=platform,
@@ -2140,6 +2176,21 @@ def parse_custom_sql(
21402176
schema_aware=not self.config.sql_parsing_disable_schema_awareness,
21412177
)
21422178

2179+
assert parsed_result is not None
2180+
2181+
if parsed_result.debug_info.table_error:
2182+
logger.warning(
2183+
f"Failed to extract table lineage from datasource {datasource_urn}: {parsed_result.debug_info.table_error}"
2184+
)
2185+
self.report.num_upstream_table_lineage_failed_parse_sql += 1
2186+
elif parsed_result.debug_info.column_error:
2187+
logger.warning(
2188+
f"Failed to extract column level lineage from datasource {datasource_urn}: {parsed_result.debug_info.column_error}"
2189+
)
2190+
self.report.num_upstream_fine_grained_lineage_failed_parse_sql += 1
2191+
2192+
return parsed_result
2193+
21432194
def _enrich_database_tables_with_parsed_schemas(
21442195
self, parsing_result: SqlParsingResult
21452196
) -> None:
@@ -2174,9 +2225,6 @@ def _create_lineage_from_unsupported_csql(
21742225
)
21752226

21762227
if parsed_result is None:
2177-
logger.info(
2178-
f"Failed to extract table level lineage for datasource {csql_urn}"
2179-
)
21802228
return
21812229

21822230
self._enrich_database_tables_with_parsed_schemas(parsed_result)
@@ -2196,12 +2244,14 @@ def _create_lineage_from_unsupported_csql(
21962244
upstreams=upstream_tables,
21972245
fineGrainedLineages=fine_grained_lineages,
21982246
)
2199-
22002247
yield self.get_metadata_change_proposal(
22012248
csql_urn,
22022249
aspect_name=c.UPSTREAM_LINEAGE,
22032250
aspect=upstream_lineage,
22042251
)
2252+
self.report.num_tables_with_upstream_lineage += 1
2253+
self.report.num_upstream_table_lineage += len(upstream_tables)
2254+
self.report.num_upstream_fine_grained_lineage += len(fine_grained_lineages)
22052255

22062256
def _get_schema_metadata_for_datasource(
22072257
self, datasource_fields: List[dict]
@@ -2278,12 +2328,11 @@ def emit_datasource(
22782328
)
22792329

22802330
# Tags
2281-
if datasource_info:
2331+
if datasource_info and self.config.ingest_tags:
22822332
tags = self.get_tags(datasource_info)
2283-
if tags:
2284-
dataset_snapshot.aspects.append(
2285-
builder.make_global_tag_aspect_with_tag_list(tags)
2286-
)
2333+
dataset_snapshot.aspects.append(
2334+
builder.make_global_tag_aspect_with_tag_list(tags)
2335+
)
22872336

22882337
# Browse path
22892338
if browse_path and is_embedded_ds and workbook and workbook.get(c.NAME):
@@ -2352,6 +2401,11 @@ def emit_datasource(
23522401
aspect_name=c.UPSTREAM_LINEAGE,
23532402
aspect=upstream_lineage,
23542403
)
2404+
self.report.num_tables_with_upstream_lineage += 1
2405+
self.report.num_upstream_table_lineage += len(upstream_tables)
2406+
self.report.num_upstream_fine_grained_lineage += len(
2407+
fine_grained_lineages
2408+
)
23552409

23562410
# Datasource Fields
23572411
schema_metadata = self._get_schema_metadata_for_datasource(
@@ -2669,7 +2723,13 @@ def emit_sheets(self) -> Iterable[MetadataWorkUnit]:
26692723
c.SHEETS_CONNECTION,
26702724
sheets_filter,
26712725
):
2672-
yield from self.emit_sheets_as_charts(sheet, sheet.get(c.WORKBOOK))
2726+
if self.config.ingest_hidden_assets or not self._is_hidden_view(sheet):
2727+
yield from self.emit_sheets_as_charts(sheet, sheet.get(c.WORKBOOK))
2728+
else:
2729+
self.report.num_hidden_assets_skipped += 1
2730+
logger.debug(
2731+
f"Skip view {sheet.get(c.ID)} because it's hidden (luid is blank)."
2732+
)
26732733

26742734
def emit_sheets_as_charts(
26752735
self, sheet: dict, workbook: Optional[Dict]
@@ -2760,11 +2820,17 @@ def emit_sheets_as_charts(
27602820
chart_snapshot.aspects.append(owner)
27612821

27622822
# Tags
2763-
tags = self.get_tags(sheet)
2764-
if tags:
2823+
if self.config.ingest_tags:
2824+
tags = self.get_tags(sheet)
2825+
if len(self.config.tags_for_hidden_assets) > 0 and self._is_hidden_view(
2826+
sheet
2827+
):
2828+
tags.extend(self.config.tags_for_hidden_assets)
2829+
27652830
chart_snapshot.aspects.append(
27662831
builder.make_global_tag_aspect_with_tag_list(tags)
27672832
)
2833+
27682834
yield self.get_metadata_change_event(chart_snapshot)
27692835
if sheet_external_url is not None and self.config.ingest_embed_url is True:
27702836
yield self.new_work_unit(
@@ -2846,7 +2912,7 @@ def emit_workbook_as_container(self, workbook: Dict) -> Iterable[MetadataWorkUni
28462912
else None
28472913
)
28482914

2849-
tags = self.get_tags(workbook)
2915+
tags = self.get_tags(workbook) if self.config.ingest_tags else None
28502916

28512917
parent_key = None
28522918
project_luid: Optional[str] = self._get_workbook_project_luid(workbook)
@@ -2977,17 +3043,23 @@ def emit_dashboards(self) -> Iterable[MetadataWorkUnit]:
29773043
c.DASHBOARDS_CONNECTION,
29783044
dashboards_filter,
29793045
):
2980-
yield from self.emit_dashboard(dashboard, dashboard.get(c.WORKBOOK))
3046+
if self.config.ingest_hidden_assets or not self._is_hidden_view(dashboard):
3047+
yield from self.emit_dashboard(dashboard, dashboard.get(c.WORKBOOK))
3048+
else:
3049+
self.report.num_hidden_assets_skipped += 1
3050+
logger.debug(
3051+
f"Skip dashboard {dashboard.get(c.ID)} because it's hidden (luid is blank)."
3052+
)
29813053

2982-
def get_tags(self, obj: dict) -> Optional[List[str]]:
3054+
def get_tags(self, obj: dict) -> List[str]:
29833055
tag_list = obj.get(c.TAGS, [])
2984-
if tag_list and self.config.ingest_tags:
3056+
if tag_list:
29853057
tag_list_str = [
29863058
t[c.NAME] for t in tag_list if t is not None and t.get(c.NAME)
29873059
]
29883060

29893061
return tag_list_str
2990-
return None
3062+
return []
29913063

29923064
def emit_dashboard(
29933065
self, dashboard: dict, workbook: Optional[Dict]
@@ -3038,8 +3110,13 @@ def emit_dashboard(
30383110
)
30393111
dashboard_snapshot.aspects.append(dashboard_info_class)
30403112

3041-
tags = self.get_tags(dashboard)
3042-
if tags:
3113+
if self.config.ingest_tags:
3114+
tags = self.get_tags(dashboard)
3115+
if len(self.config.tags_for_hidden_assets) > 0 and self._is_hidden_view(
3116+
dashboard
3117+
):
3118+
tags.extend(self.config.tags_for_hidden_assets)
3119+
30433120
dashboard_snapshot.aspects.append(
30443121
builder.make_global_tag_aspect_with_tag_list(tags)
30453122
)

0 commit comments

Comments
 (0)