Skip to content

Commit ba4e07f

Browse files
committed
more filters + external table lineage
1 parent 66d0733 commit ba4e07f

File tree

2 files changed

+77
-12
lines changed

2 files changed

+77
-12
lines changed

metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py

+70-9
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import logging
22
import traceback
3-
from typing import Callable, Iterable, List, Optional, Tuple
3+
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union
44

55
import redshift_connector
66

7+
from datahub.emitter import mce_builder
78
from datahub.ingestion.api.common import PipelineContext
89
from datahub.ingestion.api.workunit import MetadataWorkUnit
910
from datahub.ingestion.source.redshift.config import LineageMode, RedshiftConfig
@@ -15,6 +16,9 @@
1516
from datahub.ingestion.source.redshift.redshift_schema import (
1617
LineageRow,
1718
RedshiftDataDictionary,
19+
RedshiftSchema,
20+
RedshiftTable,
21+
RedshiftView,
1822
)
1923
from datahub.ingestion.source.redshift.report import RedshiftReport
2024
from datahub.ingestion.source.state.redundant_run_skip_handler import (
@@ -40,13 +44,14 @@ def __init__(
4044
database: str,
4145
redundant_run_skip_handler: Optional[RedundantLineageRunSkipHandler] = None,
4246
):
47+
self.platform = "redshift"
4348
self.config = config
4449
self.report = report
4550
self.context = context
4651

4752
self.database = database
4853
self.aggregator = SqlParsingAggregator(
49-
platform="redshift",
54+
platform=self.platform,
5055
platform_instance=self.config.platform_instance,
5156
env=self.config.env,
5257
generate_lineage=True,
@@ -70,7 +75,27 @@ def __init__(
7075
self.report.lineage_end_time,
7176
) = self._lineage_v1.get_time_window()
7277

73-
def build(self, connection: redshift_connector.Connection) -> None:
78+
def build(
79+
self,
80+
connection: redshift_connector.Connection,
81+
all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]],
82+
db_schemas: Dict[str, Dict[str, RedshiftSchema]],
83+
) -> None:
84+
# Assume things not in `all_tables` as temp tables.
85+
known_urns = set(
86+
DatasetUrn.create_from_ids(
87+
self.platform,
88+
f"{db}.{schema}.{table.name}",
89+
env=self.config.env,
90+
platform_instance=self.config.platform_instance,
91+
).urn()
92+
for db, schemas in all_tables.items()
93+
for schema, tables in schemas.items()
94+
for table in tables
95+
)
96+
self.aggregator.is_temp_table = lambda urn: urn not in known_urns
97+
98+
# Handle all the temp tables up front.
7499
if self.config.resolve_temp_table_in_lineage:
75100
for temp_row in self._lineage_v1.get_temp_tables(connection=connection):
76101
self.aggregator.add_observed_query(
@@ -173,7 +198,8 @@ def build(self, connection: redshift_connector.Connection) -> None:
173198
connection=connection,
174199
)
175200

176-
# TODO add lineage for external tables
201+
# Populate lineage for external tables.
202+
self._process_external_tables(all_tables=all_tables, db_schemas=db_schemas)
177203

178204
def _populate_lineage_agg(
179205
self,
@@ -214,13 +240,13 @@ def _process_sql_parser_lineage(self, lineage_row: LineageRow) -> None:
214240

215241
def _process_stl_scan_lineage(self, lineage_row: LineageRow) -> None:
216242
target = DatasetUrn.create_from_ids(
217-
"redshift",
243+
self.platform,
218244
f"{self.database}.{lineage_row.target_schema}.{lineage_row.target_table}",
219245
env=self.config.env,
220246
platform_instance=self.config.platform_instance,
221247
)
222248
source = DatasetUrn.create_from_ids(
223-
"redshift",
249+
self.platform,
224250
f"{self.database}.{lineage_row.source_schema}.{lineage_row.source_table}",
225251
env=self.config.env,
226252
platform_instance=self.config.platform_instance,
@@ -246,7 +272,7 @@ def _process_view_lineage(self, lineage_row: LineageRow) -> None:
246272
f"{self.database}.{lineage_row.target_schema}.{lineage_row.target_table}"
247273
)
248274
target = DatasetUrn.create_from_ids(
249-
"redshift",
275+
self.platform,
250276
target_name,
251277
env=self.config.env,
252278
platform_instance=self.config.platform_instance,
@@ -275,7 +301,7 @@ def _process_copy_command(self, lineage_row: LineageRow) -> None:
275301
if not lineage_row.target_schema or not lineage_row.target_table:
276302
return
277303
target = DatasetUrn.create_from_ids(
278-
"redshift",
304+
self.platform,
279305
f"{self.database}.{lineage_row.target_schema}.{lineage_row.target_table}",
280306
env=self.config.env,
281307
platform_instance=self.config.platform_instance,
@@ -299,7 +325,7 @@ def _process_unload_command(self, lineage_row: LineageRow) -> None:
299325
if not lineage_row.source_schema or not lineage_row.source_table:
300326
return
301327
source = DatasetUrn.create_from_ids(
302-
"redshift",
328+
self.platform,
303329
f"{self.database}.{lineage_row.source_schema}.{lineage_row.source_table}",
304330
env=self.config.env,
305331
platform_instance=self.config.platform_instance,
@@ -309,6 +335,41 @@ def _process_unload_command(self, lineage_row: LineageRow) -> None:
309335
upstream_urn=source.urn(), downstream_urn=output_urn
310336
)
311337

338+
def _process_external_tables(
    self,
    all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]],
    db_schemas: Dict[str, Dict[str, RedshiftSchema]],
) -> None:
    """Register known lineage for every external table in ``self.database``.

    For each table whose ``type`` is ``"EXTERNAL_TABLE"``, an upstream
    dataset URN is built on the platform named by the owning schema's
    ``type`` (lower-cased), using ``<external_database>.<table name>`` as
    the dataset name, and registered with the aggregator as the upstream of
    the corresponding Redshift table.

    External tables whose schema metadata is missing, or whose schema has no
    external database, are skipped with a warning instead of aborting the
    whole pass.

    Args:
        all_tables: Tables and views keyed by database name, then by schema
            name.
        db_schemas: Schema metadata keyed by database name, then by schema
            name.
    """
    schemas_by_name = db_schemas.get(self.database, {})
    platform_instance_map = self.config.platform_instance_map

    # `.get` instead of indexing: a database with no entry simply has no
    # external tables to process and must not raise KeyError.
    for schema_name, tables in all_tables.get(self.database, {}).items():
        for table in tables:
            if table.type != "EXTERNAL_TABLE":
                continue

            schema = schemas_by_name.get(schema_name)
            # NOTE(review): assumes `external_database` may be unset on a
            # schema; without it the upstream name would be meaningless
            # (e.g. "None.<table>"), so no lineage edge is emitted.
            if schema is None or not schema.external_database:
                logging.getLogger(__name__).warning(
                    "Skipping external table lineage for %s.%s.%s: "
                    "schema metadata or external database is missing",
                    self.database,
                    schema_name,
                    table.name,
                )
                continue

            # NOTE: `schema.option` (external database parameters) is not
            # used when building the upstream URN yet.
            upstream_platform = schema.type.lower()

            table_urn = mce_builder.make_dataset_urn_with_platform_instance(
                self.platform,
                f"{self.database}.{schema_name}.{table.name}",
                platform_instance=self.config.platform_instance,
                env=self.config.env,
            )
            upstream_urn = mce_builder.make_dataset_urn_with_platform_instance(
                upstream_platform,
                f"{schema.external_database}.{table.name}",
                # Only consult the map when one is configured.
                platform_instance=(
                    platform_instance_map.get(upstream_platform)
                    if platform_instance_map
                    else None
                ),
                env=self.config.env,
            )

            self.aggregator.add_known_lineage_mapping(
                upstream_urn=upstream_urn,
                downstream_urn=table_urn,
            )
372+
312373
def generate(self) -> Iterable[MetadataWorkUnit]:
313374
for mcp in self.aggregator.gen_metadata():
314375
yield mcp.as_workunit()

metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py

+7-3
Original file line numberDiff line numberDiff line change
@@ -961,14 +961,18 @@ def extract_lineage_usage_v2(
961961
return
962962

963963
with PerfTimer() as timer:
964-
lineage_extractor.build(connection=connection)
964+
all_tables = self.get_all_tables()
965965

966-
self.report.lineage_extraction_sec[f"{database}"] = round(
967-
timer.elapsed_seconds(), 2
966+
lineage_extractor.build(
967+
connection=connection, all_tables=all_tables, db_schemas=self.db_schemas
968968
)
969969

970970
yield from lineage_extractor.generate()
971971

972+
self.report.lineage_extraction_sec[f"{database}"] = round(
973+
timer.elapsed_seconds(), 2
974+
)
975+
972976
if self.redundant_lineage_run_skip_handler:
973977
# Update the checkpoint state for this run.
974978
self.redundant_lineage_run_skip_handler.update_state(

0 commit comments

Comments
 (0)