Skip to content

Commit 000dc47

Browse files
authored
[WASS] : Enhancements in logging & analytics (#42122)
1 parent e5a1cb4 commit 000dc47

File tree

19 files changed

+51
-20
lines changed

19 files changed

+51
-20
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -174,11 +174,12 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:-----------|:-----------| :--------------------------------------------------------- |:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.42.2 | 2024-07-21 | [\#42122](https://github.com/airbytehq/airbyte/pull/42122) | Support for Debezium resync and shutdown scenarios. |
177178
| 0.42.2 | 2024-07-04 | [\#40208](https://github.com/airbytehq/airbyte/pull/40208) | Implement a new connector error handling and translation framework |
178-
| 0.41.8 | 2024-07-18 | [\#42068](https://github.com/airbytehq/airbyte/pull/42068) | Add analytics message for WASS occurrence. |
179-
| 0.41.7 | 2024-07-17 | [\#42055](https://github.com/airbytehq/airbyte/pull/42055) | Add debezium heartbeat timeout back to shutdown debezium. |
180-
| 0.41.6 | 2024-07-17 | [\#41996](https://github.com/airbytehq/airbyte/pull/41996) | Fix java interop compilation issue in Config/TransientErrorException. |
181-
| 0.41.5 | 2024-07-16 | [\#42011] (https://github.com/airbytehq/airbyte/pull/42011) | Async consumer accepts null default namespace |
179+
| 0.41.8 | 2024-07-18 | [\#42068](https://github.com/airbytehq/airbyte/pull/42068) | Add analytics message for WASS occurrence. |
180+
| 0.41.7 | 2024-07-17 | [\#42055](https://github.com/airbytehq/airbyte/pull/42055) | Add debezium heartbeat timeout back to shutdown debezium. |
181+
| 0.41.6 | 2024-07-17 | [\#41996](https://github.com/airbytehq/airbyte/pull/41996) | Fix java interop compilation issue in Config/TransientErrorException. |
182+
| 0.41.5 | 2024-07-16 | [\#42011] (https://github.com/airbytehq/airbyte/pull/42011) | Async consumer accepts null default namespace |
182183
| 0.41.4 | 2024-07-15 | [\#41959](https://github.com/airbytehq/airbyte/pull/41959) | Allow setting `internal_message` in Config/TransientErrorException. Destinations: shorten error message for INCOMPLETE stream status. |
183184
| 0.41.3 | 2024-07-15 | [\#41680](https://github.com/airbytehq/airbyte/pull/41680) | Fix: CompletableFutures.allOf now handles empty list and `Throwable` |
184185
| 0.41.2 | 2024-07-12 | [\#40567](https://github.com/airbytehq/airbyte/pull/40567) | Fix BaseSqlGenerator test case (generation_id support); update minimum platform version for refreshes support. |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt

+12
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ object DbAnalyticsUtils {
1616
const val CDC_SNAPSHOT_FORCE_SHUTDOWN_KEY = "db-sources-snapshot-force-shutdown"
1717
const val DEBEZIUM_CLOSE_REASON_KEY = "db-sources-debezium-close-reason"
1818
const val WASS_OCCURRENCE_KEY = "db-sources-wass-occurrence"
19+
const val CDC_RESYNC_KEY = "db-sources-cdc-resync"
20+
const val DEBEZIUM_SHUTDOWN_ERROR = "debezium-shutdown-error"
1921

2022
@JvmStatic
2123
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
@@ -45,4 +47,14 @@ object DbAnalyticsUtils {
4547
fun wassOccurrenceMessage(): AirbyteAnalyticsTraceMessage {
4648
return AirbyteAnalyticsTraceMessage().withType(WASS_OCCURRENCE_KEY).withValue("1")
4749
}
50+
51+
@JvmStatic
52+
fun cdcResyncMessage(): AirbyteAnalyticsTraceMessage {
53+
return AirbyteAnalyticsTraceMessage().withType(CDC_RESYNC_KEY).withValue("1")
54+
}
55+
56+
@JvmStatic
57+
fun debeziumShutdownError(): AirbyteAnalyticsTraceMessage {
58+
return AirbyteAnalyticsTraceMessage().withType(DEBEZIUM_SHUTDOWN_ERROR).withValue("1")
59+
}
4860
}
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.42.2
1+
version=0.42.3

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumShutdownProcedure.kt

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
*/
44
package io.airbyte.cdk.integrations.debezium.internals
55

6+
import io.airbyte.cdk.db.DbAnalyticsUtils
7+
import io.airbyte.cdk.integrations.base.AirbyteTraceMessageUtility
68
import io.airbyte.commons.concurrency.VoidCallable
79
import io.airbyte.commons.lang.MoreBooleans
810
import io.github.oshai.kotlinlogging.KotlinLogging
@@ -89,6 +91,7 @@ class DebeziumShutdownProcedure<T>(
8991
debeziumThreadRequestClose.call()
9092
} catch (e: Exception) {
9193
exceptionDuringEngineClose = e
94+
AirbyteTraceMessageUtility.emitAnalyticsTrace(DbAnalyticsUtils.debeziumShutdownError())
9295
throw RuntimeException(e)
9396
} finally {
9497
try {

airbyte-integrations/connectors/source-mssql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.41.8'
6+
cdkVersionRequired = '0.42.3'
77
features = ['db-sources']
88
useLocalCdk = false
99
}

airbyte-integrations/connectors/source-mssql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.1.0
12+
dockerImageTag: 4.1.1
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadRecordIterator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ protected AirbyteRecordData computeNext() {
8585
if (isCdcSync && cdcInitialLoadTimeout.isPresent()
8686
&& Duration.between(startInstant, Instant.now()).compareTo(cdcInitialLoadTimeout.get()) > 0) {
8787
final String cdcInitialLoadTimeoutMessage = String.format(
88-
"Initial load has taken longer than %s, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume",
89-
cdcInitialLoadTimeout.get());
88+
"Initial load has taken longer than %s hours, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume",
89+
cdcInitialLoadTimeout.get().toHours());
9090
LOGGER.info(cdcInitialLoadTimeoutMessage);
9191
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcSnapshotForceShutdownMessage());
9292
throw new TransientErrorException(cdcInitialLoadTimeoutMessage);

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialReadUtil.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,11 @@
55
package io.airbyte.integrations.source.mssql.initialsync;
66

77
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
8+
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcResyncMessage;
89
import static io.airbyte.cdk.db.DbAnalyticsUtils.wassOccurrenceMessage;
910
import static io.airbyte.integrations.source.mssql.MsSqlSpecConstants.FAIL_SYNC_OPTION;
1011
import static io.airbyte.integrations.source.mssql.MsSqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
12+
import static io.airbyte.integrations.source.mssql.MsSqlSpecConstants.RESYNC_DATA_OPTION;
1113
import static io.airbyte.integrations.source.mssql.MssqlCdcHelper.getDebeziumProperties;
1214
import static io.airbyte.integrations.source.mssql.MssqlQueryUtils.getTableSizeInfoForStreams;
1315
import static io.airbyte.integrations.source.mssql.cdc.MssqlCdcStateConstants.MSSQL_CDC_OFFSET;
@@ -161,8 +163,10 @@ public static boolean isSavedOffsetStillPresentOnServer(final JdbcDatabase datab
161163
INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) {
162164
throw new ConfigErrorException(
163165
"Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention and/or increase sync frequency.");
166+
} else if (sourceConfig.get("replication_method").get(INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(RESYNC_DATA_OPTION)) {
167+
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcResyncMessage());
168+
LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch");
164169
}
165-
LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch");
166170
}
167171
return savedOffsetStillPresentOnServer;
168172
}

airbyte-integrations/connectors/source-mysql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
airbyteJavaConnector {
9-
cdkVersionRequired = '0.42.1'
9+
cdkVersionRequired = '0.42.3'
1010
features = ['db-sources']
1111
useLocalCdk = false
1212
}

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.6.0
12+
dockerImageTag: 3.6.1
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialLoadRecordIterator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ protected AirbyteRecordData computeNext() {
9999
if (isCdcSync && cdcInitialLoadTimeout.isPresent()
100100
&& Duration.between(startInstant, Instant.now()).compareTo(cdcInitialLoadTimeout.get()) > 0) {
101101
final String cdcInitialLoadTimeoutMessage = String.format(
102-
"Initial load for table %s has taken longer than %s, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume",
103-
getAirbyteStream().get(), cdcInitialLoadTimeout.get());
102+
"Initial load for table %s has taken longer than %s hours, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume",
103+
getAirbyteStream().get(), cdcInitialLoadTimeout.get().toHours());
104104
LOGGER.info(cdcInitialLoadTimeoutMessage);
105105
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcSnapshotForceShutdownMessage());
106106
throw new TransientErrorException(cdcInitialLoadTimeoutMessage);

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/initialsync/MySqlInitialReadUtil.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
package io.airbyte.integrations.source.mysql.initialsync;
66

77
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
8+
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcResyncMessage;
89
import static io.airbyte.cdk.db.DbAnalyticsUtils.wassOccurrenceMessage;
910
import static io.airbyte.integrations.source.mysql.MySqlQueryUtils.getTableSizeInfoForStreams;
1011
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.FAIL_SYNC_OPTION;
1112
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
13+
import static io.airbyte.integrations.source.mysql.MySqlSpecConstants.RESYNC_DATA_OPTION;
1214
import static io.airbyte.integrations.source.mysql.cdc.MysqlCdcStateConstants.MYSQL_CDC_OFFSET;
1315
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadGlobalStateManager.STATE_TYPE_KEY;
1416
import static io.airbyte.integrations.source.mysql.initialsync.MySqlInitialLoadStateManager.PRIMARY_KEY_STATE_TYPE;
@@ -160,8 +162,10 @@ public static boolean isSavedOffsetStillPresentOnServer(final JdbcDatabase datab
160162
INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) {
161163
throw new ConfigErrorException(
162164
"Saved offset no longer present on the server. Please reset the connection, and then increase binlog retention and/or increase sync frequency. See https://docs.airbyte.com/integrations/sources/mysql/mysql-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.");
165+
} else if (sourceConfig.get("replication_method").get(INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(RESYNC_DATA_OPTION)) {
166+
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcResyncMessage());
167+
LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch");
163168
}
164-
LOGGER.warn("Saved offset no longer present on the server, Airbyte is going to trigger a sync from scratch");
165169
}
166170
return savedOffsetStillPresentOnServer;
167171
}

airbyte-integrations/connectors/source-postgres/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ java {
1212
}
1313

1414
airbyteJavaConnector {
15-
cdkVersionRequired = '0.42.2'
15+
cdkVersionRequired = '0.42.3'
1616
features = ['db-sources', 'datastore-postgres']
1717
useLocalCdk = false
1818
}

airbyte-integrations/connectors/source-postgres/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
12-
dockerImageTag: 3.6.2
12+
dockerImageTag: 3.6.3
1313
dockerRepository: airbyte/source-postgres
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
1515
githubIssueLabel: source-postgres

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/cdc/PostgresCdcCtidInitializer.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
package io.airbyte.integrations.source.postgres.cdc;
66

77
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcCursorInvalidMessage;
8+
import static io.airbyte.cdk.db.DbAnalyticsUtils.cdcResyncMessage;
89
import static io.airbyte.cdk.db.DbAnalyticsUtils.wassOccurrenceMessage;
910
import static io.airbyte.integrations.source.postgres.PostgresQueryUtils.streamsUnderVacuum;
1011
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.FAIL_SYNC_OPTION;
1112
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.INVALID_CDC_CURSOR_POSITION_PROPERTY;
13+
import static io.airbyte.integrations.source.postgres.PostgresSpecConstants.RESYNC_DATA_OPTION;
1214
import static io.airbyte.integrations.source.postgres.PostgresUtils.isDebugMode;
1315
import static io.airbyte.integrations.source.postgres.PostgresUtils.prettyPrintConfiguredAirbyteStreamList;
1416
import static io.airbyte.integrations.source.postgres.ctid.CtidUtils.createInitialLoader;
@@ -182,8 +184,10 @@ public static List<AutoCloseableIterator<AirbyteMessage>> cdcCtidIteratorsCombin
182184
INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(FAIL_SYNC_OPTION)) {
183185
throw new ConfigErrorException(
184186
"Saved offset is before replication slot's confirmed lsn. Please reset the connection, and then increase WAL retention and/or increase sync frequency to prevent this from happening in the future. See https://docs.airbyte.com/integrations/sources/postgres/postgres-troubleshooting#under-cdc-incremental-mode-there-are-still-full-refresh-syncs for more details.");
187+
} else if (sourceConfig.get("replication_method").get(INVALID_CDC_CURSOR_POSITION_PROPERTY).asText().equals(RESYNC_DATA_OPTION)) {
188+
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcResyncMessage());
189+
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
185190
}
186-
LOGGER.warn("Saved offset is before Replication slot's confirmed_flush_lsn, Airbyte will trigger sync from scratch");
187191
} else if (!isDebugMode(sourceConfig) && PostgresUtils.shouldFlushAfterSync(sourceConfig)) {
188192
// We do not want to acknowledge the WAL logs in debug mode.
189193
postgresDebeziumStateUtil.commitLSNToPostgresDatabase(database.getDatabaseConfig(),

airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/ctid/InitialSyncCtidIterator.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ protected RowDataWithCtid computeNext() {
117117
if (isCdcSync && cdcInitialLoadTimeout.isPresent()
118118
&& Duration.between(startInstant, Instant.now()).compareTo(cdcInitialLoadTimeout.get()) > 0) {
119119
final String cdcInitialLoadTimeoutMessage = String.format(
120-
"Initial load for table %s has taken longer than %s, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume",
121-
getAirbyteStream().get(), cdcInitialLoadTimeout.get());
120+
"Initial load for table %s has taken longer than %s hours, Canceling sync so that CDC replication can catch-up on subsequent attempt, and then initial snapshotting will resume",
121+
getAirbyteStream().get(), cdcInitialLoadTimeout.get().toHours());
122122
LOGGER.info(cdcInitialLoadTimeoutMessage);
123123
AirbyteTraceMessageUtility.emitAnalyticsTrace(cdcSnapshotForceShutdownMessage());
124124
throw new TransientErrorException(cdcInitialLoadTimeoutMessage);

docs/integrations/sources/mssql.md

+1
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,7 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura
422422

423423
| Version | Date | Pull Request | Subject |
424424
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
425+
| 4.1.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. |
425426
| 4.1.0 | 2024-07-17 | [42078](https://github.com/airbytehq/airbyte/pull/42078) | WASS analytics + bug fixes. |
426427
| 4.0.36 | 2024-07-17 | [41648](https://github.com/airbytehq/airbyte/pull/41648) | Implement WASS. |
427428
| 4.0.35 | 2024-07-05 | [40570](https://github.com/airbytehq/airbyte/pull/40570) | Bump debezium-connector-sqlserver from 2.6.1.Final to 2.6.2.Final. |

docs/integrations/sources/mysql.md

+1
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ Any database or table encoding combination of charset and collation is supported
233233

234234
| Version | Date | Pull Request | Subject |
235235
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
236+
| 3.6.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. |
236237
| 3.6.0 | 2024-07-17 | [40208](https://github.com/airbytehq/airbyte/pull/40208) | Start using the new error MySql source error handler that comes with a new error translation layer. |
237238
| 3.5.1 | 2024-07-17 | [42043](https://github.com/airbytehq/airbyte/pull/42043) | Adopt latest CDK + fixes. |
238239
| 3.5.0 | 2024-07-11 | [38240](https://github.com/airbytehq/airbyte/pull/38240) | Implement WASS. |

0 commit comments

Comments
 (0)