Skip to content

Commit ba8932c

Browse files
authored
fix(ingest/snowflake): Fixing table rename query handling (#12852)
1 parent d1c804e commit ba8932c

File tree

2 files changed

+135
-14
lines changed

2 files changed

+135
-14
lines changed

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py

+16 -13
Original file line number | Diff line number | Diff line change
@@ -403,6 +403,7 @@ def _parse_audit_log_row(
403403
res["session_id"],
404404
res["query_start_time"],
405405
object_modified_by_ddl,
406+
res["query_type"],
406407
)
407408
if known_ddl_entry:
408409
return known_ddl_entry
@@ -537,40 +538,42 @@ def parse_ddl_query(
537538
session_id: str,
538539
timestamp: datetime,
539540
object_modified_by_ddl: dict,
541+
query_type: str,
540542
) -> Optional[Union[TableRename, TableSwap]]:
541543
timestamp = timestamp.astimezone(timezone.utc)
542-
if object_modified_by_ddl[
543-
"operationType"
544-
] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
545-
urn1 = self.identifiers.gen_dataset_urn(
544+
if (
545+
object_modified_by_ddl["operationType"] == "ALTER"
546+
and query_type == "RENAME_TABLE"
547+
and object_modified_by_ddl["properties"].get("objectName")
548+
):
549+
original_un = self.identifiers.gen_dataset_urn(
546550
self.identifiers.get_dataset_identifier_from_qualified_name(
547551
object_modified_by_ddl["objectName"]
548552
)
549553
)
550554

551-
urn2 = self.identifiers.gen_dataset_urn(
555+
new_urn = self.identifiers.gen_dataset_urn(
552556
self.identifiers.get_dataset_identifier_from_qualified_name(
553-
object_modified_by_ddl["properties"]["swapTargetName"]["value"]
557+
object_modified_by_ddl["properties"]["objectName"]["value"]
554558
)
555559
)
556-
557-
return TableSwap(urn1, urn2, query, session_id, timestamp)
560+
return TableRename(original_un, new_urn, query, session_id, timestamp)
558561
elif object_modified_by_ddl[
559562
"operationType"
560-
] == "RENAME_TABLE" and object_modified_by_ddl["properties"].get("objectName"):
561-
original_un = self.identifiers.gen_dataset_urn(
563+
] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
564+
urn1 = self.identifiers.gen_dataset_urn(
562565
self.identifiers.get_dataset_identifier_from_qualified_name(
563566
object_modified_by_ddl["objectName"]
564567
)
565568
)
566569

567-
new_urn = self.identifiers.gen_dataset_urn(
570+
urn2 = self.identifiers.gen_dataset_urn(
568571
self.identifiers.get_dataset_identifier_from_qualified_name(
569-
object_modified_by_ddl["properties"]["objectName"]["value"]
572+
object_modified_by_ddl["properties"]["swapTargetName"]["value"]
570573
)
571574
)
572575

573-
return TableRename(original_un, new_urn, query, session_id, timestamp)
576+
return TableSwap(urn1, urn2, query, session_id, timestamp)
574577
else:
575578
self.report.num_ddl_queries_dropped += 1
576579
return None

metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py

+119 -1
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
import datetime
12
from typing import Any, Dict
23
from unittest.mock import MagicMock, patch
34

@@ -16,18 +17,27 @@
1617
from datahub.ingestion.source.snowflake.oauth_config import OAuthConfiguration
1718
from datahub.ingestion.source.snowflake.snowflake_config import (
1819
DEFAULT_TEMP_TABLES_PATTERNS,
20+
SnowflakeIdentifierConfig,
1921
SnowflakeV2Config,
2022
)
2123
from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import UpstreamLineageEdge
24+
from datahub.ingestion.source.snowflake.snowflake_queries import (
25+
SnowflakeQueriesExtractor,
26+
SnowflakeQueriesExtractorConfig,
27+
)
2228
from datahub.ingestion.source.snowflake.snowflake_query import (
2329
SnowflakeQuery,
2430
create_deny_regex_sql_filter,
2531
)
2632
from datahub.ingestion.source.snowflake.snowflake_usage_v2 import (
2733
SnowflakeObjectAccessEntry,
2834
)
29-
from datahub.ingestion.source.snowflake.snowflake_utils import SnowsightUrlBuilder
35+
from datahub.ingestion.source.snowflake.snowflake_utils import (
36+
SnowflakeIdentifierBuilder,
37+
SnowsightUrlBuilder,
38+
)
3039
from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source
40+
from datahub.sql_parsing.sql_parsing_aggregator import TableRename, TableSwap
3141
from datahub.testing.doctest import assert_doctest
3242
from tests.test_helpers import test_connection_helpers
3343

@@ -689,3 +699,111 @@ def test_snowflake_query_result_parsing():
689699
],
690700
}
691701
assert UpstreamLineageEdge.parse_obj(db_row)
702+
703+
704+
class TestDDLProcessing:
705+
@pytest.fixture
706+
def session_id(self):
707+
return "14774700483022321"
708+
709+
@pytest.fixture
710+
def timestamp(self):
711+
return datetime.datetime(
712+
year=2025, month=2, day=3, hour=15, minute=1, second=43
713+
).astimezone(datetime.timezone.utc)
714+
715+
@pytest.fixture
716+
def extractor(self) -> SnowflakeQueriesExtractor:
717+
connection = MagicMock()
718+
config = SnowflakeQueriesExtractorConfig()
719+
structured_report = MagicMock()
720+
filters = MagicMock()
721+
structured_report.num_ddl_queries_dropped = 0
722+
identifier_config = SnowflakeIdentifierConfig()
723+
identifiers = SnowflakeIdentifierBuilder(identifier_config, structured_report)
724+
return SnowflakeQueriesExtractor(
725+
connection, config, structured_report, filters, identifiers
726+
)
727+
728+
def test_ddl_processing_alter_table_rename(self, extractor, session_id, timestamp):
729+
query = "ALTER TABLE person_info_loading RENAME TO person_info_final;"
730+
object_modified_by_ddl = {
731+
"objectDomain": "Table",
732+
"objectId": 1789034,
733+
"objectName": "DUMMY_DB.PUBLIC.PERSON_INFO_LOADING",
734+
"operationType": "ALTER",
735+
"properties": {
736+
"objectName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_FINAL"}
737+
},
738+
}
739+
query_type = "RENAME_TABLE"
740+
741+
ddl = extractor.parse_ddl_query(
742+
query, session_id, timestamp, object_modified_by_ddl, query_type
743+
)
744+
745+
assert ddl == TableRename(
746+
original_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_loading,PROD)",
747+
new_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_final,PROD)",
748+
query=query,
749+
session_id=session_id,
750+
timestamp=timestamp,
751+
), "Processing ALTER ... RENAME should result in a proper TableRename object"
752+
753+
def test_ddl_processing_alter_table_add_column(
754+
self, extractor, session_id, timestamp
755+
):
756+
query = "ALTER TABLE person_info ADD year BIGINT"
757+
object_modified_by_ddl = {
758+
"objectDomain": "Table",
759+
"objectId": 2612260,
760+
"objectName": "DUMMY_DB.PUBLIC.PERSON_INFO",
761+
"operationType": "ALTER",
762+
"properties": {
763+
"columns": {
764+
"BIGINT": {
765+
"objectId": {"value": 8763407},
766+
"subOperationType": "ADD",
767+
}
768+
}
769+
},
770+
}
771+
query_type = "ALTER_TABLE_ADD_COLUMN"
772+
773+
ddl = extractor.parse_ddl_query(
774+
query, session_id, timestamp, object_modified_by_ddl, query_type
775+
)
776+
777+
assert ddl is None, (
778+
"For altering columns statement ddl parsing should return None"
779+
)
780+
assert extractor.report.num_ddl_queries_dropped == 1, (
781+
"Dropped ddls should be properly counted"
782+
)
783+
784+
def test_ddl_processing_alter_table_swap(self, extractor, session_id, timestamp):
785+
query = "ALTER TABLE person_info SWAP WITH person_info_swap;"
786+
object_modified_by_ddl = {
787+
"objectDomain": "Table",
788+
"objectId": 3776835,
789+
"objectName": "DUMMY_DB.PUBLIC.PERSON_INFO",
790+
"operationType": "ALTER",
791+
"properties": {
792+
"swapTargetDomain": {"value": "Table"},
793+
"swapTargetId": {"value": 3786260},
794+
"swapTargetName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_SWAP"},
795+
},
796+
}
797+
query_type = "ALTER"
798+
799+
ddl = extractor.parse_ddl_query(
800+
query, session_id, timestamp, object_modified_by_ddl, query_type
801+
)
802+
803+
assert ddl == TableSwap(
804+
urn1="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info,PROD)",
805+
urn2="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_swap,PROD)",
806+
query=query,
807+
session_id=session_id,
808+
timestamp=timestamp,
809+
), "Processing ALTER ... SWAP DDL should result in a proper TableSwap object"

0 commit comments

Comments
 (0)