From 1d98570fa17253183337a1e3c55e505c96521f6f Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 7 Mar 2025 09:11:23 -0800 Subject: [PATCH 1/2] S3 Destination Uses New Load CDK Interface (temporarily disabled) --- .../cdk/load/config/SyncBeanFactory.kt | 3 +- .../airbyte/cdk/load/message/PipelineEvent.kt | 14 +- .../cdk/load/pipeline/BatchAccumulator.kt | 37 ++- .../load/pipeline/DirectLoadPipelineStep.kt | 4 +- .../pipeline/DirectLoadRecordAccumulator.kt | 14 +- .../cdk/load/pipeline/InputPartitioner.kt | 3 + .../pipeline/RoundRobinInputPartitioner.kt | 29 +++ .../cdk/load/task/DestinationTaskLauncher.kt | 2 +- .../load/task/internal/InputConsumerTask.kt | 14 +- .../task/internal/LoadPipelineStepTask.kt | 77 +++++-- .../pipeline/RoundRobinPartitionerTest.kt | 34 +++ .../load/task/DestinationTaskLauncherTest.kt | 2 +- .../load/task/DestinationTaskLauncherUTest.kt | 2 +- .../task/internal/InputConsumerTaskUTest.kt | 2 +- .../internal/LoadPipelineStepTaskUTest.kt | 217 +++++++++--------- .../load/file/object_storage/PartFactory.kt | 12 +- .../ObjectLoaderPartPartitioner.kt | 40 ++++ .../object_storage/ObjectLoaderPartQueue.kt | 62 +++++ .../object_storage/ObjectLoaderPartStep.kt | 48 ++++ .../ObjectLoaderPartToObjectAccumulator.kt | 86 +++++++ .../object_storage/ObjectLoaderPipeline.kt | 15 ++ .../ObjectLoaderRecordToPartAccumulator.kt | 115 ++++++++++ .../object_storage/ObjectLoaderUploadStep.kt | 46 ++++ .../load/write/object_storage/ObjectLoader.kt | 67 ++++++ .../ObjectLoaderPartPartitionerTest.kt | 49 ++++ .../connectors/destination-s3/metadata.yaml | 2 +- .../src/main/kotlin/S3V2Configuration.kt | 9 +- .../src/main/kotlin/S3V2ObjectLoader.kt | 25 ++ docs/integrations/destinations/s3.md | 1 + 29 files changed, 859 insertions(+), 172 deletions(-) create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinPartitionerTest.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartStep.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderRecordToPartAccumulator.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/pipeline/object_storage/ObjectLoaderPartPartitionerTest.kt create mode 100644 airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2ObjectLoader.kt diff --git 
a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt index 07b2d90bfd912..b45002fc0270e 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt @@ -16,7 +16,6 @@ import io.airbyte.cdk.load.message.PipelineEvent import io.airbyte.cdk.load.message.StreamKey import io.airbyte.cdk.load.pipeline.BatchUpdate import io.airbyte.cdk.load.state.ReservationManager -import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.task.implementor.FileAggregateMessage import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage import io.airbyte.cdk.load.write.LoadStrategy @@ -120,7 +119,7 @@ class SyncBeanFactory { @Named("recordQueue") fun recordQueue( loadStrategy: LoadStrategy? = null, - ): PartitionedQueue>> { + ): PartitionedQueue> { return PartitionedQueue( Array(loadStrategy?.inputPartitions ?: 1) { ChannelMessageQueue(Channel(Channel.UNLIMITED)) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt index 9152fa9a94b1a..fa3bb57a9d817 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt @@ -10,15 +10,19 @@ import io.airbyte.cdk.load.state.CheckpointId /** Used internally by the CDK to pass messages between steps in the loader pipeline. */ sealed interface PipelineEvent +/** + * A message that contains a keyed payload. The key is used to manage the state of the payload's + * corresponding [io.airbyte.cdk.load.pipeline.BatchAccumulator]. [checkpointCounts] is used by the + * CDK to perform state message bookkeeping. [postProcessingCallback] is for releasing resources + * associated with the message. + */ class PipelineMessage( val checkpointCounts: Map, val key: K, - val value: T + val value: T, + val postProcessingCallback: suspend () -> Unit = {}, ) : PipelineEvent -/** - * We send the end message on the stream and not the key, because there's no way to partition an - * empty message. - */ +/** Broadcast at end-of-stream to all partitions to signal that the stream has ended. */ class PipelineEndOfStream(val stream: DestinationStream.Descriptor) : PipelineEvent diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt index fe2f03dcc6f71..1713eab1da797 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt @@ -7,11 +7,38 @@ package io.airbyte.cdk.load.pipeline import io.airbyte.cdk.load.message.WithStream /** - * [BatchAccumulator] is used internally by the CDK to implement RecordLoaders. Connector devs - * should never need to implement this interface. + * [BatchAccumulator] is used internally by the CDK to implement + * [io.airbyte.cdk.load.write.LoadStrategy]s. Connector devs should never need to implement this + * interface. + * + * It is the glue that connects a specific step in a specific pipeline to the generic pipeline on + * the back end. 
(For example, in a three-stage pipeline like bulk load, step 1 is to create a part, + * step 2 is to upload it, and step 3 is to load it from object storage into a table.) + * + * - [S] is a state type that will be threaded through accumulator calls. + * - [K] is a key type associated with the input data. (NOTE: Currently, there is no support for + * key-mapping, so the key is always [io.airbyte.cdk.load.message.StreamKey]). Specifically, state + * will always be managed per-key. + * - [T] is the input data type. + * - [U] is the output data type. + * + * The first time data is seen for a given key, [start] is called (with the partition number). The + * state returned by [start] is then passed to [accept] along with each input. + * + * If [accept] returns a non-null output, that output will be forwarded to the next stage (if + * applicable) and/or trigger bookkeeping (iff the output type implements + * [io.airbyte.cdk.load.message.WithBatchState]). + * + * If [accept] returns a non-null state, that state will be passed to the next call to [accept]. If + * [accept] returns a null state, the state will be discarded and a new one will be created on the + * next input by a new call to [start]. + * + * When the input stream is exhausted, [finish] will be called with any remaining state iff at least + * one input was seen for that key. This means that [finish] will not be called on empty keys or on + * keys where the last call to [accept] yielded a null (finished) state. */ interface BatchAccumulator<S, K : WithStream, T, U> { - fun start(key: K, part: Int): S - fun accept(record: T, state: S): Pair<S, U?> - fun finish(state: S): U + suspend fun start(key: K, part: Int): S + suspend fun accept(input: T, state: S): Pair<S?, U?> + suspend fun finish(state: S): U } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt index c611e103458ce..d553fed35d026 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt @@ -9,7 +9,6 @@ import io.airbyte.cdk.load.message.PartitionedQueue import io.airbyte.cdk.load.message.PipelineEvent import io.airbyte.cdk.load.message.QueueWriter import io.airbyte.cdk.load.message.StreamKey -import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask import io.airbyte.cdk.load.write.DirectLoader import io.airbyte.cdk.load.write.DirectLoaderFactory @@ -24,8 +23,7 @@ class DirectLoadPipelineStep( val accumulator: DirectLoadRecordAccumulator, @Named("recordQueue") - val inputQueue: - PartitionedQueue>>, + val inputQueue: PartitionedQueue>, @Named("batchStateUpdateQueue") val batchQueue: QueueWriter, @Value("\${airbyte.destination.core.record-batch-size-override:null}") val batchSizeOverride: Long? = null,
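To make the accumulator lifecycle concrete, here is a minimal, self-contained sketch of the contract described in the kdoc above. The WithStream and BatchAccumulator declarations below are local stand-ins for the CDK types (reconstructed from the kdoc's [S]/[K]/[T]/[U] description), and FixedSizeLineAccumulator is a hypothetical example, not code from this patch:

    import kotlinx.coroutines.runBlocking

    interface WithStream { val stream: String }

    interface BatchAccumulator<S, K : WithStream, T, U> {
        suspend fun start(key: K, part: Int): S
        suspend fun accept(input: T, state: S): Pair<S?, U?>
        suspend fun finish(state: S): U
    }

    data class LineKey(override val stream: String) : WithStream

    class LineBuffer : AutoCloseable {
        val lines = mutableListOf<String>()
        override fun close() {}
    }

    class FixedSizeLineAccumulator(private val maxLines: Int = 3) :
        BatchAccumulator<LineBuffer, LineKey, String, List<String>> {
        override suspend fun start(key: LineKey, part: Int) = LineBuffer()

        override suspend fun accept(
            input: String,
            state: LineBuffer
        ): Pair<LineBuffer?, List<String>?> {
            state.lines.add(input)
            // A null state tells the framework this batch is done; it will call
            // start() again on the next input for this key.
            return if (state.lines.size >= maxLines) Pair(null, state.lines.toList())
            else Pair(state, null)
        }

        override suspend fun finish(state: LineBuffer) = state.lines.toList()
    }

    fun main() = runBlocking {
        val acc = FixedSizeLineAccumulator()
        val key = LineKey("users")
        var state = acc.start(key, part = 0)
        for (line in listOf("a", "b", "c", "d")) {
            val (next, output) = acc.accept(line, state)
            output?.let { println("emitted batch: $it") } // fires after "c"
            state = next ?: acc.start(key, part = 0)
        }
        // The stream ended with live state, so finish() flushes the remainder.
        println("final batch: ${acc.finish(state)}") // prints [d]
    }

This mirrors what the task runner does in LoadPipelineStepTask below: a non-null output is published for bookkeeping, and a null state evicts the key so the next input triggers a fresh start().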
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt index e1f964919da8c..5752e0e3b7309 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt @@ -26,23 +26,23 @@ data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState class DirectLoadRecordAccumulator<S : DirectLoader, K : WithStream>( val directLoaderFactory: DirectLoaderFactory<S> ) : BatchAccumulator<S, K, DestinationRecordAirbyteValue, DirectLoadAccResult> { - override fun start(key: K, part: Int): S { + override suspend fun start(key: K, part: Int): S { return directLoaderFactory.create(key.stream, part) } - override fun accept( - record: DestinationRecordAirbyteValue, + override suspend fun accept( + input: DestinationRecordAirbyteValue, state: S - ): Pair<S, DirectLoadAccResult?> { + ): Pair<S?, DirectLoadAccResult?> { state.accept(input).let { return when (it) { is Incomplete -> Pair(state, null) - is Complete -> Pair(state, DirectLoadAccResult(Batch.State.COMPLETE)) + is Complete -> Pair(null, DirectLoadAccResult(Batch.State.COMPLETE)) } } } - override fun finish(state: S): DirectLoadAccResult { + override suspend fun finish(state: S): DirectLoadAccResult { state.finish() return DirectLoadAccResult(Batch.State.COMPLETE) } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt index 19189e7d33a44..d1aa786a88f5a 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt @@ -17,6 +17,9 @@ interface InputPartitioner { fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int } +/** + * The default input partitioner, which partitions by the stream name. TODO: Should be round-robin? + */ @Singleton @Secondary class ByStreamInputPartitioner : InputPartitioner { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt new file mode 100644 index 0000000000000..f9b4dcf31e970 --- /dev/null +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline + +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import kotlin.math.abs +import kotlin.random.Random + +/** + * Declare a singleton of this type to have input distributed evenly across the input partitions. + * (The default is [ByStreamInputPartitioner].)
+ */ +open class RoundRobinInputPartitioner(private val rotateEveryNRecords: Int = 10_000) : + InputPartitioner { + private var nextPartition = + Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE / rotateEveryNRecords) * + rotateEveryNRecords + + override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int { + val part = nextPartition++ / rotateEveryNRecords + return if (part == Int.MIN_VALUE) { // avoid overflow + 0 + } else { + abs(part) % numParts + } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index 9d67c39e4ebcb..6cd2d48c31482 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -145,7 +145,7 @@ class DefaultDestinationTaskLauncher( // New interface shim @Named("recordQueue") private val recordQueueForPipeline: - PartitionedQueue>>, + PartitionedQueue>, @Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue, private val loadPipeline: LoadPipeline?, private val partitioner: InputPartitioner, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt index 0f149c96e890b..3a296117a8284 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt @@ -80,7 +80,7 @@ class DefaultInputConsumerTask( // Required by new interface @Named("recordQueue") private val recordQueueForPipeline: - PartitionedQueue>>, + PartitionedQueue>, private val loadPipeline: LoadPipeline? 
= null, private val partitioner: InputPartitioner, private val openStreamQueue: QueueWriter @@ -165,20 +165,20 @@ class DefaultInputConsumerTask( mapOf(manager.getCurrentCheckpointId() to 1), StreamKey(stream), record - ) + ) { reserved.release() } val partition = partitioner.getPartition(record, recordQueueForPipeline.partitions) - recordQueueForPipeline.publish(reserved.replace(pipelineMessage), partition) + recordQueueForPipeline.publish(pipelineMessage, partition) } is DestinationRecordStreamComplete -> { manager.markEndOfStream(true) log.info { "Read COMPLETE for stream $stream" } - recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream))) + recordQueueForPipeline.broadcast(PipelineEndOfStream(stream)) reserved.release() } is DestinationRecordStreamIncomplete -> { manager.markEndOfStream(false) log.info { "Read INCOMPLETE for stream $stream" } - recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream))) + recordQueueForPipeline.broadcast(PipelineEndOfStream(stream)) reserved.release() } is DestinationFile -> { @@ -310,7 +310,7 @@ interface InputConsumerTaskFactory { // Required by new interface recordQueueForPipeline: - PartitionedQueue>>, + PartitionedQueue>, loadPipeline: LoadPipeline?, partitioner: InputPartitioner, openStreamQueue: QueueWriter, @@ -333,7 +333,7 @@ class DefaultInputConsumerTaskFactory( // Required by new interface recordQueueForPipeline: - PartitionedQueue>>, + PartitionedQueue>, loadPipeline: LoadPipeline?, partitioner: InputPartitioner, openStreamQueue: QueueWriter, diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt index 3139a44f4f89c..5f5200d10bcf3 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt @@ -18,7 +18,6 @@ import io.airbyte.cdk.load.pipeline.BatchUpdate import io.airbyte.cdk.load.pipeline.OutputPartitioner import io.airbyte.cdk.load.pipeline.PipelineFlushStrategy import io.airbyte.cdk.load.state.CheckpointId -import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.task.OnEndOfSync import io.airbyte.cdk.load.task.Task import io.airbyte.cdk.load.task.TerminalCondition @@ -34,7 +33,7 @@ data class RangeState( /** A long-running task that actually implements a load pipeline step. */ class LoadPipelineStepTask( private val batchAccumulator: BatchAccumulator, - private val inputFlow: Flow>>, + private val inputFlow: Flow>, private val batchUpdateQueue: QueueWriter, private val outputPartitioner: OutputPartitioner?, private val outputQueue: PartitionedQueue>?, @@ -44,11 +43,11 @@ class LoadPipelineStepTask>()) { stateStore, reservation -> + inputFlow.fold(mutableMapOf>()) { stateStore, input -> try { - when (val input = reservation.value) { + when (input) { is PipelineMessage -> { - // Fetch and update the local state associated with the current batch. + // Get or create the accumulator state associated w/ the input key. val state = stateStore .getOrPut(input.key) { @@ -57,34 +56,62 @@ class LoadPipelineStepTask old + new } } - // If the accumulator did not produce a result, check if we should flush. - // If so, use the result of a finish call as the output. 
- val finalOutput = - output - ?: if (flushStrategy?.shouldFlush(state.inputCount) == true) { - batchAccumulator.finish(newState) + // Finalize the state and output + val (finalState, finalOutput) = + if (outputMaybe == null) { + // Possibly force an output (and if so, discard the state) + if (flushStrategy?.shouldFlush(state.inputCount) == true) { + val finalOutput = batchAccumulator.finish(newStateMaybe!!) + Pair(null, finalOutput) } else { - null + Pair(newStateMaybe, null) } + } else { + // Otherwise, just use what we were given + Pair(newStateMaybe, outputMaybe) + } - if (finalOutput != null) { - // Publish the emitted output and evict the state. - handleOutput(input.key, state.checkpointCounts, finalOutput) - stateStore.remove(input.key) + // Publish the output if there is one & reset the input count + val inputCount = + if (finalOutput != null) { + // Publish the emitted output and evict the state. + handleOutput(input.key, state.checkpointCounts, finalOutput) + state.checkpointCounts.clear() + 0 + } else { + state.inputCount + } + + // Update the state if `accept` returned a new state, otherwise evict. + if (finalState != null) { + // If accept returned a new state, update the state store. + stateStore[input.key] = + state.copy(state = finalState, inputCount = inputCount) } else { - // If there's no output yet, just update the local state. - stateStore[input.key] = RangeState(newState, state.checkpointCounts) + stateStore.remove(input.key) } + stateStore } is PipelineEndOfStream -> { @@ -92,8 +119,10 @@ class LoadPipelineStepTask stateStore.remove(key)?.let { stored -> - val output = batchAccumulator.finish(stored.state) - handleOutput(key, stored.checkpointCounts, output) + if (stored.inputCount > 0) { + val output = batchAccumulator.finish(stored.state) + handleOutput(key, stored.checkpointCounts, output) + } } } @@ -122,7 +151,7 @@ class LoadPipelineStepTask() + val numParts = 3 + var lastPartition: Int? 
= null + var recordCount = 0 + repeat(1000) { + val partition = partitioner.getPartition(record, numParts) + lastPartition?.let { last -> + recordCount++ + if (recordCount == 5) { + recordCount = 0 + assertEquals((last + 1) % numParts, partition) + } else { + assertEquals(last, partition) + } + } + lastPartition = partition + } + } +} diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt index ce7e08a847dcc..010238be5c0a5 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt @@ -159,7 +159,7 @@ class DestinationTaskLauncherTest { destinationTaskLauncher: DestinationTaskLauncher, fileTransferQueue: MessageQueue, recordQueueForPipeline: - PartitionedQueue>>, + PartitionedQueue>, loadPipeline: LoadPipeline?, partitioner: InputPartitioner, openStreamQueue: QueueWriter, diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt index 02a85819668fd..adc8920832548 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt @@ -95,7 +95,7 @@ class DestinationTaskLauncherUTest { private val openStreamQueue: MessageQueue = mockk(relaxed = true) private val recordQueueForPipeline: - PartitionedQueue>> = + PartitionedQueue> = mockk(relaxed = true) private val batchUpdateQueue: ChannelMessageQueue = mockk(relaxed = true) private val partitioner: InputPartitioner = mockk(relaxed = true) diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt index d9fe8a7e3947d..3cf9bbc30c832 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt @@ -47,7 +47,7 @@ class InputConsumerTaskUTest { @MockK lateinit var fileTransferQueue: MessageQueue @MockK lateinit var recordQueueForPipeline: - PartitionedQueue>> + PartitionedQueue> @MockK lateinit var partitioner: InputPartitioner @MockK lateinit var openStreamQueue: QueueWriter diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt index 99a806360279f..a50a30df6ae33 100644 --- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt +++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt @@ -16,13 +16,10 @@ import io.airbyte.cdk.load.pipeline.BatchAccumulator import io.airbyte.cdk.load.pipeline.BatchStateUpdate import io.airbyte.cdk.load.pipeline.BatchUpdate import io.airbyte.cdk.load.state.CheckpointId -import io.airbyte.cdk.load.state.Reserved import io.airbyte.cdk.load.util.setOnce import io.mockk.coEvery import io.mockk.coVerify -import 
io.mockk.every import io.mockk.impl.annotations.MockK -import io.mockk.verify import java.util.concurrent.atomic.AtomicBoolean import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector @@ -37,7 +34,7 @@ class LoadPipelineStepTaskUTest { @MockK lateinit var batchAccumulatorWithUpdate: BatchAccumulator - @MockK lateinit var inputFlow: Flow>> + @MockK lateinit var inputFlow: Flow> @MockK lateinit var batchUpdateQueue: QueueWriter data class Closeable(val id: Int = 0) : AutoCloseable { @@ -66,18 +63,17 @@ class LoadPipelineStepTaskUTest { part ) - private fun reserved(value: T): Reserved = Reserved(null, 0L, value) private fun messageEvent( key: StreamKey, value: String, counts: Map = emptyMap() - ): Reserved> = - reserved(PipelineMessage(counts.mapKeys { CheckpointId(it.key) }, key, value)) - private fun endOfStreamEvent(key: StreamKey): Reserved> = - reserved(PipelineEndOfStream(key.stream)) + ): PipelineEvent = + PipelineMessage(counts.mapKeys { CheckpointId(it.key) }, key, value) + private fun endOfStreamEvent(key: StreamKey): PipelineEvent = + PipelineEndOfStream(key.stream) @Test - fun `start and accept called on first no-output message, accept only on second`() = runTest { + fun `start only called on first message if state returned by accept is null`() = runTest { val key = StreamKey(DestinationStream.Descriptor("namespace", "stream")) val part = 7 val task = createTask(part, batchAccumulatorNoUpdate) @@ -85,72 +81,75 @@ class LoadPipelineStepTaskUTest { // No call to accept will finish the batch, but state will be threaded through. val state1 = Closeable(1) val state2 = Closeable(2) - every { batchAccumulatorNoUpdate.start(any(), any()) } returns state1 - every { batchAccumulatorNoUpdate.accept("value_0", state1) } returns Pair(state2, null) - every { batchAccumulatorNoUpdate.accept("value_1", state2) } returns Pair(Closeable(), null) + coEvery { batchAccumulatorNoUpdate.start(any(), any()) } returns state1 + coEvery { batchAccumulatorNoUpdate.accept("value_0", state1) } returns Pair(state2, null) + coEvery { batchAccumulatorNoUpdate.accept("value_1", state2) } returns + Pair(Closeable(), null) coEvery { inputFlow.collect(any()) } coAnswers { - val collector = - firstArg>>>() + val collector = firstArg>>() repeat(2) { collector.emit(messageEvent(key, "value_$it")) } } task.execute() - verify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) } - repeat(2) { verify(exactly = 1) { batchAccumulatorNoUpdate.accept("value_$it", any()) } } - verify(exactly = 0) { batchAccumulatorNoUpdate.finish(any()) } + coVerify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) } + repeat(2) { coVerify(exactly = 1) { batchAccumulatorNoUpdate.accept("value_$it", any()) } } + coVerify(exactly = 0) { batchAccumulatorNoUpdate.finish(any()) } } @Test - fun `start called again after batch completes (no update)`() = runTest { - val key = StreamKey(DestinationStream.Descriptor("namespace", "stream")) - val part = 6 - val task = createTask(part, batchAccumulatorNoUpdate) - val stateA1 = Closeable(1) - val stateA2 = Closeable(2) - val stateA3 = Closeable(3) - val stateB1 = Closeable(4) - val stateB2 = Closeable(5) - val startHasBeenCalled = AtomicBoolean(false) - - every { batchAccumulatorNoUpdate.start(any(), any()) } answers - { - if (startHasBeenCalled.setOnce()) stateA1 else stateB1 - } - every { batchAccumulatorNoUpdate.accept("value_0", stateA1) } returns Pair(stateA2, null) - every { batchAccumulatorNoUpdate.accept("value_1", stateA2) } returns Pair(stateA3, true) - every { 
batchAccumulatorNoUpdate.accept("value_2", stateB1) } returns Pair(stateB2, null) + fun `start called again only after null state returned, even if accept yields output`() = + runTest { + val key = StreamKey(DestinationStream.Descriptor("namespace", "stream")) + val part = 6 + val task = createTask(part, batchAccumulatorNoUpdate) + val stateA1 = Closeable(1) + val stateA2 = Closeable(2) + val stateB1 = Closeable(4) + val stateB2 = Closeable(5) + val startHasBeenCalled = AtomicBoolean(false) - coEvery { inputFlow.collect(any()) } coAnswers - { - val collector = - firstArg>>>() - repeat(3) { collector.emit(messageEvent(key, "value_$it")) } - } + coEvery { batchAccumulatorNoUpdate.start(any(), any()) } answers + { + if (startHasBeenCalled.setOnce()) stateA1 else stateB1 + } + coEvery { batchAccumulatorNoUpdate.accept("value_0", stateA1) } returns + Pair(stateA2, true) + coEvery { batchAccumulatorNoUpdate.accept("value_1", stateA2) } returns Pair(null, true) + coEvery { batchAccumulatorNoUpdate.accept("value_2", stateB1) } returns + Pair(stateB2, null) - task.execute() - verify(exactly = 2) { batchAccumulatorNoUpdate.start(key, part) } - repeat(3) { verify(exactly = 1) { batchAccumulatorNoUpdate.accept("value_$it", any()) } } - } + coEvery { inputFlow.collect(any()) } coAnswers + { + val collector = firstArg>>() + repeat(3) { collector.emit(messageEvent(key, "value_$it")) } + } + + task.execute() + coVerify(exactly = 2) { batchAccumulatorNoUpdate.start(key, part) } + repeat(3) { + coVerify(exactly = 1) { batchAccumulatorNoUpdate.accept("value_$it", any()) } + } + } @Test - fun `finish and update called on end-of-stream when last accept did not yield output`() = + fun `finish and update called on end-of-stream iff last accept returned a non-null state`() = runTest { val key = StreamKey(DestinationStream.Descriptor("namespace", "stream")) val part = 5 val task = createTask(part, batchAccumulatorNoUpdate) - every { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable() - every { batchAccumulatorNoUpdate.accept(any(), any()) } returns Pair(Closeable(), null) - every { batchAccumulatorNoUpdate.finish(any()) } returns true + coEvery { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable() + coEvery { batchAccumulatorNoUpdate.accept(any(), any()) } returns + Pair(Closeable(), null) + coEvery { batchAccumulatorNoUpdate.finish(any()) } returns true coEvery { batchUpdateQueue.publish(any()) } returns Unit coEvery { inputFlow.collect(any()) } coAnswers { - val collector = - firstArg>>>() + val collector = firstArg>>() repeat(10) { // arbitrary number of messages collector.emit(messageEvent(key, "value")) } @@ -159,32 +158,31 @@ class LoadPipelineStepTaskUTest { task.execute() - verify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) } - verify(exactly = 10) { batchAccumulatorNoUpdate.accept(any(), any()) } - verify(exactly = 1) { batchAccumulatorNoUpdate.finish(any()) } + coVerify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) } + coVerify(exactly = 10) { batchAccumulatorNoUpdate.accept(any(), any()) } + coVerify(exactly = 1) { batchAccumulatorNoUpdate.finish(any()) } coVerify(exactly = 1) { batchUpdateQueue.publish(any()) } } @Test - fun `update but not finish called on end-of-stream when last accept yielded output`() = + fun `update but not finish called on end-of-stream when last accept returned null state`() = runTest { val key = StreamKey(DestinationStream.Descriptor("namespace", "stream")) val part = 4 val task = createTask(part, batchAccumulatorNoUpdate) 
var acceptCalls = 0 - every { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable() - every { batchAccumulatorNoUpdate.accept(any(), any()) } answers + coEvery { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable() + coEvery { batchAccumulatorNoUpdate.accept(any(), any()) } answers { - if (++acceptCalls == 10) Pair(Closeable(), true) else Pair(Closeable(), null) + if (++acceptCalls == 10) Pair(null, true) else Pair(Closeable(), null) } - every { batchAccumulatorNoUpdate.finish(any()) } returns true + coEvery { batchAccumulatorNoUpdate.finish(any()) } returns true coEvery { batchUpdateQueue.publish(any()) } returns Unit coEvery { inputFlow.collect(any()) } coAnswers { - val collector = - firstArg>>>() + val collector = firstArg>>() repeat(10) { // arbitrary number of messages collector.emit(messageEvent(key, "value")) } @@ -193,9 +191,9 @@ class LoadPipelineStepTaskUTest { task.execute() - verify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) } - verify(exactly = 10) { batchAccumulatorNoUpdate.accept(any(), any()) } - verify(exactly = 0) { batchAccumulatorNoUpdate.finish(any()) } + coVerify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) } + coVerify(exactly = 10) { batchAccumulatorNoUpdate.accept(any(), any()) } + coVerify(exactly = 0) { batchAccumulatorNoUpdate.finish(any()) } coVerify(exactly = 1) { batchUpdateQueue.publish(any()) } } @@ -206,24 +204,21 @@ class LoadPipelineStepTaskUTest { val task = createTask(part, batchAccumulator = batchAccumulatorWithUpdate) var acceptCalls = 0 - every { batchAccumulatorWithUpdate.start(any(), any()) } returns Closeable() - every { batchAccumulatorWithUpdate.accept(any(), any()) } answers + coEvery { batchAccumulatorWithUpdate.start(any(), any()) } returns Closeable() + coEvery { batchAccumulatorWithUpdate.accept(any(), any()) } answers { - val output = - when (acceptCalls++ % 4) { - 0 -> null - 1 -> MyBatch(Batch.State.PROCESSED) - 2 -> MyBatch(Batch.State.PERSISTED) - 3 -> MyBatch(Batch.State.COMPLETE) - else -> error("unreachable") - } - Pair(Closeable(), output) + when (acceptCalls++ % 4) { + 0 -> Pair(Closeable(), null) + 1 -> Pair(null, MyBatch(Batch.State.PROCESSED)) + 2 -> Pair(null, MyBatch(Batch.State.PERSISTED)) + 3 -> Pair(null, MyBatch(Batch.State.COMPLETE)) + else -> error("unreachable") + } } coEvery { batchUpdateQueue.publish(any()) } returns Unit coEvery { inputFlow.collect(any()) } coAnswers { - val collector = - firstArg>>>() + val collector = firstArg>>() repeat(12) { // arbitrary number of messages collector.emit(messageEvent(key, "value")) } @@ -231,11 +226,11 @@ class LoadPipelineStepTaskUTest { task.execute() - verify(exactly = 9) { + coVerify(exactly = 9) { batchAccumulatorWithUpdate.start(key, part) } // only 1/4 are no output - verify(exactly = 12) { batchAccumulatorWithUpdate.accept(any(), any()) } // all have data - verify(exactly = 0) { batchAccumulatorWithUpdate.finish(any()) } // never end-of-stream + coVerify(exactly = 12) { batchAccumulatorWithUpdate.accept(any(), any()) } // all have data + coVerify(exactly = 0) { batchAccumulatorWithUpdate.finish(any()) } // never end-of-stream coVerify(exactly = 6) { batchUpdateQueue.publish(any()) } // half are PERSISTED/COMPLETE } @@ -247,13 +242,13 @@ class LoadPipelineStepTaskUTest { val task = createTask(part, batchAccumulatorWithUpdate) - // Make stream1 finish with a persisted output every 3 calls (otherwise null) - // Make stream2 finish with a persisted output every 2 calls (otherwise null) + // Make stream1 finish with a 
persisted output every 3 calls (otherwise null) + // Make stream2 finish with a persisted output every 2 calls (otherwise null) val stream1States = (0 until 11).map { Closeable(it) } val stream2States = (0 until 11).map { Closeable(it + 100) } var stream1StartCalls = 0 var stream2StartCalls = 0 - every { batchAccumulatorWithUpdate.start(key1, any()) } answers + coEvery { batchAccumulatorWithUpdate.start(key1, any()) } answers { // Stream 1 will finish on 0, 3, 6, 9 // (so the last finish is right before end-of-stream, leaving no input to finish) when (stream1StartCalls++) { 0 -> stream1States[0] 1 -> stream1States[1] 2 -> stream1States[4] 3 -> stream1States[7] else -> error("unreachable stream1 start call") } } - every { batchAccumulatorWithUpdate.start(key2, any()) } answers + coEvery { batchAccumulatorWithUpdate.start(key2, any()) } answers { // Stream 2 will finish on 0, 2, 4, 6, 8 // (so the last finish is one record before end-of-stream, leaving input to finish) when (stream2StartCalls++) { 0 -> stream2States[0] 1 -> stream2States[1] 2 -> stream2States[3] 3 -> stream2States[5] 4 -> stream2States[7] 5 -> stream2States[9] else -> error("unreachable stream2 start call") } } repeat(10) { - every { batchAccumulatorWithUpdate.accept(any(), stream1States[it]) } returns - Pair( - stream1States[it + 1], - if (it % 3 == 0) MyBatch(Batch.State.PERSISTED) else null - ) - every { batchAccumulatorWithUpdate.accept(any(), stream2States[it]) } returns - Pair( - stream2States[it + 1], - if (it % 2 == 0) MyBatch(Batch.State.COMPLETE) else null - ) + coEvery { batchAccumulatorWithUpdate.accept(any(), stream1States[it]) } returns + if (it % 3 == 0) { + Pair(null, MyBatch(Batch.State.PERSISTED)) + } else { + Pair(stream1States[it + 1], null) + } + coEvery { batchAccumulatorWithUpdate.accept(any(), stream2States[it]) } returns + if (it % 2 == 0) { + Pair(null, MyBatch(Batch.State.PERSISTED)) + } else { + Pair(stream2States[it + 1], null) + } } coEvery { inputFlow.collect(any()) } coAnswers { - val collector = - firstArg>>>() + val collector = firstArg>>() repeat(10) { // arbitrary number of messages collector.emit(messageEvent(key1, "stream1_value")) collector.emit(messageEvent(key2, "stream2_value")) } @@ -306,20 +302,20 @@ collector.emit(endOfStreamEvent(key2)) } - every { batchAccumulatorWithUpdate.finish(any()) } returns MyBatch(Batch.State.COMPLETE) + coEvery { batchAccumulatorWithUpdate.finish(any()) } returns MyBatch(Batch.State.COMPLETE) coEvery { batchUpdateQueue.publish(any()) } returns Unit task.execute() - verify(exactly = 4) { batchAccumulatorWithUpdate.start(key1, part) } - verify(exactly = 6) { batchAccumulatorWithUpdate.start(key2, part) } - verify(exactly = 10) { + coVerify(exactly = 4) { batchAccumulatorWithUpdate.start(key1, part) } + coVerify(exactly = 6) { batchAccumulatorWithUpdate.start(key2, part) } + coVerify(exactly = 10) { batchAccumulatorWithUpdate.accept("stream1_value", match { it in stream1States }) } - verify(exactly = 10) { + coVerify(exactly = 10) { batchAccumulatorWithUpdate.accept("stream2_value", match { it in stream2States }) } - verify(exactly = 1) { batchAccumulatorWithUpdate.finish(stream2States[10]) } + coVerify(exactly = 1) { batchAccumulatorWithUpdate.finish(stream2States[10]) } } @Test @@ -330,21 +326,20 @@ val task = createTask(part, batchAccumulatorWithUpdate) - every { batchAccumulatorWithUpdate.start(key1, part) } returns Closeable(1) - every { batchAccumulatorWithUpdate.start(key2, part) } returns Closeable(2) - every { batchAccumulatorWithUpdate.accept("stream1_value", any()) } returns + coEvery { batchAccumulatorWithUpdate.start(key1, part) } returns Closeable(1) + coEvery {
batchAccumulatorWithUpdate.start(key2, part) } returns Closeable(2) + coEvery { batchAccumulatorWithUpdate.accept("stream1_value", any()) } returns Pair(Closeable(1), null) - every { batchAccumulatorWithUpdate.accept("stream2_value", any()) } returns + coEvery { batchAccumulatorWithUpdate.accept("stream2_value", any()) } returns Pair(Closeable(2), null) - every { batchAccumulatorWithUpdate.finish(Closeable(1)) } returns + coEvery { batchAccumulatorWithUpdate.finish(Closeable(1)) } returns MyBatch(Batch.State.COMPLETE) - every { batchAccumulatorWithUpdate.finish(Closeable(2)) } returns + coEvery { batchAccumulatorWithUpdate.finish(Closeable(2)) } returns MyBatch(Batch.State.PERSISTED) coEvery { inputFlow.collect(any()) } coAnswers { - val collector = - firstArg>>>() + val collector = firstArg>>() // Emit 10 messages for stream1, 10 messages for stream2 repeat(12) { diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/PartFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/PartFactory.kt index e8c09b56f0ab4..4fb8875e64884 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/PartFactory.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/PartFactory.kt @@ -63,6 +63,10 @@ data class Part( ) { val isEmpty: Boolean get() = bytes == null + + override fun toString(): String { + return "Part(key='$key', size=${bytes?.size}, index=$partIndex, isFinal=$isFinal)" + } } class PartBookkeeper { @@ -87,7 +91,9 @@ class PartBookkeeper { // index even if it is empty. if (part.isFinal) { if (!finalIndex.compareAndSet(null, part.partIndex)) { - throw IllegalStateException("Final part already seen for ${part.key}") + throw IllegalStateException( + "Final part ${finalIndex.get()} already seen for ${part.key}" + ) } } } @@ -99,5 +105,7 @@ class PartBookkeeper { * 3. the last index is the final index */ val isComplete: Boolean - get() = finalIndex.get()?.let { it == partIndexes.size } ?: false + get() { + return finalIndex.get()?.let { it == partIndexes.size } ?: false + } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt new file mode 100644 index 0000000000000..6fe3d749c7fee --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipline.object_storage + +import io.airbyte.cdk.load.file.object_storage.Part +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.pipeline.OutputPartitioner +import kotlin.math.abs +import kotlin.random.Random + +/** + * For routing parts to upload workers. + * + * The key is the object key (filename). This means that the framework will keep separate state per + * upload-in-progress. + * + * The partition is round-robin, for maximum concurrency. 
+ */ +class ObjectLoaderPartPartitioner : + OutputPartitioner { + // Start on a random value + private var nextPartition = Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE) + + override fun getOutputKey(inputKey: StreamKey, output: Part): ObjectKey { + return ObjectKey(inputKey.stream, output.key) + } + + override fun getPart(outputKey: ObjectKey, numParts: Int): Int { + // Rotate through partitions + val part = nextPartition++ + return if (part == Int.MIN_VALUE) { // avoid overflow + 0 + } else { + abs(part) % numParts + } + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt new file mode 100644 index 0000000000000..3a84377a3525c --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipline.object_storage + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.file.object_storage.Part +import io.airbyte.cdk.load.message.ChannelMessageQueue +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineMessage +import io.airbyte.cdk.load.message.WithStream +import io.airbyte.cdk.load.state.ReservationManager +import io.airbyte.cdk.load.write.object_storage.ObjectLoader +import io.github.oshai.kotlinlogging.KotlinLogging +import io.micronaut.context.annotation.Factory +import io.micronaut.context.annotation.Requires +import jakarta.inject.Named +import jakarta.inject.Singleton +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.runBlocking + +data class ObjectKey(override val stream: DestinationStream.Descriptor, val objectKey: String) : + WithStream + +@Factory +class ObjectLoaderPartQueueFactory( + val loader: ObjectLoader, + @Named("memoryManager") val memoryManager: ReservationManager, +) { + val log = KotlinLogging.logger {} + + @Singleton + @Named("objectLoaderPartQueue") + @Requires(bean = ObjectLoader::class) + fun objectLoaderPartQueue(): PartitionedQueue> { + val bytes = memoryManager.totalCapacityBytes * loader.maxMemoryRatioReservedForParts + val reservation = runBlocking { memoryManager.reserve(bytes.toLong(), this) } + val bytesPerPartition = reservation.bytesReserved / loader.numPartWorkers + val partsPerPartition = bytesPerPartition / loader.partSizeBytes + + if (partsPerPartition < 1) { + throw IllegalArgumentException( + "Reserved $bytes/${memoryManager.totalCapacityBytes}b not enough for ${loader.numPartWorkers} ${loader.partSizeBytes}b parts" + ) + } + + log.info { + "Reserved $bytes/${memoryManager.totalCapacityBytes}b for ${loader.numPartWorkers} ${loader.partSizeBytes}b parts => $partsPerPartition capacity per queue partition" + } + + return PartitionedQueue( + (0 until loader.numUploadWorkers) + .map { + ChannelMessageQueue( + Channel>(partsPerPartition.toInt()) + ) + } + .toTypedArray() + ) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartStep.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartStep.kt new file mode 100644 index 0000000000000..18e0b1475c1b5 --- /dev/null +++ 
b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartStep.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipline.object_storage + +import io.airbyte.cdk.load.file.object_storage.Part +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEvent +import io.airbyte.cdk.load.message.QueueWriter +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.pipeline.BatchUpdate +import io.airbyte.cdk.load.pipeline.LoadPipelineStep +import io.airbyte.cdk.load.pipeline.RecordCountFlushStrategy +import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask +import io.airbyte.cdk.load.write.object_storage.ObjectLoader +import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Value +import jakarta.inject.Named +import jakarta.inject.Singleton + +@Singleton +@Requires(bean = ObjectLoader::class) +class ObjectLoaderPartStep( + private val objectLoader: ObjectLoader, + private val recordToPartAccumulator: ObjectLoaderRecordToPartAccumulator<*>, + @Named("recordQueue") + val inputQueue: PartitionedQueue>, + @Named("batchStateUpdateQueue") val batchQueue: QueueWriter, + @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue>, + @Value("\${airbyte.destination.core.record-batch-size-override:null}") + val batchSizeOverride: Long? = null, +) : LoadPipelineStep { + override val numWorkers: Int = objectLoader.numPartWorkers + + override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> { + return LoadPipelineStepTask( + recordToPartAccumulator, + inputQueue.consume(partition), + batchQueue, + ObjectLoaderPartPartitioner(), + partQueue, + batchSizeOverride?.let { RecordCountFlushStrategy(it) }, + partition + ) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt new file mode 100644 index 0000000000000..abb5ee32df151 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipline.object_storage + +import io.airbyte.cdk.load.command.DestinationCatalog +import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient +import io.airbyte.cdk.load.file.object_storage.Part +import io.airbyte.cdk.load.file.object_storage.PartBookkeeper +import io.airbyte.cdk.load.file.object_storage.StreamingUpload +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.WithBatchState +import io.airbyte.cdk.load.pipeline.BatchAccumulator +import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState +import io.airbyte.cdk.load.write.object_storage.ObjectLoader +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton +import java.util.concurrent.ConcurrentHashMap + +/** + * In order to allow streaming uploads on the same key to be parallelized, upload state needs to be + * shared across workers. 
+ */ +@Singleton +@Requires(bean = ObjectLoader::class) +class UploadsInProgress { + val byKey: ConcurrentHashMap = + ConcurrentHashMap() +} + +@Singleton +@Requires(bean = ObjectLoader::class) +class ObjectLoaderPartToObjectAccumulator( + private val client: ObjectStorageClient<*>, + private val catalog: DestinationCatalog, + private val uploads: UploadsInProgress, +) : + BatchAccumulator< + ObjectLoaderPartToObjectAccumulator.State, + ObjectKey, + Part, + ObjectLoaderPartToObjectAccumulator.ObjectResult + > { + + data class State(val streamingUpload: StreamingUpload<*>, val bookkeeper: PartBookkeeper) : + AutoCloseable { + override fun close() { + // Do Nothing + } + } + + data class ObjectResult(override val state: Batch.State) : WithBatchState + + override suspend fun start(key: ObjectKey, part: Int): State { + val stream = catalog.getStream(key.stream) + return uploads.byKey.getOrPut(key.objectKey) { + State( + client.startStreamingUpload( + key.objectKey, + metadata = ObjectStorageDestinationState.metadataFor(stream) + ), + PartBookkeeper() + ) + } + } + + override suspend fun accept(input: Part, state: State): Pair { + input.bytes?.let { state.streamingUpload.uploadPart(it, input.partIndex) } + if (input.bytes == null) { + throw IllegalStateException("Empty non-final part received: this should not happen") + } + state.bookkeeper.add(input) + if (state.bookkeeper.isComplete) { + return Pair(state, finish(state)) + } + return Pair(state, null) + } + + override suspend fun finish(state: State): ObjectResult { + if (state.bookkeeper.isComplete) { + state.streamingUpload.complete() + } // else assume another part is finishing this + return ObjectResult(Batch.State.COMPLETE) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt new file mode 100644 index 0000000000000..f4c341dff710a --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt @@ -0,0 +1,15 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipline.object_storage + +import io.airbyte.cdk.load.pipeline.LoadPipeline +import io.airbyte.cdk.load.write.object_storage.ObjectLoader +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton + +@Singleton +@Requires(bean = ObjectLoader::class) +class ObjectLoaderPipeline(partStep: ObjectLoaderPartStep, uploadStep: ObjectLoaderUploadStep) : + LoadPipeline(listOf(partStep, uploadStep)) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderRecordToPartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderRecordToPartAccumulator.kt new file mode 100644 index 0000000000000..f69e7dd0da973 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderRecordToPartAccumulator.kt @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.cdk.load.pipline.object_storage + +import io.airbyte.cdk.load.command.DestinationCatalog +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriter +import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory +import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient +import io.airbyte.cdk.load.file.object_storage.Part +import io.airbyte.cdk.load.file.object_storage.PartFactory +import io.airbyte.cdk.load.file.object_storage.PathFactory +import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.pipeline.BatchAccumulator +import io.airbyte.cdk.load.state.DestinationStateManager +import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState +import io.airbyte.cdk.load.write.object_storage.ObjectLoader +import io.github.oshai.kotlinlogging.KotlinLogging +import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Value +import jakarta.inject.Singleton +import java.io.OutputStream + +@Singleton +@Requires(bean = ObjectLoader::class) +class ObjectLoaderRecordToPartAccumulator( + private val pathFactory: PathFactory, + private val catalog: DestinationCatalog, + private val writerFactory: BufferedFormattingWriterFactory<OutputStream>, + private val client: ObjectStorageClient<*>, + private val loader: ObjectLoader, + // TODO: This doesn't need to be "DestinationState", just a couple of utility classes + val stateManager: DestinationStateManager<ObjectStorageDestinationState>, + @Value("\${airbyte.destination.core.record-batch-size-override:null}") + val batchSizeOverride: Long? = null, +) : + BatchAccumulator< + ObjectLoaderRecordToPartAccumulator.State, StreamKey, DestinationRecordAirbyteValue, Part + > { + private val log = KotlinLogging.logger {} + + private val objectSizeBytes = loader.objectSizeBytes + private val partSizeBytes = loader.partSizeBytes + + data class State( + val stream: DestinationStream, + val writer: BufferedFormattingWriter<OutputStream>, + val partFactory: PartFactory + ) : AutoCloseable { + override fun close() { + writer.close() + } + } + + private suspend fun newState(stream: DestinationStream): State { + // Determine unique file name. + val pathOnly = pathFactory.getFinalDirectory(stream) + val state = stateManager.getState(stream) + val fileNo = state.getPartIdCounter(pathOnly).incrementAndGet() + val fileName = state.ensureUnique(pathFactory.getPathToFile(stream, fileNo)) + + // Initialize the part factory and writer.
+ val partFactory = PartFactory(fileName, fileNo) + log.info { "Starting part generation for $fileName (${stream.descriptor})" } + return State(stream, writerFactory.create(stream), partFactory) + } + + private fun makePart(state: State, forceFinish: Boolean = false): Part { + state.writer.flush() + val newSize = state.partFactory.totalSize + state.writer.bufferSize + val isFinal = + forceFinish || + newSize >= objectSizeBytes || + batchSizeOverride != null // HACK: This is a hack to force a flush + val bytes = + if (isFinal) { + state.writer.finish() + } else { + state.writer.takeBytes() + } + val part = state.partFactory.nextPart(bytes, isFinal) + log.info { "Creating part $part" } + return part + } + + override suspend fun start(key: StreamKey, part: Int): State { + val stream = catalog.getStream(key.stream) + return newState(stream) + } + + override suspend fun accept( + input: DestinationRecordAirbyteValue, + state: State + ): Pair?, Part?> { + state.writer.accept(input) + if (state.writer.bufferSize >= partSizeBytes || batchSizeOverride != null) { + val part = makePart(state) + val nextState = + if (part.isFinal) { + null + } else { + state + } + return Pair(nextState, part) + } + return Pair(state, null) + } + + override suspend fun finish(state: State): Part { + return makePart(state, true) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt new file mode 100644 index 0000000000000..accda27f9a010 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.cdk.load.pipline.object_storage + +import io.airbyte.cdk.load.file.object_storage.Part +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEvent +import io.airbyte.cdk.load.message.QueueWriter +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.pipeline.BatchUpdate +import io.airbyte.cdk.load.pipeline.LoadPipelineStep +import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask +import io.airbyte.cdk.load.write.object_storage.ObjectLoader +import io.micronaut.context.annotation.Requires +import jakarta.inject.Named +import jakarta.inject.Singleton + +@Singleton +@Requires(bean = ObjectLoader::class) +class ObjectLoaderUploadStep( + val loader: ObjectLoader, + val accumulator: ObjectLoaderPartToObjectAccumulator, + @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue>, + @Named("batchStateUpdateQueue") val batchQueue: QueueWriter, +) : LoadPipelineStep { + override val numWorkers: Int = loader.numUploadWorkers + + override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> { + return LoadPipelineStepTask( + accumulator, + partQueue.consume(partition), + batchQueue, + outputPartitioner = null, + outputQueue = + null + as + PartitionedQueue< + PipelineEvent + >?, + flushStrategy = null, + partition + ) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt new file mode 100644 index 0000000000000..85c461051c8d4 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write.object_storage + +import io.airbyte.cdk.load.write.LoadStrategy + +/** + * [ObjectLoader] is for the use case where a destination writes records into some number of files + * in a file system or cloud storage provider whose client supports multipart uploads. + * + * Usage: + * + * - declare a bean implementing this interface and optionally override the default configuration + * values + * - declare the necessary beans to initialize an + * [io.airbyte.cdk.load.file.object_storage.ObjectStorageClient] + * - declare a bean of + * [io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfigurationProvider] to configure + * the path for each object (typically you would add this to your + * [io.airbyte.cdk.load.command.DestinationConfiguration]) + * - declare a bean of + * [io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider] to control + * the format in which the data is loaded + * + * Configuration: + * + * - [numPartWorkers] is the number of threads (coroutines) devoted to formatting records into + * uploadable parts. (This is typically CPU-bound). + * - [numUploadWorkers] is the number of threads (coroutines) devoted to uploading parts to the + * object storage. (This is typically Network IO-bound). + * - [maxMemoryRatioReservedForParts] is proportion of the total heap reserved for parts in memory. + * This is used to calculate the size of the work queue, which when full will cause the part workers + * to suspend until the upload workers have processed some parts. + * - [partSizeBytes] is the approximate desired part size in bytes. 
When this much part data has + * been accumulated by a part worker, it will be forwarded to an upload worker and uploaded to the + * destination. + * - [objectSizeBytes] is the approximate desired file size in bytes. When this much part data has + * been accumulated, the upload will be completed, and the file will become visible to the end user. + * + * Partitioning: + * + * The default partitioning is to distribute the records round-robin (in small batches) to the part + * workers (using [io.airbyte.cdk.load.pipeline.RoundRobinInputPartitioner]). To override this, + * declare a bean implementing [io.airbyte.cdk.load.pipeline.InputPartitioner], though the default + * should be fine for most purposes. + * + * The parts are also distributed round-robin to the upload workers using + * [io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderPartPartitioner]. This is not currently + * configurable. + */ +interface ObjectLoader : LoadStrategy { + val numPartWorkers: Int + get() = 1 + val numUploadWorkers: Int + get() = 5 + val maxMemoryRatioReservedForParts: Double + get() = 0.2 + val partSizeBytes: Long + get() = 10L * 1024 * 1024 + val objectSizeBytes: Long + get() = 200L * 1024 * 1024 + + override val inputPartitions: Int + get() = numPartWorkers +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/pipeline/object_storage/ObjectLoaderPartPartitionerTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/pipeline/object_storage/ObjectLoaderPartPartitionerTest.kt new file mode 100644 index 0000000000000..af921b6acd3de --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/pipeline/object_storage/ObjectLoaderPartPartitionerTest.kt @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline.object_storage + +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.file.object_storage.Part +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.pipline.object_storage.ObjectKey +import io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderPartPartitioner +import kotlin.test.assertEquals +import org.junit.jupiter.api.Test + +class ObjectLoaderPartPartitionerTest { + @Test + fun `partitioner distributes keys round-robin`() { + val keys = (0 until 12).map(Int::toString) + val partitioner = ObjectLoaderPartPartitioner() + val stream = DestinationStream.Descriptor("test", "stream") + val numParts = 3 + var lastPartition: Int?
= null + (0 until 12).forEach { + val partition = partitioner.getPart(ObjectKey(stream, it.toString()), numParts) + lastPartition?.let { last -> assertEquals((last + 1) % numParts, partition) } + lastPartition = partition + } + } + + @Test + fun `partitioner uses object key as accumulator key`() { + val keys = listOf("foo", "bar", "baz") + val parts = + keys.mapIndexed { index, key -> + Part( + key = key, + fileNumber = index * 10L, + partIndex = index, + bytes = null, + isFinal = false + ) + } + val partitioner = ObjectLoaderPartPartitioner() + val stream = DestinationStream.Descriptor("test", "stream") + val outputKeys = parts.map { part -> partitioner.getOutputKey(StreamKey(stream), part) } + val expectedKeys = keys.map { key -> ObjectKey(stream, key) } + assertEquals(expectedKeys, outputKeys) + } +} diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml index 7514ddc977f0e..36ec28227c669 100644 --- a/airbyte-integrations/connectors/destination-s3/metadata.yaml +++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: file connectorType: destination definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 - dockerImageTag: 1.5.3 + dockerImageTag: 1.5.4 dockerRepository: airbyte/destination-s3 githubIssueLabel: destination-s3 icon: s3.svg diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt index d1658e831d856..d59c806cfdec5 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt +++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt @@ -36,9 +36,16 @@ data class S3V2Configuration( // Internal configuration override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration = ObjectStorageUploadConfiguration(), - override val numProcessRecordsWorkers: Int = 2, + override val numProcessRecordsWorkers: Int = 1, override val estimatedRecordMemoryOverheadRatio: Double = 5.0, override val processEmptyFiles: Boolean = true, + + /** Below has no effect until [S3V2ObjectLoader] is enabled. */ + val numPartWorkers: Int = 2, + val numUploadWorkers: Int = 10, + val maxMemoryRatioReservedForParts: Double = 0.2, + val objectSizeBytes: Long = 200L * 1024 * 1024, + val partSizeBytes: Long = 10L * 1024 * 1024, ) : DestinationConfiguration(), AWSAccessKeyConfigurationProvider, diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2ObjectLoader.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2ObjectLoader.kt new file mode 100644 index 0000000000000..ae89bbb3fa87b --- /dev/null +++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2ObjectLoader.kt @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.s3_v2 + +import io.airbyte.cdk.load.pipeline.RoundRobinInputPartitioner +import io.airbyte.cdk.load.write.object_storage.ObjectLoader + +/** + * These are temporarily disabled so that we can merge the underlying changes without doing a full + * S3 release. I will do a separate PR to re-enable this and roll it out gradually. 
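For a sense of scale: with the values above (10 upload workers, 10 MiB parts, 20% of the heap reserved for parts), the part-queue capacity works out as follows. This sketch mirrors the arithmetic in ObjectLoaderPartQueueFactory (shown later in this patch); the 2 GiB heap is an assumed figure for illustration only:

    // Sketch of the part-queue sizing arithmetic; the heap size is assumed.
    fun main() {
        val totalCapacityBytes = 2L * 1024 * 1024 * 1024   // assumed 2 GiB heap
        val maxMemoryRatioReservedForParts = 0.2
        val numUploadWorkers = 10
        val partSizeBytes = 10L * 1024 * 1024

        val reserved = (totalCapacityBytes * maxMemoryRatioReservedForParts).toLong() // ~410 MB
        val bytesPerPartition = reserved / numUploadWorkers                           // ~41 MB
        val partsPerPartition = bytesPerPartition / partSizeBytes                     // => 4
        println("$partsPerPartition parts buffered per upload worker before producers suspend")
    }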
+ */ + +// @Singleton +class S3V2ObjectLoader(config: S3V2Configuration<*>) : ObjectLoader { + override val numPartWorkers: Int = config.numPartWorkers + override val numUploadWorkers: Int = config.numUploadWorkers + override val maxMemoryRatioReservedForParts: Double = config.maxMemoryRatioReservedForParts + override val objectSizeBytes: Long = config.objectSizeBytes + override val partSizeBytes: Long = config.partSizeBytes +} + +// @Singleton +class S3V2RoundRobinInputPartitioner : RoundRobinInputPartitioner() diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index bca981ae15fd4..342586766b541 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou | Version | Date | Pull Request | Subject | |:------------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.5.4 | 2025-03-05 | [54695](https://github.com/airbytehq/airbyte/pull/54695) | Nonfunctional changes to support performance testing | | 1.5.3 | 2025-03-04 | [54661](https://github.com/airbytehq/airbyte/pull/54661) | Nonfunctional changes to support performance testing | | 1.5.2 | 2025-02-25 | [54661](https://github.com/airbytehq/airbyte/pull/54661) | Nonfunctional cleanup; dropped unused staging code | | 1.5.1 | 2025-02-11 | [53636](https://github.com/airbytehq/airbyte/pull/53636) | Nonfunctional CDK version pin. | From 6e292e92acd83fd7ec4dc79ca9c31d743a0fa717 Mon Sep 17 00:00:00 2001 From: Johnny Schmidt Date: Fri, 7 Mar 2025 09:11:23 -0800 Subject: [PATCH 2/2] WIP: Load CDK BulkLoad interface + MSSQL V2 Usage --- .../cdk/load/task/DestinationTaskLauncher.kt | 58 ++++--- .../cdk/load/write/StreamStateStore.kt | 2 +- .../bulk/toolkits/load-db/build.gradle | 6 + .../load/pipeline/db/BulkLoadAccumulator.kt | 48 ++++++ .../load/pipeline/db/BulkLoadIntoTableStep.kt | 44 ++++++ .../load/pipeline/db/BulkLoadObjectQueue.kt | 35 +++++ .../cdk/load/pipeline/db/BulkLoadPipeline.kt | 25 +++ .../airbyte/cdk/load/write/db/BulkLoader.kt | 55 +++++++ .../ObjectLoaderObjectPartitioner.kt | 53 +++++++ .../object_storage/ObjectLoaderPartQueue.kt | 6 +- .../ObjectLoaderPartToObjectAccumulator.kt | 54 ++++--- .../object_storage/ObjectLoaderPipeline.kt | 8 +- .../object_storage/ObjectLoaderUploadStep.kt | 25 +-- .../load/write/object_storage/ObjectLoader.kt | 17 +++ .../destination-mssql-v2/build.gradle.kts | 2 +- .../destination-mssql-v2/metadata.yaml | 2 +- .../mssql/v2/MSSQLBulkLoadStreamLoader.kt | 15 +- .../destination/mssql/v2/MSSQLBulkLoader.kt | 143 ++++++++++++++++++ .../destination/mssql/v2/MSSQLWriter.kt | 8 +- .../v2/config/MSSQLBulkLoadConfiguration.kt | 64 ++++++++ .../mssql/v2/config/MSSQLConfiguration.kt | 8 +- .../mssql/v2/config/MSSQLSpecification.kt | 6 + ...eckTest.kt => MSSQLBulkLoaderCheckTest.kt} | 2 +- .../mssql/v2/MSSQLPerformanceTest.kt | 7 +- ...rTest.kt => MSSQLBulkLoaderHandlerTest.kt} | 2 +- 25 files changed, 628 insertions(+), 67 deletions(-) create mode 100644 airbyte-cdk/bulk/toolkits/load-db/build.gradle create mode 100644 airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadAccumulator.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadIntoTableStep.kt create mode 100644 
airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadObjectQueue.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadPipeline.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/write/db/BulkLoader.kt create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderObjectPartitioner.kt create mode 100644 airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoader.kt create mode 100644 airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLBulkLoadConfiguration.kt rename airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/{MSSQLBulkLoadCheckTest.kt => MSSQLBulkLoaderCheckTest.kt} (95%) rename airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/{MSSQLBulkLoadHandlerTest.kt => MSSQLBulkLoaderHandlerTest.kt} (99%) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt index 6cd2d48c31482..6617ef18f6644 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt @@ -197,23 +197,6 @@ class DefaultDestinationTaskLauncher( } override suspend fun run() { - // Start the input consumer ASAP - log.info { "Starting input consumer task" } - val inputConsumerTask = - inputConsumerTaskFactory.make( - catalog = catalog, - inputFlow = inputFlow, - recordQueueSupplier = recordQueueSupplier, - checkpointQueue = checkpointQueue, - fileTransferQueue = fileTransferQueue, - destinationTaskLauncher = this, - recordQueueForPipeline = recordQueueForPipeline, - loadPipeline = loadPipeline, - partitioner = partitioner, - openStreamQueue = openStreamQueue, - ) - launch(inputConsumerTask) - // Launch the client interface setup task log.info { "Starting startup task" } val setupTask = setupTaskFactory.make(this) @@ -225,12 +208,29 @@ class DefaultDestinationTaskLauncher( } if (loadPipeline != null) { - log.info { "Setting up load pipeline" } - loadPipeline.start { launch(it) } + log.info { "Setup load pipeline" } + loadPipeline.start { task -> launch(task, withExceptionHandling = true) } log.info { "Launching update batch task" } val updateBatchTask = updateBatchTaskFactory.make(this) launch(updateBatchTask) } else { + // Start the input consumer ASAP + log.info { "Starting input consumer task" } + val inputConsumerTask = + inputConsumerTaskFactory.make( + catalog = catalog, + inputFlow = inputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + fileTransferQueue = fileTransferQueue, + destinationTaskLauncher = this, + recordQueueForPipeline = recordQueueForPipeline, + loadPipeline = loadPipeline, + partitioner = partitioner, + openStreamQueue = openStreamQueue, + ) + launch(inputConsumerTask) + // TODO: pluggable file transfer if (!fileTransferEnabled) { // Start a spill-to-disk task for each record stream @@ -289,6 +289,26 @@ class DefaultDestinationTaskLauncher( catalog.streams.forEach { 
openStreamQueue.publish(it) } log.info { "Closing open stream queue" } openStreamQueue.close() + } else { + // When the pipeline is enabled, per-stream input consumption + // already waits for stream start to complete, but not for setup. + // Launching the input consumer here, after setup completes, is + // the simplest way to make it wait for setup as well. + log.info { "Setup complete, starting input consumer task" } + val inputConsumerTask = + inputConsumerTaskFactory.make( + catalog = catalog, + inputFlow = inputFlow, + recordQueueSupplier = recordQueueSupplier, + checkpointQueue = checkpointQueue, + fileTransferQueue = fileTransferQueue, + destinationTaskLauncher = this, + recordQueueForPipeline = recordQueueForPipeline, + loadPipeline = loadPipeline, + partitioner = partitioner, + openStreamQueue = openStreamQueue, + ) + launch(inputConsumerTask) } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamStateStore.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamStateStore.kt index 4292a39c2e238..3464f331bff71 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamStateStore.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/StreamStateStore.kt @@ -21,7 +21,7 @@ class StreamStateStore { store[stream] = state } - fun get(stream: DestinationStream.Descriptor): S? { + operator fun get(stream: DestinationStream.Descriptor): S? { return store[stream] } } diff --git a/airbyte-cdk/bulk/toolkits/load-db/build.gradle b/airbyte-cdk/bulk/toolkits/load-db/build.gradle new file mode 100644 index 0000000000000..6c6a0e4b68f9d --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-db/build.gradle @@ -0,0 +1,6 @@ +dependencies { + implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base') + implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load') + + api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage') +} diff --git a/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadAccumulator.kt new file mode 100644 index 0000000000000..a58eaafc9e9bf --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadAccumulator.kt @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
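The `operator fun get` change to StreamStateStore above enables index syntax at the call sites added later in this patch. A small usage sketch, assuming the store's declared type parameter and an illustrative state type:

    import io.airbyte.cdk.load.command.DestinationStream
    import io.airbyte.cdk.load.write.StreamStateStore

    // MyState is an illustrative stand-in for whatever the destination stores.
    data class MyState(val formatFilePath: String)

    fun lookup(store: StreamStateStore<MyState>, stream: DestinationStream.Descriptor): MyState =
        requireNotNull(store[stream]) {   // store[stream] desugars to store.get(stream)
            "state must be put() during start() before it is read"
        }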
+ */ + +package io.airbyte.cdk.load.pipeline.db + +import io.airbyte.cdk.load.file.object_storage.RemoteObject +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.WithBatchState +import io.airbyte.cdk.load.message.WithStream +import io.airbyte.cdk.load.pipeline.BatchAccumulator +import io.airbyte.cdk.load.pipline.object_storage.LoadedObject +import io.airbyte.cdk.load.write.db.BulkLoader +import io.airbyte.cdk.load.write.db.BulkLoaderFactory +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton + +@Singleton +@Requires(bean = BulkLoaderFactory::class) +class BulkLoadAccumulator>( + val bulkLoad: BulkLoaderFactory +) : + BatchAccumulator< + BulkLoadAccumulator.State, K, LoadedObject, BulkLoadAccumulator.LoadResult> { + inner class State(val bulkLoader: BulkLoader) : AutoCloseable { + override fun close() { + bulkLoader.close() + } + } + + data object LoadResult : WithBatchState { + override val state: Batch.State = Batch.State.COMPLETE + } + + override suspend fun start(key: K, part: Int): State { + return State(bulkLoad.create(key)) + } + + override suspend fun accept(input: LoadedObject, state: State): Pair { + // Some workers might forward an object already completed by another. + if (!input.alreadyComplete) { + state.bulkLoader.load(input.remoteObject) + } + return state to LoadResult + } + + override suspend fun finish(state: State): LoadResult = LoadResult +} diff --git a/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadIntoTableStep.kt b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadIntoTableStep.kt new file mode 100644 index 0000000000000..38229dc9e2815 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadIntoTableStep.kt @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.cdk.load.pipeline.db + +import io.airbyte.cdk.load.file.object_storage.RemoteObject +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEvent +import io.airbyte.cdk.load.message.QueueWriter +import io.airbyte.cdk.load.message.WithStream +import io.airbyte.cdk.load.pipeline.BatchUpdate +import io.airbyte.cdk.load.pipeline.LoadPipelineStep +import io.airbyte.cdk.load.pipline.object_storage.LoadedObject +import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask +import io.airbyte.cdk.load.write.db.BulkLoaderFactory +import io.micronaut.context.annotation.Requires +import jakarta.inject.Named +import jakarta.inject.Singleton + +@Singleton +@Requires(bean = BulkLoaderFactory::class) +class BulkLoadIntoTableStep>( + val bulkLoadAccumulator: BulkLoadAccumulator, + val bulkLoad: BulkLoaderFactory, + @Named("objectLoaderOutputQueue") + val bulkLoadObjectQueue: PartitionedQueue>>, + @Named("batchStateUpdateQueue") val batchUpdateQueue: QueueWriter, +) : LoadPipelineStep { + override val numWorkers: Int = bulkLoad.maxNumConcurrentLoads + + /** TODO: This should just be a task: no need for a whole accumulator pipeline here */ + override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> { + return LoadPipelineStepTask( + bulkLoadAccumulator, + bulkLoadObjectQueue.consume(partition), + batchUpdateQueue, + null, + null as PartitionedQueue>?, + null, + partition + ) + } +} diff --git a/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadObjectQueue.kt b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadObjectQueue.kt new file mode 100644 index 0000000000000..89e0ffd0e240c --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadObjectQueue.kt @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipeline.db + +import io.airbyte.cdk.load.file.object_storage.RemoteObject +import io.airbyte.cdk.load.message.ChannelMessageQueue +import io.airbyte.cdk.load.message.PartitionedQueue +import io.airbyte.cdk.load.message.PipelineEvent +import io.airbyte.cdk.load.message.WithStream +import io.airbyte.cdk.load.pipline.object_storage.LoadedObject +import io.airbyte.cdk.load.write.db.BulkLoaderFactory +import io.micronaut.context.annotation.Factory +import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Secondary +import jakarta.inject.Named +import jakarta.inject.Singleton +import kotlinx.coroutines.channels.Channel + +@Factory +@Requires(bean = BulkLoaderFactory::class) +class BulkLoadObjectQueueFactory>( + val bulkLoad: BulkLoaderFactory +) { + @Singleton + @Named("objectLoaderOutputQueue") + @Secondary + fun bulkLoadObjectQueue(): PartitionedQueue>> = + PartitionedQueue( + (0 until bulkLoad.maxNumConcurrentLoads) + .map { ChannelMessageQueue>>(Channel(1)) } + .toTypedArray() + ) +} diff --git a/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadPipeline.kt b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadPipeline.kt new file mode 100644 index 0000000000000..7f6885c854d67 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/pipeline/db/BulkLoadPipeline.kt @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.cdk.load.pipeline.db + +import io.airbyte.cdk.load.file.object_storage.RemoteObject +import io.airbyte.cdk.load.message.WithStream +import io.airbyte.cdk.load.pipeline.LoadPipeline +import io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderPartStep +import io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderPipeline +import io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderUploadStep +import io.airbyte.cdk.load.write.db.BulkLoaderFactory +import io.micronaut.context.annotation.Replaces +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton + +@Singleton +@Requires(bean = BulkLoaderFactory::class) +@Replaces(ObjectLoaderPipeline::class) +class BulkLoadPipeline>( + partStep: ObjectLoaderPartStep, + uploadStep: ObjectLoaderUploadStep, + loadIntoTableStep: BulkLoadIntoTableStep<*, *>, +) : LoadPipeline(listOf(partStep, uploadStep, loadIntoTableStep)) diff --git a/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/write/db/BulkLoader.kt b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/write/db/BulkLoader.kt new file mode 100644 index 0000000000000..8e3f0a40aedd2 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-db/src/main/kotlin/io/airbyte/cdk/load/write/db/BulkLoader.kt @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.write.db + +import io.airbyte.cdk.load.file.object_storage.RemoteObject +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.WithStream +import io.airbyte.cdk.load.write.object_storage.ObjectLoader + +/** + * [BulkLoader] is for the use case in which a destination first stages records in a temporary + * location, usually cloud object storage, then loads them into the destination database in bulk. + * + * To use, declare a singleton of type [BulkLoaderFactory] and implement the + * [BulkLoaderFactory.create] method. + * + * This strategy composes [ObjectLoader] with a post-processing step provided by the [load] method. + * As [ObjectLoader] makes loaded objects as they become available, [load] is called on each one in + * sequence. + * + * [BulkLoaderFactory.maxNumConcurrentLoads] determines the number of concurrent loads that can be + * in progress at once. + * + * The key type [K] determines how the destination will partition the work. By default, + * [io.airbyte.cdk.load.message.StreamKey] is provided and an interface using this type will just + * work. Specifically, no more than one [load] will ever be in progress per stream, but up to + * [BulkLoaderFactory.maxNumConcurrentLoads] can be in progress at once. + * + * Additionally, the configuration values provided by [ObjectLoader] can be overridden on the + * factory and will work as documents. + * + * The factory method [BulkLoaderFactory.create] will be called once per key the first time a key is + * seen. It is guaranteed to be closed if created. + * + * To adjust this behavior, declare a named singleton "objectLoaderOutputPartitioner" using the + * desired key and/or partition strategy. (See + * [io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderUploadStep]) + * + * TODO: provide a method that allows the user to check for an available connection and defer work + * if it is not available. 
+interface BulkLoader> : AutoCloseable { + /** Called as uploaded objects become available */ + suspend fun load(remoteObject: T) +} + +interface BulkLoaderFactory> : ObjectLoader { + val maxNumConcurrentLoads: Int + override val batchStateOnUpload: Batch.State + get() = Batch.State.PERSISTED + + fun create(key: K): BulkLoader +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderObjectPartitioner.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderObjectPartitioner.kt new file mode 100644 index 0000000000000..989764cf84342 --- /dev/null +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderObjectPartitioner.kt @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.cdk.load.pipline.object_storage + +import io.airbyte.cdk.load.file.object_storage.Part +import io.airbyte.cdk.load.file.object_storage.RemoteObject +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.pipeline.OutputPartitioner +import io.airbyte.cdk.load.write.object_storage.ObjectLoader +import io.micronaut.context.annotation.Requires +import io.micronaut.context.annotation.Secondary +import jakarta.inject.Named +import jakarta.inject.Singleton +import kotlin.math.abs +import kotlin.random.Random + +/** + * The default output partitioner for the ObjectLoader pipeline. It will not actually be used unless + * an output queue for the loaded objects is provided. (see + * [io.airbyte.cdk.load.write.object_storage.ObjectLoader]). + * + * In that case, the default behavior will be to key and partition by stream. This means that the + * number of concurrent loaders can be chosen based on the available resources, but loads for any + * single stream will never run concurrently.
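As the TODO in the partitioner below acknowledges, the abs()/MIN_VALUE handling can be factored into a reusable round-robin counter. One overflow-safe alternative formulation (a sketch, not the code in this patch) uses floorMod:

    import java.util.concurrent.atomic.AtomicInteger

    // Alternative sketch: floorMod keeps the partition in [0, numParts) even
    // after the counter wraps past Int.MAX_VALUE, with no abs()/MIN_VALUE cases.
    class RoundRobin(seed: Int = 0) {
        private val counter = AtomicInteger(seed)
        fun next(numParts: Int): Int = Math.floorMod(counter.getAndIncrement(), numParts)
    }

    fun main() {
        val rr = RoundRobin(seed = Int.MAX_VALUE - 1)
        println(List(4) { rr.next(3) }) // stays in range across the overflow
    }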
+ */ +@Singleton +@Secondary +@Requires(bean = ObjectLoader::class) +@Named("objectLoaderOutputPartitioner") +class ObjectLoaderObjectPartitioner> : + OutputPartitioner< + ObjectKey, + Part, + StreamKey, + LoadedObject, + > { + // TODO: Abstract this out to a round-robin partition generator + private var nextPartition = Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE) + + override fun getPart(outputKey: StreamKey, numParts: Int): Int { + val part = nextPartition++ + return if (part == Int.MIN_VALUE) { + 0 + } else { + abs(part) % numParts + } + } + + override fun getOutputKey(inputKey: ObjectKey, output: LoadedObject): StreamKey = + StreamKey(inputKey.stream) +} diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt index 3a84377a3525c..01e4d7212ff93 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt @@ -36,17 +36,17 @@ class ObjectLoaderPartQueueFactory( fun objectLoaderPartQueue(): PartitionedQueue> { val bytes = memoryManager.totalCapacityBytes * loader.maxMemoryRatioReservedForParts val reservation = runBlocking { memoryManager.reserve(bytes.toLong(), this) } - val bytesPerPartition = reservation.bytesReserved / loader.numPartWorkers + val bytesPerPartition = reservation.bytesReserved / loader.numUploadWorkers val partsPerPartition = bytesPerPartition / loader.partSizeBytes if (partsPerPartition < 1) { throw IllegalArgumentException( - "Reserved $bytes/${memoryManager.totalCapacityBytes}b not enough for ${loader.numPartWorkers} ${loader.partSizeBytes}b parts" + "Reserved $bytes/${memoryManager.totalCapacityBytes}b not enough for ${loader.numUploadWorkers} ${loader.partSizeBytes}b parts" ) } log.info { - "Reserved $bytes/${memoryManager.totalCapacityBytes}b for ${loader.numPartWorkers} ${loader.partSizeBytes}b parts => $partsPerPartition capacity per queue partition" + "Reserved $bytes/${memoryManager.totalCapacityBytes}b for ${loader.numUploadWorkers} ${loader.partSizeBytes}b parts => $partsPerPartition capacity per queue partition" } return PartitionedQueue( diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt index abb5ee32df151..38cbec95e1986 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt @@ -8,15 +8,18 @@ import io.airbyte.cdk.load.command.DestinationCatalog import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient import io.airbyte.cdk.load.file.object_storage.Part import io.airbyte.cdk.load.file.object_storage.PartBookkeeper +import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.file.object_storage.StreamingUpload import io.airbyte.cdk.load.message.Batch import 
io.airbyte.cdk.load.message.WithBatchState import io.airbyte.cdk.load.pipeline.BatchAccumulator import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState +import io.airbyte.cdk.load.util.setOnce import io.airbyte.cdk.load.write.object_storage.ObjectLoader import io.micronaut.context.annotation.Requires import jakarta.inject.Singleton import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean /** * In order to allow streaming uploads on the same key to be parallelized, upload state needs to be @@ -24,33 +27,40 @@ import java.util.concurrent.ConcurrentHashMap */ @Singleton @Requires(bean = ObjectLoader::class) -class UploadsInProgress { - val byKey: ConcurrentHashMap = +class UploadsInProgress> { + val byKey: ConcurrentHashMap.State> = ConcurrentHashMap() } +data class LoadedObject>( + val remoteObject: T, + override val state: Batch.State = Batch.State.COMPLETE, + val alreadyComplete: Boolean +) : WithBatchState + @Singleton @Requires(bean = ObjectLoader::class) -class ObjectLoaderPartToObjectAccumulator( - private val client: ObjectStorageClient<*>, +class ObjectLoaderPartToObjectAccumulator>( + private val client: ObjectStorageClient, private val catalog: DestinationCatalog, - private val uploads: UploadsInProgress, + private val uploads: UploadsInProgress, + private val strategy: ObjectLoader, ) : BatchAccumulator< - ObjectLoaderPartToObjectAccumulator.State, - ObjectKey, - Part, - ObjectLoaderPartToObjectAccumulator.ObjectResult - > { + ObjectLoaderPartToObjectAccumulator.State, ObjectKey, Part, LoadedObject> { - data class State(val streamingUpload: StreamingUpload<*>, val bookkeeper: PartBookkeeper) : - AutoCloseable { + inner class State( + val streamingUpload: StreamingUpload, + val bookkeeper: PartBookkeeper, + val isComplete: AtomicBoolean = AtomicBoolean(false), + ) : AutoCloseable { override fun close() { // Do Nothing } } - data class ObjectResult(override val state: Batch.State) : WithBatchState + data class ObjectResult(override val state: Batch.State, val objectKey: String) : + WithBatchState override suspend fun start(key: ObjectKey, part: Int): State { val stream = catalog.getStream(key.stream) @@ -60,27 +70,29 @@ class ObjectLoaderPartToObjectAccumulator( key.objectKey, metadata = ObjectStorageDestinationState.metadataFor(stream) ), - PartBookkeeper() + PartBookkeeper(), ) } } - override suspend fun accept(input: Part, state: State): Pair { + override suspend fun accept(input: Part, state: State): Pair?> { input.bytes?.let { state.streamingUpload.uploadPart(it, input.partIndex) } if (input.bytes == null) { throw IllegalStateException("Empty non-final part received: this should not happen") } state.bookkeeper.add(input) if (state.bookkeeper.isComplete) { - return Pair(state, finish(state)) + return Pair(null, finish(state)) } return Pair(state, null) } - override suspend fun finish(state: State): ObjectResult { - if (state.bookkeeper.isComplete) { - state.streamingUpload.complete() - } // else assume another part is finishing this - return ObjectResult(Batch.State.COMPLETE) + override suspend fun finish(state: State): LoadedObject { + val obj = state.streamingUpload.complete() + return LoadedObject( + obj, + strategy.batchStateOnUpload, + alreadyComplete = !state.isComplete.setOnce() + ) } } diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt 
b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt index f4c341dff710a..52e15a7fd968d 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt @@ -4,6 +4,8 @@ package io.airbyte.cdk.load.pipline.object_storage +import io.airbyte.cdk.load.file.object_storage.RemoteObject +import io.airbyte.cdk.load.message.WithStream import io.airbyte.cdk.load.pipeline.LoadPipeline import io.airbyte.cdk.load.write.object_storage.ObjectLoader import io.micronaut.context.annotation.Requires @@ -11,5 +13,7 @@ import jakarta.inject.Singleton @Singleton @Requires(bean = ObjectLoader::class) -class ObjectLoaderPipeline(partStep: ObjectLoaderPartStep, uploadStep: ObjectLoaderUploadStep) : - LoadPipeline(listOf(partStep, uploadStep)) +class ObjectLoaderPipeline>( + partStep: ObjectLoaderPartStep, + uploadStep: ObjectLoaderUploadStep +) : LoadPipeline(listOf(partStep, uploadStep)) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt index accda27f9a010..48554bc4539e2 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt @@ -5,12 +5,14 @@ package io.airbyte.cdk.load.pipline.object_storage import io.airbyte.cdk.load.file.object_storage.Part +import io.airbyte.cdk.load.file.object_storage.RemoteObject import io.airbyte.cdk.load.message.PartitionedQueue import io.airbyte.cdk.load.message.PipelineEvent import io.airbyte.cdk.load.message.QueueWriter -import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.message.WithStream import io.airbyte.cdk.load.pipeline.BatchUpdate import io.airbyte.cdk.load.pipeline.LoadPipelineStep +import io.airbyte.cdk.load.pipeline.OutputPartitioner import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask import io.airbyte.cdk.load.write.object_storage.ObjectLoader import io.micronaut.context.annotation.Requires @@ -19,11 +21,19 @@ import jakarta.inject.Singleton @Singleton @Requires(bean = ObjectLoader::class) -class ObjectLoaderUploadStep( +class ObjectLoaderUploadStep>( val loader: ObjectLoader, - val accumulator: ObjectLoaderPartToObjectAccumulator, + val accumulator: ObjectLoaderPartToObjectAccumulator, @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue>, @Named("batchStateUpdateQueue") val batchQueue: QueueWriter, + @Named("objectLoaderOutputPartitioner") + val partitioner: OutputPartitioner>, + /** + * Optional output queue for when strategies other than ObjectLoader use this step as part of a + * larger pipeline. + */ + @Named("objectLoaderOutputQueue") + val objectQueue: PartitionedQueue>>? 
= null ) : LoadPipelineStep { override val numWorkers: Int = loader.numUploadWorkers @@ -32,13 +42,8 @@ class ObjectLoaderUploadStep( accumulator, partQueue.consume(partition), batchQueue, - outputPartitioner = null, - outputQueue = - null - as - PartitionedQueue< - PipelineEvent - >?, + outputPartitioner = partitioner, + outputQueue = objectQueue, flushStrategy = null, partition ) diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt index 85c461051c8d4..54ea206b27fe9 100644 --- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt +++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt @@ -4,6 +4,7 @@ package io.airbyte.cdk.load.write.object_storage +import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.write.LoadStrategy /** @@ -38,6 +39,11 @@ import io.airbyte.cdk.load.write.LoadStrategy * destination. * - [objectSizeBytes] is the approximate desired file size in bytes. When this much part data has * been accumulated, the upload will be completed, and the file will become visible to the end user. + * - [batchStateOnUpload] determines whether work is considered COMPLETE on upload (the default), + * PERSISTED (not complete but recoverable and safe to ack to the platform), or STAGED/PROCESSED + * (neither). Connector devs who are implementing file storage destinations should not need to + * change this. It is for use internally by pipelines that compose object storage into more complex + * workflows (i.e., bulk load). * * Partitioning: * @@ -49,6 +55,15 @@ import io.airbyte.cdk.load.write.LoadStrategy * The parts are also distributed round-robin to the upload workers using * [io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderPartPartitioner]. This is not currently * configurable. + * + * Post-processing: + * + * If a connector dev implements this interface directly, there will by default be no + * post-processing of individual files (except what is done when the stream is closed). + * + * However, CDK devs who are composing this into part of a larger strategy will probably also want + * to declare a named bean ["objectLoaderOutputQueue"] to pass the final uploaded object to + * another step (see [io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderUploadStep]). */
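To make the new knob concrete: a strategy that stages objects for a later load step might override it as follows (a sketch against the interface defined in this file):

    import io.airbyte.cdk.load.message.Batch
    import io.airbyte.cdk.load.write.object_storage.ObjectLoader

    // Sketch: uploads are acked as PERSISTED (recoverable, safe to ack to the
    // platform) and a downstream step later marks the work COMPLETE.
    interface MyStagingStrategy : ObjectLoader {
        override val batchStateOnUpload: Batch.State
            get() = Batch.State.PERSISTED
    }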
interface ObjectLoader : LoadStrategy { val numPartWorkers: Int @@ -62,6 +77,8 @@ interface ObjectLoader : LoadStrategy { val objectSizeBytes: Long get() = 200L * 1024 * 1024 + val batchStateOnUpload: Batch.State + get() = Batch.State.COMPLETE override val inputPartitions: Int get() = numPartWorkers } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/build.gradle.kts b/airbyte-integrations/connectors/destination-mssql-v2/build.gradle.kts index 03facc4fbf4bd..9f9b1d509a2ca 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/build.gradle.kts +++ b/airbyte-integrations/connectors/destination-mssql-v2/build.gradle.kts @@ -9,7 +9,7 @@ plugins { airbyteBulkConnector { core = "load" - toolkits = listOf("load-azure-blob-storage") + toolkits = listOf("load-azure-blob-storage", "load-db") cdk = "local" } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml b/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml index 45513bcc6036f..53f057649644f 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml +++ b/airbyte-integrations/connectors/destination-mssql-v2/metadata.yaml @@ -16,7 +16,7 @@ data: type: GSM connectorType: destination definitionId: 37a928c1-2d5c-431a-a97d-ae236bd1ea0c - dockerImageTag: 0.1.15 + dockerImageTag: 0.1.16 dockerRepository: airbyte/destination-mssql-v2 documentationUrl: https://docs.airbyte.com/integrations/destinations/mssql-v2 githubIssueLabel: destination-mssql-v2 diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoadStreamLoader.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoadStreamLoader.kt index 756d4993ef131..54e34c9f1d130 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoadStreamLoader.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoadStreamLoader.kt @@ -27,6 +27,7 @@ import io.airbyte.cdk.load.message.Batch import io.airbyte.cdk.load.message.object_storage.LoadedObject import io.airbyte.cdk.load.state.StreamProcessingFailed import io.airbyte.cdk.load.write.BatchAccumulator +import io.airbyte.cdk.load.write.StreamStateStore import io.airbyte.cdk.load.write.object_storage.PartToObjectAccumulator import io.airbyte.cdk.load.write.object_storage.RecordToPartAccumulator import java.io.ByteArrayOutputStream @@ -43,7 +44,8 @@ class MSSQLBulkLoadStreamLoader( private val defaultSchema: String, private val azureBlobClient: AzureBlobClient, private val validateValuesPreLoad: Boolean, - private val recordBatchSizeOverride: Long? = null + private val recordBatchSizeOverride: Long? = null, + private val streamStateStore: StreamStateStore ) : AbstractMSSQLStreamLoader(dataSource, stream, sqlBuilder) { // Bulk-load related collaborators @@ -68,6 +70,8 @@ class MSSQLBulkLoadStreamLoader( override suspend fun start() { super.start() // calls ensureTableExists() formatFilePath = mssqlFormatFileCreator.createAndUploadFormatFile(defaultSchema).key + val state = MSSQLStreamState(dataSource, formatFilePath) + streamStateStore.put(stream.descriptor, state) } /** @@ -204,3 +208,12 @@ class MSSQLBulkLoadStreamLoader( ) } } + +/** + * For use by the new interface (to pass stream state created during `start` to the + * BulkLoader.)
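Concretely, the handoff runs in two halves: the legacy stream loader publishes state during start() (above), and the factory reads it back when creating a loader for that stream (below). A condensed sketch, assuming the store's type parameter and using the MSSQLStreamState class defined next:

    import io.airbyte.cdk.load.command.DestinationStream
    import io.airbyte.cdk.load.write.StreamStateStore
    import javax.sql.DataSource

    // Publishing side: runs once per stream during start().
    fun publishOnStart(
        store: StreamStateStore<MSSQLStreamState>,
        stream: DestinationStream,
        dataSource: DataSource,
        formatFilePath: String,
    ) = store.put(stream.descriptor, MSSQLStreamState(dataSource, formatFilePath))

    // Consuming side: runs when the factory first sees the stream's key.
    fun readOnCreate(
        store: StreamStateStore<MSSQLStreamState>,
        stream: DestinationStream.Descriptor,
    ): MSSQLStreamState = requireNotNull(store[stream]) { "start() must run before create()" }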
*/ +data class MSSQLStreamState( + val dataSource: DataSource, + val formatFilePath: String, +) diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoader.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoader.kt new file mode 100644 index 0000000000000..5c70f0b8463f8 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoader.kt @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2 + +import io.airbyte.cdk.load.command.Dedupe +import io.airbyte.cdk.load.command.DestinationCatalog +import io.airbyte.cdk.load.command.DestinationStream +import io.airbyte.cdk.load.data.csv.toCsvHeader +import io.airbyte.cdk.load.data.withAirbyteMeta +import io.airbyte.cdk.load.file.azureBlobStorage.AzureBlob +import io.airbyte.cdk.load.file.azureBlobStorage.AzureBlobClient +import io.airbyte.cdk.load.message.Batch +import io.airbyte.cdk.load.message.StreamKey +import io.airbyte.cdk.load.write.StreamStateStore +import io.airbyte.cdk.load.write.db.BulkLoader +import io.airbyte.cdk.load.write.db.BulkLoaderFactory +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLBulkLoadConfiguration +import io.airbyte.integrations.destination.mssql.v2.config.MSSQLConfiguration +import io.github.oshai.kotlinlogging.KotlinLogging +import io.micronaut.context.annotation.Requires +import jakarta.inject.Singleton +import kotlinx.coroutines.runBlocking + +class MSSQLBulkLoader( + private val azureBlobClient: AzureBlobClient, + private val stream: DestinationStream, + private val mssqlBulkLoadHandler: MSSQLBulkLoadHandler, + private val formatFilePath: String +) : BulkLoader { + private val log = KotlinLogging.logger {} + + override suspend fun load(remoteObject: AzureBlob) { + val dataFilePath = remoteObject.key + try { + if (stream.importType is Dedupe) { + handleDedup(dataFilePath) + } else { + handleAppendOverwrite(dataFilePath) + } + } finally { + // Best-effort cleanup of the data blob + deleteBlobSafe(dataFilePath) + } + } + + /** + * Merges upsert data by creating a temporary table, bulk-loading the CSV, and then MERGEing + * into the destination table using the PK columns. + */ + private fun handleDedup(dataFilePath: String) { + val importType = stream.importType as Dedupe + val primaryKey = + if (importType.primaryKey.isNotEmpty()) { + importType.primaryKey.flatten() + } else { + // If no dedicated PK is provided, use the cursor as the PK + importType.cursor + } + + // Build the full list of columns, including the Airbyte metadata columns + val allColumns = stream.schema.withAirbyteMeta(true).toCsvHeader().toList() + + val nonPkColumns = allColumns - primaryKey.toSet() + + mssqlBulkLoadHandler.bulkLoadAndUpsertForDedup( + primaryKeyColumns = primaryKey, + cursorColumns = importType.cursor, + nonPkColumns = nonPkColumns, + dataFilePath = dataFilePath, + formatFilePath = formatFilePath + ) + }
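The statement that bulkLoadAndUpsertForDedup ultimately issues has the shape of a temp-table bulk insert followed by a MERGE keyed on the primary key. A hypothetical sketch of building such a MERGE; the real SQL is produced by MSSQLBulkLoadHandler/MSSQLQueryBuilder, and the names here are made up:

    // Hypothetical shape of the dedup statement; illustrative names only.
    fun mergeSql(target: String, temp: String, pk: List<String>, nonPk: List<String>): String {
        val on = pk.joinToString(" AND ") { "T.[$it] = S.[$it]" }
        val set = nonPk.joinToString(", ") { "T.[$it] = S.[$it]" }
        val cols = (pk + nonPk).joinToString(", ") { "[$it]" }
        val vals = (pk + nonPk).joinToString(", ") { "S.[$it]" }
        return """
            MERGE INTO $target AS T
            USING $temp AS S ON $on
            WHEN MATCHED THEN UPDATE SET $set
            WHEN NOT MATCHED THEN INSERT ($cols) VALUES ($vals);
        """.trimIndent()
    }

    fun main() {
        println(mergeSql("dbo.users", "#users_stage", listOf("id"), listOf("name", "_airbyte_meta")))
    }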
+ + /** Performs a simple bulk insert (append-overwrite behavior). */ + private fun handleAppendOverwrite(dataFilePath: String) { + mssqlBulkLoadHandler.bulkLoadForAppendOverwrite( + dataFilePath = dataFilePath, + formatFilePath = formatFilePath + ) + } + + /** + * Safely attempts to delete the provided blob path, logging errors rather than rethrowing. + */ + private fun deleteBlobSafe(path: String) { + try { + runBlocking { azureBlobClient.delete(path) } + } catch (e: Exception) { + log.error(e) { "Failed to delete blob at path=$path. Cause: ${e.message}" } + } + } + + override fun close() { + /* Do nothing */ + } +} + +@Singleton +@Requires(beans = [MSSQLBulkLoadConfiguration::class]) +class MSSQLBulkLoaderFactory( + private val azureBlobClient: AzureBlobClient, + private val catalog: DestinationCatalog, + private val config: MSSQLConfiguration, + private val bulkLoadConfig: MSSQLBulkLoadConfiguration, + private val streamStateStore: StreamStateStore +) : BulkLoaderFactory { + override val numPartWorkers: Int = config.numProcessRecordsWorkers + override val numUploadWorkers: Int = config.numProcessBatchWorkers + override val maxNumConcurrentLoads: Int = 1 + + override val objectSizeBytes: Long = 200L * 1024 * 1024 + override val partSizeBytes: Long = 10L * 1024 * 1024 + + // The default batch state is PERSISTED, but orphaned objects are not + // currently recovered, so treat uploads as merely STAGED. + override val batchStateOnUpload: Batch.State = Batch.State.STAGED + + private val defaultSchema = config.schema + + override fun create(key: StreamKey): BulkLoader { + val stream = catalog.getStream(key.stream) + val mssqlBulkLoadHandler = + MSSQLBulkLoadHandler( + streamStateStore[key.stream]!!.dataSource, + stream.descriptor.namespace ?: defaultSchema, + stream.descriptor.name, + bulkLoadConfig.dataSource, + MSSQLQueryBuilder(config.schema, stream) + ) + return MSSQLBulkLoader( + azureBlobClient, + stream, + mssqlBulkLoadHandler, + streamStateStore[key.stream]!!.formatFilePath + ) + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriter.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriter.kt index 17385294d3a54..18c660b5558df 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriter.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLWriter.kt @@ -8,6 +8,7 @@ import io.airbyte.cdk.load.command.DestinationStream import io.airbyte.cdk.load.state.DestinationFailure import io.airbyte.cdk.load.write.DestinationWriter import io.airbyte.cdk.load.write.StreamLoader +import io.airbyte.cdk.load.write.StreamStateStore import io.airbyte.integrations.destination.mssql.v2.config.AzureBlobStorageClientCreator import io.airbyte.integrations.destination.mssql.v2.config.BulkLoadConfiguration import io.airbyte.integrations.destination.mssql.v2.config.InsertLoadTypeConfiguration @@ -22,7 +23,8 @@ class MSSQLWriter( private val config: MSSQLConfiguration, private val dataSourceFactory: MSSQLDataSourceFactory, @Value("\${airbyte.destination.core.record-batch-size-override:null}") - private val recordBatchSizeOverride: Long? = null + private val recordBatchSizeOverride: Long?
= null, + private val streamStateStore: StreamStateStore ) : DestinationWriter { /** Lazily initialized when [setup] is called. */ @@ -41,6 +43,7 @@ class MSSQLWriter( // Pick which loader to use based on the load type configuration return when (val loadConfig = config.mssqlLoadTypeConfiguration.loadTypeConfiguration) { is BulkLoadConfiguration -> { + // TODO: Drop this dummy object after migrating the Insert Load Type MSSQLBulkLoadStreamLoader( stream = stream, dataSource = dataSourceNotNull, @@ -50,7 +53,8 @@ class MSSQLWriter( azureBlobClient = AzureBlobStorageClientCreator.createAzureBlobClient(loadConfig), validateValuesPreLoad = loadConfig.validateValuesPreLoad ?: false, - recordBatchSizeOverride = recordBatchSizeOverride + recordBatchSizeOverride = recordBatchSizeOverride, + streamStateStore = streamStateStore ) } is InsertLoadTypeConfiguration -> { diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLBulkLoadConfiguration.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLBulkLoadConfiguration.kt new file mode 100644 index 0000000000000..fa8e879418336 --- /dev/null +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLBulkLoadConfiguration.kt @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2024 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.destination.mssql.v2.config + +import io.airbyte.cdk.load.command.azureBlobStorage.AzureBlobStorageConfiguration +import io.airbyte.cdk.load.command.azureBlobStorage.AzureBlobStorageConfigurationProvider +import io.airbyte.cdk.load.command.object_storage.MSSQLCSVFormatConfiguration +import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration +import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfigurationProvider +import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfiguration +import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider +import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfiguration +import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfigurationProvider +import io.airbyte.cdk.load.file.NoopProcessor +import io.micronaut.context.annotation.Requires +import io.micronaut.context.condition.Condition +import io.micronaut.context.condition.ConditionContext +import jakarta.inject.Singleton +import java.io.ByteArrayOutputStream + +class MSSQLIsConfiguredForBulkLoad : Condition { + override fun matches(context: ConditionContext<*>): Boolean { + val config = context.beanContext.getBean(MSSQLConfiguration::class.java) + return config.mssqlLoadTypeConfiguration.loadTypeConfiguration is BulkLoadConfiguration + } +} + +@Singleton +@Requires(condition = MSSQLIsConfiguredForBulkLoad::class) +class MSSQLBulkLoadConfiguration( + private val config: MSSQLConfiguration, +) : + ObjectStoragePathConfigurationProvider, + ObjectStorageFormatConfigurationProvider, + ObjectStorageCompressionConfigurationProvider, + AzureBlobStorageConfigurationProvider { + + // Cast is guaranteed to succeed by the `Requires` guard. 
+ private val bulkLoadConfig = + config.mssqlLoadTypeConfiguration.loadTypeConfiguration as BulkLoadConfiguration + + val dataSource: String = bulkLoadConfig.bulkLoadDataSource + override val objectStoragePathConfiguration = + ObjectStoragePathConfiguration( + prefix = "blob", + pathPattern = "\${NAMESPACE}/\${STREAM_NAME}/\${YEAR}/\${MONTH}/\${DAY}/\${EPOCH}/", + fileNamePattern = "{part_number}{format_extension}", + ) + override val objectStorageFormatConfiguration: ObjectStorageFormatConfiguration = + MSSQLCSVFormatConfiguration( + validateValuesPreLoad = bulkLoadConfig.validateValuesPreLoad == true + ) + override val objectStorageCompressionConfiguration: + ObjectStorageCompressionConfiguration = + ObjectStorageCompressionConfiguration(NoopProcessor) + override val azureBlobStorageConfiguration: AzureBlobStorageConfiguration = + AzureBlobStorageConfiguration( + accountName = bulkLoadConfig.accountName, + containerName = bulkLoadConfig.containerName, + sharedAccessSignature = bulkLoadConfig.sharedAccessSignature, + ) +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt index b2ee79c22ac58..56dc82d406a0a 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLConfiguration.kt @@ -22,9 +22,9 @@ data class MSSQLConfiguration( val jdbcUrlParams: String?, val sslMethod: EncryptionMethod, override val mssqlLoadTypeConfiguration: MSSQLLoadTypeConfiguration, + override val numProcessRecordsWorkers: Int, + override val numProcessBatchWorkers: Int ) : DestinationConfiguration(), MSSQLLoadTypeConfigurationProvider { - override val numProcessRecordsWorkers = 1 - override val numProcessBatchWorkers: Int = 1 override val processEmptyFiles: Boolean = true override val recordBatchSizeBytes = ObjectStorageUploadConfiguration.DEFAULT_PART_SIZE_BYTES } @@ -58,7 +58,9 @@ class MSSQLConfigurationFactory(private val featureFlags: Set) : password = overrides.getOrDefault("password", spec.password), jdbcUrlParams = overrides.getOrDefault("jdbcUrlParams", spec.jdbcUrlParams), sslMethod = spec.sslMethod, - mssqlLoadTypeConfiguration = spec.toLoadConfiguration() + mssqlLoadTypeConfiguration = spec.toLoadConfiguration(), + numProcessRecordsWorkers = spec.numPartWorkers ?: 2, + numProcessBatchWorkers = spec.numObjectLoaders ?: 10 ) } } diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLSpecification.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLSpecification.kt index 23659024def21..e67c51e136ac5 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLSpecification.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/main/kotlin/io/airbyte/integrations/destination/mssql/v2/config/MSSQLSpecification.kt @@ -78,6 +78,12 @@ class MSSQLSpecification : ConfigurationSpecification(), LoadTypeSpecification { @get:JsonProperty("load_type") @get:JsonSchemaInject(json = """{"always_show": 
true,"order":8}""") override val loadType: LoadType = InsertLoadSpecification() + + @get:JsonProperty("numPartWorkers") + val numPartWorkers: Int? = null + + @get:JsonProperty("numObjectLoaders") + val numObjectLoaders: Int? = null } @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "name") diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoadCheckTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoaderCheckTest.kt similarity index 95% rename from airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoadCheckTest.kt rename to airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoaderCheckTest.kt index 192b6ea3a85d8..6446fbc75fb41 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoadCheckTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoaderCheckTest.kt @@ -10,7 +10,7 @@ import io.airbyte.cdk.load.test.util.FakeConfigurationUpdater import io.airbyte.integrations.destination.mssql.v2.config.MSSQLSpecification import java.nio.file.Path -class MSSQLBulkLoadCheckTest : +class MSSQLBulkLoaderCheckTest : CheckIntegrationTest( successConfigFilenames = listOf( diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLPerformanceTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLPerformanceTest.kt index e2f8ae9891ea7..853edf4ee7e76 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLPerformanceTest.kt +++ b/airbyte-integrations/connectors/destination-mssql-v2/src/test-integration/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLPerformanceTest.kt @@ -110,4 +110,9 @@ class MSSQLBulkInsertPerformanceTest : getConfiguration = { spec, _ -> MSSQLConfigurationFactory().makeWithOverrides(spec as MSSQLSpecification, emptyMap()) }, - ) + ) { + @Test + override fun testInsertRecordsWithManyColumns() { + super.testInsertRecordsWithManyColumns() + } +} diff --git a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoadHandlerTest.kt b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoaderHandlerTest.kt similarity index 99% rename from airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoadHandlerTest.kt rename to airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoaderHandlerTest.kt index 41d9734024009..78f7adf9c4b13 100644 --- a/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoadHandlerTest.kt +++ 
b/airbyte-integrations/connectors/destination-mssql-v2/src/test/kotlin/io/airbyte/integrations/destination/mssql/v2/MSSQLBulkLoaderHandlerTest.kt @@ -21,7 +21,7 @@ import org.junit.jupiter.api.Assertions.fail import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test -class MSSQLBulkLoadHandlerTest { +class MSSQLBulkLoaderHandlerTest { private lateinit var dataSource: DataSource private lateinit var connection: Connection