Skip to content

Commit 83a3284

Browse files
committed
dump WIP
1 parent 6edcfbc commit 83a3284

File tree

3 files changed

+178
-25
lines changed

3 files changed

+178
-25
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt

+84-14
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@ import com.fasterxml.jackson.databind.SerializerProvider
1212
import com.fasterxml.jackson.databind.annotation.JsonSerialize
1313
import com.fasterxml.jackson.databind.node.NullNode
1414
import io.airbyte.cdk.load.message.Meta
15-
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
16-
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.Reason
15+
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange.*
1716
import java.math.BigDecimal
1817
import java.math.BigInteger
1918
import java.time.LocalDate
@@ -184,12 +183,7 @@ class EnrichedAirbyteValue(
184183
* @return A new [EnrichedAirbyteValue] with a null value and an additional change record
185184
*/
186185
fun nullify(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR) {
187-
val nullChange =
188-
Meta.Change(
189-
field = name,
190-
change = AirbyteRecordMessageMetaChange.Change.NULLED,
191-
reason = reason
192-
)
186+
val nullChange = Meta.Change(field = name, change = Change.NULLED, reason = reason)
193187

194188
value = NullValue
195189
changes.add(nullChange)
@@ -206,17 +200,93 @@ class EnrichedAirbyteValue(
206200
reason: Reason = Reason.DESTINATION_RECORD_SIZE_LIMITATION,
207201
newValue: AirbyteValue
208202
) {
209-
val truncateChange =
210-
Meta.Change(
211-
field = name,
212-
change = AirbyteRecordMessageMetaChange.Change.TRUNCATED,
213-
reason = reason
214-
)
203+
val truncateChange = Meta.Change(field = name, change = Change.TRUNCATED, reason = reason)
215204

216205
// Return a copy with null value and the new change added to the changes list
217206
value = newValue
218207
changes.add(truncateChange)
219208
}
209+
210+
/**
211+
* If this value is an Array element, compute a new value for each element of the array. If the
212+
* computed value is null, then update it to a [NullValue] and add a NULLED_OUT entry to the
213+
* changes list.
214+
*/
215+
fun mutateArrayElements(f: (AirbyteValue, AirbyteType) -> AirbyteValue?) {
216+
check(value !is ArrayValue) { "Attempting to walk over a non-array value" }
217+
val elementType = (type as ArrayType).items.type
218+
val changes = mutableListOf<Meta.Change>()
219+
val mutatedElements: List<AirbyteValue> =
220+
(value as ArrayValue).values.mapIndexed { i, element ->
221+
val mutatedElement = f(element, elementType)
222+
if (mutatedElement == null) {
223+
changes.add(
224+
Meta.Change(
225+
// TODO do we do this, or do we do `arst[42]` path syntax?
226+
field = "$name.$i",
227+
Change.NULLED,
228+
Reason.DESTINATION_SERIALIZATION_ERROR,
229+
)
230+
)
231+
NullValue
232+
} else {
233+
mutatedElement
234+
}
235+
}
236+
value = ArrayValue(mutatedElements)
237+
}
238+
239+
fun recursivelyMutateArrayElements(f: (AirbyteValue, AirbyteType) -> AirbyteValue?) {
240+
fun recursionHelper(
241+
value: ArrayValue,
242+
elementType: AirbyteType,
243+
path: String
244+
): AirbyteValue {
245+
val mutatedElements: List<AirbyteValue> =
246+
value.values.mapIndexed { i, element ->
247+
val elementPath = "$path.$i"
248+
if (elementType is ArrayType) {
249+
val coercedElement = AirbyteValueCoercer.coerceArray(element)
250+
if (coercedElement != null) {
251+
recursionHelper(coercedElement, elementType.items.type, name)
252+
} else {
253+
changes.add(
254+
Meta.Change(
255+
field = elementPath,
256+
change = Change.NULLED,
257+
reason = Reason.DESTINATION_SERIALIZATION_ERROR,
258+
)
259+
)
260+
NullValue
261+
}
262+
} else {
263+
val mutatedElement = f(element, elementType)
264+
if (mutatedElement == null) {
265+
changes.add(
266+
Meta.Change(
267+
// TODO do we do this, or do we do `arst[42]` path syntax?
268+
field = elementPath,
269+
Change.NULLED,
270+
Reason.DESTINATION_SERIALIZATION_ERROR,
271+
)
272+
)
273+
NullValue
274+
} else {
275+
mutatedElement
276+
}
277+
}
278+
}
279+
return ArrayValue(mutatedElements)
280+
}
281+
282+
val elementType = (type as ArrayType).items.type
283+
val coercedValue = AirbyteValueCoercer.coerceArray(value)
284+
if (coercedValue != null) {
285+
value = recursionHelper(coercedValue, elementType, name)
286+
} else {
287+
nullify()
288+
}
289+
}
220290
}
221291

222292
/**

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/data/iceberg/parquet/AirbyteValueToIcebergRecord.kt

+19-3
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import io.airbyte.cdk.load.data.AirbyteValue
77
import io.airbyte.cdk.load.data.ArrayValue
88
import io.airbyte.cdk.load.data.BooleanValue
99
import io.airbyte.cdk.load.data.DateValue
10+
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
1011
import io.airbyte.cdk.load.data.IntegerValue
1112
import io.airbyte.cdk.load.data.NullValue
1213
import io.airbyte.cdk.load.data.NumberValue
@@ -125,14 +126,29 @@ class AirbyteValueToIcebergRecord {
125126
}
126127
}
127128

128-
fun ObjectValue.toIcebergRecord(schema: Schema): GenericRecord {
129-
val record = GenericRecord.create(schema)
129+
fun ObjectValue.toIcebergRecord(icebergSchema: Schema): GenericRecord {
130+
val record = GenericRecord.create(icebergSchema)
130131
val airbyteValueToIcebergRecord = AirbyteValueToIcebergRecord()
131-
schema.asStruct().fields().forEach { field ->
132+
icebergSchema.asStruct().fields().forEach { field ->
132133
val value = this.values[field.name()]
133134
if (value != null) {
134135
record.setField(field.name(), airbyteValueToIcebergRecord.convert(value, field.type()))
135136
}
136137
}
137138
return record
138139
}
140+
141+
fun Map<String, EnrichedAirbyteValue>.toIcebergRecord(icebergSchema: Schema): GenericRecord {
142+
val record = GenericRecord.create(icebergSchema)
143+
val airbyteValueToIcebergRecord = AirbyteValueToIcebergRecord()
144+
icebergSchema.asStruct().fields().forEach { field ->
145+
val value = this[field.name()]
146+
if (value != null) {
147+
record.setField(
148+
field.name(),
149+
airbyteValueToIcebergRecord.convert(value.value, field.type())
150+
)
151+
}
152+
}
153+
return record
154+
}

airbyte-cdk/bulk/toolkits/load-iceberg-parquet/src/main/kotlin/io/airbyte/cdk/load/toolkits/iceberg/parquet/io/IcebergUtil.kt

+75-8
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,32 @@ package io.airbyte.cdk.load.toolkits.iceberg.parquet.io
77
import io.airbyte.cdk.load.command.Dedupe
88
import io.airbyte.cdk.load.command.DestinationStream
99
import io.airbyte.cdk.load.command.ImportType
10+
import io.airbyte.cdk.load.data.AirbyteValueCoercer
11+
import io.airbyte.cdk.load.data.ArrayType
12+
import io.airbyte.cdk.load.data.ArrayTypeWithoutSchema
13+
import io.airbyte.cdk.load.data.IntegerType
14+
import io.airbyte.cdk.load.data.IntegerValue
1015
import io.airbyte.cdk.load.data.MapperPipeline
1116
import io.airbyte.cdk.load.data.NullValue
17+
import io.airbyte.cdk.load.data.NumberType
18+
import io.airbyte.cdk.load.data.NumberValue
19+
import io.airbyte.cdk.load.data.ObjectType
20+
import io.airbyte.cdk.load.data.ObjectTypeWithEmptySchema
21+
import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
1222
import io.airbyte.cdk.load.data.ObjectValue
23+
import io.airbyte.cdk.load.data.StringValue
24+
import io.airbyte.cdk.load.data.UnionType
25+
import io.airbyte.cdk.load.data.UnknownType
1326
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergRecord
1427
import io.airbyte.cdk.load.data.iceberg.parquet.toIcebergSchema
1528
import io.airbyte.cdk.load.data.withAirbyteMeta
1629
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
30+
import io.airbyte.cdk.load.message.EnrichedDestinationRecordAirbyteValue
1731
import io.airbyte.cdk.load.toolkits.iceberg.parquet.TableIdGenerator
32+
import io.airbyte.cdk.load.util.serializeToString
1833
import io.github.oshai.kotlinlogging.KotlinLogging
34+
import java.math.BigDecimal
35+
import java.math.BigInteger
1936
import javax.inject.Singleton
2037
import org.apache.hadoop.conf.Configuration
2138
import org.apache.iceberg.CatalogUtil
@@ -141,18 +158,68 @@ class IcebergUtil(private val tableIdGenerator: TableIdGenerator) {
141158
* @return An Iceberg [Record] representation of the Airbyte [DestinationRecordAirbyteValue].
142159
*/
143160
fun toRecord(
144-
record: DestinationRecordAirbyteValue,
161+
record: EnrichedDestinationRecordAirbyteValue,
145162
stream: DestinationStream,
146163
tableSchema: Schema,
147-
pipeline: MapperPipeline
148164
): Record {
149-
val dataMapped =
150-
pipeline
151-
.map(record.data, record.meta?.changes)
152-
.withAirbyteMeta(stream, record.emittedAtMs, true)
153-
// TODO figure out how to detect the actual operation value
165+
// deep-coerce arrays
166+
record.declaredFields.forEach { (_, value) ->
167+
if (value.type is ArrayType) {
168+
value.mutateArrayElements { element, elementType ->
169+
AirbyteValueCoercer.coerce(element, elementType)
170+
}
171+
}
172+
}
173+
174+
// Convert complex types to string
175+
record.declaredFields.forEach { (_, value) ->
176+
when (value.type) {
177+
is ArrayType,
178+
ArrayTypeWithoutSchema,
179+
is ObjectType,
180+
ObjectTypeWithEmptySchema,
181+
ObjectTypeWithoutSchema,
182+
is UnionType,
183+
is UnknownType -> value.value = StringValue(value.value.serializeToString())
184+
else -> {}
185+
}
186+
}
187+
188+
// Null out numeric values that exceed int64/float64
189+
record.declaredFields.forEach { (_, value) ->
190+
if (value.type is ArrayType) {
191+
value.mutateArrayElements { element, elementType ->
192+
when (elementType) {
193+
is NumberType -> {
194+
val numberValue = (element as NumberValue)
195+
if (
196+
numberValue.value < BigDecimal(Double.MIN_VALUE) ||
197+
numberValue.value > BigDecimal(Double.MAX_VALUE)
198+
) {
199+
null
200+
} else {
201+
numberValue
202+
}
203+
}
204+
is IntegerType -> {
205+
val numberValue = (element as IntegerValue)
206+
if (
207+
numberValue.value < BigInteger.valueOf(Long.MIN_VALUE) ||
208+
numberValue.value > BigInteger.valueOf(Long.MAX_VALUE)
209+
) {
210+
null
211+
} else {
212+
numberValue
213+
}
214+
}
215+
else -> element
216+
}
217+
}
218+
}
219+
}
220+
154221
return RecordWrapper(
155-
delegate = dataMapped.toIcebergRecord(tableSchema),
222+
delegate = record.declaredFields.toIcebergRecord(tableSchema),
156223
operation = getOperation(record = record, importType = stream.importType)
157224
)
158225
}

0 commit comments

Comments
 (0)