Skip to content

Commit f4f9bd3

Browse files
authored
feat(ingest/snowflake): include external table ddl lineage for queries… (#12179)
1 parent 42d4254 commit f4f9bd3

File tree

5 files changed

+80
-84
lines changed

5 files changed

+80
-84
lines changed

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py

+4-51
Original file line numberDiff line numberDiff line change
@@ -265,64 +265,17 @@ def _populate_external_upstreams(self, discovered_tables: List[str]) -> None:
265265
with PerfTimer() as timer:
266266
self.report.num_external_table_edges_scanned = 0
267267

268-
for (
269-
known_lineage_mapping
270-
) in self._populate_external_lineage_from_copy_history(discovered_tables):
271-
self.sql_aggregator.add(known_lineage_mapping)
272-
logger.info(
273-
"Done populating external lineage from copy history. "
274-
f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
275-
)
276-
277-
for (
278-
known_lineage_mapping
279-
) in self._populate_external_lineage_from_show_query(discovered_tables):
280-
self.sql_aggregator.add(known_lineage_mapping)
281-
282-
logger.info(
283-
"Done populating external lineage from show external tables. "
284-
f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
285-
)
268+
for entry in self._get_copy_history_lineage(discovered_tables):
269+
self.sql_aggregator.add(entry)
270+
logger.info("Done populating external lineage from copy history. ")
286271

287272
self.report.external_lineage_queries_secs = timer.elapsed_seconds()
288273

289-
# Handles the case for explicitly created external tables.
290-
# NOTE: Snowflake does not log this information to the access_history table.
291-
def _populate_external_lineage_from_show_query(
292-
self, discovered_tables: List[str]
293-
) -> Iterable[KnownLineageMapping]:
294-
external_tables_query: str = SnowflakeQuery.show_external_tables()
295-
try:
296-
for db_row in self.connection.query(external_tables_query):
297-
key = self.identifiers.get_dataset_identifier(
298-
db_row["name"], db_row["schema_name"], db_row["database_name"]
299-
)
300-
301-
if key not in discovered_tables:
302-
continue
303-
if db_row["location"].startswith("s3://"):
304-
yield KnownLineageMapping(
305-
upstream_urn=make_s3_urn_for_lineage(
306-
db_row["location"], self.config.env
307-
),
308-
downstream_urn=self.identifiers.gen_dataset_urn(key),
309-
)
310-
self.report.num_external_table_edges_scanned += 1
311-
312-
self.report.num_external_table_edges_scanned += 1
313-
except Exception as e:
314-
logger.debug(e, exc_info=e)
315-
self.structured_reporter.warning(
316-
"Error populating external table lineage from Snowflake",
317-
exc=e,
318-
)
319-
self.report_status(EXTERNAL_LINEAGE, False)
320-
321274
# Handles the case where a table is populated from an external stage/s3 location via copy.
322275
# Eg: copy into category_english from @external_s3_stage;
323276
# Eg: copy into category_english from 's3://acryl-snow-demo-olist/olist_raw_data/category_english'credentials=(aws_key_id='...' aws_secret_key='...') pattern='.*.csv';
324277
# NOTE: Snowflake does not log this information to the access_history table.
325-
def _populate_external_lineage_from_copy_history(
278+
def _get_copy_history_lineage(
326279
self, discovered_tables: List[str]
327280
) -> Iterable[KnownLineageMapping]:
328281
query: str = SnowflakeQuery.copy_lineage_history(

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_queries.py

-3
Original file line numberDiff line numberDiff line change
@@ -247,9 +247,6 @@ def get_workunits_internal(
247247
for entry in self.fetch_copy_history():
248248
queries.append(entry)
249249

250-
# TODO: Add "show external tables" lineage to the main schema extractor.
251-
# Because it's not a time-based thing, it doesn't really make sense in the snowflake-queries extractor.
252-
253250
with self.report.query_log_fetch_timer:
254251
for entry in self.fetch_query_log():
255252
queries.append(entry)

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py

+52-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
ClassificationHandler,
1717
classification_workunit_processor,
1818
)
19+
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
1920
from datahub.ingestion.source.common.subtypes import (
2021
DatasetContainerSubTypes,
2122
DatasetSubTypes,
@@ -35,6 +36,7 @@
3536
)
3637
from datahub.ingestion.source.snowflake.snowflake_data_reader import SnowflakeDataReader
3738
from datahub.ingestion.source.snowflake.snowflake_profiler import SnowflakeProfiler
39+
from datahub.ingestion.source.snowflake.snowflake_query import SnowflakeQuery
3840
from datahub.ingestion.source.snowflake.snowflake_report import SnowflakeV2Report
3941
from datahub.ingestion.source.snowflake.snowflake_schema import (
4042
SCHEMA_PARALLELISM,
@@ -65,6 +67,7 @@
6567
get_domain_wu,
6668
)
6769
from datahub.ingestion.source_report.ingestion_stage import (
70+
EXTERNAL_TABLE_DDL_LINEAGE,
6871
METADATA_EXTRACTION,
6972
PROFILING,
7073
)
@@ -96,7 +99,10 @@
9699
TimeType,
97100
)
98101
from datahub.metadata.com.linkedin.pegasus2avro.tag import TagProperties
99-
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
102+
from datahub.sql_parsing.sql_parsing_aggregator import (
103+
KnownLineageMapping,
104+
SqlParsingAggregator,
105+
)
100106
from datahub.utilities.registries.domain_registry import DomainRegistry
101107
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor
102108

@@ -180,7 +186,8 @@ def __init__(
180186

181187
# These are populated as side-effects of get_workunits_internal.
182188
self.databases: List[SnowflakeDatabase] = []
183-
self.aggregator: Optional[SqlParsingAggregator] = aggregator
189+
190+
self.aggregator = aggregator
184191

185192
def get_connection(self) -> SnowflakeConnection:
186193
return self.connection
@@ -212,6 +219,19 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
212219
self.report.set_ingestion_stage(snowflake_db.name, METADATA_EXTRACTION)
213220
yield from self._process_database(snowflake_db)
214221

222+
self.report.set_ingestion_stage("*", EXTERNAL_TABLE_DDL_LINEAGE)
223+
discovered_tables: List[str] = [
224+
self.identifiers.get_dataset_identifier(
225+
table_name, schema.name, db.name
226+
)
227+
for db in self.databases
228+
for schema in db.schemas
229+
for table_name in schema.tables
230+
]
231+
if self.aggregator:
232+
for entry in self._external_tables_ddl_lineage(discovered_tables):
233+
self.aggregator.add(entry)
234+
215235
except SnowflakePermissionError as e:
216236
self.structured_reporter.failure(
217237
GENERIC_PERMISSION_ERROR_KEY,
@@ -1082,3 +1102,33 @@ def get_fk_constraints_for_table(
10821102

10831103
# Access to table but none of its constraints - is this possible ?
10841104
return constraints.get(table_name, [])
1105+
1106+
# Handles the case for explicitly created external tables.
1107+
# NOTE: Snowflake does not log this information to the access_history table.
1108+
def _external_tables_ddl_lineage(
1109+
self, discovered_tables: List[str]
1110+
) -> Iterable[KnownLineageMapping]:
1111+
external_tables_query: str = SnowflakeQuery.show_external_tables()
1112+
try:
1113+
for db_row in self.connection.query(external_tables_query):
1114+
key = self.identifiers.get_dataset_identifier(
1115+
db_row["name"], db_row["schema_name"], db_row["database_name"]
1116+
)
1117+
1118+
if key not in discovered_tables:
1119+
continue
1120+
if db_row["location"].startswith("s3://"):
1121+
yield KnownLineageMapping(
1122+
upstream_urn=make_s3_urn_for_lineage(
1123+
db_row["location"], self.config.env
1124+
),
1125+
downstream_urn=self.identifiers.gen_dataset_urn(key),
1126+
)
1127+
self.report.num_external_table_edges_scanned += 1
1128+
1129+
self.report.num_external_table_edges_scanned += 1
1130+
except Exception as e:
1131+
self.structured_reporter.warning(
1132+
"External table ddl lineage extraction failed",
1133+
exc=e,
1134+
)

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py

+23-28
Original file line numberDiff line numberDiff line change
@@ -161,35 +161,32 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
161161
# For database, schema, tables, views, etc
162162
self.data_dictionary = SnowflakeDataDictionary(connection=self.connection)
163163
self.lineage_extractor: Optional[SnowflakeLineageExtractor] = None
164-
self.aggregator: Optional[SqlParsingAggregator] = None
165-
166-
if self.config.use_queries_v2 or self.config.include_table_lineage:
167-
self.aggregator = self._exit_stack.enter_context(
168-
SqlParsingAggregator(
169-
platform=self.identifiers.platform,
170-
platform_instance=self.config.platform_instance,
171-
env=self.config.env,
172-
graph=self.ctx.graph,
173-
eager_graph_load=(
174-
# If we're ingesting schema metadata for tables/views, then we will populate
175-
# schemas into the resolver as we go. We only need to do a bulk fetch
176-
# if we're not ingesting schema metadata as part of ingestion.
177-
not (
178-
self.config.include_technical_schema
179-
and self.config.include_tables
180-
and self.config.include_views
181-
)
182-
and not self.config.lazy_schema_resolver
183-
),
184-
generate_usage_statistics=False,
185-
generate_operations=False,
186-
format_queries=self.config.format_sql_queries,
187-
)
164+
165+
self.aggregator: SqlParsingAggregator = self._exit_stack.enter_context(
166+
SqlParsingAggregator(
167+
platform=self.identifiers.platform,
168+
platform_instance=self.config.platform_instance,
169+
env=self.config.env,
170+
graph=self.ctx.graph,
171+
eager_graph_load=(
172+
# If we're ingesting schema metadata for tables/views, then we will populate
173+
# schemas into the resolver as we go. We only need to do a bulk fetch
174+
# if we're not ingesting schema metadata as part of ingestion.
175+
not (
176+
self.config.include_technical_schema
177+
and self.config.include_tables
178+
and self.config.include_views
179+
)
180+
and not self.config.lazy_schema_resolver
181+
),
182+
generate_usage_statistics=False,
183+
generate_operations=False,
184+
format_queries=self.config.format_sql_queries,
188185
)
189-
self.report.sql_aggregator = self.aggregator.report
186+
)
187+
self.report.sql_aggregator = self.aggregator.report
190188

191189
if self.config.include_table_lineage:
192-
assert self.aggregator is not None
193190
redundant_lineage_run_skip_handler: Optional[
194191
RedundantLineageRunSkipHandler
195192
] = None
@@ -487,8 +484,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
487484

488485
databases = schema_extractor.databases
489486

490-
# TODO: The checkpoint state for stale entity detection can be committed here.
491-
492487
if self.config.shares:
493488
yield from SnowflakeSharesHandler(
494489
self.config, self.report

metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
USAGE_EXTRACTION_INGESTION = "Usage Extraction Ingestion"
1515
USAGE_EXTRACTION_OPERATIONAL_STATS = "Usage Extraction Operational Stats"
1616
USAGE_EXTRACTION_USAGE_AGGREGATION = "Usage Extraction Usage Aggregation"
17+
EXTERNAL_TABLE_DDL_LINEAGE = "External table DDL Lineage"
1718
QUERIES_EXTRACTION = "Queries Extraction"
1819
PROFILING = "Profiling"
1920

0 commit comments

Comments
 (0)