Skip to content

Commit e45f548

Browse files
authored
feat(ingest/iceberg): Improve iceberg connector (#12163)
1 parent 2e54461 commit e45f548

File tree

3 files changed

+189
-11
lines changed

3 files changed

+189
-11
lines changed

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py

+27-1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
NoSuchNamespaceError,
1111
NoSuchPropertyException,
1212
NoSuchTableError,
13+
ServerError,
1314
)
1415
from pyiceberg.schema import Schema, SchemaVisitorPerPrimitiveType, visit
1516
from pyiceberg.table import Table
@@ -145,6 +146,13 @@ def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]:
145146
self.report.report_no_listed_namespaces(len(namespaces))
146147
tables_count = 0
147148
for namespace in namespaces:
149+
namespace_repr = ".".join(namespace)
150+
if not self.config.namespace_pattern.allowed(namespace_repr):
151+
LOGGER.info(
152+
f"Namespace {namespace_repr} is not allowed by config pattern, skipping"
153+
)
154+
self.report.report_dropped(f"{namespace_repr}.*")
155+
continue
148156
try:
149157
tables = catalog.list_tables(namespace)
150158
tables_count += len(tables)
@@ -181,6 +189,9 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
181189
if not self.config.table_pattern.allowed(dataset_name):
182190
# Dataset name is rejected by pattern, report as dropped.
183191
self.report.report_dropped(dataset_name)
192+
LOGGER.debug(
193+
f"Skipping table {dataset_name} due to not being allowed by the config pattern"
194+
)
184195
return
185196
try:
186197
if not hasattr(thread_local, "local_catalog"):
@@ -219,6 +230,22 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
219230
LOGGER.warning(
220231
f"NoSuchTableError while processing table {dataset_path}, skipping it.",
221232
)
233+
except FileNotFoundError as e:
234+
self.report.report_warning(
235+
"file-not-found",
236+
f"Encountered FileNotFoundError when trying to read manifest file for {dataset_name}. {e}",
237+
)
238+
LOGGER.warning(
239+
f"FileNotFoundError while processing table {dataset_path}, skipping it."
240+
)
241+
except ServerError as e:
242+
self.report.report_warning(
243+
"iceberg-rest-server-error",
244+
f"Iceberg Rest Catalog returned 500 status due to an unhandled exception for {dataset_name}. Exception: {e}",
245+
)
246+
LOGGER.warning(
247+
f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it."
248+
)
222249
except Exception as e:
223250
self.report.report_failure("general", f"Failed to create workunit: {e}")
224251
LOGGER.exception(
@@ -269,7 +296,6 @@ def _create_iceberg_workunit(
269296
] = table.current_snapshot().manifest_list
270297
dataset_properties = DatasetPropertiesClass(
271298
name=table.name()[-1],
272-
tags=[],
273299
description=table.metadata.properties.get("comment", None),
274300
customProperties=custom_properties,
275301
)

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py

+4
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ class IcebergSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin)
6868
default=AllowDenyPattern.allow_all(),
6969
description="Regex patterns for tables to filter in ingestion.",
7070
)
71+
namespace_pattern: AllowDenyPattern = Field(
72+
default=AllowDenyPattern.allow_all(),
73+
description="Regex patterns for namespaces to filter in ingestion.",
74+
)
7175
user_ownership_property: Optional[str] = Field(
7276
default="owner",
7377
description="Iceberg table property to look for a `CorpUser` owner. Can only hold a single user value. If property has no value, no owner information will be emitted.",

metadata-ingestion/tests/unit/test_iceberg.py

+158-10
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
NoSuchIcebergTableError,
1111
NoSuchNamespaceError,
1212
NoSuchPropertyException,
13+
NoSuchTableError,
14+
ServerError,
1315
)
1416
from pyiceberg.io.pyarrow import PyArrowFileIO
1517
from pyiceberg.partitioning import PartitionSpec
@@ -39,6 +41,7 @@
3941
UUIDType,
4042
)
4143

44+
from datahub.configuration.common import AllowDenyPattern
4245
from datahub.ingestion.api.common import PipelineContext
4346
from datahub.ingestion.api.workunit import MetadataWorkUnit
4447
from datahub.ingestion.source.iceberg.iceberg import (
@@ -62,12 +65,12 @@
6265
)
6366

6467

65-
def with_iceberg_source(processing_threads: int = 1) -> IcebergSource:
68+
def with_iceberg_source(processing_threads: int = 1, **kwargs: Any) -> IcebergSource:
6669
catalog = {"test": {"type": "rest"}}
6770
return IcebergSource(
6871
ctx=PipelineContext(run_id="iceberg-source-test"),
6972
config=IcebergSourceConfig(
70-
catalog=catalog, processing_threads=processing_threads
73+
catalog=catalog, processing_threads=processing_threads, **kwargs
7174
),
7275
)
7376

@@ -542,27 +545,27 @@ def __init__(self, tables: Dict[str, Dict[str, Callable[[], Table]]]):
542545
"""
543546
self.tables = tables
544547

545-
def list_namespaces(self) -> Iterable[str]:
546-
return [*self.tables.keys()]
548+
def list_namespaces(self) -> Iterable[Tuple[str]]:
549+
return [*[(key,) for key in self.tables.keys()]]
547550

548551
def list_tables(self, namespace: str) -> Iterable[Tuple[str, str]]:
549-
return [(namespace, table) for table in self.tables[namespace].keys()]
552+
return [(namespace[0], table) for table in self.tables[namespace[0]].keys()]
550553

551554
def load_table(self, dataset_path: Tuple[str, str]) -> Table:
552555
return self.tables[dataset_path[0]][dataset_path[1]]()
553556

554557

555558
class MockCatalogExceptionListingTables(MockCatalog):
556559
def list_tables(self, namespace: str) -> Iterable[Tuple[str, str]]:
557-
if namespace == "no_such_namespace":
560+
if namespace == ("no_such_namespace",):
558561
raise NoSuchNamespaceError()
559-
if namespace == "generic_exception":
562+
if namespace == ("generic_exception",):
560563
raise Exception()
561564
return super().list_tables(namespace)
562565

563566

564567
class MockCatalogExceptionListingNamespaces(MockCatalog):
565-
def list_namespaces(self) -> Iterable[str]:
568+
def list_namespaces(self) -> Iterable[Tuple[str]]:
566569
raise Exception()
567570

568571

@@ -814,15 +817,157 @@ def test_proper_run_with_multiple_namespaces() -> None:
814817
)
815818

816819

820+
def test_filtering() -> None:
821+
source = with_iceberg_source(
822+
processing_threads=1,
823+
table_pattern=AllowDenyPattern(deny=[".*abcd.*"]),
824+
namespace_pattern=AllowDenyPattern(allow=["namespace1"]),
825+
)
826+
mock_catalog = MockCatalog(
827+
{
828+
"namespace1": {
829+
"table_xyz": lambda: Table(
830+
identifier=("namespace1", "table_xyz"),
831+
metadata=TableMetadataV2(
832+
partition_specs=[PartitionSpec(spec_id=0)],
833+
location="s3://abcdefg/namespace1/table_xyz",
834+
last_column_id=0,
835+
schemas=[Schema(schema_id=0)],
836+
),
837+
metadata_location="s3://abcdefg/namespace1/table_xyz",
838+
io=PyArrowFileIO(),
839+
catalog=None,
840+
),
841+
"JKLtable": lambda: Table(
842+
identifier=("namespace1", "JKLtable"),
843+
metadata=TableMetadataV2(
844+
partition_specs=[PartitionSpec(spec_id=0)],
845+
location="s3://abcdefg/namespace1/JKLtable",
846+
last_column_id=0,
847+
schemas=[Schema(schema_id=0)],
848+
),
849+
metadata_location="s3://abcdefg/namespace1/JKLtable",
850+
io=PyArrowFileIO(),
851+
catalog=None,
852+
),
853+
"table_abcd": lambda: Table(
854+
identifier=("namespace1", "table_abcd"),
855+
metadata=TableMetadataV2(
856+
partition_specs=[PartitionSpec(spec_id=0)],
857+
location="s3://abcdefg/namespace1/table_abcd",
858+
last_column_id=0,
859+
schemas=[Schema(schema_id=0)],
860+
),
861+
metadata_location="s3://abcdefg/namespace1/table_abcd",
862+
io=PyArrowFileIO(),
863+
catalog=None,
864+
),
865+
"aaabcd": lambda: Table(
866+
identifier=("namespace1", "aaabcd"),
867+
metadata=TableMetadataV2(
868+
partition_specs=[PartitionSpec(spec_id=0)],
869+
location="s3://abcdefg/namespace1/aaabcd",
870+
last_column_id=0,
871+
schemas=[Schema(schema_id=0)],
872+
),
873+
metadata_location="s3://abcdefg/namespace1/aaabcd",
874+
io=PyArrowFileIO(),
875+
catalog=None,
876+
),
877+
},
878+
"namespace2": {
879+
"foo": lambda: Table(
880+
identifier=("namespace2", "foo"),
881+
metadata=TableMetadataV2(
882+
partition_specs=[PartitionSpec(spec_id=0)],
883+
location="s3://abcdefg/namespace2/foo",
884+
last_column_id=0,
885+
schemas=[Schema(schema_id=0)],
886+
),
887+
metadata_location="s3://abcdefg/namespace2/foo",
888+
io=PyArrowFileIO(),
889+
catalog=None,
890+
),
891+
"bar": lambda: Table(
892+
identifier=("namespace2", "bar"),
893+
metadata=TableMetadataV2(
894+
partition_specs=[PartitionSpec(spec_id=0)],
895+
location="s3://abcdefg/namespace2/bar",
896+
last_column_id=0,
897+
schemas=[Schema(schema_id=0)],
898+
),
899+
metadata_location="s3://abcdefg/namespace2/bar",
900+
io=PyArrowFileIO(),
901+
catalog=None,
902+
),
903+
},
904+
"namespace3": {
905+
"sales": lambda: Table(
906+
identifier=("namespace3", "sales"),
907+
metadata=TableMetadataV2(
908+
partition_specs=[PartitionSpec(spec_id=0)],
909+
location="s3://abcdefg/namespace3/sales",
910+
last_column_id=0,
911+
schemas=[Schema(schema_id=0)],
912+
),
913+
metadata_location="s3://abcdefg/namespace3/sales",
914+
io=PyArrowFileIO(),
915+
catalog=None,
916+
),
917+
"products": lambda: Table(
918+
identifier=("namespace2", "bar"),
919+
metadata=TableMetadataV2(
920+
partition_specs=[PartitionSpec(spec_id=0)],
921+
location="s3://abcdefg/namespace3/products",
922+
last_column_id=0,
923+
schemas=[Schema(schema_id=0)],
924+
),
925+
metadata_location="s3://abcdefg/namespace3/products",
926+
io=PyArrowFileIO(),
927+
catalog=None,
928+
),
929+
},
930+
}
931+
)
932+
with patch(
933+
"datahub.ingestion.source.iceberg.iceberg.IcebergSourceConfig.get_catalog"
934+
) as get_catalog:
935+
get_catalog.return_value = mock_catalog
936+
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
937+
assert len(wu) == 2
938+
urns = []
939+
for unit in wu:
940+
assert isinstance(unit.metadata, MetadataChangeEvent)
941+
assert isinstance(unit.metadata.proposedSnapshot, DatasetSnapshotClass)
942+
urns.append(unit.metadata.proposedSnapshot.urn)
943+
TestCase().assertCountEqual(
944+
urns,
945+
[
946+
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespace1.table_xyz,PROD)",
947+
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespace1.JKLtable,PROD)",
948+
],
949+
)
950+
assert source.report.tables_scanned == 2
951+
952+
817953
def test_handle_expected_exceptions() -> None:
818954
source = with_iceberg_source(processing_threads=3)
819955

820956
def _raise_no_such_property_exception():
821957
raise NoSuchPropertyException()
822958

823-
def _raise_no_such_table_exception():
959+
def _raise_no_such_iceberg_table_exception():
824960
raise NoSuchIcebergTableError()
825961

962+
def _raise_file_not_found_error():
963+
raise FileNotFoundError()
964+
965+
def _raise_no_such_table_exception():
966+
raise NoSuchTableError()
967+
968+
def _raise_server_error():
969+
raise ServerError()
970+
826971
mock_catalog = MockCatalog(
827972
{
828973
"namespaceA": {
@@ -876,6 +1021,9 @@ def _raise_no_such_table_exception():
8761021
),
8771022
"table5": _raise_no_such_property_exception,
8781023
"table6": _raise_no_such_table_exception,
1024+
"table7": _raise_file_not_found_error,
1025+
"table8": _raise_no_such_iceberg_table_exception,
1026+
"table9": _raise_server_error,
8791027
}
8801028
}
8811029
)
@@ -899,7 +1047,7 @@ def _raise_no_such_table_exception():
8991047
"urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
9001048
],
9011049
)
902-
assert source.report.warnings.total_elements == 2
1050+
assert source.report.warnings.total_elements == 5
9031051
assert source.report.failures.total_elements == 0
9041052
assert source.report.tables_scanned == 4
9051053

0 commit comments

Comments (0)