1
1
import logging
2
2
from abc import ABC , abstractmethod
3
3
from enum import Enum
4
- from typing import Dict , List , Optional , Tuple , Type , cast , Any
4
+ from typing import Dict , List , Optional , Tuple , Type , cast
5
5
6
6
from lark import Tree
7
7
30
30
ReferencedTable ,
31
31
)
32
32
from datahub .ingestion .source .powerbi .rest_api_wrapper .data_classes import Table
33
+ from datahub .metadata .schema_classes import SchemaFieldDataTypeClass
33
34
from datahub .sql_parsing .sqlglot_lineage import (
34
- SqlParsingResult ,
35
35
ColumnLineageInfo ,
36
+ ColumnRef ,
36
37
DownstreamColumnRef ,
37
- ColumnRef
38
+ SqlParsingResult ,
38
39
)
39
- from datahub .metadata .schema_classes import SchemaFieldDataTypeClass
40
40
41
41
logger = logging .getLogger (__name__ )
42
42
@@ -268,17 +268,15 @@ def parse_custom_sql(
268
268
),
269
269
)
270
270
271
- def create_table_column_lineage (
272
- self , urn : str
273
- ) -> List [ColumnLineageInfo ]:
271
+ def create_table_column_lineage (self , urn : str ) -> List [ColumnLineageInfo ]:
274
272
column_lineage = []
275
273
276
274
for column in self .table .columns :
277
275
downstream = DownstreamColumnRef (
278
276
table = self .table .name ,
279
277
column = column .name ,
280
278
column_type = SchemaFieldDataTypeClass (type = column .datahubDataType ),
281
- native_column_type = column .dataType or "UNKNOWN"
279
+ native_column_type = column .dataType or "UNKNOWN" ,
282
280
)
283
281
284
282
upstreams = [
@@ -289,8 +287,7 @@ def create_table_column_lineage(
289
287
]
290
288
291
289
column_lineage_info = ColumnLineageInfo (
292
- downstream = downstream ,
293
- upstreams = upstreams
290
+ downstream = downstream , upstreams = upstreams
294
291
)
295
292
296
293
column_lineage .append (column_lineage_info )
@@ -343,7 +340,7 @@ def create_lineage(
343
340
urn = urn ,
344
341
)
345
342
],
346
- column_lineage = column_lineage
343
+ column_lineage = column_lineage ,
347
344
)
348
345
349
346
@@ -410,7 +407,7 @@ def create_lineage(
410
407
urn = urn ,
411
408
)
412
409
],
413
- column_lineage = column_lineage
410
+ column_lineage = column_lineage ,
414
411
)
415
412
416
413
@@ -497,7 +494,7 @@ def create_lineage(
497
494
urn = urn ,
498
495
)
499
496
],
500
- column_lineage = column_lineage
497
+ column_lineage = column_lineage ,
501
498
)
502
499
503
500
return Lineage .empty ()
@@ -560,7 +557,7 @@ def two_level_access_pattern(
560
557
urn = urn ,
561
558
)
562
559
],
563
- column_lineage = column_lineage
560
+ column_lineage = column_lineage ,
564
561
)
565
562
566
563
@@ -724,7 +721,7 @@ def create_lineage(
724
721
urn = urn ,
725
722
)
726
723
],
727
- column_lineage = column_lineage
724
+ column_lineage = column_lineage ,
728
725
)
729
726
730
727
@@ -799,10 +796,7 @@ def create_urn_using_old_parser(self, query: str, server: str) -> Lineage:
799
796
800
797
logger .debug (f"Generated dataplatform_tables { dataplatform_tables } " )
801
798
802
- return Lineage (
803
- upstreams = dataplatform_tables ,
804
- column_lineage = column_lineage
805
- )
799
+ return Lineage (upstreams = dataplatform_tables , column_lineage = column_lineage )
806
800
807
801
def get_db_name (self , data_access_tokens : List [str ]) -> Optional [str ]:
808
802
if (
0 commit comments