diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator_v2.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py similarity index 85% rename from metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator_v2.py rename to metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py index cb23c9244cd862..9b7b3efa87d1fa 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator_v2.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py @@ -5,15 +5,18 @@ import logging import pathlib import tempfile +import uuid from collections import defaultdict from datetime import datetime, timezone -from typing import Callable, Dict, Iterable, List, Optional, Set, cast +from typing import Callable, Dict, Iterable, List, Optional, Set, Union, cast +import datahub.emitter.mce_builder as builder import datahub.metadata.schema_classes as models from datahub.emitter.mce_builder import get_sys_time, make_ts_millis from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.emitter.sql_parsing_builder import compute_upstream_fields from datahub.ingestion.api.report import Report +from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.source.usage.usage_common import BaseUsageConfig, UsageAggregator from datahub.metadata.urns import ( @@ -32,7 +35,7 @@ infer_output_schema, sqlglot_lineage, ) -from datahub.sql_parsing.sqlglot_utils import generate_hash +from datahub.sql_parsing.sqlglot_utils import generate_hash, get_query_fingerprint from datahub.utilities.file_backed_collections import ( ConnectionWrapper, FileBackedDict, @@ -57,8 +60,6 @@ class QueryLogSetting(enum.Enum): @dataclasses.dataclass class ViewDefinition: - # TODO view urn? 
- view_definition: str default_db: Optional[str] = None default_schema: Optional[str] = None @@ -95,6 +96,18 @@ def make_last_modified_audit_stamp(self) -> models.AuditStampClass: ) +@dataclasses.dataclass +class KnownQueryLineageInfo: + query_text: str + + downstream: UrnStr + upstreams: List[UrnStr] + column_lineage: Optional[List[ColumnLineageInfo]] = None + + timestamp: Optional[datetime] = None + query_type: QueryType = QueryType.UNKNOWN + + @dataclasses.dataclass class SqlAggregatorReport(Report): _aggregator: "SqlParsingAggregator" @@ -103,12 +116,16 @@ class SqlAggregatorReport(Report): num_observed_queries: int = 0 num_observed_queries_failed: int = 0 num_observed_queries_column_failed: int = 0 - observed_query_parse_failures = LossyList[str]() + observed_query_parse_failures: LossyList[str] = dataclasses.field( + default_factory=LossyList + ) num_view_definitions: int = 0 num_views_failed: int = 0 num_views_column_failed: int = 0 - views_parse_failures = LossyDict[UrnStr, str]() + views_parse_failures: LossyDict[UrnStr, str] = dataclasses.field( + default_factory=LossyDict + ) num_queries_with_temp_tables_in_session: int = 0 @@ -142,8 +159,8 @@ def __init__( self, *, platform: str, - platform_instance: Optional[str], - env: str, + platform_instance: Optional[str] = None, + env: str = builder.DEFAULT_ENV, graph: Optional[DataHubGraph] = None, generate_lineage: bool = True, generate_queries: bool = True, @@ -246,7 +263,7 @@ def _need_schemas(self) -> bool: return self.generate_lineage or self.generate_usage_statistics def register_schema( - self, urn: DatasetUrn, schema: models.SchemaMetadataClass + self, urn: Union[str, DatasetUrn], schema: models.SchemaMetadataClass ) -> None: # If lineage or usage is enabled, adds the schema to the schema resolver # by putting the condition in here, we can avoid all the conditional @@ -255,6 +272,16 @@ def register_schema( if self._need_schemas: self._schema_resolver.add_schema_metadata(str(urn), schema) + def register_schemas_from_stream( + self, stream: Iterable[MetadataWorkUnit] + ) -> Iterable[MetadataWorkUnit]: + for wu in stream: + schema_metadata = wu.get_aspect_of_type(models.SchemaMetadataClass) + if schema_metadata: + self.register_schema(wu.get_urn(), schema_metadata) + + yield wu + def _initialize_schema_resolver_from_graph(self, graph: DataHubGraph) -> None: # requires a graph instance # if no schemas are currently registered in the schema resolver @@ -284,6 +311,96 @@ def _initialize_schema_resolver_from_graph(self, graph: DataHubGraph) -> None: env=self.env, ) + def add_known_query_lineage( + self, known_query_lineage: KnownQueryLineageInfo + ) -> None: + """Add a query and it's precomputed lineage to the aggregator. + + This is useful for cases where we have lineage information that was + computed outside of the SQL parsing aggregator, e.g. from a data + warehouse's system tables. + + This will also generate an operation aspect for the query if there is + a timestamp and the query type field is set to a mutation type. + + Args: + known_query_lineage: The known query lineage information. + """ + + # Generate a fingerprint for the query. + query_fingerprint = get_query_fingerprint( + known_query_lineage.query_text, self.platform.platform_name + ) + # TODO format the query text? + + # Register the query. 
+ self._add_to_query_map( + QueryMetadata( + query_id=query_fingerprint, + formatted_query_string=known_query_lineage.query_text, + session_id=_MISSING_SESSION_ID, + query_type=known_query_lineage.query_type, + lineage_type=models.DatasetLineageTypeClass.TRANSFORMED, + latest_timestamp=known_query_lineage.timestamp, + actor=None, + upstreams=known_query_lineage.upstreams, + column_lineage=known_query_lineage.column_lineage or [], + confidence_score=1.0, + ) + ) + + # Register the lineage. + self._lineage_map.for_mutation( + known_query_lineage.downstream, OrderedSet() + ).add(query_fingerprint) + + def add_known_lineage_mapping( + self, + upstream_urn: UrnStr, + downstream_urn: UrnStr, + lineage_type: str = models.DatasetLineageTypeClass.COPY, + ) -> None: + """Add a known lineage mapping to the aggregator. + + By mapping, we mean that the downstream is effectively a copy or + alias of the upstream. This is useful for things like external tables + (e.g. Redshift Spectrum, Redshift UNLOADs, Snowflake external tables). + + Because this method takes in urns, it does not require that the urns + are part of the platform that the aggregator is configured for. + + TODO: In the future, this method will also generate CLL if we have + schemas for either the upstream or downstream. + + The known lineage mapping does not contribute to usage statistics or operations. + + Args: + upstream_urn: The upstream dataset URN. + downstream_urn: The downstream dataset URN. + """ + + # We generate a fake "query" object to hold the lineage. + query_id = self._known_lineage_query_id() + + # Register the query. + self._add_to_query_map( + QueryMetadata( + query_id=query_id, + formatted_query_string="-skip-", + session_id=_MISSING_SESSION_ID, + query_type=QueryType.UNKNOWN, + lineage_type=lineage_type, + latest_timestamp=None, + actor=None, + upstreams=[upstream_urn], + column_lineage=[], + confidence_score=1.0, + ) + ) + + # Register the lineage. + self._lineage_map.for_mutation(downstream_urn, OrderedSet()).add(query_id) + def add_view_definition( self, view_urn: DatasetUrn, @@ -449,6 +566,10 @@ def _make_schema_resolver_for_session( def _process_view_definition( self, view_urn: UrnStr, view_definition: ViewDefinition ) -> None: + # Note that in some cases, the view definition will be a SELECT statement + # instead of a CREATE VIEW ... AS SELECT statement. In those cases, we can't + # trust the parsed query type or downstream urn. + # Run the SQL parser. parsed = self._run_sql_parser( view_definition.view_definition, @@ -464,10 +585,6 @@ def _process_view_definition( elif parsed.debug_info.error: self.report.num_views_column_failed += 1 - # Note that in some cases, the view definition will be a SELECT statement - # instead of a CREATE VIEW ... AS SELECT statement. In those cases, we can't - # trust the parsed query type or downstream urn. - query_fingerprint = self._view_query_id(view_urn) # Register the query. @@ -540,15 +657,6 @@ def _add_to_query_map(self, new: QueryMetadata) -> None: else: self._query_map[query_fingerprint] = new - """ - def add_lineage(self) -> None: - # A secondary mechanism for adding non-SQL-based lineage - # e.g. 
redshift external tables might use this when pointing at s3 - - # TODO Add this once we have a use case for it - pass - """ - def gen_metadata(self) -> Iterable[MetadataChangeProposalWrapper]: # diff from v1 - we generate operations here, and it also # generates MCPWs instead of workunits @@ -569,7 +677,7 @@ def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]: # Generate lineage and queries. queries_generated: Set[QueryId] = set() - for downstream_urn in self._lineage_map: + for downstream_urn in sorted(self._lineage_map): yield from self._gen_lineage_for_downstream( downstream_urn, queries_generated=queries_generated ) @@ -640,7 +748,9 @@ def _gen_lineage_for_downstream( dataset=upstream_urn, type=queries_map[query_id].lineage_type, query=( - self._query_urn(query_id) if self.generate_queries else None + self._query_urn(query_id) + if self.can_generate_query(query_id) + else None ), created=query.make_created_audit_stamp(), auditStamp=models.AuditStampClass( @@ -671,7 +781,9 @@ def _gen_lineage_for_downstream( SchemaFieldUrn(downstream_urn, downstream_column).urn() ], query=( - self._query_urn(query_id) if self.generate_queries else None + self._query_urn(query_id) + if self.can_generate_query(query_id) + else None ), confidenceScore=queries_map[query_id].confidence_score, ) @@ -682,9 +794,10 @@ def _gen_lineage_for_downstream( aspect=upstream_aspect, ) - if not self.generate_queries: - return for query_id in required_queries: + if not self.can_generate_query(query_id): + continue + # Avoid generating the same query twice. if query_id in queries_generated: continue @@ -696,6 +809,7 @@ def _gen_lineage_for_downstream( entityUrn=self._query_urn(query_id), aspects=[ models.QueryPropertiesClass( + dataPlatform=self.platform.urn(), statement=models.QueryStatementClass( value=query.formatted_query_string, language=models.QueryLanguageClass.SQL, @@ -729,6 +843,19 @@ def _composite_query_id(cls, composed_of_queries: Iterable[QueryId]) -> str: def _view_query_id(cls, view_urn: UrnStr) -> str: return f"view_{DatasetUrn.url_encode(view_urn)}" + @classmethod + def _known_lineage_query_id(cls) -> str: + return f"known_{uuid.uuid4()}" + + @classmethod + def _is_known_lineage_query_id(cls, query_id: QueryId) -> bool: + # Our query fingerprints are hex and won't have underscores, so this will + # never conflict with a real query fingerprint. 
+ return query_id.startswith("known_") + + def can_generate_query(self, query_id: QueryId) -> bool: + return self.generate_queries and not self._is_known_lineage_query_id(query_id) + def _resolve_query_with_temp_tables( self, base_query: QueryMetadata, @@ -895,8 +1022,10 @@ def _gen_operation_for_downstream( operationType=operation_type, lastUpdatedTimestamp=make_ts_millis(query.latest_timestamp), actor=query.actor.urn() if query.actor else None, - customProperties={ - "query_urn": self._query_urn(query_id), - }, + customProperties=( + {"query_urn": self._query_urn(query_id)} + if self.can_generate_query(query_id) + else None + ), ) yield MetadataChangeProposalWrapper(entityUrn=downstream_urn, aspect=aspect) diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py index 587394cc14646a..44337f1070140a 100644 --- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py +++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py @@ -1,8 +1,11 @@ import hashlib +import logging from typing import Dict, Iterable, Optional, Union import sqlglot +import sqlglot.errors +logger = logging.getLogger(__name__) DialectOrStr = Union[sqlglot.Dialect, str] @@ -139,10 +142,17 @@ def get_query_fingerprint( The fingerprint for the SQL query. """ - dialect = get_dialect(dialect) - expression_sql = generalize_query(expression, dialect=dialect) - fingerprint = generate_hash(expression_sql) + try: + dialect = get_dialect(dialect) + expression_sql = generalize_query(expression, dialect=dialect) + except (ValueError, sqlglot.errors.SqlglotError) as e: + if not isinstance(expression, str): + raise + logger.debug("Failed to generalize query for fingerprinting: %s", e) + expression_sql = expression + + fingerprint = generate_hash(expression_sql) return fingerprint diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json new file mode 100644 index 00000000000000..20bd08ce4c8235 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json @@ -0,0 +1,127 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 20000, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)" + ], + "confidenceScore": 1.0, + "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)" + ], + "confidenceScore": 1.0, + "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),c)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),c)" + ], + "confidenceScore": 1.0, + "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "insert into foo (a, b, c) select a, b, c from bar", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataPlatform": "urn:li:dataPlatform:redshift" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)" + } + ] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1707182625000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "operationType": "INSERT", + "customProperties": { + "query_urn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae" + }, + "lastUpdatedTimestamp": 20000 + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_aggregate_operations.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_aggregate_operations.json index 551760b42394cb..25e75317096df6 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_aggregate_operations.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_aggregate_operations.json @@ -13,9 +13,6 @@ }, "actor": "urn:li:corpuser:user2", "operationType": "CREATE", - "customProperties": { - "query_urn": "urn:li:query:cbdb3e148ea7fdae81815da4dd64f57873fb9c3d7d4bfad4e83b3d1ebd3c45c2" - }, "lastUpdatedTimestamp": 25000 } } @@ -34,9 +31,6 @@ }, "actor": "urn:li:corpuser:user3", "operationType": "CREATE", - "customProperties": { - "query_urn": "urn:li:query:7fd78ed5f3d60f7f91206f5e0fea6851a2afe940944455fd292267613b7ee1e6" - }, "lastUpdatedTimestamp": 26000 } } diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json index 5eaeb4e9839254..9bc67f0edfee7c 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json @@ -67,9 +67,10 @@ "actor": 
"urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 0, + "time": 1707182625000, "actor": "urn:li:corpuser:_ingestion" - } + }, + "dataPlatform": "urn:li:dataPlatform:redshift" } } }, diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json new file mode 100644 index 00000000000000..303641c1503553 --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json @@ -0,0 +1,149 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)", + "type": "TRANSFORMED", + "query": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),c)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),c)" + ], + "confidenceScore": 0.2, + "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "/* query 2 */ insert into foo (a, b) select a, b from bar", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataPlatform": "urn:li:dataPlatform:redshift" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)" + } + ] + } + } +}, +{ + "entityType": "query", + "entityUrn": 
"urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae", + "changeType": "UPSERT", + "aspectName": "queryProperties", + "aspect": { + "json": { + "statement": { + "value": "/* query 1 */ insert into foo (a, b, c) select a, b, c from bar", + "language": "SQL" + }, + "source": "SYSTEM", + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "lastModified": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataPlatform": "urn:li:dataPlatform:redshift" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)" + } + ] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json new file mode 100644 index 00000000000000..ab210c6f701b3f --- /dev/null +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json @@ -0,0 +1,77 @@ +[ +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket1/key1,PROD)", + "type": "COPY" + } + ], + "fineGrainedLineages": [] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)", + "type": "COPY" + } + ], + "fineGrainedLineages": [] + } + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket2/key2,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1707182625000, + "actor": "urn:li:corpuser:_ingestion" + }, + "created": { + "time": 0, + "actor": "urn:li:corpuser:_ingestion" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", + "type": "COPY" + } + ], + "fineGrainedLineages": [] + } + } +} +] \ No newline at end of file diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json index 27bd757c267b73..2e5404992245ce 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json @@ -89,30 +89,31 @@ }, { "entityType": "query", - "entityUrn": 
"urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1", + "entityUrn": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { "json": { "statement": { - "value": "insert into downstream (a, b) select a, b from upstream1", + "value": "insert into downstream (a, c) select a, c from upstream2", "language": "SQL" }, "source": "SYSTEM", "created": { - "time": 0, + "time": 25000, "actor": "urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 0, + "time": 25000, "actor": "urn:li:corpuser:_ingestion" - } + }, + "dataPlatform": "urn:li:dataPlatform:redshift" } } }, { "entityType": "query", - "entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1", + "entityUrn": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -122,7 +123,7 @@ "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD)" }, { - "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD)" + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD)" } ] } @@ -130,30 +131,31 @@ }, { "entityType": "query", - "entityUrn": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e", + "entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { "json": { "statement": { - "value": "insert into downstream (a, c) select a, c from upstream2", + "value": "insert into downstream (a, b) select a, b from upstream1", "language": "SQL" }, "source": "SYSTEM", "created": { - "time": 0, + "time": 20000, "actor": "urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 0, + "time": 20000, "actor": "urn:li:corpuser:_ingestion" - } + }, + "dataPlatform": "urn:li:dataPlatform:redshift" } } }, { "entityType": "query", - "entityUrn": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e", + "entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { @@ -163,7 +165,7 @@ "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD)" }, { - "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD)" + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD)" } ] } diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json index 31a37d6237e7b9..c4a20610d344a3 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json @@ -1,7 +1,7 @@ [ { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", "changeType": "UPSERT", "aspectName": "upstreamLineage", "aspect": { @@ -16,24 +16,49 @@ "time": 0, "actor": "urn:li:corpuser:_ingestion" }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", + "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)", "type": "TRANSFORMED", - "query": 
"urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71" + "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1" } ], - "fineGrainedLineages": [] + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)" + ], + "confidenceScore": 0.35, + "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1" + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)" + ], + "confidenceScore": 0.35, + "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1" + } + ] } } }, { "entityType": "query", - "entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71", + "entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { "json": { "statement": { - "value": "create table foo_session3 as select * from foo", + "value": "create table foo as select a, 2*b as b from bar", "language": "SQL" }, "source": "SYSTEM", @@ -42,25 +67,26 @@ "actor": "urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 0, + "time": 1707182625000, "actor": "urn:li:corpuser:_ingestion" - } + }, + "dataPlatform": "urn:li:dataPlatform:redshift" } } }, { "entityType": "query", - "entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71", + "entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { "json": { "subjects": [ { - "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)" + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)" }, { - "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)" + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)" } ] } @@ -135,15 +161,34 @@ "actor": "urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 1707251710392, + "time": 1707182625000, "actor": "urn:li:corpuser:_ingestion" - } + }, + "dataPlatform": "urn:li:dataPlatform:redshift" + } + } +}, +{ + "entityType": "query", + "entityUrn": "urn:li:query:composite_66ddf44283e4543440529f1d13b82221b5d60635b6a8c39751718049ce4f47ec", + "changeType": "UPSERT", + "aspectName": "querySubjects", + "aspect": { + "json": { + "subjects": [ + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session2,PROD)" + }, + { + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)" + } + ] } } }, { "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)", "changeType": "UPSERT", "aspectName": "upstreamLineage", "aspect": { @@ -158,67 +203,24 @@ "time": 0, "actor": "urn:li:corpuser:_ingestion" }, - "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)", + "dataset": 
"urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)", "type": "TRANSFORMED", - "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1" + "query": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71" } ], - "fineGrainedLineages": [ - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)" - ], - "confidenceScore": 0.35, - "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1" - }, - { - "upstreamType": "FIELD_SET", - "upstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)" - ], - "downstreamType": "FIELD", - "downstreams": [ - "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)" - ], - "confidenceScore": 0.35, - "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1" - } - ] - } - } -}, -{ - "entityType": "query", - "entityUrn": "urn:li:query:composite_66ddf44283e4543440529f1d13b82221b5d60635b6a8c39751718049ce4f47ec", - "changeType": "UPSERT", - "aspectName": "querySubjects", - "aspect": { - "json": { - "subjects": [ - { - "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session2,PROD)" - }, - { - "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)" - } - ] + "fineGrainedLineages": [] } } }, { "entityType": "query", - "entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1", + "entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71", "changeType": "UPSERT", "aspectName": "queryProperties", "aspect": { "json": { "statement": { - "value": "create table foo as select a, 2*b as b from bar", + "value": "create table foo_session3 as select * from foo", "language": "SQL" }, "source": "SYSTEM", @@ -227,25 +229,26 @@ "actor": "urn:li:corpuser:_ingestion" }, "lastModified": { - "time": 0, + "time": 1707182625000, "actor": "urn:li:corpuser:_ingestion" - } + }, + "dataPlatform": "urn:li:dataPlatform:redshift" } } }, { "entityType": "query", - "entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1", + "entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71", "changeType": "UPSERT", "aspectName": "querySubjects", "aspect": { "json": { "subjects": [ { - "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)" + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)" }, { - "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)" + "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)" } ] } diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json index 3f8fa7e5a1e282..872b5a41d4288c 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json +++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json @@ -69,7 +69,8 @@ "lastModified": { "time": 1707182625000, "actor": "urn:li:corpuser:_ingestion" - } + }, + "dataPlatform": "urn:li:dataPlatform:redshift" } } }, diff --git 
a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py index 8b0318664ea05f..5b51266c692b76 100644 --- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py +++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py @@ -4,12 +4,14 @@ import pytest from freezegun import freeze_time -import datahub.emitter.mce_builder as builder from datahub.metadata.urns import CorpUserUrn, DatasetUrn -from datahub.sql_parsing.sql_parsing_aggregator_v2 import ( +from datahub.sql_parsing.sql_parsing_aggregator import ( + KnownQueryLineageInfo, QueryLogSetting, SqlParsingAggregator, ) +from datahub.sql_parsing.sql_parsing_common import QueryType +from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo, ColumnRef from tests.test_helpers import mce_helpers RESOURCE_DIR = pathlib.Path(__file__).parent / "aggregator_goldens" @@ -24,8 +26,6 @@ def _ts(ts: int) -> datetime: def test_basic_lineage(pytestconfig: pytest.Config) -> None: aggregator = SqlParsingAggregator( platform="redshift", - platform_instance=None, - env=builder.DEFAULT_ENV, generate_lineage=True, generate_usage_statistics=False, generate_operations=False, @@ -50,8 +50,6 @@ def test_basic_lineage(pytestconfig: pytest.Config) -> None: def test_overlapping_inserts(pytestconfig: pytest.Config) -> None: aggregator = SqlParsingAggregator( platform="redshift", - platform_instance=None, - env=builder.DEFAULT_ENV, generate_lineage=True, generate_usage_statistics=False, generate_operations=False, @@ -83,8 +81,6 @@ def test_overlapping_inserts(pytestconfig: pytest.Config) -> None: def test_temp_table(pytestconfig: pytest.Config) -> None: aggregator = SqlParsingAggregator( platform="redshift", - platform_instance=None, - env=builder.DEFAULT_ENV, generate_lineage=True, generate_usage_statistics=False, generate_operations=False, @@ -136,8 +132,6 @@ def test_temp_table(pytestconfig: pytest.Config) -> None: def test_aggregate_operations(pytestconfig: pytest.Config) -> None: aggregator = SqlParsingAggregator( platform="redshift", - platform_instance=None, - env=builder.DEFAULT_ENV, generate_lineage=False, generate_queries=False, generate_usage_statistics=False, @@ -181,8 +175,6 @@ def test_aggregate_operations(pytestconfig: pytest.Config) -> None: def test_view_lineage(pytestconfig: pytest.Config) -> None: aggregator = SqlParsingAggregator( platform="redshift", - platform_instance=None, - env=builder.DEFAULT_ENV, generate_lineage=True, generate_usage_statistics=False, generate_operations=False, @@ -215,3 +207,113 @@ def test_view_lineage(pytestconfig: pytest.Config) -> None: outputs=mcps, golden_path=RESOURCE_DIR / "test_view_lineage.json", ) + + +@freeze_time(FROZEN_TIME) +def test_known_lineage_mapping(pytestconfig: pytest.Config) -> None: + aggregator = SqlParsingAggregator( + platform="redshift", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + ) + + aggregator.add_known_lineage_mapping( + upstream_urn=DatasetUrn("redshift", "dev.public.bar").urn(), + downstream_urn=DatasetUrn("redshift", "dev.public.foo").urn(), + ) + aggregator.add_known_lineage_mapping( + upstream_urn=DatasetUrn("s3", "bucket1/key1").urn(), + downstream_urn=DatasetUrn("redshift", "dev.public.bar").urn(), + ) + aggregator.add_known_lineage_mapping( + upstream_urn=DatasetUrn("redshift", "dev.public.foo").urn(), + downstream_urn=DatasetUrn("s3", "bucket2/key2").urn(), + ) + + mcps = list(aggregator.gen_metadata()) + + 
mce_helpers.check_goldens_stream( + pytestconfig, + outputs=mcps, + golden_path=RESOURCE_DIR / "test_known_lineage_mapping.json", + ) + + +@freeze_time(FROZEN_TIME) +def test_column_lineage_deduplication(pytestconfig: pytest.Config) -> None: + aggregator = SqlParsingAggregator( + platform="redshift", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=False, + ) + + aggregator.add_observed_query( + query="/* query 1 */ insert into foo (a, b, c) select a, b, c from bar", + default_db="dev", + default_schema="public", + ) + aggregator.add_observed_query( + query="/* query 2 */ insert into foo (a, b) select a, b from bar", + default_db="dev", + default_schema="public", + ) + + mcps = list(aggregator.gen_metadata()) + + # In this case, the lineage for a and b is attributed to query 2, and + # the lineage for c is attributed to query 1. Note that query 1 does + # not get any credit for a and b, as they are already covered by query 2, + # which came later and hence has higher precedence. + + mce_helpers.check_goldens_stream( + pytestconfig, + outputs=mcps, + golden_path=RESOURCE_DIR / "test_column_lineage_deduplication.json", + ) + + +@freeze_time(FROZEN_TIME) +def test_add_known_query_lineage(pytestconfig: pytest.Config) -> None: + aggregator = SqlParsingAggregator( + platform="redshift", + generate_lineage=True, + generate_usage_statistics=False, + generate_operations=True, + ) + + downstream_urn = DatasetUrn("redshift", "dev.public.foo").urn() + upstream_urn = DatasetUrn("redshift", "dev.public.bar").urn() + + known_query_lineage = KnownQueryLineageInfo( + query_text="insert into foo (a, b, c) select a, b, c from bar", + downstream=downstream_urn, + upstreams=[upstream_urn], + column_lineage=[ + ColumnLineageInfo( + downstream=ColumnRef(table=downstream_urn, column="a"), + upstreams=[ColumnRef(table=upstream_urn, column="a")], + ), + ColumnLineageInfo( + downstream=ColumnRef(table=downstream_urn, column="b"), + upstreams=[ColumnRef(table=upstream_urn, column="b")], + ), + ColumnLineageInfo( + downstream=ColumnRef(table=downstream_urn, column="c"), + upstreams=[ColumnRef(table=upstream_urn, column="c")], + ), + ], + timestamp=_ts(20), + query_type=QueryType.INSERT, + ) + + aggregator.add_known_query_lineage(known_query_lineage) + + mcps = list(aggregator.gen_metadata()) + + mce_helpers.check_goldens_stream( + pytestconfig, + outputs=mcps, + golden_path=RESOURCE_DIR / "test_add_known_query_lineage.json", + )
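
Usage sketches for the new aggregator APIs follow; they mirror the unit tests above and are illustrative rather than part of the patch. First, `add_known_query_lineage`: the URNs, column names, and timestamp below are made up, and the aggregator fingerprints the query text itself, emitting upstreamLineage, queryProperties, and querySubjects aspects, plus an operation aspect when a timestamp and a mutation query type are supplied.

from datetime import datetime, timezone

from datahub.metadata.urns import DatasetUrn
from datahub.sql_parsing.sql_parsing_aggregator import (
    KnownQueryLineageInfo,
    SqlParsingAggregator,
)
from datahub.sql_parsing.sql_parsing_common import QueryType
from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo, ColumnRef

aggregator = SqlParsingAggregator(
    platform="redshift",
    generate_lineage=True,
    generate_usage_statistics=False,
    generate_operations=True,
)

downstream = DatasetUrn("redshift", "dev.public.foo").urn()
upstream = DatasetUrn("redshift", "dev.public.bar").urn()

aggregator.add_known_query_lineage(
    KnownQueryLineageInfo(
        query_text="insert into foo (a, b, c) select a, b, c from bar",
        downstream=downstream,
        upstreams=[upstream],
        column_lineage=[
            ColumnLineageInfo(
                downstream=ColumnRef(table=downstream, column="a"),
                upstreams=[ColumnRef(table=upstream, column="a")],
            ),
        ],
        timestamp=datetime(2024, 2, 6, tzinfo=timezone.utc),
        query_type=QueryType.INSERT,
    )
)

# Yields MCPWs for dataset lineage, the query entity, and an INSERT operation.
mcps = list(aggregator.gen_metadata())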
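
Next, `add_known_lineage_mapping` for copy-style lineage (external tables, UNLOADs), matching test_known_lineage_mapping. The URNs are illustrative; because the method takes raw URNs, the upstream and downstream need not belong to the aggregator's configured platform.

from datahub.metadata.urns import DatasetUrn
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator

aggregator = SqlParsingAggregator(
    platform="redshift",
    generate_lineage=True,
    generate_usage_statistics=False,
    generate_operations=False,
)

# External table: S3 data backs a Redshift table.
aggregator.add_known_lineage_mapping(
    upstream_urn=DatasetUrn("s3", "bucket1/key1").urn(),
    downstream_urn=DatasetUrn("redshift", "dev.public.bar").urn(),
)

# UNLOAD-style copy: Redshift table exported to S3.
aggregator.add_known_lineage_mapping(
    upstream_urn=DatasetUrn("redshift", "dev.public.foo").urn(),
    downstream_urn=DatasetUrn("s3", "bucket2/key2").urn(),
)

# Each mapping becomes a COPY-typed upstreamLineage edge. The backing query id
# starts with "known_", so can_generate_query() returns False and no query
# entity or query_urn custom property is produced for these edges.
mcps = list(aggregator.gen_metadata())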
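
`register_schemas_from_stream` is a pass-through helper: it inspects each work unit for a SchemaMetadataClass aspect, registers it with the aggregator's schema resolver, and yields the work unit unchanged. A hedged sketch of how a source might thread its own stream through it; `produce_workunits` is a hypothetical stand-in for a source's existing work-unit generator.

from typing import Iterable

from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator

def get_workunits(
    aggregator: SqlParsingAggregator,
    produce_workunits: Iterable[MetadataWorkUnit],  # hypothetical upstream stream
) -> Iterable[MetadataWorkUnit]:
    # Schemas are registered as a side effect; the stream itself passes through
    # unchanged, so this can wrap an existing source's output.
    yield from aggregator.register_schemas_from_stream(produce_workunits)

    # With schemas registered, parsed queries can resolve column-level lineage.
    for mcp in aggregator.gen_metadata():
        yield mcp.as_workunit()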
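
Finally, the sqlglot_utils change makes `get_query_fingerprint` tolerant of statements that fail dialect-aware generalization: when the input is a raw string and generalization raises, the string itself is hashed instead of propagating the error. A small illustration with made-up query text:

from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint

# Normal path: the query is parsed, generalized for the dialect, then hashed.
fp = get_query_fingerprint(
    "insert into foo (a, b, c) select a, b, c from bar", "redshift"
)

# A statement like this may or may not generalize cleanly; either way a stable
# fingerprint is returned, so callers such as add_known_query_lineage get a
# usable query id rather than an exception.
fp2 = get_query_fingerprint(
    "UNLOAD ('select * from foo') TO 's3://bucket/key'", "redshift"
)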