Commit ae1806f — feat(ingest/dynamoDB): flatten struct fields (#9852)
Co-authored-by: Tamas Nemeth <[email protected]>
Parent: bbd818a

File tree: 4 files changed (+260 −40 lines)

metadata-ingestion/src/datahub/ingestion/source/dynamodb/dynamodb.py

+60-31
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import logging
22
from dataclasses import dataclass, field
3-
from typing import Any, Counter, Dict, Iterable, List, Optional, Type, Union
3+
from typing import Any, Counter, Dict, Iterable, List, Optional, Tuple, Type, Union
44

55
import boto3
66
import pydantic
@@ -61,6 +61,7 @@
6161
PAGE_SIZE = 100
6262
MAX_SCHEMA_SIZE = 300
6363
MAX_PRIMARY_KEYS_SIZE = 100
64+
FIELD_DELIMITER = "."
6465

6566
logger: logging.Logger = logging.getLogger(__name__)
6667

@@ -285,13 +286,13 @@ def construct_schema_from_dynamodb(
285286
dynamodb_client: BaseClient,
286287
region: str,
287288
table_name: str,
288-
) -> Dict[str, SchemaDescription]:
289+
) -> Dict[Tuple[str, ...], SchemaDescription]:
289290
"""
290291
This will use the dynamodb client to scan the given table to retrieve items with pagination,
291292
and construct the schema of this table by reading the attributes of the retrieved items
292293
"""
293294
paginator = dynamodb_client.get_paginator("scan")
294-
schema: Dict[str, SchemaDescription] = {}
295+
schema: Dict[Tuple[str, ...], SchemaDescription] = {}
295296
"""
296297
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Paginator.Scan
297298
Note that the behavior of the pagination does not align with the documentation according to https://stackoverflow.com/questions/39201093/how-to-use-boto3-pagination
@@ -323,7 +324,7 @@ def include_table_item_to_schema(
323324
dynamodb_client: Any,
324325
region: str,
325326
table_name: str,
326-
schema: Dict[str, SchemaDescription],
327+
schema: Dict[Tuple[str, ...], SchemaDescription],
327328
) -> None:
328329
"""
329330
It will look up in the config include_table_item dict to see if "region.table_name" exists as key,
@@ -358,7 +359,9 @@ def include_table_item_to_schema(
358359
self.construct_schema_from_items(items, schema)
359360

360361
def construct_schema_from_items(
361-
slef, items: List[Dict[str, Dict]], schema: Dict[str, SchemaDescription]
362+
self,
363+
items: List[Dict[str, Dict]],
364+
schema: Dict[Tuple[str, ...], SchemaDescription],
362365
) -> None:
363366
"""
364367
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Client.scan
@@ -367,35 +370,58 @@ def construct_schema_from_items(
367370
we are writing our own construct schema method, take the attribute name as key and SchemaDescription as value
368371
"""
369372
for document in items:
370-
# the key is the attribute name and the value is a dict with only one entry,
371-
# whose key is the data type and value is the data
372-
for key, value in document.items():
373-
if value is not None:
374-
data_type = list(value.keys())[0]
375-
if key not in schema:
376-
schema[key] = {
377-
"types": Counter(data_type),
378-
"count": 1,
379-
# It seems we don't have collapsed field name so we are using attribute name here
380-
"delimited_name": key,
381-
"type": data_type,
382-
"nullable": False,
383-
}
384-
else:
385-
# update the type count
386-
schema[key]["types"].update({data_type: 1})
387-
schema[key]["count"] += 1
388-
# if we found an attribute name with different attribute type, we consider this attribute type as "mixed"
389-
field_types = schema[key]["types"]
390-
if len(field_types.keys()) > 1:
391-
schema[key]["type"] = "mixed"
373+
self.append_schema(schema, document)
374+
375+
def append_schema(
376+
self,
377+
schema: Dict[Tuple[str, ...], SchemaDescription],
378+
document: Dict[str, Dict],
379+
parent_field_path: Tuple[str, ...] = (),
380+
) -> None:
381+
# the key is the attribute name and the value is a dict with only one entry,
382+
# whose key is the data type and value is the data and we will recursively expand
383+
# map data type to get flattened field
384+
for key, value in document.items():
385+
if value is not None:
386+
data_type = list(value.keys())[0]
387+
attribute_value = value[data_type]
388+
current_field_path = parent_field_path + (key,)
389+
# Handle nested maps by recursive calls
390+
if data_type == "M":
391+
logger.debug(
392+
f"expanding nested fields for map, current_field_path: {current_field_path}"
393+
)
394+
self.append_schema(schema, attribute_value, current_field_path)
395+
396+
if current_field_path not in schema:
397+
schema[current_field_path] = {
398+
"types": Counter({data_type: 1}),
399+
"count": 1,
400+
# It seems we don't have collapsed field name so we are using attribute name here
401+
"delimited_name": FIELD_DELIMITER.join(current_field_path),
402+
"type": data_type,
403+
"nullable": False,
404+
}
405+
else:
406+
schema[current_field_path]["types"].update({data_type: 1})
407+
schema[current_field_path]["count"] += 1
408+
# if we found an attribute name with different attribute type, we consider this attribute type as "mixed"
409+
if len(schema[current_field_path]["types"]) > 1:
410+
schema[current_field_path]["type"] = "mixed"
411+
schema[current_field_path]["nullable"] |= (
412+
attribute_value is None
413+
) # Mark as nullable if null encountered
414+
types = schema[current_field_path]["types"]
415+
logger.debug(
416+
f"append schema with field_path: {current_field_path} and type: {types}"
417+
)
392418

393419
def construct_schema_metadata(
394420
self,
395421
table_name: str,
396422
dataset_urn: str,
397423
dataset_properties: DatasetPropertiesClass,
398-
schema: Dict[str, SchemaDescription],
424+
schema: Dict[Tuple[str, ...], SchemaDescription],
399425
primary_key_dict: Dict[str, str],
400426
) -> SchemaMetadata:
401427
""" "
@@ -407,20 +433,23 @@ def construct_schema_metadata(
407433
canonical_schema: List[SchemaField] = []
408434
schema_size = len(schema.values())
409435
table_fields = list(schema.values())
410-
411436
if schema_size > MAX_SCHEMA_SIZE:
412437
# downsample the schema, using frequency as the sort key
413438
self.report.report_warning(
414439
key=dataset_urn,
415440
reason=f"Downsampling the table schema because MAX_SCHEMA_SIZE threshold is {MAX_SCHEMA_SIZE}",
416441
)
442+
417443
# Add this information to the custom properties so user can know they are looking at down sampled schema
418444
dataset_properties.customProperties["schema.downsampled"] = "True"
419445
dataset_properties.customProperties["schema.totalFields"] = f"{schema_size}"
420-
# append each schema field (sort so output is consistent)
446+
# append each schema field, schema will be sorted by count descending and delimited_name ascending and sliced to only include MAX_SCHEMA_SIZE items
421447
for schema_field in sorted(
422448
table_fields,
423-
key=lambda x: x["delimited_name"],
449+
key=lambda x: (
450+
-x["count"],
451+
x["delimited_name"],
452+
), # Negate `count` for descending order, `delimited_name` stays the same for ascending
424453
)[0:MAX_SCHEMA_SIZE]:
425454
field_path = schema_field["delimited_name"]
426455
native_data_type = self.get_native_type(schema_field["type"], table_name)

metadata-ingestion/tests/integration/dynamodb/dynamodb_default_platform_instance_mces_golden.json

+92-4
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,18 @@
4646
"recursive": false,
4747
"isPartOfKey": false
4848
},
49+
{
50+
"fieldPath": "contactNumbers",
51+
"nullable": true,
52+
"type": {
53+
"type": {
54+
"com.linkedin.schema.ArrayType": {}
55+
}
56+
},
57+
"nativeDataType": "List",
58+
"recursive": false,
59+
"isPartOfKey": false
60+
},
4961
{
5062
"fieldPath": "partitionKey",
5163
"nullable": false,
@@ -59,6 +71,78 @@
5971
"recursive": false,
6072
"isPartOfKey": false
6173
},
74+
{
75+
"fieldPath": "services",
76+
"nullable": true,
77+
"type": {
78+
"type": {
79+
"com.linkedin.schema.RecordType": {}
80+
}
81+
},
82+
"nativeDataType": "Map",
83+
"recursive": false,
84+
"isPartOfKey": false
85+
},
86+
{
87+
"fieldPath": "services.hours",
88+
"nullable": true,
89+
"type": {
90+
"type": {
91+
"com.linkedin.schema.RecordType": {}
92+
}
93+
},
94+
"nativeDataType": "Map",
95+
"recursive": false,
96+
"isPartOfKey": false
97+
},
98+
{
99+
"fieldPath": "services.hours.close",
100+
"nullable": true,
101+
"type": {
102+
"type": {
103+
"com.linkedin.schema.StringType": {}
104+
}
105+
},
106+
"nativeDataType": "String",
107+
"recursive": false,
108+
"isPartOfKey": false
109+
},
110+
{
111+
"fieldPath": "services.hours.open",
112+
"nullable": true,
113+
"type": {
114+
"type": {
115+
"com.linkedin.schema.StringType": {}
116+
}
117+
},
118+
"nativeDataType": "String",
119+
"recursive": false,
120+
"isPartOfKey": false
121+
},
122+
{
123+
"fieldPath": "services.parking",
124+
"nullable": true,
125+
"type": {
126+
"type": {
127+
"com.linkedin.schema.BooleanType": {}
128+
}
129+
},
130+
"nativeDataType": "Boolean",
131+
"recursive": false,
132+
"isPartOfKey": false
133+
},
134+
{
135+
"fieldPath": "services.wifi",
136+
"nullable": true,
137+
"type": {
138+
"type": {
139+
"com.linkedin.schema.StringType": {}
140+
}
141+
},
142+
"nativeDataType": "String",
143+
"recursive": false,
144+
"isPartOfKey": false
145+
},
62146
{
63147
"fieldPath": "zip",
64148
"nullable": true,
@@ -76,7 +160,8 @@
76160
},
77161
"systemMetadata": {
78162
"lastObserved": 1693396800000,
79-
"runId": "dynamodb-test"
163+
"runId": "dynamodb-test",
164+
"lastRunId": "no-run-id-provided"
80165
}
81166
},
82167
{
@@ -95,7 +180,8 @@
95180
},
96181
"systemMetadata": {
97182
"lastObserved": 1693396800000,
98-
"runId": "dynamodb-test"
183+
"runId": "dynamodb-test",
184+
"lastRunId": "no-run-id-provided"
99185
}
100186
},
101187
{
@@ -111,7 +197,8 @@
111197
},
112198
"systemMetadata": {
113199
"lastObserved": 1693396800000,
114-
"runId": "dynamodb-test"
200+
"runId": "dynamodb-test",
201+
"lastRunId": "no-run-id-provided"
115202
}
116203
},
117204
{
@@ -126,7 +213,8 @@
126213
},
127214
"systemMetadata": {
128215
"lastObserved": 1693396800000,
129-
"runId": "dynamodb-test"
216+
"runId": "dynamodb-test",
217+
"lastRunId": "no-run-id-provided"
130218
}
131219
}
132220
]

Comments (0)