Skip to content

Commit 0bf645c

Browse files
authored
[Source-Postgres] : Add config to throw an error on invalid CDC position (#35304)
1 parent be6519f commit 0bf645c

File tree

10 files changed

+102
-8
lines changed

10 files changed

+102
-8
lines changed

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.3.10
12+
dockerImageTag: 3.3.11
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.integrations.source.postgres;
6+
7+
// Constants defined in
8+
// airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json.
9+
public class PostgresSpecConstants {
10+
11+
public static final String INVALID_CDC_CURSOR_POSITION_PROPERTY = "invalid_cdc_cursor_position_behavior";
12+
public static final String FAIL_SYNC_OPTION = "Fail sync";
13+
public static final String RESYNC_DATA_OPTION = "Re-sync data";
14+
15+
}

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java

+7
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
88
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum;
9+
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.FAIL_SYNC_OPTION;
10+
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
911
import static io.airbyte.integrations.source.postgres.PostgresUtils.isDebugMode;
1012
import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList;
1113

@@ -112,6 +114,11 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
112114

113115
if (!savedOffsetAfterReplicationSlotLSN) {
114116
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcCursorInvalidMessage());
117+
if (!sourceConfig.get("replication_method").has(INVALID_CDC_CURSOR_POSITION_PROPERTY) || sourceConfig.get("replication_method").get(
118+
INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) {
119+
throw new ConfigErrorException(
120+
"Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention or reduce sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.");
121+
}
115122
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
116123
} else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
117124
// We do not want to acknowledge the WAL logs in debug mode.

airbyte-integrations/connectors/source-postgres/src/main/resources/spec.json

+8
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,14 @@
296296
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
297297
"default": "",
298298
"order": 8
299+
},
300+
"invalid_cdc_cursor_position_behavior": {
301+
"type": "string",
302+
"title": "Invalid CDC position behavior (Advanced)",
303+
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
304+
"enum": ["Fail sync", "Re-sync data"],
305+
"default": "Fail sync",
306+
"order": 9
299307
}
300308
}
301309
},

airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_cloud_deployment_spec.json

+8
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,14 @@
297297
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
298298
"default": "",
299299
"order": 8
300+
},
301+
"invalid_cdc_cursor_position_behavior": {
302+
"type": "string",
303+
"title": "Invalid CDC position behavior (Advanced)",
304+
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
305+
"enum": ["Fail sync", "Re-sync data"],
306+
"default": "Fail sync",
307+
"order": 9
300308
}
301309
}
302310
},

airbyte-integrations/connectors/source-postgres/src/test-integration/resources/expected_spec.json

+8
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,14 @@
296296
"description": "Specifies a query that the connector executes on the source database when the connector sends a heartbeat message. Please see the <a href=\"https://docs.airbyte.com/integrations/sources/postgres/postgres-wal-disk-consumption-and-heartbeat-action-query\">setup guide</a> for how and when to configure this setting.",
297297
"default": "",
298298
"order": 8
299+
},
300+
"invalid_cdc_cursor_position_behavior": {
301+
"type": "string",
302+
"title": "Invalid CDC position behavior (Advanced)",
303+
"description": "Determines whether Airbyte should fail or re-sync data in case of an stale/invalid cursor value into the WAL. If 'Fail sync' is chosen, a user will have to manually reset the connection before being able to continue syncing data. If 'Re-sync data' is chosen, Airbyte will automatically trigger a refresh but could lead to higher cloud costs and data loss.",
304+
"enum": ["Fail sync", "Re-sync data"],
305+
"default": "Fail sync",
306+
"order": 9
299307
}
300308
}
301309
},

airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java

+45-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_DELETED_AT;
1010
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_LSN;
1111
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_UPDATED_AT;
12+
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.FAIL_SYNC_OPTION;
13+
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.RESYNC_DATA_OPTION;
1214
import static io.airbyte.integrations.source.postgres.ctid.CtidStateManager.STATE_TYPE_KEY;
1315
import static io.airbyte.integrations.source.postgres.ctid.InitialSyncCtidIteratorConstants.USE_TEST_CHUNK_SIZE;
1416
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -95,7 +97,7 @@ protected JsonNode config() {
9597
return testdb.testConfigBuilder()
9698
.withSchemas(modelsSchema(), modelsSchema() + "_random")
9799
.withoutSsl()
98-
.withCdcReplication("After loading Data in the destination")
100+
.withCdcReplication("After loading Data in the destination", RESYNC_DATA_OPTION)
99101
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
100102
.with("heartbeat_action_query", "")
101103
.build();
@@ -122,7 +124,7 @@ void testDebugMode() {
122124
final JsonNode invalidDebugConfig = testdb.testConfigBuilder()
123125
.withSchemas(modelsSchema(), modelsSchema() + "_random")
124126
.withoutSsl()
125-
.withCdcReplication("While reading Data")
127+
.withCdcReplication("While reading Data", RESYNC_DATA_OPTION)
126128
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
127129
.with("debug_mode", true)
128130
.build();
@@ -604,6 +606,47 @@ private void createAndPopulateTimestampTable() {
604606
}
605607
}
606608

609+
@Test
610+
void testSyncShouldFailPurgedLogs() throws Exception {
611+
final int recordsToCreate = 20;
612+
613+
final JsonNode config = testdb.testConfigBuilder()
614+
.withSchemas(modelsSchema(), modelsSchema() + "_random")
615+
.withoutSsl()
616+
.withCdcReplication("While reading Data", FAIL_SYNC_OPTION)
617+
.with(SYNC_CHECKPOINT_RECORDS_PROPERTY, 1)
618+
.build();
619+
final AutoCloseableIterator<AirbyteMessage> firstBatchIterator = source()
620+
.read(config, getConfiguredCatalog(), null);
621+
final List<AirbyteMessage> dataFromFirstBatch = AutoCloseableIterators
622+
.toListAndClose(firstBatchIterator);
623+
final List<AirbyteStateMessage> stateAfterFirstBatch = extractStateMessages(dataFromFirstBatch);
624+
assertExpectedStateMessages(stateAfterFirstBatch);
625+
// second batch of records again 20 being created
626+
bulkInsertRecords(recordsToCreate);
627+
628+
// Extract the last state message
629+
final JsonNode state = Jsons.jsonNode(Collections.singletonList(stateAfterFirstBatch.get(stateAfterFirstBatch.size() - 1)));
630+
final AutoCloseableIterator<AirbyteMessage> secondBatchIterator = source()
631+
.read(config, getConfiguredCatalog(), state);
632+
final List<AirbyteMessage> dataFromSecondBatch = AutoCloseableIterators
633+
.toListAndClose(secondBatchIterator);
634+
final List<AirbyteStateMessage> stateAfterSecondBatch = extractStateMessages(dataFromSecondBatch);
635+
assertExpectedStateMessagesFromIncrementalSync(stateAfterSecondBatch);
636+
637+
for (int recordsCreated = 0; recordsCreated < 1; recordsCreated++) {
638+
final JsonNode record =
639+
Jsons.jsonNode(ImmutableMap
640+
.of(COL_ID, 400 + recordsCreated, COL_MAKE_ID, 1, COL_MODEL,
641+
"H-" + recordsCreated));
642+
writeModelRecord(record);
643+
}
644+
645+
// Triggering sync with the first sync's state only which would mimic a scenario that the second
646+
// sync failed on destination end, and we didn't save state
647+
assertThrows(ConfigErrorException.class, () -> source().read(config, getConfiguredCatalog(), state));
648+
}
649+
607650
@Test
608651
protected void syncShouldHandlePurgedLogsGracefully() throws Exception {
609652

airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
package io.airbyte.integrations.source.postgres;
66

7+
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
8+
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.RESYNC_DATA_OPTION;
9+
710
import com.google.common.collect.ImmutableMap;
811
import io.airbyte.cdk.db.factory.DatabaseDriver;
912
import io.airbyte.cdk.db.jdbc.JdbcUtils;
@@ -174,10 +177,10 @@ public PostgresConfigBuilder withStandardReplication() {
174177
}
175178

176179
public PostgresConfigBuilder withCdcReplication() {
177-
return withCdcReplication("While reading Data");
180+
return withCdcReplication("While reading Data", RESYNC_DATA_OPTION);
178181
}
179182

180-
public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour) {
183+
public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour, String cdcCursorFailBehaviour) {
181184
return this
182185
.with("is_test", true)
183186
.with("replication_method", Jsons.jsonNode(ImmutableMap.builder()
@@ -186,6 +189,7 @@ public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour) {
186189
.put("publication", testDatabase.getPublicationName())
187190
.put("initial_waiting_seconds", DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds())
188191
.put("lsn_commit_behaviour", LsnCommitBehaviour)
192+
.put(INVALID_CDC_CURSOR_POSITION_PROPERTY, cdcCursorFailBehaviour)
189193
.build()));
190194
}
191195

docs/integrations/sources/postgres.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -292,9 +292,10 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp
292292

293293
| Version | Date | Pull Request | Subject |
294294
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
295+
| 3.3.11 | 2024-02-20 | [35304](https://github.com/airbytehq/airbyte/pull/35304) | Add config to throw an error on invalid CDC position and enable it by default. |
295296
| 3.3.10 | 2024-02-13 | [35036](https://github.com/airbytehq/airbyte/pull/34751) | Emit analytics message for invalid CDC cursor. |
296-
| 3.3.9 | 2024-02-13 | [35224](https://github.com/airbytehq/airbyte/pull/35224) | Adopt CDK 0.20.4 |
297-
| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 |
297+
| 3.3.9 | 2024-02-13 | [35224](https://github.com/airbytehq/airbyte/pull/35224) | Adopt CDK 0.20.4 |
298+
| 3.3.8 | 2024-02-08 | [34751](https://github.com/airbytehq/airbyte/pull/34751) | Adopt CDK 0.19.0 |
298299
| 3.3.7 | 2024-02-08 | [34781](https://github.com/airbytehq/airbyte/pull/34781) | Add a setting in the setup page to advance the LSN. |
299300
| 3.3.6 | 2024-02-07 | [34892](https://github.com/airbytehq/airbyte/pull/34892) | Adopt CDK v0.16.6 |
300301
| 3.3.5 | 2024-02-07 | [34948](https://github.com/airbytehq/airbyte/pull/34948) | Adopt CDK v0.16.5 |

docs/integrations/sources/postgres/postgres-troubleshooting.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ Normally under the CDC mode, the Postgres source will first run a full refresh s
7878
The root causes is that the WALs needed for the incremental sync has been removed by Postgres. This can occur under the following scenarios:
7979

8080
- When there are lots of database updates resulting in more WAL files than allowed in the `pg_wal` directory, Postgres will purge or archive the WAL files. This scenario is preventable. Possible solutions include:
81-
- Sync the data source more frequently. The downside is that more computation resources will be consumed, leading to a higher Airbyte bill.
81+
- Sync the data source more frequently.
8282
- Set a higher `wal_keep_size`. If no unit is provided, it is in megabytes, and the default is `0`. See detailed documentation [here](https://www.postgresql.org/docs/current/runtime-config-replication.html#GUC-WAL-KEEP-SIZE). The downside of this approach is that more disk space will be needed.
8383
- When the Postgres connector successfully reads the WAL and acknowledges it to Postgres, but the destination connector fails to consume the data, the Postgres connector will try to read the same WAL again, which may have been removed by Postgres, since the WAL record is already acknowledged. This scenario is rare, because it can happen, and currently there is no way to prevent it. The correct behavior is to perform a full refresh.
8484

0 commit comments

Comments
 (0)