Skip to content

Commit 1102f6f

Browse files
committed
tweak filtering logic
1 parent ba4e07f commit 1102f6f

File tree

1 file changed

+29
-18
lines changed
  • metadata-ingestion/src/datahub/ingestion/source/redshift

1 file changed

+29
-18
lines changed

metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py

+29-18
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,16 @@ def __init__(
7575
self.report.lineage_end_time,
7676
) = self._lineage_v1.get_time_window()
7777

78+
self.known_urns = set() # will be set later
79+
7880
def build(
7981
self,
8082
connection: redshift_connector.Connection,
8183
all_tables: Dict[str, Dict[str, List[Union[RedshiftView, RedshiftTable]]]],
8284
db_schemas: Dict[str, Dict[str, RedshiftSchema]],
8385
) -> None:
8486
# Assume things not in `all_tables` as temp tables.
85-
known_urns = set(
87+
self.known_urns = set(
8688
DatasetUrn.create_from_ids(
8789
self.platform,
8890
f"{db}.{schema}.{table.name}",
@@ -93,7 +95,7 @@ def build(
9395
for schema, tables in schemas.items()
9496
for table in tables
9597
)
96-
self.aggregator.is_temp_table = lambda urn: urn not in known_urns
98+
self.aggregator.is_temp_table = lambda urn: urn not in self.known_urns
9799

98100
# Handle all the temp tables up front.
99101
if self.config.resolve_temp_table_in_lineage:
@@ -238,13 +240,26 @@ def _process_sql_parser_lineage(self, lineage_row: LineageRow) -> None:
238240
query_timestamp=lineage_row.timestamp,
239241
)
240242

241-
def _process_stl_scan_lineage(self, lineage_row: LineageRow) -> None:
243+
def _make_filtered_target(self, lineage_row: LineageRow) -> Optional[DatasetUrn]:
242244
target = DatasetUrn.create_from_ids(
243245
self.platform,
244246
f"{self.database}.{lineage_row.target_schema}.{lineage_row.target_table}",
245247
env=self.config.env,
246248
platform_instance=self.config.platform_instance,
247249
)
250+
if target.urn() not in self.known_urns:
251+
logger.debug(
252+
f"Skipping lineage for {target.urn()} as it is not in known_urns"
253+
)
254+
return
255+
256+
return target
257+
258+
def _process_stl_scan_lineage(self, lineage_row: LineageRow) -> None:
259+
target = self._make_filtered_target(lineage_row)
260+
if not target:
261+
return
262+
248263
source = DatasetUrn.create_from_ids(
249264
self.platform,
250265
f"{self.database}.{lineage_row.source_schema}.{lineage_row.source_table}",
@@ -268,15 +283,9 @@ def _process_view_lineage(self, lineage_row: LineageRow) -> None:
268283
if ddl is None:
269284
return
270285

271-
target_name = (
272-
f"{self.database}.{lineage_row.target_schema}.{lineage_row.target_table}"
273-
)
274-
target = DatasetUrn.create_from_ids(
275-
self.platform,
276-
target_name,
277-
env=self.config.env,
278-
platform_instance=self.config.platform_instance,
279-
)
286+
target = self._make_filtered_target(lineage_row)
287+
if not target:
288+
return
280289

281290
self.aggregator.add_view_definition(
282291
view_urn=target,
@@ -300,12 +309,9 @@ def _process_copy_command(self, lineage_row: LineageRow) -> None:
300309

301310
if not lineage_row.target_schema or not lineage_row.target_table:
302311
return
303-
target = DatasetUrn.create_from_ids(
304-
self.platform,
305-
f"{self.database}.{lineage_row.target_schema}.{lineage_row.target_table}",
306-
env=self.config.env,
307-
platform_instance=self.config.platform_instance,
308-
)
312+
target = self._make_filtered_target(lineage_row)
313+
if not target:
314+
return
309315

310316
self.aggregator.add_known_lineage_mapping(
311317
upstream_urn=s3_urn, downstream_urn=target.urn()
@@ -330,6 +336,11 @@ def _process_unload_command(self, lineage_row: LineageRow) -> None:
330336
env=self.config.env,
331337
platform_instance=self.config.platform_instance,
332338
)
339+
if source.urn() not in self.known_urns:
340+
logger.debug(
341+
f"Skipping unload lineage for {source.urn()} as it is not in known_urns"
342+
)
343+
return
333344

334345
self.aggregator.add_known_lineage_mapping(
335346
upstream_urn=source.urn(), downstream_urn=output_urn

0 commit comments

Comments
 (0)