Skip to content

Commit 2432cc8

Browse files
authored
Destinations: Refreshes: CDK updates (#38067)
1 parent f310bcd commit 2432cc8

File tree

5 files changed

+68
-15
lines changed

5 files changed

+68
-15
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
| :------ | :--------- | :--------------------------------------------------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------- |
177+
| 0.39.0 | 2024-06-17 | [\#38067](https://github.com/airbytehq/airbyte/pull/38067) | Destinations: Breaking changes for refreshes (fail on INCOMPLETE stream status; ignore OVERWRITE sync mode) |
177178
| 0.38.2 | 2024-06-14 | [\#39460](https://github.com/airbytehq/airbyte/pull/39460) | Bump postgres JDBC driver version |
178179
| 0.38.1 | 2024-06-13 | [\#39445](https://github.com/airbytehq/airbyte/pull/39445) | Sources: More CDK changes to handle big initial snapshots. |
179180
| 0.38.0 | 2024-06-11 | [\#39405](https://github.com/airbytehq/airbyte/pull/39405) | Sources: Debezium properties manager interface changed to accept a list of streams to scope to |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

+18-4
Original file line numberDiff line numberDiff line change
@@ -162,13 +162,16 @@ constructor(
162162

163163
bufferManager.close()
164164

165+
val unsuccessfulStreams = ArrayList<StreamDescriptor>()
165166
val streamSyncSummaries =
166167
streamNames.associate { streamDescriptor ->
167-
// If we didn't receive a stream status message, assume success.
168-
// Platform won't send us any stream status messages yet (since we're not declaring
169-
// supportsRefresh in metadata), so we will always hit this case.
168+
// If we didn't receive a stream status message, assume failure.
169+
// This is possible if e.g. the orchestrator crashes before sending us the message.
170170
val terminalStatusFromSource =
171-
terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.COMPLETE
171+
terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.INCOMPLETE
172+
if (terminalStatusFromSource == AirbyteStreamStatus.INCOMPLETE) {
173+
unsuccessfulStreams.add(streamDescriptor)
174+
}
172175
StreamDescriptorUtils.withDefaultNamespace(
173176
streamDescriptor,
174177
bufferManager.defaultNamespace,
@@ -183,6 +186,17 @@ constructor(
183186
// as this throws an exception, we need to be after all other close functions.
184187
propagateFlushWorkerExceptionIfPresent()
185188
logger.info { "${AsyncStreamConsumer::class.java} closed" }
189+
190+
// In principle, platform should detect this.
191+
// However, as a backstop, the destination should still do this check.
192+
// This handles e.g. platform bugs where we don't receive a stream status message.
193+
// In this case, it would be misleading to mark the sync as successful, because e.g. we
194+
// maybe didn't commit a truncate.
195+
if (unsuccessfulStreams.isNotEmpty()) {
196+
throw RuntimeException(
197+
"Some streams were unsuccessful due to a source error: $unsuccessfulStreams"
198+
)
199+
}
186200
}
187201

188202
private fun getRecordCounter(streamDescriptor: StreamDescriptor): AtomicLong {
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.38.2
1+
version=0.39.0

airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt

+37-7
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,26 @@ class AsyncStreamConsumerTest {
148148
),
149149
),
150150
)
151+
private val STREAM2_SUCCESS_MESSAGE =
152+
Jsons.serialize(
153+
AirbyteMessage()
154+
.withType(AirbyteMessage.Type.TRACE)
155+
.withTrace(
156+
AirbyteTraceMessage()
157+
.withType(AirbyteTraceMessage.Type.STREAM_STATUS)
158+
.withStreamStatus(
159+
AirbyteStreamStatusTraceMessage()
160+
.withStreamDescriptor(
161+
StreamDescriptor()
162+
.withName(STREAM_NAME2)
163+
.withNamespace(SCHEMA_NAME),
164+
)
165+
.withStatus(
166+
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE
167+
),
168+
),
169+
),
170+
)
151171
private val STREAM2_FAILURE_MESSAGE =
152172
Jsons.serialize(
153173
AirbyteMessage()
@@ -262,6 +282,9 @@ class AsyncStreamConsumerTest {
262282
consumer.start()
263283
consumeRecords(consumer, expectedRecords)
264284
consumer.accept(Jsons.serialize(STATE_MESSAGE1), RECORD_SIZE_20_BYTES)
285+
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
286+
consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length)
287+
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
265288
consumer.close()
266289

267290
verifyStartAndClose()
@@ -298,6 +321,9 @@ class AsyncStreamConsumerTest {
298321
consumeRecords(consumer, expectedRecords)
299322
consumer.accept(Jsons.serialize(STATE_MESSAGE1), RECORD_SIZE_20_BYTES)
300323
consumer.accept(Jsons.serialize(STATE_MESSAGE2), RECORD_SIZE_20_BYTES)
324+
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
325+
consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length)
326+
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
301327
consumer.close()
302328

303329
verifyStartAndClose()
@@ -334,6 +360,9 @@ class AsyncStreamConsumerTest {
334360

335361
consumer.start()
336362
consumeRecords(consumer, allRecords)
363+
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
364+
consumer.accept(STREAM2_SUCCESS_MESSAGE, STREAM2_SUCCESS_MESSAGE.length)
365+
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
337366
consumer.close()
338367

339368
verifyStartAndClose()
@@ -496,7 +525,8 @@ class AsyncStreamConsumerTest {
496525
consumer.accept(STREAM1_SUCCESS_MESSAGE, STREAM1_SUCCESS_MESSAGE.length)
497526
consumer.accept(STREAM2_FAILURE_MESSAGE, STREAM2_FAILURE_MESSAGE.length)
498527
consumer.accept(STREAM3_SUCCESS_MESSAGE, STREAM3_SUCCESS_MESSAGE.length)
499-
consumer.close()
528+
// We had a failure, so expect an exception
529+
assertThrows(RuntimeException::class.java) { consumer.close() }
500530

501531
val captor: ArgumentCaptor<Map<StreamDescriptor, StreamSyncSummary>> =
502532
ArgumentCaptor.captor()
@@ -532,29 +562,29 @@ class AsyncStreamConsumerTest {
532562
consumer.start()
533563
consumeRecords(consumer, expectedRecords)
534564
// Note: no stream status messages
535-
consumer.close()
565+
// We assume stream failure, so expect an exception
566+
assertThrows(RuntimeException::class.java) { consumer.close() }
536567

537568
val captor: ArgumentCaptor<Map<StreamDescriptor, StreamSyncSummary>> =
538569
ArgumentCaptor.captor()
539570
Mockito.verify(onClose).accept(any(), capture(captor))
540571
assertEquals(
541-
// All streams have a COMPLETE status.
542-
// TODO: change this to INCOMPLETE after we switch the default behavior.
572+
// All streams have an INCOMPLETE status.
543573
mapOf(
544574
StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME) to
545575
StreamSyncSummary(
546576
expectedRecords.size.toLong(),
547-
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE,
577+
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE,
548578
),
549579
StreamDescriptor().withNamespace(SCHEMA_NAME).withName(STREAM_NAME2) to
550580
StreamSyncSummary(
551581
0,
552-
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE,
582+
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE,
553583
),
554584
StreamDescriptor().withNamespace(DEFAULT_NAMESPACE).withName(STREAM_NAME3) to
555585
StreamSyncSummary(
556586
0,
557-
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.COMPLETE,
587+
AirbyteStreamStatusTraceMessage.AirbyteStreamStatus.INCOMPLETE,
558588
),
559589
),
560590
captor.value,

airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt

+11-3
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ package io.airbyte.integrations.base.destination.typing_deduping
66
import com.google.common.annotations.VisibleForTesting
77
import io.airbyte.cdk.integrations.base.AirbyteExceptionHandler.Companion.addStringForDeinterpolation
88
import io.airbyte.cdk.integrations.base.JavaBaseConstants
9+
import io.airbyte.commons.exceptions.ConfigErrorException
910
import io.airbyte.commons.json.Jsons
1011
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
1112
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
13+
import io.airbyte.protocol.models.v0.DestinationSyncMode
1214
import io.github.oshai.kotlinlogging.KotlinLogging
1315
import java.util.Optional
1416
import java.util.function.Consumer
@@ -32,6 +34,12 @@ constructor(
3234
if (it.stream.namespace.isNullOrEmpty()) {
3335
it.stream.namespace = defaultNamespace
3436
}
37+
// The refreshes project is the beginning of the end for OVERWRITE syncs.
38+
// The sync mode still exists, but we are fully dependent on min_generation to trigger
39+
// overwrite logic.
40+
if (it.destinationSyncMode == DestinationSyncMode.OVERWRITE) {
41+
it.destinationSyncMode = DestinationSyncMode.APPEND
42+
}
3543
}
3644

3745
// this code is bad and I feel bad
@@ -122,9 +130,9 @@ constructor(
122130
@VisibleForTesting
123131
fun toStreamConfig(stream: ConfiguredAirbyteStream): StreamConfig {
124132
if (stream.generationId == null) {
125-
stream.generationId = 0
126-
stream.minimumGenerationId = 0
127-
stream.syncId = 0
133+
throw ConfigErrorException(
134+
"You must upgrade your platform version to use this connector version. Either downgrade your connector or upgrade platform to 0.63.0"
135+
)
128136
}
129137
if (
130138
stream.minimumGenerationId != 0.toLong() &&

0 commit comments

Comments
 (0)