Skip to content

Commit 7494014

Browse files
committed
Save state
1 parent cd7c0aa commit 7494014

File tree

4 files changed

+73
-7
lines changed

4 files changed

+73
-7
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

+63-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ import io.airbyte.cdk.load.data.AirbyteType
1212
import io.airbyte.cdk.load.data.AirbyteValue
1313
import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper
1414
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
15+
import io.airbyte.cdk.load.data.FieldCategory
1516
import io.airbyte.cdk.load.data.IntegerValue
17+
import io.airbyte.cdk.load.data.NullValue
18+
import io.airbyte.cdk.load.data.ObjectType
1619
import io.airbyte.cdk.load.data.StringValue
1720
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
1821
import io.airbyte.cdk.load.data.json.toAirbyteValue
@@ -34,6 +37,7 @@ import io.micronaut.context.annotation.Value
3437
import jakarta.inject.Singleton
3538
import java.math.BigInteger
3639
import java.time.OffsetDateTime
40+
import java.util.*
3741

3842
/**
3943
* Internal representation of destination messages. These are intended to be specialized for
@@ -145,7 +149,7 @@ data class DestinationRecord(
145149
)
146150
}
147151
fun asDestinationRecordRaw(): DestinationRecordRaw {
148-
return DestinationRecordRaw(stream, message, serialized)
152+
return DestinationRecordRaw(stream, message, serialized, schema)
149153
}
150154
}
151155

@@ -180,7 +184,8 @@ data class EnrichedDestinationRecordAirbyteValue(
180184
data class DestinationRecordRaw(
181185
val stream: DestinationStream.Descriptor,
182186
private val rawData: AirbyteMessage,
183-
private val serialized: String
187+
private val serialized: String,
188+
private val schema: AirbyteType
184189
) {
185190
fun asRawJson(): JsonNode {
186191
return rawData.record.data
@@ -200,7 +205,62 @@ data class DestinationRecordRaw(
200205
}
201206

202207
fun asEnrichedDestinationRecordAirbyteValue(): EnrichedDestinationRecordAirbyteValue {
203-
TODO()
208+
val rawJson = asRawJson()
209+
210+
// Get the set of field names defined in the schema
211+
val schemaFields =
212+
when (schema) {
213+
is ObjectType -> schema.properties.keys
214+
else -> emptySet()
215+
}
216+
217+
val declaredFields = mutableMapOf<String, EnrichedAirbyteValue>()
218+
val airbyteMetaFields = mutableMapOf<String, EnrichedAirbyteValue>()
219+
val undeclaredFields = mutableMapOf<String, JsonNode>()
220+
221+
// Process fields from the raw JSON
222+
rawJson.fields().forEach { (fieldName, fieldValue) ->
223+
when {
224+
schemaFields.contains(fieldName) -> {
225+
// Declared field (exists in schema)
226+
val fieldType =
227+
(schema as ObjectType).properties[fieldName]?.type
228+
?: throw IllegalStateException(
229+
"Field '$fieldName' exists in schema keys but not in properties"
230+
)
231+
232+
val airbyteValue =
233+
if (fieldValue.isNull) NullValue else fieldValue.toAirbyteValue()
234+
declaredFields[fieldName] =
235+
EnrichedAirbyteValue(
236+
value = airbyteValue,
237+
type = fieldType,
238+
name = fieldName,
239+
fieldCategory = FieldCategory.CLIENT_DATA
240+
)
241+
}
242+
else -> {
243+
// Undeclared field (not in schema)
244+
undeclaredFields[fieldName] = fieldValue
245+
}
246+
}
247+
}
248+
249+
return EnrichedDestinationRecordAirbyteValue(
250+
stream = stream,
251+
declaredFields = declaredFields,
252+
airbyteMetaFields = airbyteMetaFields,
253+
undeclaredFields = undeclaredFields,
254+
emittedAtMs = rawData.record.emittedAt,
255+
meta =
256+
Meta(
257+
rawData.record.meta?.changes?.map {
258+
Meta.Change(it.field, it.change, it.reason)
259+
}
260+
?: emptyList()
261+
),
262+
serializedSizeBytes = serialized.length.toLong()
263+
)
204264
}
205265
}
206266

airbyte-integrations/connectors/destination-s3-data-lake/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
airbyteBulkConnector {
77
core = 'load'
88
toolkits = ['load-iceberg-parquet', 'load-aws']
9-
cdk = '0.344'
9+
cdk = 'local'
1010
}
1111

1212
application {

airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDirectLoader.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,20 @@ class S3DataLakeDirectLoader(
8080
}
8181

8282
override fun accept(record: DestinationRecordRaw): DirectLoader.DirectLoadResult {
83-
val recordAirbyteValue = record.asDestinationRecordAirbyteValue()
83+
val enrichedRecordAirbyteValue = record.asEnrichedDestinationRecordAirbyteValue()
8484

8585
val icebergRecord =
8686
icebergUtil.toRecord(
87-
record = recordAirbyteValue,
87+
record = enrichedRecordAirbyteValue,
8888
stream = stream,
8989
tableSchema = schema,
9090
pipeline = pipeline
9191
)
9292
writer.write(icebergRecord)
9393

9494
dataSize +=
95-
recordAirbyteValue.serializedSizeBytes // TODO: use icebergRecord.size() instead?
95+
enrichedRecordAirbyteValue
96+
.serializedSizeBytes // TODO: use icebergRecord.size() instead?
9697
if (dataSize < batchSize) {
9798
return DirectLoader.Incomplete
9899
}

airbyte-integrations/connectors/destination-s3-data-lake/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeWriteTest.kt

+5
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ class GlueWriteTest :
9090
val failure = expectFailure { runSync(updatedConfig, catalog, messages = emptyList()) }
9191
assertContains(failure.message, "Detected naming conflicts between streams")
9292
}
93+
94+
@Test
95+
override fun testBasicWrite() {
96+
super.testBasicWrite()
97+
}
9398
}
9499

95100
class GlueAssumeRoleWriteTest :

0 commit comments

Comments
 (0)