Skip to content

Commit 30f8a67

Browse files
committed
coerce immediately + kill meta fields
1 parent 7494014 commit 30f8a67

File tree

3 files changed

+62
-11
lines changed

3 files changed

+62
-11
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,9 @@ private class ObjectValueSerializer : JsonSerializer<ObjectValue>() {
169169
data class EnrichedAirbyteValue(
170170
val value: AirbyteValue,
171171
val type: AirbyteType,
172-
val changes: List<Meta.Change> = emptyList(),
173172
val name: String,
174-
val fieldCategory: FieldCategory
173+
val fieldCategory: FieldCategory,
174+
val changes: List<Meta.Change> = emptyList()
175175
) {
176176
init {
177177
require(name.isNotBlank()) { "Field name cannot be blank" }

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValueCoercer.kt

+24
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,30 @@ import java.time.format.DateTimeFormatter
2525
* common-sense conversions among numeric types, as well as upcasting any value to StringValue.
2626
*/
2727
object AirbyteValueCoercer {
28+
fun coerce(value: AirbyteValue, type: AirbyteType): AirbyteValue? =
29+
when (type) {
30+
BooleanType -> coerceBoolean(value)
31+
DateType -> coerceDate(value)
32+
IntegerType -> coerceInt(value)
33+
NumberType -> coerceNumber(value)
34+
StringType -> coerceString(value)
35+
TimeTypeWithTimezone -> coerceTimeTz(value)
36+
TimeTypeWithoutTimezone -> coerceTimeNtz(value)
37+
TimestampTypeWithTimezone -> coerceTimestampTz(value)
38+
TimestampTypeWithoutTimezone -> coerceTimestampNtz(value)
39+
is ArrayType,
40+
ArrayTypeWithoutSchema -> coerceArray(value)
41+
is ObjectType,
42+
ObjectTypeWithEmptySchema,
43+
ObjectTypeWithoutSchema -> coerceObject(value)
44+
45+
// Don't touch unions; just pass them through unchanged
46+
is UnionType -> value
47+
// Similarly, if we don't know what type it's supposed to be,
48+
// leave it unchanged.
49+
is UnknownType -> value
50+
}
51+
2852
fun coerceBoolean(value: AirbyteValue): BooleanValue? = requireType<BooleanValue>(value)
2953

3054
fun coerceInt(value: AirbyteValue): IntegerValue? =

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

+36-9
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,15 @@ import io.airbyte.cdk.load.command.DestinationCatalog
1010
import io.airbyte.cdk.load.command.DestinationStream
1111
import io.airbyte.cdk.load.data.AirbyteType
1212
import io.airbyte.cdk.load.data.AirbyteValue
13+
import io.airbyte.cdk.load.data.AirbyteValueCoercer
1314
import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper
1415
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
1516
import io.airbyte.cdk.load.data.FieldCategory
1617
import io.airbyte.cdk.load.data.IntegerValue
1718
import io.airbyte.cdk.load.data.NullValue
1819
import io.airbyte.cdk.load.data.ObjectType
1920
import io.airbyte.cdk.load.data.StringValue
21+
import io.airbyte.cdk.load.data.TimestampTypeWithTimezone
2022
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
2123
import io.airbyte.cdk.load.data.json.toAirbyteValue
2224
import io.airbyte.cdk.load.message.CheckpointMessage.Checkpoint
@@ -36,7 +38,9 @@ import io.airbyte.protocol.models.v0.AirbyteTraceMessage
3638
import io.micronaut.context.annotation.Value
3739
import jakarta.inject.Singleton
3840
import java.math.BigInteger
41+
import java.time.Instant
3942
import java.time.OffsetDateTime
43+
import java.time.ZoneOffset
4044
import java.util.*
4145

4246
/**
@@ -174,12 +178,26 @@ data class DestinationRecordAirbyteValue(
174178
data class EnrichedDestinationRecordAirbyteValue(
175179
val stream: DestinationStream.Descriptor,
176180
val declaredFields: Map<String, EnrichedAirbyteValue>,
177-
val airbyteMetaFields: Map<String, EnrichedAirbyteValue>,
178181
val undeclaredFields: Map<String, JsonNode>,
179182
val emittedAtMs: Long,
180183
val meta: Meta?,
181184
val serializedSizeBytes: Long = 0L
182-
)
185+
) {
186+
val airbyteMetaFields: Map<String, EnrichedAirbyteValue> by lazy {
187+
mapOf(
188+
"_airbyte_extracted_at" to
189+
EnrichedAirbyteValue(
190+
TimestampWithTimezoneValue(
191+
OffsetDateTime.ofInstant(Instant.ofEpochMilli(emittedAtMs), ZoneOffset.UTC)
192+
),
193+
TimestampTypeWithTimezone,
194+
name = "_airbyte_extracted_at",
195+
FieldCategory.EXTRACTED_AT,
196+
),
197+
TODO("all the other efields"),
198+
)
199+
}
200+
}
183201

184202
data class DestinationRecordRaw(
185203
val stream: DestinationStream.Descriptor,
@@ -215,7 +233,6 @@ data class DestinationRecordRaw(
215233
}
216234

217235
val declaredFields = mutableMapOf<String, EnrichedAirbyteValue>()
218-
val airbyteMetaFields = mutableMapOf<String, EnrichedAirbyteValue>()
219236
val undeclaredFields = mutableMapOf<String, JsonNode>()
220237

221238
// Process fields from the raw JSON
@@ -233,11 +250,22 @@ data class DestinationRecordRaw(
233250
if (fieldValue.isNull) NullValue else fieldValue.toAirbyteValue()
234251
declaredFields[fieldName] =
235252
EnrichedAirbyteValue(
236-
value = airbyteValue,
237-
type = fieldType,
238-
name = fieldName,
239-
fieldCategory = FieldCategory.CLIENT_DATA
240-
)
253+
value = airbyteValue,
254+
type = fieldType,
255+
name = fieldName,
256+
fieldCategory = FieldCategory.CLIENT_DATA,
257+
)
258+
.let {
259+
val coercedValue = AirbyteValueCoercer.coerce(airbyteValue, schema)
260+
if (coercedValue == null) {
261+
it.toNullified(
262+
AirbyteRecordMessageMetaChange.Reason
263+
.DESTINATION_SERIALIZATION_ERROR
264+
)
265+
} else {
266+
it
267+
}
268+
}
241269
}
242270
else -> {
243271
// Undeclared field (not in schema)
@@ -249,7 +277,6 @@ data class DestinationRecordRaw(
249277
return EnrichedDestinationRecordAirbyteValue(
250278
stream = stream,
251279
declaredFields = declaredFields,
252-
airbyteMetaFields = airbyteMetaFields,
253280
undeclaredFields = undeclaredFields,
254281
emittedAtMs = rawData.record.emittedAt,
255282
meta =

0 commit comments

Comments
 (0)