Skip to content

Commit b80a728

Browse files
xiaohansongbtkcodedevAnjayGoelmarcosmarxmbazarnov
authored
[source-postgres/mssql/mysql] Send state and count for full refresh (#39349)
Signed-off-by: dependabot[bot] <[email protected]> Co-authored-by: btkcodedev <[email protected]> Co-authored-by: Anjay Goel <[email protected]> Co-authored-by: marcosmarxm <[email protected]> Co-authored-by: Baz <[email protected]> Co-authored-by: Natalie Kwong <[email protected]> Co-authored-by: Audrey Maldonado <[email protected]> Co-authored-by: gosusnp <[email protected]> Co-authored-by: Augustin <[email protected]> Co-authored-by: Abdul Rahman Zantout <[email protected]> Co-authored-by: Anatolii Yatsuk <[email protected]> Co-authored-by: Dhroov Makwana <[email protected]> Co-authored-by: Alexandre Girard <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Serhii Lazebnyi <[email protected]> Co-authored-by: Marcos Marx <[email protected]> Co-authored-by: Ben Church <[email protected]> Co-authored-by: Justin Flannery <[email protected]> Co-authored-by: Mal Hancock <[email protected]> Co-authored-by: Enrique Alcázar Garzás <[email protected]> Co-authored-by: Daniela García Nistor <[email protected]> Co-authored-by: Carlo Nuccio <[email protected]> Co-authored-by: Natik Gadzhi <[email protected]> Co-authored-by: Charlie Duong <[email protected]> Co-authored-by: Yue Li <[email protected]> Co-authored-by: Cristina Mariscal <[email protected]> Co-authored-by: cristina.mariscal <[email protected]> Co-authored-by: Gonzalo Villafañe Tapia <[email protected]> Co-authored-by: Jérémy Denquin <[email protected]> Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]> Co-authored-by: williammcguinness <[email protected]> Co-authored-by: Marius Posta <[email protected]> Co-authored-by: Stephane Geneix <[email protected]> Co-authored-by: Gireesh Sreepathi <[email protected]> Co-authored-by: Akash Kulkarni <[email protected]> Co-authored-by: Danylo Jablonski <[email protected]> Co-authored-by: Serhii Lazebnyi <[email protected]> Co-authored-by: alafanechere <[email protected]>
1 parent f9bade8 commit b80a728

File tree

23 files changed

+222
-69
lines changed

23 files changed

+222
-69
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ corresponds to that version.
173173
### Java CDK
174174

175175
| Version | Date | Pull Request | Subject |
176-
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
176+
|:--------| :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
177+
| 0.40.1 | 2024-06-14 | [\#39349](https://github.com/airbytehq/airbyte/pull/39349) | Source stats for full refresh streams |
177178
| 0.40.0 | 2024-06-17 | [\#38622](https://github.com/airbytehq/airbyte/pull/38622) | Destinations: Implement refreshes logic in AbstractStreamOperation |
178179
| 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
179180
| 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.40.0
1+
version=0.40.1

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt

+32
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler
4646
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils
4747
import io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier
4848
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo
49+
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateIterator
50+
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer
51+
import io.airbyte.cdk.integrations.source.relationaldb.state.StateEmitFrequency
4952
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager
5053
import io.airbyte.commons.exceptions.ConfigErrorException
5154
import io.airbyte.commons.functional.CheckedConsumer
@@ -70,6 +73,7 @@ import java.sql.Connection
7073
import java.sql.PreparedStatement
7174
import java.sql.ResultSet
7275
import java.sql.SQLException
76+
import java.time.Duration
7377
import java.time.Instant
7478
import java.util.*
7579
import java.util.function.Consumer
@@ -173,12 +177,40 @@ abstract class AbstractJdbcSource<Datatype>(
173177
cursorField,
174178
)
175179

180+
if (airbyteStream.syncMode == FULL_REFRESH) {
181+
var defaultProducer = getSourceStateProducerForNonResumableFullRefreshStream(database)
182+
if (defaultProducer != null) {
183+
iterator =
184+
AutoCloseableIterators.transform(
185+
{ autoCloseableIterator: AutoCloseableIterator<AirbyteMessage> ->
186+
SourceStateIterator(
187+
autoCloseableIterator,
188+
airbyteStream,
189+
defaultProducer,
190+
StateEmitFrequency(stateEmissionFrequency.toLong(), Duration.ZERO)
191+
)
192+
},
193+
iterator,
194+
AirbyteStreamUtils.convertFromNameAndNamespace(
195+
airbyteStream.stream.name,
196+
airbyteStream.stream.namespace
197+
)
198+
)
199+
}
200+
}
201+
176202
return when (airbyteStream.syncMode) {
177203
FULL_REFRESH -> augmentWithStreamStatus(airbyteStream, iterator)
178204
else -> iterator
179205
}
180206
}
181207

208+
protected open fun getSourceStateProducerForNonResumableFullRefreshStream(
209+
database: JdbcDatabase
210+
): SourceStateMessageProducer<AirbyteMessage>? {
211+
return null
212+
}
213+
182214
open fun augmentWithStreamStatus(
183215
airbyteStream: ConfiguredAirbyteStream,
184216
streamItrator: AutoCloseableIterator<AirbyteMessage>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
3+
*/
4+
5+
package io.airbyte.cdk.integrations.source.relationaldb.state
6+
7+
import io.airbyte.protocol.models.v0.AirbyteMessage
8+
import io.airbyte.protocol.models.v0.AirbyteStateMessage
9+
import io.airbyte.protocol.models.v0.AirbyteStreamState
10+
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
11+
import io.airbyte.protocol.models.v0.StreamDescriptor
12+
13+
class NonResumableStateMessageProducer<T>(
14+
private val isCdc: Boolean,
15+
private val sourceStateMessageProducer: SourceStateMessageProducer<T>
16+
) : SourceStateMessageProducer<AirbyteMessage> {
17+
override fun generateStateMessageAtCheckpoint(
18+
stream: ConfiguredAirbyteStream?
19+
): AirbyteStateMessage? {
20+
return null
21+
}
22+
23+
override fun processRecordMessage(
24+
stream: ConfiguredAirbyteStream?,
25+
message: AirbyteMessage
26+
): AirbyteMessage {
27+
return message
28+
}
29+
30+
override fun createFinalStateMessage(stream: ConfiguredAirbyteStream?): AirbyteStateMessage? {
31+
if (isCdc) {
32+
return sourceStateMessageProducer.createFinalStateMessage(stream)
33+
} else {
34+
val airbyteStreamState =
35+
AirbyteStreamState()
36+
.withStreamDescriptor(
37+
StreamDescriptor()
38+
.withName(stream!!.stream.name)
39+
.withNamespace(stream.stream.namespace),
40+
)
41+
42+
return AirbyteStateMessage()
43+
.withType(AirbyteStateMessage.AirbyteStateType.STREAM)
44+
.withStream(airbyteStreamState)
45+
}
46+
}
47+
48+
// no intermediate state message.
49+
override fun shouldEmitStateMessage(stream: ConfiguredAirbyteStream?): Boolean {
50+
return false
51+
}
52+
}

airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt

+10-19
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@ import com.google.common.collect.AbstractIterator
77
import io.airbyte.protocol.models.v0.AirbyteMessage
88
import io.airbyte.protocol.models.v0.AirbyteStateStats
99
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
10-
import io.airbyte.protocol.models.v0.SyncMode
10+
import io.github.oshai.kotlinlogging.KotlinLogging
1111
import java.time.Duration
1212
import java.time.Instant
1313
import java.time.OffsetDateTime
1414

15+
private val LOGGER = KotlinLogging.logger {}
16+
1517
open class SourceStateIterator<T>(
1618
private val messageIterator: Iterator<T>,
1719
private val stream: ConfiguredAirbyteStream?,
@@ -40,11 +42,9 @@ open class SourceStateIterator<T>(
4042
) {
4143
val stateMessage =
4244
sourceStateMessageProducer.generateStateMessageAtCheckpoint(stream)
43-
if (shouldAttachCountWithState()) {
44-
stateMessage!!.withSourceStats(
45-
AirbyteStateStats().withRecordCount(recordCount.toDouble())
46-
)
47-
}
45+
stateMessage!!.withSourceStats(
46+
AirbyteStateStats().withRecordCount(recordCount.toDouble())
47+
)
4848

4949
recordCount = 0L
5050
lastCheckpoint = Instant.now()
@@ -65,11 +65,10 @@ open class SourceStateIterator<T>(
6565
hasEmittedFinalState = true
6666
val finalStateMessageForStream =
6767
sourceStateMessageProducer.createFinalStateMessage(stream)
68-
if (shouldAttachCountWithState()) {
69-
finalStateMessageForStream!!.withSourceStats(
70-
AirbyteStateStats().withRecordCount(recordCount.toDouble())
71-
)
72-
}
68+
finalStateMessageForStream!!.withSourceStats(
69+
AirbyteStateStats().withRecordCount(recordCount.toDouble())
70+
)
71+
7372
recordCount = 0L
7473
return AirbyteMessage()
7574
.withType(AirbyteMessage.Type.STATE)
@@ -79,14 +78,6 @@ open class SourceStateIterator<T>(
7978
}
8079
}
8180

82-
/**
83-
* We are disabling counts for FULL_REFRESH streams cause there is are issues with it. We should
84-
* re-enable it once we do the work for project Counts: Emit Counts in Full Refresh
85-
*/
86-
private fun shouldAttachCountWithState(): Boolean {
87-
return stream?.syncMode != SyncMode.FULL_REFRESH
88-
}
89-
9081
// This method is used to check if we should emit a state message. If the record count is set to
9182
// 0,
9283
// we should not emit a state message.

airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt

+15-5
Original file line numberDiff line numberDiff line change
@@ -781,7 +781,11 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
781781
modelsSchema(),
782782
)
783783
} else {
784-
assertExpectedStateMessageCountMatches(stateMessages1, MODEL_RECORDS.size.toLong())
784+
// We are expecting count match for all streams, including non RFR streams.
785+
assertExpectedStateMessageCountMatches(
786+
stateMessages1,
787+
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
788+
)
785789

786790
// Expect state and record message from MODEL_RECORDS_2.
787791
assertStreamStatusTraceMessageIndex(
@@ -923,8 +927,11 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
923927
waitForCdcRecords(modelsSchema(), MODELS_STREAM_NAME, 1)
924928

925929
// assertExpectedStateMessages(stateMessages1)
926-
// Non resumeable full refresh does not get any state messages.
927-
assertExpectedStateMessageCountMatches(stateMessages1, MODEL_RECORDS.size.toLong())
930+
// Non resumeable full refresh will also get state messages with count.
931+
assertExpectedStateMessageCountMatches(
932+
stateMessages1,
933+
MODEL_RECORDS.size.toLong() + MODEL_RECORDS_2.size.toLong()
934+
)
928935
assertExpectedRecords(
929936
(MODEL_RECORDS_2 + MODEL_RECORDS).toSet(),
930937
recordMessages1,
@@ -933,15 +940,18 @@ abstract class CdcSourceTest<S : Source, T : TestDatabase<*, T, *>> {
933940
modelsSchema(),
934941
)
935942

943+
// Platform will remove non RFR streams before each new sync.
936944
val state = Jsons.jsonNode(listOf(stateMessages1[stateMessages1.size - 1]))
945+
val streamStates = state.get(0).get("global").get("stream_states") as ArrayNode
946+
removeStreamState(MODELS_STREAM_NAME_2, streamStates)
947+
937948
val read2 = source().read(config()!!, configuredCatalog, state)
938949
val actualRecords2 = AutoCloseableIterators.toListAndClose(read2)
939950

940951
val recordMessages2 = extractRecordMessages(actualRecords2)
941952
val stateMessages2 = extractStateMessages(actualRecords2)
942953

943-
assertExpectedStateMessagesFromIncrementalSync(stateMessages2)
944-
assertExpectedStateMessageCountMatches(stateMessages2, 1)
954+
assertExpectedStateMessageCountMatches(stateMessages2, 1 + MODEL_RECORDS_2.size.toLong())
945955
assertExpectedRecords(
946956
(MODEL_RECORDS_2 + listOf(puntoRecord)).toSet(),
947957
recordMessages2,

airbyte-integrations/connectors/source-mssql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ plugins {
33
}
44

55
airbyteJavaConnector {
6-
cdkVersionRequired = '0.38.1'
6+
cdkVersionRequired = '0.40.1'
77
features = ['db-sources']
88
useLocalCdk = false
99
}

airbyte-integrations/connectors/source-mssql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
12-
dockerImageTag: 4.0.29
12+
dockerImageTag: 4.0.30
1313
dockerRepository: airbyte/source-mssql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
1515
githubIssueLabel: source-mssql

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import static io.airbyte.cdk.integrations.debezium.internals.DebeziumEventConverter.CDC_UPDATED_AT;
1010
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbQueryUtils.*;
1111
import static io.airbyte.cdk.integrations.source.relationaldb.RelationalDbReadUtil.identifyStreamsForCursorBased;
12+
import static io.airbyte.integrations.source.mssql.MssqlCdcHelper.isCdc;
1213
import static io.airbyte.integrations.source.mssql.MssqlQueryUtils.getCursorBasedSyncStatusForStreams;
1314
import static io.airbyte.integrations.source.mssql.MssqlQueryUtils.getTableSizeInfoForStreams;
1415
import static io.airbyte.integrations.source.mssql.initialsync.MssqlInitialReadUtil.*;
@@ -37,6 +38,8 @@
3738
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler;
3839
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
3940
import io.airbyte.cdk.integrations.source.relationaldb.models.CursorBasedStatus;
41+
import io.airbyte.cdk.integrations.source.relationaldb.state.NonResumableStateMessageProducer;
42+
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
4043
import io.airbyte.cdk.integrations.source.relationaldb.state.StateGeneratorUtils;
4144
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
4245
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManagerFactory;
@@ -639,7 +642,7 @@ protected void initializeForStateManager(final JdbcDatabase database,
639642
return;
640643
}
641644
var sourceConfig = database.getSourceConfig();
642-
if (MssqlCdcHelper.isCdc(sourceConfig)) {
645+
if (isCdc(sourceConfig)) {
643646
initialLoadStateManager = getMssqlInitialLoadGlobalStateManager(database, catalog, stateManager, tableNameToTable, getQuoteString());
644647
} else {
645648
final MssqlCursorBasedStateManager cursorBasedStateManager = new MssqlCursorBasedStateManager(stateManager.getRawStateMessages(), catalog);
@@ -656,7 +659,7 @@ public InitialLoadHandler<JDBCType> getInitialLoadHandler(final JdbcDatabase dat
656659
final ConfiguredAirbyteCatalog catalog,
657660
final StateManager stateManager) {
658661
var sourceConfig = database.getSourceConfig();
659-
if (MssqlCdcHelper.isCdc(sourceConfig)) {
662+
if (isCdc(sourceConfig)) {
660663
return getMssqlFullRefreshInitialLoadHandler(database, catalog, initialLoadStateManager, stateManager, airbyteStream, Instant.now(),
661664
getQuoteString())
662665
.get();
@@ -677,6 +680,11 @@ public boolean supportResumableFullRefresh(final JdbcDatabase database, final Co
677680
return false;
678681
}
679682

683+
@Override
684+
protected SourceStateMessageProducer<AirbyteMessage> getSourceStateProducerForNonResumableFullRefreshStream(final JdbcDatabase database) {
685+
return new NonResumableStateMessageProducer<>(isCdc(database.getSourceConfig()), initialLoadStateManager);
686+
}
687+
680688
@NotNull
681689
@Override
682690
public AutoCloseableIterator<AirbyteMessage> augmentWithStreamStatus(@NotNull final ConfiguredAirbyteStream airbyteStream,

airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/initialsync/MssqlInitialLoadGlobalStateManager.java

+18-6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class MssqlInitialLoadGlobalStateManager extends MssqlInitialLoadStateMan
3030

3131
// No special handling for resumable full refresh streams. We will report the cursor as it is.
3232
private Set<AirbyteStreamNameNamespacePair> resumableFullRefreshStreams;
33+
private Set<AirbyteStreamNameNamespacePair> nonResumableFullRefreshStreams;
3334

3435
public MssqlInitialLoadGlobalStateManager(final InitialLoadStreams initialLoadStreams,
3536
final Map<AirbyteStreamNameNamespacePair, OrderedColumnInfo> pairToOrderedColInfo,
@@ -60,16 +61,21 @@ private void initStreams(final InitialLoadStreams initialLoadStreams,
6061
final ConfiguredAirbyteCatalog catalog) {
6162
this.streamsThatHaveCompletedSnapshot = new HashSet<>();
6263
this.resumableFullRefreshStreams = new HashSet<>();
64+
this.nonResumableFullRefreshStreams = new HashSet<>();
65+
6366
catalog.getStreams().forEach(configuredAirbyteStream -> {
67+
var pairInStream =
68+
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace());
6469
if (!initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)
6570
&& configuredAirbyteStream.getSyncMode() == SyncMode.INCREMENTAL) {
66-
this.streamsThatHaveCompletedSnapshot.add(
67-
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
71+
this.streamsThatHaveCompletedSnapshot.add(pairInStream);
6872
}
69-
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)
70-
&& configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
71-
this.resumableFullRefreshStreams.add(
72-
new AirbyteStreamNameNamespacePair(configuredAirbyteStream.getStream().getName(), configuredAirbyteStream.getStream().getNamespace()));
73+
if (configuredAirbyteStream.getSyncMode() == SyncMode.FULL_REFRESH) {
74+
if (initialLoadStreams.streamsForInitialLoad().contains(configuredAirbyteStream)) {
75+
this.resumableFullRefreshStreams.add(pairInStream);
76+
} else {
77+
this.nonResumableFullRefreshStreams.add(pairInStream);
78+
}
7379
}
7480
});
7581
}
@@ -128,6 +134,12 @@ public AirbyteStateMessage createFinalStateMessage(final ConfiguredAirbyteStream
128134
streamStates.add(getAirbyteStreamState(stream, Jsons.jsonNode(ocStatus)));
129135
});
130136

137+
nonResumableFullRefreshStreams.forEach(stream -> {
138+
streamStates.add(new AirbyteStreamState()
139+
.withStreamDescriptor(
140+
new StreamDescriptor().withName(stream.getName()).withNamespace(stream.getNamespace())));
141+
});
142+
131143
return new AirbyteStateMessage()
132144
.withType(AirbyteStateType.GLOBAL)
133145
.withGlobal(generateGlobalState(streamStates));

airbyte-integrations/connectors/source-mysql/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
}
77

88
airbyteJavaConnector {
9-
cdkVersionRequired = '0.38.0'
9+
cdkVersionRequired = '0.40.1'
1010
features = ['db-sources']
1111
useLocalCdk = false
1212
}

airbyte-integrations/connectors/source-mysql/metadata.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ data:
99
connectorSubtype: database
1010
connectorType: source
1111
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
12-
dockerImageTag: 3.4.9
12+
dockerImageTag: 3.4.10
1313
dockerRepository: airbyte/source-mysql
1414
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
1515
githubIssueLabel: source-mysql

airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java

+7
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import io.airbyte.cdk.integrations.source.relationaldb.DbSourceDiscoverUtil;
4747
import io.airbyte.cdk.integrations.source.relationaldb.InitialLoadHandler;
4848
import io.airbyte.cdk.integrations.source.relationaldb.TableInfo;
49+
import io.airbyte.cdk.integrations.source.relationaldb.state.NonResumableStateMessageProducer;
50+
import io.airbyte.cdk.integrations.source.relationaldb.state.SourceStateMessageProducer;
4951
import io.airbyte.cdk.integrations.source.relationaldb.state.StateGeneratorUtils;
5052
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManager;
5153
import io.airbyte.cdk.integrations.source.relationaldb.state.StateManagerFactory;
@@ -228,6 +230,11 @@ public InitialLoadHandler<MysqlType> getInitialLoadHandler(final JdbcDatabase da
228230
}
229231
}
230232

233+
@Override
234+
protected SourceStateMessageProducer<AirbyteMessage> getSourceStateProducerForNonResumableFullRefreshStream(final JdbcDatabase database) {
235+
return new NonResumableStateMessageProducer<>(isCdc(database.getSourceConfig()), initialLoadStateManager);
236+
}
237+
231238
private static AirbyteStream overrideSyncModes(final AirbyteStream stream) {
232239
return stream.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL));
233240
}

0 commit comments

Comments
 (0)