
Commit cffc6d4

feat(ingestion/superset): superset column level lineage (#12786)
1 parent 3df12dc commit cffc6d4

File tree

4 files changed: +2222 -123 lines changed


metadata-ingestion/src/datahub/ingestion/source/superset.py

+118 -14
@@ -23,6 +23,7 @@
     make_dataset_urn,
     make_dataset_urn_with_platform_instance,
     make_domain_urn,
+    make_schema_field_urn,
     make_user_urn,
 )
 from datahub.emitter.mcp_builder import add_domain_to_entity_wu
@@ -72,6 +73,9 @@
     DashboardInfoClass,
     DatasetLineageTypeClass,
     DatasetPropertiesClass,
+    FineGrainedLineageClass,
+    FineGrainedLineageDownstreamTypeClass,
+    FineGrainedLineageUpstreamTypeClass,
     GlobalTagsClass,
     OwnerClass,
     OwnershipClass,
@@ -80,6 +84,10 @@
     UpstreamClass,
     UpstreamLineageClass,
 )
+from datahub.sql_parsing.sqlglot_lineage import (
+    SqlParsingResult,
+    create_lineage_sql_parsed_result,
+)
 from datahub.utilities import config_clean
 from datahub.utilities.lossy_collections import LossyList
 from datahub.utilities.registries.domain_registry import DomainRegistry
@@ -342,7 +350,7 @@ def get_dataset_info(self, dataset_id: int) -> dict:
         )
         if dataset_response.status_code != 200:
             logger.warning(f"Failed to get dataset info: {dataset_response.text}")
-            dataset_response.raise_for_status()
+            return {}
         return dataset_response.json()

     def get_datasource_urn_from_id(
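
Returning an empty dict instead of raising keeps a single failing dataset from aborting the whole ingestion run; every downstream consumer in this file reads the response with chained .get() calls, so missing keys simply degrade to defaults. A minimal sketch of that behaviour (the values below are illustrative, not from the Superset API):

# Sketch: why an empty dict is a safe fallback for get_dataset_info().
dataset_response: dict = {}  # what the method now returns on a non-200 status

backend = dataset_response.get("result", {}).get("database", {}).get("backend")
columns = dataset_response.get("result", {}).get("columns", [])

assert backend is None  # no platform mapping is attempted
assert columns == []    # no fine-grained lineage entries are produced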
@@ -680,6 +688,88 @@ def gen_dataset_urn(self, datahub_dataset_name: str) -> str:
             env=self.config.env,
         )

+    def generate_virtual_dataset_lineage(
+        self,
+        parsed_query_object: SqlParsingResult,
+        datasource_urn: str,
+    ) -> UpstreamLineageClass:
+        cll = (
+            parsed_query_object.column_lineage
+            if parsed_query_object.column_lineage is not None
+            else []
+        )
+
+        fine_grained_lineages: List[FineGrainedLineageClass] = []
+
+        for cll_info in cll:
+            downstream = (
+                [make_schema_field_urn(datasource_urn, cll_info.downstream.column)]
+                if cll_info.downstream and cll_info.downstream.column
+                else []
+            )
+            upstreams = [
+                make_schema_field_urn(column_ref.table, column_ref.column)
+                for column_ref in cll_info.upstreams
+            ]
+            fine_grained_lineages.append(
+                FineGrainedLineageClass(
+                    downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
+                    downstreams=downstream,
+                    upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
+                    upstreams=upstreams,
+                )
+            )
+
+        upstream_lineage = UpstreamLineageClass(
+            upstreams=[
+                UpstreamClass(
+                    type=DatasetLineageTypeClass.TRANSFORMED,
+                    dataset=input_table_urn,
+                )
+                for input_table_urn in parsed_query_object.in_tables
+            ],
+            fineGrainedLineages=fine_grained_lineages,
+        )
+        return upstream_lineage
+
+    def generate_physical_dataset_lineage(
+        self,
+        dataset_response: dict,
+        upstream_dataset: str,
+        datasource_urn: str,
+    ) -> UpstreamLineageClass:
+        # To generate column level lineage, we can manually decode the metadata
+        # to produce the ColumnLineageInfo
+        columns = dataset_response.get("result", {}).get("columns", [])
+        fine_grained_lineages: List[FineGrainedLineageClass] = []
+
+        for column in columns:
+            column_name = column.get("column_name", "")
+            if not column_name:
+                continue
+
+            downstream = [make_schema_field_urn(datasource_urn, column_name)]
+            upstreams = [make_schema_field_urn(upstream_dataset, column_name)]
+            fine_grained_lineages.append(
+                FineGrainedLineageClass(
+                    downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
+                    downstreams=downstream,
+                    upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
+                    upstreams=upstreams,
+                )
+            )
+
+        upstream_lineage = UpstreamLineageClass(
+            upstreams=[
+                UpstreamClass(
+                    type=DatasetLineageTypeClass.TRANSFORMED,
+                    dataset=upstream_dataset,
+                )
+            ],
+            fineGrainedLineages=fine_grained_lineages,
+        )
+        return upstream_lineage
+
     def construct_dataset_from_dataset_data(
         self, dataset_data: dict
     ) -> DatasetSnapshot:
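
The two helpers differ only in where the column pairs come from: the virtual path takes them from the SQL parser's column lineage, while the physical path assumes a 1:1 column mapping onto the single upstream table. A rough standalone sketch of the physical case, using invented URNs and a hand-written response payload, shows the shape of the resulting aspect:

# Sketch of the physical-dataset case with hypothetical inputs; the real method
# receives this payload from the Superset dataset API.
from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    FineGrainedLineageClass,
    FineGrainedLineageDownstreamTypeClass,
    FineGrainedLineageUpstreamTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

dataset_response = {
    "result": {"columns": [{"column_name": "order_id"}, {"column_name": "amount"}]}
}
upstream_dataset = make_dataset_urn("postgres", "analytics.public.orders")  # hypothetical
datasource_urn = make_dataset_urn("superset", "orders_dataset")             # hypothetical

fine_grained = [
    FineGrainedLineageClass(
        downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
        downstreams=[make_schema_field_urn(datasource_urn, col["column_name"])],
        upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
        upstreams=[make_schema_field_urn(upstream_dataset, col["column_name"])],
    )
    for col in dataset_response["result"]["columns"]
]
lineage = UpstreamLineageClass(
    upstreams=[
        UpstreamClass(type=DatasetLineageTypeClass.TRANSFORMED, dataset=upstream_dataset)
    ],
    fineGrainedLineages=fine_grained,
)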
@@ -700,6 +790,14 @@ def construct_dataset_from_dataset_data(
         upstream_warehouse_platform = (
             dataset_response.get("result", {}).get("database", {}).get("backend")
         )
+        upstream_warehouse_db_name = (
+            dataset_response.get("result", {}).get("database", {}).get("database_name")
+        )
+
+        # if we have rendered sql, we always use that, and fall back to regular sql otherwise
+        sql = dataset_response.get("result", {}).get(
+            "rendered_sql"
+        ) or dataset_response.get("result", {}).get("sql")

         # Preset has a way of naming their platforms differently than
         # how datahub names them, so map the platform name to the correct naming
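
The or chain prefers the rendered_sql field and only falls back to the raw sql field when the rendered form is missing or empty. A small illustration with made-up payloads:

# Illustration of the rendered_sql -> sql fallback; payload values are made up.
from typing import Optional

def pick_sql(result: dict) -> Optional[str]:
    return result.get("rendered_sql") or result.get("sql")

assert pick_sql({"rendered_sql": "SELECT 1", "sql": "SELECT 2"}) == "SELECT 1"
assert pick_sql({"rendered_sql": None, "sql": "SELECT 2"}) == "SELECT 2"  # falsy rendered_sql
assert pick_sql({"sql": "null"}) == "null"  # the literal string "null" is caught by the branch below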
@@ -712,22 +810,28 @@ def construct_dataset_from_dataset_data(
         if upstream_warehouse_platform in warehouse_naming:
             upstream_warehouse_platform = warehouse_naming[upstream_warehouse_platform]

-        # TODO: Categorize physical vs virtual upstream dataset
-        # mark all upstream dataset as physical for now, in the future we would ideally like
-        # to differentiate physical vs virtual upstream datasets
-        tag_urn = f"urn:li:tag:{self.platform}:physical"
         upstream_dataset = self.get_datasource_urn_from_id(
             dataset_response, upstream_warehouse_platform
         )
-        upstream_lineage = UpstreamLineageClass(
-            upstreams=[
-                UpstreamClass(
-                    type=DatasetLineageTypeClass.TRANSFORMED,
-                    dataset=upstream_dataset,
-                    properties={"externalUrl": dataset_url},
-                )
-            ]
-        )
+
+        # Sometimes the field will be null instead of not existing
+        if sql == "null" or not sql:
+            tag_urn = f"urn:li:tag:{self.platform}:physical"
+            upstream_lineage = self.generate_physical_dataset_lineage(
+                dataset_response, upstream_dataset, datasource_urn
+            )
+        else:
+            tag_urn = f"urn:li:tag:{self.platform}:virtual"
+            parsed_query_object = create_lineage_sql_parsed_result(
+                query=sql,
+                default_db=upstream_warehouse_db_name,
+                platform=upstream_warehouse_platform,
+                platform_instance=None,
+                env=self.config.env,
+            )
+            upstream_lineage = self.generate_virtual_dataset_lineage(
+                parsed_query_object, datasource_urn
+            )

         dataset_info = DatasetPropertiesClass(
             name=dataset.table_name,
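
Outside of the source class, the virtual-dataset path can be exercised directly: feed a SQL query through create_lineage_sql_parsed_result and hand the result to the fine-grained lineage builder. A hedged, standalone sketch (the query, database name, and platform are invented; the keyword arguments match the call made in the diff):

# Standalone sketch of the virtual-dataset branch with invented inputs.
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result

parsed = create_lineage_sql_parsed_result(
    query="SELECT order_id, SUM(amount) AS total FROM public.orders GROUP BY order_id",
    default_db="analytics",   # hypothetical database name
    platform="postgres",      # hypothetical upstream warehouse platform
    platform_instance=None,
    env="PROD",
)

# parsed.in_tables lists the upstream table URNs the parser resolved;
# parsed.column_lineage (when available) maps output columns such as "total"
# back to upstream columns such as orders.amount.
print(parsed.in_tables)
print(parsed.column_lineage)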
