Skip to content

Commit ba0ced7

Browse files
committed
added column level lineage for physical datasets
1 parent 549f2a6 commit ba0ced7

File tree

3 files changed

+1285
-309
lines changed

3 files changed

+1285
-309
lines changed

metadata-ingestion/src/datahub/ingestion/source/superset.py

+83-25
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@
8585
UpstreamLineageClass,
8686
)
8787
from datahub.sql_parsing.sqlglot_lineage import (
88+
ColumnLineageInfo,
89+
ColumnRef,
90+
DownstreamColumnRef,
8891
SqlParsingResult,
8992
create_lineage_sql_parsed_result,
9093
)
@@ -692,7 +695,6 @@ def generate_virtual_dataset_lineage(
692695
self,
693696
parsed_query_object: SqlParsingResult,
694697
datasource_urn: str,
695-
dataset_url: str,
696698
) -> UpstreamLineageClass:
697699
cll = (
698700
parsed_query_object.column_lineage
@@ -733,6 +735,72 @@ def generate_virtual_dataset_lineage(
733735
)
734736
return upstream_lineage
735737

738+
def generate_physical_dataset_lineage(
739+
self,
740+
dataset_response: dict,
741+
upstream_warehouse_platform: str,
742+
datasource_urn: str,
743+
) -> UpstreamLineageClass:
744+
# To generate column level lineage, we can manually decode the metadata
745+
# to produce the ColumnLineageInfo
746+
columns = dataset_response.get("result", {}).get("columns", [])
747+
cll: List[ColumnLineageInfo] = []
748+
749+
for column in columns:
750+
cll.append(
751+
ColumnLineageInfo(
752+
downstream=DownstreamColumnRef(
753+
table=None,
754+
column=column.get("column_name", ""),
755+
column_type=column.get("type", ""),
756+
native_column_type=column.get("type", ""),
757+
),
758+
upstreams=[
759+
ColumnRef(
760+
table=upstream_warehouse_platform,
761+
column=column.get("column_name", ""),
762+
)
763+
],
764+
logic=None,
765+
)
766+
)
767+
768+
fine_grained_lineages: List[FineGrainedLineageClass] = []
769+
770+
for cll_info in cll:
771+
downstream = (
772+
[make_schema_field_urn(datasource_urn, cll_info.downstream.column)]
773+
if cll_info.downstream and cll_info.downstream.column
774+
else []
775+
)
776+
upstreams = [
777+
make_schema_field_urn(column_ref.table, column_ref.column)
778+
for column_ref in cll_info.upstreams
779+
]
780+
fine_grained_lineages.append(
781+
FineGrainedLineageClass(
782+
downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
783+
downstreams=downstream,
784+
upstreamType=FineGrainedLineageUpstreamTypeClass.FIELD_SET,
785+
upstreams=upstreams,
786+
)
787+
)
788+
789+
upstream_dataset = self.get_datasource_urn_from_id(
790+
dataset_response, upstream_warehouse_platform
791+
)
792+
793+
upstream_lineage = UpstreamLineageClass(
794+
upstreams=[
795+
UpstreamClass(
796+
type=DatasetLineageTypeClass.TRANSFORMED,
797+
dataset=upstream_dataset,
798+
)
799+
],
800+
fineGrainedLineages=fine_grained_lineages,
801+
)
802+
return upstream_lineage
803+
736804
def construct_dataset_from_dataset_data(
737805
self, dataset_data: dict
738806
) -> DatasetSnapshot:
@@ -773,33 +841,23 @@ def construct_dataset_from_dataset_data(
773841
if upstream_warehouse_platform in warehouse_naming:
774842
upstream_warehouse_platform = warehouse_naming[upstream_warehouse_platform]
775843

776-
parsed_query_object = create_lineage_sql_parsed_result(
777-
query=sql,
778-
default_db=upstream_warehouse_db_name,
779-
platform=upstream_warehouse_platform,
780-
platform_instance=None,
781-
env=self.config.env,
782-
)
783-
784-
# if we have sql, we label the datasets as virtual
785-
if sql:
786-
tag_urn = f"urn:li:tag:{self.platform}:virtual"
787-
upstream_lineage = self.generate_virtual_dataset_lineage(
788-
parsed_query_object, datasource_urn, dataset_url
844+
# Sometimes the field will be null instead of not existing
845+
if sql == "null" or not sql:
846+
tag_urn = f"urn:li:tag:{self.platform}:physical"
847+
upstream_lineage = self.generate_physical_dataset_lineage(
848+
dataset_response, upstream_warehouse_platform, datasource_urn
789849
)
790850
else:
791-
tag_urn = f"urn:li:tag:{self.platform}:physical"
792-
upstream_dataset = self.get_datasource_urn_from_id(
793-
dataset_response, upstream_warehouse_platform
851+
tag_urn = f"urn:li:tag:{self.platform}:virtual"
852+
parsed_query_object = create_lineage_sql_parsed_result(
853+
query=sql,
854+
default_db=upstream_warehouse_db_name,
855+
platform=upstream_warehouse_platform,
856+
platform_instance=None,
857+
env=self.config.env,
794858
)
795-
upstream_lineage = UpstreamLineageClass(
796-
upstreams=[
797-
UpstreamClass(
798-
type=DatasetLineageTypeClass.TRANSFORMED,
799-
dataset=upstream_dataset,
800-
properties={"externalUrl": dataset_url},
801-
)
802-
]
859+
upstream_lineage = self.generate_virtual_dataset_lineage(
860+
parsed_query_object, datasource_urn
803861
)
804862

805863
dataset_info = DatasetPropertiesClass(

0 commit comments

Comments
 (0)