Skip to content

Commit e7a0548

Browse files
akashkulkjatinyadav-cc
authored andcommitted
[Source-mysql] : Add config to throw an error on invalid CDC position (airbytehq#35338)
1 parent 1bf61fc commit e7a0548

File tree

12 files changed

+99
-1
lines changed

12 files changed

+99
-1
lines changed

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.3.7
12+
dockerImageTag: 3.3.8
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.mysql;
6+
7+
// Constants defined in
8+
// airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json.
9+
public class MySqlSpecConstants {
10+
11+
public static final String INVALID_CDC_CURSOR_POSITION_PROPERTY = "invalid_cdc_cursor_position_behavior";
12+
public static final String FAIL_SYNC_OPTION = "Fail sync";
13+
public static final String RESYNC_DATA_OPTION = "Re-sync data";
14+
15+
}

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java

+8
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
88
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams;
99
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.prettyPrintConfiguredAirbyteStreamList;
10+
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.FAIL_SYNC_OPTION;
11+
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
1012
import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET;
1113
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadGlobalStateManager.STATE_TYPE_KEY;
1214
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.PRIMARY_KEY_STATE_TYPE;
@@ -25,6 +27,7 @@
2527
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
2628
import io.airbyte.cdk.integrations.source.relationaldb.models.CdcState;
2729
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
30+
import io.airbyte.commons.exceptions.ConfigErrorException;
2831
import io.airbyte.commons.json.Jsons;
2932
import io.airbyte.commons.util.AutoCloseableIterator;
3033
import io.airbyte.commons.util.AutoCloseableIterators;
@@ -111,6 +114,11 @@ public static List<AutoCloseableIterator<AirbyteMessage>> getCdcReadIterators(fi
111114

112115
if (!savedOffsetStillPresentOnServer) {
113116
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
117+
if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get(
118+
INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) {
119+
throw new ConfigErrorException(
120+
"Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention or reduce sync frequency. See https://docs.airbyte.com/integrations/sources/mysql/mysql-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.");
121+
}
114122
LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch");
115123
}
116124

airbyte-integrations/connectors/source-mysql/src/main/resources/spec.json

+9
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,15 @@
211211
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
212212
"order": 2,
213213
"always_show": true
214+
},
215+
"invalid_cdc_cursor_position_behavior": {
216+
"type": "string",
217+
"title": "Invalid CDC position behavior (Advanced)",
218+
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
219+
"enum": ["Fail sync", "Re-sync data"],
220+
"default": "Fail sync",
221+
"order": 3,
222+
"always_show": true
214223
}
215224
}
216225
},

airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_cloud_spec.json

+9
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,15 @@
189189
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
190190
"order": 2,
191191
"always_show": true
192+
},
193+
"invalid_cdc_cursor_position_behavior": {
194+
"type": "string",
195+
"title": "Invalid CDC position behavior (Advanced)",
196+
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
197+
"enum": ["Fail sync", "Re-sync data"],
198+
"default": "Fail sync",
199+
"order": 3,
200+
"always_show": true
192201
}
193202
}
194203
},

airbyte-integrations/connectors/source-mysql/src/test-integration/resources/expected_oss_spec.json

+9
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,15 @@
211211
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
212212
"order": 2,
213213
"always_show": true
214+
},
215+
"invalid_cdc_cursor_position_behavior": {
216+
"type": "string",
217+
"title": "Invalid CDC position behavior (Advanced)",
218+
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
219+
"enum": ["Fail sync", "Re-sync data"],
220+
"default": "Fail sync",
221+
"order": 3,
222+
"always_show": true
214223
}
215224
}
216225
},

airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/CdcMysqlSourceTest.java

+9
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_DEFAULT_CURSOR;
1111
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_FILE;
1212
import static io.airbyte.integrations.source.mysql.MySqlSource.CDC_LOG_POS;
13+
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.FAIL_SYNC_OPTION;
1314
import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.IS_COMPRESSED;
1415
import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET;
1516
import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_DB_HISTORY;
@@ -20,6 +21,7 @@
2021
import static org.junit.jupiter.api.Assertions.assertNotEquals;
2122
import static org.junit.jupiter.api.Assertions.assertNotNull;
2223
import static org.junit.jupiter.api.Assertions.assertNull;
24+
import static org.junit.jupiter.api.Assertions.assertThrows;
2325
import static org.junit.jupiter.api.Assertions.assertTrue;
2426

2527
import com.fasterxml.jackson.databind.JsonNode;
@@ -33,6 +35,7 @@
3335
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
3436
import io.airbyte.cdk.integrations.debezium.CdcSourceTest;
3537
import io.airbyte.cdk.integrations.debezium.internals.AirbyteSchemaHistoryStorage;
38+
import io.airbyte.commons.exceptions.ConfigErrorException;
3639
import io.airbyte.commons.json.Jsons;
3740
import io.airbyte.commons.util.AutoCloseableIterator;
3841
import io.airbyte.commons.util.AutoCloseableIterators;
@@ -277,6 +280,12 @@ protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
277280
dataFromSecondBatch);
278281
assertEquals((recordsToCreate * 2) + recordsCreatedBeforeTestCount, recordsFromSecondBatch.size(),
279282
"Expected 46 records to be replicated in the second sync.");
283+
284+
JsonNode failSyncConfig = testdb.testConfigBuilder()
285+
.withCdcReplication(FAIL_SYNC_OPTION)
286+
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
287+
.build();
288+
assertThrows(ConfigErrorException.class, () -> source().read(failSyncConfig, getConfiguredCatalog(), state));
280289
}
281290

282291
/**

airbyte-integrations/connectors/source-mysql/src/test/resources/expected_cloud_spec.json

+9
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,15 @@
205205
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
206206
"order": 2,
207207
"always_show": true
208+
},
209+
"invalid_cdc_cursor_position_behavior": {
210+
"type": "string",
211+
"title": "Invalid CDC position behavior (Advanced)",
212+
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
213+
"enum": ["Fail sync", "Re-sync data"],
214+
"default": "Fail sync",
215+
"order": 3,
216+
"always_show": true
208217
}
209218
}
210219
},

airbyte-integrations/connectors/source-mysql/src/test/resources/expected_oss_spec.json

+9
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,15 @@
211211
"description": "Enter the configured MySQL server timezone. This should only be done if the configured timezone in your MySQL instance does not conform to IANNA standard.",
212212
"order": 2,
213213
"always_show": true
214+
},
215+
"invalid_cdc_cursor_position_behavior": {
216+
"type": "string",
217+
"title": "Invalid CDC position behavior (Advanced)",
218+
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
219+
"enum": ["Fail sync", "Re-sync data"],
220+
"default": "Fail sync",
221+
"order": 3,
222+
"always_show": true
214223
}
215224
}
216225
},

airbyte-integrations/connectors/source-mysql/src/testFixtures/java/io/airbyte/integrations/source/mysql/MySQLTestDatabase.java

+8
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
package io.airbyte.integrations.source.mysql;
66

7+
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
8+
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.RESYNC_DATA_OPTION;
9+
710
import com.google.common.collect.ImmutableMap;
811
import io.airbyte.cdk.db.factory.DatabaseDriver;
912
import io.airbyte.cdk.testutils.TestDatabase;
@@ -128,12 +131,17 @@ public MySQLConfigBuilder withStandardReplication() {
128131
}
129132

130133
public MySQLConfigBuilder withCdcReplication() {
134+
return withCdcReplication(RESYNC_DATA_OPTION);
135+
}
136+
137+
public MySQLConfigBuilder withCdcReplication(String cdcCursorFailBehaviour) {
131138
return this
132139
.with("is_test", true)
133140
.with("replication_method", ImmutableMap.builder()
134141
.put("method", "CDC")
135142
.put("initial_waiting_seconds", 5)
136143
.put("server_time_zone", "America/Los_Angeles")
144+
.put(INVALID_CDC_CURSOR_POSITION_PROPERTY, cdcCursorFailBehaviour)
137145
.build());
138146
}
139147

docs/integrations/sources/mysql.md

+1
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ Any database or table encoding combination of charset and collation is supported
223223

224224
| Version | Date | Pull Request | Subject |
225225
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
226+
| 3.3.8 | 2024-02-20 | [35338](https://github.com/airbytehq/airbyte/pull/35338) | Add config to throw an error on invalid CDC position. |
226227
| 3.3.7 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
227228
| 3.3.6 | 2024-02-13 | [34869](https://github.com/airbytehq/airbyte/pull/34573) | Don't emit state in SourceStateIterator when there is an underlying stream failure. |
228229
| 3.3.5 | 2024-02-12 | [34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in db name |

docs/integrations/sources/mysql/mysql-troubleshooting.md

+12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,18 @@
1818
* Amazon RDS MySQL or MariaDB connection issues: If you see the following `Cannot create a PoolableConnectionFactory` error, please add `enabledTLSProtocols=TLSv1.2` in the JDBC parameters.
1919
* Amazon RDS MySQL connection issues: If you see `Error: HikariPool-1 - Connection is not available, request timed out after 30001ms.`, many times this due to your VPC not allowing public traffic. We recommend going through [this AWS troubleshooting checklist](https://aws.amazon.com/premiumsupport/knowledge-center/rds-cannot-connect/) to ensure the correct permissions/settings have been granted to allow Airbyte to connect to your database.
2020

21+
### Under CDC incremental mode, there are still full refresh syncs
22+
23+
Normally under the CDC mode, the MySQL source will first run a full refresh sync to read the snapshot of all the existing data, and all subsequent runs will only be incremental syncs reading from the binlogs. However, occasionally, you may see full refresh syncs after the initial run. When this happens, you will see the following log:
24+
25+
> Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch
26+
27+
The root causes is that the binglogs needed for the incremental sync have been removed by MySQL. This can occur under the following scenarios:
28+
29+
- When there are lots of database updates resulting in more WAL files than allowed in the `pg_wal` directory, Postgres will purge or archive the WAL files. This scenario is preventable. Possible solutions include:
30+
- Sync the data source more frequently.
31+
- Set a higher `binlog_expire_logs_seconds`. It's recommended to set this value to a time period of 7 days. See detailed documentation [here](https://dev.mysql.com/doc/refman/8.0/en/replication-options-binary-log.html#sysvar_binlog_expire_logs_seconds). The downside of this approach is that more disk space will be needed.
32+
2133
### EventDataDeserializationException errors during initial snapshot
2234

2335
When a sync runs for the first time using CDC, Airbyte performs an initial consistent snapshot of your database. Airbyte doesn't acquire any table locks \(for tables defined with MyISAM engine, the tables would still be locked\) while creating the snapshot to allow writes by other database clients. But in order for the sync to work without any error/unexpected behaviour, it is assumed that no schema changes are happening while the snapshot is running.

0 commit comments

Comments
 (0)