
Commit 35cd3c3

Save
1 parent 2524dbd commit 35cd3c3

9 files changed, +41 -17 lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt (+18 -3)
@@ -147,7 +147,8 @@ data class DestinationRecord(
     fun asDestinationRecordRaw(): DestinationRecordRaw {
         return DestinationRecordRaw(
             stream,
-            message
+            message,
+            serialized
         )
     }
 }
@@ -182,8 +183,22 @@ data class EnrichedDestinationRecordAirbyteValue(
 
 data class DestinationRecordRaw(
     val stream: DestinationStream.Descriptor,
-    val rawData: AirbyteMessage
-)
+    private val rawData: AirbyteMessage,
+    private val serialized: String
+) {
+    fun asDestinationRecordAirbyteValue(): DestinationRecordAirbyteValue {
+        return DestinationRecordAirbyteValue(
+            stream,
+            rawData.record.data.toAirbyteValue(),
+            rawData.record.emittedAt,
+            Meta(
+                rawData.record.meta?.changes?.map { Meta.Change(it.field, it.change, it.reason) }
+                    ?: emptyList()
+            ),
+            serialized.length.toLong()
+        )
+    }
+}
 
 data class DestinationFile(
     override val stream: DestinationStream.Descriptor,
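The new DestinationRecordRaw keeps the original AirbyteMessage and its serialized JSON together, and defers building the typed DestinationRecordAirbyteValue until a consumer asks for it; the serialized string's length doubles as the record's size estimate. A minimal sketch of a consumer (the sizeOf helper is hypothetical, not part of this commit):

```kotlin
import io.airbyte.cdk.load.message.DestinationRecordRaw

// Hypothetical helper: materialize the typed record only when it is needed and
// read back the size estimate that the conversion derives from serialized.length.
fun sizeOf(record: DestinationRecordRaw): Long {
    val typed = record.asDestinationRecordAirbyteValue()
    return typed.serializedSizeBytes
}
```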

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt (+2 -1)
@@ -5,6 +5,7 @@
 package io.airbyte.cdk.load.pipeline
 
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.airbyte.cdk.load.message.PartitionedQueue
 import io.airbyte.cdk.load.message.PipelineEvent
 import io.airbyte.cdk.load.message.QueueWriter
@@ -25,7 +26,7 @@ class DirectLoadPipelineStep<S : DirectLoader>(
     val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
     @Named("recordQueue")
     val inputQueue:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
     @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
     @Value("\${airbyte.destination.core.record-batch-size-override:null}")
     val batchSizeOverride: Long? = null,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt (+3 -2)
@@ -6,6 +6,7 @@ package io.airbyte.cdk.load.pipeline
 
 import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.airbyte.cdk.load.message.WithBatchState
 import io.airbyte.cdk.load.message.WithStream
 import io.airbyte.cdk.load.write.DirectLoader
@@ -25,13 +26,13 @@ data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState
 @Requires(bean = DirectLoaderFactory::class)
 class DirectLoadRecordAccumulator<S : DirectLoader, K : WithStream>(
     val directLoaderFactory: DirectLoaderFactory<S>
-) : BatchAccumulator<S, K, DestinationRecordAirbyteValue, DirectLoadAccResult> {
+) : BatchAccumulator<S, K, DestinationRecordRaw, DirectLoadAccResult> {
     override fun start(key: K, part: Int): S {
         return directLoaderFactory.create(key.stream, part)
     }
 
     override fun accept(
-        record: DestinationRecordAirbyteValue,
+        record: DestinationRecordRaw,
         state: S
     ): Pair<S, DirectLoadAccResult?> {
         state.accept(record).let {
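The accumulator now hands each raw record straight to the DirectLoader held in state. The rest of accept falls outside this hunk; a rough sketch of the likely shape follows, with the result mapping treated as an assumption rather than the actual CDK implementation:

```kotlin
// Assumed continuation of accept(): forward the raw record to the loader and
// translate a Complete result into a terminal batch state, otherwise keep accumulating.
override fun accept(record: DestinationRecordRaw, state: S): Pair<S, DirectLoadAccResult?> =
    when (state.accept(record)) {
        is DirectLoader.Complete -> Pair(state, DirectLoadAccResult(Batch.State.COMPLETE))
        else -> Pair(state, null) // Incomplete: the same loader keeps receiving records
    }
```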

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt (+3 -2)
@@ -5,6 +5,7 @@
 package io.airbyte.cdk.load.pipeline
 
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.micronaut.context.annotation.Secondary
 import jakarta.inject.Singleton
 import kotlin.math.abs
@@ -14,13 +15,13 @@ import kotlin.math.abs
  * partitioned by a hash of the stream name and namespace.
  */
 interface InputPartitioner {
-    fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int
+    fun getPartition(record: DestinationRecordRaw, numParts: Int): Int
 }
 
 @Singleton
 @Secondary
 class ByStreamInputPartitioner : InputPartitioner {
-    override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int {
+    override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
         return abs(record.stream.hashCode()) % numParts
     }
 }
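Custom partitioners only need the parameter type swapped, since the stream descriptor is still exposed on the raw record. As an illustration, a hypothetical partitioner keyed on the stream name alone (not part of this commit) could look like:

```kotlin
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.pipeline.InputPartitioner
import kotlin.math.abs

// Hypothetical variant of ByStreamInputPartitioner: hash only the stream's name,
// ignoring the namespace, so same-named streams across namespaces share a partition.
class ByStreamNameInputPartitioner : InputPartitioner {
    override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
        return abs(record.stream.name.hashCode()) % numParts
    }
}
```

To actually replace the default it would also have to be registered so it takes precedence over the @Secondary ByStreamInputPartitioner; that wiring is omitted here.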

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt (+2 -1)
@@ -12,6 +12,7 @@ import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.ChannelMessageQueue
 import io.airbyte.cdk.load.message.CheckpointMessageWrapped
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.airbyte.cdk.load.message.DestinationStreamEvent
 import io.airbyte.cdk.load.message.MessageQueue
 import io.airbyte.cdk.load.message.MessageQueueSupplier
@@ -145,7 +146,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
     // New interface shim
     @Named("recordQueue")
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
     @Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate>,
     private val loadPipeline: LoadPipeline?,
     private val partitioner: InputPartitioner,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt (+4 -3)
@@ -16,6 +16,7 @@ import io.airbyte.cdk.load.message.DestinationFileStreamComplete
 import io.airbyte.cdk.load.message.DestinationFileStreamIncomplete
 import io.airbyte.cdk.load.message.DestinationRecord
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.airbyte.cdk.load.message.DestinationRecordStreamComplete
 import io.airbyte.cdk.load.message.DestinationRecordStreamIncomplete
 import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
@@ -80,7 +81,7 @@ class DefaultInputConsumerTask(
     // Required by new interface
     @Named("recordQueue")
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
     private val loadPipeline: LoadPipeline? = null,
     private val partitioner: InputPartitioner,
     private val openStreamQueue: QueueWriter<DestinationStream>
@@ -310,7 +311,7 @@ interface InputConsumerTaskFactory {
 
         // Required by new interface
         recordQueueForPipeline:
-            PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+            PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
         loadPipeline: LoadPipeline?,
         partitioner: InputPartitioner,
         openStreamQueue: QueueWriter<DestinationStream>,
@@ -333,7 +334,7 @@ class DefaultInputConsumerTaskFactory(
 
         // Required by new interface
         recordQueueForPipeline:
-            PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+            PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
         loadPipeline: LoadPipeline?,
         partitioner: InputPartitioner,
         openStreamQueue: QueueWriter<DestinationStream>,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DirectLoader.kt (+2 -1)
@@ -6,6 +6,7 @@ package io.airbyte.cdk.load.write
 
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 
 /**
  * [DirectLoader] is for the use case where records are loaded directly into the destination or
@@ -56,7 +57,7 @@ interface DirectLoader : AutoCloseable {
      * Called once per record until it returns [Complete], after which [close] is called, the loader
      * is discarded, and the records are considered processed by the platform.
      */
-    fun accept(record: DestinationRecordAirbyteValue): DirectLoadResult
+    fun accept(record: DestinationRecordRaw): DirectLoadResult
 
     /**
      * Called by the CDK to force work to finish. It will only be called if the last call to
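For connector authors the visible change is the accept signature: the loader now receives the raw record and decides when to materialize the typed value. A minimal sketch against the new interface (the buffering, the 1,000-record threshold, and the flush step are illustrative assumptions, not CDK code):

```kotlin
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.write.DirectLoader

// Illustrative loader: buffer typed records and report Complete once a
// hypothetical threshold is hit; finish()/close() would perform the real upload.
class BufferingDirectLoader : DirectLoader {
    private val buffer = mutableListOf<DestinationRecordAirbyteValue>()

    override fun accept(record: DestinationRecordRaw): DirectLoader.DirectLoadResult {
        buffer.add(record.asDestinationRecordAirbyteValue()) // convert only at this point
        return if (buffer.size >= 1_000) DirectLoader.Complete else DirectLoader.Incomplete
    }

    override fun finish() {
        // Flush whatever is still buffered (upload details omitted in this sketch).
    }

    override fun close() {
        buffer.clear()
    }
}
```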

airbyte-integrations/connectors/destination-s3-data-lake/build.gradle (+1 -1)
@@ -6,7 +6,7 @@ plugins {
 airbyteBulkConnector {
     core = 'load'
     toolkits = ['load-iceberg-parquet', 'load-aws']
-    cdk = '0.334'
+    cdk = 'local'
 }
 
 application {

airbyte-integrations/connectors/destination-s3-data-lake/src/main/kotlin/io/airbyte/integrations/destination/s3_data_lake/S3DataLakeDirectLoader.kt (+6 -3)
@@ -9,6 +9,7 @@ import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.data.MapperPipeline
 import io.airbyte.cdk.load.data.iceberg.parquet.IcebergParquetPipelineFactory
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationRecordRaw
 import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergTableWriterFactory
 import io.airbyte.cdk.load.toolkits.iceberg.parquet.io.IcebergUtil
 import io.airbyte.cdk.load.write.DirectLoader
@@ -79,17 +80,19 @@ class S3DataLakeDirectLoader(
         val commitLock: Any = Any()
     }
 
-    override fun accept(record: DestinationRecordAirbyteValue): DirectLoader.DirectLoadResult {
+    override fun accept(record: DestinationRecordRaw): DirectLoader.DirectLoadResult {
+        val recordAirbyteValue = record.asDestinationRecordAirbyteValue()
+
         val icebergRecord =
             icebergUtil.toRecord(
-                record = record,
+                record = recordAirbyteValue,
                 stream = stream,
                 tableSchema = schema,
                 pipeline = pipeline
            )
         writer.write(icebergRecord)
 
-        dataSize += record.serializedSizeBytes // TODO: use icebergRecord.size() instead?
+        dataSize += recordAirbyteValue.serializedSizeBytes // TODO: use icebergRecord.size() instead?
         if (dataSize < batchSize) {
             return DirectLoader.Incomplete
         }
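With this commit, serializedSizeBytes comes from the length of the record's serialized source JSON (serialized.length.toLong() in DestinationMessage.kt), so the dataSize accumulated above tracks raw JSON size rather than the Iceberg/Parquet footprint, which is what the TODO alludes to. A toy illustration of the threshold behavior (all values hypothetical):

```kotlin
// Toy illustration, not connector code: the batch completes once the summed
// source-JSON lengths cross the threshold, regardless of the on-disk Parquet size.
val batchSize = 10L * 1024 * 1024      // hypothetical 10 MiB threshold
var dataSize = 0L
val serializedRecordLength = 2_048L    // length of one record's source JSON
dataSize += serializedRecordLength
val complete = dataSize >= batchSize   // maps to DirectLoader.Complete vs. Incomplete
```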
