Skip to content

Commit e7eff8d

Browse files
S3 Destination Uses New Load CDK Interface (temporarily disabled)
1 parent e2ea32e commit e7eff8d

File tree

31 files changed

+899
-180
lines changed

31 files changed

+899
-180
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import io.airbyte.cdk.load.message.PipelineEvent
1616
import io.airbyte.cdk.load.message.StreamKey
1717
import io.airbyte.cdk.load.pipeline.BatchUpdate
1818
import io.airbyte.cdk.load.state.ReservationManager
19-
import io.airbyte.cdk.load.state.Reserved
2019
import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
2120
import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage
2221
import io.airbyte.cdk.load.write.LoadStrategy
@@ -120,7 +119,7 @@ class SyncBeanFactory {
120119
@Named("recordQueue")
121120
fun recordQueue(
122121
loadStrategy: LoadStrategy? = null,
123-
): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>> {
122+
): PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>> {
124123
return PartitionedQueue(
125124
Array(loadStrategy?.inputPartitions ?: 1) {
126125
ChannelMessageQueue(Channel(Channel.UNLIMITED))

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PartitionedQueue.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@ class PartitionedQueue<T>(private val queues: Array<MessageQueue<T>>) : Closeabl
1111
val partitions = queues.size
1212

1313
fun consume(partition: Int): Flow<T> {
14-
if (partition < 0 || partition >= queues.size) {
14+
if (partition < 0 || partition >= partitions) {
1515
throw IllegalArgumentException("Invalid partition: $partition")
1616
}
1717
return queues[partition].consume()
1818
}
1919

2020
suspend fun publish(value: T, partition: Int) {
21-
if (partition < 0 || partition >= queues.size) {
21+
if (partition < 0 || partition >= partitions) {
2222
throw IllegalArgumentException("Invalid partition: $partition")
2323
}
2424
queues[partition].publish(value)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt

+9-5
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,19 @@ import io.airbyte.cdk.load.state.CheckpointId
1010
/** Used internally by the CDK to pass messages between steps in the loader pipeline. */
1111
sealed interface PipelineEvent<K : WithStream, T>
1212

13+
/**
14+
* A message that contains a keyed payload. The key is used to manage the state of the payload's
15+
* corresponding [io.airbyte.cdk.load.pipeline.BatchAccumulator]. [checkpointCounts] is used by the
16+
* CDK to perform state message bookkeeping. [postProcessingCallback] is for releasing resources
17+
* associated with the message.
18+
*/
1319
class PipelineMessage<K : WithStream, T>(
1420
val checkpointCounts: Map<CheckpointId, Long>,
1521
val key: K,
16-
val value: T
22+
val value: T,
23+
val postProcessingCallback: suspend () -> Unit = {},
1724
) : PipelineEvent<K, T>
1825

19-
/**
20-
* We send the end message on the stream and not the key, because there's no way to partition an
21-
* empty message.
22-
*/
26+
/** Broadcast at end-of-stream to all partitions to signal that the stream has ended. */
2327
class PipelineEndOfStream<K : WithStream, T>(val stream: DestinationStream.Descriptor) :
2428
PipelineEvent<K, T>

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt

+50-5
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,56 @@ package io.airbyte.cdk.load.pipeline
77
import io.airbyte.cdk.load.message.WithStream
88

99
/**
10-
* [BatchAccumulator] is used internally by the CDK to implement RecordLoaders. Connector devs
11-
* should never need to implement this interface.
10+
* [BatchAccumulator] is used internally by the CDK to implement
11+
* [io.airbyte.cdk.load.write.LoadStrategy]s. Connector devs should never need to implement this
12+
* interface.
13+
*
14+
* It is the glue that connects a specific step in a specific pipeline to the generic pipeline on
15+
* the back end. (For example, in a three-stage pipeline like bulk load, step 1 is to create a part,
16+
* step 2 is to upload it, and step 3 is to load it from object storage into a table.)
17+
*
18+
* - [S] is a state type that will be threaded through accumulator calls.
19+
* - [K] is a key type associated the input data. (NOTE: Currently, there is no support for
20+
* key-mapping, so the key is always [io.airbyte.cdk.load.message.StreamKey]). Specifically, state
21+
* will always be managed per-key.
22+
* - [T] is the input data type
23+
* - [U] is the output data type
24+
*
25+
* The first time data is seen for a given key, [start] is called (with the partition number). The
26+
* state returned by [start] will be passed per input to [accept].
27+
*
28+
* If [accept] returns [IntermediateOutput] or [FinalOutput], the output will be forwarded to the
29+
* next stage (if applicable) and/or trigger bookkeeping (iff the output type implements
30+
* [io.airbyte.cdk.load.message.WithBatchState]).
31+
*
32+
* If [accept] returns a [NoOutput] or [IntermediateOutput], the nextState will be passed to the
33+
* next call to [accept]. If [accept] returns a [FinalOutput] state, any state will be discarded and
34+
* a new one will be created on the next input by a new call to [start].
35+
*
36+
* When the input stream is exhausted, [finish] will be called with any remaining state iff at least
37+
* one input was seen for that key. This means that [finish] will not be called on empty keys or on
38+
* keys where the last call to [accept] yielded a null (finished) state.
1239
*/
1340
interface BatchAccumulator<S, K : WithStream, T, U> {
14-
fun start(key: K, part: Int): S
15-
fun accept(record: T, state: S): Pair<S, U?>
16-
fun finish(state: S): U
41+
suspend fun start(key: K, part: Int): S
42+
suspend fun accept(input: T, state: S): BatchAccumulatorResult<S, U>
43+
suspend fun finish(state: S): FinalOutput<S, U>
1744
}
45+
46+
sealed interface BatchAccumulatorResult<S, U> {
47+
val nextState: S?
48+
get() = null
49+
val output: U?
50+
get() = null
51+
}
52+
53+
data class NoOutput<S, U>(
54+
override val nextState: S,
55+
) : BatchAccumulatorResult<S, U>
56+
57+
data class IntermediateOutput<S, U>(
58+
override val nextState: S,
59+
override val output: U,
60+
) : BatchAccumulatorResult<S, U>
61+
62+
data class FinalOutput<S, U>(override val output: U) : BatchAccumulatorResult<S, U>

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import io.airbyte.cdk.load.message.PartitionedQueue
99
import io.airbyte.cdk.load.message.PipelineEvent
1010
import io.airbyte.cdk.load.message.QueueWriter
1111
import io.airbyte.cdk.load.message.StreamKey
12-
import io.airbyte.cdk.load.state.Reserved
1312
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
1413
import io.airbyte.cdk.load.write.DirectLoader
1514
import io.airbyte.cdk.load.write.DirectLoaderFactory
@@ -24,7 +23,7 @@ import jakarta.inject.Singleton
2423
class DirectLoadPipelineStep<S : DirectLoader>(
2524
val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
2625
@Named("recordQueue")
27-
val inputQueue: PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
26+
val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
2827
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
2928
@Value("\${airbyte.destination.core.record-batch-size-override:null}")
3029
val batchSizeOverride: Long? = null,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt

+10-7
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,24 @@ data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState
2626
class DirectLoadRecordAccumulator<S : DirectLoader, K : WithStream>(
2727
val directLoaderFactory: DirectLoaderFactory<S>
2828
) : BatchAccumulator<S, K, DestinationRecordRaw, DirectLoadAccResult> {
29-
override fun start(key: K, part: Int): S {
29+
override suspend fun start(key: K, part: Int): S {
3030
return directLoaderFactory.create(key.stream, part)
3131
}
3232

33-
override fun accept(record: DestinationRecordRaw, state: S): Pair<S, DirectLoadAccResult?> {
34-
state.accept(record).let {
33+
override suspend fun accept(
34+
input: DestinationRecordRaw,
35+
state: S
36+
): BatchAccumulatorResult<S, DirectLoadAccResult> {
37+
state.accept(input).let {
3538
return when (it) {
36-
is Incomplete -> Pair(state, null)
37-
is Complete -> Pair(state, DirectLoadAccResult(Batch.State.COMPLETE))
39+
is Incomplete -> NoOutput(state)
40+
is Complete -> FinalOutput(DirectLoadAccResult(Batch.State.COMPLETE))
3841
}
3942
}
4043
}
4144

42-
override fun finish(state: S): DirectLoadAccResult {
45+
override suspend fun finish(state: S): FinalOutput<S, DirectLoadAccResult> {
4346
state.finish()
44-
return DirectLoadAccResult(Batch.State.COMPLETE)
47+
return FinalOutput(DirectLoadAccResult(Batch.State.COMPLETE))
4548
}
4649
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt

+3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ interface InputPartitioner {
1717
fun getPartition(record: DestinationRecordRaw, numParts: Int): Int
1818
}
1919

20+
/**
21+
* The default input partitioner, which partitions by the stream name. TODO: Should be round-robin?
22+
*/
2023
@Singleton
2124
@Secondary
2225
class ByStreamInputPartitioner : InputPartitioner {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.load.pipeline
6+
7+
import io.airbyte.cdk.load.message.DestinationRecordRaw
8+
import kotlin.random.Random
9+
10+
/**
11+
* Declare a singleton of this type to have input distributed evenly across the input partitions.
12+
* (The default is to [ByStreamInputPartitioner].)
13+
*
14+
* [rotateEveryNRecords] determines how often to rotate to the next partition. In testing, 10_000
15+
* seems to be the sweet spot between too much context switching and not enough load balancing.
16+
*/
17+
open class RoundRobinInputPartitioner(private val rotateEveryNRecords: Int = 10_000) :
18+
InputPartitioner {
19+
private var nextPartition =
20+
Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE / rotateEveryNRecords) *
21+
rotateEveryNRecords
22+
23+
override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
24+
val part = nextPartition++ / rotateEveryNRecords
25+
return Math.floorMod(part, numParts)
26+
}
27+
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
145145
// New interface shim
146146
@Named("recordQueue")
147147
private val recordQueueForPipeline:
148-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
148+
PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
149149
@Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate>,
150150
private val loadPipeline: LoadPipeline?,
151151
private val partitioner: InputPartitioner,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt

+7-9
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class DefaultInputConsumerTask(
8080
// Required by new interface
8181
@Named("recordQueue")
8282
private val recordQueueForPipeline:
83-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
83+
PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
8484
private val loadPipeline: LoadPipeline? = null,
8585
private val partitioner: InputPartitioner,
8686
private val openStreamQueue: QueueWriter<DestinationStream>
@@ -165,20 +165,20 @@ class DefaultInputConsumerTask(
165165
mapOf(manager.getCurrentCheckpointId() to 1),
166166
StreamKey(stream),
167167
record
168-
)
168+
) { reserved.release() }
169169
val partition = partitioner.getPartition(record, recordQueueForPipeline.partitions)
170-
recordQueueForPipeline.publish(reserved.replace(pipelineMessage), partition)
170+
recordQueueForPipeline.publish(pipelineMessage, partition)
171171
}
172172
is DestinationRecordStreamComplete -> {
173173
manager.markEndOfStream(true)
174174
log.info { "Read COMPLETE for stream $stream" }
175-
recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream)))
175+
recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
176176
reserved.release()
177177
}
178178
is DestinationRecordStreamIncomplete -> {
179179
manager.markEndOfStream(false)
180180
log.info { "Read INCOMPLETE for stream $stream" }
181-
recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream)))
181+
recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
182182
reserved.release()
183183
}
184184
is DestinationFile -> {
@@ -309,8 +309,7 @@ interface InputConsumerTaskFactory {
309309
fileTransferQueue: MessageQueue<FileTransferQueueMessage>,
310310

311311
// Required by new interface
312-
recordQueueForPipeline:
313-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
312+
recordQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
314313
loadPipeline: LoadPipeline?,
315314
partitioner: InputPartitioner,
316315
openStreamQueue: QueueWriter<DestinationStream>,
@@ -332,8 +331,7 @@ class DefaultInputConsumerTaskFactory(
332331
fileTransferQueue: MessageQueue<FileTransferQueueMessage>,
333332

334333
// Required by new interface
335-
recordQueueForPipeline:
336-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
334+
recordQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
337335
loadPipeline: LoadPipeline?,
338336
partitioner: InputPartitioner,
339337
openStreamQueue: QueueWriter<DestinationStream>,

0 commit comments

Comments
 (0)