Commit 2e54461
feat(ingest): add looker meta extractor support in sql parsing (#12062)
Co-authored-by: Mayuri N <[email protected]>
Co-authored-by: Mayuri Nehate <[email protected]>
1 parent: 953893c

26 files changed: +1026 -79 lines

metadata-ingestion/src/datahub/configuration/source_common.py (+13)
```diff
@@ -63,3 +63,16 @@ class DatasetLineageProviderConfigBase(EnvConfigMixin):
         default=None,
         description="A holder for platform -> platform_instance mappings to generate correct dataset urns",
     )
+
+
+class PlatformDetail(ConfigModel):
+    platform_instance: Optional[str] = Field(
+        default=None,
+        description="DataHub platform instance name. To generate correct urn for upstream dataset, this should match "
+        "with platform instance name used in ingestion "
+        "recipe of other datahub sources.",
+    )
+    env: str = Field(
+        default=DEFAULT_ENV,
+        description="The environment that all assets produced by DataHub platform ingestion source belong to",
+    )
```
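This lifts `PlatformDetail` out of the PowerBI-specific config (removed from `powerbi/config.py` below) into a shared module so any source can reuse it. A minimal sketch, assuming `acryl-datahub` is installed, of how the relocated model validates recipe values; the values themselves are illustrative:

```python
# Sketch only: exercises the shared PlatformDetail model with made-up values.
from datahub.configuration.source_common import PlatformDetail

detail = PlatformDetail.parse_obj(
    {"platform_instance": "warehouse_prod", "env": "PROD"}
)
print(detail.platform_instance)  # warehouse_prod

# Omitted fields fall back to the declared defaults:
# platform_instance -> None, env -> DEFAULT_ENV.
defaulted = PlatformDetail.parse_obj({})
print(defaulted.platform_instance)  # None
```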

metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py (+54 -2)
```diff
@@ -31,6 +31,10 @@
 from pydantic.class_validators import validator

 import datahub.emitter.mce_builder as builder
+from datahub.api.entities.platformresource.platform_resource import (
+    PlatformResource,
+    PlatformResourceKey,
+)
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.mcp_builder import ContainerKey, create_embed_mcp
 from datahub.ingestion.api.report import Report
@@ -106,7 +110,7 @@
 from datahub.utilities.url_util import remove_port_from_url

 CORPUSER_DATAHUB = "urn:li:corpuser:datahub"
-
+LOOKER = "looker"
 logger = logging.getLogger(__name__)


@@ -1411,6 +1415,7 @@ class LookerDashboardSourceReport(StaleEntityRemovalSourceReport):

     resolved_user_ids: int = 0
     email_ids_missing: int = 0  # resolved users with missing email addresses
+    looker_user_count: int = 0

     _looker_api: Optional[LookerAPI] = None
     query_latency: Dict[str, datetime.timedelta] = dataclasses_field(
@@ -1614,16 +1619,31 @@ def get_urn_dashboard_id(self):
 class LookerUserRegistry:
     looker_api_wrapper: LookerAPI
     fields: str = ",".join(["id", "email", "display_name", "first_name", "last_name"])
+    _user_cache: Dict[str, LookerUser] = {}

-    def __init__(self, looker_api: LookerAPI):
+    def __init__(self, looker_api: LookerAPI, report: LookerDashboardSourceReport):
         self.looker_api_wrapper = looker_api
+        self.report = report
+        self._initialize_user_cache()
+
+    def _initialize_user_cache(self) -> None:
+        raw_users: Sequence[User] = self.looker_api_wrapper.all_users(
+            user_fields=self.fields
+        )
+
+        for raw_user in raw_users:
+            looker_user = LookerUser.create_looker_user(raw_user)
+            self._user_cache[str(looker_user.id)] = looker_user

     def get_by_id(self, id_: str) -> Optional[LookerUser]:
         if not id_:
             return None

         logger.debug(f"Will get user {id_}")

+        if str(id_) in self._user_cache:
+            return self._user_cache.get(str(id_))
+
         raw_user: Optional[User] = self.looker_api_wrapper.get_user(
             str(id_), user_fields=self.fields
         )
@@ -1632,3 +1652,35 @@ def get_by_id(self, id_: str) -> Optional[LookerUser]:

         looker_user = LookerUser.create_looker_user(raw_user)
         return looker_user
+
+    def to_platform_resource(
+        self, platform_instance: Optional[str]
+    ) -> Iterable[MetadataChangeProposalWrapper]:
+        try:
+            platform_resource_key = PlatformResourceKey(
+                platform=LOOKER,
+                resource_type="USER_ID_MAPPING",
+                platform_instance=platform_instance,
+                primary_key="",
+            )
+
+            # Extract user email mappings
+            user_email_cache = {
+                user_id: user.email
+                for user_id, user in self._user_cache.items()
+                if user.email
+            }
+
+            platform_resource = PlatformResource.create(
+                key=platform_resource_key,
+                value=user_email_cache,
+            )
+
+            self.report.looker_user_count = len(user_email_cache)
+            yield from platform_resource.to_mcps()
+
+        except Exception as exc:
+            self.report.warning(
+                message="Failed to generate platform resource for looker id mappings",
+                exc=exc,
+            )
```
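Two behaviors are worth calling out: the registry now bulk-loads every user into `_user_cache` up front and serves `get_by_id` from that cache, and `to_platform_resource` serializes the resulting id-to-email mapping as a `PlatformResource` keyed by `platform=looker`, `resource_type=USER_ID_MAPPING`. A self-contained sketch of the cache-first lookup pattern (the classes are simplified stand-ins, not the real Looker types):

```python
# Sketch of the cache-first lookup: one bulk fetch replaces N per-id calls.
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class UserStub:
    """Hypothetical stand-in for LookerUser."""
    id: str
    email: Optional[str]


class RegistrySketch:
    def __init__(self, all_users: List[UserStub]) -> None:
        # Mirrors _initialize_user_cache: key by stringified user id.
        self._cache: Dict[str, UserStub] = {str(u.id): u for u in all_users}

    def get_by_id(self, id_: str) -> Optional[UserStub]:
        # A cache hit avoids an API round-trip; the real registry falls
        # back to looker_api.get_user() on a miss instead of returning None.
        return self._cache.get(str(id_))

    def email_mapping(self) -> Dict[str, str]:
        # Mirrors the user_email_cache comprehension: drop users w/o email.
        return {uid: u.email for uid, u in self._cache.items() if u.email}


registry = RegistrySketch([UserStub("7", "jane@example.com"), UserStub("8", None)])
print(registry.get_by_id("7").email)  # jane@example.com
print(registry.email_mapping())       # {'7': 'jane@example.com'}
```

Note that in the diff `_user_cache` is declared with a mutable class-level default, so all registry instances share one dict; with a single registry per ingestion run this is benign, but it is worth knowing when reading the cache code.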

metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py (+13 -1)
```diff
@@ -68,6 +68,7 @@ class LookerAPIStats(BaseModel):
     get_look_calls: int = 0
     search_looks_calls: int = 0
     search_dashboards_calls: int = 0
+    all_user_calls: int = 0


 class LookerAPI:
@@ -135,7 +136,7 @@ def get_available_permissions(self) -> Set[str]:

         return permissions

-    @lru_cache(maxsize=1000)
+    @lru_cache(maxsize=5000)
     def get_user(self, id_: str, user_fields: str) -> Optional[User]:
         self.client_stats.user_calls += 1
         try:
@@ -154,6 +155,17 @@ def get_user(self, id_: str, user_fields: str) -> Optional[User]:
             # User not found
             return None

+    def all_users(self, user_fields: str) -> Sequence[User]:
+        self.client_stats.all_user_calls += 1
+        try:
+            return self.client.all_users(
+                fields=cast(str, user_fields),
+                transport_options=self.transport_options,
+            )
+        except SDKError as e:
+            logger.warning(f"Failure was {e}")
+            return []
+
     def execute_query(self, write_query: WriteQuery) -> List[Dict]:
         logger.debug(f"Executing query {write_query}")
         self.client_stats.query_calls += 1
```
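The `get_user` LRU cache grows from 1000 to 5000 entries to match the larger user sets now expected, and the new `all_users` wrapper fails open: an `SDKError` is logged and an empty list returned, so ingestion degrades to per-user lookups rather than aborting. A self-contained sketch of that fail-open shape (the SDK client and error type are stand-ins):

```python
import logging
from typing import List

logger = logging.getLogger(__name__)


class SDKErrorStub(Exception):
    """Hypothetical stand-in for looker_sdk's SDKError."""


def all_users_sketch(client, user_fields: str) -> List[dict]:
    # Mirrors LookerAPI.all_users: try the bulk endpoint, degrade to [].
    try:
        return client.all_users(fields=user_fields)
    except SDKErrorStub as e:
        logger.warning(f"Failure was {e}")
        return []


class FailingClient:
    def all_users(self, fields: str) -> List[dict]:
        raise SDKErrorStub("permission denied")


print(all_users_sketch(FailingClient(), "id,email"))  # []
```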

metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py (+12 -1)
```diff
@@ -145,7 +145,9 @@ def __init__(self, config: LookerDashboardSourceConfig, ctx: PipelineContext):
         self.source_config: LookerDashboardSourceConfig = config
         self.reporter: LookerDashboardSourceReport = LookerDashboardSourceReport()
         self.looker_api: LookerAPI = LookerAPI(self.source_config)
-        self.user_registry: LookerUserRegistry = LookerUserRegistry(self.looker_api)
+        self.user_registry: LookerUserRegistry = LookerUserRegistry(
+            self.looker_api, self.reporter
+        )
         self.explore_registry: LookerExploreRegistry = LookerExploreRegistry(
             self.looker_api, self.reporter, self.source_config
         )
@@ -1673,5 +1675,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 yield usage_mcp.as_workunit()
             self.reporter.report_stage_end("usage_extraction")

+        # Dump looker user resource mappings.
+        logger.info("Ingesting looker user resource mapping workunits")
+        self.reporter.report_stage_start("user_resource_extraction")
+        yield from auto_workunit(
+            self.user_registry.to_platform_resource(
+                self.source_config.platform_instance
+            )
+        )
+
     def get_report(self) -> SourceReport:
         return self.reporter
```
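`get_workunits_internal` gains a `user_resource_extraction` stage that wraps the registry's MCP stream in `auto_workunit`. A minimal sketch, assuming `acryl-datahub` is installed, of what `auto_workunit` does with such a stream; the urn and aspect here are illustrative:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_workunit
from datahub.metadata.schema_classes import StatusClass

# auto_workunit wraps each MCP into a MetadataWorkUnit with a derived id.
mcp = MetadataChangeProposalWrapper(
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:looker,demo.users,PROD)",
    aspect=StatusClass(removed=False),
)
for wu in auto_workunit([mcp]):
    print(wu.id)
```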

metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py (+1 -14)
```diff
@@ -9,7 +9,7 @@

 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import AllowDenyPattern, ConfigModel
-from datahub.configuration.source_common import DatasetSourceConfigMixin
+from datahub.configuration.source_common import DatasetSourceConfigMixin, PlatformDetail
 from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
 from datahub.ingestion.source.common.subtypes import BIAssetSubTypes
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
@@ -232,19 +232,6 @@ def default_for_dataset_type_mapping() -> Dict[str, str]:
         return dict_


-class PlatformDetail(ConfigModel):
-    platform_instance: Optional[str] = pydantic.Field(
-        default=None,
-        description="DataHub platform instance name. To generate correct urn for upstream dataset, this should match "
-        "with platform instance name used in ingestion "
-        "recipe of other datahub sources.",
-    )
-    env: str = pydantic.Field(
-        default=builder.DEFAULT_ENV,
-        description="The environment that all assets produced by DataHub platform ingestion source belong to",
-    )
-
-
 class DataBricksPlatformDetail(PlatformDetail):
     """
     metastore is an additional field used in Databricks connector to generate the dataset urn
```
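With the class moved to `datahub.configuration.source_common`, PowerBI keeps only its Databricks-specific extension. A sketch of what that subclassing looks like, as a simplified, hypothetical mirror of the real `DataBricksPlatformDetail`:

```python
from pydantic import Field

from datahub.configuration.source_common import PlatformDetail


class DataBricksPlatformDetailSketch(PlatformDetail):
    # Simplified mirror: per the docstring in this diff, the real class adds
    # a metastore field used to generate Databricks dataset urns, on top of
    # the inherited platform_instance and env.
    metastore: str = Field(description="Unity Catalog metastore name.")
```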

metadata-ingestion/src/datahub/ingestion/source/powerbi/dataplatform_instance_resolver.py (+1 -1)
```diff
@@ -2,8 +2,8 @@
 from abc import ABC, abstractmethod
 from typing import Union

+from datahub.configuration.source_common import PlatformDetail
 from datahub.ingestion.source.powerbi.config import (
-    PlatformDetail,
     PowerBiDashboardSourceConfig,
     PowerBIPlatformDetail,
 )
```

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py (+1 -1)
```diff
@@ -5,13 +5,13 @@

 from lark import Tree

+from datahub.configuration.source_common import PlatformDetail
 from datahub.emitter import mce_builder as builder
 from datahub.ingestion.api.common import PipelineContext
 from datahub.ingestion.source.powerbi.config import (
     Constant,
     DataBricksPlatformDetail,
     DataPlatformPair,
-    PlatformDetail,
     PowerBiDashboardSourceConfig,
     PowerBiDashboardSourceReport,
     PowerBIPlatformDetail,
```

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py (+1)
```diff
@@ -540,6 +540,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             identifiers=self.identifiers,
             schema_resolver=schema_resolver,
             discovered_tables=discovered_datasets,
+            graph=self.ctx.graph,
         )

         # TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs
```
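The added `graph=self.ctx.graph` threads the pipeline's DataHub graph client into the Snowflake source's `SqlParsingAggregator`, which is what lets the aggregator construct its tool-meta extractor with graph access in the next file.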

metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py (+1 -1)
```diff
@@ -490,7 +490,7 @@ def __init__(
         self._exit_stack.push(self._query_usage_counts)

         # Tool Extractor
-        self._tool_meta_extractor = ToolMetaExtractor()
+        self._tool_meta_extractor = ToolMetaExtractor.create(graph)
         self.report.tool_meta_report = self._tool_meta_extractor.report

     def close(self) -> None:
```
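Swapping the bare constructor for `ToolMetaExtractor.create(graph)` lets the extractor decide, based on whether a graph client is available, how much metadata it can resolve, such as the Looker `USER_ID_MAPPING` platform resource emitted above. A self-contained sketch of that factory shape; the names are illustrative, not the real `ToolMetaExtractor` internals:

```python
from typing import Optional


class GraphStub:
    """Hypothetical stand-in for DataHubGraph."""


class ExtractorSketch:
    def __init__(self, graph: Optional[GraphStub]) -> None:
        self.graph = graph

    @classmethod
    def create(cls, graph: Optional[GraphStub]) -> "ExtractorSketch":
        # With a graph the extractor can look up platform resources;
        # without one it still constructs and degrades gracefully.
        return cls(graph=graph)


offline = ExtractorSketch.create(None)  # still works with no graph
online = ExtractorSketch.create(GraphStub())
print(offline.graph, online.graph)
```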
