Skip to content

Commit 1a384f3

Browse files
committed
fix(ingest/powerbi): Snowflake column lineage
1 parent 32b654c commit 1a384f3

File tree

3 files changed

+54
-3
lines changed

3 files changed

+54
-3
lines changed

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py

+40-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22
from abc import ABC, abstractmethod
33
from enum import Enum
4-
from typing import Dict, List, Optional, Tuple, Type, cast
4+
from typing import Dict, List, Optional, Tuple, Type, cast, Any
55

66
from lark import Tree
77

@@ -30,7 +30,13 @@
3030
ReferencedTable,
3131
)
3232
from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import Table
33-
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult
33+
from datahub.sql_parsing.sqlglot_lineage import (
34+
SqlParsingResult,
35+
ColumnLineageInfo,
36+
DownstreamColumnRef,
37+
ColumnRef
38+
)
39+
from datahub.metadata.schema_classes import SchemaFieldDataTypeClass
3440

3541
logger = logging.getLogger(__name__)
3642

@@ -262,6 +268,35 @@ def parse_custom_sql(
262268
),
263269
)
264270

271+
def create_table_column_lineage(self, urn: str) -> List[ColumnLineageInfo]:
    """Build one column-lineage entry for every column of ``self.table``.

    Each column is paired with a single upstream column that has the same
    name, lower-cased, on the entity identified by ``urn``.

    Args:
        urn: Identifier of the upstream table; used as the ``table`` value
            of every upstream ``ColumnRef``.

    Returns:
        One ``ColumnLineageInfo`` per entry in ``self.table.columns``, in
        the same order as the columns.
    """
    return [
        ColumnLineageInfo(
            downstream=DownstreamColumnRef(
                table=self.table.name,
                column=col.name,
                column_type=SchemaFieldDataTypeClass(type=col.datahubDataType),
                # Use a placeholder when the column reports no native type.
                native_column_type=col.dataType or "UNKNOWN",
            ),
            upstreams=[
                # NOTE(review): presumably lower-cased to match how upstream
                # column names are registered -- confirm against the source.
                ColumnRef(table=urn, column=col.name.lower()),
            ],
        )
        for col in self.table.columns
    ]
299+
265300

266301
class AmazonRedshiftLineage(AbstractLineage):
267302
def get_platform_pair(self) -> DataPlatformPair:
@@ -671,14 +706,16 @@ def create_lineage(
671706
qualified_table_name=qualified_table_name,
672707
)
673708

709+
column_lineage = self.create_table_column_lineage(urn)
710+
674711
return Lineage(
675712
upstreams=[
676713
DataPlatformTable(
677714
data_platform_pair=self.get_platform_pair(),
678715
urn=urn,
679716
)
680717
],
681-
column_lineage=[],
718+
column_lineage=column_lineage,
682719
)
683720

684721

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py

+10
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,9 @@ def resolve_to_lineage(
361361
)
362362

363363
if output_variable is None:
364+
logger.debug(
365+
f"Table: {self.table.full_name}: output-variable not found in tree"
366+
)
364367
self.reporter.report_warning(
365368
f"{self.table.full_name}-output-variable",
366369
"output-variable not found in table expression",
@@ -374,6 +377,9 @@ def resolve_to_lineage(
374377

375378
# Each item is a data-access function
376379
for f_detail in table_links:
380+
logger.debug(
381+
f"Processing data-access-function {f_detail.data_access_function_name}"
382+
)
377383
# Get & Check if we support data-access-function available in M-Query
378384
supported_resolver = SupportedPattern.get_pattern_handler(
379385
f_detail.data_access_function_name
@@ -390,6 +396,10 @@ def resolve_to_lineage(
390396

391397
# From the supported_resolver enum, get the respective handler (e.g. AmazonRedshift, Snowflake, Oracle or NativeQuery) and create an instance of it
392398
& also pass the additional information that will be needed to generate lineage
399+
logger.debug(
400+
f"Creating instance of {supported_resolver.handler().__name__} "
401+
f"for data-access-function {f_detail.data_access_function_name}"
402+
)
393403
pattern_handler: AbstractLineage = supported_resolver.handler()(
394404
ctx=ctx,
395405
table=self.table,

metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py

+4
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,10 @@ def extract_lineage(
238238
upstream: List[UpstreamClass] = []
239239
cll_lineage: List[FineGrainedLineage] = []
240240

241+
logger.debug(
242+
f"Extracting lineage for table {table.full_name} in dataset {table.dataset.name}"
243+
)
244+
241245
upstream_lineage: List[
242246
datahub.ingestion.source.powerbi.m_query.data_classes.Lineage
243247
] = parser.get_upstream_tables(

0 commit comments

Comments
 (0)