From 64cbd41f909bbdf240e7e64abab9eb33b9133edf Mon Sep 17 00:00:00 2001
From: Harshal Sheth <hsheth2@gmail.com>
Date: Tue, 13 Feb 2024 22:24:37 -0800
Subject: [PATCH 1/6] remove v2 suffix from aggregator filename

---
 ...ggregator_v2.py => sql_parsing_aggregator.py} | 16 ++++++++++++++--
 .../unit/sql_parsing/test_sql_aggregator.py      |  2 +-
 2 files changed, 15 insertions(+), 3 deletions(-)
 rename metadata-ingestion/src/datahub/sql_parsing/{sql_parsing_aggregator_v2.py => sql_parsing_aggregator.py} (98%)

diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator_v2.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
similarity index 98%
rename from metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator_v2.py
rename to metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
index cb23c9244cd86..2384cd3c9d83c 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator_v2.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -7,13 +7,14 @@
 import tempfile
 from collections import defaultdict
 from datetime import datetime, timezone
-from typing import Callable, Dict, Iterable, List, Optional, Set, cast
+from typing import Callable, Dict, Iterable, List, Optional, Set, Union, cast
 
 import datahub.metadata.schema_classes as models
 from datahub.emitter.mce_builder import get_sys_time, make_ts_millis
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
 from datahub.emitter.sql_parsing_builder import compute_upstream_fields
 from datahub.ingestion.api.report import Report
+from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.graph.client import DataHubGraph
 from datahub.ingestion.source.usage.usage_common import BaseUsageConfig, UsageAggregator
 from datahub.metadata.urns import (
@@ -246,7 +247,7 @@ def _need_schemas(self) -> bool:
         return self.generate_lineage or self.generate_usage_statistics
 
     def register_schema(
-        self, urn: DatasetUrn, schema: models.SchemaMetadataClass
+        self, urn: Union[str, DatasetUrn], schema: models.SchemaMetadataClass
     ) -> None:
         # If lineage or usage is enabled, adds the schema to the schema resolver
         # by putting the condition in here, we can avoid all the conditional
@@ -255,6 +256,16 @@ def register_schema(
         if self._need_schemas:
             self._schema_resolver.add_schema_metadata(str(urn), schema)
 
+    def register_schemas_from_stream(
+        self, stream: Iterable[MetadataWorkUnit]
+    ) -> Iterable[MetadataWorkUnit]:
+        for wu in stream:
+            schema_metadata = wu.get_aspect_of_type(models.SchemaMetadataClass)
+            if schema_metadata:
+                self.register_schema(wu.get_urn(), schema_metadata)
+
+            yield wu
+
     def _initialize_schema_resolver_from_graph(self, graph: DataHubGraph) -> None:
         # requires a graph instance
         # if no schemas are currently registered in the schema resolver
@@ -696,6 +707,7 @@ def _gen_lineage_for_downstream(
                 entityUrn=self._query_urn(query_id),
                 aspects=[
                     models.QueryPropertiesClass(
+                        dataPlatform=self.platform.urn(),
                         statement=models.QueryStatementClass(
                             value=query.formatted_query_string,
                             language=models.QueryLanguageClass.SQL,
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
index 8b0318664ea05..ddc1b4c0c2a60 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -6,7 +6,7 @@
 
 import datahub.emitter.mce_builder as builder
 from datahub.metadata.urns import CorpUserUrn, DatasetUrn
-from datahub.sql_parsing.sql_parsing_aggregator_v2 import (
+from datahub.sql_parsing.sql_parsing_aggregator import (
     QueryLogSetting,
     SqlParsingAggregator,
 )
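
Usage sketch for the new register_schemas_from_stream helper (illustrative only; MySource, self.aggregator, and _produce_workunits are hypothetical names, not part of this patch): a source can wrap its workunit stream so that any SchemaMetadata aspects flowing through are registered with the aggregator before the workunits are emitted downstream.

    from typing import Iterable

    from datahub.ingestion.api.workunit import MetadataWorkUnit
    from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator

    class MySource:  # hypothetical source
        def __init__(self, aggregator: SqlParsingAggregator) -> None:
            self.aggregator = aggregator

        def get_workunits(self) -> Iterable[MetadataWorkUnit]:
            # Register schema aspects as they stream past; each workunit is
            # yielded through unchanged.
            yield from self.aggregator.register_schemas_from_stream(
                self._produce_workunits()
            )

        def _produce_workunits(self) -> Iterable[MetadataWorkUnit]:
            ...  # source-specific workunit generation (hypothetical)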

From ca5e283fb150bd3c0f5adca719d232860d435934 Mon Sep 17 00:00:00 2001
From: Harshal Sheth <hsheth2@gmail.com>
Date: Wed, 14 Feb 2024 15:19:37 -0800
Subject: [PATCH 2/6] Add data platform urn to queries

---
 .../aggregator_goldens/test_basic_lineage.json    |  5 +++--
 .../test_overlapping_inserts.json                 | 14 ++++++++------
 .../aggregator_goldens/test_temp_table.json       | 15 +++++++++------
 .../aggregator_goldens/test_view_lineage.json     |  3 ++-
 4 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json
index 5eaeb4e983925..9bc67f0edfee7 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_basic_lineage.json
@@ -67,9 +67,10 @@
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 0,
+                "time": 1707182625000,
                 "actor": "urn:li:corpuser:_ingestion"
-            }
+            },
+            "dataPlatform": "urn:li:dataPlatform:redshift"
         }
     }
 },
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json
index 27bd757c267b7..3af93226eced9 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json
@@ -100,13 +100,14 @@
             },
             "source": "SYSTEM",
             "created": {
-                "time": 0,
+                "time": 20000,
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 0,
+                "time": 20000,
                 "actor": "urn:li:corpuser:_ingestion"
-            }
+            },
+            "dataPlatform": "urn:li:dataPlatform:redshift"
         }
     }
 },
@@ -141,13 +142,14 @@
             },
             "source": "SYSTEM",
             "created": {
-                "time": 0,
+                "time": 25000,
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 0,
+                "time": 25000,
                 "actor": "urn:li:corpuser:_ingestion"
-            }
+            },
+            "dataPlatform": "urn:li:dataPlatform:redshift"
         }
     }
 },
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json
index 31a37d6237e7b..2c485a5ea945f 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json
@@ -42,9 +42,10 @@
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 0,
+                "time": 1707182625000,
                 "actor": "urn:li:corpuser:_ingestion"
-            }
+            },
+            "dataPlatform": "urn:li:dataPlatform:redshift"
         }
     }
 },
@@ -135,9 +136,10 @@
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 1707251710392,
+                "time": 1707182625000,
                 "actor": "urn:li:corpuser:_ingestion"
-            }
+            },
+            "dataPlatform": "urn:li:dataPlatform:redshift"
         }
     }
 },
@@ -227,9 +229,10 @@
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 0,
+                "time": 1707182625000,
                 "actor": "urn:li:corpuser:_ingestion"
-            }
+            },
+            "dataPlatform": "urn:li:dataPlatform:redshift"
         }
     }
 },
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json
index 3f8fa7e5a1e28..872b5a41d4288 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_view_lineage.json
@@ -69,7 +69,8 @@
             "lastModified": {
                 "time": 1707182625000,
                 "actor": "urn:li:corpuser:_ingestion"
-            }
+            },
+            "dataPlatform": "urn:li:dataPlatform:redshift"
         }
     }
 },
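
The golden updates above reflect that query entities now carry the data platform they ran on. A minimal sketch of the aspect shape, assuming the generated schema classes used elsewhere in this series (field values are illustrative; the aggregator populates dataPlatform from self.platform.urn()):

    import datahub.metadata.schema_classes as models

    query_properties = models.QueryPropertiesClass(
        dataPlatform="urn:li:dataPlatform:redshift",
        statement=models.QueryStatementClass(
            value="create table foo as select a, 2*b as b from bar",
            language=models.QueryLanguageClass.SQL,
        ),
        source=models.QuerySourceClass.SYSTEM,
        created=models.AuditStampClass(time=0, actor="urn:li:corpuser:_ingestion"),
        lastModified=models.AuditStampClass(
            time=1707182625000, actor="urn:li:corpuser:_ingestion"
        ),
    )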

From 11771f3b877f463b2aec7284539f57d7c4660e1f Mon Sep 17 00:00:00 2001
From: Harshal Sheth <hsheth2@gmail.com>
Date: Wed, 14 Feb 2024 15:29:34 -0800
Subject: [PATCH 3/6] add_known_lineage_mapping

---
 .../sql_parsing/sql_parsing_aggregator.py     | 96 +++++++++++++++----
 .../test_known_lineage_mapping.json           | 77 +++++++++++++++
 .../unit/sql_parsing/test_sql_aggregator.py   | 33 +++++++
 3 files changed, 187 insertions(+), 19 deletions(-)
 create mode 100644 metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json

diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
index 2384cd3c9d83c..06d6eadebd5a8 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -5,6 +5,7 @@
 import logging
 import pathlib
 import tempfile
+import uuid
 from collections import defaultdict
 from datetime import datetime, timezone
 from typing import Callable, Dict, Iterable, List, Optional, Set, Union, cast
@@ -58,8 +59,6 @@ class QueryLogSetting(enum.Enum):
 
 @dataclasses.dataclass
 class ViewDefinition:
-    # TODO view urn?
-
     view_definition: str
     default_db: Optional[str] = None
     default_schema: Optional[str] = None
@@ -295,6 +294,56 @@ def _initialize_schema_resolver_from_graph(self, graph: DataHubGraph) -> None:
             env=self.env,
         )
 
+    def add_known_query_lineage(self, TODO) -> None:
+        pass
+
+    def add_known_lineage_mapping(
+        self,
+        upstream_urn: UrnStr,
+        downstream_urn: UrnStr,
+        lineage_type: str = models.DatasetLineageTypeClass.COPY,
+    ) -> None:
+        """Add a known lineage mapping to the aggregator.
+
+        By mapping, we mean that the downstream is effectively a copy or
+        alias of the upstream. This is useful for things like external tables
+        (e.g. Redshift Spectrum, Redshift UNLOADs, Snowflake external tables).
+
+        Because this method takes URNs directly, the datasets involved do not
+        need to belong to the platform that the aggregator is configured for.
+
+        TODO: In the future, this method will also generate CLL if we have
+        schemas for either the upstream or downstream.
+
+        The known lineage mapping does not contribute to usage statistics or operations.
+
+        Args:
+            upstream_urn: The upstream dataset URN.
+            downstream_urn: The downstream dataset URN.
+        """
+
+        # We generate a fake "query" object to hold the lineage.
+        query_id = self._known_lineage_query_id()
+
+        # Register the query.
+        self._add_to_query_map(
+            QueryMetadata(
+                query_id=query_id,
+                formatted_query_string="-skip-",
+                session_id=_MISSING_SESSION_ID,
+                query_type=QueryType.UNKNOWN,
+                lineage_type=lineage_type,
+                latest_timestamp=None,
+                actor=None,
+                upstreams=[upstream_urn],
+                column_lineage=[],
+                confidence_score=1.0,
+            )
+        )
+
+        # Register the lineage.
+        self._lineage_map.for_mutation(downstream_urn, OrderedSet()).add(query_id)
+
     def add_view_definition(
         self,
         view_urn: DatasetUrn,
@@ -460,6 +509,10 @@ def _make_schema_resolver_for_session(
     def _process_view_definition(
         self, view_urn: UrnStr, view_definition: ViewDefinition
     ) -> None:
+        # Note that in some cases, the view definition will be a SELECT statement
+        # instead of a CREATE VIEW ... AS SELECT statement. In those cases, we can't
+        # trust the parsed query type or downstream urn.
+
         # Run the SQL parser.
         parsed = self._run_sql_parser(
             view_definition.view_definition,
@@ -475,10 +528,6 @@ def _process_view_definition(
         elif parsed.debug_info.error:
             self.report.num_views_column_failed += 1
 
-        # Note that in some cases, the view definition will be a SELECT statement
-        # instead of a CREATE VIEW ... AS SELECT statement. In those cases, we can't
-        # trust the parsed query type or downstream urn.
-
         query_fingerprint = self._view_query_id(view_urn)
 
         # Register the query.
@@ -551,15 +600,6 @@ def _add_to_query_map(self, new: QueryMetadata) -> None:
         else:
             self._query_map[query_fingerprint] = new
 
-    """
-    def add_lineage(self) -> None:
-        # A secondary mechanism for adding non-SQL-based lineage
-        # e.g. redshift external tables might use this when pointing at s3
-
-        # TODO Add this once we have a use case for it
-        pass
-    """
-
     def gen_metadata(self) -> Iterable[MetadataChangeProposalWrapper]:
         # diff from v1 - we generate operations here, and it also
         # generates MCPWs instead of workunits
@@ -651,7 +691,9 @@ def _gen_lineage_for_downstream(
                     dataset=upstream_urn,
                     type=queries_map[query_id].lineage_type,
                     query=(
-                        self._query_urn(query_id) if self.generate_queries else None
+                        self._query_urn(query_id)
+                        if self.can_generate_query(query_id)
+                        else None
                     ),
                     created=query.make_created_audit_stamp(),
                     auditStamp=models.AuditStampClass(
@@ -682,7 +724,9 @@ def _gen_lineage_for_downstream(
                             SchemaFieldUrn(downstream_urn, downstream_column).urn()
                         ],
                         query=(
-                            self._query_urn(query_id) if self.generate_queries else None
+                            self._query_urn(query_id)
+                            if self.can_generate_query(query_id)
+                            else None
                         ),
                         confidenceScore=queries_map[query_id].confidence_score,
                     )
@@ -693,9 +737,10 @@ def _gen_lineage_for_downstream(
             aspect=upstream_aspect,
         )
 
-        if not self.generate_queries:
-            return
         for query_id in required_queries:
+            if not self.can_generate_query(query_id):
+                continue
+
             # Avoid generating the same query twice.
             if query_id in queries_generated:
                 continue
@@ -741,6 +786,19 @@ def _composite_query_id(cls, composed_of_queries: Iterable[QueryId]) -> str:
     def _view_query_id(cls, view_urn: UrnStr) -> str:
         return f"view_{DatasetUrn.url_encode(view_urn)}"
 
+    @classmethod
+    def _known_lineage_query_id(cls) -> str:
+        return f"known_{uuid.uuid4()}"
+
+    @classmethod
+    def _is_known_lineage_query_id(cls, query_id: QueryId) -> bool:
+        # Our query fingerprints are hex and won't have underscores, so this will
+        # never conflict with a real query fingerprint.
+        return query_id.startswith("known_")
+
+    def can_generate_query(self, query_id: QueryId) -> bool:
+        return self.generate_queries and not self._is_known_lineage_query_id(query_id)
+
     def _resolve_query_with_temp_tables(
         self,
         base_query: QueryMetadata,
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json
new file mode 100644
index 0000000000000..5a4fb96618f6f
--- /dev/null
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json
@@ -0,0 +1,77 @@
+[
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "upstreamLineage",
+    "aspect": {
+        "json": {
+            "upstreams": [
+                {
+                    "auditStamp": {
+                        "time": 1707182625000,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "created": {
+                        "time": 0,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
+                    "type": "COPY"
+                }
+            ],
+            "fineGrainedLineages": []
+        }
+    }
+},
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket2/key2,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "upstreamLineage",
+    "aspect": {
+        "json": {
+            "upstreams": [
+                {
+                    "auditStamp": {
+                        "time": 1707182625000,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "created": {
+                        "time": 0,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
+                    "type": "COPY"
+                }
+            ],
+            "fineGrainedLineages": []
+        }
+    }
+},
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "upstreamLineage",
+    "aspect": {
+        "json": {
+            "upstreams": [
+                {
+                    "auditStamp": {
+                        "time": 1707182625000,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "created": {
+                        "time": 0,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket1/key1,PROD)",
+                    "type": "COPY"
+                }
+            ],
+            "fineGrainedLineages": []
+        }
+    }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
index ddc1b4c0c2a60..1e4622a07bfe3 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -215,3 +215,36 @@ def test_view_lineage(pytestconfig: pytest.Config) -> None:
         outputs=mcps,
         golden_path=RESOURCE_DIR / "test_view_lineage.json",
     )
+
+
+@freeze_time(FROZEN_TIME)
+def test_known_lineage_mapping(pytestconfig: pytest.Config) -> None:
+    aggregator = SqlParsingAggregator(
+        platform="redshift",
+        platform_instance=None,
+        env=builder.DEFAULT_ENV,
+        generate_lineage=True,
+        generate_usage_statistics=False,
+        generate_operations=False,
+    )
+
+    aggregator.add_known_lineage_mapping(
+        upstream_urn=DatasetUrn("redshift", "dev.public.bar").urn(),
+        downstream_urn=DatasetUrn("redshift", "dev.public.foo").urn(),
+    )
+    aggregator.add_known_lineage_mapping(
+        upstream_urn=DatasetUrn("s3", "bucket1/key1").urn(),
+        downstream_urn=DatasetUrn("redshift", "dev.public.bar").urn(),
+    )
+    aggregator.add_known_lineage_mapping(
+        upstream_urn=DatasetUrn("redshift", "dev.public.foo").urn(),
+        downstream_urn=DatasetUrn("s3", "bucket2/key2").urn(),
+    )
+
+    mcps = list(aggregator.gen_metadata())
+
+    mce_helpers.check_goldens_stream(
+        pytestconfig,
+        outputs=mcps,
+        golden_path=RESOURCE_DIR / "test_known_lineage_mapping.json",
+    )
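
Illustrative use of the new add_known_lineage_mapping API for the external-table / UNLOAD case the docstring mentions (a sketch only; the table and bucket names are hypothetical):

    from datahub.metadata.urns import DatasetUrn
    from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator

    aggregator = SqlParsingAggregator(
        platform="redshift",
        platform_instance=None,
        env="PROD",
        generate_lineage=True,
        generate_usage_statistics=False,
        generate_operations=False,
    )

    # A Redshift UNLOAD writes dev.public.sales out to S3; the S3 object is
    # effectively a copy of the table, so record it as COPY lineage.
    aggregator.add_known_lineage_mapping(
        upstream_urn=DatasetUrn("redshift", "dev.public.sales").urn(),
        downstream_urn=DatasetUrn("s3", "my-bucket/unloads/sales").urn(),
    )

    mcps = list(aggregator.gen_metadata())  # yields the upstreamLineage MCPs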

From f52bc4d38742158870ea726387b3e29ca78c8d6e Mon Sep 17 00:00:00 2001
From: Harshal Sheth <hsheth2@gmail.com>
Date: Wed, 14 Feb 2024 15:36:13 -0800
Subject: [PATCH 4/6] make urn ordering consistent

---
 .../sql_parsing/sql_parsing_aggregator.py     |   2 +-
 .../test_known_lineage_mapping.json           |  12 +-
 .../test_overlapping_inserts.json             |  24 ++--
 .../aggregator_goldens/test_temp_table.json   | 122 +++++++++---------
 4 files changed, 80 insertions(+), 80 deletions(-)

diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
index 06d6eadebd5a8..5c011e622fb15 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -620,7 +620,7 @@ def _gen_lineage_mcps(self) -> Iterable[MetadataChangeProposalWrapper]:
 
         # Generate lineage and queries.
         queries_generated: Set[QueryId] = set()
-        for downstream_urn in self._lineage_map:
+        for downstream_urn in sorted(self._lineage_map):
             yield from self._gen_lineage_for_downstream(
                 downstream_urn, queries_generated=queries_generated
             )
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json
index 5a4fb96618f6f..ab210c6f701b3 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_known_lineage_mapping.json
@@ -1,7 +1,7 @@
 [
 {
     "entityType": "dataset",
-    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
     "changeType": "UPSERT",
     "aspectName": "upstreamLineage",
     "aspect": {
@@ -16,7 +16,7 @@
                         "time": 0,
                         "actor": "urn:li:corpuser:_ingestion"
                     },
-                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket1/key1,PROD)",
                     "type": "COPY"
                 }
             ],
@@ -26,7 +26,7 @@
 },
 {
     "entityType": "dataset",
-    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket2/key2,PROD)",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
     "changeType": "UPSERT",
     "aspectName": "upstreamLineage",
     "aspect": {
@@ -41,7 +41,7 @@
                         "time": 0,
                         "actor": "urn:li:corpuser:_ingestion"
                     },
-                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
                     "type": "COPY"
                 }
             ],
@@ -51,7 +51,7 @@
 },
 {
     "entityType": "dataset",
-    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket2/key2,PROD)",
     "changeType": "UPSERT",
     "aspectName": "upstreamLineage",
     "aspect": {
@@ -66,7 +66,7 @@
                         "time": 0,
                         "actor": "urn:li:corpuser:_ingestion"
                     },
-                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:s3,bucket1/key1,PROD)",
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
                     "type": "COPY"
                 }
             ],
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json
index 3af93226eced9..2e5404992245c 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_overlapping_inserts.json
@@ -89,22 +89,22 @@
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1",
+    "entityUrn": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e",
     "changeType": "UPSERT",
     "aspectName": "queryProperties",
     "aspect": {
         "json": {
             "statement": {
-                "value": "insert into downstream (a, b) select a, b from upstream1",
+                "value": "insert into downstream (a, c) select a, c from upstream2",
                 "language": "SQL"
             },
             "source": "SYSTEM",
             "created": {
-                "time": 20000,
+                "time": 25000,
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 20000,
+                "time": 25000,
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "dataPlatform": "urn:li:dataPlatform:redshift"
@@ -113,7 +113,7 @@
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1",
+    "entityUrn": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e",
     "changeType": "UPSERT",
     "aspectName": "querySubjects",
     "aspect": {
@@ -123,7 +123,7 @@
                     "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD)"
                 },
                 {
-                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD)"
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD)"
                 }
             ]
         }
@@ -131,22 +131,22 @@
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e",
+    "entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1",
     "changeType": "UPSERT",
     "aspectName": "queryProperties",
     "aspect": {
         "json": {
             "statement": {
-                "value": "insert into downstream (a, c) select a, c from upstream2",
+                "value": "insert into downstream (a, b) select a, b from upstream1",
                 "language": "SQL"
             },
             "source": "SYSTEM",
             "created": {
-                "time": 25000,
+                "time": 20000,
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "lastModified": {
-                "time": 25000,
+                "time": 20000,
                 "actor": "urn:li:corpuser:_ingestion"
             },
             "dataPlatform": "urn:li:dataPlatform:redshift"
@@ -155,7 +155,7 @@
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:c4b3a21ef8c262ebbe99a5bdb6c29cb0be646392bb4af10b6f4a758af881470e",
+    "entityUrn": "urn:li:query:377a73bbf094c8b176b15157c24242cdfc7a0f407d78e52e63ded08c913468f1",
     "changeType": "UPSERT",
     "aspectName": "querySubjects",
     "aspect": {
@@ -165,7 +165,7 @@
                     "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.downstream,PROD)"
                 },
                 {
-                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream2,PROD)"
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.upstream1,PROD)"
                 }
             ]
         }
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json
index 2c485a5ea945f..c4a20610d344a 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_temp_table.json
@@ -1,7 +1,7 @@
 [
 {
     "entityType": "dataset",
-    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
     "changeType": "UPSERT",
     "aspectName": "upstreamLineage",
     "aspect": {
@@ -16,24 +16,49 @@
                         "time": 0,
                         "actor": "urn:li:corpuser:_ingestion"
                     },
-                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
                     "type": "TRANSFORMED",
-                    "query": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71"
+                    "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
                 }
             ],
-            "fineGrainedLineages": []
+            "fineGrainedLineages": [
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)"
+                    ],
+                    "confidenceScore": 0.35,
+                    "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
+                },
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)"
+                    ],
+                    "confidenceScore": 0.35,
+                    "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
+                }
+            ]
         }
     }
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71",
+    "entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1",
     "changeType": "UPSERT",
     "aspectName": "queryProperties",
     "aspect": {
         "json": {
             "statement": {
-                "value": "create table foo_session3 as select * from foo",
+                "value": "create table foo as select a, 2*b as b from bar",
                 "language": "SQL"
             },
             "source": "SYSTEM",
@@ -51,17 +76,17 @@
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71",
+    "entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1",
     "changeType": "UPSERT",
     "aspectName": "querySubjects",
     "aspect": {
         "json": {
             "subjects": [
                 {
-                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)"
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
                 },
                 {
-                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
                 }
             ]
         }
@@ -143,9 +168,27 @@
         }
     }
 },
+{
+    "entityType": "query",
+    "entityUrn": "urn:li:query:composite_66ddf44283e4543440529f1d13b82221b5d60635b6a8c39751718049ce4f47ec",
+    "changeType": "UPSERT",
+    "aspectName": "querySubjects",
+    "aspect": {
+        "json": {
+            "subjects": [
+                {
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session2,PROD)"
+                },
+                {
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
+                }
+            ]
+        }
+    }
+},
 {
     "entityType": "dataset",
-    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)",
     "changeType": "UPSERT",
     "aspectName": "upstreamLineage",
     "aspect": {
@@ -160,67 +203,24 @@
                         "time": 0,
                         "actor": "urn:li:corpuser:_ingestion"
                     },
-                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
                     "type": "TRANSFORMED",
-                    "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
+                    "query": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71"
                 }
             ],
-            "fineGrainedLineages": [
-                {
-                    "upstreamType": "FIELD_SET",
-                    "upstreams": [
-                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)"
-                    ],
-                    "downstreamType": "FIELD",
-                    "downstreams": [
-                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)"
-                    ],
-                    "confidenceScore": 0.35,
-                    "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
-                },
-                {
-                    "upstreamType": "FIELD_SET",
-                    "upstreams": [
-                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)"
-                    ],
-                    "downstreamType": "FIELD",
-                    "downstreams": [
-                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)"
-                    ],
-                    "confidenceScore": 0.35,
-                    "query": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1"
-                }
-            ]
-        }
-    }
-},
-{
-    "entityType": "query",
-    "entityUrn": "urn:li:query:composite_66ddf44283e4543440529f1d13b82221b5d60635b6a8c39751718049ce4f47ec",
-    "changeType": "UPSERT",
-    "aspectName": "querySubjects",
-    "aspect": {
-        "json": {
-            "subjects": [
-                {
-                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session2,PROD)"
-                },
-                {
-                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
-                }
-            ]
+            "fineGrainedLineages": []
         }
     }
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1",
+    "entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71",
     "changeType": "UPSERT",
     "aspectName": "queryProperties",
     "aspect": {
         "json": {
             "statement": {
-                "value": "create table foo as select a, 2*b as b from bar",
+                "value": "create table foo_session3 as select * from foo",
                 "language": "SQL"
             },
             "source": "SYSTEM",
@@ -238,17 +238,17 @@
 },
 {
     "entityType": "query",
-    "entityUrn": "urn:li:query:8b3a079997d562bdb1b14eb500e6123c4b00bb0263565dcaa0b66170e72602a1",
+    "entityUrn": "urn:li:query:3e85e6f353c7fa33d6514cb090482852064d23df6491c9a8ae28be0d990a3c71",
     "changeType": "UPSERT",
     "aspectName": "querySubjects",
     "aspect": {
         "json": {
             "subjects": [
                 {
-                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo_session3,PROD)"
                 },
                 {
-                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
                 }
             ]
         }

From 38e854062e46578f6723d4343bc0ac1ffa9debd3 Mon Sep 17 00:00:00 2001
From: Harshal Sheth <hsheth2@gmail.com>
Date: Wed, 14 Feb 2024 16:58:31 -0800
Subject: [PATCH 5/6] Add column lineage deduplication test

---
 .../test_column_lineage_deduplication.json    | 149 ++++++++++++++++++
 .../unit/sql_parsing/test_sql_aggregator.py   |  36 +++++
 2 files changed, 185 insertions(+)
 create mode 100644 metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json

diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json
new file mode 100644
index 0000000000000..303641c150355
--- /dev/null
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_column_lineage_deduplication.json
@@ -0,0 +1,149 @@
+[
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "upstreamLineage",
+    "aspect": {
+        "json": {
+            "upstreams": [
+                {
+                    "auditStamp": {
+                        "time": 1707182625000,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "created": {
+                        "time": 0,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
+                    "type": "TRANSFORMED",
+                    "query": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970"
+                }
+            ],
+            "fineGrainedLineages": [
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)"
+                    ],
+                    "confidenceScore": 0.2,
+                    "query": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970"
+                },
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)"
+                    ],
+                    "confidenceScore": 0.2,
+                    "query": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970"
+                },
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),c)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),c)"
+                    ],
+                    "confidenceScore": 0.2,
+                    "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
+                }
+            ]
+        }
+    }
+},
+{
+    "entityType": "query",
+    "entityUrn": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970",
+    "changeType": "UPSERT",
+    "aspectName": "queryProperties",
+    "aspect": {
+        "json": {
+            "statement": {
+                "value": "/* query 2 */ insert into foo (a, b) select a, b from bar",
+                "language": "SQL"
+            },
+            "source": "SYSTEM",
+            "created": {
+                "time": 0,
+                "actor": "urn:li:corpuser:_ingestion"
+            },
+            "lastModified": {
+                "time": 1707182625000,
+                "actor": "urn:li:corpuser:_ingestion"
+            },
+            "dataPlatform": "urn:li:dataPlatform:redshift"
+        }
+    }
+},
+{
+    "entityType": "query",
+    "entityUrn": "urn:li:query:fc48287a96588c73bcbdc1400f0c036b8d81196135618fb09a097459d54bd970",
+    "changeType": "UPSERT",
+    "aspectName": "querySubjects",
+    "aspect": {
+        "json": {
+            "subjects": [
+                {
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
+                },
+                {
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
+                }
+            ]
+        }
+    }
+},
+{
+    "entityType": "query",
+    "entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
+    "changeType": "UPSERT",
+    "aspectName": "queryProperties",
+    "aspect": {
+        "json": {
+            "statement": {
+                "value": "/* query 1 */ insert into foo (a, b, c) select a, b, c from bar",
+                "language": "SQL"
+            },
+            "source": "SYSTEM",
+            "created": {
+                "time": 0,
+                "actor": "urn:li:corpuser:_ingestion"
+            },
+            "lastModified": {
+                "time": 1707182625000,
+                "actor": "urn:li:corpuser:_ingestion"
+            },
+            "dataPlatform": "urn:li:dataPlatform:redshift"
+        }
+    }
+},
+{
+    "entityType": "query",
+    "entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
+    "changeType": "UPSERT",
+    "aspectName": "querySubjects",
+    "aspect": {
+        "json": {
+            "subjects": [
+                {
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
+                },
+                {
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
+                }
+            ]
+        }
+    }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
index 1e4622a07bfe3..cc6be4aa7a895 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -248,3 +248,39 @@ def test_known_lineage_mapping(pytestconfig: pytest.Config) -> None:
         outputs=mcps,
         golden_path=RESOURCE_DIR / "test_known_lineage_mapping.json",
     )
+
+
+@freeze_time(FROZEN_TIME)
+def test_column_lineage_deduplication(pytestconfig: pytest.Config) -> None:
+    aggregator = SqlParsingAggregator(
+        platform="redshift",
+        platform_instance=None,
+        env=builder.DEFAULT_ENV,
+        generate_lineage=True,
+        generate_usage_statistics=False,
+        generate_operations=False,
+    )
+
+    aggregator.add_observed_query(
+        query="/* query 1 */ insert into foo (a, b, c) select a, b, c from bar",
+        default_db="dev",
+        default_schema="public",
+    )
+    aggregator.add_observed_query(
+        query="/* query 2 */ insert into foo (a, b) select a, b from bar",
+        default_db="dev",
+        default_schema="public",
+    )
+
+    mcps = list(aggregator.gen_metadata())
+
+    # In this case, the lineage for a and b is attributed to query 2, and
+    # the lineage for c is attributed to query 1. Note that query 1 does
+    # not get any credit for a and b, as they are already covered by query 2,
+    # which came later and hence has higher precedence.
+
+    mce_helpers.check_goldens_stream(
+        pytestconfig,
+        outputs=mcps,
+        golden_path=RESOURCE_DIR / "test_column_lineage_deduplication.json",
+    )

From bca571130bdcba476636af3c9dc6abbe7e48a5f3 Mon Sep 17 00:00:00 2001
From: Harshal Sheth <hsheth2@gmail.com>
Date: Wed, 14 Feb 2024 17:49:31 -0800
Subject: [PATCH 6/6] Add add_known_query_lineage method

---
 .../sql_parsing/sql_parsing_aggregator.py     |  79 +++++++++--
 .../src/datahub/sql_parsing/sqlglot_utils.py  |  16 ++-
 .../test_add_known_query_lineage.json         | 127 ++++++++++++++++++
 .../test_aggregate_operations.json            |   6 -
 .../unit/sql_parsing/test_sql_aggregator.py   |  63 ++++++---
 5 files changed, 257 insertions(+), 34 deletions(-)
 create mode 100644 metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json

diff --git a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
index 5c011e622fb15..9b7b3efa87d1f 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sql_parsing_aggregator.py
@@ -10,6 +10,7 @@
 from datetime import datetime, timezone
 from typing import Callable, Dict, Iterable, List, Optional, Set, Union, cast
 
+import datahub.emitter.mce_builder as builder
 import datahub.metadata.schema_classes as models
 from datahub.emitter.mce_builder import get_sys_time, make_ts_millis
 from datahub.emitter.mcp import MetadataChangeProposalWrapper
@@ -34,7 +35,7 @@
     infer_output_schema,
     sqlglot_lineage,
 )
-from datahub.sql_parsing.sqlglot_utils import generate_hash
+from datahub.sql_parsing.sqlglot_utils import generate_hash, get_query_fingerprint
 from datahub.utilities.file_backed_collections import (
     ConnectionWrapper,
     FileBackedDict,
@@ -95,6 +96,18 @@ def make_last_modified_audit_stamp(self) -> models.AuditStampClass:
         )
 
 
+@dataclasses.dataclass
+class KnownQueryLineageInfo:
+    query_text: str
+
+    downstream: UrnStr
+    upstreams: List[UrnStr]
+    column_lineage: Optional[List[ColumnLineageInfo]] = None
+
+    timestamp: Optional[datetime] = None
+    query_type: QueryType = QueryType.UNKNOWN
+
+
 @dataclasses.dataclass
 class SqlAggregatorReport(Report):
     _aggregator: "SqlParsingAggregator"
@@ -103,12 +116,16 @@ class SqlAggregatorReport(Report):
     num_observed_queries: int = 0
     num_observed_queries_failed: int = 0
     num_observed_queries_column_failed: int = 0
-    observed_query_parse_failures = LossyList[str]()
+    observed_query_parse_failures: LossyList[str] = dataclasses.field(
+        default_factory=LossyList
+    )
 
     num_view_definitions: int = 0
     num_views_failed: int = 0
     num_views_column_failed: int = 0
-    views_parse_failures = LossyDict[UrnStr, str]()
+    views_parse_failures: LossyDict[UrnStr, str] = dataclasses.field(
+        default_factory=LossyDict
+    )
 
     num_queries_with_temp_tables_in_session: int = 0
 
@@ -142,8 +159,8 @@ def __init__(
         self,
         *,
         platform: str,
-        platform_instance: Optional[str],
-        env: str,
+        platform_instance: Optional[str] = None,
+        env: str = builder.DEFAULT_ENV,
         graph: Optional[DataHubGraph] = None,
         generate_lineage: bool = True,
         generate_queries: bool = True,
@@ -294,8 +311,48 @@ def _initialize_schema_resolver_from_graph(self, graph: DataHubGraph) -> None:
             env=self.env,
         )
 
-    def add_known_query_lineage(self, TODO) -> None:
-        pass
+    def add_known_query_lineage(
+        self, known_query_lineage: KnownQueryLineageInfo
+    ) -> None:
+        """Add a query and it's precomputed lineage to the aggregator.
+
+        This is useful for cases where we have lineage information that was
+        computed outside of the SQL parsing aggregator, e.g. from a data
+        warehouse's system tables.
+
+        This will also generate an operation aspect for the query if there is
+        a timestamp and the query type field is set to a mutation type.
+
+        Args:
+            known_query_lineage: The known query lineage information.
+        """
+
+        # Generate a fingerprint for the query.
+        query_fingerprint = get_query_fingerprint(
+            known_query_lineage.query_text, self.platform.platform_name
+        )
+        # TODO format the query text?
+
+        # Register the query.
+        self._add_to_query_map(
+            QueryMetadata(
+                query_id=query_fingerprint,
+                formatted_query_string=known_query_lineage.query_text,
+                session_id=_MISSING_SESSION_ID,
+                query_type=known_query_lineage.query_type,
+                lineage_type=models.DatasetLineageTypeClass.TRANSFORMED,
+                latest_timestamp=known_query_lineage.timestamp,
+                actor=None,
+                upstreams=known_query_lineage.upstreams,
+                column_lineage=known_query_lineage.column_lineage or [],
+                confidence_score=1.0,
+            )
+        )
+
+        # Register the lineage.
+        self._lineage_map.for_mutation(
+            known_query_lineage.downstream, OrderedSet()
+        ).add(query_fingerprint)
 
     def add_known_lineage_mapping(
         self,
@@ -965,8 +1022,10 @@ def _gen_operation_for_downstream(
             operationType=operation_type,
             lastUpdatedTimestamp=make_ts_millis(query.latest_timestamp),
             actor=query.actor.urn() if query.actor else None,
-            customProperties={
-                "query_urn": self._query_urn(query_id),
-            },
+            customProperties=(
+                {"query_urn": self._query_urn(query_id)}
+                if self.can_generate_query(query_id)
+                else None
+            ),
         )
         yield MetadataChangeProposalWrapper(entityUrn=downstream_urn, aspect=aspect)
diff --git a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
index 587394cc14646..44337f1070140 100644
--- a/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
+++ b/metadata-ingestion/src/datahub/sql_parsing/sqlglot_utils.py
@@ -1,8 +1,11 @@
 import hashlib
+import logging
 from typing import Dict, Iterable, Optional, Union
 
 import sqlglot
+import sqlglot.errors
 
+logger = logging.getLogger(__name__)
 DialectOrStr = Union[sqlglot.Dialect, str]
 
 
@@ -139,10 +142,17 @@ def get_query_fingerprint(
         The fingerprint for the SQL query.
     """
 
-    dialect = get_dialect(dialect)
-    expression_sql = generalize_query(expression, dialect=dialect)
-    fingerprint = generate_hash(expression_sql)
+    try:
+        dialect = get_dialect(dialect)
+        expression_sql = generalize_query(expression, dialect=dialect)
+    except (ValueError, sqlglot.errors.SqlglotError) as e:
+        if not isinstance(expression, str):
+            raise
 
+        logger.debug("Failed to generalize query for fingerprinting: %s", e)
+        expression_sql = expression
+
+    fingerprint = generate_hash(expression_sql)
     return fingerprint
 
 
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json
new file mode 100644
index 0000000000000..20bd08ce4c823
--- /dev/null
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_add_known_query_lineage.json
@@ -0,0 +1,127 @@
+[
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "upstreamLineage",
+    "aspect": {
+        "json": {
+            "upstreams": [
+                {
+                    "auditStamp": {
+                        "time": 1707182625000,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "created": {
+                        "time": 20000,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
+                    "type": "TRANSFORMED",
+                    "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
+                }
+            ],
+            "fineGrainedLineages": [
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),a)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),a)"
+                    ],
+                    "confidenceScore": 1.0,
+                    "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
+                },
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),b)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),b)"
+                    ],
+                    "confidenceScore": 1.0,
+                    "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
+                },
+                {
+                    "upstreamType": "FIELD_SET",
+                    "upstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD),c)"
+                    ],
+                    "downstreamType": "FIELD",
+                    "downstreams": [
+                        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD),c)"
+                    ],
+                    "confidenceScore": 1.0,
+                    "query": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
+                }
+            ]
+        }
+    }
+},
+{
+    "entityType": "query",
+    "entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
+    "changeType": "UPSERT",
+    "aspectName": "queryProperties",
+    "aspect": {
+        "json": {
+            "statement": {
+                "value": "insert into foo (a, b, c) select a, b, c from bar",
+                "language": "SQL"
+            },
+            "source": "SYSTEM",
+            "created": {
+                "time": 0,
+                "actor": "urn:li:corpuser:_ingestion"
+            },
+            "lastModified": {
+                "time": 1707182625000,
+                "actor": "urn:li:corpuser:_ingestion"
+            },
+            "dataPlatform": "urn:li:dataPlatform:redshift"
+        }
+    }
+},
+{
+    "entityType": "query",
+    "entityUrn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae",
+    "changeType": "UPSERT",
+    "aspectName": "querySubjects",
+    "aspect": {
+        "json": {
+            "subjects": [
+                {
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)"
+                },
+                {
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
+                }
+            ]
+        }
+    }
+},
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.foo,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "operation",
+    "aspect": {
+        "json": {
+            "timestampMillis": 1707182625000,
+            "partitionSpec": {
+                "type": "FULL_TABLE",
+                "partition": "FULL_TABLE_SNAPSHOT"
+            },
+            "operationType": "INSERT",
+            "customProperties": {
+                "query_urn": "urn:li:query:02e2ec36678bea2a8c4c855fed5255d087cfeb2710d326e95fd9b48a9c4fc0ae"
+            },
+            "lastUpdatedTimestamp": 20000
+        }
+    }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_aggregate_operations.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_aggregate_operations.json
index 551760b42394c..25e75317096df 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_aggregate_operations.json
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_aggregate_operations.json
@@ -13,9 +13,6 @@
             },
             "actor": "urn:li:corpuser:user2",
             "operationType": "CREATE",
-            "customProperties": {
-                "query_urn": "urn:li:query:cbdb3e148ea7fdae81815da4dd64f57873fb9c3d7d4bfad4e83b3d1ebd3c45c2"
-            },
             "lastUpdatedTimestamp": 25000
         }
     }
@@ -34,9 +31,6 @@
             },
             "actor": "urn:li:corpuser:user3",
             "operationType": "CREATE",
-            "customProperties": {
-                "query_urn": "urn:li:query:7fd78ed5f3d60f7f91206f5e0fea6851a2afe940944455fd292267613b7ee1e6"
-            },
             "lastUpdatedTimestamp": 26000
         }
     }
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
index cc6be4aa7a895..5b51266c692b7 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -4,12 +4,14 @@
 import pytest
 from freezegun import freeze_time
 
-import datahub.emitter.mce_builder as builder
 from datahub.metadata.urns import CorpUserUrn, DatasetUrn
 from datahub.sql_parsing.sql_parsing_aggregator import (
+    KnownQueryLineageInfo,
     QueryLogSetting,
     SqlParsingAggregator,
 )
+from datahub.sql_parsing.sql_parsing_common import QueryType
+from datahub.sql_parsing.sqlglot_lineage import ColumnLineageInfo, ColumnRef
 from tests.test_helpers import mce_helpers
 
 RESOURCE_DIR = pathlib.Path(__file__).parent / "aggregator_goldens"
@@ -24,8 +26,6 @@ def _ts(ts: int) -> datetime:
 def test_basic_lineage(pytestconfig: pytest.Config) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
-        platform_instance=None,
-        env=builder.DEFAULT_ENV,
         generate_lineage=True,
         generate_usage_statistics=False,
         generate_operations=False,
@@ -50,8 +50,6 @@ def test_basic_lineage(pytestconfig: pytest.Config) -> None:
 def test_overlapping_inserts(pytestconfig: pytest.Config) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
-        platform_instance=None,
-        env=builder.DEFAULT_ENV,
         generate_lineage=True,
         generate_usage_statistics=False,
         generate_operations=False,
@@ -83,8 +81,6 @@ def test_overlapping_inserts(pytestconfig: pytest.Config) -> None:
 def test_temp_table(pytestconfig: pytest.Config) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
-        platform_instance=None,
-        env=builder.DEFAULT_ENV,
         generate_lineage=True,
         generate_usage_statistics=False,
         generate_operations=False,
@@ -136,8 +132,6 @@ def test_temp_table(pytestconfig: pytest.Config) -> None:
 def test_aggregate_operations(pytestconfig: pytest.Config) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
-        platform_instance=None,
-        env=builder.DEFAULT_ENV,
         generate_lineage=False,
         generate_queries=False,
         generate_usage_statistics=False,
@@ -181,8 +175,6 @@ def test_aggregate_operations(pytestconfig: pytest.Config) -> None:
 def test_view_lineage(pytestconfig: pytest.Config) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
-        platform_instance=None,
-        env=builder.DEFAULT_ENV,
         generate_lineage=True,
         generate_usage_statistics=False,
         generate_operations=False,
@@ -221,8 +213,6 @@ def test_view_lineage(pytestconfig: pytest.Config) -> None:
 def test_known_lineage_mapping(pytestconfig: pytest.Config) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
-        platform_instance=None,
-        env=builder.DEFAULT_ENV,
         generate_lineage=True,
         generate_usage_statistics=False,
         generate_operations=False,
@@ -254,8 +244,6 @@ def test_known_lineage_mapping(pytestconfig: pytest.Config) -> None:
 def test_column_lineage_deduplication(pytestconfig: pytest.Config) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
-        platform_instance=None,
-        env=builder.DEFAULT_ENV,
         generate_lineage=True,
         generate_usage_statistics=False,
         generate_operations=False,
@@ -284,3 +272,48 @@ def test_column_lineage_deduplication(pytestconfig: pytest.Config) -> None:
         outputs=mcps,
         golden_path=RESOURCE_DIR / "test_column_lineage_deduplication.json",
     )
+
+
+@freeze_time(FROZEN_TIME)
+def test_add_known_query_lineage(pytestconfig: pytest.Config) -> None:
+    aggregator = SqlParsingAggregator(
+        platform="redshift",
+        generate_lineage=True,
+        generate_usage_statistics=False,
+        generate_operations=True,
+    )
+
+    downstream_urn = DatasetUrn("redshift", "dev.public.foo").urn()
+    upstream_urn = DatasetUrn("redshift", "dev.public.bar").urn()
+
+    known_query_lineage = KnownQueryLineageInfo(
+        query_text="insert into foo (a, b, c) select a, b, c from bar",
+        downstream=downstream_urn,
+        upstreams=[upstream_urn],
+        column_lineage=[
+            ColumnLineageInfo(
+                downstream=ColumnRef(table=downstream_urn, column="a"),
+                upstreams=[ColumnRef(table=upstream_urn, column="a")],
+            ),
+            ColumnLineageInfo(
+                downstream=ColumnRef(table=downstream_urn, column="b"),
+                upstreams=[ColumnRef(table=upstream_urn, column="b")],
+            ),
+            ColumnLineageInfo(
+                downstream=ColumnRef(table=downstream_urn, column="c"),
+                upstreams=[ColumnRef(table=upstream_urn, column="c")],
+            ),
+        ],
+        timestamp=_ts(20),
+        query_type=QueryType.INSERT,
+    )
+
+    aggregator.add_known_query_lineage(known_query_lineage)
+
+    mcps = list(aggregator.gen_metadata())
+
+    mce_helpers.check_goldens_stream(
+        pytestconfig,
+        outputs=mcps,
+        golden_path=RESOURCE_DIR / "test_add_known_query_lineage.json",
+    )
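
The test above drives add_known_query_lineage end-to-end and compares the emitted MCPs against the golden file. Outside of a test, the same aggregator output could presumably be pushed to a DataHub instance rather than checked against a golden; a hypothetical usage sketch, assuming the standard REST emitter and a placeholder endpoint (neither is part of this patch), reusing the aggregator constructed above:

    from datahub.emitter.rest_emitter import DatahubRestEmitter

    # Placeholder endpoint; configure server URL (and token) for your environment.
    emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

    # gen_metadata() yields MetadataChangeProposalWrapper objects, which the
    # REST emitter can send directly.
    for mcp in aggregator.gen_metadata():
        emitter.emit_mcp(mcp)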