@@ -95,9 +95,9 @@ class DefaultInputConsumerTask(
95
95
reserved : Reserved <DestinationStreamAffinedMessage >,
96
96
sizeBytes : Long
97
97
) {
98
- val stream = reserved.value.stream
99
- val manager = syncManager.getStreamManager(stream )
100
- val recordQueue = recordQueueSupplier.get(stream )
98
+ val streamDescriptor = reserved.value.stream.descriptor
99
+ val manager = syncManager.getStreamManager(streamDescriptor )
100
+ val recordQueue = recordQueueSupplier.get(streamDescriptor )
101
101
when (val message = reserved.value) {
102
102
is DestinationRecord -> {
103
103
val wrapped =
@@ -111,93 +111,105 @@ class DefaultInputConsumerTask(
111
111
is DestinationRecordStreamComplete -> {
112
112
reserved.release() // safe because multiple calls conflate
113
113
val wrapped = StreamEndEvent (index = manager.markEndOfStream(true ))
114
- log.info { " Read COMPLETE for stream $stream " }
114
+ log.info { " Read COMPLETE for stream $streamDescriptor " }
115
115
recordQueue.publish(reserved.replace(wrapped))
116
116
recordQueue.close()
117
117
}
118
118
is DestinationRecordStreamIncomplete -> {
119
119
reserved.release() // safe because multiple calls conflate
120
120
val wrapped = StreamEndEvent (index = manager.markEndOfStream(false ))
121
- log.info { " Read INCOMPLETE for stream $stream " }
121
+ log.info { " Read INCOMPLETE for stream $streamDescriptor " }
122
122
recordQueue.publish(reserved.replace(wrapped))
123
123
recordQueue.close()
124
124
}
125
125
is DestinationFile -> {
126
126
val index = manager.incrementReadCount()
127
127
// destinationTaskLauncher.handleFile(stream, message, index)
128
- fileTransferQueue.publish(FileTransferQueueMessage (stream, message, index))
128
+ fileTransferQueue.publish(
129
+ FileTransferQueueMessage (streamDescriptor, message, index)
130
+ )
129
131
}
130
132
is DestinationFileStreamComplete -> {
131
133
reserved.release() // safe because multiple calls conflate
132
134
manager.markEndOfStream(true )
133
135
val envelope =
134
136
BatchEnvelope (
135
137
SimpleBatch (Batch .State .COMPLETE ),
136
- streamDescriptor = message.stream ,
138
+ streamDescriptor = streamDescriptor ,
137
139
)
138
- destinationTaskLauncher.handleNewBatch(stream , envelope)
140
+ destinationTaskLauncher.handleNewBatch(streamDescriptor , envelope)
139
141
}
140
142
is DestinationFileStreamIncomplete ->
141
- throw IllegalStateException (" File stream $stream failed upstream, cannot continue." )
143
+ throw IllegalStateException (
144
+ " File stream $streamDescriptor failed upstream, cannot continue."
145
+ )
142
146
}
143
147
}
144
148
145
149
private suspend fun handleRecordForPipeline (
146
150
reserved : Reserved <DestinationStreamAffinedMessage >,
147
151
) {
148
- val stream = reserved.value.stream
149
- unopenedStreams.remove(stream )?.let {
150
- log.info { " Saw first record for stream $stream ; initializing" }
152
+ val streamDescriptor = reserved.value.stream.descriptor
153
+ unopenedStreams.remove(streamDescriptor )?.let {
154
+ log.info { " Saw first record for stream $streamDescriptor ; initializing" }
151
155
// Note, since we're not spilling to disk, there is nothing to do with
152
156
// any records before initialization is complete, so we'll wait here
153
157
// for it to finish.
154
158
openStreamQueue.publish(it)
155
- syncManager.getOrAwaitStreamLoader(stream )
156
- log.info { " Initialization for stream $stream complete" }
159
+ syncManager.getOrAwaitStreamLoader(streamDescriptor )
160
+ log.info { " Initialization for stream $streamDescriptor complete" }
157
161
}
158
- val manager = syncManager.getStreamManager(stream )
162
+ val manager = syncManager.getStreamManager(streamDescriptor )
159
163
when (val message = reserved.value) {
160
164
is DestinationRecord -> {
161
165
val record = message.asDestinationRecordRaw()
162
166
manager.incrementReadCount()
163
167
val pipelineMessage =
164
168
PipelineMessage (
165
169
mapOf (manager.getCurrentCheckpointId() to 1 ),
166
- StreamKey (stream ),
170
+ StreamKey (streamDescriptor ),
167
171
record
168
172
)
169
173
val partition = partitioner.getPartition(record, recordQueueForPipeline.partitions)
170
174
recordQueueForPipeline.publish(reserved.replace(pipelineMessage), partition)
171
175
}
172
176
is DestinationRecordStreamComplete -> {
173
177
manager.markEndOfStream(true )
174
- log.info { " Read COMPLETE for stream $stream " }
175
- recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream (stream)))
178
+ log.info { " Read COMPLETE for stream $streamDescriptor " }
179
+ recordQueueForPipeline.broadcast(
180
+ reserved.replace(PipelineEndOfStream (streamDescriptor))
181
+ )
176
182
reserved.release()
177
183
}
178
184
is DestinationRecordStreamIncomplete -> {
179
185
manager.markEndOfStream(false )
180
- log.info { " Read INCOMPLETE for stream $stream " }
181
- recordQueueForPipeline.broadcast(reserved.replace(PipelineEndOfStream (stream)))
186
+ log.info { " Read INCOMPLETE for stream $streamDescriptor " }
187
+ recordQueueForPipeline.broadcast(
188
+ reserved.replace(PipelineEndOfStream (streamDescriptor))
189
+ )
182
190
reserved.release()
183
191
}
184
192
is DestinationFile -> {
185
193
val index = manager.incrementReadCount()
186
194
// destinationTaskLauncher.handleFile(stream, message, index)
187
- fileTransferQueue.publish(FileTransferQueueMessage (stream, message, index))
195
+ fileTransferQueue.publish(
196
+ FileTransferQueueMessage (streamDescriptor, message, index)
197
+ )
188
198
}
189
199
is DestinationFileStreamComplete -> {
190
200
reserved.release() // safe because multiple calls conflate
191
201
manager.markEndOfStream(true )
192
202
val envelope =
193
203
BatchEnvelope (
194
204
SimpleBatch (Batch .State .COMPLETE ),
195
- streamDescriptor = message.stream ,
205
+ streamDescriptor = streamDescriptor ,
196
206
)
197
- destinationTaskLauncher.handleNewBatch(stream , envelope)
207
+ destinationTaskLauncher.handleNewBatch(streamDescriptor , envelope)
198
208
}
199
209
is DestinationFileStreamIncomplete ->
200
- throw IllegalStateException (" File stream $stream failed upstream, cannot continue." )
210
+ throw IllegalStateException (
211
+ " File stream $streamDescriptor failed upstream, cannot continue."
212
+ )
201
213
}
202
214
}
203
215
0 commit comments