Skip to content

Commit 855893c

Browse files
treff7eseagle-25
authored andcommitted
fix(ingest/snowflake): Create all structured propery templates before assignation (datahub-project#12469)
1 parent cf4792e commit 855893c

File tree

9 files changed

+135
-42
lines changed

9 files changed

+135
-42
lines changed

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py

+6
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,12 @@ class SnowflakeV2Config(
249249
description="If enabled along with `extract_tags`, extracts snowflake's key-value tags as DataHub structured properties instead of DataHub tags.",
250250
)
251251

252+
structured_properties_template_cache_invalidation_interval: int = Field(
253+
hidden_from_docs=True,
254+
default=60,
255+
description="Interval in seconds to invalidate the structured properties template cache.",
256+
)
257+
252258
include_external_url: bool = Field(
253259
default=True,
254260
description="Whether to populate Snowsight url for Snowflake Objects",

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py

+11
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,17 @@ def tables_for_schema(schema_name: str, db_name: Optional[str]) -> str:
159159
and table_type in ('BASE TABLE', 'EXTERNAL TABLE', 'HYBRID TABLE')
160160
order by table_schema, table_name"""
161161

162+
@staticmethod
163+
def get_all_tags():
164+
return """
165+
SELECT tag_database as "TAG_DATABASE",
166+
tag_schema AS "TAG_SCHEMA",
167+
tag_name AS "TAG_NAME",
168+
FROM snowflake.account_usage.tag_references
169+
GROUP BY TAG_DATABASE , TAG_SCHEMA, tag_name
170+
ORDER BY TAG_DATABASE, TAG_SCHEMA, TAG_NAME ASC;
171+
"""
172+
162173
@staticmethod
163174
def get_all_tags_on_object_with_propagation(
164175
db_name: str, quoted_identifier: str, domain: str

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class SnowflakeV2Report(
114114
num_tables_with_known_upstreams: int = 0
115115
num_upstream_lineage_edge_parsing_failed: int = 0
116116
num_secure_views_missing_definition: int = 0
117+
num_structured_property_templates_created: int = 0
117118

118119
data_dictionary_cache: Optional["SnowflakeDataDictionary"] = None
119120

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py

+17
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,23 @@ def get_secure_view_definitions(self) -> Dict[str, Dict[str, Dict[str, str]]]:
285285

286286
return secure_view_definitions
287287

288+
def get_all_tags(self) -> List[SnowflakeTag]:
289+
cur = self.connection.query(
290+
SnowflakeQuery.get_all_tags(),
291+
)
292+
293+
tags = [
294+
SnowflakeTag(
295+
database=tag["TAG_DATABASE"],
296+
schema=tag["TAG_SCHEMA"],
297+
name=tag["TAG_NAME"],
298+
value="",
299+
)
300+
for tag in cur
301+
]
302+
303+
return tags
304+
288305
@serialized_lru_cache(maxsize=1)
289306
def get_tables_for_database(
290307
self, db_name: str

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py

+18-36
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import itertools
22
import logging
3+
import time
34
from typing import Dict, Iterable, List, Optional, Union
45

56
from datahub.configuration.pattern_utils import is_schema_allowed
67
from datahub.emitter.mce_builder import (
7-
get_sys_time,
88
make_data_platform_urn,
99
make_dataset_urn_with_platform_instance,
1010
make_schema_field_urn,
@@ -74,7 +74,6 @@
7474
PROFILING,
7575
)
7676
from datahub.metadata.com.linkedin.pegasus2avro.common import (
77-
AuditStamp,
7877
GlobalTags,
7978
Status,
8079
SubTypes,
@@ -101,15 +100,8 @@
101100
StringType,
102101
TimeType,
103102
)
104-
from datahub.metadata.com.linkedin.pegasus2avro.structured import (
105-
StructuredPropertyDefinition,
106-
)
107103
from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties
108104
from datahub.metadata.urns import (
109-
ContainerUrn,
110-
DatasetUrn,
111-
DataTypeUrn,
112-
EntityTypeUrn,
113105
SchemaFieldUrn,
114106
StructuredPropertyUrn,
115107
)
@@ -191,7 +183,7 @@ def __init__(
191183
self.domain_registry: Optional[DomainRegistry] = domain_registry
192184
self.classification_handler = ClassificationHandler(self.config, self.report)
193185
self.tag_extractor = SnowflakeTagExtractor(
194-
config, self.data_dictionary, self.report
186+
config, self.data_dictionary, self.report, identifiers
195187
)
196188
self.profiler: Optional[SnowflakeProfiler] = profiler
197189
self.snowsight_url_builder: Optional[SnowsightUrlBuilder] = (
@@ -217,6 +209,16 @@ def snowflake_identifier(self, identifier: str) -> str:
217209
return self.identifiers.snowflake_identifier(identifier)
218210

219211
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
212+
if self.config.extract_tags_as_structured_properties:
213+
logger.info("Creating structured property templates for tags")
214+
yield from self.tag_extractor.create_structured_property_templates()
215+
# We have to wait until cache invalidates to make sure the structured property template is available
216+
logger.info(
217+
f"Waiting for {self.config.structured_properties_template_cache_invalidation_interval} seconds for structured properties cache to invalidate"
218+
)
219+
time.sleep(
220+
self.config.structured_properties_template_cache_invalidation_interval
221+
)
220222
self.databases = []
221223
for database in self.get_databases() or []:
222224
self.report.report_entity_scanned(database.name, "database")
@@ -698,6 +700,7 @@ def _process_view(
698700

699701
def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
700702
use_sp = self.config.extract_tags_as_structured_properties
703+
701704
identifier = (
702705
self.snowflake_identifier(tag.structured_property_identifier())
703706
if use_sp
@@ -708,10 +711,11 @@ def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
708711
return
709712

710713
self.report.report_tag_processed(identifier)
714+
711715
if use_sp:
712-
yield from self.gen_tag_as_structured_property_workunits(tag)
713-
else:
714-
yield from self.gen_tag_workunits(tag)
716+
return
717+
718+
yield from self.gen_tag_workunits(tag)
715719

716720
def _format_tags_as_structured_properties(
717721
self, tags: List[SnowflakeTag]
@@ -732,6 +736,7 @@ def gen_dataset_workunits(
732736
if table.tags:
733737
for tag in table.tags:
734738
yield from self._process_tag(tag)
739+
735740
for column_name in table.column_tags:
736741
for tag in table.column_tags[column_name]:
737742
yield from self._process_tag(tag)
@@ -903,29 +908,6 @@ def gen_tag_workunits(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
903908
entityUrn=tag_urn, aspect=tag_properties_aspect
904909
).as_workunit()
905910

906-
def gen_tag_as_structured_property_workunits(
907-
self, tag: SnowflakeTag
908-
) -> Iterable[MetadataWorkUnit]:
909-
identifier = self.snowflake_identifier(tag.structured_property_identifier())
910-
urn = StructuredPropertyUrn(identifier).urn()
911-
aspect = StructuredPropertyDefinition(
912-
qualifiedName=identifier,
913-
displayName=tag.name,
914-
valueType=DataTypeUrn("datahub.string").urn(),
915-
entityTypes=[
916-
EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(),
917-
EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(),
918-
EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(),
919-
],
920-
lastModified=AuditStamp(
921-
time=get_sys_time(), actor="urn:li:corpuser:datahub"
922-
),
923-
)
924-
yield MetadataChangeProposalWrapper(
925-
entityUrn=urn,
926-
aspect=aspect,
927-
).as_workunit()
928-
929911
def gen_column_tags_as_structured_properties(
930912
self, dataset_urn: str, table: Union[SnowflakeTable, SnowflakeView]
931913
) -> Iterable[MetadataWorkUnit]:

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py

+57-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import logging
2-
from typing import Dict, List, Optional
2+
from typing import Dict, Iterable, List, Optional
33

4+
from datahub.emitter.mce_builder import get_sys_time
5+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
6+
from datahub.ingestion.api.workunit import MetadataWorkUnit
47
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
58
from datahub.ingestion.source.snowflake.snowflake_config import (
69
SnowflakeV2Config,
@@ -12,7 +15,22 @@
1215
SnowflakeTag,
1316
_SnowflakeTagCache,
1417
)
15-
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
18+
from datahub.ingestion.source.snowflake.snowflake_utils import (
19+
SnowflakeCommonMixin,
20+
SnowflakeIdentifierBuilder,
21+
)
22+
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
23+
from datahub.metadata.com.linkedin.pegasus2avro.structured import (
24+
StructuredPropertyDefinition,
25+
)
26+
from datahub.metadata.urns import (
27+
ContainerUrn,
28+
DatasetUrn,
29+
DataTypeUrn,
30+
EntityTypeUrn,
31+
SchemaFieldUrn,
32+
StructuredPropertyUrn,
33+
)
1634

1735
logger: logging.Logger = logging.getLogger(__name__)
1836

@@ -23,11 +41,12 @@ def __init__(
2341
config: SnowflakeV2Config,
2442
data_dictionary: SnowflakeDataDictionary,
2543
report: SnowflakeV2Report,
44+
snowflake_identifiers: SnowflakeIdentifierBuilder,
2645
) -> None:
2746
self.config = config
2847
self.data_dictionary = data_dictionary
2948
self.report = report
30-
49+
self.snowflake_identifiers = snowflake_identifiers
3150
self.tag_cache: Dict[str, _SnowflakeTagCache] = {}
3251

3352
def _get_tags_on_object_without_propagation(
@@ -59,6 +78,41 @@ def _get_tags_on_object_without_propagation(
5978
raise ValueError(f"Unknown domain {domain}")
6079
return tags
6180

81+
def create_structured_property_templates(self) -> Iterable[MetadataWorkUnit]:
82+
for tag in self.data_dictionary.get_all_tags():
83+
if not self.config.structured_property_pattern.allowed(
84+
tag.tag_identifier()
85+
):
86+
continue
87+
if self.config.extract_tags_as_structured_properties:
88+
self.report.num_structured_property_templates_created += 1
89+
yield from self.gen_tag_as_structured_property_workunits(tag)
90+
91+
def gen_tag_as_structured_property_workunits(
92+
self, tag: SnowflakeTag
93+
) -> Iterable[MetadataWorkUnit]:
94+
identifier = self.snowflake_identifiers.snowflake_identifier(
95+
tag.structured_property_identifier()
96+
)
97+
urn = StructuredPropertyUrn(identifier).urn()
98+
aspect = StructuredPropertyDefinition(
99+
qualifiedName=identifier,
100+
displayName=tag.name,
101+
valueType=DataTypeUrn("datahub.string").urn(),
102+
entityTypes=[
103+
EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(),
104+
EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(),
105+
EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(),
106+
],
107+
lastModified=AuditStamp(
108+
time=get_sys_time(), actor="urn:li:corpuser:datahub"
109+
),
110+
)
111+
yield MetadataChangeProposalWrapper(
112+
entityUrn=urn,
113+
aspect=aspect,
114+
).as_workunit()
115+
62116
def _get_tags_on_object_with_propagation(
63117
self,
64118
domain: str,

metadata-ingestion/tests/integration/snowflake/common.py

+21-1
Original file line numberDiff line numberDiff line change
@@ -629,7 +629,27 @@ def default_query_results( # noqa: C901
629629
),
630630
]:
631631
return []
632-
632+
elif query == snowflake_query.SnowflakeQuery.get_all_tags():
633+
return [
634+
*[
635+
{
636+
"TAG_DATABASE": "TEST_DB",
637+
"TAG_SCHEMA": "TEST_SCHEMA",
638+
"TAG_NAME": f"my_tag_{ix}",
639+
}
640+
for ix in range(3)
641+
],
642+
{
643+
"TAG_DATABASE": "TEST_DB",
644+
"TAG_SCHEMA": "TEST_SCHEMA",
645+
"TAG_NAME": "security",
646+
},
647+
{
648+
"TAG_DATABASE": "OTHER_DB",
649+
"TAG_SCHEMA": "OTHER_SCHEMA",
650+
"TAG_NAME": "my_other_tag",
651+
},
652+
]
633653
elif (
634654
query
635655
== snowflake_query.SnowflakeQuery.get_all_tags_in_database_without_propagation(

metadata-ingestion/tests/integration/snowflake/test_snowflake.py

+1
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ def test_snowflake_tags_as_structured_properties(
219219
include_column_lineage=False,
220220
include_usage_stats=False,
221221
include_operational_stats=False,
222+
structured_properties_template_cache_invalidation_interval=0,
222223
),
223224
),
224225
sink=DynamicTypedConfig(

metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ def test_snowflake_structured_property_pattern_deny():
115115
match_fully_qualified_names=True,
116116
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
117117
extract_tags_as_structured_properties=True,
118+
structured_properties_template_cache_invalidation_interval=0,
118119
tag_pattern=AllowDenyPattern(
119120
deny=["TEST_DB.TEST_SCHEMA.my_tag_2:my_value_2"]
120121
),
@@ -142,7 +143,7 @@ def test_snowflake_structured_property_pattern_deny():
142143
source_report = pipeline.source.get_report()
143144
assert isinstance(source_report, SnowflakeV2Report)
144145
assert source_report.tags_scanned == 5
145-
assert source_report._processed_tags == {
146+
assert sorted(list(source_report._processed_tags)) == [
146147
"snowflake.other_db.other_schema.my_other_tag",
147148
"snowflake.test_db.test_schema.security",
148-
}
149+
]

0 commit comments

Comments
 (0)