Skip to content

Commit f48b486

Browse files
everything is a factory (and the complete counts work)
1 parent 13001de commit f48b486

File tree

7 files changed

+61
-75
lines changed

7 files changed

+61
-75
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt

+37-7
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
5555
private val flushStrategy: PipelineFlushStrategy?,
5656
private val part: Int,
5757
private val numWorkers: Int,
58-
private val streamCompletions: ConcurrentHashMap<DestinationStream.Descriptor, AtomicInteger>
58+
private val taskIndex: Int,
59+
private val streamCompletions:
60+
ConcurrentHashMap<Pair<Int, DestinationStream.Descriptor>, AtomicInteger>
5961
) : Task {
6062
override val terminalCondition: TerminalCondition = OnEndOfSync
6163

@@ -150,7 +152,7 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
150152
// Only forward end-of-stream if ALL workers have seen end-of-stream.
151153
if (
152154
streamCompletions
153-
.getOrPut(input.stream) { AtomicInteger(0) }
155+
.getOrPut(Pair(taskIndex, input.stream)) { AtomicInteger(0) }
154156
.incrementAndGet() == numWorkers
155157
) {
156158
outputQueue?.broadcast(PipelineEndOfStream(input.stream))
@@ -211,7 +213,10 @@ class LoadPipelineStepTaskFactory(
211213
@Value("\${airbyte.destination.core.record-batch-size-override:null}")
212214
val batchSizeOverride: Long? = null,
213215
) {
214-
private val streamCompletions = ConcurrentHashMap<DestinationStream.Descriptor, AtomicInteger>()
216+
// A map of (TaskIndex, Stream) -> Count_of_closed streams to ensure eos is not forwarded from
217+
// task N to N+1 until all workers have seen eos.
218+
private val streamCompletions =
219+
ConcurrentHashMap<Pair<Int, DestinationStream.Descriptor>, AtomicInteger>()
215220

216221
fun <S : AutoCloseable, K1 : WithStream, T, K2 : WithStream, U : Any> create(
217222
batchAccumulator: BatchAccumulator<S, K1, T, U>,
@@ -221,6 +226,7 @@ class LoadPipelineStepTaskFactory(
221226
flushStrategy: PipelineFlushStrategy?,
222227
part: Int,
223228
numWorkers: Int,
229+
taskIndex: Int,
224230
): LoadPipelineStepTask<S, K1, T, K2, U> {
225231
return LoadPipelineStepTask(
226232
batchAccumulator,
@@ -231,6 +237,7 @@ class LoadPipelineStepTaskFactory(
231237
flushStrategy,
232238
part,
233239
numWorkers,
240+
taskIndex,
234241
streamCompletions
235242
)
236243
}
@@ -249,24 +256,26 @@ class LoadPipelineStepTaskFactory(
249256
outputQueue,
250257
batchSizeOverride?.let { RecordCountFlushStrategy(it) },
251258
part,
252-
numWorkers
259+
numWorkers,
260+
taskIndex = 0
253261
)
254262
}
255263

256-
fun <S : AutoCloseable, K1 : WithStream, T, K2 : WithStream, U : Any> createFinalStep(
264+
fun <S : AutoCloseable, K1 : WithStream, T, U : Any> createFinalStep(
257265
batchAccumulator: BatchAccumulator<S, K1, T, U>,
258266
inputQueue: PartitionedQueue<PipelineEvent<K1, T>>,
259267
part: Int,
260268
numWorkers: Int,
261-
): LoadPipelineStepTask<S, K1, T, K2, U> {
269+
): LoadPipelineStepTask<S, K1, T, K1, U> {
262270
return create(
263271
batchAccumulator,
264272
inputQueue.consume(part),
265273
null,
266274
null,
267275
null,
268276
part,
269-
numWorkers
277+
numWorkers,
278+
taskIndex = -1
270279
)
271280
}
272281

@@ -277,4 +286,25 @@ class LoadPipelineStepTaskFactory(
277286
): LoadPipelineStepTask<S, StreamKey, DestinationRecordRaw, K2, U> {
278287
return createFirstStep(batchAccumulator, null, null, part, numWorkers)
279288
}
289+
290+
fun <S : AutoCloseable, K1 : WithStream, T, K2 : WithStream, U : Any> createIntermediateStep(
291+
batchAccumulator: BatchAccumulator<S, K1, T, U>,
292+
inputQueue: PartitionedQueue<PipelineEvent<K1, T>>,
293+
outputPartitioner: OutputPartitioner<K1, T, K2, U>?,
294+
outputQueue: PartitionedQueue<PipelineEvent<K2, U>>?,
295+
part: Int,
296+
numWorkers: Int,
297+
taskIndex: Int,
298+
): LoadPipelineStepTask<S, K1, T, K2, U> {
299+
return create(
300+
batchAccumulator,
301+
inputQueue.consume(part),
302+
outputPartitioner,
303+
outputQueue,
304+
null,
305+
part,
306+
numWorkers,
307+
taskIndex
308+
)
309+
}
280310
}

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartFormatterStep.kt

+6-24
Original file line numberDiff line numberDiff line change
@@ -4,53 +4,35 @@
44

55
package io.airbyte.cdk.load.pipline.object_storage
66

7-
import io.airbyte.cdk.load.command.DestinationStream
8-
import io.airbyte.cdk.load.message.DestinationRecordRaw
97
import io.airbyte.cdk.load.message.PartitionedQueue
108
import io.airbyte.cdk.load.message.PipelineEvent
11-
import io.airbyte.cdk.load.message.QueueWriter
12-
import io.airbyte.cdk.load.message.StreamKey
13-
import io.airbyte.cdk.load.pipeline.BatchUpdate
149
import io.airbyte.cdk.load.pipeline.LoadPipelineStep
15-
import io.airbyte.cdk.load.pipeline.RecordCountFlushStrategy
1610
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
11+
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTaskFactory
1712
import io.airbyte.cdk.load.write.object_storage.ObjectLoader
1813
import io.micronaut.context.annotation.Requires
19-
import io.micronaut.context.annotation.Value
2014
import jakarta.inject.Named
2115
import jakarta.inject.Singleton
22-
import java.util.concurrent.ConcurrentHashMap
23-
import java.util.concurrent.atomic.AtomicInteger
2416

2517
@Singleton
2618
@Requires(bean = ObjectLoader::class)
2719
class ObjectLoaderPartFormatterStep(
2820
private val objectLoader: ObjectLoader,
29-
private val recordToPartAccumulator: ObjectLoaderPartFormatter<*>,
30-
@Named("recordQueue")
31-
val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordRaw>>,
32-
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
21+
private val partFormatter: ObjectLoaderPartFormatter<*>,
3322
@Named("objectLoaderPartQueue")
3423
val outputQueue:
3524
PartitionedQueue<PipelineEvent<ObjectKey, ObjectLoaderPartFormatter.FormattedPart>>,
36-
@Value("\${airbyte.destination.core.record-batch-size-override:null}")
37-
val batchSizeOverride: Long? = null,
25+
val taskFactory: LoadPipelineStepTaskFactory
3826
) : LoadPipelineStep {
3927
override val numWorkers: Int = objectLoader.numPartWorkers
40-
private val streamCompletionMap =
41-
ConcurrentHashMap<DestinationStream.Descriptor, AtomicInteger>()
4228

4329
override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
44-
return LoadPipelineStepTask(
45-
recordToPartAccumulator,
46-
inputQueue.consume(partition),
47-
batchQueue,
30+
return taskFactory.createFirstStep(
31+
partFormatter,
4832
ObjectLoaderFormattedPartPartitioner(),
4933
outputQueue,
50-
batchSizeOverride?.let { RecordCountFlushStrategy(it) },
5134
partition,
52-
numWorkers,
53-
streamCompletionMap
35+
numWorkers
5436
)
5537
}
5638
}

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderPartLoaderStep.kt

+9-17
Original file line numberDiff line numberDiff line change
@@ -4,47 +4,39 @@
44

55
package io.airbyte.cdk.load.pipline.object_storage
66

7-
import io.airbyte.cdk.load.command.DestinationStream
87
import io.airbyte.cdk.load.message.PartitionedQueue
98
import io.airbyte.cdk.load.message.PipelineEvent
10-
import io.airbyte.cdk.load.message.QueueWriter
11-
import io.airbyte.cdk.load.pipeline.BatchUpdate
129
import io.airbyte.cdk.load.pipeline.LoadPipelineStep
1310
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
11+
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTaskFactory
1412
import io.airbyte.cdk.load.write.object_storage.ObjectLoader
1513
import io.micronaut.context.annotation.Requires
1614
import jakarta.inject.Named
1715
import jakarta.inject.Singleton
18-
import java.util.concurrent.ConcurrentHashMap
19-
import java.util.concurrent.atomic.AtomicInteger
2016

2117
@Singleton
2218
@Requires(bean = ObjectLoader::class)
2319
class ObjectLoaderPartLoaderStep(
2420
val loader: ObjectLoader,
25-
val accumulator: ObjectLoaderPartLoader,
21+
val partLoader: ObjectLoaderPartLoader,
2622
@Named("objectLoaderPartQueue")
2723
val inputQueue:
2824
PartitionedQueue<PipelineEvent<ObjectKey, ObjectLoaderPartFormatter.FormattedPart>>,
2925
@Named("objectLoaderLoadedPartQueue")
3026
val outputQueue: PartitionedQueue<PipelineEvent<ObjectKey, ObjectLoaderPartLoader.PartResult>>,
31-
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
27+
val taskFactory: LoadPipelineStepTaskFactory,
3228
) : LoadPipelineStep {
3329
override val numWorkers: Int = loader.numUploadWorkers
34-
private val streamCompletionMap =
35-
ConcurrentHashMap<DestinationStream.Descriptor, AtomicInteger>()
3630

3731
override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
38-
return LoadPipelineStepTask(
39-
accumulator,
40-
inputQueue.consume(partition),
41-
batchQueue,
42-
outputPartitioner = ObjectLoaderLoadedPartPartitioner(),
43-
outputQueue = outputQueue,
44-
flushStrategy = null,
32+
return taskFactory.createIntermediateStep(
33+
partLoader,
34+
inputQueue,
35+
ObjectLoaderLoadedPartPartitioner(),
36+
outputQueue,
4537
partition,
4638
numWorkers,
47-
streamCompletionMap
39+
taskIndex = 2
4840
)
4941
}
5042
}

airbyte-cdk/bulk/toolkits/load-object-storage/src/main/kotlin/io/airbyte/cdk/load/pipline/object_storage/ObjectLoaderUploadCompleterStep.kt

+4-18
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,15 @@
44

55
package io.airbyte.cdk.load.pipline.object_storage
66

7-
import io.airbyte.cdk.load.command.DestinationStream
87
import io.airbyte.cdk.load.message.PartitionedQueue
98
import io.airbyte.cdk.load.message.PipelineEvent
10-
import io.airbyte.cdk.load.message.QueueWriter
11-
import io.airbyte.cdk.load.pipeline.BatchUpdate
129
import io.airbyte.cdk.load.pipeline.LoadPipelineStep
1310
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
11+
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTaskFactory
1412
import io.airbyte.cdk.load.write.object_storage.ObjectLoader
1513
import io.micronaut.context.annotation.Requires
1614
import jakarta.inject.Named
1715
import jakarta.inject.Singleton
18-
import java.util.concurrent.ConcurrentHashMap
19-
import java.util.concurrent.atomic.AtomicInteger
2016

2117
@Singleton
2218
@Requires(bean = ObjectLoader::class)
@@ -25,26 +21,16 @@ class ObjectLoaderUploadCompleterStep(
2521
val uploadCompleter: ObjectLoaderUploadCompleter,
2622
@Named("objectLoaderLoadedPartQueue")
2723
val inputQueue: PartitionedQueue<PipelineEvent<ObjectKey, ObjectLoaderPartLoader.PartResult>>,
28-
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
24+
val taskFactory: LoadPipelineStepTaskFactory
2925
) : LoadPipelineStep {
3026
override val numWorkers: Int = objectLoader.numUploadCompleters
31-
private val streamCompletionMap =
32-
ConcurrentHashMap<DestinationStream.Descriptor, AtomicInteger>()
3327

3428
override fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *> {
35-
return LoadPipelineStepTask(
29+
return taskFactory.createFinalStep(
3630
uploadCompleter,
37-
inputQueue.consume(partition),
38-
batchQueue,
39-
null,
40-
null
41-
as
42-
PartitionedQueue<
43-
PipelineEvent<ObjectKey, ObjectLoaderUploadCompleter.UploadResult>>?,
44-
null,
31+
inputQueue,
4532
partition,
4633
numWorkers,
47-
streamCompletionMap
4834
)
4935
}
5036
}

airbyte-cdk/bulk/toolkits/load-object-storage/src/test/kotlin/io/airbyte/cdk/load/pipeline/object_storage/ObjectLoaderPartPartitionerTest.kt

+1-2
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,13 @@ import org.junit.jupiter.api.Test
1717
class ObjectLoaderPartPartitionerTest<K : WithStream, T> {
1818
@Test
1919
fun `partitioner always assigns 0`() {
20-
val keys = (0 until 12).map(Int::toString)
2120
val partitioner = ObjectLoaderFormattedPartPartitioner<K, T>()
2221
val stream = DestinationStream.Descriptor("test", "stream")
2322
val numParts = 3
2423
var lastPartition: Int? = null
2524
(0 until 12).forEach {
2625
val partition = partitioner.getPart(ObjectKey(stream, it.toString()), numParts)
27-
lastPartition?.let { last -> assertEquals(0, partition) }
26+
lastPartition?.let { _ -> assertEquals(0, partition) }
2827
lastPartition = partition
2928
}
3029
}

airbyte-integrations/connectors/destination-s3/src/main/kotlin/S3V2ObjectLoader.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.airbyte.integrations.destination.s3_v2
66

7+
import io.airbyte.cdk.load.pipeline.RoundRobinInputPartitioner
78
import io.airbyte.cdk.load.write.object_storage.ObjectLoader
89

910
/**
@@ -19,4 +20,5 @@ class S3V2ObjectLoader(config: S3V2Configuration<*>) : ObjectLoader {
1920
override val partSizeBytes: Long = config.partSizeBytes
2021
}
2122

22-
// @Singleton class S3V2RoundRobinInputPartitioner : RoundRobinInputPartitioner()
23+
// @Singleton
24+
class S3V2RoundRobinInputPartitioner : RoundRobinInputPartitioner()

airbyte-integrations/connectors/destination-s3/src/test-integration/kotlin/io/airbyte/integrations/destination/s3_v2/S3V2PerformanceTest.kt

+1-6
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import io.airbyte.cdk.load.write.BasicPerformanceTest
88
import org.junit.jupiter.api.Disabled
99
import org.junit.jupiter.api.Test
1010

11-
// @Disabled("We don't want this to run in CI")
11+
@Disabled("We don't want this to run in CI")
1212
class S3V2JsonNoFrillsPerformanceTest :
1313
BasicPerformanceTest(
1414
configContents = S3V2TestUtils.getConfig(S3V2TestUtils.JSON_UNCOMPRESSED_CONFIG_PATH),
@@ -22,11 +22,6 @@ class S3V2JsonNoFrillsPerformanceTest :
2222
override fun testFileTransfer() {
2323
super.testFileTransfer()
2424
}
25-
26-
@Test
27-
override fun testInsertRecords() {
28-
super.testInsertRecords()
29-
}
3025
}
3126

3227
@Disabled("We don't want this to run in CI")

0 commit comments

Comments
 (0)