|
| 1 | +import datetime |
1 | 2 | from typing import Any, Dict
|
2 | 3 | from unittest.mock import MagicMock, patch
|
3 | 4 |
|
|
16 | 17 | from datahub.ingestion.source.snowflake.oauth_config import OAuthConfiguration
|
17 | 18 | from datahub.ingestion.source.snowflake.snowflake_config import (
|
18 | 19 | DEFAULT_TEMP_TABLES_PATTERNS,
|
| 20 | + SnowflakeIdentifierConfig, |
19 | 21 | SnowflakeV2Config,
|
20 | 22 | )
|
21 | 23 | from datahub.ingestion.source.snowflake.snowflake_lineage_v2 import UpstreamLineageEdge
|
| 24 | +from datahub.ingestion.source.snowflake.snowflake_queries import ( |
| 25 | + SnowflakeQueriesExtractor, |
| 26 | + SnowflakeQueriesExtractorConfig, |
| 27 | +) |
22 | 28 | from datahub.ingestion.source.snowflake.snowflake_query import (
|
23 | 29 | SnowflakeQuery,
|
24 | 30 | create_deny_regex_sql_filter,
|
25 | 31 | )
|
26 | 32 | from datahub.ingestion.source.snowflake.snowflake_usage_v2 import (
|
27 | 33 | SnowflakeObjectAccessEntry,
|
28 | 34 | )
|
29 |
| -from datahub.ingestion.source.snowflake.snowflake_utils import SnowsightUrlBuilder |
| 35 | +from datahub.ingestion.source.snowflake.snowflake_utils import ( |
| 36 | + SnowflakeIdentifierBuilder, |
| 37 | + SnowsightUrlBuilder, |
| 38 | +) |
30 | 39 | from datahub.ingestion.source.snowflake.snowflake_v2 import SnowflakeV2Source
|
| 40 | +from datahub.sql_parsing.sql_parsing_aggregator import TableRename, TableSwap |
31 | 41 | from datahub.testing.doctest import assert_doctest
|
32 | 42 | from tests.test_helpers import test_connection_helpers
|
33 | 43 |
|
@@ -689,3 +699,111 @@ def test_snowflake_query_result_parsing():
|
689 | 699 | ],
|
690 | 700 | }
|
691 | 701 | assert UpstreamLineageEdge.parse_obj(db_row)
|
| 702 | + |
| 703 | + |
| 704 | +class TestDDLProcessing: |
| 705 | + @pytest.fixture |
| 706 | + def session_id(self): |
| 707 | + return "14774700483022321" |
| 708 | + |
| 709 | + @pytest.fixture |
| 710 | + def timestamp(self): |
| 711 | + return datetime.datetime( |
| 712 | + year=2025, month=2, day=3, hour=15, minute=1, second=43 |
| 713 | + ).astimezone(datetime.timezone.utc) |
| 714 | + |
| 715 | + @pytest.fixture |
| 716 | + def extractor(self) -> SnowflakeQueriesExtractor: |
| 717 | + connection = MagicMock() |
| 718 | + config = SnowflakeQueriesExtractorConfig() |
| 719 | + structured_report = MagicMock() |
| 720 | + filters = MagicMock() |
| 721 | + structured_report.num_ddl_queries_dropped = 0 |
| 722 | + identifier_config = SnowflakeIdentifierConfig() |
| 723 | + identifiers = SnowflakeIdentifierBuilder(identifier_config, structured_report) |
| 724 | + return SnowflakeQueriesExtractor( |
| 725 | + connection, config, structured_report, filters, identifiers |
| 726 | + ) |
| 727 | + |
| 728 | + def test_ddl_processing_alter_table_rename(self, extractor, session_id, timestamp): |
| 729 | + query = "ALTER TABLE person_info_loading RENAME TO person_info_final;" |
| 730 | + object_modified_by_ddl = { |
| 731 | + "objectDomain": "Table", |
| 732 | + "objectId": 1789034, |
| 733 | + "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO_LOADING", |
| 734 | + "operationType": "ALTER", |
| 735 | + "properties": { |
| 736 | + "objectName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_FINAL"} |
| 737 | + }, |
| 738 | + } |
| 739 | + query_type = "RENAME_TABLE" |
| 740 | + |
| 741 | + ddl = extractor.parse_ddl_query( |
| 742 | + query, session_id, timestamp, object_modified_by_ddl, query_type |
| 743 | + ) |
| 744 | + |
| 745 | + assert ddl == TableRename( |
| 746 | + original_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_loading,PROD)", |
| 747 | + new_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_final,PROD)", |
| 748 | + query=query, |
| 749 | + session_id=session_id, |
| 750 | + timestamp=timestamp, |
| 751 | + ), "Processing ALTER ... RENAME should result in a proper TableRename object" |
| 752 | + |
| 753 | + def test_ddl_processing_alter_table_add_column( |
| 754 | + self, extractor, session_id, timestamp |
| 755 | + ): |
| 756 | + query = "ALTER TABLE person_info ADD year BIGINT" |
| 757 | + object_modified_by_ddl = { |
| 758 | + "objectDomain": "Table", |
| 759 | + "objectId": 2612260, |
| 760 | + "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO", |
| 761 | + "operationType": "ALTER", |
| 762 | + "properties": { |
| 763 | + "columns": { |
| 764 | + "BIGINT": { |
| 765 | + "objectId": {"value": 8763407}, |
| 766 | + "subOperationType": "ADD", |
| 767 | + } |
| 768 | + } |
| 769 | + }, |
| 770 | + } |
| 771 | + query_type = "ALTER_TABLE_ADD_COLUMN" |
| 772 | + |
| 773 | + ddl = extractor.parse_ddl_query( |
| 774 | + query, session_id, timestamp, object_modified_by_ddl, query_type |
| 775 | + ) |
| 776 | + |
| 777 | + assert ddl is None, ( |
| 778 | + "For altering columns statement ddl parsing should return None" |
| 779 | + ) |
| 780 | + assert extractor.report.num_ddl_queries_dropped == 1, ( |
| 781 | + "Dropped ddls should be properly counted" |
| 782 | + ) |
| 783 | + |
| 784 | + def test_ddl_processing_alter_table_swap(self, extractor, session_id, timestamp): |
| 785 | + query = "ALTER TABLE person_info SWAP WITH person_info_swap;" |
| 786 | + object_modified_by_ddl = { |
| 787 | + "objectDomain": "Table", |
| 788 | + "objectId": 3776835, |
| 789 | + "objectName": "DUMMY_DB.PUBLIC.PERSON_INFO", |
| 790 | + "operationType": "ALTER", |
| 791 | + "properties": { |
| 792 | + "swapTargetDomain": {"value": "Table"}, |
| 793 | + "swapTargetId": {"value": 3786260}, |
| 794 | + "swapTargetName": {"value": "DUMMY_DB.PUBLIC.PERSON_INFO_SWAP"}, |
| 795 | + }, |
| 796 | + } |
| 797 | + query_type = "ALTER" |
| 798 | + |
| 799 | + ddl = extractor.parse_ddl_query( |
| 800 | + query, session_id, timestamp, object_modified_by_ddl, query_type |
| 801 | + ) |
| 802 | + |
| 803 | + assert ddl == TableSwap( |
| 804 | + urn1="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info,PROD)", |
| 805 | + urn2="urn:li:dataset:(urn:li:dataPlatform:snowflake,dummy_db.public.person_info_swap,PROD)", |
| 806 | + query=query, |
| 807 | + session_id=session_id, |
| 808 | + timestamp=timestamp, |
| 809 | + ), "Processing ALTER ... SWAP DDL should result in a proper TableSwap object" |
0 commit comments