
Commit f456d2f

WIP: File Loader
1 parent 1b72de1 commit f456d2f

23 files changed (+360 -48 lines)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt

+16
@@ -9,6 +9,7 @@ import io.airbyte.cdk.load.command.DestinationConfiguration
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.ChannelMessageQueue
+import io.airbyte.cdk.load.message.DestinationFile
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
 import io.airbyte.cdk.load.message.MultiProducerChannel
 import io.airbyte.cdk.load.message.PartitionedQueue
@@ -127,6 +128,21 @@ class SyncBeanFactory {
         )
     }
 
+    /**
+     * Same as recordQueue, but for files.
+     */
+    @Singleton
+    @Named("fileQueue")
+    fun fileQueue(
+        loadStrategy: LoadStrategy? = null,
+    ): PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>> {
+        return PartitionedQueue(
+            Array(loadStrategy?.inputPartitions ?: 1) {
+                ChannelMessageQueue(Channel(Channel.UNLIMITED))
+            }
+        )
+    }
+
     /** A queue for updating batch states, which is not partitioned. */
     @Singleton
     @Named("batchStateUpdateQueue")
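The new fileQueue mirrors the existing recordQueue bean: one unbounded channel per input partition, defaulting to a single partition when no LoadStrategy is bound. A minimal consumer sketch, assuming the CDK types in this diff are on the classpath; the FileWorker name and its run() signature are hypothetical:

import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.PartitionedQueue
import io.airbyte.cdk.load.message.PipelineEvent
import io.airbyte.cdk.load.message.StreamKey
import jakarta.inject.Named
import jakarta.inject.Singleton

@Singleton
class FileWorker(
    @Named("fileQueue")
    private val fileQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
) {
    // Each worker owns one partition; consume(partition) yields only that
    // partition's events, so files within a partition are handled in order.
    suspend fun run(partition: Int) {
        fileQueue.consume(partition).collect { event ->
            // handle PipelineMessage / PipelineEndOfStream here
        }
    }
}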

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

+2 -1
@@ -32,6 +32,7 @@ import io.airbyte.protocol.models.v0.AirbyteTraceMessage
 import io.micronaut.context.annotation.Value
 import jakarta.inject.Singleton
 import java.math.BigInteger
+import java.nio.MappedByteBuffer
 import java.time.OffsetDateTime
 
 /**
@@ -167,7 +168,7 @@ data class DestinationFile(
     override val stream: DestinationStream.Descriptor,
     val emittedAtMs: Long,
     val serialized: String,
-    val fileMessage: AirbyteRecordMessageFile
+    val fileMessage: AirbyteRecordMessageFile,
 ) : DestinationFileDomainMessage {
     /** Convenience constructor, primarily intended for use in tests. */
     class AirbyteRecordMessageFile {
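The java.nio.MappedByteBuffer import added above is not yet used in this WIP commit. A plausible direction, sketched here as an assumption using only standard JDK APIs: memory-map a staged file so upload parts can be sliced from it without copying.

import java.io.RandomAccessFile
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel

// Map an entire file read-only. The mapping remains valid after the channel
// is closed, so the returned buffer can be sliced into parts later.
fun mapFile(path: String): MappedByteBuffer =
    RandomAccessFile(path, "r").use { raf ->
        raf.channel.map(FileChannel.MapMode.READ_ONLY, 0, raf.length())
    }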

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/PartitionedQueue.kt

+8 -8
@@ -11,17 +11,17 @@ class PartitionedQueue<T>(private val queues: Array<MessageQueue<T>>) : Closeabl
     val partitions = queues.size
 
     fun consume(partition: Int): Flow<T> {
-        if (partition < 0 || partition >= queues.size) {
-            throw IllegalArgumentException("Invalid partition: $partition")
-        }
-        return queues[partition].consume()
+        // if (partition < 0 || partition >= queues.size) {
+        //     throw IllegalArgumentException("Invalid partition: $partition")
+        // }
+        return queues[partition % partitions].consume()
     }
 
     suspend fun publish(value: T, partition: Int) {
-        if (partition < 0 || partition >= queues.size) {
-            throw IllegalArgumentException("Invalid partition: $partition")
-        }
-        queues[partition].publish(value)
+        // if (partition < 0 || partition >= queues.size) {
+        //     throw IllegalArgumentException("Invalid partition: $partition")
+        // }
+        queues[partition % partitions].publish(value)
     }
 
     suspend fun broadcast(value: T) = queues.forEach { it.publish(value) }
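The behavioral change here is worth calling out: an out-of-range partition index no longer throws IllegalArgumentException; it wraps around via modulo. A self-contained sketch of the new behavior (WrappingQueue is a stand-in, not the CDK class):

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

class WrappingQueue<T>(private val queues: List<Channel<T>>) {
    val partitions = queues.size

    // Partition 5 of 3 wraps to queue 2 instead of throwing. A negative index
    // would still fail, since Kotlin's % keeps the sign of its left operand.
    suspend fun publish(value: T, partition: Int) = queues[partition % partitions].send(value)

    suspend fun receive(partition: Int): T = queues[partition % partitions].receive()
}

fun main() = runBlocking {
    val queue = WrappingQueue(List(3) { Channel<String>(Channel.UNLIMITED) })
    queue.publish("file-part", partition = 5) // 5 % 3 == 2
    println(queue.receive(2)) // prints "file-part"
}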

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt

+1
@@ -5,6 +5,7 @@
 package io.airbyte.cdk.load.pipeline
 
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
 import io.micronaut.context.annotation.Secondary
 import jakarta.inject.Singleton
 import kotlin.math.abs

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/LoadPipeline.kt

+1 -1
@@ -9,7 +9,7 @@ import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
 
 interface LoadPipelineStep {
     val numWorkers: Int
-    fun taskForPartition(partition: Int): LoadPipelineStepTask<*, *, *, *, *>
+    fun taskForPartition(partition: Int): Task
 }
 
 /**

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/RoundRobinInputPartitioner.kt

+1
@@ -5,6 +5,7 @@
 package io.airbyte.cdk.load.pipeline
 
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
+import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
 import kotlin.math.abs
 import kotlin.random.Random

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt

-1
@@ -389,7 +389,6 @@ class DefaultStreamManager(
         if (readCount == 0L) {
             return true
         }
-
         val completedCount = checkpointCounts.values.sumOf { it.recordsCompleted.get() }
         return completedCount == readCount
     }

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt

+5
@@ -11,6 +11,7 @@ import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.BatchEnvelope
 import io.airbyte.cdk.load.message.ChannelMessageQueue
 import io.airbyte.cdk.load.message.CheckpointMessageWrapped
+import io.airbyte.cdk.load.message.DestinationFile
 import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
 import io.airbyte.cdk.load.message.DestinationStreamEvent
 import io.airbyte.cdk.load.message.MessageQueue
@@ -146,6 +147,8 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
     @Named("recordQueue")
     private val recordQueueForPipeline:
         PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
+    @Named("fileQueue")
+    private val fileQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
     @Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate>,
     private val loadPipeline: LoadPipeline?,
     private val partitioner: InputPartitioner,
@@ -201,13 +204,15 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
         log.info { "Starting input consumer task" }
         val inputConsumerTask =
             inputConsumerTaskFactory.make(
+                config = config,
                 catalog = catalog,
                 inputFlow = inputFlow,
                 recordQueueSupplier = recordQueueSupplier,
                 checkpointQueue = checkpointQueue,
                 fileTransferQueue = fileTransferQueue,
                 destinationTaskLauncher = this,
                 recordQueueForPipeline = recordQueueForPipeline,
+                fileQueueForPipeline = fileQueueForPipeline,
                 loadPipeline = loadPipeline,
                 partitioner = partitioner,
                 openStreamQueue = openStreamQueue,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt

+25 -9
@@ -6,6 +6,7 @@ package io.airbyte.cdk.load.task.internal
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
 import io.airbyte.cdk.load.command.DestinationCatalog
+import io.airbyte.cdk.load.command.DestinationConfiguration
 import io.airbyte.cdk.load.command.DestinationStream
 import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.BatchEnvelope
@@ -50,6 +51,8 @@ import io.github.oshai.kotlinlogging.KotlinLogging
 import io.micronaut.context.annotation.Secondary
 import jakarta.inject.Named
 import jakarta.inject.Singleton
+import java.io.RandomAccessFile
+import java.nio.channels.FileChannel
 import java.util.concurrent.ConcurrentHashMap
 
 interface InputConsumerTask : Task
@@ -67,6 +70,7 @@ interface InputConsumerTask : Task
 @Singleton
 @Secondary
 class DefaultInputConsumerTask(
+    private val config: DestinationConfiguration,
     private val catalog: DestinationCatalog,
     private val inputFlow: ReservingDeserializingInputFlow,
     private val recordQueueSupplier:
@@ -81,6 +85,9 @@ class DefaultInputConsumerTask(
     @Named("recordQueue")
     private val recordQueueForPipeline:
         PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
+    @Named("fileQueue")
+    private val fileQueueForPipeline:
+        PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
     private val loadPipeline: LoadPipeline? = null,
     private val partitioner: InputPartitioner,
     private val openStreamQueue: QueueWriter<DestinationStream>
@@ -182,19 +189,21 @@ class DefaultInputConsumerTask(
                 reserved.release()
             }
             is DestinationFile -> {
-                val index = manager.incrementReadCount()
-                // destinationTaskLauncher.handleFile(stream, message, index)
-                fileTransferQueue.publish(FileTransferQueueMessage(stream, message, index))
+                val partition = manager.incrementReadCount().toInt() % fileQueueForPipeline.partitions
+                fileQueueForPipeline.publish(
+                    PipelineMessage(
+                        mapOf(manager.getCurrentCheckpointId() to 1),
+                        StreamKey(stream),
+                        message
+                    ) { reserved.release() },
+                    partition
+                )
             }
             is DestinationFileStreamComplete -> {
                 reserved.release() // safe because multiple calls conflate
                 manager.markEndOfStream(true)
-                val envelope =
-                    BatchEnvelope(
-                        SimpleBatch(Batch.State.COMPLETE),
-                        streamDescriptor = message.stream,
-                    )
-                destinationTaskLauncher.handleNewBatch(stream, envelope)
+                log.info { "Read COMPLETE for file stream $stream" }
+                fileQueueForPipeline.broadcast(PipelineEndOfStream(stream))
             }
             is DestinationFileStreamIncomplete ->
                 throw IllegalStateException("File stream $stream failed upstream, cannot continue.")
@@ -294,12 +303,14 @@ class DefaultInputConsumerTask(
         catalog.streams.forEach { recordQueueSupplier.get(it.descriptor).close() }
         fileTransferQueue.close()
         recordQueueForPipeline.close()
+        fileQueueForPipeline.close()
         }
     }
 }
 
 interface InputConsumerTaskFactory {
     fun make(
+        config: DestinationConfiguration,
         catalog: DestinationCatalog,
         inputFlow: ReservingDeserializingInputFlow,
         recordQueueSupplier:
@@ -311,6 +322,7 @@ interface InputConsumerTaskFactory {
         // Required by new interface
         recordQueueForPipeline:
             PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
+        fileQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
        loadPipeline: LoadPipeline?,
        partitioner: InputPartitioner,
        openStreamQueue: QueueWriter<DestinationStream>,
@@ -323,6 +335,7 @@ class DefaultInputConsumerTaskFactory(
     private val syncManager: SyncManager,
 ) : InputConsumerTaskFactory {
     override fun make(
+        config: DestinationConfiguration,
         catalog: DestinationCatalog,
         inputFlow: ReservingDeserializingInputFlow,
         recordQueueSupplier:
@@ -334,11 +347,13 @@ class DefaultInputConsumerTaskFactory(
         // Required by new interface
         recordQueueForPipeline:
             PartitionedQueue<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>,
+        fileQueueForPipeline: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
         loadPipeline: LoadPipeline?,
         partitioner: InputPartitioner,
         openStreamQueue: QueueWriter<DestinationStream>,
     ): InputConsumerTask {
         return DefaultInputConsumerTask(
+            config,
             catalog,
             inputFlow,
             recordQueueSupplier,
@@ -349,6 +364,7 @@ class DefaultInputConsumerTaskFactory(
 
             // Required by new interface
             recordQueueForPipeline,
+            fileQueueForPipeline,
             loadPipeline,
             partitioner,
             openStreamQueue,
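The DestinationFile branch now feeds the load pipeline directly: each file is wrapped in a PipelineMessage carrying a checkpoint count of 1, and its partition is derived from the per-stream read count, so files round-robin across the queue's partitions. A stand-alone sketch of that assignment rule (illustrative only; the diff converts the count to Int before taking the modulo, while taking it on the Long first, as below, also avoids overflow on very long syncs):

// Round-robin partition assignment by per-stream read count.
fun partitionForFile(readCount: Long, partitions: Int): Int =
    (readCount % partitions).toInt()

fun main() {
    (1L..5L).forEach { n ->
        println("file #$n -> partition ${partitionForFile(n, partitions = 2)}")
    }
    // file #1 -> partition 1
    // file #2 -> partition 0
    // file #3 -> partition 1 ...
}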

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/LoadPipelineStepTask.kt

+4 -3
@@ -4,6 +4,7 @@
 
 package io.airbyte.cdk.load.task.internal
 
+import io.airbyte.cdk.load.message.Batch
 import io.airbyte.cdk.load.message.PartitionedQueue
 import io.airbyte.cdk.load.message.PipelineEndOfStream
 import io.airbyte.cdk.load.message.PipelineEvent
@@ -71,8 +72,6 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
                     }
 
                     // Update bookkeeping metadata
-                    input
-                        .postProcessingCallback() // TODO: Accumulate and release when persisted
                     input.checkpointCounts.forEach {
                         state.checkpointCounts.merge(it.key, it.value) { old, new -> old + new }
                     }
@@ -112,6 +111,9 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
                         stateStore.remove(input.key)
                     }
 
+                    input
+                        .postProcessingCallback() // TODO: Accumulate and release when persisted
+
                     stateStore
                 }
                 is PipelineEndOfStream -> {
@@ -147,7 +149,6 @@ class LoadPipelineStepTask<S : AutoCloseable, K1 : WithStream, T, K2 : WithStrea
         checkpointCounts: Map<CheckpointId, Long>,
         output: U
     ) {
-
         // Only publish the output if there's a next step.
         outputQueue?.let {
             val outputKey = outputPartitioner!!.getOutputKey(inputKey, output)
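The reorder above defers postProcessingCallback(), which releases the reserved input memory, until after the accumulator has been flushed and the state store updated. A minimal sketch of the ordering concern, with hypothetical names:

class Reserved<T>(val value: T, private val onRelease: () -> Unit) {
    fun release() = onRelease()
}

suspend fun handle(input: Reserved<String>, accumulate: suspend (String) -> Unit) {
    accumulate(input.value) // the accumulator may still reference the reserved buffer
    input.release()         // so release only after the value is fully consumed
}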
FileLoaderMemoryMapFileTask.kt (new file)

@@ -0,0 +1,32 @@
package io.airbyte.cdk.load.pipline.object_storage

import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.object_storage.Part
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.PartitionedQueue
import io.airbyte.cdk.load.message.PipelineEvent
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.StreamKey
import io.airbyte.cdk.load.pipeline.BatchUpdate
import io.airbyte.cdk.load.task.SelfTerminating
import io.airbyte.cdk.load.task.Task
import io.airbyte.cdk.load.task.TerminalCondition
import io.airbyte.cdk.load.write.object_storage.FileLoader
import jakarta.inject.Named

class FileLoaderMemoryMapFileTask(
    private val catalog: DestinationCatalog,
    private val fileLoader: FileLoader,
    @Named("fileQueue")
    val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
    @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue<PipelineEvent<ObjectKey, Part>>,
    val pathFactory: ObjectStoragePathFactory,
) : Task {
    override val terminalCondition: TerminalCondition = SelfTerminating

    override suspend fun execute() {
        // WIP: not yet implemented in this commit
    }
}
FileLoaderPartStep.kt (new file)

@@ -0,0 +1,49 @@
package io.airbyte.cdk.load.pipline.object_storage

import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.object_storage.Part
import io.airbyte.cdk.load.message.DestinationFile
import io.airbyte.cdk.load.message.PartitionedQueue
import io.airbyte.cdk.load.message.PipelineEvent
import io.airbyte.cdk.load.message.QueueWriter
import io.airbyte.cdk.load.message.StreamKey
import io.airbyte.cdk.load.pipeline.BatchUpdate
import io.airbyte.cdk.load.pipeline.LoadPipelineStep
import io.airbyte.cdk.load.pipeline.RecordCountFlushStrategy
import io.airbyte.cdk.load.task.Task
import io.airbyte.cdk.load.task.internal.LoadPipelineStepTask
import io.airbyte.cdk.load.write.object_storage.FileLoader
import io.airbyte.cdk.load.write.object_storage.ObjectLoader
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton

@Singleton
@Requires(bean = FileLoader::class)
@Replaces(ObjectLoaderPartStep::class)
class FileLoaderPartStep(
    private val catalog: DestinationCatalog,
    private val fileLoader: FileLoader,
    @Named("fileQueue")
    val inputQueue: PartitionedQueue<PipelineEvent<StreamKey, DestinationFile>>,
    @Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
    @Named("objectLoaderPartQueue") val partQueue: PartitionedQueue<PipelineEvent<ObjectKey, Part>>,
    val pathFactory: ObjectStoragePathFactory,
) : LoadPipelineStep {
    override val numWorkers: Int = fileLoader.numPartWorkers

    override fun taskForPartition(partition: Int): Task {
        return FileLoaderProcessFileTask(
            catalog,
            pathFactory,
            fileLoader,
            inputQueue,
            batchQueue,
            partQueue,
            partition
        )
    }
}
FileLoaderPipeline.kt (new file)

@@ -0,0 +1,17 @@
package io.airbyte.cdk.load.pipline.object_storage

import io.airbyte.cdk.load.pipeline.LoadPipeline
import io.airbyte.cdk.load.write.object_storage.FileLoader
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import jakarta.inject.Singleton

@Singleton
@Requires(bean = FileLoader::class)
@Replaces(ObjectLoaderPipeline::class)
class FileLoaderPipeline(
    partStep: FileLoaderPartStep,
    uploadStep: ObjectLoaderUploadStep
) : LoadPipeline(
    listOf(partStep, uploadStep)
)
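Under Micronaut DI, @Requires(bean = FileLoader::class) activates this pipeline only when the destination exposes a FileLoader bean, and @Replaces(ObjectLoaderPipeline::class) swaps it in for the record-based object-loader pipeline. A hedged activation sketch, assuming numPartWorkers (the only FileLoader member visible in this diff) is the sole required override:

import io.airbyte.cdk.load.write.object_storage.FileLoader
import jakarta.inject.Singleton

@Singleton
class MyFileLoader : FileLoader {
    // Worker count for the part step; assumption: any other FileLoader
    // members have defaults.
    override val numPartWorkers: Int = 2
}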
