Skip to content

Commit 6edcfbc

Browse files
committed
make it mutable
1 parent 30f8a67 commit 6edcfbc

File tree

2 files changed

+26
-29
lines changed

2 files changed

+26
-29
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/data/AirbyteValue.kt

+10-9
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,12 @@ private class ObjectValueSerializer : JsonSerializer<ObjectValue>() {
166166
* @property name Field name
167167
* @property fieldCategory [FieldCategory] of the field
168168
*/
169-
data class EnrichedAirbyteValue(
170-
val value: AirbyteValue,
169+
class EnrichedAirbyteValue(
170+
var value: AirbyteValue,
171171
val type: AirbyteType,
172172
val name: String,
173173
val fieldCategory: FieldCategory,
174-
val changes: List<Meta.Change> = emptyList()
174+
val changes: MutableList<Meta.Change> = mutableListOf()
175175
) {
176176
init {
177177
require(name.isNotBlank()) { "Field name cannot be blank" }
@@ -183,16 +183,16 @@ data class EnrichedAirbyteValue(
183183
* @param reason The [Reason] for nullification, defaults to DESTINATION_SERIALIZATION_ERROR
184184
* @return A new [EnrichedAirbyteValue] with a null value and an additional change record
185185
*/
186-
fun toNullified(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR): EnrichedAirbyteValue {
186+
fun nullify(reason: Reason = Reason.DESTINATION_SERIALIZATION_ERROR) {
187187
val nullChange =
188188
Meta.Change(
189189
field = name,
190190
change = AirbyteRecordMessageMetaChange.Change.NULLED,
191191
reason = reason
192192
)
193193

194-
// Return a copy with null value and the new change added to the changes list
195-
return copy(value = NullValue, changes = changes + nullChange)
194+
value = NullValue
195+
changes.add(nullChange)
196196
}
197197

198198
/**
@@ -202,10 +202,10 @@ data class EnrichedAirbyteValue(
202202
* @param newValue The new (truncated) value to use
203203
* @return A new [EnrichedAirbyteValue] with the truncated value and an additional change record
204204
*/
205-
fun toTruncated(
205+
fun truncate(
206206
reason: Reason = Reason.DESTINATION_RECORD_SIZE_LIMITATION,
207207
newValue: AirbyteValue
208-
): EnrichedAirbyteValue {
208+
) {
209209
val truncateChange =
210210
Meta.Change(
211211
field = name,
@@ -214,7 +214,8 @@ data class EnrichedAirbyteValue(
214214
)
215215

216216
// Return a copy with null value and the new change added to the changes list
217-
return copy(value = newValue, changes = changes + truncateChange)
217+
value = newValue
218+
changes.add(truncateChange)
218219
}
219220
}
220221

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

+16-20
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ import java.math.BigInteger
4141
import java.time.Instant
4242
import java.time.OffsetDateTime
4343
import java.time.ZoneOffset
44-
import java.util.*
4544

4645
/**
4746
* Internal representation of destination messages. These are intended to be specialized for
@@ -197,6 +196,8 @@ data class EnrichedDestinationRecordAirbyteValue(
197196
TODO("all the other efields"),
198197
)
199198
}
199+
200+
val allFields = declaredFields + airbyteMetaFields
200201
}
201202

202203
data class DestinationRecordRaw(
@@ -246,26 +247,21 @@ data class DestinationRecordRaw(
246247
"Field '$fieldName' exists in schema keys but not in properties"
247248
)
248249

249-
val airbyteValue =
250-
if (fieldValue.isNull) NullValue else fieldValue.toAirbyteValue()
251-
declaredFields[fieldName] =
250+
val enrichedValue =
252251
EnrichedAirbyteValue(
253-
value = airbyteValue,
254-
type = fieldType,
255-
name = fieldName,
256-
fieldCategory = FieldCategory.CLIENT_DATA,
257-
)
258-
.let {
259-
val coercedValue = AirbyteValueCoercer.coerce(airbyteValue, schema)
260-
if (coercedValue == null) {
261-
it.toNullified(
262-
AirbyteRecordMessageMetaChange.Reason
263-
.DESTINATION_SERIALIZATION_ERROR
264-
)
265-
} else {
266-
it
267-
}
268-
}
252+
value = NullValue,
253+
type = fieldType,
254+
name = fieldName,
255+
fieldCategory = FieldCategory.CLIENT_DATA,
256+
)
257+
AirbyteValueCoercer.coerce(fieldValue.toAirbyteValue(), schema)?.let {
258+
enrichedValue.value = it
259+
}
260+
?: enrichedValue.nullify(
261+
AirbyteRecordMessageMetaChange.Reason.DESTINATION_SERIALIZATION_ERROR
262+
)
263+
264+
declaredFields[fieldName] = enrichedValue
269265
}
270266
else -> {
271267
// Undeclared field (not in schema)

0 commit comments

Comments
 (0)