Skip to content

Commit 05fd09b

Browse files
authored
Destinations: Refreshes: Track stream statuses in async framework (#38075)
1 parent a78647e commit 05fd09b

File tree

16 files changed

+443
-184
lines changed

16 files changed

+443
-184
lines changed

airbyte-cdk/java/airbyte-cdk/README.md

+1
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ corresponds to that version.
174174

175175
| Version | Date | Pull Request | Subject |
176176
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
177+
| 0.37.1 | 2024-06-10 | [\#38075](https://github.com/airbytehq/airbyte/pull/38075) | Destinations: Track stream statuses in async framework |
177178
| 0.37.0 | 2024-06-10 | [\#38121](https://github.com/airbytehq/airbyte/pull/38121) | Destinations: Set default namespace via CatalogParser |
178179
| 0.36.8 | 2024-06-07 | [\#38763](https://github.com/airbytehq/airbyte/pull/38763) | Increase Jackson message length limit |
179180
| 0.36.7 | 2024-06-06 | [\#39220](https://github.com/airbytehq/airbyte/pull/39220) | Handle null messages in ConnectorExceptionUtil |

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StreamSyncSummary.kt

+5-13
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,9 @@
33
*/
44
package io.airbyte.cdk.integrations.destination
55

6-
import java.util.*
6+
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
77

8-
/**
9-
* @param recordsWritten The number of records written to the stream, or empty if the caller does
10-
* not track this information. (this is primarily for backwards-compatibility with the legacy
11-
* destinations framework; new implementations should always provide this information). If this
12-
* value is empty, consumers should assume that the sync wrote nonzero records for this stream.
13-
*/
14-
data class StreamSyncSummary(val recordsWritten: Optional<Long>) {
15-
16-
companion object {
17-
@JvmField val DEFAULT: StreamSyncSummary = StreamSyncSummary(Optional.empty())
18-
}
19-
}
8+
data class StreamSyncSummary(
9+
val recordsWritten: Long,
10+
val terminalStatus: AirbyteStreamStatus,
11+
)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumer.kt

+46-7
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnCloseF
1717
import io.airbyte.cdk.integrations.destination.buffered_stream_consumer.OnStartFunction
1818
import io.airbyte.commons.json.Jsons
1919
import io.airbyte.protocol.models.v0.AirbyteMessage
20+
import io.airbyte.protocol.models.v0.AirbyteStreamStatusTraceMessage.AirbyteStreamStatus
2021
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
2122
import io.airbyte.protocol.models.v0.StreamDescriptor
2223
import io.github.oshai.kotlinlogging.KotlinLogging
23-
import java.util.Optional
2424
import java.util.concurrent.ConcurrentHashMap
2525
import java.util.concurrent.ConcurrentMap
2626
import java.util.concurrent.ExecutorService
@@ -71,6 +71,8 @@ constructor(
7171

7272
// Note that this map will only be populated for streams with nonzero records.
7373
private val recordCounts: ConcurrentMap<StreamDescriptor, AtomicLong> = ConcurrentHashMap()
74+
private val terminalStatusesFromSource: ConcurrentMap<StreamDescriptor, AirbyteStreamStatus> =
75+
ConcurrentHashMap()
7476

7577
private var hasStarted = false
7678
private var hasClosed = false
@@ -103,12 +105,43 @@ constructor(
103105
airbyteMessageDeserializer.deserializeAirbyteMessage(
104106
message,
105107
)
106-
if (AirbyteMessage.Type.RECORD == partialAirbyteMessage.type) {
107-
validateRecord(partialAirbyteMessage)
108-
109-
partialAirbyteMessage.record?.streamDescriptor?.let {
110-
getRecordCounter(it).incrementAndGet()
108+
when (partialAirbyteMessage.type) {
109+
AirbyteMessage.Type.RECORD -> {
110+
validateRecord(partialAirbyteMessage)
111+
112+
partialAirbyteMessage.record?.streamDescriptor?.let {
113+
getRecordCounter(it).incrementAndGet()
114+
115+
if (terminalStatusesFromSource.containsKey(it)) {
116+
throw IllegalStateException(
117+
"Received a record message after a terminal stream status for stream ${it.namespace}.${it.name}"
118+
)
119+
}
120+
}
121+
}
122+
AirbyteMessage.Type.TRACE -> {
123+
// There are many types of trace messages, but we only care about stream status
124+
// messages with status=COMPLETE or INCOMPLETE.
125+
// INCOMPLETE is a slightly misleading name - it actually means "Stream has stopped
126+
// due to an interruption or error", i.e. failure
127+
partialAirbyteMessage.trace?.streamStatus?.let {
128+
val isTerminalStatus =
129+
it.status == AirbyteStreamStatus.COMPLETE ||
130+
it.status == AirbyteStreamStatus.INCOMPLETE
131+
if (isTerminalStatus) {
132+
val conflictsWithExistingStatus =
133+
terminalStatusesFromSource.containsKey(it.streamDescriptor) &&
134+
terminalStatusesFromSource[it.streamDescriptor] != it.status
135+
if (conflictsWithExistingStatus) {
136+
throw IllegalStateException(
137+
"Received conflicting stream statuses for stream ${it.streamDescriptor.namespace}.${it.streamDescriptor.name}"
138+
)
139+
}
140+
terminalStatusesFromSource[it.streamDescriptor] = it.status
141+
}
142+
}
111143
}
144+
else -> {}
112145
}
113146
bufferEnqueue.addRecord(
114147
partialAirbyteMessage,
@@ -131,12 +164,18 @@ constructor(
131164

132165
val streamSyncSummaries =
133166
streamNames.associate { streamDescriptor ->
167+
// If we didn't receive a stream status message, assume success.
168+
// Platform won't send us any stream status messages yet (since we're not declaring
169+
// supportsRefresh in metadata), so we will always hit this case.
170+
val terminalStatusFromSource =
171+
terminalStatusesFromSource[streamDescriptor] ?: AirbyteStreamStatus.COMPLETE
134172
StreamDescriptorUtils.withDefaultNamespace(
135173
streamDescriptor,
136174
bufferManager.defaultNamespace,
137175
) to
138176
StreamSyncSummary(
139-
Optional.of(getRecordCounter(streamDescriptor).get()),
177+
getRecordCounter(streamDescriptor).get(),
178+
terminalStatusFromSource,
140179
)
141180
}
142181
onClose.accept(hasFailed, streamSyncSummaries)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/buffers/BufferEnqueue.kt

+12-6
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,22 @@ class BufferEnqueue(
3333
message: PartialAirbyteMessage,
3434
sizeInBytes: Int,
3535
) {
36-
if (message.type == AirbyteMessage.Type.RECORD) {
37-
handleRecord(message, sizeInBytes)
38-
} else if (message.type == AirbyteMessage.Type.STATE) {
39-
stateManager.trackState(message, sizeInBytes.toLong())
36+
when (message.type) {
37+
AirbyteMessage.Type.RECORD -> {
38+
handleRecord(message, sizeInBytes)
39+
}
40+
AirbyteMessage.Type.STATE -> {
41+
stateManager.trackState(message, sizeInBytes.toLong())
42+
}
43+
else -> {}
4044
}
4145
}
4246

4347
private fun handleRecord(
4448
message: PartialAirbyteMessage,
4549
sizeInBytes: Int,
4650
) {
47-
val streamDescriptor = extractStateFromRecord(message)
51+
val streamDescriptor = extractStreamDescriptorFromRecord(message)
4852
val queue =
4953
buffers.computeIfAbsent(
5054
streamDescriptor,
@@ -87,7 +91,9 @@ class BufferEnqueue(
8791
}
8892

8993
companion object {
90-
private fun extractStateFromRecord(message: PartialAirbyteMessage): StreamDescriptor {
94+
private fun extractStreamDescriptorFromRecord(
95+
message: PartialAirbyteMessage
96+
): StreamDescriptor {
9197
return StreamDescriptor()
9298
.withNamespace(message.record?.namespace)
9399
.withName(message.record?.stream)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/deser/AirbyteMessageDeserializer.kt

+5-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@ package io.airbyte.cdk.integrations.destination.async.deser
66
import io.airbyte.cdk.integrations.destination.async.model.PartialAirbyteMessage
77
import io.airbyte.commons.json.Jsons
88
import io.airbyte.protocol.models.v0.AirbyteMessage
9+
import io.github.oshai.kotlinlogging.KotlinLogging
10+
11+
private val logger = KotlinLogging.logger {}
912

1013
class AirbyteMessageDeserializer(
1114
private val dataTransformer: StreamAwareDataTransformer = IdentityDataTransformer(),
@@ -53,8 +56,8 @@ class AirbyteMessageDeserializer(
5356
partial.record?.data = null
5457
} else if (AirbyteMessage.Type.STATE == msgType) {
5558
partial.withSerialized(message)
56-
} else {
57-
throw RuntimeException(String.format("Unsupported message type: %s", msgType))
59+
} else if (AirbyteMessage.Type.TRACE != msgType) {
60+
logger.warn { "Unsupported message type: $msgType" }
5861
}
5962

6063
return partial

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/model/PartialAirbyteMessage.kt

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package io.airbyte.cdk.integrations.destination.async.model
77
import com.fasterxml.jackson.annotation.JsonProperty
88
import com.fasterxml.jackson.annotation.JsonPropertyDescription
99
import io.airbyte.protocol.models.v0.AirbyteMessage
10+
import io.airbyte.protocol.models.v0.AirbyteTraceMessage
1011
import java.util.Objects
1112

1213
class PartialAirbyteMessage {
@@ -26,6 +27,12 @@ class PartialAirbyteMessage {
2627
@JsonProperty("state")
2728
var state: PartialAirbyteStateMessage? = null
2829

30+
@get:JsonProperty("trace")
31+
@set:JsonProperty("trace")
32+
@JsonProperty("trace")
33+
// These messages don't contain arbitrary blobs, so just directly reference the protocol struct.
34+
var trace: AirbyteTraceMessage? = null
35+
2936
/**
3037
* For record messages, this stores the serialized data blob (i.e.
3138
* `Jsons.serialize(message.getRecord().getData())`). For state messages, this stores the

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumer.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,8 @@ internal constructor(
286286
* hasFailed=false, then it could be full success. if hasFailed=true, then going for partial
287287
* success.
288288
*/
289-
onClose.accept(false, null)
289+
// TODO what to do here?
290+
onClose.accept(false, HashMap())
290291
}
291292

292293
stateManager.listCommitted()!!.forEach(outputRecordCollector)

airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/OnCloseFunction.kt

+8-7
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,21 @@
55
package io.airbyte.cdk.integrations.destination.buffered_stream_consumer
66

77
import io.airbyte.cdk.integrations.destination.StreamSyncSummary
8-
import io.airbyte.commons.functional.CheckedBiConsumer
98
import io.airbyte.protocol.models.v0.StreamDescriptor
109

1110
/**
1211
* Interface allowing destination to specify clean up logic that must be executed after all
1312
* record-related logic has finished.
1413
*
15-
* The map of StreamSyncSummaries MUST be non-null, but MAY be empty. Streams not present in the map
16-
* will be treated as equivalent to [StreamSyncSummary.DEFAULT].
17-
*
1814
* The @JvmSuppressWildcards is here so that the 2nd parameter of accept stays a java
1915
* Map<StreamDescriptor, StreamSyncSummary> rather than becoming a Map<StreamDescriptor, ? extends
2016
* StreamSyncSummary>
2117
*/
22-
fun interface OnCloseFunction :
23-
CheckedBiConsumer<
24-
Boolean, @JvmSuppressWildcards Map<StreamDescriptor, StreamSyncSummary>, Exception>
18+
fun interface OnCloseFunction {
19+
@JvmSuppressWildcards
20+
@Throws(Exception::class)
21+
fun accept(
22+
hasFailed: Boolean,
23+
streamSyncSummaries: Map<StreamDescriptor, StreamSyncSummary>,
24+
)
25+
}
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.37.0
1+
version=0.37.1

0 commit comments

Comments
 (0)