
Commit 3da338c

fix(ingest): replace sqllineage/sqlparse with our SQL parser (datahub-project#12020)

sagar-salvi-apptware authored and sleeperdeep committed
1 parent 3064d77 · commit 3da338c

13 files changed: +130 −435 lines changed
docs/how/updating-datahub.md (+14 −9)
@@ -19,19 +19,21 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
 ## Next

 - #11560 - The PowerBI ingestion source configuration option include_workspace_name_in_dataset_urn determines whether the workspace name is included in the PowerBI dataset's URN.<br/> PowerBI allows to have identical name of semantic model and their tables across the workspace, It will overwrite the semantic model in-case of multi-workspace ingestion.<br/>
-  Entity urn with `include_workspace_name_in_dataset_urn: false`
-  ```
-  urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.]<SemanticModelName>.<TableName>,<ENV>)
-  ```
+  Entity urn with `include_workspace_name_in_dataset_urn: false`

-  Entity urn with `include_workspace_name_in_dataset_urn: true`
-  ```
-  urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
-  ```
+  ```
+  urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.]<SemanticModelName>.<TableName>,<ENV>)
+  ```
+
+  Entity urn with `include_workspace_name_in_dataset_urn: true`
+
+  ```
+  urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
+  ```

   The config `include_workspace_name_in_dataset_urn` is default to `false` for backward compatiblity, However, we recommend enabling this flag after performing the necessary cleanup.
   If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI:
-`datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.
+  `datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.

 - #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<key>.database`.
 - #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
@@ -48,6 +50,9 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
 - #11619 - schema field/column paths can no longer be duplicated within the schema
 - #11570 - The `DatahubClientConfig`'s server field no longer defaults to `http://localhost:8080`. Be sure to explicitly set this.
 - #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object.
+- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries _entities_ (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source.
+- #12020 - Removed `sql_parser` configuration from the Redash source, as Redash now exclusively uses the sqlglot-based parser for lineage extraction.
+- #12020 - Removed `datahub.utilities.sql_parser`, `datahub.utilities.sql_parser_base` and `datahub.utilities.sql_lineage_parser_impl` module along with `SqlLineageSQLParser` and `DefaultSQLParser`. Use `create_lineage_sql_parsed_result` from `datahub.sql_parsing.sqlglot_lineage` module instead.
 - #11518 - DataHub Garbage Collection: Various entities that are soft-deleted
   (after 10d) or are timeseries *entities* (dataprocess, execution requests)
   will be removed automatically using logic in the `datahub-gc` ingestion
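For illustration, a minimal sketch of the migration described in the #12020 notes above, replacing the removed `DefaultSQLParser`/`SqlLineageSQLParser` usage with `create_lineage_sql_parsed_result`. The query text, platform, and database name below are made-up placeholders.

```python
# Migration sketch (placeholder query/platform values): the old sqlparse/sqllineage
# helpers are removed, so table extraction now goes through the sqlglot-based parser.
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result

# Before (removed in #12020):
# from datahub.utilities.sql_parser import DefaultSQLParser
# tables = DefaultSQLParser(query).get_tables()

query = "SELECT o.id, o.amount FROM analytics.orders o JOIN analytics.customers c ON o.cid = c.id"

result = create_lineage_sql_parsed_result(
    query=query,
    platform="postgres",        # platform of the system the query runs against (illustrative)
    env="PROD",
    platform_instance=None,
    default_db="analytics_db",  # fallback database for unqualified table names (illustrative)
)

# in_tables holds dataset URNs rather than bare table names.
for table_urn in result.in_tables:
    print(table_urn)

# Parse failures no longer raise; they surface on debug_info instead.
if result.debug_info.table_error:
    print(f"SQL parser failed: {result.debug_info.table_error}")
```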

metadata-ingestion-modules/gx-plugin/setup.py (+1 −11)
@@ -15,15 +15,6 @@ def get_long_description():

 rest_common = {"requests", "requests_file"}

-# TODO: Can we move away from sqllineage and use sqlglot ??
-sqllineage_lib = {
-    "sqllineage==1.3.8",
-    # We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
-    # There have previously been issues from not pinning sqlparse, so it's best to pin it.
-    # Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360
-    "sqlparse==0.4.4",
-}
-
 _version: str = package_metadata["__version__"]
 _self_pin = (
     f"=={_version}"
@@ -43,8 +34,7 @@ def get_long_description():
     # https://github.com/ipython/traitlets/issues/741
     "traitlets<5.2.2",
     *rest_common,
-    *sqllineage_lib,
-    f"acryl-datahub[datahub-rest]{_self_pin}",
+    f"acryl-datahub[datahub-rest,sql-parser]{_self_pin}",
 }

 mypy_stubs = {

metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py (+19 −5)
@@ -34,8 +34,9 @@
 )
 from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
 from datahub.metadata.schema_classes import PartitionSpecClass, PartitionTypeClass
+from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
 from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED
-from datahub.utilities.sql_parser import DefaultSQLParser
+from datahub.utilities.urns.dataset_urn import DatasetUrn
 from great_expectations.checkpoint.actions import ValidationAction
 from great_expectations.core.batch import Batch
 from great_expectations.core.batch_spec import (
@@ -677,10 +678,23 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
                     query=query,
                     customProperties=batchSpecProperties,
                 )
-                try:
-                    tables = DefaultSQLParser(query).get_tables()
-                except Exception as e:
-                    logger.warning(f"Sql parser failed on {query} with {e}")
+
+                data_platform = get_platform_from_sqlalchemy_uri(str(sqlalchemy_uri))
+                sql_parser_in_tables = create_lineage_sql_parsed_result(
+                    query=query,
+                    platform=data_platform,
+                    env=self.env,
+                    platform_instance=None,
+                    default_db=None,
+                )
+                tables = [
+                    DatasetUrn.from_string(table_urn).name
+                    for table_urn in sql_parser_in_tables.in_tables
+                ]
+                if sql_parser_in_tables.debug_info.table_error:
+                    logger.warning(
+                        f"Sql parser failed on {query} with {sql_parser_in_tables.debug_info.table_error}"
+                    )
                     tables = []

                 if len(set(tables)) != 1:
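To illustrate the URN handling the new plugin code relies on, a small sketch (with a made-up URN) of how `DatasetUrn.from_string(...).name` recovers the table path that the removed `DefaultSQLParser(query).get_tables()` call used to return directly:

```python
from datahub.utilities.urns.dataset_urn import DatasetUrn

# Example dataset URN of the kind the sqlglot parser emits in in_tables (illustrative values).
table_urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,analytics.public.orders,PROD)"

urn = DatasetUrn.from_string(table_urn)
print(urn.name)  # prints the table path portion, e.g. "analytics.public.orders"
```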
Redash source documentation (+2 −5)

@@ -1,5 +1,2 @@
-Note! The integration can use an SQL parser to try to parse the tables the chart depends on. This parsing is disabled by default,
-but can be enabled by setting `parse_table_names_from_sql: true`. The default parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package.
-As this package doesn't officially support all the SQL dialects that Redash supports, the result might not be correct. You can, however, implement a
-custom parser and take it into use by setting the `sql_parser` configuration value. A custom SQL parser must inherit from `datahub.utilities.sql_parser.SQLParser`
-and must be made available to Datahub by ,for example, installing it. The configuration then needs to be set to `module_name.ClassName` of the parser.
+Note! The integration can use an SQL parser to try to parse the tables the chart depends on. This parsing is disabled by default,
+but can be enabled by setting `parse_table_names_from_sql: true`. The parser is based on the [`sqlglot`](https://pypi.org/project/sqlglot/) package.
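As a rough sketch of the option described above, the snippet below builds a `RedashConfig` with SQL-based lineage parsing turned on. `connect_uri` and `api_key` are assumed field names used only for illustration; `parse_table_names_from_sql` and `env` appear in this diff.

```python
# Hedged sketch: enabling SQL-based table lineage on the Redash source.
# `connect_uri` and `api_key` are assumed field names for illustration.
from datahub.ingestion.source.redash import RedashConfig

config = RedashConfig.parse_obj(
    {
        "connect_uri": "http://localhost:5000",  # assumed field name
        "api_key": "REDASH_API_KEY",             # assumed field name
        "parse_table_names_from_sql": True,      # lineage parsed with the sqlglot-based parser
        "env": "PROD",
    }
)
print(config.parse_table_names_from_sql)  # True
```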

metadata-ingestion/setup.py (+5 −18)
@@ -159,14 +159,6 @@
     | classification_lib
 )

-sqllineage_lib = {
-    "sqllineage==1.3.8",
-    # We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
-    # There have previously been issues from not pinning sqlparse, so it's best to pin it.
-    # Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360
-    "sqlparse==0.4.4",
-}
-
 aws_common = {
     # AWS Python SDK
     "boto3",
@@ -216,7 +208,6 @@
     "sqlalchemy-redshift>=0.8.3",
     "GeoAlchemy2",
     "redshift-connector>=2.1.0",
-    *sqllineage_lib,
     *path_spec_common,
 }

@@ -464,9 +455,7 @@
         # It's technically wrong for packages to depend on setuptools. However, it seems mlflow does it anyways.
         "setuptools",
     },
-    "mode": {"requests", "python-liquid", "tenacity>=8.0.1"}
-    | sqllineage_lib
-    | sqlglot_lib,
+    "mode": {"requests", "python-liquid", "tenacity>=8.0.1"} | sqlglot_lib,
     "mongodb": {"pymongo[srv]>=3.11", "packaging"},
     "mssql": sql_common | mssql_common,
     "mssql-odbc": sql_common | mssql_common | {"pyodbc"},
@@ -482,7 +471,7 @@
     | pyhive_common
     | {"psycopg2-binary", "pymysql>=1.0.2"},
     "pulsar": {"requests"},
-    "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
+    "redash": {"redash-toolbelt", "sql-metadata"} | sqlglot_lib,
     "redshift": sql_common
     | redshift_common
     | usage_common
@@ -503,9 +492,7 @@
     "slack": slack,
     "superset": superset_common,
     "preset": superset_common,
-    # FIXME: I don't think tableau uses sqllineage anymore so we should be able
-    # to remove that dependency.
-    "tableau": {"tableauserverclient>=0.24.0"} | sqllineage_lib | sqlglot_lib,
+    "tableau": {"tableauserverclient>=0.24.0"} | sqlglot_lib,
     "teradata": sql_common
     | usage_common
     | sqlglot_lib
@@ -527,9 +514,9 @@
     ),
     "powerbi-report-server": powerbi_report_server,
     "vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
-    "unity-catalog": databricks | sql_common | sqllineage_lib,
+    "unity-catalog": databricks | sql_common,
     # databricks is alias for unity-catalog and needs to be kept in sync
-    "databricks": databricks | sql_common | sqllineage_lib,
+    "databricks": databricks | sql_common,
     "fivetran": snowflake_common | bigquery_common | sqlglot_lib,
     "qlik-sense": sqlglot_lib | {"requests", "websocket-client"},
     "sigma": sqlglot_lib | {"requests"},

metadata-ingestion/src/datahub/ingestion/source/mode.py (−23)
@@ -18,7 +18,6 @@
 from requests.adapters import HTTPAdapter, Retry
 from requests.exceptions import ConnectionError
 from requests.models import HTTPBasicAuth, HTTPError
-from sqllineage.runner import LineageRunner
 from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential

 import datahub.emitter.mce_builder as builder
@@ -820,28 +819,6 @@ def _get_definition(self, definition_name):
         )
         return None

-    @lru_cache(maxsize=None)
-    def _get_source_from_query(self, raw_query: str) -> set:
-        query = self._replace_definitions(raw_query)
-        parser = LineageRunner(query)
-        source_paths = set()
-        try:
-            for table in parser.source_tables:
-                sources = str(table).split(".")
-                source_schema, source_table = sources[-2], sources[-1]
-                if source_schema == "<default>":
-                    source_schema = str(self.config.default_schema)
-
-                source_paths.add(f"{source_schema}.{source_table}")
-        except Exception as e:
-            self.report.report_failure(
-                title="Failed to Extract Lineage From Query",
-                message="Unable to retrieve lineage from Mode query.",
-                context=f"Query: {raw_query}, Error: {str(e)}",
-            )
-
-        return source_paths
-
     def _get_datasource_urn(
         self,
         platform: str,

metadata-ingestion/src/datahub/ingestion/source/redash.py (+13 −63)
@@ -2,7 +2,7 @@
 import math
 import sys
 from dataclasses import dataclass, field
-from typing import Dict, Iterable, List, Optional, Set, Type
+from typing import Dict, Iterable, List, Optional, Set

 import dateutil.parser as dp
 from packaging import version
@@ -22,7 +22,6 @@
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.registry import import_path
 from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.metadata.com.linkedin.pegasus2avro.common import (
@@ -39,9 +38,9 @@
     ChartTypeClass,
     DashboardInfoClass,
 )
+from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.perf_timer import PerfTimer
-from datahub.utilities.sql_parser_base import SQLParser
 from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor

 logger = logging.getLogger(__name__)
@@ -270,10 +269,6 @@ class RedashConfig(ConfigModel):
     parse_table_names_from_sql: bool = Field(
         default=False, description="See note below."
     )
-    sql_parser: str = Field(
-        default="datahub.utilities.sql_parser.DefaultSQLParser",
-        description="custom SQL parser. See note below for details.",
-    )

     env: str = Field(
         default=DEFAULT_ENV,
@@ -354,7 +349,6 @@ def __init__(self, ctx: PipelineContext, config: RedashConfig):
         self.api_page_limit = self.config.api_page_limit or math.inf

         self.parse_table_names_from_sql = self.config.parse_table_names_from_sql
-        self.sql_parser_path = self.config.sql_parser

         logger.info(
             f"Running Redash ingestion with parse_table_names_from_sql={self.parse_table_names_from_sql}"
@@ -380,31 +374,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
         config = RedashConfig.parse_obj(config_dict)
         return cls(ctx, config)

-    @classmethod
-    def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
-        assert "." in sql_parser_path, "sql_parser-path must contain a ."
-        parser_cls = import_path(sql_parser_path)
-
-        if not issubclass(parser_cls, SQLParser):
-            raise ValueError(f"must be derived from {SQLParser}; got {parser_cls}")
-        return parser_cls
-
-    @classmethod
-    def _get_sql_table_names(cls, sql: str, sql_parser_path: str) -> List[str]:
-        parser_cls = cls._import_sql_parser_cls(sql_parser_path)
-
-        try:
-            sql_table_names: List[str] = parser_cls(sql).get_tables()
-        except Exception as e:
-            logger.warning(f"Sql parser failed on {sql} with {e}")
-            return []
-
-        # Remove quotes from table names
-        sql_table_names = [t.replace('"', "") for t in sql_table_names]
-        sql_table_names = [t.replace("`", "") for t in sql_table_names]
-
-        return sql_table_names
-
     def _get_chart_data_source(self, data_source_id: Optional[int] = None) -> Dict:
         url = f"/api/data_sources/{data_source_id}"
         resp = self.client._get(url).json()
@@ -441,14 +410,6 @@ def _get_database_name_based_on_datasource(

         return database_name

-    def _construct_datalineage_urn(
-        self, platform: str, database_name: str, sql_table_name: str
-    ) -> str:
-        full_dataset_name = get_full_qualified_name(
-            platform, database_name, sql_table_name
-        )
-        return builder.make_dataset_urn(platform, full_dataset_name, self.config.env)
-
     def _get_datasource_urns(
         self, data_source: Dict, sql_query_data: Dict = {}
     ) -> Optional[List[str]]:
@@ -464,34 +425,23 @@ def _get_datasource_urns(
         # Getting table lineage from SQL parsing
         if self.parse_table_names_from_sql and data_source_syntax == "sql":
             dataset_urns = list()
-            try:
-                sql_table_names = self._get_sql_table_names(
-                    query, self.sql_parser_path
-                )
-            except Exception as e:
+            sql_parser_in_tables = create_lineage_sql_parsed_result(
+                query=query,
+                platform=platform,
+                env=self.config.env,
+                platform_instance=None,
+                default_db=database_name,
+            )
+            # make sure dataset_urns is not empty list
+            dataset_urns = sql_parser_in_tables.in_tables
+            if sql_parser_in_tables.debug_info.table_error:
                 self.report.queries_problem_parsing.add(str(query_id))
                 self.error(
                     logger,
                     "sql-parsing",
-                    f"exception {e} in parsing query-{query_id}-datasource-{data_source_id}",
+                    f"exception {sql_parser_in_tables.debug_info.table_error} in parsing query-{query_id}-datasource-{data_source_id}",
                 )
-                sql_table_names = []
-            for sql_table_name in sql_table_names:
-                try:
-                    dataset_urns.append(
-                        self._construct_datalineage_urn(
-                            platform, database_name, sql_table_name
-                        )
-                    )
-                except Exception:
-                    self.report.queries_problem_parsing.add(str(query_id))
-                    self.warn(
-                        logger,
-                        "data-urn-invalid",
-                        f"Problem making URN for {sql_table_name} parsed from query {query_id}",
-                    )

-            # make sure dataset_urns is not empty list
             return dataset_urns if len(dataset_urns) > 0 else None

         else:
