Skip to content

Commit df00f35

Browse files
committed
merge resolve
1 parent d21e007 commit df00f35

File tree

1 file changed

+91
-2
lines changed
  • metadata-ingestion/src/datahub/ingestion/source

1 file changed

+91
-2
lines changed

metadata-ingestion/src/datahub/ingestion/source/superset.py

+91-2
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,11 @@
2424
make_dataset_urn_with_platform_instance,
2525
make_domain_urn,
2626
make_user_urn,
27+
make_term_urn,
2728
)
29+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
2830
from datahub.emitter.mcp_builder import add_domain_to_entity_wu
31+
from datahub.emitter.rest_emitter import DatahubRestEmitter
2932
from datahub.ingestion.api.common import PipelineContext
3033
from datahub.ingestion.api.decorators import (
3134
SourceCapability,
@@ -37,6 +40,7 @@
3740
)
3841
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
3942
from datahub.ingestion.api.workunit import MetadataWorkUnit
43+
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
4044
from datahub.ingestion.source.sql.sql_types import resolve_sql_type
4145
from datahub.ingestion.source.state.stale_entity_removal_handler import (
4246
StaleEntityRemovalHandler,
@@ -47,6 +51,12 @@
4751
StatefulIngestionConfigBase,
4852
StatefulIngestionSourceBase,
4953
)
54+
from datahub.metadata._schema_classes import (
55+
AuditStampClass,
56+
GlossaryTermAssociationClass,
57+
GlossaryTermInfoClass,
58+
GlossaryTermsClass,
59+
)
5060
from datahub.metadata.com.linkedin.pegasus2avro.common import (
5161
ChangeAuditStamps,
5262
Status,
@@ -88,7 +98,6 @@
8898

8999
PAGE_SIZE = 25
90100

91-
92101
chart_type_from_viz_type = {
93102
"line": ChartTypeClass.LINE,
94103
"big_number": ChartTypeClass.LINE,
@@ -105,7 +114,6 @@
105114
"box_plot": ChartTypeClass.BAR,
106115
}
107116

108-
109117
platform_without_databases = ["druid"]
110118

111119

@@ -259,6 +267,7 @@ def __init__(self, ctx: PipelineContext, config: SupersetConfig):
259267
cached_domains=[domain_id for domain_id in self.config.domain],
260268
graph=self.ctx.graph,
261269
)
270+
self.sink_config = ctx.pipeline_config.sink.config
262271
self.session = self.login()
263272
self.owner_info = self.parse_owner_info()
264273

@@ -680,6 +689,72 @@ def gen_dataset_urn(self, datahub_dataset_name: str) -> str:
680689
env=self.config.env,
681690
)
682691

692+
def check_if_term_exists(self, term_urn) -> bool:
    """Return True if a glossary term with ``term_urn`` already exists in DataHub.

    Queries the GMS server configured in the pipeline sink for the
    ``glossaryTermInfo`` aspect of the term entity; presence of that
    aspect is taken as proof of existence.

    Args:
        term_urn: URN of the glossary term to look up.

    Returns:
        True when the term's ``glossaryTermInfo`` aspect is present.
    """
    # NOTE(review): a fresh graph client is built on every call; if this
    # is invoked once per metric, consider caching/reusing one client.
    graph = DataHubGraph(
        DatahubClientConfig(
            server=self.sink_config.get("server", ""),
            token=self.sink_config.get("token", ""),
        )
    )
    # Fetch only the single aspect needed to decide existence.
    result = graph.get_entity_semityped(
        entity_urn=term_urn,
        aspects=["glossaryTermInfo"],
    )
    # Collapse the original if/return-True/return-False into one expression.
    return bool(result.get("glossaryTermInfo"))
708+
709+
def parse_glossary_terms_from_metrics(
    self, metrics, last_modified
) -> GlossaryTermsClass:
    """Build a ``GlossaryTermsClass`` aspect from Superset dataset metrics.

    For each metric a glossary term URN is derived from its
    ``metric_name``. Terms that already exist in DataHub are referenced
    as-is; missing terms are created via the REST emitter before being
    referenced.

    Args:
        metrics: iterable of metric dicts from the Superset dataset API
            (keys read: "metric_name", "expression", "extra",
            "description").
        last_modified: audit stamp applied to the resulting aspect.

    Returns:
        GlossaryTermsClass with one term association per metric.
    """
    glossary_term_urns = []
    # The emitter is loop-invariant; create it lazily (only if at least
    # one term must be created) and at most once, instead of once per
    # metric as the original did.
    rest_emitter = None
    for metric in metrics:
        expression = metric.get("expression", "")
        certification_details = metric.get("extra", "")
        metric_name = metric.get("metric_name", "")
        description = metric.get("description", "")
        term_urn = make_term_urn(metric_name)

        if self.check_if_term_exists(term_urn):
            logger.info(f"Term {term_urn} already exists")
            glossary_term_urns.append(GlossaryTermAssociationClass(urn=term_urn))
            continue

        term_properties_aspect = GlossaryTermInfoClass(
            name=metric_name,
            definition=f"Description: {description} \nSql Expression: {expression} \nCertification details: {certification_details}",
            termSource="",
        )

        event: MetadataChangeProposalWrapper = MetadataChangeProposalWrapper(
            entityUrn=term_urn,
            aspect=term_properties_aspect,
        )

        if rest_emitter is None:
            # Target the same GMS server the pipeline sink writes to.
            rest_emitter = DatahubRestEmitter(
                gms_server=self.sink_config.get("server", ""),
                token=self.sink_config.get("token", ""),
            )
        rest_emitter.emit(event)
        logger.info(f"Created Glossary term {term_urn}")
        glossary_term_urns.append(GlossaryTermAssociationClass(urn=term_urn))

    return GlossaryTermsClass(terms=glossary_term_urns, auditStamp=last_modified)
746+
747+
def _is_certified_metric(self, response_result: dict) -> bool:
748+
# We only want to ingest certified metrics for physical preset dataset
749+
metrics = response_result.get("metrics", {})
750+
extra = response_result.get("extra", {})
751+
# kind = response_result.get("kind")
752+
if (metrics and extra and "This table is produced by dbt" in extra):
753+
# and kind == "physical"):
754+
return True
755+
else:
756+
return False
757+
683758
def construct_dataset_from_dataset_data(
684759
self, dataset_data: dict
685760
) -> DatasetSnapshot:
@@ -690,6 +765,12 @@ def construct_dataset_from_dataset_data(
690765
dataset_response, self.platform
691766
)
692767
dataset_url = f"{self.config.display_uri}{dataset_response.get('result', {}).get('url', '')}"
768+
now = datetime.now().strftime("%I:%M%p on %B %d, %Y")
769+
modified_ts = int(
770+
dp.parse(dataset_data.get("changed_on") or now).timestamp() * 1000
771+
)
772+
modified_actor = f"urn:li:corpuser:{(dataset_data.get('changed_by') or {}).get('username', 'unknown')}"
773+
last_modified = AuditStampClass(time=modified_ts, actor=modified_actor)
693774

694775
modified_actor = f"urn:li:corpuser:{self.owner_info.get((dataset_data.get('changed_by') or {}).get('id', -1), 'unknown')}"
695776
modified_ts = int(
@@ -747,6 +828,14 @@ def construct_dataset_from_dataset_data(
747828
]
748829
)
749830

831+
response_result = dataset_response.get("result", {})
832+
833+
if self._is_certified_metric(response_result):
834+
glossary_terms = self.parse_glossary_terms_from_metrics(
835+
response_result.get("metrics", {}), last_modified
836+
)
837+
aspects_items.append(glossary_terms)
838+
750839
dataset_snapshot = DatasetSnapshot(
751840
urn=datasource_urn,
752841
aspects=aspects_items,

0 commit comments

Comments
 (0)