Skip to content

Commit 466654d

Browse files
Merge branch 'master' into update-otel-metrics-in-docs
2 parents 0fb376e + cd7c0aa commit 466654d

File tree

44 files changed

+316
-163
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+316
-163
lines changed

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import io.airbyte.cdk.load.command.DestinationConfiguration
99
import io.airbyte.cdk.load.command.DestinationStream
1010
import io.airbyte.cdk.load.message.BatchEnvelope
1111
import io.airbyte.cdk.load.message.ChannelMessageQueue
12-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
12+
import io.airbyte.cdk.load.message.DestinationRecordRaw
1313
import io.airbyte.cdk.load.message.MultiProducerChannel
1414
import io.airbyte.cdk.load.message.PartitionedQueue
1515
import io.airbyte.cdk.load.message.PipelineEvent
@@ -120,7 +120,7 @@ class SyncBeanFactory {
120120
@Named("recordQueue")
121121
fun recordQueue(
122122
loadStrategy: LoadStrategy? = null,
123-
): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>> {
123+
): PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>> {
124124
return PartitionedQueue(
125125
Array(loadStrategy?.inputPartitions ?: 1) {
126126
ChannelMessageQueue(Channel(Channel.UNLIMITED))

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/message/DestinationMessage.kt

+41
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import io.airbyte.cdk.load.command.DestinationStream
1111
import io.airbyte.cdk.load.data.AirbyteType
1212
import io.airbyte.cdk.load.data.AirbyteValue
1313
import io.airbyte.cdk.load.data.AirbyteValueDeepCoercingMapper
14+
import io.airbyte.cdk.load.data.EnrichedAirbyteValue
1415
import io.airbyte.cdk.load.data.IntegerValue
1516
import io.airbyte.cdk.load.data.StringValue
1617
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
@@ -143,6 +144,9 @@ data class DestinationRecord(
143144
serialized.length.toLong()
144145
)
145146
}
147+
fun asDestinationRecordRaw(): DestinationRecordRaw {
148+
return DestinationRecordRaw(stream, message, serialized)
149+
}
146150
}
147151

148152
/**
@@ -163,6 +167,43 @@ data class DestinationRecordAirbyteValue(
163167
val serializedSizeBytes: Long = 0L
164168
)
165169

170+
data class EnrichedDestinationRecordAirbyteValue(
171+
val stream: DestinationStream.Descriptor,
172+
val declaredFields: Map<String, EnrichedAirbyteValue>,
173+
val airbyteMetaFields: Map<String, EnrichedAirbyteValue>,
174+
val undeclaredFields: Map<String, JsonNode>,
175+
val emittedAtMs: Long,
176+
val meta: Meta?,
177+
val serializedSizeBytes: Long = 0L
178+
)
179+
180+
data class DestinationRecordRaw(
181+
val stream: DestinationStream.Descriptor,
182+
private val rawData: AirbyteMessage,
183+
private val serialized: String
184+
) {
185+
fun asRawJson(): JsonNode {
186+
return rawData.record.data
187+
}
188+
189+
fun asDestinationRecordAirbyteValue(): DestinationRecordAirbyteValue {
190+
return DestinationRecordAirbyteValue(
191+
stream,
192+
rawData.record.data.toAirbyteValue(),
193+
rawData.record.emittedAt,
194+
Meta(
195+
rawData.record.meta?.changes?.map { Meta.Change(it.field, it.change, it.reason) }
196+
?: emptyList()
197+
),
198+
serialized.length.toLong()
199+
)
200+
}
201+
202+
fun asEnrichedDestinationRecordAirbyteValue(): EnrichedDestinationRecordAirbyteValue {
203+
TODO()
204+
}
205+
}
206+
166207
data class DestinationFile(
167208
override val stream: DestinationStream.Descriptor,
168209
val emittedAtMs: Long,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadPipelineStep.kt

+2-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package io.airbyte.cdk.load.pipeline
66

7-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
7+
import io.airbyte.cdk.load.message.DestinationRecordRaw
88
import io.airbyte.cdk.load.message.PartitionedQueue
99
import io.airbyte.cdk.load.message.PipelineEvent
1010
import io.airbyte.cdk.load.message.QueueWriter
@@ -24,8 +24,7 @@ import jakarta.inject.Singleton
2424
class DirectLoadPipelineStep<S : DirectLoader>(
2525
val accumulator: DirectLoadRecordAccumulator<S, StreamKey>,
2626
@Named("recordQueue")
27-
val inputQueue:
28-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
27+
val inputQueue: PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
2928
@Named("batchStateUpdateQueue") val batchQueue: QueueWriter<BatchUpdate>,
3029
@Value("\${airbyte.destination.core.record-batch-size-override:null}")
3130
val batchSizeOverride: Long? = null,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/DirectLoadRecordAccumulator.kt

+3-6
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package io.airbyte.cdk.load.pipeline
66

77
import io.airbyte.cdk.load.message.Batch
8-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
8+
import io.airbyte.cdk.load.message.DestinationRecordRaw
99
import io.airbyte.cdk.load.message.WithBatchState
1010
import io.airbyte.cdk.load.message.WithStream
1111
import io.airbyte.cdk.load.write.DirectLoader
@@ -25,15 +25,12 @@ data class DirectLoadAccResult(override val state: Batch.State) : WithBatchState
2525
@Requires(bean = DirectLoaderFactory::class)
2626
class DirectLoadRecordAccumulator<S : DirectLoader, K : WithStream>(
2727
val directLoaderFactory: DirectLoaderFactory<S>
28-
) : BatchAccumulator<S, K, DestinationRecordAirbyteValue, DirectLoadAccResult> {
28+
) : BatchAccumulator<S, K, DestinationRecordRaw, DirectLoadAccResult> {
2929
override fun start(key: K, part: Int): S {
3030
return directLoaderFactory.create(key.stream, part)
3131
}
3232

33-
override fun accept(
34-
record: DestinationRecordAirbyteValue,
35-
state: S
36-
): Pair<S, DirectLoadAccResult?> {
33+
override fun accept(record: DestinationRecordRaw, state: S): Pair<S, DirectLoadAccResult?> {
3734
state.accept(record).let {
3835
return when (it) {
3936
is Incomplete -> Pair(state, null)

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/pipeline/InputPartitioner.kt

+3-3
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package io.airbyte.cdk.load.pipeline
66

7-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
7+
import io.airbyte.cdk.load.message.DestinationRecordRaw
88
import io.micronaut.context.annotation.Secondary
99
import jakarta.inject.Singleton
1010
import kotlin.math.abs
@@ -14,13 +14,13 @@ import kotlin.math.abs
1414
* partitioned by a hash of the stream name and namespace.
1515
*/
1616
interface InputPartitioner {
17-
fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int
17+
fun getPartition(record: DestinationRecordRaw, numParts: Int): Int
1818
}
1919

2020
@Singleton
2121
@Secondary
2222
class ByStreamInputPartitioner : InputPartitioner {
23-
override fun getPartition(record: DestinationRecordAirbyteValue, numParts: Int): Int {
23+
override fun getPartition(record: DestinationRecordRaw, numParts: Int): Int {
2424
return abs(record.stream.hashCode()) % numParts
2525
}
2626
}

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncher.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import io.airbyte.cdk.load.command.DestinationStream
1111
import io.airbyte.cdk.load.message.BatchEnvelope
1212
import io.airbyte.cdk.load.message.ChannelMessageQueue
1313
import io.airbyte.cdk.load.message.CheckpointMessageWrapped
14-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
14+
import io.airbyte.cdk.load.message.DestinationRecordRaw
1515
import io.airbyte.cdk.load.message.DestinationStreamEvent
1616
import io.airbyte.cdk.load.message.MessageQueue
1717
import io.airbyte.cdk.load.message.MessageQueueSupplier
@@ -145,7 +145,7 @@ class DefaultDestinationTaskLauncher<K : WithStream>(
145145
// New interface shim
146146
@Named("recordQueue")
147147
private val recordQueueForPipeline:
148-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
148+
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
149149
@Named("batchStateUpdateQueue") private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate>,
150150
private val loadPipeline: LoadPipeline?,
151151
private val partitioner: InputPartitioner,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTask.kt

+5-5
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import io.airbyte.cdk.load.message.DestinationFile
1515
import io.airbyte.cdk.load.message.DestinationFileStreamComplete
1616
import io.airbyte.cdk.load.message.DestinationFileStreamIncomplete
1717
import io.airbyte.cdk.load.message.DestinationRecord
18-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
18+
import io.airbyte.cdk.load.message.DestinationRecordRaw
1919
import io.airbyte.cdk.load.message.DestinationRecordStreamComplete
2020
import io.airbyte.cdk.load.message.DestinationRecordStreamIncomplete
2121
import io.airbyte.cdk.load.message.DestinationStreamAffinedMessage
@@ -80,7 +80,7 @@ class DefaultInputConsumerTask(
8080
// Required by new interface
8181
@Named("recordQueue")
8282
private val recordQueueForPipeline:
83-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
83+
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
8484
private val loadPipeline: LoadPipeline? = null,
8585
private val partitioner: InputPartitioner,
8686
private val openStreamQueue: QueueWriter<DestinationStream>
@@ -158,7 +158,7 @@ class DefaultInputConsumerTask(
158158
val manager = syncManager.getStreamManager(stream)
159159
when (val message = reserved.value) {
160160
is DestinationRecord -> {
161-
val record = message.asRecordMarshaledToAirbyteValue()
161+
val record = message.asDestinationRecordRaw()
162162
manager.incrementReadCount()
163163
val pipelineMessage =
164164
PipelineMessage(
@@ -310,7 +310,7 @@ interface InputConsumerTaskFactory {
310310

311311
// Required by new interface
312312
recordQueueForPipeline:
313-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
313+
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
314314
loadPipeline: LoadPipeline?,
315315
partitioner: InputPartitioner,
316316
openStreamQueue: QueueWriter<DestinationStream>,
@@ -333,7 +333,7 @@ class DefaultInputConsumerTaskFactory(
333333

334334
// Required by new interface
335335
recordQueueForPipeline:
336-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
336+
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
337337
loadPipeline: LoadPipeline?,
338338
partitioner: InputPartitioner,
339339
openStreamQueue: QueueWriter<DestinationStream>,

airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/write/DirectLoader.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
package io.airbyte.cdk.load.write
66

77
import io.airbyte.cdk.load.command.DestinationStream
8-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
8+
import io.airbyte.cdk.load.message.DestinationRecordRaw
99

1010
/**
1111
* [DirectLoader] is for the use case where records are loaded directly into the destination or
@@ -56,7 +56,7 @@ interface DirectLoader : AutoCloseable {
5656
* Called once per record until it returns [Complete], after which [close] is called, the loader
5757
* is discarded, and the records are considered processed by the platform.
5858
*/
59-
fun accept(record: DestinationRecordAirbyteValue): DirectLoadResult
59+
fun accept(record: DestinationRecordRaw): DirectLoadResult
6060

6161
/**
6262
* Called by the CDK to force work to finish. It will only be called if the last call to

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import io.airbyte.cdk.load.command.MockDestinationConfiguration
1313
import io.airbyte.cdk.load.message.Batch
1414
import io.airbyte.cdk.load.message.BatchEnvelope
1515
import io.airbyte.cdk.load.message.CheckpointMessageWrapped
16-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
16+
import io.airbyte.cdk.load.message.DestinationRecordRaw
1717
import io.airbyte.cdk.load.message.DestinationStreamEvent
1818
import io.airbyte.cdk.load.message.MessageQueue
1919
import io.airbyte.cdk.load.message.MessageQueueSupplier
@@ -159,7 +159,7 @@ class DestinationTaskLauncherTest {
159159
destinationTaskLauncher: DestinationTaskLauncher,
160160
fileTransferQueue: MessageQueue<FileTransferQueueMessage>,
161161
recordQueueForPipeline:
162-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>,
162+
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>,
163163
loadPipeline: LoadPipeline?,
164164
partitioner: InputPartitioner,
165165
openStreamQueue: QueueWriter<DestinationStream>,

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/DestinationTaskLauncherUTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import io.airbyte.cdk.load.message.Batch
1111
import io.airbyte.cdk.load.message.BatchEnvelope
1212
import io.airbyte.cdk.load.message.ChannelMessageQueue
1313
import io.airbyte.cdk.load.message.CheckpointMessageWrapped
14-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
14+
import io.airbyte.cdk.load.message.DestinationRecordRaw
1515
import io.airbyte.cdk.load.message.DestinationStreamEvent
1616
import io.airbyte.cdk.load.message.MessageQueue
1717
import io.airbyte.cdk.load.message.MessageQueueSupplier
@@ -95,14 +95,14 @@ class DestinationTaskLauncherUTest {
9595
private val openStreamQueue: MessageQueue<DestinationStream> = mockk(relaxed = true)
9696

9797
private val recordQueueForPipeline:
98-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>> =
98+
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>> =
9999
mockk(relaxed = true)
100100
private val batchUpdateQueue: ChannelMessageQueue<BatchUpdate> = mockk(relaxed = true)
101101
private val partitioner: InputPartitioner = mockk(relaxed = true)
102102
private val updateBatchTaskFactory: UpdateBatchStateTaskFactory = mockk(relaxed = true)
103103

104104
private fun getDefaultDestinationTaskLauncher(
105-
useFileTranfer: Boolean,
105+
useFileTransfer: Boolean,
106106
loadPipeline: LoadPipeline? = null
107107
): DefaultDestinationTaskLauncher<Nothing> {
108108
return DefaultDestinationTaskLauncher(
@@ -124,7 +124,7 @@ class DestinationTaskLauncherUTest {
124124
updateCheckpointsTask,
125125
failStreamTaskFactory,
126126
failSyncTaskFactory,
127-
useFileTranfer,
127+
useFileTransfer,
128128
inputFlow,
129129
recordQueueSupplier,
130130
checkpointQueue,

airbyte-cdk/bulk/core/load/src/test/kotlin/io/airbyte/cdk/load/task/internal/InputConsumerTaskUTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import io.airbyte.cdk.load.data.ObjectTypeWithoutSchema
1010
import io.airbyte.cdk.load.message.CheckpointMessageWrapped
1111
import io.airbyte.cdk.load.message.DestinationMessage
1212
import io.airbyte.cdk.load.message.DestinationRecord
13-
import io.airbyte.cdk.load.message.DestinationRecordAirbyteValue
13+
import io.airbyte.cdk.load.message.DestinationRecordRaw
1414
import io.airbyte.cdk.load.message.DestinationStreamEvent
1515
import io.airbyte.cdk.load.message.MessageQueue
1616
import io.airbyte.cdk.load.message.MessageQueueSupplier
@@ -47,7 +47,7 @@ class InputConsumerTaskUTest {
4747
@MockK lateinit var fileTransferQueue: MessageQueue<FileTransferQueueMessage>
4848
@MockK
4949
lateinit var recordQueueForPipeline:
50-
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordAirbyteValue>>>
50+
PartitionedQueue<Reserved<PipelineEvent<StreamKey, DestinationRecordRaw>>>
5151
@MockK lateinit var partitioner: InputPartitioner
5252
@MockK lateinit var openStreamQueue: QueueWriter<DestinationStream>
5353

airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class CdcPartitionsCreator<T : Comparable<T>>(
4545
"Triggering reset. Incumbent CDC state is invalid, reason: ${reason}."
4646
)
4747
}
48+
var allStreams: List<Stream> = feedBootstrap.feed.streams
4849
val activeStreams: List<Stream> by lazy {
4950
feedBootstrap.feed.streams.filter { feedBootstrap.currentState(it) != null }
5051
}
@@ -68,7 +69,7 @@ class CdcPartitionsCreator<T : Comparable<T>>(
6869
val startingSchemaHistory: DebeziumSchemaHistory?
6970
when (warmStartState) {
7071
null -> {
71-
debeziumProperties = creatorOps.generateColdStartProperties()
72+
debeziumProperties = creatorOps.generateColdStartProperties(allStreams)
7273
startingOffset = syntheticOffset
7374
startingSchemaHistory = null
7475
}
@@ -97,7 +98,7 @@ class CdcPartitionsCreator<T : Comparable<T>>(
9798
resetReason.set(warmStartState.reason)
9899
log.info { "Resetting invalid incumbent CDC state with synthetic state." }
99100
feedBootstrap.resetAll()
100-
debeziumProperties = creatorOps.generateColdStartProperties()
101+
debeziumProperties = creatorOps.generateColdStartProperties(allStreams)
101102
startingOffset = syntheticOffset
102103
startingSchemaHistory = null
103104
}

airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ interface CdcPartitionsCreatorDebeziumOperations<T : Comparable<T>> {
2626
fun generateColdStartOffset(): DebeziumOffset
2727

2828
/** Generates Debezium properties for use with a [DebeziumColdStartingState]. */
29-
fun generateColdStartProperties(): Map<String, String>
29+
fun generateColdStartProperties(streams: List<Stream>): Map<String, String>
3030

3131
/** Maps an incumbent [OpaqueStateValue] into a [DebeziumWarmStartState]. */
3232
fun deserializeState(opaqueStateValue: OpaqueStateValue): DebeziumWarmStartState

airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class CdcPartitionReaderMongoTest :
120120
return DebeziumOffset(mapOf(key to value))
121121
}
122122

123-
override fun generateColdStartProperties(): Map<String, String> =
123+
override fun generateColdStartProperties(streams: List<Stream>): Map<String, String> =
124124
DebeziumPropertiesBuilder()
125125
.withDefault()
126126
.withConnector(MongoDbConnector::class.java)
@@ -141,7 +141,7 @@ class CdcPartitionReaderMongoTest :
141141
.buildMap()
142142

143143
override fun generateWarmStartProperties(streams: List<Stream>): Map<String, String> =
144-
generateColdStartProperties()
144+
generateColdStartProperties(streams)
145145

146146
fun currentResumeToken(): BsonDocument =
147147
container.withMongoDatabase { mongoDatabase: MongoDatabase ->

airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class CdcPartitionReaderMySQLTest :
113113
return DebeziumOffset(mapOf(key to value))
114114
}
115115

116-
override fun generateColdStartProperties(): Map<String, String> =
116+
override fun generateColdStartProperties(streams: List<Stream>): Map<String, String> =
117117
DebeziumPropertiesBuilder()
118118
.with(generateWarmStartProperties(emptyList()))
119119
.with("snapshot.mode", "recovery")

airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class CdcPartitionReaderPostgresTest :
115115
.withStreams(streams)
116116
.buildMap()
117117

118-
override fun generateColdStartProperties(): Map<String, String> =
118+
override fun generateColdStartProperties(streams: List<Stream>): Map<String, String> =
119119
generateWarmStartProperties(emptyList())
120120

121121
override fun generateColdStartOffset(): DebeziumOffset {

airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ class CdcPartitionsCreatorTest {
5050
configuredPrimaryKey = null,
5151
configuredCursor = TestMetaFieldDecorator.GlobalCursor,
5252
)
53-
54-
val global = Global(listOf(stream))
53+
val streams = listOf(stream)
54+
val global = Global(streams)
5555

5656
val lowerBoundReference = AtomicReference<CreatorPosition>(null)
5757
val upperBoundReference = AtomicReference<CreatorPosition>(null)
@@ -80,8 +80,8 @@ class CdcPartitionsCreatorTest {
8080
every { creatorOps.position(syntheticOffset) } returns 123L
8181
every { creatorOps.position(incumbentOffset) } returns 123L
8282
every { creatorOps.generateColdStartOffset() } returns syntheticOffset
83-
every { creatorOps.generateColdStartProperties() } returns emptyMap()
84-
every { creatorOps.generateWarmStartProperties(listOf(stream)) } returns emptyMap()
83+
every { creatorOps.generateColdStartProperties(streams) } returns emptyMap()
84+
every { creatorOps.generateWarmStartProperties(streams) } returns emptyMap()
8585
}
8686

8787
@Test

0 commit comments

Comments
 (0)