Skip to content

Commit 09a9b6e

Browse files
authored
feat(ingest/looker): Do not emit usage for non-ingested dashboards and charts (#11647)
1 parent 047644b commit 09a9b6e

File tree

5 files changed

+482
-40
lines changed

5 files changed

+482
-40
lines changed

metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py

+9
Original file line numberDiff line numberDiff line change
@@ -1408,6 +1408,15 @@ class LookerDashboardSourceReport(StaleEntityRemovalSourceReport):
14081408
dashboards_with_activity: LossySet[str] = dataclasses_field(
14091409
default_factory=LossySet
14101410
)
1411+
1412+
# Entities that don't seem to exist, so we don't emit usage aspects for them despite having usage data
1413+
dashboards_skipped_for_usage: LossySet[str] = dataclasses_field(
1414+
default_factory=LossySet
1415+
)
1416+
charts_skipped_for_usage: LossySet[str] = dataclasses_field(
1417+
default_factory=LossySet
1418+
)
1419+
14111420
stage_latency: List[StageLatency] = dataclasses_field(default_factory=list)
14121421
_looker_explore_registry: Optional[LookerExploreRegistry] = None
14131422
total_explores: int = 0

metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py

+19-3
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
ViewField,
6969
ViewFieldType,
7070
gen_model_key,
71+
get_urn_looker_element_id,
7172
)
7273
from datahub.ingestion.source.looker.looker_config import LookerDashboardSourceConfig
7374
from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI
@@ -165,6 +166,9 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
165166
# Required, as we do not ingest all folders but only those that have dashboards/looks
166167
self.processed_folders: List[str] = []
167168

169+
# Keep track of ingested chart urns, to omit usage for non-ingested entities
170+
self.chart_urns: Set[str] = set()
171+
168172
@staticmethod
169173
def test_connection(config_dict: dict) -> TestConnectionReport:
170174
test_report = TestConnectionReport()
@@ -642,6 +646,7 @@ def _make_chart_metadata_events(
642646
chart_urn = self._make_chart_urn(
643647
element_id=dashboard_element.get_urn_element_id()
644648
)
649+
self.chart_urns.add(chart_urn)
645650
chart_snapshot = ChartSnapshot(
646651
urn=chart_urn,
647652
aspects=[Status(removed=False)],
@@ -1380,7 +1385,9 @@ def _get_folder_and_ancestors_workunits(
13801385
yield from self._emit_folder_as_container(folder)
13811386

13821387
def extract_usage_stat(
1383-
self, looker_dashboards: List[looker_usage.LookerDashboardForUsage]
1388+
self,
1389+
looker_dashboards: List[looker_usage.LookerDashboardForUsage],
1390+
ingested_chart_urns: Set[str],
13841391
) -> List[MetadataChangeProposalWrapper]:
13851392
looks: List[looker_usage.LookerChartForUsage] = []
13861393
# filter out look from all dashboard
@@ -1391,6 +1398,15 @@ def extract_usage_stat(
13911398

13921399
# dedup looks
13931400
looks = list({str(look.id): look for look in looks}.values())
1401+
filtered_looks = []
1402+
for look in looks:
1403+
if not look.id:
1404+
continue
1405+
chart_urn = self._make_chart_urn(get_urn_looker_element_id(look.id))
1406+
if chart_urn in ingested_chart_urns:
1407+
filtered_looks.append(look)
1408+
else:
1409+
self.reporter.charts_skipped_for_usage.add(look.id)
13941410

13951411
# Keep stat generators to generate entity stat aspect later
13961412
stat_generator_config: looker_usage.StatGeneratorConfig = (
@@ -1414,7 +1430,7 @@ def extract_usage_stat(
14141430
stat_generator_config,
14151431
self.reporter,
14161432
self._make_chart_urn,
1417-
looks,
1433+
filtered_looks,
14181434
)
14191435

14201436
mcps: List[MetadataChangeProposalWrapper] = []
@@ -1669,7 +1685,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
16691685
if self.source_config.extract_usage_history:
16701686
self.reporter.report_stage_start("usage_extraction")
16711687
usage_mcps: List[MetadataChangeProposalWrapper] = self.extract_usage_stat(
1672-
looker_dashboards_for_usage
1688+
looker_dashboards_for_usage, self.chart_urns
16731689
)
16741690
for usage_mcp in usage_mcps:
16751691
yield usage_mcp.as_workunit()

metadata-ingestion/src/datahub/ingestion/source/looker/looker_usage.py

+23-17
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
TimeWindowSizeClass,
4343
_Aspect as AspectAbstract,
4444
)
45+
from datahub.utilities.lossy_collections import LossySet
4546

4647
logger = logging.getLogger(__name__)
4748

@@ -170,7 +171,7 @@ def __init__(
170171
self.config = config
171172
self.looker_models = looker_models
172173
# Later it will help to find out for what are the looker entities from query result
173-
self.id_vs_model: Dict[str, ModelForUsage] = {
174+
self.id_to_model: Dict[str, ModelForUsage] = {
174175
self.get_id(looker_object): looker_object for looker_object in looker_models
175176
}
176177
self.post_filter = len(self.looker_models) > 100
@@ -225,6 +226,10 @@ def get_id(self, looker_object: ModelForUsage) -> str:
225226
def get_id_from_row(self, row: dict) -> str:
226227
pass
227228

229+
@abstractmethod
230+
def report_skip_set(self) -> LossySet[str]:
231+
pass
232+
228233
def create_mcp(
229234
self, model: ModelForUsage, aspect: Aspect
230235
) -> MetadataChangeProposalWrapper:
@@ -258,20 +263,11 @@ def _process_entity_timeseries_rows(
258263

259264
return entity_stat_aspect
260265

261-
def _process_absolute_aspect(self) -> List[Tuple[ModelForUsage, AspectAbstract]]:
262-
aspects: List[Tuple[ModelForUsage, AspectAbstract]] = []
263-
for looker_object in self.looker_models:
264-
aspects.append(
265-
(looker_object, self.to_entity_absolute_stat_aspect(looker_object))
266-
)
267-
268-
return aspects
269-
270266
def _fill_user_stat_aspect(
271267
self,
272268
entity_usage_stat: Dict[Tuple[str, str], Aspect],
273269
user_wise_rows: List[Dict],
274-
) -> Iterable[Tuple[ModelForUsage, Aspect]]:
270+
) -> Iterable[Tuple[str, Aspect]]:
275271
logger.debug("Entering fill user stat aspect")
276272

277273
# We first resolve all the users using a threadpool to warm up the cache
@@ -300,7 +296,7 @@ def _fill_user_stat_aspect(
300296

301297
for row in user_wise_rows:
302298
# Confirm looker object was given for stat generation
303-
looker_object = self.id_vs_model.get(self.get_id_from_row(row))
299+
looker_object = self.id_to_model.get(self.get_id_from_row(row))
304300
if looker_object is None:
305301
logger.warning(
306302
"Looker object with id({}) was not register with stat generator".format(
@@ -338,7 +334,7 @@ def _fill_user_stat_aspect(
338334
logger.debug("Starting to yield answers for user-wise counts")
339335

340336
for (id, _), aspect in entity_usage_stat.items():
341-
yield self.id_vs_model[id], aspect
337+
yield id, aspect
342338

343339
def _execute_query(self, query: LookerQuery, query_name: str) -> List[Dict]:
344340
rows = []
@@ -357,7 +353,7 @@ def _execute_query(self, query: LookerQuery, query_name: str) -> List[Dict]:
357353
)
358354
if self.post_filter:
359355
logger.debug("post filtering")
360-
rows = [r for r in rows if self.get_id_from_row(r) in self.id_vs_model]
356+
rows = [r for r in rows if self.get_id_from_row(r) in self.id_to_model]
361357
logger.debug("Filtered down to %d rows", len(rows))
362358
except Exception as e:
363359
logger.warning(f"Failed to execute {query_name} query: {e}")
@@ -378,7 +374,8 @@ def generate_usage_stat_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
378374
return
379375

380376
# yield absolute stat for looker entities
381-
for looker_object, aspect in self._process_absolute_aspect(): # type: ignore
377+
for looker_object in self.looker_models:
378+
aspect = self.to_entity_absolute_stat_aspect(looker_object)
382379
yield self.create_mcp(looker_object, aspect)
383380

384381
# Execute query and process the raw json which contains stat information
@@ -399,10 +396,13 @@ def generate_usage_stat_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
399396
)
400397
user_wise_rows = self._execute_query(user_wise_query_with_filters, "user_query")
401398
# yield absolute stat for entity
402-
for looker_object, aspect in self._fill_user_stat_aspect(
399+
for object_id, aspect in self._fill_user_stat_aspect(
403400
entity_usage_stat, user_wise_rows
404401
):
405-
yield self.create_mcp(looker_object, aspect)
402+
if object_id in self.id_to_model:
403+
yield self.create_mcp(self.id_to_model[object_id], aspect)
404+
else:
405+
self.report_skip_set().add(object_id)
406406

407407

408408
class DashboardStatGenerator(BaseStatGenerator):
@@ -425,6 +425,9 @@ def __init__(
425425
def get_stats_generator_name(self) -> str:
426426
return "DashboardStats"
427427

428+
def report_skip_set(self) -> LossySet[str]:
429+
return self.report.dashboards_skipped_for_usage
430+
428431
def get_filter(self) -> Dict[ViewField, str]:
429432
return {
430433
HistoryViewField.HISTORY_DASHBOARD_ID: ",".join(
@@ -541,6 +544,9 @@ def __init__(
541544
def get_stats_generator_name(self) -> str:
542545
return "ChartStats"
543546

547+
def report_skip_set(self) -> LossySet[str]:
548+
return self.report.charts_skipped_for_usage
549+
544550
def get_filter(self) -> Dict[ViewField, str]:
545551
return {
546552
LookViewField.LOOK_ID: ",".join(

0 commit comments

Comments
 (0)