Commit bdf9688

[Destination-S3] Uses New Load CDK LoadStrategy Interface (temporarily disabled) (#54695)
1 parent 78fd84a · commit bdf9688

34 files changed: +1360 -216 lines

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt

+3 -3

@@ -14,9 +14,9 @@ import io.airbyte.cdk.load.message.MultiProducerChannel
 import io.airbyte.cdk.load.message.PartitionedQueue
 import io.airbyte.cdk.load.message.PipelineEvent
 import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.message.StrictPartitionedQueue
 import io.airbyte.cdk.load.pipeline.BatchUpdate
 import io.airbyte.cdk.load.state.ReservationManager
-import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
 import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage
 import io.airbyte.cdk.load.write.LoadStrategy
@@ -120,8 +120,8 @@ class SyncBeanFactory {
     @Named("recordQueue")
     fun recordQueue(
         loadStrategy: LoadStrategy? = null,
-    ): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>> {
-        return PartitionedQueue(
+    ): PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>> {
+        return StrictPartitionedQueue(
             Array(loadStrategy?.inputPartitions ?: 1) {
                 ChannelMessageQueue(Channel(Channel.UNLIMITED))
             }

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PartitionedQueue.kt

+40 -7

@@ -5,28 +5,61 @@
 package io.airbyte.cdk.load.message
 
 import io.airbyte.cdk.load.util.CloseableCoroutine
+import kotlinx.coroutines.channels.Channel
 import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.merge
 
-class PartitionedQueue<T>(private val queues: Array<MessageQueue<T>>) : CloseableCoroutine {
-    val partitions = queues.size
+interface PartitionedQueue<T> : CloseableCoroutine {
+    val partitions: Int
+    fun consume(partition: Int): Flow<T>
+    suspend fun publish(value: T, partition: Int)
+    suspend fun broadcast(value: T)
+}
+
+class StrictPartitionedQueue<T>(private val queues: Array<MessageQueue<T>>) : PartitionedQueue<T> {
+    override val partitions = queues.size
 
-    fun consume(partition: Int): Flow<T> {
-        if (partition < 0 || partition >= queues.size) {
+    override fun consume(partition: Int): Flow<T> {
+        if (partition < 0 || partition >= partitions) {
             throw IllegalArgumentException("Invalid partition: $partition")
         }
         return queues[partition].consume()
     }
 
-    suspend fun publish(value: T, partition: Int) {
-        if (partition < 0 || partition >= queues.size) {
+    override suspend fun publish(value: T, partition: Int) {
+        if (partition < 0 || partition >= partitions) {
             throw IllegalArgumentException("Invalid partition: $partition")
         }
         queues[partition].publish(value)
     }
 
-    suspend fun broadcast(value: T) = queues.forEach { it.publish(value) }
+    override suspend fun broadcast(value: T) = queues.forEach { it.publish(value) }
 
     override suspend fun close() {
         queues.forEach { it.close() }
     }
 }
+
+/**
+ * This is for the use case where you want workers to grab work as it becomes available but still be
+ * able to receive notifications that are guaranteed to be consumed by every partition.
+ */
+class SinglePartitionQueueWithMultiPartitionBroadcast<T>(
+    private val sharedQueue: MessageQueue<T>,
+    override val partitions: Int
+) : PartitionedQueue<T> {
+    private val broadcastChannels =
+        StrictPartitionedQueue(
+            (0 until partitions).map { ChannelMessageQueue<T>(Channel(1)) }.toTypedArray()
+        )
+
+    override fun consume(partition: Int): Flow<T> =
+        merge(sharedQueue.consume(), broadcastChannels.consume(partition))
+    override suspend fun publish(value: T, partition: Int) = sharedQueue.publish(value)
+    override suspend fun broadcast(value: T) = broadcastChannels.broadcast(value)
+
+    override suspend fun close() {
+        sharedQueue.close()
+        broadcastChannels.close()
+    }
+}

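The consumption semantics of SinglePartitionQueueWithMultiPartitionBroadcast can be checked in isolation. Below is a minimal, self-contained sketch of the same pattern built directly on kotlinx.coroutines primitives (the SharedQueueWithBroadcast class and every name in it are illustrative stand-ins, not CDK API): published items go to whichever worker claims them first, while broadcasts reach every partition.

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Illustrative stand-in for the CDK class: one shared work channel drained
// cooperatively by all workers, plus a capacity-1 channel per partition for
// broadcasts that every worker must observe.
class SharedQueueWithBroadcast<T>(val partitions: Int) {
    private val shared = Channel<T>(Channel.UNLIMITED)
    private val broadcasts = Array(partitions) { Channel<T>(1) }

    fun consume(partition: Int): Flow<T> =
        merge(shared.receiveAsFlow(), broadcasts[partition].receiveAsFlow())

    suspend fun publish(value: T) = shared.send(value)
    suspend fun broadcast(value: T) = broadcasts.forEach { it.send(value) }

    fun close() {
        shared.close()
        broadcasts.forEach { it.close() }
    }
}

fun main() = runBlocking {
    val queue = SharedQueueWithBroadcast<String>(partitions = 2)
    repeat(queue.partitions) { p ->
        launch { queue.consume(p).collect { println("worker $p received: $it") } }
    }
    queue.publish("work-1")          // claimed by exactly one worker
    queue.publish("work-2")          // claimed by exactly one worker
    queue.broadcast("end-of-stream") // delivered to both workers
    queue.close()                    // lets the merged flows complete
}

The per-partition capacity-1 channel mirrors the Channel(1) choice in the diff: the broadcaster is backpressured until each worker has picked up the previous notification.
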
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt

+9 -5

@@ -10,15 +10,19 @@ import io.airbyte.cdk.load.state.CheckpointId
 /** Used internally by the CDK to pass messages between steps in the loader pipeline. */
 sealed interface PipelineEvent<K : WithStream, T>
 
+/**
+ * A message that contains a keyed payload. The key is used to manage the state of the payload's
+ * corresponding [io.airbyte.cdk.load.pipeline.BatchAccumulator]. [checkpointCounts] is used by the
+ * CDK to perform state message bookkeeping. [postProcessingCallback] is for releasing resources
+ * associated with the message.
+ */
 class PipelineMessage<K : WithStream, T>(
     val checkpointCounts: Map<CheckpointId, Long>,
     val key: K,
-    val value: T
+    val value: T,
+    val postProcessingCallback: suspend () -> Unit = {},
 ) : PipelineEvent<K, T>
 
-/**
- * We send the end message on the stream and not the key, because there's no way to partition an
- * empty message.
- */
+/** Broadcast at end-of-stream to all partitions to signal that the stream has ended. */
 class PipelineEndOfStream<K : WithStream, T>(val stream: DestinationStream.Descriptor) :
     PipelineEvent<K, T>

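The new postProcessingCallback contract is simplest to see in miniature: whoever consumes the message invokes the callback once the payload has been handled, letting the producer tie resource release (here, the memory-reservation release visible in the InputConsumerTask diff below) to actual consumption. A toy sketch, with an illustrative Message class rather than the CDK type:

import kotlinx.coroutines.runBlocking

// Toy message type mirroring the shape of the new field (not the CDK class).
class Message<T>(
    val value: T,
    val postProcessingCallback: suspend () -> Unit = {},
)

fun main() = runBlocking {
    var released = false
    // Trailing-lambda construction, as in the InputConsumerTask diff below.
    val msg = Message("record") { released = true }
    println("processing ${msg.value}")
    msg.postProcessingCallback() // the consumer signals it is done with the payload
    check(released) { "callback should have run" }
}
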
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt

+52 -5

@@ -7,11 +7,58 @@ package io.airbyte.cdk.load.pipeline
 import io.airbyte.cdk.load.message.WithStream
 
 /**
- * [BatchAccumulator] is used internally by the CDK to implement RecordLoaders. Connector devs
- * should never need to implement this interface.
+ * [BatchAccumulator] is used internally by the CDK to implement
+ * [io.airbyte.cdk.load.write.LoadStrategy]s. Connector devs should never need to implement this
+ * interface.
+ *
+ * It is the glue that connects a specific step in a specific pipeline to the generic pipeline on
+ * the back end. (For example, in a three-stage pipeline like bulk load, step 1 is to create a part,
+ * step 2 is to upload it, and step 3 is to load it from object storage into a table.)
+ *
+ * - [S] is a state type that will be threaded through accumulator calls.
+ * - [K] is a key type associated the input data. (NOTE: Currently, there is no support for
+ *   key-mapping, so the key is always [io.airbyte.cdk.load.message.StreamKey]). Specifically, state
+ *   will always be managed per-key.
+ * - [T] is the input data type
+ * - [U] is the output data type
+ *
+ * The first time data is seen for a given key, [start] is called (with the partition number). The
+ * state returned by [start] will be passed per input to [accept].
+ *
+ * If [accept] returns [IntermediateOutput] or [FinalOutput], the output will be forwarded to the
+ * next stage (if applicable) and/or trigger bookkeeping (iff the output type implements
+ * [io.airbyte.cdk.load.message.WithBatchState]).
+ *
+ * If [accept] returns a [NoOutput] or [IntermediateOutput], the nextState will be passed to the
+ * next call to [accept]. If [accept] returns a [FinalOutput] state, any state will be discarded and
+ * a new one will be created on the next input by a new call to [start].
+ *
+ * When the input stream is exhausted, [finish] will be called with any remaining state iff at least
+ * one input was seen for that key. This means that [finish] will not be called on empty keys or on
+ * keys where the last call to [accept] yielded a null (finished) state.
 */
 interface BatchAccumulator<S, K : WithStream, T, U> {
-    fun start(key: K, part: Int): S
-    fun accept(record: T, state: S): Pair<S, U?>
-    fun finish(state: S): U
+    suspend fun start(key: K, part: Int): S
+    suspend fun accept(input: T, state: S): BatchAccumulatorResult<S, U>
+    suspend fun finish(state: S): FinalOutput<S, U>
+}
+
+sealed interface BatchAccumulatorResult<S, U> {
+    val nextState: S?
+    val output: U?
+}
+
+data class NoOutput<S, U>(
+    override val nextState: S,
+) : BatchAccumulatorResult<S, U> {
+    override val output: U? = null
+}
+
+data class IntermediateOutput<S, U>(
+    override val nextState: S,
+    override val output: U,
+) : BatchAccumulatorResult<S, U>
+
+data class FinalOutput<S, U>(override val output: U) : BatchAccumulatorResult<S, U> {
+    override val nextState: S? = null
 }

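To make the documented lifecycle concrete, here is a toy accumulator written against the interfaces exactly as diffed above (the TripletAccumulator class and its batching rule are invented for illustration, and it assumes the CDK types on the classpath): it buffers string inputs and emits a FinalOutput every third record, after which the pipeline would call start() again on the next input for that key; finish() flushes any remainder at end of stream.

import io.airbyte.cdk.load.message.WithStream

// Toy accumulator (invented for illustration): S = MutableList<String>,
// T = String, U = List<String>. Emits a batch after every third input;
// finish() flushes whatever remains when the stream ends.
class TripletAccumulator<K : WithStream> :
    BatchAccumulator<MutableList<String>, K, String, List<String>> {

    override suspend fun start(key: K, part: Int): MutableList<String> = mutableListOf()

    override suspend fun accept(
        input: String,
        state: MutableList<String>
    ): BatchAccumulatorResult<MutableList<String>, List<String>> {
        state.add(input)
        return if (state.size >= 3) {
            FinalOutput(state.toList()) // state is discarded; next input triggers start()
        } else {
            NoOutput(state) // carried into the next accept() call
        }
    }

    override suspend fun finish(
        state: MutableList<String>
    ): FinalOutput<MutableList<String>, List<String>> = FinalOutput(state.toList())
}
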
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt

+6 -21

@@ -4,46 +4,31 @@
 
 package io.airbyte.cdk.load.pipeline
 
-import io.airbyte.cdk.load.message.DestinationRecordRaw
-import io.airbyte.cdk.load.message.PartitionedQueue
-import io.airbyte.cdk.load.message.PipelineEvent
-import io.airbyte.cdk.load.message.QueueWriter
 import io.airbyte.cdk.load.message.StreamKey
-import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
+import io.airbyte.cdk.load.task.internal.LoadPipelineStepTaskFactory
 import io.airbyte.cdk.load.write.DirectLoader
 import io.airbyte.cdk.load.write.DirectLoaderFactory
 import io.github.oshai.kotlinlogging.KotlinLogging
 import io.micronaut.context.annotation.Requires
-import io.micronaut.context.annotation.Value
-import jakarta.inject.Named
 import jakarta.inject.Singleton
 
 @Singleton
 @Requires(bean = DirectLoaderFactory::class)
 class DirectLoadPipelineStep<S : DirectLoader>(
-    val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
-    @Named("recordQueue")
-    val inputQueue: PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
-    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
-    @Value("\${airbyte.destination.core.record-batch-size-override:null}")
-    val batchSizeOverride: Long? = null,
     val directLoaderFactory: DirectLoaderFactory<S>,
+    val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
+    val taskFactory: LoadPipelineStepTaskFactory,
 ) : LoadPipelineStep {
     private val log = KotlinLogging.logger {}
-
     override val numWorkers: Int = directLoaderFactory.inputPartitions
 
     override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
         log.info { "Creating DirectLoad pipeline step task for partition $partition" }
-        return LoadPipelineStepTask(
+        return taskFactory.createOnlyStep<S, StreamKey, DirectLoadAccResult>(
             accumulator,
-            inputQueue.consume(partition),
-            batchUpdateQueue = batchQueue,
-            outputPartitioner = null,
-            outputQueue = null as PartitionedQueue<PipelineEvent<StreamKey, DirectLoadAccResult>>?,
-            batchSizeOverride?.let { RecordCountFlushStrategy(it) },
-            partition
+            partition,
+            numWorkers
         )
     }
 }

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt

+10 -7

@@ -26,21 +26,24 @@ data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState
 class DirectLoadRecordAccumulator<S : DirectLoader, K : WithStream>(
     val directLoaderFactory: DirectLoaderFactory<S>
 ) : BatchAccumulator<S, K, DestinationRecordRaw, DirectLoadAccResult> {
-    override fun start(key: K, part: Int): S {
+    override suspend fun start(key: K, part: Int): S {
         return directLoaderFactory.create(key.stream, part)
     }
 
-    override fun accept(record: DestinationRecordRaw, state: S): Pair<S, DirectLoadAccResult?> {
-        state.accept(record).let {
+    override suspend fun accept(
+        input: DestinationRecordRaw,
+        state: S
+    ): BatchAccumulatorResult<S, DirectLoadAccResult> {
+        state.accept(input).let {
             return when (it) {
-                is Incomplete -> Pair(state, null)
-                is Complete -> Pair(state, DirectLoadAccResult(Batch.State.COMPLETE))
+                is Incomplete -> NoOutput(state)
+                is Complete -> FinalOutput(DirectLoadAccResult(Batch.State.COMPLETE))
             }
         }
     }
 
-    override fun finish(state: S): DirectLoadAccResult {
+    override suspend fun finish(state: S): FinalOutput<S, DirectLoadAccResult> {
         state.finish()
-        return DirectLoadAccResult(Batch.State.COMPLETE)
+        return FinalOutput(DirectLoadAccResult(Batch.State.COMPLETE))
     }
 }

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt

+3

@@ -17,6 +17,9 @@ interface InputPartitioner {
     fun getPartition(record: DestinationRecordRaw, numParts: Int): Int
 }
 
+/**
+ * The default input partitioner, which partitions by the stream name. TODO: Should be round-robin?
+ */
 @Singleton
 @Secondary
 class ByStreamInputPartitioner : InputPartitioner {
New file: RoundRobinInputPartitioner (package io.airbyte.cdk.load.pipeline)

+27

@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipeline
+
+import io.airbyte.cdk.load.message.DestinationRecordRaw
+import kotlin.random.Random
+
+/**
+ * Declare a singleton of this type to have input distributed evenly across the input partitions.
+ * (The default is to [ByStreamInputPartitioner].)
+ *
+ * [rotateEveryNRecords] determines how often to rotate to the next partition. In testing, 10_000
+ * seems to be the sweet spot between too much context switching and not enough load balancing.
+ */
+open class RoundRobinInputPartitioner(private val rotateEveryNRecords: Int = 10_000) :
+    InputPartitioner {
+    private var nextPartition =
+        Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE / rotateEveryNRecords) *
+            rotateEveryNRecords
+
+    override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
+        val part = nextPartition++ / rotateEveryNRecords
+        return Math.floorMod(part, numParts)
+    }
+}

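The rotation arithmetic is small enough to verify by hand. A standalone sketch with toy values (rotateEveryNRecords = 3, numParts = 2; the class above defaults to 10_000): successive records land on the same partition for a full run before rotating, and Math.floorMod keeps the result in [0, numParts) even after nextPartition overflows Int and goes negative, which a plain % would not.

// Standalone illustration of the rotation, with toy values chosen so the
// pattern is visible at a glance.
fun main() {
    val rotateEveryNRecords = 3
    val numParts = 2
    var nextPartition = 0
    val assignments = (1..12).map {
        val part = nextPartition++ / rotateEveryNRecords
        Math.floorMod(part, numParts)
    }
    println(assignments) // [0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1]

    // floorMod, unlike %, stays in [0, numParts) for negative inputs, which
    // matters once nextPartition overflows Int:
    println(-1 % numParts)               // -1
    println(Math.floorMod(-1, numParts)) // 1
}
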
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt

+1 -1

@@ -145,7 +145,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
     // New interface shim
     @Named("recordQueue")
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
     @Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate>,
     private val loadPipeline: LoadPipeline?,
     private val partitioner: InputPartitioner,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/implementor/TeardownTask.kt

+1

@@ -41,6 +41,7 @@ class DefaultTeardownTask(
             return
         }
 
+        log.info { "All streams processed successfully, awaiting all checkpoints flushed" }
         checkpointManager.awaitAllCheckpointsFlushed()
         log.info { "Starting teardown task" }
         destination.teardown()

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt

+7 -9

@@ -80,7 +80,7 @@ class DefaultInputConsumerTask(
     // Required by new interface
     @Named("recordQueue")
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
     private val loadPipeline: LoadPipeline? = null,
     private val partitioner: InputPartitioner,
     private val openStreamQueue: QueueWriter<DestinationStream>
@@ -165,20 +165,20 @@ class DefaultInputConsumerTask(
                     mapOf(manager.getCurrentCheckpointId() to 1),
                     StreamKey(stream),
                     record
-                )
+                ) { reserved.release() }
                 val partition = partitioner.getPartition(record, recordQueueForPipeline.partitions)
-                recordQueueForPipeline.publish(reserved.replace(pipelineMessage), partition)
+                recordQueueForPipeline.publish(pipelineMessage, partition)
             }
             is DestinationRecordStreamComplete -> {
                 manager.markEndOfStream(true)
                 log.info { "Read COMPLETE for stream $stream" }
-                recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream)))
+                recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
                 reserved.release()
             }
             is DestinationRecordStreamIncomplete -> {
                 manager.markEndOfStream(false)
                 log.info { "Read INCOMPLETE for stream $stream" }
-                recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream)))
+                recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
                 reserved.release()
             }
             is DestinationFile -> {
@@ -309,8 +309,7 @@ interface InputConsumerTaskFactory {
         fileTransferQueue: MessageQueue<FileTransferQueueMessage>,
 
         // Required by new interface
-        recordQueueForPipeline:
-            PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
+        recordQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
         loadPipeline: LoadPipeline?,
        partitioner: InputPartitioner,
        openStreamQueue: QueueWriter<DestinationStream>,
@@ -332,8 +331,7 @@ class DefaultInputConsumerTaskFactory(
     fileTransferQueue: MessageQueue<FileTransferQueueMessage>,
 
     // Required by new interface
-    recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
+    recordQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
     loadPipeline: LoadPipeline?,
     partitioner: InputPartitioner,
     openStreamQueue: QueueWriter<DestinationStream>,
