Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/snowflake): Create all structured propery templates before assignation #12469

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@ class SnowflakeV2Config(
description="If enabled along with `extract_tags`, extracts snowflake's key-value tags as DataHub structured properties instead of DataHub tags.",
)

structured_properties_template_cache_invalidation_interval: int = Field(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe hide this one from docs, feels more like an implementation detail

default=60,
description="Interval in seconds to invalidate the structured properties template cache.",
)

include_external_url: bool = Field(
default=True,
description="Whether to populate Snowsight url for Snowflake Objects",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ def tables_for_schema(schema_name: str, db_name: Optional[str]) -> str:
and table_type in ('BASE TABLE', 'EXTERNAL TABLE', 'HYBRID TABLE')
order by table_schema, table_name"""

@staticmethod
def get_all_tags():
return """
SELECT tag_database as "TAG_DATABASE",
tag_schema AS "TAG_SCHEMA",
tag_name AS "TAG_NAME",
FROM snowflake.account_usage.tag_references
GROUP BY TAG_DATABASE , TAG_SCHEMA, tag_name
ORDER BY TAG_DATABASE, TAG_SCHEMA, TAG_NAME ASC;
"""

@staticmethod
def get_all_tags_on_object_with_propagation(
db_name: str, quoted_identifier: str, domain: str
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class SnowflakeV2Report(
num_tables_with_known_upstreams: int = 0
num_upstream_lineage_edge_parsing_failed: int = 0
num_secure_views_missing_definition: int = 0
num_structured_property_templates_created: int = 0

data_dictionary_cache: Optional["SnowflakeDataDictionary"] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,22 @@ def get_secure_view_definitions(self) -> Dict[str, Dict[str, Dict[str, str]]]:

return secure_view_definitions

def get_all_tags(self) -> List[SnowflakeTag]:
cur = self.connection.query(
SnowflakeQuery.get_all_tags(),
)

tags = []
for tag in cur:
snowflake_tag = SnowflakeTag(
database=tag["TAG_DATABASE"],
schema=tag["TAG_SCHEMA"],
name=tag["TAG_NAME"],
value="",
)
tags.append(snowflake_tag)
return tags
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be a list comprehension a bit more succinctly


@serialized_lru_cache(maxsize=1)
def get_tables_for_database(
self, db_name: str
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import itertools
import logging
import time
from typing import Dict, Iterable, List, Optional, Union

from datahub.configuration.pattern_utils import is_schema_allowed
from datahub.emitter.mce_builder import (
get_sys_time,
make_data_platform_urn,
make_dataset_urn_with_platform_instance,
make_schema_field_urn,
Expand Down Expand Up @@ -74,7 +74,6 @@
PROFILING,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
GlobalTags,
Status,
SubTypes,
Expand All @@ -101,15 +100,8 @@
StringType,
TimeType,
)
from datahub.metadata.com.linkedin.pegasus2avro.structured import (
StructuredPropertyDefinition,
)
from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties
from datahub.metadata.urns import (
ContainerUrn,
DatasetUrn,
DataTypeUrn,
EntityTypeUrn,
SchemaFieldUrn,
StructuredPropertyUrn,
)
Expand Down Expand Up @@ -191,7 +183,7 @@ def __init__(
self.domain_registry: Optional[DomainRegistry] = domain_registry
self.classification_handler = ClassificationHandler(self.config, self.report)
self.tag_extractor = SnowflakeTagExtractor(
config, self.data_dictionary, self.report
config, self.data_dictionary, self.report, identifiers
)
self.profiler: Optional[SnowflakeProfiler] = profiler
self.snowsight_url_builder: Optional[SnowsightUrlBuilder] = (
Expand All @@ -217,6 +209,16 @@ def snowflake_identifier(self, identifier: str) -> str:
return self.identifiers.snowflake_identifier(identifier)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
if self.config.extract_tags_as_structured_properties:
logger.info("Creating structured property templates for tags")
yield from self.tag_extractor.create_structured_property_templates()
# We have to wait until cache invalidates to make sure the structured property template is available
logger.info(
f"Waiting for {self.config.structured_properties_template_cache_invalidation_interval} seconds for structured properties cache to invalidate"
)
time.sleep(
self.config.structured_properties_template_cache_invalidation_interval
)
self.databases = []
for database in self.get_databases() or []:
self.report.report_entity_scanned(database.name, "database")
Expand Down Expand Up @@ -698,6 +700,7 @@ def _process_view(

def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
use_sp = self.config.extract_tags_as_structured_properties

identifier = (
self.snowflake_identifier(tag.structured_property_identifier())
if use_sp
Expand All @@ -708,10 +711,11 @@ def _process_tag(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
return

self.report.report_tag_processed(identifier)

if use_sp:
yield from self.gen_tag_as_structured_property_workunits(tag)
else:
yield from self.gen_tag_workunits(tag)
return

yield from self.gen_tag_workunits(tag)

def _format_tags_as_structured_properties(
self, tags: List[SnowflakeTag]
Expand All @@ -732,6 +736,7 @@ def gen_dataset_workunits(
if table.tags:
for tag in table.tags:
yield from self._process_tag(tag)

for column_name in table.column_tags:
for tag in table.column_tags[column_name]:
yield from self._process_tag(tag)
Expand Down Expand Up @@ -903,29 +908,6 @@ def gen_tag_workunits(self, tag: SnowflakeTag) -> Iterable[MetadataWorkUnit]:
entityUrn=tag_urn, aspect=tag_properties_aspect
).as_workunit()

def gen_tag_as_structured_property_workunits(
self, tag: SnowflakeTag
) -> Iterable[MetadataWorkUnit]:
identifier = self.snowflake_identifier(tag.structured_property_identifier())
urn = StructuredPropertyUrn(identifier).urn()
aspect = StructuredPropertyDefinition(
qualifiedName=identifier,
displayName=tag.name,
valueType=DataTypeUrn("datahub.string").urn(),
entityTypes=[
EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(),
EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(),
EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(),
],
lastModified=AuditStamp(
time=get_sys_time(), actor="urn:li:corpuser:datahub"
),
)
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=aspect,
).as_workunit()

def gen_column_tags_as_structured_properties(
self, dataset_urn: str, table: Union[SnowflakeTable, SnowflakeView]
) -> Iterable[MetadataWorkUnit]:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
from typing import Dict, List, Optional
from typing import Dict, Iterable, List, Optional

from datahub.emitter.mce_builder import get_sys_time
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.snowflake.constants import SnowflakeObjectDomain
from datahub.ingestion.source.snowflake.snowflake_config import (
SnowflakeV2Config,
Expand All @@ -12,7 +15,22 @@
SnowflakeTag,
_SnowflakeTagCache,
)
from datahub.ingestion.source.snowflake.snowflake_utils import SnowflakeCommonMixin
from datahub.ingestion.source.snowflake.snowflake_utils import (
SnowflakeCommonMixin,
SnowflakeIdentifierBuilder,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import AuditStamp
from datahub.metadata.com.linkedin.pegasus2avro.structured import (
StructuredPropertyDefinition,
)
from datahub.metadata.urns import (
ContainerUrn,
DatasetUrn,
DataTypeUrn,
EntityTypeUrn,
SchemaFieldUrn,
StructuredPropertyUrn,
)

logger: logging.Logger = logging.getLogger(__name__)

Expand All @@ -23,11 +41,12 @@
config: SnowflakeV2Config,
data_dictionary: SnowflakeDataDictionary,
report: SnowflakeV2Report,
snowflake_identifiers: SnowflakeIdentifierBuilder,
) -> None:
self.config = config
self.data_dictionary = data_dictionary
self.report = report

self.snowflake_identifiers = snowflake_identifiers
self.tag_cache: Dict[str, _SnowflakeTagCache] = {}

def _get_tags_on_object_without_propagation(
Expand Down Expand Up @@ -59,6 +78,46 @@
raise ValueError(f"Unknown domain {domain}")
return tags

def create_structured_property_templates(self) -> Iterable[MetadataWorkUnit]:
for tag in self.data_dictionary.get_all_tags():
if not self.config.tag_pattern.allowed(tag.tag_identifier()):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think structured_property_pattern makes more sense here, right? Our tag_pattern expects a key value pair, not just a key

continue

Check warning on line 84 in metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_tag.py#L84

Added line #L84 was not covered by tests
# Do we need to filter based on database and schema or is it enough if we filter based on tag pattern?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say no, because you can apply tags from other databases / schemas

# if not self.config.database_pattern.allowed(tag.database):
# continue
# if not self.config.schema_pattern.allowed(f"{tag.database}.{tag.schema}"):
# continue

if self.config.extract_tags_as_structured_properties:
self.report.num_structured_property_templates_created += 1
for workunit in self.gen_tag_as_structured_property_workunits(tag):
yield workunit
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for workunit in self.gen_tag_as_structured_property_workunits(tag):
yield workunit
yield from self.gen_tag_as_structured_property_workunits(tag)


def gen_tag_as_structured_property_workunits(
self, tag: SnowflakeTag
) -> Iterable[MetadataWorkUnit]:
identifier = self.snowflake_identifiers.snowflake_identifier(
tag.structured_property_identifier()
)
urn = StructuredPropertyUrn(identifier).urn()
aspect = StructuredPropertyDefinition(
qualifiedName=identifier,
displayName=tag.name,
valueType=DataTypeUrn("datahub.string").urn(),
entityTypes=[
EntityTypeUrn(f"datahub.{ContainerUrn.ENTITY_TYPE}").urn(),
EntityTypeUrn(f"datahub.{DatasetUrn.ENTITY_TYPE}").urn(),
EntityTypeUrn(f"datahub.{SchemaFieldUrn.ENTITY_TYPE}").urn(),
],
lastModified=AuditStamp(
time=get_sys_time(), actor="urn:li:corpuser:datahub"
),
)
yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=aspect,
).as_workunit()

def _get_tags_on_object_with_propagation(
self,
domain: str,
Expand Down
22 changes: 21 additions & 1 deletion metadata-ingestion/tests/integration/snowflake/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,27 @@ def default_query_results( # noqa: C901
),
]:
return []

elif query == snowflake_query.SnowflakeQuery.get_all_tags():
return [
*[
{
"TAG_DATABASE": "TEST_DB",
"TAG_SCHEMA": "TEST_SCHEMA",
"TAG_NAME": f"my_tag_{ix}",
}
for ix in range(3)
],
{
"TAG_DATABASE": "TEST_DB",
"TAG_SCHEMA": "TEST_SCHEMA",
"TAG_NAME": "security",
},
{
"TAG_DATABASE": "OTHER_DB",
"TAG_SCHEMA": "OTHER_SCHEMA",
"TAG_NAME": "my_other_tag",
},
]
elif (
query
== snowflake_query.SnowflakeQuery.get_all_tags_in_database_without_propagation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ def test_snowflake_tags_as_structured_properties(
include_column_lineage=False,
include_usage_stats=False,
include_operational_stats=False,
structured_properties_template_cache_invalidation_interval=1,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For test I'd prob put this at 0 unless it's needed just to avoid unnecessary sleeps

),
),
sink=DynamicTypedConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ def test_snowflake_structured_property_pattern_deny():
match_fully_qualified_names=True,
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
extract_tags_as_structured_properties=True,
structured_properties_template_cache_invalidation_interval=0,
tag_pattern=AllowDenyPattern(
deny=["TEST_DB.TEST_SCHEMA.my_tag_2:my_value_2"]
),
Expand Down Expand Up @@ -142,7 +143,7 @@ def test_snowflake_structured_property_pattern_deny():
source_report = pipeline.source.get_report()
assert isinstance(source_report, SnowflakeV2Report)
assert source_report.tags_scanned == 5
assert source_report._processed_tags == {
assert sorted(list(source_report._processed_tags)) == [
"snowflake.other_db.other_schema.my_other_tag",
"snowflake.test_db.test_schema.security",
}
]
Loading