
Commit 31df9c4

feat(ingest/redshift): lineage for external schema created from redshift (#12826)
1 parent dd37113 commit 31df9c4

File tree

8 files changed: +160, -30 lines

metadata-ingestion/src/datahub/ingestion/source/redshift/lineage.py

+2 -2

@@ -814,8 +814,8 @@ def get_lineage(

         tablename = table.name
         if (
-            table.is_external_table
-            and schema.is_external_schema
+            table.is_external_table()
+            and schema.is_external_schema()
             and schema.external_platform
         ):
             # external_db_params = schema.option
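The added parentheses here follow from dropping @property on these helpers in redshift_schema.py (see below). A minimal sketch, using a hypothetical class, of why every call site has to change with it:

# Hypothetical class, illustration only: once @property is removed, a bare
# bound method is always truthy, so forgetting the () would silently treat
# every table as external.
class Table:
    def __init__(self, type: str) -> None:
        self.type = type

    def is_external_table(self) -> bool:
        return self.type == "EXTERNAL_TABLE"

t = Table(type="TABLE")
assert bool(t.is_external_table) is True   # bound method object, always truthy
assert t.is_external_table() is False      # the actual check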

metadata-ingestion/src/datahub/ingestion/source/redshift/lineage_v2.py

+19 -7

@@ -403,8 +403,8 @@ def _process_external_tables(
         for table in tables:
             schema = db_schemas[self.database][schema_name]
             if (
-                table.is_external_table
-                and schema.is_external_schema
+                table.is_external_table()
+                and schema.is_external_schema()
                 and schema.external_platform
             ):
                 # external_db_params = schema.option
@@ -416,14 +416,26 @@ def _process_external_tables(
                     platform_instance=self.config.platform_instance,
                     env=self.config.env,
                 )
-                upstream_urn = mce_builder.make_dataset_urn_with_platform_instance(
-                    upstream_platform,
-                    f"{schema.external_database}.{table.name}",
-                    platform_instance=(
+                if upstream_platform == self.platform:
+                    upstream_schema = schema.get_upstream_schema_name() or "public"
+                    upstream_dataset_name = (
+                        f"{schema.external_database}.{upstream_schema}.{table.name}"
+                    )
+                    upstream_platform_instance = self.config.platform_instance
+                else:
+                    upstream_dataset_name = (
+                        f"{schema.external_database}.{table.name}"
+                    )
+                    upstream_platform_instance = (
                         self.config.platform_instance_map.get(upstream_platform)
                         if self.config.platform_instance_map
                         else None
-                    ),
+                    )
+
+                upstream_urn = mce_builder.make_dataset_urn_with_platform_instance(
+                    upstream_platform,
+                    upstream_dataset_name,
+                    platform_instance=upstream_platform_instance,
                     env=self.config.env,
                 )

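A rough sketch of the upstream URNs this branch now yields, assuming a hypothetical external schema created from another Redshift database (producer_db, source schema sales_schema) with a table orders:

# Illustration with made-up names. When the external platform is Redshift
# itself, the upstream dataset name now carries the source schema (falling
# back to "public"); for other platforms (e.g. Glue) it stays two-part.
import datahub.emitter.mce_builder as mce_builder

redshift_upstream = mce_builder.make_dataset_urn_with_platform_instance(
    platform="redshift",
    name="producer_db.sales_schema.orders",  # three-part: db.schema.table
    platform_instance=None,
    env="PROD",
)
# urn:li:dataset:(urn:li:dataPlatform:redshift,producer_db.sales_schema.orders,PROD)

glue_upstream = mce_builder.make_dataset_urn_with_platform_instance(
    platform="glue",
    name="producer_db.orders",  # unchanged two-part: db.table
    platform_instance=None,
    env="PROD",
)
# urn:li:dataset:(urn:li:dataPlatform:glue,producer_db.orders,PROD)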

metadata-ingestion/src/datahub/ingestion/source/redshift/profile.py

+1 -1

@@ -48,7 +48,7 @@ def get_workunits(
             if not self.config.schema_pattern.allowed(schema):
                 continue
             for table in tables[db].get(schema, {}):
-                if table.is_external_table:
+                if table.is_external_table() or self.report.is_shared_database:
                     if not self.config.profiling.profile_external_tables:
                         # Case 1: If user did not tell us to profile external tables, simply log this.
                         self.report.profiling_skipped_other[schema] += 1

metadata-ingestion/src/datahub/ingestion/source/redshift/query.py

+14 -6

@@ -83,7 +83,9 @@ def get_database_details(database):
     # NOTE: Tables from shared database are not available in pg_catalog.pg_class
     @staticmethod
     def list_tables(
-        skip_external_tables: bool = False, is_shared_database: bool = False
+        database: str,
+        skip_external_tables: bool = False,
+        is_shared_database: bool = False,
     ) -> str:
         # NOTE: it looks like description is available only in pg_description
         # So this remains preferrred way
@@ -123,7 +125,7 @@ def list_tables(
            AND n.nspname != 'information_schema'
        """

-        external_tables_query = """
+        external_tables_query = f"""
        SELECT 'EXTERNAL_TABLE' as tabletype,
            NULL AS "schema_oid",
            schemaname AS "schema",
@@ -142,10 +144,11 @@
            serde_parameters,
            NULL as table_description
        FROM pg_catalog.svv_external_tables
+       WHERE redshift_database_name='{database}'
        ORDER BY "schema",
            "relname"
        """
-        shared_database_tables_query = """
+        shared_database_tables_query = f"""
        SELECT table_type as tabletype,
            NULL AS "schema_oid",
            schema_name AS "schema",
@@ -164,6 +167,7 @@
            NULL as serde_parameters,
            NULL as table_description
        FROM svv_redshift_tables
+       WHERE database_name='{database}'
        ORDER BY "schema",
            "relname"
        """
@@ -175,9 +179,11 @@
        return f"{tables_query} UNION {external_tables_query}"

    @staticmethod
-    def list_columns(is_shared_database: bool = False) -> str:
+    def list_columns(
+        database_name: str, schema_name: str, is_shared_database: bool = False
+    ) -> str:
        if is_shared_database:
-            return """
+            return f"""
                SELECT
                  schema_name as "schema",
                  table_name as "table_name",
@@ -198,9 +204,10 @@ def list_columns(is_shared_database: bool = False) -> str:
                  null as "table_oid"
                FROM SVV_REDSHIFT_COLUMNS
                WHERE 1 and schema = '{schema_name}'
+                AND database_name = '{database_name}'
                ORDER BY "schema", "table_name", "attnum"
            """
-        return """
+        return f"""
            SELECT
              n.nspname as "schema",
              c.relname as "table_name",
@@ -275,6 +282,7 @@ def list_columns(is_shared_database: bool = False) -> str:
              null as "table_oid"
            FROM SVV_EXTERNAL_COLUMNS
            WHERE 1 and schema = '{schema_name}'
+            AND redshift_database_name = '{database_name}'
            ORDER BY "schema", "table_name", "attnum"
        """
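A quick sketch (hypothetical database and schema names) of how the reworked query builders are now called. Because list_columns interpolates schema_name and database_name itself via f-strings, callers no longer need the .format(schema_name=...) step removed in redshift_schema.py below:

# Illustration only: the generated SQL should now filter on both the schema
# and the database the metadata is being extracted from.
from datahub.ingestion.source.redshift.query import RedshiftCommonQuery

sql = RedshiftCommonQuery.list_columns(
    database_name="consumer_db",
    schema_name="sales_schema",
    is_shared_database=True,
)
assert "schema = 'sales_schema'" in sql
assert "database_name = 'consumer_db'" in sql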

metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py

+9 -5

@@ -366,7 +366,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit

         self.db = self.data_dictionary.get_database_details(connection, database)
         self.report.is_shared_database = (
-            self.db is not None and self.db.is_shared_database
+            self.db is not None and self.db.is_shared_database()
         )
         with self.report.new_stage(METADATA_EXTRACTION):
             self.db_tables[database] = defaultdict()
@@ -508,6 +508,7 @@ def process_schema(
         schema_columns: Dict[str, Dict[str, List[RedshiftColumn]]] = {}
         schema_columns[schema.name] = self.data_dictionary.get_columns_for_schema(
             conn=connection,
+            database=database,
             schema=schema,
             is_shared_database=self.report.is_shared_database,
         )
@@ -829,9 +830,12 @@ def gen_dataset_workunits(
             domain_config=self.config.domain,
         )

-    def cache_tables_and_views(self, connection, database):
+    def cache_tables_and_views(
+        self, connection: redshift_connector.Connection, database: str
+    ) -> None:
         tables, views = self.data_dictionary.get_tables_and_views(
             conn=connection,
+            database=database,
             skip_external_tables=self.config.skip_external_tables,
             is_shared_database=self.report.is_shared_database,
         )
@@ -982,7 +986,7 @@ def extract_lineage_v2(
             self.datashares_helper.to_platform_resource(list(outbound_shares))
         )

-        if self.db and self.db.is_shared_database:
+        if self.db and self.db.is_shared_database():
             inbound_share = self.db.get_inbound_share()
             if inbound_share is None:
                 self.report.warning(
@@ -996,8 +1000,8 @@ def extract_lineage_v2(
         ):
             lineage_extractor.aggregator.add(known_lineage)

-        # TODO: distinguish between definition level lineage and audit log based lineage
-        # definition level lineage should never be skipped
+        # TODO: distinguish between definition level lineage and audit log based lineage.
+        # Definition level lineage should never be skipped
         if not self._should_ingest_lineage():
             return


metadata-ingestion/src/datahub/ingestion/source/redshift/redshift_schema.py

+27 -7

@@ -42,7 +42,6 @@ class RedshiftTable(BaseTable):
     serde_parameters: Optional[str] = None
     last_altered: Optional[datetime] = None

-    @property
     def is_external_table(self) -> bool:
         return self.type == "EXTERNAL_TABLE"

@@ -56,7 +55,6 @@ class RedshiftView(BaseTable):
     size_in_bytes: Optional[int] = None
     rows_count: Optional[int] = None

-    @property
     def is_external_table(self) -> bool:
         return self.type == "EXTERNAL_TABLE"

@@ -71,10 +69,28 @@ class RedshiftSchema:
     external_platform: Optional[str] = None
     external_database: Optional[str] = None

-    @property
     def is_external_schema(self) -> bool:
         return self.type == "external"

+    def get_upstream_schema_name(self) -> Optional[str]:
+        """Gets the schema name from the external schema option.
+
+        Returns:
+            Optional[str]: The schema name from the external schema option
+            if this is an external schema and has a valid option format, None otherwise.
+        """
+
+        if not self.is_external_schema() or not self.option:
+            return None
+
+        # For external schema on redshift, option is in form
+        # {"SCHEMA":"tickit"}
+        schema_match = re.search(r'"SCHEMA"\s*:\s*"([^"]*)"', self.option)
+        if not schema_match:
+            return None
+        else:
+            return schema_match.group(1)
+

 @dataclass
 class PartialInboundDatashare:
@@ -117,7 +133,6 @@ class RedshiftDatabase:
     type: str
     options: Optional[str] = None

-    @property
     def is_shared_database(self) -> bool:
         return self.type == "shared"

@@ -128,7 +143,7 @@ def is_shared_database(self) -> bool:
     def get_inbound_share(
         self,
     ) -> Optional[Union[InboundDatashare, PartialInboundDatashare]]:
-        if not self.is_shared_database or not self.options:
+        if not self.is_shared_database() or not self.options:
             return None

         # Convert into single regex ??
@@ -323,6 +338,7 @@ def enrich_tables(
     def get_tables_and_views(
         self,
         conn: redshift_connector.Connection,
+        database: str,
         skip_external_tables: bool = False,
         is_shared_database: bool = False,
     ) -> Tuple[Dict[str, List[RedshiftTable]], Dict[str, List[RedshiftView]]]:
@@ -336,6 +352,7 @@ def get_tables_and_views(
         cur = RedshiftDataDictionary.get_query_result(
             conn,
             RedshiftCommonQuery.list_tables(
+                database=database,
                 skip_external_tables=skip_external_tables,
                 is_shared_database=is_shared_database,
             ),
@@ -484,14 +501,17 @@ def get_schema_fields_for_column(
     @staticmethod
     def get_columns_for_schema(
         conn: redshift_connector.Connection,
+        database: str,
         schema: RedshiftSchema,
         is_shared_database: bool = False,
     ) -> Dict[str, List[RedshiftColumn]]:
         cursor = RedshiftDataDictionary.get_query_result(
             conn,
             RedshiftCommonQuery.list_columns(
-                is_shared_database=is_shared_database
-            ).format(schema_name=schema.name),
+                database_name=database,
+                schema_name=schema.name,
+                is_shared_database=is_shared_database,
+            ),
         )

         table_columns: Dict[str, List[RedshiftColumn]] = {}
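A usage sketch of the new helper, mirroring the '{"SCHEMA":"tickit"}' option format noted in the comment above (the field values here are made up):

# Illustration only; constructor arguments follow the unit tests added in
# this commit.
from datahub.ingestion.source.redshift.redshift_schema import RedshiftSchema

external = RedshiftSchema(
    name="sales_external",
    database="consumer_db",
    type="external",
    option='{"SCHEMA":"sales_schema"}',
    external_platform="redshift",
)
assert external.is_external_schema()
assert external.get_upstream_schema_name() == "sales_schema"

local = RedshiftSchema(
    name="public",
    database="consumer_db",
    type="local",
    option=None,
    external_platform=None,
)
assert local.get_upstream_schema_name() is None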

metadata-ingestion/tests/unit/redshift/test_redshift_datashares.py

+48 -1

@@ -16,7 +16,10 @@
     RedshiftTable,
     RedshiftView,
 )
-from datahub.ingestion.source.redshift.redshift_schema import PartialInboundDatashare
+from datahub.ingestion.source.redshift.redshift_schema import (
+    PartialInboundDatashare,
+    RedshiftDatabase,
+)
 from datahub.ingestion.source.redshift.report import RedshiftReport
 from datahub.metadata.schema_classes import (
     PlatformResourceInfoClass,
@@ -484,3 +487,47 @@ def test_to_platform_resource_exception_handling(self):
             list(report.warnings)[0].title
             == "Downstream lineage to outbound datashare may not work"
         )
+
+    def test_database_get_inbound_datashare_success(self):
+        db = RedshiftDatabase(
+            name="db",
+            type="shared",
+            options='{"datashare_name":"xxx","datashare_producer_account":"1234","datashare_producer_namespace":"yyy"}',
+        )
+
+        assert db.get_inbound_share() == InboundDatashare(
+            share_name="xxx",
+            producer_namespace="yyy",
+            consumer_database="db",
+        )
+
+    def test_database_get_partial_inbound_datashare_success(self):
+        db = RedshiftDatabase(
+            name="db",
+            type="shared",
+            options='{"datashare_name":"xxx","datashare_producer_account":"1234","datashare_producer_namespace":"yy',
+        )
+
+        assert db.get_inbound_share() == PartialInboundDatashare(
+            share_name="xxx",
+            producer_namespace_prefix="yy",
+            consumer_database="db",
+        )
+
+    def test_database_no_inbound_datashare(self):
+        db = RedshiftDatabase(
+            name="db",
+            type="local",
+            options=None,
+        )
+
+        assert db.get_inbound_share() is None
+
+    def test_shared_database_no_inbound_datashare(self):
+        db = RedshiftDatabase(
+            name="db",
+            type="shared",
+            options=None,
+        )
+
+        assert db.get_inbound_share() is None
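The options payload these tests feed in is plain JSON when it arrives complete; a rough, illustrative decode of the happy-path value (not the source's implementation, which parses with regexes so it can also tolerate the truncated payload exercised above):

# Illustration only.
import json

options = (
    '{"datashare_name":"xxx","datashare_producer_account":"1234",'
    '"datashare_producer_namespace":"yyy"}'
)
parsed = json.loads(options)
assert parsed["datashare_name"] == "xxx"
assert parsed["datashare_producer_namespace"] == "yyy"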

metadata-ingestion/tests/unit/redshift/test_redshift_lineage.py

+40 -1

@@ -15,7 +15,10 @@
     RedshiftLineageExtractor,
     parse_alter_table_rename,
 )
-from datahub.ingestion.source.redshift.redshift_schema import TempTableRow
+from datahub.ingestion.source.redshift.redshift_schema import (
+    RedshiftSchema,
+    TempTableRow,
+)
 from datahub.ingestion.source.redshift.report import RedshiftReport
 from datahub.metadata.schema_classes import NumberTypeClass, SchemaFieldDataTypeClass
 from datahub.sql_parsing.schema_resolver import SchemaResolver
@@ -794,3 +797,39 @@ def test_collapse_temp_recursive_cll_lineage_with_circular_reference():

     assert len(datasets) == 1
     # Here we only interested if it fails or not
+
+
+def test_external_schema_get_upstream_schema_success():
+    schema = RedshiftSchema(
+        name="schema",
+        database="XXXXXXXX",
+        type="external",
+        option='{"SCHEMA":"sales_schema"}',
+        external_platform="redshift",
+    )
+
+    assert schema.get_upstream_schema_name() == "sales_schema"
+
+
+def test_external_schema_no_upstream_schema():
+    schema = RedshiftSchema(
+        name="schema",
+        database="XXXXXXXX",
+        type="external",
+        option=None,
+        external_platform="redshift",
+    )
+
+    assert schema.get_upstream_schema_name() is None
+
+
+def test_local_schema_no_upstream_schema():
+    schema = RedshiftSchema(
+        name="schema",
+        database="XXXXXXXX",
+        type="local",
+        option='{"some_other_option":"x"}',
+        external_platform=None,
+    )
+
+    assert schema.get_upstream_schema_name() is None
