Skip to content

Commit b03d6f0

Browse files
committed
Create structured propery templates before assignation
1 parent a4f8d17 commit b03d6f0

File tree

6 files changed

+110
-10
lines changed

6 files changed

+110
-10
lines changed

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py

+5
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,11 @@ class SnowflakeV2Config(
249249
description="If enabled along with `extract_tags`, extracts snowflake's key-value tags as DataHub structured properties instead of DataHub tags.",
250250
)
251251

252+
structured_properties_template_cache_invalidation_interval: int = Field(
253+
default=60,
254+
description="Interval in seconds to invalidate the structured properties template cache.",
255+
)
256+
252257
include_external_url: bool = Field(
253258
default=True,
254259
description="Whether to populate Snowsight url for Snowflake Objects",

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py

+11
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,17 @@ def tables_for_schema(schema_name: str, db_name: Optional[str]) -> str:
159159
and table_type in ('BASE TABLE', 'EXTERNAL TABLE', 'HYBRID TABLE')
160160
order by table_schema, table_name"""
161161

162+
@staticmethod
163+
def get_all_tags():
164+
return """
165+
SELECT tag_database as "TAG_DATABASE",
166+
tag_schema AS "TAG_SCHEMA",
167+
tag_name AS "TAG_NAME",
168+
FROM snowflake.account_usage.tag_references
169+
GROUP BY TAG_DATABASE , TAG_SCHEMA, tag_name
170+
ORDER BY TAG_DATABASE, TAG_SCHEMA, TAG_NAME ASC;
171+
"""
172+
162173
@staticmethod
163174
def get_all_tags_on_object_with_propagation(
164175
db_name: str, quoted_identifier: str, domain: str

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py

+1
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ class SnowflakeV2Report(
114114
num_tables_with_known_upstreams: int = 0
115115
num_upstream_lineage_edge_parsing_failed: int = 0
116116
num_secure_views_missing_definition: int = 0
117+
num_structured_property_templates_created: int = 0
117118

118119
data_dictionary_cache: Optional["SnowflakeDataDictionary"] = None
119120

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py

+16
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,22 @@ def get_secure_view_definitions(self) -> Dict[str, Dict[str, Dict[str, str]]]:
285285

286286
return secure_view_definitions
287287

288+
def get_all_tags(self) -> List[SnowflakeTag]:
289+
cur = self.connection.query(
290+
SnowflakeQuery.get_all_tags(),
291+
)
292+
293+
tags = []
294+
for tag in cur:
295+
snowflake_tag = SnowflakeTag(
296+
database=tag["TAG_DATABASE"],
297+
schema=tag["TAG_SCHEMA"],
298+
name=tag["TAG_NAME"],
299+
value="",
300+
)
301+
tags.append(snowflake_tag)
302+
return tags
303+
288304
@serialized_lru_cache(maxsize=1)
289305
def get_tables_for_database(
290306
self, db_name: str

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py

+16-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import itertools
22
import logging
3+
import time
34
from typing import Dict, Iterable, List, Optional, Union
45

56
from datahub.configuration.pattern_utils import is_schema_allowed
@@ -191,12 +192,12 @@ def __init__(
191192
self.domain_registry: Optional[DomainRegistry] = domain_registry
192193
self.classification_handler = ClassificationHandler(self.config, self.report)
193194
self.tag_extractor = SnowflakeTagExtractor(
194-
config, self.data_dictionary, self.report
195+
config, self.data_dictionary, self.report, identifiers
195196
)
196197
self.profiler: Optional[SnowflakeProfiler] = profiler
197-
self.snowsight_url_builder: Optional[SnowsightUrlBuilder] = (
198-
snowsight_url_builder
199-
)
198+
self.snowsight_url_builder: Optional[
199+
SnowsightUrlBuilder
200+
] = snowsight_url_builder
200201

201202
# These are populated as side-effects of get_workunits_internal.
202203
self.databases: List[SnowflakeDatabase] = []
@@ -217,6 +218,13 @@ def snowflake_identifier(self, identifier: str) -> str:
217218
return self.identifiers.snowflake_identifier(identifier)
218219

219220
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
221+
if self.config.extract_tags_as_structured_properties:
222+
logger.info("Creating structured property templates for tags")
223+
yield from self.tag_extractor.create_structured_property_templates()
224+
# We have to wait until cache invalidates to make sure the structured property template is available
225+
time.sleep(
226+
self.config.structured_properties_template_cache_invalidation_interval
227+
)
220228
self.databases = []
221229
for database in self.get_databases() or []:
222230
self.report.report_entity_scanned(database.name, "database")
@@ -267,9 +275,9 @@ def get_databases(self) -> Optional[List[SnowflakeDatabase]]:
267275
)
268276
return None
269277
else:
270-
ischema_databases: List[SnowflakeDatabase] = (
271-
self.get_databases_from_ischema(databases)
272-
)
278+
ischema_databases: List[
279+
SnowflakeDatabase
280+
] = self.get_databases_from_ischema(databases)
273281

274282
if len(ischema_databases) == 0:
275283
self.structured_reporter.failure(
@@ -732,6 +740,7 @@ def gen_dataset_workunits(
732740
if table.tags:
733741
for tag in table.tags:
734742
yield from self._process_tag(tag)
743+
735744
for column_name in table.column_tags:
736745
for tag in table.column_tags[column_name]:
737746
yield from self._process_tag(tag)

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py

+61-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import logging
2-
from typing import Dict, List, Optional
2+
from typing import Dict, Iterable, List, Optional
33

4+
from datahub.emitter.mce_builder import get_sys_time
5+
from datahub.emitter.mcp import MetadataChangeProposalWrapper
6+
from datahub.ingestion.api.workunit import MetadataWorkUnit
47
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
58
from datahub.ingestion.source.snowflake.snowflake_config import (
69
SnowflakeV2Config,
@@ -12,7 +15,22 @@
1215
SnowflakeTag,
1316
_SnowflakeTagCache,
1417
)
15-
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
18+
from datahub.ingestion.source.snowflake.snowflake_utils import (
19+
SnowflakeCommonMixin,
20+
SnowflakeIdentifierBuilder,
21+
)
22+
from datahub.metadata._urns.urn_defs import (
23+
ContainerUrn,
24+
DatasetUrn,
25+
DataTypeUrn,
26+
EntityTypeUrn,
27+
SchemaFieldUrn,
28+
StructuredPropertyUrn,
29+
)
30+
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
31+
from datahub.metadata.com.linkedin.pegasus2avro.structured import (
32+
StructuredPropertyDefinition,
33+
)
1634

1735
logger: logging.Logger = logging.getLogger(__name__)
1836

@@ -23,11 +41,12 @@ def __init__(
2341
config: SnowflakeV2Config,
2442
data_dictionary: SnowflakeDataDictionary,
2543
report: SnowflakeV2Report,
44+
snowflake_identifiers: SnowflakeIdentifierBuilder,
2645
) -> None:
2746
self.config = config
2847
self.data_dictionary = data_dictionary
2948
self.report = report
30-
49+
self.snowflake_identifiers = snowflake_identifiers
3150
self.tag_cache: Dict[str, _SnowflakeTagCache] = {}
3251

3352
def _get_tags_on_object_without_propagation(
@@ -59,6 +78,45 @@ def _get_tags_on_object_without_propagation(
5978
raise ValueError(f"Unknown domain {domain}")
6079
return tags
6180

81+
def create_structured_property_templates(self) -> Iterable[MetadataWorkUnit]:
82+
for tag in self.data_dictionary.get_all_tags():
83+
if not self.config.tag_pattern.allowed(tag.tag_identifier()):
84+
continue
85+
if not self.config.database_pattern.allowed(tag.database):
86+
continue
87+
if not self.config.schema_pattern.allowed(f"{tag.database}.{tag.schema}"):
88+
continue
89+
90+
if self.config.extract_tags_as_structured_properties:
91+
self.report.num_structured_property_templates_created += 1
92+
for workunit in self.gen_tag_as_structured_property_workunits(tag):
93+
yield workunit
94+
95+
def gen_tag_as_structured_property_workunits(
96+
self, tag: SnowflakeTag
97+
) -> Iterable[MetadataWorkUnit]:
98+
identifier = self.snowflake_identifiers.snowflake_identifier(
99+
tag.structured_property_identifier()
100+
)
101+
urn = StructuredPropertyUrn(identifier).urn()
102+
aspect = StructuredPropertyDefinition(
103+
qualifiedName=identifier,
104+
displayName=tag.name,
105+
valueType=DataTypeUrn("datahub.string").urn(),
106+
entityTypes=[
107+
EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(),
108+
EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(),
109+
EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(),
110+
],
111+
lastModified=AuditStamp(
112+
time=get_sys_time(), actor="urn:li:corpuser:datahub"
113+
),
114+
)
115+
yield MetadataChangeProposalWrapper(
116+
entityUrn=urn,
117+
aspect=aspect,
118+
).as_workunit()
119+
62120
def _get_tags_on_object_with_propagation(
63121
self,
64122
domain: str,

0 commit comments

Comments
 (0)