Skip to content

Commit d696fb6

Browse files
committed
fix(ingest/snowflake): Fixing table rename query handling
1 parent 48b6581 commit d696fb6

File tree

1 file changed

+16
-13
lines changed

1 file changed

+16
-13
lines changed

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py

+16-13
Original file line numberDiff line numberDiff line change
@@ -403,6 +403,7 @@ def _parse_audit_log_row(
403403
res["session_id"],
404404
res["query_start_time"],
405405
object_modified_by_ddl,
406+
res["query_type"],
406407
)
407408
if known_ddl_entry:
408409
return known_ddl_entry
@@ -537,40 +538,42 @@ def parse_ddl_query(
537538
session_id: str,
538539
timestamp: datetime,
539540
object_modified_by_ddl: dict,
541+
query_type: str,
540542
) -> Optional[Union[TableRename, TableSwap]]:
541543
timestamp = timestamp.astimezone(timezone.utc)
542-
if object_modified_by_ddl[
543-
"operationType"
544-
] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
545-
urn1 = self.identifiers.gen_dataset_urn(
544+
if (
545+
object_modified_by_ddl["operationType"] == "ALTER"
546+
and query_type == "RENAME_TABLE"
547+
and object_modified_by_ddl["properties"].get("objectName")
548+
):
549+
original_un = self.identifiers.gen_dataset_urn(
546550
self.identifiers.get_dataset_identifier_from_qualified_name(
547551
object_modified_by_ddl["objectName"]
548552
)
549553
)
550554

551-
urn2 = self.identifiers.gen_dataset_urn(
555+
new_urn = self.identifiers.gen_dataset_urn(
552556
self.identifiers.get_dataset_identifier_from_qualified_name(
553-
object_modified_by_ddl["properties"]["swapTargetName"]["value"]
557+
object_modified_by_ddl["properties"]["objectName"]["value"]
554558
)
555559
)
556-
557-
return TableSwap(urn1, urn2, query, session_id, timestamp)
560+
return TableRename(original_un, new_urn, query, session_id, timestamp)
558561
elif object_modified_by_ddl[
559562
"operationType"
560-
] == "RENAME_TABLE" and object_modified_by_ddl["properties"].get("objectName"):
561-
original_un = self.identifiers.gen_dataset_urn(
563+
] == "ALTER" and object_modified_by_ddl["properties"].get("swapTargetName"):
564+
urn1 = self.identifiers.gen_dataset_urn(
562565
self.identifiers.get_dataset_identifier_from_qualified_name(
563566
object_modified_by_ddl["objectName"]
564567
)
565568
)
566569

567-
new_urn = self.identifiers.gen_dataset_urn(
570+
urn2 = self.identifiers.gen_dataset_urn(
568571
self.identifiers.get_dataset_identifier_from_qualified_name(
569-
object_modified_by_ddl["properties"]["objectName"]["value"]
572+
object_modified_by_ddl["properties"]["swapTargetName"]["value"]
570573
)
571574
)
572575

573-
return TableRename(original_un, new_urn, query, session_id, timestamp)
576+
return TableSwap(urn1, urn2, query, session_id, timestamp)
574577
else:
575578
self.report.num_ddl_queries_dropped += 1
576579
return None

0 commit comments

Comments
 (0)