From 1b72de187a02ff06539f4a0bb18ad3ef9c5b49f6 Mon Sep 17 00:00:00 2001
From: Johnny Schmidt <john.schmidt@airbyte.io>
Date: Fri, 7 Mar 2025 09:11:23 -0800
Subject: [PATCH 1/2] S3 Destination Uses New Load CDK Interface (temporarily
 disabled)

---
 .../cdk/load/config/SyncBeanFactory.kt        |   3 +-
 .../airbyte/cdk/load/message/PipelineEvent.kt |  14 +-
 .../cdk/load/pipeline/BatchAccumulator.kt     |  37 ++-
 .../load/pipeline/DirectLoadPipelineStep.kt   |   4 +-
 .../pipeline/DirectLoadRecordAccumulator.kt   |  14 +-
 .../cdk/load/pipeline/InputPartitioner.kt     |   3 +
 .../pipeline/RoundRobinInputPartitioner.kt    |  29 +++
 .../cdk/load/task/DestinationTaskLauncher.kt  |   2 +-
 .../load/task/internal/InputConsumerTask.kt   |  14 +-
 .../task/internal/LoadPipelineStepTask.kt     |  77 +++++--
 .../pipeline/RoundRobinPartitionerTest.kt     |  34 +++
 .../load/task/DestinationTaskLauncherTest.kt  |   2 +-
 .../load/task/DestinationTaskLauncherUTest.kt |   2 +-
 .../task/internal/InputConsumerTaskUTest.kt   |   2 +-
 .../internal/LoadPipelineStepTaskUTest.kt     | 217 +++++++++---------
 .../load/file/object_storage/PartFactory.kt   |  12 +-
 .../ObjectLoaderPartPartitioner.kt            |  40 ++++
 .../object_storage/ObjectLoaderPartQueue.kt   |  62 +++++
 .../object_storage/ObjectLoaderPartStep.kt    |  48 ++++
 .../ObjectLoaderPartToObjectAccumulator.kt    |  86 +++++++
 .../object_storage/ObjectLoaderPipeline.kt    |  15 ++
 .../ObjectLoaderRecordToPartAccumulator.kt    | 115 ++++++++++
 .../object_storage/ObjectLoaderUploadStep.kt  |  46 ++++
 .../load/write/object_storage/ObjectLoader.kt |  67 ++++++
 .../ObjectLoaderPartPartitionerTest.kt        |  49 ++++
 .../connectors/destination-s3/metadata.yaml   |   2 +-
 .../src/main/kotlin/S3V2Configuration.kt      |   9 +-
 .../src/main/kotlin/S3V2ObjectLoader.kt       |  25 ++
 docs/integrations/destinations/s3.md          |   1 +
 29 files changed, 859 insertions(+), 172 deletions(-)
 create mode 100644 airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt
 create mode 100644 airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinPartitionerTest.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartStep.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderRecordToPartAccumulator.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/pipeline/object_storage/ObjectLoaderPartPartitionerTest.kt
 create mode 100644 airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2ObjectLoader.kt

diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt
index 07b2d90bfd912..b45002fc0270e 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt
@@ -16,7 +16,6 @@ import io.airbyte.cdk.load.message.PipelineEvent
 import io.airbyte.cdk.load.message.StreamKey
 import io.airbyte.cdk.load.pipeline.BatchUpdate
 import io.airbyte.cdk.load.state.ReservationManager
-import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.task.implementor.FileAggregateMessage
 import io.airbyte.cdk.load.task.implementor.FileTransferQueueMessage
 import io.airbyte.cdk.load.write.LoadStrategy
@@ -120,7 +119,7 @@ class SyncBeanFactory {
     @Named("recordQueue")
     fun recordQueue(
         loadStrategy: LoadStrategy? = null,
-    ): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>> {
+    ): PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>> {
         return PartitionedQueue(
             Array(loadStrategy?.inputPartitions ?: 1) {
                 ChannelMessageQueue(Channel(Channel.UNLIMITED))
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt
index 9152fa9a94b1a..fa3bb57a9d817 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PipelineEvent.kt
@@ -10,15 +10,19 @@ import io.airbyte.cdk.load.state.CheckpointId
 /** Used internally by the CDK to pass messages between steps in the loader pipeline. */
 sealed interface PipelineEvent<K : WithStream, T>
 
+/**
+ * A message that contains a keyed payload. The key is used to manage the state of the payload's
+ * corresponding [io.airbyte.cdk.load.pipeline.BatchAccumulator]. [checkpointCounts] is used by the
+ * CDK to perform state message bookkeeping. [postProcessingCallback] is for releasing resources
+ * associated with the message.
+ */
 class PipelineMessage<K : WithStream, T>(
     val checkpointCounts: Map<CheckpointId, Long>,
     val key: K,
-    val value: T
+    val value: T,
+    val postProcessingCallback: suspend () -> Unit = {},
 ) : PipelineEvent<K, T>
 
-/**
- * We send the end message on the stream and not the key, because there's no way to partition an
- * empty message.
- */
+/** Broadcast at end-of-stream to all partitions to signal that the stream has ended. */
 class PipelineEndOfStream<K : WithStream, T>(val stream: DestinationStream.Descriptor) :
     PipelineEvent<K, T>
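
The new `postProcessingCallback` parameter lets the producer attach cleanup (e.g. releasing a memory reservation) that runs after the consuming step has handled the payload — see the `InputConsumerTask` changes below. A minimal sketch of constructing such a message; the record payload and the release hook are stand-ins:

```kotlin
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.message.PipelineMessage
import io.airbyte.cdk.load.message.StreamKey
import io.airbyte.cdk.load.state.CheckpointId

// Sketch only: `record` and `release` are placeholders for the payload and
// whatever resource-release hook the producer wants invoked post-processing.
fun <T> keyedMessage(record: T, release: suspend () -> Unit) =
    PipelineMessage(
        checkpointCounts = mapOf(CheckpointId(0) to 1L),
        key = StreamKey(DestinationStream.Descriptor("namespace", "stream")),
        value = record,
        postProcessingCallback = release,
    )
```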
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt
index fe2f03dcc6f71..1713eab1da797 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/BatchAccumulator.kt
@@ -7,11 +7,38 @@ package io.airbyte.cdk.load.pipeline
 import io.airbyte.cdk.load.message.WithStream
 
 /**
- * [BatchAccumulator] is used internally by the CDK to implement RecordLoaders. Connector devs
- * should never need to implement this interface.
+ * [BatchAccumulator] is used internally by the CDK to implement
+ * [io.airbyte.cdk.load.write.LoadStrategy]s. Connector devs should never need to implement this
+ * interface.
+ *
+ * It is the glue that connects a specific step in a specific pipeline to the generic pipeline on
+ * the back end. (For example, in a three-stage pipeline like bulk load, step 1 is to create a part,
+ * step 2 is to upload it, and step 3 is to load it from object storage into a table.)
+ *
+ * - [S] is a state type that will be threaded through accumulator calls.
+ * - [K] is a key type associated with the input data. (NOTE: Currently, there is no support for
+ * key-mapping, so the key is always [io.airbyte.cdk.load.message.StreamKey]). Specifically, state
+ * will always be managed per-key.
+ * - [T] is the input data type
+ * - [U] is the output data type
+ *
+ * The first time data is seen for a given key, [start] is called (with the partition number). The
+ * state returned by [start] is passed to the first call to [accept].
+ *
+ * If [accept] returns a non-null output, that output will be forwarded to the next stage (if
+ * applicable) and/or trigger bookkeeping (iff the output type implements
+ * [io.airbyte.cdk.load.message.WithBatchState]).
+ *
+ * If [accept] returns a non-null state, that state will be passed to the next call to [accept]. If
+ * [accept] returns a null state, the state will be discarded and a new one will be created on the
+ * next input by a new call to [start].
+ *
+ * When the input stream is exhausted, [finish] will be called with any remaining state iff at least
+ * one input was seen for that key. This means that [finish] will not be called on empty keys or on
+ * keys where the last call to [accept] yielded a null (finished) state.
  */
 interface BatchAccumulator<S, K : WithStream, T, U> {
-    fun start(key: K, part: Int): S
-    fun accept(record: T, state: S): Pair<S, U?>
-    fun finish(state: S): U
+    suspend fun start(key: K, part: Int): S
+    suspend fun accept(input: T, state: S): Pair<S?, U?>
+    suspend fun finish(state: S): U
 }
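
A minimal sketch of the contract described above — a hypothetical accumulator (not part of this patch) that buffers strings per key and emits a joined batch every `maxSize` inputs. Returning `Pair(null, output)` discards the state, so the next input for the key triggers `start` again:

```kotlin
import io.airbyte.cdk.load.message.WithStream
import io.airbyte.cdk.load.pipeline.BatchAccumulator

// State is AutoCloseable because LoadPipelineStepTask requires that of S.
class BufferState : AutoCloseable {
    val lines = mutableListOf<String>()
    override fun close() {} // nothing to release in this toy example
}

class JoiningAccumulator<K : WithStream>(private val maxSize: Int) :
    BatchAccumulator<BufferState, K, String, String> {
    override suspend fun start(key: K, part: Int) = BufferState()

    override suspend fun accept(input: String, state: BufferState): Pair<BufferState?, String?> {
        state.lines.add(input)
        // Full buffer: emit an output and discard the state (Pair(null, output)).
        // Otherwise: keep accumulating (Pair(state, null)).
        return if (state.lines.size >= maxSize) null to state.lines.joinToString("\n")
        else state to null
    }

    override suspend fun finish(state: BufferState) = state.lines.joinToString("\n")
}
```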
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt
index c611e103458ce..d553fed35d026 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt
@@ -9,7 +9,6 @@ import io.airbyte.cdk.load.message.PartitionedQueue
 import io.airbyte.cdk.load.message.PipelineEvent
 import io.airbyte.cdk.load.message.QueueWriter
 import io.airbyte.cdk.load.message.StreamKey
-import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
 import io.airbyte.cdk.load.write.DirectLoader
 import io.airbyte.cdk.load.write.DirectLoaderFactory
@@ -24,8 +23,7 @@ import jakarta.inject.Singleton
 class DirectLoadPipelineStep<S : DirectLoader>(
     val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
     @Named("recordQueue")
-    val inputQueue:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+    val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
     @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
     @Value("\${airbyte.destination.core.record-batch-size-override:null}")
     val batchSizeOverride: Long? = null,
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt
index e1f964919da8c..5752e0e3b7309 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt
@@ -26,23 +26,23 @@ data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState
 class DirectLoadRecordAccumulator<S : DirectLoader, K : WithStream>(
     val directLoaderFactory: DirectLoaderFactory<S>
 ) : BatchAccumulator<S, K, DestinationRecordAirbyteValue, DirectLoadAccResult> {
-    override fun start(key: K, part: Int): S {
+    override suspend fun start(key: K, part: Int): S {
         return directLoaderFactory.create(key.stream, part)
     }
 
-    override fun accept(
-        record: DestinationRecordAirbyteValue,
+    override suspend fun accept(
+        input: DestinationRecordAirbyteValue,
         state: S
-    ): Pair<S, DirectLoadAccResult?> {
-        state.accept(record).let {
+    ): Pair<S?, DirectLoadAccResult?> {
+        state.accept(input).let {
             return when (it) {
                 is Incomplete -> Pair(state, null)
-                is Complete -> Pair(state, DirectLoadAccResult(Batch.State.COMPLETE))
+                is Complete -> Pair(null, DirectLoadAccResult(Batch.State.COMPLETE))
             }
         }
     }
 
-    override fun finish(state: S): DirectLoadAccResult {
+    override suspend fun finish(state: S): DirectLoadAccResult {
         state.finish()
         return DirectLoadAccResult(Batch.State.COMPLETE)
     }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt
index 19189e7d33a44..d1aa786a88f5a 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt
@@ -17,6 +17,9 @@ interface InputPartitioner {
     fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int
 }
 
+/**
+ * The default input partitioner, which partitions by the stream name. TODO: Should be round-robin?
+ */
 @Singleton
 @Secondary
 class ByStreamInputPartitioner : InputPartitioner {
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt
new file mode 100644
index 0000000000000..f9b4dcf31e970
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipeline
+
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import kotlin.math.abs
+import kotlin.random.Random
+
+/**
+ * Declare a singleton of this type to have input distributed evenly across the input partitions.
+ * (The default is [ByStreamInputPartitioner].)
+ */
+open class RoundRobinInputPartitioner(private val rotateEveryNRecords: Int = 10_000) :
+    InputPartitioner {
+    private var nextPartition =
+        Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE / rotateEveryNRecords) *
+            rotateEveryNRecords
+
+    override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int {
+        val part = nextPartition++ / rotateEveryNRecords
+        return if (part == Int.MIN_VALUE) { // avoid overflow
+            0
+        } else {
+            abs(part) % numParts
+        }
+    }
+}
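
Because the class is open and the default bean is the `@Secondary` `ByStreamInputPartitioner`, a connector opts in by declaring a singleton subclass. A sketch (the class name is illustrative):

```kotlin
import io.airbyte.cdk.load.pipeline.RoundRobinInputPartitioner
import jakarta.inject.Singleton

// Registering this bean replaces the default ByStreamInputPartitioner,
// distributing records across all input partitions in blocks of
// rotateEveryNRecords (10,000 by default).
@Singleton
class MyRoundRobinPartitioner : RoundRobinInputPartitioner()
```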
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt
index 9d67c39e4ebcb..6cd2d48c31482 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt
@@ -145,7 +145,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
     // New interface shim
     @Named("recordQueue")
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
     @Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate>,
     private val loadPipeline: LoadPipeline?,
     private val partitioner: InputPartitioner,
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt
index 0f149c96e890b..3a296117a8284 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt
@@ -80,7 +80,7 @@ class DefaultInputConsumerTask(
     // Required by new interface
     @Named("recordQueue")
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
     private val loadPipeline: LoadPipeline? = null,
     private val partitioner: InputPartitioner,
     private val openStreamQueue: QueueWriter<DestinationStream>
@@ -165,20 +165,20 @@ class DefaultInputConsumerTask(
                         mapOf(manager.getCurrentCheckpointId() to 1),
                         StreamKey(stream),
                         record
-                    )
+                    ) { reserved.release() }
                 val partition = partitioner.getPartition(record, recordQueueForPipeline.partitions)
-                recordQueueForPipeline.publish(reserved.replace(pipelineMessage), partition)
+                recordQueueForPipeline.publish(pipelineMessage, partition)
             }
             is DestinationRecordStreamComplete -> {
                 manager.markEndOfStream(true)
                 log.info { "Read COMPLETE for stream $stream" }
-                recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream)))
+                recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
                 reserved.release()
             }
             is DestinationRecordStreamIncomplete -> {
                 manager.markEndOfStream(false)
                 log.info { "Read INCOMPLETE for stream $stream" }
-                recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream(stream)))
+                recordQueueForPipeline.broadcast(PipelineEndOfStream(stream))
                 reserved.release()
             }
             is DestinationFile -> {
@@ -310,7 +310,7 @@ interface InputConsumerTaskFactory {
 
         // Required by new interface
         recordQueueForPipeline:
-            PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+            PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
         loadPipeline: LoadPipeline?,
         partitioner: InputPartitioner,
         openStreamQueue: QueueWriter<DestinationStream>,
@@ -333,7 +333,7 @@ class DefaultInputConsumerTaskFactory(
 
         // Required by new interface
         recordQueueForPipeline:
-            PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+            PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
         loadPipeline: LoadPipeline?,
         partitioner: InputPartitioner,
         openStreamQueue: QueueWriter<DestinationStream>,
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt
index 3139a44f4f89c..5f5200d10bcf3 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt
@@ -18,7 +18,6 @@ import io.airbyte.cdk.load.pipeline.BatchUpdate
 import io.airbyte.cdk.load.pipeline.OutputPartitioner
 import io.airbyte.cdk.load.pipeline.PipelineFlushStrategy
 import io.airbyte.cdk.load.state.CheckpointId
-import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.task.OnEndOfSync
 import io.airbyte.cdk.load.task.Task
 import io.airbyte.cdk.load.task.TerminalCondition
@@ -34,7 +33,7 @@ data class RangeState<S>(
 /** A long-running task that actually implements a load pipeline step. */
 class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStream, U : Any>(
     private val batchAccumulator: BatchAccumulator<S, K1, T, U>,
-    private val inputFlow: Flow<Reserved<PipelineEvent<K1, T>>>,
+    private val inputFlow: Flow<PipelineEvent<K1, T>>,
     private val batchUpdateQueue: QueueWriter<BatchUpdate>,
     private val outputPartitioner: OutputPartitioner<K1, T, K2, U>?,
     private val outputQueue: PartitionedQueue<PipelineEvent<K2, U>>?,
@@ -44,11 +43,11 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
     override val terminalCondition: TerminalCondition = OnEndOfSync
 
     override suspend fun execute() {
-        inputFlow.fold(mutableMapOf<K1, RangeState<S>>()) { stateStore, reservation ->
+        inputFlow.fold(mutableMapOf<K1, RangeState<S>>()) { stateStore, input ->
             try {
-                when (val input = reservation.value) {
+                when (input) {
                     is PipelineMessage -> {
-                        // Fetch and update the local state associated with the current batch.
+                        // Get or create the accumulator state associated w/ the input key.
                         val state =
                             stateStore
                                 .getOrPut(input.key) {
@@ -57,34 +56,62 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
                                     )
                                 }
                                 .let { it.copy(inputCount = it.inputCount + 1) }
-                        val (newState, output) =
+
+                        // Accumulate the input and get the new state and output.
+                        val (newStateMaybe, outputMaybe) =
                             batchAccumulator.accept(
                                 input.value,
                                 state.state,
                             )
-                        reservation.release() // TODO: Accumulate and release when persisted
+                        // TODO: Make this impossible at the return type level
+                        if (newStateMaybe == null && outputMaybe == null) {
+                            throw IllegalStateException(
+                                "BatchAccumulator must return a new state or an output"
+                            )
+                        }
+
+                        // Update bookkeeping metadata
+                        input
+                            .postProcessingCallback() // TODO: Accumulate and release when persisted
                         input.checkpointCounts.forEach {
                             state.checkpointCounts.merge(it.key, it.value) { old, new -> old + new }
                         }
 
-                        // If the accumulator did not produce a result, check if we should flush.
-                        // If so, use the result of a finish call as the output.
-                        val finalOutput =
-                            output
-                                ?: if (flushStrategy?.shouldFlush(state.inputCount) == true) {
-                                    batchAccumulator.finish(newState)
+                        // Finalize the state and output
+                        val (finalState, finalOutput) =
+                            if (outputMaybe == null) {
+                                // Possibly force an output (and if so, discard the state)
+                                if (flushStrategy?.shouldFlush(state.inputCount) == true) {
+                                    val finalOutput = batchAccumulator.finish(newStateMaybe!!)
+                                    Pair(null, finalOutput)
                                 } else {
-                                    null
+                                    Pair(newStateMaybe, null)
                                 }
+                            } else {
+                                // Otherwise, just use what we were given
+                                Pair(newStateMaybe, outputMaybe)
+                            }
 
-                        if (finalOutput != null) {
-                            // Publish the emitted output and evict the state.
-                            handleOutput(input.key, state.checkpointCounts, finalOutput)
-                            stateStore.remove(input.key)
+                        // Publish the output if there is one & reset the input count
+                        val inputCount =
+                            if (finalOutput != null) {
+                                // Publish the emitted output and evict the state.
+                                handleOutput(input.key, state.checkpointCounts, finalOutput)
+                                state.checkpointCounts.clear()
+                                0
+                            } else {
+                                state.inputCount
+                            }
+
+                        // Update the state if `accept` returned a new state, otherwise evict.
+                        if (finalState != null) {
+                            // If accept returned a new state, update the state store.
+                            stateStore[input.key] =
+                                state.copy(state = finalState, inputCount = inputCount)
                         } else {
-                            // If there's no output yet, just update the local state.
-                            stateStore[input.key] = RangeState(newState, state.checkpointCounts)
+                            stateStore.remove(input.key)
                         }
+
                         stateStore
                     }
                     is PipelineEndOfStream -> {
@@ -92,8 +119,10 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
                         val keysToRemove = stateStore.keys.filter { it.stream == input.stream }
                         keysToRemove.forEach { key ->
                             stateStore.remove(key)?.let { stored ->
-                                val output = batchAccumulator.finish(stored.state)
-                                handleOutput(key, stored.checkpointCounts, output)
+                                if (stored.inputCount > 0) {
+                                    val output = batchAccumulator.finish(stored.state)
+                                    handleOutput(key, stored.checkpointCounts, output)
+                                }
                             }
                         }
 
@@ -122,7 +151,7 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
         // Only publish the output if there's a next step.
         outputQueue?.let {
             val outputKey = outputPartitioner!!.getOutputKey(inputKey, output)
-            val message = PipelineMessage(checkpointCounts, outputKey, output)
+            val message = PipelineMessage(checkpointCounts.toMap(), outputKey, output)
             val outputPart = outputPartitioner.getPart(outputKey, it.partitions)
             it.publish(message, outputPart)
         }
@@ -132,7 +161,7 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
             val update =
                 BatchStateUpdate(
                     stream = inputKey.stream,
-                    checkpointCounts = checkpointCounts,
+                    checkpointCounts = checkpointCounts.toMap(),
                     state = output.state
                 )
             batchUpdateQueue.publish(update)
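
The reworked accept-handling above reduces to four `(state, output)` cases. A schematic restatement (not the actual task code):

```kotlin
// How the task interprets the Pair returned by BatchAccumulator.accept:
fun <S, U> describeAcceptResult(newState: S?, output: U?): String =
    when {
        newState != null && output == null ->
            "keep state (the flush strategy may still force a finish())"
        newState != null && output != null ->
            "publish output, keep state, reset the input count"
        newState == null && output != null ->
            "publish output and evict state; next input for this key calls start()"
        else -> throw IllegalStateException("accept must return a new state or an output")
    }
```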
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinPartitionerTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinPartitionerTest.kt
new file mode 100644
index 0000000000000..e733d3d398447
--- /dev/null
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinPartitionerTest.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipeline
+
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.mockk.mockk
+import kotlin.test.assertEquals
+import org.junit.jupiter.api.Test
+
+class RoundRobinPartitionerTest {
+    @Test
+    fun `partitioner should round-robin`() {
+        val partitioner = RoundRobinInputPartitioner(rotateEveryNRecords = 5)
+        val record = mockk<DestinationRecordAirbyteValue>()
+        val numParts = 3
+        var lastPartition: Int? = null
+        var recordCount = 0
+        repeat(1000) {
+            val partition = partitioner.getPartition(record, numParts)
+            lastPartition?.let { last ->
+                recordCount++
+                if (recordCount == 5) {
+                    recordCount = 0
+                    assertEquals((last + 1) % numParts, partition)
+                } else {
+                    assertEquals(last, partition)
+                }
+            }
+            lastPartition = partition
+        }
+    }
+}
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt
index ce7e08a847dcc..010238be5c0a5 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt
@@ -159,7 +159,7 @@ class DestinationTaskLauncherTest {
             destinationTaskLauncher: DestinationTaskLauncher,
             fileTransferQueue: MessageQueue<FileTransferQueueMessage>,
             recordQueueForPipeline:
-                PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
+                PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
             loadPipeline: LoadPipeline?,
             partitioner: InputPartitioner,
             openStreamQueue: QueueWriter<DestinationStream>,
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt
index 02a85819668fd..adc8920832548 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt
@@ -95,7 +95,7 @@ class DestinationTaskLauncherUTest {
     private val openStreamQueue: MessageQueue<DestinationStream> = mockk(relaxed = true)
 
     private val recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>> =
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>> =
         mockk(relaxed = true)
     private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate> = mockk(relaxed = true)
     private val partitioner: InputPartitioner = mockk(relaxed = true)
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt
index d9fe8a7e3947d..3cf9bbc30c832 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt
@@ -47,7 +47,7 @@ class InputConsumerTaskUTest {
     @MockK lateinit var fileTransferQueue: MessageQueue<FileTransferQueueMessage>
     @MockK
     lateinit var recordQueueForPipeline:
-        PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>
     @MockK lateinit var partitioner: InputPartitioner
     @MockK lateinit var openStreamQueue: QueueWriter<DestinationStream>
 
diff --git a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt
index 99a806360279f..a50a30df6ae33 100644
--- a/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt
+++ b/airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTaskUTest.kt
@@ -16,13 +16,10 @@ import io.airbyte.cdk.load.pipeline.BatchAccumulator
 import io.airbyte.cdk.load.pipeline.BatchStateUpdate
 import io.airbyte.cdk.load.pipeline.BatchUpdate
 import io.airbyte.cdk.load.state.CheckpointId
-import io.airbyte.cdk.load.state.Reserved
 import io.airbyte.cdk.load.util.setOnce
 import io.mockk.coEvery
 import io.mockk.coVerify
-import io.mockk.every
 import io.mockk.impl.annotations.MockK
-import io.mockk.verify
 import java.util.concurrent.atomic.AtomicBoolean
 import kotlinx.coroutines.flow.Flow
 import kotlinx.coroutines.flow.FlowCollector
@@ -37,7 +34,7 @@ class LoadPipelineStepTaskUTest {
     @MockK
     lateinit var batchAccumulatorWithUpdate:
         BatchAccumulator<AutoCloseable, StreamKey, String, MyBatch>
-    @MockK lateinit var inputFlow: Flow<Reserved<PipelineEvent<StreamKey, String>>>
+    @MockK lateinit var inputFlow: Flow<PipelineEvent<StreamKey, String>>
     @MockK lateinit var batchUpdateQueue: QueueWriter<BatchUpdate>
 
     data class Closeable(val id: Int = 0) : AutoCloseable {
@@ -66,18 +63,17 @@ class LoadPipelineStepTaskUTest {
             part
         )
 
-    private fun <T> reserved(value: T): Reserved<T> = Reserved(null, 0L, value)
     private fun messageEvent(
         key: StreamKey,
         value: String,
         counts: Map<Int, Long> = emptyMap()
-    ): Reserved<PipelineEvent<StreamKey, String>> =
-        reserved(PipelineMessage(counts.mapKeys { CheckpointId(it.key) }, key, value))
-    private fun endOfStreamEvent(key: StreamKey): Reserved<PipelineEvent<StreamKey, String>> =
-        reserved(PipelineEndOfStream(key.stream))
+    ): PipelineEvent<StreamKey, String> =
+        PipelineMessage(counts.mapKeys { CheckpointId(it.key) }, key, value)
+    private fun endOfStreamEvent(key: StreamKey): PipelineEvent<StreamKey, String> =
+        PipelineEndOfStream(key.stream)
 
     @Test
-    fun `start and accept called on first no-output message, accept only on second`() = runTest {
+    fun `start only called on first message while accept returns non-null state`() = runTest {
         val key = StreamKey(DestinationStream.Descriptor("namespace", "stream"))
         val part = 7
         val task = createTask(part, batchAccumulatorNoUpdate)
@@ -85,72 +81,75 @@ class LoadPipelineStepTaskUTest {
         // No call to accept will finish the batch, but state will be threaded through.
         val state1 = Closeable(1)
         val state2 = Closeable(2)
-        every { batchAccumulatorNoUpdate.start(any(), any()) } returns state1
-        every { batchAccumulatorNoUpdate.accept("value_0", state1) } returns Pair(state2, null)
-        every { batchAccumulatorNoUpdate.accept("value_1", state2) } returns Pair(Closeable(), null)
+        coEvery { batchAccumulatorNoUpdate.start(any(), any()) } returns state1
+        coEvery { batchAccumulatorNoUpdate.accept("value_0", state1) } returns Pair(state2, null)
+        coEvery { batchAccumulatorNoUpdate.accept("value_1", state2) } returns
+            Pair(Closeable(), null)
 
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                 repeat(2) { collector.emit(messageEvent(key, "value_$it")) }
             }
 
         task.execute()
 
-        verify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) }
-        repeat(2) { verify(exactly = 1) { batchAccumulatorNoUpdate.accept("value_$it", any()) } }
-        verify(exactly = 0) { batchAccumulatorNoUpdate.finish(any()) }
+        coVerify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) }
+        repeat(2) { coVerify(exactly = 1) { batchAccumulatorNoUpdate.accept("value_$it", any()) } }
+        coVerify(exactly = 0) { batchAccumulatorNoUpdate.finish(any()) }
     }
 
     @Test
-    fun `start called again after batch completes (no update)`() = runTest {
-        val key = StreamKey(DestinationStream.Descriptor("namespace", "stream"))
-        val part = 6
-        val task = createTask(part, batchAccumulatorNoUpdate)
-        val stateA1 = Closeable(1)
-        val stateA2 = Closeable(2)
-        val stateA3 = Closeable(3)
-        val stateB1 = Closeable(4)
-        val stateB2 = Closeable(5)
-        val startHasBeenCalled = AtomicBoolean(false)
-
-        every { batchAccumulatorNoUpdate.start(any(), any()) } answers
-            {
-                if (startHasBeenCalled.setOnce()) stateA1 else stateB1
-            }
-        every { batchAccumulatorNoUpdate.accept("value_0", stateA1) } returns Pair(stateA2, null)
-        every { batchAccumulatorNoUpdate.accept("value_1", stateA2) } returns Pair(stateA3, true)
-        every { batchAccumulatorNoUpdate.accept("value_2", stateB1) } returns Pair(stateB2, null)
+    fun `start called again only after null state returned, even if accept yields output`() =
+        runTest {
+            val key = StreamKey(DestinationStream.Descriptor("namespace", "stream"))
+            val part = 6
+            val task = createTask(part, batchAccumulatorNoUpdate)
+            val stateA1 = Closeable(1)
+            val stateA2 = Closeable(2)
+            val stateB1 = Closeable(4)
+            val stateB2 = Closeable(5)
+            val startHasBeenCalled = AtomicBoolean(false)
 
-        coEvery { inputFlow.collect(any()) } coAnswers
-            {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
-                repeat(3) { collector.emit(messageEvent(key, "value_$it")) }
-            }
+            coEvery { batchAccumulatorNoUpdate.start(any(), any()) } answers
+                {
+                    if (startHasBeenCalled.setOnce()) stateA1 else stateB1
+                }
+            coEvery { batchAccumulatorNoUpdate.accept("value_0", stateA1) } returns
+                Pair(stateA2, true)
+            coEvery { batchAccumulatorNoUpdate.accept("value_1", stateA2) } returns Pair(null, true)
+            coEvery { batchAccumulatorNoUpdate.accept("value_2", stateB1) } returns
+                Pair(stateB2, null)
 
-        task.execute()
-        verify(exactly = 2) { batchAccumulatorNoUpdate.start(key, part) }
-        repeat(3) { verify(exactly = 1) { batchAccumulatorNoUpdate.accept("value_$it", any()) } }
-    }
+            coEvery { inputFlow.collect(any()) } coAnswers
+                {
+                    val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
+                    repeat(3) { collector.emit(messageEvent(key, "value_$it")) }
+                }
+
+            task.execute()
+            coVerify(exactly = 2) { batchAccumulatorNoUpdate.start(key, part) }
+            repeat(3) {
+                coVerify(exactly = 1) { batchAccumulatorNoUpdate.accept("value_$it", any()) }
+            }
+        }
 
     @Test
-    fun `finish and update called on end-of-stream when last accept did not yield output`() =
+    fun `finish and update called on end-of-stream iff last accept returned a non-null state`() =
         runTest {
             val key = StreamKey(DestinationStream.Descriptor("namespace", "stream"))
             val part = 5
             val task = createTask(part, batchAccumulatorNoUpdate)
 
-            every { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable()
-            every { batchAccumulatorNoUpdate.accept(any(), any()) } returns Pair(Closeable(), null)
-            every { batchAccumulatorNoUpdate.finish(any()) } returns true
+            coEvery { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable()
+            coEvery { batchAccumulatorNoUpdate.accept(any(), any()) } returns
+                Pair(Closeable(), null)
+            coEvery { batchAccumulatorNoUpdate.finish(any()) } returns true
             coEvery { batchUpdateQueue.publish(any()) } returns Unit
 
             coEvery { inputFlow.collect(any()) } coAnswers
                 {
-                    val collector =
-                        firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                    val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                     repeat(10) { // arbitrary number of messages
                         collector.emit(messageEvent(key, "value"))
                     }
@@ -159,32 +158,31 @@ class LoadPipelineStepTaskUTest {
 
             task.execute()
 
-            verify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) }
-            verify(exactly = 10) { batchAccumulatorNoUpdate.accept(any(), any()) }
-            verify(exactly = 1) { batchAccumulatorNoUpdate.finish(any()) }
+            coVerify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) }
+            coVerify(exactly = 10) { batchAccumulatorNoUpdate.accept(any(), any()) }
+            coVerify(exactly = 1) { batchAccumulatorNoUpdate.finish(any()) }
             coVerify(exactly = 1) { batchUpdateQueue.publish(any()) }
         }
 
     @Test
-    fun `update but not finish called on end-of-stream when last accept yielded output`() =
+    fun `update but not finish called on end-of-stream when last accept returned null state`() =
         runTest {
             val key = StreamKey(DestinationStream.Descriptor("namespace", "stream"))
             val part = 4
             val task = createTask(part, batchAccumulatorNoUpdate)
 
             var acceptCalls = 0
-            every { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable()
-            every { batchAccumulatorNoUpdate.accept(any(), any()) } answers
+            coEvery { batchAccumulatorNoUpdate.start(any(), any()) } returns Closeable()
+            coEvery { batchAccumulatorNoUpdate.accept(any(), any()) } answers
                 {
-                    if (++acceptCalls == 10) Pair(Closeable(), true) else Pair(Closeable(), null)
+                    if (++acceptCalls == 10) Pair(null, true) else Pair(Closeable(), null)
                 }
-            every { batchAccumulatorNoUpdate.finish(any()) } returns true
+            coEvery { batchAccumulatorNoUpdate.finish(any()) } returns true
             coEvery { batchUpdateQueue.publish(any()) } returns Unit
 
             coEvery { inputFlow.collect(any()) } coAnswers
                 {
-                    val collector =
-                        firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                    val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                     repeat(10) { // arbitrary number of messages
                         collector.emit(messageEvent(key, "value"))
                     }
@@ -193,9 +191,9 @@ class LoadPipelineStepTaskUTest {
 
             task.execute()
 
-            verify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) }
-            verify(exactly = 10) { batchAccumulatorNoUpdate.accept(any(), any()) }
-            verify(exactly = 0) { batchAccumulatorNoUpdate.finish(any()) }
+            coVerify(exactly = 1) { batchAccumulatorNoUpdate.start(key, part) }
+            coVerify(exactly = 10) { batchAccumulatorNoUpdate.accept(any(), any()) }
+            coVerify(exactly = 0) { batchAccumulatorNoUpdate.finish(any()) }
             coVerify(exactly = 1) { batchUpdateQueue.publish(any()) }
         }
 
@@ -206,24 +204,21 @@ class LoadPipelineStepTaskUTest {
         val task = createTask(part, batchAccumulator = batchAccumulatorWithUpdate)
         var acceptCalls = 0
 
-        every { batchAccumulatorWithUpdate.start(any(), any()) } returns Closeable()
-        every { batchAccumulatorWithUpdate.accept(any(), any()) } answers
+        coEvery { batchAccumulatorWithUpdate.start(any(), any()) } returns Closeable()
+        coEvery { batchAccumulatorWithUpdate.accept(any(), any()) } answers
             {
-                val output =
-                    when (acceptCalls++ % 4) {
-                        0 -> null
-                        1 -> MyBatch(Batch.State.PROCESSED)
-                        2 -> MyBatch(Batch.State.PERSISTED)
-                        3 -> MyBatch(Batch.State.COMPLETE)
-                        else -> error("unreachable")
-                    }
-                Pair(Closeable(), output)
+                when (acceptCalls++ % 4) {
+                    0 -> Pair(Closeable(), null)
+                    1 -> Pair(null, MyBatch(Batch.State.PROCESSED))
+                    2 -> Pair(null, MyBatch(Batch.State.PERSISTED))
+                    3 -> Pair(null, MyBatch(Batch.State.COMPLETE))
+                    else -> error("unreachable")
+                }
             }
         coEvery { batchUpdateQueue.publish(any()) } returns Unit
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                 repeat(12) { // arbitrary number of messages
                     collector.emit(messageEvent(key, "value"))
                 }
@@ -231,11 +226,11 @@ class LoadPipelineStepTaskUTest {
 
         task.execute()
 
-        verify(exactly = 9) {
+        coVerify(exactly = 9) {
             batchAccumulatorWithUpdate.start(key, part)
         } // only 1/4 are no output
-        verify(exactly = 12) { batchAccumulatorWithUpdate.accept(any(), any()) } // all have data
-        verify(exactly = 0) { batchAccumulatorWithUpdate.finish(any()) } // never end-of-stream
+        coVerify(exactly = 12) { batchAccumulatorWithUpdate.accept(any(), any()) } // all have data
+        coVerify(exactly = 0) { batchAccumulatorWithUpdate.finish(any()) } // never end-of-stream
         coVerify(exactly = 6) { batchUpdateQueue.publish(any()) } // half are PERSISTED/COMPLETE
     }
 
@@ -247,13 +242,13 @@ class LoadPipelineStepTaskUTest {
 
         val task = createTask(part, batchAccumulatorWithUpdate)
 
         // Make stream1 finish with a persisted output every 3 calls (otherwise null)
         // Make stream2 finish with a persisted output every 2 calls (otherwise null)
         val stream1States = (0 until 11).map { Closeable(it) }
         val stream2States = (0 until 11).map { Closeable(it + 100) }
         var stream1StartCalls = 0
         var stream2StartCalls = 0
-        every { batchAccumulatorWithUpdate.start(key1, any()) } answers
+        coEvery { batchAccumulatorWithUpdate.start(key1, any()) } answers
             {
                 // Stream 1 will finish on 0, 3, 6, 9
                 // (so the last finish is right before end-of-stream, leaving no input to finish)
@@ -265,7 +260,7 @@ class LoadPipelineStepTaskUTest {
                     else -> error("unreachable stream1 start call")
                 }
             }
-        every { batchAccumulatorWithUpdate.start(key2, any()) } answers
+        coEvery { batchAccumulatorWithUpdate.start(key2, any()) } answers
             {
                 // Stream 2 will finish on 0, 2, 4, 6, 8
                 // (so the last finish is one record before end-of-stream, leaving input to finish)
@@ -280,22 +275,23 @@ class LoadPipelineStepTaskUTest {
                 }
             }
         repeat(10) {
-            every { batchAccumulatorWithUpdate.accept(any(), stream1States[it]) } returns
-                Pair(
-                    stream1States[it + 1],
-                    if (it % 3 == 0) MyBatch(Batch.State.PERSISTED) else null
-                )
-            every { batchAccumulatorWithUpdate.accept(any(), stream2States[it]) } returns
-                Pair(
-                    stream2States[it + 1],
-                    if (it % 2 == 0) MyBatch(Batch.State.COMPLETE) else null
-                )
+            coEvery { batchAccumulatorWithUpdate.accept(any(), stream1States[it]) } returns
+                if (it % 3 == 0) {
+                    Pair(null, MyBatch(Batch.State.PERSISTED))
+                } else {
+                    Pair(stream1States[it + 1], null)
+                }
+            coEvery { batchAccumulatorWithUpdate.accept(any(), stream2States[it]) } returns
+                if (it % 2 == 0) {
+                    Pair(null, MyBatch(Batch.State.PERSISTED))
+                } else {
+                    Pair(stream2States[it + 1], null)
+                }
         }
 
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
                 repeat(10) { // arbitrary number of messages
                     collector.emit(messageEvent(key1, "stream1_value"))
                     collector.emit(messageEvent(key2, "stream2_value"))
@@ -306,20 +302,20 @@ class LoadPipelineStepTaskUTest {
                 collector.emit(endOfStreamEvent(key2))
             }
 
-        every { batchAccumulatorWithUpdate.finish(any()) } returns MyBatch(Batch.State.COMPLETE)
+        coEvery { batchAccumulatorWithUpdate.finish(any()) } returns MyBatch(Batch.State.COMPLETE)
         coEvery { batchUpdateQueue.publish(any()) } returns Unit
 
         task.execute()
 
-        verify(exactly = 4) { batchAccumulatorWithUpdate.start(key1, part) }
-        verify(exactly = 6) { batchAccumulatorWithUpdate.start(key2, part) }
-        verify(exactly = 10) {
+        coVerify(exactly = 4) { batchAccumulatorWithUpdate.start(key1, part) }
+        coVerify(exactly = 6) { batchAccumulatorWithUpdate.start(key2, part) }
+        coVerify(exactly = 10) {
             batchAccumulatorWithUpdate.accept("stream1_value", match { it in stream1States })
         }
-        verify(exactly = 10) {
+        coVerify(exactly = 10) {
             batchAccumulatorWithUpdate.accept("stream2_value", match { it in stream2States })
         }
-        verify(exactly = 1) { batchAccumulatorWithUpdate.finish(stream2States[10]) }
+        coVerify(exactly = 1) { batchAccumulatorWithUpdate.finish(stream2States[10]) }
     }
 
     @Test
@@ -330,21 +326,20 @@ class LoadPipelineStepTaskUTest {
 
         val task = createTask(part, batchAccumulatorWithUpdate)
 
-        every { batchAccumulatorWithUpdate.start(key1, part) } returns Closeable(1)
-        every { batchAccumulatorWithUpdate.start(key2, part) } returns Closeable(2)
-        every { batchAccumulatorWithUpdate.accept("stream1_value", any()) } returns
+        coEvery { batchAccumulatorWithUpdate.start(key1, part) } returns Closeable(1)
+        coEvery { batchAccumulatorWithUpdate.start(key2, part) } returns Closeable(2)
+        coEvery { batchAccumulatorWithUpdate.accept("stream1_value", any()) } returns
             Pair(Closeable(1), null)
-        every { batchAccumulatorWithUpdate.accept("stream2_value", any()) } returns
+        coEvery { batchAccumulatorWithUpdate.accept("stream2_value", any()) } returns
             Pair(Closeable(2), null)
-        every { batchAccumulatorWithUpdate.finish(Closeable(1)) } returns
+        coEvery { batchAccumulatorWithUpdate.finish(Closeable(1)) } returns
             MyBatch(Batch.State.COMPLETE)
-        every { batchAccumulatorWithUpdate.finish(Closeable(2)) } returns
+        coEvery { batchAccumulatorWithUpdate.finish(Closeable(2)) } returns
             MyBatch(Batch.State.PERSISTED)
 
         coEvery { inputFlow.collect(any()) } coAnswers
             {
-                val collector =
-                    firstArg<FlowCollector<Reserved<PipelineEvent<StreamKey, String>>>>()
+                val collector = firstArg<FlowCollector<PipelineEvent<StreamKey, String>>>()
 
                 // Emit 10 messages for stream1, 10 messages for stream2
                 repeat(12) {
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/PartFactory.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/PartFactory.kt
index e8c09b56f0ab4..4fb8875e64884 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/PartFactory.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/file/object_storage/PartFactory.kt
@@ -63,6 +63,10 @@ data class Part(
 ) {
     val isEmpty: Boolean
         get() = bytes == null
+
+    override fun toString(): String {
+        return "Part(key='$key', size=${bytes?.size}, index=$partIndex, isFinal=$isFinal)"
+    }
 }
 
 class PartBookkeeper {
@@ -87,7 +91,9 @@ class PartBookkeeper {
         // index even if it is empty.
         if (part.isFinal) {
             if (!finalIndex.compareAndSet(null, part.partIndex)) {
-                throw IllegalStateException("Final part already seen for ${part.key}")
+                throw IllegalStateException(
+                    "Final part ${finalIndex.get()} already seen for ${part.key}"
+                )
             }
         }
     }
@@ -99,5 +105,7 @@ class PartBookkeeper {
      * 3. the last index is the final index
      */
     val isComplete: Boolean
-        get() = finalIndex.get()?.let { it == partIndexes.size } ?: false
+        get() {
+            return finalIndex.get()?.let { it == partIndexes.size } ?: false
+        }
 }
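
Why the single equality in `isComplete` suffices for all three listed conditions: part indexes are distinct and 1-based, so if the final index equals the count of distinct indexes seen, the set must be exactly `1..finalIndex` with no gaps. A standalone restatement of the predicate, with the set-of-indexes representation assumed:

```kotlin
import java.util.concurrent.atomic.AtomicReference

// finalIndex is null until the isFinal part arrives; partIndexes holds the
// distinct 1-based part indexes seen so far (representation assumed).
fun isComplete(finalIndex: AtomicReference<Int?>, partIndexes: Set<Int>): Boolean =
    finalIndex.get()?.let { it == partIndexes.size } ?: false
```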
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt
new file mode 100644
index 0000000000000..6fe3d749c7fee
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt
@@ -0,0 +1,40 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.OutputPartitioner
+import kotlin.math.abs
+import kotlin.random.Random
+
+/**
+ * For routing parts to upload workers.
+ *
+ * The key is the object key (filename). This means that the framework will keep separate state per
+ * upload-in-progress.
+ *
+ * The partition is round-robin, for maximum concurrency.
+ */
+class ObjectLoaderPartPartitioner :
+    OutputPartitioner<StreamKey, DestinationRecordAirbyteValue, ObjectKey, Part> {
+    // Start on a random value
+    private var nextPartition = Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE)
+
+    override fun getOutputKey(inputKey: StreamKey, output: Part): ObjectKey {
+        return ObjectKey(inputKey.stream, output.key)
+    }
+
+    override fun getPart(outputKey: ObjectKey, numParts: Int): Int {
+        // Rotate through partitions
+        val part = nextPartition++
+        return if (part == Int.MIN_VALUE) { // avoid overflow
+            0
+        } else {
+            abs(part) % numParts
+        }
+    }
+}
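
A sketch of the rotation behavior (the starting partition is random, so concrete values vary):

```kotlin
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.pipline.object_storage.ObjectKey
import io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderPartPartitioner

fun demoRotation(): List<Int> {
    val partitioner = ObjectLoaderPartPartitioner()
    val key = ObjectKey(DestinationStream.Descriptor("namespace", "stream"), "path/file.avro")
    // e.g. [0, 1, 0, 1]: successive parts rotate across upload workers, while
    // getOutputKey keys every part of the same upload to the same ObjectKey,
    // so downstream accumulator state is kept per object-in-progress.
    return (1..4).map { partitioner.getPart(key, numParts = 2) }
}
```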
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt
new file mode 100644
index 0000000000000..3a84377a3525c
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt
@@ -0,0 +1,62 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.ChannelMessageQueue
+import io.airbyte.cdk.load.message.PartitionedQueue
+import io.airbyte.cdk.load.message.PipelineEvent
+import io.airbyte.cdk.load.message.WithStream
+import io.airbyte.cdk.load.state.ReservationManager
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Factory
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Named
+import jakarta.inject.Singleton
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.runBlocking
+
+data class ObjectKey(override val stream: DestinationStream.Descriptor, val objectKey: String) :
+    WithStream
+
+@Factory
+class ObjectLoaderPartQueueFactory(
+    val loader: ObjectLoader,
+    @Named("memoryManager") val memoryManager: ReservationManager,
+) {
+    val log = KotlinLogging.logger {}
+
+    @Singleton
+    @Named("objectLoaderPartQueue")
+    @Requires(bean = ObjectLoader::class)
+    fun objectLoaderPartQueue(): PartitionedQueue<PipelineEvent<ObjectKey, Part>> {
+        val bytes = memoryManager.totalCapacityBytes * loader.maxMemoryRatioReservedForParts
+        val reservation = runBlocking { memoryManager.reserve(bytes.toLong(), this) }
+        val bytesPerPartition = reservation.bytesReserved / loader.numUploadWorkers
+        val partsPerPartition = bytesPerPartition / loader.partSizeBytes
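+        // e.g., with the defaults (ratio 0.2, 5 upload workers, 10MiB parts), a 2GiB heap yields ~8 parts per partition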
+
+        if (partsPerPartition < 1) {
+            throw IllegalArgumentException(
+                "Reserved $bytes/${memoryManager.totalCapacityBytes}b not enough for ${loader.numPartWorkers} ${loader.partSizeBytes}b parts"
+            )
+        }
+
+        log.info {
+            "Reserved $bytes/${memoryManager.totalCapacityBytes}b for ${loader.numPartWorkers} ${loader.partSizeBytes}b parts => $partsPerPartition capacity per queue partition"
+        }
+
+        return PartitionedQueue(
+            (0 until loader.numUploadWorkers)
+                .map {
+                    ChannelMessageQueue(
+                        Channel<PipelineEvent<ObjectKey, Part>>(partsPerPartition.toInt())
+                    )
+                }
+                .toTypedArray()
+        )
+    }
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartStep.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartStep.kt
new file mode 100644
index 0000000000000..18e0b1475c1b5
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartStep.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.PartitionedQueue
+import io.airbyte.cdk.load.message.PipelineEvent
+import io.airbyte.cdk.load.message.QueueWriter
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.BatchUpdate
+import io.airbyte.cdk.load.pipeline.LoadPipelineStep
+import io.airbyte.cdk.load.pipeline.RecordCountFlushStrategy
+import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.micronaut.context.annotation.Requires
+import io.micronaut.context.annotation.Value
+import jakarta.inject.Named
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class ObjectLoaderPartStep(
+    private val objectLoader: ObjectLoader,
+    private val recordToPartAccumulator: ObjectLoaderRecordToPartAccumulator<*>,
+    @Named("recordQueue")
+    val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
+    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
+    @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue<PipelineEvent<ObjectKey, Part>>,
+    @Value("\${airbyte.destination.core.record-batch-size-override:null}")
+    val batchSizeOverride: Long? = null,
+) : LoadPipelineStep {
+    override val numWorkers: Int = objectLoader.numPartWorkers
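+    // One record-to-part task per record-queue partition; all tasks feed the shared part queue.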
+
+    override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
+        return LoadPipelineStepTask(
+            recordToPartAccumulator,
+            inputQueue.consume(partition),
+            batchQueue,
+            ObjectLoaderPartPartitioner(),
+            partQueue,
+            batchSizeOverride?.let { RecordCountFlushStrategy(it) },
+            partition
+        )
+    }
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt
new file mode 100644
index 0000000000000..abb5ee32df151
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.command.DestinationCatalog
+import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.file.object_storage.PartBookkeeper
+import io.airbyte.cdk.load.file.object_storage.StreamingUpload
+import io.airbyte.cdk.load.message.Batch
+import io.airbyte.cdk.load.message.WithBatchState
+import io.airbyte.cdk.load.pipeline.BatchAccumulator
+import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Singleton
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * In order to allow streaming uploads on the same key to be parallelized, upload state needs to be
+ * shared across workers.
+ */
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class UploadsInProgress {
+    val byKey: ConcurrentHashMap<String, ObjectLoaderPartToObjectAccumulator.State> =
+        ConcurrentHashMap()
+}
+
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class ObjectLoaderPartToObjectAccumulator(
+    private val client: ObjectStorageClient<*>,
+    private val catalog: DestinationCatalog,
+    private val uploads: UploadsInProgress,
+) :
+    BatchAccumulator<
+        ObjectLoaderPartToObjectAccumulator.State,
+        ObjectKey,
+        Part,
+        ObjectLoaderPartToObjectAccumulator.ObjectResult
+    > {
+
+    data class State(val streamingUpload: StreamingUpload<*>, val bookkeeper: PartBookkeeper) :
+        AutoCloseable {
+        override fun close() {
+            // Do Nothing
+        }
+    }
+
+    data class ObjectResult(override val state: Batch.State) : WithBatchState
+
+    override suspend fun start(key: ObjectKey, part: Int): State {
+        val stream = catalog.getStream(key.stream)
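+        // Upload state is shared across workers: all parts for the same object key reuse one streaming upload.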
+        return uploads.byKey.getOrPut(key.objectKey) {
+            State(
+                client.startStreamingUpload(
+                    key.objectKey,
+                    metadata = ObjectStorageDestinationState.metadataFor(stream)
+                ),
+                PartBookkeeper()
+            )
+        }
+    }
+
+    override suspend fun accept(input: Part, state: State): Pair<State, ObjectResult?> {
+        if (input.bytes == null && !input.isFinal) {
+            throw IllegalStateException("Empty non-final part received: this should not happen")
+        }
+        input.bytes?.let { state.streamingUpload.uploadPart(it, input.partIndex) }
+        state.bookkeeper.add(input)
+        if (state.bookkeeper.isComplete) {
+            return Pair(state, finish(state))
+        }
+        return Pair(state, null)
+    }
+
+    override suspend fun finish(state: State): ObjectResult {
+        if (state.bookkeeper.isComplete) {
+            state.streamingUpload.complete()
+        } // else assume another part is finishing this
+        return ObjectResult(Batch.State.COMPLETE)
+    }
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt
new file mode 100644
index 0000000000000..f4c341dff710a
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPipeline.kt
@@ -0,0 +1,15 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.pipeline.LoadPipeline
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class ObjectLoaderPipeline(partStep: ObjectLoaderPartStep, uploadStep: ObjectLoaderUploadStep) :
+    LoadPipeline(listOf(partStep, uploadStep))
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderRecordToPartAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderRecordToPartAccumulator.kt
new file mode 100644
index 0000000000000..f69e7dd0da973
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderRecordToPartAccumulator.kt
@@ -0,0 +1,116 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.command.DestinationCatalog
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriter
+import io.airbyte.cdk.load.file.object_storage.BufferedFormattingWriterFactory
+import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.file.object_storage.PartFactory
+import io.airbyte.cdk.load.file.object_storage.PathFactory
+import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.BatchAccumulator
+import io.airbyte.cdk.load.state.DestinationStateManager
+import io.airbyte.cdk.load.state.object_storage.ObjectStorageDestinationState
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Requires
+import io.micronaut.context.annotation.Value
+import jakarta.inject.Singleton
+import java.io.OutputStream
+
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class ObjectLoaderRecordToPartAccumulator<T : OutputStream>(
+    private val pathFactory: PathFactory,
+    private val catalog: DestinationCatalog,
+    private val writerFactory: BufferedFormattingWriterFactory<T>,
+    private val client: ObjectStorageClient<*>,
+    private val loader: ObjectLoader,
+    // TODO: This doesn't need to be "DestinationState", just a couple of utility classes
+    val stateManager: DestinationStateManager<ObjectStorageDestinationState>,
+    @Value("\${airbyte.destination.core.record-batch-size-override:null}")
+    val batchSizeOverride: Long? = null,
+) :
+    BatchAccumulator<
+        ObjectLoaderRecordToPartAccumulator.State<T>, StreamKey, DestinationRecordAirbyteValue, Part
+    > {
+    private val log = KotlinLogging.logger {}
+
+    private val objectSizeBytes = loader.objectSizeBytes
+    private val partSizeBytes = loader.partSizeBytes
+
+    data class State<T : OutputStream>(
+        val stream: DestinationStream,
+        val writer: BufferedFormattingWriter<T>,
+        val partFactory: PartFactory
+    ) : AutoCloseable {
+        override fun close() {
+            writer.close()
+        }
+    }
+
+    private suspend fun newState(stream: DestinationStream): State<T> {
+        // Determine unique file name.
+        val pathOnly = pathFactory.getFinalDirectory(stream)
+        val state = stateManager.getState(stream)
+        val fileNo = state.getPartIdCounter(pathOnly).incrementAndGet()
+        val fileName = state.ensureUnique(pathFactory.getPathToFile(stream, fileNo))
+
+        // Initialize the part factory and writer.
+        val partFactory = PartFactory(fileName, fileNo)
+        log.info { "Starting part generation for $fileName (${stream.descriptor})" }
+        return State(stream, writerFactory.create(stream), partFactory)
+    }
+
+    private fun makePart(state: State<T>, forceFinish: Boolean = false): Part {
+        state.writer.flush()
+        val newSize = state.partFactory.totalSize + state.writer.bufferSize
+        val isFinal =
+            forceFinish ||
+                newSize >= objectSizeBytes ||
+                batchSizeOverride != null // HACK: forces a flush per batch when the override is set (used in tests)
+        val bytes =
+            if (isFinal) {
+                state.writer.finish()
+            } else {
+                state.writer.takeBytes()
+            }
+        val part = state.partFactory.nextPart(bytes, isFinal)
+        log.info { "Creating part $part" }
+        return part
+    }
+
+    override suspend fun start(key: StreamKey, part: Int): State<T> {
+        val stream = catalog.getStream(key.stream)
+        return newState(stream)
+    }
+
+    override suspend fun accept(
+        input: DestinationRecordAirbyteValue,
+        state: State<T>
+    ): Pair<State<T>?, Part?> {
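+        // A null next-state ends the current object; the task will call start() again for the next record.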
+        state.writer.accept(input)
+        if (state.writer.bufferSize >= partSizeBytes || batchSizeOverride != null) {
+            val part = makePart(state)
+            val nextState =
+                if (part.isFinal) {
+                    null
+                } else {
+                    state
+                }
+            return Pair(nextState, part)
+        }
+        return Pair(state, null)
+    }
+
+    override suspend fun finish(state: State<T>): Part {
+        return makePart(state, true)
+    }
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt
new file mode 100644
index 0000000000000..accda27f9a010
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadStep.kt
@@ -0,0 +1,47 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.PartitionedQueue
+import io.airbyte.cdk.load.message.PipelineEvent
+import io.airbyte.cdk.load.message.QueueWriter
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.BatchUpdate
+import io.airbyte.cdk.load.pipeline.LoadPipelineStep
+import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Named
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(bean = ObjectLoader::class)
+class ObjectLoaderUploadStep(
+    val loader: ObjectLoader,
+    val accumulator: ObjectLoaderPartToObjectAccumulator,
+    @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue<PipelineEvent<ObjectKey, Part>>,
+    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
+) : LoadPipelineStep {
+    override val numWorkers: Int = loader.numUploadWorkers
+
+    override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
+        return LoadPipelineStepTask(
+            accumulator,
+            partQueue.consume(partition),
+            batchQueue,
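+            // Terminal step: no downstream stage, so no output partitioner/queue; results flow via batchQueue.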
+            outputPartitioner = null,
+            outputQueue =
+                null
+                    as
+                    PartitionedQueue<
+                        PipelineEvent<StreamKey, ObjectLoaderPartToObjectAccumulator.ObjectResult>
+                    >?,
+            flushStrategy = null,
+            partition
+        )
+    }
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt
new file mode 100644
index 0000000000000..85c461051c8d4
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/ObjectLoader.kt
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.write.object_storage
+
+import io.airbyte.cdk.load.write.LoadStrategy
+
+/**
+ * [ObjectLoader] is for the use case where a destination writes records into some number of files
+ * in a file system or cloud storage provider whose client supports multipart uploads.
+ *
+ * Usage:
+ *
+ * - declare a bean implementing this interface and optionally override the default configuration
+ * values
+ * - declare the necessary beans to initialize an
+ * [io.airbyte.cdk.load.file.object_storage.ObjectStorageClient]
+ * - declare a bean of
+ * [io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfigurationProvider] to configure
+ * the path for each object (typically you would add this to your
+ * [io.airbyte.cdk.load.command.DestinationConfiguration])
+ * - declare a bean of
+ * [io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider] to control
+ * the format in which the data is loaded
+ *
+ * Configuration:
+ *
+ * - [numPartWorkers] is the number of threads (coroutines) devoted to formatting records into
+ * uploadable parts. (This is typically CPU-bound).
+ * - [numUploadWorkers] is the number of threads (coroutines) devoted to uploading parts to the
+ * object storage. (This is typically Network IO-bound).
+ * - [maxMemoryRatioReservedForParts] is the proportion of the total heap reserved for in-memory
+ * parts. It determines the capacity of the part queue; when the queue is full, the part workers
+ * suspend until the upload workers have drained some parts.
+ * - [partSizeBytes] is the approximate desired part size in bytes. When this much part data has
+ * been accumulated by a part worker, it will be forwarded to an upload worker and uploaded to the
+ * destination.
+ * - [objectSizeBytes] is the approximate desired file size in bytes. When this much part data has
+ * been accumulated, the upload will be completed, and the file will become visible to the end user.
+ *
+ * Partitioning:
+ *
+ * The default partitioning distributes the records round-robin (in small batches) to the part
+ * workers (using [io.airbyte.cdk.load.pipeline.RoundRobinInputPartitioner]). This should be fine
+ * for most purposes; to override it, declare a bean implementing
+ * [io.airbyte.cdk.load.pipeline.InputPartitioner].
+ *
+ * The parts are also distributed round-robin to the upload workers using
+ * [io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderPartPartitioner]. This is not currently
+ * configurable.
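+ *
+ * A minimal sketch of a declaration (hypothetical class name; every override is optional):
+ *
+ * ```
+ * @Singleton
+ * class MyObjectLoader : ObjectLoader {
+ *     override val numUploadWorkers = 8
+ *     override val partSizeBytes = 5L * 1024 * 1024
+ * }
+ * ```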
+ */
+interface ObjectLoader : LoadStrategy {
+    val numPartWorkers: Int
+        get() = 1
+    val numUploadWorkers: Int
+        get() = 5
+    val maxMemoryRatioReservedForParts: Double
+        get() = 0.2
+    val partSizeBytes: Long
+        get() = 10L * 1024 * 1024
+    val objectSizeBytes: Long
+        get() = 200L * 1024 * 1024
+
+    override val inputPartitions: Int
+        get() = numPartWorkers
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/pipeline/object_storage/ObjectLoaderPartPartitionerTest.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/pipeline/object_storage/ObjectLoaderPartPartitionerTest.kt
new file mode 100644
index 0000000000000..af921b6acd3de
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/pipeline/object_storage/ObjectLoaderPartPartitionerTest.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipeline.object_storage
+
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipline.object_storage.ObjectKey
+import io.airbyte.cdk.load.pipline.object_storage.ObjectLoaderPartPartitioner
+import kotlin.test.assertEquals
+import org.junit.jupiter.api.Test
+
+class ObjectLoaderPartPartitionerTest {
+    @Test
+    fun `partitioner distributes keys round-robin`() {
+        val keys = (0 until 12).map(Int::toString)
+        val partitioner = ObjectLoaderPartPartitioner()
+        val stream = DestinationStream.Descriptor("test", "stream")
+        val numParts = 3
+        var lastPartition: Int? = null
+        keys.forEach {
+            val partition = partitioner.getPart(ObjectKey(stream, it), numParts)
+            lastPartition?.let { last -> assertEquals((last + 1) % numParts, partition) }
+            lastPartition = partition
+        }
+    }
+
+    @Test
+    fun `partitioner uses object key as accumulator key`() {
+        val keys = listOf("foo", "bar", "baz")
+        val parts =
+            keys.mapIndexed { index, key ->
+                Part(
+                    key = key,
+                    fileNumber = index * 10L,
+                    partIndex = index,
+                    bytes = null,
+                    isFinal = false
+                )
+            }
+        val partitioner = ObjectLoaderPartPartitioner()
+        val stream = DestinationStream.Descriptor("test", "stream")
+        val outputKeys = parts.map { part -> partitioner.getOutputKey(StreamKey(stream), part) }
+        val expectedKeys = keys.map { key -> ObjectKey(stream, key) }
+        assertEquals(expectedKeys, outputKeys)
+    }
+}
diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml
index 7514ddc977f0e..36ec28227c669 100644
--- a/airbyte-integrations/connectors/destination-s3/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
-  dockerImageTag: 1.5.3
+  dockerImageTag: 1.5.4
   dockerRepository: airbyte/destination-s3
   githubIssueLabel: destination-s3
   icon: s3.svg
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt
index d1658e831d856..d59c806cfdec5 100644
--- a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt
+++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt
@@ -36,9 +36,16 @@ data class S3V2Configuration<T : OutputStream>(
     // Internal configuration
     override val objectStorageUploadConfiguration: ObjectStorageUploadConfiguration =
         ObjectStorageUploadConfiguration(),
-    override val numProcessRecordsWorkers: Int = 2,
+    override val numProcessRecordsWorkers: Int = 1,
     override val estimatedRecordMemoryOverheadRatio: Double = 5.0,
     override val processEmptyFiles: Boolean = true,
+
+    /** The settings below have no effect until [S3V2ObjectLoader] is enabled. */
+    val numPartWorkers: Int = 2,
+    val numUploadWorkers: Int = 10,
+    val maxMemoryRatioReservedForParts: Double = 0.2,
+    val objectSizeBytes: Long = 200L * 1024 * 1024,
+    val partSizeBytes: Long = 10L * 1024 * 1024,
 ) :
     DestinationConfiguration(),
     AWSAccessKeyConfigurationProvider,
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2ObjectLoader.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2ObjectLoader.kt
new file mode 100644
index 0000000000000..ae89bbb3fa87b
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2ObjectLoader.kt
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.integrations.destination.s3_v2
+
+import io.airbyte.cdk.load.pipeline.RoundRobinInputPartitioner
+import io.airbyte.cdk.load.write.object_storage.ObjectLoader
+
+/**
+ * These are temporarily disabled so that we can merge the underlying changes without doing a full
+ * S3 release. I will do a separate PR to re-enable this and roll it out gradually.
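+ * (Restoring the commented-out @Singleton annotations below re-enables them.)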
+ */
+
+// @Singleton
+class S3V2ObjectLoader(config: S3V2Configuration<*>) : ObjectLoader {
+    override val numPartWorkers: Int = config.numPartWorkers
+    override val numUploadWorkers: Int = config.numUploadWorkers
+    override val maxMemoryRatioReservedForParts: Double = config.maxMemoryRatioReservedForParts
+    override val objectSizeBytes: Long = config.objectSizeBytes
+    override val partSizeBytes: Long = config.partSizeBytes
+}
+
+// @Singleton
+class S3V2RoundRobinInputPartitioner : RoundRobinInputPartitioner()
diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md
index bca981ae15fd4..342586766b541 100644
--- a/docs/integrations/destinations/s3.md
+++ b/docs/integrations/destinations/s3.md
@@ -544,6 +544,7 @@ To see connector limitations, or troubleshoot your S3 connector, see more [in ou
 
 | Version     | Date       | Pull Request                                               | Subject                                                                                                                                              |
 |:------------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
+| 1.5.4       | 2025-03-05 | [54695](https://github.com/airbytehq/airbyte/pull/54695)   | Nonfunctional changes to support performance testing                                                                                                 |
 | 1.5.3       | 2025-03-04 | [54661](https://github.com/airbytehq/airbyte/pull/54661)   | Nonfunctional changes to support performance testing                                                                                                 |
 | 1.5.2       | 2025-02-25 | [54661](https://github.com/airbytehq/airbyte/pull/54661)   | Nonfunctional cleanup; dropped unused staging code                                                                                                   |
 | 1.5.1       | 2025-02-11 | [53636](https://github.com/airbytehq/airbyte/pull/53636)   | Nonfunctional CDK version pin.                                                                                                                       |

From 61d5a6273e6fd5a8f0fc6a9950901481dad91296 Mon Sep 17 00:00:00 2001
From: Johnny Schmidt <john.schmidt@airbyte.io>
Date: Wed, 12 Mar 2025 10:09:05 -0700
Subject: [PATCH 2/2] WIP: File Loader

---
 .../cdk/load/config/SyncBeanFactory.kt        |  16 +++
 .../cdk/load/message/DestinationMessage.kt    |   3 +-
 .../cdk/load/message/PartitionedQueue.kt      |  16 +--
 .../cdk/load/pipeline/InputPartitioner.kt     |   1 +
 .../airbyte/cdk/load/pipeline/LoadPipeline.kt |   2 +-
 .../pipeline/RoundRobinInputPartitioner.kt    |   1 +
 .../airbyte/cdk/load/state/StreamManager.kt   |   1 -
 .../cdk/load/task/DestinationTaskLauncher.kt  |   5 +
 .../load/task/internal/InputConsumerTask.kt   |  34 +++--
 .../task/internal/LoadPipelineStepTask.kt     |   7 +-
 .../FileLoaderMemoryMapFileTask.kt            |  32 +++++
 .../object_storage/FileLoaderPartStep.kt      |  49 +++++++
 .../object_storage/FileLoaderPipeline.kt      |  17 +++
 .../FileLoaderProcessFileTask.kt              | 122 ++++++++++++++++++
 .../ObjectLoaderPartPartitioner.kt            |   4 +-
 .../object_storage/ObjectLoaderPartQueue.kt   |  32 ++---
 .../ObjectLoaderPartToObjectAccumulator.kt    |   8 +-
 .../load/write/object_storage/FileLoader.kt   |  14 ++
 .../connectors/destination-s3/metadata.yaml   |   2 +-
 .../src/main/kotlin/S3V2Configuration.kt      |  13 +-
 .../src/main/kotlin/S3V2FileLoader.kt         |  19 +++
 .../src/main/kotlin/S3V2Specification.kt      |  12 ++
 .../destination/s3_v2/S3V2PerformanceTest.kt  |   2 +-
 23 files changed, 364 insertions(+), 48 deletions(-)
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderMemoryMapFileTask.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderPartStep.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderPipeline.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderProcessFileTask.kt
 create mode 100644 airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FileLoader.kt
 create mode 100644 airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2FileLoader.kt

diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt
index b45002fc0270e..c03962f3dc818 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt
@@ -9,6 +9,7 @@ import io.airbyte.cdk.load.command.DestinationConfiguration
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.ChannelMessageQueue
+import io.airbyte.cdk.load.message.DestinationFile
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
 import io.airbyte.cdk.load.message.MultiProducerChannel
 import io.airbyte.cdk.load.message.PartitionedQueue
@@ -127,6 +128,22 @@ class SyncBeanFactory {
         )
     }
 
+    /**
+     * Same as recordQueue, but for files.
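+     * (The file contents stay on disk; only lightweight file metadata flows through this queue.)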
+     */
+    @Singleton
+    @Named("fileQueue")
+    fun fileQueue(
+        loadStrategy: LoadStrategy? = null,
+    ): PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>> {
+        return PartitionedQueue(
+            Array(loadStrategy?.inputPartitions ?: 1) {
+                ChannelMessageQueue(Channel(Channel.UNLIMITED))
+            }
+        )
+    }
+
     /** A queue for updating batch states, which is not partitioned. */
     @Singleton
     @Named("batchStateUpdateQueue")
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt
index 9cb52fd53593f..4adbf4b8102f6 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt
@@ -32,6 +32,7 @@ import io.airbyte.protocol.models.v0.AirbyteTraceMessage
 import io.micronaut.context.annotation.Value
 import jakarta.inject.Singleton
 import java.math.BigInteger
+import java.nio.MappedByteBuffer
 import java.time.OffsetDateTime
 
 /**
@@ -167,7 +168,7 @@ data class DestinationFile(
     override val stream: DestinationStream.Descriptor,
     val emittedAtMs: Long,
     val serialized: String,
-    val fileMessage: AirbyteRecordMessageFile
+    val fileMessage: AirbyteRecordMessageFile,
 ) : DestinationFileDomainMessage {
     /** Convenience constructor, primarily intended for use in tests. */
     class AirbyteRecordMessageFile {
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PartitionedQueue.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PartitionedQueue.kt
index 1b49db9eb8a7a..7282403dbb50b 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PartitionedQueue.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PartitionedQueue.kt
@@ -11,17 +11,13 @@ class PartitionedQueue<T>(private val queues: Array<MessageQueue<T>>) : Closeabl
     val partitions = queues.size
 
     fun consume(partition: Int): Flow<T> {
-        if (partition < 0 || partition >= queues.size) {
-            throw IllegalArgumentException("Invalid partition: $partition")
-        }
-        return queues[partition].consume()
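+        // Wrap around rather than fail: callers may be partitioned more finely than this queue.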
+        return queues[partition % partitions].consume()
     }
 
     suspend fun publish(value: T, partition: Int) {
-        if (partition < 0 || partition >= queues.size) {
-            throw IllegalArgumentException("Invalid partition: $partition")
-        }
-        queues[partition].publish(value)
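+        // As above: wrap around to tolerate a larger producer partition count.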
+        queues[partition % partitions].publish(value)
     }
 
     suspend fun broadcast(value: T) = queues.forEach { it.publish(value) }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt
index d1aa786a88f5a..1c124f256fb43 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt
@@ -5,6 +5,7 @@
 package io.airbyte.cdk.load.pipeline
 
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
 import io.micronaut.context.annotation.Secondary
 import jakarta.inject.Singleton
 import kotlin.math.abs
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/LoadPipeline.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/LoadPipeline.kt
index 44aad5ad96945..b9348902e953e 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/LoadPipeline.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/LoadPipeline.kt
@@ -9,7 +9,7 @@ import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
 
 interface LoadPipelineStep {
     val numWorkers: Int
-    fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *>
+    fun taskForPartition(partition: Int): Task
 }
 
 /**
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt
index f9b4dcf31e970..f398a332f2800 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt
@@ -5,6 +5,7 @@
 package io.airbyte.cdk.load.pipeline
 
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
 import kotlin.math.abs
 import kotlin.random.Random
 
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt
index fb12001bfc85c..829eed8d501cc 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt
@@ -389,7 +389,6 @@ class DefaultStreamManager(
         if (readCount == 0L) {
             return true
         }
-
         val completedCount = checkpointCounts.values.sumOf { it.recordsCompleted.get() }
         return completedCount == readCount
     }
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt
index 6cd2d48c31482..1cb1c75b95d2f 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt
@@ -11,6 +11,7 @@ import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.ChannelMessageQueue
 import io.airbyte.cdk.load.message.CheckpointMessageWrapped
+import io.airbyte.cdk.load.message.DestinationFile
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
 import io.airbyte.cdk.load.message.DestinationStreamEvent
 import io.airbyte.cdk.load.message.MessageQueue
@@ -146,6 +147,8 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
     @Named("recordQueue")
     private val recordQueueForPipeline:
         PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
+    @Named("fileQueue")
+    private val fileQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
     @Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate>,
     private val loadPipeline: LoadPipeline?,
     private val partitioner: InputPartitioner,
@@ -201,6 +204,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
         log.info { "Starting input consumer task" }
         val inputConsumerTask =
             inputConsumerTaskFactory.make(
+                config = config,
                 catalog = catalog,
                 inputFlow = inputFlow,
                 recordQueueSupplier = recordQueueSupplier,
@@ -208,6 +212,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
                 fileTransferQueue = fileTransferQueue,
                 destinationTaskLauncher = this,
                 recordQueueForPipeline = recordQueueForPipeline,
+                fileQueueForPipeline = fileQueueForPipeline,
                 loadPipeline = loadPipeline,
                 partitioner = partitioner,
                 openStreamQueue = openStreamQueue,
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt
index 3a296117a8284..893bebbb95dee 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt
@@ -6,6 +6,7 @@ package io.airbyte.cdk.load.task.internal
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
 import io.airbyte.cdk.load.command.DestinationCatalog
+import io.airbyte.cdk.load.command.DestinationConfiguration
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.BatchEnvelope
@@ -50,6 +51,8 @@ import io.github.oshai.kotlinlogging.KotlinLogging
 import io.micronaut.context.annotation.Secondary
 import jakarta.inject.Named
 import jakarta.inject.Singleton
+import java.io.RandomAccessFile
+import java.nio.channels.FileChannel
 import java.util.concurrent.ConcurrentHashMap
 
 interface InputConsumerTask : Task
@@ -67,6 +70,7 @@ interface InputConsumerTask : Task
 @Singleton
 @Secondary
 class DefaultInputConsumerTask(
+    private val config: DestinationConfiguration,
     private val catalog: DestinationCatalog,
     private val inputFlow: ReservingDeserializingInputFlow,
     private val recordQueueSupplier:
@@ -81,6 +85,9 @@ class DefaultInputConsumerTask(
     @Named("recordQueue")
     private val recordQueueForPipeline:
         PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
+    @Named("fileQueue")
+    private val fileQueueForPipeline:
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
     private val loadPipeline: LoadPipeline? = null,
     private val partitioner: InputPartitioner,
     private val openStreamQueue: QueueWriter<DestinationStream>
@@ -182,19 +189,22 @@
                 reserved.release()
             }
             is DestinationFile -> {
-                val index = manager.incrementReadCount()
-                // destinationTaskLauncher.handleFile(stream, message, index)
-                fileTransferQueue.publish(FileTransferQueueMessage(stream, message, index))
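+                // Route files round-robin by read index; the checkpoint count of 1 accounts for the file message itself.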
+                val partition = (manager.incrementReadCount() % fileQueueForPipeline.partitions).toInt()
+                fileQueueForPipeline.publish(
+                    PipelineMessage(
+                        mapOf(manager.getCurrentCheckpointId() to 1L),
+                        StreamKey(stream),
+                        message
+                    ) { reserved.release() },
+                    partition
+                )
             }
             is DestinationFileStreamComplete -> {
                 reserved.release() // safe because multiple calls conflate
                 manager.markEndOfStream(true)
-                val envelope =
-                    BatchEnvelope(
-                        SimpleBatch(Batch.State.COMPLETE),
-                        streamDescriptor = message.stream,
-                    )
-                destinationTaskLauncher.handleNewBatch(stream, envelope)
+                log.info { "Read COMPLETE for file stream $stream" }
+                fileQueueForPipeline.broadcast(PipelineEndOfStream(stream))
             }
             is DestinationFileStreamIncomplete ->
                 throw IllegalStateException("File stream $stream failed upstream, cannot continue.")
@@ -294,12 +303,14 @@ class DefaultInputConsumerTask(
             catalog.streams.forEach { recordQueueSupplier.get(it.descriptor).close() }
             fileTransferQueue.close()
             recordQueueForPipeline.close()
+            fileQueueForPipeline.close()
         }
     }
 }
 
 interface InputConsumerTaskFactory {
     fun make(
+        config: DestinationConfiguration,
         catalog: DestinationCatalog,
         inputFlow: ReservingDeserializingInputFlow,
         recordQueueSupplier:
@@ -311,6 +322,7 @@ interface InputConsumerTaskFactory {
         // Required by new interface
         recordQueueForPipeline:
             PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
+        fileQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
         loadPipeline: LoadPipeline?,
         partitioner: InputPartitioner,
         openStreamQueue: QueueWriter<DestinationStream>,
@@ -323,6 +335,7 @@ class DefaultInputConsumerTaskFactory(
     private val syncManager: SyncManager,
 ) : InputConsumerTaskFactory {
     override fun make(
+        config: DestinationConfiguration,
         catalog: DestinationCatalog,
         inputFlow: ReservingDeserializingInputFlow,
         recordQueueSupplier:
@@ -334,11 +347,13 @@ class DefaultInputConsumerTaskFactory(
         // Required by new interface
         recordQueueForPipeline:
             PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
+        fileQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
         loadPipeline: LoadPipeline?,
         partitioner: InputPartitioner,
         openStreamQueue: QueueWriter<DestinationStream>,
     ): InputConsumerTask {
         return DefaultInputConsumerTask(
+            config,
             catalog,
             inputFlow,
             recordQueueSupplier,
@@ -349,6 +364,7 @@ class DefaultInputConsumerTaskFactory(
 
             // Required by new interface
             recordQueueForPipeline,
+            fileQueueForPipeline,
             loadPipeline,
             partitioner,
             openStreamQueue,
diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt
index 5f5200d10bcf3..896c5dc547cc8 100644
--- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt
+++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt
@@ -4,6 +4,7 @@
 
 package io.airbyte.cdk.load.task.internal
 
+import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.PartitionedQueue
 import io.airbyte.cdk.load.message.PipelineEndOfStream
 import io.airbyte.cdk.load.message.PipelineEvent
@@ -71,8 +72,6 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
                         }
 
                         // Update bookkeeping metadata
-                        input
-                            .postProcessingCallback() // TODO: Accumulate and release when persisted
                         input.checkpointCounts.forEach {
                             state.checkpointCounts.merge(it.key, it.value) { old, new -> old + new }
                         }
@@ -112,6 +111,9 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
                             stateStore.remove(input.key)
                         }
 
+                        input
+                            .postProcessingCallback() // TODO: Accumulate and release when persisted
+
                         stateStore
                     }
                     is PipelineEndOfStream -> {
@@ -147,7 +149,6 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
         checkpointCounts: Map<CheckpointId, Long>,
         output: U
     ) {
-
         // Only publish the output if there's a next step.
         outputQueue?.let {
             val outputKey = outputPartitioner!!.getOutputKey(inputKey, output)
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderMemoryMapFileTask.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderMemoryMapFileTask.kt
new file mode 100644
index 0000000000000..ee63175f1963a
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderMemoryMapFileTask.kt
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.command.DestinationCatalog
+import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.DestinationFile
+import io.airbyte.cdk.load.message.PartitionedQueue
+import io.airbyte.cdk.load.message.PipelineEvent
+import io.airbyte.cdk.load.message.QueueWriter
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.BatchUpdate
+import io.airbyte.cdk.load.task.SelfTerminating
+import io.airbyte.cdk.load.task.Task
+import io.airbyte.cdk.load.task.TerminalCondition
+import io.airbyte.cdk.load.write.object_storage.FileLoader
+import jakarta.inject.Named
+
+class FileLoaderMemoryMapFileTask(
+    private val catalog: DestinationCatalog,
+    private val fileLoader: FileLoader,
+    @Named("fileQueue")
+    val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
+    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
+    @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue<PipelineEvent<ObjectKey, Part>>,
+    val pathFactory: ObjectStoragePathFactory,
+) : Task {
+    override val terminalCondition: TerminalCondition = SelfTerminating
+
+    override suspend fun execute() {
+        // TODO(WIP): memory-map the staged file and emit parts to the part queue
+    }
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderPartStep.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderPartStep.kt
new file mode 100644
index 0000000000000..836dd4971d443
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderPartStep.kt
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.command.DestinationCatalog
+import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.message.DestinationFile
+import io.airbyte.cdk.load.message.PartitionedQueue
+import io.airbyte.cdk.load.message.PipelineEvent
+import io.airbyte.cdk.load.message.QueueWriter
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.BatchUpdate
+import io.airbyte.cdk.load.pipeline.LoadPipelineStep
+import io.airbyte.cdk.load.task.Task
+import io.airbyte.cdk.load.write.object_storage.FileLoader
+import io.micronaut.context.annotation.Replaces
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Named
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(bean = FileLoader::class)
+@Replaces(ObjectLoaderPartStep::class)
+class FileLoaderPartStep(
+    private val catalog: DestinationCatalog,
+    private val fileLoader: FileLoader,
+    @Named("fileQueue")
+    val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
+    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
+    @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue<PipelineEvent<ObjectKey, Part>>,
+    val pathFactory: ObjectStoragePathFactory,
+) : LoadPipelineStep {
+    override val numWorkers: Int = fileLoader.numPartWorkers
+
+    override fun taskForPartition(partition: Int): Task {
+        return FileLoaderProcessFileTask(
+            catalog,
+            pathFactory,
+            fileLoader,
+            inputQueue,
+            batchQueue,
+            partQueue,
+            partition
+        )
+    }
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderPipeline.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderPipeline.kt
new file mode 100644
index 0000000000000..99a5cea6c0b4c
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderPipeline.kt
@@ -0,0 +1,17 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.pipeline.LoadPipeline
+import io.airbyte.cdk.load.write.object_storage.FileLoader
+import io.micronaut.context.annotation.Replaces
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(bean = FileLoader::class)
+@Replaces(ObjectLoaderPipeline::class)
+class FileLoaderPipeline(partStep: FileLoaderPartStep, uploadStep: ObjectLoaderUploadStep) :
+    LoadPipeline(listOf(partStep, uploadStep))
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderProcessFileTask.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderProcessFileTask.kt
new file mode 100644
index 0000000000000..cbf007a48b4eb
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/FileLoaderProcessFileTask.kt
@@ -0,0 +1,126 @@
+/*
+ * Copyright (c) 2024 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.cdk.load.pipline.object_storage
+
+import io.airbyte.cdk.load.command.DestinationCatalog
+import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
+import io.airbyte.cdk.load.file.object_storage.Part
+import io.airbyte.cdk.load.file.object_storage.PartFactory
+import io.airbyte.cdk.load.message.Batch
+import io.airbyte.cdk.load.message.DestinationFile
+import io.airbyte.cdk.load.message.PartitionedQueue
+import io.airbyte.cdk.load.message.PipelineEndOfStream
+import io.airbyte.cdk.load.message.PipelineEvent
+import io.airbyte.cdk.load.message.PipelineMessage
+import io.airbyte.cdk.load.message.QueueWriter
+import io.airbyte.cdk.load.message.StreamKey
+import io.airbyte.cdk.load.pipeline.BatchStateUpdate
+import io.airbyte.cdk.load.pipeline.BatchUpdate
+import io.airbyte.cdk.load.state.CheckpointId
+import io.airbyte.cdk.load.task.SelfTerminating
+import io.airbyte.cdk.load.task.Task
+import io.airbyte.cdk.load.task.TerminalCondition
+import io.airbyte.cdk.load.write.object_storage.FileLoader
+import io.github.oshai.kotlinlogging.KotlinLogging
+import java.io.File
+import java.nio.file.Path
+import java.util.concurrent.atomic.AtomicLong
+
+class FileLoaderProcessFileTask(
+    val catalog: DestinationCatalog,
+    val pathFactory: ObjectStoragePathFactory,
+    val fileLoader: FileLoader,
+    val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
+    val batchQueue: QueueWriter<BatchUpdate>,
+    val outputQueue: PartitionedQueue<PipelineEvent<ObjectKey, Part>>,
+    val partition: Int
+) : Task {
+    private val log = KotlinLogging.logger {}
+
+    override val terminalCondition: TerminalCondition = SelfTerminating
+
+    private val partitioner = ObjectLoaderPartPartitioner<DestinationFile>()
+
+    // Since we're denormalizing the parts, we need to force bookkeeping to work properly.
+    inner class PartCallback(
+        private val stream: DestinationStream.Descriptor,
+        numParts: Int,
+        private val checkpointCounts: Map<CheckpointId, Long>,
+    ) {
+        private val numParts = AtomicLong(numParts.toLong())
+
+        suspend fun onUpload() {
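+            // The worker that uploads the final outstanding part publishes COMPLETE for the whole file.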
+            if (numParts.decrementAndGet() == 0L) {
+                batchQueue.publish(BatchStateUpdate(stream, checkpointCounts, Batch.State.COMPLETE))
+            }
+        }
+    }
+
+    override suspend fun execute() {
+        inputQueue.consume(partition).collect { event ->
+            when (event) {
+                is PipelineMessage -> {
+                    val stream = catalog.getStream(event.key.stream)
+                    val file = event.value
+                    val key =
+                        Path.of(pathFactory.getFinalDirectory(stream), file.fileMessage.fileRelativePath!!)
+                            .toString()
+                    val fileSize = file.fileMessage.bytes!!
+                    val fileName = file.fileMessage.fileUrl!!
+                    // We expect a final marker even if size % partSize == 0.
+                    val numParts = (fileSize / fileLoader.partSizeBytes).toInt() + 1
+
+                    val partFactory =
+                        PartFactory(
+                            key = key,
+                            fileNumber = 0,
+                        )
+
+                    log.info { "Processing file $fileName with $numParts parts" }
+                    val localFile = File(fileName)
+                    val fileInputStream = localFile.inputStream()
+                    val callback = PartCallback(event.key.stream, numParts, event.checkpointCounts)
+
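+                    // Read up to partSizeBytes per part; a short or empty read marks the final part.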
+                    while (true) {
+                        val bytes = fileInputStream.readNBytes(fileLoader.partSizeBytes.toInt())
+                        log.info { "Read ${bytes.size} bytes from $fileName" }
+
+                        if (bytes.isEmpty()) {
+                            val outputPart = partFactory.nextPart(null, isFinal = true)
+                            publishPart(callback, event.key.stream, outputPart)
+                            break
+                        } else if (bytes.size < fileLoader.partSizeBytes) {
+                            val outputPart = partFactory.nextPart(bytes, isFinal = true)
+                            publishPart(callback, event.key.stream, outputPart)
+                            break
+                        } else {
+                            val outputPart = partFactory.nextPart(bytes, isFinal = false)
+                            publishPart(callback, event.key.stream, outputPart)
+                        }
+                    }
+                    log.info { "Finished reading $fileName, deleting." }
+                    fileInputStream.close()
+                    localFile.delete()
+                }
+
+                is PipelineEndOfStream -> {
+                    outputQueue.broadcast(PipelineEndOfStream(event.stream))
+                }
+            }
+        }
+    }
+
+    private suspend fun publishPart(callback: PartCallback, stream: DestinationStream.Descriptor, part: Part) {
+        val outputKey = ObjectKey(stream, part.key)
+        val partition = partitioner.getPart(outputKey, fileLoader.numPartWorkers)
+        val outputMessage = PipelineMessage(emptyMap(), outputKey, part) {
+            callback.onUpload()
+        }
+        outputQueue.publish(outputMessage, partition)
+    }
+}
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt
index 6fe3d749c7fee..ef42f7605d0f5 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartPartitioner.kt
@@ -19,8 +19,11 @@ import kotlin.random.Random
  *
  * The partition is round-robin, for maximum concurrency.
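+ *
+ * For example (illustrative; the concrete sequence depends on the random starting offset):
+ * with three target partitions, successive parts land on 2, 0, 1, 2, 0, 1, ...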
  */
-class ObjectLoaderPartPartitioner :
-    OutputPartitioner<StreamKey, DestinationRecordAirbyteValue, ObjectKey, Part> {
+class ObjectLoaderPartPartitioner<T> :
+    OutputPartitioner<StreamKey, T, ObjectKey, Part> {
     // Start on a random value
     private var nextPartition = Random(System.currentTimeMillis()).nextInt(Int.MAX_VALUE)
 
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt
index 3a84377a3525c..f04529c0302d5 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartQueue.kt
@@ -36,27 +36,27 @@ class ObjectLoaderPartQueueFactory(
     fun objectLoaderPartQueue(): PartitionedQueue<PipelineMessage<ObjectKey, Part>> {
         val bytes = memoryManager.totalCapacityBytes * loader.maxMemoryRatioReservedForParts
         val reservation = runBlocking { memoryManager.reserve(bytes.toLong(), this) }
-        val bytesPerPartition = reservation.bytesReserved / loader.numPartWorkers
+        val bytesPerPartition = reservation.bytesReserved // temporarily no per-worker split: the queue below is a single partition
         val partsPerPartition = bytesPerPartition / loader.partSizeBytes
 
-        if (partsPerPartition < 1) {
-            throw IllegalArgumentException(
-                "Reserved $bytes/${memoryManager.totalCapacityBytes}b not enough for ${loader.numPartWorkers} ${loader.partSizeBytes}b parts"
-            )
-        }
-
-        log.info {
-            "Reserved $bytes/${memoryManager.totalCapacityBytes}b for ${loader.numPartWorkers} ${loader.partSizeBytes}b parts => $partsPerPartition capacity per queue partition"
-        }
+//        if (partsPerPartition < 1) {
+//            throw IllegalArgumentException(
+//                "Reserved $bytes/${memoryManager.totalCapacityBytes}b not enough for ${loader.numUploadWorkers} ${loader.partSizeBytes}b parts"
+//            )
+//        }
+//
+//        log.info {
+//            "Reserved $bytes/${memoryManager.totalCapacityBytes}b for ${loader.numUploadWorkers} ${loader.partSizeBytes}b parts => $partsPerPartition capacity per queue partition"
+//        }
 
         return PartitionedQueue(
-            (0 until loader.numUploadWorkers)
-                .map {
-                    ChannelMessageQueue(
+//            (0 until loader.numUploadWorkers)
+//                .map {
+                    arrayOf(ChannelMessageQueue(
                         Channel<PipelineMessage<ObjectKey, Part>>(partsPerPartition.toInt())
-                    )
-                }
-                .toTypedArray()
+                    ))
+//                }
+//                .toTypedArray()
         )
     }
 }
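A quick sizing check for the single-partition arrangement above, using the defaults set elsewhere in this patch (maxMemoryRatioReservedForParts = 0.2, partSizeBytes = 10 MiB) and an assumed 2 GiB total capacity: the reservation is 0.2 × 2048 MiB ≈ 409 MiB, all of it backing the one partition, so partsPerPartition ≈ 40 and the channel buffers up to 40 parts.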
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt
index abb5ee32df151..b8d2eb528bfdd 100644
--- a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartToObjectAccumulator.kt
@@ -67,9 +67,9 @@ class ObjectLoaderPartToObjectAccumulator(
 
     override suspend fun accept(input: Part, state: State): Pair<State, ObjectResult?> {
         input.bytes?.let { state.streamingUpload.uploadPart(it, input.partIndex) }
-        if (input.bytes == null) {
-            throw IllegalStateException("Empty non-final part received: this should not happen")
-        }
+//        if (input.bytes == null) {
+//            throw IllegalStateException("Empty non-final part received: this should not happen")
+//        }
         state.bookkeeper.add(input)
         if (state.bookkeeper.isComplete) {
             return Pair(state, finish(state))
@@ -81,6 +81,6 @@ class ObjectLoaderPartToObjectAccumulator(
         if (state.bookkeeper.isComplete) {
             state.streamingUpload.complete()
         } // else assume another part is finishing this
-        return ObjectResult(Batch.State.COMPLETE)
+        return ObjectResult(Batch.State.STAGED)
     }
 }
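Returning STAGED rather than COMPLETE here appears deliberate: in the file-transfer path, the terminal COMPLETE for the whole file is published by FileLoaderProcessFileTask's PartCallback once the last part has uploaded, so completing in the accumulator as well would double-count the batch.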
diff --git a/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FileLoader.kt b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FileLoader.kt
new file mode 100644
index 0000000000000..38f20ba53b15f
--- /dev/null
+++ b/airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/write/object_storage/FileLoader.kt
@@ -0,0 +1,27 @@
+package io.airbyte.cdk.load.write.object_storage
+
+/**
+ * [FileLoader] is for the use case where the client wants to move whole files directly into
+ * object storage. Incoming records contain file metadata, and the file itself is stored in a
+ * staging directory on the pod where the connector is running.
+ *
+ * Usage:
+ *
+ * Declare a bean of type [FileLoader] and configure as per the documentation in [ObjectLoader].
+ * (The only exception is that [io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfiguration.fileNamePattern]
+ * will be ignored in favor of the filename provided in the file metadata message.)
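+ *
+ * A minimal sketch of a conforming bean (the class name and values are illustrative, not
+ * prescriptive):
+ *
+ * ```
+ * @Singleton
+ * class MyFileLoader : FileLoader {
+ *     override val numPartWorkers = 2
+ *     override val numUploadWorkers = 3
+ *     override val partSizeBytes = 10L * 1024 * 1024
+ *     override val maxMemoryRatioReservedForParts = 0.2
+ * }
+ * ```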
+ */
+interface FileLoader : ObjectLoader
diff --git a/airbyte-integrations/connectors/destination-s3/metadata.yaml b/airbyte-integrations/connectors/destination-s3/metadata.yaml
index 36ec28227c669..652a236bc3add 100644
--- a/airbyte-integrations/connectors/destination-s3/metadata.yaml
+++ b/airbyte-integrations/connectors/destination-s3/metadata.yaml
@@ -2,7 +2,7 @@ data:
   connectorSubtype: file
   connectorType: destination
   definitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
-  dockerImageTag: 1.5.4
+  dockerImageTag: 1.5.5
   dockerRepository: airbyte/destination-s3
   githubIssueLabel: destination-s3
   icon: s3.svg
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt
index d59c806cfdec5..683e8befccbf2 100644
--- a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt
+++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Configuration.kt
@@ -20,6 +20,8 @@ import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfigurati
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfigurationProvider
 import io.airbyte.cdk.load.command.s3.S3BucketConfiguration
 import io.airbyte.cdk.load.command.s3.S3BucketConfigurationProvider
+import io.airbyte.cdk.load.command.s3.S3ClientConfiguration
+import io.airbyte.cdk.load.command.s3.S3ClientConfigurationProvider
 import io.micronaut.context.annotation.Factory
 import jakarta.inject.Singleton
 import java.io.OutputStream
@@ -46,6 +48,7 @@ data class S3V2Configuration<T : OutputStream>(
     val maxMemoryRatioReservedForParts: Double = 0.2,
     val objectSizeBytes: Long = 200L * 1024 * 1024,
     val partSizeBytes: Long = 10L * 1024 * 1024,
+    val useLegacyClient: Boolean = false,
 ) :
     DestinationConfiguration(),
     AWSAccessKeyConfigurationProvider,
@@ -54,7 +57,10 @@ data class S3V2Configuration<T : OutputStream>(
     ObjectStoragePathConfigurationProvider,
     ObjectStorageFormatConfigurationProvider,
     ObjectStorageUploadConfigurationProvider,
-    ObjectStorageCompressionConfigurationProvider<T>
+    ObjectStorageCompressionConfigurationProvider<T>,
+    S3ClientConfigurationProvider {
+    override val s3ClientConfiguration = S3ClientConfiguration(useLegacyJavaClient = useLegacyClient)
+}
 
 @Singleton
 class S3V2ConfigurationFactory :
@@ -67,6 +73,11 @@ class S3V2ConfigurationFactory :
             objectStoragePathConfiguration = pojo.toObjectStoragePathConfiguration(),
             objectStorageFormatConfiguration = pojo.toObjectStorageFormatConfiguration(),
             objectStorageCompressionConfiguration = pojo.toCompressionConfiguration(),
+            numPartWorkers = pojo.numPartWorkers ?: 2,
+            numUploadWorkers = pojo.numObjectLoaders ?: 3,
+            maxMemoryRatioReservedForParts = pojo.maxMemoryRatioReservedForParts ?: 0.2,
+            partSizeBytes = (pojo.partSizeMb ?: 10) * 1024L * 1024L,
+            useLegacyClient = pojo.useLegacyClient ?: false,
         )
     }
 }
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2FileLoader.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2FileLoader.kt
new file mode 100644
index 0000000000000..1e730d65c086a
--- /dev/null
+++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2FileLoader.kt
@@ -0,0 +1,16 @@
+package io.airbyte.integrations.destination.s3_v2
+
+import io.airbyte.cdk.load.write.object_storage.FileLoader
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Singleton
+
+@Singleton
+@Requires(property = "airbyte.destination.core.file-transfer.enabled", value = "true")
+class S3V2FileLoader(
+    config: S3V2Configuration<*>
+) : FileLoader {
+    override val numPartWorkers: Int = config.numPartWorkers
+    override val numUploadWorkers: Int = config.numUploadWorkers
+    override val partSizeBytes: Long = config.partSizeBytes
+    override val maxMemoryRatioReservedForParts: Double = config.maxMemoryRatioReservedForParts
+}
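Note that the @Requires gate means this bean, and with it the whole file-based pipeline, only materializes when airbyte.destination.core.file-transfer.enabled is "true" (presumably set by the platform for file-transfer syncs); otherwise the record-based object-loader path is untouched.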
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Specification.kt b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Specification.kt
index a1e83cf78b73e..31ab6711642e4 100644
--- a/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Specification.kt
+++ b/airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2Specification.kt
@@ -4,6 +4,7 @@
 
 package io.airbyte.integrations.destination.s3_v2
 
+import com.fasterxml.jackson.annotation.JsonProperty
 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
 import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
 import io.airbyte.cdk.command.ConfigurationSpecification
@@ -74,6 +75,17 @@ class S3V2Specification :
             "{\"examples\":[\"{date}\",\"{date:yyyy_MM}\",\"{timestamp}\",\"{part_number}\",\"{sync_id}\"],\"order\":9}"
     )
     override val fileNamePattern: String? = null
+
+    @get:JsonProperty("num_part_workers")
+    val numPartWorkers: Int? = null
+    @get:JsonProperty("num_upload_workers")
+    val numObjectLoaders: Int? = null
+    @get:JsonProperty("part_size_mb")
+    val partSizeMb: Int? = null
+    @get:JsonProperty("max_memory_ratio_reserved_for_parts")
+    val maxMemoryRatioReservedForParts: Double? = null
+    @get:JsonProperty("use_legacy_client")
+    val useLegacyClient: Boolean? = null
 }
 
 @Singleton
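For reference, the new optional knobs as they might appear in a connector config (a hypothetical snippet: standard S3 fields are omitted, and the values shown simply mirror the factory defaults above):

```json
{
  "num_part_workers": 2,
  "num_upload_workers": 3,
  "part_size_mb": 10,
  "max_memory_ratio_reserved_for_parts": 0.2,
  "use_legacy_client": false
}
```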
diff --git a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2PerformanceTest.kt b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2PerformanceTest.kt
index 6b02d9d8a32a8..0b9cdbc867296 100644
--- a/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2PerformanceTest.kt
+++ b/airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2PerformanceTest.kt
@@ -8,7 +8,7 @@ import io.airbyte.cdk.load.write.BasicPerformanceTest
 import org.junit.jupiter.api.Disabled
 import org.junit.jupiter.api.Test
 
-@Disabled("We don't want this to run in CI")
+//@Disabled("We don't want this to run in CI")
 class S3V2JsonNoFrillsPerformanceTest :
     BasicPerformanceTest(
         configContents = S3V2TestUtils.getConfig(S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH),