Load-CDK: Object-Storage: WriteOp override for cloud perf experiments #55810

Open · wants to merge 5 commits into base: master
@@ -14,9 +14,9 @@ import io.airbyte.cdk.load.message.MultiProducerChannel
import io.airbyte.cdk.load.message.PartitionedQueue
import io.airbyte.cdk.load.message.PipelineEvent
import io.airbyte.cdk.load.message.StreamKey
import io.airbyte.cdk.load.message.StrictPartitionedQueue
import io.airbyte.cdk.load.pipeline.BatchUpdate
import io.airbyte.cdk.load.state.ReservationManager
import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage
import io.airbyte.cdk.load.write.LoadStrategy
@@ -120,8 +120,8 @@ class SyncBeanFactory {
@Named("recordQueue")
fun recordQueue(
loadStrategy: LoadStrategy? = null,
): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>> {
return PartitionedQueue(
): PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>> {
return StrictPartitionedQueue(
Array(loadStrategy?.inputPartitions ?: 1) {
ChannelMessageQueue(Channel(Channel.UNLIMITED))
}
@@ -5,28 +5,61 @@
package io.airbyte.cdk.load.message

import io.airbyte.cdk.load.util.CloseableCoroutine
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.merge

class PartitionedQueue<T>(private val queues: Array<MessageQueue<T>>) : CloseableCoroutine {
val partitions = queues.size
interface PartitionedQueue<T> : CloseableCoroutine {
val partitions: Int
fun consume(partition: Int): Flow<T>
suspend fun publish(value: T, partition: Int)
suspend fun broadcast(value: T)
}

class StrictPartitionedQueue<T>(private val queues: Array<MessageQueue<T>>) : PartitionedQueue<T> {
override val partitions = queues.size

fun consume(partition: Int): Flow<T> {
if (partition < 0 || partition >= queues.size) {
override fun consume(partition: Int): Flow<T> {
if (partition < 0 || partition >= partitions) {
throw IllegalArgumentException("Invalid partition: $partition")
}
return queues[partition].consume()
}

suspend fun publish(value: T, partition: Int) {
if (partition < 0 || partition >= queues.size) {
override suspend fun publish(value: T, partition: Int) {
if (partition < 0 || partition >= partitions) {
throw IllegalArgumentException("Invalid partition: $partition")
}
queues[partition].publish(value)
}

suspend fun broadcast(value: T) = queues.forEach { it.publish(value) }
override suspend fun broadcast(value: T) = queues.forEach { it.publish(value) }

override suspend fun close() {
queues.forEach { it.close() }
}
}
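
// Illustrative usage sketch, not part of this diff: constructing a two-partition
// StrictPartitionedQueue the same way the recordQueue bean in SyncBeanFactory does.
// The String payload type and the function name are hypothetical.
suspend fun strictPartitionedQueueSketch() {
    val queue: PartitionedQueue<String> =
        StrictPartitionedQueue(Array(2) { ChannelMessageQueue(Channel<String>(Channel.UNLIMITED)) })

    queue.publish("routed to partition 0 only", 0) // partitioned work
    queue.broadcast("delivered to every partition") // e.g. a control message
    // Each worker consumes exactly one partition: queue.consume(p).collect { ... }
    queue.close()
}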

/**
* This is for the use case where you want workers to grab work as it becomes available but still be
* able to receive notifications that are guaranteed to be consumed by every partition.
*/
class SinglePartitionQueueWithMultiPartitionBroadcast<T>(
private val sharedQueue: MessageQueue<T>,
override val partitions: Int
) : PartitionedQueue<T> {
private val broadcastChannels =
StrictPartitionedQueue(
(0 until partitions).map { ChannelMessageQueue<T>(Channel(1)) }.toTypedArray()
)

override fun consume(partition: Int): Flow<T> =
merge(sharedQueue.consume(), broadcastChannels.consume(partition))
override suspend fun publish(value: T, partition: Int) = sharedQueue.publish(value)
override suspend fun broadcast(value: T) = broadcastChannels.broadcast(value)

override suspend fun close() {
sharedQueue.close()
broadcastChannels.close()
}
}
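
// Illustrative sketch, not part of this diff, of the shared-queue semantics above:
// publish() lands on the single shared queue (whichever worker consumes first gets
// the value), while broadcast() reaches every partition. Names are hypothetical.
suspend fun sharedQueueWithBroadcastSketch() {
    val queue =
        SinglePartitionQueueWithMultiPartitionBroadcast(
            sharedQueue = ChannelMessageQueue(Channel<String>(Channel.UNLIMITED)),
            partitions = 2,
        )
    queue.publish("one worker takes this", 0) // the partition argument is ignored
    queue.broadcast("every partition sees this") // e.g. an end-of-stream signal
    queue.close()
}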
@@ -10,15 +10,19 @@ import io.airbyte.cdk.load.state.CheckpointId
/** Used internally by the CDK to pass messages between steps in the loader pipeline. */
sealed interface PipelineEvent<K : WithStream, T>

/**
* A message that contains a keyed payload. The key is used to manage the state of the payload's
* corresponding [io.airbyte.cdk.load.pipeline.BatchAccumulator]. [checkpointCounts] is used by the
* CDK to perform state message bookkeeping. [postProcessingCallback] is for releasing resources
* associated with the message.
*/
class PipelineMessage<K : WithStream, T>(
val checkpointCounts: Map<CheckpointId, Long>,
val key: K,
val value: T
val value: T,
val postProcessingCallback: suspend () -> Unit = {},
) : PipelineEvent<K, T>

/**
* We send the end message on the stream and not the key, because there's no way to partition an
* empty message.
*/
/** Broadcast at end-of-stream to all partitions to signal that the stream has ended. */
class PipelineEndOfStream<K : WithStream, T>(val stream: DestinationStream.Descriptor) :
PipelineEvent<K, T>
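
// Illustrative sketch, not part of this diff: building the two event types above.
// The parameter names are hypothetical; the record path in DefaultInputConsumerTask
// further down in this PR constructs the same shapes.
fun pipelineEventSketch(
    key: StreamKey,
    record: DestinationRecordRaw,
    checkpointId: CheckpointId,
    releaseMemory: suspend () -> Unit,
): List<PipelineEvent<StreamKey, DestinationRecordRaw>> =
    listOf(
        // One record counted against one checkpoint; the callback frees resources
        // (e.g. reserved memory) once the message has been handled downstream.
        PipelineMessage(mapOf(checkpointId to 1L), key, record) { releaseMemory() },
        // Broadcast to every partition so each worker can finalize its state.
        PipelineEndOfStream<StreamKey, DestinationRecordRaw>(key.stream),
    )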
@@ -7,11 +7,58 @@ package io.airbyte.cdk.load.pipeline
import io.airbyte.cdk.load.message.WithStream

/**
* [BatchAccumulator] is used internally by the CDK to implement RecordLoaders. Connector devs
* should never need to implement this interface.
* [BatchAccumulator] is used internally by the CDK to implement
* [io.airbyte.cdk.load.write.LoadStrategy]s. Connector devs should never need to implement this
* interface.
*
* It is the glue that connects a specific step in a specific pipeline to the generic pipeline on
* the back end. (For example, in a three-stage pipeline like bulk load, step 1 is to create a part,
* step 2 is to upload it, and step 3 is to load it from object storage into a table.)
*
* - [S] is a state type that will be threaded through accumulator calls.
* - [K] is a key type associated with the input data. (NOTE: Currently, there is no support for
* key-mapping, so the key is always [io.airbyte.cdk.load.message.StreamKey]). Specifically, state
* will always be managed per-key.
* - [T] is the input data type
* - [U] is the output data type
*
* The first time data is seen for a given key, [start] is called (with the partition number). The
* state returned by [start] is then passed to [accept] along with each input.
*
* If [accept] returns [IntermediateOutput] or [FinalOutput], the output will be forwarded to the
* next stage (if applicable) and/or trigger bookkeeping (iff the output type implements
* [io.airbyte.cdk.load.message.WithBatchState]).
*
* If [accept] returns a [NoOutput] or [IntermediateOutput], the nextState will be passed to the
* next call to [accept]. If [accept] returns a [FinalOutput] state, any state will be discarded and
* a new one will be created on the next input by a new call to [start].
*
* When the input stream is exhausted, [finish] will be called with any remaining state iff at least
* one input was seen for that key. This means that [finish] will not be called on empty keys or on
* keys whose last call to [accept] returned a [FinalOutput] (i.e., left no remaining state).
*/
interface BatchAccumulator<S, K : WithStream, T, U> {
fun start(key: K, part: Int): S
fun accept(record: T, state: S): Pair<S, U?>
fun finish(state: S): U
suspend fun start(key: K, part: Int): S
suspend fun accept(input: T, state: S): BatchAccumulatorResult<S, U>
suspend fun finish(state: S): FinalOutput<S, U>
}

sealed interface BatchAccumulatorResult<S, U> {
val nextState: S?
val output: U?
}

data class NoOutput<S, U>(
override val nextState: S,
) : BatchAccumulatorResult<S, U> {
override val output: U? = null
}

data class IntermediateOutput<S, U>(
override val nextState: S,
override val output: U,
) : BatchAccumulatorResult<S, U>

data class FinalOutput<S, U>(override val output: U) : BatchAccumulatorResult<S, U> {
override val nextState: S? = null
}
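
// Illustrative (hypothetical) accumulator, not part of this PR, showing how the
// result types above drive the documented state machine. It buffers records per
// key and emits a batch every `maxPerBatch` inputs.
// (Assumes imports of DestinationRecordRaw and StreamKey from io.airbyte.cdk.load.message.)
class CountingBatchAccumulator(private val maxPerBatch: Int) :
    BatchAccumulator<
        MutableList<DestinationRecordRaw>, StreamKey, DestinationRecordRaw, List<DestinationRecordRaw>
    > {

    // Called the first time a record is seen for a key on this partition.
    override suspend fun start(key: StreamKey, part: Int): MutableList<DestinationRecordRaw> =
        mutableListOf()

    override suspend fun accept(
        input: DestinationRecordRaw,
        state: MutableList<DestinationRecordRaw>,
    ): BatchAccumulatorResult<MutableList<DestinationRecordRaw>, List<DestinationRecordRaw>> {
        state.add(input)
        return if (state.size >= maxPerBatch) {
            // Output emitted and state discarded; the next record triggers start() again.
            FinalOutput(state.toList())
        } else {
            // No output yet; this state is handed to the next accept() call.
            NoOutput(state)
        }
    }

    // Called at end-of-stream only for keys that saw at least one record and whose
    // last accept() did not already return a FinalOutput.
    override suspend fun finish(
        state: MutableList<DestinationRecordRaw>
    ): FinalOutput<MutableList<DestinationRecordRaw>, List<DestinationRecordRaw>> =
        FinalOutput(state.toList())
}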
@@ -4,46 +4,31 @@

package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.message.PartitionedQueue
import io.airbyte.cdk.load.message.PipelineEvent
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.StreamKey
import io.airbyte.cdk.load.state.Reserved
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTaskFactory
import io.airbyte.cdk.load.write.DirectLoader
import io.airbyte.cdk.load.write.DirectLoaderFactory
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton

@Singleton
@Requires(bean = DirectLoaderFactory::class)
class DirectLoadPipelineStep<S : DirectLoader>(
val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
@Named("recordQueue")
val inputQueue: PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
@Value("\${airbyte.destination.core.record-batch-size-override:null}")
val batchSizeOverride: Long? = null,
val directLoaderFactory: DirectLoaderFactory<S>,
val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
val taskFactory: LoadPipelineStepTaskFactory,
) : LoadPipelineStep {
private val log = KotlinLogging.logger {}

override val numWorkers: Int = directLoaderFactory.inputPartitions

override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
log.info { "Creating DirectLoad pipeline step task for partition $partition" }
return LoadPipelineStepTask(
return taskFactory.createOnlyStep<S, StreamKey, DirectLoadAccResult>(
accumulator,
inputQueue.consume(partition),
batchUpdateQueue = batchQueue,
outputPartitioner = null,
outputQueue = null as PartitionedQueue<PipelineEvent<StreamKey, DirectLoadAccResult>>?,
batchSizeOverride?.let { RecordCountFlushStrategy(it) },
partition
partition,
numWorkers
)
}
}
@@ -26,21 +26,24 @@ data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState
class DirectLoadRecordAccumulator<S : DirectLoader, K : WithStream>(
val directLoaderFactory: DirectLoaderFactory<S>
) : BatchAccumulator<S, K, DestinationRecordRaw, DirectLoadAccResult> {
override fun start(key: K, part: Int): S {
override suspend fun start(key: K, part: Int): S {
return directLoaderFactory.create(key.stream, part)
}

override fun accept(record: DestinationRecordRaw, state: S): Pair<S, DirectLoadAccResult?> {
state.accept(record).let {
override suspend fun accept(
input: DestinationRecordRaw,
state: S
): BatchAccumulatorResult<S, DirectLoadAccResult> {
state.accept(input).let {
return when (it) {
is Incomplete -> Pair(state, null)
is Complete -> Pair(state, DirectLoadAccResult(Batch.State.COMPLETE))
is Incomplete -> NoOutput(state)
is Complete -> FinalOutput(DirectLoadAccResult(Batch.State.COMPLETE))
}
}
}

override fun finish(state: S): DirectLoadAccResult {
override suspend fun finish(state: S): FinalOutput<S, DirectLoadAccResult> {
state.finish()
return DirectLoadAccResult(Batch.State.COMPLETE)
return FinalOutput(DirectLoadAccResult(Batch.State.COMPLETE))
}
}
@@ -17,6 +17,9 @@ interface InputPartitioner {
fun getPartition(record: DestinationRecordRaw, numParts: Int): Int
}

/**
* The default input partitioner, which partitions by the stream name. TODO: Should be round-robin?
*/
@Singleton
@Secondary
class ByStreamInputPartitioner : InputPartitioner {
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.pipeline

import io.airbyte.cdk.load.message.DestinationRecordRaw
import kotlin.random.Random

/**
* Declare a singleton of this type to have input distributed evenly across the input partitions.
* (The default is [ByStreamInputPartitioner].)
*
* [rotateEveryNRecords] determines how often to rotate to the next partition. In testing, 10_000
* seems to be the sweet spot between too much context switching and not enough load balancing.
*/
open class RoundRobinInputPartitioner(private val rotateEveryNRecords: Int = 10_000) :
InputPartitioner {
private var nextPartition =
Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE / rotateEveryNRecords) *
rotateEveryNRecords

override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
val part = nextPartition++ / rotateEveryNRecords
return Math.floorMod(part, numParts)
}
}
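
// Quick sanity check of the rotation arithmetic, as a standalone sketch (not part
// of this PR). With a rotation interval of 3 and 2 partitions, consecutive records
// land on partitions 0,0,0,1,1,1,... offset by the random starting value (fixed to
// 0 here for readability).
fun main() {
    val rotateEveryNRecords = 3
    val numParts = 2
    var nextPartition = 0 // RoundRobinInputPartitioner seeds this with a random offset
    val assignments =
        (1..12).map { Math.floorMod(nextPartition++ / rotateEveryNRecords, numParts) }
    println(assignments) // [0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1]
}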
@@ -145,7 +145,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
// New interface shim
@Named("recordQueue")
private val recordQueueForPipeline:
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
@Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate>,
private val loadPipeline: LoadPipeline?,
private val partitioner: InputPartitioner,
@@ -41,6 +41,7 @@ class DefaultTeardownTask(
return
}

log.info { "All streams processed successfully, awaiting all checkpoints flushed" }
checkpointManager.awaitAllCheckpointsFlushed()
log.info { "Starting teardown task" }
destination.teardown()
@@ -80,7 +80,7 @@ class DefaultInputConsumerTask(
// Required by new interface
@Named("recordQueue")
private val recordQueueForPipeline:
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
private val loadPipeline: LoadPipeline? = null,
private val partitioner: InputPartitioner,
private val openStreamQueue: QueueWriter<DestinationStream>
@@ -165,20 +165,20 @@
mapOf(manager.getCurrentCheckpointId() to 1),
StreamKey(stream),
record
)
) { reserved.release() }
val partition = partitioner.getPartition(record, recordQueueForPipeline.partitions)
recordQueueForPipeline.publish(reserved.replace(pipelineMessage), partition)
recordQueueForPipeline.publish(pipelineMessage, partition)
}
is DestinationRecordStreamComplete -> {
manager.markEndOfStream(true)
log.info { "Read COMPLETE for stream $stream" }
recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream)))
recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
reserved.release()
}
is DestinationRecordStreamIncomplete -> {
manager.markEndOfStream(false)
log.info { "Read INCOMPLETE for stream $stream" }
recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream)))
recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
reserved.release()
}
is DestinationFile -> {
@@ -309,8 +309,7 @@ interface InputConsumerTaskFactory {
fileTransferQueue: MessageQueue<FileTransferQueueMessage>,

// Required by new interface
recordQueueForPipeline:
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
recordQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
loadPipeline: LoadPipeline?,
partitioner: InputPartitioner,
openStreamQueue: QueueWriter<DestinationStream>,
@@ -332,8 +331,7 @@ class DefaultInputConsumerTaskFactory(
fileTransferQueue: MessageQueue<FileTransferQueueMessage>,

// Required by new interface
recordQueueForPipeline:
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
recordQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
loadPipeline: LoadPipeline?,
partitioner: InputPartitioner,
openStreamQueue: QueueWriter<DestinationStream>,