1
1
import json
2
2
import logging
3
+ from dataclasses import dataclass , field
3
4
from datetime import datetime
4
5
from functools import lru_cache
5
6
from typing import Any , Dict , Iterable , List , Optional
80
81
UpstreamLineageClass ,
81
82
)
82
83
from datahub .utilities import config_clean
84
+ from datahub .utilities .lossy_collections import LossyList
83
85
from datahub .utilities .registries .domain_registry import DomainRegistry
84
86
85
87
logger = logging .getLogger (__name__ )
107
109
platform_without_databases = ["druid" ]
108
110
109
111
112
@dataclass
class SupersetSourceReport(StaleEntityRemovalSourceReport):
    """Ingestion report for the Superset source.

    Extends the stale-entity-removal report with a record of entity names
    (dashboards, charts, datasets) that were skipped because they did not
    match the configured allow/deny patterns.
    """

    # Names of entities dropped during ingestion. A LossyList keeps only a
    # bounded sample of entries so a very large filter set cannot blow up
    # the report's memory footprint.
    filtered: LossyList[str] = field(default_factory=LossyList)

    def report_dropped(self, name: str) -> None:
        """Record *name* as having been filtered out of this run."""
        self.filtered.append(name)
118
+
119
+
110
120
class SupersetDataset (BaseModel ):
111
121
id : int
112
122
table_name : str
@@ -142,6 +152,18 @@ class SupersetConfig(
142
152
default = dict (),
143
153
description = "regex patterns for tables to filter to assign domain_key. " ,
144
154
)
155
+ dataset_pattern : AllowDenyPattern = Field (
156
+ default = AllowDenyPattern .allow_all (),
157
+ description = "Regex patterns for dataset to filter in ingestion." ,
158
+ )
159
+ chart_pattern : AllowDenyPattern = Field (
160
+ AllowDenyPattern .allow_all (),
161
+ description = "Patterns for selecting chart names that are to be included" ,
162
+ )
163
+ dashboard_pattern : AllowDenyPattern = Field (
164
+ AllowDenyPattern .allow_all (),
165
+ description = "Patterns for selecting dashboard names that are to be included" ,
166
+ )
145
167
username : Optional [str ] = Field (default = None , description = "Superset username." )
146
168
password : Optional [str ] = Field (default = None , description = "Superset password." )
147
169
# Configuration for stateful ingestion
@@ -222,7 +244,7 @@ class SupersetSource(StatefulIngestionSourceBase):
222
244
"""
223
245
224
246
config : SupersetConfig
225
- report : StaleEntityRemovalSourceReport
247
+ report : SupersetSourceReport
226
248
platform = "superset"
227
249
228
250
def __hash__ (self ):
@@ -231,7 +253,7 @@ def __hash__(self):
231
253
def __init__ (self , ctx : PipelineContext , config : SupersetConfig ):
232
254
super ().__init__ (config , ctx )
233
255
self .config = config
234
- self .report = StaleEntityRemovalSourceReport ()
256
+ self .report = SupersetSourceReport ()
235
257
if self .config .domain :
236
258
self .domain_registry = DomainRegistry (
237
259
cached_domains = [domain_id for domain_id in self .config .domain ],
@@ -449,6 +471,15 @@ def construct_dashboard_from_api_data(
449
471
def emit_dashboard_mces (self ) -> Iterable [MetadataWorkUnit ]:
450
472
for dashboard_data in self .paginate_entity_api_results ("dashboard/" , PAGE_SIZE ):
451
473
try :
474
+ dashboard_id = str (dashboard_data .get ("id" ))
475
+ dashboard_title = dashboard_data .get ("dashboard_title" , "" )
476
+
477
+ if not self .config .dashboard_pattern .allowed (dashboard_title ):
478
+ self .report .report_dropped (
479
+ f"Dashboard '{ dashboard_title } ' (id: { dashboard_id } ) filtered by dashboard_pattern"
480
+ )
481
+ continue
482
+
452
483
dashboard_snapshot = self .construct_dashboard_from_api_data (
453
484
dashboard_data
454
485
)
@@ -461,7 +492,7 @@ def emit_dashboard_mces(self) -> Iterable[MetadataWorkUnit]:
461
492
mce = MetadataChangeEvent (proposedSnapshot = dashboard_snapshot )
462
493
yield MetadataWorkUnit (id = dashboard_snapshot .urn , mce = mce )
463
494
yield from self ._get_domain_wu (
464
- title = dashboard_data . get ( " dashboard_title" , "" ) ,
495
+ title = dashboard_title ,
465
496
entity_urn = dashboard_snapshot .urn ,
466
497
)
467
498
@@ -569,12 +600,37 @@ def construct_chart_from_chart_data(self, chart_data: dict) -> ChartSnapshot:
569
600
def emit_chart_mces (self ) -> Iterable [MetadataWorkUnit ]:
570
601
for chart_data in self .paginate_entity_api_results ("chart/" , PAGE_SIZE ):
571
602
try :
603
+ chart_id = str (chart_data .get ("id" ))
604
+ chart_name = chart_data .get ("slice_name" , "" )
605
+
606
+ if not self .config .chart_pattern .allowed (chart_name ):
607
+ self .report .report_dropped (
608
+ f"Chart '{ chart_name } ' (id: { chart_id } ) filtered by chart_pattern"
609
+ )
610
+ continue
611
+
612
+ # Emit a warning if charts use data from a dataset that will be filtered out
613
+ if self .config .dataset_pattern != AllowDenyPattern .allow_all ():
614
+ datasource_id = chart_data .get ("datasource_id" )
615
+ if datasource_id :
616
+ dataset_response = self .get_dataset_info (datasource_id )
617
+ dataset_name = dataset_response .get ("result" , {}).get (
618
+ "table_name" , ""
619
+ )
620
+
621
+ if dataset_name and not self .config .dataset_pattern .allowed (
622
+ dataset_name
623
+ ):
624
+ self .report .warning (
625
+ f"Chart '{ chart_name } ' (id: { chart_id } ) uses dataset '{ dataset_name } ' which is filtered by dataset_pattern"
626
+ )
627
+
572
628
chart_snapshot = self .construct_chart_from_chart_data (chart_data )
573
629
574
630
mce = MetadataChangeEvent (proposedSnapshot = chart_snapshot )
575
631
except Exception as e :
576
632
self .report .warning (
577
- f"Failed to construct chart snapshot. Chart name: { chart_data . get ( 'table_name' ) } . Error: \n { e } "
633
+ f"Failed to construct chart snapshot. Chart name: { chart_name } . Error: \n { e } "
578
634
)
579
635
continue
580
636
# Emit the chart
@@ -716,6 +772,15 @@ def construct_dataset_from_dataset_data(
716
772
def emit_dataset_mces (self ) -> Iterable [MetadataWorkUnit ]:
717
773
for dataset_data in self .paginate_entity_api_results ("dataset/" , PAGE_SIZE ):
718
774
try :
775
+ dataset_name = dataset_data .get ("table_name" , "" )
776
+
777
+ # Check if dataset should be filtered by dataset name
778
+ if not self .config .dataset_pattern .allowed (dataset_name ):
779
+ self .report .report_dropped (
780
+ f"Dataset '{ dataset_name } ' filtered by dataset_pattern"
781
+ )
782
+ continue
783
+
719
784
dataset_snapshot = self .construct_dataset_from_dataset_data (
720
785
dataset_data
721
786
)
0 commit comments