Skip to content

Commit 1bfd4ee

Browse files
authored
feat(ingest): handle mssql casing issues in lineage (#11920)
1 parent b5d5db3 commit 1bfd4ee

12 files changed

+1947
-795
lines changed

metadata-ingestion/src/datahub/sql_parsing/sql_parsing_common.py

+6
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,19 @@
2121
# See more below:
2222
# https://documentation.sas.com/doc/en/pgmsascdc/9.4_3.5/acreldb/n0ejgx4895bofnn14rlguktfx5r3.htm
2323
"teradata",
24+
# For SQL server, the default collation rules mean that all identifiers (schema, table, column names)
25+
# are case preserving but case insensitive.
26+
"mssql",
2427
}
2528
DIALECTS_WITH_DEFAULT_UPPERCASE_COLS = {
    # In some dialects, column identifiers are effectively case insensitive
    # because they are automatically converted to uppercase. Most other systems
    # automatically lowercase unquoted identifiers.
    "snowflake",
}
# Sanity check at import time: a dialect that folds unquoted column
# identifiers to uppercase necessarily treats them case-insensitively,
# so the uppercase set must be a subset of the case-insensitive set.
assert DIALECTS_WITH_DEFAULT_UPPERCASE_COLS.issubset(
    DIALECTS_WITH_CASE_INSENSITIVE_COLS
)
3137

3238

3339
class QueryType(enum.Enum):

metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py

+49-7
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import logging
66
import traceback
77
from collections import defaultdict
8-
from typing import Any, Dict, List, Optional, Set, Tuple, Union
8+
from typing import Any, Dict, List, Optional, Set, Tuple, TypeVar, Union
99

1010
import pydantic.dataclasses
1111
import sqlglot
@@ -873,6 +873,49 @@ def _translate_internal_column_lineage(
873873
)
874874

875875

876+
_StrOrNone = TypeVar("_StrOrNone", str, Optional[str])


def _normalize_db_or_schema(
    db_or_schema: _StrOrNone,
    dialect: sqlglot.Dialect,
) -> _StrOrNone:
    """Apply dialect-specific identifier casing to a database/schema name.

    Snowflake identifiers are uppercased to match sqlglot's behavior;
    MSSQL identifiers are lowercased. Other dialects, and a None input,
    are returned unchanged.
    """
    if db_or_schema is None:
        return None

    # Dialect-specific casing rules; first matching dialect wins.
    casing_rules = (
        ("snowflake", str.upper),
        ("mssql", str.lower),
    )
    for dialect_name, fold in casing_rules:
        if is_dialect_instance(dialect, dialect_name):
            return fold(db_or_schema)

    return db_or_schema
896+
897+
def _simplify_select_into(statement: sqlglot.exp.Expression) -> sqlglot.exp.Expression:
    """
    Check if the expression is a SELECT INTO statement. If so, converts it into a CTAS.
    Other expressions are returned as-is.
    """

    is_select_into = isinstance(statement, sqlglot.exp.Select) and bool(
        statement.args.get("into")
    )
    if not is_select_into:
        return statement

    # Rewrite SELECT <cols> INTO <out> <expr>
    # as CREATE TABLE <out> AS SELECT <cols> <expr>.
    # pop() detaches the INTO clause so the remaining SELECT
    # can serve as the CTAS body.
    into_clause: sqlglot.exp.Into = statement.args["into"].pop()
    return sqlglot.exp.Create(
        this=into_clause.this,
        kind="TABLE",
        expression=statement,
    )
918+
876919
def _sqlglot_lineage_inner(
877920
sql: sqlglot.exp.ExpOrStr,
878921
schema_resolver: SchemaResolverInterface,
@@ -885,12 +928,9 @@ def _sqlglot_lineage_inner(
885928
else:
886929
dialect = get_dialect(default_dialect)
887930

888-
if is_dialect_instance(dialect, "snowflake"):
889-
# in snowflake, table identifiers must be uppercased to match sqlglot's behavior.
890-
if default_db:
891-
default_db = default_db.upper()
892-
if default_schema:
893-
default_schema = default_schema.upper()
931+
default_db = _normalize_db_or_schema(default_db, dialect)
932+
default_schema = _normalize_db_or_schema(default_schema, dialect)
933+
894934
if is_dialect_instance(dialect, "redshift") and not default_schema:
895935
# On Redshift, there's no "USE SCHEMA <schema>" command. The default schema
896936
# is public, and "current schema" is the one at the front of the search path.
@@ -918,6 +958,8 @@ def _sqlglot_lineage_inner(
918958
# original_statement.sql(pretty=True, dialect=dialect),
919959
# )
920960

961+
statement = _simplify_select_into(statement)
962+
921963
# Make sure the tables are resolved with the default db / schema.
922964
# This only works for Unionable statements. For other types of statements,
923965
# we have to do it manually afterwards, but that's slightly lower accuracy

metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def is_dialect_instance(
6161
else:
6262
platforms = list(platforms)
6363

64-
dialects = [sqlglot.Dialect.get_or_raise(platform) for platform in platforms]
64+
dialects = [get_dialect(platform) for platform in platforms]
6565

6666
if any(isinstance(dialect, dialect_class.__class__) for dialect_class in dialects):
6767
return True

metadata-ingestion/tests/integration/powerbi/golden_test_cll.json

+26
Original file line numberDiff line numberDiff line change
@@ -1024,6 +1024,32 @@
10241024
"dataset": "urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD)",
10251025
"type": "TRANSFORMED"
10261026
}
1027+
],
1028+
"fineGrainedLineages": [
1029+
{
1030+
"upstreamType": "FIELD_SET",
1031+
"upstreams": [
1032+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD),client_director)",
1033+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD),month_wid)"
1034+
],
1035+
"downstreamType": "FIELD",
1036+
"downstreams": [
1037+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV),cd_agent_key)"
1038+
],
1039+
"confidenceScore": 1.0
1040+
},
1041+
{
1042+
"upstreamType": "FIELD_SET",
1043+
"upstreams": [
1044+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD),client_manager_closing_month)",
1045+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,commopsdb.dbo.v_ps_cd_retention,PROD),month_wid)"
1046+
],
1047+
"downstreamType": "FIELD",
1048+
"downstreams": [
1049+
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:powerbi,hr_pbi_test.ms_sql_native_table,DEV),agent_key)"
1050+
],
1051+
"confidenceScore": 1.0
1052+
}
10271053
]
10281054
}
10291055
},

0 commit comments

Comments
 (0)