Skip to content

Commit 9cb631f

Browse files
committed
remove include_view_lineage flag
1 parent 97f6230 commit 9cb631f

15 files changed

+21
-73
lines changed

docs/how/updating-datahub.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.
1818

1919
## Next
20-
20+
- #12179 - Config `include_view_lineage` is removed from snowflake ingestion source. View and External Table DDL lineage will always be ingested when definitions are available.
2121
- #11560 - The PowerBI ingestion source configuration option include_workspace_name_in_dataset_urn determines whether the workspace name is included in the PowerBI dataset's URN.<br/> PowerBI allows to have identical name of semantic model and their tables across the workspace, It will overwrite the semantic model in-case of multi-workspace ingestion.<br/>
2222
Entity urn with `include_workspace_name_in_dataset_urn: false`
2323

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py

+3-21
Original file line numberDiff line numberDiff line change
@@ -163,26 +163,12 @@ class SnowflakeConfig(
163163
default=True,
164164
description="If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires appropriate grants given to the role and Snowflake Enterprise Edition or above.",
165165
)
166-
include_view_lineage: bool = pydantic.Field(
167-
default=True,
168-
description="If enabled, populates the snowflake view->table and table->view lineages. Requires appropriate grants given to the role, and include_table_lineage to be True. view->table lineage requires Snowflake Enterprise Edition or above.",
169-
)
166+
167+
_include_view_lineage = pydantic_removed_field("include_view_lineage")
170168

171169
ignore_start_time_lineage: bool = False
172170
upstream_lineage_in_report: bool = False
173171

174-
@pydantic.root_validator(skip_on_failure=True)
175-
def validate_include_view_lineage(cls, values):
176-
if (
177-
"include_table_lineage" in values
178-
and not values.get("include_table_lineage")
179-
and values.get("include_view_lineage")
180-
):
181-
raise ValueError(
182-
"include_table_lineage must be True for include_view_lineage to be set."
183-
)
184-
return values
185-
186172

187173
class SnowflakeV2Config(
188174
SnowflakeConfig,
@@ -228,7 +214,7 @@ class SnowflakeV2Config(
228214
)
229215

230216
use_queries_v2: bool = Field(
231-
default=False,
217+
default=True,
232218
description="If enabled, uses the new queries extractor to extract queries from snowflake.",
233219
)
234220

@@ -355,10 +341,6 @@ def get_sql_alchemy_url(
355341
self, database=database, username=username, password=password, role=role
356342
)
357343

358-
@property
359-
def parse_view_ddl(self) -> bool:
360-
return self.include_view_column_lineage
361-
362344
@validator("shares")
363345
def validate_shares(
364346
cls, shares: Optional[Dict[str, SnowflakeShareConfig]], values: Dict

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py

+3-10
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
from datahub.configuration.datetimes import parse_absolute_time
1010
from datahub.ingestion.api.closeable import Closeable
11-
from datahub.ingestion.api.workunit import MetadataWorkUnit
1211
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
1312
from datahub.ingestion.source.snowflake.constants import (
1413
LINEAGE_PERMISSION_ERROR,
@@ -163,11 +162,11 @@ def get_time_window(self) -> Tuple[datetime, datetime]:
163162
self.config.end_time,
164163
)
165164

166-
def get_workunits(
165+
def add_time_based_lineage_to_aggregator(
167166
self,
168167
discovered_tables: List[str],
169168
discovered_views: List[str],
170-
) -> Iterable[MetadataWorkUnit]:
169+
) -> None:
171170
if not self._should_ingest_lineage():
172171
return
173172

@@ -177,9 +176,7 @@ def get_workunits(
177176
# snowflake view/table -> snowflake table
178177
self.populate_table_upstreams(discovered_tables)
179178

180-
for mcp in self.sql_aggregator.gen_metadata():
181-
yield mcp.as_workunit()
182-
179+
def update_state(self):
183180
if self.redundant_run_skip_handler:
184181
# Update the checkpoint state for this run.
185182
self.redundant_run_skip_handler.update_state(
@@ -337,10 +334,6 @@ def _fetch_upstream_lineages_for_tables(self) -> Iterable[UpstreamLineageEdge]:
337334
start_time_millis=int(self.start_time.timestamp() * 1000),
338335
end_time_millis=int(self.end_time.timestamp() * 1000),
339336
upstreams_deny_pattern=self.config.temporary_tables_pattern,
340-
# The self.config.include_view_lineage setting is about fetching upstreams of views.
341-
# We always generate lineage pointing at views from tables, even if self.config.include_view_lineage is False.
342-
# TODO: Remove this `include_view_lineage` flag, since it's effectively dead code.
343-
include_view_lineage=True,
344337
include_column_lineage=self.config.include_column_lineage,
345338
)
346339
try:

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py

-9
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,6 @@ def view_dependencies() -> str:
376376
def table_to_table_lineage_history_v2(
377377
start_time_millis: int,
378378
end_time_millis: int,
379-
include_view_lineage: bool = True,
380379
include_column_lineage: bool = True,
381380
upstreams_deny_pattern: List[str] = DEFAULT_TEMP_TABLES_PATTERNS,
382381
) -> str:
@@ -385,14 +384,12 @@ def table_to_table_lineage_history_v2(
385384
start_time_millis,
386385
end_time_millis,
387386
upstreams_deny_pattern,
388-
include_view_lineage,
389387
)
390388
else:
391389
return SnowflakeQuery.table_upstreams_only(
392390
start_time_millis,
393391
end_time_millis,
394392
upstreams_deny_pattern,
395-
include_view_lineage,
396393
)
397394

398395
@staticmethod
@@ -677,12 +674,9 @@ def table_upstreams_with_column_lineage(
677674
start_time_millis: int,
678675
end_time_millis: int,
679676
upstreams_deny_pattern: List[str],
680-
include_view_lineage: bool = True,
681677
) -> str:
682678
allowed_upstream_table_domains = (
683679
SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER
684-
if include_view_lineage
685-
else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER
686680
)
687681

688682
upstream_sql_filter = create_deny_regex_sql_filter(
@@ -847,12 +841,9 @@ def table_upstreams_only(
847841
start_time_millis: int,
848842
end_time_millis: int,
849843
upstreams_deny_pattern: List[str],
850-
include_view_lineage: bool = True,
851844
) -> str:
852845
allowed_upstream_table_domains = (
853846
SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER
854-
if include_view_lineage
855-
else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER
856847
)
857848

858849
upstream_sql_filter = create_deny_regex_sql_filter(

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py

+1-5
Original file line numberDiff line numberDiff line change
@@ -435,11 +435,7 @@ def _process_schema(
435435
)
436436

437437
if self.config.include_views:
438-
if (
439-
self.aggregator
440-
and self.config.include_view_lineage
441-
and self.config.parse_view_ddl
442-
):
438+
if self.aggregator:
443439
for view in views:
444440
view_identifier = self.identifiers.get_dataset_identifier(
445441
view.name, schema_name, db_name

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ def get_shares_workunits(
7272
assert len(sibling_dbs) == 1
7373
# SnowflakeLineageExtractor is unaware of database->schema->table hierarchy
7474
# hence this lineage code is not written in SnowflakeLineageExtractor
75-
# also this is not governed by configs include_table_lineage and include_view_lineage
75+
# also this is not governed by configs include_table_lineage
7676
yield self.get_upstream_lineage_with_primary_sibling(
7777
db.name, schema.name, table_name, sibling_dbs[0]
7878
)

metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@
8282
LINEAGE_EXTRACTION,
8383
METADATA_EXTRACTION,
8484
QUERIES_EXTRACTION,
85+
VIEW_PARSING,
8586
)
8687
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
8788
from datahub.utilities.registries.domain_registry import DomainRegistry
@@ -103,7 +104,7 @@
103104
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default")
104105
@capability(
105106
SourceCapability.LINEAGE_COARSE,
106-
"Enabled by default, can be disabled via configuration `include_table_lineage` and `include_view_lineage`",
107+
"Enabled by default, can be disabled via configuration `include_table_lineage`",
107108
)
108109
@capability(
109110
SourceCapability.LINEAGE_FINE,
@@ -512,8 +513,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
512513
discovered_datasets = discovered_tables + discovered_views
513514

514515
if self.config.use_queries_v2:
515-
self.report.set_ingestion_stage("*", "View Parsing")
516-
assert self.aggregator is not None
516+
self.report.set_ingestion_stage("*", VIEW_PARSING)
517517
yield from auto_workunit(self.aggregator.gen_metadata())
518518

519519
self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)
@@ -546,13 +546,19 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
546546
queries_extractor.close()
547547

548548
else:
549-
if self.config.include_table_lineage and self.lineage_extractor:
549+
if self.lineage_extractor:
550550
self.report.set_ingestion_stage("*", LINEAGE_EXTRACTION)
551-
yield from self.lineage_extractor.get_workunits(
551+
self.lineage_extractor.add_time_based_lineage_to_aggregator(
552552
discovered_tables=discovered_tables,
553553
discovered_views=discovered_views,
554554
)
555555

556+
for mcp in self.aggregator.gen_metadata():
557+
yield mcp.as_workunit()
558+
559+
if self.lineage_extractor:
560+
self.lineage_extractor.update_state()
561+
556562
if (
557563
self.config.include_usage_stats or self.config.include_operational_stats
558564
) and self.usage_extractor:

metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
USAGE_EXTRACTION_OPERATIONAL_STATS = "Usage Extraction Operational Stats"
1616
USAGE_EXTRACTION_USAGE_AGGREGATION = "Usage Extraction Usage Aggregation"
1717
EXTERNAL_TABLE_DDL_LINEAGE = "External table DDL Lineage"
18+
VIEW_PARSING = "View Parsing"
1819
QUERIES_EXTRACTION = "Queries Extraction"
1920
PROFILING = "Profiling"
2021

metadata-ingestion/tests/integration/snowflake/common.py

-2
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,6 @@ def default_query_results( # noqa: C901
458458
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
459459
start_time_millis=1654473600000,
460460
end_time_millis=1654586220000,
461-
include_view_lineage=True,
462461
include_column_lineage=True,
463462
),
464463
):
@@ -548,7 +547,6 @@ def default_query_results( # noqa: C901
548547
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
549548
start_time_millis=1654473600000,
550549
end_time_millis=1654586220000,
551-
include_view_lineage=True,
552550
include_column_lineage=False,
553551
),
554552
):

metadata-ingestion/tests/integration/snowflake/test_snowflake.py

-2
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
117117
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
118118
include_technical_schema=True,
119119
include_table_lineage=True,
120-
include_view_lineage=True,
121120
include_usage_stats=True,
122121
format_sql_queries=True,
123122
validate_upstreams_against_patterns=False,
@@ -216,7 +215,6 @@ def test_snowflake_private_link_and_incremental_mcps(
216215
include_table_lineage=True,
217216
include_column_lineage=False,
218217
include_views=True,
219-
include_view_lineage=True,
220218
include_usage_stats=False,
221219
format_sql_queries=True,
222220
incremental_lineage=False,

metadata-ingestion/tests/integration/snowflake/test_snowflake_classification.py

-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ def test_snowflake_classification_perf(num_workers, num_cols_per_table, num_tabl
6666
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
6767
include_technical_schema=True,
6868
include_table_lineage=False,
69-
include_view_lineage=False,
7069
include_column_lineage=False,
7170
include_usage_stats=False,
7271
include_operational_stats=False,

metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py

-2
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ def snowflake_pipeline_config(tmp_path):
4949
include_technical_schema=True,
5050
match_fully_qualified_names=True,
5151
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
52-
include_view_lineage=False,
5352
include_usage_stats=False,
5453
start_time=datetime(2022, 6, 6, 0, 0, 0, 0).replace(
5554
tzinfo=timezone.utc
@@ -227,7 +226,6 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure(
227226
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
228227
start_time_millis=1654473600000,
229228
end_time_millis=1654586220000,
230-
include_view_lineage=True,
231229
include_column_lineage=True,
232230
)
233231
],

metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py

-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ def test_snowflake_tag_pattern():
3030
),
3131
include_technical_schema=True,
3232
include_table_lineage=False,
33-
include_view_lineage=False,
3433
include_column_lineage=False,
3534
include_usage_stats=False,
3635
include_operational_stats=False,
@@ -74,7 +73,6 @@ def test_snowflake_tag_pattern_deny():
7473
),
7574
include_technical_schema=True,
7675
include_table_lineage=False,
77-
include_view_lineage=False,
7876
include_column_lineage=False,
7977
include_usage_stats=False,
8078
include_operational_stats=False,

metadata-ingestion/tests/performance/snowflake/test_snowflake.py

-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ def run_test():
3737
password="TST_PWD",
3838
include_technical_schema=False,
3939
include_table_lineage=True,
40-
include_view_lineage=True,
4140
include_usage_stats=True,
4241
include_operational_stats=True,
4342
start_time=datetime(2022, 6, 6, 0, 0, 0, 0).replace(tzinfo=timezone.utc),

metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py

-11
Original file line numberDiff line numberDiff line change
@@ -256,17 +256,6 @@ def test_options_contain_connect_args():
256256
assert connect_args is not None
257257

258258

259-
def test_snowflake_config_with_view_lineage_no_table_lineage_throws_error():
260-
config_dict = default_config_dict.copy()
261-
config_dict["include_view_lineage"] = True
262-
config_dict["include_table_lineage"] = False
263-
with pytest.raises(
264-
ValidationError,
265-
match="include_table_lineage must be True for include_view_lineage to be set",
266-
):
267-
SnowflakeV2Config.parse_obj(config_dict)
268-
269-
270259
def test_snowflake_config_with_column_lineage_no_table_lineage_throws_error():
271260
config_dict = default_config_dict.copy()
272261
config_dict["include_column_lineage"] = True

0 commit comments

Comments
 (0)