|
85 | 85 | UpstreamLineageClass,
|
86 | 86 | )
|
87 | 87 | from datahub.sql_parsing.sqlglot_lineage import (
|
88 |
| - ColumnLineageInfo, |
89 |
| - ColumnRef, |
90 |
| - DownstreamColumnRef, |
91 | 88 | SqlParsingResult,
|
92 | 89 | create_lineage_sql_parsed_result,
|
93 | 90 | )
|
@@ -744,44 +741,15 @@ def generate_physical_dataset_lineage(
|
744 | 741 | # To generate column level lineage, we can manually decode the metadata
|
745 | 742 | # to produce the ColumnLineageInfo
|
746 | 743 | columns = dataset_response.get("result", {}).get("columns", [])
|
747 |
| - cll: List[ColumnLineageInfo] = [] |
| 744 | + fine_grained_lineages: List[FineGrainedLineageClass] = [] |
748 | 745 |
|
749 | 746 | for column in columns:
|
750 |
| - cll.append( |
751 |
| - ColumnLineageInfo( |
752 |
| - downstream=DownstreamColumnRef( |
753 |
| - table=datasource_urn, |
754 |
| - column=column.get("column_name", ""), |
755 |
| - native_column_type=column.get("type", ""), |
756 |
| - ), |
757 |
| - upstreams=[ |
758 |
| - ColumnRef( |
759 |
| - table=upstream_dataset, |
760 |
| - column=column.get("column_name", ""), |
761 |
| - ) |
762 |
| - ], |
763 |
| - logic=None, |
764 |
| - ) |
765 |
| - ) |
766 |
| - |
767 |
| - fine_grained_lineages: List[FineGrainedLineageClass] = [] |
| 747 | + column_name = column.get("column_name", "") |
| 748 | + if not column_name: |
| 749 | + continue |
768 | 750 |
|
769 |
| - for cll_info in cll: |
770 |
| - downstream = ( |
771 |
| - [ |
772 |
| - make_schema_field_urn( |
773 |
| - cll_info.downstream.table, cll_info.downstream.column |
774 |
| - ) |
775 |
| - ] |
776 |
| - if cll_info.downstream |
777 |
| - and cll_info.downstream.table |
778 |
| - and cll_info.downstream.column |
779 |
| - else [] |
780 |
| - ) |
781 |
| - upstreams = [ |
782 |
| - make_schema_field_urn(column_ref.table, column_ref.column) |
783 |
| - for column_ref in cll_info.upstreams |
784 |
| - ] |
| 751 | + downstream = [make_schema_field_urn(datasource_urn, column_name)] |
| 752 | + upstreams = [make_schema_field_urn(upstream_dataset, column_name)] |
785 | 753 | fine_grained_lineages.append(
|
786 | 754 | FineGrainedLineageClass(
|
787 | 755 | downstreamType=FineGrainedLineageDownstreamTypeClass.FIELD,
|
|
0 commit comments