
Commit b5c5d8e

WIP: S3 Uses New Interface

Committed Mar 5, 2025 · 1 parent 3d3edfb · commit b5c5d8e

File tree: 30 files changed (+685, -104)
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt (+1, -2)

@@ -16,7 +16,6 @@ import io.airbyte.cdk.load.message.PipelineEvent
 import io.airbyte.cdk.load.message.StreamKey
 import io.airbyte.cdk.load.pipeline.BatchUpdate
 import io.airbyte.cdk.load.state.ReservationManager
-import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
 import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage
 import io.airbyte.cdk.load.write.LoadStrategy
@@ -120,7 +119,7 @@ class SyncBeanFactory {
     @Named("recordQueue")
     fun recordQueue(
         loadStrategy: LoadStrategy? = null,
-    ): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>> {
+    ): PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>> {
         return PartitionedQueue(
             Array(loadStrategy?.inputPartitions ?: 1) {
                 ChannelMessageQueue(Channel(Channel.UNLIMITED))

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt (+9, -5)

@@ -10,15 +10,19 @@ import io.airbyte.cdk.load.state.CheckpointId
 /** Used internally by the CDK to pass messages between steps in the loader pipeline. */
 sealed interface PipelineEvent<K : WithStream, T>
 
+/**
+ * A message that contains a keyed payload. The key is used to manage the state of the payload's
+ * corresponding [io.airbyte.cdk.load.pipeline.BatchAccumulator]. [checkpointCounts] is used by the
+ * CDK to perform state message bookkeeping. [postProcessingCallback] is for releasing resources
+ * associated with the message.
+ */
 class PipelineMessage<K : WithStream, T>(
     val checkpointCounts: Map<CheckpointId, Long>,
     val key: K,
-    val value: T
+    val value: T,
+    val postProcessingCallback: suspend () -> Unit = {},
 ) : PipelineEvent<K, T>
 
-/**
- * We send the end message on the stream and not the key, because there's no way to partition an
- * empty message.
- */
+/** Broadcast at end-of-stream to all partitions to signal that the stream has ended. */
 class PipelineEndOfStream<K : WithStream, T>(val stream: DestinationStream.Descriptor) :
     PipelineEvent<K, T>
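For orientation, a minimal sketch of how a producer might build a `PipelineMessage` whose `postProcessingCallback` releases a resource once the payload has been processed. The `Reservation` type here is illustrative (a stand-in for the CDK's memory reservations, which this commit wires up the same way in `InputConsumerTask`):

```kotlin
import io.airbyte.cdk.load.message.PipelineMessage
import io.airbyte.cdk.load.message.WithStream
import io.airbyte.cdk.load.state.CheckpointId

// Illustrative stand-in for a resource tied to the message's lifetime.
class Reservation(val bytes: Long) {
    fun release() = println("released $bytes bytes")
}

fun <K : WithStream, T> messageFor(
    key: K,
    value: T,
    checkpointId: CheckpointId,
    reservation: Reservation,
): PipelineMessage<K, T> =
    PipelineMessage(
        checkpointCounts = mapOf(checkpointId to 1L),
        key = key,
        value = value,
    ) { reservation.release() } // postProcessingCallback frees the resource
```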

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt (+32, -5)

@@ -7,11 +7,38 @@ package io.airbyte.cdk.load.pipeline
 import io.airbyte.cdk.load.message.WithStream
 
 /**
- * [BatchAccumulator] is used internally by the CDK to implement RecordLoaders. Connector devs
- * should never need to implement this interface.
+ * [BatchAccumulator] is used internally by the CDK to implement
+ * [io.airbyte.cdk.load.write.LoadStrategy]s. Connector devs should never need to implement this
+ * interface.
+ *
+ * It is the glue that connects a specific step in a specific pipeline to the generic pipeline on
+ * the back end. (For example, in a three-stage pipeline like bulk load, step 1 is to create a part,
+ * step 2 is to upload it, and step 3 is to load it from object storage into a table.)
+ *
+ * - [S] is a state type that will be threaded through accumulator calls.
+ * - [K] is a key type associated with the input data. (NOTE: Currently, there is no support for
+ *   key-mapping, so the key is always [io.airbyte.cdk.load.message.StreamKey]). Specifically, state
+ *   will always be managed per-key.
+ * - [T] is the input data type
+ * - [U] is the output data type
+ *
+ * The first time data is seen for a given key, [start] is called (with the partition number). The
+ * state returned by [start] will be passed per input to [accept].
+ *
+ * If [accept] returns a non-null output, that output will be forwarded to the next stage (if
+ * applicable) and/or trigger bookkeeping (iff the output type implements
+ * [io.airbyte.cdk.load.message.WithBatchState]).
+ *
+ * If [accept] returns a non-null state, that state will be passed to the next call to [accept]. If
+ * [accept] returns a null state, the state will be discarded and a new one will be created on the
+ * next input by a new call to [start].
+ *
+ * When the input stream is exhausted, [finish] will be called with any remaining state iff at least
+ * one input was seen for that key. This means that [finish] will not be called on empty keys or on
+ * keys where the last call to [accept] yielded a null (finished) state.
 */
 interface BatchAccumulator<S, K : WithStream, T, U> {
-    fun start(key: K, part: Int): S
-    fun accept(record: T, state: S): Pair<S, U?>
-    fun finish(state: S): U
+    suspend fun start(key: K, part: Int): S
+    suspend fun accept(input: T, state: S): Pair<S?, U?>
+    suspend fun finish(state: S): U
}
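To make the documented lifecycle concrete, here is a toy implementation (not part of this commit): it buffers strings per key, emits a joined batch after every `batchSize` inputs by returning a null state (so `start` runs again on the next input), and lets `finish` flush any remainder at end-of-stream:

```kotlin
import io.airbyte.cdk.load.message.WithStream
import io.airbyte.cdk.load.pipeline.BatchAccumulator

// Toy sketch only: accumulate strings per key, emit every `batchSize` inputs.
class ConcatAccumulator<K : WithStream>(private val batchSize: Int = 3) :
    BatchAccumulator<MutableList<String>, K, String, String> {
    override suspend fun start(key: K, part: Int): MutableList<String> = mutableListOf()

    override suspend fun accept(
        input: String,
        state: MutableList<String>
    ): Pair<MutableList<String>?, String?> {
        state.add(input)
        return if (state.size >= batchSize) {
            Pair(null, state.joinToString(",")) // emit output, discard state
        } else {
            Pair(state, null) // no output yet; state threads to the next accept()
        }
    }

    override suspend fun finish(state: MutableList<String>): String = state.joinToString(",")
}
```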

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt (+1, -3)

@@ -9,7 +9,6 @@ import io.airbyte.cdk.load.message.PartitionedQueue
 import io.airbyte.cdk.load.message.PipelineEvent
 import io.airbyte.cdk.load.message.QueueWriter
 import io.airbyte.cdk.load.message.StreamKey
-import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
 import io.airbyte.cdk.load.write.DirectLoader
 import io.airbyte.cdk.load.write.DirectLoaderFactory
@@ -24,8 +23,7 @@ import jakarta.inject.Singleton
 class DirectLoadPipelineStep<S : DirectLoader>(
     val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
     @Named("recordQueue")
-    val inputQueue:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+    val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
     @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
     @Value("\${airbyte.destination.core.record-batch-size-override:null}")
     val batchSizeOverride: Long? = null,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt (+7, -7)

@@ -26,23 +26,23 @@ data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState
 class DirectLoadRecordAccumulator<S : DirectLoader, K : WithStream>(
     val directLoaderFactory: DirectLoaderFactory<S>
 ) : BatchAccumulator<S, K, DestinationRecordAirbyteValue, DirectLoadAccResult> {
-    override fun start(key: K, part: Int): S {
+    override suspend fun start(key: K, part: Int): S {
         return directLoaderFactory.create(key.stream, part)
     }
 
-    override fun accept(
-        record: DestinationRecordAirbyteValue,
+    override suspend fun accept(
+        input: DestinationRecordAirbyteValue,
         state: S
-    ): Pair<S, DirectLoadAccResult?> {
-        state.accept(record).let {
+    ): Pair<S?, DirectLoadAccResult?> {
+        state.accept(input).let {
             return when (it) {
                 is Incomplete -> Pair(state, null)
-                is Complete -> Pair(state, DirectLoadAccResult(Batch.State.COMPLETE))
+                is Complete -> Pair(null, DirectLoadAccResult(Batch.State.COMPLETE))
             }
         }
     }
 
-    override fun finish(state: S): DirectLoadAccResult {
+    override suspend fun finish(state: S): DirectLoadAccResult {
         state.finish()
         return DirectLoadAccResult(Batch.State.COMPLETE)
     }

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt (+3)

@@ -17,6 +17,9 @@ interface InputPartitioner {
     fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int
 }
 
+/**
+ * The default input partitioner, which partitions by the stream name. TODO: Should be round-robin?
+ */
 @Singleton
 @Secondary
 class ByStreamInputPartitioner : InputPartitioner {
airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt (new file, +27)

@@ -0,0 +1,27 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipeline
+
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import kotlin.math.abs
+import kotlin.random.Random
+
+/**
+ * Declare a singleton of this type to have input distributed evenly across the input partitions.
+ * (The default is [ByStreamInputPartitioner].)
+ */
+open class RoundRobinInputPartitioner(private val rotateEveryNRecords: Int = 10_000) :
+    InputPartitioner {
+    private var nextPartition = Random(System.currentTimeMillis()).nextInt()
+
+    override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int {
+        val part = nextPartition++ / rotateEveryNRecords
+        return if (part == Int.MIN_VALUE) { // avoid overflow
+            0
+        } else {
+            abs(part) % numParts
+        }
+    }
+}
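The arithmetic here keeps each run of `rotateEveryNRecords` consecutive records on the same partition before rotating, amortizing any per-switch cost while still spreading load; the `Int.MIN_VALUE` guard exists because `abs(Int.MIN_VALUE)` is still negative. A standalone illustration of the rotation (small numbers for readability):

```kotlin
import kotlin.math.abs

// Demonstrates the rotation: with rotateEveryNRecords = 3 and numParts = 2,
// records land on partitions [0, 0, 0, 1, 1, 1, 0, 0, 0].
fun main() {
    val rotateEveryNRecords = 3
    val numParts = 2
    var nextPartition = 0 // the real class seeds this with a random Int
    val partitions =
        (0 until 9).map {
            val part = nextPartition++ / rotateEveryNRecords
            abs(part) % numParts
        }
    println(partitions) // [0, 0, 0, 1, 1, 1, 0, 0, 0]
}
```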

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt (+1, -1)

@@ -145,7 +145,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
     // New interface shim
     @Named("recordQueue")
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
     @Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate>,
     private val loadPipeline: LoadPipeline?,
     private val partitioner: InputPartitioner,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt (+7, -7)

@@ -80,7 +80,7 @@ class DefaultInputConsumerTask(
     // Required by new interface
     @Named("recordQueue")
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
     private val loadPipeline: LoadPipeline? = null,
     private val partitioner: InputPartitioner,
     private val openStreamQueue: QueueWriter<DestinationStream>
@@ -165,20 +165,20 @@ class DefaultInputConsumerTask(
                         mapOf(manager.getCurrentCheckpointId() to 1),
                         StreamKey(stream),
                         record
-                    )
+                    ) { reserved.release() }
                 val partition = partitioner.getPartition(record, recordQueueForPipeline.partitions)
-                recordQueueForPipeline.publish(reserved.replace(pipelineMessage), partition)
+                recordQueueForPipeline.publish(pipelineMessage, partition)
             }
             is DestinationRecordStreamComplete -> {
                 manager.markEndOfStream(true)
                 log.info { "Read COMPLETE for stream $stream" }
-                recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream)))
+                recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
                 reserved.release()
             }
             is DestinationRecordStreamIncomplete -> {
                 manager.markEndOfStream(false)
                 log.info { "Read INCOMPLETE for stream $stream" }
-                recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream)))
+                recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
                 reserved.release()
             }
             is DestinationFile -> {
@@ -310,7 +310,7 @@ interface InputConsumerTaskFactory {
 
     // Required by new interface
     recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
     loadPipeline: LoadPipeline?,
     partitioner: InputPartitioner,
     openStreamQueue: QueueWriter<DestinationStream>,
@@ -333,7 +333,7 @@ class DefaultInputConsumerTaskFactory(
 
     // Required by new interface
     recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
     loadPipeline: LoadPipeline?,
     partitioner: InputPartitioner,
     openStreamQueue: QueueWriter<DestinationStream>,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt (+53, -24)

@@ -18,7 +18,6 @@ import io.airbyte.cdk.load.pipeline.BatchUpdate
 import io.airbyte.cdk.load.pipeline.OutputPartitioner
 import io.airbyte.cdk.load.pipeline.PipelineFlushStrategy
 import io.airbyte.cdk.load.state.CheckpointId
-import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.task.OnEndOfSync
 import io.airbyte.cdk.load.task.Task
 import io.airbyte.cdk.load.task.TerminalCondition
@@ -34,7 +33,7 @@ data class RangeState<S>(
 /** A long-running task that actually implements a load pipeline step. */
 class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStream, U : Any>(
     private val batchAccumulator: BatchAccumulator<S, K1, T, U>,
-    private val inputFlow: Flow<Reserved<PipelineEvent<K1, T>>>,
+    private val inputFlow: Flow<PipelineEvent<K1, T>>,
     private val batchUpdateQueue: QueueWriter<BatchUpdate>,
     private val outputPartitioner: OutputPartitioner<K1, T, K2, U>?,
     private val outputQueue: PartitionedQueue<PipelineEvent<K2, U>>?,
@@ -44,11 +43,11 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStream, U : Any>(
     override val terminalCondition: TerminalCondition = OnEndOfSync
 
     override suspend fun execute() {
-        inputFlow.fold(mutableMapOf<K1, RangeState<S>>()) { stateStore, reservation ->
+        inputFlow.fold(mutableMapOf<K1, RangeState<S>>()) { stateStore, input ->
             try {
-                when (val input = reservation.value) {
+                when (input) {
                     is PipelineMessage -> {
-                        // Fetch and update the local state associated with the current batch.
+                        // Get or create the accumulator state associated w/ the input key.
                         val state =
                             stateStore
                                 .getOrPut(input.key) {
@@ -57,43 +56,73 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStream, U : Any>(
                                     )
                                 }
                                 .let { it.copy(inputCount = it.inputCount + 1) }
-                        val (newState, output) =
+
+                        // Accumulate the input and get the new state and output.
+                        val (newStateMaybe, outputMaybe) =
                             batchAccumulator.accept(
                                 input.value,
                                 state.state,
                             )
-                        reservation.release() // TODO: Accumulate and release when persisted
+                        /** TODO: Make this impossible at the return type level */
+                        if (newStateMaybe == null && outputMaybe == null) {
+                            throw IllegalStateException(
+                                "BatchAccumulator must return a new state or an output"
+                            )
+                        }
+
+                        // Update bookkeeping metadata
+                        input
+                            .postProcessingCallback() // TODO: Accumulate and release when persisted
                         input.checkpointCounts.forEach {
                             state.checkpointCounts.merge(it.key, it.value) { old, new -> old + new }
                        }
 
-                        // If the accumulator did not produce a result, check if we should flush.
-                        // If so, use the result of a finish call as the output.
-                        val finalOutput =
-                            output
-                                ?: if (flushStrategy?.shouldFlush(state.inputCount) == true) {
-                                    batchAccumulator.finish(newState)
+                        // Finalize the state and output
+                        val (finalState, finalOutput) =
+                            if (outputMaybe == null) {
+                                // Possibly force an output (and if so, discard the state)
+                                if (flushStrategy?.shouldFlush(state.inputCount) == true) {
+                                    val finalOutput = batchAccumulator.finish(newStateMaybe!!)
+                                    Pair(null, finalOutput)
                                 } else {
-                                    null
+                                    Pair(newStateMaybe, null)
                                 }
+                            } else {
+                                // Otherwise, just use what we were given
+                                Pair(newStateMaybe, outputMaybe)
+                            }
 
-                        if (finalOutput != null) {
-                            // Publish the emitted output and evict the state.
-                            handleOutput(input.key, state.checkpointCounts, finalOutput)
-                            stateStore.remove(input.key)
+                        // Publish the output if there is one & reset the input count
+                        val inputCount =
+                            if (finalOutput != null) {
+                                // Publish the emitted output and evict the state.
+                                handleOutput(input.key, state.checkpointCounts, finalOutput)
+                                state.checkpointCounts.clear()
+                                0
+                            } else {
+                                state.inputCount
+                            }
+
+                        // Update the state if `accept` returned a new state, otherwise evict.
+                        if (finalState != null) {
+                            // If accept returned a new state, update the state store.
+                            stateStore[input.key] =
+                                state.copy(state = finalState, inputCount = inputCount)
                         } else {
-                            // If there's no output yet, just update the local state.
-                            stateStore[input.key] = RangeState(newState, state.checkpointCounts)
+                            stateStore.remove(input.key)
                        }
+
                        stateStore
                    }
                    is PipelineEndOfStream -> {
                        // Give any key associated with the stream a chance to finish
                        val keysToRemove = stateStore.keys.filter { it.stream == input.stream }
                        keysToRemove.forEach { key ->
                            stateStore.remove(key)?.let { stored ->
-                                val output = batchAccumulator.finish(stored.state)
-                                handleOutput(key, stored.checkpointCounts, output)
+                                if (stored.inputCount > 0) {
+                                    val output = batchAccumulator.finish(stored.state)
+                                    handleOutput(key, stored.checkpointCounts, output)
+                                }
                            }
                        }
 
@@ -122,7 +151,7 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStream, U : Any>(
         // Only publish the output if there's a next step.
         outputQueue?.let {
             val outputKey = outputPartitioner!!.getOutputKey(inputKey, output)
-            val message = PipelineMessage(checkpointCounts, outputKey, output)
+            val message = PipelineMessage(checkpointCounts.toMap(), outputKey, output)
             val outputPart = outputPartitioner.getPart(outputKey, it.partitions)
             it.publish(message, outputPart)
         }
@@ -132,7 +161,7 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStream, U : Any>(
         val update =
             BatchStateUpdate(
                 stream = inputKey.stream,
-                checkpointCounts = checkpointCounts,
+                checkpointCounts = checkpointCounts.toMap(),
                 state = output.state
             )
         batchUpdateQueue.publish(update)
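The `PipelineMessage` branch above reduces to a small decision table. A condensed paraphrase as a pure function (a sketch of the logic, not the committed code):

```kotlin
// Paraphrase of the per-message state transition in LoadPipelineStepTask.
data class Transition<S, U>(val keepState: S?, val emit: U?)

suspend fun <S, U> step(
    accept: suspend () -> Pair<S?, U?>, // the accumulator's accept(input, state)
    finish: suspend (S) -> U,           // the accumulator's finish(state)
    shouldFlush: Boolean,               // the flush strategy's verdict
): Transition<S, U> {
    val (newState, output) = accept()
    check(newState != null || output != null) { "accumulator must return state or output" }
    return when {
        output != null -> Transition(newState, output)       // emit what accept produced
        shouldFlush -> Transition(null, finish(newState!!))  // force-flush discards state
        else -> Transition(newState, null)                   // keep accumulating
    }
}
```

A null `keepState` evicts the key from the state store, so the next input for that key triggers a fresh `start()`; a non-null `emit` is published and resets the key's checkpoint counts and input count.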

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt (+1, -1)

@@ -159,7 +159,7 @@ class DestinationTaskLauncherTest {
         destinationTaskLauncher: DestinationTaskLauncher,
         fileTransferQueue: MessageQueue<FileTransferQueueMessage>,
         recordQueueForPipeline:
-            PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+            PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
         loadPipeline: LoadPipeline?,
         partitioner: InputPartitioner,
         openStreamQueue: QueueWriter<DestinationStream>,

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt (+1, -1)

@@ -95,7 +95,7 @@ class DestinationTaskLauncherUTest {
     private val openStreamQueue: MessageQueue<DestinationStream> = mockk(relaxed = true)
 
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>> =
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>> =
         mockk(relaxed = true)
     private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate> = mockk(relaxed = true)
     private val partitioner: InputPartitioner = mockk(relaxed = true)

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt (+1, -1)

@@ -47,7 +47,7 @@ class InputConsumerTaskUTest {
     @MockK lateinit var fileTransferQueue: MessageQueue<FileTransferQueueMessage>
     @MockK
     lateinit var recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>
     @MockK lateinit var partitioner: InputPartitioner
     @MockK lateinit var openStreamQueue: QueueWriter<DestinationStream>

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt (+31, -41)

@@ -16,7 +16,6 @@ import io.airbyte.cdk.load.pipeline.BatchAccumulator
 import io.airbyte.cdk.load.pipeline.BatchStateUpdate
 import io.airbyte.cdk.load.pipeline.BatchUpdate
 import io.airbyte.cdk.load.state.CheckpointId
-import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.util.setOnce
 import io.mockk.coEvery
 import io.mockk.coVerify
@@ -37,7 +36,7 @@ class LoadPipelineStepTaskUTest {
     @MockK
     lateinit var batchAccumulatorWithUpdate:
         BatchAccumulator<AutoCloseable, StreamKey, String, MyBatch>
-    @MockK lateinit var inputFlow: Flow<Reserved<PipelineEvent<StreamKey, String>>>
+    @MockK lateinit var inputFlow: Flow<PipelineEvent<StreamKey, String>>
     @MockK lateinit var batchUpdateQueue: QueueWriter<BatchUpdate>
 
     data class Closeable(val id: Int = 0) : AutoCloseable {
@@ -66,15 +65,14 @@ class LoadPipelineStepTaskUTest {
             part
         )
 
-    private fun <T> reserved(value: T): Reserved<T> = Reserved(null, 0L, value)
     private fun messageEvent(
         key: StreamKey,
         value: String,
         counts: Map<Int, Long> = emptyMap()
-    ): Reserved<PipelineEvent<StreamKey, String>> =
-        reserved(PipelineMessage(counts.mapKeys { CheckpointId(it.key) }, key, value))
-    private fun endOfStreamEvent(key: StreamKey): Reserved<PipelineEvent<StreamKey, String>> =
-        reserved(PipelineEndOfStream(key.stream))
+    ): PipelineEvent<StreamKey, String> =
+        PipelineMessage(counts.mapKeys { CheckpointId(it.key) }, key, value)
+    private fun endOfStreamEvent(key: StreamKey): PipelineEvent<StreamKey, String> =
+        PipelineEndOfStream(key.stream)
 
     @Test
     fun `start and accept called on first no-output message, accept only on second`() = runTest {
@@ -91,8 +89,7 @@ class LoadPipelineStepTaskUTest {
 
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                 repeat(2) { collector.emit(messageEvent(key, "value_$it")) }
             }
 
@@ -110,7 +107,6 @@ class LoadPipelineStepTaskUTest {
         val task = createTask(part, batchAccumulatorNoUpdate)
         val stateA1 = Closeable(1)
         val stateA2 = Closeable(2)
-        val stateA3 = Closeable(3)
         val stateB1 = Closeable(4)
         val stateB2 = Closeable(5)
         val startHasBeenCalled = AtomicBoolean(false)
@@ -120,13 +116,12 @@ class LoadPipelineStepTaskUTest {
             if (startHasBeenCalled.setOnce()) stateA1 else stateB1
         }
         every { batchAccumulatorNoUpdate.accept("value_0", stateA1) } returns Pair(stateA2, null)
-        every { batchAccumulatorNoUpdate.accept("value_1", stateA2) } returns Pair(stateA3, true)
+        every { batchAccumulatorNoUpdate.accept("value_1", stateA2) } returns Pair(null, true)
         every { batchAccumulatorNoUpdate.accept("value_2", stateB1) } returns Pair(stateB2, null)
 
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                 repeat(3) { collector.emit(messageEvent(key, "value_$it")) }
             }
 
@@ -149,8 +144,7 @@ class LoadPipelineStepTaskUTest {
 
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                 repeat(10) { // arbitrary number of messages
                     collector.emit(messageEvent(key, "value"))
                 }
@@ -176,15 +170,14 @@ class LoadPipelineStepTaskUTest {
         every { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable()
         every { batchAccumulatorNoUpdate.accept(any(), any()) } answers
             {
-                if (++acceptCalls == 10) Pair(Closeable(), true) else Pair(Closeable(), null)
+                if (++acceptCalls == 10) Pair(null, true) else Pair(Closeable(), null)
            }
         every { batchAccumulatorNoUpdate.finish(any()) } returns true
         coEvery { batchUpdateQueue.publish(any()) } returns Unit
 
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                 repeat(10) { // arbitrary number of messages
                     collector.emit(messageEvent(key, "value"))
                 }
@@ -209,21 +202,18 @@ class LoadPipelineStepTaskUTest {
         every { batchAccumulatorWithUpdate.start(any(), any()) } returns Closeable()
         every { batchAccumulatorWithUpdate.accept(any(), any()) } answers
             {
-                val output =
-                    when (acceptCalls++ % 4) {
-                        0 -> null
-                        1 -> MyBatch(Batch.State.PROCESSED)
-                        2 -> MyBatch(Batch.State.PERSISTED)
-                        3 -> MyBatch(Batch.State.COMPLETE)
-                        else -> error("unreachable")
-                    }
-                Pair(Closeable(), output)
+                when (acceptCalls++ % 4) {
+                    0 -> Pair(Closeable(), null)
+                    1 -> Pair(null, MyBatch(Batch.State.PROCESSED))
+                    2 -> Pair(null, MyBatch(Batch.State.PERSISTED))
+                    3 -> Pair(null, MyBatch(Batch.State.COMPLETE))
+                    else -> error("unreachable")
+                }
            }
         coEvery { batchUpdateQueue.publish(any()) } returns Unit
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                 repeat(12) { // arbitrary number of messages
                     collector.emit(messageEvent(key, "value"))
                 }
@@ -281,21 +271,22 @@ class LoadPipelineStepTaskUTest {
         }
         repeat(10) {
             every { batchAccumulatorWithUpdate.accept(any(), stream1States[it]) } returns
-                Pair(
-                    stream1States[it + 1],
-                    if (it % 3 == 0) MyBatch(Batch.State.PERSISTED) else null
-                )
+                if (it % 3 == 0) {
+                    Pair(null, MyBatch(Batch.State.PERSISTED))
+                } else {
+                    Pair(stream1States[it + 1], null)
+                }
             every { batchAccumulatorWithUpdate.accept(any(), stream2States[it]) } returns
-                Pair(
-                    stream2States[it + 1],
-                    if (it % 2 == 0) MyBatch(Batch.State.COMPLETE) else null
-                )
+                if (it % 2 == 0) {
+                    Pair(null, MyBatch(Batch.State.PERSISTED))
+                } else {
+                    Pair(stream2States[it + 1], null)
+                }
         }
 
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                 repeat(10) { // arbitrary number of messages
                     collector.emit(messageEvent(key1, "stream1_value"))
                     collector.emit(messageEvent(key2, "stream2_value"))
@@ -343,8 +334,7 @@ class LoadPipelineStepTaskUTest {
 
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
 
                 // Emit 10 messages for stream1, 10 messages for stream2
                 repeat(12) {

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/PartFactory.kt (+10, -2)

@@ -63,6 +63,10 @@ data class Part(
 ) {
     val isEmpty: Boolean
         get() = bytes == null
+
+    override fun toString(): String {
+        return "Part(key='$key', size=${bytes?.size}, index=$partIndex, isFinal=$isFinal)"
+    }
 }
 
 class PartBookkeeper {
@@ -87,7 +91,9 @@ class PartBookkeeper {
         // index even if it is empty.
         if (part.isFinal) {
             if (!finalIndex.compareAndSet(null, part.partIndex)) {
-                throw IllegalStateException("Final part already seen for ${part.key}")
+                throw IllegalStateException(
+                    "Final part ${finalIndex.get()} already seen for ${part.key}"
+                )
             }
         }
     }
@@ -99,5 +105,7 @@ class PartBookkeeper {
      * 3. the last index is the final index
     */
    val isComplete: Boolean
-        get() = finalIndex.get()?.let { it == partIndexes.size } ?: false
+        get() {
+            return finalIndex.get()?.let { it == partIndexes.size } ?: false
+        }
}
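A sketch of how a consumer might drive this bookkeeping, under the assumption (consistent with the hunk above) that `PartBookkeeper.add(part)` is the ingest method: completion requires that a final part has been seen and that the number of recorded indexes equals the final part's index.

```kotlin
import io.airbyte.cdk.load.file.object_storage.Part
import io.airbyte.cdk.load.file.object_storage.PartBookkeeper

// Illustrative driver: feed parts in and fire a callback once the set is complete.
fun driveBookkeeping(parts: List<Part>, onComplete: () -> Unit) {
    val bookkeeper = PartBookkeeper()
    for (part in parts) {
        bookkeeper.add(part) // throws if a second part claims to be final
        if (bookkeeper.isComplete) {
            onComplete() // e.g. complete the streaming upload
            return
        }
    }
}
```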
airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt (new file, +40)

@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.OutputPartitioner
+import kotlin.math.abs
+import kotlin.random.Random
+
+/**
+ * For routing parts to upload workers.
+ *
+ * The key is the object key (filename). This means that the framework will keep separate state per
+ * upload-in-progress.
+ *
+ * The partition is round-robin, for maximum concurrency.
+ */
+class ObjectLoaderPartPartitioner :
+    OutputPartitioner<StreamKey, DestinationRecordAirbyteValue, ObjectKey, Part> {
+    // Start on a random value
+    private var nextPartition = Random(System.currentTimeMillis()).nextInt()
+
+    override fun getOutputKey(inputKey: StreamKey, output: Part): ObjectKey {
+        return ObjectKey(inputKey.stream, output.key)
+    }
+
+    override fun getPart(outputKey: ObjectKey, numParts: Int): Int {
+        // Rotate through partitions
+        val part = nextPartition++
+        return if (part == Int.MIN_VALUE) { // avoid overflow
+            0
+        } else {
+            abs(part) % numParts
+        }
+    }
+}
New file (+62) in io.airbyte.cdk.load.pipline.object_storage (declares ObjectKey and ObjectLoaderPartQueueFactory):

@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.ChannelMessageQueue
+import io.airbyte.cdk.load.message.PartitionedQueue
+import io.airbyte.cdk.load.message.PipelineMessage
+import io.airbyte.cdk.load.message.WithStream
+import io.airbyte.cdk.load.state.ReservationManager
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Factory
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Named
+import jakarta.inject.Singleton
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.runBlocking
+
+data class ObjectKey(override val stream: DestinationStream.Descriptor, val objectKey: String) :
+    WithStream
+
+@Factory
+class ObjectLoaderPartQueueFactory(
+    val loader: ObjectLoader,
+    @Named("memoryManager") val memoryManager: ReservationManager,
+) {
+    val log = KotlinLogging.logger {}
+
+    @Singleton
+    @Named("objectLoaderPartQueue")
+    @Requires(bean = ObjectLoader::class)
+    fun objectLoaderPartQueue(): PartitionedQueue<PipelineMessage<ObjectKey, Part>> {
+        val bytes = memoryManager.totalCapacityBytes * loader.maxMemoryRatioReservedForParts
+        val reservation = runBlocking { memoryManager.reserve(bytes.toLong(), this) }
+        val bytesPerPartition = reservation.bytesReserved / loader.numPartWorkers
+        val partsPerPartition = bytesPerPartition / loader.partSizeBytes
+
+        if (partsPerPartition < 1) {
+            throw IllegalArgumentException(
+                "Reserved $bytes/${memoryManager.totalCapacityBytes}b not enough for ${loader.numPartWorkers} ${loader.partSizeBytes}b parts"
+            )
+        }
+
+        log.info {
+            "Reserved $bytes/${memoryManager.totalCapacityBytes}b for ${loader.numPartWorkers} ${loader.partSizeBytes}b parts => $partsPerPartition capacity per queue partition"
+        }
+
+        return PartitionedQueue(
+            (0 until loader.numUploadWorkers)
+                .map {
+                    ChannelMessageQueue(
+                        Channel<PipelineMessage<ObjectKey, Part>>(partsPerPartition.toInt())
+                    )
+                }
+                .toTypedArray()
+        )
+    }
+}
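As a worked example of the sizing arithmetic, using the S3V2 factory defaults that appear later in this commit (ratio 0.2, 2 part workers, 10 MiB parts) and an assumed 2 GiB reservation-manager capacity:

```kotlin
// Worked example of the capacity arithmetic above; the total capacity is an
// assumption for illustration, the other values are the S3V2 defaults below.
fun main() {
    val totalCapacityBytes = 2L * 1024 * 1024 * 1024 // assume 2 GiB
    val maxMemoryRatioReservedForParts = 0.2
    val numPartWorkers = 2
    val partSizeBytes = 10L * 1024 * 1024 // 10 MiB

    val reserved = (totalCapacityBytes * maxMemoryRatioReservedForParts).toLong() // ~410 MiB
    val bytesPerPartition = reserved / numPartWorkers // ~205 MiB
    val partsPerPartition = bytesPerPartition / partSizeBytes // 20
    println("queue capacity per partition: $partsPerPartition parts")
}
```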
airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartStep.kt (new file, +48)

@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.PartitionedQueue
+import io.airbyte.cdk.load.message.PipelineEvent
+import io.airbyte.cdk.load.message.QueueWriter
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.BatchUpdate
+import io.airbyte.cdk.load.pipeline.LoadPipelineStep
+import io.airbyte.cdk.load.pipeline.RecordCountFlushStrategy
+import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.micronaut.context.annotation.Requires
+import io.micronaut.context.annotation.Value
+import jakarta.inject.Named
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class ObjectLoaderPartStep(
+    private val objectLoader: ObjectLoader,
+    private val recordToPartAccumulator: ObjectLoaderRecordToPartAccumulator<*>,
+    @Named("recordQueue")
+    val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
+    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
+    @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue<PipelineEvent<ObjectKey, Part>>,
+    @Value("\${airbyte.destination.core.record-batch-size-override:null}")
+    val batchSizeOverride: Long? = null,
+) : LoadPipelineStep {
+    override val numWorkers: Int = objectLoader.numPartWorkers
+
+    override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
+        return LoadPipelineStepTask(
+            recordToPartAccumulator,
+            inputQueue.consume(partition),
+            batchQueue,
+            ObjectLoaderPartPartitioner(),
+            partQueue,
+            batchSizeOverride?.let { RecordCountFlushStrategy(it) },
+            partition
+        )
+    }
+}
airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt (new file, +86)

@@ -0,0 +1,86 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.command.DestinationCatalog
+import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.file.object_storage.PartBookkeeper
+import io.airbyte.cdk.load.file.object_storage.StreamingUpload
+import io.airbyte.cdk.load.message.Batch
+import io.airbyte.cdk.load.message.WithBatchState
+import io.airbyte.cdk.load.pipeline.BatchAccumulator
+import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Singleton
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * In order to allow streaming uploads on the same key to be parallelized, upload state needs to be
+ * shared across workers.
+ */
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class UploadsInProgress {
+    val byKey: ConcurrentHashMap<String, ObjectLoaderPartToObjectAccumulator.State> =
+        ConcurrentHashMap()
+}
+
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class ObjectLoaderPartToObjectAccumulator(
+    private val client: ObjectStorageClient<*>,
+    private val catalog: DestinationCatalog,
+    private val uploads: UploadsInProgress,
+) :
+    BatchAccumulator<
+        ObjectLoaderPartToObjectAccumulator.State,
+        ObjectKey,
+        Part,
+        ObjectLoaderPartToObjectAccumulator.ObjectResult
+    > {
+
+    data class State(val streamingUpload: StreamingUpload<*>, val bookkeeper: PartBookkeeper) :
+        AutoCloseable {
+        override fun close() {
+            // Do Nothing
+        }
+    }
+
+    data class ObjectResult(override val state: Batch.State) : WithBatchState
+
+    override suspend fun start(key: ObjectKey, part: Int): State {
+        val stream = catalog.getStream(key.stream)
+        return uploads.byKey.getOrPut(key.objectKey) {
+            State(
+                client.startStreamingUpload(
+                    key.objectKey,
+                    metadata = ObjectStorageDestinationState.metadataFor(stream)
+                ),
+                PartBookkeeper()
+            )
+        }
+    }
+
+    override suspend fun accept(input: Part, state: State): Pair<State, ObjectResult?> {
+        input.bytes?.let { state.streamingUpload.uploadPart(it, input.partIndex) }
+        if (input.bytes == null) {
+            throw IllegalStateException("Empty non-final part received: this should not happen")
+        }
+        state.bookkeeper.add(input)
+        if (state.bookkeeper.isComplete) {
+            return Pair(state, finish(state))
+        }
+        return Pair(state, null)
+    }
+
+    override suspend fun finish(state: State): ObjectResult {
+        if (state.bookkeeper.isComplete) {
+            state.streamingUpload.complete()
+        } // else assume another part is finishing this
+        return ObjectResult(Batch.State.COMPLETE)
+    }
+}
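Because several upload workers may receive parts of the same object, `start` resolves the per-object state through the shared `ConcurrentHashMap` rather than creating one per worker. A minimal sketch of that sharing pattern (illustrative types, not the committed code):

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Illustration only: getOrPut on a ConcurrentHashMap guarantees that every
// worker uploading parts of the same object key observes one shared state,
// so part bookkeeping stays global even though uploads run in parallel.
class SharedUploadState(val objectKey: String)

val byKey = ConcurrentHashMap<String, SharedUploadState>()

fun stateFor(objectKey: String): SharedUploadState =
    byKey.getOrPut(objectKey) { SharedUploadState(objectKey) }
```

This is also why `finish` only completes the upload when the bookkeeper reports completion: at end-of-stream, a worker holding a partially-fed state assumes whichever worker saw the final part will complete the object.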
airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt (new file, +15)

@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.pipeline.LoadPipeline
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class ObjectLoaderPipeline(partStep: ObjectLoaderPartStep, uploadStep: ObjectLoaderUploadStep) :
+    LoadPipeline(listOf(partStep, uploadStep))
airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderRecordToPartAccumulator.kt (new file, +115)

@@ -0,0 +1,115 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.command.DestinationCatalog
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriter
+import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory
+import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.file.object_storage.PartFactory
+import io.airbyte.cdk.load.file.object_storage.PathFactory
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.BatchAccumulator
+import io.airbyte.cdk.load.state.DestinationStateManager
+import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Requires
+import io.micronaut.context.annotation.Value
+import jakarta.inject.Singleton
+import java.io.OutputStream
+
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class ObjectLoaderRecordToPartAccumulator<T : OutputStream>(
+    private val pathFactory: PathFactory,
+    private val catalog: DestinationCatalog,
+    private val writerFactory: BufferedFormattingWriterFactory<T>,
+    private val client: ObjectStorageClient<*>,
+    private val loader: ObjectLoader,
+    // TODO: This doesn't need to be "DestinationState", just a couple of utility classes
+    val stateMananger: DestinationStateManager<ObjectStorageDestinationState>,
+    @Value("\${airbyte.destination.core.record-batch-size-override:null}")
+    val batchSizeOverride: Long? = null,
+) :
+    BatchAccumulator<
+        ObjectLoaderRecordToPartAccumulator.State<T>, StreamKey, DestinationRecordAirbyteValue, Part
+    > {
+    private val log = KotlinLogging.logger {}
+
+    private val objectSizeBytes = loader.objectSizeBytes
+    private val partSizeBytes = loader.partSizeBytes
+
+    data class State<T : OutputStream>(
+        val stream: DestinationStream,
+        val writer: BufferedFormattingWriter<T>,
+        val partFactory: PartFactory
+    ) : AutoCloseable {
+        override fun close() {
+            writer.close()
+        }
+    }
+
+    private suspend fun newState(stream: DestinationStream): State<T> {
+        // Determine unique file name.
+        val pathOnly = pathFactory.getFinalDirectory(stream)
+        val state = stateMananger.getState(stream)
+        val fileNo = state.getPartIdCounter(pathOnly).incrementAndGet()
+        val fileName = state.ensureUnique(pathFactory.getPathToFile(stream, fileNo))
+
+        // Initialize the part factory and writer.
+        val partFactory = PartFactory(fileName, fileNo)
+        log.info { "Starting part generation for $fileName (${stream.descriptor})" }
+        return State(stream, writerFactory.create(stream), partFactory)
+    }
+
+    private fun makePart(state: State<T>, forceFinish: Boolean = false): Part {
+        state.writer.flush()
+        val newSize = state.partFactory.totalSize + state.writer.bufferSize
+        val isFinal =
+            forceFinish ||
+                newSize >= objectSizeBytes ||
+                batchSizeOverride != null // HACK: This is a hack to force a flush
+        val bytes =
+            if (isFinal) {
+                state.writer.finish()
+            } else {
+                state.writer.takeBytes()
+            }
+        val part = state.partFactory.nextPart(bytes, isFinal)
+        log.info { "Creating part $part" }
+        return part
+    }
+
+    override suspend fun start(key: StreamKey, part: Int): State<T> {
+        val stream = catalog.getStream(key.stream)
+        return newState(stream)
+    }
+
+    override suspend fun accept(
+        input: DestinationRecordAirbyteValue,
+        state: State<T>
+    ): Pair<State<T>?, Part?> {
+        state.writer.accept(input)
+        if (state.writer.bufferSize >= partSizeBytes || batchSizeOverride != null) {
+            val part = makePart(state)
+            val nextState =
+                if (part.isFinal) {
+                    null
+                } else {
+                    state
+                }
+            return Pair(nextState, part)
+        }
+        return Pair(state, null)
+    }
+
+    override suspend fun finish(state: State<T>): Part {
+        return makePart(state, true)
+    }
+}
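The rollover rules in `accept`/`makePart` reduce to three cases: keep buffering below the part-size threshold; cut a non-final part once the buffer fills; and cut a final part (evicting the state, so the next record starts a new object) once the running object size would reach the object-size threshold. A schematic paraphrase (sketch only, illustrative units):

```kotlin
// Paraphrase of the rollover decision above (e.g. 10 MiB parts, 200 MiB objects).
fun nextAction(bufferBytes: Long, objectBytesSoFar: Long, partSize: Long, objectSize: Long): String =
    when {
        bufferBytes < partSize -> "keep buffering"                        // Pair(state, null)
        objectBytesSoFar + bufferBytes >= objectSize ->
            "cut final part, evict state"                                 // Pair(null, part)
        else -> "cut part, keep state"                                    // Pair(state, part)
    }
```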
airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt (new file, +46)

@@ -0,0 +1,46 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.PartitionedQueue
+import io.airbyte.cdk.load.message.PipelineEvent
+import io.airbyte.cdk.load.message.QueueWriter
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.BatchUpdate
+import io.airbyte.cdk.load.pipeline.LoadPipelineStep
+import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Named
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class ObjectLoaderUploadStep(
+    val loader: ObjectLoader,
+    val accumulator: ObjectLoaderPartToObjectAccumulator,
+    @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue<PipelineEvent<ObjectKey, Part>>,
+    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
+) : LoadPipelineStep {
+    override val numWorkers: Int = loader.numUploadWorkers
+
+    override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
+        return LoadPipelineStepTask(
+            accumulator,
+            partQueue.consume(partition),
+            batchQueue,
+            outputPartitioner = null,
+            outputQueue =
+                null
+                    as
+                    PartitionedQueue<
+                        PipelineEvent<StreamKey, ObjectLoaderPartToObjectAccumulator.ObjectResult>
+                    >?,
+            flushStrategy = null,
+            partition
+        )
+    }
+}
airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt (new file, +22)

@@ -0,0 +1,22 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.write.object_storage
+
+import io.airbyte.cdk.load.write.LoadStrategy
+
+/**
+ * [ObjectLoader] is for the use case where a destination writes records into some number of files
+ * in a file system or cloud storage whose client supports streaming multipart uploads.
+ */
+interface ObjectLoader : LoadStrategy {
+    val numPartWorkers: Int
+    val numUploadWorkers: Int
+    val maxMemoryRatioReservedForParts: Double
+    val objectSizeBytes: Long
+    val partSizeBytes: Long
+
+    override val inputPartitions: Int
+        get() = numPartWorkers
+}

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/PartToObjectAccumulator.kt (+1)

@@ -63,6 +63,7 @@ class PartToObjectAccumulator<T : RemoteObject<*>>(
             log.info { "Completed upload of ${obj.key}" }
             return LoadedObject(remoteObject = obj, fileNumber = batch.part.fileNumber)
         } else {
+            log.info { "Not completing upload of ${batch.part.key} yet" }
             return IncompletePartialUpload(batch.part.key)
         }
     }

airbyte-integrations/connectors/destination-s3/metadata.yaml (+1, -1)

@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
-  dockerImageTag: 1.5.3
+  dockerImageTag: 1.5.4
   dockerRepository: airbyte/destination-s3
   githubIssueLabel: destination-s3
   icon: s3.svg

airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt (+24, -2)

@@ -20,6 +20,8 @@ import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfigurati
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfigurationProvider
 import io.airbyte.cdk.load.command.s3.S3BucketConfiguration
 import io.airbyte.cdk.load.command.s3.S3BucketConfigurationProvider
+import io.airbyte.cdk.load.command.s3.S3ClientConfiguration
+import io.airbyte.cdk.load.command.s3.S3ClientConfigurationProvider
 import io.micronaut.context.annotation.Factory
 import jakarta.inject.Singleton
 import java.io.OutputStream
@@ -36,9 +38,18 @@ data class S3V2Configuration<T : OutputStream>(
     // Internal configuration
     override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration =
         ObjectStorageUploadConfiguration(),
-    override val numProcessRecordsWorkers: Int = 2,
+    override val numProcessRecordsWorkers: Int = 1,
     override val estimatedRecordMemoryOverheadRatio: Double = 5.0,
     override val processEmptyFiles: Boolean = true,
+
+    // Temporary for tuning
+    val numPartWorkers: Int,
+    val numUploadWorkers: Int,
+    val maxMemoryRatioReservedForParts: Double,
+    val objectSizeBytes: Long,
+    val partSizeBytes: Long,
+    override val maxMessageQueueMemoryUsageRatio: Double,
+    val useLegacyJavaClient: Boolean = false,
 ) :
     DestinationConfiguration(),
     AWSAccessKeyConfigurationProvider,
@@ -47,7 +58,11 @@ data class S3V2Configuration<T : OutputStream>(
     ObjectStoragePathConfigurationProvider,
     ObjectStorageFormatConfigurationProvider,
     ObjectStorageUploadConfigurationProvider,
-    ObjectStorageCompressionConfigurationProvider<T>
+    ObjectStorageCompressionConfigurationProvider<T>,
+    S3ClientConfigurationProvider {
+    override val s3ClientConfiguration: S3ClientConfiguration =
+        S3ClientConfiguration(useLegacyJavaClient = useLegacyJavaClient)
+}
 
 @Singleton
 class S3V2ConfigurationFactory :
@@ -60,6 +75,13 @@ class S3V2ConfigurationFactory :
         objectStoragePathConfiguration = pojo.toObjectStoragePathConfiguration(),
         objectStorageFormatConfiguration = pojo.toObjectStorageFormatConfiguration(),
         objectStorageCompressionConfiguration = pojo.toCompressionConfiguration(),
+        numPartWorkers = pojo.numPartWorkers ?: 2,
+        numUploadWorkers = pojo.numUploadWorkers ?: 10,
+        maxMemoryRatioReservedForParts = pojo.maxMemoryRatioReservedForParts ?: 0.2,
+        objectSizeBytes = pojo.objectSizeBytes ?: (200 * 1024 * 1024),
+        partSizeBytes = pojo.partSizeBytes ?: (10 * 1024 * 1024),
+        maxMessageQueueMemoryUsageRatio = pojo.maxMessageQueueMemoryUsageRatio ?: 0.2,
+        useLegacyJavaClient = pojo.useLegacyJavaClient ?: false
     )
airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2ObjectLoader.kt (new file, +20)

@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.s3_v2
+
+import io.airbyte.cdk.load.pipeline.RoundRobinInputPartitioner
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import jakarta.inject.Singleton
+
+@Singleton
+class S3V2ObjectLoader(config: S3V2Configuration<*>) : ObjectLoader {
+    override val numPartWorkers: Int = config.numPartWorkers
+    override val numUploadWorkers: Int = config.numUploadWorkers
+    override val maxMemoryRatioReservedForParts: Double = config.maxMemoryRatioReservedForParts
+    override val objectSizeBytes: Long = config.objectSizeBytes
+    override val partSizeBytes: Long = config.partSizeBytes
+}
+
+@Singleton class S3V2RoundRobinInputPartitioner : RoundRobinInputPartitioner()

airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Specification.kt (+18)

@@ -4,6 +4,7 @@
 
 package io.airbyte.integrations.destination.s3_v2
 
+import com.fasterxml.jackson.annotation.JsonProperty
 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
 import io.airbyte.cdk.command.ConfigurationSpecification
@@ -74,6 +75,23 @@ class S3V2Specification :
         "{\"examples\":[\"{date}\",\"{date:yyyy_MM}\",\"{timestamp}\",\"{part_number}\",\"{sync_id}\"],\"order\":9}"
     )
     override val fileNamePattern: String? = null
+
+    @get:JsonProperty("num_part_workers", defaultValue = "1") val numPartWorkers: Int? = null
+
+    @get:JsonProperty("num_upload_workers", defaultValue = "1") val numUploadWorkers: Int? = null
+
+    @get:JsonProperty("max_memory_ratio_reserved_for_parts")
+    val maxMemoryRatioReservedForParts: Double? = null
+
+    @get:JsonProperty("object_size_bytes", defaultValue = "209715200")
+    val objectSizeBytes: Long? = null
+
+    @get:JsonProperty("part_size_bytes", defaultValue = "10485760") val partSizeBytes: Long? = null
+
+    @get:JsonProperty("max_message_queue_memory_usage_ratio")
+    val maxMessageQueueMemoryUsageRatio: Double? = null
+
+    @get:JsonProperty("use_legacy_java_client") val useLegacyJavaClient: Boolean? = null
 }
 
 @Singleton
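With these additions, the new tuning knobs can appear in the connector config under the property names declared above; a sketch of such a config fragment (values are illustrative, every field is optional and falls back to the factory defaults shown in S3V2ConfigurationFactory):

```json
{
  "num_part_workers": 2,
  "num_upload_workers": 10,
  "max_memory_ratio_reserved_for_parts": 0.2,
  "object_size_bytes": 209715200,
  "part_size_bytes": 10485760,
  "max_message_queue_memory_usage_ratio": 0.2,
  "use_legacy_java_client": false
}
```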

airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2PerformanceTest.kt (+1, -1)

@@ -33,7 +33,7 @@ class S3V2ParquetSnappyPerformanceTest :
         configContents = S3V2TestUtils.getConfig(S3V2TestUtils.PARQUET_SNAPPY_CONFIG_PATH),
         configSpecClass = S3V2Specification::class.java,
         defaultRecordsToInsert = 1_000_000,
-        micronautProperties = S3V2TestUtils.PERFORMANCE_TEST_MICRONAUT_PROPERTIES,
+        micronautProperties = S3V2TestUtils.PERFORMANCE_TEST_MICRONAUT_PROPERTIES
     ) {
     @Test
     override fun testInsertRecords() {

docs/integrations/destinations/s3.md (+1)

@@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in our docs]
 
 | Version | Date | Pull Request | Subject |
 |:--------|:-----|:-------------|:--------|
+| 1.5.4 | 2025-03-05 | [54695](https://github.com/airbytehq/airbyte/pull/54695) | Nonfunctional changes to support performance testing |
 | 1.5.3 | 2025-03-04 | [54661](https://github.com/airbytehq/airbyte/pull/54661) | Nonfunctional changes to support performance testing |
 | 1.5.2 | 2025-02-25 | [54661](https://github.com/airbytehq/airbyte/pull/54661) | Nonfunctional cleanup; dropped unused staging code |
 | 1.5.1 | 2025-02-11 | [53636](https://github.com/airbytehq/airbyte/pull/53636) | Nonfunctional CDK version pin. |
