Commit 3e1b20c

feat(ingestion/iceberg): Several improvements to iceberg connector (#12744)

1 parent 862d1ac commit 3e1b20c

7 files changed: +135 -50 lines changed

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py (+46 -12)

@@ -2,8 +2,9 @@
 import logging
 import threading
 import uuid
-from typing import Any, Dict, Iterable, List, Optional
+from typing import Any, Dict, Iterable, List, Optional, Tuple
 
+from dateutil import parser as dateutil_parser
 from pyiceberg.catalog import Catalog
 from pyiceberg.exceptions import (
     NoSuchIcebergTableError,
@@ -81,6 +82,7 @@
     OwnerClass,
     OwnershipClass,
     OwnershipTypeClass,
+    TimeStampClass,
 )
 from datahub.utilities.perf_timer import PerfTimer
 from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor
@@ -183,16 +185,9 @@ def _get_datasets(self, catalog: Catalog) -> Iterable[Identifier]:
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         thread_local = threading.local()
 
-        def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
-            LOGGER.debug(f"Processing dataset for path {dataset_path}")
-            dataset_name = ".".join(dataset_path)
-            if not self.config.table_pattern.allowed(dataset_name):
-                # Dataset name is rejected by pattern, report as dropped.
-                self.report.report_dropped(dataset_name)
-                LOGGER.debug(
-                    f"Skipping table {dataset_name} due to not being allowed by the config pattern"
-                )
-                return
+        def _try_processing_dataset(
+            dataset_path: Tuple[str, ...], dataset_name: str
+        ) -> Iterable[MetadataWorkUnit]:
             try:
                 if not hasattr(thread_local, "local_catalog"):
                     LOGGER.debug(
@@ -248,10 +243,31 @@ def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
                 LOGGER.warning(
                     f"Iceberg Rest Catalog server error (500 status) encountered when processing table {dataset_path}, skipping it."
                 )
+            except ValueError as e:
+                if "Could not initialize FileIO" not in str(e):
+                    raise
+                self.report.warning(
+                    "Could not initialize FileIO",
+                    f"Could not initialize FileIO for {dataset_path} due to: {e}",
+                )
+
+        def _process_dataset(dataset_path: Identifier) -> Iterable[MetadataWorkUnit]:
+            try:
+                LOGGER.debug(f"Processing dataset for path {dataset_path}")
+                dataset_name = ".".join(dataset_path)
+                if not self.config.table_pattern.allowed(dataset_name):
+                    # Dataset name is rejected by pattern, report as dropped.
+                    self.report.report_dropped(dataset_name)
+                    LOGGER.debug(
+                        f"Skipping table {dataset_name} due to not being allowed by the config pattern"
+                    )
+                    return
+
+                yield from _try_processing_dataset(dataset_path, dataset_name)
             except Exception as e:
                 self.report.report_failure(
                     "general",
-                    f"Failed to create workunit for dataset {dataset_name}: {e}",
+                    f"Failed to create workunit for dataset {dataset_path}: {e}",
                 )
                 LOGGER.exception(
                     f"Exception while processing table {dataset_path}, skipping it.",
@@ -288,6 +304,7 @@ def _create_iceberg_workunit(
         )
 
         # Dataset properties aspect.
+        additional_properties = {}
        custom_properties = table.metadata.properties.copy()
        custom_properties["location"] = table.metadata.location
        custom_properties["format-version"] = str(table.metadata.format_version)
@@ -299,10 +316,27 @@
             custom_properties["manifest-list"] = (
                 table.current_snapshot().manifest_list
             )
+            additional_properties["lastModified"] = TimeStampClass(
+                int(table.current_snapshot().timestamp_ms)
+            )
+        if "created-at" in custom_properties:
+            try:
+                dt = dateutil_parser.isoparse(custom_properties["created-at"])
+                additional_properties["created"] = TimeStampClass(
+                    int(dt.timestamp() * 1000)
+                )
+            except Exception as ex:
+                LOGGER.warning(
+                    f"Exception while trying to parse creation date {custom_properties['created-at']}, ignoring: {ex}"
+                )
+
         dataset_properties = DatasetPropertiesClass(
             name=table.name()[-1],
             description=table.metadata.properties.get("comment", None),
             customProperties=custom_properties,
+            lastModified=additional_properties.get("lastModified"),
+            created=additional_properties.get("created"),
+            qualifiedName=dataset_name,
         )
         dataset_snapshot.aspects.append(dataset_properties)
         # Dataset ownership aspect.
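Taken together, the iceberg.py changes split per-table work into _try_processing_dataset, which downgrades only FileIO-initialization ValueErrors to warnings and re-raises everything else, while the outer _process_dataset catch-all turns whatever escapes into reported failures. A minimal, self-contained sketch of that routing; process_table, warnings, and failures are illustrative stand-ins, not connector names:

def process_table(table_name: str) -> None:
    # Hypothetical stand-in for the real per-table ingestion work.
    if table_name == "bad_fileio":
        raise ValueError("Could not initialize FileIO: abc.dummy.fileio")
    if table_name == "bad_value":
        raise ValueError("Other value exception")


def try_processing(table_name: str, warnings: list) -> None:
    try:
        process_table(table_name)
    except ValueError as e:
        if "Could not initialize FileIO" not in str(e):
            raise  # any other ValueError is a genuine failure
        warnings.append(f"Could not initialize FileIO for {table_name}: {e}")


warnings: list = []
failures: list = []
for name in ["good_table", "bad_fileio", "bad_value"]:
    try:
        try_processing(name, warnings)
    except Exception as e:
        failures.append(f"Failed to create workunit for dataset {name}: {e}")

# Only the FileIO problem becomes a warning; the other ValueError is a failure.
assert len(warnings) == 1 and len(failures) == 1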

metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py (+31 -20)

@@ -1,4 +1,5 @@
 import logging
+import threading
 from dataclasses import dataclass, field
 from typing import Any, Dict, Optional
 
@@ -156,43 +157,53 @@ class TopTableTimings:
     def __init__(self, size: int = 10):
         self._size = size
         self.top_entites = SortedList(key=lambda x: -x.get(self._VALUE_FIELD, 0))
+        self._lock = threading.Lock()
 
     def add(self, entity: Dict[str, Any]) -> None:
         if self._VALUE_FIELD not in entity:
             return
-        self.top_entites.add(entity)
-        if len(self.top_entites) > self._size:
-            self.top_entites.pop()
+        with self._lock:
+            self.top_entites.add(entity)
+            if len(self.top_entites) > self._size:
+                self.top_entites.pop()
 
     def __str__(self) -> str:
-        if len(self.top_entites) == 0:
-            return "no timings reported"
-        return str(list(self.top_entites))
+        with self._lock:
+            if len(self.top_entites) == 0:
+                return "no timings reported"
+            return str(list(self.top_entites))
 
 
 class TimingClass:
     times: SortedList
 
     def __init__(self):
         self.times = SortedList()
+        self._lock = threading.Lock()
 
     def add_timing(self, t: float) -> None:
-        self.times.add(t)
+        with self._lock:
+            self.times.add(t)
 
     def __str__(self) -> str:
-        if len(self.times) == 0:
-            return "no timings reported"
-        total = sum(self.times)
-        avg = total / len(self.times)
-        return str(
-            {
-                "average_time": format_timespan(avg, detailed=True, max_units=3),
-                "min_time": format_timespan(self.times[0], detailed=True, max_units=3),
-                "max_time": format_timespan(self.times[-1], detailed=True, max_units=3),
-                # total_time does not provide correct information in case we run in more than 1 thread
-                "total_time": format_timespan(total, detailed=True, max_units=3),
-            }
-        )
+        with self._lock:
+            if len(self.times) == 0:
+                return "no timings reported"
+            total = sum(self.times)
+            avg = total / len(self.times)
+            return str(
+                {
+                    "average_time": format_timespan(avg, detailed=True, max_units=3),
+                    "min_time": format_timespan(
+                        self.times[0], detailed=True, max_units=3
+                    ),
+                    "max_time": format_timespan(
+                        self.times[-1], detailed=True, max_units=3
+                    ),
+                    # total_time does not provide correct information in case we run in more than 1 thread
+                    "total_time": format_timespan(total, detailed=True, max_units=3),
                }
            )
 
 
 @dataclass
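The locking added here makes the report's timing collectors safe to update from ThreadedIteratorExecutor worker threads; sortedcontainers' SortedList makes no thread-safety guarantees, so unsynchronized concurrent add() calls could corrupt it. A runnable sketch of the same pattern, using a plain list so it needs no third-party dependency:

import threading
from typing import List


class ThreadSafeTimings:
    """Same locking pattern as TimingClass above, with a plain list so the
    sketch runs without the sortedcontainers dependency."""

    def __init__(self) -> None:
        self.times: List[float] = []
        self._lock = threading.Lock()

    def add_timing(self, t: float) -> None:
        with self._lock:  # serialize concurrent writers
            self.times.append(t)

    def __str__(self) -> str:
        with self._lock:  # readers also take the lock, for a consistent snapshot
            if not self.times:
                return "no timings reported"
            avg = sum(self.times) / len(self.times)
            return f"average_time={avg:.3f}s over {len(self.times)} samples"


timings = ThreadSafeTimings()
workers = [
    threading.Thread(target=lambda: [timings.add_timing(0.1) for _ in range(1000)])
    for _ in range(4)
]
for w in workers:
    w.start()
for w in workers:
    w.join()
assert len(timings.times) == 4000  # no lost updates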

metadata-ingestion/tests/integration/iceberg/iceberg_deleted_table_mces_golden.json (+14 -7)

@@ -11,16 +11,23 @@
   },
   {
     "com.linkedin.pegasus2avro.dataset.DatasetProperties": {
-      "name": "another_taxis",
       "customProperties": {
         "owner": "root",
-        "created-at": "2024-06-27T17:29:32.492204247Z",
+        "created-at": "2025-02-28T22:29:45.128452801Z",
         "write.format.default": "parquet",
         "location": "s3a://warehouse/wh/nyc/another_taxis",
         "format-version": "1",
         "partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
-        "snapshot-id": "1131595459662979239",
-        "manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-1131595459662979239-1-0e80739b-774c-4eda-9d96-3a4c70873c32.avro"
+        "snapshot-id": "2650690634256613262",
+        "manifest-list": "s3a://warehouse/wh/nyc/another_taxis/metadata/snap-2650690634256613262-1-da2608b8-1ca0-4331-ac49-332e8a7654d3.avro"
+      },
+      "name": "another_taxis",
+      "qualifiedName": "nyc.another_taxis",
+      "created": {
+        "time": 1740781785128
+      },
+      "lastModified": {
+        "time": 1740781786629
       },
       "tags": []
     }
@@ -150,7 +157,7 @@
   },
   "systemMetadata": {
     "lastObserved": 1586847600000,
-    "runId": "iceberg-2020_04_14-07_00_00",
+    "runId": "iceberg-2020_04_14-07_00_00-x9zw1z",
     "lastRunId": "no-run-id-provided",
     "pipelineName": "test_pipeline"
   }
@@ -168,7 +175,7 @@
   },
   "systemMetadata": {
     "lastObserved": 1586847600000,
-    "runId": "iceberg-2020_04_14-07_00_00",
+    "runId": "iceberg-2020_04_14-07_00_00-x9zw1z",
     "lastRunId": "no-run-id-provided",
     "pipelineName": "test_pipeline"
   }
@@ -185,7 +192,7 @@
   },
   "systemMetadata": {
     "lastObserved": 1586847600000,
-    "runId": "iceberg-2020_04_14-07_00_00",
+    "runId": "iceberg-2020_04_14-07_00_00-x9zw1z",
     "lastRunId": "no-run-id-provided",
     "pipelineName": "test_pipeline"
   }
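The new created and lastModified fields in the golden files follow mechanically from the connector change: created comes from parsing the created-at table property, lastModified from the current snapshot's timestamp_ms. A quick check of the created value above (requires python-dateutil; isoparse keeps microsecond precision, so the nanosecond tail is truncated):

from dateutil import parser as dateutil_parser

dt = dateutil_parser.isoparse("2025-02-28T22:29:45.128452801Z")
# 2025-02-28T22:29:45 UTC is 1740781785 s since the epoch; the fractional
# part contributes 128 ms, matching the golden "created": {"time": ...}.
assert int(dt.timestamp() * 1000) == 1740781785128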

metadata-ingestion/tests/integration/iceberg/iceberg_ingest_mces_golden.json (+11 -4)

@@ -11,16 +11,23 @@
   },
   {
     "com.linkedin.pegasus2avro.dataset.DatasetProperties": {
-      "name": "taxis",
       "customProperties": {
         "owner": "root",
-        "created-at": "2024-05-22T14:08:04.001538500Z",
+        "created-at": "2025-02-28T22:28:42.803284675Z",
         "write.format.default": "parquet",
         "location": "s3a://warehouse/wh/nyc/taxis",
         "format-version": "1",
         "partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
-        "snapshot-id": "5259199139271057622",
-        "manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-5259199139271057622-1-24dca7b8-d437-458e-ae91-df1d3e30bdc8.avro"
+        "snapshot-id": "7588721351475793452",
+        "manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-7588721351475793452-1-55e4dfe5-1478-4fda-a866-f03c24e643bb.avro"
+      },
+      "name": "taxis",
+      "qualifiedName": "nyc.taxis",
+      "created": {
+        "time": 1740781722803
+      },
+      "lastModified": {
+        "time": 1740781724377
       },
       "tags": []
     }

metadata-ingestion/tests/integration/iceberg/iceberg_profile_mces_golden.json (+13 -6)

@@ -11,16 +11,23 @@
   },
   {
     "com.linkedin.pegasus2avro.dataset.DatasetProperties": {
-      "name": "taxis",
       "customProperties": {
         "owner": "root",
-        "created-at": "2024-05-22T14:10:22.926080700Z",
+        "created-at": "2025-02-28T22:30:17.629572885Z",
         "write.format.default": "parquet",
         "location": "s3a://warehouse/wh/nyc/taxis",
         "format-version": "1",
         "partition-spec": "[{\"name\": \"trip_date\", \"transform\": \"identity\", \"source\": \"trip_date\", \"source-id\": 2, \"source-type\": \"timestamptz\", \"field-id\": 1000}]",
-        "snapshot-id": "564034874306625146",
-        "manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-564034874306625146-1-562a1705-d774-4e0a-baf0-1988bcc7be72.avro"
+        "snapshot-id": "7306419080492452372",
+        "manifest-list": "s3a://warehouse/wh/nyc/taxis/metadata/snap-7306419080492452372-1-53f849ab-2791-4a72-888c-fd2b0173712c.avro"
+      },
+      "name": "taxis",
+      "qualifiedName": "nyc.taxis",
+      "created": {
+        "time": 1740781817629
+      },
+      "lastModified": {
+        "time": 1740781819146
       },
       "tags": []
     }
@@ -163,8 +170,8 @@
   "json": {
     "timestampMillis": 1586847600000,
     "partitionSpec": {
-      "type": "FULL_TABLE",
-      "partition": "FULL_TABLE_SNAPSHOT"
+      "partition": "FULL_TABLE_SNAPSHOT",
+      "type": "FULL_TABLE"
     },
     "rowCount": 5,
     "columnCount": 6,

metadata-ingestion/tests/integration/iceberg/test_iceberg.py (+2 -0)

@@ -22,6 +22,8 @@
 # these.
 PATHS_IN_GOLDEN_FILE_TO_IGNORE = [
     r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['created-at'\]",
+    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['created'\]",
+    r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['com.linkedin.pegasus2avro.dataset.DatasetProperties'\]\['lastModified'\]",
     r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['snapshot-id'\]",
     r"root\[\d+\]\['proposedSnapshot'\].+\['aspects'\].+\['customProperties'\]\['manifest-list'\]",
 ]
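The two new ignore patterns keep the nondeterministic created and lastModified timestamps from breaking golden-file comparisons. They are DeepDiff-style path regexes (DataHub's golden-file helper compares outputs with deepdiff); a standalone sketch of how such an exclusion behaves, using toy data in place of real MCEs:

from deepdiff import DeepDiff

golden = [
    {"proposedSnapshot": {"aspects": [
        {"customProperties": {"created-at": "2024-06-27T17:29:32Z", "owner": "root"}}
    ]}}
]
output = [
    {"proposedSnapshot": {"aspects": [
        {"customProperties": {"created-at": "2025-02-28T22:29:45Z", "owner": "root"}}
    ]}}
]

# The timestamp differs, but the excluded path keeps it out of the diff.
diff = DeepDiff(
    golden,
    output,
    exclude_regex_paths=[
        r"root\[\d+\]\['proposedSnapshot'\].+\['customProperties'\]\['created-at'\]"
    ],
)
assert not diff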

metadata-ingestion/tests/unit/test_iceberg.py (+18 -1)

@@ -968,6 +968,9 @@ def _raise_no_such_table_exception():
     def _raise_server_error():
         raise ServerError()
 
+    def _raise_fileio_error():
+        raise ValueError("Could not initialize FileIO: abc.dummy.fileio")
+
     mock_catalog = MockCatalog(
         {
             "namespaceA": {
@@ -1024,6 +1027,7 @@ def _raise_server_error():
                 "table7": _raise_file_not_found_error,
                 "table8": _raise_no_such_iceberg_table_exception,
                 "table9": _raise_server_error,
+                "table10": _raise_fileio_error,
             }
         }
     )
@@ -1047,7 +1051,7 @@ def _raise_server_error():
             "urn:li:dataset:(urn:li:dataPlatform:iceberg,namespaceA.table4,PROD)",
         ],
     )
-    assert source.report.warnings.total_elements == 5
+    assert source.report.warnings.total_elements == 6
     assert source.report.failures.total_elements == 0
     assert source.report.tables_scanned == 4
 
@@ -1058,6 +1062,9 @@ def test_handle_unexpected_exceptions() -> None:
     def _raise_exception():
         raise Exception()
 
+    def _raise_other_value_error_exception():
+        raise ValueError("Other value exception")
+
     mock_catalog = MockCatalog(
         {
             "namespaceA": {
@@ -1110,6 +1117,7 @@ def _raise_exception():
                     catalog=None,
                 ),
                 "table5": _raise_exception,
+                "table6": _raise_other_value_error_exception,
             }
         }
     )
@@ -1136,3 +1144,12 @@ def _raise_exception():
     assert source.report.warnings.total_elements == 0
     assert source.report.failures.total_elements == 1
     assert source.report.tables_scanned == 4
+    # Needed to make sure all failures are recognized properly
+    failures = [f for f in source.report.failures]
+    TestCase().assertCountEqual(
+        failures[0].context,
+        [
+            "Failed to create workunit for dataset ('namespaceA', 'table6'): Other value exception",
+            "Failed to create workunit for dataset ('namespaceA', 'table5'): ",
+        ],
+    )
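The closing assertion borrows unittest's assertCountEqual through a throwaway TestCase instance because worker threads may hit table5 and table6 in either order; the check is order-insensitive membership, not sequence equality. For illustration:

from unittest import TestCase

# assertCountEqual passes when both sequences hold the same elements,
# regardless of order, which is what a multi-threaded run needs.
TestCase().assertCountEqual(
    ["failure for table5", "failure for table6"],
    ["failure for table6", "failure for table5"],
)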
