Skip to content

Commit 4beba6c

Browse files
authored
[source-mysql-v2] Add extra check logic for CDC (#45916)
1 parent b82a05a commit 4beba6c

File tree

4 files changed

+131
-7
lines changed

4 files changed

+131
-7
lines changed

airbyte-integrations/connectors/source-mysql-v2/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 561393ed-7e3a-4d0d-8b8b-90ded371754c
12-
dockerImageTag: 0.0.10
12+
dockerImageTag: 0.0.11
1313
dockerRepository: airbyte/source-mysql-v2
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql-v2

airbyte-integrations/connectors/source-mysql-v2/src/main/kotlin/io/airbyte/integrations/source/mysql/MysqlSourceMetadataQuerier.kt

+66-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/* Copyright (c) 2024 Airbyte, Inc., all rights reserved. */
22
package io.airbyte.integrations.source.mysql
33

4+
import io.airbyte.cdk.ConfigErrorException
45
import io.airbyte.cdk.StreamIdentifier
56
import io.airbyte.cdk.check.JdbcCheckQueries
67
import io.airbyte.cdk.command.SourceConfiguration
@@ -15,7 +16,9 @@ import io.airbyte.protocol.models.v0.StreamDescriptor
1516
import io.github.oshai.kotlinlogging.KotlinLogging
1617
import io.micronaut.context.annotation.Primary
1718
import jakarta.inject.Singleton
19+
import java.sql.Connection
1820
import java.sql.ResultSet
21+
import java.sql.SQLException
1922
import java.sql.Statement
2023

2124
private val log = KotlinLogging.logger {}
@@ -25,6 +28,67 @@ class MysqlSourceMetadataQuerier(
2528
val base: JdbcMetadataQuerier,
2629
) : MetadataQuerier by base {
2730

31+
override fun extraChecks() {
32+
base.extraChecks()
33+
if (base.config.global) {
34+
// Extra checks for CDC
35+
var cdcVariableCheckQueries: List<Pair<String, String>> =
36+
listOf(
37+
Pair("show variables where Variable_name = 'log_bin'", "ON"),
38+
Pair("show variables where Variable_name = 'binlog_format'", "ROW"),
39+
Pair("show variables where Variable_name = 'binlog_row_image'", "FULL"),
40+
)
41+
42+
cdcVariableCheckQueries.forEach { runVariableCheckSql(it.first, it.second, base.conn) }
43+
44+
// Note: SHOW MASTER STATUS has been deprecated in latest mysql (8.4) and going forward
45+
// it should be SHOW BINARY LOG STATUS. We will run both - if both have been failed we
46+
// will throw exception.
47+
try {
48+
base.conn.createStatement().use { stmt: Statement ->
49+
stmt.execute("SHOW MASTER STATUS")
50+
}
51+
} catch (e: SQLException) {
52+
try {
53+
base.conn.createStatement().use { stmt: Statement ->
54+
stmt.execute("SHOW BINARY LOG STATUS")
55+
}
56+
} catch (ex: SQLException) {
57+
throw ConfigErrorException(
58+
"Please grant REPLICATION CLIENT privilege, so that binary log files are available for CDC mode."
59+
)
60+
}
61+
}
62+
}
63+
}
64+
65+
private fun runVariableCheckSql(sql: String, expectedValue: String, conn: Connection) {
66+
try {
67+
conn.createStatement().use { stmt: Statement ->
68+
stmt.executeQuery(sql).use { rs: ResultSet ->
69+
if (!rs.next()) {
70+
throw ConfigErrorException("Could not query the variable $sql")
71+
}
72+
val resultValue: String = rs.getString("Value")
73+
if (!resultValue.equals(expectedValue, ignoreCase = true)) {
74+
throw ConfigErrorException(
75+
String.format(
76+
"The variable should be set to \"%s\", but it is \"%s\"",
77+
expectedValue,
78+
resultValue,
79+
),
80+
)
81+
}
82+
if (rs.next()) {
83+
throw ConfigErrorException("Could not query the variable $sql")
84+
}
85+
}
86+
}
87+
} catch (e: Exception) {
88+
throw ConfigErrorException("Check query failed with: ${e.message}")
89+
}
90+
}
91+
2892
override fun fields(streamID: StreamIdentifier): List<Field> {
2993
val table: TableName = findTableName(streamID) ?: return listOf()
3094
if (table !in base.memoizedColumnMetadata) return listOf()
@@ -75,8 +139,8 @@ class MysqlSourceMetadataQuerier(
75139
.groupBy {
76140
findTableName(
77141
StreamIdentifier.from(
78-
StreamDescriptor().withName(it.tableName).withNamespace("public")
79-
)
142+
StreamDescriptor().withName(it.tableName).withNamespace("public"),
143+
),
80144
)
81145
}
82146
.mapNotNull { (table, rowsByTable) ->

airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlCdcIntegrationTest.kt

+58-4
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import io.airbyte.cdk.jdbc.StringFieldType
1616
import io.airbyte.cdk.output.BufferingOutputConsumer
1717
import io.airbyte.cdk.util.Jsons
1818
import io.airbyte.integrations.source.mysql.MysqlContainerFactory.execAsRoot
19+
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
20+
import io.airbyte.protocol.models.v0.AirbyteMessage
1921
import io.airbyte.protocol.models.v0.AirbyteStateMessage
2022
import io.airbyte.protocol.models.v0.AirbyteStream
2123
import io.airbyte.protocol.models.v0.CatalogHelpers
@@ -26,13 +28,57 @@ import io.airbyte.protocol.models.v0.SyncMode
2628
import io.github.oshai.kotlinlogging.KotlinLogging
2729
import java.sql.Connection
2830
import java.sql.Statement
31+
import org.junit.jupiter.api.Assertions.assertEquals
2932
import org.junit.jupiter.api.BeforeAll
3033
import org.junit.jupiter.api.Test
3134
import org.junit.jupiter.api.Timeout
3235
import org.testcontainers.containers.MySQLContainer
3336

3437
class MysqlCdcIntegrationTest {
3538

39+
@Test
40+
fun testCheck() {
41+
val run1: BufferingOutputConsumer = CliRunner.source("check", config(), null).run()
42+
43+
assertEquals(run1.messages().size, 1)
44+
assertEquals(
45+
run1.messages().first().connectionStatus.status,
46+
AirbyteConnectionStatus.Status.SUCCEEDED
47+
)
48+
49+
MysqlContainerFactory.exclusive(
50+
imageName = "mysql:8.0",
51+
MysqlContainerFactory.WithCdcOff,
52+
)
53+
.use { nonCdcDbContainer ->
54+
{
55+
val invalidConfig: MysqlSourceConfigurationJsonObject =
56+
MysqlContainerFactory.config(nonCdcDbContainer).apply {
57+
setCursorMethodValue(CdcCursor())
58+
}
59+
60+
val nonCdcConnectionFactory =
61+
JdbcConnectionFactory(MysqlSourceConfigurationFactory().make(invalidConfig))
62+
63+
provisionTestContainer(nonCdcDbContainer, nonCdcConnectionFactory)
64+
65+
val run2: BufferingOutputConsumer =
66+
CliRunner.source("check", invalidConfig, null).run()
67+
68+
val messageInRun2 =
69+
run2
70+
.messages()
71+
.filter { it.type == AirbyteMessage.Type.CONNECTION_STATUS }
72+
.first()
73+
74+
assertEquals(
75+
AirbyteConnectionStatus.Status.FAILED,
76+
messageInRun2.connectionStatus.status
77+
)
78+
}
79+
}
80+
}
81+
3682
@Test
3783
fun test() {
3884
val run1: BufferingOutputConsumer =
@@ -99,12 +145,20 @@ class MysqlCdcIntegrationTest {
99145
imageName = "mysql:8.0",
100146
MysqlContainerFactory.WithNetwork,
101147
)
148+
provisionTestContainer(dbContainer, connectionFactory)
149+
}
150+
151+
fun provisionTestContainer(
152+
targetContainer: MySQLContainer<*>,
153+
targetConnectionFactory: JdbcConnectionFactory
154+
) {
102155
val grant =
103156
"GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT " +
104-
"ON *.* TO '${dbContainer.username}'@'%';"
105-
dbContainer.execAsRoot(grant)
106-
dbContainer.execAsRoot("FLUSH PRIVILEGES;")
107-
connectionFactory.get().use { connection: Connection ->
157+
"ON *.* TO '${targetContainer.username}'@'%';"
158+
targetContainer.execAsRoot(grant)
159+
targetContainer.execAsRoot("FLUSH PRIVILEGES;")
160+
161+
targetConnectionFactory.get().use { connection: Connection ->
108162
connection.isReadOnly = false
109163
connection.createStatement().use { stmt: Statement ->
110164
stmt.execute("CREATE TABLE test.tbl(k INT PRIMARY KEY, v VARCHAR(80))")

airbyte-integrations/connectors/source-mysql-v2/src/test/kotlin/io/airbyte/integrations/source/mysql/MysqlContainerFactory.kt

+6
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@ object MysqlContainerFactory {
2525
}
2626
}
2727

28+
data object WithCdcOff : MysqlContainerModifier {
29+
override fun modify(container: MySQLContainer<*>) {
30+
container.withCommand("--skip-log-bin")
31+
}
32+
}
33+
2834
fun exclusive(
2935
imageName: String,
3036
vararg modifiers: MysqlContainerModifier,

0 commit comments

Comments
 (0)