Skip to content

Commit e52d188

Browse files
authored
pass streams to debezium sources on cold start (#55734)
1 parent ddadbea commit e52d188

File tree

10 files changed

+16
-14
lines changed

10 files changed

+16
-14
lines changed

airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreator.kt

+3-2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ class CdcPartitionsCreator<T : Comparable<T>>(
4545
"Triggering reset. Incumbent CDC state is invalid, reason: ${reason}."
4646
)
4747
}
48+
var allStreams: List<Stream> = feedBootstrap.feed.streams
4849
val activeStreams: List<Stream> by lazy {
4950
feedBootstrap.feed.streams.filter { feedBootstrap.currentState(it) != null }
5051
}
@@ -68,7 +69,7 @@ class CdcPartitionsCreator<T : Comparable<T>>(
6869
val startingSchemaHistory: DebeziumSchemaHistory?
6970
when (warmStartState) {
7071
null -> {
71-
debeziumProperties = creatorOps.generateColdStartProperties()
72+
debeziumProperties = creatorOps.generateColdStartProperties(allStreams)
7273
startingOffset = syntheticOffset
7374
startingSchemaHistory = null
7475
}
@@ -97,7 +98,7 @@ class CdcPartitionsCreator<T : Comparable<T>>(
9798
resetReason.set(warmStartState.reason)
9899
log.info { "Resetting invalid incumbent CDC state with synthetic state." }
99100
feedBootstrap.resetAll()
100-
debeziumProperties = creatorOps.generateColdStartProperties()
101+
debeziumProperties = creatorOps.generateColdStartProperties(allStreams)
101102
startingOffset = syntheticOffset
102103
startingSchemaHistory = null
103104
}

airbyte-cdk/bulk/toolkits/extract-cdc/src/main/kotlin/io/airbyte/cdk/read/cdc/DebeziumOperations.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ interface CdcPartitionsCreatorDebeziumOperations<T : Comparable<T>> {
2626
fun generateColdStartOffset(): DebeziumOffset
2727

2828
/** Generates Debezium properties for use with a [DebeziumColdStartingState]. */
29-
fun generateColdStartProperties(): Map<String, String>
29+
fun generateColdStartProperties(streams: List<Stream>): Map<String, String>
3030

3131
/** Maps an incumbent [OpaqueStateValue] into a [DebeziumWarmStartState]. */
3232
fun deserializeState(opaqueStateValue: OpaqueStateValue): DebeziumWarmStartState

airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMongoTest.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ class CdcPartitionReaderMongoTest :
120120
return DebeziumOffset(mapOf(key to value))
121121
}
122122

123-
override fun generateColdStartProperties(): Map<String, String> =
123+
override fun generateColdStartProperties(streams: List<Stream>): Map<String, String> =
124124
DebeziumPropertiesBuilder()
125125
.withDefault()
126126
.withConnector(MongoDbConnector::class.java)
@@ -141,7 +141,7 @@ class CdcPartitionReaderMongoTest :
141141
.buildMap()
142142

143143
override fun generateWarmStartProperties(streams: List<Stream>): Map<String, String> =
144-
generateColdStartProperties()
144+
generateColdStartProperties(streams)
145145

146146
fun currentResumeToken(): BsonDocument =
147147
container.withMongoDatabase { mongoDatabase: MongoDatabase ->

airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderMySQLTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ class CdcPartitionReaderMySQLTest :
113113
return DebeziumOffset(mapOf(key to value))
114114
}
115115

116-
override fun generateColdStartProperties(): Map<String, String> =
116+
override fun generateColdStartProperties(streams: List<Stream>): Map<String, String> =
117117
DebeziumPropertiesBuilder()
118118
.with(generateWarmStartProperties(emptyList()))
119119
.with("snapshot.mode", "recovery")

airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionReaderPostgresTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ class CdcPartitionReaderPostgresTest :
115115
.withStreams(streams)
116116
.buildMap()
117117

118-
override fun generateColdStartProperties(): Map<String, String> =
118+
override fun generateColdStartProperties(streams: List<Stream>): Map<String, String> =
119119
generateWarmStartProperties(emptyList())
120120

121121
override fun generateColdStartOffset(): DebeziumOffset {

airbyte-cdk/bulk/toolkits/extract-cdc/src/test/kotlin/io/airbyte/cdk/read/cdc/CdcPartitionsCreatorTest.kt

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ class CdcPartitionsCreatorTest {
5050
configuredPrimaryKey = null,
5151
configuredCursor = TestMetaFieldDecorator.GlobalCursor,
5252
)
53-
54-
val global = Global(listOf(stream))
53+
val streams = listOf(stream)
54+
val global = Global(streams)
5555

5656
val lowerBoundReference = AtomicReference<CreatorPosition>(null)
5757
val upperBoundReference = AtomicReference<CreatorPosition>(null)
@@ -80,8 +80,8 @@ class CdcPartitionsCreatorTest {
8080
every { creatorOps.position(syntheticOffset) } returns 123L
8181
every { creatorOps.position(incumbentOffset) } returns 123L
8282
every { creatorOps.generateColdStartOffset() } returns syntheticOffset
83-
every { creatorOps.generateColdStartProperties() } returns emptyMap()
84-
every { creatorOps.generateWarmStartProperties(listOf(stream)) } returns emptyMap()
83+
every { creatorOps.generateColdStartProperties(streams) } returns emptyMap()
84+
every { creatorOps.generateWarmStartProperties(streams) } returns emptyMap()
8585
}
8686

8787
@Test

airbyte-cdk/bulk/toolkits/extract-cdc/src/testFixtures/kotlin/io/airbyte/cdk/read/cdc/AbstractCdcPartitionReaderTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ abstract class AbstractCdcPartitionReaderTest<T : Comparable<T>, C : AutoCloseab
8383
container.createStream()
8484
val i0 =
8585
ReadInput(
86-
debeziumOperations.generateColdStartProperties(),
86+
debeziumOperations.generateColdStartProperties(listOf(stream)),
8787
debeziumOperations.generateColdStartOffset(),
8888
schemaHistory = null,
8989
isSynthetic = true,

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.11.6
12+
dockerImageTag: 3.11.7
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/kotlin/io/airbyte/integrations/source/mysql/MySqlSourceDebeziumOperations.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ class MySqlSourceDebeziumOperations(
342342
}
343343
}
344344

345-
override fun generateColdStartProperties(): Map<String, String> =
345+
override fun generateColdStartProperties(streams: List<Stream>): Map<String, String> =
346346
DebeziumPropertiesBuilder()
347347
.with(commonProperties)
348348
// https://debezium.io/documentation/reference/2.2/connectors/mysql.html#mysql-property-snapshot-mode

docs/integrations/sources/mysql.md

+1
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,7 @@ Any database or table encoding combination of charset and collation is supported
226226

227227
| Version | Date | Pull Request | Subject |
228228
|:------------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
229+
| 3.11.7 | 2025-03-12 | [55734](https://github.com/airbytehq/airbyte/pull/55734) | Expose additional stream context for debezium properties at startup |
229230
| 3.11.6 | 2025-03-06 | [55237](https://github.com/airbytehq/airbyte/pull/55237) | [Fix fetching binlog status for version >=8.4](https://github.com/airbytehq/airbyte/pull/55237#top) |
230231
| 3.11.5 | 2025-03-06 | [55234](https://github.com/airbytehq/airbyte/pull/55234) | Update base image version for certified DB source connectors |
231232
| 3.11.4 | 2025-03-06 | [55214](https://github.com/airbytehq/airbyte/pull/55214) | Update default encryption method to 'required'. | |

0 commit comments

Comments
 (0)