
Commit ac13f25

feat(ingestion/snowflake): adds streams as a new dataset with lineage and properties. (#12318)
Co-authored-by: Mayuri Nehate <[email protected]>
1 parent 7f6e399 commit ac13f25

17 files changed (+3783 −1811 lines)

metadata-ingestion/docs/sources/snowflake/snowflake_pre.md

+7-2
@@ -15,6 +15,8 @@ grant operate, usage on warehouse "<your-warehouse>" to role datahub_role;
 grant usage on DATABASE "<your-database>" to role datahub_role;
 grant usage on all schemas in database "<your-database>" to role datahub_role;
 grant usage on future schemas in database "<your-database>" to role datahub_role;
+grant select on all streams in database "<your-database>" to role datahub_role;
+grant select on future streams in database "<your-database>" to role datahub_role;

 // If you are NOT using Snowflake Profiling or Classification feature: Grant references privileges to your tables and views
 grant references on all tables in database "<your-database>" to role datahub_role;
@@ -50,9 +52,12 @@ The details of each granted privilege can be viewed in [snowflake docs](https://
 If the warehouse is already running during ingestion or has auto-resume enabled,
 this permission is not required.
 - `usage` is required for us to run queries using the warehouse
-- `usage` on `database` and `schema` are required because without it tables and views inside them are not accessible. If an admin does the required grants on `table` but misses the grants on `schema` or the `database` in which the table/view exists then we will not be able to get metadata for the table/view.
+- `usage` on `database` and `schema` are required because without them the tables, views, and streams inside them are not accessible. If an admin grants the required privileges on a `table` but misses the grants on the `schema` or the `database` in which the table/view/stream exists, we will not be able to get metadata for the table/view/stream.
 - If metadata is required only on some schemas then you can grant the usage privileges only on a particular schema, for example:
-
+  ```sql
+  grant usage on schema "<your-database>"."<your-schema>" to role datahub_role;
+  ```
+- `select` on `streams` is required for stream definitions to be available. This does not allow selecting the data itself (which is not required) unless the underlying dataset also has select access.
 ```sql
 grant usage on schema "<your-database>"."<your-schema>" to role datahub_role;
 ```
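
The two new stream grants can also be applied programmatically. A minimal sketch using the Snowflake Python connector follows; the connection parameters and quoted object names are placeholders, not values from this commit.

```python
# Sketch: apply the new stream grants from snowflake_pre.md with the
# Snowflake Python connector. Connection parameters are placeholders.
import snowflake.connector

conn = snowflake.connector.connect(
    account="<your-account>",
    user="<admin-user>",
    password="<password>",
    role="ACCOUNTADMIN",
)
try:
    cur = conn.cursor()
    for stmt in (
        'grant select on all streams in database "<your-database>" to role datahub_role',
        'grant select on future streams in database "<your-database>" to role datahub_role',
    ):
        cur.execute(stmt)
finally:
    conn.close()
```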

metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py

+1
@@ -24,6 +24,7 @@ class DatasetSubTypes(StrEnum):
     SAC_LIVE_DATA_MODEL = "Live Data Model"
     NEO4J_NODE = "Neo4j Node"
     NEO4J_RELATIONSHIP = "Neo4j Relationship"
+    SNOWFLAKE_STREAM = "Snowflake Stream"

     # TODO: Create separate entity...
     NOTEBOOK = "Notebook"

metadata-ingestion/src/datahub/ingestion/source/snowflake/constants.py

+1
@@ -53,6 +53,7 @@ class SnowflakeObjectDomain(StrEnum):
     SCHEMA = "schema"
     COLUMN = "column"
     ICEBERG_TABLE = "iceberg table"
+    STREAM = "stream"


 GENERIC_PERMISSION_ERROR_KEY = "permission-error"

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py

+10
@@ -98,6 +98,11 @@ class SnowflakeFilterConfig(SQLFilterConfig):
     )
     # table_pattern and view_pattern are inherited from SQLFilterConfig

+    stream_pattern: AllowDenyPattern = Field(
+        default=AllowDenyPattern.allow_all(),
+        description="Regex patterns for streams to filter in ingestion. Note: Defaults to table_pattern if not specified. Specify regex to match the entire stream name in database.schema.stream format. e.g. to match all streams starting with customer in Customer database and public schema, use the regex 'Customer.public.customer.*'",
+    )
+
     match_fully_qualified_names: bool = Field(
         default=False,
         description="Whether `schema_pattern` is matched against fully qualified schema name `<catalog>.<schema>`.",
@@ -274,6 +279,11 @@ class SnowflakeV2Config(
         description="List of regex patterns for tags to include in ingestion. Only used if `extract_tags` is enabled.",
     )

+    include_streams: bool = Field(
+        default=True,
+        description="If enabled, streams will be ingested as separate entities from tables/views.",
+    )
+
     structured_property_pattern: AllowDenyPattern = Field(
         default=AllowDenyPattern.allow_all(),
         description=(
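
As a usage note, the new `stream_pattern` follows the same `AllowDenyPattern` semantics as the existing table/view patterns. A minimal sketch, assuming the standard `AllowDenyPattern` behavior from `datahub.configuration.common` (allow/deny regex lists checked via `.allowed()`):

```python
# Sketch: filtering streams with the new stream_pattern option, assuming the
# standard AllowDenyPattern semantics (allow/deny regex lists, .allowed()).
from datahub.configuration.common import AllowDenyPattern

stream_pattern = AllowDenyPattern(
    allow=[r"Customer\.public\.customer.*"],  # database.schema.stream
    deny=[r".*\.tmp_.*"],
)

assert stream_pattern.allowed("Customer.public.customer_orders_stream")
assert not stream_pattern.allowed("Customer.public.tmp_scratch_stream")
```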

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py

+45-10
@@ -49,6 +49,7 @@
 from datahub.sql_parsing.schema_resolver import SchemaResolver
 from datahub.sql_parsing.sql_parsing_aggregator import (
     KnownLineageMapping,
+    ObservedQuery,
     PreparsedQuery,
     SqlAggregatorReport,
     SqlParsingAggregator,
@@ -241,7 +242,13 @@ def get_workunits_internal(
         use_cached_audit_log = audit_log_file.exists()

         queries: FileBackedList[
-            Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap]
+            Union[
+                KnownLineageMapping,
+                PreparsedQuery,
+                TableRename,
+                TableSwap,
+                ObservedQuery,
+            ]
         ]
         if use_cached_audit_log:
             logger.info("Using cached audit log")
@@ -252,7 +259,13 @@ def get_workunits_internal(

             shared_connection = ConnectionWrapper(audit_log_file)
             queries = FileBackedList(shared_connection)
-            entry: Union[KnownLineageMapping, PreparsedQuery, TableRename, TableSwap]
+            entry: Union[
+                KnownLineageMapping,
+                PreparsedQuery,
+                TableRename,
+                TableSwap,
+                ObservedQuery,
+            ]

             with self.report.copy_history_fetch_timer:
                 for entry in self.fetch_copy_history():
@@ -329,7 +342,7 @@ def fetch_copy_history(self) -> Iterable[KnownLineageMapping]:

     def fetch_query_log(
         self, users: UsersMapping
-    ) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap]]:
+    ) -> Iterable[Union[PreparsedQuery, TableRename, TableSwap, ObservedQuery]]:
         query_log_query = _build_enriched_query_log_query(
             start_time=self.config.window.start_time,
             end_time=self.config.window.end_time,
@@ -362,7 +375,7 @@ def fetch_query_log(

     def _parse_audit_log_row(
         self, row: Dict[str, Any], users: UsersMapping
-    ) -> Optional[Union[TableRename, TableSwap, PreparsedQuery]]:
+    ) -> Optional[Union[TableRename, TableSwap, PreparsedQuery, ObservedQuery]]:
         json_fields = {
             "DIRECT_OBJECTS_ACCESSED",
             "OBJECTS_MODIFIED",
@@ -398,6 +411,34 @@ def _parse_audit_log_row(
                 pass
         else:
             return None
+
+        user = CorpUserUrn(
+            self.identifiers.get_user_identifier(
+                res["user_name"], users.get(res["user_name"])
+            )
+        )
+
+        # Use direct_objects_accessed instead of objects_modified because
+        # objects_modified returns $SYS_VIEW_X with no mapping.
+        has_stream_objects = any(
+            obj.get("objectDomain") == "Stream" for obj in direct_objects_accessed
+        )
+
+        # If a stream is used, default to query parsing.
+        if has_stream_objects:
+            logger.debug("Found matching stream object")
+            return ObservedQuery(
+                query=res["query_text"],
+                session_id=res["session_id"],
+                timestamp=res["query_start_time"].astimezone(timezone.utc),
+                user=user,
+                default_db=res["default_db"],
+                default_schema=res["default_schema"],
+                query_hash=get_query_fingerprint(
+                    res["query_text"], self.identifiers.platform, fast=True
+                ),
+            )
+
         upstreams = []
         column_usage = {}

@@ -460,12 +501,6 @@ def _parse_audit_log_row(
                 )
             )

-        user = CorpUserUrn(
-            self.identifiers.get_user_identifier(
-                res["user_name"], users.get(res["user_name"])
-            )
-        )
-
         timestamp: datetime = res["query_start_time"]
         timestamp = timestamp.astimezone(timezone.utc)

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py

+20-1
@@ -9,6 +9,7 @@
 from datahub.utilities.prefix_batch_builder import PrefixGroup

 SHOW_VIEWS_MAX_PAGE_SIZE = 10000
+SHOW_STREAM_MAX_PAGE_SIZE = 10000


 def create_deny_regex_sql_filter(
@@ -36,6 +37,7 @@ class SnowflakeQuery:
         SnowflakeObjectDomain.VIEW.capitalize(),
         SnowflakeObjectDomain.MATERIALIZED_VIEW.capitalize(),
         SnowflakeObjectDomain.ICEBERG_TABLE.capitalize(),
+        SnowflakeObjectDomain.STREAM.capitalize(),
     }

     ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER = "({})".format(
@@ -44,7 +46,8 @@ class SnowflakeQuery:
     ACCESS_HISTORY_TABLE_DOMAINS_FILTER = (
         "("
         f"'{SnowflakeObjectDomain.TABLE.capitalize()}',"
-        f"'{SnowflakeObjectDomain.VIEW.capitalize()}'"
+        f"'{SnowflakeObjectDomain.VIEW.capitalize()}',"
+        f"'{SnowflakeObjectDomain.STREAM.capitalize()}',"
         ")"
     )

@@ -963,3 +966,19 @@ def dmf_assertion_results(start_time_millis: int, end_time_millis: int) -> str:
     @staticmethod
     def get_all_users() -> str:
         return """SELECT name as "NAME", email as "EMAIL" FROM SNOWFLAKE.ACCOUNT_USAGE.USERS"""
+
+    @staticmethod
+    def streams_for_database(
+        db_name: str,
+        limit: int = SHOW_STREAM_MAX_PAGE_SIZE,
+        stream_pagination_marker: Optional[str] = None,
+    ) -> str:
+        # SHOW STREAMS can return a maximum of 10000 rows.
+        # https://docs.snowflake.com/en/sql-reference/sql/show-streams#usage-notes
+        assert limit <= SHOW_STREAM_MAX_PAGE_SIZE
+
+        # To work around this, we paginate through the results using the FROM clause.
+        from_clause = (
+            f"""FROM '{stream_pagination_marker}'""" if stream_pagination_marker else ""
+        )
+        return f"""SHOW STREAMS IN DATABASE {db_name} LIMIT {limit} {from_clause};"""
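
For reference, `streams_for_database` simply renders a `SHOW STREAMS` statement, with the optional `FROM '<name>'` clause acting as the pagination marker. A small sketch of the generated SQL, assuming the method behaves as written in the diff above:

```python
# Sketch of the SQL produced by streams_for_database, assuming it behaves as
# written in the diff above.
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery

print(SnowflakeQuery.streams_for_database("MY_DB"))
# SHOW STREAMS IN DATABASE MY_DB LIMIT 10000 ;

print(
    SnowflakeQuery.streams_for_database("MY_DB", stream_pagination_marker="ORDERS_STREAM")
)
# SHOW STREAMS IN DATABASE MY_DB LIMIT 10000 FROM 'ORDERS_STREAM';
```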

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py

+6
@@ -104,6 +104,7 @@ class SnowflakeV2Report(
     schemas_scanned: int = 0
     databases_scanned: int = 0
     tags_scanned: int = 0
+    streams_scanned: int = 0

     include_usage_stats: bool = False
     include_operational_stats: bool = False
@@ -113,6 +114,7 @@ class SnowflakeV2Report(
     table_lineage_query_secs: float = -1
     external_lineage_queries_secs: float = -1
     num_tables_with_known_upstreams: int = 0
+    num_streams_with_known_upstreams: int = 0
     num_upstream_lineage_edge_parsing_failed: int = 0
     num_secure_views_missing_definition: int = 0
     num_structured_property_templates_created: int = 0
@@ -131,6 +133,8 @@ class SnowflakeV2Report(
     num_get_tags_for_object_queries: int = 0
     num_get_tags_on_columns_for_table_queries: int = 0

+    num_get_streams_for_schema_queries: int = 0
+
     rows_zero_objects_modified: int = 0

     _processed_tags: MutableSet[str] = field(default_factory=set)
@@ -157,6 +161,8 @@ def report_entity_scanned(self, name: str, ent_type: str = "table") -> None:
                 return
             self._scanned_tags.add(name)
             self.tags_scanned += 1
+        elif ent_type == "stream":
+            self.streams_scanned += 1
         else:
             raise KeyError(f"Unknown entity {ent_type}.")

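A small sketch of the new `stream` branch in `report_entity_scanned`, assuming `SnowflakeV2Report` can be instantiated with its defaults (in practice the source constructs and owns the report):

```python
# Sketch only: exercising the new "stream" branch of report_entity_scanned.
# Assumes SnowflakeV2Report can be built with defaults, which may not hold
# outside of tests.
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report

report = SnowflakeV2Report()
report.report_entity_scanned("DEMO_DB.PUBLIC.ORDERS_STREAM", ent_type="stream")
assert report.streams_scanned == 1
```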

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema.py

+98-4
@@ -14,7 +14,7 @@
 )
 from datahub.ingestion.source.sql.sql_generic import BaseColumn, BaseTable, BaseView
 from datahub.utilities.file_backed_collections import FileBackedDict
-from datahub.utilities.prefix_batch_builder import build_prefix_batches
+from datahub.utilities.prefix_batch_builder import PrefixGroup, build_prefix_batches
 from datahub.utilities.serialized_lru_cache import serialized_lru_cache

 logger: logging.Logger = logging.getLogger(__name__)
@@ -118,6 +118,7 @@ class SnowflakeSchema:
     comment: Optional[str]
     tables: List[str] = field(default_factory=list)
     views: List[str] = field(default_factory=list)
+    streams: List[str] = field(default_factory=list)
     tags: Optional[List[SnowflakeTag]] = None

@@ -131,6 +132,29 @@ class SnowflakeDatabase:
     tags: Optional[List[SnowflakeTag]] = None


+@dataclass
+class SnowflakeStream:
+    name: str
+    created: datetime
+    owner: str
+    source_type: str
+    type: str
+    stale: str
+    mode: str
+    invalid_reason: str
+    owner_role_type: str
+    database_name: str
+    schema_name: str
+    table_name: str
+    comment: Optional[str]
+    columns: List[SnowflakeColumn] = field(default_factory=list)
+    stale_after: Optional[datetime] = None
+    base_tables: Optional[str] = None
+    tags: Optional[List[SnowflakeTag]] = None
+    column_tags: Dict[str, List[SnowflakeTag]] = field(default_factory=dict)
+    last_altered: Optional[datetime] = None
+
+
 class _SnowflakeTagCache:
     def __init__(self) -> None:
         # self._database_tags[<database_name>] = list of tags applied to database
@@ -208,6 +232,7 @@ def as_obj(self) -> Dict[str, Dict[str, int]]:
             self.get_tables_for_database,
             self.get_views_for_database,
             self.get_columns_for_schema,
+            self.get_streams_for_database,
             self.get_pk_constraints_for_schema,
             self.get_fk_constraints_for_schema,
         ]
@@ -431,9 +456,18 @@ def get_columns_for_schema(
         # For massive schemas, use a FileBackedDict to avoid memory issues.
         columns = FileBackedDict()

-        object_batches = build_prefix_batches(
-            all_objects, max_batch_size=10000, max_groups_in_batch=5
-        )
+        # Single prefix table case (for streams)
+        if len(all_objects) == 1:
+            object_batches = [
+                [PrefixGroup(prefix=all_objects[0], names=[], exact_match=True)]
+            ]
+        else:
+            # Build batches for full schema scan
+            object_batches = build_prefix_batches(
+                all_objects, max_batch_size=10000, max_groups_in_batch=5
+            )
+
+        # Process batches
         for batch_index, object_batch in enumerate(object_batches):
             if batch_index > 0:
                 logger.info(
@@ -611,3 +645,63 @@ def get_tags_on_columns_for_table(
                 tags[column_name].append(snowflake_tag)

         return tags
+
+    @serialized_lru_cache(maxsize=1)
+    def get_streams_for_database(
+        self, db_name: str
+    ) -> Dict[str, List[SnowflakeStream]]:
+        page_limit = SHOW_VIEWS_MAX_PAGE_SIZE
+
+        streams: Dict[str, List[SnowflakeStream]] = {}
+
+        first_iteration = True
+        stream_pagination_marker: Optional[str] = None
+        while first_iteration or stream_pagination_marker is not None:
+            cur = self.connection.query(
+                SnowflakeQuery.streams_for_database(
+                    db_name,
+                    limit=page_limit,
+                    stream_pagination_marker=stream_pagination_marker,
+                )
+            )
+
+            first_iteration = False
+            stream_pagination_marker = None
+
+            result_set_size = 0
+            for stream in cur:
+                result_set_size += 1
+
+                stream_name = stream["name"]
+                schema_name = stream["schema_name"]
+                if schema_name not in streams:
+                    streams[schema_name] = []
+                streams[schema_name].append(
+                    SnowflakeStream(
+                        name=stream["name"],
+                        created=stream["created_on"],
+                        owner=stream["owner"],
+                        comment=stream["comment"],
+                        source_type=stream["source_type"],
+                        type=stream["type"],
+                        stale=stream["stale"],
+                        mode=stream["mode"],
+                        database_name=stream["database_name"],
+                        schema_name=stream["schema_name"],
+                        invalid_reason=stream["invalid_reason"],
+                        owner_role_type=stream["owner_role_type"],
+                        stale_after=stream["stale_after"],
+                        table_name=stream["table_name"],
+                        base_tables=stream["base_tables"],
+                        last_altered=stream["created_on"],
+                    )
+                )
+
+            if result_set_size >= page_limit:
+                # If we hit the limit, we need to send another request to get the next page.
+                logger.info(
+                    f"Fetching next page of streams for {db_name} - after {stream_name}"
+                )
+                stream_pagination_marker = stream_name
+
+        return streams
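
A hedged usage sketch of `get_streams_for_database`: the result is keyed by schema name, and each entry is a `SnowflakeStream` populated from a `SHOW STREAMS` row. The `data_dictionary` object below is assumed to be an already-constructed instance of the class that owns this method; its constructor is not part of this diff.

```python
# Sketch only: consuming get_streams_for_database. `data_dictionary` is an
# assumed, already-constructed instance of the owning data-dictionary class.
streams_by_schema = data_dictionary.get_streams_for_database("ANALYTICS_DB")
for schema_name, schema_streams in streams_by_schema.items():
    for stream in schema_streams:
        print(f"{schema_name}.{stream.name} <- {stream.table_name} ({stream.mode})")
```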
