Skip to content

Commit d87947a

Browse files
committed
fix(ingest/glue): Add additional checks and logging when specifying catalog_id
1 parent 8c724db commit d87947a

File tree

3 files changed

+48
-10
lines changed

3 files changed

+48
-10
lines changed

metadata-ingestion/src/datahub/ingestion/source/aws/glue.py

+11-3
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@
115115

116116
logger = logging.getLogger(__name__)
117117

118-
119118
DEFAULT_PLATFORM = "glue"
120119
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]
121120

@@ -668,6 +667,7 @@ def get_datajob_wu(self, node: Dict[str, Any], job_name: str) -> MetadataWorkUni
668667
return MetadataWorkUnit(id=f'{job_name}-{node["Id"]}', mce=mce)
669668

670669
def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
670+
logger.debug("Getting all databases")
671671
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetDatabases.html
672672
paginator = self.glue_client.get_paginator("get_databases")
673673

@@ -684,10 +684,18 @@ def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
684684
pattern += "[?!TargetDatabase]"
685685

686686
for database in paginator_response.search(pattern):
687-
if self.source_config.database_pattern.allowed(database["Name"]):
688-
yield database
687+
if not self.source_config.database_pattern.allowed(database["Name"]):
688+
continue
689+
if (
690+
self.source_config.catalog_id
691+
and database.get("CatalogId")
692+
and database.get("CatalogId") != self.source_config.catalog_id
693+
):
694+
continue
695+
yield database
689696

690697
def get_tables_from_database(self, database: Mapping[str, Any]) -> Iterable[Dict]:
698+
logger.debug(f"Getting tables from database {database['Name']}")
691699
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetTables.html
692700
paginator = self.glue_client.get_paginator("get_tables")
693701
database_name = database["Name"]

metadata-ingestion/tests/unit/glue/test_glue_source.py

+32-4
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
validate_all_providers_have_committed_successfully,
3636
)
3737
from tests.unit.glue.test_glue_source_stubs import (
38-
databases_1,
39-
databases_2,
38+
flights_database,
39+
test_database,
4040
get_bucket_tagging,
4141
get_databases_delta_response,
4242
get_databases_response,
@@ -64,6 +64,7 @@
6464
tables_2,
6565
tables_profiling_1,
6666
target_database_tables,
67+
empty_database,
6768
)
6869

6970
FROZEN_TIME = "2020-04-14 07:00:00"
@@ -310,6 +311,33 @@ def test_config_without_platform():
310311
assert source.platform == "glue"
311312

312313

314+
def test_get_databases_filters_by_catalog():
315+
def format_databases(databases):
316+
return set(d["Name"] for d in databases)
317+
318+
all_catalogs_source: GlueSource = GlueSource(
319+
config=GlueSourceConfig(), ctx=PipelineContext(run_id="glue-source-test")
320+
)
321+
with Stubber(all_catalogs_source.glue_client) as glue_stubber:
322+
glue_stubber.add_response("get_databases", get_databases_response, {})
323+
324+
expected = format_databases([flights_database, test_database, empty_database])
325+
assert format_databases(all_catalogs_source.get_all_databases()) == expected
326+
327+
catalog_id = "123412341234"
328+
single_catalog_source = GlueSource(
329+
config=GlueSourceConfig(catalog_id=catalog_id),
330+
ctx=PipelineContext(run_id="glue-source-test"),
331+
)
332+
with Stubber(single_catalog_source.glue_client) as glue_stubber:
333+
glue_stubber.add_response(
334+
"get_databases", get_databases_response, {"CatalogId": catalog_id}
335+
)
336+
337+
expected = format_databases([flights_database, test_database])
338+
assert format_databases(single_catalog_source.get_all_databases()) == expected
339+
340+
313341
@freeze_time(FROZEN_TIME)
314342
def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
315343
deleted_actor_golden_mcs = "{}/glue_deleted_actor_mces_golden.json".format(
@@ -357,8 +385,8 @@ def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
357385
tables_on_first_call = tables_1
358386
tables_on_second_call = tables_2
359387
mock_get_all_databases_and_tables.side_effect = [
360-
(databases_1, tables_on_first_call),
361-
(databases_2, tables_on_second_call),
388+
([flights_database], tables_on_first_call),
389+
([test_database], tables_on_second_call),
362390
]
363391

364392
pipeline_run1 = run_and_get_pipeline(pipeline_config_dict)

metadata-ingestion/tests/unit/glue/test_glue_source_stubs.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,14 @@
8888
"Permissions": ["ALL"],
8989
}
9090
],
91-
"CatalogId": "123412341234",
91+
"CatalogId": "000000000000",
9292
},
9393
]
9494
}
95-
databases_1 = [{"Name": "flights-database", "CatalogId": "123412341234"}]
96-
databases_2 = [{"Name": "test-database", "CatalogId": "123412341234"}]
95+
flights_database = {"Name": "flights-database", "CatalogId": "123412341234"}
96+
test_database = {"Name": "test-database", "CatalogId": "123412341234"}
97+
empty_database = {"Name": "empty-database", "CatalogId": "000000000000"}
98+
9799
tables_1 = [
98100
{
99101
"Name": "avro",

0 commit comments

Comments
 (0)