Skip to content

Commit 9d3291d

Browse files
committed
wip fixing physical column level lineage
1 parent ba0ced7 commit 9d3291d

File tree

3 files changed

+611
-68
lines changed

3 files changed

+611
-68
lines changed

metadata-ingestion/src/datahub/ingestion/source/superset.py

+15-8
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
OwnerClass,
8181
OwnershipClass,
8282
OwnershipTypeClass,
83+
SchemaFieldDataTypeClass,
8384
TagAssociationClass,
8485
UpstreamClass,
8586
UpstreamLineageClass,
@@ -738,33 +739,37 @@ def generate_virtual_dataset_lineage(
738739
def generate_physical_dataset_lineage(
739740
self,
740741
dataset_response: dict,
741-
upstream_warehouse_platform: str,
742+
upstream_dataset: str,
742743
datasource_urn: str,
743744
) -> UpstreamLineageClass:
744745
# To generate column level lineage, we can manually decode the metadata
745746
# to produce the ColumnLineageInfo
746747
columns = dataset_response.get("result", {}).get("columns", [])
748+
print(f"\n\ncolumns are: {columns}\n\n")
747749
cll: List[ColumnLineageInfo] = []
748750

749751
for column in columns:
752+
print("\nwas here\n")
750753
cll.append(
751754
ColumnLineageInfo(
752755
downstream=DownstreamColumnRef(
753756
table=None,
754757
column=column.get("column_name", ""),
755-
column_type=column.get("type", ""),
758+
column_type=SchemaFieldDataTypeClass(column.get("type"), ""),
756759
native_column_type=column.get("type", ""),
757760
),
758761
upstreams=[
759762
ColumnRef(
760-
table=upstream_warehouse_platform,
763+
table=upstream_dataset,
761764
column=column.get("column_name", ""),
762765
)
763766
],
764767
logic=None,
765768
)
766769
)
770+
print(f"\n\n cll now is: {cll}\n\n")
767771

772+
print(f"\n\ncll is; {cll}\n\n")
768773
fine_grained_lineages: List[FineGrainedLineageClass] = []
769774

770775
for cll_info in cll:
@@ -786,10 +791,6 @@ def generate_physical_dataset_lineage(
786791
)
787792
)
788793

789-
upstream_dataset = self.get_datasource_urn_from_id(
790-
dataset_response, upstream_warehouse_platform
791-
)
792-
793794
upstream_lineage = UpstreamLineageClass(
794795
upstreams=[
795796
UpstreamClass(
@@ -799,6 +800,7 @@ def generate_physical_dataset_lineage(
799800
],
800801
fineGrainedLineages=fine_grained_lineages,
801802
)
803+
print(f"\n\n at the end, column level lineage is; {upstream_lineage}\n\n")
802804
return upstream_lineage
803805

804806
def construct_dataset_from_dataset_data(
@@ -841,11 +843,15 @@ def construct_dataset_from_dataset_data(
841843
if upstream_warehouse_platform in warehouse_naming:
842844
upstream_warehouse_platform = warehouse_naming[upstream_warehouse_platform]
843845

846+
upstream_dataset = self.get_datasource_urn_from_id(
847+
dataset_response, upstream_warehouse_platform
848+
)
849+
844850
# Sometimes the field will be null instead of not existing
845851
if sql == "null" or not sql:
846852
tag_urn = f"urn:li:tag:{self.platform}:physical"
847853
upstream_lineage = self.generate_physical_dataset_lineage(
848-
dataset_response, upstream_warehouse_platform, datasource_urn
854+
dataset_response, upstream_dataset, datasource_urn
849855
)
850856
else:
851857
tag_urn = f"urn:li:tag:{self.platform}:virtual"
@@ -856,6 +862,7 @@ def construct_dataset_from_dataset_data(
856862
platform_instance=None,
857863
env=self.config.env,
858864
)
865+
print(f"\n\n\n parsed query object: {parsed_query_object.column_lineage}")
859866
upstream_lineage = self.generate_virtual_dataset_lineage(
860867
parsed_query_object, datasource_urn
861868
)

0 commit comments

Comments
 (0)