Skip to content

Commit 3bd6446

Browse files
committed
fix(ingest/powerbi): MySQL and incremental lineage
1 parent 6177b95 commit 3bd6446

File tree

4 files changed

+72
-1
lines changed

4 files changed

+72
-1
lines changed

metadata-ingestion/src/datahub/ingestion/source/powerbi/config.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
from datahub.configuration.common import AllowDenyPattern, ConfigModel
1212
from datahub.configuration.source_common import DatasetSourceConfigMixin, PlatformDetail
1313
from datahub.configuration.validate_field_deprecation import pydantic_field_deprecated
14+
from datahub.ingestion.api.incremental_lineage_helper import (
15+
IncrementalLineageConfigMixin,
16+
)
1417
from datahub.ingestion.source.common.subtypes import BIAssetSubTypes
1518
from datahub.ingestion.source.state.stale_entity_removal_handler import (
1619
StaleEntityRemovalSourceReport,
@@ -183,6 +186,11 @@ class SupportedDataPlatform(Enum):
183186
datahub_data_platform_name="databricks",
184187
)
185188

189+
MYSQL = DataPlatformPair(
190+
powerbi_data_platform_name="MySQL",
191+
datahub_data_platform_name="mysql",
192+
)
193+
186194

187195
@dataclass
188196
class PowerBiDashboardSourceReport(StaleEntityRemovalSourceReport):
@@ -275,7 +283,7 @@ class PowerBiProfilingConfig(ConfigModel):
275283

276284

277285
class PowerBiDashboardSourceConfig(
278-
StatefulIngestionConfigBase, DatasetSourceConfigMixin
286+
StatefulIngestionConfigBase, DatasetSourceConfigMixin, IncrementalLineageConfigMixin
279287
):
280288
platform_name: str = pydantic.Field(
281289
default=Constant.PLATFORM_NAME, hidden_from_docs=True

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/data_classes.py

+1
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,4 @@ class FunctionName(Enum):
7474
GOOGLE_BIGQUERY_DATA_ACCESS = "GoogleBigQuery.Database"
7575
AMAZON_REDSHIFT_DATA_ACCESS = "AmazonRedshift.Database"
7676
DATABRICK_MULTI_CLOUD_DATA_ACCESS = "DatabricksMultiCloud.Catalogs"
77+
MYSQL_DATA_ACCESS = "MySQL.Database"

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/pattern_handler.py

+57
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,58 @@ def two_level_access_pattern(
562562
)
563563

564564

565+
class MySQLLineage(AbstractLineage):
    """Lineage handler registered for the ``MySQL.Database`` data-access function."""

    def create_lineage(
        self, data_access_func_detail: DataAccessFunctionDetail
    ) -> Lineage:
        """Build the upstream lineage for one MySQL data-access call.

        The server and database are taken from the function's argument list;
        the table is identified by the ``"Schema"`` and ``"Item"`` entries of
        the identifier accessor, joined as ``<schema>.<table>``.

        Args:
            data_access_func_detail: Parsed details of the data-access
                function (argument list and identifier accessor).

        Returns:
            A ``Lineage`` with a single upstream table and its column
            lineage, or ``Lineage.empty()`` when either the server or the
            database could not be resolved from the arguments.

        Raises:
            KeyError: If the identifier accessor has no ``"Schema"`` or
                ``"Item"`` entry.
        """
        logger.debug(
            f"Processing {self.get_platform_pair().powerbi_data_platform_name} data-access function detail {data_access_func_detail}"
        )

        server, db_name = self.get_db_detail_from_argument(
            data_access_func_detail.arg_list
        )
        # db_name is only validated here; the qualified name below is built
        # from the accessor's schema and item entries instead.
        if server is None or db_name is None:
            # Nothing to resolve against: report an empty lineage.
            return Lineage.empty()

        # NOTE(review): cast() only informs the type checker; a None accessor
        # would still fail on `.items` at runtime - confirm callers guarantee it.
        accessor = cast(
            IdentifierAccessor, data_access_func_detail.identifier_accessor
        )
        schema_name: str = accessor.items["Schema"]
        table_name: str = accessor.items["Item"]

        qualified_table_name: str = f"{schema_name}.{table_name}"

        logger.debug(
            f"Platform({self.get_platform_pair().datahub_data_platform_name}) qualified_table_name= {qualified_table_name}"
        )

        urn = make_urn(
            config=self.config,
            platform_instance_resolver=self.platform_instance_resolver,
            data_platform_pair=self.get_platform_pair(),
            server=server,
            qualified_table_name=qualified_table_name,
        )

        column_lineage = self.create_table_column_lineage(urn)
        upstream_table = DataPlatformTable(
            data_platform_pair=self.get_platform_pair(),
            urn=urn,
        )

        return Lineage(
            upstreams=[upstream_table],
            column_lineage=column_lineage,
        )

    def get_platform_pair(self) -> DataPlatformPair:
        """Return the PowerBI/DataHub platform-name pair for MySQL."""
        return SupportedDataPlatform.MYSQL.value
615+
616+
565617
class PostgresLineage(TwoStepDataAccessPattern):
566618
def create_lineage(
567619
self, data_access_func_detail: DataAccessFunctionDetail
@@ -929,6 +981,11 @@ class SupportedPattern(Enum):
929981
FunctionName.AMAZON_REDSHIFT_DATA_ACCESS,
930982
)
931983

984+
MYSQL = (
985+
MySQLLineage,
986+
FunctionName.MYSQL_DATA_ACCESS,
987+
)
988+
932989
NATIVE_QUERY = (
933990
NativeQueryLineage,
934991
FunctionName.NATIVE_QUERY,

metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# Meta Data Ingestion From the Power BI Source
44
#
55
#########################################################
6+
import functools
67
import logging
78
from datetime import datetime
89
from typing import Iterable, List, Optional, Tuple, Union
@@ -24,6 +25,7 @@
2425
support_status,
2526
)
2627
from datahub.ingestion.api.incremental_lineage_helper import (
28+
auto_incremental_lineage,
2729
convert_dashboard_info_to_patch,
2830
)
2931
from datahub.ingestion.api.source import (
@@ -1524,6 +1526,9 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
15241526
else:
15251527
return [
15261528
*super().get_workunit_processors(),
1529+
functools.partial(
1530+
auto_incremental_lineage, self.source_config.incremental_lineage
1531+
),
15271532
self.stale_entity_removal_handler.workunit_processor,
15281533
]
15291534

0 commit comments

Comments
 (0)