@@ -197,23 +197,6 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
197
197
}
198
198
199
199
override suspend fun run () {
200
- // Start the input consumer ASAP
201
- log.info { " Starting input consumer task" }
202
- val inputConsumerTask =
203
- inputConsumerTaskFactory.make(
204
- catalog = catalog,
205
- inputFlow = inputFlow,
206
- recordQueueSupplier = recordQueueSupplier,
207
- checkpointQueue = checkpointQueue,
208
- fileTransferQueue = fileTransferQueue,
209
- destinationTaskLauncher = this ,
210
- recordQueueForPipeline = recordQueueForPipeline,
211
- loadPipeline = loadPipeline,
212
- partitioner = partitioner,
213
- openStreamQueue = openStreamQueue,
214
- )
215
- launch(inputConsumerTask)
216
-
217
200
// Launch the client interface setup task
218
201
log.info { " Starting startup task" }
219
202
val setupTask = setupTaskFactory.make(this )
@@ -225,12 +208,29 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
225
208
}
226
209
227
210
if (loadPipeline != null ) {
228
- log.info { " Setting up load pipeline" }
229
- loadPipeline.start { launch(it ) }
211
+ log.info { " Setup load pipeline" }
212
+ loadPipeline.start { task -> launch(task, withExceptionHandling = true ) }
230
213
log.info { " Launching update batch task" }
231
214
val updateBatchTask = updateBatchTaskFactory.make(this )
232
215
launch(updateBatchTask)
233
216
} else {
217
+ // Start the input consumer ASAP
218
+ log.info { " Starting input consumer task" }
219
+ val inputConsumerTask =
220
+ inputConsumerTaskFactory.make(
221
+ catalog = catalog,
222
+ inputFlow = inputFlow,
223
+ recordQueueSupplier = recordQueueSupplier,
224
+ checkpointQueue = checkpointQueue,
225
+ fileTransferQueue = fileTransferQueue,
226
+ destinationTaskLauncher = this ,
227
+ recordQueueForPipeline = recordQueueForPipeline,
228
+ loadPipeline = loadPipeline,
229
+ partitioner = partitioner,
230
+ openStreamQueue = openStreamQueue,
231
+ )
232
+ launch(inputConsumerTask)
233
+
234
234
// TODO: pluggable file transfer
235
235
if (! fileTransferEnabled) {
236
236
// Start a spill-to-disk task for each record stream
@@ -289,6 +289,26 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
289
289
catalog.streams.forEach { openStreamQueue.publish(it) }
290
290
log.info { " Closing open stream queue" }
291
291
openStreamQueue.close()
292
+ } else {
293
+ // When the pipeline is enabled, input consuming for
294
+ // each stream will wait on stream start to complete,
295
+ // but not on setup. This is the simplest way to make
296
+ // it do that.
297
+ log.info { " Setup complete, starting input consumer task" }
298
+ val inputConsumerTask =
299
+ inputConsumerTaskFactory.make(
300
+ catalog = catalog,
301
+ inputFlow = inputFlow,
302
+ recordQueueSupplier = recordQueueSupplier,
303
+ checkpointQueue = checkpointQueue,
304
+ fileTransferQueue = fileTransferQueue,
305
+ destinationTaskLauncher = this ,
306
+ recordQueueForPipeline = recordQueueForPipeline,
307
+ loadPipeline = loadPipeline,
308
+ partitioner = partitioner,
309
+ openStreamQueue = openStreamQueue,
310
+ )
311
+ launch(inputConsumerTask)
292
312
}
293
313
}
294
314
0 commit comments